2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.junit
.Assert
.assertEquals
;
21 import static org
.junit
.Assert
.assertThrows
;
23 import java
.io
.IOException
;
24 import java
.net
.SocketAddress
;
25 import java
.util
.Collections
;
27 import java
.util
.concurrent
.CompletableFuture
;
28 import java
.util
.concurrent
.ExecutorService
;
29 import java
.util
.concurrent
.Executors
;
30 import java
.util
.concurrent
.atomic
.AtomicInteger
;
31 import java
.util
.stream
.Collectors
;
32 import java
.util
.stream
.IntStream
;
33 import org
.apache
.hadoop
.conf
.Configuration
;
34 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
35 import org
.apache
.hadoop
.hbase
.HBaseCommonTestingUtility
;
36 import org
.apache
.hadoop
.hbase
.HConstants
;
37 import org
.apache
.hadoop
.hbase
.ServerName
;
38 import org
.apache
.hadoop
.hbase
.ipc
.RpcClient
;
39 import org
.apache
.hadoop
.hbase
.ipc
.RpcClientFactory
;
40 import org
.apache
.hadoop
.hbase
.security
.User
;
41 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
42 import org
.apache
.hadoop
.hbase
.testclassification
.SmallTests
;
43 import org
.apache
.hadoop
.hbase
.util
.FutureUtils
;
44 import org
.junit
.AfterClass
;
45 import org
.junit
.Before
;
46 import org
.junit
.BeforeClass
;
47 import org
.junit
.ClassRule
;
48 import org
.junit
.Test
;
49 import org
.junit
.experimental
.categories
.Category
;
50 import org
.slf4j
.Logger
;
51 import org
.slf4j
.LoggerFactory
;
53 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.util
.concurrent
.ThreadFactoryBuilder
;
54 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.BlockingRpcChannel
;
55 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.Descriptors
.MethodDescriptor
;
56 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.Message
;
57 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.RpcCallback
;
58 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.RpcChannel
;
59 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.RpcController
;
61 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.MasterProtos
.GetClusterIdResponse
;
63 @Category({ ClientTests
.class, SmallTests
.class })
64 public class TestMasterRegistryHedgedReads
{
67 public static final HBaseClassTestRule CLASS_RULE
=
68 HBaseClassTestRule
.forClass(TestMasterRegistryHedgedReads
.class);
70 private static final Logger LOG
= LoggerFactory
.getLogger(TestMasterRegistryHedgedReads
.class);
72 private static final HBaseCommonTestingUtility UTIL
= new HBaseCommonTestingUtility();
74 private static final ExecutorService EXECUTOR
=
75 Executors
.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build());
77 private static AtomicInteger CALLED
= new AtomicInteger(0);
79 private static volatile int BAD_RESP_INDEX
;
81 private static volatile Set
<Integer
> GOOD_RESP_INDEXS
;
83 private static GetClusterIdResponse RESP
=
84 GetClusterIdResponse
.newBuilder().setClusterId("id").build();
86 public static final class RpcClientImpl
implements RpcClient
{
88 public RpcClientImpl(Configuration configuration
, String clusterId
, SocketAddress localAddress
,
89 MetricsConnection metrics
) {
93 public BlockingRpcChannel
createBlockingRpcChannel(ServerName sn
, User user
, int rpcTimeout
) {
94 throw new UnsupportedOperationException();
98 public RpcChannel
createRpcChannel(ServerName sn
, User user
, int rpcTimeout
) {
99 return new RpcChannelImpl();
103 public void cancelConnections(ServerName sn
) {
107 public void close() {
111 public boolean hasCellBlockSupport() {
117 * A dummy RpcChannel implementation that intercepts the GetClusterId() RPC calls and injects
118 * errors. All other RPCs are ignored.
120 public static final class RpcChannelImpl
implements RpcChannel
{
123 public void callMethod(MethodDescriptor method
, RpcController controller
, Message request
,
124 Message responsePrototype
, RpcCallback
<Message
> done
) {
125 if (!method
.getName().equals("GetClusterId")) {
126 // On RPC failures, MasterRegistry internally runs getMasters() RPC to keep the master list
127 // fresh. We do not want to intercept those RPCs here and double count.
130 // simulate the asynchronous behavior otherwise all logic will perform in the same thread...
131 EXECUTOR
.execute(() -> {
132 int index
= CALLED
.getAndIncrement();
133 if (index
== BAD_RESP_INDEX
) {
134 done
.run(GetClusterIdResponse
.getDefaultInstance());
135 } else if (GOOD_RESP_INDEXS
.contains(index
)) {
138 controller
.setFailed("inject error");
146 public static void setUpBeforeClass() {
147 Configuration conf
= UTIL
.getConfiguration();
148 conf
.setClass(RpcClientFactory
.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY
, RpcClientImpl
.class,
150 String masters
= IntStream
.range(0, 10).mapToObj(i
-> "localhost:" + (10000 + 100 * i
))
151 .collect(Collectors
.joining(","));
152 conf
.set(HConstants
.MASTER_ADDRS_KEY
, masters
);
156 public static void tearDownAfterClass() {
157 EXECUTOR
.shutdownNow();
161 public void setUp() {
164 GOOD_RESP_INDEXS
= Collections
.emptySet();
167 private <T
> T
logIfError(CompletableFuture
<T
> future
) throws IOException
{
169 return FutureUtils
.get(future
);
170 } catch (Throwable t
) {
177 public void testAllFailNoHedged() throws IOException
{
178 Configuration conf
= UTIL
.getConfiguration();
179 conf
.setInt(MasterRegistry
.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY
, 1);
180 try (MasterRegistry registry
= new MasterRegistry(conf
)) {
181 assertThrows(IOException
.class, () -> logIfError(registry
.getClusterId()));
182 assertEquals(10, CALLED
.get());
187 public void testAllFailHedged3() throws IOException
{
188 Configuration conf
= UTIL
.getConfiguration();
189 conf
.setInt(MasterRegistry
.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY
, 3);
191 try (MasterRegistry registry
= new MasterRegistry(conf
)) {
192 assertThrows(IOException
.class, () -> logIfError(registry
.getClusterId()));
193 assertEquals(10, CALLED
.get());
198 public void testFirstSucceededNoHedge() throws IOException
{
199 Configuration conf
= UTIL
.getConfiguration();
201 conf
.setInt(MasterRegistry
.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY
, 0);
203 IntStream
.range(0, 10).mapToObj(Integer
::valueOf
).collect(Collectors
.toSet());
204 try (MasterRegistry registry
= new MasterRegistry(conf
)) {
205 String clusterId
= logIfError(registry
.getClusterId());
206 assertEquals(RESP
.getClusterId(), clusterId
);
207 assertEquals(1, CALLED
.get());
212 public void testSecondRoundSucceededHedge4() throws IOException
{
213 Configuration conf
= UTIL
.getConfiguration();
214 conf
.setInt(MasterRegistry
.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY
, 4);
215 GOOD_RESP_INDEXS
= Collections
.singleton(6);
216 try (MasterRegistry registry
= new MasterRegistry(conf
)) {
217 String clusterId
= logIfError(registry
.getClusterId());
218 assertEquals(RESP
.getClusterId(), clusterId
);
219 UTIL
.waitFor(5000, () -> CALLED
.get() == 8);
224 public void testSucceededWithLargestHedged() throws IOException
, InterruptedException
{
225 Configuration conf
= UTIL
.getConfiguration();
226 conf
.setInt(MasterRegistry
.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY
, Integer
.MAX_VALUE
);
227 GOOD_RESP_INDEXS
= Collections
.singleton(5);
228 try (MasterRegistry registry
= new MasterRegistry(conf
)) {
229 String clusterId
= logIfError(registry
.getClusterId());
230 assertEquals(RESP
.getClusterId(), clusterId
);
231 UTIL
.waitFor(5000, () -> CALLED
.get() == 10);
233 // make sure we do not send more
234 assertEquals(10, CALLED
.get());