2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.client
;
21 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.incRPCCallsMetrics
;
22 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.incRPCRetriesMetrics
;
23 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.isRemote
;
24 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.updateResultsMetrics
;
25 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.updateServerSideMetrics
;
27 import java
.io
.IOException
;
28 import java
.io
.InterruptedIOException
;
30 import org
.apache
.commons
.logging
.Log
;
31 import org
.apache
.commons
.logging
.LogFactory
;
32 import org
.apache
.hadoop
.conf
.Configuration
;
33 import org
.apache
.hadoop
.hbase
.DoNotRetryIOException
;
34 import org
.apache
.hadoop
.hbase
.HBaseIOException
;
35 import org
.apache
.hadoop
.hbase
.HRegionInfo
;
36 import org
.apache
.hadoop
.hbase
.HRegionLocation
;
37 import org
.apache
.hadoop
.hbase
.NotServingRegionException
;
38 import org
.apache
.hadoop
.hbase
.RegionLocations
;
39 import org
.apache
.hadoop
.hbase
.ServerName
;
40 import org
.apache
.hadoop
.hbase
.TableName
;
41 import org
.apache
.hadoop
.hbase
.UnknownScannerException
;
42 import org
.apache
.hadoop
.hbase
.classification
.InterfaceAudience
;
43 import org
.apache
.hadoop
.hbase
.client
.metrics
.ScanMetrics
;
44 import org
.apache
.hadoop
.hbase
.exceptions
.ScannerResetException
;
45 import org
.apache
.hadoop
.hbase
.ipc
.RpcControllerFactory
;
46 import org
.apache
.hadoop
.hbase
.regionserver
.RegionServerStoppedException
;
47 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ProtobufUtil
;
48 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.RequestConverter
;
49 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ResponseConverter
;
50 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.ScanRequest
;
51 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.ScanResponse
;
54 * Scanner operations such as create, next, etc.
55 * Used by {@link ResultScanner}s made by {@link Table}. Passed to a retrying caller such as
56 * {@link RpcRetryingCaller} so failures are retried.
58 @InterfaceAudience.Private
59 public class ScannerCallable
extends ClientServiceCallable
<Result
[]> {
60 public static final String LOG_SCANNER_LATENCY_CUTOFF
61 = "hbase.client.log.scanner.latency.cutoff";
62 public static final String LOG_SCANNER_ACTIVITY
= "hbase.client.log.scanner.activity";
64 // Keeping LOG public as it is being used in TestScannerHeartbeatMessages
65 public static final Log LOG
= LogFactory
.getLog(ScannerCallable
.class);
66 protected long scannerId
= -1L;
67 protected boolean instantiated
= false;
68 protected boolean closed
= false;
69 protected boolean renew
= false;
70 protected final Scan scan
;
71 private int caching
= 1;
72 protected ScanMetrics scanMetrics
;
73 private boolean logScannerActivity
= false;
74 private int logCutOffLatency
= 1000;
75 protected final int id
;
81 private MoreResults moreResultsInRegion
;
82 private MoreResults moreResultsForScan
;
85 * Saves whether or not the most recent response from the server was a heartbeat message.
86 * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()}
88 protected boolean heartbeatMessage
= false;
90 protected Cursor cursor
;
92 // indicate if it is a remote server call
93 protected boolean isRegionServerRemote
= true;
94 private long nextCallSeq
= 0;
95 protected final RpcControllerFactory rpcControllerFactory
;
/**
 * Builds a callable for replica id 0 by delegating to the six-argument constructor.
 *
 * @param connection which connection
 * @param tableName table callable is on
 * @param scan the scan to execute
 * @param scanMetrics the ScanMetrics to use; if it is null, ScannerCallable won't collect
 *          metrics
 * @param rpcControllerFactory factory to use when creating
 *          {@link com.google.protobuf.RpcController}
 */
