/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.junit
.Assert
.assertEquals
;
21 import static org
.junit
.Assert
.assertTrue
;
23 import java
.io
.IOException
;
24 import java
.io
.UncheckedIOException
;
25 import java
.util
.Arrays
;
26 import java
.util
.List
;
27 import java
.util
.concurrent
.ForkJoinPool
;
28 import java
.util
.function
.Supplier
;
29 import java
.util
.stream
.Collectors
;
30 import java
.util
.stream
.IntStream
;
32 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
33 import org
.apache
.hadoop
.hbase
.TableName
;
34 import org
.apache
.hadoop
.hbase
.regionserver
.NoSuchColumnFamilyException
;
35 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
36 import org
.apache
.hadoop
.hbase
.util
.Pair
;
37 import org
.junit
.AfterClass
;
38 import org
.junit
.BeforeClass
;
39 import org
.junit
.Test
;
41 public abstract class AbstractTestAsyncTableScan
{
43 protected static final HBaseTestingUtility TEST_UTIL
= new HBaseTestingUtility();
45 protected static TableName TABLE_NAME
= TableName
.valueOf("async");
47 protected static byte[] FAMILY
= Bytes
.toBytes("cf");
49 protected static byte[] CQ1
= Bytes
.toBytes("cq1");
51 protected static byte[] CQ2
= Bytes
.toBytes("cq2");
53 protected static int COUNT
= 1000;
55 protected static AsyncConnection ASYNC_CONN
;
58 public static void setUp() throws Exception
{
59 TEST_UTIL
.startMiniCluster(3);
60 byte[][] splitKeys
= new byte[8][];
61 for (int i
= 111; i
< 999; i
+= 111) {
62 splitKeys
[i
/ 111 - 1] = Bytes
.toBytes(String
.format("%03d", i
));
64 TEST_UTIL
.createTable(TABLE_NAME
, FAMILY
, splitKeys
);
65 TEST_UTIL
.waitTableAvailable(TABLE_NAME
);
66 ASYNC_CONN
= ConnectionFactory
.createAsyncConnection(TEST_UTIL
.getConfiguration()).get();
67 ASYNC_CONN
.getTable(TABLE_NAME
).putAll(IntStream
.range(0, COUNT
)
68 .mapToObj(i
-> new Put(Bytes
.toBytes(String
.format("%03d", i
)))
69 .addColumn(FAMILY
, CQ1
, Bytes
.toBytes(i
)).addColumn(FAMILY
, CQ2
, Bytes
.toBytes(i
* i
)))
70 .collect(Collectors
.toList())).get();
74 public static void tearDown() throws Exception
{
76 TEST_UTIL
.shutdownMiniCluster();
79 protected static Scan
createNormalScan() {
83 protected static Scan
createBatchScan() {
84 return new Scan().setBatch(1);
87 // set a small result size for testing flow control
88 protected static Scan
createSmallResultSizeScan() {
89 return new Scan().setMaxResultSize(1);
92 protected static Scan
createBatchSmallResultSizeScan() {
93 return new Scan().setBatch(1).setMaxResultSize(1);
96 protected static AsyncTable
<?
> getRawTable() {
97 return ASYNC_CONN
.getTable(TABLE_NAME
);
100 protected static AsyncTable
<?
> getTable() {
101 return ASYNC_CONN
.getTable(TABLE_NAME
, ForkJoinPool
.commonPool());
104 private static List
<Pair
<String
, Supplier
<Scan
>>> getScanCreator() {
105 return Arrays
.asList(Pair
.newPair("normal", AbstractTestAsyncTableScan
::createNormalScan
),
106 Pair
.newPair("batch", AbstractTestAsyncTableScan
::createBatchScan
),
107 Pair
.newPair("smallResultSize", AbstractTestAsyncTableScan
::createSmallResultSizeScan
),
108 Pair
.newPair("batchSmallResultSize",
109 AbstractTestAsyncTableScan
::createBatchSmallResultSizeScan
));
112 protected static List
<Object
[]> getScanCreatorParams() {
113 return getScanCreator().stream().map(p
-> new Object
[] { p
.getFirst(), p
.getSecond() })
114 .collect(Collectors
.toList());
117 private static List
<Pair
<String
, Supplier
<AsyncTable
<?
>>>> getTableCreator() {
118 return Arrays
.asList(Pair
.newPair("raw", AbstractTestAsyncTableScan
::getRawTable
),
119 Pair
.newPair("normal", AbstractTestAsyncTableScan
::getTable
));
122 protected static List
<Object
[]> getTableAndScanCreatorParams() {
123 List
<Pair
<String
, Supplier
<AsyncTable
<?
>>>> tableCreator
= getTableCreator();
124 List
<Pair
<String
, Supplier
<Scan
>>> scanCreator
= getScanCreator();
125 return tableCreator
.stream()
126 .flatMap(tp
-> scanCreator
.stream().map(
127 sp
-> new Object
[] { tp
.getFirst(), tp
.getSecond(), sp
.getFirst(), sp
.getSecond() }))
128 .collect(Collectors
.toList());
131 protected abstract Scan
createScan();
133 protected abstract List
<Result
> doScan(Scan scan
) throws Exception
;
135 protected final List
<Result
> convertFromBatchResult(List
<Result
> results
) {
136 assertTrue(results
.size() % 2 == 0);
137 return IntStream
.range(0, results
.size() / 2).mapToObj(i
-> {
140 .createCompleteResult(Arrays
.asList(results
.get(2 * i
), results
.get(2 * i
+ 1)));
141 } catch (IOException e
) {
142 throw new UncheckedIOException(e
);
144 }).collect(Collectors
.toList());
148 public void testScanAll() throws Exception
{
149 List
<Result
> results
= doScan(createScan());
150 // make sure all scanners are closed at RS side
151 TEST_UTIL
.getHBaseCluster().getRegionServerThreads().stream().map(t
-> t
.getRegionServer())
154 "The scanner count of " + rs
.getServerName() + " is " +
155 rs
.getRSRpcServices().getScannersCount(),
156 0, rs
.getRSRpcServices().getScannersCount()));
157 assertEquals(COUNT
, results
.size());
158 IntStream
.range(0, COUNT
).forEach(i
-> {
159 Result result
= results
.get(i
);
160 assertEquals(String
.format("%03d", i
), Bytes
.toString(result
.getRow()));
161 assertEquals(i
, Bytes
.toInt(result
.getValue(FAMILY
, CQ1
)));
165 private void assertResultEquals(Result result
, int i
) {
166 assertEquals(String
.format("%03d", i
), Bytes
.toString(result
.getRow()));
167 assertEquals(i
, Bytes
.toInt(result
.getValue(FAMILY
, CQ1
)));
168 assertEquals(i
* i
, Bytes
.toInt(result
.getValue(FAMILY
, CQ2
)));
172 public void testReversedScanAll() throws Exception
{
173 List
<Result
> results
= doScan(createScan().setReversed(true));
174 assertEquals(COUNT
, results
.size());
175 IntStream
.range(0, COUNT
).forEach(i
-> assertResultEquals(results
.get(i
), COUNT
- i
- 1));
// Scans forward from the row String.format("%03d", start) with no stop row set, then checks
// that COUNT - start results come back and that result i is the row for index start + i.
// NOTE(review): `start` is used below but is never declared in the visible text (the embedded
// numbering jumps from 179 to 181), and no test annotation is visible either. Restore both
// from the upstream file; the value of `start` cannot be derived from this text.
179 public void testScanNoStopKey() throws Exception
{
181 List
<Result
> results
=
182 doScan(createScan().withStartRow(Bytes
.toBytes(String
.format("%03d", start
))));
183 assertEquals(COUNT
- start
, results
.size());
184 IntStream
.range(0, COUNT
- start
).forEach(i
-> assertResultEquals(results
.get(i
), start
+ i
));
// Scans in reverse from the row String.format("%03d", start) with no stop row set, then checks
// that start + 1 results come back and that result i is the row for index start - i.
// NOTE(review): `start` is used below but is never declared in the visible text (the embedded
// numbering jumps from 188 to 190), and no test annotation is visible either. Restore both
// from the upstream file; the value of `start` cannot be derived from this text.
188 public void testReverseScanNoStopKey() throws Exception
{
190 List
<Result
> results
= doScan(
191 createScan().withStartRow(Bytes
.toBytes(String
.format("%03d", start
))).setReversed(true));
192 assertEquals(start
+ 1, results
.size());
193 IntStream
.range(0, start
+ 1).forEach(i
-> assertResultEquals(results
.get(i
), start
- i
));
197 public void testScanWrongColumnFamily() throws Exception
{
199 doScan(createScan().addFamily(Bytes
.toBytes("WrongColumnFamily")));
200 } catch (Exception e
) {
201 assertTrue(e
instanceof NoSuchColumnFamilyException
||
202 e
.getCause() instanceof NoSuchColumnFamilyException
);
206 private void testScan(int start
, boolean startInclusive
, int stop
, boolean stopInclusive
,
207 int limit
) throws Exception
{
209 createScan().withStartRow(Bytes
.toBytes(String
.format("%03d", start
)), startInclusive
)
210 .withStopRow(Bytes
.toBytes(String
.format("%03d", stop
)), stopInclusive
);
212 scan
.setLimit(limit
);
214 List
<Result
> results
= doScan(scan
);
215 int actualStart
= startInclusive ? start
: start
+ 1;
216 int actualStop
= stopInclusive ? stop
+ 1 : stop
;
217 int count
= actualStop
- actualStart
;
219 count
= Math
.min(count
, limit
);
221 assertEquals(count
, results
.size());
222 IntStream
.range(0, count
).forEach(i
-> assertResultEquals(results
.get(i
), actualStart
+ i
));
225 private void testReversedScan(int start
, boolean startInclusive
, int stop
, boolean stopInclusive
,
226 int limit
) throws Exception
{
228 createScan().withStartRow(Bytes
.toBytes(String
.format("%03d", start
)), startInclusive
)
229 .withStopRow(Bytes
.toBytes(String
.format("%03d", stop
)), stopInclusive
).setReversed(true);
231 scan
.setLimit(limit
);
233 List
<Result
> results
= doScan(scan
);
234 int actualStart
= startInclusive ? start
: start
- 1;
235 int actualStop
= stopInclusive ? stop
- 1 : stop
;
236 int count
= actualStart
- actualStop
;
238 count
= Math
.min(count
, limit
);
240 assertEquals(count
, results
.size());
241 IntStream
.range(0, count
).forEach(i
-> assertResultEquals(results
.get(i
), actualStart
- i
));
245 public void testScanWithStartKeyAndStopKey() throws Exception
{
246 testScan(1, true, 998, false, -1); // from first region to last region
247 testScan(123, true, 345, true, -1);
248 testScan(234, true, 456, false, -1);
249 testScan(345, false, 567, true, -1);
250 testScan(456, false, 678, false, -1);
254 public void testReversedScanWithStartKeyAndStopKey() throws Exception
{
255 testReversedScan(998, true, 1, false, -1); // from last region to first region
256 testReversedScan(543, true, 321, true, -1);
257 testReversedScan(654, true, 432, false, -1);
258 testReversedScan(765, false, 543, true, -1);
259 testReversedScan(876, false, 654, false, -1);
263 public void testScanAtRegionBoundary() throws Exception
{
264 testScan(222, true, 333, true, -1);
265 testScan(333, true, 444, false, -1);
266 testScan(444, false, 555, true, -1);
267 testScan(555, false, 666, false, -1);
271 public void testReversedScanAtRegionBoundary() throws Exception
{
272 testReversedScan(333, true, 222, true, -1);
273 testReversedScan(444, true, 333, false, -1);
274 testReversedScan(555, false, 444, true, -1);
275 testReversedScan(666, false, 555, false, -1);
279 public void testScanWithLimit() throws Exception
{
280 testScan(1, true, 998, false, 900); // from first region to last region
281 testScan(123, true, 234, true, 100);
282 testScan(234, true, 456, false, 100);
283 testScan(345, false, 567, true, 100);
284 testScan(456, false, 678, false, 100);
288 public void testScanWithLimitGreaterThanActualCount() throws Exception
{
289 testScan(1, true, 998, false, 1000); // from first region to last region
290 testScan(123, true, 345, true, 200);
291 testScan(234, true, 456, false, 200);
292 testScan(345, false, 567, true, 200);
293 testScan(456, false, 678, false, 200);
297 public void testReversedScanWithLimit() throws Exception
{
298 testReversedScan(998, true, 1, false, 900); // from last region to first region
299 testReversedScan(543, true, 321, true, 100);
300 testReversedScan(654, true, 432, false, 100);
301 testReversedScan(765, false, 543, true, 100);
302 testReversedScan(876, false, 654, false, 100);
306 public void testReversedScanWithLimitGreaterThanActualCount() throws Exception
{
307 testReversedScan(998, true, 1, false, 1000); // from last region to first region
308 testReversedScan(543, true, 321, true, 200);
309 testReversedScan(654, true, 432, false, 200);
310 testReversedScan(765, false, 543, true, 200);
311 testReversedScan(876, false, 654, false, 200);