HBASE-19811 Fix findbugs and error-prone warnings in hbase-server (branch-2)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / replication / TestPerTableCFReplication.java
blob98b3fdade317e1e72df1c3405fc097ea5033bbad
1 /*
2 * Copyright The Apache Software Foundation
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
20 package org.apache.hadoop.hbase.replication;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.HBaseConfiguration;
30 import org.apache.hadoop.hbase.HBaseTestingUtility;
31 import org.apache.hadoop.hbase.HColumnDescriptor;
32 import org.apache.hadoop.hbase.HConstants;
33 import org.apache.hadoop.hbase.HTableDescriptor;
34 import org.apache.hadoop.hbase.TableName;
35 import org.apache.hadoop.hbase.client.Admin;
36 import org.apache.hadoop.hbase.client.Connection;
37 import org.apache.hadoop.hbase.client.ConnectionFactory;
38 import org.apache.hadoop.hbase.client.Delete;
39 import org.apache.hadoop.hbase.client.Get;
40 import org.apache.hadoop.hbase.client.Put;
41 import org.apache.hadoop.hbase.client.Result;
42 import org.apache.hadoop.hbase.client.Table;
43 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
44 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
45 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
46 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
47 import org.apache.hadoop.hbase.testclassification.FlakeyTests;
48 import org.apache.hadoop.hbase.testclassification.LargeTests;
49 import org.apache.hadoop.hbase.util.Bytes;
50 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
51 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
52 import org.junit.AfterClass;
53 import org.junit.BeforeClass;
54 import org.junit.Rule;
55 import org.junit.Test;
56 import org.junit.experimental.categories.Category;
57 import org.junit.rules.TestName;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
61 import static org.junit.Assert.*;
63 @Category({FlakeyTests.class, LargeTests.class})
64 public class TestPerTableCFReplication {
66 private static final Logger LOG = LoggerFactory.getLogger(TestPerTableCFReplication.class);
68 private static Configuration conf1;
69 private static Configuration conf2;
70 private static Configuration conf3;
72 private static HBaseTestingUtility utility1;
73 private static HBaseTestingUtility utility2;
74 private static HBaseTestingUtility utility3;
75 private static final long SLEEP_TIME = 500;
76 private static final int NB_RETRIES = 100;
78 private static final TableName tableName = TableName.valueOf("test");
79 private static final TableName tabAName = TableName.valueOf("TA");
80 private static final TableName tabBName = TableName.valueOf("TB");
81 private static final TableName tabCName = TableName.valueOf("TC");
82 private static final byte[] famName = Bytes.toBytes("f");
83 private static final byte[] f1Name = Bytes.toBytes("f1");
84 private static final byte[] f2Name = Bytes.toBytes("f2");
85 private static final byte[] f3Name = Bytes.toBytes("f3");
86 private static final byte[] row1 = Bytes.toBytes("row1");
87 private static final byte[] row2 = Bytes.toBytes("row2");
88 private static final byte[] noRepfamName = Bytes.toBytes("norep");
89 private static final byte[] val = Bytes.toBytes("myval");
91 private static HTableDescriptor table;
92 private static HTableDescriptor tabA;
93 private static HTableDescriptor tabB;
94 private static HTableDescriptor tabC;
96 @Rule
97 public TestName name = new TestName();
99 @BeforeClass
100 public static void setUpBeforeClass() throws Exception {
101 conf1 = HBaseConfiguration.create();
102 conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
103 // smaller block size and capacity to trigger more operations
104 // and test them
105 conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
106 conf1.setInt("replication.source.size.capacity", 1024);
107 conf1.setLong("replication.source.sleepforretries", 100);
108 conf1.setInt("hbase.regionserver.maxlogs", 10);
109 conf1.setLong("hbase.master.logcleaner.ttl", 10);
110 conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
111 conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
112 "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
114 utility1 = new HBaseTestingUtility(conf1);
115 utility1.startMiniZKCluster();
116 MiniZooKeeperCluster miniZK = utility1.getZkCluster();
117 new ZKWatcher(conf1, "cluster1", null, true);
119 conf2 = new Configuration(conf1);
120 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
122 conf3 = new Configuration(conf1);
123 conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
125 utility2 = new HBaseTestingUtility(conf2);
126 utility2.setZkCluster(miniZK);
127 new ZKWatcher(conf2, "cluster3", null, true);
129 utility3 = new HBaseTestingUtility(conf3);
130 utility3.setZkCluster(miniZK);
131 new ZKWatcher(conf3, "cluster3", null, true);
133 table = new HTableDescriptor(tableName);
134 HColumnDescriptor fam = new HColumnDescriptor(famName);
135 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
136 table.addFamily(fam);
137 fam = new HColumnDescriptor(noRepfamName);
138 table.addFamily(fam);
140 tabA = new HTableDescriptor(tabAName);
141 fam = new HColumnDescriptor(f1Name);
142 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
143 tabA.addFamily(fam);
144 fam = new HColumnDescriptor(f2Name);
145 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
146 tabA.addFamily(fam);
147 fam = new HColumnDescriptor(f3Name);
148 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
149 tabA.addFamily(fam);
151 tabB = new HTableDescriptor(tabBName);
152 fam = new HColumnDescriptor(f1Name);
153 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
154 tabB.addFamily(fam);
155 fam = new HColumnDescriptor(f2Name);
156 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
157 tabB.addFamily(fam);
158 fam = new HColumnDescriptor(f3Name);
159 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
160 tabB.addFamily(fam);
162 tabC = new HTableDescriptor(tabCName);
163 fam = new HColumnDescriptor(f1Name);
164 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
165 tabC.addFamily(fam);
166 fam = new HColumnDescriptor(f2Name);
167 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
168 tabC.addFamily(fam);
169 fam = new HColumnDescriptor(f3Name);
170 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
171 tabC.addFamily(fam);
173 utility1.startMiniCluster();
174 utility2.startMiniCluster();
175 utility3.startMiniCluster();
178 @AfterClass
179 public static void tearDownAfterClass() throws Exception {
180 utility3.shutdownMiniCluster();
181 utility2.shutdownMiniCluster();
182 utility1.shutdownMiniCluster();
185 @Test
186 public void testParseTableCFsFromConfig() {
187 Map<TableName, List<String>> tabCFsMap = null;
189 // 1. null or empty string, result should be null
190 tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(null);
191 assertEquals(null, tabCFsMap);
193 tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig("");
194 assertEquals(null, tabCFsMap);
196 tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(" ");
197 assertEquals(null, tabCFsMap);
199 final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
200 final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
201 final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3");
203 // 2. single table: "tableName1" / "tableName2:cf1" / "tableName3:cf1,cf3"
204 tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName1.getNameAsString());
205 assertEquals(1, tabCFsMap.size()); // only one table
206 assertTrue(tabCFsMap.containsKey(tableName1)); // its table name is "tableName1"
207 assertFalse(tabCFsMap.containsKey(tableName2)); // not other table
208 assertEquals(null, tabCFsMap.get(tableName1)); // null cf-list,
210 tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName2 + ":cf1");
211 assertEquals(1, tabCFsMap.size()); // only one table
212 assertTrue(tabCFsMap.containsKey(tableName2)); // its table name is "tableName2"
213 assertFalse(tabCFsMap.containsKey(tableName1)); // not other table
214 assertEquals(1, tabCFsMap.get(tableName2).size()); // cf-list contains only 1 cf
215 assertEquals("cf1", tabCFsMap.get(tableName2).get(0));// the only cf is "cf1"
217 tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName3 + " : cf1 , cf3");
218 assertEquals(1, tabCFsMap.size()); // only one table
219 assertTrue(tabCFsMap.containsKey(tableName3)); // its table name is "tableName2"
220 assertFalse(tabCFsMap.containsKey(tableName1)); // not other table
221 assertEquals(2, tabCFsMap.get(tableName3).size()); // cf-list contains 2 cf
222 assertTrue(tabCFsMap.get(tableName3).contains("cf1"));// contains "cf1"
223 assertTrue(tabCFsMap.get(tableName3).contains("cf3"));// contains "cf3"
225 // 3. multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3"
226 tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName1 + " ; " + tableName2
227 + ":cf1 ; " + tableName3 + ":cf1,cf3");
228 // 3.1 contains 3 tables : "tableName1", "tableName2" and "tableName3"
229 assertEquals(3, tabCFsMap.size());
230 assertTrue(tabCFsMap.containsKey(tableName1));
231 assertTrue(tabCFsMap.containsKey(tableName2));
232 assertTrue(tabCFsMap.containsKey(tableName3));
233 // 3.2 table "tab1" : null cf-list
234 assertEquals(null, tabCFsMap.get(tableName1));
235 // 3.3 table "tab2" : cf-list contains a single cf "cf1"
236 assertEquals(1, tabCFsMap.get(tableName2).size());
237 assertEquals("cf1", tabCFsMap.get(tableName2).get(0));
238 // 3.4 table "tab3" : cf-list contains "cf1" and "cf3"
239 assertEquals(2, tabCFsMap.get(tableName3).size());
240 assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
241 assertTrue(tabCFsMap.get(tableName3).contains("cf3"));
243 // 4. contiguous or additional ";"(table delimiter) or ","(cf delimiter) can be tolerated
244 // still use the example of multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3"
245 tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(
246 tableName1 + " ; ; " + tableName2 + ":cf1 ; " + tableName3 + ":cf1,,cf3 ;");
247 // 4.1 contains 3 tables : "tableName1", "tableName2" and "tableName3"
248 assertEquals(3, tabCFsMap.size());
249 assertTrue(tabCFsMap.containsKey(tableName1));
250 assertTrue(tabCFsMap.containsKey(tableName2));
251 assertTrue(tabCFsMap.containsKey(tableName3));
252 // 4.2 table "tab1" : null cf-list
253 assertEquals(null, tabCFsMap.get(tableName1));
254 // 4.3 table "tab2" : cf-list contains a single cf "cf1"
255 assertEquals(1, tabCFsMap.get(tableName2).size());
256 assertEquals("cf1", tabCFsMap.get(tableName2).get(0));
257 // 4.4 table "tab3" : cf-list contains "cf1" and "cf3"
258 assertEquals(2, tabCFsMap.get(tableName3).size());
259 assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
260 assertTrue(tabCFsMap.get(tableName3).contains("cf3"));
262 // 5. invalid format "tableName1:tt:cf1 ; tableName2::cf1 ; tableName3:cf1,cf3"
263 // "tableName1:tt:cf1" and "tableName2::cf1" are invalid and will be ignored totally
264 tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(
265 tableName1 + ":tt:cf1 ; " + tableName2 + "::cf1 ; " + tableName3 + ":cf1,cf3");
266 // 5.1 no "tableName1" and "tableName2", only "tableName3"
267 assertEquals(1, tabCFsMap.size()); // only one table
268 assertFalse(tabCFsMap.containsKey(tableName1));
269 assertFalse(tabCFsMap.containsKey(tableName2));
270 assertTrue(tabCFsMap.containsKey(tableName3));
271 // 5.2 table "tableName3" : cf-list contains "cf1" and "cf3"
272 assertEquals(2, tabCFsMap.get(tableName3).size());
273 assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
274 assertTrue(tabCFsMap.get(tableName3).contains("cf3"));
277 @Test
278 public void testTableCFsHelperConverter() {
280 ReplicationProtos.TableCF[] tableCFs = null;
281 Map<TableName, List<String>> tabCFsMap = null;
283 // 1. null or empty string, result should be null
284 assertNull(ReplicationPeerConfigUtil.convert(tabCFsMap));
286 tabCFsMap = new HashMap<>();
287 tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
288 assertEquals(0, tableCFs.length);
290 final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
291 final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
292 final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3");
294 // 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3"
295 tabCFsMap.clear();
296 tabCFsMap.put(tableName1, null);
297 tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
298 assertEquals(1, tableCFs.length); // only one table
299 assertEquals(tableName1.toString(),
300 tableCFs[0].getTableName().getQualifier().toStringUtf8());
301 assertEquals(0, tableCFs[0].getFamiliesCount());
303 tabCFsMap.clear();
304 tabCFsMap.put(tableName2, new ArrayList<>());
305 tabCFsMap.get(tableName2).add("cf1");
306 tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
307 assertEquals(1, tableCFs.length); // only one table
308 assertEquals(tableName2.toString(),
309 tableCFs[0].getTableName().getQualifier().toStringUtf8());
310 assertEquals(1, tableCFs[0].getFamiliesCount());
311 assertEquals("cf1", tableCFs[0].getFamilies(0).toStringUtf8());
313 tabCFsMap.clear();
314 tabCFsMap.put(tableName3, new ArrayList<>());
315 tabCFsMap.get(tableName3).add("cf1");
316 tabCFsMap.get(tableName3).add("cf3");
317 tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
318 assertEquals(1, tableCFs.length);
319 assertEquals(tableName3.toString(),
320 tableCFs[0].getTableName().getQualifier().toStringUtf8());
321 assertEquals(2, tableCFs[0].getFamiliesCount());
322 assertEquals("cf1", tableCFs[0].getFamilies(0).toStringUtf8());
323 assertEquals("cf3", tableCFs[0].getFamilies(1).toStringUtf8());
325 tabCFsMap.clear();
326 tabCFsMap.put(tableName1, null);
327 tabCFsMap.put(tableName2, new ArrayList<>());
328 tabCFsMap.get(tableName2).add("cf1");
329 tabCFsMap.put(tableName3, new ArrayList<>());
330 tabCFsMap.get(tableName3).add("cf1");
331 tabCFsMap.get(tableName3).add("cf3");
333 tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
334 assertEquals(3, tableCFs.length);
335 assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName1.toString()));
336 assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString()));
337 assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString()));
339 assertEquals(0,
340 ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName1.toString()).getFamiliesCount());
342 assertEquals(1, ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString())
343 .getFamiliesCount());
344 assertEquals("cf1", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString())
345 .getFamilies(0).toStringUtf8());
347 assertEquals(2, ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())
348 .getFamiliesCount());
349 assertEquals("cf1", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())
350 .getFamilies(0).toStringUtf8());
351 assertEquals("cf3", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())
352 .getFamilies(1).toStringUtf8());
354 tabCFsMap = ReplicationPeerConfigUtil.convert2Map(tableCFs);
355 assertEquals(3, tabCFsMap.size());
356 assertTrue(tabCFsMap.containsKey(tableName1));
357 assertTrue(tabCFsMap.containsKey(tableName2));
358 assertTrue(tabCFsMap.containsKey(tableName3));
359 // 3.2 table "tab1" : null cf-list
360 assertEquals(null, tabCFsMap.get(tableName1));
361 // 3.3 table "tab2" : cf-list contains a single cf "cf1"
362 assertEquals(1, tabCFsMap.get(tableName2).size());
363 assertEquals("cf1", tabCFsMap.get(tableName2).get(0));
364 // 3.4 table "tab3" : cf-list contains "cf1" and "cf3"
365 assertEquals(2, tabCFsMap.get(tableName3).size());
366 assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
367 assertTrue(tabCFsMap.get(tableName3).contains("cf3"));
370 @Test(timeout=300000)
371 public void testPerTableCFReplication() throws Exception {
372 LOG.info("testPerTableCFReplication");
373 ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf1);
374 Connection connection1 = ConnectionFactory.createConnection(conf1);
375 Connection connection2 = ConnectionFactory.createConnection(conf2);
376 Connection connection3 = ConnectionFactory.createConnection(conf3);
377 try {
378 Admin admin1 = connection1.getAdmin();
379 Admin admin2 = connection2.getAdmin();
380 Admin admin3 = connection3.getAdmin();
382 admin1.createTable(tabA);
383 admin1.createTable(tabB);
384 admin1.createTable(tabC);
385 admin2.createTable(tabA);
386 admin2.createTable(tabB);
387 admin2.createTable(tabC);
388 admin3.createTable(tabA);
389 admin3.createTable(tabB);
390 admin3.createTable(tabC);
392 Table htab1A = connection1.getTable(tabAName);
393 Table htab2A = connection2.getTable(tabAName);
394 Table htab3A = connection3.getTable(tabAName);
396 Table htab1B = connection1.getTable(tabBName);
397 Table htab2B = connection2.getTable(tabBName);
398 Table htab3B = connection3.getTable(tabBName);
400 Table htab1C = connection1.getTable(tabCName);
401 Table htab2C = connection2.getTable(tabCName);
402 Table htab3C = connection3.getTable(tabCName);
404 // A. add cluster2/cluster3 as peers to cluster1
405 ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
406 rpc2.setClusterKey(utility2.getClusterKey());
407 rpc2.setReplicateAllUserTables(false);
408 Map<TableName, List<String>> tableCFs = new HashMap<>();
409 tableCFs.put(tabCName, null);
410 tableCFs.put(tabBName, new ArrayList<>());
411 tableCFs.get(tabBName).add("f1");
412 tableCFs.get(tabBName).add("f3");
413 replicationAdmin.addPeer("2", rpc2, tableCFs);
415 ReplicationPeerConfig rpc3 = new ReplicationPeerConfig();
416 rpc3.setClusterKey(utility3.getClusterKey());
417 rpc3.setReplicateAllUserTables(false);
418 tableCFs.clear();
419 tableCFs.put(tabAName, null);
420 tableCFs.put(tabBName, new ArrayList<>());
421 tableCFs.get(tabBName).add("f1");
422 tableCFs.get(tabBName).add("f2");
423 replicationAdmin.addPeer("3", rpc3, tableCFs);
425 // A1. tableA can only replicated to cluster3
426 putAndWaitWithFamily(row1, f1Name, htab1A, htab3A);
427 ensureRowNotReplicated(row1, f1Name, htab2A);
428 deleteAndWaitWithFamily(row1, f1Name, htab1A, htab3A);
430 putAndWaitWithFamily(row1, f2Name, htab1A, htab3A);
431 ensureRowNotReplicated(row1, f2Name, htab2A);
432 deleteAndWaitWithFamily(row1, f2Name, htab1A, htab3A);
434 putAndWaitWithFamily(row1, f3Name, htab1A, htab3A);
435 ensureRowNotReplicated(row1, f3Name, htab2A);
436 deleteAndWaitWithFamily(row1, f3Name, htab1A, htab3A);
438 // A2. cf 'f1' of tableB can replicated to both cluster2 and cluster3
439 putAndWaitWithFamily(row1, f1Name, htab1B, htab2B, htab3B);
440 deleteAndWaitWithFamily(row1, f1Name, htab1B, htab2B, htab3B);
442 // cf 'f2' of tableB can only replicated to cluster3
443 putAndWaitWithFamily(row1, f2Name, htab1B, htab3B);
444 ensureRowNotReplicated(row1, f2Name, htab2B);
445 deleteAndWaitWithFamily(row1, f2Name, htab1B, htab3B);
447 // cf 'f3' of tableB can only replicated to cluster2
448 putAndWaitWithFamily(row1, f3Name, htab1B, htab2B);
449 ensureRowNotReplicated(row1, f3Name, htab3B);
450 deleteAndWaitWithFamily(row1, f3Name, htab1B, htab2B);
452 // A3. tableC can only replicated to cluster2
453 putAndWaitWithFamily(row1, f1Name, htab1C, htab2C);
454 ensureRowNotReplicated(row1, f1Name, htab3C);
455 deleteAndWaitWithFamily(row1, f1Name, htab1C, htab2C);
457 putAndWaitWithFamily(row1, f2Name, htab1C, htab2C);
458 ensureRowNotReplicated(row1, f2Name, htab3C);
459 deleteAndWaitWithFamily(row1, f2Name, htab1C, htab2C);
461 putAndWaitWithFamily(row1, f3Name, htab1C, htab2C);
462 ensureRowNotReplicated(row1, f3Name, htab3C);
463 deleteAndWaitWithFamily(row1, f3Name, htab1C, htab2C);
465 // B. change peers' replicable table-cf config
466 tableCFs.clear();
467 tableCFs.put(tabAName, new ArrayList<>());
468 tableCFs.get(tabAName).add("f1");
469 tableCFs.get(tabAName).add("f2");
470 tableCFs.put(tabCName, new ArrayList<>());
471 tableCFs.get(tabCName).add("f2");
472 tableCFs.get(tabCName).add("f3");
473 replicationAdmin.setPeerTableCFs("2", tableCFs);
475 tableCFs.clear();
476 tableCFs.put(tabBName, null);
477 tableCFs.put(tabCName, new ArrayList<>());
478 tableCFs.get(tabCName).add("f3");
479 replicationAdmin.setPeerTableCFs("3", tableCFs);
481 // B1. cf 'f1' of tableA can only replicated to cluster2
482 putAndWaitWithFamily(row2, f1Name, htab1A, htab2A);
483 ensureRowNotReplicated(row2, f1Name, htab3A);
484 deleteAndWaitWithFamily(row2, f1Name, htab1A, htab2A);
485 // cf 'f2' of tableA can only replicated to cluster2
486 putAndWaitWithFamily(row2, f2Name, htab1A, htab2A);
487 ensureRowNotReplicated(row2, f2Name, htab3A);
488 deleteAndWaitWithFamily(row2, f2Name, htab1A, htab2A);
489 // cf 'f3' of tableA isn't replicable to either cluster2 or cluster3
490 putAndWaitWithFamily(row2, f3Name, htab1A);
491 ensureRowNotReplicated(row2, f3Name, htab2A, htab3A);
492 deleteAndWaitWithFamily(row2, f3Name, htab1A);
494 // B2. tableB can only replicated to cluster3
495 putAndWaitWithFamily(row2, f1Name, htab1B, htab3B);
496 ensureRowNotReplicated(row2, f1Name, htab2B);
497 deleteAndWaitWithFamily(row2, f1Name, htab1B, htab3B);
499 putAndWaitWithFamily(row2, f2Name, htab1B, htab3B);
500 ensureRowNotReplicated(row2, f2Name, htab2B);
501 deleteAndWaitWithFamily(row2, f2Name, htab1B, htab3B);
503 putAndWaitWithFamily(row2, f3Name, htab1B, htab3B);
504 ensureRowNotReplicated(row2, f3Name, htab2B);
505 deleteAndWaitWithFamily(row2, f3Name, htab1B, htab3B);
507 // B3. cf 'f1' of tableC non-replicable to either cluster
508 putAndWaitWithFamily(row2, f1Name, htab1C);
509 ensureRowNotReplicated(row2, f1Name, htab2C, htab3C);
510 deleteAndWaitWithFamily(row2, f1Name, htab1C);
511 // cf 'f2' of tableC can only replicated to cluster2
512 putAndWaitWithFamily(row2, f2Name, htab1C, htab2C);
513 ensureRowNotReplicated(row2, f2Name, htab3C);
514 deleteAndWaitWithFamily(row2, f2Name, htab1C, htab2C);
515 // cf 'f3' of tableC can replicated to cluster2 and cluster3
516 putAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C);
517 deleteAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C);
518 } finally {
519 connection1.close();
520 connection2.close();
521 connection3.close();
525 private void ensureRowNotReplicated(byte[] row, byte[] fam, Table... tables) throws IOException {
526 Get get = new Get(row);
527 get.addFamily(fam);
528 for (Table table : tables) {
529 Result res = table.get(get);
530 assertEquals(0, res.size());
534 private void deleteAndWaitWithFamily(byte[] row, byte[] fam,
535 Table source, Table... targets)
536 throws Exception {
537 Delete del = new Delete(row);
538 del.addFamily(fam);
539 source.delete(del);
541 Get get = new Get(row);
542 get.addFamily(fam);
543 for (int i = 0; i < NB_RETRIES; i++) {
544 if (i==NB_RETRIES-1) {
545 fail("Waited too much time for del replication");
547 boolean removedFromAll = true;
548 for (Table target : targets) {
549 Result res = target.get(get);
550 if (res.size() >= 1) {
551 LOG.info("Row not deleted");
552 removedFromAll = false;
553 break;
556 if (removedFromAll) {
557 break;
558 } else {
559 Thread.sleep(SLEEP_TIME);
564 private void putAndWaitWithFamily(byte[] row, byte[] fam,
565 Table source, Table... targets)
566 throws Exception {
567 Put put = new Put(row);
568 put.addColumn(fam, row, val);
569 source.put(put);
571 Get get = new Get(row);
572 get.addFamily(fam);
573 for (int i = 0; i < NB_RETRIES; i++) {
574 if (i==NB_RETRIES-1) {
575 fail("Waited too much time for put replication");
577 boolean replicatedToAll = true;
578 for (Table target : targets) {
579 Result res = target.get(get);
580 if (res.isEmpty()) {
581 LOG.info("Row not available");
582 replicatedToAll = false;
583 break;
584 } else {
585 assertEquals(1, res.size());
586 assertArrayEquals(val, res.value());
589 if (replicatedToAll) {
590 break;
591 } else {
592 Thread.sleep(SLEEP_TIME);