HBASE-26921 Rewrite the counting cells part in TestMultiVersions (#4316)
[hbase.git] / hbase-procedure / src / main / java / org / apache / hadoop / hbase / procedure2 / AbstractProcedureScheduler.java
blob53bfba62daf8d76d2dbaefcbe52bf679137c14ce
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.procedure2;
20 import java.util.Iterator;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.locks.Condition;
23 import java.util.concurrent.locks.ReentrantLock;
25 import org.apache.yetus.audience.InterfaceAudience;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
29 @InterfaceAudience.Private
30 public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
31 private static final Logger LOG = LoggerFactory.getLogger(AbstractProcedureScheduler.class);
32 private final ReentrantLock schedulerLock = new ReentrantLock();
33 private final Condition schedWaitCond = schedulerLock.newCondition();
34 private boolean running = false;
36 // TODO: metrics
37 private long pollCalls = 0;
38 private long nullPollCalls = 0;
40 @Override
41 public void start() {
42 schedLock();
43 try {
44 running = true;
45 } finally {
46 schedUnlock();
50 @Override
51 public void stop() {
52 schedLock();
53 try {
54 running = false;
55 schedWaitCond.signalAll();
56 } finally {
57 schedUnlock();
61 @Override
62 public void signalAll() {
63 schedLock();
64 try {
65 schedWaitCond.signalAll();
66 } finally {
67 schedUnlock();
71 // ==========================================================================
72 // Add related
73 // ==========================================================================
74 /**
75 * Add the procedure to the queue.
76 * NOTE: this method is called with the sched lock held.
77 * @param procedure the Procedure to add
78 * @param addFront true if the item should be added to the front of the queue
80 protected abstract void enqueue(Procedure procedure, boolean addFront);
82 @Override
83 public void addFront(final Procedure procedure) {
84 push(procedure, true, true);
87 @Override
88 public void addFront(final Procedure procedure, boolean notify) {
89 push(procedure, true, notify);
92 @Override
93 public void addFront(Iterator<Procedure> procedureIterator) {
94 schedLock();
95 try {
96 int count = 0;
97 while (procedureIterator.hasNext()) {
98 Procedure procedure = procedureIterator.next();
99 if (LOG.isTraceEnabled()) {
100 LOG.trace("Wake " + procedure);
102 push(procedure, /* addFront= */ true, /* notify= */false);
103 count++;
105 wakePollIfNeeded(count);
106 } finally {
107 schedUnlock();
111 @Override
112 public void addBack(final Procedure procedure) {
113 push(procedure, false, true);
116 @Override
117 public void addBack(final Procedure procedure, boolean notify) {
118 push(procedure, false, notify);
121 protected void push(final Procedure procedure, final boolean addFront, final boolean notify) {
122 schedLock();
123 try {
124 enqueue(procedure, addFront);
125 if (notify) {
126 schedWaitCond.signal();
128 } finally {
129 schedUnlock();
133 // ==========================================================================
134 // Poll related
135 // ==========================================================================
137 * Fetch one Procedure from the queue
138 * NOTE: this method is called with the sched lock held.
139 * @return the Procedure to execute, or null if nothing is available.
141 protected abstract Procedure dequeue();
143 @Override
144 public Procedure poll() {
145 return poll(-1);
148 @Override
149 public Procedure poll(long timeout, TimeUnit unit) {
150 return poll(unit.toNanos(timeout));
153 @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
154 public Procedure poll(final long nanos) {
155 schedLock();
156 try {
157 if (!running) {
158 LOG.debug("the scheduler is not running");
159 return null;
162 if (!queueHasRunnables()) {
163 // WA_AWAIT_NOT_IN_LOOP: we are not in a loop because we want the caller
164 // to take decisions after a wake/interruption.
165 if (nanos < 0) {
166 schedWaitCond.await();
167 } else {
168 schedWaitCond.awaitNanos(nanos);
170 if (!queueHasRunnables()) {
171 nullPollCalls++;
172 return null;
175 final Procedure pollResult = dequeue();
177 pollCalls++;
178 nullPollCalls += (pollResult == null) ? 1 : 0;
179 return pollResult;
180 } catch (InterruptedException e) {
181 Thread.currentThread().interrupt();
182 nullPollCalls++;
183 return null;
184 } finally {
185 schedUnlock();
189 // ==========================================================================
190 // Utils
191 // ==========================================================================
193 * Returns the number of elements in this queue.
194 * NOTE: this method is called with the sched lock held.
195 * @return the number of elements in this queue.
197 protected abstract int queueSize();
200 * Returns true if there are procedures available to process.
201 * NOTE: this method is called with the sched lock held.
202 * @return true if there are procedures available to process, otherwise false.
204 protected abstract boolean queueHasRunnables();
206 @Override
207 public int size() {
208 schedLock();
209 try {
210 return queueSize();
211 } finally {
212 schedUnlock();
216 @Override
217 public boolean hasRunnables() {
218 schedLock();
219 try {
220 return queueHasRunnables();
221 } finally {
222 schedUnlock();
226 // ============================================================================
227 // TODO: Metrics
228 // ============================================================================
229 public long getPollCalls() {
230 return pollCalls;
233 public long getNullPollCalls() {
234 return nullPollCalls;
237 // ==========================================================================
238 // Procedure Events
239 // ==========================================================================
242 * Wake up all of the given events.
243 * Note that we first take scheduler lock and then wakeInternal() synchronizes on the event.
244 * Access should remain package-private. Use ProcedureEvent class to wake/suspend events.
245 * @param events the list of events to wake
247 public void wakeEvents(ProcedureEvent[] events) {
248 schedLock();
249 try {
250 for (ProcedureEvent event : events) {
251 if (event == null) {
252 continue;
254 event.wakeInternal(this);
256 } finally {
257 schedUnlock();
262 * Wakes up given waiting procedures by pushing them back into scheduler queues.
263 * @return size of given {@code waitQueue}.
265 protected int wakeWaitingProcedures(LockAndQueue lockAndQueue) {
266 return lockAndQueue.wakeWaitingProcedures(this);
269 protected void waitProcedure(LockAndQueue lockAndQueue, final Procedure proc) {
270 lockAndQueue.addLast(proc);
273 protected void wakeProcedure(final Procedure procedure) {
274 LOG.trace("Wake {}", procedure);
275 push(procedure, /* addFront= */ true, /* notify= */false);
278 // ==========================================================================
279 // Internal helpers
280 // ==========================================================================
281 protected void schedLock() {
282 schedulerLock.lock();
285 protected void schedUnlock() {
286 schedulerLock.unlock();
289 protected void wakePollIfNeeded(final int waitingCount) {
290 if (waitingCount <= 0) {
291 return;
293 if (waitingCount == 1) {
294 schedWaitCond.signal();
295 } else {
296 schedWaitCond.signalAll();