/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Method;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;

/**
 * Testing {@link WAL} splitting code.
 */
@Category({RegionServerTests.class, LargeTests.class})
public class TestWALSplit {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestWALSplit.class);
  // Uncomment the following lines if more verbosity is needed for
  // debugging (see HBASE-12285 for details).
  //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
  //((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
  //((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);

  private final static Logger LOG = LoggerFactory.getLogger(TestWALSplit.class);
  private static Configuration conf;
  private FileSystem fs;

  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private Path HBASEDIR;
  private Path HBASELOGDIR;
  private Path WALDIR;
  private Path OLDLOGDIR;
  private Path CORRUPTDIR;
  private Path TABLEDIR;
  private String TMPDIRNAME;
  private static final int NUM_WRITERS = 10;
  private static final int ENTRIES = 10; // entries per writer per region

  private static final String FILENAME_BEING_SPLIT = "testfile";
  private static final TableName TABLE_NAME =
      TableName.valueOf("t1");
  private static final byte[] FAMILY = Bytes.toBytes("f1");
  private static final byte[] QUALIFIER = Bytes.toBytes("q1");
  private static final byte[] VALUE = Bytes.toBytes("v1");
  private static final String WAL_FILE_PREFIX = "wal.dat.";
  private static List<String> REGIONS = new ArrayList<>();
  private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
  private static String ROBBER;
  private static String ZOMBIE;
  private static String[] GROUP = new String[] {"supergroup"};
  static enum Corruptions {
    INSERT_GARBAGE_ON_FIRST_LINE,
    INSERT_GARBAGE_IN_THE_MIDDLE,
    APPEND_GARBAGE,
    TRUNCATE,
    TRUNCATE_TRAILER
  }
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf = TEST_UTIL.getConfiguration();
    conf.setClass("hbase.regionserver.hlog.writer.impl",
        InstrumentedLogWriter.class, Writer.class);
    // This is how you turn off shortcircuit read currently. TODO: Fix. Should read config.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
    // Create a fake mapping of user to group and set it in the conf.
    Map<String, String[]> u2g_map = new HashMap<>(2);
    ROBBER = User.getCurrent().getName() + "-robber";
    ZOMBIE = User.getCurrent().getName() + "-zombie";
    u2g_map.put(ROBBER, GROUP);
    u2g_map.put(ZOMBIE, GROUP);
    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
    conf.setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.startMiniDFSCluster(2);
  }
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniDFSCluster();
  }
  @Rule
  public TestName name = new TestName();
  private WALFactory wals = null;
  @Before
  public void setUp() throws Exception {
    LOG.info("Cleaning up cluster for new test.");
    fs = TEST_UTIL.getDFSCluster().getFileSystem();
    HBASEDIR = TEST_UTIL.createRootDir();
    HBASELOGDIR = TEST_UTIL.createWALRootDir();
    OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME);
    CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME);
    TABLEDIR = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    TMPDIRNAME = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
        HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
    REGIONS.clear();
    Collections.addAll(REGIONS, "bbb", "ccc");
    InstrumentedLogWriter.activateFailure = false;
    wals = new WALFactory(conf, name.getMethodName());
    WALDIR = new Path(HBASELOGDIR,
        AbstractFSWALProvider.getWALDirectoryName(ServerName.valueOf(name.getMethodName(),
            16010, System.currentTimeMillis()).toString()));
  }
  @After
  public void tearDown() throws Exception {
    try {
      wals.close();
    } catch (IOException exception) {
      // Some tests will move WALs out from under us. In those cases, we'll get an error on close.
      LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if" +
          " you see a failure look here.");
      LOG.debug("exception details", exception);
    }
    fs.delete(HBASEDIR, true);
    fs.delete(HBASELOGDIR, true);
  }
  /**
   * Simulates splitting a WAL out from under a regionserver that is still trying to write it.
   * Ensures we do not lose edits.
   */
  @Test
  public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
    final AtomicLong counter = new AtomicLong(0);
    AtomicBoolean stop = new AtomicBoolean(false);
    // Region we'll write edits to and then later examine to make sure they all made it in.
    final String region = REGIONS.get(0);
    final int numWriters = 3;
    Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters);
    try {
      long startCount = counter.get();
      zombie.start();
      // Wait till writer starts going.
      while (startCount == counter.get()) Threads.sleep(1);
      // Give it a second to write a few appends.
      Threads.sleep(1000);
      final Configuration conf2 = HBaseConfiguration.create(conf);
      final User robber = User.createUserForTesting(conf2, ROBBER, GROUP);
      int count = robber.runAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws Exception {
          StringBuilder ls = new StringBuilder("Contents of WALDIR (").append(WALDIR)
              .append("):\n");
          for (FileStatus status : fs.listStatus(WALDIR)) {
            ls.append("\t").append(status.toString()).append("\n");
          }
          LOG.debug(Objects.toString(ls));
          LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
          WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
          LOG.info("Finished splitting out from under zombie.");
          Path[] logfiles = getLogForRegion(TABLE_NAME, region);
          assertEquals("wrong number of split files for region", numWriters, logfiles.length);
          int count = 0;
          for (Path logfile : logfiles) {
            count += countWAL(logfile);
          }
          return count;
        }
      });
      LOG.info("zombie=" + counter.get() + ", robber=" + count);
      assertTrue("The log file could have at most 1 extra log entry, but can't have less. " +
              "Zombie could write " + counter.get() + " and logfile had only " + count,
          counter.get() == count || counter.get() + 1 == count);
    } finally {
      stop.set(true);
      zombie.interrupt();
      Threads.threadDumpingIsAlive(zombie);
    }
  }
  /**
   * This thread will keep writing to a 'wal' file even after the split process has started.
   * It simulates a region server that was considered dead but woke up and wrote some more to the
   * last log entry. Does its writing as an alternate user in another filesystem instance to
   * better simulate it being a regionserver.
   */
  class ZombieLastLogWriterRegionServer extends Thread {
    final AtomicLong editsCount;
    final AtomicBoolean stop;
    final int numOfWriters;
    /**
     * Region to write edits for.
     */
    final String region;
    final User user;

    public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop,
        final String region, final int writers)
        throws IOException, InterruptedException {
      super("ZombieLastLogWriterRegionServer");
      setDaemon(true);
      this.stop = stop;
      this.editsCount = counter;
      this.region = region;
      this.user = User.createUserForTesting(conf, ZOMBIE, GROUP);
      numOfWriters = writers;
    }

    @Override
    public void run() {
      try {
        doWriting();
      } catch (IOException e) {
        LOG.warn(getName() + " Writer exiting " + e);
      } catch (InterruptedException e) {
        LOG.warn(getName() + " Writer exiting " + e);
      }
    }
    private void doWriting() throws IOException, InterruptedException {
      this.user.runAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          // Index of the WAL we want to keep open. generateWALs will leave open the WAL whose
          // index we supply here.
          int walToKeepOpen = numOfWriters - 1;
          // The below method writes numOfWriters files each with ENTRIES entries for a total of
          // numOfWriters * ENTRIES added per column family in the region.
          Writer writer = null;
          try {
            writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen);
          } catch (IOException e1) {
            throw new RuntimeException("Failed", e1);
          }
          // Update counter so has all edits written so far.
          editsCount.addAndGet(numOfWriters * ENTRIES);
          loop(writer);
          // If we've been interrupted, then things should have shifted out from under us.
          // closing should error
          try {
            writer.close();
            fail("Writing closing after parsing should give an error.");
          } catch (IOException exception) {
            LOG.debug("ignoring error when closing final writer.", exception);
          }
          return null;
        }
      });
    }
    private void loop(final Writer writer) {
      byte[] regionBytes = Bytes.toBytes(this.region);
      while (!stop.get()) {
        try {
          long seq = appendEntry(writer, TABLE_NAME, regionBytes,
              Bytes.toBytes("r" + editsCount.get()), regionBytes, QUALIFIER, VALUE, 0);
          long count = editsCount.incrementAndGet();
          LOG.info(getName() + " sync count=" + count + ", seq=" + seq);
          try {
            Thread.sleep(1);
          } catch (InterruptedException e) {
            //
          }
        } catch (IOException ex) {
          LOG.error(getName() + " ex " + ex.toString());
          if (ex instanceof RemoteException) {
            LOG.error("Juliet: got RemoteException " + ex.getMessage() +
                " while writing " + (editsCount.get() + 1));
          } else {
            LOG.error(getName() + " failed to write....at " + editsCount.get());
            fail("Failed to write " + editsCount.get());
          }
        } catch (Throwable t) {
          LOG.error(getName() + " HOW? " + t);
          LOG.debug("exception details", t);
        }
      }
      LOG.info(getName() + " Writer exiting");
    }
  }
  /**
   * @see "https://issues.apache.org/jira/browse/HBASE-3020"
   */
  @Test
  public void testRecoveredEditsPathForMeta() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
  }
  /**
   * Test that an old recovered edits file doesn't break WALSplitter.
   * This is useful in upgrading old instances.
   */
  @Test
  public void testOldRecoveredEditsFileSidelined() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
    Path regiondir = new Path(tdir,
        RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
    fs.mkdirs(regiondir);
    Path parent = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
    assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
    fs.createNewFile(parent); // create a recovered.edits file
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
    WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
  }
  private Path createRecoveredEditsPathForRegion() throws IOException {
    byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
    long now = System.currentTimeMillis();
    Entry entry = new Entry(
        new WALKeyImpl(encoded, TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
        new WALEdit());
    Path p = WALSplitUtil
        .getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1, FILENAME_BEING_SPLIT,
            TMPDIRNAME, conf);
    return p;
  }
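  /**
   * hasRecoveredEdits should only report true once a recovered.edits file (with its in-progress
   * suffix stripped off) actually exists under the region's recovered.edits directory.
   */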
  @Test
  public void testHasRecoveredEdits() throws IOException {
    Path p = createRecoveredEditsPathForRegion();
    assertFalse(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
    String renamedEdit = p.getName().split("-")[0];
    fs.createNewFile(new Path(p.getParent(), renamedEdit));
    assertTrue(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO));
  }
  private void useDifferentDFSClient() throws IOException {
    // make fs act as a different client now
    // initialize will create a new DFSClient with a new client ID
    fs.initialize(fs.getUri(), conf);
  }
  @Test
  public void testSplitPreservesEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1, 0);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
  }
  @Test
  public void testSplitRemovesRegionEventsEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1, 100);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
    // split log should only have the test edits
    assertEquals(10, countWAL(splitLog[0]));
  }
  @Test
  public void testSplitLeavesCompactionEventsEdits() throws IOException {
    RegionInfo hri = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
    REGIONS.clear();
    REGIONS.add(hri.getEncodedName());
    Path regionDir = new Path(FSUtils.getTableDir(HBASEDIR, TABLE_NAME), hri.getEncodedName());
    LOG.info("Creating region directory: " + regionDir);
    assertTrue(fs.mkdirs(regionDir));

    Writer writer = generateWALs(1, 10, 0, 10);
    String[] compactInputs = new String[]{"file1", "file2", "file3"};
    String compactOutput = "file4";
    appendCompactionEvent(writer, hri, compactInputs, compactOutput);
    writer.close();

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    // original log should have 10 test edits, 10 region markers, 1 compaction marker
    assertEquals(21, countWAL(originalLog));

    Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName());
    assertEquals(1, splitLog.length);

    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
    // split log should have 10 test edits plus 1 compaction marker
    assertEquals(11, countWAL(splitLog[0]));
  }
  /**
   * Splits the WALs and counts the entries written out per region.
   * @param expectedEntries -1 to not assert
   * @return the count across all regions
   */
  private int splitAndCount(final int expectedFiles, final int expectedEntries)
      throws IOException {
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    int result = 0;
    for (String region : REGIONS) {
      Path[] logfiles = getLogForRegion(TABLE_NAME, region);
      assertEquals(expectedFiles, logfiles.length);
      int count = 0;
      for (Path logfile : logfiles) {
        count += countWAL(logfile);
      }
      if (-1 != expectedEntries) {
        assertEquals(expectedEntries, count);
      }
      result += count;
    }
    return result;
  }
  @Test
  public void testEmptyLogFiles() throws IOException {
    testEmptyLogFiles(true);
  }

  @Test
  public void testEmptyOpenLogFiles() throws IOException {
    testEmptyLogFiles(false);
  }

  private void testEmptyLogFiles(final boolean close) throws IOException {
    // we won't create the hlog dir until getWAL got called, so
    // make dir here when testing empty log file
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", close);
    generateWALs(Integer.MAX_VALUE);
    injectEmptyFile("empty", close);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty
  }
  @Test
  public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
    // generate logs but leave wal.dat.5 open.
    generateWALs(5);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }
  @Test
  public void testTrailingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.APPEND_GARBAGE, true);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }
  @Test
  public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
    splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); // 1 corrupt
  }
  @Test
  public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false);
    // the entries in the original logs are alternating regions
    // considering the sequence file header, the middle corruption should
    // affect at least half of the entries
    int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
    int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1;
    int allRegionsCount = splitAndCount(NUM_WRITERS, -1);
    assertTrue("The file up to the corrupted area hasn't been parsed",
        REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount);
  }
  @Test
  public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    List<FaultyProtobufLogReader.FailureType> failureTypes = Arrays
        .asList(FaultyProtobufLogReader.FailureType.values()).stream()
        .filter(x -> x != FaultyProtobufLogReader.FailureType.NONE).collect(Collectors.toList());
    for (FaultyProtobufLogReader.FailureType failureType : failureTypes) {
      final Set<String> walDirContents = splitCorruptWALs(failureType);
      final Set<String> archivedLogs = new HashSet<>();
      final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:");
      for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
        archived.append("\n\t").append(log.toString());
        archivedLogs.add(log.getPath().getName());
      }
      LOG.debug(archived.toString());
      assertEquals(failureType.name() + ": expected to find all of our wals corrupt.", archivedLogs,
          walDirContents);
    }
  }
  /**
   * @return set of wal names present prior to split attempt.
   * @throws IOException if the split process fails
   */
  private Set<String> splitCorruptWALs(final FaultyProtobufLogReader.FailureType failureType)
      throws IOException {
    Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
        Reader.class);
    InstrumentedLogWriter.activateFailure = false;

    try {
      conf.setClass("hbase.regionserver.hlog.reader.impl", FaultyProtobufLogReader.class,
          Reader.class);
      conf.set("faultyprotobuflogreader.failuretype", failureType.name());
      // Clean up from previous tests or previous loop
      try {
        wals.shutdown();
      } catch (IOException exception) {
        // since we're splitting out from under the factory, we should expect some closing failures.
        LOG.debug("Ignoring problem closing WALFactory.", exception);
      }
      wals.close();
      try {
        for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
          fs.delete(log.getPath(), true);
        }
      } catch (FileNotFoundException exception) {
        LOG.debug("no previous CORRUPTDIR to clean.");
      }
      // change to the faulty reader
      wals = new WALFactory(conf, name.getMethodName());
      generateWALs(-1);
      // Our reader will render all of these files corrupt.
      final Set<String> walDirContents = new HashSet<>();
      for (FileStatus status : fs.listStatus(WALDIR)) {
        walDirContents.add(status.getPath().getName());
      }
      useDifferentDFSClient();
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
      return walDirContents;
    } finally {
      conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
          Reader.class);
    }
  }
  @Test (expected = IOException.class)
  public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
  }
  @Test
  public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    try {
      splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
    } catch (IOException e) {
      LOG.debug("split with 'skip errors' set to 'false' correctly threw");
    }
    assertEquals("if skip.errors is false all files should remain in place",
        NUM_WRITERS, fs.listStatus(WALDIR).length);
  }
  private void ignoreCorruption(final Corruptions corruption, final int entryCount,
      final int expectedCount) throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);

    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0");
    generateWALs(1, entryCount, -1, 0);
    corruptWAL(c1, corruption, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    int actualCount = 0;
    Reader in = wals.createReader(fs, splitLog[0]);
    @SuppressWarnings("unused")
    Entry entry;
    while ((entry = in.next()) != null) ++actualCount;
    assertEquals(expectedCount, actualCount);
    in.close();

    // should not have stored the EOF files as corrupt
    FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
    assertEquals(0, archivedLogs.length);
  }
  @Test
  public void testEOFisIgnored() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount - 1);
  }

  @Test
  public void testCorruptWALTrailer() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount);
  }
  @Test
  public void testLogsGetArchivedAfterSplit() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
    assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
  }
  @Test
  public void testSplit() throws IOException {
    generateWALs(-1);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }
  @Test
  public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit()
      throws IOException {
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus[] statuses = null;
    try {
      statuses = fs.listStatus(WALDIR);
      if (statuses != null) {
        fail("Files left in log dir: " +
            Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
      }
    } catch (FileNotFoundException e) {
      // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
    }
  }
  @Test(expected = IOException.class)
  public void testSplitWillFailIfWritingToRegionFails() throws Exception {
    //leave 5th log open so we could append the "trap"
    Writer writer = generateWALs(4);
    useDifferentDFSClient();

    String region = "break";
    Path regiondir = new Path(TABLEDIR, region);
    fs.mkdirs(regiondir);

    InstrumentedLogWriter.activateFailure = false;
    appendEntry(writer, TABLE_NAME, Bytes.toBytes(region),
        Bytes.toBytes("r" + 999), FAMILY, QUALIFIER, VALUE, 0);
    writer.close();

    try {
      InstrumentedLogWriter.activateFailure = true;
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    } catch (IOException e) {
      assertTrue(e.getMessage().
          contains("This exception is instrumented and should only be thrown for testing"));
      throw e;
    } finally {
      InstrumentedLogWriter.activateFailure = false;
    }
  }
  @Test
  public void testSplitDeletedRegion() throws IOException {
    REGIONS.clear();
    String region = "region_that_splits";
    REGIONS.add(region);
    generateWALs(1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, region);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }
  @Test
  public void testIOEOnOutputThread() throws Exception {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);

    generateWALs(-1);
    useDifferentDFSClient();
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue("There should be some log file",
        logfiles != null && logfiles.length > 0);
    // wals with no entries (like the one we don't use in the factory)
    // won't cause a failure since nothing will ever be written.
    // pick the largest one since it's most likely to have entries.
    int largestLogFile = 0;
    long largestSize = 0;
    for (int i = 0; i < logfiles.length; i++) {
      if (logfiles[i].getLen() > largestSize) {
        largestLogFile = i;
        largestSize = logfiles[i].getLen();
      }
    }
    assertTrue("There should be some log greater than size 0.", 0 < largestSize);
    // Set up a splitter that will throw an IOE on the output side
    WALSplitter logSplitter =
        new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
      @Override
      protected Writer createWriter(Path logfile) throws IOException {
        Writer mockWriter = Mockito.mock(Writer.class);
        Mockito.doThrow(new IOException("Injected")).when(mockWriter)
            .append(Mockito.<Entry> any());
        return mockWriter;
      }
    };
    // Set up a background thread dumper. Needs a thread to depend on and then we need to run
    // the thread dumping in a background thread so it does not hold up the test.
    final AtomicBoolean stop = new AtomicBoolean(false);
    final Thread someOldThread = new Thread("Some-old-thread") {
      @Override
      public void run() {
        while(!stop.get()) Threads.sleep(10);
      }
    };
    someOldThread.setDaemon(true);
    someOldThread.start();
    final Thread t = new Thread("Background-thread-dumper") {
      @Override
      public void run() {
        try {
          Threads.threadDumpingIsAlive(someOldThread);
        } catch (InterruptedException e) {
          LOG.debug("Interrupted", e);
        }
      }
    };
    t.setDaemon(true);
    t.start();
    try {
      logSplitter.splitLogFile(logfiles[largestLogFile], null);
      fail("Didn't throw!");
    } catch (IOException ioe) {
      assertTrue(ioe.toString().contains("Injected"));
    } finally {
      // Setting this to true will turn off the background thread dumper.
      stop.set(true);
    }
  }
  /**
   * @param spiedFs should be instrumented for failure.
   */
  private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception {
    generateWALs(-1);
    useDifferentDFSClient();

    try {
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
      assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
      assertFalse(fs.exists(WALDIR));
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    }
  }
  // Test for HBASE-3412
  @Test
  public void testMovedWALDuringRecovery() throws Exception {
    // This partial mock will throw LEE for every file simulating
    // files that were moved
    FileSystem spiedFs = Mockito.spy(fs);
    // The "File does not exist" part is very important,
    // that's how it comes out of HDFS
    Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).
        when(spiedFs).append(Mockito.<Path>any());
    retryOverHdfsProblem(spiedFs);
  }
  @Test
  public void testRetryOpenDuringRecovery() throws Exception {
    FileSystem spiedFs = Mockito.spy(fs);
    // The "Cannot obtain block length", "Could not obtain the last block",
    // and "Blocklist for [^ ]* has changed.*" part is very important,
    // that's how it comes out of HDFS. If HDFS changes the exception
    // message, this test needs to be adjusted accordingly.
    //
    // When DFSClient tries to open a file, HDFS needs to locate
    // the last block of the file and get its length. However, if the
    // last block is under recovery, HDFS may have problem to obtain
    // the block length, in which case, retry may help.
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      private final String[] errors = new String[] {
          "Cannot obtain block length", "Could not obtain the last block",
          "Blocklist for " + OLDLOGDIR + " has changed"};
      private int count = 0;

      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        if (count < 3) {
          throw new IOException(errors[count++]);
        }
        return (FSDataInputStream)invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
    retryOverHdfsProblem(spiedFs);
  }
  @Test
  public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];
    useDifferentDFSClient();

    final AtomicInteger count = new AtomicInteger();

    CancelableProgressable localReporter
        = new CancelableProgressable() {
      @Override
      public boolean progress() {
        count.getAndIncrement();
        return false;
      }
    };

    FileSystem spiedFs = Mockito.spy(fs);
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        Thread.sleep(1500); // Sleep a while and wait report status invoked
        return (FSDataInputStream)invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());

    try {
      conf.setInt("hbase.splitlog.report.period", 1000);
      boolean ret = WALSplitter.splitLogFile(HBASEDIR, logfile, spiedFs, conf, localReporter, null,
          null, wals, null);
      assertFalse("Log splitting should fail", ret);
      assertTrue(count.get() > 0);
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    } finally {
      // reset it back to its default value
      conf.setInt("hbase.splitlog.report.period", 59000);
    }
  }
  /**
   * Test log split process with fake data and lots of edits to trigger threading
   * issues.
   */
  @Test
  public void testThreading() throws Exception {
    doTestThreading(20000, 128*1024*1024, 0);
  }

  /**
   * Test blocking behavior of the log split process if writers are writing slower
   * than the reader is reading.
   */
  @Test
  public void testThreadingSlowWriterSmallBuffer() throws Exception {
    doTestThreading(200, 1024, 50);
  }
  /**
   * Sets up a log splitter with a mock reader and writer. The mock reader generates
   * a specified number of edits spread across 5 regions. The mock writer optionally
   * sleeps for each edit it is fed.
   *
   * After the split is complete, verifies that the statistics show the correct number
   * of edits output into each region.
   *
   * @param numFakeEdits number of fake edits to push through pipeline
   * @param bufferSize size of in-memory buffer
   * @param writerSlowness writer threads will sleep this many ms per edit
   */
  private void doTestThreading(final int numFakeEdits,
      final int bufferSize,
      final int writerSlowness) throws Exception {

    Configuration localConf = new Configuration(conf);
    localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);

    // Create a fake log file (we'll override the reader to produce a stream of edits)
    Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake");
    FSDataOutputStream out = fs.create(logPath);
    out.close();

    // Make region dirs for our destination regions so the output doesn't get skipped
    final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
    makeRegionDirs(regions);

    // Create a splitter that reads and writes the data without touching disk
    WALSplitter logSplitter =
        new WALSplitter(wals, localConf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {

      /* Produce a mock writer that doesn't write anywhere */
      @Override
      protected Writer createWriter(Path logfile) throws IOException {
        Writer mockWriter = Mockito.mock(Writer.class);
        Mockito.doAnswer(new Answer<Void>() {
          int expectedIndex = 0;

          @Override
          public Void answer(InvocationOnMock invocation) {
            if (writerSlowness > 0) {
              try {
                Thread.sleep(writerSlowness);
              } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
              }
            }
            Entry entry = (Entry) invocation.getArgument(0);
            WALEdit edit = entry.getEdit();
            List<Cell> cells = edit.getCells();
            assertEquals(1, cells.size());
            Cell cell = cells.get(0);

            // Check that the edits come in the right order.
            assertEquals(expectedIndex, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(),
                cell.getRowLength()));
            expectedIndex++;
            return null;
          }
        }).when(mockWriter).append(Mockito.<Entry>any());
        return mockWriter;
      }

      /* Produce a mock reader that generates fake entries */
      @Override
      protected Reader getReader(Path curLogFile, CancelableProgressable reporter)
          throws IOException {
        Reader mockReader = Mockito.mock(Reader.class);
        Mockito.doAnswer(new Answer<Entry>() {
          int index = 0;

          @Override
          public Entry answer(InvocationOnMock invocation) throws Throwable {
            if (index >= numFakeEdits) return null;

            // Generate r0 through r4 in round robin fashion
            int regionIdx = index % regions.size();
            byte region[] = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)};

            Entry ret = createTestEntry(TABLE_NAME, region,
                Bytes.toBytes(index / regions.size()),
                FAMILY, QUALIFIER, VALUE, index);
            index++;
            return ret;
          }
        }).when(mockReader).next();
        return mockReader;
      }
    };

    logSplitter.splitLogFile(fs.getFileStatus(logPath), null);

    // Verify number of written edits per region
    Map<String, Long> outputCounts = logSplitter.outputSink.getOutputCounts();
    for (Map.Entry<String, Long> entry : outputCounts.entrySet()) {
      LOG.info("Got " + entry.getValue() + " output edits for region " + entry.getKey());
      assertEquals((long) entry.getValue(), numFakeEdits / regions.size());
    }
    assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size());
  }
  // Does leaving the writer open in testSplitDeletedRegion matter enough for two tests?
  @Test
  public void testSplitLogFileDeletedRegionDir() throws IOException {
    LOG.info("testSplitLogFileDeletedRegionDir");
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, REGION);
    LOG.info("Region directory is " + regiondir);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }
  @Test
  public void testSplitLogFileEmpty() throws IOException {
    LOG.info("testSplitLogFileEmpty");
    // we won't create the hlog dir until getWAL got called, so
    // make dir here when testing empty log file
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", true);
    useDifferentDFSClient();

    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path tdir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    assertFalse(fs.exists(tdir));

    assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath()));
  }
  @Test
  public void testSplitLogFileMultipleRegions() throws IOException {
    LOG.info("testSplitLogFileMultipleRegions");
    generateWALs(1, 10, -1);
    splitAndCount(1, 10);
  }
  @Test
  public void testSplitLogFileFirstLineCorruptionLog()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];

    corruptWAL(logfile.getPath(),
        Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
    assertEquals(1, fs.listStatus(corruptDir).length);
  }
  /**
   * @see "https://issues.apache.org/jira/browse/HBASE-4862"
   */
  @Test
  public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
    LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
    // Generate wals for our destination region
    String regionName = "r0";
    final Path regiondir = new Path(TABLEDIR, regionName);
    REGIONS.clear();
    REGIONS.add(regionName);
    generateWALs(-1);

    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue("There should be some log file",
        logfiles != null && logfiles.length > 0);

    WALSplitter logSplitter =
        new WALSplitter(wals, conf, HBASEDIR, fs, HBASEDIR, fs, null, null, null) {
      @Override
      protected Writer createWriter(Path logfile)
          throws IOException {
        Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile);
        // After creating writer, simulate region's
        // replayRecoveredEditsIfAny() which gets SplitEditFiles of this
        // region and delete them, excluding files with '.temp' suffix.
        NavigableSet<Path> files = WALSplitUtil.getSplitEditFilesSorted(fs, regiondir);
        if (files != null && !files.isEmpty()) {
          for (Path file : files) {
            if (!this.walFS.delete(file, false)) {
              LOG.error("Failed delete of " + file);
            } else {
              LOG.debug("Deleted recovered.edits file=" + file);
            }
          }
        }
        return writer;
      }
    };
    try {
      logSplitter.splitLogFile(logfiles[0], null);
    } catch (IOException e) {
      LOG.info(e.toString(), e);
      fail("Throws IOException when splitting "
          + "log, it is most likely because writing file does not "
          + "exist which is caused by concurrent replayRecoveredEditsIfAny()");
    }
    if (fs.exists(CORRUPTDIR)) {
      if (fs.listStatus(CORRUPTDIR).length > 0) {
        fail("There are some corrupt logs, "
            + "it is most likely caused by concurrent replayRecoveredEditsIfAny()");
      }
    }
  }
generateWALs(int leaveOpen
) throws IOException
{
1192 return generateWALs(NUM_WRITERS
, ENTRIES
, leaveOpen
, 0);
1195 private Writer
generateWALs(int writers
, int entries
, int leaveOpen
) throws IOException
{
1196 return generateWALs(writers
, entries
, leaveOpen
, 7);
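  /** Creates a region directory under TABLEDIR for each of the given region names. */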
  private void makeRegionDirs(List<String> regions) throws IOException {
    for (String region : regions) {
      LOG.debug("Creating dir for region " + region);
      fs.mkdirs(new Path(TABLEDIR, region));
    }
  }
  /**
   * @param leaveOpen index to leave un-closed. -1 to close all.
   * @return the writer that's still open, or null if all were closed.
   */
  private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents)
      throws IOException {
    makeRegionDirs(REGIONS);
    fs.mkdirs(WALDIR);

    Writer[] ws = new Writer[writers];
    int seq = 0;
    int numRegionEventsAdded = 0;
    for (int i = 0; i < writers; i++) {
      ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
      for (int j = 0; j < entries; j++) {
        int prefix = 0;
        for (String region : REGIONS) {
          String row_key = region + prefix++ + i + j;
          appendEntry(ws[i], TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(row_key), FAMILY,
              QUALIFIER, VALUE, seq++);

          if (numRegionEventsAdded < regionEvents) {
            numRegionEventsAdded++;
            appendRegionEvent(ws[i], region);
          }
        }
      }
      if (i != leaveOpen) {
        ws[i].close();
        LOG.info("Closing writer " + i);
      }
    }
    if (leaveOpen < 0 || leaveOpen >= writers) {
      return null;
    }
    return ws[leaveOpen];
  }
  private Path[] getLogForRegion(TableName table, String region)
      throws IOException {
    Path tdir = FSUtils.getWALTableDir(conf, table);
    @SuppressWarnings("deprecation")
    Path editsdir = WALSplitUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
        Bytes.toString(Bytes.toBytes(region))));
    FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
      @Override
      public boolean accept(Path p) {
        if (WALSplitUtil.isSequenceIdFile(p)) {
          return false;
        }
        return true;
      }
    });
    Path[] paths = new Path[files.length];
    for (int i = 0; i < files.length; i++) {
      paths[i] = files[i].getPath();
    }
    return paths;
  }
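  /**
   * Re-writes the WAL file at {@code path} with the requested corruption: garbage appended or
   * inserted, or the file truncated just ahead of (or through) its trailer.
   */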
  private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException {
    FSDataOutputStream out;
    int fileSize = (int) fs.listStatus(path)[0].getLen();

    FSDataInputStream in = fs.open(path);
    byte[] corrupted_bytes = new byte[fileSize];
    in.readFully(0, corrupted_bytes, 0, fileSize);
    in.close();

    switch (corruption) {
      case APPEND_GARBAGE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes);
        out.write(Bytes.toBytes("-----"));
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_ON_FIRST_LINE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(0);
        out.write(corrupted_bytes);
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_IN_THE_MIDDLE:
        fs.delete(path, false);
        out = fs.create(path);
        int middle = (int) Math.floor(corrupted_bytes.length / 2);
        out.write(corrupted_bytes, 0, middle);
        out.write(0);
        out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
        closeOrFlush(close, out);
        break;

      case TRUNCATE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes, 0, fileSize
            - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
        closeOrFlush(close, out);
        break;

      case TRUNCATE_TRAILER:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT); // trailer is truncated.
        closeOrFlush(close, out);
        break;
    }
  }
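  /**
   * Closes the stream, or, when leaving it open, flushes it via whichever of hflush()/sync()
   * this Hadoop version exposes (looked up reflectively).
   */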
  private void closeOrFlush(boolean close, FSDataOutputStream out)
      throws IOException {
    if (close) {
      out.close();
    } else {
      Method syncMethod = null;
      try {
        syncMethod = out.getClass().getMethod("hflush", new Class<?>[]{});
      } catch (NoSuchMethodException e) {
        try {
          syncMethod = out.getClass().getMethod("sync", new Class<?>[]{});
        } catch (NoSuchMethodException ex) {
          throw new IOException("This version of Hadoop supports " +
              "neither Syncable.sync() nor Syncable.hflush().");
        }
      }
      try {
        syncMethod.invoke(out, new Object[]{});
      } catch (Exception e) {
        throw new IOException(e);
      }
      // Not in older Hadoop: out.hflush();
    }
  }
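  /** Returns the number of entries that can be read back from the WAL at {@code log}. */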
  private int countWAL(Path log) throws IOException {
    int count = 0;
    Reader in = wals.createReader(fs, log);
    while (in.next() != null) {
      count++;
    }
    in.close();
    return count;
  }
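  /** Appends and syncs a compaction-descriptor marker entry for the given region. */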
  private static void appendCompactionEvent(Writer w, RegionInfo hri, String[] inputs,
      String output) throws IOException {
    WALProtos.CompactionDescriptor.Builder desc = WALProtos.CompactionDescriptor.newBuilder();
    desc.setTableName(ByteString.copyFrom(hri.getTable().toBytes()))
        .setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes()))
        .setRegionName(ByteString.copyFrom(hri.getRegionName()))
        .setFamilyName(ByteString.copyFrom(FAMILY))
        .setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY))
        .addAllCompactionInput(Arrays.asList(inputs))
        .addCompactionOutput(output);

    WALEdit edit = WALEdit.createCompaction(hri, desc.build());
    WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1,
        EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
    w.append(new Entry(key, edit));
    w.sync(false);
  }
  private static void appendRegionEvent(Writer w, String region) throws IOException {
    WALProtos.RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
        WALProtos.RegionEventDescriptor.EventType.REGION_OPEN,
        TABLE_NAME.toBytes(),
        Bytes.toBytes(region),
        Bytes.toBytes(String.valueOf(region.hashCode())),
        1,
        ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>>of());
    final long time = EnvironmentEdgeManager.currentTime();
    final WALKeyImpl walKey = new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, time,
        HConstants.DEFAULT_CLUSTER_ID);
    WALEdit we = WALEdit.createRegionEventWALEdit(Bytes.toBytes(region), regionOpenDesc);
    w.append(new Entry(walKey, we));
    w.sync(false);
  }
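  /** Appends a single test edit to the writer, syncs it, and returns the sequence id used. */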
  public static long appendEntry(Writer writer, TableName table, byte[] region,
      byte[] row, byte[] family, byte[] qualifier,
      byte[] value, long seq)
      throws IOException {
    LOG.info(Thread.currentThread().getName() + " append");
    writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
    LOG.info(Thread.currentThread().getName() + " sync");
    writer.sync(false);
    return seq;
  }
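  /** Builds a one-cell Put entry for the given row, timestamped with System.nanoTime(). */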
  private static Entry createTestEntry(
      TableName table, byte[] region,
      byte[] row, byte[] family, byte[] qualifier,
      byte[] value, long seq) {
    long time = System.nanoTime();

    final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value);
    WALEdit edit = new WALEdit();
    edit.add(cell);
    return new Entry(new WALKeyImpl(region, table, seq, time,
        HConstants.DEFAULT_CLUSTER_ID), edit);
  }
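  /** Creates a WAL file with no entries; optionally leaves it open to mimic a live writer. */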
  private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
    Writer writer =
        WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf);
    if (closeFile) {
      writer.close();
    }
  }
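  /** Returns true if both WALs hold the same entries, compared key by key and edit by edit. */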
  private boolean logsAreEqual(Path p1, Path p2) throws IOException {
    Reader in1, in2;
    in1 = wals.createReader(fs, p1);
    in2 = wals.createReader(fs, p2);
    Entry entry1;
    Entry entry2;
    while ((entry1 = in1.next()) != null) {
      entry2 = in2.next();
      if ((entry1.getKey().compareTo(entry2.getKey()) != 0) ||
          (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))) {
        return false;
      }
    }
    in1.close();
    in2.close();
    return true;
  }
}