2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.replication
;
20 import java
.io
.IOException
;
21 import java
.util
.Collection
;
22 import java
.util
.List
;
25 import org
.apache
.hadoop
.conf
.Configuration
;
26 import org
.apache
.hadoop
.fs
.FileSystem
;
27 import org
.apache
.hadoop
.fs
.Path
;
28 import org
.apache
.hadoop
.hbase
.CompoundConfiguration
;
29 import org
.apache
.hadoop
.hbase
.HBaseConfiguration
;
30 import org
.apache
.hadoop
.hbase
.HConstants
;
31 import org
.apache
.hadoop
.hbase
.ServerName
;
32 import org
.apache
.hadoop
.hbase
.TableName
;
33 import org
.apache
.yetus
.audience
.InterfaceAudience
;
34 import org
.slf4j
.Logger
;
35 import org
.slf4j
.LoggerFactory
;
38 * Helper class for replication.
40 @InterfaceAudience.Private
41 public final class ReplicationUtils
{
43 private static final Logger LOG
= LoggerFactory
.getLogger(ReplicationUtils
.class);
45 public static final String REPLICATION_ATTR_NAME
= "__rep__";
47 public static final String REMOTE_WAL_DIR_NAME
= "remoteWALs";
49 public static final String SYNC_WAL_SUFFIX
= ".syncrep";
51 public static final String REMOTE_WAL_REPLAY_SUFFIX
= "-replay";
53 public static final String REMOTE_WAL_SNAPSHOT_SUFFIX
= "-snapshot";
55 // This is used for copying sync replication log from local to remote and overwrite the old one
56 // since some FileSystem implementation may not support atomic rename.
57 public static final String RENAME_WAL_SUFFIX
= ".ren";
59 public static final String LEGACY_REGION_REPLICATION_ENDPOINT_NAME
=
60 "org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint";
/** Not instantiable: this class only exposes static helpers. */
private ReplicationUtils() {
}
65 public static Configuration
getPeerClusterConfiguration(ReplicationPeerConfig peerConfig
,
66 Configuration baseConf
) throws ReplicationException
{
67 Configuration otherConf
;
69 otherConf
= HBaseConfiguration
.createClusterConf(baseConf
, peerConfig
.getClusterKey());
70 } catch (IOException e
) {
71 throw new ReplicationException("Can't get peer configuration for peer " + peerConfig
, e
);
74 if (!peerConfig
.getConfiguration().isEmpty()) {
75 CompoundConfiguration compound
= new CompoundConfiguration();
76 compound
.add(otherConf
);
77 compound
.addStringMap(peerConfig
.getConfiguration());
84 public static void removeAllQueues(ReplicationQueueStorage queueStorage
, String peerId
)
85 throws ReplicationException
{
86 for (ServerName replicator
: queueStorage
.getListOfReplicators()) {
87 List
<String
> queueIds
= queueStorage
.getAllQueues(replicator
);
88 for (String queueId
: queueIds
) {
89 ReplicationQueueInfo queueInfo
= new ReplicationQueueInfo(queueId
);
90 if (queueInfo
.getPeerId().equals(peerId
)) {
91 queueStorage
.removeQueue(replicator
, queueId
);
94 queueStorage
.removeReplicatorIfQueueIsEmpty(replicator
);
98 private static boolean isCollectionEqual(Collection
<String
> c1
, Collection
<String
> c2
) {
105 return c1
.size() == c2
.size() && c1
.containsAll(c2
);
108 private static boolean isNamespacesEqual(Set
<String
> ns1
, Set
<String
> ns2
) {
109 return isCollectionEqual(ns1
, ns2
);
112 private static boolean isTableCFsEqual(Map
<TableName
, List
<String
>> tableCFs1
,
113 Map
<TableName
, List
<String
>> tableCFs2
) {
114 if (tableCFs1
== null) {
115 return tableCFs2
== null;
117 if (tableCFs2
== null) {
120 if (tableCFs1
.size() != tableCFs2
.size()) {
123 for (Map
.Entry
<TableName
, List
<String
>> entry1
: tableCFs1
.entrySet()) {
124 TableName table
= entry1
.getKey();
125 if (!tableCFs2
.containsKey(table
)) {
128 List
<String
> cfs1
= entry1
.getValue();
129 List
<String
> cfs2
= tableCFs2
.get(table
);
130 if (!isCollectionEqual(cfs1
, cfs2
)) {
137 public static boolean isNamespacesAndTableCFsEqual(ReplicationPeerConfig rpc1
,
138 ReplicationPeerConfig rpc2
) {
139 if (rpc1
.replicateAllUserTables() != rpc2
.replicateAllUserTables()) {
142 if (rpc1
.replicateAllUserTables()) {
143 return isNamespacesEqual(rpc1
.getExcludeNamespaces(), rpc2
.getExcludeNamespaces()) &&
144 isTableCFsEqual(rpc1
.getExcludeTableCFsMap(), rpc2
.getExcludeTableCFsMap());
146 return isNamespacesEqual(rpc1
.getNamespaces(), rpc2
.getNamespaces()) &&
147 isTableCFsEqual(rpc1
.getTableCFsMap(), rpc2
.getTableCFsMap());
152 * @param c Configuration to look at
153 * @return True if replication for bulk load data is enabled.
155 public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c
) {
156 return c
.getBoolean(HConstants
.REPLICATION_BULKLOAD_ENABLE_KEY
,
157 HConstants
.REPLICATION_BULKLOAD_ENABLE_DEFAULT
);
160 public static FileSystem
getRemoteWALFileSystem(Configuration conf
, String remoteWALDir
)
162 return new Path(remoteWALDir
).getFileSystem(conf
);
165 public static Path
getPeerRemoteWALDir(String remoteWALDir
, String peerId
) {
166 return new Path(remoteWALDir
, peerId
);
169 public static Path
getPeerRemoteWALDir(Path remoteWALDir
, String peerId
) {
170 return new Path(remoteWALDir
, peerId
);
173 public static Path
getPeerReplayWALDir(Path remoteWALDir
, String peerId
) {
174 return getPeerRemoteWALDir(remoteWALDir
, peerId
).suffix(REMOTE_WAL_REPLAY_SUFFIX
);
177 public static Path
getPeerSnapshotWALDir(String remoteWALDir
, String peerId
) {
178 return getPeerRemoteWALDir(remoteWALDir
, peerId
).suffix(REMOTE_WAL_SNAPSHOT_SUFFIX
);
181 public static Path
getPeerSnapshotWALDir(Path remoteWALDir
, String peerId
) {
182 return getPeerRemoteWALDir(remoteWALDir
, peerId
).suffix(REMOTE_WAL_SNAPSHOT_SUFFIX
);
186 * Do the sleeping logic
187 * @param msg Why we sleep
188 * @param sleepForRetries the base sleep time.
189 * @param sleepMultiplier by how many times the default sleeping time is augmented
190 * @param maxRetriesMultiplier the max retry multiplier
191 * @return True if <code>sleepMultiplier</code> is < <code>maxRetriesMultiplier</code>
193 public static boolean sleepForRetries(String msg
, long sleepForRetries
, int sleepMultiplier
,
194 int maxRetriesMultiplier
) {
196 LOG
.trace("{}, sleeping {} times {}", msg
, sleepForRetries
, sleepMultiplier
);
197 Thread
.sleep(sleepForRetries
* sleepMultiplier
);
198 } catch (InterruptedException e
) {
199 LOG
.debug("Interrupted while sleeping between retries");
200 Thread
.currentThread().interrupt();
202 return sleepMultiplier
< maxRetriesMultiplier
;
206 * Get the adaptive timeout value when performing a retry
208 public static int getAdaptiveTimeout(final int initialValue
, final int retries
) {
209 int ntries
= retries
;
210 if (ntries
>= HConstants
.RETRY_BACKOFF
.length
) {
211 ntries
= HConstants
.RETRY_BACKOFF
.length
- 1;
216 return initialValue
* HConstants
.RETRY_BACKOFF
[ntries
];