HBASE-26412 Handle sink failure in RegionReplicationSink (#3815)
[hbase.git] / hbase-server / src / main / java / org / apache / hadoop / hbase / regionserver / CompactSplit.java
blobbb8c9b836616a1cc604ef369925217ccf8b203ce
1 /**
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org.apache.hadoop.hbase.regionserver;
21 import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY;
22 import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
24 import java.io.IOException;
25 import java.io.PrintWriter;
26 import java.io.StringWriter;
27 import java.util.Comparator;
28 import java.util.Iterator;
29 import java.util.Optional;
30 import java.util.Set;
31 import java.util.concurrent.BlockingQueue;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.RejectedExecutionException;
35 import java.util.concurrent.RejectedExecutionHandler;
36 import java.util.concurrent.ThreadPoolExecutor;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.function.IntSupplier;
40 import org.apache.hadoop.conf.Configuration;
41 import org.apache.hadoop.hbase.conf.ConfigurationManager;
42 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
43 import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
44 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
45 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
46 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
47 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
48 import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
49 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
50 import org.apache.hadoop.hbase.security.Superusers;
51 import org.apache.hadoop.hbase.security.User;
52 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
53 import org.apache.hadoop.hbase.util.StealJobQueue;
54 import org.apache.hadoop.ipc.RemoteException;
55 import org.apache.hadoop.util.StringUtils;
56 import org.apache.yetus.audience.InterfaceAudience;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
60 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
61 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
 * Compact region on request and then run split if appropriate.
 * <p>
 * Owns the region server's compaction thread pools (a "long" pool for large jobs and a "short"
 * pool for small ones) and the split thread pool, and routes incoming requests to the right pool.
 */
@InterfaceAudience.Private
public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver {
  private static final Logger LOG = LoggerFactory.getLogger(CompactSplit.class);

  // Configuration key for the large compaction threads.
  public final static String LARGE_COMPACTION_THREADS =
      "hbase.regionserver.thread.compaction.large";
  public final static int LARGE_COMPACTION_THREADS_DEFAULT = 1;

  // Configuration key for the small compaction threads.
  public final static String SMALL_COMPACTION_THREADS =
      "hbase.regionserver.thread.compaction.small";
  public final static int SMALL_COMPACTION_THREADS_DEFAULT = 1;

  // Configuration key for split threads
  public final static String SPLIT_THREADS = "hbase.regionserver.thread.split";
  public final static int SPLIT_THREADS_DEFAULT = 1;

  public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
      "hbase.regionserver.regionSplitLimit";
  public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000;
  public static final String HBASE_REGION_SERVER_ENABLE_COMPACTION =
      "hbase.regionserver.compaction.enabled";

  // null when constructed through the test-only constructor below
  private final HRegionServer server;
  private final Configuration conf;
  // volatile: the pools are replaced wholesale when compactions are switched off and back on
  private volatile ThreadPoolExecutor longCompactions;
  private volatile ThreadPoolExecutor shortCompactions;
  private volatile ThreadPoolExecutor splits;

  private volatile ThroughputController compactionThroughputController;
  // keys of stores ("<encodedRegion>:<family>") that have a compaction queued or running
  private volatile Set<String> underCompactionStores = ConcurrentHashMap.newKeySet();

  private volatile boolean compactionsEnabled;
  /**
   * Splitting should not take place if the total number of regions exceed this.
   * This is not a hard limit to the number of regions but it is a guideline to
   * stop splitting after number of online regions is greater than this.
   */
  private int regionSplitLimit;
  /**
   * Production constructor: reads configuration from the region server, builds both compaction
   * pools, the split pool, and the compaction throughput controller.
   */
  CompactSplit(HRegionServer server) {
    this.server = server;
    this.conf = server.getConfiguration();
    this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION,true);
    createCompactionExecutors();
    createSplitExcecutors();

    // compaction throughput controller
    this.compactionThroughputController =
        CompactionThroughputControllerFactory.create(server, conf);
  }

  // only for test
  // NOTE(review): unlike the production constructor, no throughput controller is created here;
  // tests that exercise compaction throttling must set one up themselves.
  public CompactSplit(Configuration conf) {
    this.server = null;
    this.conf = conf;
    this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION,true);
    createCompactionExecutors();
    createSplitExcecutors();
  }
  // Builds the split thread pool, sized by hbase.regionserver.thread.split.
  // (Name keeps the historical "Excecutors" typo; renaming would touch both constructors.)
  private void createSplitExcecutors() {
    // embed the creating thread's name so pools from different owners are distinguishable in dumps
    final String n = Thread.currentThread().getName();
    int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
    this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads,
      new ThreadFactoryBuilder().setNameFormat(n + "-splits-%d").setDaemon(true).build());
  }
  /**
   * (Re)creates the long and short compaction pools. The long pool's queue is a
   * {@link StealJobQueue}, so idle long-compaction threads can steal queued work from the short
   * pool's queue; the reverse is not possible.
   */
  private void createCompactionExecutors() {
    this.regionSplitLimit =
        conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);

    int largeThreads =
        Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
    int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);

    // if we have throttle threads, make sure the user also specified size
    Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);

    final String n = Thread.currentThread().getName();

    StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR);
    this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS,
        stealJobQueue, new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d")
            .setDaemon(true).build());
    this.longCompactions.setRejectedExecutionHandler(new Rejection());
    // prestart so the long pool's threads are already waiting on the steal queue
    this.longCompactions.prestartAllCoreThreads();
    this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS,
        stealJobQueue.getStealFromQueue(), new ThreadFactoryBuilder()
            .setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build());
    this.shortCompactions.setRejectedExecutionHandler(new Rejection());
  }
