2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.replication
;
20 import java
.io
.IOException
;
21 import java
.util
.ArrayList
;
22 import java
.util
.List
;
23 import org
.apache
.hadoop
.fs
.Path
;
24 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
25 import org
.apache
.hadoop
.hbase
.Waiter
;
26 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
27 import org
.apache
.hadoop
.hbase
.regionserver
.HRegionServer
;
28 import org
.apache
.hadoop
.hbase
.regionserver
.wal
.AbstractFSWAL
;
29 import org
.apache
.hadoop
.hbase
.replication
.regionserver
.Replication
;
30 import org
.apache
.hadoop
.hbase
.replication
.regionserver
.ReplicationSource
;
31 import org
.apache
.hadoop
.hbase
.replication
.regionserver
.ReplicationSourceInterface
;
32 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
33 import org
.apache
.hadoop
.hbase
.testclassification
.ReplicationTests
;
34 import org
.apache
.hadoop
.hbase
.wal
.AbstractFSWALProvider
;
35 import org
.apache
.hadoop
.hbase
.wal
.WAL
;
36 import org
.junit
.Before
;
37 import org
.junit
.ClassRule
;
38 import org
.junit
.Test
;
39 import org
.junit
.experimental
.categories
.Category
;
41 @Category({ ReplicationTests
.class, LargeTests
.class })
42 public class TestReplicationEmptyWALRecovery
extends TestReplicationBase
{
45 public static final HBaseClassTestRule CLASS_RULE
=
46 HBaseClassTestRule
.forClass(TestReplicationEmptyWALRecovery
.class);
49 public void setUp() throws IOException
, InterruptedException
{
54 * Waits until there is only one log(the current writing one) in the replication queue
55 * @param numRs number of regionservers
57 private void waitForLogAdvance(int numRs
) throws Exception
{
58 Waiter
.waitFor(CONF1
, 10000, new Waiter
.Predicate
<Exception
>() {
60 public boolean evaluate() throws Exception
{
61 for (int i
= 0; i
< numRs
; i
++) {
62 HRegionServer hrs
= UTIL1
.getHBaseCluster().getRegionServer(i
);
63 RegionInfo regionInfo
=
64 UTIL1
.getHBaseCluster().getRegions(htable1
.getName()).get(0).getRegionInfo();
65 WAL wal
= hrs
.getWAL(regionInfo
);
66 Path currentFile
= ((AbstractFSWAL
<?
>) wal
).getCurrentFileName();
67 Replication replicationService
= (Replication
) UTIL1
.getHBaseCluster()
68 .getRegionServer(i
).getReplicationSourceService();
69 for (ReplicationSourceInterface rsi
: replicationService
.getReplicationManager()
71 ReplicationSource source
= (ReplicationSource
) rsi
;
72 if (!currentFile
.equals(source
.getCurrentPath())) {
83 public void testEmptyWALRecovery() throws Exception
{
84 final int numRs
= UTIL1
.getHBaseCluster().getRegionServerThreads().size();
86 // for each RS, create an empty wal with same walGroupId
87 final List
<Path
> emptyWalPaths
= new ArrayList
<>();
88 long ts
= System
.currentTimeMillis();
89 for (int i
= 0; i
< numRs
; i
++) {
90 RegionInfo regionInfo
=
91 UTIL1
.getHBaseCluster().getRegions(htable1
.getName()).get(0).getRegionInfo();
92 WAL wal
= UTIL1
.getHBaseCluster().getRegionServer(i
).getWAL(regionInfo
);
93 Path currentWalPath
= AbstractFSWALProvider
.getCurrentFileName(wal
);
94 String walGroupId
= AbstractFSWALProvider
.getWALPrefixFromWALName(currentWalPath
.getName());
95 Path emptyWalPath
= new Path(UTIL1
.getDataTestDir(), walGroupId
+ "." + ts
);
96 UTIL1
.getTestFileSystem().create(emptyWalPath
).close();
97 emptyWalPaths
.add(emptyWalPath
);
100 // inject our empty wal into the replication queue, and then roll the original wal, which
101 // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
102 // determine if the file being replicated currently is still opened for write, so just inject a
103 // new wal to the replication queue does not mean the previous file is closed.
104 for (int i
= 0; i
< numRs
; i
++) {
105 HRegionServer hrs
= UTIL1
.getHBaseCluster().getRegionServer(i
);
106 Replication replicationService
= (Replication
) hrs
.getReplicationSourceService();
107 replicationService
.getReplicationManager().preLogRoll(emptyWalPaths
.get(i
));
108 replicationService
.getReplicationManager().postLogRoll(emptyWalPaths
.get(i
));
109 RegionInfo regionInfo
=
110 UTIL1
.getHBaseCluster().getRegions(htable1
.getName()).get(0).getRegionInfo();
111 WAL wal
= hrs
.getWAL(regionInfo
);
112 wal
.rollWriter(true);
115 // ReplicationSource should advance past the empty wal, or else the test will fail
116 waitForLogAdvance(numRs
);
118 // we're now writing to the new wal
119 // if everything works, the source should've stopped reading from the empty wal, and start
120 // replicating from the new wal
121 runSimplePutDeleteTest();