HBASE-26700 The way we bypass broken track file is not enough in StoreFileListFile...
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / regionserver / TestAtomicOperation.java
bloba221b00303a83da6c0283a4ddd83cbef94520db5
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.regionserver;
20 import static org.apache.hadoop.hbase.HBaseTestingUtil.fam1;
21 import static org.apache.hadoop.hbase.HBaseTestingUtil.fam2;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertNotNull;
24 import static org.junit.Assert.assertTrue;
25 import static org.junit.Assert.fail;
27 import java.io.IOException;
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.List;
31 import java.util.Objects;
32 import java.util.Random;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.concurrent.atomic.AtomicLong;
36 import org.apache.hadoop.conf.Configuration;
37 import org.apache.hadoop.fs.FileSystem;
38 import org.apache.hadoop.fs.Path;
39 import org.apache.hadoop.hbase.Cell;
40 import org.apache.hadoop.hbase.CellUtil;
41 import org.apache.hadoop.hbase.CompareOperator;
42 import org.apache.hadoop.hbase.HBaseClassTestRule;
43 import org.apache.hadoop.hbase.HBaseTestingUtil;
44 import org.apache.hadoop.hbase.HConstants;
45 import org.apache.hadoop.hbase.MultithreadedTestUtil;
46 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
47 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
48 import org.apache.hadoop.hbase.TableName;
49 import org.apache.hadoop.hbase.client.Append;
50 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
51 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
52 import org.apache.hadoop.hbase.client.Delete;
53 import org.apache.hadoop.hbase.client.Durability;
54 import org.apache.hadoop.hbase.client.Get;
55 import org.apache.hadoop.hbase.client.Increment;
56 import org.apache.hadoop.hbase.client.IsolationLevel;
57 import org.apache.hadoop.hbase.client.Mutation;
58 import org.apache.hadoop.hbase.client.Put;
59 import org.apache.hadoop.hbase.client.RegionInfo;
60 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
61 import org.apache.hadoop.hbase.client.Result;
62 import org.apache.hadoop.hbase.client.RowMutations;
63 import org.apache.hadoop.hbase.client.Scan;
64 import org.apache.hadoop.hbase.client.TableDescriptor;
65 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
66 import org.apache.hadoop.hbase.filter.BinaryComparator;
67 import org.apache.hadoop.hbase.io.HeapSize;
68 import org.apache.hadoop.hbase.io.hfile.BlockCache;
69 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
70 import org.apache.hadoop.hbase.testclassification.LargeTests;
71 import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
72 import org.apache.hadoop.hbase.util.Bytes;
73 import org.apache.hadoop.hbase.wal.WAL;
74 import org.junit.After;
75 import org.junit.Before;
76 import org.junit.ClassRule;
77 import org.junit.Rule;
78 import org.junit.Test;
79 import org.junit.experimental.categories.Category;
80 import org.junit.rules.TestName;
81 import org.slf4j.Logger;
82 import org.slf4j.LoggerFactory;
84 /**
85 * Testing of HRegion.incrementColumnValue, HRegion.increment,
86 * and HRegion.append
88 @Category({VerySlowRegionServerTests.class, LargeTests.class}) // Starts 100 threads
89 public class TestAtomicOperation {
91 @ClassRule
92 public static final HBaseClassTestRule CLASS_RULE =
93 HBaseClassTestRule.forClass(TestAtomicOperation.class);
95 private static final Logger LOG = LoggerFactory.getLogger(TestAtomicOperation.class);
96 @Rule public TestName name = new TestName();
98 HRegion region = null;
99 private HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
101 // Test names
102 static byte[] tableName;
103 static final byte[] qual1 = Bytes.toBytes("qual1");
104 static final byte[] qual2 = Bytes.toBytes("qual2");
105 static final byte[] qual3 = Bytes.toBytes("qual3");
106 static final byte[] value1 = Bytes.toBytes("value1");
107 static final byte[] value2 = Bytes.toBytes("value2");
108 static final byte [] row = Bytes.toBytes("rowA");
109 static final byte [] row2 = Bytes.toBytes("rowB");
111 @Before
112 public void setup() {
113 tableName = Bytes.toBytes(name.getMethodName());
116 @After
117 public void teardown() throws IOException {
118 if (region != null) {
119 CacheConfig cacheConfig = region.getStores().get(0).getCacheConfig();
120 region.close();
121 WAL wal = region.getWAL();
122 if (wal != null) {
123 wal.close();
125 cacheConfig.getBlockCache().ifPresent(BlockCache::shutdown);
126 region = null;
130 //////////////////////////////////////////////////////////////////////////////
131 // New tests that doesn't spin up a mini cluster but rather just test the
132 // individual code pieces in the HRegion.
133 //////////////////////////////////////////////////////////////////////////////
136 * Test basic append operation.
137 * More tests in
138 * @see org.apache.hadoop.hbase.client.TestFromClientSide#testAppend()
140 @Test
141 public void testAppend() throws IOException {
142 initHRegion(tableName, name.getMethodName(), fam1);
143 String v1 = "Ultimate Answer to the Ultimate Question of Life,"+
144 " The Universe, and Everything";
145 String v2 = " is... 42.";
146 Append a = new Append(row);
147 a.setReturnResults(false);
148 a.addColumn(fam1, qual1, Bytes.toBytes(v1));
149 a.addColumn(fam1, qual2, Bytes.toBytes(v2));
150 assertTrue(region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE).isEmpty());
151 a = new Append(row);
152 a.addColumn(fam1, qual1, Bytes.toBytes(v2));
153 a.addColumn(fam1, qual2, Bytes.toBytes(v1));
154 Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
155 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1+v2), result.getValue(fam1, qual1)));
156 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2+v1), result.getValue(fam1, qual2)));
159 @Test
160 public void testAppendWithMultipleFamilies() throws IOException {
161 final byte[] fam3 = Bytes.toBytes("colfamily31");
162 initHRegion(tableName, name.getMethodName(), fam1, fam2, fam3);
163 String v1 = "Appended";
164 String v2 = "Value";
166 Append a = new Append(row);
167 a.setReturnResults(false);
168 a.addColumn(fam1, qual1, Bytes.toBytes(v1));
169 a.addColumn(fam2, qual2, Bytes.toBytes(v2));
170 Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
171 assertTrue("Expected an empty result but result contains " + result.size() + " keys",
172 result.isEmpty());
174 a = new Append(row);
175 a.addColumn(fam2, qual2, Bytes.toBytes(v1));
176 a.addColumn(fam1, qual1, Bytes.toBytes(v2));
177 a.addColumn(fam3, qual3, Bytes.toBytes(v2));
178 a.addColumn(fam1, qual2, Bytes.toBytes(v1));
180 result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
182 byte[] actualValue1 = result.getValue(fam1, qual1);
183 byte[] actualValue2 = result.getValue(fam2, qual2);
184 byte[] actualValue3 = result.getValue(fam3, qual3);
185 byte[] actualValue4 = result.getValue(fam1, qual2);
187 assertNotNull("Value1 should bot be null", actualValue1);
188 assertNotNull("Value2 should bot be null", actualValue2);
189 assertNotNull("Value3 should bot be null", actualValue3);
190 assertNotNull("Value4 should bot be null", actualValue4);
191 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1 + v2), actualValue1));
192 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2 + v1), actualValue2));
193 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2), actualValue3));
194 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1), actualValue4));
197 @Test
198 public void testAppendWithNonExistingFamily() throws IOException {
199 initHRegion(tableName, name.getMethodName(), fam1);
200 final String v1 = "Value";
201 final Append a = new Append(row);
202 a.addColumn(fam1, qual1, Bytes.toBytes(v1));
203 a.addColumn(fam2, qual2, Bytes.toBytes(v1));
204 Result result = null;
205 try {
206 result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
207 fail("Append operation should fail with NoSuchColumnFamilyException.");
208 } catch (NoSuchColumnFamilyException e) {
209 assertEquals(null, result);
210 } catch (Exception e) {
211 fail("Append operation should fail with NoSuchColumnFamilyException.");
215 @Test
216 public void testIncrementWithNonExistingFamily() throws IOException {
217 initHRegion(tableName, name.getMethodName(), fam1);
218 final Increment inc = new Increment(row);
219 inc.addColumn(fam1, qual1, 1);
220 inc.addColumn(fam2, qual2, 1);
221 inc.setDurability(Durability.ASYNC_WAL);
222 try {
223 region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE);
224 } catch (NoSuchColumnFamilyException e) {
225 final Get g = new Get(row);
226 final Result result = region.get(g);
227 assertEquals(null, result.getValue(fam1, qual1));
228 assertEquals(null, result.getValue(fam2, qual2));
229 } catch (Exception e) {
230 fail("Increment operation should fail with NoSuchColumnFamilyException.");
235 * Test multi-threaded increments.
237 @Test
238 public void testIncrementMultiThreads() throws IOException {
239 boolean fast = true;
240 LOG.info("Starting test testIncrementMultiThreads");
241 // run a with mixed column families (1 and 3 versions)
242 initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
244 // Create 100 threads, each will increment by its own quantity. All 100 threads update the
245 // same row over two column families.
246 int numThreads = 100;
247 int incrementsPerThread = 1000;
248 Incrementer[] all = new Incrementer[numThreads];
249 int expectedTotal = 0;
250 // create all threads
251 for (int i = 0; i < numThreads; i++) {
252 all[i] = new Incrementer(region, i, i, incrementsPerThread);
253 expectedTotal += (i * incrementsPerThread);
256 // run all threads
257 for (int i = 0; i < numThreads; i++) {
258 all[i].start();
261 // wait for all threads to finish
262 for (int i = 0; i < numThreads; i++) {
263 try {
264 all[i].join();
265 } catch (InterruptedException e) {
266 LOG.info("Ignored", e);
269 assertICV(row, fam1, qual1, expectedTotal, fast);
270 assertICV(row, fam1, qual2, expectedTotal*2, fast);
271 assertICV(row, fam2, qual3, expectedTotal*3, fast);
272 LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal);
276 private void assertICV(byte [] row,
277 byte [] familiy,
278 byte[] qualifier,
279 long amount,
280 boolean fast) throws IOException {
281 // run a get and see?
282 Get get = new Get(row);
283 if (fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
284 get.addColumn(familiy, qualifier);
285 Result result = region.get(get);
286 assertEquals(1, result.size());
288 Cell kv = result.rawCells()[0];
289 long r = Bytes.toLong(CellUtil.cloneValue(kv));
290 assertEquals(amount, r);
293 private void initHRegion (byte [] tableName, String callingMethod,
294 byte[] ... families)
295 throws IOException {
296 initHRegion(tableName, callingMethod, null, families);
299 private void initHRegion(byte[] tableName, String callingMethod, int[] maxVersions,
300 byte[]... families) throws IOException {
301 TableDescriptorBuilder builder =
302 TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
304 int i = 0;
305 for (byte[] family : families) {
306 ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.newBuilder(family)
307 .setMaxVersions(maxVersions != null ? maxVersions[i++] : 1).build();
308 builder.setColumnFamily(familyDescriptor);
310 TableDescriptor tableDescriptor = builder.build();
311 RegionInfo info = RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build();
312 region = TEST_UTIL.createLocalHRegion(info, tableDescriptor);
316 * A thread that makes increment calls always on the same row, this.row against two column
317 * families on this row.
319 public static class Incrementer extends Thread {
321 private final Region region;
322 private final int numIncrements;
323 private final int amount;
326 public Incrementer(Region region, int threadNumber, int amount, int numIncrements) {
327 super("Incrementer." + threadNumber);
328 this.region = region;
329 this.numIncrements = numIncrements;
330 this.amount = amount;
331 setDaemon(true);
334 @Override
335 public void run() {
336 for (int i = 0; i < numIncrements; i++) {
337 try {
338 Increment inc = new Increment(row);
339 inc.addColumn(fam1, qual1, amount);
340 inc.addColumn(fam1, qual2, amount*2);
341 inc.addColumn(fam2, qual3, amount*3);
342 inc.setDurability(Durability.ASYNC_WAL);
343 Result result = region.increment(inc);
344 if (result != null) {
345 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2,
346 Bytes.toLong(result.getValue(fam1, qual2)));
347 assertTrue(result.getValue(fam2, qual3) != null);
348 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3,
349 Bytes.toLong(result.getValue(fam2, qual3)));
350 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2,
351 Bytes.toLong(result.getValue(fam1, qual2)));
352 long fam1Increment = Bytes.toLong(result.getValue(fam1, qual1))*3;
353 long fam2Increment = Bytes.toLong(result.getValue(fam2, qual3));
354 assertEquals("fam1=" + fam1Increment + ", fam2=" + fam2Increment,
355 fam1Increment, fam2Increment);
357 } catch (IOException e) {
358 e.printStackTrace();
364 @Test
365 public void testAppendMultiThreads() throws IOException {
366 LOG.info("Starting test testAppendMultiThreads");
367 // run a with mixed column families (1 and 3 versions)
368 initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2);
370 int numThreads = 100;
371 int opsPerThread = 100;
372 AtomicOperation[] all = new AtomicOperation[numThreads];
373 final byte[] val = new byte[]{1};
375 AtomicInteger failures = new AtomicInteger(0);
376 // create all threads
377 for (int i = 0; i < numThreads; i++) {
378 all[i] = new AtomicOperation(region, opsPerThread, null, failures) {
379 @Override
380 public void run() {
381 for (int i=0; i<numOps; i++) {
382 try {
383 Append a = new Append(row);
384 a.addColumn(fam1, qual1, val);
385 a.addColumn(fam1, qual2, val);
386 a.addColumn(fam2, qual3, val);
387 a.setDurability(Durability.ASYNC_WAL);
388 region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE);
390 Get g = new Get(row);
391 Result result = region.get(g);
392 assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam1, qual2).length);
393 assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam2, qual3).length);
394 } catch (IOException e) {
395 e.printStackTrace();
396 failures.incrementAndGet();
397 fail();
404 // run all threads
405 for (int i = 0; i < numThreads; i++) {
406 all[i].start();
409 // wait for all threads to finish
410 for (int i = 0; i < numThreads; i++) {
411 try {
412 all[i].join();
413 } catch (InterruptedException e) {
416 assertEquals(0, failures.get());
417 Get g = new Get(row);
418 Result result = region.get(g);
419 assertEquals(10000, result.getValue(fam1, qual1).length);
420 assertEquals(10000, result.getValue(fam1, qual2).length);
421 assertEquals(10000, result.getValue(fam2, qual3).length);
424 * Test multi-threaded row mutations.
426 @Test
427 public void testRowMutationMultiThreads() throws IOException {
428 LOG.info("Starting test testRowMutationMultiThreads");
429 initHRegion(tableName, name.getMethodName(), fam1);
431 // create 10 threads, each will alternate between adding and
432 // removing a column
433 int numThreads = 10;
434 int opsPerThread = 250;
435 AtomicOperation[] all = new AtomicOperation[numThreads];
437 AtomicLong timeStamps = new AtomicLong(0);
438 AtomicInteger failures = new AtomicInteger(0);
439 // create all threads
440 for (int i = 0; i < numThreads; i++) {
441 all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
442 @Override
443 public void run() {
444 boolean op = true;
445 for (int i=0; i<numOps; i++) {
446 try {
447 // throw in some flushes
448 if (i%10==0) {
449 synchronized(region) {
450 LOG.debug("flushing");
451 region.flush(true);
452 if (i%100==0) {
453 region.compact(false);
457 long ts = timeStamps.incrementAndGet();
458 RowMutations rm = new RowMutations(row);
459 if (op) {
460 Put p = new Put(row, ts);
461 p.addColumn(fam1, qual1, value1);
462 p.setDurability(Durability.ASYNC_WAL);
463 rm.add(p);
464 Delete d = new Delete(row);
465 d.addColumns(fam1, qual2, ts);
466 d.setDurability(Durability.ASYNC_WAL);
467 rm.add(d);
468 } else {
469 Delete d = new Delete(row);
470 d.addColumns(fam1, qual1, ts);
471 d.setDurability(Durability.ASYNC_WAL);
472 rm.add(d);
473 Put p = new Put(row, ts);
474 p.addColumn(fam1, qual2, value2);
475 p.setDurability(Durability.ASYNC_WAL);
476 rm.add(p);
478 region.mutateRow(rm);
479 op ^= true;
480 // check: should always see exactly one column
481 Get g = new Get(row);
482 Result r = region.get(g);
483 if (r.size() != 1) {
484 LOG.debug(Objects.toString(r));
485 failures.incrementAndGet();
486 fail();
488 } catch (IOException e) {
489 e.printStackTrace();
490 failures.incrementAndGet();
491 fail();
498 // run all threads
499 for (int i = 0; i < numThreads; i++) {
500 all[i].start();
503 // wait for all threads to finish
504 for (int i = 0; i < numThreads; i++) {
505 try {
506 all[i].join();
507 } catch (InterruptedException e) {
510 assertEquals(0, failures.get());
515 * Test multi-threaded region mutations.
517 @Test
518 public void testMultiRowMutationMultiThreads() throws IOException {
520 LOG.info("Starting test testMultiRowMutationMultiThreads");
521 initHRegion(tableName, name.getMethodName(), fam1);
523 // create 10 threads, each will alternate between adding and
524 // removing a column
525 int numThreads = 10;
526 int opsPerThread = 250;
527 AtomicOperation[] all = new AtomicOperation[numThreads];
529 AtomicLong timeStamps = new AtomicLong(0);
530 AtomicInteger failures = new AtomicInteger(0);
531 final List<byte[]> rowsToLock = Arrays.asList(row, row2);
532 // create all threads
533 for (int i = 0; i < numThreads; i++) {
534 all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
535 @Override
536 public void run() {
537 boolean op = true;
538 for (int i=0; i<numOps; i++) {
539 try {
540 // throw in some flushes
541 if (i%10==0) {
542 synchronized(region) {
543 LOG.debug("flushing");
544 region.flush(true);
545 if (i%100==0) {
546 region.compact(false);
550 long ts = timeStamps.incrementAndGet();
551 List<Mutation> mrm = new ArrayList<>();
552 if (op) {
553 Put p = new Put(row2, ts);
554 p.addColumn(fam1, qual1, value1);
555 p.setDurability(Durability.ASYNC_WAL);
556 mrm.add(p);
557 Delete d = new Delete(row);
558 d.addColumns(fam1, qual1, ts);
559 d.setDurability(Durability.ASYNC_WAL);
560 mrm.add(d);
561 } else {
562 Delete d = new Delete(row2);
563 d.addColumns(fam1, qual1, ts);
564 d.setDurability(Durability.ASYNC_WAL);
565 mrm.add(d);
566 Put p = new Put(row, ts);
567 p.setDurability(Durability.ASYNC_WAL);
568 p.addColumn(fam1, qual1, value2);
569 mrm.add(p);
571 region.mutateRowsWithLocks(mrm, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
572 op ^= true;
573 // check: should always see exactly one column
574 Scan s = new Scan().withStartRow(row);
575 RegionScanner rs = region.getScanner(s);
576 List<Cell> r = new ArrayList<>();
577 while (rs.next(r))
579 rs.close();
580 if (r.size() != 1) {
581 LOG.debug(Objects.toString(r));
582 failures.incrementAndGet();
583 fail();
585 } catch (IOException e) {
586 e.printStackTrace();
587 failures.incrementAndGet();
588 fail();
595 // run all threads
596 for (int i = 0; i < numThreads; i++) {
597 all[i].start();
600 // wait for all threads to finish
601 for (int i = 0; i < numThreads; i++) {
602 try {
603 all[i].join();
604 } catch (InterruptedException e) {
607 assertEquals(0, failures.get());
610 public static class AtomicOperation extends Thread {
611 protected final HRegion region;
612 protected final int numOps;
613 protected final AtomicLong timeStamps;
614 protected final AtomicInteger failures;
615 protected final Random r = new Random();
617 public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps,
618 AtomicInteger failures) {
619 this.region = region;
620 this.numOps = numOps;
621 this.timeStamps = timeStamps;
622 this.failures = failures;
626 private static CountDownLatch latch = new CountDownLatch(1);
627 private enum TestStep {
628 INIT, // initial put of 10 to set value of the cell
629 PUT_STARTED, // began doing a put of 50 to cell
630 PUT_COMPLETED, // put complete (released RowLock, but may not have advanced MVCC).
631 CHECKANDPUT_STARTED, // began checkAndPut: if 10 -> 11
632 CHECKANDPUT_COMPLETED // completed checkAndPut
633 // NOTE: at the end of these steps, the value of the cell should be 50, not 11!
635 private static volatile TestStep testStep = TestStep.INIT;
636 private final String family = "f1";
639 * Test written as a verifier for HBASE-7051, CheckAndPut should properly read
640 * MVCC.
642 * Moved into TestAtomicOperation from its original location, TestHBase7051
644 @Test
645 public void testPutAndCheckAndPutInParallel() throws Exception {
646 Configuration conf = TEST_UTIL.getConfiguration();
647 conf.setClass(HConstants.REGION_IMPL, MockHRegion.class, HeapSize.class);
648 TableDescriptorBuilder tableDescriptorBuilder =
649 TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()));
650 ColumnFamilyDescriptor columnFamilyDescriptor =
651 ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family)).build();
652 tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
653 this.region = TEST_UTIL.createLocalHRegion(tableDescriptorBuilder.build(), null, null);
654 Put[] puts = new Put[1];
655 Put put = new Put(Bytes.toBytes("r1"));
656 put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("10"));
657 puts[0] = put;
659 region.batchMutate(puts);
660 MultithreadedTestUtil.TestContext ctx =
661 new MultithreadedTestUtil.TestContext(conf);
662 ctx.addThread(new PutThread(ctx, region));
663 ctx.addThread(new CheckAndPutThread(ctx, region));
664 ctx.startThreads();
665 while (testStep != TestStep.CHECKANDPUT_COMPLETED) {
666 Thread.sleep(100);
668 ctx.stop();
669 Scan s = new Scan();
670 RegionScanner scanner = region.getScanner(s);
671 List<Cell> results = new ArrayList<>();
672 ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(2).build();
673 scanner.next(results, scannerContext);
674 for (Cell keyValue : results) {
675 assertEquals("50",Bytes.toString(CellUtil.cloneValue(keyValue)));
679 private class PutThread extends TestThread {
680 private Region region;
681 PutThread(TestContext ctx, Region region) {
682 super(ctx);
683 this.region = region;
686 @Override
687 public void doWork() throws Exception {
688 Put[] puts = new Put[1];
689 Put put = new Put(Bytes.toBytes("r1"));
690 put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("50"));
691 puts[0] = put;
692 testStep = TestStep.PUT_STARTED;
693 region.batchMutate(puts);
697 private class CheckAndPutThread extends TestThread {
698 private Region region;
699 CheckAndPutThread(TestContext ctx, Region region) {
700 super(ctx);
701 this.region = region;
704 @Override
705 public void doWork() throws Exception {
706 Put[] puts = new Put[1];
707 Put put = new Put(Bytes.toBytes("r1"));
708 put.addColumn(Bytes.toBytes(family), Bytes.toBytes("q1"), Bytes.toBytes("11"));
709 puts[0] = put;
710 while (testStep != TestStep.PUT_COMPLETED) {
711 Thread.sleep(100);
713 testStep = TestStep.CHECKANDPUT_STARTED;
714 region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"),
715 CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put);
716 testStep = TestStep.CHECKANDPUT_COMPLETED;
720 public static class MockHRegion extends HRegion {
722 public MockHRegion(Path tableDir, WAL log, FileSystem fs, Configuration conf,
723 final RegionInfo regionInfo, final TableDescriptor htd, RegionServerServices rsServices) {
724 super(tableDir, log, fs, conf, regionInfo, htd, rsServices);
727 @Override
728 protected RowLock getRowLockInternal(final byte[] row, boolean readLock,
729 final RowLock prevRowlock) throws IOException {
730 if (testStep == TestStep.CHECKANDPUT_STARTED) {
731 latch.countDown();
733 return new WrappedRowLock(super.getRowLockInternal(row, readLock, null));
736 public class WrappedRowLock implements RowLock {
738 private final RowLock rowLock;
740 private WrappedRowLock(RowLock rowLock) {
741 this.rowLock = rowLock;
745 @Override
746 public void release() {
747 if (testStep == TestStep.INIT) {
748 this.rowLock.release();
749 return;
752 if (testStep == TestStep.PUT_STARTED) {
753 try {
754 testStep = TestStep.PUT_COMPLETED;
755 this.rowLock.release();
756 // put has been written to the memstore and the row lock has been released, but the
757 // MVCC has not been advanced. Prior to fixing HBASE-7051, the following order of
758 // operations would cause the non-atomicity to show up:
759 // 1) Put releases row lock (where we are now)
760 // 2) CheckAndPut grabs row lock and reads the value prior to the put (10)
761 // because the MVCC has not advanced
762 // 3) Put advances MVCC
763 // So, in order to recreate this order, we wait for the checkAndPut to grab the rowLock
764 // (see below), and then wait some more to give the checkAndPut time to read the old
765 // value.
766 latch.await();
767 Thread.sleep(1000);
768 } catch (InterruptedException e) {
769 Thread.currentThread().interrupt();
772 else if (testStep == TestStep.CHECKANDPUT_STARTED) {
773 this.rowLock.release();