2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.procedure2
;
20 import java
.util
.Iterator
;
21 import java
.util
.concurrent
.TimeUnit
;
22 import java
.util
.concurrent
.locks
.Condition
;
23 import java
.util
.concurrent
.locks
.ReentrantLock
;
25 import org
.apache
.yetus
.audience
.InterfaceAudience
;
26 import org
.slf4j
.Logger
;
27 import org
.slf4j
.LoggerFactory
;
29 @InterfaceAudience.Private
30 public abstract class AbstractProcedureScheduler
implements ProcedureScheduler
{
31 private static final Logger LOG
= LoggerFactory
.getLogger(AbstractProcedureScheduler
.class);
32 private final ReentrantLock schedulerLock
= new ReentrantLock();
33 private final Condition schedWaitCond
= schedulerLock
.newCondition();
34 private boolean running
= false;
37 private long pollCalls
= 0;
38 private long nullPollCalls
= 0;
55 schedWaitCond
.signalAll();
62 public void signalAll() {
65 schedWaitCond
.signalAll();
71 // ==========================================================================
73 // ==========================================================================
75 * Add the procedure to the queue.
76 * NOTE: this method is called with the sched lock held.
77 * @param procedure the Procedure to add
78 * @param addFront true if the item should be added to the front of the queue
80 protected abstract void enqueue(Procedure procedure
, boolean addFront
);
83 public void addFront(final Procedure procedure
) {
84 push(procedure
, true, true);
88 public void addFront(final Procedure procedure
, boolean notify
) {
89 push(procedure
, true, notify
);
93 public void addFront(Iterator
<Procedure
> procedureIterator
) {
97 while (procedureIterator
.hasNext()) {
98 Procedure procedure
= procedureIterator
.next();
99 if (LOG
.isTraceEnabled()) {
100 LOG
.trace("Wake " + procedure
);
102 push(procedure
, /* addFront= */ true, /* notify= */false);
105 wakePollIfNeeded(count
);
112 public void addBack(final Procedure procedure
) {
113 push(procedure
, false, true);
117 public void addBack(final Procedure procedure
, boolean notify
) {
118 push(procedure
, false, notify
);
121 protected void push(final Procedure procedure
, final boolean addFront
, final boolean notify
) {
124 enqueue(procedure
, addFront
);
126 schedWaitCond
.signal();
133 // ==========================================================================
135 // ==========================================================================
137 * Fetch one Procedure from the queue
138 * NOTE: this method is called with the sched lock held.
139 * @return the Procedure to execute, or null if nothing is available.
141 protected abstract Procedure
dequeue();
144 public Procedure
poll() {
149 public Procedure
poll(long timeout
, TimeUnit unit
) {
150 return poll(unit
.toNanos(timeout
));
153 @edu.umd
.cs
.findbugs
.annotations
.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
154 public Procedure
poll(final long nanos
) {
158 LOG
.debug("the scheduler is not running");
162 if (!queueHasRunnables()) {
163 // WA_AWAIT_NOT_IN_LOOP: we are not in a loop because we want the caller
164 // to take decisions after a wake/interruption.
166 schedWaitCond
.await();
168 schedWaitCond
.awaitNanos(nanos
);
170 if (!queueHasRunnables()) {
175 final Procedure pollResult
= dequeue();
178 nullPollCalls
+= (pollResult
== null) ?
1 : 0;
180 } catch (InterruptedException e
) {
181 Thread
.currentThread().interrupt();
189 // ==========================================================================
191 // ==========================================================================
193 * Returns the number of elements in this queue.
194 * NOTE: this method is called with the sched lock held.
195 * @return the number of elements in this queue.
197 protected abstract int queueSize();
200 * Returns true if there are procedures available to process.
201 * NOTE: this method is called with the sched lock held.
202 * @return true if there are procedures available to process, otherwise false.
204 protected abstract boolean queueHasRunnables();
217 public boolean hasRunnables() {
220 return queueHasRunnables();
226 // ============================================================================
228 // ============================================================================
229 public long getPollCalls() {
233 public long getNullPollCalls() {
234 return nullPollCalls
;
237 // ==========================================================================
239 // ==========================================================================
242 * Wake up all of the given events.
243 * Note that we first take scheduler lock and then wakeInternal() synchronizes on the event.
244 * Access should remain package-private. Use ProcedureEvent class to wake/suspend events.
245 * @param events the list of events to wake
247 public void wakeEvents(ProcedureEvent
[] events
) {
250 for (ProcedureEvent event
: events
) {
254 event
.wakeInternal(this);
262 * Wakes up given waiting procedures by pushing them back into scheduler queues.
263 * @return size of given {@code waitQueue}.
265 protected int wakeWaitingProcedures(LockAndQueue lockAndQueue
) {
266 return lockAndQueue
.wakeWaitingProcedures(this);
269 protected void waitProcedure(LockAndQueue lockAndQueue
, final Procedure proc
) {
270 lockAndQueue
.addLast(proc
);
273 protected void wakeProcedure(final Procedure procedure
) {
274 LOG
.trace("Wake {}", procedure
);
275 push(procedure
, /* addFront= */ true, /* notify= */false);
278 // ==========================================================================
280 // ==========================================================================
281 protected void schedLock() {
282 schedulerLock
.lock();
285 protected void schedUnlock() {
286 schedulerLock
.unlock();
289 protected void wakePollIfNeeded(final int waitingCount
) {
290 if (waitingCount
<= 0) {
293 if (waitingCount
== 1) {
294 schedWaitCond
.signal();
296 schedWaitCond
.signalAll();