public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
    ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory) {
  this(connection, tableName, scan, scanMetrics, rpcControllerFactory, 0);
}
116 * @param id the replicaId
118 public ScannerCallable(ClusterConnection connection
, TableName tableName
, Scan scan
,
119 ScanMetrics scanMetrics
, RpcControllerFactory rpcControllerFactory
, int id
) {
120 super(connection
, tableName
, scan
.getStartRow(), rpcControllerFactory
.newController());
123 this.scanMetrics
= scanMetrics
;
124 Configuration conf
= connection
.getConfiguration();
125 logScannerActivity
= conf
.getBoolean(LOG_SCANNER_ACTIVITY
, false);
126 logCutOffLatency
= conf
.getInt(LOG_SCANNER_LATENCY_CUTOFF
, 1000);
127 this.rpcControllerFactory
= rpcControllerFactory
;
131 * @param reload force reload of server location
132 * @throws IOException
135 public void prepare(boolean reload
) throws IOException
{
136 if (Thread
.interrupted()) {
137 throw new InterruptedIOException();
139 RegionLocations rl
= RpcRetryingCallerWithReadReplicas
.getRegionLocations(!reload
,
140 id
, getConnection(), getTableName(), getRow());
141 location
= id
< rl
.size() ? rl
.getRegionLocation(id
) : null;
142 if (location
== null || location
.getServerName() == null) {
143 // With this exception, there will be a retry. The location can be null for a replica
144 // when the table is created or after a split.
145 throw new HBaseIOException("There is no location for replica id #" + id
);
147 ServerName dest
= location
.getServerName();
148 setStub(super.getConnection().getClient(dest
));
149 if (!instantiated
|| reload
) {
150 checkIfRegionServerIsRemote();
154 // check how often we retry.
156 incRPCRetriesMetrics(scanMetrics
, isRegionServerRemote
);
161 * compare the local machine hostname with region server's hostname to decide if hbase client
162 * connects to a remote region server
164 protected void checkIfRegionServerIsRemote() {
165 isRegionServerRemote
= isRemote(getLocation().getHostname());
168 private ScanResponse
next() throws IOException
{
169 // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server
170 setHeartbeatMessage(false);
171 incRPCCallsMetrics(scanMetrics
, isRegionServerRemote
);
172 ScanRequest request
= RequestConverter
.buildScanRequest(scannerId
, caching
, false, nextCallSeq
,
173 this.scanMetrics
!= null, renew
, scan
.getLimit());
175 ScanResponse response
= getStub().scan(getRpcController(), request
);
178 } catch (Exception e
) {
179 IOException ioe
= ProtobufUtil
.handleRemoteException(e
);
180 if (logScannerActivity
) {
182 "Got exception making request " + ProtobufUtil
.toText(request
) + " to " + getLocation(),
185 if (logScannerActivity
) {
186 if (ioe
instanceof UnknownScannerException
) {
188 HRegionLocation location
=
189 getConnection().relocateRegion(getTableName(), scan
.getStartRow());
190 LOG
.info("Scanner=" + scannerId
+ " expired, current region location is "
191 + location
.toString());
192 } catch (Throwable t
) {
193 LOG
.info("Failed to relocate region", t
);
195 } else if (ioe
instanceof ScannerResetException
) {
196 LOG
.info("Scanner=" + scannerId
+ " has received an exception, and the server "
197 + "asked us to reset the scanner state.",
201 // The below conversion of exceptions into DoNotRetryExceptions is a little strange.
202 // Why not just have these exceptions implement DNRIOE you ask? Well, usually we want
203 // ServerCallable#withRetries to just retry when it gets these exceptions. In here in
204 // a scan when doing a next in particular, we want to break out and get the scanner to
205 // reset itself up again. Throwing a DNRIOE is how we signal this to happen (it's ugly,
206 // yeah and hard to follow and in need of a refactor).
207 if (ioe
instanceof NotServingRegionException
) {
208 // Throw a DNRE so that we break out of cycle of calling NSRE
209 // when what we need is to open scanner against new location.
210 // Attach NSRE to signal client that it needs to re-setup scanner.
211 if (this.scanMetrics
!= null) {
212 this.scanMetrics
.countOfNSRE
.incrementAndGet();
214 throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe
);
215 } else if (ioe
instanceof RegionServerStoppedException
) {
216 // Throw a DNRE so that we break out of cycle of the retries and instead go and
217 // open scanner against new location.
218 throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe
);
220 // The outer layers will retry
226 private void setAlreadyClosed() {
227 this.scannerId
= -1L;
232 protected Result
[] rpcCall() throws Exception
{
233 if (Thread
.interrupted()) {
234 throw new InterruptedIOException();
240 ScanResponse response
;
241 if (this.scannerId
== -1L) {
242 response
= openScanner();
246 long timestamp
= System
.currentTimeMillis();
247 boolean isHeartBeat
= response
.hasHeartbeatMessage() && response
.getHeartbeatMessage();
248 setHeartbeatMessage(isHeartBeat
);
249 if (isHeartBeat
&& scan
.isNeedCursorResult() && response
.hasCursor()) {
250 cursor
= ProtobufUtil
.toCursor(response
.getCursor());
252 Result
[] rrs
= ResponseConverter
.getResults(getRpcControllerCellScanner(), response
);
253 if (logScannerActivity
) {
254 long now
= System
.currentTimeMillis();
255 if (now
- timestamp
> logCutOffLatency
) {
256 int rows
= rrs
== null ?
0 : rrs
.length
;
257 LOG
.info("Took " + (now
- timestamp
) + "ms to fetch " + rows
+ " rows from scanner="
261 updateServerSideMetrics(scanMetrics
, response
);
262 // moreResults is only used for the case where a filter exhausts all elements
263 if (response
.hasMoreResults()) {
264 if (response
.getMoreResults()) {
265 setMoreResultsForScan(MoreResults
.YES
);
267 setMoreResultsForScan(MoreResults
.NO
);
271 setMoreResultsForScan(MoreResults
.UNKNOWN
);
273 if (response
.hasMoreResultsInRegion()) {
274 if (response
.getMoreResultsInRegion()) {
275 setMoreResultsInRegion(MoreResults
.YES
);
277 setMoreResultsInRegion(MoreResults
.NO
);
281 setMoreResultsInRegion(MoreResults
.UNKNOWN
);
283 updateResultsMetrics(scanMetrics
, rrs
, isRegionServerRemote
);
288 * @return true when the most recent RPC response indicated that the response was a heartbeat
289 * message. Heartbeat messages are sent back from the server when the processing of the
290 * scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
291 * timeouts during long running scan operations.
293 boolean isHeartbeatMessage() {
294 return heartbeatMessage
;
297 public Cursor
getCursor() {
301 private void setHeartbeatMessage(boolean heartbeatMessage
) {
302 this.heartbeatMessage
= heartbeatMessage
;
305 private void close() {
306 if (this.scannerId
== -1L) {
310 incRPCCallsMetrics(scanMetrics
, isRegionServerRemote
);
311 ScanRequest request
=
312 RequestConverter
.buildScanRequest(this.scannerId
, 0, true, this.scanMetrics
!= null);
314 getStub().scan(getRpcController(), request
);
315 } catch (Exception e
) {
316 throw ProtobufUtil
.handleRemoteException(e
);
318 } catch (IOException e
) {
319 TableName table
= getTableName();
320 String tableDetails
= (table
== null) ?
"" : (" on table: " + table
.getNameAsString());
321 LOG
.warn("Ignore, probably already closed. Current scan: " + getScan().toString()
324 this.scannerId
= -1L;
327 private ScanResponse
openScanner() throws IOException
{
328 incRPCCallsMetrics(scanMetrics
, isRegionServerRemote
);
329 ScanRequest request
= RequestConverter
.buildScanRequest(
330 getLocation().getRegionInfo().getRegionName(), this.scan
, this.caching
, false);
332 ScanResponse response
= getStub().scan(getRpcController(), request
);
333 long id
= response
.getScannerId();
334 if (logScannerActivity
) {
335 LOG
.info("Open scanner=" + id
+ " for scan=" + scan
.toString()
336 + " on region " + getLocation().toString());
338 if (response
.hasMvccReadPoint()) {
339 this.scan
.setMvccReadPoint(response
.getMvccReadPoint());
343 } catch (Exception e
) {
344 throw ProtobufUtil
.handleRemoteException(e
);
348 protected Scan
getScan() {
353 * Call this when the next invocation of call should close the scanner
355 public void setClose() {
360 * Indicate whether we make a call only to renew the lease, but without affecting the scanner in
362 * @param val true if only the lease should be renewed
364 public void setRenew(boolean val
) {
369 * @return the HRegionInfo for the current region
372 public HRegionInfo
getHRegionInfo() {
376 return getLocation().getRegionInfo();
380 * Get the number of rows that will be fetched on next
381 * @return the number of rows for caching
383 public int getCaching() {
388 * Set the number of rows that will be fetched on next
389 * @param caching the number of rows for caching
391 public void setCaching(int caching
) {
392 this.caching
= caching
;
395 public ScannerCallable
getScannerCallableForReplica(int id
) {
396 ScannerCallable s
= new ScannerCallable(this.getConnection(), getTableName(),
397 this.getScan(), this.scanMetrics
, this.rpcControllerFactory
, id
);
398 s
.setCaching(this.caching
);
403 * Should the client attempt to fetch more results from this region
405 MoreResults
moreResultsInRegion() {
406 return moreResultsInRegion
;
409 void setMoreResultsInRegion(MoreResults moreResults
) {
410 this.moreResultsInRegion
= moreResults
;
414 * Should the client attempt to fetch more results for the whole scan.
416 MoreResults
moreResultsForScan() {
417 return moreResultsForScan
;
420 void setMoreResultsForScan(MoreResults moreResults
) {
421 this.moreResultsForScan
= moreResults
;