/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
;
20 import java
.io
.IOException
;
21 import java
.util
.List
;
22 import java
.util
.Random
;
23 import java
.util
.concurrent
.BlockingQueue
;
24 import java
.util
.concurrent
.ExecutorService
;
25 import java
.util
.concurrent
.LinkedBlockingQueue
;
26 import java
.util
.concurrent
.ThreadPoolExecutor
;
27 import java
.util
.concurrent
.TimeUnit
;
28 import java
.util
.concurrent
.atomic
.AtomicLong
;
29 import java
.util
.stream
.Stream
;
30 import org
.apache
.hadoop
.conf
.Configuration
;
31 import org
.apache
.hadoop
.hbase
.MultithreadedTestUtil
.RepeatingTestThread
;
32 import org
.apache
.hadoop
.hbase
.MultithreadedTestUtil
.TestContext
;
33 import org
.apache
.hadoop
.hbase
.client
.Admin
;
34 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptor
;
35 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
36 import org
.apache
.hadoop
.hbase
.client
.Connection
;
37 import org
.apache
.hadoop
.hbase
.client
.ConnectionFactory
;
38 import org
.apache
.hadoop
.hbase
.client
.Get
;
39 import org
.apache
.hadoop
.hbase
.client
.Put
;
40 import org
.apache
.hadoop
.hbase
.client
.Result
;
41 import org
.apache
.hadoop
.hbase
.client
.ResultScanner
;
42 import org
.apache
.hadoop
.hbase
.client
.Scan
;
43 import org
.apache
.hadoop
.hbase
.client
.Table
;
44 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
45 import org
.apache
.hadoop
.hbase
.util
.AbstractHBaseTool
;
46 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
47 import org
.apache
.hadoop
.hbase
.util
.Threads
;
48 import org
.apache
.hadoop
.util
.StringUtils
;
49 import org
.apache
.hadoop
.util
.ToolRunner
;
50 import org
.apache
.yetus
.audience
.InterfaceAudience
;
51 import org
.slf4j
.Logger
;
52 import org
.slf4j
.LoggerFactory
;
53 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.collect
.Lists
;
54 import org
.apache
.hbase
.thirdparty
.org
.apache
.commons
.cli
.CommandLine
;
/**
 * A test tool that uses multiple threads to read and write multifamily rows into a table, verifying
 * that reads never see partially-complete writes.
 */
