2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.replication
;
21 import java
.io
.IOException
;
22 import java
.util
.ArrayList
;
23 import java
.util
.Collections
;
24 import java
.util
.List
;
25 import java
.util
.UUID
;
27 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKListener
;
28 import org
.apache
.yetus
.audience
.InterfaceAudience
;
29 import org
.apache
.hadoop
.hbase
.Abortable
;
30 import org
.apache
.hadoop
.hbase
.ServerName
;
31 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKClusterId
;
32 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKUtil
;
33 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKWatcher
;
34 import org
.apache
.zookeeper
.KeeperException
;
35 import org
.apache
.zookeeper
.KeeperException
.AuthFailedException
;
36 import org
.apache
.zookeeper
.KeeperException
.ConnectionLossException
;
37 import org
.apache
.zookeeper
.KeeperException
.SessionExpiredException
;
38 import org
.slf4j
.Logger
;
39 import org
.slf4j
.LoggerFactory
;
42 * A {@link BaseReplicationEndpoint} for replication endpoints whose
43 * target cluster is an HBase cluster.
45 @InterfaceAudience.Private
46 public abstract class HBaseReplicationEndpoint
extends BaseReplicationEndpoint
47 implements Abortable
{
// SLF4J logger shared by all instances of this endpoint class.
49 private static final Logger LOG
= LoggerFactory
.getLogger(HBaseReplicationEndpoint
.class);
// ZooKeeper watcher for the peer cluster. Starts out null and is replaced by
// reloadZkWatcher(); getPeerUUID() also reads it directly.
51 private ZKWatcher zkw
= null;
// Most recently fetched list of peer region servers. Starts as an empty list, is replaced
// wholesale by setRegionServers(...), and is the value returned by getRegionServers().
53 private List
<ServerName
> regionServers
= new ArrayList
<>(0);
// System.currentTimeMillis() recorded by the most recent setRegionServers(...) call
// (0 until the first call).
// NOTE(review): written under the instance lock but not volatile; see getLastRegionServerUpdate().
54 private long lastRegionServerUpdate
;
// Synchronized on this endpoint instance.
// NOTE(review): the method body is not present in this copy of the file; presumably it
// closes zkw when it is non-null — confirm against the full source.
56 protected synchronized void disconnect() {
63 * A private method used to re-establish a zookeeper session with a peer cluster.
// Reacts to a KeeperException raised while talking to the peer cluster's ZooKeeper.
// The visible branch handles only ConnectionLossException, SessionExpiredException and
// AuthFailedException: it logs a WARN that includes the peer's cluster key.
// NOTE(review): the description above calls this method private, but it is declared protected.
66 protected void reconnect(KeeperException ke
) {
67 if (ke
instanceof ConnectionLossException
|| ke
instanceof SessionExpiredException
68 || ke
instanceof AuthFailedException
) {
// Cluster key is captured once so both warning messages identify the same peer.
69 String clusterKey
= ctx
.getPeerConfig().getClusterKey();
70 LOG
.warn("Lost the ZooKeeper connection for peer " + clusterKey
, ke
);
// NOTE(review): the try block that pairs with this catch is not present in this copy;
// presumably it re-creates the watcher (reloadZkWatcher() is the visible method that throws
// IOException) — confirm. The visible catch body only logs the IOException.
73 } catch (IOException io
) {
74 LOG
.warn("Creation of ZookeeperWatcher failed for peer " + clusterKey
, io
);
// NOTE(review): only the signature and an IOException catch clause of doStart() are present
// in this copy; the rest of the body is missing, so its behavior must be confirmed against
// the full source.
90 protected void doStart() {
94 } catch (IOException e
) {
// NOTE(review): only the signature of doStop() is present in this copy; its body is missing,
// so its behavior must be confirmed against the full source.
100 protected void doStop() {
106 // Synchronize peer cluster connection attempts to avoid races and rate
107 // limit connections when multiple replication sources try to connect to
108 // the peer cluster. If the peer cluster is down we can get out of control
// Looks up the peer cluster's UUID with ZKClusterId.getUUIDForCluster using the current zkw,
// while holding this endpoint's lock. The local result starts out null, and a KeeperException
// from the lookup is caught.
// NOTE(review): the try line, the catch body and the return statement are not present in this
// copy, so the value returned on failure cannot be confirmed here.
110 public synchronized UUID
getPeerUUID() {
111 UUID peerUUID
= null;
113 peerUUID
= ZKClusterId
.getUUIDForCluster(zkw
);
114 } catch (KeeperException ke
) {
121 * Get the ZK connection to this peer
122 * @return zk connection
// Synchronized accessor used by reloadZkWatcher(), getRegionServers() and the
// PeerRegionServerListener constructor.
// NOTE(review): the body is not present in this copy; presumably it returns zkw — confirm.
124 protected synchronized ZKWatcher
getZkw() {
129 * Closes the current ZKW (if not null) and creates a new one
130 * @throws IOException If anything goes wrong connecting
// Package-private and synchronized on this endpoint. Steps:
// 1. Close the existing watcher, if any.
// 2. Build a new ZKWatcher from the peer context's Configuration, labeled with the peer id.
//    This endpoint (an Abortable) is passed as the last argument.
// 3. Register a fresh PeerRegionServerListener on the new watcher.
132 synchronized void reloadZkWatcher() throws IOException
{
// Close the previous watcher (if any) before replacing it.
133 if (zkw
!= null) zkw
.close();
134 zkw
= new ZKWatcher(ctx
.getConfiguration(),
135 "connection to cluster: " + ctx
.getPeerId(), this);
// The listener's constructor binds itself to getZkw(), so it must be created only after
// zkw has been replaced.
136 getZkw().registerListener(new PeerRegionServerListener(this));
// Abortable callback. The visible body only logs the reason and cause at ERROR level,
// tagged with this endpoint's peer id; isAborted() documents that the endpoint is never
// actually put into an "aborted" state.
140 public void abort(String why
, Throwable e
) {
141 LOG
.error("The HBaseReplicationEndpoint corresponding to peer " + ctx
.getPeerId()
142 + " was aborted for the following reason(s):" + why
, e
);
// Abortable status query.
// NOTE(review): the return statement is not present in this copy; per the comment below it
// presumably always returns false — confirm.
146 public boolean isAborted() {
147 // Currently this is never "Aborted", we just log when the abort method is called.
152 * Get the list of all the region servers from the specified peer
153 * @param zkw zk connection to use
154 * @return list of region server addresses or an empty list if the slave is unavailable
// Lists the children of the peer's region-server znode (zkw.getZNodePaths().rsZNode) via
// ZKUtil.listChildrenAndWatchForNewChildren; per its name, that call also leaves a watch for
// new children. Each child name is parsed with ServerName.parseServerName.
// If the listing returns null, an immutable empty list (Collections.emptyList()) is returned.
// KeeperException from the ZooKeeper read propagates to the caller.
156 protected static List
<ServerName
> fetchSlavesAddresses(ZKWatcher zkw
)
157 throws KeeperException
{
158 List
<String
> children
= ZKUtil
.listChildrenAndWatchForNewChildren(zkw
,
159 zkw
.getZNodePaths().rsZNode
);
160 if (children
== null) {
161 return Collections
.emptyList();
// Pre-sized to the child count: one ServerName per child znode.
163 List
<ServerName
> addresses
= new ArrayList
<>(children
.size());
164 for (String child
: children
) {
165 addresses
.add(ServerName
.parseServerName(child
));
// NOTE(review): the closing lines and the final return are not present in this copy;
// presumably the method returns addresses — confirm.
171 * Get a list of all the addresses of all the region servers
172 * for this peer cluster
173 * @return list of addresses
175 // Synchronize peer cluster connection attempts to avoid races and rate
176 // limit connections when multiple replication sources try to connect to
177 // the peer cluster. If the peer cluster is down we can get out of control
// Refreshes the cached list from the peer's ZooKeeper (fetchSlavesAddresses, then
// setRegionServers), then returns the cached field.
// A KeeperException is logged at DEBUG; since setRegionServers is then skipped, presumably
// the previously cached list is returned.
// NOTE(review): this returns the internal list reference itself, not a copy, so callers can
// mutate the cached list.
179 public synchronized List
<ServerName
> getRegionServers() {
181 setRegionServers(fetchSlavesAddresses(this.getZkw()));
182 } catch (KeeperException ke
) {
183 if (LOG
.isDebugEnabled()) {
184 LOG
.debug("Fetch slaves addresses failed", ke
);
// NOTE(review): some lines between the debug log and the return are not present in this copy;
// confirm whether the catch block does anything beyond logging.
188 return regionServers
;
192 * Set the list of region servers for that peer
193 * @param regionServers list of addresses for the region servers
// Synchronized on this endpoint.
// Stores the caller's list as-is (no defensive copy) and records System.currentTimeMillis()
// as the update time. The timestamp is refreshed on every call, even if the new list equals
// the old one.
195 public synchronized void setRegionServers(List
<ServerName
> regionServers
) {
196 this.regionServers
= regionServers
;
197 lastRegionServerUpdate
= System
.currentTimeMillis();
201 * Get the timestamp at which the last change occurred to the list of region servers to replicate
203 * @return The System.currentTimeMillis at the last time the list of peer region servers changed.
// Returns the timestamp written by setRegionServers(...), or 0 if it was never called.
// That timestamp is recorded on every set, not only when the list content actually changes.
// NOTE(review): setRegionServers(...) writes this field under the instance lock, but this
// getter is not synchronized and the field is not volatile. A reader on another thread may
// therefore see a stale value, and JLS 17.7 even permits a torn read of a non-volatile long.
// Consider making this method synchronized.
205 public long getLastRegionServerUpdate() {
206 return lastRegionServerUpdate
;
210 * Tracks changes to the list of region servers in a peer's cluster.
212 public static class PeerRegionServerListener
extends ZKListener
{
// Endpoint whose cached region-server list this listener refreshes.
214 private final HBaseReplicationEndpoint replicationEndpoint
;
// Path of the peer's region-server znode, captured once at construction time from the
// endpoint's watcher. Only child-change events for this path trigger a refresh.
215 private final String regionServerListNode
;
// Binds this listener to the endpoint's current ZKWatcher (via getZkw()) and records the
// region-server znode path from that watcher's ZNodePaths.
217 public PeerRegionServerListener(HBaseReplicationEndpoint replicationPeer
) {
218 super(replicationPeer
.getZkw());
219 this.replicationEndpoint
= replicationPeer
;
220 this.regionServerListNode
= replicationEndpoint
.getZkw().getZNodePaths().rsZNode
;
224 public synchronized void nodeChildrenChanged(String path
) {
225 if (path
.equals(regionServerListNode
)) {
227 LOG
.info("Detected change to peer region servers, fetching updated list");
228 replicationEndpoint
.setRegionServers(fetchSlavesAddresses(replicationEndpoint
.getZkw()));
229 } catch (KeeperException e
) {
230 LOG
.error("Error reading slave addresses", e
);