HBASE-17532 Replaced explicit type with diamond operator
[hbase.git] / hbase-client / src / main / java / org / apache / hadoop / hbase / client / AsyncRequestFutureImpl.java
blob 41431bbd1349d823fcb1858460876f512f7d299b
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.client;
import com.google.common.annotations.VisibleForTesting;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.RetryImmediatelyException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.htrace.Trace;
/**
 * The context, and return value, for a single submit/submitAll call.
 * Note on how this class (one AP submit) works. Initially, all requests are split into groups
 * by server; a request is sent to each server in parallel; the RPC calls are not async, so a
 * thread per server is used. Every time some actions fail, regions/locations might have
 * changed, so we re-group them by server and region again and send these groups in parallel
 * too. The result, in case of retries, is a "tree" of threads, with the parent exiting after
 * scheduling its children. This is why lots of code doesn't require any synchronization.
 */
@InterfaceAudience.Private
class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
  private static final Log LOG = LogFactory.getLog(AsyncRequestFutureImpl.class);
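  // Tracks time spent across retries; only initialized when this future creates its own
  // callables (i.e. no pre-built callable was supplied in the task; see the constructor).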
  private RetryingTimeTracker tracker;
  /**
   * Runnable (that can be submitted to thread pool) that waits for when it's time
   * to issue replica calls, finds region replicas, groups the requests by replica and
   * issues the calls (on separate threads, via sendMultiAction).
   * This is done on a separate thread because we don't want to wait on user thread for
   * our asynchronous call, and usually we have to wait before making replica calls.
   */
  private final class ReplicaCallIssuingRunnable implements Runnable {
    private final long startTime;
    private final List<Action> initialActions;

    public ReplicaCallIssuingRunnable(List<Action> initialActions, long startTime) {
      this.initialActions = initialActions;
      this.startTime = startTime;
    }
    @Override
    public void run() {
      boolean done = false;
      if (asyncProcess.primaryCallTimeoutMicroseconds > 0) {
        try {
          done = waitUntilDone(startTime * 1000L + asyncProcess.primaryCallTimeoutMicroseconds);
        } catch (InterruptedException ex) {
          LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage());
          return;
        }
      }
      if (done) return; // Done within primary timeout
      Map<ServerName, MultiAction> actionsByServer = new HashMap<>();
      List<Action> unknownLocActions = new ArrayList<>();
      if (replicaGetIndices == null) {
        for (int i = 0; i < results.length; ++i) {
          addReplicaActions(i, actionsByServer, unknownLocActions);
        }
      } else {
        for (int replicaGetIndice : replicaGetIndices) {
          addReplicaActions(replicaGetIndice, actionsByServer, unknownLocActions);
        }
      }
      if (!actionsByServer.isEmpty()) {
        sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty());
      }
      if (!unknownLocActions.isEmpty()) {
        actionsByServer = new HashMap<>();
        for (Action action : unknownLocActions) {
          addReplicaActionsAgain(action, actionsByServer);
        }
        // Some actions may have completely failed, they are handled inside addAgain.
        if (!actionsByServer.isEmpty()) {
          sendMultiAction(actionsByServer, 1, null, true);
        }
      }
    }
    /**
     * Add replica actions to action map by server.
     * @param index Index of the original action.
     * @param actionsByServer The map by server to add it to.
     */
    private void addReplicaActions(int index, Map<ServerName, MultiAction> actionsByServer,
        List<Action> unknownReplicaActions) {
      if (results[index] != null) return; // opportunistic. Never goes from non-null to null.
      Action action = initialActions.get(index);
      RegionLocations loc = findAllLocationsOrFail(action, true);
      if (loc == null) return;
      HRegionLocation[] locs = loc.getRegionLocations();
      if (locs.length == 1) {
        LOG.warn("No replicas found for " + action.getAction());
        return;
      }
      synchronized (replicaResultLock) {
        // Don't run replica calls if the original has finished. We could do it e.g. if
        // original has already failed before first replica call (unlikely given retries),
        // but that would require additional synchronization w.r.t. returning to caller.
        if (results[index] != null) return;
        // We set the number of calls here. After that any path must call setResult/setError.
        // True even for replicas that are not found - if we refuse to send we MUST set error.
        results[index] = new ReplicaResultState(locs.length);
      }
      for (int i = 1; i < locs.length; ++i) {
        Action replicaAction = new Action(action, i);
        if (locs[i] != null) {
          asyncProcess.addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
              replicaAction, actionsByServer, nonceGroup);
        } else {
          unknownReplicaActions.add(replicaAction);
        }
      }
    }
    private void addReplicaActionsAgain(
        Action action, Map<ServerName, MultiAction> actionsByServer) {
      if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
        throw new AssertionError("Cannot have default replica here");
      }
      HRegionLocation loc = getReplicaLocationOrFail(action);
      if (loc == null) return;
      asyncProcess.addAction(loc.getServerName(), loc.getRegionInfo().getRegionName(),
          action, actionsByServer, nonceGroup);
    }
  }
  /**
   * Runnable (that can be submitted to thread pool) that submits MultiAction to a
   * single server. The server call is synchronous, therefore we do it on a thread pool.
   */
  private final class SingleServerRequestRunnable implements Runnable {
    private final MultiAction multiAction;
    private final int numAttempt;
    private final ServerName server;
    private final Set<CancellableRegionServerCallable> callsInProgress;
    private Long heapSize = null;

    private SingleServerRequestRunnable(
        MultiAction multiAction, int numAttempt, ServerName server,
        Set<CancellableRegionServerCallable> callsInProgress) {
      this.multiAction = multiAction;
      this.numAttempt = numAttempt;
      this.server = server;
      this.callsInProgress = callsInProgress;
    }
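    /**
     * Lazily computes, caches and returns the summed heap size of all Mutations in this
     * runnable's MultiAction. Non-mutation actions (e.g. gets) contribute nothing.
     */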
    @VisibleForTesting
    long heapSize() {
      if (heapSize != null) {
        return heapSize;
      }
      heapSize = 0L;
      for (Map.Entry<byte[], List<Action>> e : this.multiAction.actions.entrySet()) {
        List<Action> actions = e.getValue();
        for (Action action : actions) {
          Row row = action.getAction();
          if (row instanceof Mutation) {
            heapSize += ((Mutation) row).heapSize();
          }
        }
      }
      return heapSize;
    }
    @Override
    public void run() {
      AbstractResponse res = null;
      CancellableRegionServerCallable callable = currentCallable;
      try {
        // Set up the callable based on the actions, if we don't have one already from the request.
        if (callable == null) {
          callable = createCallable(server, tableName, multiAction);
        }
        RpcRetryingCaller<AbstractResponse> caller =
            asyncProcess.createCaller(callable, rpcTimeout);
        try {
          if (callsInProgress != null) {
            callsInProgress.add(callable);
          }
          res = caller.callWithoutRetries(callable, operationTimeout);
          if (res == null) {
            // Cancelled
            return;
          }
        } catch (IOException e) {
          // The service itself failed. It may be an error coming from the communication
          // layer, but, as well, a functional error raised by the server.
          receiveGlobalFailure(multiAction, server, numAttempt, e);
          return;
        } catch (Throwable t) {
          // This should not happen. Let's log & retry anyway.
          LOG.error("#" + asyncProcess.id + ", Caught throwable while calling. This is unexpected." +
              " Retrying. Server is " + server + ", tableName=" + tableName, t);
          receiveGlobalFailure(multiAction, server, numAttempt, t);
          return;
        }
        if (res.type() == AbstractResponse.ResponseType.MULTI) {
          // Normal case: we received an answer from the server, and it's not an exception.
          receiveMultiAction(multiAction, server, (MultiResponse) res, numAttempt);
        } else {
          if (results != null) {
            SingleResponse singleResponse = (SingleResponse) res;
            results[0] = singleResponse.getEntry();
          }
          decActionCounter(1);
        }
      } catch (Throwable t) {
        // Something really bad happened. We are on the send thread that will now die.
        LOG.error("Internal AsyncProcess #" + asyncProcess.id + " error for "
            + tableName + " processing for " + server, t);
        throw new RuntimeException(t);
      } finally {
        asyncProcess.decTaskCounters(multiAction.getRegions(), server);
        if (callsInProgress != null && callable != null && res != null) {
          callsInProgress.remove(callable);
        }
      }
    }
  }
  private final Batch.Callback<CResult> callback;
  private final BatchErrors errors;
  private final ConnectionImplementation.ServerErrorTracker errorsByServer;
  private final ExecutorService pool;
  private final Set<CancellableRegionServerCallable> callsInProgress;

  private final TableName tableName;
  private final AtomicLong actionsInProgress = new AtomicLong(-1);
  /**
   * The lock controls access to results. It is only held when populating results where
   * there might be several callers (eventual consistency gets). For other requests,
   * there's one unique call going on per result index.
   */
  private final Object replicaResultLock = new Object();
  /**
   * Result array. Null if results are not needed. Otherwise, each index corresponds to
   * the action index in initial actions submitted. For most request types, has null-s for
   * requests that are not done, and result/exception for those that are done.
   * For eventual-consistency gets, initially the same applies; at some point, replica calls
   * might be started, and ReplicaResultState is put at the corresponding indices. The
   * returning calls check the type to detect when this is the case. After all calls are done,
   * ReplicaResultState-s are replaced with results for the user.
   */
  private final Object[] results;
  /**
   * Indices of replica gets in results. If null, all or no actions are replica-gets.
   */
  private final int[] replicaGetIndices;
  private final boolean hasAnyReplicaGets;
  private final long nonceGroup;
  private final CancellableRegionServerCallable currentCallable;
  private final int operationTimeout;
  private final int rpcTimeout;
  private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
  private final AsyncProcess asyncProcess;
  /**
   * For {@link AsyncRequestFutureImpl#manageError(int, Row, Retry, Throwable, ServerName)}. Only
   * used to make logging more clear, we don't actually care why we don't retry.
   */
  public enum Retry {
    YES,
    NO_LOCATION_PROBLEM,
    NO_NOT_RETRIABLE,
    NO_RETRIES_EXHAUSTED,
    NO_OTHER_SUCCEEDED
  }
  /** Sync point for calls to multiple replicas for the same user request (Get).
   * Created and put in the results array (we assume replica calls require results) when
   * the replica calls are launched. See results for details of this process.
   * POJO, all fields are public. To modify them, the object itself is locked. */
  private static class ReplicaResultState {
    public ReplicaResultState(int callCount) {
      this.callCount = callCount;
    }

    /** Number of calls outstanding, or 0 if a call succeeded (even with others outstanding). */
    int callCount;
    /** Errors for which it is not decided whether we will report them to user. If one of the
     * calls succeeds, we will discard the errors that may have happened in the other calls. */
    BatchErrors replicaErrors = null;

    @Override
    public String toString() {
      return "[call count " + callCount + "; errors " + replicaErrors + "]";
    }
  }
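  /**
   * Builds the future for one submit call: wires in the pool, callback and timeouts from the
   * task, sizes the result array, and pre-computes which action indices may be eligible for
   * replica (eventual-consistency) gets.
   */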
  public AsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions,
      long nonceGroup, AsyncProcess asyncProcess) {
    this.pool = task.getPool();
    this.callback = task.getCallback();
    this.nonceGroup = nonceGroup;
    this.tableName = task.getTableName();
    this.actionsInProgress.set(actions.size());
    if (task.getResults() == null) {
      results = task.getNeedResults() ? new Object[actions.size()] : null;
    } else {
      if (task.getResults().length != actions.size()) {
        throw new AssertionError("results.length");
      }
      this.results = task.getResults();
      for (int i = 0; i != this.results.length; ++i) {
        results[i] = null;
      }
    }
    List<Integer> replicaGetIndices = null;
    boolean hasAnyReplicaGets = false;
    if (results != null) {
      // Check to see if any requests might require replica calls.
      // We expect that many requests will consist of all or no multi-replica gets; in such
      // cases we would just use a boolean (hasAnyReplicaGets). If there's a mix, we will
      // store the list of action indexes for which replica gets are possible, and set
      // hasAnyReplicaGets to true.
      boolean hasAnyNonReplicaReqs = false;
      int posInList = 0;
      for (Action action : actions) {
        boolean isReplicaGet = AsyncProcess.isReplicaGet(action.getAction());
        if (isReplicaGet) {
          hasAnyReplicaGets = true;
          if (hasAnyNonReplicaReqs) { // Mixed case
            if (replicaGetIndices == null) {
              replicaGetIndices = new ArrayList<>(actions.size() - 1);
            }
            replicaGetIndices.add(posInList);
          }
        } else if (!hasAnyNonReplicaReqs) {
          // The first non-multi-replica request in the action list.
          hasAnyNonReplicaReqs = true;
          if (posInList > 0) {
            // Add all the previous requests to the index lists. We know they are all
            // replica-gets because this is the first non-multi-replica request in the list.
            replicaGetIndices = new ArrayList<>(actions.size() - 1);
            for (int i = 0; i < posInList; ++i) {
              replicaGetIndices.add(i);
            }
          }
        }
        ++posInList;
      }
    }
    this.hasAnyReplicaGets = hasAnyReplicaGets;
    if (replicaGetIndices != null) {
      this.replicaGetIndices = new int[replicaGetIndices.size()];
      int i = 0;
      for (Integer el : replicaGetIndices) {
        this.replicaGetIndices[i++] = el;
      }
    } else {
      this.replicaGetIndices = null;
    }
    this.callsInProgress = !hasAnyReplicaGets ? null :
        Collections.newSetFromMap(
            new ConcurrentHashMap<CancellableRegionServerCallable, Boolean>());
    this.asyncProcess = asyncProcess;
    this.errorsByServer = createServerErrorTracker();
    this.errors = (asyncProcess.globalErrors != null)
        ? asyncProcess.globalErrors : new BatchErrors();
    this.operationTimeout = task.getOperationTimeout();
    this.rpcTimeout = task.getRpcTimeout();
    this.currentCallable = task.getCallable();
    if (task.getCallable() == null) {
      tracker = new RetryingTimeTracker().start();
    }
  }
  @VisibleForTesting
  protected Set<CancellableRegionServerCallable> getCallsInProgress() {
    return callsInProgress;
  }

  @VisibleForTesting
  Map<ServerName, List<Long>> getRequestHeapSize() {
    return heapSizesByServer;
  }
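  /**
   * Records the heap size of the runnable's MultiAction under its destination server (exposed
   * for tests via getRequestHeapSize()) and returns the runnable unchanged.
   */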
  private SingleServerRequestRunnable addSingleServerRequestHeapSize(ServerName server,
      SingleServerRequestRunnable runnable) {
    List<Long> heapCount = heapSizesByServer.get(server);
    if (heapCount == null) {
      heapCount = new LinkedList<>();
      heapSizesByServer.put(server, heapCount);
    }
    heapCount.add(runnable.heapSize());
    return runnable;
  }
  /**
   * Groups a list of actions per region server, and sends them.
   *
   * @param currentActions - the list of rows to submit
   * @param numAttempt - the current numAttempt (first attempt is 1)
   */
  void groupAndSendMultiAction(List<Action> currentActions, int numAttempt) {
    Map<ServerName, MultiAction> actionsByServer = new HashMap<>();

    boolean isReplica = false;
    List<Action> unknownReplicaActions = null;
    for (Action action : currentActions) {
      RegionLocations locs = findAllLocationsOrFail(action, true);
      if (locs == null) continue;
      boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
      if (isReplica && !isReplicaAction) {
        // This is the property of the current implementation, not a requirement.
        throw new AssertionError("Replica and non-replica actions in the same retry");
      }
      isReplica = isReplicaAction;
      HRegionLocation loc = locs.getRegionLocation(action.getReplicaId());
      if (loc == null || loc.getServerName() == null) {
        if (isReplica) {
          if (unknownReplicaActions == null) {
            unknownReplicaActions = new ArrayList<>(1);
          }
          unknownReplicaActions.add(action);
        } else {
          // TODO: relies on primary location always being fetched
          manageLocationError(action, null);
        }
      } else {
        byte[] regionName = loc.getRegionInfo().getRegionName();
        AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
      }
    }
    boolean doStartReplica = (numAttempt == 1 && !isReplica && hasAnyReplicaGets);
    boolean hasUnknown = unknownReplicaActions != null && !unknownReplicaActions.isEmpty();

    if (!actionsByServer.isEmpty()) {
      // If this is a first attempt to group and send, with no replicas, we need a replica thread.
      sendMultiAction(actionsByServer, numAttempt, (doStartReplica && !hasUnknown)
          ? currentActions : null, numAttempt > 1 && !hasUnknown);
    }

    if (hasUnknown) {
      actionsByServer = new HashMap<>();
      for (Action action : unknownReplicaActions) {
        HRegionLocation loc = getReplicaLocationOrFail(action);
        if (loc == null) continue;
        byte[] regionName = loc.getRegionInfo().getRegionName();
        AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
      }
      if (!actionsByServer.isEmpty()) {
        sendMultiAction(
            actionsByServer, numAttempt, doStartReplica ? currentActions : null, true);
      }
    }
  }
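  /**
   * Re-resolves the location of a replica action, first from the cache and then bypassing it;
   * registers a location error and returns null if the replica still cannot be found.
   */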
  private HRegionLocation getReplicaLocationOrFail(Action action) {
    // We are going to try get location once again. For each action, we'll do it once
    // from cache, because the previous calls in the loop might populate it.
    int replicaId = action.getReplicaId();
    RegionLocations locs = findAllLocationsOrFail(action, true);
    if (locs == null) return null; // manageError already called
    HRegionLocation loc = locs.getRegionLocation(replicaId);
    if (loc == null || loc.getServerName() == null) {
      locs = findAllLocationsOrFail(action, false);
      if (locs == null) return null; // manageError already called
      loc = locs.getRegionLocation(replicaId);
    }
    if (loc == null || loc.getServerName() == null) {
      manageLocationError(action, null);
      return null;
    }
    return loc;
  }
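  // Logs the location lookup failure and marks the action as failed with NO_LOCATION_PROBLEM,
  // wrapping the failure in an IOException when no cause was provided.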
  private void manageLocationError(Action action, Exception ex) {
    String msg = "Cannot get replica " + action.getReplicaId()
        + " location for " + action.getAction();
    LOG.error(msg);
    if (ex == null) {
      ex = new IOException(msg);
    }
    manageError(action.getOriginalIndex(), action.getAction(),
        Retry.NO_LOCATION_PROBLEM, ex, null);
  }
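  // Locates all replica locations for the action's row; on lookup failure, records the error
  // via manageLocationError and returns null.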
  private RegionLocations findAllLocationsOrFail(Action action, boolean useCache) {
    if (action.getAction() == null) throw new IllegalArgumentException("#" + asyncProcess.id +
        ", row cannot be null");
    RegionLocations loc = null;
    try {
      loc = asyncProcess.connection.locateRegion(
          tableName, action.getAction().getRow(), useCache, true, action.getReplicaId());
    } catch (IOException ex) {
      manageLocationError(action, ex);
    }
    return loc;
  }
  /**
   * Sends a multi action structure to the servers, after a delay depending on the attempt
   * number. Asynchronous.
   *
   * @param actionsByServer the actions structured by regions
   * @param numAttempt the attempt number.
   * @param actionsForReplicaThread original actions for replica thread; null on non-first call.
   */
  void sendMultiAction(Map<ServerName, MultiAction> actionsByServer,
      int numAttempt, List<Action> actionsForReplicaThread, boolean reuseThread) {
    // Run the last item on the same thread if we are already on a send thread.
    // We hope most of the time it will be the only item, so we can cut down on threads.
    int actionsRemaining = actionsByServer.size();
    // This iteration is by server (the HRegionLocation comparator is by server portion only).
    for (Map.Entry<ServerName, MultiAction> e : actionsByServer.entrySet()) {
      ServerName server = e.getKey();
      MultiAction multiAction = e.getValue();
      Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
          numAttempt);
      // Make sure we correctly count the number of runnables before we try to reuse the send
      // thread, in case we had to split the request into different runnables because of backoff.
      if (runnables.size() > actionsRemaining) {
        actionsRemaining = runnables.size();
      }

      // Run all the runnables.
      // HBASE-17475: Do not reuse the thread after the stack reaches a certain depth, to prevent
      // stack overflow. For now, we use HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER to
      // control the depth.
      for (Runnable runnable : runnables) {
        if ((--actionsRemaining == 0) && reuseThread
            && numAttempt % HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER != 0) {
          runnable.run();
        } else {
          try {
            pool.submit(runnable);
          } catch (Throwable t) {
            if (t instanceof RejectedExecutionException) {
              // This should never happen. But as the pool is provided by the end user,
              // let's secure this a little.
              LOG.warn("#" + asyncProcess.id + ", the task was rejected by the pool. This is unexpected." +
                  " Server is " + server.getServerName(), t);
            } else {
              // See HBASE-14359 for more details.
              LOG.warn("Caught unexpected exception/error: ", t);
            }
            asyncProcess.decTaskCounters(multiAction.getRegions(), server);
            // We're likely to fail again, but this will increment the attempt counter,
            // so it will finish.
            receiveGlobalFailure(multiAction, server, numAttempt, t);
          }
        }
      }
    }

    if (actionsForReplicaThread != null) {
      startWaitingForReplicaCalls(actionsForReplicaThread);
    }
  }
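  /**
   * Wraps the MultiAction for one server into runnables. Without a statistics tracker this is a
   * single traced SingleServerRequestRunnable; with client backoff enabled, actions are grouped
   * by their per-region backoff delay and the delayed groups are wrapped in DelayingRunners.
   */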
  private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
      MultiAction multiAction,
      int numAttempt) {
    // No stats to manage, just do the standard action.
    if (asyncProcess.connection.getStatisticsTracker() == null) {
      if (asyncProcess.connection.getConnectionMetrics() != null) {
        asyncProcess.connection.getConnectionMetrics().incrNormalRunners();
      }
      asyncProcess.incTaskCounters(multiAction.getRegions(), server);
      SingleServerRequestRunnable runnable = addSingleServerRequestHeapSize(server,
          new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress));
      return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", runnable));
    }

    // Group the actions by the amount of delay.
    Map<Long, DelayingRunner> actions = new HashMap<>(multiAction.size());

    // Split up the actions.
    for (Map.Entry<byte[], List<Action>> e : multiAction.actions.entrySet()) {
      Long backoff = getBackoff(server, e.getKey());
      DelayingRunner runner = actions.get(backoff);
      if (runner == null) {
        actions.put(backoff, new DelayingRunner(backoff, e));
      } else {
        runner.add(e);
      }
    }

    List<Runnable> toReturn = new ArrayList<>(actions.size());
    for (DelayingRunner runner : actions.values()) {
      asyncProcess.incTaskCounters(runner.getActions().getRegions(), server);
      String traceText = "AsyncProcess.sendMultiAction";
      Runnable runnable = addSingleServerRequestHeapSize(server,
          new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, callsInProgress));
      // Use a delay runner only if we need to sleep for some time.
      if (runner.getSleepTime() > 0) {
        runner.setRunner(runnable);
        traceText = "AsyncProcess.clientBackoff.sendMultiAction";
        runnable = runner;
        if (asyncProcess.connection.getConnectionMetrics() != null) {
          asyncProcess.connection.getConnectionMetrics().incrDelayRunners();
          asyncProcess.connection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime());
        }
      } else {
        if (asyncProcess.connection.getConnectionMetrics() != null) {
          asyncProcess.connection.getConnectionMetrics().incrNormalRunners();
        }
      }
      runnable = Trace.wrap(traceText, runnable);
      toReturn.add(runnable);
    }

    return toReturn;
  }
  /**
   * @param server server location where the target region is hosted
   * @param regionName name of the region to which we are going to write some data
   * @return the amount of time the client should wait until it submits a request to the
   *         specified server and region
   */
  private Long getBackoff(ServerName server, byte[] regionName) {
    ServerStatisticTracker tracker = asyncProcess.connection.getStatisticsTracker();
    ServerStatistics stats = tracker.getStats(server);
    return asyncProcess.connection.getBackoffPolicy()
        .getBackoffTime(server, regionName, stats);
  }
  /**
   * Starts waiting to issue replica calls on a different thread; or issues them immediately.
   */
  private void startWaitingForReplicaCalls(List<Action> actionsForReplicaThread) {
    long startTime = EnvironmentEdgeManager.currentTime();
    ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable(
        actionsForReplicaThread, startTime);
    if (asyncProcess.primaryCallTimeoutMicroseconds == 0) {
      // Start replica calls immediately.
      replicaRunnable.run();
    } else {
      // Start the thread that may kick off replica gets.
      // TODO: we could do it on the same thread, but it's a user thread, might be a bad idea.
      try {
        pool.submit(replicaRunnable);
      } catch (RejectedExecutionException ree) {
        LOG.warn("#" + asyncProcess.id + ", replica task was rejected by the pool - no replica calls", ree);
      }
    }
  }
  /**
   * Checks whether we can retry and acts accordingly: logs and sets the error status.
   *
   * @param originalIndex the position in the list sent
   * @param row the row
   * @param canRetry if not YES, we won't retry whatever the settings.
   * @param throwable the throwable, if any (can be null)
   * @param server the location, if any (can be null)
   * @return the retry decision: YES if the action can be retried, otherwise the reason it cannot.
   */
  Retry manageError(int originalIndex, Row row, Retry canRetry,
      Throwable throwable, ServerName server) {
    if (canRetry == Retry.YES
        && throwable != null && throwable instanceof DoNotRetryIOException) {
      canRetry = Retry.NO_NOT_RETRIABLE;
    }

    if (canRetry != Retry.YES) {
      // Batch.Callback<Res> was not called on failure in 0.94. We keep this.
      setError(originalIndex, row, throwable, server);
    } else if (isActionComplete(originalIndex, row)) {
      canRetry = Retry.NO_OTHER_SUCCEEDED;
    }
    return canRetry;
  }
  /**
   * Resubmits all the actions from this multiaction after a failure.
   *
   * @param rsActions the actions still to do from the initial list
   * @param server the destination
   * @param numAttempt the number of attempts so far
   * @param t the throwable (if any) that caused the resubmit
   */
  private void receiveGlobalFailure(
      MultiAction rsActions, ServerName server, int numAttempt, Throwable t) {
    errorsByServer.reportServerError(server);
    Retry canRetry = errorsByServer.canTryMore(numAttempt)
        ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;

    if (tableName == null && ClientExceptionsUtil.isMetaClearingException(t)) {
      // tableName is null when we made a cross-table RPC call.
      asyncProcess.connection.clearCaches(server);
    }
    int failed = 0, stopped = 0;
    List<Action> toReplay = new ArrayList<>();
    for (Map.Entry<byte[], List<Action>> e : rsActions.actions.entrySet()) {
      byte[] regionName = e.getKey();
      byte[] row = e.getValue().iterator().next().getAction().getRow();
      // Do not use the exception for updating cache because it might be coming from
      // any of the regions in the MultiAction.
      try {
        if (tableName != null) {
          asyncProcess.connection.updateCachedLocations(tableName, regionName, row,
              ClientExceptionsUtil.isMetaClearingException(t) ? null : t, server);
        }
      } catch (Throwable ex) {
        // That should never happen, but if it did, we want to make sure
        // we still process errors
        LOG.error("Couldn't update cached region locations: " + ex);
      }
      for (Action action : e.getValue()) {
        Retry retry = manageError(
            action.getOriginalIndex(), action.getAction(), canRetry, t, server);
        if (retry == Retry.YES) {
          toReplay.add(action);
        } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
          ++stopped;
        } else {
          ++failed;
        }
      }
    }

    if (toReplay.isEmpty()) {
      logNoResubmit(server, numAttempt, rsActions.size(), t, failed, stopped);
    } else {
      resubmit(server, toReplay, numAttempt, rsActions.size(), t);
    }
  }
  /**
   * Logs as much info as possible and, if there is something to replay,
   * submits it again after a back off sleep.
   */
  private void resubmit(ServerName oldServer, List<Action> toReplay,
      int numAttempt, int failureCount, Throwable throwable) {
    // We have something to replay. We're going to sleep a little before.

    // We have a few contradicting needs here:
    //  1) We want to get the new location after having slept, as it may change.
    //  2) We want to take into account the location when calculating the sleep time.
    //  3) If all this is just because the response needed to be chunked, try again FAST.
    // It should be possible to have some heuristics to take the right decision. Short term,
    // we go for one.
    boolean retryImmediately = throwable instanceof RetryImmediatelyException;
    int nextAttemptNumber = retryImmediately ? numAttempt : numAttempt + 1;
    long backOffTime;
    if (retryImmediately) {
      backOffTime = 0;
    } else if (throwable instanceof CallQueueTooBigException) {
      // Give a special check on CQTBE, see HBASE-17114.
      backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pauseForCQTBE);
    } else {
      backOffTime = errorsByServer.calculateBackoffTime(oldServer, asyncProcess.pause);
    }
    if (numAttempt > asyncProcess.startLogErrorsCnt) {
      // We use this value to have some logs when we have multiple failures, but not too many
      // logs, as errors are to be expected when a region moves, splits and so on.
      LOG.info(createLog(numAttempt, failureCount, toReplay.size(),
          oldServer, throwable, backOffTime, true, null, -1, -1));
    }

    try {
      if (backOffTime > 0) {
        Thread.sleep(backOffTime);
      }
    } catch (InterruptedException e) {
      LOG.warn("#" + asyncProcess.id + ", not sent: " + toReplay.size() + " operations, " + oldServer, e);
      Thread.currentThread().interrupt();
      return;
    }

    groupAndSendMultiAction(toReplay, nextAttemptNumber);
  }
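  // Logs the final outcome of an attempt that will not be resubmitted. Final failures are
  // logged at WARN, everything else at INFO.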
  private void logNoResubmit(ServerName oldServer, int numAttempt,
      int failureCount, Throwable throwable, int failed, int stopped) {
    if (failureCount != 0 || numAttempt > asyncProcess.startLogErrorsCnt + 1) {
      String timeStr = new Date(errorsByServer.getStartTrackingTime()).toString();
      String logMessage = createLog(numAttempt, failureCount, 0, oldServer,
          throwable, -1, false, timeStr, failed, stopped);
      if (failed != 0) {
        // Only log final failures as warning.
        LOG.warn(logMessage);
      } else {
        LOG.info(logMessage);
      }
    }
  }
  /**
   * Called when we receive the result of a server query.
   *
   * @param multiAction - the multiAction we sent
   * @param server - the location. It's used as a server name.
   * @param responses - the response, if any
   * @param numAttempt - the attempt
   */
  private void receiveMultiAction(MultiAction multiAction,
      ServerName server, MultiResponse responses, int numAttempt) {
    assert responses != null;

    // Success or partial success.
    // Analyze detailed results. We can still have individual failures to redo.
    // Two specific throwables are managed:
    //  - DoNotRetryIOException: we continue to retry for other actions
    //  - RegionMovedException: we update the cache with the new region location

    List<Action> toReplay = new ArrayList<>();
    Throwable throwable = null;
    int failureCount = 0;
    boolean canRetry = true;

    Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
    updateStats(server, results);

    int failed = 0, stopped = 0;
    // Go by original action.
    for (Map.Entry<byte[], List<Action>> regionEntry : multiAction.actions.entrySet()) {
      byte[] regionName = regionEntry.getKey();
      Map<Integer, Object> regionResults = results.get(regionName) == null
          ? null : results.get(regionName).result;
      if (regionResults == null) {
        if (!responses.getExceptions().containsKey(regionName)) {
          LOG.error("Server sent us neither results nor exceptions for "
              + Bytes.toStringBinary(regionName));
          responses.getExceptions().put(regionName, new RuntimeException("Invalid response"));
        }
        continue;
      }
      boolean regionFailureRegistered = false;
      for (Action sentAction : regionEntry.getValue()) {
        Object result = regionResults.get(sentAction.getOriginalIndex());
        // Failure: retry if it makes sense, else update the error lists.
        if (result == null || result instanceof Throwable) {
          Row row = sentAction.getAction();
          throwable = ClientExceptionsUtil.findException(result);
          // Register corresponding failures once per server/once per region.
          if (!regionFailureRegistered) {
            regionFailureRegistered = true;
            try {
              asyncProcess.connection.updateCachedLocations(
                  tableName, regionName, row.getRow(), result, server);
            } catch (Throwable ex) {
              // That should never happen, but if it did, we want to make sure
              // we still process errors
              LOG.error("Couldn't update cached region locations: " + ex);
            }
          }
          if (failureCount == 0) {
            errorsByServer.reportServerError(server);
            // We determine canRetry only once for all calls, after reporting server failure.
            canRetry = errorsByServer.canTryMore(numAttempt);
          }
          ++failureCount;
          Retry retry = manageError(sentAction.getOriginalIndex(), row,
              canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, (Throwable) result, server);
          if (retry == Retry.YES) {
            toReplay.add(sentAction);
          } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
            ++stopped;
          } else {
            ++failed;
          }
        } else {
          if (callback != null) {
            try {
              //noinspection unchecked
              // TODO: would callback expect a replica region name if it gets one?
              this.callback.update(regionName, sentAction.getAction().getRow(), (CResult) result);
            } catch (Throwable t) {
              LOG.error("User callback threw an exception for "
                  + Bytes.toStringBinary(regionName) + ", ignoring", t);
            }
          }
          setResult(sentAction, result);
        }
      }
    }
    // The failures global to a region. We will use them for the multiAction we sent previously
    // to find the actions to replay.
    for (Map.Entry<byte[], Throwable> throwableEntry : responses.getExceptions().entrySet()) {
      throwable = throwableEntry.getValue();
      byte[] region = throwableEntry.getKey();
      List<Action> actions = multiAction.actions.get(region);
      if (actions == null || actions.isEmpty()) {
        throw new IllegalStateException("Wrong response for the region: " +
            HRegionInfo.encodeRegionName(region));
      }

      if (failureCount == 0) {
        errorsByServer.reportServerError(server);
        canRetry = errorsByServer.canTryMore(numAttempt);
      }
      if (null == tableName && ClientExceptionsUtil.isMetaClearingException(throwable)) {
        // For multi-actions, we don't have a table name, but we want to make sure to clear the
        // cache in case there were location-related exceptions. We don't want to clear the
        // cache for every possible exception that comes through, however.
        asyncProcess.connection.clearCaches(server);
      } else {
        try {
          asyncProcess.connection.updateCachedLocations(
              tableName, region, actions.get(0).getAction().getRow(), throwable, server);
        } catch (Throwable ex) {
          // That should never happen, but if it did, we want to make sure
          // we still process errors
          LOG.error("Couldn't update cached region locations: " + ex);
        }
      }
      failureCount += actions.size();

      for (Action action : actions) {
        Row row = action.getAction();
        Retry retry = manageError(action.getOriginalIndex(), row,
            canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server);
        if (retry == Retry.YES) {
          toReplay.add(action);
        } else if (retry == Retry.NO_OTHER_SUCCEEDED) {
          ++stopped;
        } else {
          ++failed;
        }
      }
    }
    if (toReplay.isEmpty()) {
      logNoResubmit(server, numAttempt, failureCount, throwable, failed, stopped);
    } else {
      resubmit(server, toReplay, numAttempt, failureCount, throwable);
    }
  }
  @VisibleForTesting
  protected void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) {
    boolean metrics = asyncProcess.connection.getConnectionMetrics() != null;
    boolean stats = asyncProcess.connection.getStatisticsTracker() != null;
    if (!stats && !metrics) {
      return;
    }
    for (Map.Entry<byte[], MultiResponse.RegionResult> regionStats : results.entrySet()) {
      byte[] regionName = regionStats.getKey();
      ClientProtos.RegionLoadStats stat = regionStats.getValue().getStat();
      if (stat == null) {
        LOG.error("No ClientProtos.RegionLoadStats found for server=" + server
            + ", region=" + Bytes.toStringBinary(regionName));
        continue;
      }
      RegionLoadStats regionLoadstats = ProtobufUtil.createRegionLoadStats(stat);
      ResultStatsUtil.updateStats(asyncProcess.connection.getStatisticsTracker(), server,
          regionName, regionLoadstats);
      ResultStatsUtil.updateStats(asyncProcess.connection.getConnectionMetrics(),
          server, regionName, regionLoadstats);
    }
  }
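  // Builds the single-line attempt summary used by the resubmit and logNoResubmit log messages.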
  private String createLog(int numAttempt, int failureCount, int replaySize, ServerName sn,
      Throwable error, long backOffTime, boolean willRetry, String startTime,
      int failed, int stopped) {
    StringBuilder sb = new StringBuilder();
    sb.append("#").append(asyncProcess.id).append(", table=").append(tableName).append(", ")
        .append("attempt=").append(numAttempt)
        .append("/").append(asyncProcess.numTries).append(" ");

    if (failureCount > 0 || error != null) {
      sb.append("failed=").append(failureCount).append("ops").append(", last exception: ")
          .append(error == null ? "null" : error);
    } else {
      sb.append("succeeded");
    }

    sb.append(" on ").append(sn).append(", tracking started ").append(startTime);

    if (willRetry) {
      sb.append(", retrying after=").append(backOffTime).append("ms")
          .append(", replay=").append(replaySize).append("ops");
    } else if (failureCount > 0) {
      if (stopped > 0) {
        sb.append("; not retrying ").append(stopped).append(" due to success from other replica");
      }
      if (failed > 0) {
        sb.append("; not retrying ").append(failed).append(" - final failure");
      }
    }

    return sb.toString();
  }
  /**
   * Sets the non-error result from a particular action.
   * @param action Action (request) that the server responded to.
   * @param result The result.
   */
  private void setResult(Action action, Object result) {
    if (result == null) {
      throw new RuntimeException("Result cannot be null");
    }
    ReplicaResultState state = null;
    boolean isStale = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
    int index = action.getOriginalIndex();
    if (results == null) {
      decActionCounter(index);
      return; // Simple case, no replica requests.
    }
    state = trySetResultSimple(index, action.getAction(), false, result, null, isStale);
    if (state == null) {
      return; // Simple case, no replica requests.
    }
    // At this point we know that state is set to the replica tracking class.
    // It could be that someone else is also looking at it; however, we know there can
    // only be one state object, and only one thread can set callCount to 0. Other threads
    // will either see the state with callCount 0 after locking it, or will not see the state
    // at all, because we will have replaced it with the result.
    synchronized (state) {
      if (state.callCount == 0) {
        return; // someone already set the result
      }
      state.callCount = 0;
    }
    synchronized (replicaResultLock) {
      if (results[index] != state) {
        throw new AssertionError("We set the callCount but someone else replaced the result");
      }
      results[index] = result;
    }

    decActionCounter(index);
  }
  /**
   * Sets the error from a particular action.
   * @param index Original action index.
   * @param row Original request.
   * @param throwable The resulting error.
   * @param server The source server.
   */
  private void setError(int index, Row row, Throwable throwable, ServerName server) {
    ReplicaResultState state = null;
    if (results == null) {
      // Note that we currently cannot have replica requests with null results. So it shouldn't
      // happen that multiple replica calls will call dAC for same actions with results == null.
      // Only one call per action should be present in this case.
      errors.add(throwable, row, server);
      decActionCounter(index);
      return; // Simple case, no replica requests.
    }
    state = trySetResultSimple(index, row, true, throwable, server, false);
    if (state == null) {
      return; // Simple case, no replica requests.
    }
    BatchErrors target = null; // Error will be added to final errors, or temp replica errors.
    boolean isActionDone = false;
    synchronized (state) {
      switch (state.callCount) {
        case 0: return; // someone already set the result
        case 1: { // All calls failed, we are the last error.
          target = errors;
          isActionDone = true;
          break;
        }
        default: {
          assert state.callCount > 1;
          if (state.replicaErrors == null) {
            state.replicaErrors = new BatchErrors();
          }
          target = state.replicaErrors;
          break;
        }
      }
      --state.callCount;
    }
    target.add(throwable, row, server);
    if (isActionDone) {
      if (state.replicaErrors != null) { // last call, no need to lock
        errors.merge(state.replicaErrors);
      }
      // See setResult for explanations.
      synchronized (replicaResultLock) {
        if (results[index] != state) {
          throw new AssertionError("We set the callCount but someone else replaced the result");
        }
        results[index] = throwable;
      }
      decActionCounter(index);
    }
  }
  /**
   * Checks if the action is complete; used on error to prevent needless retries.
   * Does not synchronize, assuming element index/field accesses are atomic.
   * This is an opportunistic optimization check, doesn't have to be strict.
   * @param index Original action index.
   * @param row Original request.
   */
  private boolean isActionComplete(int index, Row row) {
    if (!AsyncProcess.isReplicaGet(row)) return false;
    Object resObj = results[index];
    return (resObj != null) && (!(resObj instanceof ReplicaResultState)
        || ((ReplicaResultState) resObj).callCount == 0);
  }
  /**
   * Tries to set the result or error for a particular action as if there were no replica calls.
   * @return null if successful; replica state if there were in fact replica calls.
   */
  private ReplicaResultState trySetResultSimple(int index, Row row, boolean isError,
      Object result, ServerName server, boolean isFromReplica) {
    Object resObj = null;
    if (!AsyncProcess.isReplicaGet(row)) {
      if (isFromReplica) {
        throw new AssertionError("Unexpected stale result for " + row);
      }
      results[index] = result;
    } else {
      synchronized (replicaResultLock) {
        resObj = results[index];
        if (resObj == null) {
          if (isFromReplica) {
            throw new AssertionError("Unexpected stale result for " + row);
          }
          results[index] = result;
        }
      }
    }

    ReplicaResultState rrs =
        (resObj instanceof ReplicaResultState) ? (ReplicaResultState) resObj : null;
    if (rrs == null && isError) {
      // The resObj is not replica state (null or already set).
      errors.add((Throwable) result, row, server);
    }

    if (resObj == null) {
      // resObj is null - no replica calls were made.
      decActionCounter(index);
      return null;
    }
    return rrs;
  }
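  // Decrements the count of outstanding actions and, when it reaches zero, wakes up all
  // threads blocked in waitUntilDone().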
  private void decActionCounter(int index) {
    long actionsRemaining = actionsInProgress.decrementAndGet();
    if (actionsRemaining < 0) {
      String error = buildDetailedErrorMsg("Incorrect actions in progress", index);
      throw new AssertionError(error);
    } else if (actionsRemaining == 0) {
      synchronized (actionsInProgress) {
        actionsInProgress.notifyAll();
      }
    }
  }
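  // Builds the diagnostic message for the assertion above: the triggering index, the current
  // counter value, the replica-get indices, and the raw contents of the results array.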
  private String buildDetailedErrorMsg(String string, int index) {
    StringBuilder error = new StringBuilder(128);
    error.append(string).append("; called for ").append(index).append(", actionsInProgress ")
        .append(actionsInProgress.get()).append("; replica gets: ");
    if (replicaGetIndices != null) {
      for (int i = 0; i < replicaGetIndices.length; ++i) {
        error.append(replicaGetIndices[i]).append(", ");
      }
    } else {
      error.append(hasAnyReplicaGets ? "all" : "none");
    }
    error.append("; results ");
    if (results != null) {
      for (int i = 0; i < results.length; ++i) {
        Object o = results[i];
        error.append(((o == null) ? "null" : o.toString())).append(", ");
      }
    }
    return error.toString();
  }
  @Override
  public void waitUntilDone() throws InterruptedIOException {
    try {
      waitUntilDone(Long.MAX_VALUE);
    } catch (InterruptedException iex) {
      throw new InterruptedIOException(iex.getMessage());
    } finally {
      if (callsInProgress != null) {
        for (CancellableRegionServerCallable clb : callsInProgress) {
          clb.cancel();
        }
      }
    }
  }
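  /**
   * Waits for the outstanding actions to drop to zero.
   * @param cutoff absolute deadline in microseconds (epoch millis * 1000), or Long.MAX_VALUE
   *               to wait with no deadline.
   * @return true if all actions completed, false if the cutoff was reached first.
   */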
  private boolean waitUntilDone(long cutoff) throws InterruptedException {
    boolean hasWait = cutoff != Long.MAX_VALUE;
    long lastLog = EnvironmentEdgeManager.currentTime();
    long currentInProgress;
    while (0 != (currentInProgress = actionsInProgress.get())) {
      long now = EnvironmentEdgeManager.currentTime();
      if (hasWait && (now * 1000L) > cutoff) {
        return false;
      }
      if (!hasWait) { // Only log if wait is infinite.
        if (now > lastLog + 10000) {
          lastLog = now;
          LOG.info("#" + asyncProcess.id + ", waiting for " + currentInProgress
              + " actions to finish on table: " + tableName);
        }
      }
      synchronized (actionsInProgress) {
        if (actionsInProgress.get() == 0) break;
        if (!hasWait) {
          actionsInProgress.wait(10);
        } else {
          long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
          TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond);
        }
      }
    }
    return true;
  }
  @Override
  public boolean hasError() {
    return errors.hasErrors();
  }

  @Override
  public List<? extends Row> getFailedOperations() {
    return errors.actions;
  }

  @Override
  public RetriesExhaustedWithDetailsException getErrors() {
    return errors.makeException(asyncProcess.logBatchErrorDetails);
  }

  @Override
  public Object[] getResults() throws InterruptedIOException {
    waitUntilDone();
    return results;
  }
  /**
   * Creates the server error tracker to use inside process.
   * Currently, to preserve the main assumption about current retries, and to work well with
   * the retry-limit-based calculation, the calculation is local per Process object.
   * We may benefit from connection-wide tracking of server errors.
   * @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection
   */
  private ConnectionImplementation.ServerErrorTracker createServerErrorTracker() {
    return new ConnectionImplementation.ServerErrorTracker(
        asyncProcess.serverTrackerTimeout, asyncProcess.numTries);
  }
  /**
   * Create a callable. Isolated to be easily overridden in the tests.
   */
  private MultiServerCallable createCallable(final ServerName server, TableName tableName,
      final MultiAction multi) {
    return new MultiServerCallable(asyncProcess.connection, tableName, server,
        multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker);
  }
}