HBASE-23723 Ensure MOB compaction works in optimized mode after snapshot clone (...
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / util / MultiThreadedWriterBase.java
blob1ebc9b114320248279d4c0cce88b45cd3d26cd49
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org.apache.hadoop.hbase.util;
21 import java.io.IOException;
22 import java.util.PriorityQueue;
23 import java.util.Queue;
24 import java.util.Set;
25 import java.util.concurrent.ArrayBlockingQueue;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.ConcurrentSkipListSet;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicLong;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.HRegionLocation;
32 import org.apache.hadoop.hbase.TableName;
33 import org.apache.hadoop.hbase.client.RegionLocator;
34 import org.apache.hadoop.hbase.client.Table;
35 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
39 /** Creates multiple threads that write key/values into the */
40 public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
41 private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedWriterBase.class);
43 /**
44 * A temporary place to keep track of inserted/updated keys. This is written to by
45 * all writers and is drained on a separate thread that populates
46 * {@link #wroteUpToKey}, the maximum key in the contiguous range of keys
47 * being inserted/updated. This queue is supposed to stay small.
49 protected BlockingQueue<Long> wroteKeys;
51 /**
52 * This is the current key to be inserted/updated by any thread. Each thread does an
53 * atomic get and increment operation and inserts the current value.
55 protected AtomicLong nextKeyToWrite = new AtomicLong();
57 /**
58 * The highest key in the contiguous range of keys .
60 protected AtomicLong wroteUpToKey = new AtomicLong();
62 /** The sorted set of keys NOT inserted/updated by the writers */
63 protected Set<Long> failedKeySet = new ConcurrentSkipListSet<>();
65 /**
66 * The total size of the temporary inserted/updated key set that have not yet lined
67 * up in a our contiguous sequence starting from startKey. Supposed to stay
68 * small.
70 protected AtomicLong wroteKeyQueueSize = new AtomicLong();
72 /** Enable this if used in conjunction with a concurrent reader. */
73 protected boolean trackWroteKeys;
75 public MultiThreadedWriterBase(LoadTestDataGenerator dataGen, Configuration conf,
76 TableName tableName, String actionLetter) throws IOException {
77 super(dataGen, conf, tableName, actionLetter);
78 this.wroteKeys = createWriteKeysQueue(conf);
81 protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
82 return new ArrayBlockingQueue<>(10000);
85 @Override
86 public void start(long startKey, long endKey, int numThreads) throws IOException {
87 super.start(startKey, endKey, numThreads);
88 nextKeyToWrite.set(startKey);
89 wroteUpToKey.set(startKey - 1);
91 if (trackWroteKeys) {
92 new Thread(new WroteKeysTracker(),
93 "MultiThreadedWriterBase-WroteKeysTracker-" + System.currentTimeMillis()).start();
94 numThreadsWorking.incrementAndGet();
98 protected String getRegionDebugInfoSafe(Table table, byte[] rowKey) {
99 HRegionLocation cached = null, real = null;
100 try (RegionLocator locator = connection.getRegionLocator(tableName)) {
101 cached = locator.getRegionLocation(rowKey, false);
102 real = locator.getRegionLocation(rowKey, true);
103 } catch (Throwable t) {
104 // Cannot obtain region information for another catch block - too bad!
106 String result = "no information can be obtained";
107 if (cached != null) {
108 result = "cached: " + cached.toString();
110 if (real != null && real.getServerName() != null) {
111 if (cached != null && cached.getServerName() != null && real.equals(cached)) {
112 result += "; cache is up to date";
113 } else {
114 result = (cached != null) ? (result + "; ") : "";
115 result += "real: " + real.toString();
118 return result;
122 * A thread that keeps track of the highest key in the contiguous range of
123 * inserted/updated keys.
125 private class WroteKeysTracker implements Runnable {
127 @Override
128 public void run() {
129 Thread.currentThread().setName(getClass().getSimpleName());
130 try {
131 long expectedKey = startKey;
132 Queue<Long> sortedKeys = new PriorityQueue<>();
133 while (expectedKey < endKey) {
134 // Block until a new element is available.
135 Long k;
136 try {
137 k = wroteKeys.poll(1, TimeUnit.SECONDS);
138 } catch (InterruptedException e) {
139 LOG.info("Inserted key tracker thread interrupted", e);
140 break;
142 if (k == null) {
143 continue;
145 if (k == expectedKey) {
146 // Skip the "sorted key" queue and consume this key.
147 wroteUpToKey.set(k);
148 ++expectedKey;
149 } else {
150 sortedKeys.add(k);
153 // See if we have a sequence of contiguous keys lined up.
154 while (!sortedKeys.isEmpty()
155 && ((k = sortedKeys.peek()) == expectedKey)) {
156 sortedKeys.poll();
157 wroteUpToKey.set(k);
158 ++expectedKey;
161 wroteKeyQueueSize.set(wroteKeys.size() + sortedKeys.size());
163 } catch (Exception ex) {
164 LOG.error("Error in inserted/updaed key tracker", ex);
165 } finally {
166 numThreadsWorking.decrementAndGet();
171 public int getNumWriteFailures() {
172 return failedKeySet.size();
176 * The max key until which all keys have been inserted/updated (successfully or not).
177 * @return the last key that we have inserted/updated all keys up to (inclusive)
179 public long wroteUpToKey() {
180 return wroteUpToKey.get();
183 public boolean failedToWriteKey(long k) {
184 return failedKeySet.contains(k);
187 @Override
188 protected String progressInfo() {
189 StringBuilder sb = new StringBuilder();
190 appendToStatus(sb, "wroteUpTo", wroteUpToKey.get());
191 appendToStatus(sb, "wroteQSize", wroteKeyQueueSize.get());
192 return sb.toString();
196 * Used for a joint write/read workload. Enables tracking the last inserted/updated
197 * key, which requires a blocking queue and a consumer thread.
198 * @param enable whether to enable tracking the last inserted/updated key
200 public void setTrackWroteKeys(boolean enable) {
201 trackWroteKeys = enable;