/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
@Category({ LargeTests.class, ClientTests.class })
@SuppressWarnings("deprecation")
public class TestBlockEvictionFromClient {
  private static final Log LOG = LogFactory.getLog(TestBlockEvictionFromClient.class);
  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  static byte[][] ROWS = new byte[2][];
  private static int NO_OF_THREADS = 3;
  private static byte[] ROW = Bytes.toBytes("testRow");
  private static byte[] ROW1 = Bytes.toBytes("testRow1");
  private static byte[] ROW2 = Bytes.toBytes("testRow2");
  private static byte[] ROW3 = Bytes.toBytes("testRow3");
  private static byte[] FAMILY = Bytes.toBytes("testFamily");
  private static byte[][] FAMILIES_1 = new byte[1][0];
  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
  private static byte[] data = new byte[1000];
  private static byte[] data2 = Bytes.add(data, data);
  protected static int SLAVES = 1;
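  // Latches coordinating the test threads with the coprocessor hooks below: 'latch' is
  // published to CustomInnerRegionObserver via getCdl() and parks the get/next hooks so
  // that cached blocks stay referenced; 'getLatch' holds scanner hooks back until the
  // gets have returned their blocks; 'compactionLatch' and 'exceptionLatch' park the
  // wrapped scanner in nextRaw() for the compaction and exception tests.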
  private static CountDownLatch latch;
  private static CountDownLatch getLatch;
  private static CountDownLatch compactionLatch;
  private static CountDownLatch exceptionLatch;

  @Rule
  public TestName name = new TestName();
  /**
   * @throws java.lang.Exception
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ROWS[0] = ROW;
    ROWS[1] = ROW1;
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
        MultiRowMutationEndpoint.class.getName());
    conf.setBoolean("hbase.table.sanity.checks", true); // enable for below tests
    conf.setInt("hbase.regionserver.handler.count", 20);
    conf.setInt("hbase.bucketcache.size", 400);
    conf.setStrings("hbase.bucketcache.ioengine", "offheap");
    conf.setFloat("hfile.block.cache.size", 0.2f);
    conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); // do not retry
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000);
    FAMILIES_1[0] = FAMILY;
    TEST_UTIL.startMiniCluster(SLAVES);
  }
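  // Note: with "hbase.bucketcache.ioengine" set to "offheap" above, the region server
  // should wire up a CombinedBlockCache (an on-heap LRU cache for index/bloom blocks in
  // front of the off-heap BucketCache for data blocks), which is why the ref-count
  // checks in the tests probe both BucketCache and CombinedBlockCache.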
  /**
   * @throws java.lang.Exception
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
  /**
   * @throws java.lang.Exception
   */
  @Before
  public void setUp() throws Exception {
    CustomInnerRegionObserver.waitForGets.set(false);
    CustomInnerRegionObserver.countOfNext.set(0);
    CustomInnerRegionObserver.countOfGets.set(0);
  }
  /**
   * @throws java.lang.Exception
   */
  @After
  public void tearDown() throws Exception {
    if (latch != null) {
      while (latch.getCount() > 0) {
        latch.countDown();
      }
    }
    if (getLatch != null) {
      getLatch.countDown();
    }
    if (compactionLatch != null) {
      compactionLatch.countDown();
    }
    if (exceptionLatch != null) {
      exceptionLatch.countDown();
    }
    latch = null;
    getLatch = null;
    compactionLatch = null;
    exceptionLatch = null;
    CustomInnerRegionObserver.throwException.set(false);
    // Clean up the tables for every test case
    TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames();
    for (TableName tableName : listTableNames) {
      if (!tableName.isSystemTable()) {
        TEST_UTIL.getAdmin().disableTable(tableName);
        TEST_UTIL.getAdmin().deleteTable(tableName);
      }
    }
  }
  @Test
  public void testBlockEvictionWithParallelScans() throws Exception {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      Region region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(regionName);
      Store store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // data was in memstore so don't expect any changes
      // flush the data
      System.out.println("Flushing cache in problematic area");
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // Create three sets of scan
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      checkForBlockEviction(cache, false, false);
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // CustomInnerRegionObserver.sleepTime.set(0);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // read the data and expect same blocks, one new hit, no misses
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Check how this miss is happening
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // compact, net minus two blocks, two hits, no misses
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // read the row, this should be a cache miss because we don't cache data
      // blocks on compaction
      r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testParallelGetsAndScans() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      Region region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(regionName);
      Store store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();

      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // Create three sets of scan
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, false, false);
      checkForBlockEviction(cache, false, false);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      System.out.println("Scans should have returned the blocks");
      // Check with either true or false
      CustomInnerRegionObserver.waitForGets.set(false);
      // The scan should also have released the blocks by now
      checkForBlockEviction(cache, true, true);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      Region region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(regionName);
      Store store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, false, false);
      Thread.sleep(200);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  // TODO : check how block index works here
  public void testGetsWithMultiColumnsAndExplicitTracker()
      throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      Region region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(regionName);
      BlockCache cache = setCacheProperties(region);
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, true, false);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(10, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      Region region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, true, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(3, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testBlockRefCountAfterSplits() throws IOException, InterruptedException {
    Table table = null;
    try {
      final TableName tableName = TableName.valueOf(name.getMethodName());
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024);
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      Region region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(regionName);
      Store store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      TEST_UTIL.getAdmin().split(tableName, ROW1);
      List<HRegionInfo> tableRegions = TEST_UTIL.getAdmin().getTableRegions(tableName);
      // Wait for the split to complete
      while (tableRegions.size() != 2) {
        tableRegions = TEST_UTIL.getAdmin().getTableRegions(tableName);
        Thread.sleep(100);
      }
      region.compact(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      // Though the split had created the HalfStoreFileReader - the firstkey and lastkey
      // scanners should be closed in order to return those blocks
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testMultiGets() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      Region region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(regionName);
      Store store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      MultiGetThread[] getThreads = initiateMultiGet(table);
      Thread.sleep(200);
      int refCount = 0;
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean foundNonZeroBlock = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          assertEquals(NO_OF_THREADS, refCount);
          foundNonZeroBlock = true;
        }
      }
      assertTrue("Should have found nonzero ref count block", foundNonZeroBlock);
      CustomInnerRegionObserver.getCdl().get().countDown();
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (MultiGetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      Region region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      // Create three sets of scans
      ScanThread[] scanThreads = initiateScan(table, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(12, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  private BlockCache setCacheProperties(Region region) {
    Iterator<Store> strItr = region.getStores().iterator();
    BlockCache cache = null;
    while (strItr.hasNext()) {
      Store store = strItr.next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      // all the stores share the same block cache, so any one will do
      cache = cacheConf.getBlockCache();
    }
    return cache;
  }
  @Test
  public void testParallelGetsAndScanWithWrappedRegionScanner() throws IOException,
      InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      Region region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(regionName);
      Store store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();

      // insert data. 2 Rows are added
      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      // Create three sets of scan
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, false, false);
      // The block would have been decremented for the scan case as it was
      // wrapped
      // before even the postNext hook gets executed.
      // giving some time for the block to be decremented
      Thread.sleep(100);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), false);
  }

  @Test
  public void testReverseScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), true);
  }
  private void testScanWithCompactionInternals(String tableNameStr, boolean reversed)
      throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      TableName tableName = TableName.valueOf(tableNameStr);
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      Region region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(regionName);
      Store store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // read the data and expect same blocks, one new hit, no misses
      int refCount = 0;
      // Check how this miss is happening
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Create three sets of scan
      ScanThread[] scanThreads = initiateScan(table, reversed);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after compaction is done we will have some blocks that cannot
      // be evicted this is because the scan is still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3 as they are not yet cleared
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      // Should not throw exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush()
      throws IOException, InterruptedException {
    // do flush and scan in parallel
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      Region region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(regionName);
      Store store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // read the data and expect same blocks, one new hit, no misses
      int refCount = 0;
      // Check how this miss is happening
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Create three sets of scan
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      // Make a put and do a flush
      QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      data2 = Bytes.add(data, data);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(3, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after compaction is done we will have some blocks that cannot
      // be evicted this is because the scan is still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3 as they are not yet cleared
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      // Should not throw exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      // Since a flush and compaction happened after a scan started
      // we need to ensure that all the original blocks of the compacted file
      // are evicted
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testScanWithException() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      exceptionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      Region region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getFromOnlineRegions(regionName);
      Store store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache();
      // insert data. 2 Rows are added
      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      CustomInnerRegionObserver.throwException.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // The block would have been decremented for the scan case as it was
      // wrapped
      // before even the postNext hook gets executed.
      // giving some time for the block to be decremented
      Thread.sleep(100);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue(usedBlocksFound);
      exceptionLatch.countDown();
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      iterator = cache.iterator();
      usedBlocksFound = false;
      refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertFalse(usedBlocksFound);
      // You should always see a ref count of 0 since, after HBASE-16604, we always
      // recreate the scanner
      assertEquals(0, refCount);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) {
    int refCount = 0;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRefCount(cacheKey);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
      } else {
        continue;
      }
      assertEquals(0, refCount);
    }
  }
  private void insertData(Table table) throws IOException {
    Put put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    put = new Put(ROW1);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
    put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER2, data2);
    table.put(put);
  }
  private ScanThread[] initiateScan(Table table, boolean reverse) throws IOException,
      InterruptedException {
    ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      scanThreads[i] = new ScanThread(table, reverse);
    }
    for (ScanThread thread : scanThreads) {
      thread.start();
    }
    return scanThreads;
  }
  private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs)
      throws IOException, InterruptedException {
    GetThread[] getThreads = new GetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      getThreads[i] = new GetThread(table, tracker, multipleCFs);
    }
    for (GetThread thread : getThreads) {
      thread.start();
    }
    return getThreads;
  }
  private MultiGetThread[] initiateMultiGet(Table table)
      throws IOException, InterruptedException {
    MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      multiGetThreads[i] = new MultiGetThread(table);
    }
    for (MultiGetThread thread : multiGetThreads) {
      thread.start();
    }
    return multiGetThreads;
  }
  private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero)
      throws InterruptedException {
    int counter = NO_OF_THREADS;
    if (CustomInnerRegionObserver.waitForGets.get()) {
      // Because only one row is selected, it has only 2 blocks
      counter = counter - 1;
      while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    } else {
      while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    }
    Iterator<CachedBlock> iterator = cache.iterator();
    int refCount = 0;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRefCount(cacheKey);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
      } else {
        continue;
      }
      System.out.println(" the refcount is " + refCount + " block is " + cacheKey);
      if (CustomInnerRegionObserver.waitForGets.get()) {
        if (expectOnlyZero) {
          assertTrue(refCount == 0);
        }
        if (refCount != 0) {
          // Because the scan would have also touched up on these blocks but
          // it would have touched upon only 2
          if (getClosed) {
            // If get has closed only the scan's blocks would be available
            assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get());
          } else {
            assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get() + (NO_OF_THREADS));
          }
        }
      } else {
        // Because the get would have also touched up on these blocks but it
        // would have touched upon only 2 additionally
        if (expectOnlyZero) {
          assertTrue(refCount == 0);
        }
        if (refCount != 0) {
          if (getLatch == null) {
            assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get());
          } else {
            assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get() + (NO_OF_THREADS));
          }
        }
      }
    }
    CustomInnerRegionObserver.getCdl().get().countDown();
  }
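  // The *Thread classes below each issue a single client operation (multi-get, get, scan)
  // that blocks inside the coprocessor on the shared latch; that is what keeps the
  // corresponding cache blocks referenced while the asserts above run.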
  private static class MultiGetThread extends Thread {
    private final Table table;
    private final List<Get> gets = new ArrayList<>();

    public MultiGetThread(Table table) {
      this.table = table;
    }

    @Override
    public void run() {
      gets.add(new Get(ROW));
      gets.add(new Get(ROW1));
      try {
        CustomInnerRegionObserver.getCdl().set(latch);
        Result[] r = table.get(gets);
        assertTrue(Bytes.equals(r[0].getRow(), ROW));
        assertTrue(Bytes.equals(r[1].getRow(), ROW1));
      } catch (IOException e) {
        // ignore; the asserts above will have already flagged a failure
      }
    }
  }
  private static class GetThread extends Thread {
    private final Table table;
    private final boolean tracker;
    private final boolean multipleCFs;

    public GetThread(Table table, boolean tracker, boolean multipleCFs) {
      this.table = table;
      this.tracker = tracker;
      this.multipleCFs = multipleCFs;
    }

    @Override
    public void run() {
      try {
        initiateGet(table);
      } catch (IOException e) {
        // do nothing
      }
    }

    private void initiateGet(Table table) throws IOException {
      Get get = new Get(ROW);
      if (tracker) {
        if (!multipleCFs) {
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900));
        } else {
          get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3));
          get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8));
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900));
        }
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      Result r = table.get(get);
      System.out.println(r);
      if (!tracker) {
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      } else {
        if (!multipleCFs) {
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2));
        } else {
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)),
              data2));
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)),
              data2));
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)),
              data2));
        }
      }
    }
  }
  private static class ScanThread extends Thread {
    private final Table table;
    private final boolean reverse;

    public ScanThread(Table table, boolean reverse) {
      this.table = table;
      this.reverse = reverse;
    }

    @Override
    public void run() {
      try {
        initiateScan(table);
      } catch (IOException e) {
        // do nothing
      }
    }

    private void initiateScan(Table table) throws IOException {
      Scan scan = new Scan();
      if (reverse) {
        scan.setReversed(true);
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      ResultScanner resScanner = table.getScanner(scan);
      int i = (reverse ? ROWS.length - 1 : 0);
      boolean resultFound = false;
      for (Result result : resScanner) {
        resultFound = true;
        System.out.println(result);
        if (!reverse) {
          assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
          i++;
        } else {
          assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
          i--;
        }
      }
      assertTrue(resultFound);
    }
  }
  private void waitForStoreFileCount(Store store, int count, int timeout)
      throws InterruptedException {
    long start = System.currentTimeMillis();
    while (start + timeout > System.currentTimeMillis() && store.getStorefilesCount() != count) {
      Thread.sleep(100);
    }
    System.out.println("start=" + start + ", now=" + System.currentTimeMillis() + ", cur="
        + store.getStorefilesCount());
    assertEquals(count, store.getStorefilesCount());
  }
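  /**
   * RegionScanner wrapper handed out by CustomInnerRegionObserverWrapper. It delegates
   * every call, but parks nextRaw() on compactionLatch / exceptionLatch so that a
   * compaction (or an injected exception) can happen while the scanner still holds its
   * blocks.
   */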
  private static class CustomScanner implements RegionScanner {

    private RegionScanner delegate;

    public CustomScanner(RegionScanner delegate) {
      this.delegate = delegate;
    }

    @Override
    public boolean next(List<Cell> results) throws IOException {
      return delegate.next(results);
    }

    @Override
    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
      return delegate.next(result, scannerContext);
    }

    @Override
    public boolean nextRaw(List<Cell> result) throws IOException {
      return delegate.nextRaw(result);
    }

    @Override
    public boolean nextRaw(List<Cell> result, ScannerContext context) throws IOException {
      boolean nextRaw = delegate.nextRaw(result, context);
      if (compactionLatch != null && compactionLatch.getCount() > 0) {
        try {
          compactionLatch.await();
        } catch (InterruptedException ie) {
        }
      }
      if (CustomInnerRegionObserver.throwException.get()) {
        if (exceptionLatch.getCount() > 0) {
          try {
            exceptionLatch.await();
          } catch (InterruptedException e) {
          }
          throw new IOException("throw exception");
        }
      }
      return nextRaw;
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public HRegionInfo getRegionInfo() {
      return delegate.getRegionInfo();
    }

    @Override
    public boolean isFilterDone() throws IOException {
      return delegate.isFilterDone();
    }

    @Override
    public boolean reseek(byte[] row) throws IOException {
      return false;
    }

    @Override
    public long getMaxResultSize() {
      return delegate.getMaxResultSize();
    }

    @Override
    public long getMvccReadPoint() {
      return delegate.getMvccReadPoint();
    }

    @Override
    public int getBatch() {
      return delegate.getBatch();
    }

    @Override
    public void shipped() throws IOException {
      this.delegate.shipped();
    }
  }
  public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver {
    @Override
    public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
        Scan scan, RegionScanner s) throws IOException {
      return new CustomScanner(s);
    }
  }
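  /**
   * Region observer whose postGetOp/postScannerNext hooks count the client operations and
   * then block on the CountDownLatch published through getCdl(), keeping the blocks read
   * by those operations referenced until the test counts the latch down.
   */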
  public static class CustomInnerRegionObserver implements RegionObserver {
    static final AtomicLong sleepTime = new AtomicLong(0);
    static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
    static final AtomicInteger countOfNext = new AtomicInteger(0);
    static final AtomicInteger countOfGets = new AtomicInteger(0);
    static final AtomicBoolean waitForGets = new AtomicBoolean(false);
    static final AtomicBoolean throwException = new AtomicBoolean(false);
    private static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(
        new CountDownLatch(0));

    @Override
    public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
        InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
      slowdownCode(e, false);
      if (getLatch != null && getLatch.getCount() > 0) {
        try {
          getLatch.await();
        } catch (InterruptedException e1) {
        }
      }
      return hasMore;
    }

    @Override
    public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
        List<Cell> results) throws IOException {
      slowdownCode(e, true);
    }

    public static AtomicReference<CountDownLatch> getCdl() {
      return cdl;
    }

    private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e,
        boolean isGet) {
      CountDownLatch latch = getCdl().get();
      try {
        System.out.println(latch.getCount() + " is the count " + isGet);
        if (latch.getCount() > 0) {
          if (isGet) {
            countOfGets.incrementAndGet();
          } else {
            countOfNext.incrementAndGet();
          }
          LOG.info("Waiting for the counterCountDownLatch");
          latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
          if (latch.getCount() > 0) {
            throw new RuntimeException("Can't wait more");
          }
        }
      } catch (InterruptedException e1) {
        // ignore the interrupt; the asserts in the tests will catch any failure
      }
    }
  }
}