/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Tests ReplicationSource and ReplicationEndpoint interactions
 */
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationEndpoint extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationEndpoint.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationEndpoint.class);

  static int numRegionServers;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TestReplicationBase.setUpBeforeClass();
    numRegionServers = UTIL1.getHBaseCluster().getRegionServerThreads().size();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TestReplicationBase.tearDownAfterClass();
    // check stop is called
    Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
  }

  @Before
  public void setup() throws Exception {
    ReplicationEndpointForTest.contructedCount.set(0);
    ReplicationEndpointForTest.startedCount.set(0);
    ReplicationEndpointForTest.replicateCount.set(0);
    ReplicationEndpointReturningFalse.replicated.set(false);
    ReplicationEndpointForTest.lastEntries = null;
    final List<RegionServerThread> rsThreads =
      UTIL1.getMiniHBaseCluster().getRegionServerThreads();
    for (RegionServerThread rs : rsThreads) {
      UTIL1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName());
    }
    // Wait for all log roll to finish
    UTIL1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        for (RegionServerThread rs : rsThreads) {
          if (!rs.getRegionServer().walRollRequestFinished()) {
            return false;
          }
        }
        return true;
      }

      @Override
      public String explainFailure() throws Exception {
        List<String> logRollInProgressRsList = new ArrayList<>();
        for (RegionServerThread rs : rsThreads) {
          if (!rs.getRegionServer().walRollRequestFinished()) {
            logRollInProgressRsList.add(rs.getRegionServer().toString());
          }
        }
        return "Still waiting for log roll on regionservers: " + logRollInProgressRsList;
      }
    });
  }

  @Test
  public void testCustomReplicationEndpoint() throws Exception {
    // test installing a custom replication endpoint other than the default one.
    hbaseAdmin.addReplicationPeer("testCustomReplicationEndpoint",
      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()));

    // check whether the class has been constructed and started
    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
      }
    });

    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
      }
    });

    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());

    // now replicate some data.
    doPut(Bytes.toBytes("row42"));

    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return ReplicationEndpointForTest.replicateCount.get() >= 1;
      }
    });

    doAssert(Bytes.toBytes("row42"));

    hbaseAdmin.removeReplicationPeer("testCustomReplicationEndpoint");
  }

  @Test
  public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
    Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
    Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get());
    int peerCount = hbaseAdmin.listReplicationPeers().size();
    final String id = "testReplicationEndpointReturnsFalseOnReplicate";
    hbaseAdmin.addReplicationPeer(id,
      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
        .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()));
    // This test is flaky and there is so much stuff flying around in here that it is hard to
    // debug. The peer needs to be up for the edit to make it across. This wait on
    // peer count keeps us from progressing until the peer is up.
    if (hbaseAdmin.listReplicationPeers().size() <= peerCount) {
      LOG.info("Waiting on peercount to go up from " + peerCount);
      Threads.sleep(100);
    }
    // now replicate some data
    doPut(row);

    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        // Looks like the replication endpoint returns false unless we put more than 10 edits. We
        // only send over one edit.
        int count = ReplicationEndpointForTest.replicateCount.get();
        LOG.info("count=" + count);
        return ReplicationEndpointReturningFalse.replicated.get();
      }
    });
    if (ReplicationEndpointReturningFalse.ex.get() != null) {
      throw ReplicationEndpointReturningFalse.ex.get();
    }

    hbaseAdmin.removeReplicationPeer("testReplicationEndpointReturnsFalseOnReplicate");
  }

  @Test
  public void testInterClusterReplication() throws Exception {
    final String id = "testInterClusterReplication";

    List<HRegion> regions = UTIL1.getHBaseCluster().getRegions(tableName);
    int totEdits = 0;

    // Make sure edits are spread across regions because we do region based batching
    // before shipping edits.
    for (HRegion region : regions) {
      RegionInfo hri = region.getRegionInfo();
      byte[] row = hri.getStartKey();
      for (int i = 0; i < 100; i++) {
        if (row.length > 0) {
          Put put = new Put(row);
          put.addColumn(famName, row, row);
          region.put(put);
          totEdits++;
        }
      }
    }

    hbaseAdmin.addReplicationPeer(id,
      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF2))
        .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()));

    final int numEdits = totEdits;
    Waiter.waitFor(CONF1, 30000, new Waiter.ExplainingPredicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
      }

      @Override
      public String explainFailure() throws Exception {
        String failure = "Failed to replicate all edits, expected = " + numEdits
          + " replicated = " + InterClusterReplicationEndpointForTest.replicateCount.get();
        return failure;
      }
    });

    hbaseAdmin.removeReplicationPeer("testInterClusterReplication");
    UTIL1.deleteTableData(tableName);
  }

  @Test
  public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
    ReplicationPeerConfig rpc =
      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
    // test that we can create multiple WALFilters reflectively
    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
      EverythingPassesWALEntryFilter.class.getName() + ","
        + EverythingPassesWALEntryFilterSubclass.class.getName());
    hbaseAdmin.addReplicationPeer("testWALEntryFilterFromReplicationEndpoint", rpc);
    // now replicate some data.
    try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
      doPut(connection, Bytes.toBytes("row1"));
      doPut(connection, row);
      doPut(connection, Bytes.toBytes("row2"));
    }

    Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return ReplicationEndpointForTest.replicateCount.get() >= 1;
      }
    });

    Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
    // make sure our reflectively created filter is in the filter chain
    Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry());
    hbaseAdmin.removeReplicationPeer("testWALEntryFilterFromReplicationEndpoint");
  }

  @Test(expected = IOException.class)
  public void testWALEntryFilterAddValidation() throws Exception {
    ReplicationPeerConfig rpc =
      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
    // test that adding a peer with a bogus WALEntryFilter class name fails validation
    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
      "IAmNotARealWalEntryFilter");
    hbaseAdmin.addReplicationPeer("testWALEntryFilterAddValidation", rpc);
  }

  @Test(expected = IOException.class)
  public void testWALEntryFilterUpdateValidation() throws Exception {
    ReplicationPeerConfig rpc =
      new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
        .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
    // test that updating a peer with a bogus WALEntryFilter class name fails validation
    rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
      "IAmNotARealWalEntryFilter");
    hbaseAdmin.updateReplicationPeerConfig("testWALEntryFilterUpdateValidation", rpc);
  }

  @Test
  public void testMetricsSourceBaseSourcePassthrough() {
    /*
     * The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl and a
     * MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces. Both
     * of those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which
     * allows for custom JMX metrics. This test checks to make sure the BaseSource decorator logic
     * on MetricsSource actually calls down through the two layers of wrapping to the actual
     * BaseSource.
     */
    String id = "id";
    DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
    MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
    when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
    MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
    when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);

    MetricsReplicationSourceSource singleSourceSource =
      new MetricsReplicationSourceSourceImpl(singleRms, id);
    MetricsReplicationSourceSource globalSourceSource =
      new MetricsReplicationGlobalSourceSource(globalRms);
    MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
    doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();

    Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
    MetricsSource source =
      new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable);

    String gaugeName = "gauge";
    String singleGaugeName = "source.id." + gaugeName;
    String globalGaugeName = "source." + gaugeName;
    long delta = 1;
    String counterName = "counter";
    String singleCounterName = "source.id." + counterName;
    String globalCounterName = "source." + counterName;
    long count = 2;

    source.decGauge(gaugeName, delta);
    source.getMetricsContext();
    source.getMetricsDescription();
    source.getMetricsJmxContext();
    source.getMetricsName();
    source.incCounters(counterName, count);
    source.incGauge(gaugeName, delta);
    source.init();
    source.removeMetric(gaugeName);
    source.setGauge(gaugeName, delta);
    source.updateHistogram(counterName, count);
    source.incrFailedRecoveryQueue();

    verify(singleRms).decGauge(singleGaugeName, delta);
    verify(globalRms).decGauge(globalGaugeName, delta);
    verify(globalRms).getMetricsContext();
    verify(globalRms).getMetricsJmxContext();
    verify(globalRms).getMetricsName();
    verify(singleRms).incCounters(singleCounterName, count);
    verify(globalRms).incCounters(globalCounterName, count);
    verify(singleRms).incGauge(singleGaugeName, delta);
    verify(globalRms).incGauge(globalGaugeName, delta);
    verify(globalRms).init();
    verify(singleRms).removeMetric(singleGaugeName);
    verify(globalRms).removeMetric(globalGaugeName);
    verify(singleRms).setGauge(singleGaugeName, delta);
    verify(globalRms).setGauge(globalGaugeName, delta);
    verify(singleRms).updateHistogram(singleCounterName, count);
    verify(globalRms).updateHistogram(globalCounterName, count);
    verify(spyglobalSourceSource).incrFailedRecoveryQueue();

    // check singleSourceSourceByTable metrics.
    // A singleSourceSourceByTable map entry is created only
    // after calling #setAgeOfLastShippedOpByTable.
    boolean containsRandomNewTable = source.getSingleSourceSourceByTable()
      .containsKey("RandomNewTable");
    Assert.assertEquals(false, containsRandomNewTable);
    source.setAgeOfLastShippedOpByTable(123L, "RandomNewTable");
    containsRandomNewTable = source.getSingleSourceSourceByTable()
      .containsKey("RandomNewTable");
    Assert.assertEquals(true, containsRandomNewTable);
    MetricsReplicationSourceSource msr = source.getSingleSourceSourceByTable()
      .get("RandomNewTable");
    // We cannot assert a more concrete value here because the age is arbitrary;
    // as long as it is greater than 0, we consider it correct.
    Assert.assertTrue(msr.getLastShippedAge() > 0);
  }

  private void doPut(byte[] row) throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
      doPut(connection, row);
    }
  }

  private void doPut(final Connection connection, final byte[] row) throws IOException {
    try (Table t = connection.getTable(tableName)) {
      Put put = new Put(row);
      put.addColumn(famName, row, row);
      t.put(put);
    }
  }

  private static void doAssert(byte[] row) throws Exception {
    if (ReplicationEndpointForTest.lastEntries == null) {
      return; // first call
    }
    Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
    List<Cell> cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells();
    Assert.assertEquals(1, cells.size());
    Assert.assertTrue(Bytes.equals(cells.get(0).getRowArray(), cells.get(0).getRowOffset(),
      cells.get(0).getRowLength(), row, 0, row.length));
  }

  public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
    static UUID uuid = UTIL1.getRandomUUID();
    static AtomicInteger contructedCount = new AtomicInteger();
    static AtomicInteger startedCount = new AtomicInteger();
    static AtomicInteger stoppedCount = new AtomicInteger();
    static AtomicInteger replicateCount = new AtomicInteger();
    static volatile List<Entry> lastEntries = null;

    public ReplicationEndpointForTest() {
      replicateCount.set(0);
      contructedCount.incrementAndGet();
    }

    @Override
    public UUID getPeerUUID() {
      return uuid;
    }

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      replicateCount.incrementAndGet();
      lastEntries = new ArrayList<>(replicateContext.entries);
      return true;
    }

    @Override
    public void start() {
      startAsync();
    }

    @Override
    public void stop() {
      stopAsync();
    }

    @Override
    protected void doStart() {
      startedCount.incrementAndGet();
      notifyStarted();
    }

    @Override
    protected void doStop() {
      stoppedCount.incrementAndGet();
      notifyStopped();
    }
  }

  public static class InterClusterReplicationEndpointForTest
    extends HBaseInterClusterReplicationEndpoint {

    static AtomicInteger replicateCount = new AtomicInteger();
    static boolean failedOnce;

    public InterClusterReplicationEndpointForTest() {
      replicateCount.set(0);
    }

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      boolean success = super.replicate(replicateContext);
      if (success) {
        replicateCount.addAndGet(replicateContext.entries.size());
      }
      return success;
    }

    @Override
    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
      // Fail only once, we don't want to slow down the test.
      if (failedOnce) {
        return () -> ordinal;
      } else {
        failedOnce = true;
        return () -> {
          throw new IOException("Sample Exception: Failed to replicate.");
        };
      }
    }
  }

  public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
    static int COUNT = 10;
    static AtomicReference<Exception> ex = new AtomicReference<>(null);
    static AtomicBoolean replicated = new AtomicBoolean(false);

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      try {
        // check row
        doAssert(row);
      } catch (Exception e) {
        ex.set(e);
      }

      super.replicate(replicateContext);
      LOG.info("Replicated " + Bytes.toString(row) + ", count=" + replicateCount.get());

      replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false
      return replicated.get();
    }
  }

  // return a WALEntry filter which only accepts "row", but not other rows
  public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
    static AtomicReference<Exception> ex = new AtomicReference<>(null);

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      try {
        super.replicate(replicateContext);
        doAssert(row);
      } catch (Exception e) {
        ex.set(e);
      }
      return true;
    }

    @Override
    public WALEntryFilter getWALEntryfilter() {
      return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
        @Override
        public Entry filter(Entry entry) {
          ArrayList<Cell> cells = entry.getEdit().getCells();
          int size = cells.size();
          for (int i = size - 1; i >= 0; i--) {
            Cell cell = cells.get(i);
            // drop every cell whose row does not match "row"
            if (!Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
              row, 0, row.length)) {
              cells.remove(i);
            }
          }
          return entry;
        }
      });
    }
  }

  public static class EverythingPassesWALEntryFilter implements WALEntryFilter {
    private static boolean passedEntry = false;

    @Override
    public Entry filter(Entry entry) {
      passedEntry = true;
      return entry;
    }

    public static boolean hasPassedAnEntry() {
      return passedEntry;
    }
  }

  public static class EverythingPassesWALEntryFilterSubclass
    extends EverythingPassesWALEntryFilter {
  }
}