HBASE-23949 refactor loadBalancer implements for rsgroup balance by table to achieve...
[hbase.git] / hbase-server / src / main / java / org / apache / hadoop / hbase / replication / HBaseReplicationEndpoint.java
blob1ca70ad85dd3c33c1352f500508408f90dc68c33
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org.apache.hadoop.hbase.replication;
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.List;
25 import java.util.UUID;
27 import org.apache.hadoop.hbase.zookeeper.ZKListener;
28 import org.apache.yetus.audience.InterfaceAudience;
29 import org.apache.hadoop.hbase.Abortable;
30 import org.apache.hadoop.hbase.ServerName;
31 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
32 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
33 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
34 import org.apache.zookeeper.KeeperException;
35 import org.apache.zookeeper.KeeperException.AuthFailedException;
36 import org.apache.zookeeper.KeeperException.ConnectionLossException;
37 import org.apache.zookeeper.KeeperException.SessionExpiredException;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
41 /**
42 * A {@link BaseReplicationEndpoint} for replication endpoints whose
43 * target cluster is an HBase cluster.
45 @InterfaceAudience.Private
46 public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
47 implements Abortable {
49 private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class);
51 private ZKWatcher zkw = null;
53 private List<ServerName> regionServers = new ArrayList<>(0);
54 private long lastRegionServerUpdate;
56 protected synchronized void disconnect() {
57 if (zkw != null) {
58 zkw.close();
62 /**
63 * A private method used to re-establish a zookeeper session with a peer cluster.
64 * @param ke
66 protected void reconnect(KeeperException ke) {
67 if (ke instanceof ConnectionLossException || ke instanceof SessionExpiredException
68 || ke instanceof AuthFailedException) {
69 String clusterKey = ctx.getPeerConfig().getClusterKey();
70 LOG.warn("Lost the ZooKeeper connection for peer " + clusterKey, ke);
71 try {
72 reloadZkWatcher();
73 } catch (IOException io) {
74 LOG.warn("Creation of ZookeeperWatcher failed for peer " + clusterKey, io);
79 @Override
80 public void start() {
81 startAsync();
84 @Override
85 public void stop() {
86 stopAsync();
89 @Override
90 protected void doStart() {
91 try {
92 reloadZkWatcher();
93 notifyStarted();
94 } catch (IOException e) {
95 notifyFailed(e);
99 @Override
100 protected void doStop() {
101 disconnect();
102 notifyStopped();
105 @Override
106 // Synchronize peer cluster connection attempts to avoid races and rate
107 // limit connections when multiple replication sources try to connect to
108 // the peer cluster. If the peer cluster is down we can get out of control
109 // over time.
110 public synchronized UUID getPeerUUID() {
111 UUID peerUUID = null;
112 try {
113 peerUUID = ZKClusterId.getUUIDForCluster(zkw);
114 } catch (KeeperException ke) {
115 reconnect(ke);
117 return peerUUID;
121 * Get the ZK connection to this peer
122 * @return zk connection
124 protected synchronized ZKWatcher getZkw() {
125 return zkw;
129 * Closes the current ZKW (if not null) and creates a new one
130 * @throws IOException If anything goes wrong connecting
132 synchronized void reloadZkWatcher() throws IOException {
133 if (zkw != null) zkw.close();
134 zkw = new ZKWatcher(ctx.getConfiguration(),
135 "connection to cluster: " + ctx.getPeerId(), this);
136 getZkw().registerListener(new PeerRegionServerListener(this));
139 @Override
140 public void abort(String why, Throwable e) {
141 LOG.error("The HBaseReplicationEndpoint corresponding to peer " + ctx.getPeerId()
142 + " was aborted for the following reason(s):" + why, e);
145 @Override
146 public boolean isAborted() {
147 // Currently this is never "Aborted", we just log when the abort method is called.
148 return false;
152 * Get the list of all the region servers from the specified peer
153 * @param zkw zk connection to use
154 * @return list of region server addresses or an empty list if the slave is unavailable
156 protected static List<ServerName> fetchSlavesAddresses(ZKWatcher zkw)
157 throws KeeperException {
158 List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkw,
159 zkw.getZNodePaths().rsZNode);
160 if (children == null) {
161 return Collections.emptyList();
163 List<ServerName> addresses = new ArrayList<>(children.size());
164 for (String child : children) {
165 addresses.add(ServerName.parseServerName(child));
167 return addresses;
171 * Get a list of all the addresses of all the region servers
172 * for this peer cluster
173 * @return list of addresses
175 // Synchronize peer cluster connection attempts to avoid races and rate
176 // limit connections when multiple replication sources try to connect to
177 // the peer cluster. If the peer cluster is down we can get out of control
178 // over time.
179 public synchronized List<ServerName> getRegionServers() {
180 try {
181 setRegionServers(fetchSlavesAddresses(this.getZkw()));
182 } catch (KeeperException ke) {
183 if (LOG.isDebugEnabled()) {
184 LOG.debug("Fetch slaves addresses failed", ke);
186 reconnect(ke);
188 return regionServers;
192 * Set the list of region servers for that peer
193 * @param regionServers list of addresses for the region servers
195 public synchronized void setRegionServers(List<ServerName> regionServers) {
196 this.regionServers = regionServers;
197 lastRegionServerUpdate = System.currentTimeMillis();
201 * Get the timestamp at which the last change occurred to the list of region servers to replicate
202 * to.
203 * @return The System.currentTimeMillis at the last time the list of peer region servers changed.
205 public long getLastRegionServerUpdate() {
206 return lastRegionServerUpdate;
210 * Tracks changes to the list of region servers in a peer's cluster.
212 public static class PeerRegionServerListener extends ZKListener {
214 private final HBaseReplicationEndpoint replicationEndpoint;
215 private final String regionServerListNode;
/**
 * Creates a listener bound to the given endpoint's current ZooKeeper watcher.
 * The region server znode path is read from that watcher once, at construction time.
 * @param replicationPeer the endpoint whose region server list this listener updates
 */
public PeerRegionServerListener(HBaseReplicationEndpoint replicationPeer) {
  super(replicationPeer.getZkw());
  this.replicationEndpoint = replicationPeer;
  this.regionServerListNode = replicationPeer.getZkw().getZNodePaths().rsZNode;
}
223 @Override
224 public synchronized void nodeChildrenChanged(String path) {
225 if (path.equals(regionServerListNode)) {
226 try {
227 LOG.info("Detected change to peer region servers, fetching updated list");
228 replicationEndpoint.setRegionServers(fetchSlavesAddresses(replicationEndpoint.getZkw()));
229 } catch (KeeperException e) {
230 LOG.error("Error reading slave addresses", e);