/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.hamcrest
.CoreMatchers
.containsString
;
21 import static org
.hamcrest
.CoreMatchers
.instanceOf
;
22 import static org
.junit
.Assert
.assertArrayEquals
;
23 import static org
.junit
.Assert
.assertEquals
;
24 import static org
.junit
.Assert
.assertFalse
;
25 import static org
.junit
.Assert
.assertNull
;
26 import static org
.junit
.Assert
.assertThat
;
27 import static org
.junit
.Assert
.assertTrue
;
28 import static org
.junit
.Assert
.fail
;
30 import java
.io
.IOException
;
31 import java
.io
.UncheckedIOException
;
32 import java
.util
.Arrays
;
33 import java
.util
.Collections
;
34 import java
.util
.List
;
35 import java
.util
.concurrent
.ArrayBlockingQueue
;
36 import java
.util
.concurrent
.BlockingQueue
;
37 import java
.util
.concurrent
.CountDownLatch
;
38 import java
.util
.concurrent
.ExecutionException
;
39 import java
.util
.concurrent
.ForkJoinPool
;
40 import java
.util
.concurrent
.atomic
.AtomicInteger
;
41 import java
.util
.concurrent
.atomic
.AtomicLong
;
42 import java
.util
.function
.Supplier
;
43 import java
.util
.stream
.IntStream
;
44 import org
.apache
.commons
.io
.IOUtils
;
45 import org
.apache
.hadoop
.hbase
.CompareOperator
;
46 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
47 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
48 import org
.apache
.hadoop
.hbase
.TableName
;
49 import org
.apache
.hadoop
.hbase
.TableNotEnabledException
;
50 import org
.apache
.hadoop
.hbase
.filter
.BinaryComparator
;
51 import org
.apache
.hadoop
.hbase
.filter
.FamilyFilter
;
52 import org
.apache
.hadoop
.hbase
.filter
.FilterList
;
53 import org
.apache
.hadoop
.hbase
.filter
.QualifierFilter
;
54 import org
.apache
.hadoop
.hbase
.filter
.SingleColumnValueFilter
;
55 import org
.apache
.hadoop
.hbase
.filter
.TimestampsFilter
;
56 import org
.apache
.hadoop
.hbase
.io
.TimeRange
;
57 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
58 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
59 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
60 import org
.apache
.hadoop
.hbase
.util
.Pair
;
61 import org
.junit
.AfterClass
;
62 import org
.junit
.Before
;
63 import org
.junit
.BeforeClass
;
64 import org
.junit
.ClassRule
;
65 import org
.junit
.Rule
;
66 import org
.junit
.Test
;
67 import org
.junit
.experimental
.categories
.Category
;
68 import org
.junit
.rules
.TestName
;
69 import org
.junit
.runner
.RunWith
;
70 import org
.junit
.runners
.Parameterized
;
71 import org
.junit
.runners
.Parameterized
.Parameter
;
72 import org
.junit
.runners
.Parameterized
.Parameters
;
74 @RunWith(Parameterized
.class)
75 @Category({ MediumTests
.class, ClientTests
.class })
76 public class TestAsyncTable
{
79 public static final HBaseClassTestRule CLASS_RULE
=
80 HBaseClassTestRule
.forClass(TestAsyncTable
.class);
82 private static final HBaseTestingUtility TEST_UTIL
= new HBaseTestingUtility();
84 private static TableName TABLE_NAME
= TableName
.valueOf("async");
86 private static byte[] FAMILY
= Bytes
.toBytes("cf");
88 private static byte[] QUALIFIER
= Bytes
.toBytes("cq");
90 private static byte[] VALUE
= Bytes
.toBytes("value");
92 private static int MAX_KEY_VALUE_SIZE
= 64 * 1024;
94 private static AsyncConnection ASYNC_CONN
;
97 public TestName testName
= new TestName();
102 public Supplier
<AsyncTable
<?
>> getTable
;
104 private static AsyncTable
<?
> getRawTable() {
105 return ASYNC_CONN
.getTable(TABLE_NAME
);
108 private static AsyncTable
<?
> getTable() {
109 return ASYNC_CONN
.getTable(TABLE_NAME
, ForkJoinPool
.commonPool());
113 public static List
<Object
[]> params() {
114 return Arrays
.asList(new Supplier
<?
>[] { TestAsyncTable
::getRawTable
},
115 new Supplier
<?
>[] { TestAsyncTable
::getTable
});
119 public static void setUpBeforeClass() throws Exception
{
120 TEST_UTIL
.getConfiguration().setInt(ConnectionConfiguration
.MAX_KEYVALUE_SIZE_KEY
,
122 TEST_UTIL
.startMiniCluster(1);
123 TEST_UTIL
.createTable(TABLE_NAME
, FAMILY
);
124 TEST_UTIL
.waitTableAvailable(TABLE_NAME
);
125 ASYNC_CONN
= ConnectionFactory
.createAsyncConnection(TEST_UTIL
.getConfiguration()).get();
126 assertFalse(ASYNC_CONN
.isClosed());
130 public static void tearDownAfterClass() throws Exception
{
131 IOUtils
.closeQuietly(ASYNC_CONN
);
132 assertTrue(ASYNC_CONN
.isClosed());
133 TEST_UTIL
.shutdownMiniCluster();
137 public void setUp() throws IOException
, InterruptedException
, ExecutionException
{
138 row
= Bytes
.toBytes(testName
.getMethodName().replaceAll("[^0-9A-Za-z]", "_"));
139 if (ASYNC_CONN
.getAdmin().isTableDisabled(TABLE_NAME
).get()) {
140 ASYNC_CONN
.getAdmin().enableTable(TABLE_NAME
).get();
145 public void testSimple() throws Exception
{
146 AsyncTable
<?
> table
= getTable
.get();
147 table
.put(new Put(row
).addColumn(FAMILY
, QUALIFIER
, VALUE
)).get();
148 assertTrue(table
.exists(new Get(row
).addColumn(FAMILY
, QUALIFIER
)).get());
149 Result result
= table
.get(new Get(row
).addColumn(FAMILY
, QUALIFIER
)).get();
150 assertArrayEquals(VALUE
, result
.getValue(FAMILY
, QUALIFIER
));
151 table
.delete(new Delete(row
)).get();
152 result
= table
.get(new Get(row
).addColumn(FAMILY
, QUALIFIER
)).get();
153 assertTrue(result
.isEmpty());
154 assertFalse(table
.exists(new Get(row
).addColumn(FAMILY
, QUALIFIER
)).get());
157 private byte[] concat(byte[] base
, int index
) {
158 return Bytes
.toBytes(Bytes
.toString(base
) + "-" + index
);
161 @SuppressWarnings("FutureReturnValueIgnored")
163 public void testSimpleMultiple() throws Exception
{
164 AsyncTable
<?
> table
= getTable
.get();
166 CountDownLatch putLatch
= new CountDownLatch(count
);
167 IntStream
.range(0, count
).forEach(
168 i
-> table
.put(new Put(concat(row
, i
)).addColumn(FAMILY
, QUALIFIER
, concat(VALUE
, i
)))
169 .thenAccept(x
-> putLatch
.countDown()));
171 BlockingQueue
<Boolean
> existsResp
= new ArrayBlockingQueue
<>(count
);
172 IntStream
.range(0, count
)
173 .forEach(i
-> table
.exists(new Get(concat(row
, i
)).addColumn(FAMILY
, QUALIFIER
))
174 .thenAccept(x
-> existsResp
.add(x
)));
175 for (int i
= 0; i
< count
; i
++) {
176 assertTrue(existsResp
.take());
178 BlockingQueue
<Pair
<Integer
, Result
>> getResp
= new ArrayBlockingQueue
<>(count
);
179 IntStream
.range(0, count
)
180 .forEach(i
-> table
.get(new Get(concat(row
, i
)).addColumn(FAMILY
, QUALIFIER
))
181 .thenAccept(x
-> getResp
.add(Pair
.newPair(i
, x
))));
182 for (int i
= 0; i
< count
; i
++) {
183 Pair
<Integer
, Result
> pair
= getResp
.take();
184 assertArrayEquals(concat(VALUE
, pair
.getFirst()),
185 pair
.getSecond().getValue(FAMILY
, QUALIFIER
));
187 CountDownLatch deleteLatch
= new CountDownLatch(count
);
188 IntStream
.range(0, count
).forEach(
189 i
-> table
.delete(new Delete(concat(row
, i
))).thenAccept(x
-> deleteLatch
.countDown()));
191 IntStream
.range(0, count
)
192 .forEach(i
-> table
.exists(new Get(concat(row
, i
)).addColumn(FAMILY
, QUALIFIER
))
193 .thenAccept(x
-> existsResp
.add(x
)));
194 for (int i
= 0; i
< count
; i
++) {
195 assertFalse(existsResp
.take());
197 IntStream
.range(0, count
)
198 .forEach(i
-> table
.get(new Get(concat(row
, i
)).addColumn(FAMILY
, QUALIFIER
))
199 .thenAccept(x
-> getResp
.add(Pair
.newPair(i
, x
))));
200 for (int i
= 0; i
< count
; i
++) {
201 Pair
<Integer
, Result
> pair
= getResp
.take();
202 assertTrue(pair
.getSecond().isEmpty());
206 @SuppressWarnings("FutureReturnValueIgnored")
208 public void testIncrement() throws InterruptedException
, ExecutionException
{
209 AsyncTable
<?
> table
= getTable
.get();
211 CountDownLatch latch
= new CountDownLatch(count
);
212 AtomicLong sum
= new AtomicLong(0L);
213 IntStream
.range(0, count
)
214 .forEach(i
-> table
.incrementColumnValue(row
, FAMILY
, QUALIFIER
, 1).thenAccept(x
-> {
219 assertEquals(count
, Bytes
.toLong(
220 table
.get(new Get(row
).addColumn(FAMILY
, QUALIFIER
)).get().getValue(FAMILY
, QUALIFIER
)));
221 assertEquals((1 + count
) * count
/ 2, sum
.get());
224 @SuppressWarnings("FutureReturnValueIgnored")
226 public void testAppend() throws InterruptedException
, ExecutionException
{
227 AsyncTable
<?
> table
= getTable
.get();
229 CountDownLatch latch
= new CountDownLatch(count
);
231 AtomicLong suffixCount
= new AtomicLong(0L);
232 IntStream
.range(0, count
)
233 .forEachOrdered(i
-> table
234 .append(new Append(row
).addColumn(FAMILY
, QUALIFIER
, Bytes
.toBytes("" + i
+ suffix
)))
236 suffixCount
.addAndGet(
237 Bytes
.toString(r
.getValue(FAMILY
, QUALIFIER
)).chars().filter(x
-> x
== suffix
).count());
241 assertEquals((1 + count
) * count
/ 2, suffixCount
.get());
242 String value
= Bytes
.toString(
243 table
.get(new Get(row
).addColumn(FAMILY
, QUALIFIER
)).get().getValue(FAMILY
, QUALIFIER
));
244 int[] actual
= Arrays
.asList(value
.split("" + suffix
)).stream().mapToInt(Integer
::parseInt
)
246 assertArrayEquals(IntStream
.range(0, count
).toArray(), actual
);
249 @SuppressWarnings("FutureReturnValueIgnored")
251 public void testCheckAndPut() throws InterruptedException
, ExecutionException
{
252 AsyncTable
<?
> table
= getTable
.get();
253 AtomicInteger successCount
= new AtomicInteger(0);
254 AtomicInteger successIndex
= new AtomicInteger(-1);
256 CountDownLatch latch
= new CountDownLatch(count
);
257 IntStream
.range(0, count
)
258 .forEach(i
-> table
.checkAndMutate(row
, FAMILY
).qualifier(QUALIFIER
).ifNotExists()
259 .thenPut(new Put(row
).addColumn(FAMILY
, QUALIFIER
, concat(VALUE
, i
))).thenAccept(x
-> {
261 successCount
.incrementAndGet();
267 assertEquals(1, successCount
.get());
268 String actual
= Bytes
.toString(table
.get(new Get(row
)).get().getValue(FAMILY
, QUALIFIER
));
269 assertTrue(actual
.endsWith(Integer
.toString(successIndex
.get())));
272 @SuppressWarnings("FutureReturnValueIgnored")
274 public void testCheckAndDelete() throws InterruptedException
, ExecutionException
{
275 AsyncTable
<?
> table
= getTable
.get();
277 CountDownLatch putLatch
= new CountDownLatch(count
+ 1);
278 table
.put(new Put(row
).addColumn(FAMILY
, QUALIFIER
, VALUE
)).thenRun(() -> putLatch
.countDown());
279 IntStream
.range(0, count
)
280 .forEach(i
-> table
.put(new Put(row
).addColumn(FAMILY
, concat(QUALIFIER
, i
), VALUE
))
281 .thenRun(() -> putLatch
.countDown()));
284 AtomicInteger successCount
= new AtomicInteger(0);
285 AtomicInteger successIndex
= new AtomicInteger(-1);
286 CountDownLatch deleteLatch
= new CountDownLatch(count
);
287 IntStream
.range(0, count
)
288 .forEach(i
-> table
.checkAndMutate(row
, FAMILY
).qualifier(QUALIFIER
).ifEquals(VALUE
)
290 new Delete(row
).addColumn(FAMILY
, QUALIFIER
).addColumn(FAMILY
, concat(QUALIFIER
, i
)))
293 successCount
.incrementAndGet();
296 deleteLatch
.countDown();
299 assertEquals(1, successCount
.get());
300 Result result
= table
.get(new Get(row
)).get();
301 IntStream
.range(0, count
).forEach(i
-> {
302 if (i
== successIndex
.get()) {
303 assertFalse(result
.containsColumn(FAMILY
, concat(QUALIFIER
, i
)));
305 assertArrayEquals(VALUE
, result
.getValue(FAMILY
, concat(QUALIFIER
, i
)));
311 public void testMutateRow() throws InterruptedException
, ExecutionException
, IOException
{
312 AsyncTable
<?
> table
= getTable
.get();
313 RowMutations mutation
= new RowMutations(row
);
314 mutation
.add((Mutation
) new Put(row
).addColumn(FAMILY
, concat(QUALIFIER
, 1), VALUE
));
315 table
.mutateRow(mutation
).get();
316 Result result
= table
.get(new Get(row
)).get();
317 assertArrayEquals(VALUE
, result
.getValue(FAMILY
, concat(QUALIFIER
, 1)));
319 mutation
= new RowMutations(row
);
320 mutation
.add((Mutation
) new Delete(row
).addColumn(FAMILY
, concat(QUALIFIER
, 1)));
321 mutation
.add((Mutation
) new Put(row
).addColumn(FAMILY
, concat(QUALIFIER
, 2), VALUE
));
322 table
.mutateRow(mutation
).get();
323 result
= table
.get(new Get(row
)).get();
324 assertNull(result
.getValue(FAMILY
, concat(QUALIFIER
, 1)));
325 assertArrayEquals(VALUE
, result
.getValue(FAMILY
, concat(QUALIFIER
, 2)));
328 @SuppressWarnings("FutureReturnValueIgnored")
330 public void testCheckAndMutate() throws InterruptedException
, ExecutionException
{
331 AsyncTable
<?
> table
= getTable
.get();
333 CountDownLatch putLatch
= new CountDownLatch(count
+ 1);
334 table
.put(new Put(row
).addColumn(FAMILY
, QUALIFIER
, VALUE
)).thenRun(() -> putLatch
.countDown());
335 IntStream
.range(0, count
)
336 .forEach(i
-> table
.put(new Put(row
).addColumn(FAMILY
, concat(QUALIFIER
, i
), VALUE
))
337 .thenRun(() -> putLatch
.countDown()));
340 AtomicInteger successCount
= new AtomicInteger(0);
341 AtomicInteger successIndex
= new AtomicInteger(-1);
342 CountDownLatch mutateLatch
= new CountDownLatch(count
);
343 IntStream
.range(0, count
).forEach(i
-> {
344 RowMutations mutation
= new RowMutations(row
);
346 mutation
.add((Mutation
) new Delete(row
).addColumn(FAMILY
, QUALIFIER
));
348 .add((Mutation
) new Put(row
).addColumn(FAMILY
, concat(QUALIFIER
, i
), concat(VALUE
, i
)));
349 } catch (IOException e
) {
350 throw new UncheckedIOException(e
);
352 table
.checkAndMutate(row
, FAMILY
).qualifier(QUALIFIER
).ifEquals(VALUE
).thenMutate(mutation
)
355 successCount
.incrementAndGet();
358 mutateLatch
.countDown();
362 assertEquals(1, successCount
.get());
363 Result result
= table
.get(new Get(row
)).get();
364 IntStream
.range(0, count
).forEach(i
-> {
365 if (i
== successIndex
.get()) {
366 assertArrayEquals(concat(VALUE
, i
), result
.getValue(FAMILY
, concat(QUALIFIER
, i
)));
368 assertArrayEquals(VALUE
, result
.getValue(FAMILY
, concat(QUALIFIER
, i
)));
374 public void testCheckAndMutateWithTimeRange() throws Exception
{
375 AsyncTable
<?
> table
= getTable
.get();
376 final long ts
= System
.currentTimeMillis() / 2;
377 Put put
= new Put(row
);
378 put
.addColumn(FAMILY
, QUALIFIER
, ts
, VALUE
);
381 table
.checkAndMutate(row
, FAMILY
).qualifier(QUALIFIER
).ifNotExists().thenPut(put
).get();
384 ok
= table
.checkAndMutate(row
, FAMILY
).qualifier(QUALIFIER
).timeRange(TimeRange
.at(ts
+ 10000))
385 .ifEquals(VALUE
).thenPut(put
).get();
388 ok
= table
.checkAndMutate(row
, FAMILY
).qualifier(QUALIFIER
).timeRange(TimeRange
.at(ts
))
389 .ifEquals(VALUE
).thenPut(put
).get();
392 RowMutations rm
= new RowMutations(row
).add((Mutation
) put
);
393 ok
= table
.checkAndMutate(row
, FAMILY
).qualifier(QUALIFIER
).timeRange(TimeRange
.at(ts
+ 10000))
394 .ifEquals(VALUE
).thenMutate(rm
).get();
397 ok
= table
.checkAndMutate(row
, FAMILY
).qualifier(QUALIFIER
).timeRange(TimeRange
.at(ts
))
398 .ifEquals(VALUE
).thenMutate(rm
).get();
401 Delete delete
= new Delete(row
).addColumn(FAMILY
, QUALIFIER
);
403 ok
= table
.checkAndMutate(row
, FAMILY
).qualifier(QUALIFIER
).timeRange(TimeRange
.at(ts
+ 10000))
404 .ifEquals(VALUE
).thenDelete(delete
).get();
407 ok
= table
.checkAndMutate(row
, FAMILY
).qualifier(QUALIFIER
).timeRange(TimeRange
.at(ts
))
408 .ifEquals(VALUE
).thenDelete(delete
).get();
413 public void testCheckAndMutateWithSingleFilter() throws Throwable
{
414 AsyncTable
<?
> table
= getTable
.get();
417 Put put
= new Put(row
);
418 put
.addColumn(FAMILY
, Bytes
.toBytes("A"), Bytes
.toBytes("a"));
419 put
.addColumn(FAMILY
, Bytes
.toBytes("B"), Bytes
.toBytes("b"));
420 put
.addColumn(FAMILY
, Bytes
.toBytes("C"), Bytes
.toBytes("c"));
421 table
.put(put
).get();
424 boolean ok
= table
.checkAndMutate(row
, new SingleColumnValueFilter(FAMILY
, Bytes
.toBytes("A"),
425 CompareOperator
.EQUAL
, Bytes
.toBytes("a")))
426 .thenPut(new Put(row
).addColumn(FAMILY
, Bytes
.toBytes("D"), Bytes
.toBytes("d")))
430 Result result
= table
.get(new Get(row
).addColumn(FAMILY
, Bytes
.toBytes("D"))).get();
431 assertEquals("d", Bytes
.toString(result
.getValue(FAMILY
, Bytes
.toBytes("D"))));
434 ok
= table
.checkAndMutate(row
, new SingleColumnValueFilter(FAMILY
, Bytes
.toBytes("A"),
435 CompareOperator
.EQUAL
, Bytes
.toBytes("b")))
436 .thenPut(new Put(row
).addColumn(FAMILY
, Bytes
.toBytes("E"), Bytes
.toBytes("e")))
440 assertFalse(table
.exists(new Get(row
).addColumn(FAMILY
, Bytes
.toBytes("E"))).get());
442 // Delete with success
443 ok
= table
.checkAndMutate(row
, new SingleColumnValueFilter(FAMILY
, Bytes
.toBytes("A"),
444 CompareOperator
.EQUAL
, Bytes
.toBytes("a")))
445 .thenDelete(new Delete(row
).addColumns(FAMILY
, Bytes
.toBytes("D")))
449 assertFalse(table
.exists(new Get(row
).addColumn(FAMILY
, Bytes
.toBytes("D"))).get());
451 // Mutate with success
452 ok
= table
.checkAndMutate(row
, new SingleColumnValueFilter(FAMILY
, Bytes
.toBytes("B"),
453 CompareOperator
.EQUAL
, Bytes
.toBytes("b")))
454 .thenMutate(new RowMutations(row
)
455 .add((Mutation
) new Put(row
)
456 .addColumn(FAMILY
, Bytes
.toBytes("D"), Bytes
.toBytes("d")))
457 .add((Mutation
) new Delete(row
).addColumns(FAMILY
, Bytes
.toBytes("A"))))
461 result
= table
.get(new Get(row
).addColumn(FAMILY
, Bytes
.toBytes("D"))).get();
462 assertEquals("d", Bytes
.toString(result
.getValue(FAMILY
, Bytes
.toBytes("D"))));
464 assertFalse(table
.exists(new Get(row
).addColumn(FAMILY
, Bytes
.toBytes("A"))).get());
468 public void testCheckAndMutateWithMultipleFilters() throws Throwable
{
469 AsyncTable
<?
> table
= getTable
.get();
472 Put put
= new Put(row
);
473 put
.addColumn(FAMILY
, Bytes
.toBytes("A"), Bytes
.toBytes("a"));
474 put
.addColumn(FAMILY
, Bytes
.toBytes("B"), Bytes
.toBytes("b"));
475 put
.addColumn(FAMILY
, Bytes
.toBytes("C"), Bytes
.toBytes("c"));
476 table
.put(put
).get();
479 boolean ok
= table
.checkAndMutate(row
, new FilterList(
480 new SingleColumnValueFilter(FAMILY
, Bytes
.toBytes("A"), CompareOperator
.EQUAL
,
482 new SingleColumnValueFilter(FAMILY
, Bytes
.toBytes("B"), CompareOperator
.EQUAL
,
485 .thenPut(new Put(row
).addColumn(FAMILY
, Bytes
.toBytes("D"), Bytes
.toBytes("d")))
489 Result result
= table
.get(new Get(row
).addColumn(FAMILY
, Bytes
.toBytes("D"))).get();
490 assertEquals("d", Bytes
.toString(result
.getValue(FAMILY
, Bytes
.toBytes("D"))));
493 ok
= table
.checkAndMutate(row
, new FilterList(
494 new SingleColumnValueFilter(FAMILY
, Bytes
.toBytes("A"), CompareOperator
.EQUAL
,
496 new SingleColumnValueFilter(FAMILY
, Bytes
.toBytes("B"), CompareOperator
.EQUAL
,
499 .thenPut(new Put(row
).addColumn(FAMILY
, Bytes
.toBytes("E"), Bytes
.toBytes("e")))
503 assertFalse(table
.exists(new Get(row
).addColumn(FAMILY
, Bytes
.toBytes("E"))).get());
505 // Delete with success
506 ok
= table
.checkAndMutate(row
, new FilterList(
507 new SingleColumnValueFilter(FAMILY
, Bytes
.toBytes("A"), CompareOperator
.EQUAL
,
509 new SingleColumnValueFilter(FAMILY
, Bytes
.toBytes("B"), CompareOperator
.EQUAL
,
512 .thenDelete(new Delete(row
).addColumns(FAMILY
, Bytes
.toBytes("D")))
516 assertFalse(table
.exists(new Get(row
).addColumn(FAMILY
, Bytes
.toBytes("D"))).get());
518 // Mutate with success
519 ok
= table
.checkAndMutate(row
, new FilterList(
520 new SingleColumnValueFilter(FAMILY
, Bytes
.toBytes("A"), CompareOperator
.EQUAL
,
522 new SingleColumnValueFilter(FAMILY
, Bytes
.toBytes("B"), CompareOperator
.EQUAL
,
525 .thenMutate(new RowMutations(row
)
526 .add((Mutation
) new Put(row
)
527 .addColumn(FAMILY
, Bytes
.toBytes("D"), Bytes
.toBytes("d")))
528 .add((Mutation
) new Delete(row
).addColumns(FAMILY
, Bytes
.toBytes("A"))))
532 result
= table
.get(new Get(row
).addColumn(FAMILY
, Bytes
.toBytes("D"))).get();
533 assertEquals("d", Bytes
.toString(result
.getValue(FAMILY
, Bytes
.toBytes("D"))));
535 assertFalse(table
.exists(new Get(row
).addColumn(FAMILY
, Bytes
.toBytes("A"))).get());
539 public void testCheckAndMutateWithTimestampFilter() throws Throwable
{
540 AsyncTable
<?
> table
= getTable
.get();
542 // Put with specifying the timestamp
543 table
.put(new Put(row
).addColumn(FAMILY
, Bytes
.toBytes("A"), 100, Bytes
.toBytes("a"))).get();
546 boolean ok
= table
.checkAndMutate(row
, new FilterList(
547 new FamilyFilter(CompareOperator
.EQUAL
, new BinaryComparator(FAMILY
)),
548 new QualifierFilter(CompareOperator
.EQUAL
, new BinaryComparator(Bytes
.toBytes("A"))),
549 new TimestampsFilter(Collections
.singletonList(100L))
551 .thenPut(new Put(row
).addColumn(FAMILY
, Bytes
.toBytes("B"), Bytes
.toBytes("b")))
555 Result result
= table
.get(new Get(row
).addColumn(FAMILY
, Bytes
.toBytes("B"))).get();
556 assertEquals("b", Bytes
.toString(result
.getValue(FAMILY
, Bytes
.toBytes("B"))));
559 ok
= table
.checkAndMutate(row
, new FilterList(
560 new FamilyFilter(CompareOperator
.EQUAL
, new BinaryComparator(FAMILY
)),
561 new QualifierFilter(CompareOperator
.EQUAL
, new BinaryComparator(Bytes
.toBytes("A"))),
562 new TimestampsFilter(Collections
.singletonList(101L))
564 .thenPut(new Put(row
).addColumn(FAMILY
, Bytes
.toBytes("C"), Bytes
.toBytes("c")))
568 assertFalse(table
.exists(new Get(row
).addColumn(FAMILY
, Bytes
.toBytes("C"))).get());
572 public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable
{
573 AsyncTable
<?
> table
= getTable
.get();
575 // Put with specifying the timestamp
576 table
.put(new Put(row
).addColumn(FAMILY
, Bytes
.toBytes("A"), 100, Bytes
.toBytes("a")))
580 boolean ok
= table
.checkAndMutate(row
, new SingleColumnValueFilter(FAMILY
, Bytes
.toBytes("A"),
581 CompareOperator
.EQUAL
, Bytes
.toBytes("a")))
582 .timeRange(TimeRange
.between(0, 101))
583 .thenPut(new Put(row
).addColumn(FAMILY
, Bytes
.toBytes("B"), Bytes
.toBytes("b")))
587 Result result
= table
.get(new Get(row
).addColumn(FAMILY
, Bytes
.toBytes("B"))).get();
588 assertEquals("b", Bytes
.toString(result
.getValue(FAMILY
, Bytes
.toBytes("B"))));
591 ok
= table
.checkAndMutate(row
, new SingleColumnValueFilter(FAMILY
, Bytes
.toBytes("A"),
592 CompareOperator
.EQUAL
, Bytes
.toBytes("a")))
593 .timeRange(TimeRange
.between(0, 100))
594 .thenPut(new Put(row
).addColumn(FAMILY
, Bytes
.toBytes("C"), Bytes
.toBytes("c")))
598 assertFalse(table
.exists(new Get(row
).addColumn(FAMILY
, Bytes
.toBytes("C"))).get());
601 @Test(expected
= NullPointerException
.class)
602 public void testCheckAndMutateWithNotSpecifyingCondition() throws Throwable
{
603 getTable
.get().checkAndMutate(row
, FAMILY
)
604 .thenPut(new Put(row
).addColumn(FAMILY
, Bytes
.toBytes("D"), Bytes
.toBytes("d")));
608 public void testDisabled() throws InterruptedException
, ExecutionException
{
609 ASYNC_CONN
.getAdmin().disableTable(TABLE_NAME
).get();
611 getTable
.get().get(new Get(row
)).get();
612 fail("Should fail since table has been disabled");
613 } catch (ExecutionException e
) {
614 Throwable cause
= e
.getCause();
615 assertThat(cause
, instanceOf(TableNotEnabledException
.class));
616 assertThat(cause
.getMessage(), containsString(TABLE_NAME
.getNameAsString()));
621 public void testInvalidPut() {
623 getTable
.get().put(new Put(Bytes
.toBytes(0)));
624 fail("Should fail since the put does not contain any cells");
625 } catch (IllegalArgumentException e
) {
626 assertThat(e
.getMessage(), containsString("No columns to insert"));
631 .put(new Put(Bytes
.toBytes(0)).addColumn(FAMILY
, QUALIFIER
, new byte[MAX_KEY_VALUE_SIZE
]));
632 fail("Should fail since the put exceeds the max key value size");
633 } catch (IllegalArgumentException e
) {
634 assertThat(e
.getMessage(), containsString("KeyValue size too large"));