/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Method;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
import org.apache.hadoop.hbase.wal.WALSplitter.CorruptedLogFileException;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
/**
 * Testing {@link WAL} splitting code.
 */
@Category({RegionServerTests.class, LargeTests.class})
public class TestWALSplit {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestWALSplit.class);

  {
    // Uncomment the following lines if more verbosity is needed for
    // debugging (see HBASE-12285 for details).
    //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
    //((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
  }

  private final static Logger LOG = LoggerFactory.getLogger(TestWALSplit.class);

  private static Configuration conf;
  private FileSystem fs;

  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private Path HBASEDIR;
  private Path HBASELOGDIR;
  private Path WALDIR;
  private Path OLDLOGDIR;
  private Path CORRUPTDIR;
  private Path TABLEDIR;
  private String TMPDIRNAME;

  private static final int NUM_WRITERS = 10;
  private static final int ENTRIES = 10; // entries per writer per region

  private static final String FILENAME_BEING_SPLIT = "testfile";
  private static final TableName TABLE_NAME =
      TableName.valueOf("t1");
  private static final byte[] FAMILY = Bytes.toBytes("f1");
  private static final byte[] QUALIFIER = Bytes.toBytes("q1");
  private static final byte[] VALUE = Bytes.toBytes("v1");
  private static final String WAL_FILE_PREFIX = "wal.dat.";
  private static List<String> REGIONS = new ArrayList<>();
  private static final String HBASE_SKIP_ERRORS = "hbase.hlog.split.skip.errors";
  private static String ROBBER;
  private static String ZOMBIE;
  private static String [] GROUP = new String [] {"supergroup"};

  static enum Corruptions {
    INSERT_GARBAGE_ON_FIRST_LINE,
    INSERT_GARBAGE_IN_THE_MIDDLE,
    APPEND_GARBAGE,
    TRUNCATE,
    TRUNCATE_TRAILER
  }
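
  // In outline, most tests below share one flow: generateWALs() writes
  // NUM_WRITERS wal files under WALDIR (optionally leaving one open or
  // corrupting one via corruptWAL()), useDifferentDFSClient() makes `fs`
  // behave as a second HDFS client, WALSplitter.split(HBASELOGDIR, WALDIR,
  // OLDLOGDIR, fs, conf, wals) performs the split, and then
  // getLogForRegion()/countWAL() check the recovered.edits output. This is
  // a summary of the helpers defined further down, not extra test machinery.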
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf = TEST_UTIL.getConfiguration();
    conf.setClass("hbase.regionserver.hlog.writer.impl",
        InstrumentedLogWriter.class, Writer.class);
    // This is how you turn off shortcircuit read currently. TODO: Fix. Should read config.
    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
    // Create a fake user-to-group mapping and set it on the conf.
    Map<String, String[]> u2g_map = new HashMap<>(2);
    ROBBER = User.getCurrent().getName() + "-robber";
    ZOMBIE = User.getCurrent().getName() + "-zombie";
    u2g_map.put(ROBBER, GROUP);
    u2g_map.put(ZOMBIE, GROUP);
    DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
    conf.setInt("dfs.heartbeat.interval", 1);
    TEST_UTIL.startMiniDFSCluster(2);
  }
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniDFSCluster();
  }

  @Rule
  public TestName name = new TestName();
  private WALFactory wals = null;
  @Before
  public void setUp() throws Exception {
    LOG.info("Cleaning up cluster for new test.");
    fs = TEST_UTIL.getDFSCluster().getFileSystem();
    HBASEDIR = TEST_UTIL.createRootDir();
    HBASELOGDIR = TEST_UTIL.createWALRootDir();
    OLDLOGDIR = new Path(HBASELOGDIR, HConstants.HREGION_OLDLOGDIR_NAME);
    CORRUPTDIR = new Path(HBASELOGDIR, HConstants.CORRUPT_DIR_NAME);
    TABLEDIR = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    TMPDIRNAME = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
        HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
    REGIONS.clear();
    Collections.addAll(REGIONS, "bbb", "ccc");
    InstrumentedLogWriter.activateFailure = false;
    wals = new WALFactory(conf, name.getMethodName());
    WALDIR = new Path(HBASELOGDIR,
        AbstractFSWALProvider.getWALDirectoryName(ServerName.valueOf(name.getMethodName(),
            16010, System.currentTimeMillis()).toString()));
  }
  @After
  public void tearDown() throws Exception {
    try {
      wals.close();
    } catch (IOException exception) {
      // Some tests will move WALs out from under us. In those cases, we'll get an error on close.
      LOG.info("Ignoring an error while closing down our WALFactory. Fine for some tests, but if" +
          " you see a failure look here.");
      LOG.debug("exception details", exception);
    } finally {
      wals = null;
      fs.delete(HBASEDIR, true);
      fs.delete(HBASELOGDIR, true);
    }
  }
  /**
   * Simulates splitting a WAL out from under a regionserver that is still trying to write it.
   * Ensures we do not lose edits.
   * @throws IOException
   * @throws InterruptedException
   */
  @Test
  public void testLogCannotBeWrittenOnceParsed() throws IOException, InterruptedException {
    final AtomicLong counter = new AtomicLong(0);
    AtomicBoolean stop = new AtomicBoolean(false);
    // Region we'll write edits to and then later examine to make sure they all made it in.
    final String region = REGIONS.get(0);
    final int numWriters = 3;
    Thread zombie = new ZombieLastLogWriterRegionServer(counter, stop, region, numWriters);
    try {
      long startCount = counter.get();
      zombie.start();
      // Wait till writer starts going.
      while (startCount == counter.get()) Threads.sleep(1);
      // Give it a second to write a few appends.
      Threads.sleep(1000);
      final Configuration conf2 = HBaseConfiguration.create(conf);
      final User robber = User.createUserForTesting(conf2, ROBBER, GROUP);
      int count = robber.runAs(new PrivilegedExceptionAction<Integer>() {
        @Override
        public Integer run() throws Exception {
          StringBuilder ls = new StringBuilder("Contents of WALDIR (").append(WALDIR)
              .append("):\n");
          for (FileStatus status : fs.listStatus(WALDIR)) {
            ls.append("\t").append(status.toString()).append("\n");
          }
          LOG.debug(Objects.toString(ls));
          LOG.info("Splitting WALs out from under zombie. Expecting " + numWriters + " files.");
          WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf2, wals);
          LOG.info("Finished splitting out from under zombie.");
          Path[] logfiles = getLogForRegion(TABLE_NAME, region);
          assertEquals("wrong number of split files for region", numWriters, logfiles.length);
          int count = 0;
          for (Path logfile : logfiles) {
            count += countWAL(logfile);
          }
          return count;
        }
      });
      LOG.info("zombie=" + counter.get() + ", robber=" + count);
      assertTrue("The log file could have at most 1 extra log entry, but can't have less. " +
              "Zombie could write " + counter.get() + " and logfile had only " + count,
          counter.get() == count || counter.get() + 1 == count);
    } finally {
      stop.set(true);
      zombie.interrupt();
      Threads.threadDumpingIsAlive(zombie);
    }
  }
  /**
   * This thread will keep writing to a 'wal' file even after the split process has started.
   * It simulates a region server that was considered dead but woke up and wrote some more to the
   * last log entry. Does its writing as an alternate user in another filesystem instance to
   * better simulate it being a regionserver.
   */
  class ZombieLastLogWriterRegionServer extends Thread {
    final AtomicLong editsCount;
    final AtomicBoolean stop;
    final int numOfWriters;
    /**
     * Region to write edits for.
     */
    final String region;
    final User user;

    public ZombieLastLogWriterRegionServer(AtomicLong counter, AtomicBoolean stop,
        final String region, final int writers)
        throws IOException, InterruptedException {
      super("ZombieLastLogWriterRegionServer");
      setDaemon(true);
      this.stop = stop;
      this.editsCount = counter;
      this.region = region;
      this.user = User.createUserForTesting(conf, ZOMBIE, GROUP);
      numOfWriters = writers;
    }

    @Override
    public void run() {
      try {
        doWriting();
      } catch (IOException e) {
        LOG.warn(getName() + " Writer exiting " + e);
      } catch (InterruptedException e) {
        LOG.warn(getName() + " Writer exiting " + e);
      }
    }

    private void doWriting() throws IOException, InterruptedException {
      this.user.runAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          // Index of the WAL we want to keep open. generateWALs will leave open the WAL whose
          // index we supply here.
          int walToKeepOpen = numOfWriters - 1;
          // The below method writes numOfWriters files each with ENTRIES entries for a total of
          // numOfWriters * ENTRIES added per column family in the region.
          Writer writer = null;
          try {
            writer = generateWALs(numOfWriters, ENTRIES, walToKeepOpen);
          } catch (IOException e1) {
            throw new RuntimeException("Failed", e1);
          }
          // Update the counter so it has all edits written so far.
          editsCount.addAndGet(numOfWriters * ENTRIES);
          loop(writer);
          // If we've been interrupted, then things should have shifted out from under us.
          // closing should error
          try {
            writer.close();
            fail("Writing closing after parsing should give an error.");
          } catch (IOException exception) {
            LOG.debug("ignoring error when closing final writer.", exception);
          }
          return null;
        }
      });
    }

    private void loop(final Writer writer) {
      byte [] regionBytes = Bytes.toBytes(this.region);
      while (!stop.get()) {
        try {
          long seq = appendEntry(writer, TABLE_NAME, regionBytes,
              Bytes.toBytes("r" + editsCount.get()), regionBytes, QUALIFIER, VALUE, 0);
          long count = editsCount.incrementAndGet();
          LOG.info(getName() + " sync count=" + count + ", seq=" + seq);
          try {
            Thread.sleep(1);
          } catch (InterruptedException e) {
            // keep going
          }
        } catch (IOException ex) {
          LOG.error(getName() + " ex " + ex.toString());
          if (ex instanceof RemoteException) {
            LOG.error("Juliet: got RemoteException " + ex.getMessage() +
                " while writing " + (editsCount.get() + 1));
          } else {
            LOG.error(getName() + " failed to write....at " + editsCount.get());
            fail("Failed to write " + editsCount.get());
          }
          break;
        } catch (Throwable t) {
          LOG.error(getName() + " HOW? " + t);
          LOG.debug("exception details", t);
          break;
        }
      }
      LOG.info(getName() + " Writer exiting");
    }
  }
  /**
   * {@see https://issues.apache.org/jira/browse/HBASE-3020}
   */
  @Test
  public void testRecoveredEditsPathForMeta() throws IOException {
    byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
    Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
    Path regiondir = new Path(tdir,
        RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
    fs.mkdirs(regiondir);
    long now = System.currentTimeMillis();
    Entry entry =
        new Entry(new WALKeyImpl(encoded,
            TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
            new WALEdit());
    Path p = WALSplitter.getRegionSplitEditsPath(entry,
        FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
  }
  /**
   * Test old recovered edits file doesn't break WALSplitter.
   * This is useful in upgrading old instances.
   */
  @Test
  public void testOldRecoveredEditsFileSidelined() throws IOException {
    byte [] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
    Path tdir = FSUtils.getTableDir(HBASEDIR, TableName.META_TABLE_NAME);
    Path regiondir = new Path(tdir,
        RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
    fs.mkdirs(regiondir);
    long now = System.currentTimeMillis();
    Entry entry =
        new Entry(new WALKeyImpl(encoded,
            TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
            new WALEdit());
    Path parent = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
    assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
    fs.createNewFile(parent); // create a recovered.edits file

    Path p = WALSplitter.getRegionSplitEditsPath(entry,
        FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
    String parentOfParent = p.getParent().getParent().getName();
    assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
    WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
  }
  private void useDifferentDFSClient() throws IOException {
    // make fs act as a different client now
    // initialize will create a new DFSClient with a new client ID
    fs.initialize(fs.getUri(), conf);
  }
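
  // Note on the helper above: because WALSplitter then runs against a fresh
  // DFSClient, it has to recover the HDFS lease on files the original client
  // may still hold open, which mirrors a real split running in a different
  // process than the writer. (Our reading of the intent; the method itself
  // only swaps the client ID.)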
  @Test
  public void testSplitPreservesEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1, 0);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertTrue("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
  }
  @Test
  public void testSplitRemovesRegionEventsEdits() throws IOException {
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1, 100);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
    // split log should only have the test edits
    assertEquals(10, countWAL(splitLog[0]));
  }
  @Test
  public void testSplitLeavesCompactionEventsEdits() throws IOException {
    RegionInfo hri = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
    REGIONS.clear();
    REGIONS.add(hri.getEncodedName());
    Path regionDir = new Path(FSUtils.getTableDir(HBASEDIR, TABLE_NAME), hri.getEncodedName());
    LOG.info("Creating region directory: " + regionDir);
    assertTrue(fs.mkdirs(regionDir));

    Writer writer = generateWALs(1, 10, 0, 10);
    String[] compactInputs = new String[]{"file1", "file2", "file3"};
    String compactOutput = "file4";
    appendCompactionEvent(writer, hri, compactInputs, compactOutput);
    writer.close();

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
    // original log should have 10 test edits, 10 region markers, 1 compaction marker
    assertEquals(21, countWAL(originalLog));

    Path[] splitLog = getLogForRegion(TABLE_NAME, hri.getEncodedName());
    assertEquals(1, splitLog.length);

    assertFalse("edits differ after split", logsAreEqual(originalLog, splitLog[0]));
    // split log should have 10 test edits plus 1 compaction marker
    assertEquals(11, countWAL(splitLog[0]));
  }
  /**
   * @param expectedEntries -1 to not assert
   * @return the count across all regions
   */
  private int splitAndCount(final int expectedFiles, final int expectedEntries)
      throws IOException {
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    int result = 0;
    for (String region : REGIONS) {
      Path[] logfiles = getLogForRegion(TABLE_NAME, region);
      assertEquals(expectedFiles, logfiles.length);
      int count = 0;
      for (Path logfile : logfiles) {
        count += countWAL(logfile);
      }
      if (-1 != expectedEntries) {
        assertEquals(expectedEntries, count);
      }
      result += count;
    }
    return result;
  }
  @Test
  public void testEmptyLogFiles() throws IOException {
    testEmptyLogFiles(true);
  }

  @Test
  public void testEmptyOpenLogFiles() throws IOException {
    testEmptyLogFiles(false);
  }

  private void testEmptyLogFiles(final boolean close) throws IOException {
    // we won't create the hlog dir until getWAL got called, so
    // make dir here when testing empty log file
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", close);
    generateWALs(Integer.MAX_VALUE);
    injectEmptyFile("empty", close);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES); // skip 2 empty
  }

  @Test
  public void testOpenZeroLengthReportedFileButWithDataGetsSplit() throws IOException {
    // generate logs but leave wal.dat.5 open.
    generateWALs(5);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }
  @Test
  public void testTralingGarbageCorruptionFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.APPEND_GARBAGE, true);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testFirstLineCorruptionLogFileSkipErrorsPasses() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);
    splitAndCount(NUM_WRITERS - 1, (NUM_WRITERS - 1) * ENTRIES); //1 corrupt
  }

  @Test
  public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(Integer.MAX_VALUE);
    corruptWAL(new Path(WALDIR, WAL_FILE_PREFIX + "5"),
        Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false);
    // the entries in the original logs are alternating regions
    // considering the sequence file header, the middle corruption should
    // affect at least half of the entries
    int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
    int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1;
    int allRegionsCount = splitAndCount(NUM_WRITERS, -1);
    assertTrue("The file up to the corrupted area hasn't been parsed",
        REGIONS.size() * (goodEntries + firstHalfEntries) <= allRegionsCount);
  }
  @Test
  public void testCorruptedFileGetsArchivedIfSkipErrors() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    List<FaultyProtobufLogReader.FailureType> failureTypes = Arrays
        .asList(FaultyProtobufLogReader.FailureType.values()).stream()
        .filter(x -> x != FaultyProtobufLogReader.FailureType.NONE).collect(Collectors.toList());
    for (FaultyProtobufLogReader.FailureType failureType : failureTypes) {
      final Set<String> walDirContents = splitCorruptWALs(failureType);
      final Set<String> archivedLogs = new HashSet<>();
      final StringBuilder archived = new StringBuilder("Archived logs in CORRUPTDIR:");
      for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
        archived.append("\n\t").append(log.toString());
        archivedLogs.add(log.getPath().getName());
      }
      LOG.debug(archived.toString());
      assertEquals(failureType.name() + ": expected to find all of our wals corrupt.",
          archivedLogs, walDirContents);
    }
  }
  /**
   * @return set of wal names present prior to split attempt.
   * @throws IOException if the split process fails
   */
  private Set<String> splitCorruptWALs(final FaultyProtobufLogReader.FailureType failureType)
      throws IOException {
    Class<?> backupClass = conf.getClass("hbase.regionserver.hlog.reader.impl",
        Reader.class);
    InstrumentedLogWriter.activateFailure = false;

    try {
      conf.setClass("hbase.regionserver.hlog.reader.impl", FaultyProtobufLogReader.class,
          Reader.class);
      conf.set("faultyprotobuflogreader.failuretype", failureType.name());
      // Clean up from previous tests or previous loop
      try {
        wals.shutdown();
      } catch (IOException exception) {
        // since we're splitting out from under the factory, we should expect some closing failures.
        LOG.debug("Ignoring problem closing WALFactory.", exception);
      }
      wals.close();
      try {
        for (FileStatus log : fs.listStatus(CORRUPTDIR)) {
          fs.delete(log.getPath(), true);
        }
      } catch (FileNotFoundException exception) {
        LOG.debug("no previous CORRUPTDIR to clean.");
      }
      // change to the faulty reader
      wals = new WALFactory(conf, name.getMethodName());
      generateWALs(-1);
      // Our reader will render all of these files corrupt.
      final Set<String> walDirContents = new HashSet<>();
      for (FileStatus status : fs.listStatus(WALDIR)) {
        walDirContents.add(status.getPath().getName());
      }
      useDifferentDFSClient();
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
      return walDirContents;
    } finally {
      conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
          Reader.class);
    }
  }
  @Test (expected = IOException.class)
  public void testTrailingGarbageCorruptionLogFileSkipErrorsFalseThrows()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
  }

  @Test
  public void testCorruptedLogFilesSkipErrorsFalseDoesNotTouchLogs()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    try {
      splitCorruptWALs(FaultyProtobufLogReader.FailureType.BEGINNING);
    } catch (IOException e) {
      LOG.debug("split with 'skip errors' set to 'false' correctly threw");
    }
    assertEquals("if skip.errors is false all files should remain in place",
        NUM_WRITERS, fs.listStatus(WALDIR).length);
  }
  private void ignoreCorruption(final Corruptions corruption, final int entryCount,
      final int expectedCount) throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);

    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    Path c1 = new Path(WALDIR, WAL_FILE_PREFIX + "0");
    generateWALs(1, entryCount, -1, 0);
    corruptWAL(c1, corruption, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    Path[] splitLog = getLogForRegion(TABLE_NAME, REGION);
    assertEquals(1, splitLog.length);

    int actualCount = 0;
    Reader in = wals.createReader(fs, splitLog[0]);
    @SuppressWarnings("unused")
    Entry entry;
    while ((entry = in.next()) != null) ++actualCount;
    assertEquals(expectedCount, actualCount);
    in.close();

    // should not have stored the EOF files as corrupt
    FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
    assertEquals(0, archivedLogs.length);
  }
  @Test
  public void testEOFisIgnored() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE, entryCount, entryCount - 1);
  }

  @Test
  public void testCorruptWALTrailer() throws IOException {
    int entryCount = 10;
    ignoreCorruption(Corruptions.TRUNCATE_TRAILER, entryCount, entryCount);
  }

  @Test
  public void testLogsGetArchivedAfterSplit() throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
    assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
  }

  @Test
  public void testSplit() throws IOException {
    generateWALs(-1);
    splitAndCount(NUM_WRITERS, NUM_WRITERS * ENTRIES);
  }

  @Test
  public void testLogDirectoryShouldBeDeletedAfterSuccessfulSplit()
      throws IOException {
    generateWALs(-1);
    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    FileStatus [] statuses = null;
    try {
      statuses = fs.listStatus(WALDIR);
      if (statuses != null) {
        fail("Files left in log dir: " +
            Joiner.on(",").join(FileUtil.stat2Paths(statuses)));
      }
    } catch (FileNotFoundException e) {
      // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
    }
  }
  @Test(expected = IOException.class)
  public void testSplitWillFailIfWritingToRegionFails() throws Exception {
    //leave 5th log open so we could append the "trap"
    Writer writer = generateWALs(4);
    useDifferentDFSClient();

    String region = "break";
    Path regiondir = new Path(TABLEDIR, region);
    fs.mkdirs(regiondir);

    InstrumentedLogWriter.activateFailure = false;
    appendEntry(writer, TABLE_NAME, Bytes.toBytes(region),
        Bytes.toBytes("r" + 999), FAMILY, QUALIFIER, VALUE, 0);
    writer.close();

    try {
      InstrumentedLogWriter.activateFailure = true;
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    } catch (IOException e) {
      assertTrue(e.getMessage().
          contains("This exception is instrumented and should only be thrown for testing"));
      throw e;
    } finally {
      InstrumentedLogWriter.activateFailure = false;
    }
  }
  @Test
  public void testSplitDeletedRegion() throws IOException {
    REGIONS.clear();
    String region = "region_that_splits";
    REGIONS.add(region);

    generateWALs(1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, region);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }
  @Test
  public void testIOEOnOutputThread() throws Exception {
    conf.setBoolean(HBASE_SKIP_ERRORS, false);

    generateWALs(-1);
    useDifferentDFSClient();
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue("There should be some log file",
        logfiles != null && logfiles.length > 0);
    // wals with no entries (like the one we don't use in the factory)
    // won't cause a failure since nothing will ever be written.
    // pick the largest one since it's most likely to have entries.
    int largestLogFile = 0;
    long largestSize = 0;
    for (int i = 0; i < logfiles.length; i++) {
      if (logfiles[i].getLen() > largestSize) {
        largestLogFile = i;
        largestSize = logfiles[i].getLen();
      }
    }
    assertTrue("There should be some log greater than size 0.", 0 < largestSize);
    // Set up a splitter that will throw an IOE on the output side
    WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, fs, null, null) {
      @Override
      protected Writer createWriter(Path logfile) throws IOException {
        Writer mockWriter = Mockito.mock(Writer.class);
        Mockito.doThrow(new IOException("Injected")).when(
            mockWriter).append(Mockito.<Entry>any());
        return mockWriter;
      }
    };
    // Set up a background thread dumper. Needs a thread to depend on and then we need to run
    // the thread dumping in a background thread so it does not hold up the test.
    final AtomicBoolean stop = new AtomicBoolean(false);
    final Thread someOldThread = new Thread("Some-old-thread") {
      @Override
      public void run() {
        while(!stop.get()) Threads.sleep(10);
      }
    };
    someOldThread.setDaemon(true);
    someOldThread.start();
    final Thread t = new Thread("Background-thread-dumper") {
      @Override
      public void run() {
        try {
          Threads.threadDumpingIsAlive(someOldThread);
        } catch (InterruptedException e) {
          // ignore
        }
      }
    };
    t.setDaemon(true);
    t.start();
    try {
      logSplitter.splitLogFile(logfiles[largestLogFile], null);
      fail("Didn't throw!");
    } catch (IOException ioe) {
      assertTrue(ioe.toString().contains("Injected"));
    } finally {
      // Setting this to true will turn off the background thread dumper.
      stop.set(true);
    }
  }
  /**
   * @param spiedFs should be instrumented for failure.
   */
  private void retryOverHdfsProblem(final FileSystem spiedFs) throws Exception {
    generateWALs(-1);
    useDifferentDFSClient();

    try {
      WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, spiedFs, conf, wals);
      assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
      assertFalse(fs.exists(WALDIR));
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    }
  }
  // Test for HBASE-3412
  @Test
  public void testMovedWALDuringRecovery() throws Exception {
    // This partial mock will throw LEE for every file simulating
    // files that were moved
    FileSystem spiedFs = Mockito.spy(fs);
    // The "File does not exist" part is very important,
    // that's how it comes out of HDFS
    Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).
        when(spiedFs).append(Mockito.<Path>any());
    retryOverHdfsProblem(spiedFs);
  }
  @Test
  public void testRetryOpenDuringRecovery() throws Exception {
    FileSystem spiedFs = Mockito.spy(fs);
    // The "Cannot obtain block length", "Could not obtain the last block",
    // and "Blocklist for [^ ]* has changed.*" part is very important,
    // that's how it comes out of HDFS. If HDFS changes the exception
    // message, this test needs to be adjusted accordingly.
    //
    // When DFSClient tries to open a file, HDFS needs to locate
    // the last block of the file and get its length. However, if the
    // last block is under recovery, HDFS may have problems obtaining
    // the block length, in which case retrying may help.
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      private final String[] errors = new String[] {
          "Cannot obtain block length", "Could not obtain the last block",
          "Blocklist for " + OLDLOGDIR + " has changed"};
      private int count = 0;

      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        if (count < 3) {
          throw new IOException(errors[count++]);
        }
        return (FSDataInputStream)invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
    retryOverHdfsProblem(spiedFs);
  }
  @Test
  public void testTerminationAskedByReporter() throws IOException, CorruptedLogFileException {
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];
    useDifferentDFSClient();

    final AtomicInteger count = new AtomicInteger();

    CancelableProgressable localReporter
        = new CancelableProgressable() {
      @Override
      public boolean progress() {
        count.getAndIncrement();
        return false;
      }
    };

    FileSystem spiedFs = Mockito.spy(fs);
    Mockito.doAnswer(new Answer<FSDataInputStream>() {
      @Override
      public FSDataInputStream answer(InvocationOnMock invocation) throws Throwable {
        Thread.sleep(1500); // Sleep a while and wait report status invoked
        return (FSDataInputStream)invocation.callRealMethod();
      }
    }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());

    try {
      conf.setInt("hbase.splitlog.report.period", 1000);
      boolean ret = WALSplitter.splitLogFile(
          HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, wals);
      assertFalse("Log splitting should fail", ret);
      assertTrue(count.get() > 0);
    } catch (IOException e) {
      fail("There shouldn't be any exception but: " + e.toString());
    } finally {
      // reset it back to its default value
      conf.setInt("hbase.splitlog.report.period", 59000);
    }
  }
  /**
   * Test log split process with fake data and lots of edits to trigger threading
   * issues.
   */
  @Test
  public void testThreading() throws Exception {
    doTestThreading(20000, 128*1024*1024, 0);
  }

  /**
   * Test blocking behavior of the log split process if writers are writing slower
   * than the reader is reading.
   */
  @Test
  public void testThreadingSlowWriterSmallBuffer() throws Exception {
    doTestThreading(200, 1024, 50);
  }
  /**
   * Sets up a log splitter with a mock reader and writer. The mock reader generates
   * a specified number of edits spread across 5 regions. The mock writer optionally
   * sleeps for each edit it is fed.
   *
   * After the split is complete, verifies that the statistics show the correct number
   * of edits output into each region.
   *
   * @param numFakeEdits number of fake edits to push through pipeline
   * @param bufferSize size of in-memory buffer
   * @param writerSlowness writer threads will sleep this many ms per edit
   */
  private void doTestThreading(final int numFakeEdits,
      final int bufferSize,
      final int writerSlowness) throws Exception {

    Configuration localConf = new Configuration(conf);
    localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);

    // Create a fake log file (we'll override the reader to produce a stream of edits)
    Path logPath = new Path(WALDIR, WAL_FILE_PREFIX + ".fake");
    FSDataOutputStream out = fs.create(logPath);
    out.close();

    // Make region dirs for our destination regions so the output doesn't get skipped
    final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4");
    makeRegionDirs(regions);

    // Create a splitter that reads and writes the data without touching disk
    WALSplitter logSplitter = new WALSplitter(wals, localConf, HBASEDIR, fs, null, null) {

      /* Produce a mock writer that doesn't write anywhere */
      @Override
      protected Writer createWriter(Path logfile) throws IOException {
        Writer mockWriter = Mockito.mock(Writer.class);
        Mockito.doAnswer(new Answer<Void>() {
          int expectedIndex = 0;

          @Override
          public Void answer(InvocationOnMock invocation) {
            if (writerSlowness > 0) {
              try {
                Thread.sleep(writerSlowness);
              } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
              }
            }
            Entry entry = (Entry) invocation.getArgument(0);
            WALEdit edit = entry.getEdit();
            List<Cell> cells = edit.getCells();
            assertEquals(1, cells.size());
            Cell cell = cells.get(0);

            // Check that the edits come in the right order.
            assertEquals(expectedIndex, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(),
                cell.getRowLength()));
            expectedIndex++;
            return null;
          }
        }).when(mockWriter).append(Mockito.<Entry>any());
        return mockWriter;
      }

      /* Produce a mock reader that generates fake entries */
      @Override
      protected Reader getReader(Path curLogFile, CancelableProgressable reporter)
          throws IOException {
        Reader mockReader = Mockito.mock(Reader.class);
        Mockito.doAnswer(new Answer<Entry>() {
          int index = 0;

          @Override
          public Entry answer(InvocationOnMock invocation) throws Throwable {
            if (index >= numFakeEdits) return null;

            // Generate r0 through r4 in round robin fashion
            int regionIdx = index % regions.size();
            byte region[] = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)};

            Entry ret = createTestEntry(TABLE_NAME, region,
                Bytes.toBytes(index / regions.size()),
                FAMILY, QUALIFIER, VALUE, index);
            index++;
            return ret;
          }
        }).when(mockReader).next();
        return mockReader;
      }
    };

    logSplitter.splitLogFile(fs.getFileStatus(logPath), null);

    // Verify number of written edits per region
    Map<byte[], Long> outputCounts = logSplitter.outputSink.getOutputCounts();
    for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
      LOG.info("Got " + entry.getValue() + " output edits for region " +
          Bytes.toString(entry.getKey()));
      assertEquals((long)entry.getValue(), numFakeEdits / regions.size());
    }
    assertEquals("Should have as many outputs as regions", regions.size(), outputCounts.size());
  }
  // Does leaving the writer open in testSplitDeletedRegion matter enough for two tests?
  @Test
  public void testSplitLogFileDeletedRegionDir() throws IOException {
    LOG.info("testSplitLogFileDeletedRegionDir");
    final String REGION = "region__1";
    REGIONS.clear();
    REGIONS.add(REGION);

    generateWALs(1, 10, -1);
    useDifferentDFSClient();

    Path regiondir = new Path(TABLEDIR, REGION);
    LOG.info("Region directory is " + regiondir);
    fs.delete(regiondir, true);
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    assertFalse(fs.exists(regiondir));
  }
  @Test
  public void testSplitLogFileEmpty() throws IOException {
    LOG.info("testSplitLogFileEmpty");
    // we won't create the hlog dir until getWAL got called, so
    // make dir here when testing empty log file
    fs.mkdirs(WALDIR);
    injectEmptyFile(".empty", true);
    useDifferentDFSClient();

    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
    Path tdir = FSUtils.getTableDir(HBASEDIR, TABLE_NAME);
    assertFalse(fs.exists(tdir));

    assertEquals(0, countWAL(fs.listStatus(OLDLOGDIR)[0].getPath()));
  }
  @Test
  public void testSplitLogFileMultipleRegions() throws IOException {
    LOG.info("testSplitLogFileMultipleRegions");
    generateWALs(1, 10, -1);
    splitAndCount(1, 10);
  }

  @Test
  public void testSplitLogFileFirstLineCorruptionLog()
      throws IOException {
    conf.setBoolean(HBASE_SKIP_ERRORS, true);
    generateWALs(1, 10, -1);
    FileStatus logfile = fs.listStatus(WALDIR)[0];

    corruptWAL(logfile.getPath(),
        Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true);

    useDifferentDFSClient();
    WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);

    final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
    assertEquals(1, fs.listStatus(corruptDir).length);
  }
  /**
   * {@see https://issues.apache.org/jira/browse/HBASE-4862}
   */
  @Test
  public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
    LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
    // Generate wals for our destination region
    String regionName = "r0";
    final Path regiondir = new Path(TABLEDIR, regionName);
    REGIONS.clear();
    REGIONS.add(regionName);
    generateWALs(-1);

    wals.getWAL(null);
    FileStatus[] logfiles = fs.listStatus(WALDIR);
    assertTrue("There should be some log file",
        logfiles != null && logfiles.length > 0);

    WALSplitter logSplitter = new WALSplitter(wals, conf, HBASEDIR, fs, null, null) {
      @Override
      protected Writer createWriter(Path logfile)
          throws IOException {
        Writer writer = wals.createRecoveredEditsWriter(this.walFS, logfile);
        // After creating writer, simulate region's
        // replayRecoveredEditsIfAny() which gets SplitEditFiles of this
        // region and deletes them, excluding files with '.temp' suffix.
        NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
        if (files != null && !files.isEmpty()) {
          for (Path file : files) {
            if (!this.walFS.delete(file, false)) {
              LOG.error("Failed delete of " + file);
            } else {
              LOG.debug("Deleted recovered.edits file=" + file);
            }
          }
        }
        return writer;
      }
    };
    try {
      logSplitter.splitLogFile(logfiles[0], null);
    } catch (IOException e) {
      LOG.info(e.toString(), e);
      fail("Throws IOException when splitting "
          + "log, it is most likely because writing file does not "
          + "exist which is caused by concurrent replayRecoveredEditsIfAny()");
    }
    if (fs.exists(CORRUPTDIR)) {
      if (fs.listStatus(CORRUPTDIR).length > 0) {
        fail("There are some corrupt logs, "
            + "it is most likely caused by concurrent replayRecoveredEditsIfAny()");
      }
    }
  }
  private Writer generateWALs(int leaveOpen) throws IOException {
    return generateWALs(NUM_WRITERS, ENTRIES, leaveOpen, 0);
  }

  private Writer generateWALs(int writers, int entries, int leaveOpen) throws IOException {
    return generateWALs(writers, entries, leaveOpen, 7);
  }
  private void makeRegionDirs(List<String> regions) throws IOException {
    for (String region : regions) {
      LOG.debug("Creating dir for region " + region);
      fs.mkdirs(new Path(TABLEDIR, region));
    }
  }
  /**
   * @param leaveOpen index to leave un-closed. -1 to close all.
   * @return the writer that's still open, or null if all were closed.
   */
  private Writer generateWALs(int writers, int entries, int leaveOpen, int regionEvents) throws IOException {
    makeRegionDirs(REGIONS);
    fs.mkdirs(WALDIR);

    Writer [] ws = new Writer[writers];
    int seq = 0;
    int numRegionEventsAdded = 0;
    for (int i = 0; i < writers; i++) {
      ws[i] = wals.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + i));
      for (int j = 0; j < entries; j++) {
        int prefix = 0;
        for (String region : REGIONS) {
          String row_key = region + prefix++ + i + j;
          appendEntry(ws[i], TABLE_NAME, Bytes.toBytes(region), Bytes.toBytes(row_key), FAMILY,
              QUALIFIER, VALUE, seq++);

          if (numRegionEventsAdded < regionEvents) {
            numRegionEventsAdded++;
            appendRegionEvent(ws[i], region);
          }
        }
      }
      if (i != leaveOpen) {
        ws[i].close();
        LOG.info("Closing writer " + i);
      }
    }
    if (leaveOpen < 0 || leaveOpen >= writers) {
      return null;
    }
    return ws[leaveOpen];
  }
  private Path[] getLogForRegion(TableName table, String region)
      throws IOException {
    Path tdir = FSUtils.getWALTableDir(conf, table);
    @SuppressWarnings("deprecation")
    Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
        Bytes.toString(Bytes.toBytes(region))));
    FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
      @Override
      public boolean accept(Path p) {
        if (WALSplitter.isSequenceIdFile(p)) {
          return false;
        }
        return true;
      }
    });
    Path[] paths = new Path[files.length];
    for (int i = 0; i < files.length; i++) {
      paths[i] = files[i].getPath();
    }
    return paths;
  }
  private void corruptWAL(Path path, Corruptions corruption, boolean close) throws IOException {
    FSDataOutputStream out;
    int fileSize = (int) fs.listStatus(path)[0].getLen();

    FSDataInputStream in = fs.open(path);
    byte[] corrupted_bytes = new byte[fileSize];
    in.readFully(0, corrupted_bytes, 0, fileSize);
    in.close();

    switch (corruption) {
      case APPEND_GARBAGE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes);
        out.write(Bytes.toBytes("-----"));
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_ON_FIRST_LINE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(0);
        out.write(corrupted_bytes);
        closeOrFlush(close, out);
        break;

      case INSERT_GARBAGE_IN_THE_MIDDLE:
        fs.delete(path, false);
        out = fs.create(path);
        int middle = (int) Math.floor(corrupted_bytes.length / 2);
        out.write(corrupted_bytes, 0, middle);
        out.write(0);
        out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
        closeOrFlush(close, out);
        break;

      case TRUNCATE:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes, 0, fileSize
            - (32 + ProtobufLogReader.PB_WAL_COMPLETE_MAGIC.length + Bytes.SIZEOF_INT));
        closeOrFlush(close, out);
        break;

      case TRUNCATE_TRAILER:
        fs.delete(path, false);
        out = fs.create(path);
        out.write(corrupted_bytes, 0, fileSize - Bytes.SIZEOF_INT); // trailer is truncated.
        closeOrFlush(close, out);
        break;
    }
  }
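
  // Layout note (our reading of the protobuf WAL format, not spelled out in
  // this file): a wal file is magic plus header up front, a run of entries,
  // then a trailer followed by its 4-byte length and PB_WAL_COMPLETE_MAGIC at
  // the very end. So TRUNCATE above clips the trailer machinery plus roughly
  // 32 bytes reaching into the last entry (hence testEOFisIgnored expects
  // entryCount - 1), while TRUNCATE_TRAILER drops only the final 4 bytes,
  // leaving every entry readable (hence testCorruptWALTrailer expects no
  // lost entries).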
  private void closeOrFlush(boolean close, FSDataOutputStream out)
      throws IOException {
    if (close) {
      out.close();
    } else {
      Method syncMethod = null;
      try {
        syncMethod = out.getClass().getMethod("hflush", new Class<?> []{});
      } catch (NoSuchMethodException e) {
        try {
          syncMethod = out.getClass().getMethod("sync", new Class<?> []{});
        } catch (NoSuchMethodException ex) {
          throw new IOException("This version of Hadoop supports " +
              "neither Syncable.sync() nor Syncable.hflush().");
        }
      }
      try {
        syncMethod.invoke(out, new Object[]{});
      } catch (Exception e) {
        throw new IOException(e);
      }
      // Not in 0.x: out.hflush();
    }
  }
  private int countWAL(Path log) throws IOException {
    int count = 0;
    Reader in = wals.createReader(fs, log);
    while (in.next() != null) {
      count++;
    }
    in.close();
    return count;
  }
  private static void appendCompactionEvent(Writer w, RegionInfo hri, String[] inputs,
      String output) throws IOException {
    WALProtos.CompactionDescriptor.Builder desc = WALProtos.CompactionDescriptor.newBuilder();
    desc.setTableName(ByteString.copyFrom(hri.getTable().toBytes()))
        .setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes()))
        .setRegionName(ByteString.copyFrom(hri.getRegionName()))
        .setFamilyName(ByteString.copyFrom(FAMILY))
        .setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY))
        .addAllCompactionInput(Arrays.asList(inputs))
        .addCompactionOutput(output);

    WALEdit edit = WALEdit.createCompaction(hri, desc.build());
    WALKeyImpl key = new WALKeyImpl(hri.getEncodedNameAsBytes(), TABLE_NAME, 1,
        EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
    w.append(new Entry(key, edit));
    w.sync();
  }
  private static void appendRegionEvent(Writer w, String region) throws IOException {
    WALProtos.RegionEventDescriptor regionOpenDesc = ProtobufUtil.toRegionEventDescriptor(
        WALProtos.RegionEventDescriptor.EventType.REGION_OPEN,
        TABLE_NAME.toBytes(),
        Bytes.toBytes(region),
        Bytes.toBytes(String.valueOf(region.hashCode())),
        1,
        ServerName.parseServerName("ServerName:9099"), ImmutableMap.<byte[], List<Path>>of());
    final long time = EnvironmentEdgeManager.currentTime();
    KeyValue kv = new KeyValue(Bytes.toBytes(region), WALEdit.METAFAMILY, WALEdit.REGION_EVENT,
        time, regionOpenDesc.toByteArray());
    final WALKeyImpl walKey = new WALKeyImpl(Bytes.toBytes(region), TABLE_NAME, 1, time,
        HConstants.DEFAULT_CLUSTER_ID);
    w.append(
        new Entry(walKey, new WALEdit().add(kv)));
    w.sync();
  }
  public static long appendEntry(Writer writer, TableName table, byte[] region,
      byte[] row, byte[] family, byte[] qualifier,
      byte[] value, long seq)
      throws IOException {
    LOG.info(Thread.currentThread().getName() + " append");
    writer.append(createTestEntry(table, region, row, family, qualifier, value, seq));
    LOG.info(Thread.currentThread().getName() + " sync");
    writer.sync();
    return seq;
  }
  private static Entry createTestEntry(
      TableName table, byte[] region,
      byte[] row, byte[] family, byte[] qualifier,
      byte[] value, long seq) {
    long time = System.nanoTime();

    final KeyValue cell = new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value);
    WALEdit edit = new WALEdit();
    edit.add(cell);
    return new Entry(new WALKeyImpl(region, table, seq, time,
        HConstants.DEFAULT_CLUSTER_ID), edit);
  }
  private void injectEmptyFile(String suffix, boolean closeFile) throws IOException {
    Writer writer =
        WALFactory.createWALWriter(fs, new Path(WALDIR, WAL_FILE_PREFIX + suffix), conf);
    if (closeFile) {
      writer.close();
    }
  }
  private boolean logsAreEqual(Path p1, Path p2) throws IOException {
    Reader in1, in2;
    in1 = wals.createReader(fs, p1);
    in2 = wals.createReader(fs, p2);
    Entry entry1;
    Entry entry2;
    while ((entry1 = in1.next()) != null) {
      entry2 = in2.next();
      if ((entry1.getKey().compareTo(entry2.getKey()) != 0) ||
          (!entry1.getEdit().toString().equals(entry2.getEdit().toString()))) {
        return false;
      }
    }
    in1.close();
    in2.close();
    return true;
  }
}