2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.apache
.hadoop
.hbase
.NamespaceDescriptor
.DEFAULT_NAMESPACE_NAME_STR
;
21 import static org
.apache
.hadoop
.hbase
.client
.AsyncConnectionConfiguration
.START_LOG_ERRORS_AFTER_COUNT_KEY
;
22 import static org
.junit
.Assert
.fail
;
24 import java
.io
.IOException
;
25 import java
.util
.Arrays
;
26 import java
.util
.List
;
27 import java
.util
.Optional
;
28 import java
.util
.concurrent
.ForkJoinPool
;
29 import java
.util
.concurrent
.TimeUnit
;
30 import java
.util
.concurrent
.atomic
.AtomicLong
;
31 import java
.util
.function
.Supplier
;
32 import org
.apache
.commons
.io
.IOUtils
;
33 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
34 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
35 import org
.apache
.hadoop
.hbase
.HConstants
;
36 import org
.apache
.hadoop
.hbase
.coprocessor
.CoprocessorHost
;
37 import org
.apache
.hadoop
.hbase
.coprocessor
.MasterCoprocessor
;
38 import org
.apache
.hadoop
.hbase
.coprocessor
.MasterCoprocessorEnvironment
;
39 import org
.apache
.hadoop
.hbase
.coprocessor
.MasterObserver
;
40 import org
.apache
.hadoop
.hbase
.coprocessor
.ObserverContext
;
41 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
42 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
43 import org
.apache
.hadoop
.hbase
.util
.Threads
;
44 import org
.junit
.After
;
45 import org
.junit
.Before
;
46 import org
.junit
.ClassRule
;
47 import org
.junit
.Test
;
48 import org
.junit
.experimental
.categories
.Category
;
49 import org
.junit
.runner
.RunWith
;
50 import org
.junit
.runners
.Parameterized
;
51 import org
.junit
.runners
.Parameterized
.Parameter
;
52 import org
.junit
.runners
.Parameterized
.Parameters
;
53 import org
.slf4j
.Logger
;
54 import org
.slf4j
.LoggerFactory
;
56 @RunWith(Parameterized
.class)
57 @Category({ LargeTests
.class, ClientTests
.class })
58 public class TestAsyncAdminBuilder
{
61 public static final HBaseClassTestRule CLASS_RULE
=
62 HBaseClassTestRule
.forClass(TestAsyncAdminBuilder
.class);
64 private static final Logger LOG
= LoggerFactory
.getLogger(TestAsyncAdminBuilder
.class);
65 private final static HBaseTestingUtility TEST_UTIL
= new HBaseTestingUtility();
66 private static AsyncConnection ASYNC_CONN
;
69 public Supplier
<AsyncAdminBuilder
> getAdminBuilder
;
71 private static AsyncAdminBuilder
getRawAsyncAdminBuilder() {
72 return ASYNC_CONN
.getAdminBuilder();
75 private static AsyncAdminBuilder
getAsyncAdminBuilder() {
76 return ASYNC_CONN
.getAdminBuilder(ForkJoinPool
.commonPool());
80 public static List
<Object
[]> params() {
81 return Arrays
.asList(new Supplier
<?
>[] { TestAsyncAdminBuilder
::getRawAsyncAdminBuilder
},
82 new Supplier
<?
>[] { TestAsyncAdminBuilder
::getAsyncAdminBuilder
});
85 private static final int DEFAULT_RPC_TIMEOUT
= 10000;
86 private static final int DEFAULT_OPERATION_TIMEOUT
= 30000;
87 private static final int DEFAULT_RETRIES_NUMBER
= 2;
90 public void setUp() throws Exception
{
91 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_RPC_TIMEOUT_KEY
, DEFAULT_RPC_TIMEOUT
);
92 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_CLIENT_OPERATION_TIMEOUT
,
93 DEFAULT_OPERATION_TIMEOUT
);
94 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
,
95 DEFAULT_RETRIES_NUMBER
);
96 TEST_UTIL
.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY
, 0);
100 public void tearDown() throws Exception
{
101 IOUtils
.closeQuietly(ASYNC_CONN
);
102 TEST_UTIL
.shutdownMiniCluster();
106 public void testRpcTimeout() throws Exception
{
107 TEST_UTIL
.getConfiguration().set(CoprocessorHost
.MASTER_COPROCESSOR_CONF_KEY
,
108 TestRpcTimeoutCoprocessor
.class.getName());
109 TEST_UTIL
.startMiniCluster(2);
110 ASYNC_CONN
= ConnectionFactory
.createAsyncConnection(TEST_UTIL
.getConfiguration()).get();
113 getAdminBuilder
.get().setRpcTimeout(DEFAULT_RPC_TIMEOUT
/ 2, TimeUnit
.MILLISECONDS
).build()
114 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR
).get();
115 fail("We expect an exception here");
116 } catch (Exception e
) {
121 getAdminBuilder
.get().setRpcTimeout(DEFAULT_RPC_TIMEOUT
* 2, TimeUnit
.MILLISECONDS
).build()
122 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR
).get();
123 } catch (Exception e
) {
124 fail("The Operation should succeed, unexpected exception: " + e
.getMessage());
129 public void testOperationTimeout() throws Exception
{
130 // set retry number to 100 to make sure that this test only be affected by operation timeout
131 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
, 100);
132 TEST_UTIL
.getConfiguration().set(CoprocessorHost
.MASTER_COPROCESSOR_CONF_KEY
,
133 TestOperationTimeoutCoprocessor
.class.getName());
134 TEST_UTIL
.startMiniCluster(2);
135 ASYNC_CONN
= ConnectionFactory
.createAsyncConnection(TEST_UTIL
.getConfiguration()).get();
138 getAdminBuilder
.get()
139 .setOperationTimeout(DEFAULT_OPERATION_TIMEOUT
/ 2, TimeUnit
.MILLISECONDS
).build()
140 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR
).get();
141 fail("We expect an exception here");
142 } catch (Exception e
) {
147 getAdminBuilder
.get()
148 .setOperationTimeout(DEFAULT_OPERATION_TIMEOUT
* 2, TimeUnit
.MILLISECONDS
).build()
149 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR
).get();
150 } catch (Exception e
) {
151 fail("The Operation should succeed, unexpected exception: " + e
.getMessage());
156 public void testMaxRetries() throws Exception
{
157 // set operation timeout to 300s to make sure that this test only be affected by retry number
158 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_CLIENT_OPERATION_TIMEOUT
, 300000);
159 TEST_UTIL
.getConfiguration().set(CoprocessorHost
.MASTER_COPROCESSOR_CONF_KEY
,
160 TestMaxRetriesCoprocessor
.class.getName());
161 TEST_UTIL
.startMiniCluster(2);
162 ASYNC_CONN
= ConnectionFactory
.createAsyncConnection(TEST_UTIL
.getConfiguration()).get();
165 getAdminBuilder
.get().setMaxRetries(DEFAULT_RETRIES_NUMBER
/ 2).build()
166 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR
).get();
167 fail("We expect an exception here");
168 } catch (Exception e
) {
173 getAdminBuilder
.get().setMaxRetries(DEFAULT_RETRIES_NUMBER
* 2).build()
174 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR
).get();
175 } catch (Exception e
) {
176 fail("The Operation should succeed, unexpected exception: " + e
.getMessage());
180 public static class TestRpcTimeoutCoprocessor
implements MasterCoprocessor
, MasterObserver
{
181 public TestRpcTimeoutCoprocessor() {
186 public Optional
<MasterObserver
> getMasterObserver() {
187 return Optional
.of(this);
190 public void preGetNamespaceDescriptor(ObserverContext
<MasterCoprocessorEnvironment
> ctx
,
191 String namespace
) throws IOException
{
192 Threads
.sleep(DEFAULT_RPC_TIMEOUT
);
196 public static class TestOperationTimeoutCoprocessor
implements MasterCoprocessor
, MasterObserver
{
197 AtomicLong sleepTime
= new AtomicLong(0);
199 public TestOperationTimeoutCoprocessor() {
203 public Optional
<MasterObserver
> getMasterObserver() {
204 return Optional
.of(this);
208 public void preGetNamespaceDescriptor(ObserverContext
<MasterCoprocessorEnvironment
> ctx
,
209 String namespace
) throws IOException
{
210 Threads
.sleep(DEFAULT_RPC_TIMEOUT
/ 2);
211 if (sleepTime
.addAndGet(DEFAULT_RPC_TIMEOUT
/ 2) < DEFAULT_OPERATION_TIMEOUT
) {
212 throw new IOException("call fail");
217 public static class TestMaxRetriesCoprocessor
implements MasterCoprocessor
, MasterObserver
{
218 AtomicLong retryNum
= new AtomicLong(0);
220 public TestMaxRetriesCoprocessor() {
224 public Optional
<MasterObserver
> getMasterObserver() {
225 return Optional
.of(this);
229 public void preGetNamespaceDescriptor(ObserverContext
<MasterCoprocessorEnvironment
> ctx
,
230 String namespace
) throws IOException
{
231 if (retryNum
.getAndIncrement() < DEFAULT_RETRIES_NUMBER
) {
232 throw new IOException("call fail");