/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.regionserver
;
20 import java
.io
.IOException
;
21 import java
.util
.Optional
;
22 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
23 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
24 import org
.apache
.hadoop
.hbase
.HConstants
;
25 import org
.apache
.hadoop
.hbase
.HTableDescriptor
;
26 import org
.apache
.hadoop
.hbase
.TableName
;
27 import org
.apache
.hadoop
.hbase
.client
.Delete
;
28 import org
.apache
.hadoop
.hbase
.client
.Get
;
29 import org
.apache
.hadoop
.hbase
.client
.Increment
;
30 import org
.apache
.hadoop
.hbase
.client
.Result
;
31 import org
.apache
.hadoop
.hbase
.client
.Table
;
32 import org
.apache
.hadoop
.hbase
.coprocessor
.ObserverContext
;
33 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessor
;
34 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessorEnvironment
;
35 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionObserver
;
36 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
37 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
38 import org
.apache
.hadoop
.hbase
.util
.Threads
;
39 import org
.junit
.AfterClass
;
40 import org
.junit
.Assert
;
41 import org
.junit
.BeforeClass
;
42 import org
.junit
.ClassRule
;
43 import org
.junit
.Rule
;
44 import org
.junit
.Test
;
45 import org
.junit
.experimental
.categories
.Category
;
46 import org
.junit
.rules
.TestName
;
48 @Category({MediumTests
.class})
49 public class TestSettingTimeoutOnBlockingPoint
{
52 public static final HBaseClassTestRule CLASS_RULE
=
53 HBaseClassTestRule
.forClass(TestSettingTimeoutOnBlockingPoint
.class);
55 private final static HBaseTestingUtility TEST_UTIL
= new HBaseTestingUtility();
56 private static final byte[] FAM
= Bytes
.toBytes("f");
57 private static final byte[] ROW1
= Bytes
.toBytes("row1");
58 private static final byte[] ROW2
= Bytes
.toBytes("row2");
61 public TestName testName
= new TestName();
64 public static void setUpBeforeClass() throws Exception
{
65 TEST_UTIL
.getConfiguration().setBoolean(HConstants
.STATUS_PUBLISHED
, true);
66 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
, 1);
67 // simulate queue blocking
68 TEST_UTIL
.getConfiguration().setInt(HConstants
.REGION_SERVER_HANDLER_COUNT
, 2);
69 TEST_UTIL
.startMiniCluster(2);
73 public static void setUpAfterClass() throws Exception
{
74 TEST_UTIL
.shutdownMiniCluster();
77 public static class SleepCoprocessor
implements RegionCoprocessor
, RegionObserver
{
78 public static final int SLEEP_TIME
= 10000;
81 public Optional
<RegionObserver
> getRegionObserver() {
82 return Optional
.of(this);
86 public Result
preIncrementAfterRowLock(final ObserverContext
<RegionCoprocessorEnvironment
> e
,
87 final Increment increment
) throws IOException
{
88 Threads
.sleep(SLEEP_TIME
);
94 public void testRowLock() throws IOException
{
95 TableName tableName
= TableName
.valueOf(testName
.getMethodName());
96 HTableDescriptor hdt
= TEST_UTIL
.createTableDescriptor(tableName
);
97 hdt
.addCoprocessor(SleepCoprocessor
.class.getName());
98 TEST_UTIL
.createTable(hdt
, new byte[][]{FAM
}, TEST_UTIL
.getConfiguration());
100 Thread incrementThread
= new Thread(() -> {
102 try( Table table
= TEST_UTIL
.getConnection().getTable(tableName
)) {
103 table
.incrementColumnValue(ROW1
, FAM
, FAM
, 1);
105 } catch (IOException e
) {
106 Assert
.fail(e
.getMessage());
109 Thread getThread
= new Thread(() -> {
111 TEST_UTIL
.getConnection().getTableBuilder(tableName
, null).setRpcTimeout(1000).build()) {
112 Delete delete
= new Delete(ROW1
);
113 table
.delete(delete
);
114 } catch (IOException e
) {
115 Assert
.fail(e
.getMessage());
119 incrementThread
.start();
124 TEST_UTIL
.getConnection().getTableBuilder(tableName
, null).setRpcTimeout(1000).build()) {
// We have only two handlers. The first thread will get a write lock for row1 and occupy
// the first handler. The second thread needs a read lock for row1; it should quit after 1000
// ms and give back the handler because it cannot get the lock in time.
// So we can get the value using the second handler.
129 table
.get(new Get(ROW2
)); // Will throw exception if the timeout checking is failed
131 incrementThread
.interrupt();
132 getThread
.interrupt();