HBASE-26921 Rewrite the counting cells part in TestMultiVersions (#4316)
[hbase.git] / hbase-procedure / src / main / java / org / apache / hadoop / hbase / procedure2 / ProcedureEvent.java
blobe9bc91986b8640d2478ecf355d5cead292962821
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org.apache.hadoop.hbase.procedure2;
21 import org.apache.yetus.audience.InterfaceAudience;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
25 /**
26 * Basic ProcedureEvent that contains an "object", which can be a description or a reference to the
27 * resource to wait on, and a queue for suspended procedures.
29 @InterfaceAudience.Private
30 public class ProcedureEvent<T> {
31 private static final Logger LOG = LoggerFactory.getLogger(ProcedureEvent.class);
33 private final T object;
34 private boolean ready = false;
35 private ProcedureDeque suspendedProcedures = new ProcedureDeque();
37 public ProcedureEvent(final T object) {
38 this.object = object;
41 public synchronized boolean isReady() {
42 return ready;
45 /**
46 * @return true if event is not ready and adds procedure to suspended queue, else returns false.
48 public synchronized boolean suspendIfNotReady(Procedure proc) {
49 if (!ready) {
50 suspendedProcedures.addLast(proc);
52 return !ready;
55 /** Mark the event as not ready. */
56 public synchronized void suspend() {
57 ready = false;
58 if (LOG.isTraceEnabled()) {
59 LOG.trace("Suspend " + toString());
63 /**
64 * Wakes up the suspended procedures by pushing them back into scheduler queues and sets the
65 * event as ready.
66 * See {@link #wakeInternal(AbstractProcedureScheduler)} for why this is not synchronized.
68 public void wake(AbstractProcedureScheduler procedureScheduler) {
69 procedureScheduler.wakeEvents(new ProcedureEvent[]{this});
72 /**
73 * Wakes up the suspended procedures only if the given {@code proc} is waiting on this event.
74 * <p/>
75 * Mainly used by region assignment to reject stale OpenRegionProcedure/CloseRegionProcedure. Use
76 * with caution as it will cause performance issue if there are lots of procedures waiting on the
77 * event.
79 public synchronized boolean wakeIfSuspended(AbstractProcedureScheduler procedureScheduler,
80 Procedure<?> proc) {
81 if (suspendedProcedures.stream().anyMatch(p -> p.getProcId() == proc.getProcId())) {
82 wake(procedureScheduler);
83 return true;
85 return false;
88 /**
89 * Wakes up all the given events and puts the procedures waiting on them back into
90 * ProcedureScheduler queues.
92 public static void wakeEvents(AbstractProcedureScheduler scheduler, ProcedureEvent ... events) {
93 scheduler.wakeEvents(events);
96 /**
97 * Only to be used by ProcedureScheduler implementations.
98 * Reason: To wake up multiple events, locking sequence is
99 * schedLock --> synchronized (event)
100 * To wake up an event, both schedLock() and synchronized(event) are required.
101 * The order is schedLock() --> synchronized(event) because when waking up multiple events
102 * simultaneously, we keep the scheduler locked until all procedures suspended on these events
103 * have been added back to the queue (Maybe it's not required? Evaluate!)
104 * To avoid deadlocks, we want to keep the locking order same even when waking up single event.
105 * That's why, {@link #wake(AbstractProcedureScheduler)} above uses the same code path as used
106 * when waking up multiple events.
107 * Access should remain package-private.
109 public synchronized void wakeInternal(AbstractProcedureScheduler procedureScheduler) {
110 if (ready && !suspendedProcedures.isEmpty()) {
111 LOG.warn("Found procedures suspended in a ready event! Size=" + suspendedProcedures.size());
113 ready = true;
114 if (LOG.isTraceEnabled()) {
115 LOG.trace("Unsuspend " + toString());
117 // wakeProcedure adds to the front of queue, so we start from last in the
118 // waitQueue' queue, so that the procedure which was added first goes in the front for
119 // the scheduler queue.
120 procedureScheduler.addFront(suspendedProcedures.descendingIterator());
121 suspendedProcedures.clear();
125 * Access to suspendedProcedures is 'synchronized' on this object, but it's fine to return it
126 * here for tests.
128 public ProcedureDeque getSuspendedProcedures() {
129 return suspendedProcedures;
132 @Override
133 public synchronized String toString() {
134 return getClass().getSimpleName() + " for " + object + ", ready=" + isReady() + ", " +
135 suspendedProcedures;