HBASE-26921 Rewrite the counting cells part in TestMultiVersions (#4316)
[hbase.git] / hbase-common / src / main / java / org / apache / hadoop / hbase / ChoreService.java
blob8786c5af554f121cd2fa8b6e1a1205c5b4924044
1 /**
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org.apache.hadoop.hbase;
21 import com.google.errorprone.annotations.RestrictedApi;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.LinkedHashMap;
25 import java.util.Map.Entry;
26 import java.util.concurrent.ScheduledFuture;
27 import java.util.concurrent.ScheduledThreadPoolExecutor;
28 import java.util.concurrent.ThreadFactory;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import org.apache.yetus.audience.InterfaceAudience;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
34 /**
35 * ChoreService is a service that can be used to schedule instances of {@link ScheduledChore} to run
36 * periodically while sharing threads. The ChoreService is backed by a
37 * {@link ScheduledThreadPoolExecutor} whose core pool size changes dynamically depending on the
38 * number of {@link ScheduledChore} scheduled. All of the threads in the core thread pool of the
39 * underlying {@link ScheduledThreadPoolExecutor} are set to be daemon threads.
40 * <p>
41 * The ChoreService provides the ability to schedule, cancel, and trigger instances of
42 * {@link ScheduledChore}. The ChoreService also provides the ability to check on the status of
43 * scheduled chores. The number of threads used by the ChoreService changes based on the scheduling
44 * load and whether or not the scheduled chores are executing on time. As more chores are scheduled,
45 * there may be a need to increase the number of threads if it is noticed that chores are no longer
46 * meeting their scheduled start times. On the other hand, as chores are cancelled, an attempt is
47 * made to reduce the number of running threads to see if chores can still meet their start times
48 * with a smaller thread pool.
49 * <p>
50 * When finished with a ChoreService it is good practice to call {@link ChoreService#shutdown()}.
51 * Calling this method ensures that all scheduled chores are cancelled and cleaned up properly.
53 @InterfaceAudience.Private
54 public class ChoreService {
55 private static final Logger LOG = LoggerFactory.getLogger(ChoreService.class);
57 /**
58 * The minimum number of threads in the core pool of the underlying ScheduledThreadPoolExecutor
60 @InterfaceAudience.Private
61 public final static int MIN_CORE_POOL_SIZE = 1;
63 /**
64 * This thread pool is used to schedule all of the Chores
66 private final ScheduledThreadPoolExecutor scheduler;
68 /**
69 * Maps chores to their futures. Futures are used to control a chore's schedule
71 private final HashMap<ScheduledChore, ScheduledFuture<?>> scheduledChores;
73 /**
74 * Maps chores to Booleans which indicate whether or not a chore has caused an increase in the
75 * core pool size of the ScheduledThreadPoolExecutor. Each chore should only be allowed to
76 * increase the core pool size by 1 (otherwise a single long running chore whose execution is
77 * longer than its period would be able to spawn too many threads).
79 private final HashMap<ScheduledChore, Boolean> choresMissingStartTime;
81 /**
82 * The coreThreadPoolPrefix is the prefix that will be applied to all threads within the
83 * ScheduledThreadPoolExecutor. The prefix is typically related to the Server that the service is
84 * running on. The prefix is useful because it allows us to monitor how the thread pool of a
85 * particular service changes over time VIA thread dumps.
87 private final String coreThreadPoolPrefix;
/**
 * Creates a service that starts with {@link #MIN_CORE_POOL_SIZE} core threads and applies no
 * jitter to its chores.
 * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
 *                             spawned by this service
 */
@InterfaceAudience.Private
public ChoreService(final String coreThreadPoolPrefix) {
  this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, false);
}
/**
 * Creates a service that starts with {@link #MIN_CORE_POOL_SIZE} core threads.
 * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
 *                             spawned by this service
 * @param jitter               Should chore service add some jitter for all of the scheduled
 *                             chores. When set to true this will add -10% to 10% jitter.
 */
