/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.apache
.hadoop
.hbase
.HConstants
.STATUS_PUBLISHED
;
21 import static org
.apache
.hadoop
.hbase
.HConstants
.STATUS_PUBLISHED_DEFAULT
;
22 import static org
.apache
.hadoop
.hbase
.client
.ClusterStatusListener
.DEFAULT_STATUS_LISTENER_CLASS
;
23 import static org
.apache
.hadoop
.hbase
.client
.ClusterStatusListener
.STATUS_LISTENER_CLASS
;
24 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.NO_NONCE_GENERATOR
;
25 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.getStubKey
;
26 import static org
.apache
.hadoop
.hbase
.client
.MetricsConnection
.CLIENT_SIDE_METRICS_ENABLED_KEY
;
27 import static org
.apache
.hadoop
.hbase
.client
.NonceGenerator
.CLIENT_NONCES_ENABLED_KEY
;
28 import static org
.apache
.hadoop
.hbase
.trace
.HBaseSemanticAttributes
.SERVER_NAME_KEY
;
29 import static org
.apache
.hadoop
.hbase
.util
.FutureUtils
.addListener
;
31 import io
.opentelemetry
.api
.trace
.Span
;
32 import java
.io
.IOException
;
33 import java
.net
.SocketAddress
;
34 import java
.util
.Optional
;
35 import java
.util
.concurrent
.CompletableFuture
;
36 import java
.util
.concurrent
.ConcurrentHashMap
;
37 import java
.util
.concurrent
.ConcurrentMap
;
38 import java
.util
.concurrent
.ExecutionException
;
39 import java
.util
.concurrent
.ExecutorService
;
40 import java
.util
.concurrent
.TimeUnit
;
41 import java
.util
.concurrent
.atomic
.AtomicBoolean
;
42 import java
.util
.concurrent
.atomic
.AtomicReference
;
43 import java
.util
.function
.Supplier
;
44 import org
.apache
.commons
.io
.IOUtils
;
45 import org
.apache
.hadoop
.conf
.Configuration
;
46 import org
.apache
.hadoop
.hbase
.AuthUtil
;
47 import org
.apache
.hadoop
.hbase
.ChoreService
;
48 import org
.apache
.hadoop
.hbase
.MasterNotRunningException
;
49 import org
.apache
.hadoop
.hbase
.ServerName
;
50 import org
.apache
.hadoop
.hbase
.TableName
;
51 import org
.apache
.hadoop
.hbase
.client
.backoff
.ClientBackoffPolicy
;
52 import org
.apache
.hadoop
.hbase
.client
.backoff
.ClientBackoffPolicyFactory
;
53 import org
.apache
.hadoop
.hbase
.ipc
.RpcClient
;
54 import org
.apache
.hadoop
.hbase
.ipc
.RpcClientFactory
;
55 import org
.apache
.hadoop
.hbase
.ipc
.RpcControllerFactory
;
56 import org
.apache
.hadoop
.hbase
.security
.User
;
57 import org
.apache
.hadoop
.hbase
.trace
.TraceUtil
;
58 import org
.apache
.hadoop
.hbase
.util
.ConcurrentMapUtils
;
59 import org
.apache
.hadoop
.hbase
.util
.Threads
;
60 import org
.apache
.hadoop
.security
.UserGroupInformation
;
61 import org
.apache
.yetus
.audience
.InterfaceAudience
;
62 import org
.slf4j
.Logger
;
63 import org
.slf4j
.LoggerFactory
;
65 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.util
.concurrent
.ThreadFactoryBuilder
;
66 import org
.apache
.hbase
.thirdparty
.io
.netty
.util
.HashedWheelTimer
;
68 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.AdminProtos
.AdminService
;
69 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.ClientService
;
70 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.MasterProtos
;
71 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.MasterProtos
.MasterService
;
/**
 * The implementation of AsyncConnection.
 */
