HBASE-26286: Add support for specifying store file tracker when restoring or cloning...
[hbase.git] / hbase-server / src / main / java / org / apache / hadoop / hbase / procedure / Procedure.java
blobfe3edfa63d45b3ad6dd22dc33f366e3c29bd6d98
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.procedure;
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.TimeUnit;
28 import org.apache.yetus.audience.InterfaceAudience;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31 import org.apache.hadoop.hbase.errorhandling.ForeignException;
32 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
33 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
34 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
35 import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
37 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
39 /**
40 * A globally-barriered distributed procedure. This class encapsulates state and methods for
41 * tracking and managing a distributed procedure, as well as aborting if any member encounters
42 * a problem or if a cancellation is requested.
43 * <p>
44 * All procedures first attempt to reach a barrier point with the {@link #sendGlobalBarrierStart()}
45 * method. The procedure contacts all members and waits for all subprocedures to execute
46 * {@link Subprocedure#acquireBarrier} to acquire its local piece of the global barrier and then
47 * send acquisition info back to the coordinator. If all acquisitions at subprocedures succeed,
48 * the coordinator then will call {@link #sendGlobalBarrierReached()}. This notifies members to
49 * execute the {@link Subprocedure#insideBarrier()} method. The procedure is blocked until all
50 * {@link Subprocedure#insideBarrier} executions complete at the members. When
51 * {@link Subprocedure#insideBarrier} completes at each member, the member sends notification to
52 * the coordinator. Once all members complete, the coordinator calls
53 * {@link #sendGlobalBarrierComplete()}.
54 * <p>
55 * If errors are encountered remotely, they are forwarded to the coordinator, and
56 * {@link Subprocedure#cleanup(Exception)} is called.
57 * <p>
 * Each Procedure and each Subprocedure enforces a time limit on the execution time. If the time
 * limit expires before the procedure completes, the {@link TimeoutExceptionInjector} will trigger
 * a {@link ForeignException} to abort the procedure. This is particularly useful for situations
 * when running a distributed {@link Subprocedure} so participants can avoid blocking for extreme
 * amounts of time if one of the participants fails or takes a really long time (e.g. GC pause).
63 * <p>
 * Users should generally not directly create or subclass instances of this. They are created
 * for them implicitly via {@link ProcedureCoordinator#startProcedure(ForeignExceptionDispatcher,
 * String, byte[], List)}.
 */
