/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.wal
;
20 import static org
.hamcrest
.CoreMatchers
.instanceOf
;
21 import static org
.hamcrest
.CoreMatchers
.not
;
22 import static org
.junit
.Assert
.assertEquals
;
23 import static org
.junit
.Assert
.assertThat
;
25 import java
.io
.IOException
;
26 import java
.util
.Optional
;
27 import java
.util
.function
.BiPredicate
;
28 import org
.apache
.hadoop
.fs
.Path
;
29 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
30 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
31 import org
.apache
.hadoop
.hbase
.TableName
;
32 import org
.apache
.hadoop
.hbase
.Waiter
.ExplainingPredicate
;
33 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
34 import org
.apache
.hadoop
.hbase
.client
.RegionInfoBuilder
;
35 import org
.apache
.hadoop
.hbase
.regionserver
.MultiVersionConcurrencyControl
;
36 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.DualAsyncFSWAL
;
37 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.ProtobufLogReader
;
38 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.ProtobufLogTestHelper
;
39 import org
.apache
.hadoop
.hbase
.replication
.SyncReplicationState
;
40 import org
.apache
.hadoop
.hbase
.replication
.regionserver
.SyncReplicationPeerInfoProvider
;
41 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
42 import org
.apache
.hadoop
.hbase
.testclassification
.RegionServerTests
;
43 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
44 import org
.apache
.hadoop
.hbase
.util
.Pair
;
45 import org
.apache
.hadoop
.hdfs
.DistributedFileSystem
;
46 import org
.junit
.AfterClass
;
47 import org
.junit
.BeforeClass
;
48 import org
.junit
.ClassRule
;
49 import org
.junit
.Test
;
50 import org
.junit
.experimental
.categories
.Category
;
/**
 * Test class that drives a SyncReplicationWALProvider obtained from a WALFactory, using the
 * file system of a mini DFS cluster started by the shared HBaseTestingUtility. It is tagged
 * with the RegionServerTests and MediumTests categories.
 */