76 @InterfaceAudience.Private
77 public class AsyncConnectionImpl
implements AsyncConnection
{
79 private static final Logger LOG
= LoggerFactory
.getLogger(AsyncConnectionImpl
.class);
81 static final HashedWheelTimer RETRY_TIMER
= new HashedWheelTimer(
82 new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true)
83 .setUncaughtExceptionHandler(Threads
.LOGGING_EXCEPTION_HANDLER
).build(),
84 10, TimeUnit
.MILLISECONDS
);
86 private final Configuration conf
;
88 final AsyncConnectionConfiguration connConf
;
90 protected final User user
;
92 final ConnectionRegistry registry
;
94 protected final int rpcTimeout
;
96 protected final RpcClient rpcClient
;
98 final RpcControllerFactory rpcControllerFactory
;
100 private final AsyncRegionLocator locator
;
102 final AsyncRpcRetryingCallerFactory callerFactory
;
104 private final NonceGenerator nonceGenerator
;
106 private final ConcurrentMap
<String
, ClientService
.Interface
> rsStubs
= new ConcurrentHashMap
<>();
107 private final ConcurrentMap
<String
, AdminService
.Interface
> adminStubs
=
108 new ConcurrentHashMap
<>();
110 private final AtomicReference
<MasterService
.Interface
> masterStub
= new AtomicReference
<>();
112 private final AtomicReference
<CompletableFuture
<MasterService
.Interface
>> masterStubMakeFuture
=
113 new AtomicReference
<>();
115 private final Optional
<ServerStatisticTracker
> stats
;
116 private final ClientBackoffPolicy backoffPolicy
;
118 private ChoreService choreService
;
120 private final AtomicBoolean closed
= new AtomicBoolean(false);
122 private final Optional
<MetricsConnection
> metrics
;
124 private final ClusterStatusListener clusterStatusListener
;
126 private volatile ConnectionOverAsyncConnection conn
;
/**
 * Creates the connection and its internal components: RPC client, RPC controller factory, region
 * locator, retrying-caller factory, nonce generator, statistics tracker, backoff policy and, when
 * status publishing is enabled, a ClusterStatusListener.
 * @param conf client configuration
 * @param registry registry used to find the active master and the cluster id
 * @param clusterId cluster id passed to the RPC client factory
 * @param localAddress local address passed to the RPC client factory
 * @param user user for RPC channels; when logged in from a keytab a renewal chore is scheduled
 */
