2 * Copyright The Apache Software Foundation
4 * Licensed to the Apache Software Foundation (ASF) under one or more
5 * contributor license agreements. See the NOTICE file distributed with this
6 * work for additional information regarding copyright ownership. The ASF
7 * licenses this file to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16 * License for the specific language governing permissions and limitations
19 package org
.apache
.hadoop
.hbase
.client
;
21 import java
.io
.IOException
;
22 import java
.io
.InterruptedIOException
;
23 import java
.util
.ArrayList
;
24 import java
.util
.List
;
26 import org
.apache
.hadoop
.hbase
.DoNotRetryIOException
;
27 import org
.apache
.hadoop
.hbase
.HConstants
;
28 import org
.apache
.hadoop
.hbase
.HRegionLocation
;
29 import org
.apache
.hadoop
.hbase
.RegionLocations
;
30 import org
.apache
.hadoop
.hbase
.TableName
;
31 import org
.apache
.hadoop
.hbase
.classification
.InterfaceAudience
;
32 import org
.apache
.hadoop
.hbase
.client
.metrics
.ScanMetrics
;
33 import org
.apache
.hadoop
.hbase
.ipc
.RpcControllerFactory
;
34 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
38 * A reversed ScannerCallable which supports backward scanning.
40 @InterfaceAudience.Private
41 public class ReversedScannerCallable
extends ScannerCallable
{
43 * The start row for locating regions. In reversed scanner, may locate the
44 * regions for a range of keys when doing
45 * {@link ReversedClientScanner#nextScanner(int, boolean)}
47 protected final byte[] locateStartRow
;
54 * @param locateStartRow The start row for locating regions
55 * @param rpcFactory to create an {@link com.google.protobuf.RpcController}
56 * to talk to the regionserver
58 public ReversedScannerCallable(ClusterConnection connection
, TableName tableName
, Scan scan
,
59 ScanMetrics scanMetrics
, byte[] locateStartRow
, RpcControllerFactory rpcFactory
) {
60 super(connection
, tableName
, scan
, scanMetrics
, rpcFactory
);
61 this.locateStartRow
= locateStartRow
;
69 * @param locateStartRow The start row for locating regions
70 * @param rpcFactory to create an {@link com.google.protobuf.RpcController}
71 * to talk to the regionserver
72 * @param replicaId the replica id
74 public ReversedScannerCallable(ClusterConnection connection
, TableName tableName
, Scan scan
,
75 ScanMetrics scanMetrics
, byte[] locateStartRow
, RpcControllerFactory rpcFactory
,
77 super(connection
, tableName
, scan
, scanMetrics
, rpcFactory
, replicaId
);
78 this.locateStartRow
= locateStartRow
;
83 * {@link #ReversedScannerCallable(ClusterConnection, TableName, Scan,
84 * ScanMetrics, byte[], RpcControllerFactory )}
87 public ReversedScannerCallable(ClusterConnection connection
, TableName tableName
,
88 Scan scan
, ScanMetrics scanMetrics
, byte[] locateStartRow
) {
89 this(connection
, tableName
, scan
, scanMetrics
, locateStartRow
, RpcControllerFactory
90 .instantiate(connection
.getConfiguration()));
94 * @param reload force reload of server location
98 public void prepare(boolean reload
) throws IOException
{
99 if (Thread
.interrupted()) {
100 throw new InterruptedIOException();
102 if (!instantiated
|| reload
) {
103 if (locateStartRow
== null) {
104 // Just locate the region with the row
105 RegionLocations rl
= RpcRetryingCallerWithReadReplicas
.getRegionLocations(reload
, id
,
106 getConnection(), tableName
, row
);
107 this.location
= id
< rl
.size() ? rl
.getRegionLocation(id
) : null;
108 if (this.location
== null) {
109 throw new IOException("Failed to find location, tableName="
110 + tableName
+ ", row=" + Bytes
.toStringBinary(row
) + ", reload="
114 // Need to locate the regions with the range, and the target location is
115 // the last one which is the previous region of last region scanner
116 List
<HRegionLocation
> locatedRegions
= locateRegionsInRange(
117 locateStartRow
, row
, reload
);
118 if (locatedRegions
.isEmpty()) {
119 throw new DoNotRetryIOException(
120 "Does hbase:meta exist hole? Couldn't get regions for the range from "
121 + Bytes
.toStringBinary(locateStartRow
) + " to "
122 + Bytes
.toStringBinary(row
));
124 this.location
= locatedRegions
.get(locatedRegions
.size() - 1);
126 setStub(getConnection().getClient(getLocation().getServerName()));
127 checkIfRegionServerIsRemote();
131 // check how often we retry.
132 if (reload
&& this.scanMetrics
!= null) {
133 this.scanMetrics
.countOfRPCRetries
.incrementAndGet();
134 if (isRegionServerRemote
) {
135 this.scanMetrics
.countOfRemoteRPCRetries
.incrementAndGet();
141 * Get the corresponding regions for an arbitrary range of keys.
142 * @param startKey Starting row in range, inclusive
143 * @param endKey Ending row in range, exclusive
144 * @param reload force reload of server location
145 * @return A list of HRegionLocation corresponding to the regions that contain
146 * the specified range
147 * @throws IOException
149 private List
<HRegionLocation
> locateRegionsInRange(byte[] startKey
,
150 byte[] endKey
, boolean reload
) throws IOException
{
151 final boolean endKeyIsEndOfTable
= Bytes
.equals(endKey
,
152 HConstants
.EMPTY_END_ROW
);
153 if ((Bytes
.compareTo(startKey
, endKey
) > 0) && !endKeyIsEndOfTable
) {
154 throw new IllegalArgumentException("Invalid range: "
155 + Bytes
.toStringBinary(startKey
) + " > "
156 + Bytes
.toStringBinary(endKey
));
158 List
<HRegionLocation
> regionList
= new ArrayList
<HRegionLocation
>();
159 byte[] currentKey
= startKey
;
161 RegionLocations rl
= RpcRetryingCallerWithReadReplicas
.getRegionLocations(reload
, id
,
162 getConnection(), tableName
, currentKey
);
163 HRegionLocation regionLocation
= id
< rl
.size() ? rl
.getRegionLocation(id
) : null;
164 if (regionLocation
!= null && regionLocation
.getRegionInfo().containsRow(currentKey
)) {
165 regionList
.add(regionLocation
);
167 throw new DoNotRetryIOException("Does hbase:meta exist hole? Locating row "
168 + Bytes
.toStringBinary(currentKey
) + " returns incorrect region "
169 + (regionLocation
== null ?
null : regionLocation
.getRegionInfo()));
171 currentKey
= regionLocation
.getRegionInfo().getEndKey();
172 } while (!Bytes
.equals(currentKey
, HConstants
.EMPTY_END_ROW
)
173 && (endKeyIsEndOfTable
|| Bytes
.compareTo(currentKey
, endKey
) < 0));
178 public ScannerCallable
getScannerCallableForReplica(int id
) {
179 ReversedScannerCallable r
= new ReversedScannerCallable(this.cConnection
, this.tableName
,
180 this.getScan(), this.scanMetrics
, this.locateStartRow
, controllerFactory
, id
);
181 r
.setCaching(this.getCaching());