HBASE-24163 MOB compactor implementations should use format specifiers when calling...
[hbase.git] / hbase-common / src / main / java / org / apache / hadoop / hbase / ChoreService.java
blob9dbb307df40243072c3a7d0cb6d25a54bbdd01f5
1 /**
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org.apache.hadoop.hbase;
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.LinkedHashMap;
24 import java.util.Map.Entry;
25 import java.util.concurrent.ScheduledFuture;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.ThreadFactory;
28 import java.util.concurrent.atomic.AtomicInteger;
30 import org.apache.hadoop.hbase.ScheduledChore.ChoreServicer;
31 import org.apache.yetus.audience.InterfaceAudience;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
35 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
37 /**
38 * ChoreService is a service that can be used to schedule instances of {@link ScheduledChore} to run
39 * periodically while sharing threads. The ChoreService is backed by a
40 * {@link ScheduledThreadPoolExecutor} whose core pool size changes dynamically depending on the
41 * number of {@link ScheduledChore} scheduled. All of the threads in the core thread pool of the
42 * underlying {@link ScheduledThreadPoolExecutor} are set to be daemon threads.
43 * <p>
44 * The ChoreService provides the ability to schedule, cancel, and trigger instances of
45 * {@link ScheduledChore}. The ChoreService also provides the ability to check on the status of
46 * scheduled chores. The number of threads used by the ChoreService changes based on the scheduling
47 * load and whether or not the scheduled chores are executing on time. As more chores are scheduled,
48 * there may be a need to increase the number of threads if it is noticed that chores are no longer
49 * meeting their scheduled start times. On the other hand, as chores are cancelled, an attempt is
50 * made to reduce the number of running threads to see if chores can still meet their start times
51 * with a smaller thread pool.
52 * <p>
53 * When finished with a ChoreService it is good practice to call {@link ChoreService#shutdown()}.
54 * Calling this method ensures that all scheduled chores are cancelled and cleaned up properly.
56 @InterfaceAudience.Public
57 public class ChoreService implements ChoreServicer {
58 private static final Logger LOG = LoggerFactory.getLogger(ChoreService.class);
60 /**
61 * The minimum number of threads in the core pool of the underlying ScheduledThreadPoolExecutor
63 @InterfaceAudience.Private
64 public final static int MIN_CORE_POOL_SIZE = 1;
66 /**
67 * This thread pool is used to schedule all of the Chores
69 private final ScheduledThreadPoolExecutor scheduler;
71 /**
72 * Maps chores to their futures. Futures are used to control a chore's schedule
74 private final HashMap<ScheduledChore, ScheduledFuture<?>> scheduledChores;
76 /**
77 * Maps chores to Booleans which indicate whether or not a chore has caused an increase in the
78 * core pool size of the ScheduledThreadPoolExecutor. Each chore should only be allowed to
79 * increase the core pool size by 1 (otherwise a single long running chore whose execution is
80 * longer than its period would be able to spawn too many threads).
82 private final HashMap<ScheduledChore, Boolean> choresMissingStartTime;
84 /**
85 * The coreThreadPoolPrefix is the prefix that will be applied to all threads within the
86 * ScheduledThreadPoolExecutor. The prefix is typically related to the Server that the service is
87 * running on. The prefix is useful because it allows us to monitor how the thread pool of a
88 * particular service changes over time VIA thread dumps.
90 private final String coreThreadPoolPrefix;
92 /**
94 * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
95 * spawned by this service
97 @InterfaceAudience.Private
98 @VisibleForTesting
99 public ChoreService(final String coreThreadPoolPrefix) {
100 this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, false);
104 * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
105 * spawned by this service
106 * @param jitter Should chore service add some jitter for all of the scheduled chores. When set
107 * to true this will add -10% to 10% jitter.
109 public ChoreService(final String coreThreadPoolPrefix, final boolean jitter) {
110 this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, jitter);
114 * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
115 * spawned by this service
116 * @param corePoolSize The initial size to set the core pool of the ScheduledThreadPoolExecutor
117 * to during initialization. The default size is 1, but specifying a larger size may be
118 * beneficial if you know that 1 thread will not be enough.
119 * @param jitter Should chore service add some jitter for all of the scheduled chores. When set
120 * to true this will add -10% to 10% jitter.
122 public ChoreService(final String coreThreadPoolPrefix, int corePoolSize, boolean jitter) {
123 this.coreThreadPoolPrefix = coreThreadPoolPrefix;
124 if (corePoolSize < MIN_CORE_POOL_SIZE) {
125 corePoolSize = MIN_CORE_POOL_SIZE;
128 final ThreadFactory threadFactory = new ChoreServiceThreadFactory(coreThreadPoolPrefix);
129 if (jitter) {
130 scheduler = new JitterScheduledThreadPoolExecutorImpl(corePoolSize, threadFactory, 0.1);
131 } else {
132 scheduler = new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
135 scheduler.setRemoveOnCancelPolicy(true);
136 scheduledChores = new HashMap<>();
137 choresMissingStartTime = new HashMap<>();
141 * @param chore Chore to be scheduled. If the chore is already scheduled with another ChoreService
142 * instance, that schedule will be cancelled (i.e. a Chore can only ever be scheduled
143 * with a single ChoreService instance).
144 * @return true when the chore was successfully scheduled. false when the scheduling failed
145 * (typically occurs when a chore is scheduled during shutdown of service)
147 public synchronized boolean scheduleChore(ScheduledChore chore) {
148 if (chore == null) {
149 return false;
152 try {
153 if (chore.getPeriod() <= 0) {
154 LOG.info("Chore {} is disabled because its period is not positive.", chore);
155 return false;
157 LOG.info("Chore {} is enabled.", chore);
158 chore.setChoreServicer(this);
159 ScheduledFuture<?> future =
160 scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(), chore.getPeriod(),
161 chore.getTimeUnit());
162 scheduledChores.put(chore, future);
163 return true;
164 } catch (Exception exception) {
165 if (LOG.isInfoEnabled()) {
166 LOG.info("Could not successfully schedule chore: " + chore.getName());
168 return false;
173 * @param chore The Chore to be rescheduled. If the chore is not scheduled with this ChoreService
174 * yet then this call is equivalent to a call to scheduleChore.
176 private void rescheduleChore(ScheduledChore chore) {
177 if (scheduledChores.containsKey(chore)) {
178 ScheduledFuture<?> future = scheduledChores.get(chore);
179 future.cancel(false);
181 scheduleChore(chore);
184 @InterfaceAudience.Private
185 @Override
186 public synchronized void cancelChore(ScheduledChore chore) {
187 cancelChore(chore, true);
190 @InterfaceAudience.Private
191 @Override
192 public synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) {
193 if (chore != null && scheduledChores.containsKey(chore)) {
194 ScheduledFuture<?> future = scheduledChores.get(chore);
195 future.cancel(mayInterruptIfRunning);
196 scheduledChores.remove(chore);
198 // Removing a chore that was missing its start time means it may be possible
199 // to reduce the number of threads
200 if (choresMissingStartTime.containsKey(chore)) {
201 choresMissingStartTime.remove(chore);
202 requestCorePoolDecrease();
207 @InterfaceAudience.Private
208 @Override
209 public synchronized boolean isChoreScheduled(ScheduledChore chore) {
210 return chore != null && scheduledChores.containsKey(chore)
211 && !scheduledChores.get(chore).isDone();
214 @InterfaceAudience.Private
215 @Override
216 public synchronized boolean triggerNow(ScheduledChore chore) {
217 if (chore != null) {
218 rescheduleChore(chore);
219 return true;
221 return false;
225 * @return number of chores that this service currently has scheduled
227 int getNumberOfScheduledChores() {
228 return scheduledChores.size();
232 * @return number of chores that this service currently has scheduled that are missing their
233 * scheduled start time
235 int getNumberOfChoresMissingStartTime() {
236 return choresMissingStartTime.size();
240 * @return number of threads in the core pool of the underlying ScheduledThreadPoolExecutor
242 int getCorePoolSize() {
243 return scheduler.getCorePoolSize();
247 * Custom ThreadFactory used with the ScheduledThreadPoolExecutor so that all the threads are
248 * daemon threads, and thus, don't prevent the JVM from shutting down
250 static class ChoreServiceThreadFactory implements ThreadFactory {
251 private final String threadPrefix;
252 private final static String THREAD_NAME_SUFFIX = ".Chore.";
253 private AtomicInteger threadNumber = new AtomicInteger(1);
256 * @param threadPrefix The prefix given to all threads created by this factory
258 public ChoreServiceThreadFactory(final String threadPrefix) {
259 this.threadPrefix = threadPrefix;
262 @Override
263 public Thread newThread(Runnable r) {
264 Thread thread =
265 new Thread(r, threadPrefix + THREAD_NAME_SUFFIX + threadNumber.getAndIncrement());
266 thread.setDaemon(true);
267 return thread;
272 * Represents a request to increase the number of core pool threads. Typically a request
273 * originates from the fact that the current core pool size is not sufficient to service all of
274 * the currently running Chores
275 * @return true when the request to increase the core pool size succeeds
277 private synchronized boolean requestCorePoolIncrease() {
278 // There is no point in creating more threads than scheduledChores.size since scheduled runs
279 // of the same chore cannot run concurrently (i.e. happen-before behavior is enforced
280 // amongst occurrences of the same chore).
281 if (scheduler.getCorePoolSize() < scheduledChores.size()) {
282 scheduler.setCorePoolSize(scheduler.getCorePoolSize() + 1);
283 printChoreServiceDetails("requestCorePoolIncrease");
284 return true;
286 return false;
290 * Represents a request to decrease the number of core pool threads. Typically a request
291 * originates from the fact that the current core pool size is more than sufficient to service the
292 * running Chores.
294 private synchronized void requestCorePoolDecrease() {
295 if (scheduler.getCorePoolSize() > MIN_CORE_POOL_SIZE) {
296 scheduler.setCorePoolSize(scheduler.getCorePoolSize() - 1);
297 printChoreServiceDetails("requestCorePoolDecrease");
301 @InterfaceAudience.Private
302 @Override
303 public synchronized void onChoreMissedStartTime(ScheduledChore chore) {
304 if (chore == null || !scheduledChores.containsKey(chore)) return;
306 // If the chore has not caused an increase in the size of the core thread pool then request an
307 // increase. This allows each chore missing its start time to increase the core pool size by
308 // at most 1.
309 if (!choresMissingStartTime.containsKey(chore) || !choresMissingStartTime.get(chore)) {
310 choresMissingStartTime.put(chore, requestCorePoolIncrease());
313 // Must reschedule the chore to prevent unnecessary delays of chores in the scheduler. If
314 // the chore is NOT rescheduled, future executions of this chore will be delayed more and
315 // more on each iteration. This hurts us because the ScheduledThreadPoolExecutor allocates
316 // idle threads to chores based on how delayed they are.
317 rescheduleChore(chore);
318 printChoreDetails("onChoreMissedStartTime", chore);
322 * shutdown the service. Any chores that are scheduled for execution will be cancelled. Any chores
323 * in the middle of execution will be interrupted and shutdown. This service will be unusable
324 * after this method has been called (i.e. future scheduling attempts will fail).
326 public synchronized void shutdown() {
327 scheduler.shutdownNow();
328 if (LOG.isInfoEnabled()) {
329 LOG.info("Chore service for: " + coreThreadPoolPrefix + " had " + scheduledChores.keySet()
330 + " on shutdown");
332 cancelAllChores(true);
333 scheduledChores.clear();
334 choresMissingStartTime.clear();
338 * @return true when the service is shutdown and thus cannot be used anymore
340 public boolean isShutdown() {
341 return scheduler.isShutdown();
345 * @return true when the service is shutdown and all threads have terminated
347 public boolean isTerminated() {
348 return scheduler.isTerminated();
351 private void cancelAllChores(final boolean mayInterruptIfRunning) {
352 // Build list of chores to cancel so we can iterate through a set that won't change
353 // as chores are cancelled. If we tried to cancel each chore while iterating through
354 // keySet the results would be undefined because the keySet would be changing
355 ArrayList<ScheduledChore> choresToCancel = new ArrayList<>(scheduledChores.keySet());
357 for (ScheduledChore chore : choresToCancel) {
358 cancelChore(chore, mayInterruptIfRunning);
363 * Prints a summary of important details about the chore. Used for debugging purposes
365 private void printChoreDetails(final String header, ScheduledChore chore) {
366 LinkedHashMap<String, String> output = new LinkedHashMap<>();
367 output.put(header, "");
368 output.put("Chore name: ", chore.getName());
369 output.put("Chore period: ", Integer.toString(chore.getPeriod()));
370 output.put("Chore timeBetweenRuns: ", Long.toString(chore.getTimeBetweenRuns()));
372 for (Entry<String, String> entry : output.entrySet()) {
373 if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue());
378 * Prints a summary of important details about the service. Used for debugging purposes
380 private void printChoreServiceDetails(final String header) {
381 LinkedHashMap<String, String> output = new LinkedHashMap<>();
382 output.put(header, "");
383 output.put("ChoreService corePoolSize: ", Integer.toString(getCorePoolSize()));
384 output.put("ChoreService scheduledChores: ", Integer.toString(getNumberOfScheduledChores()));
385 output.put("ChoreService missingStartTimeCount: ",
386 Integer.toString(getNumberOfChoresMissingStartTime()));
388 for (Entry<String, String> entry : output.entrySet()) {
389 if (LOG.isTraceEnabled()) LOG.trace(entry.getKey() + entry.getValue());