HBASE-25292 Improve InetSocketAddress usage discipline (#2669)
[hbase.git] / hbase-client / src / test / java / org / apache / hadoop / hbase / client / TestMasterRegistryHedgedReads.java
blob40a38c706a1047cf48e4215c17bfa15cf8106a67
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org.apache.hadoop.hbase.client;
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertThrows;
23 import java.io.IOException;
24 import java.net.SocketAddress;
25 import java.util.Collections;
26 import java.util.Set;
27 import java.util.concurrent.CompletableFuture;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import java.util.stream.Collectors;
32 import java.util.stream.IntStream;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.hbase.HBaseClassTestRule;
35 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
36 import org.apache.hadoop.hbase.HConstants;
37 import org.apache.hadoop.hbase.ServerName;
38 import org.apache.hadoop.hbase.ipc.RpcClient;
39 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
40 import org.apache.hadoop.hbase.security.User;
41 import org.apache.hadoop.hbase.testclassification.ClientTests;
42 import org.apache.hadoop.hbase.testclassification.SmallTests;
43 import org.apache.hadoop.hbase.util.FutureUtils;
44 import org.junit.AfterClass;
45 import org.junit.Before;
46 import org.junit.BeforeClass;
47 import org.junit.ClassRule;
48 import org.junit.Test;
49 import org.junit.experimental.categories.Category;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
53 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
54 import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
55 import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
56 import org.apache.hbase.thirdparty.com.google.protobuf.Message;
57 import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
58 import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
59 import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
61 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
63 @Category({ ClientTests.class, SmallTests.class })
64 public class TestMasterRegistryHedgedReads {
66 @ClassRule
67 public static final HBaseClassTestRule CLASS_RULE =
68 HBaseClassTestRule.forClass(TestMasterRegistryHedgedReads.class);
70 private static final Logger LOG = LoggerFactory.getLogger(TestMasterRegistryHedgedReads.class);
72 private static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
74 private static final ExecutorService EXECUTOR =
75 Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build());
77 private static AtomicInteger CALLED = new AtomicInteger(0);
79 private static volatile int BAD_RESP_INDEX;
81 private static volatile Set<Integer> GOOD_RESP_INDEXS;
83 private static GetClusterIdResponse RESP =
84 GetClusterIdResponse.newBuilder().setClusterId("id").build();
86 public static final class RpcClientImpl implements RpcClient {
88 public RpcClientImpl(Configuration configuration, String clusterId, SocketAddress localAddress,
89 MetricsConnection metrics) {
92 @Override
93 public BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTimeout) {
94 throw new UnsupportedOperationException();
97 @Override
98 public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) {
99 return new RpcChannelImpl();
102 @Override
103 public void cancelConnections(ServerName sn) {
106 @Override
107 public void close() {
110 @Override
111 public boolean hasCellBlockSupport() {
112 return false;
117 * A dummy RpcChannel implementation that intercepts the GetClusterId() RPC calls and injects
118 * errors. All other RPCs are ignored.
120 public static final class RpcChannelImpl implements RpcChannel {
122 @Override
123 public void callMethod(MethodDescriptor method, RpcController controller, Message request,
124 Message responsePrototype, RpcCallback<Message> done) {
125 if (!method.getName().equals("GetClusterId")) {
126 // On RPC failures, MasterRegistry internally runs getMasters() RPC to keep the master list
127 // fresh. We do not want to intercept those RPCs here and double count.
128 return;
130 // simulate the asynchronous behavior otherwise all logic will perform in the same thread...
131 EXECUTOR.execute(() -> {
132 int index = CALLED.getAndIncrement();
133 if (index == BAD_RESP_INDEX) {
134 done.run(GetClusterIdResponse.getDefaultInstance());
135 } else if (GOOD_RESP_INDEXS.contains(index)) {
136 done.run(RESP);
137 } else {
138 controller.setFailed("inject error");
139 done.run(null);
145 @BeforeClass
146 public static void setUpBeforeClass() {
147 Configuration conf = UTIL.getConfiguration();
148 conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class,
149 RpcClient.class);
150 String masters = IntStream.range(0, 10).mapToObj(i -> "localhost:" + (10000 + 100 * i))
151 .collect(Collectors.joining(","));
152 conf.set(HConstants.MASTER_ADDRS_KEY, masters);
155 @AfterClass
156 public static void tearDownAfterClass() {
157 EXECUTOR.shutdownNow();
160 @Before
161 public void setUp() {
162 CALLED.set(0);
163 BAD_RESP_INDEX = -1;
164 GOOD_RESP_INDEXS = Collections.emptySet();
167 private <T> T logIfError(CompletableFuture<T> future) throws IOException {
168 try {
169 return FutureUtils.get(future);
170 } catch (Throwable t) {
171 LOG.warn("", t);
172 throw t;
176 @Test
177 public void testAllFailNoHedged() throws IOException {
178 Configuration conf = UTIL.getConfiguration();
179 conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 1);
180 try (MasterRegistry registry = new MasterRegistry(conf)) {
181 assertThrows(IOException.class, () -> logIfError(registry.getClusterId()));
182 assertEquals(10, CALLED.get());
186 @Test
187 public void testAllFailHedged3() throws IOException {
188 Configuration conf = UTIL.getConfiguration();
189 conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 3);
190 BAD_RESP_INDEX = 5;
191 try (MasterRegistry registry = new MasterRegistry(conf)) {
192 assertThrows(IOException.class, () -> logIfError(registry.getClusterId()));
193 assertEquals(10, CALLED.get());
197 @Test
198 public void testFirstSucceededNoHedge() throws IOException {
199 Configuration conf = UTIL.getConfiguration();
200 // will be set to 1
201 conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 0);
202 GOOD_RESP_INDEXS =
203 IntStream.range(0, 10).mapToObj(Integer::valueOf).collect(Collectors.toSet());
204 try (MasterRegistry registry = new MasterRegistry(conf)) {
205 String clusterId = logIfError(registry.getClusterId());
206 assertEquals(RESP.getClusterId(), clusterId);
207 assertEquals(1, CALLED.get());
211 @Test
212 public void testSecondRoundSucceededHedge4() throws IOException {
213 Configuration conf = UTIL.getConfiguration();
214 conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4);
215 GOOD_RESP_INDEXS = Collections.singleton(6);
216 try (MasterRegistry registry = new MasterRegistry(conf)) {
217 String clusterId = logIfError(registry.getClusterId());
218 assertEquals(RESP.getClusterId(), clusterId);
219 UTIL.waitFor(5000, () -> CALLED.get() == 8);
223 @Test
224 public void testSucceededWithLargestHedged() throws IOException, InterruptedException {
225 Configuration conf = UTIL.getConfiguration();
226 conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, Integer.MAX_VALUE);
227 GOOD_RESP_INDEXS = Collections.singleton(5);
228 try (MasterRegistry registry = new MasterRegistry(conf)) {
229 String clusterId = logIfError(registry.getClusterId());
230 assertEquals(RESP.getClusterId(), clusterId);
231 UTIL.waitFor(5000, () -> CALLED.get() == 10);
232 Thread.sleep(1000);
233 // make sure we do not send more
234 assertEquals(10, CALLED.get());