2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.apache
.hadoop
.hbase
.HConstants
.STATUS_PUBLISHED
;
21 import static org
.apache
.hadoop
.hbase
.HConstants
.STATUS_PUBLISHED_DEFAULT
;
22 import static org
.apache
.hadoop
.hbase
.client
.ClusterStatusListener
.DEFAULT_STATUS_LISTENER_CLASS
;
23 import static org
.apache
.hadoop
.hbase
.client
.ClusterStatusListener
.STATUS_LISTENER_CLASS
;
24 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.NO_NONCE_GENERATOR
;
25 import static org
.apache
.hadoop
.hbase
.client
.ConnectionUtils
.getStubKey
;
26 import static org
.apache
.hadoop
.hbase
.client
.MetricsConnection
.CLIENT_SIDE_METRICS_ENABLED_KEY
;
27 import static org
.apache
.hadoop
.hbase
.client
.NonceGenerator
.CLIENT_NONCES_ENABLED_KEY
;
28 import static org
.apache
.hadoop
.hbase
.util
.FutureUtils
.addListener
;
30 import io
.opentelemetry
.api
.trace
.Span
;
31 import io
.opentelemetry
.context
.Scope
;
32 import java
.io
.IOException
;
33 import java
.net
.SocketAddress
;
34 import java
.util
.Optional
;
35 import java
.util
.concurrent
.CompletableFuture
;
36 import java
.util
.concurrent
.ConcurrentHashMap
;
37 import java
.util
.concurrent
.ConcurrentMap
;
38 import java
.util
.concurrent
.ExecutionException
;
39 import java
.util
.concurrent
.ExecutorService
;
40 import java
.util
.concurrent
.TimeUnit
;
41 import java
.util
.concurrent
.atomic
.AtomicBoolean
;
42 import java
.util
.concurrent
.atomic
.AtomicReference
;
43 import org
.apache
.commons
.io
.IOUtils
;
44 import org
.apache
.hadoop
.conf
.Configuration
;
45 import org
.apache
.hadoop
.hbase
.AuthUtil
;
46 import org
.apache
.hadoop
.hbase
.ChoreService
;
47 import org
.apache
.hadoop
.hbase
.MasterNotRunningException
;
48 import org
.apache
.hadoop
.hbase
.ServerName
;
49 import org
.apache
.hadoop
.hbase
.TableName
;
50 import org
.apache
.hadoop
.hbase
.client
.backoff
.ClientBackoffPolicy
;
51 import org
.apache
.hadoop
.hbase
.client
.backoff
.ClientBackoffPolicyFactory
;
52 import org
.apache
.hadoop
.hbase
.ipc
.RpcClient
;
53 import org
.apache
.hadoop
.hbase
.ipc
.RpcClientFactory
;
54 import org
.apache
.hadoop
.hbase
.ipc
.RpcControllerFactory
;
55 import org
.apache
.hadoop
.hbase
.security
.User
;
56 import org
.apache
.hadoop
.hbase
.trace
.TraceUtil
;
57 import org
.apache
.hadoop
.hbase
.util
.ConcurrentMapUtils
;
58 import org
.apache
.hadoop
.hbase
.util
.Threads
;
59 import org
.apache
.hadoop
.security
.UserGroupInformation
;
60 import org
.apache
.yetus
.audience
.InterfaceAudience
;
61 import org
.slf4j
.Logger
;
62 import org
.slf4j
.LoggerFactory
;
64 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.util
.concurrent
.ThreadFactoryBuilder
;
65 import org
.apache
.hbase
.thirdparty
.io
.netty
.util
.HashedWheelTimer
;
67 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.AdminProtos
.AdminService
;
68 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.ClientService
;
69 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.MasterProtos
;
70 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.MasterProtos
.MasterService
;
73 * The implementation of AsyncConnection.
75 @InterfaceAudience.Private
76 class AsyncConnectionImpl
implements AsyncConnection
{
// Logger shared by all instances of this class.
private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);

// JVM-wide timer (static, so shared by every connection) handed to the locator, the retrying
// caller factory and the table/admin/mutator builders below. It ticks every 10 ms on a daemon
// thread named "Async-Client-Retry-Timer-pool-%d"; uncaught exceptions on that thread go to
// Threads.LOGGING_EXCEPTION_HANDLER.
static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
  new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true)
    .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
  10, TimeUnit.MILLISECONDS);

// Configuration object passed to the constructor.
private final Configuration conf;

// Async-connection settings derived from conf in the constructor.
final AsyncConnectionConfiguration connConf;

