2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.replication
;
20 import static org
.junit
.Assert
.assertArrayEquals
;
22 import java
.util
.concurrent
.BlockingQueue
;
23 import java
.util
.concurrent
.LinkedBlockingQueue
;
24 import java
.util
.concurrent
.TimeUnit
;
25 import java
.util
.concurrent
.TimeoutException
;
26 import org
.apache
.hadoop
.hbase
.Cell
;
27 import org
.apache
.hadoop
.hbase
.CellUtil
;
28 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
29 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
30 import org
.apache
.hadoop
.hbase
.TableName
;
31 import org
.apache
.hadoop
.hbase
.client
.Put
;
32 import org
.apache
.hadoop
.hbase
.client
.Table
;
33 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
34 import org
.apache
.hadoop
.hbase
.testclassification
.ReplicationTests
;
35 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
36 import org
.apache
.hadoop
.hbase
.wal
.WAL
;
37 import org
.apache
.hadoop
.hbase
.wal
.WALEdit
;
38 import org
.junit
.AfterClass
;
39 import org
.junit
.BeforeClass
;
40 import org
.junit
.ClassRule
;
41 import org
.junit
.Test
;
42 import org
.junit
.experimental
.categories
.Category
;
43 import org
.slf4j
.Logger
;
44 import org
.slf4j
.LoggerFactory
;
47 * Confirm that the empty replication endpoint can work.
49 @Category({ ReplicationTests
.class, MediumTests
.class })
50 public class TestVerifyCellsReplicationEndpoint
{
53 public static final HBaseClassTestRule CLASS_RULE
=
54 HBaseClassTestRule
.forClass(TestVerifyCellsReplicationEndpoint
.class);
56 private static final Logger LOG
=
57 LoggerFactory
.getLogger(TestVerifyCellsReplicationEndpoint
.class);
59 private static final HBaseTestingUtility UTIL
= new HBaseTestingUtility();
61 private static final TableName TABLE_NAME
= TableName
.valueOf("empty");
63 private static final byte[] CF
= Bytes
.toBytes("family");
65 private static final byte[] CQ
= Bytes
.toBytes("qualifier");
67 private static final String PEER_ID
= "empty";
69 private static final BlockingQueue
<Cell
> CELLS
= new LinkedBlockingQueue
<>();
71 public static final class EndpointForTest
extends VerifyWALEntriesReplicationEndpoint
{
74 public boolean replicate(ReplicateContext replicateContext
) {
75 LOG
.info(replicateContext
.getEntries().toString());
76 replicateContext
.entries
.stream().map(WAL
.Entry
::getEdit
).map(WALEdit
::getCells
)
77 .forEachOrdered(CELLS
::addAll
);
78 return super.replicate(replicateContext
);
83 public static void setUp() throws Exception
{
84 UTIL
.startMiniCluster(3);
85 // notice that we do not need to set replication scope here, EmptyReplicationEndpoint take all
86 // edits no matter what the replications scope is.
87 UTIL
.createTable(TABLE_NAME
, CF
);
88 UTIL
.getAdmin().addReplicationPeer(PEER_ID
,
89 ReplicationPeerConfig
.newBuilder().setClusterKey("zk1:8888:/hbase")
90 .setReplicationEndpointImpl(EndpointForTest
.class.getName()).build());
94 public static void tearDown() throws Exception
{
95 UTIL
.shutdownMiniCluster();
99 public void test() throws Exception
{
100 try (Table table
= UTIL
.getConnection().getTable(TABLE_NAME
)) {
101 for (int i
= 0; i
< 100; i
++) {
102 table
.put(new Put(Bytes
.toBytes(i
)).addColumn(CF
, CQ
, Bytes
.toBytes(i
)));
105 long lastNoCellTime
= -1;
106 for (int i
= 0; i
< 100;) {
107 Cell cell
= CELLS
.poll();
109 if (lastNoCellTime
< 0) {
110 lastNoCellTime
= System
.nanoTime();
112 if (System
.nanoTime() - lastNoCellTime
>= TimeUnit
.SECONDS
.toNanos(30)) {
113 throw new TimeoutException("Timeout waiting for wal edit");
120 if (!Bytes
.equals(CF
, CellUtil
.cloneFamily(cell
))) {
121 // meta edits, such as open/close/flush, etc. skip
124 assertArrayEquals(Bytes
.toBytes(i
), CellUtil
.cloneRow(cell
));
125 assertArrayEquals(CQ
, CellUtil
.cloneQualifier(cell
));
126 assertArrayEquals(Bytes
.toBytes(i
), CellUtil
.cloneValue(cell
));