HBASE-22037 Re-enable TestAvoidCellReferencesIntoShippedBlocks
[hbase.git] / hbase-client / src / test / java / org / apache / hadoop / hbase / client / TestAsyncProcessWithRegionException.java
blob2c24aaa7be397284c714bdf9cb981b6e0fbc76a6
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertTrue;
23 import java.io.IOException;
24 import java.io.InterruptedIOException;
25 import java.util.ArrayList;
26 import java.util.Collections;
27 import java.util.List;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.HBaseClassTestRule;
32 import org.apache.hadoop.hbase.HConstants;
33 import org.apache.hadoop.hbase.HRegionLocation;
34 import org.apache.hadoop.hbase.RegionLocations;
35 import org.apache.hadoop.hbase.ServerName;
36 import org.apache.hadoop.hbase.TableName;
37 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
38 import org.apache.hadoop.hbase.testclassification.ClientTests;
39 import org.apache.hadoop.hbase.testclassification.SmallTests;
40 import org.apache.hadoop.hbase.util.Bytes;
41 import org.junit.BeforeClass;
42 import org.junit.ClassRule;
43 import org.junit.Test;
44 import org.junit.experimental.categories.Category;
45 import org.mockito.Mockito;
/**
 * The purpose of this test is to make sure that a region exception won't corrupt the results
 * of a batch. The prescription is shown below.
 * 1) Honor the action result rather than the region exception. If an action has both a true
 * result and a region exception, the action is fine, as the exception was caused by other
 * actions in the same region.
 * 2) Honor the action exception rather than the region exception. If an action has both an
 * action exception and a region exception, we deal with the action exception only. If we also
 * handled the region exception for the same action, the count of actions in progress would
 * become negative and AsyncRequestFuture#waitUntilDone would block forever.
 *
 * This bug can be reproduced by a real use case; see TestMalformedCellFromClient (in
 * branch-1.4+). It uses a batch of RowMutations to demonstrate the bug. Given that batches of
 * RowMutations are only supported by branch-1.4+, branch-1.3 and branch-1.2 perhaps won't
 * encounter this issue. We still backport the fix to branch-1.3 and branch-1.2 in case we have
 * overlooked some write paths.
 */
