HBASE-26921 Rewrite the counting cells part in TestMultiVersions (#4316)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / client / TestBlockEvictionFromClient.java
blob a69bc4d4dc56178825c5bc8eb8d53f86f33380d8
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertFalse;
22 import static org.junit.Assert.assertTrue;
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Optional;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.atomic.AtomicReference;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.hbase.Cell;
36 import org.apache.hadoop.hbase.HBaseClassTestRule;
37 import org.apache.hadoop.hbase.HBaseTestingUtil;
38 import org.apache.hadoop.hbase.HConstants;
39 import org.apache.hadoop.hbase.ServerName;
40 import org.apache.hadoop.hbase.TableName;
41 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
42 import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
43 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
44 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
45 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
46 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
47 import org.apache.hadoop.hbase.io.hfile.BlockCache;
48 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
49 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
50 import org.apache.hadoop.hbase.io.hfile.CachedBlock;
51 import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
52 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
53 import org.apache.hadoop.hbase.regionserver.BloomType;
54 import org.apache.hadoop.hbase.regionserver.HRegion;
55 import org.apache.hadoop.hbase.regionserver.HStore;
56 import org.apache.hadoop.hbase.regionserver.InternalScanner;
57 import org.apache.hadoop.hbase.regionserver.RegionScanner;
58 import org.apache.hadoop.hbase.regionserver.ScannerContext;
59 import org.apache.hadoop.hbase.testclassification.ClientTests;
60 import org.apache.hadoop.hbase.testclassification.LargeTests;
61 import org.apache.hadoop.hbase.util.Bytes;
62 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
63 import org.junit.After;
64 import org.junit.AfterClass;
65 import org.junit.Before;
66 import org.junit.BeforeClass;
67 import org.junit.ClassRule;
68 import org.junit.Rule;
69 import org.junit.Test;
70 import org.junit.experimental.categories.Category;
71 import org.junit.rules.TestName;
72 import org.slf4j.Logger;
73 import org.slf4j.LoggerFactory;
75 import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
77 @Category({ LargeTests.class, ClientTests.class })
78 @SuppressWarnings("deprecation")
79 public class TestBlockEvictionFromClient {
81 @ClassRule
82 public static final HBaseClassTestRule CLASS_RULE =
83 HBaseClassTestRule.forClass(TestBlockEvictionFromClient.class);
85 private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionFromClient.class);
86 protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
87 static byte[][] ROWS = new byte[2][];
88 private static int NO_OF_THREADS = 3;
89 private static byte[] ROW = Bytes.toBytes("testRow");
90 private static byte[] ROW1 = Bytes.toBytes("testRow1");
91 private static byte[] ROW2 = Bytes.toBytes("testRow2");
92 private static byte[] ROW3 = Bytes.toBytes("testRow3");
93 private static byte[] FAMILY = Bytes.toBytes("testFamily");
94 private static byte[][] FAMILIES_1 = new byte[1][0];
95 private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
96 private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
97 private static byte[] data = new byte[1000];
98 private static byte[] data2 = Bytes.add(data, data);
99 protected static int SLAVES = 1;
100 private static CountDownLatch latch;
101 private static CountDownLatch getLatch;
102 private static CountDownLatch compactionLatch;
103 private static CountDownLatch exceptionLatch;
105 @Rule
106 public TestName name = new TestName();
109 * @throws java.lang.Exception
111 @BeforeClass
112 public static void setUpBeforeClass() throws Exception {
113 ROWS[0] = ROW;
114 ROWS[1] = ROW1;
115 Configuration conf = TEST_UTIL.getConfiguration();
116 conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
117 MultiRowMutationEndpoint.class.getName());
118 conf.setInt("hbase.regionserver.handler.count", 20);
119 conf.setInt("hbase.bucketcache.size", 400);
120 conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
121 conf.setFloat("hfile.block.cache.size", 0.2f);
122 conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
123 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry
124 conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000);
125 FAMILIES_1[0] = FAMILY;
126 TEST_UTIL.startMiniCluster(SLAVES);
130 * @throws java.lang.Exception
132 @AfterClass
133 public static void tearDownAfterClass() throws Exception {
134 TEST_UTIL.shutdownMiniCluster();
138 * @throws java.lang.Exception
140 @Before
141 public void setUp() throws Exception {
142 CustomInnerRegionObserver.waitForGets.set(false);
143 CustomInnerRegionObserver.countOfNext.set(0);
144 CustomInnerRegionObserver.countOfGets.set(0);
148 * @throws java.lang.Exception
150 @After
151 public void tearDown() throws Exception {
152 if (latch != null) {
153 while (latch.getCount() > 0) {
154 latch.countDown();
157 if (getLatch != null) {
158 getLatch.countDown();
160 if (compactionLatch != null) {
161 compactionLatch.countDown();
163 if (exceptionLatch != null) {
164 exceptionLatch.countDown();
166 latch = null;
167 getLatch = null;
168 compactionLatch = null;
169 exceptionLatch = null;
170 CustomInnerRegionObserver.throwException.set(false);
171 // Clean up the tables for every test case
172 TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames();
173 for (TableName tableName : listTableNames) {
174 if (!tableName.isSystemTable()) {
175 TEST_UTIL.getAdmin().disableTable(tableName);
176 TEST_UTIL.getAdmin().deleteTable(tableName);
181 @Test
182 public void testBlockEvictionWithParallelScans() throws Exception {
183 Table table = null;
184 try {
185 latch = new CountDownLatch(1);
186 final TableName tableName = TableName.valueOf(name.getMethodName());
187 // Create a table with block size as 1024
188 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
189 CustomInnerRegionObserver.class.getName());
190 // get the block cache and region
191 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
192 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
193 HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName)
194 .getRegion(regionName);
195 HStore store = region.getStores().iterator().next();
196 CacheConfig cacheConf = store.getCacheConfig();
197 cacheConf.setCacheDataOnWrite(true);
198 cacheConf.setEvictOnClose(true);
199 BlockCache cache = cacheConf.getBlockCache().get();
201 // insert data. 2 Rows are added
202 Put put = new Put(ROW);
203 put.addColumn(FAMILY, QUALIFIER, data);
204 table.put(put);
205 put = new Put(ROW1);
206 put.addColumn(FAMILY, QUALIFIER, data);
207 table.put(put);
208 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
209 // data was in memstore so don't expect any changes
210 // flush the data
211 // Should create one Hfile with 2 blocks
212 region.flush(true);
213 // Load cache
214 // Create three sets of scan
215 ScanThread[] scanThreads = initiateScan(table, false);
216 Thread.sleep(100);
217 checkForBlockEviction(cache, false, false);
218 for (ScanThread thread : scanThreads) {
219 thread.join();
221 // CustomInnerRegionObserver.sleepTime.set(0);
222 Iterator<CachedBlock> iterator = cache.iterator();
223 iterateBlockCache(cache, iterator);
224 // read the data and expect same blocks, one new hit, no misses
225 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
226 iterator = cache.iterator();
227 iterateBlockCache(cache, iterator);
228 // Check how this miss is happening
229 // insert a second column, read the row, no new blocks, 3 new hits
230 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
231 byte[] data2 = Bytes.add(data, data);
232 put = new Put(ROW);
233 put.addColumn(FAMILY, QUALIFIER2, data2);
234 table.put(put);
235 Result r = table.get(new Get(ROW));
236 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
237 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
238 iterator = cache.iterator();
239 iterateBlockCache(cache, iterator);
240 // flush, one new block
241 System.out.println("Flushing cache");
242 region.flush(true);
243 iterator = cache.iterator();
244 iterateBlockCache(cache, iterator);
245 // compact, net minus two blocks, two hits, no misses
246 System.out.println("Compacting");
247 assertEquals(2, store.getStorefilesCount());
248 store.triggerMajorCompaction();
249 region.compact(true);
250 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
251 assertEquals(1, store.getStorefilesCount());
252 iterator = cache.iterator();
253 iterateBlockCache(cache, iterator);
254 // read the row, this should be a cache miss because we don't cache data
255 // blocks on compaction
256 r = table.get(new Get(ROW));
257 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
258 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
259 iterator = cache.iterator();
260 iterateBlockCache(cache, iterator);
261 } finally {
262 if (table != null) {
263 table.close();
268 @Test
269 public void testParallelGetsAndScans() throws IOException, InterruptedException {
270 Table table = null;
271 try {
272 latch = new CountDownLatch(2);
273 // Check if get() returns blocks on its close() itself
274 getLatch = new CountDownLatch(1);
275 final TableName tableName = TableName.valueOf(name.getMethodName());
276 // Create KV that will give you two blocks
277 // Create a table with block size as 1024
278 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
279 CustomInnerRegionObserver.class.getName());
280 // get the block cache and region
281 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
282 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
283 HRegion region =
284 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
285 HStore store = region.getStores().iterator().next();
286 CacheConfig cacheConf = store.getCacheConfig();
287 cacheConf.setCacheDataOnWrite(true);
288 cacheConf.setEvictOnClose(true);
289 BlockCache cache = cacheConf.getBlockCache().get();
291 insertData(table);
292 // flush the data
293 System.out.println("Flushing cache");
294 // Should create one Hfile with 2 blocks
295 region.flush(true);
296 // Create three sets of scan
297 CustomInnerRegionObserver.waitForGets.set(true);
298 ScanThread[] scanThreads = initiateScan(table, false);
299 // Create three sets of gets
300 GetThread[] getThreads = initiateGet(table, false, false);
301 checkForBlockEviction(cache, false, false);
302 CustomInnerRegionObserver.waitForGets.set(false);
303 checkForBlockEviction(cache, false, false);
304 for (GetThread thread : getThreads) {
305 thread.join();
307 // Verify whether the gets have returned the blocks that it had
308 CustomInnerRegionObserver.waitForGets.set(true);
309 // giving some time for the block to be decremented
310 checkForBlockEviction(cache, true, false);
311 getLatch.countDown();
312 for (ScanThread thread : scanThreads) {
313 thread.join();
315 System.out.println("Scans should have returned the bloks");
316 // Check with either true or false
317 CustomInnerRegionObserver.waitForGets.set(false);
318 // The scan should also have released the blocks by now
319 checkForBlockEviction(cache, true, true);
320 } finally {
321 if (table != null) {
322 table.close();
327 @Test
328 public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException {
329 Table table = null;
330 try {
331 latch = new CountDownLatch(1);
332 // Check if get() returns blocks on its close() itself
333 getLatch = new CountDownLatch(1);
334 final TableName tableName = TableName.valueOf(name.getMethodName());
335 // Create KV that will give you two blocks
336 // Create a table with block size as 1024
337 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
338 CustomInnerRegionObserver.class.getName());
339 // get the block cache and region
340 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
341 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
342 HRegion region =
343 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
344 HStore store = region.getStores().iterator().next();
345 CacheConfig cacheConf = store.getCacheConfig();
346 cacheConf.setCacheDataOnWrite(true);
347 cacheConf.setEvictOnClose(true);
348 BlockCache cache = cacheConf.getBlockCache().get();
350 Put put = new Put(ROW);
351 put.addColumn(FAMILY, QUALIFIER, data);
352 table.put(put);
353 region.flush(true);
354 put = new Put(ROW1);
355 put.addColumn(FAMILY, QUALIFIER, data);
356 table.put(put);
357 region.flush(true);
358 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
359 put = new Put(ROW);
360 put.addColumn(FAMILY, QUALIFIER2, data2);
361 table.put(put);
362 region.flush(true);
363 // flush the data
364 System.out.println("Flushing cache");
365 // Should create one Hfile with 2 blocks
366 CustomInnerRegionObserver.waitForGets.set(true);
367 // Create three sets of gets
368 GetThread[] getThreads = initiateGet(table, false, false);
369 Thread.sleep(200);
370 CustomInnerRegionObserver.getCdl().get().countDown();
371 for (GetThread thread : getThreads) {
372 thread.join();
374 // Verify whether the gets have returned the blocks that it had
375 CustomInnerRegionObserver.waitForGets.set(true);
376 // giving some time for the block to be decremented
377 checkForBlockEviction(cache, true, false);
378 getLatch.countDown();
379 System.out.println("Gets should have returned the bloks");
380 } finally {
381 if (table != null) {
382 table.close();
387 @Test
388 // TODO : check how block index works here
389 public void testGetsWithMultiColumnsAndExplicitTracker()
390 throws IOException, InterruptedException {
391 Table table = null;
392 try {
393 latch = new CountDownLatch(1);
394 // Check if get() returns blocks on its close() itself
395 getLatch = new CountDownLatch(1);
396 final TableName tableName = TableName.valueOf(name.getMethodName());
397 // Create KV that will give you two blocks
398 // Create a table with block size as 1024
399 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
400 CustomInnerRegionObserver.class.getName());
401 // get the block cache and region
402 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
403 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
404 HRegion region =
405 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
406 BlockCache cache = setCacheProperties(region);
407 Put put = new Put(ROW);
408 put.addColumn(FAMILY, QUALIFIER, data);
409 table.put(put);
410 region.flush(true);
411 put = new Put(ROW1);
412 put.addColumn(FAMILY, QUALIFIER, data);
413 table.put(put);
414 region.flush(true);
415 for (int i = 1; i < 10; i++) {
416 put = new Put(ROW);
417 put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2);
418 table.put(put);
419 if (i % 2 == 0) {
420 region.flush(true);
423 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
424 put = new Put(ROW);
425 put.addColumn(FAMILY, QUALIFIER2, data2);
426 table.put(put);
427 region.flush(true);
428 // flush the data
429 System.out.println("Flushing cache");
430 // Should create one Hfile with 2 blocks
431 CustomInnerRegionObserver.waitForGets.set(true);
432 // Create three sets of gets
433 GetThread[] getThreads = initiateGet(table, true, false);
434 Thread.sleep(200);
435 Iterator<CachedBlock> iterator = cache.iterator();
436 boolean usedBlocksFound = false;
437 int refCount = 0;
438 int noOfBlocksWithRef = 0;
439 while (iterator.hasNext()) {
440 CachedBlock next = iterator.next();
441 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
442 if (cache instanceof BucketCache) {
443 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
444 } else if (cache instanceof CombinedBlockCache) {
445 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
446 } else {
447 continue;
449 if (refCount != 0) {
450 // Blocks will be with count 3
451 System.out.println("The refCount is " + refCount);
452 assertEquals(NO_OF_THREADS, refCount);
453 usedBlocksFound = true;
454 noOfBlocksWithRef++;
457 assertTrue(usedBlocksFound);
458 // the number of blocks referred
459 assertEquals(10, noOfBlocksWithRef);
460 CustomInnerRegionObserver.getCdl().get().countDown();
461 for (GetThread thread : getThreads) {
462 thread.join();
464 // Verify whether the gets have returned the blocks that it had
465 CustomInnerRegionObserver.waitForGets.set(true);
466 // giving some time for the block to be decremented
467 checkForBlockEviction(cache, true, false);
468 getLatch.countDown();
469 System.out.println("Gets should have returned the bloks");
470 } finally {
471 if (table != null) {
472 table.close();
477 @Test
478 public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException {
479 Table table = null;
480 try {
481 latch = new CountDownLatch(1);
482 // Check if get() returns blocks on its close() itself
483 getLatch = new CountDownLatch(1);
484 final TableName tableName = TableName.valueOf(name.getMethodName());
485 // Create KV that will give you two blocks
486 // Create a table with block size as 1024
487 byte[][] fams = new byte[10][];
488 fams[0] = FAMILY;
489 for (int i = 1; i < 10; i++) {
490 fams[i] = (Bytes.toBytes("testFamily" + i));
492 table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
493 CustomInnerRegionObserver.class.getName());
494 // get the block cache and region
495 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
496 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
497 HRegion region =
498 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
499 BlockCache cache = setCacheProperties(region);
501 Put put = new Put(ROW);
502 put.addColumn(FAMILY, QUALIFIER, data);
503 table.put(put);
504 region.flush(true);
505 put = new Put(ROW1);
506 put.addColumn(FAMILY, QUALIFIER, data);
507 table.put(put);
508 region.flush(true);
509 for (int i = 1; i < 10; i++) {
510 put = new Put(ROW);
511 put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
512 table.put(put);
513 if (i % 2 == 0) {
514 region.flush(true);
517 region.flush(true);
518 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
519 put = new Put(ROW);
520 put.addColumn(FAMILY, QUALIFIER2, data2);
521 table.put(put);
522 region.flush(true);
523 // flush the data
524 System.out.println("Flushing cache");
525 // Should create one Hfile with 2 blocks
526 CustomInnerRegionObserver.waitForGets.set(true);
527 // Create three sets of gets
528 GetThread[] getThreads = initiateGet(table, true, true);
529 Thread.sleep(200);
530 Iterator<CachedBlock> iterator = cache.iterator();
531 boolean usedBlocksFound = false;
532 int refCount = 0;
533 int noOfBlocksWithRef = 0;
534 while (iterator.hasNext()) {
535 CachedBlock next = iterator.next();
536 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
537 if (cache instanceof BucketCache) {
538 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
539 } else if (cache instanceof CombinedBlockCache) {
540 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
541 } else {
542 continue;
544 if (refCount != 0) {
545 // Blocks will be with count 3
546 System.out.println("The refCount is " + refCount);
547 assertEquals(NO_OF_THREADS, refCount);
548 usedBlocksFound = true;
549 noOfBlocksWithRef++;
552 assertTrue(usedBlocksFound);
553 // the number of blocks referred
554 assertEquals(3, noOfBlocksWithRef);
555 CustomInnerRegionObserver.getCdl().get().countDown();
556 for (GetThread thread : getThreads) {
557 thread.join();
559 // Verify whether the gets have returned the blocks that it had
560 CustomInnerRegionObserver.waitForGets.set(true);
561 // giving some time for the block to be decremented
562 checkForBlockEviction(cache, true, false);
563 getLatch.countDown();
564 System.out.println("Gets should have returned the bloks");
565 } finally {
566 if (table != null) {
567 table.close();
572 @Test
573 public void testBlockRefCountAfterSplits() throws IOException, InterruptedException {
574 Table table = null;
575 try {
576 final TableName tableName = TableName.valueOf(name.getMethodName());
577 TableDescriptor desc = TEST_UTIL.createTableDescriptor(tableName);
578 // This test expects rpc refcount of cached data blocks to be 0 after split. After split,
579 // two daughter regions are opened and a compaction is scheduled to get rid of reference
580 // of the parent region hfiles. Compaction will increase refcount of cached data blocks by 1.
581 // It is flakey since compaction can kick in anytime. To solve this issue, table is created
582 // with compaction disabled.
583 table = TEST_UTIL.createTable(
584 TableDescriptorBuilder.newBuilder(desc).setCompactionEnabled(false).build(), FAMILIES_1,
585 null, BloomType.ROW, 1024, null);
586 // get the block cache and region
587 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
588 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
589 HRegion region =
590 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
591 HStore store = region.getStores().iterator().next();
592 CacheConfig cacheConf = store.getCacheConfig();
593 cacheConf.setEvictOnClose(true);
594 BlockCache cache = cacheConf.getBlockCache().get();
596 Put put = new Put(ROW);
597 put.addColumn(FAMILY, QUALIFIER, data);
598 table.put(put);
599 region.flush(true);
600 put = new Put(ROW1);
601 put.addColumn(FAMILY, QUALIFIER, data);
602 table.put(put);
603 region.flush(true);
604 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
605 put = new Put(ROW2);
606 put.addColumn(FAMILY, QUALIFIER2, data2);
607 table.put(put);
608 put = new Put(ROW3);
609 put.addColumn(FAMILY, QUALIFIER2, data2);
610 table.put(put);
611 region.flush(true);
612 ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers());
613 int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size();
614 LOG.info("About to SPLIT on {} {}, count={}", Bytes.toString(ROW1), region.getRegionInfo(),
615 regionCount);
616 TEST_UTIL.getAdmin().split(tableName, ROW1);
617 // Wait for splits
618 TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount);
619 region.compact(true);
620 List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegionServer(rs).getRegions();
621 for (HRegion r: regions) {
622 LOG.info("" + r.getCompactionState());
623 TEST_UTIL.waitFor(30000, () -> r.getCompactionState().equals(CompactionState.NONE));
625 LOG.info("Split finished, is region closed {} {}", region.isClosed(), cache);
626 Iterator<CachedBlock> iterator = cache.iterator();
627 // Though the split had created the HalfStorefileReader - the firstkey and lastkey scanners
628 // should be closed inorder to return those blocks
629 iterateBlockCache(cache, iterator);
630 } finally {
631 if (table != null) {
632 table.close();
637 @Test
638 public void testMultiGets() throws IOException, InterruptedException {
639 Table table = null;
640 try {
641 latch = new CountDownLatch(2);
642 // Check if get() returns blocks on its close() itself
643 getLatch = new CountDownLatch(1);
644 final TableName tableName = TableName.valueOf(name.getMethodName());
645 // Create KV that will give you two blocks
646 // Create a table with block size as 1024
647 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
648 CustomInnerRegionObserver.class.getName());
649 // get the block cache and region
650 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
651 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
652 HRegion region =
653 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
654 HStore store = region.getStores().iterator().next();
655 CacheConfig cacheConf = store.getCacheConfig();
656 cacheConf.setCacheDataOnWrite(true);
657 cacheConf.setEvictOnClose(true);
658 BlockCache cache = cacheConf.getBlockCache().get();
660 Put put = new Put(ROW);
661 put.addColumn(FAMILY, QUALIFIER, data);
662 table.put(put);
663 region.flush(true);
664 put = new Put(ROW1);
665 put.addColumn(FAMILY, QUALIFIER, data);
666 table.put(put);
667 region.flush(true);
668 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
669 put = new Put(ROW);
670 put.addColumn(FAMILY, QUALIFIER2, data2);
671 table.put(put);
672 region.flush(true);
673 // flush the data
674 System.out.println("Flushing cache");
675 // Should create one Hfile with 2 blocks
676 CustomInnerRegionObserver.waitForGets.set(true);
677 // Create three sets of gets
678 MultiGetThread[] getThreads = initiateMultiGet(table);
679 Thread.sleep(200);
680 int refCount;
681 Iterator<CachedBlock> iterator = cache.iterator();
682 boolean foundNonZeroBlock = false;
683 while (iterator.hasNext()) {
684 CachedBlock next = iterator.next();
685 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
686 if (cache instanceof BucketCache) {
687 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
688 } else if (cache instanceof CombinedBlockCache) {
689 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
690 } else {
691 continue;
693 if (refCount != 0) {
694 assertEquals(NO_OF_THREADS, refCount);
695 foundNonZeroBlock = true;
698 assertTrue("Should have found nonzero ref count block",foundNonZeroBlock);
699 CustomInnerRegionObserver.getCdl().get().countDown();
700 CustomInnerRegionObserver.getCdl().get().countDown();
701 for (MultiGetThread thread : getThreads) {
702 thread.join();
704 // Verify whether the gets have returned the blocks that it had
705 CustomInnerRegionObserver.waitForGets.set(true);
706 // giving some time for the block to be decremented
707 iterateBlockCache(cache, iterator);
708 getLatch.countDown();
709 System.out.println("Gets should have returned the bloks");
710 } finally {
711 if (table != null) {
712 table.close();
716 @Test
717 public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException {
718 Table table = null;
719 try {
720 latch = new CountDownLatch(1);
721 // Check if get() returns blocks on its close() itself
722 final TableName tableName = TableName.valueOf(name.getMethodName());
723 // Create KV that will give you two blocks
724 // Create a table with block size as 1024
725 byte[][] fams = new byte[10][];
726 fams[0] = FAMILY;
727 for (int i = 1; i < 10; i++) {
728 fams[i] = (Bytes.toBytes("testFamily" + i));
730 table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
731 CustomInnerRegionObserver.class.getName());
732 // get the block cache and region
733 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
734 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
735 HRegion region =
736 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
737 BlockCache cache = setCacheProperties(region);
739 Put put = new Put(ROW);
740 put.addColumn(FAMILY, QUALIFIER, data);
741 table.put(put);
742 region.flush(true);
743 put = new Put(ROW1);
744 put.addColumn(FAMILY, QUALIFIER, data);
745 table.put(put);
746 region.flush(true);
747 for (int i = 1; i < 10; i++) {
748 put = new Put(ROW);
749 put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
750 table.put(put);
751 if (i % 2 == 0) {
752 region.flush(true);
755 region.flush(true);
756 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
757 put = new Put(ROW);
758 put.addColumn(FAMILY, QUALIFIER2, data2);
759 table.put(put);
760 region.flush(true);
761 // flush the data
762 System.out.println("Flushing cache");
763 // Should create one Hfile with 2 blocks
764 // Create three sets of gets
765 ScanThread[] scanThreads = initiateScan(table, true);
766 Thread.sleep(200);
767 Iterator<CachedBlock> iterator = cache.iterator();
768 boolean usedBlocksFound = false;
769 int refCount = 0;
770 int noOfBlocksWithRef = 0;
771 while (iterator.hasNext()) {
772 CachedBlock next = iterator.next();
773 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
774 if (cache instanceof BucketCache) {
775 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
776 } else if (cache instanceof CombinedBlockCache) {
777 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
778 } else {
779 continue;
781 if (refCount != 0) {
782 // Blocks will be with count 3
783 System.out.println("The refCount is " + refCount);
784 assertEquals(NO_OF_THREADS, refCount);
785 usedBlocksFound = true;
786 noOfBlocksWithRef++;
789 assertTrue(usedBlocksFound);
790 // the number of blocks referred
791 assertEquals(12, noOfBlocksWithRef);
792 CustomInnerRegionObserver.getCdl().get().countDown();
793 for (ScanThread thread : scanThreads) {
794 thread.join();
796 // giving some time for the block to be decremented
797 checkForBlockEviction(cache, true, false);
798 } finally {
799 if (table != null) {
800 table.close();
805 private BlockCache setCacheProperties(HRegion region) {
806 Iterator<HStore> strItr = region.getStores().iterator();
807 BlockCache cache = null;
808 while (strItr.hasNext()) {
809 HStore store = strItr.next();
810 CacheConfig cacheConf = store.getCacheConfig();
811 cacheConf.setCacheDataOnWrite(true);
812 cacheConf.setEvictOnClose(true);
813 // Use the last one
814 cache = cacheConf.getBlockCache().get();
816 return cache;
819 @Test
820 public void testParallelGetsAndScanWithWrappedRegionScanner() throws IOException,
821 InterruptedException {
822 Table table = null;
823 try {
824 latch = new CountDownLatch(2);
825 // Check if get() returns blocks on its close() itself
826 getLatch = new CountDownLatch(1);
827 final TableName tableName = TableName.valueOf(name.getMethodName());
828 // Create KV that will give you two blocks
829 // Create a table with block size as 1024
830 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
831 CustomInnerRegionObserverWrapper.class.getName());
832 // get the block cache and region
833 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
834 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
835 HRegion region =
836 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
837 HStore store = region.getStores().iterator().next();
838 CacheConfig cacheConf = store.getCacheConfig();
839 cacheConf.setCacheDataOnWrite(true);
840 cacheConf.setEvictOnClose(true);
841 BlockCache cache = cacheConf.getBlockCache().get();
843 // insert data. 2 Rows are added
844 insertData(table);
845 // flush the data
846 System.out.println("Flushing cache");
847 // Should create one Hfile with 2 blocks
848 region.flush(true);
849 // CustomInnerRegionObserver.sleepTime.set(5000);
850 // Create three sets of scan
851 CustomInnerRegionObserver.waitForGets.set(true);
852 ScanThread[] scanThreads = initiateScan(table, false);
853 // Create three sets of gets
854 GetThread[] getThreads = initiateGet(table, false, false);
855 // The block would have been decremented for the scan case as it was
856 // wrapped
857 // before even the postNext hook gets executed.
858 // giving some time for the block to be decremented
859 Thread.sleep(100);
860 CustomInnerRegionObserver.waitForGets.set(false);
861 checkForBlockEviction(cache, false, false);
862 // countdown the latch
863 CustomInnerRegionObserver.getCdl().get().countDown();
864 for (GetThread thread : getThreads) {
865 thread.join();
867 getLatch.countDown();
868 for (ScanThread thread : scanThreads) {
869 thread.join();
871 } finally {
872 if (table != null) {
873 table.close();
878 @Test
879 public void testScanWithCompaction() throws IOException, InterruptedException {
880 testScanWithCompactionInternals(name.getMethodName(), false);
883 @Test
884 public void testReverseScanWithCompaction() throws IOException, InterruptedException {
885 testScanWithCompactionInternals(name.getMethodName(), true);
888 private void testScanWithCompactionInternals(String tableNameStr, boolean reversed)
889 throws IOException, InterruptedException {
890 Table table = null;
891 try {
892 latch = new CountDownLatch(1);
893 compactionLatch = new CountDownLatch(1);
894 TableName tableName = TableName.valueOf(tableNameStr);
895 // Create a table with block size as 1024
896 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
897 CustomInnerRegionObserverWrapper.class.getName());
898 // get the block cache and region
899 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
900 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
901 HRegion region =
902 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
903 HStore store = region.getStores().iterator().next();
904 CacheConfig cacheConf = store.getCacheConfig();
905 cacheConf.setCacheDataOnWrite(true);
906 cacheConf.setEvictOnClose(true);
907 BlockCache cache = cacheConf.getBlockCache().get();
909 // insert data. 2 Rows are added
910 Put put = new Put(ROW);
911 put.addColumn(FAMILY, QUALIFIER, data);
912 table.put(put);
913 put = new Put(ROW1);
914 put.addColumn(FAMILY, QUALIFIER, data);
915 table.put(put);
916 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
917 // Should create one Hfile with 2 blocks
918 region.flush(true);
919 // read the data and expect same blocks, one new hit, no misses
920 int refCount = 0;
921 // Check how this miss is happening
922 // insert a second column, read the row, no new blocks, 3 new hits
923 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
924 byte[] data2 = Bytes.add(data, data);
925 put = new Put(ROW);
926 put.addColumn(FAMILY, QUALIFIER2, data2);
927 table.put(put);
928 // flush, one new block
929 System.out.println("Flushing cache");
930 region.flush(true);
931 Iterator<CachedBlock> iterator = cache.iterator();
932 iterateBlockCache(cache, iterator);
933 // Create three sets of scan
934 ScanThread[] scanThreads = initiateScan(table, reversed);
935 Thread.sleep(100);
936 iterator = cache.iterator();
937 boolean usedBlocksFound = false;
938 while (iterator.hasNext()) {
939 CachedBlock next = iterator.next();
940 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
941 if (cache instanceof BucketCache) {
942 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
943 } else if (cache instanceof CombinedBlockCache) {
944 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
945 } else {
946 continue;
948 if (refCount != 0) {
949 // Blocks will be with count 3
950 assertEquals(NO_OF_THREADS, refCount);
951 usedBlocksFound = true;
954 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
955 usedBlocksFound = false;
956 System.out.println("Compacting");
957 assertEquals(2, store.getStorefilesCount());
958 store.triggerMajorCompaction();
959 region.compact(true);
960 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
961 assertEquals(1, store.getStorefilesCount());
962 // Even after compaction is done we will have some blocks that cannot
963 // be evicted this is because the scan is still referencing them
964 iterator = cache.iterator();
965 while (iterator.hasNext()) {
966 CachedBlock next = iterator.next();
967 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
968 if (cache instanceof BucketCache) {
969 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
970 } else if (cache instanceof CombinedBlockCache) {
971 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
972 } else {
973 continue;
975 if (refCount != 0) {
976 // Blocks will be with count 3 as they are not yet cleared
977 assertEquals(NO_OF_THREADS, refCount);
978 usedBlocksFound = true;
981 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
982 // Should not throw exception
983 compactionLatch.countDown();
984 latch.countDown();
985 for (ScanThread thread : scanThreads) {
986 thread.join();
988 // by this time all blocks should have been evicted
989 iterator = cache.iterator();
990 iterateBlockCache(cache, iterator);
991 Result r = table.get(new Get(ROW));
992 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
993 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
994 // The gets would be working on new blocks
995 iterator = cache.iterator();
996 iterateBlockCache(cache, iterator);
997 } finally {
998 if (table != null) {
999 table.close();
1004 @Test
1005 public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush()
1006 throws IOException, InterruptedException {
1007 // do flush and scan in parallel
1008 Table table = null;
1009 try {
1010 latch = new CountDownLatch(1);
1011 compactionLatch = new CountDownLatch(1);
1012 final TableName tableName = TableName.valueOf(name.getMethodName());
1013 // Create a table with block size as 1024
1014 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
1015 CustomInnerRegionObserverWrapper.class.getName());
1016 // get the block cache and region
1017 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
1018 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
1019 HRegion region =
1020 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
1021 HStore store = region.getStores().iterator().next();
1022 CacheConfig cacheConf = store.getCacheConfig();
1023 cacheConf.setCacheDataOnWrite(true);
1024 cacheConf.setEvictOnClose(true);
1025 BlockCache cache = cacheConf.getBlockCache().get();
1027 // insert data. 2 Rows are added
1028 Put put = new Put(ROW);
1029 put.addColumn(FAMILY, QUALIFIER, data);
1030 table.put(put);
1031 put = new Put(ROW1);
1032 put.addColumn(FAMILY, QUALIFIER, data);
1033 table.put(put);
1034 assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
1035 // Should create one Hfile with 2 blocks
1036 region.flush(true);
1037 // read the data and expect same blocks, one new hit, no misses
1038 int refCount = 0;
1039 // Check how this miss is happening
1040 // insert a second column, read the row, no new blocks, 3 new hits
1041 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
1042 byte[] data2 = Bytes.add(data, data);
1043 put = new Put(ROW);
1044 put.addColumn(FAMILY, QUALIFIER2, data2);
1045 table.put(put);
1046 // flush, one new block
1047 System.out.println("Flushing cache");
1048 region.flush(true);
1049 Iterator<CachedBlock> iterator = cache.iterator();
1050 iterateBlockCache(cache, iterator);
1051 // Create three sets of scan
1052 ScanThread[] scanThreads = initiateScan(table, false);
1053 Thread.sleep(100);
1054 iterator = cache.iterator();
1055 boolean usedBlocksFound = false;
1056 while (iterator.hasNext()) {
1057 CachedBlock next = iterator.next();
1058 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
1059 if (cache instanceof BucketCache) {
1060 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
1061 } else if (cache instanceof CombinedBlockCache) {
1062 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
1063 } else {
1064 continue;
1066 if (refCount != 0) {
1067 // Blocks will be with count 3
1068 assertEquals(NO_OF_THREADS, refCount);
1069 usedBlocksFound = true;
1072 // Make a put and do a flush
1073 QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
1074 data2 = Bytes.add(data, data);
1075 put = new Put(ROW1);
1076 put.addColumn(FAMILY, QUALIFIER2, data2);
1077 table.put(put);
1078 // flush, one new block
1079 System.out.println("Flushing cache");
1080 region.flush(true);
1081 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
1082 usedBlocksFound = false;
1083 System.out.println("Compacting");
1084 assertEquals(3, store.getStorefilesCount());
1085 store.triggerMajorCompaction();
1086 region.compact(true);
1087 waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
1088 assertEquals(1, store.getStorefilesCount());
1089 // Even after compaction is done we will have some blocks that cannot
1090 // be evicted this is because the scan is still referencing them
1091 iterator = cache.iterator();
1092 while (iterator.hasNext()) {
1093 CachedBlock next = iterator.next();
1094 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
1095 if (cache instanceof BucketCache) {
1096 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
1097 } else if (cache instanceof CombinedBlockCache) {
1098 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
1099 } else {
1100 continue;
1102 if (refCount != 0) {
1103 // Blocks will be with count 3 as they are not yet cleared
1104 assertEquals(NO_OF_THREADS, refCount);
1105 usedBlocksFound = true;
1108 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
1109 // Should not throw exception
1110 compactionLatch.countDown();
1111 latch.countDown();
1112 for (ScanThread thread : scanThreads) {
1113 thread.join();
1115 // by this time all blocks should have been evicted
1116 iterator = cache.iterator();
1117 // Since a flush and compaction happened after a scan started
1118 // we need to ensure that all the original blocks of the compacted file
1119 // is also removed.
1120 iterateBlockCache(cache, iterator);
1121 Result r = table.get(new Get(ROW));
1122 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
1123 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
1124 // The gets would be working on new blocks
1125 iterator = cache.iterator();
1126 iterateBlockCache(cache, iterator);
1127 } finally {
1128 if (table != null) {
1129 table.close();
1135 @Test
1136 public void testScanWithException() throws IOException, InterruptedException {
1137 Table table = null;
1138 try {
1139 latch = new CountDownLatch(1);
1140 exceptionLatch = new CountDownLatch(1);
1141 final TableName tableName = TableName.valueOf(name.getMethodName());
1142 // Create KV that will give you two blocks
1143 // Create a table with block size as 1024
1144 table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
1145 CustomInnerRegionObserverWrapper.class.getName());
1146 // get the block cache and region
1147 RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
1148 String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
1149 HRegion region =
1150 TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
1151 HStore store = region.getStores().iterator().next();
1152 CacheConfig cacheConf = store.getCacheConfig();
1153 cacheConf.setCacheDataOnWrite(true);
1154 cacheConf.setEvictOnClose(true);
1155 BlockCache cache = cacheConf.getBlockCache().get();
1156 // insert data. 2 Rows are added
1157 insertData(table);
1158 // flush the data
1159 System.out.println("Flushing cache");
1160 // Should create one Hfile with 2 blocks
1161 region.flush(true);
1162 // CustomInnerRegionObserver.sleepTime.set(5000);
1163 CustomInnerRegionObserver.throwException.set(true);
1164 ScanThread[] scanThreads = initiateScan(table, false);
1165 // The block would have been decremented for the scan case as it was
1166 // wrapped
1167 // before even the postNext hook gets executed.
1168 // giving some time for the block to be decremented
1169 Thread.sleep(100);
1170 Iterator<CachedBlock> iterator = cache.iterator();
1171 boolean usedBlocksFound = false;
1172 int refCount = 0;
1173 while (iterator.hasNext()) {
1174 CachedBlock next = iterator.next();
1175 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
1176 if (cache instanceof BucketCache) {
1177 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
1178 } else if (cache instanceof CombinedBlockCache) {
1179 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
1180 } else {
1181 continue;
1183 if (refCount != 0) {
1184 // Blocks will be with count 3
1185 assertEquals(NO_OF_THREADS, refCount);
1186 usedBlocksFound = true;
1189 assertTrue(usedBlocksFound);
1190 exceptionLatch.countDown();
1191 // countdown the latch
1192 CustomInnerRegionObserver.getCdl().get().countDown();
1193 for (ScanThread thread : scanThreads) {
1194 thread.join();
1196 iterator = cache.iterator();
1197 usedBlocksFound = false;
1198 refCount = 0;
1199 while (iterator.hasNext()) {
1200 CachedBlock next = iterator.next();
1201 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
1202 if (cache instanceof BucketCache) {
1203 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
1204 } else if (cache instanceof CombinedBlockCache) {
1205 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
1206 } else {
1207 continue;
1209 if (refCount != 0) {
1210 // Blocks will be with count 3
1211 assertEquals(NO_OF_THREADS, refCount);
1212 usedBlocksFound = true;
1215 assertFalse(usedBlocksFound);
1216 // you should always see 0 ref count. since after HBASE-16604 we always recreate the scanner
1217 assertEquals(0, refCount);
1218 } finally {
1219 if (table != null) {
1220 table.close();
1225 private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) {
1226 int refCount;
1227 while (iterator.hasNext()) {
1228 CachedBlock next = iterator.next();
1229 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
1230 if (cache instanceof BucketCache) {
1231 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
1232 LOG.info("BucketCache {} {}", cacheKey, refCount);
1233 } else if (cache instanceof CombinedBlockCache) {
1234 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
1235 LOG.info("CombinedBlockCache {} {}", cacheKey, refCount);
1236 } else {
1237 continue;
1239 assertEquals(0, refCount);
1243 private void insertData(Table table) throws IOException {
1244 Put put = new Put(ROW);
1245 put.addColumn(FAMILY, QUALIFIER, data);
1246 table.put(put);
1247 put = new Put(ROW1);
1248 put.addColumn(FAMILY, QUALIFIER, data);
1249 table.put(put);
1250 byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
1251 put = new Put(ROW);
1252 put.addColumn(FAMILY, QUALIFIER2, data2);
1253 table.put(put);
1256 private ScanThread[] initiateScan(Table table, boolean reverse) throws IOException,
1257 InterruptedException {
1258 ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS];
1259 for (int i = 0; i < NO_OF_THREADS; i++) {
1260 scanThreads[i] = new ScanThread(table, reverse);
1262 for (ScanThread thread : scanThreads) {
1263 thread.start();
1265 return scanThreads;
1268 private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs)
1269 throws IOException, InterruptedException {
1270 GetThread[] getThreads = new GetThread[NO_OF_THREADS];
1271 for (int i = 0; i < NO_OF_THREADS; i++) {
1272 getThreads[i] = new GetThread(table, tracker, multipleCFs);
1274 for (GetThread thread : getThreads) {
1275 thread.start();
1277 return getThreads;
1280 private MultiGetThread[] initiateMultiGet(Table table)
1281 throws IOException, InterruptedException {
1282 MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS];
1283 for (int i = 0; i < NO_OF_THREADS; i++) {
1284 multiGetThreads[i] = new MultiGetThread(table);
1286 for (MultiGetThread thread : multiGetThreads) {
1287 thread.start();
1289 return multiGetThreads;
1292 private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero)
1293 throws InterruptedException {
1294 int counter = NO_OF_THREADS;
1295 if (CustomInnerRegionObserver.waitForGets.get()) {
1296 // Because only one row is selected, it has only 2 blocks
1297 counter = counter - 1;
1298 while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) {
1299 Thread.sleep(100);
1301 } else {
1302 while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) {
1303 Thread.sleep(100);
1306 Iterator<CachedBlock> iterator = cache.iterator();
1307 int refCount = 0;
1308 while (iterator.hasNext()) {
1309 CachedBlock next = iterator.next();
1310 BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
1311 if (cache instanceof BucketCache) {
1312 refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
1313 } else if (cache instanceof CombinedBlockCache) {
1314 refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
1315 } else {
1316 continue;
1318 System.out.println(" the refcount is " + refCount + " block is " + cacheKey);
1319 if (CustomInnerRegionObserver.waitForGets.get()) {
1320 if (expectOnlyZero) {
1321 assertTrue(refCount == 0);
1323 if (refCount != 0) {
1324 // Because the scan would have also touched up on these blocks but
1325 // it
1326 // would have touched
1327 // all 3
1328 if (getClosed) {
1329 // If get has closed only the scan's blocks would be available
1330 assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get());
1331 } else {
1332 assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get() + (NO_OF_THREADS));
1335 } else {
1336 // Because the get would have also touched up on these blocks but it
1337 // would have touched
1338 // upon only 2 additionally
1339 if (expectOnlyZero) {
1340 assertTrue(refCount == 0);
1342 if (refCount != 0) {
1343 if (getLatch == null) {
1344 assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get());
1345 } else {
1346 assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get() + (NO_OF_THREADS));
1351 CustomInnerRegionObserver.getCdl().get().countDown();
1354 private static class MultiGetThread extends Thread {
1355 private final Table table;
1356 private final List<Get> gets = new ArrayList<>();
1357 public MultiGetThread(Table table) {
1358 this.table = table;
1360 @Override
1361 public void run() {
1362 gets.add(new Get(ROW));
1363 gets.add(new Get(ROW1));
1364 try {
1365 CustomInnerRegionObserver.getCdl().set(latch);
1366 Result[] r = table.get(gets);
1367 assertTrue(Bytes.equals(r[0].getRow(), ROW));
1368 assertTrue(Bytes.equals(r[1].getRow(), ROW1));
1369 } catch (IOException e) {
1374 private static class GetThread extends Thread {
1375 private final Table table;
1376 private final boolean tracker;
1377 private final boolean multipleCFs;
1379 public GetThread(Table table, boolean tracker, boolean multipleCFs) {
1380 this.table = table;
1381 this.tracker = tracker;
1382 this.multipleCFs = multipleCFs;
1385 @Override
1386 public void run() {
1387 try {
1388 initiateGet(table);
1389 } catch (IOException e) {
1390 // do nothing
1394 private void initiateGet(Table table) throws IOException {
1395 Get get = new Get(ROW);
1396 if (tracker) {
1397 // Change this
1398 if (!multipleCFs) {
1399 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3));
1400 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8));
1401 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9));
1402 // Unknown key
1403 get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900));
1404 } else {
1405 get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3));
1406 get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8));
1407 get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9));
1408 // Unknown key
1409 get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900));
1412 CustomInnerRegionObserver.getCdl().set(latch);
1413 Result r = table.get(get);
1414 System.out.println(r);
1415 if (!tracker) {
1416 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
1417 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
1418 } else {
1419 if (!multipleCFs) {
1420 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2));
1421 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2));
1422 assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2));
1423 } else {
1424 assertTrue(Bytes.equals(
1425 r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)),
1426 data2));
1427 assertTrue(Bytes.equals(
1428 r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)),
1429 data2));
1430 assertTrue(Bytes.equals(
1431 r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)),
1432 data2));
1438 private static class ScanThread extends Thread {
1439 private final Table table;
1440 private final boolean reverse;
1442 public ScanThread(Table table, boolean reverse) {
1443 this.table = table;
1444 this.reverse = reverse;
1447 @Override
1448 public void run() {
1449 try {
1450 initiateScan(table);
1451 } catch (IOException e) {
1452 // do nothing
1456 private void initiateScan(Table table) throws IOException {
1457 Scan scan = new Scan();
1458 if (reverse) {
1459 scan.setReversed(true);
1461 CustomInnerRegionObserver.getCdl().set(latch);
1462 ResultScanner resScanner = table.getScanner(scan);
1463 int i = (reverse ? ROWS.length - 1 : 0);
1464 boolean resultFound = false;
1465 for (Result result : resScanner) {
1466 resultFound = true;
1467 System.out.println(result);
1468 if (!reverse) {
1469 assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
1470 i++;
1471 } else {
1472 assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
1473 i--;
1476 assertTrue(resultFound);
1480 private void waitForStoreFileCount(HStore store, int count, int timeout)
1481 throws InterruptedException {
1482 long start = EnvironmentEdgeManager.currentTime();
1483 while (start + timeout > EnvironmentEdgeManager.currentTime() &&
1484 store.getStorefilesCount() != count) {
1485 Thread.sleep(100);
1487 System.out.println("start=" + start + ", now=" + EnvironmentEdgeManager.currentTime() +
1488 ", cur=" + store.getStorefilesCount());
1489 assertEquals(count, store.getStorefilesCount());
1492 private static class CustomScanner implements RegionScanner {
1494 private RegionScanner delegate;
1496 public CustomScanner(RegionScanner delegate) {
1497 this.delegate = delegate;
1500 @Override
1501 public boolean next(List<Cell> results) throws IOException {
1502 return delegate.next(results);
1505 @Override
1506 public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
1507 return delegate.next(result, scannerContext);
1510 @Override
1511 public boolean nextRaw(List<Cell> result) throws IOException {
1512 return delegate.nextRaw(result);
1515 @Override
1516 public boolean nextRaw(List<Cell> result, ScannerContext context) throws IOException {
1517 boolean nextRaw = delegate.nextRaw(result, context);
1518 if (compactionLatch != null && compactionLatch.getCount() > 0) {
1519 try {
1520 compactionLatch.await();
1521 } catch (InterruptedException ie) {
1525 if (CustomInnerRegionObserver.throwException.get()) {
1526 if (exceptionLatch.getCount() > 0) {
1527 try {
1528 exceptionLatch.await();
1529 } catch (InterruptedException e) {
1531 throw new IOException("throw exception");
1534 return nextRaw;
1537 @Override
1538 public void close() throws IOException {
1539 delegate.close();
1542 @Override
1543 public RegionInfo getRegionInfo() {
1544 return delegate.getRegionInfo();
1547 @Override
1548 public boolean isFilterDone() throws IOException {
1549 return delegate.isFilterDone();
1552 @Override
1553 public boolean reseek(byte[] row) throws IOException {
1554 return false;
1557 @Override
1558 public long getMaxResultSize() {
1559 return delegate.getMaxResultSize();
1562 @Override
1563 public long getMvccReadPoint() {
1564 return delegate.getMvccReadPoint();
1567 @Override
1568 public int getBatch() {
1569 return delegate.getBatch();
1573 public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver {
1574 @Override
1575 public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
1576 Scan scan, RegionScanner s) throws IOException {
1577 return new CustomScanner(s);
1581 public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver {
1582 static final AtomicInteger countOfNext = new AtomicInteger(0);
1583 static final AtomicInteger countOfGets = new AtomicInteger(0);
1584 static final AtomicBoolean waitForGets = new AtomicBoolean(false);
1585 static final AtomicBoolean throwException = new AtomicBoolean(false);
1586 private static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(
1587 new CountDownLatch(0));
1589 @Override
1590 public Optional<RegionObserver> getRegionObserver() {
1591 return Optional.of(this);
1594 @Override
1595 public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
1596 InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
1597 slowdownCode(e, false);
1598 if (getLatch != null && getLatch.getCount() > 0) {
1599 try {
1600 getLatch.await();
1601 } catch (InterruptedException e1) {
1604 return hasMore;
1607 @Override
1608 public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
1609 List<Cell> results) throws IOException {
1610 slowdownCode(e, true);
1613 public static AtomicReference<CountDownLatch> getCdl() {
1614 return cdl;
1617 private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e,
1618 boolean isGet) {
1619 CountDownLatch latch = getCdl().get();
1620 try {
1621 System.out.println(latch.getCount() + " is the count " + isGet);
1622 if (latch.getCount() > 0) {
1623 if (isGet) {
1624 countOfGets.incrementAndGet();
1625 } else {
1626 countOfNext.incrementAndGet();
1628 LOG.info("Waiting for the counterCountDownLatch");
1629 latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
1630 if (latch.getCount() > 0) {
1631 throw new RuntimeException("Can't wait more");
1634 } catch (InterruptedException e1) {
1635 LOG.error(e1.toString(), e1);