2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.junit
.Assert
.assertEquals
;
22 import java
.io
.IOException
;
23 import java
.util
.List
;
24 import java
.util
.Optional
;
25 import java
.util
.concurrent
.atomic
.AtomicLong
;
26 import org
.apache
.hadoop
.conf
.Configuration
;
27 import org
.apache
.hadoop
.hbase
.Cell
;
28 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
29 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
30 import org
.apache
.hadoop
.hbase
.HConstants
;
31 import org
.apache
.hadoop
.hbase
.HTableDescriptor
;
32 import org
.apache
.hadoop
.hbase
.TableName
;
33 import org
.apache
.hadoop
.hbase
.coprocessor
.ObserverContext
;
34 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessor
;
35 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessorEnvironment
;
36 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionObserver
;
37 import org
.apache
.hadoop
.hbase
.ipc
.ServerTooBusyException
;
38 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
39 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
40 import org
.apache
.hadoop
.hbase
.util
.Threads
;
41 import org
.apache
.hadoop
.hbase
.wal
.WALEdit
;
42 import org
.junit
.AfterClass
;
43 import org
.junit
.BeforeClass
;
44 import org
.junit
.ClassRule
;
45 import org
.junit
.Rule
;
46 import org
.junit
.Test
;
47 import org
.junit
.experimental
.categories
.Category
;
48 import org
.junit
.rules
.TestName
;
/**
 * This class tests the ServerTooBusyException behavior of HBaseConnectionManager.
 * Be careful when adding tests to this class: it sets a low
 * HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD.
 */
