2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static java
.util
.stream
.Collectors
.toList
;
21 import static org
.apache
.hadoop
.hbase
.HConstants
.EMPTY_END_ROW
;
22 import static org
.apache
.hadoop
.hbase
.HConstants
.EMPTY_START_ROW
;
23 import static org
.apache
.hadoop
.hbase
.util
.FutureUtils
.addListener
;
25 import java
.io
.IOException
;
26 import java
.lang
.reflect
.UndeclaredThrowableException
;
27 import java
.net
.InetAddress
;
28 import java
.net
.InetSocketAddress
;
29 import java
.net
.UnknownHostException
;
30 import java
.util
.Arrays
;
31 import java
.util
.List
;
32 import java
.util
.Optional
;
33 import java
.util
.concurrent
.CompletableFuture
;
34 import java
.util
.concurrent
.ExecutorService
;
35 import java
.util
.concurrent
.ThreadLocalRandom
;
36 import java
.util
.concurrent
.TimeUnit
;
37 import java
.util
.concurrent
.atomic
.AtomicReference
;
38 import java
.util
.function
.Function
;
39 import java
.util
.function
.Predicate
;
40 import java
.util
.function
.Supplier
;
41 import org
.apache
.hadoop
.conf
.Configuration
;
42 import org
.apache
.hadoop
.hbase
.Cell
;
43 import org
.apache
.hadoop
.hbase
.CellComparator
;
44 import org
.apache
.hadoop
.hbase
.HConstants
;
45 import org
.apache
.hadoop
.hbase
.HRegionLocation
;
46 import org
.apache
.hadoop
.hbase
.PrivateCellUtil
;
47 import org
.apache
.hadoop
.hbase
.RegionLocations
;
48 import org
.apache
.hadoop
.hbase
.ServerName
;
49 import org
.apache
.hadoop
.hbase
.TableName
;
50 import org
.apache
.hadoop
.hbase
.client
.metrics
.ScanMetrics
;
51 import org
.apache
.hadoop
.hbase
.ipc
.HBaseRpcController
;
52 import org
.apache
.hadoop
.hbase
.ipc
.ServerRpcController
;
53 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
54 import org
.apache
.hadoop
.hbase
.util
.ReflectionUtils
;
55 import org
.apache
.hadoop
.ipc
.RemoteException
;
56 import org
.apache
.hadoop
.net
.DNS
;
57 import org
.apache
.yetus
.audience
.InterfaceAudience
;
58 import org
.slf4j
.Logger
;
59 import org
.slf4j
.LoggerFactory
;
61 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.base
.Preconditions
;
62 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.RpcCallback
;
63 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.RpcController
;
64 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.ServiceException
;
65 import org
.apache
.hbase
.thirdparty
.io
.netty
.util
.Timer
;
67 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ProtobufUtil
;
68 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ResponseConverter
;
69 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
;
70 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.ClientService
;
71 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.ScanResponse
;
74 * Utility used by client connections.
76 @InterfaceAudience.Private
77 public final class ConnectionUtils
{
private static final Logger LOG = LoggerFactory.getLogger(ConnectionUtils.class);

/**
 * Key for configuration in Configuration whose value is the class we implement making a new
 * Connection instance.
 */
public static final String HBASE_CLIENT_CONNECTION_IMPL = "hbase.client.connection.impl";

/** Private constructor: this class only exposes static members. */
private ConnectionUtils() {
}
91 * Calculate pause time. Built on {@link HConstants#RETRY_BACKOFF}.
92 * @param pause time to pause
93 * @param tries amount of tries
94 * @return How long to wait after <code>tries</code> retries
96 public static long getPauseTime(final long pause
, final int tries
) {
98 if (ntries
>= HConstants
.RETRY_BACKOFF
.length
) {
99 ntries
= HConstants
.RETRY_BACKOFF
.length
- 1;
105 long normalPause
= pause
* HConstants
.RETRY_BACKOFF
[ntries
];
106 // 1% possible jitter
107 long jitter
= (long) (normalPause
* ThreadLocalRandom
.current().nextFloat() * 0.01f
);
108 return normalPause
+ jitter
;
112 * Changes the configuration to set the number of retries needed when using Connection internally,
113 * e.g. for updating catalog tables, etc. Call this method before we create any Connections.
114 * @param c The Configuration instance to set the retries into.
115 * @param log Used to log what we set in here.
117 public static void setServerSideHConnectionRetriesConfig(final Configuration c
, final String sn
,
119 // TODO: Fix this. Not all connections from server side should have 10 times the retries.
120 int hcRetries
= c
.getInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
,
121 HConstants
.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER
);
122 // Go big. Multiply by 10. If we can't get to meta after this many retries
123 // then something seriously wrong.
124 int serversideMultiplier
= c
.getInt(HConstants
.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER
,
125 HConstants
.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER
);
126 int retries
= hcRetries
* serversideMultiplier
;
127 c
.setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
, retries
);
128 log
.info(sn
+ " server-side Connection retries=" + retries
);
132 * Get a unique key for the rpc stub to the given server.
134 static String
getStubKey(String serviceName
, ServerName serverName
) {
135 return String
.format("%s@%s", serviceName
, serverName
);
139 * Return retires + 1. The returned value will be in range [1, Integer.MAX_VALUE].
141 static int retries2Attempts(int retries
) {
142 return Math
.max(1, retries
== Integer
.MAX_VALUE ? Integer
.MAX_VALUE
: retries
+ 1);
145 static void checkHasFamilies(Mutation mutation
) {
146 Preconditions
.checkArgument(mutation
.numFamilies() > 0,
147 "Invalid arguments to %s, zero columns specified", mutation
.toString());
150 /** Dummy nonce generator for disabled nonces. */
151 static final NonceGenerator NO_NONCE_GENERATOR
= new NonceGenerator() {
154 public long newNonce() {
155 return HConstants
.NO_NONCE
;
159 public long getNonceGroup() {
160 return HConstants
.NO_NONCE
;
164 // A byte array in which all elements are the max byte, and it is used to
165 // construct closest front row
166 static final byte[] MAX_BYTE_ARRAY
= Bytes
.createMaxByteArray(9);
169 * Create the closest row after the specified row
171 static byte[] createClosestRowAfter(byte[] row
) {
172 return Arrays
.copyOf(row
, row
.length
+ 1);
176 * Create a row before the specified row and very close to the specified row.
178 static byte[] createCloseRowBefore(byte[] row
) {
179 if (row
.length
== 0) {
180 return MAX_BYTE_ARRAY
;
182 if (row
[row
.length
- 1] == 0) {
183 return Arrays
.copyOf(row
, row
.length
- 1);
185 byte[] nextRow
= new byte[row
.length
+ MAX_BYTE_ARRAY
.length
];
186 System
.arraycopy(row
, 0, nextRow
, 0, row
.length
- 1);
187 nextRow
[row
.length
- 1] = (byte) ((row
[row
.length
- 1] & 0xFF) - 1);
188 System
.arraycopy(MAX_BYTE_ARRAY
, 0, nextRow
, row
.length
, MAX_BYTE_ARRAY
.length
);
193 static boolean isEmptyStartRow(byte[] row
) {
194 return Bytes
.equals(row
, EMPTY_START_ROW
);
197 static boolean isEmptyStopRow(byte[] row
) {
198 return Bytes
.equals(row
, EMPTY_END_ROW
);
201 static void resetController(HBaseRpcController controller
, long timeoutNs
, int priority
) {
203 if (timeoutNs
>= 0) {
204 controller
.setCallTimeout(
205 (int) Math
.min(Integer
.MAX_VALUE
, TimeUnit
.NANOSECONDS
.toMillis(timeoutNs
)));
207 controller
.setPriority(priority
);
210 static Throwable
translateException(Throwable t
) {
211 if (t
instanceof UndeclaredThrowableException
&& t
.getCause() != null) {
214 if (t
instanceof RemoteException
) {
215 t
= ((RemoteException
) t
).unwrapRemoteException();
217 if (t
instanceof ServiceException
&& t
.getCause() != null) {
218 t
= translateException(t
.getCause());
223 static long calcEstimatedSize(Result rs
) {
224 long estimatedHeapSizeOfResult
= 0;
225 // We don't make Iterator here
226 for (Cell cell
: rs
.rawCells()) {
227 estimatedHeapSizeOfResult
+= cell
.heapSize();
229 return estimatedHeapSizeOfResult
;
232 static Result
filterCells(Result result
, Cell keepCellsAfter
) {
233 if (keepCellsAfter
== null) {
234 // do not need to filter
238 if (!PrivateCellUtil
.matchingRows(keepCellsAfter
, result
.getRow(), 0, result
.getRow().length
)) {
241 Cell
[] rawCells
= result
.rawCells();
242 int index
= Arrays
.binarySearch(rawCells
, keepCellsAfter
,
243 CellComparator
.getInstance()::compareWithoutRow
);
252 if (index
== rawCells
.length
) {
255 return Result
.create(Arrays
.copyOfRange(rawCells
, index
, rawCells
.length
), null,
256 result
.isStale(), result
.mayHaveMoreCellsInRow());
259 // Add a delta to avoid timeout immediately after a retry sleeping.
260 static final long SLEEP_DELTA_NS
= TimeUnit
.MILLISECONDS
.toNanos(1);
262 static Get
toCheckExistenceOnly(Get get
) {
263 if (get
.isCheckExistenceOnly()) {
266 return ReflectionUtils
.newInstance(get
.getClass(), get
).setCheckExistenceOnly(true);
269 static List
<Get
> toCheckExistenceOnly(List
<Get
> gets
) {
270 return gets
.stream().map(ConnectionUtils
::toCheckExistenceOnly
).collect(toList());
273 static RegionLocateType
getLocateType(Scan scan
) {
274 if (scan
.isReversed()) {
275 if (isEmptyStartRow(scan
.getStartRow())) {
276 return RegionLocateType
.BEFORE
;
278 return scan
.includeStartRow() ? RegionLocateType
.CURRENT
: RegionLocateType
.BEFORE
;
281 return scan
.includeStartRow() ? RegionLocateType
.CURRENT
: RegionLocateType
.AFTER
;
285 static boolean noMoreResultsForScan(Scan scan
, RegionInfo info
) {
286 if (isEmptyStopRow(info
.getEndKey())) {
289 if (isEmptyStopRow(scan
.getStopRow())) {
292 int c
= Bytes
.compareTo(info
.getEndKey(), scan
.getStopRow());
293 // 1. if our stop row is less than the endKey of the region
294 // 2. if our stop row is equal to the endKey of the region and we do not include the stop row
296 return c
> 0 || (c
== 0 && !scan
.includeStopRow());
299 static boolean noMoreResultsForReverseScan(Scan scan
, RegionInfo info
) {
300 if (isEmptyStartRow(info
.getStartKey())) {
303 if (isEmptyStopRow(scan
.getStopRow())) {
306 // no need to test the inclusive of the stop row as the start key of a region is included in
308 return Bytes
.compareTo(info
.getStartKey(), scan
.getStopRow()) <= 0;
311 static <T
> CompletableFuture
<List
<T
>> allOf(List
<CompletableFuture
<T
>> futures
) {
312 return CompletableFuture
.allOf(futures
.toArray(new CompletableFuture
[0]))
313 .thenApply(v
-> futures
.stream().map(f
-> f
.getNow(null)).collect(toList()));
316 public static ScanResultCache
createScanResultCache(Scan scan
) {
317 if (scan
.getAllowPartialResults()) {
318 return new AllowPartialScanResultCache();
319 } else if (scan
.getBatch() > 0) {
320 return new BatchScanResultCache(scan
.getBatch());
322 return new CompleteScanResultCache();
326 private static final String MY_ADDRESS
= getMyAddress();
328 private static String
getMyAddress() {
330 return DNS
.getDefaultHost("default", "default");
331 } catch (UnknownHostException uhe
) {
332 LOG
.error("cannot determine my address", uhe
);
337 static boolean isRemote(String host
) {
338 return !host
.equalsIgnoreCase(MY_ADDRESS
);
341 static void incRPCCallsMetrics(ScanMetrics scanMetrics
, boolean isRegionServerRemote
) {
342 if (scanMetrics
== null) {
345 scanMetrics
.countOfRPCcalls
.incrementAndGet();
346 if (isRegionServerRemote
) {
347 scanMetrics
.countOfRemoteRPCcalls
.incrementAndGet();
351 static void incRPCRetriesMetrics(ScanMetrics scanMetrics
, boolean isRegionServerRemote
) {
352 if (scanMetrics
== null) {
355 scanMetrics
.countOfRPCRetries
.incrementAndGet();
356 if (isRegionServerRemote
) {
357 scanMetrics
.countOfRemoteRPCRetries
.incrementAndGet();
361 static void updateResultsMetrics(ScanMetrics scanMetrics
, Result
[] rrs
,
362 boolean isRegionServerRemote
) {
363 if (scanMetrics
== null || rrs
== null || rrs
.length
== 0) {
367 for (Result rr
: rrs
) {
368 for (Cell cell
: rr
.rawCells()) {
369 resultSize
+= PrivateCellUtil
.estimatedSerializedSizeOf(cell
);
372 scanMetrics
.countOfBytesInResults
.addAndGet(resultSize
);
373 if (isRegionServerRemote
) {
374 scanMetrics
.countOfBytesInRemoteResults
.addAndGet(resultSize
);
379 * Use the scan metrics returned by the server to add to the identically named counters in the
380 * client side metrics. If a counter does not exist with the same name as the server side metric,
381 * the attempt to increase the counter will fail.
383 static void updateServerSideMetrics(ScanMetrics scanMetrics
, ScanResponse response
) {
384 if (scanMetrics
== null || response
== null || !response
.hasScanMetrics()) {
387 ResponseConverter
.getScanMetrics(response
).forEach(scanMetrics
::addToCounter
);
390 static void incRegionCountMetrics(ScanMetrics scanMetrics
) {
391 if (scanMetrics
== null) {
394 scanMetrics
.countOfRegions
.incrementAndGet();
398 * Connect the two futures, if the src future is done, then mark the dst future as done. And if
399 * the dst future is done, then cancel the src future. This is used for timeline consistent read.
401 * Pass empty metrics if you want to link the primary future and the dst future so we will not
402 * increase the hedge read related metrics.
404 private static <T
> void connect(CompletableFuture
<T
> srcFuture
, CompletableFuture
<T
> dstFuture
,
405 Optional
<MetricsConnection
> metrics
) {
406 addListener(srcFuture
, (r
, e
) -> {
408 dstFuture
.completeExceptionally(e
);
410 if (dstFuture
.complete(r
)) {
411 metrics
.ifPresent(MetricsConnection
::incrHedgedReadWin
);
415 // The cancellation may be a dummy one as the dstFuture may be completed by this srcFuture.
416 // Notice that this is a bit tricky, as the execution chain maybe 'complete src -> complete dst
417 // -> cancel src', for now it seems to be fine, as the will use CAS to set the result first in
418 // CompletableFuture. If later this causes problems, we could use whenCompleteAsync to break the
420 addListener(dstFuture
, (r
, e
) -> srcFuture
.cancel(false));
423 private static <T
> void sendRequestsToSecondaryReplicas(
424 Function
<Integer
, CompletableFuture
<T
>> requestReplica
, RegionLocations locs
,
425 CompletableFuture
<T
> future
, Optional
<MetricsConnection
> metrics
) {
426 if (future
.isDone()) {
427 // do not send requests to secondary replicas if the future is done, i.e, the primary request
428 // has already been finished.
431 for (int replicaId
= 1, n
= locs
.size(); replicaId
< n
; replicaId
++) {
432 CompletableFuture
<T
> secondaryFuture
= requestReplica
.apply(replicaId
);
433 metrics
.ifPresent(MetricsConnection
::incrHedgedReadOps
);
434 connect(secondaryFuture
, future
, metrics
);
438 static <T
> CompletableFuture
<T
> timelineConsistentRead(AsyncRegionLocator locator
,
439 TableName tableName
, Query query
, byte[] row
, RegionLocateType locateType
,
440 Function
<Integer
, CompletableFuture
<T
>> requestReplica
, long rpcTimeoutNs
,
441 long primaryCallTimeoutNs
, Timer retryTimer
, Optional
<MetricsConnection
> metrics
) {
442 if (query
.getConsistency() != Consistency
.TIMELINE
) {
443 return requestReplica
.apply(RegionReplicaUtil
.DEFAULT_REPLICA_ID
);
445 // user specifies a replica id explicitly, just send request to the specific replica
446 if (query
.getReplicaId() >= 0) {
447 return requestReplica
.apply(query
.getReplicaId());
449 // Timeline consistent read, where we may send requests to other region replicas
450 CompletableFuture
<T
> primaryFuture
= requestReplica
.apply(RegionReplicaUtil
.DEFAULT_REPLICA_ID
);
451 CompletableFuture
<T
> future
= new CompletableFuture
<>();
452 connect(primaryFuture
, future
, Optional
.empty());
453 long startNs
= System
.nanoTime();
454 // after the getRegionLocations, all the locations for the replicas of this region should have
455 // been cached, so it is not big deal to locate them again when actually sending requests to
457 addListener(locator
.getRegionLocations(tableName
, row
, locateType
, false, rpcTimeoutNs
),
461 "Failed to locate all the replicas for table={}, row='{}', locateType={}" +
462 " give up timeline consistent read",
463 tableName
, Bytes
.toStringBinary(row
), locateType
, error
);
466 if (locs
.size() <= 1) {
468 "There are no secondary replicas for region {}, give up timeline consistent read",
469 locs
.getDefaultRegionLocation().getRegion());
472 long delayNs
= primaryCallTimeoutNs
- (System
.nanoTime() - startNs
);
474 sendRequestsToSecondaryReplicas(requestReplica
, locs
, future
, metrics
);
476 retryTimer
.newTimeout(
477 timeout
-> sendRequestsToSecondaryReplicas(requestReplica
, locs
, future
, metrics
),
478 delayNs
, TimeUnit
.NANOSECONDS
);
484 // validate for well-formedness
485 static void validatePut(Put put
, int maxKeyValueSize
) {
487 throw new IllegalArgumentException("No columns to insert");
489 if (maxKeyValueSize
> 0) {
490 for (List
<Cell
> list
: put
.getFamilyCellMap().values()) {
491 for (Cell cell
: list
) {
492 if (cell
.getSerializedSize() > maxKeyValueSize
) {
493 throw new IllegalArgumentException("KeyValue size too large");
500 static void validatePutsInRowMutations(RowMutations rowMutations
, int maxKeyValueSize
) {
501 for (Mutation mutation
: rowMutations
.getMutations()) {
502 if (mutation
instanceof Put
) {
503 validatePut((Put
) mutation
, maxKeyValueSize
);
509 * Select the priority for the rpc call.
513 * <li>If user set a priority explicitly, then just use it.</li>
514 * <li>For system table, use {@link HConstants#SYSTEMTABLE_QOS}.</li>
515 * <li>For other tables, use {@link HConstants#NORMAL_QOS}.</li>
517 * @param priority the priority set by user, can be {@link HConstants#PRIORITY_UNSET}.
518 * @param tableName the table we operate on
520 static int calcPriority(int priority
, TableName tableName
) {
521 if (priority
!= HConstants
.PRIORITY_UNSET
) {
524 return getPriority(tableName
);
528 static int getPriority(TableName tableName
) {
529 if (tableName
.isSystemTable()) {
530 return HConstants
.SYSTEMTABLE_QOS
;
532 return HConstants
.NORMAL_QOS
;
536 static <T
> CompletableFuture
<T
> getOrFetch(AtomicReference
<T
> cacheRef
,
537 AtomicReference
<CompletableFuture
<T
>> futureRef
, boolean reload
,
538 Supplier
<CompletableFuture
<T
>> fetch
, Predicate
<T
> validator
, String type
) {
541 T value
= cacheRef
.get();
542 if (value
!= null && validator
.test(value
)) {
543 return CompletableFuture
.completedFuture(value
);
546 LOG
.trace("{} cache is null, try fetching from registry", type
);
547 if (futureRef
.compareAndSet(null, new CompletableFuture
<>())) {
548 LOG
.debug("Start fetching {} from registry", type
);
549 CompletableFuture
<T
> future
= futureRef
.get();
550 addListener(fetch
.get(), (value
, error
) -> {
552 LOG
.debug("Failed to fetch {} from registry", type
, error
);
553 futureRef
.getAndSet(null).completeExceptionally(error
);
556 LOG
.debug("The fetched {} is {}", type
, value
);
557 // Here we update cache before reset future, so it is possible that someone can get a
558 // stale value. Consider this:
559 // 1. update cacheRef
560 // 2. someone clears the cache and relocates again
561 // 3. the futureRef is not null so the old future is used.
562 // 4. we clear futureRef and complete the future in it with the value being
563 // cleared in step 2.
564 // But we do not think it is a big deal as it rarely happens, and even if it happens, the
565 // caller will retry again later, no correctness problems.
568 future
.complete(value
);
572 CompletableFuture
<T
> future
= futureRef
.get();
573 if (future
!= null) {
580 static void updateStats(Optional
<ServerStatisticTracker
> optStats
,
581 Optional
<MetricsConnection
> optMetrics
, ServerName serverName
, MultiResponse resp
) {
582 if (!optStats
.isPresent() && !optMetrics
.isPresent()) {
583 // ServerStatisticTracker and MetricsConnection are both not present, just return
586 resp
.getResults().forEach((regionName
, regionResult
) -> {
587 ClientProtos
.RegionLoadStats stat
= regionResult
.getStat();
589 LOG
.error("No ClientProtos.RegionLoadStats found for server={}, region={}", serverName
,
590 Bytes
.toStringBinary(regionName
));
593 RegionLoadStats regionLoadStats
= ProtobufUtil
.createRegionLoadStats(stat
);
595 stats
-> ResultStatsUtil
.updateStats(stats
, serverName
, regionName
, regionLoadStats
));
596 optMetrics
.ifPresent(
597 metrics
-> ResultStatsUtil
.updateStats(metrics
, serverName
, regionName
, regionLoadStats
));
/**
 * Converts a source object into a destination object with the help of some extra information.
 * @param <D> destination type
 * @param <I> type of the extra information
 * @param <S> source type
 */
@FunctionalInterface
interface Converter<D, I, S> {
  D convert(I info, S src) throws IOException;
}
607 interface RpcCall
<RESP
, REQ
> {
608 void call(ClientService
.Interface stub
, HBaseRpcController controller
, REQ req
,
609 RpcCallback
<RESP
> done
);
612 static <REQ
, PREQ
, PRESP
, RESP
> CompletableFuture
<RESP
> call(HBaseRpcController controller
,
613 HRegionLocation loc
, ClientService
.Interface stub
, REQ req
,
614 Converter
<PREQ
, byte[], REQ
> reqConvert
, RpcCall
<PRESP
, PREQ
> rpcCall
,
615 Converter
<RESP
, HBaseRpcController
, PRESP
> respConverter
) {
616 CompletableFuture
<RESP
> future
= new CompletableFuture
<>();
618 rpcCall
.call(stub
, controller
, reqConvert
.convert(loc
.getRegion().getRegionName(), req
),
619 new RpcCallback
<PRESP
>() {
622 public void run(PRESP resp
) {
623 if (controller
.failed()) {
624 future
.completeExceptionally(controller
.getFailed());
627 future
.complete(respConverter
.convert(controller
, resp
));
628 } catch (IOException e
) {
629 future
.completeExceptionally(e
);
634 } catch (IOException e
) {
635 future
.completeExceptionally(e
);
640 static void shutdownPool(ExecutorService pool
) {
643 if (!pool
.awaitTermination(10, TimeUnit
.SECONDS
)) {
646 } catch (InterruptedException e
) {
651 static void setCoprocessorError(RpcController controller
, Throwable error
) {
652 if (controller
== null) {
655 if (controller
instanceof ServerRpcController
) {
656 if (error
instanceof IOException
) {
657 ((ServerRpcController
) controller
).setFailedOn((IOException
) error
);
659 ((ServerRpcController
) controller
).setFailedOn(new IOException(error
));
661 } else if (controller
instanceof ClientCoprocessorRpcController
) {
662 ((ClientCoprocessorRpcController
) controller
).setFailed(error
);
664 controller
.setFailed(error
.toString());