3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
;
21 import com
.google
.errorprone
.annotations
.RestrictedApi
;
22 import java
.util
.concurrent
.ScheduledThreadPoolExecutor
;
23 import java
.util
.concurrent
.TimeUnit
;
25 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
26 import org
.apache
.yetus
.audience
.InterfaceAudience
;
27 import org
.slf4j
.Logger
;
28 import org
.slf4j
.LoggerFactory
;
31 * ScheduledChore is a task performed on a period in hbase. ScheduledChores become active once
32 * scheduled with a {@link ChoreService} via {@link ChoreService#scheduleChore(ScheduledChore)}. The
33 * chore is run in a {@link ScheduledThreadPoolExecutor} and competes with other ScheduledChores for
34 * access to the threads in the core thread pool. If an unhandled exception occurs, the chore
35 * cancellation is logged. Implementers should consider whether or not the Chore will be able to
36 * execute within the defined period. It is bad practice to define a ScheduledChore whose execution
37 * time exceeds its period since it will try to hog one of the threads in the {@link ChoreService}'s
40 * Don't subclass ScheduledChore if the task relies on being woken up for something to do, such as
41 * an entry being added to a queue, etc.
43 @InterfaceAudience.Private
44 public abstract class ScheduledChore
implements Runnable
{
45 private static final Logger LOG
= LoggerFactory
.getLogger(ScheduledChore
.class);
47 private final String name
;
50 * Default values for scheduling parameters should they be excluded during construction
52 private final static TimeUnit DEFAULT_TIME_UNIT
= TimeUnit
.MILLISECONDS
;
53 private final static long DEFAULT_INITIAL_DELAY
= 0;
56 * Scheduling parameters. Used by ChoreService when scheduling the chore to run periodically
58 private final int period
; // in TimeUnit units
59 private final TimeUnit timeUnit
;
60 private final long initialDelay
; // in TimeUnit units
63 * Interface to the ChoreService that this ScheduledChore is scheduled with. null if the chore is
66 private ChoreService choreService
;
69 * Variables that encapsulate the meaningful state information
71 private long timeOfLastRun
= -1; // system time millis
72 private long timeOfThisRun
= -1; // system time millis
73 private boolean initialChoreComplete
= false;
76 * A means by which a ScheduledChore can be stopped. Once a chore recognizes that it has been
77 * stopped, it will cancel itself. This is particularly useful in the case where a single stopper
78 * instance is given to multiple chores. In such a case, a single {@link Stoppable#stop(String)}
79 * command can cause many chores to stop together.
81 private final Stoppable stopper
;
84 * This constructor is for test only. It allows us to create an object and to call chore() on it.
86 @InterfaceAudience.Private
87 protected ScheduledChore() {
88 this("TestChore", null, 0, DEFAULT_INITIAL_DELAY
, DEFAULT_TIME_UNIT
);
92 * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
93 * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
94 * @param period Period in millis with which this Chore repeats execution when scheduled.
96 public ScheduledChore(final String name
, Stoppable stopper
, final int period
) {
97 this(name
, stopper
, period
, DEFAULT_INITIAL_DELAY
);
101 * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
102 * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
103 * @param period Period in millis with which this Chore repeats execution when scheduled.
104 * @param initialDelay Delay before this Chore begins to execute once it has been scheduled. A
105 * value of 0 means the chore will begin to execute immediately. Negative delays are
106 * invalid and will be corrected to a value of 0.
108 public ScheduledChore(final String name
, Stoppable stopper
, final int period
,
109 final long initialDelay
) {
110 this(name
, stopper
, period
, initialDelay
, DEFAULT_TIME_UNIT
);
114 * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
115 * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
116 * @param period Period in Timeunit unit with which this Chore repeats execution when scheduled.
117 * @param initialDelay Delay in Timeunit unit before this Chore begins to execute once it has been
118 * scheduled. A value of 0 means the chore will begin to execute immediately. Negative
119 * delays are invalid and will be corrected to a value of 0.
120 * @param unit The unit that is used to measure period and initialDelay
122 public ScheduledChore(final String name
, Stoppable stopper
, final int period
,
123 final long initialDelay
, final TimeUnit unit
) {
125 this.stopper
= stopper
;
126 this.period
= period
;
127 this.initialDelay
= initialDelay
< 0 ?
0 : initialDelay
;
128 this.timeUnit
= unit
;
132 * @see java.lang.Runnable#run()
136 updateTimeTrackingBeforeRun();
137 if (missedStartTime() && isScheduled()) {
138 onChoreMissedStartTime();
139 LOG
.info("Chore: {} missed its start time", getName());
140 } else if (stopper
.isStopped() || !isScheduled()) {
141 // call shutdown here to cleanup the ScheduledChore.
143 LOG
.info("Chore: {} was stopped", getName());
146 // TODO: Histogram metrics per chore name.
147 // For now, just measure and log if DEBUG level logging is enabled.
149 if (LOG
.isDebugEnabled()) {
150 start
= System
.nanoTime();
152 if (!initialChoreComplete
) {
153 initialChoreComplete
= initialChore();
157 if (LOG
.isDebugEnabled() && start
> 0) {
158 long end
= System
.nanoTime();
159 LOG
.debug("{} execution time: {} ms.", getName(),
160 TimeUnit
.NANOSECONDS
.toMillis(end
- start
));
162 } catch (Throwable t
) {
163 LOG
.error("Caught error", t
);
164 if (this.stopper
.isStopped()) {
172 * Update our time tracking members. Called at the start of an execution of this chore's run()
173 * method so that a correct decision can be made as to whether or not we missed the start time
175 private synchronized void updateTimeTrackingBeforeRun() {
176 timeOfLastRun
= timeOfThisRun
;
177 timeOfThisRun
= EnvironmentEdgeManager
.currentTime();
181 * Notify the ChoreService that this chore has missed its start time. Allows the ChoreService to
182 * make the decision as to whether or not it would be worthwhile to increase the number of core
185 private synchronized void onChoreMissedStartTime() {
186 if (choreService
!= null) {
187 choreService
.onChoreMissedStartTime(this);
192 * @return How long in millis has it been since this chore last run. Useful for checking if the
193 * chore has missed its scheduled start time by too large of a margin
195 synchronized long getTimeBetweenRuns() {
196 return timeOfThisRun
- timeOfLastRun
;
200 * @return true when the time between runs exceeds the acceptable threshold
202 private synchronized boolean missedStartTime() {
203 return isValidTime(timeOfLastRun
) && isValidTime(timeOfThisRun
)
204 && getTimeBetweenRuns() > getMaximumAllowedTimeBetweenRuns();
208 * @return max allowed time in millis between runs.
210 private double getMaximumAllowedTimeBetweenRuns() {
211 // Threshold used to determine if the Chore's current run started too late
212 return 1.5 * timeUnit
.toMillis(period
);
216 * @param time in system millis
217 * @return true if time is earlier or equal to current milli time
219 private synchronized boolean isValidTime(final long time
) {
220 return time
> 0 && time
<= EnvironmentEdgeManager
.currentTime();
224 * @return false when the Chore is not currently scheduled with a ChoreService
226 public synchronized boolean triggerNow() {
227 if (choreService
== null) {
230 choreService
.triggerNow(this);
234 @RestrictedApi(explanation
= "Should only be called in ChoreService", link
= "",
235 allowedOnPath
= ".*/org/apache/hadoop/hbase/ChoreService.java")
236 synchronized void setChoreService(ChoreService service
) {
237 choreService
= service
;
241 public synchronized void cancel() {
245 public synchronized void cancel(boolean mayInterruptIfRunning
) {
247 choreService
.cancelChore(this, mayInterruptIfRunning
);
252 public String
getName() {
256 public Stoppable
getStopper() {
261 * @return period to execute chore in getTimeUnit() units
263 public int getPeriod() {
268 * @return initial delay before executing chore in getTimeUnit() units
270 public long getInitialDelay() {
274 public TimeUnit
getTimeUnit() {
278 public synchronized boolean isInitialChoreComplete() {
279 return initialChoreComplete
;
282 synchronized ChoreService
getChoreService() {
286 synchronized long getTimeOfLastRun() {
287 return timeOfLastRun
;
290 synchronized long getTimeOfThisRun() {
291 return timeOfThisRun
;
295 * @return true when this Chore is scheduled with a ChoreService
297 public synchronized boolean isScheduled() {
298 return choreService
!= null && choreService
.isChoreScheduled(this);
301 @InterfaceAudience.Private
302 @RestrictedApi(explanation
= "Should only be called in tests", link
= "",
303 allowedOnPath
= ".*/src/test/.*")
304 public synchronized void choreForTesting() {
309 * The task to execute on each scheduled execution of the Chore
311 protected abstract void chore();
314 * Override to run a task before we start looping.
315 * @return true if initial chore was successful
317 protected boolean initialChore() {
318 // Default does nothing
323 * Override to run cleanup tasks when the Chore encounters an error and must stop running
325 protected void cleanup() {
329 * Call {@link #shutdown(boolean)} with {@code true}.
330 * @see ScheduledChore#shutdown(boolean)
332 public synchronized void shutdown() {
337 * Completely shutdown the ScheduleChore, which means we will call cleanup and you should not
340 * This is another path to cleanup the chore, comparing to stop the stopper instance passed in.
342 public synchronized void shutdown(boolean mayInterruptIfRunning
) {
343 cancel(mayInterruptIfRunning
);
348 * A summation of this chore in human readable format. Downstream users should not presume
349 * parsing of this string can relaibly be done between versions. Instead, they should rely
350 * on the public accessor methods to get the information they desire.
352 @InterfaceAudience.Private
354 public String
toString() {
355 return "ScheduledChore name=" + getName() + ", period=" + getPeriod() +
356 ", unit=" + getTimeUnit();