HBASE-23723 Ensure MOB compaction works in optimized mode after snapshot clone (...
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / replication / TestReplicationEndpoint.java
blob4588ace5900e143a260edb36d9681df19bca9c37
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.replication;
20 import static org.mockito.Mockito.doNothing;
21 import static org.mockito.Mockito.mock;
22 import static org.mockito.Mockito.spy;
23 import static org.mockito.Mockito.verify;
24 import static org.mockito.Mockito.when;
26 import java.io.IOException;
27 import java.util.ArrayList;
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.UUID;
32 import java.util.concurrent.Callable;
33 import java.util.concurrent.atomic.AtomicBoolean;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.concurrent.atomic.AtomicReference;
36 import org.apache.hadoop.hbase.Cell;
37 import org.apache.hadoop.hbase.HBaseClassTestRule;
38 import org.apache.hadoop.hbase.Waiter;
39 import org.apache.hadoop.hbase.client.Connection;
40 import org.apache.hadoop.hbase.client.ConnectionFactory;
41 import org.apache.hadoop.hbase.client.Put;
42 import org.apache.hadoop.hbase.client.RegionInfo;
43 import org.apache.hadoop.hbase.client.Table;
44 import org.apache.hadoop.hbase.regionserver.HRegion;
45 import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
46 import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
47 import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
48 import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
49 import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
50 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
51 import org.apache.hadoop.hbase.testclassification.MediumTests;
52 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
53 import org.apache.hadoop.hbase.util.Bytes;
54 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
55 import org.apache.hadoop.hbase.util.Threads;
56 import org.apache.hadoop.hbase.wal.WAL.Entry;
57 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
58 import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
59 import org.junit.AfterClass;
60 import org.junit.Assert;
61 import org.junit.Before;
62 import org.junit.BeforeClass;
63 import org.junit.ClassRule;
64 import org.junit.Test;
65 import org.junit.experimental.categories.Category;
66 import org.slf4j.Logger;
67 import org.slf4j.LoggerFactory;
/**
 * Tests ReplicationSource and ReplicationEndpoint interactions.
 */
72 @Category({ ReplicationTests.class, MediumTests.class })
73 public class TestReplicationEndpoint extends TestReplicationBase {
75 @ClassRule
76 public static final HBaseClassTestRule CLASS_RULE =
77 HBaseClassTestRule.forClass(TestReplicationEndpoint.class);
79 private static final Logger LOG = LoggerFactory.getLogger(TestReplicationEndpoint.class);
81 static int numRegionServers;
83 @BeforeClass
84 public static void setUpBeforeClass() throws Exception {
85 TestReplicationBase.setUpBeforeClass();
86 numRegionServers = UTIL1.getHBaseCluster().getRegionServerThreads().size();
89 @AfterClass
90 public static void tearDownAfterClass() throws Exception {
91 TestReplicationBase.tearDownAfterClass();
92 // check stop is called
93 Assert.assertTrue(ReplicationEndpointForTest.stoppedCount.get() > 0);
96 @Before
97 public void setup() throws Exception {
98 ReplicationEndpointForTest.contructedCount.set(0);
99 ReplicationEndpointForTest.startedCount.set(0);
100 ReplicationEndpointForTest.replicateCount.set(0);
101 ReplicationEndpointReturningFalse.replicated.set(false);
102 ReplicationEndpointForTest.lastEntries = null;
103 final List<RegionServerThread> rsThreads =
104 UTIL1.getMiniHBaseCluster().getRegionServerThreads();
105 for (RegionServerThread rs : rsThreads) {
106 UTIL1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName());
108 // Wait for all log roll to finish
109 UTIL1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
110 @Override
111 public boolean evaluate() throws Exception {
112 for (RegionServerThread rs : rsThreads) {
113 if (!rs.getRegionServer().walRollRequestFinished()) {
114 return false;
117 return true;
120 @Override
121 public String explainFailure() throws Exception {
122 List<String> logRollInProgressRsList = new ArrayList<>();
123 for (RegionServerThread rs : rsThreads) {
124 if (!rs.getRegionServer().walRollRequestFinished()) {
125 logRollInProgressRsList.add(rs.getRegionServer().toString());
128 return "Still waiting for log roll on regionservers: " + logRollInProgressRsList;
133 @Test
134 public void testCustomReplicationEndpoint() throws Exception {
135 // test installing a custom replication endpoint other than the default one.
136 hbaseAdmin.addReplicationPeer("testCustomReplicationEndpoint",
137 new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
138 .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()));
140 // check whether the class has been constructed and started
141 Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
142 @Override
143 public boolean evaluate() throws Exception {
144 return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
148 Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
149 @Override
150 public boolean evaluate() throws Exception {
151 return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
155 Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
157 // now replicate some data.
158 doPut(Bytes.toBytes("row42"));
160 Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
161 @Override
162 public boolean evaluate() throws Exception {
163 return ReplicationEndpointForTest.replicateCount.get() >= 1;
167 doAssert(Bytes.toBytes("row42"));
169 hbaseAdmin.removeReplicationPeer("testCustomReplicationEndpoint");
172 @Test
173 public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
174 Assert.assertEquals(0, ReplicationEndpointForTest.replicateCount.get());
175 Assert.assertTrue(!ReplicationEndpointReturningFalse.replicated.get());
176 int peerCount = hbaseAdmin.listReplicationPeers().size();
177 final String id = "testReplicationEndpointReturnsFalseOnReplicate";
178 hbaseAdmin.addReplicationPeer(id,
179 new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
180 .setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()));
181 // This test is flakey and then there is so much stuff flying around in here its, hard to
182 // debug. Peer needs to be up for the edit to make it across. This wait on
183 // peer count seems to be a hack that has us not progress till peer is up.
184 if (hbaseAdmin.listReplicationPeers().size() <= peerCount) {
185 LOG.info("Waiting on peercount to go up from " + peerCount);
186 Threads.sleep(100);
188 // now replicate some data
189 doPut(row);
191 Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
192 @Override
193 public boolean evaluate() throws Exception {
194 // Looks like replication endpoint returns false unless we put more than 10 edits. We
195 // only send over one edit.
196 int count = ReplicationEndpointForTest.replicateCount.get();
197 LOG.info("count=" + count);
198 return ReplicationEndpointReturningFalse.replicated.get();
201 if (ReplicationEndpointReturningFalse.ex.get() != null) {
202 throw ReplicationEndpointReturningFalse.ex.get();
205 hbaseAdmin.removeReplicationPeer("testReplicationEndpointReturnsFalseOnReplicate");
208 @Test
209 public void testInterClusterReplication() throws Exception {
210 final String id = "testInterClusterReplication";
212 List<HRegion> regions = UTIL1.getHBaseCluster().getRegions(tableName);
213 int totEdits = 0;
215 // Make sure edits are spread across regions because we do region based batching
216 // before shipping edits.
217 for(HRegion region: regions) {
218 RegionInfo hri = region.getRegionInfo();
219 byte[] row = hri.getStartKey();
220 for (int i = 0; i < 100; i++) {
221 if (row.length > 0) {
222 Put put = new Put(row);
223 put.addColumn(famName, row, row);
224 region.put(put);
225 totEdits++;
230 hbaseAdmin.addReplicationPeer(id,
231 new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF2))
232 .setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()));
234 final int numEdits = totEdits;
235 Waiter.waitFor(CONF1, 30000, new Waiter.ExplainingPredicate<Exception>() {
236 @Override
237 public boolean evaluate() throws Exception {
238 return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
241 @Override
242 public String explainFailure() throws Exception {
243 String failure = "Failed to replicate all edits, expected = " + numEdits
244 + " replicated = " + InterClusterReplicationEndpointForTest.replicateCount.get();
245 return failure;
249 hbaseAdmin.removeReplicationPeer("testInterClusterReplication");
250 UTIL1.deleteTableData(tableName);
253 @Test
254 public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
255 ReplicationPeerConfig rpc =
256 new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
257 .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
258 // test that we can create mutliple WALFilters reflectively
259 rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
260 EverythingPassesWALEntryFilter.class.getName() + "," +
261 EverythingPassesWALEntryFilterSubclass.class.getName());
262 hbaseAdmin.addReplicationPeer("testWALEntryFilterFromReplicationEndpoint", rpc);
263 // now replicate some data.
264 try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
265 doPut(connection, Bytes.toBytes("row1"));
266 doPut(connection, row);
267 doPut(connection, Bytes.toBytes("row2"));
270 Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
271 @Override
272 public boolean evaluate() throws Exception {
273 return ReplicationEndpointForTest.replicateCount.get() >= 1;
277 Assert.assertNull(ReplicationEndpointWithWALEntryFilter.ex.get());
278 //make sure our reflectively created filter is in the filter chain
279 Assert.assertTrue(EverythingPassesWALEntryFilter.hasPassedAnEntry());
280 hbaseAdmin.removeReplicationPeer("testWALEntryFilterFromReplicationEndpoint");
283 @Test(expected = IOException.class)
284 public void testWALEntryFilterAddValidation() throws Exception {
285 ReplicationPeerConfig rpc =
286 new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
287 .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
288 // test that we can create mutliple WALFilters reflectively
289 rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
290 "IAmNotARealWalEntryFilter");
291 hbaseAdmin.addReplicationPeer("testWALEntryFilterAddValidation", rpc);
294 @Test(expected = IOException.class)
295 public void testWALEntryFilterUpdateValidation() throws Exception {
296 ReplicationPeerConfig rpc =
297 new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
298 .setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
299 // test that we can create mutliple WALFilters reflectively
300 rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
301 "IAmNotARealWalEntryFilter");
302 hbaseAdmin.updateReplicationPeerConfig("testWALEntryFilterUpdateValidation", rpc);
305 @Test
306 public void testMetricsSourceBaseSourcePassthrough() {
308 * The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl and a
309 * MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces. Both of
310 * those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which allows
311 * for custom JMX metrics. This test checks to make sure the BaseSource decorator logic on
312 * MetricsSource actually calls down through the two layers of wrapping to the actual
313 * BaseSource.
315 String id = "id";
316 DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
317 MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
318 when(singleRms.getMetricsRegistry()).thenReturn(mockRegistry);
319 MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
320 when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
322 MetricsReplicationSourceSource singleSourceSource =
323 new MetricsReplicationSourceSourceImpl(singleRms, id);
324 MetricsReplicationSourceSource globalSourceSource =
325 new MetricsReplicationGlobalSourceSource(globalRms);
326 MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
327 doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
329 Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
330 MetricsSource source =
331 new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable);
334 String gaugeName = "gauge";
335 String singleGaugeName = "source.id." + gaugeName;
336 String globalGaugeName = "source." + gaugeName;
337 long delta = 1;
338 String counterName = "counter";
339 String singleCounterName = "source.id." + counterName;
340 String globalCounterName = "source." + counterName;
341 long count = 2;
342 source.decGauge(gaugeName, delta);
343 source.getMetricsContext();
344 source.getMetricsDescription();
345 source.getMetricsJmxContext();
346 source.getMetricsName();
347 source.incCounters(counterName, count);
348 source.incGauge(gaugeName, delta);
349 source.init();
350 source.removeMetric(gaugeName);
351 source.setGauge(gaugeName, delta);
352 source.updateHistogram(counterName, count);
353 source.incrFailedRecoveryQueue();
356 verify(singleRms).decGauge(singleGaugeName, delta);
357 verify(globalRms).decGauge(globalGaugeName, delta);
358 verify(globalRms).getMetricsContext();
359 verify(globalRms).getMetricsJmxContext();
360 verify(globalRms).getMetricsName();
361 verify(singleRms).incCounters(singleCounterName, count);
362 verify(globalRms).incCounters(globalCounterName, count);
363 verify(singleRms).incGauge(singleGaugeName, delta);
364 verify(globalRms).incGauge(globalGaugeName, delta);
365 verify(globalRms).init();
366 verify(singleRms).removeMetric(singleGaugeName);
367 verify(globalRms).removeMetric(globalGaugeName);
368 verify(singleRms).setGauge(singleGaugeName, delta);
369 verify(globalRms).setGauge(globalGaugeName, delta);
370 verify(singleRms).updateHistogram(singleCounterName, count);
371 verify(globalRms).updateHistogram(globalCounterName, count);
372 verify(spyglobalSourceSource).incrFailedRecoveryQueue();
374 //check singleSourceSourceByTable metrics.
375 // singleSourceSourceByTable map entry will be created only
376 // after calling #setAgeOfLastShippedOpByTable
377 boolean containsRandomNewTable = source.getSingleSourceSourceByTable()
378 .containsKey("RandomNewTable");
379 Assert.assertEquals(false, containsRandomNewTable);
380 source.setAgeOfLastShippedOpByTable(123L, "RandomNewTable");
381 containsRandomNewTable = source.getSingleSourceSourceByTable()
382 .containsKey("RandomNewTable");
383 Assert.assertEquals(true, containsRandomNewTable);
384 MetricsReplicationSourceSource msr = source.getSingleSourceSourceByTable()
385 .get("RandomNewTable");
386 // cannot put more concreate value here to verify because the age is arbitrary.
387 // as long as it's greater than 0, we see it as correct answer.
388 Assert.assertTrue(msr.getLastShippedAge() > 0);
392 private void doPut(byte[] row) throws IOException {
393 try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
394 doPut(connection, row);
398 private void doPut(final Connection connection, final byte [] row) throws IOException {
399 try (Table t = connection.getTable(tableName)) {
400 Put put = new Put(row);
401 put.addColumn(famName, row, row);
402 t.put(put);
406 private static void doAssert(byte[] row) throws Exception {
407 if (ReplicationEndpointForTest.lastEntries == null) {
408 return; // first call
410 Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
411 List<Cell> cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells();
412 Assert.assertEquals(1, cells.size());
413 Assert.assertTrue(Bytes.equals(cells.get(0).getRowArray(), cells.get(0).getRowOffset(),
414 cells.get(0).getRowLength(), row, 0, row.length));
417 public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
418 static UUID uuid = UTIL1.getRandomUUID();
419 static AtomicInteger contructedCount = new AtomicInteger();
420 static AtomicInteger startedCount = new AtomicInteger();
421 static AtomicInteger stoppedCount = new AtomicInteger();
422 static AtomicInteger replicateCount = new AtomicInteger();
423 static volatile List<Entry> lastEntries = null;
425 public ReplicationEndpointForTest() {
426 replicateCount.set(0);
427 contructedCount.incrementAndGet();
430 @Override
431 public UUID getPeerUUID() {
432 return uuid;
435 @Override
436 public boolean replicate(ReplicateContext replicateContext) {
437 replicateCount.incrementAndGet();
438 lastEntries = new ArrayList<>(replicateContext.entries);
439 return true;
442 @Override
443 public void start() {
444 startAsync();
447 @Override
448 public void stop() {
449 stopAsync();
452 @Override
453 protected void doStart() {
454 startedCount.incrementAndGet();
455 notifyStarted();
458 @Override
459 protected void doStop() {
460 stoppedCount.incrementAndGet();
461 notifyStopped();
465 public static class InterClusterReplicationEndpointForTest
466 extends HBaseInterClusterReplicationEndpoint {
468 static AtomicInteger replicateCount = new AtomicInteger();
469 static boolean failedOnce;
471 public InterClusterReplicationEndpointForTest() {
472 replicateCount.set(0);
475 @Override
476 public boolean replicate(ReplicateContext replicateContext) {
477 boolean success = super.replicate(replicateContext);
478 if (success) {
479 replicateCount.addAndGet(replicateContext.entries.size());
481 return success;
484 @Override
485 protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
486 // Fail only once, we don't want to slow down the test.
487 if (failedOnce) {
488 return () -> ordinal;
489 } else {
490 failedOnce = true;
491 return () -> {
492 throw new IOException("Sample Exception: Failed to replicate.");
498 public static class ReplicationEndpointReturningFalse extends ReplicationEndpointForTest {
499 static int COUNT = 10;
500 static AtomicReference<Exception> ex = new AtomicReference<>(null);
501 static AtomicBoolean replicated = new AtomicBoolean(false);
502 @Override
503 public boolean replicate(ReplicateContext replicateContext) {
504 try {
505 // check row
506 doAssert(row);
507 } catch (Exception e) {
508 ex.set(e);
511 super.replicate(replicateContext);
512 LOG.info("Replicated " + Bytes.toString(row) + ", count=" + replicateCount.get());
514 replicated.set(replicateCount.get() > COUNT); // first 10 times, we return false
515 return replicated.get();
519 // return a WALEntry filter which only accepts "row", but not other rows
520 public static class ReplicationEndpointWithWALEntryFilter extends ReplicationEndpointForTest {
521 static AtomicReference<Exception> ex = new AtomicReference<>(null);
523 @Override
524 public boolean replicate(ReplicateContext replicateContext) {
525 try {
526 super.replicate(replicateContext);
527 doAssert(row);
528 } catch (Exception e) {
529 ex.set(e);
531 return true;
534 @Override
535 public WALEntryFilter getWALEntryfilter() {
536 return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
537 @Override
538 public Entry filter(Entry entry) {
539 ArrayList<Cell> cells = entry.getEdit().getCells();
540 int size = cells.size();
541 for (int i = size-1; i >= 0; i--) {
542 Cell cell = cells.get(i);
543 if (!Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
544 row, 0, row.length)) {
545 cells.remove(i);
548 return entry;
554 public static class EverythingPassesWALEntryFilter implements WALEntryFilter {
555 private static boolean passedEntry = false;
556 @Override
557 public Entry filter(Entry entry) {
558 passedEntry = true;
559 return entry;
562 public static boolean hasPassedAnEntry(){
563 return passedEntry;
567 public static class EverythingPassesWALEntryFilterSubclass
568 extends EverythingPassesWALEntryFilter {