2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.regionserver
;
19 import static org
.apache
.hadoop
.hbase
.HBaseTestingUtility
.fam1
;
20 import static org
.apache
.hadoop
.hbase
.HBaseTestingUtility
.fam2
;
21 import static org
.junit
.Assert
.assertEquals
;
22 import static org
.junit
.Assert
.assertNull
;
23 import static org
.junit
.Assert
.assertTrue
;
24 import static org
.junit
.Assert
.fail
;
26 import java
.io
.IOException
;
27 import java
.util
.ArrayList
;
28 import java
.util
.Arrays
;
29 import java
.util
.List
;
30 import java
.util
.Random
;
31 import java
.util
.concurrent
.CountDownLatch
;
32 import java
.util
.concurrent
.atomic
.AtomicInteger
;
33 import java
.util
.concurrent
.atomic
.AtomicLong
;
35 import org
.apache
.commons
.logging
.Log
;
36 import org
.apache
.commons
.logging
.LogFactory
;
37 import org
.apache
.hadoop
.conf
.Configuration
;
38 import org
.apache
.hadoop
.fs
.FileSystem
;
39 import org
.apache
.hadoop
.fs
.Path
;
40 import org
.apache
.hadoop
.hbase
.Cell
;
41 import org
.apache
.hadoop
.hbase
.CellUtil
;
42 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
43 import org
.apache
.hadoop
.hbase
.HColumnDescriptor
;
44 import org
.apache
.hadoop
.hbase
.HConstants
;
45 import org
.apache
.hadoop
.hbase
.HRegionInfo
;
46 import org
.apache
.hadoop
.hbase
.HTableDescriptor
;
47 import org
.apache
.hadoop
.hbase
.MultithreadedTestUtil
;
48 import org
.apache
.hadoop
.hbase
.MultithreadedTestUtil
.TestContext
;
49 import org
.apache
.hadoop
.hbase
.MultithreadedTestUtil
.TestThread
;
50 import org
.apache
.hadoop
.hbase
.TableName
;
51 import org
.apache
.hadoop
.hbase
.client
.Append
;
52 import org
.apache
.hadoop
.hbase
.client
.Delete
;
53 import org
.apache
.hadoop
.hbase
.client
.Durability
;
54 import org
.apache
.hadoop
.hbase
.client
.Get
;
55 import org
.apache
.hadoop
.hbase
.client
.Increment
;
56 import org
.apache
.hadoop
.hbase
.client
.IsolationLevel
;
57 import org
.apache
.hadoop
.hbase
.client
.Mutation
;
58 import org
.apache
.hadoop
.hbase
.client
.Put
;
59 import org
.apache
.hadoop
.hbase
.client
.Result
;
60 import org
.apache
.hadoop
.hbase
.client
.RowMutations
;
61 import org
.apache
.hadoop
.hbase
.client
.Scan
;
62 import org
.apache
.hadoop
.hbase
.filter
.BinaryComparator
;
63 import org
.apache
.hadoop
.hbase
.filter
.CompareFilter
.CompareOp
;
64 import org
.apache
.hadoop
.hbase
.io
.HeapSize
;
65 import org
.apache
.hadoop
.hbase
.io
.hfile
.BlockCache
;
66 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
67 import org
.apache
.hadoop
.hbase
.testclassification
.VerySlowRegionServerTests
;
68 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
69 import org
.apache
.hadoop
.hbase
.wal
.WAL
;
70 import org
.junit
.After
;
71 import org
.junit
.Before
;
72 import org
.junit
.Rule
;
73 import org
.junit
.Test
;
74 import org
.junit
.experimental
.categories
.Category
;
75 import org
.junit
.rules
.TestName
;
/**
 * Testing of HRegion.incrementColumnValue, HRegion.increment,
 * HRegion.append and multi-threaded row mutations.
 */
