2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.SLEEP_DELTA_NS
;
21 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.getPauseTime
;
22 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.incRPCCallsMetrics
;
23 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.incRPCRetriesMetrics
;
24 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.noMoreResultsForReverseScan
;
25 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.noMoreResultsForScan
;
26 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.resetController
;
27 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.translateException
;
28 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.updateResultsMetrics
;
29 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.updateServerSideMetrics
;
30 import io
.opentelemetry
.context
.Context
;
31 import io
.opentelemetry
.context
.Scope
;
32 import java
.io
.IOException
;
33 import java
.util
.ArrayList
;
34 import java
.util
.List
;
35 import java
.util
.Optional
;
36 import java
.util
.concurrent
.CompletableFuture
;
37 import java
.util
.concurrent
.TimeUnit
;
38 import org
.apache
.hadoop
.hbase
.CallQueueTooBigException
;
39 import org
.apache
.hadoop
.hbase
.DoNotRetryIOException
;
40 import org
.apache
.hadoop
.hbase
.HConstants
;
41 import org
.apache
.hadoop
.hbase
.HRegionLocation
;
42 import org
.apache
.hadoop
.hbase
.NotServingRegionException
;
43 import org
.apache
.hadoop
.hbase
.UnknownScannerException
;
44 import org
.apache
.hadoop
.hbase
.client
.AdvancedScanResultConsumer
.ScanResumer
;
45 import org
.apache
.hadoop
.hbase
.client
.metrics
.ScanMetrics
;
46 import org
.apache
.hadoop
.hbase
.exceptions
.OutOfOrderScannerNextException
;
47 import org
.apache
.hadoop
.hbase
.exceptions
.ScannerResetException
;
48 import org
.apache
.hadoop
.hbase
.ipc
.HBaseRpcController
;
49 import org
.apache
.hadoop
.hbase
.regionserver
.RegionServerStoppedException
;
50 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
51 import org
.apache
.yetus
.audience
.InterfaceAudience
;
52 import org
.slf4j
.Logger
;
53 import org
.slf4j
.LoggerFactory
;
54 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.base
.Preconditions
;
55 import org
.apache
.hbase
.thirdparty
.io
.netty
.util
.Timeout
;
56 import org
.apache
.hbase
.thirdparty
.io
.netty
.util
.Timer
;
57 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ProtobufUtil
;
58 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.RequestConverter
;
59 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ResponseConverter
;
60 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.ClientService
;
61 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.ClientService
.Interface
;
62 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.ScanRequest
;
63 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.ScanResponse
;
66 * Retry caller for scanning a region.
68 * We will modify the {@link Scan} object passed in directly. The upper layer should store the
69 * reference of this object and use it to open new single region scanners.
71 @InterfaceAudience.Private
72 class AsyncScanSingleRegionRpcRetryingCaller
{
74 private static final Logger LOG
=
75 LoggerFactory
.getLogger(AsyncScanSingleRegionRpcRetryingCaller
.class);
77 private final Timer retryTimer
;
79 private final Scan scan
;
81 private final ScanMetrics scanMetrics
;
83 private final long scannerId
;
85 private final ScanResultCache resultCache
;
87 private final AdvancedScanResultConsumer consumer
;
89 private final ClientService
.Interface stub
;
91 private final HRegionLocation loc
;
93 private final boolean regionServerRemote
;
95 private final int priority
;
97 private final long scannerLeaseTimeoutPeriodNs
;
99 private final long pauseNs
;
101 private final long pauseForCQTBENs
;
103 private final int maxAttempts
;
105 private final long scanTimeoutNs
;
107 private final long rpcTimeoutNs
;
109 private final int startLogErrorsCnt
;
111 private final Runnable completeWhenNoMoreResultsInRegion
;
113 private final CompletableFuture
<Boolean
> future
;
115 private final HBaseRpcController controller
;
117 private byte[] nextStartRowWhenError
;
119 private boolean includeNextStartRowWhenError
;
121 private long nextCallStartNs
;
125 private final List
<RetriesExhaustedException
.ThrowableWithExtraContext
> exceptions
;
127 private long nextCallSeq
= -1L;
129 private enum ScanControllerState
{
130 INITIALIZED
, SUSPENDED
, TERMINATED
, DESTROYED
133 // Since suspend and terminate should only be called within onNext or onHeartbeat(see the comments
134 // of RawScanResultConsumer.onNext and onHeartbeat), we need to add some check to prevent invalid
135 // usage. We use two things to prevent invalid usage:
136 // 1. Record the thread that construct the ScanControllerImpl instance. We will throw an
137 // IllegalStateException if the caller thread is not this thread.
138 // 2. The ControllerState. The initial state is INITIALIZED, if you call suspend, the state will
139 // be transformed to SUSPENDED, and if you call terminate, the state will be transformed to
140 // TERMINATED. And when we are back from onNext or onHeartbeat in the onComplete method, we will
141 // call destroy to get the current state and set the state to DESTROYED. And when user calls
142 // suspend or terminate, we will check if the current state is INITIALIZED, if not we will throw
143 // an IllegalStateException. Notice that the DESTROYED state is necessary as you may not call
144 // suspend or terminate so the state will still be INITIALIZED when back from onNext or
145 // onHeartbeat. We need another state to replace the INITIALIZED state to prevent the controller
146 // to be used in the future.
147 // Notice that, the public methods of this class is supposed to be called by upper layer only, and
148 // package private methods can only be called within the implementation of
149 // AsyncScanSingleRegionRpcRetryingCaller.
150 private final class ScanControllerImpl
implements AdvancedScanResultConsumer
.ScanController
{
152 // Make sure the methods are only called in this thread.
153 private final Thread callerThread
;
155 private final Optional
<Cursor
> cursor
;
157 // INITIALIZED -> SUSPENDED -> DESTROYED
158 // INITIALIZED -> TERMINATED -> DESTROYED
159 // INITIALIZED -> DESTROYED
160 // If the state is incorrect we will throw IllegalStateException.
161 private ScanControllerState state
= ScanControllerState
.INITIALIZED
;
163 private ScanResumerImpl resumer
;
165 public ScanControllerImpl(Optional
<Cursor
> cursor
) {
166 this.callerThread
= Thread
.currentThread();
167 this.cursor
= cursor
;
170 private void preCheck() {
171 Preconditions
.checkState(Thread
.currentThread() == callerThread
,
172 "The current thread is %s, expected thread is %s, " +
173 "you should not call this method outside onNext or onHeartbeat",
174 Thread
.currentThread(), callerThread
);
175 Preconditions
.checkState(state
.equals(ScanControllerState
.INITIALIZED
),
176 "Invalid Stopper state %s", state
);
180 public ScanResumer
suspend() {
182 state
= ScanControllerState
.SUSPENDED
;
183 ScanResumerImpl resumer
= new ScanResumerImpl();
184 this.resumer
= resumer
;
189 public void terminate() {
191 state
= ScanControllerState
.TERMINATED
;
194 // return the current state, and set the state to DESTROYED.
195 ScanControllerState
destroy() {
196 ScanControllerState state
= this.state
;
197 this.state
= ScanControllerState
.DESTROYED
;
202 public Optional
<Cursor
> cursor() {
207 private enum ScanResumerState
{
208 INITIALIZED
, SUSPENDED
, RESUMED
211 // The resume method is allowed to be called in another thread so here we also use the
212 // ResumerState to prevent race. The initial state is INITIALIZED, and in most cases, when back
213 // from onNext or onHeartbeat, we will call the prepare method to change the state to SUSPENDED,
214 // and when user calls resume method, we will change the state to RESUMED. But the resume method
215 // could be called in other thread, and in fact, user could just do this:
216 // controller.suspend().resume()
217 // This is strange but valid. This means the scan could be resumed before we call the prepare
218 // method to do the actual suspend work. So in the resume method, we will check if the state is
219 // INTIALIZED, if it is, then we will just set the state to RESUMED and return. And in prepare
220 // method, if the state is RESUMED already, we will just return an let the scan go on.
221 // Notice that, the public methods of this class is supposed to be called by upper layer only, and
222 // package private methods can only be called within the implementation of
223 // AsyncScanSingleRegionRpcRetryingCaller.
224 private final class ScanResumerImpl
implements AdvancedScanResultConsumer
.ScanResumer
{
226 // INITIALIZED -> SUSPENDED -> RESUMED
227 // INITIALIZED -> RESUMED
228 private ScanResumerState state
= ScanResumerState
.INITIALIZED
;
230 private ScanResponse resp
;
232 private int numberOfCompleteRows
;
234 // If the scan is suspended successfully, we need to do lease renewal to prevent it being closed
235 // by RS due to lease expire. It is a one-time timer task so we need to schedule a new task
236 // every time when the previous task is finished. There could also be race as the renewal is
237 // executed in the timer thread, so we also need to check the state before lease renewal. If the
238 // state is RESUMED already, we will give up lease renewal and also not schedule the next lease
240 private Timeout leaseRenewer
;
243 public void resume() {
244 // just used to fix findbugs warnings. In fact, if resume is called before prepare, then we
245 // just return at the first if condition without loading the resp and numValidResuls field. If
246 // resume is called after suspend, then it is also safe to just reference resp and
247 // numValidResults after the synchronized block as no one will change it anymore.
248 ScanResponse localResp
;
249 int localNumberOfCompleteRows
;
250 synchronized (this) {
251 if (state
== ScanResumerState
.INITIALIZED
) {
252 // user calls this method before we call prepare, so just set the state to
253 // RESUMED, the implementation will just go on.
254 state
= ScanResumerState
.RESUMED
;
257 if (state
== ScanResumerState
.RESUMED
) {
258 // already resumed, give up.
261 state
= ScanResumerState
.RESUMED
;
262 if (leaseRenewer
!= null) {
263 leaseRenewer
.cancel();
265 localResp
= this.resp
;
266 localNumberOfCompleteRows
= this.numberOfCompleteRows
;
268 completeOrNext(localResp
, localNumberOfCompleteRows
);
271 private void scheduleRenewLeaseTask() {
272 leaseRenewer
= retryTimer
.newTimeout(t
-> tryRenewLease(), scannerLeaseTimeoutPeriodNs
/ 2,
273 TimeUnit
.NANOSECONDS
);
276 private synchronized void tryRenewLease() {
277 // the scan has already been resumed, give up
278 if (state
== ScanResumerState
.RESUMED
) {
282 // schedule the next renew lease task again as this is a one-time task.
283 scheduleRenewLeaseTask();
286 // return false if the scan has already been resumed. See the comment above for ScanResumerImpl
288 synchronized boolean prepare(ScanResponse resp
, int numberOfCompleteRows
) {
289 if (state
== ScanResumerState
.RESUMED
) {
290 // user calls resume before we actually suspend the scan, just continue;
293 state
= ScanResumerState
.SUSPENDED
;
295 this.numberOfCompleteRows
= numberOfCompleteRows
;
296 // if there are no more results in region then the scanner at RS side will be closed
297 // automatically so we do not need to renew lease.
298 if (resp
.getMoreResultsInRegion()) {
299 // schedule renew lease task
300 scheduleRenewLeaseTask();
306 public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer
, AsyncConnectionImpl conn
,
307 Scan scan
, ScanMetrics scanMetrics
, long scannerId
, ScanResultCache resultCache
,
308 AdvancedScanResultConsumer consumer
, Interface stub
, HRegionLocation loc
,
309 boolean isRegionServerRemote
, int priority
, long scannerLeaseTimeoutPeriodNs
, long pauseNs
,
310 long pauseForCQTBENs
, int maxAttempts
, long scanTimeoutNs
, long rpcTimeoutNs
,
311 int startLogErrorsCnt
) {
312 this.retryTimer
= retryTimer
;
314 this.scanMetrics
= scanMetrics
;
315 this.scannerId
= scannerId
;
316 this.resultCache
= resultCache
;
317 this.consumer
= consumer
;
320 this.regionServerRemote
= isRegionServerRemote
;
321 this.scannerLeaseTimeoutPeriodNs
= scannerLeaseTimeoutPeriodNs
;
322 this.pauseNs
= pauseNs
;
323 this.pauseForCQTBENs
= pauseForCQTBENs
;
324 this.maxAttempts
= maxAttempts
;
325 this.scanTimeoutNs
= scanTimeoutNs
;
326 this.rpcTimeoutNs
= rpcTimeoutNs
;
327 this.startLogErrorsCnt
= startLogErrorsCnt
;
328 if (scan
.isReversed()) {
329 completeWhenNoMoreResultsInRegion
= this::completeReversedWhenNoMoreResultsInRegion
;
331 completeWhenNoMoreResultsInRegion
= this::completeWhenNoMoreResultsInRegion
;
333 this.future
= new CompletableFuture
<>();
334 this.priority
= priority
;
335 this.controller
= conn
.rpcControllerFactory
.newController();
336 this.controller
.setPriority(priority
);
337 this.exceptions
= new ArrayList
<>();
340 private long elapsedMs() {
341 return TimeUnit
.NANOSECONDS
.toMillis(System
.nanoTime() - nextCallStartNs
);
344 private long remainingTimeNs() {
345 return scanTimeoutNs
- (System
.nanoTime() - nextCallStartNs
);
348 private void closeScanner() {
349 incRPCCallsMetrics(scanMetrics
, regionServerRemote
);
350 resetController(controller
, rpcTimeoutNs
, HConstants
.HIGH_QOS
);
351 ScanRequest req
= RequestConverter
.buildScanRequest(this.scannerId
, 0, true, false);
352 stub
.scan(controller
, req
, resp
-> {
353 if (controller
.failed()) {
354 LOG
.warn("Call to " + loc
.getServerName() + " for closing scanner id = " + scannerId
+
355 " for " + loc
.getRegion().getEncodedName() + " of " +
356 loc
.getRegion().getTable() + " failed, ignore, probably already closed",
357 controller
.getFailed());
362 private void completeExceptionally(boolean closeScanner
) {
367 future
.completeExceptionally(new RetriesExhaustedException(tries
- 1, exceptions
));
370 private void completeNoMoreResults() {
371 future
.complete(false);
374 private void completeWithNextStartRow(byte[] row
, boolean inclusive
) {
375 scan
.withStartRow(row
, inclusive
);
376 future
.complete(true);
379 private void completeWhenError(boolean closeScanner
) {
380 incRPCRetriesMetrics(scanMetrics
, closeScanner
);
385 if (nextStartRowWhenError
!= null) {
386 scan
.withStartRow(nextStartRowWhenError
, includeNextStartRowWhenError
);
388 future
.complete(true);
391 private void onError(Throwable error
) {
392 error
= translateException(error
);
393 if (tries
> startLogErrorsCnt
) {
394 LOG
.warn("Call to " + loc
.getServerName() + " for scanner id = " + scannerId
+ " for " +
395 loc
.getRegion().getEncodedName() + " of " + loc
.getRegion().getTable() +
396 " failed, , tries = " + tries
+ ", maxAttempts = " + maxAttempts
+ ", timeout = " +
397 TimeUnit
.NANOSECONDS
.toMillis(scanTimeoutNs
) + " ms, time elapsed = " + elapsedMs() +
401 boolean scannerClosed
=
402 error
instanceof UnknownScannerException
|| error
instanceof NotServingRegionException
||
403 error
instanceof RegionServerStoppedException
|| error
instanceof ScannerResetException
;
404 RetriesExhaustedException
.ThrowableWithExtraContext qt
=
405 new RetriesExhaustedException
.ThrowableWithExtraContext(error
,
406 EnvironmentEdgeManager
.currentTime(), "");
408 if (tries
>= maxAttempts
) {
409 completeExceptionally(!scannerClosed
);
413 long pauseNsToUse
= error
instanceof CallQueueTooBigException ? pauseForCQTBENs
: pauseNs
;
414 if (scanTimeoutNs
> 0) {
415 long maxDelayNs
= remainingTimeNs() - SLEEP_DELTA_NS
;
416 if (maxDelayNs
<= 0) {
417 completeExceptionally(!scannerClosed
);
420 delayNs
= Math
.min(maxDelayNs
, getPauseTime(pauseNsToUse
, tries
- 1));
422 delayNs
= getPauseTime(pauseNsToUse
, tries
- 1);
425 completeWhenError(false);
428 if (error
instanceof OutOfOrderScannerNextException
) {
429 completeWhenError(true);
432 if (error
instanceof DoNotRetryIOException
) {
433 completeExceptionally(true);
437 retryTimer
.newTimeout(t
-> call(), delayNs
, TimeUnit
.NANOSECONDS
);
440 private void updateNextStartRowWhenError(Result result
) {
441 nextStartRowWhenError
= result
.getRow();
442 includeNextStartRowWhenError
= result
.mayHaveMoreCellsInRow();
445 private void completeWhenNoMoreResultsInRegion() {
446 if (noMoreResultsForScan(scan
, loc
.getRegion())) {
447 completeNoMoreResults();
449 completeWithNextStartRow(loc
.getRegion().getEndKey(), true);
453 private void completeReversedWhenNoMoreResultsInRegion() {
454 if (noMoreResultsForReverseScan(scan
, loc
.getRegion())) {
455 completeNoMoreResults();
457 completeWithNextStartRow(loc
.getRegion().getStartKey(), false);
461 private void completeOrNext(ScanResponse resp
, int numberOfCompleteRows
) {
462 if (resp
.hasMoreResults() && !resp
.getMoreResults()) {
463 // RS tells us there is no more data for the whole scan
464 completeNoMoreResults();
467 if (scan
.getLimit() > 0) {
468 // The RS should have set the moreResults field in ScanResponse to false when we have reached
469 // the limit, so we add an assert here.
470 int newLimit
= scan
.getLimit() - numberOfCompleteRows
;
472 scan
.setLimit(newLimit
);
474 // as in 2.0 this value will always be set
475 if (!resp
.getMoreResultsInRegion()) {
476 completeWhenNoMoreResultsInRegion
.run();
482 private void onComplete(HBaseRpcController controller
, ScanResponse resp
) {
483 if (controller
.failed()) {
484 onError(controller
.getFailed());
487 updateServerSideMetrics(scanMetrics
, resp
);
488 boolean isHeartbeatMessage
= resp
.hasHeartbeatMessage() && resp
.getHeartbeatMessage();
491 int numberOfCompleteRowsBefore
= resultCache
.numberOfCompleteRows();
493 rawResults
= ResponseConverter
.getResults(controller
.cellScanner(), resp
);
494 updateResultsMetrics(scanMetrics
, rawResults
, isHeartbeatMessage
);
495 results
= resultCache
.addAndGet(
496 Optional
.ofNullable(rawResults
).orElse(ScanResultCache
.EMPTY_RESULT_ARRAY
),
498 } catch (IOException e
) {
499 // We can not retry here. The server has responded normally and the call sequence has been
500 // increased so a new scan with the same call sequence will cause an
501 // OutOfOrderScannerNextException. Let the upper layer open a new scanner.
502 LOG
.warn("decode scan response failed", e
);
503 completeWhenError(true);
507 ScanControllerImpl scanController
;
508 if (results
.length
> 0) {
509 scanController
= new ScanControllerImpl(
510 resp
.hasCursor() ? Optional
.of(ProtobufUtil
.toCursor(resp
.getCursor()))
512 updateNextStartRowWhenError(results
[results
.length
- 1]);
513 consumer
.onNext(results
, scanController
);
515 Optional
<Cursor
> cursor
= Optional
.empty();
516 if (resp
.hasCursor()) {
517 cursor
= Optional
.of(ProtobufUtil
.toCursor(resp
.getCursor()));
518 } else if (scan
.isNeedCursorResult() && rawResults
.length
> 0) {
519 // It is size limit exceed and we need to return the last Result's row.
520 // When user setBatch and the scanner is reopened, the server may return Results that
521 // user has seen and the last Result can not be seen because the number is not enough.
522 // So the row keys of results may not be same, we must use the last one.
523 cursor
= Optional
.of(new Cursor(rawResults
[rawResults
.length
- 1].getRow()));
525 scanController
= new ScanControllerImpl(cursor
);
526 if (isHeartbeatMessage
|| cursor
.isPresent()) {
527 // only call onHeartbeat if server tells us explicitly this is a heartbeat message, or we
528 // want to pass a cursor to upper layer.
529 consumer
.onHeartbeat(scanController
);
532 ScanControllerState state
= scanController
.destroy();
533 if (state
== ScanControllerState
.TERMINATED
) {
534 if (resp
.getMoreResultsInRegion()) {
535 // we have more results in region but user request to stop the scan, so we need to close the
536 // scanner explicitly.
539 completeNoMoreResults();
542 int numberOfCompleteRows
= resultCache
.numberOfCompleteRows() - numberOfCompleteRowsBefore
;
543 if (state
== ScanControllerState
.SUSPENDED
) {
544 if (scanController
.resumer
.prepare(resp
, numberOfCompleteRows
)) {
548 completeOrNext(resp
, numberOfCompleteRows
);
551 private void call() {
552 // As we have a call sequence for scan, it is useless to have a different rpc timeout which is
553 // less than the scan timeout. If the server does not respond in time(usually this will not
554 // happen as we have heartbeat now), we will get an OutOfOrderScannerNextException when
555 // resending the next request and the only way to fix this is to close the scanner and open a
558 if (scanTimeoutNs
> 0) {
559 long remainingNs
= scanTimeoutNs
- (System
.nanoTime() - nextCallStartNs
);
560 if (remainingNs
<= 0) {
561 completeExceptionally(true);
564 callTimeoutNs
= remainingNs
;
568 incRPCCallsMetrics(scanMetrics
, regionServerRemote
);
570 incRPCRetriesMetrics(scanMetrics
, regionServerRemote
);
572 resetController(controller
, callTimeoutNs
, priority
);
573 ScanRequest req
= RequestConverter
.buildScanRequest(scannerId
, scan
.getCaching(), false,
574 nextCallSeq
, scan
.isScanMetricsEnabled(), false, scan
.getLimit());
575 final Context context
= Context
.current();
576 stub
.scan(controller
, req
, resp
-> {
577 try (Scope ignored
= context
.makeCurrent()) {
578 onComplete(controller
, resp
);
583 private void next() {
587 nextCallStartNs
= System
.nanoTime();
591 private void renewLease() {
592 incRPCCallsMetrics(scanMetrics
, regionServerRemote
);
594 resetController(controller
, rpcTimeoutNs
, priority
);
596 RequestConverter
.buildScanRequest(scannerId
, 0, false, nextCallSeq
, false, true, -1);
597 stub
.scan(controller
, req
, resp
-> {
602 * Now we will also fetch some cells along with the scanner id when opening a scanner, so we also
603 * need to process the ScanResponse for the open scanner request. The HBaseRpcController for the
604 * open scanner request is also needed because we may have some data in the CellScanner which is
605 * contained in the controller.
606 * @return {@code true} if we should continue, otherwise {@code false}.
608 public CompletableFuture
<Boolean
> start(HBaseRpcController controller
,
609 ScanResponse respWhenOpen
) {
610 onComplete(controller
, respWhenOpen
);