// User passed to the constructor; supplied to every RPC channel created by this connection.
private final User user;

// Registry queried for the active master and the cluster id; closed in close().
final ConnectionRegistry registry;

// RPC timeout in milliseconds, converted from connConf.getRpcTimeoutNs() and capped at
// Integer.MAX_VALUE.
private final int rpcTimeout;

// RPC client that creates the channels behind every stub; closed in close().
protected final RpcClient rpcClient;

// Controller factory instantiated from conf; passed to HBaseHbck in getHbck(ServerName).
final RpcControllerFactory rpcControllerFactory;

// Region locator; its cache is cleared by clearRegionLocationCache() and by the dead-server
// handler registered in the constructor.
private final AsyncRegionLocator locator;

// Factory for retrying callers, built on RETRY_TIMER.
final AsyncRpcRetryingCallerFactory callerFactory;

// Either the per-client random generator or NO_NONCE_GENERATOR, depending on
// CLIENT_NONCES_ENABLED_KEY.
private final NonceGenerator nonceGenerator;

// Stub caches keyed by getStubKey(service name, server name).
private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
private final ConcurrentMap<String, AdminService.Interface> adminSubs = new ConcurrentHashMap<>();

// Cached master stub; reset by clearMasterStubCache(...).
private final AtomicReference<MasterService.Interface> masterStub = new AtomicReference<>();

// Handed to ConnectionUtils.getOrFetch together with masterStub in getMasterStub().
private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture =
  new AtomicReference<>();

// Empty when ServerStatisticTracker.create(conf) returns null.
private final Optional<ServerStatisticTracker> stats;
private final ClientBackoffPolicy backoffPolicy;

// Created on first use in getChoreService(); read and written only under synchronization on this.
private ChoreService choreService;

// Flipped to true exactly once by close().
private final AtomicBoolean closed = new AtomicBoolean(false);

// Present only when CLIENT_SIDE_METRICS_ENABLED_KEY is true in conf.
private final Optional<MetricsConnection> metrics;

// Null unless STATUS_PUBLISHED is enabled and the listener could be created.
private final ClusterStatusListener clusterStatusListener;

// Cached ConnectionOverAsyncConnection wrapper, read by toConnection() and close().
private volatile ConnectionOverAsyncConnection conn;
/**
 * Wires up the connection: optional auth-renewal chore, metrics, RPC client, region locator,
 * retrying caller factory, nonce generator, statistics/backoff and, when status publishing is
 * enabled, a cluster status listener.
 * @param conf         configuration every component is created from
 * @param registry     registry used to look up the active master and the cluster id
 * @param clusterId    cluster id handed to the RPC client factory
 * @param localAddress local address handed to the RPC client factory
 * @param user         user every RPC channel is created for
 */
public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
  SocketAddress localAddress, User user) {
  this.conf = conf;
  this.user = user;

  if (user.isLoginFromKeytab()) {
    spawnRenewalChore(user.getUGI());
  }
  this.connConf = new AsyncConnectionConfiguration(conf);
  this.registry = registry;
  this.metrics = conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)
    ? Optional.of(new MetricsConnection(this.toString(), () -> null, () -> null))
    : Optional.empty();
  this.rpcClient =
    RpcClientFactory.createClient(conf, clusterId, localAddress, metrics.orElse(null));
  this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
  // The configured timeout is in nanoseconds; clamp the millisecond value so it fits an int.
  long rpcTimeoutMs = TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs());
  this.rpcTimeout = (int) Math.min(Integer.MAX_VALUE, rpcTimeoutMs);
  this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
  this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
  this.nonceGenerator = conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)
    ? PerClientRandomNonceGenerator.get()
    : NO_NONCE_GENERATOR;
  this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf));
  this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
  // Must run after locator and rpcClient are assigned: the dead-server handler uses both.
  this.clusterStatusListener = createClusterStatusListener(conf);
}

/**
 * Creates the cluster status listener when status publishing is enabled.
 * @return the listener, or {@code null} when publishing is disabled, no listener class is
 *         configured, or creating the listener fails with an IOException
 */
