HBASE-26700 The way we bypass broken track file is not enough in StoreFileListFile...
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / regionserver / TestScannerHeartbeatMessages.java
blobf678f2bcfab4d7c209b8b77911ab060541b22089
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.regionserver;
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertTrue;
22 import static org.junit.Assert.fail;
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.concurrent.Callable;
28 import org.apache.commons.lang3.exception.ExceptionUtils;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.FileSystem;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.hbase.Cell;
33 import org.apache.hadoop.hbase.CellComparator;
34 import org.apache.hadoop.hbase.CellComparatorImpl;
35 import org.apache.hadoop.hbase.CellUtil;
36 import org.apache.hadoop.hbase.HBaseClassTestRule;
37 import org.apache.hadoop.hbase.HBaseTestingUtil;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.HTestConst;
40 import org.apache.hadoop.hbase.KeyValue;
41 import org.apache.hadoop.hbase.TableName;
42 import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
43 import org.apache.hadoop.hbase.client.AsyncConnection;
44 import org.apache.hadoop.hbase.client.AsyncTable;
45 import org.apache.hadoop.hbase.client.ConnectionFactory;
46 import org.apache.hadoop.hbase.client.Put;
47 import org.apache.hadoop.hbase.client.RegionInfo;
48 import org.apache.hadoop.hbase.client.Result;
49 import org.apache.hadoop.hbase.client.ResultScanner;
50 import org.apache.hadoop.hbase.client.Scan;
51 import org.apache.hadoop.hbase.client.ScanPerNextResultScanner;
52 import org.apache.hadoop.hbase.client.Table;
53 import org.apache.hadoop.hbase.client.TableDescriptor;
54 import org.apache.hadoop.hbase.filter.Filter;
55 import org.apache.hadoop.hbase.filter.FilterBase;
56 import org.apache.hadoop.hbase.testclassification.LargeTests;
57 import org.apache.hadoop.hbase.util.Bytes;
58 import org.apache.hadoop.hbase.util.Threads;
59 import org.apache.hadoop.hbase.wal.WAL;
60 import org.junit.After;
61 import org.junit.AfterClass;
62 import org.junit.Before;
63 import org.junit.BeforeClass;
64 import org.junit.ClassRule;
65 import org.junit.Test;
66 import org.junit.experimental.categories.Category;
68 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
69 import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
70 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
72 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
73 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
/**
 * Here we test to make sure that scans return the expected Results when the server is sending the
 * client heartbeat messages. Heartbeat messages are essentially keep-alive messages (they prevent
 * the scanner on the client side from timing out). A heartbeat message is sent from the server to
 * the client when the server has exceeded the time limit during the processing of the scan. When
 * the time limit is reached, the server will return to the client whatever Results it has
 * accumulated (potentially empty).
 */
