HBASE-26688 Threads shared EMPTY_RESULT may lead to unexpected client job down. ...
[hbase.git] / hbase-client / src / main / java / org / apache / hadoop / hbase / client / AsyncTableResultScanner.java
blob2858d2f915f9da5ea450b88cfcf793372f4652b6
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.util.ArrayDeque;
25 import java.util.Queue;
26 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
27 import org.apache.hadoop.hbase.util.FutureUtils;
28 import org.apache.yetus.audience.InterfaceAudience;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
32 /**
33 * The {@link ResultScanner} implementation for {@link AsyncTable}. It will fetch data automatically
34 * in background and cache it in memory. Typically the {@link #maxCacheSize} will be
35 * {@code 2 * scan.getMaxResultSize()}.
37 @InterfaceAudience.Private
38 class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsumer {
40 private static final Logger LOG = LoggerFactory.getLogger(AsyncTableResultScanner.class);
42 private final AsyncTable<AdvancedScanResultConsumer> rawTable;
44 private final long maxCacheSize;
46 private final Scan scan;
48 private final Queue<Result> queue = new ArrayDeque<>();
50 private ScanMetrics scanMetrics;
52 private long cacheSize;
54 private boolean closed = false;
56 private Throwable error;
58 private ScanResumer resumer;
60 public AsyncTableResultScanner(AsyncTable<AdvancedScanResultConsumer> table, Scan scan,
61 long maxCacheSize) {
62 this.rawTable = table;
63 this.maxCacheSize = maxCacheSize;
64 this.scan = scan;
65 table.scan(scan, this);
68 private void addToCache(Result result) {
69 queue.add(result);
70 cacheSize += calcEstimatedSize(result);
73 private void stopPrefetch(ScanController controller) {
74 if (LOG.isDebugEnabled()) {
75 LOG.debug(String.format("0x%x", System.identityHashCode(this)) +
76 " stop prefetching when scanning " + rawTable.getName() + " as the cache size " +
77 cacheSize + " is greater than the maxCacheSize " + maxCacheSize);
79 resumer = controller.suspend();
82 @Override
83 public synchronized void onNext(Result[] results, ScanController controller) {
84 assert results.length > 0;
85 if (closed) {
86 controller.terminate();
87 return;
89 for (Result result : results) {
90 addToCache(result);
92 notifyAll();
93 if (cacheSize >= maxCacheSize) {
94 stopPrefetch(controller);
98 @Override
99 public synchronized void onHeartbeat(ScanController controller) {
100 if (closed) {
101 controller.terminate();
102 return;
104 if (scan.isNeedCursorResult()) {
105 controller.cursor().ifPresent(c -> queue.add(Result.createCursorResult(c)));
109 @Override
110 public synchronized void onError(Throwable error) {
111 this.error = error;
112 notifyAll();
115 @Override
116 public synchronized void onComplete() {
117 closed = true;
118 notifyAll();
121 @Override
122 public void onScanMetricsCreated(ScanMetrics scanMetrics) {
123 this.scanMetrics = scanMetrics;
126 private void resumePrefetch() {
127 if (LOG.isDebugEnabled()) {
128 LOG.debug(String.format("0x%x", System.identityHashCode(this)) + " resume prefetching");
130 resumer.resume();
131 resumer = null;
134 @Override
135 public synchronized Result next() throws IOException {
136 while (queue.isEmpty()) {
137 if (closed) {
138 return null;
140 if (error != null) {
141 FutureUtils.rethrow(error);
143 try {
144 wait();
145 } catch (InterruptedException e) {
146 throw new InterruptedIOException();
149 Result result = queue.poll();
150 if (!result.isCursor()) {
151 cacheSize -= calcEstimatedSize(result);
152 if (resumer != null && cacheSize <= maxCacheSize / 2) {
153 resumePrefetch();
156 return result;
159 @Override
160 public synchronized void close() {
161 closed = true;
162 queue.clear();
163 cacheSize = 0;
164 if (resumer != null) {
165 resumePrefetch();
167 notifyAll();
170 @Override
171 public boolean renewLease() {
172 // we will do prefetching in the background and if there is no space we will just suspend the
173 // scanner. The renew lease operation will be handled in the background.
174 return false;
177 // used in tests to test whether the scanner has been suspended
178 synchronized boolean isSuspended() {
179 return resumer != null;
182 @Override
183 public ScanMetrics getScanMetrics() {
184 return scanMetrics;
187 int getCacheSize() {
188 return queue.size();