private ClusterStatusListener createClusterStatusListener(Configuration conf) {
  if (!conf.getBoolean(STATUS_PUBLISHED, STATUS_PUBLISHED_DEFAULT)) {
    return null;
  }
  // TODO: this may be a blocking operation, better to create it outside the constructor and pass
  // it in, just like clusterId. Not a big problem for now as the default value is false.
  Class<? extends ClusterStatusListener.Listener> listenerClass = conf.getClass(
    STATUS_LISTENER_CLASS, DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class);
  if (listenerClass == null) {
    LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS);
    return null;
  }
  try {
    return new ClusterStatusListener(new ClusterStatusListener.DeadServerHandler() {
      @Override
      public void newDead(ServerName sn) {
        // A dead server invalidates both its cached locations and its open connections.
        locator.clearCache(sn);
        rpcClient.cancelConnections(sn);
      }
    }, conf, listenerClass);
  } catch (IOException e) {
    LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", e);
    return null;
  }
}
180 private void spawnRenewalChore(final UserGroupInformation user
) {
181 ChoreService service
= getChoreService();
182 service
.scheduleChore(AuthUtil
.getAuthRenewalChore(user
));
186 * If choreService has not been created yet, create the ChoreService.
187 * @return ChoreService
189 synchronized ChoreService
getChoreService() {
191 throw new IllegalStateException("connection is already closed");
193 if (choreService
== null) {
194 choreService
= new ChoreService("AsyncConn Chore Service");
200 public Configuration
getConfiguration() {
205 public boolean isClosed() {
210 public void close() {
211 TraceUtil
.trace(() -> {
212 if (!closed
.compareAndSet(false, true)) {
215 LOG
.info("Connection has been closed by {}.", Thread
.currentThread().getName());
216 if (LOG
.isDebugEnabled()) {
217 logCallStack(Thread
.currentThread().getStackTrace());
219 IOUtils
.closeQuietly(clusterStatusListener
,
220 e
-> LOG
.warn("failed to close clusterStatusListener", e
));
221 IOUtils
.closeQuietly(rpcClient
, e
-> LOG
.warn("failed to close rpcClient", e
));
222 IOUtils
.closeQuietly(registry
, e
-> LOG
.warn("failed to close registry", e
));
223 synchronized (this) {
224 if (choreService
!= null) {
225 choreService
.shutdown();
229 metrics
.ifPresent(MetricsConnection
::shutdown
);
230 ConnectionOverAsyncConnection c
= this.conn
;
234 }, "AsyncConnection.close");
237 private void logCallStack(StackTraceElement
[] stackTraceElements
) {
238 StringBuilder stackBuilder
= new StringBuilder("Call stack:");
239 for (StackTraceElement element
: stackTraceElements
) {
240 stackBuilder
.append("\n at ");
241 stackBuilder
.append(element
);
243 stackBuilder
.append("\n");
244 LOG
.debug(stackBuilder
.toString());
248 public AsyncTableRegionLocator
getRegionLocator(TableName tableName
) {
249 return new AsyncTableRegionLocatorImpl(tableName
, this);
253 public void clearRegionLocationCache() {
254 locator
.clearCache();
257 // we will override this method for testing retry caller, so do not remove this method.
258 AsyncRegionLocator
getLocator() {
263 NonceGenerator
getNonceGenerator() {
264 return nonceGenerator
;
267 private ClientService
.Interface
createRegionServerStub(ServerName serverName
) throws IOException
{
268 return ClientService
.newStub(rpcClient
.createRpcChannel(serverName
, user
, rpcTimeout
));
271 ClientService
.Interface
getRegionServerStub(ServerName serverName
) throws IOException
{
272 return ConcurrentMapUtils
.computeIfAbsentEx(rsStubs
,
273 getStubKey(ClientService
.getDescriptor().getName(), serverName
),
274 () -> createRegionServerStub(serverName
));
277 private MasterService
.Interface
createMasterStub(ServerName serverName
) throws IOException
{
278 return MasterService
.newStub(rpcClient
.createRpcChannel(serverName
, user
, rpcTimeout
));
281 private AdminService
.Interface
createAdminServerStub(ServerName serverName
) throws IOException
{
282 return AdminService
.newStub(rpcClient
.createRpcChannel(serverName
, user
, rpcTimeout
));
285 AdminService
.Interface
getAdminStub(ServerName serverName
) throws IOException
{
286 return ConcurrentMapUtils
.computeIfAbsentEx(adminSubs
,
287 getStubKey(AdminService
.getDescriptor().getName(), serverName
),
288 () -> createAdminServerStub(serverName
));
291 CompletableFuture
<MasterService
.Interface
> getMasterStub() {
292 return ConnectionUtils
.getOrFetch(masterStub
, masterStubMakeFuture
, false, () -> {
293 CompletableFuture
<MasterService
.Interface
> future
= new CompletableFuture
<>();
294 addListener(registry
.getActiveMaster(), (addr
, error
) -> {
296 future
.completeExceptionally(error
);
297 } else if (addr
== null) {
298 future
.completeExceptionally(new MasterNotRunningException(
299 "ZooKeeper available but no active master location found"));
301 LOG
.debug("The fetched master address is {}", addr
);
303 future
.complete(createMasterStub(addr
));
304 } catch (IOException e
) {
305 future
.completeExceptionally(e
);
311 }, stub
-> true, "master stub");
314 String
getClusterId() {
316 return registry
.getClusterId().get();
317 } catch (InterruptedException
| ExecutionException e
) {
318 LOG
.error("Error fetching cluster ID: ", e
);
323 void clearMasterStubCache(MasterService
.Interface stub
) {
324 masterStub
.compareAndSet(stub
, null);
327 Optional
<ServerStatisticTracker
> getStatisticsTracker() {
331 ClientBackoffPolicy
getBackoffPolicy() {
332 return backoffPolicy
;
336 public AsyncTableBuilder
<AdvancedScanResultConsumer
> getTableBuilder(TableName tableName
) {
337 return new AsyncTableBuilderBase
<AdvancedScanResultConsumer
>(tableName
, connConf
) {
340 public AsyncTable
<AdvancedScanResultConsumer
> build() {
341 return new RawAsyncTableImpl(AsyncConnectionImpl
.this, RETRY_TIMER
, this);
347 public AsyncTableBuilder
<ScanResultConsumer
> getTableBuilder(TableName tableName
,
348 ExecutorService pool
) {
349 return new AsyncTableBuilderBase
<ScanResultConsumer
>(tableName
, connConf
) {
352 public AsyncTable
<ScanResultConsumer
> build() {
353 RawAsyncTableImpl rawTable
=
354 new RawAsyncTableImpl(AsyncConnectionImpl
.this, RETRY_TIMER
, this);
355 return new AsyncTableImpl(AsyncConnectionImpl
.this, rawTable
, pool
);
361 public AsyncAdminBuilder
getAdminBuilder() {
362 return new AsyncAdminBuilderBase(connConf
) {
364 public AsyncAdmin
build() {
365 return new RawAsyncHBaseAdmin(AsyncConnectionImpl
.this, RETRY_TIMER
, this);
371 public AsyncAdminBuilder
getAdminBuilder(ExecutorService pool
) {
372 return new AsyncAdminBuilderBase(connConf
) {
374 public AsyncAdmin
build() {
375 RawAsyncHBaseAdmin rawAdmin
=
376 new RawAsyncHBaseAdmin(AsyncConnectionImpl
.this, RETRY_TIMER
, this);
377 return new AsyncHBaseAdmin(rawAdmin
, pool
);
383 public AsyncBufferedMutatorBuilder
getBufferedMutatorBuilder(TableName tableName
) {
384 return new AsyncBufferedMutatorBuilderImpl(connConf
, getTableBuilder(tableName
), RETRY_TIMER
);
388 public AsyncBufferedMutatorBuilder
getBufferedMutatorBuilder(TableName tableName
,
389 ExecutorService pool
) {
390 return new AsyncBufferedMutatorBuilderImpl(connConf
, getTableBuilder(tableName
, pool
),
395 public Connection
toConnection() {
396 ConnectionOverAsyncConnection c
= this.conn
;
400 synchronized (this) {
405 c
= new ConnectionOverAsyncConnection(this);
412 public CompletableFuture
<Hbck
> getHbck() {
413 return TraceUtil
.tracedFuture(() -> {
414 CompletableFuture
<Hbck
> future
= new CompletableFuture
<>();
415 addListener(registry
.getActiveMaster(), (sn
, error
) -> {
417 future
.completeExceptionally(error
);
420 future
.complete(getHbck(sn
));
421 } catch (IOException e
) {
422 future
.completeExceptionally(e
);
427 }, "AsyncConnection.getHbck");
431 public Hbck
getHbck(ServerName masterServer
) throws IOException
{
432 Span span
= TraceUtil
.createSpan("AsyncConnection.getHbck")
433 .setAttribute(TraceUtil
.SERVER_NAME_KEY
, masterServer
.getServerName());
434 try (Scope scope
= span
.makeCurrent()) {
435 // we will not create a new connection when creating a new protobuf stub, and for hbck there
436 // will be no performance consideration, so for simplification we will create a new stub every
437 // time instead of caching the stub here.
438 return new HBaseHbck(
439 MasterProtos
.HbckService
440 .newBlockingStub(rpcClient
.createBlockingRpcChannel(masterServer
, user
, rpcTimeout
)),
441 rpcControllerFactory
);
445 Optional
<MetricsConnection
> getConnectionMetrics() {