2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.regionserver
;
20 import static org
.junit
.Assert
.assertEquals
;
22 import java
.io
.IOException
;
23 import java
.util
.ArrayList
;
24 import java
.util
.List
;
25 import java
.util
.concurrent
.ThreadLocalRandom
;
26 import org
.apache
.hadoop
.conf
.Configuration
;
27 import org
.apache
.hadoop
.fs
.FileSystem
;
28 import org
.apache
.hadoop
.hbase
.Cell
;
29 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
30 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
31 import org
.apache
.hadoop
.hbase
.HConstants
;
32 import org
.apache
.hadoop
.hbase
.TableName
;
33 import org
.apache
.hadoop
.hbase
.client
.Durability
;
34 import org
.apache
.hadoop
.hbase
.client
.Increment
;
35 import org
.apache
.hadoop
.hbase
.client
.Scan
;
36 import org
.apache
.hadoop
.hbase
.client
.TestIncrementsFromClientSide
;
37 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.FSHLog
;
38 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
39 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
40 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
41 import org
.junit
.After
;
42 import org
.junit
.Before
;
43 import org
.junit
.ClassRule
;
44 import org
.junit
.Rule
;
45 import org
.junit
.Test
;
46 import org
.junit
.experimental
.categories
.Category
;
47 import org
.junit
.rules
.TestName
;
48 import org
.slf4j
.Logger
;
49 import org
.slf4j
.LoggerFactory
;
52 * Increments with some concurrency against a region to ensure we get the right answer.
53 * Test is parameterized to run the fast and slow path increments; if fast,
54 * HRegion.INCREMENT_FAST_BUT_NARROW_CONSISTENCY_KEY is true.
56 * <p>There is similar test up in TestAtomicOperation. It does a test where it has 100 threads
57 * doing increments across two column families all on one row and the increments are connected to
58 * prove atomicity on row.
60 @Category(MediumTests
.class)
61 public class TestRegionIncrement
{
64 public static final HBaseClassTestRule CLASS_RULE
=
65 HBaseClassTestRule
.forClass(TestRegionIncrement
.class);
67 private static final Logger LOG
= LoggerFactory
.getLogger(TestRegionIncrement
.class);
68 @Rule public TestName name
= new TestName();
69 private static HBaseTestingUtil TEST_UTIL
;
70 private final static byte [] INCREMENT_BYTES
= Bytes
.toBytes("increment");
71 private static final int THREAD_COUNT
= 10;
72 private static final int INCREMENT_COUNT
= 10000;
75 public void setUp() throws Exception
{
76 TEST_UTIL
= new HBaseTestingUtil();
80 public void tearDown() throws Exception
{
81 TEST_UTIL
.cleanupTestDir();
84 private HRegion
getRegion(final Configuration conf
, final String tableName
) throws IOException
{
85 FSHLog wal
= new FSHLog(FileSystem
.get(conf
), TEST_UTIL
.getDataTestDir(),
86 TEST_UTIL
.getDataTestDir().toString(), conf
);
88 ChunkCreator
.initialize(MemStoreLAB
.CHUNK_SIZE_DEFAULT
, false, 0, 0,
89 0, null, MemStoreLAB
.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT
);
90 return TEST_UTIL
.createLocalHRegion(TableName
.valueOf(tableName
), HConstants
.EMPTY_BYTE_ARRAY
,
91 HConstants
.EMPTY_BYTE_ARRAY
, conf
, false, Durability
.SKIP_WAL
, wal
, INCREMENT_BYTES
);
94 private void closeRegion(final HRegion region
) throws IOException
{
96 region
.getWAL().close();
100 public void testMVCCCausingMisRead() throws IOException
{
101 final HRegion region
= getRegion(TEST_UTIL
.getConfiguration(), this.name
.getMethodName());
110 * Increments a single cell a bunch of times.
112 private static class SingleCellIncrementer
extends Thread
{
113 private final int count
;
114 private final HRegion region
;
115 private final Increment increment
;
117 SingleCellIncrementer(final int i
, final int count
, final HRegion region
,
118 final Increment increment
) {
122 this.region
= region
;
123 this.increment
= increment
;
128 for (int i
= 0; i
< this.count
; i
++) {
130 this.region
.increment(this.increment
);
131 // LOG.info(getName() + " " + i);
132 } catch (IOException e
) {
133 throw new RuntimeException(e
);
140 * Increments a random row's Cell <code>count</code> times.
142 private static class CrossRowCellIncrementer
extends Thread
{
143 private final int count
;
144 private final HRegion region
;
145 private final Increment
[] increments
;
147 CrossRowCellIncrementer(final int i
, final int count
, final HRegion region
, final int range
) {
151 this.region
= region
;
152 this.increments
= new Increment
[range
];
153 for (int ii
= 0; ii
< range
; ii
++) {
154 this.increments
[ii
] = new Increment(Bytes
.toBytes(i
));
155 this.increments
[ii
].addColumn(INCREMENT_BYTES
, INCREMENT_BYTES
, 1);
161 for (int i
= 0; i
< this.count
; i
++) {
163 int index
= ThreadLocalRandom
.current().nextInt(0, this.increments
.length
);
164 this.region
.increment(this.increments
[index
]);
165 // LOG.info(getName() + " " + index);
166 } catch (IOException e
) {
167 throw new RuntimeException(e
);
174 * Have each thread update its own Cell. Avoid contention with another thread.
177 public void testUnContendedSingleCellIncrement()
178 throws IOException
, InterruptedException
{
179 final HRegion region
= getRegion(TEST_UTIL
.getConfiguration(),
180 TestIncrementsFromClientSide
.filterStringSoTableNameSafe(this.name
.getMethodName()));
181 long startTime
= EnvironmentEdgeManager
.currentTime();
183 SingleCellIncrementer
[] threads
= new SingleCellIncrementer
[THREAD_COUNT
];
184 for (int i
= 0; i
< threads
.length
; i
++) {
185 byte [] rowBytes
= Bytes
.toBytes(i
);
186 Increment increment
= new Increment(rowBytes
);
187 increment
.addColumn(INCREMENT_BYTES
, INCREMENT_BYTES
, 1);
188 threads
[i
] = new SingleCellIncrementer(i
, INCREMENT_COUNT
, region
, increment
);
190 for (int i
= 0; i
< threads
.length
; i
++) {
193 for (int i
= 0; i
< threads
.length
; i
++) {
196 RegionScanner regionScanner
= region
.getScanner(new Scan());
197 List
<Cell
> cells
= new ArrayList
<>(THREAD_COUNT
);
198 while(regionScanner
.next(cells
)) continue;
199 assertEquals(THREAD_COUNT
, cells
.size());
201 for (Cell cell
: cells
) total
+=
202 Bytes
.toLong(cell
.getValueArray(), cell
.getValueOffset(), cell
.getValueLength());
203 assertEquals(INCREMENT_COUNT
* THREAD_COUNT
, total
);
206 LOG
.info(this.name
.getMethodName() + " " +
207 (EnvironmentEdgeManager
.currentTime() - startTime
) + "ms");
212 * Have each thread update its own Cell. Avoid contention with another thread.
215 public void testContendedAcrossCellsIncrement() throws IOException
, InterruptedException
{
216 final HRegion region
= getRegion(TEST_UTIL
.getConfiguration(),
217 TestIncrementsFromClientSide
.filterStringSoTableNameSafe(this.name
.getMethodName()));
218 long startTime
= EnvironmentEdgeManager
.currentTime();
220 CrossRowCellIncrementer
[] threads
= new CrossRowCellIncrementer
[THREAD_COUNT
];
221 for (int i
= 0; i
< threads
.length
; i
++) {
222 threads
[i
] = new CrossRowCellIncrementer(i
, INCREMENT_COUNT
, region
, THREAD_COUNT
);
224 for (int i
= 0; i
< threads
.length
; i
++) {
227 for (int i
= 0; i
< threads
.length
; i
++) {
230 RegionScanner regionScanner
= region
.getScanner(new Scan());
231 List
<Cell
> cells
= new ArrayList
<>(100);
232 while(regionScanner
.next(cells
)) continue;
233 assertEquals(THREAD_COUNT
, cells
.size());
235 for (Cell cell
: cells
) total
+=
236 Bytes
.toLong(cell
.getValueArray(), cell
.getValueOffset(), cell
.getValueLength());
237 assertEquals(INCREMENT_COUNT
* THREAD_COUNT
, total
);
240 LOG
.info(this.name
.getMethodName() + " " +
241 (EnvironmentEdgeManager
.currentTime() - startTime
) + "ms");