/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.apache
.hadoop
.hbase
.NamespaceDescriptor
.DEFAULT_NAMESPACE_NAME_STR
;
21 import static org
.apache
.hadoop
.hbase
.client
.AsyncConnectionConfiguration
.START_LOG_ERRORS_AFTER_COUNT_KEY
;
22 import static org
.junit
.Assert
.fail
;
24 import java
.io
.IOException
;
25 import java
.util
.Arrays
;
26 import java
.util
.List
;
27 import java
.util
.Optional
;
28 import java
.util
.concurrent
.ForkJoinPool
;
29 import java
.util
.concurrent
.TimeUnit
;
30 import java
.util
.concurrent
.atomic
.AtomicLong
;
31 import java
.util
.function
.Supplier
;
32 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
33 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
34 import org
.apache
.hadoop
.hbase
.HConstants
;
35 import org
.apache
.hadoop
.hbase
.coprocessor
.CoprocessorHost
;
36 import org
.apache
.hadoop
.hbase
.coprocessor
.MasterCoprocessor
;
37 import org
.apache
.hadoop
.hbase
.coprocessor
.MasterCoprocessorEnvironment
;
38 import org
.apache
.hadoop
.hbase
.coprocessor
.MasterObserver
;
39 import org
.apache
.hadoop
.hbase
.coprocessor
.ObserverContext
;
40 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
41 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
42 import org
.apache
.hadoop
.hbase
.util
.Threads
;
43 import org
.junit
.After
;
44 import org
.junit
.Before
;
45 import org
.junit
.ClassRule
;
46 import org
.junit
.Test
;
47 import org
.junit
.experimental
.categories
.Category
;
48 import org
.junit
.runner
.RunWith
;
49 import org
.junit
.runners
.Parameterized
;
50 import org
.junit
.runners
.Parameterized
.Parameter
;
51 import org
.junit
.runners
.Parameterized
.Parameters
;
52 import org
.slf4j
.Logger
;
53 import org
.slf4j
.LoggerFactory
;
55 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.io
.Closeables
;
57 @RunWith(Parameterized
.class)
58 @Category({ LargeTests
.class, ClientTests
.class })
59 public class TestAsyncAdminBuilder
{
62 public static final HBaseClassTestRule CLASS_RULE
=
63 HBaseClassTestRule
.forClass(TestAsyncAdminBuilder
.class);
65 private static final Logger LOG
= LoggerFactory
.getLogger(TestAsyncAdminBuilder
.class);
66 private final static HBaseTestingUtil TEST_UTIL
= new HBaseTestingUtil();
67 private static AsyncConnection ASYNC_CONN
;
70 public Supplier
<AsyncAdminBuilder
> getAdminBuilder
;
72 private static AsyncAdminBuilder
getRawAsyncAdminBuilder() {
73 return ASYNC_CONN
.getAdminBuilder();
76 private static AsyncAdminBuilder
getAsyncAdminBuilder() {
77 return ASYNC_CONN
.getAdminBuilder(ForkJoinPool
.commonPool());
81 public static List
<Object
[]> params() {
82 return Arrays
.asList(new Supplier
<?
>[] { TestAsyncAdminBuilder
::getRawAsyncAdminBuilder
},
83 new Supplier
<?
>[] { TestAsyncAdminBuilder
::getAsyncAdminBuilder
});
86 private static final int DEFAULT_RPC_TIMEOUT
= 10000;
87 private static final int DEFAULT_OPERATION_TIMEOUT
= 30000;
88 private static final int DEFAULT_RETRIES_NUMBER
= 2;
91 public void setUp() throws Exception
{
92 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_RPC_TIMEOUT_KEY
, DEFAULT_RPC_TIMEOUT
);
93 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_CLIENT_OPERATION_TIMEOUT
,
94 DEFAULT_OPERATION_TIMEOUT
);
95 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
,
96 DEFAULT_RETRIES_NUMBER
);
97 TEST_UTIL
.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY
, 0);
101 public void tearDown() throws Exception
{
102 Closeables
.close(ASYNC_CONN
, true);
103 TEST_UTIL
.shutdownMiniCluster();
107 public void testRpcTimeout() throws Exception
{
108 TEST_UTIL
.getConfiguration().set(CoprocessorHost
.MASTER_COPROCESSOR_CONF_KEY
,
109 TestRpcTimeoutCoprocessor
.class.getName());
110 TEST_UTIL
.startMiniCluster(2);
111 ASYNC_CONN
= ConnectionFactory
.createAsyncConnection(TEST_UTIL
.getConfiguration()).get();
114 getAdminBuilder
.get().setRpcTimeout(DEFAULT_RPC_TIMEOUT
/ 2, TimeUnit
.MILLISECONDS
).build()
115 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR
).get();
116 fail("We expect an exception here");
117 } catch (Exception e
) {
122 getAdminBuilder
.get().setRpcTimeout(DEFAULT_RPC_TIMEOUT
* 2, TimeUnit
.MILLISECONDS
).build()
123 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR
).get();
124 } catch (Exception e
) {
125 fail("The Operation should succeed, unexpected exception: " + e
.getMessage());
130 public void testOperationTimeout() throws Exception
{
131 // set retry number to 100 to make sure that this test only be affected by operation timeout
132 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
, 100);
133 TEST_UTIL
.getConfiguration().set(CoprocessorHost
.MASTER_COPROCESSOR_CONF_KEY
,
134 TestOperationTimeoutCoprocessor
.class.getName());
135 TEST_UTIL
.startMiniCluster(2);
136 ASYNC_CONN
= ConnectionFactory
.createAsyncConnection(TEST_UTIL
.getConfiguration()).get();
139 getAdminBuilder
.get()
140 .setOperationTimeout(DEFAULT_OPERATION_TIMEOUT
/ 2, TimeUnit
.MILLISECONDS
).build()
141 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR
).get();
142 fail("We expect an exception here");
143 } catch (Exception e
) {
148 getAdminBuilder
.get()
149 .setOperationTimeout(DEFAULT_OPERATION_TIMEOUT
* 2, TimeUnit
.MILLISECONDS
).build()
150 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR
).get();
151 } catch (Exception e
) {
152 fail("The Operation should succeed, unexpected exception: " + e
.getMessage());
157 public void testMaxRetries() throws Exception
{
158 // set operation timeout to 300s to make sure that this test only be affected by retry number
159 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_CLIENT_OPERATION_TIMEOUT
, 300000);
160 TEST_UTIL
.getConfiguration().set(CoprocessorHost
.MASTER_COPROCESSOR_CONF_KEY
,
161 TestMaxRetriesCoprocessor
.class.getName());
162 TEST_UTIL
.startMiniCluster(2);
163 ASYNC_CONN
= ConnectionFactory
.createAsyncConnection(TEST_UTIL
.getConfiguration()).get();
166 getAdminBuilder
.get().setMaxRetries(DEFAULT_RETRIES_NUMBER
/ 2).build()
167 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR
).get();
168 fail("We expect an exception here");
169 } catch (Exception e
) {
174 getAdminBuilder
.get().setMaxRetries(DEFAULT_RETRIES_NUMBER
* 2).build()
175 .getNamespaceDescriptor(DEFAULT_NAMESPACE_NAME_STR
).get();
176 } catch (Exception e
) {
177 fail("The Operation should succeed, unexpected exception: " + e
.getMessage());
181 public static class TestRpcTimeoutCoprocessor
implements MasterCoprocessor
, MasterObserver
{
182 public TestRpcTimeoutCoprocessor() {
187 public Optional
<MasterObserver
> getMasterObserver() {
188 return Optional
.of(this);
191 public void preGetNamespaceDescriptor(ObserverContext
<MasterCoprocessorEnvironment
> ctx
,
192 String namespace
) throws IOException
{
193 Threads
.sleep(DEFAULT_RPC_TIMEOUT
);
197 public static class TestOperationTimeoutCoprocessor
implements MasterCoprocessor
, MasterObserver
{
198 AtomicLong sleepTime
= new AtomicLong(0);
200 public TestOperationTimeoutCoprocessor() {
204 public Optional
<MasterObserver
> getMasterObserver() {
205 return Optional
.of(this);
209 public void preGetNamespaceDescriptor(ObserverContext
<MasterCoprocessorEnvironment
> ctx
,
210 String namespace
) throws IOException
{
211 Threads
.sleep(DEFAULT_RPC_TIMEOUT
/ 2);
212 if (sleepTime
.addAndGet(DEFAULT_RPC_TIMEOUT
/ 2) < DEFAULT_OPERATION_TIMEOUT
) {
213 throw new IOException("call fail");
218 public static class TestMaxRetriesCoprocessor
implements MasterCoprocessor
, MasterObserver
{
219 AtomicLong retryNum
= new AtomicLong(0);
221 public TestMaxRetriesCoprocessor() {
225 public Optional
<MasterObserver
> getMasterObserver() {
226 return Optional
.of(this);
230 public void preGetNamespaceDescriptor(ObserverContext
<MasterCoprocessorEnvironment
> ctx
,
231 String namespace
) throws IOException
{
232 if (retryNum
.getAndIncrement() < DEFAULT_RETRIES_NUMBER
) {
233 throw new IOException("call fail");