HBASE-20147 Serial replication will be stuck if we create a table with serial replica...
[hbase.git] / hbase-client / src / main / java / org / apache / hadoop / hbase / AsyncMetaTableAccessor.java
blob13245d377aaff53047c5ea0677f3e2947da77474
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase;
20 import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.Collections;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.NavigableMap;
29 import java.util.Optional;
30 import java.util.SortedMap;
31 import java.util.concurrent.CompletableFuture;
32 import java.util.regex.Matcher;
33 import java.util.regex.Pattern;
34 import java.util.stream.Collectors;
36 import org.apache.hadoop.hbase.MetaTableAccessor.CollectingVisitor;
37 import org.apache.hadoop.hbase.MetaTableAccessor.QueryType;
38 import org.apache.hadoop.hbase.MetaTableAccessor.Visitor;
39 import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
40 import org.apache.hadoop.hbase.client.AsyncTable;
41 import org.apache.hadoop.hbase.client.Consistency;
42 import org.apache.hadoop.hbase.client.Get;
43 import org.apache.hadoop.hbase.client.RegionInfo;
44 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
45 import org.apache.hadoop.hbase.client.Result;
46 import org.apache.hadoop.hbase.client.Scan;
47 import org.apache.hadoop.hbase.client.Scan.ReadType;
48 import org.apache.hadoop.hbase.client.TableState;
49 import org.apache.hadoop.hbase.exceptions.DeserializationException;
50 import org.apache.hadoop.hbase.util.Bytes;
51 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
52 import org.apache.hadoop.hbase.util.Pair;
53 import org.apache.yetus.audience.InterfaceAudience;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
57 /**
 * The asynchronous meta table accessor. Used to read/write region and assignment information stored
 * in <code>hbase:meta</code>.
60 * @since 2.0.0
62 @InterfaceAudience.Private
63 public class AsyncMetaTableAccessor {
65 private static final Logger LOG = LoggerFactory.getLogger(AsyncMetaTableAccessor.class);
68 /** The delimiter for meta columns for replicaIds &gt; 0 */
69 private static final char META_REPLICA_ID_DELIMITER = '_';
71 /** A regex for parsing server columns from meta. See above javadoc for meta layout */
72 private static final Pattern SERVER_COLUMN_PATTERN = Pattern
73 .compile("^server(_[0-9a-fA-F]{4})?$");
75 public static CompletableFuture<Boolean> tableExists(AsyncTable<?> metaTable,
76 TableName tableName) {
77 if (tableName.equals(META_TABLE_NAME)) {
78 return CompletableFuture.completedFuture(true);
80 return getTableState(metaTable, tableName).thenApply(Optional::isPresent);
83 public static CompletableFuture<Optional<TableState>> getTableState(AsyncTable<?> metaTable,
84 TableName tableName) {
85 CompletableFuture<Optional<TableState>> future = new CompletableFuture<>();
86 Get get = new Get(tableName.getName()).addColumn(getTableFamily(), getStateColumn());
87 long time = EnvironmentEdgeManager.currentTime();
88 try {
89 get.setTimeRange(0, time);
90 metaTable.get(get).whenComplete((result, error) -> {
91 if (error != null) {
92 future.completeExceptionally(error);
93 return;
95 try {
96 future.complete(getTableState(result));
97 } catch (IOException e) {
98 future.completeExceptionally(e);
101 } catch (IOException ioe) {
102 future.completeExceptionally(ioe);
104 return future;
108 * Returns the HRegionLocation from meta for the given region
109 * @param metaTable
110 * @param regionName region we're looking for
111 * @return HRegionLocation for the given region
113 public static CompletableFuture<Optional<HRegionLocation>> getRegionLocation(
114 AsyncTable<?> metaTable, byte[] regionName) {
115 CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>();
116 try {
117 RegionInfo parsedRegionInfo = MetaTableAccessor.parseRegionInfoFromRegionName(regionName);
118 metaTable.get(
119 new Get(MetaTableAccessor.getMetaKeyForRegion(parsedRegionInfo))
120 .addFamily(HConstants.CATALOG_FAMILY)).whenComplete(
121 (r, err) -> {
122 if (err != null) {
123 future.completeExceptionally(err);
124 return;
126 future.complete(getRegionLocations(r).map(
127 locations -> locations.getRegionLocation(parsedRegionInfo.getReplicaId())));
129 } catch (IOException parseEx) {
130 LOG.warn("Failed to parse the passed region name: " + Bytes.toStringBinary(regionName));
131 future.completeExceptionally(parseEx);
133 return future;
137 * Returns the HRegionLocation from meta for the given encoded region name
138 * @param metaTable
139 * @param encodedRegionName region we're looking for
140 * @return HRegionLocation for the given region
142 public static CompletableFuture<Optional<HRegionLocation>> getRegionLocationWithEncodedName(
143 AsyncTable<?> metaTable, byte[] encodedRegionName) {
144 CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>();
145 metaTable.scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY))
146 .whenComplete(
147 (results, err) -> {
148 if (err != null) {
149 future.completeExceptionally(err);
150 return;
152 String encodedRegionNameStr = Bytes.toString(encodedRegionName);
153 results
154 .stream()
155 .filter(result -> !result.isEmpty())
156 .filter(result -> MetaTableAccessor.getRegionInfo(result) != null)
157 .forEach(
158 result -> {
159 getRegionLocations(result).ifPresent(
160 locations -> {
161 for (HRegionLocation location : locations.getRegionLocations()) {
162 if (location != null
163 && encodedRegionNameStr.equals(location.getRegion()
164 .getEncodedName())) {
165 future.complete(Optional.of(location));
166 return;
171 future.complete(Optional.empty());
173 return future;
176 private static Optional<TableState> getTableState(Result r) throws IOException {
177 Cell cell = r.getColumnLatestCell(getTableFamily(), getStateColumn());
178 if (cell == null) return Optional.empty();
179 try {
180 return Optional.of(TableState.parseFrom(
181 TableName.valueOf(r.getRow()),
182 Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset()
183 + cell.getValueLength())));
184 } catch (DeserializationException e) {
185 throw new IOException("Failed to parse table state from result: " + r, e);
190 * Used to get all region locations for the specific table.
191 * @param metaTable
192 * @param tableName table we're looking for, can be null for getting all regions
193 * @return the list of region locations. The return value will be wrapped by a
194 * {@link CompletableFuture}.
196 public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(
197 AsyncTable<AdvancedScanResultConsumer> metaTable, Optional<TableName> tableName) {
198 CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
199 getTableRegionsAndLocations(metaTable, tableName, true).whenComplete(
200 (locations, err) -> {
201 if (err != null) {
202 future.completeExceptionally(err);
203 } else if (locations == null || locations.isEmpty()) {
204 future.complete(Collections.emptyList());
205 } else {
206 List<HRegionLocation> regionLocations = locations.stream()
207 .map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond()))
208 .collect(Collectors.toList());
209 future.complete(regionLocations);
212 return future;
216 * Used to get table regions' info and server.
217 * @param metaTable
218 * @param tableName table we're looking for, can be null for getting all regions
219 * @param excludeOfflinedSplitParents don't return split parents
220 * @return the list of regioninfos and server. The return value will be wrapped by a
221 * {@link CompletableFuture}.
223 private static CompletableFuture<List<Pair<RegionInfo, ServerName>>> getTableRegionsAndLocations(
224 AsyncTable<AdvancedScanResultConsumer> metaTable, final Optional<TableName> tableName,
225 final boolean excludeOfflinedSplitParents) {
226 CompletableFuture<List<Pair<RegionInfo, ServerName>>> future = new CompletableFuture<>();
227 if (tableName.filter((t) -> t.equals(TableName.META_TABLE_NAME)).isPresent()) {
228 future.completeExceptionally(new IOException(
229 "This method can't be used to locate meta regions;" + " use MetaTableLocator instead"));
232 // Make a version of CollectingVisitor that collects RegionInfo and ServerAddress
233 CollectingVisitor<Pair<RegionInfo, ServerName>> visitor = new CollectingVisitor<Pair<RegionInfo, ServerName>>() {
234 private Optional<RegionLocations> current = null;
236 @Override
237 public boolean visit(Result r) throws IOException {
238 current = getRegionLocations(r);
239 if (!current.isPresent() || current.get().getRegionLocation().getRegion() == null) {
240 LOG.warn("No serialized RegionInfo in " + r);
241 return true;
243 RegionInfo hri = current.get().getRegionLocation().getRegion();
244 if (excludeOfflinedSplitParents && hri.isSplitParent()) return true;
245 // Else call super and add this Result to the collection.
246 return super.visit(r);
249 @Override
250 void add(Result r) {
251 if (!current.isPresent()) {
252 return;
254 for (HRegionLocation loc : current.get().getRegionLocations()) {
255 if (loc != null) {
256 this.results.add(new Pair<RegionInfo, ServerName>(loc.getRegion(), loc
257 .getServerName()));
263 scanMeta(metaTable, tableName, QueryType.REGION, visitor).whenComplete((v, error) -> {
264 if (error != null) {
265 future.completeExceptionally(error);
266 return;
268 future.complete(visitor.getResults());
270 return future;
274 * Performs a scan of META table for given table.
275 * @param metaTable
276 * @param tableName table withing we scan
277 * @param type scanned part of meta
278 * @param visitor Visitor invoked against each row
280 private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable,
281 Optional<TableName> tableName, QueryType type, final Visitor visitor) {
282 return scanMeta(metaTable, getTableStartRowForMeta(tableName, type),
283 getTableStopRowForMeta(tableName, type), type, Integer.MAX_VALUE, visitor);
287 * Performs a scan of META table for given table.
288 * @param metaTable
289 * @param startRow Where to start the scan
290 * @param stopRow Where to stop the scan
291 * @param type scanned part of meta
292 * @param maxRows maximum rows to return
293 * @param visitor Visitor invoked against each row
295 private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable,
296 Optional<byte[]> startRow, Optional<byte[]> stopRow, QueryType type, int maxRows,
297 final Visitor visitor) {
298 int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE;
299 Scan scan = getMetaScan(metaTable, rowUpperLimit);
300 for (byte[] family : type.getFamilies()) {
301 scan.addFamily(family);
303 startRow.ifPresent(scan::withStartRow);
304 stopRow.ifPresent(scan::withStopRow);
306 if (LOG.isDebugEnabled()) {
307 LOG.debug("Scanning META" + " starting at row=" + Bytes.toStringBinary(scan.getStartRow())
308 + " stopping at row=" + Bytes.toStringBinary(scan.getStopRow()) + " for max="
309 + rowUpperLimit + " with caching=" + scan.getCaching());
312 CompletableFuture<Void> future = new CompletableFuture<Void>();
313 metaTable.scan(scan, new MetaTableScanResultConsumer(rowUpperLimit, visitor, future));
314 return future;
317 private static final class MetaTableScanResultConsumer implements AdvancedScanResultConsumer {
319 private int currentRowCount;
321 private final int rowUpperLimit;
323 private final Visitor visitor;
325 private final CompletableFuture<Void> future;
327 MetaTableScanResultConsumer(int rowUpperLimit, Visitor visitor,
328 CompletableFuture<Void> future) {
329 this.rowUpperLimit = rowUpperLimit;
330 this.visitor = visitor;
331 this.future = future;
332 this.currentRowCount = 0;
335 @Override
336 public void onError(Throwable error) {
337 future.completeExceptionally(error);
340 @Override
341 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NONNULL_PARAM_VIOLATION",
342 justification = "https://github.com/findbugsproject/findbugs/issues/79")
343 public void onComplete() {
344 future.complete(null);
347 @Override
348 public void onNext(Result[] results, ScanController controller) {
349 boolean terminateScan = false;
350 for (Result result : results) {
351 try {
352 if (!visitor.visit(result)) {
353 terminateScan = true;
354 break;
356 } catch (Exception e) {
357 future.completeExceptionally(e);
358 terminateScan = true;
359 break;
361 if (++currentRowCount >= rowUpperLimit) {
362 terminateScan = true;
363 break;
366 if (terminateScan) {
367 controller.terminate();
372 private static Scan getMetaScan(AsyncTable<?> metaTable, int rowUpperLimit) {
373 Scan scan = new Scan();
374 int scannerCaching = metaTable.getConfiguration().getInt(HConstants.HBASE_META_SCANNER_CACHING,
375 HConstants.DEFAULT_HBASE_META_SCANNER_CACHING);
376 if (metaTable.getConfiguration().getBoolean(HConstants.USE_META_REPLICAS,
377 HConstants.DEFAULT_USE_META_REPLICAS)) {
378 scan.setConsistency(Consistency.TIMELINE);
380 if (rowUpperLimit <= scannerCaching) {
381 scan.setLimit(rowUpperLimit);
383 int rows = Math.min(rowUpperLimit, scannerCaching);
384 scan.setCaching(rows);
385 return scan;
389 * Returns an HRegionLocationList extracted from the result.
390 * @return an HRegionLocationList containing all locations for the region range or null if we
391 * can't deserialize the result.
393 private static Optional<RegionLocations> getRegionLocations(final Result r) {
394 if (r == null) return Optional.empty();
395 Optional<RegionInfo> regionInfo = getHRegionInfo(r, getRegionInfoColumn());
396 if (!regionInfo.isPresent()) return Optional.empty();
398 List<HRegionLocation> locations = new ArrayList<HRegionLocation>(1);
399 NavigableMap<byte[], NavigableMap<byte[], byte[]>> familyMap = r.getNoVersionMap();
401 locations.add(getRegionLocation(r, regionInfo.get(), 0));
403 NavigableMap<byte[], byte[]> infoMap = familyMap.get(getCatalogFamily());
404 if (infoMap == null) return Optional.of(new RegionLocations(locations));
406 // iterate until all serverName columns are seen
407 int replicaId = 0;
408 byte[] serverColumn = getServerColumn(replicaId);
409 SortedMap<byte[], byte[]> serverMap = null;
410 serverMap = infoMap.tailMap(serverColumn, false);
412 if (serverMap.isEmpty()) return Optional.of(new RegionLocations(locations));
414 for (Map.Entry<byte[], byte[]> entry : serverMap.entrySet()) {
415 replicaId = parseReplicaIdFromServerColumn(entry.getKey());
416 if (replicaId < 0) {
417 break;
419 HRegionLocation location = getRegionLocation(r, regionInfo.get(), replicaId);
420 // In case the region replica is newly created, it's location might be null. We usually do not
421 // have HRL's in RegionLocations object with null ServerName. They are handled as null HRLs.
422 if (location == null || location.getServerName() == null) {
423 locations.add(null);
424 } else {
425 locations.add(location);
429 return Optional.of(new RegionLocations(locations));
433 * Returns the HRegionLocation parsed from the given meta row Result
434 * for the given regionInfo and replicaId. The regionInfo can be the default region info
435 * for the replica.
436 * @param r the meta row result
437 * @param regionInfo RegionInfo for default replica
438 * @param replicaId the replicaId for the HRegionLocation
439 * @return HRegionLocation parsed from the given meta row Result for the given replicaId
441 private static HRegionLocation getRegionLocation(final Result r, final RegionInfo regionInfo,
442 final int replicaId) {
443 Optional<ServerName> serverName = getServerName(r, replicaId);
444 long seqNum = getSeqNumDuringOpen(r, replicaId);
445 RegionInfo replicaInfo = RegionReplicaUtil.getRegionInfoForReplica(regionInfo, replicaId);
446 return new HRegionLocation(replicaInfo, serverName.orElse(null), seqNum);
450 * Returns a {@link ServerName} from catalog table {@link Result}.
451 * @param r Result to pull from
452 * @return A ServerName instance.
454 private static Optional<ServerName> getServerName(final Result r, final int replicaId) {
455 byte[] serverColumn = getServerColumn(replicaId);
456 Cell cell = r.getColumnLatestCell(getCatalogFamily(), serverColumn);
457 if (cell == null || cell.getValueLength() == 0) return Optional.empty();
458 String hostAndPort = Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
459 cell.getValueLength());
460 byte[] startcodeColumn = getStartCodeColumn(replicaId);
461 cell = r.getColumnLatestCell(getCatalogFamily(), startcodeColumn);
462 if (cell == null || cell.getValueLength() == 0) return Optional.empty();
463 try {
464 return Optional.of(ServerName.valueOf(hostAndPort,
465 Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())));
466 } catch (IllegalArgumentException e) {
467 LOG.error("Ignoring invalid region for server " + hostAndPort + "; cell=" + cell, e);
468 return Optional.empty();
473 * The latest seqnum that the server writing to meta observed when opening the region.
474 * E.g. the seqNum when the result of {@link #getServerName(Result, int)} was written.
475 * @param r Result to pull the seqNum from
476 * @return SeqNum, or HConstants.NO_SEQNUM if there's no value written.
478 private static long getSeqNumDuringOpen(final Result r, final int replicaId) {
479 Cell cell = r.getColumnLatestCell(getCatalogFamily(), getSeqNumColumn(replicaId));
480 if (cell == null || cell.getValueLength() == 0) return HConstants.NO_SEQNUM;
481 return Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
485 * @param tableName table we're working with
486 * @return start row for scanning META according to query type
488 private static Optional<byte[]> getTableStartRowForMeta(Optional<TableName> tableName,
489 QueryType type) {
490 return tableName.map((table) -> {
491 switch (type) {
492 case REGION:
493 case REPLICATION:
494 byte[] startRow = new byte[table.getName().length + 2];
495 System.arraycopy(table.getName(), 0, startRow, 0, table.getName().length);
496 startRow[startRow.length - 2] = HConstants.DELIMITER;
497 startRow[startRow.length - 1] = HConstants.DELIMITER;
498 return startRow;
499 case ALL:
500 case TABLE:
501 default:
502 return table.getName();
508 * @param tableName table we're working with
509 * @return stop row for scanning META according to query type
511 private static Optional<byte[]> getTableStopRowForMeta(Optional<TableName> tableName,
512 QueryType type) {
513 return tableName.map((table) -> {
514 final byte[] stopRow;
515 switch (type) {
516 case REGION:
517 case REPLICATION:
518 stopRow = new byte[table.getName().length + 3];
519 System.arraycopy(table.getName(), 0, stopRow, 0, table.getName().length);
520 stopRow[stopRow.length - 3] = ' ';
521 stopRow[stopRow.length - 2] = HConstants.DELIMITER;
522 stopRow[stopRow.length - 1] = HConstants.DELIMITER;
523 break;
524 case ALL:
525 case TABLE:
526 default:
527 stopRow = new byte[table.getName().length + 1];
528 System.arraycopy(table.getName(), 0, stopRow, 0, table.getName().length);
529 stopRow[stopRow.length - 1] = ' ';
530 break;
532 return stopRow;
537 * Returns the RegionInfo object from the column {@link HConstants#CATALOG_FAMILY} and
538 * <code>qualifier</code> of the catalog table result.
539 * @param r a Result object from the catalog table scan
540 * @param qualifier Column family qualifier
541 * @return An RegionInfo instance.
543 private static Optional<RegionInfo> getHRegionInfo(final Result r, byte[] qualifier) {
544 Cell cell = r.getColumnLatestCell(getCatalogFamily(), qualifier);
545 if (cell == null) return Optional.empty();
546 return Optional.ofNullable(RegionInfo.parseFromOrNull(cell.getValueArray(),
547 cell.getValueOffset(), cell.getValueLength()));
551 * Returns the column family used for meta columns.
552 * @return HConstants.CATALOG_FAMILY.
554 private static byte[] getCatalogFamily() {
555 return HConstants.CATALOG_FAMILY;
559 * Returns the column family used for table columns.
560 * @return HConstants.TABLE_FAMILY.
562 private static byte[] getTableFamily() {
563 return HConstants.TABLE_FAMILY;
567 * Returns the column qualifier for serialized region info
568 * @return HConstants.REGIONINFO_QUALIFIER
570 private static byte[] getRegionInfoColumn() {
571 return HConstants.REGIONINFO_QUALIFIER;
575 * Returns the column qualifier for serialized table state
576 * @return HConstants.TABLE_STATE_QUALIFIER
578 private static byte[] getStateColumn() {
579 return HConstants.TABLE_STATE_QUALIFIER;
583 * Returns the column qualifier for server column for replicaId
584 * @param replicaId the replicaId of the region
585 * @return a byte[] for server column qualifier
587 private static byte[] getServerColumn(int replicaId) {
588 return replicaId == 0
589 ? HConstants.SERVER_QUALIFIER
590 : Bytes.toBytes(HConstants.SERVER_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
591 + String.format(RegionInfo.REPLICA_ID_FORMAT, replicaId));
595 * Returns the column qualifier for server start code column for replicaId
596 * @param replicaId the replicaId of the region
597 * @return a byte[] for server start code column qualifier
599 private static byte[] getStartCodeColumn(int replicaId) {
600 return replicaId == 0
601 ? HConstants.STARTCODE_QUALIFIER
602 : Bytes.toBytes(HConstants.STARTCODE_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
603 + String.format(RegionInfo.REPLICA_ID_FORMAT, replicaId));
607 * Returns the column qualifier for seqNum column for replicaId
608 * @param replicaId the replicaId of the region
609 * @return a byte[] for seqNum column qualifier
611 private static byte[] getSeqNumColumn(int replicaId) {
612 return replicaId == 0
613 ? HConstants.SEQNUM_QUALIFIER
614 : Bytes.toBytes(HConstants.SEQNUM_QUALIFIER_STR + META_REPLICA_ID_DELIMITER
615 + String.format(RegionInfo.REPLICA_ID_FORMAT, replicaId));
619 * Parses the replicaId from the server column qualifier. See top of the class javadoc
620 * for the actual meta layout
621 * @param serverColumn the column qualifier
622 * @return an int for the replicaId
624 private static int parseReplicaIdFromServerColumn(byte[] serverColumn) {
625 String serverStr = Bytes.toString(serverColumn);
627 Matcher matcher = SERVER_COLUMN_PATTERN.matcher(serverStr);
628 if (matcher.matches() && matcher.groupCount() > 0) {
629 String group = matcher.group(1);
630 if (group != null && group.length() > 0) {
631 return Integer.parseInt(group.substring(1), 16);
632 } else {
633 return 0;
636 return -1;