/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.util;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
/**
 * Common base class for reader and writer parts of multi-thread HBase load test (See
 * LoadTestTool).
 */
public abstract class MultiThreadedAction {
  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedAction.class);

  protected final TableName tableName;
  protected final Configuration conf;
  protected final Connection connection; // all reader / writer threads will share this connection

  protected int numThreads = 1;

  /** The start key of the key range, inclusive */
  protected long startKey = 0;

  /** The end key of the key range, exclusive */
  protected long endKey = 1;

  protected AtomicInteger numThreadsWorking = new AtomicInteger();
  protected AtomicLong numKeys = new AtomicLong();
  protected AtomicLong numCols = new AtomicLong();
  protected AtomicLong totalOpTimeMs = new AtomicLong();
  protected boolean verbose = false;

  protected LoadTestDataGenerator dataGenerator = null;
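
  // Subclasses' worker threads are expected to bump numKeys/numCols/totalOpTimeMs as they
  // operate; the ProgressReporter below only reads these counters, which is why plain
  // atomics suffice here without further locking.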
  /**
   * Default implementation of LoadTestDataGenerator that uses LoadTestKVGenerator, fixed set of
   * column families, and random number of columns in range. The table for it can be created
   * manually or, for example, via
   * {@link org.apache.hadoop.hbase.HBaseTestingUtil#createPreSplitLoadTestTable(Configuration,
   * TableName, byte[], org.apache.hadoop.hbase.io.compress.Compression.Algorithm,
   * org.apache.hadoop.hbase.io.encoding.DataBlockEncoding)}
   */
  public static class DefaultDataGenerator extends LoadTestDataGenerator {
    private byte[][] columnFamilies = null;
    private int minColumnsPerKey;
    private int maxColumnsPerKey;
    private final Random random = new Random();

    public DefaultDataGenerator(int minValueSize, int maxValueSize, int minColumnsPerKey,
      int maxColumnsPerKey, byte[]... columnFamilies) {
      super(minValueSize, maxValueSize);
      this.columnFamilies = columnFamilies;
      this.minColumnsPerKey = minColumnsPerKey;
      this.maxColumnsPerKey = maxColumnsPerKey;
    }

    public DefaultDataGenerator(byte[]... columnFamilies) {
      // Default values for tests that didn't care to provide theirs.
      this(256, 1024, 1, 10, columnFamilies);
    }

    @Override
    public byte[] getDeterministicUniqueKey(long keyBase) {
      return Bytes.toBytes(LoadTestKVGenerator.md5PrefixedKey(keyBase));
    }

    @Override
    public byte[][] getColumnFamilies() {
      return columnFamilies;
    }

    @Override
    public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
      // Pick a column count uniformly in [minColumnsPerKey, maxColumnsPerKey].
      int numColumns = minColumnsPerKey + random.nextInt(maxColumnsPerKey - minColumnsPerKey + 1);
      byte[][] columns = new byte[numColumns][];
      for (int i = 0; i < numColumns; ++i) {
        columns[i] = Bytes.toBytes(Integer.toString(i));
      }
      return columns;
    }

    @Override
    public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
      return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
      return LoadTestKVGenerator.verify(value, rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
      return (columnSet.size() >= minColumnsPerKey) && (columnSet.size() <= maxColumnsPerKey);
    }
  }
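
  // A minimal usage sketch (family name, key range, and thread count here are illustrative,
  // not taken from this file): a LoadTestTool-style driver builds a generator and hands it
  // to a concrete MultiThreadedAction subclass such as MultiThreadedWriter:
  //
  //   LoadTestDataGenerator gen = new DefaultDataGenerator(Bytes.toBytes("test_cf"));
  //   MultiThreadedWriter writer = new MultiThreadedWriter(gen, conf, tableName);
  //   writer.start(0, 1_000_000, 20); // act on keys [0, 1_000_000) with 20 threads
  //   writer.waitForFinish();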
  /** "R" or "W" */
  private String actionLetter;

  /** Whether we need to print out Hadoop Streaming-style counters */
  private boolean streamingCounters;

  public static final int REPORTING_INTERVAL_MS = 5000;

  public MultiThreadedAction(LoadTestDataGenerator dataGen, Configuration conf,
    TableName tableName, String actionLetter) throws IOException {
    this.conf = conf;
    this.dataGenerator = dataGen;
    this.tableName = tableName;
    this.actionLetter = actionLetter;
    this.connection = ConnectionFactory.createConnection(conf);
  }
  public void start(long startKey, long endKey, int numThreads) throws IOException {
    this.startKey = startKey;
    this.endKey = endKey;
    this.numThreads = numThreads;
    (new Thread(new ProgressReporter(actionLetter),
      "MultiThreadedAction-ProgressReporter-" + EnvironmentEdgeManager.currentTime())).start();
  }
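
  // A worked example for the formatter below: formatTime(3725000) yields "01:02:05",
  // since 3725000 ms is 3725 s = 1 h (3725 / 3600), 2 min ((3725 % 3600) / 60),
  // and 5 s (3725 % 60).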
  private static String formatTime(long elapsedTime) {
    String format = String.format("%%0%dd", 2);
    elapsedTime = elapsedTime / 1000;
    String seconds = String.format(format, elapsedTime % 60);
    String minutes = String.format(format, (elapsedTime % 3600) / 60);
    String hours = String.format(format, elapsedTime / 3600);
    String time = hours + ":" + minutes + ":" + seconds;
    return time;
  }
  /** Asynchronously reports progress */
  private class ProgressReporter implements Runnable {

    private String reporterId = "";

    public ProgressReporter(String id) {
      this.reporterId = id;
    }

    @Override
    public void run() {
      long startTime = EnvironmentEdgeManager.currentTime();
      long priorNumKeys = 0;
      long priorCumulativeOpTime = 0;
      int priorAverageKeysPerSecond = 0;

      // Give other threads time to start.
      Threads.sleep(REPORTING_INTERVAL_MS);

      while (numThreadsWorking.get() != 0) {
        String threadsLeft = "[" + reporterId + ":" + numThreadsWorking.get() + "] ";
        if (numKeys.get() == 0) {
          LOG.info(threadsLeft + "Number of keys = 0");
        } else {
          long numKeys = MultiThreadedAction.this.numKeys.get();
          long time = EnvironmentEdgeManager.currentTime() - startTime;
          long totalOpTime = totalOpTimeMs.get();

          long numKeysDelta = numKeys - priorNumKeys;
          long totalOpTimeDelta = totalOpTime - priorCumulativeOpTime;

          double averageKeysPerSecond = (time > 0) ? (numKeys * 1000 / time) : 0;

          LOG.info(threadsLeft + "Keys=" + StringUtils.humanReadableInt(numKeys) + ", cols="
            + StringUtils.humanReadableInt(numCols.get()) + ", time=" + formatTime(time)
            + ((numKeys > 0 && time > 0)
              ? (" Overall: [" + "keys/s= " + numKeys * 1000 / time + ", latency="
                + String.format("%.2f", (double) totalOpTime / (double) numKeys) + " ms]")
              : "")
            + ((numKeysDelta > 0)
              ? (" Current: [" + "keys/s=" + numKeysDelta * 1000 / REPORTING_INTERVAL_MS
                + ", latency="
                + String.format("%.2f", (double) totalOpTimeDelta / (double) numKeysDelta)
                + " ms]")
              : "")
            + progressInfo());

          if (streamingCounters) {
            printStreamingCounters(numKeysDelta,
              averageKeysPerSecond - priorAverageKeysPerSecond);
          }

          priorNumKeys = numKeys;
          priorCumulativeOpTime = totalOpTime;
          priorAverageKeysPerSecond = (int) averageKeysPerSecond;
        }

        Threads.sleep(REPORTING_INTERVAL_MS);
      }
    }
    private void printStreamingCounters(long numKeysDelta, double avgKeysPerSecondDelta) {
      // Write stats in a format that can be interpreted as counters by
      // streaming map-reduce jobs.
      System.err.println("reporter:counter:numKeys," + reporterId + "," + numKeysDelta);
      System.err.println("reporter:counter:numCols," + reporterId + "," + numCols.get());
      System.err.println(
        "reporter:counter:avgKeysPerSecond," + reporterId + "," + (long) (avgKeysPerSecondDelta));
    }
  }
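
  // The lines above use the Hadoop Streaming counter-update syntax,
  // "reporter:counter:<group>,<counter>,<amount>". For instance, a writer whose
  // reporterId is "W" and that finished 1234 keys in the last interval would emit
  // "reporter:counter:numKeys,W,1234".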
  public void close() {
    if (connection != null) {
      try {
        connection.close();
      } catch (Exception ex) {
        LOG.warn("Could not close the connection: " + ex);
      }
    }
  }

  public void waitForFinish() {
    while (numThreadsWorking.get() != 0) {
      Threads.sleepWithoutInterrupt(1000);
    }
    close();
  }

  public boolean isDone() {
    return (numThreadsWorking.get() == 0);
  }
  protected void startThreads(Collection<? extends Thread> threads) {
    numThreadsWorking.addAndGet(threads.size());
    for (Thread thread : threads) {
      thread.start();
    }
  }

  /** @return the end key of the key range, exclusive */
  public long getEndKey() {
    return endKey;
  }

  /** Returns a task-specific progress string */
  protected abstract String progressInfo();
  protected static void appendToStatus(StringBuilder sb, String desc, long v) {
    if (v == 0) {
      return;
    }
    sb.append(", ");
    sb.append(desc);
    sb.append("=");
    sb.append(v);
  }

  protected static void appendToStatus(StringBuilder sb, String desc, String v) {
    sb.append(", ");
    sb.append(desc);
    sb.append("=");
    sb.append(v);
  }
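
  // For example (given the overloads above), appendToStatus(sb, "errors", 5) appends
  // ", errors=5" to the status line, while appendToStatus(sb, "errors", 0L) appends nothing.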
  /**
   * See {@link #verifyResultAgainstDataGenerator(Result, boolean, boolean)}. Does not verify
   * cf/column integrity.
   */
  public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues) {
    return verifyResultAgainstDataGenerator(result, verifyValues, false);
  }
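
  // Note on the scheme below: MUTATE_INFO is a bookkeeping column the load updater writes
  // into each family, and the checks re-derive per-column hash codes (cfHash + columnHash);
  // an even hash appears to mark the columns the updater actually deleted or
  // checkAndPut-overwrote, so verification mirrors that parity rule.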
  /**
   * Verifies the result from get or scan using the dataGenerator (that was presumably also used
   * to generate said result).
   * @param verifyValues verify that values in the result make sense for row/cf/column combination
   * @param verifyCfAndColumnIntegrity verify that cf/column set in the result is complete. Note
   *                                   that to use this multiPut should be used, or verification
   *                                   has to happen after writes, otherwise there can be races.
   * @return true if the values of row result makes sense for row/cf/column combination and true
   *         if the cf/column set in the result is complete, false otherwise.
   */
  public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues,
    boolean verifyCfAndColumnIntegrity) {
    String rowKeyStr = Bytes.toString(result.getRow());
    // See if we have any data at all.
    if (result.isEmpty()) {
      LOG.error("Error checking data for key [" + rowKeyStr + "], no data returned");
      printLocations(result);
      return false;
    }

    if (!verifyValues && !verifyCfAndColumnIntegrity) {
      return true; // as long as we have something, we are good.
    }

    // See if we have all the CFs.
    byte[][] expectedCfs = dataGenerator.getColumnFamilies();
    if (verifyCfAndColumnIntegrity && (expectedCfs.length != result.getMap().size())) {
      LOG.error("Error checking data for key [" + rowKeyStr
        + "], bad family count: " + result.getMap().size());
      printLocations(result);
      return false;
    }

    // Verify each column family from get in the result.
    for (byte[] cf : result.getMap().keySet()) {
      String cfStr = Bytes.toString(cf);
      Map<byte[], byte[]> columnValues = result.getFamilyMap(cf);
      if (columnValues == null) {
        LOG.error("Error checking data for key [" + rowKeyStr
          + "], no data for family [" + cfStr + "]]");
        printLocations(result);
        return false;
      }

      Map<String, MutationType> mutateInfo = null;
      if (verifyCfAndColumnIntegrity || verifyValues) {
        if (!columnValues.containsKey(MUTATE_INFO)) {
          LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
            + cfStr + "], column [" + Bytes.toString(MUTATE_INFO) + "]; value is not found");
          printLocations(result);
          return false;
        }

        long cfHash = Arrays.hashCode(cf);
        // Verify deleted columns, and make up column counts if deleted
        byte[] mutateInfoValue = columnValues.remove(MUTATE_INFO);
        mutateInfo = parseMutateInfo(mutateInfoValue);
        for (Map.Entry<String, MutationType> mutate : mutateInfo.entrySet()) {
          if (mutate.getValue() == MutationType.DELETE) {
            byte[] column = Bytes.toBytes(mutate.getKey());
            long columnHash = Arrays.hashCode(column);
            long hashCode = cfHash + columnHash;
            if (hashCode % 2 == 0) {
              if (columnValues.containsKey(column)) {
                LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
                  + cfStr + "], column [" + mutate.getKey() + "]; should be deleted");
                printLocations(result);
                return false;
              }
              byte[] hashCodeBytes = Bytes.toBytes(hashCode);
              columnValues.put(column, hashCodeBytes);
            }
          }
        }

        // Verify increment
        if (!columnValues.containsKey(INCREMENT)) {
          LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
            + cfStr + "], column [" + Bytes.toString(INCREMENT) + "]; value is not found");
          printLocations(result);
          return false;
        }
        long currentValue = Bytes.toLong(columnValues.remove(INCREMENT));
        if (verifyValues) {
          long amount = mutateInfo.isEmpty() ? 0 : cfHash;
          long originalValue = Arrays.hashCode(result.getRow());
          long extra = currentValue - originalValue;
          if (extra != 0 && (amount == 0 || extra % amount != 0)) {
            LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
              + cfStr + "], column [increment], extra [" + extra + "], amount [" + amount + "]");
            printLocations(result);
            return false;
          }
          if (amount != 0 && extra != amount) {
            LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family ["
              + cfStr + "], column [increment], incremented [" + (extra / amount) + "] times");
          }
        }

        // See if we have correct columns.
        if (verifyCfAndColumnIntegrity
          && !dataGenerator.verify(result.getRow(), cf, columnValues.keySet())) {
          String colsStr = "";
          for (byte[] col : columnValues.keySet()) {
            if (colsStr.length() > 0) {
              colsStr += ", ";
            }
            colsStr += "[" + Bytes.toString(col) + "]";
          }
          LOG.error("Error checking data for key [" + rowKeyStr
            + "], bad columns for family [" + cfStr + "]: " + colsStr);
          printLocations(result);
          return false;
        }
        // See if values check out.
        if (verifyValues) {
          for (Map.Entry<byte[], byte[]> kv : columnValues.entrySet()) {
            String column = Bytes.toString(kv.getKey());
            MutationType mutation = mutateInfo.get(column);
            boolean verificationNeeded = true;
            byte[] bytes = kv.getValue();
            if (mutation != null) {
              boolean mutationVerified = true;
              long columnHash = Arrays.hashCode(kv.getKey());
              long hashCode = cfHash + columnHash;
              byte[] hashCodeBytes = Bytes.toBytes(hashCode);
              if (mutation == MutationType.APPEND) {
                int offset = bytes.length - hashCodeBytes.length;
                mutationVerified = offset > 0 && Bytes.equals(hashCodeBytes,
                  0, hashCodeBytes.length, bytes, offset, hashCodeBytes.length);
                if (mutationVerified) {
                  // Count how many times the hash code was appended, then strip the
                  // appended suffixes before value verification.
                  int n = 1;
                  while (true) {
                    int newOffset = offset - hashCodeBytes.length;
                    if (newOffset < 0 || !Bytes.equals(hashCodeBytes, 0,
                      hashCodeBytes.length, bytes, newOffset, hashCodeBytes.length)) {
                      break;
                    }
                    offset = newOffset;
                    n++;
                  }
                  if (n > 1) {
                    LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family ["
                      + cfStr + "], column [" + column + "], appended [" + n + "] times");
                  }
                  byte[] dest = new byte[offset];
                  System.arraycopy(bytes, 0, dest, 0, offset);
                  bytes = dest;
                }
              } else if (hashCode % 2 == 0) { // checkAndPut
                mutationVerified = Bytes.equals(bytes, hashCodeBytes);
                verificationNeeded = false;
              }
              if (!mutationVerified) {
                LOG.error("Error checking data for key [" + rowKeyStr
                  + "], mutation checking failed for column family [" + cfStr + "], column ["
                  + column + "]; mutation [" + mutation + "], hashCode ["
                  + hashCode + "], verificationNeeded ["
                  + verificationNeeded + "]");
                printLocations(result);
                return false;
              }
            } // end of mutation checking
            if (verificationNeeded &&
              !dataGenerator.verify(result.getRow(), cf, kv.getKey(), bytes)) {
              LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
                + cfStr + "], column [" + column + "], mutation [" + mutation
                + "]; value of length " + bytes.length);
              printLocations(result);
              return false;
            }
          }
        }
      }
    }
    return true;
  }
  private void printLocations(Result r) {
    if (r == null) {
      LOG.info("FAILED FOR null Result");
      return;
    }
    LOG.info("FAILED FOR " + resultToString(r) + " Stale " + r.isStale());
    if (r.getRow() == null) {
      return;
    }
    try (RegionLocator locator = connection.getRegionLocator(tableName)) {
      List<HRegionLocation> locs = locator.getRegionLocations(r.getRow());
      for (HRegionLocation h : locs) {
        LOG.info("LOCATION " + h);
      }
    } catch (IOException e) {
      LOG.warn("Couldn't get locations for row " + Bytes.toString(r.getRow()));
    }
  }
  private String resultToString(Result result) {
    StringBuilder sb = new StringBuilder();
    sb.append("cells=");
    if (result.isEmpty()) {
      sb.append("NONE");
      return sb.toString();
    }
    sb.append("{");
    boolean moreThanOne = false;
    for (Cell cell : result.listCells()) {
      if (moreThanOne) {
        sb.append(", ");
      } else {
        moreThanOne = true;
      }
      sb.append(CellUtil.toString(cell, true));
    }
    sb.append("}");
    return sb.toString();
  }
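
  // The MUTATE_INFO value parsed below is a '#'-separated list of "<column>:<type number>"
  // entries. For instance (an illustrative value, not taken from this file), "0:3#5:0"
  // would map column "0" to DELETE and column "5" to APPEND, assuming the protobuf
  // MutationType numbering APPEND = 0 and DELETE = 3.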
  // Parse mutate info into a map of <column name> => <update action>
  private Map<String, MutationType> parseMutateInfo(byte[] mutateInfo) {
    Map<String, MutationType> mi = new HashMap<>();
    if (mutateInfo != null) {
      String mutateInfoStr = Bytes.toString(mutateInfo);
      String[] mutations = mutateInfoStr.split("#");
      for (String mutation : mutations) {
        if (mutation.isEmpty()) continue;
        Preconditions.checkArgument(mutation.contains(":"),
          "Invalid mutation info " + mutation);
        int p = mutation.indexOf(":");
        String column = mutation.substring(0, p);
        MutationType type = MutationType.valueOf(Integer.parseInt(mutation.substring(p + 1)));
        mi.put(column, type);
      }
    }
    return mi;
  }
}