2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org
.apache
.hadoop
.hbase
.replication
;
21 import java
.io
.IOException
;
22 import java
.util
.ArrayList
;
24 import org
.apache
.yetus
.audience
.InterfaceAudience
;
25 import org
.slf4j
.Logger
;
26 import org
.slf4j
.LoggerFactory
;
27 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.collect
.Lists
;
28 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.util
.concurrent
.AbstractService
;
31 * A Base implementation for {@link ReplicationEndpoint}s. For internal use. Uses our internal
34 // This class has been made InterfaceAudience.Private in 2.0.0. It used to be
35 // LimitedPrivate. See HBASE-15982.
36 @InterfaceAudience.Private
37 public abstract class BaseReplicationEndpoint
extends AbstractService
38 implements ReplicationEndpoint
{
40 private static final Logger LOG
= LoggerFactory
.getLogger(BaseReplicationEndpoint
.class);
41 public static final String REPLICATION_WALENTRYFILTER_CONFIG_KEY
42 = "hbase.replication.source.custom.walentryfilters";
43 protected Context ctx
;
46 public void init(Context context
) throws IOException
{
49 if (this.ctx
!= null){
50 ReplicationPeer peer
= this.ctx
.getReplicationPeer();
52 peer
.registerPeerConfigListener(this);
54 LOG
.warn("Not tracking replication peer config changes for Peer Id " + this.ctx
.getPeerId() +
55 " because there's no such peer");
62 * No-op implementation for subclasses to override if they wish to execute logic if their config changes
64 public void peerConfigUpdated(ReplicationPeerConfig rpc
){
68 /** Returns a default set of filters */
70 public WALEntryFilter
getWALEntryfilter() {
71 ArrayList
<WALEntryFilter
> filters
= Lists
.newArrayList();
72 WALEntryFilter scopeFilter
= getScopeWALEntryFilter();
73 if (scopeFilter
!= null) {
74 filters
.add(scopeFilter
);
76 WALEntryFilter tableCfFilter
= getNamespaceTableCfWALEntryFilter();
77 if (tableCfFilter
!= null) {
78 filters
.add(tableCfFilter
);
80 if (ctx
!= null && ctx
.getPeerConfig() != null) {
81 String filterNameCSV
= ctx
.getPeerConfig().getConfiguration().get(REPLICATION_WALENTRYFILTER_CONFIG_KEY
);
82 if (filterNameCSV
!= null && !filterNameCSV
.isEmpty()) {
83 String
[] filterNames
= filterNameCSV
.split(",");
84 for (String filterName
: filterNames
) {
86 Class
<?
> clazz
= Class
.forName(filterName
);
87 filters
.add((WALEntryFilter
) clazz
.getDeclaredConstructor().newInstance());
88 } catch (Exception e
) {
89 LOG
.error("Unable to create WALEntryFilter " + filterName
, e
);
94 return filters
.isEmpty() ?
null : new ChainWALEntryFilter(filters
);
97 /** Returns a WALEntryFilter for checking the scope. Subclasses can
98 * return null if they don't want this filter */
99 protected WALEntryFilter
getScopeWALEntryFilter() {
100 return new ScopeWALEntryFilter();
103 /** Returns a WALEntryFilter for checking replication per table and CF. Subclasses can
104 * return null if they don't want this filter */
105 protected WALEntryFilter
getNamespaceTableCfWALEntryFilter() {
106 return new NamespaceTableCfWALEntryFilter(ctx
.getReplicationPeer());
110 public boolean canReplicateToSameCluster() {
115 public boolean isStarting() {
116 return state() == State
.STARTING
;