2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.replication
;
20 import static org
.junit
.Assert
.assertEquals
;
22 import java
.io
.IOException
;
23 import java
.util
.Collections
;
24 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
25 import org
.apache
.hadoop
.hbase
.HConstants
;
26 import org
.apache
.hadoop
.hbase
.TableName
;
27 import org
.apache
.hadoop
.hbase
.Waiter
.ExplainingPredicate
;
28 import org
.apache
.hadoop
.hbase
.client
.Put
;
29 import org
.apache
.hadoop
.hbase
.client
.RegionInfo
;
30 import org
.apache
.hadoop
.hbase
.client
.Table
;
31 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
32 import org
.apache
.hadoop
.hbase
.testclassification
.ReplicationTests
;
33 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
34 import org
.apache
.hadoop
.hbase
.util
.CommonFSUtils
.StreamLacksCapabilityException
;
35 import org
.junit
.Before
;
36 import org
.junit
.ClassRule
;
37 import org
.junit
.Test
;
38 import org
.junit
.experimental
.categories
.Category
;
40 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.collect
.ImmutableMap
;
43 * Testcase for HBASE-20296.
45 @Category({ ReplicationTests
.class, MediumTests
.class })
46 public class TestRemoveFromSerialReplicationPeer
extends SerialReplicationTestBase
{
49 public static final HBaseClassTestRule CLASS_RULE
=
50 HBaseClassTestRule
.forClass(TestRemoveFromSerialReplicationPeer
.class);
53 public void setUp() throws IOException
, StreamLacksCapabilityException
{
57 private void waitUntilHasLastPushedSequenceId(RegionInfo region
) throws Exception
{
58 ReplicationQueueStorage queueStorage
=
59 UTIL
.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
60 UTIL
.waitFor(30000, new ExplainingPredicate
<Exception
>() {
63 public boolean evaluate() throws Exception
{
64 return queueStorage
.getLastSequenceId(region
.getEncodedName(), PEER_ID
) > 0;
68 public String
explainFailure() throws Exception
{
69 return "Still no last pushed sequence id for " + region
;
75 public void testRemoveTable() throws Exception
{
76 TableName tableName
= createTable();
77 ReplicationPeerConfig peerConfig
= ReplicationPeerConfig
.newBuilder()
78 .setClusterKey("127.0.0.1:2181:/hbase")
79 .setReplicationEndpointImpl(LocalReplicationEndpoint
.class.getName())
80 .setReplicateAllUserTables(false)
81 .setTableCFsMap(ImmutableMap
.of(tableName
, Collections
.emptyList())).setSerial(true).build();
82 UTIL
.getAdmin().addReplicationPeer(PEER_ID
, peerConfig
, true);
83 try (Table table
= UTIL
.getConnection().getTable(tableName
)) {
84 for (int i
= 0; i
< 100; i
++) {
85 table
.put(new Put(Bytes
.toBytes(i
)).addColumn(CF
, CQ
, Bytes
.toBytes(i
)));
88 RegionInfo region
= UTIL
.getMiniHBaseCluster().getRegions(tableName
).get(0).getRegionInfo();
89 waitUntilHasLastPushedSequenceId(region
);
91 UTIL
.getAdmin().updateReplicationPeerConfig(PEER_ID
,
92 ReplicationPeerConfig
.newBuilder(peerConfig
).setTableCFsMap(Collections
.emptyMap()).build());
94 ReplicationQueueStorage queueStorage
=
95 UTIL
.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
96 assertEquals(HConstants
.NO_SEQNUM
,
97 queueStorage
.getLastSequenceId(region
.getEncodedName(), PEER_ID
));
101 public void testRemoveSerialFlag() throws Exception
{
102 TableName tableName
= createTable();
104 try (Table table
= UTIL
.getConnection().getTable(tableName
)) {
105 for (int i
= 0; i
< 100; i
++) {
106 table
.put(new Put(Bytes
.toBytes(i
)).addColumn(CF
, CQ
, Bytes
.toBytes(i
)));
109 RegionInfo region
= UTIL
.getMiniHBaseCluster().getRegions(tableName
).get(0).getRegionInfo();
110 waitUntilHasLastPushedSequenceId(region
);
111 UTIL
.getAdmin().updateReplicationPeerConfig(PEER_ID
, ReplicationPeerConfig
112 .newBuilder(UTIL
.getAdmin().getReplicationPeerConfig(PEER_ID
)).setSerial(false).build());
113 waitUntilReplicationDone(100);
115 ReplicationQueueStorage queueStorage
=
116 UTIL
.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
117 assertEquals(HConstants
.NO_SEQNUM
,
118 queueStorage
.getLastSequenceId(region
.getEncodedName(), PEER_ID
));