/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
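// A minimal usage sketch (not part of this class): a load test typically wires a
// writer and a reader over the same key range so the reader verifies data as it
// lands. The LoadTestDataGenerator subclass, Configuration, and TableName are
// assumed to come from the surrounding test harness; the MultiThreadedWriter
// constructor shown and waitForFinish() (inherited from MultiThreadedAction) are
// assumptions about the companion classes, not guarantees.
//
//   MultiThreadedWriter writer = new MultiThreadedWriter(dataGen, conf, tableName);
//   MultiThreadedReader reader = new MultiThreadedReader(dataGen, conf, tableName, 10);
//   reader.linkToWriter(writer); // reader now trails the writer's progress
//   writer.start(0, 100_000, 4);
//   reader.start(0, 100_000, 4);
//   writer.waitForFinish();
//   reader.waitForFinish();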
/** Creates multiple threads that read and verify previously written data */
public class MultiThreadedReader extends MultiThreadedAction {
  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedReader.class);

  protected Set<HBaseReaderThread> readers = new HashSet<>();
  private final double verifyPercent;
  protected volatile boolean aborted;

  protected MultiThreadedWriterBase writer = null;

  /**
   * The number of keys verified in a sequence. This will never be larger than
   * the total number of keys in the range. The reader might also verify
   * random keys when it catches up with the writer.
   */
  private final AtomicLong numUniqueKeysVerified = new AtomicLong();

  /**
   * Default maximum number of read errors to tolerate before shutting down all
   * reader threads.
   */
  public static final int DEFAULT_MAX_ERRORS = 10;

  /**
   * Default "window" size between the last key written by the writer and the
   * key that we attempt to read. The lower this number, the stricter our
   * testing is. If this is zero, we always attempt to read the highest key
   * in the contiguous sequence of keys written by the writers.
   */
  public static final int DEFAULT_KEY_WINDOW = 0;
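  // Worked example (illustrative numbers): with keyWindow = 100, once the writer
  // has written keys up to 5000, maxKeyWeCanRead() below caps reads at
  // min(endKey - 1, 5000 - 100) = 4900, keeping readers a safe distance behind
  // in-flight writes. With the default of 0, readers chase the newest written key.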
  /**
   * Default batch size for multigets
   */
  public static final int DEFAULT_BATCH_SIZE = 1; // translates to simple GET (no multi GET)

  protected AtomicLong numKeysVerified = new AtomicLong(0);
  protected AtomicLong numReadErrors = new AtomicLong(0);
  protected AtomicLong numReadFailures = new AtomicLong(0);
  protected AtomicLong nullResult = new AtomicLong(0);
  private int maxErrors = DEFAULT_MAX_ERRORS;
  private int keyWindow = DEFAULT_KEY_WINDOW;
  private int batchSize = DEFAULT_BATCH_SIZE;
  private int regionReplicaId = -1; // particular region replica id to do reads against if set
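  // Note on replica reads: when setRegionReplicaId() is given a value > 0,
  // createGet() below pins every Get to that replica and switches it to
  // Consistency.TIMELINE, so the test exercises (possibly stale) secondary
  // region replicas instead of the primary.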
  public MultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf,
      TableName tableName, double verifyPercent) throws IOException {
    super(dataGen, conf, tableName, "R");
    this.verifyPercent = verifyPercent;
  }
  public void linkToWriter(MultiThreadedWriterBase writer) {
    this.writer = writer;
    writer.setTrackWroteKeys(true);
  }

  public void setMaxErrors(int maxErrors) {
    this.maxErrors = maxErrors;
  }

  public void setKeyWindow(int keyWindow) {
    this.keyWindow = keyWindow;
  }

  public void setMultiGetBatchSize(int batchSize) {
    this.batchSize = batchSize;
  }

  public void setRegionReplicaId(int regionReplicaId) {
    this.regionReplicaId = regionReplicaId;
  }
  @Override
  public void start(long startKey, long endKey, int numThreads) throws IOException {
    super.start(startKey, endKey, numThreads);
    if (verbose) {
      LOG.debug("Reading keys [" + startKey + ", " + endKey + ")");
    }

    addReaderThreads(numThreads);
    startThreads(readers);
  }
  protected void addReaderThreads(int numThreads) throws IOException {
    for (int i = 0; i < numThreads; ++i) {
      HBaseReaderThread reader = createReaderThread(i);
      readers.add(reader);
    }
  }

  protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
    HBaseReaderThread reader = new HBaseReaderThread(readerId);
    Threads.setLoggingUncaughtExceptionHandler(reader);
    return reader;
  }
  public class HBaseReaderThread extends Thread {
    protected final int readerId;
    protected final Table table;

    /** The "current" key being read. Increases from startKey to endKey. */
    private long curKey;

    /** Time when the thread started */
    protected long startTimeMs;

    /** If we are ahead of the writer and reading a random key. */
    private boolean readingRandomKey;

    private boolean printExceptionTrace = true;

    /**
     * @param readerId only the keys with this remainder from division by
     *          {@link #numThreads} will be read by this thread
     */
    public HBaseReaderThread(int readerId) throws IOException {
      this.readerId = readerId;
      table = createTable();
      setName(getClass().getSimpleName() + "_" + readerId);
    }

    protected Table createTable() throws IOException {
      return connection.getTable(tableName);
    }
    @Override
    public void run() {
      try {
        runReader();
      } finally {
        closeTable();
        numThreadsWorking.decrementAndGet();
      }
    }

    protected void closeTable() {
      try {
        if (table != null) {
          table.close();
        }
      } catch (IOException e) {
        LOG.error("Error closing table", e);
      }
    }
    private void runReader() {
      if (verbose) {
        LOG.info("Started thread #" + readerId + " for reads...");
      }

      startTimeMs = EnvironmentEdgeManager.currentTime();
      curKey = startKey;
      long[] keysForThisReader = new long[batchSize];
      while (curKey < endKey && !aborted) {
        int readingRandomKeyStartIndex = -1;
        int numKeys = 0;
        // if multiGet, loop until we have the number of keys equal to the batch size
        do {
          long k = getNextKeyToRead();
          if (k < startKey || k >= endKey) {
            numReadErrors.incrementAndGet();
            throw new AssertionError("Load tester logic error: proposed key "
                + "to read " + k + " is out of range (startKey=" + startKey
                + ", endKey=" + endKey + ")");
          }
          if (k % numThreads != readerId
              || (writer != null && writer.failedToWriteKey(k))) {
            // Skip keys that this thread should not read, as well as the keys
            // that we know the writer failed to write.
            continue;
          }
          keysForThisReader[numKeys] = k;
          if (readingRandomKey && readingRandomKeyStartIndex == -1) {
            // store the first index of a random read
            readingRandomKeyStartIndex = numKeys;
          }
          numKeys++;
        } while (numKeys < batchSize && curKey < endKey && !aborted);

        if (numKeys > 0) { // meaning there is some key to read
          readKey(keysForThisReader);
          // We have verified some unique key(s).
          numUniqueKeysVerified.getAndAdd(
              readingRandomKeyStartIndex == -1 ? numKeys : readingRandomKeyStartIndex);
        }
      }
    }
    /**
     * Should only be used for the concurrent writer/reader workload. The
     * maximum key we are allowed to read, subject to the "key window"
     * constraint.
     */
    private long maxKeyWeCanRead() {
      long insertedUpToKey = writer.wroteUpToKey();
      if (insertedUpToKey >= endKey - 1) {
        // The writer has finished writing our range, so we can read any
        // key in the range.
        return endKey - 1;
      }
      return Math.min(endKey - 1, writer.wroteUpToKey() - keyWindow);
    }
    protected long getNextKeyToRead() {
      readingRandomKey = false;
      if (writer == null || curKey <= maxKeyWeCanRead()) {
        return curKey++;
      }

      // We caught up with the writer. See if we can read any keys at all.
      long maxKeyToRead;
      while ((maxKeyToRead = maxKeyWeCanRead()) < startKey) {
        // The writer has not written sufficient keys for us to be able to read
        // anything at all. Sleep a bit. This should only happen in the
        // beginning of a load test run.
        Threads.sleepWithoutInterrupt(50);
      }

      if (curKey <= maxKeyToRead) {
        // The writer wrote some keys, and we are now allowed to read our
        // current key.
        return curKey++;
      }

      // startKey <= maxKeyToRead <= curKey - 1. Read one of the previous keys.
      // Don't increment the current key -- we still have to try reading it
      // later. Set a flag to make sure that we don't count this key towards
      // the set of unique keys we have verified.
      readingRandomKey = true;
      return startKey + Math.abs(ThreadLocalRandom.current().nextLong())
          % (maxKeyToRead - startKey + 1);
    }
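    // Worked example of the fallback above (illustrative numbers): with
    // startKey = 0 and maxKeyToRead = 4900, the thread re-reads a uniformly
    // chosen key in [0, 4900] while waiting for the writer to get ahead again.
    // Note that Math.abs(nextLong()) is itself negative for Long.MIN_VALUE
    // (a one-in-2^64 case); harmless for a load test, but worth knowing.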
    private Get[] readKey(long[] keysToRead) {
      Random rand = ThreadLocalRandom.current();
      Get[] gets = new Get[keysToRead.length];
      int i = 0;
      for (long keyToRead : keysToRead) {
        try {
          gets[i] = createGet(keyToRead);
          if (keysToRead.length == 1) {
            queryKey(gets[i], rand.nextInt(100) < verifyPercent, keyToRead);
          }
          i++;
        } catch (IOException e) {
          numReadFailures.addAndGet(1);
          LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
              + ", time from start: "
              + (EnvironmentEdgeManager.currentTime() - startTimeMs) + " ms");
          if (printExceptionTrace) {
            LOG.warn(e.toString(), e);
            printExceptionTrace = false;
          }
        }
      }

      if (keysToRead.length > 1) {
        try {
          queryKey(gets, rand.nextInt(100) < verifyPercent, keysToRead);
        } catch (IOException e) {
          numReadFailures.addAndGet(gets.length);
          for (long keyToRead : keysToRead) {
            LOG.debug("[" + readerId + "] FAILED read, key = " + (keyToRead + "")
                + ", time from start: "
                + (EnvironmentEdgeManager.currentTime() - startTimeMs) + " ms");
          }
          if (printExceptionTrace) {
            LOG.warn(e.toString(), e);
            printExceptionTrace = false;
          }
        }
      }
      return gets;
    }
    protected Get createGet(long keyToRead) throws IOException {
      Get get = new Get(dataGenerator.getDeterministicUniqueKey(keyToRead));
      String cfsString = "";
      byte[][] columnFamilies = dataGenerator.getColumnFamilies();
      for (byte[] cf : columnFamilies) {
        get.addFamily(cf);
        if (verbose) {
          if (cfsString.length() > 0) {
            cfsString += ", ";
          }
          cfsString += "[" + Bytes.toStringBinary(cf) + "]";
        }
      }
      get = dataGenerator.beforeGet(keyToRead, get);
      if (regionReplicaId > 0) {
        get.setReplicaId(regionReplicaId);
        get.setConsistency(Consistency.TIMELINE);
      }
      if (verbose) {
        LOG.info("[" + readerId + "] " + "Querying key " + keyToRead + ", cfs " + cfsString);
      }
      return get;
    }
    public void queryKey(Get[] gets, boolean verify, long[] keysToRead) throws IOException {
      // read the data
      long start = System.nanoTime();
      // Uses multi/batch gets
      Result[] results = table.get(Arrays.asList(gets));
      long end = System.nanoTime();
      verifyResultsAndUpdateMetrics(verify, gets, end - start, results, table, false);
    }

    public void queryKey(Get get, boolean verify, long keyToRead) throws IOException {
      // read the data
      long start = System.nanoTime();
      Result result = table.get(get);
      long end = System.nanoTime();
      verifyResultsAndUpdateMetrics(verify, get, end - start, result, table, false);
    }
    protected void verifyResultsAndUpdateMetrics(boolean verify, Get[] gets, long elapsedNano,
        Result[] results, Table table, boolean isNullExpected) throws IOException {
      totalOpTimeMs.addAndGet(elapsedNano / 1000000);
      numKeys.addAndGet(gets.length);
      int i = 0;
      for (Result result : results) {
        verifyResultsAndUpdateMetricsOnAPerGetBasis(verify, gets[i++], result, table,
            isNullExpected);
      }
    }

    protected void verifyResultsAndUpdateMetrics(boolean verify, Get get, long elapsedNano,
        Result result, Table table, boolean isNullExpected) throws IOException {
      verifyResultsAndUpdateMetrics(verify, new Get[] { get }, elapsedNano,
          new Result[] { result }, table, isNullExpected);
    }
    private void verifyResultsAndUpdateMetricsOnAPerGetBasis(boolean verify, Get get,
        Result result, Table table, boolean isNullExpected) throws IOException {
      if (!result.isEmpty()) {
        if (verify) {
          numKeysVerified.incrementAndGet();
        }
      } else {
        HRegionLocation hloc;
        try (RegionLocator locator = connection.getRegionLocator(tableName)) {
          hloc = locator.getRegionLocation(get.getRow());
        }
        String rowKey = Bytes.toString(get.getRow());
        LOG.info("Key = " + rowKey + ", Region location: " + hloc);
        if (isNullExpected) {
          nullResult.incrementAndGet();
          LOG.debug("Null result obtained for the key = " + rowKey);
          return;
        }
      }

      boolean isOk = verifyResultAgainstDataGenerator(result, verify, false);
      long numErrorsAfterThis = 0;
      if (isOk) {
        long cols = 0;
        // Count the columns for reporting purposes.
        for (byte[] cf : result.getMap().keySet()) {
          cols += result.getFamilyMap(cf).size();
        }
        numCols.addAndGet(cols);
      } else {
        if (writer != null) {
          LOG.error("At the time of failure, writer wrote " + writer.numKeys.get() + " keys");
        }
        numErrorsAfterThis = numReadErrors.incrementAndGet();
      }

      if (numErrorsAfterThis > maxErrors) {
        LOG.error("Aborting readers -- found more than " + maxErrors + " errors");
        aborted = true;
      }
    }
  }
  public long getNumReadFailures() {
    return numReadFailures.get();
  }

  public long getNumReadErrors() {
    return numReadErrors.get();
  }

  public long getNumKeysVerified() {
    return numKeysVerified.get();
  }

  public long getNumUniqueKeysVerified() {
    return numUniqueKeysVerified.get();
  }

  public long getNullResultsCount() {
    return nullResult.get();
  }
  @Override
  protected String progressInfo() {
    StringBuilder sb = new StringBuilder();
    appendToStatus(sb, "verified", numKeysVerified.get());
    appendToStatus(sb, "READ FAILURES", numReadFailures.get());
    appendToStatus(sb, "READ ERRORS", numReadErrors.get());
    appendToStatus(sb, "NULL RESULT", nullResult.get());
    return sb.toString();
  }
}