HBASE-24037 Add ut for root dir and wal root dir are different (#1336)
hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay.addRegionEdits;
import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay;
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ RegionServerTests.class, MediumTests.class })
public class TestWALSplitToHFile {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestWALSplitToHFile.class);

  // Use this class's own logger rather than AbstractTestWALReplay's, so log lines
  // attribute to the right test.
  private static final Logger LOG = LoggerFactory.getLogger(TestWALSplitToHFile.class);
  static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
  private final EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
  private Path rootDir = null;
  private String logName;
  private Path oldLogDir;
  private Path logDir;
  private FileSystem fs;
  private Configuration conf;
  private WALFactory wals;

  private static final byte[] ROW = Bytes.toBytes("row");
  private static final byte[] QUALIFIER = Bytes.toBytes("q");
  private static final byte[] VALUE1 = Bytes.toBytes("value1");
  private static final byte[] VALUE2 = Bytes.toBytes("value2");
  private static final int countPerFamily = 10;

  @Rule
  public final TestName TEST_NAME = new TestName();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    conf.setBoolean(WAL_SPLIT_TO_HFILE, true);
    UTIL.startMiniCluster(3);
    Path hbaseRootDir = UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
    LOG.info("hbase.rootdir=" + hbaseRootDir);
    FSUtils.setRootDir(conf, hbaseRootDir);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    UTIL.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    this.conf = HBaseConfiguration.create(UTIL.getConfiguration());
    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, false);
    this.fs = UTIL.getDFSCluster().getFileSystem();
    this.rootDir = FSUtils.getRootDir(this.conf);
    this.oldLogDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    String serverName =
        ServerName.valueOf(TEST_NAME.getMethodName() + "-manual", 16010, System.currentTimeMillis())
            .toString();
    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
    this.logDir = new Path(this.rootDir, logName);
    if (UTIL.getDFSCluster().getFileSystem().exists(this.rootDir)) {
      UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
    }
    this.wals = new WALFactory(conf, TEST_NAME.getMethodName());
  }

  @After
  public void tearDown() throws Exception {
    this.wals.close();
    UTIL.getDFSCluster().getFileSystem().delete(this.rootDir, true);
  }

  /**
   * @param p Directory to cleanup
   */
  private void deleteDir(final Path p) throws IOException {
    if (this.fs.exists(p)) {
      if (!this.fs.delete(p, true)) {
        throw new IOException("Failed remove of " + p);
      }
    }
  }

  private TableDescriptor createBasic3FamilyTD(final TableName tableName) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("a")).build());
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("b")).build());
    builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("c")).build());
    TableDescriptor td = builder.build();
    UTIL.getAdmin().createTable(td);
    return td;
  }

  private WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
    FSHLog wal = new FSHLog(FileSystem.get(c), hbaseRootDir, logName, c);
    wal.init();
    return wal;
  }

  private WAL createWAL(FileSystem fs, Path hbaseRootDir, String logName) throws IOException {
    FSHLog wal = new FSHLog(fs, hbaseRootDir, logName, this.conf);
    wal.init();
    return wal;
  }

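  /**
   * Creates the table and its single region on disk, then closes the region (and the WAL that
   * createRegionAndWAL made for it) so each test can reopen the region against a WAL it manages
   * itself.
   */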
  private Pair<TableDescriptor, RegionInfo> setupTableAndRegion() throws IOException {
    final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
    final TableDescriptor td = createBasic3FamilyTD(tableName);
    final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
    final Path tableDir = FSUtils.getTableDir(this.rootDir, tableName);
    deleteDir(tableDir);
    FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
    HRegion region = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
    HBaseTestingUtility.closeRegionAndWAL(region);
    return new Pair<>(td, ri);
  }

  private void writeData(TableDescriptor td, HRegion region) throws IOException {
    final long timestamp = this.ee.currentTime();
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE1));
    }
  }

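  /**
   * The HBASE-24037 scenario: the WAL root dir differs from hbase.rootdir, so WAL splitting must
   * read logs from the WAL filesystem while writing the recovered HFiles under the data root dir.
   * Verifies the unflushed edits come back after split and reopen.
   */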
  @Test
  public void testDifferentRootDirAndWALRootDir() throws Exception {
    // Change wal root dir and reset the configuration
    Path walRootDir = UTIL.createWALRootDir();
    this.conf = HBaseConfiguration.create(UTIL.getConfiguration());

    FileSystem walFs = FSUtils.getWALFileSystem(this.conf);
    this.oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    String serverName =
        ServerName.valueOf(TEST_NAME.getMethodName() + "-manual", 16010, System.currentTimeMillis())
            .toString();
    this.logName = AbstractFSWALProvider.getWALDirectoryName(serverName);
    this.logDir = new Path(walRootDir, logName);
    this.wals = new WALFactory(conf, TEST_NAME.getMethodName());

    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(walFs, walRootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    writeData(td, region);

    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(walRootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    WAL wal2 = createWAL(walFs, walRootDir, logName);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
    Result result2 = region2.get(new Get(ROW));
    assertEquals(td.getColumnFamilies().length, result2.size());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), QUALIFIER)));
    }
  }

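  /**
   * Plants a corrupt recovered HFile next to each family's real one, then verifies the region
   * fails to open with CorruptHFileException, and that with HREGION_EDITS_REPLAY_SKIP_ERRORS set
   * the corrupt file is skipped (and left in place) while the good edits are still replayed.
   */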
  @Test
  public void testCorruptRecoveredHFile() throws Exception {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    writeData(td, region);

    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    // Write a corrupt recovered hfile
    Path regionDir =
        new Path(CommonFSUtils.getTableDir(rootDir, td.getTableName()), ri.getEncodedName());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      FileStatus[] files =
          WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
      assertNotNull(files);
      assertTrue(files.length > 0);
      writeCorruptRecoveredHFile(files[0].getPath());
    }

    // Opening the region should fail on the corrupt file
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    try {
      HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
      fail("Should fail to open region");
    } catch (CorruptHFileException che) {
      // Expected
    }

    // Set skip errors to true and reopen the region
    this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, true);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
    Result result2 = region2.get(new Get(ROW));
    assertEquals(td.getColumnFamilies().length, result2.size());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), QUALIFIER)));
      // Assert the corrupt file was skipped and still exists
      FileStatus[] files =
          WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
      assertNotNull(files);
      assertEquals(1, files.length);
      assertTrue(files[0].getPath().getName().contains("corrupt"));
    }
  }

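  /**
   * Writes VALUE1 and flushes, then writes VALUE2 at the same timestamp without flushing. After
   * split and reopen, VALUE2 must be returned: when timestamps tie, the later edit (the one with
   * the higher sequence id) wins.
   */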
  @Test
  public void testPutWithSameTimestamp() throws Exception {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    final long timestamp = this.ee.currentTime();
    // Write data and flush
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE1));
    }
    region.flush(true);

    // Write data with same timestamp and do not flush
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      region.put(new Put(ROW).addColumn(cfd.getName(), QUALIFIER, timestamp, VALUE2));
    }
    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    // reopen the region
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
    Result result2 = region2.get(new Get(ROW));
    assertEquals(td.getColumnFamilies().length, result2.size());
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      assertTrue(Bytes.equals(VALUE2, result2.getValue(cfd.getName(), QUALIFIER)));
    }
  }

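  /**
   * Records the sequence id of every cell before the "crash", then asserts that after split and
   * reopen each cell comes back with the same sequence id it was originally assigned.
   */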
  @Test
  public void testRecoverSequenceId() throws Exception {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    Map<Integer, Map<String, Long>> seqIdMap = new HashMap<>();
    // Write data and do not flush
    for (int i = 0; i < countPerFamily; i++) {
      for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
        region.put(new Put(Bytes.toBytes(i)).addColumn(cfd.getName(), QUALIFIER, VALUE1));
        Result result = region.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName()));
        assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), QUALIFIER)));
        List<Cell> cells = result.listCells();
        assertEquals(1, cells.size());
        seqIdMap.computeIfAbsent(i, r -> new HashMap<>()).put(cfd.getNameAsString(),
            cells.get(0).getSequenceId());
      }
    }

    // Now close the region without flush
    region.close(true);
    wal.shutdown();
    // split the log
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);

    // reopen the region
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
    // assert the seqid was recovered
    for (int i = 0; i < countPerFamily; i++) {
      for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
        Result result = region2.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName()));
        assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), QUALIFIER)));
        List<Cell> cells = result.listCells();
        assertEquals(1, cells.size());
        assertEquals((long) seqIdMap.get(i).get(cfd.getNameAsString()),
            cells.get(0).getSequenceId());
      }
    }
  }

  /**
   * Test writing edits into an HRegion, closing it, splitting logs, opening the region again.
   * Verify seqids.
   */
  @Test
  public void testWrittenViaHRegion()
      throws IOException, SecurityException, IllegalArgumentException, InterruptedException {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    // Write countPerFamily edits into the three families. Do a flush on one
    // of the families during the load of edits so its seqid is not same as
    // others to test we do right thing when different seqids.
    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    long seqid = region.getOpenSeqNum();
    boolean first = true;
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
      if (first) {
        // Flush the first family so at least one family has a seqid different from the rest.
        region.flush(true);
        first = false;
      }
    }
    // Now assert edits made it in.
    final Get g = new Get(ROW);
    Result result = region.get(g);
    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
    // Now close the region (without flush), split the log, reopen the region and assert that
    // replay of log has the correct effect, that our seqids are calculated correctly so
    // all edits in logs are seen as 'stale'/old.
    region.close(true);
    wal.shutdown();
    try {
      WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
    } catch (Exception e) {
      LOG.debug("Got exception", e);
    }

    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
    long seqid2 = region2.getOpenSeqNum();
    assertTrue(seqid + result.size() < seqid2);
    final Result result1b = region2.get(g);
    assertEquals(result.size(), result1b.size());

    // Next test. Add more edits, then 'crash' this region by stealing its wal
    // out from under it and assert that replay of the log adds the edits back
    // correctly when region is opened again.
    for (ColumnFamilyDescriptor hcd : td.getColumnFamilies()) {
      addRegionEdits(ROW, hcd.getName(), countPerFamily, this.ee, region2, "y");
    }
    // Get count of edits.
    final Result result2 = region2.get(g);
    assertEquals(2 * result.size(), result2.size());
    wal2.sync();
    final Configuration newConf = HBaseConfiguration.create(this.conf);
    User user = HBaseTestingUtility.getDifferentUser(newConf, td.getTableName().getNameAsString());
    user.runAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(conf), conf, wals);
        FileSystem newFS = FileSystem.get(newConf);
        // Make a new wal for new region open.
        WAL wal3 = createWAL(newConf, rootDir, logName);
        Path tableDir = FSUtils.getTableDir(rootDir, td.getTableName());
        HRegion region3 = new HRegion(tableDir, wal3, newFS, newConf, ri, td, null);
        long seqid3 = region3.initialize();
        Result result3 = region3.get(g);
        // Assert that count of cells is same as before crash.
        assertEquals(result2.size(), result3.size());

        // Can't close wal2; it was appropriated when we split.
        region3.close();
        wal3.close();
        return null;
      }
    });
  }

  /**
   * Test that we recover correctly when there is a failure in between the
   * flushes, i.e. some stores got flushed but others did not.
   * Unfortunately, there is no easy hook to flush at a store level. The way
   * we get around this is by flushing at the region level, and then deleting
   * the recently flushed store file for one of the Stores. This puts us
   * back in the situation where all but that store got flushed and the region
   * died.
   * We restart the region again, and verify that the edits were replayed.
   */
  @Test
  public void testAfterPartialFlush()
      throws IOException, SecurityException, IllegalArgumentException {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    // Write countPerFamily edits into the three families.
    WAL wal = createWAL(this.conf, rootDir, logName);
    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
    long seqid = region.getOpenSeqNum();
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
    }

    // Now assert edits made it in.
    final Get g = new Get(ROW);
    Result result = region.get(g);
    assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());

    // Let us flush the region
    region.flush(true);
    region.close(true);
    wal.shutdown();

    // delete the store files in the second column family to simulate a failure
    // in between the flushcache()
    // we have 3 families. killing the middle one ensures that taking the maximum
    // will make us fail.
    int cf_count = 0;
    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
      cf_count++;
      if (cf_count == 2) {
        region.getRegionFileSystem().deleteFamily(cfd.getNameAsString());
      }
    }

    // Let us try to split and recover
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
    long seqid2 = region2.getOpenSeqNum();
    assertTrue(seqid + result.size() < seqid2);

    final Result result1b = region2.get(g);
    assertEquals(result.size(), result1b.size());
  }

  /**
   * Test that we can recover the data correctly after aborting a flush. First we abort the flush
   * after writing some data, then write more data and flush again, and finally verify the data.
   */
  @Test
  public void testAfterAbortingFlush() throws IOException {
    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
    TableDescriptor td = pair.getFirst();
    RegionInfo ri = pair.getSecond();

    // Open the region with a custom store flusher that throws on demand, backed by mocked
    // RegionServerServices so we can flip the aborted flag mid-test.
    WAL wal = createWAL(this.conf, rootDir, logName);
    RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
    Mockito.doReturn(false).when(rsServices).isAborted();
    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10));
    when(rsServices.getConfiguration()).thenReturn(conf);
    Configuration customConf = new Configuration(this.conf);
    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
        AbstractTestWALReplay.CustomStoreFlusher.class.getName());
    HRegion region = HRegion.openHRegion(this.rootDir, ri, td, wal, customConf, rsServices, null);
    int writtenRowCount = 10;
    List<ColumnFamilyDescriptor> families = Arrays.asList(td.getColumnFamilies());
    for (int i = 0; i < writtenRowCount; i++) {
      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
          Bytes.toBytes("val"));
      region.put(put);
    }

    // Now assert edits made it in.
    RegionScanner scanner = region.getScanner(new Scan());
    assertEquals(writtenRowCount, getScannedCount(scanner));

    // Let us flush the region
    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
    try {
      region.flush(true);
      fail("Injected exception hasn't been thrown");
    } catch (IOException e) {
      LOG.info("Expected simulated exception when flushing region, {}", e.getMessage());
      // simulate aborting the server
      Mockito.doReturn(true).when(rsServices).isAborted();
      region.setClosing(false); // region normally does not accept writes after
      // DroppedSnapshotException. We mock around it for this test.
    }
    // write more data
    int moreRow = 10;
    for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
      Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
      put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
          Bytes.toBytes("val"));
      region.put(put);
    }
    writtenRowCount += moreRow;
    // call flush again
    AbstractTestWALReplay.CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
    try {
      region.flush(true);
    } catch (IOException t) {
      LOG.info("Expected exception when flushing region because server is stopped: {}",
          t.getMessage());
    }

    region.close(true);
    wal.shutdown();

    // Let us try to split and recover
    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
    WAL wal2 = createWAL(this.conf, rootDir, logName);
    Mockito.doReturn(false).when(rsServices).isAborted();
    HRegion region2 = HRegion.openHRegion(this.rootDir, ri, td, wal2, this.conf, rsServices, null);
    scanner = region2.getScanner(new Scan());
    assertEquals(writtenRowCount, getScannedCount(scanner));
  }

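  /** Counts the rows a RegionScanner returns, draining it to exhaustion. */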
  private int getScannedCount(RegionScanner scanner) throws IOException {
    int scannedCount = 0;
    List<Cell> results = new ArrayList<>();
    while (true) {
      boolean existMore = scanner.next(results);
      if (!results.isEmpty()) {
        scannedCount++;
      }
      if (!existMore) {
        break;
      }
      results.clear();
    }
    return scannedCount;
  }

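  /**
   * Copies the given recovered HFile to a sibling path suffixed ".corrupt" and appends garbage
   * bytes, producing a file that fails HFile validation when the region replays it.
   */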
  private void writeCorruptRecoveredHFile(Path recoveredHFile) throws Exception {
    // Read the recovered hfile
    int fileSize = (int) fs.listStatus(recoveredHFile)[0].getLen();
    FSDataInputStream in = fs.open(recoveredHFile);
    byte[] fileContent = new byte[fileSize];
    in.readFully(0, fileContent, 0, fileSize);
    in.close();

    // Write a corrupt hfile by appending garbage
    Path path = new Path(recoveredHFile.getParent(), recoveredHFile.getName() + ".corrupt");
    FSDataOutputStream out = fs.create(path);
    out.write(fileContent);
    out.write(Bytes.toBytes("-----"));
    out.close();
  }
}