HBASE-24033 Add ut for loading the corrupt recovered hfiles (#1322)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / AcidGuaranteesTestTool.java
blob45648062fd788b48594a0bcab0cc59cba35007f6
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org.apache.hadoop.hbase;
20 import java.io.IOException;
21 import java.util.List;
22 import java.util.Random;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.LinkedBlockingQueue;
26 import java.util.concurrent.ThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.AtomicLong;
29 import java.util.stream.Stream;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
32 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
33 import org.apache.hadoop.hbase.client.Admin;
34 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
35 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
36 import org.apache.hadoop.hbase.client.Connection;
37 import org.apache.hadoop.hbase.client.ConnectionFactory;
38 import org.apache.hadoop.hbase.client.Get;
39 import org.apache.hadoop.hbase.client.Put;
40 import org.apache.hadoop.hbase.client.Result;
41 import org.apache.hadoop.hbase.client.ResultScanner;
42 import org.apache.hadoop.hbase.client.Scan;
43 import org.apache.hadoop.hbase.client.Table;
44 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
45 import org.apache.hadoop.hbase.util.AbstractHBaseTool;
46 import org.apache.hadoop.hbase.util.Bytes;
47 import org.apache.hadoop.hbase.util.Threads;
48 import org.apache.hadoop.util.StringUtils;
49 import org.apache.hadoop.util.ToolRunner;
50 import org.apache.yetus.audience.InterfaceAudience;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
54 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
/**
 * A test tool that uses multiple threads to read and write multifamily rows into a table, verifying
 * that reads never see partially-complete writes.
 */
@InterfaceAudience.Private
public class AcidGuaranteesTestTool extends AbstractHBaseTool {

  private static final Logger LOG = LoggerFactory.getLogger(AcidGuaranteesTestTool.class);

  // Table and column layout shared by every reader/writer thread in the test.
  public static final TableName TABLE_NAME = TableName.valueOf("TestAcidGuarantees");
  public static final byte[] FAMILY_A = Bytes.toBytes("A");
  public static final byte[] FAMILY_B = Bytes.toBytes("B");
  public static final byte[] FAMILY_C = Bytes.toBytes("C");
  public static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");

  public static final byte[][] FAMILIES = new byte[][] { FAMILY_A, FAMILY_B, FAMILY_C };

  // Number of columns written to (and verified in) each family of every row.
  public static int NUM_COLS_TO_CHECK = 50;

  // Executor shared by all client connections created by the worker threads.
  private ExecutorService sharedPool;

  // Settings parsed from the command line; see processOptions for defaults.
  private long millisToRun;
  private int numWriters;
  private int numGetters;
  private int numScanners;
  private int numUniqueRows;
  private boolean crazyFlush;
  private boolean useMob;
85 private ExecutorService createThreadPool() {
86 int maxThreads = 256;
87 int coreThreads = 128;
89 long keepAliveTime = 60;
90 BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(
91 maxThreads * HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
93 ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime,
94 TimeUnit.SECONDS, workQueue, Threads.newDaemonThreadFactory(toString() + "-shared"));
95 tpe.allowCoreThreadTimeOut(true);
96 return tpe;
  /**
   * Registers the command-line options understood by this tool.
   */
  @Override
  protected void addOptions() {
    addOptWithArg("millis", "time limit in milliseconds");
    addOptWithArg("numWriters", "number of write threads");
    addOptWithArg("numGetters", "number of get threads");
    addOptWithArg("numScanners", "number of scan threads");
    addOptWithArg("numUniqueRows", "number of unique rows to test");
    addOptNoArg("crazyFlush",
      "if specified we will flush continuously otherwise will flush every minute");
    addOptNoArg("useMob", "if specified we will enable mob on the first column family");
  }
  /**
   * Parses the registered options into fields, falling back to defaults when absent.
   */
  @Override
  protected void processOptions(CommandLine cmd) {
    millisToRun = getOptionAsLong(cmd, "millis", 5000);
    numWriters = getOptionAsInt(cmd, "numWriters", 50);
    numGetters = getOptionAsInt(cmd, "numGetters", 2);
    numScanners = getOptionAsInt(cmd, "numScanners", 2);
    numUniqueRows = getOptionAsInt(cmd, "numUniqueRows", 3);
    crazyFlush = cmd.hasOption("crazyFlush");
    useMob = cmd.hasOption("useMob");
  }
  /**
   * Tool entry point: runs the atomicity test against the configured cluster.
   * @return 0 on success (failures surface as exceptions)
   */
  @Override
  protected int doWork() throws Exception {
    sharedPool = createThreadPool();
    try (Connection conn = ConnectionFactory.createConnection(getConf())) {
      runTestAtomicity(conn.getAdmin());
    } finally {
      // Always release the shared pool, even if the test run throws.
      sharedPool.shutdown();
    }
    return 0;
  }
134 * Thread that does random full-row writes into a table.
136 public static class AtomicityWriter extends RepeatingTestThread {
137 Random rand = new Random();
138 byte data[] = new byte[10];
139 byte[][] targetRows;
140 byte[][] targetFamilies;
141 Connection connection;
142 Table table;
143 AtomicLong numWritten = new AtomicLong();
145 public AtomicityWriter(TestContext ctx, byte[][] targetRows, byte[][] targetFamilies,
146 ExecutorService pool) throws IOException {
147 super(ctx);
148 this.targetRows = targetRows;
149 this.targetFamilies = targetFamilies;
150 connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
151 table = connection.getTable(TABLE_NAME);
154 @Override
155 public void doAnAction() throws Exception {
156 // Pick a random row to write into
157 byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
158 Put p = new Put(targetRow);
159 rand.nextBytes(data);
161 for (byte[] family : targetFamilies) {
162 for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
163 byte qualifier[] = Bytes.toBytes("col" + i);
164 p.addColumn(family, qualifier, data);
167 table.put(p);
168 numWritten.getAndIncrement();
171 @Override
172 public void workDone() throws IOException {
173 try {
174 table.close();
175 } finally {
176 connection.close();
182 * Thread that does single-row reads in a table, looking for partially completed rows.
184 public static class AtomicGetReader extends RepeatingTestThread {
185 byte[] targetRow;
186 byte[][] targetFamilies;
187 Connection connection;
188 Table table;
189 int numVerified = 0;
190 AtomicLong numRead = new AtomicLong();
192 public AtomicGetReader(TestContext ctx, byte[] targetRow, byte[][] targetFamilies,
193 ExecutorService pool) throws IOException {
194 super(ctx);
195 this.targetRow = targetRow;
196 this.targetFamilies = targetFamilies;
197 connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
198 table = connection.getTable(TABLE_NAME);
201 @Override
202 public void doAnAction() throws Exception {
203 Get g = new Get(targetRow);
204 Result res = table.get(g);
205 byte[] gotValue = null;
206 if (res.getRow() == null) {
207 // Trying to verify but we didn't find the row - the writing
208 // thread probably just hasn't started writing yet, so we can
209 // ignore this action
210 return;
213 for (byte[] family : targetFamilies) {
214 for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
215 byte qualifier[] = Bytes.toBytes("col" + i);
216 byte thisValue[] = res.getValue(family, qualifier);
217 if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
218 gotFailure(gotValue, res);
220 numVerified++;
221 gotValue = thisValue;
224 numRead.getAndIncrement();
227 @Override
228 public void workDone() throws IOException {
229 try {
230 table.close();
231 } finally {
232 connection.close();
236 private void gotFailure(byte[] expected, Result res) {
237 StringBuilder msg = new StringBuilder();
238 msg.append("Failed after ").append(numVerified).append("!");
239 msg.append("Expected=").append(Bytes.toStringBinary(expected));
240 msg.append("Got:\n");
241 for (Cell kv : res.listCells()) {
242 msg.append(kv.toString());
243 msg.append(" val= ");
244 msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
245 msg.append("\n");
247 throw new RuntimeException(msg.toString());
252 * Thread that does full scans of the table looking for any partially completed rows.
254 public static class AtomicScanReader extends RepeatingTestThread {
255 byte[][] targetFamilies;
256 Table table;
257 Connection connection;
258 AtomicLong numScans = new AtomicLong();
259 AtomicLong numRowsScanned = new AtomicLong();
261 public AtomicScanReader(TestContext ctx, byte[][] targetFamilies, ExecutorService pool)
262 throws IOException {
263 super(ctx);
264 this.targetFamilies = targetFamilies;
265 connection = ConnectionFactory.createConnection(ctx.getConf(), pool);
266 table = connection.getTable(TABLE_NAME);
269 @Override
270 public void doAnAction() throws Exception {
271 Scan s = new Scan();
272 for (byte[] family : targetFamilies) {
273 s.addFamily(family);
275 ResultScanner scanner = table.getScanner(s);
277 for (Result res : scanner) {
278 byte[] gotValue = null;
280 for (byte[] family : targetFamilies) {
281 for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
282 byte qualifier[] = Bytes.toBytes("col" + i);
283 byte thisValue[] = res.getValue(family, qualifier);
284 if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
285 gotFailure(gotValue, res);
287 gotValue = thisValue;
290 numRowsScanned.getAndIncrement();
292 numScans.getAndIncrement();
295 @Override
296 public void workDone() throws IOException {
297 try {
298 table.close();
299 } finally {
300 connection.close();
304 private void gotFailure(byte[] expected, Result res) {
305 StringBuilder msg = new StringBuilder();
306 msg.append("Failed after ").append(numRowsScanned).append("!");
307 msg.append("Expected=").append(Bytes.toStringBinary(expected));
308 msg.append("Got:\n");
309 for (Cell kv : res.listCells()) {
310 msg.append(kv.toString());
311 msg.append(" val= ");
312 msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv)));
313 msg.append("\n");
315 throw new RuntimeException(msg.toString());
319 private void createTableIfMissing(Admin admin, boolean useMob) throws IOException {
320 if (!admin.tableExists(TABLE_NAME)) {
321 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
322 Stream.of(FAMILIES).map(ColumnFamilyDescriptorBuilder::of)
323 .forEachOrdered(builder::setColumnFamily);
324 admin.createTable(builder.build());
326 ColumnFamilyDescriptor cfd = admin.getDescriptor(TABLE_NAME).getColumnFamilies()[0];
327 if (cfd.isMobEnabled() != useMob) {
328 admin.modifyColumnFamily(TABLE_NAME, ColumnFamilyDescriptorBuilder.newBuilder(cfd)
329 .setMobEnabled(useMob).setMobThreshold(4).build());
  /**
   * Core test driver: ensures the table exists, spawns writer/getter/scanner threads plus a
   * periodic flusher, runs them for {@code millisToRun} milliseconds, then stops everything and
   * logs per-thread counters. ACID violations surface as exceptions from the reader threads.
   */
  private void runTestAtomicity(Admin admin) throws Exception {
    createTableIfMissing(admin, useMob);
    TestContext ctx = new TestContext(conf);

    byte rows[][] = new byte[numUniqueRows][];
    for (int i = 0; i < numUniqueRows; i++) {
      rows[i] = Bytes.toBytes("test_row_" + i);
    }

    List<AtomicityWriter> writers = Lists.newArrayList();
    for (int i = 0; i < numWriters; i++) {
      AtomicityWriter writer = new AtomicityWriter(ctx, rows, FAMILIES, sharedPool);
      writers.add(writer);
      ctx.addThread(writer);
    }
    // Add a flusher
    ctx.addThread(new RepeatingTestThread(ctx) {
      @Override
      public void doAnAction() throws Exception {
        try {
          admin.flush(TABLE_NAME);
        } catch (IOException ioe) {
          // Best-effort: a failed flush should not abort the whole test run.
          LOG.warn("Ignoring exception while flushing: " + StringUtils.stringifyException(ioe));
        }
        // Flushing has been a source of ACID violations previously (see HBASE-2856), so ideally,
        // we would flush as often as possible. On a running cluster, this isn't practical:
        // (1) we will cause a lot of load due to all the flushing and compacting
        // (2) we cannot change the flushing/compacting related Configuration options to try to
        // alleviate this
        // (3) it is an unrealistic workload, since no one would actually flush that often.
        // Therefore, let's flush every minute to have more flushes than usual, but not overload
        // the running cluster.
        if (!crazyFlush) {
          Thread.sleep(60000);
        }
      }
    });

    // Each getter hammers one row, assigned round-robin over the unique rows.
    List<AtomicGetReader> getters = Lists.newArrayList();
    for (int i = 0; i < numGetters; i++) {
      AtomicGetReader getter =
        new AtomicGetReader(ctx, rows[i % numUniqueRows], FAMILIES, sharedPool);
      getters.add(getter);
      ctx.addThread(getter);
    }

    List<AtomicScanReader> scanners = Lists.newArrayList();
    for (int i = 0; i < numScanners; i++) {
      AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES, sharedPool);
      scanners.add(scanner);
      ctx.addThread(scanner);
    }
    ctx.startThreads();
    ctx.waitFor(millisToRun);
    ctx.stop();

    // Summarize throughput; the real pass/fail signal is the absence of exceptions above.
    LOG.info("Finished test. Writers:");
    for (AtomicityWriter writer : writers) {
      LOG.info(" wrote " + writer.numWritten.get());
    }
    LOG.info("Readers:");
    for (AtomicGetReader reader : getters) {
      LOG.info(" read " + reader.numRead.get());
    }
    LOG.info("Scanners:");
    for (AtomicScanReader scanner : scanners) {
      LOG.info(" scanned " + scanner.numScans.get());
      LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
    }
  }
405 public static void main(String[] args) {
406 Configuration c = HBaseConfiguration.create();
407 int status;
408 try {
409 AcidGuaranteesTestTool test = new AcidGuaranteesTestTool();
410 status = ToolRunner.run(c, test, args);
411 } catch (Exception e) {
412 LOG.error("Exiting due to error", e);
413 status = -1;
415 System.exit(status);