/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.setCoprocessorError;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConcurrentMapUtils.IOExceptionSupplier;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.primitives.Booleans;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
 * The table implementation based on {@link AsyncTable}.
 */
@InterfaceAudience.Private
class TableOverAsyncTable implements Table {

  private static final Logger LOG = LoggerFactory.getLogger(TableOverAsyncTable.class);

  private final AsyncConnectionImpl conn;

  private final AsyncTable<?> table;

  private final IOExceptionSupplier<ExecutorService> poolSupplier;
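
  // This class adapts the blocking Table API on top of AsyncTable: most methods below simply
  // delegate to the async counterpart and wait on the returned CompletableFuture via
  // FutureUtils.get, so timeout and retry behaviour comes from the underlying async table.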

  TableOverAsyncTable(AsyncConnectionImpl conn, AsyncTable<?> table,
    IOExceptionSupplier<ExecutorService> poolSupplier) {
    this.conn = conn;
    this.table = table;
    this.poolSupplier = poolSupplier;
  }

  @Override
  public TableName getName() {
    return table.getName();
  }

  @Override
  public Configuration getConfiguration() {
    return table.getConfiguration();
  }

  @Override
  public TableDescriptor getDescriptor() throws IOException {
    return FutureUtils.get(conn.getAdmin().getDescriptor(getName()));
  }

  @Override
  public boolean exists(Get get) throws IOException {
    return FutureUtils.get(table.exists(get));
  }

  @Override
  public boolean[] exists(List<Get> gets) throws IOException {
    return Booleans.toArray(FutureUtils.get(table.existsAll(gets)));
  }
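
  // batch() blocks on every per-action future; individual failures are recorded together with the
  // action that caused them and rethrown as a single RetriesExhaustedException once all actions
  // have completed.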
  @Override
  public void batch(List<? extends Row> actions, Object[] results) throws IOException {
    if (ArrayUtils.isEmpty(results)) {
      FutureUtils.get(table.batchAll(actions));
      return;
    }
    List<ThrowableWithExtraContext> errors = new ArrayList<>();
    List<CompletableFuture<Object>> futures = table.batch(actions);
    for (int i = 0, n = results.length; i < n; i++) {
      try {
        results[i] = FutureUtils.get(futures.get(i));
      } catch (IOException e) {
        results[i] = e;
        errors.add(new ThrowableWithExtraContext(e, EnvironmentEdgeManager.currentTime(),
          "Error when processing " + actions.get(i)));
      }
    }
    if (!errors.isEmpty()) {
      throw new RetriesExhaustedException(errors.size(), errors);
    }
  }
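
  // batchCallback() counts down a latch once per action: either directly on failure, or after the
  // region location of the action's row has been resolved so the callback can be invoked with the
  // owning region's name. Errors from both stages are collected and rethrown at the end.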
  @Override
  public <R> void batchCallback(List<? extends Row> actions, Object[] results,
    Callback<R> callback) throws IOException, InterruptedException {
    ConcurrentLinkedQueue<ThrowableWithExtraContext> errors = new ConcurrentLinkedQueue<>();
    CountDownLatch latch = new CountDownLatch(actions.size());
    AsyncTableRegionLocator locator = conn.getRegionLocator(getName());
    List<CompletableFuture<R>> futures = table.<R> batch(actions);
    for (int i = 0, n = futures.size(); i < n; i++) {
      final int index = i;
      FutureUtils.addListener(futures.get(i), (r, e) -> {
        if (e != null) {
          errors.add(new ThrowableWithExtraContext(e, EnvironmentEdgeManager.currentTime(),
            "Error when processing " + actions.get(index)));
          if (!ArrayUtils.isEmpty(results)) {
            results[index] = e;
          }
          latch.countDown();
        } else {
          if (!ArrayUtils.isEmpty(results)) {
            results[index] = r;
          }
          FutureUtils.addListener(locator.getRegionLocation(actions.get(index).getRow()),
            (l, le) -> {
              if (le != null) {
                errors.add(new ThrowableWithExtraContext(le, EnvironmentEdgeManager.currentTime(),
                  "Error when finding the region for row " +
                    Bytes.toStringBinary(actions.get(index).getRow())));
              } else {
                callback.update(l.getRegion().getRegionName(), actions.get(index).getRow(), r);
              }
              latch.countDown();
            });
        }
      });
    }
    latch.await();
    if (!errors.isEmpty()) {
      throw new RetriesExhaustedException(errors.size(),
        errors.stream().collect(Collectors.toList()));
    }
  }
  @Override
  public Result get(Get get) throws IOException {
    return FutureUtils.get(table.get(get));
  }

  @Override
  public Result[] get(List<Get> gets) throws IOException {
    return FutureUtils.get(table.getAll(gets)).toArray(new Result[0]);
  }

  @Override
  public ResultScanner getScanner(Scan scan) throws IOException {
    return table.getScanner(scan);
  }

  @Override
  public ResultScanner getScanner(byte[] family) throws IOException {
    return table.getScanner(family);
  }

  @Override
  public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
    return table.getScanner(family, qualifier);
  }

  @Override
  public void put(Put put) throws IOException {
    FutureUtils.get(table.put(put));
  }

  @Override
  public void put(List<Put> puts) throws IOException {
    FutureUtils.get(table.putAll(puts));
  }

  @Override
  public void delete(Delete delete) throws IOException {
    FutureUtils.get(table.delete(delete));
  }

  @Override
  public void delete(List<Delete> deletes) throws IOException {
    FutureUtils.get(table.deleteAll(deletes));
  }
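
  // The CheckAndMutateBuilder implementations below are thin wrappers around the corresponding
  // AsyncTable builders: condition-building calls are forwarded as-is, and the terminal
  // thenPut/thenDelete/thenMutate calls block on the async result.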
  @Override
  public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
    return new CheckAndMutateBuilder() {

      private final AsyncTable.CheckAndMutateBuilder builder = table.checkAndMutate(row, family);

      @Override
      public CheckAndMutateBuilder qualifier(byte[] qualifier) {
        builder.qualifier(qualifier);
        return this;
      }

      @Override
      public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
        builder.timeRange(timeRange);
        return this;
      }

      @Override
      public CheckAndMutateBuilder ifNotExists() {
        builder.ifNotExists();
        return this;
      }

      @Override
      public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
        builder.ifMatches(compareOp, value);
        return this;
      }

      @Override
      public boolean thenPut(Put put) throws IOException {
        return FutureUtils.get(builder.thenPut(put));
      }

      @Override
      public boolean thenDelete(Delete delete) throws IOException {
        return FutureUtils.get(builder.thenDelete(delete));
      }

      @Override
      public boolean thenMutate(RowMutations mutation) throws IOException {
        return FutureUtils.get(builder.thenMutate(mutation));
      }
    };
  }

  @Override
  public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
    return new CheckAndMutateWithFilterBuilder() {

      private final AsyncTable.CheckAndMutateWithFilterBuilder builder =
        table.checkAndMutate(row, filter);

      @Override
      public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
        builder.timeRange(timeRange);
        return this;
      }

      @Override
      public boolean thenPut(Put put) throws IOException {
        return FutureUtils.get(builder.thenPut(put));
      }

      @Override
      public boolean thenDelete(Delete delete) throws IOException {
        return FutureUtils.get(builder.thenDelete(delete));
      }

      @Override
      public boolean thenMutate(RowMutations mutation) throws IOException {
        return FutureUtils.get(builder.thenMutate(mutation));
      }
    };
  }

  @Override
  public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
    return FutureUtils.get(table.checkAndMutate(checkAndMutate));
  }

  @Override
  public List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndMutates)
    throws IOException {
    return FutureUtils.get(table.checkAndMutateAll(checkAndMutates));
  }
  @Override
  public Result mutateRow(RowMutations rm) throws IOException {
    return FutureUtils.get(table.mutateRow(rm));
  }

  @Override
  public Result append(Append append) throws IOException {
    return FutureUtils.get(table.append(append));
  }

  @Override
  public Result increment(Increment increment) throws IOException {
    return FutureUtils.get(table.increment(increment));
  }

  @Override
  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount)
    throws IOException {
    return FutureUtils.get(table.incrementColumnValue(row, family, qualifier, amount));
  }

  @Override
  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
    Durability durability) throws IOException {
    return FutureUtils.get(table.incrementColumnValue(row, family, qualifier, amount, durability));
  }

  @Override
  public void close() {
  }
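
  // A blocking adapter: RegionCoprocessorRpcChannelImpl is the async region coprocessor channel,
  // and this subclass satisfies the synchronous CoprocessorRpcChannel contract by waiting on a
  // CoprocessorBlockingRpcCallback before handing the response (or error) back to the caller.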
  @SuppressWarnings("deprecation")
  private static final class RegionCoprocessorRpcChannel extends RegionCoprocessorRpcChannelImpl
    implements CoprocessorRpcChannel {

    RegionCoprocessorRpcChannel(AsyncConnectionImpl conn, TableName tableName, RegionInfo region,
      byte[] row, long rpcTimeoutNs, long operationTimeoutNs) {
      super(conn, tableName, region, row, rpcTimeoutNs, operationTimeoutNs);
    }

    @Override
    public void callMethod(MethodDescriptor method, RpcController controller, Message request,
      Message responsePrototype, RpcCallback<Message> done) {
      ClientCoprocessorRpcController c = new ClientCoprocessorRpcController();
      CoprocessorBlockingRpcCallback<Message> callback = new CoprocessorBlockingRpcCallback<>();
      super.callMethod(method, c, request, responsePrototype, callback);
      Message ret;
      try {
        ret = callback.get();
      } catch (IOException e) {
        setCoprocessorError(controller, e);
        return;
      }
      if (c.failed()) {
        setCoprocessorError(controller, c.getFailed());
      }
      done.run(ret);
    }

    @Override
    public Message callBlockingMethod(MethodDescriptor method, RpcController controller,
      Message request, Message responsePrototype) throws ServiceException {
      ClientCoprocessorRpcController c = new ClientCoprocessorRpcController();
      CoprocessorBlockingRpcCallback<Message> done = new CoprocessorBlockingRpcCallback<>();
      callMethod(method, c, request, responsePrototype, done);
      Message ret;
      try {
        ret = done.get();
      } catch (IOException e) {
        throw new ServiceException(e);
      }
      if (c.failed()) {
        setCoprocessorError(controller, c.getFailed());
        throw new ServiceException(c.getFailed());
      }
      return ret;
    }
  }

  @Override
  public RegionCoprocessorRpcChannel coprocessorService(byte[] row) {
    return new RegionCoprocessorRpcChannel(conn, getName(), null, row,
      getRpcTimeout(TimeUnit.NANOSECONDS), getOperationTimeout(TimeUnit.NANOSECONDS));
  }

  /**
   * Get the corresponding start keys and regions for an arbitrary range of keys.
   * @param startKey Starting row in range, inclusive
   * @param endKey Ending row in range
   * @param includeEndKey true if endRow is inclusive, false if exclusive
   * @return A pair of list of start keys and list of HRegionLocations that contain the specified
   *         range
   * @throws IOException if a remote or network exception occurs
   */
  private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(final byte[] startKey,
    final byte[] endKey, final boolean includeEndKey) throws IOException {
    return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
  }

  /**
   * Get the corresponding start keys and regions for an arbitrary range of keys.
   * @param startKey Starting row in range, inclusive
   * @param endKey Ending row in range
   * @param includeEndKey true if endRow is inclusive, false if exclusive
   * @param reload true to reload information or false to use cached information
   * @return A pair of list of start keys and list of HRegionLocations that contain the specified
   *         range
   * @throws IOException if a remote or network exception occurs
   */
  private Pair<List<byte[]>, List<HRegionLocation>> getKeysAndRegionsInRange(final byte[] startKey,
    final byte[] endKey, final boolean includeEndKey, final boolean reload) throws IOException {
    final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
    if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
      throw new IllegalArgumentException(
        "Invalid range: " + Bytes.toStringBinary(startKey) + " > " + Bytes.toStringBinary(endKey));
    }
    List<byte[]> keysInRange = new ArrayList<>();
    List<HRegionLocation> regionsInRange = new ArrayList<>();
    byte[] currentKey = startKey;
    do {
      HRegionLocation regionLocation =
        FutureUtils.get(conn.getRegionLocator(getName()).getRegionLocation(currentKey, reload));
      keysInRange.add(currentKey);
      regionsInRange.add(regionLocation);
      currentKey = regionLocation.getRegion().getEndKey();
    } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) &&
      (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0 ||
        (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
    return new Pair<>(keysInRange, regionsInRange);
  }
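
  // Null range boundaries are normalized to the empty start/end row markers before the region
  // lookup, so callers may pass null to mean "from the first region" or "to the last region".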
  private List<byte[]> getStartKeysInRange(byte[] start, byte[] end) throws IOException {
    if (start == null) {
      start = HConstants.EMPTY_START_ROW;
    }
    if (end == null) {
      end = HConstants.EMPTY_END_ROW;
    }
    return getKeysAndRegionsInRange(start, end, true).getFirst();
  }

  private interface StubCall<R> {
    R call(RegionCoprocessorRpcChannel channel) throws Exception;
  }
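
  // Shared fan-out helper for the coprocessor service methods: it resolves the region start keys
  // covering [startKey, endKey], opens one RegionCoprocessorRpcChannel per region, runs the stub
  // call for each region on the pool supplied by the connection, and reports each result through
  // the optional Batch.Callback before the per-region futures are drained for errors.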
  private <R> void coprocssorService(String serviceName, byte[] startKey, byte[] endKey,
    Callback<R> callback, StubCall<R> call) throws Throwable {
    // get regions covered by the row range
    ExecutorService pool = this.poolSupplier.get();
    List<byte[]> keys = getStartKeysInRange(startKey, endKey);
    Map<byte[], Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    try {
      for (byte[] r : keys) {
        RegionCoprocessorRpcChannel channel = coprocessorService(r);
        Future<R> future = pool.submit(new Callable<R>() {
          @Override
          public R call() throws Exception {
            R result = call.call(channel);
            byte[] region = channel.getLastRegion();
            if (callback != null) {
              callback.update(region, r, result);
            }
            return result;
          }
        });
        futures.put(r, future);
      }
    } catch (RejectedExecutionException e) {
      // maybe the connection has been closed, let's check
      if (conn.isClosed()) {
        throw new DoNotRetryIOException("Connection is closed", e);
      } else {
        throw new HBaseIOException("Coprocessor operation is rejected", e);
      }
    }
    for (Map.Entry<byte[], Future<R>> e : futures.entrySet()) {
      try {
        e.getValue().get();
      } catch (ExecutionException ee) {
        LOG.warn("Error calling coprocessor service " + serviceName + " for row " +
          Bytes.toStringBinary(e.getKey()), ee);
        throw ee.getCause();
      } catch (InterruptedException ie) {
        throw new InterruptedIOException("Interrupted calling coprocessor service " + serviceName +
          " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie);
      }
    }
  }

  @Override
  public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey,
    byte[] endKey, Call<T, R> callable, Callback<R> callback) throws ServiceException, Throwable {
    coprocssorService(service.getName(), startKey, endKey, callback, channel -> {
      T instance = ProtobufUtil.newServiceStub(service, channel);
      return callable.call(instance);
    });
  }
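
  // batchCoprocessorService() reuses the same fan-out helper but goes through the blocking
  // callBlockingMethod path; the unchecked cast relies on the response prototype determining the
  // runtime type of the returned Message.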
  @SuppressWarnings("unchecked")
  @Override
  public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor,
    Message request, byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
    throws ServiceException, Throwable {
    coprocssorService(methodDescriptor.getFullName(), startKey, endKey, callback, channel -> {
      return (R) channel.callBlockingMethod(methodDescriptor, null, request, responsePrototype);
    });
  }

  @Override
  public long getRpcTimeout(TimeUnit unit) {
    return table.getRpcTimeout(unit);
  }

  @Override
  public long getReadRpcTimeout(TimeUnit unit) {
    return table.getReadRpcTimeout(unit);
  }

  @Override
  public long getWriteRpcTimeout(TimeUnit unit) {
    return table.getWriteRpcTimeout(unit);
  }

  @Override
  public long getOperationTimeout(TimeUnit unit) {
    return table.getOperationTimeout(unit);
  }

  @Override
  public RegionLocator getRegionLocator() throws IOException {
    return conn.toConnection().getRegionLocator(getName());
  }
}