HBASE-24033 Add ut for loading the corrupt recovered hfiles (#1322)
hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
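To run this test class on its own, a standard Maven surefire invocation such as `mvn test -Dtest=TestWALSplit -pl hbase-server` should work (module name assumed from the file path above).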
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Method;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
/**
 * Testing {@link WAL} splitting code.
 */
@Category({RegionServerTests.class, LargeTests.class})
public class TestWALSplit {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestWALSplit.class);

  // Uncomment the following lines if more verbosity is needed for
  // debugging (see HBASE-12285 for details).
  //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
  //((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
  //((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);

  private final static Logger LOG = LoggerFactory.getLogger(TestWALSplit.class);

  private static Configuration conf;
  private FileSystem fs;

  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private Path HBASEDIR;
  private Path HBASELOGDIR;
  private Path WALDIR;
  private Path OLDLOGDIR;
  private Path CORRUPTDIR;
  private Path TABLEDIR;
  private String TMPDIRNAME;

  private static final int NUM_WRITERS = 10;
  private static final int ENTRIES = 10; // entries per writer per region

  private static final String FILENAME_BEING_SPLIT = "testfile";
  private static final TableName TABLE_NAME =
      TableName.valueOf("t1");
  private static final byte[] FAMILY = Bytes.toBytes("f1");
  private static final byte[] QUALIFIER = Bytes.toBytes("q1");
  private static final byte[] VALUE = Bytes.toBytes("v1");
  private static final String WAL_FILE_PREFIX = "wal.dat.";
  private static List<String> REGIONS = new ArrayList<>();
  private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
  private static String ROBBER;
  private static String ZOMBIE;
  private static String [] GROUP = new String [] {"supergroup"};
  static enum Corruptions {
    INSERT_GARBAGE_ON_FIRST_LINE,
    INSERT_GARBAGE_IN_THE_MIDDLE,
    APPEND_GARBAGE,
    TRUNCATE,
    TRUNCATE_TRAILER
  }
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf = TEST_UTIL.getConfiguration();
    conf.setClass("hbase.regionserver.hlog.writer.impl",
        InstrumentedLogWriter.class, Writer.class);
    // This is how you turn off shortcircuit read currently. TODO: Fix. Should read config.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
    // Create a fake user-to-group mapping and set it on the conf.
    Map<String, String []> u2g_map = new HashMap<>(2);
    ROBBER = User.getCurrent().getName() + "-robber";
    ZOMBIE = User.getCurrent().getName() + "-zombie";
    u2g_map.put(ROBBER, GROUP);
    u2g_map.put(ZOMBIE, GROUP);
    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
    conf.setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.startMiniDFSCluster(2);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  @Rule
  public TestName name = new TestName();
  private WALFactory wals = null;
  @Before
  public void setUp() throws Exception {
    LOG.info("Cleaning up cluster for new test.");
    fs = TEST_UTIL.getDFSCluster().getFileSystem();
    HBASEDIR = TEST_UTIL.createRootDir();
    HBASELOGDIR = TEST_UTIL.createWALRootDir();
    OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME);
    CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME);
    TABLEDIR = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    TMPDIRNAME = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
        HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
    REGIONS.clear();
    Collections.addAll(REGIONS, "bbb", "ccc");
    InstrumentedLogWriter.activateFailure = false;
    wals = new WALFactory(conf, name.getMethodName());
    WALDIR = new Path(HBASELOGDIR,
        AbstractFSWALProvider.getWALDirectoryName(ServerName.valueOf(name.getMethodName(),
            16010, System.currentTimeMillis()).toString()));
    //fs.mkdirs(WALDIR);
  }

  @After
  public void tearDown() throws Exception {
    try {
      wals.close();
    } catch(IOException exception) {
      // Some tests will move WALs out from under us. In those cases, we'll get an error on close.
      LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if" +
          " you see a failure look here.");
      LOG.debug("exception details", exception);
    } finally {
      wals = null;
      fs.delete(HBASEDIR, true);
      fs.delete(HBASELOGDIR, true);
    }
  }
  /**
   * Simulates splitting a WAL out from under a regionserver that is still trying to write it.
   * Ensures we do not lose edits.
   * @throws IOException
   * @throws InterruptedException
   */
  @Test
  public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
    final AtomicLong counter = new AtomicLong(0);
    AtomicBoolean stop = new AtomicBoolean(false);
    // Region we'll write edits to and then later examine to make sure they all made it in.
    final String region = REGIONS.get(0);
    final int numWriters = 3;
    Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters);
    try {
      long startCount = counter.get();
      zombie.start();
      // Wait till writer starts going.
      while (startCount == counter.get()) Threads.sleep(1);
      // Give it a second to write a few appends.
      Threads.sleep(1000);
      final Configuration conf2 = HBaseConfiguration.create(conf);
      final User robber = User.createUserForTesting(conf2, ROBBER, GROUP);
      int count = robber.runAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws Exception {
          StringBuilder ls = new StringBuilder("Contents of WALDIR (").append(WALDIR)
              .append("):\n");
          for (FileStatus status : fs.listStatus(WALDIR)) {
            ls.append("\t").append(status.toString()).append("\n");
          }
          LOG.debug(Objects.toString(ls));
          LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
          WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
          LOG.info("Finished splitting out from under zombie.");
          Path[] logfiles = getLogForRegion(TABLE_NAME, region);
          assertEquals("wrong number of split files for region", numWriters, logfiles.length);
          int count = 0;
          for (Path logfile: logfiles) {
            count += countWAL(logfile);
          }
          return count;
        }
      });
      LOG.info("zombie=" + counter.get() + ", robber=" + count);
      assertTrue("The log file could have at most 1 extra log entry, but can't have less. " +
              "Zombie could write " + counter.get() + " and logfile had only " + count,
          counter.get() == count || counter.get() + 1 == count);
    } finally {
      stop.set(true);
      zombie.interrupt();
      Threads.threadDumpingIsAlive(zombie);
    }
  }
  /**
   * This thread will keep writing to a 'wal' file even after the split process has started.
   * It simulates a region server that was considered dead but woke up and wrote some more to the
   * last log entry. Does its writing as an alternate user in another filesystem instance to
   * better simulate it being a regionserver.
   */
  class ZombieLastLogWriterRegionServer extends Thread {
    final AtomicLong editsCount;
    final AtomicBoolean stop;
    final int numOfWriters;
    /**
     * Region to write edits for.
     */
    final String region;
    final User user;

    public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop,
        final String region, final int writers)
        throws IOException, InterruptedException {
      super("ZombieLastLogWriterRegionServer");
      setDaemon(true);
      this.stop = stop;
      this.editsCount = counter;
      this.region = region;
      this.user = User.createUserForTesting(conf, ZOMBIE, GROUP);
      numOfWriters = writers;
    }

    @Override
    public void run() {
      try {
        doWriting();
      } catch (IOException e) {
        LOG.warn(getName() + " Writer exiting " + e);
      } catch (InterruptedException e) {
        LOG.warn(getName() + " Writer exiting " + e);
      }
    }

    private void doWriting() throws IOException, InterruptedException {
      this.user.runAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          // Index of the WAL we want to keep open. generateWALs will leave open the WAL whose
          // index we supply here.
          int walToKeepOpen = numOfWriters - 1;
          // The below method writes numOfWriters files each with ENTRIES entries for a total of
          // numOfWriters * ENTRIES added per column family in the region.
          Writer writer = null;
          try {
            writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen);
          } catch (IOException e1) {
            throw new RuntimeException("Failed", e1);
          }
          // Update the counter so it has all edits written so far.
          editsCount.addAndGet(numOfWriters * ENTRIES);
          loop(writer);
          // If we've been interrupted, then things should have shifted out from under us.
          // closing should error
          try {
            writer.close();
            fail("Closing the writer after parsing should give an error.");
          } catch (IOException exception) {
            LOG.debug("ignoring error when closing final writer.", exception);
          }
          return null;
        }
      });
    }

    private void loop(final Writer writer) {
      byte [] regionBytes = Bytes.toBytes(this.region);
      while (!stop.get()) {
        try {
          long seq = appendEntry(writer, TABLE_NAME, regionBytes,
              Bytes.toBytes("r" + editsCount.get()), regionBytes, QUALIFIER, VALUE, 0);
          long count = editsCount.incrementAndGet();
          LOG.info(getName() + " sync count=" + count + ", seq=" + seq);
          try {
            Thread.sleep(1);
          } catch (InterruptedException e) {
            // ignore and keep writing until stopped
          }
        } catch (IOException ex) {
          LOG.error(getName() + " ex " + ex.toString());
          if (ex instanceof RemoteException) {
            LOG.error("Juliet: got RemoteException " + ex.getMessage() +
                " while writing " + (editsCount.get() + 1));
          } else {
            LOG.error(getName() + " failed to write....at " + editsCount.get());
            fail("Failed to write " + editsCount.get());
          }
          break;
        } catch (Throwable t) {
          LOG.error(getName() + " HOW? " + t);
          LOG.debug("exception details", t);
          break;
        }
      }
      LOG.info(getName() + " Writer exiting");
    }
  }
  /**
   * @see "https://issues.apache.org/jira/browse/HBASE-3020"
   */
  @Test
  public void testRecoveredEditsPathForMeta() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
  }

  /**
   * Test old recovered edits file doesn't break WALSplitter.
   * This is useful in upgrading old instances.
   */
  @Test
  public void testOldRecoveredEditsFileSidelined() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
    Path regiondir = new Path(tdir,
        RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
    fs.mkdirs(regiondir);
    Path parent = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
    assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
    fs.createNewFile(parent); // create a recovered.edits file
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
    WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
  }

  private Path createRecoveredEditsPathForRegion() throws IOException {
    byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
    long now = System.currentTimeMillis();
    Entry entry = new Entry(
        new WALKeyImpl(encoded, TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
        new WALEdit());
    Path p = WALSplitUtil
        .getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1, FILENAME_BEING_SPLIT,
            TMPDIRNAME, conf);
    return p;
  }

  @Test
  public void testHasRecoveredEdits() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    assertFalse(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
    String renamedEdit = p.getName().split("-")[0];
    fs.createNewFile(new Path(p.getParent(), renamedEdit));
    assertTrue(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
  }
  private void useDifferentDFSClient() throws IOException {
    // make fs act as a different client now
    // initialize will create a new DFSClient with a new client ID
    fs.initialize(fs.getUri(), conf);
  }

  @Test
  public void testSplitPreservesEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1, 0);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
  }

  @Test
  public void testSplitRemovesRegionEventsEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1, 100);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertFalse("edits should differ after split", logsAreEqual(originalLog, splitLog[0]));
    // split log should only have the test edits
    assertEquals(10, countWAL(splitLog[0]));
  }

  @Test
  public void testSplitLeavesCompactionEventsEdits() throws IOException {
    RegionInfo hri = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
    REGIONS.clear();
    REGIONS.add(hri.getEncodedName());
    Path regionDir = new Path(FSUtils.getTableDir(HBASEDIR, TABLE_NAME), hri.getEncodedName());
    LOG.info("Creating region directory: " + regionDir);
    assertTrue(fs.mkdirs(regionDir));

    Writer writer = generateWALs(1, 10, 0, 10);
    String[] compactInputs = new String[]{"file1", "file2", "file3"};
    String compactOutput = "file4";
    appendCompactionEvent(writer, hri, compactInputs, compactOutput);
    writer.close();

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    // original log should have 10 test edits, 10 region markers, 1 compaction marker
    assertEquals(21, countWAL(originalLog));

    Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName());
    assertEquals(1, splitLog.length);

    assertFalse("edits should differ after split", logsAreEqual(originalLog, splitLog[0]));
    // split log should have 10 test edits plus 1 compaction marker
    assertEquals(11, countWAL(splitLog[0]));
  }

  /**
   * @param expectedEntries -1 to not assert
   * @return the count across all regions
   */
  private int splitAndCount(final int expectedFiles, final int expectedEntries)
      throws IOException {
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    int result = 0;
    for (String region : REGIONS) {
      Path[] logfiles = getLogForRegion(TABLE_NAME, region);
      assertEquals(expectedFiles, logfiles.length);
      int count = 0;
      for (Path logfile: logfiles) {
        count += countWAL(logfile);
      }
      if (-1 != expectedEntries) {
        assertEquals(expectedEntries, count);
      }
      result += count;
    }
    return result;
  }
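  // The next two tests differ only in whether the injected empty WAL files were cleanly
  // closed; an open, zero-length file mimics a regionserver that died before writing anything.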
  @Test
  public void testEmptyLogFiles() throws IOException {
    testEmptyLogFiles(true);
  }

  @Test
  public void testEmptyOpenLogFiles() throws IOException {
    testEmptyLogFiles(false);
  }

  private void testEmptyLogFiles(final boolean close) throws IOException {
    // we won't create the hlog dir until getWAL got called, so
    // make dir here when testing empty log file
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", close);
    generateWALs(Integer.MAX_VALUE);
    injectEmptyFile("empty", close);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty
  }

  @Test
  public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
    // generate logs but leave wal.dat.5 open.
    generateWALs(5);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }
  @Test
  public void testTrailingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.APPEND_GARBAGE, true);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
    splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); // 1 corrupt
  }

  @Test
  public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false);
    // the entries in the original logs are alternating regions
    // considering the sequence file header, the middle corruption should
    // affect at least half of the entries
    int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
    int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1;
    int allRegionsCount = splitAndCount(NUM_WRITERS, -1);
    assertTrue("The file up to the corrupted area hasn't been parsed",
        REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount);
  }
  @Test
  public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    List<FaultyProtobufLogReader.FailureType> failureTypes = Arrays
        .asList(FaultyProtobufLogReader.FailureType.values()).stream()
        .filter(x -> x != FaultyProtobufLogReader.FailureType.NONE).collect(Collectors.toList());
    for (FaultyProtobufLogReader.FailureType failureType : failureTypes) {
      final Set<String> walDirContents = splitCorruptWALs(failureType);
      final Set<String> archivedLogs = new HashSet<>();
      final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:");
      for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
        archived.append("\n\t").append(log.toString());
        archivedLogs.add(log.getPath().getName());
      }
      LOG.debug(archived.toString());
      assertEquals(failureType.name() + ": expected to find all of our wals corrupt.", archivedLogs,
          walDirContents);
    }
  }

  /**
   * @return set of wal names present prior to split attempt.
   * @throws IOException if the split process fails
   */
  private Set<String> splitCorruptWALs(final FaultyProtobufLogReader.FailureType failureType)
      throws IOException {
    Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
        Reader.class);
    InstrumentedLogWriter.activateFailure = false;

    try {
      conf.setClass("hbase.regionserver.hlog.reader.impl", FaultyProtobufLogReader.class,
          Reader.class);
      conf.set("faultyprotobuflogreader.failuretype", failureType.name());
      // Clean up from previous tests or previous loop
      try {
        wals.shutdown();
      } catch (IOException exception) {
        // since we're splitting out from under the factory, we should expect some closing failures.
        LOG.debug("Ignoring problem closing WALFactory.", exception);
      }
      wals.close();
      try {
        for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
          fs.delete(log.getPath(), true);
        }
      } catch (FileNotFoundException exception) {
        LOG.debug("no previous CORRUPTDIR to clean.");
      }
      // change to the faulty reader
      wals = new WALFactory(conf, name.getMethodName());
      generateWALs(-1);
      // Our reader will render all of these files corrupt.
      final Set<String> walDirContents = new HashSet<>();
      for (FileStatus status : fs.listStatus(WALDIR)) {
        walDirContents.add(status.getPath().getName());
      }
      useDifferentDFSClient();
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
      return walDirContents;
    } finally {
      conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
          Reader.class);
    }
  }
  @Test (expected = IOException.class)
  public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
  }

  @Test
  public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    try {
      splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
    } catch (IOException e) {
      LOG.debug("split with 'skip errors' set to 'false' correctly threw");
    }
    assertEquals("if skip.errors is false all files should remain in place",
        NUM_WRITERS, fs.listStatus(WALDIR).length);
  }
  private void ignoreCorruption(final Corruptions corruption, final int entryCount,
      final int expectedCount) throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);

    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0");
    generateWALs(1, entryCount, -1, 0);
    corruptWAL(c1, corruption, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    int actualCount = 0;
    Reader in = wals.createReader(fs, splitLog[0]);
    @SuppressWarnings("unused")
    Entry entry;
    while ((entry = in.next()) != null) ++actualCount;
    assertEquals(expectedCount, actualCount);
    in.close();

    // should not have stored the EOF files as corrupt
    FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
    assertEquals(0, archivedLogs.length);
  }

  @Test
  public void testEOFisIgnored() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount - 1);
  }

  @Test
  public void testCorruptWALTrailer() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount);
  }
  @Test
  public void testLogsGetArchivedAfterSplit() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
    assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
  }

  @Test
  public void testSplit() throws IOException {
    generateWALs(-1);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit()
      throws IOException {
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus [] statuses = null;
    try {
      statuses = fs.listStatus(WALDIR);
      if (statuses != null) {
        fail("Files left in log dir: " +
            Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
      }
    } catch (FileNotFoundException e) {
      // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
    }
  }
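  // InstrumentedLogWriter is installed as the WAL writer implementation in setUpBeforeClass;
  // flipping its activateFailure flag makes a later append throw, which is the "trap" below.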
  @Test(expected = IOException.class)
  public void testSplitWillFailIfWritingToRegionFails() throws Exception {
    // leave 5th log open so we could append the "trap"
    Writer writer = generateWALs(4);
    useDifferentDFSClient();

    String region = "break";
    Path regiondir = new Path(TABLEDIR, region);
    fs.mkdirs(regiondir);

    InstrumentedLogWriter.activateFailure = false;
    appendEntry(writer, TABLE_NAME, Bytes.toBytes(region),
        Bytes.toBytes("r" + 999), FAMILY, QUALIFIER, VALUE, 0);
    writer.close();

    try {
      InstrumentedLogWriter.activateFailure = true;
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    } catch (IOException e) {
      assertTrue(e.getMessage().
          contains("This exception is instrumented and should only be thrown for testing"));
      throw e;
    } finally {
      InstrumentedLogWriter.activateFailure = false;
    }
  }
  @Test
  public void testSplitDeletedRegion() throws IOException {
    REGIONS.clear();
    String region = "region_that_splits";
    REGIONS.add(region);

    generateWALs(1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, region);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }
  @Test
  public void testIOEOnOutputThread() throws Exception {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);

    generateWALs(-1);
    useDifferentDFSClient();
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue("There should be some log file",
        logfiles != null && logfiles.length > 0);
    // wals with no entries (like the one we don't use in the factory)
    // won't cause a failure since nothing will ever be written.
    // pick the largest one since it's most likely to have entries.
    int largestLogFile = 0;
    long largestSize = 0;
    for (int i = 0; i < logfiles.length; i++) {
      if (logfiles[i].getLen() > largestSize) {
        largestLogFile = i;
        largestSize = logfiles[i].getLen();
      }
    }
    assertTrue("There should be some log greater than size 0.", 0 < largestSize);
    // Set up a splitter that will throw an IOE on the output side
    WALSplitter logSplitter =
        new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
          @Override
          protected Writer createWriter(Path logfile) throws IOException {
            Writer mockWriter = Mockito.mock(Writer.class);
            Mockito.doThrow(new IOException("Injected")).when(mockWriter)
                .append(Mockito.<Entry> any());
            return mockWriter;
          }
        };
    // Set up a background thread dumper. Needs a thread to depend on and then we need to run
    // the thread dumping in a background thread so it does not hold up the test.
    final AtomicBoolean stop = new AtomicBoolean(false);
    final Thread someOldThread = new Thread("Some-old-thread") {
      @Override
      public void run() {
        while(!stop.get()) Threads.sleep(10);
      }
    };
    someOldThread.setDaemon(true);
    someOldThread.start();
    final Thread t = new Thread("Background-thread-dumper") {
      @Override
      public void run() {
        try {
          Threads.threadDumpingIsAlive(someOldThread);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    };
    t.setDaemon(true);
    t.start();
    try {
      logSplitter.splitLogFile(logfiles[largestLogFile], null);
      fail("Didn't throw!");
    } catch (IOException ioe) {
      assertTrue(ioe.toString().contains("Injected"));
    } finally {
      // Setting this to true will turn off the background thread dumper.
      stop.set(true);
    }
  }
  /**
   * @param spiedFs should be instrumented for failure.
   */
  private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception {
    generateWALs(-1);
    useDifferentDFSClient();

    try {
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
      assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
      assertFalse(fs.exists(WALDIR));
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    }
  }

  // Test for HBASE-3412
  @Test
  public void testMovedWALDuringRecovery() throws Exception {
    // This partial mock will throw LEE for every file simulating
    // files that were moved
    FileSystem spiedFs = Mockito.spy(fs);
    // The "File does not exist" part is very important,
    // that's how it comes out of HDFS
    Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).
        when(spiedFs).append(Mockito.<Path>any());
    retryOverHdfsProblem(spiedFs);
  }

  @Test
  public void testRetryOpenDuringRecovery() throws Exception {
    FileSystem spiedFs = Mockito.spy(fs);
    // The "Cannot obtain block length", "Could not obtain the last block",
    // and "Blocklist for [^ ]* has changed.*" part is very important,
    // that's how it comes out of HDFS. If HDFS changes the exception
    // message, this test needs to be adjusted accordingly.
    //
    // When DFSClient tries to open a file, HDFS needs to locate
    // the last block of the file and get its length. However, if the
    // last block is under recovery, HDFS may have trouble obtaining
    // the block length, in which case retrying may help.
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      private final String[] errors = new String[] {
          "Cannot obtain block length", "Could not obtain the last block",
          "Blocklist for " + OLDLOGDIR + " has changed"};
      private int count = 0;

      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        if (count < 3) {
          throw new IOException(errors[count++]);
        }
        return (FSDataInputStream)invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
    retryOverHdfsProblem(spiedFs);
  }
  @Test
  public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];
    useDifferentDFSClient();

    final AtomicInteger count = new AtomicInteger();

    CancelableProgressable localReporter
        = new CancelableProgressable() {
          @Override
          public boolean progress() {
            count.getAndIncrement();
            return false;
          }
        };

    FileSystem spiedFs = Mockito.spy(fs);
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        Thread.sleep(1500); // Sleep a while so the progress reporter gets invoked
        return (FSDataInputStream)invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());

    try {
      conf.setInt("hbase.splitlog.report.period", 1000);
      boolean ret = WALSplitter.splitLogFile(HBASEDIR, logfile, spiedFs, conf, localReporter, null,
          null, wals, null);
      assertFalse("Log splitting should fail", ret);
      assertTrue(count.get() > 0);
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    } finally {
      // reset it back to its default value
      conf.setInt("hbase.splitlog.report.period", 59000);
    }
  }
  /**
   * Test log split process with fake data and lots of edits to trigger threading
   * issues.
   */
  @Test
  public void testThreading() throws Exception {
    doTestThreading(20000, 128*1024*1024, 0);
  }

  /**
   * Test blocking behavior of the log split process if writers are writing slower
   * than the reader is reading.
   */
  @Test
  public void testThreadingSlowWriterSmallBuffer() throws Exception {
    doTestThreading(200, 1024, 50);
  }

  /**
   * Sets up a log splitter with a mock reader and writer. The mock reader generates
   * a specified number of edits spread across 5 regions. The mock writer optionally
   * sleeps for each edit it is fed.
   *
   * After the split is complete, verifies that the statistics show the correct number
   * of edits output into each region.
   *
   * @param numFakeEdits number of fake edits to push through pipeline
   * @param bufferSize size of in-memory buffer
   * @param writerSlowness writer threads will sleep this many ms per edit
   */
  private void doTestThreading(final int numFakeEdits,
      final int bufferSize,
      final int writerSlowness) throws Exception {

    Configuration localConf = new Configuration(conf);
    localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);

    // Create a fake log file (we'll override the reader to produce a stream of edits)
    Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake");
    FSDataOutputStream out = fs.create(logPath);
    out.close();

    // Make region dirs for our destination regions so the output doesn't get skipped
    final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
    makeRegionDirs(regions);

    // Create a splitter that reads and writes the data without touching disk
    WALSplitter logSplitter =
        new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {

          /* Produce a mock writer that doesn't write anywhere */
          @Override
          protected Writer createWriter(Path logfile) throws IOException {
            Writer mockWriter = Mockito.mock(Writer.class);
            Mockito.doAnswer(new Answer<Void>() {
              int expectedIndex = 0;

              @Override
              public Void answer(InvocationOnMock invocation) {
                if (writerSlowness > 0) {
                  try {
                    Thread.sleep(writerSlowness);
                  } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                  }
                }
                Entry entry = (Entry) invocation.getArgument(0);
                WALEdit edit = entry.getEdit();
                List<Cell> cells = edit.getCells();
                assertEquals(1, cells.size());
                Cell cell = cells.get(0);

                // Check that the edits come in the right order.
                assertEquals(expectedIndex, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(),
                    cell.getRowLength()));
                expectedIndex++;
                return null;
              }
            }).when(mockWriter).append(Mockito.<Entry>any());
            return mockWriter;
          }

          /* Produce a mock reader that generates fake entries */
          @Override
          protected Reader getReader(Path curLogFile, CancelableProgressable reporter)
              throws IOException {
            Reader mockReader = Mockito.mock(Reader.class);
            Mockito.doAnswer(new Answer<Entry>() {
              int index = 0;

              @Override
              public Entry answer(InvocationOnMock invocation) throws Throwable {
                if (index >= numFakeEdits) return null;

                // Generate r0 through r4 in round robin fashion
                int regionIdx = index % regions.size();
                byte region[] = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)};

                Entry ret = createTestEntry(TABLE_NAME, region,
                    Bytes.toBytes(index / regions.size()),
                    FAMILY, QUALIFIER, VALUE, index);
                index++;
                return ret;
              }
            }).when(mockReader).next();
            return mockReader;
          }
        };

    logSplitter.splitLogFile(fs.getFileStatus(logPath), null);

    // Verify number of written edits per region
    Map<String, Long> outputCounts = logSplitter.outputSink.getOutputCounts();
    for (Map.Entry<String, Long> entry : outputCounts.entrySet()) {
      LOG.info("Got " + entry.getValue() + " output edits for region " + entry.getKey());
      assertEquals((long) entry.getValue(), numFakeEdits / regions.size());
    }
    assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size());
  }
  // Does leaving the writer open in testSplitDeletedRegion matter enough for two tests?
  @Test
  public void testSplitLogFileDeletedRegionDir() throws IOException {
    LOG.info("testSplitLogFileDeletedRegionDir");
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, REGION);
    LOG.info("Region directory is " + regiondir);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }

  @Test
  public void testSplitLogFileEmpty() throws IOException {
    LOG.info("testSplitLogFileEmpty");
    // we won't create the hlog dir until getWAL got called, so
    // make dir here when testing empty log file
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", true);
    useDifferentDFSClient();

    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path tdir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    assertFalse(fs.exists(tdir));

    assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath()));
  }

  @Test
  public void testSplitLogFileMultipleRegions() throws IOException {
    LOG.info("testSplitLogFileMultipleRegions");
    generateWALs(1, 10, -1);
    splitAndCount(1, 10);
  }

  @Test
  public void testSplitLogFileFirstLineCorruptionLog()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];

    corruptWAL(logfile.getPath(),
        Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
    assertEquals(1, fs.listStatus(corruptDir).length);
  }
  /**
   * @see "https://issues.apache.org/jira/browse/HBASE-4862"
   */
  @Test
  public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
    LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
    // Generate wals for our destination region
    String regionName = "r0";
    final Path regiondir = new Path(TABLEDIR, regionName);
    REGIONS.clear();
    REGIONS.add(regionName);
    generateWALs(-1);

    wals.getWAL(null);
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue("There should be some log file",
        logfiles != null && logfiles.length > 0);

    WALSplitter logSplitter =
        new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
          @Override
          protected Writer createWriter(Path logfile)
              throws IOException {
            Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile);
            // After creating writer, simulate region's
            // replayRecoveredEditsIfAny() which gets SplitEditFiles of this
            // region and deletes them, excluding files with '.temp' suffix.
            NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(fs, regiondir);
            if (files != null && !files.isEmpty()) {
              for (Path file : files) {
                if (!this.walFS.delete(file, false)) {
                  LOG.error("Failed delete of " + file);
                } else {
                  LOG.debug("Deleted recovered.edits file=" + file);
                }
              }
            }
            return writer;
          }
        };
    try {
      logSplitter.splitLogFile(logfiles[0], null);
    } catch (IOException e) {
      LOG.info(e.toString(), e);
      fail("Throws IOException when splitting "
          + "log, it is most likely because the file being written does not "
          + "exist which is caused by concurrent replayRecoveredEditsIfAny()");
    }
    if (fs.exists(CORRUPTDIR)) {
      if (fs.listStatus(CORRUPTDIR).length > 0) {
        fail("There are some corrupt logs, "
            + "it is most likely caused by concurrent replayRecoveredEditsIfAny()");
      }
    }
  }
  private Writer generateWALs(int leaveOpen) throws IOException {
    return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen, 0);
  }
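  // Note: the three-arg overload below sprinkles in 7 region event markers by default.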
  private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException {
    return generateWALs(writers, entries, leaveOpen, 7);
  }

  private void makeRegionDirs(List<String> regions) throws IOException {
    for (String region : regions) {
      LOG.debug("Creating dir for region " + region);
      fs.mkdirs(new Path(TABLEDIR, region));
    }
  }

  /**
   * @param leaveOpen index to leave un-closed. -1 to close all.
   * @return the writer that's still open, or null if all were closed.
   */
  private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents)
      throws IOException {
    makeRegionDirs(REGIONS);
    fs.mkdirs(WALDIR);
    Writer [] ws = new Writer[writers];
    int seq = 0;
    int numRegionEventsAdded = 0;
    for (int i = 0; i < writers; i++) {
      ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
      for (int j = 0; j < entries; j++) {
        int prefix = 0;
        for (String region : REGIONS) {
          String row_key = region + prefix++ + i + j;
          appendEntry(ws[i], TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(row_key), FAMILY,
              QUALIFIER, VALUE, seq++);

          if (numRegionEventsAdded < regionEvents) {
            numRegionEventsAdded++;
            appendRegionEvent(ws[i], region);
          }
        }
      }
      if (i != leaveOpen) {
        ws[i].close();
        LOG.info("Closing writer " + i);
      }
    }
    if (leaveOpen < 0 || leaveOpen >= writers) {
      return null;
    }
    return ws[leaveOpen];
  }
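  // Returns the recovered.edits files written for the given region, skipping any
  // sequence-id marker files.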
  private Path[] getLogForRegion(TableName table, String region)
      throws IOException {
    Path tdir = FSUtils.getWALTableDir(conf, table);
    @SuppressWarnings("deprecation")
    Path editsdir = WALSplitUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
        Bytes.toString(Bytes.toBytes(region))));
    FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
      @Override
      public boolean accept(Path p) {
        if (WALSplitUtil.isSequenceIdFile(p)) {
          return false;
        }
        return true;
      }
    });
    Path[] paths = new Path[files.length];
    for (int i = 0; i < files.length; i++) {
      paths[i] = files[i].getPath();
    }
    return paths;
  }
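  // Reads the target file fully into memory, then rewrites it with the requested damage:
  // garbage prepended, inserted at the midpoint, or appended, or the tail truncated
  // (either into the entries themselves or just past the trailer).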
  private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException {
    FSDataOutputStream out;
    int fileSize = (int) fs.listStatus(path)[0].getLen();

    FSDataInputStream in = fs.open(path);
    byte[] corrupted_bytes = new byte[fileSize];
    in.readFully(0, corrupted_bytes, 0, fileSize);
    in.close();

    switch (corruption) {
      case APPEND_GARBAGE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes);
        out.write(Bytes.toBytes("-----"));
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_ON_FIRST_LINE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(0);
        out.write(corrupted_bytes);
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_IN_THE_MIDDLE:
        fs.delete(path, false);
        out = fs.create(path);
        int middle = (int) Math.floor(corrupted_bytes.length / 2);
        out.write(corrupted_bytes, 0, middle);
        out.write(0);
        out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
        closeOrFlush(close, out);
        break;

      case TRUNCATE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes, 0, fileSize
            - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
        closeOrFlush(close, out);
        break;

      case TRUNCATE_TRAILER:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT); // trailer is truncated.
        closeOrFlush(close, out);
        break;
    }
  }
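  // Flushing is done via reflection because which sync method exists depends on the Hadoop
  // version this test is compiled against: hflush() on recent Hadoop, sync() on older releases.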
  private void closeOrFlush(boolean close, FSDataOutputStream out)
      throws IOException {
    if (close) {
      out.close();
    } else {
      Method syncMethod = null;
      try {
        syncMethod = out.getClass().getMethod("hflush", new Class<?> []{});
      } catch (NoSuchMethodException e) {
        try {
          syncMethod = out.getClass().getMethod("sync", new Class<?> []{});
        } catch (NoSuchMethodException ex) {
          throw new IOException("This version of Hadoop supports " +
              "neither Syncable.sync() nor Syncable.hflush().");
        }
      }
      try {
        syncMethod.invoke(out, new Object[]{});
      } catch (Exception e) {
        throw new IOException(e);
      }
      // Calling out.hflush() directly is not possible on all supported Hadoop
      // versions, hence the reflection above. Leave the file unclosed.
    }
  }
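  // Returns the number of entries readable from the given WAL file.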
  private int countWAL(Path log) throws IOException {
    int count = 0;
    Reader in = wals.createReader(fs, log);
    while (in.next() != null) {
      count++;
    }
    in.close();
    return count;
  }
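  // Appends a compaction descriptor marker (inputs compacted into output) for the given
  // region to the WAL, then syncs.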
  private static void appendCompactionEvent(Writer w, RegionInfo hri, String[] inputs,
      String output) throws IOException {
    WALProtos.CompactionDescriptor.Builder desc = WALProtos.CompactionDescriptor.newBuilder();
    desc.setTableName(ByteString.copyFrom(hri.getTable().toBytes()))
        .setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes()))
        .setRegionName(ByteString.copyFrom(hri.getRegionName()))
        .setFamilyName(ByteString.copyFrom(FAMILY))
        .setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY))
        .addAllCompactionInput(Arrays.asList(inputs))
        .addCompactionOutput(output);

    WALEdit edit = WALEdit.createCompaction(hri, desc.build());
    WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1,
        EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
    w.append(new Entry(key, edit));
    w.sync(false);
  }
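  // Appends a REGION_OPEN event marker for the given region to the WAL, then syncs.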
  private static void appendRegionEvent(Writer w, String region) throws IOException {
    WALProtos.RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
        WALProtos.RegionEventDescriptor.EventType.REGION_OPEN,
        TABLE_NAME.toBytes(),
        Bytes.toBytes(region),
        Bytes.toBytes(String.valueOf(region.hashCode())),
        1,
        ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>>of());
    final long time = EnvironmentEdgeManager.currentTime();
    final WALKeyImpl walKey = new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, time,
        HConstants.DEFAULT_CLUSTER_ID);
    WALEdit we = WALEdit.createRegionEventWALEdit(Bytes.toBytes(region), regionOpenDesc);
    w.append(new Entry(walKey, we));
    w.sync(false);
  }
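  // Appends a single test edit and syncs; returns the sequence number that was passed in.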
  public static long appendEntry(Writer writer, TableName table, byte[] region,
      byte[] row, byte[] family, byte[] qualifier,
      byte[] value, long seq)
      throws IOException {
    LOG.info(Thread.currentThread().getName() + " append");
    writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
    LOG.info(Thread.currentThread().getName() + " sync");
    writer.sync(false);
    return seq;
  }

  private static Entry createTestEntry(
      TableName table, byte[] region,
      byte[] row, byte[] family, byte[] qualifier,
      byte[] value, long seq) {
    long time = System.nanoTime();

    seq++;
    final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value);
    WALEdit edit = new WALEdit();
    edit.add(cell);
    return new Entry(new WALKeyImpl(region, table, seq, time,
        HConstants.DEFAULT_CLUSTER_ID), edit);
  }
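  // Creates a zero-length WAL file; leaving it unclosed mimics a writer that died mid-create.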
  private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
    Writer writer =
        WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf);
    if (closeFile) {
      writer.close();
    }
  }
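  // Compares two WALs entry by entry; true only if keys and edits match in order.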
  private boolean logsAreEqual(Path p1, Path p2) throws IOException {
    Reader in1, in2;
    in1 = wals.createReader(fs, p1);
    in2 = wals.createReader(fs, p2);
    Entry entry1;
    Entry entry2;
    while ((entry1 = in1.next()) != null) {
      entry2 = in2.next();
      if ((entry1.getKey().compareTo(entry2.getKey()) != 0) ||
          (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))) {
        return false;
      }
    }
    in1.close();
    in2.close();
    return true;
  }
}