2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.replication
;
20 import static org
.junit
.Assert
.assertTrue
;
22 import java
.util
.HashMap
;
23 import java
.util
.HashSet
;
24 import java
.util
.List
;
28 import org
.apache
.hadoop
.conf
.Configuration
;
29 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
30 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
31 import org
.apache
.hadoop
.hbase
.HConstants
;
32 import org
.apache
.hadoop
.hbase
.NamespaceDescriptor
;
33 import org
.apache
.hadoop
.hbase
.TableName
;
34 import org
.apache
.hadoop
.hbase
.client
.Admin
;
35 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
36 import org
.apache
.hadoop
.hbase
.client
.Connection
;
37 import org
.apache
.hadoop
.hbase
.client
.ConnectionFactory
;
38 import org
.apache
.hadoop
.hbase
.client
.Table
;
39 import org
.apache
.hadoop
.hbase
.client
.TableDescriptor
;
40 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
41 import org
.apache
.hadoop
.hbase
.regionserver
.TestBulkLoadReplication
;
42 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
43 import org
.apache
.hadoop
.hbase
.testclassification
.ReplicationTests
;
44 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
45 import org
.apache
.hadoop
.hbase
.zookeeper
.MiniZooKeeperCluster
;
46 import org
.apache
.hadoop
.hbase
.zookeeper
.RecoverableZooKeeper
;
47 import org
.apache
.hadoop
.hbase
.zookeeper
.ZKWatcher
;
49 import org
.junit
.After
;
50 import org
.junit
.Before
;
51 import org
.junit
.BeforeClass
;
52 import org
.junit
.ClassRule
;
53 import org
.junit
.Test
;
54 import org
.junit
.experimental
.categories
.Category
;
55 import org
.slf4j
.Logger
;
56 import org
.slf4j
.LoggerFactory
;
59 * Testcase for HBASE-23098
61 // LargeTest because spins up four clusters.
62 @Category({ ReplicationTests
.class, LargeTests
.class })
63 public final class TestNamespaceReplicationWithBulkLoadedData
extends TestBulkLoadReplication
{
65 public static final HBaseClassTestRule CLASS_RULE
=
66 HBaseClassTestRule
.forClass(TestNamespaceReplicationWithBulkLoadedData
.class);
67 private static final Logger LOG
=
68 LoggerFactory
.getLogger(TestNamespaceReplicationWithBulkLoadedData
.class);
70 private static final HBaseTestingUtil UTIL4
= new HBaseTestingUtil();
71 private static final String PEER4_CLUSTER_ID
= "peer4";
72 private static final String PEER4_NS
= "ns_peer1";
73 private static final String PEER4_NS_TABLE
= "ns_peer2";
75 private static final Configuration CONF4
= UTIL4
.getConfiguration();
77 private static final String NS1
= "ns1";
78 private static final String NS2
= "ns2";
80 private static final TableName NS1_TABLE
= TableName
.valueOf(NS1
+ ":t1_syncup");
81 private static final TableName NS2_TABLE
= TableName
.valueOf(NS2
+ ":t2_syncup");
84 public static void setUpBeforeClass() throws Exception
{
85 setupBulkLoadConfigsForCluster(CONF4
, PEER4_CLUSTER_ID
);
86 setupConfig(UTIL4
, "/4");
87 TestBulkLoadReplication
.setUpBeforeClass();
91 private static void startFourthCluster() throws Exception
{
92 LOG
.info("Setup Zk to same one from UTIL1 and UTIL2 and UTIL3");
93 UTIL4
.setZkCluster(UTIL1
.getZkCluster());
94 UTIL4
.startMiniCluster(NUM_SLAVES1
);
96 TableDescriptor table
= TableDescriptorBuilder
.newBuilder(tableName
)
97 .setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(famName
).setMaxVersions(100)
98 .setScope(HConstants
.REPLICATION_SCOPE_GLOBAL
).build())
99 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(noRepfamName
)).build();
101 Connection connection4
= ConnectionFactory
.createConnection(CONF4
);
102 try (Admin admin4
= connection4
.getAdmin()) {
103 admin4
.createTable(table
, HBaseTestingUtil
.KEYS_FOR_HBA_CREATE_TABLE
);
105 UTIL4
.waitUntilAllRegionsAssigned(tableName
);
110 public void setUpBase() throws Exception
{
111 /** "super.setUpBase()" already sets peer1 from 1 <-> 2 <-> 3
112 * and this test add the fourth cluster.
113 * So we have following topology:
120 * The 1 -> 4 has two peers,
121 * ns_peer1: ns1 -> ns1 (validate this peer hfile-refs)
122 * ns_peer1 configuration is NAMESPACES => ["ns1"]
124 * ns_peer2: ns2:t2_syncup -> ns2:t2_syncup, this peers is
125 * ns_peer2 configuration is NAMESPACES => ["ns2"],
126 * TABLE_CFS => { "ns2:t2_syncup" => []}
128 * The 1 -> 2 has one peer, this peer configuration is
129 * add_peer '2', CLUSTER_KEY => "server1.cie.com:2181:/hbase"
135 TableDescriptor table1
= TableDescriptorBuilder
.newBuilder(NS1_TABLE
)
137 ColumnFamilyDescriptorBuilder
.newBuilder(famName
)
138 .setScope(HConstants
.REPLICATION_SCOPE_GLOBAL
).build())
139 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(noRepfamName
)).build();
141 TableDescriptor table2
= TableDescriptorBuilder
.newBuilder(NS2_TABLE
)
143 ColumnFamilyDescriptorBuilder
.newBuilder(famName
)
144 .setScope(HConstants
.REPLICATION_SCOPE_GLOBAL
).build())
145 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(noRepfamName
)).build();
147 Admin admin1
= UTIL1
.getAdmin();
148 admin1
.createNamespace(NamespaceDescriptor
.create(NS1
).build());
149 admin1
.createNamespace(NamespaceDescriptor
.create(NS2
).build());
150 admin1
.createTable(table1
);
151 admin1
.createTable(table2
);
153 Admin admin2
= UTIL2
.getAdmin();
154 admin2
.createNamespace(NamespaceDescriptor
.create(NS1
).build());
155 admin2
.createNamespace(NamespaceDescriptor
.create(NS2
).build());
156 admin2
.createTable(table1
);
157 admin2
.createTable(table2
);
159 Admin admin3
= UTIL3
.getAdmin();
160 admin3
.createNamespace(NamespaceDescriptor
.create(NS1
).build());
161 admin3
.createNamespace(NamespaceDescriptor
.create(NS2
).build());
162 admin3
.createTable(table1
);
163 admin3
.createTable(table2
);
165 Admin admin4
= UTIL4
.getAdmin();
166 admin4
.createNamespace(NamespaceDescriptor
.create(NS1
).build());
167 admin4
.createNamespace(NamespaceDescriptor
.create(NS2
).build());
168 admin4
.createTable(table1
);
169 admin4
.createTable(table2
);
172 * Set ns_peer1 1: ns1 -> 2: ns1
174 * add_peer 'ns_peer1', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
175 * NAMESPACES => ["ns1"]
177 Set
<String
> namespaces
= new HashSet
<>();
179 ReplicationPeerConfig rpc4_ns
=
180 ReplicationPeerConfig
.newBuilder().setClusterKey(UTIL4
.getClusterKey())
181 .setReplicateAllUserTables(false).setNamespaces(namespaces
).build();
182 admin1
.addReplicationPeer(PEER4_NS
, rpc4_ns
);
185 * Set ns_peer2 1: ns2:t2_syncup -> 4: ns2:t2_syncup
187 * add_peer 'ns_peer2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
188 * NAMESPACES => ["ns2"], TABLE_CFS => { "ns2:t2_syncup" => [] }
190 Map
<TableName
, List
<String
>> tableCFsMap
= new HashMap
<>();
191 tableCFsMap
.put(NS2_TABLE
, null);
192 ReplicationPeerConfig rpc4_ns_table
=
193 ReplicationPeerConfig
.newBuilder().setClusterKey(UTIL4
.getClusterKey())
194 .setReplicateAllUserTables(false).setTableCFsMap(tableCFsMap
).build();
195 admin1
.addReplicationPeer(PEER4_NS_TABLE
, rpc4_ns_table
);
200 public void tearDownBase() throws Exception
{
201 super.tearDownBase();
202 TableDescriptor table1
= TableDescriptorBuilder
.newBuilder(NS1_TABLE
)
204 ColumnFamilyDescriptorBuilder
.newBuilder(famName
)
205 .setScope(HConstants
.REPLICATION_SCOPE_GLOBAL
).build())
206 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(noRepfamName
)).build();
208 TableDescriptor table2
= TableDescriptorBuilder
.newBuilder(NS2_TABLE
)
210 ColumnFamilyDescriptorBuilder
.newBuilder(famName
)
211 .setScope(HConstants
.REPLICATION_SCOPE_GLOBAL
).build())
212 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(noRepfamName
)).build();
213 Admin admin1
= UTIL1
.getAdmin();
214 admin1
.disableTable(table1
.getTableName());
215 admin1
.deleteTable(table1
.getTableName());
216 admin1
.disableTable(table2
.getTableName());
217 admin1
.deleteTable(table2
.getTableName());
218 admin1
.deleteNamespace(NS1
);
219 admin1
.deleteNamespace(NS2
);
221 Admin admin2
= UTIL2
.getAdmin();
222 admin2
.disableTable(table1
.getTableName());
223 admin2
.deleteTable(table1
.getTableName());
224 admin2
.disableTable(table2
.getTableName());
225 admin2
.deleteTable(table2
.getTableName());
226 admin2
.deleteNamespace(NS1
);
227 admin2
.deleteNamespace(NS2
);
229 Admin admin3
= UTIL3
.getAdmin();
230 admin3
.disableTable(table1
.getTableName());
231 admin3
.deleteTable(table1
.getTableName());
232 admin3
.disableTable(table2
.getTableName());
233 admin3
.deleteTable(table2
.getTableName());
234 admin3
.deleteNamespace(NS1
);
235 admin3
.deleteNamespace(NS2
);
237 Admin admin4
= UTIL4
.getAdmin();
238 admin4
.disableTable(table1
.getTableName());
239 admin4
.deleteTable(table1
.getTableName());
240 admin4
.disableTable(table2
.getTableName());
241 admin4
.deleteTable(table2
.getTableName());
242 admin4
.deleteNamespace(NS1
);
243 admin4
.deleteNamespace(NS2
);
244 UTIL1
.getAdmin().removeReplicationPeer(PEER4_NS
);
245 UTIL1
.getAdmin().removeReplicationPeer(PEER4_NS_TABLE
);
250 public void testBulkLoadReplicationActiveActive() throws Exception
{
251 Table peer1TestTable
= UTIL1
.getConnection().getTable(TestReplicationBase
.tableName
);
252 Table peer2TestTable
= UTIL2
.getConnection().getTable(TestReplicationBase
.tableName
);
253 Table peer3TestTable
= UTIL3
.getConnection().getTable(TestReplicationBase
.tableName
);
254 Table notPeerTable
= UTIL4
.getConnection().getTable(TestReplicationBase
.tableName
);
255 Table ns1Table
= UTIL4
.getConnection().getTable(NS1_TABLE
);
256 Table ns2Table
= UTIL4
.getConnection().getTable(NS2_TABLE
);
258 // case1: The ns1 tables will be replicate to cluster4
259 byte[] row
= Bytes
.toBytes("002_ns_peer");
260 byte[] value
= Bytes
.toBytes("v2");
261 bulkLoadOnCluster(ns1Table
.getName(), row
, value
, UTIL1
);
262 waitForReplication(ns1Table
, 1, NB_RETRIES
);
263 assertTableHasValue(ns1Table
, row
, value
);
265 // case2: The ns2:t2_syncup will be replicate to cluster4
266 // If it's not fix HBASE-23098 the ns_peer1's hfile-refs(zk) will be backlog
267 row
= Bytes
.toBytes("003_ns_table_peer");
268 value
= Bytes
.toBytes("v2");
269 bulkLoadOnCluster(ns2Table
.getName(), row
, value
, UTIL1
);
270 waitForReplication(ns2Table
, 1, NB_RETRIES
);
271 assertTableHasValue(ns2Table
, row
, value
);
273 // case3: The table test will be replicate to cluster1,cluster2,cluster3
274 // not replicate to cluster4, because we not set other peer for that tables.
275 row
= Bytes
.toBytes("001_nopeer");
276 value
= Bytes
.toBytes("v1");
277 assertBulkLoadConditions(tableName
, row
, value
, UTIL1
, peer1TestTable
,
278 peer2TestTable
, peer3TestTable
);
279 assertTableNoValue(notPeerTable
, row
, value
); // 1 -> 4, table is empty
281 // Verify hfile-refs for 1:ns_peer1, expect is empty
282 MiniZooKeeperCluster zkCluster
= UTIL1
.getZkCluster();
283 ZKWatcher watcher
= new ZKWatcher(UTIL1
.getConfiguration(), "TestZnodeHFiles-refs", null);
284 RecoverableZooKeeper zk
= RecoverableZooKeeper
.connect(UTIL1
.getConfiguration(), watcher
);
285 ZKReplicationQueueStorage replicationQueueStorage
=
286 new ZKReplicationQueueStorage(watcher
, UTIL1
.getConfiguration());
287 Set
<String
> hfiles
= replicationQueueStorage
.getAllHFileRefs();
288 assertTrue(hfiles
.isEmpty());