63 @Category({ ClientTests.class, SmallTests.class })
64 public class TestAsyncProcessWithRegionException {
66 @ClassRule
67 public static final HBaseClassTestRule CLASS_RULE =
68 HBaseClassTestRule.forClass(TestAsyncProcessWithRegionException.class);
70 private static final Result EMPTY_RESULT = Result.create(null, true);
71 private static final IOException IOE = new IOException("YOU CAN'T PASS");
72 private static final Configuration CONF = new Configuration();
73 private static final TableName DUMMY_TABLE = TableName.valueOf("DUMMY_TABLE");
74 private static final byte[] GOOD_ROW = Bytes.toBytes("GOOD_ROW");
75 private static final byte[] BAD_ROW = Bytes.toBytes("BAD_ROW");
76 private static final byte[] BAD_ROW_WITHOUT_ACTION_EXCEPTION =
77 Bytes.toBytes("BAD_ROW_WITHOUT_ACTION_EXCEPTION");
78 private static final byte[] FAMILY = Bytes.toBytes("FAMILY");
79 private static final ServerName SERVER_NAME = ServerName.valueOf("s1,1,1");
80 private static final RegionInfo REGION_INFO =
81 RegionInfoBuilder.newBuilder(DUMMY_TABLE)
82 .setStartKey(HConstants.EMPTY_START_ROW)
83 .setEndKey(HConstants.EMPTY_END_ROW)
84 .setSplit(false)
85 .setRegionId(1)
86 .build();
88 private static final HRegionLocation REGION_LOCATION =
89 new HRegionLocation(REGION_INFO, SERVER_NAME);
91 @BeforeClass
92 public static void setUpBeforeClass() {
93 // disable the retry
94 CONF.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
97 @Test
98 public void testSuccessivePut() throws Exception {
99 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
101 List<Put> puts = new ArrayList<>(1);
102 puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
103 final int expectedSize = puts.size();
104 AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts);
105 arf.waitUntilDone();
106 Object[] result = arf.getResults();
107 assertEquals(expectedSize, result.length);
108 for (Object r : result) {
109 assertEquals(Result.class, r.getClass());
111 assertTrue(puts.isEmpty());
112 assertActionsInProgress(arf);
115 @Test
116 public void testFailedPut() throws Exception {
117 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
119 List<Put> puts = new ArrayList<>(2);
120 puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
121 // this put should fail
122 puts.add(new Put(BAD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
123 final int expectedSize = puts.size();
125 AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts);
126 arf.waitUntilDone();
127 // There is a failed puts
128 assertError(arf, 1);
129 Object[] result = arf.getResults();
130 assertEquals(expectedSize, result.length);
131 assertEquals(Result.class, result[0].getClass());
132 assertTrue(result[1] instanceof IOException);
133 assertTrue(puts.isEmpty());
134 assertActionsInProgress(arf);
137 @Test
138 public void testFailedPutWithoutActionException() throws Exception {
139 MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), CONF);
141 List<Put> puts = new ArrayList<>(3);
142 puts.add(new Put(GOOD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
143 // this put should fail
144 puts.add(new Put(BAD_ROW).addColumn(FAMILY, FAMILY, FAMILY));
145 // this put should fail, and it won't have action exception
146 puts.add(new Put(BAD_ROW_WITHOUT_ACTION_EXCEPTION).addColumn(FAMILY, FAMILY, FAMILY));
147 final int expectedSize = puts.size();
149 AsyncRequestFuture arf = ap.submit(DUMMY_TABLE, puts);
150 arf.waitUntilDone();
151 // There are two failed puts
152 assertError(arf, 2);
153 Object[] result = arf.getResults();
154 assertEquals(expectedSize, result.length);
155 assertEquals(Result.class, result[0].getClass());
156 assertTrue(result[1] instanceof IOException);
157 assertTrue(result[2] instanceof IOException);
158 assertTrue(puts.isEmpty());
159 assertActionsInProgress(arf);
162 private static void assertError(AsyncRequestFuture arf, int expectedCountOfFailure) {
163 assertTrue(arf.hasError());
164 RetriesExhaustedWithDetailsException e = arf.getErrors();
165 List<Throwable> errors = e.getCauses();
166 assertEquals(expectedCountOfFailure, errors.size());
167 for (Throwable t : errors) {
168 assertTrue(t instanceof IOException);
172 private static void assertActionsInProgress(AsyncRequestFuture arf) {
173 if (arf instanceof AsyncRequestFutureImpl) {
174 assertEquals(0, ((AsyncRequestFutureImpl) arf).getNumberOfActionsInProgress());
178 private static ConnectionImplementation createHConnection() throws IOException {
179 ConnectionImplementation hc = Mockito.mock(ConnectionImplementation.class);
180 NonceGenerator ng = Mockito.mock(NonceGenerator.class);
181 Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE);
182 Mockito.when(hc.getNonceGenerator()).thenReturn(ng);
183 Mockito.when(hc.getConfiguration()).thenReturn(CONF);
184 Mockito.when(hc.getConnectionConfiguration()).thenReturn(new ConnectionConfiguration(CONF));
185 setMockLocation(hc, GOOD_ROW, new RegionLocations(REGION_LOCATION));
186 setMockLocation(hc, BAD_ROW, new RegionLocations(REGION_LOCATION));
187 Mockito
188 .when(hc.locateRegions(Mockito.eq(DUMMY_TABLE), Mockito.anyBoolean(), Mockito.anyBoolean()))
189 .thenReturn(Collections.singletonList(REGION_LOCATION));
190 return hc;
193 private static void setMockLocation(ConnectionImplementation hc, byte[] row,
194 RegionLocations result) throws IOException {
195 Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(),
196 Mockito.anyBoolean(), Mockito.anyInt())).thenReturn(result);
197 Mockito.when(hc.locateRegion(Mockito.eq(DUMMY_TABLE), Mockito.eq(row), Mockito.anyBoolean(),
198 Mockito.anyBoolean())).thenReturn(result);
201 private static class MyAsyncProcess extends AsyncProcess {
// Pool handed to every task built by submit().
// NOTE(review): no shutdown of this pool is visible in this part of the file -- confirm.
private final ExecutorService service = Executors.newFixedThreadPool(5);

/**
 * Creates the process on top of the given connection, building the retrying-caller factory
 * and the RPC controller factory from the same configuration.
 *
 * @param hc the connection passed to the superclass
 * @param conf the configuration passed to the superclass and to both factories
 */
MyAsyncProcess(ConnectionImplementation hc, Configuration conf) {
  super(
    hc,
    conf,
    new RpcRetryingCallerFactory(conf),
    new RpcControllerFactory(conf));
}
208 public AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows)
209 throws InterruptedIOException {
210 return submit(AsyncProcessTask.newBuilder()
211 .setPool(service)
212 .setTableName(tableName)
213 .setRowAccess(rows)
214 .setSubmittedRows(AsyncProcessTask.SubmittedRows.NORMAL)
215 .setNeedResults(true)
216 .setRpcTimeout(HConstants.DEFAULT_HBASE_RPC_TIMEOUT)
217 .setOperationTimeout(HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)
218 .build());
221 @Override
222 protected RpcRetryingCaller<AbstractResponse> createCaller(
223 CancellableRegionServerCallable callable, int rpcTimeout) {
224 MultiServerCallable callable1 = (MultiServerCallable) callable;
225 MultiResponse mr = new MultiResponse();
226 callable1.getMulti().actions.forEach((regionName, actions) -> {
227 actions.forEach(action -> {
228 if (Bytes.equals(action.getAction().getRow(), GOOD_ROW)) {
229 mr.add(regionName, action.getOriginalIndex(), EMPTY_RESULT);
230 } else if (Bytes.equals(action.getAction().getRow(), BAD_ROW)) {
231 mr.add(regionName, action.getOriginalIndex(), IOE);
235 mr.addException(REGION_INFO.getRegionName(), IOE);
236 return new RpcRetryingCallerImpl<AbstractResponse>(100, 500, 0, 9) {
237 @Override
238 public AbstractResponse callWithoutRetries(RetryingCallable<AbstractResponse> callable,
239 int callTimeout) {
240 try {
241 // sleep one second in order for threadpool to start another thread instead of reusing
242 // existing one.
243 Thread.sleep(1000);
244 } catch (InterruptedException e) {
245 // pass
247 return mr;