/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.tool;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Test cases for the atomic load error handling of the bulk load functionality.
 */
@Category({ MiscTests.class, LargeTests.class })
public class TestBulkLoadHFilesSplitRecovery {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBulkLoadHFilesSplitRecovery.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionServerBulkLoad.class);

  static HBaseTestingUtil util;
  // used by secure subclass
  static boolean useSecure = false;
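
  // Test data layout: NUM_CFS column families, each bulk load directory holding one HFile per
  // family with ROWCOUNT rows; rowkey(), family() and value() below generate the deterministic
  // row keys, family names and cell values used throughout.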
  final static int NUM_CFS = 10;
  final static byte[] QUAL = Bytes.toBytes("qual");
  final static int ROWCOUNT = 100;

  private final static byte[][] families = new byte[NUM_CFS][];

  @Rule
  public TestName name = new TestName();
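
  // Pre-compute the family name byte arrays once for all tests.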
  static {
    for (int i = 0; i < NUM_CFS; i++) {
      families[i] = Bytes.toBytes(family(i));
    }
  }
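
  // Deterministic generators for row keys, family names and cell values, all derived from an
  // integer index.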
  static byte[] rowkey(int i) {
    return Bytes.toBytes(String.format("row_%08d", i));
  }

  static String family(int i) {
    return String.format("family_%04d", i);
  }

  static byte[] value(int i) {
    return Bytes.toBytes(String.format("%010d", i));
  }
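
  /**
   * Writes one HFile per column family under {@code dir}; every cell in every file carries the
   * same {@code value}, so a later scan can tell which bulk load produced the visible data.
   */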
  public static void buildHFiles(FileSystem fs, Path dir, int value) throws IOException {
    byte[] val = value(value);
    for (int i = 0; i < NUM_CFS; i++) {
      Path testIn = new Path(dir, family(i));

      TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
        Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
    }
  }
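
  /**
   * Builds a table descriptor for {@code name} with {@code cfs} column families named by
   * {@link #family(int)}.
   */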
  private TableDescriptor createTableDesc(TableName name, int cfs) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
    IntStream.range(0, cfs).mapToObj(i -> ColumnFamilyDescriptorBuilder.of(family(i)))
      .forEachOrdered(builder::setColumnFamily);
    return builder.build();
  }

  /**
   * Creates a table with the given table name and the specified number of column families if the
   * table does not already exist.
   */
  private void setupTable(final Connection connection, TableName table, int cfs)
    throws IOException {
    try {
      LOG.info("Creating table " + table);
      try (Admin admin = connection.getAdmin()) {
        admin.createTable(createTableDesc(table, cfs));
      }
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }

  /**
   * Creates a table with the given table name, the specified number of column families and the
   * given split keys if the table does not already exist.
   * @param table the table to create
   * @param cfs number of column families
   * @param SPLIT_KEYS region split keys
   */
  private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS)
    throws IOException {
    try {
      LOG.info("Creating table " + table);
      util.createTable(createTableDesc(table, cfs), SPLIT_KEYS);
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }
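
  /**
   * Creates a bulk load directory named {@code <table><value>} on the test filesystem and fills
   * it with one HFile per column family via {@link #buildHFiles(FileSystem, Path, int)}.
   */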
  private Path buildBulkFiles(TableName table, int value) throws Exception {
    Path dir = util.getDataTestDirOnTestFS(table.getNameAsString());
    Path bulk1 = new Path(dir, table.getNameAsString() + value);
    FileSystem fs = util.getTestFileSystem();
    buildHFiles(fs, bulk1, value);
    return bulk1;
  }

  /**
   * Populate table with known values.
   */
  private void populateTable(final Connection connection, TableName table, int value)
    throws Exception {
    // create HFiles for different column families
    Path dir = buildBulkFiles(table, value);
    BulkLoadHFiles.create(util.getConfiguration()).bulkLoad(table, dir);
  }

  /**
   * Split the known table in half. (This is hard-coded for this test suite.)
   */
  private void forceSplit(TableName table) {
    try {
      // We would need to go through the region server to make the split synchronous, but that
      // API isn't visible, so poll until the split completes instead.
      HRegionServer hrs = util.getRSForFirstRegionInTable(table);

      for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
        if (hri.getTable().equals(table)) {
          util.getAdmin().splitRegionAsync(hri.getRegionName(), rowkey(ROWCOUNT / 2));
          // ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2));
        }
      }

      // verify that the split completed.
      int regions;
      do {
        regions = 0;
        for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
          if (hri.getTable().equals(table)) {
            regions++;
          }
        }
        if (regions != 2) {
          LOG.info("Taking some time to complete split...");
          Thread.sleep(250);
        }
      } while (regions != 2);
    } catch (IOException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  @BeforeClass
  public static void setupCluster() throws Exception {
    util = new HBaseTestingUtil();
    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    util.startMiniCluster(1);
  }

  @AfterClass
  public static void teardownCluster() throws Exception {
    util.shutdownMiniCluster();
  }

  /**
   * Checks that all columns have the expected value and that there is the expected number of rows.
   * @throws IOException
   */
  void assertExpectedTable(TableName table, int count, int value) throws IOException {
    TableDescriptor htd = util.getAdmin().getDescriptor(table);
    assertNotNull(htd);
    try (Table t = util.getConnection().getTable(table);
      ResultScanner sr = t.getScanner(new Scan())) {
      int i = 0;
      for (Result r; (r = sr.next()) != null;) {
        r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
          .forEach(v -> assertArrayEquals(value(value), v));
        i++;
      }
      assertEquals(count, i);
    } catch (IOException e) {
      fail("Failed due to exception");
    }
  }
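
  /**
   * Returns a CompletableFuture that has already completed exceptionally with {@code error}.
   */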
  private static <T> CompletableFuture<T> failedFuture(Throwable error) {
    CompletableFuture<T> future = new CompletableFuture<>();
    future.completeExceptionally(error);
    return future;
  }
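
  /**
   * Wraps the connection in a Mockito spy whose {@code bulkLoad} calls always fail with an
   * injected IOException.
   */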
  private static AsyncClusterConnection mockAndInjectError(AsyncClusterConnection conn) {
    AsyncClusterConnection errConn = spy(conn);
    doReturn(failedFuture(new IOException("injecting bulk load error"))).when(errConn)
      .bulkLoad(any(), anyList(), any(), anyBoolean(), any(), any(), anyBoolean(), anyList(),
        anyBoolean());
    return errConn;
  }

  /**
   * Test that shows that an exception thrown from the RS side will result in an exception on the
   * bulk load client.
   */
  @Test(expected = IOException.class)
  public void testBulkLoadPhaseFailure() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    final AtomicInteger attemptedCalls = new AtomicInteger();
    Configuration conf = new Configuration(util.getConfiguration());
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf) {

      @Override
      protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
        Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
        boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
        AsyncClusterConnection c =
          attemptedCalls.incrementAndGet() == 1 ? mockAndInjectError(conn) : conn;
        super.bulkLoadPhase(c, tableName, queue, regionGroups, copyFiles, item2RegionMap);
      }
    };
    Path dir = buildBulkFiles(table, 1);
    loader.bulkLoad(table, dir);
  }

  /**
   * Test that shows that an exception thrown from the RS side will result in the expected number
   * of retries set by {@link HConstants#HBASE_CLIENT_RETRIES_NUMBER} when
   * {@link BulkLoadHFiles#RETRY_ON_IO_EXCEPTION} is set.
   */
  @Test
  public void testRetryOnIOException() throws Exception {
    TableName table = TableName.valueOf(name.getMethodName());
    AtomicInteger calls = new AtomicInteger(0);
    setupTable(util.getConnection(), table, 10);
    Configuration conf = new Configuration(util.getConfiguration());
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    conf.setBoolean(BulkLoadHFiles.RETRY_ON_IO_EXCEPTION, true);
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf) {

      @Override
      protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
        Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
        boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
        if (calls.get() < conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
          HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
          calls.incrementAndGet();
          super.bulkLoadPhase(mockAndInjectError(conn), tableName, queue, regionGroups, copyFiles,
            item2RegionMap);
        } else {
          super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap);
        }
      }
    };
    Path dir = buildBulkFiles(table, 1);
    loader.bulkLoad(table, dir);
    assertEquals(2, calls.get());
  }

  /**
   * This test exercises the path where there is a split after initial validation but before the
   * atomic bulk load call. We cannot use presplitting to test this path, so we actually inject a
   * split just before the atomic region load.
   */
  @Test
  public void testSplitWhileBulkLoadPhase() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    setupTable(util.getConnection(), table, 10);
    populateTable(util.getConnection(), table, 1);
    assertExpectedTable(table, ROWCOUNT, 1);

    // Now let's cause trouble. This will occur after the checks and cause the bulk files to fail
    // when we attempt to atomically import them. This is recoverable.
    final AtomicInteger attemptedCalls = new AtomicInteger();
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {

      @Override
      protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
        Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
        boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
        int i = attemptedCalls.incrementAndGet();
        if (i == 1) {
          // On first attempt force a split.
          forceSplit(table);
        }
        super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap);
      }
    };

    // create HFiles for different column families
    Path dir = buildBulkFiles(table, 2);
    loader.bulkLoad(table, dir);

    // check that data was loaded
    // The three expected attempts are 1) the failure because the region needs to split, 2) the
    // load of the split top half, and 3) the load of the split bottom half.
    assertEquals(3, attemptedCalls.get());
    assertExpectedTable(table, ROWCOUNT, 2);
  }

  /**
   * This test splits a table and attempts to bulk load. The bulk import files should be split
   * before atomically importing.
   */
  @Test
  public void testGroupOrSplitPresplit() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    setupTable(util.getConnection(), table, 10);
    populateTable(util.getConnection(), table, 1);
    assertExpectedTable(util.getConnection(), table, ROWCOUNT, 1);
    forceSplit(table);

    final AtomicInteger countedLqis = new AtomicInteger();
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {

      @Override
      protected Pair<List<LoadQueueItem>, String> groupOrSplit(AsyncClusterConnection conn,
        TableName tableName, Multimap<ByteBuffer, LoadQueueItem> regionGroups, LoadQueueItem item,
        List<Pair<byte[], byte[]>> startEndKeys) throws IOException {
        Pair<List<LoadQueueItem>, String> lqis =
          super.groupOrSplit(conn, tableName, regionGroups, item, startEndKeys);
        if (lqis != null && lqis.getFirst() != null) {
          countedLqis.addAndGet(lqis.getFirst().size());
        }
        return lqis;
      }
    };

    // create HFiles for different column families
    Path dir = buildBulkFiles(table, 2);
    loader.bulkLoad(table, dir);
    assertExpectedTable(util.getConnection(), table, ROWCOUNT, 2);
    assertEquals(20, countedLqis.get());
  }
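
  /**
   * Loads into a pre-split table and counts the bulk load RPCs issued; after HBASE-25281 the
   * HFiles are grouped on the correct split points, so 4 calls suffice where 8 were needed before.
   */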
  @Test
  public void testCorrectSplitPoint() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
      Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"),
      Bytes.toBytes("row_00000050"), Bytes.toBytes("row_00000060"),
      Bytes.toBytes("row_00000070") };
    setupTableWithSplitkeys(table, NUM_CFS, SPLIT_KEYS);

    final AtomicInteger bulkloadRpcTimes = new AtomicInteger();
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {

      @Override
      protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
        Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
        boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
        bulkloadRpcTimes.addAndGet(1);
        super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap);
      }
    };

    Path dir = buildBulkFiles(table, 1);
    loader.bulkLoad(table, dir);
    // before HBASE-25281 we needed to invoke the bulk load RPC 8 times
    assertEquals(4, bulkloadRpcTimes.get());
  }

  /**
   * This test creates a table with many small regions. The bulk load files would be split multiple
   * times before all of them can be loaded successfully.
   */
  @Test
  public void testSplitTmpFileCleanUp() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
      Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"),
      Bytes.toBytes("row_00000050") };
    setupTableWithSplitkeys(table, 10, SPLIT_KEYS);

    BulkLoadHFiles loader = BulkLoadHFiles.create(util.getConfiguration());

    // create HFiles
    Path dir = buildBulkFiles(table, 2);
    loader.bulkLoad(table, dir);
    // family path
    Path tmpPath = new Path(dir, family(0));
    // TMP_DIR under family path
    tmpPath = new Path(tmpPath, BulkLoadHFilesTool.TMP_DIR);
    FileSystem fs = dir.getFileSystem(util.getConfiguration());
    // HFiles have been split, so TMP_DIR exists
    assertTrue(fs.exists(tmpPath));
    // TMP_DIR should have been cleaned up
    assertNull(BulkLoadHFilesTool.TMP_DIR + " should be empty.",
      CommonFSUtils.listStatus(fs, tmpPath));
    assertExpectedTable(util.getConnection(), table, ROWCOUNT, 2);
  }

  /**
   * This simulates a remote exception which should cause the bulk load client to exit with an
   * exception.
   */
  @Test(expected = IOException.class)
  public void testGroupOrSplitFailure() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    setupTable(util.getConnection(), tableName, 10);
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {

      private int i = 0;

      @Override
      protected Pair<List<LoadQueueItem>, String> groupOrSplit(AsyncClusterConnection conn,
        TableName tableName, Multimap<ByteBuffer, LoadQueueItem> regionGroups, LoadQueueItem item,
        List<Pair<byte[], byte[]>> startEndKeys) throws IOException {
        i++;

        if (i == 5) {
          throw new IOException("failure");
        }
        return super.groupOrSplit(conn, tableName, regionGroups, item, startEndKeys);
      }
    };

    // create HFiles for different column families
    Path dir = buildBulkFiles(tableName, 1);
    loader.bulkLoad(tableName, dir);
  }

  /**
   * We are testing a split after initial validation but before the atomic bulk load call. We
   * cannot use presplitting to test this path, so we actually inject a split just before the
   * atomic region load. However, we pass a null item2RegionMap, and that should not affect the
   * bulk load behavior.
   */
  @Test
  public void testSplitWhileBulkLoadPhaseWithoutItemMap() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    setupTable(util.getConnection(), table, 10);
    populateTable(util.getConnection(), table, 1);
    assertExpectedTable(table, ROWCOUNT, 1);

    // Now let's cause trouble. This will occur after the checks and cause the bulk files to fail
    // when we attempt to atomically import them. This is recoverable.
    final AtomicInteger attemptedCalls = new AtomicInteger();
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {

      @Override
      protected void bulkLoadPhase(final AsyncClusterConnection conn, final TableName tableName,
        final Deque<LoadQueueItem> queue, final Multimap<ByteBuffer, LoadQueueItem> regionGroups,
        final boolean copyFiles,
        final Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
        int i = attemptedCalls.incrementAndGet();
        if (i == 1) {
          // On first attempt force a split.
          forceSplit(table);
        }

        // Pass a null item2RegionMap: in the absence of the LoadQueueItem mapping, bulk load
        // should still work as expected.
        super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, null);
      }
    };

    // create HFiles for different column families
    Path dir = buildBulkFiles(table, 2);
    loader.bulkLoad(table, dir);

    // check that data was loaded
    // The three expected attempts are 1) the failure because the region needs to split, 2) the
    // load of the split top half, and 3) the load of the split bottom half.
    assertEquals(3, attemptedCalls.get());
    assertExpectedTable(table, ROWCOUNT, 2);
  }

  /**
   * Checks that all columns have the expected value and that there is the expected number of rows.
   */
  void assertExpectedTable(final Connection connection, TableName table, int count, int value)
    throws IOException {
    TableDescriptor htd = util.getAdmin().getDescriptor(table);
    assertNotNull(htd);
    try (Table t = connection.getTable(table); ResultScanner sr = t.getScanner(new Scan())) {
      int i = 0;
      for (Result r; (r = sr.next()) != null;) {
        r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
          .forEach(v -> assertArrayEquals(value(value), v));
        i++;
      }
      assertEquals(count, i);
    } catch (IOException e) {
      fail("Failed due to exception");
    }
  }
}