2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.junit
.Assert
.assertFalse
;
21 import static org
.junit
.Assert
.assertTrue
;
23 import java
.net
.InetSocketAddress
;
24 import java
.net
.SocketAddress
;
25 import java
.net
.SocketTimeoutException
;
26 import java
.net
.UnknownHostException
;
27 import java
.util
.Random
;
28 import java
.util
.concurrent
.atomic
.AtomicInteger
;
29 import org
.apache
.hadoop
.conf
.Configuration
;
30 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
31 import org
.apache
.hadoop
.hbase
.HBaseConfiguration
;
32 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
33 import org
.apache
.hadoop
.hbase
.HConstants
;
34 import org
.apache
.hadoop
.hbase
.MasterNotRunningException
;
35 import org
.apache
.hadoop
.hbase
.ServerName
;
36 import org
.apache
.hadoop
.hbase
.ipc
.AbstractRpcClient
;
37 import org
.apache
.hadoop
.hbase
.ipc
.BlockingRpcClient
;
38 import org
.apache
.hadoop
.hbase
.ipc
.RpcClientFactory
;
39 import org
.apache
.hadoop
.hbase
.security
.User
;
40 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
41 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
42 import org
.junit
.AfterClass
;
43 import org
.junit
.BeforeClass
;
44 import org
.junit
.ClassRule
;
45 import org
.junit
.Test
;
46 import org
.junit
.experimental
.categories
.Category
;
48 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.BlockingRpcChannel
;
49 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.Descriptors
.MethodDescriptor
;
50 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.Message
;
51 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.RpcController
;
52 import org
.apache
.hbase
.thirdparty
.com
.google
.protobuf
.ServiceException
;
54 @Category({MediumTests
.class, ClientTests
.class})
55 public class TestClientTimeouts
{
// Class-level rule object obtained from HBaseClassTestRule.forClass for this test class.
58 public static final HBaseClassTestRule CLASS_RULE
=
59 HBaseClassTestRule
.forClass(TestClientTimeouts
.class);
// Shared testing utility: starts and stops the mini cluster and supplies the
// configuration and cluster key used by the test.
61 private final static HBaseTestingUtility TEST_UTIL
= new HBaseTestingUtility();
// Value handed to TEST_UTIL.startMiniCluster(...) in setUpBeforeClass.
62 protected static int SLAVES
= 1;
65 * @throws java.lang.Exception
// One-time setup: starts the mini cluster with SLAVES as the argument, then registers
// RandomTimeoutRpcClient as the custom RPC client implementation in TEST_UTIL's
// configuration. testAdminTimeout builds its RPC client and its per-iteration
// Configuration from that same TEST_UTIL configuration.
68 public static void setUpBeforeClass() throws Exception
{
69 TEST_UTIL
.startMiniCluster(SLAVES
);
70 // Set the custom RPC client with random timeouts as the client
71 TEST_UTIL
.getConfiguration().set(
72 RpcClientFactory
.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY
,
73 RandomTimeoutRpcClient
.class.getName());
77 * @throws java.lang.Exception
// One-time teardown: shuts down the mini cluster held by TEST_UTIL.
80 public static void tearDownAfterClass() throws Exception
{
81 TEST_UTIL
.shutdownMiniCluster();
85 * Test that a client that fails an RPC to the master retries properly and
86 * doesn't throw any unexpected exceptions.
// Repeatedly opens a fresh Connection/Admin pair, issues admin calls against it, and
// finally asserts that (a) the last iteration did not fail and (b) the
// RandomTimeoutBlockingRpcChannel invocation counter grew during the test.
// NOTE(review): the declaration of `admin`, the try/finally openers and every
// assignment to `lastFailed` inside the loop are not visible in this view - confirm
// against the complete file before changing the control flow.
90 public void testAdminTimeout() throws Exception
{
// Result flag of the most recent iteration; it also extends the loop below.
91 boolean lastFailed
= false;
// Snapshot of the shared invocation counter, compared against in the final assertTrue.
92 int initialInvocations
= RandomTimeoutBlockingRpcChannel
.invokations
.get();
// The cast relies on RpcClientFactory.createClient returning a RandomTimeoutRpcClient.
// NOTE(review): presumably guaranteed by the config key set in setUpBeforeClass - confirm.
93 RandomTimeoutRpcClient rpcClient
= (RandomTimeoutRpcClient
) RpcClientFactory
94 .createClient(TEST_UTIL
.getConfiguration(), TEST_UTIL
.getClusterKey());
// Always runs 5 iterations; keeps going (up to 100 in total) while lastFailed is true.
97 for (int i
= 0; i
< 5 || (lastFailed
&& i
< 100); ++i
) {
99 // Ensure the HBaseAdmin uses a new connection by changing Configuration.
100 Configuration conf
= HBaseConfiguration
.create(TEST_UTIL
.getConfiguration());
101 conf
.set(HConstants
.HBASE_CLIENT_INSTANCE_ID
, String
.valueOf(-1));
103 Connection connection
= null;
105 connection
= ConnectionFactory
.createConnection(conf
);
106 admin
= connection
.getAdmin();
107 // run some admin commands
108 HBaseAdmin
.available(conf
);
109 admin
.balancerSwitch(false, false);
110 } catch (MasterNotRunningException ex
) {
111 // Since we are randomly throwing SocketTimeoutExceptions, it is possible to get
112 // a MasterNotRunningException. It's a bug if we get other exceptions.
// If the admin's connection reports closed, a new rpcClient is created from the
// TEST_UTIL configuration and cluster key, replacing the previous reference.
117 if (admin
.getConnection().isClosed()) {
118 rpcClient
= (RandomTimeoutRpcClient
) RpcClientFactory
119 .createClient(TEST_UTIL
.getConfiguration(), TEST_UTIL
.getClusterKey());
122 if(connection
!= null) {
127 // Ensure the RandomTimeoutRpcClient is actually being used: the counter bumped by RandomTimeoutBlockingRpcChannel must have grown.
128 assertFalse(lastFailed
);
129 assertTrue(RandomTimeoutBlockingRpcChannel
.invokations
.get() > initialInvocations
);
136 * Rpc client implementation that creates RandomTimeoutBlockingRpcChannel instances
// BlockingRpcClient subclass whose createBlockingRpcChannel returns a
// RandomTimeoutBlockingRpcChannel bound to this client. setUpBeforeClass registers
// this class by name under RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY.
138 public static class RandomTimeoutRpcClient
extends BlockingRpcClient
{
// Forwards all four arguments unchanged to the BlockingRpcClient constructor.
139 public RandomTimeoutRpcClient(Configuration conf
, String clusterId
, SocketAddress localAddr
,
140 MetricsConnection metrics
) {
141 super(conf
, clusterId
, localAddr
, metrics
);
144 // Return my own instance, one that does random timeouts
// Builds a new channel for the given server name, user ticket and rpc timeout.
146 public BlockingRpcChannel
createBlockingRpcChannel(ServerName sn
,
147 User ticket
, int rpcTimeout
) throws UnknownHostException
{
148 return new RandomTimeoutBlockingRpcChannel(this, sn
, ticket
, rpcTimeout
);
153 * Blocking rpc channel that goes via hbase rpc, except that a random fraction of calls fail with a fake timeout.
// Channel that, on each callBlockingMethod, either throws a ServiceException wrapping
// a SocketTimeoutException("fake timeout") or delegates to the superclass implementation.
155 static class RandomTimeoutBlockingRpcChannel
156 extends AbstractRpcClient
.BlockingRpcChannelImplementation
{
// Shared random source, seeded from the wall clock at class initialization.
157 private static final Random RANDOM
= new Random(System
.currentTimeMillis());
// Threshold compared against RANDOM.nextFloat(); a draw below it triggers the fake timeout.
158 public static final double CHANCE_OF_TIMEOUT
= 0.3;
// Counts every callBlockingMethod invocation (incremented before the timeout decision,
// so calls that throw are counted too). Read by testAdminTimeout.
159 private static AtomicInteger invokations
= new AtomicInteger();
// Builds the superclass channel from an InetSocketAddress made of sn's hostname and port.
161 RandomTimeoutBlockingRpcChannel(final BlockingRpcClient rpcClient
, final ServerName sn
,
162 final User ticket
, final int rpcTimeout
) {
163 super(rpcClient
, new InetSocketAddress(sn
.getHostname(), sn
.getPort()), ticket
, rpcTimeout
);
// Increments the invocation counter, then either throws the fake timeout or
// forwards the call unchanged to super.callBlockingMethod.
167 public Message
callBlockingMethod(MethodDescriptor md
,
168 RpcController controller
, Message param
, Message returnType
)
169 throws ServiceException
{
170 invokations
.getAndIncrement();
171 if (RANDOM
.nextFloat() < CHANCE_OF_TIMEOUT
) {
172 // throw a ServiceException, because that is the only exception type that
173 // {@link ProtobufRpcEngine} throws. If this RpcEngine is used with a different
174 // "actual" type, this may not properly mimic the underlying RpcEngine.
175 throw new ServiceException(new SocketTimeoutException("fake timeout"));
177 return super.callBlockingMethod(md
, controller
, param
, returnType
);