/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BULKLOAD_TIME_KEY;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/**
 * Tests bulk loading of HFiles and shows the atomicity or lack of atomicity of
 * the region server's bulkLoad functionality.
 */
@RunWith(Parameterized.class)
@Category({RegionServerTests.class, LargeTests.class})
public class TestHRegionServerBulkLoad {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHRegionServerBulkLoad.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHRegionServerBulkLoad.class);
  protected static HBaseTestingUtility UTIL = new HBaseTestingUtility();
  protected final static Configuration conf = UTIL.getConfiguration();
  protected final static byte[] QUAL = Bytes.toBytes("qual");
  protected final static int NUM_CFS = 10;
  private int sleepDuration;
  public static int BLOCKSIZE = 64 * 1024;
  public static Algorithm COMPRESSION = Compression.Algorithm.NONE;

  protected final static byte[][] families = new byte[NUM_CFS][];
  static {
    for (int i = 0; i < NUM_CFS; i++) {
      families[i] = Bytes.toBytes(family(i));
    }
  }
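  /*
   * Each parameter is a compaction delay in milliseconds, injected into
   * MyObserver#preCompact below: one run races scans against fast compactions,
   * the other against compactions stalled for 30 seconds.
   */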
  @Parameters
  public static final Collection<Object[]> parameters() {
    int[] sleepDurations = new int[] { 0, 30000 };
    List<Object[]> configurations = new ArrayList<>();
    for (int i : sleepDurations) {
      configurations.add(new Object[] { i });
    }
    return configurations;
  }

  public TestHRegionServerBulkLoad(int duration) {
    this.sleepDuration = duration;
  }
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Shorten the RPC timeout so stalled operations surface quickly in the test.
    conf.setInt("hbase.rpc.timeout", 10 * 1000);
  }
  /**
   * Create a rowkey compatible with
   * {@link #createHFile(FileSystem, Path, byte[], byte[], byte[], int)}.
   */
  public static byte[] rowkey(int i) {
    return Bytes.toBytes(String.format("row_%08d", i));
  }
  static String family(int i) {
    return String.format("family_%04d", i);
  }
  /**
   * Create an HFile with the given number of rows with a specified value.
   */
  public static void createHFile(FileSystem fs, Path path, byte[] family,
      byte[] qualifier, byte[] value, int numRows) throws IOException {
    HFileContext context = new HFileContextBuilder().withBlockSize(BLOCKSIZE)
        .withCompression(COMPRESSION)
        .build();
    HFile.Writer writer = HFile
        .getWriterFactory(conf, new CacheConfig(conf))
        .withPath(fs, path)
        .withFileContext(context)
        .create();
    long now = System.currentTimeMillis();
    try {
      // Rows are appended in rowkey order, as HFile.Writer requires.
      for (int i = 0; i < numRows; i++) {
        KeyValue kv = new KeyValue(rowkey(i), family, qualifier, now, value);
        writer.append(kv);
      }
      // Stamp the bulk load time into the file info, as the bulk load tooling does.
      writer.appendFileInfo(BULKLOAD_TIME_KEY, Bytes.toBytes(now));
    } finally {
      writer.close();
    }
  }
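  /*
   * Example usage (hypothetical values): write 1000 cells for the first
   * column family into an HFile that is ready for bulk loading:
   *
   *   createHFile(fs, new Path(dir, family(0)), families[0], QUAL,
   *       Bytes.toBytes("v"), 1000);
   */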
  /**
   * Thread that repeatedly bulk loads HFiles into the table.
   *
   * Each iteration bulk loads one HFile per column family (10 files). Every
   * fifth iteration it requests a region compaction to keep the number of
   * open file handles down.
   */
  public static class AtomicHFileLoader extends RepeatingTestThread {
    final AtomicLong numBulkLoads = new AtomicLong();
    final AtomicLong numCompactions = new AtomicLong();
    private TableName tableName;

    // targetFamilies is accepted for symmetry with AtomicScanReader but unused;
    // the loader always writes all NUM_CFS families.
    public AtomicHFileLoader(TableName tableName, TestContext ctx,
        byte targetFamilies[][]) throws IOException {
      super(ctx);
      this.tableName = tableName;
    }
    @Override
    public void doAnAction() throws Exception {
      long iteration = numBulkLoads.getAndIncrement();
      Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d",
          iteration));

      // create HFiles for different column families
      FileSystem fs = UTIL.getTestFileSystem();
      byte[] val = Bytes.toBytes(String.format("%010d", iteration));
      Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (int i = 0; i < NUM_CFS; i++) {
        Path hfile = new Path(dir, family(i));
        byte[] fam = Bytes.toBytes(family(i));
        createHFile(fs, hfile, fam, QUAL, val, 1000);
        family2Files.put(fam, Collections.singletonList(hfile));
      }

      // bulk load HFiles
      BulkLoadHFiles.create(UTIL.getConfiguration()).bulkLoad(tableName, family2Files);
      final Connection conn = UTIL.getConnection();

      // Periodically compact to reduce the number of open file handles, since
      // each iteration adds 10 new store files to the region.
      if (numBulkLoads.get() % 5 == 0) {
        try (RegionLocator locator = conn.getRegionLocator(tableName)) {
          HRegionLocation loc = locator.getRegionLocation(Bytes.toBytes("aaa"), true);
          conn.getAdmin().compactRegion(loc.getRegion().getRegionName());
          numCompactions.incrementAndGet();
        }
      }
    }
  }
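  /*
   * Coprocessor that stalls every compaction by sleepDuration millis, widening
   * the window in which a concurrent scan could observe a half-applied bulk load.
   */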
  public static class MyObserver implements RegionCoprocessor, RegionObserver {
    static int sleepDuration;

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(this);
    }

    @Override
    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
        InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
        CompactionRequest request) throws IOException {
      try {
        Thread.sleep(sleepDuration);
      } catch (InterruptedException ie) {
        IOException ioe = new InterruptedIOException();
        ioe.initCause(ie);
        throw ioe;
      }
      return scanner;
    }
  }
  /**
   * Thread that does full scans of the table looking for any partially
   * completed rows.
   */
  public static class AtomicScanReader extends RepeatingTestThread {
    byte targetFamilies[][];
    Table table;
    AtomicLong numScans = new AtomicLong();
    AtomicLong numRowsScanned = new AtomicLong();
    TableName TABLE_NAME;

    public AtomicScanReader(TableName TABLE_NAME, TestContext ctx,
        byte targetFamilies[][]) throws IOException {
      super(ctx);
      this.TABLE_NAME = TABLE_NAME;
      this.targetFamilies = targetFamilies;
      table = UTIL.getConnection().getTable(TABLE_NAME);
    }
    @Override
    public void doAnAction() throws Exception {
      Scan s = new Scan();
      for (byte[] family : targetFamilies) {
        s.addFamily(family);
      }

      ResultScanner scanner = table.getScanner(s);
      for (Result res : scanner) {
        byte[] lastRow = null, lastFam = null, lastQual = null;
        byte[] gotValue = null;
        for (byte[] family : targetFamilies) {
          byte qualifier[] = QUAL;
          byte thisValue[] = res.getValue(family, qualifier);
          // Each bulk load writes one value across every family of a row, so a
          // mismatch between families means a partially visible (non-atomic) load.
          if (gotValue != null && thisValue != null
              && !Bytes.equals(gotValue, thisValue)) {
            StringBuilder msg = new StringBuilder();
            msg.append("Failed on scan ").append(numScans)
                .append(" after scanning ").append(numRowsScanned)
                .append(" rows!\n");
            msg.append("Current was " + Bytes.toString(res.getRow()) + "/"
                + Bytes.toString(family) + ":" + Bytes.toString(qualifier)
                + " = " + Bytes.toString(thisValue) + "\n");
            msg.append("Previous was " + Bytes.toString(lastRow) + "/"
                + Bytes.toString(lastFam) + ":" + Bytes.toString(lastQual)
                + " = " + Bytes.toString(gotValue));
            throw new RuntimeException(msg.toString());
          }
          lastFam = family;
          lastQual = qualifier;
          lastRow = res.getRow();
          gotValue = thisValue;
        }
        numRowsScanned.getAndIncrement();
      }
      numScans.getAndIncrement();
    }
  }
  /**
   * Creates a table with the given name and the specified number of column
   * families if the table does not already exist.
   */
  public void setupTable(TableName table, int cfs) throws IOException {
    try {
      LOG.info("Creating table " + table);
      TableDescriptorBuilder tableDescriptorBuilder =
          TableDescriptorBuilder.newBuilder(table);

      tableDescriptorBuilder.setCoprocessor(MyObserver.class.getName());
      MyObserver.sleepDuration = this.sleepDuration;
      for (int i = 0; i < cfs; i++) {
        ColumnFamilyDescriptor columnFamilyDescriptor =
            ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(family(i))).build();
        tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
      }

      UTIL.getAdmin().createTable(tableDescriptorBuilder.build());
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }
  /**
   * Atomic bulk load.
   */
  @Test
  public void testAtomicBulkLoad() throws Exception {
    TableName TABLE_NAME = TableName.valueOf("atomicBulkLoad");

    int millisToRun = 30000;
    int numScanners = 50;

    // Set createWALDir to true and use default values for other options.
    UTIL.startMiniCluster(StartMiniClusterOption.builder().createWALDir(true).build());
    try {
      WAL log = UTIL.getHBaseCluster().getRegionServer(0).getWAL(null);
      FindBulkHBaseListener listener = new FindBulkHBaseListener();
      log.registerWALActionsListener(listener);
      runAtomicBulkloadTest(TABLE_NAME, millisToRun, numScanners);
      // The WAL must have recorded at least one bulk load marker.
      assertThat(listener.isFound(), is(true));
    } finally {
      UTIL.shutdownMiniCluster();
    }
  }
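  /*
   * Concurrency harness: one loader thread repeatedly bulk loads while
   * numScanners reader threads scan for rows whose families disagree, running
   * for millisToRun milliseconds.
   */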
  void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
      throws Exception {
    setupTable(tableName, 10);

    TestContext ctx = new TestContext(UTIL.getConfiguration());

    AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
    ctx.addThread(loader);

    List<AtomicScanReader> scanners = Lists.newArrayList();
    for (int i = 0; i < numScanners; i++) {
      AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
      scanners.add(scanner);
      ctx.addThread(scanner);
    }

    ctx.startThreads();
    ctx.waitFor(millisToRun);
    ctx.stop();

    LOG.info("Loaders:");
    LOG.info("  loaded " + loader.numBulkLoads.get());
    LOG.info("  compactions " + loader.numCompactions.get());

    LOG.info("Scanners:");
    for (AtomicScanReader scanner : scanners) {
      LOG.info("  scanned " + scanner.numScans.get());
      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
    }
  }
  /**
   * Run test on an HBase instance for 5 minutes. This assumes that the table
   * under test only has a single region.
   */
  public static void main(String args[]) throws Exception {
    try {
      Configuration c = HBaseConfiguration.create();
      TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad(0);
      test.setConf(c);
      test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
    } finally {
      System.exit(0); // something hangs (believe it is lru threadpool)
    }
  }
  private void setConf(Configuration c) {
    UTIL = new HBaseTestingUtility(c);
  }
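  /*
   * WAL listener that flips a flag once it sees a bulk load marker
   * (WALEdit.BULK_LOAD) in an appended edit, proving that the bulk load was
   * recorded in the WAL.
   */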
  static class FindBulkHBaseListener extends TestWALActionsListener.DummyWALActionsListener {
    private boolean found = false;

    @Override
    public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) {
      for (Cell cell : logEdit.getCells()) {
        KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
        for (Map.Entry<String, Object> entry : kv.toStringMap().entrySet()) {
          if (entry.getValue().equals(Bytes.toString(WALEdit.BULK_LOAD))) {
            found = true;
          }
        }
      }
    }

    public boolean isFound() {
      return found;
    }
  }
}