2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.util
;
21 import static org
.apache
.hadoop
.hbase
.util
.test
.LoadTestDataGenerator
.INCREMENT
;
22 import static org
.apache
.hadoop
.hbase
.util
.test
.LoadTestDataGenerator
.MUTATE_INFO
;
24 import java
.io
.IOException
;
25 import java
.io
.PrintWriter
;
26 import java
.io
.StringWriter
;
27 import java
.util
.Arrays
;
28 import java
.util
.HashSet
;
30 import java
.util
.Random
;
32 import java
.util
.concurrent
.ThreadLocalRandom
;
34 import org
.apache
.hadoop
.conf
.Configuration
;
35 import org
.apache
.hadoop
.hbase
.Cell
;
36 import org
.apache
.hadoop
.hbase
.CellUtil
;
37 import org
.apache
.hadoop
.hbase
.HConstants
;
38 import org
.apache
.hadoop
.hbase
.TableName
;
39 import org
.apache
.hadoop
.hbase
.client
.Append
;
40 import org
.apache
.hadoop
.hbase
.client
.Delete
;
41 import org
.apache
.hadoop
.hbase
.client
.Get
;
42 import org
.apache
.hadoop
.hbase
.client
.Increment
;
43 import org
.apache
.hadoop
.hbase
.client
.Mutation
;
44 import org
.apache
.hadoop
.hbase
.client
.Put
;
45 import org
.apache
.hadoop
.hbase
.client
.Result
;
46 import org
.apache
.hadoop
.hbase
.client
.RetriesExhaustedWithDetailsException
;
47 import org
.apache
.hadoop
.hbase
.client
.Table
;
48 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClientProtos
.MutationProto
.MutationType
;
49 import org
.apache
.hadoop
.hbase
.util
.test
.LoadTestDataGenerator
;
50 import org
.apache
.hadoop
.util
.StringUtils
;
51 import org
.slf4j
.Logger
;
52 import org
.slf4j
.LoggerFactory
;
53 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.base
.Preconditions
;
55 /** Creates multiple threads that update key/values in a table. */
56 public class MultiThreadedUpdater
extends MultiThreadedWriterBase
{
57 private static final Logger LOG
= LoggerFactory
.getLogger(MultiThreadedUpdater
.class);
// Updater threads created by addUpdaterThreads() and handed to startThreads() in start().
59 protected Set
<HBaseUpdaterThread
> updaters
= new HashSet
<>();
// When non-null, getNextKeyToUpdate() waits for this writer's wroteUpToKey() and skips keys
// the writer reports as failed; when null, keys are taken straight from nextKeyToWrite.
61 private MultiThreadedWriterBase writer
= null;
// When false, the updater thread sends each Increment/Append right after building it instead
// of accumulating columns first; set through setBatchUpdate().
62 private boolean isBatchUpdate
= false;
// When true, the updater thread's mutate() logs an IOException as a nonce conflict to be
// ignored; set through setIgnoreNonceConflicts().
63 private boolean ignoreNonceConflicts
= false;
// Compared against rand.nextInt(100) for every key to decide whether that key is updated.
64 private final double updatePercent
;
/**
 * Creates an updater for the given table.
 *
 * @param dataGen generator passed through to the base class
 * @param conf configuration passed through to the base class
 * @param tableName table whose rows are updated
 * @param updatePercent threshold compared against a random value in [0, 100) per key to
 *          decide whether the key is updated
 * @throws IOException if the base-class constructor fails
 */
public MultiThreadedUpdater(LoadTestDataGenerator dataGen, Configuration conf,
    TableName tableName, double updatePercent) throws IOException {
  // "U" is the action letter handed to the base class for this kind of worker.
  super(dataGen, conf, tableName, "U");
  this.updatePercent = updatePercent;
}
72 /** Use batch vs. separate updates for every column in a row */
73 public void setBatchUpdate(boolean isBatchUpdate
) {
74 this.isBatchUpdate
= isBatchUpdate
;
77 public void linkToWriter(MultiThreadedWriterBase writer
) {
79 writer
.setTrackWroteKeys(true);
/**
 * Starts the updater over the key range [startKey, endKey): delegates to the superclass
 * start(), logs the range at debug level, creates {@code numThreads} updater threads and
 * starts them.
 * NOTE(review): lines appear to be missing around the debug log in this excerpt (possibly a
 * guard condition) - confirm against the full file.
 */
83 public void start(long startKey
, long endKey
, int numThreads
) throws IOException
{
84 super.start(startKey
, endKey
, numThreads
);
87 LOG
.debug("Updating keys [" + startKey
+ ", " + endKey
+ ")");
// Build the worker threads first, then hand the whole set to startThreads().
90 addUpdaterThreads(numThreads
);
92 startThreads(updaters
);
95 protected void addUpdaterThreads(int numThreads
) throws IOException
{
96 for (int i
= 0; i
< numThreads
; ++i
) {
97 HBaseUpdaterThread updater
= new HBaseUpdaterThread(i
);
98 updaters
.add(updater
);
/**
 * Hands out the next key for an updater thread to work on.
 * Without a linked writer, the shared nextKeyToWrite counter is simply incremented.
 * With a writer, the method synchronizes on this updater, waits (sleeping via
 * Threads.sleepWithoutInterrupt(100) between checks) until the writer's wroteUpToKey() has
 * caught up with the counter, and tries the following key when the writer reports that it
 * failed to write the chosen one.
 * NOTE(review): the statements that return a value for the "finished" and success cases are
 * not visible in this excerpt - confirm what is returned there.
 */
102 private long getNextKeyToUpdate() {
103 if (writer
== null) {
104 return nextKeyToWrite
.getAndIncrement();
106 synchronized (this) {
107 if (nextKeyToWrite
.get() >= endKey
) {
108 // Finished the whole key range
// Do not run ahead of the writer: wait until it has written up to the next key.
111 while (nextKeyToWrite
.get() > writer
.wroteUpToKey()) {
112 Threads
.sleepWithoutInterrupt(100);
114 long k
= nextKeyToWrite
.getAndIncrement();
// A key the writer could not write cannot be updated; move on to the next one.
115 if (writer
.failedToWriteKey(k
)) {
117 return getNextKeyToUpdate();
/** Worker thread that applies updates to rows through a Table handle it obtains itself. */
123 protected class HBaseUpdaterThread
extends Thread
{
// Table handle obtained from createTable() in the constructor.
124 protected final Table table
;
/**
 * Names the thread "&lt;simple class name&gt;_&lt;updaterId&gt;" and obtains its table handle.
 *
 * @param updaterId index of this updater, used as the thread-name suffix
 * @throws IOException if the table handle cannot be created
 */
public HBaseUpdaterThread(int updaterId) throws IOException {
  String threadName = getClass().getSimpleName() + "_" + updaterId;
  setName(threadName);
  this.table = createTable();
}
131 protected Table
createTable() throws IOException
{
132 return connection
.getTable(tableName
);
// Main loop of the updater thread.
// NOTE(review): the method header and several control-flow lines (try/else/branch selection)
// are not visible in this excerpt; the comments below describe only the visible statements.
138 Random rand
= ThreadLocalRandom
.current();
140 StringBuilder buf
= new StringBuilder();
141 byte[][] columnFamilies
= dataGenerator
.getColumnFamilies();
// Keep taking keys until getNextKeyToUpdate() returns a value at or beyond endKey.
142 while ((rowKeyBase
= getNextKeyToUpdate()) < endKey
) {
// Only a random share of the keys, governed by updatePercent, is actually updated.
143 if (rand
.nextInt(100) < updatePercent
) {
144 byte[] rowKey
= dataGenerator
.getDeterministicUniqueKey(rowKeyBase
);
145 Increment inc
= new Increment(rowKey
);
146 Append app
= new Append(rowKey
);
147 numKeys
.addAndGet(1);
149 for (byte[] cf
: columnFamilies
) {
// Increment the INCREMENT column by the family's hash and record that in MUTATE_INFO as
// "#<INCREMENT column>:<INCREMENT mutation type number>".
150 long cfHash
= Arrays
.hashCode(cf
);
151 inc
.addColumn(cf
, INCREMENT
, cfHash
);
152 buf
.setLength(0); // Clear the buffer
153 buf
.append("#").append(Bytes
.toString(INCREMENT
));
154 buf
.append(":").append(MutationType
.INCREMENT
.getNumber());
155 app
.addColumn(cf
, MUTATE_INFO
, Bytes
.toBytes(buf
.toString()));
// Non-batch mode: send the increment and append now and start fresh ones for the same row.
157 if (!isBatchUpdate
) {
158 mutate(table
, inc
, rowKeyBase
);
159 numCols
.addAndGet(1);
160 inc
= new Increment(rowKey
);
161 mutate(table
, app
, rowKeyBase
);
162 numCols
.addAndGet(1);
163 app
= new Append(rowKey
);
// Read the current row so the existing columns of this family can be mutated.
165 Get get
= new Get(rowKey
);
168 get
= dataGenerator
.beforeGet(rowKeyBase
, get
);
169 } catch (Exception e
) {
170 // Ideally wont happen
171 LOG
.warn("Failed to modify the get from the load generator = [" + Bytes
.toString(get
.getRow())
172 + "], column family = [" + Bytes
.toString(cf
) + "]", e
);
174 Result result
= getRow(get
, rowKeyBase
, cf
);
175 Map
<byte[], byte[]> columnValues
=
176 result
!= null ? result
.getFamilyMap(cf
) : null;
// No data for this family: for row keys divisible by the factor parsed from the data
// generator's third argument, a null result is logged as expected; the failure bookkeeping
// below is also visible. NOTE(review): the branch separating the two is not visible here.
177 if (columnValues
== null) {
178 int specialPermCellInsertionFactor
= Integer
.parseInt(dataGenerator
.getArgs()[2]);
179 if (((int) rowKeyBase
% specialPermCellInsertionFactor
== 0)) {
180 LOG
.info("Null result expected for the rowkey " + Bytes
.toString(rowKey
));
182 failedKeySet
.add(rowKeyBase
);
183 LOG
.error("Failed to update the row with key = [" + Bytes
.toString(rowKey
)
184 + "], since we could not get the original row");
187 if(columnValues
!= null) {
188 for (byte[] column
: columnValues
.keySet()) {
// The INCREMENT and MUTATE_INFO bookkeeping columns are singled out here.
189 if (Bytes
.equals(column
, INCREMENT
) || Bytes
.equals(column
, MUTATE_INFO
)) {
// A mutation type is picked at random among all MutationType values.
193 MutationType
.values()[rand
.nextInt(MutationType
.values().length
)];
// The new value is derived from the family hash plus the column hash.
194 long columnHash
= Arrays
.hashCode(column
);
195 long hashCode
= cfHash
+ columnHash
;
196 byte[] hashCodeBytes
= Bytes
.toBytes(hashCode
);
197 byte[] checkedValue
= HConstants
.EMPTY_BYTE_ARRAY
;
// For even hash codes the check value is the column's latest cell value, which must exist.
198 if (hashCode
% 2 == 0) {
199 Cell kv
= result
.getColumnLatestCell(cf
, column
);
200 checkedValue
= kv
!= null ? CellUtil
.cloneValue(kv
) : null;
201 Preconditions
.checkNotNull(checkedValue
,
202 "Column value to be checked should not be null");
// buf accumulates "#<column>:<mutation type number>" for the MUTATE_INFO column.
204 buf
.setLength(0); // Clear the buffer
205 buf
.append("#").append(Bytes
.toString(column
)).append(":");
// NOTE(review): the branching that selects between the put, delete and append cases below
// is not visible in this excerpt.
// Put: sent through the 7-argument mutate(), i.e. checkAndMutate against checkedValue.
209 Put put
= new Put(rowKey
);
210 put
.addColumn(cf
, column
, hashCodeBytes
);
211 mutate(table
, put
, rowKeyBase
, rowKey
, cf
, column
, checkedValue
);
212 buf
.append(MutationType
.PUT
.getNumber());
// Delete: also sent through the 7-argument mutate() with the same check value.
215 Delete delete
= new Delete(rowKey
);
216 // Delete all versions since a put
217 // could be called multiple times if CM is used
218 delete
.addColumns(cf
, column
);
219 mutate(table
, delete
, rowKeyBase
, rowKey
, cf
, column
, checkedValue
);
220 buf
.append(MutationType
.DELETE
.getNumber());
// Append: the hash bytes are queued on the pending Append for this row.
223 buf
.append(MutationType
.APPEND
.getNumber());
224 app
.addColumn(cf
, column
, hashCodeBytes
);
// Record what was done to this column in MUTATE_INFO; in non-batch mode send it right away.
226 app
.addColumn(cf
, MUTATE_INFO
, Bytes
.toBytes(buf
.toString()));
227 if (!isBatchUpdate
) {
228 mutate(table
, app
, rowKeyBase
);
229 numCols
.addAndGet(1);
230 app
= new Append(rowKey
);
// Sends the accumulated increment and append for the row.
// NOTE(review): the guard around this section and the definition of columnCount are not
// visible in this excerpt.
237 LOG
.debug("Preparing increment and append for key = ["
238 + Bytes
.toString(rowKey
) + "], " + columnCount
+ " columns");
240 mutate(table
, inc
, rowKeyBase
);
241 mutate(table
, app
, rowKeyBase
);
242 numCols
.addAndGet(columnCount
);
// Remember the processed key when key tracking is enabled.
245 if (trackWroteKeys
) {
246 wroteKeys
.add(rowKeyBase
);
// Signal that one working thread has finished.
251 numThreadsWorking
.decrementAndGet();
/**
 * Presumably closes this thread's table handle; an IOException is logged at error level
 * ("Error closing table") and not propagated.
 * NOTE(review): the statements performing the close are not visible in this excerpt.
 */
255 protected void closeHTable() {
260 } catch (IOException e
) {
261 LOG
.error("Error closing table", e
);
/**
 * Fetches a row by executing the given Get against this thread's table.
 *
 * @param get the Get to execute
 * @param rowKeyBase numeric key the row belongs to (not used in the visible lines)
 * @param cf column family, used in the message built when the get throws an IOException
 * NOTE(review): the failure-reporting call and the return statement are not visible in this
 * excerpt; the result starts out as null and the caller null-checks it.
 */
265 protected Result
getRow(Get get
, long rowKeyBase
, byte[] cf
) {
266 Result result
= null;
268 result
= table
.get(get
);
269 } catch (IOException ie
) {
271 "Failed to get the row for key = [" + Bytes
.toString(get
.getRow()) + "], column family = ["
272 + Bytes
.toString(cf
) + "]", ie
);
277 public void mutate(Table table
, Mutation m
, long keyBase
) {
278 mutate(table
, m
, keyBase
, null, null, null, null);
/**
 * Applies one mutation and records timing and failures.
 * The mutation is first passed through dataGenerator.beforeMutate(). Increment and Append are
 * sent directly; Put and Delete are sent through checkAndMutate on (row, cf, q), conditioned
 * on the current value being equal to v. An IllegalArgumentException ("unsupported mutation")
 * is thrown from this method for the remaining case.
 * On IOException, when ignoreNonceConflicts is set, the error is logged as an ignored nonce
 * conflict and the elapsed time is still recorded. The visible failure handling adds keyBase
 * to failedKeySet and logs the elapsed time, region debug info and exception description.
 * NOTE(review): lines are missing from this excerpt (for example between the nonce-conflict
 * handling and the failure bookkeeping, and at the end of the error log) - confirm the exact
 * control flow against the full file.
 *
 * @param table table to mutate
 * @param m mutation to apply
 * @param keyBase numeric key the mutation belongs to
 * @param row row passed to checkAndMutate (Put/Delete only)
 * @param cf column family passed to checkAndMutate (Put/Delete only)
 * @param q qualifier passed to checkAndMutate (Put/Delete only)
 * @param v value the column is compared against (Put/Delete only)
 */
281 public void mutate(Table table
, Mutation m
,
282 long keyBase
, byte[] row
, byte[] cf
, byte[] q
, byte[] v
) {
283 long start
= EnvironmentEdgeManager
.currentTime();
// Give the data generator a chance to replace or adjust the mutation.
285 m
= dataGenerator
.beforeMutate(keyBase
, m
);
286 if (m
instanceof Increment
) {
287 table
.increment((Increment
)m
);
288 } else if (m
instanceof Append
) {
289 table
.append((Append
)m
);
290 } else if (m
instanceof Put
) {
291 table
.checkAndMutate(row
, cf
).qualifier(q
).ifEquals(v
).thenPut((Put
)m
);
292 } else if (m
instanceof Delete
) {
293 table
.checkAndMutate(row
, cf
).qualifier(q
).ifEquals(v
).thenDelete((Delete
)m
);
295 throw new IllegalArgumentException(
296 "unsupported mutation " + m
.getClass().getSimpleName());
298 totalOpTimeMs
.addAndGet(EnvironmentEdgeManager
.currentTime() - start
);
299 } catch (IOException e
) {
300 if (ignoreNonceConflicts
) {
301 LOG
.info("Detected nonce conflict, ignoring: " + e
.getMessage());
302 totalOpTimeMs
.addAndGet(EnvironmentEdgeManager
.currentTime() - start
);
// Failure bookkeeping: remember the key and build a description of the exception.
305 failedKeySet
.add(keyBase
);
306 String exceptionInfo
;
307 if (e
instanceof RetriesExhaustedWithDetailsException
) {
308 RetriesExhaustedWithDetailsException aggEx
= (RetriesExhaustedWithDetailsException
) e
;
309 exceptionInfo
= aggEx
.getExhaustiveDescription();
311 exceptionInfo
= StringUtils
.stringifyException(e
);
313 LOG
.error("Failed to mutate: " + keyBase
+ " after " +
314 (EnvironmentEdgeManager
.currentTime() - start
) +
315 "ms; region information: " + getRegionDebugInfoSafe(table
, m
.getRow()) + "; errors: "
322 public void waitForFinish() {
323 super.waitForFinish();
324 System
.out
.println("Failed to update keys: " + failedKeySet
.size());
325 for (Long key
: failedKeySet
) {
326 System
.out
.println("Failed to update key: " + key
);
330 public void mutate(Table table
, Mutation m
, long keyBase
) {
331 mutate(table
, m
, keyBase
, null, null, null, null);
334 public void mutate(Table table
, Mutation m
,
335 long keyBase
, byte[] row
, byte[] cf
, byte[] q
, byte[] v
) {
336 long start
= EnvironmentEdgeManager
.currentTime();
338 m
= dataGenerator
.beforeMutate(keyBase
, m
);
339 if (m
instanceof Increment
) {
340 table
.increment((Increment
)m
);
341 } else if (m
instanceof Append
) {
342 table
.append((Append
)m
);
343 } else if (m
instanceof Put
) {
344 table
.checkAndMutate(row
, cf
).qualifier(q
).ifEquals(v
).thenPut((Put
)m
);
345 } else if (m
instanceof Delete
) {
346 table
.checkAndMutate(row
, cf
).qualifier(q
).ifEquals(v
).thenDelete((Delete
)m
);
348 throw new IllegalArgumentException(
349 "unsupported mutation " + m
.getClass().getSimpleName());
351 totalOpTimeMs
.addAndGet(EnvironmentEdgeManager
.currentTime() - start
);
352 } catch (IOException e
) {
353 failedKeySet
.add(keyBase
);
354 String exceptionInfo
;
355 if (e
instanceof RetriesExhaustedWithDetailsException
) {
356 RetriesExhaustedWithDetailsException aggEx
= (RetriesExhaustedWithDetailsException
)e
;
357 exceptionInfo
= aggEx
.getExhaustiveDescription();
359 StringWriter stackWriter
= new StringWriter();
360 PrintWriter pw
= new PrintWriter(stackWriter
);
361 e
.printStackTrace(pw
);
363 exceptionInfo
= StringUtils
.stringifyException(e
);
365 LOG
.error("Failed to mutate: " + keyBase
+ " after " +
366 (EnvironmentEdgeManager
.currentTime() - start
) + "ms; region information: " +
367 getRegionDebugInfoSafe(table
, m
.getRow()) + "; errors: " + exceptionInfo
);
371 public void setIgnoreNonceConflicts(boolean value
) {
372 this.ignoreNonceConflicts
= value
;