HBASE-26416 Implement a new method for region replication instead of using replay...
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / util / MultiThreadedAction.java
blob 19461434e894f38fa09d74ecc368d8c60e42f271
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;

/**
 * Common base class for reader and writer parts of multi-thread HBase load
 * test (See LoadTestTool).
 */
public abstract class MultiThreadedAction {
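
  // A minimal usage sketch (hedged: MultiThreadedWriter is one concrete
  // subclass in this test package; configuration and table setup are elided):
  //
  //   LoadTestDataGenerator gen = new DefaultDataGenerator(Bytes.toBytes("cf"));
  //   MultiThreadedAction writer = new MultiThreadedWriter(gen, conf, tableName);
  //   writer.start(0, 1_000_000, 16); // keys [0, 1_000_000), 16 writer threads
  //   writer.waitForFinish();         // block until done; also closes the connection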

  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedAction.class);

  protected final TableName tableName;
  protected final Configuration conf;
  protected final Connection connection; // all reader / writer threads will share this connection

  protected int numThreads = 1;

  /** The start key of the key range, inclusive */
  protected long startKey = 0;

  /** The end key of the key range, exclusive */
  protected long endKey = 1;

  protected AtomicInteger numThreadsWorking = new AtomicInteger();
  protected AtomicLong numKeys = new AtomicLong();
  protected AtomicLong numCols = new AtomicLong();
  protected AtomicLong totalOpTimeMs = new AtomicLong();
  protected boolean verbose = false;

  protected LoadTestDataGenerator dataGenerator = null;

  /**
   * Default implementation of LoadTestDataGenerator that uses LoadTestKVGenerator, fixed
   * set of column families, and random number of columns in range. The table for it can
   * be created manually or, for example, via
   * {@link org.apache.hadoop.hbase.HBaseTestingUtil#createPreSplitLoadTestTable(Configuration,
   * TableName, byte[], org.apache.hadoop.hbase.io.compress.Compression.Algorithm,
   * org.apache.hadoop.hbase.io.encoding.DataBlockEncoding)}
   */
  public static class DefaultDataGenerator extends LoadTestDataGenerator {
    private byte[][] columnFamilies = null;
    private int minColumnsPerKey;
    private int maxColumnsPerKey;
    private final Random random = new Random();

    public DefaultDataGenerator(int minValueSize, int maxValueSize,
        int minColumnsPerKey, int maxColumnsPerKey, byte[]... columnFamilies) {
      super(minValueSize, maxValueSize);
      this.columnFamilies = columnFamilies;
      this.minColumnsPerKey = minColumnsPerKey;
      this.maxColumnsPerKey = maxColumnsPerKey;
    }

    public DefaultDataGenerator(byte[]... columnFamilies) {
      // Default values for tests that didn't care to provide theirs.
      this(256, 1024, 1, 10, columnFamilies);
    }

    @Override
    public byte[] getDeterministicUniqueKey(long keyBase) {
      return Bytes.toBytes(LoadTestKVGenerator.md5PrefixedKey(keyBase));
    }

    @Override
    public byte[][] getColumnFamilies() {
      return columnFamilies;
    }

    @Override
    public byte[][] generateColumnsForCf(byte[] rowKey, byte[] cf) {
      int numColumns = minColumnsPerKey + random.nextInt(maxColumnsPerKey - minColumnsPerKey + 1);
      byte[][] columns = new byte[numColumns][];
      for (int i = 0; i < numColumns; ++i) {
        columns[i] = Bytes.toBytes(Integer.toString(i));
      }
      return columns;
    }

    @Override
    public byte[] generateValue(byte[] rowKey, byte[] cf, byte[] column) {
      return kvGenerator.generateRandomSizeValue(rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, byte[] column, byte[] value) {
      return LoadTestKVGenerator.verify(value, rowKey, cf, column);
    }

    @Override
    public boolean verify(byte[] rowKey, byte[] cf, Set<byte[]> columnSet) {
      return (columnSet.size() >= minColumnsPerKey) && (columnSet.size() <= maxColumnsPerKey);
    }
  }

  /** "R" or "W" */
  private String actionLetter;

  /** Whether we need to print out Hadoop Streaming-style counters */
  private boolean streamingCounters;

  public static final int REPORTING_INTERVAL_MS = 5000;

  public MultiThreadedAction(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName,
      String actionLetter) throws IOException {
    this.conf = conf;
    this.dataGenerator = dataGen;
    this.tableName = tableName;
    this.actionLetter = actionLetter;
    this.connection = ConnectionFactory.createConnection(conf);
  }

  public void start(long startKey, long endKey, int numThreads) throws IOException {
    this.startKey = startKey;
    this.endKey = endKey;
    this.numThreads = numThreads;
    (new Thread(new ProgressReporter(actionLetter),
      "MultiThreadedAction-ProgressReporter-" + EnvironmentEdgeManager.currentTime())).start();
  }
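
  // Example of the formatter below: formatTime(3661000) returns "01:01:01"
  // (3661 seconds = 1 h, 1 min, 1 s; the sub-second remainder is truncated
  // by the integer division).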

  private static String formatTime(long elapsedTime) {
    String format = String.format("%%0%dd", 2);
    elapsedTime = elapsedTime / 1000;
    String seconds = String.format(format, elapsedTime % 60);
    String minutes = String.format(format, (elapsedTime % 3600) / 60);
    String hours = String.format(format, elapsedTime / 3600);
    String time = hours + ":" + minutes + ":" + seconds;
    return time;
  }

  /** Asynchronously reports progress */
  private class ProgressReporter implements Runnable {

    private String reporterId = "";

    public ProgressReporter(String id) {
      this.reporterId = id;
    }

    @Override
    public void run() {
      long startTime = EnvironmentEdgeManager.currentTime();
      long priorNumKeys = 0;
      long priorCumulativeOpTime = 0;
      int priorAverageKeysPerSecond = 0;

      // Give other threads time to start.
      Threads.sleep(REPORTING_INTERVAL_MS);

      while (numThreadsWorking.get() != 0) {
        String threadsLeft =
          "[" + reporterId + ":" + numThreadsWorking.get() + "] ";
        if (numKeys.get() == 0) {
          LOG.info(threadsLeft + "Number of keys = 0");
        } else {
          long numKeys = MultiThreadedAction.this.numKeys.get();
          long time = EnvironmentEdgeManager.currentTime() - startTime;
          long totalOpTime = totalOpTimeMs.get();

          long numKeysDelta = numKeys - priorNumKeys;
          long totalOpTimeDelta = totalOpTime - priorCumulativeOpTime;

          double averageKeysPerSecond =
            (time > 0) ? (numKeys * 1000 / time) : 0;

          LOG.info(threadsLeft
            + "Keys="
            + numKeys
            + ", cols="
            + StringUtils.humanReadableInt(numCols.get())
            + ", time="
            + formatTime(time)
            + ((numKeys > 0 && time > 0) ? (" Overall: [" + "keys/s= "
              + numKeys * 1000 / time + ", latency="
              + String.format("%.2f", (double) totalOpTime / (double) numKeys)
              + " ms]") : "")
            + ((numKeysDelta > 0) ? (" Current: [" + "keys/s="
              + numKeysDelta * 1000 / REPORTING_INTERVAL_MS + ", latency="
              + String.format("%.2f", (double) totalOpTimeDelta / (double) numKeysDelta)
              + " ms]") : "")
            + progressInfo());

          if (streamingCounters) {
            printStreamingCounters(numKeysDelta,
              averageKeysPerSecond - priorAverageKeysPerSecond);
          }

          priorNumKeys = numKeys;
          priorCumulativeOpTime = totalOpTime;
          priorAverageKeysPerSecond = (int) averageKeysPerSecond;
        }

        Threads.sleep(REPORTING_INTERVAL_MS);
      }
    }

    private void printStreamingCounters(long numKeysDelta,
        double avgKeysPerSecondDelta) {
      // Write stats in a format that can be interpreted as counters by
      // streaming map-reduce jobs.
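      // Hadoop Streaming reads counter updates from a task's stderr; the line
      // shape is "reporter:counter:<group>,<counter>,<amount>", so here each
      // stat is a group and this reporter's id is the counter name.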
      System.err.println("reporter:counter:numKeys," + reporterId + ","
        + numKeysDelta);
      System.err.println("reporter:counter:numCols," + reporterId + ","
        + numCols.get());
      System.err.println("reporter:counter:avgKeysPerSecond," + reporterId
        + "," + (long) (avgKeysPerSecondDelta));
    }
  }

  public void close() {
    if (connection != null) {
      try {
        connection.close();
      } catch (Exception ex) {
        LOG.warn("Could not close the connection: " + ex);
      }
    }
  }

  public void waitForFinish() {
    while (numThreadsWorking.get() != 0) {
      Threads.sleepWithoutInterrupt(1000);
    }
    close();
  }

  public boolean isDone() {
    return (numThreadsWorking.get() == 0);
  }

  protected void startThreads(Collection<? extends Thread> threads) {
    numThreadsWorking.addAndGet(threads.size());
    for (Thread thread : threads) {
      thread.start();
    }
  }

  /** @return the end key of the key range, exclusive */
  public long getEndKey() {
    return endKey;
  }

  /** Returns a task-specific progress string */
  protected abstract String progressInfo();

  protected static void appendToStatus(StringBuilder sb, String desc,
      long v) {
    if (v == 0) {
      return;
    }
    sb.append(", ");
    sb.append(desc);
    sb.append("=");
    sb.append(v);
  }

  protected static void appendToStatus(StringBuilder sb, String desc,
      String v) {
    sb.append(", ");
    sb.append(desc);
    sb.append("=");
    sb.append(v);
  }

  /**
   * See {@link #verifyResultAgainstDataGenerator(Result, boolean, boolean)}.
   * Does not verify cf/column integrity.
   */
  public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues) {
    return verifyResultAgainstDataGenerator(result, verifyValues, false);
  }
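
  // A sketch of how a reader might use these checks (hedged: "table" and the
  // key accounting live outside this class):
  //
  //   Result res = table.get(new Get(dataGenerator.getDeterministicUniqueKey(key)));
  //   if (!verifyResultAgainstDataGenerator(res, true, true)) {
  //     // missing columns, stale mutations, or a value failing the generator's check
  //   }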

  /**
   * Verifies the result from get or scan using the dataGenerator (that was presumably
   * also used to generate said result).
   * @param verifyValues verify that values in the result make sense for row/cf/column combination
   * @param verifyCfAndColumnIntegrity verify that cf/column set in the result is complete. Note
   *          that multi-put should be used for this, or verification has to happen after writes;
   *          otherwise there can be races.
   * @return true if the values of the row result make sense for the row/cf/column combination and
   *         the cf/column set in the result is complete, false otherwise.
   */
  public boolean verifyResultAgainstDataGenerator(Result result, boolean verifyValues,
      boolean verifyCfAndColumnIntegrity) {
    String rowKeyStr = Bytes.toString(result.getRow());
    // See if we have any data at all.
    if (result.isEmpty()) {
      LOG.error("Error checking data for key [" + rowKeyStr + "], no data returned");
      printLocations(result);
      return false;
    }

    if (!verifyValues && !verifyCfAndColumnIntegrity) {
      return true; // as long as we have something, we are good.
    }

    // See if we have all the CFs.
    byte[][] expectedCfs = dataGenerator.getColumnFamilies();
    if (verifyCfAndColumnIntegrity && (expectedCfs.length != result.getMap().size())) {
      LOG.error("Error checking data for key [" + rowKeyStr
        + "], bad family count: " + result.getMap().size());
      printLocations(result);
      return false;
    }

    // Verify each column family from get in the result.
    for (byte[] cf : result.getMap().keySet()) {
      String cfStr = Bytes.toString(cf);
      Map<byte[], byte[]> columnValues = result.getFamilyMap(cf);
      if (columnValues == null) {
        LOG.error("Error checking data for key [" + rowKeyStr
          + "], no data for family [" + cfStr + "]");
        printLocations(result);
        return false;
      }

      Map<String, MutationType> mutateInfo = null;
      if (verifyCfAndColumnIntegrity || verifyValues) {
        if (!columnValues.containsKey(MUTATE_INFO)) {
          LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
            + cfStr + "], column [" + Bytes.toString(MUTATE_INFO) + "]; value is not found");
          printLocations(result);
          return false;
        }
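
        // Bookkeeping convention (as this check reads it): alongside the data
        // columns, the updater keeps a MUTATE_INFO column per CF recording which
        // columns it mutated and how, plus an INCREMENT column accumulating
        // increments; both are peeled off and checked below.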
        long cfHash = Arrays.hashCode(cf);
        // Verify deleted columns, and make up column counts if deleted
        byte[] mutateInfoValue = columnValues.remove(MUTATE_INFO);
        mutateInfo = parseMutateInfo(mutateInfoValue);
        for (Map.Entry<String, MutationType> mutate : mutateInfo.entrySet()) {
          if (mutate.getValue() == MutationType.DELETE) {
            byte[] column = Bytes.toBytes(mutate.getKey());
            long columnHash = Arrays.hashCode(column);
            long hashCode = cfHash + columnHash;
            if (hashCode % 2 == 0) {
              if (columnValues.containsKey(column)) {
                LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
                  + cfStr + "], column [" + mutate.getKey() + "]; should be deleted");
                printLocations(result);
                return false;
              }
              byte[] hashCodeBytes = Bytes.toBytes(hashCode);
              columnValues.put(column, hashCodeBytes);
            }
          }
        }
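
        // The check below assumes each update round adds cfHash to the INCREMENT
        // column, so the delta from the row's original value (the row-key hash)
        // must be a whole multiple of cfHash.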
        // Verify increment
        if (!columnValues.containsKey(INCREMENT)) {
          LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
            + cfStr + "], column [" + Bytes.toString(INCREMENT) + "]; value is not found");
          printLocations(result);
          return false;
        }
        long currentValue = Bytes.toLong(columnValues.remove(INCREMENT));
        if (verifyValues) {
          long amount = mutateInfo.isEmpty() ? 0 : cfHash;
          long originalValue = Arrays.hashCode(result.getRow());
          long extra = currentValue - originalValue;
          if (extra != 0 && (amount == 0 || extra % amount != 0)) {
            LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
              + cfStr + "], column [increment], extra [" + extra + "], amount [" + amount + "]");
            printLocations(result);
            return false;
          }
          if (amount != 0 && extra != amount) {
            LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family ["
              + cfStr + "], column [increment], incremented [" + (extra / amount) + "] times");
          }
        }

        // See if we have correct columns.
        if (verifyCfAndColumnIntegrity
            && !dataGenerator.verify(result.getRow(), cf, columnValues.keySet())) {
          String colsStr = "";
          for (byte[] col : columnValues.keySet()) {
            if (colsStr.length() > 0) {
              colsStr += ", ";
            }
            colsStr += "[" + Bytes.toString(col) + "]";
          }
          LOG.error("Error checking data for key [" + rowKeyStr
            + "], bad columns for family [" + cfStr + "]: " + colsStr);
          printLocations(result);
          return false;
        }
        // See if values check out.
        if (verifyValues) {
          for (Map.Entry<byte[], byte[]> kv : columnValues.entrySet()) {
            String column = Bytes.toString(kv.getKey());
            MutationType mutation = mutateInfo.get(column);
            boolean verificationNeeded = true;
            byte[] bytes = kv.getValue();
            if (mutation != null) {
              boolean mutationVerified = true;
              long columnHash = Arrays.hashCode(kv.getKey());
              long hashCode = cfHash + columnHash;
              byte[] hashCodeBytes = Bytes.toBytes(hashCode);
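              // An APPEND is assumed to tack hashCodeBytes onto the value once per
              // round; strip every trailing copy (warning if appended more than
              // once) so the remaining prefix can be verified as a normal value.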
              if (mutation == MutationType.APPEND) {
                int offset = bytes.length - hashCodeBytes.length;
                mutationVerified = offset > 0 && Bytes.equals(hashCodeBytes,
                  0, hashCodeBytes.length, bytes, offset, hashCodeBytes.length);
                if (mutationVerified) {
                  int n = 1;
                  while (true) {
                    int newOffset = offset - hashCodeBytes.length;
                    if (newOffset < 0 || !Bytes.equals(hashCodeBytes, 0,
                        hashCodeBytes.length, bytes, newOffset, hashCodeBytes.length)) {
                      break;
                    }
                    offset = newOffset;
                    n++;
                  }
                  if (n > 1) {
                    LOG.warn("Warning checking data for key [" + rowKeyStr + "], column family ["
                      + cfStr + "], column [" + column + "], appended [" + n + "] times");
                  }
                  byte[] dest = new byte[offset];
                  System.arraycopy(bytes, 0, dest, 0, offset);
                  bytes = dest;
                }
              } else if (hashCode % 2 == 0) { // checkAndPut
                mutationVerified = Bytes.equals(bytes, hashCodeBytes);
                verificationNeeded = false;
              }
              if (!mutationVerified) {
                LOG.error("Error checking data for key [" + rowKeyStr
                  + "], mutation checking failed for column family [" + cfStr + "], column ["
                  + column + "]; mutation [" + mutation + "], hashCode ["
                  + hashCode + "], verificationNeeded ["
                  + verificationNeeded + "]");
                printLocations(result);
                return false;
              }
            } // end of mutation checking
            if (verificationNeeded &&
                !dataGenerator.verify(result.getRow(), cf, kv.getKey(), bytes)) {
              LOG.error("Error checking data for key [" + rowKeyStr + "], column family ["
                + cfStr + "], column [" + column + "], mutation [" + mutation
                + "]; value of length " + bytes.length);
              printLocations(result);
              return false;
            }
          }
        }
      }
    }

    return true;
  }

  private void printLocations(Result r) {
    if (r == null) {
      LOG.info("FAILED FOR null Result");
      return;
    }
    LOG.info("FAILED FOR " + resultToString(r) + " Stale " + r.isStale());
    if (r.getRow() == null) {
      return;
    }
    try (RegionLocator locator = connection.getRegionLocator(tableName)) {
      List<HRegionLocation> locs = locator.getRegionLocations(r.getRow());
      for (HRegionLocation h : locs) {
        LOG.info("LOCATION " + h);
      }
    } catch (IOException e) {
      LOG.warn("Couldn't get locations for row " + Bytes.toString(r.getRow()));
    }
  }

  private String resultToString(Result result) {
    StringBuilder sb = new StringBuilder();
    sb.append("cells=");
    if (result.isEmpty()) {
      sb.append("NONE");
      return sb.toString();
    }
    sb.append("{");
    boolean moreThanOne = false;
    for (Cell cell : result.listCells()) {
      if (moreThanOne) {
        sb.append(", ");
      } else {
        moreThanOne = true;
      }
      sb.append(CellUtil.toString(cell, true));
    }
    sb.append("}");
    return sb.toString();
  }
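
  // MUTATE_INFO wire format, as consumed below: a "#"-separated list of
  // "column:type" pairs, where type is the numeric protobuf MutationType.
  // Assuming the standard numbering (APPEND=0, INCREMENT=1, PUT=2, DELETE=3),
  // "0:3#5:0" would parse to {"0" -> DELETE, "5" -> APPEND}.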

  // Parse mutate info into a map of <column name> => <update action>
  private Map<String, MutationType> parseMutateInfo(byte[] mutateInfo) {
    Map<String, MutationType> mi = new HashMap<>();
    if (mutateInfo != null) {
      String mutateInfoStr = Bytes.toString(mutateInfo);
      String[] mutations = mutateInfoStr.split("#");
      for (String mutation : mutations) {
        if (mutation.isEmpty()) continue;
        Preconditions.checkArgument(mutation.contains(":"),
          "Invalid mutation info " + mutation);
        int p = mutation.indexOf(":");
        String column = mutation.substring(0, p);
        MutationType type = MutationType.valueOf(
          Integer.parseInt(mutation.substring(p + 1)));
        mi.put(column, type);
      }
    }
    return mi;
  }
}