2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import java
.io
.FileNotFoundException
;
21 import java
.io
.IOException
;
22 import java
.util
.Arrays
;
23 import java
.util
.List
;
24 import java
.util
.stream
.Collectors
;
25 import org
.apache
.hadoop
.conf
.Configuration
;
26 import org
.apache
.hadoop
.fs
.FileStatus
;
27 import org
.apache
.hadoop
.fs
.FileSystem
;
28 import org
.apache
.hadoop
.fs
.Path
;
29 import org
.apache
.hadoop
.hbase
.Cell
;
30 import org
.apache
.hadoop
.hbase
.CellScanner
;
31 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
32 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
33 import org
.apache
.hadoop
.hbase
.StartMiniClusterOption
;
34 import org
.apache
.hadoop
.hbase
.TableName
;
35 import org
.apache
.hadoop
.hbase
.master
.cleaner
.TimeToLiveHFileCleaner
;
36 import org
.apache
.hadoop
.hbase
.master
.snapshot
.SnapshotManager
;
37 import org
.apache
.hadoop
.hbase
.regionserver
.HRegion
;
38 import org
.apache
.hadoop
.hbase
.regionserver
.HRegionFileSystem
;
39 import org
.apache
.hadoop
.hbase
.regionserver
.HRegionServer
;
40 import org
.apache
.hadoop
.hbase
.snapshot
.RestoreSnapshotHelper
;
41 import org
.apache
.hadoop
.hbase
.snapshot
.SnapshotTestingUtils
;
42 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
43 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
44 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
45 import org
.apache
.hadoop
.hbase
.util
.FSUtils
;
46 import org
.apache
.hadoop
.hbase
.util
.HFileArchiveUtil
;
47 import org
.apache
.hadoop
.hbase
.util
.JVMClusterUtil
.RegionServerThread
;
48 import org
.junit
.After
;
49 import org
.junit
.Assert
;
50 import org
.junit
.ClassRule
;
51 import org
.junit
.Test
;
52 import org
.junit
.experimental
.categories
.Category
;
53 import org
.slf4j
.Logger
;
54 import org
.slf4j
.LoggerFactory
;
56 @Category({LargeTests
.class, ClientTests
.class})
57 public class TestTableSnapshotScanner
{
60 public static final HBaseClassTestRule CLASS_RULE
=
61 HBaseClassTestRule
.forClass(TestTableSnapshotScanner
.class);
63 private static final Logger LOG
= LoggerFactory
.getLogger(TestTableSnapshotScanner
.class);
64 private final HBaseTestingUtility UTIL
= new HBaseTestingUtility();
65 private static final int NUM_REGION_SERVERS
= 2;
66 private static final byte[][] FAMILIES
= {Bytes
.toBytes("f1"), Bytes
.toBytes("f2")};
67 public static byte[] bbb
= Bytes
.toBytes("bbb");
68 public static byte[] yyy
= Bytes
.toBytes("yyy");
70 private FileSystem fs
;
73 public static void blockUntilSplitFinished(HBaseTestingUtility util
, TableName tableName
,
74 int expectedRegionSize
) throws Exception
{
75 for (int i
= 0; i
< 100; i
++) {
76 List
<RegionInfo
> hRegionInfoList
= util
.getAdmin().getRegions(tableName
);
77 if (hRegionInfoList
.size() >= expectedRegionSize
) {
84 public void setupCluster() throws Exception
{
85 setupConf(UTIL
.getConfiguration());
86 StartMiniClusterOption option
= StartMiniClusterOption
.builder()
87 .numRegionServers(NUM_REGION_SERVERS
).numDataNodes(NUM_REGION_SERVERS
)
88 .createRootDir(true).build();
89 UTIL
.startMiniCluster(option
);
90 rootDir
= UTIL
.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
91 fs
= rootDir
.getFileSystem(UTIL
.getConfiguration());
94 public void tearDownCluster() throws Exception
{
95 UTIL
.shutdownMiniCluster();
98 private static void setupConf(Configuration conf
) {
100 conf
.setBoolean(SnapshotManager
.HBASE_SNAPSHOT_ENABLED
, true);
104 public void tearDown() throws Exception
{
107 public static void createTableAndSnapshot(HBaseTestingUtility util
, TableName tableName
,
108 String snapshotName
, int numRegions
)
111 util
.deleteTable(tableName
);
112 } catch(Exception ex
) {
116 if (numRegions
> 1) {
117 util
.createTable(tableName
, FAMILIES
, 1, bbb
, yyy
, numRegions
);
119 util
.createTable(tableName
, FAMILIES
);
121 Admin admin
= util
.getAdmin();
123 // put some stuff in the table
124 Table table
= util
.getConnection().getTable(tableName
);
125 util
.loadTable(table
, FAMILIES
);
127 Path rootDir
= FSUtils
.getRootDir(util
.getConfiguration());
128 FileSystem fs
= rootDir
.getFileSystem(util
.getConfiguration());
130 SnapshotTestingUtils
.createSnapshotAndValidate(admin
, tableName
,
131 Arrays
.asList(FAMILIES
), null, snapshotName
, rootDir
, fs
, true);
133 // load different values
134 byte[] value
= Bytes
.toBytes("after_snapshot_value");
135 util
.loadTable(table
, FAMILIES
, value
);
137 // cause flush to create new files in the region
138 admin
.flush(tableName
);
143 public void testNoDuplicateResultsWhenSplitting() throws Exception
{
145 TableName tableName
= TableName
.valueOf("testNoDuplicateResultsWhenSplitting");
146 String snapshotName
= "testSnapshotBug";
148 if (UTIL
.getAdmin().tableExists(tableName
)) {
149 UTIL
.deleteTable(tableName
);
152 UTIL
.createTable(tableName
, FAMILIES
);
153 Admin admin
= UTIL
.getAdmin();
155 // put some stuff in the table
156 Table table
= UTIL
.getConnection().getTable(tableName
);
157 UTIL
.loadTable(table
, FAMILIES
);
159 // split to 2 regions
160 admin
.split(tableName
, Bytes
.toBytes("eee"));
161 blockUntilSplitFinished(UTIL
, tableName
, 2);
163 Path rootDir
= FSUtils
.getRootDir(UTIL
.getConfiguration());
164 FileSystem fs
= rootDir
.getFileSystem(UTIL
.getConfiguration());
166 SnapshotTestingUtils
.createSnapshotAndValidate(admin
, tableName
,
167 Arrays
.asList(FAMILIES
), null, snapshotName
, rootDir
, fs
, true);
169 // load different values
170 byte[] value
= Bytes
.toBytes("after_snapshot_value");
171 UTIL
.loadTable(table
, FAMILIES
, value
);
173 // cause flush to create new files in the region
174 admin
.flush(tableName
);
177 Path restoreDir
= UTIL
.getDataTestDirOnTestFS(snapshotName
);
178 Scan scan
= new Scan().withStartRow(bbb
).withStopRow(yyy
); // limit the scan
180 TableSnapshotScanner scanner
=
181 new TableSnapshotScanner(UTIL
.getConfiguration(), restoreDir
, snapshotName
, scan
);
183 verifyScanner(scanner
, bbb
, yyy
);
186 UTIL
.getAdmin().deleteSnapshot(snapshotName
);
187 UTIL
.deleteTable(tableName
);
193 public void testWithSingleRegion() throws Exception
{
194 testScanner(UTIL
, "testWithSingleRegion", 1, false);
198 public void testWithMultiRegion() throws Exception
{
199 testScanner(UTIL
, "testWithMultiRegion", 10, false);
203 public void testWithOfflineHBaseMultiRegion() throws Exception
{
204 testScanner(UTIL
, "testWithMultiRegion", 20, true);
208 public void testScannerWithRestoreScanner() throws Exception
{
210 TableName tableName
= TableName
.valueOf("testScanner");
211 String snapshotName
= "testScannerWithRestoreScanner";
213 createTableAndSnapshot(UTIL
, tableName
, snapshotName
, 50);
214 Path restoreDir
= UTIL
.getDataTestDirOnTestFS(snapshotName
);
215 Scan scan
= new Scan(bbb
, yyy
); // limit the scan
217 Configuration conf
= UTIL
.getConfiguration();
218 Path rootDir
= FSUtils
.getRootDir(conf
);
220 TableSnapshotScanner scanner0
=
221 new TableSnapshotScanner(conf
, restoreDir
, snapshotName
, scan
);
222 verifyScanner(scanner0
, bbb
, yyy
);
226 RestoreSnapshotHelper
.copySnapshotForScanner(conf
, fs
, rootDir
, restoreDir
, snapshotName
);
228 // scan the snapshot without restoring snapshot
229 TableSnapshotScanner scanner
=
230 new TableSnapshotScanner(conf
, rootDir
, restoreDir
, snapshotName
, scan
, true);
231 verifyScanner(scanner
, bbb
, yyy
);
234 // check whether the snapshot has been deleted by the close of scanner.
235 scanner
= new TableSnapshotScanner(conf
, rootDir
, restoreDir
, snapshotName
, scan
, true);
236 verifyScanner(scanner
, bbb
, yyy
);
239 // restore snapshot again.
240 RestoreSnapshotHelper
.copySnapshotForScanner(conf
, fs
, rootDir
, restoreDir
, snapshotName
);
242 // check whether the snapshot has been deleted by the close of scanner.
243 scanner
= new TableSnapshotScanner(conf
, rootDir
, restoreDir
, snapshotName
, scan
, true);
244 verifyScanner(scanner
, bbb
, yyy
);
247 UTIL
.getAdmin().deleteSnapshot(snapshotName
);
248 UTIL
.deleteTable(tableName
);
253 private void testScanner(HBaseTestingUtility util
, String snapshotName
, int numRegions
,
254 boolean shutdownCluster
) throws Exception
{
256 TableName tableName
= TableName
.valueOf("testScanner");
258 createTableAndSnapshot(util
, tableName
, snapshotName
, numRegions
);
260 if (shutdownCluster
) {
261 util
.shutdownMiniHBaseCluster();
264 Path restoreDir
= util
.getDataTestDirOnTestFS(snapshotName
);
265 Scan scan
= new Scan(bbb
, yyy
); // limit the scan
267 TableSnapshotScanner scanner
= new TableSnapshotScanner(UTIL
.getConfiguration(), restoreDir
,
270 verifyScanner(scanner
, bbb
, yyy
);
273 if (!shutdownCluster
) {
274 util
.getAdmin().deleteSnapshot(snapshotName
);
275 util
.deleteTable(tableName
);
281 private void verifyScanner(ResultScanner scanner
, byte[] startRow
, byte[] stopRow
)
282 throws IOException
, InterruptedException
{
284 HBaseTestingUtility
.SeenRowTracker rowTracker
=
285 new HBaseTestingUtility
.SeenRowTracker(startRow
, stopRow
);
288 Result result
= scanner
.next();
289 if (result
== null) {
293 rowTracker
.addRow(result
.getRow());
296 // validate all rows are seen
297 rowTracker
.validate();
300 private static void verifyRow(Result result
) throws IOException
{
301 byte[] row
= result
.getRow();
302 CellScanner scanner
= result
.cellScanner();
303 while (scanner
.advance()) {
304 Cell cell
= scanner
.current();
306 //assert that all Cells in the Result have the same key
307 Assert
.assertEquals(0, Bytes
.compareTo(row
, 0, row
.length
,
308 cell
.getRowArray(), cell
.getRowOffset(), cell
.getRowLength()));
311 for (int j
= 0; j
< FAMILIES
.length
; j
++) {
312 byte[] actual
= result
.getValue(FAMILIES
[j
], FAMILIES
[j
]);
313 Assert
.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes
.toString(row
)
314 + " ,actual:" + Bytes
.toString(actual
), row
, actual
);
319 public void testMergeRegion() throws Exception
{
321 TableName tableName
= TableName
.valueOf("testMergeRegion");
322 String snapshotName
= tableName
.getNameAsString() + "_snapshot";
323 Configuration conf
= UTIL
.getConfiguration();
324 Path rootDir
= UTIL
.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
325 long timeout
= 20000; // 20s
326 try (Admin admin
= UTIL
.getAdmin()) {
327 List
<String
> serverList
= admin
.getRegionServers().stream().map(sn
-> sn
.getServerName())
328 .collect(Collectors
.toList());
329 // create table with 3 regions
330 Table table
= UTIL
.createTable(tableName
, FAMILIES
, 1, bbb
, yyy
, 3);
331 List
<RegionInfo
> regions
= admin
.getRegions(tableName
);
332 Assert
.assertEquals(3, regions
.size());
333 RegionInfo region0
= regions
.get(0);
334 RegionInfo region1
= regions
.get(1);
335 RegionInfo region2
= regions
.get(2);
336 // put some data in the table
337 UTIL
.loadTable(table
, FAMILIES
);
338 admin
.flush(tableName
);
339 // wait flush is finished
340 UTIL
.waitFor(timeout
, () -> {
342 Path tableDir
= FSUtils
.getTableDir(rootDir
, tableName
);
343 for (RegionInfo region
: regions
) {
344 Path regionDir
= new Path(tableDir
, region
.getEncodedName());
345 for (Path familyDir
: FSUtils
.getFamilyDirs(fs
, regionDir
)) {
346 if (fs
.listStatus(familyDir
).length
!= 1) {
352 } catch (IOException e
) {
353 LOG
.warn("Failed check if flush is finished", e
);
358 admin
.compactionSwitch(false, serverList
);
359 admin
.mergeRegionsAsync(region0
.getEncodedNameAsBytes(), region1
.getEncodedNameAsBytes(),
361 UTIL
.waitFor(timeout
, () -> admin
.getRegions(tableName
).size() == 2);
362 List
<RegionInfo
> mergedRegions
= admin
.getRegions(tableName
);
363 RegionInfo mergedRegion
=
364 mergedRegions
.get(0).getEncodedName().equals(region2
.getEncodedName())
365 ? mergedRegions
.get(1)
366 : mergedRegions
.get(0);
368 admin
.snapshot(snapshotName
, tableName
);
369 Assert
.assertEquals(1, admin
.listSnapshots().size());
371 admin
.compactionSwitch(true, serverList
);
372 admin
.majorCompactRegion(mergedRegion
.getRegionName());
373 // wait until merged region has no reference
374 UTIL
.waitFor(timeout
, () -> {
376 for (RegionServerThread regionServerThread
: UTIL
.getMiniHBaseCluster()
377 .getRegionServerThreads()) {
378 HRegionServer regionServer
= regionServerThread
.getRegionServer();
379 for (HRegion subRegion
: regionServer
.getRegions(tableName
)) {
380 if (subRegion
.getRegionInfo().getEncodedName()
381 .equals(mergedRegion
.getEncodedName())) {
382 regionServer
.getCompactedHFilesDischarger().chore();
386 Path tableDir
= FSUtils
.getTableDir(rootDir
, tableName
);
387 HRegionFileSystem regionFs
= HRegionFileSystem
388 .openRegionFromFileSystem(UTIL
.getConfiguration(), fs
, tableDir
, mergedRegion
, true);
389 return !regionFs
.hasReferences(admin
.getDescriptor(tableName
));
390 } catch (IOException e
) {
391 LOG
.warn("Failed check merged region has no reference", e
);
395 // run catalog janitor to clean and wait for parent regions are archived
396 UTIL
.getMiniHBaseCluster().getMaster().getCatalogJanitor().choreForTesting();
397 UTIL
.waitFor(timeout
, () -> {
399 Path tableDir
= FSUtils
.getTableDir(rootDir
, tableName
);
400 for (FileStatus fileStatus
: fs
.listStatus(tableDir
)) {
401 String name
= fileStatus
.getPath().getName();
402 if (name
.equals(region0
.getEncodedName()) || name
.equals(region1
.getEncodedName())) {
407 } catch (IOException e
) {
408 LOG
.warn("Check if parent regions are archived error", e
);
412 // set file modify time and then run cleaner
413 long time
= System
.currentTimeMillis() - TimeToLiveHFileCleaner
.DEFAULT_TTL
* 1000;
414 traverseAndSetFileTime(HFileArchiveUtil
.getArchivePath(conf
), time
);
415 UTIL
.getMiniHBaseCluster().getMaster().getHFileCleaner().runCleaner();
417 try (TableSnapshotScanner scanner
= new TableSnapshotScanner(conf
,
418 UTIL
.getDataTestDirOnTestFS(snapshotName
), snapshotName
, new Scan(bbb
, yyy
))) {
419 verifyScanner(scanner
, bbb
, yyy
);
421 } catch (Exception e
) {
422 LOG
.error("scan snapshot error", e
);
423 Assert
.fail("Should not throw FileNotFoundException");
424 Assert
.assertTrue(e
.getCause() != null);
425 Assert
.assertTrue(e
.getCause().getCause() instanceof FileNotFoundException
);
431 private void traverseAndSetFileTime(Path path
, long time
) throws IOException
{
432 fs
.setTimes(path
, time
, -1);
433 if (fs
.isDirectory(path
)) {
434 List
<FileStatus
> allPaths
= Arrays
.asList(fs
.listStatus(path
));
435 List
<FileStatus
> subDirs
=
436 allPaths
.stream().filter(FileStatus
::isDirectory
).collect(Collectors
.toList());
437 List
<FileStatus
> files
=
438 allPaths
.stream().filter(FileStatus
::isFile
).collect(Collectors
.toList());
439 for (FileStatus subDir
: subDirs
) {
440 traverseAndSetFileTime(subDir
.getPath(), time
);
442 for (FileStatus file
: files
) {
443 fs
.setTimes(file
.getPath(), time
, -1);