HBASE-26688 Threads shared EMPTY_RESULT may lead to unexpected client job down. ...
[hbase.git] / hbase-client / src / main / java / org / apache / hadoop / hbase / client / AsyncTableImpl.java
bloba124467cd96faf51ed56427fea8a63dd66bd32cc
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org.apache.hadoop.hbase.client;
20 import static java.util.stream.Collectors.toList;
22 import java.io.IOException;
23 import java.util.List;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.TimeUnit;
27 import java.util.function.Function;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.CompareOperator;
30 import org.apache.hadoop.hbase.TableName;
31 import org.apache.hadoop.hbase.filter.Filter;
32 import org.apache.hadoop.hbase.io.TimeRange;
33 import org.apache.hadoop.hbase.util.FutureUtils;
34 import org.apache.yetus.audience.InterfaceAudience;
36 import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
38 /**
39 * Just a wrapper of {@link RawAsyncTableImpl}. The difference is that users need to provide a
40 * thread pool when constructing this class, and the callback methods registered to the returned
41 * {@link CompletableFuture} will be executed in this thread pool. So usually it is safe for users
42 * to do anything they want in the callbacks without breaking the rpc framework.
44 @InterfaceAudience.Private
45 class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
47 private final AsyncTable<AdvancedScanResultConsumer> rawTable;
49 private final ExecutorService pool;
51 AsyncTableImpl(AsyncConnectionImpl conn, AsyncTable<AdvancedScanResultConsumer> rawTable,
52 ExecutorService pool) {
53 this.rawTable = rawTable;
54 this.pool = pool;
57 @Override
58 public TableName getName() {
59 return rawTable.getName();
62 @Override
63 public Configuration getConfiguration() {
64 return rawTable.getConfiguration();
67 @Override
68 public CompletableFuture<TableDescriptor> getDescriptor() {
69 return wrap(rawTable.getDescriptor());
72 @Override
73 public AsyncTableRegionLocator getRegionLocator() {
74 return rawTable.getRegionLocator();
77 @Override
78 public long getRpcTimeout(TimeUnit unit) {
79 return rawTable.getRpcTimeout(unit);
82 @Override
83 public long getReadRpcTimeout(TimeUnit unit) {
84 return rawTable.getReadRpcTimeout(unit);
87 @Override
88 public long getWriteRpcTimeout(TimeUnit unit) {
89 return rawTable.getWriteRpcTimeout(unit);
92 @Override
93 public long getOperationTimeout(TimeUnit unit) {
94 return rawTable.getOperationTimeout(unit);
97 @Override
98 public long getScanTimeout(TimeUnit unit) {
99 return rawTable.getScanTimeout(unit);
102 private <T> CompletableFuture<T> wrap(CompletableFuture<T> future) {
103 return FutureUtils.wrapFuture(future, pool);
106 @Override
107 public CompletableFuture<Result> get(Get get) {
108 return wrap(rawTable.get(get));
111 @Override
112 public CompletableFuture<Void> put(Put put) {
113 return wrap(rawTable.put(put));
116 @Override
117 public CompletableFuture<Void> delete(Delete delete) {
118 return wrap(rawTable.delete(delete));
121 @Override
122 public CompletableFuture<Result> append(Append append) {
123 return wrap(rawTable.append(append));
126 @Override
127 public CompletableFuture<Result> increment(Increment increment) {
128 return wrap(rawTable.increment(increment));
131 @Override
132 public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
133 return new CheckAndMutateBuilder() {
135 private final CheckAndMutateBuilder builder = rawTable.checkAndMutate(row, family);
137 @Override
138 public CompletableFuture<Boolean> thenPut(Put put) {
139 return wrap(builder.thenPut(put));
142 @Override
143 public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
144 return wrap(builder.thenMutate(mutation));
147 @Override
148 public CompletableFuture<Boolean> thenDelete(Delete delete) {
149 return wrap(builder.thenDelete(delete));
152 @Override
153 public CheckAndMutateBuilder qualifier(byte[] qualifier) {
154 builder.qualifier(qualifier);
155 return this;
158 @Override
159 public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
160 builder.timeRange(timeRange);
161 return this;
164 @Override
165 public CheckAndMutateBuilder ifNotExists() {
166 builder.ifNotExists();
167 return this;
170 @Override
171 public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value) {
172 builder.ifMatches(compareOp, value);
173 return this;
178 @Override
179 public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
180 return new CheckAndMutateWithFilterBuilder() {
182 private final CheckAndMutateWithFilterBuilder builder =
183 rawTable.checkAndMutate(row, filter);
185 @Override
186 public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
187 builder.timeRange(timeRange);
188 return this;
191 @Override
192 public CompletableFuture<Boolean> thenPut(Put put) {
193 return wrap(builder.thenPut(put));
196 @Override
197 public CompletableFuture<Boolean> thenDelete(Delete delete) {
198 return wrap(builder.thenDelete(delete));
201 @Override
202 public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
203 return wrap(builder.thenMutate(mutation));
208 @Override
209 public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
210 return wrap(rawTable.checkAndMutate(checkAndMutate));
213 @Override
214 public List<CompletableFuture<CheckAndMutateResult>> checkAndMutate(
215 List<CheckAndMutate> checkAndMutates) {
216 return rawTable.checkAndMutate(checkAndMutates).stream()
217 .map(this::wrap).collect(toList());
220 @Override
221 public CompletableFuture<Result> mutateRow(RowMutations mutation) {
222 return wrap(rawTable.mutateRow(mutation));
225 @Override
226 public CompletableFuture<List<Result>> scanAll(Scan scan) {
227 return wrap(rawTable.scanAll(scan));
230 @Override
231 public ResultScanner getScanner(Scan scan) {
232 return rawTable.getScanner(scan);
235 private void scan0(Scan scan, ScanResultConsumer consumer) {
236 try (ResultScanner scanner = getScanner(scan)) {
237 consumer.onScanMetricsCreated(scanner.getScanMetrics());
238 for (Result result; (result = scanner.next()) != null;) {
239 if (!consumer.onNext(result)) {
240 break;
243 consumer.onComplete();
244 } catch (IOException e) {
245 consumer.onError(e);
249 @Override
250 public void scan(Scan scan, ScanResultConsumer consumer) {
251 pool.execute(() -> scan0(scan, consumer));
254 @Override
255 public List<CompletableFuture<Result>> get(List<Get> gets) {
256 return rawTable.get(gets).stream().map(this::wrap).collect(toList());
259 @Override
260 public List<CompletableFuture<Void>> put(List<Put> puts) {
261 return rawTable.put(puts).stream().map(this::wrap).collect(toList());
264 @Override
265 public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
266 return rawTable.delete(deletes).stream().map(this::wrap).collect(toList());
269 @Override
270 public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
271 return rawTable.<T> batch(actions).stream().map(this::wrap).collect(toList());
274 @Override
275 public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
276 ServiceCaller<S, R> callable, byte[] row) {
277 return wrap(rawTable.coprocessorService(stubMaker, callable, row));
280 @Override
281 public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
282 Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
283 CoprocessorCallback<R> callback) {
284 CoprocessorCallback<R> wrappedCallback = new CoprocessorCallback<R>() {
286 @Override
287 public void onRegionComplete(RegionInfo region, R resp) {
288 pool.execute(() -> callback.onRegionComplete(region, resp));
291 @Override
292 public void onRegionError(RegionInfo region, Throwable error) {
293 pool.execute(() -> callback.onRegionError(region, error));
296 @Override
297 public void onComplete() {
298 pool.execute(() -> callback.onComplete());
301 @Override
302 public void onError(Throwable error) {
303 pool.execute(() -> callback.onError(error));
306 CoprocessorServiceBuilder<S, R> builder =
307 rawTable.coprocessorService(stubMaker, callable, wrappedCallback);
308 return new CoprocessorServiceBuilder<S, R>() {
310 @Override
311 public CoprocessorServiceBuilder<S, R> fromRow(byte[] startKey, boolean inclusive) {
312 builder.fromRow(startKey, inclusive);
313 return this;
316 @Override
317 public CoprocessorServiceBuilder<S, R> toRow(byte[] endKey, boolean inclusive) {
318 builder.toRow(endKey, inclusive);
319 return this;
322 @Override
323 public void execute() {
324 builder.execute();