/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.VerifyWALEntriesReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Class to test asynchronous replication admin operations.
 */
@RunWith(Parameterized.class)
@Category({ LargeTests.class, ClientTests.class })
public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);

  private final String ID_ONE = "1";
  private final String KEY_ONE = "127.0.0.1:2181:/hbase";
  private final String ID_TWO = "2";
  private final String KEY_TWO = "127.0.0.1:2181:/hbase2";

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
    TEST_UTIL.startMiniCluster();
    ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
  }

  @After
  public void clearPeerAndQueues() throws IOException, ReplicationException {
    try {
      admin.removeReplicationPeer(ID_ONE).join();
    } catch (Exception e) {
    }
    try {
      admin.removeReplicationPeer(ID_TWO).join();
    } catch (Exception e) {
    }
    ReplicationQueueStorage queueStorage = ReplicationStorageFactory
      .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
    for (ServerName serverName : queueStorage.getListOfReplicators()) {
      for (String queue : queueStorage.getAllQueues(serverName)) {
        queueStorage.removeQueue(serverName, queue);
      }
    }
  }
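
  // Verifies that a peer can be added once, that adding a duplicate peer or removing an unknown
  // peer fails, and that multiple peers can coexist.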
  @Test
  public void testAddRemovePeer() throws Exception {
    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
    rpc1.setClusterKey(KEY_ONE);
    ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
    rpc2.setClusterKey(KEY_TWO);
    admin.addReplicationPeer(ID_ONE, rpc1).join();
    // try adding the same peer again (fails)
    try {
      admin.addReplicationPeer(ID_ONE, rpc1).join();
      fail("Test case should fail when adding the same peer twice.");
    } catch (CompletionException e) {
      // expected
    }
    assertEquals(1, admin.listReplicationPeers().get().size());
    // try to remove a non-existing peer
    try {
      admin.removeReplicationPeer(ID_TWO).join();
      fail("Test case should fail when removing a non-existing peer.");
    } catch (CompletionException e) {
      // expected
    }
    assertEquals(1, admin.listReplicationPeers().get().size());
    // Add a second peer since multi-slave is supported
    admin.addReplicationPeer(ID_TWO, rpc2).join();
    assertEquals(2, admin.listReplicationPeers().get().size());
    // Remove the first peer we added
    admin.removeReplicationPeer(ID_ONE).join();
    assertEquals(1, admin.listReplicationPeers().get().size());
    admin.removeReplicationPeer(ID_TWO).join();
    assertEquals(0, admin.listReplicationPeers().get().size());
  }
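
  // Verifies that custom key/value configuration set on a peer config is persisted and visible
  // when listing replication peers.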
  @Test
  public void testPeerConfig() throws Exception {
    ReplicationPeerConfig config = new ReplicationPeerConfig();
    config.setClusterKey(KEY_ONE);
    config.getConfiguration().put("key1", "value1");
    config.getConfiguration().put("key2", "value2");
    admin.addReplicationPeer(ID_ONE, config).join();

    List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
    assertEquals(1, peers.size());
    ReplicationPeerDescription peerOne = peers.get(0);
    assertNotNull(peerOne);
    assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1"));
    assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2"));

    admin.removeReplicationPeer(ID_ONE).join();
  }
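
  // Verifies that a newly added peer starts out enabled and can be disabled.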
  @Test
  public void testEnableDisablePeer() throws Exception {
    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
    rpc1.setClusterKey(KEY_ONE);
    admin.addReplicationPeer(ID_ONE, rpc1).join();
    List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
    assertEquals(1, peers.size());
    assertTrue(peers.get(0).isEnabled());

    admin.disableReplicationPeer(ID_ONE).join();
    peers = admin.listReplicationPeers().get();
    assertEquals(1, peers.size());
    assertFalse(peers.get(0).isEnabled());
    admin.removeReplicationPeer(ID_ONE).join();
  }
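
  // Exercises appendReplicationPeerTableCFs: tables can be appended with or without an explicit
  // column family list, and appending the same table twice merges the requested column families
  // (an empty list meaning all families).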
  @Test
  public void testAppendPeerTableCFs() throws Exception {
    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
    rpc1.setClusterKey(KEY_ONE);
    final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
    final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
    final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
    final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
    final TableName tableName5 = TableName.valueOf(tableName.getNameAsString() + "t5");
    final TableName tableName6 = TableName.valueOf(tableName.getNameAsString() + "t6");

    admin.addReplicationPeer(ID_ONE, rpc1).join();
    rpc1.setReplicateAllUserTables(false);
    admin.updateReplicationPeerConfig(ID_ONE, rpc1).join();

    Map<TableName, List<String>> tableCFs = new HashMap<>();

    // append table t1 to replication
    tableCFs.put(tableName1, null);
    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
    Map<TableName, List<String>> result =
      admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
    assertEquals(1, result.size());
    assertEquals(true, result.containsKey(tableName1));
    assertNull(result.get(tableName1));

    // append table t2 to replication
    tableCFs.clear();
    tableCFs.put(tableName2, null);
    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
    assertEquals(2, result.size());
    assertTrue("Should contain t1", result.containsKey(tableName1));
    assertTrue("Should contain t2", result.containsKey(tableName2));
    assertNull(result.get(tableName1));
    assertNull(result.get(tableName2));

    // append table column family: f1 of t3 to replication
    tableCFs.clear();
    tableCFs.put(tableName3, new ArrayList<>());
    tableCFs.get(tableName3).add("f1");
    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
    assertEquals(3, result.size());
    assertTrue("Should contain t1", result.containsKey(tableName1));
    assertTrue("Should contain t2", result.containsKey(tableName2));
    assertTrue("Should contain t3", result.containsKey(tableName3));
    assertNull(result.get(tableName1));
    assertNull(result.get(tableName2));
    assertEquals(1, result.get(tableName3).size());
    assertEquals("f1", result.get(tableName3).get(0));

    // append table column families: f1,f2 of t4 to replication
    tableCFs.clear();
    tableCFs.put(tableName4, new ArrayList<>());
    tableCFs.get(tableName4).add("f1");
    tableCFs.get(tableName4).add("f2");
    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
    assertEquals(4, result.size());
    assertTrue("Should contain t1", result.containsKey(tableName1));
    assertTrue("Should contain t2", result.containsKey(tableName2));
    assertTrue("Should contain t3", result.containsKey(tableName3));
    assertTrue("Should contain t4", result.containsKey(tableName4));
    assertNull(result.get(tableName1));
    assertNull(result.get(tableName2));
    assertEquals(1, result.get(tableName3).size());
    assertEquals("f1", result.get(tableName3).get(0));
    assertEquals(2, result.get(tableName4).size());
    assertEquals("f1", result.get(tableName4).get(0));
    assertEquals("f2", result.get(tableName4).get(1));

    // append "table5" => [], then append "table5" => ["f1"]
    tableCFs.clear();
    tableCFs.put(tableName5, new ArrayList<>());
    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
    tableCFs.clear();
    tableCFs.put(tableName5, new ArrayList<>());
    tableCFs.get(tableName5).add("f1");
    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
    assertEquals(5, result.size());
    assertTrue("Should contain t5", result.containsKey(tableName5));
    // null means replicate all cfs of table5
    assertNull(result.get(tableName5));

    // append "table6" => ["f1"], then append "table6" => []
    tableCFs.clear();
    tableCFs.put(tableName6, new ArrayList<>());
    tableCFs.get(tableName6).add("f1");
    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
    tableCFs.clear();
    tableCFs.put(tableName6, new ArrayList<>());
    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
    assertEquals(6, result.size());
    assertTrue("Should contain t6", result.containsKey(tableName6));
    // null means replicate all cfs of table6
    assertNull(result.get(tableName6));

    admin.removeReplicationPeer(ID_ONE).join();
  }
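
  // Exercises removeReplicationPeerTableCFs: a removal must match the stored table-cfs entry
  // exactly, otherwise the operation fails with a ReplicationException.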
  @Test
  public void testRemovePeerTableCFs() throws Exception {
    ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
    rpc1.setClusterKey(KEY_ONE);
    final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
    final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
    final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
    final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");

    admin.addReplicationPeer(ID_ONE, rpc1).join();
    rpc1.setReplicateAllUserTables(false);
    admin.updateReplicationPeerConfig(ID_ONE, rpc1).join();

    Map<TableName, List<String>> tableCFs = new HashMap<>();
    try {
      tableCFs.put(tableName3, null);
      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
      fail("Test case should fail as removing table-cfs from a peer whose table-cfs is null");
    } catch (CompletionException e) {
      assertTrue(e.getCause() instanceof ReplicationException);
    }
    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());

    tableCFs.clear();
    tableCFs.put(tableName1, null);
    tableCFs.put(tableName2, new ArrayList<>());
    tableCFs.get(tableName2).add("cf1");
    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
    try {
      tableCFs.clear();
      tableCFs.put(tableName3, null);
      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
      fail("Test case should fail as removing table-cfs from a peer whose" +
        " table-cfs didn't contain t3");
    } catch (CompletionException e) {
      assertTrue(e.getCause() instanceof ReplicationException);
    }
    Map<TableName, List<String>> result =
      admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
    assertEquals(2, result.size());
    assertTrue("Should contain t1", result.containsKey(tableName1));
    assertTrue("Should contain t2", result.containsKey(tableName2));
    assertNull(result.get(tableName1));
    assertEquals(1, result.get(tableName2).size());
    assertEquals("cf1", result.get(tableName2).get(0));

    try {
      tableCFs.clear();
      tableCFs.put(tableName1, new ArrayList<>());
      tableCFs.get(tableName1).add("cf1");
      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
      fail("Test case should fail, because table t1 didn't specify cfs in peer config");
    } catch (CompletionException e) {
      assertTrue(e.getCause() instanceof ReplicationException);
    }

    tableCFs.clear();
    tableCFs.put(tableName1, null);
    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
    result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
    assertEquals(1, result.size());
    assertEquals(1, result.get(tableName2).size());
    assertEquals("cf1", result.get(tableName2).get(0));

    try {
      tableCFs.clear();
      tableCFs.put(tableName2, null);
      admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
      fail("Test case should fail, because table t2 has specified cfs in peer config");
    } catch (CompletionException e) {
      assertTrue(e.getCause() instanceof ReplicationException);
    }

    tableCFs.clear();
    tableCFs.put(tableName2, new ArrayList<>());
    tableCFs.get(tableName2).add("cf1");
    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());

    tableCFs.clear();
    tableCFs.put(tableName4, new ArrayList<>());
    admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
    admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
    assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());

    admin.removeReplicationPeer(ID_ONE);
  }
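
  // Verifies that the set of replicated namespaces on a peer config can be set and then replaced.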
  @Test
  public void testSetPeerNamespaces() throws Exception {
    String ns1 = "ns1";
    String ns2 = "ns2";

    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(KEY_ONE);
    admin.addReplicationPeer(ID_ONE, rpc).join();
    rpc.setReplicateAllUserTables(false);
    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();

    // add ns1 and ns2 to peer config
    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
    Set<String> namespaces = new HashSet<>();
    namespaces.add(ns1);
    namespaces.add(ns2);
    rpc.setNamespaces(namespaces);
    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
    namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
    assertEquals(2, namespaces.size());
    assertTrue(namespaces.contains(ns1));
    assertTrue(namespaces.contains(ns2));

    // update peer config so that it only contains ns1
    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
    namespaces = new HashSet<>();
    namespaces.add(ns1);
    rpc.setNamespaces(namespaces);
    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
    namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
    assertEquals(1, namespaces.size());
    assertTrue(namespaces.contains(ns1));

    admin.removeReplicationPeer(ID_ONE).join();
  }
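
  // Verifies that namespace-level and table-cfs-level settings on the same peer must not overlap:
  // a table whose namespace is already replicated (or vice versa) is rejected.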
  @Test
  public void testNamespacesAndTableCfsConfigConflict() throws Exception {
    String ns1 = "ns1";
    String ns2 = "ns2";
    final TableName tableName1 = TableName.valueOf(ns1 + ":" + tableName.getNameAsString() + "1");
    final TableName tableName2 = TableName.valueOf(ns2 + ":" + tableName.getNameAsString() + "2");

    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(KEY_ONE);
    admin.addReplicationPeer(ID_ONE, rpc).join();
    rpc.setReplicateAllUserTables(false);
    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();

    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
    Set<String> namespaces = new HashSet<String>();
    namespaces.add(ns1);
    rpc.setNamespaces(namespaces);
    admin.updateReplicationPeerConfig(ID_ONE, rpc).get();
    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
    Map<TableName, List<String>> tableCfs = new HashMap<>();
    tableCfs.put(tableName1, new ArrayList<>());
    rpc.setTableCFsMap(tableCfs);
    try {
      admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
      fail(
        "Test case should fail, because table " + tableName1 + " conflicts with namespace " + ns1);
    } catch (CompletionException e) {
      // expected
    }

    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
    tableCfs.clear();
    tableCfs.put(tableName2, new ArrayList<>());
    rpc.setTableCFsMap(tableCfs);
    admin.updateReplicationPeerConfig(ID_ONE, rpc).get();
    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
    namespaces.clear();
    namespaces.add(ns2);
    rpc.setNamespaces(namespaces);
    try {
      admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
      fail(
        "Test case should fail, because namespace " + ns2 + " conflicts with table " + tableName2);
    } catch (CompletionException e) {
      // expected
    }

    admin.removeReplicationPeer(ID_ONE).join();
  }
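
  // Verifies that a peer's bandwidth defaults to 0 and can be updated on an existing peer.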
  @Test
  public void testPeerBandwidth() throws Exception {
    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(KEY_ONE);

    admin.addReplicationPeer(ID_ONE, rpc).join();
    rpc = admin.getReplicationPeerConfig(ID_ONE).get();
    assertEquals(0, rpc.getBandwidth());

    rpc.setBandwidth(2097152);
    admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
    assertEquals(2097152, admin.getReplicationPeerConfig(ID_ONE).join().getBandwidth());

    admin.removeReplicationPeer(ID_ONE).join();
  }
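
  // An unparseable cluster key should be rejected with a DoNotRetryIOException.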
  @Test
  public void testInvalidClusterKey() throws InterruptedException {
    try {
      admin.addReplicationPeer(ID_ONE,
        ReplicationPeerConfig.newBuilder().setClusterKey("whatever").build()).get();
      fail();
    } catch (ExecutionException e) {
      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
    }
  }
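
  // A replication endpoint class that cannot be instantiated should be rejected with a
  // DoNotRetryIOException.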
  @Test
  public void testInvalidReplicationEndpoint() throws InterruptedException {
    try {
      admin.addReplicationPeer(ID_ONE,
        ReplicationPeerConfig.newBuilder().setReplicationEndpointImpl("whatever").build()).get();
      fail();
    } catch (ExecutionException e) {
      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
      assertThat(e.getCause().getMessage(), startsWith("Can not instantiate"));
    }
  }

  @Test
  public void testSetReplicationEndpoint() throws InterruptedException, ExecutionException {
    // make sure that we do not need to set the cluster key when we use a customized
    // ReplicationEndpoint
    admin
      .addReplicationPeer(ID_ONE,
        ReplicationPeerConfig.newBuilder()
          .setReplicationEndpointImpl(VerifyWALEntriesReplicationEndpoint.class.getName()).build())
      .get();

    // but we still need to check the cluster key if we specify the default ReplicationEndpoint
    try {
      admin
        .addReplicationPeer(ID_TWO, ReplicationPeerConfig.newBuilder()
          .setReplicationEndpointImpl(HBaseInterClusterReplicationEndpoint.class.getName()).build())
        .get();
      fail();
    } catch (ExecutionException e) {
      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));