HBASE-22002 Remove the deprecated methods in Admin interface
[hbase.git] / hbase-mapreduce / src / test / java / org / apache / hadoop / hbase / PerformanceEvaluation.java
blob2dae0e8c12dfa90fe6dadb6d83c2ed2adf8b1a1c
1 /**
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org.apache.hadoop.hbase;
21 import com.codahale.metrics.Histogram;
22 import com.codahale.metrics.UniformReservoir;
23 import java.io.IOException;
24 import java.io.PrintStream;
25 import java.lang.reflect.Constructor;
26 import java.math.BigDecimal;
27 import java.math.MathContext;
28 import java.text.DecimalFormat;
29 import java.text.SimpleDateFormat;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.Date;
33 import java.util.LinkedList;
34 import java.util.Locale;
35 import java.util.Map;
36 import java.util.NoSuchElementException;
37 import java.util.Queue;
38 import java.util.Random;
39 import java.util.TreeMap;
40 import java.util.concurrent.Callable;
41 import java.util.concurrent.ExecutionException;
42 import java.util.concurrent.ExecutorService;
43 import java.util.concurrent.Executors;
44 import java.util.concurrent.Future;
45 import org.apache.commons.lang3.StringUtils;
46 import org.apache.hadoop.conf.Configuration;
47 import org.apache.hadoop.conf.Configured;
48 import org.apache.hadoop.fs.FileSystem;
49 import org.apache.hadoop.fs.Path;
50 import org.apache.hadoop.hbase.client.Admin;
51 import org.apache.hadoop.hbase.client.Append;
52 import org.apache.hadoop.hbase.client.AsyncConnection;
53 import org.apache.hadoop.hbase.client.AsyncTable;
54 import org.apache.hadoop.hbase.client.BufferedMutator;
55 import org.apache.hadoop.hbase.client.BufferedMutatorParams;
56 import org.apache.hadoop.hbase.client.Connection;
57 import org.apache.hadoop.hbase.client.ConnectionFactory;
58 import org.apache.hadoop.hbase.client.Consistency;
59 import org.apache.hadoop.hbase.client.Delete;
60 import org.apache.hadoop.hbase.client.Durability;
61 import org.apache.hadoop.hbase.client.Get;
62 import org.apache.hadoop.hbase.client.Increment;
63 import org.apache.hadoop.hbase.client.Put;
64 import org.apache.hadoop.hbase.client.Result;
65 import org.apache.hadoop.hbase.client.ResultScanner;
66 import org.apache.hadoop.hbase.client.RowMutations;
67 import org.apache.hadoop.hbase.client.Scan;
68 import org.apache.hadoop.hbase.client.Table;
69 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
70 import org.apache.hadoop.hbase.filter.BinaryComparator;
71 import org.apache.hadoop.hbase.filter.Filter;
72 import org.apache.hadoop.hbase.filter.FilterAllFilter;
73 import org.apache.hadoop.hbase.filter.FilterList;
74 import org.apache.hadoop.hbase.filter.PageFilter;
75 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
76 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
77 import org.apache.hadoop.hbase.io.compress.Compression;
78 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
79 import org.apache.hadoop.hbase.io.hfile.RandomDistribution;
80 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
81 import org.apache.hadoop.hbase.regionserver.BloomType;
82 import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
83 import org.apache.hadoop.hbase.trace.HBaseHTraceConfiguration;
84 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
85 import org.apache.hadoop.hbase.trace.TraceUtil;
86 import org.apache.hadoop.hbase.util.ByteArrayHashKey;
87 import org.apache.hadoop.hbase.util.Bytes;
88 import org.apache.hadoop.hbase.util.GsonUtil;
89 import org.apache.hadoop.hbase.util.Hash;
90 import org.apache.hadoop.hbase.util.MurmurHash;
91 import org.apache.hadoop.hbase.util.Pair;
92 import org.apache.hadoop.hbase.util.YammerHistogramUtils;
93 import org.apache.hadoop.io.LongWritable;
94 import org.apache.hadoop.io.Text;
95 import org.apache.hadoop.mapreduce.Job;
96 import org.apache.hadoop.mapreduce.Mapper;
97 import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
98 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
99 import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
100 import org.apache.hadoop.util.Tool;
101 import org.apache.hadoop.util.ToolRunner;
102 import org.apache.htrace.core.ProbabilitySampler;
103 import org.apache.htrace.core.Sampler;
104 import org.apache.htrace.core.TraceScope;
105 import org.apache.yetus.audience.InterfaceAudience;
106 import org.slf4j.Logger;
107 import org.slf4j.LoggerFactory;
109 import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
110 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
111 import org.apache.hbase.thirdparty.com.google.gson.Gson;
114 * Script used evaluating HBase performance and scalability. Runs a HBase
115 * client that steps through one of a set of hardcoded tests or 'experiments'
116 * (e.g. a random reads test, a random writes test, etc.). Pass on the
117 * command-line which test to run and how many clients are participating in
118 * this experiment. Run {@code PerformanceEvaluation --help} to obtain usage.
120 * <p>This class sets up and runs the evaluation programs described in
121 * Section 7, <i>Performance Evaluation</i>, of the <a
122 * href="http://labs.google.com/papers/bigtable.html">Bigtable</a>
123 * paper, pages 8-10.
125 * <p>By default, runs as a mapreduce job where each mapper runs a single test
126 * client. Can also run as a non-mapreduce, multithreaded application by
127 * specifying {@code --nomapred}. Each client does about 1GB of data, unless
128 * specified otherwise.
130 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
131 public class PerformanceEvaluation extends Configured implements Tool {
132 static final String RANDOM_SEEK_SCAN = "randomSeekScan";
133 static final String RANDOM_READ = "randomRead";
134 static final String PE_COMMAND_SHORTNAME = "pe";
135 private static final Logger LOG = LoggerFactory.getLogger(PerformanceEvaluation.class.getName());
136 private static final Gson GSON = GsonUtil.createGson().create();
138 public static final String TABLE_NAME = "TestTable";
139 public static final String FAMILY_NAME_BASE = "info";
140 public static final byte[] FAMILY_ZERO = Bytes.toBytes("info0");
141 public static final byte[] COLUMN_ZERO = Bytes.toBytes("" + 0);
142 public static final int DEFAULT_VALUE_LENGTH = 1000;
143 public static final int ROW_LENGTH = 26;
145 private static final int ONE_GB = 1024 * 1024 * 1000;
146 private static final int DEFAULT_ROWS_PER_GB = ONE_GB / DEFAULT_VALUE_LENGTH;
147 // TODO : should we make this configurable
148 private static final int TAG_LENGTH = 256;
149 private static final DecimalFormat FMT = new DecimalFormat("0.##");
150 private static final MathContext CXT = MathContext.DECIMAL64;
151 private static final BigDecimal MS_PER_SEC = BigDecimal.valueOf(1000);
152 private static final BigDecimal BYTES_PER_MB = BigDecimal.valueOf(1024 * 1024);
153 private static final TestOptions DEFAULT_OPTS = new TestOptions();
155 private static Map<String, CmdDescriptor> COMMANDS = new TreeMap<>();
156 private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
158 static {
159 addCommandDescriptor(AsyncRandomReadTest.class, "asyncRandomRead",
160 "Run async random read test");
161 addCommandDescriptor(AsyncRandomWriteTest.class, "asyncRandomWrite",
162 "Run async random write test");
163 addCommandDescriptor(AsyncSequentialReadTest.class, "asyncSequentialRead",
164 "Run async sequential read test");
165 addCommandDescriptor(AsyncSequentialWriteTest.class, "asyncSequentialWrite",
166 "Run async sequential write test");
167 addCommandDescriptor(AsyncScanTest.class, "asyncScan",
168 "Run async scan test (read every row)");
169 addCommandDescriptor(RandomReadTest.class, RANDOM_READ,
170 "Run random read test");
171 addCommandDescriptor(RandomSeekScanTest.class, RANDOM_SEEK_SCAN,
172 "Run random seek and scan 100 test");
173 addCommandDescriptor(RandomScanWithRange10Test.class, "scanRange10",
174 "Run random seek scan with both start and stop row (max 10 rows)");
175 addCommandDescriptor(RandomScanWithRange100Test.class, "scanRange100",
176 "Run random seek scan with both start and stop row (max 100 rows)");
177 addCommandDescriptor(RandomScanWithRange1000Test.class, "scanRange1000",
178 "Run random seek scan with both start and stop row (max 1000 rows)");
179 addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
180 "Run random seek scan with both start and stop row (max 10000 rows)");
181 addCommandDescriptor(RandomWriteTest.class, "randomWrite",
182 "Run random write test");
183 addCommandDescriptor(SequentialReadTest.class, "sequentialRead",
184 "Run sequential read test");
185 addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite",
186 "Run sequential write test");
187 addCommandDescriptor(ScanTest.class, "scan",
188 "Run scan test (read every row)");
189 addCommandDescriptor(FilteredScanTest.class, "filterScan",
190 "Run scan test using a filter to find a specific row based on it's value " +
191 "(make sure to use --rows=20)");
192 addCommandDescriptor(IncrementTest.class, "increment",
193 "Increment on each row; clients overlap on keyspace so some concurrent operations");
194 addCommandDescriptor(AppendTest.class, "append",
195 "Append on each row; clients overlap on keyspace so some concurrent operations");
196 addCommandDescriptor(CheckAndMutateTest.class, "checkAndMutate",
197 "CheckAndMutate on each row; clients overlap on keyspace so some concurrent operations");
198 addCommandDescriptor(CheckAndPutTest.class, "checkAndPut",
199 "CheckAndPut on each row; clients overlap on keyspace so some concurrent operations");
200 addCommandDescriptor(CheckAndDeleteTest.class, "checkAndDelete",
201 "CheckAndDelete on each row; clients overlap on keyspace so some concurrent operations");
205 * Enum for map metrics. Keep it out here rather than inside in the Map
206 * inner-class so we can find associated properties.
208 protected static enum Counter {
209 /** elapsed time */
210 ELAPSED_TIME,
211 /** number of rows */
212 ROWS
215 protected static class RunResult implements Comparable<RunResult> {
216 public RunResult(long duration, Histogram hist) {
217 this.duration = duration;
218 this.hist = hist;
221 public final long duration;
222 public final Histogram hist;
224 @Override
225 public String toString() {
226 return Long.toString(duration);
229 @Override public int compareTo(RunResult o) {
230 return Long.compare(this.duration, o.duration);
235 * Constructor
236 * @param conf Configuration object
238 public PerformanceEvaluation(final Configuration conf) {
239 super(conf);
242 protected static void addCommandDescriptor(Class<? extends TestBase> cmdClass,
243 String name, String description) {
244 CmdDescriptor cmdDescriptor = new CmdDescriptor(cmdClass, name, description);
245 COMMANDS.put(name, cmdDescriptor);
249 * Implementations can have their status set.
251 interface Status {
253 * Sets status
254 * @param msg status message
255 * @throws IOException
257 void setStatus(final String msg) throws IOException;
261 * MapReduce job that runs a performance evaluation client in each map task.
263 public static class EvaluationMapTask
264 extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
266 /** configuration parameter name that contains the command */
267 public final static String CMD_KEY = "EvaluationMapTask.command";
268 /** configuration parameter name that contains the PE impl */
269 public static final String PE_KEY = "EvaluationMapTask.performanceEvalImpl";
271 private Class<? extends Test> cmd;
273 @Override
274 protected void setup(Context context) throws IOException, InterruptedException {
275 this.cmd = forName(context.getConfiguration().get(CMD_KEY), Test.class);
277 // this is required so that extensions of PE are instantiated within the
278 // map reduce task...
279 Class<? extends PerformanceEvaluation> peClass =
280 forName(context.getConfiguration().get(PE_KEY), PerformanceEvaluation.class);
281 try {
282 peClass.getConstructor(Configuration.class).newInstance(context.getConfiguration());
283 } catch (Exception e) {
284 throw new IllegalStateException("Could not instantiate PE instance", e);
288 private <Type> Class<? extends Type> forName(String className, Class<Type> type) {
289 try {
290 return Class.forName(className).asSubclass(type);
291 } catch (ClassNotFoundException e) {
292 throw new IllegalStateException("Could not find class for name: " + className, e);
296 @Override
297 protected void map(LongWritable key, Text value, final Context context)
298 throws IOException, InterruptedException {
300 Status status = new Status() {
301 @Override
302 public void setStatus(String msg) {
303 context.setStatus(msg);
307 TestOptions opts = GSON.fromJson(value.toString(), TestOptions.class);
308 Configuration conf = HBaseConfiguration.create(context.getConfiguration());
309 final Connection con = ConnectionFactory.createConnection(conf);
310 AsyncConnection asyncCon = null;
311 try {
312 asyncCon = ConnectionFactory.createAsyncConnection(conf).get();
313 } catch (ExecutionException e) {
314 throw new IOException(e);
317 // Evaluation task
318 RunResult result = PerformanceEvaluation.runOneClient(this.cmd, conf, con, asyncCon, opts, status);
319 // Collect how much time the thing took. Report as map output and
320 // to the ELAPSED_TIME counter.
321 context.getCounter(Counter.ELAPSED_TIME).increment(result.duration);
322 context.getCounter(Counter.ROWS).increment(opts.perClientRunRows);
323 context.write(new LongWritable(opts.startRow), new LongWritable(result.duration));
324 context.progress();
329 * If table does not already exist, create. Also create a table when
330 * {@code opts.presplitRegions} is specified or when the existing table's
331 * region replica count doesn't match {@code opts.replicas}.
333 static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
334 TableName tableName = TableName.valueOf(opts.tableName);
335 boolean needsDelete = false, exists = admin.tableExists(tableName);
336 boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read")
337 || opts.cmdName.toLowerCase(Locale.ROOT).contains("scan");
338 if (!exists && isReadCmd) {
339 throw new IllegalStateException(
340 "Must specify an existing table for read commands. Run a write command first.");
342 HTableDescriptor desc =
343 exists ? new HTableDescriptor(admin.getDescriptor(TableName.valueOf(opts.tableName))) : null;
344 byte[][] splits = getSplits(opts);
346 // recreate the table when user has requested presplit or when existing
347 // {RegionSplitPolicy,replica count} does not match requested, or when the
348 // number of column families does not match requested.
349 if ((exists && opts.presplitRegions != DEFAULT_OPTS.presplitRegions)
350 || (!isReadCmd && desc != null &&
351 !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy))
352 || (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)
353 || (desc != null && desc.getColumnFamilyCount() != opts.families)) {
354 needsDelete = true;
355 // wait, why did it delete my table?!?
356 LOG.debug(MoreObjects.toStringHelper("needsDelete")
357 .add("needsDelete", needsDelete)
358 .add("isReadCmd", isReadCmd)
359 .add("exists", exists)
360 .add("desc", desc)
361 .add("presplit", opts.presplitRegions)
362 .add("splitPolicy", opts.splitPolicy)
363 .add("replicas", opts.replicas)
364 .add("families", opts.families)
365 .toString());
368 // remove an existing table
369 if (needsDelete) {
370 if (admin.isTableEnabled(tableName)) {
371 admin.disableTable(tableName);
373 admin.deleteTable(tableName);
376 // table creation is necessary
377 if (!exists || needsDelete) {
378 desc = getTableDescriptor(opts);
379 if (splits != null) {
380 if (LOG.isDebugEnabled()) {
381 for (int i = 0; i < splits.length; i++) {
382 LOG.debug(" split " + i + ": " + Bytes.toStringBinary(splits[i]));
386 admin.createTable(desc, splits);
387 LOG.info("Table " + desc + " created");
389 return admin.tableExists(tableName);
393 * Create an HTableDescriptor from provided TestOptions.
395 protected static HTableDescriptor getTableDescriptor(TestOptions opts) {
396 HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(opts.tableName));
397 for (int family = 0; family < opts.families; family++) {
398 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
399 HColumnDescriptor familyDesc = new HColumnDescriptor(familyName);
400 familyDesc.setDataBlockEncoding(opts.blockEncoding);
401 familyDesc.setCompressionType(opts.compression);
402 familyDesc.setBloomFilterType(opts.bloomType);
403 familyDesc.setBlocksize(opts.blockSize);
404 if (opts.inMemoryCF) {
405 familyDesc.setInMemory(true);
407 familyDesc.setInMemoryCompaction(opts.inMemoryCompaction);
408 tableDesc.addFamily(familyDesc);
410 if (opts.replicas != DEFAULT_OPTS.replicas) {
411 tableDesc.setRegionReplication(opts.replicas);
413 if (opts.splitPolicy != null && !opts.splitPolicy.equals(DEFAULT_OPTS.splitPolicy)) {
414 tableDesc.setRegionSplitPolicyClassName(opts.splitPolicy);
416 return tableDesc;
420 * generates splits based on total number of rows and specified split regions
422 protected static byte[][] getSplits(TestOptions opts) {
423 if (opts.presplitRegions == DEFAULT_OPTS.presplitRegions)
424 return null;
426 int numSplitPoints = opts.presplitRegions - 1;
427 byte[][] splits = new byte[numSplitPoints][];
428 int jump = opts.totalRows / opts.presplitRegions;
429 for (int i = 0; i < numSplitPoints; i++) {
430 int rowkey = jump * (1 + i);
431 splits[i] = format(rowkey);
433 return splits;
436 static void setupConnectionCount(final TestOptions opts) {
437 if (opts.oneCon) {
438 opts.connCount = 1;
439 } else {
440 if (opts.connCount == -1) {
441 // set to thread number if connCount is not set
442 opts.connCount = opts.numClientThreads;
448 * Run all clients in this vm each to its own thread.
450 static RunResult[] doLocalClients(final TestOptions opts, final Configuration conf)
451 throws IOException, InterruptedException, ExecutionException {
452 final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
453 assert cmd != null;
454 @SuppressWarnings("unchecked")
455 Future<RunResult>[] threads = new Future[opts.numClientThreads];
456 RunResult[] results = new RunResult[opts.numClientThreads];
457 ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads,
458 new ThreadFactoryBuilder().setNameFormat("TestClient-%s").build());
459 setupConnectionCount(opts);
460 final Connection[] cons = new Connection[opts.connCount];
461 final AsyncConnection[] asyncCons = new AsyncConnection[opts.connCount];
462 for (int i = 0; i < opts.connCount; i++) {
463 cons[i] = ConnectionFactory.createConnection(conf);
464 asyncCons[i] = ConnectionFactory.createAsyncConnection(conf).get();
466 LOG.info("Created " + opts.connCount + " connections for " +
467 opts.numClientThreads + " threads");
468 for (int i = 0; i < threads.length; i++) {
469 final int index = i;
470 threads[i] = pool.submit(new Callable<RunResult>() {
471 @Override
472 public RunResult call() throws Exception {
473 TestOptions threadOpts = new TestOptions(opts);
474 final Connection con = cons[index % cons.length];
475 final AsyncConnection asyncCon = asyncCons[index % asyncCons.length];
476 if (threadOpts.startRow == 0) threadOpts.startRow = index * threadOpts.perClientRunRows;
477 RunResult run = runOneClient(cmd, conf, con, asyncCon, threadOpts, new Status() {
478 @Override
479 public void setStatus(final String msg) throws IOException {
480 LOG.info(msg);
483 LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration +
484 "ms over " + threadOpts.perClientRunRows + " rows");
485 return run;
489 pool.shutdown();
491 for (int i = 0; i < threads.length; i++) {
492 try {
493 results[i] = threads[i].get();
494 } catch (ExecutionException e) {
495 throw new IOException(e.getCause());
498 final String test = cmd.getSimpleName();
499 LOG.info("[" + test + "] Summary of timings (ms): "
500 + Arrays.toString(results));
501 Arrays.sort(results);
502 long total = 0;
503 float avgLatency = 0 ;
504 float avgTPS = 0;
505 for (RunResult result : results) {
506 total += result.duration;
507 avgLatency += result.hist.getSnapshot().getMean();
508 avgTPS += opts.perClientRunRows * 1.0f / result.duration;
510 avgTPS *= 1000; // ms to second
511 avgLatency = avgLatency / results.length;
512 LOG.info("[" + test + " duration ]"
513 + "\tMin: " + results[0] + "ms"
514 + "\tMax: " + results[results.length - 1] + "ms"
515 + "\tAvg: " + (total / results.length) + "ms");
516 LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
517 LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second");
518 for (int i = 0; i < opts.connCount; i++) {
519 cons[i].close();
520 asyncCons[i].close();
524 return results;
528 * Run a mapreduce job. Run as many maps as asked-for clients.
529 * Before we start up the job, write out an input file with instruction
530 * per client regards which row they are to start on.
531 * @param cmd Command to run.
532 * @throws IOException
534 static Job doMapReduce(TestOptions opts, final Configuration conf)
535 throws IOException, InterruptedException, ClassNotFoundException {
536 final Class<? extends TestBase> cmd = determineCommandClass(opts.cmdName);
537 assert cmd != null;
538 Path inputDir = writeInputFile(conf, opts);
539 conf.set(EvaluationMapTask.CMD_KEY, cmd.getName());
540 conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName());
541 Job job = Job.getInstance(conf);
542 job.setJarByClass(PerformanceEvaluation.class);
543 job.setJobName("HBase Performance Evaluation - " + opts.cmdName);
545 job.setInputFormatClass(NLineInputFormat.class);
546 NLineInputFormat.setInputPaths(job, inputDir);
547 // this is default, but be explicit about it just in case.
548 NLineInputFormat.setNumLinesPerSplit(job, 1);
550 job.setOutputKeyClass(LongWritable.class);
551 job.setOutputValueClass(LongWritable.class);
553 job.setMapperClass(EvaluationMapTask.class);
554 job.setReducerClass(LongSumReducer.class);
556 job.setNumReduceTasks(1);
558 job.setOutputFormatClass(TextOutputFormat.class);
559 TextOutputFormat.setOutputPath(job, new Path(inputDir.getParent(), "outputs"));
561 TableMapReduceUtil.addDependencyJars(job);
562 TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
563 Histogram.class, // yammer metrics
564 Gson.class, // gson
565 FilterAllFilter.class // hbase-server tests jar
568 TableMapReduceUtil.initCredentials(job);
570 job.waitForCompletion(true);
571 return job;
575 * Each client has one mapper to do the work, and client do the resulting count in a map task.
578 static String JOB_INPUT_FILENAME = "input.txt";
581 * Write input file of offsets-per-client for the mapreduce job.
582 * @param c Configuration
583 * @return Directory that contains file written whose name is JOB_INPUT_FILENAME
584 * @throws IOException
586 static Path writeInputFile(final Configuration c, final TestOptions opts) throws IOException {
587 return writeInputFile(c, opts, new Path("."));
590 static Path writeInputFile(final Configuration c, final TestOptions opts, final Path basedir)
591 throws IOException {
592 SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmss");
593 Path jobdir = new Path(new Path(basedir, PERF_EVAL_DIR), formatter.format(new Date()));
594 Path inputDir = new Path(jobdir, "inputs");
596 FileSystem fs = FileSystem.get(c);
597 fs.mkdirs(inputDir);
599 Path inputFile = new Path(inputDir, JOB_INPUT_FILENAME);
600 PrintStream out = new PrintStream(fs.create(inputFile));
601 // Make input random.
602 Map<Integer, String> m = new TreeMap<>();
603 Hash h = MurmurHash.getInstance();
604 int perClientRows = (opts.totalRows / opts.numClientThreads);
605 try {
606 for (int j = 0; j < opts.numClientThreads; j++) {
607 TestOptions next = new TestOptions(opts);
608 next.startRow = j * perClientRows;
609 next.perClientRunRows = perClientRows;
610 String s = GSON.toJson(next);
611 LOG.info("Client=" + j + ", input=" + s);
612 byte[] b = Bytes.toBytes(s);
613 int hash = h.hash(new ByteArrayHashKey(b, 0, b.length), -1);
614 m.put(hash, s);
616 for (Map.Entry<Integer, String> e: m.entrySet()) {
617 out.println(e.getValue());
619 } finally {
620 out.close();
622 return inputDir;
626 * Describes a command.
628 static class CmdDescriptor {
629 private Class<? extends TestBase> cmdClass;
630 private String name;
631 private String description;
633 CmdDescriptor(Class<? extends TestBase> cmdClass, String name, String description) {
634 this.cmdClass = cmdClass;
635 this.name = name;
636 this.description = description;
639 public Class<? extends TestBase> getCmdClass() {
640 return cmdClass;
643 public String getName() {
644 return name;
647 public String getDescription() {
648 return description;
653 * Wraps up options passed to {@link org.apache.hadoop.hbase.PerformanceEvaluation}.
654 * This makes tracking all these arguments a little easier.
655 * NOTE: ADDING AN OPTION, you need to add a data member, a getter/setter (to make JSON
656 * serialization of this TestOptions class behave), and you need to add to the clone constructor
657 * below copying your new option from the 'that' to the 'this'. Look for 'clone' below.
659 static class TestOptions {
660 String cmdName = null;
661 boolean nomapred = false;
662 boolean filterAll = false;
663 int startRow = 0;
664 float size = 1.0f;
665 int perClientRunRows = DEFAULT_ROWS_PER_GB;
666 int numClientThreads = 1;
667 int totalRows = DEFAULT_ROWS_PER_GB;
668 int measureAfter = 0;
669 float sampleRate = 1.0f;
670 double traceRate = 0.0;
671 String tableName = TABLE_NAME;
672 boolean flushCommits = true;
673 boolean writeToWAL = true;
674 boolean autoFlush = false;
675 boolean oneCon = false;
676 int connCount = -1; //wil decide the actual num later
677 boolean useTags = false;
678 int noOfTags = 1;
679 boolean reportLatency = false;
680 int multiGet = 0;
681 int multiPut = 0;
682 int randomSleep = 0;
683 boolean inMemoryCF = false;
684 int presplitRegions = 0;
685 int replicas = HTableDescriptor.DEFAULT_REGION_REPLICATION;
686 String splitPolicy = null;
687 Compression.Algorithm compression = Compression.Algorithm.NONE;
688 BloomType bloomType = BloomType.ROW;
689 int blockSize = HConstants.DEFAULT_BLOCKSIZE;
690 DataBlockEncoding blockEncoding = DataBlockEncoding.NONE;
691 boolean valueRandom = false;
692 boolean valueZipf = false;
693 int valueSize = DEFAULT_VALUE_LENGTH;
694 int period = (this.perClientRunRows / 10) == 0? perClientRunRows: perClientRunRows / 10;
695 int cycles = 1;
696 int columns = 1;
697 int families = 1;
698 int caching = 30;
699 boolean addColumns = true;
700 MemoryCompactionPolicy inMemoryCompaction =
701 MemoryCompactionPolicy.valueOf(
702 CompactingMemStore.COMPACTING_MEMSTORE_TYPE_DEFAULT);
703 boolean asyncPrefetch = false;
704 boolean cacheBlocks = true;
705 Scan.ReadType scanReadType = Scan.ReadType.DEFAULT;
706 long bufferSize = 2l * 1024l * 1024l;
708 public TestOptions() {}
711 * Clone constructor.
712 * @param that Object to copy from.
714 public TestOptions(TestOptions that) {
715 this.cmdName = that.cmdName;
716 this.cycles = that.cycles;
717 this.nomapred = that.nomapred;
718 this.startRow = that.startRow;
719 this.size = that.size;
720 this.perClientRunRows = that.perClientRunRows;
721 this.numClientThreads = that.numClientThreads;
722 this.totalRows = that.totalRows;
723 this.sampleRate = that.sampleRate;
724 this.traceRate = that.traceRate;
725 this.tableName = that.tableName;
726 this.flushCommits = that.flushCommits;
727 this.writeToWAL = that.writeToWAL;
728 this.autoFlush = that.autoFlush;
729 this.oneCon = that.oneCon;
730 this.connCount = that.connCount;
731 this.useTags = that.useTags;
732 this.noOfTags = that.noOfTags;
733 this.reportLatency = that.reportLatency;
734 this.multiGet = that.multiGet;
735 this.multiPut = that.multiPut;
736 this.inMemoryCF = that.inMemoryCF;
737 this.presplitRegions = that.presplitRegions;
738 this.replicas = that.replicas;
739 this.splitPolicy = that.splitPolicy;
740 this.compression = that.compression;
741 this.blockEncoding = that.blockEncoding;
742 this.filterAll = that.filterAll;
743 this.bloomType = that.bloomType;
744 this.blockSize = that.blockSize;
745 this.valueRandom = that.valueRandom;
746 this.valueZipf = that.valueZipf;
747 this.valueSize = that.valueSize;
748 this.period = that.period;
749 this.randomSleep = that.randomSleep;
750 this.measureAfter = that.measureAfter;
751 this.addColumns = that.addColumns;
752 this.columns = that.columns;
753 this.families = that.families;
754 this.caching = that.caching;
755 this.inMemoryCompaction = that.inMemoryCompaction;
756 this.asyncPrefetch = that.asyncPrefetch;
757 this.cacheBlocks = that.cacheBlocks;
758 this.scanReadType = that.scanReadType;
759 this.bufferSize = that.bufferSize;
762 public int getCaching() {
763 return this.caching;
766 public void setCaching(final int caching) {
767 this.caching = caching;
770 public int getColumns() {
771 return this.columns;
774 public void setColumns(final int columns) {
775 this.columns = columns;
778 public int getFamilies() {
779 return this.families;
782 public void setFamilies(final int families) {
783 this.families = families;
786 public int getCycles() {
787 return this.cycles;
790 public void setCycles(final int cycles) {
791 this.cycles = cycles;
794 public boolean isValueZipf() {
795 return valueZipf;
798 public void setValueZipf(boolean valueZipf) {
799 this.valueZipf = valueZipf;
802 public String getCmdName() {
803 return cmdName;
806 public void setCmdName(String cmdName) {
807 this.cmdName = cmdName;
810 public int getRandomSleep() {
811 return randomSleep;
814 public void setRandomSleep(int randomSleep) {
815 this.randomSleep = randomSleep;
818 public int getReplicas() {
819 return replicas;
822 public void setReplicas(int replicas) {
823 this.replicas = replicas;
826 public String getSplitPolicy() {
827 return splitPolicy;
830 public void setSplitPolicy(String splitPolicy) {
831 this.splitPolicy = splitPolicy;
834 public void setNomapred(boolean nomapred) {
835 this.nomapred = nomapred;
838 public void setFilterAll(boolean filterAll) {
839 this.filterAll = filterAll;
842 public void setStartRow(int startRow) {
843 this.startRow = startRow;
846 public void setSize(float size) {
847 this.size = size;
850 public void setPerClientRunRows(int perClientRunRows) {
851 this.perClientRunRows = perClientRunRows;
854 public void setNumClientThreads(int numClientThreads) {
855 this.numClientThreads = numClientThreads;
858 public void setTotalRows(int totalRows) {
859 this.totalRows = totalRows;
862 public void setSampleRate(float sampleRate) {
863 this.sampleRate = sampleRate;
866 public void setTraceRate(double traceRate) {
867 this.traceRate = traceRate;
870 public void setTableName(String tableName) {
871 this.tableName = tableName;
874 public void setFlushCommits(boolean flushCommits) {
875 this.flushCommits = flushCommits;
878 public void setWriteToWAL(boolean writeToWAL) {
879 this.writeToWAL = writeToWAL;
882 public void setAutoFlush(boolean autoFlush) {
883 this.autoFlush = autoFlush;
886 public void setOneCon(boolean oneCon) {
887 this.oneCon = oneCon;
890 public int getConnCount() {
891 return connCount;
894 public void setConnCount(int connCount) {
895 this.connCount = connCount;
898 public void setUseTags(boolean useTags) {
899 this.useTags = useTags;
902 public void setNoOfTags(int noOfTags) {
903 this.noOfTags = noOfTags;
906 public void setReportLatency(boolean reportLatency) {
907 this.reportLatency = reportLatency;
910 public void setMultiGet(int multiGet) {
911 this.multiGet = multiGet;
914 public void setMultiPut(int multiPut) {
915 this.multiPut = multiPut;
918 public void setInMemoryCF(boolean inMemoryCF) {
919 this.inMemoryCF = inMemoryCF;
922 public void setPresplitRegions(int presplitRegions) {
923 this.presplitRegions = presplitRegions;
926 public void setCompression(Compression.Algorithm compression) {
927 this.compression = compression;
930 public void setBloomType(BloomType bloomType) {
931 this.bloomType = bloomType;
934 public void setBlockSize(int blockSize) {
935 this.blockSize = blockSize;
938 public void setBlockEncoding(DataBlockEncoding blockEncoding) {
939 this.blockEncoding = blockEncoding;
942 public void setValueRandom(boolean valueRandom) {
943 this.valueRandom = valueRandom;
946 public void setValueSize(int valueSize) {
947 this.valueSize = valueSize;
950 public void setBufferSize(long bufferSize) {
951 this.bufferSize = bufferSize;
954 public void setPeriod(int period) {
955 this.period = period;
958 public boolean isNomapred() {
959 return nomapred;
962 public boolean isFilterAll() {
963 return filterAll;
966 public int getStartRow() {
967 return startRow;
970 public float getSize() {
971 return size;
974 public int getPerClientRunRows() {
975 return perClientRunRows;
978 public int getNumClientThreads() {
979 return numClientThreads;
982 public int getTotalRows() {
983 return totalRows;
986 public float getSampleRate() {
987 return sampleRate;
990 public double getTraceRate() {
991 return traceRate;
994 public String getTableName() {
995 return tableName;
998 public boolean isFlushCommits() {
999 return flushCommits;
1002 public boolean isWriteToWAL() {
1003 return writeToWAL;
1006 public boolean isAutoFlush() {
1007 return autoFlush;
1010 public boolean isUseTags() {
1011 return useTags;
1014 public int getNoOfTags() {
1015 return noOfTags;
1018 public boolean isReportLatency() {
1019 return reportLatency;
1022 public int getMultiGet() {
1023 return multiGet;
1026 public int getMultiPut() {
1027 return multiPut;
1030 public boolean isInMemoryCF() {
1031 return inMemoryCF;
1034 public int getPresplitRegions() {
1035 return presplitRegions;
1038 public Compression.Algorithm getCompression() {
1039 return compression;
1042 public DataBlockEncoding getBlockEncoding() {
1043 return blockEncoding;
1046 public boolean isValueRandom() {
1047 return valueRandom;
1050 public int getValueSize() {
1051 return valueSize;
1054 public int getPeriod() {
1055 return period;
1058 public BloomType getBloomType() {
1059 return bloomType;
1062 public int getBlockSize() {
1063 return blockSize;
1066 public boolean isOneCon() {
1067 return oneCon;
1070 public int getMeasureAfter() {
1071 return measureAfter;
1074 public void setMeasureAfter(int measureAfter) {
1075 this.measureAfter = measureAfter;
1078 public boolean getAddColumns() {
1079 return addColumns;
1082 public void setAddColumns(boolean addColumns) {
1083 this.addColumns = addColumns;
1086 public void setInMemoryCompaction(MemoryCompactionPolicy inMemoryCompaction) {
1087 this.inMemoryCompaction = inMemoryCompaction;
1090 public MemoryCompactionPolicy getInMemoryCompaction() {
1091 return this.inMemoryCompaction;
1094 public long getBufferSize() {
1095 return this.bufferSize;
1100 * A test.
1101 * Subclass to particularize what happens per row.
1103 static abstract class TestBase {
1104 // Below is make it so when Tests are all running in the one
1105 // jvm, that they each have a differently seeded Random.
1106 private static final Random randomSeed = new Random(System.currentTimeMillis());
1108 private static long nextRandomSeed() {
1109 return randomSeed.nextLong();
1111 private final int everyN;
1113 protected final Random rand = new Random(nextRandomSeed());
1114 protected final Configuration conf;
1115 protected final TestOptions opts;
1117 private final Status status;
1118 private final Sampler traceSampler;
1119 private final SpanReceiverHost receiverHost;
1121 private String testName;
1122 private Histogram latencyHistogram;
1123 private Histogram valueSizeHistogram;
1124 private Histogram rpcCallsHistogram;
1125 private Histogram remoteRpcCallsHistogram;
1126 private Histogram millisBetweenNextHistogram;
1127 private Histogram regionsScannedHistogram;
1128 private Histogram bytesInResultsHistogram;
1129 private Histogram bytesInRemoteResultsHistogram;
1130 private RandomDistribution.Zipf zipf;
1133 * Note that all subclasses of this class must provide a public constructor
1134 * that has the exact same list of arguments.
1136 TestBase(final Configuration conf, final TestOptions options, final Status status) {
1137 this.conf = conf;
1138 this.receiverHost = this.conf == null? null: SpanReceiverHost.getInstance(conf);
1139 this.opts = options;
1140 this.status = status;
1141 this.testName = this.getClass().getSimpleName();
1142 if (options.traceRate >= 1.0) {
1143 this.traceSampler = Sampler.ALWAYS;
1144 } else if (options.traceRate > 0.0) {
1145 conf.setDouble("hbase.sampler.fraction", options.traceRate);
1146 this.traceSampler = new ProbabilitySampler(new HBaseHTraceConfiguration(conf));
1147 } else {
1148 this.traceSampler = Sampler.NEVER;
1150 everyN = (int) (opts.totalRows / (opts.totalRows * opts.sampleRate));
1151 if (options.isValueZipf()) {
1152 this.zipf = new RandomDistribution.Zipf(this.rand, 1, options.getValueSize(), 1.2);
1154 LOG.info("Sampling 1 every " + everyN + " out of " + opts.perClientRunRows + " total rows.");
1157 int getValueLength(final Random r) {
1158 if (this.opts.isValueRandom()) {
1159 return r.nextInt(opts.valueSize);
1160 } else if (this.opts.isValueZipf()) {
1161 return Math.abs(this.zipf.nextInt());
1162 } else {
1163 return opts.valueSize;
1167 void updateValueSize(final Result [] rs) throws IOException {
1168 if (rs == null || !isRandomValueSize()) return;
1169 for (Result r: rs) updateValueSize(r);
1172 void updateValueSize(final Result r) throws IOException {
1173 if (r == null || !isRandomValueSize()) return;
1174 int size = 0;
1175 for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
1176 size += scanner.current().getValueLength();
1178 updateValueSize(size);
1181 void updateValueSize(final int valueSize) {
1182 if (!isRandomValueSize()) return;
1183 this.valueSizeHistogram.update(valueSize);
1186 void updateScanMetrics(final ScanMetrics metrics) {
1187 if (metrics == null) return;
1188 Map<String,Long> metricsMap = metrics.getMetricsMap();
1189 Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
1190 if (rpcCalls != null) {
1191 this.rpcCallsHistogram.update(rpcCalls.longValue());
1193 Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
1194 if (remoteRpcCalls != null) {
1195 this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
1197 Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
1198 if (millisBetweenNext != null) {
1199 this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
1201 Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
1202 if (regionsScanned != null) {
1203 this.regionsScannedHistogram.update(regionsScanned.longValue());
1205 Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
1206 if (bytesInResults != null && bytesInResults.longValue() > 0) {
1207 this.bytesInResultsHistogram.update(bytesInResults.longValue());
1209 Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
1210 if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
1211 this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
1215 String generateStatus(final int sr, final int i, final int lr) {
1216 return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() +
1217 (!isRandomValueSize()? "": ", value size " + getShortValueSizeReport());
1220 boolean isRandomValueSize() {
1221 return opts.valueRandom;
1224 protected int getReportingPeriod() {
1225 return opts.period;
1229 * Populated by testTakedown. Only implemented by RandomReadTest at the moment.
1231 public Histogram getLatencyHistogram() {
1232 return latencyHistogram;
1235 void testSetup() throws IOException {
1236 // test metrics
1237 latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1238 valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1239 // scan metrics
1240 rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1241 remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1242 millisBetweenNextHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1243 regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1244 bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1245 bytesInRemoteResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
1247 onStartup();
1250 abstract void onStartup() throws IOException;
1252 void testTakedown() throws IOException {
1253 onTakedown();
1254 // Print all stats for this thread continuously.
1255 // Synchronize on Test.class so different threads don't intermingle the
1256 // output. We can't use 'this' here because each thread has its own instance of Test class.
1257 synchronized (Test.class) {
1258 status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName());
1259 status.setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(
1260 latencyHistogram));
1261 status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
1262 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
1263 if (valueSizeHistogram.getCount() > 0) {
1264 status.setStatus("ValueSize (bytes) : "
1265 + YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
1266 status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
1267 status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
1268 } else {
1269 status.setStatus("No valueSize statistics available");
1271 if (rpcCallsHistogram.getCount() > 0) {
1272 status.setStatus("rpcCalls (count): " +
1273 YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
1275 if (remoteRpcCallsHistogram.getCount() > 0) {
1276 status.setStatus("remoteRpcCalls (count): " +
1277 YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram));
1279 if (millisBetweenNextHistogram.getCount() > 0) {
1280 status.setStatus("millisBetweenNext (latency): " +
1281 YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram));
1283 if (regionsScannedHistogram.getCount() > 0) {
1284 status.setStatus("regionsScanned (count): " +
1285 YammerHistogramUtils.getHistogramReport(regionsScannedHistogram));
1287 if (bytesInResultsHistogram.getCount() > 0) {
1288 status.setStatus("bytesInResults (size): " +
1289 YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram));
1291 if (bytesInRemoteResultsHistogram.getCount() > 0) {
1292 status.setStatus("bytesInRemoteResults (size): " +
1293 YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
1296 receiverHost.closeReceivers();
1299 abstract void onTakedown() throws IOException;
1303 * Run test
1304 * @return Elapsed time.
1305 * @throws IOException
1307 long test() throws IOException, InterruptedException {
1308 testSetup();
1309 LOG.info("Timed test starting in thread " + Thread.currentThread().getName());
1310 final long startTime = System.nanoTime();
1311 try {
1312 testTimed();
1313 } finally {
1314 testTakedown();
1316 return (System.nanoTime() - startTime) / 1000000;
1319 int getStartRow() {
1320 return opts.startRow;
1323 int getLastRow() {
1324 return getStartRow() + opts.perClientRunRows;
1328 * Provides an extension point for tests that don't want a per row invocation.
1330 void testTimed() throws IOException, InterruptedException {
1331 int startRow = getStartRow();
1332 int lastRow = getLastRow();
1333 TraceUtil.addSampler(traceSampler);
1334 // Report on completion of 1/10th of total.
1335 for (int ii = 0; ii < opts.cycles; ii++) {
1336 if (opts.cycles > 1) LOG.info("Cycle=" + ii + " of " + opts.cycles);
1337 for (int i = startRow; i < lastRow; i++) {
1338 if (i % everyN != 0) continue;
1339 long startTime = System.nanoTime();
1340 boolean requestSent = false;
1341 try (TraceScope scope = TraceUtil.createTrace("test row");){
1342 requestSent = testRow(i);
1344 if ( (i - startRow) > opts.measureAfter) {
1345 // If multiget or multiput is enabled, say set to 10, testRow() returns immediately
1346 // first 9 times and sends the actual get request in the 10th iteration.
1347 // We should only set latency when actual request is sent because otherwise
1348 // it turns out to be 0.
1349 if (requestSent) {
1350 latencyHistogram.update((System.nanoTime() - startTime) / 1000);
1352 if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
1353 status.setStatus(generateStatus(startRow, i, lastRow));
1361 * @return Subset of the histograms' calculation.
1363 public String getShortLatencyReport() {
1364 return YammerHistogramUtils.getShortHistogramReport(this.latencyHistogram);
1368 * @return Subset of the histograms' calculation.
1370 public String getShortValueSizeReport() {
1371 return YammerHistogramUtils.getShortHistogramReport(this.valueSizeHistogram);
1376 * Test for individual row.
1377 * @param i Row index.
1378 * @return true if the row was sent to server and need to record metrics.
1379 * False if not, multiGet and multiPut e.g., the rows are sent
1380 * to server only if enough gets/puts are gathered.
1382 abstract boolean testRow(final int i) throws IOException, InterruptedException;
1385 static abstract class Test extends TestBase {
1386 protected Connection connection;
1388 Test(final Connection con, final TestOptions options, final Status status) {
1389 super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1390 this.connection = con;
1394 static abstract class AsyncTest extends TestBase {
1395 protected AsyncConnection connection;
1397 AsyncTest(final AsyncConnection con, final TestOptions options, final Status status) {
1398 super(con == null ? HBaseConfiguration.create() : con.getConfiguration(), options, status);
1399 this.connection = con;
1403 static abstract class TableTest extends Test {
1404 protected Table table;
1406 TableTest(Connection con, TestOptions options, Status status) {
1407 super(con, options, status);
1410 @Override
1411 void onStartup() throws IOException {
1412 this.table = connection.getTable(TableName.valueOf(opts.tableName));
1415 @Override
1416 void onTakedown() throws IOException {
1417 table.close();
1421 static abstract class AsyncTableTest extends AsyncTest {
1422 protected AsyncTable<?> table;
1424 AsyncTableTest(AsyncConnection con, TestOptions options, Status status) {
1425 super(con, options, status);
1428 @Override
1429 void onStartup() throws IOException {
1430 this.table = connection.getTable(TableName.valueOf(opts.tableName));
1433 @Override
1434 void onTakedown() throws IOException {
1438 static class AsyncRandomReadTest extends AsyncTableTest {
1439 private final Consistency consistency;
1440 private ArrayList<Get> gets;
1441 private Random rd = new Random();
1443 AsyncRandomReadTest(AsyncConnection con, TestOptions options, Status status) {
1444 super(con, options, status);
1445 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1446 if (opts.multiGet > 0) {
1447 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1448 this.gets = new ArrayList<>(opts.multiGet);
1452 @Override
1453 boolean testRow(final int i) throws IOException, InterruptedException {
1454 if (opts.randomSleep > 0) {
1455 Thread.sleep(rd.nextInt(opts.randomSleep));
1457 Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1458 for (int family = 0; family < opts.families; family++) {
1459 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1460 if (opts.addColumns) {
1461 for (int column = 0; column < opts.columns; column++) {
1462 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1463 get.addColumn(familyName, qualifier);
1465 } else {
1466 get.addFamily(familyName);
1469 if (opts.filterAll) {
1470 get.setFilter(new FilterAllFilter());
1472 get.setConsistency(consistency);
1473 if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1474 try {
1475 if (opts.multiGet > 0) {
1476 this.gets.add(get);
1477 if (this.gets.size() == opts.multiGet) {
1478 Result[] rs =
1479 this.table.get(this.gets).stream().map(f -> propagate(f::get)).toArray(Result[]::new);
1480 updateValueSize(rs);
1481 this.gets.clear();
1482 } else {
1483 return false;
1485 } else {
1486 updateValueSize(this.table.get(get).get());
1488 } catch (ExecutionException e) {
1489 throw new IOException(e);
1491 return true;
1494 public static RuntimeException runtime(Throwable e) {
1495 if (e instanceof RuntimeException) {
1496 return (RuntimeException) e;
1498 return new RuntimeException(e);
1501 public static <V> V propagate(Callable<V> callable) {
1502 try {
1503 return callable.call();
1504 } catch (Exception e) {
1505 throw runtime(e);
1509 @Override
1510 protected int getReportingPeriod() {
1511 int period = opts.perClientRunRows / 10;
1512 return period == 0 ? opts.perClientRunRows : period;
1515 @Override
1516 protected void testTakedown() throws IOException {
1517 if (this.gets != null && this.gets.size() > 0) {
1518 this.table.get(gets);
1519 this.gets.clear();
1521 super.testTakedown();
1525 static class AsyncRandomWriteTest extends AsyncSequentialWriteTest {
1527 AsyncRandomWriteTest(AsyncConnection con, TestOptions options, Status status) {
1528 super(con, options, status);
1531 @Override
1532 protected byte[] generateRow(final int i) {
1533 return getRandomRow(this.rand, opts.totalRows);
1537 static class AsyncScanTest extends AsyncTableTest {
1538 private ResultScanner testScanner;
1539 private AsyncTable<?> asyncTable;
1541 AsyncScanTest(AsyncConnection con, TestOptions options, Status status) {
1542 super(con, options, status);
1545 @Override
1546 void onStartup() throws IOException {
1547 this.asyncTable =
1548 connection.getTable(TableName.valueOf(opts.tableName),
1549 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
1552 @Override
1553 void testTakedown() throws IOException {
1554 if (this.testScanner != null) {
1555 updateScanMetrics(this.testScanner.getScanMetrics());
1556 this.testScanner.close();
1558 super.testTakedown();
1561 @Override
1562 boolean testRow(final int i) throws IOException {
1563 if (this.testScanner == null) {
1564 Scan scan =
1565 new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1566 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1567 .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1568 for (int family = 0; family < opts.families; family++) {
1569 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1570 if (opts.addColumns) {
1571 for (int column = 0; column < opts.columns; column++) {
1572 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1573 scan.addColumn(familyName, qualifier);
1575 } else {
1576 scan.addFamily(familyName);
1579 if (opts.filterAll) {
1580 scan.setFilter(new FilterAllFilter());
1582 this.testScanner = asyncTable.getScanner(scan);
1584 Result r = testScanner.next();
1585 updateValueSize(r);
1586 return true;
1590 static class AsyncSequentialReadTest extends AsyncTableTest {
1591 AsyncSequentialReadTest(AsyncConnection con, TestOptions options, Status status) {
1592 super(con, options, status);
1595 @Override
1596 boolean testRow(final int i) throws IOException, InterruptedException {
1597 Get get = new Get(format(i));
1598 for (int family = 0; family < opts.families; family++) {
1599 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1600 if (opts.addColumns) {
1601 for (int column = 0; column < opts.columns; column++) {
1602 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1603 get.addColumn(familyName, qualifier);
1605 } else {
1606 get.addFamily(familyName);
1609 if (opts.filterAll) {
1610 get.setFilter(new FilterAllFilter());
1612 try {
1613 updateValueSize(table.get(get).get());
1614 } catch (ExecutionException e) {
1615 throw new IOException(e);
1617 return true;
1621 static class AsyncSequentialWriteTest extends AsyncTableTest {
1622 private ArrayList<Put> puts;
1624 AsyncSequentialWriteTest(AsyncConnection con, TestOptions options, Status status) {
1625 super(con, options, status);
1626 if (opts.multiPut > 0) {
1627 LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
1628 this.puts = new ArrayList<>(opts.multiPut);
1632 protected byte[] generateRow(final int i) {
1633 return format(i);
1636 @Override
1637 @SuppressWarnings("ReturnValueIgnored")
1638 boolean testRow(final int i) throws IOException, InterruptedException {
1639 byte[] row = generateRow(i);
1640 Put put = new Put(row);
1641 for (int family = 0; family < opts.families; family++) {
1642 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1643 for (int column = 0; column < opts.columns; column++) {
1644 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1645 byte[] value = generateData(this.rand, getValueLength(this.rand));
1646 if (opts.useTags) {
1647 byte[] tag = generateData(this.rand, TAG_LENGTH);
1648 Tag[] tags = new Tag[opts.noOfTags];
1649 for (int n = 0; n < opts.noOfTags; n++) {
1650 Tag t = new ArrayBackedTag((byte) n, tag);
1651 tags[n] = t;
1653 KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP,
1654 value, tags);
1655 put.add(kv);
1656 updateValueSize(kv.getValueLength());
1657 } else {
1658 put.addColumn(familyName, qualifier, value);
1659 updateValueSize(value.length);
1663 put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1664 try {
1665 table.put(put).get();
1666 if (opts.multiPut > 0) {
1667 this.puts.add(put);
1668 if (this.puts.size() == opts.multiPut) {
1669 this.table.put(puts).stream().map(f -> AsyncRandomReadTest.propagate(f::get));
1670 this.puts.clear();
1671 } else {
1672 return false;
1674 } else {
1675 table.put(put).get();
1677 } catch (ExecutionException e) {
1678 throw new IOException(e);
1680 return true;
1684 static abstract class BufferedMutatorTest extends Test {
1685 protected BufferedMutator mutator;
1686 protected Table table;
1688 BufferedMutatorTest(Connection con, TestOptions options, Status status) {
1689 super(con, options, status);
1692 @Override
1693 void onStartup() throws IOException {
1694 BufferedMutatorParams p = new BufferedMutatorParams(TableName.valueOf(opts.tableName));
1695 p.writeBufferSize(opts.bufferSize);
1696 this.mutator = connection.getBufferedMutator(p);
1697 this.table = connection.getTable(TableName.valueOf(opts.tableName));
1700 @Override
1701 void onTakedown() throws IOException {
1702 mutator.close();
1703 table.close();
1707 static class RandomSeekScanTest extends TableTest {
1708 RandomSeekScanTest(Connection con, TestOptions options, Status status) {
1709 super(con, options, status);
1712 @Override
1713 boolean testRow(final int i) throws IOException {
1714 Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows))
1715 .setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
1716 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
1717 .setScanMetricsEnabled(true);
1718 FilterList list = new FilterList();
1719 for (int family = 0; family < opts.families; family++) {
1720 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1721 if (opts.addColumns) {
1722 for (int column = 0; column < opts.columns; column++) {
1723 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1724 scan.addColumn(familyName, qualifier);
1726 } else {
1727 scan.addFamily(familyName);
1730 if (opts.filterAll) {
1731 list.addFilter(new FilterAllFilter());
1733 list.addFilter(new WhileMatchFilter(new PageFilter(120)));
1734 scan.setFilter(list);
1735 ResultScanner s = this.table.getScanner(scan);
1736 try {
1737 for (Result rr; (rr = s.next()) != null;) {
1738 updateValueSize(rr);
1740 } finally {
1741 updateScanMetrics(s.getScanMetrics());
1742 s.close();
1744 return true;
1747 @Override
1748 protected int getReportingPeriod() {
1749 int period = opts.perClientRunRows / 100;
1750 return period == 0 ? opts.perClientRunRows : period;
1755 static abstract class RandomScanWithRangeTest extends TableTest {
1756 RandomScanWithRangeTest(Connection con, TestOptions options, Status status) {
1757 super(con, options, status);
1760 @Override
1761 boolean testRow(final int i) throws IOException {
1762 Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
1763 Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
1764 .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
1765 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1766 .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1767 for (int family = 0; family < opts.families; family++) {
1768 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1769 if (opts.addColumns) {
1770 for (int column = 0; column < opts.columns; column++) {
1771 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1772 scan.addColumn(familyName, qualifier);
1774 } else {
1775 scan.addFamily(familyName);
1778 if (opts.filterAll) {
1779 scan.setFilter(new FilterAllFilter());
1781 Result r = null;
1782 int count = 0;
1783 ResultScanner s = this.table.getScanner(scan);
1784 try {
1785 for (; (r = s.next()) != null;) {
1786 updateValueSize(r);
1787 count++;
1789 if (i % 100 == 0) {
1790 LOG.info(String.format("Scan for key range %s - %s returned %s rows",
1791 Bytes.toString(startAndStopRow.getFirst()),
1792 Bytes.toString(startAndStopRow.getSecond()), count));
1794 } finally {
1795 updateScanMetrics(s.getScanMetrics());
1796 s.close();
1798 return true;
1801 protected abstract Pair<byte[],byte[]> getStartAndStopRow();
1803 protected Pair<byte[], byte[]> generateStartAndStopRows(int maxRange) {
1804 int start = this.rand.nextInt(Integer.MAX_VALUE) % opts.totalRows;
1805 int stop = start + maxRange;
1806 return new Pair<>(format(start), format(stop));
1809 @Override
1810 protected int getReportingPeriod() {
1811 int period = opts.perClientRunRows / 100;
1812 return period == 0? opts.perClientRunRows: period;
1816 static class RandomScanWithRange10Test extends RandomScanWithRangeTest {
1817 RandomScanWithRange10Test(Connection con, TestOptions options, Status status) {
1818 super(con, options, status);
1821 @Override
1822 protected Pair<byte[], byte[]> getStartAndStopRow() {
1823 return generateStartAndStopRows(10);
1827 static class RandomScanWithRange100Test extends RandomScanWithRangeTest {
1828 RandomScanWithRange100Test(Connection con, TestOptions options, Status status) {
1829 super(con, options, status);
1832 @Override
1833 protected Pair<byte[], byte[]> getStartAndStopRow() {
1834 return generateStartAndStopRows(100);
1838 static class RandomScanWithRange1000Test extends RandomScanWithRangeTest {
1839 RandomScanWithRange1000Test(Connection con, TestOptions options, Status status) {
1840 super(con, options, status);
1843 @Override
1844 protected Pair<byte[], byte[]> getStartAndStopRow() {
1845 return generateStartAndStopRows(1000);
1849 static class RandomScanWithRange10000Test extends RandomScanWithRangeTest {
1850 RandomScanWithRange10000Test(Connection con, TestOptions options, Status status) {
1851 super(con, options, status);
1854 @Override
1855 protected Pair<byte[], byte[]> getStartAndStopRow() {
1856 return generateStartAndStopRows(10000);
1860 static class RandomReadTest extends TableTest {
1861 private final Consistency consistency;
1862 private ArrayList<Get> gets;
1863 private Random rd = new Random();
1865 RandomReadTest(Connection con, TestOptions options, Status status) {
1866 super(con, options, status);
1867 consistency = options.replicas == DEFAULT_OPTS.replicas ? null : Consistency.TIMELINE;
1868 if (opts.multiGet > 0) {
1869 LOG.info("MultiGet enabled. Sending GETs in batches of " + opts.multiGet + ".");
1870 this.gets = new ArrayList<>(opts.multiGet);
1874 @Override
1875 boolean testRow(final int i) throws IOException, InterruptedException {
1876 if (opts.randomSleep > 0) {
1877 Thread.sleep(rd.nextInt(opts.randomSleep));
1879 Get get = new Get(getRandomRow(this.rand, opts.totalRows));
1880 for (int family = 0; family < opts.families; family++) {
1881 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1882 if (opts.addColumns) {
1883 for (int column = 0; column < opts.columns; column++) {
1884 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1885 get.addColumn(familyName, qualifier);
1887 } else {
1888 get.addFamily(familyName);
1891 if (opts.filterAll) {
1892 get.setFilter(new FilterAllFilter());
1894 get.setConsistency(consistency);
1895 if (LOG.isTraceEnabled()) LOG.trace(get.toString());
1896 if (opts.multiGet > 0) {
1897 this.gets.add(get);
1898 if (this.gets.size() == opts.multiGet) {
1899 Result [] rs = this.table.get(this.gets);
1900 updateValueSize(rs);
1901 this.gets.clear();
1902 } else {
1903 return false;
1905 } else {
1906 updateValueSize(this.table.get(get));
1908 return true;
1911 @Override
1912 protected int getReportingPeriod() {
1913 int period = opts.perClientRunRows / 10;
1914 return period == 0 ? opts.perClientRunRows : period;
1917 @Override
1918 protected void testTakedown() throws IOException {
1919 if (this.gets != null && this.gets.size() > 0) {
1920 this.table.get(gets);
1921 this.gets.clear();
1923 super.testTakedown();
1927 static class RandomWriteTest extends SequentialWriteTest {
1928 RandomWriteTest(Connection con, TestOptions options, Status status) {
1929 super(con, options, status);
1932 @Override
1933 protected byte[] generateRow(final int i) {
1934 return getRandomRow(this.rand, opts.totalRows);
1940 static class ScanTest extends TableTest {
1941 private ResultScanner testScanner;
1943 ScanTest(Connection con, TestOptions options, Status status) {
1944 super(con, options, status);
1947 @Override
1948 void testTakedown() throws IOException {
1949 if (this.testScanner != null) {
1950 this.testScanner.close();
1952 super.testTakedown();
1956 @Override
1957 boolean testRow(final int i) throws IOException {
1958 if (this.testScanner == null) {
1959 Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
1960 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
1961 .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
1962 for (int family = 0; family < opts.families; family++) {
1963 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
1964 if (opts.addColumns) {
1965 for (int column = 0; column < opts.columns; column++) {
1966 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
1967 scan.addColumn(familyName, qualifier);
1969 } else {
1970 scan.addFamily(familyName);
1973 if (opts.filterAll) {
1974 scan.setFilter(new FilterAllFilter());
1976 this.testScanner = table.getScanner(scan);
1978 Result r = testScanner.next();
1979 updateValueSize(r);
1980 return true;
1985 * Base class for operations that are CAS-like; that read a value and then set it based off what
1986 * they read. In this category is increment, append, checkAndPut, etc.
1988 * <p>These operations also want some concurrency going on. Usually when these tests run, they
1989 * operate in their own part of the key range. In CASTest, we will have them all overlap on the
1990 * same key space. We do this with our getStartRow and getLastRow overrides.
1992 static abstract class CASTableTest extends TableTest {
1993 private final byte [] qualifier;
1994 CASTableTest(Connection con, TestOptions options, Status status) {
1995 super(con, options, status);
1996 qualifier = Bytes.toBytes(this.getClass().getSimpleName());
1999 byte [] getQualifier() {
2000 return this.qualifier;
2003 @Override
2004 int getStartRow() {
2005 return 0;
2008 @Override
2009 int getLastRow() {
2010 return opts.perClientRunRows;
2014 static class IncrementTest extends CASTableTest {
2015 IncrementTest(Connection con, TestOptions options, Status status) {
2016 super(con, options, status);
2019 @Override
2020 boolean testRow(final int i) throws IOException {
2021 Increment increment = new Increment(format(i));
2022 // unlike checkAndXXX tests, which make most sense to do on a single value,
2023 // if multiple families are specified for an increment test we assume it is
2024 // meant to raise the work factor
2025 for (int family = 0; family < opts.families; family++) {
2026 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2027 increment.addColumn(familyName, getQualifier(), 1l);
2029 updateValueSize(this.table.increment(increment));
2030 return true;
2034 static class AppendTest extends CASTableTest {
2035 AppendTest(Connection con, TestOptions options, Status status) {
2036 super(con, options, status);
2039 @Override
2040 boolean testRow(final int i) throws IOException {
2041 byte [] bytes = format(i);
2042 Append append = new Append(bytes);
2043 // unlike checkAndXXX tests, which make most sense to do on a single value,
2044 // if multiple families are specified for an append test we assume it is
2045 // meant to raise the work factor
2046 for (int family = 0; family < opts.families; family++) {
2047 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2048 append.addColumn(familyName, getQualifier(), bytes);
2050 updateValueSize(this.table.append(append));
2051 return true;
2055 static class CheckAndMutateTest extends CASTableTest {
2056 CheckAndMutateTest(Connection con, TestOptions options, Status status) {
2057 super(con, options, status);
2060 @Override
2061 boolean testRow(final int i) throws IOException {
2062 final byte [] bytes = format(i);
2063 // checkAndXXX tests operate on only a single value
2064 // Put a known value so when we go to check it, it is there.
2065 Put put = new Put(bytes);
2066 put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2067 this.table.put(put);
2068 RowMutations mutations = new RowMutations(bytes);
2069 mutations.add(put);
2070 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2071 .ifEquals(bytes).thenMutate(mutations);
2072 return true;
2076 static class CheckAndPutTest extends CASTableTest {
2077 CheckAndPutTest(Connection con, TestOptions options, Status status) {
2078 super(con, options, status);
2081 @Override
2082 boolean testRow(final int i) throws IOException {
2083 final byte [] bytes = format(i);
2084 // checkAndXXX tests operate on only a single value
2085 // Put a known value so when we go to check it, it is there.
2086 Put put = new Put(bytes);
2087 put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2088 this.table.put(put);
2089 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2090 .ifEquals(bytes).thenPut(put);
2091 return true;
2095 static class CheckAndDeleteTest extends CASTableTest {
2096 CheckAndDeleteTest(Connection con, TestOptions options, Status status) {
2097 super(con, options, status);
2100 @Override
2101 boolean testRow(final int i) throws IOException {
2102 final byte [] bytes = format(i);
2103 // checkAndXXX tests operate on only a single value
2104 // Put a known value so when we go to check it, it is there.
2105 Put put = new Put(bytes);
2106 put.addColumn(FAMILY_ZERO, getQualifier(), bytes);
2107 this.table.put(put);
2108 Delete delete = new Delete(put.getRow());
2109 delete.addColumn(FAMILY_ZERO, getQualifier());
2110 this.table.checkAndMutate(bytes, FAMILY_ZERO).qualifier(getQualifier())
2111 .ifEquals(bytes).thenDelete(delete);
2112 return true;
2116 static class SequentialReadTest extends TableTest {
2117 SequentialReadTest(Connection con, TestOptions options, Status status) {
2118 super(con, options, status);
2121 @Override
2122 boolean testRow(final int i) throws IOException {
2123 Get get = new Get(format(i));
2124 for (int family = 0; family < opts.families; family++) {
2125 byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
2126 if (opts.addColumns) {
2127 for (int column = 0; column < opts.columns; column++) {
2128 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2129 get.addColumn(familyName, qualifier);
2131 } else {
2132 get.addFamily(familyName);
2135 if (opts.filterAll) {
2136 get.setFilter(new FilterAllFilter());
2138 updateValueSize(table.get(get));
2139 return true;
2143 static class SequentialWriteTest extends BufferedMutatorTest {
2144 private ArrayList<Put> puts;
2147 SequentialWriteTest(Connection con, TestOptions options, Status status) {
2148 super(con, options, status);
2149 if (opts.multiPut > 0) {
2150 LOG.info("MultiPut enabled. Sending PUTs in batches of " + opts.multiPut + ".");
2151 this.puts = new ArrayList<>(opts.multiPut);
2155 protected byte[] generateRow(final int i) {
2156 return format(i);
2159 @Override
2160 boolean testRow(final int i) throws IOException {
2161 byte[] row = generateRow(i);
2162 Put put = new Put(row);
2163 for (int family = 0; family < opts.families; family++) {
2164 byte familyName[] = Bytes.toBytes(FAMILY_NAME_BASE + family);
2165 for (int column = 0; column < opts.columns; column++) {
2166 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2167 byte[] value = generateData(this.rand, getValueLength(this.rand));
2168 if (opts.useTags) {
2169 byte[] tag = generateData(this.rand, TAG_LENGTH);
2170 Tag[] tags = new Tag[opts.noOfTags];
2171 for (int n = 0; n < opts.noOfTags; n++) {
2172 Tag t = new ArrayBackedTag((byte) n, tag);
2173 tags[n] = t;
2175 KeyValue kv = new KeyValue(row, familyName, qualifier, HConstants.LATEST_TIMESTAMP,
2176 value, tags);
2177 put.add(kv);
2178 updateValueSize(kv.getValueLength());
2179 } else {
2180 put.addColumn(familyName, qualifier, value);
2181 updateValueSize(value.length);
2185 put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
2186 if (opts.autoFlush) {
2187 if (opts.multiPut > 0) {
2188 this.puts.add(put);
2189 if (this.puts.size() == opts.multiPut) {
2190 table.put(this.puts);
2191 this.puts.clear();
2192 } else {
2193 return false;
2195 } else {
2196 table.put(put);
2198 } else {
2199 mutator.mutate(put);
2201 return true;
2205 static class FilteredScanTest extends TableTest {
2206 protected static final Logger LOG = LoggerFactory.getLogger(FilteredScanTest.class.getName());
2208 FilteredScanTest(Connection con, TestOptions options, Status status) {
2209 super(con, options, status);
2210 if (opts.perClientRunRows == DEFAULT_ROWS_PER_GB) {
2211 LOG.warn("Option \"rows\" unspecified. Using default value " + DEFAULT_ROWS_PER_GB +
2212 ". This could take a very long time.");
2216 @Override
2217 boolean testRow(int i) throws IOException {
2218 byte[] value = generateData(this.rand, getValueLength(this.rand));
2219 Scan scan = constructScan(value);
2220 ResultScanner scanner = null;
2221 try {
2222 scanner = this.table.getScanner(scan);
2223 for (Result r = null; (r = scanner.next()) != null;) {
2224 updateValueSize(r);
2226 } finally {
2227 if (scanner != null) {
2228 updateScanMetrics(scanner.getScanMetrics());
2229 scanner.close();
2232 return true;
2235 protected Scan constructScan(byte[] valuePrefix) throws IOException {
2236 FilterList list = new FilterList();
2237 Filter filter = new SingleColumnValueFilter(FAMILY_ZERO, COLUMN_ZERO,
2238 CompareOperator.EQUAL, new BinaryComparator(valuePrefix));
2239 list.addFilter(filter);
2240 if (opts.filterAll) {
2241 list.addFilter(new FilterAllFilter());
2243 Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
2244 .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
2245 .setScanMetricsEnabled(true);
2246 if (opts.addColumns) {
2247 for (int column = 0; column < opts.columns; column++) {
2248 byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
2249 scan.addColumn(FAMILY_ZERO, qualifier);
2251 } else {
2252 scan.addFamily(FAMILY_ZERO);
2254 scan.setFilter(list);
2255 return scan;
2260 * Compute a throughput rate in MB/s.
2261 * @param rows Number of records consumed.
2262 * @param timeMs Time taken in milliseconds.
2263 * @return String value with label, ie '123.76 MB/s'
2265 private static String calculateMbps(int rows, long timeMs, final int valueSize, int families, int columns) {
2266 BigDecimal rowSize = BigDecimal.valueOf(ROW_LENGTH +
2267 ((valueSize + (FAMILY_NAME_BASE.length()+1) + COLUMN_ZERO.length) * columns) * families);
2268 BigDecimal mbps = BigDecimal.valueOf(rows).multiply(rowSize, CXT)
2269 .divide(BigDecimal.valueOf(timeMs), CXT).multiply(MS_PER_SEC, CXT)
2270 .divide(BYTES_PER_MB, CXT);
2271 return FMT.format(mbps) + " MB/s";
2275 * Format passed integer.
2276 * @param number
2277 * @return Returns zero-prefixed ROW_LENGTH-byte wide decimal version of passed
2278 * number (Does absolute in case number is negative).
2280 public static byte [] format(final int number) {
2281 byte [] b = new byte[ROW_LENGTH];
2282 int d = Math.abs(number);
2283 for (int i = b.length - 1; i >= 0; i--) {
2284 b[i] = (byte)((d % 10) + '0');
2285 d /= 10;
2287 return b;
2291 * This method takes some time and is done inline uploading data. For
2292 * example, doing the mapfile test, generation of the key and value
2293 * consumes about 30% of CPU time.
2294 * @return Generated random value to insert into a table cell.
2296 public static byte[] generateData(final Random r, int length) {
2297 byte [] b = new byte [length];
2298 int i;
2300 for(i = 0; i < (length-8); i += 8) {
2301 b[i] = (byte) (65 + r.nextInt(26));
2302 b[i+1] = b[i];
2303 b[i+2] = b[i];
2304 b[i+3] = b[i];
2305 b[i+4] = b[i];
2306 b[i+5] = b[i];
2307 b[i+6] = b[i];
2308 b[i+7] = b[i];
2311 byte a = (byte) (65 + r.nextInt(26));
2312 for(; i < length; i++) {
2313 b[i] = a;
2315 return b;
2318 static byte [] getRandomRow(final Random random, final int totalRows) {
2319 return format(generateRandomRow(random, totalRows));
2322 static int generateRandomRow(final Random random, final int totalRows) {
2323 return random.nextInt(Integer.MAX_VALUE) % totalRows;
2326 static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration conf,
2327 Connection con, AsyncConnection asyncCon, TestOptions opts, final Status status)
2328 throws IOException, InterruptedException {
2329 status.setStatus("Start " + cmd + " at offset " + opts.startRow + " for "
2330 + opts.perClientRunRows + " rows");
2331 long totalElapsedTime;
2333 final TestBase t;
2334 try {
2335 if (AsyncTest.class.isAssignableFrom(cmd)) {
2336 Class<? extends AsyncTest> newCmd = (Class<? extends AsyncTest>) cmd;
2337 Constructor<? extends AsyncTest> constructor =
2338 newCmd.getDeclaredConstructor(AsyncConnection.class, TestOptions.class, Status.class);
2339 t = constructor.newInstance(asyncCon, opts, status);
2340 } else {
2341 Class<? extends Test> newCmd = (Class<? extends Test>) cmd;
2342 Constructor<? extends Test> constructor =
2343 newCmd.getDeclaredConstructor(Connection.class, TestOptions.class, Status.class);
2344 t = constructor.newInstance(con, opts, status);
2346 } catch (NoSuchMethodException e) {
2347 throw new IllegalArgumentException("Invalid command class: " + cmd.getName()
2348 + ". It does not provide a constructor as described by "
2349 + "the javadoc comment. Available constructors are: "
2350 + Arrays.toString(cmd.getConstructors()));
2351 } catch (Exception e) {
2352 throw new IllegalStateException("Failed to construct command class", e);
2354 totalElapsedTime = t.test();
2356 status.setStatus("Finished " + cmd + " in " + totalElapsedTime +
2357 "ms at offset " + opts.startRow + " for " + opts.perClientRunRows + " rows" +
2358 " (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
2359 getAverageValueLength(opts), opts.families, opts.columns) + ")");
2361 return new RunResult(totalElapsedTime, t.getLatencyHistogram());
2364 private static int getAverageValueLength(final TestOptions opts) {
2365 return opts.valueRandom? opts.valueSize/2: opts.valueSize;
2368 private void runTest(final Class<? extends TestBase> cmd, TestOptions opts) throws IOException,
2369 InterruptedException, ClassNotFoundException, ExecutionException {
2370 // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do
2371 // the TestOptions introspection for us and dump the output in a readable format.
2372 LOG.info(cmd.getSimpleName() + " test run options=" + GSON.toJson(opts));
2373 Admin admin = null;
2374 Connection connection = null;
2375 try {
2376 connection = ConnectionFactory.createConnection(getConf());
2377 admin = connection.getAdmin();
2378 checkTable(admin, opts);
2379 } finally {
2380 if (admin != null) admin.close();
2381 if (connection != null) connection.close();
2383 if (opts.nomapred) {
2384 doLocalClients(opts, getConf());
2385 } else {
2386 doMapReduce(opts, getConf());
2390 protected void printUsage() {
2391 printUsage(PE_COMMAND_SHORTNAME, null);
2394 protected static void printUsage(final String message) {
2395 printUsage(PE_COMMAND_SHORTNAME, message);
2398 protected static void printUsageAndExit(final String message, final int exitCode) {
2399 printUsage(message);
2400 System.exit(exitCode);
2403 protected static void printUsage(final String shortName, final String message) {
2404 if (message != null && message.length() > 0) {
2405 System.err.println(message);
2407 System.err.print("Usage: hbase " + shortName);
2408 System.err.println(" <OPTIONS> [-D<property=value>]* <command> <nclients>");
2409 System.err.println();
2410 System.err.println("General Options:");
2411 System.err.println(" nomapred Run multiple clients using threads " +
2412 "(rather than use mapreduce)");
2413 System.err.println(" oneCon all the threads share the same connection. Default: False");
2414 System.err.println(" connCount connections all threads share. "
2415 + "For example, if set to 2, then all thread share 2 connection. "
2416 + "Default: depend on oneCon parameter. if oneCon set to true, then connCount=1, "
2417 + "if not, connCount=thread number");
2419 System.err.println(" sampleRate Execute test on a sample of total " +
2420 "rows. Only supported by randomRead. Default: 1.0");
2421 System.err.println(" period Report every 'period' rows: " +
2422 "Default: opts.perClientRunRows / 10 = " + DEFAULT_OPTS.getPerClientRunRows()/10);
2423 System.err.println(" cycles How many times to cycle the test. Defaults: 1.");
2424 System.err.println(" traceRate Enable HTrace spans. Initiate tracing every N rows. " +
2425 "Default: 0");
2426 System.err.println(" latency Set to report operation latencies. Default: False");
2427 System.err.println(" measureAfter Start to measure the latency once 'measureAfter'" +
2428 " rows have been treated. Default: 0");
2429 System.err.println(" valueSize Pass value size to use: Default: "
2430 + DEFAULT_OPTS.getValueSize());
2431 System.err.println(" valueRandom Set if we should vary value size between 0 and " +
2432 "'valueSize'; set on read for stats on size: Default: Not set.");
2433 System.err.println(" blockEncoding Block encoding to use. Value should be one of "
2434 + Arrays.toString(DataBlockEncoding.values()) + ". Default: NONE");
2435 System.err.println();
2436 System.err.println("Table Creation / Write Tests:");
2437 System.err.println(" table Alternate table name. Default: 'TestTable'");
2438 System.err.println(" rows Rows each client runs. Default: "
2439 + DEFAULT_OPTS.getPerClientRunRows()
2440 + ". In case of randomReads and randomSeekScans this could"
2441 + " be specified along with --size to specify the number of rows to be scanned within"
2442 + " the total range specified by the size.");
2443 System.err.println(
2444 " size Total size in GiB. Mutually exclusive with --rows for writes and scans"
2445 + ". But for randomReads and randomSeekScans when you use size with --rows you could"
2446 + " use size to specify the end range and --rows"
2447 + " specifies the number of rows within that range. " + "Default: 1.0.");
2448 System.err.println(" compress Compression type to use (GZ, LZO, ...). Default: 'NONE'");
2449 System.err.println(" flushCommits Used to determine if the test should flush the table. " +
2450 "Default: false");
2451 System.err.println(" valueZipf Set if we should vary value size between 0 and " +
2452 "'valueSize' in zipf form: Default: Not set.");
2453 System.err.println(" writeToWAL Set writeToWAL on puts. Default: True");
2454 System.err.println(" autoFlush Set autoFlush on htable. Default: False");
2455 System.err.println(" multiPut Batch puts together into groups of N. Only supported " +
2456 "by write. If multiPut is bigger than 0, autoFlush need to set to true. Default: 0");
2457 System.err.println(" presplit Create presplit table. If a table with same name exists,"
2458 + " it'll be deleted and recreated (instead of verifying count of its existing regions). "
2459 + "Recommended for accurate perf analysis (see guide). Default: disabled");
2460 System.err.println(" usetags Writes tags along with KVs. Use with HFile V3. " +
2461 "Default: false");
2462 System.err.println(" numoftags Specify the no of tags that would be needed. " +
2463 "This works only if usetags is true. Default: " + DEFAULT_OPTS.noOfTags);
2464 System.err.println(" splitPolicy Specify a custom RegionSplitPolicy for the table.");
2465 System.err.println(" columns Columns to write per row. Default: 1");
2466 System.err.println(" families Specify number of column families for the table. Default: 1");
2467 System.err.println();
2468 System.err.println("Read Tests:");
2469 System.err.println(" filterAll Helps to filter out all the rows on the server side"
2470 + " there by not returning any thing back to the client. Helps to check the server side"
2471 + " performance. Uses FilterAllFilter internally. ");
2472 System.err.println(" multiGet Batch gets together into groups of N. Only supported " +
2473 "by randomRead. Default: disabled");
2474 System.err.println(" inmemory Tries to keep the HFiles of the CF " +
2475 "inmemory as far as possible. Not guaranteed that reads are always served " +
2476 "from memory. Default: false");
2477 System.err.println(" bloomFilter Bloom filter type, one of "
2478 + Arrays.toString(BloomType.values()));
2479 System.err.println(" blockSize Blocksize to use when writing out hfiles. ");
2480 System.err.println(" inmemoryCompaction Makes the column family to do inmemory flushes/compactions. "
2481 + "Uses the CompactingMemstore");
2482 System.err.println(" addColumns Adds columns to scans/gets explicitly. Default: true");
2483 System.err.println(" replicas Enable region replica testing. Defaults: 1.");
2484 System.err.println(" randomSleep Do a random sleep before each get between 0 and entered value. Defaults: 0");
2485 System.err.println(" caching Scan caching to use. Default: 30");
2486 System.err.println(" asyncPrefetch Enable asyncPrefetch for scan");
2487 System.err.println(" cacheBlocks Set the cacheBlocks option for scan. Default: true");
2488 System.err.println(" scanReadType Set the readType option for scan, stream/pread/default. Default: default");
2489 System.err.println(" bufferSize Set the value of client side buffering. Default: 2MB");
2490 System.err.println();
2491 System.err.println(" Note: -D properties will be applied to the conf used. ");
2492 System.err.println(" For example: ");
2493 System.err.println(" -Dmapreduce.output.fileoutputformat.compress=true");
2494 System.err.println(" -Dmapreduce.task.timeout=60000");
2495 System.err.println();
2496 System.err.println("Command:");
2497 for (CmdDescriptor command : COMMANDS.values()) {
2498 System.err.println(String.format(" %-20s %s", command.getName(), command.getDescription()));
2500 System.err.println();
2501 System.err.println("Args:");
2502 System.err.println(" nclients Integer. Required. Total number of clients "
2503 + "(and HRegionServers) running. 1 <= value <= 500");
2504 System.err.println("Examples:");
2505 System.err.println(" To run a single client doing the default 1M sequentialWrites:");
2506 System.err.println(" $ hbase " + shortName + " sequentialWrite 1");
2507 System.err.println(" To run 10 clients doing increments over ten rows:");
2508 System.err.println(" $ hbase " + shortName + " --rows=10 --nomapred increment 10");
2512 * Parse options passed in via an arguments array. Assumes that array has been split
2513 * on white-space and placed into a {@code Queue}. Any unknown arguments will remain
2514 * in the queue at the conclusion of this method call. It's up to the caller to deal
2515 * with these unrecognized arguments.
2517 static TestOptions parseOpts(Queue<String> args) {
2518 TestOptions opts = new TestOptions();
2520 String cmd = null;
2521 while ((cmd = args.poll()) != null) {
2522 if (cmd.equals("-h") || cmd.startsWith("--h")) {
2523 // place item back onto queue so that caller knows parsing was incomplete
2524 args.add(cmd);
2525 break;
2528 final String nmr = "--nomapred";
2529 if (cmd.startsWith(nmr)) {
2530 opts.nomapred = true;
2531 continue;
2534 final String rows = "--rows=";
2535 if (cmd.startsWith(rows)) {
2536 opts.perClientRunRows = Integer.parseInt(cmd.substring(rows.length()));
2537 continue;
2540 final String cycles = "--cycles=";
2541 if (cmd.startsWith(cycles)) {
2542 opts.cycles = Integer.parseInt(cmd.substring(cycles.length()));
2543 continue;
2546 final String sampleRate = "--sampleRate=";
2547 if (cmd.startsWith(sampleRate)) {
2548 opts.sampleRate = Float.parseFloat(cmd.substring(sampleRate.length()));
2549 continue;
2552 final String table = "--table=";
2553 if (cmd.startsWith(table)) {
2554 opts.tableName = cmd.substring(table.length());
2555 continue;
2558 final String startRow = "--startRow=";
2559 if (cmd.startsWith(startRow)) {
2560 opts.startRow = Integer.parseInt(cmd.substring(startRow.length()));
2561 continue;
2564 final String compress = "--compress=";
2565 if (cmd.startsWith(compress)) {
2566 opts.compression = Compression.Algorithm.valueOf(cmd.substring(compress.length()));
2567 continue;
2570 final String traceRate = "--traceRate=";
2571 if (cmd.startsWith(traceRate)) {
2572 opts.traceRate = Double.parseDouble(cmd.substring(traceRate.length()));
2573 continue;
2576 final String blockEncoding = "--blockEncoding=";
2577 if (cmd.startsWith(blockEncoding)) {
2578 opts.blockEncoding = DataBlockEncoding.valueOf(cmd.substring(blockEncoding.length()));
2579 continue;
2582 final String flushCommits = "--flushCommits=";
2583 if (cmd.startsWith(flushCommits)) {
2584 opts.flushCommits = Boolean.parseBoolean(cmd.substring(flushCommits.length()));
2585 continue;
2588 final String writeToWAL = "--writeToWAL=";
2589 if (cmd.startsWith(writeToWAL)) {
2590 opts.writeToWAL = Boolean.parseBoolean(cmd.substring(writeToWAL.length()));
2591 continue;
2594 final String presplit = "--presplit=";
2595 if (cmd.startsWith(presplit)) {
2596 opts.presplitRegions = Integer.parseInt(cmd.substring(presplit.length()));
2597 continue;
2600 final String inMemory = "--inmemory=";
2601 if (cmd.startsWith(inMemory)) {
2602 opts.inMemoryCF = Boolean.parseBoolean(cmd.substring(inMemory.length()));
2603 continue;
2606 final String autoFlush = "--autoFlush=";
2607 if (cmd.startsWith(autoFlush)) {
2608 opts.autoFlush = Boolean.parseBoolean(cmd.substring(autoFlush.length()));
2609 if (!opts.autoFlush && opts.multiPut > 0) {
2610 throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
2612 continue;
2615 final String onceCon = "--oneCon=";
2616 if (cmd.startsWith(onceCon)) {
2617 opts.oneCon = Boolean.parseBoolean(cmd.substring(onceCon.length()));
2618 if (opts.oneCon && opts.connCount > 1) {
2619 throw new IllegalArgumentException("oneCon is set to true, "
2620 + "connCount should not bigger than 1");
2622 continue;
2625 final String connCount = "--connCount=";
2626 if (cmd.startsWith(connCount)) {
2627 opts.connCount = Integer.parseInt(cmd.substring(connCount.length()));
2628 if (opts.oneCon && opts.connCount > 1) {
2629 throw new IllegalArgumentException("oneCon is set to true, "
2630 + "connCount should not bigger than 1");
2632 continue;
2635 final String latency = "--latency";
2636 if (cmd.startsWith(latency)) {
2637 opts.reportLatency = true;
2638 continue;
2641 final String multiGet = "--multiGet=";
2642 if (cmd.startsWith(multiGet)) {
2643 opts.multiGet = Integer.parseInt(cmd.substring(multiGet.length()));
2644 continue;
2647 final String multiPut = "--multiPut=";
2648 if (cmd.startsWith(multiPut)) {
2649 opts.multiPut = Integer.parseInt(cmd.substring(multiPut.length()));
2650 if (!opts.autoFlush && opts.multiPut > 0) {
2651 throw new IllegalArgumentException("autoFlush must be true when multiPut is more than 0");
2653 continue;
2656 final String useTags = "--usetags=";
2657 if (cmd.startsWith(useTags)) {
2658 opts.useTags = Boolean.parseBoolean(cmd.substring(useTags.length()));
2659 continue;
2662 final String noOfTags = "--numoftags=";
2663 if (cmd.startsWith(noOfTags)) {
2664 opts.noOfTags = Integer.parseInt(cmd.substring(noOfTags.length()));
2665 continue;
2668 final String replicas = "--replicas=";
2669 if (cmd.startsWith(replicas)) {
2670 opts.replicas = Integer.parseInt(cmd.substring(replicas.length()));
2671 continue;
2674 final String filterOutAll = "--filterAll";
2675 if (cmd.startsWith(filterOutAll)) {
2676 opts.filterAll = true;
2677 continue;
2680 final String size = "--size=";
2681 if (cmd.startsWith(size)) {
2682 opts.size = Float.parseFloat(cmd.substring(size.length()));
2683 if (opts.size <= 1.0f) throw new IllegalStateException("Size must be > 1; i.e. 1GB");
2684 continue;
2687 final String splitPolicy = "--splitPolicy=";
2688 if (cmd.startsWith(splitPolicy)) {
2689 opts.splitPolicy = cmd.substring(splitPolicy.length());
2690 continue;
2693 final String randomSleep = "--randomSleep=";
2694 if (cmd.startsWith(randomSleep)) {
2695 opts.randomSleep = Integer.parseInt(cmd.substring(randomSleep.length()));
2696 continue;
2699 final String measureAfter = "--measureAfter=";
2700 if (cmd.startsWith(measureAfter)) {
2701 opts.measureAfter = Integer.parseInt(cmd.substring(measureAfter.length()));
2702 continue;
2705 final String bloomFilter = "--bloomFilter=";
2706 if (cmd.startsWith(bloomFilter)) {
2707 opts.bloomType = BloomType.valueOf(cmd.substring(bloomFilter.length()));
2708 continue;
2711 final String blockSize = "--blockSize=";
2712 if(cmd.startsWith(blockSize) ) {
2713 opts.blockSize = Integer.parseInt(cmd.substring(blockSize.length()));
2716 final String valueSize = "--valueSize=";
2717 if (cmd.startsWith(valueSize)) {
2718 opts.valueSize = Integer.parseInt(cmd.substring(valueSize.length()));
2719 continue;
2722 final String valueRandom = "--valueRandom";
2723 if (cmd.startsWith(valueRandom)) {
2724 opts.valueRandom = true;
2725 if (opts.valueZipf) {
2726 throw new IllegalStateException("Either valueZipf or valueRandom but not both");
2728 continue;
2731 final String valueZipf = "--valueZipf";
2732 if (cmd.startsWith(valueZipf)) {
2733 opts.valueZipf = true;
2734 if (opts.valueRandom) {
2735 throw new IllegalStateException("Either valueZipf or valueRandom but not both");
2737 continue;
2740 final String period = "--period=";
2741 if (cmd.startsWith(period)) {
2742 opts.period = Integer.parseInt(cmd.substring(period.length()));
2743 continue;
2746 final String addColumns = "--addColumns=";
2747 if (cmd.startsWith(addColumns)) {
2748 opts.addColumns = Boolean.parseBoolean(cmd.substring(addColumns.length()));
2749 continue;
2752 final String inMemoryCompaction = "--inmemoryCompaction=";
2753 if (cmd.startsWith(inMemoryCompaction)) {
2754 opts.inMemoryCompaction =
2755 MemoryCompactionPolicy.valueOf(cmd.substring(inMemoryCompaction.length()));
2756 continue;
2759 final String columns = "--columns=";
2760 if (cmd.startsWith(columns)) {
2761 opts.columns = Integer.parseInt(cmd.substring(columns.length()));
2762 continue;
2765 final String families = "--families=";
2766 if (cmd.startsWith(families)) {
2767 opts.families = Integer.parseInt(cmd.substring(families.length()));
2768 continue;
2771 final String caching = "--caching=";
2772 if (cmd.startsWith(caching)) {
2773 opts.caching = Integer.parseInt(cmd.substring(caching.length()));
2774 continue;
2777 final String asyncPrefetch = "--asyncPrefetch";
2778 if (cmd.startsWith(asyncPrefetch)) {
2779 opts.asyncPrefetch = true;
2780 continue;
2783 final String cacheBlocks = "--cacheBlocks=";
2784 if (cmd.startsWith(cacheBlocks)) {
2785 opts.cacheBlocks = Boolean.parseBoolean(cmd.substring(cacheBlocks.length()));
2786 continue;
2789 final String scanReadType = "--scanReadType=";
2790 if (cmd.startsWith(scanReadType)) {
2791 opts.scanReadType =
2792 Scan.ReadType.valueOf(cmd.substring(scanReadType.length()).toUpperCase());
2793 continue;
2796 final String bufferSize = "--bufferSize=";
2797 if (cmd.startsWith(bufferSize)) {
2798 opts.bufferSize = Long.parseLong(cmd.substring(bufferSize.length()));
2799 continue;
2802 if (isCommandClass(cmd)) {
2803 opts.cmdName = cmd;
2804 try {
2805 opts.numClientThreads = Integer.parseInt(args.remove());
2806 } catch (NoSuchElementException | NumberFormatException e) {
2807 throw new IllegalArgumentException("Command " + cmd + " does not have threads number", e);
2809 opts = calculateRowsAndSize(opts);
2810 break;
2811 } else {
2812 printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
2815 // Not matching any option or command.
2816 System.err.println("Error: Wrong option or command: " + cmd);
2817 args.add(cmd);
2818 break;
2820 return opts;
2823 static TestOptions calculateRowsAndSize(final TestOptions opts) {
2824 int rowsPerGB = getRowsPerGB(opts);
2825 if ((opts.getCmdName() != null
2826 && (opts.getCmdName().equals(RANDOM_READ) || opts.getCmdName().equals(RANDOM_SEEK_SCAN)))
2827 && opts.size != DEFAULT_OPTS.size
2828 && opts.perClientRunRows != DEFAULT_OPTS.perClientRunRows) {
2829 opts.totalRows = (int) opts.size * rowsPerGB;
2830 } else if (opts.size != DEFAULT_OPTS.size) {
2831 // total size in GB specified
2832 opts.totalRows = (int) opts.size * rowsPerGB;
2833 opts.perClientRunRows = opts.totalRows / opts.numClientThreads;
2834 } else {
2835 opts.totalRows = opts.perClientRunRows * opts.numClientThreads;
2836 opts.size = opts.totalRows / rowsPerGB;
2838 return opts;
2841 static int getRowsPerGB(final TestOptions opts) {
2842 return ONE_GB / ((opts.valueRandom? opts.valueSize/2: opts.valueSize) * opts.getFamilies() *
2843 opts.getColumns());
2846 @Override
2847 public int run(String[] args) throws Exception {
2848 // Process command-line args. TODO: Better cmd-line processing
2849 // (but hopefully something not as painful as cli options).
2850 int errCode = -1;
2851 if (args.length < 1) {
2852 printUsage();
2853 return errCode;
2856 try {
2857 LinkedList<String> argv = new LinkedList<>();
2858 argv.addAll(Arrays.asList(args));
2859 TestOptions opts = parseOpts(argv);
2861 // args remaining, print help and exit
2862 if (!argv.isEmpty()) {
2863 errCode = 0;
2864 printUsage();
2865 return errCode;
2868 // must run at least 1 client
2869 if (opts.numClientThreads <= 0) {
2870 throw new IllegalArgumentException("Number of clients must be > 0");
2873 // cmdName should not be null, print help and exit
2874 if (opts.cmdName == null) {
2875 printUsage();
2876 return errCode;
2879 Class<? extends TestBase> cmdClass = determineCommandClass(opts.cmdName);
2880 if (cmdClass != null) {
2881 runTest(cmdClass, opts);
2882 errCode = 0;
2885 } catch (Exception e) {
2886 e.printStackTrace();
2889 return errCode;
2892 private static boolean isCommandClass(String cmd) {
2893 return COMMANDS.containsKey(cmd);
2896 private static Class<? extends TestBase> determineCommandClass(String cmd) {
2897 CmdDescriptor descriptor = COMMANDS.get(cmd);
2898 return descriptor != null ? descriptor.getCmdClass() : null;
2901 public static void main(final String[] args) throws Exception {
2902 int res = ToolRunner.run(new PerformanceEvaluation(HBaseConfiguration.create()), args);
2903 System.exit(res);