60 @InterfaceAudience.Private
61 public class AcidGuaranteesTestTool
extends AbstractHBaseTool
{
63 private static final Logger LOG
= LoggerFactory
.getLogger(AcidGuaranteesTestTool
.class);
65 public static final TableName TABLE_NAME
= TableName
.valueOf("TestAcidGuarantees");
66 public static final byte[] FAMILY_A
= Bytes
.toBytes("A");
67 public static final byte[] FAMILY_B
= Bytes
.toBytes("B");
68 public static final byte[] FAMILY_C
= Bytes
.toBytes("C");
69 public static final byte[] QUALIFIER_NAME
= Bytes
.toBytes("data");
71 public static final byte[][] FAMILIES
= new byte[][] { FAMILY_A
, FAMILY_B
, FAMILY_C
};
73 public static int NUM_COLS_TO_CHECK
= 50;
75 private ExecutorService sharedPool
;
77 private long millisToRun
;
78 private int numWriters
;
79 private int numGetters
;
80 private int numScanners
;
81 private int numUniqueRows
;
82 private boolean crazyFlush
;
83 private boolean useMob
;
85 private ExecutorService
createThreadPool() {
87 int coreThreads
= 128;
89 long keepAliveTime
= 60;
90 BlockingQueue
<Runnable
> workQueue
= new LinkedBlockingQueue
<Runnable
>(
91 maxThreads
* HConstants
.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS
);
93 ThreadPoolExecutor tpe
= new ThreadPoolExecutor(coreThreads
, maxThreads
, keepAliveTime
,
94 TimeUnit
.SECONDS
, workQueue
, Threads
.newDaemonThreadFactory(toString() + "-shared"));
95 tpe
.allowCoreThreadTimeOut(true);
100 protected void addOptions() {
101 addOptWithArg("millis", "time limit in milliseconds");
102 addOptWithArg("numWriters", "number of write threads");
103 addOptWithArg("numGetters", "number of get threads");
104 addOptWithArg("numScanners", "number of scan threads");
105 addOptWithArg("numUniqueRows", "number of unique rows to test");
106 addOptNoArg("crazyFlush",
107 "if specified we will flush continuously otherwise will flush every minute");
108 addOptNoArg("useMob", "if specified we will enable mob on the first column family");
112 protected void processOptions(CommandLine cmd
) {
113 millisToRun
= getOptionAsLong(cmd
, "millis", 5000);
114 numWriters
= getOptionAsInt(cmd
, "numWriters", 50);
115 numGetters
= getOptionAsInt(cmd
, "numGetters", 2);
116 numScanners
= getOptionAsInt(cmd
, "numScanners", 2);
117 numUniqueRows
= getOptionAsInt(cmd
, "numUniqueRows", 3);
118 crazyFlush
= cmd
.hasOption("crazyFlush");
119 useMob
= cmd
.hasOption("useMob");
123 protected int doWork() throws Exception
{
124 sharedPool
= createThreadPool();
125 try (Connection conn
= ConnectionFactory
.createConnection(getConf())) {
126 runTestAtomicity(conn
.getAdmin());
128 sharedPool
.shutdown();
134 * Thread that does random full-row writes into a table.
136 public static class AtomicityWriter
extends RepeatingTestThread
{
137 Random rand
= new Random();
138 byte data
[] = new byte[10];
140 byte[][] targetFamilies
;
141 Connection connection
;
143 AtomicLong numWritten
= new AtomicLong();
145 public AtomicityWriter(TestContext ctx
, byte[][] targetRows
, byte[][] targetFamilies
,
146 ExecutorService pool
) throws IOException
{
148 this.targetRows
= targetRows
;
149 this.targetFamilies
= targetFamilies
;
150 connection
= ConnectionFactory
.createConnection(ctx
.getConf(), pool
);
151 table
= connection
.getTable(TABLE_NAME
);
155 public void doAnAction() throws Exception
{
156 // Pick a random row to write into
157 byte[] targetRow
= targetRows
[rand
.nextInt(targetRows
.length
)];
158 Put p
= new Put(targetRow
);
159 rand
.nextBytes(data
);
161 for (byte[] family
: targetFamilies
) {
162 for (int i
= 0; i
< NUM_COLS_TO_CHECK
; i
++) {
163 byte qualifier
[] = Bytes
.toBytes("col" + i
);
164 p
.addColumn(family
, qualifier
, data
);
168 numWritten
.getAndIncrement();
172 public void workDone() throws IOException
{
182 * Thread that does single-row reads in a table, looking for partially completed rows.
184 public static class AtomicGetReader
extends RepeatingTestThread
{
186 byte[][] targetFamilies
;
187 Connection connection
;
190 AtomicLong numRead
= new AtomicLong();
192 public AtomicGetReader(TestContext ctx
, byte[] targetRow
, byte[][] targetFamilies
,
193 ExecutorService pool
) throws IOException
{
195 this.targetRow
= targetRow
;
196 this.targetFamilies
= targetFamilies
;
197 connection
= ConnectionFactory
.createConnection(ctx
.getConf(), pool
);
198 table
= connection
.getTable(TABLE_NAME
);
202 public void doAnAction() throws Exception
{
203 Get g
= new Get(targetRow
);
204 Result res
= table
.get(g
);
205 byte[] gotValue
= null;
206 if (res
.getRow() == null) {
207 // Trying to verify but we didn't find the row - the writing
208 // thread probably just hasn't started writing yet, so we can
209 // ignore this action
213 for (byte[] family
: targetFamilies
) {
214 for (int i
= 0; i
< NUM_COLS_TO_CHECK
; i
++) {
215 byte qualifier
[] = Bytes
.toBytes("col" + i
);
216 byte thisValue
[] = res
.getValue(family
, qualifier
);
217 if (gotValue
!= null && !Bytes
.equals(gotValue
, thisValue
)) {
218 gotFailure(gotValue
, res
);
221 gotValue
= thisValue
;
224 numRead
.getAndIncrement();
228 public void workDone() throws IOException
{
236 private void gotFailure(byte[] expected
, Result res
) {
237 StringBuilder msg
= new StringBuilder();
238 msg
.append("Failed after ").append(numVerified
).append("!");
239 msg
.append("Expected=").append(Bytes
.toStringBinary(expected
));
240 msg
.append("Got:\n");
241 for (Cell kv
: res
.listCells()) {
242 msg
.append(kv
.toString());
243 msg
.append(" val= ");
244 msg
.append(Bytes
.toStringBinary(CellUtil
.cloneValue(kv
)));
247 throw new RuntimeException(msg
.toString());
252 * Thread that does full scans of the table looking for any partially completed rows.
254 public static class AtomicScanReader
extends RepeatingTestThread
{
255 byte[][] targetFamilies
;
257 Connection connection
;
258 AtomicLong numScans
= new AtomicLong();
259 AtomicLong numRowsScanned
= new AtomicLong();
261 public AtomicScanReader(TestContext ctx
, byte[][] targetFamilies
, ExecutorService pool
)
264 this.targetFamilies
= targetFamilies
;
265 connection
= ConnectionFactory
.createConnection(ctx
.getConf(), pool
);
266 table
= connection
.getTable(TABLE_NAME
);
270 public void doAnAction() throws Exception
{
272 for (byte[] family
: targetFamilies
) {
275 ResultScanner scanner
= table
.getScanner(s
);
277 for (Result res
: scanner
) {
278 byte[] gotValue
= null;
280 for (byte[] family
: targetFamilies
) {
281 for (int i
= 0; i
< NUM_COLS_TO_CHECK
; i
++) {
282 byte qualifier
[] = Bytes
.toBytes("col" + i
);
283 byte thisValue
[] = res
.getValue(family
, qualifier
);
284 if (gotValue
!= null && !Bytes
.equals(gotValue
, thisValue
)) {
285 gotFailure(gotValue
, res
);
287 gotValue
= thisValue
;
290 numRowsScanned
.getAndIncrement();
292 numScans
.getAndIncrement();
296 public void workDone() throws IOException
{
304 private void gotFailure(byte[] expected
, Result res
) {
305 StringBuilder msg
= new StringBuilder();
306 msg
.append("Failed after ").append(numRowsScanned
).append("!");
307 msg
.append("Expected=").append(Bytes
.toStringBinary(expected
));
308 msg
.append("Got:\n");
309 for (Cell kv
: res
.listCells()) {
310 msg
.append(kv
.toString());
311 msg
.append(" val= ");
312 msg
.append(Bytes
.toStringBinary(CellUtil
.cloneValue(kv
)));
315 throw new RuntimeException(msg
.toString());
319 private void createTableIfMissing(Admin admin
, boolean useMob
) throws IOException
{
320 if (!admin
.tableExists(TABLE_NAME
)) {
321 TableDescriptorBuilder builder
= TableDescriptorBuilder
.newBuilder(TABLE_NAME
);
322 Stream
.of(FAMILIES
).map(ColumnFamilyDescriptorBuilder
::of
)
323 .forEachOrdered(builder
::setColumnFamily
);
324 admin
.createTable(builder
.build());
326 ColumnFamilyDescriptor cfd
= admin
.getDescriptor(TABLE_NAME
).getColumnFamilies()[0];
327 if (cfd
.isMobEnabled() != useMob
) {
328 admin
.modifyColumnFamily(TABLE_NAME
, ColumnFamilyDescriptorBuilder
.newBuilder(cfd
)
329 .setMobEnabled(useMob
).setMobThreshold(4).build());
333 private void runTestAtomicity(Admin admin
) throws Exception
{
334 createTableIfMissing(admin
, useMob
);
335 TestContext ctx
= new TestContext(conf
);
337 byte rows
[][] = new byte[numUniqueRows
][];
338 for (int i
= 0; i
< numUniqueRows
; i
++) {
339 rows
[i
] = Bytes
.toBytes("test_row_" + i
);
342 List
<AtomicityWriter
> writers
= Lists
.newArrayList();
343 for (int i
= 0; i
< numWriters
; i
++) {
344 AtomicityWriter writer
= new AtomicityWriter(ctx
, rows
, FAMILIES
, sharedPool
);
346 ctx
.addThread(writer
);
349 ctx
.addThread(new RepeatingTestThread(ctx
) {
351 public void doAnAction() throws Exception
{
353 admin
.flush(TABLE_NAME
);
354 } catch (IOException ioe
) {
355 LOG
.warn("Ignoring exception while flushing: " + StringUtils
.stringifyException(ioe
));
357 // Flushing has been a source of ACID violations previously (see HBASE-2856), so ideally,
358 // we would flush as often as possible. On a running cluster, this isn't practical:
359 // (1) we will cause a lot of load due to all the flushing and compacting
360 // (2) we cannot change the flushing/compacting related Configuration options to try to
362 // (3) it is an unrealistic workload, since no one would actually flush that often.
363 // Therefore, let's flush every minute to have more flushes than usual, but not overload
364 // the running cluster.
371 List
<AtomicGetReader
> getters
= Lists
.newArrayList();
372 for (int i
= 0; i
< numGetters
; i
++) {
373 AtomicGetReader getter
=
374 new AtomicGetReader(ctx
, rows
[i
% numUniqueRows
], FAMILIES
, sharedPool
);
376 ctx
.addThread(getter
);
379 List
<AtomicScanReader
> scanners
= Lists
.newArrayList();
380 for (int i
= 0; i
< numScanners
; i
++) {
381 AtomicScanReader scanner
= new AtomicScanReader(ctx
, FAMILIES
, sharedPool
);
382 scanners
.add(scanner
);
383 ctx
.addThread(scanner
);
387 ctx
.waitFor(millisToRun
);
390 LOG
.info("Finished test. Writers:");
391 for (AtomicityWriter writer
: writers
) {
392 LOG
.info(" wrote " + writer
.numWritten
.get());
394 LOG
.info("Readers:");
395 for (AtomicGetReader reader
: getters
) {
396 LOG
.info(" read " + reader
.numRead
.get());
398 LOG
.info("Scanners:");
399 for (AtomicScanReader scanner
: scanners
) {
400 LOG
.info(" scanned " + scanner
.numScans
.get());
401 LOG
.info(" verified " + scanner
.numRowsScanned
.get() + " rows");
405 public static void main(String
[] args
) {
406 Configuration c
= HBaseConfiguration
.create();
409 AcidGuaranteesTestTool test
= new AcidGuaranteesTestTool();
410 status
= ToolRunner
.run(c
, test
, args
);
411 } catch (Exception e
) {
412 LOG
.error("Exiting due to error", e
);