2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.regionserver
;
20 import static org
.apache
.hadoop
.hbase
.HBaseTestingUtil
.fam1
;
21 import static org
.apache
.hadoop
.hbase
.HBaseTestingUtil
.fam2
;
22 import static org
.junit
.Assert
.assertEquals
;
23 import static org
.junit
.Assert
.assertNotNull
;
24 import static org
.junit
.Assert
.assertTrue
;
25 import static org
.junit
.Assert
.fail
;
27 import java
.io
.IOException
;
28 import java
.util
.ArrayList
;
29 import java
.util
.Arrays
;
30 import java
.util
.List
;
31 import java
.util
.Objects
;
32 import java
.util
.concurrent
.CountDownLatch
;
33 import java
.util
.concurrent
.atomic
.AtomicInteger
;
34 import java
.util
.concurrent
.atomic
.AtomicLong
;
35 import org
.apache
.hadoop
.conf
.Configuration
;
36 import org
.apache
.hadoop
.fs
.FileSystem
;
37 import org
.apache
.hadoop
.fs
.Path
;
38 import org
.apache
.hadoop
.hbase
.Cell
;
39 import org
.apache
.hadoop
.hbase
.CellUtil
;
40 import org
.apache
.hadoop
.hbase
.CompareOperator
;
41 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
42 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
43 import org
.apache
.hadoop
.hbase
.HConstants
;
44 import org
.apache
.hadoop
.hbase
.MultithreadedTestUtil
;
45 import org
.apache
.hadoop
.hbase
.MultithreadedTestUtil
.TestContext
;
46 import org
.apache
.hadoop
.hbase
.MultithreadedTestUtil
.TestThread
;
47 import org
.apache
.hadoop
.hbase
.TableName
;
48 import org
.apache
.hadoop
.hbase
.client
.Append
;
49 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptor
;
50 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
51 import org
.apache
.hadoop
.hbase
.client
.Delete
;
52 import org
.apache
.hadoop
.hbase
.client
.Durability
;
53 import org
.apache
.hadoop
.hbase
.client
.Get
;
54 import org
.apache
.hadoop
.hbase
.client
.Increment
;
55 import org
.apache
.hadoop
.hbase
.client
.IsolationLevel
;
56 import org
.apache
.hadoop
.hbase
.client
.Mutation
;
57 import org
.apache
.hadoop
.hbase
.client
.Put
;
58 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
59 import org
.apache
.hadoop
.hbase
.client
.RegionInfoBuilder
;
60 import org
.apache
.hadoop
.hbase
.client
.Result
;
61 import org
.apache
.hadoop
.hbase
.client
.RowMutations
;
62 import org
.apache
.hadoop
.hbase
.client
.Scan
;
63 import org
.apache
.hadoop
.hbase
.client
.TableDescriptor
;
64 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
65 import org
.apache
.hadoop
.hbase
.filter
.BinaryComparator
;
66 import org
.apache
.hadoop
.hbase
.io
.HeapSize
;
67 import org
.apache
.hadoop
.hbase
.io
.hfile
.BlockCache
;
68 import org
.apache
.hadoop
.hbase
.io
.hfile
.CacheConfig
;
69 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
70 import org
.apache
.hadoop
.hbase
.testclassification
.VerySlowRegionServerTests
;
71 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
72 import org
.apache
.hadoop
.hbase
.wal
.WAL
;
73 import org
.junit
.After
;
74 import org
.junit
.Before
;
75 import org
.junit
.ClassRule
;
76 import org
.junit
.Rule
;
77 import org
.junit
.Test
;
78 import org
.junit
.experimental
.categories
.Category
;
79 import org
.junit
.rules
.TestName
;
80 import org
.slf4j
.Logger
;
81 import org
.slf4j
.LoggerFactory
;
84 * Testing of HRegion.incrementColumnValue, HRegion.increment,
87 @Category({VerySlowRegionServerTests
.class, LargeTests
.class}) // Starts 100 threads
88 public class TestAtomicOperation
{
91 public static final HBaseClassTestRule CLASS_RULE
=
92 HBaseClassTestRule
.forClass(TestAtomicOperation
.class);
94 private static final Logger LOG
= LoggerFactory
.getLogger(TestAtomicOperation
.class);
95 @Rule public TestName name
= new TestName();
97 HRegion region
= null;
98 private HBaseTestingUtil TEST_UTIL
= new HBaseTestingUtil();
101 static byte[] tableName
;
102 static final byte[] qual1
= Bytes
.toBytes("qual1");
103 static final byte[] qual2
= Bytes
.toBytes("qual2");
104 static final byte[] qual3
= Bytes
.toBytes("qual3");
105 static final byte[] value1
= Bytes
.toBytes("value1");
106 static final byte[] value2
= Bytes
.toBytes("value2");
107 static final byte [] row
= Bytes
.toBytes("rowA");
108 static final byte [] row2
= Bytes
.toBytes("rowB");
111 public void setup() {
112 tableName
= Bytes
.toBytes(name
.getMethodName());
116 public void teardown() throws IOException
{
117 if (region
!= null) {
118 CacheConfig cacheConfig
= region
.getStores().get(0).getCacheConfig();
120 WAL wal
= region
.getWAL();
124 cacheConfig
.getBlockCache().ifPresent(BlockCache
::shutdown
);
129 //////////////////////////////////////////////////////////////////////////////
130 // New tests that doesn't spin up a mini cluster but rather just test the
131 // individual code pieces in the HRegion.
132 //////////////////////////////////////////////////////////////////////////////
135 * Test basic append operation.
137 * @see org.apache.hadoop.hbase.client.TestFromClientSide#testAppend()
140 public void testAppend() throws IOException
{
141 initHRegion(tableName
, name
.getMethodName(), fam1
);
142 String v1
= "Ultimate Answer to the Ultimate Question of Life,"+
143 " The Universe, and Everything";
144 String v2
= " is... 42.";
145 Append a
= new Append(row
);
146 a
.setReturnResults(false);
147 a
.addColumn(fam1
, qual1
, Bytes
.toBytes(v1
));
148 a
.addColumn(fam1
, qual2
, Bytes
.toBytes(v2
));
149 assertTrue(region
.append(a
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
).isEmpty());
151 a
.addColumn(fam1
, qual1
, Bytes
.toBytes(v2
));
152 a
.addColumn(fam1
, qual2
, Bytes
.toBytes(v1
));
153 Result result
= region
.append(a
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
);
154 assertEquals(0, Bytes
.compareTo(Bytes
.toBytes(v1
+v2
), result
.getValue(fam1
, qual1
)));
155 assertEquals(0, Bytes
.compareTo(Bytes
.toBytes(v2
+v1
), result
.getValue(fam1
, qual2
)));
159 public void testAppendWithMultipleFamilies() throws IOException
{
160 final byte[] fam3
= Bytes
.toBytes("colfamily31");
161 initHRegion(tableName
, name
.getMethodName(), fam1
, fam2
, fam3
);
162 String v1
= "Appended";
165 Append a
= new Append(row
);
166 a
.setReturnResults(false);
167 a
.addColumn(fam1
, qual1
, Bytes
.toBytes(v1
));
168 a
.addColumn(fam2
, qual2
, Bytes
.toBytes(v2
));
169 Result result
= region
.append(a
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
);
170 assertTrue("Expected an empty result but result contains " + result
.size() + " keys",
174 a
.addColumn(fam2
, qual2
, Bytes
.toBytes(v1
));
175 a
.addColumn(fam1
, qual1
, Bytes
.toBytes(v2
));
176 a
.addColumn(fam3
, qual3
, Bytes
.toBytes(v2
));
177 a
.addColumn(fam1
, qual2
, Bytes
.toBytes(v1
));
179 result
= region
.append(a
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
);
181 byte[] actualValue1
= result
.getValue(fam1
, qual1
);
182 byte[] actualValue2
= result
.getValue(fam2
, qual2
);
183 byte[] actualValue3
= result
.getValue(fam3
, qual3
);
184 byte[] actualValue4
= result
.getValue(fam1
, qual2
);
186 assertNotNull("Value1 should bot be null", actualValue1
);
187 assertNotNull("Value2 should bot be null", actualValue2
);
188 assertNotNull("Value3 should bot be null", actualValue3
);
189 assertNotNull("Value4 should bot be null", actualValue4
);
190 assertEquals(0, Bytes
.compareTo(Bytes
.toBytes(v1
+ v2
), actualValue1
));
191 assertEquals(0, Bytes
.compareTo(Bytes
.toBytes(v2
+ v1
), actualValue2
));
192 assertEquals(0, Bytes
.compareTo(Bytes
.toBytes(v2
), actualValue3
));
193 assertEquals(0, Bytes
.compareTo(Bytes
.toBytes(v1
), actualValue4
));
197 public void testAppendWithNonExistingFamily() throws IOException
{
198 initHRegion(tableName
, name
.getMethodName(), fam1
);
199 final String v1
= "Value";
200 final Append a
= new Append(row
);
201 a
.addColumn(fam1
, qual1
, Bytes
.toBytes(v1
));
202 a
.addColumn(fam2
, qual2
, Bytes
.toBytes(v1
));
203 Result result
= null;
205 result
= region
.append(a
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
);
206 fail("Append operation should fail with NoSuchColumnFamilyException.");
207 } catch (NoSuchColumnFamilyException e
) {
208 assertEquals(null, result
);
209 } catch (Exception e
) {
210 fail("Append operation should fail with NoSuchColumnFamilyException.");
215 public void testIncrementWithNonExistingFamily() throws IOException
{
216 initHRegion(tableName
, name
.getMethodName(), fam1
);
217 final Increment inc
= new Increment(row
);
218 inc
.addColumn(fam1
, qual1
, 1);
219 inc
.addColumn(fam2
, qual2
, 1);
220 inc
.setDurability(Durability
.ASYNC_WAL
);
222 region
.increment(inc
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
);
223 } catch (NoSuchColumnFamilyException e
) {
224 final Get g
= new Get(row
);
225 final Result result
= region
.get(g
);
226 assertEquals(null, result
.getValue(fam1
, qual1
));
227 assertEquals(null, result
.getValue(fam2
, qual2
));
228 } catch (Exception e
) {
229 fail("Increment operation should fail with NoSuchColumnFamilyException.");
234 * Test multi-threaded increments.
237 public void testIncrementMultiThreads() throws IOException
{
239 LOG
.info("Starting test testIncrementMultiThreads");
240 // run a with mixed column families (1 and 3 versions)
241 initHRegion(tableName
, name
.getMethodName(), new int[] {1,3}, fam1
, fam2
);
243 // Create 100 threads, each will increment by its own quantity. All 100 threads update the
244 // same row over two column families.
245 int numThreads
= 100;
246 int incrementsPerThread
= 1000;
247 Incrementer
[] all
= new Incrementer
[numThreads
];
248 int expectedTotal
= 0;
249 // create all threads
250 for (int i
= 0; i
< numThreads
; i
++) {
251 all
[i
] = new Incrementer(region
, i
, i
, incrementsPerThread
);
252 expectedTotal
+= (i
* incrementsPerThread
);
256 for (int i
= 0; i
< numThreads
; i
++) {
260 // wait for all threads to finish
261 for (int i
= 0; i
< numThreads
; i
++) {
264 } catch (InterruptedException e
) {
265 LOG
.info("Ignored", e
);
268 assertICV(row
, fam1
, qual1
, expectedTotal
, fast
);
269 assertICV(row
, fam1
, qual2
, expectedTotal
*2, fast
);
270 assertICV(row
, fam2
, qual3
, expectedTotal
*3, fast
);
271 LOG
.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal
);
275 private void assertICV(byte [] row
,
279 boolean fast
) throws IOException
{
280 // run a get and see?
281 Get get
= new Get(row
);
282 if (fast
) get
.setIsolationLevel(IsolationLevel
.READ_UNCOMMITTED
);
283 get
.addColumn(familiy
, qualifier
);
284 Result result
= region
.get(get
);
285 assertEquals(1, result
.size());
287 Cell kv
= result
.rawCells()[0];
288 long r
= Bytes
.toLong(CellUtil
.cloneValue(kv
));
289 assertEquals(amount
, r
);
292 private void initHRegion (byte [] tableName
, String callingMethod
,
295 initHRegion(tableName
, callingMethod
, null, families
);
298 private void initHRegion(byte[] tableName
, String callingMethod
, int[] maxVersions
,
299 byte[]... families
) throws IOException
{
300 TableDescriptorBuilder builder
=
301 TableDescriptorBuilder
.newBuilder(TableName
.valueOf(tableName
));
304 for (byte[] family
: families
) {
305 ColumnFamilyDescriptor familyDescriptor
= ColumnFamilyDescriptorBuilder
.newBuilder(family
)
306 .setMaxVersions(maxVersions
!= null ? maxVersions
[i
++] : 1).build();
307 builder
.setColumnFamily(familyDescriptor
);
309 TableDescriptor tableDescriptor
= builder
.build();
310 RegionInfo info
= RegionInfoBuilder
.newBuilder(tableDescriptor
.getTableName()).build();
311 region
= TEST_UTIL
.createLocalHRegion(info
, tableDescriptor
);
315 * A thread that makes increment calls always on the same row, this.row against two column
316 * families on this row.
318 public static class Incrementer
extends Thread
{
320 private final Region region
;
321 private final int numIncrements
;
322 private final int amount
;
325 public Incrementer(Region region
, int threadNumber
, int amount
, int numIncrements
) {
326 super("Incrementer." + threadNumber
);
327 this.region
= region
;
328 this.numIncrements
= numIncrements
;
329 this.amount
= amount
;
335 for (int i
= 0; i
< numIncrements
; i
++) {
337 Increment inc
= new Increment(row
);
338 inc
.addColumn(fam1
, qual1
, amount
);
339 inc
.addColumn(fam1
, qual2
, amount
*2);
340 inc
.addColumn(fam2
, qual3
, amount
*3);
341 inc
.setDurability(Durability
.ASYNC_WAL
);
342 Result result
= region
.increment(inc
);
343 if (result
!= null) {
344 assertEquals(Bytes
.toLong(result
.getValue(fam1
, qual1
))*2,
345 Bytes
.toLong(result
.getValue(fam1
, qual2
)));
346 assertTrue(result
.getValue(fam2
, qual3
) != null);
347 assertEquals(Bytes
.toLong(result
.getValue(fam1
, qual1
))*3,
348 Bytes
.toLong(result
.getValue(fam2
, qual3
)));
349 assertEquals(Bytes
.toLong(result
.getValue(fam1
, qual1
))*2,
350 Bytes
.toLong(result
.getValue(fam1
, qual2
)));
351 long fam1Increment
= Bytes
.toLong(result
.getValue(fam1
, qual1
))*3;
352 long fam2Increment
= Bytes
.toLong(result
.getValue(fam2
, qual3
));
353 assertEquals("fam1=" + fam1Increment
+ ", fam2=" + fam2Increment
,
354 fam1Increment
, fam2Increment
);
356 } catch (IOException e
) {
364 public void testAppendMultiThreads() throws IOException
{
365 LOG
.info("Starting test testAppendMultiThreads");
366 // run a with mixed column families (1 and 3 versions)
367 initHRegion(tableName
, name
.getMethodName(), new int[] {1,3}, fam1
, fam2
);
369 int numThreads
= 100;
370 int opsPerThread
= 100;
371 AtomicOperation
[] all
= new AtomicOperation
[numThreads
];
372 final byte[] val
= new byte[]{1};
374 AtomicInteger failures
= new AtomicInteger(0);
375 // create all threads
376 for (int i
= 0; i
< numThreads
; i
++) {
377 all
[i
] = new AtomicOperation(region
, opsPerThread
, null, failures
) {
380 for (int i
=0; i
<numOps
; i
++) {
382 Append a
= new Append(row
);
383 a
.addColumn(fam1
, qual1
, val
);
384 a
.addColumn(fam1
, qual2
, val
);
385 a
.addColumn(fam2
, qual3
, val
);
386 a
.setDurability(Durability
.ASYNC_WAL
);
387 region
.append(a
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
);
389 Get g
= new Get(row
);
390 Result result
= region
.get(g
);
391 assertEquals(result
.getValue(fam1
, qual1
).length
, result
.getValue(fam1
, qual2
).length
);
392 assertEquals(result
.getValue(fam1
, qual1
).length
, result
.getValue(fam2
, qual3
).length
);
393 } catch (IOException e
) {
395 failures
.incrementAndGet();
404 for (int i
= 0; i
< numThreads
; i
++) {
408 // wait for all threads to finish
409 for (int i
= 0; i
< numThreads
; i
++) {
412 } catch (InterruptedException e
) {
415 assertEquals(0, failures
.get());
416 Get g
= new Get(row
);
417 Result result
= region
.get(g
);
418 assertEquals(10000, result
.getValue(fam1
, qual1
).length
);
419 assertEquals(10000, result
.getValue(fam1
, qual2
).length
);
420 assertEquals(10000, result
.getValue(fam2
, qual3
).length
);
423 * Test multi-threaded row mutations.
426 public void testRowMutationMultiThreads() throws IOException
{
427 LOG
.info("Starting test testRowMutationMultiThreads");
428 initHRegion(tableName
, name
.getMethodName(), fam1
);
430 // create 10 threads, each will alternate between adding and
433 int opsPerThread
= 250;
434 AtomicOperation
[] all
= new AtomicOperation
[numThreads
];
436 AtomicLong timeStamps
= new AtomicLong(0);
437 AtomicInteger failures
= new AtomicInteger(0);
438 // create all threads
439 for (int i
= 0; i
< numThreads
; i
++) {
440 all
[i
] = new AtomicOperation(region
, opsPerThread
, timeStamps
, failures
) {
444 for (int i
=0; i
<numOps
; i
++) {
446 // throw in some flushes
448 synchronized(region
) {
449 LOG
.debug("flushing");
452 region
.compact(false);
456 long ts
= timeStamps
.incrementAndGet();
457 RowMutations rm
= new RowMutations(row
);
459 Put p
= new Put(row
, ts
);
460 p
.addColumn(fam1
, qual1
, value1
);
461 p
.setDurability(Durability
.ASYNC_WAL
);
463 Delete d
= new Delete(row
);
464 d
.addColumns(fam1
, qual2
, ts
);
465 d
.setDurability(Durability
.ASYNC_WAL
);
468 Delete d
= new Delete(row
);
469 d
.addColumns(fam1
, qual1
, ts
);
470 d
.setDurability(Durability
.ASYNC_WAL
);
472 Put p
= new Put(row
, ts
);
473 p
.addColumn(fam1
, qual2
, value2
);
474 p
.setDurability(Durability
.ASYNC_WAL
);
477 region
.mutateRow(rm
);
479 // check: should always see exactly one column
480 Get g
= new Get(row
);
481 Result r
= region
.get(g
);
483 LOG
.debug(Objects
.toString(r
));
484 failures
.incrementAndGet();
487 } catch (IOException e
) {
489 failures
.incrementAndGet();
498 for (int i
= 0; i
< numThreads
; i
++) {
502 // wait for all threads to finish
503 for (int i
= 0; i
< numThreads
; i
++) {
506 } catch (InterruptedException e
) {
509 assertEquals(0, failures
.get());
514 * Test multi-threaded region mutations.
517 public void testMultiRowMutationMultiThreads() throws IOException
{
519 LOG
.info("Starting test testMultiRowMutationMultiThreads");
520 initHRegion(tableName
, name
.getMethodName(), fam1
);
522 // create 10 threads, each will alternate between adding and
525 int opsPerThread
= 250;
526 AtomicOperation
[] all
= new AtomicOperation
[numThreads
];
528 AtomicLong timeStamps
= new AtomicLong(0);
529 AtomicInteger failures
= new AtomicInteger(0);
530 final List
<byte[]> rowsToLock
= Arrays
.asList(row
, row2
);
531 // create all threads
532 for (int i
= 0; i
< numThreads
; i
++) {
533 all
[i
] = new AtomicOperation(region
, opsPerThread
, timeStamps
, failures
) {
537 for (int i
=0; i
<numOps
; i
++) {
539 // throw in some flushes
541 synchronized(region
) {
542 LOG
.debug("flushing");
545 region
.compact(false);
549 long ts
= timeStamps
.incrementAndGet();
550 List
<Mutation
> mrm
= new ArrayList
<>();
552 Put p
= new Put(row2
, ts
);
553 p
.addColumn(fam1
, qual1
, value1
);
554 p
.setDurability(Durability
.ASYNC_WAL
);
556 Delete d
= new Delete(row
);
557 d
.addColumns(fam1
, qual1
, ts
);
558 d
.setDurability(Durability
.ASYNC_WAL
);
561 Delete d
= new Delete(row2
);
562 d
.addColumns(fam1
, qual1
, ts
);
563 d
.setDurability(Durability
.ASYNC_WAL
);
565 Put p
= new Put(row
, ts
);
566 p
.setDurability(Durability
.ASYNC_WAL
);
567 p
.addColumn(fam1
, qual1
, value2
);
570 region
.mutateRowsWithLocks(mrm
, rowsToLock
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
);
572 // check: should always see exactly one column
573 Scan s
= new Scan().withStartRow(row
);
574 RegionScanner rs
= region
.getScanner(s
);
575 List
<Cell
> r
= new ArrayList
<>();
580 LOG
.debug(Objects
.toString(r
));
581 failures
.incrementAndGet();
584 } catch (IOException e
) {
586 failures
.incrementAndGet();
595 for (int i
= 0; i
< numThreads
; i
++) {
599 // wait for all threads to finish
600 for (int i
= 0; i
< numThreads
; i
++) {
603 } catch (InterruptedException e
) {
606 assertEquals(0, failures
.get());
609 public static class AtomicOperation
extends Thread
{
610 protected final HRegion region
;
611 protected final int numOps
;
612 protected final AtomicLong timeStamps
;
613 protected final AtomicInteger failures
;
615 public AtomicOperation(HRegion region
, int numOps
, AtomicLong timeStamps
,
616 AtomicInteger failures
) {
617 this.region
= region
;
618 this.numOps
= numOps
;
619 this.timeStamps
= timeStamps
;
620 this.failures
= failures
;
624 private static CountDownLatch latch
= new CountDownLatch(1);
625 private enum TestStep
{
626 INIT
, // initial put of 10 to set value of the cell
627 PUT_STARTED
, // began doing a put of 50 to cell
628 PUT_COMPLETED
, // put complete (released RowLock, but may not have advanced MVCC).
629 CHECKANDPUT_STARTED
, // began checkAndPut: if 10 -> 11
630 CHECKANDPUT_COMPLETED
// completed checkAndPut
631 // NOTE: at the end of these steps, the value of the cell should be 50, not 11!
633 private static volatile TestStep testStep
= TestStep
.INIT
;
634 private final String family
= "f1";
637 * Test written as a verifier for HBASE-7051, CheckAndPut should properly read
640 * Moved into TestAtomicOperation from its original location, TestHBase7051
643 public void testPutAndCheckAndPutInParallel() throws Exception
{
644 Configuration conf
= TEST_UTIL
.getConfiguration();
645 conf
.setClass(HConstants
.REGION_IMPL
, MockHRegion
.class, HeapSize
.class);
646 TableDescriptorBuilder tableDescriptorBuilder
=
647 TableDescriptorBuilder
.newBuilder(TableName
.valueOf(name
.getMethodName()));
648 ColumnFamilyDescriptor columnFamilyDescriptor
=
649 ColumnFamilyDescriptorBuilder
.newBuilder(Bytes
.toBytes(family
)).build();
650 tableDescriptorBuilder
.setColumnFamily(columnFamilyDescriptor
);
651 this.region
= TEST_UTIL
.createLocalHRegion(tableDescriptorBuilder
.build(), null, null);
652 Put
[] puts
= new Put
[1];
653 Put put
= new Put(Bytes
.toBytes("r1"));
654 put
.addColumn(Bytes
.toBytes(family
), Bytes
.toBytes("q1"), Bytes
.toBytes("10"));
657 region
.batchMutate(puts
);
658 MultithreadedTestUtil
.TestContext ctx
=
659 new MultithreadedTestUtil
.TestContext(conf
);
660 ctx
.addThread(new PutThread(ctx
, region
));
661 ctx
.addThread(new CheckAndPutThread(ctx
, region
));
663 while (testStep
!= TestStep
.CHECKANDPUT_COMPLETED
) {
668 RegionScanner scanner
= region
.getScanner(s
);
669 List
<Cell
> results
= new ArrayList
<>();
670 ScannerContext scannerContext
= ScannerContext
.newBuilder().setBatchLimit(2).build();
671 scanner
.next(results
, scannerContext
);
672 for (Cell keyValue
: results
) {
673 assertEquals("50",Bytes
.toString(CellUtil
.cloneValue(keyValue
)));
677 private class PutThread
extends TestThread
{
678 private Region region
;
679 PutThread(TestContext ctx
, Region region
) {
681 this.region
= region
;
685 public void doWork() throws Exception
{
686 Put
[] puts
= new Put
[1];
687 Put put
= new Put(Bytes
.toBytes("r1"));
688 put
.addColumn(Bytes
.toBytes(family
), Bytes
.toBytes("q1"), Bytes
.toBytes("50"));
690 testStep
= TestStep
.PUT_STARTED
;
691 region
.batchMutate(puts
);
695 private class CheckAndPutThread
extends TestThread
{
696 private Region region
;
697 CheckAndPutThread(TestContext ctx
, Region region
) {
699 this.region
= region
;
703 public void doWork() throws Exception
{
704 Put
[] puts
= new Put
[1];
705 Put put
= new Put(Bytes
.toBytes("r1"));
706 put
.addColumn(Bytes
.toBytes(family
), Bytes
.toBytes("q1"), Bytes
.toBytes("11"));
708 while (testStep
!= TestStep
.PUT_COMPLETED
) {
711 testStep
= TestStep
.CHECKANDPUT_STARTED
;
712 region
.checkAndMutate(Bytes
.toBytes("r1"), Bytes
.toBytes(family
), Bytes
.toBytes("q1"),
713 CompareOperator
.EQUAL
, new BinaryComparator(Bytes
.toBytes("10")), put
);
714 testStep
= TestStep
.CHECKANDPUT_COMPLETED
;
718 public static class MockHRegion
extends HRegion
{
720 public MockHRegion(Path tableDir
, WAL log
, FileSystem fs
, Configuration conf
,
721 final RegionInfo regionInfo
, final TableDescriptor htd
, RegionServerServices rsServices
) {
722 super(tableDir
, log
, fs
, conf
, regionInfo
, htd
, rsServices
);
726 protected RowLock
getRowLockInternal(final byte[] row
, boolean readLock
,
727 final RowLock prevRowlock
) throws IOException
{
728 if (testStep
== TestStep
.CHECKANDPUT_STARTED
) {
731 return new WrappedRowLock(super.getRowLockInternal(row
, readLock
, null));
734 public class WrappedRowLock
implements RowLock
{
736 private final RowLock rowLock
;
738 private WrappedRowLock(RowLock rowLock
) {
739 this.rowLock
= rowLock
;
744 public void release() {
745 if (testStep
== TestStep
.INIT
) {
746 this.rowLock
.release();
750 if (testStep
== TestStep
.PUT_STARTED
) {
752 testStep
= TestStep
.PUT_COMPLETED
;
753 this.rowLock
.release();
754 // put has been written to the memstore and the row lock has been released, but the
755 // MVCC has not been advanced. Prior to fixing HBASE-7051, the following order of
756 // operations would cause the non-atomicity to show up:
757 // 1) Put releases row lock (where we are now)
758 // 2) CheckAndPut grabs row lock and reads the value prior to the put (10)
759 // because the MVCC has not advanced
760 // 3) Put advances MVCC
761 // So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock
762 // (see below), and then wait some more to give the checkAndPut time to read the old
766 } catch (InterruptedException e
) {
767 Thread
.currentThread().interrupt();
770 else if (testStep
== TestStep
.CHECKANDPUT_STARTED
) {
771 this.rowLock
.release();