HBASE-24163 MOB compactor implementations should use format specifiers when calling...
[hbase.git] / hbase-common / src / main / java / org / apache / hadoop / hbase / util / FutureUtils.java
blobdfd9ead2785401333b1c644ccda94bff5a661895
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.util;
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.util.concurrent.CompletableFuture;
23 import java.util.concurrent.CompletionException;
24 import java.util.concurrent.ExecutionException;
25 import java.util.concurrent.Executor;
26 import java.util.concurrent.Future;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 import java.util.function.BiConsumer;
30 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
31 import org.apache.yetus.audience.InterfaceAudience;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
35 /**
36 * Helper class for processing futures.
38 @InterfaceAudience.Private
39 public final class FutureUtils {
private static final Logger LOG = LoggerFactory.getLogger(FutureUtils.class);

/**
 * Private constructor: the helpers in this class are static, so it is not meant to be
 * instantiated.
 */
private FutureUtils() {
}
46 /**
47 * This is method is used when you just want to add a listener to the given future. We will call
48 * {@link CompletableFuture#whenComplete(BiConsumer)} to register the {@code action} to the
49 * {@code future}. Ignoring the return value of a Future is considered as a bad practice as it may
50 * suppress exceptions thrown from the code that completes the future, and this method will catch
51 * all the exception thrown from the {@code action} to catch possible code bugs.
52 * <p/>
53 * And the error phone check will always report FutureReturnValueIgnored because every method in
54 * the {@link CompletableFuture} class will return a new {@link CompletableFuture}, so you always
55 * have one future that has not been checked. So we introduce this method and add a suppress
56 * warnings annotation here.
58 @SuppressWarnings("FutureReturnValueIgnored")
59 public static <T> void addListener(CompletableFuture<T> future,
60 BiConsumer<? super T, ? super Throwable> action) {
61 future.whenComplete((resp, error) -> {
62 try {
63 // See this post on stack overflow(shorten since the url is too long),
64 // https://s.apache.org/completionexception
65 // For a chain of CompleableFuture, only the first child CompletableFuture can get the
66 // original exception, others will get a CompletionException, which wraps the original
67 // exception. So here we unwrap it before passing it to the callback action.
68 action.accept(resp, unwrapCompletionException(error));
69 } catch (Throwable t) {
70 LOG.error("Unexpected error caught when processing CompletableFuture", t);
72 });
75 /**
76 * Almost the same with {@link #addListener(CompletableFuture, BiConsumer)} method above, the only
77 * exception is that we will call
78 * {@link CompletableFuture#whenCompleteAsync(BiConsumer, Executor)}.
79 * @see #addListener(CompletableFuture, BiConsumer)
81 @SuppressWarnings("FutureReturnValueIgnored")
82 public static <T> void addListener(CompletableFuture<T> future,
83 BiConsumer<? super T, ? super Throwable> action, Executor executor) {
84 future.whenCompleteAsync((resp, error) -> {
85 try {
86 action.accept(resp, unwrapCompletionException(error));
87 } catch (Throwable t) {
88 LOG.error("Unexpected error caught when processing CompletableFuture", t);
90 }, executor);
93 /**
94 * Return a {@link CompletableFuture} which is same with the given {@code future}, but execute all
95 * the callbacks in the given {@code executor}.
97 public static <T> CompletableFuture<T> wrapFuture(CompletableFuture<T> future,
98 Executor executor) {
99 CompletableFuture<T> wrappedFuture = new CompletableFuture<>();
100 addListener(future, (r, e) -> {
101 if (e != null) {
102 wrappedFuture.completeExceptionally(e);
103 } else {
104 wrappedFuture.complete(r);
106 }, executor);
107 return wrappedFuture;
111 * Get the cause of the {@link Throwable} if it is a {@link CompletionException}.
113 public static Throwable unwrapCompletionException(Throwable error) {
114 if (error instanceof CompletionException) {
115 Throwable cause = error.getCause();
116 if (cause != null) {
117 return cause;
120 return error;
123 // This method is used to record the stack trace that calling the FutureUtils.get method. As in
124 // async client, the retry will be done in the retry timer thread, so the exception we get from
125 // the CompletableFuture will have a stack trace starting from the root of the retry timer. If we
126 // just throw this exception out when calling future.get(by unwrapping the ExecutionException),
127 // the upper layer even can not know where is the exception thrown...
128 // See HBASE-22316.
129 private static void setStackTrace(Throwable error) {
130 StackTraceElement[] localStackTrace = Thread.currentThread().getStackTrace();
131 StackTraceElement[] originalStackTrace = error.getStackTrace();
132 StackTraceElement[] newStackTrace =
133 new StackTraceElement[localStackTrace.length + originalStackTrace.length + 1];
134 System.arraycopy(localStackTrace, 0, newStackTrace, 0, localStackTrace.length);
135 newStackTrace[localStackTrace.length] =
136 new StackTraceElement("--------Future", "get--------", null, -1);
137 System.arraycopy(originalStackTrace, 0, newStackTrace, localStackTrace.length + 1,
138 originalStackTrace.length);
139 error.setStackTrace(newStackTrace);
142 private static IOException rethrow(ExecutionException error) throws IOException {
143 Throwable cause = error.getCause();
144 if (cause instanceof IOException) {
145 setStackTrace(cause);
146 throw (IOException) cause;
147 } else if (cause instanceof RuntimeException) {
148 setStackTrace(cause);
149 throw (RuntimeException) cause;
150 } else if (cause instanceof Error) {
151 setStackTrace(cause);
152 throw (Error) cause;
153 } else {
154 throw new IOException(cause);
159 * A helper class for getting the result of a Future, and convert the error to an
160 * {@link IOException}.
162 public static <T> T get(Future<T> future) throws IOException {
163 try {
164 return future.get();
165 } catch (InterruptedException e) {
166 throw (IOException) new InterruptedIOException().initCause(e);
167 } catch (ExecutionException e) {
168 throw rethrow(e);
173 * A helper class for getting the result of a Future with timeout, and convert the error to an
174 * {@link IOException}.
176 public static <T> T get(Future<T> future, long timeout, TimeUnit unit) throws IOException {
177 try {
178 return future.get(timeout, unit);
179 } catch (InterruptedException e) {
180 throw (IOException) new InterruptedIOException().initCause(e);
181 } catch (ExecutionException e) {
182 throw rethrow(e);
183 } catch (TimeoutException e) {
184 throw new TimeoutIOException(e);
189 * Returns a CompletableFuture that is already completed exceptionally with the given exception.
191 public static <T> CompletableFuture<T> failedFuture(Throwable e) {
192 CompletableFuture<T> future = new CompletableFuture<>();
193 future.completeExceptionally(e);
194 return future;