2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
;
20 import static org
.hamcrest
.CoreMatchers
.instanceOf
;
21 import static org
.hamcrest
.MatcherAssert
.assertThat
;
22 import static org
.junit
.Assert
.fail
;
24 import java
.io
.IOException
;
25 import java
.net
.SocketTimeoutException
;
26 import java
.util
.Arrays
;
27 import java
.util
.List
;
28 import org
.apache
.hadoop
.conf
.Configuration
;
29 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
30 import org
.apache
.hadoop
.hbase
.client
.Connection
;
31 import org
.apache
.hadoop
.hbase
.client
.ConnectionFactory
;
32 import org
.apache
.hadoop
.hbase
.client
.Get
;
33 import org
.apache
.hadoop
.hbase
.client
.Put
;
34 import org
.apache
.hadoop
.hbase
.client
.RegionLocator
;
35 import org
.apache
.hadoop
.hbase
.client
.ResultScanner
;
36 import org
.apache
.hadoop
.hbase
.client
.RetriesExhaustedException
;
37 import org
.apache
.hadoop
.hbase
.client
.Scan
;
38 import org
.apache
.hadoop
.hbase
.client
.Table
;
39 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
40 import org
.apache
.hadoop
.hbase
.ipc
.CallTimeoutException
;
41 import org
.apache
.hadoop
.hbase
.regionserver
.HRegionServer
;
42 import org
.apache
.hadoop
.hbase
.regionserver
.RSRpcServices
;
43 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
44 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
45 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
46 import org
.junit
.AfterClass
;
47 import org
.junit
.Before
;
48 import org
.junit
.BeforeClass
;
49 import org
.junit
.ClassRule
;
50 import org
.junit
.Test
;
51 import org
.junit
.experimental
.categories
.Category
;
52 import org
.slf4j
.Logger
;
53 import org
.slf4j
.LoggerFactory
;
55 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.io
.Closeables
;
56 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.RpcController
;
57 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.ServiceException
;
59 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
;
/**
 * These tests verify that the RPC timeouts ('hbase.client.operation.timeout' and
 * 'hbase.client.scanner.timeout.period') work correctly using a modified Region Server which
 * injects delays into get, scan and mutate operations.
 * <p>
 * When 'hbase.client.operation.timeout' is set and a client operation is not completed in time,
 * the client retries the operation up to 'hbase.client.retries.number' times. After that a
 * {@link RetriesExhaustedException} is thrown; the tests in this class expect its cause to be a
 * {@link CallTimeoutException} rather than a raw {@link SocketTimeoutException}.
 * <p>
 * Using the 'hbase.client.scanner.timeout.period' configuration property, similar behavior can
 * be specified for scan related operations such as openScanner() and next(). If those time out,
 * a {@link RetriesExhaustedException} is thrown as well.
 */
