/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
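
/**
 * Tests that data blocks pinned by in-flight gets and scans are reference counted and become
 * evictable again once those operations finish. A latch-driven {@link RegionObserver} holds the
 * server-side operations open so the tests can assert on block ref counts mid-flight, across
 * flushes, major compactions, splits and scanner exceptions.
 */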
@Category({ LargeTests.class, ClientTests.class })
@SuppressWarnings("deprecation")
public class TestBlockEvictionFromClient {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestBlockEvictionFromClient.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestBlockEvictionFromClient.class);
  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  static byte[][] ROWS = new byte[2][];
  private static int NO_OF_THREADS = 3;
  private static byte[] ROW = Bytes.toBytes("testRow");
  private static byte[] ROW1 = Bytes.toBytes("testRow1");
  private static byte[] ROW2 = Bytes.toBytes("testRow2");
  private static byte[] ROW3 = Bytes.toBytes("testRow3");
  private static byte[] FAMILY = Bytes.toBytes("testFamily");
  private static byte[][] FAMILIES_1 = new byte[1][0];
  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
  private static byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
  private static byte[] data = new byte[1000];
  private static byte[] data2 = Bytes.add(data, data);
  protected static int SLAVES = 1;
  private static CountDownLatch latch;
  private static CountDownLatch getLatch;
  private static CountDownLatch compactionLatch;
  private static CountDownLatch exceptionLatch;

  @Rule
  public TestName name = new TestName();

  /**
   * @throws java.lang.Exception
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ROWS[0] = ROW;
    ROWS[1] = ROW1;
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
        MultiRowMutationEndpoint.class.getName());
    conf.setBoolean("hbase.table.sanity.checks", true); // enable for below tests
    conf.setInt("hbase.regionserver.handler.count", 20);
    conf.setInt("hbase.bucketcache.size", 400);
    conf.setStrings(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
    conf.setFloat("hfile.block.cache.size", 0.2f);
    conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); // do not retry
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 5000);
    FAMILIES_1[0] = FAMILY;
    TEST_UTIL.startMiniCluster(SLAVES);
  }

  /**
   * @throws java.lang.Exception
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * @throws java.lang.Exception
   */
  @Before
  public void setUp() throws Exception {
    CustomInnerRegionObserver.waitForGets.set(false);
    CustomInnerRegionObserver.countOfNext.set(0);
    CustomInnerRegionObserver.countOfGets.set(0);
  }

  /**
   * @throws java.lang.Exception
   */
  @After
  public void tearDown() throws Exception {
    if (latch != null) {
      while (latch.getCount() > 0) {
        latch.countDown();
      }
    }
    if (getLatch != null) {
      getLatch.countDown();
    }
    if (compactionLatch != null) {
      compactionLatch.countDown();
    }
    if (exceptionLatch != null) {
      exceptionLatch.countDown();
    }
    latch = null;
    getLatch = null;
    compactionLatch = null;
    exceptionLatch = null;
    CustomInnerRegionObserver.throwException.set(false);
    // Clean up the tables for every test case
    TableName[] listTableNames = TEST_UTIL.getAdmin().listTableNames();
    for (TableName tableName : listTableNames) {
      if (!tableName.isSystemTable()) {
        TEST_UTIL.getAdmin().disableTable(tableName);
        TEST_UTIL.getAdmin().deleteTable(tableName);
      }
    }
  }
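
  /**
   * Starts three parallel scans against a freshly flushed file, verifies the blocks they touch
   * are pinned while the scanners run, and confirms every ref count returns to zero after the
   * scans finish, after a further flush, and after a major compaction.
   */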
  @Test
  public void testBlockEvictionWithParallelScans() throws Exception {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // data was in memstore so don't expect any changes
      // flush the data
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // Load cache
      // Create three scan threads
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      checkForBlockEviction(cache, false, false);
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // CustomInnerRegionObserver.sleepTime.set(0);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // read the data and expect same blocks, one new hit, no misses
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // compact, net minus two blocks, two hits, no misses
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // read the row, this should be a cache miss because we don't cache data
      // blocks on compaction
      r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
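
  /**
   * Runs three gets and three scans in parallel against the same blocks and verifies the pinned
   * blocks are only fully released once both the gets and the scans have completed.
   */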
  @Test
  public void testParallelGetsAndScans() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // Create three scan threads
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Create three get threads
      GetThread[] getThreads = initiateGet(table, false, false);
      checkForBlockEviction(cache, false, false);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify that the gets have released the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref counts to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      System.out.println("Scans should have returned the blocks");
      // Check with either true or false
      CustomInnerRegionObserver.waitForGets.set(false);
      // The scan should also have released the blocks by now
      checkForBlockEviction(cache, true, true);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
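
  /**
   * Spreads the cells of a single row across three store files, then verifies that gets touching
   * all three files release every block they pinned once they complete.
   */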
  @Test
  public void testGetWithCellsInDifferentFiles() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Create three get threads
      CustomInnerRegionObserver.waitForGets.set(true);
      GetThread[] getThreads = initiateGet(table, false, false);
      Thread.sleep(200);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify that the gets have released the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref counts to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
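
  /**
   * Uses gets with an explicit set of columns (the explicit column tracker read path) spread over
   * several store files and asserts that exactly ten blocks are pinned by the three get threads
   * before the latch is released.
   */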
  @Test
  // TODO : check how block index works here
  public void testGetsWithMultiColumnsAndExplicitTracker()
      throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(FAMILY, Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Create three get threads
      CustomInnerRegionObserver.waitForGets.set(true);
      GetThread[] getThreads = initiateGet(table, true, false);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be pinned by all 3 threads
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(10, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify that the gets have released the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref counts to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
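
  /**
   * Same as the explicit-tracker test above, but with the columns spread across ten column
   * families; the three get threads are expected to pin exactly three blocks.
   */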
  @Test
  public void testGetWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Create three get threads
      CustomInnerRegionObserver.waitForGets.set(true);
      GetThread[] getThreads = initiateGet(table, true, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be pinned by all 3 threads
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(3, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      // Verify that the gets have released the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref counts to be decremented
      checkForBlockEviction(cache, true, false);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
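
  /**
   * Splits a table after a few flushes and verifies that the scanners opened by the split over
   * the half store file references do not leave blocks pinned once the daughter regions are
   * online and compacted.
   */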
  @Test
  public void testBlockRefCountAfterSplits() throws IOException, InterruptedException {
    Table table = null;
    try {
      final TableName tableName = TableName.valueOf(name.getMethodName());
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024);
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW2);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      put = new Put(ROW3);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      LOG.info("About to SPLIT on " + Bytes.toString(ROW1));
      TEST_UTIL.getAdmin().split(tableName, ROW1);
      // Wait for the split to complete
      Collection<ServerName> regionServers = TEST_UTIL.getAdmin().getRegionServers();
      Iterator<ServerName> serverItr = regionServers.iterator();
      assertTrue(serverItr.hasNext());
      ServerName rs = serverItr.next();
      List<RegionInfo> onlineRegions = TEST_UTIL.getAdmin().getRegions(rs);
      while (onlineRegions.size() != 2) {
        onlineRegions = TEST_UTIL.getAdmin().getRegions(rs);
        Thread.sleep(100);
        LOG.info("Waiting on SPLIT to complete...");
      }
      region.compact(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      // Though the split had created the HalfStoreFileReader - the first key and last key
      // scanners should be closed in order to return those blocks
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
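
  /**
   * Issues batch (multi) gets from three threads and verifies the pinned blocks are released
   * once the batch gets complete.
   */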
  @Test
  public void testMultiGets() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Create three multi-get threads
      CustomInnerRegionObserver.waitForGets.set(true);
      MultiGetThread[] getThreads = initiateMultiGet(table);
      Thread.sleep(200);
      int refCount;
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean foundNonZeroBlock = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          assertEquals(NO_OF_THREADS, refCount);
          foundNonZeroBlock = true;
        }
      }
      assertTrue("Should have found nonzero ref count block", foundNonZeroBlock);
      CustomInnerRegionObserver.getCdl().get().countDown();
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (MultiGetThread thread : getThreads) {
        thread.join();
      }
      // Verify that the gets have released the blocks they held
      CustomInnerRegionObserver.waitForGets.set(true);
      // giving some time for the block ref counts to be decremented;
      // re-fetch the iterator since the one above is already exhausted
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      getLatch.countDown();
      System.out.println("Gets should have returned the blocks");
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
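
  /**
   * Scans across ten column families and asserts that the three scan threads pin exactly twelve
   * blocks until the latch lets the scanners complete.
   */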
  @Test
  public void testScanWithMultipleColumnFamilies() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      // Check if scan() returns blocks on its close() itself
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      byte[][] fams = new byte[10][];
      fams[0] = FAMILY;
      for (int i = 1; i < 10; i++) {
        fams[i] = (Bytes.toBytes("testFamily" + i));
      }
      table = TEST_UTIL.createTable(tableName, fams, 1, 1024,
          CustomInnerRegionObserver.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      BlockCache cache = setCacheProperties(region);

      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      region.flush(true);
      for (int i = 1; i < 10; i++) {
        put = new Put(ROW);
        put.addColumn(Bytes.toBytes("testFamily" + i), Bytes.toBytes("testQualifier" + i), data2);
        table.put(put);
        if (i % 2 == 0) {
          region.flush(true);
        }
      }
      region.flush(true);
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      region.flush(true);
      // flush the data
      System.out.println("Flushing cache");
      // Create three scan threads
      ScanThread[] scanThreads = initiateScan(table, true);
      Thread.sleep(200);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      int noOfBlocksWithRef = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be pinned by all 3 threads
          System.out.println("The refCount is " + refCount);
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
          noOfBlocksWithRef++;
        }
      }
      assertTrue(usedBlocksFound);
      // the number of blocks referred
      assertEquals(12, noOfBlocksWithRef);
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // giving some time for the block ref counts to be decremented
      checkForBlockEviction(cache, true, false);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
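
  /**
   * Enables cache-on-write and evict-on-close for every store of the region and returns the
   * block cache taken from the last store visited.
   */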
  private BlockCache setCacheProperties(HRegion region) {
    Iterator<HStore> strItr = region.getStores().iterator();
    BlockCache cache = null;
    while (strItr.hasNext()) {
      HStore store = strItr.next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      // Use the last one
      cache = cacheConf.getBlockCache().get();
    }
    return cache;
  }
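
  /**
   * Same parallel get/scan check as above, but exercising the path where a coprocessor wraps the
   * region scanner (CustomInnerRegionObserverWrapper), ensuring block ref counts are still
   * released correctly through the wrapped scanner.
   */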
  @Test
  public void testParallelGetsAndScanWithWrappedRegionScanner()
      throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(2);
      // Check if get() returns blocks on its close() itself
      getLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      // Create three scan threads
      CustomInnerRegionObserver.waitForGets.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // Create three get threads
      GetThread[] getThreads = initiateGet(table, false, false);
      // The blocks held by the scans would already have been decremented, because the scanner
      // was wrapped before the postScannerNext hook got executed.
      // giving some time for the block ref counts to be decremented
      Thread.sleep(100);
      CustomInnerRegionObserver.waitForGets.set(false);
      checkForBlockEviction(cache, false, false);
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (GetThread thread : getThreads) {
        thread.join();
      }
      getLatch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
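
  /**
   * Forward and reversed variants of the same check: scanners hold blocks across a major
   * compaction, and the test verifies the pre-compaction blocks stay pinned until the scanners
   * finish, after which the whole cache drains back to zero ref counts.
   */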
  @Test
  public void testScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), false);
  }

  @Test
  public void testReverseScanWithCompaction() throws IOException, InterruptedException {
    testScanWithCompactionInternals(name.getMethodName(), true);
  }

  private void testScanWithCompactionInternals(String tableNameStr, boolean reversed)
      throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      TableName tableName = TableName.valueOf(tableNameStr);
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // read the data and expect same blocks, one new hit, no misses
      int refCount = 0;
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Create three scan threads
      ScanThread[] scanThreads = initiateScan(table, reversed);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be pinned by all 3 threads
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(2, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after compaction is done we will have some blocks that cannot
      // be evicted; this is because the scan is still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks are still pinned by the 3 scanners as they are not yet cleared
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      // Should not throw exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
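
  /**
   * Scenario from HBASE-13082: a flush and a major compaction happen while scanners are still
   * holding blocks of the original files; the original blocks must stay pinned until the
   * scanners finish and must all be released afterwards.
   */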
  @Test
  public void testBlockEvictionAfterHBASE13082WithCompactionAndFlush()
      throws IOException, InterruptedException {
    // do flush and scan in parallel
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      compactionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();

      // insert data. 2 Rows are added
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER, data);
      table.put(put);
      assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // read the data and expect same blocks, one new hit, no misses
      int refCount = 0;
      // insert a second column, read the row, no new blocks, 3 new hits
      byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      byte[] data2 = Bytes.add(data, data);
      put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      Iterator<CachedBlock> iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
      // Create three scan threads
      ScanThread[] scanThreads = initiateScan(table, false);
      Thread.sleep(100);
      iterator = cache.iterator();
      boolean usedBlocksFound = false;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be pinned by all 3 threads
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      // Make a put and do a flush
      QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
      data2 = Bytes.add(data, data);
      put = new Put(ROW1);
      put.addColumn(FAMILY, QUALIFIER2, data2);
      table.put(put);
      // flush, one new block
      System.out.println("Flushing cache");
      region.flush(true);
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      usedBlocksFound = false;
      System.out.println("Compacting");
      assertEquals(3, store.getStorefilesCount());
      store.triggerMajorCompaction();
      region.compact(true);
      waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
      assertEquals(1, store.getStorefilesCount());
      // Even after compaction is done we will have some blocks that cannot
      // be evicted; this is because the scan is still referencing them
      iterator = cache.iterator();
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks are still pinned by the 3 scanners as they are not yet cleared
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue("Blocks with non zero ref count should be found ", usedBlocksFound);
      // Should not throw exception
      compactionLatch.countDown();
      latch.countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      // by this time all blocks should have been evicted
      iterator = cache.iterator();
      // Since a flush and compaction happened after a scan started
      // we need to ensure that all the original blocks of the compacted file
      // are also removed.
      iterateBlockCache(cache, iterator);
      Result r = table.get(new Get(ROW));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
      assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      // The gets would be working on new blocks
      iterator = cache.iterator();
      iterateBlockCache(cache, iterator);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
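
  /**
   * Forces the wrapped scanner to throw mid-scan and verifies no block is left pinned; after
   * HBASE-16604 the client recreates the scanner, so every ref count must drop back to zero.
   */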
  @Test
  public void testScanWithException() throws IOException, InterruptedException {
    Table table = null;
    try {
      latch = new CountDownLatch(1);
      exceptionLatch = new CountDownLatch(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      // Create KV that will give you two blocks
      // Create a table with block size as 1024
      table = TEST_UTIL.createTable(tableName, FAMILIES_1, 1, 1024,
          CustomInnerRegionObserverWrapper.class.getName());
      // get the block cache and region
      RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
      String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
      HRegion region =
          TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
      HStore store = region.getStores().iterator().next();
      CacheConfig cacheConf = store.getCacheConfig();
      cacheConf.setCacheDataOnWrite(true);
      cacheConf.setEvictOnClose(true);
      BlockCache cache = cacheConf.getBlockCache().get();
      // insert data. 2 Rows are added
      insertData(table);
      // flush the data
      System.out.println("Flushing cache");
      // Should create one Hfile with 2 blocks
      region.flush(true);
      // CustomInnerRegionObserver.sleepTime.set(5000);
      CustomInnerRegionObserver.throwException.set(true);
      ScanThread[] scanThreads = initiateScan(table, false);
      // The blocks held by the scans would already have been decremented, because the scanner
      // was wrapped before the postScannerNext hook got executed.
      // giving some time for the block ref counts to be decremented
      Thread.sleep(100);
      Iterator<CachedBlock> iterator = cache.iterator();
      boolean usedBlocksFound = false;
      int refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be pinned by all 3 threads
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertTrue(usedBlocksFound);
      exceptionLatch.countDown();
      // countdown the latch
      CustomInnerRegionObserver.getCdl().get().countDown();
      for (ScanThread thread : scanThreads) {
        thread.join();
      }
      iterator = cache.iterator();
      usedBlocksFound = false;
      refCount = 0;
      while (iterator.hasNext()) {
        CachedBlock next = iterator.next();
        BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
        if (cache instanceof BucketCache) {
          refCount = ((BucketCache) cache).getRefCount(cacheKey);
        } else if (cache instanceof CombinedBlockCache) {
          refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
        } else {
          continue;
        }
        if (refCount != 0) {
          // Blocks will be pinned by all 3 threads
          assertEquals(NO_OF_THREADS, refCount);
          usedBlocksFound = true;
        }
      }
      assertFalse(usedBlocksFound);
      // We should always see a ref count of 0, since after HBASE-16604 the scanner is always
      // recreated
      assertEquals(0, refCount);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
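
  /**
   * Walks the given cache iterator and asserts that every block tracked by a ref-counting cache
   * (bucket or combined) has a ref count of zero, i.e. nothing is left pinned.
   */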
  private void iterateBlockCache(BlockCache cache, Iterator<CachedBlock> iterator) {
    int refCount;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRefCount(cacheKey);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
      } else {
        continue;
      }
      assertEquals(0, refCount);
    }
  }

  private void insertData(Table table) throws IOException {
    Put put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    put = new Put(ROW1);
    put.addColumn(FAMILY, QUALIFIER, data);
    table.put(put);
    byte[] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
    put = new Put(ROW);
    put.addColumn(FAMILY, QUALIFIER2, data2);
    table.put(put);
  }

  private ScanThread[] initiateScan(Table table, boolean reverse)
      throws IOException, InterruptedException {
    ScanThread[] scanThreads = new ScanThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      scanThreads[i] = new ScanThread(table, reverse);
    }
    for (ScanThread thread : scanThreads) {
      thread.start();
    }
    return scanThreads;
  }

  private GetThread[] initiateGet(Table table, boolean tracker, boolean multipleCFs)
      throws IOException, InterruptedException {
    GetThread[] getThreads = new GetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      getThreads[i] = new GetThread(table, tracker, multipleCFs);
    }
    for (GetThread thread : getThreads) {
      thread.start();
    }
    return getThreads;
  }

  private MultiGetThread[] initiateMultiGet(Table table)
      throws IOException, InterruptedException {
    MultiGetThread[] multiGetThreads = new MultiGetThread[NO_OF_THREADS];
    for (int i = 0; i < NO_OF_THREADS; i++) {
      multiGetThreads[i] = new MultiGetThread(table);
    }
    for (MultiGetThread thread : multiGetThreads) {
      thread.start();
    }
    return multiGetThreads;
  }
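
  /**
   * Waits until all get/scan threads have reached the coprocessor hook, then walks the cache and
   * checks each pinned block's ref count against the number of outstanding gets and/or scans.
   * With expectOnlyZero set, any non-zero ref count fails the test.
   */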
  private void checkForBlockEviction(BlockCache cache, boolean getClosed, boolean expectOnlyZero)
      throws InterruptedException {
    int counter = NO_OF_THREADS;
    if (CustomInnerRegionObserver.waitForGets.get()) {
      // Because only one row is selected, it has only 2 blocks
      counter = counter - 1;
      while (CustomInnerRegionObserver.countOfGets.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    } else {
      while (CustomInnerRegionObserver.countOfNext.get() < NO_OF_THREADS) {
        Thread.sleep(100);
      }
    }
    Iterator<CachedBlock> iterator = cache.iterator();
    int refCount = 0;
    while (iterator.hasNext()) {
      CachedBlock next = iterator.next();
      BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset());
      if (cache instanceof BucketCache) {
        refCount = ((BucketCache) cache).getRefCount(cacheKey);
      } else if (cache instanceof CombinedBlockCache) {
        refCount = ((CombinedBlockCache) cache).getRefCount(cacheKey);
      } else {
        continue;
      }
      System.out.println(" the refcount is " + refCount + " block is " + cacheKey);
      if (CustomInnerRegionObserver.waitForGets.get()) {
        if (expectOnlyZero) {
          assertTrue(refCount == 0);
        }
        if (refCount != 0) {
          // The scan would also have touched these blocks, but it would have touched all 3
          if (getClosed) {
            // If the gets have closed, only the scan's blocks would be pinned
            assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get());
          } else {
            assertEquals(refCount, CustomInnerRegionObserver.countOfGets.get() + (NO_OF_THREADS));
          }
        }
      } else {
        // The get would also have touched these blocks, but it would have touched
        // only 2 additionally
        if (expectOnlyZero) {
          assertTrue(refCount == 0);
        }
        if (refCount != 0) {
          if (getLatch == null) {
            assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get());
          } else {
            assertEquals(refCount, CustomInnerRegionObserver.countOfNext.get() + (NO_OF_THREADS));
          }
        }
      }
    }
    CustomInnerRegionObserver.getCdl().get().countDown();
  }
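
  /** Thread that issues a batch get for ROW and ROW1 while the coprocessor latch is armed. */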
  private static class MultiGetThread extends Thread {
    private final Table table;
    private final List<Get> gets = new ArrayList<>();

    public MultiGetThread(Table table) {
      this.table = table;
    }

    @Override
    public void run() {
      gets.add(new Get(ROW));
      gets.add(new Get(ROW1));
      try {
        CustomInnerRegionObserver.getCdl().set(latch);
        Result[] r = table.get(gets);
        assertTrue(Bytes.equals(r[0].getRow(), ROW));
        assertTrue(Bytes.equals(r[1].getRow(), ROW1));
      } catch (IOException e) {
        // do nothing
      }
    }
  }

  private static class GetThread extends Thread {
    private final Table table;
    private final boolean tracker;
    private final boolean multipleCFs;

    public GetThread(Table table, boolean tracker, boolean multipleCFs) {
      this.table = table;
      this.tracker = tracker;
      this.multipleCFs = multipleCFs;
    }

    @Override
    public void run() {
      try {
        initiateGet(table);
      } catch (IOException e) {
        // do nothing
      }
    }

    private void initiateGet(Table table) throws IOException {
      Get get = new Get(ROW);
      if (tracker) {
        // Change this
        if (!multipleCFs) {
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 3));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 8));
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(FAMILY, Bytes.toBytes("testQualifier" + 900));
        } else {
          get.addColumn(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3));
          get.addColumn(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8));
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9));
          // Unknown key
          get.addColumn(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 900));
        }
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      Result r = table.get(get);
      System.out.println(r);
      if (!tracker) {
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
        assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
      } else {
        if (!multipleCFs) {
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 3)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 8)), data2));
          assertTrue(Bytes.equals(r.getValue(FAMILY, Bytes.toBytes("testQualifier" + 9)), data2));
        } else {
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 3), Bytes.toBytes("testQualifier" + 3)),
              data2));
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 8), Bytes.toBytes("testQualifier" + 8)),
              data2));
          assertTrue(Bytes.equals(
              r.getValue(Bytes.toBytes("testFamily" + 9), Bytes.toBytes("testQualifier" + 9)),
              data2));
        }
      }
    }
  }

  private static class ScanThread extends Thread {
    private final Table table;
    private final boolean reverse;

    public ScanThread(Table table, boolean reverse) {
      this.table = table;
      this.reverse = reverse;
    }

    @Override
    public void run() {
      try {
        initiateScan(table);
      } catch (IOException e) {
        // do nothing
      }
    }

    private void initiateScan(Table table) throws IOException {
      Scan scan = new Scan();
      if (reverse) {
        scan.setReversed(true);
      }
      CustomInnerRegionObserver.getCdl().set(latch);
      ResultScanner resScanner = table.getScanner(scan);
      int i = (reverse ? ROWS.length - 1 : 0);
      boolean resultFound = false;
      for (Result result : resScanner) {
        resultFound = true;
        System.out.println(result);
        if (!reverse) {
          assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
          i++;
        } else {
          assertTrue(Bytes.equals(result.getRow(), ROWS[i]));
          i--;
        }
      }
      assertTrue(resultFound);
    }
  }

  private void waitForStoreFileCount(HStore store, int count, int timeout)
      throws InterruptedException {
    long start = System.currentTimeMillis();
    while (start + timeout > System.currentTimeMillis() && store.getStorefilesCount() != count) {
      Thread.sleep(100);
    }
    System.out.println("start=" + start + ", now=" + System.currentTimeMillis() + ", cur=" +
        store.getStorefilesCount());
    assertEquals(count, store.getStorefilesCount());
  }
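
  /**
   * RegionScanner wrapper that can block inside nextRaw() on the compaction latch and can throw
   * an IOException on demand, used to keep scanner-held blocks pinned at interesting moments.
   */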
  private static class CustomScanner implements RegionScanner {

    private RegionScanner delegate;

    public CustomScanner(RegionScanner delegate) {
      this.delegate = delegate;
    }

    @Override
    public boolean next(List<Cell> results) throws IOException {
      return delegate.next(results);
    }

    @Override
    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
      return delegate.next(result, scannerContext);
    }

    @Override
    public boolean nextRaw(List<Cell> result) throws IOException {
      return delegate.nextRaw(result);
    }

    @Override
    public boolean nextRaw(List<Cell> result, ScannerContext context) throws IOException {
      boolean nextRaw = delegate.nextRaw(result, context);
      if (compactionLatch != null && compactionLatch.getCount() > 0) {
        try {
          compactionLatch.await();
        } catch (InterruptedException ie) {
          // ignore and continue
        }
      }

      if (CustomInnerRegionObserver.throwException.get()) {
        if (exceptionLatch.getCount() > 0) {
          try {
            exceptionLatch.await();
          } catch (InterruptedException e) {
            // ignore and continue
          }
          throw new IOException("throw exception");
        }
      }
      return nextRaw;
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public RegionInfo getRegionInfo() {
      return delegate.getRegionInfo();
    }

    @Override
    public boolean isFilterDone() throws IOException {
      return delegate.isFilterDone();
    }

    @Override
    public boolean reseek(byte[] row) throws IOException {
      return false;
    }

    @Override
    public long getMaxResultSize() {
      return delegate.getMaxResultSize();
    }

    @Override
    public long getMvccReadPoint() {
      return delegate.getMvccReadPoint();
    }

    @Override
    public int getBatch() {
      return delegate.getBatch();
    }
  }
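
  /**
   * Region observer whose postScannerOpen wraps the scanner in a CustomScanner; the base class
   * below counts gets/nexts and parks them on a shared CountDownLatch so tests can inspect block
   * ref counts while operations are in flight.
   */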
  public static class CustomInnerRegionObserverWrapper extends CustomInnerRegionObserver {
    @Override
    public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
        Scan scan, RegionScanner s) throws IOException {
      return new CustomScanner(s);
    }
  }

  public static class CustomInnerRegionObserver implements RegionCoprocessor, RegionObserver {
    static final AtomicLong sleepTime = new AtomicLong(0);
    static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
    static final AtomicInteger countOfNext = new AtomicInteger(0);
    static final AtomicInteger countOfGets = new AtomicInteger(0);
    static final AtomicBoolean waitForGets = new AtomicBoolean(false);
    static final AtomicBoolean throwException = new AtomicBoolean(false);
    private static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(
        new CountDownLatch(0));

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
        InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
      slowdownCode(e, false);
      if (getLatch != null && getLatch.getCount() > 0) {
        try {
          getLatch.await();
        } catch (InterruptedException e1) {
          // ignore and continue
        }
      }
      return hasMore;
    }

    @Override
    public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
        List<Cell> results) throws IOException {
      slowdownCode(e, true);
    }

    public static AtomicReference<CountDownLatch> getCdl() {
      return cdl;
    }

    private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e,
        boolean isGet) {
      CountDownLatch latch = getCdl().get();
      try {
        System.out.println(latch.getCount() + " is the count " + isGet);
        if (latch.getCount() > 0) {
          if (isGet) {
            countOfGets.incrementAndGet();
          } else {
            countOfNext.incrementAndGet();
          }
          LOG.info("Waiting for the counterCountDownLatch");
          latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
          if (latch.getCount() > 0) {
            throw new RuntimeException("Can't wait more");
          }
        }
      } catch (InterruptedException e1) {
        LOG.error(e1.toString(), e1);
      }
    }
  }
}