2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.junit
.Assert
.assertEquals
;
21 import static org
.junit
.Assert
.assertThrows
;
23 import java
.io
.IOException
;
24 import java
.net
.SocketAddress
;
25 import java
.util
.Collections
;
27 import java
.util
.concurrent
.CompletableFuture
;
28 import java
.util
.concurrent
.ExecutorService
;
29 import java
.util
.concurrent
.Executors
;
30 import java
.util
.concurrent
.atomic
.AtomicInteger
;
31 import java
.util
.stream
.Collectors
;
32 import java
.util
.stream
.IntStream
;
33 import org
.apache
.hadoop
.conf
.Configuration
;
34 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
35 import org
.apache
.hadoop
.hbase
.HBaseCommonTestingUtil
;
36 import org
.apache
.hadoop
.hbase
.ServerName
;
37 import org
.apache
.hadoop
.hbase
.ipc
.RpcClient
;
38 import org
.apache
.hadoop
.hbase
.ipc
.RpcClientFactory
;
39 import org
.apache
.hadoop
.hbase
.security
.User
;
40 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
41 import org
.apache
.hadoop
.hbase
.testclassification
.SmallTests
;
42 import org
.apache
.hadoop
.hbase
.util
.FutureUtils
;
43 import org
.junit
.AfterClass
;
44 import org
.junit
.Before
;
45 import org
.junit
.BeforeClass
;
46 import org
.junit
.ClassRule
;
47 import org
.junit
.Test
;
48 import org
.junit
.experimental
.categories
.Category
;
49 import org
.slf4j
.Logger
;
50 import org
.slf4j
.LoggerFactory
;
52 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.util
.concurrent
.ThreadFactoryBuilder
;
53 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.BlockingRpcChannel
;
54 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.Descriptors
.MethodDescriptor
;
55 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.Message
;
56 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.RpcCallback
;
57 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.RpcChannel
;
58 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.RpcController
;
60 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegistryProtos
.GetClusterIdResponse
;
62 @Category({ ClientTests
.class, SmallTests
.class })
63 public class TestRpcBasedRegistryHedgedReads
{
66 public static final HBaseClassTestRule CLASS_RULE
=
67 HBaseClassTestRule
.forClass(TestRpcBasedRegistryHedgedReads
.class);
69 private static final Logger LOG
= LoggerFactory
.getLogger(TestRpcBasedRegistryHedgedReads
.class);
71 private static final String HEDGED_REQS_FANOUT_CONFIG_NAME
= "hbase.test.hedged.reqs.fanout";
72 private static final String INITIAL_DELAY_SECS_CONFIG_NAME
=
73 "hbase.test.refresh.initial.delay.secs";
74 private static final String REFRESH_INTERVAL_SECS_CONFIG_NAME
=
75 "hbase.test.refresh.interval.secs";
76 private static final String MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME
=
77 "hbase.test.min.refresh.interval.secs";
79 private static final HBaseCommonTestingUtil UTIL
= new HBaseCommonTestingUtil();
81 private static final ExecutorService EXECUTOR
=
82 Executors
.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build());
84 private static Set
<ServerName
> BOOTSTRAP_NODES
;
86 private static AtomicInteger CALLED
= new AtomicInteger(0);
88 private static volatile int BAD_RESP_INDEX
;
90 private static volatile Set
<Integer
> GOOD_RESP_INDEXS
;
92 private static GetClusterIdResponse RESP
=
93 GetClusterIdResponse
.newBuilder().setClusterId("id").build();
95 public static final class RpcClientImpl
implements RpcClient
{
97 public RpcClientImpl(Configuration configuration
, String clusterId
, SocketAddress localAddress
,
98 MetricsConnection metrics
) {
102 public BlockingRpcChannel
createBlockingRpcChannel(ServerName sn
, User user
, int rpcTimeout
) {
103 throw new UnsupportedOperationException();
107 public RpcChannel
createRpcChannel(ServerName sn
, User user
, int rpcTimeout
) {
108 return new RpcChannelImpl();
112 public void cancelConnections(ServerName sn
) {
116 public void close() {
120 public boolean hasCellBlockSupport() {
126 * A dummy RpcChannel implementation that intercepts the GetClusterId() RPC calls and injects
127 * errors. All other RPCs are ignored.
129 public static final class RpcChannelImpl
implements RpcChannel
{
132 public void callMethod(MethodDescriptor method
, RpcController controller
, Message request
,
133 Message responsePrototype
, RpcCallback
<Message
> done
) {
134 if (!method
.getName().equals("GetClusterId")) {
135 // On RPC failures, MasterRegistry internally runs getMasters() RPC to keep the master list
136 // fresh. We do not want to intercept those RPCs here and double count.
139 // simulate the asynchronous behavior otherwise all logic will perform in the same thread...
140 EXECUTOR
.execute(() -> {
141 int index
= CALLED
.getAndIncrement();
142 if (index
== BAD_RESP_INDEX
) {
143 done
.run(GetClusterIdResponse
.getDefaultInstance());
144 } else if (GOOD_RESP_INDEXS
.contains(index
)) {
147 controller
.setFailed("inject error");
154 private AbstractRpcBasedConnectionRegistry
createRegistry(int hedged
) throws IOException
{
155 Configuration conf
= UTIL
.getConfiguration();
156 conf
.setInt(HEDGED_REQS_FANOUT_CONFIG_NAME
, hedged
);
157 return new AbstractRpcBasedConnectionRegistry(conf
, HEDGED_REQS_FANOUT_CONFIG_NAME
,
158 INITIAL_DELAY_SECS_CONFIG_NAME
, REFRESH_INTERVAL_SECS_CONFIG_NAME
,
159 MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME
) {
162 protected Set
<ServerName
> getBootstrapNodes(Configuration conf
) throws IOException
{
163 return BOOTSTRAP_NODES
;
167 protected CompletableFuture
<Set
<ServerName
>> fetchEndpoints() {
168 return CompletableFuture
.completedFuture(BOOTSTRAP_NODES
);
171 @Override public String
getConnectionString() {
172 return "unimplemented";
178 public static void setUpBeforeClass() {
179 Configuration conf
= UTIL
.getConfiguration();
180 conf
.setClass(RpcClientFactory
.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY
, RpcClientImpl
.class,
182 // disable refresh, we do not need to refresh in this test
183 conf
.setLong(INITIAL_DELAY_SECS_CONFIG_NAME
, Integer
.MAX_VALUE
);
184 conf
.setLong(REFRESH_INTERVAL_SECS_CONFIG_NAME
, Integer
.MAX_VALUE
);
185 conf
.setLong(MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME
, Integer
.MAX_VALUE
- 1);
186 BOOTSTRAP_NODES
= IntStream
.range(0, 10)
187 .mapToObj(i
-> ServerName
.valueOf("localhost", (10000 + 100 * i
), ServerName
.NON_STARTCODE
))
188 .collect(Collectors
.toSet());
192 public static void tearDownAfterClass() {
193 EXECUTOR
.shutdownNow();
197 public void setUp() {
200 GOOD_RESP_INDEXS
= Collections
.emptySet();
203 private <T
> T
logIfError(CompletableFuture
<T
> future
) throws IOException
{
205 return FutureUtils
.get(future
);
206 } catch (Throwable t
) {
213 public void testAllFailNoHedged() throws IOException
{
214 try (AbstractRpcBasedConnectionRegistry registry
= createRegistry(1)) {
215 assertThrows(IOException
.class, () -> logIfError(registry
.getClusterId()));
216 assertEquals(10, CALLED
.get());
221 public void testAllFailHedged3() throws IOException
{
223 try (AbstractRpcBasedConnectionRegistry registry
= createRegistry(3)) {
224 assertThrows(IOException
.class, () -> logIfError(registry
.getClusterId()));
225 assertEquals(10, CALLED
.get());
230 public void testFirstSucceededNoHedge() throws IOException
{
232 IntStream
.range(0, 10).mapToObj(Integer
::valueOf
).collect(Collectors
.toSet());
234 try (AbstractRpcBasedConnectionRegistry registry
= createRegistry(0)) {
235 String clusterId
= logIfError(registry
.getClusterId());
236 assertEquals(RESP
.getClusterId(), clusterId
);
237 assertEquals(1, CALLED
.get());
242 public void testSecondRoundSucceededHedge4() throws IOException
{
243 GOOD_RESP_INDEXS
= Collections
.singleton(6);
244 try (AbstractRpcBasedConnectionRegistry registry
= createRegistry(4)) {
245 String clusterId
= logIfError(registry
.getClusterId());
246 assertEquals(RESP
.getClusterId(), clusterId
);
247 UTIL
.waitFor(5000, () -> CALLED
.get() == 8);
252 public void testSucceededWithLargestHedged() throws IOException
, InterruptedException
{
253 GOOD_RESP_INDEXS
= Collections
.singleton(5);
254 try (AbstractRpcBasedConnectionRegistry registry
= createRegistry(Integer
.MAX_VALUE
)) {
255 String clusterId
= logIfError(registry
.getClusterId());
256 assertEquals(RESP
.getClusterId(), clusterId
);
257 UTIL
.waitFor(5000, () -> CALLED
.get() == 10);
259 // make sure we do not send more
260 assertEquals(10, CALLED
.get());