/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Method;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;

/**
 * Testing {@link WAL} splitting code.
 */
@Category({RegionServerTests.class, LargeTests.class})
public class TestWALSplit {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestWALSplit.class);

  private final static Logger LOG = LoggerFactory.getLogger(TestWALSplit.class);

  private static Configuration conf;
  private FileSystem fs;

  protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private Path HBASEDIR;
  private Path HBASELOGDIR;
  private Path WALDIR;
  private Path OLDLOGDIR;
  private Path CORRUPTDIR;
  private Path TABLEDIR;
  private String TMPDIRNAME;

  private static final int NUM_WRITERS = 10;
  private static final int ENTRIES = 10; // entries per writer per region

  private static final String FILENAME_BEING_SPLIT = "testfile";
  private static final TableName TABLE_NAME = TableName.valueOf("t1");
  private static final byte[] FAMILY = Bytes.toBytes("f1");
  private static final byte[] QUALIFIER = Bytes.toBytes("q1");
  private static final byte[] VALUE = Bytes.toBytes("v1");
  private static final String WAL_FILE_PREFIX = "wal.dat.";
  private static List<String> REGIONS = new ArrayList<>();
  private static String ROBBER;
  private static String ZOMBIE;
  private static String[] GROUP = new String[] { "supergroup" };

  static enum Corruptions {
    INSERT_GARBAGE_ON_FIRST_LINE,
    INSERT_GARBAGE_IN_THE_MIDDLE,
    APPEND_GARBAGE,
    TRUNCATE,
    TRUNCATE_TRAILER
  }
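
  // Each Corruptions constant corresponds to one branch of corruptWAL(Path, Corruptions, boolean)
  // below: garbage appended at the end, inserted at the start, or inserted in the middle of an
  // otherwise valid file, or a truncation that either cuts into the last entry (TRUNCATE) or
  // removes only the trailer (TRUNCATE_TRAILER).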

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf = TEST_UTIL.getConfiguration();
    conf.setClass("hbase.regionserver.hlog.writer.impl",
      InstrumentedLogWriter.class, Writer.class);
    // This is how you turn off shortcircuit read currently. TODO: Fix. Should read config.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
    // Create a fake user-to-group mapping and set it in the conf.
    Map<String, String[]> u2g_map = new HashMap<>(2);
    ROBBER = User.getCurrent().getName() + "-robber";
    ZOMBIE = User.getCurrent().getName() + "-zombie";
    u2g_map.put(ROBBER, GROUP);
    u2g_map.put(ZOMBIE, GROUP);
    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
    conf.setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.startMiniDFSCluster(2);
  }
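
  // Note: the fake group mapping above lets the test resolve two distinct users (ROBBER and
  // ZOMBIE) against the mini DFS cluster; the zombie keeps writing a WAL while the robber
  // splits it out from under it in testLogCannotBeWrittenOnceParsed.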

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  @Rule
  public TestName name = new TestName();

  private WALFactory wals = null;

  @Before
  public void setUp() throws Exception {
    LOG.info("Cleaning up cluster for new test.");
    fs = TEST_UTIL.getDFSCluster().getFileSystem();
    HBASEDIR = TEST_UTIL.createRootDir();
    HBASELOGDIR = TEST_UTIL.createWALRootDir();
    OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME);
    CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME);
    TABLEDIR = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    TMPDIRNAME = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
      HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
    REGIONS.clear();
    Collections.addAll(REGIONS, "bbb", "ccc");
    InstrumentedLogWriter.activateFailure = false;
    wals = new WALFactory(conf, name.getMethodName());
    WALDIR = new Path(HBASELOGDIR,
      AbstractFSWALProvider.getWALDirectoryName(ServerName.valueOf(name.getMethodName(),
        16010, EnvironmentEdgeManager.currentTime()).toString()));
    // fs.mkdirs(WALDIR);
  }

  @After
  public void tearDown() throws Exception {
    try {
      wals.close();
    } catch (IOException exception) {
      // Some tests will move WALs out from under us. In those cases, we'll get an error on close.
      LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if" +
        " you see a failure look here.");
      LOG.debug("exception details", exception);
    } finally {
      wals = null;
      fs.delete(HBASEDIR, true);
      fs.delete(HBASELOGDIR, true);
    }
  }

  /**
   * Simulates splitting a WAL out from under a regionserver that is still trying to write it.
   * Ensures we do not lose edits.
   */
  @Test
  public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
    final AtomicLong counter = new AtomicLong(0);
    AtomicBoolean stop = new AtomicBoolean(false);
    // Region we'll write edits to and then later examine to make sure they all made it in.
    final String region = REGIONS.get(0);
    final int numWriters = 3;
    Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters);
    try {
      long startCount = counter.get();
      zombie.start();
      // Wait till writer starts going.
      while (startCount == counter.get()) Threads.sleep(1);
      // Give it a second to write a few appends.
      Threads.sleep(1000);
      final Configuration conf2 = HBaseConfiguration.create(conf);
      final User robber = User.createUserForTesting(conf2, ROBBER, GROUP);
      int count = robber.runAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws Exception {
          StringBuilder ls = new StringBuilder("Contents of WALDIR (").append(WALDIR)
            .append("):\n");
          for (FileStatus status : fs.listStatus(WALDIR)) {
            ls.append("\t").append(status.toString()).append("\n");
          }
          LOG.debug(Objects.toString(ls));
          LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
          WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
          LOG.info("Finished splitting out from under zombie.");
          Path[] logfiles = getLogForRegion(TABLE_NAME, region);
          assertEquals("wrong number of split files for region", numWriters, logfiles.length);
          int count = 0;
          for (Path logfile : logfiles) {
            count += countWAL(logfile);
          }
          return count;
        }
      });
      LOG.info("zombie=" + counter.get() + ", robber=" + count);
      assertTrue("The log file could have at most 1 extra log entry, but can't have less. " +
        "Zombie could write " + counter.get() + " and logfile had only " + count,
        counter.get() == count || counter.get() + 1 == count);
    } finally {
      stop.set(true);
      zombie.interrupt();
      Threads.threadDumpingIsAlive(zombie);
    }
  }
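
  // The "at most 1 extra" tolerance above appears to exist because the zombie may have an
  // append in flight when the split revokes its lease: that edit can land in the file (and so
  // be counted by the robber) before the zombie gets to increment its own counter.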

  /**
   * This thread will keep writing to a 'wal' file even after the split process has started.
   * It simulates a region server that was considered dead but woke up and wrote some more to the
   * last log entry. Does its writing as an alternate user in another filesystem instance to
   * better simulate it being a regionserver.
   */
  class ZombieLastLogWriterRegionServer extends Thread {
    final AtomicLong editsCount;
    final AtomicBoolean stop;
    final int numOfWriters;
    /**
     * Region to write edits for.
     */
    final String region;
    final User user;

    public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop,
      final String region, final int writers)
      throws IOException, InterruptedException {
      super("ZombieLastLogWriterRegionServer");
      setDaemon(true);
      this.stop = stop;
      this.editsCount = counter;
      this.region = region;
      this.user = User.createUserForTesting(conf, ZOMBIE, GROUP);
      numOfWriters = writers;
    }

    @Override
    public void run() {
      try {
        doWriting();
      } catch (IOException e) {
        LOG.warn(getName() + " Writer exiting " + e);
      } catch (InterruptedException e) {
        LOG.warn(getName() + " Writer exiting " + e);
      }
    }

    private void doWriting() throws IOException, InterruptedException {
      this.user.runAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          // Index of the WAL we want to keep open. generateWALs will leave open the WAL whose
          // index we supply here.
          int walToKeepOpen = numOfWriters - 1;
          // The below method writes numOfWriters files each with ENTRIES entries for a total of
          // numOfWriters * ENTRIES added per column family in the region.
          Writer writer = null;
          try {
            writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen);
          } catch (IOException e1) {
            throw new RuntimeException("Failed", e1);
          }
          // Update the counter so it has all edits written so far.
          editsCount.addAndGet(numOfWriters * ENTRIES);
          loop(writer);
          // If we've been interrupted, then things should have shifted out from under us.
          // closing should error
          try {
            writer.close();
            fail("Writing or closing after parsing should give an error.");
          } catch (IOException exception) {
            LOG.debug("ignoring error when closing final writer.", exception);
          }
          return null;
        }
      });
    }

    private void loop(final Writer writer) {
      byte[] regionBytes = Bytes.toBytes(this.region);
      while (!stop.get()) {
        try {
          long seq = appendEntry(writer, TABLE_NAME, regionBytes,
            Bytes.toBytes("r" + editsCount.get()), regionBytes, QUALIFIER, VALUE, 0);
          long count = editsCount.incrementAndGet();
          LOG.info(getName() + " sync count=" + count + ", seq=" + seq);
          try {
            Thread.sleep(1);
          } catch (InterruptedException e) {
            // Ignored; the stop flag ends the loop.
          }
        } catch (IOException ex) {
          LOG.error(getName() + " ex " + ex.toString());
          if (ex instanceof RemoteException) {
            LOG.error("Juliet: got RemoteException " + ex.getMessage() +
              " while writing " + (editsCount.get() + 1));
          } else {
            LOG.error(getName() + " failed to write....at " + editsCount.get());
            fail("Failed to write " + editsCount.get());
          }
          break;
        } catch (Throwable t) {
          LOG.error(getName() + " HOW? " + t);
          LOG.debug("exception details", t);
          break;
        }
      }
      LOG.info(getName() + " Writer exiting");
    }
  }

  /**
   * @see "https://issues.apache.org/jira/browse/HBASE-3020"
   */
  @Test
  public void testRecoveredEditsPathForMeta() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
  }

  /**
   * Test that an old recovered edits file doesn't break WALSplitter.
   * This is useful in upgrading old instances.
   */
  @Test
  public void testOldRecoveredEditsFileSidelined() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
    Path regiondir = new Path(tdir,
      RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
    fs.mkdirs(regiondir);
    Path parent = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
    assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
    fs.createNewFile(parent); // create a recovered.edits file
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
    WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
  }

  private Path createRecoveredEditsPathForRegion() throws IOException {
    byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
    long now = EnvironmentEdgeManager.currentTime();
    Entry entry = new Entry(
      new WALKeyImpl(encoded, TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
      new WALEdit());
    Path p = WALSplitUtil
      .getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1, FILENAME_BEING_SPLIT,
        TMPDIRNAME, conf);
    return p;
  }
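
  // The path returned above is the in-progress split-edits file for the region, roughly of the
  // form <region dir>/recovered.edits/<seqid>-<name-of-file-being-split>.temp (the exact layout
  // is whatever WALSplitUtil.getRegionSplitEditsPath produces); testHasRecoveredEdits below
  // relies on the '-' separator when it derives the final, renamed edits file name.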

  @Test
  public void testHasRecoveredEdits() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    assertFalse(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
    String renamedEdit = p.getName().split("-")[0];
    fs.createNewFile(new Path(p.getParent(), renamedEdit));
    assertTrue(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
  }

  private void useDifferentDFSClient() throws IOException {
    // Make fs act as a different client now; initialize will create a new
    // DFSClient with a new client ID.
    fs.initialize(fs.getUri(), conf);
  }
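
  // HDFS tracks file leases by client name, so re-initializing the FileSystem above makes the
  // lease recovery performed during the split look as if it came from a different process than
  // the one that wrote the WALs.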

  @Test
  public void testSplitPreservesEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1, 0);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
  }

  @Test
  public void testSplitRemovesRegionEventsEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1, 100);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertFalse("split output should not equal the original log",
      logsAreEqual(originalLog, splitLog[0]));
    // split log should only have the test edits
    assertEquals(10, countWAL(splitLog[0]));
  }

  @Test
  public void testSplitLeavesCompactionEventsEdits() throws IOException {
    RegionInfo hri = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
    REGIONS.clear();
    REGIONS.add(hri.getEncodedName());
    Path regionDir =
      new Path(CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME), hri.getEncodedName());
    LOG.info("Creating region directory: " + regionDir);
    assertTrue(fs.mkdirs(regionDir));

    Writer writer = generateWALs(1, 10, 0, 10);
    String[] compactInputs = new String[] { "file1", "file2", "file3" };
    String compactOutput = "file4";
    appendCompactionEvent(writer, hri, compactInputs, compactOutput);
    writer.close();

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    // original log should have 10 test edits, 10 region markers, 1 compaction marker
    assertEquals(21, countWAL(originalLog));

    Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName());
    assertEquals(1, splitLog.length);

    assertFalse("split output should not equal the original log",
      logsAreEqual(originalLog, splitLog[0]));
    // split log should have 10 test edits plus 1 compaction marker
    assertEquals(11, countWAL(splitLog[0]));
  }

  /**
   * @param expectedEntries -1 to not assert
   * @return the count across all regions
   */
  private int splitAndCount(final int expectedFiles, final int expectedEntries)
    throws IOException {
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    int result = 0;
    for (String region : REGIONS) {
      Path[] logfiles = getLogForRegion(TABLE_NAME, region);
      assertEquals(expectedFiles, logfiles.length);
      int count = 0;
      for (Path logfile : logfiles) {
        count += countWAL(logfile);
      }
      if (-1 != expectedEntries) {
        assertEquals(expectedEntries, count);
      }
      result += count;
    }
    return result;
  }

  @Test
  public void testEmptyLogFiles() throws IOException {
    testEmptyLogFiles(true);
  }

  @Test
  public void testEmptyOpenLogFiles() throws IOException {
    testEmptyLogFiles(false);
  }

  private void testEmptyLogFiles(final boolean close) throws IOException {
    // we won't create the hlog dir until getWAL got called, so
    // make the dir here when testing an empty log file
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", close);
    generateWALs(Integer.MAX_VALUE);
    injectEmptyFile("empty", close);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty
  }

  @Test
  public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
    // generate logs but leave wal.dat.5 open.
    generateWALs(5);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testTrailingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
      Corruptions.APPEND_GARBAGE, true);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
      Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
    splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); // 1 corrupt
  }

  @Test
  public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
      Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false);
    // the entries in the original logs are alternating regions;
    // considering the sequence file header, the middle corruption should
    // affect at least half of the entries
    int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
    int firstHalfEntries = (int) Math.ceil(ENTRIES / 2.0) - 1;
    int allRegionsCount = splitAndCount(NUM_WRITERS, -1);
    assertTrue("The file up to the corrupted area hasn't been parsed",
      REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount);
  }

  @Test
  public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
    List<FaultyProtobufLogReader.FailureType> failureTypes = Arrays
      .stream(FaultyProtobufLogReader.FailureType.values())
      .filter(x -> x != FaultyProtobufLogReader.FailureType.NONE).collect(Collectors.toList());
    for (FaultyProtobufLogReader.FailureType failureType : failureTypes) {
      final Set<String> walDirContents = splitCorruptWALs(failureType);
      final Set<String> archivedLogs = new HashSet<>();
      final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:");
      for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
        archived.append("\n\t").append(log.toString());
        archivedLogs.add(log.getPath().getName());
      }
      LOG.debug(archived.toString());
      assertEquals(failureType.name() + ": expected to find all of our wals corrupt.",
        archivedLogs, walDirContents);
    }
  }
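
  // With SPLIT_SKIP_ERRORS_KEY set to true, unreadable WALs are moved aside into CORRUPTDIR
  // instead of failing the split; the tests below flip the flag to false and expect the split
  // to throw and to leave the files where they were.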

  /**
   * @return set of wal names present prior to split attempt.
   * @throws IOException if the split process fails
   */
  private Set<String> splitCorruptWALs(final FaultyProtobufLogReader.FailureType failureType)
    throws IOException {
    Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
      Reader.class);
    InstrumentedLogWriter.activateFailure = false;

    try {
      conf.setClass("hbase.regionserver.hlog.reader.impl", FaultyProtobufLogReader.class,
        Reader.class);
      conf.set("faultyprotobuflogreader.failuretype", failureType.name());
      // Clean up from previous tests or previous loop
      try {
        wals.shutdown();
      } catch (IOException exception) {
        // since we're splitting out from under the factory, we should expect some closing failures.
        LOG.debug("Ignoring problem closing WALFactory.", exception);
      }
      wals.close();
      try {
        for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
          fs.delete(log.getPath(), true);
        }
      } catch (FileNotFoundException exception) {
        LOG.debug("no previous CORRUPTDIR to clean.");
      }
      // change to the faulty reader
      wals = new WALFactory(conf, name.getMethodName());
      generateWALs(-1);
      // Our reader will render all of these files corrupt.
      final Set<String> walDirContents = new HashSet<>();
      for (FileStatus status : fs.listStatus(WALDIR)) {
        walDirContents.add(status.getPath().getName());
      }
      useDifferentDFSClient();
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
      return walDirContents;
    } finally {
      conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
        Reader.class);
    }
  }

  @Test(expected = IOException.class)
  public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
    throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
    splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
  }

  @Test
  public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
    throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
    try {
      splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
    } catch (IOException e) {
      LOG.debug("split with 'skip errors' set to 'false' correctly threw");
    }
    assertEquals("if skip.errors is false all files should remain in place",
      NUM_WRITERS, fs.listStatus(WALDIR).length);
  }

  private void ignoreCorruption(final Corruptions corruption, final int entryCount,
    final int expectedCount) throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);

    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0");
    generateWALs(1, entryCount, -1, 0);
    corruptWAL(c1, corruption, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    int actualCount = 0;
    Reader in = wals.createReader(fs, splitLog[0]);
    while (in.next() != null) {
      ++actualCount;
    }
    assertEquals(expectedCount, actualCount);
    in.close();

    // should not have stored the EOF files as corrupt
    FileStatus[] archivedLogs =
      fs.exists(CORRUPTDIR) ? fs.listStatus(CORRUPTDIR) : new FileStatus[0];
    assertEquals(0, archivedLogs.length);
  }

  @Test
  public void testEOFisIgnored() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount - 1);
  }

  @Test
  public void testCorruptWALTrailer() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount);
  }
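
  // The two tests above differ by one entry: TRUNCATE cuts into the last entry, so the reader
  // hits a premature EOF and only entryCount - 1 edits survive, while TRUNCATE_TRAILER removes
  // only the trailing metadata and all entryCount edits remain readable.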

  @Test
  public void testLogsGetArchivedAfterSplit() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
    assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
  }

  @Test
  public void testSplit() throws IOException {
    generateWALs(-1);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit()
    throws IOException {
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus[] statuses = null;
    try {
      statuses = fs.listStatus(WALDIR);
      if (statuses != null) {
        fail("Files left in log dir: " +
          Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
      }
    } catch (FileNotFoundException e) {
      // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
    }
  }

  @Test(expected = IOException.class)
  public void testSplitWillFailIfWritingToRegionFails() throws Exception {
    // leave the 5th log open so we can append the "trap"
    Writer writer = generateWALs(4);
    useDifferentDFSClient();

    String region = "break";
    Path regiondir = new Path(TABLEDIR, region);
    fs.mkdirs(regiondir);

    InstrumentedLogWriter.activateFailure = false;
    appendEntry(writer, TABLE_NAME, Bytes.toBytes(region),
      Bytes.toBytes("r" + 999), FAMILY, QUALIFIER, VALUE, 0);
    writer.close();

    try {
      InstrumentedLogWriter.activateFailure = true;
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    } catch (IOException e) {
      assertTrue(e.getMessage()
        .contains("This exception is instrumented and should only be thrown for testing"));
      throw e;
    } finally {
      InstrumentedLogWriter.activateFailure = false;
    }
  }

  @Test
  public void testSplitDeletedRegion() throws IOException {
    REGIONS.clear();
    String region = "region_that_splits";
    REGIONS.add(region);

    generateWALs(1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, region);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }

  @Test
  public void testIOEOnOutputThread() throws Exception {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);

    generateWALs(-1);
    useDifferentDFSClient();
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue("There should be some log file",
      logfiles != null && logfiles.length > 0);
    // wals with no entries (like the one we don't use in the factory)
    // won't cause a failure since nothing will ever be written.
    // pick the largest one since it's most likely to have entries.
    int largestLogFile = 0;
    long largestSize = 0;
    for (int i = 0; i < logfiles.length; i++) {
      if (logfiles[i].getLen() > largestSize) {
        largestLogFile = i;
        largestSize = logfiles[i].getLen();
      }
    }
    assertTrue("There should be some log greater than size 0.", 0 < largestSize);
    // Set up a splitter that will throw an IOE on the output side
    WALSplitter logSplitter =
      new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
        @Override
        protected Writer createWriter(Path logfile) throws IOException {
          Writer mockWriter = Mockito.mock(Writer.class);
          Mockito.doThrow(new IOException("Injected")).when(mockWriter)
            .append(Mockito.<Entry> any());
          return mockWriter;
        }
      };
    // Set up a background thread dumper. Needs a thread to depend on and then we need to run
    // the thread dumping in a background thread so it does not hold up the test.
    final AtomicBoolean stop = new AtomicBoolean(false);
    final Thread someOldThread = new Thread("Some-old-thread") {
      @Override
      public void run() {
        while (!stop.get()) {
          Threads.sleep(10);
        }
      }
    };
    someOldThread.setDaemon(true);
    someOldThread.start();
    final Thread t = new Thread("Background-thread-dumper") {
      @Override
      public void run() {
        try {
          Threads.threadDumpingIsAlive(someOldThread);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    };
    t.setDaemon(true);
    t.start();
    try {
      logSplitter.splitWAL(logfiles[largestLogFile], null);
      fail("Didn't throw!");
    } catch (IOException ioe) {
      assertTrue(ioe.toString().contains("Injected"));
    } finally {
      // Setting this to true will turn off the background thread dumper.
      stop.set(true);
    }
  }
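
  // The mock writer above fails on append, so the injected IOE is raised on an output-sink
  // thread rather than on the thread calling splitWAL(); the test verifies the failure still
  // propagates to the caller instead of being swallowed.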

  /**
   * @param spiedFs should be instrumented for failure.
   */
  private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception {
    generateWALs(-1);
    useDifferentDFSClient();

    try {
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
      assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
      assertFalse(fs.exists(WALDIR));
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    }
  }

  // Test for HBASE-3412
  @Test
  public void testMovedWALDuringRecovery() throws Exception {
    // This partial mock will throw LEE for every file, simulating
    // files that were moved
    FileSystem spiedFs = Mockito.spy(fs);
    // The "File does not exist" part is very important,
    // that's how it comes out of HDFS
    Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist"))
      .when(spiedFs).append(Mockito.<Path> any());
    retryOverHdfsProblem(spiedFs);
  }

  @Test
  public void testRetryOpenDuringRecovery() throws Exception {
    FileSystem spiedFs = Mockito.spy(fs);
    // The "Cannot obtain block length", "Could not obtain the last block",
    // and "Blocklist for [^ ]* has changed.*" parts are very important:
    // that's how the errors come out of HDFS. If HDFS changes the exception
    // messages, this test needs to be adjusted accordingly.
    //
    // When DFSClient tries to open a file, HDFS needs to locate
    // the last block of the file and get its length. However, if the
    // last block is under recovery, HDFS may have trouble obtaining
    // the block length, in which case retrying may help.
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      private final String[] errors = new String[] {
        "Cannot obtain block length", "Could not obtain the last block",
        "Blocklist for " + OLDLOGDIR + " has changed" };
      private int count = 0;

      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        if (count < 3) {
          throw new IOException(errors[count++]);
        }
        return (FSDataInputStream) invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path> any(), Mockito.anyInt());
    retryOverHdfsProblem(spiedFs);
  }

  @Test
  public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];
    useDifferentDFSClient();

    final AtomicInteger count = new AtomicInteger();

    CancelableProgressable localReporter = new CancelableProgressable() {
      @Override
      public boolean progress() {
        count.getAndIncrement();
        return false;
      }
    };

    FileSystem spiedFs = Mockito.spy(fs);
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        Thread.sleep(1500); // Sleep a while so the report status gets invoked
        return (FSDataInputStream) invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path> any(), Mockito.anyInt());

    try {
      conf.setInt("hbase.splitlog.report.period", 1000);
      boolean ret = WALSplitter.splitLogFile(HBASEDIR, logfile, spiedFs, conf, localReporter, null,
        Mockito.mock(SplitLogWorkerCoordination.class), wals, null);
      assertFalse("Log splitting should fail", ret);
      assertTrue(count.get() > 0);
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    } finally {
      // reset it back to its default value
      conf.setInt("hbase.splitlog.report.period", 59000);
    }
  }

  /**
   * Test log split process with fake data and lots of edits to trigger threading
   * issues.
   */
  @Test
  public void testThreading() throws Exception {
    doTestThreading(20000, 128 * 1024 * 1024, 0);
  }

  /**
   * Test blocking behavior of the log split process if writers are writing slower
   * than the reader is reading.
   */
  @Test
  public void testThreadingSlowWriterSmallBuffer() throws Exception {
    doTestThreading(200, 1024, 50);
  }
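
  // The second variant uses a tiny (1 KB) buffer and a slow writer so the reader fills the
  // in-memory buffer faster than the writers drain it, exercising the splitter's blocking /
  // backpressure path described in the javadoc above.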

  /**
   * Sets up a log splitter with a mock reader and writer. The mock reader generates
   * a specified number of edits spread across 5 regions. The mock writer optionally
   * sleeps for each edit it is fed.
   *
   * After the split is complete, verifies that the statistics show the correct number
   * of edits output into each region.
   *
   * @param numFakeEdits number of fake edits to push through pipeline
   * @param bufferSize size of in-memory buffer
   * @param writerSlowness writer threads will sleep this many ms per edit
   */
  private void doTestThreading(final int numFakeEdits,
    final int bufferSize,
    final int writerSlowness) throws Exception {

    Configuration localConf = new Configuration(conf);
    localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);

    // Create a fake log file (we'll override the reader to produce a stream of edits)
    Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake");
    FSDataOutputStream out = fs.create(logPath);
    out.close();

    // Make region dirs for our destination regions so the output doesn't get skipped
    final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
    makeRegionDirs(regions);

    // Create a splitter that reads and writes the data without touching disk
    WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs) {
      /* Produce a mock writer that doesn't write anywhere */
      @Override
      protected Writer createWriter(Path logfile) throws IOException {
        Writer mockWriter = Mockito.mock(Writer.class);
        Mockito.doAnswer(new Answer<Void>() {
          int expectedIndex = 0;

          @Override
          public Void answer(InvocationOnMock invocation) {
            if (writerSlowness > 0) {
              try {
                Thread.sleep(writerSlowness);
              } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
              }
            }
            Entry entry = (Entry) invocation.getArgument(0);
            WALEdit edit = entry.getEdit();
            List<Cell> cells = edit.getCells();
            assertEquals(1, cells.size());
            Cell cell = cells.get(0);

            // Check that the edits come in the right order.
            assertEquals(expectedIndex, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(),
              cell.getRowLength()));
            expectedIndex++;
            return null;
          }
        }).when(mockWriter).append(Mockito.<Entry> any());
        return mockWriter;
      }

      /* Produce a mock reader that generates fake entries */
      @Override
      protected Reader getReader(FileStatus file, boolean skipErrors,
        CancelableProgressable reporter) throws IOException, CorruptedLogFileException {
        Reader mockReader = Mockito.mock(Reader.class);
        Mockito.doAnswer(new Answer<Entry>() {
          int index = 0;

          @Override
          public Entry answer(InvocationOnMock invocation) throws Throwable {
            if (index >= numFakeEdits) {
              return null;
            }

            // Generate r0 through r4 in round robin fashion
            int regionIdx = index % regions.size();
            byte[] region = new byte[] { (byte) 'r', (byte) (0x30 + regionIdx) };

            Entry ret = createTestEntry(TABLE_NAME, region,
              Bytes.toBytes(index / regions.size()),
              FAMILY, QUALIFIER, VALUE, index);
            index++;
            return ret;
          }
        }).when(mockReader).next();
        return mockReader;
      }
    };

    logSplitter.splitWAL(fs.getFileStatus(logPath), null);

    // Verify number of written edits per region
    Map<String, Long> outputCounts = logSplitter.outputSink.getOutputCounts();
    for (Map.Entry<String, Long> entry : outputCounts.entrySet()) {
      LOG.info("Got " + entry.getValue() + " output edits for region " + entry.getKey());
      assertEquals((long) entry.getValue(), numFakeEdits / regions.size());
    }
    assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size());
  }

  // Does leaving the writer open in testSplitDeletedRegion matter enough for two tests?
  @Test
  public void testSplitLogFileDeletedRegionDir() throws IOException {
    LOG.info("testSplitLogFileDeletedRegionDir");
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, REGION);
    LOG.info("Region directory is " + regiondir);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }

  @Test
  public void testSplitLogFileEmpty() throws IOException {
    LOG.info("testSplitLogFileEmpty");
    // we won't create the hlog dir until getWAL got called, so
    // make the dir here when testing an empty log file
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", true);
    useDifferentDFSClient();

    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    assertFalse(fs.exists(tdir));

    assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath()));
  }

  @Test
  public void testSplitLogFileMultipleRegions() throws IOException {
    LOG.info("testSplitLogFileMultipleRegions");
    generateWALs(1, 10, -1);
    splitAndCount(1, 10);
  }

  @Test
  public void testSplitLogFileFirstLineCorruptionLog()
    throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];

    corruptWAL(logfile.getPath(),
      Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    final Path corruptDir =
      new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
    assertEquals(1, fs.listStatus(corruptDir).length);
  }

  /**
   * @see "https://issues.apache.org/jira/browse/HBASE-4862"
   */
  @Test
  public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
    LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
    // Generate wals for our destination region
    String regionName = "r0";
    final Path regiondir = new Path(TABLEDIR, regionName);
    REGIONS.clear();
    REGIONS.add(regionName);
    generateWALs(-1);

    wals.getWAL(null);
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue("There should be some log file",
      logfiles != null && logfiles.length > 0);

    WALSplitter logSplitter =
      new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
        @Override
        protected Writer createWriter(Path logfile)
          throws IOException {
          Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile);
          // After creating a writer, simulate a region's
          // replayRecoveredEditsIfAny() which gets the SplitEditFiles of this
          // region and deletes them, excluding files with a '.temp' suffix.
          NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(fs, regiondir);
          if (files != null && !files.isEmpty()) {
            for (Path file : files) {
              if (!this.walFS.delete(file, false)) {
                LOG.error("Failed delete of " + file);
              } else {
                LOG.debug("Deleted recovered.edits file=" + file);
              }
            }
          }
          return writer;
        }
      };
    try {
      logSplitter.splitWAL(logfiles[0], null);
    } catch (IOException e) {
      LOG.info(e.toString(), e);
      fail("Should not throw IOException when splitting the log; a failure here is most "
        + "likely because the file being written no longer exists, caused by a concurrent "
        + "replayRecoveredEditsIfAny()");
    }
    if (fs.exists(CORRUPTDIR)) {
      if (fs.listStatus(CORRUPTDIR).length > 0) {
        fail("There are some corrupt logs, "
          + "most likely caused by a concurrent replayRecoveredEditsIfAny()");
      }
    }
  }

  @Test
  public void testRecoveredEditsStoragePolicy() throws IOException {
    conf.set(HConstants.WAL_STORAGE_POLICY, "ALL_SSD");
    try {
      Path path = createRecoveredEditsPathForRegion();
      assertEquals("ALL_SSD", fs.getStoragePolicy(path.getParent()).getName());
    } finally {
      conf.unset(HConstants.WAL_STORAGE_POLICY);
    }
  }

  private Writer generateWALs(int leaveOpen) throws IOException {
    return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen, 0);
  }

  private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException {
    return generateWALs(writers, entries, leaveOpen, 7);
  }

  private void makeRegionDirs(List<String> regions) throws IOException {
    for (String region : regions) {
      LOG.debug("Creating dir for region " + region);
      fs.mkdirs(new Path(TABLEDIR, region));
    }
  }

  /**
   * @param leaveOpen index to leave un-closed. -1 to close all.
   * @return the writer that's still open, or null if all were closed.
   */
  private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents)
    throws IOException {
    makeRegionDirs(REGIONS);
    fs.mkdirs(WALDIR);
    Writer[] ws = new Writer[writers];
    int seq = 0;
    int numRegionEventsAdded = 0;
    for (int i = 0; i < writers; i++) {
      ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
      for (int j = 0; j < entries; j++) {
        int prefix = 0;
        for (String region : REGIONS) {
          String row_key = region + prefix++ + i + j;
          appendEntry(ws[i], TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(row_key), FAMILY,
            QUALIFIER, VALUE, seq++);

          if (numRegionEventsAdded < regionEvents) {
            numRegionEventsAdded++;
            appendRegionEvent(ws[i], region);
          }
        }
      }
      if (i != leaveOpen) {
        ws[i].close();
        LOG.info("Closing writer " + i);
      }
    }
    if (leaveOpen < 0 || leaveOpen >= writers) {
      return null;
    }
    return ws[leaveOpen];
  }

  private Path[] getLogForRegion(TableName table, String region)
    throws IOException {
    Path tdir = CommonFSUtils.getWALTableDir(conf, table);
    @SuppressWarnings("deprecation")
    Path editsdir = WALSplitUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
      Bytes.toString(Bytes.toBytes(region))));
    FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
      @Override
      public boolean accept(Path p) {
        if (WALSplitUtil.isSequenceIdFile(p)) {
          return false;
        }
        return true;
      }
    });
    Path[] paths = new Path[files.length];
    for (int i = 0; i < files.length; i++) {
      paths[i] = files[i].getPath();
    }
    return paths;
  }

  private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException {
    FSDataOutputStream out;
    int fileSize = (int) fs.listStatus(path)[0].getLen();

    FSDataInputStream in = fs.open(path);
    byte[] corrupted_bytes = new byte[fileSize];
    in.readFully(0, corrupted_bytes, 0, fileSize);
    in.close();

    switch (corruption) {
      case APPEND_GARBAGE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes);
        out.write(Bytes.toBytes("-----"));
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_ON_FIRST_LINE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(0);
        out.write(corrupted_bytes);
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_IN_THE_MIDDLE:
        fs.delete(path, false);
        out = fs.create(path);
        int middle = corrupted_bytes.length / 2;
        out.write(corrupted_bytes, 0, middle);
        out.write(0);
        out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
        closeOrFlush(close, out);
        break;

      case TRUNCATE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes, 0, fileSize
          - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
        closeOrFlush(close, out);
        break;

      case TRUNCATE_TRAILER:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT); // trailer is truncated.
        closeOrFlush(close, out);
        break;
    }
  }
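
  // For TRUNCATE, the 32 + PB_WAL_COMPLETE_MAGIC.length + SIZEOF_INT bytes dropped above appear
  // to remove the trailer plus part of the final entry, which is why testEOFisIgnored expects
  // one fewer edit; TRUNCATE_TRAILER drops only the final SIZEOF_INT bytes, corrupting just the
  // trailer metadata so every edit stays readable.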

  private void closeOrFlush(boolean close, FSDataOutputStream out)
    throws IOException {
    if (close) {
      out.close();
    } else {
      // Older Hadoop versions expose Syncable.sync() instead of hflush(), so
      // find whichever is available via reflection.
      Method syncMethod = null;
      try {
        syncMethod = out.getClass().getMethod("hflush", new Class<?>[] {});
      } catch (NoSuchMethodException e) {
        try {
          syncMethod = out.getClass().getMethod("sync", new Class<?>[] {});
        } catch (NoSuchMethodException ex) {
          throw new IOException("This version of Hadoop supports " +
            "neither Syncable.sync() nor Syncable.hflush().");
        }
      }
      try {
        syncMethod.invoke(out, new Object[] {});
      } catch (Exception e) {
        throw new IOException(e);
      }
    }
  }

  private int countWAL(Path log) throws IOException {
    int count = 0;
    Reader in = wals.createReader(fs, log);
    while (in.next() != null) {
      count++;
    }
    in.close();
    return count;
  }

  private static void appendCompactionEvent(Writer w, RegionInfo hri, String[] inputs,
    String output) throws IOException {
    WALProtos.CompactionDescriptor.Builder desc = WALProtos.CompactionDescriptor.newBuilder();
    desc.setTableName(ByteString.copyFrom(hri.getTable().toBytes()))
      .setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes()))
      .setRegionName(ByteString.copyFrom(hri.getRegionName()))
      .setFamilyName(ByteString.copyFrom(FAMILY))
      .setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY))
      .addAllCompactionInput(Arrays.asList(inputs))
      .addCompactionOutput(output);

    WALEdit edit = WALEdit.createCompaction(hri, desc.build());
    WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1,
      EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
    w.append(new Entry(key, edit));
    w.sync(false);
  }

  private static void appendRegionEvent(Writer w, String region) throws IOException {
    WALProtos.RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
      WALProtos.RegionEventDescriptor.EventType.REGION_OPEN,
      TABLE_NAME.toBytes(),
      Bytes.toBytes(region),
      Bytes.toBytes(String.valueOf(region.hashCode())),
      1,
      ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>> of());
    final long time = EnvironmentEdgeManager.currentTime();
    final WALKeyImpl walKey = new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, time,
      HConstants.DEFAULT_CLUSTER_ID);
    WALEdit we = WALEdit.createRegionEventWALEdit(Bytes.toBytes(region), regionOpenDesc);
    w.append(new Entry(walKey, we));
    w.sync(false);
  }

  public static long appendEntry(Writer writer, TableName table, byte[] region,
    byte[] row, byte[] family, byte[] qualifier,
    byte[] value, long seq)
    throws IOException {
    LOG.info(Thread.currentThread().getName() + " append");
    writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
    LOG.info(Thread.currentThread().getName() + " sync");
    writer.sync(false);
    return seq;
  }

  private static Entry createTestEntry(
    TableName table, byte[] region,
    byte[] row, byte[] family, byte[] qualifier,
    byte[] value, long seq) {
    long time = System.nanoTime();

    seq++;
    final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value);
    WALEdit edit = new WALEdit();
    edit.add(cell);
    return new Entry(new WALKeyImpl(region, table, seq, time,
      HConstants.DEFAULT_CLUSTER_ID), edit);
  }

  private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
    Writer writer =
      WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf);
    if (closeFile) {
      writer.close();
    }
  }

  private boolean logsAreEqual(Path p1, Path p2) throws IOException {
    Reader in1, in2;
    in1 = wals.createReader(fs, p1);
    in2 = wals.createReader(fs, p2);
    Entry entry1;
    Entry entry2;
    while ((entry1 = in1.next()) != null) {
      entry2 = in2.next();
      if ((entry1.getKey().compareTo(entry2.getKey()) != 0) ||
        (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))) {
        return false;
      }
    }
    in1.close();
    in2.close();
    return true;
  }
}