3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
;
21 import com
.codahale
.metrics
.Histogram
;
22 import com
.codahale
.metrics
.UniformReservoir
;
23 import java
.io
.IOException
;
24 import java
.io
.PrintStream
;
25 import java
.lang
.reflect
.Constructor
;
26 import java
.math
.BigDecimal
;
27 import java
.math
.MathContext
;
28 import java
.text
.DecimalFormat
;
29 import java
.text
.SimpleDateFormat
;
30 import java
.util
.ArrayList
;
31 import java
.util
.Arrays
;
32 import java
.util
.Date
;
33 import java
.util
.LinkedList
;
34 import java
.util
.Locale
;
36 import java
.util
.NoSuchElementException
;
37 import java
.util
.Queue
;
38 import java
.util
.Random
;
39 import java
.util
.TreeMap
;
40 import java
.util
.concurrent
.Callable
;
41 import java
.util
.concurrent
.ExecutionException
;
42 import java
.util
.concurrent
.ExecutorService
;
43 import java
.util
.concurrent
.Executors
;
44 import java
.util
.concurrent
.Future
;
45 import org
.apache
.commons
.lang3
.StringUtils
;
46 import org
.apache
.hadoop
.conf
.Configuration
;
47 import org
.apache
.hadoop
.conf
.Configured
;
48 import org
.apache
.hadoop
.fs
.FileSystem
;
49 import org
.apache
.hadoop
.fs
.Path
;
50 import org
.apache
.hadoop
.hbase
.client
.Admin
;
51 import org
.apache
.hadoop
.hbase
.client
.Append
;
52 import org
.apache
.hadoop
.hbase
.client
.AsyncConnection
;
53 import org
.apache
.hadoop
.hbase
.client
.AsyncTable
;
54 import org
.apache
.hadoop
.hbase
.client
.BufferedMutator
;
55 import org
.apache
.hadoop
.hbase
.client
.BufferedMutatorParams
;
56 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
57 import org
.apache
.hadoop
.hbase
.client
.Connection
;
58 import org
.apache
.hadoop
.hbase
.client
.ConnectionFactory
;
59 import org
.apache
.hadoop
.hbase
.client
.Consistency
;
60 import org
.apache
.hadoop
.hbase
.client
.Delete
;
61 import org
.apache
.hadoop
.hbase
.client
.Durability
;
62 import org
.apache
.hadoop
.hbase
.client
.Get
;
63 import org
.apache
.hadoop
.hbase
.client
.Increment
;
64 import org
.apache
.hadoop
.hbase
.client
.Put
;
65 import org
.apache
.hadoop
.hbase
.client
.Result
;
66 import org
.apache
.hadoop
.hbase
.client
.ResultScanner
;
67 import org
.apache
.hadoop
.hbase
.client
.RowMutations
;
68 import org
.apache
.hadoop
.hbase
.client
.Scan
;
69 import org
.apache
.hadoop
.hbase
.client
.Table
;
70 import org
.apache
.hadoop
.hbase
.client
.TableDescriptor
;
71 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
72 import org
.apache
.hadoop
.hbase
.client
.metrics
.ScanMetrics
;
73 import org
.apache
.hadoop
.hbase
.filter
.BinaryComparator
;
74 import org
.apache
.hadoop
.hbase
.filter
.Filter
;
75 import org
.apache
.hadoop
.hbase
.filter
.FilterAllFilter
;
76 import org
.apache
.hadoop
.hbase
.filter
.FilterList
;
77 import org
.apache
.hadoop
.hbase
.filter
.PageFilter
;
78 import org
.apache
.hadoop
.hbase
.filter
.SingleColumnValueFilter
;
79 import org
.apache
.hadoop
.hbase
.filter
.WhileMatchFilter
;
80 import org
.apache
.hadoop
.hbase
.io
.compress
.Compression
;
81 import org
.apache
.hadoop
.hbase
.io
.encoding
.DataBlockEncoding
;
82 import org
.apache
.hadoop
.hbase
.io
.hfile
.RandomDistribution
;
83 import org
.apache
.hadoop
.hbase
.mapreduce
.TableMapReduceUtil
;
84 import org
.apache
.hadoop
.hbase
.regionserver
.BloomType
;
85 import org
.apache
.hadoop
.hbase
.regionserver
.CompactingMemStore
;
86 import org
.apache
.hadoop
.hbase
.trace
.HBaseHTraceConfiguration
;
87 import org
.apache
.hadoop
.hbase
.trace
.SpanReceiverHost
;
88 import org
.apache
.hadoop
.hbase
.trace
.TraceUtil
;
89 import org
.apache
.hadoop
.hbase
.util
.ByteArrayHashKey
;
90 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
91 import org
.apache
.hadoop
.hbase
.util
.GsonUtil
;
92 import org
.apache
.hadoop
.hbase
.util
.Hash
;
93 import org
.apache
.hadoop
.hbase
.util
.MurmurHash
;
94 import org
.apache
.hadoop
.hbase
.util
.Pair
;
95 import org
.apache
.hadoop
.hbase
.util
.YammerHistogramUtils
;
96 import org
.apache
.hadoop
.io
.LongWritable
;
97 import org
.apache
.hadoop
.io
.Text
;
98 import org
.apache
.hadoop
.mapreduce
.Job
;
99 import org
.apache
.hadoop
.mapreduce
.Mapper
;
100 import org
.apache
.hadoop
.mapreduce
.lib
.input
.NLineInputFormat
;
101 import org
.apache
.hadoop
.mapreduce
.lib
.output
.TextOutputFormat
;
102 import org
.apache
.hadoop
.mapreduce
.lib
.reduce
.LongSumReducer
;
103 import org
.apache
.hadoop
.util
.Tool
;
104 import org
.apache
.hadoop
.util
.ToolRunner
;
105 import org
.apache
.htrace
.core
.ProbabilitySampler
;
106 import org
.apache
.htrace
.core
.Sampler
;
107 import org
.apache
.htrace
.core
.TraceScope
;
108 import org
.apache
.yetus
.audience
.InterfaceAudience
;
109 import org
.slf4j
.Logger
;
110 import org
.slf4j
.LoggerFactory
;
112 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.base
.MoreObjects
;
113 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.util
.concurrent
.ThreadFactoryBuilder
;
114 import org
.apache
.hbase
.thirdparty
.com
.google
.gson
.Gson
;
 * Script used for evaluating HBase performance and scalability. Runs an HBase
 * client that steps through one of a set of hardcoded tests or 'experiments'
 * (e.g. a random reads test, a random writes test, etc.). Pass on the
 * command-line which test to run and how many clients are participating in
 * this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
