HBASE-26582 Prune use of Random and SecureRandom objects (#4118)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / regionserver / TestAtomicOperation.java
blob49aeae0c105139bee7e2e19baf606831b17eb95d
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.regionserver;
20 import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1;
21 import static org.apache.hadoop.hbase.HBaseTestingUtil.fam2;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertNotNull;
24 import static org.junit.Assert.assertTrue;
25 import static org.junit.Assert.fail;
27 import java.io.IOException;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.List;
31 import java.util.Objects;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.atomic.AtomicInteger;
34 import java.util.concurrent.atomic.AtomicLong;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.fs.FileSystem;
37 import org.apache.hadoop.fs.Path;
38 import org.apache.hadoop.hbase.Cell;
39 import org.apache.hadoop.hbase.CellUtil;
40 import org.apache.hadoop.hbase.CompareOperator;
41 import org.apache.hadoop.hbase.HBaseClassTestRule;
42 import org.apache.hadoop.hbase.HBaseTestingUtil;
43 import org.apache.hadoop.hbase.HConstants;
44 import org.apache.hadoop.hbase.MultithreadedTestUtil;
45 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
46 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
47 import org.apache.hadoop.hbase.TableName;
48 import org.apache.hadoop.hbase.client.Append;
49 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
50 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
51 import org.apache.hadoop.hbase.client.Delete;
52 import org.apache.hadoop.hbase.client.Durability;
53 import org.apache.hadoop.hbase.client.Get;
54 import org.apache.hadoop.hbase.client.Increment;
55 import org.apache.hadoop.hbase.client.IsolationLevel;
56 import org.apache.hadoop.hbase.client.Mutation;
57 import org.apache.hadoop.hbase.client.Put;
58 import org.apache.hadoop.hbase.client.RegionInfo;
59 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
60 import org.apache.hadoop.hbase.client.Result;
61 import org.apache.hadoop.hbase.client.RowMutations;
62 import org.apache.hadoop.hbase.client.Scan;
63 import org.apache.hadoop.hbase.client.TableDescriptor;
64 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
65 import org.apache.hadoop.hbase.filter.BinaryComparator;
66 import org.apache.hadoop.hbase.io.HeapSize;
67 import org.apache.hadoop.hbase.io.hfile.BlockCache;
68 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
69 import org.apache.hadoop.hbase.testclassification.LargeTests;
70 import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
71 import org.apache.hadoop.hbase.util.Bytes;
72 import org.apache.hadoop.hbase.wal.WAL;
73 import org.junit.After;
74 import org.junit.Before;
75 import org.junit.ClassRule;
76 import org.junit.Rule;
77 import org.junit.Test;
78 import org.junit.experimental.categories.Category;
79 import org.junit.rules.TestName;
80 import org.slf4j.Logger;
81 import org.slf4j.LoggerFactory;
83 /**
84 * Testing of HRegion.incrementColumnValue, HRegion.increment,
85 * and HRegion.append
87 @Category({VerySlowRegionServerTests.class, LargeTests.class}) // Starts 100 threads
88 public class TestAtomicOperation {
90 @ClassRule
91 public static final HBaseClassTestRule CLASS_RULE =
92 HBaseClassTestRule.forClass(TestAtomicOperation.class);
94 private static final Logger LOG = LoggerFactory.getLogger(TestAtomicOperation.class);
95 @Rule public TestName name = new TestName();
97 HRegion region = null;
98 private HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
100 // Test names
101 static byte[] tableName;
102 static final byte[] qual1 = Bytes.toBytes("qual1");
103 static final byte[] qual2 = Bytes.toBytes("qual2");
104 static final byte[] qual3 = Bytes.toBytes("qual3");
105 static final byte[] value1 = Bytes.toBytes("value1");
106 static final byte[] value2 = Bytes.toBytes("value2");
107 static final byte [] row = Bytes.toBytes("rowA");
108 static final byte [] row2 = Bytes.toBytes("rowB");
110 @Before
111 public void setup() {
112 tableName = Bytes.toBytes(name.getMethodName());
115 @After
116 public void teardown() throws IOException {
117 if (region != null) {
118 CacheConfig cacheConfig = region.getStores().get(0).getCacheConfig();
119 region.close();
120 WAL wal = region.getWAL();
121 if (wal != null) {
122 wal.close();
124 cacheConfig.getBlockCache().ifPresent(BlockCache::shutdown);
125 region = null;
129 //////////////////////////////////////////////////////////////////////////////
130 // New tests that don't spin up a mini cluster but rather just test the
131 // individual code pieces in the HRegion.
132 //////////////////////////////////////////////////////////////////////////////
135 * Test basic append operation.
136 * More tests in
137 * @see org.apache.hadoop.hbase.client.TestFromClientSide#testAppend()
139 @Test
140 public void testAppend() throws IOException {
141 initHRegion(tableName, name.getMethodName(), fam1);
142 String v1 = "Ultimate Answer to the Ultimate Question of Life,"+
143 " The Universe, and Everything";
144 String v2 = " is... 42.";
145 Append a = new Append(row);
146 a.setReturnResults(false);
147 a.addColumn(fam1, qual1, Bytes.toBytes(v1));
148 a.addColumn(fam1, qual2, Bytes.toBytes(v2));
149 assertTrue(region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE).isEmpty());
150 a = new Append(row);
151 a.addColumn(fam1, qual1, Bytes.toBytes(v2));
152 a.addColumn(fam1, qual2, Bytes.toBytes(v1));
153 Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
154 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1+v2), result.getValue(fam1, qual1)));
155 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2+v1), result.getValue(fam1, qual2)));
158 @Test
159 public void testAppendWithMultipleFamilies() throws IOException {
160 final byte[] fam3 = Bytes.toBytes("colfamily31");
161 initHRegion(tableName, name.getMethodName(), fam1, fam2, fam3);
162 String v1 = "Appended";
163 String v2 = "Value";
165 Append a = new Append(row);
166 a.setReturnResults(false);
167 a.addColumn(fam1, qual1, Bytes.toBytes(v1));
168 a.addColumn(fam2, qual2, Bytes.toBytes(v2));
169 Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
170 assertTrue("Expected an empty result but result contains " + result.size() + " keys",
171 result.isEmpty());
173 a = new Append(row);
174 a.addColumn(fam2, qual2, Bytes.toBytes(v1));
175 a.addColumn(fam1, qual1, Bytes.toBytes(v2));
176 a.addColumn(fam3, qual3, Bytes.toBytes(v2));
177 a.addColumn(fam1, qual2, Bytes.toBytes(v1));
179 result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
181 byte[] actualValue1 = result.getValue(fam1, qual1);
182 byte[] actualValue2 = result.getValue(fam2, qual2);
183 byte[] actualValue3 = result.getValue(fam3, qual3);
184 byte[] actualValue4 = result.getValue(fam1, qual2);
186 assertNotNull("Value1 should bot be null", actualValue1);
187 assertNotNull("Value2 should bot be null", actualValue2);
188 assertNotNull("Value3 should bot be null", actualValue3);
189 assertNotNull("Value4 should bot be null", actualValue4);
190 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1 + v2), actualValue1));
191 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2 + v1), actualValue2));
192 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2), actualValue3));
193 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1), actualValue4));
196 @Test
197 public void testAppendWithNonExistingFamily() throws IOException {
198 initHRegion(tableName, name.getMethodName(), fam1);
199 final String v1 = "Value";
200 final Append a = new Append(row);
201 a.addColumn(fam1, qual1, Bytes.toBytes(v1));
202 a.addColumn(fam2, qual2, Bytes.toBytes(v1));
203 Result result = null;
204 try {
205 result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
206 fail("Append operation should fail with NoSuchColumnFamilyException.");
207 } catch (NoSuchColumnFamilyException e) {
208 assertEquals(null, result);
209 } catch (Exception e) {
210 fail("Append operation should fail with NoSuchColumnFamilyException.");
214 @Test
215 public void testIncrementWithNonExistingFamily() throws IOException {
216 initHRegion(tableName, name.getMethodName(), fam1);
217 final Increment inc = new Increment(row);
218 inc.addColumn(fam1, qual1, 1);
219 inc.addColumn(fam2, qual2, 1);
220 inc.setDurability(Durability.ASYNC_WAL);
221 try {
222 region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE);
223 } catch (NoSuchColumnFamilyException e) {
224 final Get g = new Get(row);
225 final Result result = region.get(g);
226 assertEquals(null, result.getValue(fam1, qual1));
227 assertEquals(null, result.getValue(fam2, qual2));
228 } catch (Exception e) {
229 fail("Increment operation should fail with NoSuchColumnFamilyException.");
234 * Test multi-threaded increments.
236 @Test
237 public void testIncrementMultiThreads() throws IOException {
238 boolean fast = true;
239 LOG.info("Starting test testIncrementMultiThreads");
240 // run a with mixed column families (1 and 3 versions)
241 initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
243 // Create 100 threads, each will increment by its own quantity. All 100 threads update the
244 // same row over two column families.
245 int numThreads = 100;
246 int incrementsPerThread = 1000;
247 Incrementer[] all = new Incrementer[numThreads];
248 int expectedTotal = 0;
249 // create all threads
250 for (int i = 0; i < numThreads; i++) {
251 all[i] = new Incrementer(region, i, i, incrementsPerThread);
252 expectedTotal += (i * incrementsPerThread);
255 // run all threads
256 for (int i = 0; i < numThreads; i++) {
257 all[i].start();
260 // wait for all threads to finish
261 for (int i = 0; i < numThreads; i++) {
262 try {
263 all[i].join();
264 } catch (InterruptedException e) {
265 LOG.info("Ignored", e);
268 assertICV(row, fam1, qual1, expectedTotal, fast);
269 assertICV(row, fam1, qual2, expectedTotal*2, fast);
270 assertICV(row, fam2, qual3, expectedTotal*3, fast);
271 LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal);
275 private void assertICV(byte [] row,
276 byte [] familiy,
277 byte[] qualifier,
278 long amount,
279 boolean fast) throws IOException {
280 // run a get and see?
281 Get get = new Get(row);
282 if (fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
283 get.addColumn(familiy, qualifier);
284 Result result = region.get(get);
285 assertEquals(1, result.size());
287 Cell kv = result.rawCells()[0];
288 long r = Bytes.toLong(CellUtil.cloneValue(kv));
289 assertEquals(amount, r);
292 private void initHRegion (byte [] tableName, String callingMethod,
293 byte[] ... families)
294 throws IOException {
295 initHRegion(tableName, callingMethod, null, families);
298 private void initHRegion(byte[] tableName, String callingMethod, int[] maxVersions,
299 byte[]... families) throws IOException {
300 TableDescriptorBuilder builder =
301 TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
303 int i = 0;
304 for (byte[] family : families) {
305 ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(family)
306 .setMaxVersions(maxVersions != null ? maxVersions[i++] : 1).build();
307 builder.setColumnFamily(familyDescriptor);
309 TableDescriptor tableDescriptor = builder.build();
310 RegionInfo info = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build();
311 region = TEST_UTIL.createLocalHRegion(info, tableDescriptor);
315 * A thread that makes increment calls always on the same row, this.row against two column
316 * families on this row.
318 public static class Incrementer extends Thread {
320 private final Region region;
321 private final int numIncrements;
322 private final int amount;
325 public Incrementer(Region region, int threadNumber, int amount, int numIncrements) {
326 super("Incrementer." + threadNumber);
327 this.region = region;
328 this.numIncrements = numIncrements;
329 this.amount = amount;
330 setDaemon(true);
333 @Override
334 public void run() {
335 for (int i = 0; i < numIncrements; i++) {
336 try {
337 Increment inc = new Increment(row);
338 inc.addColumn(fam1, qual1, amount);
339 inc.addColumn(fam1, qual2, amount*2);
340 inc.addColumn(fam2, qual3, amount*3);
341 inc.setDurability(Durability.ASYNC_WAL);
342 Result result = region.increment(inc);
343 if (result != null) {
344 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2,
345 Bytes.toLong(result.getValue(fam1, qual2)));
346 assertTrue(result.getValue(fam2, qual3) != null);
347 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3,
348 Bytes.toLong(result.getValue(fam2, qual3)));
349 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2,
350 Bytes.toLong(result.getValue(fam1, qual2)));
351 long fam1Increment = Bytes.toLong(result.getValue(fam1, qual1))*3;
352 long fam2Increment = Bytes.toLong(result.getValue(fam2, qual3));
353 assertEquals("fam1=" + fam1Increment + ", fam2=" + fam2Increment,
354 fam1Increment, fam2Increment);
356 } catch (IOException e) {
357 e.printStackTrace();
363 @Test
364 public void testAppendMultiThreads() throws IOException {
365 LOG.info("Starting test testAppendMultiThreads");
366 // run a with mixed column families (1 and 3 versions)
367 initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
369 int numThreads = 100;
370 int opsPerThread = 100;
371 AtomicOperation[] all = new AtomicOperation[numThreads];
372 final byte[] val = new byte[]{1};
374 AtomicInteger failures = new AtomicInteger(0);
375 // create all threads
376 for (int i = 0; i < numThreads; i++) {
377 all[i] = new AtomicOperation(region, opsPerThread, null, failures) {
378 @Override
379 public void run() {
380 for (int i=0; i<numOps; i++) {
381 try {
382 Append a = new Append(row);
383 a.addColumn(fam1, qual1, val);
384 a.addColumn(fam1, qual2, val);
385 a.addColumn(fam2, qual3, val);
386 a.setDurability(Durability.ASYNC_WAL);
387 region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
389 Get g = new Get(row);
390 Result result = region.get(g);
391 assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam1, qual2).length);
392 assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam2, qual3).length);
393 } catch (IOException e) {
394 e.printStackTrace();
395 failures.incrementAndGet();
396 fail();
403 // run all threads
404 for (int i = 0; i < numThreads; i++) {
405 all[i].start();
408 // wait for all threads to finish
409 for (int i = 0; i < numThreads; i++) {
410 try {
411 all[i].join();
412 } catch (InterruptedException e) {
415 assertEquals(0, failures.get());
416 Get g = new Get(row);
417 Result result = region.get(g);
418 assertEquals(10000, result.getValue(fam1, qual1).length);
419 assertEquals(10000, result.getValue(fam1, qual2).length);
420 assertEquals(10000, result.getValue(fam2, qual3).length);
423 * Test multi-threaded row mutations.
425 @Test
426 public void testRowMutationMultiThreads() throws IOException {
427 LOG.info("Starting test testRowMutationMultiThreads");
428 initHRegion(tableName, name.getMethodName(), fam1);
430 // create 10 threads, each will alternate between adding and
431 // removing a column
432 int numThreads = 10;
433 int opsPerThread = 250;
434 AtomicOperation[] all = new AtomicOperation[numThreads];
436 AtomicLong timeStamps = new AtomicLong(0);
437 AtomicInteger failures = new AtomicInteger(0);
438 // create all threads
439 for (int i = 0; i < numThreads; i++) {
440 all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
441 @Override
442 public void run() {
443 boolean op = true;
444 for (int i=0; i<numOps; i++) {
445 try {
446 // throw in some flushes
447 if (i%10==0) {
448 synchronized(region) {
449 LOG.debug("flushing");
450 region.flush(true);
451 if (i%100==0) {
452 region.compact(false);
456 long ts = timeStamps.incrementAndGet();
457 RowMutations rm = new RowMutations(row);
458 if (op) {
459 Put p = new Put(row, ts);
460 p.addColumn(fam1, qual1, value1);
461 p.setDurability(Durability.ASYNC_WAL);
462 rm.add(p);
463 Delete d = new Delete(row);
464 d.addColumns(fam1, qual2, ts);
465 d.setDurability(Durability.ASYNC_WAL);
466 rm.add(d);
467 } else {
468 Delete d = new Delete(row);
469 d.addColumns(fam1, qual1, ts);
470 d.setDurability(Durability.ASYNC_WAL);
471 rm.add(d);
472 Put p = new Put(row, ts);
473 p.addColumn(fam1, qual2, value2);
474 p.setDurability(Durability.ASYNC_WAL);
475 rm.add(p);
477 region.mutateRow(rm);
478 op ^= true;
479 // check: should always see exactly one column
480 Get g = new Get(row);
481 Result r = region.get(g);
482 if (r.size() != 1) {
483 LOG.debug(Objects.toString(r));
484 failures.incrementAndGet();
485 fail();
487 } catch (IOException e) {
488 e.printStackTrace();
489 failures.incrementAndGet();
490 fail();
497 // run all threads
498 for (int i = 0; i < numThreads; i++) {
499 all[i].start();
502 // wait for all threads to finish
503 for (int i = 0; i < numThreads; i++) {
504 try {
505 all[i].join();
506 } catch (InterruptedException e) {
509 assertEquals(0, failures.get());
514 * Test multi-threaded region mutations.
516 @Test
517 public void testMultiRowMutationMultiThreads() throws IOException {
519 LOG.info("Starting test testMultiRowMutationMultiThreads");
520 initHRegion(tableName, name.getMethodName(), fam1);
522 // create 10 threads, each will alternate between adding and
523 // removing a column
524 int numThreads = 10;
525 int opsPerThread = 250;
526 AtomicOperation[] all = new AtomicOperation[numThreads];
528 AtomicLong timeStamps = new AtomicLong(0);
529 AtomicInteger failures = new AtomicInteger(0);
530 final List<byte[]> rowsToLock = Arrays.asList(row, row2);
531 // create all threads
532 for (int i = 0; i < numThreads; i++) {
533 all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
534 @Override
535 public void run() {
536 boolean op = true;
537 for (int i=0; i<numOps; i++) {
538 try {
539 // throw in some flushes
540 if (i%10==0) {
541 synchronized(region) {
542 LOG.debug("flushing");
543 region.flush(true);
544 if (i%100==0) {
545 region.compact(false);
549 long ts = timeStamps.incrementAndGet();
550 List<Mutation> mrm = new ArrayList<>();
551 if (op) {
552 Put p = new Put(row2, ts);
553 p.addColumn(fam1, qual1, value1);
554 p.setDurability(Durability.ASYNC_WAL);
555 mrm.add(p);
556 Delete d = new Delete(row);
557 d.addColumns(fam1, qual1, ts);
558 d.setDurability(Durability.ASYNC_WAL);
559 mrm.add(d);
560 } else {
561 Delete d = new Delete(row2);
562 d.addColumns(fam1, qual1, ts);
563 d.setDurability(Durability.ASYNC_WAL);
564 mrm.add(d);
565 Put p = new Put(row, ts);
566 p.setDurability(Durability.ASYNC_WAL);
567 p.addColumn(fam1, qual1, value2);
568 mrm.add(p);
570 region.mutateRowsWithLocks(mrm, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
571 op ^= true;
572 // check: should always see exactly one column
573 Scan s = new Scan().withStartRow(row);
574 RegionScanner rs = region.getScanner(s);
575 List<Cell> r = new ArrayList<>();
576 while (rs.next(r))
578 rs.close();
579 if (r.size() != 1) {
580 LOG.debug(Objects.toString(r));
581 failures.incrementAndGet();
582 fail();
584 } catch (IOException e) {
585 e.printStackTrace();
586 failures.incrementAndGet();
587 fail();
594 // run all threads
595 for (int i = 0; i < numThreads; i++) {
596 all[i].start();
599 // wait for all threads to finish
600 for (int i = 0; i < numThreads; i++) {
601 try {
602 all[i].join();
603 } catch (InterruptedException e) {
606 assertEquals(0, failures.get());
609 public static class AtomicOperation extends Thread {
610 protected final HRegion region;
611 protected final int numOps;
612 protected final AtomicLong timeStamps;
613 protected final AtomicInteger failures;
615 public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps,
616 AtomicInteger failures) {
617 this.region = region;
618 this.numOps = numOps;
619 this.timeStamps = timeStamps;
620 this.failures = failures;
624 private static CountDownLatch latch = new CountDownLatch(1);
625 private enum TestStep {
626 INIT, // initial put of 10 to set value of the cell
627 PUT_STARTED, // began doing a put of 50 to cell
628 PUT_COMPLETED, // put complete (released RowLock, but may not have advanced MVCC).
629 CHECKANDPUT_STARTED, // began checkAndPut: if 10 -> 11
630 CHECKANDPUT_COMPLETED // completed checkAndPut
631 // NOTE: at the end of these steps, the value of the cell should be 50, not 11!
633 private static volatile TestStep testStep = TestStep.INIT;
634 private final String family = "f1";
637 * Test written as a verifier for HBASE-7051, CheckAndPut should properly read
638 * MVCC.
640 * Moved into TestAtomicOperation from its original location, TestHBase7051
642 @Test
643 public void testPutAndCheckAndPutInParallel() throws Exception {
644 Configuration conf = TEST_UTIL.getConfiguration();
645 conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class);
646 TableDescriptorBuilder tableDescriptorBuilder =
647 TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()));
648 ColumnFamilyDescriptor columnFamilyDescriptor =
649 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)).build();
650 tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
651 this.region = TEST_UTIL.createLocalHRegion(tableDescriptorBuilder.build(), null, null);
652 Put[] puts = new Put[1];
653 Put put = new Put(Bytes.toBytes("r1"));
654 put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
655 puts[0] = put;
657 region.batchMutate(puts);
658 MultithreadedTestUtil.TestContext ctx =
659 new MultithreadedTestUtil.TestContext(conf);
660 ctx.addThread(new PutThread(ctx, region));
661 ctx.addThread(new CheckAndPutThread(ctx, region));
662 ctx.startThreads();
663 while (testStep != TestStep.CHECKANDPUT_COMPLETED) {
664 Thread.sleep(100);
666 ctx.stop();
667 Scan s = new Scan();
668 RegionScanner scanner = region.getScanner(s);
669 List<Cell> results = new ArrayList<>();
670 ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(2).build();
671 scanner.next(results, scannerContext);
672 for (Cell keyValue : results) {
673 assertEquals("50",Bytes.toString(CellUtil.cloneValue(keyValue)));
677 private class PutThread extends TestThread {
678 private Region region;
679 PutThread(TestContext ctx, Region region) {
680 super(ctx);
681 this.region = region;
684 @Override
685 public void doWork() throws Exception {
686 Put[] puts = new Put[1];
687 Put put = new Put(Bytes.toBytes("r1"));
688 put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50"));
689 puts[0] = put;
690 testStep = TestStep.PUT_STARTED;
691 region.batchMutate(puts);
695 private class CheckAndPutThread extends TestThread {
696 private Region region;
697 CheckAndPutThread(TestContext ctx, Region region) {
698 super(ctx);
699 this.region = region;
702 @Override
703 public void doWork() throws Exception {
704 Put[] puts = new Put[1];
705 Put put = new Put(Bytes.toBytes("r1"));
706 put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("11"));
707 puts[0] = put;
708 while (testStep != TestStep.PUT_COMPLETED) {
709 Thread.sleep(100);
711 testStep = TestStep.CHECKANDPUT_STARTED;
712 region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"),
713 CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put);
714 testStep = TestStep.CHECKANDPUT_COMPLETED;
718 public static class MockHRegion extends HRegion {
720 public MockHRegion(Path tableDir, WAL log, FileSystem fs, Configuration conf,
721 final RegionInfo regionInfo, final TableDescriptor htd, RegionServerServices rsServices) {
722 super(tableDir, log, fs, conf, regionInfo, htd, rsServices);
725 @Override
726 protected RowLock getRowLockInternal(final byte[] row, boolean readLock,
727 final RowLock prevRowlock) throws IOException {
728 if (testStep == TestStep.CHECKANDPUT_STARTED) {
729 latch.countDown();
731 return new WrappedRowLock(super.getRowLockInternal(row, readLock, null));
734 public class WrappedRowLock implements RowLock {
736 private final RowLock rowLock;
738 private WrappedRowLock(RowLock rowLock) {
739 this.rowLock = rowLock;
743 @Override
744 public void release() {
745 if (testStep == TestStep.INIT) {
746 this.rowLock.release();
747 return;
750 if (testStep == TestStep.PUT_STARTED) {
751 try {
752 testStep = TestStep.PUT_COMPLETED;
753 this.rowLock.release();
754 // put has been written to the memstore and the row lock has been released, but the
755 // MVCC has not been advanced. Prior to fixing HBASE-7051, the following order of
756 // operations would cause the non-atomicity to show up:
757 // 1) Put releases row lock (where we are now)
758 // 2) CheckAndPut grabs row lock and reads the value prior to the put (10)
759 // because the MVCC has not advanced
760 // 3) Put advances MVCC
761 // So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock
762 // (see below), and then wait some more to give the checkAndPut time to read the old
763 // value.
764 latch.await();
765 Thread.sleep(1000);
766 } catch (InterruptedException e) {
767 Thread.currentThread().interrupt();
770 else if (testStep == TestStep.CHECKANDPUT_STARTED) {
771 this.rowLock.release();