HBASE-26474 Implement connection-level attributes (addendum)
[hbase.git] / hbase-client / src / test / java / org / apache / hadoop / hbase / client / TestRpcBasedRegistryHedgedReads.java
blob146895aca166730d20853fda63db68f6fcf503c1
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertThrows;
23 import java.io.IOException;
24 import java.net.SocketAddress;
25 import java.util.Collections;
26 import java.util.Set;
27 import java.util.concurrent.CompletableFuture;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import java.util.stream.Collectors;
32 import java.util.stream.IntStream;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.hbase.HBaseClassTestRule;
35 import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
36 import org.apache.hadoop.hbase.ServerName;
37 import org.apache.hadoop.hbase.ipc.RpcClient;
38 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
39 import org.apache.hadoop.hbase.security.User;
40 import org.apache.hadoop.hbase.testclassification.ClientTests;
41 import org.apache.hadoop.hbase.testclassification.SmallTests;
42 import org.apache.hadoop.hbase.util.FutureUtils;
43 import org.junit.AfterClass;
44 import org.junit.Before;
45 import org.junit.BeforeClass;
46 import org.junit.ClassRule;
47 import org.junit.Test;
48 import org.junit.experimental.categories.Category;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
52 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
53 import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
54 import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
55 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
56 import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
57 import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
58 import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
60 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
62 @Category({ ClientTests.class, SmallTests.class })
63 public class TestRpcBasedRegistryHedgedReads {
65 @ClassRule
66 public static final HBaseClassTestRule CLASS_RULE =
67 HBaseClassTestRule.forClass(TestRpcBasedRegistryHedgedReads.class);
69 private static final Logger LOG = LoggerFactory.getLogger(TestRpcBasedRegistryHedgedReads.class);
71 private static final String HEDGED_REQS_FANOUT_CONFIG_NAME = "hbase.test.hedged.reqs.fanout";
72 private static final String INITIAL_DELAY_SECS_CONFIG_NAME =
73 "hbase.test.refresh.initial.delay.secs";
74 private static final String REFRESH_INTERVAL_SECS_CONFIG_NAME =
75 "hbase.test.refresh.interval.secs";
76 private static final String MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME =
77 "hbase.test.min.refresh.interval.secs";
79 private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil();
81 private static final ExecutorService EXECUTOR =
82 Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build());
84 private static Set<ServerName> BOOTSTRAP_NODES;
86 private static AtomicInteger CALLED = new AtomicInteger(0);
88 private static volatile int BAD_RESP_INDEX;
90 private static volatile Set<Integer> GOOD_RESP_INDEXS;
92 private static GetClusterIdResponse RESP =
93 GetClusterIdResponse.newBuilder().setClusterId("id").build();
95 public static final class RpcClientImpl implements RpcClient {
97 public RpcClientImpl(Configuration configuration, String clusterId, SocketAddress localAddress,
98 MetricsConnection metrics) {
101 @Override
102 public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout) {
103 throw new UnsupportedOperationException();
106 @Override
107 public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) {
108 return new RpcChannelImpl();
111 @Override
112 public void cancelConnections(ServerName sn) {
115 @Override
116 public void close() {
119 @Override
120 public boolean hasCellBlockSupport() {
121 return false;
126 * A dummy RpcChannel implementation that intercepts the GetClusterId() RPC calls and injects
127 * errors. All other RPCs are ignored.
129 public static final class RpcChannelImpl implements RpcChannel {
131 @Override
132 public void callMethod(MethodDescriptor method, RpcController controller, Message request,
133 Message responsePrototype, RpcCallback<Message> done) {
134 if (!method.getName().equals("GetClusterId")) {
135 // On RPC failures, MasterRegistry internally runs getMasters() RPC to keep the master list
136 // fresh. We do not want to intercept those RPCs here and double count.
137 return;
139 // simulate the asynchronous behavior otherwise all logic will perform in the same thread...
140 EXECUTOR.execute(() -> {
141 int index = CALLED.getAndIncrement();
142 if (index == BAD_RESP_INDEX) {
143 done.run(GetClusterIdResponse.getDefaultInstance());
144 } else if (GOOD_RESP_INDEXS.contains(index)) {
145 done.run(RESP);
146 } else {
147 controller.setFailed("inject error");
148 done.run(null);
154 private AbstractRpcBasedConnectionRegistry createRegistry(int hedged) throws IOException {
155 Configuration conf = UTIL.getConfiguration();
156 conf.setInt(HEDGED_REQS_FANOUT_CONFIG_NAME, hedged);
157 return new AbstractRpcBasedConnectionRegistry(conf, HEDGED_REQS_FANOUT_CONFIG_NAME,
158 INITIAL_DELAY_SECS_CONFIG_NAME, REFRESH_INTERVAL_SECS_CONFIG_NAME,
159 MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME) {
161 @Override
162 protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException {
163 return BOOTSTRAP_NODES;
166 @Override
167 protected CompletableFuture<Set<ServerName>> fetchEndpoints() {
168 return CompletableFuture.completedFuture(BOOTSTRAP_NODES);
171 @Override public String getConnectionString() {
172 return "unimplemented";
177 @BeforeClass
178 public static void setUpBeforeClass() {
179 Configuration conf = UTIL.getConfiguration();
180 conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class,
181 RpcClient.class);
182 // disable refresh, we do not need to refresh in this test
183 conf.setLong(INITIAL_DELAY_SECS_CONFIG_NAME, Integer.MAX_VALUE);
184 conf.setLong(REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE);
185 conf.setLong(MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE - 1);
186 BOOTSTRAP_NODES = IntStream.range(0, 10)
187 .mapToObj(i -> ServerName.valueOf("localhost", (10000 + 100 * i), ServerName.NON_STARTCODE))
188 .collect(Collectors.toSet());
191 @AfterClass
192 public static void tearDownAfterClass() {
193 EXECUTOR.shutdownNow();
196 @Before
197 public void setUp() {
198 CALLED.set(0);
199 BAD_RESP_INDEX = -1;
200 GOOD_RESP_INDEXS = Collections.emptySet();
203 private <T> T logIfError(CompletableFuture<T> future) throws IOException {
204 try {
205 return FutureUtils.get(future);
206 } catch (Throwable t) {
207 LOG.warn("", t);
208 throw t;
212 @Test
213 public void testAllFailNoHedged() throws IOException {
214 try (AbstractRpcBasedConnectionRegistry registry = createRegistry(1)) {
215 assertThrows(IOException.class, () -> logIfError(registry.getClusterId()));
216 assertEquals(10, CALLED.get());
220 @Test
221 public void testAllFailHedged3() throws IOException {
222 BAD_RESP_INDEX = 5;
223 try (AbstractRpcBasedConnectionRegistry registry = createRegistry(3)) {
224 assertThrows(IOException.class, () -> logIfError(registry.getClusterId()));
225 assertEquals(10, CALLED.get());
229 @Test
230 public void testFirstSucceededNoHedge() throws IOException {
231 GOOD_RESP_INDEXS =
232 IntStream.range(0, 10).mapToObj(Integer::valueOf).collect(Collectors.toSet());
233 // will be set to 1
234 try (AbstractRpcBasedConnectionRegistry registry = createRegistry(0)) {
235 String clusterId = logIfError(registry.getClusterId());
236 assertEquals(RESP.getClusterId(), clusterId);
237 assertEquals(1, CALLED.get());
241 @Test
242 public void testSecondRoundSucceededHedge4() throws IOException {
243 GOOD_RESP_INDEXS = Collections.singleton(6);
244 try (AbstractRpcBasedConnectionRegistry registry = createRegistry(4)) {
245 String clusterId = logIfError(registry.getClusterId());
246 assertEquals(RESP.getClusterId(), clusterId);
247 UTIL.waitFor(5000, () -> CALLED.get() == 8);
251 @Test
252 public void testSucceededWithLargestHedged() throws IOException, InterruptedException {
253 GOOD_RESP_INDEXS = Collections.singleton(5);
254 try (AbstractRpcBasedConnectionRegistry registry = createRegistry(Integer.MAX_VALUE)) {
255 String clusterId = logIfError(registry.getClusterId());
256 assertEquals(RESP.getClusterId(), clusterId);
257 UTIL.waitFor(5000, () -> CALLED.get() == 10);
258 Thread.sleep(1000);
259 // make sure we do not send more
260 assertEquals(10, CALLED.get());