HBASE-26787 TestRegionReplicaReplicationError should inject error in replicateToRepli...
hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import javax.crypto.spec.SecretKeySpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.logging.Log4jUtils;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessControlClient;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithACL;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.cli.AlreadySelectedException;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.MissingOptionException;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;

/**
 * A command-line utility that reads, writes, and verifies data. Unlike
 * {@link org.apache.hadoop.hbase.PerformanceEvaluation}, this tool validates the data written, and
 * supports simultaneously writing and reading the same set of keys.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class LoadTestTool extends AbstractHBaseTool {

  private static final Logger LOG = LoggerFactory.getLogger(LoadTestTool.class);
  private static final String COLON = ":";

  /** Table name for the test */
  private TableName tableName;

  /** Column families for the test */
  private byte[][] families;

  /** Table name to use if not overridden on the command line */
  protected static final String DEFAULT_TABLE_NAME = "cluster_test";

  /** The default data size if not specified */
  protected static final int DEFAULT_DATA_SIZE = 64;

  /** The number of reader/writer threads if not specified */
  protected static final int DEFAULT_NUM_THREADS = 20;

  /** Usage string for the load option */
  protected static final String OPT_USAGE_LOAD =
    "<avg_cols_per_key>:<avg_data_size>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";

  /** Usage string for the read option */
  protected static final String OPT_USAGE_READ =
    "<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";

  /** Usage string for the update option */
  protected static final String OPT_USAGE_UPDATE = "<update_percent>[:<#threads="
    + DEFAULT_NUM_THREADS + ">][:<#whether to ignore nonce collisions=0>]";

  protected static final String OPT_USAGE_BLOOM =
    "Bloom filter type, one of " + Arrays.toString(BloomType.values());

  protected static final String OPT_USAGE_COMPRESSION =
    "Compression type, one of " + Arrays.toString(Compression.Algorithm.values());

  protected static final String OPT_VERBOSE = "verbose";

  public static final String OPT_BLOOM = "bloom";
  public static final String OPT_BLOOM_PARAM = "bloom_param";
  public static final String OPT_COMPRESSION = "compression";
  public static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush";
  public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush.";

  public static final String OPT_INMEMORY = "in_memory";
  public static final String OPT_USAGE_IN_MEMORY = "Tries to keep the HFiles of the CF "
    + "in memory as much as possible. Reads are not guaranteed to always be served from memory";

  public static final String OPT_GENERATOR = "generator";
  public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool."
    + " Any args for this class can be passed as colon separated after class name";

  public static final String OPT_WRITER = "writer";
  public static final String OPT_WRITER_USAGE = "The class for executing the write requests";

  public static final String OPT_UPDATER = "updater";
  public static final String OPT_UPDATER_USAGE = "The class for executing the update requests";

  public static final String OPT_READER = "reader";
  public static final String OPT_READER_USAGE = "The class for executing the read requests";

  protected static final String OPT_KEY_WINDOW = "key_window";
  protected static final String OPT_WRITE = "write";
  protected static final String OPT_MAX_READ_ERRORS = "max_read_errors";
  public static final String OPT_MULTIPUT = "multiput";
  public static final String OPT_MULTIGET = "multiget_batchsize";
  protected static final String OPT_NUM_KEYS = "num_keys";
  protected static final String OPT_READ = "read";
  protected static final String OPT_START_KEY = "start_key";
  public static final String OPT_TABLE_NAME = "tn";
  public static final String OPT_COLUMN_FAMILIES = "families";
  protected static final String OPT_ZK_QUORUM = "zk";
  protected static final String OPT_ZK_PARENT_NODE = "zk_root";
  protected static final String OPT_SKIP_INIT = "skip_init";
  protected static final String OPT_INIT_ONLY = "init_only";
  protected static final String NUM_TABLES = "num_tables";
  protected static final String OPT_BATCHUPDATE = "batchupdate";
  protected static final String OPT_UPDATE = "update";

  public static final String OPT_ENCRYPTION = "encryption";
  protected static final String OPT_ENCRYPTION_USAGE =
    "Enables transparent encryption on the test table, one of "
      + Arrays.toString(Encryption.getSupportedCiphers());

  public static final String OPT_NUM_REGIONS_PER_SERVER = "num_regions_per_server";
  protected static final String OPT_NUM_REGIONS_PER_SERVER_USAGE =
    "Desired number of regions per region server. Defaults to 5.";
  public static int DEFAULT_NUM_REGIONS_PER_SERVER = 5;

  public static final String OPT_REGION_REPLICATION = "region_replication";
  protected static final String OPT_REGION_REPLICATION_USAGE =
    "Desired number of replicas per region";

  public static final String OPT_REGION_REPLICA_ID = "region_replica_id";
  protected static final String OPT_REGION_REPLICA_ID_USAGE =
    "Region replica id to do the reads from";

  public static final String OPT_MOB_THRESHOLD = "mob_threshold";
  protected static final String OPT_MOB_THRESHOLD_USAGE =
    "Desired cell size to exceed in bytes that will use the MOB write path";

  protected static final long DEFAULT_START_KEY = 0;

  /** This will be removed as we factor out the dependency on command line */
  protected CommandLine cmd;

  protected MultiThreadedWriter writerThreads = null;
  protected MultiThreadedReader readerThreads = null;
  protected MultiThreadedUpdater updaterThreads = null;

  protected long startKey, endKey;

  protected boolean isVerbose, isWrite, isRead, isUpdate;
  protected boolean deferredLogFlush;

  // Column family options
  protected DataBlockEncoding dataBlockEncodingAlgo;
  protected Compression.Algorithm compressAlgo;
  protected BloomType bloomType;
  private boolean inMemoryCF;

  private User userOwner;
  // Writer options
  protected int numWriterThreads = DEFAULT_NUM_THREADS;
  protected int minColsPerKey, maxColsPerKey;
  protected int minColDataSize = DEFAULT_DATA_SIZE, maxColDataSize = DEFAULT_DATA_SIZE;
  protected boolean isMultiPut;

  // Updater options
  protected int numUpdaterThreads = DEFAULT_NUM_THREADS;
  protected int updatePercent;
  protected boolean ignoreConflicts = false;
  protected boolean isBatchUpdate;

  // Reader options
  private int numReaderThreads = DEFAULT_NUM_THREADS;
  private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
  private int multiGetBatchSize = MultiThreadedReader.DEFAULT_BATCH_SIZE;
  private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
  private int verifyPercent;

  private int numTables = 1;

  private String superUser;

  private String userNames;
  // This file is used to read authentication information in secure clusters.
  private String authnFileName;

  private int numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
  private int regionReplication = -1; // not set
  private int regionReplicaId = -1; // not set

  private int mobThreshold = -1; // not set

  // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad,
  // console tool itself should only be used from console.
  protected boolean isSkipInit = false;
  protected boolean isInitOnly = false;

  protected Cipher cipher = null;
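
  /** Splits a colon-separated option value and validates the field count. */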
  protected String[] splitColonSeparated(String option, int minNumCols, int maxNumCols) {
    String optVal = cmd.getOptionValue(option);
    String[] cols = optVal.split(COLON);
    if (cols.length < minNumCols || cols.length > maxNumCols) {
      throw new IllegalArgumentException("Expected at least " + minNumCols
        + " columns but no more than " + maxNumCols + " in the colon-separated value '" + optVal
        + "' of the -" + option + " option");
    }
    return cols;
  }

  protected int getNumThreads(String numThreadsStr) {
    return parseInt(numThreadsStr, 1, Short.MAX_VALUE);
  }

  public byte[][] getColumnFamilies() {
    return families;
  }

  /**
   * Apply column family options such as Bloom filters, compression, and data block encoding.
   */
  protected void applyColumnFamilyOptions(TableName tableName, byte[][] columnFamilies)
    throws IOException {
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Admin admin = conn.getAdmin()) {
      TableDescriptor tableDesc = admin.getDescriptor(tableName);
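      // The table must be offline while column families are added or modified;
      // it is re-enabled at the end of this method.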
      LOG.info("Disabling table " + tableName);
      admin.disableTable(tableName);
      for (byte[] cf : columnFamilies) {
        ColumnFamilyDescriptor columnDesc = tableDesc.getColumnFamily(cf);
        boolean isNewCf = columnDesc == null;
        ColumnFamilyDescriptorBuilder columnDescBuilder = isNewCf
          ? ColumnFamilyDescriptorBuilder.newBuilder(cf)
          : ColumnFamilyDescriptorBuilder.newBuilder(columnDesc);
        if (bloomType != null) {
          columnDescBuilder.setBloomFilterType(bloomType);
        }
        if (compressAlgo != null) {
          columnDescBuilder.setCompressionType(compressAlgo);
        }
        if (dataBlockEncodingAlgo != null) {
          columnDescBuilder.setDataBlockEncoding(dataBlockEncodingAlgo);
        }
        if (inMemoryCF) {
          columnDescBuilder.setInMemory(inMemoryCF);
        }
        if (cipher != null) {
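          // Provision a fresh random data key for this column family; it is
          // stored in the column descriptor wrapped via EncryptionUtil.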
          byte[] keyBytes = new byte[cipher.getKeyLength()];
          new SecureRandom().nextBytes(keyBytes);
          columnDescBuilder.setEncryptionType(cipher.getName());
          columnDescBuilder.setEncryptionKey(EncryptionUtil.wrapKey(conf,
            User.getCurrent().getShortName(), new SecretKeySpec(keyBytes, cipher.getName())));
        }
        if (mobThreshold >= 0) {
          columnDescBuilder.setMobEnabled(true);
          columnDescBuilder.setMobThreshold(mobThreshold);
        }

        if (isNewCf) {
          admin.addColumnFamily(tableName, columnDescBuilder.build());
        } else {
          admin.modifyColumnFamily(tableName, columnDescBuilder.build());
        }
      }
      LOG.info("Enabling table " + tableName);
      admin.enableTable(tableName);
    }
  }

  @Override
  protected void addOptions() {
    addOptNoArg("v", OPT_VERBOSE, "Will display a full readout of logs, including ZooKeeper");
    addOptWithArg(OPT_ZK_QUORUM,
      "ZK quorum as comma-separated host names without port numbers");
    addOptWithArg(OPT_ZK_PARENT_NODE, "name of parent znode in zookeeper");
    addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
    addOptWithArg(OPT_COLUMN_FAMILIES,
      "The names of the column families to use, separated by commas");
    addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
    addOptWithArg(OPT_READ, OPT_USAGE_READ);
    addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE);
    addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading");
    addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
    addOptWithArg(OPT_BLOOM_PARAM, "the parameter of bloom filter type");
    addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
    addOptWithArg(HFileTestUtil.OPT_DATA_BLOCK_ENCODING,
      HFileTestUtil.OPT_DATA_BLOCK_ENCODING_USAGE);
    addOptWithArg(OPT_MAX_READ_ERRORS,
      "The maximum number of read errors to tolerate before terminating all reader threads. "
        + "The default is " + MultiThreadedReader.DEFAULT_MAX_ERRORS + ".");
    addOptWithArg(OPT_MULTIGET,
      "Whether to use multi-gets as opposed to separate gets for every column in a row");
    addOptWithArg(OPT_KEY_WINDOW,
      "The 'key window' to maintain between reads and writes for concurrent write/read "
        + "workload. The default is " + MultiThreadedReader.DEFAULT_KEY_WINDOW + ".");

    addOptNoArg(OPT_MULTIPUT,
      "Whether to use multi-puts as opposed to separate puts for every column in a row");
    addOptNoArg(OPT_BATCHUPDATE,
      "Whether to use batch as opposed to separate updates for every column in a row");
    addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY);
    addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE);
    addOptWithArg(OPT_WRITER, OPT_WRITER_USAGE);
    addOptWithArg(OPT_UPDATER, OPT_UPDATER_USAGE);
    addOptWithArg(OPT_READER, OPT_READER_USAGE);

    addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
    addOptWithArg(OPT_START_KEY,
      "The first key to read/write (a 0-based index). The default value is " + DEFAULT_START_KEY
        + ".");
    addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table already exists");

    addOptWithArg(NUM_TABLES,
      "A positive integer number. When a number n is specified, load test "
        + "tool will load n tables in parallel. -tn parameter value becomes "
        + "table name prefix. Each table name is in format <tn>_1...<tn>_n");

    addOptWithArg(OPT_ENCRYPTION, OPT_ENCRYPTION_USAGE);
    addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE);
    addOptWithArg(OPT_NUM_REGIONS_PER_SERVER, OPT_NUM_REGIONS_PER_SERVER_USAGE);
    addOptWithArg(OPT_REGION_REPLICATION, OPT_REGION_REPLICATION_USAGE);
    addOptWithArg(OPT_REGION_REPLICA_ID, OPT_REGION_REPLICA_ID_USAGE);
    addOptWithArg(OPT_MOB_THRESHOLD, OPT_MOB_THRESHOLD_USAGE);
  }

  @Override
  protected CommandLineParser newParser() {
    // Commons-CLI lacks the capability to handle combinations of options, so we do it ourselves
    // Validate in parse() to get helpful error messages instead of exploding in processOptions()
    return new DefaultParser() {
      @Override
      public CommandLine parse(Options opts, String[] args, Properties props, boolean stop)
        throws ParseException {
        CommandLine cl = super.parse(opts, args, props, stop);
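
        // Validate against the just-parsed command line (cl); the LoadTestTool.cmd
        // field is only set later, in processOptions(...).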
        boolean isReadWriteUpdate =
          cl.hasOption(OPT_READ) || cl.hasOption(OPT_WRITE) || cl.hasOption(OPT_UPDATE);
        boolean isInitOnly = cl.hasOption(OPT_INIT_ONLY);

        if (!isInitOnly && !isReadWriteUpdate) {
          throw new MissingOptionException("Must specify either -" + OPT_INIT_ONLY
            + " or at least one of -" + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE);
        }

        if (isInitOnly && isReadWriteUpdate) {
          throw new AlreadySelectedException(OPT_INIT_ONLY + " cannot be specified with any of -"
            + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE);
        }

        if (isReadWriteUpdate && !cl.hasOption(OPT_NUM_KEYS)) {
          throw new MissingOptionException(OPT_NUM_KEYS + " must be specified in read/write mode.");
        }

        return cl;
      }
    };
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    this.cmd = cmd;

    tableName = TableName.valueOf(cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME));

    if (cmd.hasOption(OPT_COLUMN_FAMILIES)) {
      String[] list = cmd.getOptionValue(OPT_COLUMN_FAMILIES).split(",");
      families = new byte[list.length][];
      for (int i = 0; i < list.length; i++) {
        families[i] = Bytes.toBytes(list[i]);
      }
    } else {
      families = HFileTestUtil.DEFAULT_COLUMN_FAMILIES;
    }

    isVerbose = cmd.hasOption(OPT_VERBOSE);
    isWrite = cmd.hasOption(OPT_WRITE);
    isRead = cmd.hasOption(OPT_READ);
    isUpdate = cmd.hasOption(OPT_UPDATE);
    isInitOnly = cmd.hasOption(OPT_INIT_ONLY);
    deferredLogFlush = cmd.hasOption(OPT_DEFERRED_LOG_FLUSH);

    if (!isInitOnly) {
      startKey = parseLong(cmd.getOptionValue(OPT_START_KEY, String.valueOf(DEFAULT_START_KEY)), 0,
        Long.MAX_VALUE);
      long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1, Long.MAX_VALUE - startKey);
      endKey = startKey + numKeys;
      isSkipInit = cmd.hasOption(OPT_SKIP_INIT);
      System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]");
    }

    parseColumnFamilyOptions(cmd);

    if (isWrite) {
      String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3);

      int colIndex = 0;
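      // Turn the requested averages into [min, max] ranges for the data generator.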
      minColsPerKey = 1;
      maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]);
      int avgColDataSize = parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE);
      minColDataSize = avgColDataSize / 2;
      maxColDataSize = avgColDataSize * 3 / 2;

      if (colIndex < writeOpts.length) {
        numWriterThreads = getNumThreads(writeOpts[colIndex++]);
      }

      isMultiPut = cmd.hasOption(OPT_MULTIPUT);

      mobThreshold = -1;
      if (cmd.hasOption(OPT_MOB_THRESHOLD)) {
        mobThreshold = Integer.parseInt(cmd.getOptionValue(OPT_MOB_THRESHOLD));
      }

      System.out.println("Multi-puts: " + isMultiPut);
      System.out.println("Columns per key: " + minColsPerKey + ".." + maxColsPerKey);
      System.out.println("Data size per column: " + minColDataSize + ".." + maxColDataSize);
    }

    if (isUpdate) {
      String[] mutateOpts = splitColonSeparated(OPT_UPDATE, 1, 3);
      int colIndex = 0;
      updatePercent = parseInt(mutateOpts[colIndex++], 0, 100);
      if (colIndex < mutateOpts.length) {
        numUpdaterThreads = getNumThreads(mutateOpts[colIndex++]);
      }

      if (colIndex < mutateOpts.length) {
        ignoreConflicts = parseInt(mutateOpts[colIndex++], 0, 1) == 1;
      }

      isBatchUpdate = cmd.hasOption(OPT_BATCHUPDATE);

      System.out.println("Batch updates: " + isBatchUpdate);
      System.out.println("Percent of keys to update: " + updatePercent);
      System.out.println("Updater threads: " + numUpdaterThreads);
      System.out.println("Ignore nonce conflicts: " + ignoreConflicts);
    }

    if (isRead) {
      String[] readOpts = splitColonSeparated(OPT_READ, 1, 2);
      int colIndex = 0;
      verifyPercent = parseInt(readOpts[colIndex++], 0, 100);
      if (colIndex < readOpts.length) {
        numReaderThreads = getNumThreads(readOpts[colIndex++]);
      }

      if (cmd.hasOption(OPT_MAX_READ_ERRORS)) {
        maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS), 0, Integer.MAX_VALUE);
      }

      if (cmd.hasOption(OPT_KEY_WINDOW)) {
        keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW), 0, Integer.MAX_VALUE);
      }

      if (cmd.hasOption(OPT_MULTIGET)) {
        multiGetBatchSize = parseInt(cmd.getOptionValue(OPT_MULTIGET), 0, Integer.MAX_VALUE);
      }

      System.out.println("Multi-gets (value of 1 means no multigets): " + multiGetBatchSize);
      System.out.println("Percent of keys to verify: " + verifyPercent);
      System.out.println("Reader threads: " + numReaderThreads);
    }

    numTables = 1;
    if (cmd.hasOption(NUM_TABLES)) {
      numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE);
    }

    numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
    if (cmd.hasOption(OPT_NUM_REGIONS_PER_SERVER)) {
      numRegionsPerServer = Integer.parseInt(cmd.getOptionValue(OPT_NUM_REGIONS_PER_SERVER));
    }

    regionReplication = 1;
    if (cmd.hasOption(OPT_REGION_REPLICATION)) {
      regionReplication = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICATION));
    }

    regionReplicaId = -1;
    if (cmd.hasOption(OPT_REGION_REPLICA_ID)) {
      regionReplicaId = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICA_ID));
    }
  }

  private void parseColumnFamilyOptions(CommandLine cmd) {
    String dataBlockEncodingStr = cmd.getOptionValue(HFileTestUtil.OPT_DATA_BLOCK_ENCODING);
    dataBlockEncodingAlgo =
      dataBlockEncodingStr == null ? null : DataBlockEncoding.valueOf(dataBlockEncodingStr);

    String compressStr = cmd.getOptionValue(OPT_COMPRESSION);
    compressAlgo = compressStr == null
      ? Compression.Algorithm.NONE
      : Compression.Algorithm.valueOf(compressStr);

    String bloomStr = cmd.getOptionValue(OPT_BLOOM);
    bloomType = bloomStr == null ? BloomType.ROW : BloomType.valueOf(bloomStr);

    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
      if (!cmd.hasOption(OPT_BLOOM_PARAM)) {
        LOG.error("the parameter of bloom filter {} is not specified", bloomType.name());
      } else {
        conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, cmd.getOptionValue(OPT_BLOOM_PARAM));
      }
    }

    inMemoryCF = cmd.hasOption(OPT_INMEMORY);
    if (cmd.hasOption(OPT_ENCRYPTION)) {
      cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION));
    }
  }

  public void initTestTable() throws IOException {
    Durability durability = Durability.USE_DEFAULT;
    if (deferredLogFlush) {
      durability = Durability.ASYNC_WAL;
    }

    HBaseTestingUtil.createPreSplitLoadTestTable(conf, tableName, getColumnFamilies(),
      compressAlgo, dataBlockEncodingAlgo, numRegionsPerServer, regionReplication, durability);
    applyColumnFamilyOptions(tableName, getColumnFamilies());
  }

  @Override
  protected int doWork() throws IOException {
    if (!isVerbose) {
      Log4jUtils.setLogLevel(ZooKeeper.class.getName(), "WARN");
    }
    if (numTables > 1) {
      return parallelLoadTables();
    } else {
      return loadTable();
    }
  }

  protected int loadTable() throws IOException {
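    // The target cluster's ZK quorum and parent znode can be overridden from
    // the command line.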
    if (cmd.hasOption(OPT_ZK_QUORUM)) {
      conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
    }
    if (cmd.hasOption(OPT_ZK_PARENT_NODE)) {
      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, cmd.getOptionValue(OPT_ZK_PARENT_NODE));
    }

    if (isInitOnly) {
      LOG.info("Initializing only; no reads or writes");
      initTestTable();
      return 0;
    }

    if (!isSkipInit) {
      initTestTable();
    }

    LoadTestDataGenerator dataGen = null;
    if (cmd.hasOption(OPT_GENERATOR)) {
      String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON);
      dataGen = getLoadGeneratorInstance(clazzAndArgs[0]);
      String[] args;
      if (dataGen instanceof LoadTestDataGeneratorWithACL) {
        LOG.info("Using LoadTestDataGeneratorWithACL");
        if (User.isHBaseSecurityEnabled(conf)) {
          LOG.info("Security is enabled");
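          // Secure layout of the generator args: [1] = authentication properties
          // file, [2] = superuser, [3] = comma-separated user names.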
          authnFileName = clazzAndArgs[1];
          superUser = clazzAndArgs[2];
          userNames = clazzAndArgs[3];
          args = Arrays.copyOfRange(clazzAndArgs, 2, clazzAndArgs.length);
          Properties authConfig = new Properties();
          authConfig.load(this.getClass().getClassLoader().getResourceAsStream(authnFileName));
          try {
            addAuthInfoToConf(authConfig, conf, superUser, userNames);
          } catch (IOException exp) {
            LOG.error(exp.toString(), exp);
            return EXIT_FAILURE;
          }
          userOwner = User.create(HBaseKerberosUtils.loginAndReturnUGI(conf, superUser));
        } else {
          superUser = clazzAndArgs[1];
          userNames = clazzAndArgs[2];
          args = Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
          userOwner = User.createUserForTesting(conf, superUser, new String[0]);
        }
      } else {
        args = clazzAndArgs.length == 1
          ? new String[0]
          : Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
      }
      dataGen.initialize(args);
    } else {
      // Default DataGenerator is MultiThreadedAction.DefaultDataGenerator
      dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize,
        minColsPerKey, maxColsPerKey, families);
    }

    if (userOwner != null) {
      LOG.info("Granting permissions for user " + userOwner.getShortName());
      Permission.Action[] actions = { Permission.Action.ADMIN, Permission.Action.CREATE,
        Permission.Action.READ, Permission.Action.WRITE };
      try {
        AccessControlClient.grant(ConnectionFactory.createConnection(conf), tableName,
          userOwner.getShortName(), null, null, actions);
      } catch (Throwable e) {
        LOG.error(HBaseMarkers.FATAL,
          "Error in granting permission for the user " + userOwner.getShortName(), e);
        return EXIT_FAILURE;
      }
    }

    if (userNames != null) {
      // This will be a comma-separated list of user names.
      String[] users = userNames.split(",");
      User user = null;
      for (String userStr : users) {
        if (User.isHBaseSecurityEnabled(conf)) {
          user = User.create(HBaseKerberosUtils.loginAndReturnUGI(conf, userStr));
        } else {
          user = User.createUserForTesting(conf, userStr, new String[0]);
        }
      }
    }

    if (isWrite) {
      if (userOwner != null) {
        writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner);
      } else {
        String writerClass = null;
        if (cmd.hasOption(OPT_WRITER)) {
          writerClass = cmd.getOptionValue(OPT_WRITER);
        } else {
          writerClass = MultiThreadedWriter.class.getCanonicalName();
        }

        writerThreads = getMultiThreadedWriterInstance(writerClass, dataGen);
      }
      writerThreads.setMultiPut(isMultiPut);
    }

    if (isUpdate) {
      if (userOwner != null) {
        updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent,
          userOwner, userNames);
      } else {
        String updaterClass = null;
        if (cmd.hasOption(OPT_UPDATER)) {
          updaterClass = cmd.getOptionValue(OPT_UPDATER);
        } else {
          updaterClass = MultiThreadedUpdater.class.getCanonicalName();
        }
        updaterThreads = getMultiThreadedUpdaterInstance(updaterClass, dataGen);
      }
      updaterThreads.setBatchUpdate(isBatchUpdate);
      updaterThreads.setIgnoreNonceConflicts(ignoreConflicts);
    }

    if (isRead) {
      if (userOwner != null) {
        readerThreads =
          new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent, userNames);
      } else {
        String readerClass = null;
        if (cmd.hasOption(OPT_READER)) {
          readerClass = cmd.getOptionValue(OPT_READER);
        } else {
          readerClass = MultiThreadedReader.class.getCanonicalName();
        }
        readerThreads = getMultiThreadedReaderInstance(readerClass, dataGen);
      }
      readerThreads.setMaxErrors(maxReadErrors);
      readerThreads.setKeyWindow(keyWindow);
      readerThreads.setMultiGetBatchSize(multiGetBatchSize);
      readerThreads.setRegionReplicaId(regionReplicaId);
    }

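    // Link the workloads: updaters and readers track the writers' progress so
    // they only operate on keys that have already been written.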
    if (isUpdate && isWrite) {
      LOG.info("Concurrent write/update workload: making updaters aware of the write point");
      updaterThreads.linkToWriter(writerThreads);
    }

    if (isRead && (isUpdate || isWrite)) {
      LOG.info("Concurrent write/read workload: making readers aware of the write point");
      readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads);
    }

    if (isWrite) {
      System.out.println("Starting to write data...");
      writerThreads.start(startKey, endKey, numWriterThreads);
    }

    if (isUpdate) {
      LOG.info("Starting to mutate data...");
      System.out.println("Starting to mutate data...");
      // TODO: currently append and increment operations are not tested with tags
      // Will update this after it is done
      updaterThreads.start(startKey, endKey, numUpdaterThreads);
    }

    if (isRead) {
      System.out.println("Starting to read data...");
      readerThreads.start(startKey, endKey, numReaderThreads);
    }

    if (isWrite) {
      writerThreads.waitForFinish();
    }

    if (isUpdate) {
      updaterThreads.waitForFinish();
    }

    if (isRead) {
      readerThreads.waitForFinish();
    }

    boolean success = true;
    if (isWrite) {
      success = success && writerThreads.getNumWriteFailures() == 0;
    }
    if (isUpdate) {
      success = success && updaterThreads.getNumWriteFailures() == 0;
    }
    if (isRead) {
      success = success && readerThreads.getNumReadErrors() == 0
        && readerThreads.getNumReadFailures() == 0;
    }
    return success ? EXIT_SUCCESS : EXIT_FAILURE;
  }

  private LoadTestDataGenerator getLoadGeneratorInstance(String clazzName) throws IOException {
    try {
      Class<?> clazz = Class.forName(clazzName);
      Constructor<?> constructor =
        clazz.getConstructor(int.class, int.class, int.class, int.class, byte[][].class);
      return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize,
        minColsPerKey, maxColsPerKey, families);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  private MultiThreadedWriter getMultiThreadedWriterInstance(String clazzName,
    LoadTestDataGenerator dataGen) throws IOException {
    try {
      Class<?> clazz = Class.forName(clazzName);
      Constructor<?> constructor =
        clazz.getConstructor(LoadTestDataGenerator.class, Configuration.class, TableName.class);
      return (MultiThreadedWriter) constructor.newInstance(dataGen, conf, tableName);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  private MultiThreadedUpdater getMultiThreadedUpdaterInstance(String clazzName,
    LoadTestDataGenerator dataGen) throws IOException {
    try {
      Class<?> clazz = Class.forName(clazzName);
      Constructor<?> constructor = clazz.getConstructor(LoadTestDataGenerator.class,
        Configuration.class, TableName.class, double.class);
      return (MultiThreadedUpdater) constructor.newInstance(dataGen, conf, tableName,
        updatePercent);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  private MultiThreadedReader getMultiThreadedReaderInstance(String clazzName,
    LoadTestDataGenerator dataGen) throws IOException {
    try {
      Class<?> clazz = Class.forName(clazzName);
      Constructor<?> constructor = clazz.getConstructor(LoadTestDataGenerator.class,
        Configuration.class, TableName.class, double.class);
      return (MultiThreadedReader) constructor.newInstance(dataGen, conf, tableName, verifyPercent);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

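  // Illustrative invocation (an example, not part of the original source):
  //
  //   hbase org.apache.hadoop.hbase.util.LoadTestTool \
  //     -tn cluster_test -num_keys 1000000 -write 10:512:20 -read 100:20
  //
  // writes one million keys (avg 10 columns of ~512 bytes per key, 20 threads)
  // while concurrently reading back and verifying 100% of them.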
  public static void main(String[] args) {
    new LoadTestTool().doStaticMain(args);
  }

  /**
   * When NUM_TABLES is specified, the function starts multiple worker threads which individually
   * start a LoadTestTool instance to load a table. Each table name is in the format
   * &lt;tn&gt;_&lt;index&gt;. For example, with "-tn test -num_tables 2" the table names will be
   * "test_1" and "test_2".
   * @throws IOException if one of the load tasks is unable to complete
   */
  private int parallelLoadTables() throws IOException {
    // create new command args
    String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME);
    String[] newArgs = null;
    if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) {
      newArgs = new String[cmdLineArgs.length + 2];
      newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME;
      newArgs[1] = LoadTestTool.DEFAULT_TABLE_NAME;
      System.arraycopy(cmdLineArgs, 0, newArgs, 2, cmdLineArgs.length);
    } else {
      newArgs = cmdLineArgs;
    }
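
    // Locate the argument slot that holds the -tn value so each worker thread
    // can be handed its own table name below.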
    int tableNameValueIndex = -1;
    for (int j = 0; j < newArgs.length; j++) {
      if (newArgs[j].endsWith(OPT_TABLE_NAME)) {
        tableNameValueIndex = j + 1;
      } else if (newArgs[j].endsWith(NUM_TABLES)) {
        // change NUM_TABLES to 1 so that each worker loads one table
        newArgs[j + 1] = "1";
      }
    }

    // starting to load multiple tables
    List<WorkerThread> workers = new ArrayList<>();
    for (int i = 0; i < numTables; i++) {
      String[] workerArgs = newArgs.clone();
      workerArgs[tableNameValueIndex] = tableName + "_" + (i + 1);
      WorkerThread worker = new WorkerThread(i, workerArgs);
      workers.add(worker);
      LOG.info(worker + " starting");
      worker.start();
    }

    // wait for all workers to finish
    LOG.info("Waiting for worker threads to finish");
    for (WorkerThread t : workers) {
      try {
        t.join();
      } catch (InterruptedException ie) {
        IOException iie = new InterruptedIOException();
        iie.initCause(ie);
        throw iie;
      }
      checkForErrors();
    }

    return EXIT_SUCCESS;
  }

  // If an exception is thrown by one of the worker threads, it will be stored here.
  protected AtomicReference<Throwable> thrown = new AtomicReference<>();

  private void workerThreadError(Throwable t) {
    thrown.compareAndSet(null, t);
  }

  /**
   * Check for errors in the worker threads. If any is found, rethrow it.
   */
  private void checkForErrors() throws IOException {
    Throwable thrown = this.thrown.get();
    if (thrown == null) return;
    if (thrown instanceof IOException) {
      throw (IOException) thrown;
    } else {
      throw new RuntimeException(thrown);
    }
  }

  class WorkerThread extends Thread {
    private String[] workerArgs;

    WorkerThread(int i, String[] args) {
      super("WorkerThread-" + i);
      workerArgs = args;
    }

    @Override
    public void run() {
      try {
        int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs);
        if (ret != 0) {
          throw new RuntimeException("LoadTestTool exited with a non-zero return code.");
        }
      } catch (Exception ex) {
        LOG.error("Error in worker thread", ex);
        workerThreadError(ex);
      }
    }
  }

  private void addAuthInfoToConf(Properties authConfig, Configuration conf, String owner,
    String userList) throws IOException {
    List<String> users = new ArrayList<>(Arrays.asList(userList.split(",")));
    users.add(owner);
    for (String user : users) {
      String keyTabFileConfKey = "hbase." + user + ".keytab.file";
      String principalConfKey = "hbase." + user + ".kerberos.principal";
      if (!authConfig.containsKey(keyTabFileConfKey) || !authConfig.containsKey(principalConfKey)) {
        throw new IOException("Authentication configs missing for user : " + user);
      }
    }
    for (String key : authConfig.stringPropertyNames()) {
      conf.set(key, authConfig.getProperty(key));
    }
    LOG.debug("Added authentication properties to config successfully.");
  }
}