public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
  SocketAddress localAddress, User user) {
  // conf must be assigned before spawnRenewalChore, which reads this.conf.
  this.conf = conf;
  this.user = user;

  if (user.isLoginFromKeytab()) {
    spawnRenewalChore(user.getUGI());
  }
  this.connConf = new AsyncConnectionConfiguration(conf);
  this.registry = registry;
  if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
    this.metrics = Optional.of(new MetricsConnection(this.toString(), () -> null, () -> null));
  } else {
    this.metrics = Optional.empty();
  }
  this.rpcClient =
    RpcClientFactory.createClient(conf, clusterId, localAddress, metrics.orElse(null));
  this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
  // Convert the nanosecond timeout to millis, clamped so the int cast cannot overflow.
  this.rpcTimeout =
    (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
  this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
  this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
  if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
    nonceGenerator = PerClientRandomNonceGenerator.get();
  } else {
    nonceGenerator = NO_NONCE_GENERATOR;
  }
  this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf));
  this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
  ClusterStatusListener listener = null;
  if (conf.getBoolean(STATUS_PUBLISHED, STATUS_PUBLISHED_DEFAULT)) {
    // TODO: this maybe a blocking operation, better to create it outside the constructor and pass
    // it in, just like clusterId. Not a big problem for now as the default value is false.
    Class<? extends ClusterStatusListener.Listener> listenerClass = conf.getClass(
      STATUS_LISTENER_CLASS, DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class);
    if (listenerClass == null) {
      LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS);
    } else {
      try {
        listener = new ClusterStatusListener(new ClusterStatusListener.DeadServerHandler() {
          @Override
          public void newDead(ServerName sn) {
            // Drop cached locations and open connections for the dead server.
            locator.clearCache(sn);
            rpcClient.cancelConnections(sn);
          }
        }, conf, listenerClass);
      } catch (IOException e) {
        // Best effort: the connection works without dead-server notifications.
        LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e);
      }
    }
  }
  this.clusterStatusListener = listener;
}
182 private void spawnRenewalChore(final UserGroupInformation user
) {
183 ChoreService service
= getChoreService();
184 service
.scheduleChore(AuthUtil
.getAuthRenewalChore(user
, conf
));
188 * If choreService has not been created yet, create the ChoreService.
189 * @return ChoreService
191 synchronized ChoreService
getChoreService() {
193 throw new IllegalStateException("connection is already closed");
195 if (choreService
== null) {
196 choreService
= new ChoreService("AsyncConn Chore Service");
201 public User
getUser() {
205 public ConnectionRegistry
getConnectionRegistry() {
210 public Configuration
getConfiguration() {
215 public boolean isClosed() {
220 public void close() {
221 TraceUtil
.trace(() -> {
222 if (!closed
.compareAndSet(false, true)) {
225 LOG
.info("Connection has been closed by {}.", Thread
.currentThread().getName());
226 if (LOG
.isDebugEnabled()) {
227 logCallStack(Thread
.currentThread().getStackTrace());
229 IOUtils
.closeQuietly(clusterStatusListener
,
230 e
-> LOG
.warn("failed to close clusterStatusListener", e
));
231 IOUtils
.closeQuietly(rpcClient
, e
-> LOG
.warn("failed to close rpcClient", e
));
232 IOUtils
.closeQuietly(registry
, e
-> LOG
.warn("failed to close registry", e
));
233 synchronized (this) {
234 if (choreService
!= null) {
235 choreService
.shutdown();
239 metrics
.ifPresent(MetricsConnection
::shutdown
);
240 ConnectionOverAsyncConnection c
= this.conn
;
244 }, "AsyncConnection.close");
247 private void logCallStack(StackTraceElement
[] stackTraceElements
) {
248 StringBuilder stackBuilder
= new StringBuilder("Call stack:");
249 for (StackTraceElement element
: stackTraceElements
) {
250 stackBuilder
.append("\n at ");
251 stackBuilder
.append(element
);
253 stackBuilder
.append("\n");
254 LOG
.debug(stackBuilder
.toString());
258 public AsyncTableRegionLocator
getRegionLocator(TableName tableName
) {
259 return new AsyncTableRegionLocatorImpl(tableName
, this);
263 public void clearRegionLocationCache() {
264 locator
.clearCache();
267 // we will override this method for testing retry caller, so do not remove this method.
268 AsyncRegionLocator
getLocator() {
273 NonceGenerator
getNonceGenerator() {
274 return nonceGenerator
;
277 private ClientService
.Interface
createRegionServerStub(ServerName serverName
) throws IOException
{
278 return ClientService
.newStub(rpcClient
.createRpcChannel(serverName
, user
, rpcTimeout
));
281 ClientService
.Interface
getRegionServerStub(ServerName serverName
) throws IOException
{
282 return ConcurrentMapUtils
.computeIfAbsentEx(rsStubs
,
283 getStubKey(ClientService
.getDescriptor().getName(), serverName
),
284 () -> createRegionServerStub(serverName
));
287 private MasterService
.Interface
createMasterStub(ServerName serverName
) throws IOException
{
288 return MasterService
.newStub(rpcClient
.createRpcChannel(serverName
, user
, rpcTimeout
));
291 private AdminService
.Interface
createAdminServerStub(ServerName serverName
) throws IOException
{
292 return AdminService
.newStub(rpcClient
.createRpcChannel(serverName
, user
, rpcTimeout
));
295 AdminService
.Interface
getAdminStub(ServerName serverName
) throws IOException
{
296 return ConcurrentMapUtils
.computeIfAbsentEx(adminStubs
,
297 getStubKey(AdminService
.getDescriptor().getName(), serverName
),
298 () -> createAdminServerStub(serverName
));
301 CompletableFuture
<MasterService
.Interface
> getMasterStub() {
302 return ConnectionUtils
.getOrFetch(masterStub
, masterStubMakeFuture
, false, () -> {
303 CompletableFuture
<MasterService
.Interface
> future
= new CompletableFuture
<>();
304 addListener(registry
.getActiveMaster(), (addr
, error
) -> {
306 future
.completeExceptionally(error
);
307 } else if (addr
== null) {
308 future
.completeExceptionally(new MasterNotRunningException(
309 "ZooKeeper available but no active master location found"));
311 LOG
.debug("The fetched master address is {}", addr
);
313 future
.complete(createMasterStub(addr
));
314 } catch (IOException e
) {
315 future
.completeExceptionally(e
);
321 }, stub
-> true, "master stub");
324 String
getClusterId() {
326 return registry
.getClusterId().get();
327 } catch (InterruptedException
| ExecutionException e
) {
328 LOG
.error("Error fetching cluster ID: ", e
);
333 void clearMasterStubCache(MasterService
.Interface stub
) {
334 masterStub
.compareAndSet(stub
, null);
337 Optional
<ServerStatisticTracker
> getStatisticsTracker() {
341 ClientBackoffPolicy
getBackoffPolicy() {
342 return backoffPolicy
;
346 public AsyncTableBuilder
<AdvancedScanResultConsumer
> getTableBuilder(TableName tableName
) {
347 return new AsyncTableBuilderBase
<AdvancedScanResultConsumer
>(tableName
, connConf
) {
350 public AsyncTable
<AdvancedScanResultConsumer
> build() {
351 return new RawAsyncTableImpl(AsyncConnectionImpl
.this, RETRY_TIMER
, this);
357 public AsyncTableBuilder
<ScanResultConsumer
> getTableBuilder(TableName tableName
,
358 ExecutorService pool
) {
359 return new AsyncTableBuilderBase
<ScanResultConsumer
>(tableName
, connConf
) {
362 public AsyncTable
<ScanResultConsumer
> build() {
363 RawAsyncTableImpl rawTable
=
364 new RawAsyncTableImpl(AsyncConnectionImpl
.this, RETRY_TIMER
, this);
365 return new AsyncTableImpl(rawTable
, pool
);
371 public AsyncAdminBuilder
getAdminBuilder() {
372 return new AsyncAdminBuilderBase(connConf
) {
374 public AsyncAdmin
build() {
375 return new RawAsyncHBaseAdmin(AsyncConnectionImpl
.this, RETRY_TIMER
, this);
381 public AsyncAdminBuilder
getAdminBuilder(ExecutorService pool
) {
382 return new AsyncAdminBuilderBase(connConf
) {
384 public AsyncAdmin
build() {
385 RawAsyncHBaseAdmin rawAdmin
=
386 new RawAsyncHBaseAdmin(AsyncConnectionImpl
.this, RETRY_TIMER
, this);
387 return new AsyncHBaseAdmin(rawAdmin
, pool
);
393 public AsyncBufferedMutatorBuilder
getBufferedMutatorBuilder(TableName tableName
) {
394 return new AsyncBufferedMutatorBuilderImpl(connConf
, getTableBuilder(tableName
), RETRY_TIMER
);
398 public AsyncBufferedMutatorBuilder
getBufferedMutatorBuilder(TableName tableName
,
399 ExecutorService pool
) {
400 return new AsyncBufferedMutatorBuilderImpl(connConf
, getTableBuilder(tableName
, pool
),
405 public Connection
toConnection() {
406 ConnectionOverAsyncConnection c
= this.conn
;
410 synchronized (this) {
415 c
= new ConnectionOverAsyncConnection(this);
421 private Hbck
getHbckInternal(ServerName masterServer
) {
422 Span
.current().setAttribute(SERVER_NAME_KEY
, masterServer
.getServerName());
423 // we will not create a new connection when creating a new protobuf stub, and for hbck there
424 // will be no performance consideration, so for simplification we will create a new stub every
425 // time instead of caching the stub here.
426 return new HBaseHbck(MasterProtos
.HbckService
.newBlockingStub(
427 rpcClient
.createBlockingRpcChannel(masterServer
, user
, rpcTimeout
)), rpcControllerFactory
);
431 public CompletableFuture
<Hbck
> getHbck() {
432 return TraceUtil
.tracedFuture(() -> {
433 CompletableFuture
<Hbck
> future
= new CompletableFuture
<>();
434 addListener(registry
.getActiveMaster(), (sn
, error
) -> {
436 future
.completeExceptionally(error
);
438 future
.complete(getHbckInternal(sn
));
442 }, "AsyncConnection.getHbck");
446 public Hbck
getHbck(ServerName masterServer
) {
447 return TraceUtil
.trace(new Supplier
<Hbck
>() {
451 return getHbckInternal(masterServer
);
453 }, "AsyncConnection.getHbck");
456 Optional
<MetricsConnection
> getConnectionMetrics() {