2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.junit
.Assert
.assertEquals
;
21 import static org
.junit
.Assert
.assertTrue
;
23 import java
.io
.IOException
;
24 import java
.io
.InterruptedIOException
;
25 import java
.util
.ArrayList
;
26 import java
.util
.Collections
;
27 import java
.util
.List
;
28 import java
.util
.concurrent
.ExecutorService
;
29 import java
.util
.concurrent
.Executors
;
30 import org
.apache
.hadoop
.conf
.Configuration
;
31 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
32 import org
.apache
.hadoop
.hbase
.HConstants
;
33 import org
.apache
.hadoop
.hbase
.HRegionLocation
;
34 import org
.apache
.hadoop
.hbase
.RegionLocations
;
35 import org
.apache
.hadoop
.hbase
.ServerName
;
36 import org
.apache
.hadoop
.hbase
.TableName
;
37 import org
.apache
.hadoop
.hbase
.ipc
.RpcControllerFactory
;
38 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
39 import org
.apache
.hadoop
.hbase
.testclassification
.SmallTests
;
40 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
41 import org
.junit
.BeforeClass
;
42 import org
.junit
.ClassRule
;
43 import org
.junit
.Test
;
44 import org
.junit
.experimental
.categories
.Category
;
45 import org
.mockito
.Mockito
;
/**
 * The purpose of this test is to make sure that a region exception does not corrupt the results
 * of a batch. The rules are as follows:
 * 1) Honor the action result rather than the region exception. If an action has both a successful
 * result and a region exception, the action is fine, because the exception was caused by other
 * actions in the same region.
 * 2) Honor the action exception rather than the region exception. If an action has both an action
 * exception and a region exception, we deal with the action exception only. If we also handled
 * the region exception for the same action, the count of actions in progress would become
 * negative and AsyncRequestFuture#waitUntilDone would block forever.
 *
 * This bug can be reproduced by a real use case; see TestMalformedCellFromClient (in
 * branch-1.4+). It uses a batch of RowMutations to expose the bug. Given that batches of
 * RowMutations are only supported by branch-1.4+, branch-1.3 and branch-1.2 probably won't
 * encounter this issue. We still backport the fix to branch-1.3 and branch-1.2 in case we have
 * overlooked some write paths.
 */