public ChoreService(final String coreThreadPoolPrefix, final boolean jitter) {
  this(coreThreadPoolPrefix, MIN_CORE_POOL_SIZE, jitter);
}
/**
 * Creates a service with an explicit initial core pool size.
 * @param coreThreadPoolPrefix Prefix that will be applied to the Thread name of all threads
 *                             spawned by this service
 * @param corePoolSize         The initial size to set the core pool of the
 *                             ScheduledThreadPoolExecutor to during initialization. Values below
 *                             {@link #MIN_CORE_POOL_SIZE} are raised to that minimum. Specifying
 *                             a larger size may be beneficial if you know that 1 thread will not
 *                             be enough.
 * @param jitter               Should chore service add some jitter for all of the scheduled
 *                             chores. When set to true this will add -10% to 10% jitter.
 */
public ChoreService(final String coreThreadPoolPrefix, int corePoolSize, boolean jitter) {
  this.coreThreadPoolPrefix = coreThreadPoolPrefix;

  // Never start below the minimum, whatever the caller asked for.
  final int initialPoolSize = Math.max(corePoolSize, MIN_CORE_POOL_SIZE);
  final ThreadFactory daemonThreads = new ChoreServiceThreadFactory(coreThreadPoolPrefix);

  scheduler = jitter
    ? new JitterScheduledThreadPoolExecutorImpl(initialPoolSize, daemonThreads, 0.1)
    : new ScheduledThreadPoolExecutor(initialPoolSize, daemonThreads);
  // Cancelled tasks are removed from the executor's work queue at cancellation time.
  scheduler.setRemoveOnCancelPolicy(true);

  scheduledChores = new HashMap<>();
  choresMissingStartTime = new HashMap<>();
}
137 * @param chore Chore to be scheduled. If the chore is already scheduled with another ChoreService
138 * instance, that schedule will be cancelled (i.e. a Chore can only ever be scheduled
139 * with a single ChoreService instance).
140 * @return true when the chore was successfully scheduled. false when the scheduling failed
141 * (typically occurs when a chore is scheduled during shutdown of service)
143 public boolean scheduleChore(ScheduledChore chore) {
144 if (chore == null) {
145 return false;
147 // always lock chore first to prevent dead lock
148 synchronized (chore) {
149 synchronized (this) {
150 try {
151 // Chores should only ever be scheduled with a single ChoreService. If the choreService
152 // is changing, cancel any existing schedules of this chore.
153 if (chore.getChoreService() == this) {
154 LOG.warn("Chore {} has already been scheduled with us", chore);
155 return false;
157 if (chore.getPeriod() <= 0) {
158 LOG.info("Chore {} is disabled because its period is not positive.", chore);
159 return false;
161 LOG.info("Chore {} is enabled.", chore);
162 if (chore.getChoreService() != null) {
163 LOG.info("Cancel chore {} from its previous service", chore);
164 chore.getChoreService().cancelChore(chore);
166 chore.setChoreService(this);
167 ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(),
168 chore.getPeriod(), chore.getTimeUnit());
169 scheduledChores.put(chore, future);
170 return true;
171 } catch (Exception e) {
172 LOG.error("Could not successfully schedule chore: {}", chore.getName(), e);
173 return false;
180 * @param chore The Chore to be rescheduled. If the chore is not scheduled with this ChoreService
181 * yet then this call is equivalent to a call to scheduleChore.
183 private void rescheduleChore(ScheduledChore chore) {
184 if (scheduledChores.containsKey(chore)) {
185 ScheduledFuture<?> future = scheduledChores.get(chore);
186 future.cancel(false);
188 ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(),
189 chore.getPeriod(), chore.getTimeUnit());
190 scheduledChores.put(chore, future);
194 * Cancel any ongoing schedules that this chore has with the implementer of this interface.
195 * <p/>
196 * Call {@link ScheduledChore#cancel()} to cancel a {@link ScheduledChore}, in
197 * {@link ScheduledChore#cancel()} method we will call this method to remove the
198 * {@link ScheduledChore} from this {@link ChoreService}.
200 @RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "",
201 allowedOnPath = ".*/org/apache/hadoop/hbase/(ScheduledChore|ChoreService).java")
202 synchronized void cancelChore(ScheduledChore chore) {
203 cancelChore(chore, true);
207 * Cancel any ongoing schedules that this chore has with the implementer of this interface.
208 * <p/>
209 * Call {@link ScheduledChore#cancel(boolean)} to cancel a {@link ScheduledChore}, in
210 * {@link ScheduledChore#cancel(boolean)} method we will call this method to remove the
211 * {@link ScheduledChore} from this {@link ChoreService}.
213 @RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "",
214 allowedOnPath = ".*/org/apache/hadoop/hbase/(ScheduledChore|ChoreService).java")
215 synchronized void cancelChore(ScheduledChore chore, boolean mayInterruptIfRunning) {
216 if (scheduledChores.containsKey(chore)) {
217 ScheduledFuture<?> future = scheduledChores.get(chore);
218 future.cancel(mayInterruptIfRunning);
219 scheduledChores.remove(chore);
221 // Removing a chore that was missing its start time means it may be possible
222 // to reduce the number of threads
223 if (choresMissingStartTime.containsKey(chore)) {
224 choresMissingStartTime.remove(chore);
225 requestCorePoolDecrease();
231 * @return true when the chore is scheduled with the implementer of this interface
233 @InterfaceAudience.Private
234 public synchronized boolean isChoreScheduled(ScheduledChore chore) {
235 return chore != null && scheduledChores.containsKey(chore)
236 && !scheduledChores.get(chore).isDone();
240 * This method tries to execute the chore immediately. If the chore is executing at the time of
241 * this call, the chore will begin another execution as soon as the current execution finishes
243 @RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "",
244 allowedOnPath = ".*/org/apache/hadoop/hbase/ScheduledChore.java")
245 synchronized void triggerNow(ScheduledChore chore) {
246 assert chore.getChoreService() == this;
247 rescheduleChore(chore);
251 * @return number of chores that this service currently has scheduled
253 int getNumberOfScheduledChores() {
254 return scheduledChores.size();
258 * @return number of chores that this service currently has scheduled that are missing their
259 * scheduled start time
261 int getNumberOfChoresMissingStartTime() {
262 return choresMissingStartTime.size();
266 * @return number of threads in the core pool of the underlying ScheduledThreadPoolExecutor
268 int getCorePoolSize() {
269 return scheduler.getCorePoolSize();
273 * Custom ThreadFactory used with the ScheduledThreadPoolExecutor so that all the threads are
274 * daemon threads, and thus, don't prevent the JVM from shutting down
276 static class ChoreServiceThreadFactory implements ThreadFactory {
277 private final String threadPrefix;
278 private final static String THREAD_NAME_SUFFIX = ".Chore.";
279 private AtomicInteger threadNumber = new AtomicInteger(1);
282 * @param threadPrefix The prefix given to all threads created by this factory
284 public ChoreServiceThreadFactory(final String threadPrefix) {
285 this.threadPrefix = threadPrefix;
288 @Override
289 public Thread newThread(Runnable r) {
290 Thread thread =
291 new Thread(r, threadPrefix + THREAD_NAME_SUFFIX + threadNumber.getAndIncrement());
292 thread.setDaemon(true);
293 return thread;
298 * Represents a request to increase the number of core pool threads. Typically a request
299 * originates from the fact that the current core pool size is not sufficient to service all of
300 * the currently running Chores
301 * @return true when the request to increase the core pool size succeeds
303 private synchronized boolean requestCorePoolIncrease() {
304 // There is no point in creating more threads than scheduledChores.size since scheduled runs
305 // of the same chore cannot run concurrently (i.e. happen-before behavior is enforced
306 // amongst occurrences of the same chore).
307 if (scheduler.getCorePoolSize() < scheduledChores.size()) {
308 scheduler.setCorePoolSize(scheduler.getCorePoolSize() + 1);
309 printChoreServiceDetails("requestCorePoolIncrease");
310 return true;
312 return false;
316 * Represents a request to decrease the number of core pool threads. Typically a request
317 * originates from the fact that the current core pool size is more than sufficient to service the
318 * running Chores.
320 private synchronized void requestCorePoolDecrease() {
321 if (scheduler.getCorePoolSize() > MIN_CORE_POOL_SIZE) {
322 scheduler.setCorePoolSize(scheduler.getCorePoolSize() - 1);
323 printChoreServiceDetails("requestCorePoolDecrease");
328 * A callback that tells the implementer of this interface that one of the scheduled chores is
329 * missing its start time. The implication of a chore missing its start time is that the service's
330 * current means of scheduling may not be sufficient to handle the number of ongoing chores (the
331 * other explanation is that the chore's execution time is greater than its scheduled period). The
332 * service should try to increase its concurrency when this callback is received.
333 * @param chore The chore that missed its start time
335 @RestrictedApi(explanation = "Should only be called in ScheduledChore", link = "",
336 allowedOnPath = ".*/org/apache/hadoop/hbase/ScheduledChore.java")
337 synchronized void onChoreMissedStartTime(ScheduledChore chore) {
338 if (!scheduledChores.containsKey(chore)) {
339 return;
342 // If the chore has not caused an increase in the size of the core thread pool then request an
343 // increase. This allows each chore missing its start time to increase the core pool size by
344 // at most 1.
345 if (!choresMissingStartTime.containsKey(chore) || !choresMissingStartTime.get(chore)) {
346 choresMissingStartTime.put(chore, requestCorePoolIncrease());
349 // Must reschedule the chore to prevent unnecessary delays of chores in the scheduler. If
350 // the chore is NOT rescheduled, future executions of this chore will be delayed more and
351 // more on each iteration. This hurts us because the ScheduledThreadPoolExecutor allocates
352 // idle threads to chores based on how delayed they are.
353 rescheduleChore(chore);
354 printChoreDetails("onChoreMissedStartTime", chore);
358 * shutdown the service. Any chores that are scheduled for execution will be cancelled. Any chores
359 * in the middle of execution will be interrupted and shutdown. This service will be unusable
360 * after this method has been called (i.e. future scheduling attempts will fail).
361 * <p/>
362 * Notice that, this will only clean the chore from this ChoreService but you could still schedule
363 * the chore with other ChoreService.
365 public synchronized void shutdown() {
366 if (isShutdown()) {
367 return;
369 scheduler.shutdownNow();
370 LOG.info("Chore service for: {} had {} on shutdown", coreThreadPoolPrefix,
371 scheduledChores.keySet());
372 cancelAllChores(true);
373 scheduledChores.clear();
374 choresMissingStartTime.clear();
378 * @return true when the service is shutdown and thus cannot be used anymore
380 public boolean isShutdown() {
381 return scheduler.isShutdown();
385 * @return true when the service is shutdown and all threads have terminated
387 public boolean isTerminated() {
388 return scheduler.isTerminated();
391 private void cancelAllChores(final boolean mayInterruptIfRunning) {
392 // Build list of chores to cancel so we can iterate through a set that won't change
393 // as chores are cancelled. If we tried to cancel each chore while iterating through
394 // keySet the results would be undefined because the keySet would be changing
395 ArrayList<ScheduledChore> choresToCancel = new ArrayList<>(scheduledChores.keySet());
397 for (ScheduledChore chore : choresToCancel) {
398 cancelChore(chore, mayInterruptIfRunning);
403 * Prints a summary of important details about the chore. Used for debugging purposes
405 private void printChoreDetails(final String header, ScheduledChore chore) {
406 if (!LOG.isTraceEnabled()) {
407 return;
409 LinkedHashMap<String, String> output = new LinkedHashMap<>();
410 output.put(header, "");
411 output.put("Chore name: ", chore.getName());
412 output.put("Chore period: ", Integer.toString(chore.getPeriod()));
413 output.put("Chore timeBetweenRuns: ", Long.toString(chore.getTimeBetweenRuns()));
415 for (Entry<String, String> entry : output.entrySet()) {
416 LOG.trace(entry.getKey() + entry.getValue());
421 * Prints a summary of important details about the service. Used for debugging purposes
423 private void printChoreServiceDetails(final String header) {
424 if (!LOG.isTraceEnabled()) {
425 return;
427 LinkedHashMap<String, String> output = new LinkedHashMap<>();
428 output.put(header, "");
429 output.put("ChoreService corePoolSize: ", Integer.toString(getCorePoolSize()));
430 output.put("ChoreService scheduledChores: ", Integer.toString(getNumberOfScheduledChores()));
431 output.put("ChoreService missingStartTimeCount: ",
432 Integer.toString(getNumberOfChoresMissingStartTime()));
434 for (Entry<String, String> entry : output.entrySet()) {
435 LOG.trace(entry.getKey() + entry.getValue());