/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
;
20 import static org
.apache
.hadoop
.hbase
.TableName
.META_TABLE_NAME
;
22 import java
.io
.IOException
;
23 import java
.util
.ArrayList
;
24 import java
.util
.Arrays
;
25 import java
.util
.Collections
;
26 import java
.util
.List
;
28 import java
.util
.NavigableMap
;
29 import java
.util
.Optional
;
30 import java
.util
.SortedMap
;
31 import java
.util
.concurrent
.CompletableFuture
;
32 import java
.util
.regex
.Matcher
;
33 import java
.util
.regex
.Pattern
;
34 import java
.util
.stream
.Collectors
;
36 import org
.apache
.hadoop
.hbase
.MetaTableAccessor
.CollectingVisitor
;
37 import org
.apache
.hadoop
.hbase
.MetaTableAccessor
.QueryType
;
38 import org
.apache
.hadoop
.hbase
.MetaTableAccessor
.Visitor
;
39 import org
.apache
.hadoop
.hbase
.client
.AdvancedScanResultConsumer
;
40 import org
.apache
.hadoop
.hbase
.client
.AsyncTable
;
41 import org
.apache
.hadoop
.hbase
.client
.Consistency
;
42 import org
.apache
.hadoop
.hbase
.client
.Get
;
43 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
44 import org
.apache
.hadoop
.hbase
.client
.RegionReplicaUtil
;
45 import org
.apache
.hadoop
.hbase
.client
.Result
;
46 import org
.apache
.hadoop
.hbase
.client
.Scan
;
47 import org
.apache
.hadoop
.hbase
.client
.Scan
.ReadType
;
48 import org
.apache
.hadoop
.hbase
.client
.TableState
;
49 import org
.apache
.hadoop
.hbase
.exceptions
.DeserializationException
;
50 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
51 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
52 import org
.apache
.hadoop
.hbase
.util
.Pair
;
53 import org
.apache
.yetus
.audience
.InterfaceAudience
;
54 import org
.slf4j
.Logger
;
55 import org
.slf4j
.LoggerFactory
;
/**
 * The asynchronous meta table accessor. Used to read/write region and assignment information stored
 * in <code>hbase:meta</code>.
 */
