HBASE-24033 Add ut for loading the corrupt recovered hfiles (#1322)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / regionserver / TestCompaction.java
blob 48c7609b8439dc0eaad9fb84ac6ceb126499fc9d
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
 * Test compaction framework and common functions
 */
@Category({RegionServerTests.class, MediumTests.class})
public class TestCompaction {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestCompaction.class);

  @Rule public TestName name = new TestName();
  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  protected Configuration conf = UTIL.getConfiguration();

  private HRegion r = null;
  private TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor = null;
  private static final byte[] COLUMN_FAMILY = fam1;
  private final byte[] STARTROW = Bytes.toBytes(START_KEY);
  private static final byte[] COLUMN_FAMILY_TEXT = COLUMN_FAMILY;
  private int compactionThreshold;
  private byte[] secondRowBytes, thirdRowBytes;
  private static final long MAX_FILES_TO_COMPACT = 10;
  private final byte[] FAMILY = Bytes.toBytes("cf");
  /** constructor */
  public TestCompaction() {
    super();

    // Set cache flush size to 1MB
    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
    conf.setInt(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 100);
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
        NoLimitThroughputController.class.getName());
    compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);

    secondRowBytes = START_KEY_BYTES.clone();
    // Increment the least significant character so we get to next row.
    secondRowBytes[START_KEY_BYTES.length - 1]++;
    thirdRowBytes = START_KEY_BYTES.clone();
    thirdRowBytes[START_KEY_BYTES.length - 1] =
        (byte) (thirdRowBytes[START_KEY_BYTES.length - 1] + 2);
  }
  @Before
  public void setUp() throws Exception {
    this.tableDescriptor = UTIL.createModifyableTableDescriptor(name.getMethodName());
    if (name.getMethodName().equals("testCompactionSeqId")) {
      UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10");
      UTIL.getConfiguration().set(
          DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY,
          DummyCompactor.class.getName());
      ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor =
          new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(FAMILY);
      familyDescriptor.setMaxVersions(65536);
      this.tableDescriptor.setColumnFamily(familyDescriptor);
    }
    this.r = UTIL.createLocalHRegion(tableDescriptor, null, null);
  }
  @After
  public void tearDown() throws Exception {
    WAL wal = r.getWAL();
    this.r.close();
    wal.close();
  }
  /**
   * Verify that you can stop a long-running compaction (used during RS shutdown)
   * @throws Exception
   */
  @Test
  public void testInterruptCompaction() throws Exception {
    assertEquals(0, count());

    // lower the polling interval for this test
    int origWI = HStore.closeCheckInterval;
    HStore.closeCheckInterval = 10 * 1000; // 10 KB

    try {
      // Create a couple store files w/ 15KB (over 10KB interval)
      int jmax = (int) Math.ceil(15.0 / compactionThreshold);
      byte[] pad = new byte[1000]; // 1 KB chunk
      for (int i = 0; i < compactionThreshold; i++) {
        Table loader = new RegionAsTable(r);
        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
        p.setDurability(Durability.SKIP_WAL);
        for (int j = 0; j < jmax; j++) {
          p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
        }
        HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY));
        loader.put(p);
        r.flush(true);
      }
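
      // The spy below flips writestate.writesEnabled to false just before the real compaction
      // prep runs, mimicking a region close arriving mid-compaction; the compactor re-checks
      // that flag roughly every closeCheckInterval bytes written and should abort, which is
      // what the assertions after compactStores() verify.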
      HRegion spyR = spy(r);
      doAnswer(new Answer() {
        @Override
        public Object answer(InvocationOnMock invocation) throws Throwable {
          r.writestate.writesEnabled = false;
          return invocation.callRealMethod();
        }
      }).when(spyR).doRegionCompactionPrep();

      // force a minor compaction, but not before requesting a stop
      spyR.compactStores();

      // ensure that the compaction stopped, all old files are intact,
      HStore s = r.getStore(COLUMN_FAMILY);
      assertEquals(compactionThreshold, s.getStorefilesCount());
      assertTrue(s.getStorefilesSize() > 15 * 1000);
      // and no new store files persisted past compactStores()
      // only one empty dir exists in temp dir
      FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
      assertEquals(1, ls.length);
      Path storeTempDir =
          new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
      assertTrue(r.getFilesystem().exists(storeTempDir));
      ls = r.getFilesystem().listStatus(storeTempDir);
      assertEquals(0, ls.length);
    } finally {
      // don't mess up future tests
      r.writestate.writesEnabled = true;
      HStore.closeCheckInterval = origWI;

      // Delete all Store information once done using
      for (int i = 0; i < compactionThreshold; i++) {
        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
        byte[][] famAndQf = { COLUMN_FAMILY, null };
        delete.addFamily(famAndQf[0]);
        r.delete(delete);
      }
      r.flush(true);

      // Multiple versions allowed for an entry, so the delete isn't enough
      // Lower TTL and expire to ensure that all our entries have been wiped
      final int ttl = 1000;
      for (HStore store : this.r.stores.values()) {
        ScanInfo old = store.getScanInfo();
        ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells());
        store.setScanInfo(si);
      }
      Thread.sleep(ttl);

      r.compact(true);
      assertEquals(0, count());
    }
  }
  private int count() throws IOException {
    int count = 0;
    for (HStoreFile f : this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
      HFileScanner scanner = f.getReader().getScanner(false, false);
      if (!scanner.seekTo()) {
        continue;
      }
      do {
        count++;
      } while (scanner.next());
    }
    return count;
  }
  private void createStoreFile(final HRegion region) throws IOException {
    createStoreFile(region, Bytes.toString(COLUMN_FAMILY));
  }

  private void createStoreFile(final HRegion region, String family) throws IOException {
    Table loader = new RegionAsTable(region);
    HBaseTestCase.addContent(loader, family);
    region.flush(true);
  }
  @Test
  public void testCompactionWithCorruptResult() throws Exception {
    int nfiles = 10;
    for (int i = 0; i < nfiles; i++) {
      createStoreFile(r);
    }
    HStore store = r.getStore(COLUMN_FAMILY);

    Collection<HStoreFile> storeFiles = store.getStorefiles();
    DefaultCompactor tool = (DefaultCompactor) store.storeEngine.getCompactor();
    tool.compactForTesting(storeFiles, false);

    // Now let's corrupt the compacted file.
    FileSystem fs = store.getFileSystem();
    // default compaction policy created one and only one new compacted file
    Path dstPath = store.getRegionFileSystem().createTempName();
    FSDataOutputStream stream = fs.create(dstPath, null, true, 512, (short) 3, 1024L, null);
    stream.writeChars("CORRUPT FILE!!!!");
    stream.close();
    Path origPath = store.getRegionFileSystem().commitStoreFile(
        Bytes.toString(COLUMN_FAMILY), dstPath);
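
    // moveFileIntoPlace() opens the committed file as a store file before putting it into
    // use; the bytes written above are not a valid HFile, so the open should throw and the
    // corrupt file should never make it into the store.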
    try {
      ((HStore) store).moveFileIntoPlace(origPath);
    } catch (Exception e) {
      // Completing the compaction should fail and the corrupt file should remain
      // in the 'tmp' directory
      assertTrue(fs.exists(origPath));
      assertFalse(fs.exists(dstPath));
      System.out.println("testCompactionWithCorruptResult Passed");
      return;
    }
    fail("testCompactionWithCorruptResult failed since no exception was" +
        " thrown while completing a corrupt file");
  }
  /**
   * Create a custom compaction request and be sure that we can track it through the queue, knowing
   * when the compaction is completed.
   */
  @Test
  public void testTrackingCompactionRequest() throws Exception {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
      createStoreFile(r);
    }

    CountDownLatch latch = new CountDownLatch(1);
    Tracker tracker = new Tracker(latch);
    thread.requestCompaction(r, store, "test custom compaction", PRIORITY_USER, tracker, null);
    // wait for the latch to complete.
    latch.await();

    thread.interruptIfNecessary();
  }
  @Test
  public void testCompactionFailure() throws Exception {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
      createStoreFile(r);
    }

    HRegion mockRegion = Mockito.spy(r);
    Mockito.when(mockRegion.checkSplit()).thenThrow(new IndexOutOfBoundsException());
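
    // After a compaction finishes, CompactSplit asks the region whether it should now split;
    // the stubbed checkSplit() throws, which should be counted as a failed compaction in the
    // region metrics asserted below.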
    MetricsRegionWrapper metricsWrapper = new MetricsRegionWrapperImpl(r);

    long preCompletedCount = metricsWrapper.getNumCompactionsCompleted();
    long preFailedCount = metricsWrapper.getNumCompactionsFailed();

    CountDownLatch latch = new CountDownLatch(1);
    Tracker tracker = new Tracker(latch);
    thread.requestCompaction(mockRegion, store, "test custom compaction", PRIORITY_USER,
        tracker, null);
    // wait for the latch to complete.
    latch.await(120, TimeUnit.SECONDS);

    // compaction should have completed and been marked as failed due to error in split request
    long postCompletedCount = metricsWrapper.getNumCompactionsCompleted();
    long postFailedCount = metricsWrapper.getNumCompactionsFailed();

    assertTrue("Completed count should have increased (pre=" + preCompletedCount +
        ", post=" + postCompletedCount + ")",
        postCompletedCount > preCompletedCount);
    assertTrue("Failed count should have increased (pre=" + preFailedCount +
        ", post=" + postFailedCount + ")",
        postFailedCount > preFailedCount);
  }
  /**
   * Test that no new compaction requests are generated after compactions have been stopped.
   */
  @Test
  public void testStopStartCompaction() throws IOException {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    createStoreFile(r);
    for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
      createStoreFile(r);
    }
    thread.switchCompaction(false);
    thread.requestCompaction(r, store, "test", Store.PRIORITY_USER,
        CompactionLifeCycleTracker.DUMMY, null);
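    // While compactions are switched off the request above should not be scheduled, so both
    // the long and short compaction pools are expected to stay idle.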
    assertEquals(false, thread.isCompactionsEnabled());
    int longCompactions = thread.getLongCompactions().getActiveCount();
    int shortCompactions = thread.getShortCompactions().getActiveCount();
    assertEquals("longCompactions=" + longCompactions + "," +
        "shortCompactions=" + shortCompactions, 0, longCompactions + shortCompactions);
    thread.switchCompaction(true);
    assertEquals(true, thread.isCompactionsEnabled());
    thread.requestCompaction(r, store, "test", Store.PRIORITY_USER,
        CompactionLifeCycleTracker.DUMMY, null);
    longCompactions = thread.getLongCompactions().getActiveCount();
    shortCompactions = thread.getShortCompactions().getActiveCount();
    assertEquals("longCompactions=" + longCompactions + "," +
        "shortCompactions=" + shortCompactions, 1, longCompactions + shortCompactions);
  }
  @Test
  public void testInterruptingRunningCompactions() throws Exception {
    // setup a compact/split thread on a mock server
    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
        WaitThroughPutController.class.getName());
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);

    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    HStore store = r.getStore(COLUMN_FAMILY);
    int jmax = (int) Math.ceil(15.0 / compactionThreshold);
    byte[] pad = new byte[1000]; // 1 KB chunk
    for (int i = 0; i < compactionThreshold; i++) {
      Table loader = new RegionAsTable(r);
      Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
      p.setDurability(Durability.SKIP_WAL);
      for (int j = 0; j < jmax; j++) {
        p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
      }
      HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY));
      loader.put(p);
      r.flush(true);
    }
    HStore s = r.getStore(COLUMN_FAMILY);
    int initialFiles = s.getStorefilesCount();

    thread.requestCompaction(r, store, "test custom compaction", PRIORITY_USER,
        CompactionLifeCycleTracker.DUMMY, null);
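
    // The WaitThroughPutController configured above parks the compaction inside its
    // throughput-control call, so after the sleep the request is still running; switching
    // compactions off interrupts it and the store file count should be unchanged.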
    Thread.sleep(3000);
    thread.switchCompaction(false);
    assertEquals(initialFiles, s.getStorefilesCount());
    // don't mess up future tests
    thread.switchCompaction(true);
  }
  /**
   * HBASE-7947: Regression test to ensure adding to the correct list in the
   * {@link CompactSplit}
   * @throws Exception on failure
   */
  @Test
  public void testMultipleCustomCompactionRequests() throws Exception {
    // setup a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // setup a region/store with some files
    int numStores = r.getStores().size();
    CountDownLatch latch = new CountDownLatch(numStores);
    Tracker tracker = new Tracker(latch);
    // create some store files and setup requests for each store on which we want to do a
    // compaction
    for (HStore store : r.getStores()) {
      createStoreFile(r, store.getColumnFamilyName());
      createStoreFile(r, store.getColumnFamilyName());
      createStoreFile(r, store.getColumnFamilyName());
      thread.requestCompaction(r, store, "test multiple custom compactions", PRIORITY_USER,
          tracker, null);
    }
    // wait for the latch to complete.
    latch.await();

    thread.interruptIfNecessary();
  }
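
  /**
   * Mock store whose compaction contexts complete instantly: selection takes every
   * not-yet-compacting file, and compact() just records how many files were "merged" into the
   * shared results list. getPriority() reports 7 minus the store's total file count.
   */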
  class StoreMockMaker extends StatefulStoreMockMaker {
    public ArrayList<HStoreFile> compacting = new ArrayList<>();
    public ArrayList<HStoreFile> notCompacting = new ArrayList<>();
    private final ArrayList<Integer> results;

    public StoreMockMaker(ArrayList<Integer> results) {
      this.results = results;
    }

    public class TestCompactionContext extends CompactionContext {

      private List<HStoreFile> selectedFiles;

      public TestCompactionContext(List<HStoreFile> selectedFiles) {
        super();
        this.selectedFiles = selectedFiles;
      }

      @Override
      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
        return new ArrayList<>();
      }

      @Override
      public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction,
          boolean mayUseOffPeak, boolean forceMajor) throws IOException {
        this.request = new CompactionRequestImpl(selectedFiles);
        this.request.setPriority(getPriority());
        return true;
      }

      @Override
      public List<Path> compact(ThroughputController throughputController, User user)
          throws IOException {
        finishCompaction(this.selectedFiles);
        return new ArrayList<>();
      }
    }

    @Override
    public synchronized Optional<CompactionContext> selectCompaction() {
      CompactionContext ctx = new TestCompactionContext(new ArrayList<>(notCompacting));
      compacting.addAll(notCompacting);
      notCompacting.clear();
      try {
        ctx.select(null, false, false, false);
      } catch (IOException ex) {
        fail("Shouldn't happen");
      }
      return Optional.of(ctx);
    }

    @Override
    public synchronized void cancelCompaction(Object object) {
      TestCompactionContext ctx = (TestCompactionContext) object;
      compacting.removeAll(ctx.selectedFiles);
      notCompacting.addAll(ctx.selectedFiles);
    }

    public synchronized void finishCompaction(List<HStoreFile> sfs) {
      if (sfs.isEmpty()) {
        return;
      }
      synchronized (results) {
        results.add(sfs.size());
      }
      compacting.removeAll(sfs);
    }

    @Override
    public int getPriority() {
      return 7 - compacting.size() - notCompacting.size();
    }
  }
  public class BlockingStoreMockMaker extends StatefulStoreMockMaker {
    BlockingCompactionContext blocked = null;
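
    /**
     * Compaction context that parks inside compact() on its own monitor until unblock() is
     * called, letting a test hold a compaction thread at a known point while it rearranges
     * the compaction queue.
     */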
    public class BlockingCompactionContext extends CompactionContext {
      public volatile boolean isInCompact = false;

      public void unblock() {
        synchronized (this) {
          this.notifyAll();
        }
      }

      @Override
      public List<Path> compact(ThroughputController throughputController, User user)
          throws IOException {
        try {
          isInCompact = true;
          synchronized (this) {
            this.wait();
          }
        } catch (InterruptedException e) {
          Assume.assumeNoException(e);
        }
        return new ArrayList<>();
      }

      @Override
      public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
        return new ArrayList<>();
      }

      @Override
      public boolean select(List<HStoreFile> f, boolean i, boolean m, boolean e)
          throws IOException {
        this.request = new CompactionRequestImpl(new ArrayList<>());
        return true;
      }
    }

    @Override
    public Optional<CompactionContext> selectCompaction() {
      this.blocked = new BlockingCompactionContext();
      try {
        this.blocked.select(null, false, false, false);
      } catch (IOException ex) {
        fail("Shouldn't happen");
      }
      return Optional.of(blocked);
    }

    @Override
    public void cancelCompaction(Object object) {
    }

    @Override
    public int getPriority() {
      return Integer.MIN_VALUE; // some invalid value, see createStoreMock
    }

    public BlockingCompactionContext waitForBlocking() {
      while (this.blocked == null || !this.blocked.isInCompact) {
        Threads.sleepWithoutInterrupt(50);
      }
      BlockingCompactionContext ctx = this.blocked;
      this.blocked = null;
      return ctx;
    }

    @Override
    public HStore createStoreMock(String name) throws Exception {
      return createStoreMock(Integer.MIN_VALUE, name);
    }

    public HStore createStoreMock(int priority, String name) throws Exception {
      // Override the mock to always return the specified priority.
      HStore s = super.createStoreMock(name);
      when(s.getCompactPriority()).thenReturn(priority);
      return s;
    }
  }
  /** Test compaction priority management and multiple compactions per store (HBASE-8665). */
  @Test
  public void testCompactionQueuePriorities() throws Exception {
    // Setup a compact/split thread on a mock server.
    final Configuration conf = HBaseConfiguration.create();
    HRegionServer mockServer = mock(HRegionServer.class);
    when(mockServer.isStopped()).thenReturn(false);
    when(mockServer.getConfiguration()).thenReturn(conf);
    when(mockServer.getChoreService()).thenReturn(new ChoreService("test"));
    CompactSplit cst = new CompactSplit(mockServer);
    when(mockServer.getCompactSplitThread()).thenReturn(cst);
    // prevent the large compaction thread pool from stealing jobs from the small compaction queue
    cst.shutdownLongCompactions();
    // Set up the region mock that redirects compactions.
    HRegion r = mock(HRegion.class);
    when(r.compact(any(), any(), any(), any())).then(new Answer<Boolean>() {
      @Override
      public Boolean answer(InvocationOnMock invocation) throws Throwable {
        invocation.<CompactionContext>getArgument(0).compact(invocation.getArgument(2), null);
        return true;
      }
    });

    // Set up store mocks for 2 "real" stores and the one we use for blocking CST.
    ArrayList<Integer> results = new ArrayList<>();
    StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results);
    HStore store = sm.createStoreMock("store1");
    HStore store2 = sm2.createStoreMock("store2");
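    // Reminder: StoreMockMaker.getPriority() is 7 minus the store's file count, so the file
    // counts added below map directly onto the priorities named in the request labels.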
    BlockingStoreMockMaker blocker = new BlockingStoreMockMaker();

    // First, block the compaction thread so that we could muck with queue.
    cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1");
    BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking();

    // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively.
    for (int i = 0; i < 4; ++i) {
      sm.notCompacting.add(createFile());
    }
    cst.requestSystemCompaction(r, store, "s1-pri3");
    for (int i = 0; i < 3; ++i) {
      sm2.notCompacting.add(createFile());
    }
    cst.requestSystemCompaction(r, store2, "s2-pri4");
    // Now add 2 more files to store1 and queue compaction - pri 1.
    for (int i = 0; i < 2; ++i) {
      sm.notCompacting.add(createFile());
    }
    cst.requestSystemCompaction(r, store, "s1-pri1");
    // Finally add blocking compaction with priority 2.
    cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2");

    // Unblock the blocking compaction; we should run pri1 and become blocked again on pri2.
    currentBlock.unblock();
    currentBlock = blocker.waitForBlocking();
    // Pri1 should have "compacted" all 6 files.
    assertEquals(1, results.size());
    assertEquals(6, results.get(0).intValue());
    // Add 2 files to store 1 (it has 2 files now).
    for (int i = 0; i < 2; ++i) {
      sm.notCompacting.add(createFile());
    }
    // Now we have pri4 for store 2 in queue, and pri3 for store1; store1's current priority
    // is 5, however, so it must not preempt store 2. Add blocking compaction at the end.
    cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7");
    currentBlock.unblock();
    currentBlock = blocker.waitForBlocking();
    assertEquals(3, results.size());
    assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files.
    assertEquals(2, results.get(2).intValue());

    currentBlock.unblock();
    cst.interruptIfNecessary();
  }
  /**
   * First write 10 cells (with different timestamps) to a qualifier and flush to hfile1, then
   * write 10 cells (with different timestamps) to the same qualifier and flush to hfile2. The
   * latest cell (cell-A) in hfile1 and the oldest cell (cell-B) in hfile2 have the same
   * timestamp but different sequence ids, and will get scanned successively during compaction.
   * <p/>
   * We set compaction.kv.max to 10 so compaction will scan 10 versions each round, meanwhile we
   * set keepSeqIdPeriod=0 in {@link DummyCompactor} so all 10 versions of hfile2 will be written
   * out with seqId cleaned (set to 0) including cell-B; when the scanner then reaches cell-A it
   * would trigger a scan out-of-order assertion error before HBASE-16931.
   * @throws Exception if error occurs during the test
   */
  @Test
  public void testCompactionSeqId() throws Exception {
    final byte[] ROW = Bytes.toBytes("row");
    final byte[] QUALIFIER = Bytes.toBytes("qualifier");

    long timestamp = 10000;

    // row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9
    // row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8
    // row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7
    // row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6
    // row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5
    // row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4
    // row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3
    // row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2
    // row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1
    // row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0
    for (int i = 0; i < 10; i++) {
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
      r.put(put);
    }
    r.flush(true);

    // row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18
    // row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17
    // row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16
    // row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15
    // row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14
    // row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13
    // row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12
    // row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11
    // row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10
    // row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9
    for (int i = 18; i > 8; i--) {
      Put put = new Put(ROW);
      put.addColumn(FAMILY, QUALIFIER, timestamp + i, Bytes.toBytes("v" + i));
      r.put(put);
    }
    r.flush(true);
    r.compact(true);
  }
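
  /**
   * Compactor with keepSeqIdPeriod set to 0, so compactions write every cell out with its
   * sequence id cleaned (set to 0); see testCompactionSeqId and HBASE-16931.
   */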
  public static class DummyCompactor extends DefaultCompactor {
    public DummyCompactor(Configuration conf, HStore store) {
      super(conf, store);
      this.keepSeqIdPeriod = 0;
    }
  }
  private static HStoreFile createFile() throws Exception {
    HStoreFile sf = mock(HStoreFile.class);
    when(sf.getPath()).thenReturn(new Path("file"));
    StoreFileReader r = mock(StoreFileReader.class);
    when(r.length()).thenReturn(10L);
    when(sf.getReader()).thenReturn(r);
    return sf;
  }
  /**
   * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
   * finishes.
   */
  public static class Tracker implements CompactionLifeCycleTracker {

    private final CountDownLatch done;

    public Tracker(CountDownLatch done) {
      this.done = done;
    }

    @Override
    public void afterExecution(Store store) {
      done.countDown();
    }
  }
  /**
   * Simple {@link ThroughputController} that sleeps inside its control call for a very long
   * time, keeping any compaction that uses it in flight until it is interrupted.
   */
  public static class WaitThroughPutController extends NoLimitThroughputController {

    public WaitThroughPutController() {
    }

    @Override
    public long control(String compactionName, long size) throws InterruptedException {
      Thread.sleep(6000000);
      return 6000000;
    }
  }
}