/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.UniformReservoir;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.reflect.Constructor;
import java.math.BigDecimal;
import java.math.MathContext;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.LinkedList;
import java.util.Locale;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterAllFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.RandomDistribution;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
import org.apache.hadoop.hbase.trace.SpanReceiverHost;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.ByteArrayHashKey;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.GsonUtil;
import org.apache.hadoop.hbase.util.Hash;
import org.apache.hadoop.hbase.util.MurmurHash;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.YammerHistogramUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.htrace.core.ProbabilitySampler;
import org.apache.htrace.core.Sampler;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.gson.Gson;
/**
 * Script used evaluating HBase performance and scalability. Runs an HBase
 * client that steps through one of a set of hardcoded tests or 'experiments'
 * (e.g. a random reads test, a random writes test, etc.). Pass on the
 * command-line which test to run and how many clients are participating in
 * this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
 *
 * <p>This class sets up and runs the evaluation programs described in
 * Section 7, <i>Performance Evaluation</i>, of the <a
 * href="http://labs.google.com/papers/bigtable.html">Bigtable</a>
 * paper.
 *
 * <p>By default, runs as a mapreduce job where each mapper runs a single test
 * client. Can also run as a non-mapreduce, multithreaded application by
 * specifying {@code --nomapred}. Each client does about 1GB of data, unless
 * specified otherwise.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class PerformanceEvaluation extends Configured implements Tool {
  static final String RANDOM_SEEK_SCAN = "randomSeekScan";
  static final String RANDOM_READ = "randomRead";
  static final String PE_COMMAND_SHORTNAME = "pe";
  private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
  private static final Gson GSON = GsonUtil.createGson().create();

  public static final String TABLE_NAME = "TestTable";
  public static final String FAMILY_NAME_BASE = "info";
  public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
  public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
  public static final int DEFAULT_VALUE_LENGTH = 1000;
  public static final int ROW_LENGTH = 26;
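
  // Note: ONE_GB below is sized as 1024 * 1024 * 1000 bytes so that, with the default
  // 1000-byte values, DEFAULT_ROWS_PER_GB works out to exactly 1024 * 1024 = 1,048,576
  // rows per client.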
  private static final int ONE_GB = 1024 * 1024 * 1000;
  private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
  // TODO: should we make this configurable?
  private static final int TAG_LENGTH = 256;
  private static final DecimalFormat FMT = new DecimalFormat("0.##");
  private static final MathContext CXT = MathContext.DECIMAL64;
  private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
  private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
  private static final TestOptions DEFAULT_OPTS = new TestOptions();

  private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
  private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
  static {
    addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
        "Run async random read test");
    addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
        "Run async random write test");
    addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
        "Run async sequential read test");
    addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
        "Run async sequential write test");
    addCommandDescriptor(AsyncScanTest.class, "asyncScan",
        "Run async scan test (read every row)");
    addCommandDescriptor(RandomReadTest.class, RANDOM_READ,
        "Run random read test");
    addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
        "Run random seek and scan 100 test");
    addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
        "Run random seek scan with both start and stop row (max 10 rows)");
    addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
        "Run random seek scan with both start and stop row (max 100 rows)");
    addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
        "Run random seek scan with both start and stop row (max 1000 rows)");
    addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
        "Run random seek scan with both start and stop row (max 10000 rows)");
    addCommandDescriptor(RandomWriteTest.class, "randomWrite",
        "Run random write test");
    addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
        "Run sequential read test");
    addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
        "Run sequential write test");
    addCommandDescriptor(ScanTest.class, "scan",
        "Run scan test (read every row)");
    addCommandDescriptor(FilteredScanTest.class, "filterScan",
        "Run scan test using a filter to find a specific row based on its value " +
        "(make sure to use --rows=20)");
    addCommandDescriptor(IncrementTest.class, "increment",
        "Increment on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(AppendTest.class, "append",
        "Append on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
        "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
        "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
    addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
        "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
  }
  /**
   * Enum for map metrics. Keep it out here rather than inside in the Map
   * inner-class so we can find associated properties.
   */
  protected static enum Counter {
    /** elapsed time */
    ELAPSED_TIME,
    /** number of rows */
    ROWS
  }
  protected static class RunResult implements Comparable<RunResult> {
    public RunResult(long duration, Histogram hist) {
      this.duration = duration;
      this.hist = hist;
    }

    public final long duration;
    public final Histogram hist;

    @Override
    public String toString() {
      return Long.toString(duration);
    }

    @Override
    public int compareTo(RunResult o) {
      return Long.compare(this.duration, o.duration);
    }
  }
  /**
   * Constructor
   * @param conf Configuration object
   */
  public PerformanceEvaluation(final Configuration conf) {
    super(conf);
  }
  protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass,
      String name, String description) {
    CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
    COMMANDS.put(name, cmdDescriptor);
  }
  /**
   * Implementations can have their status set.
   */
  interface Status {
    /**
     * Sets status
     * @param msg status message
     * @throws IOException if the status cannot be set
     */
    void setStatus(final String msg) throws IOException;
  }
  /**
   * MapReduce job that runs a performance evaluation client in each map task.
   */
  public static class EvaluationMapTask
      extends Mapper<LongWritable, Text, LongWritable, LongWritable> {

    /** configuration parameter name that contains the command */
    public final static String CMD_KEY = "EvaluationMapTask.command";
    /** configuration parameter name that contains the PE impl */
    public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";

    private Class<? extends Test> cmd;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);

      // this is required so that extensions of PE are instantiated within the
      // map reduce task...
      Class<? extends PerformanceEvaluation> peClass =
          forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
      try {
        peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
      } catch (Exception e) {
        throw new IllegalStateException("Could not instantiate PE instance", e);
      }
    }

    private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
      try {
        return Class.forName(className).asSubclass(type);
      } catch (ClassNotFoundException e) {
        throw new IllegalStateException("Could not find class for name: " + className, e);
      }
    }
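
    // map() deserializes one TestOptions JSON line produced by writeInputFile(), runs a
    // single test client against the cluster, and reports the elapsed time both as map
    // output and through the ELAPSED_TIME counter.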
    @Override
    protected void map(LongWritable key, Text value, final Context context)
        throws IOException, InterruptedException {

      Status status = new Status() {
        @Override
        public void setStatus(String msg) {
          context.setStatus(msg);
        }
      };

      TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class);
      Configuration conf = HBaseConfiguration.create(context.getConfiguration());
      final Connection con = ConnectionFactory.createConnection(conf);
      AsyncConnection asyncCon = null;
      try {
        asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
      } catch (ExecutionException e) {
        throw new IOException(e);
      }

      RunResult result =
          PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status);
      // Collect how much time the thing took. Report as map output and
      // to the ELAPSED_TIME counter.
      context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
      context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
      context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
      context.progress();
    }
  }
  /*
   * If table does not already exist, create. Also create a table when
   * {@code opts.presplitRegions} is specified or when the existing table's
   * region replica count doesn't match {@code opts.replicas}.
   */
  static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
    TableName tableName = TableName.valueOf(opts.tableName);
    boolean needsDelete = false, exists = admin.tableExists(tableName);
    boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read")
        || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan");
    if (!exists && isReadCmd) {
      throw new IllegalStateException(
          "Must specify an existing table for read commands. Run a write command first.");
    }
    HTableDescriptor desc =
        exists ? new HTableDescriptor(admin.getDescriptor(TableName.valueOf(opts.tableName))) : null;
    byte[][] splits = getSplits(opts);

    // recreate the table when user has requested presplit or when existing
    // {RegionSplitPolicy,replica count} does not match requested, or when the
    // number of column families does not match requested.
    if ((exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions)
        || (!isReadCmd && desc != null
            && !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy))
        || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)
        || (desc != null && desc.getColumnFamilyCount() != opts.families)) {
      needsDelete = true;
      // wait, why did it delete my table?!?
      LOG.debug(MoreObjects.toStringHelper("needsDelete")
          .add("needsDelete", needsDelete)
          .add("isReadCmd", isReadCmd)
          .add("exists", exists)
          .add("presplit", opts.presplitRegions)
          .add("splitPolicy", opts.splitPolicy)
          .add("replicas", opts.replicas)
          .add("families", opts.families)
          .toString());
    }

    // remove an existing table
    if (needsDelete) {
      if (admin.isTableEnabled(tableName)) {
        admin.disableTable(tableName);
      }
      admin.deleteTable(tableName);
    }

    // table creation is necessary
    if (!exists || needsDelete) {
      desc = getTableDescriptor(opts);
      if (splits != null) {
        if (LOG.isDebugEnabled()) {
          for (int i = 0; i < splits.length; i++) {
            LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
          }
        }
      }
      admin.createTable(desc, splits);
      LOG.info("Table " + desc + " created");
    }
    return admin.tableExists(tableName);
  }
  /**
   * Create an HTableDescriptor from provided TestOptions.
   */
  protected static HTableDescriptor getTableDescriptor(TestOptions opts) {
    HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(opts.tableName));
    for (int family = 0; family < opts.families; family++) {
      byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
      HColumnDescriptor familyDesc = new HColumnDescriptor(familyName);
      familyDesc.setDataBlockEncoding(opts.blockEncoding);
      familyDesc.setCompressionType(opts.compression);
      familyDesc.setBloomFilterType(opts.bloomType);
      familyDesc.setBlocksize(opts.blockSize);
      if (opts.inMemoryCF) {
        familyDesc.setInMemory(true);
      }
      familyDesc.setInMemoryCompaction(opts.inMemoryCompaction);
      tableDesc.addFamily(familyDesc);
    }
    if (opts.replicas != DEFAULT_OPTS.replicas) {
      tableDesc.setRegionReplication(opts.replicas);
    }
    if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) {
      tableDesc.setRegionSplitPolicyClassName(opts.splitPolicy);
    }
    return tableDesc;
  }
  /**
   * Generates splits based on total number of rows and specified split regions.
   */
  protected static byte[][] getSplits(TestOptions opts) {
    if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions) {
      return null;
    }

    int numSplitPoints = opts.presplitRegions - 1;
    byte[][] splits = new byte[numSplitPoints][];
    int jump = opts.totalRows / opts.presplitRegions;
    for (int i = 0; i < numSplitPoints; i++) {
      int rowkey = jump * (1 + i);
      splits[i] = format(rowkey);
    }
    return splits;
  }
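
  // getSplits example: with totalRows=1000 and presplitRegions=10, jump=100 and the
  // nine split points land at rows 100, 200, ..., 900 (formatted as row keys).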
  static void setupConnectionCount(final TestOptions opts) {
    if (opts.oneCon) {
      opts.connCount = 1;
    } else {
      if (opts.connCount == -1) {
        // set to thread number if connCount is not set
        opts.connCount = opts.numClientThreads;
      }
    }
  }
  /*
   * Run all clients in this vm each to its own thread.
   */
  static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
      throws IOException, InterruptedException, ExecutionException {
    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
    @SuppressWarnings("unchecked")
    Future<RunResult>[] threads = new Future[opts.numClientThreads];
    RunResult[] results = new RunResult[opts.numClientThreads];
    ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
        new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
    setupConnectionCount(opts);
    final Connection[] cons = new Connection[opts.connCount];
    final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount];
    for (int i = 0; i < opts.connCount; i++) {
      cons[i] = ConnectionFactory.createConnection(conf);
      asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get();
    }
    LOG.info("Created " + opts.connCount + " connections for " +
        opts.numClientThreads + " threads");
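    // Threads are assigned to connections round-robin below (index % connCount), so a
    // connCount smaller than numClientThreads shares each connection among several threads.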
    for (int i = 0; i < threads.length; i++) {
      final int index = i;
      threads[i] = pool.submit(new Callable<RunResult>() {
        @Override
        public RunResult call() throws Exception {
          TestOptions threadOpts = new TestOptions(opts);
          final Connection con = cons[index % cons.length];
          final AsyncConnection asyncCon = asyncCons[index % asyncCons.length];
          if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
          RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() {
            @Override
            public void setStatus(final String msg) throws IOException {
              LOG.info(msg);
            }
          });
          LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration +
              "ms over " + threadOpts.perClientRunRows + " rows");
          return run;
        }
      });
    }
    pool.shutdown();

    for (int i = 0; i < threads.length; i++) {
      try {
        results[i] = threads[i].get();
      } catch (ExecutionException e) {
        throw new IOException(e.getCause());
      }
    }
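
    // Summarize: 'total' accumulates per-thread wall-clock ms; avgLatency averages each
    // thread's mean per-op latency (us); avgTPS sums rows/ms across threads, and the
    // *= 1000 below scales it to rows per second.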
    final String test = cmd.getSimpleName();
    LOG.info("[" + test + "] Summary of timings (ms): "
        + Arrays.toString(results));
    Arrays.sort(results);
    long total = 0;
    float avgLatency = 0;
    float avgTPS = 0;
    for (RunResult result : results) {
      total += result.duration;
      avgLatency += result.hist.getSnapshot().getMean();
      avgTPS += opts.perClientRunRows * 1.0f / result.duration;
    }
    avgTPS *= 1000; // ms to second
    avgLatency = avgLatency / results.length;
    LOG.info("[" + test + " duration ]"
        + "\tMin: " + results[0] + "ms"
        + "\tMax: " + results[results.length - 1] + "ms"
        + "\tAvg: " + (total / results.length) + "ms");
    LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
    LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second");
    for (int i = 0; i < opts.connCount; i++) {
      cons[i].close();
      asyncCons[i].close();
    }

    return results;
  }
  /*
   * Run a mapreduce job. Run as many maps as asked-for clients.
   * Before we start up the job, write out an input file with instructions
   * per client regarding which row they are to start on.
   * @param cmd Command to run.
   * @throws IOException if the input file cannot be written or the job fails
   */
  static Job doMapReduce(TestOptions opts, final Configuration conf)
      throws IOException, InterruptedException, ClassNotFoundException {
    final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
    Path inputDir = writeInputFile(conf, opts);
    conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
    conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
    Job job = Job.getInstance(conf);
    job.setJarByClass(PerformanceEvaluation.class);
    job.setJobName("HBase Performance Evaluation - " + opts.cmdName);

    job.setInputFormatClass(NLineInputFormat.class);
    NLineInputFormat.setInputPaths(job, inputDir);
    // this is default, but be explicit about it just in case.
    NLineInputFormat.setNumLinesPerSplit(job, 1);

    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(LongWritable.class);

    job.setMapperClass(EvaluationMapTask.class);
    job.setReducerClass(LongSumReducer.class);

    job.setNumReduceTasks(1);

    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
        Histogram.class, // yammer metrics
        FilterAllFilter.class // hbase-server tests jar
    );

    TableMapReduceUtil.initCredentials(job);

    job.waitForCompletion(true);
    return job;
  }
  /*
   * Each client has one mapper to do the work; the per-client results are then summed
   * by the reducer.
   */
  static String JOB_INPUT_FILENAME = "input.txt";
  /*
   * Write input file of offsets-per-client for the mapreduce job.
   * @param c Configuration
   * @return Directory that contains file written whose name is JOB_INPUT_FILENAME
   * @throws IOException if the input file cannot be written
   */
  static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
    return writeInputFile(c, opts, new Path("."));
  }

  static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
      throws IOException {
    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
    Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
    Path inputDir = new Path(jobdir, "inputs");
    FileSystem fs = FileSystem.get(c);
    fs.mkdirs(inputDir);
    Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
    PrintStream out = new PrintStream(fs.create(inputFile));
    // Make input random.
    Map<Integer, String> m = new TreeMap<>();
    Hash h = MurmurHash.getInstance();
    int perClientRows = (opts.totalRows / opts.numClientThreads);
    try {
      for (int j = 0; j < opts.numClientThreads; j++) {
        TestOptions next = new TestOptions(opts);
        next.startRow = j * perClientRows;
        next.perClientRunRows = perClientRows;
        String s = GSON.toJson(next);
        LOG.info("Client=" + j + ", input=" + s);
        byte[] b = Bytes.toBytes(s);
        int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
        m.put(hash, s);
      }
      for (Map.Entry<Integer, String> e : m.entrySet()) {
        out.println(e.getValue());
      }
    } finally {
      out.close();
    }
    return inputDir;
  }
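
  // Keying the TreeMap above by the MurmurHash of each serialized TestOptions shuffles
  // the per-client lines into an effectively random (but deterministic) order in the
  // input file.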
  /*
   * Describes a command.
   */
  static class CmdDescriptor {
    private Class<? extends TestBase> cmdClass;
    private String name;
    private String description;

    CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
      this.cmdClass = cmdClass;
      this.name = name;
      this.description = description;
    }

    public Class<? extends TestBase> getCmdClass() {
      return cmdClass;
    }

    public String getName() {
      return name;
    }

    public String getDescription() {
      return description;
    }
  }
  /**
   * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}.
   * This makes tracking all these arguments a little easier.
   * NOTE: ADDING AN OPTION, you need to add a data member, a getter/setter (to make JSON
   * serialization of this TestOptions class behave), and you need to add to the clone constructor
   * below copying your new option from the 'that' to the 'this'. Look for 'clone' below.
   */
  static class TestOptions {
    String cmdName = null;
    boolean nomapred = false;
    boolean filterAll = false;
    int startRow = 0;
    float size = 1.0f;
    int perClientRunRows = DEFAULT_ROWS_PER_GB;
    int numClientThreads = 1;
    int totalRows = DEFAULT_ROWS_PER_GB;
    int measureAfter = 0;
    float sampleRate = 1.0f;
    double traceRate = 0.0;
    String tableName = TABLE_NAME;
    boolean flushCommits = true;
    boolean writeToWAL = true;
    boolean autoFlush = false;
    boolean oneCon = false;
    int connCount = -1; // will decide the actual num later
    boolean useTags = false;
    int noOfTags = 1;
    boolean reportLatency = false;
    int multiGet = 0;
    int multiPut = 0;
    int randomSleep = 0;
    boolean inMemoryCF = false;
    int presplitRegions = 0;
    int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION;
    String splitPolicy = null;
    Compression.Algorithm compression = Compression.Algorithm.NONE;
    BloomType bloomType = BloomType.ROW;
    int blockSize = HConstants.DEFAULT_BLOCKSIZE;
    DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
    boolean valueRandom = false;
    boolean valueZipf = false;
    int valueSize = DEFAULT_VALUE_LENGTH;
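    // Default status-reporting period: every 10% of the per-client rows, or every row
    // when there are fewer than 10 rows.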
    int period = (this.perClientRunRows / 10) == 0 ? perClientRunRows : perClientRunRows / 10;
    int cycles = 1;
    int columns = 1;
    int families = 1;
    int caching = 30;
    boolean addColumns = true;
    MemoryCompactionPolicy inMemoryCompaction =
        MemoryCompactionPolicy.valueOf(
          CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
    boolean asyncPrefetch = false;
    boolean cacheBlocks = true;
    Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
    long bufferSize = 2L * 1024L * 1024L;

    public TestOptions() {}
    /**
     * Clone constructor.
     * @param that Object to copy from.
     */
    public TestOptions(TestOptions that) {
      this.cmdName = that.cmdName;
      this.cycles = that.cycles;
      this.nomapred = that.nomapred;
      this.startRow = that.startRow;
      this.size = that.size;
      this.perClientRunRows = that.perClientRunRows;
      this.numClientThreads = that.numClientThreads;
      this.totalRows = that.totalRows;
      this.sampleRate = that.sampleRate;
      this.traceRate = that.traceRate;
      this.tableName = that.tableName;
      this.flushCommits = that.flushCommits;
      this.writeToWAL = that.writeToWAL;
      this.autoFlush = that.autoFlush;
      this.oneCon = that.oneCon;
      this.connCount = that.connCount;
      this.useTags = that.useTags;
      this.noOfTags = that.noOfTags;
      this.reportLatency = that.reportLatency;
      this.multiGet = that.multiGet;
      this.multiPut = that.multiPut;
      this.inMemoryCF = that.inMemoryCF;
      this.presplitRegions = that.presplitRegions;
      this.replicas = that.replicas;
      this.splitPolicy = that.splitPolicy;
      this.compression = that.compression;
      this.blockEncoding = that.blockEncoding;
      this.filterAll = that.filterAll;
      this.bloomType = that.bloomType;
      this.blockSize = that.blockSize;
      this.valueRandom = that.valueRandom;
      this.valueZipf = that.valueZipf;
      this.valueSize = that.valueSize;
      this.period = that.period;
      this.randomSleep = that.randomSleep;
      this.measureAfter = that.measureAfter;
      this.addColumns = that.addColumns;
      this.columns = that.columns;
      this.families = that.families;
      this.caching = that.caching;
      this.inMemoryCompaction = that.inMemoryCompaction;
      this.asyncPrefetch = that.asyncPrefetch;
      this.cacheBlocks = that.cacheBlocks;
      this.scanReadType = that.scanReadType;
      this.bufferSize = that.bufferSize;
    }
    public int getCaching() {
      return this.caching;
    }

    public void setCaching(final int caching) {
      this.caching = caching;
    }

    public int getColumns() {
      return this.columns;
    }

    public void setColumns(final int columns) {
      this.columns = columns;
    }

    public int getFamilies() {
      return this.families;
    }

    public void setFamilies(final int families) {
      this.families = families;
    }

    public int getCycles() {
      return this.cycles;
    }

    public void setCycles(final int cycles) {
      this.cycles = cycles;
    }

    public boolean isValueZipf() {
      return valueZipf;
    }

    public void setValueZipf(boolean valueZipf) {
      this.valueZipf = valueZipf;
    }

    public String getCmdName() {
      return cmdName;
    }

    public void setCmdName(String cmdName) {
      this.cmdName = cmdName;
    }

    public int getRandomSleep() {
      return randomSleep;
    }

    public void setRandomSleep(int randomSleep) {
      this.randomSleep = randomSleep;
    }

    public int getReplicas() {
      return replicas;
    }

    public void setReplicas(int replicas) {
      this.replicas = replicas;
    }

    public String getSplitPolicy() {
      return splitPolicy;
    }

    public void setSplitPolicy(String splitPolicy) {
      this.splitPolicy = splitPolicy;
    }

    public void setNomapred(boolean nomapred) {
      this.nomapred = nomapred;
    }

    public void setFilterAll(boolean filterAll) {
      this.filterAll = filterAll;
    }

    public void setStartRow(int startRow) {
      this.startRow = startRow;
    }

    public void setSize(float size) {
      this.size = size;
    }

    public void setPerClientRunRows(int perClientRunRows) {
      this.perClientRunRows = perClientRunRows;
    }

    public void setNumClientThreads(int numClientThreads) {
      this.numClientThreads = numClientThreads;
    }

    public void setTotalRows(int totalRows) {
      this.totalRows = totalRows;
    }

    public void setSampleRate(float sampleRate) {
      this.sampleRate = sampleRate;
    }

    public void setTraceRate(double traceRate) {
      this.traceRate = traceRate;
    }

    public void setTableName(String tableName) {
      this.tableName = tableName;
    }

    public void setFlushCommits(boolean flushCommits) {
      this.flushCommits = flushCommits;
    }

    public void setWriteToWAL(boolean writeToWAL) {
      this.writeToWAL = writeToWAL;
    }

    public void setAutoFlush(boolean autoFlush) {
      this.autoFlush = autoFlush;
    }

    public void setOneCon(boolean oneCon) {
      this.oneCon = oneCon;
    }

    public int getConnCount() {
      return connCount;
    }

    public void setConnCount(int connCount) {
      this.connCount = connCount;
    }

    public void setUseTags(boolean useTags) {
      this.useTags = useTags;
    }

    public void setNoOfTags(int noOfTags) {
      this.noOfTags = noOfTags;
    }

    public void setReportLatency(boolean reportLatency) {
      this.reportLatency = reportLatency;
    }

    public void setMultiGet(int multiGet) {
      this.multiGet = multiGet;
    }

    public void setMultiPut(int multiPut) {
      this.multiPut = multiPut;
    }

    public void setInMemoryCF(boolean inMemoryCF) {
      this.inMemoryCF = inMemoryCF;
    }

    public void setPresplitRegions(int presplitRegions) {
      this.presplitRegions = presplitRegions;
    }

    public void setCompression(Compression.Algorithm compression) {
      this.compression = compression;
    }

    public void setBloomType(BloomType bloomType) {
      this.bloomType = bloomType;
    }

    public void setBlockSize(int blockSize) {
      this.blockSize = blockSize;
    }

    public void setBlockEncoding(DataBlockEncoding blockEncoding) {
      this.blockEncoding = blockEncoding;
    }

    public void setValueRandom(boolean valueRandom) {
      this.valueRandom = valueRandom;
    }

    public void setValueSize(int valueSize) {
      this.valueSize = valueSize;
    }

    public void setBufferSize(long bufferSize) {
      this.bufferSize = bufferSize;
    }

    public void setPeriod(int period) {
      this.period = period;
    }

    public boolean isNomapred() {
      return nomapred;
    }

    public boolean isFilterAll() {
      return filterAll;
    }

    public int getStartRow() {
      return startRow;
    }

    public float getSize() {
      return size;
    }

    public int getPerClientRunRows() {
      return perClientRunRows;
    }

    public int getNumClientThreads() {
      return numClientThreads;
    }

    public int getTotalRows() {
      return totalRows;
    }

    public float getSampleRate() {
      return sampleRate;
    }

    public double getTraceRate() {
      return traceRate;
    }

    public String getTableName() {
      return tableName;
    }

    public boolean isFlushCommits() {
      return flushCommits;
    }

    public boolean isWriteToWAL() {
      return writeToWAL;
    }

    public boolean isAutoFlush() {
      return autoFlush;
    }

    public boolean isUseTags() {
      return useTags;
    }

    public int getNoOfTags() {
      return noOfTags;
    }

    public boolean isReportLatency() {
      return reportLatency;
    }

    public int getMultiGet() {
      return multiGet;
    }

    public int getMultiPut() {
      return multiPut;
    }

    public boolean isInMemoryCF() {
      return inMemoryCF;
    }

    public int getPresplitRegions() {
      return presplitRegions;
    }

    public Compression.Algorithm getCompression() {
      return compression;
    }

    public DataBlockEncoding getBlockEncoding() {
      return blockEncoding;
    }

    public boolean isValueRandom() {
      return valueRandom;
    }

    public int getValueSize() {
      return valueSize;
    }

    public int getPeriod() {
      return period;
    }

    public BloomType getBloomType() {
      return bloomType;
    }

    public int getBlockSize() {
      return blockSize;
    }

    public boolean isOneCon() {
      return oneCon;
    }

    public int getMeasureAfter() {
      return measureAfter;
    }

    public void setMeasureAfter(int measureAfter) {
      this.measureAfter = measureAfter;
    }

    public boolean getAddColumns() {
      return addColumns;
    }

    public void setAddColumns(boolean addColumns) {
      this.addColumns = addColumns;
    }

    public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) {
      this.inMemoryCompaction = inMemoryCompaction;
    }

    public MemoryCompactionPolicy getInMemoryCompaction() {
      return this.inMemoryCompaction;
    }

    public long getBufferSize() {
      return this.bufferSize;
    }
  }
  /*
   * Subclass to particularize what happens per row.
   */
  static abstract class TestBase {
    // Below makes it so that when Tests are all running in the one JVM,
    // they each have a differently seeded Random.
    private static final Random randomSeed = new Random(System.currentTimeMillis());

    private static long nextRandomSeed() {
      return randomSeed.nextLong();
    }

    private final int everyN;

    protected final Random rand = new Random(nextRandomSeed());
    protected final Configuration conf;
    protected final TestOptions opts;

    private final Status status;
    private final Sampler traceSampler;
    private final SpanReceiverHost receiverHost;

    private String testName;
    private Histogram latencyHistogram;
    private Histogram valueSizeHistogram;
    private Histogram rpcCallsHistogram;
    private Histogram remoteRpcCallsHistogram;
    private Histogram millisBetweenNextHistogram;
    private Histogram regionsScannedHistogram;
    private Histogram bytesInResultsHistogram;
    private Histogram bytesInRemoteResultsHistogram;
    private RandomDistribution.Zipf zipf;

    /**
     * Note that all subclasses of this class must provide a public constructor
     * that has the exact same list of arguments.
     */
    TestBase(final Configuration conf, final TestOptions options, final Status status) {
      this.conf = conf;
      this.receiverHost = this.conf == null ? null : SpanReceiverHost.getInstance(conf);
      this.opts = options;
      this.status = status;
      this.testName = this.getClass().getSimpleName();
      if (options.traceRate >= 1.0) {
        this.traceSampler = Sampler.ALWAYS;
      } else if (options.traceRate > 0.0) {
        conf.setDouble("hbase.sampler.fraction", options.traceRate);
        this.traceSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(conf));
      } else {
        this.traceSampler = Sampler.NEVER;
      }
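      // everyN reduces to 1/sampleRate: e.g. sampleRate=0.25 gives everyN=4, and
      // testTimed() then exercises only row indexes divisible by 4.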
      everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
      if (options.isValueZipf()) {
        this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2);
      }
      LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
    }
    int getValueLength(final Random r) {
      if (this.opts.isValueRandom()) {
        return r.nextInt(opts.valueSize);
      } else if (this.opts.isValueZipf()) {
        return Math.abs(this.zipf.nextInt());
      } else {
        return opts.valueSize;
      }
    }

    void updateValueSize(final Result[] rs) throws IOException {
      if (rs == null || !isRandomValueSize()) return;
      for (Result r : rs) updateValueSize(r);
    }

    void updateValueSize(final Result r) throws IOException {
      if (r == null || !isRandomValueSize()) return;
      int size = 0;
      for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
        size += scanner.current().getValueLength();
      }
      updateValueSize(size);
    }

    void updateValueSize(final int valueSize) {
      if (!isRandomValueSize()) return;
      this.valueSizeHistogram.update(valueSize);
    }
    void updateScanMetrics(final ScanMetrics metrics) {
      if (metrics == null) return;
      Map<String, Long> metricsMap = metrics.getMetricsMap();
      Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
      if (rpcCalls != null) {
        this.rpcCallsHistogram.update(rpcCalls.longValue());
      }
      Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
      if (remoteRpcCalls != null) {
        this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
      }
      Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
      if (millisBetweenNext != null) {
        this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
      }
      Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
      if (regionsScanned != null) {
        this.regionsScannedHistogram.update(regionsScanned.longValue());
      }
      Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
      if (bytesInResults != null && bytesInResults.longValue() > 0) {
        this.bytesInResultsHistogram.update(bytesInResults.longValue());
      }
      Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
      if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
        this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
      }
    }
    String generateStatus(final int sr, final int i, final int lr) {
      return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() +
          (!isRandomValueSize() ? "" : ", value size " + getShortValueSizeReport());
    }

    boolean isRandomValueSize() {
      return opts.valueRandom;
    }

    protected int getReportingPeriod() {
      return opts.period;
    }

    /**
     * Populated by testTakedown. Only implemented by RandomReadTest at the moment.
     */
    public Histogram getLatencyHistogram() {
      return latencyHistogram;
    }

    void testSetup() throws IOException {
      latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
      valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
      rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
      remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
      millisBetweenNextHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
      regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
      bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
      bytesInRemoteResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));

      onStartup();
    }
    abstract void onStartup() throws IOException;

    void testTakedown() throws IOException {
      onTakedown();
      // Print all stats for this thread continuously.
      // Synchronize on Test.class so different threads don't intermingle the
      // output. We can't use 'this' here because each thread has its own instance of Test class.
      synchronized (Test.class) {
        status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName());
        status.setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(
            latencyHistogram));
        status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
        status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
        if (valueSizeHistogram.getCount() > 0) {
          status.setStatus("ValueSize (bytes) : "
              + YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
          status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
          status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
        } else {
          status.setStatus("No valueSize statistics available");
        }
        if (rpcCallsHistogram.getCount() > 0) {
          status.setStatus("rpcCalls (count): " +
              YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
        }
        if (remoteRpcCallsHistogram.getCount() > 0) {
          status.setStatus("remoteRpcCalls (count): " +
              YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram));
        }
        if (millisBetweenNextHistogram.getCount() > 0) {
          status.setStatus("millisBetweenNext (latency): " +
              YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram));
        }
        if (regionsScannedHistogram.getCount() > 0) {
          status.setStatus("regionsScanned (count): " +
              YammerHistogramUtils.getHistogramReport(regionsScannedHistogram));
        }
        if (bytesInResultsHistogram.getCount() > 0) {
          status.setStatus("bytesInResults (size): " +
              YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram));
        }
        if (bytesInRemoteResultsHistogram.getCount() > 0) {
          status.setStatus("bytesInRemoteResults (size): " +
              YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
        }
      }
      receiverHost.closeReceivers();
    }
    abstract void onTakedown() throws IOException;

    /*
     * Run test
     * @return Elapsed time.
     * @throws IOException if the test fails
     */
    long test() throws IOException, InterruptedException {
      testSetup();
      LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
      final long startTime = System.nanoTime();
      try {
        testTimed();
      } finally {
        testTakedown();
      }
      return (System.nanoTime() - startTime) / 1000000;
    }

    int getStartRow() {
      return opts.startRow;
    }

    int getLastRow() {
      return getStartRow() + opts.perClientRunRows;
    }
    /**
     * Provides an extension point for tests that don't want a per row invocation.
     */
    void testTimed() throws IOException, InterruptedException {
      int startRow = getStartRow();
      int lastRow = getLastRow();
      TraceUtil.addSampler(traceSampler);
      // Report on completion of 1/10th of total.
      for (int ii = 0; ii < opts.cycles; ii++) {
        if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
        for (int i = startRow; i < lastRow; i++) {
          if (i % everyN != 0) continue;
          long startTime = System.nanoTime();
          boolean requestSent = false;
          try (TraceScope scope = TraceUtil.createTrace("test row")) {
            requestSent = testRow(i);
          }
          if ((i - startRow) > opts.measureAfter) {
            // If multiget or multiput is enabled, say set to 10, testRow() returns immediately
            // first 9 times and sends the actual get request in the 10th iteration.
            // We should only set latency when actual request is sent because otherwise
            // it turns out to be 0.
            if (requestSent) {
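              // nanoTime() deltas are in ns; dividing by 1000 records microseconds.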
              latencyHistogram.update((System.nanoTime() - startTime) / 1000);
            }
            if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
              status.setStatus(generateStatus(startRow, i, lastRow));
            }
          }
        }
      }
    }
    /**
     * @return Subset of the histograms' calculation.
     */
    public String getShortLatencyReport() {
      return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram);
    }

    /**
     * @return Subset of the histograms' calculation.
     */
    public String getShortValueSizeReport() {
      return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram);
    }

    /**
     * Test for individual row.
     * @param i Row index.
     * @return true if the row was sent to the server and metrics should be recorded;
     *         false otherwise (e.g. with multiGet/multiPut the rows are sent to the
     *         server only once enough gets/puts have been gathered).
     */
    abstract boolean testRow(final int i) throws IOException, InterruptedException;
  }
  static abstract class Test extends TestBase {
    protected Connection connection;

    Test(final Connection con, final TestOptions options, final Status status) {
      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
      this.connection = con;
    }
  }

  static abstract class AsyncTest extends TestBase {
    protected AsyncConnection connection;

    AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) {
      super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
      this.connection = con;
    }
  }
  static abstract class TableTest extends Test {
    protected Table table;

    TableTest(Connection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    void onStartup() throws IOException {
      this.table = connection.getTable(TableName.valueOf(opts.tableName));
    }

    @Override
    void onTakedown() throws IOException {
      table.close();
    }
  }

  static abstract class AsyncTableTest extends AsyncTest {
    protected AsyncTable<?> table;

    AsyncTableTest(AsyncConnection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    void onStartup() throws IOException {
      this.table = connection.getTable(TableName.valueOf(opts.tableName));
    }

    @Override
    void onTakedown() throws IOException {
    }
  }
  static class AsyncRandomReadTest extends AsyncTableTest {
    private final Consistency consistency;
    private ArrayList<Get> gets;
    private Random rd = new Random();

    AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) {
      super(con, options, status);
      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
      if (opts.multiGet > 0) {
        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
        this.gets = new ArrayList<>(opts.multiGet);
      }
    }

    @Override
    boolean testRow(final int i) throws IOException, InterruptedException {
      if (opts.randomSleep > 0) {
        Thread.sleep(rd.nextInt(opts.randomSleep));
      }
      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
      for (int family = 0; family < opts.families; family++) {
        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
        if (opts.addColumns) {
          for (int column = 0; column < opts.columns; column++) {
            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
            get.addColumn(familyName, qualifier);
          }
        } else {
          get.addFamily(familyName);
        }
      }
      if (opts.filterAll) {
        get.setFilter(new FilterAllFilter());
      }
      get.setConsistency(consistency);
      if (LOG.isTraceEnabled()) LOG.trace(get.toString());
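      // With --multiGet=N the gets are buffered and issued as one batch every N rows;
      // propagate() below unwraps each future, rethrowing any failure unchecked.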
      try {
        if (opts.multiGet > 0) {
          this.gets.add(get);
          if (this.gets.size() == opts.multiGet) {
            Result[] rs =
                this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new);
            updateValueSize(rs);
            this.gets.clear();
          } else {
            return false;
          }
        } else {
          updateValueSize(this.table.get(get).get());
        }
      } catch (ExecutionException e) {
        throw new IOException(e);
      }
      return true;
    }
    public static RuntimeException runtime(Throwable e) {
      if (e instanceof RuntimeException) {
        return (RuntimeException) e;
      }
      return new RuntimeException(e);
    }

    public static <V> V propagate(Callable<V> callable) {
      try {
        return callable.call();
      } catch (Exception e) {
        throw runtime(e);
      }
    }

    @Override
    protected int getReportingPeriod() {
      int period = opts.perClientRunRows / 10;
      return period == 0 ? opts.perClientRunRows : period;
    }

    @Override
    protected void testTakedown() throws IOException {
      if (this.gets != null && this.gets.size() > 0) {
        this.table.get(gets);
        this.gets.clear();
      }
      super.testTakedown();
    }
  }
  static class AsyncRandomWriteTest extends AsyncSequentialWriteTest {
    AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    protected byte[] generateRow(final int i) {
      return getRandomRow(this.rand, opts.totalRows);
    }
  }
  static class AsyncScanTest extends AsyncTableTest {
    private ResultScanner testScanner;
    private AsyncTable<?> asyncTable;

    AsyncScanTest(AsyncConnection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    void onStartup() throws IOException {
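      // The extra executor passed to getTable() runs this AsyncTable's scan callbacks
      // (needed for the blocking getScanner API); it is sized here to the core count.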
      this.asyncTable =
          connection.getTable(TableName.valueOf(opts.tableName),
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
    }

    @Override
    void testTakedown() throws IOException {
      if (this.testScanner != null) {
        updateScanMetrics(this.testScanner.getScanMetrics());
        this.testScanner.close();
      }
      super.testTakedown();
    }

    @Override
    boolean testRow(final int i) throws IOException {
      if (this.testScanner == null) {
        Scan scan =
            new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
              .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
              .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
        for (int family = 0; family < opts.families; family++) {
          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
          if (opts.addColumns) {
            for (int column = 0; column < opts.columns; column++) {
              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
              scan.addColumn(familyName, qualifier);
            }
          } else {
            scan.addFamily(familyName);
          }
        }
        if (opts.filterAll) {
          scan.setFilter(new FilterAllFilter());
        }
        this.testScanner = asyncTable.getScanner(scan);
      }
      Result r = testScanner.next();
      updateValueSize(r);
      return true;
    }
  }
  static class AsyncSequentialReadTest extends AsyncTableTest {
    AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    boolean testRow(final int i) throws IOException, InterruptedException {
      Get get = new Get(format(i));
      for (int family = 0; family < opts.families; family++) {
        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
        if (opts.addColumns) {
          for (int column = 0; column < opts.columns; column++) {
            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
            get.addColumn(familyName, qualifier);
          }
        } else {
          get.addFamily(familyName);
        }
      }
      if (opts.filterAll) {
        get.setFilter(new FilterAllFilter());
      }
      try {
        updateValueSize(table.get(get).get());
      } catch (ExecutionException e) {
        throw new IOException(e);
      }
      return true;
    }
  }
  static class AsyncSequentialWriteTest extends AsyncTableTest {
    private ArrayList<Put> puts;

    AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) {
      super(con, options, status);
      if (opts.multiPut > 0) {
        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
        this.puts = new ArrayList<>(opts.multiPut);
      }
    }

    protected byte[] generateRow(final int i) {
      return format(i);
    }

    @Override
    @SuppressWarnings("ReturnValueIgnored")
    boolean testRow(final int i) throws IOException, InterruptedException {
      byte[] row = generateRow(i);
      Put put = new Put(row);
      for (int family = 0; family < opts.families; family++) {
        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
        for (int column = 0; column < opts.columns; column++) {
          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
          byte[] value = generateData(this.rand, getValueLength(this.rand));
          if (opts.useTags) {
            byte[] tag = generateData(this.rand, TAG_LENGTH);
            Tag[] tags = new Tag[opts.noOfTags];
            for (int n = 0; n < opts.noOfTags; n++) {
              Tag t = new ArrayBackedTag((byte) n, tag);
              tags[n] = t;
            }
            KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP,
                value, tags);
            put.add(kv);
            updateValueSize(kv.getValueLength());
          } else {
            put.addColumn(familyName, qualifier, value);
            updateValueSize(value.length);
          }
        }
      }
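      // --writeToWAL=false maps to SKIP_WAL: faster, but puts are lost if the region
      // server dies before the memstore is flushed.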
      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
      try {
        if (opts.multiPut > 0) {
          this.puts.add(put);
          if (this.puts.size() == opts.multiPut) {
            this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get));
            this.puts.clear();
          } else {
            return false;
          }
        } else {
          table.put(put).get();
        }
      } catch (ExecutionException e) {
        throw new IOException(e);
      }
      return true;
    }
  }
  static abstract class BufferedMutatorTest extends Test {
    protected BufferedMutator mutator;
    protected Table table;

    BufferedMutatorTest(Connection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    void onStartup() throws IOException {
      BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName));
      p.writeBufferSize(opts.bufferSize);
      this.mutator = connection.getBufferedMutator(p);
      this.table = connection.getTable(TableName.valueOf(opts.tableName));
    }

    @Override
    void onTakedown() throws IOException {
      mutator.close();
      table.close();
    }
  }
  static class RandomSeekScanTest extends TableTest {
    RandomSeekScanTest(Connection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    boolean testRow(final int i) throws IOException {
      Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows))
          .setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
          .setScanMetricsEnabled(true);
      FilterList list = new FilterList();
      for (int family = 0; family < opts.families; family++) {
        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
        if (opts.addColumns) {
          for (int column = 0; column < opts.columns; column++) {
            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
            scan.addColumn(familyName, qualifier);
          }
        } else {
          scan.addFamily(familyName);
        }
      }
      if (opts.filterAll) {
        list.addFilter(new FilterAllFilter());
      }
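      // WhileMatchFilter aborts the scan entirely once the wrapped PageFilter has
      // passed 120 rows, so each random "seek" reads a bounded slice of the table.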
      list.addFilter(new WhileMatchFilter(new PageFilter(120)));
      scan.setFilter(list);
      ResultScanner s = this.table.getScanner(scan);
      try {
        for (Result rr; (rr = s.next()) != null;) {
          updateValueSize(rr);
        }
      } finally {
        updateScanMetrics(s.getScanMetrics());
        s.close();
      }
      return true;
    }

    @Override
    protected int getReportingPeriod() {
      int period = opts.perClientRunRows / 100;
      return period == 0 ? opts.perClientRunRows : period;
    }
  }
  static abstract class RandomScanWithRangeTest extends TableTest {
    RandomScanWithRangeTest(Connection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    boolean testRow(final int i) throws IOException {
      Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
      Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
          .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
          .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
      for (int family = 0; family < opts.families; family++) {
        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
        if (opts.addColumns) {
          for (int column = 0; column < opts.columns; column++) {
            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
            scan.addColumn(familyName, qualifier);
          }
        } else {
          scan.addFamily(familyName);
        }
      }
      if (opts.filterAll) {
        scan.setFilter(new FilterAllFilter());
      }
      Result r = null;
      int count = 0;
      ResultScanner s = this.table.getScanner(scan);
      try {
        for (; (r = s.next()) != null;) {
          updateValueSize(r);
          count++;
        }
        if (i % 100 == 0) {
          LOG.info(String.format("Scan for key range %s - %s returned %s rows",
              Bytes.toString(startAndStopRow.getFirst()),
              Bytes.toString(startAndStopRow.getSecond()), count));
        }
      } finally {
        updateScanMetrics(s.getScanMetrics());
        s.close();
      }
      return true;
    }

    protected abstract Pair<byte[], byte[]> getStartAndStopRow();

    protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
      int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows;
      int stop = start + maxRange;
      return new Pair<>(format(start), format(stop));
    }
1810 protected int getReportingPeriod() {
1811 int period
= opts
.perClientRunRows
/ 100;
1812 return period
== 0? opts
.perClientRunRows
: period
;
  static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
    RandomScanWithRange10Test(Connection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    protected Pair<byte[], byte[]> getStartAndStopRow() {
      return generateStartAndStopRows(10);
    }
  }

  static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
    RandomScanWithRange100Test(Connection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    protected Pair<byte[], byte[]> getStartAndStopRow() {
      return generateStartAndStopRows(100);
    }
  }

  static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
    RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    protected Pair<byte[], byte[]> getStartAndStopRow() {
      return generateStartAndStopRows(1000);
    }
  }

  static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
    RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    protected Pair<byte[], byte[]> getStartAndStopRow() {
      return generateStartAndStopRows(10000);
    }
  }
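  /**
   * Issues point Gets against random rows, optionally batched into multi-gets and optionally
   * read with TIMELINE consistency when region replicas are enabled.
   */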
  static class RandomReadTest extends TableTest {
    private final Consistency consistency;
    private ArrayList<Get> gets;
    private Random rd = new Random();

    RandomReadTest(Connection con, TestOptions options, Status status) {
      super(con, options, status);
      consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
      if (opts.multiGet > 0) {
        LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
        this.gets = new ArrayList<>(opts.multiGet);
      }
    }

    @Override
    boolean testRow(final int i) throws IOException, InterruptedException {
      if (opts.randomSleep > 0) {
        Thread.sleep(rd.nextInt(opts.randomSleep));
      }
      Get get = new Get(getRandomRow(this.rand, opts.totalRows));
      for (int family = 0; family < opts.families; family++) {
        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
        if (opts.addColumns) {
          for (int column = 0; column < opts.columns; column++) {
            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
            get.addColumn(familyName, qualifier);
          }
        } else {
          get.addFamily(familyName);
        }
      }
      if (opts.filterAll) {
        get.setFilter(new FilterAllFilter());
      }
      get.setConsistency(consistency);
      if (LOG.isTraceEnabled()) {
        LOG.trace(get.toString());
      }
      if (opts.multiGet > 0) {
        this.gets.add(get);
        if (this.gets.size() == opts.multiGet) {
          Result[] rs = this.table.get(this.gets);
          updateValueSize(rs);
          this.gets.clear();
        }
      } else {
        updateValueSize(this.table.get(get));
      }
      return true;
    }

    @Override
    protected int getReportingPeriod() {
      int period = opts.perClientRunRows / 10;
      return period == 0 ? opts.perClientRunRows : period;
    }

    @Override
    protected void testTakedown() throws IOException {
      // Flush any partially-filled batch of gets before shutting down.
      if (this.gets != null && this.gets.size() > 0) {
        this.table.get(gets);
        this.gets.clear();
      }
      super.testTakedown();
    }
  }
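  /**
   * Same write path as {@link SequentialWriteTest}, but each row key is chosen at random from
   * the total key space.
   */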
  static class RandomWriteTest extends SequentialWriteTest {
    RandomWriteTest(Connection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    protected byte[] generateRow(final int i) {
      return getRandomRow(this.rand, opts.totalRows);
    }
  }
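  /**
   * Drives a single long-running scanner forward one row per invocation, opening it lazily on
   * the first call.
   */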
  static class ScanTest extends TableTest {
    private ResultScanner testScanner;

    ScanTest(Connection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    void testTakedown() throws IOException {
      if (this.testScanner != null) {
        this.testScanner.close();
      }
      super.testTakedown();
    }

    @Override
    boolean testRow(final int i) throws IOException {
      if (this.testScanner == null) {
        Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
            .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
            .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
        for (int family = 0; family < opts.families; family++) {
          byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
          if (opts.addColumns) {
            for (int column = 0; column < opts.columns; column++) {
              byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
              scan.addColumn(familyName, qualifier);
            }
          } else {
            scan.addFamily(familyName);
          }
        }
        if (opts.filterAll) {
          scan.setFilter(new FilterAllFilter());
        }
        this.testScanner = table.getScanner(scan);
      }
      Result r = testScanner.next();
      updateValueSize(r);
      return true;
    }
  }
  /**
   * Base class for operations that are CAS-like; that read a value and then set it based off what
   * they read. In this category are increment, append, checkAndPut, etc.
   *
   * <p>These operations also want some concurrency going on. Usually when these tests run, they
   * operate in their own part of the key range. In CASTest, we will have them all overlap on the
   * same key space. We do this with our getStartRow and getLastRow overrides.
   */
  static abstract class CASTableTest extends TableTest {
    private final byte[] qualifier;

    CASTableTest(Connection con, TestOptions options, Status status) {
      super(con, options, status);
      qualifier = Bytes.toBytes(this.getClass().getSimpleName());
    }

    byte[] getQualifier() {
      return this.qualifier;
    }

    @Override
    int getStartRow() {
      return 0;
    }

    @Override
    int getLastRow() {
      return opts.perClientRunRows;
    }
  }
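  /**
   * Exercises {@link Table#increment(Increment)} over the shared CAS key space. An illustrative
   * invocation, mirroring the usage examples later in this file:
   * <pre>$ hbase pe --rows=10 --nomapred increment 10</pre>
   */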
  static class IncrementTest extends CASTableTest {
    IncrementTest(Connection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    boolean testRow(final int i) throws IOException {
      Increment increment = new Increment(format(i));
      // unlike checkAndXXX tests, which make most sense to do on a single value,
      // if multiple families are specified for an increment test we assume it is
      // meant to raise the work factor
      for (int family = 0; family < opts.families; family++) {
        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
        increment.addColumn(familyName, getQualifier(), 1L);
      }
      updateValueSize(this.table.increment(increment));
      return true;
    }
  }
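  /**
   * Exercises {@link Table#append(Append)}, appending the formatted row key to the cell on each
   * pass so the stored value grows as the test runs.
   */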
  static class AppendTest extends CASTableTest {
    AppendTest(Connection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    boolean testRow(final int i) throws IOException {
      byte[] bytes = format(i);
      Append append = new Append(bytes);
      // unlike checkAndXXX tests, which make most sense to do on a single value,
      // if multiple families are specified for an append test we assume it is
      // meant to raise the work factor
      for (int family = 0; family < opts.families; family++) {
        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
        append.addColumn(familyName, getQualifier(), bytes);
      }
      updateValueSize(this.table.append(append));
      return true;
    }
  }
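  /**
   * checkAndMutate variant of the checkAndXXX family: writes a known value, then applies a
   * RowMutations batch only if the stored value still matches.
   */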
  static class CheckAndMutateTest extends CASTableTest {
    CheckAndMutateTest(Connection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    boolean testRow(final int i) throws IOException {
      final byte[] bytes = format(i);
      // checkAndXXX tests operate on only a single value
      // Put a known value so when we go to check it, it is there.
      Put put = new Put(bytes);
      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
      this.table.put(put);
      RowMutations mutations = new RowMutations(bytes);
      mutations.add(put);
      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
          .ifEquals(bytes).thenMutate(mutations);
      return true;
    }
  }
  static class CheckAndPutTest extends CASTableTest {
    CheckAndPutTest(Connection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    boolean testRow(final int i) throws IOException {
      final byte[] bytes = format(i);
      // checkAndXXX tests operate on only a single value
      // Put a known value so when we go to check it, it is there.
      Put put = new Put(bytes);
      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
      this.table.put(put);
      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
          .ifEquals(bytes).thenPut(put);
      return true;
    }
  }
  static class CheckAndDeleteTest extends CASTableTest {
    CheckAndDeleteTest(Connection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    boolean testRow(final int i) throws IOException {
      final byte[] bytes = format(i);
      // checkAndXXX tests operate on only a single value
      // Put a known value so when we go to check it, it is there.
      Put put = new Put(bytes);
      put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
      this.table.put(put);
      Delete delete = new Delete(put.getRow());
      delete.addColumn(FAMILY_ZERO, getQualifier());
      this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
          .ifEquals(bytes).thenDelete(delete);
      return true;
    }
  }
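  /**
   * Reads rows back in sequential key order; the counterpart to {@link SequentialWriteTest}.
   */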
  static class SequentialReadTest extends TableTest {
    SequentialReadTest(Connection con, TestOptions options, Status status) {
      super(con, options, status);
    }

    @Override
    boolean testRow(final int i) throws IOException {
      Get get = new Get(format(i));
      for (int family = 0; family < opts.families; family++) {
        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
        if (opts.addColumns) {
          for (int column = 0; column < opts.columns; column++) {
            byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
            get.addColumn(familyName, qualifier);
          }
        } else {
          get.addFamily(familyName);
        }
      }
      if (opts.filterAll) {
        get.setFilter(new FilterAllFilter());
      }
      updateValueSize(table.get(get));
      return true;
    }
  }
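  /**
   * Writes rows in sequential key order through a BufferedMutator, optionally batching puts
   * (multiPut) and optionally attaching tags to each cell.
   */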
  static class SequentialWriteTest extends BufferedMutatorTest {
    private ArrayList<Put> puts;

    SequentialWriteTest(Connection con, TestOptions options, Status status) {
      super(con, options, status);
      if (opts.multiPut > 0) {
        LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
        this.puts = new ArrayList<>(opts.multiPut);
      }
    }

    protected byte[] generateRow(final int i) {
      return format(i);
    }

    @Override
    boolean testRow(final int i) throws IOException {
      byte[] row = generateRow(i);
      Put put = new Put(row);
      for (int family = 0; family < opts.families; family++) {
        byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
        for (int column = 0; column < opts.columns; column++) {
          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
          byte[] value = generateData(this.rand, getValueLength(this.rand));
          if (opts.useTags) {
            byte[] tag = generateData(this.rand, TAG_LENGTH);
            Tag[] tags = new Tag[opts.noOfTags];
            for (int n = 0; n < opts.noOfTags; n++) {
              Tag t = new ArrayBackedTag((byte) n, tag);
              tags[n] = t;
            }
            KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP,
                value, tags);
            put.add(kv);
            updateValueSize(kv.getValueLength());
          } else {
            put.addColumn(familyName, qualifier, value);
            updateValueSize(value.length);
          }
        }
      }
      put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
      if (opts.autoFlush) {
        if (opts.multiPut > 0) {
          this.puts.add(put);
          if (this.puts.size() == opts.multiPut) {
            table.put(this.puts);
            this.puts.clear();
          }
        } else {
          table.put(put);
        }
      } else {
        mutator.mutate(put);
      }
      return true;
    }
  }
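  /**
   * Full-table scan that pushes a SingleColumnValueFilter to the server, so the cost is mostly
   * server-side filtering rather than client-side transfer.
   */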
  static class FilteredScanTest extends TableTest {
    protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName());

    FilteredScanTest(Connection con, TestOptions options, Status status) {
      super(con, options, status);
      if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) {
        LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB +
            ". This could take a very long time.");
      }
    }

    @Override
    boolean testRow(int i) throws IOException {
      byte[] value = generateData(this.rand, getValueLength(this.rand));
      Scan scan = constructScan(value);
      ResultScanner scanner = null;
      try {
        scanner = this.table.getScanner(scan);
        for (Result r = null; (r = scanner.next()) != null;) {
          updateValueSize(r);
        }
      } finally {
        if (scanner != null) {
          updateScanMetrics(scanner.getScanMetrics());
          scanner.close();
        }
      }
      return true;
    }

    protected Scan constructScan(byte[] valuePrefix) throws IOException {
      FilterList list = new FilterList();
      Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO,
          CompareOperator.EQUAL, new BinaryComparator(valuePrefix));
      list.addFilter(filter);
      if (opts.filterAll) {
        list.addFilter(new FilterAllFilter());
      }
      Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
          .setScanMetricsEnabled(true);
      if (opts.addColumns) {
        for (int column = 0; column < opts.columns; column++) {
          byte[] qualifier = column == 0 ? COLUMN_ZERO : Bytes.toBytes("" + column);
          scan.addColumn(FAMILY_ZERO, qualifier);
        }
      } else {
        scan.addFamily(FAMILY_ZERO);
      }
      scan.setFilter(list);
      return scan;
    }
  }
  /**
   * Compute a throughput rate in MB/s.
   * @param rows Number of records consumed.
   * @param timeMs Time taken in milliseconds.
   * @return String value with label, i.e. '123.76 MB/s'
   */
  private static String calculateMbps(int rows, long timeMs, final int valueSize, int families,
      int columns) {
    BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH +
        ((valueSize + (FAMILY_NAME_BASE.length() + 1) + COLUMN_ZERO.length) * columns) * families);
    BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
        .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT)
        .divide(BYTES_PER_MB, CXT);
    return FMT.format(mbps) + " MB/s";
  }
  /**
   * Format passed integer.
   * @param number the integer to format as a row key
   * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed
   *   number (Does absolute in case number is negative).
   */
  public static byte[] format(final int number) {
    byte[] b = new byte[ROW_LENGTH];
    int d = Math.abs(number);
    for (int i = b.length - 1; i >= 0; i--) {
      b[i] = (byte) ((d % 10) + '0');
      d /= 10;
    }
    return b;
  }
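  // For example, assuming ROW_LENGTH is 26 (its value is defined elsewhere in this class),
  // format(123) yields a 26-byte key of twenty-three '0' characters followed by "123".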
  /**
   * This method takes some time and is done inline uploading data. For
   * example, doing the mapfile test, generation of the key and value
   * consumes about 30% of CPU time.
   * @return Generated random value to insert into a table cell.
   */
  public static byte[] generateData(final Random r, int length) {
    byte[] b = new byte[length];
    int i;

    // Fill eight bytes at a time with the same random letter; one RNG call per run keeps
    // generation cheap relative to the I/O being measured.
    for (i = 0; i < (length - 8); i += 8) {
      b[i] = (byte) (65 + r.nextInt(26));
      b[i + 1] = b[i];
      b[i + 2] = b[i];
      b[i + 3] = b[i];
      b[i + 4] = b[i];
      b[i + 5] = b[i];
      b[i + 6] = b[i];
      b[i + 7] = b[i];
    }

    byte a = (byte) (65 + r.nextInt(26));
    for (; i < length; i++) {
      b[i] = a;
    }
    return b;
  }
  static byte[] getRandomRow(final Random random, final int totalRows) {
    return format(generateRandomRow(random, totalRows));
  }

  static int generateRandomRow(final Random random, final int totalRows) {
    return random.nextInt(Integer.MAX_VALUE) % totalRows;
  }
  static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf,
      Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status)
      throws IOException, InterruptedException {
    status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for "
        + opts.perClientRunRows + " rows");
    long totalElapsedTime;

    final TestBase t;
    try {
      // Construct the requested test reflectively; async tests take an AsyncConnection,
      // everything else takes a Connection.
      if (AsyncTest.class.isAssignableFrom(cmd)) {
        Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd;
        Constructor<? extends AsyncTest> constructor =
            newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class);
        t = constructor.newInstance(asyncCon, opts, status);
      } else {
        Class<? extends Test> newCmd = (Class<? extends Test>) cmd;
        Constructor<? extends Test> constructor =
            newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
        t = constructor.newInstance(con, opts, status);
      }
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException("Invalid command class: " + cmd.getName()
          + ". It does not provide a constructor as described by "
          + "the javadoc comment. Available constructors are: "
          + Arrays.toString(cmd.getConstructors()));
    } catch (Exception e) {
      throw new IllegalStateException("Failed to construct command class", e);
    }
    totalElapsedTime = t.test();

    status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
        "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" +
        " (" + calculateMbps((int) (opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
            getAverageValueLength(opts), opts.families, opts.columns) + ")");

    return new RunResult(totalElapsedTime, t.getLatencyHistogram());
  }
  private static int getAverageValueLength(final TestOptions opts) {
    return opts.valueRandom ? opts.valueSize / 2 : opts.valueSize;
  }
  private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) throws IOException,
      InterruptedException, ClassNotFoundException, ExecutionException {
    // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
    // the TestOptions introspection for us and dump the output in a readable format.
    LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts));
    Admin admin = null;
    Connection connection = null;
    try {
      connection = ConnectionFactory.createConnection(getConf());
      admin = connection.getAdmin();
      checkTable(admin, opts);
    } finally {
      if (admin != null) admin.close();
      if (connection != null) connection.close();
    }
    if (opts.nomapred) {
      doLocalClients(opts, getConf());
    } else {
      doMapReduce(opts, getConf());
    }
  }
  protected void printUsage() {
    printUsage(PE_COMMAND_SHORTNAME, null);
  }

  protected static void printUsage(final String message) {
    printUsage(PE_COMMAND_SHORTNAME, message);
  }

  protected static void printUsageAndExit(final String message, final int exitCode) {
    printUsage(message);
    System.exit(exitCode);
  }

  protected static void printUsage(final String shortName, final String message) {
    if (message != null && message.length() > 0) {
      System.err.println(message);
    }
    System.err.print("Usage: hbase " + shortName);
    System.err.println(" <OPTIONS> [-D<property=value>]* <command> <nclients>");
    System.err.println();
    System.err.println("General Options:");
    System.err.println(" nomapred        Run multiple clients using threads " +
        "(rather than use mapreduce)");
    System.err.println(" oneCon          All the threads share the same connection. Default: False");
    System.err.println(" connCount       Connections all threads share. "
        + "For example, if set to 2, then all threads share 2 connections. "
        + "Default: depends on oneCon parameter. If oneCon is set to true, then connCount=1; "
        + "if not, connCount=thread number");
    System.err.println(" sampleRate      Execute test on a sample of total " +
        "rows. Only supported by randomRead. Default: 1.0");
    System.err.println(" period          Report every 'period' rows: " +
        "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows() / 10);
    System.err.println(" cycles          How many times to cycle the test. Defaults: 1.");
    System.err.println(" traceRate       Enable HTrace spans. Initiate tracing every N rows. " +
        "Default: 0");
    System.err.println(" latency         Set to report operation latencies. Default: False");
    System.err.println(" measureAfter    Start to measure the latency once 'measureAfter'" +
        " rows have been treated. Default: 0");
    System.err.println(" valueSize       Pass value size to use: Default: "
        + DEFAULT_OPTS.getValueSize());
    System.err.println(" valueRandom     Set if we should vary value size between 0 and " +
        "'valueSize'; set on read for stats on size: Default: Not set.");
    System.err.println(" blockEncoding   Block encoding to use. Value should be one of "
        + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE");
    System.err.println();
    System.err.println("Table Creation / Write Tests:");
    System.err.println(" table           Alternate table name. Default: 'TestTable'");
    System.err.println(" rows            Rows each client runs. Default: "
        + DEFAULT_OPTS.getPerClientRunRows()
        + ". In case of randomReads and randomSeekScans this could"
        + " be specified along with --size to specify the number of rows to be scanned within"
        + " the total range specified by the size.");
    System.err.println(
        " size            Total size in GiB. Mutually exclusive with --rows for writes and scans"
        + ". But for randomReads and randomSeekScans when you use size with --rows you could"
        + " use size to specify the end range and --rows"
        + " specifies the number of rows within that range. " + "Default: 1.0.");
    System.err.println(" compress        Compression type to use (GZ, LZO, ...). Default: 'NONE'");
    System.err.println(" flushCommits    Used to determine if the test should flush the table. " +
        "Default: false");
    System.err.println(" valueZipf       Set if we should vary value size between 0 and " +
        "'valueSize' in zipf form: Default: Not set.");
    System.err.println(" writeToWAL      Set writeToWAL on puts. Default: True");
    System.err.println(" autoFlush       Set autoFlush on htable. Default: False");
    System.err.println(" multiPut        Batch puts together into groups of N. Only supported " +
        "by write. If multiPut is bigger than 0, autoFlush needs to be set to true. Default: 0");
    System.err.println(" presplit        Create presplit table. If a table with same name exists,"
        + " it'll be deleted and recreated (instead of verifying count of its existing regions). "
        + "Recommended for accurate perf analysis (see guide). Default: disabled");
    System.err.println(" usetags         Writes tags along with KVs. Use with HFile V3. " +
        "Default: false");
    System.err.println(" numoftags       Specify the number of tags that would be needed. " +
        "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags);
    System.err.println(" splitPolicy     Specify a custom RegionSplitPolicy for the table.");
    System.err.println(" columns         Columns to write per row. Default: 1");
    System.err.println(" families        Specify number of column families for the table. Default: 1");
    System.err.println();
    System.err.println("Read Tests:");
    System.err.println(" filterAll       Helps to filter out all the rows on the server side"
        + " thereby not returning anything back to the client. Helps to check the server side"
        + " performance. Uses FilterAllFilter internally.");
    System.err.println(" multiGet        Batch gets together into groups of N. Only supported " +
        "by randomRead. Default: disabled");
    System.err.println(" inmemory        Tries to keep the HFiles of the CF " +
        "inmemory as far as possible. Not guaranteed that reads are always served " +
        "from memory. Default: false");
    System.err.println(" bloomFilter     Bloom filter type, one of "
        + Arrays.toString(BloomType.values()));
    System.err.println(" blockSize       Blocksize to use when writing out hfiles.");
    System.err.println(" inmemoryCompaction  Makes the column family do in-memory flushes/compactions. "
        + "Uses the CompactingMemstore");
    System.err.println(" addColumns      Adds columns to scans/gets explicitly. Default: true");
    System.err.println(" replicas        Enable region replica testing. Defaults: 1.");
    System.err.println(" randomSleep     Do a random sleep before each get between 0 and entered value. Defaults: 0");
    System.err.println(" caching         Scan caching to use. Default: 30");
    System.err.println(" asyncPrefetch   Enable asyncPrefetch for scan");
    System.err.println(" cacheBlocks     Set the cacheBlocks option for scan. Default: true");
    System.err.println(" scanReadType    Set the readType option for scan, stream/pread/default. Default: default");
    System.err.println(" bufferSize      Set the value of client side buffering. Default: 2MB");
    System.err.println();
    System.err.println(" Note: -D properties will be applied to the conf used.");
    System.err.println("  For example:");
    System.err.println("   -Dmapreduce.output.fileoutputformat.compress=true");
    System.err.println("   -Dmapreduce.task.timeout=60000");
    System.err.println();
    System.err.println("Command:");
    for (CmdDescriptor command : COMMANDS.values()) {
      System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription()));
    }
    System.err.println();
    System.err.println("Args:");
    System.err.println(" nclients        Integer. Required. Total number of clients "
        + "(and HRegionServers) running. 1 <= value <= 500");
    System.err.println("Examples:");
    System.err.println(" To run a single client doing the default 1M sequentialWrites:");
    System.err.println(" $ hbase " + shortName + " sequentialWrite 1");
    System.err.println(" To run 10 clients doing increments over ten rows:");
    System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10");
  }
  /**
   * Parse options passed in via an arguments array. Assumes that array has been split
   * on white-space and placed into a {@code Queue}. Any unknown arguments will remain
   * in the queue at the conclusion of this method call. It's up to the caller to deal
   * with these unrecognized arguments.
   */
  static TestOptions parseOpts(Queue<String> args) {
    TestOptions opts = new TestOptions();

    String cmd = null;
    while ((cmd = args.poll()) != null) {
      if (cmd.equals("-h") || cmd.startsWith("--h")) {
        // place item back onto queue so that caller knows parsing was incomplete
        args.add(cmd);
        break;
      }

      final String nmr = "--nomapred";
      if (cmd.startsWith(nmr)) {
        opts.nomapred = true;
        continue;
      }

      final String rows = "--rows=";
      if (cmd.startsWith(rows)) {
        opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
        continue;
      }

      final String cycles = "--cycles=";
      if (cmd.startsWith(cycles)) {
        opts.cycles = Integer.parseInt(cmd.substring(cycles.length()));
        continue;
      }

      final String sampleRate = "--sampleRate=";
      if (cmd.startsWith(sampleRate)) {
        opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
        continue;
      }

      final String table = "--table=";
      if (cmd.startsWith(table)) {
        opts.tableName = cmd.substring(table.length());
        continue;
      }

      final String startRow = "--startRow=";
      if (cmd.startsWith(startRow)) {
        opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
        continue;
      }

      final String compress = "--compress=";
      if (cmd.startsWith(compress)) {
        opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
        continue;
      }

      final String traceRate = "--traceRate=";
      if (cmd.startsWith(traceRate)) {
        opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
        continue;
      }

      final String blockEncoding = "--blockEncoding=";
      if (cmd.startsWith(blockEncoding)) {
        opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
        continue;
      }

      final String flushCommits = "--flushCommits=";
      if (cmd.startsWith(flushCommits)) {
        opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
        continue;
      }

      final String writeToWAL = "--writeToWAL=";
      if (cmd.startsWith(writeToWAL)) {
        opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
        continue;
      }

      final String presplit = "--presplit=";
      if (cmd.startsWith(presplit)) {
        opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
        continue;
      }

      final String inMemory = "--inmemory=";
      if (cmd.startsWith(inMemory)) {
        opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
        continue;
      }

      final String autoFlush = "--autoFlush=";
      if (cmd.startsWith(autoFlush)) {
        opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
        if (!opts.autoFlush && opts.multiPut > 0) {
          throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
        }
        continue;
      }

      final String onceCon = "--oneCon=";
      if (cmd.startsWith(onceCon)) {
        opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
        if (opts.oneCon && opts.connCount > 1) {
          throw new IllegalArgumentException("oneCon is set to true, "
              + "connCount should not be bigger than 1");
        }
        continue;
      }

      final String connCount = "--connCount=";
      if (cmd.startsWith(connCount)) {
        opts.connCount = Integer.parseInt(cmd.substring(connCount.length()));
        if (opts.oneCon && opts.connCount > 1) {
          throw new IllegalArgumentException("oneCon is set to true, "
              + "connCount should not be bigger than 1");
        }
        continue;
      }

      final String latency = "--latency";
      if (cmd.startsWith(latency)) {
        opts.reportLatency = true;
        continue;
      }

      final String multiGet = "--multiGet=";
      if (cmd.startsWith(multiGet)) {
        opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
        continue;
      }

      final String multiPut = "--multiPut=";
      if (cmd.startsWith(multiPut)) {
        opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length()));
        if (!opts.autoFlush && opts.multiPut > 0) {
          throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
        }
        continue;
      }

      final String useTags = "--usetags=";
      if (cmd.startsWith(useTags)) {
        opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
        continue;
      }

      final String noOfTags = "--numoftags=";
      if (cmd.startsWith(noOfTags)) {
        opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
        continue;
      }

      final String replicas = "--replicas=";
      if (cmd.startsWith(replicas)) {
        opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
        continue;
      }

      final String filterOutAll = "--filterAll";
      if (cmd.startsWith(filterOutAll)) {
        opts.filterAll = true;
        continue;
      }

      final String size = "--size=";
      if (cmd.startsWith(size)) {
        opts.size = Float.parseFloat(cmd.substring(size.length()));
        if (opts.size <= 1.0f) {
          throw new IllegalStateException("Size must be > 1; i.e. 1GB");
        }
        continue;
      }

      final String splitPolicy = "--splitPolicy=";
      if (cmd.startsWith(splitPolicy)) {
        opts.splitPolicy = cmd.substring(splitPolicy.length());
        continue;
      }

      final String randomSleep = "--randomSleep=";
      if (cmd.startsWith(randomSleep)) {
        opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length()));
        continue;
      }

      final String measureAfter = "--measureAfter=";
      if (cmd.startsWith(measureAfter)) {
        opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length()));
        continue;
      }

      final String bloomFilter = "--bloomFilter=";
      if (cmd.startsWith(bloomFilter)) {
        opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
        continue;
      }

      final String blockSize = "--blockSize=";
      if (cmd.startsWith(blockSize)) {
        opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length()));
        continue;
      }

      final String valueSize = "--valueSize=";
      if (cmd.startsWith(valueSize)) {
        opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
        continue;
      }

      final String valueRandom = "--valueRandom";
      if (cmd.startsWith(valueRandom)) {
        opts.valueRandom = true;
        if (opts.valueZipf) {
          throw new IllegalStateException("Either valueZipf or valueRandom but not both");
        }
        continue;
      }

      final String valueZipf = "--valueZipf";
      if (cmd.startsWith(valueZipf)) {
        opts.valueZipf = true;
        if (opts.valueRandom) {
          throw new IllegalStateException("Either valueZipf or valueRandom but not both");
        }
        continue;
      }

      final String period = "--period=";
      if (cmd.startsWith(period)) {
        opts.period = Integer.parseInt(cmd.substring(period.length()));
        continue;
      }

      final String addColumns = "--addColumns=";
      if (cmd.startsWith(addColumns)) {
        opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
        continue;
      }

      final String inMemoryCompaction = "--inmemoryCompaction=";
      if (cmd.startsWith(inMemoryCompaction)) {
        opts.inMemoryCompaction =
            MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length()));
        continue;
      }

      final String columns = "--columns=";
      if (cmd.startsWith(columns)) {
        opts.columns = Integer.parseInt(cmd.substring(columns.length()));
        continue;
      }

      final String families = "--families=";
      if (cmd.startsWith(families)) {
        opts.families = Integer.parseInt(cmd.substring(families.length()));
        continue;
      }

      final String caching = "--caching=";
      if (cmd.startsWith(caching)) {
        opts.caching = Integer.parseInt(cmd.substring(caching.length()));
        continue;
      }

      final String asyncPrefetch = "--asyncPrefetch";
      if (cmd.startsWith(asyncPrefetch)) {
        opts.asyncPrefetch = true;
        continue;
      }

      final String cacheBlocks = "--cacheBlocks=";
      if (cmd.startsWith(cacheBlocks)) {
        opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length()));
        continue;
      }

      final String scanReadType = "--scanReadType=";
      if (cmd.startsWith(scanReadType)) {
        opts.scanReadType =
            Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase());
        continue;
      }

      final String bufferSize = "--bufferSize=";
      if (cmd.startsWith(bufferSize)) {
        opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length()));
        continue;
      }

      if (isCommandClass(cmd)) {
        opts.cmdName = cmd;
        try {
          opts.numClientThreads = Integer.parseInt(args.remove());
        } catch (NoSuchElementException | NumberFormatException e) {
          throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e);
        }
        opts = calculateRowsAndSize(opts);
        break;
      } else {
        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
      }

      // Not matching any option or command.
      System.err.println("Error: Wrong option or command: " + cmd);
      args.add(cmd);
      break;
    }
    return opts;
  }
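  // Illustrative walk-through (assuming the COMMANDS registry contains "randomRead"):
  // parsing the queue ["--nomapred", "--rows=100", "randomRead", "2"] sets opts.nomapred=true,
  // opts.perClientRunRows=100, opts.cmdName="randomRead" and opts.numClientThreads=2, then
  // runs calculateRowsAndSize(opts) before returning.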
  static TestOptions calculateRowsAndSize(final TestOptions opts) {
    int rowsPerGB = getRowsPerGB(opts);
    if ((opts.getCmdName() != null
        && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN)))
        && opts.size != DEFAULT_OPTS.size
        && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
      opts.totalRows = (int) opts.size * rowsPerGB;
    } else if (opts.size != DEFAULT_OPTS.size) {
      // total size in GB specified
      opts.totalRows = (int) opts.size * rowsPerGB;
      opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
    } else {
      opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
      opts.size = opts.totalRows / rowsPerGB;
    }
    return opts;
  }
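  // Example of the branches above: with --size=2.0 and 4 client threads, totalRows becomes
  // 2 * getRowsPerGB(opts) and each client runs a quarter of that; with --rows alone, size is
  // instead derived from totalRows / rowsPerGB.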
  static int getRowsPerGB(final TestOptions opts) {
    return ONE_GB / ((opts.valueRandom ? opts.valueSize / 2 : opts.valueSize) * opts.getFamilies() *
        opts.getColumns());
  }
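  // With one family, one column and the default value size, this is roughly ONE_GB divided by
  // the value size, i.e. on the order of a million rows per GB (the exact constants are
  // defined elsewhere in this class).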
  @Override
  public int run(String[] args) throws Exception {
    // Process command-line args. TODO: Better cmd-line processing
    // (but hopefully something not as painful as cli options).
    int errCode = -1;
    if (args.length < 1) {
      printUsage();
      return errCode;
    }

    try {
      LinkedList<String> argv = new LinkedList<>();
      argv.addAll(Arrays.asList(args));
      TestOptions opts = parseOpts(argv);

      // args remaining, print help and exit
      if (!argv.isEmpty()) {
        errCode = 0;
        printUsage();
        return errCode;
      }

      // must run at least 1 client
      if (opts.numClientThreads <= 0) {
        throw new IllegalArgumentException("Number of clients must be > 0");
      }

      // cmdName should not be null, print help and exit
      if (opts.cmdName == null) {
        printUsage();
        return errCode;
      }

      Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName);
      if (cmdClass != null) {
        runTest(cmdClass, opts);
        errCode = 0;
      }
    } catch (Exception e) {
      e.printStackTrace();
    }

    return errCode;
  }
  private static boolean isCommandClass(String cmd) {
    return COMMANDS.containsKey(cmd);
  }

  private static Class<? extends TestBase> determineCommandClass(String cmd) {
    CmdDescriptor descriptor = COMMANDS.get(cmd);
    return descriptor != null ? descriptor.getCmdClass() : null;
  }
  public static void main(final String[] args) throws Exception {
    int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
    System.exit(res);
  }
}