HBASE-26567 Remove IndexType from ChunkCreator (#3947)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / client / TestFromClientSide3.java
blobddfa37515797064dd6b7c05261e0f4ed18ffd798
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static org.junit.Assert.assertEquals;
21 import static org.junit.Assert.assertFalse;
22 import static org.junit.Assert.assertNull;
23 import static org.junit.Assert.assertTrue;
24 import static org.junit.Assert.fail;
26 import java.io.IOException;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.Collections;
30 import java.util.List;
31 import java.util.Optional;
32 import java.util.Random;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.ThreadLocalRandom;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicBoolean;
39 import java.util.concurrent.atomic.AtomicInteger;
40 import org.apache.hadoop.conf.Configuration;
41 import org.apache.hadoop.hbase.Cell;
42 import org.apache.hadoop.hbase.CellUtil;
43 import org.apache.hadoop.hbase.Coprocessor;
44 import org.apache.hadoop.hbase.HBaseClassTestRule;
45 import org.apache.hadoop.hbase.HBaseTestingUtil;
46 import org.apache.hadoop.hbase.HConstants;
47 import org.apache.hadoop.hbase.HRegionLocation;
48 import org.apache.hadoop.hbase.RegionMetrics;
49 import org.apache.hadoop.hbase.ServerName;
50 import org.apache.hadoop.hbase.TableName;
51 import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
52 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
53 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
54 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
55 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
56 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
57 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
58 import org.apache.hadoop.hbase.ipc.RpcClient;
59 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
60 import org.apache.hadoop.hbase.ipc.ServerRpcController;
61 import org.apache.hadoop.hbase.regionserver.HRegion;
62 import org.apache.hadoop.hbase.regionserver.HRegionServer;
63 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
64 import org.apache.hadoop.hbase.regionserver.RegionScanner;
65 import org.apache.hadoop.hbase.testclassification.ClientTests;
66 import org.apache.hadoop.hbase.testclassification.LargeTests;
67 import org.apache.hadoop.hbase.util.Bytes;
68 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
69 import org.junit.After;
70 import org.junit.AfterClass;
71 import org.junit.Assert;
72 import org.junit.Before;
73 import org.junit.BeforeClass;
74 import org.junit.ClassRule;
75 import org.junit.Rule;
76 import org.junit.Test;
77 import org.junit.experimental.categories.Category;
78 import org.junit.rules.TestName;
79 import org.slf4j.Logger;
80 import org.slf4j.LoggerFactory;
82 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
83 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
84 import org.apache.hadoop.hbase.shaded.protobuf.generated.MultiRowMutationProtos;
86 @Category({LargeTests.class, ClientTests.class})
87 public class TestFromClientSide3 {
89 @ClassRule
90 public static final HBaseClassTestRule CLASS_RULE =
91 HBaseClassTestRule.forClass(TestFromClientSide3.class);
93 private static final Logger LOG = LoggerFactory.getLogger(TestFromClientSide3.class);
94 private final static HBaseTestingUtil TEST_UTIL
95 = new HBaseTestingUtil();
96 private static final int WAITTABLE_MILLIS = 10000;
97 private static byte[] FAMILY = Bytes.toBytes("testFamily");
98 private static Random random = new Random();
99 private static int SLAVES = 3;
100 private static final byte[] ROW = Bytes.toBytes("testRow");
101 private static final byte[] ANOTHERROW = Bytes.toBytes("anotherrow");
102 private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
103 private static final byte[] VALUE = Bytes.toBytes("testValue");
104 private static final byte[] COL_QUAL = Bytes.toBytes("f1");
105 private static final byte[] VAL_BYTES = Bytes.toBytes("v1");
106 private static final byte[] ROW_BYTES = Bytes.toBytes("r1");
108 @Rule
109 public TestName name = new TestName();
110 private TableName tableName;
113 * @throws java.lang.Exception
115 @BeforeClass
116 public static void setUpBeforeClass() throws Exception {
117 TEST_UTIL.startMiniCluster(SLAVES);
121 * @throws java.lang.Exception
123 @AfterClass
124 public static void tearDownAfterClass() throws Exception {
125 TEST_UTIL.shutdownMiniCluster();
128 @Before
129 public void setUp() throws Exception {
130 tableName = TableName.valueOf(name.getMethodName());
133 @After
134 public void tearDown() throws Exception {
135 for (TableDescriptor htd : TEST_UTIL.getAdmin().listTableDescriptors()) {
136 LOG.info("Tear down, remove table=" + htd.getTableName());
137 TEST_UTIL.deleteTable(htd.getTableName());
141 private void randomCFPuts(Table table, byte[] row, byte[] family, int nPuts)
142 throws Exception {
143 Put put = new Put(row);
144 for (int i = 0; i < nPuts; i++) {
145 byte[] qualifier = Bytes.toBytes(random.nextInt());
146 byte[] value = Bytes.toBytes(random.nextInt());
147 put.addColumn(family, qualifier, value);
149 table.put(put);
152 private void performMultiplePutAndFlush(Admin admin, Table table, byte[] row, byte[] family,
153 int nFlushes, int nPuts) throws Exception {
154 for (int i = 0; i < nFlushes; i++) {
155 randomCFPuts(table, row, family, nPuts);
156 admin.flush(table.getName());
160 private static List<Cell> toList(ResultScanner scanner) {
161 try {
162 List<Cell> cells = new ArrayList<>();
163 for (Result r : scanner) {
164 cells.addAll(r.listCells());
166 return cells;
167 } finally {
168 scanner.close();
172 @Test
173 public void testScanAfterDeletingSpecifiedRow() throws IOException, InterruptedException {
174 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
175 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
176 byte[] row = Bytes.toBytes("SpecifiedRow");
177 byte[] value0 = Bytes.toBytes("value_0");
178 byte[] value1 = Bytes.toBytes("value_1");
179 Put put = new Put(row);
180 put.addColumn(FAMILY, QUALIFIER, VALUE);
181 table.put(put);
182 Delete d = new Delete(row);
183 table.delete(d);
184 put = new Put(row);
185 put.addColumn(FAMILY, null, value0);
186 table.put(put);
187 put = new Put(row);
188 put.addColumn(FAMILY, null, value1);
189 table.put(put);
190 List<Cell> cells = toList(table.getScanner(new Scan()));
191 assertEquals(1, cells.size());
192 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
194 cells = toList(table.getScanner(new Scan().addFamily(FAMILY)));
195 assertEquals(1, cells.size());
196 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
198 cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER)));
199 assertEquals(0, cells.size());
201 TEST_UTIL.getAdmin().flush(tableName);
202 cells = toList(table.getScanner(new Scan()));
203 assertEquals(1, cells.size());
204 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
206 cells = toList(table.getScanner(new Scan().addFamily(FAMILY)));
207 assertEquals(1, cells.size());
208 assertEquals("value_1", Bytes.toString(CellUtil.cloneValue(cells.get(0))));
210 cells = toList(table.getScanner(new Scan().addColumn(FAMILY, QUALIFIER)));
211 assertEquals(0, cells.size());
215 @Test
216 public void testScanAfterDeletingSpecifiedRowV2() throws IOException, InterruptedException {
217 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
218 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
219 byte[] row = Bytes.toBytes("SpecifiedRow");
220 byte[] qual0 = Bytes.toBytes("qual0");
221 byte[] qual1 = Bytes.toBytes("qual1");
222 long now = EnvironmentEdgeManager.currentTime();
223 Delete d = new Delete(row, now);
224 table.delete(d);
226 Put put = new Put(row);
227 put.addColumn(FAMILY, null, now + 1, VALUE);
228 table.put(put);
230 put = new Put(row);
231 put.addColumn(FAMILY, qual1, now + 2, qual1);
232 table.put(put);
234 put = new Put(row);
235 put.addColumn(FAMILY, qual0, now + 3, qual0);
236 table.put(put);
238 Result r = table.get(new Get(row));
239 assertEquals(r.toString(), 3, r.size());
240 assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0])));
241 assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1])));
242 assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2])));
244 TEST_UTIL.getAdmin().flush(tableName);
245 r = table.get(new Get(row));
246 assertEquals(3, r.size());
247 assertEquals("testValue", Bytes.toString(CellUtil.cloneValue(r.rawCells()[0])));
248 assertEquals("qual0", Bytes.toString(CellUtil.cloneValue(r.rawCells()[1])));
249 assertEquals("qual1", Bytes.toString(CellUtil.cloneValue(r.rawCells()[2])));
253 private int getStoreFileCount(Admin admin, ServerName serverName, RegionInfo region)
254 throws IOException {
255 for (RegionMetrics metrics : admin.getRegionMetrics(serverName, region.getTable())) {
256 if (Bytes.equals(region.getRegionName(), metrics.getRegionName())) {
257 return metrics.getStoreFileCount();
260 return 0;
263 // override the config settings at the CF level and ensure priority
264 @Test
265 public void testAdvancedConfigOverride() throws Exception {
267 * Overall idea: (1) create 3 store files and issue a compaction. config's
268 * compaction.min == 3, so should work. (2) Increase the compaction.min
269 * toggle in the HTD to 5 and modify table. If we use the HTD value instead
270 * of the default config value, adding 3 files and issuing a compaction
271 * SHOULD NOT work (3) Decrease the compaction.min toggle in the HCD to 2
272 * and modify table. The CF schema should override the Table schema and now
273 * cause a minor compaction.
275 TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 3);
277 final TableName tableName = TableName.valueOf(name.getMethodName());
278 try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 10)) {
279 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
280 Admin admin = TEST_UTIL.getAdmin();
282 // Create 3 store files.
283 byte[] row = Bytes.toBytes(random.nextInt());
284 performMultiplePutAndFlush(admin, table, row, FAMILY, 3, 100);
286 try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
287 // Verify we have multiple store files.
288 HRegionLocation loc = locator.getRegionLocation(row, true);
289 assertTrue(getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) > 1);
291 // Issue a compaction request
292 admin.compact(tableName);
294 // poll wait for the compactions to happen
295 for (int i = 0; i < 10 * 1000 / 40; ++i) {
296 // The number of store files after compaction should be lesser.
297 loc = locator.getRegionLocation(row, true);
298 if (!loc.getRegion().isOffline()) {
299 if (getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) <= 1) {
300 break;
303 Thread.sleep(40);
305 // verify the compactions took place and that we didn't just time out
306 assertTrue(getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) <= 1);
308 // change the compaction.min config option for this table to 5
309 LOG.info("hbase.hstore.compaction.min should now be 5");
310 TableDescriptor htd = TableDescriptorBuilder.newBuilder(table.getDescriptor())
311 .setValue("hbase.hstore.compaction.min", String.valueOf(5)).build();
312 admin.modifyTable(htd);
313 LOG.info("alter status finished");
315 // Create 3 more store files.
316 performMultiplePutAndFlush(admin, table, row, FAMILY, 3, 10);
318 // Issue a compaction request
319 admin.compact(tableName);
321 // This time, the compaction request should not happen
322 Thread.sleep(10 * 1000);
323 loc = locator.getRegionLocation(row, true);
324 int sfCount = getStoreFileCount(admin, loc.getServerName(), loc.getRegion());
325 assertTrue(sfCount > 1);
327 // change an individual CF's config option to 2 & online schema update
328 LOG.info("hbase.hstore.compaction.min should now be 2");
329 htd = TableDescriptorBuilder.newBuilder(htd)
330 .modifyColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(htd.getColumnFamily(FAMILY))
331 .setValue("hbase.hstore.compaction.min", String.valueOf(2)).build())
332 .build();
333 admin.modifyTable(htd);
334 LOG.info("alter status finished");
336 // Issue a compaction request
337 admin.compact(tableName);
339 // poll wait for the compactions to happen
340 for (int i = 0; i < 10 * 1000 / 40; ++i) {
341 loc = locator.getRegionLocation(row, true);
342 try {
343 if (getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) < sfCount) {
344 break;
346 } catch (Exception e) {
347 LOG.debug("Waiting for region to come online: " +
348 Bytes.toStringBinary(loc.getRegion().getRegionName()));
350 Thread.sleep(40);
353 // verify the compaction took place and that we didn't just time out
354 assertTrue(getStoreFileCount(admin, loc.getServerName(), loc.getRegion()) < sfCount);
356 // Finally, ensure that we can remove a custom config value after we made it
357 LOG.info("Removing CF config value");
358 LOG.info("hbase.hstore.compaction.min should now be 5");
359 htd = TableDescriptorBuilder.newBuilder(htd)
360 .modifyColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(htd.getColumnFamily(FAMILY))
361 .setValue("hbase.hstore.compaction.min", null).build())
362 .build();
363 admin.modifyTable(htd);
364 LOG.info("alter status finished");
365 assertNull(table.getDescriptor().getColumnFamily(FAMILY)
366 .getValue(Bytes.toBytes("hbase.hstore.compaction.min")));
371 @Test
372 public void testHTableBatchWithEmptyPut () throws IOException, InterruptedException {
373 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
374 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
375 List actions = (List) new ArrayList();
376 Object[] results = new Object[2];
377 // create an empty Put
378 Put put1 = new Put(ROW);
379 actions.add(put1);
381 Put put2 = new Put(ANOTHERROW);
382 put2.addColumn(FAMILY, QUALIFIER, VALUE);
383 actions.add(put2);
385 table.batch(actions, results);
386 fail("Empty Put should have failed the batch call");
387 } catch (IllegalArgumentException iae) {
391 // Test Table.batch with large amount of mutations against the same key.
392 // It used to trigger read lock's "Maximum lock count exceeded" Error.
393 @Test
394 public void testHTableWithLargeBatch() throws IOException, InterruptedException {
395 int sixtyFourK = 64 * 1024;
396 List actions = new ArrayList();
397 Object[] results = new Object[(sixtyFourK + 1) * 2];
399 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
400 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
402 for (int i = 0; i < sixtyFourK + 1; i++) {
403 Put put1 = new Put(ROW);
404 put1.addColumn(FAMILY, QUALIFIER, VALUE);
405 actions.add(put1);
407 Put put2 = new Put(ANOTHERROW);
408 put2.addColumn(FAMILY, QUALIFIER, VALUE);
409 actions.add(put2);
412 table.batch(actions, results);
416 @Test
417 public void testBatchWithRowMutation() throws Exception {
418 LOG.info("Starting testBatchWithRowMutation");
419 byte [][] QUALIFIERS = new byte [][] {
420 Bytes.toBytes("a"), Bytes.toBytes("b")
423 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
424 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
426 RowMutations arm = RowMutations.of(Collections.singletonList(
427 new Put(ROW).addColumn(FAMILY, QUALIFIERS[0], VALUE)));
428 Object[] batchResult = new Object[1];
429 table.batch(Arrays.asList(arm), batchResult);
431 Get g = new Get(ROW);
432 Result r = table.get(g);
433 assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[0])));
435 arm = RowMutations.of(Arrays.asList(
436 new Put(ROW).addColumn(FAMILY, QUALIFIERS[1], VALUE),
437 new Delete(ROW).addColumns(FAMILY, QUALIFIERS[0])));
438 table.batch(Arrays.asList(arm), batchResult);
439 r = table.get(g);
440 assertEquals(0, Bytes.compareTo(VALUE, r.getValue(FAMILY, QUALIFIERS[1])));
441 assertNull(r.getValue(FAMILY, QUALIFIERS[0]));
443 // Test that we get the correct remote exception for RowMutations from batch()
444 try {
445 arm = RowMutations.of(Collections.singletonList(
446 new Put(ROW).addColumn(new byte[]{'b', 'o', 'g', 'u', 's'}, QUALIFIERS[0], VALUE)));
447 table.batch(Arrays.asList(arm), batchResult);
448 fail("Expected RetriesExhaustedWithDetailsException with NoSuchColumnFamilyException");
449 } catch(RetriesExhaustedException e) {
450 String msg = e.getMessage();
451 assertTrue(msg.contains("NoSuchColumnFamilyException"));
456 @Test
457 public void testBatchWithCheckAndMutate() throws Exception {
458 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
459 byte[] row1 = Bytes.toBytes("row1");
460 byte[] row2 = Bytes.toBytes("row2");
461 byte[] row3 = Bytes.toBytes("row3");
462 byte[] row4 = Bytes.toBytes("row4");
463 byte[] row5 = Bytes.toBytes("row5");
464 byte[] row6 = Bytes.toBytes("row6");
465 byte[] row7 = Bytes.toBytes("row7");
467 table.put(Arrays.asList(
468 new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
469 new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
470 new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
471 new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
472 new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")),
473 new Put(row6).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)),
474 new Put(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))));
476 CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1)
477 .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
478 .build(new RowMutations(row1)
479 .add(new Put(row1).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("g")))
480 .add(new Delete(row1).addColumns(FAMILY, Bytes.toBytes("A")))
481 .add(new Increment(row1).addColumn(FAMILY, Bytes.toBytes("C"), 3L))
482 .add(new Append(row1).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))));
483 Get get = new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"));
484 RowMutations mutations = new RowMutations(row3)
485 .add(new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C")))
486 .add(new Put(row3).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
487 .add(new Increment(row3).addColumn(FAMILY, Bytes.toBytes("A"), 5L))
488 .add(new Append(row3).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
489 CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row4)
490 .ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a"))
491 .build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h")));
492 Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f"));
493 CheckAndMutate checkAndMutate3 = CheckAndMutate.newBuilder(row6)
494 .ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L))
495 .build(new Increment(row6).addColumn(FAMILY, Bytes.toBytes("F"), 1));
496 CheckAndMutate checkAndMutate4 = CheckAndMutate.newBuilder(row7)
497 .ifEquals(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))
498 .build(new Append(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")));
500 List<Row> actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put,
501 checkAndMutate3, checkAndMutate4);
502 Object[] results = new Object[actions.size()];
503 table.batch(actions, results);
505 CheckAndMutateResult checkAndMutateResult = (CheckAndMutateResult) results[0];
506 assertTrue(checkAndMutateResult.isSuccess());
507 assertEquals(3L,
508 Bytes.toLong(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("C"))));
509 assertEquals("d",
510 Bytes.toString(checkAndMutateResult.getResult().getValue(FAMILY, Bytes.toBytes("D"))));
512 assertEquals("b",
513 Bytes.toString(((Result) results[1]).getValue(FAMILY, Bytes.toBytes("B"))));
515 Result result = (Result) results[2];
516 assertTrue(result.getExists());
517 assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A"))));
518 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
520 checkAndMutateResult = (CheckAndMutateResult) results[3];
521 assertFalse(checkAndMutateResult.isSuccess());
522 assertNull(checkAndMutateResult.getResult());
524 assertTrue(((Result) results[4]).isEmpty());
526 checkAndMutateResult = (CheckAndMutateResult) results[5];
527 assertTrue(checkAndMutateResult.isSuccess());
528 assertEquals(11, Bytes.toLong(checkAndMutateResult.getResult()
529 .getValue(FAMILY, Bytes.toBytes("F"))));
531 checkAndMutateResult = (CheckAndMutateResult) results[6];
532 assertTrue(checkAndMutateResult.isSuccess());
533 assertEquals("gg", Bytes.toString(checkAndMutateResult.getResult()
534 .getValue(FAMILY, Bytes.toBytes("G"))));
536 result = table.get(new Get(row1));
537 assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
538 assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
539 assertEquals(3L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C"))));
540 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
542 result = table.get(new Get(row3));
543 assertNull(result.getValue(FAMILY, Bytes.toBytes("C")));
544 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
545 assertNull(Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
546 assertEquals(5L, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("A"))));
547 assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
549 result = table.get(new Get(row4));
550 assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
552 result = table.get(new Get(row5));
553 assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("E"))));
555 result = table.get(new Get(row6));
556 assertEquals(11, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("F"))));
558 result = table.get(new Get(row7));
559 assertEquals("gg", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("G"))));
563 @Test
564 public void testHTableExistsMethodSingleRegionSingleGet()
565 throws IOException, InterruptedException {
566 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
567 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
569 // Test with a single region table.
570 Put put = new Put(ROW);
571 put.addColumn(FAMILY, QUALIFIER, VALUE);
573 Get get = new Get(ROW);
575 boolean exist = table.exists(get);
576 assertFalse(exist);
578 table.put(put);
580 exist = table.exists(get);
581 assertTrue(exist);
585 @Test
586 public void testHTableExistsMethodSingleRegionMultipleGets()
587 throws IOException, InterruptedException {
588 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
589 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
591 Put put = new Put(ROW);
592 put.addColumn(FAMILY, QUALIFIER, VALUE);
593 table.put(put);
595 List<Get> gets = new ArrayList<>();
596 gets.add(new Get(ROW));
597 gets.add(new Get(ANOTHERROW));
599 boolean[] results = table.exists(gets);
600 assertTrue(results[0]);
601 assertFalse(results[1]);
605 @Test
606 public void testHTableExistsBeforeGet() throws IOException, InterruptedException {
607 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
608 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
610 Put put = new Put(ROW);
611 put.addColumn(FAMILY, QUALIFIER, VALUE);
612 table.put(put);
614 Get get = new Get(ROW);
616 boolean exist = table.exists(get);
617 assertEquals(true, exist);
619 Result result = table.get(get);
620 assertEquals(false, result.isEmpty());
621 assertTrue(Bytes.equals(VALUE, result.getValue(FAMILY, QUALIFIER)));
625 @Test
626 public void testHTableExistsAllBeforeGet() throws IOException, InterruptedException {
627 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
628 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
630 final byte[] ROW2 = Bytes.add(ROW, Bytes.toBytes("2"));
631 Put put = new Put(ROW);
632 put.addColumn(FAMILY, QUALIFIER, VALUE);
633 table.put(put);
634 put = new Put(ROW2);
635 put.addColumn(FAMILY, QUALIFIER, VALUE);
636 table.put(put);
638 Get get = new Get(ROW);
639 Get get2 = new Get(ROW2);
640 ArrayList<Get> getList = new ArrayList(2);
641 getList.add(get);
642 getList.add(get2);
644 boolean[] exists = table.exists(getList);
645 assertEquals(true, exists[0]);
646 assertEquals(true, exists[1]);
648 Result[] result = table.get(getList);
649 assertEquals(false, result[0].isEmpty());
650 assertTrue(Bytes.equals(VALUE, result[0].getValue(FAMILY, QUALIFIER)));
651 assertEquals(false, result[1].isEmpty());
652 assertTrue(Bytes.equals(VALUE, result[1].getValue(FAMILY, QUALIFIER)));
656 @Test
657 public void testHTableExistsMethodMultipleRegionsSingleGet() throws Exception {
658 try (Table table = TEST_UTIL.createTable(
659 tableName, new byte[][] { FAMILY },
660 1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) {
661 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
663 Put put = new Put(ROW);
664 put.addColumn(FAMILY, QUALIFIER, VALUE);
666 Get get = new Get(ROW);
668 boolean exist = table.exists(get);
669 assertFalse(exist);
671 table.put(put);
673 exist = table.exists(get);
674 assertTrue(exist);
678 @Test
679 public void testHTableExistsMethodMultipleRegionsMultipleGets() throws Exception {
680 try (Table table = TEST_UTIL.createTable(
681 tableName,
682 new byte[][] { FAMILY }, 1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255)) {
683 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
685 Put put = new Put(ROW);
686 put.addColumn(FAMILY, QUALIFIER, VALUE);
687 table.put(put);
689 List<Get> gets = new ArrayList<>();
690 gets.add(new Get(ANOTHERROW));
691 gets.add(new Get(Bytes.add(ROW, new byte[]{0x00})));
692 gets.add(new Get(ROW));
693 gets.add(new Get(Bytes.add(ANOTHERROW, new byte[]{0x00})));
695 LOG.info("Calling exists");
696 boolean[] results = table.exists(gets);
697 assertFalse(results[0]);
698 assertFalse(results[1]);
699 assertTrue(results[2]);
700 assertFalse(results[3]);
702 // Test with the first region.
703 put = new Put(new byte[]{0x00});
704 put.addColumn(FAMILY, QUALIFIER, VALUE);
705 table.put(put);
707 gets = new ArrayList<>();
708 gets.add(new Get(new byte[]{0x00}));
709 gets.add(new Get(new byte[]{0x00, 0x00}));
710 results = table.exists(gets);
711 assertTrue(results[0]);
712 assertFalse(results[1]);
714 // Test with the last region
715 put = new Put(new byte[]{(byte) 0xff, (byte) 0xff});
716 put.addColumn(FAMILY, QUALIFIER, VALUE);
717 table.put(put);
719 gets = new ArrayList<>();
720 gets.add(new Get(new byte[]{(byte) 0xff}));
721 gets.add(new Get(new byte[]{(byte) 0xff, (byte) 0xff}));
722 gets.add(new Get(new byte[]{(byte) 0xff, (byte) 0xff, (byte) 0xff}));
723 results = table.exists(gets);
724 assertFalse(results[0]);
725 assertTrue(results[1]);
726 assertFalse(results[2]);
730 @Test
731 public void testGetEmptyRow() throws Exception {
732 //Create a table and put in 1 row
733 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
734 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
736 Put put = new Put(ROW_BYTES);
737 put.addColumn(FAMILY, COL_QUAL, VAL_BYTES);
738 table.put(put);
740 //Try getting the row with an empty row key
741 Result res = null;
742 try {
743 res = table.get(new Get(new byte[0]));
744 fail();
745 } catch (IllegalArgumentException e) {
746 // Expected.
748 assertTrue(res == null);
749 res = table.get(new Get(Bytes.toBytes("r1-not-exist")));
750 assertTrue(res.isEmpty() == true);
751 res = table.get(new Get(ROW_BYTES));
752 assertTrue(Arrays.equals(res.getValue(FAMILY, COL_QUAL), VAL_BYTES));
756 @Test
757 public void testConnectionDefaultUsesCodec() throws Exception {
758 try (
759 RpcClient client = RpcClientFactory.createClient(TEST_UTIL.getConfiguration(), "cluster")) {
760 assertTrue(client.hasCellBlockSupport());
764 @Test
765 public void testPutWithPreBatchMutate() throws Exception {
766 testPreBatchMutate(tableName, () -> {
767 try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
768 Put put = new Put(ROW);
769 put.addColumn(FAMILY, QUALIFIER, VALUE);
770 t.put(put);
771 } catch (IOException ex) {
772 throw new RuntimeException(ex);
777 @Test
778 public void testRowMutationsWithPreBatchMutate() throws Exception {
779 testPreBatchMutate(tableName, () -> {
780 try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
781 RowMutations rm = new RowMutations(ROW, 1);
782 Put put = new Put(ROW);
783 put.addColumn(FAMILY, QUALIFIER, VALUE);
784 rm.add(put);
785 t.mutateRow(rm);
786 } catch (IOException ex) {
787 throw new RuntimeException(ex);
792 private void testPreBatchMutate(TableName tableName, Runnable rn) throws Exception {
793 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
794 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
795 .setCoprocessor(WaitingForScanObserver.class.getName()).build();
796 TEST_UTIL.getAdmin().createTable(tableDescriptor);
797 // Don't use waitTableAvailable(), because the scanner will mess up the co-processor
799 ExecutorService service = Executors.newFixedThreadPool(2);
800 service.execute(rn);
801 final List<Cell> cells = new ArrayList<>();
802 service.execute(() -> {
803 try {
804 // waiting for update.
805 TimeUnit.SECONDS.sleep(3);
806 try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
807 Scan scan = new Scan();
808 try (ResultScanner scanner = t.getScanner(scan)) {
809 for (Result r : scanner) {
810 cells.addAll(Arrays.asList(r.rawCells()));
814 } catch (IOException | InterruptedException ex) {
815 throw new RuntimeException(ex);
818 service.shutdown();
819 service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
820 assertEquals("The write is blocking by RegionObserver#postBatchMutate"
821 + ", so the data is invisible to reader", 0, cells.size());
822 TEST_UTIL.deleteTable(tableName);
825 @Test
826 public void testLockLeakWithDelta() throws Exception, Throwable {
827 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
828 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
829 .setCoprocessor(WaitingForMultiMutationsObserver.class.getName())
830 .setValue("hbase.rowlock.wait.duration", String.valueOf(5000)).build();
831 TEST_UTIL.getAdmin().createTable(tableDescriptor);
832 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
834 // new a connection for lower retry number.
835 Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
836 copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
837 try (Connection con = ConnectionFactory.createConnection(copy)) {
838 HRegion region = (HRegion) find(tableName);
839 region.setTimeoutForWriteLock(10);
840 ExecutorService putService = Executors.newSingleThreadExecutor();
841 putService.execute(() -> {
842 try (Table table = con.getTable(tableName)) {
843 Put put = new Put(ROW);
844 put.addColumn(FAMILY, QUALIFIER, VALUE);
845 // the put will be blocked by WaitingForMultiMutationsObserver.
846 table.put(put);
847 } catch (IOException ex) {
848 throw new RuntimeException(ex);
851 ExecutorService appendService = Executors.newSingleThreadExecutor();
852 appendService.execute(() -> {
853 Append append = new Append(ROW);
854 append.addColumn(FAMILY, QUALIFIER, VALUE);
855 try (Table table = con.getTable(tableName)) {
856 table.append(append);
857 fail("The APPEND should fail because the target lock is blocked by previous put");
858 } catch (Exception ex) {
861 appendService.shutdown();
862 appendService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
863 WaitingForMultiMutationsObserver observer =
864 find(tableName, WaitingForMultiMutationsObserver.class);
865 observer.latch.countDown();
866 putService.shutdown();
867 putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
868 try (Table table = con.getTable(tableName)) {
869 Result r = table.get(new Get(ROW));
870 assertFalse(r.isEmpty());
871 assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), VALUE));
874 HRegion region = (HRegion) find(tableName);
875 int readLockCount = region.getReadLockCount();
876 LOG.info("readLockCount:" + readLockCount);
877 assertEquals(0, readLockCount);
880 @Test
881 public void testMultiRowMutations() throws Exception, Throwable {
882 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
883 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
884 .setCoprocessor(MultiRowMutationEndpoint.class.getName())
885 .setCoprocessor(WaitingForMultiMutationsObserver.class.getName())
886 .setValue("hbase.rowlock.wait.duration", String.valueOf(5000)).build();
887 TEST_UTIL.getAdmin().createTable(tableDescriptor);
888 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
890 // new a connection for lower retry number.
891 Configuration copy = new Configuration(TEST_UTIL.getConfiguration());
892 copy.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
893 try (Connection con = ConnectionFactory.createConnection(copy)) {
894 byte[] row = Bytes.toBytes("ROW-0");
895 byte[] rowLocked= Bytes.toBytes("ROW-1");
896 byte[] value0 = Bytes.toBytes("VALUE-0");
897 byte[] value1 = Bytes.toBytes("VALUE-1");
898 byte[] value2 = Bytes.toBytes("VALUE-2");
899 assertNoLocks(tableName);
900 ExecutorService putService = Executors.newSingleThreadExecutor();
901 putService.execute(() -> {
902 try (Table table = con.getTable(tableName)) {
903 Put put0 = new Put(rowLocked);
904 put0.addColumn(FAMILY, QUALIFIER, value0);
905 // the put will be blocked by WaitingForMultiMutationsObserver.
906 table.put(put0);
907 } catch (IOException ex) {
908 throw new RuntimeException(ex);
911 ExecutorService cpService = Executors.newSingleThreadExecutor();
912 AtomicBoolean exceptionDuringMutateRows = new AtomicBoolean();
913 cpService.execute(() -> {
914 Put put1 = new Put(row);
915 Put put2 = new Put(rowLocked);
916 put1.addColumn(FAMILY, QUALIFIER, value1);
917 put2.addColumn(FAMILY, QUALIFIER, value2);
918 try (Table table = con.getTable(tableName)) {
919 MultiRowMutationProtos.MutateRowsRequest request =
920 MultiRowMutationProtos.MutateRowsRequest.newBuilder()
921 .addMutationRequest(
922 ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, put1))
923 .addMutationRequest(
924 ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, put2))
925 .build();
926 table.coprocessorService(MultiRowMutationProtos.MultiRowMutationService.class,
927 ROW, ROW,
928 (MultiRowMutationProtos.MultiRowMutationService exe) -> {
929 ServerRpcController controller = new ServerRpcController();
930 CoprocessorRpcUtils.BlockingRpcCallback<MultiRowMutationProtos.MutateRowsResponse>
931 rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>();
932 exe.mutateRows(controller, request, rpcCallback);
933 if (controller.failedOnException() &&
934 !(controller.getFailedOn() instanceof UnknownProtocolException)) {
935 exceptionDuringMutateRows.set(true);
937 return rpcCallback.get();
939 } catch (Throwable ex) {
940 LOG.error("encountered " + ex);
943 cpService.shutdown();
944 cpService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
945 WaitingForMultiMutationsObserver observer = find(tableName,
946 WaitingForMultiMutationsObserver.class);
947 observer.latch.countDown();
948 putService.shutdown();
949 putService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
950 try (Table table = con.getTable(tableName)) {
951 Get g0 = new Get(row);
952 Get g1 = new Get(rowLocked);
953 Result r0 = table.get(g0);
954 Result r1 = table.get(g1);
955 assertTrue(r0.isEmpty());
956 assertFalse(r1.isEmpty());
957 assertTrue(Bytes.equals(r1.getValue(FAMILY, QUALIFIER), value0));
959 assertNoLocks(tableName);
960 if (!exceptionDuringMutateRows.get()) {
961 fail("This cp should fail because the target lock is blocked by previous put");
967 * A test case for issue HBASE-17482
968 * After combile seqid with mvcc readpoint, seqid/mvcc is acquired and stamped
969 * onto cells in the append thread, a countdown latch is used to ensure that happened
970 * before cells can be put into memstore. But the MVCCPreAssign patch(HBASE-16698)
971 * make the seqid/mvcc acquirement in handler thread and stamping in append thread
972 * No countdown latch to assure cells in memstore are stamped with seqid/mvcc.
973 * If cells without mvcc(A.K.A mvcc=0) are put into memstore, then a scanner
974 * with a smaller readpoint can see these data, which disobey the multi version
975 * concurrency control rules.
976 * This test case is to reproduce this scenario.
977 * @throws IOException
979 @Test
980 public void testMVCCUsingMVCCPreAssign() throws IOException, InterruptedException {
981 try (Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY })) {
982 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
983 //put two row first to init the scanner
984 Put put = new Put(Bytes.toBytes("0"));
985 put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0"));
986 table.put(put);
987 put = new Put(Bytes.toBytes("00"));
988 put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes("0"));
989 table.put(put);
990 Scan scan = new Scan();
991 scan.setTimeRange(0, Long.MAX_VALUE);
992 scan.setCaching(1);
993 ResultScanner scanner = table.getScanner(scan);
994 int rowNum = scanner.next() != null ? 1 : 0;
995 //the started scanner shouldn't see the rows put below
996 for (int i = 1; i < 1000; i++) {
997 put = new Put(Bytes.toBytes(String.valueOf(i)));
998 put.setDurability(Durability.ASYNC_WAL);
999 put.addColumn(FAMILY, Bytes.toBytes(""), Bytes.toBytes(i));
1000 table.put(put);
1002 for (Result result : scanner) {
1003 rowNum++;
1005 //scanner should only see two rows
1006 assertEquals(2, rowNum);
1007 scanner = table.getScanner(scan);
1008 rowNum = 0;
1009 for (Result result : scanner) {
1010 rowNum++;
1012 // the new scanner should see all rows
1013 assertEquals(1001, rowNum);
1017 @Test
1018 public void testPutThenGetWithMultipleThreads() throws Exception {
1019 final int THREAD_NUM = 20;
1020 final int ROUND_NUM = 10;
1021 for (int round = 0; round < ROUND_NUM; round++) {
1022 ArrayList<Thread> threads = new ArrayList<>(THREAD_NUM);
1023 final AtomicInteger successCnt = new AtomicInteger(0);
1024 try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) {
1025 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
1027 for (int i = 0; i < THREAD_NUM; i++) {
1028 final int index = i;
1029 Thread t = new Thread(new Runnable() {
1031 @Override
1032 public void run() {
1033 final byte[] row = Bytes.toBytes("row-" + index);
1034 final byte[] value = Bytes.toBytes("v" + index);
1035 try {
1036 Put put = new Put(row);
1037 put.addColumn(FAMILY, QUALIFIER, value);
1038 ht.put(put);
1039 Get get = new Get(row);
1040 Result result = ht.get(get);
1041 byte[] returnedValue = result.getValue(FAMILY, QUALIFIER);
1042 if (Bytes.equals(value, returnedValue)) {
1043 successCnt.getAndIncrement();
1044 } else {
1045 LOG.error("Should be equal but not, original value: " + Bytes.toString(value)
1046 + ", returned value: "
1047 + (returnedValue == null ? "null" : Bytes.toString(returnedValue)));
1049 } catch (Throwable e) {
1050 // do nothing
1054 threads.add(t);
1056 for (Thread t : threads) {
1057 t.start();
1059 for (Thread t : threads) {
1060 t.join();
1062 assertEquals("Not equal in round " + round, THREAD_NUM, successCnt.get());
1064 TEST_UTIL.deleteTable(tableName);
1068 private static void assertNoLocks(final TableName tableName)
1069 throws IOException, InterruptedException {
1070 HRegion region = (HRegion) find(tableName);
1071 assertEquals(0, region.getLockedRows().size());
1073 private static HRegion find(final TableName tableName)
1074 throws IOException, InterruptedException {
1075 HRegionServer rs = TEST_UTIL.getRSForFirstRegionInTable(tableName);
1076 List<HRegion> regions = rs.getRegions(tableName);
1077 assertEquals(1, regions.size());
1078 return regions.get(0);
1081 private static <T extends RegionObserver> T find(final TableName tableName,
1082 Class<T> clz) throws IOException, InterruptedException {
1083 HRegion region = find(tableName);
1084 Coprocessor cp = region.getCoprocessorHost().findCoprocessor(clz.getName());
1085 assertTrue("The cp instance should be " + clz.getName()
1086 + ", current instance is " + cp.getClass().getName(), clz.isInstance(cp));
1087 return clz.cast(cp);
1090 public static class WaitingForMultiMutationsObserver
1091 implements RegionCoprocessor, RegionObserver {
1092 final CountDownLatch latch = new CountDownLatch(1);
1094 @Override
1095 public Optional<RegionObserver> getRegionObserver() {
1096 return Optional.of(this);
1099 @Override
1100 public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
1101 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1102 try {
1103 latch.await();
1104 } catch (InterruptedException ex) {
1105 throw new IOException(ex);
1110 public static class WaitingForScanObserver implements RegionCoprocessor, RegionObserver {
1111 private final CountDownLatch latch = new CountDownLatch(1);
1113 @Override
1114 public Optional<RegionObserver> getRegionObserver() {
1115 return Optional.of(this);
1118 @Override
1119 public void postBatchMutate(final ObserverContext<RegionCoprocessorEnvironment> c,
1120 final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
1121 try {
1122 // waiting for scanner
1123 latch.await();
1124 } catch (InterruptedException ex) {
1125 throw new IOException(ex);
1129 @Override
1130 public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
1131 final Scan scan, final RegionScanner s) throws IOException {
1132 latch.countDown();
1133 return s;
1137 static byte[] generateHugeValue(int size) {
1138 Random rand = ThreadLocalRandom.current();
1139 byte[] value = new byte[size];
1140 for (int i = 0; i < value.length; i++) {
1141 value[i] = (byte) rand.nextInt(256);
1143 return value;
1146 @Test
1147 public void testScanWithBatchSizeReturnIncompleteCells() throws IOException, InterruptedException {
1148 TableDescriptor hd = TableDescriptorBuilder.newBuilder(tableName)
1149 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).build())
1150 .build();
1151 try (Table table = TEST_UTIL.createTable(hd, null)) {
1152 TEST_UTIL.waitTableAvailable(tableName, WAITTABLE_MILLIS);
1154 Put put = new Put(ROW);
1155 put.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024));
1156 table.put(put);
1158 put = new Put(ROW);
1159 put.addColumn(FAMILY, Bytes.toBytes(1), generateHugeValue(4 * 1024 * 1024));
1160 table.put(put);
1162 for (int i = 2; i < 5; i++) {
1163 for (int version = 0; version < 2; version++) {
1164 put = new Put(ROW);
1165 put.addColumn(FAMILY, Bytes.toBytes(i), generateHugeValue(1024));
1166 table.put(put);
1170 Scan scan = new Scan();
1171 scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(3)
1172 .setMaxResultSize(4 * 1024 * 1024);
1173 Result result;
1174 try (ResultScanner scanner = table.getScanner(scan)) {
1175 List<Result> list = new ArrayList<>();
1177 * The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB; The second
1178 * scan rpc should return a result with 3 cells, because reach the batch limit = 3; The
1179 * mayHaveMoreCellsInRow in last result should be false in the scan rpc. BTW, the
1180 * moreResultsInRegion also would be false. Finally, the client should collect all the cells
1181 * into two result: 2+3 -> 3+2;
1183 while ((result = scanner.next()) != null) {
1184 list.add(result);
1187 Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum());
1188 Assert.assertEquals(2, list.size());
1189 Assert.assertEquals(3, list.get(0).size());
1190 Assert.assertEquals(2, list.get(1).size());
1193 scan = new Scan();
1194 scan.withStartRow(ROW).withStopRow(ROW, true).addFamily(FAMILY).setBatch(2)
1195 .setMaxResultSize(4 * 1024 * 1024);
1196 try (ResultScanner scanner = table.getScanner(scan)) {
1197 List<Result> list = new ArrayList<>();
1198 while ((result = scanner.next()) != null) {
1199 list.add(result);
1201 Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum());
1202 Assert.assertEquals(3, list.size());
1203 Assert.assertEquals(2, list.get(0).size());
1204 Assert.assertEquals(2, list.get(1).size());
1205 Assert.assertEquals(1, list.get(2).size());