2 * Copyright The Apache Software Foundation
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
20 package org
.apache
.hadoop
.hbase
.replication
;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.Assert.*;
63 @Category({FlakeyTests
.class, LargeTests
.class})
64 public class TestPerTableCFReplication
{
66 private static final Logger LOG
= LoggerFactory
.getLogger(TestPerTableCFReplication
.class);
68 private static Configuration conf1
;
69 private static Configuration conf2
;
70 private static Configuration conf3
;
72 private static HBaseTestingUtility utility1
;
73 private static HBaseTestingUtility utility2
;
74 private static HBaseTestingUtility utility3
;
75 private static final long SLEEP_TIME
= 500;
76 private static final int NB_RETRIES
= 100;
78 private static final TableName tableName
= TableName
.valueOf("test");
79 private static final TableName tabAName
= TableName
.valueOf("TA");
80 private static final TableName tabBName
= TableName
.valueOf("TB");
81 private static final TableName tabCName
= TableName
.valueOf("TC");
82 private static final byte[] famName
= Bytes
.toBytes("f");
83 private static final byte[] f1Name
= Bytes
.toBytes("f1");
84 private static final byte[] f2Name
= Bytes
.toBytes("f2");
85 private static final byte[] f3Name
= Bytes
.toBytes("f3");
86 private static final byte[] row1
= Bytes
.toBytes("row1");
87 private static final byte[] row2
= Bytes
.toBytes("row2");
88 private static final byte[] noRepfamName
= Bytes
.toBytes("norep");
89 private static final byte[] val
= Bytes
.toBytes("myval");
91 private static HTableDescriptor table
;
92 private static HTableDescriptor tabA
;
93 private static HTableDescriptor tabB
;
94 private static HTableDescriptor tabC
;
97 public TestName name
= new TestName();
100 public static void setUpBeforeClass() throws Exception
{
101 conf1
= HBaseConfiguration
.create();
102 conf1
.set(HConstants
.ZOOKEEPER_ZNODE_PARENT
, "/1");
103 // smaller block size and capacity to trigger more operations
105 conf1
.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
106 conf1
.setInt("replication.source.size.capacity", 1024);
107 conf1
.setLong("replication.source.sleepforretries", 100);
108 conf1
.setInt("hbase.regionserver.maxlogs", 10);
109 conf1
.setLong("hbase.master.logcleaner.ttl", 10);
110 conf1
.setLong(HConstants
.THREAD_WAKE_FREQUENCY
, 100);
111 conf1
.setStrings(CoprocessorHost
.USER_REGION_COPROCESSOR_CONF_KEY
,
112 "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
114 utility1
= new HBaseTestingUtility(conf1
);
115 utility1
.startMiniZKCluster();
116 MiniZooKeeperCluster miniZK
= utility1
.getZkCluster();
117 new ZKWatcher(conf1
, "cluster1", null, true);
119 conf2
= new Configuration(conf1
);
120 conf2
.set(HConstants
.ZOOKEEPER_ZNODE_PARENT
, "/2");
122 conf3
= new Configuration(conf1
);
123 conf3
.set(HConstants
.ZOOKEEPER_ZNODE_PARENT
, "/3");
125 utility2
= new HBaseTestingUtility(conf2
);
126 utility2
.setZkCluster(miniZK
);
127 new ZKWatcher(conf2
, "cluster3", null, true);
129 utility3
= new HBaseTestingUtility(conf3
);
130 utility3
.setZkCluster(miniZK
);
131 new ZKWatcher(conf3
, "cluster3", null, true);
133 table
= new HTableDescriptor(tableName
);
134 HColumnDescriptor fam
= new HColumnDescriptor(famName
);
135 fam
.setScope(HConstants
.REPLICATION_SCOPE_GLOBAL
);
136 table
.addFamily(fam
);
137 fam
= new HColumnDescriptor(noRepfamName
);
138 table
.addFamily(fam
);
140 tabA
= new HTableDescriptor(tabAName
);
141 fam
= new HColumnDescriptor(f1Name
);
142 fam
.setScope(HConstants
.REPLICATION_SCOPE_GLOBAL
);
144 fam
= new HColumnDescriptor(f2Name
);
145 fam
.setScope(HConstants
.REPLICATION_SCOPE_GLOBAL
);
147 fam
= new HColumnDescriptor(f3Name
);
148 fam
.setScope(HConstants
.REPLICATION_SCOPE_GLOBAL
);
151 tabB
= new HTableDescriptor(tabBName
);
152 fam
= new HColumnDescriptor(f1Name
);
153 fam
.setScope(HConstants
.REPLICATION_SCOPE_GLOBAL
);
155 fam
= new HColumnDescriptor(f2Name
);
156 fam
.setScope(HConstants
.REPLICATION_SCOPE_GLOBAL
);
158 fam
= new HColumnDescriptor(f3Name
);
159 fam
.setScope(HConstants
.REPLICATION_SCOPE_GLOBAL
);
162 tabC
= new HTableDescriptor(tabCName
);
163 fam
= new HColumnDescriptor(f1Name
);
164 fam
.setScope(HConstants
.REPLICATION_SCOPE_GLOBAL
);
166 fam
= new HColumnDescriptor(f2Name
);
167 fam
.setScope(HConstants
.REPLICATION_SCOPE_GLOBAL
);
169 fam
= new HColumnDescriptor(f3Name
);
170 fam
.setScope(HConstants
.REPLICATION_SCOPE_GLOBAL
);
173 utility1
.startMiniCluster();
174 utility2
.startMiniCluster();
175 utility3
.startMiniCluster();
179 public static void tearDownAfterClass() throws Exception
{
180 utility3
.shutdownMiniCluster();
181 utility2
.shutdownMiniCluster();
182 utility1
.shutdownMiniCluster();
186 public void testParseTableCFsFromConfig() {
187 Map
<TableName
, List
<String
>> tabCFsMap
= null;
189 // 1. null or empty string, result should be null
190 tabCFsMap
= ReplicationPeerConfigUtil
.parseTableCFsFromConfig(null);
191 assertEquals(null, tabCFsMap
);
193 tabCFsMap
= ReplicationPeerConfigUtil
.parseTableCFsFromConfig("");
194 assertEquals(null, tabCFsMap
);
196 tabCFsMap
= ReplicationPeerConfigUtil
.parseTableCFsFromConfig(" ");
197 assertEquals(null, tabCFsMap
);
199 final TableName tableName1
= TableName
.valueOf(name
.getMethodName() + "1");
200 final TableName tableName2
= TableName
.valueOf(name
.getMethodName() + "2");
201 final TableName tableName3
= TableName
.valueOf(name
.getMethodName() + "3");
203 // 2. single table: "tableName1" / "tableName2:cf1" / "tableName3:cf1,cf3"
204 tabCFsMap
= ReplicationPeerConfigUtil
.parseTableCFsFromConfig(tableName1
.getNameAsString());
205 assertEquals(1, tabCFsMap
.size()); // only one table
206 assertTrue(tabCFsMap
.containsKey(tableName1
)); // its table name is "tableName1"
207 assertFalse(tabCFsMap
.containsKey(tableName2
)); // not other table
208 assertEquals(null, tabCFsMap
.get(tableName1
)); // null cf-list,
210 tabCFsMap
= ReplicationPeerConfigUtil
.parseTableCFsFromConfig(tableName2
+ ":cf1");
211 assertEquals(1, tabCFsMap
.size()); // only one table
212 assertTrue(tabCFsMap
.containsKey(tableName2
)); // its table name is "tableName2"
213 assertFalse(tabCFsMap
.containsKey(tableName1
)); // not other table
214 assertEquals(1, tabCFsMap
.get(tableName2
).size()); // cf-list contains only 1 cf
215 assertEquals("cf1", tabCFsMap
.get(tableName2
).get(0));// the only cf is "cf1"
217 tabCFsMap
= ReplicationPeerConfigUtil
.parseTableCFsFromConfig(tableName3
+ " : cf1 , cf3");
218 assertEquals(1, tabCFsMap
.size()); // only one table
219 assertTrue(tabCFsMap
.containsKey(tableName3
)); // its table name is "tableName2"
220 assertFalse(tabCFsMap
.containsKey(tableName1
)); // not other table
221 assertEquals(2, tabCFsMap
.get(tableName3
).size()); // cf-list contains 2 cf
222 assertTrue(tabCFsMap
.get(tableName3
).contains("cf1"));// contains "cf1"
223 assertTrue(tabCFsMap
.get(tableName3
).contains("cf3"));// contains "cf3"
225 // 3. multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3"
226 tabCFsMap
= ReplicationPeerConfigUtil
.parseTableCFsFromConfig(tableName1
+ " ; " + tableName2
227 + ":cf1 ; " + tableName3
+ ":cf1,cf3");
228 // 3.1 contains 3 tables : "tableName1", "tableName2" and "tableName3"
229 assertEquals(3, tabCFsMap
.size());
230 assertTrue(tabCFsMap
.containsKey(tableName1
));
231 assertTrue(tabCFsMap
.containsKey(tableName2
));
232 assertTrue(tabCFsMap
.containsKey(tableName3
));
233 // 3.2 table "tab1" : null cf-list
234 assertEquals(null, tabCFsMap
.get(tableName1
));
235 // 3.3 table "tab2" : cf-list contains a single cf "cf1"
236 assertEquals(1, tabCFsMap
.get(tableName2
).size());
237 assertEquals("cf1", tabCFsMap
.get(tableName2
).get(0));
238 // 3.4 table "tab3" : cf-list contains "cf1" and "cf3"
239 assertEquals(2, tabCFsMap
.get(tableName3
).size());
240 assertTrue(tabCFsMap
.get(tableName3
).contains("cf1"));
241 assertTrue(tabCFsMap
.get(tableName3
).contains("cf3"));
243 // 4. contiguous or additional ";"(table delimiter) or ","(cf delimiter) can be tolerated
244 // still use the example of multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3"
245 tabCFsMap
= ReplicationPeerConfigUtil
.parseTableCFsFromConfig(
246 tableName1
+ " ; ; " + tableName2
+ ":cf1 ; " + tableName3
+ ":cf1,,cf3 ;");
247 // 4.1 contains 3 tables : "tableName1", "tableName2" and "tableName3"
248 assertEquals(3, tabCFsMap
.size());
249 assertTrue(tabCFsMap
.containsKey(tableName1
));
250 assertTrue(tabCFsMap
.containsKey(tableName2
));
251 assertTrue(tabCFsMap
.containsKey(tableName3
));
252 // 4.2 table "tab1" : null cf-list
253 assertEquals(null, tabCFsMap
.get(tableName1
));
254 // 4.3 table "tab2" : cf-list contains a single cf "cf1"
255 assertEquals(1, tabCFsMap
.get(tableName2
).size());
256 assertEquals("cf1", tabCFsMap
.get(tableName2
).get(0));
257 // 4.4 table "tab3" : cf-list contains "cf1" and "cf3"
258 assertEquals(2, tabCFsMap
.get(tableName3
).size());
259 assertTrue(tabCFsMap
.get(tableName3
).contains("cf1"));
260 assertTrue(tabCFsMap
.get(tableName3
).contains("cf3"));
262 // 5. invalid format "tableName1:tt:cf1 ; tableName2::cf1 ; tableName3:cf1,cf3"
263 // "tableName1:tt:cf1" and "tableName2::cf1" are invalid and will be ignored totally
264 tabCFsMap
= ReplicationPeerConfigUtil
.parseTableCFsFromConfig(
265 tableName1
+ ":tt:cf1 ; " + tableName2
+ "::cf1 ; " + tableName3
+ ":cf1,cf3");
266 // 5.1 no "tableName1" and "tableName2", only "tableName3"
267 assertEquals(1, tabCFsMap
.size()); // only one table
268 assertFalse(tabCFsMap
.containsKey(tableName1
));
269 assertFalse(tabCFsMap
.containsKey(tableName2
));
270 assertTrue(tabCFsMap
.containsKey(tableName3
));
271 // 5.2 table "tableName3" : cf-list contains "cf1" and "cf3"
272 assertEquals(2, tabCFsMap
.get(tableName3
).size());
273 assertTrue(tabCFsMap
.get(tableName3
).contains("cf1"));
274 assertTrue(tabCFsMap
.get(tableName3
).contains("cf3"));
278 public void testTableCFsHelperConverter() {
280 ReplicationProtos
.TableCF
[] tableCFs
= null;
281 Map
<TableName
, List
<String
>> tabCFsMap
= null;
283 // 1. null or empty string, result should be null
284 assertNull(ReplicationPeerConfigUtil
.convert(tabCFsMap
));
286 tabCFsMap
= new HashMap
<>();
287 tableCFs
= ReplicationPeerConfigUtil
.convert(tabCFsMap
);
288 assertEquals(0, tableCFs
.length
);
290 final TableName tableName1
= TableName
.valueOf(name
.getMethodName() + "1");
291 final TableName tableName2
= TableName
.valueOf(name
.getMethodName() + "2");
292 final TableName tableName3
= TableName
.valueOf(name
.getMethodName() + "3");
294 // 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3"
296 tabCFsMap
.put(tableName1
, null);
297 tableCFs
= ReplicationPeerConfigUtil
.convert(tabCFsMap
);
298 assertEquals(1, tableCFs
.length
); // only one table
299 assertEquals(tableName1
.toString(),
300 tableCFs
[0].getTableName().getQualifier().toStringUtf8());
301 assertEquals(0, tableCFs
[0].getFamiliesCount());
304 tabCFsMap
.put(tableName2
, new ArrayList
<>());
305 tabCFsMap
.get(tableName2
).add("cf1");
306 tableCFs
= ReplicationPeerConfigUtil
.convert(tabCFsMap
);
307 assertEquals(1, tableCFs
.length
); // only one table
308 assertEquals(tableName2
.toString(),
309 tableCFs
[0].getTableName().getQualifier().toStringUtf8());
310 assertEquals(1, tableCFs
[0].getFamiliesCount());
311 assertEquals("cf1", tableCFs
[0].getFamilies(0).toStringUtf8());
314 tabCFsMap
.put(tableName3
, new ArrayList
<>());
315 tabCFsMap
.get(tableName3
).add("cf1");
316 tabCFsMap
.get(tableName3
).add("cf3");
317 tableCFs
= ReplicationPeerConfigUtil
.convert(tabCFsMap
);
318 assertEquals(1, tableCFs
.length
);
319 assertEquals(tableName3
.toString(),
320 tableCFs
[0].getTableName().getQualifier().toStringUtf8());
321 assertEquals(2, tableCFs
[0].getFamiliesCount());
322 assertEquals("cf1", tableCFs
[0].getFamilies(0).toStringUtf8());
323 assertEquals("cf3", tableCFs
[0].getFamilies(1).toStringUtf8());
326 tabCFsMap
.put(tableName1
, null);
327 tabCFsMap
.put(tableName2
, new ArrayList
<>());
328 tabCFsMap
.get(tableName2
).add("cf1");
329 tabCFsMap
.put(tableName3
, new ArrayList
<>());
330 tabCFsMap
.get(tableName3
).add("cf1");
331 tabCFsMap
.get(tableName3
).add("cf3");
333 tableCFs
= ReplicationPeerConfigUtil
.convert(tabCFsMap
);
334 assertEquals(3, tableCFs
.length
);
335 assertNotNull(ReplicationPeerConfigUtil
.getTableCF(tableCFs
, tableName1
.toString()));
336 assertNotNull(ReplicationPeerConfigUtil
.getTableCF(tableCFs
, tableName2
.toString()));
337 assertNotNull(ReplicationPeerConfigUtil
.getTableCF(tableCFs
, tableName3
.toString()));
340 ReplicationPeerConfigUtil
.getTableCF(tableCFs
, tableName1
.toString()).getFamiliesCount());
342 assertEquals(1, ReplicationPeerConfigUtil
.getTableCF(tableCFs
, tableName2
.toString())
343 .getFamiliesCount());
344 assertEquals("cf1", ReplicationPeerConfigUtil
.getTableCF(tableCFs
, tableName2
.toString())
345 .getFamilies(0).toStringUtf8());
347 assertEquals(2, ReplicationPeerConfigUtil
.getTableCF(tableCFs
, tableName3
.toString())
348 .getFamiliesCount());
349 assertEquals("cf1", ReplicationPeerConfigUtil
.getTableCF(tableCFs
, tableName3
.toString())
350 .getFamilies(0).toStringUtf8());
351 assertEquals("cf3", ReplicationPeerConfigUtil
.getTableCF(tableCFs
, tableName3
.toString())
352 .getFamilies(1).toStringUtf8());
354 tabCFsMap
= ReplicationPeerConfigUtil
.convert2Map(tableCFs
);
355 assertEquals(3, tabCFsMap
.size());
356 assertTrue(tabCFsMap
.containsKey(tableName1
));
357 assertTrue(tabCFsMap
.containsKey(tableName2
));
358 assertTrue(tabCFsMap
.containsKey(tableName3
));
359 // 3.2 table "tab1" : null cf-list
360 assertEquals(null, tabCFsMap
.get(tableName1
));
361 // 3.3 table "tab2" : cf-list contains a single cf "cf1"
362 assertEquals(1, tabCFsMap
.get(tableName2
).size());
363 assertEquals("cf1", tabCFsMap
.get(tableName2
).get(0));
364 // 3.4 table "tab3" : cf-list contains "cf1" and "cf3"
365 assertEquals(2, tabCFsMap
.get(tableName3
).size());
366 assertTrue(tabCFsMap
.get(tableName3
).contains("cf1"));
367 assertTrue(tabCFsMap
.get(tableName3
).contains("cf3"));
370 @Test(timeout
=300000)
371 public void testPerTableCFReplication() throws Exception
{
372 LOG
.info("testPerTableCFReplication");
373 ReplicationAdmin replicationAdmin
= new ReplicationAdmin(conf1
);
374 Connection connection1
= ConnectionFactory
.createConnection(conf1
);
375 Connection connection2
= ConnectionFactory
.createConnection(conf2
);
376 Connection connection3
= ConnectionFactory
.createConnection(conf3
);
378 Admin admin1
= connection1
.getAdmin();
379 Admin admin2
= connection2
.getAdmin();
380 Admin admin3
= connection3
.getAdmin();
382 admin1
.createTable(tabA
);
383 admin1
.createTable(tabB
);
384 admin1
.createTable(tabC
);
385 admin2
.createTable(tabA
);
386 admin2
.createTable(tabB
);
387 admin2
.createTable(tabC
);
388 admin3
.createTable(tabA
);
389 admin3
.createTable(tabB
);
390 admin3
.createTable(tabC
);
392 Table htab1A
= connection1
.getTable(tabAName
);
393 Table htab2A
= connection2
.getTable(tabAName
);
394 Table htab3A
= connection3
.getTable(tabAName
);
396 Table htab1B
= connection1
.getTable(tabBName
);
397 Table htab2B
= connection2
.getTable(tabBName
);
398 Table htab3B
= connection3
.getTable(tabBName
);
400 Table htab1C
= connection1
.getTable(tabCName
);
401 Table htab2C
= connection2
.getTable(tabCName
);
402 Table htab3C
= connection3
.getTable(tabCName
);
404 // A. add cluster2/cluster3 as peers to cluster1
405 ReplicationPeerConfig rpc2
= new ReplicationPeerConfig();
406 rpc2
.setClusterKey(utility2
.getClusterKey());
407 rpc2
.setReplicateAllUserTables(false);
408 Map
<TableName
, List
<String
>> tableCFs
= new HashMap
<>();
409 tableCFs
.put(tabCName
, null);
410 tableCFs
.put(tabBName
, new ArrayList
<>());
411 tableCFs
.get(tabBName
).add("f1");
412 tableCFs
.get(tabBName
).add("f3");
413 replicationAdmin
.addPeer("2", rpc2
, tableCFs
);
415 ReplicationPeerConfig rpc3
= new ReplicationPeerConfig();
416 rpc3
.setClusterKey(utility3
.getClusterKey());
417 rpc3
.setReplicateAllUserTables(false);
419 tableCFs
.put(tabAName
, null);
420 tableCFs
.put(tabBName
, new ArrayList
<>());
421 tableCFs
.get(tabBName
).add("f1");
422 tableCFs
.get(tabBName
).add("f2");
423 replicationAdmin
.addPeer("3", rpc3
, tableCFs
);
425 // A1. tableA can only replicated to cluster3
426 putAndWaitWithFamily(row1
, f1Name
, htab1A
, htab3A
);
427 ensureRowNotReplicated(row1
, f1Name
, htab2A
);
428 deleteAndWaitWithFamily(row1
, f1Name
, htab1A
, htab3A
);
430 putAndWaitWithFamily(row1
, f2Name
, htab1A
, htab3A
);
431 ensureRowNotReplicated(row1
, f2Name
, htab2A
);
432 deleteAndWaitWithFamily(row1
, f2Name
, htab1A
, htab3A
);
434 putAndWaitWithFamily(row1
, f3Name
, htab1A
, htab3A
);
435 ensureRowNotReplicated(row1
, f3Name
, htab2A
);
436 deleteAndWaitWithFamily(row1
, f3Name
, htab1A
, htab3A
);
438 // A2. cf 'f1' of tableB can replicated to both cluster2 and cluster3
439 putAndWaitWithFamily(row1
, f1Name
, htab1B
, htab2B
, htab3B
);
440 deleteAndWaitWithFamily(row1
, f1Name
, htab1B
, htab2B
, htab3B
);
442 // cf 'f2' of tableB can only replicated to cluster3
443 putAndWaitWithFamily(row1
, f2Name
, htab1B
, htab3B
);
444 ensureRowNotReplicated(row1
, f2Name
, htab2B
);
445 deleteAndWaitWithFamily(row1
, f2Name
, htab1B
, htab3B
);
447 // cf 'f3' of tableB can only replicated to cluster2
448 putAndWaitWithFamily(row1
, f3Name
, htab1B
, htab2B
);
449 ensureRowNotReplicated(row1
, f3Name
, htab3B
);
450 deleteAndWaitWithFamily(row1
, f3Name
, htab1B
, htab2B
);
452 // A3. tableC can only replicated to cluster2
453 putAndWaitWithFamily(row1
, f1Name
, htab1C
, htab2C
);
454 ensureRowNotReplicated(row1
, f1Name
, htab3C
);
455 deleteAndWaitWithFamily(row1
, f1Name
, htab1C
, htab2C
);
457 putAndWaitWithFamily(row1
, f2Name
, htab1C
, htab2C
);
458 ensureRowNotReplicated(row1
, f2Name
, htab3C
);
459 deleteAndWaitWithFamily(row1
, f2Name
, htab1C
, htab2C
);
461 putAndWaitWithFamily(row1
, f3Name
, htab1C
, htab2C
);
462 ensureRowNotReplicated(row1
, f3Name
, htab3C
);
463 deleteAndWaitWithFamily(row1
, f3Name
, htab1C
, htab2C
);
465 // B. change peers' replicable table-cf config
467 tableCFs
.put(tabAName
, new ArrayList
<>());
468 tableCFs
.get(tabAName
).add("f1");
469 tableCFs
.get(tabAName
).add("f2");
470 tableCFs
.put(tabCName
, new ArrayList
<>());
471 tableCFs
.get(tabCName
).add("f2");
472 tableCFs
.get(tabCName
).add("f3");
473 replicationAdmin
.setPeerTableCFs("2", tableCFs
);
476 tableCFs
.put(tabBName
, null);
477 tableCFs
.put(tabCName
, new ArrayList
<>());
478 tableCFs
.get(tabCName
).add("f3");
479 replicationAdmin
.setPeerTableCFs("3", tableCFs
);
481 // B1. cf 'f1' of tableA can only replicated to cluster2
482 putAndWaitWithFamily(row2
, f1Name
, htab1A
, htab2A
);
483 ensureRowNotReplicated(row2
, f1Name
, htab3A
);
484 deleteAndWaitWithFamily(row2
, f1Name
, htab1A
, htab2A
);
485 // cf 'f2' of tableA can only replicated to cluster2
486 putAndWaitWithFamily(row2
, f2Name
, htab1A
, htab2A
);
487 ensureRowNotReplicated(row2
, f2Name
, htab3A
);
488 deleteAndWaitWithFamily(row2
, f2Name
, htab1A
, htab2A
);
489 // cf 'f3' of tableA isn't replicable to either cluster2 or cluster3
490 putAndWaitWithFamily(row2
, f3Name
, htab1A
);
491 ensureRowNotReplicated(row2
, f3Name
, htab2A
, htab3A
);
492 deleteAndWaitWithFamily(row2
, f3Name
, htab1A
);
494 // B2. tableB can only replicated to cluster3
495 putAndWaitWithFamily(row2
, f1Name
, htab1B
, htab3B
);
496 ensureRowNotReplicated(row2
, f1Name
, htab2B
);
497 deleteAndWaitWithFamily(row2
, f1Name
, htab1B
, htab3B
);
499 putAndWaitWithFamily(row2
, f2Name
, htab1B
, htab3B
);
500 ensureRowNotReplicated(row2
, f2Name
, htab2B
);
501 deleteAndWaitWithFamily(row2
, f2Name
, htab1B
, htab3B
);
503 putAndWaitWithFamily(row2
, f3Name
, htab1B
, htab3B
);
504 ensureRowNotReplicated(row2
, f3Name
, htab2B
);
505 deleteAndWaitWithFamily(row2
, f3Name
, htab1B
, htab3B
);
507 // B3. cf 'f1' of tableC non-replicable to either cluster
508 putAndWaitWithFamily(row2
, f1Name
, htab1C
);
509 ensureRowNotReplicated(row2
, f1Name
, htab2C
, htab3C
);
510 deleteAndWaitWithFamily(row2
, f1Name
, htab1C
);
511 // cf 'f2' of tableC can only replicated to cluster2
512 putAndWaitWithFamily(row2
, f2Name
, htab1C
, htab2C
);
513 ensureRowNotReplicated(row2
, f2Name
, htab3C
);
514 deleteAndWaitWithFamily(row2
, f2Name
, htab1C
, htab2C
);
515 // cf 'f3' of tableC can replicated to cluster2 and cluster3
516 putAndWaitWithFamily(row2
, f3Name
, htab1C
, htab2C
, htab3C
);
517 deleteAndWaitWithFamily(row2
, f3Name
, htab1C
, htab2C
, htab3C
);
525 private void ensureRowNotReplicated(byte[] row
, byte[] fam
, Table
... tables
) throws IOException
{
526 Get get
= new Get(row
);
528 for (Table table
: tables
) {
529 Result res
= table
.get(get
);
530 assertEquals(0, res
.size());
534 private void deleteAndWaitWithFamily(byte[] row
, byte[] fam
,
535 Table source
, Table
... targets
)
537 Delete del
= new Delete(row
);
541 Get get
= new Get(row
);
543 for (int i
= 0; i
< NB_RETRIES
; i
++) {
544 if (i
==NB_RETRIES
-1) {
545 fail("Waited too much time for del replication");
547 boolean removedFromAll
= true;
548 for (Table target
: targets
) {
549 Result res
= target
.get(get
);
550 if (res
.size() >= 1) {
551 LOG
.info("Row not deleted");
552 removedFromAll
= false;
556 if (removedFromAll
) {
559 Thread
.sleep(SLEEP_TIME
);
564 private void putAndWaitWithFamily(byte[] row
, byte[] fam
,
565 Table source
, Table
... targets
)
567 Put put
= new Put(row
);
568 put
.addColumn(fam
, row
, val
);
571 Get get
= new Get(row
);
573 for (int i
= 0; i
< NB_RETRIES
; i
++) {
574 if (i
==NB_RETRIES
-1) {
575 fail("Waited too much time for put replication");
577 boolean replicatedToAll
= true;
578 for (Table target
: targets
) {
579 Result res
= target
.get(get
);
581 LOG
.info("Row not available");
582 replicatedToAll
= false;
585 assertEquals(1, res
.size());
586 assertArrayEquals(val
, res
.value());
589 if (replicatedToAll
) {
592 Thread
.sleep(SLEEP_TIME
);