HBASE-26474 Implement connection-level attributes (addendum)
[hbase.git] / hbase-client / src / main / java / org / apache / hadoop / hbase / client / RpcConnectionRegistry.java
blob660d74e74c2895f2f857a01ef5e43f7c2f676fd1
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import java.io.IOException;
21 import java.net.UnknownHostException;
22 import java.util.Set;
23 import java.util.concurrent.CompletableFuture;
24 import java.util.stream.Collectors;
25 import org.apache.commons.lang3.StringUtils;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
28 import org.apache.hadoop.hbase.ServerName;
29 import org.apache.yetus.audience.InterfaceAudience;
31 import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
33 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
34 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
35 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest;
36 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse;
38 /**
39 * Rpc based connection registry. It will make use of the {@link ClientMetaService} to get registry
40 * information.
41 * <p/>
42 * It needs bootstrap node list when start up, and then it will use {@link ClientMetaService} to
43 * refresh the bootstrap node list periodically.
44 * <p/>
45 * Usually, you could set masters as the bootstrap nodes,as they will also implement the
46 * {@link ClientMetaService}, and then, we will switch to use region servers after refreshing the
47 * bootstrap nodes.
49 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
50 public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {
52 /** Configuration key that controls the fan out of requests **/
53 public static final String HEDGED_REQS_FANOUT_KEY = "hbase.client.bootstrap.hedged.fanout";
55 /**
56 * As end users could configure any nodes in a cluster as the initial bootstrap nodes, it is
57 * possible that different end users will configure the same machine which makes the machine over
58 * load. So we should have a shorter delay for the initial refresh, to let users quickly switch to
59 * the bootstrap nodes we want them to connect to.
60 * <p/>
61 * The default value for initial refresh delay is 1/10 of periodic refresh interval.
63 public static final String INITIAL_REFRESH_DELAY_SECS =
64 "hbase.client.bootstrap.initial_refresh_delay_secs";
66 public static final String PERIODIC_REFRESH_INTERVAL_SECS =
67 "hbase.client.bootstrap.refresh_interval_secs";
69 public static final String MIN_SECS_BETWEEN_REFRESHES =
70 "hbase.client.bootstrap.min_secs_between_refreshes";
72 public static final String BOOTSTRAP_NODES = "hbase.client.bootstrap.servers";
74 private static final char ADDRS_CONF_SEPARATOR = ',';
76 private final String connectionString;
78 RpcConnectionRegistry(Configuration conf) throws IOException {
79 super(conf, HEDGED_REQS_FANOUT_KEY, INITIAL_REFRESH_DELAY_SECS, PERIODIC_REFRESH_INTERVAL_SECS,
80 MIN_SECS_BETWEEN_REFRESHES);
81 connectionString = buildConnectionString(conf);
84 private String buildConnectionString(Configuration conf) throws UnknownHostException {
85 final String configuredBootstrapNodes = conf.get(BOOTSTRAP_NODES);
86 if (StringUtils.isBlank(configuredBootstrapNodes)) {
87 return MasterRegistry.getConnectionString(conf);
89 return Splitter.on(ADDRS_CONF_SEPARATOR)
90 .trimResults()
91 .splitToStream(configuredBootstrapNodes)
92 .collect(Collectors.joining(String.valueOf(ADDRS_CONF_SEPARATOR)));
95 @Override
96 protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException {
97 // try get bootstrap nodes config first
98 String configuredBootstrapNodes = conf.get(BOOTSTRAP_NODES);
99 if (!StringUtils.isBlank(configuredBootstrapNodes)) {
100 return Splitter.on(ADDRS_CONF_SEPARATOR).trimResults().splitToStream(configuredBootstrapNodes)
101 .map(addr -> ServerName.valueOf(addr, ServerName.NON_STARTCODE))
102 .collect(Collectors.toSet());
103 } else {
104 // otherwise, just use master addresses
105 return MasterRegistry.parseMasterAddrs(conf);
109 @Override
110 public String getConnectionString() {
111 return connectionString;
114 private static Set<ServerName> transformServerNames(GetBootstrapNodesResponse resp) {
115 return resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
116 .collect(Collectors.toSet());
119 private CompletableFuture<Set<ServerName>> getBootstrapNodes() {
120 return this
121 .<GetBootstrapNodesResponse> call(
122 (c, s, d) -> s.getBootstrapNodes(c, GetBootstrapNodesRequest.getDefaultInstance(), d),
123 r -> r.getServerNameCount() != 0, "getBootstrapNodes()")
124 .thenApply(RpcConnectionRegistry::transformServerNames);
127 @Override
128 protected CompletableFuture<Set<ServerName>> fetchEndpoints() {
129 return getBootstrapNodes();