62 @InterfaceAudience.Private
63 public class AsyncMetaTableAccessor
{
65 private static final Logger LOG
= LoggerFactory
.getLogger(AsyncMetaTableAccessor
.class);
68 /** The delimiter for meta columns for replicaIds > 0 */
69 private static final char META_REPLICA_ID_DELIMITER
= '_';
71 /** A regex for parsing server columns from meta. See above javadoc for meta layout */
72 private static final Pattern SERVER_COLUMN_PATTERN
= Pattern
73 .compile("^server(_[0-9a-fA-F]{4})?$");
75 public static CompletableFuture
<Boolean
> tableExists(AsyncTable
<?
> metaTable
,
76 TableName tableName
) {
77 if (tableName
.equals(META_TABLE_NAME
)) {
78 return CompletableFuture
.completedFuture(true);
80 return getTableState(metaTable
, tableName
).thenApply(Optional
::isPresent
);
83 public static CompletableFuture
<Optional
<TableState
>> getTableState(AsyncTable
<?
> metaTable
,
84 TableName tableName
) {
85 CompletableFuture
<Optional
<TableState
>> future
= new CompletableFuture
<>();
86 Get get
= new Get(tableName
.getName()).addColumn(getTableFamily(), getStateColumn());
87 long time
= EnvironmentEdgeManager
.currentTime();
89 get
.setTimeRange(0, time
);
90 metaTable
.get(get
).whenComplete((result
, error
) -> {
92 future
.completeExceptionally(error
);
96 future
.complete(getTableState(result
));
97 } catch (IOException e
) {
98 future
.completeExceptionally(e
);
101 } catch (IOException ioe
) {
102 future
.completeExceptionally(ioe
);
108 * Returns the HRegionLocation from meta for the given region
110 * @param regionName region we're looking for
111 * @return HRegionLocation for the given region
113 public static CompletableFuture
<Optional
<HRegionLocation
>> getRegionLocation(
114 AsyncTable
<?
> metaTable
, byte[] regionName
) {
115 CompletableFuture
<Optional
<HRegionLocation
>> future
= new CompletableFuture
<>();
117 RegionInfo parsedRegionInfo
= MetaTableAccessor
.parseRegionInfoFromRegionName(regionName
);
119 new Get(MetaTableAccessor
.getMetaKeyForRegion(parsedRegionInfo
))
120 .addFamily(HConstants
.CATALOG_FAMILY
)).whenComplete(
123 future
.completeExceptionally(err
);
126 future
.complete(getRegionLocations(r
).map(
127 locations
-> locations
.getRegionLocation(parsedRegionInfo
.getReplicaId())));
129 } catch (IOException parseEx
) {
130 LOG
.warn("Failed to parse the passed region name: " + Bytes
.toStringBinary(regionName
));
131 future
.completeExceptionally(parseEx
);
137 * Returns the HRegionLocation from meta for the given encoded region name
139 * @param encodedRegionName region we're looking for
140 * @return HRegionLocation for the given region
142 public static CompletableFuture
<Optional
<HRegionLocation
>> getRegionLocationWithEncodedName(
143 AsyncTable
<?
> metaTable
, byte[] encodedRegionName
) {
144 CompletableFuture
<Optional
<HRegionLocation
>> future
= new CompletableFuture
<>();
145 metaTable
.scanAll(new Scan().setReadType(ReadType
.PREAD
).addFamily(HConstants
.CATALOG_FAMILY
))
149 future
.completeExceptionally(err
);
152 String encodedRegionNameStr
= Bytes
.toString(encodedRegionName
);
155 .filter(result
-> !result
.isEmpty())
156 .filter(result
-> MetaTableAccessor
.getRegionInfo(result
) != null)
159 getRegionLocations(result
).ifPresent(
161 for (HRegionLocation location
: locations
.getRegionLocations()) {
163 && encodedRegionNameStr
.equals(location
.getRegion()
164 .getEncodedName())) {
165 future
.complete(Optional
.of(location
));
171 future
.complete(Optional
.empty());
176 private static Optional
<TableState
> getTableState(Result r
) throws IOException
{
177 Cell cell
= r
.getColumnLatestCell(getTableFamily(), getStateColumn());
178 if (cell
== null) return Optional
.empty();
180 return Optional
.of(TableState
.parseFrom(
181 TableName
.valueOf(r
.getRow()),
182 Arrays
.copyOfRange(cell
.getValueArray(), cell
.getValueOffset(), cell
.getValueOffset()
183 + cell
.getValueLength())));
184 } catch (DeserializationException e
) {
185 throw new IOException("Failed to parse table state from result: " + r
, e
);
190 * Used to get all region locations for the specific table.
192 * @param tableName table we're looking for, can be null for getting all regions
193 * @return the list of region locations. The return value will be wrapped by a
194 * {@link CompletableFuture}.
196 public static CompletableFuture
<List
<HRegionLocation
>> getTableHRegionLocations(
197 AsyncTable
<AdvancedScanResultConsumer
> metaTable
, Optional
<TableName
> tableName
) {
198 CompletableFuture
<List
<HRegionLocation
>> future
= new CompletableFuture
<>();
199 getTableRegionsAndLocations(metaTable
, tableName
, true).whenComplete(
200 (locations
, err
) -> {
202 future
.completeExceptionally(err
);
203 } else if (locations
== null || locations
.isEmpty()) {
204 future
.complete(Collections
.emptyList());
206 List
<HRegionLocation
> regionLocations
= locations
.stream()
207 .map(loc
-> new HRegionLocation(loc
.getFirst(), loc
.getSecond()))
208 .collect(Collectors
.toList());
209 future
.complete(regionLocations
);
216 * Used to get table regions' info and server.
218 * @param tableName table we're looking for, can be null for getting all regions
219 * @param excludeOfflinedSplitParents don't return split parents
220 * @return the list of regioninfos and server. The return value will be wrapped by a
221 * {@link CompletableFuture}.
223 private static CompletableFuture
<List
<Pair
<RegionInfo
, ServerName
>>> getTableRegionsAndLocations(
224 AsyncTable
<AdvancedScanResultConsumer
> metaTable
, final Optional
<TableName
> tableName
,
225 final boolean excludeOfflinedSplitParents
) {
226 CompletableFuture
<List
<Pair
<RegionInfo
, ServerName
>>> future
= new CompletableFuture
<>();
227 if (tableName
.filter((t
) -> t
.equals(TableName
.META_TABLE_NAME
)).isPresent()) {
228 future
.completeExceptionally(new IOException(
229 "This method can't be used to locate meta regions;" + " use MetaTableLocator instead"));
232 // Make a version of CollectingVisitor that collects RegionInfo and ServerAddress
233 CollectingVisitor
<Pair
<RegionInfo
, ServerName
>> visitor
= new CollectingVisitor
<Pair
<RegionInfo
, ServerName
>>() {
234 private Optional
<RegionLocations
> current
= null;
237 public boolean visit(Result r
) throws IOException
{
238 current
= getRegionLocations(r
);
239 if (!current
.isPresent() || current
.get().getRegionLocation().getRegion() == null) {
240 LOG
.warn("No serialized RegionInfo in " + r
);
243 RegionInfo hri
= current
.get().getRegionLocation().getRegion();
244 if (excludeOfflinedSplitParents
&& hri
.isSplitParent()) return true;
245 // Else call super and add this Result to the collection.
246 return super.visit(r
);
251 if (!current
.isPresent()) {
254 for (HRegionLocation loc
: current
.get().getRegionLocations()) {
256 this.results
.add(new Pair
<RegionInfo
, ServerName
>(loc
.getRegion(), loc
263 scanMeta(metaTable
, tableName
, QueryType
.REGION
, visitor
).whenComplete((v
, error
) -> {
265 future
.completeExceptionally(error
);
268 future
.complete(visitor
.getResults());
274 * Performs a scan of META table for given table.
276 * @param tableName table withing we scan
277 * @param type scanned part of meta
278 * @param visitor Visitor invoked against each row
280 private static CompletableFuture
<Void
> scanMeta(AsyncTable
<AdvancedScanResultConsumer
> metaTable
,
281 Optional
<TableName
> tableName
, QueryType type
, final Visitor visitor
) {
282 return scanMeta(metaTable
, getTableStartRowForMeta(tableName
, type
),
283 getTableStopRowForMeta(tableName
, type
), type
, Integer
.MAX_VALUE
, visitor
);
287 * Performs a scan of META table for given table.
289 * @param startRow Where to start the scan
290 * @param stopRow Where to stop the scan
291 * @param type scanned part of meta
292 * @param maxRows maximum rows to return
293 * @param visitor Visitor invoked against each row
295 private static CompletableFuture
<Void
> scanMeta(AsyncTable
<AdvancedScanResultConsumer
> metaTable
,
296 Optional
<byte[]> startRow
, Optional
<byte[]> stopRow
, QueryType type
, int maxRows
,
297 final Visitor visitor
) {
298 int rowUpperLimit
= maxRows
> 0 ? maxRows
: Integer
.MAX_VALUE
;
299 Scan scan
= getMetaScan(metaTable
, rowUpperLimit
);
300 for (byte[] family
: type
.getFamilies()) {
301 scan
.addFamily(family
);
303 startRow
.ifPresent(scan
::withStartRow
);
304 stopRow
.ifPresent(scan
::withStopRow
);
306 if (LOG
.isDebugEnabled()) {
307 LOG
.debug("Scanning META" + " starting at row=" + Bytes
.toStringBinary(scan
.getStartRow())
308 + " stopping at row=" + Bytes
.toStringBinary(scan
.getStopRow()) + " for max="
309 + rowUpperLimit
+ " with caching=" + scan
.getCaching());
312 CompletableFuture
<Void
> future
= new CompletableFuture
<Void
>();
313 metaTable
.scan(scan
, new MetaTableScanResultConsumer(rowUpperLimit
, visitor
, future
));
317 private static final class MetaTableScanResultConsumer
implements AdvancedScanResultConsumer
{
319 private int currentRowCount
;
321 private final int rowUpperLimit
;
323 private final Visitor visitor
;
325 private final CompletableFuture
<Void
> future
;
327 MetaTableScanResultConsumer(int rowUpperLimit
, Visitor visitor
,
328 CompletableFuture
<Void
> future
) {
329 this.rowUpperLimit
= rowUpperLimit
;
330 this.visitor
= visitor
;
331 this.future
= future
;
332 this.currentRowCount
= 0;
336 public void onError(Throwable error
) {
337 future
.completeExceptionally(error
);
341 @edu.umd
.cs
.findbugs
.annotations
.SuppressWarnings(value
= "NP_NONNULL_PARAM_VIOLATION",
342 justification
= "https://github.com/findbugsproject/findbugs/issues/79")
343 public void onComplete() {
344 future
.complete(null);
348 public void onNext(Result
[] results
, ScanController controller
) {
349 boolean terminateScan
= false;
350 for (Result result
: results
) {
352 if (!visitor
.visit(result
)) {
353 terminateScan
= true;
356 } catch (Exception e
) {
357 future
.completeExceptionally(e
);
358 terminateScan
= true;
361 if (++currentRowCount
>= rowUpperLimit
) {
362 terminateScan
= true;
367 controller
.terminate();
372 private static Scan
getMetaScan(AsyncTable
<?
> metaTable
, int rowUpperLimit
) {
373 Scan scan
= new Scan();
374 int scannerCaching
= metaTable
.getConfiguration().getInt(HConstants
.HBASE_META_SCANNER_CACHING
,
375 HConstants
.DEFAULT_HBASE_META_SCANNER_CACHING
);
376 if (metaTable
.getConfiguration().getBoolean(HConstants
.USE_META_REPLICAS
,
377 HConstants
.DEFAULT_USE_META_REPLICAS
)) {
378 scan
.setConsistency(Consistency
.TIMELINE
);
380 if (rowUpperLimit
<= scannerCaching
) {
381 scan
.setLimit(rowUpperLimit
);
383 int rows
= Math
.min(rowUpperLimit
, scannerCaching
);
384 scan
.setCaching(rows
);
389 * Returns an HRegionLocationList extracted from the result.
390 * @return an HRegionLocationList containing all locations for the region range or null if we
391 * can't deserialize the result.
393 private static Optional
<RegionLocations
> getRegionLocations(final Result r
) {
394 if (r
== null) return Optional
.empty();
395 Optional
<RegionInfo
> regionInfo
= getHRegionInfo(r
, getRegionInfoColumn());
396 if (!regionInfo
.isPresent()) return Optional
.empty();
398 List
<HRegionLocation
> locations
= new ArrayList
<HRegionLocation
>(1);
399 NavigableMap
<byte[], NavigableMap
<byte[], byte[]>> familyMap
= r
.getNoVersionMap();
401 locations
.add(getRegionLocation(r
, regionInfo
.get(), 0));
403 NavigableMap
<byte[], byte[]> infoMap
= familyMap
.get(getCatalogFamily());
404 if (infoMap
== null) return Optional
.of(new RegionLocations(locations
));
406 // iterate until all serverName columns are seen
408 byte[] serverColumn
= getServerColumn(replicaId
);
409 SortedMap
<byte[], byte[]> serverMap
= null;
410 serverMap
= infoMap
.tailMap(serverColumn
, false);
412 if (serverMap
.isEmpty()) return Optional
.of(new RegionLocations(locations
));
414 for (Map
.Entry
<byte[], byte[]> entry
: serverMap
.entrySet()) {
415 replicaId
= parseReplicaIdFromServerColumn(entry
.getKey());
419 HRegionLocation location
= getRegionLocation(r
, regionInfo
.get(), replicaId
);
420 // In case the region replica is newly created, it's location might be null. We usually do not
421 // have HRL's in RegionLocations object with null ServerName. They are handled as null HRLs.
422 if (location
== null || location
.getServerName() == null) {
425 locations
.add(location
);
429 return Optional
.of(new RegionLocations(locations
));
433 * Returns the HRegionLocation parsed from the given meta row Result
434 * for the given regionInfo and replicaId. The regionInfo can be the default region info
436 * @param r the meta row result
437 * @param regionInfo RegionInfo for default replica
438 * @param replicaId the replicaId for the HRegionLocation
439 * @return HRegionLocation parsed from the given meta row Result for the given replicaId
441 private static HRegionLocation
getRegionLocation(final Result r
, final RegionInfo regionInfo
,
442 final int replicaId
) {
443 Optional
<ServerName
> serverName
= getServerName(r
, replicaId
);
444 long seqNum
= getSeqNumDuringOpen(r
, replicaId
);
445 RegionInfo replicaInfo
= RegionReplicaUtil
.getRegionInfoForReplica(regionInfo
, replicaId
);
446 return new HRegionLocation(replicaInfo
, serverName
.orElse(null), seqNum
);
450 * Returns a {@link ServerName} from catalog table {@link Result}.
451 * @param r Result to pull from
452 * @return A ServerName instance.
454 private static Optional
<ServerName
> getServerName(final Result r
, final int replicaId
) {
455 byte[] serverColumn
= getServerColumn(replicaId
);
456 Cell cell
= r
.getColumnLatestCell(getCatalogFamily(), serverColumn
);
457 if (cell
== null || cell
.getValueLength() == 0) return Optional
.empty();
458 String hostAndPort
= Bytes
.toString(cell
.getValueArray(), cell
.getValueOffset(),
459 cell
.getValueLength());
460 byte[] startcodeColumn
= getStartCodeColumn(replicaId
);
461 cell
= r
.getColumnLatestCell(getCatalogFamily(), startcodeColumn
);
462 if (cell
== null || cell
.getValueLength() == 0) return Optional
.empty();
464 return Optional
.of(ServerName
.valueOf(hostAndPort
,
465 Bytes
.toLong(cell
.getValueArray(), cell
.getValueOffset(), cell
.getValueLength())));
466 } catch (IllegalArgumentException e
) {
467 LOG
.error("Ignoring invalid region for server " + hostAndPort
+ "; cell=" + cell
, e
);
468 return Optional
.empty();
473 * The latest seqnum that the server writing to meta observed when opening the region.
474 * E.g. the seqNum when the result of {@link #getServerName(Result, int)} was written.
475 * @param r Result to pull the seqNum from
476 * @return SeqNum, or HConstants.NO_SEQNUM if there's no value written.
478 private static long getSeqNumDuringOpen(final Result r
, final int replicaId
) {
479 Cell cell
= r
.getColumnLatestCell(getCatalogFamily(), getSeqNumColumn(replicaId
));
480 if (cell
== null || cell
.getValueLength() == 0) return HConstants
.NO_SEQNUM
;
481 return Bytes
.toLong(cell
.getValueArray(), cell
.getValueOffset(), cell
.getValueLength());
485 * @param tableName table we're working with
486 * @return start row for scanning META according to query type
488 private static Optional
<byte[]> getTableStartRowForMeta(Optional
<TableName
> tableName
,
490 return tableName
.map((table
) -> {
494 byte[] startRow
= new byte[table
.getName().length
+ 2];
495 System
.arraycopy(table
.getName(), 0, startRow
, 0, table
.getName().length
);
496 startRow
[startRow
.length
- 2] = HConstants
.DELIMITER
;
497 startRow
[startRow
.length
- 1] = HConstants
.DELIMITER
;
502 return table
.getName();
508 * @param tableName table we're working with
509 * @return stop row for scanning META according to query type
511 private static Optional
<byte[]> getTableStopRowForMeta(Optional
<TableName
> tableName
,
513 return tableName
.map((table
) -> {
514 final byte[] stopRow
;
518 stopRow
= new byte[table
.getName().length
+ 3];
519 System
.arraycopy(table
.getName(), 0, stopRow
, 0, table
.getName().length
);
520 stopRow
[stopRow
.length
- 3] = ' ';
521 stopRow
[stopRow
.length
- 2] = HConstants
.DELIMITER
;
522 stopRow
[stopRow
.length
- 1] = HConstants
.DELIMITER
;
527 stopRow
= new byte[table
.getName().length
+ 1];
528 System
.arraycopy(table
.getName(), 0, stopRow
, 0, table
.getName().length
);
529 stopRow
[stopRow
.length
- 1] = ' ';
537 * Returns the RegionInfo object from the column {@link HConstants#CATALOG_FAMILY} and
538 * <code>qualifier</code> of the catalog table result.
539 * @param r a Result object from the catalog table scan
540 * @param qualifier Column family qualifier
541 * @return An RegionInfo instance.
543 private static Optional
<RegionInfo
> getHRegionInfo(final Result r
, byte[] qualifier
) {
544 Cell cell
= r
.getColumnLatestCell(getCatalogFamily(), qualifier
);
545 if (cell
== null) return Optional
.empty();
546 return Optional
.ofNullable(RegionInfo
.parseFromOrNull(cell
.getValueArray(),
547 cell
.getValueOffset(), cell
.getValueLength()));
551 * Returns the column family used for meta columns.
552 * @return HConstants.CATALOG_FAMILY.
554 private static byte[] getCatalogFamily() {
555 return HConstants
.CATALOG_FAMILY
;
559 * Returns the column family used for table columns.
560 * @return HConstants.TABLE_FAMILY.
562 private static byte[] getTableFamily() {
563 return HConstants
.TABLE_FAMILY
;
567 * Returns the column qualifier for serialized region info
568 * @return HConstants.REGIONINFO_QUALIFIER
570 private static byte[] getRegionInfoColumn() {
571 return HConstants
.REGIONINFO_QUALIFIER
;
575 * Returns the column qualifier for serialized table state
576 * @return HConstants.TABLE_STATE_QUALIFIER
578 private static byte[] getStateColumn() {
579 return HConstants
.TABLE_STATE_QUALIFIER
;
583 * Returns the column qualifier for server column for replicaId
584 * @param replicaId the replicaId of the region
585 * @return a byte[] for server column qualifier
587 private static byte[] getServerColumn(int replicaId
) {
588 return replicaId
== 0
589 ? HConstants
.SERVER_QUALIFIER
590 : Bytes
.toBytes(HConstants
.SERVER_QUALIFIER_STR
+ META_REPLICA_ID_DELIMITER
591 + String
.format(RegionInfo
.REPLICA_ID_FORMAT
, replicaId
));
595 * Returns the column qualifier for server start code column for replicaId
596 * @param replicaId the replicaId of the region
597 * @return a byte[] for server start code column qualifier
599 private static byte[] getStartCodeColumn(int replicaId
) {
600 return replicaId
== 0
601 ? HConstants
.STARTCODE_QUALIFIER
602 : Bytes
.toBytes(HConstants
.STARTCODE_QUALIFIER_STR
+ META_REPLICA_ID_DELIMITER
603 + String
.format(RegionInfo
.REPLICA_ID_FORMAT
, replicaId
));
607 * Returns the column qualifier for seqNum column for replicaId
608 * @param replicaId the replicaId of the region
609 * @return a byte[] for seqNum column qualifier
611 private static byte[] getSeqNumColumn(int replicaId
) {
612 return replicaId
== 0
613 ? HConstants
.SEQNUM_QUALIFIER
614 : Bytes
.toBytes(HConstants
.SEQNUM_QUALIFIER_STR
+ META_REPLICA_ID_DELIMITER
615 + String
.format(RegionInfo
.REPLICA_ID_FORMAT
, replicaId
));
619 * Parses the replicaId from the server column qualifier. See top of the class javadoc
620 * for the actual meta layout
621 * @param serverColumn the column qualifier
622 * @return an int for the replicaId
624 private static int parseReplicaIdFromServerColumn(byte[] serverColumn
) {
625 String serverStr
= Bytes
.toString(serverColumn
);
627 Matcher matcher
= SERVER_COLUMN_PATTERN
.matcher(serverStr
);
628 if (matcher
.matches() && matcher
.groupCount() > 0) {
629 String group
= matcher
.group(1);
630 if (group
!= null && group
.length() > 0) {
631 return Integer
.parseInt(group
.substring(1), 16);