HBASE-24033 Add ut for loading the corrupt recovered hfiles (#1322)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / client / TestBlockEvictionFromClient.java
blobd5babfc4864d67c02f4244e6b9efaac1630913f4
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertFalse;
22 import static org.junit.Assert.assertTrue;
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Optional;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.atomic.AtomicReference;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.hbase.Cell;
36 import org.apache.hadoop.hbase.HBaseClassTestRule;
37 import org.apache.hadoop.hbase.HBaseTestingUtility;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.ServerName;
40 import org.apache.hadoop.hbase.TableName;
41 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
42 import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
43 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
44 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
45 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
46 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
47 import org.apache.hadoop.hbase.io.hfile.BlockCache;
48 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
49 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
50 import org.apache.hadoop.hbase.io.hfile.CachedBlock;
51 import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
52 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
53 import org.apache.hadoop.hbase.regionserver.HRegion;
54 import org.apache.hadoop.hbase.regionserver.HStore;
55 import org.apache.hadoop.hbase.regionserver.InternalScanner;
56 import org.apache.hadoop.hbase.regionserver.RegionScanner;
57 import org.apache.hadoop.hbase.regionserver.ScannerContext;
58 import org.apache.hadoop.hbase.testclassification.ClientTests;
59 import org.apache.hadoop.hbase.testclassification.LargeTests;
60 import org.apache.hadoop.hbase.util.Bytes;
61 import org.junit.After;
62 import org.junit.AfterClass;
63 import org.junit.Before;
64 import org.junit.BeforeClass;
65 import org.junit.ClassRule;
66 import org.junit.Rule;
67 import org.junit.Test;
68 import org.junit.experimental.categories.Category;
69 import org.junit.rules.TestName;
70 import org.slf4j.Logger;
71 import org.slf4j.LoggerFactory;
73 import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
75 @Category({ LargeTests.class, ClientTests.class })
76 @SuppressWarnings("deprecation")
77 public class TestBlockEvictionFromClient {
79 @ClassRule
80 public static final HBaseClassTestRule CLASS_RULE =
81 HBaseClassTestRule.forClass(TestBlockEvictionFromClient.class);
83 private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionFromClient.class);
84 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
85 static byte[][] ROWS = new byte[2][];
86 private static int NO_OF_THREADS = 3;
87 private static byte[] ROW = Bytes.toBytes("testRow");
88 private static byte[] ROW1 = Bytes.toBytes("testRow1");
89 private static byte[] ROW2 = Bytes.toBytes("testRow2");
90 private static byte[] ROW3 = Bytes.toBytes("testRow3");
91 private static byte[] FAMILY = Bytes.toBytes("testFamily");
92 private static byte[][] FAMILIES_1 = new byte[1][0];
93 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
94 private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
95 private static byte[] data = new byte[1000];
96 private static byte[] data2 = Bytes.add(data, data);
97 protected static int SLAVES = 1;
98 private static CountDownLatch latch;
99 private static CountDownLatch getLatch;
100 private static CountDownLatch compactionLatch;
101 private static CountDownLatch exceptionLatch;
103 @Rule
104 public TestName name = new TestName();
107 * @throws java.lang.Exception
109 @BeforeClass
110 public static void setUpBeforeClass() throws Exception {
111 ROWS[0] = ROW;
112 ROWS[1] = ROW1;
113 Configuration conf = TEST_UTIL.getConfiguration();
114 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
115 MultiRowMutationEndpoint.class.getName());
116 conf.setInt("hbase.regionserver.handler.count", 20);
117 conf.setInt("hbase.bucketcache.size", 400);
118 conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
119 conf.setFloat("hfile.block.cache.size", 0.2f);
120 conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
121 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry
122 conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000);
123 FAMILIES_1[0] = FAMILY;
124 TEST_UTIL.startMiniCluster(SLAVES);
128 * @throws java.lang.Exception
130 @AfterClass
131 public static void tearDownAfterClass() throws Exception {
132 TEST_UTIL.shutdownMiniCluster();
136 * @throws java.lang.Exception
138 @Before
139 public void setUp() throws Exception {
140 CustomInnerRegionObserver.waitForGets.set(false);
141 CustomInnerRegionObserver.countOfNext.set(0);
142 CustomInnerRegionObserver.countOfGets.set(0);
146 * @throws java.lang.Exception
148 @After
149 public void tearDown() throws Exception {
150 if (latch != null) {
151 while (latch.getCount() > 0) {
152 latch.countDown();
155 if (getLatch != null) {
156 getLatch.countDown();
158 if (compactionLatch != null) {
159 compactionLatch.countDown();
161 if (exceptionLatch != null) {
162 exceptionLatch.countDown();
164 latch = null;
165 getLatch = null;
166 compactionLatch = null;
167 exceptionLatch = null;
168 CustomInnerRegionObserver.throwException.set(false);
169 // Clean up the tables for every test case
170 TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames();
171 for (TableName tableName : listTableNames) {
172 if (!tableName.isSystemTable()) {
173 TEST_UTIL.getAdmin().disableTable(tableName);
174 TEST_UTIL.getAdmin().deleteTable(tableName);
179 @Test
180 public void testBlockEvictionWithParallelScans() throws Exception {
181 Table table = null;
182 try {
183 latch = new CountDownLatch(1);
184 final TableName tableName = TableName.valueOf(name.getMethodName());
185 // Create a table with block size as 1024
186 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
187 CustomInnerRegionObserver.class.getName());
188 // get the block cache and region
189 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
190 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
191 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName)
192 .getRegion(regionName);
193 HStore store = region.getStores().iterator().next();
194 CacheConfig cacheConf = store.getCacheConfig();
195 cacheConf.setCacheDataOnWrite(true);
196 cacheConf.setEvictOnClose(true);
197 BlockCache cache = cacheConf.getBlockCache().get();
199 // insert data. 2 Rows are added
200 Put put = new Put(ROW);
201 put.addColumn(FAMILY, QUALIFIER, data);
202 table.put(put);
203 put = new Put(ROW1);
204 put.addColumn(FAMILY, QUALIFIER, data);
205 table.put(put);
206 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
207 // data was in memstore so don't expect any changes
208 // flush the data
209 // Should create one Hfile with 2 blocks
210 region.flush(true);
211 // Load cache
212 // Create three sets of scan
213 ScanThread[] scanThreads = initiateScan(table, false);
214 Thread.sleep(100);
215 checkForBlockEviction(cache, false, false);
216 for (ScanThread thread : scanThreads) {
217 thread.join();
219 // CustomInnerRegionObserver.sleepTime.set(0);
220 Iterator<CachedBlock> iterator = cache.iterator();
221 iterateBlockCache(cache, iterator);
222 // read the data and expect same blocks, one new hit, no misses
223 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
224 iterator = cache.iterator();
225 iterateBlockCache(cache, iterator);
226 // Check how this miss is happening
227 // insert a second column, read the row, no new blocks, 3 new hits
228 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
229 byte[] data2 = Bytes.add(data, data);
230 put = new Put(ROW);
231 put.addColumn(FAMILY, QUALIFIER2, data2);
232 table.put(put);
233 Result r = table.get(new Get(ROW));
234 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
235 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
236 iterator = cache.iterator();
237 iterateBlockCache(cache, iterator);
238 // flush, one new block
239 System.out.println("Flushing cache");
240 region.flush(true);
241 iterator = cache.iterator();
242 iterateBlockCache(cache, iterator);
243 // compact, net minus two blocks, two hits, no misses
244 System.out.println("Compacting");
245 assertEquals(2, store.getStorefilesCount());
246 store.triggerMajorCompaction();
247 region.compact(true);
248 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
249 assertEquals(1, store.getStorefilesCount());
250 iterator = cache.iterator();
251 iterateBlockCache(cache, iterator);
252 // read the row, this should be a cache miss because we don't cache data
253 // blocks on compaction
254 r = table.get(new Get(ROW));
255 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
256 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
257 iterator = cache.iterator();
258 iterateBlockCache(cache, iterator);
259 } finally {
260 if (table != null) {
261 table.close();
266 @Test
267 public void testParallelGetsAndScans() throws IOException, InterruptedException {
268 Table table = null;
269 try {
270 latch = new CountDownLatch(2);
271 // Check if get() returns blocks on its close() itself
272 getLatch = new CountDownLatch(1);
273 final TableName tableName = TableName.valueOf(name.getMethodName());
274 // Create KV that will give you two blocks
275 // Create a table with block size as 1024
276 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
277 CustomInnerRegionObserver.class.getName());
278 // get the block cache and region
279 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
280 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
281 HRegion region =
282 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
283 HStore store = region.getStores().iterator().next();
284 CacheConfig cacheConf = store.getCacheConfig();
285 cacheConf.setCacheDataOnWrite(true);
286 cacheConf.setEvictOnClose(true);
287 BlockCache cache = cacheConf.getBlockCache().get();
289 insertData(table);
290 // flush the data
291 System.out.println("Flushing cache");
292 // Should create one Hfile with 2 blocks
293 region.flush(true);
294 // Create three sets of scan
295 CustomInnerRegionObserver.waitForGets.set(true);
296 ScanThread[] scanThreads = initiateScan(table, false);
297 // Create three sets of gets
298 GetThread[] getThreads = initiateGet(table, false, false);
299 checkForBlockEviction(cache, false, false);
300 CustomInnerRegionObserver.waitForGets.set(false);
301 checkForBlockEviction(cache, false, false);
302 for (GetThread thread : getThreads) {
303 thread.join();
305 // Verify whether the gets have returned the blocks that it had
306 CustomInnerRegionObserver.waitForGets.set(true);
307 // giving some time for the block to be decremented
308 checkForBlockEviction(cache, true, false);
309 getLatch.countDown();
310 for (ScanThread thread : scanThreads) {
311 thread.join();
313 System.out.println("Scans should have returned the bloks");
314 // Check with either true or false
315 CustomInnerRegionObserver.waitForGets.set(false);
316 // The scan should also have released the blocks by now
317 checkForBlockEviction(cache, true, true);
318 } finally {
319 if (table != null) {
320 table.close();
325 @Test
326 public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException {
327 Table table = null;
328 try {
329 latch = new CountDownLatch(1);
330 // Check if get() returns blocks on its close() itself
331 getLatch = new CountDownLatch(1);
332 final TableName tableName = TableName.valueOf(name.getMethodName());
333 // Create KV that will give you two blocks
334 // Create a table with block size as 1024
335 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
336 CustomInnerRegionObserver.class.getName());
337 // get the block cache and region
338 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
339 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
340 HRegion region =
341 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
342 HStore store = region.getStores().iterator().next();
343 CacheConfig cacheConf = store.getCacheConfig();
344 cacheConf.setCacheDataOnWrite(true);
345 cacheConf.setEvictOnClose(true);
346 BlockCache cache = cacheConf.getBlockCache().get();
348 Put put = new Put(ROW);
349 put.addColumn(FAMILY, QUALIFIER, data);
350 table.put(put);
351 region.flush(true);
352 put = new Put(ROW1);
353 put.addColumn(FAMILY, QUALIFIER, data);
354 table.put(put);
355 region.flush(true);
356 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
357 put = new Put(ROW);
358 put.addColumn(FAMILY, QUALIFIER2, data2);
359 table.put(put);
360 region.flush(true);
361 // flush the data
362 System.out.println("Flushing cache");
363 // Should create one Hfile with 2 blocks
364 CustomInnerRegionObserver.waitForGets.set(true);
365 // Create three sets of gets
366 GetThread[] getThreads = initiateGet(table, false, false);
367 Thread.sleep(200);
368 CustomInnerRegionObserver.getCdl().get().countDown();
369 for (GetThread thread : getThreads) {
370 thread.join();
372 // Verify whether the gets have returned the blocks that it had
373 CustomInnerRegionObserver.waitForGets.set(true);
374 // giving some time for the block to be decremented
375 checkForBlockEviction(cache, true, false);
376 getLatch.countDown();
377 System.out.println("Gets should have returned the bloks");
378 } finally {
379 if (table != null) {
380 table.close();
385 @Test
386 // TODO : check how block index works here
387 public void testGetsWithMultiColumnsAndExplicitTracker()
388 throws IOException, InterruptedException {
389 Table table = null;
390 try {
391 latch = new CountDownLatch(1);
392 // Check if get() returns blocks on its close() itself
393 getLatch = new CountDownLatch(1);
394 final TableName tableName = TableName.valueOf(name.getMethodName());
395 // Create KV that will give you two blocks
396 // Create a table with block size as 1024
397 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
398 CustomInnerRegionObserver.class.getName());
399 // get the block cache and region
400 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
401 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
402 HRegion region =
403 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
404 BlockCache cache = setCacheProperties(region);
405 Put put = new Put(ROW);
406 put.addColumn(FAMILY, QUALIFIER, data);
407 table.put(put);
408 region.flush(true);
409 put = new Put(ROW1);
410 put.addColumn(FAMILY, QUALIFIER, data);
411 table.put(put);
412 region.flush(true);
413 for (int i = 1; i < 10; i++) {
414 put = new Put(ROW);
415 put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2);
416 table.put(put);
417 if (i % 2 == 0) {
418 region.flush(true);
421 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
422 put = new Put(ROW);
423 put.addColumn(FAMILY, QUALIFIER2, data2);
424 table.put(put);
425 region.flush(true);
426 // flush the data
427 System.out.println("Flushing cache");
428 // Should create one Hfile with 2 blocks
429 CustomInnerRegionObserver.waitForGets.set(true);
430 // Create three sets of gets
431 GetThread[] getThreads = initiateGet(table, true, false);
432 Thread.sleep(200);
433 Iterator<CachedBlock> iterator = cache.iterator();
434 boolean usedBlocksFound = false;
435 int refCount = 0;
436 int noOfBlocksWithRef = 0;
437 while (iterator.hasNext()) {
438 CachedBlock next = iterator.next();
439 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
440 if (cache instanceof BucketCache) {
441 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
442 } else if (cache instanceof CombinedBlockCache) {
443 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
444 } else {
445 continue;
447 if (refCount != 0) {
448 // Blocks will be with count 3
449 System.out.println("The refCount is " + refCount);
450 assertEquals(NO_OF_THREADS, refCount);
451 usedBlocksFound = true;
452 noOfBlocksWithRef++;
455 assertTrue(usedBlocksFound);
456 // the number of blocks referred
457 assertEquals(10, noOfBlocksWithRef);
458 CustomInnerRegionObserver.getCdl().get().countDown();
459 for (GetThread thread : getThreads) {
460 thread.join();
462 // Verify whether the gets have returned the blocks that it had
463 CustomInnerRegionObserver.waitForGets.set(true);
464 // giving some time for the block to be decremented
465 checkForBlockEviction(cache, true, false);
466 getLatch.countDown();
467 System.out.println("Gets should have returned the bloks");
468 } finally {
469 if (table != null) {
470 table.close();
475 @Test
476 public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException {
477 Table table = null;
478 try {
479 latch = new CountDownLatch(1);
480 // Check if get() returns blocks on its close() itself
481 getLatch = new CountDownLatch(1);
482 final TableName tableName = TableName.valueOf(name.getMethodName());
483 // Create KV that will give you two blocks
484 // Create a table with block size as 1024
485 byte[][] fams = new byte[10][];
486 fams[0] = FAMILY;
487 for (int i = 1; i < 10; i++) {
488 fams[i] = (Bytes.toBytes("testFamily" + i));
490 table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
491 CustomInnerRegionObserver.class.getName());
492 // get the block cache and region
493 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
494 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
495 HRegion region =
496 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
497 BlockCache cache = setCacheProperties(region);
499 Put put = new Put(ROW);
500 put.addColumn(FAMILY, QUALIFIER, data);
501 table.put(put);
502 region.flush(true);
503 put = new Put(ROW1);
504 put.addColumn(FAMILY, QUALIFIER, data);
505 table.put(put);
506 region.flush(true);
507 for (int i = 1; i < 10; i++) {
508 put = new Put(ROW);
509 put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
510 table.put(put);
511 if (i % 2 == 0) {
512 region.flush(true);
515 region.flush(true);
516 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
517 put = new Put(ROW);
518 put.addColumn(FAMILY, QUALIFIER2, data2);
519 table.put(put);
520 region.flush(true);
521 // flush the data
522 System.out.println("Flushing cache");
523 // Should create one Hfile with 2 blocks
524 CustomInnerRegionObserver.waitForGets.set(true);
525 // Create three sets of gets
526 GetThread[] getThreads = initiateGet(table, true, true);
527 Thread.sleep(200);
528 Iterator<CachedBlock> iterator = cache.iterator();
529 boolean usedBlocksFound = false;
530 int refCount = 0;
531 int noOfBlocksWithRef = 0;
532 while (iterator.hasNext()) {
533 CachedBlock next = iterator.next();
534 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
535 if (cache instanceof BucketCache) {
536 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
537 } else if (cache instanceof CombinedBlockCache) {
538 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
539 } else {
540 continue;
542 if (refCount != 0) {
543 // Blocks will be with count 3
544 System.out.println("The refCount is " + refCount);
545 assertEquals(NO_OF_THREADS, refCount);
546 usedBlocksFound = true;
547 noOfBlocksWithRef++;
550 assertTrue(usedBlocksFound);
551 // the number of blocks referred
552 assertEquals(3, noOfBlocksWithRef);
553 CustomInnerRegionObserver.getCdl().get().countDown();
554 for (GetThread thread : getThreads) {
555 thread.join();
557 // Verify whether the gets have returned the blocks that it had
558 CustomInnerRegionObserver.waitForGets.set(true);
559 // giving some time for the block to be decremented
560 checkForBlockEviction(cache, true, false);
561 getLatch.countDown();
562 System.out.println("Gets should have returned the bloks");
563 } finally {
564 if (table != null) {
565 table.close();
570 @Test
571 public void testBlockRefCountAfterSplits() throws IOException, InterruptedException {
572 Table table = null;
573 try {
574 final TableName tableName = TableName.valueOf(name.getMethodName());
575 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024);
576 // get the block cache and region
577 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
578 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
579 HRegion region =
580 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
581 HStore store = region.getStores().iterator().next();
582 CacheConfig cacheConf = store.getCacheConfig();
583 cacheConf.setEvictOnClose(true);
584 BlockCache cache = cacheConf.getBlockCache().get();
586 Put put = new Put(ROW);
587 put.addColumn(FAMILY, QUALIFIER, data);
588 table.put(put);
589 region.flush(true);
590 put = new Put(ROW1);
591 put.addColumn(FAMILY, QUALIFIER, data);
592 table.put(put);
593 region.flush(true);
594 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
595 put = new Put(ROW2);
596 put.addColumn(FAMILY, QUALIFIER2, data2);
597 table.put(put);
598 put = new Put(ROW3);
599 put.addColumn(FAMILY, QUALIFIER2, data2);
600 table.put(put);
601 region.flush(true);
602 ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers());
603 int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size();
604 LOG.info("About to SPLIT on " + Bytes.toString(ROW1));
605 TEST_UTIL.getAdmin().split(tableName, ROW1);
606 // Wait for splits
607 TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount);
608 region.compact(true);
609 Iterator<CachedBlock> iterator = cache.iterator();
610 // Though the split had created the HalfStorefileReader - the firstkey and lastkey scanners
611 // should be closed inorder to return those blocks
612 iterateBlockCache(cache, iterator);
613 } finally {
614 if (table != null) {
615 table.close();
620 @Test
621 public void testMultiGets() throws IOException, InterruptedException {
622 Table table = null;
623 try {
624 latch = new CountDownLatch(2);
625 // Check if get() returns blocks on its close() itself
626 getLatch = new CountDownLatch(1);
627 final TableName tableName = TableName.valueOf(name.getMethodName());
628 // Create KV that will give you two blocks
629 // Create a table with block size as 1024
630 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
631 CustomInnerRegionObserver.class.getName());
632 // get the block cache and region
633 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
634 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
635 HRegion region =
636 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
637 HStore store = region.getStores().iterator().next();
638 CacheConfig cacheConf = store.getCacheConfig();
639 cacheConf.setCacheDataOnWrite(true);
640 cacheConf.setEvictOnClose(true);
641 BlockCache cache = cacheConf.getBlockCache().get();
643 Put put = new Put(ROW);
644 put.addColumn(FAMILY, QUALIFIER, data);
645 table.put(put);
646 region.flush(true);
647 put = new Put(ROW1);
648 put.addColumn(FAMILY, QUALIFIER, data);
649 table.put(put);
650 region.flush(true);
651 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
652 put = new Put(ROW);
653 put.addColumn(FAMILY, QUALIFIER2, data2);
654 table.put(put);
655 region.flush(true);
656 // flush the data
657 System.out.println("Flushing cache");
658 // Should create one Hfile with 2 blocks
659 CustomInnerRegionObserver.waitForGets.set(true);
660 // Create three sets of gets
661 MultiGetThread[] getThreads = initiateMultiGet(table);
662 Thread.sleep(200);
663 int refCount;
664 Iterator<CachedBlock> iterator = cache.iterator();
665 boolean foundNonZeroBlock = false;
666 while (iterator.hasNext()) {
667 CachedBlock next = iterator.next();
668 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
669 if (cache instanceof BucketCache) {
670 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
671 } else if (cache instanceof CombinedBlockCache) {
672 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
673 } else {
674 continue;
676 if (refCount != 0) {
677 assertEquals(NO_OF_THREADS, refCount);
678 foundNonZeroBlock = true;
681 assertTrue("Should have found nonzero ref count block",foundNonZeroBlock);
682 CustomInnerRegionObserver.getCdl().get().countDown();
683 CustomInnerRegionObserver.getCdl().get().countDown();
684 for (MultiGetThread thread : getThreads) {
685 thread.join();
687 // Verify whether the gets have returned the blocks that it had
688 CustomInnerRegionObserver.waitForGets.set(true);
689 // giving some time for the block to be decremented
690 iterateBlockCache(cache, iterator);
691 getLatch.countDown();
692 System.out.println("Gets should have returned the bloks");
693 } finally {
694 if (table != null) {
695 table.close();
699 @Test
700 public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException {
701 Table table = null;
702 try {
703 latch = new CountDownLatch(1);
704 // Check if get() returns blocks on its close() itself
705 final TableName tableName = TableName.valueOf(name.getMethodName());
706 // Create KV that will give you two blocks
707 // Create a table with block size as 1024
708 byte[][] fams = new byte[10][];
709 fams[0] = FAMILY;
710 for (int i = 1; i < 10; i++) {
711 fams[i] = (Bytes.toBytes("testFamily" + i));
713 table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
714 CustomInnerRegionObserver.class.getName());
715 // get the block cache and region
716 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
717 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
718 HRegion region =
719 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
720 BlockCache cache = setCacheProperties(region);
722 Put put = new Put(ROW);
723 put.addColumn(FAMILY, QUALIFIER, data);
724 table.put(put);
725 region.flush(true);
726 put = new Put(ROW1);
727 put.addColumn(FAMILY, QUALIFIER, data);
728 table.put(put);
729 region.flush(true);
730 for (int i = 1; i < 10; i++) {
731 put = new Put(ROW);
732 put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
733 table.put(put);
734 if (i % 2 == 0) {
735 region.flush(true);
738 region.flush(true);
739 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
740 put = new Put(ROW);
741 put.addColumn(FAMILY, QUALIFIER2, data2);
742 table.put(put);
743 region.flush(true);
744 // flush the data
745 System.out.println("Flushing cache");
746 // Should create one Hfile with 2 blocks
747 // Create three sets of gets
748 ScanThread[] scanThreads = initiateScan(table, true);
749 Thread.sleep(200);
750 Iterator<CachedBlock> iterator = cache.iterator();
751 boolean usedBlocksFound = false;
752 int refCount = 0;
753 int noOfBlocksWithRef = 0;
754 while (iterator.hasNext()) {
755 CachedBlock next = iterator.next();
756 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
757 if (cache instanceof BucketCache) {
758 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
759 } else if (cache instanceof CombinedBlockCache) {
760 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
761 } else {
762 continue;
764 if (refCount != 0) {
765 // Blocks will be with count 3
766 System.out.println("The refCount is " + refCount);
767 assertEquals(NO_OF_THREADS, refCount);
768 usedBlocksFound = true;
769 noOfBlocksWithRef++;
772 assertTrue(usedBlocksFound);
773 // the number of blocks referred
774 assertEquals(12, noOfBlocksWithRef);
775 CustomInnerRegionObserver.getCdl().get().countDown();
776 for (ScanThread thread : scanThreads) {
777 thread.join();
779 // giving some time for the block to be decremented
780 checkForBlockEviction(cache, true, false);
781 } finally {
782 if (table != null) {
783 table.close();
788 private BlockCache setCacheProperties(HRegion region) {
789 Iterator<HStore> strItr = region.getStores().iterator();
790 BlockCache cache = null;
791 while (strItr.hasNext()) {
792 HStore store = strItr.next();
793 CacheConfig cacheConf = store.getCacheConfig();
794 cacheConf.setCacheDataOnWrite(true);
795 cacheConf.setEvictOnClose(true);
796 // Use the last one
797 cache = cacheConf.getBlockCache().get();
799 return cache;
802 @Test
803 public void testParallelGetsAndScanWithWrappedRegionScanner() throws IOException,
804 InterruptedException {
805 Table table = null;
806 try {
807 latch = new CountDownLatch(2);
808 // Check if get() returns blocks on its close() itself
809 getLatch = new CountDownLatch(1);
810 final TableName tableName = TableName.valueOf(name.getMethodName());
811 // Create KV that will give you two blocks
812 // Create a table with block size as 1024
813 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
814 CustomInnerRegionObserverWrapper.class.getName());
815 // get the block cache and region
816 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
817 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
818 HRegion region =
819 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
820 HStore store = region.getStores().iterator().next();
821 CacheConfig cacheConf = store.getCacheConfig();
822 cacheConf.setCacheDataOnWrite(true);
823 cacheConf.setEvictOnClose(true);
824 BlockCache cache = cacheConf.getBlockCache().get();
826 // insert data. 2 Rows are added
827 insertData(table);
828 // flush the data
829 System.out.println("Flushing cache");
830 // Should create one Hfile with 2 blocks
831 region.flush(true);
832 // CustomInnerRegionObserver.sleepTime.set(5000);
833 // Create three sets of scan
834 CustomInnerRegionObserver.waitForGets.set(true);
835 ScanThread[] scanThreads = initiateScan(table, false);
836 // Create three sets of gets
837 GetThread[] getThreads = initiateGet(table, false, false);
838 // The block would have been decremented for the scan case as it was
839 // wrapped
840 // before even the postNext hook gets executed.
841 // giving some time for the block to be decremented
842 Thread.sleep(100);
843 CustomInnerRegionObserver.waitForGets.set(false);
844 checkForBlockEviction(cache, false, false);
845 // countdown the latch
846 CustomInnerRegionObserver.getCdl().get().countDown();
847 for (GetThread thread : getThreads) {
848 thread.join();
850 getLatch.countDown();
851 for (ScanThread thread : scanThreads) {
852 thread.join();
854 } finally {
855 if (table != null) {
856 table.close();
861 @Test
862 public void testScanWithCompaction() throws IOException, InterruptedException {
863 testScanWithCompactionInternals(name.getMethodName(), false);
866 @Test
867 public void testReverseScanWithCompaction() throws IOException, InterruptedException {
868 testScanWithCompactionInternals(name.getMethodName(), true);
871 private void testScanWithCompactionInternals(String tableNameStr, boolean reversed)
872 throws IOException, InterruptedException {
873 Table table = null;
874 try {
875 latch = new CountDownLatch(1);
876 compactionLatch = new CountDownLatch(1);
877 TableName tableName = TableName.valueOf(tableNameStr);
878 // Create a table with block size as 1024
879 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
880 CustomInnerRegionObserverWrapper.class.getName());
881 // get the block cache and region
882 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
883 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
884 HRegion region =
885 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
886 HStore store = region.getStores().iterator().next();
887 CacheConfig cacheConf = store.getCacheConfig();
888 cacheConf.setCacheDataOnWrite(true);
889 cacheConf.setEvictOnClose(true);
890 BlockCache cache = cacheConf.getBlockCache().get();
892 // insert data. 2 Rows are added
893 Put put = new Put(ROW);
894 put.addColumn(FAMILY, QUALIFIER, data);
895 table.put(put);
896 put = new Put(ROW1);
897 put.addColumn(FAMILY, QUALIFIER, data);
898 table.put(put);
899 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
900 // Should create one Hfile with 2 blocks
901 region.flush(true);
902 // read the data and expect same blocks, one new hit, no misses
903 int refCount = 0;
904 // Check how this miss is happening
905 // insert a second column, read the row, no new blocks, 3 new hits
906 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
907 byte[] data2 = Bytes.add(data, data);
908 put = new Put(ROW);
909 put.addColumn(FAMILY, QUALIFIER2, data2);
910 table.put(put);
911 // flush, one new block
912 System.out.println("Flushing cache");
913 region.flush(true);
914 Iterator<CachedBlock> iterator = cache.iterator();
915 iterateBlockCache(cache, iterator);
916 // Create three sets of scan
917 ScanThread[] scanThreads = initiateScan(table, reversed);
918 Thread.sleep(100);
919 iterator = cache.iterator();
920 boolean usedBlocksFound = false;
921 while (iterator.hasNext()) {
922 CachedBlock next = iterator.next();
923 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
924 if (cache instanceof BucketCache) {
925 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
926 } else if (cache instanceof CombinedBlockCache) {
927 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
928 } else {
929 continue;
931 if (refCount != 0) {
932 // Blocks will be with count 3
933 assertEquals(NO_OF_THREADS, refCount);
934 usedBlocksFound = true;
937 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
938 usedBlocksFound = false;
939 System.out.println("Compacting");
940 assertEquals(2, store.getStorefilesCount());
941 store.triggerMajorCompaction();
942 region.compact(true);
943 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
944 assertEquals(1, store.getStorefilesCount());
945 // Even after compaction is done we will have some blocks that cannot
946 // be evicted this is because the scan is still referencing them
947 iterator = cache.iterator();
948 while (iterator.hasNext()) {
949 CachedBlock next = iterator.next();
950 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
951 if (cache instanceof BucketCache) {
952 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
953 } else if (cache instanceof CombinedBlockCache) {
954 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
955 } else {
956 continue;
958 if (refCount != 0) {
959 // Blocks will be with count 3 as they are not yet cleared
960 assertEquals(NO_OF_THREADS, refCount);
961 usedBlocksFound = true;
964 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
965 // Should not throw exception
966 compactionLatch.countDown();
967 latch.countDown();
968 for (ScanThread thread : scanThreads) {
969 thread.join();
971 // by this time all blocks should have been evicted
972 iterator = cache.iterator();
973 iterateBlockCache(cache, iterator);
974 Result r = table.get(new Get(ROW));
975 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
976 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
977 // The gets would be working on new blocks
978 iterator = cache.iterator();
979 iterateBlockCache(cache, iterator);
980 } finally {
981 if (table != null) {
982 table.close();
987 @Test
988 public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush()
989 throws IOException, InterruptedException {
990 // do flush and scan in parallel
991 Table table = null;
992 try {
993 latch = new CountDownLatch(1);
994 compactionLatch = new CountDownLatch(1);
995 final TableName tableName = TableName.valueOf(name.getMethodName());
996 // Create a table with block size as 1024
997 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
998 CustomInnerRegionObserverWrapper.class.getName());
999 // get the block cache and region
1000 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
1001 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
1002 HRegion region =
1003 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
1004 HStore store = region.getStores().iterator().next();
1005 CacheConfig cacheConf = store.getCacheConfig();
1006 cacheConf.setCacheDataOnWrite(true);
1007 cacheConf.setEvictOnClose(true);
1008 BlockCache cache = cacheConf.getBlockCache().get();
1010 // insert data. 2 Rows are added
1011 Put put = new Put(ROW);
1012 put.addColumn(FAMILY, QUALIFIER, data);
1013 table.put(put);
1014 put = new Put(ROW1);
1015 put.addColumn(FAMILY, QUALIFIER, data);
1016 table.put(put);
1017 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
1018 // Should create one Hfile with 2 blocks
1019 region.flush(true);
1020 // read the data and expect same blocks, one new hit, no misses
1021 int refCount = 0;
1022 // Check how this miss is happening
1023 // insert a second column, read the row, no new blocks, 3 new hits
1024 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
1025 byte[] data2 = Bytes.add(data, data);
1026 put = new Put(ROW);
1027 put.addColumn(FAMILY, QUALIFIER2, data2);
1028 table.put(put);
1029 // flush, one new block
1030 System.out.println("Flushing cache");
1031 region.flush(true);
1032 Iterator<CachedBlock> iterator = cache.iterator();
1033 iterateBlockCache(cache, iterator);
1034 // Create three sets of scan
1035 ScanThread[] scanThreads = initiateScan(table, false);
1036 Thread.sleep(100);
1037 iterator = cache.iterator();
1038 boolean usedBlocksFound = false;
1039 while (iterator.hasNext()) {
1040 CachedBlock next = iterator.next();
1041 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
1042 if (cache instanceof BucketCache) {
1043 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
1044 } else if (cache instanceof CombinedBlockCache) {
1045 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
1046 } else {
1047 continue;
1049 if (refCount != 0) {
1050 // Blocks will be with count 3
1051 assertEquals(NO_OF_THREADS, refCount);
1052 usedBlocksFound = true;
1055 // Make a put and do a flush
1056 QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
1057 data2 = Bytes.add(data, data);
1058 put = new Put(ROW1);
1059 put.addColumn(FAMILY, QUALIFIER2, data2);
1060 table.put(put);
1061 // flush, one new block
1062 System.out.println("Flushing cache");
1063 region.flush(true);
1064 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
1065 usedBlocksFound = false;
1066 System.out.println("Compacting");
1067 assertEquals(3, store.getStorefilesCount());
1068 store.triggerMajorCompaction();
1069 region.compact(true);
1070 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
1071 assertEquals(1, store.getStorefilesCount());
1072 // Even after compaction is done we will have some blocks that cannot
1073 // be evicted this is because the scan is still referencing them
1074 iterator = cache.iterator();
1075 while (iterator.hasNext()) {
1076 CachedBlock next = iterator.next();
1077 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
1078 if (cache instanceof BucketCache) {
1079 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
1080 } else if (cache instanceof CombinedBlockCache) {
1081 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
1082 } else {
1083 continue;
1085 if (refCount != 0) {
1086 // Blocks will be with count 3 as they are not yet cleared
1087 assertEquals(NO_OF_THREADS, refCount);
1088 usedBlocksFound = true;
1091 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
1092 // Should not throw exception
1093 compactionLatch.countDown();
1094 latch.countDown();
1095 for (ScanThread thread : scanThreads) {
1096 thread.join();
1098 // by this time all blocks should have been evicted
1099 iterator = cache.iterator();
1100 // Since a flush and compaction happened after a scan started
1101 // we need to ensure that all the original blocks of the compacted file
1102 // is also removed.
1103 iterateBlockCache(cache, iterator);
1104 Result r = table.get(new Get(ROW));
1105 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
1106 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
1107 // The gets would be working on new blocks
1108 iterator = cache.iterator();
1109 iterateBlockCache(cache, iterator);
1110 } finally {
1111 if (table != null) {
1112 table.close();
1118 @Test
1119 public void testScanWithException() throws IOException, InterruptedException {
1120 Table table = null;
1121 try {
1122 latch = new CountDownLatch(1);
1123 exceptionLatch = new CountDownLatch(1);
1124 final TableName tableName = TableName.valueOf(name.getMethodName());
1125 // Create KV that will give you two blocks
1126 // Create a table with block size as 1024
1127 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
1128 CustomInnerRegionObserverWrapper.class.getName());
1129 // get the block cache and region
1130 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
1131 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
1132 HRegion region =
1133 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
1134 HStore store = region.getStores().iterator().next();
1135 CacheConfig cacheConf = store.getCacheConfig();
1136 cacheConf.setCacheDataOnWrite(true);
1137 cacheConf.setEvictOnClose(true);
1138 BlockCache cache = cacheConf.getBlockCache().get();
1139 // insert data. 2 Rows are added
1140 insertData(table);
1141 // flush the data
1142 System.out.println("Flushing cache");
1143 // Should create one Hfile with 2 blocks
1144 region.flush(true);
1145 // CustomInnerRegionObserver.sleepTime.set(5000);
1146 CustomInnerRegionObserver.throwException.set(true);
1147 ScanThread[] scanThreads = initiateScan(table, false);
1148 // The block would have been decremented for the scan case as it was
1149 // wrapped
1150 // before even the postNext hook gets executed.
1151 // giving some time for the block to be decremented
1152 Thread.sleep(100);
1153 Iterator<CachedBlock> iterator = cache.iterator();
1154 boolean usedBlocksFound = false;
1155 int refCount = 0;
1156 while (iterator.hasNext()) {
1157 CachedBlock next = iterator.next();
1158 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
1159 if (cache instanceof BucketCache) {
1160 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
1161 } else if (cache instanceof CombinedBlockCache) {
1162 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
1163 } else {
1164 continue;
1166 if (refCount != 0) {
1167 // Blocks will be with count 3
1168 assertEquals(NO_OF_THREADS, refCount);
1169 usedBlocksFound = true;
1172 assertTrue(usedBlocksFound);
1173 exceptionLatch.countDown();
1174 // countdown the latch
1175 CustomInnerRegionObserver.getCdl().get().countDown();
1176 for (ScanThread thread : scanThreads) {
1177 thread.join();
1179 iterator = cache.iterator();
1180 usedBlocksFound = false;
1181 refCount = 0;
1182 while (iterator.hasNext()) {
1183 CachedBlock next = iterator.next();
1184 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
1185 if (cache instanceof BucketCache) {
1186 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
1187 } else if (cache instanceof CombinedBlockCache) {
1188 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
1189 } else {
1190 continue;
1192 if (refCount != 0) {
1193 // Blocks will be with count 3
1194 assertEquals(NO_OF_THREADS, refCount);
1195 usedBlocksFound = true;
1198 assertFalse(usedBlocksFound);
1199 // you should always see 0 ref count. since after HBASE-16604 we always recreate the scanner
1200 assertEquals(0, refCount);
1201 } finally {
1202 if (table != null) {
1203 table.close();
1208 private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) {
1209 int refCount;
1210 while (iterator.hasNext()) {
1211 CachedBlock next = iterator.next();
1212 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
1213 if (cache instanceof BucketCache) {
1214 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
1215 } else if (cache instanceof CombinedBlockCache) {
1216 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
1217 } else {
1218 continue;
1220 assertEquals(0, refCount);
1224 private void insertData(Table table) throws IOException {
1225 Put put = new Put(ROW);
1226 put.addColumn(FAMILY, QUALIFIER, data);
1227 table.put(put);
1228 put = new Put(ROW1);
1229 put.addColumn(FAMILY, QUALIFIER, data);
1230 table.put(put);
1231 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
1232 put = new Put(ROW);
1233 put.addColumn(FAMILY, QUALIFIER2, data2);
1234 table.put(put);
1237 private ScanThread[] initiateScan(Table table, boolean reverse) throws IOException,
1238 InterruptedException {
1239 ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS];
1240 for (int i = 0; i < NO_OF_THREADS; i++) {
1241 scanThreads[i] = new ScanThread(table, reverse);
1243 for (ScanThread thread : scanThreads) {
1244 thread.start();
1246 return scanThreads;
1249 private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs)
1250 throws IOException, InterruptedException {
1251 GetThread[] getThreads = new GetThread[NO_OF_THREADS];
1252 for (int i = 0; i < NO_OF_THREADS; i++) {
1253 getThreads[i] = new GetThread(table, tracker, multipleCFs);
1255 for (GetThread thread : getThreads) {
1256 thread.start();
1258 return getThreads;
1261 private MultiGetThread[] initiateMultiGet(Table table)
1262 throws IOException, InterruptedException {
1263 MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS];
1264 for (int i = 0; i < NO_OF_THREADS; i++) {
1265 multiGetThreads[i] = new MultiGetThread(table);
1267 for (MultiGetThread thread : multiGetThreads) {
1268 thread.start();
1270 return multiGetThreads;
1273 private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero)
1274 throws InterruptedException {
1275 int counter = NO_OF_THREADS;
1276 if (CustomInnerRegionObserver.waitForGets.get()) {
1277 // Because only one row is selected, it has only 2 blocks
1278 counter = counter - 1;
1279 while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) {
1280 Thread.sleep(100);
1282 } else {
1283 while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) {
1284 Thread.sleep(100);
1287 Iterator<CachedBlock> iterator = cache.iterator();
1288 int refCount = 0;
1289 while (iterator.hasNext()) {
1290 CachedBlock next = iterator.next();
1291 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
1292 if (cache instanceof BucketCache) {
1293 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
1294 } else if (cache instanceof CombinedBlockCache) {
1295 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
1296 } else {
1297 continue;
1299 System.out.println(" the refcount is " + refCount + " block is " + cacheKey);
1300 if (CustomInnerRegionObserver.waitForGets.get()) {
1301 if (expectOnlyZero) {
1302 assertTrue(refCount == 0);
1304 if (refCount != 0) {
1305 // Because the scan would have also touched up on these blocks but
1306 // it
1307 // would have touched
1308 // all 3
1309 if (getClosed) {
1310 // If get has closed only the scan's blocks would be available
1311 assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get());
1312 } else {
1313 assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get() + (NO_OF_THREADS));
1316 } else {
1317 // Because the get would have also touched up on these blocks but it
1318 // would have touched
1319 // upon only 2 additionally
1320 if (expectOnlyZero) {
1321 assertTrue(refCount == 0);
1323 if (refCount != 0) {
1324 if (getLatch == null) {
1325 assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get());
1326 } else {
1327 assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get() + (NO_OF_THREADS));
1332 CustomInnerRegionObserver.getCdl().get().countDown();
1335 private static class MultiGetThread extends Thread {
1336 private final Table table;
1337 private final List<Get> gets = new ArrayList<>();
1338 public MultiGetThread(Table table) {
1339 this.table = table;
1341 @Override
1342 public void run() {
1343 gets.add(new Get(ROW));
1344 gets.add(new Get(ROW1));
1345 try {
1346 CustomInnerRegionObserver.getCdl().set(latch);
1347 Result[] r = table.get(gets);
1348 assertTrue(Bytes.equals(r[0].getRow(), ROW));
1349 assertTrue(Bytes.equals(r[1].getRow(), ROW1));
1350 } catch (IOException e) {
1355 private static class GetThread extends Thread {
1356 private final Table table;
1357 private final boolean tracker;
1358 private final boolean multipleCFs;
1360 public GetThread(Table table, boolean tracker, boolean multipleCFs) {
1361 this.table = table;
1362 this.tracker = tracker;
1363 this.multipleCFs = multipleCFs;
1366 @Override
1367 public void run() {
1368 try {
1369 initiateGet(table);
1370 } catch (IOException e) {
1371 // do nothing
1375 private void initiateGet(Table table) throws IOException {
1376 Get get = new Get(ROW);
1377 if (tracker) {
1378 // Change this
1379 if (!multipleCFs) {
1380 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3));
1381 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8));
1382 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9));
1383 // Unknown key
1384 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900));
1385 } else {
1386 get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3));
1387 get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8));
1388 get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9));
1389 // Unknown key
1390 get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900));
1393 CustomInnerRegionObserver.getCdl().set(latch);
1394 Result r = table.get(get);
1395 System.out.println(r);
1396 if (!tracker) {
1397 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
1398 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
1399 } else {
1400 if (!multipleCFs) {
1401 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2));
1402 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2));
1403 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2));
1404 } else {
1405 assertTrue(Bytes.equals(
1406 r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)),
1407 data2));
1408 assertTrue(Bytes.equals(
1409 r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)),
1410 data2));
1411 assertTrue(Bytes.equals(
1412 r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)),
1413 data2));
1419 private static class ScanThread extends Thread {
1420 private final Table table;
1421 private final boolean reverse;
1423 public ScanThread(Table table, boolean reverse) {
1424 this.table = table;
1425 this.reverse = reverse;
1428 @Override
1429 public void run() {
1430 try {
1431 initiateScan(table);
1432 } catch (IOException e) {
1433 // do nothing
1437 private void initiateScan(Table table) throws IOException {
1438 Scan scan = new Scan();
1439 if (reverse) {
1440 scan.setReversed(true);
1442 CustomInnerRegionObserver.getCdl().set(latch);
1443 ResultScanner resScanner = table.getScanner(scan);
1444 int i = (reverse ? ROWS.length - 1 : 0);
1445 boolean resultFound = false;
1446 for (Result result : resScanner) {
1447 resultFound = true;
1448 System.out.println(result);
1449 if (!reverse) {
1450 assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
1451 i++;
1452 } else {
1453 assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
1454 i--;
1457 assertTrue(resultFound);
1461 private void waitForStoreFileCount(HStore store, int count, int timeout)
1462 throws InterruptedException {
1463 long start = System.currentTimeMillis();
1464 while (start + timeout > System.currentTimeMillis() && store.getStorefilesCount() != count) {
1465 Thread.sleep(100);
1467 System.out.println("start=" + start + ", now=" + System.currentTimeMillis() + ", cur=" +
1468 store.getStorefilesCount());
1469 assertEquals(count, store.getStorefilesCount());
1472 private static class CustomScanner implements RegionScanner {
1474 private RegionScanner delegate;
1476 public CustomScanner(RegionScanner delegate) {
1477 this.delegate = delegate;
1480 @Override
1481 public boolean next(List<Cell> results) throws IOException {
1482 return delegate.next(results);
1485 @Override
1486 public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
1487 return delegate.next(result, scannerContext);
1490 @Override
1491 public boolean nextRaw(List<Cell> result) throws IOException {
1492 return delegate.nextRaw(result);
1495 @Override
1496 public boolean nextRaw(List<Cell> result, ScannerContext context) throws IOException {
1497 boolean nextRaw = delegate.nextRaw(result, context);
1498 if (compactionLatch != null && compactionLatch.getCount() > 0) {
1499 try {
1500 compactionLatch.await();
1501 } catch (InterruptedException ie) {
1505 if (CustomInnerRegionObserver.throwException.get()) {
1506 if (exceptionLatch.getCount() > 0) {
1507 try {
1508 exceptionLatch.await();
1509 } catch (InterruptedException e) {
1511 throw new IOException("throw exception");
1514 return nextRaw;
1517 @Override
1518 public void close() throws IOException {
1519 delegate.close();
1522 @Override
1523 public RegionInfo getRegionInfo() {
1524 return delegate.getRegionInfo();
1527 @Override
1528 public boolean isFilterDone() throws IOException {
1529 return delegate.isFilterDone();
1532 @Override
1533 public boolean reseek(byte[] row) throws IOException {
1534 return false;
1537 @Override
1538 public long getMaxResultSize() {
1539 return delegate.getMaxResultSize();
1542 @Override
1543 public long getMvccReadPoint() {
1544 return delegate.getMvccReadPoint();
1547 @Override
1548 public int getBatch() {
1549 return delegate.getBatch();
1553 public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver {
1554 @Override
1555 public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
1556 Scan scan, RegionScanner s) throws IOException {
1557 return new CustomScanner(s);
1561 public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver {
1562 static final AtomicInteger countOfNext = new AtomicInteger(0);
1563 static final AtomicInteger countOfGets = new AtomicInteger(0);
1564 static final AtomicBoolean waitForGets = new AtomicBoolean(false);
1565 static final AtomicBoolean throwException = new AtomicBoolean(false);
1566 private static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(
1567 new CountDownLatch(0));
1569 @Override
1570 public Optional<RegionObserver> getRegionObserver() {
1571 return Optional.of(this);
1574 @Override
1575 public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
1576 InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
1577 slowdownCode(e, false);
1578 if (getLatch != null && getLatch.getCount() > 0) {
1579 try {
1580 getLatch.await();
1581 } catch (InterruptedException e1) {
1584 return hasMore;
1587 @Override
1588 public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
1589 List<Cell> results) throws IOException {
1590 slowdownCode(e, true);
1593 public static AtomicReference<CountDownLatch> getCdl() {
1594 return cdl;
1597 private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e,
1598 boolean isGet) {
1599 CountDownLatch latch = getCdl().get();
1600 try {
1601 System.out.println(latch.getCount() + " is the count " + isGet);
1602 if (latch.getCount() > 0) {
1603 if (isGet) {
1604 countOfGets.incrementAndGet();
1605 } else {
1606 countOfNext.incrementAndGet();
1608 LOG.info("Waiting for the counterCountDownLatch");
1609 latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
1610 if (latch.getCount() > 0) {
1611 throw new RuntimeException("Can't wait more");
1614 } catch (InterruptedException e1) {
1615 LOG.error(e1.toString(), e1);