HBASE-23723 Ensure MOB compaction works in optimized mode after snapshot clone (...
[hbase.git] / hbase-server / src / test / java / org / apache / hadoop / hbase / replication / TestVerifyCellsReplicationEndpoint.java
blobf1b1004076a4ce3a49afde1a176c921ccbc95187
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org.apache.hadoop.hbase.replication;
20 import static org.junit.Assert.assertArrayEquals;
22 import java.util.concurrent.BlockingQueue;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26 import org.apache.hadoop.hbase.Cell;
27 import org.apache.hadoop.hbase.CellUtil;
28 import org.apache.hadoop.hbase.HBaseClassTestRule;
29 import org.apache.hadoop.hbase.HBaseTestingUtility;
30 import org.apache.hadoop.hbase.TableName;
31 import org.apache.hadoop.hbase.client.Put;
32 import org.apache.hadoop.hbase.client.Table;
33 import org.apache.hadoop.hbase.testclassification.MediumTests;
34 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
35 import org.apache.hadoop.hbase.util.Bytes;
36 import org.apache.hadoop.hbase.wal.WAL;
37 import org.apache.hadoop.hbase.wal.WALEdit;
38 import org.junit.AfterClass;
39 import org.junit.BeforeClass;
40 import org.junit.ClassRule;
41 import org.junit.Test;
42 import org.junit.experimental.categories.Category;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
46 /**
47 * Confirm that the empty replication endpoint can work.
49 @Category({ ReplicationTests.class, MediumTests.class })
50 public class TestVerifyCellsReplicationEndpoint {
52 @ClassRule
53 public static final HBaseClassTestRule CLASS_RULE =
54 HBaseClassTestRule.forClass(TestVerifyCellsReplicationEndpoint.class);
56 private static final Logger LOG =
57 LoggerFactory.getLogger(TestVerifyCellsReplicationEndpoint.class);
59 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
61 private static final TableName TABLE_NAME = TableName.valueOf("empty");
63 private static final byte[] CF = Bytes.toBytes("family");
65 private static final byte[] CQ = Bytes.toBytes("qualifier");
67 private static final String PEER_ID = "empty";
69 private static final BlockingQueue<Cell> CELLS = new LinkedBlockingQueue<>();
71 public static final class EndpointForTest extends VerifyWALEntriesReplicationEndpoint {
73 @Override
74 public boolean replicate(ReplicateContext replicateContext) {
75 LOG.info(replicateContext.getEntries().toString());
76 replicateContext.entries.stream().map(WAL.Entry::getEdit).map(WALEdit::getCells)
77 .forEachOrdered(CELLS::addAll);
78 return super.replicate(replicateContext);
82 @BeforeClass
83 public static void setUp() throws Exception {
84 UTIL.startMiniCluster(3);
85 // notice that we do not need to set replication scope here, EmptyReplicationEndpoint take all
86 // edits no matter what the replications scope is.
87 UTIL.createTable(TABLE_NAME, CF);
88 UTIL.getAdmin().addReplicationPeer(PEER_ID,
89 ReplicationPeerConfig.newBuilder().setClusterKey("zk1:8888:/hbase")
90 .setReplicationEndpointImpl(EndpointForTest.class.getName()).build());
93 @AfterClass
94 public static void tearDown() throws Exception {
95 UTIL.shutdownMiniCluster();
98 @Test
99 public void test() throws Exception {
100 try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
101 for (int i = 0; i < 100; i++) {
102 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
105 long lastNoCellTime = -1;
106 for (int i = 0; i < 100;) {
107 Cell cell = CELLS.poll();
108 if (cell == null) {
109 if (lastNoCellTime < 0) {
110 lastNoCellTime = System.nanoTime();
111 } else {
112 if (System.nanoTime() - lastNoCellTime >= TimeUnit.SECONDS.toNanos(30)) {
113 throw new TimeoutException("Timeout waiting for wal edit");
116 Thread.sleep(1000);
117 continue;
119 lastNoCellTime = -1;
120 if (!Bytes.equals(CF, CellUtil.cloneFamily(cell))) {
121 // meta edits, such as open/close/flush, etc. skip
122 continue;
124 assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneRow(cell));
125 assertArrayEquals(CQ, CellUtil.cloneQualifier(cell));
126 assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(cell));
127 i++;