HBASE-26582 Prune use of Random and SecureRandom objects (#4118)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / util / MultiThreadedReader.java
blob4c046c82870ee94a3074ab1b9eb2150d1ab3d64d
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with this
4 * work for additional information regarding copyright ownership. The ASF
5 * licenses this file to you under the Apache License, Version 2.0 (the
6 * "License"); you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 * License for the specific language governing permissions and limitations
15 * under the License.
17 package org.apache.hadoop.hbase.util;
19 import java.io.IOException;
20 import java.util.Arrays;
21 import java.util.HashSet;
22 import java.util.Random;
23 import java.util.Set;
24 import java.util.concurrent.ThreadLocalRandom;
25 import java.util.concurrent.atomic.AtomicLong;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.HRegionLocation;
28 import org.apache.hadoop.hbase.TableName;
29 import org.apache.hadoop.hbase.client.Consistency;
30 import org.apache.hadoop.hbase.client.Get;
31 import org.apache.hadoop.hbase.client.RegionLocator;
32 import org.apache.hadoop.hbase.client.Result;
33 import org.apache.hadoop.hbase.client.Table;
34 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
38 /** Creates multiple threads that read and verify previously written data */
39 public class MultiThreadedReader extends MultiThreadedAction
41 private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedReader.class);
43 protected Set<HBaseReaderThread> readers = new HashSet<>();
44 private final double verifyPercent;
45 protected volatile boolean aborted;
47 protected MultiThreadedWriterBase writer = null;
49 /**
50 * The number of keys verified in a sequence. This will never be larger than
51 * the total number of keys in the range. The reader might also verify
52 * random keys when it catches up with the writer.
54 private final AtomicLong numUniqueKeysVerified = new AtomicLong();
56 /**
57 * Default maximum number of read errors to tolerate before shutting down all
58 * readers.
60 public static final int DEFAULT_MAX_ERRORS = 10;
62 /**
63 * Default "window" size between the last key written by the writer and the
64 * key that we attempt to read. The lower this number, the stricter our
65 * testing is. If this is zero, we always attempt to read the highest key
66 * in the contiguous sequence of keys written by the writers.
68 public static final int DEFAULT_KEY_WINDOW = 0;
70 /**
71 * Default batch size for multigets
73 public static final int DEFAULT_BATCH_SIZE = 1; //translates to simple GET (no multi GET)
75 protected AtomicLong numKeysVerified = new AtomicLong(0);
76 protected AtomicLong numReadErrors = new AtomicLong(0);
77 protected AtomicLong numReadFailures = new AtomicLong(0);
78 protected AtomicLong nullResult = new AtomicLong(0);
79 private int maxErrors = DEFAULT_MAX_ERRORS;
80 private int keyWindow = DEFAULT_KEY_WINDOW;
81 private int batchSize = DEFAULT_BATCH_SIZE;
82 private int regionReplicaId = -1; // particular region replica id to do reads against if set
84 public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
85 TableName tableName, double verifyPercent) throws IOException {
86 super(dataGen, conf, tableName, "R");
87 this.verifyPercent = verifyPercent;
90 public void linkToWriter(MultiThreadedWriterBase writer) {
91 this.writer = writer;
92 writer.setTrackWroteKeys(true);
95 public void setMaxErrors(int maxErrors) {
96 this.maxErrors = maxErrors;
99 public void setKeyWindow(int keyWindow) {
100 this.keyWindow = keyWindow;
103 public void setMultiGetBatchSize(int batchSize) {
104 this.batchSize = batchSize;
107 public void setRegionReplicaId(int regionReplicaId) {
108 this.regionReplicaId = regionReplicaId;
111 @Override
112 public void start(long startKey, long endKey, int numThreads) throws IOException {
113 super.start(startKey, endKey, numThreads);
114 if (verbose) {
115 LOG.debug("Reading keys [" + startKey + ", " + endKey + ")");
118 addReaderThreads(numThreads);
119 startThreads(readers);
122 protected void addReaderThreads(int numThreads) throws IOException {
123 for (int i = 0; i < numThreads; ++i) {
124 HBaseReaderThread reader = createReaderThread(i);
125 readers.add(reader);
129 protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
130 HBaseReaderThread reader = new HBaseReaderThread(readerId);
131 Threads.setLoggingUncaughtExceptionHandler(reader);
132 return reader;
135 public class HBaseReaderThread extends Thread {
136 protected final int readerId;
137 protected final Table table;
139 /** The "current" key being read. Increases from startKey to endKey. */
140 private long curKey;
142 /** Time when the thread started */
143 protected long startTimeMs;
145 /** If we are ahead of the writer and reading a random key. */
146 private boolean readingRandomKey;
148 private boolean printExceptionTrace = true;
151 * @param readerId only the keys with this remainder from division by
152 * {@link #numThreads} will be read by this thread
154 public HBaseReaderThread(int readerId) throws IOException {
155 this.readerId = readerId;
156 table = createTable();
157 setName(getClass().getSimpleName() + "_" + readerId);
160 protected Table createTable() throws IOException {
161 return connection.getTable(tableName);
164 @Override
165 public void run() {
166 try {
167 runReader();
168 } finally {
169 closeTable();
170 numThreadsWorking.decrementAndGet();
174 protected void closeTable() {
175 try {
176 if (table != null) {
177 table.close();
179 } catch (IOException e) {
180 LOG.error("Error closing table", e);
184 private void runReader() {
185 if (verbose) {
186 LOG.info("Started thread #" + readerId + " for reads...");
189 startTimeMs = EnvironmentEdgeManager.currentTime();
190 curKey = startKey;
191 long [] keysForThisReader = new long[batchSize];
192 while (curKey < endKey && !aborted) {
193 int readingRandomKeyStartIndex = -1;
194 int numKeys = 0;
195 // if multiGet, loop until we have the number of keys equal to the batch size
196 do {
197 long k = getNextKeyToRead();
198 if (k < startKey || k >= endKey) {
199 numReadErrors.incrementAndGet();
200 throw new AssertionError("Load tester logic error: proposed key " +
201 "to read " + k + " is out of range (startKey=" + startKey +
202 ", endKey=" + endKey + ")");
204 if (k % numThreads != readerId || (writer != null && writer.failedToWriteKey(k))) {
205 // Skip keys that this thread should not read, as well as the keys
206 // that we know the writer failed to write.
207 continue;
209 keysForThisReader[numKeys] = k;
210 if (readingRandomKey && readingRandomKeyStartIndex == -1) {
211 //store the first index of a random read
212 readingRandomKeyStartIndex = numKeys;
214 numKeys++;
215 } while (numKeys < batchSize && curKey < endKey && !aborted);
217 if (numKeys > 0) { //meaning there is some key to read
218 readKey(keysForThisReader);
219 // We have verified some unique key(s).
220 numUniqueKeysVerified.getAndAdd(readingRandomKeyStartIndex == -1 ?
221 numKeys : readingRandomKeyStartIndex);
227 * Should only be used for the concurrent writer/reader workload. The
228 * maximum key we are allowed to read, subject to the "key window"
229 * constraint.
231 private long maxKeyWeCanRead() {
232 long insertedUpToKey = writer.wroteUpToKey();
233 if (insertedUpToKey >= endKey - 1) {
234 // The writer has finished writing our range, so we can read any
235 // key in the range.
236 return endKey - 1;
238 return Math.min(endKey - 1, writer.wroteUpToKey() - keyWindow);
241 protected long getNextKeyToRead() {
242 readingRandomKey = false;
243 if (writer == null || curKey <= maxKeyWeCanRead()) {
244 return curKey++;
247 // We caught up with the writer. See if we can read any keys at all.
248 long maxKeyToRead;
249 while ((maxKeyToRead = maxKeyWeCanRead()) < startKey) {
250 // The writer has not written sufficient keys for us to be able to read
251 // anything at all. Sleep a bit. This should only happen in the
252 // beginning of a load test run.
253 Threads.sleepWithoutInterrupt(50);
256 if (curKey <= maxKeyToRead) {
257 // The writer wrote some keys, and we are now allowed to read our
258 // current key.
259 return curKey++;
262 // startKey <= maxKeyToRead <= curKey - 1. Read one of the previous keys.
263 // Don't increment the current key -- we still have to try reading it
264 // later. Set a flag to make sure that we don't count this key towards
265 // the set of unique keys we have verified.
266 readingRandomKey = true;
267 return startKey + Math.abs(ThreadLocalRandom.current().nextLong())
268 % (maxKeyToRead - startKey + 1);
271 private Get[] readKey(long[] keysToRead) {
272 Random rand = ThreadLocalRandom.current();
273 Get [] gets = new Get[keysToRead.length];
274 int i = 0;
275 for (long keyToRead : keysToRead) {
276 try {
277 gets[i] = createGet(keyToRead);
278 if (keysToRead.length == 1) {
279 queryKey(gets[i], rand.nextInt(100) < verifyPercent, keyToRead);
281 i++;
282 } catch (IOException e) {
283 numReadFailures.addAndGet(1);
284 LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
285 + ", time from start: "
286 + (EnvironmentEdgeManager.currentTime() - startTimeMs) + " ms");
287 if (printExceptionTrace) {
288 LOG.warn(e.toString(), e);
289 printExceptionTrace = false;
293 if (keysToRead.length > 1) {
294 try {
295 queryKey(gets, rand.nextInt(100) < verifyPercent, keysToRead);
296 } catch (IOException e) {
297 numReadFailures.addAndGet(gets.length);
298 for (long keyToRead : keysToRead) {
299 LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
300 + ", time from start: "
301 + (EnvironmentEdgeManager.currentTime() - startTimeMs) + " ms");
303 if (printExceptionTrace) {
304 LOG.warn(e.toString(), e);
305 printExceptionTrace = false;
309 return gets;
312 protected Get createGet(long keyToRead) throws IOException {
313 Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead));
314 String cfsString = "";
315 byte[][] columnFamilies = dataGenerator.getColumnFamilies();
316 for (byte[] cf : columnFamilies) {
317 get.addFamily(cf);
318 if (verbose) {
319 if (cfsString.length() > 0) {
320 cfsString += ", ";
322 cfsString += "[" + Bytes.toStringBinary(cf) + "]";
325 get = dataGenerator.beforeGet(keyToRead, get);
326 if (regionReplicaId > 0) {
327 get.setReplicaId(regionReplicaId);
328 get.setConsistency(Consistency.TIMELINE);
330 if (verbose) {
331 LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
333 return get;
336 public void queryKey(Get[] gets, boolean verify, long[] keysToRead) throws IOException {
337 // read the data
338 long start = System.nanoTime();
339 // Uses multi/batch gets
340 Result[] results = table.get(Arrays.asList(gets));
341 long end = System.nanoTime();
342 verifyResultsAndUpdateMetrics(verify, gets, end - start, results, table, false);
345 public void queryKey(Get get, boolean verify, long keyToRead) throws IOException {
346 // read the data
348 long start = System.nanoTime();
349 // Uses simple get
350 Result result = table.get(get);
351 long end = System.nanoTime();
352 verifyResultsAndUpdateMetrics(verify, get, end - start, result, table, false);
355 protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
356 Result[] results, Table table, boolean isNullExpected)
357 throws IOException {
358 totalOpTimeMs.addAndGet(elapsedNano / 1000000);
359 numKeys.addAndGet(gets.length);
360 int i = 0;
361 for (Result result : results) {
362 verifyResultsAndUpdateMetricsOnAPerGetBasis(verify, gets[i++], result, table,
363 isNullExpected);
367 protected void verifyResultsAndUpdateMetrics(boolean verify, Get get, long elapsedNano,
368 Result result, Table table, boolean isNullExpected)
369 throws IOException {
370 verifyResultsAndUpdateMetrics(verify, new Get[]{get}, elapsedNano,
371 new Result[]{result}, table, isNullExpected);
374 private void verifyResultsAndUpdateMetricsOnAPerGetBasis(boolean verify, Get get,
375 Result result, Table table, boolean isNullExpected) throws IOException {
376 if (!result.isEmpty()) {
377 if (verify) {
378 numKeysVerified.incrementAndGet();
380 } else {
381 HRegionLocation hloc;
382 try (RegionLocator locator = connection.getRegionLocator(tableName)) {
383 hloc = locator.getRegionLocation(get.getRow());
385 String rowKey = Bytes.toString(get.getRow());
386 LOG.info("Key = " + rowKey + ", Region location: " + hloc);
387 if(isNullExpected) {
388 nullResult.incrementAndGet();
389 LOG.debug("Null result obtained for the key ="+rowKey);
390 return;
393 boolean isOk = verifyResultAgainstDataGenerator(result, verify, false);
394 long numErrorsAfterThis = 0;
395 if (isOk) {
396 long cols = 0;
397 // Count the columns for reporting purposes.
398 for (byte[] cf : result.getMap().keySet()) {
399 cols += result.getFamilyMap(cf).size();
401 numCols.addAndGet(cols);
402 } else {
403 if (writer != null) {
404 LOG.error("At the time of failure, writer wrote " + writer.numKeys.get() + " keys");
406 numErrorsAfterThis = numReadErrors.incrementAndGet();
409 if (numErrorsAfterThis > maxErrors) {
410 LOG.error("Aborting readers -- found more than " + maxErrors + " errors");
411 aborted = true;
416 public long getNumReadFailures() {
417 return numReadFailures.get();
420 public long getNumReadErrors() {
421 return numReadErrors.get();
424 public long getNumKeysVerified() {
425 return numKeysVerified.get();
428 public long getNumUniqueKeysVerified() {
429 return numUniqueKeysVerified.get();
432 public long getNullResultsCount() {
433 return nullResult.get();
436 @Override
437 protected String progressInfo() {
438 StringBuilder sb = new StringBuilder();
439 appendToStatus(sb, "verified", numKeysVerified.get());
440 appendToStatus(sb, "READ FAILURES", numReadFailures.get());
441 appendToStatus(sb, "READ ERRORS", numReadErrors.get());
442 appendToStatus(sb, "NULL RESULT", nullResult.get());
443 return sb.toString();