/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
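
/**
 * Exercises block reference counting from the client side: concurrent gets, multi-gets and scans
 * (held open via coprocessor latches) pin blocks in the block cache, and the pinned blocks should
 * be released once the operations complete, even across flushes, compactions and splits.
 */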
@Category({ LargeTests.class, ClientTests.class })
@SuppressWarnings("deprecation")
public class TestBlockEvictionFromClient {
  private static final Log LOG = LogFactory.getLog(TestBlockEvictionFromClient.class);
  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  static byte[][] ROWS = new byte[2][];
  private static int NO_OF_THREADS = 3;
  private static byte[] ROW = Bytes.toBytes("testRow");
  private static byte[] ROW1 = Bytes.toBytes("testRow1");
  private static byte[] ROW2 = Bytes.toBytes("testRow2");
  private static byte[] ROW3 = Bytes.toBytes("testRow3");
  private static byte[] FAMILY = Bytes.toBytes("testFamily");
  private static byte[][] FAMILIES_1 = new byte[1][0];
  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
  private static byte[] data = new byte[1000];
  private static byte[] data2 = Bytes.add(data, data);
  protected static int SLAVES = 1;
  private static CountDownLatch latch;
  private static CountDownLatch getLatch;
  private static CountDownLatch compactionLatch;
  private static CountDownLatch exceptionLatch;

  @Rule
  public TestName name = new TestName();
  /**
   * @throws java.lang.Exception
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ROWS[0] = ROW;
    ROWS[1] = ROW1;
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
        MultiRowMutationEndpoint.class.getName());
    conf.setBoolean("hbase.table.sanity.checks", true); // enable for the tests below
    conf.setInt("hbase.regionserver.handler.count", 20);
    conf.setInt("hbase.bucketcache.size", 400);
    conf.setStrings("hbase.bucketcache.ioengine", "offheap");
    conf.setFloat("hfile.block.cache.size", 0.2f);
    conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); // do not retry
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000);
    FAMILIES_1[0] = FAMILY;
    TEST_UTIL.startMiniCluster(SLAVES);
  }

  /**
   * @throws java.lang.Exception
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * @throws java.lang.Exception
   */
  @Before
  public void setUp() throws Exception {
    CustomInnerRegionObserver.waitForGets.set(false);
    CustomInnerRegionObserver.countOfNext.set(0);
    CustomInnerRegionObserver.countOfGets.set(0);
  }

  /**
   * @throws java.lang.Exception
   */
  @After
  public void tearDown() throws Exception {
    if (latch != null) {
      while (latch.getCount() > 0) {
        latch.countDown();
      }
    }
    if (getLatch != null) {
      getLatch.countDown();
    }
    if (compactionLatch != null) {
      compactionLatch.countDown();
    }
    if (exceptionLatch != null) {
      exceptionLatch.countDown();
    }
    latch = null;
    getLatch = null;
    compactionLatch = null;
    exceptionLatch = null;
    CustomInnerRegionObserver.throwException.set(false);
    // Clean up the tables for every test case
    TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames();
    for (TableName tableName : listTableNames) {
      if (!tableName.isSystemTable()) {
        TEST_UTIL.getAdmin().disableTable(tableName);
        TEST_UTIL.getAdmin().deleteTable(tableName);
      }
    }
  }
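
  /**
   * Scans a freshly flushed table from three parallel threads and verifies that the blocks the
   * scanners pin are released once the scans complete, then re-checks ref counts across a second
   * flush and a major compaction.
   */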
  @Test
  public void testBlockEvictionWithParallelScans() throws Exception {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(
          regionName);
      Store store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // data was in memstore so don't expect any changes
      // flush the data
      System.out.println("Flushing cache in problematic area");
      // Should create one HFile with 2 blocks
      region.flush(true);
      // Load cache
      // Create three sets of scan
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      checkForBlockEviction(cache, false, false);
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // CustomInnerRegionObserver.sleepTime.set(0);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // read the data and expect same blocks, one new hit, no misses
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Check how this miss is happening
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // compact, net minus two blocks, two hits, no misses
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // read the row, this should be a cache miss because we don't cache data
      // blocks on compaction
      r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testParallelGetsAndScans() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(
          regionName);
      Store store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();

      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      region.flush(true);
      // Create three sets of scan
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, false, false);
      checkForBlockEviction(cache, false, false);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      System.out.println("Scans should have returned the blocks");
      // Check with either true or false
      CustomInnerRegionObserver.waitForGets.set(false);
      // The scan should also have released the blocks by now
      checkForBlockEviction(cache, true, true);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(
          regionName);
      Store store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, false, false);
      Thread.sleep(200);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  // TODO : check how block index works here
  public void testGetsWithMultiColumnsAndExplicitTracker()
      throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(
          regionName);
      BlockCache cache = setCacheProperties(region);
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, true, false);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(10, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(
          regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, true, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(3, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testBlockRefCountAfterSplits() throws IOException, InterruptedException {
    Table table = null;
    try {
      final TableName tableName = TableName.valueOf(name.getMethodName());
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024);
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      Region region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(regionName);
      Store store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      TEST_UTIL.getAdmin().split(tableName, ROW1);
      List<HRegionInfo> tableRegions = TEST_UTIL.getAdmin().getTableRegions(tableName);
      // Wait for splits
      while (tableRegions.size() != 2) {
        tableRegions = TEST_UTIL.getAdmin().getTableRegions(tableName);
        Thread.sleep(100);
      }
      region.compact(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      // Though the split had created the HalfStorefileReader - the firstkey and lastkey scanners
      // should be closed in order to return those blocks
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testMultiGets() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(
          regionName);
      Store store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      MultiGetThread[] getThreads = initiateMultiGet(table);
      Thread.sleep(200);
      int refCount;
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean foundNonZeroBlock = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          assertEquals(NO_OF_THREADS, refCount);
          foundNonZeroBlock = true;
        }
      }
      assertTrue("Should have found nonzero ref count block", foundNonZeroBlock);
      CustomInnerRegionObserver.getCdl().get().countDown();
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (MultiGetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      iterateBlockCache(cache, iterator);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(
          regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      // Create three sets of scans
      ScanThread[] scanThreads = initiateScan(table, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(12, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
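
  /**
   * Enables cache-on-write and evict-on-close for every store of the region and returns the
   * block cache of the last store visited.
   */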
  private BlockCache setCacheProperties(Region region) {
    Iterator<Store> strItr = region.getStores().iterator();
    BlockCache cache = null;
    while (strItr.hasNext()) {
      Store store = strItr.next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      // Use the last one
      cache = cacheConf.getBlockCache();
    }
    return cache;
  }
  @Test
  public void testParallelGetsAndScanWithWrappedRegionScanner() throws IOException,
      InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(
          regionName);
      Store store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();

      // insert data. 2 Rows are added
      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      // Create three sets of scan
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, false, false);
      // The block would have been decremented for the scan case as it was wrapped
      // before even the postNext hook gets executed.
      // giving some time for the block to be decremented
      Thread.sleep(100);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), false);
  }

  @Test
  public void testReverseScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), true);
  }
  private void testScanWithCompactionInternals(String tableNameStr, boolean reversed)
      throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      TableName tableName = TableName.valueOf(tableNameStr);
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(
          regionName);
      Store store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one HFile with 2 blocks
      region.flush(true);
      // read the data and expect same blocks, one new hit, no misses
      int refCount = 0;
      // Check how this miss is happening
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Create three sets of scan
      ScanThread[] scanThreads = initiateScan(table, reversed);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after compaction is done we will have some blocks that cannot
      // be evicted this is because the scan is still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3 as they are not yet cleared
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      // Should not throw exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush()
      throws IOException, InterruptedException {
    // do flush and scan in parallel
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(
          regionName);
      Store store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one HFile with 2 blocks
      region.flush(true);
      // read the data and expect same blocks, one new hit, no misses
      int refCount = 0;
      // Check how this miss is happening
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Create three sets of scan
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      // Make a put and do a flush
      QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      data2 = Bytes.add(data, data);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(3, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after compaction is done we will have some blocks that cannot
      // be evicted this is because the scan is still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3 as they are not yet cleared
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      // Should not throw exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      // Since a flush and compaction happened after a scan started
      // we need to ensure that all the original blocks of the compacted file
      // are also removed.
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testScanWithException() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      exceptionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      Region region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(
          regionName);
      Store store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();
      // insert data. 2 Rows are added
      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one HFile with 2 blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      CustomInnerRegionObserver.throwException.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // The block would have been decremented for the scan case as it was wrapped
      // before even the postNext hook gets executed.
      // giving some time for the block to be decremented
      Thread.sleep(100);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue(usedBlocksFound);
      exceptionLatch.countDown();
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      iterator = cache.iterator();
      usedBlocksFound = false;
      refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertFalse(usedBlocksFound);
      // you should always see 0 ref count. since after HBASE-16604 we always recreate the scanner
      assertEquals(0, refCount);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
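
  /**
   * Walks every block in the cache and asserts that its ref count is back to zero, i.e. no
   * reader is still holding it.
   */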
  private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) {
    int refCount;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRefCount(cacheKey);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
      } else {
        continue;
      }
      assertEquals(0, refCount);
    }
  }
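
  /**
   * Inserts two rows, plus a second qualifier on ROW whose value is larger than the 1024 byte
   * block size, so that reads touch more than one block.
   */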
  private void insertData(Table table) throws IOException {
    Put put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    put = new Put(ROW1);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
    put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER2, data2);
    table.put(put);
  }
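
  /**
   * Starts NO_OF_THREADS scanner threads, optionally reversed, and returns them so callers can
   * join on completion.
   */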
  private ScanThread[] initiateScan(Table table, boolean reverse) throws IOException,
      InterruptedException {
    ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      scanThreads[i] = new ScanThread(table, reverse);
    }
    for (ScanThread thread : scanThreads) {
      thread.start();
    }
    return scanThreads;
  }
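
  /**
   * Starts NO_OF_THREADS get threads; 'tracker' selects explicit-column gets and 'multipleCFs'
   * spreads those columns across families.
   */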
  private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs)
      throws IOException, InterruptedException {
    GetThread[] getThreads = new GetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      getThreads[i] = new GetThread(table, tracker, multipleCFs);
    }
    for (GetThread thread : getThreads) {
      thread.start();
    }
    return getThreads;
  }
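
  /**
   * Starts NO_OF_THREADS multi-get threads, each issuing a batched get for ROW and ROW1.
   */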
  private MultiGetThread[] initiateMultiGet(Table table)
      throws IOException, InterruptedException {
    MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      multiGetThreads[i] = new MultiGetThread(table);
    }
    for (MultiGetThread thread : multiGetThreads) {
      thread.start();
    }
    return multiGetThreads;
  }
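
  /**
   * Waits until the expected number of gets/nexts have been observed by the coprocessor, then
   * checks the ref count of every cached block against the readers still outstanding.
   */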
  private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero)
      throws InterruptedException {
    int counter = NO_OF_THREADS;
    if (CustomInnerRegionObserver.waitForGets.get()) {
      // Because only one row is selected, it has only 2 blocks
      counter = counter - 1;
      while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    } else {
      while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    }
    Iterator<CachedBlock> iterator = cache.iterator();
    int refCount = 0;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRefCount(cacheKey);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
      } else {
        continue;
      }
      System.out.println(" the refcount is " + refCount + " block is " + cacheKey);
      if (CustomInnerRegionObserver.waitForGets.get()) {
        if (expectOnlyZero) {
          assertTrue(refCount == 0);
        }
        if (refCount != 0) {
          // Because the scan would have also touched up on these blocks but it
          // would have touched all 3
          if (getClosed) {
            // If get has closed only the scan's blocks would be available
            assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get());
          } else {
            assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get() + (NO_OF_THREADS));
          }
        }
      } else {
        // Because the get would have also touched up on these blocks but it
        // would have touched upon only 2 additionally
        if (expectOnlyZero) {
          assertTrue(refCount == 0);
        }
        if (refCount != 0) {
          if (getLatch == null) {
            assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get());
          } else {
            assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get() + (NO_OF_THREADS));
          }
        }
      }
    }
    CustomInnerRegionObserver.getCdl().get().countDown();
  }
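
  /**
   * Thread that issues a single batched get for ROW and ROW1 and verifies the returned rows.
   */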
  private static class MultiGetThread extends Thread {
    private final Table table;
    private final List<Get> gets = new ArrayList<>();

    public MultiGetThread(Table table) {
      this.table = table;
    }

    @Override
    public void run() {
      gets.add(new Get(ROW));
      gets.add(new Get(ROW1));
      try {
        CustomInnerRegionObserver.getCdl().set(latch);
        Result[] r = table.get(gets);
        assertTrue(Bytes.equals(r[0].getRow(), ROW));
        assertTrue(Bytes.equals(r[1].getRow(), ROW1));
      } catch (IOException e) {
        // do nothing
      }
    }
  }
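
  /**
   * Thread that issues one get, either for the whole ROW or, when 'tracker' is set, for explicit
   * columns (optionally across multiple families), and asserts on the returned values.
   */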
  private static class GetThread extends Thread {
    private final Table table;
    private final boolean tracker;
    private final boolean multipleCFs;

    public GetThread(Table table, boolean tracker, boolean multipleCFs) {
      this.table = table;
      this.tracker = tracker;
      this.multipleCFs = multipleCFs;
    }

    @Override
    public void run() {
      try {
        initiateGet(table);
      } catch (IOException e) {
        // do nothing
      }
    }

    private void initiateGet(Table table) throws IOException {
      Get get = new Get(ROW);
      if (tracker) {
        // Change this
        if (!multipleCFs) {
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900));
        } else {
          get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3));
          get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8));
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900));
        }
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      Result r = table.get(get);
      System.out.println(r);
      if (!tracker) {
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      } else {
        if (!multipleCFs) {
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2));
        } else {
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)),
              data2));
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)),
              data2));
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)),
              data2));
        }
      }
    }
  }
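
  /**
   * Thread that runs one full-table scan, optionally reversed, asserting the rows come back in
   * the expected order.
   */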
  private static class ScanThread extends Thread {
    private final Table table;
    private final boolean reverse;

    public ScanThread(Table table, boolean reverse) {
      this.table = table;
      this.reverse = reverse;
    }

    @Override
    public void run() {
      try {
        initiateScan(table);
      } catch (IOException e) {
        // do nothing
      }
    }

    private void initiateScan(Table table) throws IOException {
      Scan scan = new Scan();
      if (reverse) {
        scan.setReversed(true);
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      ResultScanner resScanner = table.getScanner(scan);
      int i = (reverse ? ROWS.length - 1 : 0);
      boolean resultFound = false;
      for (Result result : resScanner) {
        resultFound = true;
        System.out.println(result);
        if (!reverse) {
          assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
          i++;
        } else {
          assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
          i--;
        }
      }
      assertTrue(resultFound);
    }
  }
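
  /**
   * Polls the store until it reaches the given store file count or the timeout (in ms) elapses,
   * then asserts the count.
   */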
  private void waitForStoreFileCount(Store store, int count, int timeout)
      throws InterruptedException {
    long start = System.currentTimeMillis();
    while (start + timeout > System.currentTimeMillis() && store.getStorefilesCount() != count) {
      Thread.sleep(100);
    }
    System.out.println("start=" + start + ", now=" + System.currentTimeMillis() + ", cur="
        + store.getStorefilesCount());
    assertEquals(count, store.getStorefilesCount());
  }
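
  /**
   * RegionScanner wrapper that blocks inside nextRaw() on the compaction/exception latches so
   * the tests can hold scanners open across flushes and compactions.
   */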
  private static class CustomScanner implements RegionScanner {

    private RegionScanner delegate;

    public CustomScanner(RegionScanner delegate) {
      this.delegate = delegate;
    }

    @Override
    public boolean next(List<Cell> results) throws IOException {
      return delegate.next(results);
    }

    @Override
    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
      return delegate.next(result, scannerContext);
    }

    @Override
    public boolean nextRaw(List<Cell> result) throws IOException {
      return delegate.nextRaw(result);
    }

    @Override
    public boolean nextRaw(List<Cell> result, ScannerContext context) throws IOException {
      boolean nextRaw = delegate.nextRaw(result, context);
      if (compactionLatch != null && compactionLatch.getCount() > 0) {
        try {
          compactionLatch.await();
        } catch (InterruptedException ie) {
        }
      }

      if (CustomInnerRegionObserver.throwException.get()) {
        if (exceptionLatch.getCount() > 0) {
          try {
            exceptionLatch.await();
          } catch (InterruptedException e) {
          }
          throw new IOException("throw exception");
        }
      }
      return nextRaw;
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public HRegionInfo getRegionInfo() {
      return delegate.getRegionInfo();
    }

    @Override
    public boolean isFilterDone() throws IOException {
      return delegate.isFilterDone();
    }

    @Override
    public boolean reseek(byte[] row) throws IOException {
      return false;
    }

    @Override
    public long getMaxResultSize() {
      return delegate.getMaxResultSize();
    }

    @Override
    public long getMvccReadPoint() {
      return delegate.getMvccReadPoint();
    }

    @Override
    public int getBatch() {
      return delegate.getBatch();
    }

    @Override
    public void shipped() throws IOException {
      this.delegate.shipped();
    }
  }
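
  /**
   * Observer that wraps the scanner handed back from postScannerOpen in a CustomScanner.
   */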
  public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver {
    @Override
    public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
        Scan scan, RegionScanner s) throws IOException {
      return new CustomScanner(s);
    }
  }
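
  /**
   * Observer that counts gets and nexts and parks them on a shared CountDownLatch so the tests
   * can keep reads in flight while they inspect block ref counts.
   */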
  public static class CustomInnerRegionObserver implements RegionObserver {
    static final AtomicLong sleepTime = new AtomicLong(0);
    static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
    static final AtomicInteger countOfNext = new AtomicInteger(0);
    static final AtomicInteger countOfGets = new AtomicInteger(0);
    static final AtomicBoolean waitForGets = new AtomicBoolean(false);
    static final AtomicBoolean throwException = new AtomicBoolean(false);
    private static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(
        new CountDownLatch(0));

    @Override
    public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
        InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
      slowdownCode(e, false);
      if (getLatch != null && getLatch.getCount() > 0) {
        try {
          getLatch.await();
        } catch (InterruptedException e1) {
        }
      }
      return hasMore;
    }

    @Override
    public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
        List<Cell> results) throws IOException {
      slowdownCode(e, true);
    }

    public static AtomicReference<CountDownLatch> getCdl() {
      return cdl;
    }

    private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e,
        boolean isGet) {
      CountDownLatch latch = getCdl().get();
      try {
        System.out.println(latch.getCount() + " is the count " + isGet);
        if (latch.getCount() > 0) {
          if (isGet) {
            countOfGets.incrementAndGet();
          } else {
            countOfNext.incrementAndGet();
          }
          LOG.info("Waiting for the counterCountDownLatch");
          latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
          if (latch.getCount() > 0) {
            throw new RuntimeException("Can't wait more");
          }
        }
      } catch (InterruptedException e1) {
        LOG.error(e1);
      }
    }
  }
}