HBASE-24033 Add ut for loading the corrupt recovered hfiles (#1322)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / client / TestAsyncSingleRequestRpcRetryingCaller.java
blob4205012db112d53618ab2a273a41a58e63876888
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static org.hamcrest.CoreMatchers.instanceOf;
21 import static org.junit.Assert.assertArrayEquals;
22 import static org.junit.Assert.assertThat;
23 import static org.junit.Assert.assertTrue;
24 import static org.junit.Assert.fail;
26 import java.io.IOException;
27 import java.util.concurrent.CompletableFuture;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicInteger;
32 import org.apache.commons.io.IOUtils;
33 import org.apache.hadoop.hbase.HBaseClassTestRule;
34 import org.apache.hadoop.hbase.HBaseTestingUtility;
35 import org.apache.hadoop.hbase.HRegionLocation;
36 import org.apache.hadoop.hbase.TableName;
37 import org.apache.hadoop.hbase.security.User;
38 import org.apache.hadoop.hbase.testclassification.ClientTests;
39 import org.apache.hadoop.hbase.testclassification.MediumTests;
40 import org.apache.hadoop.hbase.util.Bytes;
41 import org.junit.AfterClass;
42 import org.junit.BeforeClass;
43 import org.junit.ClassRule;
44 import org.junit.Test;
45 import org.junit.experimental.categories.Category;
47 @Category({ MediumTests.class, ClientTests.class })
48 public class TestAsyncSingleRequestRpcRetryingCaller {
50 @ClassRule
51 public static final HBaseClassTestRule CLASS_RULE =
52 HBaseClassTestRule.forClass(TestAsyncSingleRequestRpcRetryingCaller.class);
54 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
56 private static TableName TABLE_NAME = TableName.valueOf("async");
58 private static byte[] FAMILY = Bytes.toBytes("cf");
60 private static byte[] QUALIFIER = Bytes.toBytes("cq");
62 private static byte[] ROW = Bytes.toBytes("row");
64 private static byte[] VALUE = Bytes.toBytes("value");
66 private static AsyncConnectionImpl CONN;
68 @BeforeClass
69 public static void setUpBeforeClass() throws Exception {
70 TEST_UTIL.startMiniCluster(2);
71 TEST_UTIL.getAdmin().balancerSwitch(false, true);
72 TEST_UTIL.createTable(TABLE_NAME, FAMILY);
73 TEST_UTIL.waitTableAvailable(TABLE_NAME);
74 ConnectionRegistry registry =
75 ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
76 CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
77 registry.getClusterId().get(), null, User.getCurrent());
80 @AfterClass
81 public static void tearDownAfterClass() throws Exception {
82 IOUtils.closeQuietly(CONN);
83 TEST_UTIL.shutdownMiniCluster();
86 @Test
87 public void testRegionMove() throws InterruptedException, ExecutionException, IOException {
88 // This will leave a cached entry in location cache
89 HRegionLocation loc = CONN.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
90 int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegion().getRegionName());
91 TEST_UTIL.getAdmin().move(loc.getRegion().getEncodedNameAsBytes(),
92 TEST_UTIL.getHBaseCluster().getRegionServer(1 - index).getServerName());
93 AsyncTable<?> table = CONN.getTableBuilder(TABLE_NAME).setRetryPause(100, TimeUnit.MILLISECONDS)
94 .setMaxRetries(30).build();
95 table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
97 // move back
98 TEST_UTIL.getAdmin().move(loc.getRegion().getEncodedNameAsBytes(), loc.getServerName());
99 Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get();
100 assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
103 private <T> CompletableFuture<T> failedFuture() {
104 CompletableFuture<T> future = new CompletableFuture<>();
105 future.completeExceptionally(new RuntimeException("Inject error!"));
106 return future;
109 @Test
110 public void testMaxRetries() throws IOException, InterruptedException {
111 try {
112 CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.DAYS)
113 .maxAttempts(3).pause(10, TimeUnit.MILLISECONDS)
114 .action((controller, loc, stub) -> failedFuture()).call().get();
115 fail();
116 } catch (ExecutionException e) {
117 assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
121 @Test
122 public void testOperationTimeout() throws IOException, InterruptedException {
123 long startNs = System.nanoTime();
124 try {
125 CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.SECONDS)
126 .pause(100, TimeUnit.MILLISECONDS).maxAttempts(Integer.MAX_VALUE)
127 .action((controller, loc, stub) -> failedFuture()).call().get();
128 fail();
129 } catch (ExecutionException e) {
130 e.printStackTrace();
131 assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
133 long costNs = System.nanoTime() - startNs;
134 assertTrue(costNs >= TimeUnit.SECONDS.toNanos(1));
135 assertTrue(costNs < TimeUnit.SECONDS.toNanos(2));
138 @Test
139 public void testLocateError() throws IOException, InterruptedException, ExecutionException {
140 AtomicBoolean errorTriggered = new AtomicBoolean(false);
141 AtomicInteger count = new AtomicInteger(0);
142 HRegionLocation loc = CONN.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
143 AsyncRegionLocator mockedLocator =
144 new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER) {
145 @Override
146 CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
147 int replicaId, RegionLocateType locateType, long timeoutNs) {
148 if (tableName.equals(TABLE_NAME)) {
149 CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
150 if (count.getAndIncrement() == 0) {
151 errorTriggered.set(true);
152 future.completeExceptionally(new RuntimeException("Inject error!"));
153 } else {
154 future.complete(loc);
156 return future;
157 } else {
158 return super.getRegionLocation(tableName, row, replicaId, locateType, timeoutNs);
162 @Override
163 void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
166 try (AsyncConnectionImpl mockedConn = new AsyncConnectionImpl(CONN.getConfiguration(),
167 CONN.registry, CONN.registry.getClusterId().get(), null, User.getCurrent()) {
169 @Override
170 AsyncRegionLocator getLocator() {
171 return mockedLocator;
173 }) {
174 AsyncTable<?> table = mockedConn.getTableBuilder(TABLE_NAME)
175 .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(5).build();
176 table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
177 assertTrue(errorTriggered.get());
178 errorTriggered.set(false);
179 count.set(0);
180 Result result = table.get(new Get(ROW).addColumn(FAMILY, QUALIFIER)).get();
181 assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
182 assertTrue(errorTriggered.get());