123 * <p>This class sets up and runs the evaluation programs described in
124 * Section 7, <i>Performance Evaluation</i>, of the <a
125 * href="http://labs.google.com/papers/bigtable.html">Bigtable</a>
128 * <p>By default, runs as a mapreduce job where each mapper runs a single test
129 * client. Can also run as a non-mapreduce, multithreaded application by
130 * specifying {@code --nomapred}. Each client does about 1GB of data, unless
131 * specified otherwise.
133 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience
.TOOLS
)
134 public class PerformanceEvaluation
extends Configured
implements Tool
{
135 static final String RANDOM_SEEK_SCAN
= "randomSeekScan";
136 static final String RANDOM_READ
= "randomRead";
137 static final String PE_COMMAND_SHORTNAME
= "pe";
138 private static final Logger LOG
= LoggerFactory
.getLogger(PerformanceEvaluation
.class.getName());
139 private static final Gson GSON
= GsonUtil
.createGson().create();
141 public static final String TABLE_NAME
= "TestTable";
142 public static final String FAMILY_NAME_BASE
= "info";
143 public static final byte[] FAMILY_ZERO
= Bytes
.toBytes("info0");
144 public static final byte[] COLUMN_ZERO
= Bytes
.toBytes("" + 0);
145 public static final int DEFAULT_VALUE_LENGTH
= 1000;
146 public static final int ROW_LENGTH
= 26;
148 private static final int ONE_GB
= 1024 * 1024 * 1000;
149 private static final int DEFAULT_ROWS_PER_GB
= ONE_GB
/ DEFAULT_VALUE_LENGTH
;
150 // TODO : should we make this configurable
151 private static final int TAG_LENGTH
= 256;
152 private static final DecimalFormat FMT
= new DecimalFormat("0.##");
153 private static final MathContext CXT
= MathContext
.DECIMAL64
;
154 private static final BigDecimal MS_PER_SEC
= BigDecimal
.valueOf(1000);
155 private static final BigDecimal BYTES_PER_MB
= BigDecimal
.valueOf(1024 * 1024);
156 private static final TestOptions DEFAULT_OPTS
= new TestOptions();
158 private static Map
<String
, CmdDescriptor
> COMMANDS
= new TreeMap
<>();
159 private static final Path PERF_EVAL_DIR
= new Path("performance_evaluation");
162 addCommandDescriptor(AsyncRandomReadTest
.class, "asyncRandomRead",
163 "Run async random read test");
164 addCommandDescriptor(AsyncRandomWriteTest
.class, "asyncRandomWrite",
165 "Run async random write test");
166 addCommandDescriptor(AsyncSequentialReadTest
.class, "asyncSequentialRead",
167 "Run async sequential read test");
168 addCommandDescriptor(AsyncSequentialWriteTest
.class, "asyncSequentialWrite",
169 "Run async sequential write test");
170 addCommandDescriptor(AsyncScanTest
.class, "asyncScan",
171 "Run async scan test (read every row)");
172 addCommandDescriptor(RandomReadTest
.class, RANDOM_READ
,
173 "Run random read test");
174 addCommandDescriptor(RandomSeekScanTest
.class, RANDOM_SEEK_SCAN
,
175 "Run random seek and scan 100 test");
176 addCommandDescriptor(RandomScanWithRange10Test
.class, "scanRange10",
177 "Run random seek scan with both start and stop row (max 10 rows)");
178 addCommandDescriptor(RandomScanWithRange100Test
.class, "scanRange100",
179 "Run random seek scan with both start and stop row (max 100 rows)");
180 addCommandDescriptor(RandomScanWithRange1000Test
.class, "scanRange1000",
181 "Run random seek scan with both start and stop row (max 1000 rows)");
182 addCommandDescriptor(RandomScanWithRange10000Test
.class, "scanRange10000",
183 "Run random seek scan with both start and stop row (max 10000 rows)");
184 addCommandDescriptor(RandomWriteTest
.class, "randomWrite",
185 "Run random write test");
186 addCommandDescriptor(SequentialReadTest
.class, "sequentialRead",
187 "Run sequential read test");
188 addCommandDescriptor(SequentialWriteTest
.class, "sequentialWrite",
189 "Run sequential write test");
190 addCommandDescriptor(ScanTest
.class, "scan",
191 "Run scan test (read every row)");
192 addCommandDescriptor(FilteredScanTest
.class, "filterScan",
193 "Run scan test using a filter to find a specific row based on it's value " +
194 "(make sure to use --rows=20)");
195 addCommandDescriptor(IncrementTest
.class, "increment",
196 "Increment on each row; clients overlap on keyspace so some concurrent operations");
197 addCommandDescriptor(AppendTest
.class, "append",
198 "Append on each row; clients overlap on keyspace so some concurrent operations");
199 addCommandDescriptor(CheckAndMutateTest
.class, "checkAndMutate",
200 "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
201 addCommandDescriptor(CheckAndPutTest
.class, "checkAndPut",
202 "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
203 addCommandDescriptor(CheckAndDeleteTest
.class, "checkAndDelete",
204 "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
208 * Enum for map metrics. Keep it out here rather than inside in the Map
209 * inner-class so we can find associated properties.
211 protected static enum Counter
{
214 /** number of rows */
218 protected static class RunResult
implements Comparable
<RunResult
> {
219 public RunResult(long duration
, Histogram hist
) {
220 this.duration
= duration
;
224 public final long duration
;
225 public final Histogram hist
;
228 public String
toString() {
229 return Long
.toString(duration
);
232 @Override public int compareTo(RunResult o
) {
233 return Long
.compare(this.duration
, o
.duration
);
239 * @param conf Configuration object
241 public PerformanceEvaluation(final Configuration conf
) {
245 protected static void addCommandDescriptor(Class
<?
extends TestBase
> cmdClass
,
246 String name
, String description
) {
247 CmdDescriptor cmdDescriptor
= new CmdDescriptor(cmdClass
, name
, description
);
248 COMMANDS
.put(name
, cmdDescriptor
);
252 * Implementations can have their status set.
257 * @param msg status message
258 * @throws IOException
260 void setStatus(final String msg
) throws IOException
;
264 * MapReduce job that runs a performance evaluation client in each map task.
266 public static class EvaluationMapTask
267 extends Mapper
<LongWritable
, Text
, LongWritable
, LongWritable
> {
269 /** configuration parameter name that contains the command */
270 public final static String CMD_KEY
= "EvaluationMapTask.command";
271 /** configuration parameter name that contains the PE impl */
272 public static final String PE_KEY
= "EvaluationMapTask.performanceEvalImpl";
274 private Class
<?
extends Test
> cmd
;
277 protected void setup(Context context
) throws IOException
, InterruptedException
{
278 this.cmd
= forName(context
.getConfiguration().get(CMD_KEY
), Test
.class);
280 // this is required so that extensions of PE are instantiated within the
281 // map reduce task...
282 Class
<?
extends PerformanceEvaluation
> peClass
=
283 forName(context
.getConfiguration().get(PE_KEY
), PerformanceEvaluation
.class);
285 peClass
.getConstructor(Configuration
.class).newInstance(context
.getConfiguration());
286 } catch (Exception e
) {
287 throw new IllegalStateException("Could not instantiate PE instance", e
);
291 private <Type
> Class
<?
extends Type
> forName(String className
, Class
<Type
> type
) {
293 return Class
.forName(className
).asSubclass(type
);
294 } catch (ClassNotFoundException e
) {
295 throw new IllegalStateException("Could not find class for name: " + className
, e
);
300 protected void map(LongWritable key
, Text value
, final Context context
)
301 throws IOException
, InterruptedException
{
303 Status status
= new Status() {
305 public void setStatus(String msg
) {
306 context
.setStatus(msg
);
310 TestOptions opts
= GSON
.fromJson(value
.toString(), TestOptions
.class);
311 Configuration conf
= HBaseConfiguration
.create(context
.getConfiguration());
312 final Connection con
= ConnectionFactory
.createConnection(conf
);
313 AsyncConnection asyncCon
= null;
315 asyncCon
= ConnectionFactory
.createAsyncConnection(conf
).get();
316 } catch (ExecutionException e
) {
317 throw new IOException(e
);
321 RunResult result
= PerformanceEvaluation
.runOneClient(this.cmd
, conf
, con
, asyncCon
, opts
, status
);
322 // Collect how much time the thing took. Report as map output and
323 // to the ELAPSED_TIME counter.
324 context
.getCounter(Counter
.ELAPSED_TIME
).increment(result
.duration
);
325 context
.getCounter(Counter
.ROWS
).increment(opts
.perClientRunRows
);
326 context
.write(new LongWritable(opts
.startRow
), new LongWritable(result
.duration
));
332 * If table does not already exist, create. Also create a table when
333 * {@code opts.presplitRegions} is specified or when the existing table's
334 * region replica count doesn't match {@code opts.replicas}.
336 static boolean checkTable(Admin admin
, TestOptions opts
) throws IOException
{
337 TableName tableName
= TableName
.valueOf(opts
.tableName
);
338 boolean needsDelete
= false, exists
= admin
.tableExists(tableName
);
339 boolean isReadCmd
= opts
.cmdName
.toLowerCase(Locale
.ROOT
).contains("read")
340 || opts
.cmdName
.toLowerCase(Locale
.ROOT
).contains("scan");
341 if (!exists
&& isReadCmd
) {
342 throw new IllegalStateException(
343 "Must specify an existing table for read commands. Run a write command first.");
345 TableDescriptor desc
=
346 exists ? admin
.getDescriptor(TableName
.valueOf(opts
.tableName
)) : null;
347 byte[][] splits
= getSplits(opts
);
349 // recreate the table when user has requested presplit or when existing
350 // {RegionSplitPolicy,replica count} does not match requested, or when the
351 // number of column families does not match requested.
352 if ((exists
&& opts
.presplitRegions
!= DEFAULT_OPTS
.presplitRegions
)
353 || (!isReadCmd
&& desc
!= null &&
354 !StringUtils
.equals(desc
.getRegionSplitPolicyClassName(), opts
.splitPolicy
))
355 || (!isReadCmd
&& desc
!= null && desc
.getRegionReplication() != opts
.replicas
)
356 || (desc
!= null && desc
.getColumnFamilyCount() != opts
.families
)) {
358 // wait, why did it delete my table?!?
359 LOG
.debug(MoreObjects
.toStringHelper("needsDelete")
360 .add("needsDelete", needsDelete
)
361 .add("isReadCmd", isReadCmd
)
362 .add("exists", exists
)
364 .add("presplit", opts
.presplitRegions
)
365 .add("splitPolicy", opts
.splitPolicy
)
366 .add("replicas", opts
.replicas
)
367 .add("families", opts
.families
)
371 // remove an existing table
373 if (admin
.isTableEnabled(tableName
)) {
374 admin
.disableTable(tableName
);
376 admin
.deleteTable(tableName
);
379 // table creation is necessary
380 if (!exists
|| needsDelete
) {
381 desc
= getTableDescriptor(opts
);
382 if (splits
!= null) {
383 if (LOG
.isDebugEnabled()) {
384 for (int i
= 0; i
< splits
.length
; i
++) {
385 LOG
.debug(" split " + i
+ ": " + Bytes
.toStringBinary(splits
[i
]));
389 if (splits
!= null) {
390 admin
.createTable(desc
, splits
);
392 admin
.createTable(desc
);
394 LOG
.info("Table " + desc
+ " created");
396 return admin
.tableExists(tableName
);
400 * Create an HTableDescriptor from provided TestOptions.
402 protected static TableDescriptor
getTableDescriptor(TestOptions opts
) {
403 TableDescriptorBuilder
.ModifyableTableDescriptor tableDescriptor
=
404 new TableDescriptorBuilder
.ModifyableTableDescriptor(TableName
.valueOf(opts
.tableName
));
406 for (int family
= 0; family
< opts
.families
; family
++) {
407 byte[] familyName
= Bytes
.toBytes(FAMILY_NAME_BASE
+ family
);
408 ColumnFamilyDescriptorBuilder
.ModifyableColumnFamilyDescriptor familyDesc
=
409 new ColumnFamilyDescriptorBuilder
.ModifyableColumnFamilyDescriptor(familyName
);
410 familyDesc
.setDataBlockEncoding(opts
.blockEncoding
);
411 familyDesc
.setCompressionType(opts
.compression
);
412 familyDesc
.setBloomFilterType(opts
.bloomType
);
413 familyDesc
.setBlocksize(opts
.blockSize
);
414 if (opts
.inMemoryCF
) {
415 familyDesc
.setInMemory(true);
417 familyDesc
.setInMemoryCompaction(opts
.inMemoryCompaction
);
418 tableDescriptor
.setColumnFamily(familyDesc
);
420 if (opts
.replicas
!= DEFAULT_OPTS
.replicas
) {
421 tableDescriptor
.setRegionReplication(opts
.replicas
);
423 if (opts
.splitPolicy
!= null && !opts
.splitPolicy
.equals(DEFAULT_OPTS
.splitPolicy
)) {
424 tableDescriptor
.setRegionSplitPolicyClassName(opts
.splitPolicy
);
426 return tableDescriptor
;
430 * generates splits based on total number of rows and specified split regions
432 protected static byte[][] getSplits(TestOptions opts
) {
433 if (opts
.presplitRegions
== DEFAULT_OPTS
.presplitRegions
)
436 int numSplitPoints
= opts
.presplitRegions
- 1;
437 byte[][] splits
= new byte[numSplitPoints
][];
438 int jump
= opts
.totalRows
/ opts
.presplitRegions
;
439 for (int i
= 0; i
< numSplitPoints
; i
++) {
440 int rowkey
= jump
* (1 + i
);
441 splits
[i
] = format(rowkey
);
446 static void setupConnectionCount(final TestOptions opts
) {
450 if (opts
.connCount
== -1) {
451 // set to thread number if connCount is not set
452 opts
.connCount
= opts
.numClientThreads
;
458 * Run all clients in this vm each to its own thread.
460 static RunResult
[] doLocalClients(final TestOptions opts
, final Configuration conf
)
461 throws IOException
, InterruptedException
, ExecutionException
{
462 final Class
<?
extends TestBase
> cmd
= determineCommandClass(opts
.cmdName
);
464 @SuppressWarnings("unchecked")
465 Future
<RunResult
>[] threads
= new Future
[opts
.numClientThreads
];
466 RunResult
[] results
= new RunResult
[opts
.numClientThreads
];
467 ExecutorService pool
= Executors
.newFixedThreadPool(opts
.numClientThreads
,
468 new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
469 setupConnectionCount(opts
);
470 final Connection
[] cons
= new Connection
[opts
.connCount
];
471 final AsyncConnection
[] asyncCons
= new AsyncConnection
[opts
.connCount
];
472 for (int i
= 0; i
< opts
.connCount
; i
++) {
473 cons
[i
] = ConnectionFactory
.createConnection(conf
);
474 asyncCons
[i
] = ConnectionFactory
.createAsyncConnection(conf
).get();
476 LOG
.info("Created " + opts
.connCount
+ " connections for " +
477 opts
.numClientThreads
+ " threads");
478 for (int i
= 0; i
< threads
.length
; i
++) {
480 threads
[i
] = pool
.submit(new Callable
<RunResult
>() {
482 public RunResult
call() throws Exception
{
483 TestOptions threadOpts
= new TestOptions(opts
);
484 final Connection con
= cons
[index
% cons
.length
];
485 final AsyncConnection asyncCon
= asyncCons
[index
% asyncCons
.length
];
486 if (threadOpts
.startRow
== 0) threadOpts
.startRow
= index
* threadOpts
.perClientRunRows
;
487 RunResult run
= runOneClient(cmd
, conf
, con
, asyncCon
, threadOpts
, new Status() {
489 public void setStatus(final String msg
) throws IOException
{
493 LOG
.info("Finished " + Thread
.currentThread().getName() + " in " + run
.duration
+
494 "ms over " + threadOpts
.perClientRunRows
+ " rows");
501 for (int i
= 0; i
< threads
.length
; i
++) {
503 results
[i
] = threads
[i
].get();
504 } catch (ExecutionException e
) {
505 throw new IOException(e
.getCause());
508 final String test
= cmd
.getSimpleName();
509 LOG
.info("[" + test
+ "] Summary of timings (ms): "
510 + Arrays
.toString(results
));
511 Arrays
.sort(results
);
513 float avgLatency
= 0 ;
515 for (RunResult result
: results
) {
516 total
+= result
.duration
;
517 avgLatency
+= result
.hist
.getSnapshot().getMean();
518 avgTPS
+= opts
.perClientRunRows
* 1.0f
/ result
.duration
;
520 avgTPS
*= 1000; // ms to second
521 avgLatency
= avgLatency
/ results
.length
;
522 LOG
.info("[" + test
+ " duration ]"
523 + "\tMin: " + results
[0] + "ms"
524 + "\tMax: " + results
[results
.length
- 1] + "ms"
525 + "\tAvg: " + (total
/ results
.length
) + "ms");
526 LOG
.info("[ Avg latency (us)]\t" + Math
.round(avgLatency
));
527 LOG
.info("[ Avg TPS/QPS]\t" + Math
.round(avgTPS
) + "\t row per second");
528 for (int i
= 0; i
< opts
.connCount
; i
++) {
530 asyncCons
[i
].close();
 * Run a mapreduce job. Run as many maps as asked-for clients.
 * Before we start up the job, write out an input file with instructions
 * per client regarding which row they are to start on.
 * @param opts test options; {@code opts.cmdName} selects the command to run
 * @param conf configuration the job is created from
 * @throws IOException
544 static Job
doMapReduce(TestOptions opts
, final Configuration conf
)
545 throws IOException
, InterruptedException
, ClassNotFoundException
{
546 final Class
<?
extends TestBase
> cmd
= determineCommandClass(opts
.cmdName
);
548 Path inputDir
= writeInputFile(conf
, opts
);
549 conf
.set(EvaluationMapTask
.CMD_KEY
, cmd
.getName());
550 conf
.set(EvaluationMapTask
.PE_KEY
, PerformanceEvaluation
.class.getName());
551 Job job
= Job
.getInstance(conf
);
552 job
.setJarByClass(PerformanceEvaluation
.class);
553 job
.setJobName("HBase Performance Evaluation - " + opts
.cmdName
);
555 job
.setInputFormatClass(NLineInputFormat
.class);
556 NLineInputFormat
.setInputPaths(job
, inputDir
);
557 // this is default, but be explicit about it just in case.
558 NLineInputFormat
.setNumLinesPerSplit(job
, 1);
560 job
.setOutputKeyClass(LongWritable
.class);
561 job
.setOutputValueClass(LongWritable
.class);
563 job
.setMapperClass(EvaluationMapTask
.class);
564 job
.setReducerClass(LongSumReducer
.class);
566 job
.setNumReduceTasks(1);
568 job
.setOutputFormatClass(TextOutputFormat
.class);
569 TextOutputFormat
.setOutputPath(job
, new Path(inputDir
.getParent(), "outputs"));
571 TableMapReduceUtil
.addDependencyJars(job
);
572 TableMapReduceUtil
.addDependencyJarsForClasses(job
.getConfiguration(),
573 Histogram
.class, // yammer metrics
575 FilterAllFilter
.class // hbase-server tests jar
578 TableMapReduceUtil
.initCredentials(job
);
580 job
.waitForCompletion(true);
 * Each client has one mapper to do the work, and the client does the resulting count in a map task.
588 static String JOB_INPUT_FILENAME
= "input.txt";
591 * Write input file of offsets-per-client for the mapreduce job.
592 * @param c Configuration
593 * @return Directory that contains file written whose name is JOB_INPUT_FILENAME
594 * @throws IOException
596 static Path
writeInputFile(final Configuration c
, final TestOptions opts
) throws IOException
{
597 return writeInputFile(c
, opts
, new Path("."));
600 static Path
writeInputFile(final Configuration c
, final TestOptions opts
, final Path basedir
)
602 SimpleDateFormat formatter
= new SimpleDateFormat("yyyyMMddHHmmss");
603 Path jobdir
= new Path(new Path(basedir
, PERF_EVAL_DIR
), formatter
.format(new Date()));
604 Path inputDir
= new Path(jobdir
, "inputs");
606 FileSystem fs
= FileSystem
.get(c
);
609 Path inputFile
= new Path(inputDir
, JOB_INPUT_FILENAME
);
610 PrintStream out
= new PrintStream(fs
.create(inputFile
));
611 // Make input random.
612 Map
<Integer
, String
> m
= new TreeMap
<>();
613 Hash h
= MurmurHash
.getInstance();
614 int perClientRows
= (opts
.totalRows
/ opts
.numClientThreads
);
616 for (int j
= 0; j
< opts
.numClientThreads
; j
++) {
617 TestOptions next
= new TestOptions(opts
);
618 next
.startRow
= j
* perClientRows
;
619 next
.perClientRunRows
= perClientRows
;
620 String s
= GSON
.toJson(next
);
621 LOG
.info("Client=" + j
+ ", input=" + s
);
622 byte[] b
= Bytes
.toBytes(s
);
623 int hash
= h
.hash(new ByteArrayHashKey(b
, 0, b
.length
), -1);
626 for (Map
.Entry
<Integer
, String
> e
: m
.entrySet()) {
627 out
.println(e
.getValue());
636 * Describes a command.
638 static class CmdDescriptor
{
639 private Class
<?
extends TestBase
> cmdClass
;
641 private String description
;
643 CmdDescriptor(Class
<?
extends TestBase
> cmdClass
, String name
, String description
) {
644 this.cmdClass
= cmdClass
;
646 this.description
= description
;
649 public Class
<?
extends TestBase
> getCmdClass() {
653 public String
getName() {
657 public String
getDescription() {
663 * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}.
664 * This makes tracking all these arguments a little easier.
665 * NOTE: ADDING AN OPTION, you need to add a data member, a getter/setter (to make JSON
666 * serialization of this TestOptions class behave), and you need to add to the clone constructor
667 * below copying your new option from the 'that' to the 'this'. Look for 'clone' below.
669 static class TestOptions
{
670 String cmdName
= null;
671 boolean nomapred
= false;
672 boolean filterAll
= false;
675 int perClientRunRows
= DEFAULT_ROWS_PER_GB
;
676 int numClientThreads
= 1;
677 int totalRows
= DEFAULT_ROWS_PER_GB
;
678 int measureAfter
= 0;
679 float sampleRate
= 1.0f
;
680 double traceRate
= 0.0;
681 String tableName
= TABLE_NAME
;
682 boolean flushCommits
= true;
683 boolean writeToWAL
= true;
684 boolean autoFlush
= false;
685 boolean oneCon
= false;
686 int connCount
= -1; //wil decide the actual num later
687 boolean useTags
= false;
689 boolean reportLatency
= false;
693 boolean inMemoryCF
= false;
694 int presplitRegions
= 0;
695 int replicas
= HTableDescriptor
.DEFAULT_REGION_REPLICATION
;
696 String splitPolicy
= null;
697 Compression
.Algorithm compression
= Compression
.Algorithm
.NONE
;
698 BloomType bloomType
= BloomType
.ROW
;
699 int blockSize
= HConstants
.DEFAULT_BLOCKSIZE
;
700 DataBlockEncoding blockEncoding
= DataBlockEncoding
.NONE
;
701 boolean valueRandom
= false;
702 boolean valueZipf
= false;
703 int valueSize
= DEFAULT_VALUE_LENGTH
;
704 int period
= (this.perClientRunRows
/ 10) == 0? perClientRunRows
: perClientRunRows
/ 10;
709 boolean addColumns
= true;
710 MemoryCompactionPolicy inMemoryCompaction
=
711 MemoryCompactionPolicy
.valueOf(
712 CompactingMemStore
.COMPACTING_MEMSTORE_TYPE_DEFAULT
);
713 boolean asyncPrefetch
= false;
714 boolean cacheBlocks
= true;
715 Scan
.ReadType scanReadType
= Scan
.ReadType
.DEFAULT
;
716 long bufferSize
= 2l * 1024l * 1024l;
718 public TestOptions() {}
722 * @param that Object to copy from.
724 public TestOptions(TestOptions that
) {
725 this.cmdName
= that
.cmdName
;
726 this.cycles
= that
.cycles
;
727 this.nomapred
= that
.nomapred
;
728 this.startRow
= that
.startRow
;
729 this.size
= that
.size
;
730 this.perClientRunRows
= that
.perClientRunRows
;
731 this.numClientThreads
= that
.numClientThreads
;
732 this.totalRows
= that
.totalRows
;
733 this.sampleRate
= that
.sampleRate
;
734 this.traceRate
= that
.traceRate
;
735 this.tableName
= that
.tableName
;
736 this.flushCommits
= that
.flushCommits
;
737 this.writeToWAL
= that
.writeToWAL
;
738 this.autoFlush
= that
.autoFlush
;
739 this.oneCon
= that
.oneCon
;
740 this.connCount
= that
.connCount
;
741 this.useTags
= that
.useTags
;
742 this.noOfTags
= that
.noOfTags
;
743 this.reportLatency
= that
.reportLatency
;
744 this.multiGet
= that
.multiGet
;
745 this.multiPut
= that
.multiPut
;
746 this.inMemoryCF
= that
.inMemoryCF
;
747 this.presplitRegions
= that
.presplitRegions
;
748 this.replicas
= that
.replicas
;
749 this.splitPolicy
= that
.splitPolicy
;
750 this.compression
= that
.compression
;
751 this.blockEncoding
= that
.blockEncoding
;
752 this.filterAll
= that
.filterAll
;
753 this.bloomType
= that
.bloomType
;
754 this.blockSize
= that
.blockSize
;
755 this.valueRandom
= that
.valueRandom
;
756 this.valueZipf
= that
.valueZipf
;
757 this.valueSize
= that
.valueSize
;
758 this.period
= that
.period
;
759 this.randomSleep
= that
.randomSleep
;
760 this.measureAfter
= that
.measureAfter
;
761 this.addColumns
= that
.addColumns
;
762 this.columns
= that
.columns
;
763 this.families
= that
.families
;
764 this.caching
= that
.caching
;
765 this.inMemoryCompaction
= that
.inMemoryCompaction
;
766 this.asyncPrefetch
= that
.asyncPrefetch
;
767 this.cacheBlocks
= that
.cacheBlocks
;
768 this.scanReadType
= that
.scanReadType
;
769 this.bufferSize
= that
.bufferSize
;
772 public int getCaching() {
776 public void setCaching(final int caching
) {
777 this.caching
= caching
;
780 public int getColumns() {
784 public void setColumns(final int columns
) {
785 this.columns
= columns
;
788 public int getFamilies() {
789 return this.families
;
792 public void setFamilies(final int families
) {
793 this.families
= families
;
796 public int getCycles() {
800 public void setCycles(final int cycles
) {
801 this.cycles
= cycles
;
804 public boolean isValueZipf() {
808 public void setValueZipf(boolean valueZipf
) {
809 this.valueZipf
= valueZipf
;
812 public String
getCmdName() {
816 public void setCmdName(String cmdName
) {
817 this.cmdName
= cmdName
;
820 public int getRandomSleep() {
824 public void setRandomSleep(int randomSleep
) {
825 this.randomSleep
= randomSleep
;
828 public int getReplicas() {
832 public void setReplicas(int replicas
) {
833 this.replicas
= replicas
;
836 public String
getSplitPolicy() {
840 public void setSplitPolicy(String splitPolicy
) {
841 this.splitPolicy
= splitPolicy
;
844 public void setNomapred(boolean nomapred
) {
845 this.nomapred
= nomapred
;
848 public void setFilterAll(boolean filterAll
) {
849 this.filterAll
= filterAll
;
852 public void setStartRow(int startRow
) {
853 this.startRow
= startRow
;
856 public void setSize(float size
) {
860 public void setPerClientRunRows(int perClientRunRows
) {
861 this.perClientRunRows
= perClientRunRows
;
864 public void setNumClientThreads(int numClientThreads
) {
865 this.numClientThreads
= numClientThreads
;
868 public void setTotalRows(int totalRows
) {
869 this.totalRows
= totalRows
;
872 public void setSampleRate(float sampleRate
) {
873 this.sampleRate
= sampleRate
;
876 public void setTraceRate(double traceRate
) {
877 this.traceRate
= traceRate
;
880 public void setTableName(String tableName
) {
881 this.tableName
= tableName
;
884 public void setFlushCommits(boolean flushCommits
) {
885 this.flushCommits
= flushCommits
;
888 public void setWriteToWAL(boolean writeToWAL
) {
889 this.writeToWAL
= writeToWAL
;
892 public void setAutoFlush(boolean autoFlush
) {
893 this.autoFlush
= autoFlush
;
896 public void setOneCon(boolean oneCon
) {
897 this.oneCon
= oneCon
;
900 public int getConnCount() {
904 public void setConnCount(int connCount
) {
905 this.connCount
= connCount
;
908 public void setUseTags(boolean useTags
) {
909 this.useTags
= useTags
;
912 public void setNoOfTags(int noOfTags
) {
913 this.noOfTags
= noOfTags
;
916 public void setReportLatency(boolean reportLatency
) {
917 this.reportLatency
= reportLatency
;
920 public void setMultiGet(int multiGet
) {
921 this.multiGet
= multiGet
;
924 public void setMultiPut(int multiPut
) {
925 this.multiPut
= multiPut
;
928 public void setInMemoryCF(boolean inMemoryCF
) {
929 this.inMemoryCF
= inMemoryCF
;
932 public void setPresplitRegions(int presplitRegions
) {
933 this.presplitRegions
= presplitRegions
;
936 public void setCompression(Compression
.Algorithm compression
) {
937 this.compression
= compression
;
940 public void setBloomType(BloomType bloomType
) {
941 this.bloomType
= bloomType
;
944 public void setBlockSize(int blockSize
) {
945 this.blockSize
= blockSize
;
948 public void setBlockEncoding(DataBlockEncoding blockEncoding
) {
949 this.blockEncoding
= blockEncoding
;
952 public void setValueRandom(boolean valueRandom
) {
953 this.valueRandom
= valueRandom
;
956 public void setValueSize(int valueSize
) {
957 this.valueSize
= valueSize
;
960 public void setBufferSize(long bufferSize
) {
961 this.bufferSize
= bufferSize
;
964 public void setPeriod(int period
) {
965 this.period
= period
;
968 public boolean isNomapred() {
972 public boolean isFilterAll() {
976 public int getStartRow() {
980 public float getSize() {
984 public int getPerClientRunRows() {
985 return perClientRunRows
;
988 public int getNumClientThreads() {
989 return numClientThreads
;
992 public int getTotalRows() {
996 public float getSampleRate() {
1000 public double getTraceRate() {
1004 public String
getTableName() {
1008 public boolean isFlushCommits() {
1009 return flushCommits
;
1012 public boolean isWriteToWAL() {
1016 public boolean isAutoFlush() {
1020 public boolean isUseTags() {
1024 public int getNoOfTags() {
1028 public boolean isReportLatency() {
1029 return reportLatency
;
1032 public int getMultiGet() {
1036 public int getMultiPut() {
1040 public boolean isInMemoryCF() {
1044 public int getPresplitRegions() {
1045 return presplitRegions
;
1048 public Compression
.Algorithm
getCompression() {
1052 public DataBlockEncoding
getBlockEncoding() {
1053 return blockEncoding
;
1056 public boolean isValueRandom() {
1060 public int getValueSize() {
1064 public int getPeriod() {
1068 public BloomType
getBloomType() {
1072 public int getBlockSize() {
1076 public boolean isOneCon() {
1080 public int getMeasureAfter() {
1081 return measureAfter
;
1084 public void setMeasureAfter(int measureAfter
) {
1085 this.measureAfter
= measureAfter
;
1088 public boolean getAddColumns() {
1092 public void setAddColumns(boolean addColumns
) {
1093 this.addColumns
= addColumns
;
1096 public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction
) {
1097 this.inMemoryCompaction
= inMemoryCompaction
;
1100 public MemoryCompactionPolicy
getInMemoryCompaction() {
1101 return this.inMemoryCompaction
;
1104 public long getBufferSize() {
1105 return this.bufferSize
;
1111 * Subclass to particularize what happens per row.
1113 static abstract class TestBase
{
1114 // Below is make it so when Tests are all running in the one
1115 // jvm, that they each have a differently seeded Random.
1116 private static final Random randomSeed
= new Random(System
.currentTimeMillis());
1118 private static long nextRandomSeed() {
1119 return randomSeed
.nextLong();
1121 private final int everyN
;
1123 protected final Random rand
= new Random(nextRandomSeed());
1124 protected final Configuration conf
;
1125 protected final TestOptions opts
;
1127 private final Status status
;
1128 private final Sampler traceSampler
;
1129 private final SpanReceiverHost receiverHost
;
1131 private String testName
;
1132 private Histogram latencyHistogram
;
1133 private Histogram valueSizeHistogram
;
1134 private Histogram rpcCallsHistogram
;
1135 private Histogram remoteRpcCallsHistogram
;
1136 private Histogram millisBetweenNextHistogram
;
1137 private Histogram regionsScannedHistogram
;
1138 private Histogram bytesInResultsHistogram
;
1139 private Histogram bytesInRemoteResultsHistogram
;
1140 private RandomDistribution
.Zipf zipf
;
1143 * Note that all subclasses of this class must provide a public constructor
1144 * that has the exact same list of arguments.
1146 TestBase(final Configuration conf
, final TestOptions options
, final Status status
) {
1148 this.receiverHost
= this.conf
== null?
null: SpanReceiverHost
.getInstance(conf
);
1149 this.opts
= options
;
1150 this.status
= status
;
1151 this.testName
= this.getClass().getSimpleName();
1152 if (options
.traceRate
>= 1.0) {
1153 this.traceSampler
= Sampler
.ALWAYS
;
1154 } else if (options
.traceRate
> 0.0) {
1155 conf
.setDouble("hbase.sampler.fraction", options
.traceRate
);
1156 this.traceSampler
= new ProbabilitySampler(new HBaseHTraceConfiguration(conf
));
1158 this.traceSampler
= Sampler
.NEVER
;
1160 everyN
= (int) (opts
.totalRows
/ (opts
.totalRows
* opts
.sampleRate
));
1161 if (options
.isValueZipf()) {
1162 this.zipf
= new RandomDistribution
.Zipf(this.rand
, 1, options
.getValueSize(), 1.2);
1164 LOG
.info("Sampling 1 every " + everyN
+ " out of " + opts
.perClientRunRows
+ " total rows.");
1167 int getValueLength(final Random r
) {
1168 if (this.opts
.isValueRandom()) {
1169 return r
.nextInt(opts
.valueSize
);
1170 } else if (this.opts
.isValueZipf()) {
1171 return Math
.abs(this.zipf
.nextInt());
1173 return opts
.valueSize
;
1177 void updateValueSize(final Result
[] rs
) throws IOException
{
1178 if (rs
== null || !isRandomValueSize()) return;
1179 for (Result r
: rs
) updateValueSize(r
);
1182 void updateValueSize(final Result r
) throws IOException
{
1183 if (r
== null || !isRandomValueSize()) return;
1185 for (CellScanner scanner
= r
.cellScanner(); scanner
.advance();) {
1186 size
+= scanner
.current().getValueLength();
1188 updateValueSize(size
);
1191 void updateValueSize(final int valueSize
) {
1192 if (!isRandomValueSize()) return;
1193 this.valueSizeHistogram
.update(valueSize
);
1196 void updateScanMetrics(final ScanMetrics metrics
) {
1197 if (metrics
== null) return;
1198 Map
<String
,Long
> metricsMap
= metrics
.getMetricsMap();
1199 Long rpcCalls
= metricsMap
.get(ScanMetrics
.RPC_CALLS_METRIC_NAME
);
1200 if (rpcCalls
!= null) {
1201 this.rpcCallsHistogram
.update(rpcCalls
.longValue());
1203 Long remoteRpcCalls
= metricsMap
.get(ScanMetrics
.REMOTE_RPC_CALLS_METRIC_NAME
);
1204 if (remoteRpcCalls
!= null) {
1205 this.remoteRpcCallsHistogram
.update(remoteRpcCalls
.longValue());
1207 Long millisBetweenNext
= metricsMap
.get(ScanMetrics
.MILLIS_BETWEEN_NEXTS_METRIC_NAME
);
1208 if (millisBetweenNext
!= null) {
1209 this.millisBetweenNextHistogram
.update(millisBetweenNext
.longValue());
1211 Long regionsScanned
= metricsMap
.get(ScanMetrics
.REGIONS_SCANNED_METRIC_NAME
);
1212 if (regionsScanned
!= null) {
1213 this.regionsScannedHistogram
.update(regionsScanned
.longValue());
1215 Long bytesInResults
= metricsMap
.get(ScanMetrics
.BYTES_IN_RESULTS_METRIC_NAME
);
1216 if (bytesInResults
!= null && bytesInResults
.longValue() > 0) {
1217 this.bytesInResultsHistogram
.update(bytesInResults
.longValue());
1219 Long bytesInRemoteResults
= metricsMap
.get(ScanMetrics
.BYTES_IN_REMOTE_RESULTS_METRIC_NAME
);
1220 if (bytesInRemoteResults
!= null && bytesInRemoteResults
.longValue() > 0) {
1221 this.bytesInRemoteResultsHistogram
.update(bytesInRemoteResults
.longValue());
1225 String
generateStatus(final int sr
, final int i
, final int lr
) {
1226 return sr
+ "/" + i
+ "/" + lr
+ ", latency " + getShortLatencyReport() +
1227 (!isRandomValueSize()?
"": ", value size " + getShortValueSizeReport());
1230 boolean isRandomValueSize() {
1231 return opts
.valueRandom
;
1234 protected int getReportingPeriod() {
1239 * Populated by testTakedown. Only implemented by RandomReadTest at the moment.
1241 public Histogram
getLatencyHistogram() {
1242 return latencyHistogram
;
1245 void testSetup() throws IOException
{
1247 latencyHistogram
= YammerHistogramUtils
.newHistogram(new UniformReservoir(1024 * 500));
1248 valueSizeHistogram
= YammerHistogramUtils
.newHistogram(new UniformReservoir(1024 * 500));
1250 rpcCallsHistogram
= YammerHistogramUtils
.newHistogram(new UniformReservoir(1024 * 500));
1251 remoteRpcCallsHistogram
= YammerHistogramUtils
.newHistogram(new UniformReservoir(1024 * 500));
1252 millisBetweenNextHistogram
= YammerHistogramUtils
.newHistogram(new UniformReservoir(1024 * 500));
1253 regionsScannedHistogram
= YammerHistogramUtils
.newHistogram(new UniformReservoir(1024 * 500));
1254 bytesInResultsHistogram
= YammerHistogramUtils
.newHistogram(new UniformReservoir(1024 * 500));
1255 bytesInRemoteResultsHistogram
= YammerHistogramUtils
.newHistogram(new UniformReservoir(1024 * 500));
1260 abstract void onStartup() throws IOException
;
1262 void testTakedown() throws IOException
{
1264 // Print all stats for this thread continuously.
1265 // Synchronize on Test.class so different threads don't intermingle the
1266 // output. We can't use 'this' here because each thread has its own instance of Test class.
1267 synchronized (Test
.class) {
1268 status
.setStatus("Test : " + testName
+ ", Thread : " + Thread
.currentThread().getName());
1269 status
.setStatus("Latency (us) : " + YammerHistogramUtils
.getHistogramReport(
1271 status
.setStatus("Num measures (latency) : " + latencyHistogram
.getCount());
1272 status
.setStatus(YammerHistogramUtils
.getPrettyHistogramReport(latencyHistogram
));
1273 if (valueSizeHistogram
.getCount() > 0) {
1274 status
.setStatus("ValueSize (bytes) : "
1275 + YammerHistogramUtils
.getHistogramReport(valueSizeHistogram
));
1276 status
.setStatus("Num measures (ValueSize): " + valueSizeHistogram
.getCount());
1277 status
.setStatus(YammerHistogramUtils
.getPrettyHistogramReport(valueSizeHistogram
));
1279 status
.setStatus("No valueSize statistics available");
1281 if (rpcCallsHistogram
.getCount() > 0) {
1282 status
.setStatus("rpcCalls (count): " +
1283 YammerHistogramUtils
.getHistogramReport(rpcCallsHistogram
));
1285 if (remoteRpcCallsHistogram
.getCount() > 0) {
1286 status
.setStatus("remoteRpcCalls (count): " +
1287 YammerHistogramUtils
.getHistogramReport(remoteRpcCallsHistogram
));
1289 if (millisBetweenNextHistogram
.getCount() > 0) {
1290 status
.setStatus("millisBetweenNext (latency): " +
1291 YammerHistogramUtils
.getHistogramReport(millisBetweenNextHistogram
));
1293 if (regionsScannedHistogram
.getCount() > 0) {
1294 status
.setStatus("regionsScanned (count): " +
1295 YammerHistogramUtils
.getHistogramReport(regionsScannedHistogram
));
1297 if (bytesInResultsHistogram
.getCount() > 0) {
1298 status
.setStatus("bytesInResults (size): " +
1299 YammerHistogramUtils
.getHistogramReport(bytesInResultsHistogram
));
1301 if (bytesInRemoteResultsHistogram
.getCount() > 0) {
1302 status
.setStatus("bytesInRemoteResults (size): " +
1303 YammerHistogramUtils
.getHistogramReport(bytesInRemoteResultsHistogram
));
1306 receiverHost
.closeReceivers();
1309 abstract void onTakedown() throws IOException
;
1314 * @return Elapsed time.
1315 * @throws IOException
1317 long test() throws IOException
, InterruptedException
{
1319 LOG
.info("Timed test starting in thread " + Thread
.currentThread().getName());
1320 final long startTime
= System
.nanoTime();
1326 return (System
.nanoTime() - startTime
) / 1000000;
1330 return opts
.startRow
;
1334 return getStartRow() + opts
.perClientRunRows
;
1338 * Provides an extension point for tests that don't want a per row invocation.
1340 void testTimed() throws IOException
, InterruptedException
{
1341 int startRow
= getStartRow();
1342 int lastRow
= getLastRow();
1343 TraceUtil
.addSampler(traceSampler
);
1344 // Report on completion of 1/10th of total.
1345 for (int ii
= 0; ii
< opts
.cycles
; ii
++) {
1346 if (opts
.cycles
> 1) LOG
.info("Cycle=" + ii
+ " of " + opts
.cycles
);
1347 for (int i
= startRow
; i
< lastRow
; i
++) {
1348 if (i
% everyN
!= 0) continue;
1349 long startTime
= System
.nanoTime();
1350 boolean requestSent
= false;
1351 try (TraceScope scope
= TraceUtil
.createTrace("test row");){
1352 requestSent
= testRow(i
);
1354 if ( (i
- startRow
) > opts
.measureAfter
) {
1355 // If multiget or multiput is enabled, say set to 10, testRow() returns immediately
1356 // first 9 times and sends the actual get request in the 10th iteration.
1357 // We should only set latency when actual request is sent because otherwise
1358 // it turns out to be 0.
1360 latencyHistogram
.update((System
.nanoTime() - startTime
) / 1000);
1362 if (status
!= null && i
> 0 && (i
% getReportingPeriod()) == 0) {
1363 status
.setStatus(generateStatus(startRow
, i
, lastRow
));
1371 * @return Subset of the histograms' calculation.
1373 public String
getShortLatencyReport() {
1374 return YammerHistogramUtils
.getShortHistogramReport(this.latencyHistogram
);
1378 * @return Subset of the histograms' calculation.
1380 public String
getShortValueSizeReport() {
1381 return YammerHistogramUtils
.getShortHistogramReport(this.valueSizeHistogram
);
1386 * Test for individual row.
1387 * @param i Row index.
1388 * @return true if the row was sent to server and need to record metrics.
1389 * False if not, multiGet and multiPut e.g., the rows are sent
1390 * to server only if enough gets/puts are gathered.
1392 abstract boolean testRow(final int i
) throws IOException
, InterruptedException
;
1395 static abstract class Test
extends TestBase
{
1396 protected Connection connection
;
1398 Test(final Connection con
, final TestOptions options
, final Status status
) {
1399 super(con
== null ? HBaseConfiguration
.create() : con
.getConfiguration(), options
, status
);
1400 this.connection
= con
;
1404 static abstract class AsyncTest
extends TestBase
{
1405 protected AsyncConnection connection
;
1407 AsyncTest(final AsyncConnection con
, final TestOptions options
, final Status status
) {
1408 super(con
== null ? HBaseConfiguration
.create() : con
.getConfiguration(), options
, status
);
1409 this.connection
= con
;
1413 static abstract class TableTest
extends Test
{
1414 protected Table table
;
1416 TableTest(Connection con
, TestOptions options
, Status status
) {
1417 super(con
, options
, status
);
1421 void onStartup() throws IOException
{
1422 this.table
= connection
.getTable(TableName
.valueOf(opts
.tableName
));
1426 void onTakedown() throws IOException
{
1431 static abstract class AsyncTableTest
extends AsyncTest
{
1432 protected AsyncTable
<?
> table
;
1434 AsyncTableTest(AsyncConnection con
, TestOptions options
, Status status
) {
1435 super(con
, options
, status
);
1439 void onStartup() throws IOException
{
1440 this.table
= connection
.getTable(TableName
.valueOf(opts
.tableName
));
1444 void onTakedown() throws IOException
{
1448 static class AsyncRandomReadTest
extends AsyncTableTest
{
1449 private final Consistency consistency
;
1450 private ArrayList
<Get
> gets
;
1451 private Random rd
= new Random();
1453 AsyncRandomReadTest(AsyncConnection con
, TestOptions options
, Status status
) {
1454 super(con
, options
, status
);
1455 consistency
= options
.replicas
== DEFAULT_OPTS
.replicas ?
null : Consistency
.TIMELINE
;
1456 if (opts
.multiGet
> 0) {
1457 LOG
.info("MultiGet enabled. Sending GETs in batches of " + opts
.multiGet
+ ".");
1458 this.gets
= new ArrayList
<>(opts
.multiGet
);
1463 boolean testRow(final int i
) throws IOException
, InterruptedException
{
1464 if (opts
.randomSleep
> 0) {
1465 Thread
.sleep(rd
.nextInt(opts
.randomSleep
));
1467 Get get
= new Get(getRandomRow(this.rand
, opts
.totalRows
));
1468 for (int family
= 0; family
< opts
.families
; family
++) {
1469 byte[] familyName
= Bytes
.toBytes(FAMILY_NAME_BASE
+ family
);
1470 if (opts
.addColumns
) {
1471 for (int column
= 0; column
< opts
.columns
; column
++) {
1472 byte [] qualifier
= column
== 0? COLUMN_ZERO
: Bytes
.toBytes("" + column
);
1473 get
.addColumn(familyName
, qualifier
);
1476 get
.addFamily(familyName
);
1479 if (opts
.filterAll
) {
1480 get
.setFilter(new FilterAllFilter());
1482 get
.setConsistency(consistency
);
1483 if (LOG
.isTraceEnabled()) LOG
.trace(get
.toString());
1485 if (opts
.multiGet
> 0) {
1487 if (this.gets
.size() == opts
.multiGet
) {
1489 this.table
.get(this.gets
).stream().map(f
-> propagate(f
::get
)).toArray(Result
[]::new);
1490 updateValueSize(rs
);
1496 updateValueSize(this.table
.get(get
).get());
1498 } catch (ExecutionException e
) {
1499 throw new IOException(e
);
1504 public static RuntimeException
runtime(Throwable e
) {
1505 if (e
instanceof RuntimeException
) {
1506 return (RuntimeException
) e
;
1508 return new RuntimeException(e
);
1511 public static <V
> V
propagate(Callable
<V
> callable
) {
1513 return callable
.call();
1514 } catch (Exception e
) {
1520 protected int getReportingPeriod() {
1521 int period
= opts
.perClientRunRows
/ 10;
1522 return period
== 0 ? opts
.perClientRunRows
: period
;
1526 protected void testTakedown() throws IOException
{
1527 if (this.gets
!= null && this.gets
.size() > 0) {
1528 this.table
.get(gets
);
1531 super.testTakedown();
1535 static class AsyncRandomWriteTest
extends AsyncSequentialWriteTest
{
1537 AsyncRandomWriteTest(AsyncConnection con
, TestOptions options
, Status status
) {
1538 super(con
, options
, status
);
1542 protected byte[] generateRow(final int i
) {
1543 return getRandomRow(this.rand
, opts
.totalRows
);
1547 static class AsyncScanTest
extends AsyncTableTest
{
1548 private ResultScanner testScanner
;
1549 private AsyncTable
<?
> asyncTable
;
1551 AsyncScanTest(AsyncConnection con
, TestOptions options
, Status status
) {
1552 super(con
, options
, status
);
1556 void onStartup() throws IOException
{
1558 connection
.getTable(TableName
.valueOf(opts
.tableName
),
1559 Executors
.newFixedThreadPool(Runtime
.getRuntime().availableProcessors()));
1563 void testTakedown() throws IOException
{
1564 if (this.testScanner
!= null) {
1565 updateScanMetrics(this.testScanner
.getScanMetrics());
1566 this.testScanner
.close();
1568 super.testTakedown();
1572 boolean testRow(final int i
) throws IOException
{
1573 if (this.testScanner
== null) {
1575 new Scan().withStartRow(format(opts
.startRow
)).setCaching(opts
.caching
)
1576 .setCacheBlocks(opts
.cacheBlocks
).setAsyncPrefetch(opts
.asyncPrefetch
)
1577 .setReadType(opts
.scanReadType
).setScanMetricsEnabled(true);
1578 for (int family
= 0; family
< opts
.families
; family
++) {
1579 byte[] familyName
= Bytes
.toBytes(FAMILY_NAME_BASE
+ family
);
1580 if (opts
.addColumns
) {
1581 for (int column
= 0; column
< opts
.columns
; column
++) {
1582 byte [] qualifier
= column
== 0? COLUMN_ZERO
: Bytes
.toBytes("" + column
);
1583 scan
.addColumn(familyName
, qualifier
);
1586 scan
.addFamily(familyName
);
1589 if (opts
.filterAll
) {
1590 scan
.setFilter(new FilterAllFilter());
1592 this.testScanner
= asyncTable
.getScanner(scan
);
1594 Result r
= testScanner
.next();
1600 static class AsyncSequentialReadTest
extends AsyncTableTest
{
1601 AsyncSequentialReadTest(AsyncConnection con
, TestOptions options
, Status status
) {
1602 super(con
, options
, status
);
1606 boolean testRow(final int i
) throws IOException
, InterruptedException
{
1607 Get get
= new Get(format(i
));
1608 for (int family
= 0; family
< opts
.families
; family
++) {
1609 byte[] familyName
= Bytes
.toBytes(FAMILY_NAME_BASE
+ family
);
1610 if (opts
.addColumns
) {
1611 for (int column
= 0; column
< opts
.columns
; column
++) {
1612 byte [] qualifier
= column
== 0? COLUMN_ZERO
: Bytes
.toBytes("" + column
);
1613 get
.addColumn(familyName
, qualifier
);
1616 get
.addFamily(familyName
);
1619 if (opts
.filterAll
) {
1620 get
.setFilter(new FilterAllFilter());
1623 updateValueSize(table
.get(get
).get());
1624 } catch (ExecutionException e
) {
1625 throw new IOException(e
);
1631 static class AsyncSequentialWriteTest
extends AsyncTableTest
{
1632 private ArrayList
<Put
> puts
;
1634 AsyncSequentialWriteTest(AsyncConnection con
, TestOptions options
, Status status
) {
1635 super(con
, options
, status
);
1636 if (opts
.multiPut
> 0) {
1637 LOG
.info("MultiPut enabled. Sending PUTs in batches of " + opts
.multiPut
+ ".");
1638 this.puts
= new ArrayList
<>(opts
.multiPut
);
1642 protected byte[] generateRow(final int i
) {
1647 @SuppressWarnings("ReturnValueIgnored")
1648 boolean testRow(final int i
) throws IOException
, InterruptedException
{
1649 byte[] row
= generateRow(i
);
1650 Put put
= new Put(row
);
1651 for (int family
= 0; family
< opts
.families
; family
++) {
1652 byte[] familyName
= Bytes
.toBytes(FAMILY_NAME_BASE
+ family
);
1653 for (int column
= 0; column
< opts
.columns
; column
++) {
1654 byte [] qualifier
= column
== 0? COLUMN_ZERO
: Bytes
.toBytes("" + column
);
1655 byte[] value
= generateData(this.rand
, getValueLength(this.rand
));
1657 byte[] tag
= generateData(this.rand
, TAG_LENGTH
);
1658 Tag
[] tags
= new Tag
[opts
.noOfTags
];
1659 for (int n
= 0; n
< opts
.noOfTags
; n
++) {
1660 Tag t
= new ArrayBackedTag((byte) n
, tag
);
1663 KeyValue kv
= new KeyValue(row
, familyName
, qualifier
, HConstants
.LATEST_TIMESTAMP
,
1666 updateValueSize(kv
.getValueLength());
1668 put
.addColumn(familyName
, qualifier
, value
);
1669 updateValueSize(value
.length
);
1673 put
.setDurability(opts
.writeToWAL ? Durability
.SYNC_WAL
: Durability
.SKIP_WAL
);
1675 table
.put(put
).get();
1676 if (opts
.multiPut
> 0) {
1678 if (this.puts
.size() == opts
.multiPut
) {
1679 this.table
.put(puts
).stream().map(f
-> AsyncRandomReadTest
.propagate(f
::get
));
1685 table
.put(put
).get();
1687 } catch (ExecutionException e
) {
1688 throw new IOException(e
);
1694 static abstract class BufferedMutatorTest
extends Test
{
1695 protected BufferedMutator mutator
;
1696 protected Table table
;
1698 BufferedMutatorTest(Connection con
, TestOptions options
, Status status
) {
1699 super(con
, options
, status
);
1703 void onStartup() throws IOException
{
1704 BufferedMutatorParams p
= new BufferedMutatorParams(TableName
.valueOf(opts
.tableName
));
1705 p
.writeBufferSize(opts
.bufferSize
);
1706 this.mutator
= connection
.getBufferedMutator(p
);
1707 this.table
= connection
.getTable(TableName
.valueOf(opts
.tableName
));
1711 void onTakedown() throws IOException
{
1717 static class RandomSeekScanTest
extends TableTest
{
1718 RandomSeekScanTest(Connection con
, TestOptions options
, Status status
) {
1719 super(con
, options
, status
);
1723 boolean testRow(final int i
) throws IOException
{
1724 Scan scan
= new Scan().withStartRow(getRandomRow(this.rand
, opts
.totalRows
))
1725 .setCaching(opts
.caching
).setCacheBlocks(opts
.cacheBlocks
)
1726 .setAsyncPrefetch(opts
.asyncPrefetch
).setReadType(opts
.scanReadType
)
1727 .setScanMetricsEnabled(true);
1728 FilterList list
= new FilterList();
1729 for (int family
= 0; family
< opts
.families
; family
++) {
1730 byte[] familyName
= Bytes
.toBytes(FAMILY_NAME_BASE
+ family
);
1731 if (opts
.addColumns
) {
1732 for (int column
= 0; column
< opts
.columns
; column
++) {
1733 byte [] qualifier
= column
== 0? COLUMN_ZERO
: Bytes
.toBytes("" + column
);
1734 scan
.addColumn(familyName
, qualifier
);
1737 scan
.addFamily(familyName
);
1740 if (opts
.filterAll
) {
1741 list
.addFilter(new FilterAllFilter());
1743 list
.addFilter(new WhileMatchFilter(new PageFilter(120)));
1744 scan
.setFilter(list
);
1745 ResultScanner s
= this.table
.getScanner(scan
);
1747 for (Result rr
; (rr
= s
.next()) != null;) {
1748 updateValueSize(rr
);
1751 updateScanMetrics(s
.getScanMetrics());
1758 protected int getReportingPeriod() {
1759 int period
= opts
.perClientRunRows
/ 100;
1760 return period
== 0 ? opts
.perClientRunRows
: period
;
1765 static abstract class RandomScanWithRangeTest
extends TableTest
{
1766 RandomScanWithRangeTest(Connection con
, TestOptions options
, Status status
) {
1767 super(con
, options
, status
);
1771 boolean testRow(final int i
) throws IOException
{
1772 Pair
<byte[], byte[]> startAndStopRow
= getStartAndStopRow();
1773 Scan scan
= new Scan().withStartRow(startAndStopRow
.getFirst())
1774 .withStopRow(startAndStopRow
.getSecond()).setCaching(opts
.caching
)
1775 .setCacheBlocks(opts
.cacheBlocks
).setAsyncPrefetch(opts
.asyncPrefetch
)
1776 .setReadType(opts
.scanReadType
).setScanMetricsEnabled(true);
1777 for (int family
= 0; family
< opts
.families
; family
++) {
1778 byte[] familyName
= Bytes
.toBytes(FAMILY_NAME_BASE
+ family
);
1779 if (opts
.addColumns
) {
1780 for (int column
= 0; column
< opts
.columns
; column
++) {
1781 byte [] qualifier
= column
== 0? COLUMN_ZERO
: Bytes
.toBytes("" + column
);
1782 scan
.addColumn(familyName
, qualifier
);
1785 scan
.addFamily(familyName
);
1788 if (opts
.filterAll
) {
1789 scan
.setFilter(new FilterAllFilter());
1793 ResultScanner s
= this.table
.getScanner(scan
);
1795 for (; (r
= s
.next()) != null;) {
1800 LOG
.info(String
.format("Scan for key range %s - %s returned %s rows",
1801 Bytes
.toString(startAndStopRow
.getFirst()),
1802 Bytes
.toString(startAndStopRow
.getSecond()), count
));
1805 updateScanMetrics(s
.getScanMetrics());
1811 protected abstract Pair
<byte[],byte[]> getStartAndStopRow();
1813 protected Pair
<byte[], byte[]> generateStartAndStopRows(int maxRange
) {
1814 int start
= this.rand
.nextInt(Integer
.MAX_VALUE
) % opts
.totalRows
;
1815 int stop
= start
+ maxRange
;
1816 return new Pair
<>(format(start
), format(stop
));
1820 protected int getReportingPeriod() {
1821 int period
= opts
.perClientRunRows
/ 100;
1822 return period
== 0? opts
.perClientRunRows
: period
;
1826 static class RandomScanWithRange10Test
extends RandomScanWithRangeTest
{
1827 RandomScanWithRange10Test(Connection con
, TestOptions options
, Status status
) {
1828 super(con
, options
, status
);
1832 protected Pair
<byte[], byte[]> getStartAndStopRow() {
1833 return generateStartAndStopRows(10);
1837 static class RandomScanWithRange100Test
extends RandomScanWithRangeTest
{
1838 RandomScanWithRange100Test(Connection con
, TestOptions options
, Status status
) {
1839 super(con
, options
, status
);
1843 protected Pair
<byte[], byte[]> getStartAndStopRow() {
1844 return generateStartAndStopRows(100);
1848 static class RandomScanWithRange1000Test
extends RandomScanWithRangeTest
{
1849 RandomScanWithRange1000Test(Connection con
, TestOptions options
, Status status
) {
1850 super(con
, options
, status
);
1854 protected Pair
<byte[], byte[]> getStartAndStopRow() {
1855 return generateStartAndStopRows(1000);
1859 static class RandomScanWithRange10000Test
extends RandomScanWithRangeTest
{
1860 RandomScanWithRange10000Test(Connection con
, TestOptions options
, Status status
) {
1861 super(con
, options
, status
);
1865 protected Pair
<byte[], byte[]> getStartAndStopRow() {
1866 return generateStartAndStopRows(10000);
1870 static class RandomReadTest
extends TableTest
{
1871 private final Consistency consistency
;
1872 private ArrayList
<Get
> gets
;
1873 private Random rd
= new Random();
1875 RandomReadTest(Connection con
, TestOptions options
, Status status
) {
1876 super(con
, options
, status
);
1877 consistency
= options
.replicas
== DEFAULT_OPTS
.replicas ?
null : Consistency
.TIMELINE
;
1878 if (opts
.multiGet
> 0) {
1879 LOG
.info("MultiGet enabled. Sending GETs in batches of " + opts
.multiGet
+ ".");
1880 this.gets
= new ArrayList
<>(opts
.multiGet
);
1885 boolean testRow(final int i
) throws IOException
, InterruptedException
{
1886 if (opts
.randomSleep
> 0) {
1887 Thread
.sleep(rd
.nextInt(opts
.randomSleep
));
1889 Get get
= new Get(getRandomRow(this.rand
, opts
.totalRows
));
1890 for (int family
= 0; family
< opts
.families
; family
++) {
1891 byte[] familyName
= Bytes
.toBytes(FAMILY_NAME_BASE
+ family
);
1892 if (opts
.addColumns
) {
1893 for (int column
= 0; column
< opts
.columns
; column
++) {
1894 byte [] qualifier
= column
== 0? COLUMN_ZERO
: Bytes
.toBytes("" + column
);
1895 get
.addColumn(familyName
, qualifier
);
1898 get
.addFamily(familyName
);
1901 if (opts
.filterAll
) {
1902 get
.setFilter(new FilterAllFilter());
1904 get
.setConsistency(consistency
);
1905 if (LOG
.isTraceEnabled()) LOG
.trace(get
.toString());
1906 if (opts
.multiGet
> 0) {
1908 if (this.gets
.size() == opts
.multiGet
) {
1909 Result
[] rs
= this.table
.get(this.gets
);
1910 updateValueSize(rs
);
1916 updateValueSize(this.table
.get(get
));
1922 protected int getReportingPeriod() {
1923 int period
= opts
.perClientRunRows
/ 10;
1924 return period
== 0 ? opts
.perClientRunRows
: period
;
1928 protected void testTakedown() throws IOException
{
1929 if (this.gets
!= null && this.gets
.size() > 0) {
1930 this.table
.get(gets
);
1933 super.testTakedown();
1937 static class RandomWriteTest
extends SequentialWriteTest
{
1938 RandomWriteTest(Connection con
, TestOptions options
, Status status
) {
1939 super(con
, options
, status
);
1943 protected byte[] generateRow(final int i
) {
1944 return getRandomRow(this.rand
, opts
.totalRows
);
1950 static class ScanTest
extends TableTest
{
1951 private ResultScanner testScanner
;
1953 ScanTest(Connection con
, TestOptions options
, Status status
) {
1954 super(con
, options
, status
);
1958 void testTakedown() throws IOException
{
1959 if (this.testScanner
!= null) {
1960 this.testScanner
.close();
1962 super.testTakedown();
1967 boolean testRow(final int i
) throws IOException
{
1968 if (this.testScanner
== null) {
1969 Scan scan
= new Scan().withStartRow(format(opts
.startRow
)).setCaching(opts
.caching
)
1970 .setCacheBlocks(opts
.cacheBlocks
).setAsyncPrefetch(opts
.asyncPrefetch
)
1971 .setReadType(opts
.scanReadType
).setScanMetricsEnabled(true);
1972 for (int family
= 0; family
< opts
.families
; family
++) {
1973 byte[] familyName
= Bytes
.toBytes(FAMILY_NAME_BASE
+ family
);
1974 if (opts
.addColumns
) {
1975 for (int column
= 0; column
< opts
.columns
; column
++) {
1976 byte [] qualifier
= column
== 0? COLUMN_ZERO
: Bytes
.toBytes("" + column
);
1977 scan
.addColumn(familyName
, qualifier
);
1980 scan
.addFamily(familyName
);
1983 if (opts
.filterAll
) {
1984 scan
.setFilter(new FilterAllFilter());
1986 this.testScanner
= table
.getScanner(scan
);
1988 Result r
= testScanner
.next();
1995 * Base class for operations that are CAS-like; that read a value and then set it based off what
1996 * they read. In this category is increment, append, checkAndPut, etc.
1998 * <p>These operations also want some concurrency going on. Usually when these tests run, they
1999 * operate in their own part of the key range. In CASTest, we will have them all overlap on the
2000 * same key space. We do this with our getStartRow and getLastRow overrides.
2002 static abstract class CASTableTest
extends TableTest
{
2003 private final byte [] qualifier
;
2004 CASTableTest(Connection con
, TestOptions options
, Status status
) {
2005 super(con
, options
, status
);
2006 qualifier
= Bytes
.toBytes(this.getClass().getSimpleName());
2009 byte [] getQualifier() {
2010 return this.qualifier
;
2020 return opts
.perClientRunRows
;
2024 static class IncrementTest
extends CASTableTest
{
2025 IncrementTest(Connection con
, TestOptions options
, Status status
) {
2026 super(con
, options
, status
);
2030 boolean testRow(final int i
) throws IOException
{
2031 Increment increment
= new Increment(format(i
));
2032 // unlike checkAndXXX tests, which make most sense to do on a single value,
2033 // if multiple families are specified for an increment test we assume it is
2034 // meant to raise the work factor
2035 for (int family
= 0; family
< opts
.families
; family
++) {
2036 byte[] familyName
= Bytes
.toBytes(FAMILY_NAME_BASE
+ family
);
2037 increment
.addColumn(familyName
, getQualifier(), 1l);
2039 updateValueSize(this.table
.increment(increment
));
2044 static class AppendTest
extends CASTableTest
{
2045 AppendTest(Connection con
, TestOptions options
, Status status
) {
2046 super(con
, options
, status
);
2050 boolean testRow(final int i
) throws IOException
{
2051 byte [] bytes
= format(i
);
2052 Append append
= new Append(bytes
);
2053 // unlike checkAndXXX tests, which make most sense to do on a single value,
2054 // if multiple families are specified for an append test we assume it is
2055 // meant to raise the work factor
2056 for (int family
= 0; family
< opts
.families
; family
++) {
2057 byte[] familyName
= Bytes
.toBytes(FAMILY_NAME_BASE
+ family
);
2058 append
.addColumn(familyName
, getQualifier(), bytes
);
2060 updateValueSize(this.table
.append(append
));
2065 static class CheckAndMutateTest
extends CASTableTest
{
2066 CheckAndMutateTest(Connection con
, TestOptions options
, Status status
) {
2067 super(con
, options
, status
);
2071 boolean testRow(final int i
) throws IOException
{
2072 final byte [] bytes
= format(i
);
2073 // checkAndXXX tests operate on only a single value
2074 // Put a known value so when we go to check it, it is there.
2075 Put put
= new Put(bytes
);
2076 put
.addColumn(FAMILY_ZERO
, getQualifier(), bytes
);
2077 this.table
.put(put
);
2078 RowMutations mutations
= new RowMutations(bytes
);
2080 this.table
.checkAndMutate(bytes
, FAMILY_ZERO
).qualifier(getQualifier())
2081 .ifEquals(bytes
).thenMutate(mutations
);
2086 static class CheckAndPutTest
extends CASTableTest
{
2087 CheckAndPutTest(Connection con
, TestOptions options
, Status status
) {
2088 super(con
, options
, status
);
2092 boolean testRow(final int i
) throws IOException
{
2093 final byte [] bytes
= format(i
);
2094 // checkAndXXX tests operate on only a single value
2095 // Put a known value so when we go to check it, it is there.
2096 Put put
= new Put(bytes
);
2097 put
.addColumn(FAMILY_ZERO
, getQualifier(), bytes
);
2098 this.table
.put(put
);
2099 this.table
.checkAndMutate(bytes
, FAMILY_ZERO
).qualifier(getQualifier())
2100 .ifEquals(bytes
).thenPut(put
);
2105 static class CheckAndDeleteTest
extends CASTableTest
{
2106 CheckAndDeleteTest(Connection con
, TestOptions options
, Status status
) {
2107 super(con
, options
, status
);
2111 boolean testRow(final int i
) throws IOException
{
2112 final byte [] bytes
= format(i
);
2113 // checkAndXXX tests operate on only a single value
2114 // Put a known value so when we go to check it, it is there.
2115 Put put
= new Put(bytes
);
2116 put
.addColumn(FAMILY_ZERO
, getQualifier(), bytes
);
2117 this.table
.put(put
);
2118 Delete delete
= new Delete(put
.getRow());
2119 delete
.addColumn(FAMILY_ZERO
, getQualifier());
2120 this.table
.checkAndMutate(bytes
, FAMILY_ZERO
).qualifier(getQualifier())
2121 .ifEquals(bytes
).thenDelete(delete
);
2126 static class SequentialReadTest
extends TableTest
{
2127 SequentialReadTest(Connection con
, TestOptions options
, Status status
) {
2128 super(con
, options
, status
);
2132 boolean testRow(final int i
) throws IOException
{
2133 Get get
= new Get(format(i
));
2134 for (int family
= 0; family
< opts
.families
; family
++) {
2135 byte[] familyName
= Bytes
.toBytes(FAMILY_NAME_BASE
+ family
);
2136 if (opts
.addColumns
) {
2137 for (int column
= 0; column
< opts
.columns
; column
++) {
2138 byte [] qualifier
= column
== 0? COLUMN_ZERO
: Bytes
.toBytes("" + column
);
2139 get
.addColumn(familyName
, qualifier
);
2142 get
.addFamily(familyName
);
2145 if (opts
.filterAll
) {
2146 get
.setFilter(new FilterAllFilter());
2148 updateValueSize(table
.get(get
));
2153 static class SequentialWriteTest
extends BufferedMutatorTest
{
2154 private ArrayList
<Put
> puts
;
2157 SequentialWriteTest(Connection con
, TestOptions options
, Status status
) {
2158 super(con
, options
, status
);
2159 if (opts
.multiPut
> 0) {
2160 LOG
.info("MultiPut enabled. Sending PUTs in batches of " + opts
.multiPut
+ ".");
2161 this.puts
= new ArrayList
<>(opts
.multiPut
);
2165 protected byte[] generateRow(final int i
) {
2170 boolean testRow(final int i
) throws IOException
{
2171 byte[] row
= generateRow(i
);
2172 Put put
= new Put(row
);
2173 for (int family
= 0; family
< opts
.families
; family
++) {
2174 byte familyName
[] = Bytes
.toBytes(FAMILY_NAME_BASE
+ family
);
2175 for (int column
= 0; column
< opts
.columns
; column
++) {
2176 byte [] qualifier
= column
== 0? COLUMN_ZERO
: Bytes
.toBytes("" + column
);
2177 byte[] value
= generateData(this.rand
, getValueLength(this.rand
));
2179 byte[] tag
= generateData(this.rand
, TAG_LENGTH
);
2180 Tag
[] tags
= new Tag
[opts
.noOfTags
];
2181 for (int n
= 0; n
< opts
.noOfTags
; n
++) {
2182 Tag t
= new ArrayBackedTag((byte) n
, tag
);
2185 KeyValue kv
= new KeyValue(row
, familyName
, qualifier
, HConstants
.LATEST_TIMESTAMP
,
2188 updateValueSize(kv
.getValueLength());
2190 put
.addColumn(familyName
, qualifier
, value
);
2191 updateValueSize(value
.length
);
2195 put
.setDurability(opts
.writeToWAL ? Durability
.SYNC_WAL
: Durability
.SKIP_WAL
);
2196 if (opts
.autoFlush
) {
2197 if (opts
.multiPut
> 0) {
2199 if (this.puts
.size() == opts
.multiPut
) {
2200 table
.put(this.puts
);
2209 mutator
.mutate(put
);
2215 static class FilteredScanTest
extends TableTest
{
2216 protected static final Logger LOG
= LoggerFactory
.getLogger(FilteredScanTest
.class.getName());
2218 FilteredScanTest(Connection con
, TestOptions options
, Status status
) {
2219 super(con
, options
, status
);
2220 if (opts
.perClientRunRows
== DEFAULT_ROWS_PER_GB
) {
2221 LOG
.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB
+
2222 ". This could take a very long time.");
2227 boolean testRow(int i
) throws IOException
{
2228 byte[] value
= generateData(this.rand
, getValueLength(this.rand
));
2229 Scan scan
= constructScan(value
);
2230 ResultScanner scanner
= null;
2232 scanner
= this.table
.getScanner(scan
);
2233 for (Result r
= null; (r
= scanner
.next()) != null;) {
2237 if (scanner
!= null) {
2238 updateScanMetrics(scanner
.getScanMetrics());
2245 protected Scan
constructScan(byte[] valuePrefix
) throws IOException
{
2246 FilterList list
= new FilterList();
2247 Filter filter
= new SingleColumnValueFilter(FAMILY_ZERO
, COLUMN_ZERO
,
2248 CompareOperator
.EQUAL
, new BinaryComparator(valuePrefix
));
2249 list
.addFilter(filter
);
2250 if (opts
.filterAll
) {
2251 list
.addFilter(new FilterAllFilter());
2253 Scan scan
= new Scan().setCaching(opts
.caching
).setCacheBlocks(opts
.cacheBlocks
)
2254 .setAsyncPrefetch(opts
.asyncPrefetch
).setReadType(opts
.scanReadType
)
2255 .setScanMetricsEnabled(true);
2256 if (opts
.addColumns
) {
2257 for (int column
= 0; column
< opts
.columns
; column
++) {
2258 byte [] qualifier
= column
== 0? COLUMN_ZERO
: Bytes
.toBytes("" + column
);
2259 scan
.addColumn(FAMILY_ZERO
, qualifier
);
2262 scan
.addFamily(FAMILY_ZERO
);
2264 scan
.setFilter(list
);
2270 * Compute a throughput rate in MB/s.
2271 * @param rows Number of records consumed.
2272 * @param timeMs Time taken in milliseconds.
2273 * @return String value with label, ie '123.76 MB/s'
2275 private static String
calculateMbps(int rows
, long timeMs
, final int valueSize
, int families
, int columns
) {
2276 BigDecimal rowSize
= BigDecimal
.valueOf(ROW_LENGTH
+
2277 ((valueSize
+ (FAMILY_NAME_BASE
.length()+1) + COLUMN_ZERO
.length
) * columns
) * families
);
2278 BigDecimal mbps
= BigDecimal
.valueOf(rows
).multiply(rowSize
, CXT
)
2279 .divide(BigDecimal
.valueOf(timeMs
), CXT
).multiply(MS_PER_SEC
, CXT
)
2280 .divide(BYTES_PER_MB
, CXT
);
2281 return FMT
.format(mbps
) + " MB/s";
2285 * Format passed integer.
2287 * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed
2288 * number (Does absolute in case number is negative).
2290 public static byte [] format(final int number
) {
2291 byte [] b
= new byte[ROW_LENGTH
];
2292 int d
= Math
.abs(number
);
2293 for (int i
= b
.length
- 1; i
>= 0; i
--) {
2294 b
[i
] = (byte)((d
% 10) + '0');
2301 * This method takes some time and is done inline uploading data. For
2302 * example, doing the mapfile test, generation of the key and value
2303 * consumes about 30% of CPU time.
2304 * @return Generated random value to insert into a table cell.
2306 public static byte[] generateData(final Random r
, int length
) {
2307 byte [] b
= new byte [length
];
2310 for(i
= 0; i
< (length
-8); i
+= 8) {
2311 b
[i
] = (byte) (65 + r
.nextInt(26));
2321 byte a
= (byte) (65 + r
.nextInt(26));
2322 for(; i
< length
; i
++) {
2328 static byte [] getRandomRow(final Random random
, final int totalRows
) {
2329 return format(generateRandomRow(random
, totalRows
));
2332 static int generateRandomRow(final Random random
, final int totalRows
) {
2333 return random
.nextInt(Integer
.MAX_VALUE
) % totalRows
;
2336 static RunResult
runOneClient(final Class
<?
extends TestBase
> cmd
, Configuration conf
,
2337 Connection con
, AsyncConnection asyncCon
, TestOptions opts
, final Status status
)
2338 throws IOException
, InterruptedException
{
2339 status
.setStatus("Start " + cmd
+ " at offset " + opts
.startRow
+ " for "
2340 + opts
.perClientRunRows
+ " rows");
2341 long totalElapsedTime
;
2345 if (AsyncTest
.class.isAssignableFrom(cmd
)) {
2346 Class
<?
extends AsyncTest
> newCmd
= (Class
<?
extends AsyncTest
>) cmd
;
2347 Constructor
<?
extends AsyncTest
> constructor
=
2348 newCmd
.getDeclaredConstructor(AsyncConnection
.class, TestOptions
.class, Status
.class);
2349 t
= constructor
.newInstance(asyncCon
, opts
, status
);
2351 Class
<?
extends Test
> newCmd
= (Class
<?
extends Test
>) cmd
;
2352 Constructor
<?
extends Test
> constructor
=
2353 newCmd
.getDeclaredConstructor(Connection
.class, TestOptions
.class, Status
.class);
2354 t
= constructor
.newInstance(con
, opts
, status
);
2356 } catch (NoSuchMethodException e
) {
2357 throw new IllegalArgumentException("Invalid command class: " + cmd
.getName()
2358 + ". It does not provide a constructor as described by "
2359 + "the javadoc comment. Available constructors are: "
2360 + Arrays
.toString(cmd
.getConstructors()));
2361 } catch (Exception e
) {
2362 throw new IllegalStateException("Failed to construct command class", e
);
2364 totalElapsedTime
= t
.test();
2366 status
.setStatus("Finished " + cmd
+ " in " + totalElapsedTime
+
2367 "ms at offset " + opts
.startRow
+ " for " + opts
.perClientRunRows
+ " rows" +
2368 " (" + calculateMbps((int)(opts
.perClientRunRows
* opts
.sampleRate
), totalElapsedTime
,
2369 getAverageValueLength(opts
), opts
.families
, opts
.columns
) + ")");
2371 return new RunResult(totalElapsedTime
, t
.getLatencyHistogram());
2374 private static int getAverageValueLength(final TestOptions opts
) {
2375 return opts
.valueRandom? opts
.valueSize
/2: opts
.valueSize
;
2378 private void runTest(final Class
<?
extends TestBase
> cmd
, TestOptions opts
) throws IOException
,
2379 InterruptedException
, ClassNotFoundException
, ExecutionException
{
2380 // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
2381 // the TestOptions introspection for us and dump the output in a readable format.
2382 LOG
.info(cmd
.getSimpleName() + " test run options=" + GSON
.toJson(opts
));
2384 Connection connection
= null;
2386 connection
= ConnectionFactory
.createConnection(getConf());
2387 admin
= connection
.getAdmin();
2388 checkTable(admin
, opts
);
2390 if (admin
!= null) admin
.close();
2391 if (connection
!= null) connection
.close();
2393 if (opts
.nomapred
) {
2394 doLocalClients(opts
, getConf());
2396 doMapReduce(opts
, getConf());
2400 protected void printUsage() {
2401 printUsage(PE_COMMAND_SHORTNAME
, null);
2404 protected static void printUsage(final String message
) {
2405 printUsage(PE_COMMAND_SHORTNAME
, message
);
2408 protected static void printUsageAndExit(final String message
, final int exitCode
) {
2409 printUsage(message
);
2410 System
.exit(exitCode
);
2413 protected static void printUsage(final String shortName
, final String message
) {
2414 if (message
!= null && message
.length() > 0) {
2415 System
.err
.println(message
);
2417 System
.err
.print("Usage: hbase " + shortName
);
2418 System
.err
.println(" <OPTIONS> [-D<property=value>]* <command> <nclients>");
2419 System
.err
.println();
2420 System
.err
.println("General Options:");
2421 System
.err
.println(" nomapred Run multiple clients using threads " +
2422 "(rather than use mapreduce)");
2423 System
.err
.println(" oneCon all the threads share the same connection. Default: False");
2424 System
.err
.println(" connCount connections all threads share. "
2425 + "For example, if set to 2, then all thread share 2 connection. "
2426 + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, "
2427 + "if not, connCount=thread number");
2429 System
.err
.println(" sampleRate Execute test on a sample of total " +
2430 "rows. Only supported by randomRead. Default: 1.0");
2431 System
.err
.println(" period Report every 'period' rows: " +
2432 "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS
.getPerClientRunRows()/10);
2433 System
.err
.println(" cycles How many times to cycle the test. Defaults: 1.");
2434 System
.err
.println(" traceRate Enable HTrace spans. Initiate tracing every N rows. " +
2436 System
.err
.println(" latency Set to report operation latencies. Default: False");
2437 System
.err
.println(" measureAfter Start to measure the latency once 'measureAfter'" +
2438 " rows have been treated. Default: 0");
2439 System
.err
.println(" valueSize Pass value size to use: Default: "
2440 + DEFAULT_OPTS
.getValueSize());
2441 System
.err
.println(" valueRandom Set if we should vary value size between 0 and " +
2442 "'valueSize'; set on read for stats on size: Default: Not set.");
2443 System
.err
.println(" blockEncoding Block encoding to use. Value should be one of "
2444 + Arrays
.toString(DataBlockEncoding
.values()) + ". Default: NONE");
2445 System
.err
.println();
2446 System
.err
.println("Table Creation / Write Tests:");
2447 System
.err
.println(" table Alternate table name. Default: 'TestTable'");
2448 System
.err
.println(" rows Rows each client runs. Default: "
2449 + DEFAULT_OPTS
.getPerClientRunRows()
2450 + ". In case of randomReads and randomSeekScans this could"
2451 + " be specified along with --size to specify the number of rows to be scanned within"
2452 + " the total range specified by the size.");
2454 " size Total size in GiB. Mutually exclusive with --rows for writes and scans"
2455 + ". But for randomReads and randomSeekScans when you use size with --rows you could"
2456 + " use size to specify the end range and --rows"
2457 + " specifies the number of rows within that range. " + "Default: 1.0.");
2458 System
.err
.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'");
2459 System
.err
.println(" flushCommits Used to determine if the test should flush the table. " +
2461 System
.err
.println(" valueZipf Set if we should vary value size between 0 and " +
2462 "'valueSize' in zipf form: Default: Not set.");
2463 System
.err
.println(" writeToWAL Set writeToWAL on puts. Default: True");
2464 System
.err
.println(" autoFlush Set autoFlush on htable. Default: False");
2465 System
.err
.println(" multiPut Batch puts together into groups of N. Only supported " +
2466 "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0");
2467 System
.err
.println(" presplit Create presplit table. If a table with same name exists,"
2468 + " it'll be deleted and recreated (instead of verifying count of its existing regions). "
2469 + "Recommended for accurate perf analysis (see guide). Default: disabled");
2470 System
.err
.println(" usetags Writes tags along with KVs. Use with HFile V3. " +
2472 System
.err
.println(" numoftags Specify the no of tags that would be needed. " +
2473 "This works only if usetags is true. Default: " + DEFAULT_OPTS
.noOfTags
);
2474 System
.err
.println(" splitPolicy Specify a custom RegionSplitPolicy for the table.");
2475 System
.err
.println(" columns Columns to write per row. Default: 1");
2476 System
.err
.println(" families Specify number of column families for the table. Default: 1");
2477 System
.err
.println();
2478 System
.err
.println("Read Tests:");
2479 System
.err
.println(" filterAll Helps to filter out all the rows on the server side"
2480 + " there by not returning any thing back to the client. Helps to check the server side"
2481 + " performance. Uses FilterAllFilter internally. ");
2482 System
.err
.println(" multiGet Batch gets together into groups of N. Only supported " +
2483 "by randomRead. Default: disabled");
2484 System
.err
.println(" inmemory Tries to keep the HFiles of the CF " +
2485 "inmemory as far as possible. Not guaranteed that reads are always served " +
2486 "from memory. Default: false");
2487 System
.err
.println(" bloomFilter Bloom filter type, one of "
2488 + Arrays
.toString(BloomType
.values()));
2489 System
.err
.println(" blockSize Blocksize to use when writing out hfiles. ");
2490 System
.err
.println(" inmemoryCompaction Makes the column family to do inmemory flushes/compactions. "
2491 + "Uses the CompactingMemstore");
2492 System
.err
.println(" addColumns Adds columns to scans/gets explicitly. Default: true");
2493 System
.err
.println(" replicas Enable region replica testing. Defaults: 1.");
2494 System
.err
.println(" randomSleep Do a random sleep before each get between 0 and entered value. Defaults: 0");
2495 System
.err
.println(" caching Scan caching to use. Default: 30");
2496 System
.err
.println(" asyncPrefetch Enable asyncPrefetch for scan");
2497 System
.err
.println(" cacheBlocks Set the cacheBlocks option for scan. Default: true");
2498 System
.err
.println(" scanReadType Set the readType option for scan, stream/pread/default. Default: default");
2499 System
.err
.println(" bufferSize Set the value of client side buffering. Default: 2MB");
2500 System
.err
.println();
2501 System
.err
.println(" Note: -D properties will be applied to the conf used. ");
2502 System
.err
.println(" For example: ");
2503 System
.err
.println(" -Dmapreduce.output.fileoutputformat.compress=true");
2504 System
.err
.println(" -Dmapreduce.task.timeout=60000");
2505 System
.err
.println();
2506 System
.err
.println("Command:");
2507 for (CmdDescriptor command
: COMMANDS
.values()) {
2508 System
.err
.println(String
.format(" %-20s %s", command
.getName(), command
.getDescription()));
2510 System
.err
.println();
2511 System
.err
.println("Args:");
2512 System
.err
.println(" nclients Integer. Required. Total number of clients "
2513 + "(and HRegionServers) running. 1 <= value <= 500");
2514 System
.err
.println("Examples:");
2515 System
.err
.println(" To run a single client doing the default 1M sequentialWrites:");
2516 System
.err
.println(" $ hbase " + shortName
+ " sequentialWrite 1");
2517 System
.err
.println(" To run 10 clients doing increments over ten rows:");
2518 System
.err
.println(" $ hbase " + shortName
+ " --rows=10 --nomapred increment 10");
2522 * Parse options passed in via an arguments array. Assumes that array has been split
2523 * on white-space and placed into a {@code Queue}. Any unknown arguments will remain
2524 * in the queue at the conclusion of this method call. It's up to the caller to deal
2525 * with these unrecognized arguments.
2527 static TestOptions
parseOpts(Queue
<String
> args
) {
2528 TestOptions opts
= new TestOptions();
2531 while ((cmd
= args
.poll()) != null) {
2532 if (cmd
.equals("-h") || cmd
.startsWith("--h")) {
2533 // place item back onto queue so that caller knows parsing was incomplete
2538 final String nmr
= "--nomapred";
2539 if (cmd
.startsWith(nmr
)) {
2540 opts
.nomapred
= true;
2544 final String rows
= "--rows=";
2545 if (cmd
.startsWith(rows
)) {
2546 opts
.perClientRunRows
= Integer
.parseInt(cmd
.substring(rows
.length()));
2550 final String cycles
= "--cycles=";
2551 if (cmd
.startsWith(cycles
)) {
2552 opts
.cycles
= Integer
.parseInt(cmd
.substring(cycles
.length()));
2556 final String sampleRate
= "--sampleRate=";
2557 if (cmd
.startsWith(sampleRate
)) {
2558 opts
.sampleRate
= Float
.parseFloat(cmd
.substring(sampleRate
.length()));
2562 final String table
= "--table=";
2563 if (cmd
.startsWith(table
)) {
2564 opts
.tableName
= cmd
.substring(table
.length());
2568 final String startRow
= "--startRow=";
2569 if (cmd
.startsWith(startRow
)) {
2570 opts
.startRow
= Integer
.parseInt(cmd
.substring(startRow
.length()));
2574 final String compress
= "--compress=";
2575 if (cmd
.startsWith(compress
)) {
2576 opts
.compression
= Compression
.Algorithm
.valueOf(cmd
.substring(compress
.length()));
2580 final String traceRate
= "--traceRate=";
2581 if (cmd
.startsWith(traceRate
)) {
2582 opts
.traceRate
= Double
.parseDouble(cmd
.substring(traceRate
.length()));
2586 final String blockEncoding
= "--blockEncoding=";
2587 if (cmd
.startsWith(blockEncoding
)) {
2588 opts
.blockEncoding
= DataBlockEncoding
.valueOf(cmd
.substring(blockEncoding
.length()));
2592 final String flushCommits
= "--flushCommits=";
2593 if (cmd
.startsWith(flushCommits
)) {
2594 opts
.flushCommits
= Boolean
.parseBoolean(cmd
.substring(flushCommits
.length()));
2598 final String writeToWAL
= "--writeToWAL=";
2599 if (cmd
.startsWith(writeToWAL
)) {
2600 opts
.writeToWAL
= Boolean
.parseBoolean(cmd
.substring(writeToWAL
.length()));
2604 final String presplit
= "--presplit=";
2605 if (cmd
.startsWith(presplit
)) {
2606 opts
.presplitRegions
= Integer
.parseInt(cmd
.substring(presplit
.length()));
2610 final String inMemory
= "--inmemory=";
2611 if (cmd
.startsWith(inMemory
)) {
2612 opts
.inMemoryCF
= Boolean
.parseBoolean(cmd
.substring(inMemory
.length()));
2616 final String autoFlush
= "--autoFlush=";
2617 if (cmd
.startsWith(autoFlush
)) {
2618 opts
.autoFlush
= Boolean
.parseBoolean(cmd
.substring(autoFlush
.length()));
2619 if (!opts
.autoFlush
&& opts
.multiPut
> 0) {
2620 throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
2625 final String onceCon
= "--oneCon=";
2626 if (cmd
.startsWith(onceCon
)) {
2627 opts
.oneCon
= Boolean
.parseBoolean(cmd
.substring(onceCon
.length()));
2628 if (opts
.oneCon
&& opts
.connCount
> 1) {
2629 throw new IllegalArgumentException("oneCon is set to true, "
2630 + "connCount should not bigger than 1");
2635 final String connCount
= "--connCount=";
2636 if (cmd
.startsWith(connCount
)) {
2637 opts
.connCount
= Integer
.parseInt(cmd
.substring(connCount
.length()));
2638 if (opts
.oneCon
&& opts
.connCount
> 1) {
2639 throw new IllegalArgumentException("oneCon is set to true, "
2640 + "connCount should not bigger than 1");
2645 final String latency
= "--latency";
2646 if (cmd
.startsWith(latency
)) {
2647 opts
.reportLatency
= true;
2651 final String multiGet
= "--multiGet=";
2652 if (cmd
.startsWith(multiGet
)) {
2653 opts
.multiGet
= Integer
.parseInt(cmd
.substring(multiGet
.length()));
2657 final String multiPut
= "--multiPut=";
2658 if (cmd
.startsWith(multiPut
)) {
2659 opts
.multiPut
= Integer
.parseInt(cmd
.substring(multiPut
.length()));
2660 if (!opts
.autoFlush
&& opts
.multiPut
> 0) {
2661 throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
2666 final String useTags
= "--usetags=";
2667 if (cmd
.startsWith(useTags
)) {
2668 opts
.useTags
= Boolean
.parseBoolean(cmd
.substring(useTags
.length()));
2672 final String noOfTags
= "--numoftags=";
2673 if (cmd
.startsWith(noOfTags
)) {
2674 opts
.noOfTags
= Integer
.parseInt(cmd
.substring(noOfTags
.length()));
2678 final String replicas
= "--replicas=";
2679 if (cmd
.startsWith(replicas
)) {
2680 opts
.replicas
= Integer
.parseInt(cmd
.substring(replicas
.length()));
2684 final String filterOutAll
= "--filterAll";
2685 if (cmd
.startsWith(filterOutAll
)) {
2686 opts
.filterAll
= true;
2690 final String size
= "--size=";
2691 if (cmd
.startsWith(size
)) {
2692 opts
.size
= Float
.parseFloat(cmd
.substring(size
.length()));
2693 if (opts
.size
<= 1.0f
) throw new IllegalStateException("Size must be > 1; i.e. 1GB");
2697 final String splitPolicy
= "--splitPolicy=";
2698 if (cmd
.startsWith(splitPolicy
)) {
2699 opts
.splitPolicy
= cmd
.substring(splitPolicy
.length());
2703 final String randomSleep
= "--randomSleep=";
2704 if (cmd
.startsWith(randomSleep
)) {
2705 opts
.randomSleep
= Integer
.parseInt(cmd
.substring(randomSleep
.length()));
2709 final String measureAfter
= "--measureAfter=";
2710 if (cmd
.startsWith(measureAfter
)) {
2711 opts
.measureAfter
= Integer
.parseInt(cmd
.substring(measureAfter
.length()));
2715 final String bloomFilter
= "--bloomFilter=";
2716 if (cmd
.startsWith(bloomFilter
)) {
2717 opts
.bloomType
= BloomType
.valueOf(cmd
.substring(bloomFilter
.length()));
2721 final String blockSize
= "--blockSize=";
2722 if(cmd
.startsWith(blockSize
) ) {
2723 opts
.blockSize
= Integer
.parseInt(cmd
.substring(blockSize
.length()));
2727 final String valueSize
= "--valueSize=";
2728 if (cmd
.startsWith(valueSize
)) {
2729 opts
.valueSize
= Integer
.parseInt(cmd
.substring(valueSize
.length()));
2733 final String valueRandom
= "--valueRandom";
2734 if (cmd
.startsWith(valueRandom
)) {
2735 opts
.valueRandom
= true;
2736 if (opts
.valueZipf
) {
2737 throw new IllegalStateException("Either valueZipf or valueRandom but not both");
2742 final String valueZipf
= "--valueZipf";
2743 if (cmd
.startsWith(valueZipf
)) {
2744 opts
.valueZipf
= true;
2745 if (opts
.valueRandom
) {
2746 throw new IllegalStateException("Either valueZipf or valueRandom but not both");
2751 final String period
= "--period=";
2752 if (cmd
.startsWith(period
)) {
2753 opts
.period
= Integer
.parseInt(cmd
.substring(period
.length()));
2757 final String addColumns
= "--addColumns=";
2758 if (cmd
.startsWith(addColumns
)) {
2759 opts
.addColumns
= Boolean
.parseBoolean(cmd
.substring(addColumns
.length()));
2763 final String inMemoryCompaction
= "--inmemoryCompaction=";
2764 if (cmd
.startsWith(inMemoryCompaction
)) {
2765 opts
.inMemoryCompaction
=
2766 MemoryCompactionPolicy
.valueOf(cmd
.substring(inMemoryCompaction
.length()));
2770 final String columns
= "--columns=";
2771 if (cmd
.startsWith(columns
)) {
2772 opts
.columns
= Integer
.parseInt(cmd
.substring(columns
.length()));
2776 final String families
= "--families=";
2777 if (cmd
.startsWith(families
)) {
2778 opts
.families
= Integer
.parseInt(cmd
.substring(families
.length()));
2782 final String caching
= "--caching=";
2783 if (cmd
.startsWith(caching
)) {
2784 opts
.caching
= Integer
.parseInt(cmd
.substring(caching
.length()));
2788 final String asyncPrefetch
= "--asyncPrefetch";
2789 if (cmd
.startsWith(asyncPrefetch
)) {
2790 opts
.asyncPrefetch
= true;
2794 final String cacheBlocks
= "--cacheBlocks=";
2795 if (cmd
.startsWith(cacheBlocks
)) {
2796 opts
.cacheBlocks
= Boolean
.parseBoolean(cmd
.substring(cacheBlocks
.length()));
2800 final String scanReadType
= "--scanReadType=";
2801 if (cmd
.startsWith(scanReadType
)) {
2803 Scan
.ReadType
.valueOf(cmd
.substring(scanReadType
.length()).toUpperCase());
2807 final String bufferSize
= "--bufferSize=";
2808 if (cmd
.startsWith(bufferSize
)) {
2809 opts
.bufferSize
= Long
.parseLong(cmd
.substring(bufferSize
.length()));
2813 if (isCommandClass(cmd
)) {
2816 opts
.numClientThreads
= Integer
.parseInt(args
.remove());
2817 } catch (NoSuchElementException
| NumberFormatException e
) {
2818 throw new IllegalArgumentException("Command " + cmd
+ " does not have threads number", e
);
2820 opts
= calculateRowsAndSize(opts
);
2823 printUsageAndExit("ERROR: Unrecognized option/command: " + cmd
, -1);
2826 // Not matching any option or command.
2827 System
.err
.println("Error: Wrong option or command: " + cmd
);
2834 static TestOptions
calculateRowsAndSize(final TestOptions opts
) {
2835 int rowsPerGB
= getRowsPerGB(opts
);
2836 if ((opts
.getCmdName() != null
2837 && (opts
.getCmdName().equals(RANDOM_READ
) || opts
.getCmdName().equals(RANDOM_SEEK_SCAN
)))
2838 && opts
.size
!= DEFAULT_OPTS
.size
2839 && opts
.perClientRunRows
!= DEFAULT_OPTS
.perClientRunRows
) {
2840 opts
.totalRows
= (int) opts
.size
* rowsPerGB
;
2841 } else if (opts
.size
!= DEFAULT_OPTS
.size
) {
2842 // total size in GB specified
2843 opts
.totalRows
= (int) opts
.size
* rowsPerGB
;
2844 opts
.perClientRunRows
= opts
.totalRows
/ opts
.numClientThreads
;
2846 opts
.totalRows
= opts
.perClientRunRows
* opts
.numClientThreads
;
2847 opts
.size
= opts
.totalRows
/ rowsPerGB
;
2852 static int getRowsPerGB(final TestOptions opts
) {
2853 return ONE_GB
/ ((opts
.valueRandom? opts
.valueSize
/2: opts
.valueSize
) * opts
.getFamilies() *
2858 public int run(String
[] args
) throws Exception
{
2859 // Process command-line args. TODO: Better cmd-line processing
2860 // (but hopefully something not as painful as cli options).
2862 if (args
.length
< 1) {
2868 LinkedList
<String
> argv
= new LinkedList
<>();
2869 argv
.addAll(Arrays
.asList(args
));
2870 TestOptions opts
= parseOpts(argv
);
2872 // args remaining, print help and exit
2873 if (!argv
.isEmpty()) {
2879 // must run at least 1 client
2880 if (opts
.numClientThreads
<= 0) {
2881 throw new IllegalArgumentException("Number of clients must be > 0");
2884 // cmdName should not be null, print help and exit
2885 if (opts
.cmdName
== null) {
2890 Class
<?
extends TestBase
> cmdClass
= determineCommandClass(opts
.cmdName
);
2891 if (cmdClass
!= null) {
2892 runTest(cmdClass
, opts
);
2896 } catch (Exception e
) {
2897 e
.printStackTrace();
2903 private static boolean isCommandClass(String cmd
) {
2904 return COMMANDS
.containsKey(cmd
);
2907 private static Class
<?
extends TestBase
> determineCommandClass(String cmd
) {
2908 CmdDescriptor descriptor
= COMMANDS
.get(cmd
);
2909 return descriptor
!= null ? descriptor
.getCmdClass() : null;
2912 public static void main(final String
[] args
) throws Exception
{
2913 int res
= ToolRunner
.run(new PerformanceEvaluation(HBaseConfiguration
.create()), args
);