2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import java
.io
.IOException
;
21 import java
.io
.InterruptedIOException
;
22 import java
.util
.ArrayDeque
;
23 import java
.util
.Queue
;
24 import org
.apache
.hadoop
.hbase
.client
.metrics
.ScanMetrics
;
25 import org
.apache
.yetus
.audience
.InterfaceAudience
;
27 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.base
.Throwables
;
30 * A ResultScanner which only sends a request to the RS when there are no cached results at the
31 * time next is called, just like the old ResultScanner did. Mainly used for writing UTs, so that
32 * we can control when a request is sent to the RS. The default ResultScanner implementation
33 * fetches results in the background.
35 @InterfaceAudience.Private
36 public class ScanPerNextResultScanner
implements ResultScanner
, AdvancedScanResultConsumer
{
38 private final AsyncTable
<AdvancedScanResultConsumer
> table
;
40 private final Scan scan
;
42 private final Queue
<Result
> queue
= new ArrayDeque
<>();
44 private ScanMetrics scanMetrics
;
46 private boolean closed
= false;
48 private Throwable error
;
50 private ScanResumer resumer
;
52 public ScanPerNextResultScanner(AsyncTable
<AdvancedScanResultConsumer
> table
, Scan scan
) {
58 public synchronized void onError(Throwable error
) {
64 public synchronized void onComplete() {
70 public void onScanMetricsCreated(ScanMetrics scanMetrics
) {
71 this.scanMetrics
= scanMetrics
;
75 public synchronized void onNext(Result
[] results
, ScanController controller
) {
76 assert results
.length
> 0;
78 controller
.terminate();
81 for (Result result
: results
) {
85 resumer
= controller
.suspend();
89 public synchronized void onHeartbeat(ScanController controller
) {
91 controller
.terminate();
94 if (scan
.isNeedCursorResult()) {
95 controller
.cursor().ifPresent(c
-> queue
.add(Result
.createCursorResult(c
)));
100 public synchronized Result
next() throws IOException
{
101 if (queue
.isEmpty()) {
102 if (resumer
!= null) {
106 table
.scan(scan
, this);
109 while (queue
.isEmpty()) {
114 Throwables
.propagateIfPossible(error
, IOException
.class);
115 throw new IOException(error
);
119 } catch (InterruptedException e
) {
120 throw new InterruptedIOException();
127 public synchronized void close() {
130 if (resumer
!= null) {
138 public boolean renewLease() {
139 // The renew lease operation will be handled in background
144 public ScanMetrics
getScanMetrics() {