/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.replication
;
20 import static org
.junit
.Assert
.assertFalse
;
21 import static org
.junit
.Assert
.assertTrue
;
23 import java
.io
.IOException
;
24 import java
.util
.List
;
25 import java
.util
.stream
.Collectors
;
26 import org
.apache
.hadoop
.fs
.Path
;
27 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
28 import org
.apache
.hadoop
.hbase
.ServerName
;
29 import org
.apache
.hadoop
.hbase
.master
.MasterFileSystem
;
30 import org
.apache
.hadoop
.hbase
.master
.ServerManager
;
31 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
32 import org
.apache
.hadoop
.hbase
.testclassification
.ReplicationTests
;
33 import org
.apache
.hadoop
.hbase
.util
.JVMClusterUtil
;
34 import org
.junit
.ClassRule
;
35 import org
.junit
.Test
;
36 import org
.junit
.experimental
.categories
.Category
;
37 import org
.slf4j
.Logger
;
38 import org
.slf4j
.LoggerFactory
;
40 @Category({ ReplicationTests
.class, LargeTests
.class })
41 public class TestSyncReplicationStandbyKillRS
extends SyncReplicationTestBase
{
43 private static final Logger LOG
=
44 LoggerFactory
.getLogger(TestSyncReplicationStandbyKillRS
.class);
46 private final long SLEEP_TIME
= 1000;
48 private final int COUNT
= 1000;
51 public static final HBaseClassTestRule CLASS_RULE
=
52 HBaseClassTestRule
.forClass(TestSyncReplicationStandbyKillRS
.class);
55 public void testStandbyKillRegionServer() throws Exception
{
56 MasterFileSystem mfs
= UTIL2
.getHBaseCluster().getMaster().getMasterFileSystem();
57 Path remoteWALDir
= getRemoteWALDir(mfs
, PEER_ID
);
58 assertFalse(mfs
.getWALFileSystem().exists(remoteWALDir
));
59 UTIL2
.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID
,
60 SyncReplicationState
.STANDBY
);
61 assertTrue(mfs
.getWALFileSystem().exists(remoteWALDir
));
62 UTIL1
.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID
,
63 SyncReplicationState
.ACTIVE
);
65 // Disable async replication and write data, then shutdown
66 UTIL1
.getAdmin().disableReplicationPeer(PEER_ID
);
67 write(UTIL1
, 0, COUNT
);
68 UTIL1
.shutdownMiniCluster();
70 JVMClusterUtil
.MasterThread activeMaster
= UTIL2
.getMiniHBaseCluster().getMasterThread();
71 String threadName
= "RegionServer-Restarter";
72 Thread t
= new Thread(() -> {
74 List
<JVMClusterUtil
.RegionServerThread
> regionServers
=
75 UTIL2
.getMiniHBaseCluster().getLiveRegionServerThreads();
76 LOG
.debug("Going to stop {} RSes: [{}]", regionServers
.size(),
77 regionServers
.stream().map(rst
-> rst
.getRegionServer().getServerName().getServerName())
78 .collect(Collectors
.joining(", ")));
79 for (JVMClusterUtil
.RegionServerThread rst
: regionServers
) {
80 ServerName serverName
= rst
.getRegionServer().getServerName();
81 LOG
.debug("Going to RS stop [{}]", serverName
);
82 rst
.getRegionServer().stop("Stop RS for test");
83 waitForRSShutdownToStartAndFinish(activeMaster
, serverName
);
84 LOG
.debug("Going to start a new RS");
85 JVMClusterUtil
.RegionServerThread restarted
=
86 UTIL2
.getMiniHBaseCluster().startRegionServer();
87 LOG
.debug("Waiting RS [{}] to online", restarted
.getRegionServer().getServerName());
88 restarted
.waitForServerOnline();
89 LOG
.debug("Waiting the old RS {} thread to quit", rst
.getName());
91 LOG
.debug("Done stop RS [{}] and restart [{}]", serverName
,
92 restarted
.getRegionServer().getServerName());
94 LOG
.debug("All RSes restarted");
95 } catch (Exception e
) {
96 LOG
.error("Failed to kill RS", e
);
101 LOG
.debug("Going to transit peer {} to {} state", PEER_ID
,
102 SyncReplicationState
.DOWNGRADE_ACTIVE
);
103 // Transit standby to DA to replay logs
105 UTIL2
.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID
,
106 SyncReplicationState
.DOWNGRADE_ACTIVE
);
107 } catch (Exception e
) {
108 LOG
.error("Failed to transit standby cluster to " + SyncReplicationState
.DOWNGRADE_ACTIVE
, e
);
111 LOG
.debug("Waiting for the restarter thread {} to quit", threadName
);
114 while (UTIL2
.getAdmin()
115 .getReplicationPeerSyncReplicationState(PEER_ID
) != SyncReplicationState
.DOWNGRADE_ACTIVE
) {
116 LOG
.debug("Waiting for peer {} to be in {} state", PEER_ID
,
117 SyncReplicationState
.DOWNGRADE_ACTIVE
);
118 Thread
.sleep(SLEEP_TIME
);
120 LOG
.debug("Going to verify the result, {} records expected", COUNT
);
121 verify(UTIL2
, 0, COUNT
);
122 LOG
.debug("Verification successfully done");
125 private void waitForRSShutdownToStartAndFinish(JVMClusterUtil
.MasterThread activeMaster
,
126 ServerName serverName
) throws InterruptedException
, IOException
{
127 ServerManager sm
= activeMaster
.getMaster().getServerManager();
128 // First wait for it to be in dead list
129 while (!sm
.getDeadServers().isDeadServer(serverName
)) {
130 LOG
.debug("Waiting for {} to be listed as dead in master", serverName
);
131 Thread
.sleep(SLEEP_TIME
);
133 LOG
.debug("Server {} marked as dead, waiting for it to finish dead processing", serverName
);
134 while (sm
.areDeadServersInProgress()) {
135 LOG
.debug("Server {} still being processed, waiting", serverName
);
136 Thread
.sleep(SLEEP_TIME
);
138 LOG
.debug("Server {} done with server shutdown processing", serverName
);