160 @Override
161 public String toString() {
162 return "compactionQueue=(longCompactions="
163 + longCompactions.getQueue().size() + ":shortCompactions="
164 + shortCompactions.getQueue().size() + ")"
165 + ", splitQueue=" + splits.getQueue().size();
168 public String dumpQueue() {
169 StringBuilder queueLists = new StringBuilder();
170 queueLists.append("Compaction/Split Queue dump:\n");
171 queueLists.append(" LargeCompation Queue:\n");
172 BlockingQueue<Runnable> lq = longCompactions.getQueue();
173 Iterator<Runnable> it = lq.iterator();
174 while (it.hasNext()) {
175 queueLists.append(" " + it.next().toString());
176 queueLists.append("\n");
179 if (shortCompactions != null) {
180 queueLists.append("\n");
181 queueLists.append(" SmallCompation Queue:\n");
182 lq = shortCompactions.getQueue();
183 it = lq.iterator();
184 while (it.hasNext()) {
185 queueLists.append(" " + it.next().toString());
186 queueLists.append("\n");
190 queueLists.append("\n");
191 queueLists.append(" Split Queue:\n");
192 lq = splits.getQueue();
193 it = lq.iterator();
194 while (it.hasNext()) {
195 queueLists.append(" " + it.next().toString());
196 queueLists.append("\n");
199 return queueLists.toString();
  /**
   * Requests a split of {@code r} if the server is under the region-count guideline and the
   * region's split policy yields a midkey.
   * @return true if a split was actually queued, false otherwise
   */
  public synchronized boolean requestSplit(final Region r) {
    // don't split regions that are blocking
    HRegion hr = (HRegion)r;
    try {
      if (shouldSplitRegion() && hr.getCompactPriority() >= PRIORITY_USER) {
        byte[] midKey = hr.checkSplit().orElse(null);
        if (midKey != null) {
          requestSplit(r, midKey);
          return true;
        }
      }
    } catch (IndexOutOfBoundsException e) {
      // We get this sometimes. Not sure why. Catch and return false; no split request.
      // NOTE(review): hr can only be null here if r itself was null, in which case the cast/calls
      // above would already have thrown NPE — the null checks below look purely defensive.
      LOG.warn("Catching out-of-bounds; region={}, policy={}", hr == null? null: hr.getRegionInfo(),
        hr == null? "null": hr.getCompactPriority(), e);
    }
    return false;
  }
  /** Queues a split at the given midkey with no explicit user identity. */
  public synchronized void requestSplit(final Region r, byte[] midKey) {
    requestSplit(r, midKey, null);
  }

  /*
   * The User parameter allows the split thread to assume the correct user identity
   */
  public synchronized void requestSplit(final Region r, byte[] midKey, User user) {
    // a null midkey means the region is not splittable right now; log and bail out
    if (midKey == null) {
      LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() +
        " not splittable because midkey=null");
      return;
    }
    try {
      this.splits.execute(new SplitRequest(r, midKey, this.server, user));
      if (LOG.isDebugEnabled()) {
        LOG.debug("Splitting " + r + ", " + this);
      }
    } catch (RejectedExecutionException ree) {
      // pool saturated or shut down; the split request is simply dropped
      LOG.info("Could not execute split for " + r, ree);
    }
  }
  // Hard-stops both compaction pools; used when the user switches compactions off.
  private void interrupt() {
    longCompactions.shutdownNow();
    shortCompactions.shutdownNow();
  }

  // Rebuilds the compaction pools after interrupt(), when compactions are switched back on.
  private void reInitializeCompactionsExecutors() {
    createCompactionExecutors();
  }
  // set protected for test
  protected interface CompactionCompleteTracker {

    /** Called once per store when its compaction finishes, fails, or is skipped. */
    default void completed(Store store) {
    }
  }

  // No-op tracker used when the caller does not care about completion.
  private static final CompactionCompleteTracker DUMMY_COMPLETE_TRACKER =
    new CompactionCompleteTracker() {};

  /**
   * Fans one {@link CompactionLifeCycleTracker} out over several stores: the wrapped tracker's
   * completed() is invoked exactly once, after the last store reports completion.
   */
  private static final class AggregatingCompleteTracker implements CompactionCompleteTracker {

    private final CompactionLifeCycleTracker tracker;

    // number of stores that have not yet completed
    private final AtomicInteger remaining;

    public AggregatingCompleteTracker(CompactionLifeCycleTracker tracker, int numberOfStores) {
      this.tracker = tracker;
      this.remaining = new AtomicInteger(numberOfStores);
    }

    @Override
    public void completed(Store store) {
      if (remaining.decrementAndGet() == 0) {
        tracker.completed();
      }
    }
  }
  /**
   * Wraps {@code tracker} in a completion tracker that covers
   * {@code numberOfStores.getAsInt()} stores.
   */
  private CompactionCompleteTracker getCompleteTracker(CompactionLifeCycleTracker tracker,
    IntSupplier numberOfStores) {
    if (tracker == CompactionLifeCycleTracker.DUMMY) {
      // a simple optimization to avoid creating unnecessary objects as usually we do not care about
      // the life cycle of a compaction.
      return DUMMY_COMPLETE_TRACKER;
    } else {
      return new AggregatingCompleteTracker(tracker, numberOfStores.getAsInt());
    }
  }
  /** Requests a user-driven compaction on every store of {@code region}. */
  @Override
  public synchronized void requestCompaction(HRegion region, String why, int priority,
      CompactionLifeCycleTracker tracker, User user) throws IOException {
    requestCompactionInternal(region, why, priority, true, tracker,
      getCompleteTracker(tracker, () -> region.getTableDescriptor().getColumnFamilyCount()), user);
  }

  /** Requests a user-driven compaction on a single store of {@code region}. */
  @Override
  public synchronized void requestCompaction(HRegion region, HStore store, String why, int priority,
      CompactionLifeCycleTracker tracker, User user) throws IOException {
    requestCompactionInternal(region, store, why, priority, true, tracker,
      getCompleteTracker(tracker, () -> 1), user);
  }
  /**
   * Turns compactions on or off. Switching off hard-stops the compaction pools; switching back on
   * re-creates them.
   */
  @Override
  public void switchCompaction(boolean onOrOff) {
    if (onOrOff) {
      // re-create executor pool if compactions are disabled.
      if (!isCompactionsEnabled()) {
        LOG.info("Re-Initializing compactions because user switched on compactions");
        reInitializeCompactionsExecutors();
      }
    } else {
      LOG.info("Interrupting running compactions because user switched off compactions");
      interrupt();
    }
    setCompactionsEnabled(onOrOff);
  }
  // Fans a region-level compaction request out to a per-store request for every store.
  private void requestCompactionInternal(HRegion region, String why, int priority,
      boolean selectNow, CompactionLifeCycleTracker tracker,
      CompactionCompleteTracker completeTracker, User user) throws IOException {
    // request compaction on all stores
    for (HStore store : region.stores.values()) {
      requestCompactionInternal(region, store, why, priority, selectNow, tracker, completeTracker,
        user);
    }
  }
  // set protected for test
  /**
   * Core request path for one store: applies space-quota policy, optionally selects files now
   * ({@code selectNow} — the user-compaction path) or defers selection to run time (the system
   * path), picks the long or short pool by job size, and enqueues a CompactionRunner.
   */
  protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
      boolean selectNow, CompactionLifeCycleTracker tracker,
      CompactionCompleteTracker completeTracker, User user) throws IOException {
    if (this.server.isStopped() || (region.getTableDescriptor() != null &&
        !region.getTableDescriptor().isCompactionEnabled())) {
      return;
    }
    RegionServerSpaceQuotaManager spaceQuotaManager =
      this.server.getRegionServerSpaceQuotaManager();

    if (user != null && !Superusers.isSuperUser(user) && spaceQuotaManager != null
        && spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName())) {
      // Enter here only when:
      // It's a user generated req, the user is not a super user, quotas are enabled, and
      // compactions are disabled for the table by an active space quota violation policy.
      String reason = "Ignoring compaction request for " + region +
          " as an active space quota violation " + " policy disallows compactions.";
      tracker.notExecuted(store, reason);
      completeTracker.completed(store);
      LOG.debug(reason);
      return;
    }

    CompactionContext compaction;
    if (selectNow) {
      Optional<CompactionContext> c =
        selectCompaction(region, store, priority, tracker, completeTracker, user);
      if (!c.isPresent()) {
        // message logged inside
        return;
      }
      compaction = c.get();
    } else {
      compaction = null;
    }

    ThreadPoolExecutor pool;
    if (selectNow) {
      // compaction.get is safe as we will just return if selectNow is true but no compaction is
      // selected
      pool = store.throttleCompaction(compaction.getRequest().getSize()) ? longCompactions
        : shortCompactions;
    } else {
      // We assume that most compactions are small. So, put system compactions into small
      // pool; we will do selection there, and move to large pool if necessary.
      pool = shortCompactions;
    }
    pool.execute(
      new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
    if (LOG.isDebugEnabled()) {
      LOG.debug("Add compact mark for store {}, priority={}, current under compaction "
          + "store size is {}", getStoreNameForUnderCompaction(store), priority,
        underCompactionStores.size());
    }
    // mark the store so requestSystemCompaction(..., giveUpIfRequestedOrCompacting) can skip it
    underCompactionStores.add(getStoreNameForUnderCompaction(store));
    region.incrementCompactionsQueuedCount();
    if (LOG.isDebugEnabled()) {
      String type = (pool == shortCompactions) ? "Small " : "Large ";
      LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
          + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
    }
  }
  /** Queues a system compaction for every store of the region; file selection is deferred. */
  public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException {
    requestCompactionInternal(region, why, NO_PRIORITY, false, CompactionLifeCycleTracker.DUMMY,
      DUMMY_COMPLETE_TRACKER, null);
  }

  /** Queues a system compaction for one store; file selection is deferred. */
  public void requestSystemCompaction(HRegion region, HStore store, String why)
      throws IOException {
    requestSystemCompaction(region, store, why, false);
  }

  /**
   * Queues a system compaction for one store.
   * @param giveUpIfRequestedOrCompacting if true, skip when the store already has a compaction
   *          queued or running
   */
  public synchronized void requestSystemCompaction(HRegion region, HStore store, String why,
      boolean giveUpIfRequestedOrCompacting) throws IOException {
    if (giveUpIfRequestedOrCompacting && isUnderCompaction(store)) {
      LOG.debug("Region {} store {} is under compaction now, skip to request compaction", region,
        store.getColumnFamilyName());
      return;
    }
    requestCompactionInternal(region, store, why, NO_PRIORITY, false,
      CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null);
  }
416 private Optional<CompactionContext> selectCompaction(HRegion region, HStore store, int priority,
417 CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker, User user)
418 throws IOException {
419 // don't even select for compaction if disableCompactions is set to true
420 if (!isCompactionsEnabled()) {
421 LOG.info(String.format("User has disabled compactions"));
422 return Optional.empty();
424 Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker, user);
425 if (!compaction.isPresent() && region.getRegionInfo() != null) {
426 String reason = "Not compacting " + region.getRegionInfo().getRegionNameAsString() +
427 " because compaction request was cancelled";
428 tracker.notExecuted(store, reason);
429 completeTracker.completed(store);
430 LOG.debug(reason);
432 return compaction;
  /**
   * Only interrupt once it's done with a run through the work loop.
   */
  void interruptIfNecessary() {
    // graceful shutdown: in-flight tasks finish, new submissions are rejected
    splits.shutdown();
    longCompactions.shutdown();
    shortCompactions.shutdown();
  }
  // Blocks until the pool terminates, escalating to shutdownNow() every 60 seconds.
  private void waitFor(ThreadPoolExecutor t, String name) {
    boolean done = false;
    while (!done) {
      try {
        done = t.awaitTermination(60, TimeUnit.SECONDS);
        LOG.info("Waiting for " + name + " to finish...");
        if (!done) {
          t.shutdownNow();
        }
      } catch (InterruptedException ie) {
        // NOTE(review): the interrupt status is deliberately NOT restored here — doing so would
        // make the next awaitTermination throw immediately and spin this loop. Confirm the
        // swallow-and-keep-waiting behavior is intended before "fixing" it.
        LOG.warn("Interrupted waiting for " + name + " to finish...");
        t.shutdownNow();
      }
    }
  }
  // Waits for all three pools to drain; part of region server shutdown.
  void join() {
    waitFor(splits, "Split Thread");
    waitFor(longCompactions, "Large Compaction Thread");
    waitFor(shortCompactions, "Small Compaction Thread");
  }
  /**
   * Returns the current size of the queue containing regions that are
   * processed.
   *
   * @return The current size of the regions queue.
   */
  public int getCompactionQueueSize() {
    return longCompactions.getQueue().size() + shortCompactions.getQueue().size();
  }

  /** @return number of jobs queued (not running) on the long-compaction pool */
  public int getLargeCompactionQueueSize() {
    return longCompactions.getQueue().size();
  }

  /** @return number of jobs queued (not running) on the short-compaction pool */
  public int getSmallCompactionQueueSize() {
    return shortCompactions.getQueue().size();
  }

  /** @return number of split requests queued (not running) */
  public int getSplitQueueSize() {
    return splits.getQueue().size();
  }
  // Guideline check: splitting is allowed only while the online-region count is below
  // regionSplitLimit; a warning is logged once the count passes 90% of the limit.
  private boolean shouldSplitRegion() {
    if(server.getNumberOfOnlineRegions() > 0.9*regionSplitLimit) {
      LOG.warn("Total number of regions is approaching the upper limit " + regionSplitLimit + ". "
          + "Please consider taking a look at http://hbase.apache.org/book.html#ops.regionmgt");
    }
    return (regionSplitLimit > server.getNumberOfOnlineRegions());
  }

  /**
   * @return the regionSplitLimit
   */
  public int getRegionSplitLimit() {
    return this.regionSplitLimit;
  }

  /**
   * Check if this store is under compaction
   */
  public boolean isUnderCompaction(final HStore s) {
    return underCompactionStores.contains(getStoreNameForUnderCompaction(s));
  }
511 private static final Comparator<Runnable> COMPARATOR =
512 new Comparator<Runnable>() {
514 private int compare(CompactionRequestImpl r1, CompactionRequestImpl r2) {
515 if (r1 == r2) {
516 return 0; //they are the same request
518 // less first
519 int cmp = Integer.compare(r1.getPriority(), r2.getPriority());
520 if (cmp != 0) {
521 return cmp;
523 cmp = Long.compare(r1.getSelectionTime(), r2.getSelectionTime());
524 if (cmp != 0) {
525 return cmp;
528 // break the tie based on hash code
529 return System.identityHashCode(r1) - System.identityHashCode(r2);
532 @Override
533 public int compare(Runnable r1, Runnable r2) {
534 // CompactionRunner first
535 if (r1 instanceof CompactionRunner) {
536 if (!(r2 instanceof CompactionRunner)) {
537 return -1;
539 } else {
540 if (r2 instanceof CompactionRunner) {
541 return 1;
542 } else {
543 // break the tie based on hash code
544 return System.identityHashCode(r1) - System.identityHashCode(r2);
547 CompactionRunner o1 = (CompactionRunner) r1;
548 CompactionRunner o2 = (CompactionRunner) r2;
549 // less first
550 int cmp = Integer.compare(o1.queuedPriority, o2.queuedPriority);
551 if (cmp != 0) {
552 return cmp;
554 CompactionContext c1 = o1.compaction;
555 CompactionContext c2 = o2.compaction;
556 if (c1 != null) {
557 return c2 != null ? compare(c1.getRequest(), c2.getRequest()) : -1;
558 } else {
559 return c2 != null ? 1 : 0;
  /**
   * Runnable executing one compaction for one store. For system compactions ({@code compaction ==
   * null}) the file selection is performed at run time, and the runner may re-enqueue itself when
   * the store's priority changed while queued or when the selected job belongs on the long pool.
   */
  private final class CompactionRunner implements Runnable {
    private final HStore store;
    private final HRegion region;
    // null means "select files when we run" (system compaction path)
    private final CompactionContext compaction;
    private final CompactionLifeCycleTracker tracker;
    private final CompactionCompleteTracker completeTracker;
    // refreshed (and compared against) on each run of the deferred-selection path
    private int queuedPriority;
    // pool this runner is queued on; may switch from shortCompactions to longCompactions
    private ThreadPoolExecutor parent;
    private User user;
    // enqueue timestamp, included in toString() for logging
    private long time;

    public CompactionRunner(HStore store, HRegion region, CompactionContext compaction,
        CompactionLifeCycleTracker tracker, CompactionCompleteTracker completeTracker,
        ThreadPoolExecutor parent, User user) {
      this.store = store;
      this.region = region;
      this.compaction = compaction;
      this.tracker = tracker;
      this.completeTracker = completeTracker;
      this.queuedPriority =
        compaction != null ? compaction.getRequest().getPriority() : store.getCompactPriority();
      this.parent = parent;
      this.user = user;
      this.time = EnvironmentEdgeManager.currentTime();
    }

    @Override
    public String toString() {
      if (compaction != null) {
        return "Request=" + compaction.getRequest();
      } else {
        return "region=" + region.toString() + ", storeName=" + store.toString() +
          ", priority=" + queuedPriority + ", startTime=" + time;
      }
    }

    // Performs selection (if deferred), possibly re-enqueues, then runs the compaction itself.
    private void doCompaction(User user) {
      CompactionContext c;
      // Common case - system compaction without a file selection. Select now.
      if (compaction == null) {
        int oldPriority = this.queuedPriority;
        this.queuedPriority = this.store.getCompactPriority();
        if (this.queuedPriority > oldPriority) {
          // Store priority decreased while we were in queue (due to some other compaction?),
          // requeue with new priority to avoid blocking potential higher priorities.
          this.parent.execute(this);
          return;
        }
        Optional<CompactionContext> selected;
        try {
          selected = selectCompaction(this.region, this.store, queuedPriority, tracker,
            completeTracker, user);
        } catch (IOException ex) {
          LOG.error("Compaction selection failed " + this, ex);
          server.checkFileSystem();
          region.decrementCompactionsQueuedCount();
          return;
        }
        if (!selected.isPresent()) {
          region.decrementCompactionsQueuedCount();
          return; // nothing to do
        }
        c = selected.get();
        assert c.hasSelection();
        // Now see if we are in correct pool for the size; if not, go to the correct one.
        // We might end up waiting for a while, so cancel the selection.

        ThreadPoolExecutor pool =
          store.throttleCompaction(c.getRequest().getSize()) ? longCompactions : shortCompactions;

        // Long compaction pool can process small job
        // Short compaction pool should not process large job
        if (this.parent == shortCompactions && pool == longCompactions) {
          this.store.cancelRequestedCompaction(c);
          this.parent = pool;
          this.parent.execute(this);
          return;
        }
      } else {
        c = compaction;
      }
      // Finally we can compact something.
      assert c != null;

      tracker.beforeExecution(store);
      try {
        // Note: please don't put single-compaction logic here;
        // put it into region/store/etc. This is CST logic.
        long start = EnvironmentEdgeManager.currentTime();
        boolean completed =
          region.compact(c, store, compactionThroughputController, user);
        long now = EnvironmentEdgeManager.currentTime();
        LOG.info(((completed) ? "Completed" : "Aborted") + " compaction " +
          this + "; duration=" + StringUtils.formatTimeDiff(now, start));
        if (completed) {
          // degenerate case: blocked regions require recursive enqueues
          if (store.getCompactPriority() <= 0) {
            requestSystemCompaction(region, store, "Recursive enqueue");
          } else {
            // see if the compaction has caused us to exceed max region size
            requestSplit(region);
          }
        }
      } catch (IOException ex) {
        // unwrap so the real server-side cause is what gets logged
        IOException remoteEx =
          ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex;
        LOG.error("Compaction failed " + this, remoteEx);
        if (remoteEx != ex) {
          LOG.info("Compaction failed at original callstack: " + formatStackTrace(ex));
        }
        region.reportCompactionRequestFailure();
        server.checkFileSystem();
      } catch (Exception ex) {
        LOG.error("Compaction failed " + this, ex);
        region.reportCompactionRequestFailure();
        server.checkFileSystem();
      } finally {
        tracker.afterExecution(store);
        completeTracker.completed(store);
        region.decrementCompactionsQueuedCount();
        LOG.debug("Status {}", CompactSplit.this);
      }
    }

    @Override
    public void run() {
      try {
        Preconditions.checkNotNull(server);
        if (server.isStopped() || (region.getTableDescriptor() != null &&
            !region.getTableDescriptor().isCompactionEnabled())) {
          region.decrementCompactionsQueuedCount();
          return;
        }
        doCompaction(user);
      } finally {
        // always clear the under-compaction mark, even when doCompaction threw
        if (LOG.isDebugEnabled()) {
          LOG.debug("Remove under compaction mark for store: {}",
            store.getHRegion().getRegionInfo().getEncodedName() + ":" + store
                .getColumnFamilyName());
        }
        underCompactionStores.remove(getStoreNameForUnderCompaction(store));
      }
    }
708 private String formatStackTrace(Exception ex) {
709 StringWriter sw = new StringWriter();
710 PrintWriter pw = new PrintWriter(sw);
711 ex.printStackTrace(pw);
712 pw.flush();
713 return sw.toString();
  /**
   * Cleanup class to use when rejecting a compaction request from the queue.
   */
  private static class Rejection implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
      if (runnable instanceof CompactionRunner) {
        CompactionRunner runner = (CompactionRunner) runnable;
        LOG.debug("Compaction Rejected: " + runner);
        // release the selected files so a later request can pick them up again
        if (runner.compaction != null) {
          runner.store.cancelRequestedCompaction(runner.compaction);
        }
      }
    }
  }
734 * {@inheritDoc}
736 @Override
737 public void onConfigurationChange(Configuration newConf) {
738 // Check if number of large / small compaction threads has changed, and then
739 // adjust the core pool size of the thread pools, by using the
740 // setCorePoolSize() method. According to the javadocs, it is safe to
741 // change the core pool size on-the-fly. We need to reset the maximum
742 // pool size, as well.
743 int largeThreads = Math.max(1, newConf.getInt(
744 LARGE_COMPACTION_THREADS,
745 LARGE_COMPACTION_THREADS_DEFAULT));
746 if (this.longCompactions.getCorePoolSize() != largeThreads) {
747 LOG.info("Changing the value of " + LARGE_COMPACTION_THREADS +
748 " from " + this.longCompactions.getCorePoolSize() + " to " +
749 largeThreads);
750 if(this.longCompactions.getCorePoolSize() < largeThreads) {
751 this.longCompactions.setMaximumPoolSize(largeThreads);
752 this.longCompactions.setCorePoolSize(largeThreads);
753 } else {
754 this.longCompactions.setCorePoolSize(largeThreads);
755 this.longCompactions.setMaximumPoolSize(largeThreads);
759 int smallThreads = newConf.getInt(SMALL_COMPACTION_THREADS,
760 SMALL_COMPACTION_THREADS_DEFAULT);
761 if (this.shortCompactions.getCorePoolSize() != smallThreads) {
762 LOG.info("Changing the value of " + SMALL_COMPACTION_THREADS +
763 " from " + this.shortCompactions.getCorePoolSize() + " to " +
764 smallThreads);
765 if(this.shortCompactions.getCorePoolSize() < smallThreads) {
766 this.shortCompactions.setMaximumPoolSize(smallThreads);
767 this.shortCompactions.setCorePoolSize(smallThreads);
768 } else {
769 this.shortCompactions.setCorePoolSize(smallThreads);
770 this.shortCompactions.setMaximumPoolSize(smallThreads);
774 int splitThreads = newConf.getInt(SPLIT_THREADS,
775 SPLIT_THREADS_DEFAULT);
776 if (this.splits.getCorePoolSize() != splitThreads) {
777 LOG.info("Changing the value of " + SPLIT_THREADS +
778 " from " + this.splits.getCorePoolSize() + " to " +
779 splitThreads);
780 if(this.splits.getCorePoolSize() < splitThreads) {
781 this.splits.setMaximumPoolSize(splitThreads);
782 this.splits.setCorePoolSize(splitThreads);
783 } else {
784 this.splits.setCorePoolSize(splitThreads);
785 this.splits.setMaximumPoolSize(splitThreads);
789 ThroughputController old = this.compactionThroughputController;
790 if (old != null) {
791 old.stop("configuration change");
793 this.compactionThroughputController =
794 CompactionThroughputControllerFactory.create(server, newConf);
796 // We change this atomically here instead of reloading the config in order that upstream
797 // would be the only one with the flexibility to reload the config.
798 this.conf.reloadConfiguration();
  // visible for tests
  protected int getSmallCompactionThreadNum() {
    return this.shortCompactions.getCorePoolSize();
  }

  // visible for tests
  protected int getLargeCompactionThreadNum() {
    return this.longCompactions.getCorePoolSize();
  }

  // visible for tests
  protected int getSplitThreadNum() {
    return this.splits.getCorePoolSize();
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void registerChildren(ConfigurationManager manager) {
    // No children to register.
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void deregisterChildren(ConfigurationManager manager) {
    // No children to register
  }

  public ThroughputController getCompactionThroughputController() {
    return compactionThroughputController;
  }
  /**
   * Shutdown the long compaction thread pool.
   * Should only be used in unit test to prevent long compaction thread pool from stealing job
   * from short compaction queue
   */
  void shutdownLongCompactions(){
    this.longCompactions.shutdown();
  }

  public void clearLongCompactionsQueue() {
    longCompactions.getQueue().clear();
  }

  public void clearShortCompactionsQueue() {
    shortCompactions.getQueue().clear();
  }

  public boolean isCompactionsEnabled() {
    return compactionsEnabled;
  }

  public void setCompactionsEnabled(boolean compactionsEnabled) {
    this.compactionsEnabled = compactionsEnabled;
    // keep the live Configuration in sync so config readers observe the current switch state
    this.conf.set(HBASE_REGION_SERVER_ENABLE_COMPACTION,String.valueOf(compactionsEnabled));
  }

  /**
   * @return the longCompactions thread pool executor
   */
  ThreadPoolExecutor getLongCompactions() {
    return longCompactions;
  }

  /**
   * @return the shortCompactions thread pool executor
   */
  ThreadPoolExecutor getShortCompactions() {
    return shortCompactions;
  }
873 private String getStoreNameForUnderCompaction(HStore store) {
874 return String.format("%s:%s",
875 store.getHRegion() != null ? store.getHRegion().getRegionInfo().getEncodedName() : "",
876 store.getColumnFamilyName());