HBASE-26474 Implement connection-level attributes (addendum)
[hbase.git] / hbase-client / src / main / java / org / apache / hadoop / hbase / client / AbstractRpcBasedConnectionRegistry.java
blob60137d23fff258cf6072346ae1808467ae69fd11
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static org.apache.hadoop.hbase.trace.TraceUtil.trace;
21 import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
22 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
24 import com.google.errorprone.annotations.RestrictedApi;
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.List;
29 import java.util.Set;
30 import java.util.concurrent.CompletableFuture;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.ThreadLocalRandom;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.function.Predicate;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.hbase.HConstants;
37 import org.apache.hadoop.hbase.HRegionLocation;
38 import org.apache.hadoop.hbase.RegionLocations;
39 import org.apache.hadoop.hbase.ServerName;
40 import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
41 import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
42 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
43 import org.apache.hadoop.hbase.ipc.RpcClient;
44 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
45 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
46 import org.apache.hadoop.hbase.security.User;
47 import org.apache.hadoop.hbase.util.FutureUtils;
48 import org.apache.yetus.audience.InterfaceAudience;
50 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
51 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
52 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
53 import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
55 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
56 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
57 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
58 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
59 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
60 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
61 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest;
62 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse;
64 /**
65 * Base class for rpc based connection registry implementation.
66 * <p/>
67 * The implementation needs a bootstrap node list in configuration, and then it will use the methods
68 * in {@link ClientMetaService} to refresh the connection registry end points.
69 * <p/>
70 * It also supports hedged reads, the default fan out value is 2.
71 * <p/>
72 * For the actual configuration names, see javadoc of sub classes.
74 @InterfaceAudience.Private
75 abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry {
77 /** Default value for the fan out of hedged requests. **/
78 public static final int HEDGED_REQS_FANOUT_DEFAULT = 2;
80 private final int hedgedReadFanOut;
82 // Configured list of end points to probe the meta information from.
83 private volatile ImmutableMap<ServerName, ClientMetaService.Interface> addr2Stub;
85 // RPC client used to talk to the masters.
86 private final RpcClient rpcClient;
87 private final RpcControllerFactory rpcControllerFactory;
88 private final int rpcTimeoutMs;
90 private final RegistryEndpointsRefresher registryEndpointRefresher;
92 protected AbstractRpcBasedConnectionRegistry(Configuration conf,
93 String hedgedReqsFanoutConfigName, String initialRefreshDelaySecsConfigName,
94 String refreshIntervalSecsConfigName, String minRefreshIntervalSecsConfigName)
95 throws IOException {
96 this.hedgedReadFanOut =
97 Math.max(1, conf.getInt(hedgedReqsFanoutConfigName, HEDGED_REQS_FANOUT_DEFAULT));
98 rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
99 conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
100 // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
101 // this through the master registry...
102 // This is a problem as we will use the cluster id to determine the authentication method
103 rpcClient = RpcClientFactory.createClient(conf, null);
104 rpcControllerFactory = RpcControllerFactory.instantiate(conf);
105 populateStubs(getBootstrapNodes(conf));
106 // could return null here is refresh interval is less than zero
107 registryEndpointRefresher =
108 RegistryEndpointsRefresher.create(conf, initialRefreshDelaySecsConfigName,
109 refreshIntervalSecsConfigName, minRefreshIntervalSecsConfigName, this::refreshStubs);
112 protected abstract Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException;
114 protected abstract CompletableFuture<Set<ServerName>> fetchEndpoints();
116 private void refreshStubs() throws IOException {
117 populateStubs(FutureUtils.get(fetchEndpoints()));
120 private void populateStubs(Set<ServerName> addrs) throws IOException {
121 Preconditions.checkNotNull(addrs);
122 ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
123 ImmutableMap.builderWithExpectedSize(addrs.size());
124 User user = User.getCurrent();
125 for (ServerName masterAddr : addrs) {
126 builder.put(masterAddr,
127 ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
129 addr2Stub = builder.build();
133 * For describing the actual asynchronous rpc call.
134 * <p/>
135 * Typically, you can use lambda expression to implement this interface as
137 * <pre>
138 * (c, s, d) -> s.xxx(c, your request here, d)
139 * </pre>
141 @FunctionalInterface
142 protected interface Callable<T> {
143 void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback<T> done);
146 private <T extends Message> CompletableFuture<T> call(ClientMetaService.Interface stub,
147 Callable<T> callable) {
148 HBaseRpcController controller = rpcControllerFactory.newController();
149 CompletableFuture<T> future = new CompletableFuture<>();
150 callable.call(controller, stub, resp -> {
151 if (controller.failed()) {
152 IOException failureReason = controller.getFailed();
153 future.completeExceptionally(failureReason);
154 if (ClientExceptionsUtil.isConnectionException(failureReason)) {
155 // RPC has failed, trigger a refresh of end points. We can have some spurious
156 // refreshes, but that is okay since the RPC is not expensive and not in a hot path.
157 registryEndpointRefresher.refreshNow();
159 } else {
160 future.complete(resp);
163 return future;
166 private IOException badResponse(String debug) {
167 return new IOException(String.format("Invalid result for request %s. Will be retried", debug));
171 * send requests concurrently to hedgedReadsFanout end points. If any of the request is succeeded,
172 * we will complete the future and quit. If all the requests in one round are failed, we will
173 * start another round to send requests concurrently tohedgedReadsFanout end points. If all end
174 * points have been tried and all of them are failed, we will fail the future.
176 private <T extends Message> void groupCall(CompletableFuture<T> future, Set<ServerName> servers,
177 List<ClientMetaService.Interface> stubs, int startIndexInclusive, Callable<T> callable,
178 Predicate<T> isValidResp, String debug, ConcurrentLinkedQueue<Throwable> errors) {
179 int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, stubs.size());
180 AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive);
181 for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
182 addListener(call(stubs.get(i), callable), (r, e) -> {
183 // a simple check to skip all the later operations earlier
184 if (future.isDone()) {
185 return;
187 if (e == null && !isValidResp.test(r)) {
188 e = badResponse(debug);
190 if (e != null) {
191 // make sure when remaining reaches 0 we have all exceptions in the errors queue
192 errors.add(e);
193 if (remaining.decrementAndGet() == 0) {
194 if (endIndexExclusive == stubs.size()) {
195 // we are done, complete the future with exception
196 RetriesExhaustedException ex =
197 new RetriesExhaustedException("masters", stubs.size(), new ArrayList<>(errors));
198 future.completeExceptionally(new MasterRegistryFetchException(servers, ex));
199 } else {
200 groupCall(future, servers, stubs, endIndexExclusive, callable, isValidResp, debug,
201 errors);
204 } else {
205 // do not need to decrement the counter any more as we have already finished the future.
206 future.complete(r);
212 protected final <T extends Message> CompletableFuture<T> call(Callable<T> callable,
213 Predicate<T> isValidResp, String debug) {
214 ImmutableMap<ServerName, ClientMetaService.Interface> addr2StubRef = addr2Stub;
215 Set<ServerName> servers = addr2StubRef.keySet();
216 List<ClientMetaService.Interface> stubs = new ArrayList<>(addr2StubRef.values());
217 Collections.shuffle(stubs, ThreadLocalRandom.current());
218 CompletableFuture<T> future = new CompletableFuture<>();
219 groupCall(future, servers, stubs, 0, callable, isValidResp, debug,
220 new ConcurrentLinkedQueue<>());
221 return future;
224 @RestrictedApi(explanation = "Should only be called in tests", link = "",
225 allowedOnPath = ".*/src/test/.*")
226 Set<ServerName> getParsedServers() {
227 return addr2Stub.keySet();
231 * Simple helper to transform the result of getMetaRegionLocations() rpc.
233 private static RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
234 List<HRegionLocation> regionLocations = new ArrayList<>();
235 resp.getMetaLocationsList()
236 .forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
237 return new RegionLocations(regionLocations);
240 @Override
241 public CompletableFuture<RegionLocations> getMetaRegionLocations() {
242 return tracedFuture(
243 () -> this
244 .<GetMetaRegionLocationsResponse> call(
245 (c, s, d) -> s.getMetaRegionLocations(c,
246 GetMetaRegionLocationsRequest.getDefaultInstance(), d),
247 r -> r.getMetaLocationsCount() != 0, "getMetaLocationsCount")
248 .thenApply(AbstractRpcBasedConnectionRegistry::transformMetaRegionLocations),
249 getClass().getSimpleName() + ".getMetaRegionLocations");
252 @Override
253 public CompletableFuture<String> getClusterId() {
254 return tracedFuture(
255 () -> this
256 .<GetClusterIdResponse> call(
257 (c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d),
258 GetClusterIdResponse::hasClusterId, "getClusterId()")
259 .thenApply(GetClusterIdResponse::getClusterId),
260 getClass().getSimpleName() + ".getClusterId");
263 @Override
264 public CompletableFuture<ServerName> getActiveMaster() {
265 return tracedFuture(
266 () -> this
267 .<GetActiveMasterResponse> call(
268 (c, s, d) -> s.getActiveMaster(c, GetActiveMasterRequest.getDefaultInstance(), d),
269 GetActiveMasterResponse::hasServerName, "getActiveMaster()")
270 .thenApply(resp -> ProtobufUtil.toServerName(resp.getServerName())),
271 getClass().getSimpleName() + ".getClusterId");
274 @Override
275 public void close() {
276 trace(() -> {
277 if (registryEndpointRefresher != null) {
278 registryEndpointRefresher.stop();
280 if (rpcClient != null) {
281 rpcClient.close();
283 }, getClass().getSimpleName() + ".close");