/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.replication
;
20 import java
.io
.IOException
;
21 import java
.util
.Collections
;
22 import java
.util
.List
;
23 import org
.apache
.hadoop
.conf
.Configuration
;
24 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
25 import org
.apache
.hadoop
.hbase
.HConstants
;
26 import org
.apache
.hadoop
.hbase
.ServerName
;
27 import org
.apache
.hadoop
.hbase
.TableName
;
28 import org
.apache
.hadoop
.hbase
.client
.Table
;
29 import org
.apache
.hadoop
.hbase
.master
.HMaster
;
30 import org
.apache
.hadoop
.hbase
.master
.MasterServices
;
31 import org
.apache
.hadoop
.hbase
.master
.ServerManager
;
32 import org
.apache
.hadoop
.hbase
.master
.procedure
.ServerCrashProcedure
;
33 import org
.apache
.hadoop
.hbase
.master
.replication
.ClaimReplicationQueuesProcedure
;
34 import org
.apache
.hadoop
.hbase
.procedure2
.Procedure
;
35 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
36 import org
.apache
.hadoop
.hbase
.testclassification
.ReplicationTests
;
37 import org
.junit
.AfterClass
;
38 import org
.junit
.BeforeClass
;
39 import org
.junit
.ClassRule
;
40 import org
.junit
.Test
;
41 import org
.junit
.experimental
.categories
.Category
;
43 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.io
.Closeables
;
45 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ProcedureProtos
.ProcedureState
;
48 * In HBASE-26029, we reimplement the claim queue operation with proc-v2 and make it a step in SCP,
49 * this is a UT to make sure the {@link ClaimReplicationQueuesProcedure} works correctly.
51 @Category({ ReplicationTests
.class, LargeTests
.class })
52 public class TestClaimReplicationQueue
extends TestReplicationBase
{
55 public static final HBaseClassTestRule CLASS_RULE
=
56 HBaseClassTestRule
.forClass(TestClaimReplicationQueue
.class);
58 private static final TableName tableName3
= TableName
.valueOf("test3");
60 private static final String PEER_ID3
= "3";
62 private static Table table3
;
64 private static Table table4
;
66 private static volatile boolean EMPTY
= false;
68 public static final class ServerManagerForTest
extends ServerManager
{
70 public ServerManagerForTest(MasterServices master
) {
75 public List
<ServerName
> getOnlineServersList() {
76 // return no region server to make the procedure hang
78 for (StackTraceElement e
: Thread
.currentThread().getStackTrace()) {
79 if (e
.getClassName().equals(ClaimReplicationQueuesProcedure
.class.getName())) {
80 return Collections
.emptyList();
84 return super.getOnlineServersList();
88 public static final class HMasterForTest
extends HMaster
{
90 public HMasterForTest(Configuration conf
) throws IOException
{
95 protected ServerManager
createServerManager(MasterServices master
) throws IOException
{
96 setupClusterConnection();
97 return new ServerManagerForTest(master
);
102 public static void setUpBeforeClass() throws Exception
{
103 CONF1
.setClass(HConstants
.MASTER_IMPL
, HMasterForTest
.class, HMaster
.class);
104 TestReplicationBase
.setUpBeforeClass();
105 createTable(tableName3
);
106 table3
= connection1
.getTable(tableName3
);
107 table4
= connection2
.getTable(tableName3
);
111 public static void tearDownAfterClass() throws Exception
{
112 Closeables
.close(table3
, true);
113 Closeables
.close(table4
, true);
114 TestReplicationBase
.tearDownAfterClass();
118 public void setUpBase() throws Exception
{
120 // set up two replication peers and only 1 rs to test claim replication queue with multiple
122 addPeer(PEER_ID3
, tableName3
);
126 public void tearDownBase() throws Exception
{
127 super.tearDownBase();
128 removePeer(PEER_ID3
);
132 public void testClaim() throws Exception
{
134 hbaseAdmin
.disableReplicationPeer(PEER_ID2
);
135 hbaseAdmin
.disableReplicationPeer(PEER_ID3
);
138 int count1
= UTIL1
.loadTable(htable1
, famName
);
139 int count2
= UTIL1
.loadTable(table3
, famName
);
142 UTIL1
.getMiniHBaseCluster().stopRegionServer(0).join();
143 UTIL1
.getMiniHBaseCluster().startRegionServer();
145 // since there is no active region server to get the replication queue, the procedure should be
146 // in WAITING_TIMEOUT state for most time to retry
147 HMaster master
= UTIL1
.getMiniHBaseCluster().getMaster();
149 () -> master
.getProcedures().stream()
150 .filter(p
-> p
instanceof ClaimReplicationQueuesProcedure
)
151 .anyMatch(p
-> p
.getState() == ProcedureState
.WAITING_TIMEOUT
));
153 hbaseAdmin
.enableReplicationPeer(PEER_ID2
);
154 hbaseAdmin
.enableReplicationPeer(PEER_ID3
);
157 // wait until the SCP finished, ClaimReplicationQueuesProcedure is a sub procedure of SCP
158 UTIL1
.waitFor(30000, () -> master
.getProcedures().stream()
159 .filter(p
-> p
instanceof ServerCrashProcedure
).allMatch(Procedure
::isSuccess
));
161 // we should get all the data in the target cluster
162 waitForReplication(htable2
, count1
, NB_RETRIES
);
163 waitForReplication(table4
, count2
, NB_RETRIES
);