/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
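
/**
 * Verifies that data blocks referenced by in-flight gets and scans (tracked via
 * per-block RPC reference counts in the block cache) are only released, and thus
 * become evictable, once those client operations complete or fail.
 */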
@Category({ LargeTests.class, ClientTests.class })
@SuppressWarnings("deprecation")
public class TestBlockEvictionFromClient {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestBlockEvictionFromClient.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionFromClient.class);
  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  static byte[][] ROWS = new byte[2][];
  private static int NO_OF_THREADS = 3;
  private static byte[] ROW = Bytes.toBytes("testRow");
  private static byte[] ROW1 = Bytes.toBytes("testRow1");
  private static byte[] ROW2 = Bytes.toBytes("testRow2");
  private static byte[] ROW3 = Bytes.toBytes("testRow3");
  private static byte[] FAMILY = Bytes.toBytes("testFamily");
  private static byte[][] FAMILIES_1 = new byte[1][0];
  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
  private static byte[] data = new byte[1000];
  private static byte[] data2 = Bytes.add(data, data);
  protected static int SLAVES = 1;
  private static CountDownLatch latch;
  private static CountDownLatch getLatch;
  private static CountDownLatch compactionLatch;
  private static CountDownLatch exceptionLatch;

  @Rule
  public TestName name = new TestName();
  /**
   * @throws java.lang.Exception
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ROWS[0] = ROW;
    ROWS[1] = ROW1;
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
        MultiRowMutationEndpoint.class.getName());
    conf.setInt("hbase.regionserver.handler.count", 20);
    conf.setInt("hbase.bucketcache.size", 400);
    conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
    conf.setFloat("hfile.block.cache.size", 0.2f);
    conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); // do not retry
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000);
    FAMILIES_1[0] = FAMILY;
    TEST_UTIL.startMiniCluster(SLAVES);
  }
  /**
   * @throws java.lang.Exception
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }
  /**
   * @throws java.lang.Exception
   */
  @Before
  public void setUp() throws Exception {
    CustomInnerRegionObserver.waitForGets.set(false);
    CustomInnerRegionObserver.countOfNext.set(0);
    CustomInnerRegionObserver.countOfGets.set(0);
  }
  /**
   * @throws java.lang.Exception
   */
  @After
  public void tearDown() throws Exception {
    if (latch != null) {
      while (latch.getCount() > 0) {
        latch.countDown();
      }
    }
    if (getLatch != null) {
      getLatch.countDown();
    }
    if (compactionLatch != null) {
      compactionLatch.countDown();
    }
    if (exceptionLatch != null) {
      exceptionLatch.countDown();
    }
    latch = null;
    getLatch = null;
    compactionLatch = null;
    exceptionLatch = null;
    CustomInnerRegionObserver.throwException.set(false);
    // Clean up the tables for every test case
    TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames();
    for (TableName tableName : listTableNames) {
      if (!tableName.isSystemTable()) {
        TEST_UTIL.getAdmin().disableTable(tableName);
        TEST_UTIL.getAdmin().deleteTable(tableName);
      }
    }
  }
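
  // Each test below follows the same shape: write a couple of rows, flush so blocks
  // land in the cache, start reader threads that park inside the coprocessor hooks,
  // and assert on per-block RPC reference counts before and after the readers finish.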
  @Test
  public void testBlockEvictionWithParallelScans() throws Exception {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // data was in memstore so don't expect any changes
      // flush the data
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // Create three sets of scan
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      checkForBlockEviction(cache, false, false);
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // CustomInnerRegionObserver.sleepTime.set(0);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // read the data and expect same blocks, one new hit, no misses
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Check how this miss is happening
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // compact, net minus two blocks, two hits, no misses
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // read the row, this should be a cache miss because we don't cache data
      // blocks on compaction
      r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testParallelGetsAndScans() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // Create three sets of scan
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, false, false);
      checkForBlockEviction(cache, false, false);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that it had
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      System.out.println("Scans should have returned the blocks");
      // Check with either true or false
      CustomInnerRegionObserver.waitForGets.set(false);
      // The scan should also have released the blocks by now
      checkForBlockEviction(cache, true, true);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, false, false);
      Thread.sleep(200);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that it had
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  // TODO : check how block index works here
  public void testGetsWithMultiColumnsAndExplicitTracker()
      throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, true, false);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(10, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that it had
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, true, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(3, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that it had
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testBlockRefCountAfterSplits() throws IOException, InterruptedException {
    Table table = null;
    try {
      final TableName tableName = TableName.valueOf(name.getMethodName());
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024);
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      ServerName rs = Iterables.getOnlyElement(TEST_UTIL.getAdmin().getRegionServers());
      int regionCount = TEST_UTIL.getAdmin().getRegions(rs).size();
      LOG.info("About to SPLIT on " + Bytes.toString(ROW1));
      TEST_UTIL.getAdmin().split(tableName, ROW1);
      // wait for the split to complete
      TEST_UTIL.waitFor(60000, () -> TEST_UTIL.getAdmin().getRegions(rs).size() > regionCount);
      region.compact(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      // Though the split had created the HalfStoreFileReader - the firstkey and lastkey scanners
      // should be closed in order to return those blocks
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testMultiGets() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      CustomInnerRegionObserver.waitForGets.set(true);
      // Create three sets of gets
      MultiGetThread[] getThreads = initiateMultiGet(table);
      Thread.sleep(200);
      int refCount = 0;
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean foundNonZeroBlock = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          assertEquals(NO_OF_THREADS, refCount);
          foundNonZeroBlock = true;
        }
      }
      assertTrue("Should have found nonzero ref count block", foundNonZeroBlock);
      CustomInnerRegionObserver.getCdl().get().countDown();
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (MultiGetThread thread : getThreads) {
        thread.join();
      }
      // Verify whether the gets have returned the blocks that it had
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block to be decremented
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      // Create three sets of scans
      ScanThread[] scanThreads = initiateScan(table, true);
      Thread.sleep(100);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(12, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // giving some time for the block to be decremented
      checkForBlockEviction(cache, true, false);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  private BlockCache setCacheProperties(HRegion region) {
    Iterator<HStore> strItr = region.getStores().iterator();
    BlockCache cache = null;
    while (strItr.hasNext()) {
      HStore store = strItr.next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      // all stores share the same block cache; keep the last reference
      cache = cacheConf.getBlockCache().get();
    }
    return cache;
  }
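
  // The "wrapped" variants below install CustomInnerRegionObserverWrapper, whose
  // postScannerOpen swaps the region scanner for CustomScanner. That delegate can
  // stall inside nextRaw() on the compaction/exception latches, letting the tests
  // overlap scans with flushes, compactions and injected failures.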
  @Test
  public void testParallelGetsAndScanWithWrappedRegionScanner()
      throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      // Create three sets of scan
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Create three sets of gets
      GetThread[] getThreads = initiateGet(table, false, false);
      // The block would have been decremented for the scan case as it was
      // wrapped
      // before even the postNext hook gets executed.
      // giving some time for the block to be decremented
      Thread.sleep(100);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), false);
  }

  @Test
  public void testReverseScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), true);
  }

  private void testScanWithCompactionInternals(String tableNameStr, boolean reversed)
      throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      TableName tableName = TableName.valueOf(tableNameStr);
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // read the data and expect same blocks, one new hit, no misses
      int refCount = 0;
      // Check how this miss is happening
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Create three sets of scan
      ScanThread[] scanThreads = initiateScan(table, reversed);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after compaction is done we will have some blocks that cannot
      // be evicted this is because the scan is still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3 as they are not yet cleared
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      // Should not throw exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush()
      throws IOException, InterruptedException {
    // do flush and scan in parallel
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // read the data and expect same blocks, one new hit, no misses
      int refCount = 0;
      // Check how this miss is happening
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Create three sets of scan
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      // Make a put and do a flush
      QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      data2 = Bytes.add(data, data);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(3, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after compaction is done we will have some blocks that cannot
      // be evicted this is because the scan is still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3 as they are not yet cleared
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      // Should not throw exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      // Since a flush and compaction happened after a scan started
      // we need to ensure that all the original blocks of the compacted file
      // are also evicted.
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
  @Test
  public void testScanWithException() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      exceptionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();
      // insert data. 2 Rows are added
      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      CustomInnerRegionObserver.throwException.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // The block would have been decremented for the scan case as it was
      // wrapped
      // before even the postNext hook gets executed.
      // giving some time for the block to be decremented
      Thread.sleep(100);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue(usedBlocksFound);
      exceptionLatch.countDown();
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      iterator = cache.iterator();
      usedBlocksFound = false;
      refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be with count 3
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertFalse(usedBlocksFound);
      // you should always see 0 ref count. since after HBASE-16604 we always recreate the scanner
      assertEquals(0, refCount);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
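
  // Asserts that no cached block is still referenced by an RPC: every block in the
  // cache must report an RPC refCount of zero. Only BucketCache and
  // CombinedBlockCache expose per-block reference counts; other cache types are skipped.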
  private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) {
    int refCount;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
      } else {
        continue;
      }
      assertEquals(0, refCount);
    }
  }
  private void insertData(Table table) throws IOException {
    Put put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    put = new Put(ROW1);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
    put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER2, data2);
    table.put(put);
  }
  private ScanThread[] initiateScan(Table table, boolean reverse)
      throws IOException, InterruptedException {
    ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      scanThreads[i] = new ScanThread(table, reverse);
    }
    for (ScanThread thread : scanThreads) {
      thread.start();
    }
    return scanThreads;
  }
  private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs)
      throws IOException, InterruptedException {
    GetThread[] getThreads = new GetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      getThreads[i] = new GetThread(table, tracker, multipleCFs);
    }
    for (GetThread thread : getThreads) {
      thread.start();
    }
    return getThreads;
  }
  private MultiGetThread[] initiateMultiGet(Table table)
      throws IOException, InterruptedException {
    MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      multiGetThreads[i] = new MultiGetThread(table);
    }
    for (MultiGetThread thread : multiGetThreads) {
      thread.start();
    }
    return multiGetThreads;
  }
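
  // Waits until all reader threads are parked inside the coprocessor hook, then
  // checks every cached block: a non-zero RPC refCount must match the number of
  // parked gets and/or scans (getClosed indicates the gets have already returned
  // their blocks; expectOnlyZero demands that everything has been released).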
  private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero)
      throws InterruptedException {
    int counter = NO_OF_THREADS;
    if (CustomInnerRegionObserver.waitForGets.get()) {
      // Because only one row is selected, it has only 2 blocks
      counter = counter - 1;
      while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    } else {
      while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    }
    Iterator<CachedBlock> iterator = cache.iterator();
    int refCount = 0;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRpcRefCount(cacheKey);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRpcRefCount(cacheKey);
      } else {
        continue;
      }
      System.out.println(" the refcount is " + refCount + " block is " + cacheKey);
      if (CustomInnerRegionObserver.waitForGets.get()) {
        if (expectOnlyZero) {
          assertTrue(refCount == 0);
        }
        if (refCount != 0) {
          // Because the scan would have also touched up on these blocks but
          // it would have touched
          // upon only 2 additionally
          if (getClosed) {
            // If get has closed only the scan's blocks would be available
            assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get());
          } else {
            assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get() + (NO_OF_THREADS));
          }
        }
      } else {
        // Because the get would have also touched up on these blocks but it
        // would have touched
        // upon only 2 additionally
        if (expectOnlyZero) {
          assertTrue(refCount == 0);
        }
        if (refCount != 0) {
          if (getLatch == null) {
            assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get());
          } else {
            assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get() + (NO_OF_THREADS));
          }
        }
      }
    }
    CustomInnerRegionObserver.getCdl().get().countDown();
  }
  private static class MultiGetThread extends Thread {
    private final Table table;
    private final List<Get> gets = new ArrayList<>();

    public MultiGetThread(Table table) {
      this.table = table;
    }

    @Override
    public void run() {
      gets.add(new Get(ROW));
      gets.add(new Get(ROW1));
      try {
        CustomInnerRegionObserver.getCdl().set(latch);
        Result[] r = table.get(gets);
        assertTrue(Bytes.equals(r[0].getRow(), ROW));
        assertTrue(Bytes.equals(r[1].getRow(), ROW1));
      } catch (IOException e) {
        // do nothing
      }
    }
  }
  private static class GetThread extends Thread {
    private final Table table;
    private final boolean tracker;
    private final boolean multipleCFs;

    public GetThread(Table table, boolean tracker, boolean multipleCFs) {
      this.table = table;
      this.tracker = tracker;
      this.multipleCFs = multipleCFs;
    }

    @Override
    public void run() {
      try {
        initiateGet(table);
      } catch (IOException e) {
        // do nothing
      }
    }

    private void initiateGet(Table table) throws IOException {
      Get get = new Get(ROW);
      if (tracker) {
        // Change this
        if (!multipleCFs) {
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900));
        } else {
          get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3));
          get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8));
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900));
        }
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      Result r = table.get(get);
      System.out.println(r);
      if (!tracker) {
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      } else {
        if (!multipleCFs) {
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2));
        } else {
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)),
              data2));
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)),
              data2));
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)),
              data2));
        }
      }
    }
  }
  private static class ScanThread extends Thread {
    private final Table table;
    private final boolean reverse;

    public ScanThread(Table table, boolean reverse) {
      this.table = table;
      this.reverse = reverse;
    }

    @Override
    public void run() {
      try {
        initiateScan(table);
      } catch (IOException e) {
        // do nothing
      }
    }

    private void initiateScan(Table table) throws IOException {
      Scan scan = new Scan();
      if (reverse) {
        scan.setReversed(true);
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      ResultScanner resScanner = table.getScanner(scan);
      int i = (reverse ? ROWS.length - 1 : 0);
      boolean resultFound = false;
      for (Result result : resScanner) {
        resultFound = true;
        System.out.println(result);
        if (!reverse) {
          assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
          i++;
        } else {
          assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
          i--;
        }
      }
      assertTrue(resultFound);
    }
  }
  private void waitForStoreFileCount(HStore store, int count, int timeout)
      throws InterruptedException {
    long start = System.currentTimeMillis();
    while (start + timeout > System.currentTimeMillis() && store.getStorefilesCount() != count) {
      Thread.sleep(100);
    }
    System.out.println("start=" + start + ", now=" + System.currentTimeMillis() + ", cur=" +
        store.getStorefilesCount());
    assertEquals(count, store.getStorefilesCount());
  }
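
  // A pass-through RegionScanner used by the wrapper observer. nextRaw() can block
  // on compactionLatch (to hold a scan open across a compaction) or on
  // exceptionLatch before failing (to simulate a scanner that dies mid-scan).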
  private static class CustomScanner implements RegionScanner {

    private RegionScanner delegate;

    public CustomScanner(RegionScanner delegate) {
      this.delegate = delegate;
    }

    @Override
    public boolean next(List<Cell> results) throws IOException {
      return delegate.next(results);
    }

    @Override
    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
      return delegate.next(result, scannerContext);
    }

    @Override
    public boolean nextRaw(List<Cell> result) throws IOException {
      return delegate.nextRaw(result);
    }

    @Override
    public boolean nextRaw(List<Cell> result, ScannerContext context) throws IOException {
      boolean nextRaw = delegate.nextRaw(result, context);
      if (compactionLatch != null && compactionLatch.getCount() > 0) {
        try {
          compactionLatch.await();
        } catch (InterruptedException ie) {
        }
      }
      if (CustomInnerRegionObserver.throwException.get()) {
        if (exceptionLatch.getCount() > 0) {
          try {
            exceptionLatch.await();
          } catch (InterruptedException e) {
          }
          throw new IOException("throw exception");
        }
      }
      return nextRaw;
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public RegionInfo getRegionInfo() {
      return delegate.getRegionInfo();
    }

    @Override
    public boolean isFilterDone() throws IOException {
      return delegate.isFilterDone();
    }

    @Override
    public boolean reseek(byte[] row) throws IOException {
      return false;
    }

    @Override
    public long getMaxResultSize() {
      return delegate.getMaxResultSize();
    }

    @Override
    public long getMvccReadPoint() {
      return delegate.getMvccReadPoint();
    }

    @Override
    public int getBatch() {
      return delegate.getBatch();
    }
  }
  public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver {
    @Override
    public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
        Scan scan, RegionScanner s) throws IOException {
      return new CustomScanner(s);
    }
  }
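
  // Region coprocessor that makes reads observable: it counts gets and scanner
  // next() calls, and parks the server-side handler on a shared CountDownLatch so
  // tests can examine block reference counts while the reads are still in flight.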
  public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver {
    static final AtomicInteger countOfNext = new AtomicInteger(0);
    static final AtomicInteger countOfGets = new AtomicInteger(0);
    static final AtomicBoolean waitForGets = new AtomicBoolean(false);
    static final AtomicBoolean throwException = new AtomicBoolean(false);
    private static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(
        new CountDownLatch(0));

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
        InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
      slowdownCode(e, false);
      if (getLatch != null && getLatch.getCount() > 0) {
        try {
          getLatch.await();
        } catch (InterruptedException e1) {
        }
      }
      return hasMore;
    }

    @Override
    public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
        List<Cell> results) throws IOException {
      slowdownCode(e, true);
    }

    public static AtomicReference<CountDownLatch> getCdl() {
      return cdl;
    }

    private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e,
        boolean isGet) {
      CountDownLatch latch = getCdl().get();
      try {
        System.out.println(latch.getCount() + " is the count " + isGet);
        if (latch.getCount() > 0) {
          if (isGet) {
            countOfGets.incrementAndGet();
          } else {
            countOfNext.incrementAndGet();
          }
          LOG.info("Waiting for the counterCountDownLatch");
          latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
          if (latch.getCount() > 0) {
            throw new RuntimeException("Can't wait more");
          }
        }
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }
    }
  }
}