HBASE-26921 Rewrite the counting cells part in TestMultiVersions (#4316)
[hbase.git] / hbase-mapreduce / src / test / java / org / apache / hadoop / hbase / util / LoadTestTool.java
blob336816e2b49e5f9f5c1b9536ccfd32bf58983da0
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with this
4 * work for additional information regarding copyright ownership. The ASF
5 * licenses this file to you under the Apache License, Version 2.0 (the
6 * "License"); you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 * License for the specific language governing permissions and limitations
15 * under the License.
17 package org.apache.hadoop.hbase.util;
19 import java.io.IOException;
20 import java.io.InterruptedIOException;
21 import java.lang.reflect.Constructor;
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.List;
25 import java.util.Properties;
26 import java.util.concurrent.atomic.AtomicReference;
27 import javax.crypto.spec.SecretKeySpec;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.HBaseConfiguration;
30 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
31 import org.apache.hadoop.hbase.HBaseTestingUtil;
32 import org.apache.hadoop.hbase.HConstants;
33 import org.apache.hadoop.hbase.TableName;
34 import org.apache.hadoop.hbase.client.Admin;
35 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
36 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
37 import org.apache.hadoop.hbase.client.Connection;
38 import org.apache.hadoop.hbase.client.ConnectionFactory;
39 import org.apache.hadoop.hbase.client.Durability;
40 import org.apache.hadoop.hbase.client.TableDescriptor;
41 import org.apache.hadoop.hbase.io.compress.Compression;
42 import org.apache.hadoop.hbase.io.crypto.Cipher;
43 import org.apache.hadoop.hbase.io.crypto.Encryption;
44 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
45 import org.apache.hadoop.hbase.log.HBaseMarkers;
46 import org.apache.hadoop.hbase.logging.Log4jUtils;
47 import org.apache.hadoop.hbase.regionserver.BloomType;
48 import org.apache.hadoop.hbase.security.EncryptionUtil;
49 import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
50 import org.apache.hadoop.hbase.security.User;
51 import org.apache.hadoop.hbase.security.access.AccessControlClient;
52 import org.apache.hadoop.hbase.security.access.Permission;
53 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
54 import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithACL;
55 import org.apache.hadoop.util.ToolRunner;
56 import org.apache.yetus.audience.InterfaceAudience;
57 import org.apache.zookeeper.ZooKeeper;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
61 import org.apache.hbase.thirdparty.org.apache.commons.cli.AlreadySelectedException;
62 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
63 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
64 import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
65 import org.apache.hbase.thirdparty.org.apache.commons.cli.MissingOptionException;
66 import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
67 import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
69 /**
70 * A command-line utility that reads, writes, and verifies data. Unlike
71 * {@link org.apache.hadoop.hbase.PerformanceEvaluation}, this tool validates the data written,
72 * and supports simultaneously writing and reading the same set of keys.
74 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
75 public class LoadTestTool extends AbstractHBaseTool {
77 private static final Logger LOG = LoggerFactory.getLogger(LoadTestTool.class);
78 private static final String COLON = ":";
80 /** Table name for the test */
81 private TableName tableName;
83 /** Column families for the test */
84 private byte[][] families;
86 /** Table name to use of not overridden on the command line */
87 protected static final String DEFAULT_TABLE_NAME = "cluster_test";
89 /** The default data size if not specified */
90 protected static final int DEFAULT_DATA_SIZE = 64;
92 /** The number of reader/writer threads if not specified */
93 protected static final int DEFAULT_NUM_THREADS = 20;
95 /** Usage string for the load option */
96 protected static final String OPT_USAGE_LOAD =
97 "<avg_cols_per_key>:<avg_data_size>" +
98 "[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
100 /** Usage string for the read option */
101 protected static final String OPT_USAGE_READ =
102 "<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
104 /** Usage string for the update option */
105 protected static final String OPT_USAGE_UPDATE =
106 "<update_percent>[:<#threads=" + DEFAULT_NUM_THREADS
107 + ">][:<#whether to ignore nonce collisions=0>]";
109 protected static final String OPT_USAGE_BLOOM = "Bloom filter type, one of " +
110 Arrays.toString(BloomType.values());
112 protected static final String OPT_USAGE_COMPRESSION = "Compression type, " +
113 "one of " + Arrays.toString(Compression.Algorithm.values());
115 protected static final String OPT_VERBOSE = "verbose";
117 public static final String OPT_BLOOM = "bloom";
118 public static final String OPT_BLOOM_PARAM = "bloom_param";
119 public static final String OPT_COMPRESSION = "compression";
120 public static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush";
121 public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush.";
123 public static final String OPT_INMEMORY = "in_memory";
124 public static final String OPT_USAGE_IN_MEMORY = "Tries to keep the HFiles of the CF " +
125 "inmemory as far as possible. Not guaranteed that reads are always served from inmemory";
127 public static final String OPT_GENERATOR = "generator";
128 public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool."
129 + " Any args for this class can be passed as colon separated after class name";
131 public static final String OPT_WRITER = "writer";
132 public static final String OPT_WRITER_USAGE = "The class for executing the write requests";
134 public static final String OPT_UPDATER = "updater";
135 public static final String OPT_UPDATER_USAGE = "The class for executing the update requests";
137 public static final String OPT_READER = "reader";
138 public static final String OPT_READER_USAGE = "The class for executing the read requests";
140 protected static final String OPT_KEY_WINDOW = "key_window";
141 protected static final String OPT_WRITE = "write";
142 protected static final String OPT_MAX_READ_ERRORS = "max_read_errors";
143 public static final String OPT_MULTIPUT = "multiput";
144 public static final String OPT_MULTIGET = "multiget_batchsize";
145 protected static final String OPT_NUM_KEYS = "num_keys";
146 protected static final String OPT_READ = "read";
147 protected static final String OPT_START_KEY = "start_key";
148 public static final String OPT_TABLE_NAME = "tn";
149 public static final String OPT_COLUMN_FAMILIES = "families";
150 protected static final String OPT_ZK_QUORUM = "zk";
151 protected static final String OPT_ZK_PARENT_NODE = "zk_root";
152 protected static final String OPT_SKIP_INIT = "skip_init";
153 protected static final String OPT_INIT_ONLY = "init_only";
154 protected static final String NUM_TABLES = "num_tables";
155 protected static final String OPT_BATCHUPDATE = "batchupdate";
156 protected static final String OPT_UPDATE = "update";
158 public static final String OPT_ENCRYPTION = "encryption";
159 protected static final String OPT_ENCRYPTION_USAGE =
160 "Enables transparent encryption on the test table, one of " +
161 Arrays.toString(Encryption.getSupportedCiphers());
163 public static final String OPT_NUM_REGIONS_PER_SERVER = "num_regions_per_server";
164 protected static final String OPT_NUM_REGIONS_PER_SERVER_USAGE
165 = "Desired number of regions per region server. Defaults to 5.";
166 public static int DEFAULT_NUM_REGIONS_PER_SERVER = 5;
168 public static final String OPT_REGION_REPLICATION = "region_replication";
169 protected static final String OPT_REGION_REPLICATION_USAGE =
170 "Desired number of replicas per region";
172 public static final String OPT_REGION_REPLICA_ID = "region_replica_id";
173 protected static final String OPT_REGION_REPLICA_ID_USAGE =
174 "Region replica id to do the reads from";
176 public static final String OPT_MOB_THRESHOLD = "mob_threshold";
177 protected static final String OPT_MOB_THRESHOLD_USAGE =
178 "Desired cell size to exceed in bytes that will use the MOB write path";
180 protected static final long DEFAULT_START_KEY = 0;
182 /** This will be removed as we factor out the dependency on command line */
183 protected CommandLine cmd;
185 protected MultiThreadedWriter writerThreads = null;
186 protected MultiThreadedReader readerThreads = null;
187 protected MultiThreadedUpdater updaterThreads = null;
189 protected long startKey, endKey;
191 protected boolean isVerbose, isWrite, isRead, isUpdate;
192 protected boolean deferredLogFlush;
194 // Column family options
195 protected DataBlockEncoding dataBlockEncodingAlgo;
196 protected Compression.Algorithm compressAlgo;
197 protected BloomType bloomType;
198 private boolean inMemoryCF;
200 private User userOwner;
201 // Writer options
202 protected int numWriterThreads = DEFAULT_NUM_THREADS;
203 protected int minColsPerKey, maxColsPerKey;
204 protected int minColDataSize = DEFAULT_DATA_SIZE, maxColDataSize = DEFAULT_DATA_SIZE;
205 protected boolean isMultiPut;
207 // Updater options
208 protected int numUpdaterThreads = DEFAULT_NUM_THREADS;
209 protected int updatePercent;
210 protected boolean ignoreConflicts = false;
211 protected boolean isBatchUpdate;
213 // Reader options
214 private int numReaderThreads = DEFAULT_NUM_THREADS;
215 private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
216 private int multiGetBatchSize = MultiThreadedReader.DEFAULT_BATCH_SIZE;
217 private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
218 private int verifyPercent;
220 private int numTables = 1;
222 private String superUser;
224 private String userNames;
225 //This file is used to read authentication information in secure clusters.
226 private String authnFileName;
228 private int numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
229 private int regionReplication = -1; // not set
230 private int regionReplicaId = -1; // not set
232 private int mobThreshold = -1; // not set
234 // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad,
235 // console tool itself should only be used from console.
236 protected boolean isSkipInit = false;
237 protected boolean isInitOnly = false;
239 protected Cipher cipher = null;
241 protected String[] splitColonSeparated(String option,
242 int minNumCols, int maxNumCols) {
243 String optVal = cmd.getOptionValue(option);
244 String[] cols = optVal.split(COLON);
245 if (cols.length < minNumCols || cols.length > maxNumCols) {
246 throw new IllegalArgumentException("Expected at least "
247 + minNumCols + " columns but no more than " + maxNumCols +
248 " in the colon-separated value '" + optVal + "' of the " +
249 "-" + option + " option");
251 return cols;
254 protected int getNumThreads(String numThreadsStr) {
255 return parseInt(numThreadsStr, 1, Short.MAX_VALUE);
258 public byte[][] getColumnFamilies() {
259 return families;
263 * Apply column family options such as Bloom filters, compression, and data
264 * block encoding.
266 protected void applyColumnFamilyOptions(TableName tableName,
267 byte[][] columnFamilies) throws IOException {
268 try (Connection conn = ConnectionFactory.createConnection(conf);
269 Admin admin = conn.getAdmin()) {
270 TableDescriptor tableDesc = admin.getDescriptor(tableName);
271 LOG.info("Disabling table " + tableName);
272 admin.disableTable(tableName);
273 for (byte[] cf : columnFamilies) {
274 ColumnFamilyDescriptor columnDesc = tableDesc.getColumnFamily(cf);
275 boolean isNewCf = columnDesc == null;
276 ColumnFamilyDescriptorBuilder columnDescBuilder = isNewCf ?
277 ColumnFamilyDescriptorBuilder.newBuilder(cf) :
278 ColumnFamilyDescriptorBuilder.newBuilder(columnDesc);
279 if (bloomType != null) {
280 columnDescBuilder.setBloomFilterType(bloomType);
282 if (compressAlgo != null) {
283 columnDescBuilder.setCompressionType(compressAlgo);
285 if (dataBlockEncodingAlgo != null) {
286 columnDescBuilder.setDataBlockEncoding(dataBlockEncodingAlgo);
288 if (inMemoryCF) {
289 columnDescBuilder.setInMemory(inMemoryCF);
291 if (cipher != null) {
292 byte[] keyBytes = new byte[cipher.getKeyLength()];
293 Bytes.secureRandom(keyBytes);
294 columnDescBuilder.setEncryptionType(cipher.getName());
295 columnDescBuilder.setEncryptionKey(
296 EncryptionUtil.wrapKey(conf,
297 User.getCurrent().getShortName(),
298 new SecretKeySpec(keyBytes,
299 cipher.getName())));
301 if (mobThreshold >= 0) {
302 columnDescBuilder.setMobEnabled(true);
303 columnDescBuilder.setMobThreshold(mobThreshold);
306 if (isNewCf) {
307 admin.addColumnFamily(tableName, columnDescBuilder.build());
308 } else {
309 admin.modifyColumnFamily(tableName, columnDescBuilder.build());
312 LOG.info("Enabling table " + tableName);
313 admin.enableTable(tableName);
317 @Override
318 protected void addOptions() {
319 addOptNoArg("v", OPT_VERBOSE, "Will display a full readout of logs, including ZooKeeper");
320 addOptWithArg(OPT_ZK_QUORUM, "ZK quorum as comma-separated host names " +
321 "without port numbers");
322 addOptWithArg(OPT_ZK_PARENT_NODE, "name of parent znode in zookeeper");
323 addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
324 addOptWithArg(OPT_COLUMN_FAMILIES, "The name of the column families to use separated by comma");
325 addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
326 addOptWithArg(OPT_READ, OPT_USAGE_READ);
327 addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE);
328 addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading");
329 addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
330 addOptWithArg(OPT_BLOOM_PARAM, "the parameter of bloom filter type");
331 addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
332 addOptWithArg(HFileTestUtil.OPT_DATA_BLOCK_ENCODING, HFileTestUtil.OPT_DATA_BLOCK_ENCODING_USAGE);
333 addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors " +
334 "to tolerate before terminating all reader threads. The default is " +
335 MultiThreadedReader.DEFAULT_MAX_ERRORS + ".");
336 addOptWithArg(OPT_MULTIGET, "Whether to use multi-gets as opposed to " +
337 "separate gets for every column in a row");
338 addOptWithArg(OPT_KEY_WINDOW, "The 'key window' to maintain between " +
339 "reads and writes for concurrent write/read workload. The default " +
340 "is " + MultiThreadedReader.DEFAULT_KEY_WINDOW + ".");
342 addOptNoArg(OPT_MULTIPUT, "Whether to use multi-puts as opposed to " +
343 "separate puts for every column in a row");
344 addOptNoArg(OPT_BATCHUPDATE, "Whether to use batch as opposed to " +
345 "separate updates for every column in a row");
346 addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY);
347 addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE);
348 addOptWithArg(OPT_WRITER, OPT_WRITER_USAGE);
349 addOptWithArg(OPT_UPDATER, OPT_UPDATER_USAGE);
350 addOptWithArg(OPT_READER, OPT_READER_USAGE);
352 addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
353 addOptWithArg(OPT_START_KEY, "The first key to read/write " +
354 "(a 0-based index). The default value is " +
355 DEFAULT_START_KEY + ".");
356 addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table "
357 + "already exists");
359 addOptWithArg(NUM_TABLES,
360 "A positive integer number. When a number n is specified, load test "
361 + "tool will load n table parallely. -tn parameter value becomes "
362 + "table name prefix. Each table name is in format <tn>_1...<tn>_n");
364 addOptWithArg(OPT_ENCRYPTION, OPT_ENCRYPTION_USAGE);
365 addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE);
366 addOptWithArg(OPT_NUM_REGIONS_PER_SERVER, OPT_NUM_REGIONS_PER_SERVER_USAGE);
367 addOptWithArg(OPT_REGION_REPLICATION, OPT_REGION_REPLICATION_USAGE);
368 addOptWithArg(OPT_REGION_REPLICA_ID, OPT_REGION_REPLICA_ID_USAGE);
369 addOptWithArg(OPT_MOB_THRESHOLD, OPT_MOB_THRESHOLD_USAGE);
372 @Override
373 protected CommandLineParser newParser() {
374 // Commons-CLI lacks the capability to handle combinations of options, so we do it ourselves
375 // Validate in parse() to get helpful error messages instead of exploding in processOptions()
376 return new DefaultParser() {
377 @Override
378 public CommandLine parse(Options opts, String[] args, Properties props, boolean stop)
379 throws ParseException {
380 CommandLine cl = super.parse(opts, args, props, stop);
382 boolean isReadWriteUpdate = cmd.hasOption(OPT_READ)
383 || cmd.hasOption(OPT_WRITE)
384 || cmd.hasOption(OPT_UPDATE);
385 boolean isInitOnly = cmd.hasOption(OPT_INIT_ONLY);
387 if (!isInitOnly && !isReadWriteUpdate) {
388 throw new MissingOptionException("Must specify either -" + OPT_INIT_ONLY
389 + " or at least one of -" + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE);
392 if (isInitOnly && isReadWriteUpdate) {
393 throw new AlreadySelectedException(OPT_INIT_ONLY + " cannot be specified with any of -"
394 + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE);
397 if (isReadWriteUpdate && !cmd.hasOption(OPT_NUM_KEYS)) {
398 throw new MissingOptionException(OPT_NUM_KEYS + " must be specified in read/write mode.");
401 return cl;
406 @Override
407 protected void processOptions(CommandLine cmd) {
408 this.cmd = cmd;
410 tableName = TableName.valueOf(cmd.getOptionValue(OPT_TABLE_NAME,
411 DEFAULT_TABLE_NAME));
413 if (cmd.hasOption(OPT_COLUMN_FAMILIES)) {
414 String[] list = cmd.getOptionValue(OPT_COLUMN_FAMILIES).split(",");
415 families = new byte[list.length][];
416 for (int i = 0; i < list.length; i++) {
417 families[i] = Bytes.toBytes(list[i]);
419 } else {
420 families = HFileTestUtil.DEFAULT_COLUMN_FAMILIES;
423 isVerbose = cmd.hasOption(OPT_VERBOSE);
424 isWrite = cmd.hasOption(OPT_WRITE);
425 isRead = cmd.hasOption(OPT_READ);
426 isUpdate = cmd.hasOption(OPT_UPDATE);
427 isInitOnly = cmd.hasOption(OPT_INIT_ONLY);
428 deferredLogFlush = cmd.hasOption(OPT_DEFERRED_LOG_FLUSH);
430 if (!isInitOnly) {
431 startKey = parseLong(cmd.getOptionValue(OPT_START_KEY,
432 String.valueOf(DEFAULT_START_KEY)), 0, Long.MAX_VALUE);
433 long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1,
434 Long.MAX_VALUE - startKey);
435 endKey = startKey + numKeys;
436 isSkipInit = cmd.hasOption(OPT_SKIP_INIT);
437 System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]");
440 parseColumnFamilyOptions(cmd);
442 if (isWrite) {
443 String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3);
445 int colIndex = 0;
446 minColsPerKey = 1;
447 maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]);
448 int avgColDataSize =
449 parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE);
450 minColDataSize = avgColDataSize / 2;
451 maxColDataSize = avgColDataSize * 3 / 2;
453 if (colIndex < writeOpts.length) {
454 numWriterThreads = getNumThreads(writeOpts[colIndex++]);
457 isMultiPut = cmd.hasOption(OPT_MULTIPUT);
459 mobThreshold = -1;
460 if (cmd.hasOption(OPT_MOB_THRESHOLD)) {
461 mobThreshold = Integer.parseInt(cmd.getOptionValue(OPT_MOB_THRESHOLD));
464 System.out.println("Multi-puts: " + isMultiPut);
465 System.out.println("Columns per key: " + minColsPerKey + ".."
466 + maxColsPerKey);
467 System.out.println("Data size per column: " + minColDataSize + ".."
468 + maxColDataSize);
471 if (isUpdate) {
472 String[] mutateOpts = splitColonSeparated(OPT_UPDATE, 1, 3);
473 int colIndex = 0;
474 updatePercent = parseInt(mutateOpts[colIndex++], 0, 100);
475 if (colIndex < mutateOpts.length) {
476 numUpdaterThreads = getNumThreads(mutateOpts[colIndex++]);
478 if (colIndex < mutateOpts.length) {
479 ignoreConflicts = parseInt(mutateOpts[colIndex++], 0, 1) == 1;
482 isBatchUpdate = cmd.hasOption(OPT_BATCHUPDATE);
484 System.out.println("Batch updates: " + isBatchUpdate);
485 System.out.println("Percent of keys to update: " + updatePercent);
486 System.out.println("Updater threads: " + numUpdaterThreads);
487 System.out.println("Ignore nonce conflicts: " + ignoreConflicts);
490 if (isRead) {
491 String[] readOpts = splitColonSeparated(OPT_READ, 1, 2);
492 int colIndex = 0;
493 verifyPercent = parseInt(readOpts[colIndex++], 0, 100);
494 if (colIndex < readOpts.length) {
495 numReaderThreads = getNumThreads(readOpts[colIndex++]);
498 if (cmd.hasOption(OPT_MAX_READ_ERRORS)) {
499 maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS),
500 0, Integer.MAX_VALUE);
503 if (cmd.hasOption(OPT_KEY_WINDOW)) {
504 keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW),
505 0, Integer.MAX_VALUE);
508 if (cmd.hasOption(OPT_MULTIGET)) {
509 multiGetBatchSize = parseInt(cmd.getOptionValue(OPT_MULTIGET),
510 0, Integer.MAX_VALUE);
513 System.out.println("Multi-gets (value of 1 means no multigets): " + multiGetBatchSize);
514 System.out.println("Percent of keys to verify: " + verifyPercent);
515 System.out.println("Reader threads: " + numReaderThreads);
518 numTables = 1;
519 if (cmd.hasOption(NUM_TABLES)) {
520 numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE);
523 numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
524 if (cmd.hasOption(OPT_NUM_REGIONS_PER_SERVER)) {
525 numRegionsPerServer = Integer.parseInt(cmd.getOptionValue(OPT_NUM_REGIONS_PER_SERVER));
528 regionReplication = 1;
529 if (cmd.hasOption(OPT_REGION_REPLICATION)) {
530 regionReplication = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICATION));
533 regionReplicaId = -1;
534 if (cmd.hasOption(OPT_REGION_REPLICA_ID)) {
535 regionReplicaId = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICA_ID));
539 private void parseColumnFamilyOptions(CommandLine cmd) {
540 String dataBlockEncodingStr = cmd.getOptionValue(HFileTestUtil.OPT_DATA_BLOCK_ENCODING);
541 dataBlockEncodingAlgo = dataBlockEncodingStr == null ? null :
542 DataBlockEncoding.valueOf(dataBlockEncodingStr);
544 String compressStr = cmd.getOptionValue(OPT_COMPRESSION);
545 compressAlgo = compressStr == null ? Compression.Algorithm.NONE :
546 Compression.Algorithm.valueOf(compressStr);
548 String bloomStr = cmd.getOptionValue(OPT_BLOOM);
549 bloomType = bloomStr == null ? BloomType.ROW :
550 BloomType.valueOf(bloomStr);
552 if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
553 if (!cmd.hasOption(OPT_BLOOM_PARAM)) {
554 LOG.error("the parameter of bloom filter {} is not specified", bloomType.name());
555 } else {
556 conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, cmd.getOptionValue(OPT_BLOOM_PARAM));
560 inMemoryCF = cmd.hasOption(OPT_INMEMORY);
561 if (cmd.hasOption(OPT_ENCRYPTION)) {
562 cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION));
567 public void initTestTable() throws IOException {
568 Durability durability = Durability.USE_DEFAULT;
569 if (deferredLogFlush) {
570 durability = Durability.ASYNC_WAL;
573 HBaseTestingUtil.createPreSplitLoadTestTable(conf, tableName,
574 getColumnFamilies(), compressAlgo, dataBlockEncodingAlgo, numRegionsPerServer,
575 regionReplication, durability);
576 applyColumnFamilyOptions(tableName, getColumnFamilies());
579 @Override
580 protected int doWork() throws IOException {
581 if (!isVerbose) {
582 Log4jUtils.setLogLevel(ZooKeeper.class.getName(), "WARN");
584 if (numTables > 1) {
585 return parallelLoadTables();
586 } else {
587 return loadTable();
591 protected int loadTable() throws IOException {
592 if (cmd.hasOption(OPT_ZK_QUORUM)) {
593 conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
595 if (cmd.hasOption(OPT_ZK_PARENT_NODE)) {
596 conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, cmd.getOptionValue(OPT_ZK_PARENT_NODE));
599 if (isInitOnly) {
600 LOG.info("Initializing only; no reads or writes");
601 initTestTable();
602 return 0;
605 if (!isSkipInit) {
606 initTestTable();
608 LoadTestDataGenerator dataGen = null;
609 if (cmd.hasOption(OPT_GENERATOR)) {
610 String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON);
611 dataGen = getLoadGeneratorInstance(clazzAndArgs[0]);
612 String[] args;
613 if (dataGen instanceof LoadTestDataGeneratorWithACL) {
614 LOG.info("Using LoadTestDataGeneratorWithACL");
615 if (User.isHBaseSecurityEnabled(conf)) {
616 LOG.info("Security is enabled");
617 authnFileName = clazzAndArgs[1];
618 superUser = clazzAndArgs[2];
619 userNames = clazzAndArgs[3];
620 args = Arrays.copyOfRange(clazzAndArgs, 2, clazzAndArgs.length);
621 Properties authConfig = new Properties();
622 authConfig.load(this.getClass().getClassLoader().getResourceAsStream(authnFileName));
623 try {
624 addAuthInfoToConf(authConfig, conf, superUser, userNames);
625 } catch (IOException exp) {
626 LOG.error(exp.toString(), exp);
627 return EXIT_FAILURE;
629 userOwner = User.create(HBaseKerberosUtils.loginAndReturnUGI(conf, superUser));
630 } else {
631 superUser = clazzAndArgs[1];
632 userNames = clazzAndArgs[2];
633 args = Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
634 userOwner = User.createUserForTesting(conf, superUser, new String[0]);
636 } else {
637 args = clazzAndArgs.length == 1 ? new String[0] : Arrays.copyOfRange(clazzAndArgs, 1,
638 clazzAndArgs.length);
640 dataGen.initialize(args);
641 } else {
642 // Default DataGenerator is MultiThreadedAction.DefaultDataGenerator
643 dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize,
644 minColsPerKey, maxColsPerKey, families);
647 if (userOwner != null) {
648 LOG.info("Granting permissions for user " + userOwner.getShortName());
649 Permission.Action[] actions = {
650 Permission.Action.ADMIN, Permission.Action.CREATE,
651 Permission.Action.READ, Permission.Action.WRITE };
652 try {
653 AccessControlClient.grant(ConnectionFactory.createConnection(conf),
654 tableName, userOwner.getShortName(), null, null, actions);
655 } catch (Throwable e) {
656 LOG.error(HBaseMarkers.FATAL, "Error in granting permission for the user " +
657 userOwner.getShortName(), e);
658 return EXIT_FAILURE;
662 if (userNames != null) {
663 // This will be comma separated list of expressions.
664 String users[] = userNames.split(",");
665 User user = null;
666 for (String userStr : users) {
667 if (User.isHBaseSecurityEnabled(conf)) {
668 user = User.create(HBaseKerberosUtils.loginAndReturnUGI(conf, userStr));
669 } else {
670 user = User.createUserForTesting(conf, userStr, new String[0]);
675 if (isWrite) {
676 if (userOwner != null) {
677 writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner);
678 } else {
679 String writerClass = null;
680 if (cmd.hasOption(OPT_WRITER)) {
681 writerClass = cmd.getOptionValue(OPT_WRITER);
682 } else {
683 writerClass = MultiThreadedWriter.class.getCanonicalName();
686 writerThreads = getMultiThreadedWriterInstance(writerClass, dataGen);
688 writerThreads.setMultiPut(isMultiPut);
691 if (isUpdate) {
692 if (userOwner != null) {
693 updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent,
694 userOwner, userNames);
695 } else {
696 String updaterClass = null;
697 if (cmd.hasOption(OPT_UPDATER)) {
698 updaterClass = cmd.getOptionValue(OPT_UPDATER);
699 } else {
700 updaterClass = MultiThreadedUpdater.class.getCanonicalName();
702 updaterThreads = getMultiThreadedUpdaterInstance(updaterClass, dataGen);
704 updaterThreads.setBatchUpdate(isBatchUpdate);
705 updaterThreads.setIgnoreNonceConflicts(ignoreConflicts);
708 if (isRead) {
709 if (userOwner != null) {
710 readerThreads = new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent,
711 userNames);
712 } else {
713 String readerClass = null;
714 if (cmd.hasOption(OPT_READER)) {
715 readerClass = cmd.getOptionValue(OPT_READER);
716 } else {
717 readerClass = MultiThreadedReader.class.getCanonicalName();
719 readerThreads = getMultiThreadedReaderInstance(readerClass, dataGen);
721 readerThreads.setMaxErrors(maxReadErrors);
722 readerThreads.setKeyWindow(keyWindow);
723 readerThreads.setMultiGetBatchSize(multiGetBatchSize);
724 readerThreads.setRegionReplicaId(regionReplicaId);
727 if (isUpdate && isWrite) {
728 LOG.info("Concurrent write/update workload: making updaters aware of the " +
729 "write point");
730 updaterThreads.linkToWriter(writerThreads);
733 if (isRead && (isUpdate || isWrite)) {
734 LOG.info("Concurrent write/read workload: making readers aware of the " +
735 "write point");
736 readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads);
739 if (isWrite) {
740 System.out.println("Starting to write data...");
741 writerThreads.start(startKey, endKey, numWriterThreads);
744 if (isUpdate) {
745 LOG.info("Starting to mutate data...");
746 System.out.println("Starting to mutate data...");
747 // TODO : currently append and increment operations not tested with tags
748 // Will update this after it is done
749 updaterThreads.start(startKey, endKey, numUpdaterThreads);
752 if (isRead) {
753 System.out.println("Starting to read data...");
754 readerThreads.start(startKey, endKey, numReaderThreads);
757 if (isWrite) {
758 writerThreads.waitForFinish();
761 if (isUpdate) {
762 updaterThreads.waitForFinish();
765 if (isRead) {
766 readerThreads.waitForFinish();
769 boolean success = true;
770 if (isWrite) {
771 success = success && writerThreads.getNumWriteFailures() == 0;
773 if (isUpdate) {
774 success = success && updaterThreads.getNumWriteFailures() == 0;
776 if (isRead) {
777 success = success && readerThreads.getNumReadErrors() == 0
778 && readerThreads.getNumReadFailures() == 0;
780 return success ? EXIT_SUCCESS : EXIT_FAILURE;
783 private LoadTestDataGenerator getLoadGeneratorInstance(String clazzName) throws IOException {
784 try {
785 Class<?> clazz = Class.forName(clazzName);
786 Constructor<?> constructor = clazz.getConstructor(int.class, int.class, int.class, int.class,
787 byte[][].class);
788 return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize,
789 minColsPerKey, maxColsPerKey, families);
790 } catch (Exception e) {
791 throw new IOException(e);
795 private MultiThreadedWriter getMultiThreadedWriterInstance(String clazzName
796 , LoadTestDataGenerator dataGen) throws IOException {
797 try {
798 Class<?> clazz = Class.forName(clazzName);
799 Constructor<?> constructor = clazz.getConstructor(
800 LoadTestDataGenerator.class, Configuration.class, TableName.class);
801 return (MultiThreadedWriter) constructor.newInstance(dataGen, conf, tableName);
802 } catch (Exception e) {
803 throw new IOException(e);
807 private MultiThreadedUpdater getMultiThreadedUpdaterInstance(String clazzName
808 , LoadTestDataGenerator dataGen) throws IOException {
809 try {
810 Class<?> clazz = Class.forName(clazzName);
811 Constructor<?> constructor = clazz.getConstructor(
812 LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class);
813 return (MultiThreadedUpdater) constructor.newInstance(
814 dataGen, conf, tableName, updatePercent);
815 } catch (Exception e) {
816 throw new IOException(e);
820 private MultiThreadedReader getMultiThreadedReaderInstance(String clazzName
821 , LoadTestDataGenerator dataGen) throws IOException {
822 try {
823 Class<?> clazz = Class.forName(clazzName);
824 Constructor<?> constructor = clazz.getConstructor(
825 LoadTestDataGenerator.class, Configuration.class, TableName.class, double.class);
826 return (MultiThreadedReader) constructor.newInstance(dataGen, conf, tableName, verifyPercent);
827 } catch (Exception e) {
828 throw new IOException(e);
832 public static void main(String[] args) {
833 new LoadTestTool().doStaticMain(args);
837 * When NUM_TABLES is specified, the function starts multiple worker threads
838 * which individually start a LoadTestTool instance to load a table. Each
839 * table name is in format &lt;tn>_&lt;index>. For example, "-tn test -num_tables 2"
840 * , table names will be "test_1", "test_2"
842 * @throws IOException if one of the load tasks is unable to complete
844 private int parallelLoadTables()
845 throws IOException {
846 // create new command args
847 String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME);
848 String[] newArgs = null;
849 if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) {
850 newArgs = new String[cmdLineArgs.length + 2];
851 newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME;
852 newArgs[1] = LoadTestTool.DEFAULT_TABLE_NAME;
853 System.arraycopy(cmdLineArgs, 0, newArgs, 2, cmdLineArgs.length);
854 } else {
855 newArgs = cmdLineArgs;
858 int tableNameValueIndex = -1;
859 for (int j = 0; j < newArgs.length; j++) {
860 if (newArgs[j].endsWith(OPT_TABLE_NAME)) {
861 tableNameValueIndex = j + 1;
862 } else if (newArgs[j].endsWith(NUM_TABLES)) {
863 // change NUM_TABLES to 1 so that each worker loads one table
864 newArgs[j + 1] = "1";
868 // starting to load multiple tables
869 List<WorkerThread> workers = new ArrayList<>();
870 for (int i = 0; i < numTables; i++) {
871 String[] workerArgs = newArgs.clone();
872 workerArgs[tableNameValueIndex] = tableName + "_" + (i+1);
873 WorkerThread worker = new WorkerThread(i, workerArgs);
874 workers.add(worker);
875 LOG.info(worker + " starting");
876 worker.start();
879 // wait for all workers finish
880 LOG.info("Waiting for worker threads to finish");
881 for (WorkerThread t : workers) {
882 try {
883 t.join();
884 } catch (InterruptedException ie) {
885 IOException iie = new InterruptedIOException();
886 iie.initCause(ie);
887 throw iie;
889 checkForErrors();
892 return EXIT_SUCCESS;
895 // If an exception is thrown by one of worker threads, it will be
896 // stored here.
897 protected AtomicReference<Throwable> thrown = new AtomicReference<>();
899 private void workerThreadError(Throwable t) {
900 thrown.compareAndSet(null, t);
904 * Check for errors in the writer threads. If any is found, rethrow it.
906 private void checkForErrors() throws IOException {
907 Throwable thrown = this.thrown.get();
908 if (thrown == null) return;
909 if (thrown instanceof IOException) {
910 throw (IOException) thrown;
911 } else {
912 throw new RuntimeException(thrown);
916 class WorkerThread extends Thread {
917 private String[] workerArgs;
919 WorkerThread(int i, String[] args) {
920 super("WorkerThread-" + i);
921 workerArgs = args;
924 @Override
925 public void run() {
926 try {
927 int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs);
928 if (ret != 0) {
929 throw new RuntimeException("LoadTestTool exit with non-zero return code.");
931 } catch (Exception ex) {
932 LOG.error("Error in worker thread", ex);
933 workerThreadError(ex);
938 private void addAuthInfoToConf(Properties authConfig, Configuration conf, String owner,
939 String userList) throws IOException {
940 List<String> users = new ArrayList<>(Arrays.asList(userList.split(",")));
941 users.add(owner);
942 for (String user : users) {
943 String keyTabFileConfKey = "hbase." + user + ".keytab.file";
944 String principalConfKey = "hbase." + user + ".kerberos.principal";
945 if (!authConfig.containsKey(keyTabFileConfKey) || !authConfig.containsKey(principalConfKey)) {
946 throw new IOException("Authentication configs missing for user : " + user);
949 for (String key : authConfig.stringPropertyNames()) {
950 conf.set(key, authConfig.getProperty(key));
952 LOG.debug("Added authentication properties to config successfully.");