2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.junit
.Assert
.assertEquals
;
21 import static org
.junit
.Assert
.assertFalse
;
22 import static org
.junit
.Assert
.assertNull
;
23 import static org
.junit
.Assert
.assertTrue
;
24 import static org
.junit
.Assert
.fail
;
26 import java
.io
.IOException
;
27 import java
.util
.ArrayList
;
28 import java
.util
.Arrays
;
29 import java
.util
.Collections
;
30 import java
.util
.List
;
31 import java
.util
.Optional
;
32 import java
.util
.Random
;
33 import java
.util
.concurrent
.CountDownLatch
;
34 import java
.util
.concurrent
.ExecutorService
;
35 import java
.util
.concurrent
.Executors
;
36 import java
.util
.concurrent
.ThreadLocalRandom
;
37 import java
.util
.concurrent
.TimeUnit
;
38 import java
.util
.concurrent
.atomic
.AtomicBoolean
;
39 import java
.util
.concurrent
.atomic
.AtomicInteger
;
40 import org
.apache
.hadoop
.conf
.Configuration
;
41 import org
.apache
.hadoop
.hbase
.Cell
;
42 import org
.apache
.hadoop
.hbase
.CellUtil
;
43 import org
.apache
.hadoop
.hbase
.Coprocessor
;
44 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
45 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
46 import org
.apache
.hadoop
.hbase
.HConstants
;
47 import org
.apache
.hadoop
.hbase
.HRegionLocation
;
48 import org
.apache
.hadoop
.hbase
.RegionMetrics
;
49 import org
.apache
.hadoop
.hbase
.ServerName
;
50 import org
.apache
.hadoop
.hbase
.TableName
;
51 import org
.apache
.hadoop
.hbase
.coprocessor
.MultiRowMutationEndpoint
;
52 import org
.apache
.hadoop
.hbase
.coprocessor
.ObserverContext
;
53 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessor
;
54 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessorEnvironment
;
55 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionObserver
;
56 import org
.apache
.hadoop
.hbase
.exceptions
.UnknownProtocolException
;
57 import org
.apache
.hadoop
.hbase
.ipc
.CoprocessorRpcUtils
;
58 import org
.apache
.hadoop
.hbase
.ipc
.RpcClient
;
59 import org
.apache
.hadoop
.hbase
.ipc
.RpcClientFactory
;
60 import org
.apache
.hadoop
.hbase
.ipc
.ServerRpcController
;
61 import org
.apache
.hadoop
.hbase
.regionserver
.HRegion
;
62 import org
.apache
.hadoop
.hbase
.regionserver
.HRegionServer
;
63 import org
.apache
.hadoop
.hbase
.regionserver
.MiniBatchOperationInProgress
;
64 import org
.apache
.hadoop
.hbase
.regionserver
.RegionScanner
;
65 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
66 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
67 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
68 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
69 import org
.junit
.After
;
70 import org
.junit
.AfterClass
;
71 import org
.junit
.Assert
;
72 import org
.junit
.Before
;
73 import org
.junit
.BeforeClass
;
74 import org
.junit
.ClassRule
;
75 import org
.junit
.Rule
;
76 import org
.junit
.Test
;
77 import org
.junit
.experimental
.categories
.Category
;
78 import org
.junit
.rules
.TestName
;
79 import org
.slf4j
.Logger
;
80 import org
.slf4j
.LoggerFactory
;
82 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ProtobufUtil
;
83 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
;
84 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.MultiRowMutationProtos
;
86 @Category({LargeTests
.class, ClientTests
.class})
87 public class TestFromClientSide3
{
90 public static final HBaseClassTestRule CLASS_RULE
=
91 HBaseClassTestRule
.forClass(TestFromClientSide3
.class);
93 private static final Logger LOG
= LoggerFactory
.getLogger(TestFromClientSide3
.class);
94 private final static HBaseTestingUtil TEST_UTIL
95 = new HBaseTestingUtil();
96 private static final int WAITTABLE_MILLIS
= 10000;
97 private static byte[] FAMILY
= Bytes
.toBytes("testFamily");
98 private static Random random
= new Random();
99 private static int SLAVES
= 3;
100 private static final byte[] ROW
= Bytes
.toBytes("testRow");
101 private static final byte[] ANOTHERROW
= Bytes
.toBytes("anotherrow");
102 private static final byte[] QUALIFIER
= Bytes
.toBytes("testQualifier");
103 private static final byte[] VALUE
= Bytes
.toBytes("testValue");
104 private static final byte[] COL_QUAL
= Bytes
.toBytes("f1");
105 private static final byte[] VAL_BYTES
= Bytes
.toBytes("v1");
106 private static final byte[] ROW_BYTES
= Bytes
.toBytes("r1");
109 public TestName name
= new TestName();
110 private TableName tableName
;
113 * @throws java.lang.Exception
116 public static void setUpBeforeClass() throws Exception
{
117 TEST_UTIL
.startMiniCluster(SLAVES
);
121 * @throws java.lang.Exception
124 public static void tearDownAfterClass() throws Exception
{
125 TEST_UTIL
.shutdownMiniCluster();
129 public void setUp() throws Exception
{
130 tableName
= TableName
.valueOf(name
.getMethodName());
134 public void tearDown() throws Exception
{
135 for (TableDescriptor htd
: TEST_UTIL
.getAdmin().listTableDescriptors()) {
136 LOG
.info("Tear down, remove table=" + htd
.getTableName());
137 TEST_UTIL
.deleteTable(htd
.getTableName());
141 private void randomCFPuts(Table table
, byte[] row
, byte[] family
, int nPuts
)
143 Put put
= new Put(row
);
144 for (int i
= 0; i
< nPuts
; i
++) {
145 byte[] qualifier
= Bytes
.toBytes(random
.nextInt());
146 byte[] value
= Bytes
.toBytes(random
.nextInt());
147 put
.addColumn(family
, qualifier
, value
);
152 private void performMultiplePutAndFlush(Admin admin
, Table table
, byte[] row
, byte[] family
,
153 int nFlushes
, int nPuts
) throws Exception
{
154 for (int i
= 0; i
< nFlushes
; i
++) {
155 randomCFPuts(table
, row
, family
, nPuts
);
156 admin
.flush(table
.getName());
160 private static List
<Cell
> toList(ResultScanner scanner
) {
162 List
<Cell
> cells
= new ArrayList
<>();
163 for (Result r
: scanner
) {
164 cells
.addAll(r
.listCells());
173 public void testScanAfterDeletingSpecifiedRow() throws IOException
, InterruptedException
{
174 try (Table table
= TEST_UTIL
.createTable(tableName
, new byte[][] { FAMILY
})) {
175 TEST_UTIL
.waitTableAvailable(tableName
, WAITTABLE_MILLIS
);
176 byte[] row
= Bytes
.toBytes("SpecifiedRow");
177 byte[] value0
= Bytes
.toBytes("value_0");
178 byte[] value1
= Bytes
.toBytes("value_1");
179 Put put
= new Put(row
);
180 put
.addColumn(FAMILY
, QUALIFIER
, VALUE
);
182 Delete d
= new Delete(row
);
185 put
.addColumn(FAMILY
, null, value0
);
188 put
.addColumn(FAMILY
, null, value1
);
190 List
<Cell
> cells
= toList(table
.getScanner(new Scan()));
191 assertEquals(1, cells
.size());
192 assertEquals("value_1", Bytes
.toString(CellUtil
.cloneValue(cells
.get(0))));
194 cells
= toList(table
.getScanner(new Scan().addFamily(FAMILY
)));
195 assertEquals(1, cells
.size());
196 assertEquals("value_1", Bytes
.toString(CellUtil
.cloneValue(cells
.get(0))));
198 cells
= toList(table
.getScanner(new Scan().addColumn(FAMILY
, QUALIFIER
)));
199 assertEquals(0, cells
.size());
201 TEST_UTIL
.getAdmin().flush(tableName
);
202 cells
= toList(table
.getScanner(new Scan()));
203 assertEquals(1, cells
.size());
204 assertEquals("value_1", Bytes
.toString(CellUtil
.cloneValue(cells
.get(0))));
206 cells
= toList(table
.getScanner(new Scan().addFamily(FAMILY
)));
207 assertEquals(1, cells
.size());
208 assertEquals("value_1", Bytes
.toString(CellUtil
.cloneValue(cells
.get(0))));
210 cells
= toList(table
.getScanner(new Scan().addColumn(FAMILY
, QUALIFIER
)));
211 assertEquals(0, cells
.size());
216 public void testScanAfterDeletingSpecifiedRowV2() throws IOException
, InterruptedException
{
217 try (Table table
= TEST_UTIL
.createTable(tableName
, new byte[][] { FAMILY
})) {
218 TEST_UTIL
.waitTableAvailable(tableName
, WAITTABLE_MILLIS
);
219 byte[] row
= Bytes
.toBytes("SpecifiedRow");
220 byte[] qual0
= Bytes
.toBytes("qual0");
221 byte[] qual1
= Bytes
.toBytes("qual1");
222 long now
= EnvironmentEdgeManager
.currentTime();
223 Delete d
= new Delete(row
, now
);
226 Put put
= new Put(row
);
227 put
.addColumn(FAMILY
, null, now
+ 1, VALUE
);
231 put
.addColumn(FAMILY
, qual1
, now
+ 2, qual1
);
235 put
.addColumn(FAMILY
, qual0
, now
+ 3, qual0
);
238 Result r
= table
.get(new Get(row
));
239 assertEquals(r
.toString(), 3, r
.size());
240 assertEquals("testValue", Bytes
.toString(CellUtil
.cloneValue(r
.rawCells()[0])));
241 assertEquals("qual0", Bytes
.toString(CellUtil
.cloneValue(r
.rawCells()[1])));
242 assertEquals("qual1", Bytes
.toString(CellUtil
.cloneValue(r
.rawCells()[2])));
244 TEST_UTIL
.getAdmin().flush(tableName
);
245 r
= table
.get(new Get(row
));
246 assertEquals(3, r
.size());
247 assertEquals("testValue", Bytes
.toString(CellUtil
.cloneValue(r
.rawCells()[0])));
248 assertEquals("qual0", Bytes
.toString(CellUtil
.cloneValue(r
.rawCells()[1])));
249 assertEquals("qual1", Bytes
.toString(CellUtil
.cloneValue(r
.rawCells()[2])));
253 private int getStoreFileCount(Admin admin
, ServerName serverName
, RegionInfo region
)
255 for (RegionMetrics metrics
: admin
.getRegionMetrics(serverName
, region
.getTable())) {
256 if (Bytes
.equals(region
.getRegionName(), metrics
.getRegionName())) {
257 return metrics
.getStoreFileCount();
263 // override the config settings at the CF level and ensure priority
265 public void testAdvancedConfigOverride() throws Exception
{
267 * Overall idea: (1) create 3 store files and issue a compaction. config's
268 * compaction.min == 3, so should work. (2) Increase the compaction.min
269 * toggle in the HTD to 5 and modify table. If we use the HTD value instead
270 * of the default config value, adding 3 files and issuing a compaction
271 * SHOULD NOT work (3) Decrease the compaction.min toggle in the HCD to 2
272 * and modify table. The CF schema should override the Table schema and now
273 * cause a minor compaction.
275 TEST_UTIL
.getConfiguration().setInt("hbase.hstore.compaction.min", 3);
277 final TableName tableName
= TableName
.valueOf(name
.getMethodName());
278 try (Table table
= TEST_UTIL
.createTable(tableName
, FAMILY
, 10)) {
279 TEST_UTIL
.waitTableAvailable(tableName
, WAITTABLE_MILLIS
);
280 Admin admin
= TEST_UTIL
.getAdmin();
282 // Create 3 store files.
283 byte[] row
= Bytes
.toBytes(random
.nextInt());
284 performMultiplePutAndFlush(admin
, table
, row
, FAMILY
, 3, 100);
286 try (RegionLocator locator
= TEST_UTIL
.getConnection().getRegionLocator(tableName
)) {
287 // Verify we have multiple store files.
288 HRegionLocation loc
= locator
.getRegionLocation(row
, true);
289 assertTrue(getStoreFileCount(admin
, loc
.getServerName(), loc
.getRegion()) > 1);
291 // Issue a compaction request
292 admin
.compact(tableName
);
294 // poll wait for the compactions to happen
295 for (int i
= 0; i
< 10 * 1000 / 40; ++i
) {
296 // The number of store files after compaction should be lesser.
297 loc
= locator
.getRegionLocation(row
, true);
298 if (!loc
.getRegion().isOffline()) {
299 if (getStoreFileCount(admin
, loc
.getServerName(), loc
.getRegion()) <= 1) {
305 // verify the compactions took place and that we didn't just time out
306 assertTrue(getStoreFileCount(admin
, loc
.getServerName(), loc
.getRegion()) <= 1);
308 // change the compaction.min config option for this table to 5
309 LOG
.info("hbase.hstore.compaction.min should now be 5");
310 TableDescriptor htd
= TableDescriptorBuilder
.newBuilder(table
.getDescriptor())
311 .setValue("hbase.hstore.compaction.min", String
.valueOf(5)).build();
312 admin
.modifyTable(htd
);
313 LOG
.info("alter status finished");
315 // Create 3 more store files.
316 performMultiplePutAndFlush(admin
, table
, row
, FAMILY
, 3, 10);
318 // Issue a compaction request
319 admin
.compact(tableName
);
321 // This time, the compaction request should not happen
322 Thread
.sleep(10 * 1000);
323 loc
= locator
.getRegionLocation(row
, true);
324 int sfCount
= getStoreFileCount(admin
, loc
.getServerName(), loc
.getRegion());
325 assertTrue(sfCount
> 1);
327 // change an individual CF's config option to 2 & online schema update
328 LOG
.info("hbase.hstore.compaction.min should now be 2");
329 htd
= TableDescriptorBuilder
.newBuilder(htd
)
330 .modifyColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(htd
.getColumnFamily(FAMILY
))
331 .setValue("hbase.hstore.compaction.min", String
.valueOf(2)).build())
333 admin
.modifyTable(htd
);
334 LOG
.info("alter status finished");
336 // Issue a compaction request
337 admin
.compact(tableName
);
339 // poll wait for the compactions to happen
340 for (int i
= 0; i
< 10 * 1000 / 40; ++i
) {
341 loc
= locator
.getRegionLocation(row
, true);
343 if (getStoreFileCount(admin
, loc
.getServerName(), loc
.getRegion()) < sfCount
) {
346 } catch (Exception e
) {
347 LOG
.debug("Waiting for region to come online: " +
348 Bytes
.toStringBinary(loc
.getRegion().getRegionName()));
353 // verify the compaction took place and that we didn't just time out
354 assertTrue(getStoreFileCount(admin
, loc
.getServerName(), loc
.getRegion()) < sfCount
);
356 // Finally, ensure that we can remove a custom config value after we made it
357 LOG
.info("Removing CF config value");
358 LOG
.info("hbase.hstore.compaction.min should now be 5");
359 htd
= TableDescriptorBuilder
.newBuilder(htd
)
360 .modifyColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(htd
.getColumnFamily(FAMILY
))
361 .setValue("hbase.hstore.compaction.min", null).build())
363 admin
.modifyTable(htd
);
364 LOG
.info("alter status finished");
365 assertNull(table
.getDescriptor().getColumnFamily(FAMILY
)
366 .getValue(Bytes
.toBytes("hbase.hstore.compaction.min")));
372 public void testHTableBatchWithEmptyPut () throws IOException
, InterruptedException
{
373 try (Table table
= TEST_UTIL
.createTable(tableName
, new byte[][] { FAMILY
})) {
374 TEST_UTIL
.waitTableAvailable(tableName
, WAITTABLE_MILLIS
);
375 List actions
= (List
) new ArrayList();
376 Object
[] results
= new Object
[2];
377 // create an empty Put
378 Put put1
= new Put(ROW
);
381 Put put2
= new Put(ANOTHERROW
);
382 put2
.addColumn(FAMILY
, QUALIFIER
, VALUE
);
385 table
.batch(actions
, results
);
386 fail("Empty Put should have failed the batch call");
387 } catch (IllegalArgumentException iae
) {
391 // Test Table.batch with large amount of mutations against the same key.
392 // It used to trigger read lock's "Maximum lock count exceeded" Error.
394 public void testHTableWithLargeBatch() throws IOException
, InterruptedException
{
395 int sixtyFourK
= 64 * 1024;
396 List actions
= new ArrayList();
397 Object
[] results
= new Object
[(sixtyFourK
+ 1) * 2];
399 try (Table table
= TEST_UTIL
.createTable(tableName
, new byte[][] { FAMILY
})) {
400 TEST_UTIL
.waitTableAvailable(tableName
, WAITTABLE_MILLIS
);
402 for (int i
= 0; i
< sixtyFourK
+ 1; i
++) {
403 Put put1
= new Put(ROW
);
404 put1
.addColumn(FAMILY
, QUALIFIER
, VALUE
);
407 Put put2
= new Put(ANOTHERROW
);
408 put2
.addColumn(FAMILY
, QUALIFIER
, VALUE
);
412 table
.batch(actions
, results
);
417 public void testBatchWithRowMutation() throws Exception
{
418 LOG
.info("Starting testBatchWithRowMutation");
419 byte [][] QUALIFIERS
= new byte [][] {
420 Bytes
.toBytes("a"), Bytes
.toBytes("b")
423 try (Table table
= TEST_UTIL
.createTable(tableName
, new byte[][] { FAMILY
})) {
424 TEST_UTIL
.waitTableAvailable(tableName
, WAITTABLE_MILLIS
);
426 RowMutations arm
= RowMutations
.of(Collections
.singletonList(
427 new Put(ROW
).addColumn(FAMILY
, QUALIFIERS
[0], VALUE
)));
428 Object
[] batchResult
= new Object
[1];
429 table
.batch(Arrays
.asList(arm
), batchResult
);
431 Get g
= new Get(ROW
);
432 Result r
= table
.get(g
);
433 assertEquals(0, Bytes
.compareTo(VALUE
, r
.getValue(FAMILY
, QUALIFIERS
[0])));
435 arm
= RowMutations
.of(Arrays
.asList(
436 new Put(ROW
).addColumn(FAMILY
, QUALIFIERS
[1], VALUE
),
437 new Delete(ROW
).addColumns(FAMILY
, QUALIFIERS
[0])));
438 table
.batch(Arrays
.asList(arm
), batchResult
);
440 assertEquals(0, Bytes
.compareTo(VALUE
, r
.getValue(FAMILY
, QUALIFIERS
[1])));
441 assertNull(r
.getValue(FAMILY
, QUALIFIERS
[0]));
443 // Test that we get the correct remote exception for RowMutations from batch()
445 arm
= RowMutations
.of(Collections
.singletonList(
446 new Put(ROW
).addColumn(new byte[]{'b', 'o', 'g', 'u', 's'}, QUALIFIERS
[0], VALUE
)));
447 table
.batch(Arrays
.asList(arm
), batchResult
);
448 fail("Expected RetriesExhaustedWithDetailsException with NoSuchColumnFamilyException");
449 } catch(RetriesExhaustedException e
) {
450 String msg
= e
.getMessage();
451 assertTrue(msg
.contains("NoSuchColumnFamilyException"));
457 public void testBatchWithCheckAndMutate() throws Exception
{
458 try (Table table
= TEST_UTIL
.createTable(tableName
, new byte[][] { FAMILY
})) {
459 byte[] row1
= Bytes
.toBytes("row1");
460 byte[] row2
= Bytes
.toBytes("row2");
461 byte[] row3
= Bytes
.toBytes("row3");
462 byte[] row4
= Bytes
.toBytes("row4");
463 byte[] row5
= Bytes
.toBytes("row5");
464 byte[] row6
= Bytes
.toBytes("row6");
465 byte[] row7
= Bytes
.toBytes("row7");
467 table
.put(Arrays
.asList(
468 new Put(row1
).addColumn(FAMILY
, Bytes
.toBytes("A"), Bytes
.toBytes("a")),
469 new Put(row2
).addColumn(FAMILY
, Bytes
.toBytes("B"), Bytes
.toBytes("b")),
470 new Put(row3
).addColumn(FAMILY
, Bytes
.toBytes("C"), Bytes
.toBytes("c")),
471 new Put(row4
).addColumn(FAMILY
, Bytes
.toBytes("D"), Bytes
.toBytes("d")),
472 new Put(row5
).addColumn(FAMILY
, Bytes
.toBytes("E"), Bytes
.toBytes("e")),
473 new Put(row6
).addColumn(FAMILY
, Bytes
.toBytes("F"), Bytes
.toBytes(10L)),
474 new Put(row7
).addColumn(FAMILY
, Bytes
.toBytes("G"), Bytes
.toBytes("g"))));
476 CheckAndMutate checkAndMutate1
= CheckAndMutate
.newBuilder(row1
)
477 .ifEquals(FAMILY
, Bytes
.toBytes("A"), Bytes
.toBytes("a"))
478 .build(new RowMutations(row1
)
479 .add(new Put(row1
).addColumn(FAMILY
, Bytes
.toBytes("B"), Bytes
.toBytes("g")))
480 .add(new Delete(row1
).addColumns(FAMILY
, Bytes
.toBytes("A")))
481 .add(new Increment(row1
).addColumn(FAMILY
, Bytes
.toBytes("C"), 3L))
482 .add(new Append(row1
).addColumn(FAMILY
, Bytes
.toBytes("D"), Bytes
.toBytes("d"))));
483 Get get
= new Get(row2
).addColumn(FAMILY
, Bytes
.toBytes("B"));
484 RowMutations mutations
= new RowMutations(row3
)
485 .add(new Delete(row3
).addColumns(FAMILY
, Bytes
.toBytes("C")))
486 .add(new Put(row3
).addColumn(FAMILY
, Bytes
.toBytes("F"), Bytes
.toBytes("f")))
487 .add(new Increment(row3
).addColumn(FAMILY
, Bytes
.toBytes("A"), 5L))
488 .add(new Append(row3
).addColumn(FAMILY
, Bytes
.toBytes("B"), Bytes
.toBytes("b")));
489 CheckAndMutate checkAndMutate2
= CheckAndMutate
.newBuilder(row4
)
490 .ifEquals(FAMILY
, Bytes
.toBytes("D"), Bytes
.toBytes("a"))
491 .build(new Put(row4
).addColumn(FAMILY
, Bytes
.toBytes("E"), Bytes
.toBytes("h")));
492 Put put
= new Put(row5
).addColumn(FAMILY
, Bytes
.toBytes("E"), Bytes
.toBytes("f"));
493 CheckAndMutate checkAndMutate3
= CheckAndMutate
.newBuilder(row6
)
494 .ifEquals(FAMILY
, Bytes
.toBytes("F"), Bytes
.toBytes(10L))
495 .build(new Increment(row6
).addColumn(FAMILY
, Bytes
.toBytes("F"), 1));
496 CheckAndMutate checkAndMutate4
= CheckAndMutate
.newBuilder(row7
)
497 .ifEquals(FAMILY
, Bytes
.toBytes("G"), Bytes
.toBytes("g"))
498 .build(new Append(row7
).addColumn(FAMILY
, Bytes
.toBytes("G"), Bytes
.toBytes("g")));
500 List
<Row
> actions
= Arrays
.asList(checkAndMutate1
, get
, mutations
, checkAndMutate2
, put
,
501 checkAndMutate3
, checkAndMutate4
);
502 Object
[] results
= new Object
[actions
.size()];
503 table
.batch(actions
, results
);
505 CheckAndMutateResult checkAndMutateResult
= (CheckAndMutateResult
) results
[0];
506 assertTrue(checkAndMutateResult
.isSuccess());
508 Bytes
.toLong(checkAndMutateResult
.getResult().getValue(FAMILY
, Bytes
.toBytes("C"))));
510 Bytes
.toString(checkAndMutateResult
.getResult().getValue(FAMILY
, Bytes
.toBytes("D"))));
513 Bytes
.toString(((Result
) results
[1]).getValue(FAMILY
, Bytes
.toBytes("B"))));
515 Result result
= (Result
) results
[2];
516 assertTrue(result
.getExists());
517 assertEquals(5L, Bytes
.toLong(result
.getValue(FAMILY
, Bytes
.toBytes("A"))));
518 assertEquals("b", Bytes
.toString(result
.getValue(FAMILY
, Bytes
.toBytes("B"))));
520 checkAndMutateResult
= (CheckAndMutateResult
) results
[3];
521 assertFalse(checkAndMutateResult
.isSuccess());
522 assertNull(checkAndMutateResult
.getResult());
524 assertTrue(((Result
) results
[4]).isEmpty());
526 checkAndMutateResult
= (CheckAndMutateResult
) results
[5];
527 assertTrue(checkAndMutateResult
.isSuccess());
528 assertEquals(11, Bytes
.toLong(checkAndMutateResult
.getResult()
529 .getValue(FAMILY
, Bytes
.toBytes("F"))));
531 checkAndMutateResult
= (CheckAndMutateResult
) results
[6];
532 assertTrue(checkAndMutateResult
.isSuccess());
533 assertEquals("gg", Bytes
.toString(checkAndMutateResult
.getResult()
534 .getValue(FAMILY
, Bytes
.toBytes("G"))));
536 result
= table
.get(new Get(row1
));
537 assertEquals("g", Bytes
.toString(result
.getValue(FAMILY
, Bytes
.toBytes("B"))));
538 assertNull(result
.getValue(FAMILY
, Bytes
.toBytes("A")));
539 assertEquals(3L, Bytes
.toLong(result
.getValue(FAMILY
, Bytes
.toBytes("C"))));
540 assertEquals("d", Bytes
.toString(result
.getValue(FAMILY
, Bytes
.toBytes("D"))));
542 result
= table
.get(new Get(row3
));
543 assertNull(result
.getValue(FAMILY
, Bytes
.toBytes("C")));
544 assertEquals("f", Bytes
.toString(result
.getValue(FAMILY
, Bytes
.toBytes("F"))));
545 assertNull(Bytes
.toString(result
.getValue(FAMILY
, Bytes
.toBytes("C"))));
546 assertEquals(5L, Bytes
.toLong(result
.getValue(FAMILY
, Bytes
.toBytes("A"))));
547 assertEquals("b", Bytes
.toString(result
.getValue(FAMILY
, Bytes
.toBytes("B"))));
549 result
= table
.get(new Get(row4
));
550 assertEquals("d", Bytes
.toString(result
.getValue(FAMILY
, Bytes
.toBytes("D"))));
552 result
= table
.get(new Get(row5
));
553 assertEquals("f", Bytes
.toString(result
.getValue(FAMILY
, Bytes
.toBytes("E"))));
555 result
= table
.get(new Get(row6
));
556 assertEquals(11, Bytes
.toLong(result
.getValue(FAMILY
, Bytes
.toBytes("F"))));
558 result
= table
.get(new Get(row7
));
559 assertEquals("gg", Bytes
.toString(result
.getValue(FAMILY
, Bytes
.toBytes("G"))));
564 public void testHTableExistsMethodSingleRegionSingleGet()
565 throws IOException
, InterruptedException
{
566 try (Table table
= TEST_UTIL
.createTable(tableName
, new byte[][] { FAMILY
})) {
567 TEST_UTIL
.waitTableAvailable(tableName
, WAITTABLE_MILLIS
);
569 // Test with a single region table.
570 Put put
= new Put(ROW
);
571 put
.addColumn(FAMILY
, QUALIFIER
, VALUE
);
573 Get get
= new Get(ROW
);
575 boolean exist
= table
.exists(get
);
580 exist
= table
.exists(get
);
586 public void testHTableExistsMethodSingleRegionMultipleGets()
587 throws IOException
, InterruptedException
{
588 try (Table table
= TEST_UTIL
.createTable(tableName
, new byte[][] { FAMILY
})) {
589 TEST_UTIL
.waitTableAvailable(tableName
, WAITTABLE_MILLIS
);
591 Put put
= new Put(ROW
);
592 put
.addColumn(FAMILY
, QUALIFIER
, VALUE
);
595 List
<Get
> gets
= new ArrayList
<>();
596 gets
.add(new Get(ROW
));
597 gets
.add(new Get(ANOTHERROW
));
599 boolean[] results
= table
.exists(gets
);
600 assertTrue(results
[0]);
601 assertFalse(results
[1]);
606 public void testHTableExistsBeforeGet() throws IOException
, InterruptedException
{
607 try (Table table
= TEST_UTIL
.createTable(tableName
, new byte[][] { FAMILY
})) {
608 TEST_UTIL
.waitTableAvailable(tableName
, WAITTABLE_MILLIS
);
610 Put put
= new Put(ROW
);
611 put
.addColumn(FAMILY
, QUALIFIER
, VALUE
);
614 Get get
= new Get(ROW
);
616 boolean exist
= table
.exists(get
);
617 assertEquals(true, exist
);
619 Result result
= table
.get(get
);
620 assertEquals(false, result
.isEmpty());
621 assertTrue(Bytes
.equals(VALUE
, result
.getValue(FAMILY
, QUALIFIER
)));
626 public void testHTableExistsAllBeforeGet() throws IOException
, InterruptedException
{
627 try (Table table
= TEST_UTIL
.createTable(tableName
, new byte[][] { FAMILY
})) {
628 TEST_UTIL
.waitTableAvailable(tableName
, WAITTABLE_MILLIS
);
630 final byte[] ROW2
= Bytes
.add(ROW
, Bytes
.toBytes("2"));
631 Put put
= new Put(ROW
);
632 put
.addColumn(FAMILY
, QUALIFIER
, VALUE
);
635 put
.addColumn(FAMILY
, QUALIFIER
, VALUE
);
638 Get get
= new Get(ROW
);
639 Get get2
= new Get(ROW2
);
640 ArrayList
<Get
> getList
= new ArrayList(2);
644 boolean[] exists
= table
.exists(getList
);
645 assertEquals(true, exists
[0]);
646 assertEquals(true, exists
[1]);
648 Result
[] result
= table
.get(getList
);
649 assertEquals(false, result
[0].isEmpty());
650 assertTrue(Bytes
.equals(VALUE
, result
[0].getValue(FAMILY
, QUALIFIER
)));
651 assertEquals(false, result
[1].isEmpty());
652 assertTrue(Bytes
.equals(VALUE
, result
[1].getValue(FAMILY
, QUALIFIER
)));
657 public void testHTableExistsMethodMultipleRegionsSingleGet() throws Exception
{
658 try (Table table
= TEST_UTIL
.createTable(
659 tableName
, new byte[][] { FAMILY
},
660 1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) {
661 TEST_UTIL
.waitTableAvailable(tableName
, WAITTABLE_MILLIS
);
663 Put put
= new Put(ROW
);
664 put
.addColumn(FAMILY
, QUALIFIER
, VALUE
);
666 Get get
= new Get(ROW
);
668 boolean exist
= table
.exists(get
);
673 exist
= table
.exists(get
);
679 public void testHTableExistsMethodMultipleRegionsMultipleGets() throws Exception
{
680 try (Table table
= TEST_UTIL
.createTable(
682 new byte[][] { FAMILY
}, 1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) {
683 TEST_UTIL
.waitTableAvailable(tableName
, WAITTABLE_MILLIS
);
685 Put put
= new Put(ROW
);
686 put
.addColumn(FAMILY
, QUALIFIER
, VALUE
);
689 List
<Get
> gets
= new ArrayList
<>();
690 gets
.add(new Get(ANOTHERROW
));
691 gets
.add(new Get(Bytes
.add(ROW
, new byte[]{0x00})));
692 gets
.add(new Get(ROW
));
693 gets
.add(new Get(Bytes
.add(ANOTHERROW
, new byte[]{0x00})));
695 LOG
.info("Calling exists");
696 boolean[] results
= table
.exists(gets
);
697 assertFalse(results
[0]);
698 assertFalse(results
[1]);
699 assertTrue(results
[2]);
700 assertFalse(results
[3]);
702 // Test with the first region.
703 put
= new Put(new byte[]{0x00});
704 put
.addColumn(FAMILY
, QUALIFIER
, VALUE
);
707 gets
= new ArrayList
<>();
708 gets
.add(new Get(new byte[]{0x00}));
709 gets
.add(new Get(new byte[]{0x00, 0x00}));
710 results
= table
.exists(gets
);
711 assertTrue(results
[0]);
712 assertFalse(results
[1]);
714 // Test with the last region
715 put
= new Put(new byte[]{(byte) 0xff, (byte) 0xff});
716 put
.addColumn(FAMILY
, QUALIFIER
, VALUE
);
719 gets
= new ArrayList
<>();
720 gets
.add(new Get(new byte[]{(byte) 0xff}));
721 gets
.add(new Get(new byte[]{(byte) 0xff, (byte) 0xff}));
722 gets
.add(new Get(new byte[]{(byte) 0xff, (byte) 0xff, (byte) 0xff}));
723 results
= table
.exists(gets
);
724 assertFalse(results
[0]);
725 assertTrue(results
[1]);
726 assertFalse(results
[2]);
731 public void testGetEmptyRow() throws Exception
{
732 //Create a table and put in 1 row
733 try (Table table
= TEST_UTIL
.createTable(tableName
, new byte[][] { FAMILY
})) {
734 TEST_UTIL
.waitTableAvailable(tableName
, WAITTABLE_MILLIS
);
736 Put put
= new Put(ROW_BYTES
);
737 put
.addColumn(FAMILY
, COL_QUAL
, VAL_BYTES
);
740 //Try getting the row with an empty row key
743 res
= table
.get(new Get(new byte[0]));
745 } catch (IllegalArgumentException e
) {
748 assertTrue(res
== null);
749 res
= table
.get(new Get(Bytes
.toBytes("r1-not-exist")));
750 assertTrue(res
.isEmpty() == true);
751 res
= table
.get(new Get(ROW_BYTES
));
752 assertTrue(Arrays
.equals(res
.getValue(FAMILY
, COL_QUAL
), VAL_BYTES
));
757 public void testConnectionDefaultUsesCodec() throws Exception
{
759 RpcClient client
= RpcClientFactory
.createClient(TEST_UTIL
.getConfiguration(), "cluster")) {
760 assertTrue(client
.hasCellBlockSupport());
765 public void testPutWithPreBatchMutate() throws Exception
{
766 testPreBatchMutate(tableName
, () -> {
767 try (Table t
= TEST_UTIL
.getConnection().getTable(tableName
)) {
768 Put put
= new Put(ROW
);
769 put
.addColumn(FAMILY
, QUALIFIER
, VALUE
);
771 } catch (IOException ex
) {
772 throw new RuntimeException(ex
);
778 public void testRowMutationsWithPreBatchMutate() throws Exception
{
779 testPreBatchMutate(tableName
, () -> {
780 try (Table t
= TEST_UTIL
.getConnection().getTable(tableName
)) {
781 RowMutations rm
= new RowMutations(ROW
, 1);
782 Put put
= new Put(ROW
);
783 put
.addColumn(FAMILY
, QUALIFIER
, VALUE
);
786 } catch (IOException ex
) {
787 throw new RuntimeException(ex
);
792 private void testPreBatchMutate(TableName tableName
, Runnable rn
) throws Exception
{
793 TableDescriptor tableDescriptor
= TableDescriptorBuilder
.newBuilder(tableName
)
794 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(FAMILY
))
795 .setCoprocessor(WaitingForScanObserver
.class.getName()).build();
796 TEST_UTIL
.getAdmin().createTable(tableDescriptor
);
797 // Don't use waitTableAvailable(), because the scanner will mess up the co-processor
799 ExecutorService service
= Executors
.newFixedThreadPool(2);
801 final List
<Cell
> cells
= new ArrayList
<>();
802 service
.execute(() -> {
804 // waiting for update.
805 TimeUnit
.SECONDS
.sleep(3);
806 try (Table t
= TEST_UTIL
.getConnection().getTable(tableName
)) {
807 Scan scan
= new Scan();
808 try (ResultScanner scanner
= t
.getScanner(scan
)) {
809 for (Result r
: scanner
) {
810 cells
.addAll(Arrays
.asList(r
.rawCells()));
814 } catch (IOException
| InterruptedException ex
) {
815 throw new RuntimeException(ex
);
819 service
.awaitTermination(Long
.MAX_VALUE
, TimeUnit
.DAYS
);
820 assertEquals("The write is blocking by RegionObserver#postBatchMutate"
821 + ", so the data is invisible to reader", 0, cells
.size());
822 TEST_UTIL
.deleteTable(tableName
);
826 public void testLockLeakWithDelta() throws Exception
, Throwable
{
827 TableDescriptor tableDescriptor
= TableDescriptorBuilder
.newBuilder(tableName
)
828 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(FAMILY
))
829 .setCoprocessor(WaitingForMultiMutationsObserver
.class.getName())
830 .setValue("hbase.rowlock.wait.duration", String
.valueOf(5000)).build();
831 TEST_UTIL
.getAdmin().createTable(tableDescriptor
);
832 TEST_UTIL
.waitTableAvailable(tableName
, WAITTABLE_MILLIS
);
834 // new a connection for lower retry number.
835 Configuration copy
= new Configuration(TEST_UTIL
.getConfiguration());
836 copy
.setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
, 2);
837 try (Connection con
= ConnectionFactory
.createConnection(copy
)) {
838 HRegion region
= (HRegion
) find(tableName
);
839 region
.setTimeoutForWriteLock(10);
840 ExecutorService putService
= Executors
.newSingleThreadExecutor();
841 putService
.execute(() -> {
842 try (Table table
= con
.getTable(tableName
)) {
843 Put put
= new Put(ROW
);
844 put
.addColumn(FAMILY
, QUALIFIER
, VALUE
);
845 // the put will be blocked by WaitingForMultiMutationsObserver.
847 } catch (IOException ex
) {
848 throw new RuntimeException(ex
);
851 ExecutorService appendService
= Executors
.newSingleThreadExecutor();
852 appendService
.execute(() -> {
853 Append append
= new Append(ROW
);
854 append
.addColumn(FAMILY
, QUALIFIER
, VALUE
);
855 try (Table table
= con
.getTable(tableName
)) {
856 table
.append(append
);
857 fail("The APPEND should fail because the target lock is blocked by previous put");
858 } catch (Exception ex
) {
861 appendService
.shutdown();
862 appendService
.awaitTermination(Long
.MAX_VALUE
, TimeUnit
.DAYS
);
863 WaitingForMultiMutationsObserver observer
=
864 find(tableName
, WaitingForMultiMutationsObserver
.class);
865 observer
.latch
.countDown();
866 putService
.shutdown();
867 putService
.awaitTermination(Long
.MAX_VALUE
, TimeUnit
.DAYS
);
868 try (Table table
= con
.getTable(tableName
)) {
869 Result r
= table
.get(new Get(ROW
));
870 assertFalse(r
.isEmpty());
871 assertTrue(Bytes
.equals(r
.getValue(FAMILY
, QUALIFIER
), VALUE
));
874 HRegion region
= (HRegion
) find(tableName
);
875 int readLockCount
= region
.getReadLockCount();
876 LOG
.info("readLockCount:" + readLockCount
);
877 assertEquals(0, readLockCount
);
881 public void testMultiRowMutations() throws Exception
, Throwable
{
882 TableDescriptor tableDescriptor
= TableDescriptorBuilder
.newBuilder(tableName
)
883 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(FAMILY
))
884 .setCoprocessor(MultiRowMutationEndpoint
.class.getName())
885 .setCoprocessor(WaitingForMultiMutationsObserver
.class.getName())
886 .setValue("hbase.rowlock.wait.duration", String
.valueOf(5000)).build();
887 TEST_UTIL
.getAdmin().createTable(tableDescriptor
);
888 TEST_UTIL
.waitTableAvailable(tableName
, WAITTABLE_MILLIS
);
890 // new a connection for lower retry number.
891 Configuration copy
= new Configuration(TEST_UTIL
.getConfiguration());
892 copy
.setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
, 2);
893 try (Connection con
= ConnectionFactory
.createConnection(copy
)) {
894 byte[] row
= Bytes
.toBytes("ROW-0");
895 byte[] rowLocked
= Bytes
.toBytes("ROW-1");
896 byte[] value0
= Bytes
.toBytes("VALUE-0");
897 byte[] value1
= Bytes
.toBytes("VALUE-1");
898 byte[] value2
= Bytes
.toBytes("VALUE-2");
899 assertNoLocks(tableName
);
900 ExecutorService putService
= Executors
.newSingleThreadExecutor();
901 putService
.execute(() -> {
902 try (Table table
= con
.getTable(tableName
)) {
903 Put put0
= new Put(rowLocked
);
904 put0
.addColumn(FAMILY
, QUALIFIER
, value0
);
905 // the put will be blocked by WaitingForMultiMutationsObserver.
907 } catch (IOException ex
) {
908 throw new RuntimeException(ex
);
911 ExecutorService cpService
= Executors
.newSingleThreadExecutor();
912 AtomicBoolean exceptionDuringMutateRows
= new AtomicBoolean();
913 cpService
.execute(() -> {
914 Put put1
= new Put(row
);
915 Put put2
= new Put(rowLocked
);
916 put1
.addColumn(FAMILY
, QUALIFIER
, value1
);
917 put2
.addColumn(FAMILY
, QUALIFIER
, value2
);
918 try (Table table
= con
.getTable(tableName
)) {
919 MultiRowMutationProtos
.MutateRowsRequest request
=
920 MultiRowMutationProtos
.MutateRowsRequest
.newBuilder()
922 ProtobufUtil
.toMutation(ClientProtos
.MutationProto
.MutationType
.PUT
, put1
))
924 ProtobufUtil
.toMutation(ClientProtos
.MutationProto
.MutationType
.PUT
, put2
))
926 table
.coprocessorService(MultiRowMutationProtos
.MultiRowMutationService
.class,
928 (MultiRowMutationProtos
.MultiRowMutationService exe
) -> {
929 ServerRpcController controller
= new ServerRpcController();
930 CoprocessorRpcUtils
.BlockingRpcCallback
<MultiRowMutationProtos
.MutateRowsResponse
>
931 rpcCallback
= new CoprocessorRpcUtils
.BlockingRpcCallback
<>();
932 exe
.mutateRows(controller
, request
, rpcCallback
);
933 if (controller
.failedOnException() &&
934 !(controller
.getFailedOn() instanceof UnknownProtocolException
)) {
935 exceptionDuringMutateRows
.set(true);
937 return rpcCallback
.get();
939 } catch (Throwable ex
) {
940 LOG
.error("encountered " + ex
);
943 cpService
.shutdown();
944 cpService
.awaitTermination(Long
.MAX_VALUE
, TimeUnit
.DAYS
);
945 WaitingForMultiMutationsObserver observer
= find(tableName
,
946 WaitingForMultiMutationsObserver
.class);
947 observer
.latch
.countDown();
948 putService
.shutdown();
949 putService
.awaitTermination(Long
.MAX_VALUE
, TimeUnit
.DAYS
);
950 try (Table table
= con
.getTable(tableName
)) {
951 Get g0
= new Get(row
);
952 Get g1
= new Get(rowLocked
);
953 Result r0
= table
.get(g0
);
954 Result r1
= table
.get(g1
);
955 assertTrue(r0
.isEmpty());
956 assertFalse(r1
.isEmpty());
957 assertTrue(Bytes
.equals(r1
.getValue(FAMILY
, QUALIFIER
), value0
));
959 assertNoLocks(tableName
);
960 if (!exceptionDuringMutateRows
.get()) {
961 fail("This cp should fail because the target lock is blocked by previous put");
967 * A test case for issue HBASE-17482
968 * After combile seqid with mvcc readpoint, seqid/mvcc is acquired and stamped
969 * onto cells in the append thread, a countdown latch is used to ensure that happened
970 * before cells can be put into memstore. But the MVCCPreAssign patch(HBASE-16698)
971 * make the seqid/mvcc acquirement in handler thread and stamping in append thread
972 * No countdown latch to assure cells in memstore are stamped with seqid/mvcc.
973 * If cells without mvcc(A.K.A mvcc=0) are put into memstore, then a scanner
974 * with a smaller readpoint can see these data, which disobey the multi version
975 * concurrency control rules.
976 * This test case is to reproduce this scenario.
977 * @throws IOException
980 public void testMVCCUsingMVCCPreAssign() throws IOException
, InterruptedException
{
981 try (Table table
= TEST_UTIL
.createTable(tableName
, new byte[][] { FAMILY
})) {
982 TEST_UTIL
.waitTableAvailable(tableName
, WAITTABLE_MILLIS
);
983 //put two row first to init the scanner
984 Put put
= new Put(Bytes
.toBytes("0"));
985 put
.addColumn(FAMILY
, Bytes
.toBytes(""), Bytes
.toBytes("0"));
987 put
= new Put(Bytes
.toBytes("00"));
988 put
.addColumn(FAMILY
, Bytes
.toBytes(""), Bytes
.toBytes("0"));
990 Scan scan
= new Scan();
991 scan
.setTimeRange(0, Long
.MAX_VALUE
);
993 ResultScanner scanner
= table
.getScanner(scan
);
994 int rowNum
= scanner
.next() != null ?
1 : 0;
995 //the started scanner shouldn't see the rows put below
996 for (int i
= 1; i
< 1000; i
++) {
997 put
= new Put(Bytes
.toBytes(String
.valueOf(i
)));
998 put
.setDurability(Durability
.ASYNC_WAL
);
999 put
.addColumn(FAMILY
, Bytes
.toBytes(""), Bytes
.toBytes(i
));
1002 for (Result result
: scanner
) {
1005 //scanner should only see two rows
1006 assertEquals(2, rowNum
);
1007 scanner
= table
.getScanner(scan
);
1009 for (Result result
: scanner
) {
1012 // the new scanner should see all rows
1013 assertEquals(1001, rowNum
);
1018 public void testPutThenGetWithMultipleThreads() throws Exception
{
1019 final int THREAD_NUM
= 20;
1020 final int ROUND_NUM
= 10;
1021 for (int round
= 0; round
< ROUND_NUM
; round
++) {
1022 ArrayList
<Thread
> threads
= new ArrayList
<>(THREAD_NUM
);
1023 final AtomicInteger successCnt
= new AtomicInteger(0);
1024 try (Table ht
= TEST_UTIL
.createTable(tableName
, FAMILY
)) {
1025 TEST_UTIL
.waitTableAvailable(tableName
, WAITTABLE_MILLIS
);
1027 for (int i
= 0; i
< THREAD_NUM
; i
++) {
1028 final int index
= i
;
1029 Thread t
= new Thread(new Runnable() {
1033 final byte[] row
= Bytes
.toBytes("row-" + index
);
1034 final byte[] value
= Bytes
.toBytes("v" + index
);
1036 Put put
= new Put(row
);
1037 put
.addColumn(FAMILY
, QUALIFIER
, value
);
1039 Get get
= new Get(row
);
1040 Result result
= ht
.get(get
);
1041 byte[] returnedValue
= result
.getValue(FAMILY
, QUALIFIER
);
1042 if (Bytes
.equals(value
, returnedValue
)) {
1043 successCnt
.getAndIncrement();
1045 LOG
.error("Should be equal but not, original value: " + Bytes
.toString(value
)
1046 + ", returned value: "
1047 + (returnedValue
== null ?
"null" : Bytes
.toString(returnedValue
)));
1049 } catch (Throwable e
) {
1056 for (Thread t
: threads
) {
1059 for (Thread t
: threads
) {
1062 assertEquals("Not equal in round " + round
, THREAD_NUM
, successCnt
.get());
1064 TEST_UTIL
.deleteTable(tableName
);
1068 private static void assertNoLocks(final TableName tableName
)
1069 throws IOException
, InterruptedException
{
1070 HRegion region
= (HRegion
) find(tableName
);
1071 assertEquals(0, region
.getLockedRows().size());
1073 private static HRegion
find(final TableName tableName
)
1074 throws IOException
, InterruptedException
{
1075 HRegionServer rs
= TEST_UTIL
.getRSForFirstRegionInTable(tableName
);
1076 List
<HRegion
> regions
= rs
.getRegions(tableName
);
1077 assertEquals(1, regions
.size());
1078 return regions
.get(0);
1081 private static <T
extends RegionObserver
> T
find(final TableName tableName
,
1082 Class
<T
> clz
) throws IOException
, InterruptedException
{
1083 HRegion region
= find(tableName
);
1084 Coprocessor cp
= region
.getCoprocessorHost().findCoprocessor(clz
.getName());
1085 assertTrue("The cp instance should be " + clz
.getName()
1086 + ", current instance is " + cp
.getClass().getName(), clz
.isInstance(cp
));
1087 return clz
.cast(cp
);
1090 public static class WaitingForMultiMutationsObserver
1091 implements RegionCoprocessor
, RegionObserver
{
1092 final CountDownLatch latch
= new CountDownLatch(1);
1095 public Optional
<RegionObserver
> getRegionObserver() {
1096 return Optional
.of(this);
1100 public void postBatchMutate(final ObserverContext
<RegionCoprocessorEnvironment
> c
,
1101 final MiniBatchOperationInProgress
<Mutation
> miniBatchOp
) throws IOException
{
1104 } catch (InterruptedException ex
) {
1105 throw new IOException(ex
);
1110 public static class WaitingForScanObserver
implements RegionCoprocessor
, RegionObserver
{
1111 private final CountDownLatch latch
= new CountDownLatch(1);
1114 public Optional
<RegionObserver
> getRegionObserver() {
1115 return Optional
.of(this);
1119 public void postBatchMutate(final ObserverContext
<RegionCoprocessorEnvironment
> c
,
1120 final MiniBatchOperationInProgress
<Mutation
> miniBatchOp
) throws IOException
{
1122 // waiting for scanner
1124 } catch (InterruptedException ex
) {
1125 throw new IOException(ex
);
1130 public RegionScanner
postScannerOpen(final ObserverContext
<RegionCoprocessorEnvironment
> e
,
1131 final Scan scan
, final RegionScanner s
) throws IOException
{
1137 static byte[] generateHugeValue(int size
) {
1138 Random rand
= ThreadLocalRandom
.current();
1139 byte[] value
= new byte[size
];
1140 for (int i
= 0; i
< value
.length
; i
++) {
1141 value
[i
] = (byte) rand
.nextInt(256);
1147 public void testScanWithBatchSizeReturnIncompleteCells() throws IOException
, InterruptedException
{
1148 TableDescriptor hd
= TableDescriptorBuilder
.newBuilder(tableName
)
1149 .setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(FAMILY
).setMaxVersions(3).build())
1151 try (Table table
= TEST_UTIL
.createTable(hd
, null)) {
1152 TEST_UTIL
.waitTableAvailable(tableName
, WAITTABLE_MILLIS
);
1154 Put put
= new Put(ROW
);
1155 put
.addColumn(FAMILY
, Bytes
.toBytes(0), generateHugeValue(3 * 1024 * 1024));
1159 put
.addColumn(FAMILY
, Bytes
.toBytes(1), generateHugeValue(4 * 1024 * 1024));
1162 for (int i
= 2; i
< 5; i
++) {
1163 for (int version
= 0; version
< 2; version
++) {
1165 put
.addColumn(FAMILY
, Bytes
.toBytes(i
), generateHugeValue(1024));
1170 Scan scan
= new Scan();
1171 scan
.withStartRow(ROW
).withStopRow(ROW
, true).addFamily(FAMILY
).setBatch(3)
1172 .setMaxResultSize(4 * 1024 * 1024);
1174 try (ResultScanner scanner
= table
.getScanner(scan
)) {
1175 List
<Result
> list
= new ArrayList
<>();
1177 * The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB; The second
1178 * scan rpc should return a result with 3 cells, because reach the batch limit = 3; The
1179 * mayHaveMoreCellsInRow in last result should be false in the scan rpc. BTW, the
1180 * moreResultsInRegion also would be false. Finally, the client should collect all the cells
1181 * into two result: 2+3 -> 3+2;
1183 while ((result
= scanner
.next()) != null) {
1187 Assert
.assertEquals(5, list
.stream().mapToInt(Result
::size
).sum());
1188 Assert
.assertEquals(2, list
.size());
1189 Assert
.assertEquals(3, list
.get(0).size());
1190 Assert
.assertEquals(2, list
.get(1).size());
1194 scan
.withStartRow(ROW
).withStopRow(ROW
, true).addFamily(FAMILY
).setBatch(2)
1195 .setMaxResultSize(4 * 1024 * 1024);
1196 try (ResultScanner scanner
= table
.getScanner(scan
)) {
1197 List
<Result
> list
= new ArrayList
<>();
1198 while ((result
= scanner
.next()) != null) {
1201 Assert
.assertEquals(5, list
.stream().mapToInt(Result
::size
).sum());
1202 Assert
.assertEquals(3, list
.size());
1203 Assert
.assertEquals(2, list
.get(0).size());
1204 Assert
.assertEquals(2, list
.get(1).size());
1205 Assert
.assertEquals(1, list
.get(2).size());