2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.procedure
;
20 import java
.io
.IOException
;
21 import java
.util
.ArrayList
;
22 import java
.util
.HashMap
;
23 import java
.util
.List
;
24 import java
.util
.concurrent
.Callable
;
25 import java
.util
.concurrent
.CountDownLatch
;
26 import java
.util
.concurrent
.TimeUnit
;
28 import org
.apache
.yetus
.audience
.InterfaceAudience
;
29 import org
.slf4j
.Logger
;
30 import org
.slf4j
.LoggerFactory
;
31 import org
.apache
.hadoop
.hbase
.errorhandling
.ForeignException
;
32 import org
.apache
.hadoop
.hbase
.errorhandling
.ForeignExceptionDispatcher
;
33 import org
.apache
.hadoop
.hbase
.errorhandling
.ForeignExceptionListener
;
34 import org
.apache
.hadoop
.hbase
.errorhandling
.ForeignExceptionSnare
;
35 import org
.apache
.hadoop
.hbase
.errorhandling
.TimeoutExceptionInjector
;
37 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.collect
.Lists
;
/**
 * A globally-barriered distributed procedure. This class encapsulates state and methods for
 * tracking and managing a distributed procedure, as well as aborting if any member encounters
 * a problem or if a cancellation is requested.
 * <p>
 * All procedures first attempt to reach a barrier point with the {@link #sendGlobalBarrierStart()}
 * method. The procedure contacts all members and waits for all subprocedures to execute
 * {@link Subprocedure#acquireBarrier} to acquire its local piece of the global barrier and then
 * send acquisition info back to the coordinator. If all acquisitions at subprocedures succeed,
 * the coordinator then will call {@link #sendGlobalBarrierReached()}. This notifies members to
 * execute the {@link Subprocedure#insideBarrier()} method. The procedure is blocked until all
 * {@link Subprocedure#insideBarrier} executions complete at the members. When
 * {@link Subprocedure#insideBarrier} completes at each member, the member sends notification to
 * the coordinator. Once all members complete, the coordinator calls
 * {@link #sendGlobalBarrierComplete()}.
 * <p>
 * If errors are encountered remotely, they are forwarded to the coordinator, and
 * {@link Subprocedure#cleanup(Exception)} is called.
 * <p>
 * Each Procedure and each Subprocedure enforces a time limit on the execution time. If the time
 * limit expires before the procedure completes the {@link TimeoutExceptionInjector} will trigger
 * a {@link ForeignException} to abort the procedure. This is particularly useful for situations
 * when running a distributed {@link Subprocedure} so participants can avoid blocking for extreme
 * amounts of time if one of the participants fails or takes a really long time (e.g. GC pause).
 * <p>
 * Users should generally not directly create or subclass instances of this. They are created
 * for them implicitly via {@link ProcedureCoordinator#startProcedure(ForeignExceptionDispatcher,
 * String, byte[], List)}
 */
