HBASE-26481 Consider rolling upgrading from old region replication framework (#3880)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / client / BufferingScanResultConsumer.java
blob4606ebc0790c630174ebe56e6721966c4940dc6c
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
22 import java.io.IOException;
23 import java.util.ArrayDeque;
24 import java.util.Queue;
26 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
28 /**
29 * A scan result consumer which buffers all the data in memory and you can call the {@link #take()}
30 * method below to get the result one by one. Should only be used by tests, do not write production
31 * code like this as the buffer is unlimited and may cause OOM.
33 class BufferingScanResultConsumer implements AdvancedScanResultConsumer {
35 private ScanMetrics scanMetrics;
37 private final Queue<Result> queue = new ArrayDeque<>();
39 private boolean finished;
41 private Throwable error;
43 @Override
44 public void onScanMetricsCreated(ScanMetrics scanMetrics) {
45 this.scanMetrics = scanMetrics;
48 @Override
49 public synchronized void onNext(Result[] results, ScanController controller) {
50 for (Result result : results) {
51 queue.offer(result);
53 notifyAll();
56 @Override
57 public synchronized void onError(Throwable error) {
58 finished = true;
59 this.error = error;
60 notifyAll();
63 @Override
64 public synchronized void onComplete() {
65 finished = true;
66 notifyAll();
69 public synchronized Result take() throws IOException, InterruptedException {
70 for (;;) {
71 if (!queue.isEmpty()) {
72 return queue.poll();
74 if (finished) {
75 if (error != null) {
76 Throwables.propagateIfPossible(error, IOException.class);
77 throw new IOException(error);
78 } else {
79 return null;
82 wait();
86 public ScanMetrics getScanMetrics() {
87 return scanMetrics;