/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.apache
.hadoop
.hbase
.HConstants
.HBASE_CLIENT_META_OPERATION_TIMEOUT
;
21 import static org
.apache
.hadoop
.hbase
.io
.ByteBuffAllocator
.MAX_BUFFER_COUNT_KEY
;
22 import static org
.junit
.Assert
.assertEquals
;
24 import java
.io
.IOException
;
25 import java
.util
.ArrayList
;
26 import java
.util
.Arrays
;
27 import java
.util
.Collections
;
28 import java
.util
.List
;
29 import java
.util
.concurrent
.ExecutionException
;
30 import java
.util
.concurrent
.ExecutorService
;
31 import java
.util
.concurrent
.Executors
;
32 import java
.util
.concurrent
.Future
;
33 import java
.util
.concurrent
.ThreadLocalRandom
;
34 import java
.util
.concurrent
.TimeUnit
;
35 import java
.util
.concurrent
.atomic
.AtomicBoolean
;
36 import java
.util
.stream
.Collectors
;
37 import java
.util
.stream
.IntStream
;
38 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
39 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
40 import org
.apache
.hadoop
.hbase
.MemoryCompactionPolicy
;
41 import org
.apache
.hadoop
.hbase
.ServerName
;
42 import org
.apache
.hadoop
.hbase
.TableName
;
43 import org
.apache
.hadoop
.hbase
.Waiter
.ExplainingPredicate
;
44 import org
.apache
.hadoop
.hbase
.regionserver
.CompactingMemStore
;
45 import org
.apache
.hadoop
.hbase
.regionserver
.HRegion
;
46 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
47 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
48 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
49 import org
.apache
.hadoop
.hbase
.util
.RetryCounter
;
50 import org
.apache
.hadoop
.hbase
.util
.Threads
;
51 import org
.junit
.AfterClass
;
52 import org
.junit
.Assert
;
53 import org
.junit
.BeforeClass
;
54 import org
.junit
.ClassRule
;
55 import org
.junit
.Test
;
56 import org
.junit
.experimental
.categories
.Category
;
57 import org
.slf4j
.Logger
;
58 import org
.slf4j
.LoggerFactory
;
60 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.io
.Closeables
;
61 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.util
.concurrent
.ThreadFactoryBuilder
;
/**
 * Will split the table, and move regions randomly when testing.
 */
