2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.regionserver
;
20 import static org
.apache
.hadoop
.hbase
.HBaseTestingUtil
.fam1
;
21 import static org
.apache
.hadoop
.hbase
.HBaseTestingUtil
.fam2
;
22 import static org
.junit
.Assert
.assertEquals
;
23 import static org
.junit
.Assert
.assertNotNull
;
24 import static org
.junit
.Assert
.assertTrue
;
25 import static org
.junit
.Assert
.fail
;
27 import java
.io
.IOException
;
28 import java
.util
.ArrayList
;
29 import java
.util
.Arrays
;
30 import java
.util
.List
;
31 import java
.util
.Objects
;
32 import java
.util
.Random
;
33 import java
.util
.concurrent
.CountDownLatch
;
34 import java
.util
.concurrent
.atomic
.AtomicInteger
;
35 import java
.util
.concurrent
.atomic
.AtomicLong
;
36 import org
.apache
.hadoop
.conf
.Configuration
;
37 import org
.apache
.hadoop
.fs
.FileSystem
;
38 import org
.apache
.hadoop
.fs
.Path
;
39 import org
.apache
.hadoop
.hbase
.Cell
;
40 import org
.apache
.hadoop
.hbase
.CellUtil
;
41 import org
.apache
.hadoop
.hbase
.CompareOperator
;
42 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
43 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
44 import org
.apache
.hadoop
.hbase
.HConstants
;
45 import org
.apache
.hadoop
.hbase
.MultithreadedTestUtil
;
46 import org
.apache
.hadoop
.hbase
.MultithreadedTestUtil
.TestContext
;
47 import org
.apache
.hadoop
.hbase
.MultithreadedTestUtil
.TestThread
;
48 import org
.apache
.hadoop
.hbase
.TableName
;
49 import org
.apache
.hadoop
.hbase
.client
.Append
;
50 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptor
;
51 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
52 import org
.apache
.hadoop
.hbase
.client
.Delete
;
53 import org
.apache
.hadoop
.hbase
.client
.Durability
;
54 import org
.apache
.hadoop
.hbase
.client
.Get
;
55 import org
.apache
.hadoop
.hbase
.client
.Increment
;
56 import org
.apache
.hadoop
.hbase
.client
.IsolationLevel
;
57 import org
.apache
.hadoop
.hbase
.client
.Mutation
;
58 import org
.apache
.hadoop
.hbase
.client
.Put
;
59 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
60 import org
.apache
.hadoop
.hbase
.client
.RegionInfoBuilder
;
61 import org
.apache
.hadoop
.hbase
.client
.Result
;
62 import org
.apache
.hadoop
.hbase
.client
.RowMutations
;
63 import org
.apache
.hadoop
.hbase
.client
.Scan
;
64 import org
.apache
.hadoop
.hbase
.client
.TableDescriptor
;
65 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
66 import org
.apache
.hadoop
.hbase
.filter
.BinaryComparator
;
67 import org
.apache
.hadoop
.hbase
.io
.HeapSize
;
68 import org
.apache
.hadoop
.hbase
.io
.hfile
.BlockCache
;
69 import org
.apache
.hadoop
.hbase
.io
.hfile
.CacheConfig
;
70 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
71 import org
.apache
.hadoop
.hbase
.testclassification
.VerySlowRegionServerTests
;
72 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
73 import org
.apache
.hadoop
.hbase
.wal
.WAL
;
74 import org
.junit
.After
;
75 import org
.junit
.Before
;
76 import org
.junit
.ClassRule
;
77 import org
.junit
.Rule
;
78 import org
.junit
.Test
;
79 import org
.junit
.experimental
.categories
.Category
;
80 import org
.junit
.rules
.TestName
;
81 import org
.slf4j
.Logger
;
82 import org
.slf4j
.LoggerFactory
;
85 * Testing of HRegion.incrementColumnValue, HRegion.increment,
88 @Category({VerySlowRegionServerTests
.class, LargeTests
.class}) // Starts 100 threads
89 public class TestAtomicOperation
{
92 public static final HBaseClassTestRule CLASS_RULE
=
93 HBaseClassTestRule
.forClass(TestAtomicOperation
.class);
95 private static final Logger LOG
= LoggerFactory
.getLogger(TestAtomicOperation
.class);
96 @Rule public TestName name
= new TestName();
98 HRegion region
= null;
99 private HBaseTestingUtil TEST_UTIL
= new HBaseTestingUtil();
102 static byte[] tableName
;
103 static final byte[] qual1
= Bytes
.toBytes("qual1");
104 static final byte[] qual2
= Bytes
.toBytes("qual2");
105 static final byte[] qual3
= Bytes
.toBytes("qual3");
106 static final byte[] value1
= Bytes
.toBytes("value1");
107 static final byte[] value2
= Bytes
.toBytes("value2");
108 static final byte [] row
= Bytes
.toBytes("rowA");
109 static final byte [] row2
= Bytes
.toBytes("rowB");
112 public void setup() {
113 tableName
= Bytes
.toBytes(name
.getMethodName());
117 public void teardown() throws IOException
{
118 if (region
!= null) {
119 CacheConfig cacheConfig
= region
.getStores().get(0).getCacheConfig();
121 WAL wal
= region
.getWAL();
125 cacheConfig
.getBlockCache().ifPresent(BlockCache
::shutdown
);
130 //////////////////////////////////////////////////////////////////////////////
131 // New tests that don't spin up a mini cluster but rather just test the
132 // individual code pieces in the HRegion.
133 //////////////////////////////////////////////////////////////////////////////
136 * Test basic append operation.
138 * @see org.apache.hadoop.hbase.client.TestFromClientSide#testAppend()
141 public void testAppend() throws IOException
{
142 initHRegion(tableName
, name
.getMethodName(), fam1
);
143 String v1
= "Ultimate Answer to the Ultimate Question of Life,"+
144 " The Universe, and Everything";
145 String v2
= " is... 42.";
146 Append a
= new Append(row
);
147 a
.setReturnResults(false);
148 a
.addColumn(fam1
, qual1
, Bytes
.toBytes(v1
));
149 a
.addColumn(fam1
, qual2
, Bytes
.toBytes(v2
));
150 assertTrue(region
.append(a
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
).isEmpty());
152 a
.addColumn(fam1
, qual1
, Bytes
.toBytes(v2
));
153 a
.addColumn(fam1
, qual2
, Bytes
.toBytes(v1
));
154 Result result
= region
.append(a
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
);
155 assertEquals(0, Bytes
.compareTo(Bytes
.toBytes(v1
+v2
), result
.getValue(fam1
, qual1
)));
156 assertEquals(0, Bytes
.compareTo(Bytes
.toBytes(v2
+v1
), result
.getValue(fam1
, qual2
)));
160 public void testAppendWithMultipleFamilies() throws IOException
{
161 final byte[] fam3
= Bytes
.toBytes("colfamily31");
162 initHRegion(tableName
, name
.getMethodName(), fam1
, fam2
, fam3
);
163 String v1
= "Appended";
166 Append a
= new Append(row
);
167 a
.setReturnResults(false);
168 a
.addColumn(fam1
, qual1
, Bytes
.toBytes(v1
));
169 a
.addColumn(fam2
, qual2
, Bytes
.toBytes(v2
));
170 Result result
= region
.append(a
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
);
171 assertTrue("Expected an empty result but result contains " + result
.size() + " keys",
175 a
.addColumn(fam2
, qual2
, Bytes
.toBytes(v1
));
176 a
.addColumn(fam1
, qual1
, Bytes
.toBytes(v2
));
177 a
.addColumn(fam3
, qual3
, Bytes
.toBytes(v2
));
178 a
.addColumn(fam1
, qual2
, Bytes
.toBytes(v1
));
180 result
= region
.append(a
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
);
182 byte[] actualValue1
= result
.getValue(fam1
, qual1
);
183 byte[] actualValue2
= result
.getValue(fam2
, qual2
);
184 byte[] actualValue3
= result
.getValue(fam3
, qual3
);
185 byte[] actualValue4
= result
.getValue(fam1
, qual2
);
// Every appended cell must be present in the result before its contents are compared below.
assertNotNull("Value1 should not be null", actualValue1);
assertNotNull("Value2 should not be null", actualValue2);
assertNotNull("Value3 should not be null", actualValue3);
assertNotNull("Value4 should not be null", actualValue4);
191 assertEquals(0, Bytes
.compareTo(Bytes
.toBytes(v1
+ v2
), actualValue1
));
192 assertEquals(0, Bytes
.compareTo(Bytes
.toBytes(v2
+ v1
), actualValue2
));
193 assertEquals(0, Bytes
.compareTo(Bytes
.toBytes(v2
), actualValue3
));
194 assertEquals(0, Bytes
.compareTo(Bytes
.toBytes(v1
), actualValue4
));
198 public void testAppendWithNonExistingFamily() throws IOException
{
199 initHRegion(tableName
, name
.getMethodName(), fam1
);
200 final String v1
= "Value";
201 final Append a
= new Append(row
);
202 a
.addColumn(fam1
, qual1
, Bytes
.toBytes(v1
));
203 a
.addColumn(fam2
, qual2
, Bytes
.toBytes(v1
));
204 Result result
= null;
206 result
= region
.append(a
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
);
207 fail("Append operation should fail with NoSuchColumnFamilyException.");
208 } catch (NoSuchColumnFamilyException e
) {
209 assertEquals(null, result
);
210 } catch (Exception e
) {
211 fail("Append operation should fail with NoSuchColumnFamilyException.");
216 public void testIncrementWithNonExistingFamily() throws IOException
{
217 initHRegion(tableName
, name
.getMethodName(), fam1
);
218 final Increment inc
= new Increment(row
);
219 inc
.addColumn(fam1
, qual1
, 1);
220 inc
.addColumn(fam2
, qual2
, 1);
221 inc
.setDurability(Durability
.ASYNC_WAL
);
223 region
.increment(inc
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
);
224 } catch (NoSuchColumnFamilyException e
) {
225 final Get g
= new Get(row
);
226 final Result result
= region
.get(g
);
227 assertEquals(null, result
.getValue(fam1
, qual1
));
228 assertEquals(null, result
.getValue(fam2
, qual2
));
229 } catch (Exception e
) {
230 fail("Increment operation should fail with NoSuchColumnFamilyException.");
235 * Test multi-threaded increments.
238 public void testIncrementMultiThreads() throws IOException
{
240 LOG
.info("Starting test testIncrementMultiThreads");
241 // run with mixed column families (1 and 3 versions)
242 initHRegion(tableName
, name
.getMethodName(), new int[] {1,3}, fam1
, fam2
);
244 // Create 100 threads, each will increment by its own quantity. All 100 threads update the
245 // same row over two column families.
246 int numThreads
= 100;
247 int incrementsPerThread
= 1000;
248 Incrementer
[] all
= new Incrementer
[numThreads
];
249 int expectedTotal
= 0;
250 // create all threads
251 for (int i
= 0; i
< numThreads
; i
++) {
252 all
[i
] = new Incrementer(region
, i
, i
, incrementsPerThread
);
253 expectedTotal
+= (i
* incrementsPerThread
);
257 for (int i
= 0; i
< numThreads
; i
++) {
261 // wait for all threads to finish
262 for (int i
= 0; i
< numThreads
; i
++) {
265 } catch (InterruptedException e
) {
266 LOG
.info("Ignored", e
);
269 assertICV(row
, fam1
, qual1
, expectedTotal
, fast
);
270 assertICV(row
, fam1
, qual2
, expectedTotal
*2, fast
);
271 assertICV(row
, fam2
, qual3
, expectedTotal
*3, fast
);
272 LOG
.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal
);
276 private void assertICV(byte [] row
,
280 boolean fast
) throws IOException
{
281 // run a get and see?
282 Get get
= new Get(row
);
283 if (fast
) get
.setIsolationLevel(IsolationLevel
.READ_UNCOMMITTED
);
284 get
.addColumn(familiy
, qualifier
);
285 Result result
= region
.get(get
);
286 assertEquals(1, result
.size());
288 Cell kv
= result
.rawCells()[0];
289 long r
= Bytes
.toLong(CellUtil
.cloneValue(kv
));
290 assertEquals(amount
, r
);
293 private void initHRegion (byte [] tableName
, String callingMethod
,
296 initHRegion(tableName
, callingMethod
, null, families
);
299 private void initHRegion(byte[] tableName
, String callingMethod
, int[] maxVersions
,
300 byte[]... families
) throws IOException
{
301 TableDescriptorBuilder builder
=
302 TableDescriptorBuilder
.newBuilder(TableName
.valueOf(tableName
));
305 for (byte[] family
: families
) {
306 ColumnFamilyDescriptor familyDescriptor
= ColumnFamilyDescriptorBuilder
.newBuilder(family
)
307 .setMaxVersions(maxVersions
!= null ? maxVersions
[i
++] : 1).build();
308 builder
.setColumnFamily(familyDescriptor
);
310 TableDescriptor tableDescriptor
= builder
.build();
311 RegionInfo info
= RegionInfoBuilder
.newBuilder(tableDescriptor
.getTableName()).build();
312 region
= TEST_UTIL
.createLocalHRegion(info
, tableDescriptor
);
316 * A thread that makes increment calls always on the same row, this.row against two column
317 * families on this row.
319 public static class Incrementer
extends Thread
{
321 private final Region region
;
322 private final int numIncrements
;
323 private final int amount
;
/**
 * Creates a thread named "Incrementer." followed by threadNumber.
 *
 * @param region region the increments are issued against
 * @param threadNumber only used to build the thread name
 * @param amount base quantity; each increment adds amount, amount*2 and amount*3 to
 *          fam1:qual1, fam1:qual2 and fam2:qual3 respectively
 * @param numIncrements upper bound of the increment loop
 */
326 public Incrementer(Region region
, int threadNumber
, int amount
, int numIncrements
) {
327 super("Incrementer." + threadNumber
);
328 this.region
= region
;
329 this.numIncrements
= numIncrements
;
330 this.amount
= amount
;
336 for (int i
= 0; i
< numIncrements
; i
++) {
338 Increment inc
= new Increment(row
);
339 inc
.addColumn(fam1
, qual1
, amount
);
340 inc
.addColumn(fam1
, qual2
, amount
*2);
341 inc
.addColumn(fam2
, qual3
, amount
*3);
342 inc
.setDurability(Durability
.ASYNC_WAL
);
343 Result result
= region
.increment(inc
);
344 if (result
!= null) {
345 assertEquals(Bytes
.toLong(result
.getValue(fam1
, qual1
))*2,
346 Bytes
.toLong(result
.getValue(fam1
, qual2
)));
347 assertTrue(result
.getValue(fam2
, qual3
) != null);
348 assertEquals(Bytes
.toLong(result
.getValue(fam1
, qual1
))*3,
349 Bytes
.toLong(result
.getValue(fam2
, qual3
)));
350 assertEquals(Bytes
.toLong(result
.getValue(fam1
, qual1
))*2,
351 Bytes
.toLong(result
.getValue(fam1
, qual2
)));
352 long fam1Increment
= Bytes
.toLong(result
.getValue(fam1
, qual1
))*3;
353 long fam2Increment
= Bytes
.toLong(result
.getValue(fam2
, qual3
));
354 assertEquals("fam1=" + fam1Increment
+ ", fam2=" + fam2Increment
,
355 fam1Increment
, fam2Increment
);
357 } catch (IOException e
) {
365 public void testAppendMultiThreads() throws IOException
{
366 LOG
.info("Starting test testAppendMultiThreads");
367 // run with mixed column families (1 and 3 versions)
368 initHRegion(tableName
, name
.getMethodName(), new int[] {1,3}, fam1
, fam2
);
370 int numThreads
= 100;
371 int opsPerThread
= 100;
372 AtomicOperation
[] all
= new AtomicOperation
[numThreads
];
373 final byte[] val
= new byte[]{1};
375 AtomicInteger failures
= new AtomicInteger(0);
376 // create all threads
377 for (int i
= 0; i
< numThreads
; i
++) {
378 all
[i
] = new AtomicOperation(region
, opsPerThread
, null, failures
) {
381 for (int i
=0; i
<numOps
; i
++) {
383 Append a
= new Append(row
);
384 a
.addColumn(fam1
, qual1
, val
);
385 a
.addColumn(fam1
, qual2
, val
);
386 a
.addColumn(fam2
, qual3
, val
);
387 a
.setDurability(Durability
.ASYNC_WAL
);
388 region
.append(a
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
);
390 Get g
= new Get(row
);
391 Result result
= region
.get(g
);
392 assertEquals(result
.getValue(fam1
, qual1
).length
, result
.getValue(fam1
, qual2
).length
);
393 assertEquals(result
.getValue(fam1
, qual1
).length
, result
.getValue(fam2
, qual3
).length
);
394 } catch (IOException e
) {
396 failures
.incrementAndGet();
405 for (int i
= 0; i
< numThreads
; i
++) {
409 // wait for all threads to finish
410 for (int i
= 0; i
< numThreads
; i
++) {
413 } catch (InterruptedException e
) {
416 assertEquals(0, failures
.get());
417 Get g
= new Get(row
);
418 Result result
= region
.get(g
);
419 assertEquals(10000, result
.getValue(fam1
, qual1
).length
);
420 assertEquals(10000, result
.getValue(fam1
, qual2
).length
);
421 assertEquals(10000, result
.getValue(fam2
, qual3
).length
);
424 * Test multi-threaded row mutations.
427 public void testRowMutationMultiThreads() throws IOException
{
428 LOG
.info("Starting test testRowMutationMultiThreads");
429 initHRegion(tableName
, name
.getMethodName(), fam1
);
431 // create 10 threads, each will alternate between adding and
434 int opsPerThread
= 250;
435 AtomicOperation
[] all
= new AtomicOperation
[numThreads
];
437 AtomicLong timeStamps
= new AtomicLong(0);
438 AtomicInteger failures
= new AtomicInteger(0);
439 // create all threads
440 for (int i
= 0; i
< numThreads
; i
++) {
441 all
[i
] = new AtomicOperation(region
, opsPerThread
, timeStamps
, failures
) {
445 for (int i
=0; i
<numOps
; i
++) {
447 // throw in some flushes
449 synchronized(region
) {
450 LOG
.debug("flushing");
453 region
.compact(false);
457 long ts
= timeStamps
.incrementAndGet();
458 RowMutations rm
= new RowMutations(row
);
460 Put p
= new Put(row
, ts
);
461 p
.addColumn(fam1
, qual1
, value1
);
462 p
.setDurability(Durability
.ASYNC_WAL
);
464 Delete d
= new Delete(row
);
465 d
.addColumns(fam1
, qual2
, ts
);
466 d
.setDurability(Durability
.ASYNC_WAL
);
469 Delete d
= new Delete(row
);
470 d
.addColumns(fam1
, qual1
, ts
);
471 d
.setDurability(Durability
.ASYNC_WAL
);
473 Put p
= new Put(row
, ts
);
474 p
.addColumn(fam1
, qual2
, value2
);
475 p
.setDurability(Durability
.ASYNC_WAL
);
478 region
.mutateRow(rm
);
480 // check: should always see exactly one column
481 Get g
= new Get(row
);
482 Result r
= region
.get(g
);
484 LOG
.debug(Objects
.toString(r
));
485 failures
.incrementAndGet();
488 } catch (IOException e
) {
490 failures
.incrementAndGet();
499 for (int i
= 0; i
< numThreads
; i
++) {
503 // wait for all threads to finish
504 for (int i
= 0; i
< numThreads
; i
++) {
507 } catch (InterruptedException e
) {
510 assertEquals(0, failures
.get());
515 * Test multi-threaded region mutations.
518 public void testMultiRowMutationMultiThreads() throws IOException
{
520 LOG
.info("Starting test testMultiRowMutationMultiThreads");
521 initHRegion(tableName
, name
.getMethodName(), fam1
);
523 // create 10 threads, each will alternate between adding and
526 int opsPerThread
= 250;
527 AtomicOperation
[] all
= new AtomicOperation
[numThreads
];
529 AtomicLong timeStamps
= new AtomicLong(0);
530 AtomicInteger failures
= new AtomicInteger(0);
531 final List
<byte[]> rowsToLock
= Arrays
.asList(row
, row2
);
532 // create all threads
533 for (int i
= 0; i
< numThreads
; i
++) {
534 all
[i
] = new AtomicOperation(region
, opsPerThread
, timeStamps
, failures
) {
538 for (int i
=0; i
<numOps
; i
++) {
540 // throw in some flushes
542 synchronized(region
) {
543 LOG
.debug("flushing");
546 region
.compact(false);
550 long ts
= timeStamps
.incrementAndGet();
551 List
<Mutation
> mrm
= new ArrayList
<>();
553 Put p
= new Put(row2
, ts
);
554 p
.addColumn(fam1
, qual1
, value1
);
555 p
.setDurability(Durability
.ASYNC_WAL
);
557 Delete d
= new Delete(row
);
558 d
.addColumns(fam1
, qual1
, ts
);
559 d
.setDurability(Durability
.ASYNC_WAL
);
562 Delete d
= new Delete(row2
);
563 d
.addColumns(fam1
, qual1
, ts
);
564 d
.setDurability(Durability
.ASYNC_WAL
);
566 Put p
= new Put(row
, ts
);
567 p
.setDurability(Durability
.ASYNC_WAL
);
568 p
.addColumn(fam1
, qual1
, value2
);
571 region
.mutateRowsWithLocks(mrm
, rowsToLock
, HConstants
.NO_NONCE
, HConstants
.NO_NONCE
);
573 // check: should always see exactly one column
574 Scan s
= new Scan().withStartRow(row
);
575 RegionScanner rs
= region
.getScanner(s
);
576 List
<Cell
> r
= new ArrayList
<>();
581 LOG
.debug(Objects
.toString(r
));
582 failures
.incrementAndGet();
585 } catch (IOException e
) {
587 failures
.incrementAndGet();
596 for (int i
= 0; i
< numThreads
; i
++) {
600 // wait for all threads to finish
601 for (int i
= 0; i
< numThreads
; i
++) {
604 } catch (InterruptedException e
) {
607 assertEquals(0, failures
.get());
/**
 * Base thread for the multi-threaded tests in this class. testAppendMultiThreads,
 * testRowMutationMultiThreads and testMultiRowMutationMultiThreads each create anonymous
 * subclasses of it and read the fields below from their own code.
 */
610 public static class AtomicOperation
extends Thread
{
// Region that the subclasses issue their operations against.
611 protected final HRegion region
;
// Upper bound of the per-thread operation loop in the subclasses.
612 protected final int numOps
;
// Counter from which subclasses draw timestamps via incrementAndGet();
// testAppendMultiThreads passes null here.
613 protected final AtomicLong timeStamps
;
// Counter the subclasses bump to report a failure; the tests assert it ends at 0.
614 protected final AtomicInteger failures
;
// Random source created once per instance.
615 protected final Random r
= new Random();
/**
 * Stores the given arguments in the fields of the same name.
 */
617 public AtomicOperation(HRegion region
, int numOps
, AtomicLong timeStamps
,
618 AtomicInteger failures
) {
619 this.region
= region
;
620 this.numOps
= numOps
;
621 this.timeStamps
= timeStamps
;
622 this.failures
= failures
;
626 private static CountDownLatch latch
= new CountDownLatch(1);
/**
 * Phases of the put / checkAndPut interleaving driven by testPutAndCheckAndPutInParallel.
 * The current phase lives in the static volatile testStep field, which PutThread,
 * CheckAndPutThread and WrappedRowLock.release() advance.
 */
627 private enum TestStep
{
628 INIT
, // initial put of 10 to set value of the cell
629 PUT_STARTED
, // began doing a put of 50 to cell
630 PUT_COMPLETED
, // put complete (released RowLock, but may not have advanced MVCC).
631 CHECKANDPUT_STARTED
, // began checkAndPut: if 10 -> 11
632 CHECKANDPUT_COMPLETED
// completed checkAndPut
633 // NOTE: at the end of these steps, the value of the cell should be 50, not 11!
635 private static volatile TestStep testStep
= TestStep
.INIT
;
636 private final String family
= "f1";
639 * Test written as a verifier for HBASE-7051, CheckAndPut should properly read
642 * Moved into TestAtomicOperation from its original location, TestHBase7051
645 public void testPutAndCheckAndPutInParallel() throws Exception
{
646 Configuration conf
= TEST_UTIL
.getConfiguration();
647 conf
.setClass(HConstants
.REGION_IMPL
, MockHRegion
.class, HeapSize
.class);
648 TableDescriptorBuilder tableDescriptorBuilder
=
649 TableDescriptorBuilder
.newBuilder(TableName
.valueOf(name
.getMethodName()));
650 ColumnFamilyDescriptor columnFamilyDescriptor
=
651 ColumnFamilyDescriptorBuilder
.newBuilder(Bytes
.toBytes(family
)).build();
652 tableDescriptorBuilder
.setColumnFamily(columnFamilyDescriptor
);
653 this.region
= TEST_UTIL
.createLocalHRegion(tableDescriptorBuilder
.build(), null, null);
654 Put
[] puts
= new Put
[1];
655 Put put
= new Put(Bytes
.toBytes("r1"));
656 put
.addColumn(Bytes
.toBytes(family
), Bytes
.toBytes("q1"), Bytes
.toBytes("10"));
659 region
.batchMutate(puts
);
660 MultithreadedTestUtil
.TestContext ctx
=
661 new MultithreadedTestUtil
.TestContext(conf
);
662 ctx
.addThread(new PutThread(ctx
, region
));
663 ctx
.addThread(new CheckAndPutThread(ctx
, region
));
665 while (testStep
!= TestStep
.CHECKANDPUT_COMPLETED
) {
670 RegionScanner scanner
= region
.getScanner(s
);
671 List
<Cell
> results
= new ArrayList
<>();
672 ScannerContext scannerContext
= ScannerContext
.newBuilder().setBatchLimit(2).build();
673 scanner
.next(results
, scannerContext
);
674 for (Cell keyValue
: results
) {
675 assertEquals("50",Bytes
.toString(CellUtil
.cloneValue(keyValue
)));
679 private class PutThread
extends TestThread
{
680 private Region region
;
681 PutThread(TestContext ctx
, Region region
) {
683 this.region
= region
;
687 public void doWork() throws Exception
{
688 Put
[] puts
= new Put
[1];
689 Put put
= new Put(Bytes
.toBytes("r1"));
690 put
.addColumn(Bytes
.toBytes(family
), Bytes
.toBytes("q1"), Bytes
.toBytes("50"));
692 testStep
= TestStep
.PUT_STARTED
;
693 region
.batchMutate(puts
);
697 private class CheckAndPutThread
extends TestThread
{
698 private Region region
;
699 CheckAndPutThread(TestContext ctx
, Region region
) {
701 this.region
= region
;
705 public void doWork() throws Exception
{
706 Put
[] puts
= new Put
[1];
707 Put put
= new Put(Bytes
.toBytes("r1"));
708 put
.addColumn(Bytes
.toBytes(family
), Bytes
.toBytes("q1"), Bytes
.toBytes("11"));
710 while (testStep
!= TestStep
.PUT_COMPLETED
) {
713 testStep
= TestStep
.CHECKANDPUT_STARTED
;
714 region
.checkAndMutate(Bytes
.toBytes("r1"), Bytes
.toBytes(family
), Bytes
.toBytes("q1"),
715 CompareOperator
.EQUAL
, new BinaryComparator(Bytes
.toBytes("10")), put
);
716 testStep
= TestStep
.CHECKANDPUT_COMPLETED
;
720 public static class MockHRegion
extends HRegion
{
/**
 * Forwards every argument unchanged to the HRegion constructor.
 * testPutAndCheckAndPutInParallel registers this class under HConstants.REGION_IMPL.
 */
722 public MockHRegion(Path tableDir
, WAL log
, FileSystem fs
, Configuration conf
,
723 final RegionInfo regionInfo
, final TableDescriptor htd
, RegionServerServices rsServices
) {
724 super(tableDir
, log
, fs
, conf
, regionInfo
, htd
, rsServices
);
728 protected RowLock
getRowLockInternal(final byte[] row
, boolean readLock
,
729 final RowLock prevRowlock
) throws IOException
{
730 if (testStep
== TestStep
.CHECKANDPUT_STARTED
) {
733 return new WrappedRowLock(super.getRowLockInternal(row
, readLock
, null));
736 public class WrappedRowLock
implements RowLock
{
738 private final RowLock rowLock
;
740 private WrappedRowLock(RowLock rowLock
) {
741 this.rowLock
= rowLock
;
746 public void release() {
747 if (testStep
== TestStep
.INIT
) {
748 this.rowLock
.release();
752 if (testStep
== TestStep
.PUT_STARTED
) {
754 testStep
= TestStep
.PUT_COMPLETED
;
755 this.rowLock
.release();
756 // put has been written to the memstore and the row lock has been released, but the
757 // MVCC has not been advanced. Prior to fixing HBASE-7051, the following order of
758 // operations would cause the non-atomicity to show up:
759 // 1) Put releases row lock (where we are now)
760 // 2) CheckAndPut grabs row lock and reads the value prior to the put (10)
761 // because the MVCC has not advanced
762 // 3) Put advances MVCC
763 // So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock
764 // (see below), and then wait some more to give the checkAndPut time to read the old
768 } catch (InterruptedException e
) {
769 Thread
.currentThread().interrupt();
772 else if (testStep
== TestStep
.CHECKANDPUT_STARTED
) {
773 this.rowLock
.release();