/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import javax.crypto.spec.SecretKeySpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.logging.Log4jUtils;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessControlClient;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithACL;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.cli.AlreadySelectedException;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.MissingOptionException;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
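
// Illustrative invocation (the values below are examples, not defaults; the flags map to the
// OPT_* constants defined in the class that follows):
//
//   hbase org.apache.hadoop.hbase.util.LoadTestTool -tn cluster_test \
//     -write 3:1024:10 -read 100:20 -num_keys 1000000
//
// This writes an average of 3 columns of roughly 1024 bytes per key using 10 writer threads,
// while 20 reader threads verify 100 percent of the keys they read.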
/**
 * A command-line utility that reads, writes, and verifies data. Unlike
 * {@link org.apache.hadoop.hbase.PerformanceEvaluation}, this tool validates the data written,
 * and supports simultaneously writing and reading the same set of keys.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class LoadTestTool extends AbstractHBaseTool {
  private static final Logger LOG = LoggerFactory.getLogger(LoadTestTool.class);
  private static final String COLON = ":";

  /** Table name for the test */
  private TableName tableName;

  /** Column families for the test */
  private byte[][] families;

  /** Table name to use if not overridden on the command line */
  protected static final String DEFAULT_TABLE_NAME = "cluster_test";

  /** The default data size if not specified */
  protected static final int DEFAULT_DATA_SIZE = 64;

  /** The number of reader/writer threads if not specified */
  protected static final int DEFAULT_NUM_THREADS = 20;

  /** Usage string for the load option */
  protected static final String OPT_USAGE_LOAD =
    "<avg_cols_per_key>:<avg_data_size>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";

  /** Usage string for the read option */
  protected static final String OPT_USAGE_READ =
    "<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";

  /** Usage string for the update option */
  protected static final String OPT_USAGE_UPDATE =
    "<update_percent>[:<#threads=" + DEFAULT_NUM_THREADS
      + ">][:<#whether to ignore nonce collisions=0>]";

  protected static final String OPT_USAGE_BLOOM =
    "Bloom filter type, one of " + Arrays.toString(BloomType.values());

  protected static final String OPT_USAGE_COMPRESSION =
    "Compression type, one of " + Arrays.toString(Compression.Algorithm.values());

  protected static final String OPT_VERBOSE = "verbose";

  public static final String OPT_BLOOM = "bloom";
  public static final String OPT_BLOOM_PARAM = "bloom_param";
  public static final String OPT_COMPRESSION = "compression";
  public static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush";
  public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush.";

  public static final String OPT_INMEMORY = "in_memory";
  public static final String OPT_USAGE_IN_MEMORY = "Tries to keep the HFiles of the CF "
    + "in memory as far as possible. It is not guaranteed that reads are always served "
    + "from memory.";

  public static final String OPT_GENERATOR = "generator";
  public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool."
    + " Any arguments for this class can be passed, colon separated, after the class name.";

  public static final String OPT_WRITER = "writer";
  public static final String OPT_WRITER_USAGE = "The class for executing the write requests";

  public static final String OPT_UPDATER = "updater";
  public static final String OPT_UPDATER_USAGE = "The class for executing the update requests";

  public static final String OPT_READER = "reader";
  public static final String OPT_READER_USAGE = "The class for executing the read requests";

  protected static final String OPT_KEY_WINDOW = "key_window";
  protected static final String OPT_WRITE = "write";
  protected static final String OPT_MAX_READ_ERRORS = "max_read_errors";
  public static final String OPT_MULTIPUT = "multiput";
  public static final String OPT_MULTIGET = "multiget_batchsize";
  protected static final String OPT_NUM_KEYS = "num_keys";
  protected static final String OPT_READ = "read";
  protected static final String OPT_START_KEY = "start_key";
  public static final String OPT_TABLE_NAME = "tn";
  public static final String OPT_COLUMN_FAMILIES = "families";
  protected static final String OPT_ZK_QUORUM = "zk";
  protected static final String OPT_ZK_PARENT_NODE = "zk_root";
  protected static final String OPT_SKIP_INIT = "skip_init";
  protected static final String OPT_INIT_ONLY = "init_only";
  protected static final String NUM_TABLES = "num_tables";
  protected static final String OPT_BATCHUPDATE = "batchupdate";
  protected static final String OPT_UPDATE = "update";

  public static final String OPT_ENCRYPTION = "encryption";
  protected static final String OPT_ENCRYPTION_USAGE =
    "Enables transparent encryption on the test table, one of "
      + Arrays.toString(Encryption.getSupportedCiphers());

  public static final String OPT_NUM_REGIONS_PER_SERVER = "num_regions_per_server";
  protected static final String OPT_NUM_REGIONS_PER_SERVER_USAGE =
    "Desired number of regions per region server. Defaults to 5.";
  public static int DEFAULT_NUM_REGIONS_PER_SERVER = 5;

  public static final String OPT_REGION_REPLICATION = "region_replication";
  protected static final String OPT_REGION_REPLICATION_USAGE =
    "Desired number of replicas per region";

  public static final String OPT_REGION_REPLICA_ID = "region_replica_id";
  protected static final String OPT_REGION_REPLICA_ID_USAGE =
    "Region replica id to do the reads from";

  public static final String OPT_MOB_THRESHOLD = "mob_threshold";
  protected static final String OPT_MOB_THRESHOLD_USAGE =
    "Desired cell size to exceed in bytes that will use the MOB write path";

  protected static final long DEFAULT_START_KEY = 0;

  /** This will be removed as we factor out the dependency on command line */
  protected CommandLine cmd;

  protected MultiThreadedWriter writerThreads = null;
  protected MultiThreadedReader readerThreads = null;
  protected MultiThreadedUpdater updaterThreads = null;

  protected long startKey, endKey;

  protected boolean isVerbose, isWrite, isRead, isUpdate;
  protected boolean deferredLogFlush;

  // Column family options
  protected DataBlockEncoding dataBlockEncodingAlgo;
  protected Compression.Algorithm compressAlgo;
  protected BloomType bloomType;
  private boolean inMemoryCF;

  private User userOwner;

  // Writer options
  protected int numWriterThreads = DEFAULT_NUM_THREADS;
  protected int minColsPerKey, maxColsPerKey;
  protected int minColDataSize = DEFAULT_DATA_SIZE, maxColDataSize = DEFAULT_DATA_SIZE;
  protected boolean isMultiPut;

  // Updater options
  protected int numUpdaterThreads = DEFAULT_NUM_THREADS;
  protected int updatePercent;
  protected boolean ignoreConflicts = false;
  protected boolean isBatchUpdate;

  // Reader options
  private int numReaderThreads = DEFAULT_NUM_THREADS;
  private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
  private int multiGetBatchSize = MultiThreadedReader.DEFAULT_BATCH_SIZE;
  private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
  private int verifyPercent;

  private int numTables = 1;

  private String superUser;

  private String userNames;

  // This file is used to read authentication information in secure clusters.
  private String authnFileName;

  private int numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
  private int regionReplication = -1; // not set
  private int regionReplicaId = -1; // not set

  private int mobThreshold = -1; // not set

  // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad,
  // console tool itself should only be used from console.
  protected boolean isSkipInit = false;
  protected boolean isInitOnly = false;

  protected Cipher cipher = null;

  protected String[] splitColonSeparated(String option, int minNumCols, int maxNumCols) {
    String optVal = cmd.getOptionValue(option);
    String[] cols = optVal.split(COLON);
    if (cols.length < minNumCols || cols.length > maxNumCols) {
      throw new IllegalArgumentException("Expected at least " + minNumCols
        + " columns but no more than " + maxNumCols + " in the colon-separated value '" + optVal
        + "' of the -" + option + " option");
    }
    return cols;
  }

  protected int getNumThreads(String numThreadsStr) {
    return parseInt(numThreadsStr, 1, Short.MAX_VALUE);
  }
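
  // Example: with "-read 100:20" on the command line (illustrative values),
  // splitColonSeparated(OPT_READ, 1, 2) returns {"100", "20"}: a verify percent of 100,
  // with the thread count "20" then parsed through getNumThreads.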
  public byte[][] getColumnFamilies() {
    return families;
  }

  /**
   * Apply column family options such as Bloom filters, compression, and data block encoding.
   */
  protected void applyColumnFamilyOptions(TableName tableName, byte[][] columnFamilies)
    throws IOException {
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Admin admin = conn.getAdmin()) {
      TableDescriptor tableDesc = admin.getDescriptor(tableName);
      LOG.info("Disabling table " + tableName);
      admin.disableTable(tableName);
      for (byte[] cf : columnFamilies) {
        ColumnFamilyDescriptor columnDesc = tableDesc.getColumnFamily(cf);
        boolean isNewCf = columnDesc == null;
        ColumnFamilyDescriptorBuilder columnDescBuilder = isNewCf
          ? ColumnFamilyDescriptorBuilder.newBuilder(cf)
          : ColumnFamilyDescriptorBuilder.newBuilder(columnDesc);
        if (bloomType != null) {
          columnDescBuilder.setBloomFilterType(bloomType);
        }
        if (compressAlgo != null) {
          columnDescBuilder.setCompressionType(compressAlgo);
        }
        if (dataBlockEncodingAlgo != null) {
          columnDescBuilder.setDataBlockEncoding(dataBlockEncodingAlgo);
        }
        columnDescBuilder.setInMemory(inMemoryCF);
        if (cipher != null) {
          byte[] keyBytes = new byte[cipher.getKeyLength()];
          Bytes.secureRandom(keyBytes);
          columnDescBuilder.setEncryptionType(cipher.getName());
          columnDescBuilder.setEncryptionKey(EncryptionUtil.wrapKey(conf,
            User.getCurrent().getShortName(), new SecretKeySpec(keyBytes, cipher.getName())));
        }
        if (mobThreshold >= 0) {
          columnDescBuilder.setMobEnabled(true);
          columnDescBuilder.setMobThreshold(mobThreshold);
        }
        if (isNewCf) {
          admin.addColumnFamily(tableName, columnDescBuilder.build());
        } else {
          admin.modifyColumnFamily(tableName, columnDescBuilder.build());
        }
      }
      LOG.info("Enabling table " + tableName);
      admin.enableTable(tableName);
    }
  }
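
  // Note that applyColumnFamilyOptions performs the schema changes with the table offline:
  // it disables the table, adds or modifies the column families, and re-enables it, so
  // concurrent clients briefly see the table as unavailable.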
  @Override
  protected void addOptions() {
    addOptNoArg("v", OPT_VERBOSE, "Will display a full readout of logs, including ZooKeeper");
    addOptWithArg(OPT_ZK_QUORUM,
      "ZK quorum as comma-separated host names without port numbers");
    addOptWithArg(OPT_ZK_PARENT_NODE, "name of parent znode in zookeeper");
    addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
    addOptWithArg(OPT_COLUMN_FAMILIES,
      "The name of the column families to use separated by comma");
    addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
    addOptWithArg(OPT_READ, OPT_USAGE_READ);
    addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE);
    addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading");
    addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
    addOptWithArg(OPT_BLOOM_PARAM, "the parameter of bloom filter type");
    addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
    addOptWithArg(HFileTestUtil.OPT_DATA_BLOCK_ENCODING,
      HFileTestUtil.OPT_DATA_BLOCK_ENCODING_USAGE);
    addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors "
      + "to tolerate before terminating all reader threads. The default is "
      + MultiThreadedReader.DEFAULT_MAX_ERRORS + ".");
    addOptWithArg(OPT_MULTIGET, "Whether to use multi-gets as opposed to "
      + "separate gets for every column in a row");
    addOptWithArg(OPT_KEY_WINDOW, "The 'key window' to maintain between "
      + "reads and writes for concurrent write/read workload. The default "
      + "is " + MultiThreadedReader.DEFAULT_KEY_WINDOW + ".");

    addOptNoArg(OPT_MULTIPUT, "Whether to use multi-puts as opposed to "
      + "separate puts for every column in a row");
    addOptNoArg(OPT_BATCHUPDATE, "Whether to use batch as opposed to "
      + "separate updates for every column in a row");
    addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY);
    addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE);
    addOptWithArg(OPT_WRITER, OPT_WRITER_USAGE);
    addOptWithArg(OPT_UPDATER, OPT_UPDATER_USAGE);
    addOptWithArg(OPT_READER, OPT_READER_USAGE);

    addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
    addOptWithArg(OPT_START_KEY, "The first key to read/write "
      + "(a 0-based index). The default value is " + DEFAULT_START_KEY + ".");
    addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table already exists");

    addOptWithArg(NUM_TABLES,
      "A positive integer number. When a number n is specified, load test "
        + "tool will load n tables in parallel. The -tn parameter value becomes the "
        + "table name prefix. Each table name is in the format <tn>_1...<tn>_n");

    addOptWithArg(OPT_ENCRYPTION, OPT_ENCRYPTION_USAGE);
    addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE);
    addOptWithArg(OPT_NUM_REGIONS_PER_SERVER, OPT_NUM_REGIONS_PER_SERVER_USAGE);
    addOptWithArg(OPT_REGION_REPLICATION, OPT_REGION_REPLICATION_USAGE);
    addOptWithArg(OPT_REGION_REPLICA_ID, OPT_REGION_REPLICA_ID_USAGE);
    addOptWithArg(OPT_MOB_THRESHOLD, OPT_MOB_THRESHOLD_USAGE);
  }

  @Override
  protected CommandLineParser newParser() {
    // Commons-CLI lacks the capability to handle combinations of options, so we do it ourselves.
    // Validate in parse() to get helpful error messages instead of exploding in processOptions().
    return new DefaultParser() {
      @Override
      public CommandLine parse(Options opts, String[] args, Properties props, boolean stop)
        throws ParseException {
        CommandLine cl = super.parse(opts, args, props, stop);

        // Use the freshly parsed CommandLine (cl), not the cmd field, which is only assigned
        // later in processOptions().
        boolean isReadWriteUpdate =
          cl.hasOption(OPT_READ) || cl.hasOption(OPT_WRITE) || cl.hasOption(OPT_UPDATE);
        boolean isInitOnly = cl.hasOption(OPT_INIT_ONLY);

        if (!isInitOnly && !isReadWriteUpdate) {
          throw new MissingOptionException("Must specify either -" + OPT_INIT_ONLY
            + " or at least one of -" + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE);
        }

        if (isInitOnly && isReadWriteUpdate) {
          throw new AlreadySelectedException(OPT_INIT_ONLY + " cannot be specified with any of -"
            + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE);
        }

        if (isReadWriteUpdate && !cl.hasOption(OPT_NUM_KEYS)) {
          throw new MissingOptionException(
            OPT_NUM_KEYS + " must be specified in read/write mode.");
        }

        return cl;
      }
    };
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    this.cmd = cmd;

    tableName = TableName.valueOf(cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME));

    if (cmd.hasOption(OPT_COLUMN_FAMILIES)) {
      String[] list = cmd.getOptionValue(OPT_COLUMN_FAMILIES).split(",");
      families = new byte[list.length][];
      for (int i = 0; i < list.length; i++) {
        families[i] = Bytes.toBytes(list[i]);
      }
    } else {
      families = HFileTestUtil.DEFAULT_COLUMN_FAMILIES;
    }

    isVerbose = cmd.hasOption(OPT_VERBOSE);
    isWrite = cmd.hasOption(OPT_WRITE);
    isRead = cmd.hasOption(OPT_READ);
    isUpdate = cmd.hasOption(OPT_UPDATE);
    isInitOnly = cmd.hasOption(OPT_INIT_ONLY);
    deferredLogFlush = cmd.hasOption(OPT_DEFERRED_LOG_FLUSH);

    if (!isInitOnly) {
      startKey = parseLong(cmd.getOptionValue(OPT_START_KEY, String.valueOf(DEFAULT_START_KEY)),
        0, Long.MAX_VALUE);
      long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1, Long.MAX_VALUE - startKey);
      endKey = startKey + numKeys;
      isSkipInit = cmd.hasOption(OPT_SKIP_INIT);
      System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]");
    }

    parseColumnFamilyOptions(cmd);

    if (isWrite) {
      String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3);

      int colIndex = 0;
      minColsPerKey = 1;
      maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]);
      int avgColDataSize = parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE);
      minColDataSize = avgColDataSize / 2;
      maxColDataSize = avgColDataSize * 3 / 2;

      if (colIndex < writeOpts.length) {
        numWriterThreads = getNumThreads(writeOpts[colIndex++]);
      }

      isMultiPut = cmd.hasOption(OPT_MULTIPUT);

      if (cmd.hasOption(OPT_MOB_THRESHOLD)) {
        mobThreshold = Integer.parseInt(cmd.getOptionValue(OPT_MOB_THRESHOLD));
      }

      System.out.println("Multi-puts: " + isMultiPut);
      System.out.println("Columns per key: " + minColsPerKey + ".." + maxColsPerKey);
      System.out.println("Data size per column: " + minColDataSize + ".." + maxColDataSize);
    }

    if (isUpdate) {
      String[] mutateOpts = splitColonSeparated(OPT_UPDATE, 1, 3);
      int colIndex = 0;
      updatePercent = parseInt(mutateOpts[colIndex++], 0, 100);
      if (colIndex < mutateOpts.length) {
        numUpdaterThreads = getNumThreads(mutateOpts[colIndex++]);
      }
      if (colIndex < mutateOpts.length) {
        ignoreConflicts = parseInt(mutateOpts[colIndex++], 0, 1) == 1;
      }

      isBatchUpdate = cmd.hasOption(OPT_BATCHUPDATE);

      System.out.println("Batch updates: " + isBatchUpdate);
      System.out.println("Percent of keys to update: " + updatePercent);
      System.out.println("Updater threads: " + numUpdaterThreads);
      System.out.println("Ignore nonce conflicts: " + ignoreConflicts);
    }

    if (isRead) {
      String[] readOpts = splitColonSeparated(OPT_READ, 1, 2);
      int colIndex = 0;
      verifyPercent = parseInt(readOpts[colIndex++], 0, 100);
      if (colIndex < readOpts.length) {
        numReaderThreads = getNumThreads(readOpts[colIndex++]);
      }

      if (cmd.hasOption(OPT_MAX_READ_ERRORS)) {
        maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS), 0, Integer.MAX_VALUE);
      }

      if (cmd.hasOption(OPT_KEY_WINDOW)) {
        keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW), 0, Integer.MAX_VALUE);
      }

      if (cmd.hasOption(OPT_MULTIGET)) {
        multiGetBatchSize = parseInt(cmd.getOptionValue(OPT_MULTIGET), 0, Integer.MAX_VALUE);
      }

      System.out.println("Multi-gets (value of 1 means no multigets): " + multiGetBatchSize);
      System.out.println("Percent of keys to verify: " + verifyPercent);
      System.out.println("Reader threads: " + numReaderThreads);
    }

    if (cmd.hasOption(NUM_TABLES)) {
      numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE);
    }

    numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
    if (cmd.hasOption(OPT_NUM_REGIONS_PER_SERVER)) {
      numRegionsPerServer = Integer.parseInt(cmd.getOptionValue(OPT_NUM_REGIONS_PER_SERVER));
    }

    regionReplication = 1;
    if (cmd.hasOption(OPT_REGION_REPLICATION)) {
      regionReplication = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICATION));
    }

    regionReplicaId = -1;
    if (cmd.hasOption(OPT_REGION_REPLICA_ID)) {
      regionReplicaId = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICA_ID));
    }
  }

  private void parseColumnFamilyOptions(CommandLine cmd) {
    String dataBlockEncodingStr = cmd.getOptionValue(HFileTestUtil.OPT_DATA_BLOCK_ENCODING);
    dataBlockEncodingAlgo =
      dataBlockEncodingStr == null ? null : DataBlockEncoding.valueOf(dataBlockEncodingStr);

    String compressStr = cmd.getOptionValue(OPT_COMPRESSION);
    compressAlgo = compressStr == null
      ? Compression.Algorithm.NONE
      : Compression.Algorithm.valueOf(compressStr);

    String bloomStr = cmd.getOptionValue(OPT_BLOOM);
    bloomType = bloomStr == null ? BloomType.ROW : BloomType.valueOf(bloomStr);

    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
      if (!cmd.hasOption(OPT_BLOOM_PARAM)) {
        LOG.error("the parameter of bloom filter {} is not specified", bloomType.name());
      } else {
        conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, cmd.getOptionValue(OPT_BLOOM_PARAM));
      }
    }

    inMemoryCF = cmd.hasOption(OPT_INMEMORY);
    if (cmd.hasOption(OPT_ENCRYPTION)) {
      cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION));
    }
  }
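
  // Example: "-bloom ROWPREFIX_FIXED_LENGTH -bloom_param 10" (illustrative values) stores the
  // prefix length under BloomFilterUtil.PREFIX_LENGTH_KEY, telling the fixed-length row-prefix
  // Bloom filter how many leading bytes of each row key to use.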
  public void initTestTable() throws IOException {
    Durability durability = Durability.USE_DEFAULT;
    if (deferredLogFlush) {
      durability = Durability.ASYNC_WAL;
    }

    HBaseTestingUtil.createPreSplitLoadTestTable(conf, tableName, getColumnFamilies(),
      compressAlgo, dataBlockEncodingAlgo, numRegionsPerServer, regionReplication, durability);
    applyColumnFamilyOptions(tableName, getColumnFamilies());
  }

  @Override
  protected int doWork() throws IOException {
    if (!isVerbose) {
      Log4jUtils.setLogLevel(ZooKeeper.class.getName(), "WARN");
    }
    if (numTables > 1) {
      return parallelLoadTables();
    } else {
      return loadTable();
    }
  }

  protected int loadTable() throws IOException {
    if (cmd.hasOption(OPT_ZK_QUORUM)) {
      conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
    }
    if (cmd.hasOption(OPT_ZK_PARENT_NODE)) {
      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, cmd.getOptionValue(OPT_ZK_PARENT_NODE));
    }

    if (isInitOnly) {
      LOG.info("Initializing only; no reads or writes");
      initTestTable();
      return 0;
    }

    if (!isSkipInit) {
      initTestTable();
    }

    LoadTestDataGenerator dataGen = null;
    if (cmd.hasOption(OPT_GENERATOR)) {
      String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON);
      dataGen = getLoadGeneratorInstance(clazzAndArgs[0]);
      String[] args;
      if (dataGen instanceof LoadTestDataGeneratorWithACL) {
        LOG.info("Using LoadTestDataGeneratorWithACL");
        if (User.isHBaseSecurityEnabled(conf)) {
          LOG.info("Security is enabled");
          authnFileName = clazzAndArgs[1];
          superUser = clazzAndArgs[2];
          userNames = clazzAndArgs[3];
          args = Arrays.copyOfRange(clazzAndArgs, 2, clazzAndArgs.length);
          Properties authConfig = new Properties();
          authConfig.load(this.getClass().getClassLoader().getResourceAsStream(authnFileName));
          try {
            addAuthInfoToConf(authConfig, conf, superUser, userNames);
          } catch (IOException exp) {
            LOG.error(exp.toString(), exp);
            return EXIT_FAILURE;
          }
          userOwner = User.create(HBaseKerberosUtils.loginAndReturnUGI(conf, superUser));
        } else {
          superUser = clazzAndArgs[1];
          userNames = clazzAndArgs[2];
          args = Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
          userOwner = User.createUserForTesting(conf, superUser, new String[0]);
        }
      } else {
        args = clazzAndArgs.length == 1
          ? new String[0]
          : Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
      }
      dataGen.initialize(args);
    } else {
      // Default DataGenerator is MultiThreadedAction.DefaultDataGenerator
      dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize,
        minColsPerKey, maxColsPerKey, families);
    }

    if (userOwner != null) {
      LOG.info("Granting permissions for user " + userOwner.getShortName());
      Permission.Action[] actions = { Permission.Action.ADMIN, Permission.Action.CREATE,
        Permission.Action.READ, Permission.Action.WRITE };
      try {
        AccessControlClient.grant(ConnectionFactory.createConnection(conf), tableName,
          userOwner.getShortName(), null, null, actions);
      } catch (Throwable e) {
        LOG.error(HBaseMarkers.FATAL,
          "Error in granting permission for the user " + userOwner.getShortName(), e);
        return EXIT_FAILURE;
      }
    }

    if (userNames != null) {
      // This will be a comma separated list of expressions.
      String[] users = userNames.split(",");
      // Log each user in; the User handles themselves are not used further.
      User user = null;
      for (String userStr : users) {
        if (User.isHBaseSecurityEnabled(conf)) {
          user = User.create(HBaseKerberosUtils.loginAndReturnUGI(conf, userStr));
        } else {
          user = User.createUserForTesting(conf, userStr, new String[0]);
        }
      }
    }

    if (isWrite) {
      if (userOwner != null) {
        writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner);
      } else {
        String writerClass = null;
        if (cmd.hasOption(OPT_WRITER)) {
          writerClass = cmd.getOptionValue(OPT_WRITER);
        } else {
          writerClass = MultiThreadedWriter.class.getCanonicalName();
        }
        writerThreads = getMultiThreadedWriterInstance(writerClass, dataGen);
      }
      writerThreads.setMultiPut(isMultiPut);
    }

    if (isUpdate) {
      if (userOwner != null) {
        updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent,
          userOwner, userNames);
      } else {
        String updaterClass = null;
        if (cmd.hasOption(OPT_UPDATER)) {
          updaterClass = cmd.getOptionValue(OPT_UPDATER);
        } else {
          updaterClass = MultiThreadedUpdater.class.getCanonicalName();
        }
        updaterThreads = getMultiThreadedUpdaterInstance(updaterClass, dataGen);
      }
      updaterThreads.setBatchUpdate(isBatchUpdate);
      updaterThreads.setIgnoreNonceConflicts(ignoreConflicts);
    }

    if (isRead) {
      if (userOwner != null) {
        readerThreads = new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent,
          userNames);
      } else {
        String readerClass = null;
        if (cmd.hasOption(OPT_READER)) {
          readerClass = cmd.getOptionValue(OPT_READER);
        } else {
          readerClass = MultiThreadedReader.class.getCanonicalName();
        }
        readerThreads = getMultiThreadedReaderInstance(readerClass, dataGen);
      }
      readerThreads.setMaxErrors(maxReadErrors);
      readerThreads.setKeyWindow(keyWindow);
      readerThreads.setMultiGetBatchSize(multiGetBatchSize);
      readerThreads.setRegionReplicaId(regionReplicaId);
    }

    if (isUpdate && isWrite) {
      LOG.info("Concurrent write/update workload: making updaters aware of the write point");
      updaterThreads.linkToWriter(writerThreads);
    }

    if (isRead && (isUpdate || isWrite)) {
      LOG.info("Concurrent write/read workload: making readers aware of the write point");
      readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads);
    }

    if (isWrite) {
      System.out.println("Starting to write data...");
      writerThreads.start(startKey, endKey, numWriterThreads);
    }

    if (isUpdate) {
      LOG.info("Starting to mutate data...");
      System.out.println("Starting to mutate data...");
      // TODO: currently append and increment operations are not tested with tags;
      // update this once that work is done.
      updaterThreads.start(startKey, endKey, numUpdaterThreads);
    }

    if (isRead) {
      System.out.println("Starting to read data...");
      readerThreads.start(startKey, endKey, numReaderThreads);
    }

    if (isWrite) {
      writerThreads.waitForFinish();
    }

    if (isUpdate) {
      updaterThreads.waitForFinish();
    }

    if (isRead) {
      readerThreads.waitForFinish();
    }

    boolean success = true;
    if (isWrite) {
      success = success && writerThreads.getNumWriteFailures() == 0;
    }
    if (isUpdate) {
      success = success && updaterThreads.getNumWriteFailures() == 0;
    }
    if (isRead) {
      success = success && readerThreads.getNumReadErrors() == 0
        && readerThreads.getNumReadFailures() == 0;
    }

    return success ? EXIT_SUCCESS : EXIT_FAILURE;
  }
getLoadGeneratorInstance(String clazzName
) throws IOException
{
785 Class
<?
> clazz
= Class
.forName(clazzName
);
786 Constructor
<?
> constructor
= clazz
.getConstructor(int.class, int.class, int.class, int.class,
788 return (LoadTestDataGenerator
) constructor
.newInstance(minColDataSize
, maxColDataSize
,
789 minColsPerKey
, maxColsPerKey
, families
);
790 } catch (Exception e
) {
791 throw new IOException(e
);
795 private MultiThreadedWriter
getMultiThreadedWriterInstance(String clazzName
796 , LoadTestDataGenerator dataGen
) throws IOException
{
798 Class
<?
> clazz
= Class
.forName(clazzName
);
799 Constructor
<?
> constructor
= clazz
.getConstructor(
800 LoadTestDataGenerator
.class, Configuration
.class, TableName
.class);
801 return (MultiThreadedWriter
) constructor
.newInstance(dataGen
, conf
, tableName
);
802 } catch (Exception e
) {
803 throw new IOException(e
);
807 private MultiThreadedUpdater
getMultiThreadedUpdaterInstance(String clazzName
808 , LoadTestDataGenerator dataGen
) throws IOException
{
810 Class
<?
> clazz
= Class
.forName(clazzName
);
811 Constructor
<?
> constructor
= clazz
.getConstructor(
812 LoadTestDataGenerator
.class, Configuration
.class, TableName
.class, double.class);
813 return (MultiThreadedUpdater
) constructor
.newInstance(
814 dataGen
, conf
, tableName
, updatePercent
);
815 } catch (Exception e
) {
816 throw new IOException(e
);
820 private MultiThreadedReader
getMultiThreadedReaderInstance(String clazzName
821 , LoadTestDataGenerator dataGen
) throws IOException
{
823 Class
<?
> clazz
= Class
.forName(clazzName
);
824 Constructor
<?
> constructor
= clazz
.getConstructor(
825 LoadTestDataGenerator
.class, Configuration
.class, TableName
.class, double.class);
826 return (MultiThreadedReader
) constructor
.newInstance(dataGen
, conf
, tableName
, verifyPercent
);
827 } catch (Exception e
) {
828 throw new IOException(e
);
  public static void main(String[] args) {
    new LoadTestTool().doStaticMain(args);
  }

  /**
   * When NUM_TABLES is specified, the function starts multiple worker threads which individually
   * start a LoadTestTool instance to load a table. Each table name is in the format <tn>_<index>.
   * For example, with "-tn test -num_tables 2", the table names will be "test_1" and "test_2".
   * @throws IOException if one of the load tasks is unable to complete
   */
  private int parallelLoadTables() throws IOException {
    // create new command args
    String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME);
    String[] newArgs = null;
    if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) {
      newArgs = new String[cmdLineArgs.length + 2];
      newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME;
      newArgs[1] = LoadTestTool.DEFAULT_TABLE_NAME;
      System.arraycopy(cmdLineArgs, 0, newArgs, 2, cmdLineArgs.length);
    } else {
      newArgs = cmdLineArgs;
    }

    int tableNameValueIndex = -1;
    for (int j = 0; j < newArgs.length; j++) {
      if (newArgs[j].endsWith(OPT_TABLE_NAME)) {
        tableNameValueIndex = j + 1;
      } else if (newArgs[j].endsWith(NUM_TABLES)) {
        // change NUM_TABLES to 1 so that each worker loads one table
        newArgs[j + 1] = "1";
      }
    }

    // starting to load multiple tables
    List<WorkerThread> workers = new ArrayList<>();
    for (int i = 0; i < numTables; i++) {
      String[] workerArgs = newArgs.clone();
      workerArgs[tableNameValueIndex] = tableName + "_" + (i + 1);
      WorkerThread worker = new WorkerThread(i, workerArgs);
      workers.add(worker);
      LOG.info(worker + " starting");
      worker.start();
    }

    // wait for all workers to finish
    LOG.info("Waiting for worker threads to finish");
    for (WorkerThread t : workers) {
      try {
        t.join();
      } catch (InterruptedException ie) {
        IOException iie = new InterruptedIOException();
        iie.initCause(ie);
        throw iie;
      }
    }

    checkForErrors();

    return EXIT_SUCCESS;
  }

  // If an exception is thrown by one of the worker threads, it will be stored here for
  // checkForErrors() to rethrow.
  protected AtomicReference<Throwable> thrown = new AtomicReference<>();

  private void workerThreadError(Throwable t) {
    thrown.compareAndSet(null, t);
  }

  /**
   * Check for errors in the writer threads. If any is found, rethrow it.
   */
  private void checkForErrors() throws IOException {
    Throwable thrown = this.thrown.get();
    if (thrown == null) return;
    if (thrown instanceof IOException) {
      throw (IOException) thrown;
    } else {
      throw new RuntimeException(thrown);
    }
  }

  class WorkerThread extends Thread {
    private String[] workerArgs;

    WorkerThread(int i, String[] args) {
      super("WorkerThread-" + i);
      workerArgs = args;
    }

    @Override
    public void run() {
      try {
        int ret =
          ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs);
        if (ret != 0) {
          throw new RuntimeException("LoadTestTool exited with a non-zero return code.");
        }
      } catch (Exception ex) {
        LOG.error("Error in worker thread", ex);
        workerThreadError(ex);
      }
    }
  }
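
  // The authentication Properties consumed by addAuthInfoToConf below are expected to hold one
  // keytab/principal pair per user, keyed "hbase.<user>.keytab.file" and
  // "hbase.<user>.kerberos.principal". Illustrative entries (path and realm are examples only):
  //   hbase.user1.keytab.file=/etc/security/keytabs/user1.keytab
  //   hbase.user1.kerberos.principal=user1/_HOST@EXAMPLE.COM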
  private void addAuthInfoToConf(Properties authConfig, Configuration conf, String owner,
    String userList) throws IOException {
    List<String> users = new ArrayList<>(Arrays.asList(userList.split(",")));
    users.add(owner);
    for (String user : users) {
      String keyTabFileConfKey = "hbase." + user + ".keytab.file";
      String principalConfKey = "hbase." + user + ".kerberos.principal";
      if (!authConfig.containsKey(keyTabFileConfKey) || !authConfig.containsKey(principalConfKey)) {
        throw new IOException("Authentication configs missing for user: " + user);
      }
    }
    for (String key : authConfig.stringPropertyNames()) {
      conf.set(key, authConfig.getProperty(key));
    }
    LOG.debug("Added authentication properties to config successfully.");
  }
}