2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.apache
.hadoop
.hbase
.HConstants
.HBASE_CLIENT_META_OPERATION_TIMEOUT
;
21 import static org
.apache
.hadoop
.hbase
.master
.LoadBalancer
.TABLES_ON_MASTER
;
22 import static org
.junit
.Assert
.assertEquals
;
24 import java
.util
.ArrayList
;
25 import java
.util
.Arrays
;
26 import java
.util
.Collections
;
27 import java
.util
.List
;
28 import java
.util
.Random
;
29 import java
.util
.concurrent
.ExecutionException
;
30 import java
.util
.concurrent
.ExecutorService
;
31 import java
.util
.concurrent
.Executors
;
32 import java
.util
.concurrent
.Future
;
33 import java
.util
.concurrent
.TimeUnit
;
34 import java
.util
.concurrent
.atomic
.AtomicBoolean
;
35 import java
.util
.stream
.Collectors
;
36 import java
.util
.stream
.IntStream
;
38 import org
.apache
.commons
.io
.IOUtils
;
39 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
40 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
41 import org
.apache
.hadoop
.hbase
.MemoryCompactionPolicy
;
42 import org
.apache
.hadoop
.hbase
.ServerName
;
43 import org
.apache
.hadoop
.hbase
.TableName
;
44 import org
.apache
.hadoop
.hbase
.Waiter
.ExplainingPredicate
;
45 import org
.apache
.hadoop
.hbase
.io
.ByteBufferPool
;
46 import org
.apache
.hadoop
.hbase
.regionserver
.CompactingMemStore
;
47 import org
.apache
.hadoop
.hbase
.regionserver
.HRegion
;
48 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
49 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
50 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
51 import org
.apache
.hadoop
.hbase
.util
.RetryCounter
;
52 import org
.apache
.hadoop
.hbase
.util
.Threads
;
53 import org
.junit
.AfterClass
;
54 import org
.junit
.BeforeClass
;
55 import org
.junit
.ClassRule
;
56 import org
.junit
.Test
;
57 import org
.junit
.experimental
.categories
.Category
;
60 * Will split the table, and move region randomly when testing.
62 @Category({ LargeTests
.class, ClientTests
.class })
63 public class TestAsyncTableGetMultiThreaded
{
66 public static final HBaseClassTestRule CLASS_RULE
=
67 HBaseClassTestRule
.forClass(TestAsyncTableGetMultiThreaded
.class);
69 private static final HBaseTestingUtility TEST_UTIL
= new HBaseTestingUtility();
71 private static TableName TABLE_NAME
= TableName
.valueOf("async");
73 private static byte[] FAMILY
= Bytes
.toBytes("cf");
75 private static byte[] QUALIFIER
= Bytes
.toBytes("cq");
77 private static int COUNT
= 1000;
79 private static AsyncConnection CONN
;
81 private static AsyncTable
<?
> TABLE
;
83 private static byte[][] SPLIT_KEYS
;
86 public static void setUp() throws Exception
{
87 setUp(MemoryCompactionPolicy
.NONE
);
90 protected static void setUp(MemoryCompactionPolicy memoryCompaction
) throws Exception
{
91 TEST_UTIL
.getConfiguration().set(TABLES_ON_MASTER
, "none");
92 TEST_UTIL
.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT
, 60000L);
93 TEST_UTIL
.getConfiguration().setInt(ByteBufferPool
.MAX_POOL_SIZE_KEY
, 100);
94 TEST_UTIL
.getConfiguration().set(CompactingMemStore
.COMPACTING_MEMSTORE_TYPE_KEY
,
95 String
.valueOf(memoryCompaction
));
97 TEST_UTIL
.startMiniCluster(5);
98 SPLIT_KEYS
= new byte[8][];
99 for (int i
= 111; i
< 999; i
+= 111) {
100 SPLIT_KEYS
[i
/ 111 - 1] = Bytes
.toBytes(String
.format("%03d", i
));
102 TEST_UTIL
.createTable(TABLE_NAME
, FAMILY
);
103 TEST_UTIL
.waitTableAvailable(TABLE_NAME
);
104 CONN
= ConnectionFactory
.createAsyncConnection(TEST_UTIL
.getConfiguration()).get();
105 TABLE
= CONN
.getTableBuilder(TABLE_NAME
).setReadRpcTimeout(1, TimeUnit
.SECONDS
)
106 .setMaxRetries(1000).build();
108 IntStream
.range(0, COUNT
).mapToObj(i
-> new Put(Bytes
.toBytes(String
.format("%03d", i
)))
109 .addColumn(FAMILY
, QUALIFIER
, Bytes
.toBytes(i
))).collect(Collectors
.toList()))
114 public static void tearDown() throws Exception
{
115 IOUtils
.closeQuietly(CONN
);
116 TEST_UTIL
.shutdownMiniCluster();
119 private void run(AtomicBoolean stop
) throws InterruptedException
, ExecutionException
{
120 while (!stop
.get()) {
121 for (int i
= 0; i
< COUNT
; i
++) {
122 assertEquals(i
, Bytes
.toInt(TABLE
.get(new Get(Bytes
.toBytes(String
.format("%03d", i
))))
123 .get().getValue(FAMILY
, QUALIFIER
)));
129 public void test() throws Exception
{
131 AtomicBoolean stop
= new AtomicBoolean(false);
132 ExecutorService executor
=
133 Executors
.newFixedThreadPool(numThreads
, Threads
.newDaemonThreadFactory("TestAsyncGet-"));
134 List
<Future
<?
>> futures
= new ArrayList
<>();
135 IntStream
.range(0, numThreads
).forEach(i
-> futures
.add(executor
.submit(() -> {
139 Collections
.shuffle(Arrays
.asList(SPLIT_KEYS
), new Random(123));
140 Admin admin
= TEST_UTIL
.getAdmin();
141 for (byte[] splitPoint
: SPLIT_KEYS
) {
142 int oldRegionCount
= admin
.getRegions(TABLE_NAME
).size();
143 admin
.split(TABLE_NAME
, splitPoint
);
144 TEST_UTIL
.waitFor(30000, new ExplainingPredicate
<Exception
>() {
146 public boolean evaluate() throws Exception
{
147 return TEST_UTIL
.getMiniHBaseCluster().getRegions(TABLE_NAME
).size() > oldRegionCount
;
151 public String
explainFailure() throws Exception
{
152 return "Split has not finished yet";
156 for (HRegion region
: TEST_UTIL
.getHBaseCluster().getRegions(TABLE_NAME
)) {
157 region
.compact(true);
159 //Waiting for compaction to complete and references are cleaned up
160 RetryCounter retrier
= new RetryCounter(30, 1, TimeUnit
.SECONDS
);
161 while (CompactionState
.NONE
!= admin
162 .getCompactionStateForRegion(region
.getRegionInfo().getRegionName())
163 && retrier
.shouldRetry()) {
164 retrier
.sleepUntilNextRetry();
166 region
.getStores().get(0).closeAndArchiveCompactedFiles();
171 ServerName metaServer
= TEST_UTIL
.getHBaseCluster().getServerHoldingMeta();
172 ServerName newMetaServer
= TEST_UTIL
.getHBaseCluster().getRegionServerThreads().stream()
173 .map(t
-> t
.getRegionServer().getServerName()).filter(s
-> !s
.equals(metaServer
))
175 admin
.move(RegionInfoBuilder
.FIRST_META_REGIONINFO
.getEncodedNameAsBytes(),
176 Bytes
.toBytes(newMetaServer
.getServerName()));
181 for (Future
<?
> future
: futures
) {