74 @Category({ ClientTests
.class, MediumTests
.class })
75 public class TestClientOperationTimeout
{
77 private static final Logger LOG
= LoggerFactory
.getLogger(TestClientOperationTimeout
.class);
80 public static final HBaseClassTestRule CLASS_RULE
=
81 HBaseClassTestRule
.forClass(TestClientOperationTimeout
.class);
83 private static final HBaseTestingUtil UTIL
= new HBaseTestingUtil();
85 // Activate the delays after table creation to test get/scan/put
86 private static int DELAY_GET
;
87 private static int DELAY_SCAN
;
88 private static int DELAY_MUTATE
;
89 private static int DELAY_BATCH_MUTATE
;
91 private static final TableName TABLE_NAME
= TableName
.valueOf("Timeout");
92 private static final byte[] FAMILY
= Bytes
.toBytes("family");
93 private static final byte[] ROW
= Bytes
.toBytes("row");
94 private static final byte[] QUALIFIER
= Bytes
.toBytes("qualifier");
95 private static final byte[] VALUE
= Bytes
.toBytes("value");
97 private static Connection CONN
;
98 private static Table TABLE
;
101 public static void setUp() throws Exception
{
102 // Set RegionServer class and use default values for other options.
103 StartTestingClusterOption option
=
104 StartTestingClusterOption
.builder().rsClass(DelayedRegionServer
.class).build();
105 UTIL
.startMiniCluster(option
);
106 UTIL
.getAdmin().createTable(TableDescriptorBuilder
.newBuilder(TABLE_NAME
)
107 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(FAMILY
)).build());
109 Configuration conf
= new Configuration(UTIL
.getConfiguration());
110 conf
.setLong(HConstants
.HBASE_CLIENT_OPERATION_TIMEOUT
, 500);
111 conf
.setLong(HConstants
.HBASE_CLIENT_META_OPERATION_TIMEOUT
, 500);
112 conf
.setLong(HConstants
.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD
, 500);
113 conf
.setLong(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
, 1);
114 CONN
= ConnectionFactory
.createConnection(conf
);
115 TABLE
= CONN
.getTable(TABLE_NAME
);
119 public static void tearDown() throws Exception
{
120 Closeables
.close(TABLE
, true);
121 Closeables
.close(CONN
, true);
122 UTIL
.shutdownMiniCluster();
126 public void setUpBeforeTest() throws Exception
{
130 DELAY_BATCH_MUTATE
= 0;
134 * Tests that a get on a table throws {@link RetriesExhaustedException} when the operation takes
135 * longer than 'hbase.client.operation.timeout'.
138 public void testGetTimeout() {
141 TABLE
.get(new Get(ROW
));
142 fail("should not reach here");
143 } catch (Exception e
) {
144 LOG
.info("Got exception for get", e
);
145 assertThat(e
, instanceOf(RetriesExhaustedException
.class));
146 assertThat(e
.getCause(), instanceOf(CallTimeoutException
.class));
151 * Tests that a put on a table throws {@link RetriesExhaustedException} when the operation takes
152 * longer than 'hbase.client.operation.timeout'.
155 public void testPutTimeout() {
157 Put put
= new Put(ROW
);
158 put
.addColumn(FAMILY
, QUALIFIER
, VALUE
);
161 fail("should not reach here");
162 } catch (Exception e
) {
163 LOG
.info("Got exception for put", e
);
164 assertThat(e
, instanceOf(RetriesExhaustedException
.class));
165 assertThat(e
.getCause(), instanceOf(CallTimeoutException
.class));
170 * Tests that a batch mutate on a table throws {@link RetriesExhaustedException} when the
171 * operation takes longer than 'hbase.client.operation.timeout'.
174 public void testMultiPutsTimeout() {
175 DELAY_BATCH_MUTATE
= 600;
176 Put put1
= new Put(ROW
).addColumn(FAMILY
, QUALIFIER
, VALUE
);
177 Put put2
= new Put(ROW
).addColumn(FAMILY
, QUALIFIER
, VALUE
);
178 List
<Put
> puts
= Arrays
.asList(put1
, put2
);
180 TABLE
.batch(puts
, new Object
[2]);
181 fail("should not reach here");
182 } catch (Exception e
) {
183 LOG
.info("Got exception for batch", e
);
184 assertThat(e
, instanceOf(RetriesExhaustedException
.class));
185 assertThat(e
.getCause(), instanceOf(RetriesExhaustedException
.class));
186 assertThat(e
.getCause().getCause(), instanceOf(CallTimeoutException
.class));
191 * Tests that scan on a table throws {@link RetriesExhaustedException} when the operation takes
192 * longer than 'hbase.client.scanner.timeout.period'.
195 public void testScanTimeout() throws IOException
, InterruptedException
{
196 // cache the region location.
197 try (RegionLocator locator
= TABLE
.getRegionLocator()) {
198 locator
.getRegionLocation(HConstants
.EMPTY_BYTE_ARRAY
);
200 // sleep a bit to make sure the location has been cached as it is an async operation.
203 try (ResultScanner scanner
= TABLE
.getScanner(new Scan())) {
205 fail("should not reach here");
206 } catch (Exception e
) {
207 LOG
.info("Got exception for scan", e
);
208 assertThat(e
, instanceOf(RetriesExhaustedException
.class));
209 assertThat(e
.getCause(), instanceOf(CallTimeoutException
.class));
213 public static final class DelayedRegionServer
214 extends SingleProcessHBaseCluster
.MiniHBaseClusterRegionServer
{
215 public DelayedRegionServer(Configuration conf
) throws IOException
, InterruptedException
{
220 protected RSRpcServices
createRpcServices() throws IOException
{
221 return new DelayedRSRpcServices(this);
226 * This {@link RSRpcServices} class injects delay for Rpc calls and after executes super methods.
228 private static final class DelayedRSRpcServices
extends RSRpcServices
{
229 DelayedRSRpcServices(HRegionServer rs
) throws IOException
{
234 public ClientProtos
.GetResponse
get(RpcController controller
, ClientProtos
.GetRequest request
)
235 throws ServiceException
{
237 Thread
.sleep(DELAY_GET
);
238 } catch (InterruptedException e
) {
239 LOG
.error("Sleep interrupted during get operation", e
);
241 return super.get(controller
, request
);
245 public ClientProtos
.MutateResponse
mutate(RpcController rpcc
,
246 ClientProtos
.MutateRequest request
) throws ServiceException
{
248 Thread
.sleep(DELAY_MUTATE
);
249 } catch (InterruptedException e
) {
250 LOG
.error("Sleep interrupted during mutate operation", e
);
252 return super.mutate(rpcc
, request
);
256 public ClientProtos
.ScanResponse
scan(RpcController controller
,
257 ClientProtos
.ScanRequest request
) throws ServiceException
{
259 Thread
.sleep(DELAY_SCAN
);
260 } catch (InterruptedException e
) {
261 LOG
.error("Sleep interrupted during scan operation", e
);
263 return super.scan(controller
, request
);
267 public ClientProtos
.MultiResponse
multi(RpcController rpcc
, ClientProtos
.MultiRequest request
)
268 throws ServiceException
{
270 Thread
.sleep(DELAY_BATCH_MUTATE
);
271 } catch (InterruptedException e
) {
272 LOG
.error("Sleep interrupted during multi operation", e
);
274 return super.multi(rpcc
, request
);