/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.replication
;
import static org.junit.Assert.assertTrue;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.TestBulkLoadReplication;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;

import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
60 * Testcase for HBASE-23098
62 // LargeTest because spins up four clusters.
63 @Category({ ReplicationTests
.class, LargeTests
.class })
64 public final class TestNamespaceReplicationWithBulkLoadedData
extends TestBulkLoadReplication
{
66 public static final HBaseClassTestRule CLASS_RULE
=
67 HBaseClassTestRule
.forClass(TestNamespaceReplicationWithBulkLoadedData
.class);
68 private static final Logger LOG
=
69 LoggerFactory
.getLogger(TestNamespaceReplicationWithBulkLoadedData
.class);
71 private static final HBaseTestingUtility UTIL4
= new HBaseTestingUtility();
72 private static final String PEER4_CLUSTER_ID
= "peer4";
73 private static final String PEER4_NS
= "ns_peer1";
74 private static final String PEER4_NS_TABLE
= "ns_peer2";
76 private static final Configuration CONF4
= UTIL4
.getConfiguration();
78 private static final String NS1
= "ns1";
79 private static final String NS2
= "ns2";
81 private static final TableName NS1_TABLE
= TableName
.valueOf(NS1
+ ":t1_syncup");
82 private static final TableName NS2_TABLE
= TableName
.valueOf(NS2
+ ":t2_syncup");
85 public static void setUpBeforeClass() throws Exception
{
86 setupBulkLoadConfigsForCluster(CONF4
, PEER4_CLUSTER_ID
);
87 setupConfig(UTIL4
, "/4");
88 TestBulkLoadReplication
.setUpBeforeClass();
92 private static void startFourthCluster() throws Exception
{
93 LOG
.info("Setup Zk to same one from UTIL1 and UTIL2 and UTIL3");
94 UTIL4
.setZkCluster(UTIL1
.getZkCluster());
95 UTIL4
.startMiniCluster(NUM_SLAVES1
);
97 TableDescriptor table
= TableDescriptorBuilder
.newBuilder(tableName
)
98 .setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(famName
).setMaxVersions(100)
99 .setScope(HConstants
.REPLICATION_SCOPE_GLOBAL
).build())
100 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(noRepfamName
)).build();
102 Connection connection4
= ConnectionFactory
.createConnection(CONF4
);
103 try (Admin admin4
= connection4
.getAdmin()) {
104 admin4
.createTable(table
, HBaseTestingUtility
.KEYS_FOR_HBA_CREATE_TABLE
);
106 UTIL4
.waitUntilAllRegionsAssigned(tableName
);
111 public void setUpBase() throws Exception
{
112 /** "super.setUpBase()" already sets peer1 from 1 <-> 2 <-> 3
113 * and this test add the fourth cluster.
114 * So we have following topology:
121 * The 1 -> 4 has two peers,
122 * ns_peer1: ns1 -> ns1 (validate this peer hfile-refs)
123 * ns_peer1 configuration is NAMESPACES => ["ns1"]
125 * ns_peer2: ns2:t2_syncup -> ns2:t2_syncup, this peers is
126 * ns_peer2 configuration is NAMESPACES => ["ns2"],
127 * TABLE_CFS => { "ns2:t2_syncup" => []}
129 * The 1 -> 2 has one peer, this peer configuration is
130 * add_peer '2', CLUSTER_KEY => "server1.cie.com:2181:/hbase"
136 TableDescriptor table1
= TableDescriptorBuilder
.newBuilder(NS1_TABLE
)
138 ColumnFamilyDescriptorBuilder
.newBuilder(famName
)
139 .setScope(HConstants
.REPLICATION_SCOPE_GLOBAL
).build())
140 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(noRepfamName
)).build();
142 TableDescriptor table2
= TableDescriptorBuilder
.newBuilder(NS2_TABLE
)
144 ColumnFamilyDescriptorBuilder
.newBuilder(famName
)
145 .setScope(HConstants
.REPLICATION_SCOPE_GLOBAL
).build())
146 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(noRepfamName
)).build();
148 Admin admin1
= UTIL1
.getAdmin();
149 admin1
.createNamespace(NamespaceDescriptor
.create(NS1
).build());
150 admin1
.createNamespace(NamespaceDescriptor
.create(NS2
).build());
151 admin1
.createTable(table1
);
152 admin1
.createTable(table2
);
154 Admin admin2
= UTIL2
.getAdmin();
155 admin2
.createNamespace(NamespaceDescriptor
.create(NS1
).build());
156 admin2
.createNamespace(NamespaceDescriptor
.create(NS2
).build());
157 admin2
.createTable(table1
);
158 admin2
.createTable(table2
);
160 Admin admin3
= UTIL3
.getAdmin();
161 admin3
.createNamespace(NamespaceDescriptor
.create(NS1
).build());
162 admin3
.createNamespace(NamespaceDescriptor
.create(NS2
).build());
163 admin3
.createTable(table1
);
164 admin3
.createTable(table2
);
166 Admin admin4
= UTIL4
.getAdmin();
167 admin4
.createNamespace(NamespaceDescriptor
.create(NS1
).build());
168 admin4
.createNamespace(NamespaceDescriptor
.create(NS2
).build());
169 admin4
.createTable(table1
);
170 admin4
.createTable(table2
);
173 * Set ns_peer1 1: ns1 -> 2: ns1
175 * add_peer 'ns_peer1', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
176 * NAMESPACES => ["ns1"]
178 Set
<String
> namespaces
= new HashSet
<>();
180 ReplicationPeerConfig rpc4_ns
=
181 ReplicationPeerConfig
.newBuilder().setClusterKey(UTIL4
.getClusterKey())
182 .setReplicateAllUserTables(false).setNamespaces(namespaces
).build();
183 admin1
.addReplicationPeer(PEER4_NS
, rpc4_ns
);
186 * Set ns_peer2 1: ns2:t2_syncup -> 4: ns2:t2_syncup
188 * add_peer 'ns_peer2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
189 * NAMESPACES => ["ns2"], TABLE_CFS => { "ns2:t2_syncup" => [] }
191 Map
<TableName
, List
<String
>> tableCFsMap
= new HashMap
<>();
192 tableCFsMap
.put(NS2_TABLE
, null);
193 ReplicationPeerConfig rpc4_ns_table
=
194 ReplicationPeerConfig
.newBuilder().setClusterKey(UTIL4
.getClusterKey())
195 .setReplicateAllUserTables(false).setTableCFsMap(tableCFsMap
).build();
196 admin1
.addReplicationPeer(PEER4_NS_TABLE
, rpc4_ns_table
);
201 public void tearDownBase() throws Exception
{
202 super.tearDownBase();
203 TableDescriptor table1
= TableDescriptorBuilder
.newBuilder(NS1_TABLE
)
205 ColumnFamilyDescriptorBuilder
.newBuilder(famName
)
206 .setScope(HConstants
.REPLICATION_SCOPE_GLOBAL
).build())
207 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(noRepfamName
)).build();
209 TableDescriptor table2
= TableDescriptorBuilder
.newBuilder(NS2_TABLE
)
211 ColumnFamilyDescriptorBuilder
.newBuilder(famName
)
212 .setScope(HConstants
.REPLICATION_SCOPE_GLOBAL
).build())
213 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(noRepfamName
)).build();
214 Admin admin1
= UTIL1
.getAdmin();
215 admin1
.disableTable(table1
.getTableName());
216 admin1
.deleteTable(table1
.getTableName());
217 admin1
.disableTable(table2
.getTableName());
218 admin1
.deleteTable(table2
.getTableName());
219 admin1
.deleteNamespace(NS1
);
220 admin1
.deleteNamespace(NS2
);
222 Admin admin2
= UTIL2
.getAdmin();
223 admin2
.disableTable(table1
.getTableName());
224 admin2
.deleteTable(table1
.getTableName());
225 admin2
.disableTable(table2
.getTableName());
226 admin2
.deleteTable(table2
.getTableName());
227 admin2
.deleteNamespace(NS1
);
228 admin2
.deleteNamespace(NS2
);
230 Admin admin3
= UTIL3
.getAdmin();
231 admin3
.disableTable(table1
.getTableName());
232 admin3
.deleteTable(table1
.getTableName());
233 admin3
.disableTable(table2
.getTableName());
234 admin3
.deleteTable(table2
.getTableName());
235 admin3
.deleteNamespace(NS1
);
236 admin3
.deleteNamespace(NS2
);
238 Admin admin4
= UTIL4
.getAdmin();
239 admin4
.disableTable(table1
.getTableName());
240 admin4
.deleteTable(table1
.getTableName());
241 admin4
.disableTable(table2
.getTableName());
242 admin4
.deleteTable(table2
.getTableName());
243 admin4
.deleteNamespace(NS1
);
244 admin4
.deleteNamespace(NS2
);
245 UTIL1
.getAdmin().removeReplicationPeer(PEER4_NS
);
246 UTIL1
.getAdmin().removeReplicationPeer(PEER4_NS_TABLE
);
251 public void testBulkLoadReplicationActiveActive() throws Exception
{
252 Table peer1TestTable
= UTIL1
.getConnection().getTable(TestReplicationBase
.tableName
);
253 Table peer2TestTable
= UTIL2
.getConnection().getTable(TestReplicationBase
.tableName
);
254 Table peer3TestTable
= UTIL3
.getConnection().getTable(TestReplicationBase
.tableName
);
255 Table notPeerTable
= UTIL4
.getConnection().getTable(TestReplicationBase
.tableName
);
256 Table ns1Table
= UTIL4
.getConnection().getTable(NS1_TABLE
);
257 Table ns2Table
= UTIL4
.getConnection().getTable(NS2_TABLE
);
259 // case1: The ns1 tables will be replicate to cluster4
260 byte[] row
= Bytes
.toBytes("002_ns_peer");
261 byte[] value
= Bytes
.toBytes("v2");
262 bulkLoadOnCluster(ns1Table
.getName(), row
, value
, UTIL1
);
263 waitForReplication(ns1Table
, 1, NB_RETRIES
);
264 assertTableHasValue(ns1Table
, row
, value
);
266 // case2: The ns2:t2_syncup will be replicate to cluster4
267 // If it's not fix HBASE-23098 the ns_peer1's hfile-refs(zk) will be backlog
268 row
= Bytes
.toBytes("003_ns_table_peer");
269 value
= Bytes
.toBytes("v2");
270 bulkLoadOnCluster(ns2Table
.getName(), row
, value
, UTIL1
);
271 waitForReplication(ns2Table
, 1, NB_RETRIES
);
272 assertTableHasValue(ns2Table
, row
, value
);
274 // case3: The table test will be replicate to cluster1,cluster2,cluster3
275 // not replicate to cluster4, because we not set other peer for that tables.
276 row
= Bytes
.toBytes("001_nopeer");
277 value
= Bytes
.toBytes("v1");
278 assertBulkLoadConditions(tableName
, row
, value
, UTIL1
, peer1TestTable
,
279 peer2TestTable
, peer3TestTable
);
280 assertTableNoValue(notPeerTable
, row
, value
); // 1 -> 4, table is empty
282 // Verify hfile-refs for 1:ns_peer1, expect is empty
283 MiniZooKeeperCluster zkCluster
= UTIL1
.getZkCluster();
284 ZKWatcher watcher
= new ZKWatcher(UTIL1
.getConfiguration(), "TestZnodeHFiles-refs", null);
285 RecoverableZooKeeper zk
= ZKUtil
.connect(UTIL1
.getConfiguration(), watcher
);
286 ZKReplicationQueueStorage replicationQueueStorage
=
287 new ZKReplicationQueueStorage(watcher
, UTIL1
.getConfiguration());
288 Set
<String
> hfiles
= replicationQueueStorage
.getAllHFileRefs();
289 assertTrue(hfiles
.isEmpty());