/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.hamcrest
.CoreMatchers
.instanceOf
;
21 import static org
.hamcrest
.MatcherAssert
.assertThat
;
22 import static org
.junit
.Assert
.assertEquals
;
23 import static org
.junit
.Assert
.assertFalse
;
24 import static org
.junit
.Assert
.assertNotEquals
;
25 import static org
.junit
.Assert
.assertTrue
;
26 import static org
.junit
.Assert
.fail
;
28 import java
.io
.IOException
;
29 import java
.util
.ArrayList
;
30 import java
.util
.List
;
32 import java
.util
.concurrent
.CompletableFuture
;
33 import java
.util
.concurrent
.ExecutionException
;
34 import java
.util
.concurrent
.atomic
.AtomicInteger
;
35 import java
.util
.stream
.Collectors
;
36 import org
.apache
.hadoop
.conf
.Configuration
;
37 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
38 import org
.apache
.hadoop
.hbase
.ServerName
;
39 import org
.apache
.hadoop
.hbase
.TableName
;
40 import org
.apache
.hadoop
.hbase
.TableNotFoundException
;
41 import org
.apache
.hadoop
.hbase
.master
.HMaster
;
42 import org
.apache
.hadoop
.hbase
.master
.RegionState
;
43 import org
.apache
.hadoop
.hbase
.master
.ServerManager
;
44 import org
.apache
.hadoop
.hbase
.master
.assignment
.AssignmentManager
;
45 import org
.apache
.hadoop
.hbase
.master
.assignment
.RegionStates
;
46 import org
.apache
.hadoop
.hbase
.regionserver
.HRegionServer
;
47 import org
.apache
.hadoop
.hbase
.regionserver
.Region
;
48 import org
.apache
.hadoop
.hbase
.regionserver
.compactions
.CompactionConfiguration
;
49 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
50 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
51 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
52 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
53 import org
.apache
.hadoop
.hbase
.util
.JVMClusterUtil
;
54 import org
.apache
.hadoop
.hbase
.util
.Threads
;
55 import org
.junit
.ClassRule
;
56 import org
.junit
.Test
;
57 import org
.junit
.experimental
.categories
.Category
;
58 import org
.junit
.runner
.RunWith
;
59 import org
.junit
.runners
.Parameterized
;
/**
 * Class to test asynchronous region admin operations.
 * @see TestAsyncRegionAdminApi2 This test and TestAsyncRegionAdminApi2 used to be a single class,
 *      but it was taking longer than our ten minute timeout, so the two were split.
 */
