HBASE-17532 Replaced explicit type with diamond operator
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / regionserver / TestAtomicOperation.java
blobef3ce06efb7e60a1d5bf20116501a642f4d0494c
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.regionserver;
19 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
20 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2;
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertNull;
23 import static org.junit.Assert.assertTrue;
24 import static org.junit.Assert.fail;
26 import java.io.IOException;
27 import java.util.ArrayList;
28 import java.util.Arrays;
29 import java.util.List;
30 import java.util.Random;
31 import java.util.concurrent.CountDownLatch;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.atomic.AtomicLong;
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.fs.FileSystem;
39 import org.apache.hadoop.fs.Path;
40 import org.apache.hadoop.hbase.Cell;
41 import org.apache.hadoop.hbase.CellUtil;
42 import org.apache.hadoop.hbase.HBaseTestingUtility;
43 import org.apache.hadoop.hbase.HColumnDescriptor;
44 import org.apache.hadoop.hbase.HConstants;
45 import org.apache.hadoop.hbase.HRegionInfo;
46 import org.apache.hadoop.hbase.HTableDescriptor;
47 import org.apache.hadoop.hbase.MultithreadedTestUtil;
48 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
49 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
50 import org.apache.hadoop.hbase.TableName;
51 import org.apache.hadoop.hbase.client.Append;
52 import org.apache.hadoop.hbase.client.Delete;
53 import org.apache.hadoop.hbase.client.Durability;
54 import org.apache.hadoop.hbase.client.Get;
55 import org.apache.hadoop.hbase.client.Increment;
56 import org.apache.hadoop.hbase.client.IsolationLevel;
57 import org.apache.hadoop.hbase.client.Mutation;
58 import org.apache.hadoop.hbase.client.Put;
59 import org.apache.hadoop.hbase.client.Result;
60 import org.apache.hadoop.hbase.client.RowMutations;
61 import org.apache.hadoop.hbase.client.Scan;
62 import org.apache.hadoop.hbase.filter.BinaryComparator;
63 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
64 import org.apache.hadoop.hbase.io.HeapSize;
65 import org.apache.hadoop.hbase.io.hfile.BlockCache;
66 import org.apache.hadoop.hbase.testclassification.MediumTests;
67 import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
68 import org.apache.hadoop.hbase.util.Bytes;
69 import org.apache.hadoop.hbase.wal.WAL;
70 import org.junit.After;
71 import org.junit.Before;
72 import org.junit.Rule;
73 import org.junit.Test;
74 import org.junit.experimental.categories.Category;
75 import org.junit.rules.TestName;
77 /**
78 * Testing of HRegion.incrementColumnValue, HRegion.increment,
79 * and HRegion.append
81 @Category({VerySlowRegionServerTests.class, MediumTests.class}) // Starts 100 threads
82 public class TestAtomicOperation {
83 private static final Log LOG = LogFactory.getLog(TestAtomicOperation.class);
84 @Rule public TestName name = new TestName();
86 Region region = null;
87 private HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU();
89 // Test names
90 static byte[] tableName;
91 static final byte[] qual1 = Bytes.toBytes("qual1");
92 static final byte[] qual2 = Bytes.toBytes("qual2");
93 static final byte[] qual3 = Bytes.toBytes("qual3");
94 static final byte[] value1 = Bytes.toBytes("value1");
95 static final byte[] value2 = Bytes.toBytes("value2");
96 static final byte [] row = Bytes.toBytes("rowA");
97 static final byte [] row2 = Bytes.toBytes("rowB");
99 @Before
100 public void setup() {
101 tableName = Bytes.toBytes(name.getMethodName());
104 @After
105 public void teardown() throws IOException {
106 if (region != null) {
107 BlockCache bc = region.getStores().get(0).getCacheConfig().getBlockCache();
108 ((HRegion)region).close();
109 WAL wal = ((HRegion)region).getWAL();
110 if (wal != null) wal.close();
111 if (bc != null) bc.shutdown();
112 region = null;
115 //////////////////////////////////////////////////////////////////////////////
116 // New tests that doesn't spin up a mini cluster but rather just test the
117 // individual code pieces in the HRegion.
118 //////////////////////////////////////////////////////////////////////////////
121 * Test basic append operation.
122 * More tests in
123 * @see org.apache.hadoop.hbase.client.TestFromClientSide#testAppend()
125 @Test
126 public void testAppend() throws IOException {
127 initHRegion(tableName, name.getMethodName(), fam1);
128 String v1 = "Ultimate Answer to the Ultimate Question of Life,"+
129 " The Universe, and Everything";
130 String v2 = " is... 42.";
131 Append a = new Append(row);
132 a.setReturnResults(false);
133 a.add(fam1, qual1, Bytes.toBytes(v1));
134 a.add(fam1, qual2, Bytes.toBytes(v2));
135 assertTrue(region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE).isEmpty());
136 a = new Append(row);
137 a.add(fam1, qual1, Bytes.toBytes(v2));
138 a.add(fam1, qual2, Bytes.toBytes(v1));
139 Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
140 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1+v2), result.getValue(fam1, qual1)));
141 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2+v1), result.getValue(fam1, qual2)));
144 @Test
145 public void testAppendWithNonExistingFamily() throws IOException {
146 initHRegion(tableName, name.getMethodName(), fam1);
147 final String v1 = "Value";
148 final Append a = new Append(row);
149 a.add(fam1, qual1, Bytes.toBytes(v1));
150 a.add(fam2, qual2, Bytes.toBytes(v1));
151 Result result = null;
152 try {
153 result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
154 fail("Append operation should fail with NoSuchColumnFamilyException.");
155 } catch (NoSuchColumnFamilyException e) {
156 assertEquals(null, result);
157 } catch (Exception e) {
158 fail("Append operation should fail with NoSuchColumnFamilyException.");
162 @Test
163 public void testIncrementWithNonExistingFamily() throws IOException {
164 initHRegion(tableName, name.getMethodName(), fam1);
165 final Increment inc = new Increment(row);
166 inc.addColumn(fam1, qual1, 1);
167 inc.addColumn(fam2, qual2, 1);
168 inc.setDurability(Durability.ASYNC_WAL);
169 try {
170 region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE);
171 } catch (NoSuchColumnFamilyException e) {
172 final Get g = new Get(row);
173 final Result result = region.get(g);
174 assertEquals(null, result.getValue(fam1, qual1));
175 assertEquals(null, result.getValue(fam2, qual2));
176 } catch (Exception e) {
177 fail("Increment operation should fail with NoSuchColumnFamilyException.");
182 * Test multi-threaded increments.
184 @Test
185 public void testIncrementMultiThreads() throws IOException {
186 boolean fast = true;
187 LOG.info("Starting test testIncrementMultiThreads");
188 // run a with mixed column families (1 and 3 versions)
189 initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
191 // Create 100 threads, each will increment by its own quantity. All 100 threads update the
192 // same row over two column families.
193 int numThreads = 100;
194 int incrementsPerThread = 1000;
195 Incrementer[] all = new Incrementer[numThreads];
196 int expectedTotal = 0;
197 // create all threads
198 for (int i = 0; i < numThreads; i++) {
199 all[i] = new Incrementer(region, i, i, incrementsPerThread);
200 expectedTotal += (i * incrementsPerThread);
203 // run all threads
204 for (int i = 0; i < numThreads; i++) {
205 all[i].start();
208 // wait for all threads to finish
209 for (int i = 0; i < numThreads; i++) {
210 try {
211 all[i].join();
212 } catch (InterruptedException e) {
213 LOG.info("Ignored", e);
216 assertICV(row, fam1, qual1, expectedTotal, fast);
217 assertICV(row, fam1, qual2, expectedTotal*2, fast);
218 assertICV(row, fam2, qual3, expectedTotal*3, fast);
219 LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal);
223 private void assertICV(byte [] row,
224 byte [] familiy,
225 byte[] qualifier,
226 long amount,
227 boolean fast) throws IOException {
228 // run a get and see?
229 Get get = new Get(row);
230 if (fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
231 get.addColumn(familiy, qualifier);
232 Result result = region.get(get);
233 assertEquals(1, result.size());
235 Cell kv = result.rawCells()[0];
236 long r = Bytes.toLong(CellUtil.cloneValue(kv));
237 assertEquals(amount, r);
240 private void initHRegion (byte [] tableName, String callingMethod,
241 byte[] ... families)
242 throws IOException {
243 initHRegion(tableName, callingMethod, null, families);
246 private void initHRegion (byte [] tableName, String callingMethod, int [] maxVersions,
247 byte[] ... families)
248 throws IOException {
249 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
250 int i=0;
251 for(byte [] family : families) {
252 HColumnDescriptor hcd = new HColumnDescriptor(family);
253 hcd.setMaxVersions(maxVersions != null ? maxVersions[i++] : 1);
254 htd.addFamily(hcd);
256 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
257 region = TEST_UTIL.createLocalHRegion(info, htd);
261 * A thread that makes increment calls always on the same row, this.row against two column
262 * families on this row.
264 public static class Incrementer extends Thread {
266 private final Region region;
267 private final int numIncrements;
268 private final int amount;
271 public Incrementer(Region region, int threadNumber, int amount, int numIncrements) {
272 super("Incrementer." + threadNumber);
273 this.region = region;
274 this.numIncrements = numIncrements;
275 this.amount = amount;
276 setDaemon(true);
279 @Override
280 public void run() {
281 for (int i = 0; i < numIncrements; i++) {
282 try {
283 Increment inc = new Increment(row);
284 inc.addColumn(fam1, qual1, amount);
285 inc.addColumn(fam1, qual2, amount*2);
286 inc.addColumn(fam2, qual3, amount*3);
287 inc.setDurability(Durability.ASYNC_WAL);
288 Result result = region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE);
289 if (result != null) {
290 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2,
291 Bytes.toLong(result.getValue(fam1, qual2)));
292 assertTrue(result.getValue(fam2, qual3) != null);
293 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3,
294 Bytes.toLong(result.getValue(fam2, qual3)));
295 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2,
296 Bytes.toLong(result.getValue(fam1, qual2)));
297 long fam1Increment = Bytes.toLong(result.getValue(fam1, qual1))*3;
298 long fam2Increment = Bytes.toLong(result.getValue(fam2, qual3));
299 assertEquals("fam1=" + fam1Increment + ", fam2=" + fam2Increment,
300 fam1Increment, fam2Increment);
302 } catch (IOException e) {
303 e.printStackTrace();
309 @Test
310 public void testAppendMultiThreads() throws IOException {
311 LOG.info("Starting test testAppendMultiThreads");
312 // run a with mixed column families (1 and 3 versions)
313 initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
315 int numThreads = 100;
316 int opsPerThread = 100;
317 AtomicOperation[] all = new AtomicOperation[numThreads];
318 final byte[] val = new byte[]{1};
320 AtomicInteger failures = new AtomicInteger(0);
321 // create all threads
322 for (int i = 0; i < numThreads; i++) {
323 all[i] = new AtomicOperation(region, opsPerThread, null, failures) {
324 @Override
325 public void run() {
326 for (int i=0; i<numOps; i++) {
327 try {
328 Append a = new Append(row);
329 a.add(fam1, qual1, val);
330 a.add(fam1, qual2, val);
331 a.add(fam2, qual3, val);
332 a.setDurability(Durability.ASYNC_WAL);
333 region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
335 Get g = new Get(row);
336 Result result = region.get(g);
337 assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam1, qual2).length);
338 assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam2, qual3).length);
339 } catch (IOException e) {
340 e.printStackTrace();
341 failures.incrementAndGet();
342 fail();
349 // run all threads
350 for (int i = 0; i < numThreads; i++) {
351 all[i].start();
354 // wait for all threads to finish
355 for (int i = 0; i < numThreads; i++) {
356 try {
357 all[i].join();
358 } catch (InterruptedException e) {
361 assertEquals(0, failures.get());
362 Get g = new Get(row);
363 Result result = region.get(g);
364 assertEquals(result.getValue(fam1, qual1).length, 10000);
365 assertEquals(result.getValue(fam1, qual2).length, 10000);
366 assertEquals(result.getValue(fam2, qual3).length, 10000);
369 * Test multi-threaded row mutations.
371 @Test
372 public void testRowMutationMultiThreads() throws IOException {
373 LOG.info("Starting test testRowMutationMultiThreads");
374 initHRegion(tableName, name.getMethodName(), fam1);
376 // create 10 threads, each will alternate between adding and
377 // removing a column
378 int numThreads = 10;
379 int opsPerThread = 250;
380 AtomicOperation[] all = new AtomicOperation[numThreads];
382 AtomicLong timeStamps = new AtomicLong(0);
383 AtomicInteger failures = new AtomicInteger(0);
384 // create all threads
385 for (int i = 0; i < numThreads; i++) {
386 all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
387 @Override
388 public void run() {
389 boolean op = true;
390 for (int i=0; i<numOps; i++) {
391 try {
392 // throw in some flushes
393 if (i%10==0) {
394 synchronized(region) {
395 LOG.debug("flushing");
396 region.flush(true);
397 if (i%100==0) {
398 region.compact(false);
402 long ts = timeStamps.incrementAndGet();
403 RowMutations rm = new RowMutations(row);
404 if (op) {
405 Put p = new Put(row, ts);
406 p.addColumn(fam1, qual1, value1);
407 p.setDurability(Durability.ASYNC_WAL);
408 rm.add(p);
409 Delete d = new Delete(row);
410 d.addColumns(fam1, qual2, ts);
411 d.setDurability(Durability.ASYNC_WAL);
412 rm.add(d);
413 } else {
414 Delete d = new Delete(row);
415 d.addColumns(fam1, qual1, ts);
416 d.setDurability(Durability.ASYNC_WAL);
417 rm.add(d);
418 Put p = new Put(row, ts);
419 p.addColumn(fam1, qual2, value2);
420 p.setDurability(Durability.ASYNC_WAL);
421 rm.add(p);
423 region.mutateRow(rm);
424 op ^= true;
425 // check: should always see exactly one column
426 Get g = new Get(row);
427 Result r = region.get(g);
428 if (r.size() != 1) {
429 LOG.debug(r);
430 failures.incrementAndGet();
431 fail();
433 } catch (IOException e) {
434 e.printStackTrace();
435 failures.incrementAndGet();
436 fail();
443 // run all threads
444 for (int i = 0; i < numThreads; i++) {
445 all[i].start();
448 // wait for all threads to finish
449 for (int i = 0; i < numThreads; i++) {
450 try {
451 all[i].join();
452 } catch (InterruptedException e) {
455 assertEquals(0, failures.get());
460 * Test multi-threaded region mutations.
462 @Test
463 public void testMultiRowMutationMultiThreads() throws IOException {
465 LOG.info("Starting test testMultiRowMutationMultiThreads");
466 initHRegion(tableName, name.getMethodName(), fam1);
468 // create 10 threads, each will alternate between adding and
469 // removing a column
470 int numThreads = 10;
471 int opsPerThread = 250;
472 AtomicOperation[] all = new AtomicOperation[numThreads];
474 AtomicLong timeStamps = new AtomicLong(0);
475 AtomicInteger failures = new AtomicInteger(0);
476 final List<byte[]> rowsToLock = Arrays.asList(row, row2);
477 // create all threads
478 for (int i = 0; i < numThreads; i++) {
479 all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
480 @Override
481 public void run() {
482 boolean op = true;
483 for (int i=0; i<numOps; i++) {
484 try {
485 // throw in some flushes
486 if (i%10==0) {
487 synchronized(region) {
488 LOG.debug("flushing");
489 region.flush(true);
490 if (i%100==0) {
491 region.compact(false);
495 long ts = timeStamps.incrementAndGet();
496 List<Mutation> mrm = new ArrayList<>();
497 if (op) {
498 Put p = new Put(row2, ts);
499 p.addColumn(fam1, qual1, value1);
500 p.setDurability(Durability.ASYNC_WAL);
501 mrm.add(p);
502 Delete d = new Delete(row);
503 d.addColumns(fam1, qual1, ts);
504 d.setDurability(Durability.ASYNC_WAL);
505 mrm.add(d);
506 } else {
507 Delete d = new Delete(row2);
508 d.addColumns(fam1, qual1, ts);
509 d.setDurability(Durability.ASYNC_WAL);
510 mrm.add(d);
511 Put p = new Put(row, ts);
512 p.setDurability(Durability.ASYNC_WAL);
513 p.addColumn(fam1, qual1, value2);
514 mrm.add(p);
516 region.mutateRowsWithLocks(mrm, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
517 op ^= true;
518 // check: should always see exactly one column
519 Scan s = new Scan(row);
520 RegionScanner rs = region.getScanner(s);
521 List<Cell> r = new ArrayList<>();
522 while (rs.next(r))
524 rs.close();
525 if (r.size() != 1) {
526 LOG.debug(r);
527 failures.incrementAndGet();
528 fail();
530 } catch (IOException e) {
531 e.printStackTrace();
532 failures.incrementAndGet();
533 fail();
540 // run all threads
541 for (int i = 0; i < numThreads; i++) {
542 all[i].start();
545 // wait for all threads to finish
546 for (int i = 0; i < numThreads; i++) {
547 try {
548 all[i].join();
549 } catch (InterruptedException e) {
552 assertEquals(0, failures.get());
555 public static class AtomicOperation extends Thread {
556 protected final Region region;
557 protected final int numOps;
558 protected final AtomicLong timeStamps;
559 protected final AtomicInteger failures;
560 protected final Random r = new Random();
562 public AtomicOperation(Region region, int numOps, AtomicLong timeStamps,
563 AtomicInteger failures) {
564 this.region = region;
565 this.numOps = numOps;
566 this.timeStamps = timeStamps;
567 this.failures = failures;
571 private static CountDownLatch latch = new CountDownLatch(1);
572 private enum TestStep {
573 INIT, // initial put of 10 to set value of the cell
574 PUT_STARTED, // began doing a put of 50 to cell
575 PUT_COMPLETED, // put complete (released RowLock, but may not have advanced MVCC).
576 CHECKANDPUT_STARTED, // began checkAndPut: if 10 -> 11
577 CHECKANDPUT_COMPLETED // completed checkAndPut
578 // NOTE: at the end of these steps, the value of the cell should be 50, not 11!
580 private static volatile TestStep testStep = TestStep.INIT;
581 private final String family = "f1";
584 * Test written as a verifier for HBASE-7051, CheckAndPut should properly read
585 * MVCC.
587 * Moved into TestAtomicOperation from its original location, TestHBase7051
589 @Test
590 public void testPutAndCheckAndPutInParallel() throws Exception {
591 Configuration conf = TEST_UTIL.getConfiguration();
592 conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class);
593 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(name.getMethodName()))
594 .addFamily(new HColumnDescriptor(family));
595 this.region = TEST_UTIL.createLocalHRegion(htd, null, null);
596 Put[] puts = new Put[1];
597 Put put = new Put(Bytes.toBytes("r1"));
598 put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
599 puts[0] = put;
601 region.batchMutate(puts, HConstants.NO_NONCE, HConstants.NO_NONCE);
602 MultithreadedTestUtil.TestContext ctx =
603 new MultithreadedTestUtil.TestContext(conf);
604 ctx.addThread(new PutThread(ctx, region));
605 ctx.addThread(new CheckAndPutThread(ctx, region));
606 ctx.startThreads();
607 while (testStep != TestStep.CHECKANDPUT_COMPLETED) {
608 Thread.sleep(100);
610 ctx.stop();
611 Scan s = new Scan();
612 RegionScanner scanner = region.getScanner(s);
613 List<Cell> results = new ArrayList<>();
614 ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(2).build();
615 scanner.next(results, scannerContext);
616 for (Cell keyValue : results) {
617 assertEquals("50",Bytes.toString(CellUtil.cloneValue(keyValue)));
621 private class PutThread extends TestThread {
622 private Region region;
623 PutThread(TestContext ctx, Region region) {
624 super(ctx);
625 this.region = region;
628 public void doWork() throws Exception {
629 Put[] puts = new Put[1];
630 Put put = new Put(Bytes.toBytes("r1"));
631 put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50"));
632 puts[0] = put;
633 testStep = TestStep.PUT_STARTED;
634 region.batchMutate(puts, HConstants.NO_NONCE, HConstants.NO_NONCE);
638 private class CheckAndPutThread extends TestThread {
639 private Region region;
640 CheckAndPutThread(TestContext ctx, Region region) {
641 super(ctx);
642 this.region = region;
645 public void doWork() throws Exception {
646 Put[] puts = new Put[1];
647 Put put = new Put(Bytes.toBytes("r1"));
648 put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("11"));
649 puts[0] = put;
650 while (testStep != TestStep.PUT_COMPLETED) {
651 Thread.sleep(100);
653 testStep = TestStep.CHECKANDPUT_STARTED;
654 region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"),
655 CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put, true);
656 testStep = TestStep.CHECKANDPUT_COMPLETED;
660 public static class MockHRegion extends HRegion {
662 public MockHRegion(Path tableDir, WAL log, FileSystem fs, Configuration conf,
663 final HRegionInfo regionInfo, final HTableDescriptor htd, RegionServerServices rsServices) {
664 super(tableDir, log, fs, conf, regionInfo, htd, rsServices);
667 @Override
668 public RowLock getRowLockInternal(final byte[] row, boolean readLock) throws IOException {
669 if (testStep == TestStep.CHECKANDPUT_STARTED) {
670 latch.countDown();
672 return new WrappedRowLock(super.getRowLockInternal(row, readLock));
675 public class WrappedRowLock implements RowLock {
677 private final RowLock rowLock;
679 private WrappedRowLock(RowLock rowLock) {
680 this.rowLock = rowLock;
684 @Override
685 public void release() {
686 if (testStep == TestStep.INIT) {
687 this.rowLock.release();
688 return;
691 if (testStep == TestStep.PUT_STARTED) {
692 try {
693 testStep = TestStep.PUT_COMPLETED;
694 this.rowLock.release();
695 // put has been written to the memstore and the row lock has been released, but the
696 // MVCC has not been advanced. Prior to fixing HBASE-7051, the following order of
697 // operations would cause the non-atomicity to show up:
698 // 1) Put releases row lock (where we are now)
699 // 2) CheckAndPut grabs row lock and reads the value prior to the put (10)
700 // because the MVCC has not advanced
701 // 3) Put advances MVCC
702 // So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock
703 // (see below), and then wait some more to give the checkAndPut time to read the old
704 // value.
705 latch.await();
706 Thread.sleep(1000);
707 } catch (InterruptedException e) {
708 Thread.currentThread().interrupt();
711 else if (testStep == TestStep.CHECKANDPUT_STARTED) {
712 this.rowLock.release();