2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.procedure2
;
21 import org
.apache
.yetus
.audience
.InterfaceAudience
;
22 import org
.slf4j
.Logger
;
23 import org
.slf4j
.LoggerFactory
;
26 * Basic ProcedureEvent that contains an "object", which can be a description or a reference to the
27 * resource to wait on, and a queue for suspended procedures.
29 @InterfaceAudience.Private
30 public class ProcedureEvent
<T
> {
31 private static final Logger LOG
= LoggerFactory
.getLogger(ProcedureEvent
.class);
33 private final T object
;
34 private boolean ready
= false;
35 private ProcedureDeque suspendedProcedures
= new ProcedureDeque();
37 public ProcedureEvent(final T object
) {
41 public synchronized boolean isReady() {
46 * @return true if event is not ready and adds procedure to suspended queue, else returns false.
48 public synchronized boolean suspendIfNotReady(Procedure proc
) {
50 suspendedProcedures
.addLast(proc
);
55 /** Mark the event as not ready. */
56 public synchronized void suspend() {
58 if (LOG
.isTraceEnabled()) {
59 LOG
.trace("Suspend " + toString());
64 * Wakes up the suspended procedures by pushing them back into scheduler queues and sets the
66 * See {@link #wakeInternal(AbstractProcedureScheduler)} for why this is not synchronized.
68 public void wake(AbstractProcedureScheduler procedureScheduler
) {
69 procedureScheduler
.wakeEvents(new ProcedureEvent
[]{this});
73 * Wakes up the suspended procedures only if the given {@code proc} is waiting on this event.
75 * Mainly used by region assignment to reject stale OpenRegionProcedure/CloseRegionProcedure. Use
76 * with caution as it will cause performance issue if there are lots of procedures waiting on the
79 public synchronized boolean wakeIfSuspended(AbstractProcedureScheduler procedureScheduler
,
81 if (suspendedProcedures
.stream().anyMatch(p
-> p
.getProcId() == proc
.getProcId())) {
82 wake(procedureScheduler
);
89 * Wakes up all the given events and puts the procedures waiting on them back into
90 * ProcedureScheduler queues.
92 public static void wakeEvents(AbstractProcedureScheduler scheduler
, ProcedureEvent
... events
) {
93 scheduler
.wakeEvents(events
);
97 * Only to be used by ProcedureScheduler implementations.
98 * Reason: To wake up multiple events, locking sequence is
99 * schedLock --> synchronized (event)
100 * To wake up an event, both schedLock() and synchronized(event) are required.
101 * The order is schedLock() --> synchronized(event) because when waking up multiple events
102 * simultaneously, we keep the scheduler locked until all procedures suspended on these events
103 * have been added back to the queue (Maybe it's not required? Evaluate!)
104 * To avoid deadlocks, we want to keep the locking order same even when waking up single event.
105 * That's why, {@link #wake(AbstractProcedureScheduler)} above uses the same code path as used
106 * when waking up multiple events.
107 * Access should remain package-private.
109 public synchronized void wakeInternal(AbstractProcedureScheduler procedureScheduler
) {
110 if (ready
&& !suspendedProcedures
.isEmpty()) {
111 LOG
.warn("Found procedures suspended in a ready event! Size=" + suspendedProcedures
.size());
114 if (LOG
.isTraceEnabled()) {
115 LOG
.trace("Unsuspend " + toString());
117 // wakeProcedure adds to the front of queue, so we start from last in the
118 // waitQueue' queue, so that the procedure which was added first goes in the front for
119 // the scheduler queue.
120 procedureScheduler
.addFront(suspendedProcedures
.descendingIterator());
121 suspendedProcedures
.clear();
125 * Access to suspendedProcedures is 'synchronized' on this object, but it's fine to return it
128 public ProcedureDeque
getSuspendedProcedures() {
129 return suspendedProcedures
;
133 public synchronized String
toString() {
134 return getClass().getSimpleName() + " for " + object
+ ", ready=" + isReady() + ", " +