3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.wal
;
21 import static org
.apache
.hadoop
.hbase
.regionserver
.wal
.AbstractTestWALReplay
.addRegionEdits
;
22 import static org
.apache
.hadoop
.hbase
.wal
.BoundedRecoveredHFilesOutputSink
.WAL_SPLIT_TO_HFILE
;
23 import static org
.junit
.Assert
.assertEquals
;
24 import static org
.junit
.Assert
.assertNotNull
;
25 import static org
.junit
.Assert
.assertTrue
;
26 import static org
.junit
.Assert
.fail
;
27 import static org
.mockito
.Mockito
.when
;
29 import java
.io
.IOException
;
30 import java
.security
.PrivilegedExceptionAction
;
31 import java
.util
.ArrayList
;
32 import java
.util
.Arrays
;
33 import java
.util
.HashMap
;
34 import java
.util
.List
;
37 import org
.apache
.hadoop
.conf
.Configuration
;
38 import org
.apache
.hadoop
.fs
.FSDataInputStream
;
39 import org
.apache
.hadoop
.fs
.FSDataOutputStream
;
40 import org
.apache
.hadoop
.fs
.FileStatus
;
41 import org
.apache
.hadoop
.fs
.FileSystem
;
42 import org
.apache
.hadoop
.fs
.Path
;
43 import org
.apache
.hadoop
.hbase
.Cell
;
44 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
45 import org
.apache
.hadoop
.hbase
.HBaseConfiguration
;
46 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
47 import org
.apache
.hadoop
.hbase
.HConstants
;
48 import org
.apache
.hadoop
.hbase
.ServerName
;
49 import org
.apache
.hadoop
.hbase
.TableName
;
50 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptor
;
51 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
52 import org
.apache
.hadoop
.hbase
.client
.Get
;
53 import org
.apache
.hadoop
.hbase
.client
.Put
;
54 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
55 import org
.apache
.hadoop
.hbase
.client
.RegionInfoBuilder
;
56 import org
.apache
.hadoop
.hbase
.client
.Result
;
57 import org
.apache
.hadoop
.hbase
.client
.Scan
;
58 import org
.apache
.hadoop
.hbase
.client
.TableDescriptor
;
59 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
60 import org
.apache
.hadoop
.hbase
.io
.hfile
.CorruptHFileException
;
61 import org
.apache
.hadoop
.hbase
.regionserver
.DefaultStoreEngine
;
62 import org
.apache
.hadoop
.hbase
.regionserver
.HRegion
;
63 import org
.apache
.hadoop
.hbase
.regionserver
.RegionScanner
;
64 import org
.apache
.hadoop
.hbase
.regionserver
.RegionServerServices
;
65 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.AbstractTestWALReplay
;
66 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.FSHLog
;
67 import org
.apache
.hadoop
.hbase
.security
.User
;
68 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
69 import org
.apache
.hadoop
.hbase
.testclassification
.RegionServerTests
;
70 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
71 import org
.apache
.hadoop
.hbase
.util
.CommonFSUtils
;
72 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdge
;
73 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
74 import org
.apache
.hadoop
.hbase
.util
.FSTableDescriptors
;
75 import org
.apache
.hadoop
.hbase
.util
.FSUtils
;
76 import org
.apache
.hadoop
.hbase
.util
.Pair
;
77 import org
.junit
.After
;
78 import org
.junit
.AfterClass
;
79 import org
.junit
.Before
;
80 import org
.junit
.BeforeClass
;
81 import org
.junit
.ClassRule
;
82 import org
.junit
.Rule
;
83 import org
.junit
.Test
;
84 import org
.junit
.experimental
.categories
.Category
;
85 import org
.junit
.rules
.TestName
;
86 import org
.mockito
.Mockito
;
87 import org
.slf4j
.Logger
;
88 import org
.slf4j
.LoggerFactory
;
90 @Category({ RegionServerTests
.class, MediumTests
.class })
91 public class TestWALSplitToHFile
{
93 public static final HBaseClassTestRule CLASS_RULE
=
94 HBaseClassTestRule
.forClass(TestWALSplitToHFile
.class);
96 private static final Logger LOG
= LoggerFactory
.getLogger(AbstractTestWALReplay
.class);
97 static final HBaseTestingUtility UTIL
= new HBaseTestingUtility();
98 private final EnvironmentEdge ee
= EnvironmentEdgeManager
.getDelegate();
99 private Path rootDir
= null;
100 private String logName
;
101 private Path oldLogDir
;
103 private FileSystem fs
;
104 private Configuration conf
;
105 private WALFactory wals
;
107 private static final byte[] ROW
= Bytes
.toBytes("row");
108 private static final byte[] VALUE1
= Bytes
.toBytes("value1");
109 private static final byte[] VALUE2
= Bytes
.toBytes("value2");
110 private static final int countPerFamily
= 10;
113 public final TestName TEST_NAME
= new TestName();
116 public static void setUpBeforeClass() throws Exception
{
117 Configuration conf
= UTIL
.getConfiguration();
118 conf
.setBoolean(WAL_SPLIT_TO_HFILE
, true);
119 UTIL
.startMiniCluster(3);
120 Path hbaseRootDir
= UTIL
.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
121 LOG
.info("hbase.rootdir=" + hbaseRootDir
);
122 FSUtils
.setRootDir(conf
, hbaseRootDir
);
126 public static void tearDownAfterClass() throws Exception
{
127 UTIL
.shutdownMiniCluster();
131 public void setUp() throws Exception
{
132 this.conf
= HBaseConfiguration
.create(UTIL
.getConfiguration());
133 this.conf
.setBoolean(HConstants
.HREGION_EDITS_REPLAY_SKIP_ERRORS
, false);
134 this.fs
= UTIL
.getDFSCluster().getFileSystem();
135 this.rootDir
= FSUtils
.getRootDir(this.conf
);
136 this.oldLogDir
= new Path(this.rootDir
, HConstants
.HREGION_OLDLOGDIR_NAME
);
138 ServerName
.valueOf(TEST_NAME
.getMethodName() + "-manual", 16010, System
.currentTimeMillis())
140 this.logName
= AbstractFSWALProvider
.getWALDirectoryName(serverName
);
141 this.logDir
= new Path(this.rootDir
, logName
);
142 if (UTIL
.getDFSCluster().getFileSystem().exists(this.rootDir
)) {
143 UTIL
.getDFSCluster().getFileSystem().delete(this.rootDir
, true);
145 this.wals
= new WALFactory(conf
, TEST_NAME
.getMethodName());
149 public void tearDown() throws Exception
{
151 UTIL
.getDFSCluster().getFileSystem().delete(this.rootDir
, true);
155 * @param p Directory to cleanup
157 private void deleteDir(final Path p
) throws IOException
{
158 if (this.fs
.exists(p
)) {
159 if (!this.fs
.delete(p
, true)) {
160 throw new IOException("Failed remove of " + p
);
165 private TableDescriptor
createBasic3FamilyTD(final TableName tableName
) throws IOException
{
166 TableDescriptorBuilder builder
= TableDescriptorBuilder
.newBuilder(tableName
);
167 builder
.setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(Bytes
.toBytes("a")).build());
168 builder
.setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(Bytes
.toBytes("b")).build());
169 builder
.setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(Bytes
.toBytes("c")).build());
170 TableDescriptor td
= builder
.build();
171 UTIL
.getAdmin().createTable(td
);
175 private WAL
createWAL(Configuration c
, Path hbaseRootDir
, String logName
) throws IOException
{
176 FSHLog wal
= new FSHLog(FileSystem
.get(c
), hbaseRootDir
, logName
, c
);
181 private Pair
<TableDescriptor
, RegionInfo
> setupTableAndRegion() throws IOException
{
182 final TableName tableName
= TableName
.valueOf(TEST_NAME
.getMethodName());
183 final TableDescriptor td
= createBasic3FamilyTD(tableName
);
184 final RegionInfo ri
= RegionInfoBuilder
.newBuilder(tableName
).build();
185 final Path tableDir
= FSUtils
.getTableDir(this.rootDir
, tableName
);
187 FSTableDescriptors
.createTableDescriptorForTableDirectory(fs
, tableDir
, td
, false);
188 HRegion region
= HBaseTestingUtility
.createRegionAndWAL(ri
, rootDir
, this.conf
, td
);
189 HBaseTestingUtility
.closeRegionAndWAL(region
);
190 return new Pair
<>(td
, ri
);
194 public void testCorruptRecoveredHFile() throws Exception
{
195 Pair
<TableDescriptor
, RegionInfo
> pair
= setupTableAndRegion();
196 TableDescriptor td
= pair
.getFirst();
197 RegionInfo ri
= pair
.getSecond();
199 WAL wal
= createWAL(this.conf
, rootDir
, logName
);
200 HRegion region
= HRegion
.openHRegion(this.conf
, this.fs
, rootDir
, ri
, td
, wal
);
201 final long timestamp
= this.ee
.currentTime();
202 // Write data and flush
203 for (ColumnFamilyDescriptor cfd
: td
.getColumnFamilies()) {
204 region
.put(new Put(ROW
).addColumn(cfd
.getName(), Bytes
.toBytes("x"), timestamp
, VALUE1
));
208 // Now assert edits made it in.
209 Result result1
= region
.get(new Get(ROW
));
210 assertEquals(td
.getColumnFamilies().length
, result1
.size());
211 for (ColumnFamilyDescriptor cfd
: td
.getColumnFamilies()) {
212 assertTrue(Bytes
.equals(VALUE1
, result1
.getValue(cfd
.getName(), Bytes
.toBytes("x"))));
215 // Now close the region
219 WALSplitter
.split(rootDir
, logDir
, oldLogDir
, FileSystem
.get(this.conf
), this.conf
, wals
);
221 // Write a corrupt recovered hfile
223 new Path(CommonFSUtils
.getTableDir(rootDir
, td
.getTableName()), ri
.getEncodedName());
224 for (ColumnFamilyDescriptor cfd
: td
.getColumnFamilies()) {
226 WALSplitUtil
.getRecoveredHFiles(this.fs
, regionDir
, cfd
.getNameAsString());
227 assertNotNull(files
);
228 assertTrue(files
.length
> 0);
229 writeCorruptRecoveredHFile(files
[0].getPath());
232 // Failed to reopen the region
233 WAL wal2
= createWAL(this.conf
, rootDir
, logName
);
235 HRegion
.openHRegion(this.conf
, this.fs
, rootDir
, ri
, td
, wal2
);
236 fail("Should fail to open region");
237 } catch (CorruptHFileException che
) {
241 // Set skip errors to true and reopen the region
242 this.conf
.setBoolean(HConstants
.HREGION_EDITS_REPLAY_SKIP_ERRORS
, true);
243 HRegion region2
= HRegion
.openHRegion(this.conf
, this.fs
, rootDir
, ri
, td
, wal2
);
244 Result result2
= region2
.get(new Get(ROW
));
245 assertEquals(td
.getColumnFamilies().length
, result2
.size());
246 for (ColumnFamilyDescriptor cfd
: td
.getColumnFamilies()) {
247 assertTrue(Bytes
.equals(VALUE1
, result2
.getValue(cfd
.getName(), Bytes
.toBytes("x"))));
248 // Assert the corrupt file was skipped and still exist
250 WALSplitUtil
.getRecoveredHFiles(this.fs
, regionDir
, cfd
.getNameAsString());
251 assertNotNull(files
);
252 assertEquals(1, files
.length
);
253 assertTrue(files
[0].getPath().getName().contains("corrupt"));
258 public void testPutWithSameTimestamp() throws Exception
{
259 Pair
<TableDescriptor
, RegionInfo
> pair
= setupTableAndRegion();
260 TableDescriptor td
= pair
.getFirst();
261 RegionInfo ri
= pair
.getSecond();
263 WAL wal
= createWAL(this.conf
, rootDir
, logName
);
264 HRegion region
= HRegion
.openHRegion(this.conf
, this.fs
, rootDir
, ri
, td
, wal
);
265 final long timestamp
= this.ee
.currentTime();
266 // Write data and flush
267 for (ColumnFamilyDescriptor cfd
: td
.getColumnFamilies()) {
268 region
.put(new Put(ROW
).addColumn(cfd
.getName(), Bytes
.toBytes("x"), timestamp
, VALUE1
));
272 // Now assert edits made it in.
273 Result result1
= region
.get(new Get(ROW
));
274 assertEquals(td
.getColumnFamilies().length
, result1
.size());
275 for (ColumnFamilyDescriptor cfd
: td
.getColumnFamilies()) {
276 assertTrue(Bytes
.equals(VALUE1
, result1
.getValue(cfd
.getName(), Bytes
.toBytes("x"))));
279 // Write data with same timestamp and do not flush
280 for (ColumnFamilyDescriptor cfd
: td
.getColumnFamilies()) {
281 region
.put(new Put(ROW
).addColumn(cfd
.getName(), Bytes
.toBytes("x"), timestamp
, VALUE2
));
283 // Now close the region (without flush)
287 WALSplitter
.split(rootDir
, logDir
, oldLogDir
, FileSystem
.get(this.conf
), this.conf
, wals
);
290 WAL wal2
= createWAL(this.conf
, rootDir
, logName
);
291 HRegion region2
= HRegion
.openHRegion(conf
, this.fs
, rootDir
, ri
, td
, wal2
);
292 Result result2
= region2
.get(new Get(ROW
));
293 assertEquals(td
.getColumnFamilies().length
, result2
.size());
294 for (ColumnFamilyDescriptor cfd
: td
.getColumnFamilies()) {
295 assertTrue(Bytes
.equals(VALUE2
, result2
.getValue(cfd
.getName(), Bytes
.toBytes("x"))));
300 public void testRecoverSequenceId() throws Exception
{
301 Pair
<TableDescriptor
, RegionInfo
> pair
= setupTableAndRegion();
302 TableDescriptor td
= pair
.getFirst();
303 RegionInfo ri
= pair
.getSecond();
305 WAL wal
= createWAL(this.conf
, rootDir
, logName
);
306 HRegion region
= HRegion
.openHRegion(this.conf
, this.fs
, rootDir
, ri
, td
, wal
);
307 Map
<Integer
, Map
<String
, Long
>> seqIdMap
= new HashMap
<>();
308 // Write data and do not flush
309 for (int i
= 0; i
< countPerFamily
; i
++) {
310 for (ColumnFamilyDescriptor cfd
: td
.getColumnFamilies()) {
311 region
.put(new Put(Bytes
.toBytes(i
)).addColumn(cfd
.getName(), Bytes
.toBytes("x"), VALUE1
));
312 Result result
= region
.get(new Get(Bytes
.toBytes(i
)).addFamily(cfd
.getName()));
313 assertTrue(Bytes
.equals(VALUE1
, result
.getValue(cfd
.getName(), Bytes
.toBytes("x"))));
314 List
<Cell
> cells
= result
.listCells();
315 assertEquals(1, cells
.size());
316 seqIdMap
.computeIfAbsent(i
, r
-> new HashMap
<>()).put(cfd
.getNameAsString(),
317 cells
.get(0).getSequenceId());
321 // Now close the region (without flush)
325 WALSplitter
.split(rootDir
, logDir
, oldLogDir
, FileSystem
.get(this.conf
), this.conf
, wals
);
328 WAL wal2
= createWAL(this.conf
, rootDir
, logName
);
329 HRegion region2
= HRegion
.openHRegion(conf
, this.fs
, rootDir
, ri
, td
, wal2
);
330 // assert the seqid was recovered
331 for (int i
= 0; i
< countPerFamily
; i
++) {
332 for (ColumnFamilyDescriptor cfd
: td
.getColumnFamilies()) {
333 Result result
= region2
.get(new Get(Bytes
.toBytes(i
)).addFamily(cfd
.getName()));
334 assertTrue(Bytes
.equals(VALUE1
, result
.getValue(cfd
.getName(), Bytes
.toBytes("x"))));
335 List
<Cell
> cells
= result
.listCells();
336 assertEquals(1, cells
.size());
337 assertEquals((long) seqIdMap
.get(i
).get(cfd
.getNameAsString()),
338 cells
.get(0).getSequenceId());
344 * Test writing edits into an HRegion, closing it, splitting logs, opening
345 * Region again. Verify seqids.
348 public void testWrittenViaHRegion()
349 throws IOException
, SecurityException
, IllegalArgumentException
, InterruptedException
{
350 Pair
<TableDescriptor
, RegionInfo
> pair
= setupTableAndRegion();
351 TableDescriptor td
= pair
.getFirst();
352 RegionInfo ri
= pair
.getSecond();
354 // Write countPerFamily edits into the three families. Do a flush on one
355 // of the families during the load of edits so its seqid is not same as
356 // others to test we do right thing when different seqids.
357 WAL wal
= createWAL(this.conf
, rootDir
, logName
);
358 HRegion region
= HRegion
.openHRegion(this.conf
, this.fs
, rootDir
, ri
, td
, wal
);
359 long seqid
= region
.getOpenSeqNum();
360 boolean first
= true;
361 for (ColumnFamilyDescriptor cfd
: td
.getColumnFamilies()) {
362 addRegionEdits(ROW
, cfd
.getName(), countPerFamily
, this.ee
, region
, "x");
364 // If first, so we have at least one family w/ different seqid to rest.
369 // Now assert edits made it in.
370 final Get g
= new Get(ROW
);
371 Result result
= region
.get(g
);
372 assertEquals(countPerFamily
* td
.getColumnFamilies().length
, result
.size());
373 // Now close the region (without flush), split the log, reopen the region and assert that
374 // replay of log has the correct effect, that our seqids are calculated correctly so
375 // all edits in logs are seen as 'stale'/old.
379 WALSplitter
.split(rootDir
, logDir
, oldLogDir
, FileSystem
.get(this.conf
), this.conf
, wals
);
380 } catch (Exception e
) {
381 LOG
.debug("Got exception", e
);
384 WAL wal2
= createWAL(this.conf
, rootDir
, logName
);
385 HRegion region2
= HRegion
.openHRegion(conf
, this.fs
, rootDir
, ri
, td
, wal2
);
386 long seqid2
= region2
.getOpenSeqNum();
387 assertTrue(seqid
+ result
.size() < seqid2
);
388 final Result result1b
= region2
.get(g
);
389 assertEquals(result
.size(), result1b
.size());
391 // Next test. Add more edits, then 'crash' this region by stealing its wal
392 // out from under it and assert that replay of the log adds the edits back
393 // correctly when region is opened again.
394 for (ColumnFamilyDescriptor hcd
: td
.getColumnFamilies()) {
395 addRegionEdits(ROW
, hcd
.getName(), countPerFamily
, this.ee
, region2
, "y");
397 // Get count of edits.
398 final Result result2
= region2
.get(g
);
399 assertEquals(2 * result
.size(), result2
.size());
401 final Configuration newConf
= HBaseConfiguration
.create(this.conf
);
402 User user
= HBaseTestingUtility
.getDifferentUser(newConf
, td
.getTableName().getNameAsString());
403 user
.runAs(new PrivilegedExceptionAction
<Object
>() {
405 public Object
run() throws Exception
{
406 WALSplitter
.split(rootDir
, logDir
, oldLogDir
, FileSystem
.get(conf
), conf
, wals
);
407 FileSystem newFS
= FileSystem
.get(newConf
);
408 // Make a new wal for new region open.
409 WAL wal3
= createWAL(newConf
, rootDir
, logName
);
410 Path tableDir
= FSUtils
.getTableDir(rootDir
, td
.getTableName());
411 HRegion region3
= new HRegion(tableDir
, wal3
, newFS
, newConf
, ri
, td
, null);
412 long seqid3
= region3
.initialize();
413 Result result3
= region3
.get(g
);
414 // Assert that count of cells is same as before crash.
415 assertEquals(result2
.size(), result3
.size());
417 // I can't close wal1. Its been appropriated when we split.
426 * Test that we recover correctly when there is a failure in between the
427 * flushes. i.e. Some stores got flushed but others did not.
428 * Unfortunately, there is no easy hook to flush at a store level. The way
429 * we get around this is by flushing at the region level, and then deleting
430 * the recently flushed store file for one of the Stores. This would put us
431 * back in the situation where all but that store got flushed and the region
433 * We restart Region again, and verify that the edits were replayed.
436 public void testAfterPartialFlush()
437 throws IOException
, SecurityException
, IllegalArgumentException
{
438 Pair
<TableDescriptor
, RegionInfo
> pair
= setupTableAndRegion();
439 TableDescriptor td
= pair
.getFirst();
440 RegionInfo ri
= pair
.getSecond();
442 // Write countPerFamily edits into the three families. Do a flush on one
443 // of the families during the load of edits so its seqid is not same as
444 // others to test we do right thing when different seqids.
445 WAL wal
= createWAL(this.conf
, rootDir
, logName
);
446 HRegion region
= HRegion
.openHRegion(this.conf
, this.fs
, rootDir
, ri
, td
, wal
);
447 long seqid
= region
.getOpenSeqNum();
448 for (ColumnFamilyDescriptor cfd
: td
.getColumnFamilies()) {
449 addRegionEdits(ROW
, cfd
.getName(), countPerFamily
, this.ee
, region
, "x");
452 // Now assert edits made it in.
453 final Get g
= new Get(ROW
);
454 Result result
= region
.get(g
);
455 assertEquals(countPerFamily
* td
.getColumnFamilies().length
, result
.size());
457 // Let us flush the region
462 // delete the store files in the second column family to simulate a failure
463 // in between the flushcache();
464 // we have 3 families. killing the middle one ensures that taking the maximum
465 // will make us fail.
467 for (ColumnFamilyDescriptor cfd
: td
.getColumnFamilies()) {
470 region
.getRegionFileSystem().deleteFamily(cfd
.getNameAsString());
474 // Let us try to split and recover
475 WALSplitter
.split(rootDir
, logDir
, oldLogDir
, FileSystem
.get(this.conf
), this.conf
, wals
);
476 WAL wal2
= createWAL(this.conf
, rootDir
, logName
);
477 HRegion region2
= HRegion
.openHRegion(this.conf
, this.fs
, rootDir
, ri
, td
, wal2
);
478 long seqid2
= region2
.getOpenSeqNum();
479 assertTrue(seqid
+ result
.size() < seqid2
);
481 final Result result1b
= region2
.get(g
);
482 assertEquals(result
.size(), result1b
.size());
486 * Test that we could recover the data correctly after aborting flush. In the
487 * test, first we abort flush after writing some data, then writing more data
488 * and flush again, at last verify the data.
491 public void testAfterAbortingFlush() throws IOException
{
492 Pair
<TableDescriptor
, RegionInfo
> pair
= setupTableAndRegion();
493 TableDescriptor td
= pair
.getFirst();
494 RegionInfo ri
= pair
.getSecond();
496 // Write countPerFamily edits into the three families. Do a flush on one
497 // of the families during the load of edits so its seqid is not same as
498 // others to test we do right thing when different seqids.
499 WAL wal
= createWAL(this.conf
, rootDir
, logName
);
500 RegionServerServices rsServices
= Mockito
.mock(RegionServerServices
.class);
501 Mockito
.doReturn(false).when(rsServices
).isAborted();
502 when(rsServices
.getServerName()).thenReturn(ServerName
.valueOf("foo", 10, 10));
503 when(rsServices
.getConfiguration()).thenReturn(conf
);
504 Configuration customConf
= new Configuration(this.conf
);
505 customConf
.set(DefaultStoreEngine
.DEFAULT_STORE_FLUSHER_CLASS_KEY
,
506 AbstractTestWALReplay
.CustomStoreFlusher
.class.getName());
507 HRegion region
= HRegion
.openHRegion(this.rootDir
, ri
, td
, wal
, customConf
, rsServices
, null);
508 int writtenRowCount
= 10;
509 List
<ColumnFamilyDescriptor
> families
= Arrays
.asList(td
.getColumnFamilies());
510 for (int i
= 0; i
< writtenRowCount
; i
++) {
511 Put put
= new Put(Bytes
.toBytes(td
.getTableName() + Integer
.toString(i
)));
512 put
.addColumn(families
.get(i
% families
.size()).getName(), Bytes
.toBytes("q"),
513 Bytes
.toBytes("val"));
517 // Now assert edits made it in.
518 RegionScanner scanner
= region
.getScanner(new Scan());
519 assertEquals(writtenRowCount
, getScannedCount(scanner
));
521 // Let us flush the region
522 AbstractTestWALReplay
.CustomStoreFlusher
.throwExceptionWhenFlushing
.set(true);
525 fail("Injected exception hasn't been thrown");
526 } catch (IOException e
) {
527 LOG
.info("Expected simulated exception when flushing region, {}", e
.getMessage());
528 // simulated to abort server
529 Mockito
.doReturn(true).when(rsServices
).isAborted();
530 region
.setClosing(false); // region normally does not accept writes after
531 // DroppedSnapshotException. We mock around it for this test.
535 for (int i
= writtenRowCount
; i
< writtenRowCount
+ moreRow
; i
++) {
536 Put put
= new Put(Bytes
.toBytes(td
.getTableName() + Integer
.toString(i
)));
537 put
.addColumn(families
.get(i
% families
.size()).getName(), Bytes
.toBytes("q"),
538 Bytes
.toBytes("val"));
541 writtenRowCount
+= moreRow
;
543 AbstractTestWALReplay
.CustomStoreFlusher
.throwExceptionWhenFlushing
.set(false);
546 } catch (IOException t
) {
548 "Expected exception when flushing region because server is stopped," + t
.getMessage());
554 // Let us try to split and recover
555 WALSplitter
.split(rootDir
, logDir
, oldLogDir
, FileSystem
.get(this.conf
), this.conf
, wals
);
556 WAL wal2
= createWAL(this.conf
, rootDir
, logName
);
557 Mockito
.doReturn(false).when(rsServices
).isAborted();
558 HRegion region2
= HRegion
.openHRegion(this.rootDir
, ri
, td
, wal2
, this.conf
, rsServices
, null);
559 scanner
= region2
.getScanner(new Scan());
560 assertEquals(writtenRowCount
, getScannedCount(scanner
));
563 private int getScannedCount(RegionScanner scanner
) throws IOException
{
564 int scannedCount
= 0;
565 List
<Cell
> results
= new ArrayList
<>();
567 boolean existMore
= scanner
.next(results
);
568 if (!results
.isEmpty()) {
579 private void writeCorruptRecoveredHFile(Path recoveredHFile
) throws Exception
{
580 // Read the recovered hfile
581 int fileSize
= (int) fs
.listStatus(recoveredHFile
)[0].getLen();
582 FSDataInputStream in
= fs
.open(recoveredHFile
);
583 byte[] fileContent
= new byte[fileSize
];
584 in
.readFully(0, fileContent
, 0, fileSize
);
587 // Write a corrupt hfile by append garbage
588 Path path
= new Path(recoveredHFile
.getParent(), recoveredHFile
.getName() + ".corrupt");
589 FSDataOutputStream out
;
590 out
= fs
.create(path
);
591 out
.write(fileContent
);
592 out
.write(Bytes
.toBytes("-----"));