81 @Category({VerySlowRegionServerTests
.class, MediumTests
.class}) // Starts 100 threads
82 public class TestAtomicOperation
{
83 private static final Log LOG
= LogFactory
.getLog(TestAtomicOperation
.class);
84 @Rule public TestName name
= new TestName();
87 private HBaseTestingUtility TEST_UTIL
= HBaseTestingUtility
.createLocalHTU();
90 static byte[] tableName
;
91 static final byte[] qual1
= Bytes
.toBytes("qual1");
92 static final byte[] qual2
= Bytes
.toBytes("qual2");
93 static final byte[] qual3
= Bytes
.toBytes("qual3");
94 static final byte[] value1
= Bytes
.toBytes("value1");
95 static final byte[] value2
= Bytes
.toBytes("value2");
96 static final byte [] row
= Bytes
.toBytes("rowA");
97 static final byte [] row2
= Bytes
.toBytes("rowB");
100 public void setup() {
101 tableName
= Bytes
.toBytes(name
.getMethodName());
105 public void teardown() throws IOException
{
106 if (region
!= null) {
107 BlockCache bc
= region
.getStores().get(0).getCacheConfig().getBlockCache();
108 ((HRegion
)region
).close();
109 WAL wal
= ((HRegion
)region
).getWAL();
110 if (wal
!= null) wal
.close();
111 if (bc
!= null) bc
.shutdown();
//////////////////////////////////////////////////////////////////////////////
// New tests that don't spin up a mini cluster but rather just test the
// individual code pieces in the HRegion.
//////////////////////////////////////////////////////////////////////////////
121 * Test basic append operation.
123 * @see org.apache.hadoop.hbase.client.TestFromClientSide#testAppend()
126 public void testAppend() throws IOException
{
127 initHRegion(tableName
, name
.getMethodName(), fam1
);
128 String v1
= "Ultimate Answer to the Ultimate Question of Life,"+
129 " The Universe, and Everything";
130 String v2
= " is... 42.";
131 Append a
= new Append(row
);
132 a
.setReturnResults(false);
133 a
.add(fam1
, qual1
, Bytes
.toBytes(v1
));
134 a
.add(fam1
, qual2
, Bytes
.toBytes(v2
));
135 assertTrue(region
.append(a
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
).isEmpty());
137 a
.add(fam1
, qual1
, Bytes
.toBytes(v2
));
138 a
.add(fam1
, qual2
, Bytes
.toBytes(v1
));
139 Result result
= region
.append(a
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
);
140 assertEquals(0, Bytes
.compareTo(Bytes
.toBytes(v1
+v2
), result
.getValue(fam1
, qual1
)));
141 assertEquals(0, Bytes
.compareTo(Bytes
.toBytes(v2
+v1
), result
.getValue(fam1
, qual2
)));
145 public void testAppendWithNonExistingFamily() throws IOException
{
146 initHRegion(tableName
, name
.getMethodName(), fam1
);
147 final String v1
= "Value";
148 final Append a
= new Append(row
);
149 a
.add(fam1
, qual1
, Bytes
.toBytes(v1
));
150 a
.add(fam2
, qual2
, Bytes
.toBytes(v1
));
151 Result result
= null;
153 result
= region
.append(a
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
);
154 fail("Append operation should fail with NoSuchColumnFamilyException.");
155 } catch (NoSuchColumnFamilyException e
) {
156 assertEquals(null, result
);
157 } catch (Exception e
) {
158 fail("Append operation should fail with NoSuchColumnFamilyException.");
163 public void testIncrementWithNonExistingFamily() throws IOException
{
164 initHRegion(tableName
, name
.getMethodName(), fam1
);
165 final Increment inc
= new Increment(row
);
166 inc
.addColumn(fam1
, qual1
, 1);
167 inc
.addColumn(fam2
, qual2
, 1);
168 inc
.setDurability(Durability
.ASYNC_WAL
);
170 region
.increment(inc
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
);
171 } catch (NoSuchColumnFamilyException e
) {
172 final Get g
= new Get(row
);
173 final Result result
= region
.get(g
);
174 assertEquals(null, result
.getValue(fam1
, qual1
));
175 assertEquals(null, result
.getValue(fam2
, qual2
));
176 } catch (Exception e
) {
177 fail("Increment operation should fail with NoSuchColumnFamilyException.");
182 * Test multi-threaded increments.
185 public void testIncrementMultiThreads() throws IOException
{
187 LOG
.info("Starting test testIncrementMultiThreads");
188 // run a with mixed column families (1 and 3 versions)
189 initHRegion(tableName
, name
.getMethodName(), new int[] {1,3}, fam1
, fam2
);
191 // Create 100 threads, each will increment by its own quantity. All 100 threads update the
192 // same row over two column families.
193 int numThreads
= 100;
194 int incrementsPerThread
= 1000;
195 Incrementer
[] all
= new Incrementer
[numThreads
];
196 int expectedTotal
= 0;
197 // create all threads
198 for (int i
= 0; i
< numThreads
; i
++) {
199 all
[i
] = new Incrementer(region
, i
, i
, incrementsPerThread
);
200 expectedTotal
+= (i
* incrementsPerThread
);
204 for (int i
= 0; i
< numThreads
; i
++) {
208 // wait for all threads to finish
209 for (int i
= 0; i
< numThreads
; i
++) {
212 } catch (InterruptedException e
) {
213 LOG
.info("Ignored", e
);
216 assertICV(row
, fam1
, qual1
, expectedTotal
, fast
);
217 assertICV(row
, fam1
, qual2
, expectedTotal
*2, fast
);
218 assertICV(row
, fam2
, qual3
, expectedTotal
*3, fast
);
219 LOG
.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal
);
223 private void assertICV(byte [] row
,
227 boolean fast
) throws IOException
{
228 // run a get and see?
229 Get get
= new Get(row
);
230 if (fast
) get
.setIsolationLevel(IsolationLevel
.READ_UNCOMMITTED
);
231 get
.addColumn(familiy
, qualifier
);
232 Result result
= region
.get(get
);
233 assertEquals(1, result
.size());
235 Cell kv
= result
.rawCells()[0];
236 long r
= Bytes
.toLong(CellUtil
.cloneValue(kv
));
237 assertEquals(amount
, r
);
240 private void initHRegion (byte [] tableName
, String callingMethod
,
243 initHRegion(tableName
, callingMethod
, null, families
);
246 private void initHRegion (byte [] tableName
, String callingMethod
, int [] maxVersions
,
249 HTableDescriptor htd
= new HTableDescriptor(TableName
.valueOf(tableName
));
251 for(byte [] family
: families
) {
252 HColumnDescriptor hcd
= new HColumnDescriptor(family
);
253 hcd
.setMaxVersions(maxVersions
!= null ? maxVersions
[i
++] : 1);
256 HRegionInfo info
= new HRegionInfo(htd
.getTableName(), null, null, false);
257 region
= TEST_UTIL
.createLocalHRegion(info
, htd
);
261 * A thread that makes increment calls always on the same row, this.row against two column
262 * families on this row.
264 public static class Incrementer
extends Thread
{
266 private final Region region
;
267 private final int numIncrements
;
268 private final int amount
;
271 public Incrementer(Region region
, int threadNumber
, int amount
, int numIncrements
) {
272 super("Incrementer." + threadNumber
);
273 this.region
= region
;
274 this.numIncrements
= numIncrements
;
275 this.amount
= amount
;
281 for (int i
= 0; i
< numIncrements
; i
++) {
283 Increment inc
= new Increment(row
);
284 inc
.addColumn(fam1
, qual1
, amount
);
285 inc
.addColumn(fam1
, qual2
, amount
*2);
286 inc
.addColumn(fam2
, qual3
, amount
*3);
287 inc
.setDurability(Durability
.ASYNC_WAL
);
288 Result result
= region
.increment(inc
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
);
289 if (result
!= null) {
290 assertEquals(Bytes
.toLong(result
.getValue(fam1
, qual1
))*2,
291 Bytes
.toLong(result
.getValue(fam1
, qual2
)));
292 assertTrue(result
.getValue(fam2
, qual3
) != null);
293 assertEquals(Bytes
.toLong(result
.getValue(fam1
, qual1
))*3,
294 Bytes
.toLong(result
.getValue(fam2
, qual3
)));
295 assertEquals(Bytes
.toLong(result
.getValue(fam1
, qual1
))*2,
296 Bytes
.toLong(result
.getValue(fam1
, qual2
)));
297 long fam1Increment
= Bytes
.toLong(result
.getValue(fam1
, qual1
))*3;
298 long fam2Increment
= Bytes
.toLong(result
.getValue(fam2
, qual3
));
299 assertEquals("fam1=" + fam1Increment
+ ", fam2=" + fam2Increment
,
300 fam1Increment
, fam2Increment
);
302 } catch (IOException e
) {
310 public void testAppendMultiThreads() throws IOException
{
311 LOG
.info("Starting test testAppendMultiThreads");
312 // run a with mixed column families (1 and 3 versions)
313 initHRegion(tableName
, name
.getMethodName(), new int[] {1,3}, fam1
, fam2
);
315 int numThreads
= 100;
316 int opsPerThread
= 100;
317 AtomicOperation
[] all
= new AtomicOperation
[numThreads
];
318 final byte[] val
= new byte[]{1};
320 AtomicInteger failures
= new AtomicInteger(0);
321 // create all threads
322 for (int i
= 0; i
< numThreads
; i
++) {
323 all
[i
] = new AtomicOperation(region
, opsPerThread
, null, failures
) {
326 for (int i
=0; i
<numOps
; i
++) {
328 Append a
= new Append(row
);
329 a
.add(fam1
, qual1
, val
);
330 a
.add(fam1
, qual2
, val
);
331 a
.add(fam2
, qual3
, val
);
332 a
.setDurability(Durability
.ASYNC_WAL
);
333 region
.append(a
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
);
335 Get g
= new Get(row
);
336 Result result
= region
.get(g
);
337 assertEquals(result
.getValue(fam1
, qual1
).length
, result
.getValue(fam1
, qual2
).length
);
338 assertEquals(result
.getValue(fam1
, qual1
).length
, result
.getValue(fam2
, qual3
).length
);
339 } catch (IOException e
) {
341 failures
.incrementAndGet();
350 for (int i
= 0; i
< numThreads
; i
++) {
354 // wait for all threads to finish
355 for (int i
= 0; i
< numThreads
; i
++) {
358 } catch (InterruptedException e
) {
361 assertEquals(0, failures
.get());
362 Get g
= new Get(row
);
363 Result result
= region
.get(g
);
364 assertEquals(result
.getValue(fam1
, qual1
).length
, 10000);
365 assertEquals(result
.getValue(fam1
, qual2
).length
, 10000);
366 assertEquals(result
.getValue(fam2
, qual3
).length
, 10000);
369 * Test multi-threaded row mutations.
372 public void testRowMutationMultiThreads() throws IOException
{
373 LOG
.info("Starting test testRowMutationMultiThreads");
374 initHRegion(tableName
, name
.getMethodName(), fam1
);
376 // create 10 threads, each will alternate between adding and
379 int opsPerThread
= 250;
380 AtomicOperation
[] all
= new AtomicOperation
[numThreads
];
382 AtomicLong timeStamps
= new AtomicLong(0);
383 AtomicInteger failures
= new AtomicInteger(0);
384 // create all threads
385 for (int i
= 0; i
< numThreads
; i
++) {
386 all
[i
] = new AtomicOperation(region
, opsPerThread
, timeStamps
, failures
) {
390 for (int i
=0; i
<numOps
; i
++) {
392 // throw in some flushes
394 synchronized(region
) {
395 LOG
.debug("flushing");
398 region
.compact(false);
402 long ts
= timeStamps
.incrementAndGet();
403 RowMutations rm
= new RowMutations(row
);
405 Put p
= new Put(row
, ts
);
406 p
.addColumn(fam1
, qual1
, value1
);
407 p
.setDurability(Durability
.ASYNC_WAL
);
409 Delete d
= new Delete(row
);
410 d
.addColumns(fam1
, qual2
, ts
);
411 d
.setDurability(Durability
.ASYNC_WAL
);
414 Delete d
= new Delete(row
);
415 d
.addColumns(fam1
, qual1
, ts
);
416 d
.setDurability(Durability
.ASYNC_WAL
);
418 Put p
= new Put(row
, ts
);
419 p
.addColumn(fam1
, qual2
, value2
);
420 p
.setDurability(Durability
.ASYNC_WAL
);
423 region
.mutateRow(rm
);
425 // check: should always see exactly one column
426 Get g
= new Get(row
);
427 Result r
= region
.get(g
);
430 failures
.incrementAndGet();
433 } catch (IOException e
) {
435 failures
.incrementAndGet();
444 for (int i
= 0; i
< numThreads
; i
++) {
448 // wait for all threads to finish
449 for (int i
= 0; i
< numThreads
; i
++) {
452 } catch (InterruptedException e
) {
455 assertEquals(0, failures
.get());
460 * Test multi-threaded region mutations.
463 public void testMultiRowMutationMultiThreads() throws IOException
{
465 LOG
.info("Starting test testMultiRowMutationMultiThreads");
466 initHRegion(tableName
, name
.getMethodName(), fam1
);
468 // create 10 threads, each will alternate between adding and
471 int opsPerThread
= 250;
472 AtomicOperation
[] all
= new AtomicOperation
[numThreads
];
474 AtomicLong timeStamps
= new AtomicLong(0);
475 AtomicInteger failures
= new AtomicInteger(0);
476 final List
<byte[]> rowsToLock
= Arrays
.asList(row
, row2
);
477 // create all threads
478 for (int i
= 0; i
< numThreads
; i
++) {
479 all
[i
] = new AtomicOperation(region
, opsPerThread
, timeStamps
, failures
) {
483 for (int i
=0; i
<numOps
; i
++) {
485 // throw in some flushes
487 synchronized(region
) {
488 LOG
.debug("flushing");
491 region
.compact(false);
495 long ts
= timeStamps
.incrementAndGet();
496 List
<Mutation
> mrm
= new ArrayList
<>();
498 Put p
= new Put(row2
, ts
);
499 p
.addColumn(fam1
, qual1
, value1
);
500 p
.setDurability(Durability
.ASYNC_WAL
);
502 Delete d
= new Delete(row
);
503 d
.addColumns(fam1
, qual1
, ts
);
504 d
.setDurability(Durability
.ASYNC_WAL
);
507 Delete d
= new Delete(row2
);
508 d
.addColumns(fam1
, qual1
, ts
);
509 d
.setDurability(Durability
.ASYNC_WAL
);
511 Put p
= new Put(row
, ts
);
512 p
.setDurability(Durability
.ASYNC_WAL
);
513 p
.addColumn(fam1
, qual1
, value2
);
516 region
.mutateRowsWithLocks(mrm
, rowsToLock
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
);
518 // check: should always see exactly one column
519 Scan s
= new Scan(row
);
520 RegionScanner rs
= region
.getScanner(s
);
521 List
<Cell
> r
= new ArrayList
<>();
527 failures
.incrementAndGet();
530 } catch (IOException e
) {
532 failures
.incrementAndGet();
541 for (int i
= 0; i
< numThreads
; i
++) {
545 // wait for all threads to finish
546 for (int i
= 0; i
< numThreads
; i
++) {
549 } catch (InterruptedException e
) {
552 assertEquals(0, failures
.get());
555 public static class AtomicOperation
extends Thread
{
556 protected final Region region
;
557 protected final int numOps
;
558 protected final AtomicLong timeStamps
;
559 protected final AtomicInteger failures
;
560 protected final Random r
= new Random();
562 public AtomicOperation(Region region
, int numOps
, AtomicLong timeStamps
,
563 AtomicInteger failures
) {
564 this.region
= region
;
565 this.numOps
= numOps
;
566 this.timeStamps
= timeStamps
;
567 this.failures
= failures
;
571 private static CountDownLatch latch
= new CountDownLatch(1);
572 private enum TestStep
{
573 INIT
, // initial put of 10 to set value of the cell
574 PUT_STARTED
, // began doing a put of 50 to cell
575 PUT_COMPLETED
, // put complete (released RowLock, but may not have advanced MVCC).
576 CHECKANDPUT_STARTED
, // began checkAndPut: if 10 -> 11
577 CHECKANDPUT_COMPLETED
// completed checkAndPut
578 // NOTE: at the end of these steps, the value of the cell should be 50, not 11!
580 private static volatile TestStep testStep
= TestStep
.INIT
;
581 private final String family
= "f1";
584 * Test written as a verifier for HBASE-7051, CheckAndPut should properly read
587 * Moved into TestAtomicOperation from its original location, TestHBase7051
590 public void testPutAndCheckAndPutInParallel() throws Exception
{
591 Configuration conf
= TEST_UTIL
.getConfiguration();
592 conf
.setClass(HConstants
.REGION_IMPL
, MockHRegion
.class, HeapSize
.class);
593 HTableDescriptor htd
= new HTableDescriptor(TableName
.valueOf(name
.getMethodName()))
594 .addFamily(new HColumnDescriptor(family
));
595 this.region
= TEST_UTIL
.createLocalHRegion(htd
, null, null);
596 Put
[] puts
= new Put
[1];
597 Put put
= new Put(Bytes
.toBytes("r1"));
598 put
.addColumn(Bytes
.toBytes(family
), Bytes
.toBytes("q1"), Bytes
.toBytes("10"));
601 region
.batchMutate(puts
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
);
602 MultithreadedTestUtil
.TestContext ctx
=
603 new MultithreadedTestUtil
.TestContext(conf
);
604 ctx
.addThread(new PutThread(ctx
, region
));
605 ctx
.addThread(new CheckAndPutThread(ctx
, region
));
607 while (testStep
!= TestStep
.CHECKANDPUT_COMPLETED
) {
612 RegionScanner scanner
= region
.getScanner(s
);
613 List
<Cell
> results
= new ArrayList
<>();
614 ScannerContext scannerContext
= ScannerContext
.newBuilder().setBatchLimit(2).build();
615 scanner
.next(results
, scannerContext
);
616 for (Cell keyValue
: results
) {
617 assertEquals("50",Bytes
.toString(CellUtil
.cloneValue(keyValue
)));
621 private class PutThread
extends TestThread
{
622 private Region region
;
623 PutThread(TestContext ctx
, Region region
) {
625 this.region
= region
;
628 public void doWork() throws Exception
{
629 Put
[] puts
= new Put
[1];
630 Put put
= new Put(Bytes
.toBytes("r1"));
631 put
.addColumn(Bytes
.toBytes(family
), Bytes
.toBytes("q1"), Bytes
.toBytes("50"));
633 testStep
= TestStep
.PUT_STARTED
;
634 region
.batchMutate(puts
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
);
638 private class CheckAndPutThread
extends TestThread
{
639 private Region region
;
640 CheckAndPutThread(TestContext ctx
, Region region
) {
642 this.region
= region
;
645 public void doWork() throws Exception
{
646 Put
[] puts
= new Put
[1];
647 Put put
= new Put(Bytes
.toBytes("r1"));
648 put
.addColumn(Bytes
.toBytes(family
), Bytes
.toBytes("q1"), Bytes
.toBytes("11"));
650 while (testStep
!= TestStep
.PUT_COMPLETED
) {
653 testStep
= TestStep
.CHECKANDPUT_STARTED
;
654 region
.checkAndMutate(Bytes
.toBytes("r1"), Bytes
.toBytes(family
), Bytes
.toBytes("q1"),
655 CompareOp
.EQUAL
, new BinaryComparator(Bytes
.toBytes("10")), put
, true);
656 testStep
= TestStep
.CHECKANDPUT_COMPLETED
;
660 public static class MockHRegion
extends HRegion
{
662 public MockHRegion(Path tableDir
, WAL log
, FileSystem fs
, Configuration conf
,
663 final HRegionInfo regionInfo
, final HTableDescriptor htd
, RegionServerServices rsServices
) {
664 super(tableDir
, log
, fs
, conf
, regionInfo
, htd
, rsServices
);
668 public RowLock
getRowLockInternal(final byte[] row
, boolean readLock
) throws IOException
{
669 if (testStep
== TestStep
.CHECKANDPUT_STARTED
) {
672 return new WrappedRowLock(super.getRowLockInternal(row
, readLock
));
675 public class WrappedRowLock
implements RowLock
{
677 private final RowLock rowLock
;
679 private WrappedRowLock(RowLock rowLock
) {
680 this.rowLock
= rowLock
;
685 public void release() {
686 if (testStep
== TestStep
.INIT
) {
687 this.rowLock
.release();
691 if (testStep
== TestStep
.PUT_STARTED
) {
693 testStep
= TestStep
.PUT_COMPLETED
;
694 this.rowLock
.release();
695 // put has been written to the memstore and the row lock has been released, but the
696 // MVCC has not been advanced. Prior to fixing HBASE-7051, the following order of
697 // operations would cause the non-atomicity to show up:
698 // 1) Put releases row lock (where we are now)
699 // 2) CheckAndPut grabs row lock and reads the value prior to the put (10)
700 // because the MVCC has not advanced
701 // 3) Put advances MVCC
702 // So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock
703 // (see below), and then wait some more to give the checkAndPut time to read the old
707 } catch (InterruptedException e
) {
708 Thread
.currentThread().interrupt();
711 else if (testStep
== TestStep
.CHECKANDPUT_STARTED
) {
712 this.rowLock
.release();