2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.apache
.hadoop
.hbase
.client
.AsyncConnectionConfiguration
.START_LOG_ERRORS_AFTER_COUNT_KEY
;
21 import static org
.hamcrest
.CoreMatchers
.instanceOf
;
22 import static org
.hamcrest
.CoreMatchers
.startsWith
;
23 import static org
.hamcrest
.MatcherAssert
.assertThat
;
24 import static org
.junit
.Assert
.assertEquals
;
25 import static org
.junit
.Assert
.assertFalse
;
26 import static org
.junit
.Assert
.assertNotNull
;
27 import static org
.junit
.Assert
.assertNull
;
28 import static org
.junit
.Assert
.assertTrue
;
29 import static org
.junit
.Assert
.fail
;
31 import java
.io
.IOException
;
32 import java
.util
.ArrayList
;
33 import java
.util
.HashMap
;
34 import java
.util
.HashSet
;
35 import java
.util
.List
;
38 import java
.util
.concurrent
.CompletionException
;
39 import java
.util
.concurrent
.ExecutionException
;
40 import org
.apache
.hadoop
.hbase
.DoNotRetryIOException
;
41 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
42 import org
.apache
.hadoop
.hbase
.HConstants
;
43 import org
.apache
.hadoop
.hbase
.ServerName
;
44 import org
.apache
.hadoop
.hbase
.TableName
;
45 import org
.apache
.hadoop
.hbase
.replication
.ReplicationException
;
46 import org
.apache
.hadoop
.hbase
.replication
.ReplicationPeerConfig
;
47 import org
.apache
.hadoop
.hbase
.replication
.ReplicationPeerConfigBuilder
;
48 import org
.apache
.hadoop
.hbase
.replication
.ReplicationPeerDescription
;
49 import org
.apache
.hadoop
.hbase
.replication
.ReplicationQueueStorage
;
50 import org
.apache
.hadoop
.hbase
.replication
.ReplicationStorageFactory
;
51 import org
.apache
.hadoop
.hbase
.replication
.VerifyWALEntriesReplicationEndpoint
;
52 import org
.apache
.hadoop
.hbase
.replication
.regionserver
.HBaseInterClusterReplicationEndpoint
;
53 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
54 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
55 import org
.junit
.After
;
56 import org
.junit
.BeforeClass
;
57 import org
.junit
.ClassRule
;
58 import org
.junit
.Test
;
59 import org
.junit
.experimental
.categories
.Category
;
60 import org
.junit
.runner
.RunWith
;
61 import org
.junit
.runners
.Parameterized
;
64 * Class to test asynchronous replication admin operations.
66 @RunWith(Parameterized
.class)
67 @Category({ LargeTests
.class, ClientTests
.class })
68 public class TestAsyncReplicationAdminApi
extends TestAsyncAdminBase
{
71 public static final HBaseClassTestRule CLASS_RULE
=
72 HBaseClassTestRule
.forClass(TestAsyncReplicationAdminApi
.class);
74 private final String ID_ONE
= "1";
75 private static String KEY_ONE
;
76 private final String ID_TWO
= "2";
77 private static String KEY_TWO
;
80 public static void setUpBeforeClass() throws Exception
{
81 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_RPC_TIMEOUT_KEY
, 60000);
82 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_CLIENT_OPERATION_TIMEOUT
, 120000);
83 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
, 2);
84 TEST_UTIL
.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY
, 0);
85 TEST_UTIL
.startMiniCluster();
86 KEY_ONE
= TEST_UTIL
.getClusterKey() + "-test1";
87 KEY_TWO
= TEST_UTIL
.getClusterKey() + "-test2";
88 ASYNC_CONN
= ConnectionFactory
.createAsyncConnection(TEST_UTIL
.getConfiguration()).get();
92 public void clearPeerAndQueues() throws IOException
, ReplicationException
{
94 admin
.removeReplicationPeer(ID_ONE
).join();
95 } catch (Exception e
) {
98 admin
.removeReplicationPeer(ID_TWO
).join();
99 } catch (Exception e
) {
101 ReplicationQueueStorage queueStorage
= ReplicationStorageFactory
102 .getReplicationQueueStorage(TEST_UTIL
.getZooKeeperWatcher(), TEST_UTIL
.getConfiguration());
103 for (ServerName serverName
: queueStorage
.getListOfReplicators()) {
104 for (String queue
: queueStorage
.getAllQueues(serverName
)) {
105 queueStorage
.removeQueue(serverName
, queue
);
111 public void testAddRemovePeer() throws Exception
{
112 ReplicationPeerConfig rpc1
= ReplicationPeerConfig
.newBuilder().setClusterKey(KEY_ONE
).build();
113 ReplicationPeerConfig rpc2
= ReplicationPeerConfig
.newBuilder().setClusterKey(KEY_TWO
).build();
115 admin
.addReplicationPeer(ID_ONE
, rpc1
).join();
116 // try adding the same (fails)
118 admin
.addReplicationPeer(ID_ONE
, rpc1
).join();
119 fail("Test case should fail as adding a same peer.");
120 } catch (CompletionException e
) {
123 assertEquals(1, admin
.listReplicationPeers().get().size());
124 // Try to remove an inexisting peer
126 admin
.removeReplicationPeer(ID_TWO
).join();
127 fail("Test case should fail as removing a inexisting peer.");
128 } catch (CompletionException e
) {
131 assertEquals(1, admin
.listReplicationPeers().get().size());
132 // Add a second since multi-slave is supported
133 admin
.addReplicationPeer(ID_TWO
, rpc2
).join();
134 assertEquals(2, admin
.listReplicationPeers().get().size());
135 // Remove the first peer we added
136 admin
.removeReplicationPeer(ID_ONE
).join();
137 assertEquals(1, admin
.listReplicationPeers().get().size());
138 admin
.removeReplicationPeer(ID_TWO
).join();
139 assertEquals(0, admin
.listReplicationPeers().get().size());
143 public void testPeerConfig() throws Exception
{
144 ReplicationPeerConfig config
= ReplicationPeerConfig
.newBuilder()
145 .setClusterKey(KEY_ONE
)
146 .putConfiguration("key1", "value1")
147 .putConfiguration("key2", "value2")
149 admin
.addReplicationPeer(ID_ONE
, config
).join();
151 List
<ReplicationPeerDescription
> peers
= admin
.listReplicationPeers().get();
152 assertEquals(1, peers
.size());
153 ReplicationPeerDescription peerOne
= peers
.get(0);
154 assertNotNull(peerOne
);
155 assertEquals("value1", peerOne
.getPeerConfig().getConfiguration().get("key1"));
156 assertEquals("value2", peerOne
.getPeerConfig().getConfiguration().get("key2"));
158 admin
.removeReplicationPeer(ID_ONE
).join();
162 public void testEnableDisablePeer() throws Exception
{
163 ReplicationPeerConfig rpc1
= ReplicationPeerConfig
.newBuilder().setClusterKey(KEY_ONE
).build();
164 admin
.addReplicationPeer(ID_ONE
, rpc1
).join();
165 List
<ReplicationPeerDescription
> peers
= admin
.listReplicationPeers().get();
166 assertEquals(1, peers
.size());
167 assertTrue(peers
.get(0).isEnabled());
169 admin
.disableReplicationPeer(ID_ONE
).join();
170 peers
= admin
.listReplicationPeers().get();
171 assertEquals(1, peers
.size());
172 assertFalse(peers
.get(0).isEnabled());
173 admin
.removeReplicationPeer(ID_ONE
).join();
177 public void testAppendPeerTableCFs() throws Exception
{
178 ReplicationPeerConfigBuilder rpcBuilder
=
179 ReplicationPeerConfig
.newBuilder().setClusterKey(KEY_ONE
);
180 final TableName tableName1
= TableName
.valueOf(tableName
.getNameAsString() + "t1");
181 final TableName tableName2
= TableName
.valueOf(tableName
.getNameAsString() + "t2");
182 final TableName tableName3
= TableName
.valueOf(tableName
.getNameAsString() + "t3");
183 final TableName tableName4
= TableName
.valueOf(tableName
.getNameAsString() + "t4");
184 final TableName tableName5
= TableName
.valueOf(tableName
.getNameAsString() + "t5");
185 final TableName tableName6
= TableName
.valueOf(tableName
.getNameAsString() + "t6");
188 admin
.addReplicationPeer(ID_ONE
, rpcBuilder
.build()).join();
189 rpcBuilder
.setReplicateAllUserTables(false);
190 admin
.updateReplicationPeerConfig(ID_ONE
, rpcBuilder
.build()).join();
192 Map
<TableName
, List
<String
>> tableCFs
= new HashMap
<>();
194 // append table t1 to replication
195 tableCFs
.put(tableName1
, null);
196 admin
.appendReplicationPeerTableCFs(ID_ONE
, tableCFs
).join();
197 Map
<TableName
, List
<String
>> result
=
198 admin
.getReplicationPeerConfig(ID_ONE
).get().getTableCFsMap();
199 assertEquals(1, result
.size());
200 assertEquals(true, result
.containsKey(tableName1
));
201 assertNull(result
.get(tableName1
));
203 // append table t2 to replication
205 tableCFs
.put(tableName2
, null);
206 admin
.appendReplicationPeerTableCFs(ID_ONE
, tableCFs
).join();
207 result
= admin
.getReplicationPeerConfig(ID_ONE
).get().getTableCFsMap();
208 assertEquals(2, result
.size());
209 assertTrue("Should contain t1", result
.containsKey(tableName1
));
210 assertTrue("Should contain t2", result
.containsKey(tableName2
));
211 assertNull(result
.get(tableName1
));
212 assertNull(result
.get(tableName2
));
214 // append table column family: f1 of t3 to replication
216 tableCFs
.put(tableName3
, new ArrayList
<>());
217 tableCFs
.get(tableName3
).add("f1");
218 admin
.appendReplicationPeerTableCFs(ID_ONE
, tableCFs
).join();
219 result
= admin
.getReplicationPeerConfig(ID_ONE
).get().getTableCFsMap();
220 assertEquals(3, result
.size());
221 assertTrue("Should contain t1", result
.containsKey(tableName1
));
222 assertTrue("Should contain t2", result
.containsKey(tableName2
));
223 assertTrue("Should contain t3", result
.containsKey(tableName3
));
224 assertNull(result
.get(tableName1
));
225 assertNull(result
.get(tableName2
));
226 assertEquals(1, result
.get(tableName3
).size());
227 assertEquals("f1", result
.get(tableName3
).get(0));
229 // append table column family: f1,f2 of t4 to replication
231 tableCFs
.put(tableName4
, new ArrayList
<>());
232 tableCFs
.get(tableName4
).add("f1");
233 tableCFs
.get(tableName4
).add("f2");
234 admin
.appendReplicationPeerTableCFs(ID_ONE
, tableCFs
).join();
235 result
= admin
.getReplicationPeerConfig(ID_ONE
).get().getTableCFsMap();
236 assertEquals(4, result
.size());
237 assertTrue("Should contain t1", result
.containsKey(tableName1
));
238 assertTrue("Should contain t2", result
.containsKey(tableName2
));
239 assertTrue("Should contain t3", result
.containsKey(tableName3
));
240 assertTrue("Should contain t4", result
.containsKey(tableName4
));
241 assertNull(result
.get(tableName1
));
242 assertNull(result
.get(tableName2
));
243 assertEquals(1, result
.get(tableName3
).size());
244 assertEquals("f1", result
.get(tableName3
).get(0));
245 assertEquals(2, result
.get(tableName4
).size());
246 assertEquals("f1", result
.get(tableName4
).get(0));
247 assertEquals("f2", result
.get(tableName4
).get(1));
249 // append "table5" => [], then append "table5" => ["f1"]
251 tableCFs
.put(tableName5
, new ArrayList
<>());
252 admin
.appendReplicationPeerTableCFs(ID_ONE
, tableCFs
).join();
254 tableCFs
.put(tableName5
, new ArrayList
<>());
255 tableCFs
.get(tableName5
).add("f1");
256 admin
.appendReplicationPeerTableCFs(ID_ONE
, tableCFs
).join();
257 result
= admin
.getReplicationPeerConfig(ID_ONE
).get().getTableCFsMap();
258 assertEquals(5, result
.size());
259 assertTrue("Should contain t5", result
.containsKey(tableName5
));
260 // null means replication all cfs of tab5
261 assertNull(result
.get(tableName5
));
263 // append "table6" => ["f1"], then append "table6" => []
265 tableCFs
.put(tableName6
, new ArrayList
<>());
266 tableCFs
.get(tableName6
).add("f1");
267 admin
.appendReplicationPeerTableCFs(ID_ONE
, tableCFs
).join();
269 tableCFs
.put(tableName6
, new ArrayList
<>());
270 admin
.appendReplicationPeerTableCFs(ID_ONE
, tableCFs
).join();
271 result
= admin
.getReplicationPeerConfig(ID_ONE
).get().getTableCFsMap();
272 assertEquals(6, result
.size());
273 assertTrue("Should contain t6", result
.containsKey(tableName6
));
274 // null means replication all cfs of tab6
275 assertNull(result
.get(tableName6
));
277 admin
.removeReplicationPeer(ID_ONE
).join();
281 public void testRemovePeerTableCFs() throws Exception
{
282 ReplicationPeerConfigBuilder rpcBuilder
=
283 ReplicationPeerConfig
.newBuilder().setClusterKey(KEY_ONE
);
284 final TableName tableName1
= TableName
.valueOf(tableName
.getNameAsString() + "t1");
285 final TableName tableName2
= TableName
.valueOf(tableName
.getNameAsString() + "t2");
286 final TableName tableName3
= TableName
.valueOf(tableName
.getNameAsString() + "t3");
287 final TableName tableName4
= TableName
.valueOf(tableName
.getNameAsString() + "t4");
289 admin
.addReplicationPeer(ID_ONE
, rpcBuilder
.build()).join();
290 rpcBuilder
.setReplicateAllUserTables(false);
291 admin
.updateReplicationPeerConfig(ID_ONE
, rpcBuilder
.build()).join();
293 Map
<TableName
, List
<String
>> tableCFs
= new HashMap
<>();
295 tableCFs
.put(tableName3
, null);
296 admin
.removeReplicationPeerTableCFs(ID_ONE
, tableCFs
).join();
297 fail("Test case should fail as removing table-cfs from a peer whose table-cfs is null");
298 } catch (CompletionException e
) {
299 assertTrue(e
.getCause() instanceof ReplicationException
);
301 assertNull(admin
.getReplicationPeerConfig(ID_ONE
).get().getTableCFsMap());
304 tableCFs
.put(tableName1
, null);
305 tableCFs
.put(tableName2
, new ArrayList
<>());
306 tableCFs
.get(tableName2
).add("cf1");
307 admin
.appendReplicationPeerTableCFs(ID_ONE
, tableCFs
).join();
310 tableCFs
.put(tableName3
, null);
311 admin
.removeReplicationPeerTableCFs(ID_ONE
, tableCFs
).join();
312 fail("Test case should fail as removing table-cfs from a peer whose" +
313 " table-cfs didn't contain t3");
314 } catch (CompletionException e
) {
315 assertTrue(e
.getCause() instanceof ReplicationException
);
317 Map
<TableName
, List
<String
>> result
=
318 admin
.getReplicationPeerConfig(ID_ONE
).get().getTableCFsMap();
319 assertEquals(2, result
.size());
320 assertTrue("Should contain t1", result
.containsKey(tableName1
));
321 assertTrue("Should contain t2", result
.containsKey(tableName2
));
322 assertNull(result
.get(tableName1
));
323 assertEquals(1, result
.get(tableName2
).size());
324 assertEquals("cf1", result
.get(tableName2
).get(0));
328 tableCFs
.put(tableName1
, new ArrayList
<>());
329 tableCFs
.get(tableName1
).add("cf1");
330 admin
.removeReplicationPeerTableCFs(ID_ONE
, tableCFs
).join();
331 fail("Test case should fail, because table t1 didn't specify cfs in peer config");
332 } catch (CompletionException e
) {
333 assertTrue(e
.getCause() instanceof ReplicationException
);
336 tableCFs
.put(tableName1
, null);
337 admin
.removeReplicationPeerTableCFs(ID_ONE
, tableCFs
).join();
338 result
= admin
.getReplicationPeerConfig(ID_ONE
).get().getTableCFsMap();
339 assertEquals(1, result
.size());
340 assertEquals(1, result
.get(tableName2
).size());
341 assertEquals("cf1", result
.get(tableName2
).get(0));
345 tableCFs
.put(tableName2
, null);
346 admin
.removeReplicationPeerTableCFs(ID_ONE
, tableCFs
).join();
347 fail("Test case should fail, because table t2 hase specified cfs in peer config");
348 } catch (CompletionException e
) {
349 assertTrue(e
.getCause() instanceof ReplicationException
);
352 tableCFs
.put(tableName2
, new ArrayList
<>());
353 tableCFs
.get(tableName2
).add("cf1");
354 admin
.removeReplicationPeerTableCFs(ID_ONE
, tableCFs
).join();
355 assertNull(admin
.getReplicationPeerConfig(ID_ONE
).get().getTableCFsMap());
358 tableCFs
.put(tableName4
, new ArrayList
<>());
359 admin
.appendReplicationPeerTableCFs(ID_ONE
, tableCFs
).join();
360 admin
.removeReplicationPeerTableCFs(ID_ONE
, tableCFs
).join();
361 assertNull(admin
.getReplicationPeerConfig(ID_ONE
).get().getTableCFsMap());
363 admin
.removeReplicationPeer(ID_ONE
);
367 public void testSetPeerNamespaces() throws Exception
{
371 ReplicationPeerConfigBuilder rpcBuilder
=
372 ReplicationPeerConfig
.newBuilder().setClusterKey(KEY_ONE
);
373 admin
.addReplicationPeer(ID_ONE
, rpcBuilder
.build()).join();
374 rpcBuilder
.setReplicateAllUserTables(false);
375 admin
.updateReplicationPeerConfig(ID_ONE
, rpcBuilder
.build()).join();
377 // add ns1 and ns2 to peer config
378 Set
<String
> namespaces
= new HashSet
<>();
381 rpcBuilder
.setNamespaces(namespaces
);
382 admin
.updateReplicationPeerConfig(ID_ONE
, rpcBuilder
.build()).join();
383 namespaces
= admin
.getReplicationPeerConfig(ID_ONE
).get().getNamespaces();
384 assertEquals(2, namespaces
.size());
385 assertTrue(namespaces
.contains(ns1
));
386 assertTrue(namespaces
.contains(ns2
));
388 // update peer config only contains ns1
389 namespaces
= new HashSet
<>();
391 rpcBuilder
.setNamespaces(namespaces
);
392 admin
.updateReplicationPeerConfig(ID_ONE
, rpcBuilder
.build()).join();
393 namespaces
= admin
.getReplicationPeerConfig(ID_ONE
).get().getNamespaces();
394 assertEquals(1, namespaces
.size());
395 assertTrue(namespaces
.contains(ns1
));
397 admin
.removeReplicationPeer(ID_ONE
).join();
401 public void testNamespacesAndTableCfsConfigConflict() throws Exception
{
404 final TableName tableName1
= TableName
.valueOf(ns1
+ ":" + tableName
.getNameAsString() + "1");
405 final TableName tableName2
= TableName
.valueOf(ns2
+ ":" + tableName
.getNameAsString() + "2");
407 ReplicationPeerConfigBuilder rpcBuilder
=
408 ReplicationPeerConfig
.newBuilder().setClusterKey(KEY_ONE
);
409 admin
.addReplicationPeer(ID_ONE
, rpcBuilder
.build()).join();
410 rpcBuilder
.setReplicateAllUserTables(false);
411 admin
.updateReplicationPeerConfig(ID_ONE
, rpcBuilder
.build()).join();
413 Set
<String
> namespaces
= new HashSet
<String
>();
415 rpcBuilder
.setNamespaces(namespaces
);
416 admin
.updateReplicationPeerConfig(ID_ONE
, rpcBuilder
.build()).get();
417 Map
<TableName
, List
<String
>> tableCfs
= new HashMap
<>();
418 tableCfs
.put(tableName1
, new ArrayList
<>());
419 rpcBuilder
.setTableCFsMap(tableCfs
);
421 admin
.updateReplicationPeerConfig(ID_ONE
, rpcBuilder
.build()).join();
423 "Test case should fail, because table " + tableName1
+ " conflict with namespace " + ns1
);
424 } catch (CompletionException e
) {
429 tableCfs
.put(tableName2
, new ArrayList
<>());
430 rpcBuilder
.setTableCFsMap(tableCfs
);
431 admin
.updateReplicationPeerConfig(ID_ONE
, rpcBuilder
.build()).get();
434 rpcBuilder
.setNamespaces(namespaces
);
436 admin
.updateReplicationPeerConfig(ID_ONE
, rpcBuilder
.build()).join();
438 "Test case should fail, because namespace " + ns2
+ " conflict with table " + tableName2
);
439 } catch (CompletionException e
) {
443 admin
.removeReplicationPeer(ID_ONE
).join();
447 public void testPeerBandwidth() throws Exception
{
448 ReplicationPeerConfigBuilder rpcBuilder
=
449 ReplicationPeerConfig
.newBuilder().setClusterKey(KEY_ONE
);
451 admin
.addReplicationPeer(ID_ONE
, rpcBuilder
.build()).join();;
452 assertEquals(0, admin
.getReplicationPeerConfig(ID_ONE
).get().getBandwidth());
454 rpcBuilder
.setBandwidth(2097152);
455 admin
.updateReplicationPeerConfig(ID_ONE
, rpcBuilder
.build()).join();
456 assertEquals(2097152, admin
.getReplicationPeerConfig(ID_ONE
).join().getBandwidth());
458 admin
.removeReplicationPeer(ID_ONE
).join();
462 public void testInvalidClusterKey() throws InterruptedException
{
464 admin
.addReplicationPeer(ID_ONE
,
465 ReplicationPeerConfig
.newBuilder().setClusterKey("whatever").build()).get();
467 } catch (ExecutionException e
) {
468 assertThat(e
.getCause(), instanceOf(DoNotRetryIOException
.class));
473 public void testClusterKeyWithTrailingSpace() throws Exception
{
474 admin
.addReplicationPeer(ID_ONE
,
475 ReplicationPeerConfig
.newBuilder().setClusterKey(KEY_ONE
+ " ").build()).get();
476 String clusterKey
= admin
.getReplicationPeerConfig(ID_ONE
).get().getClusterKey();
477 assertEquals(KEY_ONE
, clusterKey
);
481 public void testInvalidReplicationEndpoint() throws InterruptedException
{
483 admin
.addReplicationPeer(ID_ONE
,
484 ReplicationPeerConfig
.newBuilder().setReplicationEndpointImpl("whatever").build()).get();
486 } catch (ExecutionException e
) {
487 assertThat(e
.getCause(), instanceOf(DoNotRetryIOException
.class));
488 assertThat(e
.getCause().getMessage(), startsWith("Can not instantiate"));
493 public void testSetReplicationEndpoint() throws InterruptedException
, ExecutionException
{
494 // make sure that we do not need to set cluster key when we use customized ReplicationEndpoint
496 .addReplicationPeer(ID_ONE
,
497 ReplicationPeerConfig
.newBuilder()
498 .setReplicationEndpointImpl(VerifyWALEntriesReplicationEndpoint
.class.getName()).build())
501 // but we still need to check cluster key if we specify the default ReplicationEndpoint
504 .addReplicationPeer(ID_TWO
, ReplicationPeerConfig
.newBuilder()
505 .setReplicationEndpointImpl(HBaseInterClusterReplicationEndpoint
.class.getName()).build())
508 } catch (ExecutionException e
) {
509 assertThat(e
.getCause(), instanceOf(DoNotRetryIOException
.class));