HBASE-26765 Minor refactor of async scanning code (#4121)
[hbase.git] / hbase-client / src / main / java / org / apache / hadoop / hbase / client / TableOverAsyncTable.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.setCoprocessorError;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConcurrentMapUtils.IOExceptionSupplier;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.primitives.Booleans;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
 * The table implementation based on {@link AsyncTable}.
 */
@InterfaceAudience.Private
class TableOverAsyncTable implements Table {

  private static final Logger LOG = LoggerFactory.getLogger(TableOverAsyncTable.class);

  private final AsyncConnectionImpl conn;

  private final AsyncTable<?> table;

  private final IOExceptionSupplier<ExecutorService> poolSupplier;

  TableOverAsyncTable(AsyncConnectionImpl conn, AsyncTable<?> table,
      IOExceptionSupplier<ExecutorService> poolSupplier) {
    this.conn = conn;
    this.table = table;
    this.poolSupplier = poolSupplier;
  }
  @Override
  public TableName getName() {
    return table.getName();
  }

  @Override
  public Configuration getConfiguration() {
    return table.getConfiguration();
  }

  @Override
  public TableDescriptor getDescriptor() throws IOException {
    return FutureUtils.get(conn.getAdmin().getDescriptor(getName()));
  }

  @Override
  public boolean exists(Get get) throws IOException {
    return FutureUtils.get(table.exists(get));
  }

  @Override
  public boolean[] exists(List<Get> gets) throws IOException {
    return Booleans.toArray(FutureUtils.get(table.existsAll(gets)));
  }
  @Override
  public void batch(List<? extends Row> actions, Object[] results) throws IOException {
    if (ArrayUtils.isEmpty(results)) {
      FutureUtils.get(table.batchAll(actions));
      return;
    }
    List<ThrowableWithExtraContext> errors = new ArrayList<>();
    List<CompletableFuture<Object>> futures = table.batch(actions);
    for (int i = 0, n = results.length; i < n; i++) {
      try {
        results[i] = FutureUtils.get(futures.get(i));
      } catch (IOException e) {
        results[i] = e;
        errors.add(new ThrowableWithExtraContext(e, EnvironmentEdgeManager.currentTime(),
            "Error when processing " + actions.get(i)));
      }
    }
    if (!errors.isEmpty()) {
      throw new RetriesExhaustedException(errors.size(), errors);
    }
  }
  @Override
  public <R> void batchCallback(List<? extends Row> actions, Object[] results, Callback<R> callback)
      throws IOException, InterruptedException {
    ConcurrentLinkedQueue<ThrowableWithExtraContext> errors = new ConcurrentLinkedQueue<>();
    CountDownLatch latch = new CountDownLatch(actions.size());
    AsyncTableRegionLocator locator = conn.getRegionLocator(getName());
    List<CompletableFuture<R>> futures = table.<R> batch(actions);
    for (int i = 0, n = futures.size(); i < n; i++) {
      final int index = i;
      FutureUtils.addListener(futures.get(i), (r, e) -> {
        if (e != null) {
          errors.add(new ThrowableWithExtraContext(e, EnvironmentEdgeManager.currentTime(),
              "Error when processing " + actions.get(index)));
          if (!ArrayUtils.isEmpty(results)) {
            results[index] = e;
          }
          latch.countDown();
        } else {
          if (!ArrayUtils.isEmpty(results)) {
            results[index] = r;
          }
          FutureUtils.addListener(locator.getRegionLocation(actions.get(index).getRow()),
            (l, le) -> {
              if (le != null) {
                errors.add(new ThrowableWithExtraContext(le, EnvironmentEdgeManager.currentTime(),
                    "Error when finding the region for row " +
                      Bytes.toStringBinary(actions.get(index).getRow())));
              } else {
                callback.update(l.getRegion().getRegionName(), actions.get(index).getRow(), r);
              }
              latch.countDown();
            });
        }
      });
    }
    latch.await();
    if (!errors.isEmpty()) {
      throw new RetriesExhaustedException(errors.size(),
          errors.stream().collect(Collectors.toList()));
    }
  }
  @Override
  public Result get(Get get) throws IOException {
    return FutureUtils.get(table.get(get));
  }

  @Override
  public Result[] get(List<Get> gets) throws IOException {
    return FutureUtils.get(table.getAll(gets)).toArray(new Result[0]);
  }

  @Override
  public ResultScanner getScanner(Scan scan) throws IOException {
    return table.getScanner(scan);
  }

  @Override
  public ResultScanner getScanner(byte[] family) throws IOException {
    return table.getScanner(family);
  }

  @Override
  public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
    return table.getScanner(family, qualifier);
  }

  @Override
  public void put(Put put) throws IOException {
    FutureUtils.get(table.put(put));
  }

  @Override
  public void put(List<Put> puts) throws IOException {
    FutureUtils.get(table.putAll(puts));
  }

  @Override
  public void delete(Delete delete) throws IOException {
    FutureUtils.get(table.delete(delete));
  }

  @Override
  public void delete(List<Delete> deletes) throws IOException {
    FutureUtils.get(table.deleteAll(deletes));
  }
  @Override
  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
    return new CheckAndMutateBuilder() {

      private final AsyncTable.CheckAndMutateBuilder builder = table.checkAndMutate(row, family);

      @Override
      public CheckAndMutateBuilder qualifier(byte[] qualifier) {
        builder.qualifier(qualifier);
        return this;
      }

      @Override
      public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
        builder.timeRange(timeRange);
        return this;
      }

      @Override
      public CheckAndMutateBuilder ifNotExists() {
        builder.ifNotExists();
        return this;
      }

      @Override
      public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
        builder.ifMatches(compareOp, value);
        return this;
      }

      @Override
      public boolean thenPut(Put put) throws IOException {
        return FutureUtils.get(builder.thenPut(put));
      }

      @Override
      public boolean thenDelete(Delete delete) throws IOException {
        return FutureUtils.get(builder.thenDelete(delete));
      }

      @Override
      public boolean thenMutate(RowMutations mutation) throws IOException {
        return FutureUtils.get(builder.thenMutate(mutation));
      }
    };
  }
  @Override
  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
    return new CheckAndMutateWithFilterBuilder() {
      private final AsyncTable.CheckAndMutateWithFilterBuilder builder =
        table.checkAndMutate(row, filter);

      @Override
      public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
        builder.timeRange(timeRange);
        return this;
      }

      @Override
      public boolean thenPut(Put put) throws IOException {
        return FutureUtils.get(builder.thenPut(put));
      }

      @Override
      public boolean thenDelete(Delete delete) throws IOException {
        return FutureUtils.get(builder.thenDelete(delete));
      }

      @Override
      public boolean thenMutate(RowMutations mutation) throws IOException {
        return FutureUtils.get(builder.thenMutate(mutation));
      }
    };
  }
  @Override
  public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
    return FutureUtils.get(table.checkAndMutate(checkAndMutate));
  }

  @Override
  public List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndMutates)
      throws IOException {
    return FutureUtils.get(table.checkAndMutateAll(checkAndMutates));
  }

  @Override
  public Result mutateRow(RowMutations rm) throws IOException {
    return FutureUtils.get(table.mutateRow(rm));
  }

  @Override
  public Result append(Append append) throws IOException {
    return FutureUtils.get(table.append(append));
  }

  @Override
  public Result increment(Increment increment) throws IOException {
    return FutureUtils.get(table.increment(increment));
  }

  @Override
  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount)
      throws IOException {
    return FutureUtils.get(table.incrementColumnValue(row, family, qualifier, amount));
  }

  @Override
  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
      Durability durability) throws IOException {
    return FutureUtils.get(table.incrementColumnValue(row, family, qualifier, amount, durability));
  }

  @Override
  public void close() {
  }
  @SuppressWarnings("deprecation")
  private static final class RegionCoprocessorRpcChannel extends RegionCoprocessorRpcChannelImpl
      implements CoprocessorRpcChannel {

    RegionCoprocessorRpcChannel(AsyncConnectionImpl conn, TableName tableName, RegionInfo region,
        byte[] row, long rpcTimeoutNs, long operationTimeoutNs) {
      super(conn, tableName, region, row, rpcTimeoutNs, operationTimeoutNs);
    }

    @Override
    public void callMethod(MethodDescriptor method, RpcController controller, Message request,
        Message responsePrototype, RpcCallback<Message> done) {
      ClientCoprocessorRpcController c = new ClientCoprocessorRpcController();
      CoprocessorBlockingRpcCallback<Message> callback = new CoprocessorBlockingRpcCallback<>();
      super.callMethod(method, c, request, responsePrototype, callback);
      Message ret;
      try {
        ret = callback.get();
      } catch (IOException e) {
        setCoprocessorError(controller, e);
        return;
      }
      if (c.failed()) {
        setCoprocessorError(controller, c.getFailed());
      }
      done.run(ret);
    }

    @Override
    public Message callBlockingMethod(MethodDescriptor method, RpcController controller,
        Message request, Message responsePrototype) throws ServiceException {
      ClientCoprocessorRpcController c = new ClientCoprocessorRpcController();
      CoprocessorBlockingRpcCallback<Message> done = new CoprocessorBlockingRpcCallback<>();
      callMethod(method, c, request, responsePrototype, done);
      Message ret;
      try {
        ret = done.get();
      } catch (IOException e) {
        throw new ServiceException(e);
      }
      if (c.failed()) {
        setCoprocessorError(controller, c.getFailed());
        throw new ServiceException(c.getFailed());
      }
      return ret;
    }
  }
  @Override
  public RegionCoprocessorRpcChannel coprocessorService(byte[] row) {
    return new RegionCoprocessorRpcChannel(conn, getName(), null, row,
      getRpcTimeout(TimeUnit.NANOSECONDS), getOperationTimeout(TimeUnit.NANOSECONDS));
  }
  /**
   * Get the corresponding start keys and regions for an arbitrary range of keys.
   * <p>
   * @param startKey Starting row in range, inclusive
   * @param endKey Ending row in range
   * @param includeEndKey true if endKey is inclusive, false if exclusive
   * @return A pair of list of start keys and list of HRegionLocations that contain the specified
   *         range
   * @throws IOException if a remote or network exception occurs
   */
  private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(final byte[] startKey,
      final byte[] endKey, final boolean includeEndKey) throws IOException {
    return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
  }
  /**
   * Get the corresponding start keys and regions for an arbitrary range of keys.
   * <p>
   * @param startKey Starting row in range, inclusive
   * @param endKey Ending row in range
   * @param includeEndKey true if endKey is inclusive, false if exclusive
   * @param reload true to reload information or false to use cached information
   * @return A pair of list of start keys and list of HRegionLocations that contain the specified
   *         range
   * @throws IOException if a remote or network exception occurs
   */
  private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(final byte[] startKey,
      final byte[] endKey, final boolean includeEndKey, final boolean reload) throws IOException {
    final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
    if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
      throw new IllegalArgumentException(
        "Invalid range: " + Bytes.toStringBinary(startKey) + " > " + Bytes.toStringBinary(endKey));
    }
    List<byte[]> keysInRange = new ArrayList<>();
    List<HRegionLocation> regionsInRange = new ArrayList<>();
    byte[] currentKey = startKey;
    do {
      HRegionLocation regionLocation =
        FutureUtils.get(conn.getRegionLocator(getName()).getRegionLocation(currentKey, reload));
      keysInRange.add(currentKey);
      regionsInRange.add(regionLocation);
      currentKey = regionLocation.getRegion().getEndKey();
    } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) &&
      (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0 ||
        (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
    return new Pair<>(keysInRange, regionsInRange);
  }
  private List<byte[]> getStartKeysInRange(byte[] start, byte[] end) throws IOException {
    if (start == null) {
      start = HConstants.EMPTY_START_ROW;
    }
    if (end == null) {
      end = HConstants.EMPTY_END_ROW;
    }
    return getKeysAndRegionsInRange(start, end, true).getFirst();
  }

  @FunctionalInterface
  private interface StubCall<R> {
    R call(RegionCoprocessorRpcChannel channel) throws Exception;
  }
  private <R> void coprocssorService(String serviceName, byte[] startKey, byte[] endKey,
      Callback<R> callback, StubCall<R> call) throws Throwable {
    // get regions covered by the row range
    ExecutorService pool = this.poolSupplier.get();
    List<byte[]> keys = getStartKeysInRange(startKey, endKey);
    Map<byte[], Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    try {
      for (byte[] r : keys) {
        RegionCoprocessorRpcChannel channel = coprocessorService(r);
        Future<R> future = pool.submit(new Callable<R>() {
          @Override
          public R call() throws Exception {
            R result = call.call(channel);
            byte[] region = channel.getLastRegion();
            if (callback != null) {
              callback.update(region, r, result);
            }
            return result;
          }
        });
        futures.put(r, future);
      }
    } catch (RejectedExecutionException e) {
      // maybe the connection has been closed, let's check
      if (conn.isClosed()) {
        throw new DoNotRetryIOException("Connection is closed", e);
      } else {
        throw new HBaseIOException("Coprocessor operation is rejected", e);
      }
    }
    for (Map.Entry<byte[], Future<R>> e : futures.entrySet()) {
      try {
        e.getValue().get();
      } catch (ExecutionException ee) {
        LOG.warn("Error calling coprocessor service " + serviceName + " for row " +
          Bytes.toStringBinary(e.getKey()), ee);
        throw ee.getCause();
      } catch (InterruptedException ie) {
        throw new InterruptedIOException("Interrupted calling coprocessor service " + serviceName +
          " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie);
      }
    }
  }
  @Override
  public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey,
      byte[] endKey, Call<T, R> callable, Callback<R> callback) throws ServiceException, Throwable {
    coprocssorService(service.getName(), startKey, endKey, callback, channel -> {
      T instance = ProtobufUtil.newServiceStub(service, channel);
      return callable.call(instance);
    });
  }

  @SuppressWarnings("unchecked")
  @Override
  public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor,
      Message request, byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
      throws ServiceException, Throwable {
    coprocssorService(methodDescriptor.getFullName(), startKey, endKey, callback, channel -> {
      return (R) channel.callBlockingMethod(methodDescriptor, null, request, responsePrototype);
    });
  }
  @Override
  public long getRpcTimeout(TimeUnit unit) {
    return table.getRpcTimeout(unit);
  }

  @Override
  public long getReadRpcTimeout(TimeUnit unit) {
    return table.getReadRpcTimeout(unit);
  }

  @Override
  public long getWriteRpcTimeout(TimeUnit unit) {
    return table.getWriteRpcTimeout(unit);
  }

  @Override
  public long getOperationTimeout(TimeUnit unit) {
    return table.getOperationTimeout(unit);
  }

  @Override
  public RegionLocator getRegionLocator() throws IOException {
    return conn.toConnection().getRegionLocator(getName());
  }
}
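
The region walk performed by getKeysAndRegionsInRange can be illustrated with a minimal standalone sketch. This is not HBase code. The class RegionRangeWalkSketch, its method names, and the TreeMap that stands in for the region locator (region start key mapped to region end key) are all hypothetical. Arrays.compareUnsigned (Java 9+) is an assumed substitute for Bytes.compareTo, and an empty byte array plays the role of HConstants.EMPTY_END_ROW.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical illustration only; none of these names exist in HBase. */
final class RegionRangeWalkSketch {

  // Unsigned lexicographic byte[] order (assumed stand-in for Bytes.compareTo).
  static final Comparator<byte[]> UNSIGNED = Arrays::compareUnsigned;

  // The empty key marks both the start and the end of the table.
  static final byte[] EMPTY = new byte[0];

  static byte[] b(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }

  // Three made-up regions: ["", "g"), ["g", "p"), ["p", "").
  static TreeMap<byte[], byte[]> demoRegions() {
    TreeMap<byte[], byte[]> regions = new TreeMap<>(UNSIGNED);
    regions.put(EMPTY, b("g"));
    regions.put(b("g"), b("p"));
    regions.put(b("p"), EMPTY);
    return regions;
  }

  // Records one key per region that overlaps the range, hopping from each
  // region to the next through the region's end key.
  static List<byte[]> startKeysInRange(TreeMap<byte[], byte[]> regions, byte[] startKey,
      byte[] endKey, boolean includeEndKey) {
    boolean endKeyIsEndOfTable = endKey.length == 0;
    if (!endKeyIsEndOfTable && UNSIGNED.compare(startKey, endKey) > 0) {
      throw new IllegalArgumentException("Invalid range: start key sorts after end key");
    }
    List<byte[]> keys = new ArrayList<>();
    byte[] currentKey = startKey;
    do {
      // The region holding currentKey has the greatest start key <= currentKey.
      byte[] regionEndKey = regions.floorEntry(currentKey).getValue();
      keys.add(currentKey);
      currentKey = regionEndKey;
    } while (currentKey.length != 0 && (endKeyIsEndOfTable
      || UNSIGNED.compare(currentKey, endKey) < 0
      || (includeEndKey && UNSIGNED.compare(currentKey, endKey) == 0)));
    return keys;
  }

  public static void main(String[] args) {
    for (byte[] key : startKeysInRange(demoRegions(), b("c"), b("k"), true)) {
      System.out.println(new String(key, StandardCharsets.UTF_8));
    }
  }
}
```

As in the original method, the first recorded key is the caller's start key rather than the start key of the region that contains it; every later key is a region boundary. The includeEndKey flag only matters when the end key falls exactly on a region boundary.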