/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.hamcrest
.CoreMatchers
.instanceOf
;
21 import static org
.junit
.Assert
.assertArrayEquals
;
22 import static org
.junit
.Assert
.assertThat
;
23 import static org
.junit
.Assert
.assertTrue
;
24 import static org
.junit
.Assert
.fail
;
26 import java
.io
.IOException
;
27 import java
.util
.concurrent
.CompletableFuture
;
28 import java
.util
.concurrent
.ExecutionException
;
29 import java
.util
.concurrent
.TimeUnit
;
30 import java
.util
.concurrent
.atomic
.AtomicBoolean
;
31 import java
.util
.concurrent
.atomic
.AtomicInteger
;
32 import org
.apache
.commons
.io
.IOUtils
;
33 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
34 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
35 import org
.apache
.hadoop
.hbase
.HRegionLocation
;
36 import org
.apache
.hadoop
.hbase
.TableName
;
37 import org
.apache
.hadoop
.hbase
.security
.User
;
38 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
39 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
40 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
41 import org
.junit
.AfterClass
;
42 import org
.junit
.BeforeClass
;
43 import org
.junit
.ClassRule
;
44 import org
.junit
.Test
;
45 import org
.junit
.experimental
.categories
.Category
;
47 @Category({ MediumTests
.class, ClientTests
.class })
48 public class TestAsyncSingleRequestRpcRetryingCaller
{
51 public static final HBaseClassTestRule CLASS_RULE
=
52 HBaseClassTestRule
.forClass(TestAsyncSingleRequestRpcRetryingCaller
.class);
54 private static final HBaseTestingUtility TEST_UTIL
= new HBaseTestingUtility();
56 private static TableName TABLE_NAME
= TableName
.valueOf("async");
58 private static byte[] FAMILY
= Bytes
.toBytes("cf");
60 private static byte[] QUALIFIER
= Bytes
.toBytes("cq");
62 private static byte[] ROW
= Bytes
.toBytes("row");
64 private static byte[] VALUE
= Bytes
.toBytes("value");
66 private static AsyncConnectionImpl CONN
;
69 public static void setUpBeforeClass() throws Exception
{
70 TEST_UTIL
.startMiniCluster(2);
71 TEST_UTIL
.getAdmin().balancerSwitch(false, true);
72 TEST_UTIL
.createTable(TABLE_NAME
, FAMILY
);
73 TEST_UTIL
.waitTableAvailable(TABLE_NAME
);
74 ConnectionRegistry registry
=
75 ConnectionRegistryFactory
.getRegistry(TEST_UTIL
.getConfiguration());
76 CONN
= new AsyncConnectionImpl(TEST_UTIL
.getConfiguration(), registry
,
77 registry
.getClusterId().get(), null, User
.getCurrent());
81 public static void tearDownAfterClass() throws Exception
{
82 IOUtils
.closeQuietly(CONN
);
83 TEST_UTIL
.shutdownMiniCluster();
87 public void testRegionMove() throws InterruptedException
, ExecutionException
, IOException
{
88 // This will leave a cached entry in location cache
89 HRegionLocation loc
= CONN
.getRegionLocator(TABLE_NAME
).getRegionLocation(ROW
).get();
90 int index
= TEST_UTIL
.getHBaseCluster().getServerWith(loc
.getRegion().getRegionName());
91 TEST_UTIL
.getAdmin().move(loc
.getRegion().getEncodedNameAsBytes(),
92 TEST_UTIL
.getHBaseCluster().getRegionServer(1 - index
).getServerName());
93 AsyncTable
<?
> table
= CONN
.getTableBuilder(TABLE_NAME
).setRetryPause(100, TimeUnit
.MILLISECONDS
)
94 .setMaxRetries(30).build();
95 table
.put(new Put(ROW
).addColumn(FAMILY
, QUALIFIER
, VALUE
)).get();
98 TEST_UTIL
.getAdmin().move(loc
.getRegion().getEncodedNameAsBytes(), loc
.getServerName());
99 Result result
= table
.get(new Get(ROW
).addColumn(FAMILY
, QUALIFIER
)).get();
100 assertArrayEquals(VALUE
, result
.getValue(FAMILY
, QUALIFIER
));
103 private <T
> CompletableFuture
<T
> failedFuture() {
104 CompletableFuture
<T
> future
= new CompletableFuture
<>();
105 future
.completeExceptionally(new RuntimeException("Inject error!"));
110 public void testMaxRetries() throws IOException
, InterruptedException
{
112 CONN
.callerFactory
.single().table(TABLE_NAME
).row(ROW
).operationTimeout(1, TimeUnit
.DAYS
)
113 .maxAttempts(3).pause(10, TimeUnit
.MILLISECONDS
)
114 .action((controller
, loc
, stub
) -> failedFuture()).call().get();
116 } catch (ExecutionException e
) {
117 assertThat(e
.getCause(), instanceOf(RetriesExhaustedException
.class));
122 public void testOperationTimeout() throws IOException
, InterruptedException
{
123 long startNs
= System
.nanoTime();
125 CONN
.callerFactory
.single().table(TABLE_NAME
).row(ROW
).operationTimeout(1, TimeUnit
.SECONDS
)
126 .pause(100, TimeUnit
.MILLISECONDS
).maxAttempts(Integer
.MAX_VALUE
)
127 .action((controller
, loc
, stub
) -> failedFuture()).call().get();
129 } catch (ExecutionException e
) {
131 assertThat(e
.getCause(), instanceOf(RetriesExhaustedException
.class));
133 long costNs
= System
.nanoTime() - startNs
;
134 assertTrue(costNs
>= TimeUnit
.SECONDS
.toNanos(1));
135 assertTrue(costNs
< TimeUnit
.SECONDS
.toNanos(2));
139 public void testLocateError() throws IOException
, InterruptedException
, ExecutionException
{
140 AtomicBoolean errorTriggered
= new AtomicBoolean(false);
141 AtomicInteger count
= new AtomicInteger(0);
142 HRegionLocation loc
= CONN
.getRegionLocator(TABLE_NAME
).getRegionLocation(ROW
).get();
143 AsyncRegionLocator mockedLocator
=
144 new AsyncRegionLocator(CONN
, AsyncConnectionImpl
.RETRY_TIMER
) {
146 CompletableFuture
<HRegionLocation
> getRegionLocation(TableName tableName
, byte[] row
,
147 int replicaId
, RegionLocateType locateType
, long timeoutNs
) {
148 if (tableName
.equals(TABLE_NAME
)) {
149 CompletableFuture
<HRegionLocation
> future
= new CompletableFuture
<>();
150 if (count
.getAndIncrement() == 0) {
151 errorTriggered
.set(true);
152 future
.completeExceptionally(new RuntimeException("Inject error!"));
154 future
.complete(loc
);
158 return super.getRegionLocation(tableName
, row
, replicaId
, locateType
, timeoutNs
);
163 void updateCachedLocationOnError(HRegionLocation loc
, Throwable exception
) {
166 try (AsyncConnectionImpl mockedConn
= new AsyncConnectionImpl(CONN
.getConfiguration(),
167 CONN
.registry
, CONN
.registry
.getClusterId().get(), null, User
.getCurrent()) {
170 AsyncRegionLocator
getLocator() {
171 return mockedLocator
;
174 AsyncTable
<?
> table
= mockedConn
.getTableBuilder(TABLE_NAME
)
175 .setRetryPause(100, TimeUnit
.MILLISECONDS
).setMaxRetries(5).build();
176 table
.put(new Put(ROW
).addColumn(FAMILY
, QUALIFIER
, VALUE
)).get();
177 assertTrue(errorTriggered
.get());
178 errorTriggered
.set(false);
180 Result result
= table
.get(new Get(ROW
).addColumn(FAMILY
, QUALIFIER
)).get();
181 assertArrayEquals(VALUE
, result
.getValue(FAMILY
, QUALIFIER
));
182 assertTrue(errorTriggered
.get());