2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.regionserver
;
21 import static org
.junit
.Assert
.assertTrue
;
23 import java
.io
.IOException
;
24 import java
.io
.InterruptedIOException
;
25 import java
.util
.Optional
;
26 import java
.util
.concurrent
.atomic
.AtomicBoolean
;
28 import org
.apache
.hadoop
.conf
.Configuration
;
29 import org
.apache
.hadoop
.fs
.FileSystem
;
30 import org
.apache
.hadoop
.fs
.Path
;
31 import org
.apache
.hadoop
.hbase
.Cell
;
32 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
33 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
34 import org
.apache
.hadoop
.hbase
.HConstants
;
35 import org
.apache
.hadoop
.hbase
.NotServingRegionException
;
36 import org
.apache
.hadoop
.hbase
.TableName
;
37 import org
.apache
.hadoop
.hbase
.TableNameTestRule
;
38 import org
.apache
.hadoop
.hbase
.Waiter
;
39 import org
.apache
.hadoop
.hbase
.client
.Admin
;
40 import org
.apache
.hadoop
.hbase
.client
.Append
;
41 import org
.apache
.hadoop
.hbase
.client
.BufferedMutator
;
42 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
43 import org
.apache
.hadoop
.hbase
.client
.Delete
;
44 import org
.apache
.hadoop
.hbase
.client
.Durability
;
45 import org
.apache
.hadoop
.hbase
.client
.Increment
;
46 import org
.apache
.hadoop
.hbase
.client
.Put
;
47 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
48 import org
.apache
.hadoop
.hbase
.client
.Result
;
49 import org
.apache
.hadoop
.hbase
.client
.ResultScanner
;
50 import org
.apache
.hadoop
.hbase
.client
.Scan
;
51 import org
.apache
.hadoop
.hbase
.client
.Table
;
52 import org
.apache
.hadoop
.hbase
.client
.TableDescriptor
;
53 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
54 import org
.apache
.hadoop
.hbase
.coprocessor
.ObserverContext
;
55 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessor
;
56 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessorEnvironment
;
57 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionObserver
;
58 import org
.apache
.hadoop
.hbase
.exceptions
.DeserializationException
;
59 import org
.apache
.hadoop
.hbase
.filter
.FilterBase
;
60 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
61 import org
.apache
.hadoop
.hbase
.testclassification
.RegionServerTests
;
62 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
63 import org
.apache
.hadoop
.hbase
.wal
.WAL
;
64 import org
.apache
.hadoop
.hbase
.wal
.WALEdit
;
65 import org
.junit
.After
;
66 import org
.junit
.Before
;
67 import org
.junit
.BeforeClass
;
68 import org
.junit
.ClassRule
;
69 import org
.junit
.Rule
;
70 import org
.junit
.Test
;
71 import org
.junit
.experimental
.categories
.Category
;
72 import org
.slf4j
.Logger
;
73 import org
.slf4j
.LoggerFactory
;
75 @Category({RegionServerTests
.class, LargeTests
.class})
76 public class TestRegionInterrupt
{
79 public static final HBaseClassTestRule CLASS_RULE
=
80 HBaseClassTestRule
.forClass(TestRegionInterrupt
.class);
82 private static HBaseTestingUtil TEST_UTIL
= new HBaseTestingUtil();
83 private static final Logger LOG
= LoggerFactory
.getLogger(TestRegionInterrupt
.class);
85 static final byte[] FAMILY
= Bytes
.toBytes("info");
87 static long sleepTime
;
90 public TableNameTestRule name
= new TableNameTestRule();
93 public static void setUpBeforeClass() throws Exception
{
94 Configuration conf
= TEST_UTIL
.getConfiguration();
95 conf
.setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
, 1);
96 conf
.setClass(HConstants
.REGION_IMPL
, InterruptInterceptingHRegion
.class, Region
.class);
97 conf
.setBoolean(HRegion
.CLOSE_WAIT_ABORT
, true);
98 // Ensure the sleep interval is long enough for interrupts to occur.
99 long waitInterval
= conf
.getLong(HRegion
.CLOSE_WAIT_INTERVAL
,
100 HRegion
.DEFAULT_CLOSE_WAIT_INTERVAL
);
101 sleepTime
= waitInterval
* 2;
102 // Try to bound the running time of this unit if expected actions do not take place.
103 conf
.setLong(HRegion
.CLOSE_WAIT_TIME
, sleepTime
* 2);
107 public void setUp() throws Exception
{
108 TEST_UTIL
.startMiniCluster();
112 public void tearDown() throws Exception
{
113 TEST_UTIL
.shutdownMiniCluster();
117 public void testCloseInterruptScanning() throws Exception
{
118 final TableName tableName
= name
.getTableName();
119 LOG
.info("Creating table " + tableName
);
120 try (Table table
= TEST_UTIL
.createTable(tableName
, FAMILY
)) {
122 TEST_UTIL
.waitUntilAllRegionsAssigned(tableName
);
123 TEST_UTIL
.loadTable(table
, FAMILY
);
124 final AtomicBoolean expectedExceptionCaught
= new AtomicBoolean(false);
125 // scan the table in the background
126 Thread scanner
= new Thread(new Runnable() {
129 Scan scan
= new Scan();
130 scan
.addFamily(FAMILY
);
131 scan
.setFilter(new DelayingFilter());
133 LOG
.info("Starting scan");
134 try (ResultScanner rs
= table
.getScanner(scan
)) {
139 LOG
.info("Scanned row " + Bytes
.toStringBinary(r
.getRow()));
143 } catch (IOException e
) {
144 LOG
.info("Scanner caught exception", e
);
145 expectedExceptionCaught
.set(true);
147 LOG
.info("Finished scan");
153 // Wait for the filter to begin sleeping
154 LOG
.info("Waiting for scanner to start");
155 Waiter
.waitFor(TEST_UTIL
.getConfiguration(), 10*1000, new Waiter
.Predicate
<Exception
>() {
157 public boolean evaluate() throws Exception
{
158 return DelayingFilter
.isSleeping();
162 // Offline the table, this will trigger closing
163 LOG
.info("Offlining table " + tableName
);
164 TEST_UTIL
.getAdmin().disableTable(tableName
);
166 // Wait for scanner termination
169 // When we get here the region has closed and the table is offline
170 assertTrue("Region operations were not interrupted",
171 InterruptInterceptingHRegion
.wasInterrupted());
172 assertTrue("Scanner did not catch expected exception", expectedExceptionCaught
.get());
177 public void testCloseInterruptMutation() throws Exception
{
178 final TableName tableName
= name
.getTableName();
179 final Admin admin
= TEST_UTIL
.getAdmin();
180 // Create the test table
181 TableDescriptor htd
= TableDescriptorBuilder
.newBuilder(tableName
)
182 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(FAMILY
))
183 .setCoprocessor(MutationDelayingCoprocessor
.class.getName())
185 LOG
.info("Creating table " + tableName
);
186 admin
.createTable(htd
);
187 TEST_UTIL
.waitUntilAllRegionsAssigned(tableName
);
189 // Insert some data in the background
190 LOG
.info("Starting writes to table " + tableName
);
191 final int NUM_ROWS
= 100;
192 final AtomicBoolean expectedExceptionCaught
= new AtomicBoolean(false);
193 Thread inserter
= new Thread(new Runnable() {
196 try (BufferedMutator t
= admin
.getConnection().getBufferedMutator(tableName
)) {
197 for (int i
= 0; i
< NUM_ROWS
; i
++) {
198 LOG
.info("Writing row " + i
+ " to " + tableName
);
199 byte[] value
= new byte[10], row
= Bytes
.toBytes(Integer
.toString(i
));
201 t
.mutate(new Put(row
).addColumn(FAMILY
, HConstants
.EMPTY_BYTE_ARRAY
, value
));
204 } catch (IOException e
) {
205 LOG
.info("Inserter caught exception", e
);
206 expectedExceptionCaught
.set(true);
212 // Wait for delayed insertion to begin
213 LOG
.info("Waiting for mutations to start");
214 Waiter
.waitFor(TEST_UTIL
.getConfiguration(), 10*1000, new Waiter
.Predicate
<Exception
>() {
216 public boolean evaluate() throws Exception
{
217 return MutationDelayingCoprocessor
.isSleeping();
221 // Offline the table, this will trigger closing
222 LOG
.info("Offlining table " + tableName
);
223 admin
.disableTable(tableName
);
225 // Wait for the inserter to finish
228 // When we get here the region has closed and the table is offline
229 assertTrue("Region operations were not interrupted",
230 InterruptInterceptingHRegion
.wasInterrupted());
231 assertTrue("Inserter did not catch expected exception", expectedExceptionCaught
.get());
235 public static class InterruptInterceptingHRegion
extends HRegion
{
237 private static boolean interrupted
= false;
239 public static boolean wasInterrupted() {
243 public InterruptInterceptingHRegion(Path tableDir
, WAL wal
, FileSystem fs
,
244 Configuration conf
, RegionInfo regionInfo
, TableDescriptor htd
,
245 RegionServerServices rsServices
) {
246 super(tableDir
, wal
, fs
, conf
, regionInfo
, htd
, rsServices
);
249 public InterruptInterceptingHRegion(HRegionFileSystem fs
, WAL wal
, Configuration conf
,
250 TableDescriptor htd
, RegionServerServices rsServices
) {
251 super(fs
, wal
, conf
, htd
, rsServices
);
255 void checkInterrupt() throws NotServingRegionException
, InterruptedIOException
{
257 super.checkInterrupt();
258 } catch (NotServingRegionException
| InterruptedIOException e
) {
265 IOException
throwOnInterrupt(Throwable t
) {
267 return super.throwOnInterrupt(t
);
272 public static class DelayingFilter
extends FilterBase
{
274 static volatile boolean sleeping
= false;
276 public static boolean isSleeping() {
281 public ReturnCode
filterCell(Cell v
) throws IOException
{
282 LOG
.info("Starting sleep on " + v
);
285 Thread
.sleep(sleepTime
);
286 } catch (InterruptedException e
) {
287 // restore interrupt status so region scanner can handle it as expected
288 Thread
.currentThread().interrupt();
289 LOG
.info("Interrupted during sleep on " + v
);
291 LOG
.info("Done sleep on " + v
);
294 return ReturnCode
.INCLUDE
;
297 public static DelayingFilter
parseFrom(final byte [] pbBytes
)
298 throws DeserializationException
{
299 // Just return a new instance.
300 return new DelayingFilter();
305 public static class MutationDelayingCoprocessor
implements RegionCoprocessor
, RegionObserver
{
307 static volatile boolean sleeping
= false;
309 public static boolean isSleeping() {
313 private void doSleep(Region
.Operation op
) {
314 LOG
.info("Starting sleep for " + op
);
317 Thread
.sleep(sleepTime
);
318 } catch (InterruptedException e
) {
319 // restore interrupt status so doMiniBatchMutation etc. can handle it as expected
320 Thread
.currentThread().interrupt();
321 LOG
.info("Interrupted during " + op
);
329 public Optional
<RegionObserver
> getRegionObserver() {
330 return Optional
.of(this);
334 public void prePut(ObserverContext
<RegionCoprocessorEnvironment
> c
, Put put
, WALEdit edit
,
335 Durability durability
) throws IOException
{
336 doSleep(Region
.Operation
.PUT
);
337 RegionObserver
.super.prePut(c
, put
, edit
, durability
);
341 public void preDelete(ObserverContext
<RegionCoprocessorEnvironment
> c
, Delete delete
,
342 WALEdit edit
, Durability durability
) throws IOException
{
343 doSleep(Region
.Operation
.DELETE
);
344 RegionObserver
.super.preDelete(c
, delete
, edit
, durability
);
348 public Result
preAppend(ObserverContext
<RegionCoprocessorEnvironment
> c
, Append append
)
350 doSleep(Region
.Operation
.APPEND
);
351 return RegionObserver
.super.preAppend(c
, append
);
355 public Result
preIncrement(ObserverContext
<RegionCoprocessorEnvironment
> c
, Increment increment
)
357 doSleep(Region
.Operation
.INCREMENT
);
358 return RegionObserver
.super.preIncrement(c
, increment
);