2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import java
.io
.IOException
;
21 import java
.net
.UnknownHostException
;
23 import java
.util
.concurrent
.CompletableFuture
;
24 import java
.util
.stream
.Collectors
;
25 import org
.apache
.commons
.lang3
.StringUtils
;
26 import org
.apache
.hadoop
.conf
.Configuration
;
27 import org
.apache
.hadoop
.hbase
.HBaseInterfaceAudience
;
28 import org
.apache
.hadoop
.hbase
.ServerName
;
29 import org
.apache
.yetus
.audience
.InterfaceAudience
;
31 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.base
.Splitter
;
33 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ProtobufUtil
;
34 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegistryProtos
.ClientMetaService
;
35 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegistryProtos
.GetBootstrapNodesRequest
;
36 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegistryProtos
.GetBootstrapNodesResponse
;
39 * Rpc based connection registry. It will make use of the {@link ClientMetaService} to get registry
42 * It needs bootstrap node list when start up, and then it will use {@link ClientMetaService} to
43 * refresh the bootstrap node list periodically.
45 * Usually, you could set masters as the bootstrap nodes,as they will also implement the
46 * {@link ClientMetaService}, and then, we will switch to use region servers after refreshing the
49 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience
.CONFIG
)
50 public class RpcConnectionRegistry
extends AbstractRpcBasedConnectionRegistry
{
52 /** Configuration key that controls the fan out of requests **/
53 public static final String HEDGED_REQS_FANOUT_KEY
= "hbase.client.bootstrap.hedged.fanout";
56 * As end users could configure any nodes in a cluster as the initial bootstrap nodes, it is
57 * possible that different end users will configure the same machine which makes the machine over
58 * load. So we should have a shorter delay for the initial refresh, to let users quickly switch to
59 * the bootstrap nodes we want them to connect to.
61 * The default value for initial refresh delay is 1/10 of periodic refresh interval.
63 public static final String INITIAL_REFRESH_DELAY_SECS
=
64 "hbase.client.bootstrap.initial_refresh_delay_secs";
66 public static final String PERIODIC_REFRESH_INTERVAL_SECS
=
67 "hbase.client.bootstrap.refresh_interval_secs";
69 public static final String MIN_SECS_BETWEEN_REFRESHES
=
70 "hbase.client.bootstrap.min_secs_between_refreshes";
72 public static final String BOOTSTRAP_NODES
= "hbase.client.bootstrap.servers";
74 private static final char ADDRS_CONF_SEPARATOR
= ',';
76 private final String connectionString
;
78 RpcConnectionRegistry(Configuration conf
) throws IOException
{
79 super(conf
, HEDGED_REQS_FANOUT_KEY
, INITIAL_REFRESH_DELAY_SECS
, PERIODIC_REFRESH_INTERVAL_SECS
,
80 MIN_SECS_BETWEEN_REFRESHES
);
81 connectionString
= buildConnectionString(conf
);
84 private String
buildConnectionString(Configuration conf
) throws UnknownHostException
{
85 final String configuredBootstrapNodes
= conf
.get(BOOTSTRAP_NODES
);
86 if (StringUtils
.isBlank(configuredBootstrapNodes
)) {
87 return MasterRegistry
.getConnectionString(conf
);
89 return Splitter
.on(ADDRS_CONF_SEPARATOR
)
91 .splitToStream(configuredBootstrapNodes
)
92 .collect(Collectors
.joining(String
.valueOf(ADDRS_CONF_SEPARATOR
)));
96 protected Set
<ServerName
> getBootstrapNodes(Configuration conf
) throws IOException
{
97 // try get bootstrap nodes config first
98 String configuredBootstrapNodes
= conf
.get(BOOTSTRAP_NODES
);
99 if (!StringUtils
.isBlank(configuredBootstrapNodes
)) {
100 return Splitter
.on(ADDRS_CONF_SEPARATOR
).trimResults().splitToStream(configuredBootstrapNodes
)
101 .map(addr
-> ServerName
.valueOf(addr
, ServerName
.NON_STARTCODE
))
102 .collect(Collectors
.toSet());
104 // otherwise, just use master addresses
105 return MasterRegistry
.parseMasterAddrs(conf
);
110 public String
getConnectionString() {
111 return connectionString
;
114 private static Set
<ServerName
> transformServerNames(GetBootstrapNodesResponse resp
) {
115 return resp
.getServerNameList().stream().map(ProtobufUtil
::toServerName
)
116 .collect(Collectors
.toSet());
119 private CompletableFuture
<Set
<ServerName
>> getBootstrapNodes() {
121 .<GetBootstrapNodesResponse
> call(
122 (c
, s
, d
) -> s
.getBootstrapNodes(c
, GetBootstrapNodesRequest
.getDefaultInstance(), d
),
123 r
-> r
.getServerNameCount() != 0, "getBootstrapNodes()")
124 .thenApply(RpcConnectionRegistry
::transformServerNames
);
128 protected CompletableFuture
<Set
<ServerName
>> fetchEndpoints() {
129 return getBootstrapNodes();