HBASE-26688 Threads shared EMPTY_RESULT may lead to unexpected client job down. ...
[hbase.git] / hbase-client / src / main / java / org / apache / hadoop / hbase / ClientMetaTableAccessor.java
blobecc65733c12bf7f5f7e0586c844794504228ac95
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase;
20 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
22 import java.io.Closeable;
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.List;
27 import java.util.Optional;
28 import java.util.concurrent.CompletableFuture;
29 import java.util.stream.Collectors;
30 import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
31 import org.apache.hadoop.hbase.client.AsyncTable;
32 import org.apache.hadoop.hbase.client.Consistency;
33 import org.apache.hadoop.hbase.client.Get;
34 import org.apache.hadoop.hbase.client.RegionInfo;
35 import org.apache.hadoop.hbase.client.Result;
36 import org.apache.hadoop.hbase.client.Scan;
37 import org.apache.hadoop.hbase.client.Scan.ReadType;
38 import org.apache.hadoop.hbase.client.TableState;
39 import org.apache.hadoop.hbase.util.Bytes;
40 import org.apache.hadoop.hbase.util.Pair;
41 import org.apache.yetus.audience.InterfaceAudience;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
45 /**
46 * The (asynchronous) meta table accessor used at client side. Used to read/write region and
47 * assignment information store in <code>hbase:meta</code>.
48 * @since 2.0.0
49 * @see CatalogFamilyFormat
51 @InterfaceAudience.Private
52 public final class ClientMetaTableAccessor {
54 private static final Logger LOG = LoggerFactory.getLogger(ClientMetaTableAccessor.class);
56 private ClientMetaTableAccessor() {
59 @InterfaceAudience.Private
60 public enum QueryType {
61 ALL(HConstants.TABLE_FAMILY, HConstants.CATALOG_FAMILY), REGION(HConstants.CATALOG_FAMILY),
62 TABLE(HConstants.TABLE_FAMILY), REPLICATION(HConstants.REPLICATION_BARRIER_FAMILY);
64 private final byte[][] families;
66 QueryType(byte[]... families) {
67 this.families = families;
70 byte[][] getFamilies() {
71 return this.families;
75 public static CompletableFuture<Boolean> tableExists(AsyncTable<?> metaTable,
76 TableName tableName) {
77 return getTableState(metaTable, tableName).thenApply(Optional::isPresent);
80 public static CompletableFuture<Optional<TableState>> getTableState(AsyncTable<?> metaTable,
81 TableName tableName) {
82 CompletableFuture<Optional<TableState>> future = new CompletableFuture<>();
83 Get get = new Get(tableName.getName()).addColumn(HConstants.TABLE_FAMILY,
84 HConstants.TABLE_STATE_QUALIFIER);
85 addListener(metaTable.get(get), (result, error) -> {
86 if (error != null) {
87 future.completeExceptionally(error);
88 return;
90 try {
91 future.complete(getTableState(result));
92 } catch (IOException e) {
93 future.completeExceptionally(e);
95 });
96 return future;
99 /**
100 * Returns the HRegionLocation from meta for the given region
101 * @param metaTable
102 * @param regionName region we're looking for
103 * @return HRegionLocation for the given region
105 public static CompletableFuture<Optional<HRegionLocation>>
106 getRegionLocation(AsyncTable<?> metaTable, byte[] regionName) {
107 CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>();
108 try {
109 RegionInfo parsedRegionInfo = CatalogFamilyFormat.parseRegionInfoFromRegionName(regionName);
110 addListener(metaTable.get(new Get(CatalogFamilyFormat.getMetaKeyForRegion(parsedRegionInfo))
111 .addFamily(HConstants.CATALOG_FAMILY)), (r, err) -> {
112 if (err != null) {
113 future.completeExceptionally(err);
114 return;
116 future.complete(getRegionLocations(r)
117 .map(locations -> locations.getRegionLocation(parsedRegionInfo.getReplicaId())));
119 } catch (IOException parseEx) {
120 LOG.warn("Failed to parse the passed region name: " + Bytes.toStringBinary(regionName));
121 future.completeExceptionally(parseEx);
123 return future;
127 * Returns the HRegionLocation from meta for the given encoded region name
128 * @param metaTable
129 * @param encodedRegionName region we're looking for
130 * @return HRegionLocation for the given region
132 public static CompletableFuture<Optional<HRegionLocation>>
133 getRegionLocationWithEncodedName(AsyncTable<?> metaTable, byte[] encodedRegionName) {
134 CompletableFuture<Optional<HRegionLocation>> future = new CompletableFuture<>();
135 addListener(
136 metaTable
137 .scanAll(new Scan().setReadType(ReadType.PREAD).addFamily(HConstants.CATALOG_FAMILY)),
138 (results, err) -> {
139 if (err != null) {
140 future.completeExceptionally(err);
141 return;
143 String encodedRegionNameStr = Bytes.toString(encodedRegionName);
144 results.stream().filter(result -> !result.isEmpty())
145 .filter(result -> CatalogFamilyFormat.getRegionInfo(result) != null).forEach(result -> {
146 getRegionLocations(result).ifPresent(locations -> {
147 for (HRegionLocation location : locations.getRegionLocations()) {
148 if (location != null &&
149 encodedRegionNameStr.equals(location.getRegion().getEncodedName())) {
150 future.complete(Optional.of(location));
151 return;
156 future.complete(Optional.empty());
158 return future;
161 private static Optional<TableState> getTableState(Result r) throws IOException {
162 return Optional.ofNullable(CatalogFamilyFormat.getTableState(r));
166 * Used to get all region locations for the specific table.
167 * @param metaTable
168 * @param tableName table we're looking for, can be null for getting all regions
169 * @return the list of region locations. The return value will be wrapped by a
170 * {@link CompletableFuture}.
172 public static CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(
173 AsyncTable<AdvancedScanResultConsumer> metaTable, TableName tableName) {
174 CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
175 addListener(getTableRegionsAndLocations(metaTable, tableName, true), (locations, err) -> {
176 if (err != null) {
177 future.completeExceptionally(err);
178 } else if (locations == null || locations.isEmpty()) {
179 future.complete(Collections.emptyList());
180 } else {
181 List<HRegionLocation> regionLocations =
182 locations.stream().map(loc -> new HRegionLocation(loc.getFirst(), loc.getSecond()))
183 .collect(Collectors.toList());
184 future.complete(regionLocations);
187 return future;
191 * Used to get table regions' info and server.
192 * @param metaTable
193 * @param tableName table we're looking for, can be null for getting all regions
194 * @param excludeOfflinedSplitParents don't return split parents
195 * @return the list of regioninfos and server. The return value will be wrapped by a
196 * {@link CompletableFuture}.
198 private static CompletableFuture<List<Pair<RegionInfo, ServerName>>> getTableRegionsAndLocations(
199 final AsyncTable<AdvancedScanResultConsumer> metaTable, final TableName tableName,
200 final boolean excludeOfflinedSplitParents) {
201 CompletableFuture<List<Pair<RegionInfo, ServerName>>> future = new CompletableFuture<>();
202 if (TableName.META_TABLE_NAME.equals(tableName)) {
203 future.completeExceptionally(new IOException(
204 "This method can't be used to locate meta regions;" + " use MetaTableLocator instead"));
207 // Make a version of CollectingVisitor that collects RegionInfo and ServerAddress
208 CollectRegionLocationsVisitor visitor =
209 new CollectRegionLocationsVisitor(excludeOfflinedSplitParents);
211 addListener(scanMeta(metaTable, tableName, QueryType.REGION, visitor), (v, error) -> {
212 if (error != null) {
213 future.completeExceptionally(error);
214 return;
216 future.complete(visitor.getResults());
218 return future;
222 * Performs a scan of META table for given table.
223 * @param metaTable
224 * @param tableName table withing we scan
225 * @param type scanned part of meta
226 * @param visitor Visitor invoked against each row
228 private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable,
229 TableName tableName, QueryType type, final Visitor visitor) {
230 return scanMeta(metaTable, getTableStartRowForMeta(tableName, type),
231 getTableStopRowForMeta(tableName, type), type, Integer.MAX_VALUE, visitor);
235 * Performs a scan of META table for given table.
236 * @param metaTable
237 * @param startRow Where to start the scan
238 * @param stopRow Where to stop the scan
239 * @param type scanned part of meta
240 * @param maxRows maximum rows to return
241 * @param visitor Visitor invoked against each row
243 private static CompletableFuture<Void> scanMeta(AsyncTable<AdvancedScanResultConsumer> metaTable,
244 byte[] startRow, byte[] stopRow, QueryType type, int maxRows, final Visitor visitor) {
245 int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE;
246 Scan scan = getMetaScan(metaTable, rowUpperLimit);
247 for (byte[] family : type.getFamilies()) {
248 scan.addFamily(family);
250 if (startRow != null) {
251 scan.withStartRow(startRow);
253 if (stopRow != null) {
254 scan.withStopRow(stopRow);
257 if (LOG.isDebugEnabled()) {
258 LOG.debug("Scanning META" + " starting at row=" + Bytes.toStringBinary(scan.getStartRow()) +
259 " stopping at row=" + Bytes.toStringBinary(scan.getStopRow()) + " for max=" +
260 rowUpperLimit + " with caching=" + scan.getCaching());
263 CompletableFuture<Void> future = new CompletableFuture<Void>();
264 metaTable.scan(scan, new MetaTableScanResultConsumer(rowUpperLimit, visitor, future));
265 return future;
268 private static final class MetaTableScanResultConsumer implements AdvancedScanResultConsumer {
270 private int currentRowCount;
272 private final int rowUpperLimit;
274 private final Visitor visitor;
276 private final CompletableFuture<Void> future;
278 MetaTableScanResultConsumer(int rowUpperLimit, Visitor visitor,
279 CompletableFuture<Void> future) {
280 this.rowUpperLimit = rowUpperLimit;
281 this.visitor = visitor;
282 this.future = future;
283 this.currentRowCount = 0;
286 @Override
287 public void onError(Throwable error) {
288 future.completeExceptionally(error);
291 @Override
292 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NONNULL_PARAM_VIOLATION",
293 justification = "https://github.com/findbugsproject/findbugs/issues/79")
294 public void onComplete() {
295 future.complete(null);
298 @Override
299 public void onNext(Result[] results, ScanController controller) {
300 boolean terminateScan = false;
301 for (Result result : results) {
302 try {
303 if (!visitor.visit(result)) {
304 terminateScan = true;
305 break;
307 } catch (Exception e) {
308 future.completeExceptionally(e);
309 terminateScan = true;
310 break;
312 if (++currentRowCount >= rowUpperLimit) {
313 terminateScan = true;
314 break;
317 if (terminateScan) {
318 controller.terminate();
324 * Implementations 'visit' a catalog table row.
326 public interface Visitor {
328 * Visit the catalog table row.
329 * @param r A row from catalog table
330 * @return True if we are to proceed scanning the table, else false if we are to stop now.
332 boolean visit(final Result r) throws IOException;
336 * Implementations 'visit' a catalog table row but with close() at the end.
338 public interface CloseableVisitor extends Visitor, Closeable {
342 * A {@link Visitor} that collects content out of passed {@link Result}.
344 private static abstract class CollectingVisitor<T> implements Visitor {
345 final List<T> results = new ArrayList<>();
347 @Override
348 public boolean visit(Result r) throws IOException {
349 if (r != null && !r.isEmpty()) {
350 add(r);
352 return true;
355 abstract void add(Result r);
358 * @return Collected results; wait till visits complete to collect all possible results
360 List<T> getResults() {
361 return this.results;
365 static class CollectRegionLocationsVisitor
366 extends CollectingVisitor<Pair<RegionInfo, ServerName>> {
368 private final boolean excludeOfflinedSplitParents;
370 private RegionLocations current = null;
372 CollectRegionLocationsVisitor(boolean excludeOfflinedSplitParents) {
373 this.excludeOfflinedSplitParents = excludeOfflinedSplitParents;
376 @Override
377 public boolean visit(Result r) throws IOException {
378 Optional<RegionLocations> currentRegionLocations = getRegionLocations(r);
379 current = currentRegionLocations.orElse(null);
380 if (current == null || current.getRegionLocation().getRegion() == null) {
381 LOG.warn("No serialized RegionInfo in " + r);
382 return true;
384 RegionInfo hri = current.getRegionLocation().getRegion();
385 if (excludeOfflinedSplitParents && hri.isSplitParent()) {
386 return true;
388 // Else call super and add this Result to the collection.
389 return super.visit(r);
392 @Override
393 void add(Result r) {
394 if (current == null) {
395 return;
397 for (HRegionLocation loc : current.getRegionLocations()) {
398 if (loc != null) {
399 this.results.add(new Pair<RegionInfo, ServerName>(loc.getRegion(), loc.getServerName()));
406 * Collects all returned.
408 static class CollectAllVisitor extends CollectingVisitor<Result> {
409 @Override
410 void add(Result r) {
411 this.results.add(r);
415 private static Scan getMetaScan(AsyncTable<?> metaTable, int rowUpperLimit) {
416 Scan scan = new Scan();
417 int scannerCaching = metaTable.getConfiguration().getInt(HConstants.HBASE_META_SCANNER_CACHING,
418 HConstants.DEFAULT_HBASE_META_SCANNER_CACHING);
419 if (metaTable.getConfiguration().getBoolean(HConstants.USE_META_REPLICAS,
420 HConstants.DEFAULT_USE_META_REPLICAS)) {
421 scan.setConsistency(Consistency.TIMELINE);
423 if (rowUpperLimit <= scannerCaching) {
424 scan.setLimit(rowUpperLimit);
426 int rows = Math.min(rowUpperLimit, scannerCaching);
427 scan.setCaching(rows);
428 return scan;
432 * Returns an HRegionLocationList extracted from the result.
433 * @return an HRegionLocationList containing all locations for the region range or null if we
434 * can't deserialize the result.
436 private static Optional<RegionLocations> getRegionLocations(Result r) {
437 return Optional.ofNullable(CatalogFamilyFormat.getRegionLocations(r));
441 * @param tableName table we're working with
442 * @return start row for scanning META according to query type
444 public static byte[] getTableStartRowForMeta(TableName tableName, QueryType type) {
445 if (tableName == null) {
446 return null;
448 switch (type) {
449 case REGION:
450 case REPLICATION: {
451 byte[] startRow = new byte[tableName.getName().length + 2];
452 System.arraycopy(tableName.getName(), 0, startRow, 0, tableName.getName().length);
453 startRow[startRow.length - 2] = HConstants.DELIMITER;
454 startRow[startRow.length - 1] = HConstants.DELIMITER;
455 return startRow;
457 case ALL:
458 case TABLE:
459 default: {
460 return tableName.getName();
466 * @param tableName table we're working with
467 * @return stop row for scanning META according to query type
469 public static byte[] getTableStopRowForMeta(TableName tableName, QueryType type) {
470 if (tableName == null) {
471 return null;
473 final byte[] stopRow;
474 switch (type) {
475 case REGION:
476 case REPLICATION: {
477 stopRow = new byte[tableName.getName().length + 3];
478 System.arraycopy(tableName.getName(), 0, stopRow, 0, tableName.getName().length);
479 stopRow[stopRow.length - 3] = ' ';
480 stopRow[stopRow.length - 2] = HConstants.DELIMITER;
481 stopRow[stopRow.length - 1] = HConstants.DELIMITER;
482 break;
484 case ALL:
485 case TABLE:
486 default: {
487 stopRow = new byte[tableName.getName().length + 1];
488 System.arraycopy(tableName.getName(), 0, stopRow, 0, tableName.getName().length);
489 stopRow[stopRow.length - 1] = ' ';
490 break;
493 return stopRow;