/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.junit
.Assert
.assertEquals
;
22 import java
.util
.concurrent
.ForkJoinPool
;
23 import java
.util
.stream
.Collectors
;
24 import java
.util
.stream
.IntStream
;
25 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
26 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
27 import org
.apache
.hadoop
.hbase
.TableName
;
28 import org
.apache
.hadoop
.hbase
.Waiter
.ExplainingPredicate
;
29 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
30 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
31 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
32 import org
.junit
.AfterClass
;
33 import org
.junit
.BeforeClass
;
34 import org
.junit
.ClassRule
;
35 import org
.junit
.Test
;
36 import org
.junit
.experimental
.categories
.Category
;
38 @Category({ MediumTests
.class, ClientTests
.class })
39 public class TestAsyncTableScannerCloseWhileSuspending
{
42 public static final HBaseClassTestRule CLASS_RULE
=
43 HBaseClassTestRule
.forClass(TestAsyncTableScannerCloseWhileSuspending
.class);
45 private static final HBaseTestingUtility TEST_UTIL
= new HBaseTestingUtility();
47 private static TableName TABLE_NAME
= TableName
.valueOf("async");
49 private static byte[] FAMILY
= Bytes
.toBytes("cf");
51 private static byte[] CQ
= Bytes
.toBytes("cq");
53 private static AsyncConnection CONN
;
55 private static AsyncTable
<?
> TABLE
;
58 public static void setUp() throws Exception
{
59 TEST_UTIL
.startMiniCluster(1);
60 TEST_UTIL
.createTable(TABLE_NAME
, FAMILY
);
61 CONN
= ConnectionFactory
.createAsyncConnection(TEST_UTIL
.getConfiguration()).get();
62 TABLE
= CONN
.getTable(TABLE_NAME
, ForkJoinPool
.commonPool());
63 TABLE
.putAll(IntStream
.range(0, 100).mapToObj(
64 i
-> new Put(Bytes
.toBytes(String
.format("%02d", i
))).addColumn(FAMILY
, CQ
, Bytes
.toBytes(i
)))
65 .collect(Collectors
.toList())).get();
69 public static void tearDown() throws Exception
{
71 TEST_UTIL
.shutdownMiniCluster();
74 private int getScannersCount() {
75 return TEST_UTIL
.getHBaseCluster().getRegionServerThreads().stream()
76 .map(t
-> t
.getRegionServer()).mapToInt(rs
-> rs
.getRSRpcServices().getScannersCount())
81 public void testCloseScannerWhileSuspending() throws Exception
{
82 try (ResultScanner scanner
= TABLE
.getScanner(new Scan().setMaxResultSize(1))) {
83 TEST_UTIL
.waitFor(10000, 100, new ExplainingPredicate
<Exception
>() {
86 public boolean evaluate() throws Exception
{
87 return ((AsyncTableResultScanner
) scanner
).isSuspended();
91 public String
explainFailure() throws Exception
{
92 return "The given scanner has been suspended in time";
95 assertEquals(1, getScannersCount());
97 TEST_UTIL
.waitFor(10000, 100, new ExplainingPredicate
<Exception
>() {
100 public boolean evaluate() throws Exception
{
101 return getScannersCount() == 0;
105 public String
explainFailure() throws Exception
{
106 return "Still have " + getScannersCount() + " scanners opened";