83 @Category(LargeTests.class)
84 public class TestScannerHeartbeatMessages {
86 @ClassRule
87 public static final HBaseClassTestRule CLASS_RULE =
88 HBaseClassTestRule.forClass(TestScannerHeartbeatMessages.class);
90 private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
92 private static AsyncConnection CONN;
94 /**
95 * Table configuration
97 private static TableName TABLE_NAME = TableName.valueOf("testScannerHeartbeatMessagesTable");
99 private static int NUM_ROWS = 5;
100 private static byte[] ROW = Bytes.toBytes("testRow");
101 private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);
103 private static int NUM_FAMILIES = 4;
104 private static byte[] FAMILY = Bytes.toBytes("testFamily");
105 private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES);
107 private static int NUM_QUALIFIERS = 3;
108 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
109 private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS);
111 private static int VALUE_SIZE = 128;
112 private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
114 // The time limit should be based on the rpc timeout at client, or the client will regards
115 // the request as timeout before server return a heartbeat.
116 private static int SERVER_TIMEOUT = 60000;
118 // Time, in milliseconds, that the client will wait for a response from the server before timing
119 // out. This value is used server side to determine when it is necessary to send a heartbeat
120 // message to the client. Time limit will be 500 ms.
121 private static int CLIENT_TIMEOUT = 1000;
123 // In this test, we sleep after reading each row. So we should make sure after we get some number
124 // of rows and sleep same times we must reach time limit, and do not timeout after next sleeping.
125 private static int DEFAULT_ROW_SLEEP_TIME = 300;
127 // Similar with row sleep time.
128 private static int DEFAULT_CF_SLEEP_TIME = 300;
130 @BeforeClass
131 public static void setUpBeforeClass() throws Exception {
132 Configuration conf = TEST_UTIL.getConfiguration();
134 conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName());
135 conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName());
136 conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT);
137 conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT);
138 conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1);
140 // Check the timeout condition after every cell
141 conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
142 TEST_UTIL.startMiniCluster(1);
144 createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
146 Configuration newConf = new Configuration(conf);
147 newConf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
148 newConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT);
149 CONN = ConnectionFactory.createAsyncConnection(newConf).get();
152 static void createTestTable(TableName name, byte[][] rows, byte[][] families, byte[][] qualifiers,
153 byte[] cellValue) throws IOException {
154 Table ht = TEST_UTIL.createTable(name, families);
155 List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
156 ht.put(puts);
160 * Make puts to put the input value into each combination of row, family, and qualifier
162 static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
163 byte[] value) throws IOException {
164 Put put;
165 ArrayList<Put> puts = new ArrayList<>();
167 for (int row = 0; row < rows.length; row++) {
168 put = new Put(rows[row]);
169 for (int fam = 0; fam < families.length; fam++) {
170 for (int qual = 0; qual < qualifiers.length; qual++) {
171 KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value);
172 put.add(kv);
175 puts.add(put);
178 return puts;
181 @AfterClass
182 public static void tearDownAfterClass() throws Exception {
183 Closeables.close(CONN, true);
184 TEST_UTIL.shutdownMiniCluster();
187 @Before
188 public void setupBeforeTest() throws Exception {
189 disableSleeping();
192 @After
193 public void teardownAfterTest() throws Exception {
194 disableSleeping();
198 * Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass
199 * when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are
200 * disabled, the test should throw an exception.
202 private void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException {
203 HeartbeatRPCServices.heartbeatsEnabled = true;
205 try {
206 testCallable.call();
207 } catch (Exception e) {
208 fail("Heartbeat messages are enabled, exceptions should NOT be thrown. Exception trace:"
209 + ExceptionUtils.getStackTrace(e));
212 HeartbeatRPCServices.heartbeatsEnabled = false;
213 try {
214 testCallable.call();
215 } catch (Exception e) {
216 return;
217 } finally {
218 HeartbeatRPCServices.heartbeatsEnabled = true;
220 fail("Heartbeats messages are disabled, an exception should be thrown. If an exception "
221 + " is not thrown, the test case is not testing the importance of heartbeat messages");
225 * Test the case that the time limit for the scan is reached after each full row of cells is
226 * fetched.
228 @Test
229 public void testHeartbeatBetweenRows() throws Exception {
230 testImportanceOfHeartbeats(new Callable<Void>() {
232 @Override
233 public Void call() throws Exception {
234 // Configure the scan so that it can read the entire table in a single RPC. We want to test
235 // the case where a scan stops on the server side due to a time limit
236 Scan scan = new Scan();
237 scan.setMaxResultSize(Long.MAX_VALUE);
238 scan.setCaching(Integer.MAX_VALUE);
240 testEquivalenceOfScanWithHeartbeats(scan, DEFAULT_ROW_SLEEP_TIME, -1, false);
241 return null;
247 * Test the case that the time limit for scans is reached in between column families
249 @Test
250 public void testHeartbeatBetweenColumnFamilies() throws Exception {
251 testImportanceOfHeartbeats(new Callable<Void>() {
252 @Override
253 public Void call() throws Exception {
254 // Configure the scan so that it can read the entire table in a single RPC. We want to test
255 // the case where a scan stops on the server side due to a time limit
256 Scan baseScan = new Scan();
257 baseScan.setMaxResultSize(Long.MAX_VALUE);
258 baseScan.setCaching(Integer.MAX_VALUE);
260 // Copy the scan before each test. When a scan object is used by a scanner, some of its
261 // fields may be changed such as start row
262 Scan scanCopy = new Scan(baseScan);
263 testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, false);
264 scanCopy = new Scan(baseScan);
265 testEquivalenceOfScanWithHeartbeats(scanCopy, -1, DEFAULT_CF_SLEEP_TIME, true);
266 return null;
271 public static class SparseCellFilter extends FilterBase {
273 @Override
274 public ReturnCode filterCell(final Cell v) throws IOException {
275 try {
276 Thread.sleep(CLIENT_TIMEOUT / 2 + 100);
277 } catch (InterruptedException e) {
278 Thread.currentThread().interrupt();
280 return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) ? ReturnCode.INCLUDE
281 : ReturnCode.SKIP;
284 public static Filter parseFrom(final byte[] pbBytes) {
285 return new SparseCellFilter();
289 public static class SparseRowFilter extends FilterBase {
291 @Override
292 public boolean filterRowKey(Cell cell) throws IOException {
293 try {
294 Thread.sleep(CLIENT_TIMEOUT / 2 - 100);
295 } catch (InterruptedException e) {
296 Thread.currentThread().interrupt();
298 return !Bytes.equals(CellUtil.cloneRow(cell), ROWS[NUM_ROWS - 1]);
301 public static Filter parseFrom(final byte[] pbBytes) {
302 return new SparseRowFilter();
307 * Test the case that there is a filter which filters most of cells
309 @Test
310 public void testHeartbeatWithSparseCellFilter() throws Exception {
311 testImportanceOfHeartbeats(new Callable<Void>() {
312 @Override
313 public Void call() throws Exception {
314 Scan scan = new Scan();
315 scan.setMaxResultSize(Long.MAX_VALUE);
316 scan.setCaching(Integer.MAX_VALUE);
317 scan.setFilter(new SparseCellFilter());
318 try (ScanPerNextResultScanner scanner =
319 new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
320 int num = 0;
321 while (scanner.next() != null) {
322 num++;
324 assertEquals(1, num);
327 scan = new Scan();
328 scan.setMaxResultSize(Long.MAX_VALUE);
329 scan.setCaching(Integer.MAX_VALUE);
330 scan.setFilter(new SparseCellFilter());
331 scan.setAllowPartialResults(true);
332 try (ScanPerNextResultScanner scanner =
333 new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
334 int num = 0;
335 while (scanner.next() != null) {
336 num++;
338 assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num);
341 return null;
347 * Test the case that there is a filter which filters most of rows
349 @Test
350 public void testHeartbeatWithSparseRowFilter() throws Exception {
351 testImportanceOfHeartbeats(new Callable<Void>() {
352 @Override
353 public Void call() throws Exception {
354 Scan scan = new Scan();
355 scan.setMaxResultSize(Long.MAX_VALUE);
356 scan.setCaching(Integer.MAX_VALUE);
357 scan.setFilter(new SparseRowFilter());
358 try (ScanPerNextResultScanner scanner =
359 new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
360 int num = 0;
361 while (scanner.next() != null) {
362 num++;
364 assertEquals(1, num);
367 return null;
373 * Test the equivalence of a scan versus the same scan executed when heartbeat messages are
374 * necessary
375 * @param scan The scan configuration being tested
376 * @param rowSleepTime The time to sleep between fetches of row cells
377 * @param cfSleepTime The time to sleep between fetches of column family cells
378 * @param sleepBeforeCf set to true when column family sleeps should occur before the cells for
379 * that column family are fetched
381 private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
382 int cfSleepTime, boolean sleepBeforeCf) throws Exception {
383 disableSleeping();
384 AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
385 final ResultScanner scanner = new ScanPerNextResultScanner(table, scan);
386 final ResultScanner scannerWithHeartbeats = new ScanPerNextResultScanner(table, scan);
388 Result r1 = null;
389 Result r2 = null;
391 while ((r1 = scanner.next()) != null) {
392 // Enforce the specified sleep conditions during calls to the heartbeat scanner
393 configureSleepTime(rowSleepTime, cfSleepTime, sleepBeforeCf);
394 r2 = scannerWithHeartbeats.next();
395 disableSleeping();
397 assertTrue(r2 != null);
398 try {
399 Result.compareResults(r1, r2);
400 } catch (Exception e) {
401 fail(e.getMessage());
405 assertTrue(scannerWithHeartbeats.next() == null);
406 scanner.close();
407 scannerWithHeartbeats.close();
411 * Helper method for setting the time to sleep between rows and column families. If a sleep time
412 * is negative then that sleep will be disabled
414 private static void configureSleepTime(int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) {
415 HeartbeatHRegion.sleepBetweenRows = rowSleepTime > 0;
416 HeartbeatHRegion.rowSleepTime = rowSleepTime;
418 HeartbeatHRegion.sleepBetweenColumnFamilies = cfSleepTime > 0;
419 HeartbeatHRegion.columnFamilySleepTime = cfSleepTime;
420 HeartbeatHRegion.sleepBeforeColumnFamily = sleepBeforeCf;
424 * Disable the sleeping mechanism server side.
426 private static void disableSleeping() {
427 HeartbeatHRegion.sleepBetweenRows = false;
428 HeartbeatHRegion.sleepBetweenColumnFamilies = false;
432 * Custom HRegionServer instance that instantiates {@link HeartbeatRPCServices} in place of
433 * {@link RSRpcServices} to allow us to toggle support for heartbeat messages
435 private static class HeartbeatHRegionServer extends HRegionServer {
436 public HeartbeatHRegionServer(Configuration conf) throws IOException, InterruptedException {
437 super(conf);
440 @Override
441 protected RSRpcServices createRpcServices() throws IOException {
442 return new HeartbeatRPCServices(this);
447 * Custom RSRpcServices instance that allows heartbeat support to be toggled
449 private static class HeartbeatRPCServices extends RSRpcServices {
450 private static volatile boolean heartbeatsEnabled = true;
452 public HeartbeatRPCServices(HRegionServer rs) throws IOException {
453 super(rs);
456 @Override
457 public ScanResponse scan(RpcController controller, ScanRequest request)
458 throws ServiceException {
459 ScanRequest.Builder builder = ScanRequest.newBuilder(request);
460 builder.setClientHandlesHeartbeats(heartbeatsEnabled);
461 return super.scan(controller, builder.build());
466 * Custom HRegion class that instantiates {@link RegionScanner}s with configurable sleep times
467 * between fetches of row Results and/or column family cells. Useful for emulating an instance
468 * where the server is taking a long time to process a client's scan request
470 private static class HeartbeatHRegion extends HRegion {
471 // Row sleeps occur AFTER each row worth of cells is retrieved.
472 private static volatile int rowSleepTime = DEFAULT_ROW_SLEEP_TIME;
473 private static volatile boolean sleepBetweenRows = false;
475 // The sleep for column families can be initiated before or after we fetch the cells for the
476 // column family. If the sleep occurs BEFORE then the time limits will be reached inside
477 // StoreScanner while we are fetching individual cells. If the sleep occurs AFTER then the time
478 // limit will be reached inside RegionScanner after all the cells for a column family have been
479 // retrieved.
480 private static volatile boolean sleepBeforeColumnFamily = false;
481 private static volatile int columnFamilySleepTime = DEFAULT_CF_SLEEP_TIME;
482 private static volatile boolean sleepBetweenColumnFamilies = false;
484 public HeartbeatHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
485 RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
486 super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
489 public HeartbeatHRegion(HRegionFileSystem fs, WAL wal, Configuration confParam,
490 TableDescriptor htd, RegionServerServices rsServices) {
491 super(fs, wal, confParam, htd, rsServices);
494 private static void columnFamilySleep() {
495 if (sleepBetweenColumnFamilies) {
496 Threads.sleepWithoutInterrupt(columnFamilySleepTime);
500 private static void rowSleep() {
501 if (sleepBetweenRows) {
502 Threads.sleepWithoutInterrupt(rowSleepTime);
506 // Instantiate the custom heartbeat region scanners
507 @Override
508 protected RegionScannerImpl instantiateRegionScanner(Scan scan,
509 List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) throws IOException {
510 if (scan.isReversed()) {
511 if (scan.getFilter() != null) {
512 scan.getFilter().setReversed(true);
514 return new HeartbeatReversedRegionScanner(scan, additionalScanners, this);
516 return new HeartbeatRegionScanner(scan, additionalScanners, this);
521 * Custom ReversedRegionScanner that can be configured to sleep between retrievals of row Results
522 * and/or column family cells
524 private static class HeartbeatReversedRegionScanner extends ReversedRegionScannerImpl {
525 HeartbeatReversedRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners,
526 HRegion region) throws IOException {
527 super(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE);
530 @Override
531 public boolean nextRaw(List<Cell> outResults, ScannerContext context) throws IOException {
532 boolean moreRows = super.nextRaw(outResults, context);
533 HeartbeatHRegion.rowSleep();
534 return moreRows;
537 @Override
538 protected void initializeKVHeap(List<KeyValueScanner> scanners,
539 List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
540 this.storeHeap =
541 new HeartbeatReversedKVHeap(scanners, (CellComparatorImpl) region.getCellComparator());
542 if (!joinedScanners.isEmpty()) {
543 this.joinedHeap = new HeartbeatReversedKVHeap(joinedScanners,
544 (CellComparatorImpl) region.getCellComparator());
550 * Custom RegionScanner that can be configured to sleep between retrievals of row Results and/or
551 * column family cells
553 private static class HeartbeatRegionScanner extends RegionScannerImpl {
554 HeartbeatRegionScanner(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
555 throws IOException {
556 super(scan, additionalScanners, region, HConstants.NO_NONCE, HConstants.NO_NONCE);
559 @Override
560 public boolean nextRaw(List<Cell> outResults, ScannerContext context) throws IOException {
561 boolean moreRows = super.nextRaw(outResults, context);
562 HeartbeatHRegion.rowSleep();
563 return moreRows;
566 @Override
567 protected void initializeKVHeap(List<KeyValueScanner> scanners,
568 List<KeyValueScanner> joinedScanners, HRegion region) throws IOException {
569 this.storeHeap =
570 new HeartbeatKVHeap(scanners, region.getCellComparator());
571 if (!joinedScanners.isEmpty()) {
572 this.joinedHeap =
573 new HeartbeatKVHeap(joinedScanners, region.getCellComparator());
579 * Custom KV Heap that can be configured to sleep/wait in between retrievals of column family
580 * cells. Useful for testing
582 private static final class HeartbeatKVHeap extends KeyValueHeap {
583 public HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, CellComparator comparator)
584 throws IOException {
585 super(scanners, comparator);
588 HeartbeatKVHeap(List<? extends KeyValueScanner> scanners, KVScannerComparator comparator)
589 throws IOException {
590 super(scanners, comparator);
593 @Override
594 public boolean next(List<Cell> result, ScannerContext context) throws IOException {
595 if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
596 boolean moreRows = super.next(result, context);
597 if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
598 return moreRows;
603 * Custom reversed KV Heap that can be configured to sleep in between retrievals of column family
604 * cells.
606 private static final class HeartbeatReversedKVHeap extends ReversedKeyValueHeap {
607 public HeartbeatReversedKVHeap(List<? extends KeyValueScanner> scanners,
608 CellComparatorImpl comparator) throws IOException {
609 super(scanners, comparator);
612 @Override
613 public boolean next(List<Cell> result, ScannerContext context) throws IOException {
614 if (HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
615 boolean moreRows = super.next(result, context);
616 if (!HeartbeatHRegion.sleepBeforeColumnFamily) HeartbeatHRegion.columnFamilySleep();
617 return moreRows;