2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.junit
.Assert
.assertArrayEquals
;
21 import static org
.junit
.Assert
.assertEquals
;
22 import static org
.junit
.Assert
.assertTrue
;
24 import java
.io
.IOException
;
25 import java
.util
.Arrays
;
26 import java
.util
.List
;
27 import java
.util
.Optional
;
28 import java
.util
.concurrent
.ExecutionException
;
29 import java
.util
.concurrent
.TimeUnit
;
30 import java
.util
.concurrent
.atomic
.AtomicInteger
;
31 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
32 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
33 import org
.apache
.hadoop
.hbase
.TableName
;
34 import org
.apache
.hadoop
.hbase
.coprocessor
.ObserverContext
;
35 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessor
;
36 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessorEnvironment
;
37 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionObserver
;
38 import org
.apache
.hadoop
.hbase
.regionserver
.MiniBatchOperationInProgress
;
39 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
40 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
41 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
42 import org
.apache
.hadoop
.hbase
.util
.Threads
;
43 import org
.junit
.AfterClass
;
44 import org
.junit
.Before
;
45 import org
.junit
.BeforeClass
;
46 import org
.junit
.ClassRule
;
47 import org
.junit
.Rule
;
48 import org
.junit
.Test
;
49 import org
.junit
.experimental
.categories
.Category
;
50 import org
.junit
.rules
.TestName
;
52 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.io
.Closeables
;
54 @Category({ MediumTests
.class, ClientTests
.class })
55 public class TestAsyncTableNoncedRetry
{
58 public static final HBaseClassTestRule CLASS_RULE
=
59 HBaseClassTestRule
.forClass(TestAsyncTableNoncedRetry
.class);
61 private static final HBaseTestingUtil TEST_UTIL
= new HBaseTestingUtil();
63 private static final TableName TABLE_NAME
= TableName
.valueOf("async");
65 private static final byte[] FAMILY
= Bytes
.toBytes("cf");
67 private static final byte[] QUALIFIER
= Bytes
.toBytes("cq");
69 private static final byte[] QUALIFIER2
= Bytes
.toBytes("cq2");
71 private static final byte[] QUALIFIER3
= Bytes
.toBytes("cq3");
73 private static final byte[] VALUE
= Bytes
.toBytes("value");
75 private static AsyncConnection ASYNC_CONN
;
78 public TestName testName
= new TestName();
82 private static final AtomicInteger CALLED
= new AtomicInteger();
84 private static final long SLEEP_TIME
= 2000;
86 private static final long RPC_TIMEOUT
= SLEEP_TIME
/ 4 * 3; // three fourths of the sleep time
88 // The number of miniBatchOperations that are executed in a RegionServer
89 private static int miniBatchOperationCount
;
91 public static final class SleepOnceCP
implements RegionObserver
, RegionCoprocessor
{
94 public Optional
<RegionObserver
> getRegionObserver() {
95 return Optional
.of(this);
99 public void postBatchMutate(ObserverContext
<RegionCoprocessorEnvironment
> c
,
100 MiniBatchOperationInProgress
<Mutation
> miniBatchOp
) {
101 // We sleep when the last of the miniBatchOperation is executed
102 if (CALLED
.getAndIncrement() == miniBatchOperationCount
- 1) {
103 Threads
.sleepWithoutInterrupt(SLEEP_TIME
);
109 public static void setUpBeforeClass() throws Exception
{
110 TEST_UTIL
.startMiniCluster(1);
112 .createTable(TableDescriptorBuilder
.newBuilder(TABLE_NAME
)
113 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(FAMILY
))
114 .setCoprocessor(SleepOnceCP
.class.getName()).build());
115 TEST_UTIL
.waitTableAvailable(TABLE_NAME
);
116 ASYNC_CONN
= ConnectionFactory
.createAsyncConnection(TEST_UTIL
.getConfiguration()).get();
120 public static void tearDownAfterClass() throws Exception
{
121 Closeables
.close(ASYNC_CONN
, true);
122 TEST_UTIL
.shutdownMiniCluster();
126 public void setUp() throws IOException
, InterruptedException
{
127 row
= Bytes
.toBytes(testName
.getMethodName().replaceAll("[^0-9A-Za-z]", "_"));
132 public void testAppend() throws InterruptedException
, ExecutionException
{
133 assertEquals(0, CALLED
.get());
134 AsyncTable
<?
> table
= ASYNC_CONN
.getTableBuilder(TABLE_NAME
)
135 .setRpcTimeout(RPC_TIMEOUT
, TimeUnit
.MILLISECONDS
).build();
137 miniBatchOperationCount
= 1;
138 Result result
= table
.append(new Append(row
).addColumn(FAMILY
, QUALIFIER
, VALUE
)).get();
140 // make sure we called twice and the result is still correct
141 assertEquals(2, CALLED
.get());
142 assertArrayEquals(VALUE
, result
.getValue(FAMILY
, QUALIFIER
));
146 public void testAppendWhenReturnResultsEqualsFalse() throws InterruptedException
,
148 assertEquals(0, CALLED
.get());
149 AsyncTable
<?
> table
= ASYNC_CONN
.getTableBuilder(TABLE_NAME
)
150 .setRpcTimeout(RPC_TIMEOUT
, TimeUnit
.MILLISECONDS
).build();
152 miniBatchOperationCount
= 1;
153 Result result
= table
.append(new Append(row
).addColumn(FAMILY
, QUALIFIER
, VALUE
)
154 .setReturnResults(false)).get();
156 // make sure we called twice and the result is still correct
157 assertEquals(2, CALLED
.get());
158 assertTrue(result
.isEmpty());
162 public void testIncrement() throws InterruptedException
, ExecutionException
{
163 assertEquals(0, CALLED
.get());
164 AsyncTable
<?
> table
= ASYNC_CONN
.getTableBuilder(TABLE_NAME
)
165 .setRpcTimeout(RPC_TIMEOUT
, TimeUnit
.MILLISECONDS
).build();
167 miniBatchOperationCount
= 1;
168 long result
= table
.incrementColumnValue(row
, FAMILY
, QUALIFIER
, 1L).get();
170 // make sure we called twice and the result is still correct
171 assertEquals(2, CALLED
.get());
172 assertEquals(1L, result
);
176 public void testIncrementWhenReturnResultsEqualsFalse() throws InterruptedException
,
178 assertEquals(0, CALLED
.get());
179 AsyncTable
<?
> table
= ASYNC_CONN
.getTableBuilder(TABLE_NAME
)
180 .setRpcTimeout(RPC_TIMEOUT
, TimeUnit
.MILLISECONDS
).build();
182 miniBatchOperationCount
= 1;
183 Result result
= table
.increment(new Increment(row
).addColumn(FAMILY
, QUALIFIER
, 1L)
184 .setReturnResults(false)).get();
186 // make sure we called twice and the result is still correct
187 assertEquals(2, CALLED
.get());
188 assertTrue(result
.isEmpty());
192 public void testIncrementInRowMutations()
193 throws InterruptedException
, ExecutionException
, IOException
{
194 assertEquals(0, CALLED
.get());
195 AsyncTable
<?
> table
= ASYNC_CONN
.getTableBuilder(TABLE_NAME
)
196 .setWriteRpcTimeout(RPC_TIMEOUT
, TimeUnit
.MILLISECONDS
).build();
198 miniBatchOperationCount
= 1;
199 Result result
= table
.mutateRow(new RowMutations(row
)
200 .add(new Increment(row
).addColumn(FAMILY
, QUALIFIER
, 1L))
201 .add(new Delete(row
).addColumn(FAMILY
, QUALIFIER2
))).get();
203 // make sure we called twice and the result is still correct
204 assertEquals(2, CALLED
.get());
205 assertEquals(1L, Bytes
.toLong(result
.getValue(FAMILY
, QUALIFIER
)));
209 public void testAppendInRowMutations()
210 throws InterruptedException
, ExecutionException
, IOException
{
211 assertEquals(0, CALLED
.get());
212 AsyncTable
<?
> table
= ASYNC_CONN
.getTableBuilder(TABLE_NAME
)
213 .setWriteRpcTimeout(RPC_TIMEOUT
, TimeUnit
.MILLISECONDS
).build();
215 miniBatchOperationCount
= 1;
216 Result result
= table
.mutateRow(new RowMutations(row
)
217 .add(new Append(row
).addColumn(FAMILY
, QUALIFIER
, VALUE
))
218 .add(new Delete(row
).addColumn(FAMILY
, QUALIFIER2
))).get();
220 // make sure we called twice and the result is still correct
221 assertEquals(2, CALLED
.get());
222 assertArrayEquals(VALUE
, result
.getValue(FAMILY
, QUALIFIER
));
226 public void testIncrementAndAppendInRowMutations()
227 throws InterruptedException
, ExecutionException
, IOException
{
228 assertEquals(0, CALLED
.get());
229 AsyncTable
<?
> table
= ASYNC_CONN
.getTableBuilder(TABLE_NAME
)
230 .setWriteRpcTimeout(RPC_TIMEOUT
, TimeUnit
.MILLISECONDS
).build();
232 miniBatchOperationCount
= 1;
233 Result result
= table
.mutateRow(new RowMutations(row
)
234 .add(new Increment(row
).addColumn(FAMILY
, QUALIFIER
, 1L))
235 .add(new Append(row
).addColumn(FAMILY
, QUALIFIER2
, VALUE
))).get();
237 // make sure we called twice and the result is still correct
238 assertEquals(2, CALLED
.get());
239 assertEquals(1L, Bytes
.toLong(result
.getValue(FAMILY
, QUALIFIER
)));
240 assertArrayEquals(VALUE
, result
.getValue(FAMILY
, QUALIFIER2
));
244 public void testIncrementInCheckAndMutate() throws InterruptedException
, ExecutionException
{
245 assertEquals(0, CALLED
.get());
246 AsyncTable
<?
> table
= ASYNC_CONN
.getTableBuilder(TABLE_NAME
)
247 .setRpcTimeout(RPC_TIMEOUT
, TimeUnit
.MILLISECONDS
).build();
249 miniBatchOperationCount
= 1;
250 CheckAndMutateResult result
= table
.checkAndMutate(CheckAndMutate
.newBuilder(row
)
251 .ifNotExists(FAMILY
, QUALIFIER2
)
252 .build(new Increment(row
).addColumn(FAMILY
, QUALIFIER
, 1L))).get();
254 // make sure we called twice and the result is still correct
255 assertEquals(2, CALLED
.get());
256 assertTrue(result
.isSuccess());
257 assertEquals(1L, Bytes
.toLong(result
.getResult().getValue(FAMILY
, QUALIFIER
)));
261 public void testAppendInCheckAndMutate() throws InterruptedException
, ExecutionException
{
262 assertEquals(0, CALLED
.get());
263 AsyncTable
<?
> table
= ASYNC_CONN
.getTableBuilder(TABLE_NAME
)
264 .setRpcTimeout(RPC_TIMEOUT
, TimeUnit
.MILLISECONDS
).build();
266 miniBatchOperationCount
= 1;
267 CheckAndMutateResult result
= table
.checkAndMutate(CheckAndMutate
.newBuilder(row
)
268 .ifNotExists(FAMILY
, QUALIFIER2
)
269 .build(new Append(row
).addColumn(FAMILY
, QUALIFIER
, VALUE
))).get();
271 // make sure we called twice and the result is still correct
272 assertEquals(2, CALLED
.get());
273 assertTrue(result
.isSuccess());
274 assertArrayEquals(VALUE
, result
.getResult().getValue(FAMILY
, QUALIFIER
));
278 public void testIncrementInRowMutationsInCheckAndMutate() throws InterruptedException
,
279 ExecutionException
, IOException
{
280 assertEquals(0, CALLED
.get());
281 AsyncTable
<?
> table
= ASYNC_CONN
.getTableBuilder(TABLE_NAME
)
282 .setRpcTimeout(RPC_TIMEOUT
, TimeUnit
.MILLISECONDS
).build();
284 miniBatchOperationCount
= 1;
285 CheckAndMutateResult result
= table
.checkAndMutate(CheckAndMutate
.newBuilder(row
)
286 .ifNotExists(FAMILY
, QUALIFIER3
)
287 .build(new RowMutations(row
).add(new Increment(row
).addColumn(FAMILY
, QUALIFIER
, 1L))
288 .add(new Delete(row
).addColumn(FAMILY
, QUALIFIER2
)))).get();
290 // make sure we called twice and the result is still correct
291 assertEquals(2, CALLED
.get());
292 assertTrue(result
.isSuccess());
293 assertEquals(1L, Bytes
.toLong(result
.getResult().getValue(FAMILY
, QUALIFIER
)));
297 public void testAppendInRowMutationsInCheckAndMutate() throws InterruptedException
,
298 ExecutionException
, IOException
{
299 assertEquals(0, CALLED
.get());
300 AsyncTable
<?
> table
= ASYNC_CONN
.getTableBuilder(TABLE_NAME
)
301 .setRpcTimeout(RPC_TIMEOUT
, TimeUnit
.MILLISECONDS
).build();
303 miniBatchOperationCount
= 1;
304 CheckAndMutateResult result
= table
.checkAndMutate(CheckAndMutate
.newBuilder(row
)
305 .ifNotExists(FAMILY
, QUALIFIER3
)
306 .build(new RowMutations(row
).add(new Append(row
).addColumn(FAMILY
, QUALIFIER
, VALUE
))
307 .add(new Delete(row
).addColumn(FAMILY
, QUALIFIER2
)))).get();
309 // make sure we called twice and the result is still correct
310 assertEquals(2, CALLED
.get());
311 assertTrue(result
.isSuccess());
312 assertArrayEquals(VALUE
, result
.getResult().getValue(FAMILY
, QUALIFIER
));
316 public void testIncrementAndAppendInRowMutationsInCheckAndMutate() throws InterruptedException
,
317 ExecutionException
, IOException
{
318 assertEquals(0, CALLED
.get());
319 AsyncTable
<?
> table
= ASYNC_CONN
.getTableBuilder(TABLE_NAME
)
320 .setRpcTimeout(RPC_TIMEOUT
, TimeUnit
.MILLISECONDS
).build();
322 miniBatchOperationCount
= 1;
323 CheckAndMutateResult result
= table
.checkAndMutate(CheckAndMutate
.newBuilder(row
)
324 .ifNotExists(FAMILY
, QUALIFIER3
)
325 .build(new RowMutations(row
).add(new Increment(row
).addColumn(FAMILY
, QUALIFIER
, 1L))
326 .add(new Append(row
).addColumn(FAMILY
, QUALIFIER2
, VALUE
)))).get();
328 // make sure we called twice and the result is still correct
329 assertEquals(2, CALLED
.get());
330 assertTrue(result
.isSuccess());
331 assertEquals(1L, Bytes
.toLong(result
.getResult().getValue(FAMILY
, QUALIFIER
)));
332 assertArrayEquals(VALUE
, result
.getResult().getValue(FAMILY
, QUALIFIER2
));
336 public void testBatch() throws InterruptedException
,
337 ExecutionException
, IOException
{
338 byte[] row2
= Bytes
.toBytes(Bytes
.toString(row
) + "2");
339 byte[] row3
= Bytes
.toBytes(Bytes
.toString(row
) + "3");
340 byte[] row4
= Bytes
.toBytes(Bytes
.toString(row
) + "4");
341 byte[] row5
= Bytes
.toBytes(Bytes
.toString(row
) + "5");
342 byte[] row6
= Bytes
.toBytes(Bytes
.toString(row
) + "6");
344 assertEquals(0, CALLED
.get());
346 AsyncTable
<?
> table
= ASYNC_CONN
.getTableBuilder(TABLE_NAME
)
347 .setRpcTimeout(RPC_TIMEOUT
, TimeUnit
.MILLISECONDS
).build();
349 miniBatchOperationCount
= 6;
350 List
<Object
> results
= table
.batchAll(Arrays
.asList(
351 new Append(row
).addColumn(FAMILY
, QUALIFIER
, VALUE
),
352 new Increment(row2
).addColumn(FAMILY
, QUALIFIER
, 1L),
353 new RowMutations(row3
)
354 .add(new Increment(row3
).addColumn(FAMILY
, QUALIFIER
, 1L))
355 .add(new Append(row3
).addColumn(FAMILY
, QUALIFIER2
, VALUE
)),
356 CheckAndMutate
.newBuilder(row4
)
357 .ifNotExists(FAMILY
, QUALIFIER2
)
358 .build(new Increment(row4
).addColumn(FAMILY
, QUALIFIER
, 1L)),
359 CheckAndMutate
.newBuilder(row5
)
360 .ifNotExists(FAMILY
, QUALIFIER2
)
361 .build(new Append(row5
).addColumn(FAMILY
, QUALIFIER
, VALUE
)),
362 CheckAndMutate
.newBuilder(row6
)
363 .ifNotExists(FAMILY
, QUALIFIER3
)
364 .build(new RowMutations(row6
).add(new Increment(row6
).addColumn(FAMILY
, QUALIFIER
, 1L))
365 .add(new Append(row6
).addColumn(FAMILY
, QUALIFIER2
, VALUE
))))).get();
367 // make sure we called twice and the result is still correct
369 // should be called 12 times as 6 miniBatchOperations are called twice
370 assertEquals(12, CALLED
.get());
372 assertArrayEquals(VALUE
, ((Result
) results
.get(0)).getValue(FAMILY
, QUALIFIER
));
374 assertEquals(1L, Bytes
.toLong(((Result
) results
.get(1)).getValue(FAMILY
, QUALIFIER
)));
376 assertEquals(1L, Bytes
.toLong(((Result
) results
.get(2)).getValue(FAMILY
, QUALIFIER
)));
377 assertArrayEquals(VALUE
, ((Result
) results
.get(2)).getValue(FAMILY
, QUALIFIER2
));
379 CheckAndMutateResult result
;
381 result
= (CheckAndMutateResult
) results
.get(3);
382 assertTrue(result
.isSuccess());
383 assertEquals(1L, Bytes
.toLong(result
.getResult().getValue(FAMILY
, QUALIFIER
)));
385 result
= (CheckAndMutateResult
) results
.get(4);
386 assertTrue(result
.isSuccess());
387 assertArrayEquals(VALUE
, result
.getResult().getValue(FAMILY
, QUALIFIER
));
389 result
= (CheckAndMutateResult
) results
.get(5);
390 assertTrue(result
.isSuccess());
391 assertEquals(1L, Bytes
.toLong(result
.getResult().getValue(FAMILY
, QUALIFIER
)));
392 assertArrayEquals(VALUE
, result
.getResult().getValue(FAMILY
, QUALIFIER2
));