/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RetryImmediatelyException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.util.Timer;

import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
/**
 * Retry caller for batch.
 * <p>
 * Note that {@link #operationTimeoutNs} is the total time limit for the whole batch, the same as
 * for other single operations.
 * <p>
 * {@link #maxAttempts} is, logically, a limit on each single operation in the batch. In the
 * implementation, we record a {@code tries} count for each operation group, and if a group is
 * split into several sub-groups when retrying, the sub-groups inherit its {@code tries}. You can
 * picture the whole retrying process as a tree, where {@link #maxAttempts} limits the depth of
 * the tree.
 */
86 @InterfaceAudience.Private
87 class AsyncBatchRpcRetryingCaller
<T
> {
89 private static final Logger LOG
= LoggerFactory
.getLogger(AsyncBatchRpcRetryingCaller
.class);
91 private final Timer retryTimer
;
93 private final AsyncConnectionImpl conn
;
95 private final TableName tableName
;
97 private final List
<Action
> actions
;
99 private final List
<CompletableFuture
<T
>> futures
;
101 private final IdentityHashMap
<Action
, CompletableFuture
<T
>> action2Future
;
103 private final IdentityHashMap
<Action
, List
<ThrowableWithExtraContext
>> action2Errors
;
105 private final long pauseNs
;
107 private final long pauseForCQTBENs
;
109 private final int maxAttempts
;
111 private final long operationTimeoutNs
;
113 private final long rpcTimeoutNs
;
115 private final int startLogErrorsCnt
;
117 private final long startNs
;
119 // we can not use HRegionLocation as the map key because the hashCode and equals method of
120 // HRegionLocation only consider serverName.
121 private static final class RegionRequest
{
123 public final HRegionLocation loc
;
125 public final ConcurrentLinkedQueue
<Action
> actions
= new ConcurrentLinkedQueue
<>();
127 public RegionRequest(HRegionLocation loc
) {
132 private static final class ServerRequest
{
134 public final ConcurrentMap
<byte[], RegionRequest
> actionsByRegion
=
135 new ConcurrentSkipListMap
<>(Bytes
.BYTES_COMPARATOR
);
137 public void addAction(HRegionLocation loc
, Action action
) {
138 computeIfAbsent(actionsByRegion
, loc
.getRegion().getRegionName(),
139 () -> new RegionRequest(loc
)).actions
.add(action
);
142 public void setRegionRequest(byte[] regionName
, RegionRequest regionReq
) {
143 actionsByRegion
.put(regionName
, regionReq
);
146 public int getPriority() {
147 return actionsByRegion
.values().stream().flatMap(rr
-> rr
.actions
.stream())
148 .mapToInt(Action
::getPriority
).max().orElse(HConstants
.PRIORITY_UNSET
);
152 public AsyncBatchRpcRetryingCaller(Timer retryTimer
, AsyncConnectionImpl conn
,
153 TableName tableName
, List
<?
extends Row
> actions
, long pauseNs
, long pauseForCQTBENs
,
154 int maxAttempts
, long operationTimeoutNs
, long rpcTimeoutNs
, int startLogErrorsCnt
) {
155 this.retryTimer
= retryTimer
;
157 this.tableName
= tableName
;
158 this.pauseNs
= pauseNs
;
159 this.pauseForCQTBENs
= pauseForCQTBENs
;
160 this.maxAttempts
= maxAttempts
;
161 this.operationTimeoutNs
= operationTimeoutNs
;
162 this.rpcTimeoutNs
= rpcTimeoutNs
;
163 this.startLogErrorsCnt
= startLogErrorsCnt
;
164 this.actions
= new ArrayList
<>(actions
.size());
165 this.futures
= new ArrayList
<>(actions
.size());
166 this.action2Future
= new IdentityHashMap
<>(actions
.size());
167 for (int i
= 0, n
= actions
.size(); i
< n
; i
++) {
168 Row rawAction
= actions
.get(i
);
170 if (rawAction
instanceof OperationWithAttributes
) {
171 action
= new Action(rawAction
, i
, ((OperationWithAttributes
) rawAction
).getPriority());
173 action
= new Action(rawAction
, i
);
175 if (hasIncrementOrAppend(rawAction
)) {
176 action
.setNonce(conn
.getNonceGenerator().newNonce());
178 this.actions
.add(action
);
179 CompletableFuture
<T
> future
= new CompletableFuture
<>();
181 action2Future
.put(action
, future
);
183 this.action2Errors
= new IdentityHashMap
<>();
184 this.startNs
= System
.nanoTime();
187 private static boolean hasIncrementOrAppend(Row action
) {
188 if (action
instanceof Append
|| action
instanceof Increment
) {
190 } else if (action
instanceof RowMutations
) {
191 return hasIncrementOrAppend((RowMutations
) action
);
192 } else if (action
instanceof CheckAndMutate
) {
193 return hasIncrementOrAppend(((CheckAndMutate
) action
).getAction());
198 private static boolean hasIncrementOrAppend(RowMutations mutations
) {
199 for (Mutation mutation
: mutations
.getMutations()) {
200 if (mutation
instanceof Append
|| mutation
instanceof Increment
) {
207 private long remainingTimeNs() {
208 return operationTimeoutNs
- (System
.nanoTime() - startNs
);
211 private List
<ThrowableWithExtraContext
> removeErrors(Action action
) {
212 synchronized (action2Errors
) {
213 return action2Errors
.remove(action
);
217 private void logException(int tries
, Supplier
<Stream
<RegionRequest
>> regionsSupplier
,
218 Throwable error
, ServerName serverName
) {
219 if (tries
> startLogErrorsCnt
) {
221 regionsSupplier
.get().map(r
-> "'" + r
.loc
.getRegion().getRegionNameAsString() + "'")
222 .collect(Collectors
.joining(",", "[", "]"));
223 LOG
.warn("Process batch for " + regions
+ " in " + tableName
+ " from " + serverName
+
224 " failed, tries=" + tries
, error
);
228 private String
getExtraContextForError(ServerName serverName
) {
229 return serverName
!= null ? serverName
.getServerName() : "";
232 private void addError(Action action
, Throwable error
, ServerName serverName
) {
233 List
<ThrowableWithExtraContext
> errors
;
234 synchronized (action2Errors
) {
235 errors
= action2Errors
.computeIfAbsent(action
, k
-> new ArrayList
<>());
237 errors
.add(new ThrowableWithExtraContext(error
, EnvironmentEdgeManager
.currentTime(),
238 getExtraContextForError(serverName
)));
241 private void addError(Iterable
<Action
> actions
, Throwable error
, ServerName serverName
) {
242 actions
.forEach(action
-> addError(action
, error
, serverName
));
245 private void failOne(Action action
, int tries
, Throwable error
, long currentTime
, String extras
) {
246 CompletableFuture
<T
> future
= action2Future
.get(action
);
247 if (future
.isDone()) {
250 ThrowableWithExtraContext errorWithCtx
=
251 new ThrowableWithExtraContext(error
, currentTime
, extras
);
252 List
<ThrowableWithExtraContext
> errors
= removeErrors(action
);
253 if (errors
== null) {
254 errors
= Collections
.singletonList(errorWithCtx
);
256 errors
.add(errorWithCtx
);
258 future
.completeExceptionally(new RetriesExhaustedException(tries
- 1, errors
));
261 private void failAll(Stream
<Action
> actions
, int tries
, Throwable error
, ServerName serverName
) {
262 long currentTime
= EnvironmentEdgeManager
.currentTime();
263 String extras
= getExtraContextForError(serverName
);
264 actions
.forEach(action
-> failOne(action
, tries
, error
, currentTime
, extras
));
267 private void failAll(Stream
<Action
> actions
, int tries
) {
268 actions
.forEach(action
-> {
269 CompletableFuture
<T
> future
= action2Future
.get(action
);
270 if (future
.isDone()) {
273 future
.completeExceptionally(new RetriesExhaustedException(tries
,
274 Optional
.ofNullable(removeErrors(action
)).orElse(Collections
.emptyList())));
278 private ClientProtos
.MultiRequest
buildReq(Map
<byte[], RegionRequest
> actionsByRegion
,
279 List
<CellScannable
> cells
, Map
<Integer
, Integer
> indexMap
) throws IOException
{
280 ClientProtos
.MultiRequest
.Builder multiRequestBuilder
= ClientProtos
.MultiRequest
.newBuilder();
281 ClientProtos
.RegionAction
.Builder regionActionBuilder
= ClientProtos
.RegionAction
.newBuilder();
282 ClientProtos
.Action
.Builder actionBuilder
= ClientProtos
.Action
.newBuilder();
283 ClientProtos
.MutationProto
.Builder mutationBuilder
= ClientProtos
.MutationProto
.newBuilder();
284 for (Map
.Entry
<byte[], RegionRequest
> entry
: actionsByRegion
.entrySet()) {
285 long nonceGroup
= conn
.getNonceGenerator().getNonceGroup();
286 // multiRequestBuilder will be populated with region actions.
287 // indexMap will be non-empty after the call if there is RowMutations/CheckAndMutate in the
289 RequestConverter
.buildNoDataRegionActions(entry
.getKey(),
290 entry
.getValue().actions
.stream()
291 .sorted((a1
, a2
) -> Integer
.compare(a1
.getOriginalIndex(), a2
.getOriginalIndex()))
292 .collect(Collectors
.toList()),
293 cells
, multiRequestBuilder
, regionActionBuilder
, actionBuilder
, mutationBuilder
,
294 nonceGroup
, indexMap
);
296 return multiRequestBuilder
.build();
299 @SuppressWarnings("unchecked")
300 private void onComplete(Action action
, RegionRequest regionReq
, int tries
, ServerName serverName
,
301 RegionResult regionResult
, List
<Action
> failedActions
, Throwable regionException
,
302 MutableBoolean retryImmediately
) {
303 Object result
= regionResult
.result
.getOrDefault(action
.getOriginalIndex(), regionException
);
304 if (result
== null) {
305 LOG
.error("Server " + serverName
+ " sent us neither result nor exception for row '" +
306 Bytes
.toStringBinary(action
.getAction().getRow()) + "' of " +
307 regionReq
.loc
.getRegion().getRegionNameAsString());
308 addError(action
, new RuntimeException("Invalid response"), serverName
);
309 failedActions
.add(action
);
310 } else if (result
instanceof Throwable
) {
311 Throwable error
= translateException((Throwable
) result
);
312 logException(tries
, () -> Stream
.of(regionReq
), error
, serverName
);
313 conn
.getLocator().updateCachedLocationOnError(regionReq
.loc
, error
);
314 if (error
instanceof DoNotRetryIOException
|| tries
>= maxAttempts
) {
315 failOne(action
, tries
, error
, EnvironmentEdgeManager
.currentTime(),
316 getExtraContextForError(serverName
));
318 if (!retryImmediately
.booleanValue() && error
instanceof RetryImmediatelyException
) {
319 retryImmediately
.setTrue();
321 failedActions
.add(action
);
324 action2Future
.get(action
).complete((T
) result
);
328 private void onComplete(Map
<byte[], RegionRequest
> actionsByRegion
, int tries
,
329 ServerName serverName
, MultiResponse resp
) {
330 ConnectionUtils
.updateStats(conn
.getStatisticsTracker(), conn
.getConnectionMetrics(),
332 List
<Action
> failedActions
= new ArrayList
<>();
333 MutableBoolean retryImmediately
= new MutableBoolean(false);
334 actionsByRegion
.forEach((rn
, regionReq
) -> {
335 RegionResult regionResult
= resp
.getResults().get(rn
);
336 Throwable regionException
= resp
.getException(rn
);
337 if (regionResult
!= null) {
338 regionReq
.actions
.forEach(action
-> onComplete(action
, regionReq
, tries
, serverName
,
339 regionResult
, failedActions
, regionException
, retryImmediately
));
342 if (regionException
== null) {
343 LOG
.error("Server sent us neither results nor exceptions for {}",
344 Bytes
.toStringBinary(rn
));
345 error
= new RuntimeException("Invalid response");
347 error
= translateException(regionException
);
349 logException(tries
, () -> Stream
.of(regionReq
), error
, serverName
);
350 conn
.getLocator().updateCachedLocationOnError(regionReq
.loc
, error
);
351 if (error
instanceof DoNotRetryIOException
|| tries
>= maxAttempts
) {
352 failAll(regionReq
.actions
.stream(), tries
, error
, serverName
);
355 if (!retryImmediately
.booleanValue() && error
instanceof RetryImmediatelyException
) {
356 retryImmediately
.setTrue();
358 addError(regionReq
.actions
, error
, serverName
);
359 failedActions
.addAll(regionReq
.actions
);
362 if (!failedActions
.isEmpty()) {
363 tryResubmit(failedActions
.stream(), tries
, retryImmediately
.booleanValue(), false);
367 private void sendToServer(ServerName serverName
, ServerRequest serverReq
, int tries
) {
369 if (operationTimeoutNs
> 0) {
370 remainingNs
= remainingTimeNs();
371 if (remainingNs
<= 0) {
372 failAll(serverReq
.actionsByRegion
.values().stream().flatMap(r
-> r
.actions
.stream()),
377 remainingNs
= Long
.MAX_VALUE
;
379 ClientService
.Interface stub
;
381 stub
= conn
.getRegionServerStub(serverName
);
382 } catch (IOException e
) {
383 onError(serverReq
.actionsByRegion
, tries
, e
, serverName
);
386 ClientProtos
.MultiRequest req
;
387 List
<CellScannable
> cells
= new ArrayList
<>();
388 // Map from a created RegionAction to the original index for a RowMutations within
389 // the original list of actions. This will be used to process the results when there
390 // is RowMutations/CheckAndMutate in the action list.
391 Map
<Integer
, Integer
> indexMap
= new HashMap
<>();
393 req
= buildReq(serverReq
.actionsByRegion
, cells
, indexMap
);
394 } catch (IOException e
) {
395 onError(serverReq
.actionsByRegion
, tries
, e
, serverName
);
398 HBaseRpcController controller
= conn
.rpcControllerFactory
.newController();
399 resetController(controller
, Math
.min(rpcTimeoutNs
, remainingNs
),
400 calcPriority(serverReq
.getPriority(), tableName
));
401 if (!cells
.isEmpty()) {
402 controller
.setCellScanner(createCellScanner(cells
));
404 stub
.multi(controller
, req
, resp
-> {
405 if (controller
.failed()) {
406 onError(serverReq
.actionsByRegion
, tries
, controller
.getFailed(), serverName
);
409 onComplete(serverReq
.actionsByRegion
, tries
, serverName
, ResponseConverter
.getResults(req
,
410 indexMap
, resp
, controller
.cellScanner()));
411 } catch (Exception e
) {
412 onError(serverReq
.actionsByRegion
, tries
, e
, serverName
);
419 // We will make use of the ServerStatisticTracker to determine whether we need to delay a bit,
420 // based on the load of the region server and the region.
421 private void sendOrDelay(Map
<ServerName
, ServerRequest
> actionsByServer
, int tries
) {
422 Optional
<MetricsConnection
> metrics
= conn
.getConnectionMetrics();
423 Optional
<ServerStatisticTracker
> optStats
= conn
.getStatisticsTracker();
424 if (!optStats
.isPresent()) {
425 actionsByServer
.forEach((serverName
, serverReq
) -> {
426 metrics
.ifPresent(MetricsConnection
::incrNormalRunners
);
427 sendToServer(serverName
, serverReq
, tries
);
431 ServerStatisticTracker stats
= optStats
.get();
432 ClientBackoffPolicy backoffPolicy
= conn
.getBackoffPolicy();
433 actionsByServer
.forEach((serverName
, serverReq
) -> {
434 ServerStatistics serverStats
= stats
.getStats(serverName
);
435 Map
<Long
, ServerRequest
> groupByBackoff
= new HashMap
<>();
436 serverReq
.actionsByRegion
.forEach((regionName
, regionReq
) -> {
437 long backoff
= backoffPolicy
.getBackoffTime(serverName
, regionName
, serverStats
);
438 groupByBackoff
.computeIfAbsent(backoff
, k
-> new ServerRequest())
439 .setRegionRequest(regionName
, regionReq
);
441 groupByBackoff
.forEach((backoff
, sr
) -> {
443 metrics
.ifPresent(m
-> m
.incrDelayRunnersAndUpdateDelayInterval(backoff
));
444 retryTimer
.newTimeout(timer
-> sendToServer(serverName
, sr
, tries
), backoff
,
445 TimeUnit
.MILLISECONDS
);
447 metrics
.ifPresent(MetricsConnection
::incrNormalRunners
);
448 sendToServer(serverName
, sr
, tries
);
454 private void onError(Map
<byte[], RegionRequest
> actionsByRegion
, int tries
, Throwable t
,
455 ServerName serverName
) {
456 Throwable error
= translateException(t
);
457 logException(tries
, () -> actionsByRegion
.values().stream(), error
, serverName
);
458 actionsByRegion
.forEach(
459 (rn
, regionReq
) -> conn
.getLocator().updateCachedLocationOnError(regionReq
.loc
, error
));
460 if (error
instanceof DoNotRetryIOException
|| tries
>= maxAttempts
) {
461 failAll(actionsByRegion
.values().stream().flatMap(r
-> r
.actions
.stream()), tries
, error
,
465 List
<Action
> copiedActions
= actionsByRegion
.values().stream().flatMap(r
-> r
.actions
.stream())
466 .collect(Collectors
.toList());
467 addError(copiedActions
, error
, serverName
);
468 tryResubmit(copiedActions
.stream(), tries
, error
instanceof RetryImmediatelyException
,
469 error
instanceof CallQueueTooBigException
);
472 private void tryResubmit(Stream
<Action
> actions
, int tries
, boolean immediately
,
473 boolean isCallQueueTooBig
) {
475 groupAndSend(actions
, tries
);
479 long pauseNsToUse
= isCallQueueTooBig ? pauseForCQTBENs
: pauseNs
;
480 if (operationTimeoutNs
> 0) {
481 long maxDelayNs
= remainingTimeNs() - SLEEP_DELTA_NS
;
482 if (maxDelayNs
<= 0) {
483 failAll(actions
, tries
);
486 delayNs
= Math
.min(maxDelayNs
, getPauseTime(pauseNsToUse
, tries
- 1));
488 delayNs
= getPauseTime(pauseNsToUse
, tries
- 1);
490 retryTimer
.newTimeout(t
-> groupAndSend(actions
, tries
+ 1), delayNs
, TimeUnit
.NANOSECONDS
);
493 private void groupAndSend(Stream
<Action
> actions
, int tries
) {
494 long locateTimeoutNs
;
495 if (operationTimeoutNs
> 0) {
496 locateTimeoutNs
= remainingTimeNs();
497 if (locateTimeoutNs
<= 0) {
498 failAll(actions
, tries
);
502 locateTimeoutNs
= -1L;
504 ConcurrentMap
<ServerName
, ServerRequest
> actionsByServer
= new ConcurrentHashMap
<>();
505 ConcurrentLinkedQueue
<Action
> locateFailed
= new ConcurrentLinkedQueue
<>();
506 addListener(CompletableFuture
.allOf(actions
507 .map(action
-> conn
.getLocator().getRegionLocation(tableName
, action
.getAction().getRow(),
508 RegionLocateType
.CURRENT
, locateTimeoutNs
).whenComplete((loc
, error
) -> {
510 error
= unwrapCompletionException(translateException(error
));
511 if (error
instanceof DoNotRetryIOException
) {
512 failOne(action
, tries
, error
, EnvironmentEdgeManager
.currentTime(), "");
515 addError(action
, error
, null);
516 locateFailed
.add(action
);
518 computeIfAbsent(actionsByServer
, loc
.getServerName(), ServerRequest
::new).addAction(loc
,
522 .toArray(CompletableFuture
[]::new)), (v
, r
) -> {
523 if (!actionsByServer
.isEmpty()) {
524 sendOrDelay(actionsByServer
, tries
);
526 if (!locateFailed
.isEmpty()) {
527 tryResubmit(locateFailed
.stream(), tries
, false, false);
532 public List
<CompletableFuture
<T
>> call() {
533 groupAndSend(actions
.stream(), 1);