HBASE-26765 Minor refactor of async scanning code (#4121)
[hbase.git] / hbase-client / src / main / java / org / apache / hadoop / hbase / client / AsyncConnectionImpl.java
blob506962fd2f8a475113c599e039a1d7573cc62bb4
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED;
21 import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED_DEFAULT;
22 import static org.apache.hadoop.hbase.client.ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS;
23 import static org.apache.hadoop.hbase.client.ClusterStatusListener.STATUS_LISTENER_CLASS;
24 import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
25 import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
26 import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
27 import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
28 import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.SERVER_NAME_KEY;
29 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
31 import io.opentelemetry.api.trace.Span;
32 import java.io.IOException;
33 import java.net.SocketAddress;
34 import java.util.Optional;
35 import java.util.concurrent.CompletableFuture;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.ConcurrentMap;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.ExecutorService;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicBoolean;
42 import java.util.concurrent.atomic.AtomicReference;
43 import java.util.function.Supplier;
44 import org.apache.commons.io.IOUtils;
45 import org.apache.hadoop.conf.Configuration;
46 import org.apache.hadoop.hbase.AuthUtil;
47 import org.apache.hadoop.hbase.ChoreService;
48 import org.apache.hadoop.hbase.MasterNotRunningException;
49 import org.apache.hadoop.hbase.ServerName;
50 import org.apache.hadoop.hbase.TableName;
51 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
52 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
53 import org.apache.hadoop.hbase.ipc.RpcClient;
54 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
55 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
56 import org.apache.hadoop.hbase.security.User;
57 import org.apache.hadoop.hbase.trace.TraceUtil;
58 import org.apache.hadoop.hbase.util.ConcurrentMapUtils;
59 import org.apache.hadoop.hbase.util.Threads;
60 import org.apache.hadoop.security.UserGroupInformation;
61 import org.apache.yetus.audience.InterfaceAudience;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
65 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
66 import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
68 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
69 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
70 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
71 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
73 /**
74 * The implementation of AsyncConnection.
76 @InterfaceAudience.Private
77 public class AsyncConnectionImpl implements AsyncConnection {
79 private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);
81 static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
82 new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true)
83 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
84 10, TimeUnit.MILLISECONDS);
86 private final Configuration conf;
88 final AsyncConnectionConfiguration connConf;
90 protected final User user;
92 final ConnectionRegistry registry;
94 protected final int rpcTimeout;
96 protected final RpcClient rpcClient;
98 final RpcControllerFactory rpcControllerFactory;
100 private final AsyncRegionLocator locator;
102 final AsyncRpcRetryingCallerFactory callerFactory;
104 private final NonceGenerator nonceGenerator;
106 private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
107 private final ConcurrentMap<String, AdminService.Interface> adminStubs =
108 new ConcurrentHashMap<>();
110 private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>();
112 private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture =
113 new AtomicReference<>();
115 private final Optional<ServerStatisticTracker> stats;
116 private final ClientBackoffPolicy backoffPolicy;
118 private ChoreService choreService;
120 private final AtomicBoolean closed = new AtomicBoolean(false);
122 private final Optional<MetricsConnection> metrics;
124 private final ClusterStatusListener clusterStatusListener;
126 private volatile ConnectionOverAsyncConnection conn;
128 public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
129 SocketAddress localAddress, User user) {
130 this.conf = conf;
131 this.user = user;
133 if (user.isLoginFromKeytab()) {
134 spawnRenewalChore(user.getUGI());
136 this.connConf = new AsyncConnectionConfiguration(conf);
137 this.registry = registry;
138 if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
139 this.metrics = Optional.of(new MetricsConnection(this.toString(), () -> null, () -> null));
140 } else {
141 this.metrics = Optional.empty();
143 this.rpcClient =
144 RpcClientFactory.createClient(conf, clusterId, localAddress, metrics.orElse(null));
145 this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
146 this.rpcTimeout =
147 (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
148 this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
149 this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
150 if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
151 nonceGenerator = PerClientRandomNonceGenerator.get();
152 } else {
153 nonceGenerator = NO_NONCE_GENERATOR;
155 this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf));
156 this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
157 ClusterStatusListener listener = null;
158 if (conf.getBoolean(STATUS_PUBLISHED, STATUS_PUBLISHED_DEFAULT)) {
159 // TODO: this maybe a blocking operation, better to create it outside the constructor and pass
160 // it in, just like clusterId. Not a big problem for now as the default value is false.
161 Class<? extends ClusterStatusListener.Listener> listenerClass = conf.getClass(
162 STATUS_LISTENER_CLASS, DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class);
163 if (listenerClass == null) {
164 LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS);
165 } else {
166 try {
167 listener = new ClusterStatusListener(new ClusterStatusListener.DeadServerHandler() {
168 @Override
169 public void newDead(ServerName sn) {
170 locator.clearCache(sn);
171 rpcClient.cancelConnections(sn);
173 }, conf, listenerClass);
174 } catch (IOException e) {
175 LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e);
179 this.clusterStatusListener = listener;
182 private void spawnRenewalChore(final UserGroupInformation user) {
183 ChoreService service = getChoreService();
184 service.scheduleChore(AuthUtil.getAuthRenewalChore(user, conf));
188 * If choreService has not been created yet, create the ChoreService.
189 * @return ChoreService
191 synchronized ChoreService getChoreService() {
192 if (isClosed()) {
193 throw new IllegalStateException("connection is already closed");
195 if (choreService == null) {
196 choreService = new ChoreService("AsyncConn Chore Service");
198 return choreService;
201 public User getUser() {
202 return user;
205 public ConnectionRegistry getConnectionRegistry() {
206 return registry;
209 @Override
210 public Configuration getConfiguration() {
211 return conf;
214 @Override
215 public boolean isClosed() {
216 return closed.get();
219 @Override
220 public void close() {
221 TraceUtil.trace(() -> {
222 if (!closed.compareAndSet(false, true)) {
223 return;
225 LOG.info("Connection has been closed by {}.", Thread.currentThread().getName());
226 if (LOG.isDebugEnabled()) {
227 logCallStack(Thread.currentThread().getStackTrace());
229 IOUtils.closeQuietly(clusterStatusListener,
230 e -> LOG.warn("failed to close clusterStatusListener", e));
231 IOUtils.closeQuietly(rpcClient, e -> LOG.warn("failed to close rpcClient", e));
232 IOUtils.closeQuietly(registry, e -> LOG.warn("failed to close registry", e));
233 synchronized (this) {
234 if (choreService != null) {
235 choreService.shutdown();
236 choreService = null;
239 metrics.ifPresent(MetricsConnection::shutdown);
240 ConnectionOverAsyncConnection c = this.conn;
241 if (c != null) {
242 c.closePool();
244 }, "AsyncConnection.close");
247 private void logCallStack(StackTraceElement[] stackTraceElements) {
248 StringBuilder stackBuilder = new StringBuilder("Call stack:");
249 for (StackTraceElement element : stackTraceElements) {
250 stackBuilder.append("\n at ");
251 stackBuilder.append(element);
253 stackBuilder.append("\n");
254 LOG.debug(stackBuilder.toString());
257 @Override
258 public AsyncTableRegionLocator getRegionLocator(TableName tableName) {
259 return new AsyncTableRegionLocatorImpl(tableName, this);
262 @Override
263 public void clearRegionLocationCache() {
264 locator.clearCache();
267 // we will override this method for testing retry caller, so do not remove this method.
268 AsyncRegionLocator getLocator() {
269 return locator;
272 // ditto
273 NonceGenerator getNonceGenerator() {
274 return nonceGenerator;
277 private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException {
278 return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
281 ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
282 return ConcurrentMapUtils.computeIfAbsentEx(rsStubs,
283 getStubKey(ClientService.getDescriptor().getName(), serverName),
284 () -> createRegionServerStub(serverName));
287 private MasterService.Interface createMasterStub(ServerName serverName) throws IOException {
288 return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
291 private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException {
292 return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
295 AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
296 return ConcurrentMapUtils.computeIfAbsentEx(adminStubs,
297 getStubKey(AdminService.getDescriptor().getName(), serverName),
298 () -> createAdminServerStub(serverName));
301 CompletableFuture<MasterService.Interface> getMasterStub() {
302 return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> {
303 CompletableFuture<MasterService.Interface> future = new CompletableFuture<>();
304 addListener(registry.getActiveMaster(), (addr, error) -> {
305 if (error != null) {
306 future.completeExceptionally(error);
307 } else if (addr == null) {
308 future.completeExceptionally(new MasterNotRunningException(
309 "ZooKeeper available but no active master location found"));
310 } else {
311 LOG.debug("The fetched master address is {}", addr);
312 try {
313 future.complete(createMasterStub(addr));
314 } catch (IOException e) {
315 future.completeExceptionally(e);
320 return future;
321 }, stub -> true, "master stub");
324 String getClusterId() {
325 try {
326 return registry.getClusterId().get();
327 } catch (InterruptedException | ExecutionException e) {
328 LOG.error("Error fetching cluster ID: ", e);
330 return null;
333 void clearMasterStubCache(MasterService.Interface stub) {
334 masterStub.compareAndSet(stub, null);
337 Optional<ServerStatisticTracker> getStatisticsTracker() {
338 return stats;
341 ClientBackoffPolicy getBackoffPolicy() {
342 return backoffPolicy;
345 @Override
346 public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) {
347 return new AsyncTableBuilderBase<AdvancedScanResultConsumer>(tableName, connConf) {
349 @Override
350 public AsyncTable<AdvancedScanResultConsumer> build() {
351 return new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
356 @Override
357 public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName,
358 ExecutorService pool) {
359 return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) {
361 @Override
362 public AsyncTable<ScanResultConsumer> build() {
363 RawAsyncTableImpl rawTable =
364 new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
365 return new AsyncTableImpl(rawTable, pool);
370 @Override
371 public AsyncAdminBuilder getAdminBuilder() {
372 return new AsyncAdminBuilderBase(connConf) {
373 @Override
374 public AsyncAdmin build() {
375 return new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this);
380 @Override
381 public AsyncAdminBuilder getAdminBuilder(ExecutorService pool) {
382 return new AsyncAdminBuilderBase(connConf) {
383 @Override
384 public AsyncAdmin build() {
385 RawAsyncHBaseAdmin rawAdmin =
386 new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this);
387 return new AsyncHBaseAdmin(rawAdmin, pool);
392 @Override
393 public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
394 return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName), RETRY_TIMER);
397 @Override
398 public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
399 ExecutorService pool) {
400 return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
401 RETRY_TIMER);
404 @Override
405 public Connection toConnection() {
406 ConnectionOverAsyncConnection c = this.conn;
407 if (c != null) {
408 return c;
410 synchronized (this) {
411 c = this.conn;
412 if (c != null) {
413 return c;
415 c = new ConnectionOverAsyncConnection(this);
416 this.conn = c;
418 return c;
421 private Hbck getHbckInternal(ServerName masterServer) {
422 Span.current().setAttribute(SERVER_NAME_KEY, masterServer.getServerName());
423 // we will not create a new connection when creating a new protobuf stub, and for hbck there
424 // will be no performance consideration, so for simplification we will create a new stub every
425 // time instead of caching the stub here.
426 return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
427 rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
430 @Override
431 public CompletableFuture<Hbck> getHbck() {
432 return TraceUtil.tracedFuture(() -> {
433 CompletableFuture<Hbck> future = new CompletableFuture<>();
434 addListener(registry.getActiveMaster(), (sn, error) -> {
435 if (error != null) {
436 future.completeExceptionally(error);
437 } else {
438 future.complete(getHbckInternal(sn));
441 return future;
442 }, "AsyncConnection.getHbck");
445 @Override
446 public Hbck getHbck(ServerName masterServer) {
447 return TraceUtil.trace(new Supplier<Hbck>() {
449 @Override
450 public Hbck get() {
451 return getHbckInternal(masterServer);
453 }, "AsyncConnection.getHbck");
456 Optional<MetricsConnection> getConnectionMetrics() {
457 return metrics;