From 2a60a61a73d3b001324e159397b2083dfcbbc2aa Mon Sep 17 00:00:00 2001
From: Guanghao Zhang
Date: Sun, 22 Mar 2020 22:56:30 +0800
Subject: [PATCH] HBASE-24033 Add ut for loading the corrupt recovered hfiles
 (#1322)

Signed-off-by: Duo Zhang
---
 .../apache/hadoop/hbase/regionserver/HRegion.java  |   1 +
 .../org/apache/hadoop/hbase/wal/WALSplitUtil.java  |   6 +-
 .../hadoop/hbase/wal/TestWALSplitToHFile.java      | 168 ++++++++++++++++-----
 3 files changed, 133 insertions(+), 42 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 11a10180d8..a9483558e4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5449,6 +5449,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           store.assertBulkLoadHFileOk(filePath);
         } catch (IOException e) {
           handleException(fs.getFileSystem(), filePath, e);
+          continue;
         }
         Pair<Path, Path> pair = store.preBulkLoadHFile(filePath.toString(), -1);
         store.bulkLoadHFile(Bytes.toBytes(familyName), pair.getFirst().toString(),
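The one-line change above is the behavior the new test pins down: when
hbase.hregion.edits.replay.skip.errors is enabled, handleException swallows the
IOException instead of rethrowing, so without the added `continue` the loop fell
through and still tried to bulk-load the file that had just failed validation.
A minimal standalone sketch of that control-flow pattern follows; the
validate/load helpers are hypothetical stand-ins, not the HBase API.

    // SkipBadFilesSketch.java -- illustrative only, not part of the patch.
    import java.io.IOException;
    import java.util.List;

    public final class SkipBadFilesSketch {
      // Hypothetical stand-in for a validation call such as assertBulkLoadHFileOk().
      static void validate(String file) throws IOException {
        if (file.endsWith(".corrupt")) {
          throw new IOException("corrupt file " + file);
        }
      }

      static void loadAll(List<String> files, boolean skipErrors) throws IOException {
        for (String file : files) {
          try {
            validate(file);
          } catch (IOException e) {
            if (!skipErrors) {
              throw e; // mirrors skip.errors=false: the open fails fast
            }
            System.out.println("Skipping bad file " + file);
            continue; // the fix: never fall through to load a rejected file
          }
          System.out.println("Loading " + file); // hypothetical stand-in for the load step
        }
      }

      public static void main(String[] args) throws IOException {
        loadAll(List.of("a.hfile", "b.hfile.corrupt", "c.hfile"), true);
      }
    }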
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
index 8011a8bc67..3ff1e70ad7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java
@@ -345,16 +345,16 @@ public final class WALSplitUtil {
 
   /**
    * Move aside a bad edits file.
-   * @param walFS WAL FileSystem used to rename bad edits file.
+   * @param fs the file system used to rename bad edits file.
    * @param edits Edits file to move aside.
    * @return The name of the moved aside file.
    * @throws IOException
    */
-  public static Path moveAsideBadEditsFile(final FileSystem walFS, final Path edits)
+  public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits)
       throws IOException {
     Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
         + System.currentTimeMillis());
-    if (!walFS.rename(edits, moveAsideName)) {
+    if (!fs.rename(edits, moveAsideName)) {
       LOG.warn("Rename failed from {} to {}", edits, moveAsideName);
     }
     return moveAsideName;

diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java
index eaf6b6021f..a7fb7317cc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.wal;
 import static org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay.addRegionEdits;
 import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.when;
@@ -32,6 +33,9 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -51,6 +55,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
 import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -61,10 +66,12 @@ import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -95,6 +102,11 @@ public class TestWALSplitToHFile {
   private Configuration conf;
   private WALFactory wals;
 
+  private static final byte[] ROW = Bytes.toBytes("row");
+  private static final byte[] VALUE1 = Bytes.toBytes("value1");
+  private static final byte[] VALUE2 = Bytes.toBytes("value2");
+  private static final int countPerFamily = 10;
+
   @Rule
   public final TestName TEST_NAME = new TestName();
 
@@ -116,6 +128,7 @@ public class TestWALSplitToHFile {
   @Before
   public void setUp() throws Exception {
     this.conf = HBaseConfiguration.create(UTIL.getConfiguration());
+    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, false);
     this.fs = UTIL.getDFSCluster().getFileSystem();
     this.rootDir = FSUtils.getRootDir(this.conf);
     this.oldLogDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
@@ -163,24 +176,93 @@ public class TestWALSplitToHFile {
     return wal;
   }
 
-  /**
-   * Test writing edits into an HRegion, closing it, splitting logs, opening
-   * Region again. Verify seqids.
-   */
-  @Test
-  public void testReplayEditsWrittenViaHRegion()
-      throws IOException, SecurityException, IllegalArgumentException, InterruptedException {
+  private Pair<TableDescriptor, RegionInfo> setupTableAndRegion() throws IOException {
     final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
     final TableDescriptor td = createBasic3FamilyTD(tableName);
     final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
     final Path tableDir = FSUtils.getTableDir(this.rootDir, tableName);
     deleteDir(tableDir);
     FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
-    final byte[] rowName = tableName.getName();
-    final int countPerFamily = 10;
+    HRegion region = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
+    HBaseTestingUtility.closeRegionAndWAL(region);
+    return new Pair<>(td, ri);
+  }
+
+  @Test
+  public void testCorruptRecoveredHFile() throws Exception {
+    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
+    TableDescriptor td = pair.getFirst();
+    RegionInfo ri = pair.getSecond();
+
+    WAL wal = createWAL(this.conf, rootDir, logName);
+    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
+    final long timestamp = this.ee.currentTime();
+    // Write data and flush
+    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+      region.put(new Put(ROW).addColumn(cfd.getName(), Bytes.toBytes("x"), timestamp, VALUE1));
+    }
+    region.flush(true);
+
+    // Now assert edits made it in.
+    Result result1 = region.get(new Get(ROW));
+    assertEquals(td.getColumnFamilies().length, result1.size());
+    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+      assertTrue(Bytes.equals(VALUE1, result1.getValue(cfd.getName(), Bytes.toBytes("x"))));
+    }
+
+    // Now close the region
+    region.close(true);
+    wal.shutdown();
+    // Split the log
+    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
+
+    // Write a corrupt recovered hfile
+    Path regionDir =
+        new Path(CommonFSUtils.getTableDir(rootDir, td.getTableName()), ri.getEncodedName());
+    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+      FileStatus[] files =
+          WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
+      assertNotNull(files);
+      assertTrue(files.length > 0);
+      writeCorruptRecoveredHFile(files[0].getPath());
+    }
+
+    // Reopening the region should fail on the corrupt recovered hfile
+    WAL wal2 = createWAL(this.conf, rootDir, logName);
+    try {
+      HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
+      fail("Should fail to open region");
+    } catch (CorruptHFileException che) {
+      // Expected
+    }
+
+    // Set skip errors to true and reopen the region
+    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, true);
+    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
+    Result result2 = region2.get(new Get(ROW));
+    assertEquals(td.getColumnFamilies().length, result2.size());
+    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+      assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), Bytes.toBytes("x"))));
+      // Assert the corrupt file was skipped and still exists
+      FileStatus[] files =
+          WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
+      assertNotNull(files);
+      assertEquals(1, files.length);
+      assertTrue(files[0].getPath().getName().contains("corrupt"));
+    }
+  }
+
+  /**
+   * Test writing edits into an HRegion, closing it, splitting logs, opening
+   * Region again. Verify seqids.
+ */ + @Test + public void testWrittenViaHRegion() + throws IOException, SecurityException, IllegalArgumentException, InterruptedException { + Pair pair = setupTableAndRegion(); + TableDescriptor td = pair.getFirst(); + RegionInfo ri = pair.getSecond(); - HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td); - HBaseTestingUtility.closeRegionAndWAL(region3); // Write countPerFamily edits into the three families. Do a flush on one // of the families during the load of edits so its seqid is not same as // others to test we do right thing when different seqids. @@ -189,7 +271,7 @@ public class TestWALSplitToHFile { long seqid = region.getOpenSeqNum(); boolean first = true; for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) { - addRegionEdits(rowName, cfd.getName(), countPerFamily, this.ee, region, "x"); + addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x"); if (first) { // If first, so we have at least one family w/ different seqid to rest. region.flush(true); @@ -197,7 +279,7 @@ public class TestWALSplitToHFile { } } // Now assert edits made it in. - final Get g = new Get(rowName); + final Get g = new Get(ROW); Result result = region.get(g); assertEquals(countPerFamily * td.getColumnFamilies().length, result.size()); // Now close the region (without flush), split the log, reopen the region and assert that @@ -222,14 +304,14 @@ public class TestWALSplitToHFile { // out from under it and assert that replay of the log adds the edits back // correctly when region is opened again. for (ColumnFamilyDescriptor hcd : td.getColumnFamilies()) { - addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y"); + addRegionEdits(ROW, hcd.getName(), countPerFamily, this.ee, region2, "y"); } // Get count of edits. final Result result2 = region2.get(g); assertEquals(2 * result.size(), result2.size()); wal2.sync(); final Configuration newConf = HBaseConfiguration.create(this.conf); - User user = HBaseTestingUtility.getDifferentUser(newConf, tableName.getNameAsString()); + User user = HBaseTestingUtility.getDifferentUser(newConf, td.getTableName().getNameAsString()); user.runAs(new PrivilegedExceptionAction() { @Override public Object run() throws Exception { @@ -237,6 +319,7 @@ public class TestWALSplitToHFile { FileSystem newFS = FileSystem.get(newConf); // Make a new wal for new region open. WAL wal3 = createWAL(newConf, rootDir, logName); + Path tableDir = FSUtils.getTableDir(rootDir, td.getTableName()); HRegion region3 = new HRegion(tableDir, wal3, newFS, newConf, ri, td, null); long seqid3 = region3.initialize(); Result result3 = region3.get(g); @@ -262,18 +345,12 @@ public class TestWALSplitToHFile { * We restart Region again, and verify that the edits were replayed. 
    */
   @Test
-  public void testReplayEditsAfterPartialFlush()
+  public void testAfterPartialFlush()
       throws IOException, SecurityException, IllegalArgumentException {
-    final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
-    final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
-    final Path tableDir = FSUtils.getTableDir(this.rootDir, tableName);
-    deleteDir(tableDir);
-    final byte[] rowName = tableName.getName();
-    final int countPerFamily = 10;
-    final TableDescriptor td = createBasic3FamilyTD(tableName);
-    FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
-    HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
-    HBaseTestingUtility.closeRegionAndWAL(region3);
+    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
+    TableDescriptor td = pair.getFirst();
+    RegionInfo ri = pair.getSecond();
+
     // Write countPerFamily edits into the three families. Do a flush on one
     // of the families during the load of edits so its seqid is not same as
     // others to test we do right thing when different seqids.
@@ -281,11 +358,11 @@ public class TestWALSplitToHFile {
     WAL wal = createWAL(this.conf, rootDir, logName);
     HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
     long seqid = region.getOpenSeqNum();
     for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
-      addRegionEdits(rowName, cfd.getName(), countPerFamily, this.ee, region, "x");
+      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
     }
 
     // Now assert edits made it in.
-    final Get g = new Get(rowName);
+    final Get g = new Get(ROW);
     Result result = region.get(g);
     assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
@@ -323,15 +400,11 @@ public class TestWALSplitToHFile {
    * and flush again, at last verify the data.
    */
   @Test
-  public void testReplayEditsAfterAbortingFlush() throws IOException {
-    final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
-    final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
-    final Path tableDir = FSUtils.getTableDir(this.rootDir, tableName);
-    deleteDir(tableDir);
-    final TableDescriptor td = createBasic3FamilyTD(tableName);
-    FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
-    HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
-    HBaseTestingUtility.closeRegionAndWAL(region3);
+  public void testAfterAbortingFlush() throws IOException {
+    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
+    TableDescriptor td = pair.getFirst();
+    RegionInfo ri = pair.getSecond();
+
     // Write countPerFamily edits into the three families. Do a flush on one
     // of the families during the load of edits so its seqid is not same as
     // others to test we do right thing when different seqids.
@@ -347,7 +420,7 @@ public class TestWALSplitToHFile {
     int writtenRowCount = 10;
     List<ColumnFamilyDescriptor> families = Arrays.asList(td.getColumnFamilies());
     for (int i = 0; i < writtenRowCount; i++) {
-      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
+      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
       put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
           Bytes.toBytes("val"));
       region.put(put);
@@ -372,7 +445,7 @@ public class TestWALSplitToHFile {
     // writing more data
     int moreRow = 10;
     for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
-      Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
+      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
       put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
           Bytes.toBytes("val"));
       region.put(put);
@@ -414,4 +487,21 @@ public class TestWALSplitToHFile {
     }
     return scannedCount;
   }
+
+  private void writeCorruptRecoveredHFile(Path recoveredHFile) throws Exception {
+    // Read the recovered hfile
+    int fileSize = (int) fs.listStatus(recoveredHFile)[0].getLen();
+    FSDataInputStream in = fs.open(recoveredHFile);
+    byte[] fileContent = new byte[fileSize];
+    in.readFully(0, fileContent, 0, fileSize);
+    in.close();
+
+    // Write a corrupt hfile by appending garbage
+    Path path = new Path(recoveredHFile.getParent(), recoveredHFile.getName() + ".corrupt");
+    FSDataOutputStream out;
+    out = fs.create(path);
+    out.write(fileContent);
+    out.write(Bytes.toBytes("-----"));
+    out.close();
+  }
 }
-- 
2.11.4.GIT
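For readers who want to reproduce the corruption trick outside the test, the
writeCorruptRecoveredHFile helper above simply copies a real file and appends
garbage bytes so the HFile no longer validates on open. A standalone sketch of
the same technique against the local filesystem (paths are illustrative; only
the Hadoop FileSystem API is assumed):

    // CorruptCopySketch.java -- illustrative only, not part of the patch.
    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class CorruptCopySketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        Path src = new Path("/tmp/recovered.hfile"); // illustrative source path
        Path dst = new Path(src.getParent(), src.getName() + ".corrupt");

        // Read the source file fully into memory.
        int len = (int) fs.getFileStatus(src).getLen();
        byte[] content = new byte[len];
        try (FSDataInputStream in = fs.open(src)) {
          in.readFully(0, content, 0, len);
        }

        // Write the copy, then append garbage so later validation of the file fails.
        try (FSDataOutputStream out = fs.create(dst)) {
          out.write(content);
          out.write("-----".getBytes(StandardCharsets.UTF_8));
        }
      }
    }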