2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static java
.util
.stream
.Collectors
.toList
;
22 import java
.io
.IOException
;
23 import java
.util
.List
;
24 import java
.util
.concurrent
.CompletableFuture
;
25 import java
.util
.concurrent
.ExecutorService
;
26 import java
.util
.concurrent
.TimeUnit
;
27 import java
.util
.function
.Function
;
28 import org
.apache
.hadoop
.conf
.Configuration
;
29 import org
.apache
.hadoop
.hbase
.CompareOperator
;
30 import org
.apache
.hadoop
.hbase
.TableName
;
31 import org
.apache
.hadoop
.hbase
.filter
.Filter
;
32 import org
.apache
.hadoop
.hbase
.io
.TimeRange
;
33 import org
.apache
.hadoop
.hbase
.util
.FutureUtils
;
34 import org
.apache
.yetus
.audience
.InterfaceAudience
;
36 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.RpcChannel
;
39 * Just a wrapper of {@link RawAsyncTableImpl}. The difference is that users need to provide a
40 * thread pool when constructing this class, and the callback methods registered to the returned
41 * {@link CompletableFuture} will be executed in this thread pool. So usually it is safe for users
42 * to do anything they want in the callbacks without breaking the rpc framework.
44 @InterfaceAudience.Private
45 class AsyncTableImpl
implements AsyncTable
<ScanResultConsumer
> {
47 private final RawAsyncTableImpl rawTable
;
49 private final ExecutorService pool
;
51 AsyncTableImpl(RawAsyncTableImpl rawTable
, ExecutorService pool
) {
52 this.rawTable
= rawTable
;
57 public TableName
getName() {
58 return rawTable
.getName();
62 public Configuration
getConfiguration() {
63 return rawTable
.getConfiguration();
67 public CompletableFuture
<TableDescriptor
> getDescriptor() {
68 return wrap(rawTable
.getDescriptor());
72 public AsyncTableRegionLocator
getRegionLocator() {
73 return rawTable
.getRegionLocator();
77 public long getRpcTimeout(TimeUnit unit
) {
78 return rawTable
.getRpcTimeout(unit
);
82 public long getReadRpcTimeout(TimeUnit unit
) {
83 return rawTable
.getReadRpcTimeout(unit
);
87 public long getWriteRpcTimeout(TimeUnit unit
) {
88 return rawTable
.getWriteRpcTimeout(unit
);
92 public long getOperationTimeout(TimeUnit unit
) {
93 return rawTable
.getOperationTimeout(unit
);
97 public long getScanTimeout(TimeUnit unit
) {
98 return rawTable
.getScanTimeout(unit
);
101 private <T
> CompletableFuture
<T
> wrap(CompletableFuture
<T
> future
) {
102 return FutureUtils
.wrapFuture(future
, pool
);
106 public CompletableFuture
<Result
> get(Get get
) {
107 return wrap(rawTable
.get(get
));
111 public CompletableFuture
<Void
> put(Put put
) {
112 return wrap(rawTable
.put(put
));
116 public CompletableFuture
<Void
> delete(Delete delete
) {
117 return wrap(rawTable
.delete(delete
));
121 public CompletableFuture
<Result
> append(Append append
) {
122 return wrap(rawTable
.append(append
));
126 public CompletableFuture
<Result
> increment(Increment increment
) {
127 return wrap(rawTable
.increment(increment
));
131 public CheckAndMutateBuilder
checkAndMutate(byte[] row
, byte[] family
) {
132 return new CheckAndMutateBuilder() {
134 private final CheckAndMutateBuilder builder
= rawTable
.checkAndMutate(row
, family
);
137 public CompletableFuture
<Boolean
> thenPut(Put put
) {
138 return wrap(builder
.thenPut(put
));
142 public CompletableFuture
<Boolean
> thenMutate(RowMutations mutation
) {
143 return wrap(builder
.thenMutate(mutation
));
147 public CompletableFuture
<Boolean
> thenDelete(Delete delete
) {
148 return wrap(builder
.thenDelete(delete
));
152 public CheckAndMutateBuilder
qualifier(byte[] qualifier
) {
153 builder
.qualifier(qualifier
);
158 public CheckAndMutateBuilder
timeRange(TimeRange timeRange
) {
159 builder
.timeRange(timeRange
);
164 public CheckAndMutateBuilder
ifNotExists() {
165 builder
.ifNotExists();
170 public CheckAndMutateBuilder
ifMatches(CompareOperator compareOp
, byte[] value
) {
171 builder
.ifMatches(compareOp
, value
);
178 public CheckAndMutateWithFilterBuilder
checkAndMutate(byte[] row
, Filter filter
) {
179 return new CheckAndMutateWithFilterBuilder() {
181 private final CheckAndMutateWithFilterBuilder builder
=
182 rawTable
.checkAndMutate(row
, filter
);
185 public CheckAndMutateWithFilterBuilder
timeRange(TimeRange timeRange
) {
186 builder
.timeRange(timeRange
);
191 public CompletableFuture
<Boolean
> thenPut(Put put
) {
192 return wrap(builder
.thenPut(put
));
196 public CompletableFuture
<Boolean
> thenDelete(Delete delete
) {
197 return wrap(builder
.thenDelete(delete
));
201 public CompletableFuture
<Boolean
> thenMutate(RowMutations mutation
) {
202 return wrap(builder
.thenMutate(mutation
));
208 public CompletableFuture
<CheckAndMutateResult
> checkAndMutate(CheckAndMutate checkAndMutate
) {
209 return wrap(rawTable
.checkAndMutate(checkAndMutate
));
213 public List
<CompletableFuture
<CheckAndMutateResult
>> checkAndMutate(
214 List
<CheckAndMutate
> checkAndMutates
) {
215 return rawTable
.checkAndMutate(checkAndMutates
).stream()
216 .map(this::wrap
).collect(toList());
220 public CompletableFuture
<Result
> mutateRow(RowMutations mutation
) {
221 return wrap(rawTable
.mutateRow(mutation
));
225 public CompletableFuture
<List
<Result
>> scanAll(Scan scan
) {
226 return wrap(rawTable
.scanAll(scan
));
230 public ResultScanner
getScanner(Scan scan
) {
231 return rawTable
.getScanner(scan
);
234 private void scan0(Scan scan
, ScanResultConsumer consumer
) {
235 try (ResultScanner scanner
= getScanner(scan
)) {
236 consumer
.onScanMetricsCreated(scanner
.getScanMetrics());
237 for (Result result
; (result
= scanner
.next()) != null;) {
238 if (!consumer
.onNext(result
)) {
242 consumer
.onComplete();
243 } catch (IOException e
) {
249 public void scan(Scan scan
, ScanResultConsumer consumer
) {
250 pool
.execute(() -> scan0(scan
, consumer
));
254 public List
<CompletableFuture
<Result
>> get(List
<Get
> gets
) {
255 return rawTable
.get(gets
).stream().map(this::wrap
).collect(toList());
259 public List
<CompletableFuture
<Void
>> put(List
<Put
> puts
) {
260 return rawTable
.put(puts
).stream().map(this::wrap
).collect(toList());
264 public List
<CompletableFuture
<Void
>> delete(List
<Delete
> deletes
) {
265 return rawTable
.delete(deletes
).stream().map(this::wrap
).collect(toList());
269 public <T
> List
<CompletableFuture
<T
>> batch(List
<?
extends Row
> actions
) {
270 return rawTable
.<T
> batch(actions
).stream().map(this::wrap
).collect(toList());
274 public <S
, R
> CompletableFuture
<R
> coprocessorService(Function
<RpcChannel
, S
> stubMaker
,
275 ServiceCaller
<S
, R
> callable
, byte[] row
) {
276 return wrap(rawTable
.coprocessorService(stubMaker
, callable
, row
));
280 public <S
, R
> CoprocessorServiceBuilder
<S
, R
> coprocessorService(
281 Function
<RpcChannel
, S
> stubMaker
, ServiceCaller
<S
, R
> callable
,
282 CoprocessorCallback
<R
> callback
) {
283 CoprocessorCallback
<R
> wrappedCallback
= new CoprocessorCallback
<R
>() {
286 public void onRegionComplete(RegionInfo region
, R resp
) {
287 pool
.execute(() -> callback
.onRegionComplete(region
, resp
));
291 public void onRegionError(RegionInfo region
, Throwable error
) {
292 pool
.execute(() -> callback
.onRegionError(region
, error
));
296 public void onComplete() {
297 pool
.execute(() -> callback
.onComplete());
301 public void onError(Throwable error
) {
302 pool
.execute(() -> callback
.onError(error
));
305 CoprocessorServiceBuilder
<S
, R
> builder
=
306 rawTable
.coprocessorService(stubMaker
, callable
, wrappedCallback
);
307 return new CoprocessorServiceBuilder
<S
, R
>() {
310 public CoprocessorServiceBuilder
<S
, R
> fromRow(byte[] startKey
, boolean inclusive
) {
311 builder
.fromRow(startKey
, inclusive
);
316 public CoprocessorServiceBuilder
<S
, R
> toRow(byte[] endKey
, boolean inclusive
) {
317 builder
.toRow(endKey
, inclusive
);
322 public void execute() {