/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;
import com.google.common.annotations.VisibleForTesting;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.RetryImmediatelyException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.htrace.Trace;
/**
 * The context, and return value, for a single submit/submitAll call.
 * Note on how this class (one AP submit) works. Initially, all requests are split into groups
 * by server; a request is sent to each server in parallel; the RPC calls are not async so a
 * thread per server is used. Every time some actions fail, regions/locations might have
 * changed, so we re-group them by server and region again and send these groups in parallel
 * too. The result, in case of retries, is a "tree" of threads, with parent exiting after
 * scheduling children. This is why lots of code doesn't require any synchronization.
 */
@InterfaceAudience.Private
class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {

  private static final Log LOG = LogFactory.getLog(AsyncRequestFutureImpl.class);

  private RetryingTimeTracker tracker;
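
  /*
   * Illustrative usage sketch only, not code from this class: a caller typically obtains
   * this future from an AsyncProcess submit (the exact submit wiring below is an assumption,
   * it lives outside this file) and then blocks on it via the methods implemented further
   * down in this class.
   *
   *   AsyncRequestFuture arf = asyncProcess.submit(task); // assumed entry point
   *   arf.waitUntilDone();                 // blocks until actionsInProgress drops to 0
   *   if (arf.hasError()) {
   *     throw arf.getErrors();             // RetriesExhaustedWithDetailsException
   *   }
   *   Object[] results = arf.getResults();
   */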
  /**
   * Runnable (that can be submitted to thread pool) that waits for when it's time
   * to issue replica calls, finds region replicas, groups the requests by replica and
   * issues the calls (on separate threads, via sendMultiAction).
   * This is done on a separate thread because we don't want to wait on user thread for
   * our asynchronous call, and usually we have to wait before making replica calls.
   */
  private final class ReplicaCallIssuingRunnable implements Runnable {
    private final long startTime;
    private final List<Action> initialActions;

    public ReplicaCallIssuingRunnable(List<Action> initialActions, long startTime) {
      this.initialActions = initialActions;
      this.startTime = startTime;
    }
    @Override
    public void run() {
      boolean done = false;
      if (asyncProcess.primaryCallTimeoutMicroseconds > 0) {
        try {
          done = waitUntilDone(startTime * 1000L + asyncProcess.primaryCallTimeoutMicroseconds);
        } catch (InterruptedException ex) {
          LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage());
          return;
        }
      }
      if (done) return; // Done within primary timeout
      Map<ServerName, MultiAction> actionsByServer = new HashMap<>();
      List<Action> unknownLocActions = new ArrayList<>();
      if (replicaGetIndices == null) {
        for (int i = 0; i < results.length; ++i) {
          addReplicaActions(i, actionsByServer, unknownLocActions);
        }
      } else {
        for (int replicaGetIndice : replicaGetIndices) {
          addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions);
        }
      }
      if (!actionsByServer.isEmpty()) {
        sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty());
      }
      if (!unknownLocActions.isEmpty()) {
        actionsByServer = new HashMap<>();
        for (Action action : unknownLocActions) {
          addReplicaActionsAgain(action, actionsByServer);
        }
        // Some actions may have completely failed; those are handled inside addReplicaActionsAgain.
        if (!actionsByServer.isEmpty()) {
          sendMultiAction(actionsByServer, 1, null, true);
        }
      }
    }
    /**
     * Add replica actions to action map by server.
     * @param index Index of the original action.
     * @param actionsByServer The map by server to add it to.
     */
    private void addReplicaActions(int index, Map<ServerName, MultiAction> actionsByServer,
        List<Action> unknownReplicaActions) {
      if (results[index] != null) return; // opportunistic. Never goes from non-null to null.
      Action action = initialActions.get(index);
      RegionLocations loc = findAllLocationsOrFail(action, true);
      if (loc == null) return;
      HRegionLocation[] locs = loc.getRegionLocations();
      if (locs.length == 1) {
        LOG.warn("No replicas found for " + action.getAction());
        return;
      }
      synchronized (replicaResultLock) {
        // Don't run replica calls if the original has finished. We could do it e.g. if
        // original has already failed before first replica call (unlikely given retries),
        // but that would require additional synchronization w.r.t. returning to caller.
        if (results[index] != null) return;
        // We set the number of calls here. After that any path must call setResult/setError.
        // True even for replicas that are not found - if we refuse to send we MUST set error.
        results[index] = new ReplicaResultState(locs.length);
      }
      for (int i = 1; i < locs.length; ++i) {
        Action replicaAction = new Action(action, i);
        if (locs[i] != null) {
          asyncProcess.addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
              replicaAction, actionsByServer, nonceGroup);
        } else {
          unknownReplicaActions.add(replicaAction);
        }
      }
    }
    private void addReplicaActionsAgain(
        Action action, Map<ServerName, MultiAction> actionsByServer) {
      if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
        throw new AssertionError("Cannot have default replica here");
      }
      HRegionLocation loc = getReplicaLocationOrFail(action);
      if (loc == null) return;
      asyncProcess.addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(),
          action, actionsByServer, nonceGroup);
    }
  }
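
  /*
   * Note on units in ReplicaCallIssuingRunnable.run() above (a sketch restating the
   * arithmetic already in the code, not new behavior): startTime is in milliseconds while
   * primaryCallTimeoutMicroseconds is in microseconds, so the cutoff handed to
   * waitUntilDone(long) is expressed in microseconds:
   *
   *   long cutoffMicros = startTime * 1000L + asyncProcess.primaryCallTimeoutMicroseconds;
   *
   * waitUntilDone(long) then compares (now * 1000L) > cutoff with 'now' in milliseconds,
   * so both sides of the comparison are in microseconds.
   */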
  /**
   * Runnable (that can be submitted to thread pool) that submits MultiAction to a
   * single server. The server call is synchronous, therefore we do it on a thread pool.
   */
  private final class SingleServerRequestRunnable implements Runnable {
    private final MultiAction multiAction;
    private final int numAttempt;
    private final ServerName server;
    private final Set<CancellableRegionServerCallable> callsInProgress;
    private Long heapSize = null;

    private SingleServerRequestRunnable(
        MultiAction multiAction, int numAttempt, ServerName server,
        Set<CancellableRegionServerCallable> callsInProgress) {
      this.multiAction = multiAction;
      this.numAttempt = numAttempt;
      this.server = server;
      this.callsInProgress = callsInProgress;
    }
    @VisibleForTesting
    long heapSize() {
      if (heapSize != null) {
        return heapSize;
      }
      heapSize = 0L;
      for (Map.Entry<byte[], List<Action>> e : this.multiAction.actions.entrySet()) {
        List<Action> actions = e.getValue();
        for (Action action : actions) {
          Row row = action.getAction();
          if (row instanceof Mutation) {
            heapSize += ((Mutation) row).heapSize();
          }
        }
      }
      return heapSize;
    }
    @Override
    public void run() {
      AbstractResponse res = null;
      CancellableRegionServerCallable callable = currentCallable;
      try {
        // setup the callable based on the actions, if we don't have one already from the request
        if (callable == null) {
          callable = createCallable(server, tableName, multiAction);
        }
        RpcRetryingCaller<AbstractResponse> caller = asyncProcess.createCaller(callable, rpcTimeout);
        try {
          if (callsInProgress != null) {
            callsInProgress.add(callable);
          }
          res = caller.callWithoutRetries(callable, operationTimeout);
          if (res == null) {
            // Cancelled
            return;
          }
        } catch (IOException e) {
          // The service itself failed. It may be an error coming from the communication
          // layer, but, as well, a functional error raised by the server.
          receiveGlobalFailure(multiAction, server, numAttempt, e);
          return;
        } catch (Throwable t) {
          // This should not happen. Let's log & retry anyway.
          LOG.error("#" + asyncProcess.id + ", Caught throwable while calling. This is unexpected." +
              " Retrying. Server is " + server + ", tableName=" + tableName, t);
          receiveGlobalFailure(multiAction, server, numAttempt, t);
          return;
        }

        if (res.type() == AbstractResponse.ResponseType.MULTI) {
          // Normal case: we received an answer from the server, and it's not an exception.
          receiveMultiAction(multiAction, server, (MultiResponse) res, numAttempt);
        } else {
          if (results != null) {
            SingleResponse singleResponse = (SingleResponse) res;
            results[0] = singleResponse.getEntry();
          }
          decActionCounter(1);
        }
      } catch (Throwable t) {
        // Something really bad happened. We are on the send thread that will now die.
        LOG.error("Internal AsyncProcess #" + asyncProcess.id + " error for "
            + tableName + " processing for " + server, t);
        throw new RuntimeException(t);
      } finally {
        asyncProcess.decTaskCounters(multiAction.getRegions(), server);
        if (callsInProgress != null && callable != null && res != null) {
          callsInProgress.remove(callable);
        }
      }
    }
  }
  private final Batch.Callback<CResult> callback;
  private final BatchErrors errors;
  private final ConnectionImplementation.ServerErrorTracker errorsByServer;
  private final ExecutorService pool;
  private final Set<CancellableRegionServerCallable> callsInProgress;

  private final TableName tableName;
  private final AtomicLong actionsInProgress = new AtomicLong(-1);
  /**
   * The lock controls access to results. It is only held when populating results where
   * there might be several callers (eventual consistency gets). For other requests,
   * there's one unique call going on per result index.
   */
  private final Object replicaResultLock = new Object();
  /**
   * Result array. Null if results are not needed. Otherwise, each index corresponds to
   * the action index in initial actions submitted. For most request types, has null-s for
   * requests that are not done, and result/exception for those that are done.
   * For eventual-consistency gets, initially the same applies; at some point, replica calls
   * might be started, and ReplicaResultState is put at the corresponding indices. The
   * returning calls check the type to detect when this is the case. After all calls are done,
   * ReplicaResultState-s are replaced with results for the user.
   */
  private final Object[] results;
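  /*
   * Lifecycle of results[i] for an eventual-consistency get, restating the javadoc above:
   *   null                  -> no call has completed yet
   *   ReplicaResultState(n) -> replica calls launched; n calls still outstanding
   *   Result or Throwable   -> final outcome returned to the user
   * Plain (single-replica) requests go straight from null to Result/Throwable.
   */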
  /**
   * Indices of replica gets in results. If null, all or no actions are replica-gets.
   */
  private final int[] replicaGetIndices;
  private final boolean hasAnyReplicaGets;
  private final long nonceGroup;
  private final CancellableRegionServerCallable currentCallable;
  private final int operationTimeout;
  private final int rpcTimeout;
  private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
  private final AsyncProcess asyncProcess;

  /**
   * For {@link AsyncRequestFutureImpl#manageError(int, Row, Retry, Throwable, ServerName)}. Only
   * used to make logging more clear, we don't actually care why we don't retry.
   */
  public enum Retry {
    YES,
    NO_LOCATION_PROBLEM,
    NO_NOT_RETRIABLE,
    NO_RETRIES_EXHAUSTED,
    NO_OTHER_SUCCEEDED
  }
  /** Sync point for calls to multiple replicas for the same user request (Get).
   * Created and put in the results array (we assume replica calls require results) when
   * the replica calls are launched. See results for details of this process.
   * POJO, all fields are public. To modify them, the object itself is locked. */
  private static class ReplicaResultState {
    public ReplicaResultState(int callCount) {
      this.callCount = callCount;
    }

    /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */
    int callCount;
    /** Errors for which it is not decided whether we will report them to user. If one of the
     * calls succeeds, we will discard the errors that may have happened in the other calls. */
    BatchErrors replicaErrors = null;

    @Override
    public String toString() {
      return "[call count " + callCount + "; errors " + replicaErrors + "]";
    }
  }
  public AsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions,
      long nonceGroup, AsyncProcess asyncProcess) {
    this.pool = task.getPool();
    this.callback = task.getCallback();
    this.nonceGroup = nonceGroup;
    this.tableName = task.getTableName();
    this.actionsInProgress.set(actions.size());
    if (task.getResults() == null) {
      results = task.getNeedResults() ? new Object[actions.size()] : null;
    } else {
      if (task.getResults().length != actions.size()) {
        throw new AssertionError("results.length");
      }
      this.results = task.getResults();
      for (int i = 0; i != this.results.length; ++i) {
        results[i] = null;
      }
    }
    List<Integer> replicaGetIndices = null;
    boolean hasAnyReplicaGets = false;
    if (results != null) {
      // Check to see if any requests might require replica calls.
      // We expect that many requests will consist of all or no multi-replica gets; in such
      // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will
      // store the list of action indexes for which replica gets are possible, and set
      // hasAnyReplicaGets to true.
      boolean hasAnyNonReplicaReqs = false;
      int posInList = 0;
      for (Action action : actions) {
        boolean isReplicaGet = AsyncProcess.isReplicaGet(action.getAction());
        if (isReplicaGet) {
          hasAnyReplicaGets = true;
          if (hasAnyNonReplicaReqs) { // Mixed case
            if (replicaGetIndices == null) {
              replicaGetIndices = new ArrayList<>(actions.size() - 1);
            }
            replicaGetIndices.add(posInList);
          }
        } else if (!hasAnyNonReplicaReqs) {
          // The first non-multi-replica request in the action list.
          hasAnyNonReplicaReqs = true;
          if (hasAnyReplicaGets) {
            // Add all the previous requests to the index lists. We know they are all
            // replica-gets because this is the first non-multi-replica request in the list.
            replicaGetIndices = new ArrayList<>(actions.size() - 1);
            for (int i = 0; i < posInList; ++i) {
              replicaGetIndices.add(i);
            }
          }
        }
        ++posInList;
      }
    }
    this.hasAnyReplicaGets = hasAnyReplicaGets;
    if (replicaGetIndices != null) {
      this.replicaGetIndices = new int[replicaGetIndices.size()];
      int i = 0;
      for (Integer el : replicaGetIndices) {
        this.replicaGetIndices[i++] = el;
      }
    } else {
      this.replicaGetIndices = null;
    }
    this.callsInProgress = !hasAnyReplicaGets ? null :
        Collections.newSetFromMap(
            new ConcurrentHashMap<CancellableRegionServerCallable, Boolean>());
    this.asyncProcess = asyncProcess;
    this.errorsByServer = createServerErrorTracker();
    this.errors = (asyncProcess.globalErrors != null)
        ? asyncProcess.globalErrors : new BatchErrors();
    this.operationTimeout = task.getOperationTimeout();
    this.rpcTimeout = task.getRpcTimeout();
    this.currentCallable = task.getCallable();
    if (task.getCallable() == null) {
      tracker = new RetryingTimeTracker().start();
    }
  }
  @VisibleForTesting
  protected Set<CancellableRegionServerCallable> getCallsInProgress() {
    return callsInProgress;
  }

  @VisibleForTesting
  Map<ServerName, List<Long>> getRequestHeapSize() {
    return heapSizesByServer;
  }
  private SingleServerRequestRunnable addSingleServerRequestHeapSize(ServerName server,
      SingleServerRequestRunnable runnable) {
    List<Long> heapCount = heapSizesByServer.get(server);
    if (heapCount == null) {
      heapCount = new LinkedList<>();
      heapSizesByServer.put(server, heapCount);
    }
    heapCount.add(runnable.heapSize());
    return runnable;
  }
  /**
   * Group a list of actions per region servers, and send them.
   *
   * @param currentActions - the list of rows to submit
   * @param numAttempt - the current numAttempt (first attempt is 1)
   */
  void groupAndSendMultiAction(List<Action> currentActions, int numAttempt) {
    Map<ServerName, MultiAction> actionsByServer = new HashMap<>();

    boolean isReplica = false;
    List<Action> unknownReplicaActions = null;
    for (Action action : currentActions) {
      RegionLocations locs = findAllLocationsOrFail(action, true);
      if (locs == null) continue;
      boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
      if (isReplica && !isReplicaAction) {
        // This is the property of the current implementation, not a requirement.
        throw new AssertionError("Replica and non-replica actions in the same retry");
      }
      isReplica = isReplicaAction;
      HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
      if (loc == null || loc.getServerName() == null) {
        if (isReplica) {
          if (unknownReplicaActions == null) {
            unknownReplicaActions = new ArrayList<>(1);
          }
          unknownReplicaActions.add(action);
        } else {
          // TODO: relies on primary location always being fetched
          manageLocationError(action, null);
        }
      } else {
        byte[] regionName = loc.getRegionInfo().getRegionName();
        AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
      }
    }
    boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
    boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty();

    if (!actionsByServer.isEmpty()) {
      // If this is a first attempt to group and send, no replicas, we need replica thread.
      sendMultiAction(actionsByServer, numAttempt, (doStartReplica && !hasUnknown)
          ? currentActions : null, numAttempt > 1 && !hasUnknown);
    }

    if (hasUnknown) {
      actionsByServer = new HashMap<>();
      for (Action action : unknownReplicaActions) {
        HRegionLocation loc = getReplicaLocationOrFail(action);
        if (loc == null) continue;
        byte[] regionName = loc.getRegionInfo().getRegionName();
        AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
      }
      if (!actionsByServer.isEmpty()) {
        sendMultiAction(
            actionsByServer, numAttempt, doStartReplica ? currentActions : null, true);
      }
    }
  }
  private HRegionLocation getReplicaLocationOrFail(Action action) {
    // We are going to try to get the location once again. For each action, we'll do it once
    // from cache, because the previous calls in the loop might populate it.
    int replicaId = action.getReplicaId();
    RegionLocations locs = findAllLocationsOrFail(action, true);
    if (locs == null) return null; // manageError already called
    HRegionLocation loc = locs.getRegionLocation(replicaId);
    if (loc == null || loc.getServerName() == null) {
      locs = findAllLocationsOrFail(action, false);
      if (locs == null) return null; // manageError already called
      loc = locs.getRegionLocation(replicaId);
    }
    if (loc == null || loc.getServerName() == null) {
      manageLocationError(action, null);
      return null;
    }
    return loc;
  }
  private void manageLocationError(Action action, Exception ex) {
    String msg = "Cannot get replica " + action.getReplicaId()
        + " location for " + action.getAction();
    LOG.error(msg);
    if (ex == null) {
      ex = new IOException(msg);
    }
    manageError(action.getOriginalIndex(), action.getAction(),
        Retry.NO_LOCATION_PROBLEM, ex, null);
  }
  private RegionLocations findAllLocationsOrFail(Action action, boolean useCache) {
    if (action.getAction() == null) throw new IllegalArgumentException("#" + asyncProcess.id +
        ", row cannot be null");
    RegionLocations loc = null;
    try {
      loc = asyncProcess.connection.locateRegion(
          tableName, action.getAction().getRow(), useCache, true, action.getReplicaId());
    } catch (IOException ex) {
      manageLocationError(action, ex);
    }
    return loc;
  }
  /**
   * Send a multi action structure to the servers, after a delay depending on the attempt
   * number. Asynchronous.
   *
   * @param actionsByServer the actions structured by regions
   * @param numAttempt the attempt number.
   * @param actionsForReplicaThread original actions for replica thread; null on non-first call.
   */
  void sendMultiAction(Map<ServerName, MultiAction> actionsByServer,
      int numAttempt, List<Action> actionsForReplicaThread, boolean reuseThread) {
    // Run the last item on the same thread if we are already on a send thread.
    // We hope most of the time it will be the only item, so we can cut down on threads.
    int actionsRemaining = actionsByServer.size();
    // This iteration is by server (the HRegionLocation comparator is by server portion only).
    for (Map.Entry<ServerName, MultiAction> e : actionsByServer.entrySet()) {
      ServerName server = e.getKey();
      MultiAction multiAction = e.getValue();
      Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
          numAttempt);
      // make sure we correctly count the number of runnables before we try to reuse the send
      // thread, in case we had to split the request into different runnables because of backoff
      if (runnables.size() > actionsRemaining) {
        actionsRemaining = runnables.size();
      }

      // run all the runnables
      // HBASE-17475: Do not reuse the thread after the stack reaches a certain depth, to prevent
      // stack overflow; for now, we use HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER to
      // control the depth
      for (Runnable runnable : runnables) {
        if ((--actionsRemaining == 0) && reuseThread
            && numAttempt % HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER != 0) {
          runnable.run();
        } else {
          try {
            pool.submit(runnable);
          } catch (Throwable t) {
            if (t instanceof RejectedExecutionException) {
              // This should never happen. But as the pool is provided by the end user,
              // let's secure this a little.
              LOG.warn("#" + asyncProcess.id + ", the task was rejected by the pool. This is unexpected." +
                  " Server is " + server.getServerName(), t);
            } else {
              // see #HBASE-14359 for more details
              LOG.warn("Caught unexpected exception/error: ", t);
            }
            asyncProcess.decTaskCounters(multiAction.getRegions(), server);
            // We're likely to fail again, but this will increment the attempt counter,
            // so it will finish.
            receiveGlobalFailure(multiAction, server, numAttempt, t);
          }
        }
      }
    }

    if (actionsForReplicaThread != null) {
      startWaitingForReplicaCalls(actionsForReplicaThread);
    }
  }
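
  /*
   * Sketch of the thread-reuse rule above (HBASE-17475): the last runnable of a retry may
   * run inline on the current send thread, which nests groupAndSendMultiAction calls and
   * grows the stack. Forcing a pool submit whenever
   *   numAttempt % HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER == 0
   * periodically breaks the recursion, bounding stack depth while keeping the common
   * single-server case on one thread.
   */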
  private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
      MultiAction multiAction,
      int numAttempt) {
    // no stats to manage, just do the standard action
    if (asyncProcess.connection.getStatisticsTracker() == null) {
      if (asyncProcess.connection.getConnectionMetrics() != null) {
        asyncProcess.connection.getConnectionMetrics().incrNormalRunners();
      }
      asyncProcess.incTaskCounters(multiAction.getRegions(), server);
      SingleServerRequestRunnable runnable = addSingleServerRequestHeapSize(server,
          new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress));
      return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", runnable));
    }

    // group the actions by the amount of delay
    Map<Long, DelayingRunner> actions = new HashMap<>(multiAction.size());

    // split up the actions
    for (Map.Entry<byte[], List<Action>> e : multiAction.actions.entrySet()) {
      Long backoff = getBackoff(server, e.getKey());
      DelayingRunner runner = actions.get(backoff);
      if (runner == null) {
        actions.put(backoff, new DelayingRunner(backoff, e));
      } else {
        runner.add(e);
      }
    }

    List<Runnable> toReturn = new ArrayList<>(actions.size());
    for (DelayingRunner runner : actions.values()) {
      asyncProcess.incTaskCounters(runner.getActions().getRegions(), server);
      String traceText = "AsyncProcess.sendMultiAction";
      Runnable runnable = addSingleServerRequestHeapSize(server,
          new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, callsInProgress));
      // use a delay runner only if we need to sleep for some time
      if (runner.getSleepTime() > 0) {
        runner.setRunner(runnable);
        traceText = "AsyncProcess.clientBackoff.sendMultiAction";
        runnable = runner;
        if (asyncProcess.connection.getConnectionMetrics() != null) {
          asyncProcess.connection.getConnectionMetrics().incrDelayRunners();
          asyncProcess.connection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime());
        }
      } else {
        if (asyncProcess.connection.getConnectionMetrics() != null) {
          asyncProcess.connection.getConnectionMetrics().incrNormalRunners();
        }
      }
      runnable = Trace.wrap(traceText, runnable);
      toReturn.add(runnable);
    }
    return toReturn;
  }
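
  /*
   * Grouping sketch for the method above: regions that share the same computed backoff end
   * up in one DelayingRunner, so a single MultiAction can fan out into several runnables,
   * e.g. (values are illustrative only):
   *   region A -> backoff 0ms   \
   *   region B -> backoff 0ms    } runner #1 (runs immediately)
   *   region C -> backoff 100ms  } runner #2 (sleeps, then runs)
   */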
  /**
   * @param server server location where the target region is hosted
   * @param regionName name of the region which we are going to write some data
   * @return the amount of time the client should wait until it submits a request to the
   * specified server and region
   */
  private Long getBackoff(ServerName server, byte[] regionName) {
    ServerStatisticTracker tracker = asyncProcess.connection.getStatisticsTracker();
    ServerStatistics stats = tracker.getStats(server);
    return asyncProcess.connection.getBackoffPolicy()
        .getBackoffTime(server, regionName, stats);
  }
  /**
   * Starts waiting to issue replica calls on a different thread; or issues them immediately.
   */
  private void startWaitingForReplicaCalls(List<Action> actionsForReplicaThread) {
    long startTime = EnvironmentEdgeManager.currentTime();
    ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable(
        actionsForReplicaThread, startTime);
    if (asyncProcess.primaryCallTimeoutMicroseconds == 0) {
      // Start replica calls immediately.
      replicaRunnable.run();
    } else {
      // Start the thread that may kick off replica gets.
      // TODO: we could do it on the same thread, but it's a user thread, might be a bad idea.
      try {
        pool.submit(replicaRunnable);
      } catch (RejectedExecutionException ree) {
        LOG.warn("#" + asyncProcess.id + ", replica task was rejected by the pool - no replica calls", ree);
      }
    }
  }
  /**
   * Checks if we can retry, and acts accordingly: logs, sets the error status.
   *
   * @param originalIndex the position in the list sent
   * @param row the related row
   * @param canRetry if false, we won't retry whatever the settings.
   * @param throwable the throwable, if any (can be null)
   * @param server the location, if any (can be null)
   * @return the retry decision; Retry.YES if the action can be retried.
   */
  Retry manageError(int originalIndex, Row row, Retry canRetry,
      Throwable throwable, ServerName server) {
    if (canRetry == Retry.YES
        && throwable != null && throwable instanceof DoNotRetryIOException) {
      canRetry = Retry.NO_NOT_RETRIABLE;
    }

    if (canRetry != Retry.YES) {
      // Batch.Callback<Res> was not called on failure in 0.94. We keep this.
      setError(originalIndex, row, throwable, server);
    } else if (isActionComplete(originalIndex, row)) {
      canRetry = Retry.NO_OTHER_SUCCEEDED;
    }
    return canRetry;
  }
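
  /*
   * Decision summary for manageError above (the flow as implemented, restated for clarity):
   *   throwable is DoNotRetryIOException  -> NO_NOT_RETRIABLE, setError is called
   *   caller passed a non-YES canRetry    -> unchanged (e.g. NO_RETRIES_EXHAUSTED), setError
   *   another replica already succeeded   -> NO_OTHER_SUCCEEDED, no error recorded
   *   otherwise                           -> YES, the action will be resubmitted
   */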
  /**
   * Resubmit all the actions from this multiaction after a failure.
   *
   * @param rsActions the actions still to do from the initial list
   * @param server the destination
   * @param numAttempt the number of attempts so far
   * @param t the throwable (if any) that caused the resubmit
   */
  private void receiveGlobalFailure(
      MultiAction rsActions, ServerName server, int numAttempt, Throwable t) {
    errorsByServer.reportServerError(server);
    Retry canRetry = errorsByServer.canTryMore(numAttempt)
        ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;

    if (tableName == null && ClientExceptionsUtil.isMetaClearingException(t)) {
      // tableName is null when we made a cross-table RPC call.
      asyncProcess.connection.clearCaches(server);
    }
    int failed = 0, stopped = 0;
    List<Action> toReplay = new ArrayList<>();
    for (Map.Entry<byte[], List<Action>> e : rsActions.actions.entrySet()) {
      byte[] regionName = e.getKey();
      byte[] row = e.getValue().iterator().next().getAction().getRow();
      // Do not use the exception for updating cache because it might be coming from
      // any of the regions in the MultiAction.
      try {
        if (tableName != null) {
          asyncProcess.connection.updateCachedLocations(tableName, regionName, row,
              ClientExceptionsUtil.isMetaClearingException(t) ? null : t, server);
        }
      } catch (Throwable ex) {
        // That should never happen, but if it did, we want to make sure
        // we still process errors
        LOG.error("Couldn't update cached region locations: " + ex);
      }
      for (Action action : e.getValue()) {
        Retry retry = manageError(
            action.getOriginalIndex(), action.getAction(), canRetry, t, server);
        if (retry == Retry.YES) {
          toReplay.add(action);
        } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
          ++stopped;
        } else {
          ++failed;
        }
      }
    }

    if (toReplay.isEmpty()) {
      logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped);
    } else {
      resubmit(server, toReplay, numAttempt, rsActions.size(), t);
    }
  }
  /**
   * Log as much info as possible, and, if there is something to replay,
   * submit it again after a back off sleep.
   */
  private void resubmit(ServerName oldServer, List<Action> toReplay,
      int numAttempt, int failureCount, Throwable throwable) {
    // We have something to replay. We're going to sleep a little before.

    // We have two contradicting needs here:
    //  1) We want to get the new location after having slept, as it may change.
    //  2) We want to take into account the location when calculating the sleep time.
    //  3) If all this is just because the response needed to be chunked try again FAST.
    // It should be possible to have some heuristics to take the right decision. Short term,
    // we go for one.
    boolean retryImmediately = throwable instanceof RetryImmediatelyException;
    int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1;
    long backOffTime;
    if (retryImmediately) {
      backOffTime = 0;
    } else if (throwable instanceof CallQueueTooBigException) {
      // Give a special check on CQTBE, see #HBASE-17114
      backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pauseForCQTBE);
    } else {
      backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pause);
    }
    if (numAttempt > asyncProcess.startLogErrorsCnt) {
      // We use this value to have some logs when we have multiple failures, but not too many
      // logs, as errors are to be expected when a region moves, splits and so on
      LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
          oldServer, throwable, backOffTime, true, null, -1, -1));
    }

    try {
      if (backOffTime > 0) {
        Thread.sleep(backOffTime);
      }
    } catch (InterruptedException e) {
      LOG.warn("#" + asyncProcess.id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e);
      Thread.currentThread().interrupt();
      return;
    }

    groupAndSendMultiAction(toReplay, nextAttemptNumber);
  }
  private void logNoResubmit(ServerName oldServer, int numAttempt,
      int failureCount, Throwable throwable, int failed, int stopped) {
    if (failureCount != 0 || numAttempt > asyncProcess.startLogErrorsCnt + 1) {
      String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString();
      String logMessage = createLog(numAttempt, failureCount, 0, oldServer,
          throwable, -1, false, timeStr, failed, stopped);
      if (failed != 0) {
        // Only log final failures as warning
        LOG.warn(logMessage);
      } else {
        LOG.info(logMessage);
      }
    }
  }
  /**
   * Called when we receive the result of a server query.
   *
   * @param multiAction - the multiAction we sent
   * @param server - the location. It's used as a server name.
   * @param responses - the response, if any
   * @param numAttempt - the attempt
   */
  private void receiveMultiAction(MultiAction multiAction,
      ServerName server, MultiResponse responses, int numAttempt) {
    assert responses != null;

    // Success or partial success.
    // Analyze detailed results. We can still have individual failures that need to be redone.
    // Two specific throwables are managed:
    //  - DoNotRetryIOException: we continue to retry for other actions
    //  - RegionMovedException: we update the cache with the new region location

    List<Action> toReplay = new ArrayList<>();
    Throwable throwable = null;
    int failureCount = 0;
    boolean canRetry = true;

    Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
    updateStats(server, results);

    int failed = 0, stopped = 0;
    // Go by original action.
    for (Map.Entry<byte[], List<Action>> regionEntry : multiAction.actions.entrySet()) {
      byte[] regionName = regionEntry.getKey();
      Map<Integer, Object> regionResults = results.get(regionName) == null
          ? null : results.get(regionName).result;
      if (regionResults == null) {
        if (!responses.getExceptions().containsKey(regionName)) {
          LOG.error("Server sent us neither results nor exceptions for "
              + Bytes.toStringBinary(regionName));
          responses.getExceptions().put(regionName, new RuntimeException("Invalid response"));
        }
        continue;
      }
      boolean regionFailureRegistered = false;
      for (Action sentAction : regionEntry.getValue()) {
        Object result = regionResults.get(sentAction.getOriginalIndex());
        // Failure: retry if it makes sense, else update the errors lists
        if (result == null || result instanceof Throwable) {
          Row row = sentAction.getAction();
          throwable = ClientExceptionsUtil.findException(result);
          // Register corresponding failures once per server/once per region.
          if (!regionFailureRegistered) {
            regionFailureRegistered = true;
            try {
              asyncProcess.connection.updateCachedLocations(
                  tableName, regionName, row.getRow(), result, server);
            } catch (Throwable ex) {
              // That should never happen, but if it did, we want to make sure
              // we still process errors
              LOG.error("Couldn't update cached region locations: " + ex);
            }
          }
          if (failureCount == 0) {
            errorsByServer.reportServerError(server);
            // We determine canRetry only once for all calls, after reporting server failure.
            canRetry = errorsByServer.canTryMore(numAttempt);
          }
          ++failureCount;
          Retry retry = manageError(sentAction.getOriginalIndex(), row,
              canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable) result, server);
          if (retry == Retry.YES) {
            toReplay.add(sentAction);
          } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
            ++stopped;
          } else {
            ++failed;
          }
        } else {
          if (callback != null) {
            try {
              //noinspection unchecked
              // TODO: would callback expect a replica region name if it gets one?
              this.callback.update(regionName, sentAction.getAction().getRow(), (CResult) result);
            } catch (Throwable t) {
              LOG.error("User callback threw an exception for "
                  + Bytes.toStringBinary(regionName) + ", ignoring", t);
            }
          }
          setResult(sentAction, result);
        }
      }
    }

    // The failures global to a region. We will use the multiAction we sent previously to find
    // the actions to replay.
    for (Map.Entry<byte[], Throwable> throwableEntry : responses.getExceptions().entrySet()) {
      throwable = throwableEntry.getValue();
      byte[] region = throwableEntry.getKey();
      List<Action> actions = multiAction.actions.get(region);
      if (actions == null || actions.isEmpty()) {
        throw new IllegalStateException("Wrong response for the region: " +
            HRegionInfo.encodeRegionName(region));
      }

      if (failureCount == 0) {
        errorsByServer.reportServerError(server);
        canRetry = errorsByServer.canTryMore(numAttempt);
      }
      if (null == tableName && ClientExceptionsUtil.isMetaClearingException(throwable)) {
        // For multi-actions, we don't have a table name, but we want to make sure to clear the
        // cache in case there were location-related exceptions. We don't want to clear the
        // cache for every possible exception that comes through, however.
        asyncProcess.connection.clearCaches(server);
      } else {
        try {
          asyncProcess.connection.updateCachedLocations(
              tableName, region, actions.get(0).getAction().getRow(), throwable, server);
        } catch (Throwable ex) {
          // That should never happen, but if it did, we want to make sure
          // we still process errors
          LOG.error("Couldn't update cached region locations: " + ex);
        }
      }
      failureCount += actions.size();

      for (Action action : actions) {
        Row row = action.getAction();
        Retry retry = manageError(action.getOriginalIndex(), row,
            canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server);
        if (retry == Retry.YES) {
          toReplay.add(action);
        } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
          ++stopped;
        } else {
          ++failed;
        }
      }
    }
    if (toReplay.isEmpty()) {
      logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped);
    } else {
      resubmit(server, toReplay, numAttempt, failureCount, throwable);
    }
  }
  @VisibleForTesting
  protected void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) {
    boolean metrics = asyncProcess.connection.getConnectionMetrics() != null;
    boolean stats = asyncProcess.connection.getStatisticsTracker() != null;
    if (!stats && !metrics) {
      return;
    }
    for (Map.Entry<byte[], MultiResponse.RegionResult> regionStats : results.entrySet()) {
      byte[] regionName = regionStats.getKey();
      ClientProtos.RegionLoadStats stat = regionStats.getValue().getStat();
      if (stat == null) {
        LOG.error("No ClientProtos.RegionLoadStats found for server=" + server
            + ", region=" + Bytes.toStringBinary(regionName));
        continue;
      }
      RegionLoadStats regionLoadstats = ProtobufUtil.createRegionLoadStats(stat);
      ResultStatsUtil.updateStats(asyncProcess.connection.getStatisticsTracker(), server,
          regionName, regionLoadstats);
      ResultStatsUtil.updateStats(asyncProcess.connection.getConnectionMetrics(),
          server, regionName, regionLoadstats);
    }
  }
  private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
      Throwable error, long backOffTime, boolean willRetry, String startTime,
      int failed, int stopped) {
    StringBuilder sb = new StringBuilder();
    sb.append("#").append(asyncProcess.id).append(", table=").append(tableName).append(", ")
        .append("attempt=").append(numAttempt)
        .append("/").append(asyncProcess.numTries).append(" ");

    if (failureCount > 0 || error != null) {
      sb.append("failed=").append(failureCount).append("ops").append(", last exception: ").
          append(error == null ? "null" : error);
    } else {
      sb.append("succeeded");
    }

    sb.append(" on ").append(sn).append(", tracking started ").append(startTime);

    if (willRetry) {
      sb.append(", retrying after=").append(backOffTime).append("ms").
          append(", replay=").append(replaySize).append("ops");
    } else if (failureCount > 0) {
      if (stopped > 0) {
        sb.append("; not retrying ").append(stopped).append(" due to success from other replica");
      }
      if (failed > 0) {
        sb.append("; not retrying ").append(failed).append(" - final failure");
      }
    }

    return sb.toString();
  }
1033 * @param action Action (request) that the server responded to.
1034 * @param result The result.
1036 private void setResult(Action action
, Object result
) {
1037 if (result
== null) {
1038 throw new RuntimeException("Result cannot be null");
1040 ReplicaResultState state
= null;
1041 boolean isStale
= !RegionReplicaUtil
.isDefaultReplica(action
.getReplicaId());
1042 int index
= action
.getOriginalIndex();
1043 if (results
== null) {
1044 decActionCounter(index
);
1045 return; // Simple case, no replica requests.
1047 state
= trySetResultSimple(index
, action
.getAction(), false, result
, null, isStale
);
1048 if (state
== null) {
1049 return; // Simple case, no replica requests.
1051 // At this point we know that state is set to replica tracking class.
1052 // It could be that someone else is also looking at it; however, we know there can
1053 // only be one state object, and only one thread can set callCount to 0. Other threads
1054 // will either see state with callCount 0 after locking it; or will not see state at all
1055 // we will replace it with the result.
1056 synchronized (state
) {
1057 if (state
.callCount
== 0) {
1058 return; // someone already set the result
1060 state
.callCount
= 0;
1062 synchronized (replicaResultLock
) {
1063 if (results
[index
] != state
) {
1064 throw new AssertionError("We set the callCount but someone else replaced the result");
1066 results
[index
] = result
;
1069 decActionCounter(index
);
  /**
   * Sets the error from a particular action.
   * @param index Original action index.
   * @param row Original request.
   * @param throwable The resulting error.
   * @param server The source server.
   */
  private void setError(int index, Row row, Throwable throwable, ServerName server) {
    ReplicaResultState state = null;
    if (results == null) {
      // Note that we currently cannot have replica requests with null results. So it shouldn't
      // happen that multiple replica calls will call decActionCounter for the same actions with
      // results == null. Only one call per action should be present in this case.
      errors.add(throwable, row, server);
      decActionCounter(index);
      return; // Simple case, no replica requests.
    }
    state = trySetResultSimple(index, row, true, throwable, server, false);
    if (state == null) {
      return; // Simple case, no replica requests.
    }
    BatchErrors target = null; // Error will be added to final errors, or temp replica errors.
    boolean isActionDone = false;
    synchronized (state) {
      switch (state.callCount) {
        case 0: return; // someone already set the result
        case 1: { // All calls failed, we are the last error.
          target = errors;
          isActionDone = true;
          break;
        }
        default: {
          assert state.callCount > 1;
          if (state.replicaErrors == null) {
            state.replicaErrors = new BatchErrors();
          }
          target = state.replicaErrors;
          break;
        }
      }
      --state.callCount;
    }
    target.add(throwable, row, server);
    if (isActionDone) {
      if (state.replicaErrors != null) { // last call, no need to lock
        errors.merge(state.replicaErrors);
      }
      // See setResult for explanations.
      synchronized (replicaResultLock) {
        if (results[index] != state) {
          throw new AssertionError("We set the callCount but someone else replaced the result");
        }
        results[index] = throwable;
      }
      decActionCounter(index);
    }
  }
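
  /*
   * Completion protocol restated (see setResult/setError above): the first thread to move
   * state.callCount to 0 under the state lock wins; it alone replaces results[index] under
   * replicaResultLock and decrements actionsInProgress, so the counter is decremented exactly
   * once per action no matter how many replica calls raced.
   */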
  /**
   * Checks if the action is complete; used on error to prevent needless retries.
   * Does not synchronize, assuming element index/field accesses are atomic.
   * This is an opportunistic optimization check, doesn't have to be strict.
   * @param index Original action index.
   * @param row Original request.
   */
  private boolean isActionComplete(int index, Row row) {
    if (!AsyncProcess.isReplicaGet(row)) return false;
    Object resObj = results[index];
    return (resObj != null) && (!(resObj instanceof ReplicaResultState)
        || ((ReplicaResultState)resObj).callCount == 0);
  }
  /**
   * Tries to set the result or error for a particular action as if there were no replica calls.
   * @return null if successful; replica state if there were in fact replica calls.
   */
  private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError,
      Object result, ServerName server, boolean isFromReplica) {
    Object resObj = null;
    if (!AsyncProcess.isReplicaGet(row)) {
      if (isFromReplica) {
        throw new AssertionError("Unexpected stale result for " + row);
      }
      results[index] = result;
    } else {
      synchronized (replicaResultLock) {
        resObj = results[index];
        if (resObj == null) {
          if (isFromReplica) {
            throw new AssertionError("Unexpected stale result for " + row);
          }
          results[index] = result;
        }
      }
    }

    ReplicaResultState rrs =
        (resObj instanceof ReplicaResultState) ? (ReplicaResultState)resObj : null;
    if (rrs == null && isError) {
      // The resObj is not replica state (null or already set).
      errors.add((Throwable)result, row, server);
    }

    if (resObj == null) {
      // resObj is null - no replica calls were made.
      decActionCounter(index);
      return null;
    }
    return rrs;
  }
  private void decActionCounter(int index) {
    long actionsRemaining = actionsInProgress.decrementAndGet();
    if (actionsRemaining < 0) {
      String error = buildDetailedErrorMsg("Incorrect actions in progress", index);
      throw new AssertionError(error);
    } else if (actionsRemaining == 0) {
      synchronized (actionsInProgress) {
        actionsInProgress.notifyAll();
      }
    }
  }
  private String buildDetailedErrorMsg(String string, int index) {
    StringBuilder error = new StringBuilder(128);
    error.append(string).append("; called for ").append(index).append(", actionsInProgress ")
        .append(actionsInProgress.get()).append("; replica gets: ");
    if (replicaGetIndices != null) {
      for (int i = 0; i < replicaGetIndices.length; ++i) {
        error.append(replicaGetIndices[i]).append(", ");
      }
    } else {
      error.append(hasAnyReplicaGets ? "all" : "none");
    }
    error.append("; results ");
    if (results != null) {
      for (int i = 0; i < results.length; ++i) {
        Object o = results[i];
        error.append(((o == null) ? "null" : o.toString())).append(", ");
      }
    }
    return error.toString();
  }
  @Override
  public void waitUntilDone() throws InterruptedIOException {
    try {
      waitUntilDone(Long.MAX_VALUE);
    } catch (InterruptedException iex) {
      throw new InterruptedIOException(iex.getMessage());
    } finally {
      if (callsInProgress != null) {
        for (CancellableRegionServerCallable clb : callsInProgress) {
          clb.cancel();
        }
      }
    }
  }
  private boolean waitUntilDone(long cutoff) throws InterruptedException {
    boolean hasWait = cutoff != Long.MAX_VALUE;
    long lastLog = EnvironmentEdgeManager.currentTime();
    long currentInProgress;
    while (0 != (currentInProgress = actionsInProgress.get())) {
      long now = EnvironmentEdgeManager.currentTime();
      if (hasWait && (now * 1000L) > cutoff) {
        return false;
      }
      if (!hasWait) { // Only log if wait is infinite.
        if (now > lastLog + 10000) {
          lastLog = now;
          LOG.info("#" + asyncProcess.id + ", waiting for " + currentInProgress
              + " actions to finish on table: " + tableName);
        }
      }
      synchronized (actionsInProgress) {
        if (actionsInProgress.get() == 0) break;
        if (!hasWait) {
          actionsInProgress.wait(10);
        } else {
          long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
          TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond);
        }
      }
    }
    return true;
  }
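
  /*
   * Sketch of the wait strategy above: with a finite cutoff (in microseconds) each loop
   * iteration waits at most 100ms,
   *   long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
   * while an infinite cutoff (Long.MAX_VALUE) polls every 10ms and logs progress roughly
   * every 10 seconds.
   */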
  @Override
  public boolean hasError() {
    return errors.hasErrors();
  }

  @Override
  public List<? extends Row> getFailedOperations() {
    return errors.actions;
  }

  @Override
  public RetriesExhaustedWithDetailsException getErrors() {
    return errors.makeException(asyncProcess.logBatchErrorDetails);
  }

  @Override
  public Object[] getResults() throws InterruptedIOException {
    waitUntilDone();
    return results;
  }
  /**
   * Creates the server error tracker to use inside process.
   * Currently, to preserve the main assumption about current retries, and to work well with
   * the retry-limit-based calculation, the calculation is local per Process object.
   * We may benefit from connection-wide tracking of server errors.
   * @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection
   */
  private ConnectionImplementation.ServerErrorTracker createServerErrorTracker() {
    return new ConnectionImplementation.ServerErrorTracker(
        asyncProcess.serverTrackerTimeout, asyncProcess.numTries);
  }
  /**
   * Create a callable. Isolated to be easily overridden in the tests.
   */
  private MultiServerCallable createCallable(final ServerName server, TableName tableName,
      final MultiAction multi) {
    return new MultiServerCallable(asyncProcess.connection, tableName, server,
        multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker);
  }
}