HBASE-26921 Rewrite the counting cells part in TestMultiVersions (#4316)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / wal / TestSyncReplicationWALProvider.java
blobd9493feace4ca1314c9fd3109a7d8fc350132f64
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.wal;
20 import static org.hamcrest.CoreMatchers.instanceOf;
21 import static org.hamcrest.CoreMatchers.not;
22 import static org.hamcrest.MatcherAssert.assertThat;
23 import static org.junit.Assert.assertEquals;
25 import java.io.IOException;
26 import java.util.Optional;
27 import java.util.function.BiPredicate;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.hbase.HBaseClassTestRule;
30 import org.apache.hadoop.hbase.HBaseTestingUtil;
31 import org.apache.hadoop.hbase.TableName;
32 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
33 import org.apache.hadoop.hbase.client.RegionInfo;
34 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
35 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
36 import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
37 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
38 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogTestHelper;
39 import org.apache.hadoop.hbase.replication.SyncReplicationState;
40 import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
41 import org.apache.hadoop.hbase.testclassification.MediumTests;
42 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
43 import org.apache.hadoop.hbase.util.Bytes;
44 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
45 import org.apache.hadoop.hbase.util.Pair;
46 import org.apache.hadoop.hdfs.DistributedFileSystem;
47 import org.junit.AfterClass;
48 import org.junit.BeforeClass;
49 import org.junit.ClassRule;
50 import org.junit.Test;
51 import org.junit.experimental.categories.Category;
53 @Category({ RegionServerTests.class, MediumTests.class })
54 public class TestSyncReplicationWALProvider {
56 @ClassRule
57 public static final HBaseClassTestRule CLASS_RULE =
58 HBaseClassTestRule.forClass(TestSyncReplicationWALProvider.class);
60 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
62 private static String PEER_ID = "1";
64 private static String REMOTE_WAL_DIR = "/RemoteWAL";
66 private static TableName TABLE = TableName.valueOf("table");
68 private static TableName TABLE_NO_REP = TableName.valueOf("table-no-rep");
70 private static RegionInfo REGION = RegionInfoBuilder.newBuilder(TABLE).build();
72 private static RegionInfo REGION_NO_REP = RegionInfoBuilder.newBuilder(TABLE_NO_REP).build();
74 private static WALFactory FACTORY;
76 public static final class InfoProvider implements SyncReplicationPeerInfoProvider {
78 @Override
79 public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(TableName table) {
80 if (table != null && table.equals(TABLE)) {
81 return Optional.of(Pair.newPair(PEER_ID, REMOTE_WAL_DIR));
82 } else {
83 return Optional.empty();
87 @Override
88 public boolean checkState(TableName table,
89 BiPredicate<SyncReplicationState, SyncReplicationState> checker) {
90 return false;
94 @BeforeClass
95 public static void setUpBeforeClass() throws Exception {
96 UTIL.startMiniDFSCluster(3);
97 FACTORY = new WALFactory(UTIL.getConfiguration(), "test");
98 ((SyncReplicationWALProvider) FACTORY.getWALProvider()).setPeerInfoProvider(new InfoProvider());
99 UTIL.getTestFileSystem().mkdirs(new Path(REMOTE_WAL_DIR, PEER_ID));
102 @AfterClass
103 public static void tearDownAfterClass() throws IOException {
104 FACTORY.close();
105 UTIL.shutdownMiniDFSCluster();
108 private void testReadWrite(DualAsyncFSWAL wal) throws Exception {
109 int recordCount = 100;
110 int columnCount = 10;
111 byte[] row = Bytes.toBytes("testRow");
112 long timestamp = EnvironmentEdgeManager.currentTime();
113 MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
114 ProtobufLogTestHelper.doWrite(wal, REGION, TABLE, columnCount, recordCount, row, timestamp,
115 mvcc);
116 Path localFile = wal.getCurrentFileName();
117 Path remoteFile = new Path(REMOTE_WAL_DIR + "/" + PEER_ID, localFile.getName());
118 try (ProtobufLogReader reader =
119 (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), localFile)) {
120 ProtobufLogTestHelper.doRead(reader, false, REGION, TABLE, columnCount, recordCount, row,
121 timestamp);
123 try (ProtobufLogReader reader =
124 (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), remoteFile)) {
125 ProtobufLogTestHelper.doRead(reader, false, REGION, TABLE, columnCount, recordCount, row,
126 timestamp);
128 wal.rollWriter();
129 DistributedFileSystem dfs = (DistributedFileSystem) UTIL.getDFSCluster().getFileSystem();
130 UTIL.waitFor(5000, new ExplainingPredicate<Exception>() {
132 @Override
133 public boolean evaluate() throws Exception {
134 return dfs.isFileClosed(localFile) && dfs.isFileClosed(remoteFile);
137 @Override
138 public String explainFailure() throws Exception {
139 StringBuilder sb = new StringBuilder();
140 if (!dfs.isFileClosed(localFile)) {
141 sb.append(localFile + " has not been closed yet.");
143 if (!dfs.isFileClosed(remoteFile)) {
144 sb.append(remoteFile + " has not been closed yet.");
146 return sb.toString();
149 try (ProtobufLogReader reader =
150 (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), localFile)) {
151 ProtobufLogTestHelper.doRead(reader, true, REGION, TABLE, columnCount, recordCount, row,
152 timestamp);
154 try (ProtobufLogReader reader =
155 (ProtobufLogReader) FACTORY.createReader(UTIL.getTestFileSystem(), remoteFile)) {
156 ProtobufLogTestHelper.doRead(reader, true, REGION, TABLE, columnCount, recordCount, row,
157 timestamp);
161 @Test
162 public void test() throws Exception {
163 WAL walNoRep = FACTORY.getWAL(REGION_NO_REP);
164 assertThat(walNoRep, not(instanceOf(DualAsyncFSWAL.class)));
165 DualAsyncFSWAL wal = (DualAsyncFSWAL) FACTORY.getWAL(REGION);
166 assertEquals(2, FACTORY.getWALs().size());
167 testReadWrite(wal);
168 SyncReplicationWALProvider walProvider = (SyncReplicationWALProvider) FACTORY.getWALProvider();
169 walProvider.peerSyncReplicationStateChange(PEER_ID, SyncReplicationState.ACTIVE,
170 SyncReplicationState.DOWNGRADE_ACTIVE, 1);
171 assertEquals(1, FACTORY.getWALs().size());