2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import java
.io
.IOException
;
21 import java
.io
.InterruptedIOException
;
22 import java
.net
.SocketTimeoutException
;
23 import java
.util
.ArrayList
;
24 import java
.util
.List
;
25 import java
.util
.Optional
;
26 import java
.util
.concurrent
.atomic
.AtomicInteger
;
27 import org
.apache
.hadoop
.conf
.Configuration
;
28 import org
.apache
.hadoop
.hbase
.Cell
;
29 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
30 import org
.apache
.hadoop
.hbase
.HBaseConfiguration
;
31 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
32 import org
.apache
.hadoop
.hbase
.TableName
;
33 import org
.apache
.hadoop
.hbase
.coprocessor
.CoprocessorHost
;
34 import org
.apache
.hadoop
.hbase
.coprocessor
.ObserverContext
;
35 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessor
;
36 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessorEnvironment
;
37 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionObserver
;
38 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
39 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
40 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
41 import org
.apache
.hadoop
.hbase
.util
.Threads
;
42 import org
.junit
.AfterClass
;
43 import org
.junit
.Assert
;
44 import org
.junit
.BeforeClass
;
45 import org
.junit
.ClassRule
;
46 import org
.junit
.Test
;
47 import org
.junit
.experimental
.categories
.Category
;
48 import org
.slf4j
.Logger
;
49 import org
.slf4j
.LoggerFactory
;
51 @Category({MediumTests
.class, ClientTests
.class})
52 public class TestClientOperationInterrupt
{
55 public static final HBaseClassTestRule CLASS_RULE
=
56 HBaseClassTestRule
.forClass(TestClientOperationInterrupt
.class);
58 private static final Logger LOG
= LoggerFactory
.getLogger(TestClientOperationInterrupt
.class);
60 private static HBaseTestingUtility util
;
61 private static final TableName tableName
= TableName
.valueOf("test");
62 private static final byte[] dummy
= Bytes
.toBytes("dummy");
63 private static final byte[] row1
= Bytes
.toBytes("r1");
64 private static final byte[] test
= Bytes
.toBytes("test");
65 private static Configuration conf
;
67 public static class TestCoprocessor
implements RegionCoprocessor
, RegionObserver
{
69 public Optional
<RegionObserver
> getRegionObserver() {
70 return Optional
.of(this);
74 public void preGetOp(final ObserverContext
<RegionCoprocessorEnvironment
> e
,
75 final Get get
, final List
<Cell
> results
) throws IOException
{
82 public static void setUpBeforeClass() throws Exception
{
83 conf
= HBaseConfiguration
.create();
84 conf
.setStrings(CoprocessorHost
.USER_REGION_COPROCESSOR_CONF_KEY
,
85 TestCoprocessor
.class.getName());
86 util
= new HBaseTestingUtility(conf
);
87 util
.startMiniCluster();
89 Admin admin
= util
.getAdmin();
90 if (admin
.tableExists(tableName
)) {
91 if (admin
.isTableEnabled(tableName
)) {
92 admin
.disableTable(tableName
);
94 admin
.deleteTable(tableName
);
96 Table ht
= util
.createTable(tableName
, new byte[][]{dummy
, test
});
98 Put p
= new Put(row1
);
99 p
.addColumn(dummy
, dummy
, dummy
);
105 public void testInterrupt50Percent() throws IOException
, InterruptedException
{
106 final AtomicInteger noEx
= new AtomicInteger(0);
107 final AtomicInteger badEx
= new AtomicInteger(0);
108 final AtomicInteger noInt
= new AtomicInteger(0);
109 final AtomicInteger done
= new AtomicInteger(0);
110 List
<Thread
> threads
= new ArrayList
<>();
112 final int nbThread
= 100;
114 for (int i
= 0; i
< nbThread
; i
++) {
115 Thread t
= new Thread() {
119 Table ht
= util
.getConnection().getTable(tableName
);
120 Result r
= ht
.get(new Get(row1
));
121 noEx
.incrementAndGet();
122 } catch (IOException e
) {
123 LOG
.info("exception", e
);
124 if (!(e
instanceof InterruptedIOException
) || (e
instanceof SocketTimeoutException
)) {
125 badEx
.incrementAndGet();
127 if (Thread
.currentThread().isInterrupted()) {
128 noInt
.incrementAndGet();
129 LOG
.info("The thread should NOT be with the 'interrupt' status.");
133 done
.incrementAndGet();
137 t
.setName("TestClientOperationInterrupt #" + i
);
141 int expectedNoExNum
= nbThread
/ 2;
143 for (int i
= 0; i
< nbThread
/ 2; i
++) {
144 if (threads
.get(i
).getState().equals(Thread
.State
.TERMINATED
)) {
147 threads
.get(i
).interrupt();
151 boolean stillAlive
= true;
154 for (Thread t
: threads
) {
162 Assert
.assertFalse(Thread
.currentThread().isInterrupted());
163 Assert
.assertTrue(" noEx: " + noEx
.get() + ", badEx=" + badEx
.get() + ", noInt=" + noInt
.get(),
164 noEx
.get() == expectedNoExNum
&& badEx
.get() == 0);
166 // The problem here is that we need the server to free its handlers to handle all operations
167 while (done
.get() != nbThread
){
171 Table ht
= util
.getConnection().getTable(tableName
);
172 Result r
= ht
.get(new Get(row1
));
173 Assert
.assertFalse(r
.isEmpty());
177 public static void tearDownAfterClass() throws Exception
{
178 util
.shutdownMiniCluster();