2 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
3 * agreements. See the NOTICE file distributed with this work for additional information regarding
4 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in compliance with the License. You may obtain a
6 * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
7 * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
8 * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
9 * for the specific language governing permissions and limitations under the License.
11 package org
.apache
.hadoop
.hbase
.master
;
13 import static org
.junit
.Assert
.assertEquals
;
15 import java
.io
.IOException
;
16 import java
.util
.HashMap
;
17 import java
.util
.List
;
19 import org
.apache
.hadoop
.conf
.Configuration
;
20 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
21 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
22 import org
.apache
.hadoop
.hbase
.MiniHBaseCluster
;
23 import org
.apache
.hadoop
.hbase
.ServerName
;
24 import org
.apache
.hadoop
.hbase
.StartMiniClusterOption
;
25 import org
.apache
.hadoop
.hbase
.replication
.ReplicationLoadSource
;
26 import org
.apache
.hadoop
.hbase
.replication
.ReplicationPeerConfig
;
27 import org
.apache
.hadoop
.hbase
.testclassification
.MasterTests
;
28 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
29 import org
.apache
.hadoop
.hbase
.util
.Pair
;
30 import org
.apache
.zookeeper
.KeeperException
;
31 import org
.junit
.AfterClass
;
32 import org
.junit
.BeforeClass
;
33 import org
.junit
.ClassRule
;
34 import org
.junit
.Test
;
35 import org
.junit
.experimental
.categories
.Category
;
36 import org
.slf4j
.Logger
;
37 import org
.slf4j
.LoggerFactory
;
39 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ProtobufUtil
;
40 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClusterStatusProtos
;
41 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
;
43 @Category({ MasterTests
.class, MediumTests
.class })
44 public class TestGetReplicationLoad
{
46 public static final HBaseClassTestRule CLASS_RULE
=
47 HBaseClassTestRule
.forClass(TestGetReplicationLoad
.class);
49 private static final Logger LOG
= LoggerFactory
.getLogger(TestGetReplicationLoad
.class);
51 private static MiniHBaseCluster cluster
;
52 private static HMaster master
;
53 private static HBaseTestingUtility TEST_UTIL
;
/**
 * {@link HMaster} subclass declared inside this test class.
 * NOTE(review): the constructor and method bodies are not visible in this view of the file,
 * so the comments below describe signatures only.
 */
55 public static class MyMaster
extends HMaster
{
// Constructor: takes the Configuration and declares IOException, KeeperException and
// InterruptedException.
56 public MyMaster(Configuration conf
) throws IOException
, KeeperException
, InterruptedException
{
// NOTE(review): presumably overrides HMaster's tryRegionServerReport so that the test can
// inject its own region server report -- confirm against the full method body.
61 protected void tryRegionServerReport(long reportStartTime
, long reportEndTime
) {
67 public static void startCluster() throws Exception
{
68 LOG
.info("Starting cluster");
69 TEST_UTIL
= new HBaseTestingUtility();
70 // Set master class and use default values for other options.
71 StartMiniClusterOption option
= StartMiniClusterOption
.builder()
72 .masterClass(TestMasterMetrics
.MyMaster
.class).build();
73 TEST_UTIL
.startMiniCluster(option
);
74 cluster
= TEST_UTIL
.getHBaseCluster();
75 LOG
.info("Waiting for active/ready master");
76 cluster
.waitForActiveAndReadyMaster();
77 master
= cluster
.getMaster();
81 public static void after() throws Exception
{
82 if (TEST_UTIL
!= null) {
83 TEST_UTIL
.shutdownMiniCluster();
88 public void testGetReplicationMetrics() throws Exception
{
89 String peer1
= "test1", peer2
= "test2", queueId
="1";
90 long ageOfLastShippedOp
= 2,
92 timeStampOfLastShippedOp
= 4,
93 timeStampOfNextToReplicate
=5,
96 int sizeOfLogQueue
= 8;
97 boolean recovered
=false,
99 editsSinceRestart
=false;
100 RegionServerStatusProtos
.RegionServerReportRequest
.Builder request
=
101 RegionServerStatusProtos
.RegionServerReportRequest
.newBuilder();
102 ServerName serverName
= cluster
.getMaster(0).getServerName();
103 request
.setServer(ProtobufUtil
.toServerName(serverName
));
104 ClusterStatusProtos
.ReplicationLoadSource rload1
= ClusterStatusProtos
.ReplicationLoadSource
105 .newBuilder().setPeerID(peer1
)
106 .setAgeOfLastShippedOp(ageOfLastShippedOp
)
107 .setReplicationLag(replicationLag
)
108 .setTimeStampOfLastShippedOp(timeStampOfLastShippedOp
)
109 .setSizeOfLogQueue(sizeOfLogQueue
)
110 .setTimeStampOfNextToReplicate(timeStampOfNextToReplicate
)
112 .setEditsRead(editsRead
)
113 .setOPsShipped(oPsShipped
)
115 .setRecovered(recovered
)
116 .setEditsSinceRestart(editsSinceRestart
)
118 ClusterStatusProtos
.ReplicationLoadSource rload2
=
119 ClusterStatusProtos
.ReplicationLoadSource
122 .setAgeOfLastShippedOp(ageOfLastShippedOp
+ 1)
123 .setReplicationLag(replicationLag
+ 1)
124 .setTimeStampOfLastShippedOp(timeStampOfLastShippedOp
+ 1)
125 .setSizeOfLogQueue(sizeOfLogQueue
+ 1)
126 .setTimeStampOfNextToReplicate(timeStampOfNextToReplicate
+1)
128 .setEditsRead(editsRead
+1)
129 .setOPsShipped(oPsShipped
+1)
131 .setRecovered(recovered
)
132 .setEditsSinceRestart(editsSinceRestart
)
135 ClusterStatusProtos
.ServerLoad sl
= ClusterStatusProtos
.ServerLoad
.newBuilder()
136 .addReplLoadSource(rload1
).addReplLoadSource(rload2
).build();
138 master
.getReplicationPeerManager().addPeer(peer1
,
139 ReplicationPeerConfig
.newBuilder().setClusterKey("test").build(), true);
140 master
.getReplicationPeerManager().addPeer(peer2
,
141 ReplicationPeerConfig
.newBuilder().setClusterKey("test").build(), true);
142 master
.getMasterRpcServices().regionServerReport(null, request
.build());
143 HashMap
<String
, List
<Pair
<ServerName
, ReplicationLoadSource
>>> replicationLoad
=
144 master
.getReplicationLoad(new ServerName
[] { serverName
});
145 assertEquals("peer size ", 2, replicationLoad
.size());
146 assertEquals("load size ", 1, replicationLoad
.get(peer1
).size());
147 assertEquals("log queue size of peer1", sizeOfLogQueue
,
148 replicationLoad
.get(peer1
).get(0).getSecond().getSizeOfLogQueue());
149 assertEquals("replication lag of peer2", replicationLag
+ 1,
150 replicationLoad
.get(peer2
).get(0).getSecond().getReplicationLag());