HBASE-26481 Consider rolling upgrading from old region replication framework (#3880)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / client / TestClientScannerRPCTimeout.java
blobe58a66ffebc9a43a2ab4096d4269e642715b4606
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static org.junit.Assert.assertTrue;
22 import java.io.IOException;
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.hbase.HBaseClassTestRule;
25 import org.apache.hadoop.hbase.HBaseTestingUtil;
26 import org.apache.hadoop.hbase.HConstants;
27 import org.apache.hadoop.hbase.SingleProcessHBaseCluster.MiniHBaseClusterRegionServer;
28 import org.apache.hadoop.hbase.TableName;
29 import org.apache.hadoop.hbase.regionserver.HRegionServer;
30 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
31 import org.apache.hadoop.hbase.testclassification.ClientTests;
32 import org.apache.hadoop.hbase.testclassification.MediumTests;
33 import org.apache.hadoop.hbase.util.Bytes;
34 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
35 import org.junit.AfterClass;
36 import org.junit.BeforeClass;
37 import org.junit.ClassRule;
38 import org.junit.Rule;
39 import org.junit.Test;
40 import org.junit.experimental.categories.Category;
41 import org.junit.rules.TestName;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
45 import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
46 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
48 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
49 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
51 /**
52 * Test the scenario where a HRegionServer#scan() call, while scanning, timeout at client side and
53 * getting retried. This scenario should not result in some data being skipped at RS side.
55 @Category({MediumTests.class, ClientTests.class})
56 public class TestClientScannerRPCTimeout {
58 @ClassRule
59 public static final HBaseClassTestRule CLASS_RULE =
60 HBaseClassTestRule.forClass(TestClientScannerRPCTimeout.class);
62 private static final Logger LOG = LoggerFactory.getLogger(TestClientScannerRPCTimeout.class);
63 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
64 private static final byte[] FAMILY = Bytes.toBytes("testFamily");
65 private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
66 private static final byte[] VALUE = Bytes.toBytes("testValue");
67 private static final int rpcTimeout = 2 * 1000;
68 private static final int CLIENT_RETRIES_NUMBER = 3;
70 @Rule
71 public TestName name = new TestName();
73 @BeforeClass
74 public static void setUpBeforeClass() throws Exception {
75 Configuration conf = TEST_UTIL.getConfiguration();
76 // Don't report so often so easier to see other rpcs
77 conf.setInt("hbase.regionserver.msginterval", 3 * 10000);
78 conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout);
79 conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName());
80 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER);
81 conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1000);
82 TEST_UTIL.startMiniCluster(1);
85 @AfterClass
86 public static void tearDownAfterClass() throws Exception {
87 TEST_UTIL.shutdownMiniCluster();
90 @Test
91 public void testScannerNextRPCTimesout() throws Exception {
92 final TableName tableName = TableName.valueOf(name.getMethodName());
93 Table ht = TEST_UTIL.createTable(tableName, FAMILY);
94 byte[] r0 = Bytes.toBytes("row-0");
95 byte[] r1 = Bytes.toBytes("row-1");
96 byte[] r2 = Bytes.toBytes("row-2");
97 byte[] r3 = Bytes.toBytes("row-3");
98 putToTable(ht, r0);
99 putToTable(ht, r1);
100 putToTable(ht, r2);
101 putToTable(ht, r3);
102 LOG.info("Wrote our three values");
103 RSRpcServicesWithScanTimeout.seqNoToSleepOn = 1;
104 Scan scan = new Scan();
105 scan.setCaching(1);
106 ResultScanner scanner = ht.getScanner(scan);
107 Result result = scanner.next();
108 // fetched when openScanner
109 assertTrue("Expected row: row-0", Bytes.equals(r0, result.getRow()));
110 result = scanner.next();
111 assertTrue("Expected row: row-1", Bytes.equals(r1, result.getRow()));
112 LOG.info("Got expected first row");
113 long t1 = EnvironmentEdgeManager.currentTime();
114 result = scanner.next();
115 assertTrue((EnvironmentEdgeManager.currentTime() - t1) > rpcTimeout);
116 assertTrue("Expected row: row-2", Bytes.equals(r2, result.getRow()));
117 RSRpcServicesWithScanTimeout.seqNoToSleepOn = -1;// No need of sleep
118 result = scanner.next();
119 assertTrue("Expected row: row-3", Bytes.equals(r3, result.getRow()));
120 scanner.close();
122 // test the case that RPC is always timesout
123 scanner = ht.getScanner(scan);
124 RSRpcServicesWithScanTimeout.sleepAlways = true;
125 RSRpcServicesWithScanTimeout.tryNumber = 0;
126 try {
127 result = scanner.next();
128 } catch (IOException ioe) {
129 // catch the exception after max retry number
130 LOG.info("Failed after maximal attempts=" + CLIENT_RETRIES_NUMBER, ioe);
132 assertTrue("Expected maximal try number=" + CLIENT_RETRIES_NUMBER
133 + ", actual =" + RSRpcServicesWithScanTimeout.tryNumber,
134 RSRpcServicesWithScanTimeout.tryNumber <= CLIENT_RETRIES_NUMBER);
137 private void putToTable(Table ht, byte[] rowkey) throws IOException {
138 Put put = new Put(rowkey);
139 put.addColumn(FAMILY, QUALIFIER, VALUE);
140 ht.put(put);
143 private static class RegionServerWithScanTimeout extends MiniHBaseClusterRegionServer {
144 public RegionServerWithScanTimeout(Configuration conf)
145 throws IOException, InterruptedException {
146 super(conf);
149 @Override
150 protected RSRpcServices createRpcServices() throws IOException {
151 return new RSRpcServicesWithScanTimeout(this);
155 private static class RSRpcServicesWithScanTimeout extends RSRpcServices {
156 private long tableScannerId;
157 private boolean slept;
158 private static long seqNoToSleepOn = -1;
159 private static boolean sleepAlways = false;
160 private static int tryNumber = 0;
162 public RSRpcServicesWithScanTimeout(HRegionServer rs)
163 throws IOException {
164 super(rs);
167 @Override
168 public ScanResponse scan(final RpcController controller, final ScanRequest request)
169 throws ServiceException {
170 if (request.hasScannerId()) {
171 ScanResponse scanResponse = super.scan(controller, request);
172 if (this.tableScannerId == request.getScannerId() &&
173 (sleepAlways || (!slept && seqNoToSleepOn == request.getNextCallSeq()))) {
174 try {
175 LOG.info("SLEEPING " + (rpcTimeout + 500));
176 Thread.sleep(rpcTimeout + 500);
177 } catch (InterruptedException e) {
179 slept = true;
180 tryNumber++;
181 if (tryNumber > 2 * CLIENT_RETRIES_NUMBER) {
182 sleepAlways = false;
185 return scanResponse;
186 } else {
187 ScanResponse scanRes = super.scan(controller, request);
188 String regionName = Bytes.toString(request.getRegion().getValue().toByteArray());
189 if (!regionName.contains(TableName.META_TABLE_NAME.getNameAsString())) {
190 tableScannerId = scanRes.getScannerId();
192 return scanRes;