2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.util
;
21 import static org
.apache
.hadoop
.hbase
.util
.test
.LoadTestDataGenerator
.INCREMENT
;
22 import static org
.apache
.hadoop
.hbase
.util
.test
.LoadTestDataGenerator
.MUTATE_INFO
;
24 import java
.io
.IOException
;
25 import java
.io
.PrintWriter
;
26 import java
.io
.StringWriter
;
27 import java
.util
.Arrays
;
28 import java
.util
.HashSet
;
31 import org
.apache
.hadoop
.conf
.Configuration
;
32 import org
.apache
.hadoop
.hbase
.HConstants
;
33 import org
.apache
.hadoop
.hbase
.TableName
;
34 import org
.apache
.hadoop
.hbase
.client
.Put
;
35 import org
.apache
.hadoop
.hbase
.client
.RetriesExhaustedWithDetailsException
;
36 import org
.apache
.hadoop
.hbase
.client
.Table
;
37 import org
.apache
.hadoop
.hbase
.util
.test
.LoadTestDataGenerator
;
38 import org
.apache
.hadoop
.util
.StringUtils
;
39 import org
.slf4j
.Logger
;
40 import org
.slf4j
.LoggerFactory
;
42 /** Creates multiple threads that write key/values into the */
43 public class MultiThreadedWriter
extends MultiThreadedWriterBase
{
44 private static final Logger LOG
= LoggerFactory
.getLogger(MultiThreadedWriter
.class);
46 protected Set
<HBaseWriterThread
> writers
= new HashSet
<>();
48 protected boolean isMultiPut
= false;
50 public MultiThreadedWriter(LoadTestDataGenerator dataGen
, Configuration conf
,
51 TableName tableName
) throws IOException
{
52 super(dataGen
, conf
, tableName
, "W");
55 /** Use multi-puts vs. separate puts for every column in a row */
56 public void setMultiPut(boolean isMultiPut
) {
57 this.isMultiPut
= isMultiPut
;
61 public void start(long startKey
, long endKey
, int numThreads
) throws IOException
{
62 super.start(startKey
, endKey
, numThreads
);
65 LOG
.debug("Inserting keys [" + startKey
+ ", " + endKey
+ ")");
68 createWriterThreads(numThreads
);
70 startThreads(writers
);
73 protected void createWriterThreads(int numThreads
) throws IOException
{
74 for (int i
= 0; i
< numThreads
; ++i
) {
75 HBaseWriterThread writer
= new HBaseWriterThread(i
);
76 Threads
.setLoggingUncaughtExceptionHandler(writer
);
81 public class HBaseWriterThread
extends Thread
{
82 private final Table table
;
84 public HBaseWriterThread(int writerId
) throws IOException
{
85 setName(getClass().getSimpleName() + "_" + writerId
);
86 table
= createTable();
89 protected Table
createTable() throws IOException
{
90 return connection
.getTable(tableName
);
97 byte[][] columnFamilies
= dataGenerator
.getColumnFamilies();
98 while ((rowKeyBase
= nextKeyToWrite
.getAndIncrement()) < endKey
) {
99 byte[] rowKey
= dataGenerator
.getDeterministicUniqueKey(rowKeyBase
);
100 Put put
= new Put(rowKey
);
101 numKeys
.addAndGet(1);
103 for (byte[] cf
: columnFamilies
) {
104 byte[][] columns
= dataGenerator
.generateColumnsForCf(rowKey
, cf
);
105 for (byte[] column
: columns
) {
106 byte[] value
= dataGenerator
.generateValue(rowKey
, cf
, column
);
107 put
.addColumn(cf
, column
, value
);
110 insert(table
, put
, rowKeyBase
);
111 numCols
.addAndGet(1);
112 put
= new Put(rowKey
);
115 long rowKeyHash
= Arrays
.hashCode(rowKey
);
116 put
.addColumn(cf
, MUTATE_INFO
, HConstants
.EMPTY_BYTE_ARRAY
);
117 put
.addColumn(cf
, INCREMENT
, Bytes
.toBytes(rowKeyHash
));
119 insert(table
, put
, rowKeyBase
);
120 numCols
.addAndGet(1);
121 put
= new Put(rowKey
);
126 LOG
.debug("Preparing put for key = [" + Bytes
.toString(rowKey
) + "], " + columnCount
+ " columns");
128 insert(table
, put
, rowKeyBase
);
129 numCols
.addAndGet(columnCount
);
131 if (trackWroteKeys
) {
132 wroteKeys
.add(rowKeyBase
);
137 numThreadsWorking
.decrementAndGet();
141 public void insert(Table table
, Put put
, long keyBase
) {
142 long start
= System
.currentTimeMillis();
144 put
= (Put
) dataGenerator
.beforeMutate(keyBase
, put
);
146 totalOpTimeMs
.addAndGet(System
.currentTimeMillis() - start
);
147 } catch (IOException e
) {
148 failedKeySet
.add(keyBase
);
149 String exceptionInfo
;
150 if (e
instanceof RetriesExhaustedWithDetailsException
) {
151 RetriesExhaustedWithDetailsException aggEx
= (RetriesExhaustedWithDetailsException
)e
;
152 exceptionInfo
= aggEx
.getExhaustiveDescription();
154 StringWriter stackWriter
= new StringWriter();
155 PrintWriter pw
= new PrintWriter(stackWriter
);
156 e
.printStackTrace(pw
);
158 exceptionInfo
= StringUtils
.stringifyException(e
);
160 LOG
.error("Failed to insert: " + keyBase
+ " after " + (System
.currentTimeMillis() - start
)
161 + "ms; region information: " + getRegionDebugInfoSafe(table
, put
.getRow())
162 + "; errors: " + exceptionInfo
);
165 protected void closeHTable() {
170 } catch (IOException e
) {
171 LOG
.error("Error closing table", e
);
177 public void waitForFinish() {
178 super.waitForFinish();
179 System
.out
.println("Failed to write keys: " + failedKeySet
.size());
180 for (Long key
: failedKeySet
) {
181 System
.out
.println("Failed to write key: " + key
);