68 @InterfaceAudience.Private
69 public class Procedure implements Callable<Void>, ForeignExceptionListener {
70 private static final Logger LOG = LoggerFactory.getLogger(Procedure.class);
73 // Arguments and naming
76 // Name of the procedure
77 final private String procName;
78 // Arguments for this procedure execution
79 final private byte[] args;
82 // Execution State
84 /** latch for waiting until all members have acquire in barrier state */
85 final CountDownLatch acquiredBarrierLatch;
86 /** latch for waiting until all members have executed and released their in barrier state */
87 final CountDownLatch releasedBarrierLatch;
88 /** latch for waiting until a procedure has completed */
89 final CountDownLatch completedLatch;
90 /** monitor to check for errors */
91 private final ForeignExceptionDispatcher monitor;
94 // Execution Timeout Handling.
97 /** frequency to check for errors (ms) */
98 protected final long wakeFrequency;
99 protected final TimeoutExceptionInjector timeoutInjector;
102 // Members' and Coordinator's state
105 /** lock to prevent nodes from acquiring and then releasing before we can track them */
106 private final Object joinBarrierLock = new Object();
107 private final List<String> acquiringMembers;
108 private final List<String> inBarrierMembers;
109 private final HashMap<String, byte[]> dataFromFinishedMembers;
110 private ProcedureCoordinator coord;
113 * Creates a procedure. (FOR TESTING)
115 * {@link Procedure} state to be run by a {@link ProcedureCoordinator}.
116 * @param coord coordinator to call back to for general errors (e.g.
117 * {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}).
118 * @param monitor error monitor to check for external errors
119 * @param wakeFreq frequency to check for errors while waiting
120 * @param timeout amount of time to allow the procedure to run before cancelling
121 * @param procName name of the procedure instance
122 * @param args argument data associated with the procedure instance
123 * @param expectedMembers names of the expected members
125 public Procedure(ProcedureCoordinator coord, ForeignExceptionDispatcher monitor, long wakeFreq,
126 long timeout, String procName, byte[] args, List<String> expectedMembers) {
127 this.coord = coord;
128 this.acquiringMembers = new ArrayList<>(expectedMembers);
129 this.inBarrierMembers = new ArrayList<>(acquiringMembers.size());
130 this.dataFromFinishedMembers = new HashMap<>();
131 this.procName = procName;
132 this.args = args;
133 this.monitor = monitor;
134 this.wakeFrequency = wakeFreq;
136 int count = expectedMembers.size();
137 this.acquiredBarrierLatch = new CountDownLatch(count);
138 this.releasedBarrierLatch = new CountDownLatch(count);
139 this.completedLatch = new CountDownLatch(1);
140 this.timeoutInjector = new TimeoutExceptionInjector(monitor, timeout);
/**
 * Create a procedure whose errors are tracked by a newly created
 * {@link ForeignExceptionDispatcher}.
 * <p>
 * Users should generally not directly create instances of this. They are created implicitly via
 * {@link ProcedureCoordinator#createProcedure(ForeignExceptionDispatcher, String, byte[], List)}.
 *
 * @param coord coordinator to call back to for general errors (e.g.
 *          {@link ProcedureCoordinator#rpcConnectionFailure(String, IOException)}).
 * @param wakeFreq frequency to check for errors while waiting
 * @param timeout amount of time to allow the procedure to run before cancelling
 * @param procName name of the procedure instance
 * @param args argument data associated with the procedure instance
 * @param expectedMembers names of the expected members
 */
public Procedure(ProcedureCoordinator coord, long wakeFreq, long timeout,
    String procName, byte[] args, List<String> expectedMembers) {
  this(coord, new ForeignExceptionDispatcher(),
      wakeFreq, timeout, procName, args, expectedMembers);
}
164 public String getName() {
165 return procName;
169 * @return String of the procedure members both trying to enter the barrier and already in barrier
171 public String getStatus() {
172 String waiting, done;
173 synchronized (joinBarrierLock) {
174 waiting = acquiringMembers.toString();
175 done = inBarrierMembers.toString();
177 return "Procedure " + procName + " { waiting=" + waiting + " done="+ done + " }";
181 * Get the ForeignExceptionDispatcher
182 * @return the Procedure's monitor.
184 public ForeignExceptionDispatcher getErrorMonitor() {
185 return monitor;
189 * This call is the main execution thread of the barriered procedure. It sends messages and
190 * essentially blocks until all procedure members acquire or later complete but periodically
191 * checks for foreign exceptions.
193 @Override
194 @SuppressWarnings("finally")
195 final public Void call() {
196 LOG.info("Starting procedure '" + procName + "'");
197 // start the timer
198 timeoutInjector.start();
200 // run the procedure
201 try {
202 // start by checking for error first
203 monitor.rethrowException();
204 LOG.debug("Procedure '" + procName + "' starting 'acquire'");
205 sendGlobalBarrierStart();
207 // wait for all the members to report acquisition
208 LOG.debug("Waiting for all members to 'acquire'");
209 waitForLatch(acquiredBarrierLatch, monitor, wakeFrequency, "acquired");
210 monitor.rethrowException();
212 LOG.debug("Procedure '" + procName + "' starting 'in-barrier' execution.");
213 sendGlobalBarrierReached();
215 // wait for all members to report barrier release
216 LOG.debug("Waiting for all members to 'release'");
217 waitForLatch(releasedBarrierLatch, monitor, wakeFrequency, "released");
219 // make sure we didn't get an error during in barrier execution and release
220 monitor.rethrowException();
221 LOG.info("Procedure '" + procName + "' execution completed");
222 } catch (Exception e) {
223 if (e instanceof InterruptedException) {
224 Thread.currentThread().interrupt();
226 String msg = "Procedure '" + procName +"' execution failed!";
227 LOG.error(msg, e);
228 receive(new ForeignException(getName(), e));
229 } finally {
230 LOG.debug("Running finish phase.");
231 sendGlobalBarrierComplete();
232 completedLatch.countDown();
234 // tell the timer we are done, if we get here successfully
235 timeoutInjector.complete();
236 return null;
241 * Sends a message to Members to create a new {@link Subprocedure} for this Procedure and execute
242 * the {@link Subprocedure#acquireBarrier} step.
243 * @throws ForeignException
245 public void sendGlobalBarrierStart() throws ForeignException {
246 // start the procedure
247 LOG.debug("Starting procedure '" + procName + "', kicking off acquire phase on members.");
248 try {
249 // send procedure barrier start to specified list of members. cloning the list to avoid
250 // concurrent modification from the controller setting the prepared nodes
251 coord.getRpcs().sendGlobalBarrierAcquire(this, args, Lists.newArrayList(this.acquiringMembers));
252 } catch (IOException e) {
253 coord.rpcConnectionFailure("Can't reach controller.", e);
254 } catch (IllegalArgumentException e) {
255 throw new ForeignException(getName(), e);
260 * Sends a message to all members that the global barrier condition has been satisfied. This
261 * should only be executed after all members have completed its
262 * {@link Subprocedure#acquireBarrier()} call successfully. This triggers the member
263 * {@link Subprocedure#insideBarrier} method.
264 * @throws ForeignException
266 public void sendGlobalBarrierReached() throws ForeignException {
267 try {
268 // trigger to have member run {@link Subprocedure#insideBarrier}
269 coord.getRpcs().sendGlobalBarrierReached(this, Lists.newArrayList(inBarrierMembers));
270 } catch (IOException e) {
271 coord.rpcConnectionFailure("Can't reach controller.", e);
276 * Sends a message to members that all {@link Subprocedure#insideBarrier} calls have completed.
277 * After this executes, the coordinator can assume that any state resources about this barrier
278 * procedure state has been released.
280 public void sendGlobalBarrierComplete() {
281 LOG.debug("Finished coordinator procedure - removing self from list of running procedures");
282 try {
283 coord.getRpcs().resetMembers(this);
284 } catch (IOException e) {
285 coord.rpcConnectionFailure("Failed to reset procedure:" + procName, e);
290 // Call backs from other external processes.
294 * Call back triggered by an individual member upon successful local barrier acquisition
295 * @param member
297 public void barrierAcquiredByMember(String member) {
298 LOG.debug("member: '" + member + "' joining acquired barrier for procedure '" + procName
299 + "' on coordinator");
300 if (this.acquiringMembers.contains(member)) {
301 synchronized (joinBarrierLock) {
302 if (this.acquiringMembers.remove(member)) {
303 this.inBarrierMembers.add(member);
304 acquiredBarrierLatch.countDown();
307 LOG.debug("Waiting on: " + acquiredBarrierLatch + " remaining members to acquire global barrier");
308 } else {
309 LOG.warn("Member " + member + " joined barrier, but we weren't waiting on it to join." +
310 " Continuing on.");
315 * Call back triggered by a individual member upon successful local in-barrier execution and
316 * release
317 * @param member
318 * @param dataFromMember
320 public void barrierReleasedByMember(String member, byte[] dataFromMember) {
321 boolean removed = false;
322 synchronized (joinBarrierLock) {
323 removed = this.inBarrierMembers.remove(member);
324 if (removed) {
325 releasedBarrierLatch.countDown();
328 if (removed) {
329 LOG.debug("Member: '" + member + "' released barrier for procedure'" + procName
330 + "', counting down latch. Waiting for " + releasedBarrierLatch.getCount()
331 + " more");
332 } else {
333 LOG.warn("Member: '" + member + "' released barrier for procedure'" + procName
334 + "', but we weren't waiting on it to release!");
336 dataFromFinishedMembers.put(member, dataFromMember);
340 * Waits until the entire procedure has globally completed, or has been aborted. If an
341 * exception is thrown the procedure may or not have run cleanup to trigger the completion latch
342 * yet.
343 * @throws ForeignException
344 * @throws InterruptedException
346 public void waitForCompleted() throws ForeignException, InterruptedException {
347 waitForLatch(completedLatch, monitor, wakeFrequency, procName + " completed");
351 * Waits until the entire procedure has globally completed, or has been aborted. If an
352 * exception is thrown the procedure may or not have run cleanup to trigger the completion latch
353 * yet.
354 * @return data returned from procedure members upon successfully completing subprocedure.
355 * @throws ForeignException
356 * @throws InterruptedException
358 public HashMap<String, byte[]> waitForCompletedWithRet() throws ForeignException, InterruptedException {
359 waitForCompleted();
360 return dataFromFinishedMembers;
364 * Check if the entire procedure has globally completed, or has been aborted.
365 * @throws ForeignException
367 public boolean isCompleted() throws ForeignException {
368 // Rethrow exception if any
369 monitor.rethrowException();
370 return (completedLatch.getCount() == 0);
374 * A callback that handles incoming ForeignExceptions.
376 @Override
377 public void receive(ForeignException e) {
378 monitor.receive(e);
382 * Wait for latch to count to zero, ignoring any spurious wake-ups, but waking periodically to
383 * check for errors
384 * @param latch latch to wait on
385 * @param monitor monitor to check for errors while waiting
386 * @param wakeFrequency frequency to wake up and check for errors (in
387 * {@link TimeUnit#MILLISECONDS})
388 * @param latchDescription description of the latch, for logging
389 * @throws ForeignException type of error the monitor can throw, if the task fails
390 * @throws InterruptedException if we are interrupted while waiting on latch
392 public static void waitForLatch(CountDownLatch latch, ForeignExceptionSnare monitor,
393 long wakeFrequency, String latchDescription) throws ForeignException,
394 InterruptedException {
395 boolean released = false;
396 while (!released) {
397 if (monitor != null) {
398 monitor.rethrowException();
401 ForeignExceptionDispatcher.LOG.debug("Waiting for '" + latchDescription + "' latch. (sleep:"
402 + wakeFrequency + " ms)"); */
403 released = latch.await(wakeFrequency, TimeUnit.MILLISECONDS);
405 // check error again in case an error raised during last wait
406 if (monitor != null) {
407 monitor.rethrowException();