HBASE-24033 Add ut for loading the corrupt recovered hfiles (#1322)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / client / TestAsyncRegionAdminApi.java
blobfe79a65dbee51cea1424aa33f67f9989634df06f
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static org.hamcrest.CoreMatchers.instanceOf;
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertNotEquals;
24 import static org.junit.Assert.assertThat;
25 import static org.junit.Assert.assertTrue;
26 import static org.junit.Assert.fail;
28 import java.io.IOException;
29 import java.util.ArrayList;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.concurrent.CompletableFuture;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.stream.Collectors;
36 import org.apache.hadoop.hbase.HBaseClassTestRule;
37 import org.apache.hadoop.hbase.ServerName;
38 import org.apache.hadoop.hbase.TableName;
39 import org.apache.hadoop.hbase.TableNotFoundException;
40 import org.apache.hadoop.hbase.master.HMaster;
41 import org.apache.hadoop.hbase.master.RegionState;
42 import org.apache.hadoop.hbase.master.ServerManager;
43 import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
44 import org.apache.hadoop.hbase.master.assignment.RegionStates;
45 import org.apache.hadoop.hbase.regionserver.HRegionServer;
46 import org.apache.hadoop.hbase.regionserver.Region;
47 import org.apache.hadoop.hbase.testclassification.ClientTests;
48 import org.apache.hadoop.hbase.testclassification.LargeTests;
49 import org.apache.hadoop.hbase.util.Bytes;
50 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
51 import org.apache.hadoop.hbase.util.JVMClusterUtil;
52 import org.apache.hadoop.hbase.util.Threads;
53 import org.junit.ClassRule;
54 import org.junit.Test;
55 import org.junit.experimental.categories.Category;
56 import org.junit.runner.RunWith;
57 import org.junit.runners.Parameterized;
59 /**
60 * Class to test asynchronous region admin operations.
61 * @see TestAsyncRegionAdminApi2 This test and it used to be joined it was taking longer than our
62 * ten minute timeout so they were split.
64 @RunWith(Parameterized.class)
65 @Category({ LargeTests.class, ClientTests.class })
66 public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
67 @ClassRule
68 public static final HBaseClassTestRule CLASS_RULE =
69 HBaseClassTestRule.forClass(TestAsyncRegionAdminApi.class);
71 @Test
72 public void testAssignRegionAndUnassignRegion() throws Exception {
73 createTableWithDefaultConf(tableName);
75 // assign region.
76 HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
77 AssignmentManager am = master.getAssignmentManager();
78 RegionInfo hri = am.getRegionStates().getRegionsOfTable(tableName).get(0);
80 // assert region on server
81 RegionStates regionStates = am.getRegionStates();
82 ServerName serverName = regionStates.getRegionServerOfRegion(hri);
83 TEST_UTIL.assertRegionOnServer(hri, serverName, 200);
84 assertTrue(regionStates.getRegionState(hri).isOpened());
86 // Region is assigned now. Let's assign it again.
87 // Master should not abort, and region should stay assigned.
88 try {
89 admin.assign(hri.getRegionName()).get();
90 fail("Should fail when assigning an already onlined region");
91 } catch (ExecutionException e) {
92 // Expected
93 assertThat(e.getCause(), instanceOf(DoNotRetryRegionException.class));
95 assertFalse(am.getRegionStates().getRegionStateNode(hri).isInTransition());
96 assertTrue(regionStates.getRegionState(hri).isOpened());
98 // unassign region
99 admin.unassign(hri.getRegionName(), true).get();
100 assertFalse(am.getRegionStates().getRegionStateNode(hri).isInTransition());
101 assertTrue(regionStates.getRegionState(hri).isClosed());
104 RegionInfo createTableAndGetOneRegion(final TableName tableName)
105 throws IOException, InterruptedException, ExecutionException {
106 TableDescriptor desc =
107 TableDescriptorBuilder.newBuilder(tableName)
108 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
109 admin.createTable(desc, Bytes.toBytes("A"), Bytes.toBytes("Z"), 5).get();
111 // wait till the table is assigned
112 HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
113 long timeoutTime = System.currentTimeMillis() + 3000;
114 while (true) {
115 List<RegionInfo> regions =
116 master.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName);
117 if (regions.size() > 3) {
118 return regions.get(2);
120 long now = System.currentTimeMillis();
121 if (now > timeoutTime) {
122 fail("Could not find an online region");
124 Thread.sleep(10);
128 @Test
129 public void testGetRegionByStateOfTable() throws Exception {
130 RegionInfo hri = createTableAndGetOneRegion(tableName);
132 RegionStates regionStates =
133 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
134 assertTrue(regionStates.getRegionByStateOfTable(tableName).get(RegionState.State.OPEN)
135 .stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0));
136 assertFalse(regionStates.getRegionByStateOfTable(TableName.valueOf("I_am_the_phantom"))
137 .get(RegionState.State.OPEN).stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0));
140 @Test
141 public void testMoveRegion() throws Exception {
142 admin.balancerSwitch(false).join();
144 RegionInfo hri = createTableAndGetOneRegion(tableName);
145 RawAsyncHBaseAdmin rawAdmin = (RawAsyncHBaseAdmin) ASYNC_CONN.getAdmin();
146 ServerName serverName = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName();
148 HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
149 ServerManager serverManager = master.getServerManager();
150 ServerName destServerName = null;
151 List<JVMClusterUtil.RegionServerThread> regionServers =
152 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads();
153 for (JVMClusterUtil.RegionServerThread regionServer : regionServers) {
154 HRegionServer destServer = regionServer.getRegionServer();
155 destServerName = destServer.getServerName();
156 if (!destServerName.equals(serverName) && serverManager.isServerOnline(destServerName)) {
157 break;
161 assertTrue(destServerName != null && !destServerName.equals(serverName));
162 admin.move(hri.getRegionName(), destServerName).get();
164 long timeoutTime = System.currentTimeMillis() + 30000;
165 while (true) {
166 ServerName sn = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName();
167 if (sn != null && sn.equals(destServerName)) {
168 break;
170 long now = System.currentTimeMillis();
171 if (now > timeoutTime) {
172 fail("Failed to move the region in time: " + hri);
174 Thread.sleep(100);
176 admin.balancerSwitch(true).join();
179 @Test
180 public void testGetOnlineRegions() throws Exception {
181 createTableAndGetOneRegion(tableName);
182 AtomicInteger regionServerCount = new AtomicInteger(0);
183 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
184 .map(rsThread -> rsThread.getRegionServer()).forEach(rs -> {
185 ServerName serverName = rs.getServerName();
186 try {
187 assertEquals(admin.getRegions(serverName).get().size(), rs.getRegions().size());
188 } catch (Exception e) {
189 fail("admin.getOnlineRegions() method throws a exception: " + e.getMessage());
191 regionServerCount.incrementAndGet();
193 assertEquals(TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().size(),
194 regionServerCount.get());
197 @Test
198 public void testFlushTableAndRegion() throws Exception {
199 RegionInfo hri = createTableAndGetOneRegion(tableName);
200 ServerName serverName =
201 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
202 .getRegionServerOfRegion(hri);
203 HRegionServer regionServer =
204 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
205 .map(rsThread -> rsThread.getRegionServer())
206 .filter(rs -> rs.getServerName().equals(serverName)).findFirst().get();
208 // write a put into the specific region
209 ASYNC_CONN.getTable(tableName)
210 .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-1")))
211 .join();
212 assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0);
213 // flush region and wait flush operation finished.
214 LOG.info("flushing region: " + Bytes.toStringBinary(hri.getRegionName()));
215 admin.flushRegion(hri.getRegionName()).get();
216 LOG.info("blocking until flush is complete: " + Bytes.toStringBinary(hri.getRegionName()));
217 Threads.sleepWithoutInterrupt(500);
218 while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) {
219 Threads.sleep(50);
221 // check the memstore.
222 assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0);
224 // write another put into the specific region
225 ASYNC_CONN.getTable(tableName)
226 .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-2")))
227 .join();
228 assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0);
229 admin.flush(tableName).get();
230 Threads.sleepWithoutInterrupt(500);
231 while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) {
232 Threads.sleep(50);
234 // check the memstore.
235 assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0);
238 private void waitUntilMobCompactionFinished(TableName tableName)
239 throws ExecutionException, InterruptedException {
240 long finished = EnvironmentEdgeManager.currentTime() + 60000;
241 CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get();
242 while (EnvironmentEdgeManager.currentTime() < finished) {
243 if (state == CompactionState.NONE) {
244 break;
246 Thread.sleep(10);
247 state = admin.getCompactionState(tableName, CompactType.MOB).get();
249 assertEquals(CompactionState.NONE, state);
252 @Test
253 public void testCompactMob() throws Exception {
254 ColumnFamilyDescriptor columnDescriptor =
255 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("mob"))
256 .setMobEnabled(true).setMobThreshold(0).build();
258 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
259 .setColumnFamily(columnDescriptor).build();
261 admin.createTable(tableDescriptor).get();
263 byte[][] families = { Bytes.toBytes("mob") };
264 loadData(tableName, families, 3000, 8);
266 admin.majorCompact(tableName).get();
268 CompactionState state = admin.getCompactionState(tableName).get();
269 assertNotEquals(CompactionState.NONE, state);
271 waitUntilMobCompactionFinished(tableName);
274 @Test
275 public void testCompactRegionServer() throws Exception {
276 byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") };
277 createTableWithDefaultConf(tableName, null, families);
278 loadData(tableName, families, 3000, 8);
280 List<HRegionServer> rsList =
281 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
282 .map(rsThread -> rsThread.getRegionServer()).collect(Collectors.toList());
283 List<Region> regions = new ArrayList<>();
284 rsList.forEach(rs -> regions.addAll(rs.getRegions(tableName)));
285 assertEquals(1, regions.size());
286 int countBefore = countStoreFilesInFamilies(regions, families);
287 assertTrue(countBefore > 0);
289 // Minor compaction for all region servers.
290 for (HRegionServer rs : rsList)
291 admin.compactRegionServer(rs.getServerName()).get();
292 Thread.sleep(5000);
293 int countAfterMinorCompaction = countStoreFilesInFamilies(regions, families);
294 assertTrue(countAfterMinorCompaction < countBefore);
296 // Major compaction for all region servers.
297 for (HRegionServer rs : rsList)
298 admin.majorCompactRegionServer(rs.getServerName()).get();
299 Thread.sleep(5000);
300 int countAfterMajorCompaction = countStoreFilesInFamilies(regions, families);
301 assertEquals(3, countAfterMajorCompaction);
304 @Test
305 public void testCompactionSwitchStates() throws Exception {
306 // Create a table with regions
307 byte[] family = Bytes.toBytes("family");
308 byte[][] families = {family, Bytes.add(family, Bytes.toBytes("2")),
309 Bytes.add(family, Bytes.toBytes("3"))};
310 createTableWithDefaultConf(tableName, null, families);
311 loadData(tableName, families, 3000, 8);
312 List<Region> regions = new ArrayList<>();
313 TEST_UTIL
314 .getHBaseCluster()
315 .getLiveRegionServerThreads()
316 .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName)));
317 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture =
318 admin.compactionSwitch(true, new ArrayList<>());
319 Map<ServerName, Boolean> pairs = listCompletableFuture.get();
320 for (Map.Entry<ServerName, Boolean> p : pairs.entrySet()) {
321 assertEquals("Default compaction state, expected=enabled actual=disabled",
322 true, p.getValue());
324 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture1 =
325 admin.compactionSwitch(false, new ArrayList<>());
326 Map<ServerName, Boolean> pairs1 = listCompletableFuture1.get();
327 for (Map.Entry<ServerName, Boolean> p : pairs1.entrySet()) {
328 assertEquals("Last compaction state, expected=enabled actual=disabled",
329 true, p.getValue());
331 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture2 =
332 admin.compactionSwitch(true, new ArrayList<>());
333 Map<ServerName, Boolean> pairs2 = listCompletableFuture2.get();
334 for (Map.Entry<ServerName, Boolean> p : pairs2.entrySet()) {
335 assertEquals("Last compaction state, expected=disabled actual=enabled",
336 false, p.getValue());
338 ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0)
339 .getServerName();
340 List<String> serverNameList = new ArrayList<String>();
341 serverNameList.add(serverName.getServerName());
342 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture3 =
343 admin.compactionSwitch(false, serverNameList);
344 Map<ServerName, Boolean> pairs3 = listCompletableFuture3.get();
345 assertEquals(pairs3.entrySet().size(), 1);
346 for (Map.Entry<ServerName, Boolean> p : pairs3.entrySet()) {
347 assertEquals("Last compaction state, expected=enabled actual=disabled",
348 true, p.getValue());
350 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture4 =
351 admin.compactionSwitch(true, serverNameList);
352 Map<ServerName, Boolean> pairs4 = listCompletableFuture4.get();
353 assertEquals(pairs4.entrySet().size(), 1);
354 for (Map.Entry<ServerName, Boolean> p : pairs4.entrySet()) {
355 assertEquals("Last compaction state, expected=disabled actual=enabled",
356 false, p.getValue());
360 @Test
361 public void testCompact() throws Exception {
362 compactionTest(TableName.valueOf("testCompact1"), 8, CompactionState.MAJOR, false);
363 compactionTest(TableName.valueOf("testCompact2"), 15, CompactionState.MINOR, false);
364 compactionTest(TableName.valueOf("testCompact3"), 8, CompactionState.MAJOR, true);
365 compactionTest(TableName.valueOf("testCompact4"), 15, CompactionState.MINOR, true);
368 private void compactionTest(final TableName tableName, final int flushes,
369 final CompactionState expectedState, boolean singleFamily) throws Exception {
370 // Create a table with regions
371 byte[] family = Bytes.toBytes("family");
372 byte[][] families =
373 { family, Bytes.add(family, Bytes.toBytes("2")), Bytes.add(family, Bytes.toBytes("3")) };
374 createTableWithDefaultConf(tableName, null, families);
375 loadData(tableName, families, 3000, flushes);
377 List<Region> regions = new ArrayList<>();
378 TEST_UTIL
379 .getHBaseCluster()
380 .getLiveRegionServerThreads()
381 .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName)));
382 assertEquals(1, regions.size());
384 int countBefore = countStoreFilesInFamilies(regions, families);
385 int countBeforeSingleFamily = countStoreFilesInFamily(regions, family);
386 assertTrue(countBefore > 0); // there should be some data files
387 if (expectedState == CompactionState.MINOR) {
388 if (singleFamily) {
389 admin.compact(tableName, family).get();
390 } else {
391 admin.compact(tableName).get();
393 } else {
394 if (singleFamily) {
395 admin.majorCompact(tableName, family).get();
396 } else {
397 admin.majorCompact(tableName).get();
401 long curt = System.currentTimeMillis();
402 long waitTime = 5000;
403 long endt = curt + waitTime;
404 CompactionState state = admin.getCompactionState(tableName).get();
405 while (state == CompactionState.NONE && curt < endt) {
406 Thread.sleep(10);
407 state = admin.getCompactionState(tableName).get();
408 curt = System.currentTimeMillis();
410 // Now, should have the right compaction state,
411 // otherwise, the compaction should have already been done
412 if (expectedState != state) {
413 for (Region region : regions) {
414 state = CompactionState.valueOf(region.getCompactionState().toString());
415 assertEquals(CompactionState.NONE, state);
417 } else {
418 // Wait until the compaction is done
419 state = admin.getCompactionState(tableName).get();
420 while (state != CompactionState.NONE && curt < endt) {
421 Thread.sleep(10);
422 state = admin.getCompactionState(tableName).get();
424 // Now, compaction should be done.
425 assertEquals(CompactionState.NONE, state);
428 int countAfter = countStoreFilesInFamilies(regions, families);
429 int countAfterSingleFamily = countStoreFilesInFamily(regions, family);
430 assertTrue(countAfter < countBefore);
431 if (!singleFamily) {
432 if (expectedState == CompactionState.MAJOR) assertTrue(families.length == countAfter);
433 else assertTrue(families.length < countAfter);
434 } else {
435 int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily;
436 // assert only change was to single column family
437 assertTrue(singleFamDiff == (countBefore - countAfter));
438 if (expectedState == CompactionState.MAJOR) {
439 assertTrue(1 == countAfterSingleFamily);
440 } else {
441 assertTrue(1 < countAfterSingleFamily);
446 @Test
447 public void testNonExistentTableCompaction() {
448 testNonExistentTableCompaction(CompactionState.MINOR);
449 testNonExistentTableCompaction(CompactionState.MAJOR);
452 private void testNonExistentTableCompaction(CompactionState compactionState) {
453 try {
454 if (compactionState == CompactionState.MINOR) {
455 admin.compact(TableName.valueOf("NonExistentTable")).get();
456 } else {
457 admin.majorCompact(TableName.valueOf("NonExistentTable")).get();
459 fail("Expected TableNotFoundException when table doesn't exist");
460 } catch (Exception e) {
461 // expected.
462 assertTrue(e.getCause() instanceof TableNotFoundException);
466 private static int countStoreFilesInFamily(List<Region> regions, final byte[] family) {
467 return countStoreFilesInFamilies(regions, new byte[][] { family });
470 private static int countStoreFilesInFamilies(List<Region> regions, final byte[][] families) {
471 int count = 0;
472 for (Region region : regions) {
473 count += region.getStoreFileList(families).size();
475 return count;
478 static void loadData(final TableName tableName, final byte[][] families, final int rows)
479 throws IOException {
480 loadData(tableName, families, rows, 1);
483 static void loadData(final TableName tableName, final byte[][] families, final int rows,
484 final int flushes) throws IOException {
485 AsyncTable<?> table = ASYNC_CONN.getTable(tableName);
486 List<Put> puts = new ArrayList<>(rows);
487 byte[] qualifier = Bytes.toBytes("val");
488 for (int i = 0; i < flushes; i++) {
489 for (int k = 0; k < rows; k++) {
490 byte[] row = Bytes.add(Bytes.toBytes(k), Bytes.toBytes(i));
491 Put p = new Put(row);
492 for (int j = 0; j < families.length; ++j) {
493 p.addColumn(families[j], qualifier, row);
495 puts.add(p);
497 table.putAll(puts).join();
498 TEST_UTIL.flush();
499 puts.clear();