HBASE-26921 Rewrite the counting cells part in TestMultiVersions (#4316)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / util / MultiThreadedUpdater.java
blob78c698975a7f5c0fbef106b1e080d361f5c5d22d
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org.apache.hadoop.hbase.util;
21 import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
22 import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;
24 import java.io.IOException;
25 import java.io.PrintWriter;
26 import java.io.StringWriter;
27 import java.util.Arrays;
28 import java.util.HashSet;
29 import java.util.Map;
30 import java.util.Random;
31 import java.util.Set;
32 import java.util.concurrent.ThreadLocalRandom;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.hbase.Cell;
36 import org.apache.hadoop.hbase.CellUtil;
37 import org.apache.hadoop.hbase.HConstants;
38 import org.apache.hadoop.hbase.TableName;
39 import org.apache.hadoop.hbase.client.Append;
40 import org.apache.hadoop.hbase.client.Delete;
41 import org.apache.hadoop.hbase.client.Get;
42 import org.apache.hadoop.hbase.client.Increment;
43 import org.apache.hadoop.hbase.client.Mutation;
44 import org.apache.hadoop.hbase.client.Put;
45 import org.apache.hadoop.hbase.client.Result;
46 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
47 import org.apache.hadoop.hbase.client.Table;
48 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
49 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
50 import org.apache.hadoop.util.StringUtils;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
55 /** Creates multiple threads that write key/values into the */
56 public class MultiThreadedUpdater extends MultiThreadedWriterBase {
57 private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedUpdater.class);
59 protected Set<HBaseUpdaterThread> updaters = new HashSet<>();
61 private MultiThreadedWriterBase writer = null;
62 private boolean isBatchUpdate = false;
63 private boolean ignoreNonceConflicts = false;
64 private final double updatePercent;
66 public MultiThreadedUpdater(LoadTestDataGenerator dataGen, Configuration conf,
67 TableName tableName, double updatePercent) throws IOException {
68 super(dataGen, conf, tableName, "U");
69 this.updatePercent = updatePercent;
72 /** Use batch vs. separate updates for every column in a row */
73 public void setBatchUpdate(boolean isBatchUpdate) {
74 this.isBatchUpdate = isBatchUpdate;
77 public void linkToWriter(MultiThreadedWriterBase writer) {
78 this.writer = writer;
79 writer.setTrackWroteKeys(true);
82 @Override
83 public void start(long startKey, long endKey, int numThreads) throws IOException {
84 super.start(startKey, endKey, numThreads);
86 if (verbose) {
87 LOG.debug("Updating keys [" + startKey + ", " + endKey + ")");
90 addUpdaterThreads(numThreads);
92 startThreads(updaters);
95 protected void addUpdaterThreads(int numThreads) throws IOException {
96 for (int i = 0; i < numThreads; ++i) {
97 HBaseUpdaterThread updater = new HBaseUpdaterThread(i);
98 updaters.add(updater);
102 private long getNextKeyToUpdate() {
103 if (writer == null) {
104 return nextKeyToWrite.getAndIncrement();
106 synchronized (this) {
107 if (nextKeyToWrite.get() >= endKey) {
108 // Finished the whole key range
109 return endKey;
111 while (nextKeyToWrite.get() > writer.wroteUpToKey()) {
112 Threads.sleepWithoutInterrupt(100);
114 long k = nextKeyToWrite.getAndIncrement();
115 if (writer.failedToWriteKey(k)) {
116 failedKeySet.add(k);
117 return getNextKeyToUpdate();
119 return k;
123 protected class HBaseUpdaterThread extends Thread {
124 protected final Table table;
126 public HBaseUpdaterThread(int updaterId) throws IOException {
127 setName(getClass().getSimpleName() + "_" + updaterId);
128 table = createTable();
131 protected Table createTable() throws IOException {
132 return connection.getTable(tableName);
135 @Override
136 public void run() {
137 try {
138 Random rand = ThreadLocalRandom.current();
139 long rowKeyBase;
140 StringBuilder buf = new StringBuilder();
141 byte[][] columnFamilies = dataGenerator.getColumnFamilies();
142 while ((rowKeyBase = getNextKeyToUpdate()) < endKey) {
143 if (rand.nextInt(100) < updatePercent) {
144 byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase);
145 Increment inc = new Increment(rowKey);
146 Append app = new Append(rowKey);
147 numKeys.addAndGet(1);
148 int columnCount = 0;
149 for (byte[] cf : columnFamilies) {
150 long cfHash = Arrays.hashCode(cf);
151 inc.addColumn(cf, INCREMENT, cfHash);
152 buf.setLength(0); // Clear the buffer
153 buf.append("#").append(Bytes.toString(INCREMENT));
154 buf.append(":").append(MutationType.INCREMENT.getNumber());
155 app.addColumn(cf, MUTATE_INFO, Bytes.toBytes(buf.toString()));
156 ++columnCount;
157 if (!isBatchUpdate) {
158 mutate(table, inc, rowKeyBase);
159 numCols.addAndGet(1);
160 inc = new Increment(rowKey);
161 mutate(table, app, rowKeyBase);
162 numCols.addAndGet(1);
163 app = new Append(rowKey);
165 Get get = new Get(rowKey);
166 get.addFamily(cf);
167 try {
168 get = dataGenerator.beforeGet(rowKeyBase, get);
169 } catch (Exception e) {
170 // Ideally wont happen
171 LOG.warn("Failed to modify the get from the load generator = [" + Bytes.toString(get.getRow())
172 + "], column family = [" + Bytes.toString(cf) + "]", e);
174 Result result = getRow(get, rowKeyBase, cf);
175 Map<byte[], byte[]> columnValues =
176 result != null ? result.getFamilyMap(cf) : null;
177 if (columnValues == null) {
178 int specialPermCellInsertionFactor = Integer.parseInt(dataGenerator.getArgs()[2]);
179 if (((int) rowKeyBase % specialPermCellInsertionFactor == 0)) {
180 LOG.info("Null result expected for the rowkey " + Bytes.toString(rowKey));
181 } else {
182 failedKeySet.add(rowKeyBase);
183 LOG.error("Failed to update the row with key = [" + Bytes.toString(rowKey)
184 + "], since we could not get the original row");
187 if(columnValues != null) {
188 for (byte[] column : columnValues.keySet()) {
189 if (Bytes.equals(column, INCREMENT) || Bytes.equals(column, MUTATE_INFO)) {
190 continue;
192 MutationType mt =
193 MutationType.values()[rand.nextInt(MutationType.values().length)];
194 long columnHash = Arrays.hashCode(column);
195 long hashCode = cfHash + columnHash;
196 byte[] hashCodeBytes = Bytes.toBytes(hashCode);
197 byte[] checkedValue = HConstants.EMPTY_BYTE_ARRAY;
198 if (hashCode % 2 == 0) {
199 Cell kv = result.getColumnLatestCell(cf, column);
200 checkedValue = kv != null ? CellUtil.cloneValue(kv) : null;
201 Preconditions.checkNotNull(checkedValue,
202 "Column value to be checked should not be null");
204 buf.setLength(0); // Clear the buffer
205 buf.append("#").append(Bytes.toString(column)).append(":");
206 ++columnCount;
207 switch (mt) {
208 case PUT:
209 Put put = new Put(rowKey);
210 put.addColumn(cf, column, hashCodeBytes);
211 mutate(table, put, rowKeyBase, rowKey, cf, column, checkedValue);
212 buf.append(MutationType.PUT.getNumber());
213 break;
214 case DELETE:
215 Delete delete = new Delete(rowKey);
216 // Delete all versions since a put
217 // could be called multiple times if CM is used
218 delete.addColumns(cf, column);
219 mutate(table, delete, rowKeyBase, rowKey, cf, column, checkedValue);
220 buf.append(MutationType.DELETE.getNumber());
221 break;
222 default:
223 buf.append(MutationType.APPEND.getNumber());
224 app.addColumn(cf, column, hashCodeBytes);
226 app.addColumn(cf, MUTATE_INFO, Bytes.toBytes(buf.toString()));
227 if (!isBatchUpdate) {
228 mutate(table, app, rowKeyBase);
229 numCols.addAndGet(1);
230 app = new Append(rowKey);
235 if (isBatchUpdate) {
236 if (verbose) {
237 LOG.debug("Preparing increment and append for key = ["
238 + Bytes.toString(rowKey) + "], " + columnCount + " columns");
240 mutate(table, inc, rowKeyBase);
241 mutate(table, app, rowKeyBase);
242 numCols.addAndGet(columnCount);
245 if (trackWroteKeys) {
246 wroteKeys.add(rowKeyBase);
249 } finally {
250 closeHTable();
251 numThreadsWorking.decrementAndGet();
255 protected void closeHTable() {
256 try {
257 if (table != null) {
258 table.close();
260 } catch (IOException e) {
261 LOG.error("Error closing table", e);
265 protected Result getRow(Get get, long rowKeyBase, byte[] cf) {
266 Result result = null;
267 try {
268 result = table.get(get);
269 } catch (IOException ie) {
270 LOG.warn(
271 "Failed to get the row for key = [" + Bytes.toString(get.getRow()) + "], column family = ["
272 + Bytes.toString(cf) + "]", ie);
274 return result;
277 public void mutate(Table table, Mutation m, long keyBase) {
278 mutate(table, m, keyBase, null, null, null, null);
281 public void mutate(Table table, Mutation m,
282 long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) {
283 long start = EnvironmentEdgeManager.currentTime();
284 try {
285 m = dataGenerator.beforeMutate(keyBase, m);
286 if (m instanceof Increment) {
287 table.increment((Increment)m);
288 } else if (m instanceof Append) {
289 table.append((Append)m);
290 } else if (m instanceof Put) {
291 table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenPut((Put)m);
292 } else if (m instanceof Delete) {
293 table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenDelete((Delete)m);
294 } else {
295 throw new IllegalArgumentException(
296 "unsupported mutation " + m.getClass().getSimpleName());
298 totalOpTimeMs.addAndGet(EnvironmentEdgeManager.currentTime() - start);
299 } catch (IOException e) {
300 if (ignoreNonceConflicts) {
301 LOG.info("Detected nonce conflict, ignoring: " + e.getMessage());
302 totalOpTimeMs.addAndGet(EnvironmentEdgeManager.currentTime() - start);
303 return;
305 failedKeySet.add(keyBase);
306 String exceptionInfo;
307 if (e instanceof RetriesExhaustedWithDetailsException) {
308 RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException) e;
309 exceptionInfo = aggEx.getExhaustiveDescription();
310 } else {
311 exceptionInfo = StringUtils.stringifyException(e);
313 LOG.error("Failed to mutate: " + keyBase + " after " +
314 (EnvironmentEdgeManager.currentTime() - start) +
315 "ms; region information: " + getRegionDebugInfoSafe(table, m.getRow()) + "; errors: "
316 + exceptionInfo);
321 @Override
322 public void waitForFinish() {
323 super.waitForFinish();
324 System.out.println("Failed to update keys: " + failedKeySet.size());
325 for (Long key : failedKeySet) {
326 System.out.println("Failed to update key: " + key);
330 public void mutate(Table table, Mutation m, long keyBase) {
331 mutate(table, m, keyBase, null, null, null, null);
334 public void mutate(Table table, Mutation m,
335 long keyBase, byte[] row, byte[] cf, byte[] q, byte[] v) {
336 long start = EnvironmentEdgeManager.currentTime();
337 try {
338 m = dataGenerator.beforeMutate(keyBase, m);
339 if (m instanceof Increment) {
340 table.increment((Increment)m);
341 } else if (m instanceof Append) {
342 table.append((Append)m);
343 } else if (m instanceof Put) {
344 table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenPut((Put)m);
345 } else if (m instanceof Delete) {
346 table.checkAndMutate(row, cf).qualifier(q).ifEquals(v).thenDelete((Delete)m);
347 } else {
348 throw new IllegalArgumentException(
349 "unsupported mutation " + m.getClass().getSimpleName());
351 totalOpTimeMs.addAndGet(EnvironmentEdgeManager.currentTime() - start);
352 } catch (IOException e) {
353 failedKeySet.add(keyBase);
354 String exceptionInfo;
355 if (e instanceof RetriesExhaustedWithDetailsException) {
356 RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e;
357 exceptionInfo = aggEx.getExhaustiveDescription();
358 } else {
359 StringWriter stackWriter = new StringWriter();
360 PrintWriter pw = new PrintWriter(stackWriter);
361 e.printStackTrace(pw);
362 pw.flush();
363 exceptionInfo = StringUtils.stringifyException(e);
365 LOG.error("Failed to mutate: " + keyBase + " after " +
366 (EnvironmentEdgeManager.currentTime() - start) + "ms; region information: " +
367 getRegionDebugInfoSafe(table, m.getRow()) + "; errors: " + exceptionInfo);
371 public void setIgnoreNonceConflicts(boolean value) {
372 this.ignoreNonceConflicts = value;