/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import javax.crypto.spec.SecretKeySpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.logging.Log4jUtils;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.security.EncryptionUtil;
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessControlClient;
import org.apache.hadoop.hbase.security.access.Permission;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.hbase.util.test.LoadTestDataGeneratorWithACL;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.cli.AlreadySelectedException;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.MissingOptionException;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
/**
 * A command-line utility that reads, writes, and verifies data. Unlike
 * {@link org.apache.hadoop.hbase.PerformanceEvaluation}, this tool validates the data written, and
 * supports simultaneously writing and reading the same set of keys.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class LoadTestTool extends AbstractHBaseTool {
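  // Illustrative invocation (a sketch, not from this file's docs): the option names below are
  // defined in this class, but the table name, key count, and thread counts are example values,
  // and the launcher command may differ per installation:
  //
  //   hbase org.apache.hadoop.hbase.util.LoadTestTool -tn cluster_test \
  //     -write 5:1024:20 -read 100:20 -num_keys 1000000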
  private static final Logger LOG = LoggerFactory.getLogger(LoadTestTool.class);
  private static final String COLON = ":";

  /** Table name for the test */
  private TableName tableName;

  /** Column families for the test */
  private byte[][] families;
  /** Table name to use if not overridden on the command line */
  protected static final String DEFAULT_TABLE_NAME = "cluster_test";
  /** The default data size if not specified */
  protected static final int DEFAULT_DATA_SIZE = 64;

  /** The number of reader/writer threads if not specified */
  protected static final int DEFAULT_NUM_THREADS = 20;

  /** Usage string for the load option */
  protected static final String OPT_USAGE_LOAD =
    "<avg_cols_per_key>:<avg_data_size>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";

  /** Usage string for the read option */
  protected static final String OPT_USAGE_READ =
    "<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";

  /** Usage string for the update option */
  protected static final String OPT_USAGE_UPDATE =
    "<update_percent>[:<#threads=" + DEFAULT_NUM_THREADS
      + ">][:<#whether to ignore nonce collisions=0>]";
  protected static final String OPT_USAGE_BLOOM =
    "Bloom filter type, one of " + Arrays.toString(BloomType.values());

  protected static final String OPT_USAGE_COMPRESSION =
    "Compression type, one of " + Arrays.toString(Compression.Algorithm.values());

  protected static final String OPT_VERBOSE = "verbose";

  public static final String OPT_BLOOM = "bloom";
  public static final String OPT_BLOOM_PARAM = "bloom_param";
  public static final String OPT_COMPRESSION = "compression";
  public static final String OPT_DEFERRED_LOG_FLUSH = "deferredlogflush";
  public static final String OPT_DEFERRED_LOG_FLUSH_USAGE = "Enable deferred log flush.";

  public static final String OPT_INMEMORY = "in_memory";
  public static final String OPT_USAGE_IN_MEMORY = "Tries to keep the HFiles of the CF "
    + "inmemory as far as possible. Not guaranteed that reads are always served from inmemory";

  public static final String OPT_GENERATOR = "generator";
  public static final String OPT_GENERATOR_USAGE = "The class which generates load for the tool."
    + " Any args for this class can be passed as colon separated after class name";

  public static final String OPT_WRITER = "writer";
  public static final String OPT_WRITER_USAGE = "The class for executing the write requests";

  public static final String OPT_UPDATER = "updater";
  public static final String OPT_UPDATER_USAGE = "The class for executing the update requests";

  public static final String OPT_READER = "reader";
  public static final String OPT_READER_USAGE = "The class for executing the read requests";

  protected static final String OPT_KEY_WINDOW = "key_window";
  protected static final String OPT_WRITE = "write";
  protected static final String OPT_MAX_READ_ERRORS = "max_read_errors";
  public static final String OPT_MULTIPUT = "multiput";
  public static final String OPT_MULTIGET = "multiget_batchsize";
  protected static final String OPT_NUM_KEYS = "num_keys";
  protected static final String OPT_READ = "read";
  protected static final String OPT_START_KEY = "start_key";
  public static final String OPT_TABLE_NAME = "tn";
  public static final String OPT_COLUMN_FAMILIES = "families";
  protected static final String OPT_ZK_QUORUM = "zk";
  protected static final String OPT_ZK_PARENT_NODE = "zk_root";
  protected static final String OPT_SKIP_INIT = "skip_init";
  protected static final String OPT_INIT_ONLY = "init_only";
  protected static final String NUM_TABLES = "num_tables";
  protected static final String OPT_BATCHUPDATE = "batchupdate";
  protected static final String OPT_UPDATE = "update";

  public static final String OPT_ENCRYPTION = "encryption";
  protected static final String OPT_ENCRYPTION_USAGE =
    "Enables transparent encryption on the test table, one of "
      + Arrays.toString(Encryption.getSupportedCiphers());

  public static final String OPT_NUM_REGIONS_PER_SERVER = "num_regions_per_server";
  protected static final String OPT_NUM_REGIONS_PER_SERVER_USAGE =
    "Desired number of regions per region server. Defaults to 5.";
  public static int DEFAULT_NUM_REGIONS_PER_SERVER = 5;

  public static final String OPT_REGION_REPLICATION = "region_replication";
  protected static final String OPT_REGION_REPLICATION_USAGE =
    "Desired number of replicas per region";

  public static final String OPT_REGION_REPLICA_ID = "region_replica_id";
  protected static final String OPT_REGION_REPLICA_ID_USAGE =
    "Region replica id to do the reads from";

  public static final String OPT_MOB_THRESHOLD = "mob_threshold";
  protected static final String OPT_MOB_THRESHOLD_USAGE =
    "Desired cell size to exceed in bytes that will use the MOB write path";

  protected static final long DEFAULT_START_KEY = 0;

  /** This will be removed as we factor out the dependency on command line */
  protected CommandLine cmd;

  protected MultiThreadedWriter writerThreads = null;
  protected MultiThreadedReader readerThreads = null;
  protected MultiThreadedUpdater updaterThreads = null;

  protected long startKey, endKey;

  protected boolean isVerbose, isWrite, isRead, isUpdate;
  protected boolean deferredLogFlush;

  // Column family options
  protected DataBlockEncoding dataBlockEncodingAlgo;
  protected Compression.Algorithm compressAlgo;
  protected BloomType bloomType;
  private boolean inMemoryCF;

  private User userOwner;

  // Writer options
  protected int numWriterThreads = DEFAULT_NUM_THREADS;
  protected int minColsPerKey, maxColsPerKey;
  protected int minColDataSize = DEFAULT_DATA_SIZE, maxColDataSize = DEFAULT_DATA_SIZE;
  protected boolean isMultiPut;

  // Updater options
  protected int numUpdaterThreads = DEFAULT_NUM_THREADS;
  protected int updatePercent;
  protected boolean ignoreConflicts = false;
  protected boolean isBatchUpdate;

  // Reader options
  private int numReaderThreads = DEFAULT_NUM_THREADS;
  private int keyWindow = MultiThreadedReader.DEFAULT_KEY_WINDOW;
  private int multiGetBatchSize = MultiThreadedReader.DEFAULT_BATCH_SIZE;
  private int maxReadErrors = MultiThreadedReader.DEFAULT_MAX_ERRORS;
  private int verifyPercent;

  private int numTables = 1;

  private String superUser;

  private String userNames;
  // This file is used to read authentication information in secure clusters.
  private String authnFileName;

  private int numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
  private int regionReplication = -1; // not set
  private int regionReplicaId = -1; // not set

  private int mobThreshold = -1; // not set

  // TODO: refactor LoadTestToolImpl somewhere to make the usage from tests less bad,
  // console tool itself should only be used from console.
  protected boolean isSkipInit = false;
  protected boolean isInitOnly = false;

  protected Cipher cipher = null;
  protected String[] splitColonSeparated(String option, int minNumCols, int maxNumCols) {
    String optVal = cmd.getOptionValue(option);
    String[] cols = optVal.split(COLON);
    if (cols.length < minNumCols || cols.length > maxNumCols) {
      throw new IllegalArgumentException("Expected at least " + minNumCols
        + " columns but no more than " + maxNumCols + " in the colon-separated value '" + optVal
        + "' of the -" + option + " option");
    }
    return cols;
  }
  protected int getNumThreads(String numThreadsStr) {
    return parseInt(numThreadsStr, 1, Short.MAX_VALUE);
  }
  public byte[][] getColumnFamilies() {
    return families;
  }
  /**
   * Apply column family options such as Bloom filters, compression, and data block encoding.
   */
  protected void applyColumnFamilyOptions(TableName tableName, byte[][] columnFamilies)
    throws IOException {
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Admin admin = conn.getAdmin()) {
      TableDescriptor tableDesc = admin.getDescriptor(tableName);
      LOG.info("Disabling table " + tableName);
      admin.disableTable(tableName);
      for (byte[] cf : columnFamilies) {
        ColumnFamilyDescriptor columnDesc = tableDesc.getColumnFamily(cf);
        boolean isNewCf = columnDesc == null;
        ColumnFamilyDescriptorBuilder columnDescBuilder = isNewCf
          ? ColumnFamilyDescriptorBuilder.newBuilder(cf)
          : ColumnFamilyDescriptorBuilder.newBuilder(columnDesc);
        if (bloomType != null) {
          columnDescBuilder.setBloomFilterType(bloomType);
        }
        if (compressAlgo != null) {
          columnDescBuilder.setCompressionType(compressAlgo);
        }
        if (dataBlockEncodingAlgo != null) {
          columnDescBuilder.setDataBlockEncoding(dataBlockEncodingAlgo);
        }
        columnDescBuilder.setInMemory(inMemoryCF);
        if (cipher != null) {
          // Generate a random data key and store it wrapped for the current user.
          byte[] keyBytes = new byte[cipher.getKeyLength()];
          new SecureRandom().nextBytes(keyBytes);
          columnDescBuilder.setEncryptionType(cipher.getName());
          columnDescBuilder.setEncryptionKey(EncryptionUtil.wrapKey(conf,
            User.getCurrent().getShortName(), new SecretKeySpec(keyBytes, cipher.getName())));
        }
        if (mobThreshold >= 0) {
          columnDescBuilder.setMobEnabled(true);
          columnDescBuilder.setMobThreshold(mobThreshold);
        }
        if (isNewCf) {
          admin.addColumnFamily(tableName, columnDescBuilder.build());
        } else {
          admin.modifyColumnFamily(tableName, columnDescBuilder.build());
        }
      }
      LOG.info("Enabling table " + tableName);
      admin.enableTable(tableName);
    }
  }
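  // For example (illustrative): with "-encryption AES", the loop above generates a random data
  // key per column family, sets the encryption type to the cipher name, and stores the key
  // wrapped for the current user via EncryptionUtil.wrapKey().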
  @Override
  protected void addOptions() {
    addOptNoArg("v", OPT_VERBOSE, "Will display a full readout of logs, including ZooKeeper");
    addOptWithArg(OPT_ZK_QUORUM,
      "ZK quorum as comma-separated host names without port numbers");
    addOptWithArg(OPT_ZK_PARENT_NODE, "name of parent znode in zookeeper");
    addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
    addOptWithArg(OPT_COLUMN_FAMILIES, "The name of the column families to use separated by comma");
    addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
    addOptWithArg(OPT_READ, OPT_USAGE_READ);
    addOptWithArg(OPT_UPDATE, OPT_USAGE_UPDATE);
    addOptNoArg(OPT_INIT_ONLY, "Initialize the test table only, don't do any loading");
    addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
    addOptWithArg(OPT_BLOOM_PARAM, "the parameter of bloom filter type");
    addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
    addOptWithArg(HFileTestUtil.OPT_DATA_BLOCK_ENCODING,
      HFileTestUtil.OPT_DATA_BLOCK_ENCODING_USAGE);
    addOptWithArg(OPT_MAX_READ_ERRORS, "The maximum number of read errors "
      + "to tolerate before terminating all reader threads. The default is "
      + MultiThreadedReader.DEFAULT_MAX_ERRORS + ".");
    addOptWithArg(OPT_MULTIGET,
      "Whether to use multi-gets as opposed to separate gets for every column in a row");
    addOptWithArg(OPT_KEY_WINDOW, "The 'key window' to maintain between "
      + "reads and writes for concurrent write/read workload. The default is "
      + MultiThreadedReader.DEFAULT_KEY_WINDOW + ".");

    addOptNoArg(OPT_MULTIPUT,
      "Whether to use multi-puts as opposed to separate puts for every column in a row");
    addOptNoArg(OPT_BATCHUPDATE,
      "Whether to use batch as opposed to separate updates for every column in a row");
    addOptNoArg(OPT_INMEMORY, OPT_USAGE_IN_MEMORY);
    addOptWithArg(OPT_GENERATOR, OPT_GENERATOR_USAGE);
    addOptWithArg(OPT_WRITER, OPT_WRITER_USAGE);
    addOptWithArg(OPT_UPDATER, OPT_UPDATER_USAGE);
    addOptWithArg(OPT_READER, OPT_READER_USAGE);

    addOptWithArg(OPT_NUM_KEYS, "The number of keys to read/write");
    addOptWithArg(OPT_START_KEY, "The first key to read/write "
      + "(a 0-based index). The default value is " + DEFAULT_START_KEY + ".");
    addOptNoArg(OPT_SKIP_INIT, "Skip the initialization; assume test table already exists");

    addOptWithArg(NUM_TABLES,
      "A positive integer number. When a number n is specified, load test "
        + "tool will load n tables in parallel. -tn parameter value becomes "
        + "table name prefix. Each table name is in format <tn>_1...<tn>_n");

    addOptWithArg(OPT_ENCRYPTION, OPT_ENCRYPTION_USAGE);
    addOptNoArg(OPT_DEFERRED_LOG_FLUSH, OPT_DEFERRED_LOG_FLUSH_USAGE);
    addOptWithArg(OPT_NUM_REGIONS_PER_SERVER, OPT_NUM_REGIONS_PER_SERVER_USAGE);
    addOptWithArg(OPT_REGION_REPLICATION, OPT_REGION_REPLICATION_USAGE);
    addOptWithArg(OPT_REGION_REPLICA_ID, OPT_REGION_REPLICA_ID_USAGE);
    addOptWithArg(OPT_MOB_THRESHOLD, OPT_MOB_THRESHOLD_USAGE);
  }
  @Override
  protected CommandLineParser newParser() {
    // Commons-CLI lacks the capability to handle combinations of options, so we do it ourselves
    // Validate in parse() to get helpful error messages instead of exploding in processOptions()
    return new DefaultParser() {
      @Override
      public CommandLine parse(Options opts, String[] args, Properties props, boolean stop)
        throws ParseException {
        CommandLine cl = super.parse(opts, args, props, stop);

        boolean isReadWriteUpdate =
          cl.hasOption(OPT_READ) || cl.hasOption(OPT_WRITE) || cl.hasOption(OPT_UPDATE);
        boolean isInitOnly = cl.hasOption(OPT_INIT_ONLY);

        if (!isInitOnly && !isReadWriteUpdate) {
          throw new MissingOptionException("Must specify either -" + OPT_INIT_ONLY
            + " or at least one of -" + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE);
        }

        if (isInitOnly && isReadWriteUpdate) {
          throw new AlreadySelectedException(OPT_INIT_ONLY + " cannot be specified with any of -"
            + OPT_READ + ", -" + OPT_WRITE + ", -" + OPT_UPDATE);
        }

        if (isReadWriteUpdate && !cl.hasOption(OPT_NUM_KEYS)) {
          throw new MissingOptionException(OPT_NUM_KEYS + " must be specified in read/write mode.");
        }

        return cl;
      }
    };
  }
  @Override
  protected void processOptions(CommandLine cmd) {
    this.cmd = cmd;

    tableName = TableName.valueOf(cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME));

    if (cmd.hasOption(OPT_COLUMN_FAMILIES)) {
      String[] list = cmd.getOptionValue(OPT_COLUMN_FAMILIES).split(",");
      families = new byte[list.length][];
      for (int i = 0; i < list.length; i++) {
        families[i] = Bytes.toBytes(list[i]);
      }
    } else {
      families = HFileTestUtil.DEFAULT_COLUMN_FAMILIES;
    }

    isVerbose = cmd.hasOption(OPT_VERBOSE);
    isWrite = cmd.hasOption(OPT_WRITE);
    isRead = cmd.hasOption(OPT_READ);
    isUpdate = cmd.hasOption(OPT_UPDATE);
    isInitOnly = cmd.hasOption(OPT_INIT_ONLY);
    deferredLogFlush = cmd.hasOption(OPT_DEFERRED_LOG_FLUSH);

    if (!isInitOnly) {
      startKey = parseLong(cmd.getOptionValue(OPT_START_KEY, String.valueOf(DEFAULT_START_KEY)),
        0, Long.MAX_VALUE);
      long numKeys = parseLong(cmd.getOptionValue(OPT_NUM_KEYS), 1, Long.MAX_VALUE - startKey);
      endKey = startKey + numKeys;
      isSkipInit = cmd.hasOption(OPT_SKIP_INIT);
      System.out.println("Key range: [" + startKey + ".." + (endKey - 1) + "]");
    }

    parseColumnFamilyOptions(cmd);

    if (isWrite) {
      String[] writeOpts = splitColonSeparated(OPT_WRITE, 2, 3);

      int colIndex = 0;
      minColsPerKey = 1;
      maxColsPerKey = 2 * Integer.parseInt(writeOpts[colIndex++]);
      int avgColDataSize = parseInt(writeOpts[colIndex++], 1, Integer.MAX_VALUE);
      minColDataSize = avgColDataSize / 2;
      maxColDataSize = avgColDataSize * 3 / 2;

      if (colIndex < writeOpts.length) {
        numWriterThreads = getNumThreads(writeOpts[colIndex++]);
      }

      isMultiPut = cmd.hasOption(OPT_MULTIPUT);

      if (cmd.hasOption(OPT_MOB_THRESHOLD)) {
        mobThreshold = Integer.parseInt(cmd.getOptionValue(OPT_MOB_THRESHOLD));
      }

      System.out.println("Multi-puts: " + isMultiPut);
      System.out.println("Columns per key: " + minColsPerKey + ".." + maxColsPerKey);
      System.out.println("Data size per column: " + minColDataSize + ".." + maxColDataSize);
    }

    if (isUpdate) {
      String[] mutateOpts = splitColonSeparated(OPT_UPDATE, 1, 3);
      int colIndex = 0;
      updatePercent = parseInt(mutateOpts[colIndex++], 0, 100);
      if (colIndex < mutateOpts.length) {
        numUpdaterThreads = getNumThreads(mutateOpts[colIndex++]);
      }

      if (colIndex < mutateOpts.length) {
        ignoreConflicts = parseInt(mutateOpts[colIndex++], 0, 1) == 1;
      }

      isBatchUpdate = cmd.hasOption(OPT_BATCHUPDATE);

      System.out.println("Batch updates: " + isBatchUpdate);
      System.out.println("Percent of keys to update: " + updatePercent);
      System.out.println("Updater threads: " + numUpdaterThreads);
      System.out.println("Ignore nonce conflicts: " + ignoreConflicts);
    }

    if (isRead) {
      String[] readOpts = splitColonSeparated(OPT_READ, 1, 2);
      int colIndex = 0;
      verifyPercent = parseInt(readOpts[colIndex++], 0, 100);
      if (colIndex < readOpts.length) {
        numReaderThreads = getNumThreads(readOpts[colIndex++]);
      }

      if (cmd.hasOption(OPT_MAX_READ_ERRORS)) {
        maxReadErrors = parseInt(cmd.getOptionValue(OPT_MAX_READ_ERRORS), 0, Integer.MAX_VALUE);
      }

      if (cmd.hasOption(OPT_KEY_WINDOW)) {
        keyWindow = parseInt(cmd.getOptionValue(OPT_KEY_WINDOW), 0, Integer.MAX_VALUE);
      }

      if (cmd.hasOption(OPT_MULTIGET)) {
        multiGetBatchSize = parseInt(cmd.getOptionValue(OPT_MULTIGET), 0, Integer.MAX_VALUE);
      }

      System.out.println("Multi-gets (value of 1 means no multigets): " + multiGetBatchSize);
      System.out.println("Percent of keys to verify: " + verifyPercent);
      System.out.println("Reader threads: " + numReaderThreads);
    }

    if (cmd.hasOption(NUM_TABLES)) {
      numTables = parseInt(cmd.getOptionValue(NUM_TABLES), 1, Short.MAX_VALUE);
    }

    numRegionsPerServer = DEFAULT_NUM_REGIONS_PER_SERVER;
    if (cmd.hasOption(OPT_NUM_REGIONS_PER_SERVER)) {
      numRegionsPerServer = Integer.parseInt(cmd.getOptionValue(OPT_NUM_REGIONS_PER_SERVER));
    }

    regionReplication = 1;
    if (cmd.hasOption(OPT_REGION_REPLICATION)) {
      regionReplication = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICATION));
    }

    regionReplicaId = -1;
    if (cmd.hasOption(OPT_REGION_REPLICA_ID)) {
      regionReplicaId = Integer.parseInt(cmd.getOptionValue(OPT_REGION_REPLICA_ID));
    }
  }
  private void parseColumnFamilyOptions(CommandLine cmd) {
    String dataBlockEncodingStr = cmd.getOptionValue(HFileTestUtil.OPT_DATA_BLOCK_ENCODING);
    dataBlockEncodingAlgo =
      dataBlockEncodingStr == null ? null : DataBlockEncoding.valueOf(dataBlockEncodingStr);

    String compressStr = cmd.getOptionValue(OPT_COMPRESSION);
    compressAlgo =
      compressStr == null ? Compression.Algorithm.NONE : Compression.Algorithm.valueOf(compressStr);

    String bloomStr = cmd.getOptionValue(OPT_BLOOM);
    bloomType = bloomStr == null ? BloomType.ROW : BloomType.valueOf(bloomStr);

    if (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH) {
      if (!cmd.hasOption(OPT_BLOOM_PARAM)) {
        LOG.error("the parameter of bloom filter {} is not specified", bloomType.name());
      } else {
        conf.set(BloomFilterUtil.PREFIX_LENGTH_KEY, cmd.getOptionValue(OPT_BLOOM_PARAM));
      }
    }

    inMemoryCF = cmd.hasOption(OPT_INMEMORY);
    if (cmd.hasOption(OPT_ENCRYPTION)) {
      cipher = Encryption.getCipher(conf, cmd.getOptionValue(OPT_ENCRYPTION));
    }
  }
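  // For example (illustrative): "-bloom ROWPREFIX_FIXED_LENGTH -bloom_param 10" selects the
  // fixed-length row-prefix Bloom filter and records the prefix length of 10 under
  // BloomFilterUtil.PREFIX_LENGTH_KEY.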
  public void initTestTable() throws IOException {
    Durability durability = Durability.USE_DEFAULT;
    if (deferredLogFlush) {
      durability = Durability.ASYNC_WAL;
    }

    HBaseTestingUtil.createPreSplitLoadTestTable(conf, tableName, getColumnFamilies(),
      compressAlgo, dataBlockEncodingAlgo, numRegionsPerServer, regionReplication, durability);
    applyColumnFamilyOptions(tableName, getColumnFamilies());
  }
  @Override
  protected int doWork() throws IOException {
    if (!isVerbose) {
      Log4jUtils.setLogLevel(ZooKeeper.class.getName(), "WARN");
    }
    if (numTables > 1) {
      return parallelLoadTables();
    } else {
      return loadTable();
    }
  }
  protected int loadTable() throws IOException {
    if (cmd.hasOption(OPT_ZK_QUORUM)) {
      conf.set(HConstants.ZOOKEEPER_QUORUM, cmd.getOptionValue(OPT_ZK_QUORUM));
    }
    if (cmd.hasOption(OPT_ZK_PARENT_NODE)) {
      conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, cmd.getOptionValue(OPT_ZK_PARENT_NODE));
    }

    if (isInitOnly) {
      LOG.info("Initializing only; no reads or writes");
      initTestTable();
      return EXIT_SUCCESS;
    }

    if (!isSkipInit) {
      initTestTable();
    }

    LoadTestDataGenerator dataGen = null;
    if (cmd.hasOption(OPT_GENERATOR)) {
      String[] clazzAndArgs = cmd.getOptionValue(OPT_GENERATOR).split(COLON);
      dataGen = getLoadGeneratorInstance(clazzAndArgs[0]);
      String[] args;
      if (dataGen instanceof LoadTestDataGeneratorWithACL) {
        LOG.info("Using LoadTestDataGeneratorWithACL");
        if (User.isHBaseSecurityEnabled(conf)) {
          LOG.info("Security is enabled");
          authnFileName = clazzAndArgs[1];
          superUser = clazzAndArgs[2];
          userNames = clazzAndArgs[3];
          args = Arrays.copyOfRange(clazzAndArgs, 2, clazzAndArgs.length);
          Properties authConfig = new Properties();
          authConfig.load(this.getClass().getClassLoader().getResourceAsStream(authnFileName));
          try {
            addAuthInfoToConf(authConfig, conf, superUser, userNames);
          } catch (IOException exp) {
            LOG.error(exp.toString(), exp);
            return EXIT_FAILURE;
          }
          userOwner = User.create(HBaseKerberosUtils.loginAndReturnUGI(conf, superUser));
        } else {
          superUser = clazzAndArgs[1];
          userNames = clazzAndArgs[2];
          args = Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
          userOwner = User.createUserForTesting(conf, superUser, new String[0]);
        }
      } else {
        args = clazzAndArgs.length == 1
          ? new String[0]
          : Arrays.copyOfRange(clazzAndArgs, 1, clazzAndArgs.length);
      }
      dataGen.initialize(args);
    } else {
      // Default DataGenerator is MultiThreadedAction.DefaultDataGenerator
      dataGen = new MultiThreadedAction.DefaultDataGenerator(minColDataSize, maxColDataSize,
        minColsPerKey, maxColsPerKey, families);
    }

    if (userOwner != null) {
      LOG.info("Granting permissions for user " + userOwner.getShortName());
      Permission.Action[] actions = { Permission.Action.ADMIN, Permission.Action.CREATE,
        Permission.Action.READ, Permission.Action.WRITE };
      try {
        AccessControlClient.grant(ConnectionFactory.createConnection(conf), tableName,
          userOwner.getShortName(), null, null, actions);
      } catch (Throwable e) {
        LOG.error(HBaseMarkers.FATAL,
          "Error in granting permission for the user " + userOwner.getShortName(), e);
        return EXIT_FAILURE;
      }
    }

    if (userNames != null) {
      // This will be comma separated list of expressions.
      String users[] = userNames.split(",");
      User user = null;
      for (String userStr : users) {
        if (User.isHBaseSecurityEnabled(conf)) {
          user = User.create(HBaseKerberosUtils.loginAndReturnUGI(conf, userStr));
        } else {
          user = User.createUserForTesting(conf, userStr, new String[0]);
        }
      }
    }

    if (isWrite) {
      if (userOwner != null) {
        writerThreads = new MultiThreadedWriterWithACL(dataGen, conf, tableName, userOwner);
      } else {
        String writerClass = null;
        if (cmd.hasOption(OPT_WRITER)) {
          writerClass = cmd.getOptionValue(OPT_WRITER);
        } else {
          writerClass = MultiThreadedWriter.class.getCanonicalName();
        }
        writerThreads = getMultiThreadedWriterInstance(writerClass, dataGen);
      }
      writerThreads.setMultiPut(isMultiPut);
    }

    if (isUpdate) {
      if (userOwner != null) {
        updaterThreads = new MultiThreadedUpdaterWithACL(dataGen, conf, tableName, updatePercent,
          userOwner, userNames);
      } else {
        String updaterClass = null;
        if (cmd.hasOption(OPT_UPDATER)) {
          updaterClass = cmd.getOptionValue(OPT_UPDATER);
        } else {
          updaterClass = MultiThreadedUpdater.class.getCanonicalName();
        }
        updaterThreads = getMultiThreadedUpdaterInstance(updaterClass, dataGen);
      }
      updaterThreads.setBatchUpdate(isBatchUpdate);
      updaterThreads.setIgnoreNonceConflicts(ignoreConflicts);
    }

    if (isRead) {
      if (userOwner != null) {
        readerThreads = new MultiThreadedReaderWithACL(dataGen, conf, tableName, verifyPercent,
          userNames);
      } else {
        String readerClass = null;
        if (cmd.hasOption(OPT_READER)) {
          readerClass = cmd.getOptionValue(OPT_READER);
        } else {
          readerClass = MultiThreadedReader.class.getCanonicalName();
        }
        readerThreads = getMultiThreadedReaderInstance(readerClass, dataGen);
      }
      readerThreads.setMaxErrors(maxReadErrors);
      readerThreads.setKeyWindow(keyWindow);
      readerThreads.setMultiGetBatchSize(multiGetBatchSize);
      readerThreads.setRegionReplicaId(regionReplicaId);
    }

    if (isUpdate && isWrite) {
      LOG.info("Concurrent write/update workload: making updaters aware of the write point");
      updaterThreads.linkToWriter(writerThreads);
    }

    if (isRead && (isUpdate || isWrite)) {
      LOG.info("Concurrent write/read workload: making readers aware of the write point");
      readerThreads.linkToWriter(isUpdate ? updaterThreads : writerThreads);
    }

    if (isWrite) {
      System.out.println("Starting to write data...");
      writerThreads.start(startKey, endKey, numWriterThreads);
    }

    if (isUpdate) {
      LOG.info("Starting to mutate data...");
      System.out.println("Starting to mutate data...");
      // TODO : currently append and increment operations not tested with tags
      // Will update this after it is done
      updaterThreads.start(startKey, endKey, numUpdaterThreads);
    }

    if (isRead) {
      System.out.println("Starting to read data...");
      readerThreads.start(startKey, endKey, numReaderThreads);
    }

    if (isWrite) {
      writerThreads.waitForFinish();
    }

    if (isUpdate) {
      updaterThreads.waitForFinish();
    }

    if (isRead) {
      readerThreads.waitForFinish();
    }

    boolean success = true;
    if (isWrite) {
      success = success && writerThreads.getNumWriteFailures() == 0;
    }
    if (isUpdate) {
      success = success && updaterThreads.getNumWriteFailures() == 0;
    }
    if (isRead) {
      success =
        success && readerThreads.getNumReadErrors() == 0 && readerThreads.getNumReadFailures() == 0;
    }
    return success ? EXIT_SUCCESS : EXIT_FAILURE;
  }
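  // For example (illustrative; the generator class name is hypothetical):
  //   -generator org.example.MyDataGenerator:arg1:arg2
  // instantiates MyDataGenerator reflectively and passes { "arg1", "arg2" } to its
  // initialize(args) method, as handled in loadTable() above.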
  private LoadTestDataGenerator getLoadGeneratorInstance(String clazzName) throws IOException {
    try {
      Class<?> clazz = Class.forName(clazzName);
      Constructor<?> constructor =
        clazz.getConstructor(int.class, int.class, int.class, int.class, byte[][].class);
      return (LoadTestDataGenerator) constructor.newInstance(minColDataSize, maxColDataSize,
        minColsPerKey, maxColsPerKey, families);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }
  private MultiThreadedWriter getMultiThreadedWriterInstance(String clazzName,
    LoadTestDataGenerator dataGen) throws IOException {
    try {
      Class<?> clazz = Class.forName(clazzName);
      Constructor<?> constructor =
        clazz.getConstructor(LoadTestDataGenerator.class, Configuration.class, TableName.class);
      return (MultiThreadedWriter) constructor.newInstance(dataGen, conf, tableName);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }
  private MultiThreadedUpdater getMultiThreadedUpdaterInstance(String clazzName,
    LoadTestDataGenerator dataGen) throws IOException {
    try {
      Class<?> clazz = Class.forName(clazzName);
      Constructor<?> constructor = clazz.getConstructor(LoadTestDataGenerator.class,
        Configuration.class, TableName.class, double.class);
      return (MultiThreadedUpdater) constructor.newInstance(dataGen, conf, tableName,
        updatePercent);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }
  private MultiThreadedReader getMultiThreadedReaderInstance(String clazzName,
    LoadTestDataGenerator dataGen) throws IOException {
    try {
      Class<?> clazz = Class.forName(clazzName);
      Constructor<?> constructor = clazz.getConstructor(LoadTestDataGenerator.class,
        Configuration.class, TableName.class, double.class);
      return (MultiThreadedReader) constructor.newInstance(dataGen, conf, tableName, verifyPercent);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }
  public static void main(String[] args) {
    new LoadTestTool().doStaticMain(args);
  }
  /**
   * When NUM_TABLES is specified, the function starts multiple worker threads which individually
   * start a LoadTestTool instance to load a table. Each table name is in format <tn>_<index>. For
   * example, with "-tn test -num_tables 2", the table names will be "test_1" and "test_2".
   * @throws IOException if one of the load tasks is unable to complete
   */
  private int parallelLoadTables() throws IOException {
    // create new command args
    String tableName = cmd.getOptionValue(OPT_TABLE_NAME, DEFAULT_TABLE_NAME);
    String[] newArgs = null;
    if (!cmd.hasOption(LoadTestTool.OPT_TABLE_NAME)) {
      newArgs = new String[cmdLineArgs.length + 2];
      newArgs[0] = "-" + LoadTestTool.OPT_TABLE_NAME;
      newArgs[1] = LoadTestTool.DEFAULT_TABLE_NAME;
      System.arraycopy(cmdLineArgs, 0, newArgs, 2, cmdLineArgs.length);
    } else {
      newArgs = cmdLineArgs;
    }

    int tableNameValueIndex = -1;
    for (int j = 0; j < newArgs.length; j++) {
      if (newArgs[j].endsWith(OPT_TABLE_NAME)) {
        tableNameValueIndex = j + 1;
      } else if (newArgs[j].endsWith(NUM_TABLES)) {
        // change NUM_TABLES to 1 so that each worker loads one table
        newArgs[j + 1] = "1";
      }
    }

    // starting to load multiple tables
    List<WorkerThread> workers = new ArrayList<>();
    for (int i = 0; i < numTables; i++) {
      String[] workerArgs = newArgs.clone();
      workerArgs[tableNameValueIndex] = tableName + "_" + (i + 1);
      WorkerThread worker = new WorkerThread(i, workerArgs);
      workers.add(worker);
      LOG.info(worker + " starting");
      worker.start();
    }

    // wait for all workers finish
    LOG.info("Waiting for worker threads to finish");
    for (WorkerThread t : workers) {
      try {
        t.join();
      } catch (InterruptedException ie) {
        IOException iie = new InterruptedIOException();
        iie.initCause(ie);
        throw iie;
      }
    }

    checkForErrors();
    return EXIT_SUCCESS;
  }
  // If an exception is thrown by one of worker threads, it will be
  // stored here.
  protected AtomicReference<Throwable> thrown = new AtomicReference<>();
  private void workerThreadError(Throwable t) {
    thrown.compareAndSet(null, t);
  }
  /**
   * Check for errors in the writer threads. If any is found, rethrow it.
   */
  private void checkForErrors() throws IOException {
    Throwable thrown = this.thrown.get();
    if (thrown == null) {
      return;
    }
    if (thrown instanceof IOException) {
      throw (IOException) thrown;
    } else {
      throw new RuntimeException(thrown);
    }
  }
  class WorkerThread extends Thread {
    private String[] workerArgs;

    WorkerThread(int i, String[] args) {
      super("WorkerThread-" + i);
      workerArgs = args;
    }

    @Override
    public void run() {
      try {
        int ret = ToolRunner.run(HBaseConfiguration.create(), new LoadTestTool(), workerArgs);
        if (ret != 0) {
          throw new RuntimeException("LoadTestTool exit with non-zero return code.");
        }
      } catch (Exception ex) {
        LOG.error("Error in worker thread", ex);
        workerThreadError(ex);
      }
    }
  }
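  // Illustrative contents of the properties file consumed by addAuthInfoToConf below; the key
  // names follow the "hbase.<user>.keytab.file" / "hbase.<user>.kerberos.principal" pattern
  // checked in the method, but the paths and principals are hypothetical:
  //   hbase.user1.keytab.file=/path/to/user1.keytab
  //   hbase.user1.kerberos.principal=user1/_HOST@EXAMPLE.COM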
  private void addAuthInfoToConf(Properties authConfig, Configuration conf, String owner,
    String userList) throws IOException {
    List<String> users = new ArrayList<>(Arrays.asList(userList.split(",")));
    users.add(owner);
    for (String user : users) {
      String keyTabFileConfKey = "hbase." + user + ".keytab.file";
      String principalConfKey = "hbase." + user + ".kerberos.principal";
      if (!authConfig.containsKey(keyTabFileConfKey) || !authConfig.containsKey(principalConfKey)) {
        throw new IOException("Authentication configs missing for user : " + user);
      }
    }
    for (String key : authConfig.stringPropertyNames()) {
      conf.set(key, authConfig.getProperty(key));
    }
    LOG.debug("Added authentication properties to config successfully.");
  }
}