HBASE-24033 Add ut for loading the corrupt recovered hfiles (#1322)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / wal / TestSyncReplicationWALProvider.java
blob8189cef081d4d53e2e51722ec2c64084889a9977
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org.apache.hadoop.hbase.wal;
20 import static org.hamcrest.CoreMatchers.instanceOf;
21 import static org.hamcrest.CoreMatchers.not;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertThat;
25 import java.io.IOException;
26 import java.util.Optional;
27 import java.util.function.BiPredicate;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.hbase.HBaseClassTestRule;
30 import org.apache.hadoop.hbase.HBaseTestingUtility;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
33 import org.apache.hadoop.hbase.client.RegionInfo;
34 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
35 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
36 import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
37 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
38 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogTestHelper;
39 import org.apache.hadoop.hbase.replication.SyncReplicationState;
40 import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
41 import org.apache.hadoop.hbase.testclassification.MediumTests;
42 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
43 import org.apache.hadoop.hbase.util.Bytes;
44 import org.apache.hadoop.hbase.util.Pair;
45 import org.apache.hadoop.hdfs.DistributedFileSystem;
46 import org.junit.AfterClass;
47 import org.junit.BeforeClass;
48 import org.junit.ClassRule;
49 import org.junit.Test;
50 import org.junit.experimental.categories.Category;
52 @Category({ RegionServerTests.class, MediumTests.class })
53 public class TestSyncReplicationWALProvider {
55 @ClassRule
56 public static final HBaseClassTestRule CLASS_RULE =
57 HBaseClassTestRule.forClass(TestSyncReplicationWALProvider.class);
59 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
61 private static String PEER_ID = "1";
63 private static String REMOTE_WAL_DIR = "/RemoteWAL";
65 private static TableName TABLE = TableName.valueOf("table");
67 private static TableName TABLE_NO_REP = TableName.valueOf("table-no-rep");
69 private static RegionInfo REGION = RegionInfoBuilder.newBuilder(TABLE).build();
71 private static RegionInfo REGION_NO_REP = RegionInfoBuilder.newBuilder(TABLE_NO_REP).build();
73 private static WALFactory FACTORY;
75 public static final class InfoProvider implements SyncReplicationPeerInfoProvider {
77 @Override
78 public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(TableName table) {
79 if (table != null && table.equals(TABLE)) {
80 return Optional.of(Pair.newPair(PEER_ID, REMOTE_WAL_DIR));
81 } else {
82 return Optional.empty();
86 @Override
87 public boolean checkState(TableName table,
88 BiPredicate<SyncReplicationState, SyncReplicationState> checker) {
89 return false;
93 @BeforeClass
94 public static void setUpBeforeClass() throws Exception {
95 UTIL.startMiniDFSCluster(3);
96 FACTORY = new WALFactory(UTIL.getConfiguration(), "test");
97 ((SyncReplicationWALProvider) FACTORY.getWALProvider()).setPeerInfoProvider(new InfoProvider());
98 UTIL.getTestFileSystem().mkdirs(new Path(REMOTE_WAL_DIR, PEER_ID));
101 @AfterClass
102 public static void tearDownAfterClass() throws IOException {
103 FACTORY.close();
104 UTIL.shutdownMiniDFSCluster();
107 private void testReadWrite(DualAsyncFSWAL wal) throws Exception {
108 int recordCount = 100;
109 int columnCount = 10;
110 byte[] row = Bytes.toBytes("testRow");
111 long timestamp = System.currentTimeMillis();
112 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
113 ProtobufLogTestHelper.doWrite(wal, REGION, TABLE, columnCount, recordCount, row, timestamp,
114 mvcc);
115 Path localFile = wal.getCurrentFileName();
116 Path remoteFile = new Path(REMOTE_WAL_DIR + "/" + PEER_ID, localFile.getName());
117 try (ProtobufLogReader reader =
118 (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), localFile)) {
119 ProtobufLogTestHelper.doRead(reader, false, REGION, TABLE, columnCount, recordCount, row,
120 timestamp);
122 try (ProtobufLogReader reader =
123 (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), remoteFile)) {
124 ProtobufLogTestHelper.doRead(reader, false, REGION, TABLE, columnCount, recordCount, row,
125 timestamp);
127 wal.rollWriter();
128 DistributedFileSystem dfs = (DistributedFileSystem) UTIL.getDFSCluster().getFileSystem();
129 UTIL.waitFor(5000, new ExplainingPredicate<Exception>() {
131 @Override
132 public boolean evaluate() throws Exception {
133 return dfs.isFileClosed(localFile) && dfs.isFileClosed(remoteFile);
136 @Override
137 public String explainFailure() throws Exception {
138 StringBuilder sb = new StringBuilder();
139 if (!dfs.isFileClosed(localFile)) {
140 sb.append(localFile + " has not been closed yet.");
142 if (!dfs.isFileClosed(remoteFile)) {
143 sb.append(remoteFile + " has not been closed yet.");
145 return sb.toString();
148 try (ProtobufLogReader reader =
149 (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), localFile)) {
150 ProtobufLogTestHelper.doRead(reader, true, REGION, TABLE, columnCount, recordCount, row,
151 timestamp);
153 try (ProtobufLogReader reader =
154 (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), remoteFile)) {
155 ProtobufLogTestHelper.doRead(reader, true, REGION, TABLE, columnCount, recordCount, row,
156 timestamp);
160 @Test
161 public void test() throws Exception {
162 WAL walNoRep = FACTORY.getWAL(REGION_NO_REP);
163 assertThat(walNoRep, not(instanceOf(DualAsyncFSWAL.class)));
164 DualAsyncFSWAL wal = (DualAsyncFSWAL) FACTORY.getWAL(REGION);
165 assertEquals(2, FACTORY.getWALs().size());
166 testReadWrite(wal);
167 SyncReplicationWALProvider walProvider = (SyncReplicationWALProvider) FACTORY.getWALProvider();
168 walProvider.peerSyncReplicationStateChange(PEER_ID, SyncReplicationState.ACTIVE,
169 SyncReplicationState.DOWNGRADE_ACTIVE, 1);
170 assertEquals(1, FACTORY.getWALs().size());