63 @Category({ ClientTests
.class, SmallTests
.class })
64 public class TestAsyncProcessWithRegionException
{
67 public static final HBaseClassTestRule CLASS_RULE
=
68 HBaseClassTestRule
.forClass(TestAsyncProcessWithRegionException
.class);
70 private static final Result EMPTY_RESULT
= Result
.create(null, true);
71 private static final IOException IOE
= new IOException("YOU CAN'T PASS");
72 private static final Configuration CONF
= new Configuration();
73 private static final TableName DUMMY_TABLE
= TableName
.valueOf("DUMMY_TABLE");
74 private static final byte[] GOOD_ROW
= Bytes
.toBytes("GOOD_ROW");
75 private static final byte[] BAD_ROW
= Bytes
.toBytes("BAD_ROW");
76 private static final byte[] BAD_ROW_WITHOUT_ACTION_EXCEPTION
=
77 Bytes
.toBytes("BAD_ROW_WITHOUT_ACTION_EXCEPTION");
78 private static final byte[] FAMILY
= Bytes
.toBytes("FAMILY");
79 private static final ServerName SERVER_NAME
= ServerName
.valueOf("s1,1,1");
80 private static final RegionInfo REGION_INFO
=
81 RegionInfoBuilder
.newBuilder(DUMMY_TABLE
)
82 .setStartKey(HConstants
.EMPTY_START_ROW
)
83 .setEndKey(HConstants
.EMPTY_END_ROW
)
88 private static final HRegionLocation REGION_LOCATION
=
89 new HRegionLocation(REGION_INFO
, SERVER_NAME
);
92 public static void setUpBeforeClass() {
94 CONF
.setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
, 0);
98 public void testSuccessivePut() throws Exception
{
99 MyAsyncProcess ap
= new MyAsyncProcess(createHConnection(), CONF
);
101 List
<Put
> puts
= new ArrayList
<>(1);
102 puts
.add(new Put(GOOD_ROW
).addColumn(FAMILY
, FAMILY
, FAMILY
));
103 final int expectedSize
= puts
.size();
104 AsyncRequestFuture arf
= ap
.submit(DUMMY_TABLE
, puts
);
106 Object
[] result
= arf
.getResults();
107 assertEquals(expectedSize
, result
.length
);
108 for (Object r
: result
) {
109 assertEquals(Result
.class, r
.getClass());
111 assertTrue(puts
.isEmpty());
112 assertActionsInProgress(arf
);
116 public void testFailedPut() throws Exception
{
117 MyAsyncProcess ap
= new MyAsyncProcess(createHConnection(), CONF
);
119 List
<Put
> puts
= new ArrayList
<>(2);
120 puts
.add(new Put(GOOD_ROW
).addColumn(FAMILY
, FAMILY
, FAMILY
));
121 // this put should fail
122 puts
.add(new Put(BAD_ROW
).addColumn(FAMILY
, FAMILY
, FAMILY
));
123 final int expectedSize
= puts
.size();
125 AsyncRequestFuture arf
= ap
.submit(DUMMY_TABLE
, puts
);
127 // There is a failed puts
129 Object
[] result
= arf
.getResults();
130 assertEquals(expectedSize
, result
.length
);
131 assertEquals(Result
.class, result
[0].getClass());
132 assertTrue(result
[1] instanceof IOException
);
133 assertTrue(puts
.isEmpty());
134 assertActionsInProgress(arf
);
138 public void testFailedPutWithoutActionException() throws Exception
{
139 MyAsyncProcess ap
= new MyAsyncProcess(createHConnection(), CONF
);
141 List
<Put
> puts
= new ArrayList
<>(3);
142 puts
.add(new Put(GOOD_ROW
).addColumn(FAMILY
, FAMILY
, FAMILY
));
143 // this put should fail
144 puts
.add(new Put(BAD_ROW
).addColumn(FAMILY
, FAMILY
, FAMILY
));
145 // this put should fail, and it won't have action exception
146 puts
.add(new Put(BAD_ROW_WITHOUT_ACTION_EXCEPTION
).addColumn(FAMILY
, FAMILY
, FAMILY
));
147 final int expectedSize
= puts
.size();
149 AsyncRequestFuture arf
= ap
.submit(DUMMY_TABLE
, puts
);
151 // There are two failed puts
153 Object
[] result
= arf
.getResults();
154 assertEquals(expectedSize
, result
.length
);
155 assertEquals(Result
.class, result
[0].getClass());
156 assertTrue(result
[1] instanceof IOException
);
157 assertTrue(result
[2] instanceof IOException
);
158 assertTrue(puts
.isEmpty());
159 assertActionsInProgress(arf
);
162 private static void assertError(AsyncRequestFuture arf
, int expectedCountOfFailure
) {
163 assertTrue(arf
.hasError());
164 RetriesExhaustedWithDetailsException e
= arf
.getErrors();
165 List
<Throwable
> errors
= e
.getCauses();
166 assertEquals(expectedCountOfFailure
, errors
.size());
167 for (Throwable t
: errors
) {
168 assertTrue(t
instanceof IOException
);
172 private static void assertActionsInProgress(AsyncRequestFuture arf
) {
173 if (arf
instanceof AsyncRequestFutureImpl
) {
174 assertEquals(0, ((AsyncRequestFutureImpl
) arf
).getNumberOfActionsInProgress());
178 private static ConnectionImplementation
createHConnection() throws IOException
{
179 ConnectionImplementation hc
= Mockito
.mock(ConnectionImplementation
.class);
180 NonceGenerator ng
= Mockito
.mock(NonceGenerator
.class);
181 Mockito
.when(ng
.getNonceGroup()).thenReturn(HConstants
.NO_NONCE
);
182 Mockito
.when(hc
.getNonceGenerator()).thenReturn(ng
);
183 Mockito
.when(hc
.getConfiguration()).thenReturn(CONF
);
184 Mockito
.when(hc
.getConnectionConfiguration()).thenReturn(new ConnectionConfiguration(CONF
));
185 setMockLocation(hc
, GOOD_ROW
, new RegionLocations(REGION_LOCATION
));
186 setMockLocation(hc
, BAD_ROW
, new RegionLocations(REGION_LOCATION
));
188 .when(hc
.locateRegions(Mockito
.eq(DUMMY_TABLE
), Mockito
.anyBoolean(), Mockito
.anyBoolean()))
189 .thenReturn(Collections
.singletonList(REGION_LOCATION
));
193 private static void setMockLocation(ConnectionImplementation hc
, byte[] row
,
194 RegionLocations result
) throws IOException
{
195 Mockito
.when(hc
.locateRegion(Mockito
.eq(DUMMY_TABLE
), Mockito
.eq(row
), Mockito
.anyBoolean(),
196 Mockito
.anyBoolean(), Mockito
.anyInt())).thenReturn(result
);
197 Mockito
.when(hc
.locateRegion(Mockito
.eq(DUMMY_TABLE
), Mockito
.eq(row
), Mockito
.anyBoolean(),
198 Mockito
.anyBoolean())).thenReturn(result
);
201 private static class MyAsyncProcess
extends AsyncProcess
{
// Fixed pool of five threads owned by this AsyncProcess instance.
private final ExecutorService service = Executors.newFixedThreadPool(5);

/**
 * Creates the process on top of the given connection, with caller and controller factories
 * built from {@code conf}.
 *
 * @param hc the connection to operate on
 * @param conf the configuration passed to the superclass and to both factories
 */
MyAsyncProcess(ConnectionImplementation hc, Configuration conf) {
  super(
    hc,
    conf,
    new RpcRetryingCallerFactory(conf),
    new RpcControllerFactory(conf));
}
208 public AsyncRequestFuture
submit(TableName tableName
, List
<?
extends Row
> rows
)
209 throws InterruptedIOException
{
210 return submit(AsyncProcessTask
.newBuilder()
212 .setTableName(tableName
)
214 .setSubmittedRows(AsyncProcessTask
.SubmittedRows
.NORMAL
)
215 .setNeedResults(true)
216 .setRpcTimeout(HConstants
.DEFAULT_HBASE_RPC_TIMEOUT
)
217 .setOperationTimeout(HConstants
.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT
)
222 protected RpcRetryingCaller
<AbstractResponse
> createCaller(
223 CancellableRegionServerCallable callable
, int rpcTimeout
) {
224 MultiServerCallable callable1
= (MultiServerCallable
) callable
;
225 MultiResponse mr
= new MultiResponse();
226 callable1
.getMulti().actions
.forEach((regionName
, actions
) -> {
227 actions
.forEach(action
-> {
228 if (Bytes
.equals(action
.getAction().getRow(), GOOD_ROW
)) {
229 mr
.add(regionName
, action
.getOriginalIndex(), EMPTY_RESULT
);
230 } else if (Bytes
.equals(action
.getAction().getRow(), BAD_ROW
)) {
231 mr
.add(regionName
, action
.getOriginalIndex(), IOE
);
235 mr
.addException(REGION_INFO
.getRegionName(), IOE
);
236 return new RpcRetryingCallerImpl
<AbstractResponse
>(100, 500, 0, 9) {
238 public AbstractResponse
callWithoutRetries(RetryingCallable
<AbstractResponse
> callable
,
241 // sleep one second in order for threadpool to start another thread instead of reusing
244 } catch (InterruptedException e
) {