2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static java
.util
.stream
.Collectors
.toList
;
21 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.checkHasFamilies
;
22 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.isEmptyStopRow
;
23 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.timelineConsistentRead
;
24 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.validatePut
;
25 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.validatePutsInRowMutations
;
26 import static org
.apache
.hadoop
.hbase
.trace
.TraceUtil
.tracedFuture
;
27 import static org
.apache
.hadoop
.hbase
.trace
.TraceUtil
.tracedFutures
;
28 import static org
.apache
.hadoop
.hbase
.util
.FutureUtils
.addListener
;
30 import io
.opentelemetry
.api
.trace
.Span
;
31 import java
.io
.IOException
;
32 import java
.util
.ArrayList
;
33 import java
.util
.Arrays
;
34 import java
.util
.List
;
35 import java
.util
.concurrent
.CompletableFuture
;
36 import java
.util
.concurrent
.TimeUnit
;
37 import java
.util
.concurrent
.atomic
.AtomicBoolean
;
38 import java
.util
.concurrent
.atomic
.AtomicInteger
;
39 import java
.util
.function
.Function
;
40 import java
.util
.function
.Supplier
;
41 import org
.apache
.hadoop
.conf
.Configuration
;
42 import org
.apache
.hadoop
.hbase
.CompareOperator
;
43 import org
.apache
.hadoop
.hbase
.DoNotRetryIOException
;
44 import org
.apache
.hadoop
.hbase
.HConstants
;
45 import org
.apache
.hadoop
.hbase
.HRegionLocation
;
46 import org
.apache
.hadoop
.hbase
.TableName
;
47 import org
.apache
.hadoop
.hbase
.client
.AsyncRpcRetryingCallerFactory
.SingleRequestCallerBuilder
;
48 import org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.Converter
;
49 import org
.apache
.hadoop
.hbase
.client
.trace
.TableOperationSpanBuilder
;
50 import org
.apache
.hadoop
.hbase
.filter
.Filter
;
51 import org
.apache
.hadoop
.hbase
.io
.TimeRange
;
52 import org
.apache
.hadoop
.hbase
.ipc
.HBaseRpcController
;
53 import org
.apache
.hadoop
.hbase
.trace
.HBaseSemanticAttributes
;
54 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
55 import org
.apache
.hadoop
.hbase
.util
.ReflectionUtils
;
56 import org
.apache
.yetus
.audience
.InterfaceAudience
;
57 import org
.slf4j
.Logger
;
58 import org
.slf4j
.LoggerFactory
;
60 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.base
.Preconditions
;
61 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.RpcCallback
;
62 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.RpcChannel
;
63 import org
.apache
.hbase
.thirdparty
.io
.netty
.util
.Timer
;
65 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ProtobufUtil
;
66 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.RequestConverter
;
67 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ResponseConverter
;
68 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.ClientService
;
69 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.GetRequest
;
70 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.GetResponse
;
71 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.MultiRequest
;
72 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.MultiResponse
;
73 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.MutateRequest
;
74 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.MutateResponse
;
77 * The implementation of RawAsyncTable.
79 * The word 'Raw' means that this is a low level class. The returned {@link CompletableFuture} will
80 * be finished inside the rpc framework thread, which means that the callbacks registered to the
81 * {@link CompletableFuture} will also be executed inside the rpc framework thread. So users who use
82 * this class should not try to do time consuming tasks in the callbacks.
86 @InterfaceAudience.Private
87 class RawAsyncTableImpl
implements AsyncTable
<AdvancedScanResultConsumer
> {
89 private static final Logger LOG
= LoggerFactory
.getLogger(RawAsyncTableImpl
.class);
91 private final AsyncConnectionImpl conn
;
93 private final Timer retryTimer
;
95 private final TableName tableName
;
97 private final int defaultScannerCaching
;
99 private final long defaultScannerMaxResultSize
;
101 private final long rpcTimeoutNs
;
103 private final long readRpcTimeoutNs
;
105 private final long writeRpcTimeoutNs
;
107 private final long operationTimeoutNs
;
109 private final long scanTimeoutNs
;
111 private final long pauseNs
;
113 private final long pauseForCQTBENs
;
115 private final int maxAttempts
;
117 private final int startLogErrorsCnt
;
119 RawAsyncTableImpl(AsyncConnectionImpl conn
, Timer retryTimer
, AsyncTableBuilderBase
<?
> builder
) {
121 this.retryTimer
= retryTimer
;
122 this.tableName
= builder
.tableName
;
123 this.rpcTimeoutNs
= builder
.rpcTimeoutNs
;
124 this.readRpcTimeoutNs
= builder
.readRpcTimeoutNs
;
125 this.writeRpcTimeoutNs
= builder
.writeRpcTimeoutNs
;
126 this.operationTimeoutNs
= builder
.operationTimeoutNs
;
127 this.scanTimeoutNs
= builder
.scanTimeoutNs
;
128 this.pauseNs
= builder
.pauseNs
;
129 if (builder
.pauseForCQTBENs
< builder
.pauseNs
) {
131 "Configured value of pauseForCQTBENs is {} ms, which is less than" +
132 " the normal pause value {} ms, use the greater one instead",
133 TimeUnit
.NANOSECONDS
.toMillis(builder
.pauseForCQTBENs
),
134 TimeUnit
.NANOSECONDS
.toMillis(builder
.pauseNs
));
135 this.pauseForCQTBENs
= builder
.pauseNs
;
137 this.pauseForCQTBENs
= builder
.pauseForCQTBENs
;
139 this.maxAttempts
= builder
.maxAttempts
;
140 this.startLogErrorsCnt
= builder
.startLogErrorsCnt
;
141 this.defaultScannerCaching
= tableName
.isSystemTable() ? conn
.connConf
.getMetaScannerCaching() :
142 conn
.connConf
.getScannerCaching();
143 this.defaultScannerMaxResultSize
= conn
.connConf
.getScannerMaxResultSize();
147 public TableName
getName() {
152 public Configuration
getConfiguration() {
153 return conn
.getConfiguration();
157 public CompletableFuture
<TableDescriptor
> getDescriptor() {
158 return conn
.getAdmin().getDescriptor(tableName
);
162 public AsyncTableRegionLocator
getRegionLocator() {
163 return conn
.getRegionLocator(tableName
);
166 private static <REQ
, RESP
> CompletableFuture
<RESP
> mutate(HBaseRpcController controller
,
167 HRegionLocation loc
, ClientService
.Interface stub
, REQ req
,
168 Converter
<MutateRequest
, byte[], REQ
> reqConvert
,
169 Converter
<RESP
, HBaseRpcController
, MutateResponse
> respConverter
) {
170 return ConnectionUtils
.call(controller
, loc
, stub
, req
, reqConvert
,
171 (s
, c
, r
, done
) -> s
.mutate(c
, r
, done
), respConverter
);
174 private static <REQ
> CompletableFuture
<Void
> voidMutate(HBaseRpcController controller
,
175 HRegionLocation loc
, ClientService
.Interface stub
, REQ req
,
176 Converter
<MutateRequest
, byte[], REQ
> reqConvert
) {
177 return mutate(controller
, loc
, stub
, req
, reqConvert
, (c
, resp
) -> {
182 private static Result
toResult(HBaseRpcController controller
, MutateResponse resp
)
184 if (!resp
.hasResult()) {
187 return ProtobufUtil
.toResult(resp
.getResult(), controller
.cellScanner());
191 private interface NoncedConverter
<D
, I
, S
> {
192 D
convert(I info
, S src
, long nonceGroup
, long nonce
) throws IOException
;
195 private <REQ
, RESP
> CompletableFuture
<RESP
> noncedMutate(long nonceGroup
, long nonce
,
196 HBaseRpcController controller
, HRegionLocation loc
, ClientService
.Interface stub
, REQ req
,
197 NoncedConverter
<MutateRequest
, byte[], REQ
> reqConvert
,
198 Converter
<RESP
, HBaseRpcController
, MutateResponse
> respConverter
) {
199 return mutate(controller
, loc
, stub
, req
,
200 (info
, src
) -> reqConvert
.convert(info
, src
, nonceGroup
, nonce
), respConverter
);
203 private <T
> SingleRequestCallerBuilder
<T
> newCaller(byte[] row
, int priority
, long rpcTimeoutNs
) {
204 return conn
.callerFactory
.<T
> single().table(tableName
).row(row
).priority(priority
)
205 .rpcTimeout(rpcTimeoutNs
, TimeUnit
.NANOSECONDS
)
206 .operationTimeout(operationTimeoutNs
, TimeUnit
.NANOSECONDS
)
207 .pause(pauseNs
, TimeUnit
.NANOSECONDS
).pauseForCQTBE(pauseForCQTBENs
, TimeUnit
.NANOSECONDS
)
208 .maxAttempts(maxAttempts
).startLogErrorsCnt(startLogErrorsCnt
);
211 private <T
, R
extends OperationWithAttributes
& Row
> SingleRequestCallerBuilder
<T
>
212 newCaller(R row
, long rpcTimeoutNs
) {
213 return newCaller(row
.getRow(), row
.getPriority(), rpcTimeoutNs
);
216 private CompletableFuture
<Result
> get(Get get
, int replicaId
) {
217 return this.<Result
, Get
> newCaller(get
, readRpcTimeoutNs
)
218 .action((controller
, loc
, stub
) -> ConnectionUtils
219 .<Get
, GetRequest
, GetResponse
, Result
> call(controller
, loc
, stub
, get
,
220 RequestConverter
::buildGetRequest
, (s
, c
, req
, done
) -> s
.get(c
, req
, done
),
221 (c
, resp
) -> ProtobufUtil
.toResult(resp
.getResult(), c
.cellScanner())))
222 .replicaId(replicaId
).call();
225 private TableOperationSpanBuilder
newTableOperationSpanBuilder() {
226 return new TableOperationSpanBuilder(conn
).setTableName(tableName
);
230 public CompletableFuture
<Result
> get(Get get
) {
231 final Supplier
<Span
> supplier
= newTableOperationSpanBuilder()
234 () -> timelineConsistentRead(conn
.getLocator(), tableName
, get
, get
.getRow(),
235 RegionLocateType
.CURRENT
, replicaId
-> get(get
, replicaId
), readRpcTimeoutNs
,
236 conn
.connConf
.getPrimaryCallTimeoutNs(), retryTimer
, conn
.getConnectionMetrics()),
241 public CompletableFuture
<Void
> put(Put put
) {
242 validatePut(put
, conn
.connConf
.getMaxKeyValueSize());
243 final Supplier
<Span
> supplier
= newTableOperationSpanBuilder()
245 return tracedFuture(() -> this.<Void
, Put
> newCaller(put
, writeRpcTimeoutNs
)
246 .action((controller
, loc
, stub
) -> RawAsyncTableImpl
.<Put
> voidMutate(controller
, loc
, stub
,
247 put
, RequestConverter
::buildMutateRequest
))
252 public CompletableFuture
<Void
> delete(Delete delete
) {
253 final Supplier
<Span
> supplier
= newTableOperationSpanBuilder()
254 .setOperation(delete
);
256 () -> this.<Void
, Delete
> newCaller(delete
, writeRpcTimeoutNs
)
257 .action((controller
, loc
, stub
) -> RawAsyncTableImpl
.<Delete
> voidMutate(controller
, loc
,
258 stub
, delete
, RequestConverter
::buildMutateRequest
))
264 public CompletableFuture
<Result
> append(Append append
) {
265 checkHasFamilies(append
);
266 final Supplier
<Span
> supplier
= newTableOperationSpanBuilder()
267 .setOperation(append
);
268 return tracedFuture(() -> {
269 long nonceGroup
= conn
.getNonceGenerator().getNonceGroup();
270 long nonce
= conn
.getNonceGenerator().newNonce();
271 return this.<Result
, Append
> newCaller(append
, rpcTimeoutNs
)
272 .action((controller
, loc
, stub
) -> this.<Append
, Result
> noncedMutate(nonceGroup
, nonce
,
273 controller
, loc
, stub
, append
, RequestConverter
::buildMutateRequest
,
274 RawAsyncTableImpl
::toResult
))
280 public CompletableFuture
<Result
> increment(Increment increment
) {
281 checkHasFamilies(increment
);
282 final Supplier
<Span
> supplier
= newTableOperationSpanBuilder()
283 .setOperation(increment
);
284 return tracedFuture(() -> {
285 long nonceGroup
= conn
.getNonceGenerator().getNonceGroup();
286 long nonce
= conn
.getNonceGenerator().newNonce();
287 return this.<Result
, Increment
> newCaller(increment
, rpcTimeoutNs
)
288 .action((controller
, loc
, stub
) -> this.<Increment
, Result
> noncedMutate(nonceGroup
, nonce
,
289 controller
, loc
, stub
, increment
, RequestConverter
::buildMutateRequest
,
290 RawAsyncTableImpl
::toResult
))
295 private final class CheckAndMutateBuilderImpl
implements CheckAndMutateBuilder
{
297 private final byte[] row
;
299 private final byte[] family
;
301 private byte[] qualifier
;
303 private TimeRange timeRange
;
305 private CompareOperator op
;
307 private byte[] value
;
309 public CheckAndMutateBuilderImpl(byte[] row
, byte[] family
) {
310 this.row
= Preconditions
.checkNotNull(row
, "row is null");
311 this.family
= Preconditions
.checkNotNull(family
, "family is null");
315 public CheckAndMutateBuilder
qualifier(byte[] qualifier
) {
316 this.qualifier
= Preconditions
.checkNotNull(qualifier
, "qualifier is null. Consider using" +
317 " an empty byte array, or just do not call this method if you want a null qualifier");
322 public CheckAndMutateBuilder
timeRange(TimeRange timeRange
) {
323 this.timeRange
= timeRange
;
328 public CheckAndMutateBuilder
ifNotExists() {
329 this.op
= CompareOperator
.EQUAL
;
335 public CheckAndMutateBuilder
ifMatches(CompareOperator compareOp
, byte[] value
) {
336 this.op
= Preconditions
.checkNotNull(compareOp
, "compareOp is null");
337 this.value
= Preconditions
.checkNotNull(value
, "value is null");
341 private void preCheck() {
342 Preconditions
.checkNotNull(op
, "condition is null. You need to specify the condition by" +
343 " calling ifNotExists/ifEquals/ifMatches before executing the request");
347 public CompletableFuture
<Boolean
> thenPut(Put put
) {
348 validatePut(put
, conn
.connConf
.getMaxKeyValueSize());
350 final Supplier
<Span
> supplier
= newTableOperationSpanBuilder()
351 .setOperation(HBaseSemanticAttributes
.Operation
.CHECK_AND_MUTATE
);
353 () -> RawAsyncTableImpl
.this.<Boolean
> newCaller(row
, put
.getPriority(), rpcTimeoutNs
)
354 .action((controller
, loc
, stub
) -> RawAsyncTableImpl
.mutate(controller
, loc
, stub
, put
,
355 (rn
, p
) -> RequestConverter
.buildMutateRequest(rn
, row
, family
, qualifier
, op
, value
,
356 null, timeRange
, p
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
),
357 (c
, r
) -> r
.getProcessed()))
363 public CompletableFuture
<Boolean
> thenDelete(Delete delete
) {
365 final Supplier
<Span
> supplier
= newTableOperationSpanBuilder()
366 .setOperation(HBaseSemanticAttributes
.Operation
.CHECK_AND_MUTATE
);
368 () -> RawAsyncTableImpl
.this.<Boolean
> newCaller(row
, delete
.getPriority(), rpcTimeoutNs
)
369 .action((controller
, loc
, stub
) -> RawAsyncTableImpl
.mutate(controller
, loc
, stub
, delete
,
370 (rn
, d
) -> RequestConverter
.buildMutateRequest(rn
, row
, family
, qualifier
, op
, value
,
371 null, timeRange
, d
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
),
372 (c
, r
) -> r
.getProcessed()))
378 public CompletableFuture
<Boolean
> thenMutate(RowMutations mutations
) {
380 validatePutsInRowMutations(mutations
, conn
.connConf
.getMaxKeyValueSize());
381 final Supplier
<Span
> supplier
= newTableOperationSpanBuilder()
382 .setOperation(HBaseSemanticAttributes
.Operation
.CHECK_AND_MUTATE
);
384 () -> RawAsyncTableImpl
.this
385 .<Boolean
> newCaller(row
, mutations
.getMaxPriority(), rpcTimeoutNs
)
386 .action((controller
, loc
, stub
) -> RawAsyncTableImpl
.this.mutateRow(controller
, loc
, stub
,
388 (rn
, rm
) -> RequestConverter
.buildMultiRequest(rn
, row
, family
, qualifier
, op
, value
,
389 null, timeRange
, rm
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
),
390 CheckAndMutateResult
::isSuccess
))
397 public CheckAndMutateBuilder
checkAndMutate(byte[] row
, byte[] family
) {
398 return new CheckAndMutateBuilderImpl(row
, family
);
401 private final class CheckAndMutateWithFilterBuilderImpl
402 implements CheckAndMutateWithFilterBuilder
{
404 private final byte[] row
;
406 private final Filter filter
;
408 private TimeRange timeRange
;
410 public CheckAndMutateWithFilterBuilderImpl(byte[] row
, Filter filter
) {
411 this.row
= Preconditions
.checkNotNull(row
, "row is null");
412 this.filter
= Preconditions
.checkNotNull(filter
, "filter is null");
416 public CheckAndMutateWithFilterBuilder
timeRange(TimeRange timeRange
) {
417 this.timeRange
= timeRange
;
422 public CompletableFuture
<Boolean
> thenPut(Put put
) {
423 validatePut(put
, conn
.connConf
.getMaxKeyValueSize());
424 final Supplier
<Span
> supplier
= newTableOperationSpanBuilder()
425 .setOperation(HBaseSemanticAttributes
.Operation
.CHECK_AND_MUTATE
);
427 () -> RawAsyncTableImpl
.this.<Boolean
> newCaller(row
, put
.getPriority(), rpcTimeoutNs
)
428 .action((controller
, loc
, stub
) -> RawAsyncTableImpl
.mutate(controller
, loc
,
430 (rn
, p
) -> RequestConverter
.buildMutateRequest(rn
, row
, null, null, null, null,
431 filter
, timeRange
, p
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
),
432 (c
, r
) -> r
.getProcessed()))
438 public CompletableFuture
<Boolean
> thenDelete(Delete delete
) {
439 final Supplier
<Span
> supplier
= newTableOperationSpanBuilder()
440 .setOperation(HBaseSemanticAttributes
.Operation
.CHECK_AND_MUTATE
);
442 () -> RawAsyncTableImpl
.this.<Boolean
> newCaller(row
, delete
.getPriority(), rpcTimeoutNs
)
443 .action((controller
, loc
, stub
) -> RawAsyncTableImpl
.mutate(controller
, loc
, stub
, delete
,
444 (rn
, d
) -> RequestConverter
.buildMutateRequest(rn
, row
, null, null, null, null, filter
,
445 timeRange
, d
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
),
446 (c
, r
) -> r
.getProcessed()))
452 public CompletableFuture
<Boolean
> thenMutate(RowMutations mutations
) {
453 validatePutsInRowMutations(mutations
, conn
.connConf
.getMaxKeyValueSize());
454 final Supplier
<Span
> supplier
= newTableOperationSpanBuilder()
455 .setOperation(HBaseSemanticAttributes
.Operation
.CHECK_AND_MUTATE
);
457 () -> RawAsyncTableImpl
.this
458 .<Boolean
> newCaller(row
, mutations
.getMaxPriority(), rpcTimeoutNs
)
459 .action((controller
, loc
, stub
) -> RawAsyncTableImpl
.this.mutateRow(controller
, loc
, stub
,
461 (rn
, rm
) -> RequestConverter
.buildMultiRequest(rn
, row
, null, null, null, null, filter
,
462 timeRange
, rm
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
),
463 CheckAndMutateResult
::isSuccess
))
470 public CheckAndMutateWithFilterBuilder
checkAndMutate(byte[] row
, Filter filter
) {
471 return new CheckAndMutateWithFilterBuilderImpl(row
, filter
);
475 public CompletableFuture
<CheckAndMutateResult
> checkAndMutate(CheckAndMutate checkAndMutate
) {
476 final Supplier
<Span
> supplier
= newTableOperationSpanBuilder()
477 .setOperation(checkAndMutate
);
478 return tracedFuture(() -> {
479 if (checkAndMutate
.getAction() instanceof Put
||
480 checkAndMutate
.getAction() instanceof Delete
||
481 checkAndMutate
.getAction() instanceof Increment
||
482 checkAndMutate
.getAction() instanceof Append
) {
483 Mutation mutation
= (Mutation
) checkAndMutate
.getAction();
484 if (mutation
instanceof Put
) {
485 validatePut((Put
) mutation
, conn
.connConf
.getMaxKeyValueSize());
487 long nonceGroup
= conn
.getNonceGenerator().getNonceGroup();
488 long nonce
= conn
.getNonceGenerator().newNonce();
489 return RawAsyncTableImpl
.this
490 .<CheckAndMutateResult
> newCaller(checkAndMutate
.getRow(), mutation
.getPriority(),
493 (controller
, loc
, stub
) -> RawAsyncTableImpl
.mutate(controller
, loc
, stub
, mutation
,
494 (rn
, m
) -> RequestConverter
.buildMutateRequest(rn
, checkAndMutate
.getRow(),
495 checkAndMutate
.getFamily(), checkAndMutate
.getQualifier(),
496 checkAndMutate
.getCompareOp(), checkAndMutate
.getValue(),
497 checkAndMutate
.getFilter(), checkAndMutate
.getTimeRange(), m
, nonceGroup
, nonce
),
498 (c
, r
) -> ResponseConverter
.getCheckAndMutateResult(r
, c
.cellScanner())))
500 } else if (checkAndMutate
.getAction() instanceof RowMutations
) {
501 RowMutations rowMutations
= (RowMutations
) checkAndMutate
.getAction();
502 validatePutsInRowMutations(rowMutations
, conn
.connConf
.getMaxKeyValueSize());
503 long nonceGroup
= conn
.getNonceGenerator().getNonceGroup();
504 long nonce
= conn
.getNonceGenerator().newNonce();
505 return RawAsyncTableImpl
.this
506 .<CheckAndMutateResult
> newCaller(checkAndMutate
.getRow(), rowMutations
.getMaxPriority(),
508 .action((controller
, loc
, stub
) -> RawAsyncTableImpl
.this
509 .<CheckAndMutateResult
, CheckAndMutateResult
> mutateRow(controller
, loc
, stub
,
511 (rn
, rm
) -> RequestConverter
.buildMultiRequest(rn
, checkAndMutate
.getRow(),
512 checkAndMutate
.getFamily(), checkAndMutate
.getQualifier(),
513 checkAndMutate
.getCompareOp(), checkAndMutate
.getValue(),
514 checkAndMutate
.getFilter(), checkAndMutate
.getTimeRange(), rm
, nonceGroup
, nonce
),
518 CompletableFuture
<CheckAndMutateResult
> future
= new CompletableFuture
<>();
519 future
.completeExceptionally(new DoNotRetryIOException(
520 "CheckAndMutate doesn't support " + checkAndMutate
.getAction().getClass().getName()));
527 public List
<CompletableFuture
<CheckAndMutateResult
>>
528 checkAndMutate(List
<CheckAndMutate
> checkAndMutates
) {
529 final Supplier
<Span
> supplier
= newTableOperationSpanBuilder()
530 .setOperation(checkAndMutates
);
531 return tracedFutures(
532 () -> batch(checkAndMutates
, rpcTimeoutNs
).stream()
533 .map(f
-> f
.thenApply(r
-> (CheckAndMutateResult
) r
)).collect(toList()),
537 // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
538 // so here I write a new method as I do not want to change the abstraction of call method.
539 @SuppressWarnings("unchecked")
540 private <RES
, RESP
> CompletableFuture
<RESP
> mutateRow(HBaseRpcController controller
,
541 HRegionLocation loc
, ClientService
.Interface stub
, RowMutations mutation
,
542 Converter
<MultiRequest
, byte[], RowMutations
> reqConvert
, Function
<RES
, RESP
> respConverter
) {
543 CompletableFuture
<RESP
> future
= new CompletableFuture
<>();
545 byte[] regionName
= loc
.getRegion().getRegionName();
546 MultiRequest req
= reqConvert
.convert(regionName
, mutation
);
547 stub
.multi(controller
, req
, new RpcCallback
<MultiResponse
>() {
550 public void run(MultiResponse resp
) {
551 if (controller
.failed()) {
552 future
.completeExceptionally(controller
.getFailed());
555 org
.apache
.hadoop
.hbase
.client
.MultiResponse multiResp
=
556 ResponseConverter
.getResults(req
, resp
, controller
.cellScanner());
557 ConnectionUtils
.updateStats(conn
.getStatisticsTracker(), conn
.getConnectionMetrics(),
558 loc
.getServerName(), multiResp
);
559 Throwable ex
= multiResp
.getException(regionName
);
561 future
.completeExceptionally(ex
instanceof IOException ? ex
:
563 "Failed to mutate row: " + Bytes
.toStringBinary(mutation
.getRow()), ex
));
566 respConverter
.apply((RES
) multiResp
.getResults().get(regionName
).result
.get(0)));
568 } catch (IOException e
) {
569 future
.completeExceptionally(e
);
574 } catch (IOException e
) {
575 future
.completeExceptionally(e
);
581 public CompletableFuture
<Result
> mutateRow(RowMutations mutations
) {
582 validatePutsInRowMutations(mutations
, conn
.connConf
.getMaxKeyValueSize());
583 long nonceGroup
= conn
.getNonceGenerator().getNonceGroup();
584 long nonce
= conn
.getNonceGenerator().newNonce();
585 final Supplier
<Span
> supplier
= newTableOperationSpanBuilder()
586 .setOperation(mutations
);
589 .<Result
> newCaller(mutations
.getRow(), mutations
.getMaxPriority(), writeRpcTimeoutNs
)
590 .action((controller
, loc
, stub
) -> this.<Result
, Result
> mutateRow(controller
, loc
, stub
,
591 mutations
, (rn
, rm
) -> RequestConverter
.buildMultiRequest(rn
, rm
, nonceGroup
, nonce
),
597 private Scan
setDefaultScanConfig(Scan scan
) {
598 // always create a new scan object as we may reset the start row later.
599 Scan newScan
= ReflectionUtils
.newInstance(scan
.getClass(), scan
);
600 if (newScan
.getCaching() <= 0) {
601 newScan
.setCaching(defaultScannerCaching
);
603 if (newScan
.getMaxResultSize() <= 0) {
604 newScan
.setMaxResultSize(defaultScannerMaxResultSize
);
610 public void scan(Scan scan
, AdvancedScanResultConsumer consumer
) {
611 new AsyncClientScanner(setDefaultScanConfig(scan
), consumer
, tableName
, conn
, retryTimer
,
612 pauseNs
, pauseForCQTBENs
, maxAttempts
, scanTimeoutNs
, readRpcTimeoutNs
, startLogErrorsCnt
)
616 private long resultSize2CacheSize(long maxResultSize
) {
618 return maxResultSize
> Long
.MAX_VALUE
/ 2 ? maxResultSize
: maxResultSize
* 2;
622 public ResultScanner
getScanner(Scan scan
) {
623 return new AsyncTableResultScanner(this, ReflectionUtils
.newInstance(scan
.getClass(), scan
),
624 resultSize2CacheSize(
625 scan
.getMaxResultSize() > 0 ? scan
.getMaxResultSize() : defaultScannerMaxResultSize
));
629 public CompletableFuture
<List
<Result
>> scanAll(Scan scan
) {
630 final Supplier
<Span
> supplier
= newTableOperationSpanBuilder()
632 return tracedFuture(() -> {
633 CompletableFuture
<List
<Result
>> future
= new CompletableFuture
<>();
634 List
<Result
> scanResults
= new ArrayList
<>();
635 scan(scan
, new AdvancedScanResultConsumer() {
638 public void onNext(Result
[] results
, ScanController controller
) {
639 scanResults
.addAll(Arrays
.asList(results
));
643 public void onError(Throwable error
) {
644 future
.completeExceptionally(error
);
648 public void onComplete() {
649 future
.complete(scanResults
);
657 public List
<CompletableFuture
<Result
>> get(List
<Get
> gets
) {
658 final Supplier
<Span
> supplier
= newTableOperationSpanBuilder()
660 return tracedFutures(() -> batch(gets
, readRpcTimeoutNs
), supplier
);
664 public List
<CompletableFuture
<Void
>> put(List
<Put
> puts
) {
665 final Supplier
<Span
> supplier
= newTableOperationSpanBuilder()
667 return tracedFutures(() -> voidMutate(puts
), supplier
);
671 public List
<CompletableFuture
<Void
>> delete(List
<Delete
> deletes
) {
672 final Supplier
<Span
> supplier
= newTableOperationSpanBuilder()
673 .setOperation(deletes
);
674 return tracedFutures(() -> voidMutate(deletes
), supplier
);
678 public <T
> List
<CompletableFuture
<T
>> batch(List
<?
extends Row
> actions
) {
679 final Supplier
<Span
> supplier
= newTableOperationSpanBuilder()
680 .setOperation(actions
);
681 return tracedFutures(() -> batch(actions
, rpcTimeoutNs
), supplier
);
684 private List
<CompletableFuture
<Void
>> voidMutate(List
<?
extends Row
> actions
) {
685 return this.<Object
> batch(actions
, writeRpcTimeoutNs
).stream()
686 .map(f
-> f
.<Void
> thenApply(r
-> null)).collect(toList());
689 private <T
> List
<CompletableFuture
<T
>> batch(List
<?
extends Row
> actions
, long rpcTimeoutNs
) {
690 for (Row action
: actions
) {
691 if (action
instanceof Put
) {
692 validatePut((Put
) action
, conn
.connConf
.getMaxKeyValueSize());
693 } else if (action
instanceof CheckAndMutate
) {
694 CheckAndMutate checkAndMutate
= (CheckAndMutate
) action
;
695 if (checkAndMutate
.getAction() instanceof Put
) {
696 validatePut((Put
) checkAndMutate
.getAction(), conn
.connConf
.getMaxKeyValueSize());
697 } else if (checkAndMutate
.getAction() instanceof RowMutations
) {
698 validatePutsInRowMutations((RowMutations
) checkAndMutate
.getAction(),
699 conn
.connConf
.getMaxKeyValueSize());
701 } else if (action
instanceof RowMutations
) {
702 validatePutsInRowMutations((RowMutations
) action
, conn
.connConf
.getMaxKeyValueSize());
705 return conn
.callerFactory
.batch().table(tableName
).actions(actions
)
706 .operationTimeout(operationTimeoutNs
, TimeUnit
.NANOSECONDS
)
707 .rpcTimeout(rpcTimeoutNs
, TimeUnit
.NANOSECONDS
).pause(pauseNs
, TimeUnit
.NANOSECONDS
)
708 .pauseForCQTBE(pauseForCQTBENs
, TimeUnit
.NANOSECONDS
).maxAttempts(maxAttempts
)
709 .startLogErrorsCnt(startLogErrorsCnt
).call();
713 public long getRpcTimeout(TimeUnit unit
) {
714 return unit
.convert(rpcTimeoutNs
, TimeUnit
.NANOSECONDS
);
718 public long getReadRpcTimeout(TimeUnit unit
) {
719 return unit
.convert(readRpcTimeoutNs
, TimeUnit
.NANOSECONDS
);
723 public long getWriteRpcTimeout(TimeUnit unit
) {
724 return unit
.convert(writeRpcTimeoutNs
, TimeUnit
.NANOSECONDS
);
728 public long getOperationTimeout(TimeUnit unit
) {
729 return unit
.convert(operationTimeoutNs
, TimeUnit
.NANOSECONDS
);
733 public long getScanTimeout(TimeUnit unit
) {
734 return unit
.convert(scanTimeoutNs
, TimeUnit
.NANOSECONDS
);
737 private <S
, R
> CompletableFuture
<R
> coprocessorService(Function
<RpcChannel
, S
> stubMaker
,
738 ServiceCaller
<S
, R
> callable
, RegionInfo region
, byte[] row
) {
739 RegionCoprocessorRpcChannelImpl channel
= new RegionCoprocessorRpcChannelImpl(conn
, tableName
,
740 region
, row
, rpcTimeoutNs
, operationTimeoutNs
);
741 S stub
= stubMaker
.apply(channel
);
742 CompletableFuture
<R
> future
= new CompletableFuture
<>();
743 ClientCoprocessorRpcController controller
= new ClientCoprocessorRpcController();
744 callable
.call(stub
, controller
, resp
-> {
745 if (controller
.failed()) {
746 future
.completeExceptionally(controller
.getFailed());
748 future
.complete(resp
);
755 public <S
, R
> CompletableFuture
<R
> coprocessorService(Function
<RpcChannel
, S
> stubMaker
,
756 ServiceCaller
<S
, R
> callable
, byte[] row
) {
757 return coprocessorService(stubMaker
, callable
, null, row
);
760 private boolean locateFinished(RegionInfo region
, byte[] endKey
, boolean endKeyInclusive
) {
761 if (isEmptyStopRow(endKey
)) {
762 if (isEmptyStopRow(region
.getEndKey())) {
767 if (isEmptyStopRow(region
.getEndKey())) {
770 int c
= Bytes
.compareTo(endKey
, region
.getEndKey());
771 // 1. if the region contains endKey
772 // 2. endKey is equal to the region's endKey and we do not want to include endKey.
773 return c
< 0 || c
== 0 && !endKeyInclusive
;
777 private <S
, R
> void onLocateComplete(Function
<RpcChannel
, S
> stubMaker
,
778 ServiceCaller
<S
, R
> callable
, CoprocessorCallback
<R
> callback
, List
<HRegionLocation
> locs
,
779 byte[] endKey
, boolean endKeyInclusive
, AtomicBoolean locateFinished
,
780 AtomicInteger unfinishedRequest
, HRegionLocation loc
, Throwable error
) {
782 callback
.onError(error
);
785 unfinishedRequest
.incrementAndGet();
786 RegionInfo region
= loc
.getRegion();
787 if (locateFinished(region
, endKey
, endKeyInclusive
)) {
788 locateFinished
.set(true);
791 conn
.getLocator().getRegionLocation(tableName
, region
.getEndKey(), RegionLocateType
.CURRENT
,
793 (l
, e
) -> onLocateComplete(stubMaker
, callable
, callback
, locs
, endKey
, endKeyInclusive
,
794 locateFinished
, unfinishedRequest
, l
, e
));
796 addListener(coprocessorService(stubMaker
, callable
, region
, region
.getStartKey()), (r
, e
) -> {
798 callback
.onRegionError(region
, e
);
800 callback
.onRegionComplete(region
, r
);
802 if (unfinishedRequest
.decrementAndGet() == 0 && locateFinished
.get()) {
803 callback
.onComplete();
808 private final class CoprocessorServiceBuilderImpl
<S
, R
>
809 implements CoprocessorServiceBuilder
<S
, R
> {
811 private final Function
<RpcChannel
, S
> stubMaker
;
813 private final ServiceCaller
<S
, R
> callable
;
815 private final CoprocessorCallback
<R
> callback
;
817 private byte[] startKey
= HConstants
.EMPTY_START_ROW
;
819 private boolean startKeyInclusive
;
821 private byte[] endKey
= HConstants
.EMPTY_END_ROW
;
823 private boolean endKeyInclusive
;
825 public CoprocessorServiceBuilderImpl(Function
<RpcChannel
, S
> stubMaker
,
826 ServiceCaller
<S
, R
> callable
, CoprocessorCallback
<R
> callback
) {
827 this.stubMaker
= Preconditions
.checkNotNull(stubMaker
, "stubMaker is null");
828 this.callable
= Preconditions
.checkNotNull(callable
, "callable is null");
829 this.callback
= Preconditions
.checkNotNull(callback
, "callback is null");
833 public CoprocessorServiceBuilderImpl
<S
, R
> fromRow(byte[] startKey
, boolean inclusive
) {
834 this.startKey
= Preconditions
.checkNotNull(startKey
,
835 "startKey is null. Consider using" +
836 " an empty byte array, or just do not call this method if you want to start selection" +
837 " from the first region");
838 this.startKeyInclusive
= inclusive
;
843 public CoprocessorServiceBuilderImpl
<S
, R
> toRow(byte[] endKey
, boolean inclusive
) {
844 this.endKey
= Preconditions
.checkNotNull(endKey
,
845 "endKey is null. Consider using" +
846 " an empty byte array, or just do not call this method if you want to continue" +
847 " selection to the last region");
848 this.endKeyInclusive
= inclusive
;
853 public void execute() {
854 addListener(conn
.getLocator().getRegionLocation(tableName
, startKey
,
855 startKeyInclusive ? RegionLocateType
.CURRENT
: RegionLocateType
.AFTER
, operationTimeoutNs
),
856 (loc
, error
) -> onLocateComplete(stubMaker
, callable
, callback
, new ArrayList
<>(), endKey
,
857 endKeyInclusive
, new AtomicBoolean(false), new AtomicInteger(0), loc
, error
));
862 public <S
, R
> CoprocessorServiceBuilder
<S
, R
> coprocessorService(
863 Function
<RpcChannel
, S
> stubMaker
, ServiceCaller
<S
, R
> callable
,
864 CoprocessorCallback
<R
> callback
) {
865 return new CoprocessorServiceBuilderImpl
<>(stubMaker
, callable
, callback
);