2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.junit
.Assert
.assertEquals
;
21 import static org
.junit
.Assert
.assertFalse
;
22 import static org
.junit
.Assert
.assertTrue
;
24 import java
.io
.IOException
;
25 import java
.util
.ArrayList
;
26 import java
.util
.Collection
;
27 import java
.util
.Iterator
;
28 import java
.util
.List
;
29 import java
.util
.Optional
;
30 import java
.util
.concurrent
.CountDownLatch
;
31 import java
.util
.concurrent
.TimeUnit
;
32 import java
.util
.concurrent
.atomic
.AtomicBoolean
;
33 import java
.util
.concurrent
.atomic
.AtomicInteger
;
34 import java
.util
.concurrent
.atomic
.AtomicLong
;
35 import java
.util
.concurrent
.atomic
.AtomicReference
;
36 import org
.apache
.hadoop
.conf
.Configuration
;
37 import org
.apache
.hadoop
.hbase
.Cell
;
38 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
39 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
40 import org
.apache
.hadoop
.hbase
.HConstants
;
41 import org
.apache
.hadoop
.hbase
.ServerName
;
42 import org
.apache
.hadoop
.hbase
.TableName
;
43 import org
.apache
.hadoop
.hbase
.coprocessor
.CoprocessorHost
;
44 import org
.apache
.hadoop
.hbase
.coprocessor
.MultiRowMutationEndpoint
;
45 import org
.apache
.hadoop
.hbase
.coprocessor
.ObserverContext
;
46 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessor
;
47 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessorEnvironment
;
48 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionObserver
;
49 import org
.apache
.hadoop
.hbase
.io
.hfile
.BlockCache
;
50 import org
.apache
.hadoop
.hbase
.io
.hfile
.BlockCacheKey
;
51 import org
.apache
.hadoop
.hbase
.io
.hfile
.CacheConfig
;
52 import org
.apache
.hadoop
.hbase
.io
.hfile
.CachedBlock
;
53 import org
.apache
.hadoop
.hbase
.io
.hfile
.CombinedBlockCache
;
54 import org
.apache
.hadoop
.hbase
.io
.hfile
.bucket
.BucketCache
;
55 import org
.apache
.hadoop
.hbase
.regionserver
.HRegion
;
56 import org
.apache
.hadoop
.hbase
.regionserver
.HStore
;
57 import org
.apache
.hadoop
.hbase
.regionserver
.InternalScanner
;
58 import org
.apache
.hadoop
.hbase
.regionserver
.RegionScanner
;
59 import org
.apache
.hadoop
.hbase
.regionserver
.ScannerContext
;
60 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
61 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
62 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
63 import org
.junit
.After
;
64 import org
.junit
.AfterClass
;
65 import org
.junit
.Before
;
66 import org
.junit
.BeforeClass
;
67 import org
.junit
.ClassRule
;
68 import org
.junit
.Rule
;
69 import org
.junit
.Test
;
70 import org
.junit
.experimental
.categories
.Category
;
71 import org
.junit
.rules
.TestName
;
72 import org
.slf4j
.Logger
;
73 import org
.slf4j
.LoggerFactory
;
75 @Category({ LargeTests
.class, ClientTests
.class })
76 @SuppressWarnings("deprecation")
77 public class TestBlockEvictionFromClient
{
// --- Shared fixtures for the block-eviction-from-client tests ---
// NOTE(review): this listing is a mangled extraction (statements split across lines,
// original file line numbers fused into the text, some lines dropped). Tokens are
// preserved byte-for-byte; only comments were added. Reconcile against upstream.
// JUnit class-level timeout/category rule required for every HBase test class.
80 public static final HBaseClassTestRule CLASS_RULE
=
81 HBaseClassTestRule
.forClass(TestBlockEvictionFromClient
.class);
83 private static final Logger LOG
= LoggerFactory
.getLogger(TestBlockEvictionFromClient
.class);
// Mini-cluster harness shared by all tests in this class.
84 protected final static HBaseTestingUtility TEST_UTIL
= new HBaseTestingUtility();
85 static byte[][] ROWS
= new byte[2][];
// Number of concurrent reader threads each test spawns (scans/gets).
86 private static int NO_OF_THREADS
= 3;
// Row keys, column family and qualifiers used throughout the tests.
87 private static byte[] ROW
= Bytes
.toBytes("testRow");
88 private static byte[] ROW1
= Bytes
.toBytes("testRow1");
89 private static byte[] ROW2
= Bytes
.toBytes("testRow2");
90 private static byte[] ROW3
= Bytes
.toBytes("testRow3");
91 private static byte[] FAMILY
= Bytes
.toBytes("testFamily");
92 private static byte[][] FAMILIES_1
= new byte[1][0];
93 private static byte[] QUALIFIER
= Bytes
.toBytes("testQualifier");
94 private static byte[] QUALIFIER2
= Bytes
.add(QUALIFIER
, QUALIFIER
);
// 1000-byte payload; data2 is its self-concatenation (2000 bytes) so a row
// spills across multiple 1024-byte HFile blocks.
95 private static byte[] data
= new byte[1000];
96 private static byte[] data2
= Bytes
.add(data
, data
);
// Mini-cluster region server count.
97 protected static int SLAVES
= 1;
// Latches coordinating the test thread with the CustomInnerRegionObserver
// coprocessor hooks (scan progress, get completion, compaction, exceptions).
98 private static CountDownLatch latch
;
99 private static CountDownLatch getLatch
;
100 private static CountDownLatch compactionLatch
;
101 private static CountDownLatch exceptionLatch
;
// JUnit rule exposing the current test method name (used to derive table names).
104 public TestName name
= new TestName();
107 * @throws java.lang.Exception
// One-time cluster setup: registers the MultiRowMutation endpoint, configures an
// off-heap bucket cache, disables client retries, then starts the mini cluster.
// NOTE(review): original lines appear dropped here (gaps at 111-112 and 117 in the
// embedded numbering — presumably ROWS[0]/ROWS[1] assignments and the @BeforeClass
// annotation). Confirm against the upstream file.
110 public static void setUpBeforeClass() throws Exception
{
113 Configuration conf
= TEST_UTIL
.getConfiguration();
// Load the coprocessor endpoint on every region.
114 conf
.setStrings(CoprocessorHost
.REGION_COPROCESSOR_CONF_KEY
,
115 MultiRowMutationEndpoint
.class.getName());
116 conf
.setBoolean("hbase.table.sanity.checks", true); // enable for below
118 conf
.setInt("hbase.regionserver.handler.count", 20);
// 400 MB off-heap bucket cache so evictions/ref-counting are exercised.
119 conf
.setInt("hbase.bucketcache.size", 400);
120 conf
.setStrings(HConstants
.BUCKET_CACHE_IOENGINE_KEY
, "offheap");
121 conf
.setFloat("hfile.block.cache.size", 0.2f
);
122 conf
.setFloat("hbase.regionserver.global.memstore.size", 0.1f
);
123 conf
.setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
, 0);// do not retry
// Short scanner timeout so stuck scanners fail the test quickly.
124 conf
.setInt(HConstants
.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD
, 5000);
125 FAMILIES_1
[0] = FAMILY
;
126 TEST_UTIL
.startMiniCluster(SLAVES
);
130 * @throws java.lang.Exception
// One-time teardown: stops the mini cluster started in setUpBeforeClass.
// NOTE(review): the @AfterClass annotation and closing brace are missing from this
// extraction — confirm against the upstream file.
133 public static void tearDownAfterClass() throws Exception
{
134 TEST_UTIL
.shutdownMiniCluster();
138 * @throws java.lang.Exception
// Per-test setup: resets the coprocessor's shared counters/flags so state from a
// previous test cannot leak into the next one.
// NOTE(review): the @Before annotation and closing brace are missing from this
// extraction — confirm against the upstream file.
141 public void setUp() throws Exception
{
142 CustomInnerRegionObserver
.waitForGets
.set(false);
143 CustomInnerRegionObserver
.countOfNext
.set(0);
144 CustomInnerRegionObserver
.countOfGets
.set(0);
148 * @throws java.lang.Exception
// Per-test teardown: drains every coordination latch (so blocked coprocessor hooks
// and worker threads can finish), clears the throw-exception flag, then disables
// and deletes every non-system table created by the test.
// NOTE(review): several original lines are dropped in this extraction (the while
// body, null-checks' closing braces, latch/getLatch resets, @After annotation) —
// confirm against the upstream file before treating this as complete.
151 public void tearDown() throws Exception
{
// Drain the main latch so any hook blocked on it proceeds.
153 while (latch
.getCount() > 0) {
157 if (getLatch
!= null) {
158 getLatch
.countDown();
160 if (compactionLatch
!= null) {
161 compactionLatch
.countDown();
163 if (exceptionLatch
!= null) {
164 exceptionLatch
.countDown();
// Null the latches so the next test allocates fresh ones.
168 compactionLatch
= null;
169 exceptionLatch
= null;
170 CustomInnerRegionObserver
.throwException
.set(false);
171 // Clean up the tables for every test case
172 TableName
[] listTableNames
= TEST_UTIL
.getAdmin().listTableNames();
173 for (TableName tableName
: listTableNames
) {
174 if (!tableName
.isSystemTable()) {
175 TEST_UTIL
.getAdmin().disableTable(tableName
);
176 TEST_UTIL
.getAdmin().deleteTable(tableName
);
// Verifies block-cache behavior while three parallel scans hold block references:
// writes two rows, runs concurrent scans, checks no premature eviction, then
// flushes and major-compacts and asserts the store-file count and cache state.
// NOTE(review): the embedded line numbering has gaps (e.g. 204-205, 210-213, 216,
// 219-220, 232, 234, 242) — table.put(...)/flush calls, thread joins and the @Test
// annotation appear to have been dropped from this extraction. Confirm upstream.
182 public void testBlockEvictionWithParallelScans() throws Exception
{
185 latch
= new CountDownLatch(1);
186 final TableName tableName
= TableName
.valueOf(name
.getMethodName());
187 // Create a table with block size as 1024
188 table
= TEST_UTIL
.createTable(tableName
, FAMILIES_1
, 1, 1024,
189 CustomInnerRegionObserver
.class.getName());
190 // get the block cache and region
191 RegionLocator locator
= TEST_UTIL
.getConnection().getRegionLocator(tableName
);
192 String regionName
= locator
.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
193 HRegion region
= TEST_UTIL
.getRSForFirstRegionInTable(tableName
)
194 .getRegion(regionName
);
195 HStore store
= region
.getStores().iterator().next();
196 CacheConfig cacheConf
= store
.getCacheConfig();
// Cache data blocks at write time and evict them when readers close.
197 cacheConf
.setCacheDataOnWrite(true);
198 cacheConf
.setEvictOnClose(true);
199 BlockCache cache
= cacheConf
.getBlockCache().get();
201 // insert data. 2 Rows are added
202 Put put
= new Put(ROW
);
203 put
.addColumn(FAMILY
, QUALIFIER
, data
);
206 put
.addColumn(FAMILY
, QUALIFIER
, data
);
208 assertTrue(Bytes
.equals(table
.get(new Get(ROW
)).value(), data
));
209 // data was in memstore so don't expect any changes
211 // Should create one Hfile with 2 blocks
214 // Create three sets of scan
215 ScanThread
[] scanThreads
= initiateScan(table
, false);
217 checkForBlockEviction(cache
, false, false);
218 for (ScanThread thread
: scanThreads
) {
221 // CustomInnerRegionObserver.sleepTime.set(0);
222 Iterator
<CachedBlock
> iterator
= cache
.iterator();
223 iterateBlockCache(cache
, iterator
);
224 // read the data and expect same blocks, one new hit, no misses
225 assertTrue(Bytes
.equals(table
.get(new Get(ROW
)).value(), data
));
226 iterator
= cache
.iterator();
227 iterateBlockCache(cache
, iterator
);
228 // Check how this miss is happening
229 // insert a second column, read the row, no new blocks, 3 new hits
230 byte[] QUALIFIER2
= Bytes
.add(QUALIFIER
, QUALIFIER
);
231 byte[] data2
= Bytes
.add(data
, data
);
233 put
.addColumn(FAMILY
, QUALIFIER2
, data2
);
235 Result r
= table
.get(new Get(ROW
));
236 assertTrue(Bytes
.equals(r
.getValue(FAMILY
, QUALIFIER
), data
));
237 assertTrue(Bytes
.equals(r
.getValue(FAMILY
, QUALIFIER2
), data2
));
238 iterator
= cache
.iterator();
239 iterateBlockCache(cache
, iterator
);
240 // flush, one new block
241 System
.out
.println("Flushing cache");
243 iterator
= cache
.iterator();
244 iterateBlockCache(cache
, iterator
);
245 // compact, net minus two blocks, two hits, no misses
246 System
.out
.println("Compacting");
247 assertEquals(2, store
.getStorefilesCount());
248 store
.triggerMajorCompaction();
249 region
.compact(true);
250 waitForStoreFileCount(store
, 1, 10000); // wait 10 seconds max
251 assertEquals(1, store
.getStorefilesCount());
252 iterator
= cache
.iterator();
253 iterateBlockCache(cache
, iterator
);
254 // read the row, this should be a cache miss because we don't cache data
255 // blocks on compaction
256 r
= table
.get(new Get(ROW
));
257 assertTrue(Bytes
.equals(r
.getValue(FAMILY
, QUALIFIER
), data
));
258 assertTrue(Bytes
.equals(r
.getValue(FAMILY
, QUALIFIER2
), data2
));
259 iterator
= cache
.iterator();
260 iterateBlockCache(cache
, iterator
);
// Runs concurrent gets and scans against the same table and checks that blocks
// pinned by in-flight readers are not evicted, and that gets/scans release their
// block references once they complete (gets on close(), scans at shipped()).
// NOTE(review): numbering gaps (270-271, 283, 290-292, 295, 305-306, 313-314,
// 320+) indicate dropped lines — the HRegion declaration that the bare
// 'TEST_UTIL...getRegion(regionName)' call at 284 should assign to, puts/flushes,
// thread joins and the @Test annotation. Confirm against upstream.
269 public void testParallelGetsAndScans() throws IOException
, InterruptedException
{
272 latch
= new CountDownLatch(2);
273 // Check if get() returns blocks on its close() itself
274 getLatch
= new CountDownLatch(1);
275 final TableName tableName
= TableName
.valueOf(name
.getMethodName());
276 // Create KV that will give you two blocks
277 // Create a table with block size as 1024
278 table
= TEST_UTIL
.createTable(tableName
, FAMILIES_1
, 1, 1024,
279 CustomInnerRegionObserver
.class.getName());
280 // get the block cache and region
281 RegionLocator locator
= TEST_UTIL
.getConnection().getRegionLocator(tableName
);
282 String regionName
= locator
.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
284 TEST_UTIL
.getRSForFirstRegionInTable(tableName
).getRegion(regionName
);
285 HStore store
= region
.getStores().iterator().next();
286 CacheConfig cacheConf
= store
.getCacheConfig();
287 cacheConf
.setCacheDataOnWrite(true);
288 cacheConf
.setEvictOnClose(true);
289 BlockCache cache
= cacheConf
.getBlockCache().get();
293 System
.out
.println("Flushing cache");
294 // Should create one Hfile with 2 blocks
296 // Create three sets of scan
297 CustomInnerRegionObserver
.waitForGets
.set(true);
298 ScanThread
[] scanThreads
= initiateScan(table
, false);
299 // Create three sets of gets
300 GetThread
[] getThreads
= initiateGet(table
, false, false);
301 checkForBlockEviction(cache
, false, false);
302 CustomInnerRegionObserver
.waitForGets
.set(false);
303 checkForBlockEviction(cache
, false, false);
304 for (GetThread thread
: getThreads
) {
307 // Verify whether the gets have returned the blocks that it had
308 CustomInnerRegionObserver
.waitForGets
.set(true);
309 // giving some time for the block to be decremented
310 checkForBlockEviction(cache
, true, false);
311 getLatch
.countDown();
312 for (ScanThread thread
: scanThreads
) {
315 System
.out
.println("Scans should have returned the bloks");
316 // Check with either true or false
317 CustomInnerRegionObserver
.waitForGets
.set(false);
318 // The scan should also have released the blocks by now
319 checkForBlockEviction(cache
, true, true);
// Spreads a row's cells across multiple HFiles (separate flushes per qualifier)
// and verifies that parallel gets pin and then release the blocks of every file.
// NOTE(review): numbering gaps (352-354, 356-357, 359, 361-363, 369, 372-373,
// 380+) indicate dropped table.put(...)/flush calls, thread joins and the @Test
// annotation; the bare call at 343 should assign to an HRegion variable. Confirm
// against upstream.
328 public void testGetWithCellsInDifferentFiles() throws IOException
, InterruptedException
{
331 latch
= new CountDownLatch(1);
332 // Check if get() returns blocks on its close() itself
333 getLatch
= new CountDownLatch(1);
334 final TableName tableName
= TableName
.valueOf(name
.getMethodName());
335 // Create KV that will give you two blocks
336 // Create a table with block size as 1024
337 table
= TEST_UTIL
.createTable(tableName
, FAMILIES_1
, 1, 1024,
338 CustomInnerRegionObserver
.class.getName());
339 // get the block cache and region
340 RegionLocator locator
= TEST_UTIL
.getConnection().getRegionLocator(tableName
);
341 String regionName
= locator
.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
343 TEST_UTIL
.getRSForFirstRegionInTable(tableName
).getRegion(regionName
);
344 HStore store
= region
.getStores().iterator().next();
345 CacheConfig cacheConf
= store
.getCacheConfig();
346 cacheConf
.setCacheDataOnWrite(true);
347 cacheConf
.setEvictOnClose(true);
348 BlockCache cache
= cacheConf
.getBlockCache().get();
350 Put put
= new Put(ROW
);
351 put
.addColumn(FAMILY
, QUALIFIER
, data
);
355 put
.addColumn(FAMILY
, QUALIFIER
, data
);
358 byte[] QUALIFIER2
= Bytes
.add(QUALIFIER
, QUALIFIER
);
360 put
.addColumn(FAMILY
, QUALIFIER2
, data2
);
364 System
.out
.println("Flushing cache");
365 // Should create one Hfile with 2 blocks
366 CustomInnerRegionObserver
.waitForGets
.set(true);
367 // Create three sets of gets
368 GetThread
[] getThreads
= initiateGet(table
, false, false);
// Release the coprocessor latch so the blocked gets proceed.
370 CustomInnerRegionObserver
.getCdl().get().countDown();
371 for (GetThread thread
: getThreads
) {
374 // Verify whether the gets have returned the blocks that it had
375 CustomInnerRegionObserver
.waitForGets
.set(true);
376 // giving some time for the block to be decremented
377 checkForBlockEviction(cache
, true, false);
378 getLatch
.countDown();
379 System
.out
.println("Gets should have returned the bloks");
// Gets with many columns (explicit column tracker path): writes 10+ qualifiers
// across several files, runs parallel gets, and walks the block cache asserting
// each referenced block's ref count equals NO_OF_THREADS and that exactly 10
// blocks carry references.
// NOTE(review): numbering gaps (391-392, 404, 409-411, 413-414, 416, 418-422,
// 424, 426-428, 434, 437, 446-449, 454-456, 462-463, 470+) indicate dropped
// puts/flushes, loop-closing braces, the refCount declaration, the
// noOfBlocksWithRef increment, thread joins and the @Test annotation; the bare
// call at 405 should assign to an HRegion. Confirm against upstream.
388 // TODO : check how block index works here
389 public void testGetsWithMultiColumnsAndExplicitTracker()
390 throws IOException
, InterruptedException
{
393 latch
= new CountDownLatch(1);
394 // Check if get() returns blocks on its close() itself
395 getLatch
= new CountDownLatch(1);
396 final TableName tableName
= TableName
.valueOf(name
.getMethodName());
397 // Create KV that will give you two blocks
398 // Create a table with block size as 1024
399 table
= TEST_UTIL
.createTable(tableName
, FAMILIES_1
, 1, 1024,
400 CustomInnerRegionObserver
.class.getName());
401 // get the block cache and region
402 RegionLocator locator
= TEST_UTIL
.getConnection().getRegionLocator(tableName
);
403 String regionName
= locator
.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
405 TEST_UTIL
.getRSForFirstRegionInTable(tableName
).getRegion(regionName
);
406 BlockCache cache
= setCacheProperties(region
);
407 Put put
= new Put(ROW
);
408 put
.addColumn(FAMILY
, QUALIFIER
, data
);
412 put
.addColumn(FAMILY
, QUALIFIER
, data
);
415 for (int i
= 1; i
< 10; i
++) {
417 put
.addColumn(FAMILY
, Bytes
.toBytes("testQualifier" + i
), data2
);
423 byte[] QUALIFIER2
= Bytes
.add(QUALIFIER
, QUALIFIER
);
425 put
.addColumn(FAMILY
, QUALIFIER2
, data2
);
429 System
.out
.println("Flushing cache");
430 // Should create one Hfile with 2 blocks
431 CustomInnerRegionObserver
.waitForGets
.set(true);
432 // Create three sets of gets
433 GetThread
[] getThreads
= initiateGet(table
, true, false);
435 Iterator
<CachedBlock
> iterator
= cache
.iterator();
436 boolean usedBlocksFound
= false;
438 int noOfBlocksWithRef
= 0;
439 while (iterator
.hasNext()) {
440 CachedBlock next
= iterator
.next();
441 BlockCacheKey cacheKey
= new BlockCacheKey(next
.getFilename(), next
.getOffset());
// Ref count lookup differs by cache implementation.
442 if (cache
instanceof BucketCache
) {
443 refCount
= ((BucketCache
) cache
).getRefCount(cacheKey
);
444 } else if (cache
instanceof CombinedBlockCache
) {
445 refCount
= ((CombinedBlockCache
) cache
).getRefCount(cacheKey
);
450 // Blocks will be with count 3
451 System
.out
.println("The refCount is " + refCount
);
452 assertEquals(NO_OF_THREADS
, refCount
);
453 usedBlocksFound
= true;
457 assertTrue(usedBlocksFound
);
458 // the number of blocks referred
459 assertEquals(10, noOfBlocksWithRef
);
460 CustomInnerRegionObserver
.getCdl().get().countDown();
461 for (GetThread thread
: getThreads
) {
464 // Verify whether the gets have returned the blocks that it had
465 CustomInnerRegionObserver
.waitForGets
.set(true);
466 // giving some time for the block to be decremented
467 checkForBlockEviction(cache
, true, false);
468 getLatch
.countDown();
469 System
.out
.println("Gets should have returned the bloks");
// Same ref-count verification as the multi-column test, but spread across ten
// column families; expects exactly 3 blocks to carry references while the
// parallel gets are pinned.
// NOTE(review): numbering gaps (479-480, 488, 491, 497, 500, 503-505, 507-508,
// 510, 512-517, 519, 521-523, 529, 532, 541-544, 549-551, 557-558, 565+)
// indicate dropped puts/flushes, braces, thread joins and the @Test annotation;
// the bare call at 498 should assign to an HRegion. Confirm against upstream.
478 public void testGetWithMultipleColumnFamilies() throws IOException
, InterruptedException
{
481 latch
= new CountDownLatch(1);
482 // Check if get() returns blocks on its close() itself
483 getLatch
= new CountDownLatch(1);
484 final TableName tableName
= TableName
.valueOf(name
.getMethodName());
485 // Create KV that will give you two blocks
486 // Create a table with block size as 1024
487 byte[][] fams
= new byte[10][];
489 for (int i
= 1; i
< 10; i
++) {
490 fams
[i
] = (Bytes
.toBytes("testFamily" + i
));
492 table
= TEST_UTIL
.createTable(tableName
, fams
, 1, 1024,
493 CustomInnerRegionObserver
.class.getName());
494 // get the block cache and region
495 RegionLocator locator
= TEST_UTIL
.getConnection().getRegionLocator(tableName
);
496 String regionName
= locator
.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
498 TEST_UTIL
.getRSForFirstRegionInTable(tableName
).getRegion(regionName
);
499 BlockCache cache
= setCacheProperties(region
);
501 Put put
= new Put(ROW
);
502 put
.addColumn(FAMILY
, QUALIFIER
, data
);
506 put
.addColumn(FAMILY
, QUALIFIER
, data
);
509 for (int i
= 1; i
< 10; i
++) {
511 put
.addColumn(Bytes
.toBytes("testFamily" + i
), Bytes
.toBytes("testQualifier" + i
), data2
);
518 byte[] QUALIFIER2
= Bytes
.add(QUALIFIER
, QUALIFIER
);
520 put
.addColumn(FAMILY
, QUALIFIER2
, data2
);
524 System
.out
.println("Flushing cache");
525 // Should create one Hfile with 2 blocks
526 CustomInnerRegionObserver
.waitForGets
.set(true);
527 // Create three sets of gets
528 GetThread
[] getThreads
= initiateGet(table
, true, true);
530 Iterator
<CachedBlock
> iterator
= cache
.iterator();
531 boolean usedBlocksFound
= false;
533 int noOfBlocksWithRef
= 0;
534 while (iterator
.hasNext()) {
535 CachedBlock next
= iterator
.next();
536 BlockCacheKey cacheKey
= new BlockCacheKey(next
.getFilename(), next
.getOffset());
// Ref count lookup differs by cache implementation.
537 if (cache
instanceof BucketCache
) {
538 refCount
= ((BucketCache
) cache
).getRefCount(cacheKey
);
539 } else if (cache
instanceof CombinedBlockCache
) {
540 refCount
= ((CombinedBlockCache
) cache
).getRefCount(cacheKey
);
545 // Blocks will be with count 3
546 System
.out
.println("The refCount is " + refCount
);
547 assertEquals(NO_OF_THREADS
, refCount
);
548 usedBlocksFound
= true;
552 assertTrue(usedBlocksFound
);
553 // the number of blocks referred
554 assertEquals(3, noOfBlocksWithRef
);
555 CustomInnerRegionObserver
.getCdl().get().countDown();
556 for (GetThread thread
: getThreads
) {
559 // Verify whether the gets have returned the blocks that it had
560 CustomInnerRegionObserver
.waitForGets
.set(true);
561 // giving some time for the block to be decremented
562 checkForBlockEviction(cache
, true, false);
563 getLatch
.countDown();
564 System
.out
.println("Gets should have returned the bloks");
// After a region split, the HalfStoreFileReader's first/last-key scanners must be
// closed so their blocks are released; compacts the daughter regions and then
// asserts no block in the cache still carries a reference.
// NOTE(review): numbering gaps (574-575, 581, 587, 590-592, 594-595, 597,
// 599-600, 602-603, 606, 609, 614, 616, 622+) indicate dropped puts/flushes,
// Thread.sleep in the wait loop, serverItr guards, the @Test annotation and the
// closing brace; the bare call at 582 should assign to an HRegion. Confirm
// against upstream.
573 public void testBlockRefCountAfterSplits() throws IOException
, InterruptedException
{
576 final TableName tableName
= TableName
.valueOf(name
.getMethodName());
577 table
= TEST_UTIL
.createTable(tableName
, FAMILIES_1
, 1, 1024);
578 // get the block cache and region
579 RegionLocator locator
= TEST_UTIL
.getConnection().getRegionLocator(tableName
);
580 String regionName
= locator
.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
582 TEST_UTIL
.getRSForFirstRegionInTable(tableName
).getRegion(regionName
);
583 HStore store
= region
.getStores().iterator().next();
584 CacheConfig cacheConf
= store
.getCacheConfig();
585 cacheConf
.setEvictOnClose(true);
586 BlockCache cache
= cacheConf
.getBlockCache().get();
588 Put put
= new Put(ROW
);
589 put
.addColumn(FAMILY
, QUALIFIER
, data
);
593 put
.addColumn(FAMILY
, QUALIFIER
, data
);
596 byte[] QUALIFIER2
= Bytes
.add(QUALIFIER
, QUALIFIER
);
598 put
.addColumn(FAMILY
, QUALIFIER2
, data2
);
601 put
.addColumn(FAMILY
, QUALIFIER2
, data2
);
604 LOG
.info("About to SPLIT on " + Bytes
.toString(ROW1
));
605 TEST_UTIL
.getAdmin().split(tableName
, ROW1
);
607 Collection
<ServerName
> regionServers
= TEST_UTIL
.getAdmin().getRegionServers();
608 Iterator
<ServerName
> serverItr
= regionServers
.iterator();
610 ServerName rs
= serverItr
.next();
611 List
<RegionInfo
> onlineRegions
= TEST_UTIL
.getAdmin().getRegions(rs
);
// Poll until the split produces two daughter regions on this server.
612 while (onlineRegions
.size() != 2) {
613 onlineRegions
= TEST_UTIL
.getAdmin().getRegions(rs
);
615 LOG
.info("Waiting on SPLIT to complete...");
617 region
.compact(true);
618 Iterator
<CachedBlock
> iterator
= cache
.iterator();
619 // Though the split had created the HalfStorefileReader - the firstkey and lastkey scanners
620 // should be closed inorder to return those blocks
621 iterateBlockCache(cache
, iterator
);
// Multi-get (batch Get) variant: parallel MultiGetThreads pin blocks; walks the
// cache expecting each referenced block's count to equal NO_OF_THREADS, then
// releases the coprocessor latch twice (two rows) and verifies release.
// NOTE(review): numbering gaps (631-632, 644, 651, 654-656, 658-659, 661,
// 663-665, 671-672, 682-685, 688-689, 694-695, 702+) indicate dropped
// puts/flushes, the refCount declaration, zero-refcount else-branch, thread
// joins and the @Test annotation; the bare call at 645 should assign to an
// HRegion. Confirm against upstream.
630 public void testMultiGets() throws IOException
, InterruptedException
{
633 latch
= new CountDownLatch(2);
634 // Check if get() returns blocks on its close() itself
635 getLatch
= new CountDownLatch(1);
636 final TableName tableName
= TableName
.valueOf(name
.getMethodName());
637 // Create KV that will give you two blocks
638 // Create a table with block size as 1024
639 table
= TEST_UTIL
.createTable(tableName
, FAMILIES_1
, 1, 1024,
640 CustomInnerRegionObserver
.class.getName());
641 // get the block cache and region
642 RegionLocator locator
= TEST_UTIL
.getConnection().getRegionLocator(tableName
);
643 String regionName
= locator
.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
645 TEST_UTIL
.getRSForFirstRegionInTable(tableName
).getRegion(regionName
);
646 HStore store
= region
.getStores().iterator().next();
647 CacheConfig cacheConf
= store
.getCacheConfig();
648 cacheConf
.setCacheDataOnWrite(true);
649 cacheConf
.setEvictOnClose(true);
650 BlockCache cache
= cacheConf
.getBlockCache().get();
652 Put put
= new Put(ROW
);
653 put
.addColumn(FAMILY
, QUALIFIER
, data
);
657 put
.addColumn(FAMILY
, QUALIFIER
, data
);
660 byte[] QUALIFIER2
= Bytes
.add(QUALIFIER
, QUALIFIER
);
662 put
.addColumn(FAMILY
, QUALIFIER2
, data2
);
666 System
.out
.println("Flushing cache");
667 // Should create one Hfile with 2 blocks
668 CustomInnerRegionObserver
.waitForGets
.set(true);
669 // Create three sets of gets
670 MultiGetThread
[] getThreads
= initiateMultiGet(table
);
673 Iterator
<CachedBlock
> iterator
= cache
.iterator();
674 boolean foundNonZeroBlock
= false;
675 while (iterator
.hasNext()) {
676 CachedBlock next
= iterator
.next();
677 BlockCacheKey cacheKey
= new BlockCacheKey(next
.getFilename(), next
.getOffset());
// Ref count lookup differs by cache implementation.
678 if (cache
instanceof BucketCache
) {
679 refCount
= ((BucketCache
) cache
).getRefCount(cacheKey
);
680 } else if (cache
instanceof CombinedBlockCache
) {
681 refCount
= ((CombinedBlockCache
) cache
).getRefCount(cacheKey
);
686 assertEquals(NO_OF_THREADS
, refCount
);
687 foundNonZeroBlock
= true;
690 assertTrue("Should have found nonzero ref count block",foundNonZeroBlock
);
// Two countdowns: the multi-get touches two rows, each gated by the latch.
691 CustomInnerRegionObserver
.getCdl().get().countDown();
692 CustomInnerRegionObserver
.getCdl().get().countDown();
693 for (MultiGetThread thread
: getThreads
) {
696 // Verify whether the gets have returned the blocks that it had
697 CustomInnerRegionObserver
.waitForGets
.set(true);
698 // giving some time for the block to be decremented
699 iterateBlockCache(cache
, iterator
);
700 getLatch
.countDown();
701 System
.out
.println("Gets should have returned the bloks");
// Scan across ten column families: parallel scans should pin blocks from every
// family's store file (expects 12 blocks with references, each at NO_OF_THREADS),
// then release them once the scans complete.
// NOTE(review): numbering gaps (710-711, 718, 721, 727, 730, 733-735, 737-738,
// 740, 742-747, 749, 751-753, 758, 761, 770-773, 778-780, 786-787, 790+)
// indicate dropped puts/flushes, braces, the refCount declaration, thread joins
// and the @Test annotation; the bare call at 728 should assign to an HRegion.
// Confirm against upstream.
709 public void testScanWithMultipleColumnFamilies() throws IOException
, InterruptedException
{
712 latch
= new CountDownLatch(1);
713 // Check if get() returns blocks on its close() itself
714 final TableName tableName
= TableName
.valueOf(name
.getMethodName());
715 // Create KV that will give you two blocks
716 // Create a table with block size as 1024
717 byte[][] fams
= new byte[10][];
719 for (int i
= 1; i
< 10; i
++) {
720 fams
[i
] = (Bytes
.toBytes("testFamily" + i
));
722 table
= TEST_UTIL
.createTable(tableName
, fams
, 1, 1024,
723 CustomInnerRegionObserver
.class.getName());
724 // get the block cache and region
725 RegionLocator locator
= TEST_UTIL
.getConnection().getRegionLocator(tableName
);
726 String regionName
= locator
.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
728 TEST_UTIL
.getRSForFirstRegionInTable(tableName
).getRegion(regionName
);
729 BlockCache cache
= setCacheProperties(region
);
731 Put put
= new Put(ROW
);
732 put
.addColumn(FAMILY
, QUALIFIER
, data
);
736 put
.addColumn(FAMILY
, QUALIFIER
, data
);
739 for (int i
= 1; i
< 10; i
++) {
741 put
.addColumn(Bytes
.toBytes("testFamily" + i
), Bytes
.toBytes("testQualifier" + i
), data2
);
748 byte[] QUALIFIER2
= Bytes
.add(QUALIFIER
, QUALIFIER
);
750 put
.addColumn(FAMILY
, QUALIFIER2
, data2
);
754 System
.out
.println("Flushing cache");
755 // Should create one Hfile with 2 blocks
756 // Create three sets of gets
757 ScanThread
[] scanThreads
= initiateScan(table
, true);
759 Iterator
<CachedBlock
> iterator
= cache
.iterator();
760 boolean usedBlocksFound
= false;
762 int noOfBlocksWithRef
= 0;
763 while (iterator
.hasNext()) {
764 CachedBlock next
= iterator
.next();
765 BlockCacheKey cacheKey
= new BlockCacheKey(next
.getFilename(), next
.getOffset());
// Ref count lookup differs by cache implementation.
766 if (cache
instanceof BucketCache
) {
767 refCount
= ((BucketCache
) cache
).getRefCount(cacheKey
);
768 } else if (cache
instanceof CombinedBlockCache
) {
769 refCount
= ((CombinedBlockCache
) cache
).getRefCount(cacheKey
);
774 // Blocks will be with count 3
775 System
.out
.println("The refCount is " + refCount
);
776 assertEquals(NO_OF_THREADS
, refCount
);
777 usedBlocksFound
= true;
781 assertTrue(usedBlocksFound
);
782 // the number of blocks referred
783 assertEquals(12, noOfBlocksWithRef
);
784 CustomInnerRegionObserver
.getCdl().get().countDown();
785 for (ScanThread thread
: scanThreads
) {
788 // giving some time for the block to be decremented
789 checkForBlockEviction(cache
, true, false);
// Helper: enables cache-on-write and evict-on-close for every store of the region
// and returns the (shared) block cache obtained from the last store's CacheConfig.
// NOTE(review): the trailing 'return cache;' and closing braces (original lines
// 807-811) are missing from this extraction — confirm against upstream.
797 private BlockCache
setCacheProperties(HRegion region
) {
798 Iterator
<HStore
> strItr
= region
.getStores().iterator();
799 BlockCache cache
= null;
800 while (strItr
.hasNext()) {
801 HStore store
= strItr
.next();
802 CacheConfig cacheConf
= store
.getCacheConfig();
803 cacheConf
.setCacheDataOnWrite(true);
804 cacheConf
.setEvictOnClose(true);
806 cache
= cacheConf
.getBlockCache().get();
// Same gets-plus-scans scenario but with CustomInnerRegionObserverWrapper, which
// wraps the RegionScanner — verifies block release still happens when the scanner
// is delegated through a wrapper.
// NOTE(review): numbering gaps (814-815, 827, 834, 836-837, 840, 848, 851,
// 857-858, 861+) indicate dropped puts/flushes, comment text, thread joins and
// the @Test annotation; the bare call at 828 should assign to an HRegion.
// Confirm against upstream.
812 public void testParallelGetsAndScanWithWrappedRegionScanner() throws IOException
,
813 InterruptedException
{
816 latch
= new CountDownLatch(2);
817 // Check if get() returns blocks on its close() itself
818 getLatch
= new CountDownLatch(1);
819 final TableName tableName
= TableName
.valueOf(name
.getMethodName());
820 // Create KV that will give you two blocks
821 // Create a table with block size as 1024
822 table
= TEST_UTIL
.createTable(tableName
, FAMILIES_1
, 1, 1024,
823 CustomInnerRegionObserverWrapper
.class.getName());
824 // get the block cache and region
825 RegionLocator locator
= TEST_UTIL
.getConnection().getRegionLocator(tableName
);
826 String regionName
= locator
.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
828 TEST_UTIL
.getRSForFirstRegionInTable(tableName
).getRegion(regionName
);
829 HStore store
= region
.getStores().iterator().next();
830 CacheConfig cacheConf
= store
.getCacheConfig();
831 cacheConf
.setCacheDataOnWrite(true);
832 cacheConf
.setEvictOnClose(true);
833 BlockCache cache
= cacheConf
.getBlockCache().get();
835 // insert data. 2 Rows are added
838 System
.out
.println("Flushing cache");
839 // Should create one Hfile with 2 blocks
841 // CustomInnerRegionObserver.sleepTime.set(5000);
842 // Create three sets of scan
843 CustomInnerRegionObserver
.waitForGets
.set(true);
844 ScanThread
[] scanThreads
= initiateScan(table
, false);
845 // Create three sets of gets
846 GetThread
[] getThreads
= initiateGet(table
, false, false);
847 // The block would have been decremented for the scan case as it was
849 // before even the postNext hook gets executed.
850 // giving some time for the block to be decremented
852 CustomInnerRegionObserver
.waitForGets
.set(false);
853 checkForBlockEviction(cache
, false, false);
854 // countdown the latch
855 CustomInnerRegionObserver
.getCdl().get().countDown();
856 for (GetThread thread
: getThreads
) {
859 getLatch
.countDown();
860 for (ScanThread thread
: scanThreads
) {
// Forward-scan variant: delegates to testScanWithCompactionInternals with
// reversed=false, using the test method name as the table name.
// NOTE(review): the @Test annotation and closing brace are missing from this
// extraction — confirm against upstream.
871 public void testScanWithCompaction() throws IOException
, InterruptedException
{
872 testScanWithCompactionInternals(name
.getMethodName(), false);
// Reverse-scan variant: delegates to testScanWithCompactionInternals with
// reversed=true, using the test method name as the table name.
// NOTE(review): the @Test annotation and closing brace are missing from this
// extraction — confirm against upstream.
876 public void testReverseScanWithCompaction() throws IOException
, InterruptedException
{
877 testScanWithCompactionInternals(name
.getMethodName(), true);
// Core scenario shared by the (reverse-)scan-with-compaction tests: start three
// scans that pin blocks, major-compact while they are in flight, assert the old
// files' blocks stay referenced (count == NO_OF_THREADS) until the scans finish,
// then verify everything is evicted and a fresh get still returns correct data.
// NOTE(review): numbering gaps (882-883, 893, 900, 904-905, 907, 910, 912, 917,
// 919, 922, 927, 937-940, 944-945, 964-967, 971-972, 976, 978-979, 989+)
// indicate dropped puts/flushes, the refCount declaration, zero-refcount
// branches, thread joins and the closing brace; the bare call at 894 should
// assign to an HRegion. Confirm against upstream.
880 private void testScanWithCompactionInternals(String tableNameStr
, boolean reversed
)
881 throws IOException
, InterruptedException
{
884 latch
= new CountDownLatch(1);
885 compactionLatch
= new CountDownLatch(1);
886 TableName tableName
= TableName
.valueOf(tableNameStr
);
887 // Create a table with block size as 1024
888 table
= TEST_UTIL
.createTable(tableName
, FAMILIES_1
, 1, 1024,
889 CustomInnerRegionObserverWrapper
.class.getName());
890 // get the block cache and region
891 RegionLocator locator
= TEST_UTIL
.getConnection().getRegionLocator(tableName
);
892 String regionName
= locator
.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
894 TEST_UTIL
.getRSForFirstRegionInTable(tableName
).getRegion(regionName
);
895 HStore store
= region
.getStores().iterator().next();
896 CacheConfig cacheConf
= store
.getCacheConfig();
897 cacheConf
.setCacheDataOnWrite(true);
898 cacheConf
.setEvictOnClose(true);
899 BlockCache cache
= cacheConf
.getBlockCache().get();
901 // insert data. 2 Rows are added
902 Put put
= new Put(ROW
);
903 put
.addColumn(FAMILY
, QUALIFIER
, data
);
906 put
.addColumn(FAMILY
, QUALIFIER
, data
);
908 assertTrue(Bytes
.equals(table
.get(new Get(ROW
)).value(), data
));
909 // Should create one Hfile with 2 blocks
911 // read the data and expect same blocks, one new hit, no misses
913 // Check how this miss is happening
914 // insert a second column, read the row, no new blocks, 3 new hits
915 byte[] QUALIFIER2
= Bytes
.add(QUALIFIER
, QUALIFIER
);
916 byte[] data2
= Bytes
.add(data
, data
);
918 put
.addColumn(FAMILY
, QUALIFIER2
, data2
);
920 // flush, one new block
921 System
.out
.println("Flushing cache");
923 Iterator
<CachedBlock
> iterator
= cache
.iterator();
924 iterateBlockCache(cache
, iterator
);
925 // Create three sets of scan
926 ScanThread
[] scanThreads
= initiateScan(table
, reversed
);
928 iterator
= cache
.iterator();
929 boolean usedBlocksFound
= false;
930 while (iterator
.hasNext()) {
931 CachedBlock next
= iterator
.next();
932 BlockCacheKey cacheKey
= new BlockCacheKey(next
.getFilename(), next
.getOffset());
// Ref count lookup differs by cache implementation.
933 if (cache
instanceof BucketCache
) {
934 refCount
= ((BucketCache
) cache
).getRefCount(cacheKey
);
935 } else if (cache
instanceof CombinedBlockCache
) {
936 refCount
= ((CombinedBlockCache
) cache
).getRefCount(cacheKey
);
941 // Blocks will be with count 3
942 assertEquals(NO_OF_THREADS
, refCount
);
943 usedBlocksFound
= true;
946 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound
);
947 usedBlocksFound
= false;
948 System
.out
.println("Compacting");
949 assertEquals(2, store
.getStorefilesCount());
950 store
.triggerMajorCompaction();
951 region
.compact(true);
952 waitForStoreFileCount(store
, 1, 10000); // wait 10 seconds max
953 assertEquals(1, store
.getStorefilesCount());
954 // Even after compaction is done we will have some blocks that cannot
955 // be evicted this is because the scan is still referencing them
956 iterator
= cache
.iterator();
957 while (iterator
.hasNext()) {
958 CachedBlock next
= iterator
.next();
959 BlockCacheKey cacheKey
= new BlockCacheKey(next
.getFilename(), next
.getOffset());
960 if (cache
instanceof BucketCache
) {
961 refCount
= ((BucketCache
) cache
).getRefCount(cacheKey
);
962 } else if (cache
instanceof CombinedBlockCache
) {
963 refCount
= ((CombinedBlockCache
) cache
).getRefCount(cacheKey
);
968 // Blocks will be with count 3 as they are not yet cleared
969 assertEquals(NO_OF_THREADS
, refCount
);
970 usedBlocksFound
= true;
973 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound
);
974 // Should not throw exception
975 compactionLatch
.countDown();
977 for (ScanThread thread
: scanThreads
) {
980 // by this time all blocks should have been evicted
981 iterator
= cache
.iterator();
982 iterateBlockCache(cache
, iterator
);
983 Result r
= table
.get(new Get(ROW
));
984 assertTrue(Bytes
.equals(r
.getValue(FAMILY
, QUALIFIER
), data
));
985 assertTrue(Bytes
.equals(r
.getValue(FAMILY
, QUALIFIER2
), data2
));
986 // The gets would be working on new blocks
987 iterator
= cache
.iterator();
988 iterateBlockCache(cache
, iterator
);
// Test: blocks pinned by in-flight scans must survive a flush AND a major compaction
// (HBASE-13082 scenario), and only become evictable once the scans finish.
// Flow visible below: create table with CustomInnerRegionObserverWrapper -> write rows ->
// start NO_OF_THREADS scans (held open via compactionLatch) -> verify every cached block
// has refCount == NO_OF_THREADS -> flush + major-compact down to 1 storefile -> blocks are
// STILL pinned -> release compactionLatch -> verify all refCounts drop to 0 via
// iterateBlockCache, then re-read the rows.
// NOTE(review): this chunk is a damaged extraction — original source line numbers
// (997, 998, ...) are fused into the text and the gaps in that numbering (e.g. 1022,
// 1025, 1028, 1069, 1105-1106) show that statements such as table.put(put),
// region.flush(true), thread.join() and closing braces were dropped. Restore the
// missing lines from the upstream file before attempting to compile.
997 public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush()
998 throws IOException
, InterruptedException
{
999 // do flush and scan in parallel
// latch gates the coprocessor hooks; compactionLatch holds CustomScanner.nextRaw open
// across the compaction (see CustomScanner below).
1002 latch
= new CountDownLatch(1);
1003 compactionLatch
= new CountDownLatch(1);
1004 final TableName tableName
= TableName
.valueOf(name
.getMethodName());
1005 // Create a table with block size as 1024
1006 table
= TEST_UTIL
.createTable(tableName
, FAMILIES_1
, 1, 1024,
1007 CustomInnerRegionObserverWrapper
.class.getName());
1008 // get the block cache and region
1009 RegionLocator locator
= TEST_UTIL
.getConnection().getRegionLocator(tableName
);
1010 String regionName
= locator
.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
1012 TEST_UTIL
.getRSForFirstRegionInTable(tableName
).getRegion(regionName
);
1013 HStore store
= region
.getStores().iterator().next();
1014 CacheConfig cacheConf
= store
.getCacheConfig();
// Cache data blocks as they are written so the freshly flushed file is in the cache.
1015 cacheConf
.setCacheDataOnWrite(true);
1016 cacheConf
.setEvictOnClose(true);
1017 BlockCache cache
= cacheConf
.getBlockCache().get();
1019 // insert data. 2 Rows are added
1020 Put put
= new Put(ROW
);
1021 put
.addColumn(FAMILY
, QUALIFIER
, data
);
1023 put
= new Put(ROW1
);
1024 put
.addColumn(FAMILY
, QUALIFIER
, data
);
1026 assertTrue(Bytes
.equals(table
.get(new Get(ROW
)).value(), data
));
1027 // Should create one Hfile with 2 blocks
1029 // read the data and expect same blocks, one new hit, no misses
1031 // Check how this miss is happening
1032 // insert a second column, read the row, no new blocks, 3 new hits
1033 byte[] QUALIFIER2
= Bytes
.add(QUALIFIER
, QUALIFIER
);
1034 byte[] data2
= Bytes
.add(data
, data
);
1036 put
.addColumn(FAMILY
, QUALIFIER2
, data2
);
1038 // flush, one new block
1039 System
.out
.println("Flushing cache");
// Before any scan starts, every cached block must have refCount == 0.
1041 Iterator
<CachedBlock
> iterator
= cache
.iterator();
1042 iterateBlockCache(cache
, iterator
);
1043 // Create three sets of scan
1044 ScanThread
[] scanThreads
= initiateScan(table
, false);
1046 iterator
= cache
.iterator();
1047 boolean usedBlocksFound
= false;
// While the scans are parked on the latch, each block they touched is pinned once per
// scan thread, so a non-zero refCount must equal NO_OF_THREADS exactly.
1048 while (iterator
.hasNext()) {
1049 CachedBlock next
= iterator
.next();
1050 BlockCacheKey cacheKey
= new BlockCacheKey(next
.getFilename(), next
.getOffset());
1051 if (cache
instanceof BucketCache
) {
1052 refCount
= ((BucketCache
) cache
).getRefCount(cacheKey
);
1053 } else if (cache
instanceof CombinedBlockCache
) {
1054 refCount
= ((CombinedBlockCache
) cache
).getRefCount(cacheKey
);
1058 if (refCount
!= 0) {
1059 // Blocks will be with count 3
1060 assertEquals(NO_OF_THREADS
, refCount
);
1061 usedBlocksFound
= true;
1064 // Make a put and do a flush
1065 QUALIFIER2
= Bytes
.add(QUALIFIER
, QUALIFIER
);
1066 data2
= Bytes
.add(data
, data
);
1067 put
= new Put(ROW1
);
1068 put
.addColumn(FAMILY
, QUALIFIER2
, data2
);
1070 // flush, one new block
1071 System
.out
.println("Flushing cache");
1073 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound
);
1074 usedBlocksFound
= false;
1075 System
.out
.println("Compacting");
// Two flushes above plus the initial file -> 3 storefiles before major compaction.
1076 assertEquals(3, store
.getStorefilesCount());
1077 store
.triggerMajorCompaction();
1078 region
.compact(true);
1079 waitForStoreFileCount(store
, 1, 10000); // wait 10 seconds max
1080 assertEquals(1, store
.getStorefilesCount());
1081 // Even after compaction is done we will have some blocks that cannot
1082 // be evicted this is because the scan is still referencing them
1083 iterator
= cache
.iterator();
1084 while (iterator
.hasNext()) {
1085 CachedBlock next
= iterator
.next();
1086 BlockCacheKey cacheKey
= new BlockCacheKey(next
.getFilename(), next
.getOffset());
1087 if (cache
instanceof BucketCache
) {
1088 refCount
= ((BucketCache
) cache
).getRefCount(cacheKey
);
1089 } else if (cache
instanceof CombinedBlockCache
) {
1090 refCount
= ((CombinedBlockCache
) cache
).getRefCount(cacheKey
);
1094 if (refCount
!= 0) {
1095 // Blocks will be with count 3 as they are not yet cleared
1096 assertEquals(NO_OF_THREADS
, refCount
);
1097 usedBlocksFound
= true;
1100 assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound
);
1101 // Should not throw exception
// Unblock CustomScanner.nextRaw so the parked scan threads can complete.
1102 compactionLatch
.countDown();
1104 for (ScanThread thread
: scanThreads
) {
1107 // by this time all blocks should have been evicted
1108 iterator
= cache
.iterator();
1109 // Since a flush and compaction happened after a scan started
1110 // we need to ensure that all the original blocks of the compacted file
1112 iterateBlockCache(cache
, iterator
);
1113 Result r
= table
.get(new Get(ROW
));
1114 assertTrue(Bytes
.equals(r
.getValue(FAMILY
, QUALIFIER
), data
));
1115 assertTrue(Bytes
.equals(r
.getValue(FAMILY
, QUALIFIER2
), data2
));
1116 // The gets would be working on new blocks
1117 iterator
= cache
.iterator();
1118 iterateBlockCache(cache
, iterator
);
// presumably a try/finally that closes the table — the finally body is in the
// dropped lines; verify against upstream.
1120 if (table
!= null) {
// Test: when a scan aborts with an exception thrown from the coprocessor scanner
// (CustomInnerRegionObserver.throwException), the blocks it had pinned must be
// released. First pass over the cache (scans parked): pinned blocks have
// refCount == NO_OF_THREADS. After the exception is released and the threads finish,
// the second pass must find NO pinned blocks (assertFalse + final refCount == 0,
// per the HBASE-16604 note below).
// NOTE(review): damaged extraction — fused original line numbers and numbering gaps
// (e.g. 1149-1150, 1153, 1158, 1161, 1186-1187) show that inserts, flush, sleep and
// thread.join() statements plus closing braces were dropped. Restore from upstream
// before compiling.
1128 public void testScanWithException() throws IOException
, InterruptedException
{
1131 latch
= new CountDownLatch(1);
// exceptionLatch is what CustomScanner.nextRaw awaits before throwing (see below).
1132 exceptionLatch
= new CountDownLatch(1);
1133 final TableName tableName
= TableName
.valueOf(name
.getMethodName());
1134 // Create KV that will give you two blocks
1135 // Create a table with block size as 1024
1136 table
= TEST_UTIL
.createTable(tableName
, FAMILIES_1
, 1, 1024,
1137 CustomInnerRegionObserverWrapper
.class.getName());
1138 // get the block cache and region
1139 RegionLocator locator
= TEST_UTIL
.getConnection().getRegionLocator(tableName
);
1140 String regionName
= locator
.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
1142 TEST_UTIL
.getRSForFirstRegionInTable(tableName
).getRegion(regionName
);
1143 HStore store
= region
.getStores().iterator().next();
1144 CacheConfig cacheConf
= store
.getCacheConfig();
1145 cacheConf
.setCacheDataOnWrite(true);
1146 cacheConf
.setEvictOnClose(true);
1147 BlockCache cache
= cacheConf
.getBlockCache().get();
1148 // insert data. 2 Rows are added
1151 System
.out
.println("Flushing cache");
1152 // Should create one Hfile with 2 blocks
1154 // CustomInnerRegionObserver.sleepTime.set(5000);
// Arm the coprocessor so the scanner throws once exceptionLatch is released.
1155 CustomInnerRegionObserver
.throwException
.set(true);
1156 ScanThread
[] scanThreads
= initiateScan(table
, false);
1157 // The block would have been decremented for the scan case as it was
1159 // before even the postNext hook gets executed.
1160 // giving some time for the block to be decremented
1162 Iterator
<CachedBlock
> iterator
= cache
.iterator();
1163 boolean usedBlocksFound
= false;
1165 while (iterator
.hasNext()) {
1166 CachedBlock next
= iterator
.next();
1167 BlockCacheKey cacheKey
= new BlockCacheKey(next
.getFilename(), next
.getOffset());
1168 if (cache
instanceof BucketCache
) {
1169 refCount
= ((BucketCache
) cache
).getRefCount(cacheKey
);
1170 } else if (cache
instanceof CombinedBlockCache
) {
1171 refCount
= ((CombinedBlockCache
) cache
).getRefCount(cacheKey
);
1175 if (refCount
!= 0) {
1176 // Blocks will be with count 3
1177 assertEquals(NO_OF_THREADS
, refCount
);
1178 usedBlocksFound
= true;
1181 assertTrue(usedBlocksFound
);
// Let the scanner threads hit the IOException, then release the coprocessor latch.
1182 exceptionLatch
.countDown();
1183 // countdown the latch
1184 CustomInnerRegionObserver
.getCdl().get().countDown();
1185 for (ScanThread thread
: scanThreads
) {
1188 iterator
= cache
.iterator();
1189 usedBlocksFound
= false;
// Second pass: after the aborted scans are done, nothing may still be pinned.
1191 while (iterator
.hasNext()) {
1192 CachedBlock next
= iterator
.next();
1193 BlockCacheKey cacheKey
= new BlockCacheKey(next
.getFilename(), next
.getOffset());
1194 if (cache
instanceof BucketCache
) {
1195 refCount
= ((BucketCache
) cache
).getRefCount(cacheKey
);
1196 } else if (cache
instanceof CombinedBlockCache
) {
1197 refCount
= ((CombinedBlockCache
) cache
).getRefCount(cacheKey
);
1201 if (refCount
!= 0) {
1202 // Blocks will be with count 3
1203 assertEquals(NO_OF_THREADS
, refCount
);
1204 usedBlocksFound
= true;
1207 assertFalse(usedBlocksFound
);
1208 // you should always see 0 ref count. since after HBASE-16604 we always recreate the scanner
1209 assertEquals(0, refCount
);
// presumably a try/finally closing the table — finally body lost in extraction.
1211 if (table
!= null) {
// Walks every block currently in the given cache and asserts its reference count is 0,
// i.e. no scanner is pinning any cached block. Reads the per-block refCount from
// BucketCache or CombinedBlockCache via getRefCount(cacheKey).
// NOTE(review): damaged extraction — the fused numbering skips 1218, 1226-1228 and
// 1230-1231, so the opening refCount handling, the trailing else branch and the
// closing braces were dropped; restore from upstream before compiling.
1217 private void iterateBlockCache(BlockCache cache
, Iterator
<CachedBlock
> iterator
) {
1219 while (iterator
.hasNext()) {
1220 CachedBlock next
= iterator
.next();
1221 BlockCacheKey cacheKey
= new BlockCacheKey(next
.getFilename(), next
.getOffset());
1222 if (cache
instanceof BucketCache
) {
1223 refCount
= ((BucketCache
) cache
).getRefCount(cacheKey
);
1224 } else if (cache
instanceof CombinedBlockCache
) {
1225 refCount
= ((CombinedBlockCache
) cache
).getRefCount(cacheKey
);
// Every block must be unpinned once no scan/get is outstanding.
1229 assertEquals(0, refCount
);
// Seeds the test table: one cell each for ROW and ROW1 under FAMILY/QUALIFIER, plus a
// second column QUALIFIER2 (= QUALIFIER+QUALIFIER) with data2.
// NOTE(review): damaged extraction — lines 1236, 1239, 1241 and 1243-1244 are missing;
// upstream these are the table.put(put) calls, the data2 assignment and the closing
// brace. Restore before compiling.
1233 private void insertData(Table table
) throws IOException
{
1234 Put put
= new Put(ROW
);
1235 put
.addColumn(FAMILY
, QUALIFIER
, data
);
1237 put
= new Put(ROW1
);
1238 put
.addColumn(FAMILY
, QUALIFIER
, data
);
1240 byte[] QUALIFIER2
= Bytes
.add(QUALIFIER
, QUALIFIER
);
// data2 is assigned in a dropped line (presumably Bytes.add(data, data)) — verify.
1242 put
.addColumn(FAMILY
, QUALIFIER2
, data2
);
// Creates NO_OF_THREADS ScanThread workers over the given table (optionally reversed)
// and returns the array so the caller can join them later.
// NOTE(review): damaged extraction — lines 1251 and 1253-1256 are missing; upstream
// those start each thread and return the array. Restore before compiling.
1246 private ScanThread
[] initiateScan(Table table
, boolean reverse
) throws IOException
,
1247 InterruptedException
{
1248 ScanThread
[] scanThreads
= new ScanThread
[NO_OF_THREADS
];
1249 for (int i
= 0; i
< NO_OF_THREADS
; i
++) {
1250 scanThreads
[i
] = new ScanThread(table
, reverse
);
// loop over threads — the thread.start() body is in the dropped lines.
1252 for (ScanThread thread
: scanThreads
) {
// Creates NO_OF_THREADS GetThread workers (tracker / multipleCFs flags are forwarded
// to each thread) and returns the array for the caller to join.
// NOTE(review): damaged extraction — lines 1263 and 1265-1268 are missing; upstream
// those start each thread and return the array. Restore before compiling.
1258 private GetThread
[] initiateGet(Table table
, boolean tracker
, boolean multipleCFs
)
1259 throws IOException
, InterruptedException
{
1260 GetThread
[] getThreads
= new GetThread
[NO_OF_THREADS
];
1261 for (int i
= 0; i
< NO_OF_THREADS
; i
++) {
1262 getThreads
[i
] = new GetThread(table
, tracker
, multipleCFs
);
// loop over threads — the thread.start() body is in the dropped lines.
1264 for (GetThread thread
: getThreads
) {
// Creates and launches NO_OF_THREADS MultiGetThread workers (each issues a batched
// table.get(List<Get>)) and returns the array so callers can join them.
// NOTE(review): damaged extraction — lines 1275 and 1277-1278 are missing; upstream
// those start each thread. Restore before compiling.
1270 private MultiGetThread
[] initiateMultiGet(Table table
)
1271 throws IOException
, InterruptedException
{
1272 MultiGetThread
[] multiGetThreads
= new MultiGetThread
[NO_OF_THREADS
];
1273 for (int i
= 0; i
< NO_OF_THREADS
; i
++) {
1274 multiGetThreads
[i
] = new MultiGetThread(table
);
// loop over threads — the thread.start() body is in the dropped lines.
1276 for (MultiGetThread thread
: multiGetThreads
) {
1279 return multiGetThreads
;
// Verifies block reference counts while get/scan threads are in flight. Waits until the
// coprocessor counters (countOfGets / countOfNext) show all NO_OF_THREADS workers have
// reached their hook, then walks the cache and cross-checks each non-zero refCount
// against those counters. getClosed / expectOnlyZero select which invariant applies.
// Finally releases the coprocessor latch so the workers can proceed.
// NOTE(review): damaged extraction — fused numbering gaps (1289-1291, 1293-1295, 1297,
// 1305-1307, 1312, 1315, 1317-1318, 1321, 1323-1325, 1331, 1335, 1337-1340, 1342) show
// that sleep/wait bodies, else branches and closing braces were dropped. The branching
// here is intricate; restore from upstream rather than reconstructing by hand.
1282 private void checkForBlockEviction(BlockCache cache
, boolean getClosed
, boolean expectOnlyZero
)
1283 throws InterruptedException
{
1284 int counter
= NO_OF_THREADS
;
1285 if (CustomInnerRegionObserver
.waitForGets
.get()) {
1286 // Because only one row is selected, it has only 2 blocks
1287 counter
= counter
- 1;
// spin until every get thread has entered postGetOp (loop body dropped).
1288 while (CustomInnerRegionObserver
.countOfGets
.get() < NO_OF_THREADS
) {
// spin until every scan thread has entered postScannerNext (loop body dropped).
1292 while (CustomInnerRegionObserver
.countOfNext
.get() < NO_OF_THREADS
) {
1296 Iterator
<CachedBlock
> iterator
= cache
.iterator();
1298 while (iterator
.hasNext()) {
1299 CachedBlock next
= iterator
.next();
1300 BlockCacheKey cacheKey
= new BlockCacheKey(next
.getFilename(), next
.getOffset());
1301 if (cache
instanceof BucketCache
) {
1302 refCount
= ((BucketCache
) cache
).getRefCount(cacheKey
);
1303 } else if (cache
instanceof CombinedBlockCache
) {
1304 refCount
= ((CombinedBlockCache
) cache
).getRefCount(cacheKey
);
1308 System
.out
.println(" the refcount is " + refCount
+ " block is " + cacheKey
);
1309 if (CustomInnerRegionObserver
.waitForGets
.get()) {
1310 if (expectOnlyZero
) {
1311 assertTrue(refCount
== 0);
1313 if (refCount
!= 0) {
1314 // Because the scan would have also touched up on these blocks but
1316 // would have touched
1319 // If get has closed only the scan's blocks would be available
1320 assertEquals(refCount
, CustomInnerRegionObserver
.countOfGets
.get());
1322 assertEquals(refCount
, CustomInnerRegionObserver
.countOfGets
.get() + (NO_OF_THREADS
));
1326 // Because the get would have also touched up on these blocks but it
1327 // would have touched
1328 // upon only 2 additionally
1329 if (expectOnlyZero
) {
1330 assertTrue(refCount
== 0);
1332 if (refCount
!= 0) {
1333 if (getLatch
== null) {
1334 assertEquals(refCount
, CustomInnerRegionObserver
.countOfNext
.get());
1336 assertEquals(refCount
, CustomInnerRegionObserver
.countOfNext
.get() + (NO_OF_THREADS
));
// Release the coprocessor latch so the blocked worker threads resume.
1341 CustomInnerRegionObserver
.getCdl().get().countDown();
// Worker thread that issues one batched table.get(List<Get>) for ROW and ROW1 and
// asserts both rows come back in order. Registers the shared test latch with the
// coprocessor before the call so the server-side hook can block/track it.
// NOTE(review): damaged extraction — lines 1347-1351, 1354 and 1360-1363 are missing
// (constructor body, run() header/try, and catch/closing braces). Restore before
// compiling.
1344 private static class MultiGetThread
extends Thread
{
1345 private final Table table
;
1346 private final List
<Get
> gets
= new ArrayList
<>();
1347 public MultiGetThread(Table table
) {
1352 gets
.add(new Get(ROW
));
1353 gets
.add(new Get(ROW1
));
1355 CustomInnerRegionObserver
.getCdl().set(latch
);
1356 Result
[] r
= table
.get(gets
);
1357 assertTrue(Bytes
.equals(r
[0].getRow(), ROW
));
1358 assertTrue(Bytes
.equals(r
[1].getRow(), ROW1
));
1359 } catch (IOException e
) {
// Worker thread that performs a single Get against ROW and asserts the returned cell
// values. The `tracker` flag switches to explicit-column gets (testQualifier 3/8/9/900);
// `multipleCFs` additionally spreads those columns across testFamily 3/8/9. Registers
// the shared latch with the coprocessor before issuing the Get.
// NOTE(review): damaged extraction — fused numbering gaps (1368, 1370, 1373-1378,
// 1380-1383, 1386-1388, 1392, 1394, 1398, 1400-1401, 1405, 1408-1409, 1413, 1416,
// 1419, 1422-1426) dropped the constructor/run() scaffolding, the if/else branch
// structure around tracker/multipleCFs, and the expected-value arguments of the last
// three asserts. Restore from upstream before compiling.
1364 private static class GetThread
extends Thread
{
1365 private final Table table
;
1366 private final boolean tracker
;
1367 private final boolean multipleCFs
;
1369 public GetThread(Table table
, boolean tracker
, boolean multipleCFs
) {
1371 this.tracker
= tracker
;
1372 this.multipleCFs
= multipleCFs
;
1379 } catch (IOException e
) {
// Builds and executes the Get, then validates the returned values.
1384 private void initiateGet(Table table
) throws IOException
{
1385 Get get
= new Get(ROW
);
// single-CF tracker branch: named qualifiers on FAMILY.
1389 get
.addColumn(FAMILY
, Bytes
.toBytes("testQualifier" + 3));
1390 get
.addColumn(FAMILY
, Bytes
.toBytes("testQualifier" + 8));
1391 get
.addColumn(FAMILY
, Bytes
.toBytes("testQualifier" + 9));
1393 get
.addColumn(FAMILY
, Bytes
.toBytes("testQualifier" + 900));
// multi-CF branch: same qualifiers spread across testFamily 3/8/9.
1395 get
.addColumn(Bytes
.toBytes("testFamily" + 3), Bytes
.toBytes("testQualifier" + 3));
1396 get
.addColumn(Bytes
.toBytes("testFamily" + 8), Bytes
.toBytes("testQualifier" + 8));
1397 get
.addColumn(Bytes
.toBytes("testFamily" + 9), Bytes
.toBytes("testQualifier" + 9));
1399 get
.addColumn(Bytes
.toBytes("testFamily" + 9), Bytes
.toBytes("testQualifier" + 900));
1402 CustomInnerRegionObserver
.getCdl().set(latch
);
1403 Result r
= table
.get(get
);
1404 System
.out
.println(r
);
1406 assertTrue(Bytes
.equals(r
.getValue(FAMILY
, QUALIFIER
), data
));
1407 assertTrue(Bytes
.equals(r
.getValue(FAMILY
, QUALIFIER2
), data2
));
1410 assertTrue(Bytes
.equals(r
.getValue(FAMILY
, Bytes
.toBytes("testQualifier" + 3)), data2
));
1411 assertTrue(Bytes
.equals(r
.getValue(FAMILY
, Bytes
.toBytes("testQualifier" + 8)), data2
));
1412 assertTrue(Bytes
.equals(r
.getValue(FAMILY
, Bytes
.toBytes("testQualifier" + 9)), data2
));
// expected-value second argument of each of the next three asserts was dropped
// (presumably data2) — verify against upstream.
1414 assertTrue(Bytes
.equals(
1415 r
.getValue(Bytes
.toBytes("testFamily" + 3), Bytes
.toBytes("testQualifier" + 3)),
1417 assertTrue(Bytes
.equals(
1418 r
.getValue(Bytes
.toBytes("testFamily" + 8), Bytes
.toBytes("testQualifier" + 8)),
1420 assertTrue(Bytes
.equals(
1421 r
.getValue(Bytes
.toBytes("testFamily" + 9), Bytes
.toBytes("testQualifier" + 9)),
// Worker thread that runs one full scan (forward or reversed) over the table, checking
// each returned row against the expected ROWS[i] ordering. Registers the shared latch
// with the coprocessor before opening the scanner.
// NOTE(review): damaged extraction — fused numbering gaps (1431, 1433, 1435-1439,
// 1442-1445, 1448, 1450, 1456, 1458, 1460-1461, 1463-1465, 1467-1468) dropped the
// constructor/run() scaffolding, the reverse branch around setReversed, the i++/i--
// index updates and resultFound assignment. Restore from upstream before compiling.
1428 private static class ScanThread
extends Thread
{
1429 private final Table table
;
1430 private final boolean reverse
;
1432 public ScanThread(Table table
, boolean reverse
) {
1434 this.reverse
= reverse
;
1440 initiateScan(table
);
1441 } catch (IOException e
) {
// Opens the scanner and validates row ordering; index starts at the far end when
// scanning in reverse.
1446 private void initiateScan(Table table
) throws IOException
{
1447 Scan scan
= new Scan();
1449 scan
.setReversed(true);
1451 CustomInnerRegionObserver
.getCdl().set(latch
);
1452 ResultScanner resScanner
= table
.getScanner(scan
);
1453 int i
= (reverse ? ROWS
.length
- 1 : 0);
1454 boolean resultFound
= false;
1455 for (Result result
: resScanner
) {
1457 System
.out
.println(result
);
1459 assertTrue(Bytes
.equals(result
.getRow(), ROWS
[i
]));
1462 assertTrue(Bytes
.equals(result
.getRow(), ROWS
[i
]));
1466 assertTrue(resultFound
);
// Polls until the store's file count reaches `count` or `timeout` ms elapse, then
// asserts the final count. Used after triggering a major compaction.
// NOTE(review): damaged extraction — lines 1474-1475 and 1479 are missing; upstream
// the loop body sleeps briefly between polls and 1479 closes the method. Restore
// before compiling.
1470 private void waitForStoreFileCount(HStore store
, int count
, int timeout
)
1471 throws InterruptedException
{
1472 long start
= System
.currentTimeMillis();
// busy-wait loop; its sleep body was dropped by the extraction.
1473 while (start
+ timeout
> System
.currentTimeMillis() && store
.getStorefilesCount() != count
) {
1476 System
.out
.println("start=" + start
+ ", now=" + System
.currentTimeMillis() + ", cur=" +
1477 store
.getStorefilesCount());
1478 assertEquals(count
, store
.getStorefilesCount());
// RegionScanner wrapper installed by CustomInnerRegionObserverWrapper. Pure delegate
// except nextRaw(List, ScannerContext): after delegating, it parks on compactionLatch
// (so a test can flush/compact while the scan holds its blocks) and, when
// throwException is armed, parks on exceptionLatch and then throws IOException to
// simulate a scan failing mid-flight.
// NOTE(review): damaged extraction — fused numbering gaps (1482, 1484, 1487-1489,
// 1492-1494, etc.) dropped the @Override annotations, try blocks around the awaits,
// catch bodies, several return statements (e.g. reseek, close) and closing braces.
// Restore from upstream before compiling.
1481 private static class CustomScanner
implements RegionScanner
{
1483 private RegionScanner delegate
;
1485 public CustomScanner(RegionScanner delegate
) {
1486 this.delegate
= delegate
;
1490 public boolean next(List
<Cell
> results
) throws IOException
{
1491 return delegate
.next(results
);
1495 public boolean next(List
<Cell
> result
, ScannerContext scannerContext
) throws IOException
{
1496 return delegate
.next(result
, scannerContext
);
1500 public boolean nextRaw(List
<Cell
> result
) throws IOException
{
1501 return delegate
.nextRaw(result
);
// The one non-trivial override: holds the scan open across compaction and/or
// injects a failure, depending on which latches/flags the test armed.
1505 public boolean nextRaw(List
<Cell
> result
, ScannerContext context
) throws IOException
{
1506 boolean nextRaw
= delegate
.nextRaw(result
, context
);
1507 if (compactionLatch
!= null && compactionLatch
.getCount() > 0) {
1509 compactionLatch
.await();
1510 } catch (InterruptedException ie
) {
1514 if (CustomInnerRegionObserver
.throwException
.get()) {
1515 if (exceptionLatch
.getCount() > 0) {
1517 exceptionLatch
.await();
1518 } catch (InterruptedException e
) {
1520 throw new IOException("throw exception");
1527 public void close() throws IOException
{
1532 public RegionInfo
getRegionInfo() {
1533 return delegate
.getRegionInfo();
1537 public boolean isFilterDone() throws IOException
{
1538 return delegate
.isFilterDone();
1542 public boolean reseek(byte[] row
) throws IOException
{
1547 public long getMaxResultSize() {
1548 return delegate
.getMaxResultSize();
1552 public long getMvccReadPoint() {
1553 return delegate
.getMvccReadPoint();
1557 public int getBatch() {
1558 return delegate
.getBatch();
// Coprocessor used by the tests: same hooks as CustomInnerRegionObserver, but wraps
// every scanner opened on the region in a CustomScanner so the tests can stall or
// fail scans via the shared latches.
// NOTE(review): damaged extraction — line 1563 (presumably @Override) and the closing
// braces 1567-1568 are missing; restore before compiling.
1562 public static class CustomInnerRegionObserverWrapper
extends CustomInnerRegionObserver
{
1564 public RegionScanner
postScannerOpen(ObserverContext
<RegionCoprocessorEnvironment
> e
,
1565 Scan scan
, RegionScanner s
) throws IOException
{
1566 return new CustomScanner(s
);
1570 public static class CustomInnerRegionObserver
implements RegionCoprocessor
, RegionObserver
{
1571 static final AtomicLong sleepTime
= new AtomicLong(0);
1572 static final AtomicBoolean slowDownNext
= new AtomicBoolean(false);
1573 static final AtomicInteger countOfNext
= new AtomicInteger(0);
1574 static final AtomicInteger countOfGets
= new AtomicInteger(0);
1575 static final AtomicBoolean waitForGets
= new AtomicBoolean(false);
1576 static final AtomicBoolean throwException
= new AtomicBoolean(false);
1577 private static final AtomicReference
<CountDownLatch
> cdl
= new AtomicReference
<>(
1578 new CountDownLatch(0));
1581 public Optional
<RegionObserver
> getRegionObserver() {
1582 return Optional
.of(this);
1586 public boolean postScannerNext(ObserverContext
<RegionCoprocessorEnvironment
> e
,
1587 InternalScanner s
, List
<Result
> results
, int limit
, boolean hasMore
) throws IOException
{
1588 slowdownCode(e
, false);
1589 if (getLatch
!= null && getLatch
.getCount() > 0) {
1592 } catch (InterruptedException e1
) {
1599 public void postGetOp(ObserverContext
<RegionCoprocessorEnvironment
> e
, Get get
,
1600 List
<Cell
> results
) throws IOException
{
1601 slowdownCode(e
, true);
1604 public static AtomicReference
<CountDownLatch
> getCdl() {
1608 private void slowdownCode(final ObserverContext
<RegionCoprocessorEnvironment
> e
,
1610 CountDownLatch latch
= getCdl().get();
1612 System
.out
.println(latch
.getCount() + " is the count " + isGet
);
1613 if (latch
.getCount() > 0) {
1615 countOfGets
.incrementAndGet();
1617 countOfNext
.incrementAndGet();
1619 LOG
.info("Waiting for the counterCountDownLatch");
1620 latch
.await(2, TimeUnit
.MINUTES
); // To help the tests to finish.
1621 if (latch
.getCount() > 0) {
1622 throw new RuntimeException("Can't wait more");
1625 } catch (InterruptedException e1
) {
1626 LOG
.error(e1
.toString(), e1
);