68 @InterfaceAudience.Private
69 public class Procedure
implements Callable
<Void
>, ForeignExceptionListener
{
70 private static final Logger LOG
= LoggerFactory
.getLogger(Procedure
.class);
73 // Arguments and naming
76 // Name of the procedure
77 final private String procName
;
78 // Arguments for this procedure execution
79 final private byte[] args
;
/** latch for waiting until all members have acquired the barrier */
85 final CountDownLatch acquiredBarrierLatch
;
86 /** latch for waiting until all members have executed and released their in barrier state */
87 final CountDownLatch releasedBarrierLatch
;
88 /** latch for waiting until a procedure has completed */
89 final CountDownLatch completedLatch
;
90 /** monitor to check for errors */
91 private final ForeignExceptionDispatcher monitor
;
94 // Execution Timeout Handling.
97 /** frequency to check for errors (ms) */
98 protected final long wakeFrequency
;
99 protected final TimeoutExceptionInjector timeoutInjector
;
102 // Members' and Coordinator's state
105 /** lock to prevent nodes from acquiring and then releasing before we can track them */
106 private final Object joinBarrierLock
= new Object();
107 private final List
<String
> acquiringMembers
;
108 private final List
<String
> inBarrierMembers
;
109 private final HashMap
<String
, byte[]> dataFromFinishedMembers
;
110 private ProcedureCoordinator coord
;
113 * Creates a procedure. (FOR TESTING)
115 * {@link Procedure} state to be run by a {@link ProcedureCoordinator}.
116 * @param coord coordinator to call back to for general errors (e.g.
117 * {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}).
118 * @param monitor error monitor to check for external errors
119 * @param wakeFreq frequency to check for errors while waiting
120 * @param timeout amount of time to allow the procedure to run before cancelling
121 * @param procName name of the procedure instance
122 * @param args argument data associated with the procedure instance
123 * @param expectedMembers names of the expected members
125 public Procedure(ProcedureCoordinator coord
, ForeignExceptionDispatcher monitor
, long wakeFreq
,
126 long timeout
, String procName
, byte[] args
, List
<String
> expectedMembers
) {
128 this.acquiringMembers
= new ArrayList
<>(expectedMembers
);
129 this.inBarrierMembers
= new ArrayList
<>(acquiringMembers
.size());
130 this.dataFromFinishedMembers
= new HashMap
<>();
131 this.procName
= procName
;
133 this.monitor
= monitor
;
134 this.wakeFrequency
= wakeFreq
;
136 int count
= expectedMembers
.size();
137 this.acquiredBarrierLatch
= new CountDownLatch(count
);
138 this.releasedBarrierLatch
= new CountDownLatch(count
);
139 this.completedLatch
= new CountDownLatch(1);
140 this.timeoutInjector
= new TimeoutExceptionInjector(monitor
, timeout
);
/**
 * Create a procedure.
 *
 * Users should generally not directly create instances of this. They are created for them
 * implicitly via {@link ProcedureCoordinator#createProcedure(ForeignExceptionDispatcher,
 * String, byte[], List)}
 *
 * @param coord coordinator to call back to for general errors (e.g.
 *          {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}).
 * @param wakeFreq frequency to check for errors while waiting
 * @param timeout amount of time to allow the procedure to run before cancelling
 * @param procName name of the procedure instance
 * @param args argument data associated with the procedure instance
 * @param expectedMembers names of the expected members
 */
158 public Procedure(ProcedureCoordinator coord
, long wakeFreq
, long timeout
,
159 String procName
, byte[] args
, List
<String
> expectedMembers
) {
160 this(coord
, new ForeignExceptionDispatcher(), wakeFreq
, timeout
, procName
, args
,
164 public String
getName() {
169 * @return String of the procedure members both trying to enter the barrier and already in barrier
171 public String
getStatus() {
172 String waiting
, done
;
173 synchronized (joinBarrierLock
) {
174 waiting
= acquiringMembers
.toString();
175 done
= inBarrierMembers
.toString();
177 return "Procedure " + procName
+ " { waiting=" + waiting
+ " done="+ done
+ " }";
181 * Get the ForeignExceptionDispatcher
182 * @return the Procedure's monitor.
184 public ForeignExceptionDispatcher
getErrorMonitor() {
189 * This call is the main execution thread of the barriered procedure. It sends messages and
190 * essentially blocks until all procedure members acquire or later complete but periodically
191 * checks for foreign exceptions.
194 @SuppressWarnings("finally")
195 final public Void
call() {
196 LOG
.info("Starting procedure '" + procName
+ "'");
198 timeoutInjector
.start();
202 // start by checking for error first
203 monitor
.rethrowException();
204 LOG
.debug("Procedure '" + procName
+ "' starting 'acquire'");
205 sendGlobalBarrierStart();
207 // wait for all the members to report acquisition
208 LOG
.debug("Waiting for all members to 'acquire'");
209 waitForLatch(acquiredBarrierLatch
, monitor
, wakeFrequency
, "acquired");
210 monitor
.rethrowException();
212 LOG
.debug("Procedure '" + procName
+ "' starting 'in-barrier' execution.");
213 sendGlobalBarrierReached();
215 // wait for all members to report barrier release
216 LOG
.debug("Waiting for all members to 'release'");
217 waitForLatch(releasedBarrierLatch
, monitor
, wakeFrequency
, "released");
219 // make sure we didn't get an error during in barrier execution and release
220 monitor
.rethrowException();
221 LOG
.info("Procedure '" + procName
+ "' execution completed");
222 } catch (Exception e
) {
223 if (e
instanceof InterruptedException
) {
224 Thread
.currentThread().interrupt();
226 String msg
= "Procedure '" + procName
+"' execution failed!";
228 receive(new ForeignException(getName(), e
));
230 LOG
.debug("Running finish phase.");
231 sendGlobalBarrierComplete();
232 completedLatch
.countDown();
234 // tell the timer we are done, if we get here successfully
235 timeoutInjector
.complete();
241 * Sends a message to Members to create a new {@link Subprocedure} for this Procedure and execute
242 * the {@link Subprocedure#acquireBarrier} step.
243 * @throws ForeignException
245 public void sendGlobalBarrierStart() throws ForeignException
{
246 // start the procedure
247 LOG
.debug("Starting procedure '" + procName
+ "', kicking off acquire phase on members.");
249 // send procedure barrier start to specified list of members. cloning the list to avoid
250 // concurrent modification from the controller setting the prepared nodes
251 coord
.getRpcs().sendGlobalBarrierAcquire(this, args
, Lists
.newArrayList(this.acquiringMembers
));
252 } catch (IOException e
) {
253 coord
.rpcConnectionFailure("Can't reach controller.", e
);
254 } catch (IllegalArgumentException e
) {
255 throw new ForeignException(getName(), e
);
260 * Sends a message to all members that the global barrier condition has been satisfied. This
261 * should only be executed after all members have completed its
262 * {@link Subprocedure#acquireBarrier()} call successfully. This triggers the member
263 * {@link Subprocedure#insideBarrier} method.
264 * @throws ForeignException
266 public void sendGlobalBarrierReached() throws ForeignException
{
268 // trigger to have member run {@link Subprocedure#insideBarrier}
269 coord
.getRpcs().sendGlobalBarrierReached(this, Lists
.newArrayList(inBarrierMembers
));
270 } catch (IOException e
) {
271 coord
.rpcConnectionFailure("Can't reach controller.", e
);
276 * Sends a message to members that all {@link Subprocedure#insideBarrier} calls have completed.
277 * After this executes, the coordinator can assume that any state resources about this barrier
278 * procedure state has been released.
280 public void sendGlobalBarrierComplete() {
281 LOG
.debug("Finished coordinator procedure - removing self from list of running procedures");
283 coord
.getRpcs().resetMembers(this);
284 } catch (IOException e
) {
285 coord
.rpcConnectionFailure("Failed to reset procedure:" + procName
, e
);
290 // Call backs from other external processes.
294 * Call back triggered by an individual member upon successful local barrier acquisition
297 public void barrierAcquiredByMember(String member
) {
298 LOG
.debug("member: '" + member
+ "' joining acquired barrier for procedure '" + procName
299 + "' on coordinator");
300 if (this.acquiringMembers
.contains(member
)) {
301 synchronized (joinBarrierLock
) {
302 if (this.acquiringMembers
.remove(member
)) {
303 this.inBarrierMembers
.add(member
);
304 acquiredBarrierLatch
.countDown();
307 LOG
.debug("Waiting on: " + acquiredBarrierLatch
+ " remaining members to acquire global barrier");
309 LOG
.warn("Member " + member
+ " joined barrier, but we weren't waiting on it to join." +
315 * Call back triggered by a individual member upon successful local in-barrier execution and
318 * @param dataFromMember
320 public void barrierReleasedByMember(String member
, byte[] dataFromMember
) {
321 boolean removed
= false;
322 synchronized (joinBarrierLock
) {
323 removed
= this.inBarrierMembers
.remove(member
);
325 releasedBarrierLatch
.countDown();
329 LOG
.debug("Member: '" + member
+ "' released barrier for procedure'" + procName
330 + "', counting down latch. Waiting for " + releasedBarrierLatch
.getCount()
333 LOG
.warn("Member: '" + member
+ "' released barrier for procedure'" + procName
334 + "', but we weren't waiting on it to release!");
336 dataFromFinishedMembers
.put(member
, dataFromMember
);
340 * Waits until the entire procedure has globally completed, or has been aborted. If an
341 * exception is thrown the procedure may or not have run cleanup to trigger the completion latch
343 * @throws ForeignException
344 * @throws InterruptedException
346 public void waitForCompleted() throws ForeignException
, InterruptedException
{
347 waitForLatch(completedLatch
, monitor
, wakeFrequency
, procName
+ " completed");
351 * Waits until the entire procedure has globally completed, or has been aborted. If an
352 * exception is thrown the procedure may or not have run cleanup to trigger the completion latch
354 * @return data returned from procedure members upon successfully completing subprocedure.
355 * @throws ForeignException
356 * @throws InterruptedException
358 public HashMap
<String
, byte[]> waitForCompletedWithRet() throws ForeignException
, InterruptedException
{
360 return dataFromFinishedMembers
;
364 * Check if the entire procedure has globally completed, or has been aborted.
365 * @throws ForeignException
367 public boolean isCompleted() throws ForeignException
{
368 // Rethrow exception if any
369 monitor
.rethrowException();
370 return (completedLatch
.getCount() == 0);
374 * A callback that handles incoming ForeignExceptions.
377 public void receive(ForeignException e
) {
382 * Wait for latch to count to zero, ignoring any spurious wake-ups, but waking periodically to
384 * @param latch latch to wait on
385 * @param monitor monitor to check for errors while waiting
386 * @param wakeFrequency frequency to wake up and check for errors (in
387 * {@link TimeUnit#MILLISECONDS})
388 * @param latchDescription description of the latch, for logging
389 * @throws ForeignException type of error the monitor can throw, if the task fails
390 * @throws InterruptedException if we are interrupted while waiting on latch
392 public static void waitForLatch(CountDownLatch latch
, ForeignExceptionSnare monitor
,
393 long wakeFrequency
, String latchDescription
) throws ForeignException
,
394 InterruptedException
{
395 boolean released
= false;
397 if (monitor
!= null) {
398 monitor
.rethrowException();
401 ForeignExceptionDispatcher.LOG.debug("Waiting for '" + latchDescription + "' latch. (sleep:"
402 + wakeFrequency + " ms)"); */
403 released
= latch
.await(wakeFrequency
, TimeUnit
.MILLISECONDS
);
405 // check error again in case an error raised during last wait
406 if (monitor
!= null) {
407 monitor
.rethrowException();