52 @Category({ RegionServerTests
.class, MediumTests
.class })
53 public class TestSyncReplicationWALProvider
{
56 public static final HBaseClassTestRule CLASS_RULE
=
57 HBaseClassTestRule
.forClass(TestSyncReplicationWALProvider
.class);
59 private static final HBaseTestingUtility UTIL
= new HBaseTestingUtility();
61 private static String PEER_ID
= "1";
63 private static String REMOTE_WAL_DIR
= "/RemoteWAL";
65 private static TableName TABLE
= TableName
.valueOf("table");
67 private static TableName TABLE_NO_REP
= TableName
.valueOf("table-no-rep");
69 private static RegionInfo REGION
= RegionInfoBuilder
.newBuilder(TABLE
).build();
71 private static RegionInfo REGION_NO_REP
= RegionInfoBuilder
.newBuilder(TABLE_NO_REP
).build();
73 private static WALFactory FACTORY
;
/**
 * SyncReplicationPeerInfoProvider used by this test. Its getPeerIdAndRemoteWALDir reports
 * PEER_ID and REMOTE_WAL_DIR only for TABLE; setUpBeforeClass installs an instance on the
 * WAL provider.
 */
75 public static final class InfoProvider
implements SyncReplicationPeerInfoProvider
{
78 public Optional
<Pair
<String
, String
>> getPeerIdAndRemoteWALDir(TableName table
) {
79 if (table
!= null && table
.equals(TABLE
)) {
80 return Optional
.of(Pair
.newPair(PEER_ID
, REMOTE_WAL_DIR
));
82 return Optional
.empty();
// Signature of checkState: takes a table name and a BiPredicate over two
// SyncReplicationState values and returns a boolean.
// NOTE(review): the method body is not present in this view, so what it returns
// cannot be documented here - confirm against the full file.
87 public boolean checkState(TableName table
,
88 BiPredicate
<SyncReplicationState
, SyncReplicationState
> checker
) {
94 public static void setUpBeforeClass() throws Exception
{
95 UTIL
.startMiniDFSCluster(3);
96 FACTORY
= new WALFactory(UTIL
.getConfiguration(), "test");
97 ((SyncReplicationWALProvider
) FACTORY
.getWALProvider()).setPeerInfoProvider(new InfoProvider());
98 UTIL
.getTestFileSystem().mkdirs(new Path(REMOTE_WAL_DIR
, PEER_ID
));
// One-time fixture teardown: shuts down the mini DFS cluster held by UTIL.
// NOTE(review): a line between the signature and the shutdown call appears to be
// missing from this view - confirm whether other cleanup happens first.
102 public static void tearDownAfterClass() throws IOException
{
104 UTIL
.shutdownMiniDFSCluster();
/**
 * Write/read round trip for the given DualAsyncFSWAL: writes records through
 * ProtobufLogTestHelper.doWrite, reads them back from the local WAL file and from the file
 * of the same name under REMOTE_WAL_DIR/PEER_ID, waits up to 5000 ms until the DFS reports
 * both files closed, then reads both files again.
 *
 * NOTE(review): several continuation lines (trailing call arguments, closing braces, and
 * possibly statements between the first reads and the wait) are not present in this view,
 * so the full argument lists and exact statement sequence must be confirmed in the full file.
 */
107 private void testReadWrite(DualAsyncFSWAL wal
) throws Exception
{
108 int recordCount
= 100;
109 int columnCount
= 10;
110 byte[] row
= Bytes
.toBytes("testRow");
111 long timestamp
= System
.currentTimeMillis();
112 MultiVersionConcurrencyControl mvcc
= new MultiVersionConcurrencyControl();
// Write the test records into the WAL for REGION/TABLE (argument list continues beyond this view).
113 ProtobufLogTestHelper
.doWrite(wal
, REGION
, TABLE
, columnCount
, recordCount
, row
, timestamp
,
// The remote copy is expected under REMOTE_WAL_DIR/PEER_ID with the same file name as the local WAL.
115 Path localFile
= wal
.getCurrentFileName();
116 Path remoteFile
= new Path(REMOTE_WAL_DIR
+ "/" + PEER_ID
, localFile
.getName());
// First pass: read the local file; the boolean passed to doRead is false here
// (presumably a "file has trailer" flag - confirm against ProtobufLogTestHelper.doRead).
117 try (ProtobufLogReader reader
=
118 (ProtobufLogReader
) FACTORY
.createReader(UTIL
.getTestFileSystem(), localFile
)) {
119 ProtobufLogTestHelper
.doRead(reader
, false, REGION
, TABLE
, columnCount
, recordCount
, row
,
// First pass: same read against the remote file, again with false.
122 try (ProtobufLogReader reader
=
123 (ProtobufLogReader
) FACTORY
.createReader(UTIL
.getTestFileSystem(), remoteFile
)) {
124 ProtobufLogTestHelper
.doRead(reader
, false, REGION
, TABLE
, columnCount
, recordCount
, row
,
// Wait (timeout value 5000) until the DFS reports both the local and the remote file closed.
128 DistributedFileSystem dfs
= (DistributedFileSystem
) UTIL
.getDFSCluster().getFileSystem();
129 UTIL
.waitFor(5000, new ExplainingPredicate
<Exception
>() {
// Condition: both files are closed.
132 public boolean evaluate() throws Exception
{
133 return dfs
.isFileClosed(localFile
) && dfs
.isFileClosed(remoteFile
);
// Failure message: names whichever of the two files is still open.
137 public String
explainFailure() throws Exception
{
138 StringBuilder sb
= new StringBuilder();
139 if (!dfs
.isFileClosed(localFile
)) {
140 sb
.append(localFile
+ " has not been closed yet.");
142 if (!dfs
.isFileClosed(remoteFile
)) {
143 sb
.append(remoteFile
+ " has not been closed yet.");
145 return sb
.toString();
// Second pass: read the local file again, this time passing true to doRead.
148 try (ProtobufLogReader reader
=
149 (ProtobufLogReader
) FACTORY
.createReader(UTIL
.getTestFileSystem(), localFile
)) {
150 ProtobufLogTestHelper
.doRead(reader
, true, REGION
, TABLE
, columnCount
, recordCount
, row
,
// Second pass: read the remote file again, also with true.
153 try (ProtobufLogReader reader
=
154 (ProtobufLogReader
) FACTORY
.createReader(UTIL
.getTestFileSystem(), remoteFile
)) {
155 ProtobufLogTestHelper
.doRead(reader
, true, REGION
, TABLE
, columnCount
, recordCount
, row
,
/**
 * Checks which WAL implementation the factory hands out per region and how the WAL count
 * changes after a peer state change: the WAL for REGION_NO_REP must not be a DualAsyncFSWAL,
 * the WAL for REGION is cast to DualAsyncFSWAL, and the factory is expected to hold 2 WALs
 * before and 1 WAL after the ACTIVE to DOWNGRADE_ACTIVE notification for PEER_ID.
 *
 * NOTE(review): at least one line of this method (between the first size assertion and the
 * provider lookup) and the method's end are not present in this view - confirm in the full
 * file.
 */
161 public void test() throws Exception
{
// The region of the non-replicated table must get something other than a DualAsyncFSWAL.
162 WAL walNoRep
= FACTORY
.getWAL(REGION_NO_REP
);
163 assertThat(walNoRep
, not(instanceOf(DualAsyncFSWAL
.class)));
// The cast doubles as a check that the replicated region gets a DualAsyncFSWAL.
164 DualAsyncFSWAL wal
= (DualAsyncFSWAL
) FACTORY
.getWAL(REGION
);
165 assertEquals(2, FACTORY
.getWALs().size());
// Notify the provider of the ACTIVE -> DOWNGRADE_ACTIVE change for PEER_ID; the meaning of
// the trailing argument 1 is not visible here (presumably a stage number - confirm).
167 SyncReplicationWALProvider walProvider
= (SyncReplicationWALProvider
) FACTORY
.getWALProvider();
168 walProvider
.peerSyncReplicationStateChange(PEER_ID
, SyncReplicationState
.ACTIVE
,
169 SyncReplicationState
.DOWNGRADE_ACTIVE
, 1);
// After the state change only one WAL is expected to remain registered.
170 assertEquals(1, FACTORY
.getWALs().size());