66 @Category({ LargeTests
.class, ClientTests
.class })
67 public class TestAsyncTableGetMultiThreaded
{
// Class-level HBase test rule built for this test class.
// NOTE(review): no @ClassRule annotation is visible above this field in this view, although
// org.junit.ClassRule is imported — confirm it was not lost.
70 public static final HBaseClassTestRule CLASS_RULE
=
71 HBaseClassTestRule
.forClass(TestAsyncTableGetMultiThreaded
.class);
// SLF4J logger used for the "======" progress markers written by the test.
73 private static final Logger LOG
= LoggerFactory
.getLogger(TestAsyncTableGetMultiThreaded
.class);
// Shared testing utility: supplies the configuration, the mini cluster, table creation and Admin.
75 private static final HBaseTestingUtil TEST_UTIL
= new HBaseTestingUtil();
// Table ("async"), column family ("cf") and qualifier ("cq") that every Put and Get targets.
77 private static final TableName TABLE_NAME
= TableName
.valueOf("async");
78 private static final byte[] FAMILY
= Bytes
.toBytes("cf");
79 private static final byte[] QUALIFIER
= Bytes
.toBytes("cq");
// Number of rows: setUp builds one Put per i in [0, COUNT) and run() reads the same range back.
// Row keys are String.format("%03d", i), i.e. "000" .. "999".
80 private static final int COUNT
= 1000;
// Async connection created in setUp(MemoryCompactionPolicy) and closed in tearDown().
82 private static AsyncConnection CONN
;
// Async table handle built from CONN in setUp(MemoryCompactionPolicy); used by run() for Gets.
84 private static AsyncTable
<?
> TABLE
;
// Eight split points ("111", "222", ..., "888") filled in by setUp(MemoryCompactionPolicy);
// test() shuffles them and splits the table at each one.
86 private static byte[][] SPLIT_KEYS
;
// Default fixture setup: delegates to setUp(MemoryCompactionPolicy) with
// MemoryCompactionPolicy.NONE.
// NOTE(review): neither a @BeforeClass annotation nor this method's closing brace is visible in
// this view (org.junit.BeforeClass is imported) — confirm against the original file.
89 public static void setUp() throws Exception
{
90 setUp(MemoryCompactionPolicy
.NONE
);
/**
 * Configures and starts the mini cluster, then prepares the split keys, the table, the async
 * client objects and the rows used by the reader threads.
 *
 * @param memoryCompaction policy whose String.valueOf form is stored under
 *   CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY before the cluster is started
 * @throws Exception if any of the setup steps below fails
 */
93 protected static void setUp(MemoryCompactionPolicy memoryCompaction
) throws Exception
{
// Client-side meta operation timeout: 60000 (presumably milliseconds — TODO confirm unit).
94 TEST_UTIL
.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT
, 60000L);
// Limit the ByteBuffAllocator's max buffer count to 100.
95 TEST_UTIL
.getConfiguration().setInt(MAX_BUFFER_COUNT_KEY
, 100);
// Select the compacting-memstore type from the caller-supplied policy.
96 TEST_UTIL
.getConfiguration().set(CompactingMemStore
.COMPACTING_MEMSTORE_TYPE_KEY
,
97 String
.valueOf(memoryCompaction
));
// Enable the master's balancer-decision buffer; test() later reads "BALANCER_DECISION" log
// entries and asserts that two are returned.
98 TEST_UTIL
.getConfiguration().setBoolean("hbase.master.balancer.decision.buffer.enabled", true);
// Start the mini cluster (argument 3 — presumably the number of region servers; TODO confirm).
100 TEST_UTIL
.startMiniCluster(3);
// Build the 8 split keys: i = 111, 222, ..., 888 maps to index i / 111 - 1 = 0..7, and each key
// is the zero-padded 3-digit string of i, matching the row-key format used below.
// NOTE(review): the closing brace of this for loop is not visible in this view.
101 SPLIT_KEYS
= new byte[8][];
102 for (int i
= 111; i
< 999; i
+= 111) {
103 SPLIT_KEYS
[i
/ 111 - 1] = Bytes
.toBytes(String
.format("%03d", i
));
// Create the table with the single family and wait until it is available.
105 TEST_UTIL
.createTable(TABLE_NAME
, FAMILY
);
106 TEST_UTIL
.waitTableAvailable(TABLE_NAME
);
// Create the async connection, blocking on the returned future.
107 CONN
= ConnectionFactory
.createAsyncConnection(TEST_UTIL
.getConfiguration()).get();
// Table handle with a 1 second read RPC timeout and up to 1000 retries.
108 TABLE
= CONN
.getTableBuilder(TABLE_NAME
).setReadRpcTimeout(1, TimeUnit
.SECONDS
)
109 .setMaxRetries(1000).build();
// Build one Put per row i in [0, COUNT): row key "%03d" of i, value Bytes.toBytes(i) stored at
// FAMILY:QUALIFIER, collected into a List.
// NOTE(review): the expression ends with an unbalanced ')' and no statement consuming the list
// is visible, so the call that writes these Puts (and the method's closing brace) appears to be
// missing from this view — confirm against the original file.
111 IntStream
.range(0, COUNT
).mapToObj(i
-> new Put(Bytes
.toBytes(String
.format("%03d", i
)))
112 .addColumn(FAMILY
, QUALIFIER
, Bytes
.toBytes(i
))).collect(Collectors
.toList()))
// Fixture teardown: closes the async connection, then shuts the mini cluster down.
// NOTE(review): neither an @AfterClass annotation nor this method's closing brace is visible in
// this view (org.junit.AfterClass is imported) — confirm against the original file.
117 public static void tearDown() throws Exception
{
// The second argument (true) is Guava's swallowIOException flag, so an IOException thrown while
// closing CONN does not abort the teardown before the cluster is shut down.
118 Closeables
.close(CONN
, true);
119 TEST_UTIL
.shutdownMiniCluster();
/**
 * Reader loop: until {@code stop} becomes true, repeatedly reads rows 0..COUNT-1 and checks
 * that each row still holds the int value written for it in setUp.
 *
 * @param stop flag polled at the top of each pass; the loop exits once it reads true
 * @throws InterruptedException if waiting on a Get future is interrupted
 * @throws ExecutionException if a Get future completes exceptionally
 */
122 private void run(AtomicBoolean stop
) throws InterruptedException
, ExecutionException
{
123 while (!stop
.get()) {
124 for (int i
= 0; i
< COUNT
; i
++) {
// Issue the async Get for row "%03d" of i, block on its future, and assert that the value at
// FAMILY:QUALIFIER decodes to i.
125 assertEquals(i
, Bytes
.toInt(TABLE
.get(new Get(Bytes
.toBytes(String
.format("%03d", i
))))
126 .get().getValue(FAMILY
, QUALIFIER
)));
// NOTE(review): the sleep statement the comment below refers to, and the closing braces of the
// loops and of this method, are not visible in this view. The thread count quoted below could
// not be checked either, because the declaration of numThreads is not visible.
128 // sleep a bit so we do not add too much load to the test machine as we have 20 threads here
/**
 * Runs concurrent readers while the main thread splits the table at each split key, compacts
 * the regions and archives compacted files, balances the cluster and moves the meta region;
 * finally checks the balancer decision log and waits for the reader futures.
 * <p>
 * NOTE(review): several original lines are not visible in this view (the @Test annotation, the
 * declaration of numThreads, the body of the submitted task, the retry loop / try opener, and
 * various closing braces) — confirm against the original file before relying on the details.
 *
 * @throws Exception if any admin operation, wait, or reader future fails
 */
134 public void test() throws Exception
{
135 LOG
.info("====== Test started ======");
// Shared stop flag for the readers (run() polls it).
137 AtomicBoolean stop
= new AtomicBoolean(false);
// Fixed pool of numThreads daemon threads named "TestAsyncGet-pool-%d"; uncaught exceptions go
// to Threads.LOGGING_EXCEPTION_HANDLER.
138 ExecutorService executor
= Executors
.newFixedThreadPool(numThreads
,
139 new ThreadFactoryBuilder().setNameFormat("TestAsyncGet-pool-%d").setDaemon(true)
140 .setUncaughtExceptionHandler(Threads
.LOGGING_EXCEPTION_HANDLER
).build());
// Submit one task per thread and keep the futures so failures surface at the end of the test.
// NOTE(review): the body of the submitted lambda is not visible here — presumably it calls
// run(stop); confirm.
141 List
<Future
<?
>> futures
= new ArrayList
<>();
142 IntStream
.range(0, numThreads
).forEach(i
-> futures
.add(executor
.submit(() -> {
146 LOG
.info("====== Scheduled {} read threads ======", numThreads
);
// Arrays.asList returns a view backed by SPLIT_KEYS, so this shuffles the array in place and
// the splits below happen in random order.
147 Collections
.shuffle(Arrays
.asList(SPLIT_KEYS
), ThreadLocalRandom
.current());
148 Admin admin
= TEST_UTIL
.getAdmin();
149 for (byte[] splitPoint
: SPLIT_KEYS
) {
// Remember the region count so split completion can be detected as "count increased".
150 int oldRegionCount
= admin
.getRegions(TABLE_NAME
).size();
151 LOG
.info("====== Splitting at {} ======, region count before splitting is {}",
152 Bytes
.toStringBinary(splitPoint
), oldRegionCount
);
153 admin
.split(TABLE_NAME
, splitPoint
);
// Wait up to 30000 (presumably ms — TODO confirm) for the mini cluster to report more regions
// for the table than before the split request.
154 TEST_UTIL
.waitFor(30000, new ExplainingPredicate
<Exception
>() {
// Predicate: true once the table's region count exceeds the pre-split count.
156 public boolean evaluate() throws Exception
{
157 return TEST_UTIL
.getMiniHBaseCluster().getRegions(TABLE_NAME
).size() > oldRegionCount
;
// Message reported by waitFor when the predicate never became true.
161 public String
explainFailure() throws Exception
{
162 return "Split has not finished yet";
// Snapshot of the table's regions after the split; used for the compaction steps below.
165 List
<HRegion
> regions
= TEST_UTIL
.getMiniHBaseCluster().getRegions(TABLE_NAME
);
166 LOG
.info("====== Split at {} ======, region count after splitting is {}",
167 Bytes
.toStringBinary(splitPoint
), regions
.size());
// First pass: trigger a compaction on every region (compact(true) — presumably a major
// compaction; TODO confirm the flag's meaning).
168 for (HRegion region
: regions
) {
169 LOG
.info("====== Compact {} ======", region
.getRegionInfo());
170 region
.compact(true);
// Second pass: wait for each region's compaction to finish, then archive compacted files.
172 for (HRegion region
: regions
) {
173 // Waiting for compaction to complete and references are cleaned up
174 LOG
.info("====== Waiting for compaction on {} ======", region
.getRegionInfo());
// Retry budget built from (30, 1, SECONDS) — presumably up to 30 attempts, 1 second apart.
175 RetryCounter retrier
= new RetryCounter(30, 1, TimeUnit
.SECONDS
);
// The region counts as done once its compaction state is CompactionState.NONE.
// NOTE(review): the enclosing retry loop, the try opener and the body of this if are not
// visible in this view.
178 if (admin
.getCompactionStateForRegion(
179 region
.getRegionInfo().getRegionName()) == CompactionState
.NONE
) {
// A failed state query is only logged; the caught exception is not passed to the logger.
// NOTE(review): consider logging e as well so the cause is not lost.
182 } catch (IOException e
) {
183 LOG
.warn("Failed to query");
// Give up with an IOException once the retry budget is exhausted; otherwise sleep until the
// next attempt.
185 if (!retrier
.shouldRetry()) {
186 throw new IOException("Can not finish compaction in time after attempt " +
187 retrier
.getAttemptTimes() + " times");
189 retrier
.sleepUntilNextRetry();
// Compaction finished: close and archive the compacted files of the region's first store
// (index 0 of getStores(); the table is created with the single FAMILY in setUp).
191 LOG
.info("====== Compaction on {} finished, close and archive compacted files ======",
192 region
.getRegionInfo());
193 region
.getStores().get(0).closeAndArchiveCompactedFiles();
194 LOG
.info("====== Close and archive compacted files on {} done ======",
195 region
.getRegionInfo());
// Run the balancer with ignoreRegionsInTransition set to true.
198 LOG
.info("====== Balancing cluster ======");
199 admin
.balance(BalanceRequest
.newBuilder().setIgnoreRegionsInTransition(true).build());
200 LOG
.info("====== Balance cluster done ======");
// Pick a region server other than the one currently holding meta and move meta there.
// NOTE(review): the terminal operation of the stream below is not visible in this view.
202 ServerName metaServer
= TEST_UTIL
.getHBaseCluster().getServerHoldingMeta();
203 ServerName newMetaServer
= TEST_UTIL
.getHBaseCluster().getRegionServerThreads().stream()
204 .map(t
-> t
.getRegionServer().getServerName()).filter(s
-> !s
.equals(metaServer
))
206 LOG
.info("====== Moving meta from {} to {} ======", metaServer
, newMetaServer
);
207 admin
.move(RegionInfoBuilder
.FIRST_META_REGIONINFO
.getEncodedNameAsBytes(), newMetaServer
);
208 LOG
.info("====== Move meta done ======");
// Fetch "BALANCER_DECISION" log entries from the master (the argument 2 is presumably the
// limit — TODO confirm) and require exactly two records.
// NOTE(review): JUnit's assertEquals takes (expected, actual); the arguments below are in the
// opposite order, which only affects the failure message.
211 List
<LogEntry
> balancerDecisionRecords
=
212 admin
.getLogEntries(null, "BALANCER_DECISION", ServerType
.MASTER
, 2, null);
213 Assert
.assertEquals(balancerDecisionRecords
.size(), 2);
// NOTE(review): the statements that set the stop flag and shut down the executor are not
// visible in this view, although the log message below announces the shutdown.
214 LOG
.info("====== Read test finished, shutdown thread pool ======");
// Block on every reader future so an exception or assertion failure raised in a reader is
// rethrown here.
// NOTE(review): the last argument of the LOG.info call below is not visible in this view.
217 for (int i
= 0; i
< numThreads
; i
++) {
218 LOG
.info("====== Waiting for {} threads to finish, remaining {} ======", numThreads
,
220 futures
.get(i
).get();
222 LOG
.info("====== Test test finished ======");