/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Threads;

/**
 * ClientAsyncPrefetchScanner implements async scanner behaviour.
 * Specifically, the cache used by this scanner is a concurrent queue which allows both
 * the producer (hbase client) and consumer (application) to access the queue in parallel.
 * The number of rows returned in a prefetch is defined by the caching factor and the result size
 * factor.
 * This class allocates a buffer cache, whose size is a function of both factors.
 * The prefetch is invoked when the cache is half-filled, instead of waiting for it to be empty.
 * This is defined in the method {@link ClientAsyncPrefetchScanner#prefetchCondition()}.
 */
@InterfaceAudience.Private
public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {

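  // Usage sketch (illustrative, not part of this class): applications normally obtain this
  // scanner through the Table API rather than constructing it directly. The example assumes
  // Scan#setAsyncPrefetch(boolean) is available in the client version in use and that a
  // Connection has already been created via ConnectionFactory:
  //
  //   Scan scan = new Scan();
  //   scan.setCaching(100);          // the caching factor also sizes the prefetch buffer
  //   scan.setAsyncPrefetch(true);   // request the async prefetch scanner implementation
  //   try (Table table = connection.getTable(TableName.valueOf("my_table"));
  //        ResultScanner scanner = table.getScanner(scan)) {
  //     for (Result result : scanner) {
  //       // consume results while the prefetch thread refills the cache in the background
  //     }
  //   }
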
  private static final int ESTIMATED_SINGLE_RESULT_SIZE = 1024;
  private static final int DEFAULT_QUEUE_CAPACITY = 1024;

  private int cacheCapacity;
  private AtomicLong cacheSizeInBytes;
  // exception queue (from prefetch to main scan execution)
  private Queue<Exception> exceptionsQueue;
  // prefetch runnable object to be executed asynchronously
  private PrefetchRunnable prefetchRunnable;
  // Boolean flag to ensure only a single prefetch is running (per scan)
  // We use atomic boolean to allow multiple concurrent threads to
  // consume records from the same cache, but still have a single prefetcher thread.
  // For a single consumer thread this can be replaced with a native boolean.
  private AtomicBoolean prefetchRunning;
  // an attribute for synchronizing close between scanner and prefetch threads
  private AtomicLong closingThreadId;
  private static final int NO_THREAD = -1;

  public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name,
      ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
      RpcControllerFactory rpcControllerFactory, ExecutorService pool,
      int replicaCallTimeoutMicroSecondScan) throws IOException {
    super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
        replicaCallTimeoutMicroSecondScan);
  }

  @Override
  protected void initCache() {
    // Use a thread-safe cache so the prefetch thread and the consuming application
    // can access it concurrently.
    cacheCapacity = calcCacheCapacity();
    cache = new LinkedBlockingQueue<>();
    cacheSizeInBytes = new AtomicLong(0);
    exceptionsQueue = new ConcurrentLinkedQueue<>();
    prefetchRunnable = new PrefetchRunnable();
    prefetchRunning = new AtomicBoolean(false);
    closingThreadId = new AtomicLong(NO_THREAD);
  }

  @Override
  public Result next() throws IOException {
    try {
      handleException();
      // If the scanner is closed and there's nothing left in the cache, next is a no-op.
      if (getCacheCount() == 0 && this.closed) {
        return null;
      }
      if (prefetchCondition()) {
        // run prefetch in the background only if no prefetch is already running
        if (!isPrefetchRunning()) {
          if (prefetchRunning.compareAndSet(false, true)) {
            getPool().execute(prefetchRunnable);
          }
        }
      }
      while (isPrefetchRunning()) {
        // prefetch running or still pending
        if (getCacheCount() > 0) {
          return pollCache();
        } else {
          // (busy) wait for a record - sleep
          Threads.sleep(1);
        }
      }
      if (getCacheCount() > 0) {
        return pollCache();
      }
      // if we exhausted this scanner before calling close, write out the scan metrics
      writeScanMetrics();
      return null;
    } finally {
      handleException();
    }
  }

  @Override
  public void close() {
    if (!scanMetricsPublished) writeScanMetrics();
    closed = true;
    if (!isPrefetchRunning()) {
      if (closingThreadId.compareAndSet(NO_THREAD, Thread.currentThread().getId())) {
        super.close();
      }
    } // else do nothing since the async prefetch still needs these resources
  }

  @Override
  public int getCacheCount() {
    if (cache != null) {
      int size = cache.size();
      if (size > cacheCapacity) {
        cacheCapacity = size;
      }
      return size;
    } else {
      return 0;
    }
  }

  @Override
  protected void addEstimatedSize(long estimatedSize) {
    cacheSizeInBytes.addAndGet(estimatedSize);
  }

  private void handleException() throws IOException {
    // The prefetch task running in the background puts any exception it
    // catches into this exception queue.
    // Rethrow the exception so the application can handle it.
    while (!exceptionsQueue.isEmpty()) {
      Exception first = exceptionsQueue.peek();
      first.printStackTrace();
      if (first instanceof IOException) {
        throw (IOException) first;
      }
      throw (RuntimeException) first;
    }
  }

  private boolean isPrefetchRunning() {
    return prefetchRunning.get();
  }

  // double buffer - double cache size
  private int calcCacheCapacity() {
    int capacity = Integer.MAX_VALUE;
    if (caching > 0 && caching < (Integer.MAX_VALUE / 2)) {
      capacity = caching * 2 + 1;
    }
    if (capacity == Integer.MAX_VALUE) {
      if (maxScannerResultSize != Integer.MAX_VALUE) {
        capacity = (int) (maxScannerResultSize / ESTIMATED_SINGLE_RESULT_SIZE);
      } else {
        capacity = DEFAULT_QUEUE_CAPACITY;
      }
    }
    return Math.max(capacity, 1);
  }

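  // Worked example of the sizing above (illustrative numbers): with scan caching = 100 the
  // capacity is 100 * 2 + 1 = 201 results (double buffering). If caching gives no usable
  // bound but maxScannerResultSize is, say, 2 MB, the capacity is estimated as
  // (2 * 1024 * 1024) / ESTIMATED_SINGLE_RESULT_SIZE = 2048 results; failing both,
  // DEFAULT_QUEUE_CAPACITY (1024) is used, and the result is never less than 1.
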
  private boolean prefetchCondition() {
    return
        (getCacheCount() < getCountThreshold()) &&
        (maxScannerResultSize == Long.MAX_VALUE ||
         getCacheSizeInBytes() < getSizeThreshold());
  }

  private int getCountThreshold() {
    return Math.max(cacheCapacity / 2, 1);
  }

  private long getSizeThreshold() {
    return Math.max(maxScannerResultSize / 2, 1);
  }

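  // Continuing the example above: with cacheCapacity = 201 the count threshold is
  // 201 / 2 = 100 (integer division), so a prefetch is triggered once fewer than 100 results
  // remain cached, provided the cached bytes are also below half of maxScannerResultSize
  // (or no size limit is set). The cache is therefore refilled when it is roughly half
  // empty instead of waiting for it to drain completely.
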
  private long getCacheSizeInBytes() {
    return cacheSizeInBytes.get();
  }

  private Result pollCache() {
    Result res = cache.poll();
    long estimatedSize = calcEstimatedSize(res);
    addEstimatedSize(-estimatedSize);
    return res;
  }

  private class PrefetchRunnable implements Runnable {

    @Override
    public void run() {
      try {
        // fill the cache using the superclass scan loop
        loadCache();
      } catch (Exception e) {
        exceptionsQueue.add(e);
      } finally {
        prefetchRunning.set(false);
        if (closingThreadId.compareAndSet(NO_THREAD, Thread.currentThread().getId())) {
          // close was waiting for the prefetch to end
          close();
        }
      }
    }
  }
}