2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.util
;
import java.io.IOException;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Creates multiple threads that write key/values into the table. */
40 public abstract class MultiThreadedWriterBase
extends MultiThreadedAction
{
private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedWriterBase.class);

/**
 * A temporary place to keep track of inserted/updated keys. This is written to by
 * all writers and is drained on a separate thread that populates
 * {@link #wroteUpToKey}, the maximum key in the contiguous range of keys
 * being inserted/updated. This queue is supposed to stay small.
 */
protected BlockingQueue<Long> wroteKeys;

/**
 * This is the current key to be inserted/updated by any thread. Each thread does an
 * atomic get and increment operation and inserts the current value.
 */
protected AtomicLong nextKeyToWrite = new AtomicLong();

/**
 * The highest key in the contiguous range of keys that have been inserted/updated.
 */
protected AtomicLong wroteUpToKey = new AtomicLong();

/** The sorted set of keys NOT inserted/updated by the writers. */
protected Set<Long> failedKeySet = new ConcurrentSkipListSet<>();

/**
 * The total size of the temporary inserted/updated key set that has not yet lined
 * up in our contiguous sequence starting from startKey. Supposed to stay small.
 */
protected AtomicLong wroteKeyQueueSize = new AtomicLong();

/** Enable this if used in conjunction with a concurrent reader. */
protected boolean trackWroteKeys;
/**
 * Initializes the shared writer state: forwards all arguments to the superclass
 * constructor, then allocates the {@link #wroteKeys} queue through
 * {@link #createWriteKeysQueue(Configuration)}.
 * @param dataGen      data generator, passed through to the superclass
 * @param conf         configuration, passed to the superclass and to the queue factory
 * @param tableName    target table, passed through to the superclass
 * @param actionLetter action identifier, passed through to the superclass
 * @throws IOException declared for the superclass constructor call
 */
public MultiThreadedWriterBase(LoadTestDataGenerator dataGen, Configuration conf,
    TableName tableName, String actionLetter) throws IOException {
  super(dataGen, conf, tableName, actionLetter);
  wroteKeys = createWriteKeysQueue(conf);
}
81 protected BlockingQueue
<Long
> createWriteKeysQueue(Configuration conf
) {
82 return new ArrayBlockingQueue
<>(10000);
86 public void start(long startKey
, long endKey
, int numThreads
) throws IOException
{
87 super.start(startKey
, endKey
, numThreads
);
88 nextKeyToWrite
.set(startKey
);
89 wroteUpToKey
.set(startKey
- 1);
92 new Thread(new WroteKeysTracker(),
93 "MultiThreadedWriterBase-WroteKeysTracker-" + System
.currentTimeMillis()).start();
94 numThreadsWorking
.incrementAndGet();
98 protected String
getRegionDebugInfoSafe(Table table
, byte[] rowKey
) {
99 HRegionLocation cached
= null, real
= null;
100 try (RegionLocator locator
= connection
.getRegionLocator(tableName
)) {
101 cached
= locator
.getRegionLocation(rowKey
, false);
102 real
= locator
.getRegionLocation(rowKey
, true);
103 } catch (Throwable t
) {
104 // Cannot obtain region information for another catch block - too bad!
106 String result
= "no information can be obtained";
107 if (cached
!= null) {
108 result
= "cached: " + cached
.toString();
110 if (real
!= null && real
.getServerName() != null) {
111 if (cached
!= null && cached
.getServerName() != null && real
.equals(cached
)) {
112 result
+= "; cache is up to date";
114 result
= (cached
!= null) ?
(result
+ "; ") : "";
115 result
+= "real: " + real
.toString();
122 * A thread that keeps track of the highest key in the contiguous range of
123 * inserted/updated keys.
125 private class WroteKeysTracker
implements Runnable
{
129 Thread
.currentThread().setName(getClass().getSimpleName());
131 long expectedKey
= startKey
;
132 Queue
<Long
> sortedKeys
= new PriorityQueue
<>();
133 while (expectedKey
< endKey
) {
134 // Block until a new element is available.
137 k
= wroteKeys
.poll(1, TimeUnit
.SECONDS
);
138 } catch (InterruptedException e
) {
139 LOG
.info("Inserted key tracker thread interrupted", e
);
145 if (k
== expectedKey
) {
146 // Skip the "sorted key" queue and consume this key.
153 // See if we have a sequence of contiguous keys lined up.
154 while (!sortedKeys
.isEmpty()
155 && ((k
= sortedKeys
.peek()) == expectedKey
)) {
161 wroteKeyQueueSize
.set(wroteKeys
.size() + sortedKeys
.size());
163 } catch (Exception ex
) {
164 LOG
.error("Error in inserted/updaed key tracker", ex
);
166 numThreadsWorking
.decrementAndGet();
171 public int getNumWriteFailures() {
172 return failedKeySet
.size();
176 * The max key until which all keys have been inserted/updated (successfully or not).
177 * @return the last key that we have inserted/updated all keys up to (inclusive)
179 public long wroteUpToKey() {
180 return wroteUpToKey
.get();
183 public boolean failedToWriteKey(long k
) {
184 return failedKeySet
.contains(k
);
188 protected String
progressInfo() {
189 StringBuilder sb
= new StringBuilder();
190 appendToStatus(sb
, "wroteUpTo", wroteUpToKey
.get());
191 appendToStatus(sb
, "wroteQSize", wroteKeyQueueSize
.get());
192 return sb
.toString();
196 * Used for a joint write/read workload. Enables tracking the last inserted/updated
197 * key, which requires a blocking queue and a consumer thread.
198 * @param enable whether to enable tracking the last inserted/updated key
200 public void setTrackWroteKeys(boolean enable
) {
201 trackWroteKeys
= enable
;