66 @RunWith(Parameterized
.class)
67 @Category({ LargeTests
.class, ClientTests
.class })
68 public class TestAsyncRegionAdminApi
extends TestAsyncAdminBase
{
70 public static final HBaseClassTestRule CLASS_RULE
=
71 HBaseClassTestRule
.forClass(TestAsyncRegionAdminApi
.class);
74 public void testAssignRegionAndUnassignRegion() throws Exception
{
75 createTableWithDefaultConf(tableName
);
78 HMaster master
= TEST_UTIL
.getHBaseCluster().getMaster();
79 AssignmentManager am
= master
.getAssignmentManager();
80 RegionInfo hri
= am
.getRegionStates().getRegionsOfTable(tableName
).get(0);
82 // assert region on server
83 RegionStates regionStates
= am
.getRegionStates();
84 ServerName serverName
= regionStates
.getRegionServerOfRegion(hri
);
85 TEST_UTIL
.assertRegionOnServer(hri
, serverName
, 200);
86 assertTrue(regionStates
.getRegionState(hri
).isOpened());
88 // Region is assigned now. Let's assign it again.
89 // Master should not abort, and region should stay assigned.
91 admin
.assign(hri
.getRegionName()).get();
92 fail("Should fail when assigning an already onlined region");
93 } catch (ExecutionException e
) {
95 assertThat(e
.getCause(), instanceOf(DoNotRetryRegionException
.class));
97 assertFalse(am
.getRegionStates().getRegionStateNode(hri
).isInTransition());
98 assertTrue(regionStates
.getRegionState(hri
).isOpened());
101 admin
.unassign(hri
.getRegionName(), true).get();
102 assertFalse(am
.getRegionStates().getRegionStateNode(hri
).isInTransition());
103 assertTrue(regionStates
.getRegionState(hri
).isClosed());
106 RegionInfo
createTableAndGetOneRegion(final TableName tableName
)
107 throws IOException
, InterruptedException
, ExecutionException
{
108 TableDescriptor desc
=
109 TableDescriptorBuilder
.newBuilder(tableName
)
110 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(FAMILY
)).build();
111 admin
.createTable(desc
, Bytes
.toBytes("A"), Bytes
.toBytes("Z"), 5).get();
113 // wait till the table is assigned
114 HMaster master
= TEST_UTIL
.getHBaseCluster().getMaster();
115 long timeoutTime
= EnvironmentEdgeManager
.currentTime() + 3000;
117 List
<RegionInfo
> regions
=
118 master
.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName
);
119 if (regions
.size() > 3) {
120 return regions
.get(2);
122 long now
= EnvironmentEdgeManager
.currentTime();
123 if (now
> timeoutTime
) {
124 fail("Could not find an online region");
131 public void testGetRegionByStateOfTable() throws Exception
{
132 RegionInfo hri
= createTableAndGetOneRegion(tableName
);
134 RegionStates regionStates
=
135 TEST_UTIL
.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
136 assertTrue(regionStates
.getRegionByStateOfTable(tableName
).get(RegionState
.State
.OPEN
)
137 .stream().anyMatch(r
-> RegionInfo
.COMPARATOR
.compare(r
, hri
) == 0));
138 assertFalse(regionStates
.getRegionByStateOfTable(TableName
.valueOf("I_am_the_phantom"))
139 .get(RegionState
.State
.OPEN
).stream().anyMatch(r
-> RegionInfo
.COMPARATOR
.compare(r
, hri
) == 0));
143 public void testMoveRegion() throws Exception
{
144 admin
.balancerSwitch(false).join();
146 RegionInfo hri
= createTableAndGetOneRegion(tableName
);
147 RawAsyncHBaseAdmin rawAdmin
= (RawAsyncHBaseAdmin
) ASYNC_CONN
.getAdmin();
148 ServerName serverName
= rawAdmin
.getRegionLocation(hri
.getRegionName()).get().getServerName();
150 HMaster master
= TEST_UTIL
.getHBaseCluster().getMaster();
151 ServerManager serverManager
= master
.getServerManager();
152 ServerName destServerName
= null;
153 List
<JVMClusterUtil
.RegionServerThread
> regionServers
=
154 TEST_UTIL
.getHBaseCluster().getLiveRegionServerThreads();
155 for (JVMClusterUtil
.RegionServerThread regionServer
: regionServers
) {
156 HRegionServer destServer
= regionServer
.getRegionServer();
157 destServerName
= destServer
.getServerName();
158 if (!destServerName
.equals(serverName
) && serverManager
.isServerOnline(destServerName
)) {
163 assertTrue(destServerName
!= null && !destServerName
.equals(serverName
));
164 admin
.move(hri
.getRegionName(), destServerName
).get();
166 long timeoutTime
= EnvironmentEdgeManager
.currentTime() + 30000;
168 ServerName sn
= rawAdmin
.getRegionLocation(hri
.getRegionName()).get().getServerName();
169 if (sn
!= null && sn
.equals(destServerName
)) {
172 long now
= EnvironmentEdgeManager
.currentTime();
173 if (now
> timeoutTime
) {
174 fail("Failed to move the region in time: " + hri
);
178 admin
.balancerSwitch(true).join();
182 public void testGetOnlineRegions() throws Exception
{
183 createTableAndGetOneRegion(tableName
);
184 AtomicInteger regionServerCount
= new AtomicInteger(0);
185 TEST_UTIL
.getHBaseCluster().getLiveRegionServerThreads().stream()
186 .map(rsThread
-> rsThread
.getRegionServer()).forEach(rs
-> {
187 ServerName serverName
= rs
.getServerName();
189 assertEquals(admin
.getRegions(serverName
).get().size(), rs
.getRegions().size());
190 } catch (Exception e
) {
191 fail("admin.getOnlineRegions() method throws a exception: " + e
.getMessage());
193 regionServerCount
.incrementAndGet();
195 assertEquals(TEST_UTIL
.getHBaseCluster().getLiveRegionServerThreads().size(),
196 regionServerCount
.get());
200 public void testFlushTableAndRegion() throws Exception
{
201 RegionInfo hri
= createTableAndGetOneRegion(tableName
);
202 ServerName serverName
=
203 TEST_UTIL
.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
204 .getRegionServerOfRegion(hri
);
205 HRegionServer regionServer
=
206 TEST_UTIL
.getHBaseCluster().getLiveRegionServerThreads().stream()
207 .map(rsThread
-> rsThread
.getRegionServer())
208 .filter(rs
-> rs
.getServerName().equals(serverName
)).findFirst().get();
210 // write a put into the specific region
211 ASYNC_CONN
.getTable(tableName
)
212 .put(new Put(hri
.getStartKey()).addColumn(FAMILY
, FAMILY_0
, Bytes
.toBytes("value-1")))
214 assertTrue(regionServer
.getOnlineRegion(hri
.getRegionName()).getMemStoreDataSize() > 0);
215 // flush region and wait flush operation finished.
216 LOG
.info("flushing region: " + Bytes
.toStringBinary(hri
.getRegionName()));
217 admin
.flushRegion(hri
.getRegionName()).get();
218 LOG
.info("blocking until flush is complete: " + Bytes
.toStringBinary(hri
.getRegionName()));
219 Threads
.sleepWithoutInterrupt(500);
220 while (regionServer
.getOnlineRegion(hri
.getRegionName()).getMemStoreDataSize() > 0) {
223 // check the memstore.
224 assertEquals(regionServer
.getOnlineRegion(hri
.getRegionName()).getMemStoreDataSize(), 0);
226 // write another put into the specific region
227 ASYNC_CONN
.getTable(tableName
)
228 .put(new Put(hri
.getStartKey()).addColumn(FAMILY
, FAMILY_0
, Bytes
.toBytes("value-2")))
230 assertTrue(regionServer
.getOnlineRegion(hri
.getRegionName()).getMemStoreDataSize() > 0);
231 admin
.flush(tableName
).get();
232 Threads
.sleepWithoutInterrupt(500);
233 while (regionServer
.getOnlineRegion(hri
.getRegionName()).getMemStoreDataSize() > 0) {
236 // check the memstore.
237 assertEquals(regionServer
.getOnlineRegion(hri
.getRegionName()).getMemStoreDataSize(), 0);
240 private void waitUntilMobCompactionFinished(TableName tableName
)
241 throws ExecutionException
, InterruptedException
{
242 long finished
= EnvironmentEdgeManager
.currentTime() + 60000;
243 CompactionState state
= admin
.getCompactionState(tableName
, CompactType
.MOB
).get();
244 while (EnvironmentEdgeManager
.currentTime() < finished
) {
245 if (state
== CompactionState
.NONE
) {
249 state
= admin
.getCompactionState(tableName
, CompactType
.MOB
).get();
251 assertEquals(CompactionState
.NONE
, state
);
255 public void testCompactMob() throws Exception
{
256 ColumnFamilyDescriptor columnDescriptor
=
257 ColumnFamilyDescriptorBuilder
.newBuilder(Bytes
.toBytes("mob"))
258 .setMobEnabled(true).setMobThreshold(0).build();
260 TableDescriptor tableDescriptor
= TableDescriptorBuilder
.newBuilder(tableName
)
261 .setColumnFamily(columnDescriptor
).build();
263 admin
.createTable(tableDescriptor
).get();
265 byte[][] families
= { Bytes
.toBytes("mob") };
266 loadData(tableName
, families
, 3000, 8);
268 admin
.majorCompact(tableName
).get();
270 CompactionState state
= admin
.getCompactionState(tableName
).get();
271 assertNotEquals(CompactionState
.NONE
, state
);
273 waitUntilMobCompactionFinished(tableName
);
277 public void testCompactRegionServer() throws Exception
{
278 byte[][] families
= { Bytes
.toBytes("f1"), Bytes
.toBytes("f2"), Bytes
.toBytes("f3") };
279 createTableWithDefaultConf(tableName
, null, families
);
280 loadData(tableName
, families
, 3000, 8);
282 List
<HRegionServer
> rsList
=
283 TEST_UTIL
.getHBaseCluster().getLiveRegionServerThreads().stream()
284 .map(rsThread
-> rsThread
.getRegionServer()).collect(Collectors
.toList());
285 List
<Region
> regions
= new ArrayList
<>();
286 rsList
.forEach(rs
-> regions
.addAll(rs
.getRegions(tableName
)));
287 assertEquals(1, regions
.size());
288 int countBefore
= countStoreFilesInFamilies(regions
, families
);
289 assertTrue(countBefore
> 0);
291 // Minor compaction for all region servers.
292 for (HRegionServer rs
: rsList
)
293 admin
.compactRegionServer(rs
.getServerName()).get();
295 int countAfterMinorCompaction
= countStoreFilesInFamilies(regions
, families
);
296 assertTrue(countAfterMinorCompaction
< countBefore
);
298 // Major compaction for all region servers.
299 for (HRegionServer rs
: rsList
)
300 admin
.majorCompactRegionServer(rs
.getServerName()).get();
302 int countAfterMajorCompaction
= countStoreFilesInFamilies(regions
, families
);
303 assertEquals(3, countAfterMajorCompaction
);
307 public void testCompactionSwitchStates() throws Exception
{
308 // Create a table with regions
309 byte[] family
= Bytes
.toBytes("family");
310 byte[][] families
= {family
, Bytes
.add(family
, Bytes
.toBytes("2")),
311 Bytes
.add(family
, Bytes
.toBytes("3"))};
312 createTableWithDefaultConf(tableName
, null, families
);
313 loadData(tableName
, families
, 3000, 8);
314 List
<Region
> regions
= new ArrayList
<>();
317 .getLiveRegionServerThreads()
318 .forEach(rsThread
-> regions
.addAll(rsThread
.getRegionServer().getRegions(tableName
)));
319 CompletableFuture
<Map
<ServerName
, Boolean
>> listCompletableFuture
=
320 admin
.compactionSwitch(true, new ArrayList
<>());
321 Map
<ServerName
, Boolean
> pairs
= listCompletableFuture
.get();
322 for (Map
.Entry
<ServerName
, Boolean
> p
: pairs
.entrySet()) {
323 assertEquals("Default compaction state, expected=enabled actual=disabled",
326 CompletableFuture
<Map
<ServerName
, Boolean
>> listCompletableFuture1
=
327 admin
.compactionSwitch(false, new ArrayList
<>());
328 Map
<ServerName
, Boolean
> pairs1
= listCompletableFuture1
.get();
329 for (Map
.Entry
<ServerName
, Boolean
> p
: pairs1
.entrySet()) {
330 assertEquals("Last compaction state, expected=enabled actual=disabled",
333 CompletableFuture
<Map
<ServerName
, Boolean
>> listCompletableFuture2
=
334 admin
.compactionSwitch(true, new ArrayList
<>());
335 Map
<ServerName
, Boolean
> pairs2
= listCompletableFuture2
.get();
336 for (Map
.Entry
<ServerName
, Boolean
> p
: pairs2
.entrySet()) {
337 assertEquals("Last compaction state, expected=disabled actual=enabled",
338 false, p
.getValue());
340 ServerName serverName
= TEST_UTIL
.getHBaseCluster().getRegionServer(0)
342 List
<String
> serverNameList
= new ArrayList
<String
>();
343 serverNameList
.add(serverName
.getServerName());
344 CompletableFuture
<Map
<ServerName
, Boolean
>> listCompletableFuture3
=
345 admin
.compactionSwitch(false, serverNameList
);
346 Map
<ServerName
, Boolean
> pairs3
= listCompletableFuture3
.get();
347 assertEquals(pairs3
.entrySet().size(), 1);
348 for (Map
.Entry
<ServerName
, Boolean
> p
: pairs3
.entrySet()) {
349 assertEquals("Last compaction state, expected=enabled actual=disabled",
352 CompletableFuture
<Map
<ServerName
, Boolean
>> listCompletableFuture4
=
353 admin
.compactionSwitch(true, serverNameList
);
354 Map
<ServerName
, Boolean
> pairs4
= listCompletableFuture4
.get();
355 assertEquals(pairs4
.entrySet().size(), 1);
356 for (Map
.Entry
<ServerName
, Boolean
> p
: pairs4
.entrySet()) {
357 assertEquals("Last compaction state, expected=disabled actual=enabled",
358 false, p
.getValue());
363 public void testCompact() throws Exception
{
364 compactionTest(TableName
.valueOf("testCompact1"), 15, CompactionState
.MINOR
, false);
365 compactionTest(TableName
.valueOf("testCompact2"), 15, CompactionState
.MINOR
, true);
367 // For major compaction, set up a higher hbase.hstore.compaction.min to avoid
368 // minor compactions. It is a hack to avoid random delays introduced by Admins's
369 // updateConfiguration() method.
370 TEST_UTIL
.getMiniHBaseCluster().getRegionServerThreads().forEach(thread
-> {
371 Configuration conf
= thread
.getRegionServer().getConfiguration();
372 conf
.setInt(CompactionConfiguration
.HBASE_HSTORE_COMPACTION_MIN_KEY
, 25);
375 compactionTest(TableName
.valueOf("testCompact3"), 8, CompactionState
.MAJOR
, false);
376 compactionTest(TableName
.valueOf("testCompact4"), 8, CompactionState
.MAJOR
, true);
378 // Restore to default
379 TEST_UTIL
.getMiniHBaseCluster().getRegionServerThreads().forEach(thread
-> {
380 Configuration conf
= thread
.getRegionServer().getConfiguration();
381 conf
.unset(CompactionConfiguration
.HBASE_HSTORE_COMPACTION_MIN_KEY
);
385 private void compactionTest(final TableName tableName
, final int flushes
,
386 final CompactionState expectedState
, boolean singleFamily
) throws Exception
{
387 // Create a table with regions
388 byte[] family
= Bytes
.toBytes("family");
390 { family
, Bytes
.add(family
, Bytes
.toBytes("2")), Bytes
.add(family
, Bytes
.toBytes("3")) };
391 createTableWithDefaultConf(tableName
, null, families
);
393 byte[][] singleFamilyArray
= { family
};
395 // When singleFamily is true, only load data for the family being tested. This is to avoid
396 // the case that while major compaction is going on for the family, minor compaction could
397 // happen for other families at the same time (Two compaction threads long/short), thus
398 // pollute the compaction and store file numbers for the region.
400 loadData(tableName
, singleFamilyArray
, 3000, flushes
);
402 loadData(tableName
, families
, 3000, flushes
);
405 List
<Region
> regions
= new ArrayList
<>();
408 .getLiveRegionServerThreads()
409 .forEach(rsThread
-> regions
.addAll(rsThread
.getRegionServer().getRegions(tableName
)));
410 assertEquals(1, regions
.size());
412 int countBefore
= countStoreFilesInFamilies(regions
, families
);
413 int countBeforeSingleFamily
= countStoreFilesInFamily(regions
, family
);
414 assertTrue(countBefore
> 0); // there should be some data files
415 if (expectedState
== CompactionState
.MINOR
) {
417 admin
.compact(tableName
, family
).get();
419 admin
.compact(tableName
).get();
423 admin
.majorCompact(tableName
, family
).get();
425 admin
.majorCompact(tableName
).get();
429 long curt
= EnvironmentEdgeManager
.currentTime();
430 long waitTime
= 10000;
431 long endt
= curt
+ waitTime
;
432 CompactionState state
= admin
.getCompactionState(tableName
).get();
433 while (state
== CompactionState
.NONE
&& curt
< endt
) {
435 state
= admin
.getCompactionState(tableName
).get();
436 curt
= EnvironmentEdgeManager
.currentTime();
438 // Now, should have the right compaction state,
439 // otherwise, the compaction should have already been done
440 if (expectedState
!= state
) {
441 for (Region region
: regions
) {
442 state
= CompactionState
.valueOf(region
.getCompactionState().toString());
443 assertEquals(CompactionState
.NONE
, state
);
446 // Wait until the compaction is done
447 state
= admin
.getCompactionState(tableName
).get();
448 while (state
!= CompactionState
.NONE
&& curt
< endt
) {
450 state
= admin
.getCompactionState(tableName
).get();
452 // Now, compaction should be done.
453 assertEquals(CompactionState
.NONE
, state
);
456 int countAfter
= countStoreFilesInFamilies(regions
, families
);
457 int countAfterSingleFamily
= countStoreFilesInFamily(regions
, family
);
458 assertTrue(countAfter
< countBefore
);
460 if (expectedState
== CompactionState
.MAJOR
) {
461 assertEquals(families
.length
, countAfter
);
463 assertTrue(families
.length
<= countAfter
);
466 int singleFamDiff
= countBeforeSingleFamily
- countAfterSingleFamily
;
467 // assert only change was to single column family
468 assertEquals(singleFamDiff
, (countBefore
- countAfter
));
469 if (expectedState
== CompactionState
.MAJOR
) {
470 assertEquals(1, countAfterSingleFamily
);
472 assertTrue("" + countAfterSingleFamily
, 1 <= countAfterSingleFamily
);
478 public void testNonExistentTableCompaction() {
479 testNonExistentTableCompaction(CompactionState
.MINOR
);
480 testNonExistentTableCompaction(CompactionState
.MAJOR
);
483 private void testNonExistentTableCompaction(CompactionState compactionState
) {
485 if (compactionState
== CompactionState
.MINOR
) {
486 admin
.compact(TableName
.valueOf("NonExistentTable")).get();
488 admin
.majorCompact(TableName
.valueOf("NonExistentTable")).get();
490 fail("Expected TableNotFoundException when table doesn't exist");
491 } catch (Exception e
) {
493 assertTrue(e
.getCause() instanceof TableNotFoundException
);
497 private static int countStoreFilesInFamily(List
<Region
> regions
, final byte[] family
) {
498 return countStoreFilesInFamilies(regions
, new byte[][] { family
});
501 private static int countStoreFilesInFamilies(List
<Region
> regions
, final byte[][] families
) {
503 for (Region region
: regions
) {
504 count
+= region
.getStoreFileList(families
).size();
509 static void loadData(final TableName tableName
, final byte[][] families
, final int rows
)
511 loadData(tableName
, families
, rows
, 1);
514 static void loadData(final TableName tableName
, final byte[][] families
, final int rows
,
515 final int flushes
) throws IOException
{
516 AsyncTable
<?
> table
= ASYNC_CONN
.getTable(tableName
);
517 List
<Put
> puts
= new ArrayList
<>(rows
);
518 byte[] qualifier
= Bytes
.toBytes("val");
519 for (int i
= 0; i
< flushes
; i
++) {
520 for (int k
= 0; k
< rows
; k
++) {
521 byte[] row
= Bytes
.add(Bytes
.toBytes(k
), Bytes
.toBytes(i
));
522 Put p
= new Put(row
);
523 for (int j
= 0; j
< families
.length
; ++j
) {
524 p
.addColumn(families
[j
], qualifier
, row
);
528 table
.putAll(puts
).join();