/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
;
20 import static org
.apache
.hadoop
.hbase
.util
.FutureUtils
.addListener
;
22 import java
.io
.Closeable
;
23 import java
.io
.IOException
;
24 import java
.util
.ArrayList
;
25 import java
.util
.Collections
;
26 import java
.util
.List
;
27 import java
.util
.Optional
;
28 import java
.util
.concurrent
.CompletableFuture
;
29 import java
.util
.stream
.Collectors
;
30 import org
.apache
.hadoop
.hbase
.client
.AdvancedScanResultConsumer
;
31 import org
.apache
.hadoop
.hbase
.client
.AsyncTable
;
32 import org
.apache
.hadoop
.hbase
.client
.Consistency
;
33 import org
.apache
.hadoop
.hbase
.client
.Get
;
34 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
35 import org
.apache
.hadoop
.hbase
.client
.Result
;
36 import org
.apache
.hadoop
.hbase
.client
.Scan
;
37 import org
.apache
.hadoop
.hbase
.client
.Scan
.ReadType
;
38 import org
.apache
.hadoop
.hbase
.client
.TableState
;
39 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
40 import org
.apache
.hadoop
.hbase
.util
.Pair
;
41 import org
.apache
.yetus
.audience
.InterfaceAudience
;
42 import org
.slf4j
.Logger
;
43 import org
.slf4j
.LoggerFactory
;
/**
 * The (asynchronous) meta table accessor used at client side. Used to read/write region and
 * assignment information stored in <code>hbase:meta</code>.
 * @see CatalogFamilyFormat
 */
