HBASE-23723 Ensure MOB compaction works in optimized mode after snapshot clone (...
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / util / MultiThreadedWriter.java
blobbcd24d57b70251557961911e7b8d78e8de08548a
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org.apache.hadoop.hbase.util;
21 import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.INCREMENT;
22 import static org.apache.hadoop.hbase.util.test.LoadTestDataGenerator.MUTATE_INFO;
24 import java.io.IOException;
25 import java.io.PrintWriter;
26 import java.io.StringWriter;
27 import java.util.Arrays;
28 import java.util.HashSet;
29 import java.util.Set;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hbase.HConstants;
33 import org.apache.hadoop.hbase.TableName;
34 import org.apache.hadoop.hbase.client.Put;
35 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
36 import org.apache.hadoop.hbase.client.Table;
37 import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
38 import org.apache.hadoop.util.StringUtils;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
42 /** Creates multiple threads that write key/values into the */
43 public class MultiThreadedWriter extends MultiThreadedWriterBase {
44 private static final Logger LOG = LoggerFactory.getLogger(MultiThreadedWriter.class);
46 protected Set<HBaseWriterThread> writers = new HashSet<>();
48 protected boolean isMultiPut = false;
50 public MultiThreadedWriter(LoadTestDataGenerator dataGen, Configuration conf,
51 TableName tableName) throws IOException {
52 super(dataGen, conf, tableName, "W");
55 /** Use multi-puts vs. separate puts for every column in a row */
56 public void setMultiPut(boolean isMultiPut) {
57 this.isMultiPut = isMultiPut;
60 @Override
61 public void start(long startKey, long endKey, int numThreads) throws IOException {
62 super.start(startKey, endKey, numThreads);
64 if (verbose) {
65 LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")");
68 createWriterThreads(numThreads);
70 startThreads(writers);
73 protected void createWriterThreads(int numThreads) throws IOException {
74 for (int i = 0; i < numThreads; ++i) {
75 HBaseWriterThread writer = new HBaseWriterThread(i);
76 Threads.setLoggingUncaughtExceptionHandler(writer);
77 writers.add(writer);
81 public class HBaseWriterThread extends Thread {
82 private final Table table;
84 public HBaseWriterThread(int writerId) throws IOException {
85 setName(getClass().getSimpleName() + "_" + writerId);
86 table = createTable();
89 protected Table createTable() throws IOException {
90 return connection.getTable(tableName);
93 @Override
94 public void run() {
95 try {
96 long rowKeyBase;
97 byte[][] columnFamilies = dataGenerator.getColumnFamilies();
98 while ((rowKeyBase = nextKeyToWrite.getAndIncrement()) < endKey) {
99 byte[] rowKey = dataGenerator.getDeterministicUniqueKey(rowKeyBase);
100 Put put = new Put(rowKey);
101 numKeys.addAndGet(1);
102 int columnCount = 0;
103 for (byte[] cf : columnFamilies) {
104 byte[][] columns = dataGenerator.generateColumnsForCf(rowKey, cf);
105 for (byte[] column : columns) {
106 byte[] value = dataGenerator.generateValue(rowKey, cf, column);
107 put.addColumn(cf, column, value);
108 ++columnCount;
109 if (!isMultiPut) {
110 insert(table, put, rowKeyBase);
111 numCols.addAndGet(1);
112 put = new Put(rowKey);
115 long rowKeyHash = Arrays.hashCode(rowKey);
116 put.addColumn(cf, MUTATE_INFO, HConstants.EMPTY_BYTE_ARRAY);
117 put.addColumn(cf, INCREMENT, Bytes.toBytes(rowKeyHash));
118 if (!isMultiPut) {
119 insert(table, put, rowKeyBase);
120 numCols.addAndGet(1);
121 put = new Put(rowKey);
124 if (isMultiPut) {
125 if (verbose) {
126 LOG.debug("Preparing put for key = [" + Bytes.toString(rowKey) + "], " + columnCount + " columns");
128 insert(table, put, rowKeyBase);
129 numCols.addAndGet(columnCount);
131 if (trackWroteKeys) {
132 wroteKeys.add(rowKeyBase);
135 } finally {
136 closeHTable();
137 numThreadsWorking.decrementAndGet();
141 public void insert(Table table, Put put, long keyBase) {
142 long start = System.currentTimeMillis();
143 try {
144 put = (Put) dataGenerator.beforeMutate(keyBase, put);
145 table.put(put);
146 totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
147 } catch (IOException e) {
148 failedKeySet.add(keyBase);
149 String exceptionInfo;
150 if (e instanceof RetriesExhaustedWithDetailsException) {
151 RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e;
152 exceptionInfo = aggEx.getExhaustiveDescription();
153 } else {
154 StringWriter stackWriter = new StringWriter();
155 PrintWriter pw = new PrintWriter(stackWriter);
156 e.printStackTrace(pw);
157 pw.flush();
158 exceptionInfo = StringUtils.stringifyException(e);
160 LOG.error("Failed to insert: " + keyBase + " after " + (System.currentTimeMillis() - start)
161 + "ms; region information: " + getRegionDebugInfoSafe(table, put.getRow())
162 + "; errors: " + exceptionInfo);
165 protected void closeHTable() {
166 try {
167 if (table != null) {
168 table.close();
170 } catch (IOException e) {
171 LOG.error("Error closing table", e);
176 @Override
177 public void waitForFinish() {
178 super.waitForFinish();
179 System.out.println("Failed to write keys: " + failedKeySet.size());
180 for (Long key : failedKeySet) {
181 System.out.println("Failed to write key: " + key);