/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay.addRegionEdits;
import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ RegionServerTests.class, MediumTests.class })
public class TestWALSplitToHFile {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestWALSplitToHFile.class);

  private static final Logger LOG = LoggerFactory.getLogger(AbstractTestWALReplay.class);
  static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
  private Path rootDir = null;
  private String logName;
  private Path oldLogDir;
  private Path logDir;
  private FileSystem fs;
  private Configuration conf;
  private WALFactory wals;

  private static final byte[] ROW = Bytes.toBytes("row");
  private static final byte[] VALUE1 = Bytes.toBytes("value1");
  private static final byte[] VALUE2 = Bytes.toBytes("value2");
  private static final int countPerFamily = 10;

  @Rule
  public final TestName TEST_NAME = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    conf.setBoolean(WAL_SPLIT_TO_HFILE, true);
    UTIL.startMiniCluster(3);
    Path hbaseRootDir = UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
    LOG.info("hbase.rootdir=" + hbaseRootDir);
    FSUtils.setRootDir(conf, hbaseRootDir);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    this.conf = HBaseConfiguration.create(UTIL.getConfiguration());
    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, false);
    this.fs = UTIL.getDFSCluster().getFileSystem();
    this.rootDir = FSUtils.getRootDir(this.conf);
    this.oldLogDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    String serverName =
        ServerName.valueOf(TEST_NAME.getMethodName() + "-manual", 16010, System.currentTimeMillis())
            .toString();
    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
    this.logDir = new Path(this.rootDir, logName);
    if (UTIL.getDFSCluster().getFileSystem().exists(this.rootDir)) {
      UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
    }
    this.wals = new WALFactory(conf, TEST_NAME.getMethodName());
  }

  @After
  public void tearDown() throws Exception {
    this.wals.close();
    UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
  }

  /*
   * @param p Directory to cleanup
   */
  private void deleteDir(final Path p) throws IOException {
    if (this.fs.exists(p)) {
      if (!this.fs.delete(p, true)) {
        throw new IOException("Failed remove of " + p);
      }
    }
  }

  private TableDescriptor createBasic3FamilyTD(final TableName tableName) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("a")).build());
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("b")).build());
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("c")).build());
    TableDescriptor td = builder.build();
    UTIL.getAdmin().createTable(td);
    return td;
  }
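
  // Build an FSHLog-backed WAL under the given root dir; the tests append edits to it
  // and later hand its log dir to WALSplitter.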
  private WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
    FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName, c);
    wal.init();
    return wal;
  }

  private Pair<TableDescriptor, RegionInfo> setupTableAndRegion() throws IOException {
    final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
    final TableDescriptor td = createBasic3FamilyTD(tableName);
    final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
    final Path tableDir = FSUtils.getTableDir(this.rootDir, tableName);
    FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
    HRegion region = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
    HBaseTestingUtility.closeRegionAndWAL(region);
    return new Pair<>(td, ri);
  }
  @Test
  public void testCorruptRecoveredHFile() throws Exception {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    final long timestamp = this.ee.currentTime();
    // Write data and flush
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      region.put(new Put(ROW).addColumn(cfd.getName(), Bytes.toBytes("x"), timestamp, VALUE1));
    }
    region.flush(true);

    // Now assert edits made it in.
    Result result1 = region.get(new Get(ROW));
    assertEquals(td.getColumnFamilies().length, result1.size());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      assertTrue(Bytes.equals(VALUE1, result1.getValue(cfd.getName(), Bytes.toBytes("x"))));
    }

    // Now close the region and split the log
    region.close(true);
    wal.shutdown();
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    // Write a corrupt recovered hfile
    Path regionDir =
        new Path(CommonFSUtils.getTableDir(rootDir, td.getTableName()), ri.getEncodedName());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      FileStatus[] files =
          WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
      assertNotNull(files);
      assertTrue(files.length > 0);
      writeCorruptRecoveredHFile(files[0].getPath());
    }

    // Reopening the region should fail
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    try {
      HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
      fail("Should fail to open region");
    } catch (CorruptHFileException che) {
      // Expected
    }

    // Set skip errors to true and reopen the region
    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, true);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
    Result result2 = region2.get(new Get(ROW));
    assertEquals(td.getColumnFamilies().length, result2.size());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), Bytes.toBytes("x"))));
      // Assert the corrupt file was skipped and still exists
      FileStatus[] files =
          WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
      assertNotNull(files);
      assertEquals(1, files.length);
      assertTrue(files[0].getPath().getName().contains("corrupt"));
    }
  }

  /**
   * Test writing edits into an HRegion, closing it, splitting logs, opening
   * Region again. Verify seqids.
   */
  @Test
  public void testWrittenViaHRegion()
      throws IOException, SecurityException, IllegalArgumentException, InterruptedException {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    // Write countPerFamily edits into the three families. Do a flush on one
    // of the families during the load of edits so its seqid is not the same as
    // the others', to test we do the right thing when seqids differ.
    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    long seqid = region.getOpenSeqNum();
    boolean first = true;
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
      if (first) {
        // If first, so we have at least one family w/ different seqid to rest.
        region.flush(true);
        first = false;
      }
    }

    // Now assert edits made it in.
    final Get g = new Get(ROW);
    Result result = region.get(g);
    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());

    // Now close the region (without flush), split the log, reopen the region and assert that
    // replay of log has the correct effect, that our seqids are calculated correctly so
    // all edits in logs are seen as 'stale'/old.
    region.close(true);
    wal.shutdown();
    try {
      WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
    } catch (Exception e) {
      LOG.debug("Got exception", e);
    }

    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
    long seqid2 = region2.getOpenSeqNum();
    assertTrue(seqid + result.size() < seqid2);
    final Result result1b = region2.get(g);
    assertEquals(result.size(), result1b.size());

    // Next test. Add more edits, then 'crash' this region by stealing its wal
    // out from under it and assert that replay of the log adds the edits back
    // correctly when region is opened again.
    for (ColumnFamilyDescriptor hcd : td.getColumnFamilies()) {
      addRegionEdits(ROW, hcd.getName(), countPerFamily, this.ee, region2, "y");
    }
    // Get count of edits.
    final Result result2 = region2.get(g);
    assertEquals(2 * result.size(), result2.size());

    final Configuration newConf = HBaseConfiguration.create(this.conf);
    User user = HBaseTestingUtility.getDifferentUser(newConf, td.getTableName().getNameAsString());
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(conf), conf, wals);
        FileSystem newFS = FileSystem.get(newConf);
        // Make a new wal for new region open.
        WAL wal3 = createWAL(newConf, rootDir, logName);
        Path tableDir = FSUtils.getTableDir(rootDir, td.getTableName());
        HRegion region3 = new HRegion(tableDir, wal3, newFS, newConf, ri, td, null);
        long seqid3 = region3.initialize();
        Result result3 = region3.get(g);
        // Assert that count of cells is same as before crash.
        assertEquals(result2.size(), result3.size());

        // I can't close wal1. Its been appropriated when we split.
        region3.close();
        wal3.close();
        return null;
      }
    });
  }

  /**
   * Test that we recover correctly when there is a failure in between the
   * flushes, i.e. some stores got flushed but others did not.
   * Unfortunately, there is no easy hook to flush at a store level. The way
   * we get around this is by flushing at the region level, and then deleting
   * the recently flushed store file for one of the Stores. This would put us
   * back in the situation where all but that store got flushed and the region
   * was not closed cleanly.
   * We restart Region again, and verify that the edits were replayed.
   */
  @Test
  public void testAfterPartialFlush()
      throws IOException, SecurityException, IllegalArgumentException {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    // Write countPerFamily edits into the three families. Do a flush on one
    // of the families during the load of edits so its seqid is not the same as
    // the others', to test we do the right thing when seqids differ.
    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    long seqid = region.getOpenSeqNum();
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
    }

    // Now assert edits made it in.
    final Get g = new Get(ROW);
    Result result = region.get(g);
    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());

    // Let us flush the region
    region.flush(true);
    region.close(true);
    wal.shutdown();

    // delete the store files in the second column family to simulate a failure
    // in between the flushcache();
    // we have 3 families. killing the middle one ensures that taking the maximum
    // will make us fail.
    int cf_count = 0;
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      cf_count++;
      if (cf_count == 2) {
        region.getRegionFileSystem().deleteFamily(cfd.getNameAsString());
      }
    }

    // Let us try to split and recover
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
    long seqid2 = region2.getOpenSeqNum();
    assertTrue(seqid + result.size() < seqid2);

    final Result result1b = region2.get(g);
    assertEquals(result.size(), result1b.size());
  }

  /**
   * Test that we could recover the data correctly after aborting flush. In the
   * test, first we abort flush after writing some data, then write more data
   * and flush again, and at last verify the data.
   */
  @Test
  public void testAfterAbortingFlush() throws IOException {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    // Write countPerFamily edits into the three families, with a custom flusher
    // that lets us inject a flush failure.
    WAL wal = createWAL(this.conf, rootDir, logName);
    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
    Mockito.doReturn(false).when(rsServices).isAborted();
    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
    when(rsServices.getConfiguration()).thenReturn(conf);
    Configuration customConf = new Configuration(this.conf);
    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
        AbstractTestWALReplay.CustomStoreFlusher.class.getName());
    HRegion region = HRegion.openHRegion(this.rootDir, ri, td, wal, customConf, rsServices, null);
    int writtenRowCount = 10;
    List<ColumnFamilyDescriptor> families = Arrays.asList(td.getColumnFamilies());
    for (int i = 0; i < writtenRowCount; i++) {
      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
          Bytes.toBytes("val"));
      region.put(put);
    }

    // Now assert edits made it in.
    RegionScanner scanner = region.getScanner(new Scan());
    assertEquals(writtenRowCount, getScannedCount(scanner));

    // Let us flush the region
    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
    try {
      region.flush(true);
      fail("Injected exception hasn't been thrown");
    } catch (IOException e) {
      LOG.info("Expected simulated exception when flushing region, {}", e.getMessage());
      // simulate server abort
      Mockito.doReturn(true).when(rsServices).isAborted();
      region.setClosing(false); // region normally does not accept writes after
      // DroppedSnapshotException. We mock around it for this test.
    }
    // Write more data
    int moreRow = 10;
    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
          Bytes.toBytes("val"));
      region.put(put);
    }
    writtenRowCount += moreRow;
    // Call flush again
    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
    try {
      region.flush(true);
    } catch (IOException t) {
      LOG.info(
          "Expected exception when flushing region because server is stopped, {}", t.getMessage());
    }

    region.close(true);
    wal.shutdown();

    // Let us try to split and recover
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    Mockito.doReturn(false).when(rsServices).isAborted();
    HRegion region2 = HRegion.openHRegion(this.rootDir, ri, td, wal2, this.conf, rsServices, null);
    scanner = region2.getScanner(new Scan());
    assertEquals(writtenRowCount, getScannedCount(scanner));
  }

  private int getScannedCount(RegionScanner scanner) throws IOException {
    int scannedCount = 0;
    List<Cell> results = new ArrayList<>();
    while (true) {
      boolean existMore = scanner.next(results);
      if (!results.isEmpty()) {
        scannedCount += results.size();
      }
      if (!existMore) {
        break;
      }
      results.clear();
    }
    return scannedCount;
  }
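
  // Produce a corrupt sibling of the given recovered HFile by copying its bytes and
  // appending garbage, so HFile validation fails when the region replays it.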
  private void writeCorruptRecoveredHFile(Path recoveredHFile) throws Exception {
    // Read the recovered hfile
    int fileSize = (int) fs.listStatus(recoveredHFile)[0].getLen();
    FSDataInputStream in = fs.open(recoveredHFile);
    byte[] fileContent = new byte[fileSize];
    in.readFully(0, fileContent, 0, fileSize);
    in.close();

    // Write a corrupt hfile by appending garbage
    Path path = new Path(recoveredHFile.getParent(), recoveredHFile.getName() + ".corrupt");
    FSDataOutputStream out = fs.create(path);
    out.write(fileContent);
    out.write(Bytes.toBytes("-----"));
    out.close();
  }
}