HBASE-23868 : Replace usages of HColumnDescriptor(byte [] familyName)… (#1222)
[hbase.git] / hbase-mapreduce / src / test / java / org / apache / hadoop / hbase / PerformanceEvaluation.java
blobcb6dc1b354cfa1c27b900c5fdfe975d60fd7d5aa
1 /**
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org.apache.hadoop.hbase;
21 import com.codahale.metrics.Histogram;
22 import com.codahale.metrics.UniformReservoir;
23 import java.io.IOException;
24 import java.io.PrintStream;
25 import java.lang.reflect.Constructor;
26 import java.math.BigDecimal;
27 import java.math.MathContext;
28 import java.text.DecimalFormat;
29 import java.text.SimpleDateFormat;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.Date;
33 import java.util.LinkedList;
34 import java.util.Locale;
35 import java.util.Map;
36 import java.util.NoSuchElementException;
37 import java.util.Queue;
38 import java.util.Random;
39 import java.util.TreeMap;
40 import java.util.concurrent.Callable;
41 import java.util.concurrent.ExecutionException;
42 import java.util.concurrent.ExecutorService;
43 import java.util.concurrent.Executors;
44 import java.util.concurrent.Future;
45 import org.apache.commons.lang3.StringUtils;
46 import org.apache.hadoop.conf.Configuration;
47 import org.apache.hadoop.conf.Configured;
48 import org.apache.hadoop.fs.FileSystem;
49 import org.apache.hadoop.fs.Path;
50 import org.apache.hadoop.hbase.client.Admin;
51 import org.apache.hadoop.hbase.client.Append;
52 import org.apache.hadoop.hbase.client.AsyncConnection;
53 import org.apache.hadoop.hbase.client.AsyncTable;
54 import org.apache.hadoop.hbase.client.BufferedMutator;
55 import org.apache.hadoop.hbase.client.BufferedMutatorParams;
56 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
57 import org.apache.hadoop.hbase.client.Connection;
58 import org.apache.hadoop.hbase.client.ConnectionFactory;
59 import org.apache.hadoop.hbase.client.Consistency;
60 import org.apache.hadoop.hbase.client.Delete;
61 import org.apache.hadoop.hbase.client.Durability;
62 import org.apache.hadoop.hbase.client.Get;
63 import org.apache.hadoop.hbase.client.Increment;
64 import org.apache.hadoop.hbase.client.Put;
65 import org.apache.hadoop.hbase.client.Result;
66 import org.apache.hadoop.hbase.client.ResultScanner;
67 import org.apache.hadoop.hbase.client.RowMutations;
68 import org.apache.hadoop.hbase.client.Scan;
69 import org.apache.hadoop.hbase.client.Table;
70 import org.apache.hadoop.hbase.client.TableDescriptor;
71 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
72 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
73 import org.apache.hadoop.hbase.filter.BinaryComparator;
74 import org.apache.hadoop.hbase.filter.Filter;
75 import org.apache.hadoop.hbase.filter.FilterAllFilter;
76 import org.apache.hadoop.hbase.filter.FilterList;
77 import org.apache.hadoop.hbase.filter.PageFilter;
78 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
79 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
80 import org.apache.hadoop.hbase.io.compress.Compression;
81 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
82 import org.apache.hadoop.hbase.io.hfile.RandomDistribution;
83 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
84 import org.apache.hadoop.hbase.regionserver.BloomType;
85 import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
86 import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
87 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
88 import org.apache.hadoop.hbase.trace.TraceUtil;
89 import org.apache.hadoop.hbase.util.ByteArrayHashKey;
90 import org.apache.hadoop.hbase.util.Bytes;
91 import org.apache.hadoop.hbase.util.GsonUtil;
92 import org.apache.hadoop.hbase.util.Hash;
93 import org.apache.hadoop.hbase.util.MurmurHash;
94 import org.apache.hadoop.hbase.util.Pair;
95 import org.apache.hadoop.hbase.util.YammerHistogramUtils;
96 import org.apache.hadoop.io.LongWritable;
97 import org.apache.hadoop.io.Text;
98 import org.apache.hadoop.mapreduce.Job;
99 import org.apache.hadoop.mapreduce.Mapper;
100 import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
101 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
102 import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
103 import org.apache.hadoop.util.Tool;
104 import org.apache.hadoop.util.ToolRunner;
105 import org.apache.htrace.core.ProbabilitySampler;
106 import org.apache.htrace.core.Sampler;
107 import org.apache.htrace.core.TraceScope;
108 import org.apache.yetus.audience.InterfaceAudience;
109 import org.slf4j.Logger;
110 import org.slf4j.LoggerFactory;
112 import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
113 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
114 import org.apache.hbase.thirdparty.com.google.gson.Gson;
117 * Script used for evaluating HBase performance and scalability. Runs an HBase
118 * client that steps through one of a set of hardcoded tests or 'experiments'
119 * (e.g. a random reads test, a random writes test, etc.). Pass on the
120 * command-line which test to run and how many clients are participating in
121 * this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
123 * <p>This class sets up and runs the evaluation programs described in
124 * Section 7, <i>Performance Evaluation</i>, of the <a
125 * href="http://labs.google.com/papers/bigtable.html">Bigtable</a>
126 * paper, pages 8-10.
128 * <p>By default, runs as a mapreduce job where each mapper runs a single test
129 * client. Can also run as a non-mapreduce, multithreaded application by
130 * specifying {@code --nomapred}. Each client does about 1GB of data, unless
131 * specified otherwise.
133 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
134 public class PerformanceEvaluation extends Configured implements Tool {
135 static final String RANDOM_SEEK_SCAN = "randomSeekScan";
136 static final String RANDOM_READ = "randomRead";
137 static final String PE_COMMAND_SHORTNAME = "pe";
138 private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
139 private static final Gson GSON = GsonUtil.createGson().create();
141 public static final String TABLE_NAME = "TestTable";
142 public static final String FAMILY_NAME_BASE = "info";
143 public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
144 public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
145 public static final int DEFAULT_VALUE_LENGTH = 1000;
146 public static final int ROW_LENGTH = 26;
148 private static final int ONE_GB = 1024 * 1024 * 1000;
149 private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
150 // TODO : should we make this configurable
151 private static final int TAG_LENGTH = 256;
152 private static final DecimalFormat FMT = new DecimalFormat("0.##");
153 private static final MathContext CXT = MathContext.DECIMAL64;
154 private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
155 private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
156 private static final TestOptions DEFAULT_OPTS = new TestOptions();
158 private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
159 private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
161 static {
162 addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
163 "Run async random read test");
164 addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
165 "Run async random write test");
166 addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
167 "Run async sequential read test");
168 addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
169 "Run async sequential write test");
170 addCommandDescriptor(AsyncScanTest.class, "asyncScan",
171 "Run async scan test (read every row)");
172 addCommandDescriptor(RandomReadTest.class, RANDOM_READ,
173 "Run random read test");
174 addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
175 "Run random seek and scan 100 test");
176 addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
177 "Run random seek scan with both start and stop row (max 10 rows)");
178 addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
179 "Run random seek scan with both start and stop row (max 100 rows)");
180 addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
181 "Run random seek scan with both start and stop row (max 1000 rows)");
182 addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
183 "Run random seek scan with both start and stop row (max 10000 rows)");
184 addCommandDescriptor(RandomWriteTest.class, "randomWrite",
185 "Run random write test");
186 addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
187 "Run sequential read test");
188 addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
189 "Run sequential write test");
190 addCommandDescriptor(ScanTest.class, "scan",
191 "Run scan test (read every row)");
192 addCommandDescriptor(FilteredScanTest.class, "filterScan",
193 "Run scan test using a filter to find a specific row based on it's value " +
194 "(make sure to use --rows=20)");
195 addCommandDescriptor(IncrementTest.class, "increment",
196 "Increment on each row; clients overlap on keyspace so some concurrent operations");
197 addCommandDescriptor(AppendTest.class, "append",
198 "Append on each row; clients overlap on keyspace so some concurrent operations");
199 addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
200 "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
201 addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
202 "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
203 addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
204 "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
208 * Enum for map metrics. Keep it out here rather than inside in the Map
209 * inner-class so we can find associated properties.
211 protected static enum Counter {
212 /** elapsed time */
213 ELAPSED_TIME,
214 /** number of rows */
215 ROWS
218 protected static class RunResult implements Comparable<RunResult> {
219 public RunResult(long duration, Histogram hist) {
220 this.duration = duration;
221 this.hist = hist;
224 public final long duration;
225 public final Histogram hist;
227 @Override
228 public String toString() {
229 return Long.toString(duration);
232 @Override public int compareTo(RunResult o) {
233 return Long.compare(this.duration, o.duration);
/**
 * Creates an evaluation tool bound to the given configuration.
 * @param conf configuration handed to the {@code Configured} superclass
 */
public PerformanceEvaluation(final Configuration conf) {
  super(conf);
}
245 protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass,
246 String name, String description) {
247 CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
248 COMMANDS.put(name, cmdDescriptor);
/**
 * Sink for progress messages emitted while a test runs.
 */
interface Status {
  /**
   * Publishes a status message.
   * @param msg status message
   * @throws IOException if the implementation fails to record the message
   */
  void setStatus(String msg) throws IOException;
}
264 * MapReduce job that runs a performance evaluation client in each map task.
266 public static class EvaluationMapTask
267 extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
269 /** configuration parameter name that contains the command */
270 public final static String CMD_KEY = "EvaluationMapTask.command";
271 /** configuration parameter name that contains the PE impl */
272 public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
274 private Class<? extends Test> cmd;
276 @Override
277 protected void setup(Context context) throws IOException, InterruptedException {
278 this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
280 // this is required so that extensions of PE are instantiated within the
281 // map reduce task...
282 Class<? extends PerformanceEvaluation> peClass =
283 forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
284 try {
285 peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
286 } catch (Exception e) {
287 throw new IllegalStateException("Could not instantiate PE instance", e);
291 private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
292 try {
293 return Class.forName(className).asSubclass(type);
294 } catch (ClassNotFoundException e) {
295 throw new IllegalStateException("Could not find class for name: " + className, e);
299 @Override
300 protected void map(LongWritable key, Text value, final Context context)
301 throws IOException, InterruptedException {
303 Status status = new Status() {
304 @Override
305 public void setStatus(String msg) {
306 context.setStatus(msg);
310 TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class);
311 Configuration conf = HBaseConfiguration.create(context.getConfiguration());
312 final Connection con = ConnectionFactory.createConnection(conf);
313 AsyncConnection asyncCon = null;
314 try {
315 asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
316 } catch (ExecutionException e) {
317 throw new IOException(e);
320 // Evaluation task
321 RunResult result = PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status);
322 // Collect how much time the thing took. Report as map output and
323 // to the ELAPSED_TIME counter.
324 context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
325 context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
326 context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
327 context.progress();
332 * If table does not already exist, create. Also create a table when
333 * {@code opts.presplitRegions} is specified or when the existing table's
334 * region replica count doesn't match {@code opts.replicas}.
336 static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
337 TableName tableName = TableName.valueOf(opts.tableName);
338 boolean needsDelete = false, exists = admin.tableExists(tableName);
339 boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read")
340 || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan");
341 if (!exists && isReadCmd) {
342 throw new IllegalStateException(
343 "Must specify an existing table for read commands. Run a write command first.");
345 TableDescriptor desc =
346 exists ? admin.getDescriptor(TableName.valueOf(opts.tableName)) : null;
347 byte[][] splits = getSplits(opts);
349 // recreate the table when user has requested presplit or when existing
350 // {RegionSplitPolicy,replica count} does not match requested, or when the
351 // number of column families does not match requested.
352 if ((exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions)
353 || (!isReadCmd && desc != null &&
354 !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy))
355 || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)
356 || (desc != null && desc.getColumnFamilyCount() != opts.families)) {
357 needsDelete = true;
358 // wait, why did it delete my table?!?
359 LOG.debug(MoreObjects.toStringHelper("needsDelete")
360 .add("needsDelete", needsDelete)
361 .add("isReadCmd", isReadCmd)
362 .add("exists", exists)
363 .add("desc", desc)
364 .add("presplit", opts.presplitRegions)
365 .add("splitPolicy", opts.splitPolicy)
366 .add("replicas", opts.replicas)
367 .add("families", opts.families)
368 .toString());
371 // remove an existing table
372 if (needsDelete) {
373 if (admin.isTableEnabled(tableName)) {
374 admin.disableTable(tableName);
376 admin.deleteTable(tableName);
379 // table creation is necessary
380 if (!exists || needsDelete) {
381 desc = getTableDescriptor(opts);
382 if (splits != null) {
383 if (LOG.isDebugEnabled()) {
384 for (int i = 0; i < splits.length; i++) {
385 LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
389 if (splits != null) {
390 admin.createTable(desc, splits);
391 } else {
392 admin.createTable(desc);
394 LOG.info("Table " + desc + " created");
396 return admin.tableExists(tableName);
400 * Create an HTableDescriptor from provided TestOptions.
402 protected static TableDescriptor getTableDescriptor(TestOptions opts) {
403 TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
404 new TableDescriptorBuilder.ModifyableTableDescriptor(TableName.valueOf(opts.tableName));
406 for (int family = 0; family < opts.families; family++) {
407 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
408 ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDesc =
409 new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(familyName);
410 familyDesc.setDataBlockEncoding(opts.blockEncoding);
411 familyDesc.setCompressionType(opts.compression);
412 familyDesc.setBloomFilterType(opts.bloomType);
413 familyDesc.setBlocksize(opts.blockSize);
414 if (opts.inMemoryCF) {
415 familyDesc.setInMemory(true);
417 familyDesc.setInMemoryCompaction(opts.inMemoryCompaction);
418 tableDescriptor.setColumnFamily(familyDesc);
420 if (opts.replicas != DEFAULT_OPTS.replicas) {
421 tableDescriptor.setRegionReplication(opts.replicas);
423 if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) {
424 tableDescriptor.setRegionSplitPolicyClassName(opts.splitPolicy);
426 return tableDescriptor;
430 * generates splits based on total number of rows and specified split regions
432 protected static byte[][] getSplits(TestOptions opts) {
433 if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions)
434 return null;
436 int numSplitPoints = opts.presplitRegions - 1;
437 byte[][] splits = new byte[numSplitPoints][];
438 int jump = opts.totalRows / opts.presplitRegions;
439 for (int i = 0; i < numSplitPoints; i++) {
440 int rowkey = jump * (1 + i);
441 splits[i] = format(rowkey);
443 return splits;
446 static void setupConnectionCount(final TestOptions opts) {
447 if (opts.oneCon) {
448 opts.connCount = 1;
449 } else {
450 if (opts.connCount == -1) {
451 // set to thread number if connCount is not set
452 opts.connCount = opts.numClientThreads;
458 * Run all clients in this vm each to its own thread.
460 static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
461 throws IOException, InterruptedException, ExecutionException {
462 final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
463 assert cmd != null;
464 @SuppressWarnings("unchecked")
465 Future<RunResult>[] threads = new Future[opts.numClientThreads];
466 RunResult[] results = new RunResult[opts.numClientThreads];
467 ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
468 new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
469 setupConnectionCount(opts);
470 final Connection[] cons = new Connection[opts.connCount];
471 final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount];
472 for (int i = 0; i < opts.connCount; i++) {
473 cons[i] = ConnectionFactory.createConnection(conf);
474 asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get();
476 LOG.info("Created " + opts.connCount + " connections for " +
477 opts.numClientThreads + " threads");
478 for (int i = 0; i < threads.length; i++) {
479 final int index = i;
480 threads[i] = pool.submit(new Callable<RunResult>() {
481 @Override
482 public RunResult call() throws Exception {
483 TestOptions threadOpts = new TestOptions(opts);
484 final Connection con = cons[index % cons.length];
485 final AsyncConnection asyncCon = asyncCons[index % asyncCons.length];
486 if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
487 RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() {
488 @Override
489 public void setStatus(final String msg) throws IOException {
490 LOG.info(msg);
493 LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration +
494 "ms over " + threadOpts.perClientRunRows + " rows");
495 return run;
499 pool.shutdown();
501 for (int i = 0; i < threads.length; i++) {
502 try {
503 results[i] = threads[i].get();
504 } catch (ExecutionException e) {
505 throw new IOException(e.getCause());
508 final String test = cmd.getSimpleName();
509 LOG.info("[" + test + "] Summary of timings (ms): "
510 + Arrays.toString(results));
511 Arrays.sort(results);
512 long total = 0;
513 float avgLatency = 0 ;
514 float avgTPS = 0;
515 for (RunResult result : results) {
516 total += result.duration;
517 avgLatency += result.hist.getSnapshot().getMean();
518 avgTPS += opts.perClientRunRows * 1.0f / result.duration;
520 avgTPS *= 1000; // ms to second
521 avgLatency = avgLatency / results.length;
522 LOG.info("[" + test + " duration ]"
523 + "\tMin: " + results[0] + "ms"
524 + "\tMax: " + results[results.length - 1] + "ms"
525 + "\tAvg: " + (total / results.length) + "ms");
526 LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
527 LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second");
528 for (int i = 0; i < opts.connCount; i++) {
529 cons[i].close();
530 asyncCons[i].close();
534 return results;
538 * Run a mapreduce job. Run as many maps as asked-for clients.
539 * Before we start up the job, write out an input file with instruction
540 * per client regards which row they are to start on.
541 * @param cmd Command to run.
542 * @throws IOException
544 static Job doMapReduce(TestOptions opts, final Configuration conf)
545 throws IOException, InterruptedException, ClassNotFoundException {
546 final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
547 assert cmd != null;
548 Path inputDir = writeInputFile(conf, opts);
549 conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
550 conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
551 Job job = Job.getInstance(conf);
552 job.setJarByClass(PerformanceEvaluation.class);
553 job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
555 job.setInputFormatClass(NLineInputFormat.class);
556 NLineInputFormat.setInputPaths(job, inputDir);
557 // this is default, but be explicit about it just in case.
558 NLineInputFormat.setNumLinesPerSplit(job, 1);
560 job.setOutputKeyClass(LongWritable.class);
561 job.setOutputValueClass(LongWritable.class);
563 job.setMapperClass(EvaluationMapTask.class);
564 job.setReducerClass(LongSumReducer.class);
566 job.setNumReduceTasks(1);
568 job.setOutputFormatClass(TextOutputFormat.class);
569 TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
571 TableMapReduceUtil.addDependencyJars(job);
572 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
573 Histogram.class, // yammer metrics
574 Gson.class, // gson
575 FilterAllFilter.class // hbase-server tests jar
578 TableMapReduceUtil.initCredentials(job);
580 job.waitForCompletion(true);
581 return job;
585 * Each client has one mapper to do the work, and client do the resulting count in a map task.
588 static String JOB_INPUT_FILENAME = "input.txt";
591 * Write input file of offsets-per-client for the mapreduce job.
592 * @param c Configuration
593 * @return Directory that contains file written whose name is JOB_INPUT_FILENAME
594 * @throws IOException
596 static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
597 return writeInputFile(c, opts, new Path("."));
600 static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
601 throws IOException {
602 SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
603 Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
604 Path inputDir = new Path(jobdir, "inputs");
606 FileSystem fs = FileSystem.get(c);
607 fs.mkdirs(inputDir);
609 Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
610 PrintStream out = new PrintStream(fs.create(inputFile));
611 // Make input random.
612 Map<Integer, String> m = new TreeMap<>();
613 Hash h = MurmurHash.getInstance();
614 int perClientRows = (opts.totalRows / opts.numClientThreads);
615 try {
616 for (int j = 0; j < opts.numClientThreads; j++) {
617 TestOptions next = new TestOptions(opts);
618 next.startRow = j * perClientRows;
619 next.perClientRunRows = perClientRows;
620 String s = GSON.toJson(next);
621 LOG.info("Client=" + j + ", input=" + s);
622 byte[] b = Bytes.toBytes(s);
623 int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
624 m.put(hash, s);
626 for (Map.Entry<Integer, String> e: m.entrySet()) {
627 out.println(e.getValue());
629 } finally {
630 out.close();
632 return inputDir;
636 * Describes a command.
638 static class CmdDescriptor {
639 private Class<? extends TestBase> cmdClass;
640 private String name;
641 private String description;
643 CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
644 this.cmdClass = cmdClass;
645 this.name = name;
646 this.description = description;
649 public Class<? extends TestBase> getCmdClass() {
650 return cmdClass;
653 public String getName() {
654 return name;
657 public String getDescription() {
658 return description;
663 * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}.
664 * This makes tracking all these arguments a little easier.
665 * NOTE: ADDING AN OPTION, you need to add a data member, a getter/setter (to make JSON
666 * serialization of this TestOptions class behave), and you need to add to the clone constructor
667 * below copying your new option from the 'that' to the 'this'. Look for 'clone' below.
669 static class TestOptions {
670 String cmdName = null;
671 boolean nomapred = false;
672 boolean filterAll = false;
673 int startRow = 0;
674 float size = 1.0f;
675 int perClientRunRows = DEFAULT_ROWS_PER_GB;
676 int numClientThreads = 1;
677 int totalRows = DEFAULT_ROWS_PER_GB;
678 int measureAfter = 0;
679 float sampleRate = 1.0f;
680 double traceRate = 0.0;
681 String tableName = TABLE_NAME;
682 boolean flushCommits = true;
683 boolean writeToWAL = true;
684 boolean autoFlush = false;
685 boolean oneCon = false;
686 int connCount = -1; //wil decide the actual num later
687 boolean useTags = false;
688 int noOfTags = 1;
689 boolean reportLatency = false;
690 int multiGet = 0;
691 int multiPut = 0;
692 int randomSleep = 0;
693 boolean inMemoryCF = false;
694 int presplitRegions = 0;
695 int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION;
696 String splitPolicy = null;
697 Compression.Algorithm compression = Compression.Algorithm.NONE;
698 BloomType bloomType = BloomType.ROW;
699 int blockSize = HConstants.DEFAULT_BLOCKSIZE;
700 DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
701 boolean valueRandom = false;
702 boolean valueZipf = false;
703 int valueSize = DEFAULT_VALUE_LENGTH;
704 int period = (this.perClientRunRows / 10) == 0? perClientRunRows: perClientRunRows / 10;
705 int cycles = 1;
706 int columns = 1;
707 int families = 1;
708 int caching = 30;
709 boolean addColumns = true;
710 MemoryCompactionPolicy inMemoryCompaction =
711 MemoryCompactionPolicy.valueOf(
712 CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
713 boolean asyncPrefetch = false;
714 boolean cacheBlocks = true;
715 Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
716 long bufferSize = 2l * 1024l * 1024l;
718 public TestOptions() {}
721 * Clone constructor.
722 * @param that Object to copy from.
724 public TestOptions(TestOptions that) {
725 this.cmdName = that.cmdName;
726 this.cycles = that.cycles;
727 this.nomapred = that.nomapred;
728 this.startRow = that.startRow;
729 this.size = that.size;
730 this.perClientRunRows = that.perClientRunRows;
731 this.numClientThreads = that.numClientThreads;
732 this.totalRows = that.totalRows;
733 this.sampleRate = that.sampleRate;
734 this.traceRate = that.traceRate;
735 this.tableName = that.tableName;
736 this.flushCommits = that.flushCommits;
737 this.writeToWAL = that.writeToWAL;
738 this.autoFlush = that.autoFlush;
739 this.oneCon = that.oneCon;
740 this.connCount = that.connCount;
741 this.useTags = that.useTags;
742 this.noOfTags = that.noOfTags;
743 this.reportLatency = that.reportLatency;
744 this.multiGet = that.multiGet;
745 this.multiPut = that.multiPut;
746 this.inMemoryCF = that.inMemoryCF;
747 this.presplitRegions = that.presplitRegions;
748 this.replicas = that.replicas;
749 this.splitPolicy = that.splitPolicy;
750 this.compression = that.compression;
751 this.blockEncoding = that.blockEncoding;
752 this.filterAll = that.filterAll;
753 this.bloomType = that.bloomType;
754 this.blockSize = that.blockSize;
755 this.valueRandom = that.valueRandom;
756 this.valueZipf = that.valueZipf;
757 this.valueSize = that.valueSize;
758 this.period = that.period;
759 this.randomSleep = that.randomSleep;
760 this.measureAfter = that.measureAfter;
761 this.addColumns = that.addColumns;
762 this.columns = that.columns;
763 this.families = that.families;
764 this.caching = that.caching;
765 this.inMemoryCompaction = that.inMemoryCompaction;
766 this.asyncPrefetch = that.asyncPrefetch;
767 this.cacheBlocks = that.cacheBlocks;
768 this.scanReadType = that.scanReadType;
769 this.bufferSize = that.bufferSize;
772 public int getCaching() {
773 return this.caching;
776 public void setCaching(final int caching) {
777 this.caching = caching;
780 public int getColumns() {
781 return this.columns;
784 public void setColumns(final int columns) {
785 this.columns = columns;
788 public int getFamilies() {
789 return this.families;
792 public void setFamilies(final int families) {
793 this.families = families;
796 public int getCycles() {
797 return this.cycles;
800 public void setCycles(final int cycles) {
801 this.cycles = cycles;
804 public boolean isValueZipf() {
805 return valueZipf;
808 public void setValueZipf(boolean valueZipf) {
809 this.valueZipf = valueZipf;
812 public String getCmdName() {
813 return cmdName;
816 public void setCmdName(String cmdName) {
817 this.cmdName = cmdName;
820 public int getRandomSleep() {
821 return randomSleep;
824 public void setRandomSleep(int randomSleep) {
825 this.randomSleep = randomSleep;
828 public int getReplicas() {
829 return replicas;
832 public void setReplicas(int replicas) {
833 this.replicas = replicas;
836 public String getSplitPolicy() {
837 return splitPolicy;
840 public void setSplitPolicy(String splitPolicy) {
841 this.splitPolicy = splitPolicy;
844 public void setNomapred(boolean nomapred) {
845 this.nomapred = nomapred;
848 public void setFilterAll(boolean filterAll) {
849 this.filterAll = filterAll;
852 public void setStartRow(int startRow) {
853 this.startRow = startRow;
856 public void setSize(float size) {
857 this.size = size;
860 public void setPerClientRunRows(int perClientRunRows) {
861 this.perClientRunRows = perClientRunRows;
864 public void setNumClientThreads(int numClientThreads) {
865 this.numClientThreads = numClientThreads;
868 public void setTotalRows(int totalRows) {
869 this.totalRows = totalRows;
872 public void setSampleRate(float sampleRate) {
873 this.sampleRate = sampleRate;
876 public void setTraceRate(double traceRate) {
877 this.traceRate = traceRate;
880 public void setTableName(String tableName) {
881 this.tableName = tableName;
884 public void setFlushCommits(boolean flushCommits) {
885 this.flushCommits = flushCommits;
888 public void setWriteToWAL(boolean writeToWAL) {
889 this.writeToWAL = writeToWAL;
892 public void setAutoFlush(boolean autoFlush) {
893 this.autoFlush = autoFlush;
896 public void setOneCon(boolean oneCon) {
897 this.oneCon = oneCon;
900 public int getConnCount() {
901 return connCount;
904 public void setConnCount(int connCount) {
905 this.connCount = connCount;
908 public void setUseTags(boolean useTags) {
909 this.useTags = useTags;
912 public void setNoOfTags(int noOfTags) {
913 this.noOfTags = noOfTags;
916 public void setReportLatency(boolean reportLatency) {
917 this.reportLatency = reportLatency;
920 public void setMultiGet(int multiGet) {
921 this.multiGet = multiGet;
924 public void setMultiPut(int multiPut) {
925 this.multiPut = multiPut;
928 public void setInMemoryCF(boolean inMemoryCF) {
929 this.inMemoryCF = inMemoryCF;
932 public void setPresplitRegions(int presplitRegions) {
933 this.presplitRegions = presplitRegions;
936 public void setCompression(Compression.Algorithm compression) {
937 this.compression = compression;
940 public void setBloomType(BloomType bloomType) {
941 this.bloomType = bloomType;
944 public void setBlockSize(int blockSize) {
945 this.blockSize = blockSize;
948 public void setBlockEncoding(DataBlockEncoding blockEncoding) {
949 this.blockEncoding = blockEncoding;
952 public void setValueRandom(boolean valueRandom) {
953 this.valueRandom = valueRandom;
956 public void setValueSize(int valueSize) {
957 this.valueSize = valueSize;
960 public void setBufferSize(long bufferSize) {
961 this.bufferSize = bufferSize;
964 public void setPeriod(int period) {
965 this.period = period;
968 public boolean isNomapred() {
969 return nomapred;
972 public boolean isFilterAll() {
973 return filterAll;
976 public int getStartRow() {
977 return startRow;
980 public float getSize() {
981 return size;
984 public int getPerClientRunRows() {
985 return perClientRunRows;
988 public int getNumClientThreads() {
989 return numClientThreads;
992 public int getTotalRows() {
993 return totalRows;
996 public float getSampleRate() {
997 return sampleRate;
1000 public double getTraceRate() {
1001 return traceRate;
1004 public String getTableName() {
1005 return tableName;
1008 public boolean isFlushCommits() {
1009 return flushCommits;
1012 public boolean isWriteToWAL() {
1013 return writeToWAL;
1016 public boolean isAutoFlush() {
1017 return autoFlush;
1020 public boolean isUseTags() {
1021 return useTags;
1024 public int getNoOfTags() {
1025 return noOfTags;
1028 public boolean isReportLatency() {
1029 return reportLatency;
1032 public int getMultiGet() {
1033 return multiGet;
1036 public int getMultiPut() {
1037 return multiPut;
1040 public boolean isInMemoryCF() {
1041 return inMemoryCF;
1044 public int getPresplitRegions() {
1045 return presplitRegions;
1048 public Compression.Algorithm getCompression() {
1049 return compression;
1052 public DataBlockEncoding getBlockEncoding() {
1053 return blockEncoding;
1056 public boolean isValueRandom() {
1057 return valueRandom;
1060 public int getValueSize() {
1061 return valueSize;
1064 public int getPeriod() {
1065 return period;
1068 public BloomType getBloomType() {
1069 return bloomType;
1072 public int getBlockSize() {
1073 return blockSize;
1076 public boolean isOneCon() {
1077 return oneCon;
1080 public int getMeasureAfter() {
1081 return measureAfter;
1084 public void setMeasureAfter(int measureAfter) {
1085 this.measureAfter = measureAfter;
1088 public boolean getAddColumns() {
1089 return addColumns;
1092 public void setAddColumns(boolean addColumns) {
1093 this.addColumns = addColumns;
1096 public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) {
1097 this.inMemoryCompaction = inMemoryCompaction;
1100 public MemoryCompactionPolicy getInMemoryCompaction() {
1101 return this.inMemoryCompaction;
1104 public long getBufferSize() {
1105 return this.bufferSize;
1110 * A test.
1111 * Subclass to particularize what happens per row.
1113 static abstract class TestBase {
1114 // Below is make it so when Tests are all running in the one
1115 // jvm, that they each have a differently seeded Random.
1116 private static final Random randomSeed = new Random(System.currentTimeMillis());
1118 private static long nextRandomSeed() {
1119 return randomSeed.nextLong();
1121 private final int everyN;
1123 protected final Random rand = new Random(nextRandomSeed());
1124 protected final Configuration conf;
1125 protected final TestOptions opts;
1127 private final Status status;
1128 private final Sampler traceSampler;
1129 private final SpanReceiverHost receiverHost;
1131 private String testName;
1132 private Histogram latencyHistogram;
1133 private Histogram valueSizeHistogram;
1134 private Histogram rpcCallsHistogram;
1135 private Histogram remoteRpcCallsHistogram;
1136 private Histogram millisBetweenNextHistogram;
1137 private Histogram regionsScannedHistogram;
1138 private Histogram bytesInResultsHistogram;
1139 private Histogram bytesInRemoteResultsHistogram;
1140 private RandomDistribution.Zipf zipf;
1143 * Note that all subclasses of this class must provide a public constructor
1144 * that has the exact same list of arguments.
1146 TestBase(final Configuration conf, final TestOptions options, final Status status) {
1147 this.conf = conf;
1148 this.receiverHost = this.conf == null? null: SpanReceiverHost.getInstance(conf);
1149 this.opts = options;
1150 this.status = status;
1151 this.testName = this.getClass().getSimpleName();
1152 if (options.traceRate >= 1.0) {
1153 this.traceSampler = Sampler.ALWAYS;
1154 } else if (options.traceRate > 0.0) {
1155 conf.setDouble("hbase.sampler.fraction", options.traceRate);
1156 this.traceSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(conf));
1157 } else {
1158 this.traceSampler = Sampler.NEVER;
1160 everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
1161 if (options.isValueZipf()) {
1162 this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2);
1164 LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
1167 int getValueLength(final Random r) {
1168 if (this.opts.isValueRandom()) {
1169 return r.nextInt(opts.valueSize);
1170 } else if (this.opts.isValueZipf()) {
1171 return Math.abs(this.zipf.nextInt());
1172 } else {
1173 return opts.valueSize;
1177 void updateValueSize(final Result [] rs) throws IOException {
1178 if (rs == null || !isRandomValueSize()) return;
1179 for (Result r: rs) updateValueSize(r);
1182 void updateValueSize(final Result r) throws IOException {
1183 if (r == null || !isRandomValueSize()) return;
1184 int size = 0;
1185 for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
1186 size += scanner.current().getValueLength();
1188 updateValueSize(size);
1191 void updateValueSize(final int valueSize) {
1192 if (!isRandomValueSize()) return;
1193 this.valueSizeHistogram.update(valueSize);
1196 void updateScanMetrics(final ScanMetrics metrics) {
1197 if (metrics == null) return;
1198 Map<String,Long> metricsMap = metrics.getMetricsMap();
1199 Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
1200 if (rpcCalls != null) {
1201 this.rpcCallsHistogram.update(rpcCalls.longValue());
1203 Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
1204 if (remoteRpcCalls != null) {
1205 this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
1207 Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
1208 if (millisBetweenNext != null) {
1209 this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
1211 Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
1212 if (regionsScanned != null) {
1213 this.regionsScannedHistogram.update(regionsScanned.longValue());
1215 Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
1216 if (bytesInResults != null && bytesInResults.longValue() > 0) {
1217 this.bytesInResultsHistogram.update(bytesInResults.longValue());
1219 Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
1220 if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
1221 this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
1225 String generateStatus(final int sr, final int i, final int lr) {
1226 return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() +
1227 (!isRandomValueSize()? "": ", value size " + getShortValueSizeReport());
1230 boolean isRandomValueSize() {
1231 return opts.valueRandom;
1234 protected int getReportingPeriod() {
1235 return opts.period;
1239 * Populated by testTakedown. Only implemented by RandomReadTest at the moment.
1241 public Histogram getLatencyHistogram() {
1242 return latencyHistogram;
1245 void testSetup() throws IOException {
1246 // test metrics
1247 latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1248 valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1249 // scan metrics
1250 rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1251 remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1252 millisBetweenNextHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1253 regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1254 bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1255 bytesInRemoteResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1257 onStartup();
1260 abstract void onStartup() throws IOException;
1262 void testTakedown() throws IOException {
1263 onTakedown();
1264 // Print all stats for this thread continuously.
1265 // Synchronize on Test.class so different threads don't intermingle the
1266 // output. We can't use 'this' here because each thread has its own instance of Test class.
1267 synchronized (Test.class) {
1268 status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName());
1269 status.setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(
1270 latencyHistogram));
1271 status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
1272 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
1273 if (valueSizeHistogram.getCount() > 0) {
1274 status.setStatus("ValueSize (bytes) : "
1275 + YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
1276 status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
1277 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
1278 } else {
1279 status.setStatus("No valueSize statistics available");
1281 if (rpcCallsHistogram.getCount() > 0) {
1282 status.setStatus("rpcCalls (count): " +
1283 YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
1285 if (remoteRpcCallsHistogram.getCount() > 0) {
1286 status.setStatus("remoteRpcCalls (count): " +
1287 YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram));
1289 if (millisBetweenNextHistogram.getCount() > 0) {
1290 status.setStatus("millisBetweenNext (latency): " +
1291 YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram));
1293 if (regionsScannedHistogram.getCount() > 0) {
1294 status.setStatus("regionsScanned (count): " +
1295 YammerHistogramUtils.getHistogramReport(regionsScannedHistogram));
1297 if (bytesInResultsHistogram.getCount() > 0) {
1298 status.setStatus("bytesInResults (size): " +
1299 YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram));
1301 if (bytesInRemoteResultsHistogram.getCount() > 0) {
1302 status.setStatus("bytesInRemoteResults (size): " +
1303 YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
1306 receiverHost.closeReceivers();
1309 abstract void onTakedown() throws IOException;
1313 * Run test
1314 * @return Elapsed time.
1315 * @throws IOException
1317 long test() throws IOException, InterruptedException {
1318 testSetup();
1319 LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
1320 final long startTime = System.nanoTime();
1321 try {
1322 testTimed();
1323 } finally {
1324 testTakedown();
1326 return (System.nanoTime() - startTime) / 1000000;
1329 int getStartRow() {
1330 return opts.startRow;
1333 int getLastRow() {
1334 return getStartRow() + opts.perClientRunRows;
1338 * Provides an extension point for tests that don't want a per row invocation.
1340 void testTimed() throws IOException, InterruptedException {
1341 int startRow = getStartRow();
1342 int lastRow = getLastRow();
1343 TraceUtil.addSampler(traceSampler);
1344 // Report on completion of 1/10th of total.
1345 for (int ii = 0; ii < opts.cycles; ii++) {
1346 if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
1347 for (int i = startRow; i < lastRow; i++) {
1348 if (i % everyN != 0) continue;
1349 long startTime = System.nanoTime();
1350 boolean requestSent = false;
1351 try (TraceScope scope = TraceUtil.createTrace("test row");){
1352 requestSent = testRow(i);
1354 if ( (i - startRow) > opts.measureAfter) {
1355 // If multiget or multiput is enabled, say set to 10, testRow() returns immediately
1356 // first 9 times and sends the actual get request in the 10th iteration.
1357 // We should only set latency when actual request is sent because otherwise
1358 // it turns out to be 0.
1359 if (requestSent) {
1360 latencyHistogram.update((System.nanoTime() - startTime) / 1000);
1362 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
1363 status.setStatus(generateStatus(startRow, i, lastRow));
1371 * @return Subset of the histograms' calculation.
1373 public String getShortLatencyReport() {
1374 return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram);
1378 * @return Subset of the histograms' calculation.
1380 public String getShortValueSizeReport() {
1381 return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram);
1386 * Test for individual row.
1387 * @param i Row index.
1388 * @return true if the row was sent to server and need to record metrics.
1389 * False if not, multiGet and multiPut e.g., the rows are sent
1390 * to server only if enough gets/puts are gathered.
1392 abstract boolean testRow(final int i) throws IOException, InterruptedException;
1395 static abstract class Test extends TestBase {
1396 protected Connection connection;
1398 Test(final Connection con, final TestOptions options, final Status status) {
1399 super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1400 this.connection = con;
1404 static abstract class AsyncTest extends TestBase {
1405 protected AsyncConnection connection;
1407 AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) {
1408 super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1409 this.connection = con;
1413 static abstract class TableTest extends Test {
1414 protected Table table;
1416 TableTest(Connection con, TestOptions options, Status status) {
1417 super(con, options, status);
1420 @Override
1421 void onStartup() throws IOException {
1422 this.table = connection.getTable(TableName.valueOf(opts.tableName));
1425 @Override
1426 void onTakedown() throws IOException {
1427 table.close();
1431 static abstract class AsyncTableTest extends AsyncTest {
1432 protected AsyncTable<?> table;
1434 AsyncTableTest(AsyncConnection con, TestOptions options, Status status) {
1435 super(con, options, status);
1438 @Override
1439 void onStartup() throws IOException {
1440 this.table = connection.getTable(TableName.valueOf(opts.tableName));
1443 @Override
1444 void onTakedown() throws IOException {
1448 static class AsyncRandomReadTest extends AsyncTableTest {
1449 private final Consistency consistency;
1450 private ArrayList<Get> gets;
1451 private Random rd = new Random();
1453 AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) {
1454 super(con, options, status);
1455 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1456 if (opts.multiGet > 0) {
1457 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1458 this.gets = new ArrayList<>(opts.multiGet);
1462 @Override
1463 boolean testRow(final int i) throws IOException, InterruptedException {
1464 if (opts.randomSleep > 0) {
1465 Thread.sleep(rd.nextInt(opts.randomSleep));
1467 Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1468 for (int family = 0; family < opts.families; family++) {
1469 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1470 if (opts.addColumns) {
1471 for (int column = 0; column < opts.columns; column++) {
1472 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1473 get.addColumn(familyName, qualifier);
1475 } else {
1476 get.addFamily(familyName);
1479 if (opts.filterAll) {
1480 get.setFilter(new FilterAllFilter());
1482 get.setConsistency(consistency);
1483 if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1484 try {
1485 if (opts.multiGet > 0) {
1486 this.gets.add(get);
1487 if (this.gets.size() == opts.multiGet) {
1488 Result[] rs =
1489 this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new);
1490 updateValueSize(rs);
1491 this.gets.clear();
1492 } else {
1493 return false;
1495 } else {
1496 updateValueSize(this.table.get(get).get());
1498 } catch (ExecutionException e) {
1499 throw new IOException(e);
1501 return true;
1504 public static RuntimeException runtime(Throwable e) {
1505 if (e instanceof RuntimeException) {
1506 return (RuntimeException) e;
1508 return new RuntimeException(e);
1511 public static <V> V propagate(Callable<V> callable) {
1512 try {
1513 return callable.call();
1514 } catch (Exception e) {
1515 throw runtime(e);
1519 @Override
1520 protected int getReportingPeriod() {
1521 int period = opts.perClientRunRows / 10;
1522 return period == 0 ? opts.perClientRunRows : period;
1525 @Override
1526 protected void testTakedown() throws IOException {
1527 if (this.gets != null && this.gets.size() > 0) {
1528 this.table.get(gets);
1529 this.gets.clear();
1531 super.testTakedown();
1535 static class AsyncRandomWriteTest extends AsyncSequentialWriteTest {
1537 AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) {
1538 super(con, options, status);
1541 @Override
1542 protected byte[] generateRow(final int i) {
1543 return getRandomRow(this.rand, opts.totalRows);
1547 static class AsyncScanTest extends AsyncTableTest {
1548 private ResultScanner testScanner;
1549 private AsyncTable<?> asyncTable;
1551 AsyncScanTest(AsyncConnection con, TestOptions options, Status status) {
1552 super(con, options, status);
1555 @Override
1556 void onStartup() throws IOException {
1557 this.asyncTable =
1558 connection.getTable(TableName.valueOf(opts.tableName),
1559 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
1562 @Override
1563 void testTakedown() throws IOException {
1564 if (this.testScanner != null) {
1565 updateScanMetrics(this.testScanner.getScanMetrics());
1566 this.testScanner.close();
1568 super.testTakedown();
1571 @Override
1572 boolean testRow(final int i) throws IOException {
1573 if (this.testScanner == null) {
1574 Scan scan =
1575 new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1576 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1577 .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1578 for (int family = 0; family < opts.families; family++) {
1579 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1580 if (opts.addColumns) {
1581 for (int column = 0; column < opts.columns; column++) {
1582 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1583 scan.addColumn(familyName, qualifier);
1585 } else {
1586 scan.addFamily(familyName);
1589 if (opts.filterAll) {
1590 scan.setFilter(new FilterAllFilter());
1592 this.testScanner = asyncTable.getScanner(scan);
1594 Result r = testScanner.next();
1595 updateValueSize(r);
1596 return true;
1600 static class AsyncSequentialReadTest extends AsyncTableTest {
1601 AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) {
1602 super(con, options, status);
1605 @Override
1606 boolean testRow(final int i) throws IOException, InterruptedException {
1607 Get get = new Get(format(i));
1608 for (int family = 0; family < opts.families; family++) {
1609 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1610 if (opts.addColumns) {
1611 for (int column = 0; column < opts.columns; column++) {
1612 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1613 get.addColumn(familyName, qualifier);
1615 } else {
1616 get.addFamily(familyName);
1619 if (opts.filterAll) {
1620 get.setFilter(new FilterAllFilter());
1622 try {
1623 updateValueSize(table.get(get).get());
1624 } catch (ExecutionException e) {
1625 throw new IOException(e);
1627 return true;
1631 static class AsyncSequentialWriteTest extends AsyncTableTest {
1632 private ArrayList<Put> puts;
1634 AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) {
1635 super(con, options, status);
1636 if (opts.multiPut > 0) {
1637 LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
1638 this.puts = new ArrayList<>(opts.multiPut);
1642 protected byte[] generateRow(final int i) {
1643 return format(i);
1646 @Override
1647 @SuppressWarnings("ReturnValueIgnored")
1648 boolean testRow(final int i) throws IOException, InterruptedException {
1649 byte[] row = generateRow(i);
1650 Put put = new Put(row);
1651 for (int family = 0; family < opts.families; family++) {
1652 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1653 for (int column = 0; column < opts.columns; column++) {
1654 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1655 byte[] value = generateData(this.rand, getValueLength(this.rand));
1656 if (opts.useTags) {
1657 byte[] tag = generateData(this.rand, TAG_LENGTH);
1658 Tag[] tags = new Tag[opts.noOfTags];
1659 for (int n = 0; n < opts.noOfTags; n++) {
1660 Tag t = new ArrayBackedTag((byte) n, tag);
1661 tags[n] = t;
1663 KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP,
1664 value, tags);
1665 put.add(kv);
1666 updateValueSize(kv.getValueLength());
1667 } else {
1668 put.addColumn(familyName, qualifier, value);
1669 updateValueSize(value.length);
1673 put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1674 try {
1675 table.put(put).get();
1676 if (opts.multiPut > 0) {
1677 this.puts.add(put);
1678 if (this.puts.size() == opts.multiPut) {
1679 this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get));
1680 this.puts.clear();
1681 } else {
1682 return false;
1684 } else {
1685 table.put(put).get();
1687 } catch (ExecutionException e) {
1688 throw new IOException(e);
1690 return true;
1694 static abstract class BufferedMutatorTest extends Test {
1695 protected BufferedMutator mutator;
1696 protected Table table;
1698 BufferedMutatorTest(Connection con, TestOptions options, Status status) {
1699 super(con, options, status);
1702 @Override
1703 void onStartup() throws IOException {
1704 BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName));
1705 p.writeBufferSize(opts.bufferSize);
1706 this.mutator = connection.getBufferedMutator(p);
1707 this.table = connection.getTable(TableName.valueOf(opts.tableName));
1710 @Override
1711 void onTakedown() throws IOException {
1712 mutator.close();
1713 table.close();
1717 static class RandomSeekScanTest extends TableTest {
1718 RandomSeekScanTest(Connection con, TestOptions options, Status status) {
1719 super(con, options, status);
1722 @Override
1723 boolean testRow(final int i) throws IOException {
1724 Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows))
1725 .setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
1726 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
1727 .setScanMetricsEnabled(true);
1728 FilterList list = new FilterList();
1729 for (int family = 0; family < opts.families; family++) {
1730 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1731 if (opts.addColumns) {
1732 for (int column = 0; column < opts.columns; column++) {
1733 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1734 scan.addColumn(familyName, qualifier);
1736 } else {
1737 scan.addFamily(familyName);
1740 if (opts.filterAll) {
1741 list.addFilter(new FilterAllFilter());
1743 list.addFilter(new WhileMatchFilter(new PageFilter(120)));
1744 scan.setFilter(list);
1745 ResultScanner s = this.table.getScanner(scan);
1746 try {
1747 for (Result rr; (rr = s.next()) != null;) {
1748 updateValueSize(rr);
1750 } finally {
1751 updateScanMetrics(s.getScanMetrics());
1752 s.close();
1754 return true;
1757 @Override
1758 protected int getReportingPeriod() {
1759 int period = opts.perClientRunRows / 100;
1760 return period == 0 ? opts.perClientRunRows : period;
1765 static abstract class RandomScanWithRangeTest extends TableTest {
1766 RandomScanWithRangeTest(Connection con, TestOptions options, Status status) {
1767 super(con, options, status);
1770 @Override
1771 boolean testRow(final int i) throws IOException {
1772 Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
1773 Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
1774 .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
1775 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1776 .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1777 for (int family = 0; family < opts.families; family++) {
1778 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1779 if (opts.addColumns) {
1780 for (int column = 0; column < opts.columns; column++) {
1781 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1782 scan.addColumn(familyName, qualifier);
1784 } else {
1785 scan.addFamily(familyName);
1788 if (opts.filterAll) {
1789 scan.setFilter(new FilterAllFilter());
1791 Result r = null;
1792 int count = 0;
1793 ResultScanner s = this.table.getScanner(scan);
1794 try {
1795 for (; (r = s.next()) != null;) {
1796 updateValueSize(r);
1797 count++;
1799 if (i % 100 == 0) {
1800 LOG.info(String.format("Scan for key range %s - %s returned %s rows",
1801 Bytes.toString(startAndStopRow.getFirst()),
1802 Bytes.toString(startAndStopRow.getSecond()), count));
1804 } finally {
1805 updateScanMetrics(s.getScanMetrics());
1806 s.close();
1808 return true;
1811 protected abstract Pair<byte[],byte[]> getStartAndStopRow();
1813 protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
1814 int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows;
1815 int stop = start + maxRange;
1816 return new Pair<>(format(start), format(stop));
1819 @Override
1820 protected int getReportingPeriod() {
1821 int period = opts.perClientRunRows / 100;
1822 return period == 0? opts.perClientRunRows: period;
1826 static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
1827 RandomScanWithRange10Test(Connection con, TestOptions options, Status status) {
1828 super(con, options, status);
1831 @Override
1832 protected Pair<byte[], byte[]> getStartAndStopRow() {
1833 return generateStartAndStopRows(10);
1837 static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
1838 RandomScanWithRange100Test(Connection con, TestOptions options, Status status) {
1839 super(con, options, status);
1842 @Override
1843 protected Pair<byte[], byte[]> getStartAndStopRow() {
1844 return generateStartAndStopRows(100);
1848 static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
1849 RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) {
1850 super(con, options, status);
1853 @Override
1854 protected Pair<byte[], byte[]> getStartAndStopRow() {
1855 return generateStartAndStopRows(1000);
1859 static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
1860 RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) {
1861 super(con, options, status);
1864 @Override
1865 protected Pair<byte[], byte[]> getStartAndStopRow() {
1866 return generateStartAndStopRows(10000);
1870 static class RandomReadTest extends TableTest {
1871 private final Consistency consistency;
1872 private ArrayList<Get> gets;
1873 private Random rd = new Random();
1875 RandomReadTest(Connection con, TestOptions options, Status status) {
1876 super(con, options, status);
1877 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1878 if (opts.multiGet > 0) {
1879 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1880 this.gets = new ArrayList<>(opts.multiGet);
1884 @Override
1885 boolean testRow(final int i) throws IOException, InterruptedException {
1886 if (opts.randomSleep > 0) {
1887 Thread.sleep(rd.nextInt(opts.randomSleep));
1889 Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1890 for (int family = 0; family < opts.families; family++) {
1891 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1892 if (opts.addColumns) {
1893 for (int column = 0; column < opts.columns; column++) {
1894 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1895 get.addColumn(familyName, qualifier);
1897 } else {
1898 get.addFamily(familyName);
1901 if (opts.filterAll) {
1902 get.setFilter(new FilterAllFilter());
1904 get.setConsistency(consistency);
1905 if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1906 if (opts.multiGet > 0) {
1907 this.gets.add(get);
1908 if (this.gets.size() == opts.multiGet) {
1909 Result [] rs = this.table.get(this.gets);
1910 updateValueSize(rs);
1911 this.gets.clear();
1912 } else {
1913 return false;
1915 } else {
1916 updateValueSize(this.table.get(get));
1918 return true;
1921 @Override
1922 protected int getReportingPeriod() {
1923 int period = opts.perClientRunRows / 10;
1924 return period == 0 ? opts.perClientRunRows : period;
1927 @Override
1928 protected void testTakedown() throws IOException {
1929 if (this.gets != null && this.gets.size() > 0) {
1930 this.table.get(gets);
1931 this.gets.clear();
1933 super.testTakedown();
1937 static class RandomWriteTest extends SequentialWriteTest {
1938 RandomWriteTest(Connection con, TestOptions options, Status status) {
1939 super(con, options, status);
1942 @Override
1943 protected byte[] generateRow(final int i) {
1944 return getRandomRow(this.rand, opts.totalRows);
1950 static class ScanTest extends TableTest {
1951 private ResultScanner testScanner;
1953 ScanTest(Connection con, TestOptions options, Status status) {
1954 super(con, options, status);
1957 @Override
1958 void testTakedown() throws IOException {
1959 if (this.testScanner != null) {
1960 this.testScanner.close();
1962 super.testTakedown();
1966 @Override
1967 boolean testRow(final int i) throws IOException {
1968 if (this.testScanner == null) {
1969 Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1970 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1971 .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1972 for (int family = 0; family < opts.families; family++) {
1973 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1974 if (opts.addColumns) {
1975 for (int column = 0; column < opts.columns; column++) {
1976 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1977 scan.addColumn(familyName, qualifier);
1979 } else {
1980 scan.addFamily(familyName);
1983 if (opts.filterAll) {
1984 scan.setFilter(new FilterAllFilter());
1986 this.testScanner = table.getScanner(scan);
1988 Result r = testScanner.next();
1989 updateValueSize(r);
1990 return true;
1995 * Base class for operations that are CAS-like; that read a value and then set it based off what
1996 * they read. In this category is increment, append, checkAndPut, etc.
1998 * <p>These operations also want some concurrency going on. Usually when these tests run, they
1999 * operate in their own part of the key range. In CASTest, we will have them all overlap on the
2000 * same key space. We do this with our getStartRow and getLastRow overrides.
2002 static abstract class CASTableTest extends TableTest {
2003 private final byte [] qualifier;
2004 CASTableTest(Connection con, TestOptions options, Status status) {
2005 super(con, options, status);
2006 qualifier = Bytes.toBytes(this.getClass().getSimpleName());
2009 byte [] getQualifier() {
2010 return this.qualifier;
2013 @Override
2014 int getStartRow() {
2015 return 0;
2018 @Override
2019 int getLastRow() {
2020 return opts.perClientRunRows;
2024 static class IncrementTest extends CASTableTest {
2025 IncrementTest(Connection con, TestOptions options, Status status) {
2026 super(con, options, status);
2029 @Override
2030 boolean testRow(final int i) throws IOException {
2031 Increment increment = new Increment(format(i));
2032 // unlike checkAndXXX tests, which make most sense to do on a single value,
2033 // if multiple families are specified for an increment test we assume it is
2034 // meant to raise the work factor
2035 for (int family = 0; family < opts.families; family++) {
2036 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2037 increment.addColumn(familyName, getQualifier(), 1l);
2039 updateValueSize(this.table.increment(increment));
2040 return true;
2044 static class AppendTest extends CASTableTest {
2045 AppendTest(Connection con, TestOptions options, Status status) {
2046 super(con, options, status);
2049 @Override
2050 boolean testRow(final int i) throws IOException {
2051 byte [] bytes = format(i);
2052 Append append = new Append(bytes);
2053 // unlike checkAndXXX tests, which make most sense to do on a single value,
2054 // if multiple families are specified for an append test we assume it is
2055 // meant to raise the work factor
2056 for (int family = 0; family < opts.families; family++) {
2057 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2058 append.addColumn(familyName, getQualifier(), bytes);
2060 updateValueSize(this.table.append(append));
2061 return true;
2065 static class CheckAndMutateTest extends CASTableTest {
2066 CheckAndMutateTest(Connection con, TestOptions options, Status status) {
2067 super(con, options, status);
2070 @Override
2071 boolean testRow(final int i) throws IOException {
2072 final byte [] bytes = format(i);
2073 // checkAndXXX tests operate on only a single value
2074 // Put a known value so when we go to check it, it is there.
2075 Put put = new Put(bytes);
2076 put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2077 this.table.put(put);
2078 RowMutations mutations = new RowMutations(bytes);
2079 mutations.add(put);
2080 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2081 .ifEquals(bytes).thenMutate(mutations);
2082 return true;
2086 static class CheckAndPutTest extends CASTableTest {
2087 CheckAndPutTest(Connection con, TestOptions options, Status status) {
2088 super(con, options, status);
2091 @Override
2092 boolean testRow(final int i) throws IOException {
2093 final byte [] bytes = format(i);
2094 // checkAndXXX tests operate on only a single value
2095 // Put a known value so when we go to check it, it is there.
2096 Put put = new Put(bytes);
2097 put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2098 this.table.put(put);
2099 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2100 .ifEquals(bytes).thenPut(put);
2101 return true;
2105 static class CheckAndDeleteTest extends CASTableTest {
2106 CheckAndDeleteTest(Connection con, TestOptions options, Status status) {
2107 super(con, options, status);
2110 @Override
2111 boolean testRow(final int i) throws IOException {
2112 final byte [] bytes = format(i);
2113 // checkAndXXX tests operate on only a single value
2114 // Put a known value so when we go to check it, it is there.
2115 Put put = new Put(bytes);
2116 put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2117 this.table.put(put);
2118 Delete delete = new Delete(put.getRow());
2119 delete.addColumn(FAMILY_ZERO, getQualifier());
2120 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2121 .ifEquals(bytes).thenDelete(delete);
2122 return true;
2126 static class SequentialReadTest extends TableTest {
2127 SequentialReadTest(Connection con, TestOptions options, Status status) {
2128 super(con, options, status);
2131 @Override
2132 boolean testRow(final int i) throws IOException {
2133 Get get = new Get(format(i));
2134 for (int family = 0; family < opts.families; family++) {
2135 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2136 if (opts.addColumns) {
2137 for (int column = 0; column < opts.columns; column++) {
2138 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2139 get.addColumn(familyName, qualifier);
2141 } else {
2142 get.addFamily(familyName);
2145 if (opts.filterAll) {
2146 get.setFilter(new FilterAllFilter());
2148 updateValueSize(table.get(get));
2149 return true;
2153 static class SequentialWriteTest extends BufferedMutatorTest {
2154 private ArrayList<Put> puts;
2157 SequentialWriteTest(Connection con, TestOptions options, Status status) {
2158 super(con, options, status);
2159 if (opts.multiPut > 0) {
2160 LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
2161 this.puts = new ArrayList<>(opts.multiPut);
2165 protected byte[] generateRow(final int i) {
2166 return format(i);
2169 @Override
2170 boolean testRow(final int i) throws IOException {
2171 byte[] row = generateRow(i);
2172 Put put = new Put(row);
2173 for (int family = 0; family < opts.families; family++) {
2174 byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family);
2175 for (int column = 0; column < opts.columns; column++) {
2176 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2177 byte[] value = generateData(this.rand, getValueLength(this.rand));
2178 if (opts.useTags) {
2179 byte[] tag = generateData(this.rand, TAG_LENGTH);
2180 Tag[] tags = new Tag[opts.noOfTags];
2181 for (int n = 0; n < opts.noOfTags; n++) {
2182 Tag t = new ArrayBackedTag((byte) n, tag);
2183 tags[n] = t;
2185 KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP,
2186 value, tags);
2187 put.add(kv);
2188 updateValueSize(kv.getValueLength());
2189 } else {
2190 put.addColumn(familyName, qualifier, value);
2191 updateValueSize(value.length);
2195 put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
2196 if (opts.autoFlush) {
2197 if (opts.multiPut > 0) {
2198 this.puts.add(put);
2199 if (this.puts.size() == opts.multiPut) {
2200 table.put(this.puts);
2201 this.puts.clear();
2202 } else {
2203 return false;
2205 } else {
2206 table.put(put);
2208 } else {
2209 mutator.mutate(put);
2211 return true;
2215 static class FilteredScanTest extends TableTest {
2216 protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName());
2218 FilteredScanTest(Connection con, TestOptions options, Status status) {
2219 super(con, options, status);
2220 if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) {
2221 LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB +
2222 ". This could take a very long time.");
2226 @Override
2227 boolean testRow(int i) throws IOException {
2228 byte[] value = generateData(this.rand, getValueLength(this.rand));
2229 Scan scan = constructScan(value);
2230 ResultScanner scanner = null;
2231 try {
2232 scanner = this.table.getScanner(scan);
2233 for (Result r = null; (r = scanner.next()) != null;) {
2234 updateValueSize(r);
2236 } finally {
2237 if (scanner != null) {
2238 updateScanMetrics(scanner.getScanMetrics());
2239 scanner.close();
2242 return true;
2245 protected Scan constructScan(byte[] valuePrefix) throws IOException {
2246 FilterList list = new FilterList();
2247 Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO,
2248 CompareOperator.EQUAL, new BinaryComparator(valuePrefix));
2249 list.addFilter(filter);
2250 if (opts.filterAll) {
2251 list.addFilter(new FilterAllFilter());
2253 Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2254 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2255 .setScanMetricsEnabled(true);
2256 if (opts.addColumns) {
2257 for (int column = 0; column < opts.columns; column++) {
2258 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2259 scan.addColumn(FAMILY_ZERO, qualifier);
2261 } else {
2262 scan.addFamily(FAMILY_ZERO);
2264 scan.setFilter(list);
2265 return scan;
2270 * Compute a throughput rate in MB/s.
2271 * @param rows Number of records consumed.
2272 * @param timeMs Time taken in milliseconds.
2273 * @return String value with label, ie '123.76 MB/s'
2275 private static String calculateMbps(int rows, long timeMs, final int valueSize, int families, int columns) {
2276 BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH +
2277 ((valueSize + (FAMILY_NAME_BASE.length()+1) + COLUMN_ZERO.length) * columns) * families);
2278 BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
2279 .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT)
2280 .divide(BYTES_PER_MB, CXT);
2281 return FMT.format(mbps) + " MB/s";
2285 * Format passed integer.
2286 * @param number
2287 * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed
2288 * number (Does absolute in case number is negative).
2290 public static byte [] format(final int number) {
2291 byte [] b = new byte[ROW_LENGTH];
2292 int d = Math.abs(number);
2293 for (int i = b.length - 1; i >= 0; i--) {
2294 b[i] = (byte)((d % 10) + '0');
2295 d /= 10;
2297 return b;
2301 * This method takes some time and is done inline uploading data. For
2302 * example, doing the mapfile test, generation of the key and value
2303 * consumes about 30% of CPU time.
2304 * @return Generated random value to insert into a table cell.
2306 public static byte[] generateData(final Random r, int length) {
2307 byte [] b = new byte [length];
2308 int i;
2310 for(i = 0; i < (length-8); i += 8) {
2311 b[i] = (byte) (65 + r.nextInt(26));
2312 b[i+1] = b[i];
2313 b[i+2] = b[i];
2314 b[i+3] = b[i];
2315 b[i+4] = b[i];
2316 b[i+5] = b[i];
2317 b[i+6] = b[i];
2318 b[i+7] = b[i];
2321 byte a = (byte) (65 + r.nextInt(26));
2322 for(; i < length; i++) {
2323 b[i] = a;
2325 return b;
2328 static byte [] getRandomRow(final Random random, final int totalRows) {
2329 return format(generateRandomRow(random, totalRows));
2332 static int generateRandomRow(final Random random, final int totalRows) {
2333 return random.nextInt(Integer.MAX_VALUE) % totalRows;
2336 static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf,
2337 Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status)
2338 throws IOException, InterruptedException {
2339 status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for "
2340 + opts.perClientRunRows + " rows");
2341 long totalElapsedTime;
2343 final TestBase t;
2344 try {
2345 if (AsyncTest.class.isAssignableFrom(cmd)) {
2346 Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd;
2347 Constructor<? extends AsyncTest> constructor =
2348 newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class);
2349 t = constructor.newInstance(asyncCon, opts, status);
2350 } else {
2351 Class<? extends Test> newCmd = (Class<? extends Test>) cmd;
2352 Constructor<? extends Test> constructor =
2353 newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
2354 t = constructor.newInstance(con, opts, status);
2356 } catch (NoSuchMethodException e) {
2357 throw new IllegalArgumentException("Invalid command class: " + cmd.getName()
2358 + ". It does not provide a constructor as described by "
2359 + "the javadoc comment. Available constructors are: "
2360 + Arrays.toString(cmd.getConstructors()));
2361 } catch (Exception e) {
2362 throw new IllegalStateException("Failed to construct command class", e);
2364 totalElapsedTime = t.test();
2366 status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
2367 "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" +
2368 " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
2369 getAverageValueLength(opts), opts.families, opts.columns) + ")");
2371 return new RunResult(totalElapsedTime, t.getLatencyHistogram());
2374 private static int getAverageValueLength(final TestOptions opts) {
2375 return opts.valueRandom? opts.valueSize/2: opts.valueSize;
2378 private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) throws IOException,
2379 InterruptedException, ClassNotFoundException, ExecutionException {
2380 // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
2381 // the TestOptions introspection for us and dump the output in a readable format.
2382 LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts));
2383 Admin admin = null;
2384 Connection connection = null;
2385 try {
2386 connection = ConnectionFactory.createConnection(getConf());
2387 admin = connection.getAdmin();
2388 checkTable(admin, opts);
2389 } finally {
2390 if (admin != null) admin.close();
2391 if (connection != null) connection.close();
2393 if (opts.nomapred) {
2394 doLocalClients(opts, getConf());
2395 } else {
2396 doMapReduce(opts, getConf());
2400 protected void printUsage() {
2401 printUsage(PE_COMMAND_SHORTNAME, null);
2404 protected static void printUsage(final String message) {
2405 printUsage(PE_COMMAND_SHORTNAME, message);
2408 protected static void printUsageAndExit(final String message, final int exitCode) {
2409 printUsage(message);
2410 System.exit(exitCode);
2413 protected static void printUsage(final String shortName, final String message) {
2414 if (message != null && message.length() > 0) {
2415 System.err.println(message);
2417 System.err.print("Usage: hbase " + shortName);
2418 System.err.println(" <OPTIONS> [-D<property=value>]* <command> <nclients>");
2419 System.err.println();
2420 System.err.println("General Options:");
2421 System.err.println(" nomapred Run multiple clients using threads " +
2422 "(rather than use mapreduce)");
2423 System.err.println(" oneCon all the threads share the same connection. Default: False");
2424 System.err.println(" connCount connections all threads share. "
2425 + "For example, if set to 2, then all thread share 2 connection. "
2426 + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, "
2427 + "if not, connCount=thread number");
2429 System.err.println(" sampleRate Execute test on a sample of total " +
2430 "rows. Only supported by randomRead. Default: 1.0");
2431 System.err.println(" period Report every 'period' rows: " +
2432 "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows()/10);
2433 System.err.println(" cycles How many times to cycle the test. Defaults: 1.");
2434 System.err.println(" traceRate Enable HTrace spans. Initiate tracing every N rows. " +
2435 "Default: 0");
2436 System.err.println(" latency Set to report operation latencies. Default: False");
2437 System.err.println(" measureAfter Start to measure the latency once 'measureAfter'" +
2438 " rows have been treated. Default: 0");
2439 System.err.println(" valueSize Pass value size to use: Default: "
2440 + DEFAULT_OPTS.getValueSize());
2441 System.err.println(" valueRandom Set if we should vary value size between 0 and " +
2442 "'valueSize'; set on read for stats on size: Default: Not set.");
2443 System.err.println(" blockEncoding Block encoding to use. Value should be one of "
2444 + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE");
2445 System.err.println();
2446 System.err.println("Table Creation / Write Tests:");
2447 System.err.println(" table Alternate table name. Default: 'TestTable'");
2448 System.err.println(" rows Rows each client runs. Default: "
2449 + DEFAULT_OPTS.getPerClientRunRows()
2450 + ". In case of randomReads and randomSeekScans this could"
2451 + " be specified along with --size to specify the number of rows to be scanned within"
2452 + " the total range specified by the size.");
2453 System.err.println(
2454 " size Total size in GiB. Mutually exclusive with --rows for writes and scans"
2455 + ". But for randomReads and randomSeekScans when you use size with --rows you could"
2456 + " use size to specify the end range and --rows"
2457 + " specifies the number of rows within that range. " + "Default: 1.0.");
2458 System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'");
2459 System.err.println(" flushCommits Used to determine if the test should flush the table. " +
2460 "Default: false");
2461 System.err.println(" valueZipf Set if we should vary value size between 0 and " +
2462 "'valueSize' in zipf form: Default: Not set.");
2463 System.err.println(" writeToWAL Set writeToWAL on puts. Default: True");
2464 System.err.println(" autoFlush Set autoFlush on htable. Default: False");
2465 System.err.println(" multiPut Batch puts together into groups of N. Only supported " +
2466 "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0");
2467 System.err.println(" presplit Create presplit table. If a table with same name exists,"
2468 + " it'll be deleted and recreated (instead of verifying count of its existing regions). "
2469 + "Recommended for accurate perf analysis (see guide). Default: disabled");
2470 System.err.println(" usetags Writes tags along with KVs. Use with HFile V3. " +
2471 "Default: false");
2472 System.err.println(" numoftags Specify the no of tags that would be needed. " +
2473 "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags);
2474 System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table.");
2475 System.err.println(" columns Columns to write per row. Default: 1");
2476 System.err.println(" families Specify number of column families for the table. Default: 1");
2477 System.err.println();
2478 System.err.println("Read Tests:");
2479 System.err.println(" filterAll Helps to filter out all the rows on the server side"
2480 + " there by not returning any thing back to the client. Helps to check the server side"
2481 + " performance. Uses FilterAllFilter internally. ");
2482 System.err.println(" multiGet Batch gets together into groups of N. Only supported " +
2483 "by randomRead. Default: disabled");
2484 System.err.println(" inmemory Tries to keep the HFiles of the CF " +
2485 "inmemory as far as possible. Not guaranteed that reads are always served " +
2486 "from memory. Default: false");
2487 System.err.println(" bloomFilter Bloom filter type, one of "
2488 + Arrays.toString(BloomType.values()));
2489 System.err.println(" blockSize Blocksize to use when writing out hfiles. ");
2490 System.err.println(" inmemoryCompaction Makes the column family to do inmemory flushes/compactions. "
2491 + "Uses the CompactingMemstore");
2492 System.err.println(" addColumns Adds columns to scans/gets explicitly. Default: true");
2493 System.err.println(" replicas Enable region replica testing. Defaults: 1.");
2494 System.err.println(" randomSleep Do a random sleep before each get between 0 and entered value. Defaults: 0");
2495 System.err.println(" caching Scan caching to use. Default: 30");
2496 System.err.println(" asyncPrefetch Enable asyncPrefetch for scan");
2497 System.err.println(" cacheBlocks Set the cacheBlocks option for scan. Default: true");
2498 System.err.println(" scanReadType Set the readType option for scan, stream/pread/default. Default: default");
2499 System.err.println(" bufferSize Set the value of client side buffering. Default: 2MB");
2500 System.err.println();
2501 System.err.println(" Note: -D properties will be applied to the conf used. ");
2502 System.err.println(" For example: ");
2503 System.err.println(" -Dmapreduce.output.fileoutputformat.compress=true");
2504 System.err.println(" -Dmapreduce.task.timeout=60000");
2505 System.err.println();
2506 System.err.println("Command:");
2507 for (CmdDescriptor command : COMMANDS.values()) {
2508 System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription()));
2510 System.err.println();
2511 System.err.println("Args:");
2512 System.err.println(" nclients Integer. Required. Total number of clients "
2513 + "(and HRegionServers) running. 1 <= value <= 500");
2514 System.err.println("Examples:");
2515 System.err.println(" To run a single client doing the default 1M sequentialWrites:");
2516 System.err.println(" $ hbase " + shortName + " sequentialWrite 1");
2517 System.err.println(" To run 10 clients doing increments over ten rows:");
2518 System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10");
2522 * Parse options passed in via an arguments array. Assumes that array has been split
2523 * on white-space and placed into a {@code Queue}. Any unknown arguments will remain
2524 * in the queue at the conclusion of this method call. It's up to the caller to deal
2525 * with these unrecognized arguments.
2527 static TestOptions parseOpts(Queue<String> args) {
2528 TestOptions opts = new TestOptions();
2530 String cmd = null;
2531 while ((cmd = args.poll()) != null) {
2532 if (cmd.equals("-h") || cmd.startsWith("--h")) {
2533 // place item back onto queue so that caller knows parsing was incomplete
2534 args.add(cmd);
2535 break;
2538 final String nmr = "--nomapred";
2539 if (cmd.startsWith(nmr)) {
2540 opts.nomapred = true;
2541 continue;
2544 final String rows = "--rows=";
2545 if (cmd.startsWith(rows)) {
2546 opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
2547 continue;
2550 final String cycles = "--cycles=";
2551 if (cmd.startsWith(cycles)) {
2552 opts.cycles = Integer.parseInt(cmd.substring(cycles.length()));
2553 continue;
2556 final String sampleRate = "--sampleRate=";
2557 if (cmd.startsWith(sampleRate)) {
2558 opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
2559 continue;
2562 final String table = "--table=";
2563 if (cmd.startsWith(table)) {
2564 opts.tableName = cmd.substring(table.length());
2565 continue;
2568 final String startRow = "--startRow=";
2569 if (cmd.startsWith(startRow)) {
2570 opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
2571 continue;
2574 final String compress = "--compress=";
2575 if (cmd.startsWith(compress)) {
2576 opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
2577 continue;
2580 final String traceRate = "--traceRate=";
2581 if (cmd.startsWith(traceRate)) {
2582 opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
2583 continue;
2586 final String blockEncoding = "--blockEncoding=";
2587 if (cmd.startsWith(blockEncoding)) {
2588 opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
2589 continue;
2592 final String flushCommits = "--flushCommits=";
2593 if (cmd.startsWith(flushCommits)) {
2594 opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
2595 continue;
2598 final String writeToWAL = "--writeToWAL=";
2599 if (cmd.startsWith(writeToWAL)) {
2600 opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
2601 continue;
2604 final String presplit = "--presplit=";
2605 if (cmd.startsWith(presplit)) {
2606 opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
2607 continue;
2610 final String inMemory = "--inmemory=";
2611 if (cmd.startsWith(inMemory)) {
2612 opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
2613 continue;
2616 final String autoFlush = "--autoFlush=";
2617 if (cmd.startsWith(autoFlush)) {
2618 opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
2619 if (!opts.autoFlush && opts.multiPut > 0) {
2620 throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
2622 continue;
2625 final String onceCon = "--oneCon=";
2626 if (cmd.startsWith(onceCon)) {
2627 opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
2628 if (opts.oneCon && opts.connCount > 1) {
2629 throw new IllegalArgumentException("oneCon is set to true, "
2630 + "connCount should not bigger than 1");
2632 continue;
2635 final String connCount = "--connCount=";
2636 if (cmd.startsWith(connCount)) {
2637 opts.connCount = Integer.parseInt(cmd.substring(connCount.length()));
2638 if (opts.oneCon && opts.connCount > 1) {
2639 throw new IllegalArgumentException("oneCon is set to true, "
2640 + "connCount should not bigger than 1");
2642 continue;
2645 final String latency = "--latency";
2646 if (cmd.startsWith(latency)) {
2647 opts.reportLatency = true;
2648 continue;
2651 final String multiGet = "--multiGet=";
2652 if (cmd.startsWith(multiGet)) {
2653 opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
2654 continue;
2657 final String multiPut = "--multiPut=";
2658 if (cmd.startsWith(multiPut)) {
2659 opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length()));
2660 if (!opts.autoFlush && opts.multiPut > 0) {
2661 throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
2663 continue;
2666 final String useTags = "--usetags=";
2667 if (cmd.startsWith(useTags)) {
2668 opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
2669 continue;
2672 final String noOfTags = "--numoftags=";
2673 if (cmd.startsWith(noOfTags)) {
2674 opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
2675 continue;
2678 final String replicas = "--replicas=";
2679 if (cmd.startsWith(replicas)) {
2680 opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
2681 continue;
2684 final String filterOutAll = "--filterAll";
2685 if (cmd.startsWith(filterOutAll)) {
2686 opts.filterAll = true;
2687 continue;
2690 final String size = "--size=";
2691 if (cmd.startsWith(size)) {
2692 opts.size = Float.parseFloat(cmd.substring(size.length()));
2693 if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 1GB");
2694 continue;
2697 final String splitPolicy = "--splitPolicy=";
2698 if (cmd.startsWith(splitPolicy)) {
2699 opts.splitPolicy = cmd.substring(splitPolicy.length());
2700 continue;
2703 final String randomSleep = "--randomSleep=";
2704 if (cmd.startsWith(randomSleep)) {
2705 opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length()));
2706 continue;
2709 final String measureAfter = "--measureAfter=";
2710 if (cmd.startsWith(measureAfter)) {
2711 opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length()));
2712 continue;
2715 final String bloomFilter = "--bloomFilter=";
2716 if (cmd.startsWith(bloomFilter)) {
2717 opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
2718 continue;
2721 final String blockSize = "--blockSize=";
2722 if(cmd.startsWith(blockSize) ) {
2723 opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length()));
2724 continue;
2727 final String valueSize = "--valueSize=";
2728 if (cmd.startsWith(valueSize)) {
2729 opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
2730 continue;
2733 final String valueRandom = "--valueRandom";
2734 if (cmd.startsWith(valueRandom)) {
2735 opts.valueRandom = true;
2736 if (opts.valueZipf) {
2737 throw new IllegalStateException("Either valueZipf or valueRandom but not both");
2739 continue;
2742 final String valueZipf = "--valueZipf";
2743 if (cmd.startsWith(valueZipf)) {
2744 opts.valueZipf = true;
2745 if (opts.valueRandom) {
2746 throw new IllegalStateException("Either valueZipf or valueRandom but not both");
2748 continue;
2751 final String period = "--period=";
2752 if (cmd.startsWith(period)) {
2753 opts.period = Integer.parseInt(cmd.substring(period.length()));
2754 continue;
2757 final String addColumns = "--addColumns=";
2758 if (cmd.startsWith(addColumns)) {
2759 opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
2760 continue;
2763 final String inMemoryCompaction = "--inmemoryCompaction=";
2764 if (cmd.startsWith(inMemoryCompaction)) {
2765 opts.inMemoryCompaction =
2766 MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length()));
2767 continue;
2770 final String columns = "--columns=";
2771 if (cmd.startsWith(columns)) {
2772 opts.columns = Integer.parseInt(cmd.substring(columns.length()));
2773 continue;
2776 final String families = "--families=";
2777 if (cmd.startsWith(families)) {
2778 opts.families = Integer.parseInt(cmd.substring(families.length()));
2779 continue;
2782 final String caching = "--caching=";
2783 if (cmd.startsWith(caching)) {
2784 opts.caching = Integer.parseInt(cmd.substring(caching.length()));
2785 continue;
2788 final String asyncPrefetch = "--asyncPrefetch";
2789 if (cmd.startsWith(asyncPrefetch)) {
2790 opts.asyncPrefetch = true;
2791 continue;
2794 final String cacheBlocks = "--cacheBlocks=";
2795 if (cmd.startsWith(cacheBlocks)) {
2796 opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length()));
2797 continue;
2800 final String scanReadType = "--scanReadType=";
2801 if (cmd.startsWith(scanReadType)) {
2802 opts.scanReadType =
2803 Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase());
2804 continue;
2807 final String bufferSize = "--bufferSize=";
2808 if (cmd.startsWith(bufferSize)) {
2809 opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length()));
2810 continue;
2813 if (isCommandClass(cmd)) {
2814 opts.cmdName = cmd;
2815 try {
2816 opts.numClientThreads = Integer.parseInt(args.remove());
2817 } catch (NoSuchElementException | NumberFormatException e) {
2818 throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e);
2820 opts = calculateRowsAndSize(opts);
2821 break;
2822 } else {
2823 printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
2826 // Not matching any option or command.
2827 System.err.println("Error: Wrong option or command: " + cmd);
2828 args.add(cmd);
2829 break;
2831 return opts;
2834 static TestOptions calculateRowsAndSize(final TestOptions opts) {
2835 int rowsPerGB = getRowsPerGB(opts);
2836 if ((opts.getCmdName() != null
2837 && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN)))
2838 && opts.size != DEFAULT_OPTS.size
2839 && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
2840 opts.totalRows = (int) opts.size * rowsPerGB;
2841 } else if (opts.size != DEFAULT_OPTS.size) {
2842 // total size in GB specified
2843 opts.totalRows = (int) opts.size * rowsPerGB;
2844 opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
2845 } else {
2846 opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
2847 opts.size = opts.totalRows / rowsPerGB;
2849 return opts;
2852 static int getRowsPerGB(final TestOptions opts) {
2853 return ONE_GB / ((opts.valueRandom? opts.valueSize/2: opts.valueSize) * opts.getFamilies() *
2854 opts.getColumns());
2857 @Override
2858 public int run(String[] args) throws Exception {
2859 // Process command-line args. TODO: Better cmd-line processing
2860 // (but hopefully something not as painful as cli options).
2861 int errCode = -1;
2862 if (args.length < 1) {
2863 printUsage();
2864 return errCode;
2867 try {
2868 LinkedList<String> argv = new LinkedList<>();
2869 argv.addAll(Arrays.asList(args));
2870 TestOptions opts = parseOpts(argv);
2872 // args remaining, print help and exit
2873 if (!argv.isEmpty()) {
2874 errCode = 0;
2875 printUsage();
2876 return errCode;
2879 // must run at least 1 client
2880 if (opts.numClientThreads <= 0) {
2881 throw new IllegalArgumentException("Number of clients must be > 0");
2884 // cmdName should not be null, print help and exit
2885 if (opts.cmdName == null) {
2886 printUsage();
2887 return errCode;
2890 Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName);
2891 if (cmdClass != null) {
2892 runTest(cmdClass, opts);
2893 errCode = 0;
2896 } catch (Exception e) {
2897 e.printStackTrace();
2900 return errCode;
2903 private static boolean isCommandClass(String cmd) {
2904 return COMMANDS.containsKey(cmd);
2907 private static Class<? extends TestBase> determineCommandClass(String cmd) {
2908 CmdDescriptor descriptor = COMMANDS.get(cmd);
2909 return descriptor != null ? descriptor.getCmdClass() : null;
2912 public static void main(final String[] args) throws Exception {
2913 int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
2914 System.exit(res);