/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

@Category({ LargeTests.class, ClientTests.class })
@SuppressWarnings("deprecation")
public class TestBlockEvictionFromClient {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBlockEvictionFromClient.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionFromClient.class);
  protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  static byte[][] ROWS = new byte[2][];
  private static int NO_OF_THREADS = 3;
  private static byte[] ROW = Bytes.toBytes("testRow");
  private static byte[] ROW1 = Bytes.toBytes("testRow1");
  private static byte[] ROW2 = Bytes.toBytes("testRow2");
  private static byte[] ROW3 = Bytes.toBytes("testRow3");
  private static byte[] FAMILY = Bytes.toBytes("testFamily");
  private static byte[][] FAMILIES_1 = new byte[1][0];
  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
  private static byte[] data = new byte[1000];
  private static byte[] data2 = Bytes.add(data, data);
  protected static int SLAVES = 1;
  private static CountDownLatch latch;
  private static CountDownLatch getLatch;
  private static CountDownLatch compactionLatch;
  private static CountDownLatch exceptionLatch;

  @Rule
  public TestName name = new TestName();
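
  // Coordination notes (inferred from how these latches are used below): "latch" is installed
  // into CustomInnerRegionObserver's cdl and parks get/scan RPC handlers inside slowdownCode(),
  // so reader threads keep RPC references on their cached blocks while the test inspects
  // refcounts. "getLatch" parks postScannerNext() until gets have returned their blocks, while
  // "compactionLatch" and "exceptionLatch" park CustomScanner.nextRaw() so compactions and
  // exception paths can overlap an in-flight scan.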
  /**
   * @throws java.lang.Exception
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ROWS[0] = ROW;
    ROWS[1] = ROW1;
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
      MultiRowMutationEndpoint.class.getName());
    conf.setInt("hbase.regionserver.handler.count", 20);
    conf.setInt("hbase.bucketcache.size", 400);
    conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
    conf.setFloat("hfile.block.cache.size", 0.2f);
    conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); // do not retry
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000);
    FAMILIES_1[0] = FAMILY;
    TEST_UTIL.startMiniCluster(SLAVES);
  }

  /**
   * @throws java.lang.Exception
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * @throws java.lang.Exception
   */
  @Before
  public void setUp() throws Exception {
    CustomInnerRegionObserver.waitForGets.set(false);
    CustomInnerRegionObserver.countOfNext.set(0);
    CustomInnerRegionObserver.countOfGets.set(0);
  }

  /**
   * @throws java.lang.Exception
   */
  @After
  public void tearDown() throws Exception {
    if (latch != null) {
      while (latch.getCount() > 0) {
        latch.countDown();
      }
    }
    if (getLatch != null) {
      getLatch.countDown();
    }
    if (compactionLatch != null) {
      compactionLatch.countDown();
    }
    if (exceptionLatch != null) {
      exceptionLatch.countDown();
    }
    latch = null;
    getLatch = null;
    compactionLatch = null;
    exceptionLatch = null;
    CustomInnerRegionObserver.throwException.set(false);
    // Clean up the tables for every test case
    TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames();
    for (TableName tableName : listTableNames) {
      if (!tableName.isSystemTable()) {
        TEST_UTIL.getAdmin().disableTable(tableName);
        TEST_UTIL.getAdmin().deleteTable(tableName);
      }
    }
  }
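
  // The tests below share one shape: write and flush a few blocks, start NO_OF_THREADS reader
  // threads that the coprocessor parks on the latch (each parked reader should pin its blocks,
  // so getRpcRefCount() is expected to be NO_OF_THREADS per touched block), then release the
  // latch and assert that every refcount drains back to zero.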
  @Test
  public void testBlockEvictionWithParallelScans() throws Exception {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
        TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // data was in memstore so don't expect any changes
      // flush the data
      // Should create one Hfile with 2 blocks
      region.flush(true);

      // Create three sets of scan
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      checkForBlockEviction(cache, false, false);
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // CustomInnerRegionObserver.sleepTime.set(0);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // read the data and expect same blocks, one new hit, no misses
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Check how this miss is happening
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // compact, net minus two blocks, two hits, no misses
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // read the row, this should be a cache miss because we don't cache data
      // blocks on compaction
      r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testParallelGetsAndScans() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
        TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // Create three sets of scan
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, false, false);
      checkForBlockEviction(cache, false, false);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that it had
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      System.out.println("Scans should have returned the blocks");
      // Check with either true or false
      CustomInnerRegionObserver.waitForGets.set(false);
      // The scan should also have released the blocks by now
      checkForBlockEviction(cache, true, true);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
        TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, false, false);
      Thread.sleep(200);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that it had
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  // TODO : check how block index works here
  public void testGetsWithMultiColumnsAndExplicitTracker()
    throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
        TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        region.flush(true);
      }
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, true, false);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(10, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that it had
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
        CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
        TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        region.flush(true);
      }
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, true, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(3, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that it had
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testBlockRefCountAfterSplits() throws IOException, InterruptedException {
    Table table = null;
    try {
      final TableName tableName = TableName.valueOf(name.getMethodName());
      TableDescriptor desc = TEST_UTIL.createTableDescriptor(tableName);
      // This test expects rpc refcount of cached data blocks to be 0 after split. After split,
      // two daughter regions are opened and a compaction is scheduled to get rid of reference
      // of the parent region hfiles. Compaction will increase refcount of cached data blocks by 1.
      // It is flaky since compaction can kick in anytime. To solve this issue, table is created
      // with compaction disabled.
      table = TEST_UTIL.createTable(
        TableDescriptorBuilder.newBuilder(desc).setCompactionEnabled(false).build(), FAMILIES_1,
        null, BloomType.ROW, 1024, null);
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
        TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers());
      int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size();
      LOG.info("About to SPLIT on {} {}, count={}", Bytes.toString(ROW1), region.getRegionInfo(),
        regionCount);
      TEST_UTIL.getAdmin().split(tableName, ROW1);
      // Wait for splits
      TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount);
      region.compact(true);
      List<HRegion> regions = TEST_UTIL.getMiniHBaseCluster().getRegionServer(rs).getRegions();
      for (HRegion r : regions) {
        LOG.info("" + r.getCompactionState());
        TEST_UTIL.waitFor(30000, () -> r.getCompactionState().equals(CompactionState.NONE));
      }
      LOG.info("Split finished, is region closed {} {}", region.isClosed(), cache);
      Iterator<CachedBlock> iterator = cache.iterator();
      // Though the split had created the HalfStorefileReader - the firstkey and lastkey scanners
      // should be closed in order to return those blocks
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testMultiGets() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
        TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      MultiGetThread[] getThreads = initiateMultiGet(table);
      Thread.sleep(200);
      int refCount = 0;
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean foundNonZeroBlock = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          assertEquals(NO_OF_THREADS, refCount);
          foundNonZeroBlock = true;
        }
      }
      assertTrue("Should have found nonzero ref count block", foundNonZeroBlock);
      CustomInnerRegionObserver.getCdl().get().countDown();
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (MultiGetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that it had
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      iterateBlockCache(cache, iterator);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
        CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
        TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        region.flush(true);
      }
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      // Create three sets of scans
      ScanThread[] scanThreads = initiateScan(table, true);
      Thread.sleep(100);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(12, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
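
  /**
   * Enables cache-on-write and evict-on-close on every store of the region and returns the
   * block cache. Note that the cache returned is simply the one obtained from the store visited
   * last; with a shared block cache this is the same instance for all stores.
   */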
  private BlockCache setCacheProperties(HRegion region) {
    Iterator<HStore> strItr = region.getStores().iterator();
    BlockCache cache = null;
    while (strItr.hasNext()) {
      HStore store = strItr.next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      cache = cacheConf.getBlockCache().get();
    }
    return cache;
  }

  @Test
  public void testParallelGetsAndScanWithWrappedRegionScanner()
    throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
        TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      // Create three sets of scan
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, false, false);
      // The block would have been decremented for the scan case as it was
      // wrapped
      // before even the postNext hook gets executed.
      // giving some time for the block to be decremented
      Thread.sleep(100);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), false);
  }

  @Test
  public void testReverseScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), true);
  }

  private void testScanWithCompactionInternals(String tableNameStr, boolean reversed)
    throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      TableName tableName = TableName.valueOf(tableNameStr);
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
        TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // read the data and expect same blocks, one new hit, no misses
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Check how this miss is happening
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Create three sets of scan
      ScanThread[] scanThreads = initiateScan(table, reversed);
      int refCount = 0;
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after compaction is done we will have some blocks that cannot
      // be evicted this is because the scan is still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3 as they are not yet cleared
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      // Should not throw exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush()
    throws IOException, InterruptedException {
    // do flush and scan in parallel
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
        TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // read the data and expect same blocks, one new hit, no misses
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Check how this miss is happening
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Create three sets of scan
      ScanThread[] scanThreads = initiateScan(table, false);
      int refCount = 0;
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      // Make a put and do a flush
      QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      data2 = Bytes.add(data, data);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(3, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after compaction is done we will have some blocks that cannot
      // be evicted this is because the scan is still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3 as they are not yet cleared
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      // Should not throw exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      // Since a flush and compaction happened after a scan started
      // we need to ensure that all the original blocks of the compacted file
      // are returned back to the cache
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  @Test
  public void testScanWithException() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      exceptionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
        CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
        TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();
      // insert data. 2 Rows are added
      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      CustomInnerRegionObserver.throwException.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // The block would have been decremented for the scan case as it was
      // wrapped
      // before even the postNext hook gets executed.
      // giving some time for the block to be decremented
      Thread.sleep(100);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue(usedBlocksFound);
      exceptionLatch.countDown();
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      iterator = cache.iterator();
      usedBlocksFound = false;
      refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertFalse(usedBlocksFound);
      // you should always see 0 ref count. since after HBASE-16604 we always recreate the scanner
      assertEquals(0, refCount);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) {
    int refCount;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        LOG.info("BucketCache {} {}", cacheKey, refCount);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        LOG.info("CombinedBlockCache {} {}", cacheKey, refCount);
      } else {
        continue;
      }
      assertEquals(0, refCount);
    }
  }

  private void insertData(Table table) throws IOException {
    Put put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    put = new Put(ROW1);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
    put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER2, data2);
    table.put(put);
  }

  private ScanThread[] initiateScan(Table table, boolean reverse)
    throws IOException, InterruptedException {
    ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      scanThreads[i] = new ScanThread(table, reverse);
    }
    for (ScanThread thread : scanThreads) {
      thread.start();
    }
    return scanThreads;
  }

  private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs)
    throws IOException, InterruptedException {
    GetThread[] getThreads = new GetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      getThreads[i] = new GetThread(table, tracker, multipleCFs);
    }
    for (GetThread thread : getThreads) {
      thread.start();
    }
    return getThreads;
  }

  private MultiGetThread[] initiateMultiGet(Table table)
    throws IOException, InterruptedException {
    MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      multiGetThreads[i] = new MultiGetThread(table);
    }
    for (MultiGetThread thread : multiGetThreads) {
      thread.start();
    }
    return multiGetThreads;
  }
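
  // Refcount arithmetic used below (as exercised by the tests above): while readers are parked,
  // each of the NO_OF_THREADS gets or scans holds one RPC reference per block it touched; when
  // gets and scans overlap, a block can therefore be pinned countOfGets/countOfNext plus
  // NO_OF_THREADS times until one side releases its blocks.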
  private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero)
    throws InterruptedException {
    int counter = NO_OF_THREADS;
    if (CustomInnerRegionObserver.waitForGets.get()) {
      // Because only one row is selected, it has only 2 blocks
      counter = counter - 1;
      while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    } else {
      while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    }
    Iterator<CachedBlock> iterator = cache.iterator();
    int refCount = 0;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
      } else {
        continue;
      }
      System.out.println(" the refcount is " + refCount + " block is " + cacheKey);
      if (CustomInnerRegionObserver.waitForGets.get()) {
        if (expectOnlyZero) {
          assertTrue(refCount == 0);
        }
        if (refCount != 0) {
          // Because the scan would have also touched up on these blocks but
          // the scan would have released them once it completed
          if (getClosed) {
            // If get has closed only the scan's blocks would be available
            assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get());
          } else {
            assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get() + (NO_OF_THREADS));
          }
        }
      } else {
        // Because the get would have also touched up on these blocks but it
        // would have touched
        // upon only 2 additionally
        if (expectOnlyZero) {
          assertTrue(refCount == 0);
        }
        if (refCount != 0) {
          if (getLatch == null) {
            assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get());
          } else {
            assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get() + (NO_OF_THREADS));
          }
        }
      }
    }
    CustomInnerRegionObserver.getCdl().get().countDown();
  }

  private static class MultiGetThread extends Thread {
    private final Table table;
    private final List<Get> gets = new ArrayList<>();

    public MultiGetThread(Table table) {
      this.table = table;
    }

    @Override
    public void run() {
      gets.add(new Get(ROW));
      gets.add(new Get(ROW1));
      try {
        CustomInnerRegionObserver.getCdl().set(latch);
        Result[] r = table.get(gets);
        assertTrue(Bytes.equals(r[0].getRow(), ROW));
        assertTrue(Bytes.equals(r[1].getRow(), ROW1));
      } catch (IOException e) {
        // do nothing
      }
    }
  }

  private static class GetThread extends Thread {
    private final Table table;
    private final boolean tracker;
    private final boolean multipleCFs;

    public GetThread(Table table, boolean tracker, boolean multipleCFs) {
      this.table = table;
      this.tracker = tracker;
      this.multipleCFs = multipleCFs;
    }

    @Override
    public void run() {
      try {
        initiateGet(table);
      } catch (IOException e) {
        // do nothing
      }
    }

    private void initiateGet(Table table) throws IOException {
      Get get = new Get(ROW);
      if (tracker) {
        if (!multipleCFs) {
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900));
        } else {
          get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3));
          get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8));
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900));
        }
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      Result r = table.get(get);
      System.out.println(r);
      if (!tracker) {
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      } else {
        if (!multipleCFs) {
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2));
        } else {
          assertTrue(Bytes.equals(
            r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)),
            data2));
          assertTrue(Bytes.equals(
            r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)),
            data2));
          assertTrue(Bytes.equals(
            r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)),
            data2));
        }
      }
    }
  }

  private static class ScanThread extends Thread {
    private final Table table;
    private final boolean reverse;

    public ScanThread(Table table, boolean reverse) {
      this.table = table;
      this.reverse = reverse;
    }

    @Override
    public void run() {
      try {
        initiateScan(table);
      } catch (IOException e) {
        // do nothing
      }
    }

    private void initiateScan(Table table) throws IOException {
      Scan scan = new Scan();
      if (reverse) {
        scan.setReversed(true);
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      ResultScanner resScanner = table.getScanner(scan);
      int i = (reverse ? ROWS.length - 1 : 0);
      boolean resultFound = false;
      for (Result result : resScanner) {
        resultFound = true;
        System.out.println(result);
        if (!reverse) {
          assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
          i++;
        } else {
          assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
          i--;
        }
      }
      assertTrue(resultFound);
    }
  }

  private void waitForStoreFileCount(HStore store, int count, int timeout)
    throws InterruptedException {
    long start = EnvironmentEdgeManager.currentTime();
    while (start + timeout > EnvironmentEdgeManager.currentTime()
      && store.getStorefilesCount() != count) {
      Thread.sleep(100);
    }
    System.out.println("start=" + start + ", now=" + EnvironmentEdgeManager.currentTime()
      + ", cur=" + store.getStorefilesCount());
    assertEquals(count, store.getStorefilesCount());
  }
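
  // Wraps the real RegionScanner so that nextRaw() can be parked on compactionLatch (letting a
  // compaction run while scan RPCs still reference blocks) or turned into an IOException when
  // throwException is set, which exercises the scanner re-creation path from HBASE-16604.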
  private static class CustomScanner implements RegionScanner {

    private RegionScanner delegate;

    public CustomScanner(RegionScanner delegate) {
      this.delegate = delegate;
    }

    @Override
    public boolean next(List<Cell> results) throws IOException {
      return delegate.next(results);
    }

    @Override
    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
      return delegate.next(result, scannerContext);
    }

    @Override
    public boolean nextRaw(List<Cell> result) throws IOException {
      return delegate.nextRaw(result);
    }

    @Override
    public boolean nextRaw(List<Cell> result, ScannerContext context) throws IOException {
      boolean nextRaw = delegate.nextRaw(result, context);
      if (compactionLatch != null && compactionLatch.getCount() > 0) {
        try {
          compactionLatch.await();
        } catch (InterruptedException ie) {
        }
      }
      if (CustomInnerRegionObserver.throwException.get()) {
        if (exceptionLatch.getCount() > 0) {
          try {
            exceptionLatch.await();
          } catch (InterruptedException e) {
          }
          throw new IOException("throw exception");
        }
      }
      return nextRaw;
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public RegionInfo getRegionInfo() {
      return delegate.getRegionInfo();
    }

    @Override
    public boolean isFilterDone() throws IOException {
      return delegate.isFilterDone();
    }

    @Override
    public boolean reseek(byte[] row) throws IOException {
      return false;
    }

    @Override
    public long getMaxResultSize() {
      return delegate.getMaxResultSize();
    }

    @Override
    public long getMvccReadPoint() {
      return delegate.getMvccReadPoint();
    }

    @Override
    public int getBatch() {
      return delegate.getBatch();
    }
  }
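
  // Same hooks as CustomInnerRegionObserver, but additionally swaps the region scanner for the
  // CustomScanner wrapper above via postScannerOpen.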
  public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver {
    @Override
    public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
      Scan scan, RegionScanner s) throws IOException {
      return new CustomScanner(s);
    }
  }

  public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver {
    static final AtomicInteger countOfNext = new AtomicInteger(0);
    static final AtomicInteger countOfGets = new AtomicInteger(0);
    static final AtomicBoolean waitForGets = new AtomicBoolean(false);
    static final AtomicBoolean throwException = new AtomicBoolean(false);
    private static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(
      new CountDownLatch(0));

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
      InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
      slowdownCode(e, false);
      if (getLatch != null && getLatch.getCount() > 0) {
        try {
          getLatch.await();
        } catch (InterruptedException e1) {
        }
      }
      return hasMore;
    }

    @Override
    public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
      List<Cell> results) throws IOException {
      slowdownCode(e, true);
    }

    public static AtomicReference<CountDownLatch> getCdl() {
      return cdl;
    }

    private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e,
      boolean isGet) {
      CountDownLatch latch = getCdl().get();
      try {
        System.out.println(latch.getCount() + " is the count " + isGet);
        if (latch.getCount() > 0) {
          if (isGet) {
            countOfGets.incrementAndGet();
          } else {
            countOfNext.incrementAndGet();
          }
          LOG.info("Waiting for the counterCountDownLatch");
          latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
          if (latch.getCount() > 0) {
            throw new RuntimeException("Can't wait more");
          }
        }
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }
    }
  }
}