2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.wal
;
20 import static org
.hamcrest
.CoreMatchers
.instanceOf
;
21 import static org
.hamcrest
.CoreMatchers
.not
;
22 import static org
.hamcrest
.MatcherAssert
.assertThat
;
23 import static org
.junit
.Assert
.assertEquals
;
25 import java
.io
.IOException
;
26 import java
.util
.Optional
;
27 import java
.util
.function
.BiPredicate
;
28 import org
.apache
.hadoop
.fs
.Path
;
29 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
30 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
31 import org
.apache
.hadoop
.hbase
.TableName
;
32 import org
.apache
.hadoop
.hbase
.Waiter
.ExplainingPredicate
;
33 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
34 import org
.apache
.hadoop
.hbase
.client
.RegionInfoBuilder
;
35 import org
.apache
.hadoop
.hbase
.regionserver
.MultiVersionConcurrencyControl
;
36 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.DualAsyncFSWAL
;
37 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.ProtobufLogReader
;
38 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.ProtobufLogTestHelper
;
39 import org
.apache
.hadoop
.hbase
.replication
.SyncReplicationState
;
40 import org
.apache
.hadoop
.hbase
.replication
.regionserver
.SyncReplicationPeerInfoProvider
;
41 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
42 import org
.apache
.hadoop
.hbase
.testclassification
.RegionServerTests
;
43 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
44 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
45 import org
.apache
.hadoop
.hbase
.util
.Pair
;
46 import org
.apache
.hadoop
.hdfs
.DistributedFileSystem
;
47 import org
.junit
.AfterClass
;
48 import org
.junit
.BeforeClass
;
49 import org
.junit
.ClassRule
;
50 import org
.junit
.Test
;
51 import org
.junit
.experimental
.categories
.Category
;
53 @Category({ RegionServerTests
.class, MediumTests
.class })
54 public class TestSyncReplicationWALProvider
{
57 public static final HBaseClassTestRule CLASS_RULE
=
58 HBaseClassTestRule
.forClass(TestSyncReplicationWALProvider
.class);
60 private static final HBaseTestingUtil UTIL
= new HBaseTestingUtil();
62 private static String PEER_ID
= "1";
64 private static String REMOTE_WAL_DIR
= "/RemoteWAL";
66 private static TableName TABLE
= TableName
.valueOf("table");
68 private static TableName TABLE_NO_REP
= TableName
.valueOf("table-no-rep");
70 private static RegionInfo REGION
= RegionInfoBuilder
.newBuilder(TABLE
).build();
72 private static RegionInfo REGION_NO_REP
= RegionInfoBuilder
.newBuilder(TABLE_NO_REP
).build();
74 private static WALFactory FACTORY
;
76 public static final class InfoProvider
implements SyncReplicationPeerInfoProvider
{
79 public Optional
<Pair
<String
, String
>> getPeerIdAndRemoteWALDir(TableName table
) {
80 if (table
!= null && table
.equals(TABLE
)) {
81 return Optional
.of(Pair
.newPair(PEER_ID
, REMOTE_WAL_DIR
));
83 return Optional
.empty();
88 public boolean checkState(TableName table
,
89 BiPredicate
<SyncReplicationState
, SyncReplicationState
> checker
) {
95 public static void setUpBeforeClass() throws Exception
{
96 UTIL
.startMiniDFSCluster(3);
97 FACTORY
= new WALFactory(UTIL
.getConfiguration(), "test");
98 ((SyncReplicationWALProvider
) FACTORY
.getWALProvider()).setPeerInfoProvider(new InfoProvider());
99 UTIL
.getTestFileSystem().mkdirs(new Path(REMOTE_WAL_DIR
, PEER_ID
));
103 public static void tearDownAfterClass() throws IOException
{
105 UTIL
.shutdownMiniDFSCluster();
108 private void testReadWrite(DualAsyncFSWAL wal
) throws Exception
{
109 int recordCount
= 100;
110 int columnCount
= 10;
111 byte[] row
= Bytes
.toBytes("testRow");
112 long timestamp
= EnvironmentEdgeManager
.currentTime();
113 MultiVersionConcurrencyControl mvcc
= new MultiVersionConcurrencyControl();
114 ProtobufLogTestHelper
.doWrite(wal
, REGION
, TABLE
, columnCount
, recordCount
, row
, timestamp
,
116 Path localFile
= wal
.getCurrentFileName();
117 Path remoteFile
= new Path(REMOTE_WAL_DIR
+ "/" + PEER_ID
, localFile
.getName());
118 try (ProtobufLogReader reader
=
119 (ProtobufLogReader
) FACTORY
.createReader(UTIL
.getTestFileSystem(), localFile
)) {
120 ProtobufLogTestHelper
.doRead(reader
, false, REGION
, TABLE
, columnCount
, recordCount
, row
,
123 try (ProtobufLogReader reader
=
124 (ProtobufLogReader
) FACTORY
.createReader(UTIL
.getTestFileSystem(), remoteFile
)) {
125 ProtobufLogTestHelper
.doRead(reader
, false, REGION
, TABLE
, columnCount
, recordCount
, row
,
129 DistributedFileSystem dfs
= (DistributedFileSystem
) UTIL
.getDFSCluster().getFileSystem();
130 UTIL
.waitFor(5000, new ExplainingPredicate
<Exception
>() {
133 public boolean evaluate() throws Exception
{
134 return dfs
.isFileClosed(localFile
) && dfs
.isFileClosed(remoteFile
);
138 public String
explainFailure() throws Exception
{
139 StringBuilder sb
= new StringBuilder();
140 if (!dfs
.isFileClosed(localFile
)) {
141 sb
.append(localFile
+ " has not been closed yet.");
143 if (!dfs
.isFileClosed(remoteFile
)) {
144 sb
.append(remoteFile
+ " has not been closed yet.");
146 return sb
.toString();
149 try (ProtobufLogReader reader
=
150 (ProtobufLogReader
) FACTORY
.createReader(UTIL
.getTestFileSystem(), localFile
)) {
151 ProtobufLogTestHelper
.doRead(reader
, true, REGION
, TABLE
, columnCount
, recordCount
, row
,
154 try (ProtobufLogReader reader
=
155 (ProtobufLogReader
) FACTORY
.createReader(UTIL
.getTestFileSystem(), remoteFile
)) {
156 ProtobufLogTestHelper
.doRead(reader
, true, REGION
, TABLE
, columnCount
, recordCount
, row
,
162 public void test() throws Exception
{
163 WAL walNoRep
= FACTORY
.getWAL(REGION_NO_REP
);
164 assertThat(walNoRep
, not(instanceOf(DualAsyncFSWAL
.class)));
165 DualAsyncFSWAL wal
= (DualAsyncFSWAL
) FACTORY
.getWAL(REGION
);
166 assertEquals(2, FACTORY
.getWALs().size());
168 SyncReplicationWALProvider walProvider
= (SyncReplicationWALProvider
) FACTORY
.getWALProvider();
169 walProvider
.peerSyncReplicationStateChange(PEER_ID
, SyncReplicationState
.ACTIVE
,
170 SyncReplicationState
.DOWNGRADE_ACTIVE
, 1);
171 assertEquals(1, FACTORY
.getWALs().size());