HBASE-26688 Threads shared EMPTY_RESULT may lead to unexpected client job down. ...
[hbase.git] / hbase-client / src / main / java / org / apache / hadoop / hbase / client / AsyncSingleRequestRpcRetryingCaller.java
blob2a552c71b3dde22174c13fda345ab1326a1702ec
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org.apache.hadoop.hbase.client;
20 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
22 import java.io.IOException;
23 import java.util.Optional;
24 import java.util.concurrent.CompletableFuture;
25 import org.apache.hadoop.hbase.HRegionLocation;
26 import org.apache.hadoop.hbase.TableName;
27 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
28 import org.apache.hadoop.hbase.util.Bytes;
29 import org.apache.yetus.audience.InterfaceAudience;
31 import org.apache.hbase.thirdparty.io.netty.util.Timer;
33 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
/**
 * Retry caller for a single request, such as get, put, delete, etc.
 */
38 @InterfaceAudience.Private
39 class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
41 @FunctionalInterface
42 public interface Callable<T> {
43 CompletableFuture<T> call(HBaseRpcController controller, HRegionLocation loc,
44 ClientService.Interface stub);
// Table handed to the region locator; also reported in error messages and by getTableName().
private final TableName tableName;

// Row handed to the region locator; also rendered in error messages.
private final byte[] row;

// Replica id handed to the region locator.
private final int replicaId;

// Locate type handed to the region locator.
private final RegionLocateType locateType;

// The request to execute once a region location and a server stub are available.
private final Callable<T> callable;

/**
 * Creates a retrying caller for one request against one row.
 * <p>
 * {@code retryTimer}, {@code conn}, {@code priority}, {@code pauseNs}, {@code pauseForCQTBENs},
 * {@code maxAttempts}, {@code operationTimeoutNs}, {@code rpcTimeoutNs} and
 * {@code startLogErrorsCnt} are forwarded unchanged to the superclass constructor; the
 * remaining arguments are kept by this class.
 *
 * @param tableName table used when locating the region
 * @param row row used when locating the region
 * @param replicaId replica id used when locating the region
 * @param locateType locate type used when locating the region
 * @param callable the request to run against the located region
 */
public AsyncSingleRequestRpcRetryingCaller(
    Timer retryTimer,
    AsyncConnectionImpl conn,
    TableName tableName,
    byte[] row,
    int replicaId,
    RegionLocateType locateType,
    Callable<T> callable,
    int priority,
    long pauseNs,
    long pauseForCQTBENs,
    int maxAttempts,
    long operationTimeoutNs,
    long rpcTimeoutNs,
    int startLogErrorsCnt) {
  super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs,
    rpcTimeoutNs, startLogErrorsCnt);
  // What to run.
  this.callable = callable;
  // Where to run it.
  this.tableName = tableName;
  this.row = row;
  this.replicaId = replicaId;
  this.locateType = locateType;
}
70 private void call(HRegionLocation loc) {
71 ClientService.Interface stub;
72 try {
73 stub = conn.getRegionServerStub(loc.getServerName());
74 } catch (IOException e) {
75 onError(e,
76 () -> "Get async stub to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) +
77 "' in " + loc.getRegion().getEncodedName() + " of " + tableName + " failed",
78 err -> conn.getLocator().updateCachedLocationOnError(loc, err));
79 return;
81 resetCallTimeout();
82 addListener(callable.call(controller, loc, stub), (result, error) -> {
83 if (error != null) {
84 onError(error,
85 () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " +
86 loc.getRegion().getEncodedName() + " of " + tableName + " failed",
87 err -> conn.getLocator().updateCachedLocationOnError(loc, err));
88 return;
90 future.complete(result);
91 });
94 @Override
95 protected void doCall() {
96 long locateTimeoutNs;
97 if (operationTimeoutNs > 0) {
98 locateTimeoutNs = remainingTimeNs();
99 if (locateTimeoutNs <= 0) {
100 completeExceptionally();
101 return;
103 } else {
104 locateTimeoutNs = -1L;
106 addListener(
107 conn.getLocator().getRegionLocation(tableName, row, replicaId, locateType, locateTimeoutNs),
108 (loc, error) -> {
109 if (error != null) {
110 onError(error,
111 () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed", err -> {
113 return;
115 call(loc);
119 @Override
120 protected Optional<TableName> getTableName() {
121 return Optional.of(tableName);