HBASE-18420 Some methods of Admin don't use ColumnFamilyDescriptor
[hbase.git] / hbase-client / src / main / java / org / apache / hadoop / hbase / client / ScannerCallable.java
blob4227e41eacb9e41e282492fb192d4ade8890852c
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org.apache.hadoop.hbase.client;
21 import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
22 import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
23 import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
24 import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
25 import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;
27 import java.io.IOException;
28 import java.io.InterruptedIOException;
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.hbase.DoNotRetryIOException;
34 import org.apache.hadoop.hbase.HBaseIOException;
35 import org.apache.hadoop.hbase.HRegionInfo;
36 import org.apache.hadoop.hbase.HRegionLocation;
37 import org.apache.hadoop.hbase.NotServingRegionException;
38 import org.apache.hadoop.hbase.RegionLocations;
39 import org.apache.hadoop.hbase.ServerName;
40 import org.apache.hadoop.hbase.TableName;
41 import org.apache.hadoop.hbase.UnknownScannerException;
42 import org.apache.hadoop.hbase.classification.InterfaceAudience;
43 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
44 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
45 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
46 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
47 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
48 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
49 import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
50 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
51 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
53 /**
54 * Scanner operations such as create, next, etc.
55 * Used by {@link ResultScanner}s made by {@link Table}. Passed to a retrying caller such as
56 * {@link RpcRetryingCaller} so fails are retried.
58 @InterfaceAudience.Private
59 public class ScannerCallable extends ClientServiceCallable<Result[]> {
60 public static final String LOG_SCANNER_LATENCY_CUTOFF
61 = "hbase.client.log.scanner.latency.cutoff";
62 public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity";
64 // Keeping LOG public as it is being used in TestScannerHeartbeatMessages
65 public static final Log LOG = LogFactory.getLog(ScannerCallable.class);
66 protected long scannerId = -1L;
67 protected boolean instantiated = false;
68 protected boolean closed = false;
69 protected boolean renew = false;
70 protected final Scan scan;
71 private int caching = 1;
72 protected ScanMetrics scanMetrics;
73 private boolean logScannerActivity = false;
74 private int logCutOffLatency = 1000;
75 protected final int id;
77 enum MoreResults {
78 YES, NO, UNKNOWN
81 private MoreResults moreResultsInRegion;
82 private MoreResults moreResultsForScan;
84 /**
85 * Saves whether or not the most recent response from the server was a heartbeat message.
86 * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()}
88 protected boolean heartbeatMessage = false;
90 protected Cursor cursor;
92 // indicate if it is a remote server call
93 protected boolean isRegionServerRemote = true;
94 private long nextCallSeq = 0;
95 protected final RpcControllerFactory rpcControllerFactory;
97 /**
98 * @param connection which connection
99 * @param tableName table callable is on
100 * @param scan the scan to execute
101 * @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable won't collect
102 * metrics
103 * @param rpcControllerFactory factory to use when creating
104 * {@link com.google.protobuf.RpcController}
106 public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
107 ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory) {
108 this(connection, tableName, scan, scanMetrics, rpcControllerFactory, 0);
112 * @param connection
113 * @param tableName
114 * @param scan
115 * @param scanMetrics
116 * @param id the replicaId
118 public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
119 ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
120 super(connection, tableName, scan.getStartRow(), rpcControllerFactory.newController());
121 this.id = id;
122 this.scan = scan;
123 this.scanMetrics = scanMetrics;
124 Configuration conf = connection.getConfiguration();
125 logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false);
126 logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000);
127 this.rpcControllerFactory = rpcControllerFactory;
131 * @param reload force reload of server location
132 * @throws IOException
134 @Override
135 public void prepare(boolean reload) throws IOException {
136 if (Thread.interrupted()) {
137 throw new InterruptedIOException();
139 RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload,
140 id, getConnection(), getTableName(), getRow());
141 location = id < rl.size() ? rl.getRegionLocation(id) : null;
142 if (location == null || location.getServerName() == null) {
143 // With this exception, there will be a retry. The location can be null for a replica
144 // when the table is created or after a split.
145 throw new HBaseIOException("There is no location for replica id #" + id);
147 ServerName dest = location.getServerName();
148 setStub(super.getConnection().getClient(dest));
149 if (!instantiated || reload) {
150 checkIfRegionServerIsRemote();
151 instantiated = true;
153 cursor = null;
154 // check how often we retry.
155 if (reload) {
156 incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
161 * compare the local machine hostname with region server's hostname to decide if hbase client
162 * connects to a remote region server
164 protected void checkIfRegionServerIsRemote() {
165 isRegionServerRemote = isRemote(getLocation().getHostname());
168 private ScanResponse next() throws IOException {
169 // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server
170 setHeartbeatMessage(false);
171 incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
172 ScanRequest request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq,
173 this.scanMetrics != null, renew, scan.getLimit());
174 try {
175 ScanResponse response = getStub().scan(getRpcController(), request);
176 nextCallSeq++;
177 return response;
178 } catch (Exception e) {
179 IOException ioe = ProtobufUtil.handleRemoteException(e);
180 if (logScannerActivity) {
181 LOG.info(
182 "Got exception making request " + ProtobufUtil.toText(request) + " to " + getLocation(),
185 if (logScannerActivity) {
186 if (ioe instanceof UnknownScannerException) {
187 try {
188 HRegionLocation location =
189 getConnection().relocateRegion(getTableName(), scan.getStartRow());
190 LOG.info("Scanner=" + scannerId + " expired, current region location is "
191 + location.toString());
192 } catch (Throwable t) {
193 LOG.info("Failed to relocate region", t);
195 } else if (ioe instanceof ScannerResetException) {
196 LOG.info("Scanner=" + scannerId + " has received an exception, and the server "
197 + "asked us to reset the scanner state.",
198 ioe);
201 // The below convertion of exceptions into DoNotRetryExceptions is a little strange.
202 // Why not just have these exceptions implment DNRIOE you ask? Well, usually we want
203 // ServerCallable#withRetries to just retry when it gets these exceptions. In here in
204 // a scan when doing a next in particular, we want to break out and get the scanner to
205 // reset itself up again. Throwing a DNRIOE is how we signal this to happen (its ugly,
206 // yeah and hard to follow and in need of a refactor).
207 if (ioe instanceof NotServingRegionException) {
208 // Throw a DNRE so that we break out of cycle of calling NSRE
209 // when what we need is to open scanner against new location.
210 // Attach NSRE to signal client that it needs to re-setup scanner.
211 if (this.scanMetrics != null) {
212 this.scanMetrics.countOfNSRE.incrementAndGet();
214 throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
215 } else if (ioe instanceof RegionServerStoppedException) {
216 // Throw a DNRE so that we break out of cycle of the retries and instead go and
217 // open scanner against new location.
218 throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
219 } else {
220 // The outer layers will retry
221 throw ioe;
226 private void setAlreadyClosed() {
227 this.scannerId = -1L;
228 this.closed = true;
231 @Override
232 protected Result[] rpcCall() throws Exception {
233 if (Thread.interrupted()) {
234 throw new InterruptedIOException();
236 if (closed) {
237 close();
238 return null;
240 ScanResponse response;
241 if (this.scannerId == -1L) {
242 response = openScanner();
243 } else {
244 response = next();
246 long timestamp = System.currentTimeMillis();
247 boolean isHeartBeat = response.hasHeartbeatMessage() && response.getHeartbeatMessage();
248 setHeartbeatMessage(isHeartBeat);
249 if (isHeartBeat && scan.isNeedCursorResult() && response.hasCursor()) {
250 cursor = ProtobufUtil.toCursor(response.getCursor());
252 Result[] rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response);
253 if (logScannerActivity) {
254 long now = System.currentTimeMillis();
255 if (now - timestamp > logCutOffLatency) {
256 int rows = rrs == null ? 0 : rrs.length;
257 LOG.info("Took " + (now - timestamp) + "ms to fetch " + rows + " rows from scanner="
258 + scannerId);
261 updateServerSideMetrics(scanMetrics, response);
262 // moreResults is only used for the case where a filter exhausts all elements
263 if (response.hasMoreResults()) {
264 if (response.getMoreResults()) {
265 setMoreResultsForScan(MoreResults.YES);
266 } else {
267 setMoreResultsForScan(MoreResults.NO);
268 setAlreadyClosed();
270 } else {
271 setMoreResultsForScan(MoreResults.UNKNOWN);
273 if (response.hasMoreResultsInRegion()) {
274 if (response.getMoreResultsInRegion()) {
275 setMoreResultsInRegion(MoreResults.YES);
276 } else {
277 setMoreResultsInRegion(MoreResults.NO);
278 setAlreadyClosed();
280 } else {
281 setMoreResultsInRegion(MoreResults.UNKNOWN);
283 updateResultsMetrics(scanMetrics, rrs, isRegionServerRemote);
284 return rrs;
288 * @return true when the most recent RPC response indicated that the response was a heartbeat
289 * message. Heartbeat messages are sent back from the server when the processing of the
290 * scan request exceeds a certain time threshold. Heartbeats allow the server to avoid
291 * timeouts during long running scan operations.
293 boolean isHeartbeatMessage() {
294 return heartbeatMessage;
297 public Cursor getCursor() {
298 return cursor;
301 private void setHeartbeatMessage(boolean heartbeatMessage) {
302 this.heartbeatMessage = heartbeatMessage;
305 private void close() {
306 if (this.scannerId == -1L) {
307 return;
309 try {
310 incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
311 ScanRequest request =
312 RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null);
313 try {
314 getStub().scan(getRpcController(), request);
315 } catch (Exception e) {
316 throw ProtobufUtil.handleRemoteException(e);
318 } catch (IOException e) {
319 TableName table = getTableName();
320 String tableDetails = (table == null) ? "" : (" on table: " + table.getNameAsString());
321 LOG.warn("Ignore, probably already closed. Current scan: " + getScan().toString()
322 + tableDetails, e);
324 this.scannerId = -1L;
327 private ScanResponse openScanner() throws IOException {
328 incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
329 ScanRequest request = RequestConverter.buildScanRequest(
330 getLocation().getRegionInfo().getRegionName(), this.scan, this.caching, false);
331 try {
332 ScanResponse response = getStub().scan(getRpcController(), request);
333 long id = response.getScannerId();
334 if (logScannerActivity) {
335 LOG.info("Open scanner=" + id + " for scan=" + scan.toString()
336 + " on region " + getLocation().toString());
338 if (response.hasMvccReadPoint()) {
339 this.scan.setMvccReadPoint(response.getMvccReadPoint());
341 this.scannerId = id;
342 return response;
343 } catch (Exception e) {
344 throw ProtobufUtil.handleRemoteException(e);
348 protected Scan getScan() {
349 return scan;
353 * Call this when the next invocation of call should close the scanner
355 public void setClose() {
356 this.closed = true;
360 * Indicate whether we make a call only to renew the lease, but without affected the scanner in
361 * any other way.
362 * @param val true if only the lease should be renewed
364 public void setRenew(boolean val) {
365 this.renew = val;
369 * @return the HRegionInfo for the current region
371 @Override
372 public HRegionInfo getHRegionInfo() {
373 if (!instantiated) {
374 return null;
376 return getLocation().getRegionInfo();
380 * Get the number of rows that will be fetched on next
381 * @return the number of rows for caching
383 public int getCaching() {
384 return caching;
388 * Set the number of rows that will be fetched on next
389 * @param caching the number of rows for caching
391 public void setCaching(int caching) {
392 this.caching = caching;
395 public ScannerCallable getScannerCallableForReplica(int id) {
396 ScannerCallable s = new ScannerCallable(this.getConnection(), getTableName(),
397 this.getScan(), this.scanMetrics, this.rpcControllerFactory, id);
398 s.setCaching(this.caching);
399 return s;
403 * Should the client attempt to fetch more results from this region
405 MoreResults moreResultsInRegion() {
406 return moreResultsInRegion;
409 void setMoreResultsInRegion(MoreResults moreResults) {
410 this.moreResultsInRegion = moreResults;
414 * Should the client attempt to fetch more results for the whole scan.
416 MoreResults moreResultsForScan() {
417 return moreResultsForScan;
420 void setMoreResultsForScan(MoreResults moreResults) {
421 this.moreResultsForScan = moreResults;