hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionInterrupt.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
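
/**
 * Verifies that closing a region interrupts in-progress operations: a scan held up by a slow
 * filter and mutations held up by a slow coprocessor should both be interrupted when the table
 * is disabled, and the custom region implementation below records that the interrupt was seen.
 */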
@Category({RegionServerTests.class, LargeTests.class})
public class TestRegionInterrupt {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegionInterrupt.class);

  private static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final Logger LOG = LoggerFactory.getLogger(TestRegionInterrupt.class);

  static final byte[] FAMILY = Bytes.toBytes("info");

  static long sleepTime;

  @Rule
  public TableNameTestRule name = new TableNameTestRule();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
    conf.setClass(HConstants.REGION_IMPL, InterruptInterceptingHRegion.class, Region.class);
    conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, true);
    // Ensure the sleep interval is long enough for interrupts to occur.
    long waitInterval = conf.getLong(HRegion.CLOSE_WAIT_INTERVAL,
      HRegion.DEFAULT_CLOSE_WAIT_INTERVAL);
    sleepTime = waitInterval * 2;
    // Try to bound the running time of this unit if expected actions do not take place.
    conf.setLong(HRegion.CLOSE_WAIT_TIME, sleepTime * 2);
  }

  @Before
  public void setUp() throws Exception {
    TEST_UTIL.startMiniCluster();
  }

  @After
  public void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
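
  /**
   * Runs a background scan that sleeps inside {@link DelayingFilter}, disables the table to
   * force the region to close, then asserts that the region recorded an interrupt and that the
   * scanner surfaced an exception.
   */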
  @Test
  public void testCloseInterruptScanning() throws Exception {
    final TableName tableName = name.getTableName();
    LOG.info("Creating table " + tableName);
    try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
      // load some data
      TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
      TEST_UTIL.loadTable(table, FAMILY);
      final AtomicBoolean expectedExceptionCaught = new AtomicBoolean(false);
      // scan the table in the background
      Thread scanner = new Thread(new Runnable() {
        @Override
        public void run() {
          Scan scan = new Scan();
          scan.addFamily(FAMILY);
          scan.setFilter(new DelayingFilter());
          try {
            LOG.info("Starting scan");
            try (ResultScanner rs = table.getScanner(scan)) {
              Result r;
              do {
                r = rs.next();
                if (r != null) {
                  LOG.info("Scanned row " + Bytes.toStringBinary(r.getRow()));
                }
              } while (r != null);
            }
          } catch (IOException e) {
            LOG.info("Scanner caught exception", e);
            expectedExceptionCaught.set(true);
          } finally {
            LOG.info("Finished scan");
          }
        }
      });
      scanner.start();

      // Wait for the filter to begin sleeping
      LOG.info("Waiting for scanner to start");
      Waiter.waitFor(TEST_UTIL.getConfiguration(), 10*1000, new Waiter.Predicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return DelayingFilter.isSleeping();
        }
      });

      // Offline the table, this will trigger closing
      LOG.info("Offlining table " + tableName);
      TEST_UTIL.getAdmin().disableTable(tableName);

      // Wait for scanner termination
      scanner.join();

      // When we get here the region has closed and the table is offline
      assertTrue("Region operations were not interrupted",
        InterruptInterceptingHRegion.wasInterrupted());
      assertTrue("Scanner did not catch expected exception", expectedExceptionCaught.get());
    }
  }
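
  /**
   * Runs background puts that sleep inside {@link MutationDelayingCoprocessor}, disables the
   * table to force the region to close, then asserts that the region recorded an interrupt and
   * that the writer surfaced an exception.
   */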
  @Test
  public void testCloseInterruptMutation() throws Exception {
    final TableName tableName = name.getTableName();
    final Admin admin = TEST_UTIL.getAdmin();
    // Create the test table
    TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
      .setCoprocessor(MutationDelayingCoprocessor.class.getName())
      .build();
    LOG.info("Creating table " + tableName);
    admin.createTable(htd);
    TEST_UTIL.waitUntilAllRegionsAssigned(tableName);

    // Insert some data in the background
    LOG.info("Starting writes to table " + tableName);
    final int NUM_ROWS = 100;
    final AtomicBoolean expectedExceptionCaught = new AtomicBoolean(false);
    Thread inserter = new Thread(new Runnable() {
      @Override
      public void run() {
        try (BufferedMutator t = admin.getConnection().getBufferedMutator(tableName)) {
          for (int i = 0; i < NUM_ROWS; i++) {
            LOG.info("Writing row " + i + " to " + tableName);
            byte[] value = new byte[10], row = Bytes.toBytes(Integer.toString(i));
            Bytes.random(value);
            t.mutate(new Put(row).addColumn(FAMILY, HConstants.EMPTY_BYTE_ARRAY, value));
            t.flush();
          }
        } catch (IOException e) {
          LOG.info("Inserter caught exception", e);
          expectedExceptionCaught.set(true);
        }
      }
    });
    inserter.start();

    // Wait for delayed insertion to begin
    LOG.info("Waiting for mutations to start");
    Waiter.waitFor(TEST_UTIL.getConfiguration(), 10*1000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return MutationDelayingCoprocessor.isSleeping();
      }
    });

    // Offline the table, this will trigger closing
    LOG.info("Offlining table " + tableName);
    admin.disableTable(tableName);

    // Wait for the inserter to finish
    inserter.join();

    // When we get here the region has closed and the table is offline
    assertTrue("Region operations were not interrupted",
      InterruptInterceptingHRegion.wasInterrupted());
    assertTrue("Inserter did not catch expected exception", expectedExceptionCaught.get());
  }
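
  /**
   * HRegion subclass that records whether {@code checkInterrupt} or {@code throwOnInterrupt}
   * observed an interrupt while the region was closing.
   */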
  public static class InterruptInterceptingHRegion extends HRegion {

    private static boolean interrupted = false;

    public static boolean wasInterrupted() {
      return interrupted;
    }

    public InterruptInterceptingHRegion(Path tableDir, WAL wal, FileSystem fs,
        Configuration conf, RegionInfo regionInfo, TableDescriptor htd,
        RegionServerServices rsServices) {
      super(tableDir, wal, fs, conf, regionInfo, htd, rsServices);
    }

    public InterruptInterceptingHRegion(HRegionFileSystem fs, WAL wal, Configuration conf,
        TableDescriptor htd, RegionServerServices rsServices) {
      super(fs, wal, conf, htd, rsServices);
    }

    @Override
    void checkInterrupt() throws NotServingRegionException, InterruptedIOException {
      try {
        super.checkInterrupt();
      } catch (NotServingRegionException | InterruptedIOException e) {
        interrupted = true;
        throw e;
      }
    }

    @Override
    IOException throwOnInterrupt(Throwable t) {
      interrupted = true;
      return super.throwOnInterrupt(t);
    }

  }
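
  /**
   * Scan filter that sleeps on every cell so an in-flight scan is still running when the region
   * begins to close.
   */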
  public static class DelayingFilter extends FilterBase {

    static volatile boolean sleeping = false;

    public static boolean isSleeping() {
      return sleeping;
    }

    @Override
    public ReturnCode filterCell(Cell v) throws IOException {
      LOG.info("Starting sleep on " + v);
      sleeping = true;
      try {
        Thread.sleep(sleepTime);
      } catch (InterruptedException e) {
        // restore interrupt status so region scanner can handle it as expected
        Thread.currentThread().interrupt();
        LOG.info("Interrupted during sleep on " + v);
      } finally {
        LOG.info("Done sleep on " + v);
        sleeping = false;
      }
      return ReturnCode.INCLUDE;
    }

    public static DelayingFilter parseFrom(final byte [] pbBytes)
        throws DeserializationException {
      // Just return a new instance.
      return new DelayingFilter();
    }

  }
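
  /**
   * Region observer that sleeps before every put, delete, append, and increment so an in-flight
   * mutation is still running when the region begins to close.
   */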
  public static class MutationDelayingCoprocessor implements RegionCoprocessor, RegionObserver {

    static volatile boolean sleeping = false;

    public static boolean isSleeping() {
      return sleeping;
    }

    private void doSleep(Region.Operation op) {
      LOG.info("Starting sleep for " + op);
      sleeping = true;
      try {
        Thread.sleep(sleepTime);
      } catch (InterruptedException e) {
        // restore interrupt status so doMiniBatchMutation etc. can handle it as expected
        Thread.currentThread().interrupt();
        LOG.info("Interrupted during " + op);
      } finally {
        LOG.info("Done");
        sleeping = false;
      }
    }

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
        Durability durability) throws IOException {
      doSleep(Region.Operation.PUT);
      RegionObserver.super.prePut(c, put, edit, durability);
    }

    @Override
    public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete,
        WALEdit edit, Durability durability) throws IOException {
      doSleep(Region.Operation.DELETE);
      RegionObserver.super.preDelete(c, delete, edit, durability);
    }

    @Override
    public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
        throws IOException {
      doSleep(Region.Operation.APPEND);
      return RegionObserver.super.preAppend(c, append);
    }

    @Override
    public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c,
        Increment increment) throws IOException {
      doSleep(Region.Operation.INCREMENT);
      return RegionObserver.super.preIncrement(c, increment);
    }

  }

}