2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.replication
;
20 import static org
.junit
.Assert
.assertEquals
;
21 import static org
.junit
.Assert
.fail
;
23 import java
.io
.IOException
;
24 import java
.util
.List
;
25 import java
.util
.concurrent
.atomic
.AtomicInteger
;
26 import org
.apache
.hadoop
.conf
.Configuration
;
27 import org
.apache
.hadoop
.fs
.FileSystem
;
28 import org
.apache
.hadoop
.hbase
.ChoreService
;
29 import org
.apache
.hadoop
.hbase
.ClusterId
;
30 import org
.apache
.hadoop
.hbase
.CoordinatedStateManager
;
31 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
32 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
33 import org
.apache
.hadoop
.hbase
.HConstants
;
34 import org
.apache
.hadoop
.hbase
.Server
;
35 import org
.apache
.hadoop
.hbase
.ServerName
;
36 import org
.apache
.hadoop
.hbase
.client
.AsyncClusterConnection
;
37 import org
.apache
.hadoop
.hbase
.client
.Connection
;
38 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
39 import org
.apache
.hadoop
.hbase
.testclassification
.ReplicationTests
;
40 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKClusterId
;
41 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKUtil
;
42 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKWatcher
;
43 import org
.apache
.hadoop
.hbase
.zookeeper
.ZNodePaths
;
44 import org
.apache
.zookeeper
.KeeperException
;
45 import org
.junit
.AfterClass
;
46 import org
.junit
.Before
;
47 import org
.junit
.BeforeClass
;
48 import org
.junit
.ClassRule
;
49 import org
.junit
.Test
;
50 import org
.junit
.experimental
.categories
.Category
;
51 import org
.slf4j
.Logger
;
52 import org
.slf4j
.LoggerFactory
;
/**
 * This class tests the ReplicationTrackerZKImpl class and ReplicationListener interface. One
 * MiniZKCluster is used throughout the entire class. The cluster is initialized with the creation
 * of the rsZNode. All other znode creation/initialization is handled by the replication state
 * interfaces (i.e. ReplicationPeers, etc.). Each test case in this class should ensure that the
 * MiniZKCluster is cleaned and returned to its initial state (i.e. nothing but the rsZNode).
 */
