/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Method;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
/**
 * Testing {@link WAL} splitting code.
 */
@Category({RegionServerTests.class, LargeTests.class})
public class TestWALSplit {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestWALSplit.class);
  private final static Logger LOG = LoggerFactory.getLogger(TestWALSplit.class);

  private static Configuration conf;
  private FileSystem fs;

  protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private Path HBASEDIR;
  private Path HBASELOGDIR;
  private Path WALDIR;
  private Path OLDLOGDIR;
  private Path CORRUPTDIR;
  private Path TABLEDIR;
  private String TMPDIRNAME;

  private static final int NUM_WRITERS = 10;
  private static final int ENTRIES = 10; // entries per writer per region

  private static final String FILENAME_BEING_SPLIT = "testfile";
  private static final TableName TABLE_NAME =
    TableName.valueOf("t1");
  private static final byte[] FAMILY = Bytes.toBytes("f1");
  private static final byte[] QUALIFIER = Bytes.toBytes("q1");
  private static final byte[] VALUE = Bytes.toBytes("v1");
  private static final String WAL_FILE_PREFIX = "wal.dat.";
  private static List<String> REGIONS = new ArrayList<>();
  private static String ROBBER;
  private static String ZOMBIE;
  private static String[] GROUP = new String[] {"supergroup"};
  static enum Corruptions {
    INSERT_GARBAGE_ON_FIRST_LINE,
    INSERT_GARBAGE_IN_THE_MIDDLE,
    APPEND_GARBAGE,
    TRUNCATE,
    TRUNCATE_TRAILER
  }
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf = TEST_UTIL.getConfiguration();
    conf.setClass("hbase.regionserver.hlog.writer.impl",
      InstrumentedLogWriter.class, Writer.class);
    // This is how you turn off shortcircuit read currently. TODO: Fix. Should read config.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
    // Create a fake user-to-group mapping and set it on the conf.
    Map<String, String[]> u2g_map = new HashMap<>(2);
    ROBBER = User.getCurrent().getName() + "-robber";
    ZOMBIE = User.getCurrent().getName() + "-zombie";
    u2g_map.put(ROBBER, GROUP);
    u2g_map.put(ZOMBIE, GROUP);
    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
    conf.setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.startMiniDFSCluster(2);
  }
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniDFSCluster();
  }
  @Rule
  public TestName name = new TestName();
  private WALFactory wals = null;
  @Before
  public void setUp() throws Exception {
    LOG.info("Cleaning up cluster for new test.");
    fs = TEST_UTIL.getDFSCluster().getFileSystem();
    HBASEDIR = TEST_UTIL.createRootDir();
    HBASELOGDIR = TEST_UTIL.createWALRootDir();
    OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME);
    CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME);
    TABLEDIR = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    TMPDIRNAME = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
      HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
    REGIONS.clear();
    Collections.addAll(REGIONS, "bbb", "ccc");
    InstrumentedLogWriter.activateFailure = false;
    wals = new WALFactory(conf, name.getMethodName());
    WALDIR = new Path(HBASELOGDIR,
      AbstractFSWALProvider.getWALDirectoryName(ServerName.valueOf(name.getMethodName(),
        16010, EnvironmentEdgeManager.currentTime()).toString()));
  }
  @After
  public void tearDown() throws Exception {
    try {
      wals.close();
    } catch (IOException exception) {
      // Some tests will move WALs out from under us. In those cases, we'll get an error on close.
      LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if" +
        " you see a failure look here.");
      LOG.debug("exception details", exception);
    } finally {
      wals = null;
    }
    fs.delete(HBASEDIR, true);
    fs.delete(HBASELOGDIR, true);
  }
  /**
   * Simulates splitting a WAL out from under a regionserver that is still trying to write it.
   * Ensures we do not lose edits.
   */
  @Test
  public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
    final AtomicLong counter = new AtomicLong(0);
    AtomicBoolean stop = new AtomicBoolean(false);
    // Region we'll write edits to and then later examine to make sure they all made it in.
    final String region = REGIONS.get(0);
    final int numWriters = 3;
    Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters);
    try {
      long startCount = counter.get();
      zombie.start();
      // Wait till writer starts going.
      while (startCount == counter.get()) Threads.sleep(1);
      // Give it a second to write a few appends.
      Threads.sleep(1000);
      final Configuration conf2 = HBaseConfiguration.create(conf);
      final User robber = User.createUserForTesting(conf2, ROBBER, GROUP);
      int count = robber.runAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws Exception {
          StringBuilder ls = new StringBuilder("Contents of WALDIR (").append(WALDIR)
            .append("):\n");
          for (FileStatus status : fs.listStatus(WALDIR)) {
            ls.append("\t").append(status.toString()).append("\n");
          }
          LOG.debug(Objects.toString(ls));
          LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
          WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
          LOG.info("Finished splitting out from under zombie.");
          Path[] logfiles = getLogForRegion(TABLE_NAME, region);
          assertEquals("wrong number of split files for region", numWriters, logfiles.length);
          int count = 0;
          for (Path logfile : logfiles) {
            count += countWAL(logfile);
          }
          return count;
        }
      });
      LOG.info("zombie=" + counter.get() + ", robber=" + count);
      assertTrue("The log file could have at most 1 extra log entry, but can't have less. " +
        "Zombie could write " + counter.get() + " and logfile had only " + count,
        counter.get() == count || counter.get() + 1 == count);
    } finally {
      stop.set(true);
      zombie.interrupt();
      Threads.threadDumpingIsAlive(zombie);
    }
  }
  /**
   * This thread will keep writing to a 'wal' file even after the split process has started.
   * It simulates a region server that was considered dead but woke up and wrote some more to the
   * last log entry. Does its writing as an alternate user in another filesystem instance to
   * better simulate it being a regionserver.
   */
  class ZombieLastLogWriterRegionServer extends Thread {
    final AtomicLong editsCount;
    final AtomicBoolean stop;
    final int numOfWriters;
    /**
     * Region to write edits for.
     */
    final String region;
    final User user;

    public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop,
        final String region, final int writers)
        throws IOException, InterruptedException {
      super("ZombieLastLogWriterRegionServer");
      setDaemon(true);
      this.stop = stop;
      this.editsCount = counter;
      this.region = region;
      this.user = User.createUserForTesting(conf, ZOMBIE, GROUP);
      numOfWriters = writers;
    }

    @Override
    public void run() {
      try {
        doWriting();
      } catch (IOException e) {
        LOG.warn(getName() + " Writer exiting " + e);
      } catch (InterruptedException e) {
        LOG.warn(getName() + " Writer exiting " + e);
      }
    }
    private void doWriting() throws IOException, InterruptedException {
      this.user.runAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          // Index of the WAL we want to keep open. generateWALs will leave open the WAL whose
          // index we supply here.
          int walToKeepOpen = numOfWriters - 1;
          // The below method writes numOfWriters files each with ENTRIES entries for a total of
          // numOfWriters * ENTRIES added per column family in the region.
          Writer writer = null;
          try {
            writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen);
          } catch (IOException e1) {
            throw new RuntimeException("Failed", e1);
          }
          // Update counter so it has all edits written so far.
          editsCount.addAndGet(numOfWriters * ENTRIES);
          loop(writer);
          // If we've been interrupted, then things should have shifted out from under us.
          // closing should error
          try {
            writer.close();
            fail("Writing closing after parsing should give an error.");
          } catch (IOException exception) {
            LOG.debug("ignoring error when closing final writer.", exception);
          }
          return null;
        }
      });
    }
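
    /** Appends entries until asked to stop, bumping editsCount for every successful append. */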
    private void loop(final Writer writer) {
      byte[] regionBytes = Bytes.toBytes(this.region);
      while (!stop.get()) {
        try {
          long seq = appendEntry(writer, TABLE_NAME, regionBytes,
            Bytes.toBytes("r" + editsCount.get()), regionBytes, QUALIFIER, VALUE, 0);
          long count = editsCount.incrementAndGet();
          LOG.info(getName() + " sync count=" + count + ", seq=" + seq);
          try {
            Thread.sleep(1);
          } catch (InterruptedException e) {
            //
          }
        } catch (IOException ex) {
          LOG.error(getName() + " ex " + ex.toString());
          if (ex instanceof RemoteException) {
            LOG.error("Juliet: got RemoteException " + ex.getMessage() +
              " while writing " + (editsCount.get() + 1));
          } else {
            LOG.error(getName() + " failed to write....at " + editsCount.get());
            fail("Failed to write " + editsCount.get());
          }
          break;
        } catch (Throwable t) {
          LOG.error(getName() + " HOW? " + t);
          LOG.debug("exception details", t);
          break;
        }
      }
      LOG.info(getName() + " Writer exiting");
    }
  }
  /**
   * @see "https://issues.apache.org/jira/browse/HBASE-3020"
   */
  @Test
  public void testRecoveredEditsPathForMeta() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
  }
  /**
   * Test that an old recovered edits file doesn't break WALSplitter.
   * This is useful when upgrading old instances.
   */
  @Test
  public void testOldRecoveredEditsFileSidelined() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
    Path regiondir = new Path(tdir,
      RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
    fs.mkdirs(regiondir);
    Path parent = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
    assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
    fs.createNewFile(parent); // create a recovered.edits file
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
    WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
  }
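
  /**
   * Computes the recovered.edits path that a split of the first meta region would write to,
   * as WALSplitUtil would during a real split.
   */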
  private Path createRecoveredEditsPathForRegion() throws IOException {
    byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
    long now = EnvironmentEdgeManager.currentTime();
    Entry entry = new Entry(
      new WALKeyImpl(encoded, TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
      new WALEdit());
    Path p = WALSplitUtil
      .getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1, FILENAME_BEING_SPLIT,
        TMPDIRNAME, conf);
    return p;
  }
  @Test
  public void testHasRecoveredEdits() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    assertFalse(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
    String renamedEdit = p.getName().split("-")[0];
    fs.createNewFile(new Path(p.getParent(), renamedEdit));
    assertTrue(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
  }
  private void useDifferentDFSClient() throws IOException {
    // make fs act as a different client now
    // initialize will create a new DFSClient with a new client ID
    fs.initialize(fs.getUri(), conf);
  }
  @Test
  public void testSplitPreservesEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1, 0);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
  }
  @Test
  public void testSplitRemovesRegionEventsEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1, 100);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
    // split log should only have the test edits
    assertEquals(10, countWAL(splitLog[0]));
  }
  @Test
  public void testSplitLeavesCompactionEventsEdits() throws IOException {
    RegionInfo hri = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
    REGIONS.clear();
    REGIONS.add(hri.getEncodedName());
    Path regionDir =
      new Path(CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME), hri.getEncodedName());
    LOG.info("Creating region directory: " + regionDir);
    assertTrue(fs.mkdirs(regionDir));

    Writer writer = generateWALs(1, 10, 0, 10);
    String[] compactInputs = new String[]{"file1", "file2", "file3"};
    String compactOutput = "file4";
    appendCompactionEvent(writer, hri, compactInputs, compactOutput);
    writer.close();

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    // original log should have 10 test edits, 10 region markers, 1 compaction marker
    assertEquals(21, countWAL(originalLog));

    Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName());
    assertEquals(1, splitLog.length);

    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
    // split log should have 10 test edits plus 1 compaction marker
    assertEquals(11, countWAL(splitLog[0]));
  }
  /**
   * @param expectedEntries -1 to not assert
   * @return the count across all regions
   */
  private int splitAndCount(final int expectedFiles, final int expectedEntries)
      throws IOException {
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    int result = 0;
    for (String region : REGIONS) {
      Path[] logfiles = getLogForRegion(TABLE_NAME, region);
      assertEquals(expectedFiles, logfiles.length);
      int count = 0;
      for (Path logfile : logfiles) {
        count += countWAL(logfile);
      }
      if (-1 != expectedEntries) {
        assertEquals(expectedEntries, count);
      }
      result += count;
    }
    return result;
  }
  @Test
  public void testEmptyLogFiles() throws IOException {
    testEmptyLogFiles(true);
  }

  @Test
  public void testEmptyOpenLogFiles() throws IOException {
    testEmptyLogFiles(false);
  }

  private void testEmptyLogFiles(final boolean close) throws IOException {
    // we won't create the hlog dir until getWAL got called, so
    // make dir here when testing empty log file
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", close);
    generateWALs(Integer.MAX_VALUE);
    injectEmptyFile("empty", close);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty
  }
  @Test
  public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
    // generate logs but leave wal.dat.5 open.
    generateWALs(5);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }
  @Test
  public void testTrailingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
      Corruptions.APPEND_GARBAGE, true);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }
  @Test
  public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
      Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
    splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); // 1 corrupt
  }
  @Test
  public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
      Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false);
    // the entries in the original logs are alternating regions
    // considering the sequence file header, the middle corruption should
    // affect at least half of the entries
    int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
    int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1;
    int allRegionsCount = splitAndCount(NUM_WRITERS, -1);
    assertTrue("The file up to the corrupted area hasn't been parsed",
      REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount);
  }
  @Test
  public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
    List<FaultyProtobufLogReader.FailureType> failureTypes = Arrays
      .asList(FaultyProtobufLogReader.FailureType.values()).stream()
      .filter(x -> x != FaultyProtobufLogReader.FailureType.NONE).collect(Collectors.toList());
    for (FaultyProtobufLogReader.FailureType failureType : failureTypes) {
      final Set<String> walDirContents = splitCorruptWALs(failureType);
      final Set<String> archivedLogs = new HashSet<>();
      final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:");
      for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
        archived.append("\n\t").append(log.toString());
        archivedLogs.add(log.getPath().getName());
      }
      LOG.debug(archived.toString());
      assertEquals(failureType.name() + ": expected to find all of our wals corrupt.", archivedLogs,
        walDirContents);
    }
  }
  /**
   * @return set of wal names present prior to split attempt.
   * @throws IOException if the split process fails
   */
  private Set<String> splitCorruptWALs(final FaultyProtobufLogReader.FailureType failureType)
      throws IOException {
    Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
      Reader.class);
    InstrumentedLogWriter.activateFailure = false;

    try {
      conf.setClass("hbase.regionserver.hlog.reader.impl", FaultyProtobufLogReader.class,
        Reader.class);
      conf.set("faultyprotobuflogreader.failuretype", failureType.name());
      // Clean up from previous tests or previous loop
      try {
        wals.shutdown();
      } catch (IOException exception) {
        // since we're splitting out from under the factory, we should expect some closing failures.
        LOG.debug("Ignoring problem closing WALFactory.", exception);
      }
      wals.close();
      try {
        for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
          fs.delete(log.getPath(), true);
        }
      } catch (FileNotFoundException exception) {
        LOG.debug("no previous CORRUPTDIR to clean.");
      }
      // change to the faulty reader
      wals = new WALFactory(conf, name.getMethodName());
      generateWALs(-1);
      // Our reader will render all of these files corrupt.
      final Set<String> walDirContents = new HashSet<>();
      for (FileStatus status : fs.listStatus(WALDIR)) {
        walDirContents.add(status.getPath().getName());
      }
      useDifferentDFSClient();
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
      return walDirContents;
    } finally {
      conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
        Reader.class);
    }
  }
  @Test (expected = IOException.class)
  public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
      throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
    splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
  }
  @Test
  public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
      throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
    try {
      splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
    } catch (IOException e) {
      LOG.debug("split with 'skip errors' set to 'false' correctly threw");
    }
    assertEquals("if skip.errors is false all files should remain in place",
      NUM_WRITERS, fs.listStatus(WALDIR).length);
  }
  private void ignoreCorruption(final Corruptions corruption, final int entryCount,
      final int expectedCount) throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);

    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0");
    generateWALs(1, entryCount, -1, 0);
    corruptWAL(c1, corruption, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    int actualCount = 0;
    Reader in = wals.createReader(fs, splitLog[0]);
    @SuppressWarnings("unused")
    Entry entry;
    while ((entry = in.next()) != null) ++actualCount;
    assertEquals(expectedCount, actualCount);
    in.close();

    // should not have stored the EOF files as corrupt
    FileStatus[] archivedLogs = fs.exists(CORRUPTDIR) ? fs.listStatus(CORRUPTDIR) : new FileStatus[0];
    assertEquals(0, archivedLogs.length);
  }
  @Test
  public void testEOFisIgnored() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount - 1);
  }
  @Test
  public void testCorruptWALTrailer() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount);
  }
  @Test
  public void testLogsGetArchivedAfterSplit() throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
    assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
  }
  @Test
  public void testSplit() throws IOException {
    generateWALs(-1);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }
  @Test
  public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit()
      throws IOException {
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus[] statuses = null;
    try {
      statuses = fs.listStatus(WALDIR);
      if (statuses != null) {
        fail("Files left in log dir: " +
          Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
      }
    } catch (FileNotFoundException e) {
      // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
    }
  }
  @Test(expected = IOException.class)
  public void testSplitWillFailIfWritingToRegionFails() throws Exception {
    // leave 5th log open so we could append the "trap"
    Writer writer = generateWALs(4);
    useDifferentDFSClient();

    String region = "break";
    Path regiondir = new Path(TABLEDIR, region);
    fs.mkdirs(regiondir);

    InstrumentedLogWriter.activateFailure = false;
    appendEntry(writer, TABLE_NAME, Bytes.toBytes(region),
      Bytes.toBytes("r" + 999), FAMILY, QUALIFIER, VALUE, 0);
    writer.close();

    try {
      InstrumentedLogWriter.activateFailure = true;
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    } catch (IOException e) {
      assertTrue(e.getMessage().
        contains("This exception is instrumented and should only be thrown for testing"));
      throw e;
    } finally {
      InstrumentedLogWriter.activateFailure = false;
    }
  }
  public void testSplitDeletedRegion() throws IOException {
    REGIONS.clear();
    String region = "region_that_splits";
    REGIONS.add(region);

    generateWALs(1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, region);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }
  @Test
  public void testIOEOnOutputThread() throws Exception {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, false);

    generateWALs(-1);
    useDifferentDFSClient();
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue("There should be some log file",
      logfiles != null && logfiles.length > 0);
    // wals with no entries (like the one we don't use in the factory)
    // won't cause a failure since nothing will ever be written.
    // pick the largest one since it's most likely to have entries.
    int largestLogFile = 0;
    long largestSize = 0;
    for (int i = 0; i < logfiles.length; i++) {
      if (logfiles[i].getLen() > largestSize) {
        largestLogFile = i;
        largestSize = logfiles[i].getLen();
      }
    }
    assertTrue("There should be some log greater than size 0.", 0 < largestSize);
    // Set up a splitter that will throw an IOE on the output side
    WALSplitter logSplitter =
      new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
        @Override
        protected Writer createWriter(Path logfile) throws IOException {
          Writer mockWriter = Mockito.mock(Writer.class);
          Mockito.doThrow(new IOException("Injected")).when(mockWriter)
            .append(Mockito.<Entry> any());
          return mockWriter;
        }
      };
    // Set up a background thread dumper. Needs a thread to depend on and then we need to run
    // the thread dumping in a background thread so it does not hold up the test.
    final AtomicBoolean stop = new AtomicBoolean(false);
    final Thread someOldThread = new Thread("Some-old-thread") {
      @Override
      public void run() {
        while (!stop.get()) Threads.sleep(10);
      }
    };
    someOldThread.setDaemon(true);
    someOldThread.start();
    final Thread t = new Thread("Background-thread-dumper") {
      @Override
      public void run() {
        try {
          Threads.threadDumpingIsAlive(someOldThread);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    };
    t.setDaemon(true);
    t.start();
    try {
      logSplitter.splitWAL(logfiles[largestLogFile], null);
      fail("Didn't throw!");
    } catch (IOException ioe) {
      assertTrue(ioe.toString().contains("Injected"));
    } finally {
      // Setting this to true will turn off the background thread dumper.
      stop.set(true);
    }
  }
  /**
   * @param spiedFs should be instrumented for failure.
   */
  private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception {
    generateWALs(-1);
    useDifferentDFSClient();

    try {
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
      assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
      assertFalse(fs.exists(WALDIR));
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    }
  }
864 public void testMovedWALDuringRecovery() throws Exception
{
865 // This partial mock will throw LEE for every file simulating
866 // files that were moved
867 FileSystem spiedFs
= Mockito
.spy(fs
);
868 // The "File does not exist" part is very important,
869 // that's how it comes out of HDFS
870 Mockito
.doThrow(new LeaseExpiredException("Injected: File does not exist")).
871 when(spiedFs
).append(Mockito
.<Path
>any());
872 retryOverHdfsProblem(spiedFs
);
  @Test
  public void testRetryOpenDuringRecovery() throws Exception {
    FileSystem spiedFs = Mockito.spy(fs);
    // The "Cannot obtain block length", "Could not obtain the last block",
    // and "Blocklist for [^ ]* has changed.*" part is very important,
    // that's how it comes out of HDFS. If HDFS changes the exception
    // message, this test needs to be adjusted accordingly.
    //
    // When DFSClient tries to open a file, HDFS needs to locate
    // the last block of the file and get its length. However, if the
    // last block is under recovery, HDFS may have a problem obtaining
    // the block length, in which case, retry may help.
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      private final String[] errors = new String[] {
        "Cannot obtain block length", "Could not obtain the last block",
        "Blocklist for " + OLDLOGDIR + " has changed"};
      private int count = 0;

      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        if (count < 3) {
          throw new IOException(errors[count++]);
        }
        return (FSDataInputStream)invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
    retryOverHdfsProblem(spiedFs);
  }
  @Test
  public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];
    useDifferentDFSClient();

    final AtomicInteger count = new AtomicInteger();

    CancelableProgressable localReporter
      = new CancelableProgressable() {
        @Override
        public boolean progress() {
          count.getAndIncrement();
          return false;
        }
      };

    FileSystem spiedFs = Mockito.spy(fs);
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        Thread.sleep(1500); // Sleep a while and wait report status invoked
        return (FSDataInputStream)invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());

    try {
      conf.setInt("hbase.splitlog.report.period", 1000);
      boolean ret = WALSplitter.splitLogFile(HBASEDIR, logfile, spiedFs, conf, localReporter, null,
        Mockito.mock(SplitLogWorkerCoordination.class), wals, null);
      assertFalse("Log splitting should have failed", ret);
      assertTrue(count.get() > 0);
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    } finally {
      // reset it back to its default value
      conf.setInt("hbase.splitlog.report.period", 59000);
    }
  }
  /**
   * Test log split process with fake data and lots of edits to trigger threading.
   */
  @Test
  public void testThreading() throws Exception {
    doTestThreading(20000, 128 * 1024 * 1024, 0);
  }

  /**
   * Test blocking behavior of the log split process if writers are writing slower
   * than the reader is reading.
   */
  @Test
  public void testThreadingSlowWriterSmallBuffer() throws Exception {
    doTestThreading(200, 1024, 50);
  }
  /**
   * Sets up a log splitter with a mock reader and writer. The mock reader generates
   * a specified number of edits spread across 5 regions. The mock writer optionally
   * sleeps for each edit it is fed.
   *
   * After the split is complete, verifies that the statistics show the correct number
   * of edits output into each region.
   *
   * @param numFakeEdits number of fake edits to push through pipeline
   * @param bufferSize size of in-memory buffer
   * @param writerSlowness writer threads will sleep this many ms per edit
   */
  private void doTestThreading(final int numFakeEdits,
      final int bufferSize,
      final int writerSlowness) throws Exception {

    Configuration localConf = new Configuration(conf);
    localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);

    // Create a fake log file (we'll override the reader to produce a stream of edits)
    Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake");
    FSDataOutputStream out = fs.create(logPath);
    out.close();

    // Make region dirs for our destination regions so the output doesn't get skipped
    final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
    makeRegionDirs(regions);

    // Create a splitter that reads and writes the data without touching disk
    WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs) {
      /* Produce a mock writer that doesn't write anywhere */
      @Override
      protected Writer createWriter(Path logfile) throws IOException {
        Writer mockWriter = Mockito.mock(Writer.class);
        Mockito.doAnswer(new Answer<Void>() {
          int expectedIndex = 0;

          @Override
          public Void answer(InvocationOnMock invocation) {
            if (writerSlowness > 0) {
              try {
                Thread.sleep(writerSlowness);
              } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
              }
            }
            Entry entry = (Entry) invocation.getArgument(0);
            WALEdit edit = entry.getEdit();
            List<Cell> cells = edit.getCells();
            assertEquals(1, cells.size());
            Cell cell = cells.get(0);

            // Check that the edits come in the right order.
            assertEquals(expectedIndex, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(),
              cell.getRowLength()));
            expectedIndex++;
            return null;
          }
        }).when(mockWriter).append(Mockito.<Entry>any());
        return mockWriter;
      }

      /* Produce a mock reader that generates fake entries */
      @Override
      protected Reader getReader(FileStatus file, boolean skipErrors,
          CancelableProgressable reporter) throws IOException, CorruptedLogFileException {
        Reader mockReader = Mockito.mock(Reader.class);
        Mockito.doAnswer(new Answer<Entry>() {
          int index = 0;

          @Override
          public Entry answer(InvocationOnMock invocation) throws Throwable {
            if (index >= numFakeEdits) return null;

            // Generate r0 through r4 in round robin fashion
            int regionIdx = index % regions.size();
            byte region[] = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)};

            Entry ret = createTestEntry(TABLE_NAME, region,
              Bytes.toBytes(index / regions.size()),
              FAMILY, QUALIFIER, VALUE, index);
            index++;
            return ret;
          }
        }).when(mockReader).next();
        return mockReader;
      }
    };

    logSplitter.splitWAL(fs.getFileStatus(logPath), null);

    // Verify number of written edits per region
    Map<String, Long> outputCounts = logSplitter.outputSink.getOutputCounts();
    for (Map.Entry<String, Long> entry : outputCounts.entrySet()) {
      LOG.info("Got " + entry.getValue() + " output edits for region " + entry.getKey());
      assertEquals((long) entry.getValue(), numFakeEdits / regions.size());
    }
    assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size());
  }
  // Does leaving the writer open in testSplitDeletedRegion matter enough for two tests?
  @Test
  public void testSplitLogFileDeletedRegionDir() throws IOException {
    LOG.info("testSplitLogFileDeletedRegionDir");
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, REGION);
    LOG.info("Region directory is " + regiondir);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }
  @Test
  public void testSplitLogFileEmpty() throws IOException {
    LOG.info("testSplitLogFileEmpty");
    // we won't create the hlog dir until getWAL got called, so
    // make dir here when testing empty log file
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", true);
    useDifferentDFSClient();

    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path tdir = CommonFSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    assertFalse(fs.exists(tdir));

    assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath()));
  }
  @Test
  public void testSplitLogFileMultipleRegions() throws IOException {
    LOG.info("testSplitLogFileMultipleRegions");
    generateWALs(1, 10, -1);
    splitAndCount(1, 10);
  }
  @Test
  public void testSplitLogFileFirstLineCorruptionLog()
      throws IOException {
    conf.setBoolean(WALSplitter.SPLIT_SKIP_ERRORS_KEY, true);
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];

    corruptWAL(logfile.getPath(),
      Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    final Path corruptDir =
      new Path(CommonFSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
    assertEquals(1, fs.listStatus(corruptDir).length);
  }
  /**
   * @see "https://issues.apache.org/jira/browse/HBASE-4862"
   */
  @Test
  public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
    LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
    // Generate wals for our destination region
    String regionName = "r0";
    final Path regiondir = new Path(TABLEDIR, regionName);
    REGIONS.clear();
    REGIONS.add(regionName);
    generateWALs(-1);

    wals.getWAL(null);
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue("There should be some log file",
      logfiles != null && logfiles.length > 0);

    WALSplitter logSplitter =
      new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
        @Override
        protected Writer createWriter(Path logfile)
            throws IOException {
          Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile);
          // After creating writer, simulate region's
          // replayRecoveredEditsIfAny() which gets SplitEditFiles of this
          // region and delete them, excluding files with '.temp' suffix.
          NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(fs, regiondir);
          if (files != null && !files.isEmpty()) {
            for (Path file : files) {
              if (!this.walFS.delete(file, false)) {
                LOG.error("Failed delete of " + file);
              } else {
                LOG.debug("Deleted recovered.edits file=" + file);
              }
            }
          }
          return writer;
        }
      };
    try {
      logSplitter.splitWAL(logfiles[0], null);
    } catch (IOException e) {
      LOG.info(e.toString(), e);
      fail("Throws IOException when splitting "
        + "log, it is most likely because writing file does not "
        + "exist which is caused by concurrent replayRecoveredEditsIfAny()");
    }
    if (fs.exists(CORRUPTDIR)) {
      if (fs.listStatus(CORRUPTDIR).length > 0) {
        fail("There are some corrupt logs, "
          + "it is most likely caused by concurrent replayRecoveredEditsIfAny()");
      }
    }
  }
  @Test
  public void testRecoveredEditsStoragePolicy() throws IOException {
    conf.set(HConstants.WAL_STORAGE_POLICY, "ALL_SSD");
    try {
      Path path = createRecoveredEditsPathForRegion();
      assertEquals("ALL_SSD", fs.getStoragePolicy(path.getParent()).getName());
    } finally {
      conf.unset(HConstants.WAL_STORAGE_POLICY);
    }
  }
  private Writer generateWALs(int leaveOpen) throws IOException {
    return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen, 0);
  }

  private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException {
    return generateWALs(writers, entries, leaveOpen, 7);
  }
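
  /** Creates a region directory under TABLEDIR for each of the given region names. */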
  private void makeRegionDirs(List<String> regions) throws IOException {
    for (String region : regions) {
      LOG.debug("Creating dir for region " + region);
      fs.mkdirs(new Path(TABLEDIR, region));
    }
  }
  /**
   * @param leaveOpen index to leave un-closed. -1 to close all.
   * @return the writer that's still open, or null if all were closed.
   */
  private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents)
      throws IOException {
    makeRegionDirs(REGIONS);
    fs.mkdirs(WALDIR);

    Writer[] ws = new Writer[writers];
    int seq = 0;
    int numRegionEventsAdded = 0;
    for (int i = 0; i < writers; i++) {
      ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
      for (int j = 0; j < entries; j++) {
        int prefix = 0;
        for (String region : REGIONS) {
          String row_key = region + prefix++ + i + j;
          appendEntry(ws[i], TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(row_key), FAMILY,
            QUALIFIER, VALUE, seq++);

          if (numRegionEventsAdded < regionEvents) {
            numRegionEventsAdded++;
            appendRegionEvent(ws[i], region);
          }
        }
      }
      if (i != leaveOpen) {
        ws[i].close();
        LOG.info("Closing writer " + i);
      }
    }
    if (leaveOpen < 0 || leaveOpen >= writers) {
      return null;
    }
    return ws[leaveOpen];
  }
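
  /**
   * Returns the paths of the recovered.edits files written for the given region,
   * filtering out sequence-id marker files.
   */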
  private Path[] getLogForRegion(TableName table, String region)
      throws IOException {
    Path tdir = CommonFSUtils.getWALTableDir(conf, table);
    @SuppressWarnings("deprecation")
    Path editsdir = WALSplitUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
      Bytes.toString(Bytes.toBytes(region))));
    FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
      @Override
      public boolean accept(Path p) {
        if (WALSplitUtil.isSequenceIdFile(p)) {
          return false;
        }
        return true;
      }
    });
    Path[] paths = new Path[files.length];
    for (int i = 0; i < files.length; i++) {
      paths[i] = files[i].getPath();
    }
    return paths;
  }
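
  /**
   * Rewrites the WAL at {@code path} with the requested corruption applied: garbage appended,
   * garbage inserted at the start or middle, or the tail/trailer truncated.
   */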
  private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException {
    FSDataOutputStream out;
    int fileSize = (int) fs.listStatus(path)[0].getLen();

    FSDataInputStream in = fs.open(path);
    byte[] corrupted_bytes = new byte[fileSize];
    in.readFully(0, corrupted_bytes, 0, fileSize);
    in.close();

    switch (corruption) {
      case APPEND_GARBAGE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes);
        out.write(Bytes.toBytes("-----"));
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_ON_FIRST_LINE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(0);
        out.write(corrupted_bytes);
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_IN_THE_MIDDLE:
        fs.delete(path, false);
        out = fs.create(path);
        int middle = (int) Math.floor(corrupted_bytes.length / 2);
        out.write(corrupted_bytes, 0, middle);
        out.write(0);
        out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
        closeOrFlush(close, out);
        break;

      case TRUNCATE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes, 0, fileSize
          - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
        closeOrFlush(close, out);
        break;

      case TRUNCATE_TRAILER:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT); // trailer is truncated.
        closeOrFlush(close, out);
        break;
    }
  }
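
  /**
   * Closes the stream, or flushes it via whichever of Syncable.hflush()/sync() this
   * version of Hadoop exposes (hence the reflection).
   */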
  private void closeOrFlush(boolean close, FSDataOutputStream out)
      throws IOException {
    if (close) {
      out.close();
    } else {
      Method syncMethod = null;
      try {
        syncMethod = out.getClass().getMethod("hflush", new Class<?>[]{});
      } catch (NoSuchMethodException e) {
        try {
          syncMethod = out.getClass().getMethod("sync", new Class<?>[]{});
        } catch (NoSuchMethodException ex) {
          throw new IOException("This version of Hadoop supports " +
            "neither Syncable.sync() nor Syncable.hflush().");
        }
      }
      try {
        syncMethod.invoke(out, new Object[]{});
      } catch (Exception e) {
        throw new IOException(e);
      }
    }
  }
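
  /** Returns the number of entries that can be read back from the WAL at {@code log}. */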
  private int countWAL(Path log) throws IOException {
    int count = 0;
    Reader in = wals.createReader(fs, log);
    while (in.next() != null) {
      count++;
    }
    in.close();
    return count;
  }
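
  /** Appends a compaction descriptor marker for {@code hri} to the given writer and syncs it. */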
  private static void appendCompactionEvent(Writer w, RegionInfo hri, String[] inputs,
      String output) throws IOException {
    WALProtos.CompactionDescriptor.Builder desc = WALProtos.CompactionDescriptor.newBuilder();
    desc.setTableName(ByteString.copyFrom(hri.getTable().toBytes()))
      .setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes()))
      .setRegionName(ByteString.copyFrom(hri.getRegionName()))
      .setFamilyName(ByteString.copyFrom(FAMILY))
      .setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY))
      .addAllCompactionInput(Arrays.asList(inputs))
      .addCompactionOutput(output);

    WALEdit edit = WALEdit.createCompaction(hri, desc.build());
    WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1,
      EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
    w.append(new Entry(key, edit));
    w.sync(false);
  }
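
  /** Appends a REGION_OPEN event marker for {@code region} to the given writer and syncs it. */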
  private static void appendRegionEvent(Writer w, String region) throws IOException {
    WALProtos.RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
      WALProtos.RegionEventDescriptor.EventType.REGION_OPEN,
      TABLE_NAME.toBytes(),
      Bytes.toBytes(region),
      Bytes.toBytes(String.valueOf(region.hashCode())),
      1,
      ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>>of());
    final long time = EnvironmentEdgeManager.currentTime();
    final WALKeyImpl walKey = new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, time,
      HConstants.DEFAULT_CLUSTER_ID);
    WALEdit we = WALEdit.createRegionEventWALEdit(Bytes.toBytes(region), regionOpenDesc);
    w.append(new Entry(walKey, we));
    w.sync(false);
  }
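
  /** Appends a single test edit to {@code writer}, syncs it, and returns the sequence id used. */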
  public static long appendEntry(Writer writer, TableName table, byte[] region,
      byte[] row, byte[] family, byte[] qualifier,
      byte[] value, long seq)
      throws IOException {
    LOG.info(Thread.currentThread().getName() + " append");
    writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
    LOG.info(Thread.currentThread().getName() + " sync");
    writer.sync(false);
    return seq;
  }
createTestEntry(
1398 TableName table
, byte[] region
,
1399 byte[] row
, byte[] family
, byte[] qualifier
,
1400 byte[] value
, long seq
) {
1401 long time
= System
.nanoTime();
1404 final KeyValue cell
= new KeyValue(row
, family
, qualifier
, time
, KeyValue
.Type
.Put
, value
);
1405 WALEdit edit
= new WALEdit();
1407 return new Entry(new WALKeyImpl(region
, table
, seq
, time
,
1408 HConstants
.DEFAULT_CLUSTER_ID
), edit
);
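
  /** Creates a WAL file with no entries; leaves it open unless {@code closeFile} is set. */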
  private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
    Writer writer =
      WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf);
    if (closeFile) {
      writer.close();
    }
  }
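
  /** Returns true if the two WALs contain the same entries in the same order. */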
  private boolean logsAreEqual(Path p1, Path p2) throws IOException {
    Reader in1, in2;
    in1 = wals.createReader(fs, p1);
    in2 = wals.createReader(fs, p2);
    Entry entry1;
    Entry entry2;
    while ((entry1 = in1.next()) != null) {
      entry2 = in2.next();
      if ((entry1.getKey().compareTo(entry2.getKey()) != 0) ||
        (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))) {