/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
20 package org
.apache
.hadoop
.hbase
.client
;
22 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.retries2Attempts
;
24 import java
.io
.IOException
;
25 import java
.io
.InterruptedIOException
;
26 import java
.lang
.reflect
.UndeclaredThrowableException
;
27 import java
.net
.SocketTimeoutException
;
28 import java
.util
.ArrayList
;
29 import java
.util
.List
;
30 import java
.util
.concurrent
.atomic
.AtomicBoolean
;
32 import org
.apache
.commons
.logging
.Log
;
33 import org
.apache
.commons
.logging
.LogFactory
;
34 import org
.apache
.hadoop
.hbase
.CallQueueTooBigException
;
35 import org
.apache
.hadoop
.hbase
.DoNotRetryIOException
;
36 import org
.apache
.hadoop
.hbase
.classification
.InterfaceAudience
;
37 import org
.apache
.hadoop
.hbase
.exceptions
.PreemptiveFastFailException
;
38 import org
.apache
.hadoop
.hbase
.shaded
.com
.google
.protobuf
.ServiceException
;
39 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
40 import org
.apache
.hadoop
.hbase
.util
.ExceptionUtil
;
41 import org
.apache
.hadoop
.ipc
.RemoteException
;
44 * Runs an rpc'ing {@link RetryingCallable}. Sets into rpc client
45 * threadlocal outstanding timeouts as so we don't persist too much.
46 * Dynamic rather than static so can set the generic appropriately.
48 * This object has a state. It should not be used by in parallel by different threads.
49 * Reusing it is possible however, even between multiple threads. However, the user will
50 * have to manage the synchronization on its side: there is no synchronization inside the class.
52 @InterfaceAudience.Private
53 public class RpcRetryingCallerImpl
<T
> implements RpcRetryingCaller
<T
> {
54 // LOG is being used in TestMultiRowRangeFilter, hence leaving it public
55 public static final Log LOG
= LogFactory
.getLog(RpcRetryingCallerImpl
.class);
57 /** How many retries are allowed before we start to log */
58 private final int startLogErrorsCnt
;
60 private final long pause
;
61 private final long pauseForCQTBE
;
62 private final int maxAttempts
;// how many times to try
63 private final int rpcTimeout
;// timeout for each rpc request
64 private final AtomicBoolean cancelled
= new AtomicBoolean(false);
65 private final RetryingCallerInterceptor interceptor
;
66 private final RetryingCallerInterceptorContext context
;
67 private final RetryingTimeTracker tracker
;
69 public RpcRetryingCallerImpl(long pause
, long pauseForCQTBE
, int retries
, int startLogErrorsCnt
) {
70 this(pause
, pauseForCQTBE
, retries
, RetryingCallerInterceptorFactory
.NO_OP_INTERCEPTOR
,
71 startLogErrorsCnt
, 0);
74 public RpcRetryingCallerImpl(long pause
, long pauseForCQTBE
, int retries
,
75 RetryingCallerInterceptor interceptor
, int startLogErrorsCnt
, int rpcTimeout
) {
77 this.pauseForCQTBE
= pauseForCQTBE
;
78 this.maxAttempts
= retries2Attempts(retries
);
79 this.interceptor
= interceptor
;
80 context
= interceptor
.createEmptyContext();
81 this.startLogErrorsCnt
= startLogErrorsCnt
;
82 this.tracker
= new RetryingTimeTracker();
83 this.rpcTimeout
= rpcTimeout
;
89 synchronized (cancelled
){
90 cancelled
.notifyAll();
95 public T
callWithRetries(RetryingCallable
<T
> callable
, int callTimeout
)
96 throws IOException
, RuntimeException
{
97 List
<RetriesExhaustedException
.ThrowableWithExtraContext
> exceptions
= new ArrayList
<>();
100 for (int tries
= 0;; tries
++) {
103 // bad cache entries are cleared in the call to RetryingCallable#throwable() in catch block
104 callable
.prepare(tries
!= 0);
105 interceptor
.intercept(context
.prepare(callable
, tries
));
106 return callable
.call(getTimeout(callTimeout
));
107 } catch (PreemptiveFastFailException e
) {
109 } catch (Throwable t
) {
110 Throwable e
= t
.getCause();
111 ExceptionUtil
.rethrowIfInterrupt(t
);
112 Throwable cause
= t
.getCause();
113 if (cause
instanceof DoNotRetryIOException
) {
115 throw (DoNotRetryIOException
) cause
;
117 // translateException throws exception when should not retry: i.e. when request is bad.
118 interceptor
.handleFailure(context
, t
);
119 t
= translateException(t
);
121 if (tries
> startLogErrorsCnt
) {
122 LOG
.info("Call exception, tries=" + tries
+ ", maxAttempts=" + maxAttempts
+ ", started="
123 + (EnvironmentEdgeManager
.currentTime() - tracker
.getStartTime()) + " ms ago, "
124 + "cancelled=" + cancelled
.get() + ", msg="
125 + t
.getMessage() + " " + callable
.getExceptionMessageAdditionalDetail());
128 callable
.throwable(t
, maxAttempts
!= 1);
129 RetriesExhaustedException
.ThrowableWithExtraContext qt
=
130 new RetriesExhaustedException
.ThrowableWithExtraContext(t
,
131 EnvironmentEdgeManager
.currentTime(), toString());
133 if (tries
>= maxAttempts
- 1) {
134 throw new RetriesExhaustedException(tries
, exceptions
);
136 // If the server is dead, we need to wait a little before retrying, to give
137 // a chance to the regions to be moved
138 // get right pause time, start by RETRY_BACKOFF[0] * pauseBase, where pauseBase might be
139 // special when encountering CallQueueTooBigException, see #HBASE-17114
140 long pauseBase
= (t
instanceof CallQueueTooBigException
) ? pauseForCQTBE
: pause
;
141 expectedSleep
= callable
.sleep(pauseBase
, tries
);
143 // If, after the planned sleep, there won't be enough time left, we stop now.
144 long duration
= singleCallDuration(expectedSleep
);
145 if (duration
> callTimeout
) {
146 String msg
= "callTimeout=" + callTimeout
+ ", callDuration=" + duration
+
147 ": " + t
.getMessage() + " " + callable
.getExceptionMessageAdditionalDetail();
148 throw (SocketTimeoutException
)(new SocketTimeoutException(msg
).initCause(t
));
151 interceptor
.updateFailureInfo(context
);
154 if (expectedSleep
> 0) {
155 synchronized (cancelled
) {
156 if (cancelled
.get()) return null;
157 cancelled
.wait(expectedSleep
);
160 if (cancelled
.get()) return null;
161 } catch (InterruptedException e
) {
162 throw new InterruptedIOException("Interrupted after " + tries
163 + " tries while maxAttempts=" + maxAttempts
);
169 * @return Calculate how long a single call took
171 private long singleCallDuration(final long expectedSleep
) {
172 return (EnvironmentEdgeManager
.currentTime() - tracker
.getStartTime()) + expectedSleep
;
176 public T
callWithoutRetries(RetryingCallable
<T
> callable
, int callTimeout
)
177 throws IOException
, RuntimeException
{
178 // The code of this method should be shared with withRetries.
180 callable
.prepare(false);
181 return callable
.call(callTimeout
);
182 } catch (Throwable t
) {
183 Throwable t2
= translateException(t
);
184 ExceptionUtil
.rethrowIfInterrupt(t2
);
185 // It would be nice to clear the location cache here.
186 if (t2
instanceof IOException
) {
187 throw (IOException
)t2
;
189 throw new RuntimeException(t2
);
195 * Get the good or the remote exception if any, throws the DoNotRetryIOException.
196 * @param t the throwable to analyze
197 * @return the translated exception, if it's not a DoNotRetryIOException
198 * @throws DoNotRetryIOException - if we find it, we throw it instead of translating.
200 static Throwable
translateException(Throwable t
) throws DoNotRetryIOException
{
201 if (t
instanceof UndeclaredThrowableException
) {
202 if (t
.getCause() != null) {
206 if (t
instanceof RemoteException
) {
207 t
= ((RemoteException
)t
).unwrapRemoteException();
209 if (t
instanceof LinkageError
) {
210 throw new DoNotRetryIOException(t
);
212 if (t
instanceof ServiceException
) {
213 ServiceException se
= (ServiceException
)t
;
214 Throwable cause
= se
.getCause();
215 if (cause
!= null && cause
instanceof DoNotRetryIOException
) {
216 throw (DoNotRetryIOException
)cause
;
218 // Don't let ServiceException out; its rpc specific.
220 // t could be a RemoteException so go around again.
221 translateException(t
);
222 } else if (t
instanceof DoNotRetryIOException
) {
223 throw (DoNotRetryIOException
)t
;
228 private int getTimeout(int callTimeout
){
229 int timeout
= tracker
.getRemainingTime(callTimeout
);
230 if (timeout
<= 0 || rpcTimeout
> 0 && rpcTimeout
< timeout
){
231 timeout
= rpcTimeout
;
237 public String
toString() {
238 return "RpcRetryingCaller{" + "globalStartTime=" + tracker
.getStartTime() +
239 ", pause=" + pause
+ ", maxAttempts=" + maxAttempts
+ '}';