2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static java
.util
.stream
.Collectors
.toList
;
22 import java
.io
.IOException
;
23 import java
.util
.List
;
24 import java
.util
.concurrent
.CompletableFuture
;
25 import java
.util
.concurrent
.ExecutorService
;
26 import java
.util
.concurrent
.TimeUnit
;
27 import java
.util
.function
.Function
;
28 import org
.apache
.hadoop
.conf
.Configuration
;
29 import org
.apache
.hadoop
.hbase
.CompareOperator
;
30 import org
.apache
.hadoop
.hbase
.TableName
;
31 import org
.apache
.hadoop
.hbase
.filter
.Filter
;
32 import org
.apache
.hadoop
.hbase
.io
.TimeRange
;
33 import org
.apache
.hadoop
.hbase
.util
.FutureUtils
;
34 import org
.apache
.yetus
.audience
.InterfaceAudience
;
36 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.RpcChannel
;
/**
 * Just a wrapper of {@link RawAsyncTableImpl}. The difference is that users need to provide a
 * thread pool when constructing this class, and the callback methods registered to the returned
 * {@link CompletableFuture} will be executed in this thread pool. So usually it is safe for users
 * to do anything they want in the callbacks without breaking the rpc framework.
 */
44 @InterfaceAudience.Private
45 class AsyncTableImpl
implements AsyncTable
<ScanResultConsumer
> {
// The underlying table that the methods of this class delegate to.
private final AsyncTable<AdvancedScanResultConsumer> rawTable;

// Executor handed to FutureUtils.wrapFuture and used to run scan and coprocessor callbacks.
private final ExecutorService pool;
/**
 * Creates a table wrapper around {@code rawTable}.
 *
 * @param conn     the owning connection; not used by this constructor
 * @param rawTable the table every operation is delegated to
 * @param pool     the executor used by {@code wrap}, {@code scan} and the coprocessor callbacks
 */
AsyncTableImpl(AsyncConnectionImpl conn, AsyncTable<AdvancedScanResultConsumer> rawTable,
  ExecutorService pool) {
  this.rawTable = rawTable;
  // The final field must be initialised here; every callback hand-off in this class reads it.
  this.pool = pool;
}
58 public TableName
getName() {
59 return rawTable
.getName();
63 public Configuration
getConfiguration() {
64 return rawTable
.getConfiguration();
68 public CompletableFuture
<TableDescriptor
> getDescriptor() {
69 return wrap(rawTable
.getDescriptor());
73 public AsyncTableRegionLocator
getRegionLocator() {
74 return rawTable
.getRegionLocator();
78 public long getRpcTimeout(TimeUnit unit
) {
79 return rawTable
.getRpcTimeout(unit
);
83 public long getReadRpcTimeout(TimeUnit unit
) {
84 return rawTable
.getReadRpcTimeout(unit
);
88 public long getWriteRpcTimeout(TimeUnit unit
) {
89 return rawTable
.getWriteRpcTimeout(unit
);
93 public long getOperationTimeout(TimeUnit unit
) {
94 return rawTable
.getOperationTimeout(unit
);
98 public long getScanTimeout(TimeUnit unit
) {
99 return rawTable
.getScanTimeout(unit
);
102 private <T
> CompletableFuture
<T
> wrap(CompletableFuture
<T
> future
) {
103 return FutureUtils
.wrapFuture(future
, pool
);
107 public CompletableFuture
<Result
> get(Get get
) {
108 return wrap(rawTable
.get(get
));
112 public CompletableFuture
<Void
> put(Put put
) {
113 return wrap(rawTable
.put(put
));
117 public CompletableFuture
<Void
> delete(Delete delete
) {
118 return wrap(rawTable
.delete(delete
));
122 public CompletableFuture
<Result
> append(Append append
) {
123 return wrap(rawTable
.append(append
));
127 public CompletableFuture
<Result
> increment(Increment increment
) {
128 return wrap(rawTable
.increment(increment
));
132 public CheckAndMutateBuilder
checkAndMutate(byte[] row
, byte[] family
) {
133 return new CheckAndMutateBuilder() {
135 private final CheckAndMutateBuilder builder
= rawTable
.checkAndMutate(row
, family
);
138 public CompletableFuture
<Boolean
> thenPut(Put put
) {
139 return wrap(builder
.thenPut(put
));
143 public CompletableFuture
<Boolean
> thenMutate(RowMutations mutation
) {
144 return wrap(builder
.thenMutate(mutation
));
148 public CompletableFuture
<Boolean
> thenDelete(Delete delete
) {
149 return wrap(builder
.thenDelete(delete
));
153 public CheckAndMutateBuilder
qualifier(byte[] qualifier
) {
154 builder
.qualifier(qualifier
);
159 public CheckAndMutateBuilder
timeRange(TimeRange timeRange
) {
160 builder
.timeRange(timeRange
);
165 public CheckAndMutateBuilder
ifNotExists() {
166 builder
.ifNotExists();
171 public CheckAndMutateBuilder
ifMatches(CompareOperator compareOp
, byte[] value
) {
172 builder
.ifMatches(compareOp
, value
);
179 public CheckAndMutateWithFilterBuilder
checkAndMutate(byte[] row
, Filter filter
) {
180 return new CheckAndMutateWithFilterBuilder() {
182 private final CheckAndMutateWithFilterBuilder builder
=
183 rawTable
.checkAndMutate(row
, filter
);
186 public CheckAndMutateWithFilterBuilder
timeRange(TimeRange timeRange
) {
187 builder
.timeRange(timeRange
);
192 public CompletableFuture
<Boolean
> thenPut(Put put
) {
193 return wrap(builder
.thenPut(put
));
197 public CompletableFuture
<Boolean
> thenDelete(Delete delete
) {
198 return wrap(builder
.thenDelete(delete
));
202 public CompletableFuture
<Boolean
> thenMutate(RowMutations mutation
) {
203 return wrap(builder
.thenMutate(mutation
));
209 public CompletableFuture
<CheckAndMutateResult
> checkAndMutate(CheckAndMutate checkAndMutate
) {
210 return wrap(rawTable
.checkAndMutate(checkAndMutate
));
214 public List
<CompletableFuture
<CheckAndMutateResult
>> checkAndMutate(
215 List
<CheckAndMutate
> checkAndMutates
) {
216 return rawTable
.checkAndMutate(checkAndMutates
).stream()
217 .map(this::wrap
).collect(toList());
221 public CompletableFuture
<Result
> mutateRow(RowMutations mutation
) {
222 return wrap(rawTable
.mutateRow(mutation
));
226 public CompletableFuture
<List
<Result
>> scanAll(Scan scan
) {
227 return wrap(rawTable
.scanAll(scan
));
231 public ResultScanner
getScanner(Scan scan
) {
232 return rawTable
.getScanner(scan
);
235 private void scan0(Scan scan
, ScanResultConsumer consumer
) {
236 try (ResultScanner scanner
= getScanner(scan
)) {
237 consumer
.onScanMetricsCreated(scanner
.getScanMetrics());
238 for (Result result
; (result
= scanner
.next()) != null;) {
239 if (!consumer
.onNext(result
)) {
243 consumer
.onComplete();
244 } catch (IOException e
) {
250 public void scan(Scan scan
, ScanResultConsumer consumer
) {
251 pool
.execute(() -> scan0(scan
, consumer
));
255 public List
<CompletableFuture
<Result
>> get(List
<Get
> gets
) {
256 return rawTable
.get(gets
).stream().map(this::wrap
).collect(toList());
260 public List
<CompletableFuture
<Void
>> put(List
<Put
> puts
) {
261 return rawTable
.put(puts
).stream().map(this::wrap
).collect(toList());
265 public List
<CompletableFuture
<Void
>> delete(List
<Delete
> deletes
) {
266 return rawTable
.delete(deletes
).stream().map(this::wrap
).collect(toList());
270 public <T
> List
<CompletableFuture
<T
>> batch(List
<?
extends Row
> actions
) {
271 return rawTable
.<T
> batch(actions
).stream().map(this::wrap
).collect(toList());
275 public <S
, R
> CompletableFuture
<R
> coprocessorService(Function
<RpcChannel
, S
> stubMaker
,
276 ServiceCaller
<S
, R
> callable
, byte[] row
) {
277 return wrap(rawTable
.coprocessorService(stubMaker
, callable
, row
));
281 public <S
, R
> CoprocessorServiceBuilder
<S
, R
> coprocessorService(
282 Function
<RpcChannel
, S
> stubMaker
, ServiceCaller
<S
, R
> callable
,
283 CoprocessorCallback
<R
> callback
) {
284 CoprocessorCallback
<R
> wrappedCallback
= new CoprocessorCallback
<R
>() {
287 public void onRegionComplete(RegionInfo region
, R resp
) {
288 pool
.execute(() -> callback
.onRegionComplete(region
, resp
));
292 public void onRegionError(RegionInfo region
, Throwable error
) {
293 pool
.execute(() -> callback
.onRegionError(region
, error
));
297 public void onComplete() {
298 pool
.execute(() -> callback
.onComplete());
302 public void onError(Throwable error
) {
303 pool
.execute(() -> callback
.onError(error
));
306 CoprocessorServiceBuilder
<S
, R
> builder
=
307 rawTable
.coprocessorService(stubMaker
, callable
, wrappedCallback
);
308 return new CoprocessorServiceBuilder
<S
, R
>() {
311 public CoprocessorServiceBuilder
<S
, R
> fromRow(byte[] startKey
, boolean inclusive
) {
312 builder
.fromRow(startKey
, inclusive
);
317 public CoprocessorServiceBuilder
<S
, R
> toRow(byte[] endKey
, boolean inclusive
) {
318 builder
.toRow(endKey
, inclusive
);
323 public void execute() {