3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.regionserver
;
21 import static org
.apache
.hadoop
.hbase
.regionserver
.Store
.NO_PRIORITY
;
22 import static org
.apache
.hadoop
.hbase
.regionserver
.Store
.PRIORITY_USER
;
24 import java
.io
.IOException
;
25 import java
.io
.PrintWriter
;
26 import java
.io
.StringWriter
;
27 import java
.util
.Comparator
;
28 import java
.util
.Iterator
;
29 import java
.util
.Optional
;
31 import java
.util
.concurrent
.BlockingQueue
;
32 import java
.util
.concurrent
.ConcurrentHashMap
;
33 import java
.util
.concurrent
.Executors
;
34 import java
.util
.concurrent
.RejectedExecutionException
;
35 import java
.util
.concurrent
.RejectedExecutionHandler
;
36 import java
.util
.concurrent
.ThreadPoolExecutor
;
37 import java
.util
.concurrent
.TimeUnit
;
38 import java
.util
.concurrent
.atomic
.AtomicInteger
;
39 import java
.util
.function
.IntSupplier
;
40 import org
.apache
.hadoop
.conf
.Configuration
;
41 import org
.apache
.hadoop
.hbase
.conf
.ConfigurationManager
;
42 import org
.apache
.hadoop
.hbase
.conf
.PropagatingConfigurationObserver
;
43 import org
.apache
.hadoop
.hbase
.quotas
.RegionServerSpaceQuotaManager
;
44 import org
.apache
.hadoop
.hbase
.regionserver
.compactions
.CompactionContext
;
45 import org
.apache
.hadoop
.hbase
.regionserver
.compactions
.CompactionLifeCycleTracker
;
46 import org
.apache
.hadoop
.hbase
.regionserver
.compactions
.CompactionRequestImpl
;
47 import org
.apache
.hadoop
.hbase
.regionserver
.compactions
.CompactionRequester
;
48 import org
.apache
.hadoop
.hbase
.regionserver
.throttle
.CompactionThroughputControllerFactory
;
49 import org
.apache
.hadoop
.hbase
.regionserver
.throttle
.ThroughputController
;
50 import org
.apache
.hadoop
.hbase
.security
.Superusers
;
51 import org
.apache
.hadoop
.hbase
.security
.User
;
52 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
53 import org
.apache
.hadoop
.hbase
.util
.StealJobQueue
;
54 import org
.apache
.hadoop
.ipc
.RemoteException
;
55 import org
.apache
.hadoop
.util
.StringUtils
;
56 import org
.apache
.yetus
.audience
.InterfaceAudience
;
57 import org
.slf4j
.Logger
;
58 import org
.slf4j
.LoggerFactory
;
60 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.base
.Preconditions
;
61 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.util
.concurrent
.ThreadFactoryBuilder
;
64 * Compact region on request and then run split if appropriate
66 @InterfaceAudience.Private
67 public class CompactSplit
implements CompactionRequester
, PropagatingConfigurationObserver
{
68 private static final Logger LOG
= LoggerFactory
.getLogger(CompactSplit
.class);
70 // Configuration key for the large compaction threads.
71 public final static String LARGE_COMPACTION_THREADS
=
72 "hbase.regionserver.thread.compaction.large";
73 public final static int LARGE_COMPACTION_THREADS_DEFAULT
= 1;
75 // Configuration key for the small compaction threads.
76 public final static String SMALL_COMPACTION_THREADS
=
77 "hbase.regionserver.thread.compaction.small";
78 public final static int SMALL_COMPACTION_THREADS_DEFAULT
= 1;
80 // Configuration key for split threads
81 public final static String SPLIT_THREADS
= "hbase.regionserver.thread.split";
82 public final static int SPLIT_THREADS_DEFAULT
= 1;
84 public static final String REGION_SERVER_REGION_SPLIT_LIMIT
=
85 "hbase.regionserver.regionSplitLimit";
86 public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT
= 1000;
87 public static final String HBASE_REGION_SERVER_ENABLE_COMPACTION
=
88 "hbase.regionserver.compaction.enabled";
90 private final HRegionServer server
;
91 private final Configuration conf
;
92 private volatile ThreadPoolExecutor longCompactions
;
93 private volatile ThreadPoolExecutor shortCompactions
;
94 private volatile ThreadPoolExecutor splits
;
96 private volatile ThroughputController compactionThroughputController
;
97 private volatile Set
<String
> underCompactionStores
= ConcurrentHashMap
.newKeySet();
99 private volatile boolean compactionsEnabled
;
101 * Splitting should not take place if the total number of regions exceed this.
102 * This is not a hard limit to the number of regions but it is a guideline to
103 * stop splitting after number of online regions is greater than this.
105 private int regionSplitLimit
;
107 CompactSplit(HRegionServer server
) {
108 this.server
= server
;
109 this.conf
= server
.getConfiguration();
110 this.compactionsEnabled
= this.conf
.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION
,true);
111 createCompactionExecutors();
112 createSplitExcecutors();
114 // compaction throughput controller
115 this.compactionThroughputController
=
116 CompactionThroughputControllerFactory
.create(server
, conf
);
120 public CompactSplit(Configuration conf
) {
123 this.compactionsEnabled
= this.conf
.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION
,true);
124 createCompactionExecutors();
125 createSplitExcecutors();
128 private void createSplitExcecutors() {
129 final String n
= Thread
.currentThread().getName();
130 int splitThreads
= conf
.getInt(SPLIT_THREADS
, SPLIT_THREADS_DEFAULT
);
131 this.splits
= (ThreadPoolExecutor
) Executors
.newFixedThreadPool(splitThreads
,
132 new ThreadFactoryBuilder().setNameFormat(n
+ "-splits-%d").setDaemon(true).build());
135 private void createCompactionExecutors() {
136 this.regionSplitLimit
=
137 conf
.getInt(REGION_SERVER_REGION_SPLIT_LIMIT
, DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT
);
140 Math
.max(1, conf
.getInt(LARGE_COMPACTION_THREADS
, LARGE_COMPACTION_THREADS_DEFAULT
));
141 int smallThreads
= conf
.getInt(SMALL_COMPACTION_THREADS
, SMALL_COMPACTION_THREADS_DEFAULT
);
143 // if we have throttle threads, make sure the user also specified size
144 Preconditions
.checkArgument(largeThreads
> 0 && smallThreads
> 0);
146 final String n
= Thread
.currentThread().getName();
148 StealJobQueue
<Runnable
> stealJobQueue
= new StealJobQueue
<Runnable
>(COMPARATOR
);
149 this.longCompactions
= new ThreadPoolExecutor(largeThreads
, largeThreads
, 60, TimeUnit
.SECONDS
,
150 stealJobQueue
, new ThreadFactoryBuilder().setNameFormat(n
+ "-longCompactions-%d")
151 .setDaemon(true).build());
152 this.longCompactions
.setRejectedExecutionHandler(new Rejection());
153 this.longCompactions
.prestartAllCoreThreads();
154 this.shortCompactions
= new ThreadPoolExecutor(smallThreads
, smallThreads
, 60, TimeUnit
.SECONDS
,
155 stealJobQueue
.getStealFromQueue(), new ThreadFactoryBuilder()
156 .setNameFormat(n
+ "-shortCompactions-%d").setDaemon(true).build());
157 this.shortCompactions
.setRejectedExecutionHandler(new Rejection());
161 public String
toString() {
162 return "compactionQueue=(longCompactions="
163 + longCompactions
.getQueue().size() + ":shortCompactions="
164 + shortCompactions
.getQueue().size() + ")"
165 + ", splitQueue=" + splits
.getQueue().size();
168 public String
dumpQueue() {
169 StringBuilder queueLists
= new StringBuilder();
170 queueLists
.append("Compaction/Split Queue dump:\n");
171 queueLists
.append(" LargeCompation Queue:\n");
172 BlockingQueue
<Runnable
> lq
= longCompactions
.getQueue();
173 Iterator
<Runnable
> it
= lq
.iterator();
174 while (it
.hasNext()) {
175 queueLists
.append(" " + it
.next().toString());
176 queueLists
.append("\n");
179 if (shortCompactions
!= null) {
180 queueLists
.append("\n");
181 queueLists
.append(" SmallCompation Queue:\n");
182 lq
= shortCompactions
.getQueue();
184 while (it
.hasNext()) {
185 queueLists
.append(" " + it
.next().toString());
186 queueLists
.append("\n");
190 queueLists
.append("\n");
191 queueLists
.append(" Split Queue:\n");
192 lq
= splits
.getQueue();
194 while (it
.hasNext()) {
195 queueLists
.append(" " + it
.next().toString());
196 queueLists
.append("\n");
199 return queueLists
.toString();
202 public synchronized boolean requestSplit(final Region r
) {
203 // don't split regions that are blocking
204 HRegion hr
= (HRegion
)r
;
206 if (shouldSplitRegion() && hr
.getCompactPriority() >= PRIORITY_USER
) {
207 byte[] midKey
= hr
.checkSplit().orElse(null);
208 if (midKey
!= null) {
209 requestSplit(r
, midKey
);
213 } catch (IndexOutOfBoundsException e
) {
214 // We get this sometimes. Not sure why. Catch and return false; no split request.
215 LOG
.warn("Catching out-of-bounds; region={}, policy={}", hr
== null?
null: hr
.getRegionInfo(),
216 hr
== null?
"null": hr
.getCompactPriority(), e
);
221 public synchronized void requestSplit(final Region r
, byte[] midKey
) {
222 requestSplit(r
, midKey
, null);
226 * The User parameter allows the split thread to assume the correct user identity
228 public synchronized void requestSplit(final Region r
, byte[] midKey
, User user
) {
229 if (midKey
== null) {
230 LOG
.debug("Region " + r
.getRegionInfo().getRegionNameAsString() +
231 " not splittable because midkey=null");
235 this.splits
.execute(new SplitRequest(r
, midKey
, this.server
, user
));
236 if (LOG
.isDebugEnabled()) {
237 LOG
.debug("Splitting " + r
+ ", " + this);
239 } catch (RejectedExecutionException ree
) {
240 LOG
.info("Could not execute split for " + r
, ree
);
244 private void interrupt() {
245 longCompactions
.shutdownNow();
246 shortCompactions
.shutdownNow();
249 private void reInitializeCompactionsExecutors() {
250 createCompactionExecutors();
253 // set protected for test
254 protected interface CompactionCompleteTracker
{
256 default void completed(Store store
) {
260 private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER
=
261 new CompactionCompleteTracker() {};
263 private static final class AggregatingCompleteTracker
implements CompactionCompleteTracker
{
265 private final CompactionLifeCycleTracker tracker
;
267 private final AtomicInteger remaining
;
269 public AggregatingCompleteTracker(CompactionLifeCycleTracker tracker
, int numberOfStores
) {
270 this.tracker
= tracker
;
271 this.remaining
= new AtomicInteger(numberOfStores
);
275 public void completed(Store store
) {
276 if (remaining
.decrementAndGet() == 0) {
282 private CompactionCompleteTracker
getCompleteTracker(CompactionLifeCycleTracker tracker
,
283 IntSupplier numberOfStores
) {
284 if (tracker
== CompactionLifeCycleTracker
.DUMMY
) {
285 // a simple optimization to avoid creating unnecessary objects as usually we do not care about
286 // the life cycle of a compaction.
287 return DUMMY_COMPLETE_TRACKER
;
289 return new AggregatingCompleteTracker(tracker
, numberOfStores
.getAsInt());
294 public synchronized void requestCompaction(HRegion region
, String why
, int priority
,
295 CompactionLifeCycleTracker tracker
, User user
) throws IOException
{
296 requestCompactionInternal(region
, why
, priority
, true, tracker
,
297 getCompleteTracker(tracker
, () -> region
.getTableDescriptor().getColumnFamilyCount()), user
);
301 public synchronized void requestCompaction(HRegion region
, HStore store
, String why
, int priority
,
302 CompactionLifeCycleTracker tracker
, User user
) throws IOException
{
303 requestCompactionInternal(region
, store
, why
, priority
, true, tracker
,
304 getCompleteTracker(tracker
, () -> 1), user
);
308 public void switchCompaction(boolean onOrOff
) {
310 // re-create executor pool if compactions are disabled.
311 if (!isCompactionsEnabled()) {
312 LOG
.info("Re-Initializing compactions because user switched on compactions");
313 reInitializeCompactionsExecutors();
316 LOG
.info("Interrupting running compactions because user switched off compactions");
319 setCompactionsEnabled(onOrOff
);
322 private void requestCompactionInternal(HRegion region
, String why
, int priority
,
323 boolean selectNow
, CompactionLifeCycleTracker tracker
,
324 CompactionCompleteTracker completeTracker
, User user
) throws IOException
{
325 // request compaction on all stores
326 for (HStore store
: region
.stores
.values()) {
327 requestCompactionInternal(region
, store
, why
, priority
, selectNow
, tracker
, completeTracker
,
332 // set protected for test
333 protected void requestCompactionInternal(HRegion region
, HStore store
, String why
, int priority
,
334 boolean selectNow
, CompactionLifeCycleTracker tracker
,
335 CompactionCompleteTracker completeTracker
, User user
) throws IOException
{
336 if (this.server
.isStopped() || (region
.getTableDescriptor() != null &&
337 !region
.getTableDescriptor().isCompactionEnabled())) {
340 RegionServerSpaceQuotaManager spaceQuotaManager
=
341 this.server
.getRegionServerSpaceQuotaManager();
343 if (user
!= null && !Superusers
.isSuperUser(user
) && spaceQuotaManager
!= null
344 && spaceQuotaManager
.areCompactionsDisabled(region
.getTableDescriptor().getTableName())) {
345 // Enter here only when:
346 // It's a user generated req, the user is super user, quotas enabled, compactions disabled.
347 String reason
= "Ignoring compaction request for " + region
+
348 " as an active space quota violation " + " policy disallows compactions.";
349 tracker
.notExecuted(store
, reason
);
350 completeTracker
.completed(store
);
355 CompactionContext compaction
;
357 Optional
<CompactionContext
> c
=
358 selectCompaction(region
, store
, priority
, tracker
, completeTracker
, user
);
359 if (!c
.isPresent()) {
360 // message logged inside
363 compaction
= c
.get();
368 ThreadPoolExecutor pool
;
370 // compaction.get is safe as we will just return if selectNow is true but no compaction is
372 pool
= store
.throttleCompaction(compaction
.getRequest().getSize()) ? longCompactions
375 // We assume that most compactions are small. So, put system compactions into small
376 // pool; we will do selection there, and move to large pool if necessary.
377 pool
= shortCompactions
;
380 new CompactionRunner(store
, region
, compaction
, tracker
, completeTracker
, pool
, user
));
381 if (LOG
.isDebugEnabled()) {
382 LOG
.debug("Add compact mark for store {}, priority={}, current under compaction "
383 + "store size is {}", getStoreNameForUnderCompaction(store
), priority
,
384 underCompactionStores
.size());
386 underCompactionStores
.add(getStoreNameForUnderCompaction(store
));
387 region
.incrementCompactionsQueuedCount();
388 if (LOG
.isDebugEnabled()) {
389 String type
= (pool
== shortCompactions
) ?
"Small " : "Large ";
390 LOG
.debug(type
+ "Compaction requested: " + (selectNow ? compaction
.toString() : "system")
391 + (why
!= null && !why
.isEmpty() ?
"; Because: " + why
: "") + "; " + this);
395 public synchronized void requestSystemCompaction(HRegion region
, String why
) throws IOException
{
396 requestCompactionInternal(region
, why
, NO_PRIORITY
, false, CompactionLifeCycleTracker
.DUMMY
,
397 DUMMY_COMPLETE_TRACKER
, null);
400 public void requestSystemCompaction(HRegion region
, HStore store
, String why
)
402 requestSystemCompaction(region
, store
, why
, false);
405 public synchronized void requestSystemCompaction(HRegion region
, HStore store
, String why
,
406 boolean giveUpIfRequestedOrCompacting
) throws IOException
{
407 if (giveUpIfRequestedOrCompacting
&& isUnderCompaction(store
)) {
408 LOG
.debug("Region {} store {} is under compaction now, skip to request compaction", region
,
409 store
.getColumnFamilyName());
412 requestCompactionInternal(region
, store
, why
, NO_PRIORITY
, false,
413 CompactionLifeCycleTracker
.DUMMY
, DUMMY_COMPLETE_TRACKER
, null);
416 private Optional
<CompactionContext
> selectCompaction(HRegion region
, HStore store
, int priority
,
417 CompactionLifeCycleTracker tracker
, CompactionCompleteTracker completeTracker
, User user
)
419 // don't even select for compaction if disableCompactions is set to true
420 if (!isCompactionsEnabled()) {
421 LOG
.info(String
.format("User has disabled compactions"));
422 return Optional
.empty();
424 Optional
<CompactionContext
> compaction
= store
.requestCompaction(priority
, tracker
, user
);
425 if (!compaction
.isPresent() && region
.getRegionInfo() != null) {
426 String reason
= "Not compacting " + region
.getRegionInfo().getRegionNameAsString() +
427 " because compaction request was cancelled";
428 tracker
.notExecuted(store
, reason
);
429 completeTracker
.completed(store
);
436 * Only interrupt once it's done with a run through the work loop.
438 void interruptIfNecessary() {
440 longCompactions
.shutdown();
441 shortCompactions
.shutdown();
444 private void waitFor(ThreadPoolExecutor t
, String name
) {
445 boolean done
= false;
448 done
= t
.awaitTermination(60, TimeUnit
.SECONDS
);
449 LOG
.info("Waiting for " + name
+ " to finish...");
453 } catch (InterruptedException ie
) {
454 LOG
.warn("Interrupted waiting for " + name
+ " to finish...");
461 waitFor(splits
, "Split Thread");
462 waitFor(longCompactions
, "Large Compaction Thread");
463 waitFor(shortCompactions
, "Small Compaction Thread");
467 * Returns the current size of the queue containing regions that are
470 * @return The current size of the regions queue.
472 public int getCompactionQueueSize() {
473 return longCompactions
.getQueue().size() + shortCompactions
.getQueue().size();
476 public int getLargeCompactionQueueSize() {
477 return longCompactions
.getQueue().size();
481 public int getSmallCompactionQueueSize() {
482 return shortCompactions
.getQueue().size();
485 public int getSplitQueueSize() {
486 return splits
.getQueue().size();
489 private boolean shouldSplitRegion() {
490 if(server
.getNumberOfOnlineRegions() > 0.9*regionSplitLimit
) {
491 LOG
.warn("Total number of regions is approaching the upper limit " + regionSplitLimit
+ ". "
492 + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
494 return (regionSplitLimit
> server
.getNumberOfOnlineRegions());
498 * @return the regionSplitLimit
500 public int getRegionSplitLimit() {
501 return this.regionSplitLimit
;
505 * Check if this store is under compaction
507 public boolean isUnderCompaction(final HStore s
) {
508 return underCompactionStores
.contains(getStoreNameForUnderCompaction(s
));
511 private static final Comparator
<Runnable
> COMPARATOR
=
512 new Comparator
<Runnable
>() {
514 private int compare(CompactionRequestImpl r1
, CompactionRequestImpl r2
) {
516 return 0; //they are the same request
519 int cmp
= Integer
.compare(r1
.getPriority(), r2
.getPriority());
523 cmp
= Long
.compare(r1
.getSelectionTime(), r2
.getSelectionTime());
528 // break the tie based on hash code
529 return System
.identityHashCode(r1
) - System
.identityHashCode(r2
);
533 public int compare(Runnable r1
, Runnable r2
) {
534 // CompactionRunner first
535 if (r1
instanceof CompactionRunner
) {
536 if (!(r2
instanceof CompactionRunner
)) {
540 if (r2
instanceof CompactionRunner
) {
543 // break the tie based on hash code
544 return System
.identityHashCode(r1
) - System
.identityHashCode(r2
);
547 CompactionRunner o1
= (CompactionRunner
) r1
;
548 CompactionRunner o2
= (CompactionRunner
) r2
;
550 int cmp
= Integer
.compare(o1
.queuedPriority
, o2
.queuedPriority
);
554 CompactionContext c1
= o1
.compaction
;
555 CompactionContext c2
= o2
.compaction
;
557 return c2
!= null ?
compare(c1
.getRequest(), c2
.getRequest()) : -1;
559 return c2
!= null ?
1 : 0;
564 private final class CompactionRunner
implements Runnable
{
565 private final HStore store
;
566 private final HRegion region
;
567 private final CompactionContext compaction
;
568 private final CompactionLifeCycleTracker tracker
;
569 private final CompactionCompleteTracker completeTracker
;
570 private int queuedPriority
;
571 private ThreadPoolExecutor parent
;
575 public CompactionRunner(HStore store
, HRegion region
, CompactionContext compaction
,
576 CompactionLifeCycleTracker tracker
, CompactionCompleteTracker completeTracker
,
577 ThreadPoolExecutor parent
, User user
) {
579 this.region
= region
;
580 this.compaction
= compaction
;
581 this.tracker
= tracker
;
582 this.completeTracker
= completeTracker
;
583 this.queuedPriority
=
584 compaction
!= null ? compaction
.getRequest().getPriority() : store
.getCompactPriority();
585 this.parent
= parent
;
587 this.time
= EnvironmentEdgeManager
.currentTime();
591 public String
toString() {
592 if (compaction
!= null) {
593 return "Request=" + compaction
.getRequest();
595 return "region=" + region
.toString() + ", storeName=" + store
.toString() +
596 ", priority=" + queuedPriority
+ ", startTime=" + time
;
600 private void doCompaction(User user
) {
602 // Common case - system compaction without a file selection. Select now.
603 if (compaction
== null) {
604 int oldPriority
= this.queuedPriority
;
605 this.queuedPriority
= this.store
.getCompactPriority();
606 if (this.queuedPriority
> oldPriority
) {
607 // Store priority decreased while we were in queue (due to some other compaction?),
608 // requeue with new priority to avoid blocking potential higher priorities.
609 this.parent
.execute(this);
612 Optional
<CompactionContext
> selected
;
614 selected
= selectCompaction(this.region
, this.store
, queuedPriority
, tracker
,
615 completeTracker
, user
);
616 } catch (IOException ex
) {
617 LOG
.error("Compaction selection failed " + this, ex
);
618 server
.checkFileSystem();
619 region
.decrementCompactionsQueuedCount();
622 if (!selected
.isPresent()) {
623 region
.decrementCompactionsQueuedCount();
624 return; // nothing to do
627 assert c
.hasSelection();
628 // Now see if we are in correct pool for the size; if not, go to the correct one.
629 // We might end up waiting for a while, so cancel the selection.
631 ThreadPoolExecutor pool
=
632 store
.throttleCompaction(c
.getRequest().getSize()) ? longCompactions
: shortCompactions
;
634 // Long compaction pool can process small job
635 // Short compaction pool should not process large job
636 if (this.parent
== shortCompactions
&& pool
== longCompactions
) {
637 this.store
.cancelRequestedCompaction(c
);
639 this.parent
.execute(this);
645 // Finally we can compact something.
648 tracker
.beforeExecution(store
);
650 // Note: please don't put single-compaction logic here;
651 // put it into region/store/etc. This is CST logic.
652 long start
= EnvironmentEdgeManager
.currentTime();
654 region
.compact(c
, store
, compactionThroughputController
, user
);
655 long now
= EnvironmentEdgeManager
.currentTime();
656 LOG
.info(((completed
) ?
"Completed" : "Aborted") + " compaction " +
657 this + "; duration=" + StringUtils
.formatTimeDiff(now
, start
));
659 // degenerate case: blocked regions require recursive enqueues
660 if (store
.getCompactPriority() <= 0) {
661 requestSystemCompaction(region
, store
, "Recursive enqueue");
663 // see if the compaction has caused us to exceed max region size
664 requestSplit(region
);
667 } catch (IOException ex
) {
668 IOException remoteEx
=
669 ex
instanceof RemoteException ?
((RemoteException
) ex
).unwrapRemoteException() : ex
;
670 LOG
.error("Compaction failed " + this, remoteEx
);
671 if (remoteEx
!= ex
) {
672 LOG
.info("Compaction failed at original callstack: " + formatStackTrace(ex
));
674 region
.reportCompactionRequestFailure();
675 server
.checkFileSystem();
676 } catch (Exception ex
) {
677 LOG
.error("Compaction failed " + this, ex
);
678 region
.reportCompactionRequestFailure();
679 server
.checkFileSystem();
681 tracker
.afterExecution(store
);
682 completeTracker
.completed(store
);
683 region
.decrementCompactionsQueuedCount();
684 LOG
.debug("Status {}", CompactSplit
.this);
691 Preconditions
.checkNotNull(server
);
692 if (server
.isStopped() || (region
.getTableDescriptor() != null &&
693 !region
.getTableDescriptor().isCompactionEnabled())) {
694 region
.decrementCompactionsQueuedCount();
699 if (LOG
.isDebugEnabled()) {
700 LOG
.debug("Remove under compaction mark for store: {}",
701 store
.getHRegion().getRegionInfo().getEncodedName() + ":" + store
702 .getColumnFamilyName());
704 underCompactionStores
.remove(getStoreNameForUnderCompaction(store
));
708 private String
formatStackTrace(Exception ex
) {
709 StringWriter sw
= new StringWriter();
710 PrintWriter pw
= new PrintWriter(sw
);
711 ex
.printStackTrace(pw
);
713 return sw
.toString();
718 * Cleanup class to use when rejecting a compaction request from the queue.
720 private static class Rejection
implements RejectedExecutionHandler
{
722 public void rejectedExecution(Runnable runnable
, ThreadPoolExecutor pool
) {
723 if (runnable
instanceof CompactionRunner
) {
724 CompactionRunner runner
= (CompactionRunner
) runnable
;
725 LOG
.debug("Compaction Rejected: " + runner
);
726 if (runner
.compaction
!= null) {
727 runner
.store
.cancelRequestedCompaction(runner
.compaction
);
737 public void onConfigurationChange(Configuration newConf
) {
738 // Check if number of large / small compaction threads has changed, and then
739 // adjust the core pool size of the thread pools, by using the
740 // setCorePoolSize() method. According to the javadocs, it is safe to
741 // change the core pool size on-the-fly. We need to reset the maximum
742 // pool size, as well.
743 int largeThreads
= Math
.max(1, newConf
.getInt(
744 LARGE_COMPACTION_THREADS
,
745 LARGE_COMPACTION_THREADS_DEFAULT
));
746 if (this.longCompactions
.getCorePoolSize() != largeThreads
) {
747 LOG
.info("Changing the value of " + LARGE_COMPACTION_THREADS
+
748 " from " + this.longCompactions
.getCorePoolSize() + " to " +
750 if(this.longCompactions
.getCorePoolSize() < largeThreads
) {
751 this.longCompactions
.setMaximumPoolSize(largeThreads
);
752 this.longCompactions
.setCorePoolSize(largeThreads
);
754 this.longCompactions
.setCorePoolSize(largeThreads
);
755 this.longCompactions
.setMaximumPoolSize(largeThreads
);
759 int smallThreads
= newConf
.getInt(SMALL_COMPACTION_THREADS
,
760 SMALL_COMPACTION_THREADS_DEFAULT
);
761 if (this.shortCompactions
.getCorePoolSize() != smallThreads
) {
762 LOG
.info("Changing the value of " + SMALL_COMPACTION_THREADS
+
763 " from " + this.shortCompactions
.getCorePoolSize() + " to " +
765 if(this.shortCompactions
.getCorePoolSize() < smallThreads
) {
766 this.shortCompactions
.setMaximumPoolSize(smallThreads
);
767 this.shortCompactions
.setCorePoolSize(smallThreads
);
769 this.shortCompactions
.setCorePoolSize(smallThreads
);
770 this.shortCompactions
.setMaximumPoolSize(smallThreads
);
774 int splitThreads
= newConf
.getInt(SPLIT_THREADS
,
775 SPLIT_THREADS_DEFAULT
);
776 if (this.splits
.getCorePoolSize() != splitThreads
) {
777 LOG
.info("Changing the value of " + SPLIT_THREADS
+
778 " from " + this.splits
.getCorePoolSize() + " to " +
780 if(this.splits
.getCorePoolSize() < splitThreads
) {
781 this.splits
.setMaximumPoolSize(splitThreads
);
782 this.splits
.setCorePoolSize(splitThreads
);
784 this.splits
.setCorePoolSize(splitThreads
);
785 this.splits
.setMaximumPoolSize(splitThreads
);
789 ThroughputController old
= this.compactionThroughputController
;
791 old
.stop("configuration change");
793 this.compactionThroughputController
=
794 CompactionThroughputControllerFactory
.create(server
, newConf
);
796 // We change this atomically here instead of reloading the config in order that upstream
797 // would be the only one with the flexibility to reload the config.
798 this.conf
.reloadConfiguration();
801 protected int getSmallCompactionThreadNum() {
802 return this.shortCompactions
.getCorePoolSize();
805 protected int getLargeCompactionThreadNum() {
806 return this.longCompactions
.getCorePoolSize();
809 protected int getSplitThreadNum() {
810 return this.splits
.getCorePoolSize();
817 public void registerChildren(ConfigurationManager manager
) {
818 // No children to register.
825 public void deregisterChildren(ConfigurationManager manager
) {
826 // No children to register
829 public ThroughputController
getCompactionThroughputController() {
830 return compactionThroughputController
;
834 * Shutdown the long compaction thread pool.
835 * Should only be used in unit test to prevent long compaction thread pool from stealing job
836 * from short compaction queue
838 void shutdownLongCompactions(){
839 this.longCompactions
.shutdown();
842 public void clearLongCompactionsQueue() {
843 longCompactions
.getQueue().clear();
846 public void clearShortCompactionsQueue() {
847 shortCompactions
.getQueue().clear();
850 public boolean isCompactionsEnabled() {
851 return compactionsEnabled
;
854 public void setCompactionsEnabled(boolean compactionsEnabled
) {
855 this.compactionsEnabled
= compactionsEnabled
;
856 this.conf
.set(HBASE_REGION_SERVER_ENABLE_COMPACTION
,String
.valueOf(compactionsEnabled
));
860 * @return the longCompactions thread pool executor
862 ThreadPoolExecutor
getLongCompactions() {
863 return longCompactions
;
867 * @return the shortCompactions thread pool executor
869 ThreadPoolExecutor
getShortCompactions() {
870 return shortCompactions
;
873 private String
getStoreNameForUnderCompaction(HStore store
) {
874 return String
.format("%s:%s",
875 store
.getHRegion() != null ? store
.getHRegion().getRegionInfo().getEncodedName() : "",
876 store
.getColumnFamilyName());