HBASE-26481 Consider rolling upgrading from old region replication framework (#3880)
[hbase.git] / hbase-replication / src / main / java / org / apache / hadoop / hbase / replication / ReplicationUtils.java
blobe8ecec262bf641a0deca0c0b599f4596453555b1
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.replication;
20 import java.io.IOException;
21 import java.util.Collection;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Set;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.fs.FileSystem;
27 import org.apache.hadoop.fs.Path;
28 import org.apache.hadoop.hbase.CompoundConfiguration;
29 import org.apache.hadoop.hbase.HBaseConfiguration;
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.ServerName;
32 import org.apache.hadoop.hbase.TableName;
33 import org.apache.yetus.audience.InterfaceAudience;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 /**
38 * Helper class for replication.
40 @InterfaceAudience.Private
41 public final class ReplicationUtils {
43 private static final Logger LOG = LoggerFactory.getLogger(ReplicationUtils.class);
45 public static final String REPLICATION_ATTR_NAME = "__rep__";
47 public static final String REMOTE_WAL_DIR_NAME = "remoteWALs";
49 public static final String SYNC_WAL_SUFFIX = ".syncrep";
51 public static final String REMOTE_WAL_REPLAY_SUFFIX = "-replay";
53 public static final String REMOTE_WAL_SNAPSHOT_SUFFIX = "-snapshot";
55 // This is used for copying sync replication log from local to remote and overwrite the old one
56 // since some FileSystem implementation may not support atomic rename.
57 public static final String RENAME_WAL_SUFFIX = ".ren";
59 public static final String LEGACY_REGION_REPLICATION_ENDPOINT_NAME =
60 "org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint";
/** Private constructor: this class only offers static helpers and is never instantiated. */
private ReplicationUtils() {
  // intentionally empty
}
65 public static Configuration getPeerClusterConfiguration(ReplicationPeerConfig peerConfig,
66 Configuration baseConf) throws ReplicationException {
67 Configuration otherConf;
68 try {
69 otherConf = HBaseConfiguration.createClusterConf(baseConf, peerConfig.getClusterKey());
70 } catch (IOException e) {
71 throw new ReplicationException("Can't get peer configuration for peer " + peerConfig, e);
74 if (!peerConfig.getConfiguration().isEmpty()) {
75 CompoundConfiguration compound = new CompoundConfiguration();
76 compound.add(otherConf);
77 compound.addStringMap(peerConfig.getConfiguration());
78 return compound;
81 return otherConf;
84 public static void removeAllQueues(ReplicationQueueStorage queueStorage, String peerId)
85 throws ReplicationException {
86 for (ServerName replicator : queueStorage.getListOfReplicators()) {
87 List<String> queueIds = queueStorage.getAllQueues(replicator);
88 for (String queueId : queueIds) {
89 ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
90 if (queueInfo.getPeerId().equals(peerId)) {
91 queueStorage.removeQueue(replicator, queueId);
94 queueStorage.removeReplicatorIfQueueIsEmpty(replicator);
98 private static boolean isCollectionEqual(Collection<String> c1, Collection<String> c2) {
99 if (c1 == null) {
100 return c2 == null;
102 if (c2 == null) {
103 return false;
105 return c1.size() == c2.size() && c1.containsAll(c2);
108 private static boolean isNamespacesEqual(Set<String> ns1, Set<String> ns2) {
109 return isCollectionEqual(ns1, ns2);
112 private static boolean isTableCFsEqual(Map<TableName, List<String>> tableCFs1,
113 Map<TableName, List<String>> tableCFs2) {
114 if (tableCFs1 == null) {
115 return tableCFs2 == null;
117 if (tableCFs2 == null) {
118 return false;
120 if (tableCFs1.size() != tableCFs2.size()) {
121 return false;
123 for (Map.Entry<TableName, List<String>> entry1 : tableCFs1.entrySet()) {
124 TableName table = entry1.getKey();
125 if (!tableCFs2.containsKey(table)) {
126 return false;
128 List<String> cfs1 = entry1.getValue();
129 List<String> cfs2 = tableCFs2.get(table);
130 if (!isCollectionEqual(cfs1, cfs2)) {
131 return false;
134 return true;
137 public static boolean isNamespacesAndTableCFsEqual(ReplicationPeerConfig rpc1,
138 ReplicationPeerConfig rpc2) {
139 if (rpc1.replicateAllUserTables() != rpc2.replicateAllUserTables()) {
140 return false;
142 if (rpc1.replicateAllUserTables()) {
143 return isNamespacesEqual(rpc1.getExcludeNamespaces(), rpc2.getExcludeNamespaces()) &&
144 isTableCFsEqual(rpc1.getExcludeTableCFsMap(), rpc2.getExcludeTableCFsMap());
145 } else {
146 return isNamespacesEqual(rpc1.getNamespaces(), rpc2.getNamespaces()) &&
147 isTableCFsEqual(rpc1.getTableCFsMap(), rpc2.getTableCFsMap());
152 * @param c Configuration to look at
153 * @return True if replication for bulk load data is enabled.
155 public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) {
156 return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
157 HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
160 public static FileSystem getRemoteWALFileSystem(Configuration conf, String remoteWALDir)
161 throws IOException {
162 return new Path(remoteWALDir).getFileSystem(conf);
165 public static Path getPeerRemoteWALDir(String remoteWALDir, String peerId) {
166 return new Path(remoteWALDir, peerId);
169 public static Path getPeerRemoteWALDir(Path remoteWALDir, String peerId) {
170 return new Path(remoteWALDir, peerId);
173 public static Path getPeerReplayWALDir(Path remoteWALDir, String peerId) {
174 return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_REPLAY_SUFFIX);
177 public static Path getPeerSnapshotWALDir(String remoteWALDir, String peerId) {
178 return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_SNAPSHOT_SUFFIX);
181 public static Path getPeerSnapshotWALDir(Path remoteWALDir, String peerId) {
182 return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_SNAPSHOT_SUFFIX);
186 * Do the sleeping logic
187 * @param msg Why we sleep
188 * @param sleepForRetries the base sleep time.
189 * @param sleepMultiplier by how many times the default sleeping time is augmented
190 * @param maxRetriesMultiplier the max retry multiplier
191 * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
193 public static boolean sleepForRetries(String msg, long sleepForRetries, int sleepMultiplier,
194 int maxRetriesMultiplier) {
195 try {
196 LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier);
197 Thread.sleep(sleepForRetries * sleepMultiplier);
198 } catch (InterruptedException e) {
199 LOG.debug("Interrupted while sleeping between retries");
200 Thread.currentThread().interrupt();
202 return sleepMultiplier < maxRetriesMultiplier;
206 * Get the adaptive timeout value when performing a retry
208 public static int getAdaptiveTimeout(final int initialValue, final int retries) {
209 int ntries = retries;
210 if (ntries >= HConstants.RETRY_BACKOFF.length) {
211 ntries = HConstants.RETRY_BACKOFF.length - 1;
213 if (ntries < 0) {
214 ntries = 0;
216 return initialValue * HConstants.RETRY_BACKOFF[ntries];