2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.calcEstimatedSize
;
22 import java
.io
.IOException
;
23 import java
.io
.InterruptedIOException
;
24 import java
.util
.ArrayDeque
;
25 import java
.util
.Queue
;
26 import org
.apache
.hadoop
.hbase
.client
.metrics
.ScanMetrics
;
27 import org
.apache
.hadoop
.hbase
.util
.FutureUtils
;
28 import org
.apache
.yetus
.audience
.InterfaceAudience
;
29 import org
.slf4j
.Logger
;
30 import org
.slf4j
.LoggerFactory
;
/**
 * The {@link ResultScanner} implementation for {@link AsyncTable}. It fetches data automatically
 * in the background and caches it in memory. Typically the {@link #maxCacheSize} will be
 * {@code 2 * scan.getMaxResultSize()}.
 */
37 @InterfaceAudience.Private
38 class AsyncTableResultScanner
implements ResultScanner
, AdvancedScanResultConsumer
{
// Class-wide logger; used for the debug traces written when prefetching stops or resumes.
40 private static final Logger LOG
= LoggerFactory
.getLogger(AsyncTableResultScanner
.class);
// Table handle supplied to the constructor; its name appears in the stop-prefetch debug message.
42 private final AsyncTable
<AdvancedScanResultConsumer
> rawTable
;
// Cache size threshold: onNext suspends the scan once cacheSize >= maxCacheSize, and next()
// compares against half of it before resuming.
44 private final long maxCacheSize
;
// Scan settings; onHeartbeat consults isNeedCursorResult() on it.
46 private final Scan scan
;
// Buffer of results waiting to be handed out; next() polls from it and onHeartbeat appends
// cursor results to it.
48 private final Queue
<Result
> queue
= new ArrayDeque
<>();
// Metrics object handed over through onScanMetricsCreated.
50 private ScanMetrics scanMetrics
;
// Running total of calcEstimatedSize(...) values: increased in addToCache, decreased in next()
// for non-cursor results.
52 private long cacheSize
;
// Starts out false. NOTE(review): the statements that read or set this flag are not visible in
// this extract -- confirm its exact meaning against the full file.
54 private boolean closed
= false;
// Failure to surface to the caller; next() passes it to FutureUtils.rethrow.
56 private Throwable error
;
// Handle returned by controller.suspend() in stopPrefetch; a non-null value means the scan is
// currently suspended (see isSuspended).
58 private ScanResumer resumer
;
/**
 * Creates the scanner: stores the table handle and the cache size limit, then starts the scan by
 * calling {@code table.scan(scan, this)}, passing this object as the second argument.
 * <p>
 * NOTE(review): the end of the parameter list and part of the body are not visible in this
 * extract. {@code maxCacheSize} is presumably the last constructor parameter and the
 * {@code scan} field is presumably assigned here as well -- confirm against the full file.
 * @param table the table to scan; kept in {@code rawTable}
 * @param scan the scan passed on to {@code table.scan}
 */
60 public AsyncTableResultScanner(AsyncTable
<AdvancedScanResultConsumer
> table
, Scan scan
,
62 this.rawTable
= table
;
63 this.maxCacheSize
= maxCacheSize
;
// Started last, after the visible field assignments, because 'this' is handed out here.
65 table
.scan(scan
, this);
/**
 * Accounts for a result by adding its estimated size, as computed by {@code calcEstimatedSize},
 * to {@code cacheSize}.
 * <p>
 * NOTE(review): the method name suggests the result is also appended to {@code queue}, but that
 * statement is not visible in this extract -- confirm.
 * @param result the result whose estimated size is added to the running total
 */
68 private void addToCache(Result result
) {
70 cacheSize
+= calcEstimatedSize(result
);
73 private void stopPrefetch(ScanController controller
) {
74 if (LOG
.isDebugEnabled()) {
75 LOG
.debug(String
.format("0x%x", System
.identityHashCode(this)) +
76 " stop prefetching when scanning " + rawTable
.getName() + " as the cache size " +
77 cacheSize
+ " is greater than the maxCacheSize " + maxCacheSize
);
79 resumer
= controller
.suspend();
/**
 * Callback that receives a batch of scan results. The batch is asserted to be non-empty. After
 * iterating over the batch, the scan is suspended through {@code stopPrefetch} once
 * {@code cacheSize} has reached {@code maxCacheSize}.
 * <p>
 * NOTE(review): the condition guarding {@code controller.terminate()} and the body of the loop
 * are not visible in this extract. Presumably the scan is terminated when the scanner has been
 * closed and each result is handed to {@code addToCache} -- confirm against the full file.
 * @param results the batch of results delivered by the scan
 * @param controller used to terminate or suspend the scan
 */
83 public synchronized void onNext(Result
[] results
, ScanController controller
) {
84 assert results
.length
> 0;
86 controller
.terminate();
89 for (Result result
: results
) {
// Back-pressure: stop fetching once the cached data has reached the configured limit.
93 if (cacheSize
>= maxCacheSize
) {
94 stopPrefetch(controller
);
/**
 * Callback invoked on a scan heartbeat. When the scan asks for cursor results
 * ({@code scan.isNeedCursorResult()}) and the controller provides a cursor, the cursor is wrapped
 * with {@code Result.createCursorResult} and appended to {@code queue}.
 * <p>
 * NOTE(review): the condition guarding {@code controller.terminate()} is not visible in this
 * extract; presumably the scan is terminated when the scanner has been closed -- confirm.
 * @param controller used to terminate the scan and to obtain the current cursor
 */
99 public synchronized void onHeartbeat(ScanController controller
) {
101 controller
.terminate();
104 if (scan
.isNeedCursorResult()) {
// The cursor result goes straight into the queue; cacheSize is not touched on this path.
105 controller
.cursor().ifPresent(c
-> queue
.add(Result
.createCursorResult(c
)));
110 public synchronized void onError(Throwable error
) {
116 public synchronized void onComplete() {
122 public void onScanMetricsCreated(ScanMetrics scanMetrics
) {
123 this.scanMetrics
= scanMetrics
;
/**
 * Counterpart of {@code stopPrefetch}. When debug logging is enabled, writes a trace line tagged
 * with this scanner's identity hash code (in hex).
 * <p>
 * NOTE(review): the statements after the logging block are not visible in this extract.
 * Presumably the scan is resumed through {@code resumer} and the field is cleared -- confirm
 * against the full file.
 */
126 private void resumePrefetch() {
127 if (LOG
.isDebugEnabled()) {
128 LOG
.debug(String
.format("0x%x", System
.identityHashCode(this)) + " resume prefetching");
/**
 * Hands out the next buffered result. While {@code queue} is empty the method stays in a loop in
 * which a recorded {@code error} is rethrown through {@code FutureUtils.rethrow} and an
 * {@code InterruptedException} is converted into an {@code InterruptedIOException}. Afterwards
 * the head of the queue is polled; for a non-cursor result its estimated size is subtracted from
 * {@code cacheSize}.
 * <p>
 * NOTE(review): several statements are not visible in this extract -- how the thread blocks
 * inside the loop, any check of {@code closed}, the action taken when the resume condition
 * holds (presumably {@code resumePrefetch()}), and the return statement. Confirm against the
 * full file.
 * @return presumably the polled result -- the return statement is not visible here
 * @throws IOException declared by the signature; an {@code InterruptedIOException} is thrown when
 *           the thread is interrupted
 */
135 public synchronized Result
next() throws IOException
{
136 while (queue
.isEmpty()) {
141 FutureUtils
.rethrow(error
);
// NOTE(review): the InterruptedException is not chained as the cause and the interrupt flag is
// not restored before throwing -- confirm this is intended.
145 } catch (InterruptedException e
) {
146 throw new InterruptedIOException();
149 Result result
= queue
.poll();
// Only non-cursor results are subtracted from the running size total here.
150 if (!result
.isCursor()) {
151 cacheSize
-= calcEstimatedSize(result
);
// Suspended (resumer set) and drained to at most half of the limit; presumably the half
// threshold avoids rapid suspend/resume cycles -- confirm.
152 if (resumer
!= null && cacheSize
<= maxCacheSize
/ 2) {
/**
 * Closes the scanner. The visible part of the body checks whether the scan is currently
 * suspended ({@code resumer != null}).
 * <p>
 * NOTE(review): the remaining statements are not visible in this extract. Presumably the scanner
 * is marked closed, the buffer is released and a suspended scan is resumed so that it can
 * terminate -- confirm against the full file.
 */
160 public synchronized void close() {
164 if (resumer
!= null) {
/**
 * Lease renewal hook.
 * <p>
 * NOTE(review): the return statement is not visible in this extract -- confirm the returned
 * value against the full file.
 */
171 public boolean renewLease() {
172 // Prefetching runs in the background, and when there is no cache space left the scanner is
173 // simply suspended. The renew lease operation is therefore handled in the background as well.
177 // used in tests to test whether the scanner has been suspended
178 synchronized boolean isSuspended() {
179 return resumer
!= null;
183 public ScanMetrics
getScanMetrics() {