HBASE-26921 Rewrite the counting cells part in TestMultiVersions (#4316)
[hbase.git] / hbase-client / src / main / java / org / apache / hadoop / hbase / client / AsyncScanSingleRegionRpcRetryingCaller.java
blob48e038ecd2e7d63e7fc170063d720d99e0ea57d3
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
21 import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
22 import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
23 import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
24 import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
25 import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan;
26 import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
27 import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
28 import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
29 import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;
30 import io.opentelemetry.context.Context;
31 import io.opentelemetry.context.Scope;
32 import java.io.IOException;
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.Optional;
36 import java.util.concurrent.CompletableFuture;
37 import java.util.concurrent.TimeUnit;
38 import org.apache.hadoop.hbase.CallQueueTooBigException;
39 import org.apache.hadoop.hbase.DoNotRetryIOException;
40 import org.apache.hadoop.hbase.HConstants;
41 import org.apache.hadoop.hbase.HRegionLocation;
42 import org.apache.hadoop.hbase.NotServingRegionException;
43 import org.apache.hadoop.hbase.UnknownScannerException;
44 import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer.ScanResumer;
45 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
46 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
47 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
48 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
49 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
50 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
51 import org.apache.yetus.audience.InterfaceAudience;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
55 import org.apache.hbase.thirdparty.io.netty.util.Timeout;
56 import org.apache.hbase.thirdparty.io.netty.util.Timer;
57 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
58 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
59 import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
60 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
61 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
62 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
63 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
65 /**
66 * Retry caller for scanning a region.
67 * <p>
68 * We will modify the {@link Scan} object passed in directly. The upper layer should store the
69 * reference of this object and use it to open new single region scanners.
71 @InterfaceAudience.Private
72 class AsyncScanSingleRegionRpcRetryingCaller {
// Logger shared by all instances of this caller.
74 private static final Logger LOG =
75 LoggerFactory.getLogger(AsyncScanSingleRegionRpcRetryingCaller.class);
// Timer used to schedule delayed retries of a scan call and the periodic lease renewal tasks.
77 private final Timer retryTimer;
// The scan being executed. It is modified in place: the start row and the limit are updated as
// the scan makes progress.
79 private final Scan scan;
// Metrics holder handed to the ConnectionUtils metrics helpers.
81 private final ScanMetrics scanMetrics;
// Id of the scanner this caller works on, sent with every next/renew/close request.
83 private final long scannerId;
// Cache which turns the raw results of a response into the results handed to the consumer and
// tracks the number of complete rows. It is cleared whenever we give up on the scanner.
85 private final ScanResultCache resultCache;
// The consumer which receives the results (onNext) and the heartbeats (onHeartbeat).
87 private final AdvancedScanResultConsumer consumer;
// The rpc stub used to send all scan requests.
89 private final ClientService.Interface stub;
// Location of the region being scanned, used for log messages and for the region boundaries.
91 private final HRegionLocation loc;
// Flag passed to the rpc call/retry metrics helpers.
93 private final boolean regionServerRemote;
// Priority set on the rpc controller for the next and renew lease calls.
95 private final int priority;
// Scanner lease period in nanoseconds; a suspended scan renews its lease every half period.
97 private final long scannerLeaseTimeoutPeriodNs;
// Base pause in nanoseconds used to compute the delay before a retry.
99 private final long pauseNs;
// Base pause in nanoseconds used instead of pauseNs when the error is a
// CallQueueTooBigException.
101 private final long pauseForCQTBENs;
// Once tries reaches this value the scan is failed with a RetriesExhaustedException.
103 private final int maxAttempts;
// Time budget in nanoseconds for one next call including its retries. A value which is not
// greater than zero disables the limit.
105 private final long scanTimeoutNs;
// Rpc timeout in nanoseconds used for the close scanner and the renew lease requests.
107 private final long rpcTimeoutNs;
// Failures are only logged once tries is greater than this value.
109 private final int startLogErrorsCnt;
// Action to run when the current region has no more results. The forward or the reversed
// variant is chosen in the constructor.
111 private final Runnable completeWhenNoMoreResultsInRegion;
// The future returned by start. Completed with true if we should continue, with false if there
// are no more results, or exceptionally if we give up.
113 private final CompletableFuture<Boolean> future;
// The single rpc controller, which is reset and reused for every request.
115 private final HBaseRpcController controller;
// The row of the last result passed to the consumer, used as the new start row of the scan if
// we have to give up this scanner because of an error. Null if no result has been passed yet.
117 private byte[] nextStartRowWhenError;
// Whether nextStartRowWhenError is inclusive, i.e. the last result may have more cells in row.
119 private boolean includeNextStartRowWhenError;
// System.nanoTime() taken when the current next call was started.
121 private long nextCallStartNs;
// Attempt number of the current next call, reset to 1 for every new next call.
123 private int tries;
// The errors collected for the current next call, reported when retries are exhausted.
125 private final List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions;
// Call sequence sent with scan requests, increased before every next call and lease renewal.
127 private long nextCallSeq = -1L;
129 private enum ScanControllerState {
130 INITIALIZED, SUSPENDED, TERMINATED, DESTROYED
133 // Since suspend and terminate should only be called within onNext or onHeartbeat(see the comments
134 // of RawScanResultConsumer.onNext and onHeartbeat), we need to add some check to prevent invalid
135 // usage. We use two things to prevent invalid usage:
136 // 1. Record the thread that construct the ScanControllerImpl instance. We will throw an
137 // IllegalStateException if the caller thread is not this thread.
138 // 2. The ControllerState. The initial state is INITIALIZED, if you call suspend, the state will
139 // be transformed to SUSPENDED, and if you call terminate, the state will be transformed to
140 // TERMINATED. And when we are back from onNext or onHeartbeat in the onComplete method, we will
141 // call destroy to get the current state and set the state to DESTROYED. And when user calls
142 // suspend or terminate, we will check if the current state is INITIALIZED, if not we will throw
143 // an IllegalStateException. Notice that the DESTROYED state is necessary as you may not call
144 // suspend or terminate so the state will still be INITIALIZED when back from onNext or
145 // onHeartbeat. We need another state to replace the INITIALIZED state to prevent the controller
146 // to be used in the future.
147 // Notice that, the public methods of this class is supposed to be called by upper layer only, and
148 // package private methods can only be called within the implementation of
149 // AsyncScanSingleRegionRpcRetryingCaller.
150 private final class ScanControllerImpl implements AdvancedScanResultConsumer.ScanController {
152 // Make sure the methods are only called in this thread.
153 private final Thread callerThread;
155 private final Optional<Cursor> cursor;
157 // INITIALIZED -> SUSPENDED -> DESTROYED
158 // INITIALIZED -> TERMINATED -> DESTROYED
159 // INITIALIZED -> DESTROYED
160 // If the state is incorrect we will throw IllegalStateException.
161 private ScanControllerState state = ScanControllerState.INITIALIZED;
163 private ScanResumerImpl resumer;
165 public ScanControllerImpl(Optional<Cursor> cursor) {
166 this.callerThread = Thread.currentThread();
167 this.cursor = cursor;
170 private void preCheck() {
171 Preconditions.checkState(Thread.currentThread() == callerThread,
172 "The current thread is %s, expected thread is %s, " +
173 "you should not call this method outside onNext or onHeartbeat",
174 Thread.currentThread(), callerThread);
175 Preconditions.checkState(state.equals(ScanControllerState.INITIALIZED),
176 "Invalid Stopper state %s", state);
179 @Override
180 public ScanResumer suspend() {
181 preCheck();
182 state = ScanControllerState.SUSPENDED;
183 ScanResumerImpl resumer = new ScanResumerImpl();
184 this.resumer = resumer;
185 return resumer;
188 @Override
189 public void terminate() {
190 preCheck();
191 state = ScanControllerState.TERMINATED;
194 // return the current state, and set the state to DESTROYED.
195 ScanControllerState destroy() {
196 ScanControllerState state = this.state;
197 this.state = ScanControllerState.DESTROYED;
198 return state;
201 @Override
202 public Optional<Cursor> cursor() {
203 return cursor;
207 private enum ScanResumerState {
208 INITIALIZED, SUSPENDED, RESUMED
211 // The resume method is allowed to be called in another thread so here we also use the
212 // ResumerState to prevent race. The initial state is INITIALIZED, and in most cases, when back
213 // from onNext or onHeartbeat, we will call the prepare method to change the state to SUSPENDED,
214 // and when user calls resume method, we will change the state to RESUMED. But the resume method
215 // could be called in other thread, and in fact, user could just do this:
216 // controller.suspend().resume()
217 // This is strange but valid. This means the scan could be resumed before we call the prepare
218 // method to do the actual suspend work. So in the resume method, we will check if the state is
219 // INTIALIZED, if it is, then we will just set the state to RESUMED and return. And in prepare
220 // method, if the state is RESUMED already, we will just return an let the scan go on.
221 // Notice that, the public methods of this class is supposed to be called by upper layer only, and
222 // package private methods can only be called within the implementation of
223 // AsyncScanSingleRegionRpcRetryingCaller.
224 private final class ScanResumerImpl implements AdvancedScanResultConsumer.ScanResumer {
226 // INITIALIZED -> SUSPENDED -> RESUMED
227 // INITIALIZED -> RESUMED
228 private ScanResumerState state = ScanResumerState.INITIALIZED;
230 private ScanResponse resp;
232 private int numberOfCompleteRows;
234 // If the scan is suspended successfully, we need to do lease renewal to prevent it being closed
235 // by RS due to lease expire. It is a one-time timer task so we need to schedule a new task
236 // every time when the previous task is finished. There could also be race as the renewal is
237 // executed in the timer thread, so we also need to check the state before lease renewal. If the
238 // state is RESUMED already, we will give up lease renewal and also not schedule the next lease
239 // renewal task.
240 private Timeout leaseRenewer;
242 @Override
243 public void resume() {
244 // just used to fix findbugs warnings. In fact, if resume is called before prepare, then we
245 // just return at the first if condition without loading the resp and numValidResuls field. If
246 // resume is called after suspend, then it is also safe to just reference resp and
247 // numValidResults after the synchronized block as no one will change it anymore.
248 ScanResponse localResp;
249 int localNumberOfCompleteRows;
250 synchronized (this) {
251 if (state == ScanResumerState.INITIALIZED) {
252 // user calls this method before we call prepare, so just set the state to
253 // RESUMED, the implementation will just go on.
254 state = ScanResumerState.RESUMED;
255 return;
257 if (state == ScanResumerState.RESUMED) {
258 // already resumed, give up.
259 return;
261 state = ScanResumerState.RESUMED;
262 if (leaseRenewer != null) {
263 leaseRenewer.cancel();
265 localResp = this.resp;
266 localNumberOfCompleteRows = this.numberOfCompleteRows;
268 completeOrNext(localResp, localNumberOfCompleteRows);
271 private void scheduleRenewLeaseTask() {
272 leaseRenewer = retryTimer.newTimeout(t -> tryRenewLease(), scannerLeaseTimeoutPeriodNs / 2,
273 TimeUnit.NANOSECONDS);
276 private synchronized void tryRenewLease() {
277 // the scan has already been resumed, give up
278 if (state == ScanResumerState.RESUMED) {
279 return;
281 renewLease();
282 // schedule the next renew lease task again as this is a one-time task.
283 scheduleRenewLeaseTask();
286 // return false if the scan has already been resumed. See the comment above for ScanResumerImpl
287 // for more details.
288 synchronized boolean prepare(ScanResponse resp, int numberOfCompleteRows) {
289 if (state == ScanResumerState.RESUMED) {
290 // user calls resume before we actually suspend the scan, just continue;
291 return false;
293 state = ScanResumerState.SUSPENDED;
294 this.resp = resp;
295 this.numberOfCompleteRows = numberOfCompleteRows;
296 // if there are no more results in region then the scanner at RS side will be closed
297 // automatically so we do not need to renew lease.
298 if (resp.getMoreResultsInRegion()) {
299 // schedule renew lease task
300 scheduleRenewLeaseTask();
302 return true;
/**
 * Create a caller for scanning one region with an already opened scanner.
 * <p>
 * Besides storing the arguments, this picks the forward or the reversed routine which is run when
 * the region has no more results, creates the future returned by {@code start} and creates the
 * rpc controller from the controller factory of the given connection.
 */
public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
  Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache,
  AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc,
  boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs,
  long pauseForCQTBENs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
  int startLogErrorsCnt) {
  // what to scan and where to deliver the results
  this.scan = scan;
  this.scanMetrics = scanMetrics;
  this.scannerId = scannerId;
  this.resultCache = resultCache;
  this.consumer = consumer;

  // where to send the requests
  this.stub = stub;
  this.loc = loc;
  this.regionServerRemote = isRegionServerRemote;
  this.priority = priority;

  // timing and retry settings
  this.retryTimer = retryTimer;
  this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
  this.pauseNs = pauseNs;
  this.pauseForCQTBENs = pauseForCQTBENs;
  this.maxAttempts = maxAttempts;
  this.scanTimeoutNs = scanTimeoutNs;
  this.rpcTimeoutNs = rpcTimeoutNs;
  this.startLogErrorsCnt = startLogErrorsCnt;

  // the direction of the scan decides how we move on to the next region
  this.completeWhenNoMoreResultsInRegion = scan.isReversed()
    ? this::completeReversedWhenNoMoreResultsInRegion
    : this::completeWhenNoMoreResultsInRegion;

  this.future = new CompletableFuture<>();
  this.exceptions = new ArrayList<>();
  this.controller = conn.rpcControllerFactory.newController();
  this.controller.setPriority(priority);
}
340 private long elapsedMs() {
341 return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs);
344 private long remainingTimeNs() {
345 return scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
348 private void closeScanner() {
349 incRPCCallsMetrics(scanMetrics, regionServerRemote);
350 resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS);
351 ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
352 stub.scan(controller, req, resp -> {
353 if (controller.failed()) {
354 LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId +
355 " for " + loc.getRegion().getEncodedName() + " of " +
356 loc.getRegion().getTable() + " failed, ignore, probably already closed",
357 controller.getFailed());
362 private void completeExceptionally(boolean closeScanner) {
363 resultCache.clear();
364 if (closeScanner) {
365 closeScanner();
367 future.completeExceptionally(new RetriesExhaustedException(tries - 1, exceptions));
370 private void completeNoMoreResults() {
371 future.complete(false);
374 private void completeWithNextStartRow(byte[] row, boolean inclusive) {
375 scan.withStartRow(row, inclusive);
376 future.complete(true);
379 private void completeWhenError(boolean closeScanner) {
380 incRPCRetriesMetrics(scanMetrics, closeScanner);
381 resultCache.clear();
382 if (closeScanner) {
383 closeScanner();
385 if (nextStartRowWhenError != null) {
386 scan.withStartRow(nextStartRowWhenError, includeNextStartRowWhenError);
388 future.complete(true);
391 private void onError(Throwable error) {
392 error = translateException(error);
393 if (tries > startLogErrorsCnt) {
394 LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for " +
395 loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable() +
396 " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " +
397 TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs() +
398 " ms",
399 error);
401 boolean scannerClosed =
402 error instanceof UnknownScannerException || error instanceof NotServingRegionException ||
403 error instanceof RegionServerStoppedException || error instanceof ScannerResetException;
404 RetriesExhaustedException.ThrowableWithExtraContext qt =
405 new RetriesExhaustedException.ThrowableWithExtraContext(error,
406 EnvironmentEdgeManager.currentTime(), "");
407 exceptions.add(qt);
408 if (tries >= maxAttempts) {
409 completeExceptionally(!scannerClosed);
410 return;
412 long delayNs;
413 long pauseNsToUse = error instanceof CallQueueTooBigException ? pauseForCQTBENs : pauseNs;
414 if (scanTimeoutNs > 0) {
415 long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
416 if (maxDelayNs <= 0) {
417 completeExceptionally(!scannerClosed);
418 return;
420 delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
421 } else {
422 delayNs = getPauseTime(pauseNsToUse, tries - 1);
424 if (scannerClosed) {
425 completeWhenError(false);
426 return;
428 if (error instanceof OutOfOrderScannerNextException) {
429 completeWhenError(true);
430 return;
432 if (error instanceof DoNotRetryIOException) {
433 completeExceptionally(true);
434 return;
436 tries++;
437 retryTimer.newTimeout(t -> call(), delayNs, TimeUnit.NANOSECONDS);
440 private void updateNextStartRowWhenError(Result result) {
441 nextStartRowWhenError = result.getRow();
442 includeNextStartRowWhenError = result.mayHaveMoreCellsInRow();
445 private void completeWhenNoMoreResultsInRegion() {
446 if (noMoreResultsForScan(scan, loc.getRegion())) {
447 completeNoMoreResults();
448 } else {
449 completeWithNextStartRow(loc.getRegion().getEndKey(), true);
453 private void completeReversedWhenNoMoreResultsInRegion() {
454 if (noMoreResultsForReverseScan(scan, loc.getRegion())) {
455 completeNoMoreResults();
456 } else {
457 completeWithNextStartRow(loc.getRegion().getStartKey(), false);
461 private void completeOrNext(ScanResponse resp, int numberOfCompleteRows) {
462 if (resp.hasMoreResults() && !resp.getMoreResults()) {
463 // RS tells us there is no more data for the whole scan
464 completeNoMoreResults();
465 return;
467 if (scan.getLimit() > 0) {
468 // The RS should have set the moreResults field in ScanResponse to false when we have reached
469 // the limit, so we add an assert here.
470 int newLimit = scan.getLimit() - numberOfCompleteRows;
471 assert newLimit > 0;
472 scan.setLimit(newLimit);
474 // as in 2.0 this value will always be set
475 if (!resp.getMoreResultsInRegion()) {
476 completeWhenNoMoreResultsInRegion.run();
477 return;
479 next();
482 private void onComplete(HBaseRpcController controller, ScanResponse resp) {
483 if (controller.failed()) {
484 onError(controller.getFailed());
485 return;
487 updateServerSideMetrics(scanMetrics, resp);
488 boolean isHeartbeatMessage = resp.hasHeartbeatMessage() && resp.getHeartbeatMessage();
489 Result[] rawResults;
490 Result[] results;
491 int numberOfCompleteRowsBefore = resultCache.numberOfCompleteRows();
492 try {
493 rawResults = ResponseConverter.getResults(controller.cellScanner(), resp);
494 updateResultsMetrics(scanMetrics, rawResults, isHeartbeatMessage);
495 results = resultCache.addAndGet(
496 Optional.ofNullable(rawResults).orElse(ScanResultCache.EMPTY_RESULT_ARRAY),
497 isHeartbeatMessage);
498 } catch (IOException e) {
499 // We can not retry here. The server has responded normally and the call sequence has been
500 // increased so a new scan with the same call sequence will cause an
501 // OutOfOrderScannerNextException. Let the upper layer open a new scanner.
502 LOG.warn("decode scan response failed", e);
503 completeWhenError(true);
504 return;
507 ScanControllerImpl scanController;
508 if (results.length > 0) {
509 scanController = new ScanControllerImpl(
510 resp.hasCursor() ? Optional.of(ProtobufUtil.toCursor(resp.getCursor()))
511 : Optional.empty());
512 updateNextStartRowWhenError(results[results.length - 1]);
513 consumer.onNext(results, scanController);
514 } else {
515 Optional<Cursor> cursor = Optional.empty();
516 if (resp.hasCursor()) {
517 cursor = Optional.of(ProtobufUtil.toCursor(resp.getCursor()));
518 } else if (scan.isNeedCursorResult() && rawResults.length > 0) {
519 // It is size limit exceed and we need to return the last Result's row.
520 // When user setBatch and the scanner is reopened, the server may return Results that
521 // user has seen and the last Result can not be seen because the number is not enough.
522 // So the row keys of results may not be same, we must use the last one.
523 cursor = Optional.of(new Cursor(rawResults[rawResults.length - 1].getRow()));
525 scanController = new ScanControllerImpl(cursor);
526 if (isHeartbeatMessage || cursor.isPresent()) {
527 // only call onHeartbeat if server tells us explicitly this is a heartbeat message, or we
528 // want to pass a cursor to upper layer.
529 consumer.onHeartbeat(scanController);
532 ScanControllerState state = scanController.destroy();
533 if (state == ScanControllerState.TERMINATED) {
534 if (resp.getMoreResultsInRegion()) {
535 // we have more results in region but user request to stop the scan, so we need to close the
536 // scanner explicitly.
537 closeScanner();
539 completeNoMoreResults();
540 return;
542 int numberOfCompleteRows = resultCache.numberOfCompleteRows() - numberOfCompleteRowsBefore;
543 if (state == ScanControllerState.SUSPENDED) {
544 if (scanController.resumer.prepare(resp, numberOfCompleteRows)) {
545 return;
548 completeOrNext(resp, numberOfCompleteRows);
551 private void call() {
552 // As we have a call sequence for scan, it is useless to have a different rpc timeout which is
553 // less than the scan timeout. If the server does not respond in time(usually this will not
554 // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when
555 // resending the next request and the only way to fix this is to close the scanner and open a
556 // new one.
557 long callTimeoutNs;
558 if (scanTimeoutNs > 0) {
559 long remainingNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
560 if (remainingNs <= 0) {
561 completeExceptionally(true);
562 return;
564 callTimeoutNs = remainingNs;
565 } else {
566 callTimeoutNs = 0L;
568 incRPCCallsMetrics(scanMetrics, regionServerRemote);
569 if (tries > 1) {
570 incRPCRetriesMetrics(scanMetrics, regionServerRemote);
572 resetController(controller, callTimeoutNs, priority);
573 ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
574 nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit());
575 final Context context = Context.current();
576 stub.scan(controller, req, resp -> {
577 try (Scope ignored = context.makeCurrent()) {
578 onComplete(controller, resp);
583 private void next() {
584 nextCallSeq++;
585 tries = 1;
586 exceptions.clear();
587 nextCallStartNs = System.nanoTime();
588 call();
591 private void renewLease() {
592 incRPCCallsMetrics(scanMetrics, regionServerRemote);
593 nextCallSeq++;
594 resetController(controller, rpcTimeoutNs, priority);
595 ScanRequest req =
596 RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1);
597 stub.scan(controller, req, resp -> {
602 * Now we will also fetch some cells along with the scanner id when opening a scanner, so we also
603 * need to process the ScanResponse for the open scanner request. The HBaseRpcController for the
604 * open scanner request is also needed because we may have some data in the CellScanner which is
605 * contained in the controller.
606 * @return {@code true} if we should continue, otherwise {@code false}.
608 public CompletableFuture<Boolean> start(HBaseRpcController controller,
609 ScanResponse respWhenOpen) {
610 onComplete(controller, respWhenOpen);
611 return future;