2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.tool
;
20 import static org
.junit
.Assert
.assertArrayEquals
;
21 import static org
.junit
.Assert
.assertEquals
;
22 import static org
.junit
.Assert
.assertNotNull
;
23 import static org
.junit
.Assert
.assertNull
;
24 import static org
.junit
.Assert
.assertTrue
;
25 import static org
.junit
.Assert
.fail
;
26 import static org
.mockito
.ArgumentMatchers
.any
;
27 import static org
.mockito
.ArgumentMatchers
.anyBoolean
;
28 import static org
.mockito
.ArgumentMatchers
.anyList
;
29 import static org
.mockito
.Mockito
.doReturn
;
30 import static org
.mockito
.Mockito
.spy
;
32 import java
.io
.IOException
;
33 import java
.nio
.ByteBuffer
;
34 import java
.util
.Deque
;
35 import java
.util
.List
;
37 import java
.util
.concurrent
.CompletableFuture
;
38 import java
.util
.concurrent
.atomic
.AtomicInteger
;
39 import java
.util
.stream
.IntStream
;
40 import org
.apache
.hadoop
.conf
.Configuration
;
41 import org
.apache
.hadoop
.fs
.FileSystem
;
42 import org
.apache
.hadoop
.fs
.Path
;
43 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
44 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
45 import org
.apache
.hadoop
.hbase
.HConstants
;
46 import org
.apache
.hadoop
.hbase
.TableExistsException
;
47 import org
.apache
.hadoop
.hbase
.TableName
;
48 import org
.apache
.hadoop
.hbase
.client
.Admin
;
49 import org
.apache
.hadoop
.hbase
.client
.AsyncClusterConnection
;
50 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
51 import org
.apache
.hadoop
.hbase
.client
.Connection
;
52 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
53 import org
.apache
.hadoop
.hbase
.client
.Result
;
54 import org
.apache
.hadoop
.hbase
.client
.ResultScanner
;
55 import org
.apache
.hadoop
.hbase
.client
.Scan
;
56 import org
.apache
.hadoop
.hbase
.client
.Table
;
57 import org
.apache
.hadoop
.hbase
.client
.TableDescriptor
;
58 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
59 import org
.apache
.hadoop
.hbase
.coprocessor
.CoprocessorHost
;
60 import org
.apache
.hadoop
.hbase
.regionserver
.HRegionServer
;
61 import org
.apache
.hadoop
.hbase
.regionserver
.TestHRegionServerBulkLoad
;
62 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
63 import org
.apache
.hadoop
.hbase
.testclassification
.MiscTests
;
64 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
65 import org
.apache
.hadoop
.hbase
.util
.CommonFSUtils
;
66 import org
.apache
.hadoop
.hbase
.util
.Pair
;
67 import org
.junit
.AfterClass
;
68 import org
.junit
.BeforeClass
;
69 import org
.junit
.ClassRule
;
70 import org
.junit
.Rule
;
71 import org
.junit
.Test
;
72 import org
.junit
.experimental
.categories
.Category
;
73 import org
.junit
.rules
.TestName
;
74 import org
.slf4j
.Logger
;
75 import org
.slf4j
.LoggerFactory
;
77 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.collect
.Multimap
;
79 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ProtobufUtil
;
82 * Test cases for the atomic load error handling of the bulk load functionality.
84 @Category({ MiscTests
.class, LargeTests
.class })
85 public class TestBulkLoadHFilesSplitRecovery
{
88 public static final HBaseClassTestRule CLASS_RULE
=
89 HBaseClassTestRule
.forClass(TestBulkLoadHFilesSplitRecovery
.class);
91 private static final Logger LOG
= LoggerFactory
.getLogger(TestHRegionServerBulkLoad
.class);
93 static HBaseTestingUtil util
;
94 // used by secure subclass
95 static boolean useSecure
= false;
97 final static int NUM_CFS
= 10;
98 final static byte[] QUAL
= Bytes
.toBytes("qual");
99 final static int ROWCOUNT
= 100;
101 private final static byte[][] families
= new byte[NUM_CFS
][];
// Supplies the currently running test method's name; the tests derive their table names from it
// via TableName.valueOf(name.getMethodName()).
public TestName name = new TestName();
107 for (int i
= 0; i
< NUM_CFS
; i
++) {
108 families
[i
] = Bytes
.toBytes(family(i
));
112 static byte[] rowkey(int i
) {
113 return Bytes
.toBytes(String
.format("row_%08d", i
));
116 static String
family(int i
) {
117 return String
.format("family_%04d", i
);
120 static byte[] value(int i
) {
121 return Bytes
.toBytes(String
.format("%010d", i
));
124 public static void buildHFiles(FileSystem fs
, Path dir
, int value
) throws IOException
{
125 byte[] val
= value(value
);
126 for (int i
= 0; i
< NUM_CFS
; i
++) {
127 Path testIn
= new Path(dir
, family(i
));
129 TestHRegionServerBulkLoad
.createHFile(fs
, new Path(testIn
, "hfile_" + i
),
130 Bytes
.toBytes(family(i
)), QUAL
, val
, ROWCOUNT
);
134 private TableDescriptor
createTableDesc(TableName name
, int cfs
) {
135 TableDescriptorBuilder builder
= TableDescriptorBuilder
.newBuilder(name
);
136 IntStream
.range(0, cfs
).mapToObj(i
-> ColumnFamilyDescriptorBuilder
.of(family(i
)))
137 .forEachOrdered(builder
::setColumnFamily
);
138 return builder
.build();
142 * Creates a table with given table name and specified number of column families if the table does
145 private void setupTable(final Connection connection
, TableName table
, int cfs
)
148 LOG
.info("Creating table " + table
);
149 try (Admin admin
= connection
.getAdmin()) {
150 admin
.createTable(createTableDesc(table
, cfs
));
152 } catch (TableExistsException tee
) {
153 LOG
.info("Table " + table
+ " already exists");
158 * Creates a table with the given table name, specified number of column families<br>
159 * and splitkeys if the table does not already exist.
164 private void setupTableWithSplitkeys(TableName table
, int cfs
, byte[][] SPLIT_KEYS
)
167 LOG
.info("Creating table " + table
);
168 util
.createTable(createTableDesc(table
, cfs
), SPLIT_KEYS
);
169 } catch (TableExistsException tee
) {
170 LOG
.info("Table " + table
+ " already exists");
174 private Path
buildBulkFiles(TableName table
, int value
) throws Exception
{
175 Path dir
= util
.getDataTestDirOnTestFS(table
.getNameAsString());
176 Path bulk1
= new Path(dir
, table
.getNameAsString() + value
);
177 FileSystem fs
= util
.getTestFileSystem();
178 buildHFiles(fs
, bulk1
, value
);
183 * Populate table with known values.
185 private void populateTable(final Connection connection
, TableName table
, int value
)
187 // create HFiles for different column families
188 Path dir
= buildBulkFiles(table
, value
);
189 BulkLoadHFiles
.create(util
.getConfiguration()).bulkLoad(table
, dir
);
193 * Split the known table in half. (this is hard coded for this test suite)
195 private void forceSplit(TableName table
) {
197 // need to call regions server to by synchronous but isn't visible.
198 HRegionServer hrs
= util
.getRSForFirstRegionInTable(table
);
200 for (RegionInfo hri
: ProtobufUtil
.getOnlineRegions(hrs
.getRSRpcServices())) {
201 if (hri
.getTable().equals(table
)) {
202 util
.getAdmin().splitRegionAsync(hri
.getRegionName(), rowkey(ROWCOUNT
/ 2));
203 // ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2));
207 // verify that split completed.
211 for (RegionInfo hri
: ProtobufUtil
.getOnlineRegions(hrs
.getRSRpcServices())) {
212 if (hri
.getTable().equals(table
)) {
217 LOG
.info("Taking some time to complete split...");
220 } while (regions
!= 2);
221 } catch (IOException e
) {
223 } catch (InterruptedException e
) {
229 public static void setupCluster() throws Exception
{
230 util
= new HBaseTestingUtil();
231 util
.getConfiguration().set(CoprocessorHost
.REGION_COPROCESSOR_CONF_KEY
, "");
232 util
.startMiniCluster(1);
236 public static void teardownCluster() throws Exception
{
237 util
.shutdownMiniCluster();
241 * Checks that all columns have the expected value and that there is the expected number of rows.
242 * @throws IOException if the admin or the table descriptor lookup fails
244 void assertExpectedTable(TableName table
, int count
, int value
) throws IOException
{
245 TableDescriptor htd
= util
.getAdmin().getDescriptor(table
);
247 try (Table t
= util
.getConnection().getTable(table
);
248 ResultScanner sr
= t
.getScanner(new Scan())) {
250 for (Result r
; (r
= sr
.next()) != null;) {
251 r
.getNoVersionMap().values().stream().flatMap(m
-> m
.values().stream())
252 .forEach(v
-> assertArrayEquals(value(value
), v
));
255 assertEquals(count
, i
);
256 } catch (IOException e
) {
257 fail("Failed due to exception");
261 private static <T
> CompletableFuture
<T
> failedFuture(Throwable error
) {
262 CompletableFuture
<T
> future
= new CompletableFuture
<>();
263 future
.completeExceptionally(error
);
267 private static AsyncClusterConnection
mockAndInjectError(AsyncClusterConnection conn
) {
268 AsyncClusterConnection errConn
= spy(conn
);
269 doReturn(failedFuture(new IOException("injecting bulk load error"))).when(errConn
)
270 .bulkLoad(any(), anyList(), any(), anyBoolean(), any(), any(), anyBoolean(), anyList(),
276 * Test that shows that an exception thrown from the RS side will result in an exception on the
279 @Test(expected
= IOException
.class)
280 public void testBulkLoadPhaseFailure() throws Exception
{
281 final TableName table
= TableName
.valueOf(name
.getMethodName());
282 final AtomicInteger attemptedCalls
= new AtomicInteger();
283 Configuration conf
= new Configuration(util
.getConfiguration());
284 conf
.setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
, 2);
285 BulkLoadHFilesTool loader
= new BulkLoadHFilesTool(conf
) {
288 protected void bulkLoadPhase(AsyncClusterConnection conn
, TableName tableName
,
289 Deque
<LoadQueueItem
> queue
, Multimap
<ByteBuffer
, LoadQueueItem
> regionGroups
,
290 boolean copyFiles
, Map
<LoadQueueItem
, ByteBuffer
> item2RegionMap
) throws IOException
{
291 AsyncClusterConnection c
=
292 attemptedCalls
.incrementAndGet() == 1 ?
mockAndInjectError(conn
) : conn
;
293 super.bulkLoadPhase(c
, tableName
, queue
, regionGroups
, copyFiles
, item2RegionMap
);
296 Path dir
= buildBulkFiles(table
, 1);
297 loader
.bulkLoad(table
, dir
);
301 * Test that shows that an exception thrown from the RS side will result in the expected number of
302 * retries set by {@link HConstants#HBASE_CLIENT_RETRIES_NUMBER} when
303 * {@link BulkLoadHFiles#RETRY_ON_IO_EXCEPTION} is set to true.
306 public void testRetryOnIOException() throws Exception
{
307 TableName table
= TableName
.valueOf(name
.getMethodName());
308 AtomicInteger calls
= new AtomicInteger(0);
309 setupTable(util
.getConnection(), table
, 10);
310 Configuration conf
= new Configuration(util
.getConfiguration());
311 conf
.setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
, 2);
312 conf
.setBoolean(BulkLoadHFiles
.RETRY_ON_IO_EXCEPTION
, true);
313 BulkLoadHFilesTool loader
= new BulkLoadHFilesTool(conf
) {
316 protected void bulkLoadPhase(AsyncClusterConnection conn
, TableName tableName
,
317 Deque
<LoadQueueItem
> queue
, Multimap
<ByteBuffer
, LoadQueueItem
> regionGroups
,
318 boolean copyFiles
, Map
<LoadQueueItem
, ByteBuffer
> item2RegionMap
) throws IOException
{
319 if (calls
.get() < conf
.getInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
,
320 HConstants
.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER
)) {
321 calls
.incrementAndGet();
322 super.bulkLoadPhase(mockAndInjectError(conn
), tableName
, queue
, regionGroups
, copyFiles
,
325 super.bulkLoadPhase(conn
, tableName
, queue
, regionGroups
, copyFiles
, item2RegionMap
);
329 Path dir
= buildBulkFiles(table
, 1);
330 loader
.bulkLoad(table
, dir
);
331 assertEquals(calls
.get(), 2);
335 * This test exercises the path where there is a split after initial validation but before the
336 * atomic bulk load call. We cannot use presplitting to test this path, so we actually inject a
337 * split just before the atomic region load.
340 public void testSplitWhileBulkLoadPhase() throws Exception
{
341 final TableName table
= TableName
.valueOf(name
.getMethodName());
342 setupTable(util
.getConnection(), table
, 10);
343 populateTable(util
.getConnection(), table
, 1);
344 assertExpectedTable(table
, ROWCOUNT
, 1);
346 // Now let's cause trouble. This will occur after checks and cause bulk
347 // files to fail when attempt to atomically import. This is recoverable.
348 final AtomicInteger attemptedCalls
= new AtomicInteger();
349 BulkLoadHFilesTool loader
= new BulkLoadHFilesTool(util
.getConfiguration()) {
352 protected void bulkLoadPhase(AsyncClusterConnection conn
, TableName tableName
,
353 Deque
<LoadQueueItem
> queue
, Multimap
<ByteBuffer
, LoadQueueItem
> regionGroups
,
354 boolean copyFiles
, Map
<LoadQueueItem
, ByteBuffer
> item2RegionMap
) throws IOException
{
355 int i
= attemptedCalls
.incrementAndGet();
357 // On first attempt force a split.
360 super.bulkLoadPhase(conn
, tableName
, queue
, regionGroups
, copyFiles
, item2RegionMap
);
364 // create HFiles for different column families
365 Path dir
= buildBulkFiles(table
, 2);
366 loader
.bulkLoad(table
, dir
);
368 // check that data was loaded
369 // The three expected attempts are 1) failure because need to split, 2)
370 // load of split top 3) load of split bottom
371 assertEquals(3, attemptedCalls
.get());
372 assertExpectedTable(table
, ROWCOUNT
, 2);
376 * This test splits a table and attempts to bulk load. The bulk import files should be split
377 * before atomically importing.
380 public void testGroupOrSplitPresplit() throws Exception
{
381 final TableName table
= TableName
.valueOf(name
.getMethodName());
382 setupTable(util
.getConnection(), table
, 10);
383 populateTable(util
.getConnection(), table
, 1);
384 assertExpectedTable(util
.getConnection(), table
, ROWCOUNT
, 1);
387 final AtomicInteger countedLqis
= new AtomicInteger();
388 BulkLoadHFilesTool loader
= new BulkLoadHFilesTool(util
.getConfiguration()) {
391 protected Pair
<List
<LoadQueueItem
>, String
> groupOrSplit(AsyncClusterConnection conn
,
392 TableName tableName
, Multimap
<ByteBuffer
, LoadQueueItem
> regionGroups
, LoadQueueItem item
,
393 List
<Pair
<byte[], byte[]>> startEndKeys
) throws IOException
{
394 Pair
<List
<LoadQueueItem
>, String
> lqis
=
395 super.groupOrSplit(conn
, tableName
, regionGroups
, item
, startEndKeys
);
396 if (lqis
!= null && lqis
.getFirst() != null) {
397 countedLqis
.addAndGet(lqis
.getFirst().size());
403 // create HFiles for different column families
404 Path dir
= buildBulkFiles(table
, 2);
405 loader
.bulkLoad(table
, dir
);
406 assertExpectedTable(util
.getConnection(), table
, ROWCOUNT
, 2);
407 assertEquals(20, countedLqis
.get());
411 public void testCorrectSplitPoint() throws Exception
{
412 final TableName table
= TableName
.valueOf(name
.getMethodName());
413 byte[][] SPLIT_KEYS
= new byte[][] { Bytes
.toBytes("row_00000010"),
414 Bytes
.toBytes("row_00000020"), Bytes
.toBytes("row_00000030"), Bytes
.toBytes("row_00000040"),
415 Bytes
.toBytes("row_00000050"), Bytes
.toBytes("row_00000060"),
416 Bytes
.toBytes("row_00000070") };
417 setupTableWithSplitkeys(table
, NUM_CFS
, SPLIT_KEYS
);
419 final AtomicInteger bulkloadRpcTimes
= new AtomicInteger();
420 BulkLoadHFilesTool loader
= new BulkLoadHFilesTool(util
.getConfiguration()) {
423 protected void bulkLoadPhase(AsyncClusterConnection conn
, TableName tableName
,
424 Deque
<LoadQueueItem
> queue
, Multimap
<ByteBuffer
, LoadQueueItem
> regionGroups
,
425 boolean copyFiles
, Map
<LoadQueueItem
, ByteBuffer
> item2RegionMap
) throws IOException
{
426 bulkloadRpcTimes
.addAndGet(1);
427 super.bulkLoadPhase(conn
, tableName
, queue
, regionGroups
, copyFiles
, item2RegionMap
);
431 Path dir
= buildBulkFiles(table
, 1);
432 loader
.bulkLoad(table
, dir
);
433 // before HBASE-25281 we need invoke bulkload rpc 8 times
434 assertEquals(4, bulkloadRpcTimes
.get());
438 * This test creates a table with many small regions. The bulk load files would be split
439 * multiple times before all of them can be loaded successfully.
442 public void testSplitTmpFileCleanUp() throws Exception
{
443 final TableName table
= TableName
.valueOf(name
.getMethodName());
444 byte[][] SPLIT_KEYS
= new byte[][] { Bytes
.toBytes("row_00000010"),
445 Bytes
.toBytes("row_00000020"), Bytes
.toBytes("row_00000030"), Bytes
.toBytes("row_00000040"),
446 Bytes
.toBytes("row_00000050") };
447 setupTableWithSplitkeys(table
, 10, SPLIT_KEYS
);
449 BulkLoadHFiles loader
= BulkLoadHFiles
.create(util
.getConfiguration());
452 Path dir
= buildBulkFiles(table
, 2);
453 loader
.bulkLoad(table
, dir
);
455 Path tmpPath
= new Path(dir
, family(0));
456 // TMP_DIR under family path
457 tmpPath
= new Path(tmpPath
, BulkLoadHFilesTool
.TMP_DIR
);
458 FileSystem fs
= dir
.getFileSystem(util
.getConfiguration());
459 // HFiles have been splitted, there is TMP_DIR
460 assertTrue(fs
.exists(tmpPath
));
461 // TMP_DIR should have been cleaned-up
462 assertNull(BulkLoadHFilesTool
.TMP_DIR
+ " should be empty.",
463 CommonFSUtils
.listStatus(fs
, tmpPath
));
464 assertExpectedTable(util
.getConnection(), table
, ROWCOUNT
, 2);
468 * This simulates a remote exception which should cause the bulk load tool to exit with an exception.
470 @Test(expected
= IOException
.class)
471 public void testGroupOrSplitFailure() throws Exception
{
472 final TableName tableName
= TableName
.valueOf(name
.getMethodName());
473 setupTable(util
.getConnection(), tableName
, 10);
474 BulkLoadHFilesTool loader
= new BulkLoadHFilesTool(util
.getConfiguration()) {
479 protected Pair
<List
<LoadQueueItem
>, String
> groupOrSplit(AsyncClusterConnection conn
,
480 TableName tableName
, Multimap
<ByteBuffer
, LoadQueueItem
> regionGroups
, LoadQueueItem item
,
481 List
<Pair
<byte[], byte[]>> startEndKeys
) throws IOException
{
485 throw new IOException("failure");
487 return super.groupOrSplit(conn
, tableName
, regionGroups
, item
, startEndKeys
);
491 // create HFiles for different column families
492 Path dir
= buildBulkFiles(tableName
, 1);
493 loader
.bulkLoad(tableName
, dir
);
497 * We are testing a split after initial validation but before the atomic bulk load call.
498 * We cannot use presplitting to test this path, so we actually inject a
499 * split just before the atomic region load. However, we will pass a null item2RegionMap
500 * and that should not affect the bulk load behavior.
503 public void testSplitWhileBulkLoadPhaseWithoutItemMap() throws Exception
{
504 final TableName table
= TableName
.valueOf(name
.getMethodName());
505 setupTable(util
.getConnection(), table
, 10);
506 populateTable(util
.getConnection(), table
, 1);
507 assertExpectedTable(table
, ROWCOUNT
, 1);
509 // Now let's cause trouble. This will occur after checks and cause bulk
510 // files to fail when attempt to atomically import. This is recoverable.
511 final AtomicInteger attemptedCalls
= new AtomicInteger();
512 BulkLoadHFilesTool loader
= new BulkLoadHFilesTool(util
.getConfiguration()) {
515 protected void bulkLoadPhase(final AsyncClusterConnection conn
, final TableName tableName
,
516 final Deque
<LoadQueueItem
> queue
, final Multimap
<ByteBuffer
, LoadQueueItem
> regionGroups
,
517 final boolean copyFiles
,
518 final Map
<LoadQueueItem
, ByteBuffer
> item2RegionMap
) throws IOException
{
520 int i
= attemptedCalls
.incrementAndGet();
522 // On first attempt force a split.
526 // Passing item2RegionMap null
527 // In the absence of LoadQueueItem, bulk load should work as expected
528 super.bulkLoadPhase(conn
, tableName
, queue
, regionGroups
, copyFiles
, null);
533 // create HFiles for different column families
534 Path dir
= buildBulkFiles(table
, 2);
535 loader
.bulkLoad(table
, dir
);
537 // check that data was loaded
538 // The three expected attempts are 1) failure because need to split, 2)
539 // load of split top 3) load of split bottom
540 assertEquals(3, attemptedCalls
.get());
541 assertExpectedTable(table
, ROWCOUNT
, 2);
546 * Checks that all columns have the expected value and that there is the expected number of rows.
548 void assertExpectedTable(final Connection connection
, TableName table
, int count
, int value
)
550 TableDescriptor htd
= util
.getAdmin().getDescriptor(table
);
552 try (Table t
= connection
.getTable(table
); ResultScanner sr
= t
.getScanner(new Scan())) {
554 for (Result r
; (r
= sr
.next()) != null;) {
555 r
.getNoVersionMap().values().stream().flatMap(m
-> m
.values().stream())
556 .forEach(v
-> assertArrayEquals(value(value
), v
));
559 assertEquals(count
, i
);
560 } catch (IOException e
) {
561 fail("Failed due to exception");