HBASE-17532 Replaced explicit type with diamond operator
[hbase.git] hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
blob b1fc2da68f3ebe792fca5e11897044f4dd7878b4
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Threads;
/**
 * ClientAsyncPrefetchScanner implements async scanner behaviour.
 * Specifically, the cache used by this scanner is a concurrent queue which allows both
 * the producer (hbase client) and consumer (application) to access the queue in parallel.
 * The number of rows returned in a prefetch is defined by the caching factor and the result size
 * factor.
 * This class allocates a buffer cache, whose size is a function of both factors.
 * The prefetch is invoked when the cache is half-filled, instead of waiting for it to be empty.
 * This is defined in the method {@link ClientAsyncPrefetchScanner#prefetchCondition()}.
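 * <p>
 * Illustrative usage sketch (not part of the original javadoc). It assumes the scan is switched
 * to asynchronous prefetching, e.g. via {@code Scan#setAsyncPrefetch(boolean)}, so that the
 * connection hands back this scanner implementation; {@code table} stands for an open
 * {@code Table} instance:
 * <pre>{@code
 * Scan scan = new Scan();
 * scan.setCaching(100);
 * scan.setAsyncPrefetch(true); // assumed switch that selects the prefetching scanner
 * try (ResultScanner scanner = table.getScanner(scan)) {
 *   for (Result result : scanner) {
 *     // consume results while the next batch is prefetched in the background
 *   }
 * }
 * }</pre>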
 */
@InterfaceAudience.Private
public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {

  private static final int ESTIMATED_SINGLE_RESULT_SIZE = 1024;
  private static final int DEFAULT_QUEUE_CAPACITY = 1024;

  private int cacheCapacity;
  private AtomicLong cacheSizeInBytes;
  // exception queue (from prefetch to main scan execution)
  private Queue<Exception> exceptionsQueue;
  // prefetch runnable object to be executed asynchronously
  private PrefetchRunnable prefetchRunnable;
  // Boolean flag to ensure only a single prefetch is running (per scan).
  // We use an atomic boolean to allow multiple concurrent threads to
  // consume records from the same cache, but still have a single prefetcher thread.
  // For a single consumer thread this can be replaced with a native boolean.
  private AtomicBoolean prefetchRunning;
  // an attribute for synchronizing close between scanner and prefetch threads
  private AtomicLong closingThreadId;
  private static final int NO_THREAD = -1;
  public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name,
      ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
      RpcControllerFactory rpcControllerFactory, ExecutorService pool,
      int replicaCallTimeoutMicroSecondScan) throws IOException {
    super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
        replicaCallTimeoutMicroSecondScan);
  }

  @Override
  protected void initCache() {
    // concurrent cache
    cacheCapacity = calcCacheCapacity();
    cache = new LinkedBlockingQueue<>();
    cacheSizeInBytes = new AtomicLong(0);
    exceptionsQueue = new ConcurrentLinkedQueue<>();
    prefetchRunnable = new PrefetchRunnable();
    prefetchRunning = new AtomicBoolean(false);
    closingThreadId = new AtomicLong(NO_THREAD);
  }

  @Override
  public Result next() throws IOException {
    try {
      handleException();

      // If the scanner is closed and there's nothing left in the cache, next is a no-op.
      if (getCacheCount() == 0 && this.closed) {
        return null;
      }

      if (prefetchCondition()) {
        // run prefetch in the background only if no prefetch is already running
        if (!isPrefetchRunning()) {
          if (prefetchRunning.compareAndSet(false, true)) {
            getPool().execute(prefetchRunnable);
          }
        }
      }

      while (isPrefetchRunning()) {
        // prefetch running or still pending
        if (getCacheCount() > 0) {
          return pollCache();
        } else {
          // (busy) wait for a record - sleep
          Threads.sleep(1);
        }
      }

      if (getCacheCount() > 0) {
        return pollCache();
      }

      // if we exhausted this scanner before calling close, write out the scan metrics
      writeScanMetrics();
      return null;
    } finally {
      handleException();
    }
  }
  @Override
  public void close() {
    if (!scanMetricsPublished) writeScanMetrics();
    closed = true;
    if (!isPrefetchRunning()) {
      if (closingThreadId.compareAndSet(NO_THREAD, Thread.currentThread().getId())) {
        super.close();
      }
    } // else do nothing since the async prefetch still needs these resources
  }

  @Override
  public int getCacheCount() {
    if (cache != null) {
      int size = cache.size();
      if (size > cacheCapacity) {
        cacheCapacity = size;
      }
      return size;
    } else {
      return 0;
    }
  }

  @Override
  protected void addEstimatedSize(long estimatedSize) {
    cacheSizeInBytes.addAndGet(estimatedSize);
  }

  private void handleException() throws IOException {
    // The prefetch task running in the background puts any exception it
    // catches into this exception queue.
    // Rethrow the exception so the application can handle it.
    while (!exceptionsQueue.isEmpty()) {
      Exception first = exceptionsQueue.peek();
      first.printStackTrace();
      if (first instanceof IOException) {
        throw (IOException) first;
      }
      throw (RuntimeException) first;
    }
  }

  private boolean isPrefetchRunning() {
    return prefetchRunning.get();
  }

  // double buffer - double cache size
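  // Illustrative arithmetic (added, not from the original source): with caching = 100 the
  // capacity becomes 100 * 2 + 1 = 201, so getCountThreshold() returns 100 and a prefetch is
  // triggered once the cache drops below roughly one configured batch, i.e. it is "half-filled".
  // When caching is not usable, the capacity falls back to
  // maxScannerResultSize / ESTIMATED_SINGLE_RESULT_SIZE, or to DEFAULT_QUEUE_CAPACITY.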
  private int calcCacheCapacity() {
    int capacity = Integer.MAX_VALUE;
    if (caching > 0 && caching < (Integer.MAX_VALUE / 2)) {
      capacity = caching * 2 + 1;
    }
    if (capacity == Integer.MAX_VALUE) {
      if (maxScannerResultSize != Integer.MAX_VALUE) {
        capacity = (int) (maxScannerResultSize / ESTIMATED_SINGLE_RESULT_SIZE);
      } else {
        capacity = DEFAULT_QUEUE_CAPACITY;
      }
    }
    return Math.max(capacity, 1);
  }

  private boolean prefetchCondition() {
    return
        (getCacheCount() < getCountThreshold()) &&
        (maxScannerResultSize == Long.MAX_VALUE ||
         getCacheSizeInBytes() < getSizeThreshold());
  }

  private int getCountThreshold() {
    return Math.max(cacheCapacity / 2, 1);
  }

  private long getSizeThreshold() {
    return Math.max(maxScannerResultSize / 2, 1);
  }

  private long getCacheSizeInBytes() {
    return cacheSizeInBytes.get();
  }

  private Result pollCache() {
    Result res = cache.poll();
    long estimatedSize = calcEstimatedSize(res);
    addEstimatedSize(-estimatedSize);
    return res;
  }

  private class PrefetchRunnable implements Runnable {

    @Override
    public void run() {
      try {
        loadCache();
      } catch (Exception e) {
        exceptionsQueue.add(e);
      } finally {
        prefetchRunning.set(false);
        if (closed) {
          if (closingThreadId.compareAndSet(NO_THREAD, Thread.currentThread().getId())) {
            // close was waiting for the prefetch to end
            close();
          }
        }
      }
    }
  }
}