HBASE-23723 Ensure MOB compaction works in optimized mode after snapshot clone (...
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / client / ScanPerNextResultScanner.java
blobc8665e912667d74775e97496a579d93ae786d78c
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.util.ArrayDeque;
23 import java.util.Queue;
24 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
25 import org.apache.yetus.audience.InterfaceAudience;
27 import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
29 /**
30 * A ResultScanner which will only send request to RS when there are no cached results when calling
31 * next, just like the ResultScanner in the old time. Mainly used for writing UTs, that we can
32 * control when to send request to RS. The default ResultScanner implementation will fetch in
33 * background.
35 @InterfaceAudience.Private
36 public class ScanPerNextResultScanner implements ResultScanner, AdvancedScanResultConsumer {
38 private final AsyncTable<AdvancedScanResultConsumer> table;
40 private final Scan scan;
42 private final Queue<Result> queue = new ArrayDeque<>();
44 private ScanMetrics scanMetrics;
46 private boolean closed = false;
48 private Throwable error;
50 private ScanResumer resumer;
52 public ScanPerNextResultScanner(AsyncTable<AdvancedScanResultConsumer> table, Scan scan) {
53 this.table = table;
54 this.scan = scan;
57 @Override
58 public synchronized void onError(Throwable error) {
59 this.error = error;
60 notifyAll();
63 @Override
64 public synchronized void onComplete() {
65 closed = true;
66 notifyAll();
69 @Override
70 public void onScanMetricsCreated(ScanMetrics scanMetrics) {
71 this.scanMetrics = scanMetrics;
74 @Override
75 public synchronized void onNext(Result[] results, ScanController controller) {
76 assert results.length > 0;
77 if (closed) {
78 controller.terminate();
79 return;
81 for (Result result : results) {
82 queue.add(result);
84 notifyAll();
85 resumer = controller.suspend();
88 @Override
89 public synchronized void onHeartbeat(ScanController controller) {
90 if (closed) {
91 controller.terminate();
92 return;
94 if (scan.isNeedCursorResult()) {
95 controller.cursor().ifPresent(c -> queue.add(Result.createCursorResult(c)));
99 @Override
100 public synchronized Result next() throws IOException {
101 if (queue.isEmpty()) {
102 if (resumer != null) {
103 resumer.resume();
104 resumer = null;
105 } else {
106 table.scan(scan, this);
109 while (queue.isEmpty()) {
110 if (closed) {
111 return null;
113 if (error != null) {
114 Throwables.propagateIfPossible(error, IOException.class);
115 throw new IOException(error);
117 try {
118 wait();
119 } catch (InterruptedException e) {
120 throw new InterruptedIOException();
123 return queue.poll();
126 @Override
127 public synchronized void close() {
128 closed = true;
129 queue.clear();
130 if (resumer != null) {
131 resumer.resume();
132 resumer = null;
134 notifyAll();
137 @Override
138 public boolean renewLease() {
139 // The renew lease operation will be handled in background
140 return false;
143 @Override
144 public ScanMetrics getScanMetrics() {
145 return scanMetrics;