HBASE-24033 Add ut for loading the corrupt recovered hfiles (#1322)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / client / TestAsyncTable.java
blobb9fb81195756d30edf2402ef56b649820b9306a0
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org.apache.hadoop.hbase.client;
20 import static org.hamcrest.CoreMatchers.containsString;
21 import static org.hamcrest.CoreMatchers.instanceOf;
22 import static org.junit.Assert.assertArrayEquals;
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertFalse;
25 import static org.junit.Assert.assertNull;
26 import static org.junit.Assert.assertThat;
27 import static org.junit.Assert.assertTrue;
28 import static org.junit.Assert.fail;
30 import java.io.IOException;
31 import java.io.UncheckedIOException;
32 import java.util.Arrays;
33 import java.util.Collections;
34 import java.util.List;
35 import java.util.concurrent.ArrayBlockingQueue;
36 import java.util.concurrent.BlockingQueue;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.ForkJoinPool;
40 import java.util.concurrent.atomic.AtomicInteger;
41 import java.util.concurrent.atomic.AtomicLong;
42 import java.util.function.Supplier;
43 import java.util.stream.IntStream;
44 import org.apache.commons.io.IOUtils;
45 import org.apache.hadoop.hbase.CompareOperator;
46 import org.apache.hadoop.hbase.HBaseClassTestRule;
47 import org.apache.hadoop.hbase.HBaseTestingUtility;
48 import org.apache.hadoop.hbase.TableName;
49 import org.apache.hadoop.hbase.TableNotEnabledException;
50 import org.apache.hadoop.hbase.filter.BinaryComparator;
51 import org.apache.hadoop.hbase.filter.FamilyFilter;
52 import org.apache.hadoop.hbase.filter.FilterList;
53 import org.apache.hadoop.hbase.filter.QualifierFilter;
54 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
55 import org.apache.hadoop.hbase.filter.TimestampsFilter;
56 import org.apache.hadoop.hbase.io.TimeRange;
57 import org.apache.hadoop.hbase.testclassification.ClientTests;
58 import org.apache.hadoop.hbase.testclassification.MediumTests;
59 import org.apache.hadoop.hbase.util.Bytes;
60 import org.apache.hadoop.hbase.util.Pair;
61 import org.junit.AfterClass;
62 import org.junit.Before;
63 import org.junit.BeforeClass;
64 import org.junit.ClassRule;
65 import org.junit.Rule;
66 import org.junit.Test;
67 import org.junit.experimental.categories.Category;
68 import org.junit.rules.TestName;
69 import org.junit.runner.RunWith;
70 import org.junit.runners.Parameterized;
71 import org.junit.runners.Parameterized.Parameter;
72 import org.junit.runners.Parameterized.Parameters;
74 @RunWith(Parameterized.class)
75 @Category({ MediumTests.class, ClientTests.class })
76 public class TestAsyncTable {
78 @ClassRule
79 public static final HBaseClassTestRule CLASS_RULE =
80 HBaseClassTestRule.forClass(TestAsyncTable.class);
82 private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
84 private static TableName TABLE_NAME = TableName.valueOf("async");
86 private static byte[] FAMILY = Bytes.toBytes("cf");
88 private static byte[] QUALIFIER = Bytes.toBytes("cq");
90 private static byte[] VALUE = Bytes.toBytes("value");
92 private static int MAX_KEY_VALUE_SIZE = 64 * 1024;
94 private static AsyncConnection ASYNC_CONN;
96 @Rule
97 public TestName testName = new TestName();
99 private byte[] row;
101 @Parameter
102 public Supplier<AsyncTable<?>> getTable;
104 private static AsyncTable<?> getRawTable() {
105 return ASYNC_CONN.getTable(TABLE_NAME);
108 private static AsyncTable<?> getTable() {
109 return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
112 @Parameters
113 public static List<Object[]> params() {
114 return Arrays.asList(new Supplier<?>[] { TestAsyncTable::getRawTable },
115 new Supplier<?>[] { TestAsyncTable::getTable });
118 @BeforeClass
119 public static void setUpBeforeClass() throws Exception {
120 TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
121 MAX_KEY_VALUE_SIZE);
122 TEST_UTIL.startMiniCluster(1);
123 TEST_UTIL.createTable(TABLE_NAME, FAMILY);
124 TEST_UTIL.waitTableAvailable(TABLE_NAME);
125 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
126 assertFalse(ASYNC_CONN.isClosed());
129 @AfterClass
130 public static void tearDownAfterClass() throws Exception {
131 IOUtils.closeQuietly(ASYNC_CONN);
132 assertTrue(ASYNC_CONN.isClosed());
133 TEST_UTIL.shutdownMiniCluster();
136 @Before
137 public void setUp() throws IOException, InterruptedException, ExecutionException {
138 row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_"));
139 if (ASYNC_CONN.getAdmin().isTableDisabled(TABLE_NAME).get()) {
140 ASYNC_CONN.getAdmin().enableTable(TABLE_NAME).get();
144 @Test
145 public void testSimple() throws Exception {
146 AsyncTable<?> table = getTable.get();
147 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).get();
148 assertTrue(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
149 Result result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
150 assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
151 table.delete(new Delete(row)).get();
152 result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
153 assertTrue(result.isEmpty());
154 assertFalse(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
157 private byte[] concat(byte[] base, int index) {
158 return Bytes.toBytes(Bytes.toString(base) + "-" + index);
161 @SuppressWarnings("FutureReturnValueIgnored")
162 @Test
163 public void testSimpleMultiple() throws Exception {
164 AsyncTable<?> table = getTable.get();
165 int count = 100;
166 CountDownLatch putLatch = new CountDownLatch(count);
167 IntStream.range(0, count).forEach(
168 i -> table.put(new Put(concat(row, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))
169 .thenAccept(x -> putLatch.countDown()));
170 putLatch.await();
171 BlockingQueue<Boolean> existsResp = new ArrayBlockingQueue<>(count);
172 IntStream.range(0, count)
173 .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
174 .thenAccept(x -> existsResp.add(x)));
175 for (int i = 0; i < count; i++) {
176 assertTrue(existsResp.take());
178 BlockingQueue<Pair<Integer, Result>> getResp = new ArrayBlockingQueue<>(count);
179 IntStream.range(0, count)
180 .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
181 .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
182 for (int i = 0; i < count; i++) {
183 Pair<Integer, Result> pair = getResp.take();
184 assertArrayEquals(concat(VALUE, pair.getFirst()),
185 pair.getSecond().getValue(FAMILY, QUALIFIER));
187 CountDownLatch deleteLatch = new CountDownLatch(count);
188 IntStream.range(0, count).forEach(
189 i -> table.delete(new Delete(concat(row, i))).thenAccept(x -> deleteLatch.countDown()));
190 deleteLatch.await();
191 IntStream.range(0, count)
192 .forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
193 .thenAccept(x -> existsResp.add(x)));
194 for (int i = 0; i < count; i++) {
195 assertFalse(existsResp.take());
197 IntStream.range(0, count)
198 .forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
199 .thenAccept(x -> getResp.add(Pair.newPair(i, x))));
200 for (int i = 0; i < count; i++) {
201 Pair<Integer, Result> pair = getResp.take();
202 assertTrue(pair.getSecond().isEmpty());
206 @SuppressWarnings("FutureReturnValueIgnored")
207 @Test
208 public void testIncrement() throws InterruptedException, ExecutionException {
209 AsyncTable<?> table = getTable.get();
210 int count = 100;
211 CountDownLatch latch = new CountDownLatch(count);
212 AtomicLong sum = new AtomicLong(0L);
213 IntStream.range(0, count)
214 .forEach(i -> table.incrementColumnValue(row, FAMILY, QUALIFIER, 1).thenAccept(x -> {
215 sum.addAndGet(x);
216 latch.countDown();
217 }));
218 latch.await();
219 assertEquals(count, Bytes.toLong(
220 table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER)));
221 assertEquals((1 + count) * count / 2, sum.get());
224 @SuppressWarnings("FutureReturnValueIgnored")
225 @Test
226 public void testAppend() throws InterruptedException, ExecutionException {
227 AsyncTable<?> table = getTable.get();
228 int count = 10;
229 CountDownLatch latch = new CountDownLatch(count);
230 char suffix = ':';
231 AtomicLong suffixCount = new AtomicLong(0L);
232 IntStream.range(0, count)
233 .forEachOrdered(i -> table
234 .append(new Append(row).addColumn(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix)))
235 .thenAccept(r -> {
236 suffixCount.addAndGet(
237 Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars().filter(x -> x == suffix).count());
238 latch.countDown();
239 }));
240 latch.await();
241 assertEquals((1 + count) * count / 2, suffixCount.get());
242 String value = Bytes.toString(
243 table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER));
244 int[] actual = Arrays.asList(value.split("" + suffix)).stream().mapToInt(Integer::parseInt)
245 .sorted().toArray();
246 assertArrayEquals(IntStream.range(0, count).toArray(), actual);
249 @SuppressWarnings("FutureReturnValueIgnored")
250 @Test
251 public void testCheckAndPut() throws InterruptedException, ExecutionException {
252 AsyncTable<?> table = getTable.get();
253 AtomicInteger successCount = new AtomicInteger(0);
254 AtomicInteger successIndex = new AtomicInteger(-1);
255 int count = 10;
256 CountDownLatch latch = new CountDownLatch(count);
257 IntStream.range(0, count)
258 .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists()
259 .thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> {
260 if (x) {
261 successCount.incrementAndGet();
262 successIndex.set(i);
264 latch.countDown();
265 }));
266 latch.await();
267 assertEquals(1, successCount.get());
268 String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
269 assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
272 @SuppressWarnings("FutureReturnValueIgnored")
273 @Test
274 public void testCheckAndDelete() throws InterruptedException, ExecutionException {
275 AsyncTable<?> table = getTable.get();
276 int count = 10;
277 CountDownLatch putLatch = new CountDownLatch(count + 1);
278 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
279 IntStream.range(0, count)
280 .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
281 .thenRun(() -> putLatch.countDown()));
282 putLatch.await();
284 AtomicInteger successCount = new AtomicInteger(0);
285 AtomicInteger successIndex = new AtomicInteger(-1);
286 CountDownLatch deleteLatch = new CountDownLatch(count);
287 IntStream.range(0, count)
288 .forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE)
289 .thenDelete(
290 new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))
291 .thenAccept(x -> {
292 if (x) {
293 successCount.incrementAndGet();
294 successIndex.set(i);
296 deleteLatch.countDown();
297 }));
298 deleteLatch.await();
299 assertEquals(1, successCount.get());
300 Result result = table.get(new Get(row)).get();
301 IntStream.range(0, count).forEach(i -> {
302 if (i == successIndex.get()) {
303 assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i)));
304 } else {
305 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
310 @Test
311 public void testMutateRow() throws InterruptedException, ExecutionException, IOException {
312 AsyncTable<?> table = getTable.get();
313 RowMutations mutation = new RowMutations(row);
314 mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE));
315 table.mutateRow(mutation).get();
316 Result result = table.get(new Get(row)).get();
317 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1)));
319 mutation = new RowMutations(row);
320 mutation.add((Mutation) new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1)));
321 mutation.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE));
322 table.mutateRow(mutation).get();
323 result = table.get(new Get(row)).get();
324 assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1)));
325 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2)));
328 @SuppressWarnings("FutureReturnValueIgnored")
329 @Test
330 public void testCheckAndMutate() throws InterruptedException, ExecutionException {
331 AsyncTable<?> table = getTable.get();
332 int count = 10;
333 CountDownLatch putLatch = new CountDownLatch(count + 1);
334 table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
335 IntStream.range(0, count)
336 .forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
337 .thenRun(() -> putLatch.countDown()));
338 putLatch.await();
340 AtomicInteger successCount = new AtomicInteger(0);
341 AtomicInteger successIndex = new AtomicInteger(-1);
342 CountDownLatch mutateLatch = new CountDownLatch(count);
343 IntStream.range(0, count).forEach(i -> {
344 RowMutations mutation = new RowMutations(row);
345 try {
346 mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER));
347 mutation
348 .add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i)));
349 } catch (IOException e) {
350 throw new UncheckedIOException(e);
352 table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenMutate(mutation)
353 .thenAccept(x -> {
354 if (x) {
355 successCount.incrementAndGet();
356 successIndex.set(i);
358 mutateLatch.countDown();
361 mutateLatch.await();
362 assertEquals(1, successCount.get());
363 Result result = table.get(new Get(row)).get();
364 IntStream.range(0, count).forEach(i -> {
365 if (i == successIndex.get()) {
366 assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i)));
367 } else {
368 assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
373 @Test
374 public void testCheckAndMutateWithTimeRange() throws Exception {
375 AsyncTable<?> table = getTable.get();
376 final long ts = System.currentTimeMillis() / 2;
377 Put put = new Put(row);
378 put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
380 boolean ok =
381 table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists().thenPut(put).get();
382 assertTrue(ok);
384 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
385 .ifEquals(VALUE).thenPut(put).get();
386 assertFalse(ok);
388 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
389 .ifEquals(VALUE).thenPut(put).get();
390 assertTrue(ok);
392 RowMutations rm = new RowMutations(row).add((Mutation) put);
393 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
394 .ifEquals(VALUE).thenMutate(rm).get();
395 assertFalse(ok);
397 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
398 .ifEquals(VALUE).thenMutate(rm).get();
399 assertTrue(ok);
401 Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER);
403 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
404 .ifEquals(VALUE).thenDelete(delete).get();
405 assertFalse(ok);
407 ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
408 .ifEquals(VALUE).thenDelete(delete).get();
409 assertTrue(ok);
412 @Test
413 public void testCheckAndMutateWithSingleFilter() throws Throwable {
414 AsyncTable<?> table = getTable.get();
416 // Put one row
417 Put put = new Put(row);
418 put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
419 put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
420 put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
421 table.put(put).get();
423 // Put with success
424 boolean ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
425 CompareOperator.EQUAL, Bytes.toBytes("a")))
426 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
427 .get();
428 assertTrue(ok);
430 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
431 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
433 // Put with failure
434 ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
435 CompareOperator.EQUAL, Bytes.toBytes("b")))
436 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
437 .get();
438 assertFalse(ok);
440 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
442 // Delete with success
443 ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
444 CompareOperator.EQUAL, Bytes.toBytes("a")))
445 .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))
446 .get();
447 assertTrue(ok);
449 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
451 // Mutate with success
452 ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"),
453 CompareOperator.EQUAL, Bytes.toBytes("b")))
454 .thenMutate(new RowMutations(row)
455 .add((Mutation) new Put(row)
456 .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
457 .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))
458 .get();
459 assertTrue(ok);
461 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
462 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
464 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
467 @Test
468 public void testCheckAndMutateWithMultipleFilters() throws Throwable {
469 AsyncTable<?> table = getTable.get();
471 // Put one row
472 Put put = new Put(row);
473 put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
474 put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
475 put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
476 table.put(put).get();
478 // Put with success
479 boolean ok = table.checkAndMutate(row, new FilterList(
480 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
481 Bytes.toBytes("a")),
482 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
483 Bytes.toBytes("b"))
485 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
486 .get();
487 assertTrue(ok);
489 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
490 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
492 // Put with failure
493 ok = table.checkAndMutate(row, new FilterList(
494 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
495 Bytes.toBytes("a")),
496 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
497 Bytes.toBytes("c"))
499 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
500 .get();
501 assertFalse(ok);
503 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
505 // Delete with success
506 ok = table.checkAndMutate(row, new FilterList(
507 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
508 Bytes.toBytes("a")),
509 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
510 Bytes.toBytes("b"))
512 .thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))
513 .get();
514 assertTrue(ok);
516 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
518 // Mutate with success
519 ok = table.checkAndMutate(row, new FilterList(
520 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
521 Bytes.toBytes("a")),
522 new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
523 Bytes.toBytes("b"))
525 .thenMutate(new RowMutations(row)
526 .add((Mutation) new Put(row)
527 .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
528 .add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))
529 .get();
530 assertTrue(ok);
532 result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
533 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
535 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
538 @Test
539 public void testCheckAndMutateWithTimestampFilter() throws Throwable {
540 AsyncTable<?> table = getTable.get();
542 // Put with specifying the timestamp
543 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();
545 // Put with success
546 boolean ok = table.checkAndMutate(row, new FilterList(
547 new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
548 new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
549 new TimestampsFilter(Collections.singletonList(100L))
551 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))
552 .get();
553 assertTrue(ok);
555 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
556 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
558 // Put with failure
559 ok = table.checkAndMutate(row, new FilterList(
560 new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
561 new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
562 new TimestampsFilter(Collections.singletonList(101L))
564 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
565 .get();
566 assertFalse(ok);
568 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
571 @Test
572 public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
573 AsyncTable<?> table = getTable.get();
575 // Put with specifying the timestamp
576 table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")))
577 .get();
579 // Put with success
580 boolean ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
581 CompareOperator.EQUAL, Bytes.toBytes("a")))
582 .timeRange(TimeRange.between(0, 101))
583 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))
584 .get();
585 assertTrue(ok);
587 Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
588 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
590 // Put with failure
591 ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
592 CompareOperator.EQUAL, Bytes.toBytes("a")))
593 .timeRange(TimeRange.between(0, 100))
594 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
595 .get();
596 assertFalse(ok);
598 assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
601 @Test(expected = NullPointerException.class)
602 public void testCheckAndMutateWithNotSpecifyingCondition() throws Throwable {
603 getTable.get().checkAndMutate(row, FAMILY)
604 .thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
607 @Test
608 public void testDisabled() throws InterruptedException, ExecutionException {
609 ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get();
610 try {
611 getTable.get().get(new Get(row)).get();
612 fail("Should fail since table has been disabled");
613 } catch (ExecutionException e) {
614 Throwable cause = e.getCause();
615 assertThat(cause, instanceOf(TableNotEnabledException.class));
616 assertThat(cause.getMessage(), containsString(TABLE_NAME.getNameAsString()));
620 @Test
621 public void testInvalidPut() {
622 try {
623 getTable.get().put(new Put(Bytes.toBytes(0)));
624 fail("Should fail since the put does not contain any cells");
625 } catch (IllegalArgumentException e) {
626 assertThat(e.getMessage(), containsString("No columns to insert"));
629 try {
630 getTable.get()
631 .put(new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]));
632 fail("Should fail since the put exceeds the max key value size");
633 } catch (IllegalArgumentException e) {
634 assertThat(e.getMessage(), containsString("KeyValue size too large"));