HBASE-26921 Rewrite the counting cells part in TestMultiVersions (#4316)
[hbase.git] / hbase-client / src / main / java / org / apache / hadoop / hbase / client / AsyncRpcRetryingCaller.java
blob8648572a04a36e75792b1d82b73b1c1994c0480b
1 /**
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
20 package org.apache.hadoop.hbase.client;
22 import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
23 import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
24 import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
25 import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
27 import java.util.ArrayList;
28 import java.util.List;
29 import java.util.Optional;
30 import java.util.concurrent.CompletableFuture;
31 import java.util.concurrent.TimeUnit;
32 import java.util.function.Consumer;
33 import java.util.function.Supplier;
34 import org.apache.hadoop.hbase.CallQueueTooBigException;
35 import org.apache.hadoop.hbase.DoNotRetryIOException;
36 import org.apache.hadoop.hbase.NotServingRegionException;
37 import org.apache.hadoop.hbase.TableName;
38 import org.apache.hadoop.hbase.TableNotEnabledException;
39 import org.apache.hadoop.hbase.TableNotFoundException;
40 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
41 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
42 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
43 import org.apache.hadoop.hbase.util.FutureUtils;
44 import org.apache.yetus.audience.InterfaceAudience;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
48 import org.apache.hbase.thirdparty.io.netty.util.Timer;
50 @InterfaceAudience.Private
51 public abstract class AsyncRpcRetryingCaller<T> {
// Class logger.
53 private static final Logger LOG = LoggerFactory.getLogger(AsyncRpcRetryingCaller.class);
// Timer on which the next attempt is scheduled after a retryable failure.
55 private final Timer retryTimer;
// Priority applied to the rpc controller at construction and on every call-timeout reset.
57 private final int priority;
// System.nanoTime() captured at construction; the reference point for elapsed/remaining time.
59 private final long startNs;
// Base pause handed to getPauseTime for ordinary retryable errors.
61 private final long pauseNs;
// Base pause handed to getPauseTime when the error is a CallQueueTooBigException.
63 private final long pauseForCQTBENs;
// Number of the current attempt; starts at 1 and is incremented each time a retry is scheduled.
65 private int tries = 1;
// Once tries reaches this value no further retry is scheduled.
67 private final int maxAttempts;
// Failures are logged at WARN only when tries is greater than this value.
69 private final int startLogErrorsCnt;
// Errors collected from failed attempts; carried by the final RetriesExhaustedException.
71 private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;
// Upper bound, in nanoseconds, for the timeout set on the controller for a single call.
73 private final long rpcTimeoutNs;
// Overall time budget in nanoseconds; it is only enforced when the value is greater than zero.
75 protected final long operationTimeoutNs;
// Connection that supplies the rpc controller factory and the admin used for table-state checks.
77 protected final AsyncConnectionImpl conn;
// Future returned by call(); completed exceptionally here when retrying is given up.
79 protected final CompletableFuture<T> future;
// Rpc controller created once in the constructor and reset by resetCallTimeout().
81 protected final HBaseRpcController controller;
/**
 * Creates a caller and starts its operation clock.
 * @param retryTimer timer used to schedule retries
 * @param conn connection providing the rpc controller factory and the admin
 * @param priority priority set on the rpc controller
 * @param pauseNs base pause, in nanoseconds, between retries
 * @param pauseForCQTBENs base pause, in nanoseconds, used after a CallQueueTooBigException
 * @param maxAttempts number of attempts after which retrying stops
 * @param operationTimeoutNs overall time budget in nanoseconds, enforced only when positive
 * @param rpcTimeoutNs upper bound, in nanoseconds, for the timeout of a single call
 * @param startLogErrorsCnt failures are logged at WARN once the try count exceeds this value
 */
public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
  long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs,
  long rpcTimeoutNs, int startLogErrorsCnt) {
  // collaborators
  this.conn = conn;
  this.retryTimer = retryTimer;

  // retry policy
  this.maxAttempts = maxAttempts;
  this.startLogErrorsCnt = startLogErrorsCnt;
  this.pauseNs = pauseNs;
  this.pauseForCQTBENs = pauseForCQTBENs;

  // timeouts
  this.rpcTimeoutNs = rpcTimeoutNs;
  this.operationTimeoutNs = operationTimeoutNs;

  // per-operation state
  this.priority = priority;
  this.exceptions = new ArrayList<>();
  this.future = new CompletableFuture<>();
  this.controller = conn.rpcControllerFactory.newController();
  this.controller.setPriority(priority);

  // taken last so that the clock starts once everything else is set up
  this.startNs = System.nanoTime();
}
102 private long elapsedMs() {
103 return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
106 protected final long remainingTimeNs() {
107 return operationTimeoutNs - (System.nanoTime() - startNs);
110 protected final void completeExceptionally() {
111 future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
114 protected final void resetCallTimeout() {
115 long callTimeoutNs;
116 if (operationTimeoutNs > 0) {
117 callTimeoutNs = remainingTimeNs();
118 if (callTimeoutNs <= 0) {
119 completeExceptionally();
120 return;
122 callTimeoutNs = Math.min(callTimeoutNs, rpcTimeoutNs);
123 } else {
124 callTimeoutNs = rpcTimeoutNs;
126 resetController(controller, callTimeoutNs, priority);
129 private void tryScheduleRetry(Throwable error) {
130 long pauseNsToUse = error instanceof CallQueueTooBigException ? pauseForCQTBENs : pauseNs;
131 long delayNs;
132 if (operationTimeoutNs > 0) {
133 long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
134 if (maxDelayNs <= 0) {
135 completeExceptionally();
136 return;
138 delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
139 } else {
140 delayNs = getPauseTime(pauseNsToUse, tries - 1);
142 tries++;
143 retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS);
146 protected Optional<TableName> getTableName() {
147 return Optional.empty();
150 // Sub classes can override this method to change the error type, to control the retry logic.
151 // For example, during rolling upgrading, if we call this newly added method, we will get a
152 // UnsupportedOperationException(wrapped by a DNRIOE), and sometimes we may want to fallback to
153 // use the old method first, so the sub class could change the exception type to something not a
154 // DNRIOE, so we will schedule a retry, and the next time the sub class could use old method to
155 // make the rpc call.
156 protected Throwable preProcessError(Throwable error) {
157 return error;
160 protected final void onError(Throwable t, Supplier<String> errMsg,
161 Consumer<Throwable> updateCachedLocation) {
162 if (future.isDone()) {
163 // Give up if the future is already done, this is possible if user has already canceled the
164 // future. And for timeline consistent read, we will also cancel some requests if we have
165 // already get one of the responses.
166 LOG.debug("The future is already done, canceled={}, give up retrying", future.isCancelled());
167 return;
169 Throwable error = preProcessError(translateException(t));
170 // We use this retrying caller to open a scanner, as it is idempotent, but we may throw
171 // ScannerResetException, which is a DoNotRetryIOException when opening a scanner as now we will
172 // also fetch data when opening a scanner. The intention here is that if we hit a
173 // ScannerResetException when scanning then we should try to open a new scanner, instead of
174 // retrying on the old one, so it is declared as a DoNotRetryIOException. But here we are
175 // exactly trying to open a new scanner, so we should retry on ScannerResetException.
176 if (error instanceof DoNotRetryIOException && !(error instanceof ScannerResetException)) {
177 future.completeExceptionally(error);
178 return;
180 if (tries > startLogErrorsCnt) {
181 LOG.warn(errMsg.get() + ", tries = " + tries + ", maxAttempts = " + maxAttempts +
182 ", timeout = " + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) +
183 " ms, time elapsed = " + elapsedMs() + " ms", error);
185 updateCachedLocation.accept(error);
186 RetriesExhaustedException.ThrowableWithExtraContext qt =
187 new RetriesExhaustedException.ThrowableWithExtraContext(error,
188 EnvironmentEdgeManager.currentTime(), "");
189 exceptions.add(qt);
190 if (tries >= maxAttempts) {
191 completeExceptionally();
192 return;
194 // check whether the table has been disabled, notice that the check will introduce a request to
195 // meta, so here we only check for disabled for some specific exception types.
196 if (error instanceof NotServingRegionException || error instanceof RegionOfflineException) {
197 Optional<TableName> tableName = getTableName();
198 if (tableName.isPresent()) {
199 FutureUtils.addListener(conn.getAdmin().isTableDisabled(tableName.get()), (disabled, e) -> {
200 if (e != null) {
201 if (e instanceof TableNotFoundException) {
202 future.completeExceptionally(e);
203 } else {
204 // failed to test whether the table is disabled, not a big deal, continue retrying
205 tryScheduleRetry(error);
207 return;
209 if (disabled) {
210 future.completeExceptionally(new TableNotEnabledException(tableName.get()));
211 } else {
212 tryScheduleRetry(error);
215 } else {
216 tryScheduleRetry(error);
218 } else {
219 tryScheduleRetry(error);
// Performs a single attempt. Invoked once by call() and again from the retry timer each time
// tryScheduleRetry() schedules a retry.
223 protected abstract void doCall();
225 CompletableFuture<T> call() {
226 doCall();
227 return future;