/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.replication
;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

35 * A configuration for the replication peer cluster.
37 @InterfaceAudience.Public
38 public class ReplicationPeerConfig
{
40 private String clusterKey
;
41 private String replicationEndpointImpl
;
42 private final Map
<byte[], byte[]> peerData
;
43 private final Map
<String
, String
> configuration
;
44 private Map
<TableName
, ?
extends Collection
<String
>> tableCFsMap
= null;
45 private Set
<String
> namespaces
= null;
46 // Default value is true, means replicate all user tables to peer cluster.
47 private boolean replicateAllUserTables
= true;
48 private Map
<TableName
, ?
extends Collection
<String
>> excludeTableCFsMap
= null;
49 private Set
<String
> excludeNamespaces
= null;
50 private long bandwidth
= 0;
51 private final boolean serial
;
52 // Used by synchronous replication
53 private String remoteWALDir
;
55 private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder
) {
56 this.clusterKey
= builder
.clusterKey
;
57 this.replicationEndpointImpl
= builder
.replicationEndpointImpl
;
58 this.peerData
= Collections
.unmodifiableMap(builder
.peerData
);
59 this.configuration
= Collections
.unmodifiableMap(builder
.configuration
);
61 builder
.tableCFsMap
!= null ?
unmodifiableTableCFsMap(builder
.tableCFsMap
) : null;
63 builder
.namespaces
!= null ? Collections
.unmodifiableSet(builder
.namespaces
) : null;
64 this.replicateAllUserTables
= builder
.replicateAllUserTables
;
65 this.excludeTableCFsMap
=
66 builder
.excludeTableCFsMap
!= null ?
unmodifiableTableCFsMap(builder
.excludeTableCFsMap
)
68 this.excludeNamespaces
=
69 builder
.excludeNamespaces
!= null ? Collections
.unmodifiableSet(builder
.excludeNamespaces
)
71 this.bandwidth
= builder
.bandwidth
;
72 this.serial
= builder
.serial
;
73 this.remoteWALDir
= builder
.remoteWALDir
;
76 private Map
<TableName
, List
<String
>>
77 unmodifiableTableCFsMap(Map
<TableName
, List
<String
>> tableCFsMap
) {
78 Map
<TableName
, List
<String
>> newTableCFsMap
= new HashMap
<>();
79 tableCFsMap
.forEach((table
, cfs
) -> newTableCFsMap
.put(table
,
80 cfs
!= null ? Collections
.unmodifiableList(cfs
) : null));
81 return Collections
.unmodifiableMap(newTableCFsMap
);
84 public String
getClusterKey() {
88 public String
getReplicationEndpointImpl() {
89 return replicationEndpointImpl
;
92 public Map
<byte[], byte[]> getPeerData() {
96 public Map
<String
, String
> getConfiguration() {
100 public Map
<TableName
, List
<String
>> getTableCFsMap() {
101 return (Map
<TableName
, List
<String
>>) tableCFsMap
;
104 public Set
<String
> getNamespaces() {
105 return this.namespaces
;
108 public long getBandwidth() {
109 return this.bandwidth
;
112 public boolean replicateAllUserTables() {
113 return this.replicateAllUserTables
;
116 public Map
<TableName
, List
<String
>> getExcludeTableCFsMap() {
117 return (Map
<TableName
, List
<String
>>) excludeTableCFsMap
;
120 public Set
<String
> getExcludeNamespaces() {
121 return this.excludeNamespaces
;
124 public String
getRemoteWALDir() {
125 return this.remoteWALDir
;
129 * Use remote wal dir to decide whether a peer is sync replication peer
131 public boolean isSyncReplication() {
132 return !StringUtils
.isBlank(this.remoteWALDir
);
135 public static ReplicationPeerConfigBuilder
newBuilder() {
136 return new ReplicationPeerConfigBuilderImpl();
139 public boolean isSerial() {
143 public static ReplicationPeerConfigBuilder
newBuilder(ReplicationPeerConfig peerConfig
) {
144 ReplicationPeerConfigBuilderImpl builder
= new ReplicationPeerConfigBuilderImpl();
145 builder
.setClusterKey(peerConfig
.getClusterKey())
146 .setReplicationEndpointImpl(peerConfig
.getReplicationEndpointImpl())
147 .putAllPeerData(peerConfig
.getPeerData()).putAllConfiguration(peerConfig
.getConfiguration())
148 .setTableCFsMap(peerConfig
.getTableCFsMap()).setNamespaces(peerConfig
.getNamespaces())
149 .setReplicateAllUserTables(peerConfig
.replicateAllUserTables())
150 .setExcludeTableCFsMap(peerConfig
.getExcludeTableCFsMap())
151 .setExcludeNamespaces(peerConfig
.getExcludeNamespaces())
152 .setBandwidth(peerConfig
.getBandwidth()).setSerial(peerConfig
.isSerial())
153 .setRemoteWALDir(peerConfig
.getRemoteWALDir());
157 static class ReplicationPeerConfigBuilderImpl
implements ReplicationPeerConfigBuilder
{
159 private String clusterKey
;
161 private String replicationEndpointImpl
;
163 private Map
<byte[], byte[]> peerData
= new TreeMap
<>(Bytes
.BYTES_COMPARATOR
);
165 private Map
<String
, String
> configuration
= new HashMap
<>();
167 private Map
<TableName
, List
<String
>> tableCFsMap
= null;
169 private Set
<String
> namespaces
= null;
171 // Default value is true, means replicate all user tables to peer cluster.
172 private boolean replicateAllUserTables
= true;
174 private Map
<TableName
, List
<String
>> excludeTableCFsMap
= null;
176 private Set
<String
> excludeNamespaces
= null;
178 private long bandwidth
= 0;
180 private boolean serial
= false;
182 private String remoteWALDir
= null;
185 public ReplicationPeerConfigBuilder
setClusterKey(String clusterKey
) {
186 this.clusterKey
= clusterKey
!= null ? clusterKey
.trim() : null;
191 public ReplicationPeerConfigBuilder
setReplicationEndpointImpl(String replicationEndpointImpl
) {
192 this.replicationEndpointImpl
= replicationEndpointImpl
;
197 public ReplicationPeerConfigBuilder
putConfiguration(String key
, String value
) {
198 this.configuration
.put(key
, value
);
203 public ReplicationPeerConfigBuilder
removeConfiguration(String key
) {
204 this.configuration
.remove(key
);
209 public ReplicationPeerConfigBuilder
putPeerData(byte[] key
, byte[] value
) {
210 this.peerData
.put(key
, value
);
215 public ReplicationPeerConfigBuilder
216 setTableCFsMap(Map
<TableName
, List
<String
>> tableCFsMap
) {
217 this.tableCFsMap
= tableCFsMap
;
222 public ReplicationPeerConfigBuilder
setNamespaces(Set
<String
> namespaces
) {
223 this.namespaces
= namespaces
;
228 public ReplicationPeerConfigBuilder
setReplicateAllUserTables(boolean replicateAllUserTables
) {
229 this.replicateAllUserTables
= replicateAllUserTables
;
234 public ReplicationPeerConfigBuilder
235 setExcludeTableCFsMap(Map
<TableName
, List
<String
>> excludeTableCFsMap
) {
236 this.excludeTableCFsMap
= excludeTableCFsMap
;
241 public ReplicationPeerConfigBuilder
setExcludeNamespaces(Set
<String
> excludeNamespaces
) {
242 this.excludeNamespaces
= excludeNamespaces
;
247 public ReplicationPeerConfigBuilder
setBandwidth(long bandwidth
) {
248 this.bandwidth
= bandwidth
;
253 public ReplicationPeerConfigBuilder
setSerial(boolean serial
) {
254 this.serial
= serial
;
259 public ReplicationPeerConfigBuilder
setRemoteWALDir(String dir
) {
260 this.remoteWALDir
= dir
;
265 public ReplicationPeerConfig
build() {
266 // It would be nice to validate the configuration, but we have to work with "old" data
267 // from ZK which makes it much more difficult.
268 return new ReplicationPeerConfig(this);
273 public String
toString() {
274 StringBuilder builder
= new StringBuilder("clusterKey=").append(clusterKey
).append(",");
275 builder
.append("replicationEndpointImpl=").append(replicationEndpointImpl
).append(",");
276 builder
.append("replicateAllUserTables=").append(replicateAllUserTables
).append(",");
277 if (replicateAllUserTables
) {
278 if (excludeNamespaces
!= null) {
279 builder
.append("excludeNamespaces=").append(excludeNamespaces
.toString()).append(",");
281 if (excludeTableCFsMap
!= null) {
282 builder
.append("excludeTableCFsMap=").append(excludeTableCFsMap
.toString()).append(",");
285 if (namespaces
!= null) {
286 builder
.append("namespaces=").append(namespaces
.toString()).append(",");
288 if (tableCFsMap
!= null) {
289 builder
.append("tableCFs=").append(tableCFsMap
.toString()).append(",");
292 builder
.append("bandwidth=").append(bandwidth
).append(",");
293 builder
.append("serial=").append(serial
);
294 if (this.remoteWALDir
!= null) {
295 builder
.append(",remoteWALDir=").append(remoteWALDir
);
297 return builder
.toString();
301 * Decide whether the table need replicate to the peer cluster
302 * @param table name of the table
303 * @return true if the table need replicate to the peer cluster
305 public boolean needToReplicate(TableName table
) {
306 return needToReplicate(table
, null);
310 * Decide whether the passed family of the table need replicate to the peer cluster according to
312 * @param table name of the table
313 * @param family family name
314 * @return true if (the family of) the table need replicate to the peer cluster.
315 * If passed family is null, return true if any CFs of the table need replicate;
316 * If passed family is not null, return true if the passed family need replicate.
318 public boolean needToReplicate(TableName table
, byte[] family
) {
319 String namespace
= table
.getNamespaceAsString();
320 if (replicateAllUserTables
) {
321 // replicate all user tables, but filter by exclude namespaces and table-cfs config
322 if (excludeNamespaces
!= null && excludeNamespaces
.contains(namespace
)) {
325 // trap here, must check existence first since HashMap allows null value.
326 if (excludeTableCFsMap
== null || !excludeTableCFsMap
.containsKey(table
)) {
329 Collection
<String
> cfs
= excludeTableCFsMap
.get(table
);
330 // If cfs is null or empty then we can make sure that we do not need to replicate this table,
331 // otherwise, we may still need to replicate the table but filter out some families.
332 return cfs
!= null && !cfs
.isEmpty()
333 // If exclude-table-cfs contains passed family then we make sure that we do not need to
334 // replicate this family.
335 && (family
== null || !cfs
.contains(Bytes
.toString(family
)));
337 // Not replicate all user tables, so filter by namespaces and table-cfs config
338 if (namespaces
== null && tableCFsMap
== null) {
341 // First filter by namespaces config
342 // If table's namespace in peer config, all the tables data are applicable for replication
343 if (namespaces
!= null && namespaces
.contains(namespace
)) {
346 // If table-cfs contains this table then we can make sure that we need replicate some CFs of
347 // this table. Further we need all CFs if tableCFsMap.get(table) is null or empty.
348 return tableCFsMap
!= null && tableCFsMap
.containsKey(table
)
349 && (family
== null || CollectionUtils
.isEmpty(tableCFsMap
.get(table
))
350 // If table-cfs must contain passed family then we need to replicate this family.
351 || tableCFsMap
.get(table
).contains(Bytes
.toString(family
)));