HBASE-25617 Revisit the span names (#2998)
[hbase.git] / hbase-client / src / main / java / org / apache / hadoop / hbase / client / AsyncConnectionImpl.java
blob78fe8a8a76cb2f2c7d39998a7a706bff448c791c
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED;
21 import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED_DEFAULT;
22 import static org.apache.hadoop.hbase.client.ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS;
23 import static org.apache.hadoop.hbase.client.ClusterStatusListener.STATUS_LISTENER_CLASS;
24 import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
25 import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
26 import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
27 import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
28 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
30 import io.opentelemetry.api.trace.Span;
31 import io.opentelemetry.context.Scope;
32 import java.io.IOException;
33 import java.net.SocketAddress;
34 import java.util.Optional;
35 import java.util.concurrent.CompletableFuture;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.ConcurrentMap;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.ExecutorService;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicBoolean;
42 import java.util.concurrent.atomic.AtomicReference;
43 import org.apache.commons.io.IOUtils;
44 import org.apache.hadoop.conf.Configuration;
45 import org.apache.hadoop.hbase.AuthUtil;
46 import org.apache.hadoop.hbase.ChoreService;
47 import org.apache.hadoop.hbase.MasterNotRunningException;
48 import org.apache.hadoop.hbase.ServerName;
49 import org.apache.hadoop.hbase.TableName;
50 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
51 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
52 import org.apache.hadoop.hbase.ipc.RpcClient;
53 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
54 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
55 import org.apache.hadoop.hbase.security.User;
56 import org.apache.hadoop.hbase.trace.TraceUtil;
57 import org.apache.hadoop.hbase.util.ConcurrentMapUtils;
58 import org.apache.hadoop.hbase.util.Threads;
59 import org.apache.hadoop.security.UserGroupInformation;
60 import org.apache.yetus.audience.InterfaceAudience;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
64 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
65 import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
67 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
68 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
69 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
70 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
72 /**
73 * The implementation of AsyncConnection.
75 @InterfaceAudience.Private
76 class AsyncConnectionImpl implements AsyncConnection {
78 private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);
80 static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
81 new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true)
82 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
83 10, TimeUnit.MILLISECONDS);
85 private final Configuration conf;
87 final AsyncConnectionConfiguration connConf;
89 private final User user;
91 final ConnectionRegistry registry;
93 private final int rpcTimeout;
95 protected final RpcClient rpcClient;
97 final RpcControllerFactory rpcControllerFactory;
99 private final AsyncRegionLocator locator;
101 final AsyncRpcRetryingCallerFactory callerFactory;
103 private final NonceGenerator nonceGenerator;
105 private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
106 private final ConcurrentMap<String, AdminService.Interface> adminSubs = new ConcurrentHashMap<>();
108 private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>();
110 private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture =
111 new AtomicReference<>();
113 private final Optional<ServerStatisticTracker> stats;
114 private final ClientBackoffPolicy backoffPolicy;
116 private ChoreService choreService;
118 private final AtomicBoolean closed = new AtomicBoolean(false);
120 private final Optional<MetricsConnection> metrics;
122 private final ClusterStatusListener clusterStatusListener;
124 private volatile ConnectionOverAsyncConnection conn;
126 public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
127 SocketAddress localAddress, User user) {
128 this.conf = conf;
129 this.user = user;
131 if (user.isLoginFromKeytab()) {
132 spawnRenewalChore(user.getUGI());
134 this.connConf = new AsyncConnectionConfiguration(conf);
135 this.registry = registry;
136 if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
137 this.metrics = Optional.of(new MetricsConnection(this.toString(), () -> null, () -> null));
138 } else {
139 this.metrics = Optional.empty();
141 this.rpcClient =
142 RpcClientFactory.createClient(conf, clusterId, localAddress, metrics.orElse(null));
143 this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
144 this.rpcTimeout =
145 (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
146 this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
147 this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
148 if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
149 nonceGenerator = PerClientRandomNonceGenerator.get();
150 } else {
151 nonceGenerator = NO_NONCE_GENERATOR;
153 this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf));
154 this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
155 ClusterStatusListener listener = null;
156 if (conf.getBoolean(STATUS_PUBLISHED, STATUS_PUBLISHED_DEFAULT)) {
157 // TODO: this maybe a blocking operation, better to create it outside the constructor and pass
158 // it in, just like clusterId. Not a big problem for now as the default value is false.
159 Class<? extends ClusterStatusListener.Listener> listenerClass = conf.getClass(
160 STATUS_LISTENER_CLASS, DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class);
161 if (listenerClass == null) {
162 LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS);
163 } else {
164 try {
165 listener = new ClusterStatusListener(new ClusterStatusListener.DeadServerHandler() {
166 @Override
167 public void newDead(ServerName sn) {
168 locator.clearCache(sn);
169 rpcClient.cancelConnections(sn);
171 }, conf, listenerClass);
172 } catch (IOException e) {
173 LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e);
177 this.clusterStatusListener = listener;
180 private void spawnRenewalChore(final UserGroupInformation user) {
181 ChoreService service = getChoreService();
182 service.scheduleChore(AuthUtil.getAuthRenewalChore(user));
186 * If choreService has not been created yet, create the ChoreService.
187 * @return ChoreService
189 synchronized ChoreService getChoreService() {
190 if (isClosed()) {
191 throw new IllegalStateException("connection is already closed");
193 if (choreService == null) {
194 choreService = new ChoreService("AsyncConn Chore Service");
196 return choreService;
199 @Override
200 public Configuration getConfiguration() {
201 return conf;
204 @Override
205 public boolean isClosed() {
206 return closed.get();
209 @Override
210 public void close() {
211 TraceUtil.trace(() -> {
212 if (!closed.compareAndSet(false, true)) {
213 return;
215 LOG.info("Connection has been closed by {}.", Thread.currentThread().getName());
216 if (LOG.isDebugEnabled()) {
217 logCallStack(Thread.currentThread().getStackTrace());
219 IOUtils.closeQuietly(clusterStatusListener,
220 e -> LOG.warn("failed to close clusterStatusListener", e));
221 IOUtils.closeQuietly(rpcClient, e -> LOG.warn("failed to close rpcClient", e));
222 IOUtils.closeQuietly(registry, e -> LOG.warn("failed to close registry", e));
223 synchronized (this) {
224 if (choreService != null) {
225 choreService.shutdown();
226 choreService = null;
229 metrics.ifPresent(MetricsConnection::shutdown);
230 ConnectionOverAsyncConnection c = this.conn;
231 if (c != null) {
232 c.closePool();
234 }, "AsyncConnection.close");
237 private void logCallStack(StackTraceElement[] stackTraceElements) {
238 StringBuilder stackBuilder = new StringBuilder("Call stack:");
239 for (StackTraceElement element : stackTraceElements) {
240 stackBuilder.append("\n at ");
241 stackBuilder.append(element);
243 stackBuilder.append("\n");
244 LOG.debug(stackBuilder.toString());
247 @Override
248 public AsyncTableRegionLocator getRegionLocator(TableName tableName) {
249 return new AsyncTableRegionLocatorImpl(tableName, this);
252 @Override
253 public void clearRegionLocationCache() {
254 locator.clearCache();
257 // we will override this method for testing retry caller, so do not remove this method.
258 AsyncRegionLocator getLocator() {
259 return locator;
262 // ditto
263 NonceGenerator getNonceGenerator() {
264 return nonceGenerator;
267 private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException {
268 return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
271 ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
272 return ConcurrentMapUtils.computeIfAbsentEx(rsStubs,
273 getStubKey(ClientService.getDescriptor().getName(), serverName),
274 () -> createRegionServerStub(serverName));
277 private MasterService.Interface createMasterStub(ServerName serverName) throws IOException {
278 return MasterService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
281 private AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException {
282 return AdminService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
285 AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
286 return ConcurrentMapUtils.computeIfAbsentEx(adminSubs,
287 getStubKey(AdminService.getDescriptor().getName(), serverName),
288 () -> createAdminServerStub(serverName));
291 CompletableFuture<MasterService.Interface> getMasterStub() {
292 return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> {
293 CompletableFuture<MasterService.Interface> future = new CompletableFuture<>();
294 addListener(registry.getActiveMaster(), (addr, error) -> {
295 if (error != null) {
296 future.completeExceptionally(error);
297 } else if (addr == null) {
298 future.completeExceptionally(new MasterNotRunningException(
299 "ZooKeeper available but no active master location found"));
300 } else {
301 LOG.debug("The fetched master address is {}", addr);
302 try {
303 future.complete(createMasterStub(addr));
304 } catch (IOException e) {
305 future.completeExceptionally(e);
310 return future;
311 }, stub -> true, "master stub");
314 String getClusterId() {
315 try {
316 return registry.getClusterId().get();
317 } catch (InterruptedException | ExecutionException e) {
318 LOG.error("Error fetching cluster ID: ", e);
320 return null;
323 void clearMasterStubCache(MasterService.Interface stub) {
324 masterStub.compareAndSet(stub, null);
327 Optional<ServerStatisticTracker> getStatisticsTracker() {
328 return stats;
331 ClientBackoffPolicy getBackoffPolicy() {
332 return backoffPolicy;
335 @Override
336 public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) {
337 return new AsyncTableBuilderBase<AdvancedScanResultConsumer>(tableName, connConf) {
339 @Override
340 public AsyncTable<AdvancedScanResultConsumer> build() {
341 return new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
346 @Override
347 public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName,
348 ExecutorService pool) {
349 return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, connConf) {
351 @Override
352 public AsyncTable<ScanResultConsumer> build() {
353 RawAsyncTableImpl rawTable =
354 new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
355 return new AsyncTableImpl(AsyncConnectionImpl.this, rawTable, pool);
360 @Override
361 public AsyncAdminBuilder getAdminBuilder() {
362 return new AsyncAdminBuilderBase(connConf) {
363 @Override
364 public AsyncAdmin build() {
365 return new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this);
370 @Override
371 public AsyncAdminBuilder getAdminBuilder(ExecutorService pool) {
372 return new AsyncAdminBuilderBase(connConf) {
373 @Override
374 public AsyncAdmin build() {
375 RawAsyncHBaseAdmin rawAdmin =
376 new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this);
377 return new AsyncHBaseAdmin(rawAdmin, pool);
382 @Override
383 public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
384 return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName), RETRY_TIMER);
387 @Override
388 public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
389 ExecutorService pool) {
390 return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool),
391 RETRY_TIMER);
394 @Override
395 public Connection toConnection() {
396 ConnectionOverAsyncConnection c = this.conn;
397 if (c != null) {
398 return c;
400 synchronized (this) {
401 c = this.conn;
402 if (c != null) {
403 return c;
405 c = new ConnectionOverAsyncConnection(this);
406 this.conn = c;
408 return c;
411 @Override
412 public CompletableFuture<Hbck> getHbck() {
413 return TraceUtil.tracedFuture(() -> {
414 CompletableFuture<Hbck> future = new CompletableFuture<>();
415 addListener(registry.getActiveMaster(), (sn, error) -> {
416 if (error != null) {
417 future.completeExceptionally(error);
418 } else {
419 try {
420 future.complete(getHbck(sn));
421 } catch (IOException e) {
422 future.completeExceptionally(e);
426 return future;
427 }, "AsyncConnection.getHbck");
430 @Override
431 public Hbck getHbck(ServerName masterServer) throws IOException {
432 Span span = TraceUtil.createSpan("AsyncConnection.getHbck")
433 .setAttribute(TraceUtil.SERVER_NAME_KEY, masterServer.getServerName());
434 try (Scope scope = span.makeCurrent()) {
435 // we will not create a new connection when creating a new protobuf stub, and for hbck there
436 // will be no performance consideration, so for simplification we will create a new stub every
437 // time instead of caching the stub here.
438 return new HBaseHbck(
439 MasterProtos.HbckService
440 .newBlockingStub(rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)),
441 rpcControllerFactory);
445 Optional<MetricsConnection> getConnectionMetrics() {
446 return metrics;