HBASE-24033 Add ut for loading the corrupt recovered hfiles (#1322)
hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay.addRegionEdits;
import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ RegionServerTests.class, MediumTests.class })
public class TestWALSplitToHFile {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestWALSplitToHFile.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestWALSplitToHFile.class);
  static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
  private Path rootDir = null;
  private String logName;
  private Path oldLogDir;
  private Path logDir;
  private FileSystem fs;
  private Configuration conf;
  private WALFactory wals;

  private static final byte[] ROW = Bytes.toBytes("row");
  private static final byte[] VALUE1 = Bytes.toBytes("value1");
  private static final byte[] VALUE2 = Bytes.toBytes("value2");
  private static final int countPerFamily = 10;

  @Rule
  public final TestName TEST_NAME = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = UTIL.getConfiguration();
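    // Enable the WAL-split-to-HFile feature: WAL splitting writes recovered HFiles
    // that region open bulk-loads, instead of recovered.edits files to replay.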
    conf.setBoolean(WAL_SPLIT_TO_HFILE, true);
    UTIL.startMiniCluster(3);
    Path hbaseRootDir = UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
    LOG.info("hbase.rootdir=" + hbaseRootDir);
    FSUtils.setRootDir(conf, hbaseRootDir);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    this.conf = HBaseConfiguration.create(UTIL.getConfiguration());
    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, false);
    this.fs = UTIL.getDFSCluster().getFileSystem();
    this.rootDir = FSUtils.getRootDir(this.conf);
    this.oldLogDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    String serverName =
        ServerName.valueOf(TEST_NAME.getMethodName() + "-manual", 16010, System.currentTimeMillis())
            .toString();
    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
    this.logDir = new Path(this.rootDir, logName);
    if (UTIL.getDFSCluster().getFileSystem().exists(this.rootDir)) {
      UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
    }
    this.wals = new WALFactory(conf, TEST_NAME.getMethodName());
  }

  @After
  public void tearDown() throws Exception {
    this.wals.close();
    UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
  }

  /**
   * @param p Directory to cleanup
   */
  private void deleteDir(final Path p) throws IOException {
    if (this.fs.exists(p)) {
      if (!this.fs.delete(p, true)) {
        throw new IOException("Failed remove of " + p);
      }
    }
  }

  private TableDescriptor createBasic3FamilyTD(final TableName tableName) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("a")).build());
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("b")).build());
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("c")).build());
    TableDescriptor td = builder.build();
    UTIL.getAdmin().createTable(td);
    return td;
  }

  private WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
    FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName, c);
    wal.init();
    return wal;
  }

  private Pair<TableDescriptor, RegionInfo> setupTableAndRegion() throws IOException {
    final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
    final TableDescriptor td = createBasic3FamilyTD(tableName);
    final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
    final Path tableDir = FSUtils.getTableDir(this.rootDir, tableName);
    deleteDir(tableDir);
    FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
    HRegion region = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
    HBaseTestingUtility.closeRegionAndWAL(region);
    return new Pair<>(td, ri);
  }

  @Test
  public void testCorruptRecoveredHFile() throws Exception {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    final long timestamp = this.ee.currentTime();
    // Write data and flush
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      region.put(new Put(ROW).addColumn(cfd.getName(), Bytes.toBytes("x"), timestamp, VALUE1));
    }
    region.flush(true);

    // Now assert edits made it in.
    Result result1 = region.get(new Get(ROW));
    assertEquals(td.getColumnFamilies().length, result1.size());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      assertTrue(Bytes.equals(VALUE1, result1.getValue(cfd.getName(), Bytes.toBytes("x"))));
    }

    // Now close the region
    region.close(true);
    wal.shutdown();
    // Split the log
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
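
    // With WAL_SPLIT_TO_HFILE enabled, the split wrote recovered HFiles under each
    // column family's "recovered.hfiles" directory instead of recovered.edits files.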
    // Write a corrupt recovered hfile
    Path regionDir =
        new Path(CommonFSUtils.getTableDir(rootDir, td.getTableName()), ri.getEncodedName());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      FileStatus[] files =
          WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
      assertNotNull(files);
      assertTrue(files.length > 0);
      writeCorruptRecoveredHFile(files[0].getPath());
    }

    // Reopening the region should fail because replay hits the corrupt recovered hfile
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    try {
      HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
      fail("Should fail to open region");
    } catch (CorruptHFileException che) {
      // Expected
    }
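
    // With HREGION_EDITS_REPLAY_SKIP_ERRORS set, replay skips recovered files it
    // cannot read, leaving them in place rather than failing the region open.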
    // Set skip errors to true and reopen the region
    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, true);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
    Result result2 = region2.get(new Get(ROW));
    assertEquals(td.getColumnFamilies().length, result2.size());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), Bytes.toBytes("x"))));
      // Assert the corrupt file was skipped and still exists
      FileStatus[] files =
          WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
      assertNotNull(files);
      assertEquals(1, files.length);
      assertTrue(files[0].getPath().getName().contains("corrupt"));
    }
  }

  /**
   * Test writing edits into an HRegion, closing it, splitting logs, opening
   * Region again. Verify seqids.
   */
  @Test
  public void testWrittenViaHRegion()
      throws IOException, SecurityException, IllegalArgumentException, InterruptedException {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    // Write countPerFamily edits into the three families. Flush one of the
    // families during the load of edits so its seqid is not the same as the
    // others, to verify we do the right thing when seqids differ.
    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    long seqid = region.getOpenSeqNum();
    boolean first = true;
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
      if (first) {
        // Flush on the first family so we have at least one family with a
        // different seqid from the rest.
        region.flush(true);
        first = false;
      }
    }
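
    // At this point only the first family's edits have been flushed to a store
    // file; the other families' edits live only in the WAL, so per-store seqids diverge.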

    // Now assert edits made it in.
    final Get g = new Get(ROW);
    Result result = region.get(g);
    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
    // Now close the region (without flush), split the log, reopen the region and assert that
    // replay of the log has the correct effect: our seqids are calculated correctly, so
    // all edits in the logs are seen as 'stale'/old.
    region.close(true);
    wal.shutdown();
    try {
      WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
    } catch (Exception e) {
      LOG.debug("Got exception", e);
    }

    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
    long seqid2 = region2.getOpenSeqNum();
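    // The new open seqid must be beyond the old seqid plus the number of edits
    // written, proving replay advanced the region's sequence id accounting.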
    assertTrue(seqid + result.size() < seqid2);
    final Result result1b = region2.get(g);
    assertEquals(result.size(), result1b.size());

    // Next test. Add more edits, then 'crash' this region by stealing its wal
    // out from under it and assert that replay of the log adds the edits back
    // correctly when the region is opened again.
    for (ColumnFamilyDescriptor hcd : td.getColumnFamilies()) {
      addRegionEdits(ROW, hcd.getName(), countPerFamily, this.ee, region2, "y");
    }
    // Get count of edits.
    final Result result2 = region2.get(g);
    assertEquals(2 * result.size(), result2.size());
    // Sync the WAL so the new edits are durable before we steal it.
    wal2.sync();
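
    // Run the split as a different user so it gets its own FileSystem instance,
    // mimicking a split performed by another server while this WAL is still held.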
    final Configuration newConf = HBaseConfiguration.create(this.conf);
    User user = HBaseTestingUtility.getDifferentUser(newConf, td.getTableName().getNameAsString());
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(conf), conf, wals);
        FileSystem newFS = FileSystem.get(newConf);
        // Make a new wal for the new region open.
        WAL wal3 = createWAL(newConf, rootDir, logName);
        Path tableDir = FSUtils.getTableDir(rootDir, td.getTableName());
        HRegion region3 = new HRegion(tableDir, wal3, newFS, newConf, ri, td, null);
        long seqid3 = region3.initialize();
        Result result3 = region3.get(g);
        // Assert that the count of cells is the same as before the crash.
        assertEquals(result2.size(), result3.size());

        // We can't close wal2. It's been appropriated when we split.
        region3.close();
        wal3.close();
        return null;
      }
    });
  }

  /**
   * Test that we recover correctly when there is a failure in between the
   * flushes, i.e. some stores got flushed but others did not.
   * Unfortunately, there is no easy hook to flush at a store level. The way
   * we get around this is by flushing at the region level, and then deleting
   * the recently flushed store file for one of the Stores. This would put us
   * back in the situation where all but that store got flushed and the region
   * died.
   * We restart the Region again, and verify that the edits were replayed.
   */
  @Test
  public void testAfterPartialFlush()
      throws IOException, SecurityException, IllegalArgumentException {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    // Write countPerFamily edits into the three families.
    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    long seqid = region.getOpenSeqNum();
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
    }

    // Now assert edits made it in.
    final Get g = new Get(ROW);
    Result result = region.get(g);
    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());

    // Let us flush the region
    region.flush(true);
    region.close(true);
    wal.shutdown();

    // Delete the store files of the second column family to simulate a failure
    // in between the flushes: we have 3 families, and killing the middle one
    // ensures that taking the maximum seqid across stores would make us fail.
    int cf_count = 0;
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      cf_count++;
      if (cf_count == 2) {
        region.getRegionFileSystem().deleteFamily(cfd.getNameAsString());
      }
    }

    // Let us try to split and recover
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
    long seqid2 = region2.getOpenSeqNum();
    assertTrue(seqid + result.size() < seqid2);

    final Result result1b = region2.get(g);
    assertEquals(result.size(), result1b.size());
  }

  /**
   * Test that we can recover the data correctly after aborting a flush. In the
   * test, we first abort a flush after writing some data, then write more data
   * and flush again, and at last verify the data.
   */
  @Test
  public void testAfterAbortingFlush() throws IOException {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    // Open the region with a mocked RegionServerServices and a custom store
    // flusher that can be told to throw while flushing.
    WAL wal = createWAL(this.conf, rootDir, logName);
    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
    Mockito.doReturn(false).when(rsServices).isAborted();
    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
    when(rsServices.getConfiguration()).thenReturn(conf);
    Configuration customConf = new Configuration(this.conf);
    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
        AbstractTestWALReplay.CustomStoreFlusher.class.getName());
    HRegion region = HRegion.openHRegion(this.rootDir, ri, td, wal, customConf, rsServices, null);
    int writtenRowCount = 10;
    List<ColumnFamilyDescriptor> families = Arrays.asList(td.getColumnFamilies());
    for (int i = 0; i < writtenRowCount; i++) {
      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
          Bytes.toBytes("val"));
      region.put(put);
    }

    // Now assert edits made it in.
    RegionScanner scanner = region.getScanner(new Scan());
    assertEquals(writtenRowCount, getScannedCount(scanner));

    // Let us flush the region; the custom flusher is armed to throw, so the
    // flush is aborted partway.
    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
    try {
      region.flush(true);
      fail("Injected exception hasn't been thrown");
    } catch (IOException e) {
      LOG.info("Expected simulated exception when flushing region, {}", e.getMessage());
      // Simulate the server aborting.
      Mockito.doReturn(true).when(rsServices).isAborted();
      // The region normally does not accept writes after a
      // DroppedSnapshotException; we mock around it for this test.
      region.setClosing(false);
    }

    // Write more data
    int moreRow = 10;
    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
          Bytes.toBytes("val"));
      region.put(put);
    }
    writtenRowCount += moreRow;

    // Call flush again
    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
    try {
      region.flush(true);
    } catch (IOException t) {
      LOG.info("Expected exception when flushing region because server is stopped: {}",
          t.getMessage());
    }

    region.close(true);
    wal.shutdown();
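
    // On reopen, WAL replay must restore both the rows whose flush was aborted
    // and the rows written afterwards.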
    // Let us try to split and recover
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    Mockito.doReturn(false).when(rsServices).isAborted();
    HRegion region2 = HRegion.openHRegion(this.rootDir, ri, td, wal2, this.conf, rsServices, null);
    scanner = region2.getScanner(new Scan());
    assertEquals(writtenRowCount, getScannedCount(scanner));
  }

  private int getScannedCount(RegionScanner scanner) throws IOException {
    int scannedCount = 0;
    List<Cell> results = new ArrayList<>();
    while (true) {
      boolean existMore = scanner.next(results);
      if (!results.isEmpty()) {
        scannedCount++;
      }
      if (!existMore) {
        break;
      }
      results.clear();
    }
    return scannedCount;
  }

  private void writeCorruptRecoveredHFile(Path recoveredHFile) throws Exception {
    // Read the recovered hfile
    int fileSize = (int) fs.listStatus(recoveredHFile)[0].getLen();
    FSDataInputStream in = fs.open(recoveredHFile);
    byte[] fileContent = new byte[fileSize];
    in.readFully(0, fileContent, 0, fileSize);
    in.close();

    // Write a corrupt hfile by appending garbage to a copy of the original
    Path path = new Path(recoveredHFile.getParent(), recoveredHFile.getName() + ".corrupt");
    FSDataOutputStream out;
    out = fs.create(path);
    out.write(fileContent);
    out.write(Bytes.toBytes("-----"));
    out.close();
  }
}