2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.hamcrest
.CoreMatchers
.instanceOf
;
21 import static org
.junit
.Assert
.assertEquals
;
22 import static org
.junit
.Assert
.assertFalse
;
23 import static org
.junit
.Assert
.assertNotEquals
;
24 import static org
.junit
.Assert
.assertThat
;
25 import static org
.junit
.Assert
.assertTrue
;
26 import static org
.junit
.Assert
.fail
;
28 import java
.io
.IOException
;
29 import java
.util
.ArrayList
;
30 import java
.util
.List
;
32 import java
.util
.concurrent
.CompletableFuture
;
33 import java
.util
.concurrent
.ExecutionException
;
34 import java
.util
.concurrent
.atomic
.AtomicInteger
;
35 import java
.util
.stream
.Collectors
;
36 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
37 import org
.apache
.hadoop
.hbase
.ServerName
;
38 import org
.apache
.hadoop
.hbase
.TableName
;
39 import org
.apache
.hadoop
.hbase
.master
.HMaster
;
40 import org
.apache
.hadoop
.hbase
.master
.RegionState
;
41 import org
.apache
.hadoop
.hbase
.master
.ServerManager
;
42 import org
.apache
.hadoop
.hbase
.master
.assignment
.AssignmentManager
;
43 import org
.apache
.hadoop
.hbase
.master
.assignment
.RegionStates
;
44 import org
.apache
.hadoop
.hbase
.regionserver
.HRegionServer
;
45 import org
.apache
.hadoop
.hbase
.regionserver
.Region
;
46 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
47 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
48 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
49 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
50 import org
.apache
.hadoop
.hbase
.util
.JVMClusterUtil
;
51 import org
.apache
.hadoop
.hbase
.util
.Threads
;
52 import org
.junit
.ClassRule
;
53 import org
.junit
.Test
;
54 import org
.junit
.experimental
.categories
.Category
;
55 import org
.junit
.runner
.RunWith
;
56 import org
.junit
.runners
.Parameterized
;
59 * Class to test asynchronous region admin operations.
60 * @see TestAsyncRegionAdminApi2 That test and this one used to be a single class, but it ran
61 * longer than our ten-minute timeout, so they were split.
63 @RunWith(Parameterized
.class)
64 @Category({ LargeTests
.class, ClientTests
.class })
65 public class TestAsyncRegionAdminApi
extends TestAsyncAdminBase
{
67 public static final HBaseClassTestRule CLASS_RULE
=
68 HBaseClassTestRule
.forClass(TestAsyncRegionAdminApi
.class);
71 public void testAssignRegionAndUnassignRegion() throws Exception
{
72 createTableWithDefaultConf(tableName
);
75 HMaster master
= TEST_UTIL
.getHBaseCluster().getMaster();
76 AssignmentManager am
= master
.getAssignmentManager();
77 RegionInfo hri
= am
.getRegionStates().getRegionsOfTable(tableName
).get(0);
79 // assert region on server
80 RegionStates regionStates
= am
.getRegionStates();
81 ServerName serverName
= regionStates
.getRegionServerOfRegion(hri
);
82 TEST_UTIL
.assertRegionOnServer(hri
, serverName
, 200);
83 assertTrue(regionStates
.getRegionState(hri
).isOpened());
85 // Region is assigned now. Let's assign it again.
86 // Master should not abort, and region should stay assigned.
88 admin
.assign(hri
.getRegionName()).get();
89 fail("Should fail when assigning an already onlined region");
90 } catch (ExecutionException e
) {
92 assertThat(e
.getCause(), instanceOf(DoNotRetryRegionException
.class));
94 assertFalse(am
.getRegionStates().getRegionStateNode(hri
).isInTransition());
95 assertTrue(regionStates
.getRegionState(hri
).isOpened());
98 admin
.unassign(hri
.getRegionName(), true).get();
99 assertFalse(am
.getRegionStates().getRegionStateNode(hri
).isInTransition());
100 assertTrue(regionStates
.getRegionState(hri
).isClosed());
103 RegionInfo
createTableAndGetOneRegion(final TableName tableName
)
104 throws IOException
, InterruptedException
, ExecutionException
{
105 TableDescriptor desc
=
106 TableDescriptorBuilder
.newBuilder(tableName
)
107 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(FAMILY
)).build();
108 admin
.createTable(desc
, Bytes
.toBytes("A"), Bytes
.toBytes("Z"), 5).get();
110 // wait till the table is assigned
111 HMaster master
= TEST_UTIL
.getHBaseCluster().getMaster();
112 long timeoutTime
= System
.currentTimeMillis() + 3000;
114 List
<RegionInfo
> regions
=
115 master
.getAssignmentManager().getRegionStates().getRegionsOfTable(tableName
);
116 if (regions
.size() > 3) {
117 return regions
.get(2);
119 long now
= System
.currentTimeMillis();
120 if (now
> timeoutTime
) {
121 fail("Could not find an online region");
128 public void testGetRegionByStateOfTable() throws Exception
{
129 RegionInfo hri
= createTableAndGetOneRegion(tableName
);
131 RegionStates regionStates
=
132 TEST_UTIL
.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates();
133 assertTrue(regionStates
.getRegionByStateOfTable(tableName
).get(RegionState
.State
.OPEN
)
134 .stream().anyMatch(r
-> RegionInfo
.COMPARATOR
.compare(r
, hri
) == 0));
135 assertFalse(regionStates
.getRegionByStateOfTable(TableName
.valueOf("I_am_the_phantom"))
136 .get(RegionState
.State
.OPEN
).stream().anyMatch(r
-> RegionInfo
.COMPARATOR
.compare(r
, hri
) == 0));
140 public void testMoveRegion() throws Exception
{
141 admin
.balancerSwitch(false).join();
143 RegionInfo hri
= createTableAndGetOneRegion(tableName
);
144 RawAsyncHBaseAdmin rawAdmin
= (RawAsyncHBaseAdmin
) ASYNC_CONN
.getAdmin();
145 ServerName serverName
= rawAdmin
.getRegionLocation(hri
.getRegionName()).get().getServerName();
147 HMaster master
= TEST_UTIL
.getHBaseCluster().getMaster();
148 ServerManager serverManager
= master
.getServerManager();
149 ServerName destServerName
= null;
150 List
<JVMClusterUtil
.RegionServerThread
> regionServers
=
151 TEST_UTIL
.getHBaseCluster().getLiveRegionServerThreads();
152 for (JVMClusterUtil
.RegionServerThread regionServer
: regionServers
) {
153 HRegionServer destServer
= regionServer
.getRegionServer();
154 destServerName
= destServer
.getServerName();
155 if (!destServerName
.equals(serverName
) && serverManager
.isServerOnline(destServerName
)) {
160 assertTrue(destServerName
!= null && !destServerName
.equals(serverName
));
161 admin
.move(hri
.getRegionName(), destServerName
).get();
163 long timeoutTime
= System
.currentTimeMillis() + 30000;
165 ServerName sn
= rawAdmin
.getRegionLocation(hri
.getRegionName()).get().getServerName();
166 if (sn
!= null && sn
.equals(destServerName
)) {
169 long now
= System
.currentTimeMillis();
170 if (now
> timeoutTime
) {
171 fail("Failed to move the region in time: " + hri
);
175 admin
.balancerSwitch(true).join();
179 public void testGetOnlineRegions() throws Exception
{
180 createTableAndGetOneRegion(tableName
);
181 AtomicInteger regionServerCount
= new AtomicInteger(0);
184 .getLiveRegionServerThreads()
186 .map(rsThread
-> rsThread
.getRegionServer())
189 ServerName serverName
= rs
.getServerName();
191 assertEquals(admin
.getRegions(serverName
).get().size(), rs
192 .getRegions().size());
193 } catch (Exception e
) {
194 fail("admin.getOnlineRegions() method throws a exception: " + e
.getMessage());
196 regionServerCount
.incrementAndGet();
198 assertEquals(2, regionServerCount
.get());
202 public void testFlushTableAndRegion() throws Exception
{
203 RegionInfo hri
= createTableAndGetOneRegion(tableName
);
204 ServerName serverName
=
205 TEST_UTIL
.getHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
206 .getRegionServerOfRegion(hri
);
207 HRegionServer regionServer
=
208 TEST_UTIL
.getHBaseCluster().getLiveRegionServerThreads().stream()
209 .map(rsThread
-> rsThread
.getRegionServer())
210 .filter(rs
-> rs
.getServerName().equals(serverName
)).findFirst().get();
212 // write a put into the specific region
213 ASYNC_CONN
.getTable(tableName
)
214 .put(new Put(hri
.getStartKey()).addColumn(FAMILY
, FAMILY_0
, Bytes
.toBytes("value-1")))
216 assertTrue(regionServer
.getOnlineRegion(hri
.getRegionName()).getMemStoreDataSize() > 0);
217 // flush region and wait flush operation finished.
218 LOG
.info("flushing region: " + Bytes
.toStringBinary(hri
.getRegionName()));
219 admin
.flushRegion(hri
.getRegionName()).get();
220 LOG
.info("blocking until flush is complete: " + Bytes
.toStringBinary(hri
.getRegionName()));
221 Threads
.sleepWithoutInterrupt(500);
222 while (regionServer
.getOnlineRegion(hri
.getRegionName()).getMemStoreDataSize() > 0) {
225 // check the memstore.
226 assertEquals(regionServer
.getOnlineRegion(hri
.getRegionName()).getMemStoreDataSize(), 0);
228 // write another put into the specific region
229 ASYNC_CONN
.getTable(tableName
)
230 .put(new Put(hri
.getStartKey()).addColumn(FAMILY
, FAMILY_0
, Bytes
.toBytes("value-2")))
232 assertTrue(regionServer
.getOnlineRegion(hri
.getRegionName()).getMemStoreDataSize() > 0);
233 admin
.flush(tableName
).get();
234 Threads
.sleepWithoutInterrupt(500);
235 while (regionServer
.getOnlineRegion(hri
.getRegionName()).getMemStoreDataSize() > 0) {
238 // check the memstore.
239 assertEquals(regionServer
.getOnlineRegion(hri
.getRegionName()).getMemStoreDataSize(), 0);
242 private void waitUntilMobCompactionFinished(TableName tableName
)
243 throws ExecutionException
, InterruptedException
{
244 long finished
= EnvironmentEdgeManager
.currentTime() + 60000;
245 CompactionState state
= admin
.getCompactionState(tableName
, CompactType
.MOB
).get();
246 while (EnvironmentEdgeManager
.currentTime() < finished
) {
247 if (state
== CompactionState
.NONE
) {
251 state
= admin
.getCompactionState(tableName
, CompactType
.MOB
).get();
253 assertEquals(CompactionState
.NONE
, state
);
257 public void testCompactMob() throws Exception
{
258 ColumnFamilyDescriptor columnDescriptor
=
259 ColumnFamilyDescriptorBuilder
.newBuilder(Bytes
.toBytes("mob"))
260 .setMobEnabled(true).setMobThreshold(0).build();
262 TableDescriptor tableDescriptor
= TableDescriptorBuilder
.newBuilder(tableName
)
263 .setColumnFamily(columnDescriptor
).build();
265 admin
.createTable(tableDescriptor
).get();
267 byte[][] families
= { Bytes
.toBytes("mob") };
268 loadData(tableName
, families
, 3000, 8);
270 admin
.majorCompact(tableName
, CompactType
.MOB
).get();
272 CompactionState state
= admin
.getCompactionState(tableName
, CompactType
.MOB
).get();
273 assertNotEquals(CompactionState
.NONE
, state
);
275 waitUntilMobCompactionFinished(tableName
);
279 public void testCompactRegionServer() throws Exception
{
280 byte[][] families
= { Bytes
.toBytes("f1"), Bytes
.toBytes("f2"), Bytes
.toBytes("f3") };
281 createTableWithDefaultConf(tableName
, null, families
);
282 loadData(tableName
, families
, 3000, 8);
284 List
<HRegionServer
> rsList
=
285 TEST_UTIL
.getHBaseCluster().getLiveRegionServerThreads().stream()
286 .map(rsThread
-> rsThread
.getRegionServer()).collect(Collectors
.toList());
287 List
<Region
> regions
= new ArrayList
<>();
288 rsList
.forEach(rs
-> regions
.addAll(rs
.getRegions(tableName
)));
289 assertEquals(1, regions
.size());
290 int countBefore
= countStoreFilesInFamilies(regions
, families
);
291 assertTrue(countBefore
> 0);
293 // Minor compaction for all region servers.
294 for (HRegionServer rs
: rsList
)
295 admin
.compactRegionServer(rs
.getServerName()).get();
297 int countAfterMinorCompaction
= countStoreFilesInFamilies(regions
, families
);
298 assertTrue(countAfterMinorCompaction
< countBefore
);
300 // Major compaction for all region servers.
301 for (HRegionServer rs
: rsList
)
302 admin
.majorCompactRegionServer(rs
.getServerName()).get();
304 int countAfterMajorCompaction
= countStoreFilesInFamilies(regions
, families
);
305 assertEquals(3, countAfterMajorCompaction
);
309 public void testCompactionSwitchStates() throws Exception
{
310 // Create a table with regions
311 byte[] family
= Bytes
.toBytes("family");
312 byte[][] families
= {family
, Bytes
.add(family
, Bytes
.toBytes("2")),
313 Bytes
.add(family
, Bytes
.toBytes("3"))};
314 createTableWithDefaultConf(tableName
, null, families
);
315 loadData(tableName
, families
, 3000, 8);
316 List
<Region
> regions
= new ArrayList
<>();
319 .getLiveRegionServerThreads()
320 .forEach(rsThread
-> regions
.addAll(rsThread
.getRegionServer().getRegions(tableName
)));
321 CompletableFuture
<Map
<ServerName
, Boolean
>> listCompletableFuture
=
322 admin
.compactionSwitch(true, new ArrayList
<>());
323 Map
<ServerName
, Boolean
> pairs
= listCompletableFuture
.get();
324 for (Map
.Entry
<ServerName
, Boolean
> p
: pairs
.entrySet()) {
325 assertEquals("Default compaction state, expected=enabled actual=disabled",
328 CompletableFuture
<Map
<ServerName
, Boolean
>> listCompletableFuture1
=
329 admin
.compactionSwitch(false, new ArrayList
<>());
330 Map
<ServerName
, Boolean
> pairs1
= listCompletableFuture1
.get();
331 for (Map
.Entry
<ServerName
, Boolean
> p
: pairs1
.entrySet()) {
332 assertEquals("Last compaction state, expected=enabled actual=disabled",
335 CompletableFuture
<Map
<ServerName
, Boolean
>> listCompletableFuture2
=
336 admin
.compactionSwitch(true, new ArrayList
<>());
337 Map
<ServerName
, Boolean
> pairs2
= listCompletableFuture2
.get();
338 for (Map
.Entry
<ServerName
, Boolean
> p
: pairs2
.entrySet()) {
339 assertEquals("Last compaction state, expected=disabled actual=enabled",
340 false, p
.getValue());
345 public void testCompact() throws Exception
{
346 compactionTest(TableName
.valueOf("testCompact1"), 8, CompactionState
.MAJOR
, false);
347 compactionTest(TableName
.valueOf("testCompact2"), 15, CompactionState
.MINOR
, false);
348 compactionTest(TableName
.valueOf("testCompact3"), 8, CompactionState
.MAJOR
, true);
349 compactionTest(TableName
.valueOf("testCompact4"), 15, CompactionState
.MINOR
, true);
352 private void compactionTest(final TableName tableName
, final int flushes
,
353 final CompactionState expectedState
, boolean singleFamily
) throws Exception
{
354 // Create a table with regions
355 byte[] family
= Bytes
.toBytes("family");
357 { family
, Bytes
.add(family
, Bytes
.toBytes("2")), Bytes
.add(family
, Bytes
.toBytes("3")) };
358 createTableWithDefaultConf(tableName
, null, families
);
359 loadData(tableName
, families
, 3000, flushes
);
361 List
<Region
> regions
= new ArrayList
<>();
364 .getLiveRegionServerThreads()
365 .forEach(rsThread
-> regions
.addAll(rsThread
.getRegionServer().getRegions(tableName
)));
366 assertEquals(1, regions
.size());
368 int countBefore
= countStoreFilesInFamilies(regions
, families
);
369 int countBeforeSingleFamily
= countStoreFilesInFamily(regions
, family
);
370 assertTrue(countBefore
> 0); // there should be some data files
371 if (expectedState
== CompactionState
.MINOR
) {
373 admin
.compact(tableName
, family
).get();
375 admin
.compact(tableName
).get();
379 admin
.majorCompact(tableName
, family
).get();
381 admin
.majorCompact(tableName
).get();
385 long curt
= System
.currentTimeMillis();
386 long waitTime
= 5000;
387 long endt
= curt
+ waitTime
;
388 CompactionState state
= admin
.getCompactionState(tableName
).get();
389 while (state
== CompactionState
.NONE
&& curt
< endt
) {
391 state
= admin
.getCompactionState(tableName
).get();
392 curt
= System
.currentTimeMillis();
394 // Now, should have the right compaction state,
395 // otherwise, the compaction should have already been done
396 if (expectedState
!= state
) {
397 for (Region region
: regions
) {
398 state
= CompactionState
.valueOf(region
.getCompactionState().toString());
399 assertEquals(CompactionState
.NONE
, state
);
402 // Wait until the compaction is done
403 state
= admin
.getCompactionState(tableName
).get();
404 while (state
!= CompactionState
.NONE
&& curt
< endt
) {
406 state
= admin
.getCompactionState(tableName
).get();
408 // Now, compaction should be done.
409 assertEquals(CompactionState
.NONE
, state
);
412 int countAfter
= countStoreFilesInFamilies(regions
, families
);
413 int countAfterSingleFamily
= countStoreFilesInFamily(regions
, family
);
414 assertTrue(countAfter
< countBefore
);
416 if (expectedState
== CompactionState
.MAJOR
) assertTrue(families
.length
== countAfter
);
417 else assertTrue(families
.length
< countAfter
);
419 int singleFamDiff
= countBeforeSingleFamily
- countAfterSingleFamily
;
420 // assert only change was to single column family
421 assertTrue(singleFamDiff
== (countBefore
- countAfter
));
422 if (expectedState
== CompactionState
.MAJOR
) {
423 assertTrue(1 == countAfterSingleFamily
);
425 assertTrue(1 < countAfterSingleFamily
);
430 private static int countStoreFilesInFamily(List
<Region
> regions
, final byte[] family
) {
431 return countStoreFilesInFamilies(regions
, new byte[][] { family
});
434 private static int countStoreFilesInFamilies(List
<Region
> regions
, final byte[][] families
) {
436 for (Region region
: regions
) {
437 count
+= region
.getStoreFileList(families
).size();
442 static void loadData(final TableName tableName
, final byte[][] families
, final int rows
)
444 loadData(tableName
, families
, rows
, 1);
447 static void loadData(final TableName tableName
, final byte[][] families
, final int rows
,
448 final int flushes
) throws IOException
{
449 AsyncTable
<?
> table
= ASYNC_CONN
.getTable(tableName
);
450 List
<Put
> puts
= new ArrayList
<>(rows
);
451 byte[] qualifier
= Bytes
.toBytes("val");
452 for (int i
= 0; i
< flushes
; i
++) {
453 for (int k
= 0; k
< rows
; k
++) {
454 byte[] row
= Bytes
.add(Bytes
.toBytes(k
), Bytes
.toBytes(i
));
455 Put p
= new Put(row
);
456 for (int j
= 0; j
< families
.length
; ++j
) {
457 p
.addColumn(families
[j
], qualifier
, row
);
461 table
.putAll(puts
).join();