61 @Category({ReplicationTests
.class, MediumTests
.class})
62 public class TestReplicationTrackerZKImpl
{
65 public static final HBaseClassTestRule CLASS_RULE
=
66 HBaseClassTestRule
.forClass(TestReplicationTrackerZKImpl
.class);
68 private static final Logger LOG
= LoggerFactory
.getLogger(TestReplicationTrackerZKImpl
.class);
70 private static Configuration conf
;
71 private static HBaseTestingUtility utility
;
73 // Each one of the below variables are reinitialized before every test case
74 private ZKWatcher zkw
;
75 private ReplicationPeers rp
;
76 private ReplicationTracker rt
;
77 private AtomicInteger rsRemovedCount
;
78 private String rsRemovedData
;
81 public static void setUpBeforeClass() throws Exception
{
82 utility
= new HBaseTestingUtility();
83 utility
.startMiniZKCluster();
84 conf
= utility
.getConfiguration();
85 ZKWatcher zk
= HBaseTestingUtility
.getZooKeeperWatcher(utility
);
86 ZKUtil
.createWithParents(zk
, zk
.getZNodePaths().rsZNode
);
90 public void setUp() throws Exception
{
91 zkw
= HBaseTestingUtility
.getZooKeeperWatcher(utility
);
92 String fakeRs1
= ZNodePaths
.joinZNode(zkw
.getZNodePaths().rsZNode
,
93 "hostname1.example.org:1234");
95 ZKClusterId
.setClusterId(zkw
, new ClusterId());
96 rp
= ReplicationFactory
.getReplicationPeers(zkw
, conf
);
98 rt
= ReplicationFactory
.getReplicationTracker(zkw
, new DummyServer(fakeRs1
),
99 new DummyServer(fakeRs1
));
100 } catch (Exception e
) {
101 fail("Exception during test setup: " + e
);
103 rsRemovedCount
= new AtomicInteger(0);
108 public static void tearDownAfterClass() throws Exception
{
109 utility
.shutdownMiniZKCluster();
113 public void testGetListOfRegionServers() throws Exception
{
115 assertEquals(0, rt
.getListOfRegionServers().size());
118 ZKUtil
.createWithParents(zkw
,
119 ZNodePaths
.joinZNode(zkw
.getZNodePaths().rsZNode
, "hostname1.example.org:1234"));
120 List
<String
> rss
= rt
.getListOfRegionServers();
121 assertEquals(rss
.toString(), 1, rss
.size());
124 ZKUtil
.createWithParents(zkw
,
125 ZNodePaths
.joinZNode(zkw
.getZNodePaths().rsZNode
, "hostname2.example.org:1234"));
126 rss
= rt
.getListOfRegionServers();
127 assertEquals(rss
.toString(), 2, rss
.size());
130 ZKUtil
.deleteNode(zkw
,
131 ZNodePaths
.joinZNode(zkw
.getZNodePaths().rsZNode
, "hostname2.example.org:1234"));
132 rss
= rt
.getListOfRegionServers();
133 assertEquals(1, rss
.size());
136 ZKUtil
.deleteNode(zkw
,
137 ZNodePaths
.joinZNode(zkw
.getZNodePaths().rsZNode
, "hostname1.example.org:1234"));
138 rss
= rt
.getListOfRegionServers();
139 assertEquals(rss
.toString(), 0, rss
.size());
143 public void testRegionServerRemovedEvent() throws Exception
{
144 ZKUtil
.createAndWatch(zkw
,
145 ZNodePaths
.joinZNode(zkw
.getZNodePaths().rsZNode
, "hostname2.example.org:1234"),
146 HConstants
.EMPTY_BYTE_ARRAY
);
147 rt
.registerListener(new DummyReplicationListener());
149 ZKUtil
.deleteNode(zkw
,
150 ZNodePaths
.joinZNode(zkw
.getZNodePaths().rsZNode
, "hostname2.example.org:1234"));
152 while (rsRemovedCount
.get() < 1) {
155 assertEquals("hostname2.example.org:1234", rsRemovedData
);
159 public void testPeerNameControl() throws Exception
{
161 rp
.getPeerStorage().addPeer("6",
162 ReplicationPeerConfig
.newBuilder().setClusterKey(utility
.getClusterKey()).build(), true,
163 SyncReplicationState
.NONE
);
166 rp
.getPeerStorage().addPeer("6",
167 ReplicationPeerConfig
.newBuilder().setClusterKey(utility
.getClusterKey()).build(), true,
168 SyncReplicationState
.NONE
);
169 } catch (ReplicationException e
) {
170 if (e
.getCause() instanceof KeeperException
.NodeExistsException
) {
175 assertEquals(1, exists
);
178 rp
.getPeerStorage().removePeer("6");
181 private class DummyReplicationListener
implements ReplicationListener
{
184 public void regionServerRemoved(String regionServer
) {
185 rsRemovedData
= regionServer
;
186 rsRemovedCount
.getAndIncrement();
187 LOG
.debug("Received regionServerRemoved event: " + regionServer
);
191 private class DummyServer
implements Server
{
192 private String serverName
;
193 private boolean isAborted
= false;
194 private boolean isStopped
= false;
196 public DummyServer(String serverName
) {
197 this.serverName
= serverName
;
201 public Configuration
getConfiguration() {
206 public ZKWatcher
getZooKeeper() {
211 public CoordinatedStateManager
getCoordinatedStateManager() {
216 public Connection
getConnection() {
221 public ServerName
getServerName() {
222 return ServerName
.valueOf(this.serverName
);
226 public void abort(String why
, Throwable e
) {
227 LOG
.info("Aborting " + serverName
);
228 this.isAborted
= true;
232 public boolean isAborted() {
233 return this.isAborted
;
237 public void stop(String why
) {
238 this.isStopped
= true;
242 public boolean isStopped() {
243 return this.isStopped
;
247 public ChoreService
getChoreService() {
252 public FileSystem
getFileSystem() {
257 public boolean isStopping() {
262 public Connection
createConnection(Configuration conf
) throws IOException
{
267 public AsyncClusterConnection
getAsyncClusterConnection() {