HBASE-26921 Rewrite the counting cells part in TestMultiVersions (#4316)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / client / TestAsyncRegionAdminApi.java
blob6ff2d22db98af1d73b796de10506c295aed1f944
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static org.hamcrest.CoreMatchers.instanceOf;
21 import static org.hamcrest.MatcherAssert.assertThat;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertFalse;
24 import static org.junit.Assert.assertNotEquals;
25 import static org.junit.Assert.assertTrue;
26 import static org.junit.Assert.fail;
28 import java.io.IOException;
29 import java.util.ArrayList;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.concurrent.CompletableFuture;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.stream.Collectors;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.hbase.HBaseClassTestRule;
38 import org.apache.hadoop.hbase.ServerName;
39 import org.apache.hadoop.hbase.TableName;
40 import org.apache.hadoop.hbase.TableNotFoundException;
41 import org.apache.hadoop.hbase.master.HMaster;
42 import org.apache.hadoop.hbase.master.RegionState;
43 import org.apache.hadoop.hbase.master.ServerManager;
44 import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
45 import org.apache.hadoop.hbase.master.assignment.RegionStates;
46 import org.apache.hadoop.hbase.regionserver.HRegionServer;
47 import org.apache.hadoop.hbase.regionserver.Region;
48 import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
49 import org.apache.hadoop.hbase.testclassification.ClientTests;
50 import org.apache.hadoop.hbase.testclassification.LargeTests;
51 import org.apache.hadoop.hbase.util.Bytes;
52 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
53 import org.apache.hadoop.hbase.util.JVMClusterUtil;
54 import org.apache.hadoop.hbase.util.Threads;
55 import org.junit.ClassRule;
56 import org.junit.Test;
57 import org.junit.experimental.categories.Category;
58 import org.junit.runner.RunWith;
59 import org.junit.runners.Parameterized;
61 /**
62 * Class to test asynchronous region admin operations.
 * @see TestAsyncRegionAdminApi2 This test and that one used to be a single class, but together
 *      they took longer than our ten minute timeout, so they were split.
66 @RunWith(Parameterized.class)
67 @Category({ LargeTests.class, ClientTests.class })
68 public class TestAsyncRegionAdminApi extends TestAsyncAdminBase {
// JUnit class-level rule bound to this test class.
// NOTE(review): what HBaseClassTestRule enforces is not visible here - confirm against that class.
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
  HBaseClassTestRule.forClass(TestAsyncRegionAdminApi.class);
73 @Test
74 public void testAssignRegionAndUnassignRegion() throws Exception {
75 createTableWithDefaultConf(tableName);
77 // assign region.
78 HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
79 AssignmentManager am = master.getAssignmentManager();
80 RegionInfo hri = am.getRegionStates().getRegionsOfTable(tableName).get(0);
82 // assert region on server
83 RegionStates regionStates = am.getRegionStates();
84 ServerName serverName = regionStates.getRegionServerOfRegion(hri);
85 TEST_UTIL.assertRegionOnServer(hri, serverName, 200);
86 assertTrue(regionStates.getRegionState(hri).isOpened());
88 // Region is assigned now. Let's assign it again.
89 // Master should not abort, and region should stay assigned.
90 try {
91 admin.assign(hri.getRegionName()).get();
92 fail("Should fail when assigning an already onlined region");
93 } catch (ExecutionException e) {
94 // Expected
95 assertThat(e.getCause(), instanceOf(DoNotRetryRegionException.class));
97 assertFalse(am.getRegionStates().getRegionStateNode(hri).isInTransition());
98 assertTrue(regionStates.getRegionState(hri).isOpened());
100 // unassign region
101 admin.unassign(hri.getRegionName(), true).get();
102 assertFalse(am.getRegionStates().getRegionStateNode(hri).isInTransition());
103 assertTrue(regionStates.getRegionState(hri).isClosed());
106 RegionInfo createTableAndGetOneRegion(final TableName tableName)
107 throws IOException, InterruptedException, ExecutionException {
108 TableDescriptor desc =
109 TableDescriptorBuilder.newBuilder(tableName)
110 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
111 admin.createTable(desc, Bytes.toBytes("A"), Bytes.toBytes("Z"), 5).get();
113 // wait till the table is assigned
114 HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
115 long timeoutTime = EnvironmentEdgeManager.currentTime() + 3000;
116 while (true) {
117 List<RegionInfo> regions =
118 master.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName);
119 if (regions.size() > 3) {
120 return regions.get(2);
122 long now = EnvironmentEdgeManager.currentTime();
123 if (now > timeoutTime) {
124 fail("Could not find an online region");
126 Thread.sleep(10);
130 @Test
131 public void testGetRegionByStateOfTable() throws Exception {
132 RegionInfo hri = createTableAndGetOneRegion(tableName);
134 RegionStates regionStates =
135 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
136 assertTrue(regionStates.getRegionByStateOfTable(tableName).get(RegionState.State.OPEN)
137 .stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0));
138 assertFalse(regionStates.getRegionByStateOfTable(TableName.valueOf("I_am_the_phantom"))
139 .get(RegionState.State.OPEN).stream().anyMatch(r -> RegionInfo.COMPARATOR.compare(r, hri) == 0));
142 @Test
143 public void testMoveRegion() throws Exception {
144 admin.balancerSwitch(false).join();
146 RegionInfo hri = createTableAndGetOneRegion(tableName);
147 RawAsyncHBaseAdmin rawAdmin = (RawAsyncHBaseAdmin) ASYNC_CONN.getAdmin();
148 ServerName serverName = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName();
150 HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
151 ServerManager serverManager = master.getServerManager();
152 ServerName destServerName = null;
153 List<JVMClusterUtil.RegionServerThread> regionServers =
154 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads();
155 for (JVMClusterUtil.RegionServerThread regionServer : regionServers) {
156 HRegionServer destServer = regionServer.getRegionServer();
157 destServerName = destServer.getServerName();
158 if (!destServerName.equals(serverName) && serverManager.isServerOnline(destServerName)) {
159 break;
163 assertTrue(destServerName != null && !destServerName.equals(serverName));
164 admin.move(hri.getRegionName(), destServerName).get();
166 long timeoutTime = EnvironmentEdgeManager.currentTime() + 30000;
167 while (true) {
168 ServerName sn = rawAdmin.getRegionLocation(hri.getRegionName()).get().getServerName();
169 if (sn != null && sn.equals(destServerName)) {
170 break;
172 long now = EnvironmentEdgeManager.currentTime();
173 if (now > timeoutTime) {
174 fail("Failed to move the region in time: " + hri);
176 Thread.sleep(100);
178 admin.balancerSwitch(true).join();
181 @Test
182 public void testGetOnlineRegions() throws Exception {
183 createTableAndGetOneRegion(tableName);
184 AtomicInteger regionServerCount = new AtomicInteger(0);
185 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
186 .map(rsThread -> rsThread.getRegionServer()).forEach(rs -> {
187 ServerName serverName = rs.getServerName();
188 try {
189 assertEquals(admin.getRegions(serverName).get().size(), rs.getRegions().size());
190 } catch (Exception e) {
191 fail("admin.getOnlineRegions() method throws a exception: " + e.getMessage());
193 regionServerCount.incrementAndGet();
195 assertEquals(TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().size(),
196 regionServerCount.get());
199 @Test
200 public void testFlushTableAndRegion() throws Exception {
201 RegionInfo hri = createTableAndGetOneRegion(tableName);
202 ServerName serverName =
203 TEST_UTIL.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
204 .getRegionServerOfRegion(hri);
205 HRegionServer regionServer =
206 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
207 .map(rsThread -> rsThread.getRegionServer())
208 .filter(rs -> rs.getServerName().equals(serverName)).findFirst().get();
210 // write a put into the specific region
211 ASYNC_CONN.getTable(tableName)
212 .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-1")))
213 .join();
214 assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0);
215 // flush region and wait flush operation finished.
216 LOG.info("flushing region: " + Bytes.toStringBinary(hri.getRegionName()));
217 admin.flushRegion(hri.getRegionName()).get();
218 LOG.info("blocking until flush is complete: " + Bytes.toStringBinary(hri.getRegionName()));
219 Threads.sleepWithoutInterrupt(500);
220 while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) {
221 Threads.sleep(50);
223 // check the memstore.
224 assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0);
226 // write another put into the specific region
227 ASYNC_CONN.getTable(tableName)
228 .put(new Put(hri.getStartKey()).addColumn(FAMILY, FAMILY_0, Bytes.toBytes("value-2")))
229 .join();
230 assertTrue(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0);
231 admin.flush(tableName).get();
232 Threads.sleepWithoutInterrupt(500);
233 while (regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize() > 0) {
234 Threads.sleep(50);
236 // check the memstore.
237 assertEquals(regionServer.getOnlineRegion(hri.getRegionName()).getMemStoreDataSize(), 0);
240 private void waitUntilMobCompactionFinished(TableName tableName)
241 throws ExecutionException, InterruptedException {
242 long finished = EnvironmentEdgeManager.currentTime() + 60000;
243 CompactionState state = admin.getCompactionState(tableName, CompactType.MOB).get();
244 while (EnvironmentEdgeManager.currentTime() < finished) {
245 if (state == CompactionState.NONE) {
246 break;
248 Thread.sleep(10);
249 state = admin.getCompactionState(tableName, CompactType.MOB).get();
251 assertEquals(CompactionState.NONE, state);
254 @Test
255 public void testCompactMob() throws Exception {
256 ColumnFamilyDescriptor columnDescriptor =
257 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("mob"))
258 .setMobEnabled(true).setMobThreshold(0).build();
260 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
261 .setColumnFamily(columnDescriptor).build();
263 admin.createTable(tableDescriptor).get();
265 byte[][] families = { Bytes.toBytes("mob") };
266 loadData(tableName, families, 3000, 8);
268 admin.majorCompact(tableName).get();
270 CompactionState state = admin.getCompactionState(tableName).get();
271 assertNotEquals(CompactionState.NONE, state);
273 waitUntilMobCompactionFinished(tableName);
276 @Test
277 public void testCompactRegionServer() throws Exception {
278 byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3") };
279 createTableWithDefaultConf(tableName, null, families);
280 loadData(tableName, families, 3000, 8);
282 List<HRegionServer> rsList =
283 TEST_UTIL.getHBaseCluster().getLiveRegionServerThreads().stream()
284 .map(rsThread -> rsThread.getRegionServer()).collect(Collectors.toList());
285 List<Region> regions = new ArrayList<>();
286 rsList.forEach(rs -> regions.addAll(rs.getRegions(tableName)));
287 assertEquals(1, regions.size());
288 int countBefore = countStoreFilesInFamilies(regions, families);
289 assertTrue(countBefore > 0);
291 // Minor compaction for all region servers.
292 for (HRegionServer rs : rsList)
293 admin.compactRegionServer(rs.getServerName()).get();
294 Thread.sleep(5000);
295 int countAfterMinorCompaction = countStoreFilesInFamilies(regions, families);
296 assertTrue(countAfterMinorCompaction < countBefore);
298 // Major compaction for all region servers.
299 for (HRegionServer rs : rsList)
300 admin.majorCompactRegionServer(rs.getServerName()).get();
301 Thread.sleep(5000);
302 int countAfterMajorCompaction = countStoreFilesInFamilies(regions, families);
303 assertEquals(3, countAfterMajorCompaction);
306 @Test
307 public void testCompactionSwitchStates() throws Exception {
308 // Create a table with regions
309 byte[] family = Bytes.toBytes("family");
310 byte[][] families = {family, Bytes.add(family, Bytes.toBytes("2")),
311 Bytes.add(family, Bytes.toBytes("3"))};
312 createTableWithDefaultConf(tableName, null, families);
313 loadData(tableName, families, 3000, 8);
314 List<Region> regions = new ArrayList<>();
315 TEST_UTIL
316 .getHBaseCluster()
317 .getLiveRegionServerThreads()
318 .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName)));
319 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture =
320 admin.compactionSwitch(true, new ArrayList<>());
321 Map<ServerName, Boolean> pairs = listCompletableFuture.get();
322 for (Map.Entry<ServerName, Boolean> p : pairs.entrySet()) {
323 assertEquals("Default compaction state, expected=enabled actual=disabled",
324 true, p.getValue());
326 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture1 =
327 admin.compactionSwitch(false, new ArrayList<>());
328 Map<ServerName, Boolean> pairs1 = listCompletableFuture1.get();
329 for (Map.Entry<ServerName, Boolean> p : pairs1.entrySet()) {
330 assertEquals("Last compaction state, expected=enabled actual=disabled",
331 true, p.getValue());
333 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture2 =
334 admin.compactionSwitch(true, new ArrayList<>());
335 Map<ServerName, Boolean> pairs2 = listCompletableFuture2.get();
336 for (Map.Entry<ServerName, Boolean> p : pairs2.entrySet()) {
337 assertEquals("Last compaction state, expected=disabled actual=enabled",
338 false, p.getValue());
340 ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0)
341 .getServerName();
342 List<String> serverNameList = new ArrayList<String>();
343 serverNameList.add(serverName.getServerName());
344 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture3 =
345 admin.compactionSwitch(false, serverNameList);
346 Map<ServerName, Boolean> pairs3 = listCompletableFuture3.get();
347 assertEquals(pairs3.entrySet().size(), 1);
348 for (Map.Entry<ServerName, Boolean> p : pairs3.entrySet()) {
349 assertEquals("Last compaction state, expected=enabled actual=disabled",
350 true, p.getValue());
352 CompletableFuture<Map<ServerName, Boolean>> listCompletableFuture4 =
353 admin.compactionSwitch(true, serverNameList);
354 Map<ServerName, Boolean> pairs4 = listCompletableFuture4.get();
355 assertEquals(pairs4.entrySet().size(), 1);
356 for (Map.Entry<ServerName, Boolean> p : pairs4.entrySet()) {
357 assertEquals("Last compaction state, expected=disabled actual=enabled",
358 false, p.getValue());
362 @Test
363 public void testCompact() throws Exception {
364 compactionTest(TableName.valueOf("testCompact1"), 15, CompactionState.MINOR, false);
365 compactionTest(TableName.valueOf("testCompact2"), 15, CompactionState.MINOR, true);
367 // For major compaction, set up a higher hbase.hstore.compaction.min to avoid
368 // minor compactions. It is a hack to avoid random delays introduced by Admins's
369 // updateConfiguration() method.
370 TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> {
371 Configuration conf = thread.getRegionServer().getConfiguration();
372 conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 25);
375 compactionTest(TableName.valueOf("testCompact3"), 8, CompactionState.MAJOR, false);
376 compactionTest(TableName.valueOf("testCompact4"), 8, CompactionState.MAJOR, true);
378 // Restore to default
379 TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> {
380 Configuration conf = thread.getRegionServer().getConfiguration();
381 conf.unset(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY);
385 private void compactionTest(final TableName tableName, final int flushes,
386 final CompactionState expectedState, boolean singleFamily) throws Exception {
387 // Create a table with regions
388 byte[] family = Bytes.toBytes("family");
389 byte[][] families =
390 { family, Bytes.add(family, Bytes.toBytes("2")), Bytes.add(family, Bytes.toBytes("3")) };
391 createTableWithDefaultConf(tableName, null, families);
393 byte[][] singleFamilyArray = { family };
395 // When singleFamily is true, only load data for the family being tested. This is to avoid
396 // the case that while major compaction is going on for the family, minor compaction could
397 // happen for other families at the same time (Two compaction threads long/short), thus
398 // pollute the compaction and store file numbers for the region.
399 if (singleFamily) {
400 loadData(tableName, singleFamilyArray, 3000, flushes);
401 } else {
402 loadData(tableName, families, 3000, flushes);
405 List<Region> regions = new ArrayList<>();
406 TEST_UTIL
407 .getHBaseCluster()
408 .getLiveRegionServerThreads()
409 .forEach(rsThread -> regions.addAll(rsThread.getRegionServer().getRegions(tableName)));
410 assertEquals(1, regions.size());
412 int countBefore = countStoreFilesInFamilies(regions, families);
413 int countBeforeSingleFamily = countStoreFilesInFamily(regions, family);
414 assertTrue(countBefore > 0); // there should be some data files
415 if (expectedState == CompactionState.MINOR) {
416 if (singleFamily) {
417 admin.compact(tableName, family).get();
418 } else {
419 admin.compact(tableName).get();
421 } else {
422 if (singleFamily) {
423 admin.majorCompact(tableName, family).get();
424 } else {
425 admin.majorCompact(tableName).get();
429 long curt = EnvironmentEdgeManager.currentTime();
430 long waitTime = 10000;
431 long endt = curt + waitTime;
432 CompactionState state = admin.getCompactionState(tableName).get();
433 while (state == CompactionState.NONE && curt < endt) {
434 Thread.sleep(1);
435 state = admin.getCompactionState(tableName).get();
436 curt = EnvironmentEdgeManager.currentTime();
438 // Now, should have the right compaction state,
439 // otherwise, the compaction should have already been done
440 if (expectedState != state) {
441 for (Region region : regions) {
442 state = CompactionState.valueOf(region.getCompactionState().toString());
443 assertEquals(CompactionState.NONE, state);
445 } else {
446 // Wait until the compaction is done
447 state = admin.getCompactionState(tableName).get();
448 while (state != CompactionState.NONE && curt < endt) {
449 Thread.sleep(10);
450 state = admin.getCompactionState(tableName).get();
452 // Now, compaction should be done.
453 assertEquals(CompactionState.NONE, state);
456 int countAfter = countStoreFilesInFamilies(regions, families);
457 int countAfterSingleFamily = countStoreFilesInFamily(regions, family);
458 assertTrue(countAfter < countBefore);
459 if (!singleFamily) {
460 if (expectedState == CompactionState.MAJOR) {
461 assertEquals(families.length, countAfter);
462 } else {
463 assertTrue(families.length <= countAfter);
465 } else {
466 int singleFamDiff = countBeforeSingleFamily - countAfterSingleFamily;
467 // assert only change was to single column family
468 assertEquals(singleFamDiff, (countBefore - countAfter));
469 if (expectedState == CompactionState.MAJOR) {
470 assertEquals(1, countAfterSingleFamily);
471 } else {
472 assertTrue("" + countAfterSingleFamily, 1 <= countAfterSingleFamily);
477 @Test
478 public void testNonExistentTableCompaction() {
479 testNonExistentTableCompaction(CompactionState.MINOR);
480 testNonExistentTableCompaction(CompactionState.MAJOR);
483 private void testNonExistentTableCompaction(CompactionState compactionState) {
484 try {
485 if (compactionState == CompactionState.MINOR) {
486 admin.compact(TableName.valueOf("NonExistentTable")).get();
487 } else {
488 admin.majorCompact(TableName.valueOf("NonExistentTable")).get();
490 fail("Expected TableNotFoundException when table doesn't exist");
491 } catch (Exception e) {
492 // expected.
493 assertTrue(e.getCause() instanceof TableNotFoundException);
497 private static int countStoreFilesInFamily(List<Region> regions, final byte[] family) {
498 return countStoreFilesInFamilies(regions, new byte[][] { family });
501 private static int countStoreFilesInFamilies(List<Region> regions, final byte[][] families) {
502 int count = 0;
503 for (Region region : regions) {
504 count += region.getStoreFileList(families).size();
506 return count;
509 static void loadData(final TableName tableName, final byte[][] families, final int rows)
510 throws IOException {
511 loadData(tableName, families, rows, 1);
514 static void loadData(final TableName tableName, final byte[][] families, final int rows,
515 final int flushes) throws IOException {
516 AsyncTable<?> table = ASYNC_CONN.getTable(tableName);
517 List<Put> puts = new ArrayList<>(rows);
518 byte[] qualifier = Bytes.toBytes("val");
519 for (int i = 0; i < flushes; i++) {
520 for (int k = 0; k < rows; k++) {
521 byte[] row = Bytes.add(Bytes.toBytes(k), Bytes.toBytes(i));
522 Put p = new Put(row);
523 for (int j = 0; j < families.length; ++j) {
524 p.addColumn(families[j], qualifier, row);
526 puts.add(p);
528 table.putAll(puts).join();
529 TEST_UTIL.flush();
530 puts.clear();