HBASE-24033 Add ut for loading the corrupt recovered hfiles (#1322)
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / client / TestAsyncReplicationAdminApi.java
blob506f8e75f6fd7876414c289653a6eccd64e043bd
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.client;
20 import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY;
21 import static org.hamcrest.CoreMatchers.instanceOf;
22 import static org.hamcrest.CoreMatchers.startsWith;
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertFalse;
25 import static org.junit.Assert.assertNotNull;
26 import static org.junit.Assert.assertNull;
27 import static org.junit.Assert.assertThat;
28 import static org.junit.Assert.assertTrue;
29 import static org.junit.Assert.fail;
31 import java.io.IOException;
32 import java.util.ArrayList;
33 import java.util.HashMap;
34 import java.util.HashSet;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.Set;
38 import java.util.concurrent.CompletionException;
39 import java.util.concurrent.ExecutionException;
40 import org.apache.hadoop.hbase.DoNotRetryIOException;
41 import org.apache.hadoop.hbase.HBaseClassTestRule;
42 import org.apache.hadoop.hbase.HConstants;
43 import org.apache.hadoop.hbase.ServerName;
44 import org.apache.hadoop.hbase.TableName;
45 import org.apache.hadoop.hbase.replication.ReplicationException;
46 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
47 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
48 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
49 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
50 import org.apache.hadoop.hbase.replication.VerifyWALEntriesReplicationEndpoint;
51 import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
52 import org.apache.hadoop.hbase.testclassification.ClientTests;
53 import org.apache.hadoop.hbase.testclassification.LargeTests;
54 import org.junit.After;
55 import org.junit.BeforeClass;
56 import org.junit.ClassRule;
57 import org.junit.Test;
58 import org.junit.experimental.categories.Category;
59 import org.junit.runner.RunWith;
60 import org.junit.runners.Parameterized;
62 /**
63 * Class to test asynchronous replication admin operations.
65 @RunWith(Parameterized.class)
66 @Category({ LargeTests.class, ClientTests.class })
67 public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
69 @ClassRule
70 public static final HBaseClassTestRule CLASS_RULE =
71 HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);
73 private final String ID_ONE = "1";
74 private final String KEY_ONE = "127.0.0.1:2181:/hbase";
75 private final String ID_TWO = "2";
76 private final String KEY_TWO = "127.0.0.1:2181:/hbase2";
78 @BeforeClass
79 public static void setUpBeforeClass() throws Exception {
80 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
81 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
82 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
83 TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
84 TEST_UTIL.startMiniCluster();
85 ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
88 @After
89 public void clearPeerAndQueues() throws IOException, ReplicationException {
90 try {
91 admin.removeReplicationPeer(ID_ONE).join();
92 } catch (Exception e) {
94 try {
95 admin.removeReplicationPeer(ID_TWO).join();
96 } catch (Exception e) {
98 ReplicationQueueStorage queueStorage = ReplicationStorageFactory
99 .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
100 for (ServerName serverName : queueStorage.getListOfReplicators()) {
101 for (String queue : queueStorage.getAllQueues(serverName)) {
102 queueStorage.removeQueue(serverName, queue);
107 @Test
108 public void testAddRemovePeer() throws Exception {
109 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
110 rpc1.setClusterKey(KEY_ONE);
111 ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
112 rpc2.setClusterKey(KEY_TWO);
113 // Add a valid peer
114 admin.addReplicationPeer(ID_ONE, rpc1).join();
115 // try adding the same (fails)
116 try {
117 admin.addReplicationPeer(ID_ONE, rpc1).join();
118 fail("Test case should fail as adding a same peer.");
119 } catch (CompletionException e) {
120 // OK!
122 assertEquals(1, admin.listReplicationPeers().get().size());
123 // Try to remove an inexisting peer
124 try {
125 admin.removeReplicationPeer(ID_TWO).join();
126 fail("Test case should fail as removing a inexisting peer.");
127 } catch (CompletionException e) {
128 // OK!
130 assertEquals(1, admin.listReplicationPeers().get().size());
131 // Add a second since multi-slave is supported
132 admin.addReplicationPeer(ID_TWO, rpc2).join();
133 assertEquals(2, admin.listReplicationPeers().get().size());
134 // Remove the first peer we added
135 admin.removeReplicationPeer(ID_ONE).join();
136 assertEquals(1, admin.listReplicationPeers().get().size());
137 admin.removeReplicationPeer(ID_TWO).join();
138 assertEquals(0, admin.listReplicationPeers().get().size());
141 @Test
142 public void testPeerConfig() throws Exception {
143 ReplicationPeerConfig config = new ReplicationPeerConfig();
144 config.setClusterKey(KEY_ONE);
145 config.getConfiguration().put("key1", "value1");
146 config.getConfiguration().put("key2", "value2");
147 admin.addReplicationPeer(ID_ONE, config).join();
149 List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
150 assertEquals(1, peers.size());
151 ReplicationPeerDescription peerOne = peers.get(0);
152 assertNotNull(peerOne);
153 assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1"));
154 assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2"));
156 admin.removeReplicationPeer(ID_ONE).join();
159 @Test
160 public void testEnableDisablePeer() throws Exception {
161 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
162 rpc1.setClusterKey(KEY_ONE);
163 admin.addReplicationPeer(ID_ONE, rpc1).join();
164 List<ReplicationPeerDescription> peers = admin.listReplicationPeers().get();
165 assertEquals(1, peers.size());
166 assertTrue(peers.get(0).isEnabled());
168 admin.disableReplicationPeer(ID_ONE).join();
169 peers = admin.listReplicationPeers().get();
170 assertEquals(1, peers.size());
171 assertFalse(peers.get(0).isEnabled());
172 admin.removeReplicationPeer(ID_ONE).join();
175 @Test
176 public void testAppendPeerTableCFs() throws Exception {
177 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
178 rpc1.setClusterKey(KEY_ONE);
179 final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
180 final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
181 final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
182 final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
183 final TableName tableName5 = TableName.valueOf(tableName.getNameAsString() + "t5");
184 final TableName tableName6 = TableName.valueOf(tableName.getNameAsString() + "t6");
186 // Add a valid peer
187 admin.addReplicationPeer(ID_ONE, rpc1).join();
188 rpc1.setReplicateAllUserTables(false);
189 admin.updateReplicationPeerConfig(ID_ONE, rpc1).join();
191 Map<TableName, List<String>> tableCFs = new HashMap<>();
193 // append table t1 to replication
194 tableCFs.put(tableName1, null);
195 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
196 Map<TableName, List<String>> result =
197 admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
198 assertEquals(1, result.size());
199 assertEquals(true, result.containsKey(tableName1));
200 assertNull(result.get(tableName1));
202 // append table t2 to replication
203 tableCFs.clear();
204 tableCFs.put(tableName2, null);
205 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
206 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
207 assertEquals(2, result.size());
208 assertTrue("Should contain t1", result.containsKey(tableName1));
209 assertTrue("Should contain t2", result.containsKey(tableName2));
210 assertNull(result.get(tableName1));
211 assertNull(result.get(tableName2));
213 // append table column family: f1 of t3 to replication
214 tableCFs.clear();
215 tableCFs.put(tableName3, new ArrayList<>());
216 tableCFs.get(tableName3).add("f1");
217 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
218 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
219 assertEquals(3, result.size());
220 assertTrue("Should contain t1", result.containsKey(tableName1));
221 assertTrue("Should contain t2", result.containsKey(tableName2));
222 assertTrue("Should contain t3", result.containsKey(tableName3));
223 assertNull(result.get(tableName1));
224 assertNull(result.get(tableName2));
225 assertEquals(1, result.get(tableName3).size());
226 assertEquals("f1", result.get(tableName3).get(0));
228 // append table column family: f1,f2 of t4 to replication
229 tableCFs.clear();
230 tableCFs.put(tableName4, new ArrayList<>());
231 tableCFs.get(tableName4).add("f1");
232 tableCFs.get(tableName4).add("f2");
233 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
234 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
235 assertEquals(4, result.size());
236 assertTrue("Should contain t1", result.containsKey(tableName1));
237 assertTrue("Should contain t2", result.containsKey(tableName2));
238 assertTrue("Should contain t3", result.containsKey(tableName3));
239 assertTrue("Should contain t4", result.containsKey(tableName4));
240 assertNull(result.get(tableName1));
241 assertNull(result.get(tableName2));
242 assertEquals(1, result.get(tableName3).size());
243 assertEquals("f1", result.get(tableName3).get(0));
244 assertEquals(2, result.get(tableName4).size());
245 assertEquals("f1", result.get(tableName4).get(0));
246 assertEquals("f2", result.get(tableName4).get(1));
248 // append "table5" => [], then append "table5" => ["f1"]
249 tableCFs.clear();
250 tableCFs.put(tableName5, new ArrayList<>());
251 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
252 tableCFs.clear();
253 tableCFs.put(tableName5, new ArrayList<>());
254 tableCFs.get(tableName5).add("f1");
255 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
256 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
257 assertEquals(5, result.size());
258 assertTrue("Should contain t5", result.containsKey(tableName5));
259 // null means replication all cfs of tab5
260 assertNull(result.get(tableName5));
262 // append "table6" => ["f1"], then append "table6" => []
263 tableCFs.clear();
264 tableCFs.put(tableName6, new ArrayList<>());
265 tableCFs.get(tableName6).add("f1");
266 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
267 tableCFs.clear();
268 tableCFs.put(tableName6, new ArrayList<>());
269 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
270 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
271 assertEquals(6, result.size());
272 assertTrue("Should contain t6", result.containsKey(tableName6));
273 // null means replication all cfs of tab6
274 assertNull(result.get(tableName6));
276 admin.removeReplicationPeer(ID_ONE).join();
279 @Test
280 public void testRemovePeerTableCFs() throws Exception {
281 ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
282 rpc1.setClusterKey(KEY_ONE);
283 final TableName tableName1 = TableName.valueOf(tableName.getNameAsString() + "t1");
284 final TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "t2");
285 final TableName tableName3 = TableName.valueOf(tableName.getNameAsString() + "t3");
286 final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
287 // Add a valid peer
288 admin.addReplicationPeer(ID_ONE, rpc1).join();
289 rpc1.setReplicateAllUserTables(false);
290 admin.updateReplicationPeerConfig(ID_ONE, rpc1).join();
292 Map<TableName, List<String>> tableCFs = new HashMap<>();
293 try {
294 tableCFs.put(tableName3, null);
295 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
296 fail("Test case should fail as removing table-cfs from a peer whose table-cfs is null");
297 } catch (CompletionException e) {
298 assertTrue(e.getCause() instanceof ReplicationException);
300 assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
302 tableCFs.clear();
303 tableCFs.put(tableName1, null);
304 tableCFs.put(tableName2, new ArrayList<>());
305 tableCFs.get(tableName2).add("cf1");
306 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
307 try {
308 tableCFs.clear();
309 tableCFs.put(tableName3, null);
310 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
311 fail("Test case should fail as removing table-cfs from a peer whose" +
312 " table-cfs didn't contain t3");
313 } catch (CompletionException e) {
314 assertTrue(e.getCause() instanceof ReplicationException);
316 Map<TableName, List<String>> result =
317 admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
318 assertEquals(2, result.size());
319 assertTrue("Should contain t1", result.containsKey(tableName1));
320 assertTrue("Should contain t2", result.containsKey(tableName2));
321 assertNull(result.get(tableName1));
322 assertEquals(1, result.get(tableName2).size());
323 assertEquals("cf1", result.get(tableName2).get(0));
325 try {
326 tableCFs.clear();
327 tableCFs.put(tableName1, new ArrayList<>());
328 tableCFs.get(tableName1).add("cf1");
329 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
330 fail("Test case should fail, because table t1 didn't specify cfs in peer config");
331 } catch (CompletionException e) {
332 assertTrue(e.getCause() instanceof ReplicationException);
334 tableCFs.clear();
335 tableCFs.put(tableName1, null);
336 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
337 result = admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
338 assertEquals(1, result.size());
339 assertEquals(1, result.get(tableName2).size());
340 assertEquals("cf1", result.get(tableName2).get(0));
342 try {
343 tableCFs.clear();
344 tableCFs.put(tableName2, null);
345 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
346 fail("Test case should fail, because table t2 hase specified cfs in peer config");
347 } catch (CompletionException e) {
348 assertTrue(e.getCause() instanceof ReplicationException);
350 tableCFs.clear();
351 tableCFs.put(tableName2, new ArrayList<>());
352 tableCFs.get(tableName2).add("cf1");
353 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
354 assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
356 tableCFs.clear();
357 tableCFs.put(tableName4, new ArrayList<>());
358 admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
359 admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
360 assertNull(admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap());
362 admin.removeReplicationPeer(ID_ONE);
365 @Test
366 public void testSetPeerNamespaces() throws Exception {
367 String ns1 = "ns1";
368 String ns2 = "ns2";
370 ReplicationPeerConfig rpc = new ReplicationPeerConfig();
371 rpc.setClusterKey(KEY_ONE);
372 admin.addReplicationPeer(ID_ONE, rpc).join();
373 rpc.setReplicateAllUserTables(false);
374 admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
376 // add ns1 and ns2 to peer config
377 rpc = admin.getReplicationPeerConfig(ID_ONE).get();
378 Set<String> namespaces = new HashSet<>();
379 namespaces.add(ns1);
380 namespaces.add(ns2);
381 rpc.setNamespaces(namespaces);
382 admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
383 namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
384 assertEquals(2, namespaces.size());
385 assertTrue(namespaces.contains(ns1));
386 assertTrue(namespaces.contains(ns2));
388 // update peer config only contains ns1
389 rpc = admin.getReplicationPeerConfig(ID_ONE).get();
390 namespaces = new HashSet<>();
391 namespaces.add(ns1);
392 rpc.setNamespaces(namespaces);
393 admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
394 namespaces = admin.getReplicationPeerConfig(ID_ONE).get().getNamespaces();
395 assertEquals(1, namespaces.size());
396 assertTrue(namespaces.contains(ns1));
398 admin.removeReplicationPeer(ID_ONE).join();
401 @Test
402 public void testNamespacesAndTableCfsConfigConflict() throws Exception {
403 String ns1 = "ns1";
404 String ns2 = "ns2";
405 final TableName tableName1 = TableName.valueOf(ns1 + ":" + tableName.getNameAsString() + "1");
406 final TableName tableName2 = TableName.valueOf(ns2 + ":" + tableName.getNameAsString() + "2");
408 ReplicationPeerConfig rpc = new ReplicationPeerConfig();
409 rpc.setClusterKey(KEY_ONE);
410 admin.addReplicationPeer(ID_ONE, rpc).join();
411 rpc.setReplicateAllUserTables(false);
412 admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
414 rpc = admin.getReplicationPeerConfig(ID_ONE).get();
415 Set<String> namespaces = new HashSet<String>();
416 namespaces.add(ns1);
417 rpc.setNamespaces(namespaces);
418 admin.updateReplicationPeerConfig(ID_ONE, rpc).get();
419 rpc = admin.getReplicationPeerConfig(ID_ONE).get();
420 Map<TableName, List<String>> tableCfs = new HashMap<>();
421 tableCfs.put(tableName1, new ArrayList<>());
422 rpc.setTableCFsMap(tableCfs);
423 try {
424 admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
425 fail(
426 "Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1);
427 } catch (CompletionException e) {
428 // OK
431 rpc = admin.getReplicationPeerConfig(ID_ONE).get();
432 tableCfs.clear();
433 tableCfs.put(tableName2, new ArrayList<>());
434 rpc.setTableCFsMap(tableCfs);
435 admin.updateReplicationPeerConfig(ID_ONE, rpc).get();
436 rpc = admin.getReplicationPeerConfig(ID_ONE).get();
437 namespaces.clear();
438 namespaces.add(ns2);
439 rpc.setNamespaces(namespaces);
440 try {
441 admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
442 fail(
443 "Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2);
444 } catch (CompletionException e) {
445 // OK
448 admin.removeReplicationPeer(ID_ONE).join();
451 @Test
452 public void testPeerBandwidth() throws Exception {
453 ReplicationPeerConfig rpc = new ReplicationPeerConfig();
454 rpc.setClusterKey(KEY_ONE);
456 admin.addReplicationPeer(ID_ONE, rpc).join();
457 rpc = admin.getReplicationPeerConfig(ID_ONE).get();
458 assertEquals(0, rpc.getBandwidth());
460 rpc.setBandwidth(2097152);
461 admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
462 assertEquals(2097152, admin.getReplicationPeerConfig(ID_ONE).join().getBandwidth());
464 admin.removeReplicationPeer(ID_ONE).join();
467 @Test
468 public void testInvalidClusterKey() throws InterruptedException {
469 try {
470 admin.addReplicationPeer(ID_ONE,
471 ReplicationPeerConfig.newBuilder().setClusterKey("whatever").build()).get();
472 fail();
473 } catch (ExecutionException e) {
474 assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
478 @Test
479 public void testInvalidReplicationEndpoint() throws InterruptedException {
480 try {
481 admin.addReplicationPeer(ID_ONE,
482 ReplicationPeerConfig.newBuilder().setReplicationEndpointImpl("whatever").build()).get();
483 fail();
484 } catch (ExecutionException e) {
485 assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
486 assertThat(e.getCause().getMessage(), startsWith("Can not instantiate"));
490 @Test
491 public void testSetReplicationEndpoint() throws InterruptedException, ExecutionException {
492 // make sure that we do not need to set cluster key when we use customized ReplicationEndpoint
493 admin
494 .addReplicationPeer(ID_ONE,
495 ReplicationPeerConfig.newBuilder()
496 .setReplicationEndpointImpl(VerifyWALEntriesReplicationEndpoint.class.getName()).build())
497 .get();
499 // but we still need to check cluster key if we specify the default ReplicationEndpoint
500 try {
501 admin
502 .addReplicationPeer(ID_TWO, ReplicationPeerConfig.newBuilder()
503 .setReplicationEndpointImpl(HBaseInterClusterReplicationEndpoint.class.getName()).build())
504 .get();
505 fail();
506 } catch (ExecutionException e) {
507 assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));