3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
;
21 import java
.util
.concurrent
.ScheduledThreadPoolExecutor
;
22 import java
.util
.concurrent
.TimeUnit
;
24 import org
.apache
.hadoop
.hbase
.util
.MovingAverage
;
25 import org
.apache
.hadoop
.hbase
.util
.WindowMovingAverage
;
26 import org
.apache
.yetus
.audience
.InterfaceAudience
;
27 import org
.slf4j
.Logger
;
28 import org
.slf4j
.LoggerFactory
;
29 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.annotations
.VisibleForTesting
;
/**
 * ScheduledChore is a task performed on a period in hbase. ScheduledChores become active once
 * scheduled with a {@link ChoreService} via {@link ChoreService#scheduleChore(ScheduledChore)}. The
 * chore is run in a {@link ScheduledThreadPoolExecutor} and competes with other ScheduledChores for
 * access to the threads in the core thread pool. If an unhandled exception occurs, the chore
 * cancellation is logged. Implementers should consider whether or not the Chore will be able to
 * execute within the defined period. It is bad practice to define a ScheduledChore whose execution
 * time exceeds its period since it will try to hog one of the threads in the {@link ChoreService}'s
 * core thread pool.
 * <p>
 * Don't subclass ScheduledChore if the task relies on being woken up for something to do, such as
 * an entry being added to a queue, etc.
 */