55 @Category({LargeTests
.class})
56 public class TestServerBusyException
{
59 public static final HBaseClassTestRule CLASS_RULE
=
60 HBaseClassTestRule
.forClass(TestServerBusyException
.class);
62 private final static HBaseTestingUtility TEST_UTIL
= new HBaseTestingUtility();
63 private static final byte[] FAM_NAM
= Bytes
.toBytes("f");
64 private static final byte[] ROW
= Bytes
.toBytes("bbb");
65 private static final int RPC_RETRY
= 5;
68 public TestName name
= new TestName();
70 public static class SleepCoprocessor
implements RegionCoprocessor
, RegionObserver
{
71 public static final int SLEEP_TIME
= 5000;
73 public Optional
<RegionObserver
> getRegionObserver() {
74 return Optional
.of(this);
78 public void preGetOp(final ObserverContext
<RegionCoprocessorEnvironment
> e
,
79 final Get get
, final List
<Cell
> results
) throws IOException
{
80 Threads
.sleep(SLEEP_TIME
);
84 public void prePut(final ObserverContext
<RegionCoprocessorEnvironment
> e
,
85 final Put put
, final WALEdit edit
, final Durability durability
) throws IOException
{
86 Threads
.sleep(SLEEP_TIME
);
90 public Result
preIncrement(final ObserverContext
<RegionCoprocessorEnvironment
> e
,
91 final Increment increment
) throws IOException
{
92 Threads
.sleep(SLEEP_TIME
);
97 public void preDelete(final ObserverContext
<RegionCoprocessorEnvironment
> e
, final Delete delete
,
98 final WALEdit edit
, final Durability durability
) throws IOException
{
99 Threads
.sleep(SLEEP_TIME
);
104 public static class SleepLongerAtFirstCoprocessor
implements RegionCoprocessor
, RegionObserver
{
105 public static final int SLEEP_TIME
= 2000;
106 static final AtomicLong ct
= new AtomicLong(0);
109 public Optional
<RegionObserver
> getRegionObserver() {
110 return Optional
.of(this);
114 public void preGetOp(final ObserverContext
<RegionCoprocessorEnvironment
> e
,
115 final Get get
, final List
<Cell
> results
) throws IOException
{
116 // After first sleep, all requests are timeout except the last retry. If we handle
117 // all the following requests, finally the last request is also timeout. If we drop all
118 // timeout requests, we can handle the last request immediately and it will not timeout.
119 if (ct
.incrementAndGet() <= 1) {
120 Threads
.sleep(SLEEP_TIME
* RPC_RETRY
* 2);
122 Threads
.sleep(SLEEP_TIME
);
128 public static void setUpBeforeClass() throws Exception
{
129 TEST_UTIL
.getConfiguration().setBoolean(HConstants
.STATUS_PUBLISHED
, true);
130 // Up the handlers; this test needs more than usual.
131 TEST_UTIL
.getConfiguration().setInt(HConstants
.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT
, 10);
132 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
, RPC_RETRY
);
133 // simulate queue blocking in testDropTimeoutRequest
134 TEST_UTIL
.getConfiguration().setInt(HConstants
.REGION_SERVER_HANDLER_COUNT
, 1);
135 // Needed by the server busy test.
136 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD
, 3);
137 TEST_UTIL
.startMiniCluster(2);
140 @AfterClass public static void tearDownAfterClass() throws Exception
{
141 TEST_UTIL
.shutdownMiniCluster();
// Worker thread that builds a single-cell Put for ROW in family FAM_NAM and records
// whether a ServerTooBusyException was caught while doing its work.
144 private static class TestPutThread
extends Thread
{
// Set to 1 when a ServerTooBusyException is caught, 0 otherwise. Kept as an int so the
// test can add the flags of several threads together.
146 int getServerBusyException
= 0;
// NOTE(review): the constructor body and the header of the method/try block that follows
// are not visible in this view; presumably `table` is stored and later used to issue the
// Put -- confirm.
148 TestPutThread(Table table
){
// Build a Put on ROW with one cell: qualifier {0}, value {0}.
155 Put p
= new Put(ROW
);
156 p
.addColumn(FAM_NAM
, new byte[] { 0 }, new byte[] { 0 });
// NOTE(review): the statement that sends `p` is not visible in this view.
// A busy-server rejection is the outcome this thread reports.
158 } catch (ServerTooBusyException e
) {
159 getServerBusyException
= 1;
// Other IOExceptions are caught under the name `ignore`; the handler body is not visible here.
160 } catch (IOException ignore
) {
// Worker thread that builds a single-column Get for ROW in family FAM_NAM and records
// whether a ServerTooBusyException was caught while doing its work.
165 private static class TestGetThread
extends Thread
{
// Set to 1 when a ServerTooBusyException is caught, 0 otherwise. Kept as an int so the
// test can add the flags of several threads together.
167 int getServerBusyException
= 0;
// NOTE(review): the constructor body and the header of the method/try block that follows
// are not visible in this view; presumably `table` is stored and later used to issue the
// Get -- confirm.
169 TestGetThread(Table table
){
// Build a Get on ROW restricted to family FAM_NAM, qualifier {0}.
176 Get g
= new Get(ROW
);
177 g
.addColumn(FAM_NAM
, new byte[] { 0 });
// NOTE(review): the statement that sends `g` is not visible in this view.
// A busy-server rejection is the outcome this thread reports.
179 } catch (ServerTooBusyException e
) {
180 getServerBusyException
= 1;
// Other IOExceptions are caught under the name `ignore`; the handler body is not visible here.
181 } catch (IOException ignore
) {
// Creates a table (named after the running test method) with SleepCoprocessor attached,
// builds five TestGetThread workers and later five TestPutThread workers against it, and
// adds up the getServerBusyException flag of each group.
// NOTE(review): the declarations of tg1..tg5 / tp1..tp5, the thread start/join calls and
// the call that consumes each sum are not visible in this view.
187 public void testServerBusyException() throws Exception
{
// Table descriptor named after the current test method, as reported by the `name` rule.
188 HTableDescriptor hdt
= TEST_UTIL
.createTableDescriptor(TableName
.valueOf(name
.getMethodName()));
// Attach SleepCoprocessor so its sleeping pre-hooks run for operations on this table.
189 hdt
.addCoprocessor(SleepCoprocessor
.class.getName());
// Copy of the shared configuration, passed to createTable below.
190 Configuration c
= new Configuration(TEST_UTIL
.getConfiguration());
191 TEST_UTIL
.createTable(hdt
, new byte[][] { FAM_NAM
}, c
);
// Five Get workers, each given its own Table obtained from the shared connection.
// NOTE(review): no close() of these Table instances is visible in this view -- confirm
// they are released.
194 new TestGetThread(TEST_UTIL
.getConnection().getTable(hdt
.getTableName()));
196 new TestGetThread(TEST_UTIL
.getConnection().getTable(hdt
.getTableName()));
198 new TestGetThread(TEST_UTIL
.getConnection().getTable(hdt
.getTableName()));
200 new TestGetThread(TEST_UTIL
.getConnection().getTable(hdt
.getTableName()));
202 new TestGetThread(TEST_UTIL
.getConnection().getTable(hdt
.getTableName()));
// Number of Get workers that caught a ServerTooBusyException (each flag is 0 or 1).
214 tg1
.getServerBusyException
+ tg2
.getServerBusyException
+ tg3
.getServerBusyException
215 + tg4
.getServerBusyException
+ tg5
.getServerBusyException
);
217 // Put has its own logic in HTable, so Put is tested separately. AsyncProcess is used for Put
218 // (multi at the RPC level) and it wraps exceptions in RetriesExhaustedWithDetailsException.
// Five Put workers, each given its own Table obtained from the shared connection.
221 new TestPutThread(TEST_UTIL
.getConnection().getTable(hdt
.getTableName()));
223 new TestPutThread(TEST_UTIL
.getConnection().getTable(hdt
.getTableName()));
225 new TestPutThread(TEST_UTIL
.getConnection().getTable(hdt
.getTableName()));
227 new TestPutThread(TEST_UTIL
.getConnection().getTable(hdt
.getTableName()));
229 new TestPutThread(TEST_UTIL
.getConnection().getTable(hdt
.getTableName()));
// Number of Put workers that caught a ServerTooBusyException (each flag is 0 or 1).
241 tp1
.getServerBusyException
+ tp2
.getServerBusyException
+ tp3
.getServerBusyException
242 + tp4
.getServerBusyException
+ tp5
.getServerBusyException
);