 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.trace.TraceUtil.trace;
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse;

/**
 * Base class for rpc based connection registry implementations.
 * <p/>
 * The implementation needs a bootstrap node list in configuration, and then it will use the
 * methods in {@link ClientMetaService} to refresh the connection registry end points.
 * <p/>
 * It also supports hedged reads; the default fan out value is 2.
 * <p/>
 * For the actual configuration names, see the javadoc of the sub classes.
 */
@InterfaceAudience.Private
abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry {

  /** Default value for the fan out of hedged requests. **/
  public static final int HEDGED_REQS_FANOUT_DEFAULT = 2;

  private final int hedgedReadFanOut;

  // Configured list of end points to probe the meta information from.
  private volatile ImmutableMap<ServerName, ClientMetaService.Interface> addr2Stub;

  // RPC client used to talk to the masters.
  private final RpcClient rpcClient;
  private final RpcControllerFactory rpcControllerFactory;
  private final int rpcTimeoutMs;

  private final RegistryEndpointsRefresher registryEndpointRefresher;

  protected AbstractRpcBasedConnectionRegistry(Configuration conf,
    String hedgedReqsFanoutConfigName, String initialRefreshDelaySecsConfigName,
    String refreshIntervalSecsConfigName, String minRefreshIntervalSecsConfigName)
    throws IOException {
    this.hedgedReadFanOut =
      Math.max(1, conf.getInt(hedgedReqsFanoutConfigName, HEDGED_REQS_FANOUT_DEFAULT));
    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
      conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
    // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
    // this through the master registry...
    // This is a problem as we will use the cluster id to determine the authentication method
    rpcClient = RpcClientFactory.createClient(conf, null);
    rpcControllerFactory = RpcControllerFactory.instantiate(conf);
    populateStubs(getBootstrapNodes(conf));
    // could return null here if the refresh interval is less than zero
    registryEndpointRefresher =
      RegistryEndpointsRefresher.create(conf, initialRefreshDelaySecsConfigName,
        refreshIntervalSecsConfigName, minRefreshIntervalSecsConfigName, this::refreshStubs);
  }

  protected abstract Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException;

  protected abstract CompletableFuture<Set<ServerName>> fetchEndpoints();
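
  // A minimal subclass sketch, for illustration only: the class name, config keys and endpoint
  // values below are hypothetical; the real registries that extend this class define their own
  // configuration names (see the class javadoc).
  //
  //   class SimpleRpcRegistry extends AbstractRpcBasedConnectionRegistry {
  //     SimpleRpcRegistry(Configuration conf) throws IOException {
  //       super(conf, "simple.registry.hedged.fanout", "simple.registry.initial.refresh.delay.secs",
  //         "simple.registry.refresh.interval.secs", "simple.registry.min.refresh.interval.secs");
  //     }
  //
  //     @Override
  //     protected Set<ServerName> getBootstrapNodes(Configuration conf) {
  //       // typically parsed from a comma separated "host:port" list in the configuration
  //       return Collections.singleton(ServerName.valueOf("master1.example.com", 16000, -1));
  //     }
  //
  //     @Override
  //     protected CompletableFuture<Set<ServerName>> fetchEndpoints() {
  //       // a real implementation would refresh the endpoint list over RPC; this sketch just
  //       // returns the bootstrap node again
  //       return CompletableFuture.completedFuture(
  //         Collections.singleton(ServerName.valueOf("master1.example.com", 16000, -1)));
  //     }
  //   }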

  private void refreshStubs() throws IOException {
    populateStubs(FutureUtils.get(fetchEndpoints()));
  }

  private void populateStubs(Set<ServerName> addrs) throws IOException {
    Preconditions.checkNotNull(addrs);
    ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
      ImmutableMap.builderWithExpectedSize(addrs.size());
    User user = User.getCurrent();
    for (ServerName masterAddr : addrs) {
      builder.put(masterAddr,
        ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
    }
    addr2Stub = builder.build();
  }

  /**
   * For describing the actual asynchronous rpc call.
   * <p/>
   * Typically, you can use a lambda expression to implement this interface as
   *
   * <pre>
   * (c, s, d) -> s.xxx(c, your request here, d)
   * </pre>
   */
  protected interface Callable<T> {
    void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback<T> done);
  }
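
  // Concrete example of such a lambda, taken from the getClusterId() implementation below:
  //   (c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d)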

  private <T extends Message> CompletableFuture<T> call(ClientMetaService.Interface stub,
    Callable<T> callable) {
    HBaseRpcController controller = rpcControllerFactory.newController();
    CompletableFuture<T> future = new CompletableFuture<>();
    callable.call(controller, stub, resp -> {
      if (controller.failed()) {
        IOException failureReason = controller.getFailed();
        future.completeExceptionally(failureReason);
        if (ClientExceptionsUtil.isConnectionException(failureReason)) {
          // RPC has failed, trigger a refresh of end points. We can have some spurious
          // refreshes, but that is okay since the RPC is not expensive and not in a hot path.
          registryEndpointRefresher.refreshNow();
        }
      } else {
        future.complete(resp);
      }
    });
    return future;
  }

  private IOException badResponse(String debug) {
    return new IOException(String.format("Invalid result for request %s. Will be retried", debug));
  }

  /**
   * Send requests concurrently to hedgedReadFanOut end points. If any of the requests succeeds,
   * we will complete the future and quit. If all the requests in one round fail, we will start
   * another round to send requests concurrently to hedgedReadFanOut end points. If all end points
   * have been tried and all of them have failed, we will fail the future.
   */
  private <T extends Message> void groupCall(CompletableFuture<T> future, Set<ServerName> servers,
    List<ClientMetaService.Interface> stubs, int startIndexInclusive, Callable<T> callable,
    Predicate<T> isValidResp, String debug, ConcurrentLinkedQueue<Throwable> errors) {
    int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, stubs.size());
    AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive);
    for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
      addListener(call(stubs.get(i), callable), (r, e) -> {
        // a simple check to skip all the later operations earlier
        if (future.isDone()) {
          return;
        }
        if (e == null && !isValidResp.test(r)) {
          e = badResponse(debug);
        }
        if (e != null) {
          // make sure when remaining reaches 0 we have all exceptions in the errors queue
          errors.add(e);
          if (remaining.decrementAndGet() == 0) {
            if (endIndexExclusive == stubs.size()) {
              // we are done, complete the future with exception
              RetriesExhaustedException ex =
                new RetriesExhaustedException("masters", stubs.size(), new ArrayList<>(errors));
              future.completeExceptionally(new MasterRegistryFetchException(servers, ex));
            } else {
              groupCall(future, servers, stubs, endIndexExclusive, callable, isValidResp, debug,
                errors);
            }
          }
        } else {
          // do not need to decrement the counter any more as we have already finished the future.
          future.complete(r);
        }
      });
    }
  }
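
  // For example, with five stubs and hedgedReadFanOut = 2, the first round hedges to stubs 0 and
  // 1; if both fail (or return invalid responses), the next round goes to stubs 2 and 3, then to
  // stub 4, and only once every stub has failed is the future completed exceptionally.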

  protected final <T extends Message> CompletableFuture<T> call(Callable<T> callable,
    Predicate<T> isValidResp, String debug) {
    ImmutableMap<ServerName, ClientMetaService.Interface> addr2StubRef = addr2Stub;
    Set<ServerName> servers = addr2StubRef.keySet();
    List<ClientMetaService.Interface> stubs = new ArrayList<>(addr2StubRef.values());
    Collections.shuffle(stubs, ThreadLocalRandom.current());
    CompletableFuture<T> future = new CompletableFuture<>();
    groupCall(future, servers, stubs, 0, callable, isValidResp, debug,
      new ConcurrentLinkedQueue<>());
    return future;
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  Set<ServerName> getParsedServers() {
    return addr2Stub.keySet();
  }

  /**
   * Simple helper to transform the result of getMetaRegionLocations() rpc.
   */
  private static RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
    List<HRegionLocation> regionLocations = new ArrayList<>();
    resp.getMetaLocationsList()
      .forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
    return new RegionLocations(regionLocations);
  }

  @Override
  public CompletableFuture<RegionLocations> getMetaRegionLocations() {
    return tracedFuture(
      () -> this
        .<GetMetaRegionLocationsResponse> call(
          (c, s, d) -> s.getMetaRegionLocations(c,
            GetMetaRegionLocationsRequest.getDefaultInstance(), d),
          r -> r.getMetaLocationsCount() != 0, "getMetaLocationsCount")
        .thenApply(AbstractRpcBasedConnectionRegistry::transformMetaRegionLocations),
      getClass().getSimpleName() + ".getMetaRegionLocations");
  }

  @Override
  public CompletableFuture<String> getClusterId() {
    return tracedFuture(
      () -> this
        .<GetClusterIdResponse> call(
          (c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d),
          GetClusterIdResponse::hasClusterId, "getClusterId()")
        .thenApply(GetClusterIdResponse::getClusterId),
      getClass().getSimpleName() + ".getClusterId");
  }

  @Override
  public CompletableFuture<ServerName> getActiveMaster() {
    return tracedFuture(
      () -> this
        .<GetActiveMasterResponse> call(
          (c, s, d) -> s.getActiveMaster(c, GetActiveMasterRequest.getDefaultInstance(), d),
          GetActiveMasterResponse::hasServerName, "getActiveMaster()")
        .thenApply(resp -> ProtobufUtil.toServerName(resp.getServerName())),
      getClass().getSimpleName() + ".getActiveMaster");
  }

  @Override
  public void close() {
    trace(() -> {
      if (registryEndpointRefresher != null) {
        registryEndpointRefresher.stop();
      }
      if (rpcClient != null) {
        rpcClient.close();
      }
    }, getClass().getSimpleName() + ".close");
  }
}
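
// Usage sketch (illustrative, not part of this class): application code never constructs this
// registry directly. The HBase async connection obtains a ConnectionRegistry implementation from
// configuration and then drives it through getClusterId(), getActiveMaster() and
// getMetaRegionLocations() while setting up the connection.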