44 @InterfaceAudience.Public
45 public abstract class ScheduledChore
implements Runnable
{
46 private static final Logger LOG
= LoggerFactory
.getLogger(ScheduledChore
.class);
48 private final String name
;
51 * Default values for scheduling parameters should they be excluded during construction
53 private final static TimeUnit DEFAULT_TIME_UNIT
= TimeUnit
.MILLISECONDS
;
54 private final static long DEFAULT_INITIAL_DELAY
= 0;
57 * Scheduling parameters. Used by ChoreService when scheduling the chore to run periodically
59 private final int period
; // in TimeUnit units
60 private final TimeUnit timeUnit
;
61 private final long initialDelay
; // in TimeUnit units
64 * Interface to the ChoreService that this ScheduledChore is scheduled with. null if the chore is
67 private ChoreServicer choreServicer
;
70 * Variables that encapsulate the meaningful state information
72 private long timeOfLastRun
= -1; // system time millis
73 private long timeOfThisRun
= -1; // system time millis
74 private boolean initialChoreComplete
= false;
77 * A means by which a ScheduledChore can be stopped. Once a chore recognizes that it has been
78 * stopped, it will cancel itself. This is particularly useful in the case where a single stopper
79 * instance is given to multiple chores. In such a case, a single {@link Stoppable#stop(String)}
80 * command can cause many chores to stop together.
82 private final Stoppable stopper
;
84 private final MovingAverage
<Void
> timeMeasurement
;
85 private static final long FIVE_MINUTES_IN_NANOS
= TimeUnit
.MINUTES
.toNanos(5L);
86 private long lastLog
= System
.nanoTime();
88 interface ChoreServicer
{
90 * Cancel any ongoing schedules that this chore has with the implementer of this interface.
92 public void cancelChore(ScheduledChore chore
);
93 public void cancelChore(ScheduledChore chore
, boolean mayInterruptIfRunning
);
96 * @return true when the chore is scheduled with the implementer of this interface
98 public boolean isChoreScheduled(ScheduledChore chore
);
101 * This method tries to execute the chore immediately. If the chore is executing at the time of
102 * this call, the chore will begin another execution as soon as the current execution finishes
104 * If the chore is not scheduled with a ChoreService, this call will fail.
105 * @return false when the chore could not be triggered immediately
107 public boolean triggerNow(ScheduledChore chore
);
110 * A callback that tells the implementer of this interface that one of the scheduled chores is
111 * missing its start time. The implication of a chore missing its start time is that the
112 * service's current means of scheduling may not be sufficient to handle the number of ongoing
113 * chores (the other explanation is that the chore's execution time is greater than its
114 * scheduled period). The service should try to increase its concurrency when this callback is
116 * @param chore The chore that missed its start time
118 public void onChoreMissedStartTime(ScheduledChore chore
);
/**
 * This constructor is for test only. It allows us to create an object and to call chore() on it.
 * The resulting chore is named "TestChore", has no stopper, a period of 0 and the default
 * initial delay and time unit.
 */
@InterfaceAudience.Private
@VisibleForTesting
protected ScheduledChore() {
  this(
    "TestChore",
    null, // no stopper
    0, // period
    DEFAULT_INITIAL_DELAY,
    DEFAULT_TIME_UNIT);
}
/**
 * Creates a chore that uses the default initial delay; delegates to the four-argument
 * constructor.
 * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
 * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
 * @param period Period in millis with which this Chore repeats execution when scheduled.
 */
public ScheduledChore(final String name, Stoppable stopper, final int period) {
  this(
    name,
    stopper,
    period,
    DEFAULT_INITIAL_DELAY);
}
/**
 * Creates a chore that uses the default time unit; delegates to the five-argument constructor.
 * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
 * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
 * @param period Period in millis with which this Chore repeats execution when scheduled.
 * @param initialDelay Delay before this Chore begins to execute once it has been scheduled. A
 *          value of 0 means the chore will begin to execute immediately. Negative delays are
 *          invalid and will be corrected to a value of 0.
 */
public ScheduledChore(final String name, Stoppable stopper, final int period,
    final long initialDelay) {
  this(
    name,
    stopper,
    period,
    initialDelay,
    DEFAULT_TIME_UNIT);
}
/**
 * Creates a chore with every scheduling parameter given explicitly. All other constructors
 * delegate here.
 * @param name Name assigned to Chore. Useful for identification amongst chores of the same type
 * @param stopper When {@link Stoppable#isStopped()} is true, this chore will cancel and cleanup
 * @param period Period in Timeunit unit with which this Chore repeats execution when scheduled.
 * @param initialDelay Delay in Timeunit unit before this Chore begins to execute once it has been
 *          scheduled. A value of 0 means the chore will begin to execute immediately. Negative
 *          delays are invalid and will be corrected to a value of 0.
 * @param unit The unit that is used to measure period and initialDelay
 */
public ScheduledChore(final String name, Stoppable stopper, final int period,
    final long initialDelay, final TimeUnit unit) {
  this.name = name;
  this.stopper = stopper;
  this.period = period;
  // Negative delays are invalid; clamp them to "start immediately".
  this.initialDelay = Math.max(0L, initialDelay);
  this.timeUnit = unit;
  // Diamond instead of the raw type so the assignment to MovingAverage<Void> is type-checked.
  this.timeMeasurement = new WindowMovingAverage<>(name);
}
172 * @see java.lang.Runnable#run()
176 updateTimeTrackingBeforeRun();
177 if (missedStartTime() && isScheduled()) {
178 onChoreMissedStartTime();
179 if (LOG
.isInfoEnabled()) LOG
.info("Chore: " + getName() + " missed its start time");
180 } else if (stopper
.isStopped() || !isScheduled()) {
183 if (LOG
.isInfoEnabled()) LOG
.info("Chore: " + getName() + " was stopped");
186 if (!initialChoreComplete
) {
187 initialChoreComplete
= initialChore();
189 timeMeasurement
.measure(() -> {
193 if (LOG
.isInfoEnabled() && (System
.nanoTime() - lastLog
> FIVE_MINUTES_IN_NANOS
)) {
194 LOG
.info("{} average execution time: {} ns.", getName(),
195 (long)(timeMeasurement
.getAverageTime()));
196 lastLog
= System
.nanoTime();
199 } catch (Throwable t
) {
200 if (LOG
.isErrorEnabled()) LOG
.error("Caught error", t
);
201 if (this.stopper
.isStopped()) {
210 * Update our time tracking members. Called at the start of an execution of this chore's run()
211 * method so that a correct decision can be made as to whether or not we missed the start time
213 private synchronized void updateTimeTrackingBeforeRun() {
214 timeOfLastRun
= timeOfThisRun
;
215 timeOfThisRun
= System
.currentTimeMillis();
219 * Notify the ChoreService that this chore has missed its start time. Allows the ChoreService to
220 * make the decision as to whether or not it would be worthwhile to increase the number of core
223 private synchronized void onChoreMissedStartTime() {
224 if (choreServicer
!= null) choreServicer
.onChoreMissedStartTime(this);
228 * @return How long in millis has it been since this chore last run. Useful for checking if the
229 * chore has missed its scheduled start time by too large of a margin
231 synchronized long getTimeBetweenRuns() {
232 return timeOfThisRun
- timeOfLastRun
;
236 * @return true when the time between runs exceeds the acceptable threshold
238 private synchronized boolean missedStartTime() {
239 return isValidTime(timeOfLastRun
) && isValidTime(timeOfThisRun
)
240 && getTimeBetweenRuns() > getMaximumAllowedTimeBetweenRuns();
244 * @return max allowed time in millis between runs.
246 private double getMaximumAllowedTimeBetweenRuns() {
247 // Threshold used to determine if the Chore's current run started too late
248 return 1.5 * timeUnit
.toMillis(period
);
252 * @param time in system millis
253 * @return true if time is earlier or equal to current milli time
255 private synchronized boolean isValidTime(final long time
) {
256 return time
> 0 && time
<= System
.currentTimeMillis();
260 * @return false when the Chore is not currently scheduled with a ChoreService
262 public synchronized boolean triggerNow() {
263 if (choreServicer
!= null) {
264 return choreServicer
.triggerNow(this);
270 synchronized void setChoreServicer(ChoreServicer service
) {
271 // Chores should only ever be scheduled with a single ChoreService. If the choreServicer
272 // is changing, cancel any existing schedules of this chore.
273 if (choreServicer
!= null && choreServicer
!= service
) {
274 choreServicer
.cancelChore(this, false);
276 choreServicer
= service
;
277 timeOfThisRun
= System
.currentTimeMillis();
280 public synchronized void cancel() {
284 public synchronized void cancel(boolean mayInterruptIfRunning
) {
285 if (isScheduled()) choreServicer
.cancelChore(this, mayInterruptIfRunning
);
287 choreServicer
= null;
290 public String
getName() {
294 public Stoppable
getStopper() {
299 * @return period to execute chore in getTimeUnit() units
301 public int getPeriod() {
306 * @return initial delay before executing chore in getTimeUnit() units
308 public long getInitialDelay() {
312 public TimeUnit
getTimeUnit() {
316 public synchronized boolean isInitialChoreComplete() {
317 return initialChoreComplete
;
321 synchronized ChoreServicer
getChoreServicer() {
322 return choreServicer
;
326 synchronized long getTimeOfLastRun() {
327 return timeOfLastRun
;
331 synchronized long getTimeOfThisRun() {
332 return timeOfThisRun
;
336 * @return true when this Chore is scheduled with a ChoreService
338 public synchronized boolean isScheduled() {
339 return choreServicer
!= null && choreServicer
.isChoreScheduled(this);
342 @InterfaceAudience.Private
344 public synchronized void choreForTesting() {
349 * The task to execute on each scheduled execution of the Chore
351 protected abstract void chore();
354 * Override to run a task before we start looping.
355 * @return true if initial chore was successful
357 protected boolean initialChore() {
358 // Default does nothing
363 * Override to run cleanup tasks when the Chore encounters an error and must stop running
365 protected synchronized void cleanup() {
369 * A summation of this chore in human readable format. Downstream users should not presume
370 * parsing of this string can relaibly be done between versions. Instead, they should rely
371 * on the public accessor methods to get the information they desire.
373 @InterfaceAudience.Private
375 public String
toString() {
376 return "[ScheduledChore: Name: " + getName() + " Period: " + getPeriod() + " Unit: "
377 + getTimeUnit() + "]";