HBASE-25617 Revisit the span names (#2998)
[hbase.git] / hbase-client / src / main / java / org / apache / hadoop / hbase / client / AsyncBatchRpcRetryingCaller.java
blob7af385da2d839b3de7a518be1e99712cab7278b8
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org.apache.hadoop.hbase.client;
20 import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
21 import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
22 import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority;
23 import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
24 import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
25 import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
26 import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
27 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
28 import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
30 import java.io.IOException;
31 import java.util.ArrayList;
32 import java.util.Collections;
33 import java.util.HashMap;
34 import java.util.IdentityHashMap;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.Optional;
38 import java.util.concurrent.CompletableFuture;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.ConcurrentLinkedQueue;
41 import java.util.concurrent.ConcurrentMap;
42 import java.util.concurrent.ConcurrentSkipListMap;
43 import java.util.concurrent.TimeUnit;
44 import java.util.function.Supplier;
45 import java.util.stream.Collectors;
46 import java.util.stream.Stream;
47 import org.apache.commons.lang3.mutable.MutableBoolean;
48 import org.apache.hadoop.hbase.CallQueueTooBigException;
49 import org.apache.hadoop.hbase.CellScannable;
50 import org.apache.hadoop.hbase.DoNotRetryIOException;
51 import org.apache.hadoop.hbase.HConstants;
52 import org.apache.hadoop.hbase.HRegionLocation;
53 import org.apache.hadoop.hbase.RetryImmediatelyException;
54 import org.apache.hadoop.hbase.ServerName;
55 import org.apache.hadoop.hbase.TableName;
56 import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
57 import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
58 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
59 import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
60 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
61 import org.apache.hadoop.hbase.util.Bytes;
62 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
63 import org.apache.yetus.audience.InterfaceAudience;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
67 import org.apache.hbase.thirdparty.io.netty.util.Timer;
69 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
70 import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
71 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
72 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
/**
 * Retrying caller for batch operations.
 * <p>
 * Note that {@link #operationTimeoutNs} is the total time limit for the whole batch, the same
 * meaning it has for single operations.
 * <p>
 * Logically, {@link #maxAttempts} is a limit for each single operation in the batch. In the
 * implementation, we record a {@code tries} parameter for each operation group, and if a group is
 * split into several groups when retrying, the sub groups inherit the {@code tries}. You can
 * imagine the whole retrying process as a tree, with {@link #maxAttempts} being the limit on the
 * depth of that tree.
 */
86 @InterfaceAudience.Private
87 class AsyncBatchRpcRetryingCaller<T> {
89 private static final Logger LOG = LoggerFactory.getLogger(AsyncBatchRpcRetryingCaller.class);
91 private final Timer retryTimer;
93 private final AsyncConnectionImpl conn;
95 private final TableName tableName;
97 private final List<Action> actions;
99 private final List<CompletableFuture<T>> futures;
101 private final IdentityHashMap<Action, CompletableFuture<T>> action2Future;
103 private final IdentityHashMap<Action, List<ThrowableWithExtraContext>> action2Errors;
105 private final long pauseNs;
107 private final long pauseForCQTBENs;
109 private final int maxAttempts;
111 private final long operationTimeoutNs;
113 private final long rpcTimeoutNs;
115 private final int startLogErrorsCnt;
117 private final long startNs;
119 // we can not use HRegionLocation as the map key because the hashCode and equals method of
120 // HRegionLocation only consider serverName.
121 private static final class RegionRequest {
123 public final HRegionLocation loc;
125 public final ConcurrentLinkedQueue<Action> actions = new ConcurrentLinkedQueue<>();
127 public RegionRequest(HRegionLocation loc) {
128 this.loc = loc;
132 private static final class ServerRequest {
134 public final ConcurrentMap<byte[], RegionRequest> actionsByRegion =
135 new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
137 public void addAction(HRegionLocation loc, Action action) {
138 computeIfAbsent(actionsByRegion, loc.getRegion().getRegionName(),
139 () -> new RegionRequest(loc)).actions.add(action);
142 public void setRegionRequest(byte[] regionName, RegionRequest regionReq) {
143 actionsByRegion.put(regionName, regionReq);
146 public int getPriority() {
147 return actionsByRegion.values().stream().flatMap(rr -> rr.actions.stream())
148 .mapToInt(Action::getPriority).max().orElse(HConstants.PRIORITY_UNSET);
152 public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
153 TableName tableName, List<? extends Row> actions, long pauseNs, long pauseForCQTBENs,
154 int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
155 this.retryTimer = retryTimer;
156 this.conn = conn;
157 this.tableName = tableName;
158 this.pauseNs = pauseNs;
159 this.pauseForCQTBENs = pauseForCQTBENs;
160 this.maxAttempts = maxAttempts;
161 this.operationTimeoutNs = operationTimeoutNs;
162 this.rpcTimeoutNs = rpcTimeoutNs;
163 this.startLogErrorsCnt = startLogErrorsCnt;
164 this.actions = new ArrayList<>(actions.size());
165 this.futures = new ArrayList<>(actions.size());
166 this.action2Future = new IdentityHashMap<>(actions.size());
167 for (int i = 0, n = actions.size(); i < n; i++) {
168 Row rawAction = actions.get(i);
169 Action action;
170 if (rawAction instanceof OperationWithAttributes) {
171 action = new Action(rawAction, i, ((OperationWithAttributes) rawAction).getPriority());
172 } else {
173 action = new Action(rawAction, i);
175 if (hasIncrementOrAppend(rawAction)) {
176 action.setNonce(conn.getNonceGenerator().newNonce());
178 this.actions.add(action);
179 CompletableFuture<T> future = new CompletableFuture<>();
180 futures.add(future);
181 action2Future.put(action, future);
183 this.action2Errors = new IdentityHashMap<>();
184 this.startNs = System.nanoTime();
187 private static boolean hasIncrementOrAppend(Row action) {
188 if (action instanceof Append || action instanceof Increment) {
189 return true;
190 } else if (action instanceof RowMutations) {
191 return hasIncrementOrAppend((RowMutations) action);
192 } else if (action instanceof CheckAndMutate) {
193 return hasIncrementOrAppend(((CheckAndMutate) action).getAction());
195 return false;
198 private static boolean hasIncrementOrAppend(RowMutations mutations) {
199 for (Mutation mutation : mutations.getMutations()) {
200 if (mutation instanceof Append || mutation instanceof Increment) {
201 return true;
204 return false;
207 private long remainingTimeNs() {
208 return operationTimeoutNs - (System.nanoTime() - startNs);
211 private List<ThrowableWithExtraContext> removeErrors(Action action) {
212 synchronized (action2Errors) {
213 return action2Errors.remove(action);
217 private void logException(int tries, Supplier<Stream<RegionRequest>> regionsSupplier,
218 Throwable error, ServerName serverName) {
219 if (tries > startLogErrorsCnt) {
220 String regions =
221 regionsSupplier.get().map(r -> "'" + r.loc.getRegion().getRegionNameAsString() + "'")
222 .collect(Collectors.joining(",", "[", "]"));
223 LOG.warn("Process batch for " + regions + " in " + tableName + " from " + serverName +
224 " failed, tries=" + tries, error);
228 private String getExtraContextForError(ServerName serverName) {
229 return serverName != null ? serverName.getServerName() : "";
232 private void addError(Action action, Throwable error, ServerName serverName) {
233 List<ThrowableWithExtraContext> errors;
234 synchronized (action2Errors) {
235 errors = action2Errors.computeIfAbsent(action, k -> new ArrayList<>());
237 errors.add(new ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(),
238 getExtraContextForError(serverName)));
241 private void addError(Iterable<Action> actions, Throwable error, ServerName serverName) {
242 actions.forEach(action -> addError(action, error, serverName));
245 private void failOne(Action action, int tries, Throwable error, long currentTime, String extras) {
246 CompletableFuture<T> future = action2Future.get(action);
247 if (future.isDone()) {
248 return;
250 ThrowableWithExtraContext errorWithCtx =
251 new ThrowableWithExtraContext(error, currentTime, extras);
252 List<ThrowableWithExtraContext> errors = removeErrors(action);
253 if (errors == null) {
254 errors = Collections.singletonList(errorWithCtx);
255 } else {
256 errors.add(errorWithCtx);
258 future.completeExceptionally(new RetriesExhaustedException(tries - 1, errors));
261 private void failAll(Stream<Action> actions, int tries, Throwable error, ServerName serverName) {
262 long currentTime = EnvironmentEdgeManager.currentTime();
263 String extras = getExtraContextForError(serverName);
264 actions.forEach(action -> failOne(action, tries, error, currentTime, extras));
267 private void failAll(Stream<Action> actions, int tries) {
268 actions.forEach(action -> {
269 CompletableFuture<T> future = action2Future.get(action);
270 if (future.isDone()) {
271 return;
273 future.completeExceptionally(new RetriesExhaustedException(tries,
274 Optional.ofNullable(removeErrors(action)).orElse(Collections.emptyList())));
278 private ClientProtos.MultiRequest buildReq(Map<byte[], RegionRequest> actionsByRegion,
279 List<CellScannable> cells, Map<Integer, Integer> indexMap) throws IOException {
280 ClientProtos.MultiRequest.Builder multiRequestBuilder = ClientProtos.MultiRequest.newBuilder();
281 ClientProtos.RegionAction.Builder regionActionBuilder = ClientProtos.RegionAction.newBuilder();
282 ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
283 ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
284 for (Map.Entry<byte[], RegionRequest> entry : actionsByRegion.entrySet()) {
285 long nonceGroup = conn.getNonceGenerator().getNonceGroup();
286 // multiRequestBuilder will be populated with region actions.
287 // indexMap will be non-empty after the call if there is RowMutations/CheckAndMutate in the
288 // action list.
289 RequestConverter.buildNoDataRegionActions(entry.getKey(),
290 entry.getValue().actions.stream()
291 .sorted((a1, a2) -> Integer.compare(a1.getOriginalIndex(), a2.getOriginalIndex()))
292 .collect(Collectors.toList()),
293 cells, multiRequestBuilder, regionActionBuilder, actionBuilder, mutationBuilder,
294 nonceGroup, indexMap);
296 return multiRequestBuilder.build();
299 @SuppressWarnings("unchecked")
300 private void onComplete(Action action, RegionRequest regionReq, int tries, ServerName serverName,
301 RegionResult regionResult, List<Action> failedActions, Throwable regionException,
302 MutableBoolean retryImmediately) {
303 Object result = regionResult.result.getOrDefault(action.getOriginalIndex(), regionException);
304 if (result == null) {
305 LOG.error("Server " + serverName + " sent us neither result nor exception for row '" +
306 Bytes.toStringBinary(action.getAction().getRow()) + "' of " +
307 regionReq.loc.getRegion().getRegionNameAsString());
308 addError(action, new RuntimeException("Invalid response"), serverName);
309 failedActions.add(action);
310 } else if (result instanceof Throwable) {
311 Throwable error = translateException((Throwable) result);
312 logException(tries, () -> Stream.of(regionReq), error, serverName);
313 conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
314 if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
315 failOne(action, tries, error, EnvironmentEdgeManager.currentTime(),
316 getExtraContextForError(serverName));
317 } else {
318 if (!retryImmediately.booleanValue() && error instanceof RetryImmediatelyException) {
319 retryImmediately.setTrue();
321 failedActions.add(action);
323 } else {
324 action2Future.get(action).complete((T) result);
328 private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
329 ServerName serverName, MultiResponse resp) {
330 ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(),
331 serverName, resp);
332 List<Action> failedActions = new ArrayList<>();
333 MutableBoolean retryImmediately = new MutableBoolean(false);
334 actionsByRegion.forEach((rn, regionReq) -> {
335 RegionResult regionResult = resp.getResults().get(rn);
336 Throwable regionException = resp.getException(rn);
337 if (regionResult != null) {
338 regionReq.actions.forEach(action -> onComplete(action, regionReq, tries, serverName,
339 regionResult, failedActions, regionException, retryImmediately));
340 } else {
341 Throwable error;
342 if (regionException == null) {
343 LOG.error("Server sent us neither results nor exceptions for {}",
344 Bytes.toStringBinary(rn));
345 error = new RuntimeException("Invalid response");
346 } else {
347 error = translateException(regionException);
349 logException(tries, () -> Stream.of(regionReq), error, serverName);
350 conn.getLocator().updateCachedLocationOnError(regionReq.loc, error);
351 if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
352 failAll(regionReq.actions.stream(), tries, error, serverName);
353 return;
355 if (!retryImmediately.booleanValue() && error instanceof RetryImmediatelyException) {
356 retryImmediately.setTrue();
358 addError(regionReq.actions, error, serverName);
359 failedActions.addAll(regionReq.actions);
362 if (!failedActions.isEmpty()) {
363 tryResubmit(failedActions.stream(), tries, retryImmediately.booleanValue(), false);
367 private void sendToServer(ServerName serverName, ServerRequest serverReq, int tries) {
368 long remainingNs;
369 if (operationTimeoutNs > 0) {
370 remainingNs = remainingTimeNs();
371 if (remainingNs <= 0) {
372 failAll(serverReq.actionsByRegion.values().stream().flatMap(r -> r.actions.stream()),
373 tries);
374 return;
376 } else {
377 remainingNs = Long.MAX_VALUE;
379 ClientService.Interface stub;
380 try {
381 stub = conn.getRegionServerStub(serverName);
382 } catch (IOException e) {
383 onError(serverReq.actionsByRegion, tries, e, serverName);
384 return;
386 ClientProtos.MultiRequest req;
387 List<CellScannable> cells = new ArrayList<>();
388 // Map from a created RegionAction to the original index for a RowMutations within
389 // the original list of actions. This will be used to process the results when there
390 // is RowMutations/CheckAndMutate in the action list.
391 Map<Integer, Integer> indexMap = new HashMap<>();
392 try {
393 req = buildReq(serverReq.actionsByRegion, cells, indexMap);
394 } catch (IOException e) {
395 onError(serverReq.actionsByRegion, tries, e, serverName);
396 return;
398 HBaseRpcController controller = conn.rpcControllerFactory.newController();
399 resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
400 calcPriority(serverReq.getPriority(), tableName));
401 if (!cells.isEmpty()) {
402 controller.setCellScanner(createCellScanner(cells));
404 stub.multi(controller, req, resp -> {
405 if (controller.failed()) {
406 onError(serverReq.actionsByRegion, tries, controller.getFailed(), serverName);
407 } else {
408 try {
409 onComplete(serverReq.actionsByRegion, tries, serverName, ResponseConverter.getResults(req,
410 indexMap, resp, controller.cellScanner()));
411 } catch (Exception e) {
412 onError(serverReq.actionsByRegion, tries, e, serverName);
413 return;
419 // We will make use of the ServerStatisticTracker to determine whether we need to delay a bit,
420 // based on the load of the region server and the region.
421 private void sendOrDelay(Map<ServerName, ServerRequest> actionsByServer, int tries) {
422 Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
423 Optional<ServerStatisticTracker> optStats = conn.getStatisticsTracker();
424 if (!optStats.isPresent()) {
425 actionsByServer.forEach((serverName, serverReq) -> {
426 metrics.ifPresent(MetricsConnection::incrNormalRunners);
427 sendToServer(serverName, serverReq, tries);
429 return;
431 ServerStatisticTracker stats = optStats.get();
432 ClientBackoffPolicy backoffPolicy = conn.getBackoffPolicy();
433 actionsByServer.forEach((serverName, serverReq) -> {
434 ServerStatistics serverStats = stats.getStats(serverName);
435 Map<Long, ServerRequest> groupByBackoff = new HashMap<>();
436 serverReq.actionsByRegion.forEach((regionName, regionReq) -> {
437 long backoff = backoffPolicy.getBackoffTime(serverName, regionName, serverStats);
438 groupByBackoff.computeIfAbsent(backoff, k -> new ServerRequest())
439 .setRegionRequest(regionName, regionReq);
441 groupByBackoff.forEach((backoff, sr) -> {
442 if (backoff > 0) {
443 metrics.ifPresent(m -> m.incrDelayRunnersAndUpdateDelayInterval(backoff));
444 retryTimer.newTimeout(timer -> sendToServer(serverName, sr, tries), backoff,
445 TimeUnit.MILLISECONDS);
446 } else {
447 metrics.ifPresent(MetricsConnection::incrNormalRunners);
448 sendToServer(serverName, sr, tries);
454 private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Throwable t,
455 ServerName serverName) {
456 Throwable error = translateException(t);
457 logException(tries, () -> actionsByRegion.values().stream(), error, serverName);
458 actionsByRegion.forEach(
459 (rn, regionReq) -> conn.getLocator().updateCachedLocationOnError(regionReq.loc, error));
460 if (error instanceof DoNotRetryIOException || tries >= maxAttempts) {
461 failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error,
462 serverName);
463 return;
465 List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r -> r.actions.stream())
466 .collect(Collectors.toList());
467 addError(copiedActions, error, serverName);
468 tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException,
469 error instanceof CallQueueTooBigException);
472 private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
473 boolean isCallQueueTooBig) {
474 if (immediately) {
475 groupAndSend(actions, tries);
476 return;
478 long delayNs;
479 long pauseNsToUse = isCallQueueTooBig ? pauseForCQTBENs : pauseNs;
480 if (operationTimeoutNs > 0) {
481 long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
482 if (maxDelayNs <= 0) {
483 failAll(actions, tries);
484 return;
486 delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
487 } else {
488 delayNs = getPauseTime(pauseNsToUse, tries - 1);
490 retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS);
493 private void groupAndSend(Stream<Action> actions, int tries) {
494 long locateTimeoutNs;
495 if (operationTimeoutNs > 0) {
496 locateTimeoutNs = remainingTimeNs();
497 if (locateTimeoutNs <= 0) {
498 failAll(actions, tries);
499 return;
501 } else {
502 locateTimeoutNs = -1L;
504 ConcurrentMap<ServerName, ServerRequest> actionsByServer = new ConcurrentHashMap<>();
505 ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>();
506 addListener(CompletableFuture.allOf(actions
507 .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
508 RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
509 if (error != null) {
510 error = unwrapCompletionException(translateException(error));
511 if (error instanceof DoNotRetryIOException) {
512 failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), "");
513 return;
515 addError(action, error, null);
516 locateFailed.add(action);
517 } else {
518 computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new).addAction(loc,
519 action);
522 .toArray(CompletableFuture[]::new)), (v, r) -> {
523 if (!actionsByServer.isEmpty()) {
524 sendOrDelay(actionsByServer, tries);
526 if (!locateFailed.isEmpty()) {
527 tryResubmit(locateFailed.stream(), tries, false, false);
532 public List<CompletableFuture<T>> call() {
533 groupAndSend(actions.stream(), 1);
534 return futures;