2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.junit
.Assert
.assertEquals
;
22 import java
.io
.IOException
;
23 import java
.util
.List
;
24 import java
.util
.Optional
;
25 import java
.util
.concurrent
.atomic
.AtomicLong
;
26 import org
.apache
.hadoop
.conf
.Configuration
;
27 import org
.apache
.hadoop
.hbase
.Cell
;
28 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
29 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
30 import org
.apache
.hadoop
.hbase
.HConstants
;
31 import org
.apache
.hadoop
.hbase
.coprocessor
.ObserverContext
;
32 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessor
;
33 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessorEnvironment
;
34 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionObserver
;
35 import org
.apache
.hadoop
.hbase
.ipc
.ServerTooBusyException
;
36 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
37 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
38 import org
.apache
.hadoop
.hbase
.util
.Threads
;
39 import org
.apache
.hadoop
.hbase
.wal
.WALEdit
;
40 import org
.junit
.AfterClass
;
41 import org
.junit
.BeforeClass
;
42 import org
.junit
.ClassRule
;
43 import org
.junit
.Rule
;
44 import org
.junit
.Test
;
45 import org
.junit
.experimental
.categories
.Category
;
46 import org
.junit
.rules
.TestName
;
49 * This class is for testing how HBaseConnectionManager handles ServerTooBusyException.
50 * Be careful adding to this class: it sets a low
51 * HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD.
53 @Category({LargeTests
.class})
54 public class TestServerBusyException
{
// Class-level rule obtained from HBaseClassTestRule.forClass for this test class.
57 public static final HBaseClassTestRule CLASS_RULE
=
58 HBaseClassTestRule
.forClass(TestServerBusyException
.class);
// Shared testing utility: source of the configuration, the mini cluster and the connection
// used throughout this class.
60 private final static HBaseTestingUtil TEST_UTIL
= new HBaseTestingUtil();
// Column family ("f") used when creating the test table and by the Put/Get worker threads.
61 private static final byte[] FAM_NAM
= Bytes
.toBytes("f");
// Row key ("bbb") targeted by every Put and Get built in this class.
62 private static final byte[] ROW
= Bytes
.toBytes("bbb");
// Written to HConstants.HBASE_CLIENT_RETRIES_NUMBER during setup; also multiplies the long
// sleep in SleepLongerAtFirstCoprocessor.
63 private static final int RPC_RETRY
= 5;
// Provides the current test method name, which is passed to createModifyableTableDescriptor.
66 public TestName name
= new TestName();
// Region coprocessor that stalls the hooked operations: each of its preGetOp, prePut,
// preIncrement and preDelete hooks calls Threads.sleep(SLEEP_TIME). testServerBusyException
// attaches it to its table through setCoprocessor.
68 public static class SleepCoprocessor
implements RegionCoprocessor
, RegionObserver
{
// Argument handed to Threads.sleep by every hook below (presumably milliseconds; confirm
// against Threads.sleep).
69 public static final int SLEEP_TIME
= 5000;
// Exposes this instance itself as the RegionObserver.
71 public Optional
<RegionObserver
> getRegionObserver() {
72 return Optional
.of(this);
// Get hook: sleeps for SLEEP_TIME; the get and results arguments are not used in the
// visible code.
76 public void preGetOp(final ObserverContext
<RegionCoprocessorEnvironment
> e
,
77 final Get get
, final List
<Cell
> results
) throws IOException
{
78 Threads
.sleep(SLEEP_TIME
);
// Put hook: sleeps for SLEEP_TIME.
82 public void prePut(final ObserverContext
<RegionCoprocessorEnvironment
> e
,
83 final Put put
, final WALEdit edit
, final Durability durability
) throws IOException
{
84 Threads
.sleep(SLEEP_TIME
);
// Increment hook: sleeps for SLEEP_TIME. It is declared to return a Result.
88 public Result
preIncrement(final ObserverContext
<RegionCoprocessorEnvironment
> e
,
89 final Increment increment
) throws IOException
{
90 Threads
.sleep(SLEEP_TIME
);
// Delete hook: sleeps for SLEEP_TIME.
95 public void preDelete(final ObserverContext
<RegionCoprocessorEnvironment
> e
, final Delete delete
,
96 final WALEdit edit
, final Durability durability
) throws IOException
{
97 Threads
.sleep(SLEEP_TIME
);
// Region coprocessor whose preGetOp sleeps for SLEEP_TIME * RPC_RETRY * 2 while its shared
// call counter is still at most 1, and otherwise uses the plain SLEEP_TIME sleep.
102 public static class SleepLongerAtFirstCoprocessor
implements RegionCoprocessor
, RegionObserver
{
// Base argument for Threads.sleep in preGetOp (presumably milliseconds; confirm against
// Threads.sleep).
103 public static final int SLEEP_TIME
= 2000;
// Counts preGetOp invocations. It is static, so the count is shared by all instances of
// this coprocessor class.
104 static final AtomicLong ct
= new AtomicLong(0);
// Exposes this instance itself as the RegionObserver.
107 public Optional
<RegionObserver
> getRegionObserver() {
108 return Optional
.of(this);
// Get hook: the sleep length depends on how many times the hook has run so far.
112 public void preGetOp(final ObserverContext
<RegionCoprocessorEnvironment
> e
,
113 final Get get
, final List
<Cell
> results
) throws IOException
{
114 // After the first sleep, all requests have timed out except the last retry. If we handled
115 // all of those requests, the last request would eventually time out too. If we drop all
116 // timed-out requests, we can handle the last request immediately and it will not time out.
117 if (ct
.incrementAndGet() <= 1) {
// First invocation only: sleep for twice the retry count times the base sleep.
118 Threads
.sleep(SLEEP_TIME
* RPC_RETRY
* 2);
// Base-length sleep.
120 Threads
.sleep(SLEEP_TIME
);
// One-time setup: adjusts the shared TEST_UTIL configuration, then starts the mini cluster
// with startMiniCluster(2).
126 public static void setUpBeforeClass() throws Exception
{
// Set HConstants.STATUS_PUBLISHED to true.
127 TEST_UTIL
.getConfiguration().setBoolean(HConstants
.STATUS_PUBLISHED
, true);
128 // Up the handlers; this test needs more than usual.
129 TEST_UTIL
.getConfiguration().setInt(HConstants
.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT
, 10);
// Client retry count; the same RPC_RETRY value scales the long sleep in
// SleepLongerAtFirstCoprocessor.
130 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
, RPC_RETRY
);
131 // simulate queue blocking in testDropTimeoutRequest
132 TEST_UTIL
.getConfiguration().setInt(HConstants
.REGION_SERVER_HANDLER_COUNT
, 1);
133 // Needed by the server busy test.
134 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD
, 3);
// Start the cluster only after all configuration values above have been set.
135 TEST_UTIL
.startMiniCluster(2);
// One-time teardown: shuts down the mini cluster owned by TEST_UTIL.
138 @AfterClass public static void tearDownAfterClass() throws Exception
{
139 TEST_UTIL
.shutdownMiniCluster();
// Worker thread for the Put half of the test: it builds a single-cell Put for ROW in family
// FAM_NAM and records whether a ServerTooBusyException was caught.
142 private static class TestPutThread
extends Thread
{
// 0 until a ServerTooBusyException is caught, then 1. testServerBusyException sums this
// field across its Put threads.
144 int getServerBusyException
= 0;
// Takes the Table this thread works against.
146 TestPutThread(Table table
){
153 Put p
= new Put(ROW
);
// Qualifier and value are both the single byte 0.
154 p
.addColumn(FAM_NAM
, new byte[] { 0 }, new byte[] { 0 });
// Record that the server-too-busy condition was observed.
156 } catch (ServerTooBusyException e
) {
157 getServerBusyException
= 1;
// Any other IOException is caught separately; the variable name suggests it is
// intentionally ignored.
158 } catch (IOException ignore
) {
// Worker thread for the Get half of the test: it builds a Get for ROW restricted to one
// column of family FAM_NAM and records whether a ServerTooBusyException was caught.
163 private static class TestGetThread
extends Thread
{
// 0 until a ServerTooBusyException is caught, then 1. testServerBusyException sums this
// field across its Get threads.
165 int getServerBusyException
= 0;
// Takes the Table this thread works against.
167 TestGetThread(Table table
){
174 Get g
= new Get(ROW
);
// The qualifier is the single byte 0.
175 g
.addColumn(FAM_NAM
, new byte[] { 0 });
// Record that the server-too-busy condition was observed.
177 } catch (ServerTooBusyException e
) {
178 getServerBusyException
= 1;
// Any other IOException is caught separately; the variable name suggests it is
// intentionally ignored.
179 } catch (IOException ignore
) {
185 public void testServerBusyException() throws Exception
{
186 TableDescriptor hdt
= TEST_UTIL
.createModifyableTableDescriptor(name
.getMethodName())
187 .setCoprocessor(SleepCoprocessor
.class.getName()).build();
188 Configuration c
= new Configuration(TEST_UTIL
.getConfiguration());
189 TEST_UTIL
.createTable(hdt
, new byte[][] { FAM_NAM
}, c
);
192 new TestGetThread(TEST_UTIL
.getConnection().getTable(hdt
.getTableName()));
194 new TestGetThread(TEST_UTIL
.getConnection().getTable(hdt
.getTableName()));
196 new TestGetThread(TEST_UTIL
.getConnection().getTable(hdt
.getTableName()));
198 new TestGetThread(TEST_UTIL
.getConnection().getTable(hdt
.getTableName()));
200 new TestGetThread(TEST_UTIL
.getConnection().getTable(hdt
.getTableName()));
212 tg1
.getServerBusyException
+ tg2
.getServerBusyException
+ tg3
.getServerBusyException
213 + tg4
.getServerBusyException
+ tg5
.getServerBusyException
);
215 // Put has its own logic in HTable, so test Put alone. We use AsyncProcess for Put (multi at the
216 // RPC level) and it wraps exceptions in RetriesExhaustedWithDetailsException.
219 new TestPutThread(TEST_UTIL
.getConnection().getTable(hdt
.getTableName()));
221 new TestPutThread(TEST_UTIL
.getConnection().getTable(hdt
.getTableName()));
223 new TestPutThread(TEST_UTIL
.getConnection().getTable(hdt
.getTableName()));
225 new TestPutThread(TEST_UTIL
.getConnection().getTable(hdt
.getTableName()));
227 new TestPutThread(TEST_UTIL
.getConnection().getTable(hdt
.getTableName()));
239 tp1
.getServerBusyException
+ tp2
.getServerBusyException
+ tp3
.getServerBusyException
240 + tp4
.getServerBusyException
+ tp5
.getServerBusyException
);