/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
20 package org
.apache
.hadoop
.hbase
.client
;
22 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.SLEEP_DELTA_NS
;
23 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.getPauseTime
;
24 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.resetController
;
25 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.translateException
;
27 import java
.util
.ArrayList
;
28 import java
.util
.List
;
29 import java
.util
.Optional
;
30 import java
.util
.concurrent
.CompletableFuture
;
31 import java
.util
.concurrent
.TimeUnit
;
32 import java
.util
.function
.Consumer
;
33 import java
.util
.function
.Supplier
;
34 import org
.apache
.hadoop
.hbase
.CallQueueTooBigException
;
35 import org
.apache
.hadoop
.hbase
.DoNotRetryIOException
;
36 import org
.apache
.hadoop
.hbase
.NotServingRegionException
;
37 import org
.apache
.hadoop
.hbase
.TableName
;
38 import org
.apache
.hadoop
.hbase
.TableNotEnabledException
;
39 import org
.apache
.hadoop
.hbase
.TableNotFoundException
;
40 import org
.apache
.hadoop
.hbase
.exceptions
.ScannerResetException
;
41 import org
.apache
.hadoop
.hbase
.ipc
.HBaseRpcController
;
42 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
43 import org
.apache
.hadoop
.hbase
.util
.FutureUtils
;
44 import org
.apache
.yetus
.audience
.InterfaceAudience
;
45 import org
.slf4j
.Logger
;
46 import org
.slf4j
.LoggerFactory
;
48 import org
.apache
.hbase
.thirdparty
.io
.netty
.util
.Timer
;
50 @InterfaceAudience.Private
51 public abstract class AsyncRpcRetryingCaller
<T
> {
53 private static final Logger LOG
= LoggerFactory
.getLogger(AsyncRpcRetryingCaller
.class);
55 private final Timer retryTimer
;
57 private final int priority
;
59 private final long startNs
;
61 private final long pauseNs
;
63 private final long pauseForCQTBENs
;
65 private int tries
= 1;
67 private final int maxAttempts
;
69 private final int startLogErrorsCnt
;
71 private final List
<RetriesExhaustedException
.ThrowableWithExtraContext
> exceptions
;
73 private final long rpcTimeoutNs
;
75 protected final long operationTimeoutNs
;
77 protected final AsyncConnectionImpl conn
;
79 protected final CompletableFuture
<T
> future
;
81 protected final HBaseRpcController controller
;
/**
 * Creates a retrying caller. The clock used for {@link #remainingTimeNs()} starts here.
 *
 * @param retryTimer timer on which retries are scheduled
 * @param conn connection providing the RPC controller factory and admin access
 * @param priority RPC priority set on the controller
 * @param pauseNs base backoff pause
 * @param pauseForCQTBENs base backoff pause used for CallQueueTooBigException
 * @param maxAttempts number of attempts after which the caller gives up
 * @param operationTimeoutNs overall time budget; {@code <= 0} disables it
 * @param rpcTimeoutNs per-attempt RPC timeout cap
 * @param startLogErrorsCnt attempts beyond this count are logged at WARN
 */
public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
    long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs,
    long rpcTimeoutNs, int startLogErrorsCnt) {
  this.retryTimer = retryTimer;
  // The final field must be assigned: onError() relies on it to check the table state.
  this.conn = conn;
  this.priority = priority;
  this.pauseNs = pauseNs;
  this.pauseForCQTBENs = pauseForCQTBENs;
  this.maxAttempts = maxAttempts;
  this.operationTimeoutNs = operationTimeoutNs;
  this.rpcTimeoutNs = rpcTimeoutNs;
  this.startLogErrorsCnt = startLogErrorsCnt;
  this.future = new CompletableFuture<>();
  this.controller = conn.rpcControllerFactory.newController();
  this.controller.setPriority(priority);
  this.exceptions = new ArrayList<>();
  this.startNs = System.nanoTime();
}
102 private long elapsedMs() {
103 return TimeUnit
.NANOSECONDS
.toMillis(System
.nanoTime() - startNs
);
106 protected final long remainingTimeNs() {
107 return operationTimeoutNs
- (System
.nanoTime() - startNs
);
110 protected final void completeExceptionally() {
111 future
.completeExceptionally(new RetriesExhaustedException(tries
- 1, exceptions
));
114 protected final void resetCallTimeout() {
116 if (operationTimeoutNs
> 0) {
117 callTimeoutNs
= remainingTimeNs();
118 if (callTimeoutNs
<= 0) {
119 completeExceptionally();
122 callTimeoutNs
= Math
.min(callTimeoutNs
, rpcTimeoutNs
);
124 callTimeoutNs
= rpcTimeoutNs
;
126 resetController(controller
, callTimeoutNs
, priority
);
129 private void tryScheduleRetry(Throwable error
) {
130 long pauseNsToUse
= error
instanceof CallQueueTooBigException ? pauseForCQTBENs
: pauseNs
;
132 if (operationTimeoutNs
> 0) {
133 long maxDelayNs
= remainingTimeNs() - SLEEP_DELTA_NS
;
134 if (maxDelayNs
<= 0) {
135 completeExceptionally();
138 delayNs
= Math
.min(maxDelayNs
, getPauseTime(pauseNsToUse
, tries
- 1));
140 delayNs
= getPauseTime(pauseNsToUse
, tries
- 1);
143 retryTimer
.newTimeout(t
-> doCall(), delayNs
, TimeUnit
.NANOSECONDS
);
146 protected Optional
<TableName
> getTableName() {
147 return Optional
.empty();
150 // Sub classes can override this method to change the error type, to control the retry logic.
151 // For example, during rolling upgrading, if we call this newly added method, we will get a
152 // UnsupportedOperationException(wrapped by a DNRIOE), and sometimes we may want to fallback to
153 // use the old method first, so the sub class could change the exception type to something not a
154 // DNRIOE, so we will schedule a retry, and the next time the sub class could use old method to
155 // make the rpc call.
156 protected Throwable
preProcessError(Throwable error
) {
160 protected final void onError(Throwable t
, Supplier
<String
> errMsg
,
161 Consumer
<Throwable
> updateCachedLocation
) {
162 if (future
.isDone()) {
163 // Give up if the future is already done, this is possible if user has already canceled the
164 // future. And for timeline consistent read, we will also cancel some requests if we have
165 // already get one of the responses.
166 LOG
.debug("The future is already done, canceled={}, give up retrying", future
.isCancelled());
169 Throwable error
= preProcessError(translateException(t
));
170 // We use this retrying caller to open a scanner, as it is idempotent, but we may throw
171 // ScannerResetException, which is a DoNotRetryIOException when opening a scanner as now we will
172 // also fetch data when opening a scanner. The intention here is that if we hit a
173 // ScannerResetException when scanning then we should try to open a new scanner, instead of
174 // retrying on the old one, so it is declared as a DoNotRetryIOException. But here we are
175 // exactly trying to open a new scanner, so we should retry on ScannerResetException.
176 if (error
instanceof DoNotRetryIOException
&& !(error
instanceof ScannerResetException
)) {
177 future
.completeExceptionally(error
);
180 if (tries
> startLogErrorsCnt
) {
181 LOG
.warn(errMsg
.get() + ", tries = " + tries
+ ", maxAttempts = " + maxAttempts
+
182 ", timeout = " + TimeUnit
.NANOSECONDS
.toMillis(operationTimeoutNs
) +
183 " ms, time elapsed = " + elapsedMs() + " ms", error
);
185 updateCachedLocation
.accept(error
);
186 RetriesExhaustedException
.ThrowableWithExtraContext qt
=
187 new RetriesExhaustedException
.ThrowableWithExtraContext(error
,
188 EnvironmentEdgeManager
.currentTime(), "");
190 if (tries
>= maxAttempts
) {
191 completeExceptionally();
194 // check whether the table has been disabled, notice that the check will introduce a request to
195 // meta, so here we only check for disabled for some specific exception types.
196 if (error
instanceof NotServingRegionException
|| error
instanceof RegionOfflineException
) {
197 Optional
<TableName
> tableName
= getTableName();
198 if (tableName
.isPresent()) {
199 FutureUtils
.addListener(conn
.getAdmin().isTableDisabled(tableName
.get()), (disabled
, e
) -> {
201 if (e
instanceof TableNotFoundException
) {
202 future
.completeExceptionally(e
);
204 // failed to test whether the table is disabled, not a big deal, continue retrying
205 tryScheduleRetry(error
);
210 future
.completeExceptionally(new TableNotEnabledException(tableName
.get()));
212 tryScheduleRetry(error
);
216 tryScheduleRetry(error
);
219 tryScheduleRetry(error
);
223 protected abstract void doCall();
225 CompletableFuture
<T
> call() {