2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.regionserver
;
20 import static org
.apache
.hadoop
.hbase
.HBaseTestingUtility
.START_KEY
;
21 import static org
.apache
.hadoop
.hbase
.HBaseTestingUtility
.START_KEY_BYTES
;
22 import static org
.apache
.hadoop
.hbase
.HBaseTestingUtility
.fam1
;
23 import static org
.apache
.hadoop
.hbase
.regionserver
.Store
.PRIORITY_USER
;
24 import static org
.junit
.Assert
.assertEquals
;
25 import static org
.junit
.Assert
.assertFalse
;
26 import static org
.junit
.Assert
.assertTrue
;
27 import static org
.junit
.Assert
.fail
;
28 import static org
.mockito
.Matchers
.any
;
29 import static org
.mockito
.Mockito
.doAnswer
;
30 import static org
.mockito
.Mockito
.mock
;
31 import static org
.mockito
.Mockito
.spy
;
32 import static org
.mockito
.Mockito
.when
;
34 import java
.io
.IOException
;
35 import java
.util
.ArrayList
;
36 import java
.util
.Collection
;
37 import java
.util
.List
;
38 import java
.util
.Optional
;
39 import java
.util
.concurrent
.CountDownLatch
;
40 import java
.util
.concurrent
.TimeUnit
;
41 import org
.apache
.hadoop
.conf
.Configuration
;
42 import org
.apache
.hadoop
.fs
.FSDataOutputStream
;
43 import org
.apache
.hadoop
.fs
.FileStatus
;
44 import org
.apache
.hadoop
.fs
.FileSystem
;
45 import org
.apache
.hadoop
.fs
.Path
;
46 import org
.apache
.hadoop
.hbase
.ChoreService
;
47 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
48 import org
.apache
.hadoop
.hbase
.HBaseConfiguration
;
49 import org
.apache
.hadoop
.hbase
.HBaseTestCase
;
50 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
51 import org
.apache
.hadoop
.hbase
.HConstants
;
52 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
53 import org
.apache
.hadoop
.hbase
.client
.Delete
;
54 import org
.apache
.hadoop
.hbase
.client
.Durability
;
55 import org
.apache
.hadoop
.hbase
.client
.Put
;
56 import org
.apache
.hadoop
.hbase
.client
.Table
;
57 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
58 import org
.apache
.hadoop
.hbase
.io
.hfile
.HFileScanner
;
59 import org
.apache
.hadoop
.hbase
.regionserver
.compactions
.CompactionContext
;
60 import org
.apache
.hadoop
.hbase
.regionserver
.compactions
.CompactionLifeCycleTracker
;
61 import org
.apache
.hadoop
.hbase
.regionserver
.compactions
.CompactionRequestImpl
;
62 import org
.apache
.hadoop
.hbase
.regionserver
.compactions
.DefaultCompactor
;
63 import org
.apache
.hadoop
.hbase
.regionserver
.throttle
.CompactionThroughputControllerFactory
;
64 import org
.apache
.hadoop
.hbase
.regionserver
.throttle
.NoLimitThroughputController
;
65 import org
.apache
.hadoop
.hbase
.regionserver
.throttle
.ThroughputController
;
66 import org
.apache
.hadoop
.hbase
.security
.User
;
67 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
68 import org
.apache
.hadoop
.hbase
.testclassification
.RegionServerTests
;
69 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
70 import org
.apache
.hadoop
.hbase
.util
.Threads
;
71 import org
.apache
.hadoop
.hbase
.wal
.WAL
;
72 import org
.junit
.After
;
73 import org
.junit
.Assume
;
74 import org
.junit
.Before
;
75 import org
.junit
.ClassRule
;
76 import org
.junit
.Rule
;
77 import org
.junit
.Test
;
78 import org
.junit
.experimental
.categories
.Category
;
79 import org
.junit
.rules
.TestName
;
80 import org
.mockito
.Mockito
;
81 import org
.mockito
.invocation
.InvocationOnMock
;
82 import org
.mockito
.stubbing
.Answer
;
85 * Test compaction framework and common functions
87 @Category({RegionServerTests
.class, MediumTests
.class})
88 public class TestCompaction
{
91 public static final HBaseClassTestRule CLASS_RULE
=
92 HBaseClassTestRule
.forClass(TestCompaction
.class);
94 @Rule public TestName name
= new TestName();
95 private static final HBaseTestingUtility UTIL
= new HBaseTestingUtility();
96 protected Configuration conf
= UTIL
.getConfiguration();
98 private HRegion r
= null;
99 private TableDescriptorBuilder
.ModifyableTableDescriptor tableDescriptor
= null;
100 private static final byte [] COLUMN_FAMILY
= fam1
;
101 private final byte [] STARTROW
= Bytes
.toBytes(START_KEY
);
102 private static final byte [] COLUMN_FAMILY_TEXT
= COLUMN_FAMILY
;
103 private int compactionThreshold
;
104 private byte[] secondRowBytes
, thirdRowBytes
;
105 private static final long MAX_FILES_TO_COMPACT
= 10;
106 private final byte[] FAMILY
= Bytes
.toBytes("cf");
109 public TestCompaction() {
112 // Set cache flush size to 1MB
113 conf
.setInt(HConstants
.HREGION_MEMSTORE_FLUSH_SIZE
, 1024 * 1024);
114 conf
.setInt(HConstants
.HREGION_MEMSTORE_BLOCK_MULTIPLIER
, 100);
115 conf
.set(CompactionThroughputControllerFactory
.HBASE_THROUGHPUT_CONTROLLER_KEY
,
116 NoLimitThroughputController
.class.getName());
117 compactionThreshold
= conf
.getInt("hbase.hstore.compactionThreshold", 3);
119 secondRowBytes
= START_KEY_BYTES
.clone();
120 // Increment the least significant character so we get to next row.
121 secondRowBytes
[START_KEY_BYTES
.length
- 1]++;
122 thirdRowBytes
= START_KEY_BYTES
.clone();
123 thirdRowBytes
[START_KEY_BYTES
.length
- 1] =
124 (byte) (thirdRowBytes
[START_KEY_BYTES
.length
- 1] + 2);
128 public void setUp() throws Exception
{
129 this.tableDescriptor
= UTIL
.createModifyableTableDescriptor(name
.getMethodName());
130 if (name
.getMethodName().equals("testCompactionSeqId")) {
131 UTIL
.getConfiguration().set("hbase.hstore.compaction.kv.max", "10");
132 UTIL
.getConfiguration().set(
133 DefaultStoreEngine
.DEFAULT_COMPACTOR_CLASS_KEY
,
134 DummyCompactor
.class.getName());
135 ColumnFamilyDescriptorBuilder
.ModifyableColumnFamilyDescriptor familyDescriptor
=
136 new ColumnFamilyDescriptorBuilder
.ModifyableColumnFamilyDescriptor(FAMILY
);
137 familyDescriptor
.setMaxVersions(65536);
138 this.tableDescriptor
.setColumnFamily(familyDescriptor
);
140 this.r
= UTIL
.createLocalHRegion(tableDescriptor
, null, null);
144 public void tearDown() throws Exception
{
145 WAL wal
= r
.getWAL();
151 * Verify that you can stop a long-running compaction
152 * (used during RS shutdown)
156 public void testInterruptCompaction() throws Exception
{
157 assertEquals(0, count());
159 // lower the polling interval for this test
160 int origWI
= HStore
.closeCheckInterval
;
161 HStore
.closeCheckInterval
= 10*1000; // 10 KB
164 // Create a couple store files w/ 15KB (over 10KB interval)
165 int jmax
= (int) Math
.ceil(15.0/compactionThreshold
);
166 byte [] pad
= new byte[1000]; // 1 KB chunk
167 for (int i
= 0; i
< compactionThreshold
; i
++) {
168 Table loader
= new RegionAsTable(r
);
169 Put p
= new Put(Bytes
.add(STARTROW
, Bytes
.toBytes(i
)));
170 p
.setDurability(Durability
.SKIP_WAL
);
171 for (int j
= 0; j
< jmax
; j
++) {
172 p
.addColumn(COLUMN_FAMILY
, Bytes
.toBytes(j
), pad
);
174 HBaseTestCase
.addContent(loader
, Bytes
.toString(COLUMN_FAMILY
));
179 HRegion spyR
= spy(r
);
180 doAnswer(new Answer() {
182 public Object
answer(InvocationOnMock invocation
) throws Throwable
{
183 r
.writestate
.writesEnabled
= false;
184 return invocation
.callRealMethod();
186 }).when(spyR
).doRegionCompactionPrep();
188 // force a minor compaction, but not before requesting a stop
189 spyR
.compactStores();
191 // ensure that the compaction stopped, all old files are intact,
192 HStore s
= r
.getStore(COLUMN_FAMILY
);
193 assertEquals(compactionThreshold
, s
.getStorefilesCount());
194 assertTrue(s
.getStorefilesSize() > 15*1000);
195 // and no new store files persisted past compactStores()
196 // only one empty dir exists in temp dir
197 FileStatus
[] ls
= r
.getFilesystem().listStatus(r
.getRegionFileSystem().getTempDir());
198 assertEquals(1, ls
.length
);
199 Path storeTempDir
= new Path(r
.getRegionFileSystem().getTempDir(), Bytes
.toString(COLUMN_FAMILY
));
200 assertTrue(r
.getFilesystem().exists(storeTempDir
));
201 ls
= r
.getFilesystem().listStatus(storeTempDir
);
202 assertEquals(0, ls
.length
);
204 // don't mess up future tests
205 r
.writestate
.writesEnabled
= true;
206 HStore
.closeCheckInterval
= origWI
;
208 // Delete all Store information once done using
209 for (int i
= 0; i
< compactionThreshold
; i
++) {
210 Delete delete
= new Delete(Bytes
.add(STARTROW
, Bytes
.toBytes(i
)));
211 byte [][] famAndQf
= {COLUMN_FAMILY
, null};
212 delete
.addFamily(famAndQf
[0]);
217 // Multiple versions allowed for an entry, so the delete isn't enough
218 // Lower TTL and expire to ensure that all our entries have been wiped
219 final int ttl
= 1000;
220 for (HStore store
: this.r
.stores
.values()) {
221 ScanInfo old
= store
.getScanInfo();
222 ScanInfo si
= old
.customize(old
.getMaxVersions(), ttl
, old
.getKeepDeletedCells());
223 store
.setScanInfo(si
);
228 assertEquals(0, count());
232 private int count() throws IOException
{
234 for (HStoreFile f
: this.r
.stores
.
235 get(COLUMN_FAMILY_TEXT
).getStorefiles()) {
236 HFileScanner scanner
= f
.getReader().getScanner(false, false);
237 if (!scanner
.seekTo()) {
242 } while(scanner
.next());
/**
 * Convenience overload that targets the default test column family.
 * Converts {@code COLUMN_FAMILY} to its String form with {@code Bytes.toString}
 * and forwards to {@code createStoreFile(HRegion, String)}.
 *
 * @param region the region handed on to the two-argument overload
 * @throws IOException propagated from the two-argument overload
 */
247 private void createStoreFile(final HRegion region
) throws IOException
{
248 createStoreFile(region
, Bytes
.toString(COLUMN_FAMILY
));
/**
 * Loads test content into the given column family of a region.
 * The region is wrapped in a {@code RegionAsTable} so it can be passed to
 * {@code HBaseTestCase.addContent} as a {@code Table}.
 *
 * @param region the region to load data into
 * @param family name of the column family passed to {@code addContent}
 * @throws IOException declared by this method; the visible calls are the only candidates
 */
251 private void createStoreFile(final HRegion region
, String family
) throws IOException
{
// Table-shaped view over the region, as required by addContent.
252 Table loader
= new RegionAsTable(region
);
253 HBaseTestCase
.addContent(loader
, family
);
258 public void testCompactionWithCorruptResult() throws Exception
{
260 for (int i
= 0; i
< nfiles
; i
++) {
263 HStore store
= r
.getStore(COLUMN_FAMILY
);
265 Collection
<HStoreFile
> storeFiles
= store
.getStorefiles();
266 DefaultCompactor tool
= (DefaultCompactor
)store
.storeEngine
.getCompactor();
267 tool
.compactForTesting(storeFiles
, false);
269 // Now lets corrupt the compacted file.
270 FileSystem fs
= store
.getFileSystem();
271 // default compaction policy created one and only one new compacted file
272 Path dstPath
= store
.getRegionFileSystem().createTempName();
273 FSDataOutputStream stream
= fs
.create(dstPath
, null, true, 512, (short)3, 1024L, null);
274 stream
.writeChars("CORRUPT FILE!!!!");
276 Path origPath
= store
.getRegionFileSystem().commitStoreFile(
277 Bytes
.toString(COLUMN_FAMILY
), dstPath
);
280 ((HStore
)store
).moveFileIntoPlace(origPath
);
281 } catch (Exception e
) {
282 // The complete compaction should fail and the corrupt file should remain
283 // in the 'tmp' directory;
284 assertTrue(fs
.exists(origPath
));
285 assertFalse(fs
.exists(dstPath
));
286 System
.out
.println("testCompactionWithCorruptResult Passed");
289 fail("testCompactionWithCorruptResult failed since no exception was " +
290 "thrown while completing a corrupt file"); // trailing space above keeps "was thrown" as two words in the message
294 * Create a custom compaction request and be sure that we can track it through the queue, knowing
295 * when the compaction is completed.
298 public void testTrackingCompactionRequest() throws Exception
{
299 // setup a compact/split thread on a mock server
300 HRegionServer mockServer
= Mockito
.mock(HRegionServer
.class);
301 Mockito
.when(mockServer
.getConfiguration()).thenReturn(r
.getBaseConf());
302 CompactSplit thread
= new CompactSplit(mockServer
);
303 Mockito
.when(mockServer
.getCompactSplitThread()).thenReturn(thread
);
305 // setup a region/store with some files
306 HStore store
= r
.getStore(COLUMN_FAMILY
);
308 for (int i
= 0; i
< MAX_FILES_TO_COMPACT
+ 1; i
++) {
312 CountDownLatch latch
= new CountDownLatch(1);
313 Tracker tracker
= new Tracker(latch
);
314 thread
.requestCompaction(r
, store
, "test custom comapction", PRIORITY_USER
, tracker
,
316 // wait for the latch to complete.
319 thread
.interruptIfNecessary();
323 public void testCompactionFailure() throws Exception
{
324 // setup a compact/split thread on a mock server
325 HRegionServer mockServer
= Mockito
.mock(HRegionServer
.class);
326 Mockito
.when(mockServer
.getConfiguration()).thenReturn(r
.getBaseConf());
327 CompactSplit thread
= new CompactSplit(mockServer
);
328 Mockito
.when(mockServer
.getCompactSplitThread()).thenReturn(thread
);
330 // setup a region/store with some files
331 HStore store
= r
.getStore(COLUMN_FAMILY
);
333 for (int i
= 0; i
< HStore
.DEFAULT_BLOCKING_STOREFILE_COUNT
- 1; i
++) {
337 HRegion mockRegion
= Mockito
.spy(r
);
338 Mockito
.when(mockRegion
.checkSplit()).thenThrow(new IndexOutOfBoundsException());
340 MetricsRegionWrapper metricsWrapper
= new MetricsRegionWrapperImpl(r
);
342 long preCompletedCount
= metricsWrapper
.getNumCompactionsCompleted();
343 long preFailedCount
= metricsWrapper
.getNumCompactionsFailed();
345 CountDownLatch latch
= new CountDownLatch(1);
346 Tracker tracker
= new Tracker(latch
);
347 thread
.requestCompaction(mockRegion
, store
, "test custom comapction", PRIORITY_USER
,
349 // wait for the latch to complete.
350 latch
.await(120, TimeUnit
.SECONDS
);
352 // compaction should have completed and been marked as failed due to error in split request
353 long postCompletedCount
= metricsWrapper
.getNumCompactionsCompleted();
354 long postFailedCount
= metricsWrapper
.getNumCompactionsFailed();
356 assertTrue("Completed count should have increased (pre=" + preCompletedCount
+
357 ", post="+postCompletedCount
+")",
358 postCompletedCount
> preCompletedCount
);
359 assertTrue("Failed count should have increased (pre=" + preFailedCount
+
360 ", post=" + postFailedCount
+ ")",
361 postFailedCount
> preFailedCount
);
365 * Test no new Compaction requests are generated after calling stop compactions
368 public void testStopStartCompaction() throws IOException
{
369 // setup a compact/split thread on a mock server
370 HRegionServer mockServer
= Mockito
.mock(HRegionServer
.class);
371 Mockito
.when(mockServer
.getConfiguration()).thenReturn(r
.getBaseConf());
372 CompactSplit thread
= new CompactSplit(mockServer
);
373 Mockito
.when(mockServer
.getCompactSplitThread()).thenReturn(thread
);
374 // setup a region/store with some files
375 HStore store
= r
.getStore(COLUMN_FAMILY
);
377 for (int i
= 0; i
< HStore
.DEFAULT_BLOCKING_STOREFILE_COUNT
- 1; i
++) {
380 thread
.switchCompaction(false);
381 thread
.requestCompaction(r
, store
, "test", Store
.PRIORITY_USER
,
382 CompactionLifeCycleTracker
.DUMMY
, null);
383 assertEquals(false, thread
.isCompactionsEnabled());
384 int longCompactions
= thread
.getLongCompactions().getActiveCount();
385 int shortCompactions
= thread
.getShortCompactions().getActiveCount();
386 assertEquals("longCompactions=" + longCompactions
+ "," +
387 "shortCompactions=" + shortCompactions
, 0, longCompactions
+ shortCompactions
);
388 thread
.switchCompaction(true);
389 assertEquals(true, thread
.isCompactionsEnabled());
390 thread
.requestCompaction(r
, store
, "test", Store
.PRIORITY_USER
,
391 CompactionLifeCycleTracker
.DUMMY
, null);
392 longCompactions
= thread
.getLongCompactions().getActiveCount();
393 shortCompactions
= thread
.getShortCompactions().getActiveCount();
394 assertEquals("longCompactions=" + longCompactions
+ "," +
395 "shortCompactions=" + shortCompactions
, 1, longCompactions
+ shortCompactions
);
398 @Test public void testInterruptingRunningCompactions() throws Exception
{
399 // setup a compact/split thread on a mock server
400 conf
.set(CompactionThroughputControllerFactory
.HBASE_THROUGHPUT_CONTROLLER_KEY
,
401 WaitThroughPutController
.class.getName());
402 HRegionServer mockServer
= Mockito
.mock(HRegionServer
.class);
403 Mockito
.when(mockServer
.getConfiguration()).thenReturn(r
.getBaseConf());
404 CompactSplit thread
= new CompactSplit(mockServer
);
406 Mockito
.when(mockServer
.getCompactSplitThread()).thenReturn(thread
);
408 // setup a region/store with some files
409 HStore store
= r
.getStore(COLUMN_FAMILY
);
410 int jmax
= (int) Math
.ceil(15.0 / compactionThreshold
);
411 byte[] pad
= new byte[1000]; // 1 KB chunk
412 for (int i
= 0; i
< compactionThreshold
; i
++) {
413 Table loader
= new RegionAsTable(r
);
414 Put p
= new Put(Bytes
.add(STARTROW
, Bytes
.toBytes(i
)));
415 p
.setDurability(Durability
.SKIP_WAL
);
416 for (int j
= 0; j
< jmax
; j
++) {
417 p
.addColumn(COLUMN_FAMILY
, Bytes
.toBytes(j
), pad
);
419 HBaseTestCase
.addContent(loader
, Bytes
.toString(COLUMN_FAMILY
));
423 HStore s
= r
.getStore(COLUMN_FAMILY
);
424 int initialFiles
= s
.getStorefilesCount();
426 thread
.requestCompaction(r
, store
, "test custom comapction", PRIORITY_USER
,
427 CompactionLifeCycleTracker
.DUMMY
, null);
430 thread
.switchCompaction(false);
431 assertEquals(initialFiles
, s
.getStorefilesCount());
432 //don't mess up future tests
433 thread
.switchCompaction(true);
437 * HBASE-7947: Regression test to ensure adding to the correct list in the
438 * {@link CompactSplit}
439 * @throws Exception on failure
442 public void testMultipleCustomCompactionRequests() throws Exception
{
443 // setup a compact/split thread on a mock server
444 HRegionServer mockServer
= Mockito
.mock(HRegionServer
.class);
445 Mockito
.when(mockServer
.getConfiguration()).thenReturn(r
.getBaseConf());
446 CompactSplit thread
= new CompactSplit(mockServer
);
447 Mockito
.when(mockServer
.getCompactSplitThread()).thenReturn(thread
);
449 // setup a region/store with some files
450 int numStores
= r
.getStores().size();
451 CountDownLatch latch
= new CountDownLatch(numStores
);
452 Tracker tracker
= new Tracker(latch
);
453 // create some store files and setup requests for each store on which we want to do a compaction
455 for (HStore store
: r
.getStores()) {
456 createStoreFile(r
, store
.getColumnFamilyName());
457 createStoreFile(r
, store
.getColumnFamilyName());
458 createStoreFile(r
, store
.getColumnFamilyName());
459 thread
.requestCompaction(r
, store
, "test mulitple custom comapctions", PRIORITY_USER
,
462 // wait for the latch to complete.
465 thread
.interruptIfNecessary();
468 class StoreMockMaker
extends StatefulStoreMockMaker
{
469 public ArrayList
<HStoreFile
> compacting
= new ArrayList
<>();
470 public ArrayList
<HStoreFile
> notCompacting
= new ArrayList
<>();
471 private final ArrayList
<Integer
> results
;
473 public StoreMockMaker(ArrayList
<Integer
> results
) {
474 this.results
= results
;
477 public class TestCompactionContext
extends CompactionContext
{
479 private List
<HStoreFile
> selectedFiles
;
481 public TestCompactionContext(List
<HStoreFile
> selectedFiles
) {
483 this.selectedFiles
= selectedFiles
;
487 public List
<HStoreFile
> preSelect(List
<HStoreFile
> filesCompacting
) {
488 return new ArrayList
<>();
492 public boolean select(List
<HStoreFile
> filesCompacting
, boolean isUserCompaction
,
493 boolean mayUseOffPeak
, boolean forceMajor
) throws IOException
{
494 this.request
= new CompactionRequestImpl(selectedFiles
);
495 this.request
.setPriority(getPriority());
500 public List
<Path
> compact(ThroughputController throughputController
, User user
)
502 finishCompaction(this.selectedFiles
);
503 return new ArrayList
<>();
508 public synchronized Optional
<CompactionContext
> selectCompaction() {
509 CompactionContext ctx
= new TestCompactionContext(new ArrayList
<>(notCompacting
));
510 compacting
.addAll(notCompacting
);
511 notCompacting
.clear();
513 ctx
.select(null, false, false, false);
514 } catch (IOException ex
) {
515 fail("Shouldn't happen");
517 return Optional
.of(ctx
);
/**
 * Undoes a selection: the files held by the cancelled context are taken out
 * of {@code compacting} and put back into {@code notCompacting}.
 *
 * @param object the context being cancelled; it is cast to
 *        {@code TestCompactionContext} without a type check
 */
521 public synchronized void cancelCompaction(Object object
) {
522 TestCompactionContext ctx
= (TestCompactionContext
)object
;
// Move the context's files from the "in progress" list back to the "pending" list.
523 compacting
.removeAll(ctx
.selectedFiles
);
524 notCompacting
.addAll(ctx
.selectedFiles
);
/**
 * Records a finished "compaction" of the given files.
 * An empty list is ignored. Otherwise the number of files is appended to the
 * {@code results} list (while holding that list's monitor) and the files are
 * removed from {@code compacting}.
 *
 * @param sfs the store files whose compaction has finished
 */
527 public synchronized void finishCompaction(List
<HStoreFile
> sfs
) {
// Nothing was selected, so there is nothing to record.
528 if (sfs
.isEmpty()) return;
// results is passed in through the constructor, so it is locked explicitly before being appended to.
529 synchronized (results
) {
530 results
.add(sfs
.size());
532 compacting
.removeAll(sfs
);
/**
 * Computes the mock store's priority from the number of files it tracks.
 *
 * @return 7 minus the combined size of {@code compacting} and {@code notCompacting},
 *         so the value gets smaller as more files are added
 */
536 public int getPriority() {
537 return 7 - compacting
.size() - notCompacting
.size();
541 public class BlockingStoreMockMaker
extends StatefulStoreMockMaker
{
542 BlockingCompactionContext blocked
= null;
544 public class BlockingCompactionContext
extends CompactionContext
{
545 public volatile boolean isInCompact
= false;
547 public void unblock() {
548 synchronized (this) {
554 public List
<Path
> compact(ThroughputController throughputController
, User user
)
558 synchronized (this) {
561 } catch (InterruptedException e
) {
562 Assume
.assumeNoException(e
);
564 return new ArrayList
<>();
568 public List
<HStoreFile
> preSelect(List
<HStoreFile
> filesCompacting
) {
569 return new ArrayList
<>();
573 public boolean select(List
<HStoreFile
> f
, boolean i
, boolean m
, boolean e
)
575 this.request
= new CompactionRequestImpl(new ArrayList
<>());
581 public Optional
<CompactionContext
> selectCompaction() {
582 this.blocked
= new BlockingCompactionContext();
584 this.blocked
.select(null, false, false, false);
585 } catch (IOException ex
) {
586 fail("Shouldn't happen");
588 return Optional
.of(blocked
);
/**
 * Intentionally a no-op: this mock maker does nothing when a compaction is cancelled.
 *
 * @param object the cancelled context; ignored
 */
592 public void cancelCompaction(Object object
) {}
/**
 * Returns a placeholder priority.
 *
 * @return always {@code Integer.MIN_VALUE}; see {@code createStoreMock(int, String)},
 *         which stubs {@code getCompactPriority()} on the mock it creates
 */
595 public int getPriority() {
596 return Integer
.MIN_VALUE
; // some invalid value, see createStoreMock
599 public BlockingCompactionContext
waitForBlocking() {
600 while (this.blocked
== null || !this.blocked
.isInCompact
) {
601 Threads
.sleepWithoutInterrupt(50);
603 BlockingCompactionContext ctx
= this.blocked
;
/**
 * Creates a store mock without an explicit priority by delegating to
 * {@code createStoreMock(int, String)} with {@code Integer.MIN_VALUE}.
 *
 * @param name name forwarded to the two-argument overload
 * @return whatever the two-argument overload returns
 * @throws Exception propagated from the two-argument overload
 */
609 public HStore
createStoreMock(String name
) throws Exception
{
610 return createStoreMock(Integer
.MIN_VALUE
, name
);
613 public HStore
createStoreMock(int priority
, String name
) throws Exception
{
614 // Override the mock to always return the specified priority.
615 HStore s
= super.createStoreMock(name
);
616 when(s
.getCompactPriority()).thenReturn(priority
);
621 /** Test compaction priority management and multiple compactions per store (HBASE-8665). */
623 public void testCompactionQueuePriorities() throws Exception
{
624 // Setup a compact/split thread on a mock server.
625 final Configuration conf
= HBaseConfiguration
.create();
626 HRegionServer mockServer
= mock(HRegionServer
.class);
627 when(mockServer
.isStopped()).thenReturn(false);
628 when(mockServer
.getConfiguration()).thenReturn(conf
);
629 when(mockServer
.getChoreService()).thenReturn(new ChoreService("test"));
630 CompactSplit cst
= new CompactSplit(mockServer
);
631 when(mockServer
.getCompactSplitThread()).thenReturn(cst
);
632 //prevent large compaction thread pool stealing job from small compaction queue.
633 cst
.shutdownLongCompactions();
634 // Set up the region mock that redirects compactions.
635 HRegion r
= mock(HRegion
.class);
637 r
.compact(any(), any(), any(), any())).then(new Answer
<Boolean
>() {
639 public Boolean
answer(InvocationOnMock invocation
) throws Throwable
{
640 invocation
.<CompactionContext
>getArgument(0).compact(invocation
.getArgument(2), null);
645 // Set up store mocks for 2 "real" stores and the one we use for blocking CST.
646 ArrayList
<Integer
> results
= new ArrayList
<>();
647 StoreMockMaker sm
= new StoreMockMaker(results
), sm2
= new StoreMockMaker(results
);
648 HStore store
= sm
.createStoreMock("store1");
649 HStore store2
= sm2
.createStoreMock("store2");
650 BlockingStoreMockMaker blocker
= new BlockingStoreMockMaker();
652 // First, block the compaction thread so that we could muck with queue.
653 cst
.requestSystemCompaction(r
, blocker
.createStoreMock(1, "b-pri1"), "b-pri1");
654 BlockingStoreMockMaker
.BlockingCompactionContext currentBlock
= blocker
.waitForBlocking();
656 // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively.
657 for (int i
= 0; i
< 4; ++i
) {
658 sm
.notCompacting
.add(createFile());
660 cst
.requestSystemCompaction(r
, store
, "s1-pri3");
661 for (int i
= 0; i
< 3; ++i
) {
662 sm2
.notCompacting
.add(createFile());
664 cst
.requestSystemCompaction(r
, store2
, "s2-pri4");
665 // Now add 2 more files to store1 and queue compaction - pri 1.
666 for (int i
= 0; i
< 2; ++i
) {
667 sm
.notCompacting
.add(createFile());
669 cst
.requestSystemCompaction(r
, store
, "s1-pri1");
670 // Finally add blocking compaction with priority 2.
671 cst
.requestSystemCompaction(r
, blocker
.createStoreMock(2, "b-pri2"), "b-pri2");
673 // Unblock the blocking compaction; we should run pri1 and become blocked again in pri2.
674 currentBlock
.unblock();
675 currentBlock
= blocker
.waitForBlocking();
676 // Pri1 should have "compacted" all 6 files.
677 assertEquals(1, results
.size());
678 assertEquals(6, results
.get(0).intValue());
679 // Add 2 files to store 1 (it has 2 files now).
680 for (int i
= 0; i
< 2; ++i
) {
681 sm
.notCompacting
.add(createFile());
683 // Now we have pri4 for store 2 in queue, and pri3 for store1; store1's current priority
684 // is 5, however, so it must not preempt store 2. Add blocking compaction at the end.
685 cst
.requestSystemCompaction(r
, blocker
.createStoreMock(7, "b-pri7"), "b-pri7");
686 currentBlock
.unblock();
687 currentBlock
= blocker
.waitForBlocking();
688 assertEquals(3, results
.size());
689 assertEquals(3, results
.get(1).intValue()); // 3 files should go before 2 files.
690 assertEquals(2, results
.get(2).intValue());
692 currentBlock
.unblock();
693 cst
.interruptIfNecessary();
697 * Firstly write 10 cells (with different time stamp) to a qualifier and flush
698 * to hfile1, then write 10 cells (with different time stamp) to the same
699 * qualifier and flush to hfile2. The latest cell (cell-A) in hfile1 and the
700 * oldest cell (cell-B) in hfile2 are with the same time stamp but different
701 * sequence id, and will get scanned successively during compaction.
703 * We set compaction.kv.max to 10 so compaction will scan 10 versions each
704 * round, meanwhile we set keepSeqIdPeriod=0 in {@link DummyCompactor} so all
705 * 10 versions of hfile2 will be written out with seqId cleaned (set to 0)
706 * including cell-B, then when scanner goes to cell-A it will cause a scan
707 * out-of-order assertion error before HBASE-16931
710 * if error occurs during the test
713 public void testCompactionSeqId() throws Exception
{
714 final byte[] ROW
= Bytes
.toBytes("row");
715 final byte[] QUALIFIER
= Bytes
.toBytes("qualifier");
717 long timestamp
= 10000;
719 // row1/cf:a/10009/Put/vlen=2/seqid=11 V: v9
720 // row1/cf:a/10008/Put/vlen=2/seqid=10 V: v8
721 // row1/cf:a/10007/Put/vlen=2/seqid=9 V: v7
722 // row1/cf:a/10006/Put/vlen=2/seqid=8 V: v6
723 // row1/cf:a/10005/Put/vlen=2/seqid=7 V: v5
724 // row1/cf:a/10004/Put/vlen=2/seqid=6 V: v4
725 // row1/cf:a/10003/Put/vlen=2/seqid=5 V: v3
726 // row1/cf:a/10002/Put/vlen=2/seqid=4 V: v2
727 // row1/cf:a/10001/Put/vlen=2/seqid=3 V: v1
728 // row1/cf:a/10000/Put/vlen=2/seqid=2 V: v0
729 for (int i
= 0; i
< 10; i
++) {
730 Put put
= new Put(ROW
);
731 put
.addColumn(FAMILY
, QUALIFIER
, timestamp
+ i
, Bytes
.toBytes("v" + i
));
736 // row1/cf:a/10018/Put/vlen=3/seqid=16 V: v18
737 // row1/cf:a/10017/Put/vlen=3/seqid=17 V: v17
738 // row1/cf:a/10016/Put/vlen=3/seqid=18 V: v16
739 // row1/cf:a/10015/Put/vlen=3/seqid=19 V: v15
740 // row1/cf:a/10014/Put/vlen=3/seqid=20 V: v14
741 // row1/cf:a/10013/Put/vlen=3/seqid=21 V: v13
742 // row1/cf:a/10012/Put/vlen=3/seqid=22 V: v12
743 // row1/cf:a/10011/Put/vlen=3/seqid=23 V: v11
744 // row1/cf:a/10010/Put/vlen=3/seqid=24 V: v10
745 // row1/cf:a/10009/Put/vlen=2/seqid=25 V: v9
746 for (int i
= 18; i
> 8; i
--) {
747 Put put
= new Put(ROW
);
748 put
.addColumn(FAMILY
, QUALIFIER
, timestamp
+ i
, Bytes
.toBytes("v" + i
));
755 public static class DummyCompactor
extends DefaultCompactor
{
756 public DummyCompactor(Configuration conf
, HStore store
) {
758 this.keepSeqIdPeriod
= 0;
762 private static HStoreFile
createFile() throws Exception
{
763 HStoreFile sf
= mock(HStoreFile
.class);
764 when(sf
.getPath()).thenReturn(new Path("file"));
765 StoreFileReader r
= mock(StoreFileReader
.class);
766 when(r
.length()).thenReturn(10L);
767 when(sf
.getReader()).thenReturn(r
);
772 * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction finishes.
775 public static class Tracker
implements CompactionLifeCycleTracker
{
777 private final CountDownLatch done
;
779 public Tracker(CountDownLatch done
) {
784 public void afterExecution(Store store
) {
790 * A {@link NoLimitThroughputController} whose {@code control} call sleeps for a very long time; testInterruptingRunningCompactions configures it as the throughput controller.
793 public static class WaitThroughPutController
extends NoLimitThroughputController
{
795 public WaitThroughPutController() {
799 public long control(String compactionName
, long size
) throws InterruptedException
{
800 Thread
.sleep(6000000);