2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.hamcrest
.CoreMatchers
.instanceOf
;
21 import static org
.junit
.Assert
.assertEquals
;
22 import static org
.junit
.Assert
.assertFalse
;
23 import static org
.junit
.Assert
.assertNotEquals
;
24 import static org
.junit
.Assert
.assertThat
;
25 import static org
.junit
.Assert
.assertTrue
;
26 import static org
.junit
.Assert
.fail
;
28 import java
.io
.IOException
;
29 import java
.util
.ArrayList
;
30 import java
.util
.List
;
32 import java
.util
.concurrent
.CompletableFuture
;
33 import java
.util
.concurrent
.ExecutionException
;
34 import java
.util
.concurrent
.atomic
.AtomicInteger
;
35 import java
.util
.stream
.Collectors
;
36 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
37 import org
.apache
.hadoop
.hbase
.ServerName
;
38 import org
.apache
.hadoop
.hbase
.TableName
;
39 import org
.apache
.hadoop
.hbase
.TableNotFoundException
;
40 import org
.apache
.hadoop
.hbase
.master
.HMaster
;
41 import org
.apache
.hadoop
.hbase
.master
.RegionState
;
42 import org
.apache
.hadoop
.hbase
.master
.ServerManager
;
43 import org
.apache
.hadoop
.hbase
.master
.assignment
.AssignmentManager
;
44 import org
.apache
.hadoop
.hbase
.master
.assignment
.RegionStates
;
45 import org
.apache
.hadoop
.hbase
.regionserver
.HRegionServer
;
46 import org
.apache
.hadoop
.hbase
.regionserver
.Region
;
47 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
48 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
49 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
50 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
51 import org
.apache
.hadoop
.hbase
.util
.JVMClusterUtil
;
52 import org
.apache
.hadoop
.hbase
.util
.Threads
;
53 import org
.junit
.ClassRule
;
54 import org
.junit
.Test
;
55 import org
.junit
.experimental
.categories
.Category
;
56 import org
.junit
.runner
.RunWith
;
57 import org
.junit
.runners
.Parameterized
;
60 * Class to test asynchronous region admin operations.
61 * @see TestAsyncRegionAdminApi2 This test and it used to be joined it was taking longer than our
62 * ten minute timeout so they were split.
64 @RunWith(Parameterized
.class)
65 @Category({ LargeTests
.class, ClientTests
.class })
66 public class TestAsyncRegionAdminApi
extends TestAsyncAdminBase
{
68 public static final HBaseClassTestRule CLASS_RULE
=
69 HBaseClassTestRule
.forClass(TestAsyncRegionAdminApi
.class);
72 public void testAssignRegionAndUnassignRegion() throws Exception
{
73 createTableWithDefaultConf(tableName
);
76 HMaster master
= TEST_UTIL
.getHBaseCluster().getMaster();
77 AssignmentManager am
= master
.getAssignmentManager();
78 RegionInfo hri
= am
.getRegionStates().getRegionsOfTable(tableName
).get(0);
80 // assert region on server
81 RegionStates regionStates
= am
.getRegionStates();
82 ServerName serverName
= regionStates
.getRegionServerOfRegion(hri
);
83 TEST_UTIL
.assertRegionOnServer(hri
, serverName
, 200);
84 assertTrue(regionStates
.getRegionState(hri
).isOpened());
86 // Region is assigned now. Let's assign it again.
87 // Master should not abort, and region should stay assigned.
89 admin
.assign(hri
.getRegionName()).get();
90 fail("Should fail when assigning an already onlined region");
91 } catch (ExecutionException e
) {
93 assertThat(e
.getCause(), instanceOf(DoNotRetryRegionException
.class));
95 assertFalse(am
.getRegionStates().getRegionStateNode(hri
).isInTransition());
96 assertTrue(regionStates
.getRegionState(hri
).isOpened());
99 admin
.unassign(hri
.getRegionName(), true).get();
100 assertFalse(am
.getRegionStates().getRegionStateNode(hri
).isInTransition());
101 assertTrue(regionStates
.getRegionState(hri
).isClosed());
104 RegionInfo
createTableAndGetOneRegion(final TableName tableName
)
105 throws IOException
, InterruptedException
, ExecutionException
{
106 TableDescriptor desc
=
107 TableDescriptorBuilder
.newBuilder(tableName
)
108 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(FAMILY
)).build();
109 admin
.createTable(desc
, Bytes
.toBytes("A"), Bytes
.toBytes("Z"), 5).get();
111 // wait till the table is assigned
112 HMaster master
= TEST_UTIL
.getHBaseCluster().getMaster();
113 long timeoutTime
= System
.currentTimeMillis() + 3000;
115 List
<RegionInfo
> regions
=
116 master
.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName
);
117 if (regions
.size() > 3) {
118 return regions
.get(2);
120 long now
= System
.currentTimeMillis();
121 if (now
> timeoutTime
) {
122 fail("Could not find an online region");
129 public void testGetRegionByStateOfTable() throws Exception
{
130 RegionInfo hri
= createTableAndGetOneRegion(tableName
);
132 RegionStates regionStates
=
133 TEST_UTIL
.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
134 assertTrue(regionStates
.getRegionByStateOfTable(tableName
).get(RegionState
.State
.OPEN
)
135 .stream().anyMatch(r
-> RegionInfo
.COMPARATOR
.compare(r
, hri
) == 0));
136 assertFalse(regionStates
.getRegionByStateOfTable(TableName
.valueOf("I_am_the_phantom"))
137 .get(RegionState
.State
.OPEN
).stream().anyMatch(r
-> RegionInfo
.COMPARATOR
.compare(r
, hri
) == 0));
141 public void testMoveRegion() throws Exception
{
142 admin
.balancerSwitch(false).join();
144 RegionInfo hri
= createTableAndGetOneRegion(tableName
);
145 RawAsyncHBaseAdmin rawAdmin
= (RawAsyncHBaseAdmin
) ASYNC_CONN
.getAdmin();
146 ServerName serverName
= rawAdmin
.getRegionLocation(hri
.getRegionName()).get().getServerName();
148 HMaster master
= TEST_UTIL
.getHBaseCluster().getMaster();
149 ServerManager serverManager
= master
.getServerManager();
150 ServerName destServerName
= null;
151 List
<JVMClusterUtil
.RegionServerThread
> regionServers
=
152 TEST_UTIL
.getHBaseCluster().getLiveRegionServerThreads();
153 for (JVMClusterUtil
.RegionServerThread regionServer
: regionServers
) {
154 HRegionServer destServer
= regionServer
.getRegionServer();
155 destServerName
= destServer
.getServerName();
156 if (!destServerName
.equals(serverName
) && serverManager
.isServerOnline(destServerName
)) {
161 assertTrue(destServerName
!= null && !destServerName
.equals(serverName
));
162 admin
.move(hri
.getRegionName(), destServerName
).get();
164 long timeoutTime
= System
.currentTimeMillis() + 30000;
166 ServerName sn
= rawAdmin
.getRegionLocation(hri
.getRegionName()).get().getServerName();
167 if (sn
!= null && sn
.equals(destServerName
)) {
170 long now
= System
.currentTimeMillis();
171 if (now
> timeoutTime
) {
172 fail("Failed to move the region in time: " + hri
);
176 admin
.balancerSwitch(true).join();
180 public void testGetOnlineRegions() throws Exception
{
181 createTableAndGetOneRegion(tableName
);
182 AtomicInteger regionServerCount
= new AtomicInteger(0);
183 TEST_UTIL
.getHBaseCluster().getLiveRegionServerThreads().stream()
184 .map(rsThread
-> rsThread
.getRegionServer()).forEach(rs
-> {
185 ServerName serverName
= rs
.getServerName();
187 assertEquals(admin
.getRegions(serverName
).get().size(), rs
.getRegions().size());
188 } catch (Exception e
) {
189 fail("admin.getOnlineRegions() method throws a exception: " + e
.getMessage());
191 regionServerCount
.incrementAndGet();
193 assertEquals(TEST_UTIL
.getHBaseCluster().getLiveRegionServerThreads().size(),
194 regionServerCount
.get());
198 public void testFlushTableAndRegion() throws Exception
{
199 RegionInfo hri
= createTableAndGetOneRegion(tableName
);
200 ServerName serverName
=
201 TEST_UTIL
.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
202 .getRegionServerOfRegion(hri
);
203 HRegionServer regionServer
=
204 TEST_UTIL
.getHBaseCluster().getLiveRegionServerThreads().stream()
205 .map(rsThread
-> rsThread
.getRegionServer())
206 .filter(rs
-> rs
.getServerName().equals(serverName
)).findFirst().get();
208 // write a put into the specific region
209 ASYNC_CONN
.getTable(tableName
)
210 .put(new Put(hri
.getStartKey()).addColumn(FAMILY
, FAMILY_0
, Bytes
.toBytes("value-1")))
212 assertTrue(regionServer
.getOnlineRegion(hri
.getRegionName()).getMemStoreDataSize() > 0);
213 // flush region and wait flush operation finished.
214 LOG
.info("flushing region: " + Bytes
.toStringBinary(hri
.getRegionName()));
215 admin
.flushRegion(hri
.getRegionName()).get();
216 LOG
.info("blocking until flush is complete: " + Bytes
.toStringBinary(hri
.getRegionName()));
217 Threads
.sleepWithoutInterrupt(500);
218 while (regionServer
.getOnlineRegion(hri
.getRegionName()).getMemStoreDataSize() > 0) {
221 // check the memstore.
222 assertEquals(regionServer
.getOnlineRegion(hri
.getRegionName()).getMemStoreDataSize(), 0);
224 // write another put into the specific region
225 ASYNC_CONN
.getTable(tableName
)
226 .put(new Put(hri
.getStartKey()).addColumn(FAMILY
, FAMILY_0
, Bytes
.toBytes("value-2")))
228 assertTrue(regionServer
.getOnlineRegion(hri
.getRegionName()).getMemStoreDataSize() > 0);
229 admin
.flush(tableName
).get();
230 Threads
.sleepWithoutInterrupt(500);
231 while (regionServer
.getOnlineRegion(hri
.getRegionName()).getMemStoreDataSize() > 0) {
234 // check the memstore.
235 assertEquals(regionServer
.getOnlineRegion(hri
.getRegionName()).getMemStoreDataSize(), 0);
238 private void waitUntilMobCompactionFinished(TableName tableName
)
239 throws ExecutionException
, InterruptedException
{
240 long finished
= EnvironmentEdgeManager
.currentTime() + 60000;
241 CompactionState state
= admin
.getCompactionState(tableName
, CompactType
.MOB
).get();
242 while (EnvironmentEdgeManager
.currentTime() < finished
) {
243 if (state
== CompactionState
.NONE
) {
247 state
= admin
.getCompactionState(tableName
, CompactType
.MOB
).get();
249 assertEquals(CompactionState
.NONE
, state
);
253 public void testCompactMob() throws Exception
{
254 ColumnFamilyDescriptor columnDescriptor
=
255 ColumnFamilyDescriptorBuilder
.newBuilder(Bytes
.toBytes("mob"))
256 .setMobEnabled(true).setMobThreshold(0).build();
258 TableDescriptor tableDescriptor
= TableDescriptorBuilder
.newBuilder(tableName
)
259 .setColumnFamily(columnDescriptor
).build();
261 admin
.createTable(tableDescriptor
).get();
263 byte[][] families
= { Bytes
.toBytes("mob") };
264 loadData(tableName
, families
, 3000, 8);
266 admin
.majorCompact(tableName
).get();
268 CompactionState state
= admin
.getCompactionState(tableName
).get();
269 assertNotEquals(CompactionState
.NONE
, state
);
271 waitUntilMobCompactionFinished(tableName
);
275 public void testCompactRegionServer() throws Exception
{
276 byte[][] families
= { Bytes
.toBytes("f1"), Bytes
.toBytes("f2"), Bytes
.toBytes("f3") };
277 createTableWithDefaultConf(tableName
, null, families
);
278 loadData(tableName
, families
, 3000, 8);
280 List
<HRegionServer
> rsList
=
281 TEST_UTIL
.getHBaseCluster().getLiveRegionServerThreads().stream()
282 .map(rsThread
-> rsThread
.getRegionServer()).collect(Collectors
.toList());
283 List
<Region
> regions
= new ArrayList
<>();
284 rsList
.forEach(rs
-> regions
.addAll(rs
.getRegions(tableName
)));
285 assertEquals(1, regions
.size());
286 int countBefore
= countStoreFilesInFamilies(regions
, families
);
287 assertTrue(countBefore
> 0);
289 // Minor compaction for all region servers.
290 for (HRegionServer rs
: rsList
)
291 admin
.compactRegionServer(rs
.getServerName()).get();
293 int countAfterMinorCompaction
= countStoreFilesInFamilies(regions
, families
);
294 assertTrue(countAfterMinorCompaction
< countBefore
);
296 // Major compaction for all region servers.
297 for (HRegionServer rs
: rsList
)
298 admin
.majorCompactRegionServer(rs
.getServerName()).get();
300 int countAfterMajorCompaction
= countStoreFilesInFamilies(regions
, families
);
301 assertEquals(3, countAfterMajorCompaction
);
305 public void testCompactionSwitchStates() throws Exception
{
306 // Create a table with regions
307 byte[] family
= Bytes
.toBytes("family");
308 byte[][] families
= {family
, Bytes
.add(family
, Bytes
.toBytes("2")),
309 Bytes
.add(family
, Bytes
.toBytes("3"))};
310 createTableWithDefaultConf(tableName
, null, families
);
311 loadData(tableName
, families
, 3000, 8);
312 List
<Region
> regions
= new ArrayList
<>();
315 .getLiveRegionServerThreads()
316 .forEach(rsThread
-> regions
.addAll(rsThread
.getRegionServer().getRegions(tableName
)));
317 CompletableFuture
<Map
<ServerName
, Boolean
>> listCompletableFuture
=
318 admin
.compactionSwitch(true, new ArrayList
<>());
319 Map
<ServerName
, Boolean
> pairs
= listCompletableFuture
.get();
320 for (Map
.Entry
<ServerName
, Boolean
> p
: pairs
.entrySet()) {
321 assertEquals("Default compaction state, expected=enabled actual=disabled",
324 CompletableFuture
<Map
<ServerName
, Boolean
>> listCompletableFuture1
=
325 admin
.compactionSwitch(false, new ArrayList
<>());
326 Map
<ServerName
, Boolean
> pairs1
= listCompletableFuture1
.get();
327 for (Map
.Entry
<ServerName
, Boolean
> p
: pairs1
.entrySet()) {
328 assertEquals("Last compaction state, expected=enabled actual=disabled",
331 CompletableFuture
<Map
<ServerName
, Boolean
>> listCompletableFuture2
=
332 admin
.compactionSwitch(true, new ArrayList
<>());
333 Map
<ServerName
, Boolean
> pairs2
= listCompletableFuture2
.get();
334 for (Map
.Entry
<ServerName
, Boolean
> p
: pairs2
.entrySet()) {
335 assertEquals("Last compaction state, expected=disabled actual=enabled",
336 false, p
.getValue());
338 ServerName serverName
= TEST_UTIL
.getHBaseCluster().getRegionServer(0)
340 List
<String
> serverNameList
= new ArrayList
<String
>();
341 serverNameList
.add(serverName
.getServerName());
342 CompletableFuture
<Map
<ServerName
, Boolean
>> listCompletableFuture3
=
343 admin
.compactionSwitch(false, serverNameList
);
344 Map
<ServerName
, Boolean
> pairs3
= listCompletableFuture3
.get();
345 assertEquals(pairs3
.entrySet().size(), 1);
346 for (Map
.Entry
<ServerName
, Boolean
> p
: pairs3
.entrySet()) {
347 assertEquals("Last compaction state, expected=enabled actual=disabled",
350 CompletableFuture
<Map
<ServerName
, Boolean
>> listCompletableFuture4
=
351 admin
.compactionSwitch(true, serverNameList
);
352 Map
<ServerName
, Boolean
> pairs4
= listCompletableFuture4
.get();
353 assertEquals(pairs4
.entrySet().size(), 1);
354 for (Map
.Entry
<ServerName
, Boolean
> p
: pairs4
.entrySet()) {
355 assertEquals("Last compaction state, expected=disabled actual=enabled",
356 false, p
.getValue());
361 public void testCompact() throws Exception
{
362 compactionTest(TableName
.valueOf("testCompact1"), 8, CompactionState
.MAJOR
, false);
363 compactionTest(TableName
.valueOf("testCompact2"), 15, CompactionState
.MINOR
, false);
364 compactionTest(TableName
.valueOf("testCompact3"), 8, CompactionState
.MAJOR
, true);
365 compactionTest(TableName
.valueOf("testCompact4"), 15, CompactionState
.MINOR
, true);
368 private void compactionTest(final TableName tableName
, final int flushes
,
369 final CompactionState expectedState
, boolean singleFamily
) throws Exception
{
370 // Create a table with regions
371 byte[] family
= Bytes
.toBytes("family");
373 { family
, Bytes
.add(family
, Bytes
.toBytes("2")), Bytes
.add(family
, Bytes
.toBytes("3")) };
374 createTableWithDefaultConf(tableName
, null, families
);
375 loadData(tableName
, families
, 3000, flushes
);
377 List
<Region
> regions
= new ArrayList
<>();
380 .getLiveRegionServerThreads()
381 .forEach(rsThread
-> regions
.addAll(rsThread
.getRegionServer().getRegions(tableName
)));
382 assertEquals(1, regions
.size());
384 int countBefore
= countStoreFilesInFamilies(regions
, families
);
385 int countBeforeSingleFamily
= countStoreFilesInFamily(regions
, family
);
386 assertTrue(countBefore
> 0); // there should be some data files
387 if (expectedState
== CompactionState
.MINOR
) {
389 admin
.compact(tableName
, family
).get();
391 admin
.compact(tableName
).get();
395 admin
.majorCompact(tableName
, family
).get();
397 admin
.majorCompact(tableName
).get();
401 long curt
= System
.currentTimeMillis();
402 long waitTime
= 5000;
403 long endt
= curt
+ waitTime
;
404 CompactionState state
= admin
.getCompactionState(tableName
).get();
405 while (state
== CompactionState
.NONE
&& curt
< endt
) {
407 state
= admin
.getCompactionState(tableName
).get();
408 curt
= System
.currentTimeMillis();
410 // Now, should have the right compaction state,
411 // otherwise, the compaction should have already been done
412 if (expectedState
!= state
) {
413 for (Region region
: regions
) {
414 state
= CompactionState
.valueOf(region
.getCompactionState().toString());
415 assertEquals(CompactionState
.NONE
, state
);
418 // Wait until the compaction is done
419 state
= admin
.getCompactionState(tableName
).get();
420 while (state
!= CompactionState
.NONE
&& curt
< endt
) {
422 state
= admin
.getCompactionState(tableName
).get();
424 // Now, compaction should be done.
425 assertEquals(CompactionState
.NONE
, state
);
428 int countAfter
= countStoreFilesInFamilies(regions
, families
);
429 int countAfterSingleFamily
= countStoreFilesInFamily(regions
, family
);
430 assertTrue(countAfter
< countBefore
);
432 if (expectedState
== CompactionState
.MAJOR
) assertTrue(families
.length
== countAfter
);
433 else assertTrue(families
.length
< countAfter
);
435 int singleFamDiff
= countBeforeSingleFamily
- countAfterSingleFamily
;
436 // assert only change was to single column family
437 assertTrue(singleFamDiff
== (countBefore
- countAfter
));
438 if (expectedState
== CompactionState
.MAJOR
) {
439 assertTrue(1 == countAfterSingleFamily
);
441 assertTrue(1 < countAfterSingleFamily
);
447 public void testNonExistentTableCompaction() {
448 testNonExistentTableCompaction(CompactionState
.MINOR
);
449 testNonExistentTableCompaction(CompactionState
.MAJOR
);
452 private void testNonExistentTableCompaction(CompactionState compactionState
) {
454 if (compactionState
== CompactionState
.MINOR
) {
455 admin
.compact(TableName
.valueOf("NonExistentTable")).get();
457 admin
.majorCompact(TableName
.valueOf("NonExistentTable")).get();
459 fail("Expected TableNotFoundException when table doesn't exist");
460 } catch (Exception e
) {
462 assertTrue(e
.getCause() instanceof TableNotFoundException
);
466 private static int countStoreFilesInFamily(List
<Region
> regions
, final byte[] family
) {
467 return countStoreFilesInFamilies(regions
, new byte[][] { family
});
470 private static int countStoreFilesInFamilies(List
<Region
> regions
, final byte[][] families
) {
472 for (Region region
: regions
) {
473 count
+= region
.getStoreFileList(families
).size();
478 static void loadData(final TableName tableName
, final byte[][] families
, final int rows
)
480 loadData(tableName
, families
, rows
, 1);
483 static void loadData(final TableName tableName
, final byte[][] families
, final int rows
,
484 final int flushes
) throws IOException
{
485 AsyncTable
<?
> table
= ASYNC_CONN
.getTable(tableName
);
486 List
<Put
> puts
= new ArrayList
<>(rows
);
487 byte[] qualifier
= Bytes
.toBytes("val");
488 for (int i
= 0; i
< flushes
; i
++) {
489 for (int k
= 0; k
< rows
; k
++) {
490 byte[] row
= Bytes
.add(Bytes
.toBytes(k
), Bytes
.toBytes(i
));
491 Put p
= new Put(row
);
492 for (int j
= 0; j
< families
.length
; ++j
) {
493 p
.addColumn(families
[j
], qualifier
, row
);
497 table
.putAll(puts
).join();