/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
19 package org
.apache
.hadoop
.hbase
.wal
;
import static org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay.addRegionEdits;
import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
90 @Category({ RegionServerTests
.class, MediumTests
.class })
91 public class TestWALSplitToHFile
{
93 public static final HBaseClassTestRule CLASS_RULE
=
94 HBaseClassTestRule
.forClass(TestWALSplitToHFile
.class);
96 private static final Logger LOG
= LoggerFactory
.getLogger(AbstractTestWALReplay
.class);
97 static final HBaseTestingUtility UTIL
= new HBaseTestingUtility();
98 private final EnvironmentEdge ee
= EnvironmentEdgeManager
.getDelegate();
99 private Path rootDir
= null;
100 private String logName
;
101 private Path oldLogDir
;
103 private FileSystem fs
;
104 private Configuration conf
;
105 private WALFactory wals
;
107 private static final byte[] ROW
= Bytes
.toBytes("row");
108 private static final byte[] QUALIFIER
= Bytes
.toBytes("q");
109 private static final byte[] VALUE1
= Bytes
.toBytes("value1");
110 private static final byte[] VALUE2
= Bytes
.toBytes("value2");
111 private static final int countPerFamily
= 10;
114 public final TestName TEST_NAME
= new TestName();
117 public static void setUpBeforeClass() throws Exception
{
118 Configuration conf
= UTIL
.getConfiguration();
119 conf
.setBoolean(WAL_SPLIT_TO_HFILE
, true);
120 UTIL
.startMiniCluster(3);
121 Path hbaseRootDir
= UTIL
.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
122 LOG
.info("hbase.rootdir=" + hbaseRootDir
);
123 FSUtils
.setRootDir(conf
, hbaseRootDir
);
127 public static void tearDownAfterClass() throws Exception
{
128 UTIL
.shutdownMiniCluster();
132 public void setUp() throws Exception
{
133 this.conf
= HBaseConfiguration
.create(UTIL
.getConfiguration());
134 this.conf
.setBoolean(HConstants
.HREGION_EDITS_REPLAY_SKIP_ERRORS
, false);
135 this.fs
= UTIL
.getDFSCluster().getFileSystem();
136 this.rootDir
= FSUtils
.getRootDir(this.conf
);
137 this.oldLogDir
= new Path(this.rootDir
, HConstants
.HREGION_OLDLOGDIR_NAME
);
139 ServerName
.valueOf(TEST_NAME
.getMethodName() + "-manual", 16010, System
.currentTimeMillis())
141 this.logName
= AbstractFSWALProvider
.getWALDirectoryName(serverName
);
142 this.logDir
= new Path(this.rootDir
, logName
);
143 if (UTIL
.getDFSCluster().getFileSystem().exists(this.rootDir
)) {
144 UTIL
.getDFSCluster().getFileSystem().delete(this.rootDir
, true);
146 this.wals
= new WALFactory(conf
, TEST_NAME
.getMethodName());
150 public void tearDown() throws Exception
{
152 UTIL
.getDFSCluster().getFileSystem().delete(this.rootDir
, true);
156 * @param p Directory to cleanup
158 private void deleteDir(final Path p
) throws IOException
{
159 if (this.fs
.exists(p
)) {
160 if (!this.fs
.delete(p
, true)) {
161 throw new IOException("Failed remove of " + p
);
166 private TableDescriptor
createBasic3FamilyTD(final TableName tableName
) throws IOException
{
167 TableDescriptorBuilder builder
= TableDescriptorBuilder
.newBuilder(tableName
);
168 builder
.setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(Bytes
.toBytes("a")).build());
169 builder
.setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(Bytes
.toBytes("b")).build());
170 builder
.setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(Bytes
.toBytes("c")).build());
171 TableDescriptor td
= builder
.build();
172 UTIL
.getAdmin().createTable(td
);
176 private WAL
createWAL(Configuration c
, Path hbaseRootDir
, String logName
) throws IOException
{
177 FSHLog wal
= new FSHLog(FileSystem
.get(c
), hbaseRootDir
, logName
, c
);
182 private WAL
createWAL(FileSystem fs
, Path hbaseRootDir
, String logName
) throws IOException
{
183 FSHLog wal
= new FSHLog(fs
, hbaseRootDir
, logName
, this.conf
);
188 private Pair
<TableDescriptor
, RegionInfo
> setupTableAndRegion() throws IOException
{
189 final TableName tableName
= TableName
.valueOf(TEST_NAME
.getMethodName());
190 final TableDescriptor td
= createBasic3FamilyTD(tableName
);
191 final RegionInfo ri
= RegionInfoBuilder
.newBuilder(tableName
).build();
192 final Path tableDir
= FSUtils
.getTableDir(this.rootDir
, tableName
);
194 FSTableDescriptors
.createTableDescriptorForTableDirectory(fs
, tableDir
, td
, false);
195 HRegion region
= HBaseTestingUtility
.createRegionAndWAL(ri
, rootDir
, this.conf
, td
);
196 HBaseTestingUtility
.closeRegionAndWAL(region
);
197 return new Pair
<>(td
, ri
);
200 private void writeData(TableDescriptor td
, HRegion region
) throws IOException
{
201 final long timestamp
= this.ee
.currentTime();
202 for (ColumnFamilyDescriptor cfd
: td
.getColumnFamilies()) {
203 region
.put(new Put(ROW
).addColumn(cfd
.getName(), QUALIFIER
, timestamp
, VALUE1
));
208 public void testDifferentRootDirAndWALRootDir() throws Exception
{
209 // Change wal root dir and reset the configuration
210 Path walRootDir
= UTIL
.createWALRootDir();
211 this.conf
= HBaseConfiguration
.create(UTIL
.getConfiguration());
213 FileSystem walFs
= FSUtils
.getWALFileSystem(this.conf
);
214 this.oldLogDir
= new Path(walRootDir
, HConstants
.HREGION_OLDLOGDIR_NAME
);
216 ServerName
.valueOf(TEST_NAME
.getMethodName() + "-manual", 16010, System
.currentTimeMillis())
218 this.logName
= AbstractFSWALProvider
.getWALDirectoryName(serverName
);
219 this.logDir
= new Path(walRootDir
, logName
);
220 this.wals
= new WALFactory(conf
, TEST_NAME
.getMethodName());
222 Pair
<TableDescriptor
, RegionInfo
> pair
= setupTableAndRegion();
223 TableDescriptor td
= pair
.getFirst();
224 RegionInfo ri
= pair
.getSecond();
226 WAL wal
= createWAL(walFs
, walRootDir
, logName
);
227 HRegion region
= HRegion
.openHRegion(this.conf
, this.fs
, rootDir
, ri
, td
, wal
);
228 writeData(td
, region
);
230 // Now close the region without flush
234 WALSplitter
.split(walRootDir
, logDir
, oldLogDir
, FileSystem
.get(this.conf
), this.conf
, wals
);
236 WAL wal2
= createWAL(walFs
, walRootDir
, logName
);
237 HRegion region2
= HRegion
.openHRegion(this.conf
, this.fs
, rootDir
, ri
, td
, wal2
);
238 Result result2
= region2
.get(new Get(ROW
));
239 assertEquals(td
.getColumnFamilies().length
, result2
.size());
240 for (ColumnFamilyDescriptor cfd
: td
.getColumnFamilies()) {
241 assertTrue(Bytes
.equals(VALUE1
, result2
.getValue(cfd
.getName(), QUALIFIER
)));
246 public void testCorruptRecoveredHFile() throws Exception
{
247 Pair
<TableDescriptor
, RegionInfo
> pair
= setupTableAndRegion();
248 TableDescriptor td
= pair
.getFirst();
249 RegionInfo ri
= pair
.getSecond();
251 WAL wal
= createWAL(this.conf
, rootDir
, logName
);
252 HRegion region
= HRegion
.openHRegion(this.conf
, this.fs
, rootDir
, ri
, td
, wal
);
253 writeData(td
, region
);
255 // Now close the region without flush
259 WALSplitter
.split(rootDir
, logDir
, oldLogDir
, FileSystem
.get(this.conf
), this.conf
, wals
);
261 // Write a corrupt recovered hfile
263 new Path(CommonFSUtils
.getTableDir(rootDir
, td
.getTableName()), ri
.getEncodedName());
264 for (ColumnFamilyDescriptor cfd
: td
.getColumnFamilies()) {
266 WALSplitUtil
.getRecoveredHFiles(this.fs
, regionDir
, cfd
.getNameAsString());
267 assertNotNull(files
);
268 assertTrue(files
.length
> 0);
269 writeCorruptRecoveredHFile(files
[0].getPath());
272 // Failed to reopen the region
273 WAL wal2
= createWAL(this.conf
, rootDir
, logName
);
275 HRegion
.openHRegion(this.conf
, this.fs
, rootDir
, ri
, td
, wal2
);
276 fail("Should fail to open region");
277 } catch (CorruptHFileException che
) {
281 // Set skip errors to true and reopen the region
282 this.conf
.setBoolean(HConstants
.HREGION_EDITS_REPLAY_SKIP_ERRORS
, true);
283 HRegion region2
= HRegion
.openHRegion(this.conf
, this.fs
, rootDir
, ri
, td
, wal2
);
284 Result result2
= region2
.get(new Get(ROW
));
285 assertEquals(td
.getColumnFamilies().length
, result2
.size());
286 for (ColumnFamilyDescriptor cfd
: td
.getColumnFamilies()) {
287 assertTrue(Bytes
.equals(VALUE1
, result2
.getValue(cfd
.getName(), QUALIFIER
)));
288 // Assert the corrupt file was skipped and still exist
290 WALSplitUtil
.getRecoveredHFiles(this.fs
, regionDir
, cfd
.getNameAsString());
291 assertNotNull(files
);
292 assertEquals(1, files
.length
);
293 assertTrue(files
[0].getPath().getName().contains("corrupt"));
298 public void testPutWithSameTimestamp() throws Exception
{
299 Pair
<TableDescriptor
, RegionInfo
> pair
= setupTableAndRegion();
300 TableDescriptor td
= pair
.getFirst();
301 RegionInfo ri
= pair
.getSecond();
303 WAL wal
= createWAL(this.conf
, rootDir
, logName
);
304 HRegion region
= HRegion
.openHRegion(this.conf
, this.fs
, rootDir
, ri
, td
, wal
);
305 final long timestamp
= this.ee
.currentTime();
306 // Write data and flush
307 for (ColumnFamilyDescriptor cfd
: td
.getColumnFamilies()) {
308 region
.put(new Put(ROW
).addColumn(cfd
.getName(), QUALIFIER
, timestamp
, VALUE1
));
312 // Write data with same timestamp and do not flush
313 for (ColumnFamilyDescriptor cfd
: td
.getColumnFamilies()) {
314 region
.put(new Put(ROW
).addColumn(cfd
.getName(), QUALIFIER
, timestamp
, VALUE2
));
316 // Now close the region without flush
320 WALSplitter
.split(rootDir
, logDir
, oldLogDir
, FileSystem
.get(this.conf
), this.conf
, wals
);
323 WAL wal2
= createWAL(this.conf
, rootDir
, logName
);
324 HRegion region2
= HRegion
.openHRegion(conf
, this.fs
, rootDir
, ri
, td
, wal2
);
325 Result result2
= region2
.get(new Get(ROW
));
326 assertEquals(td
.getColumnFamilies().length
, result2
.size());
327 for (ColumnFamilyDescriptor cfd
: td
.getColumnFamilies()) {
328 assertTrue(Bytes
.equals(VALUE2
, result2
.getValue(cfd
.getName(), QUALIFIER
)));
333 public void testRecoverSequenceId() throws Exception
{
334 Pair
<TableDescriptor
, RegionInfo
> pair
= setupTableAndRegion();
335 TableDescriptor td
= pair
.getFirst();
336 RegionInfo ri
= pair
.getSecond();
338 WAL wal
= createWAL(this.conf
, rootDir
, logName
);
339 HRegion region
= HRegion
.openHRegion(this.conf
, this.fs
, rootDir
, ri
, td
, wal
);
340 Map
<Integer
, Map
<String
, Long
>> seqIdMap
= new HashMap
<>();
341 // Write data and do not flush
342 for (int i
= 0; i
< countPerFamily
; i
++) {
343 for (ColumnFamilyDescriptor cfd
: td
.getColumnFamilies()) {
344 region
.put(new Put(Bytes
.toBytes(i
)).addColumn(cfd
.getName(), QUALIFIER
, VALUE1
));
345 Result result
= region
.get(new Get(Bytes
.toBytes(i
)).addFamily(cfd
.getName()));
346 assertTrue(Bytes
.equals(VALUE1
, result
.getValue(cfd
.getName(), QUALIFIER
)));
347 List
<Cell
> cells
= result
.listCells();
348 assertEquals(1, cells
.size());
349 seqIdMap
.computeIfAbsent(i
, r
-> new HashMap
<>()).put(cfd
.getNameAsString(),
350 cells
.get(0).getSequenceId());
354 // Now close the region without flush
358 WALSplitter
.split(rootDir
, logDir
, oldLogDir
, FileSystem
.get(this.conf
), this.conf
, wals
);
361 WAL wal2
= createWAL(this.conf
, rootDir
, logName
);
362 HRegion region2
= HRegion
.openHRegion(conf
, this.fs
, rootDir
, ri
, td
, wal2
);
363 // assert the seqid was recovered
364 for (int i
= 0; i
< countPerFamily
; i
++) {
365 for (ColumnFamilyDescriptor cfd
: td
.getColumnFamilies()) {
366 Result result
= region2
.get(new Get(Bytes
.toBytes(i
)).addFamily(cfd
.getName()));
367 assertTrue(Bytes
.equals(VALUE1
, result
.getValue(cfd
.getName(), QUALIFIER
)));
368 List
<Cell
> cells
= result
.listCells();
369 assertEquals(1, cells
.size());
370 assertEquals((long) seqIdMap
.get(i
).get(cfd
.getNameAsString()),
371 cells
.get(0).getSequenceId());
377 * Test writing edits into an HRegion, closing it, splitting logs, opening
378 * Region again. Verify seqids.
381 public void testWrittenViaHRegion()
382 throws IOException
, SecurityException
, IllegalArgumentException
, InterruptedException
{
383 Pair
<TableDescriptor
, RegionInfo
> pair
= setupTableAndRegion();
384 TableDescriptor td
= pair
.getFirst();
385 RegionInfo ri
= pair
.getSecond();
387 // Write countPerFamily edits into the three families. Do a flush on one
388 // of the families during the load of edits so its seqid is not same as
389 // others to test we do right thing when different seqids.
390 WAL wal
= createWAL(this.conf
, rootDir
, logName
);
391 HRegion region
= HRegion
.openHRegion(this.conf
, this.fs
, rootDir
, ri
, td
, wal
);
392 long seqid
= region
.getOpenSeqNum();
393 boolean first
= true;
394 for (ColumnFamilyDescriptor cfd
: td
.getColumnFamilies()) {
395 addRegionEdits(ROW
, cfd
.getName(), countPerFamily
, this.ee
, region
, "x");
397 // If first, so we have at least one family w/ different seqid to rest.
402 // Now assert edits made it in.
403 final Get g
= new Get(ROW
);
404 Result result
= region
.get(g
);
405 assertEquals(countPerFamily
* td
.getColumnFamilies().length
, result
.size());
406 // Now close the region (without flush), split the log, reopen the region and assert that
407 // replay of log has the correct effect, that our seqids are calculated correctly so
408 // all edits in logs are seen as 'stale'/old.
412 WALSplitter
.split(rootDir
, logDir
, oldLogDir
, FileSystem
.get(this.conf
), this.conf
, wals
);
413 } catch (Exception e
) {
414 LOG
.debug("Got exception", e
);
417 WAL wal2
= createWAL(this.conf
, rootDir
, logName
);
418 HRegion region2
= HRegion
.openHRegion(conf
, this.fs
, rootDir
, ri
, td
, wal2
);
419 long seqid2
= region2
.getOpenSeqNum();
420 assertTrue(seqid
+ result
.size() < seqid2
);
421 final Result result1b
= region2
.get(g
);
422 assertEquals(result
.size(), result1b
.size());
424 // Next test. Add more edits, then 'crash' this region by stealing its wal
425 // out from under it and assert that replay of the log adds the edits back
426 // correctly when region is opened again.
427 for (ColumnFamilyDescriptor hcd
: td
.getColumnFamilies()) {
428 addRegionEdits(ROW
, hcd
.getName(), countPerFamily
, this.ee
, region2
, "y");
430 // Get count of edits.
431 final Result result2
= region2
.get(g
);
432 assertEquals(2 * result
.size(), result2
.size());
434 final Configuration newConf
= HBaseConfiguration
.create(this.conf
);
435 User user
= HBaseTestingUtility
.getDifferentUser(newConf
, td
.getTableName().getNameAsString());
436 user
.runAs(new PrivilegedExceptionAction
<Object
>() {
438 public Object
run() throws Exception
{
439 WALSplitter
.split(rootDir
, logDir
, oldLogDir
, FileSystem
.get(conf
), conf
, wals
);
440 FileSystem newFS
= FileSystem
.get(newConf
);
441 // Make a new wal for new region open.
442 WAL wal3
= createWAL(newConf
, rootDir
, logName
);
443 Path tableDir
= FSUtils
.getTableDir(rootDir
, td
.getTableName());
444 HRegion region3
= new HRegion(tableDir
, wal3
, newFS
, newConf
, ri
, td
, null);
445 long seqid3
= region3
.initialize();
446 Result result3
= region3
.get(g
);
447 // Assert that count of cells is same as before crash.
448 assertEquals(result2
.size(), result3
.size());
450 // I can't close wal1. Its been appropriated when we split.
459 * Test that we recover correctly when there is a failure in between the
460 * flushes. i.e. Some stores got flushed but others did not.
461 * Unfortunately, there is no easy hook to flush at a store level. The way
462 * we get around this is by flushing at the region level, and then deleting
463 * the recently flushed store file for one of the Stores. This would put us
464 * back in the situation where all but that store got flushed and the region
466 * We restart Region again, and verify that the edits were replayed.
469 public void testAfterPartialFlush()
470 throws IOException
, SecurityException
, IllegalArgumentException
{
471 Pair
<TableDescriptor
, RegionInfo
> pair
= setupTableAndRegion();
472 TableDescriptor td
= pair
.getFirst();
473 RegionInfo ri
= pair
.getSecond();
475 // Write countPerFamily edits into the three families. Do a flush on one
476 // of the families during the load of edits so its seqid is not same as
477 // others to test we do right thing when different seqids.
478 WAL wal
= createWAL(this.conf
, rootDir
, logName
);
479 HRegion region
= HRegion
.openHRegion(this.conf
, this.fs
, rootDir
, ri
, td
, wal
);
480 long seqid
= region
.getOpenSeqNum();
481 for (ColumnFamilyDescriptor cfd
: td
.getColumnFamilies()) {
482 addRegionEdits(ROW
, cfd
.getName(), countPerFamily
, this.ee
, region
, "x");
485 // Now assert edits made it in.
486 final Get g
= new Get(ROW
);
487 Result result
= region
.get(g
);
488 assertEquals(countPerFamily
* td
.getColumnFamilies().length
, result
.size());
490 // Let us flush the region
495 // delete the store files in the second column family to simulate a failure
496 // in between the flushcache();
497 // we have 3 families. killing the middle one ensures that taking the maximum
498 // will make us fail.
500 for (ColumnFamilyDescriptor cfd
: td
.getColumnFamilies()) {
503 region
.getRegionFileSystem().deleteFamily(cfd
.getNameAsString());
507 // Let us try to split and recover
508 WALSplitter
.split(rootDir
, logDir
, oldLogDir
, FileSystem
.get(this.conf
), this.conf
, wals
);
509 WAL wal2
= createWAL(this.conf
, rootDir
, logName
);
510 HRegion region2
= HRegion
.openHRegion(this.conf
, this.fs
, rootDir
, ri
, td
, wal2
);
511 long seqid2
= region2
.getOpenSeqNum();
512 assertTrue(seqid
+ result
.size() < seqid2
);
514 final Result result1b
= region2
.get(g
);
515 assertEquals(result
.size(), result1b
.size());
519 * Test that we could recover the data correctly after aborting flush. In the
520 * test, first we abort flush after writing some data, then writing more data
521 * and flush again, at last verify the data.
524 public void testAfterAbortingFlush() throws IOException
{
525 Pair
<TableDescriptor
, RegionInfo
> pair
= setupTableAndRegion();
526 TableDescriptor td
= pair
.getFirst();
527 RegionInfo ri
= pair
.getSecond();
529 // Write countPerFamily edits into the three families. Do a flush on one
530 // of the families during the load of edits so its seqid is not same as
531 // others to test we do right thing when different seqids.
532 WAL wal
= createWAL(this.conf
, rootDir
, logName
);
533 RegionServerServices rsServices
= Mockito
.mock(RegionServerServices
.class);
534 Mockito
.doReturn(false).when(rsServices
).isAborted();
535 when(rsServices
.getServerName()).thenReturn(ServerName
.valueOf("foo", 10, 10));
536 when(rsServices
.getConfiguration()).thenReturn(conf
);
537 Configuration customConf
= new Configuration(this.conf
);
538 customConf
.set(DefaultStoreEngine
.DEFAULT_STORE_FLUSHER_CLASS_KEY
,
539 AbstractTestWALReplay
.CustomStoreFlusher
.class.getName());
540 HRegion region
= HRegion
.openHRegion(this.rootDir
, ri
, td
, wal
, customConf
, rsServices
, null);
541 int writtenRowCount
= 10;
542 List
<ColumnFamilyDescriptor
> families
= Arrays
.asList(td
.getColumnFamilies());
543 for (int i
= 0; i
< writtenRowCount
; i
++) {
544 Put put
= new Put(Bytes
.toBytes(td
.getTableName() + Integer
.toString(i
)));
545 put
.addColumn(families
.get(i
% families
.size()).getName(), Bytes
.toBytes("q"),
546 Bytes
.toBytes("val"));
550 // Now assert edits made it in.
551 RegionScanner scanner
= region
.getScanner(new Scan());
552 assertEquals(writtenRowCount
, getScannedCount(scanner
));
554 // Let us flush the region
555 AbstractTestWALReplay
.CustomStoreFlusher
.throwExceptionWhenFlushing
.set(true);
558 fail("Injected exception hasn't been thrown");
559 } catch (IOException e
) {
560 LOG
.info("Expected simulated exception when flushing region, {}", e
.getMessage());
561 // simulated to abort server
562 Mockito
.doReturn(true).when(rsServices
).isAborted();
563 region
.setClosing(false); // region normally does not accept writes after
564 // DroppedSnapshotException. We mock around it for this test.
568 for (int i
= writtenRowCount
; i
< writtenRowCount
+ moreRow
; i
++) {
569 Put put
= new Put(Bytes
.toBytes(td
.getTableName() + Integer
.toString(i
)));
570 put
.addColumn(families
.get(i
% families
.size()).getName(), Bytes
.toBytes("q"),
571 Bytes
.toBytes("val"));
574 writtenRowCount
+= moreRow
;
576 AbstractTestWALReplay
.CustomStoreFlusher
.throwExceptionWhenFlushing
.set(false);
579 } catch (IOException t
) {
581 "Expected exception when flushing region because server is stopped," + t
.getMessage());
587 // Let us try to split and recover
588 WALSplitter
.split(rootDir
, logDir
, oldLogDir
, FileSystem
.get(this.conf
), this.conf
, wals
);
589 WAL wal2
= createWAL(this.conf
, rootDir
, logName
);
590 Mockito
.doReturn(false).when(rsServices
).isAborted();
591 HRegion region2
= HRegion
.openHRegion(this.rootDir
, ri
, td
, wal2
, this.conf
, rsServices
, null);
592 scanner
= region2
.getScanner(new Scan());
593 assertEquals(writtenRowCount
, getScannedCount(scanner
));
596 private int getScannedCount(RegionScanner scanner
) throws IOException
{
597 int scannedCount
= 0;
598 List
<Cell
> results
= new ArrayList
<>();
600 boolean existMore
= scanner
.next(results
);
601 if (!results
.isEmpty()) {
612 private void writeCorruptRecoveredHFile(Path recoveredHFile
) throws Exception
{
613 // Read the recovered hfile
614 int fileSize
= (int) fs
.listStatus(recoveredHFile
)[0].getLen();
615 FSDataInputStream in
= fs
.open(recoveredHFile
);
616 byte[] fileContent
= new byte[fileSize
];
617 in
.readFully(0, fileContent
, 0, fileSize
);
620 // Write a corrupt hfile by append garbage
621 Path path
= new Path(recoveredHFile
.getParent(), recoveredHFile
.getName() + ".corrupt");
622 FSDataOutputStream out
;
623 out
= fs
.create(path
);
624 out
.write(fileContent
);
625 out
.write(Bytes
.toBytes("-----"));