51 @InterfaceAudience.Private
52 public final class ClientMetaTableAccessor
{
54 private static final Logger LOG
= LoggerFactory
.getLogger(ClientMetaTableAccessor
.class);
/** Private constructor: the class is final and is used through its static members. */
private ClientMetaTableAccessor() {
}
59 @InterfaceAudience.Private
60 public enum QueryType
{
61 ALL(HConstants
.TABLE_FAMILY
, HConstants
.CATALOG_FAMILY
), REGION(HConstants
.CATALOG_FAMILY
),
62 TABLE(HConstants
.TABLE_FAMILY
), REPLICATION(HConstants
.REPLICATION_BARRIER_FAMILY
);
64 private final byte[][] families
;
66 QueryType(byte[]... families
) {
67 this.families
= families
;
70 byte[][] getFamilies() {
75 public static CompletableFuture
<Boolean
> tableExists(AsyncTable
<?
> metaTable
,
76 TableName tableName
) {
77 return getTableState(metaTable
, tableName
).thenApply(Optional
::isPresent
);
80 public static CompletableFuture
<Optional
<TableState
>> getTableState(AsyncTable
<?
> metaTable
,
81 TableName tableName
) {
82 CompletableFuture
<Optional
<TableState
>> future
= new CompletableFuture
<>();
83 Get get
= new Get(tableName
.getName()).addColumn(HConstants
.TABLE_FAMILY
,
84 HConstants
.TABLE_STATE_QUALIFIER
);
85 addListener(metaTable
.get(get
), (result
, error
) -> {
87 future
.completeExceptionally(error
);
91 future
.complete(getTableState(result
));
92 } catch (IOException e
) {
93 future
.completeExceptionally(e
);
100 * Returns the HRegionLocation from meta for the given region
102 * @param regionName region we're looking for
103 * @return HRegionLocation for the given region
105 public static CompletableFuture
<Optional
<HRegionLocation
>>
106 getRegionLocation(AsyncTable
<?
> metaTable
, byte[] regionName
) {
107 CompletableFuture
<Optional
<HRegionLocation
>> future
= new CompletableFuture
<>();
109 RegionInfo parsedRegionInfo
= CatalogFamilyFormat
.parseRegionInfoFromRegionName(regionName
);
110 addListener(metaTable
.get(new Get(CatalogFamilyFormat
.getMetaKeyForRegion(parsedRegionInfo
))
111 .addFamily(HConstants
.CATALOG_FAMILY
)), (r
, err
) -> {
113 future
.completeExceptionally(err
);
116 future
.complete(getRegionLocations(r
)
117 .map(locations
-> locations
.getRegionLocation(parsedRegionInfo
.getReplicaId())));
119 } catch (IOException parseEx
) {
120 LOG
.warn("Failed to parse the passed region name: " + Bytes
.toStringBinary(regionName
));
121 future
.completeExceptionally(parseEx
);
127 * Returns the HRegionLocation from meta for the given encoded region name
129 * @param encodedRegionName region we're looking for
130 * @return HRegionLocation for the given region
132 public static CompletableFuture
<Optional
<HRegionLocation
>>
133 getRegionLocationWithEncodedName(AsyncTable
<?
> metaTable
, byte[] encodedRegionName
) {
134 CompletableFuture
<Optional
<HRegionLocation
>> future
= new CompletableFuture
<>();
137 .scanAll(new Scan().setReadType(ReadType
.PREAD
).addFamily(HConstants
.CATALOG_FAMILY
)),
140 future
.completeExceptionally(err
);
143 String encodedRegionNameStr
= Bytes
.toString(encodedRegionName
);
144 results
.stream().filter(result
-> !result
.isEmpty())
145 .filter(result
-> CatalogFamilyFormat
.getRegionInfo(result
) != null).forEach(result
-> {
146 getRegionLocations(result
).ifPresent(locations
-> {
147 for (HRegionLocation location
: locations
.getRegionLocations()) {
148 if (location
!= null &&
149 encodedRegionNameStr
.equals(location
.getRegion().getEncodedName())) {
150 future
.complete(Optional
.of(location
));
156 future
.complete(Optional
.empty());
161 private static Optional
<TableState
> getTableState(Result r
) throws IOException
{
162 return Optional
.ofNullable(CatalogFamilyFormat
.getTableState(r
));
166 * Used to get all region locations for the specific table.
168 * @param tableName table we're looking for, can be null for getting all regions
169 * @return the list of region locations. The return value will be wrapped by a
170 * {@link CompletableFuture}.
172 public static CompletableFuture
<List
<HRegionLocation
>> getTableHRegionLocations(
173 AsyncTable
<AdvancedScanResultConsumer
> metaTable
, TableName tableName
) {
174 CompletableFuture
<List
<HRegionLocation
>> future
= new CompletableFuture
<>();
175 addListener(getTableRegionsAndLocations(metaTable
, tableName
, true), (locations
, err
) -> {
177 future
.completeExceptionally(err
);
178 } else if (locations
== null || locations
.isEmpty()) {
179 future
.complete(Collections
.emptyList());
181 List
<HRegionLocation
> regionLocations
=
182 locations
.stream().map(loc
-> new HRegionLocation(loc
.getFirst(), loc
.getSecond()))
183 .collect(Collectors
.toList());
184 future
.complete(regionLocations
);
191 * Used to get table regions' info and server.
193 * @param tableName table we're looking for, can be null for getting all regions
194 * @param excludeOfflinedSplitParents don't return split parents
195 * @return the list of regioninfos and server. The return value will be wrapped by a
196 * {@link CompletableFuture}.
198 private static CompletableFuture
<List
<Pair
<RegionInfo
, ServerName
>>> getTableRegionsAndLocations(
199 final AsyncTable
<AdvancedScanResultConsumer
> metaTable
, final TableName tableName
,
200 final boolean excludeOfflinedSplitParents
) {
201 CompletableFuture
<List
<Pair
<RegionInfo
, ServerName
>>> future
= new CompletableFuture
<>();
202 if (TableName
.META_TABLE_NAME
.equals(tableName
)) {
203 future
.completeExceptionally(new IOException(
204 "This method can't be used to locate meta regions;" + " use MetaTableLocator instead"));
207 // Make a version of CollectingVisitor that collects RegionInfo and ServerAddress
208 CollectRegionLocationsVisitor visitor
=
209 new CollectRegionLocationsVisitor(excludeOfflinedSplitParents
);
211 addListener(scanMeta(metaTable
, tableName
, QueryType
.REGION
, visitor
), (v
, error
) -> {
213 future
.completeExceptionally(error
);
216 future
.complete(visitor
.getResults());
222 * Performs a scan of META table for given table.
224 * @param tableName table withing we scan
225 * @param type scanned part of meta
226 * @param visitor Visitor invoked against each row
228 private static CompletableFuture
<Void
> scanMeta(AsyncTable
<AdvancedScanResultConsumer
> metaTable
,
229 TableName tableName
, QueryType type
, final Visitor visitor
) {
230 return scanMeta(metaTable
, getTableStartRowForMeta(tableName
, type
),
231 getTableStopRowForMeta(tableName
, type
), type
, Integer
.MAX_VALUE
, visitor
);
235 * Performs a scan of META table for given table.
237 * @param startRow Where to start the scan
238 * @param stopRow Where to stop the scan
239 * @param type scanned part of meta
240 * @param maxRows maximum rows to return
241 * @param visitor Visitor invoked against each row
243 private static CompletableFuture
<Void
> scanMeta(AsyncTable
<AdvancedScanResultConsumer
> metaTable
,
244 byte[] startRow
, byte[] stopRow
, QueryType type
, int maxRows
, final Visitor visitor
) {
245 int rowUpperLimit
= maxRows
> 0 ? maxRows
: Integer
.MAX_VALUE
;
246 Scan scan
= getMetaScan(metaTable
, rowUpperLimit
);
247 for (byte[] family
: type
.getFamilies()) {
248 scan
.addFamily(family
);
250 if (startRow
!= null) {
251 scan
.withStartRow(startRow
);
253 if (stopRow
!= null) {
254 scan
.withStopRow(stopRow
);
257 if (LOG
.isDebugEnabled()) {
258 LOG
.debug("Scanning META" + " starting at row=" + Bytes
.toStringBinary(scan
.getStartRow()) +
259 " stopping at row=" + Bytes
.toStringBinary(scan
.getStopRow()) + " for max=" +
260 rowUpperLimit
+ " with caching=" + scan
.getCaching());
263 CompletableFuture
<Void
> future
= new CompletableFuture
<Void
>();
264 metaTable
.scan(scan
, new MetaTableScanResultConsumer(rowUpperLimit
, visitor
, future
));
268 private static final class MetaTableScanResultConsumer
implements AdvancedScanResultConsumer
{
270 private int currentRowCount
;
272 private final int rowUpperLimit
;
274 private final Visitor visitor
;
276 private final CompletableFuture
<Void
> future
;
278 MetaTableScanResultConsumer(int rowUpperLimit
, Visitor visitor
,
279 CompletableFuture
<Void
> future
) {
280 this.rowUpperLimit
= rowUpperLimit
;
281 this.visitor
= visitor
;
282 this.future
= future
;
283 this.currentRowCount
= 0;
287 public void onError(Throwable error
) {
288 future
.completeExceptionally(error
);
292 @edu.umd
.cs
.findbugs
.annotations
.SuppressWarnings(value
= "NP_NONNULL_PARAM_VIOLATION",
293 justification
= "https://github.com/findbugsproject/findbugs/issues/79")
294 public void onComplete() {
295 future
.complete(null);
299 public void onNext(Result
[] results
, ScanController controller
) {
300 boolean terminateScan
= false;
301 for (Result result
: results
) {
303 if (!visitor
.visit(result
)) {
304 terminateScan
= true;
307 } catch (Exception e
) {
308 future
.completeExceptionally(e
);
309 terminateScan
= true;
312 if (++currentRowCount
>= rowUpperLimit
) {
313 terminateScan
= true;
318 controller
.terminate();
324 * Implementations 'visit' a catalog table row.
326 public interface Visitor
{
328 * Visit the catalog table row.
329 * @param r A row from catalog table
330 * @return True if we are to proceed scanning the table, else false if we are to stop now.
332 boolean visit(final Result r
) throws IOException
;
336 * Implementations 'visit' a catalog table row but with close() at the end.
338 public interface CloseableVisitor
extends Visitor
, Closeable
{
342 * A {@link Visitor} that collects content out of passed {@link Result}.
344 private static abstract class CollectingVisitor
<T
> implements Visitor
{
345 final List
<T
> results
= new ArrayList
<>();
348 public boolean visit(Result r
) throws IOException
{
349 if (r
!= null && !r
.isEmpty()) {
355 abstract void add(Result r
);
358 * @return Collected results; wait till visits complete to collect all possible results
360 List
<T
> getResults() {
365 static class CollectRegionLocationsVisitor
366 extends CollectingVisitor
<Pair
<RegionInfo
, ServerName
>> {
368 private final boolean excludeOfflinedSplitParents
;
370 private RegionLocations current
= null;
372 CollectRegionLocationsVisitor(boolean excludeOfflinedSplitParents
) {
373 this.excludeOfflinedSplitParents
= excludeOfflinedSplitParents
;
377 public boolean visit(Result r
) throws IOException
{
378 Optional
<RegionLocations
> currentRegionLocations
= getRegionLocations(r
);
379 current
= currentRegionLocations
.orElse(null);
380 if (current
== null || current
.getRegionLocation().getRegion() == null) {
381 LOG
.warn("No serialized RegionInfo in " + r
);
384 RegionInfo hri
= current
.getRegionLocation().getRegion();
385 if (excludeOfflinedSplitParents
&& hri
.isSplitParent()) {
388 // Else call super and add this Result to the collection.
389 return super.visit(r
);
394 if (current
== null) {
397 for (HRegionLocation loc
: current
.getRegionLocations()) {
399 this.results
.add(new Pair
<RegionInfo
, ServerName
>(loc
.getRegion(), loc
.getServerName()));
406 * Collects all returned.
408 static class CollectAllVisitor
extends CollectingVisitor
<Result
> {
415 private static Scan
getMetaScan(AsyncTable
<?
> metaTable
, int rowUpperLimit
) {
416 Scan scan
= new Scan();
417 int scannerCaching
= metaTable
.getConfiguration().getInt(HConstants
.HBASE_META_SCANNER_CACHING
,
418 HConstants
.DEFAULT_HBASE_META_SCANNER_CACHING
);
419 if (metaTable
.getConfiguration().getBoolean(HConstants
.USE_META_REPLICAS
,
420 HConstants
.DEFAULT_USE_META_REPLICAS
)) {
421 scan
.setConsistency(Consistency
.TIMELINE
);
423 if (rowUpperLimit
<= scannerCaching
) {
424 scan
.setLimit(rowUpperLimit
);
426 int rows
= Math
.min(rowUpperLimit
, scannerCaching
);
427 scan
.setCaching(rows
);
432 * Returns an HRegionLocationList extracted from the result.
433 * @return an HRegionLocationList containing all locations for the region range or null if we
434 * can't deserialize the result.
436 private static Optional
<RegionLocations
> getRegionLocations(Result r
) {
437 return Optional
.ofNullable(CatalogFamilyFormat
.getRegionLocations(r
));
441 * @param tableName table we're working with
442 * @return start row for scanning META according to query type
444 public static byte[] getTableStartRowForMeta(TableName tableName
, QueryType type
) {
445 if (tableName
== null) {
451 byte[] startRow
= new byte[tableName
.getName().length
+ 2];
452 System
.arraycopy(tableName
.getName(), 0, startRow
, 0, tableName
.getName().length
);
453 startRow
[startRow
.length
- 2] = HConstants
.DELIMITER
;
454 startRow
[startRow
.length
- 1] = HConstants
.DELIMITER
;
460 return tableName
.getName();
466 * @param tableName table we're working with
467 * @return stop row for scanning META according to query type
469 public static byte[] getTableStopRowForMeta(TableName tableName
, QueryType type
) {
470 if (tableName
== null) {
473 final byte[] stopRow
;
477 stopRow
= new byte[tableName
.getName().length
+ 3];
478 System
.arraycopy(tableName
.getName(), 0, stopRow
, 0, tableName
.getName().length
);
479 stopRow
[stopRow
.length
- 3] = ' ';
480 stopRow
[stopRow
.length
- 2] = HConstants
.DELIMITER
;
481 stopRow
[stopRow
.length
- 1] = HConstants
.DELIMITER
;
487 stopRow
= new byte[tableName
.getName().length
+ 1];
488 System
.arraycopy(tableName
.getName(), 0, stopRow
, 0, tableName
.getName().length
);
489 stopRow
[stopRow
.length
- 1] = ' ';