2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.junit
.Assert
.assertTrue
;
22 import java
.io
.IOException
;
23 import org
.apache
.hadoop
.conf
.Configuration
;
24 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
25 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
26 import org
.apache
.hadoop
.hbase
.HConstants
;
27 import org
.apache
.hadoop
.hbase
.SingleProcessHBaseCluster
.MiniHBaseClusterRegionServer
;
28 import org
.apache
.hadoop
.hbase
.TableName
;
29 import org
.apache
.hadoop
.hbase
.regionserver
.HRegionServer
;
30 import org
.apache
.hadoop
.hbase
.regionserver
.RSRpcServices
;
31 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
32 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
33 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
34 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
35 import org
.junit
.AfterClass
;
36 import org
.junit
.BeforeClass
;
37 import org
.junit
.ClassRule
;
38 import org
.junit
.Rule
;
39 import org
.junit
.Test
;
40 import org
.junit
.experimental
.categories
.Category
;
41 import org
.junit
.rules
.TestName
;
42 import org
.slf4j
.Logger
;
43 import org
.slf4j
.LoggerFactory
;
45 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.RpcController
;
46 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.ServiceException
;
48 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.ScanRequest
;
49 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.ScanResponse
;
52 * Test the scenario where an HRegionServer#scan() call times out on the client side while
53 * scanning and is retried. This scenario should not result in any data being skipped on the RS side.
55 @Category({MediumTests
.class, ClientTests
.class})
56 public class TestClientScannerRPCTimeout
{
// Class-level rule object obtained from HBaseClassTestRule.forClass for this test class.
59 public static final HBaseClassTestRule CLASS_RULE
=
60 HBaseClassTestRule
.forClass(TestClientScannerRPCTimeout
.class);
// Logger shared by the test method and the nested RPC-services class.
62 private static final Logger LOG
= LoggerFactory
.getLogger(TestClientScannerRPCTimeout
.class);
// Testing utility used to get the Configuration, start/stop the mini cluster and create the table.
63 private final static HBaseTestingUtil TEST_UTIL
= new HBaseTestingUtil();
// Column family, qualifier and value used by putToTable when building each Put.
64 private static final byte[] FAMILY
= Bytes
.toBytes("testFamily");
65 private static final byte[] QUALIFIER
= Bytes
.toBytes("testQualifier");
66 private static final byte[] VALUE
= Bytes
.toBytes("testValue");
// Value stored under HConstants.HBASE_RPC_TIMEOUT_KEY in setUpBeforeClass. The server-side scan
// override passes (rpcTimeout + 500) to Thread.sleep, i.e. it is used as milliseconds there.
67 private static final int rpcTimeout
= 2 * 1000;
// Value stored under HConstants.HBASE_CLIENT_RETRIES_NUMBER; also the upper bound asserted on
// RSRpcServicesWithScanTimeout.tryNumber at the end of the test.
68 private static final int CLIENT_RETRIES_NUMBER
= 3;
// Supplies the current test method name, which the test uses as the table name.
71 public TestName name
= new TestName();
// Configures the shared Configuration for this test class and then starts the mini cluster.
// The settings applied are: a longer region server report interval, the RPC timeout (rpcTimeout),
// the custom region server implementation, the client retry count and the client pause.
74 public static void setUpBeforeClass() throws Exception
{
75 Configuration conf
= TEST_UTIL
.getConfiguration();
76 // Don't report so often so easier to see other rpcs
77 conf
.setInt("hbase.regionserver.msginterval", 3 * 10000);
78 conf
.setInt(HConstants
.HBASE_RPC_TIMEOUT_KEY
, rpcTimeout
);
// Use RegionServerWithScanTimeout as the region server class; it creates
// RSRpcServicesWithScanTimeout, whose scan() can sleep longer than rpcTimeout.
79 conf
.setStrings(HConstants
.REGION_SERVER_IMPL
, RegionServerWithScanTimeout
.class.getName());
80 conf
.setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
, CLIENT_RETRIES_NUMBER
);
// NOTE(review): 1000 is presumably milliseconds between client retries -- confirm against
// the HBASE_CLIENT_PAUSE documentation.
81 conf
.setInt(HConstants
.HBASE_CLIENT_PAUSE
, 1000);
82 TEST_UTIL
.startMiniCluster(1);
// Shuts down the mini cluster that setUpBeforeClass started through TEST_UTIL.
86 public static void tearDownAfterClass() throws Exception
{
87 TEST_UTIL
.shutdownMiniCluster();
// Scans a table named after this test method and checks that rows row-0 .. row-3 come back in
// order even though the server-side scan is made to sleep past the RPC timeout for one call.
// Afterwards it opens a second scanner with sleepAlways enabled and checks that the number of
// attempts recorded in RSRpcServicesWithScanTimeout.tryNumber stays within CLIENT_RETRIES_NUMBER.
91 public void testScannerNextRPCTimesout() throws Exception
{
92 final TableName tableName
= TableName
.valueOf(name
.getMethodName());
93 Table ht
= TEST_UTIL
.createTable(tableName
, FAMILY
);
// Row keys expected back from the scanner, in this order.
94 byte[] r0
= Bytes
.toBytes("row-0");
95 byte[] r1
= Bytes
.toBytes("row-1");
96 byte[] r2
= Bytes
.toBytes("row-2");
97 byte[] r3
= Bytes
.toBytes("row-3");
// NOTE(review): the statements that write the rows are not visible in this view. Four row keys
// (r0..r3) are declared above while the message below says "three" -- confirm and fix the text.
102 LOG
.info("Wrote our three values");
// The server-side scan() sleeps (rpcTimeout + 500) once, on the request whose next-call
// sequence number equals this value.
103 RSRpcServicesWithScanTimeout
.seqNoToSleepOn
= 1;
104 Scan scan
= new Scan();
106 ResultScanner scanner
= ht
.getScanner(scan
);
107 Result result
= scanner
.next();
108 // fetched when the scanner was opened
109 assertTrue("Expected row: row-0", Bytes
.equals(r0
, result
.getRow()));
110 result
= scanner
.next();
111 assertTrue("Expected row: row-1", Bytes
.equals(r1
, result
.getRow()));
// NOTE(review): two rows have been verified at this point although the message says "first row".
112 LOG
.info("Got expected first row");
// Time the next call: it is expected to take longer than rpcTimeout and still return row-2,
// i.e. no row may be skipped.
113 long t1
= EnvironmentEdgeManager
.currentTime();
114 result
= scanner
.next();
115 assertTrue((EnvironmentEdgeManager
.currentTime() - t1
) > rpcTimeout
);
116 assertTrue("Expected row: row-2", Bytes
.equals(r2
, result
.getRow()));
117 RSRpcServicesWithScanTimeout
.seqNoToSleepOn
= -1;// No need of sleep
118 result
= scanner
.next();
119 assertTrue("Expected row: row-3", Bytes
.equals(r3
, result
.getRow()));
122 // test the case where the RPC always times out
123 scanner
= ht
.getScanner(scan
);
// With sleepAlways set, the server-side scan() sleeps on every call for the tracked scanner;
// tryNumber is reset so the attempts for this scanner can be checked below.
124 RSRpcServicesWithScanTimeout
.sleepAlways
= true;
125 RSRpcServicesWithScanTimeout
.tryNumber
= 0;
127 result
= scanner
.next();
128 } catch (IOException ioe
) {
129 // catch the exception raised after the maximum number of retries
130 LOG
.info("Failed after maximal attempts=" + CLIENT_RETRIES_NUMBER
, ioe
);
// NOTE(review): where tryNumber is incremented is not visible in this view; the check only
// bounds it from above by CLIENT_RETRIES_NUMBER.
132 assertTrue("Expected maximal try number=" + CLIENT_RETRIES_NUMBER
133 + ", actual =" + RSRpcServicesWithScanTimeout
.tryNumber
,
134 RSRpcServicesWithScanTimeout
.tryNumber
<= CLIENT_RETRIES_NUMBER
);
// Builds a Put for the given row key carrying a single cell: FAMILY / QUALIFIER -> VALUE.
// ht is the target table; rowkey is the row to write.
// NOTE(review): the statement that hands the Put to ht is not visible in this view -- presumably
// it follows the addColumn call; confirm against the full file.
137 private void putToTable(Table ht
, byte[] rowkey
) throws IOException
{
138 Put put
= new Put(rowkey
);
139 put
.addColumn(FAMILY
, QUALIFIER
, VALUE
);
// Region server subclass registered under HConstants.REGION_SERVER_IMPL by setUpBeforeClass.
// Its createRpcServices() returns an RSRpcServicesWithScanTimeout bound to this server, so scan
// requests go through the delaying scan() implementation.
143 private static class RegionServerWithScanTimeout
extends MiniHBaseClusterRegionServer
{
// Constructor taking the cluster Configuration.
// NOTE(review): the constructor body is not visible in this view.
144 public RegionServerWithScanTimeout(Configuration conf
)
145 throws IOException
, InterruptedException
{
// Supplies the RPC services object for this region server.
150 protected RSRpcServices
createRpcServices() throws IOException
{
151 return new RSRpcServicesWithScanTimeout(this);
155 private static class RSRpcServicesWithScanTimeout
extends RSRpcServices
{
156 private long tableScannerId
;
157 private boolean slept
;
158 private static long seqNoToSleepOn
= -1;
159 private static boolean sleepAlways
= false;
160 private static int tryNumber
= 0;
162 public RSRpcServicesWithScanTimeout(HRegionServer rs
)
168 public ScanResponse
scan(final RpcController controller
, final ScanRequest request
)
169 throws ServiceException
{
170 if (request
.hasScannerId()) {
171 ScanResponse scanResponse
= super.scan(controller
, request
);
172 if (this.tableScannerId
== request
.getScannerId() &&
173 (sleepAlways
|| (!slept
&& seqNoToSleepOn
== request
.getNextCallSeq()))) {
175 LOG
.info("SLEEPING " + (rpcTimeout
+ 500));
176 Thread
.sleep(rpcTimeout
+ 500);
177 } catch (InterruptedException e
) {
181 if (tryNumber
> 2 * CLIENT_RETRIES_NUMBER
) {
187 ScanResponse scanRes
= super.scan(controller
, request
);
188 String regionName
= Bytes
.toString(request
.getRegion().getValue().toByteArray());
189 if (!regionName
.contains(TableName
.META_TABLE_NAME
.getNameAsString())) {
190 tableScannerId
= scanRes
.getScannerId();