2 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
3 * agreements. See the NOTICE file distributed with this work for additional information regarding
4 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
5 * "License"); you may not use this file except in compliance with the License. You may obtain a
6 * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
7 * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
8 * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
9 * for the specific language governing permissions and limitations under the License.
11 package org
.apache
.hadoop
.hbase
.master
;
13 import static org
.junit
.Assert
.assertEquals
;
15 import java
.io
.IOException
;
16 import java
.util
.HashMap
;
17 import java
.util
.List
;
19 import org
.apache
.hadoop
.conf
.Configuration
;
20 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
21 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
22 import org
.apache
.hadoop
.hbase
.ServerName
;
23 import org
.apache
.hadoop
.hbase
.SingleProcessHBaseCluster
;
24 import org
.apache
.hadoop
.hbase
.StartTestingClusterOption
;
25 import org
.apache
.hadoop
.hbase
.replication
.ReplicationLoadSource
;
26 import org
.apache
.hadoop
.hbase
.replication
.ReplicationPeerConfig
;
27 import org
.apache
.hadoop
.hbase
.testclassification
.MasterTests
;
28 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
29 import org
.apache
.hadoop
.hbase
.util
.Pair
;
30 import org
.apache
.zookeeper
.KeeperException
;
31 import org
.junit
.AfterClass
;
32 import org
.junit
.BeforeClass
;
33 import org
.junit
.ClassRule
;
34 import org
.junit
.Test
;
35 import org
.junit
.experimental
.categories
.Category
;
36 import org
.slf4j
.Logger
;
37 import org
.slf4j
.LoggerFactory
;
39 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.ProtobufUtil
;
40 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.ClusterStatusProtos
;
41 import org
.apache
.hadoop
.hbase
.shaded
.protobuf
.generated
.RegionServerStatusProtos
;
/**
 * Master-side test (categories {@code MasterTests} and {@code MediumTests}) that starts a mini
 * cluster through {@code HBaseTestingUtil} and checks what {@code HMaster.getReplicationLoad}
 * returns after a region server report has been delivered to the master.
 */
43 @Category({ MasterTests
.class, MediumTests
.class })
44 public class TestGetReplicationLoad
{
// Class rule built for this test class via HBaseClassTestRule.forClass.
// NOTE(review): the @ClassRule annotation is imported but not visible in this view - confirm it
// is still present on this field.
46 public static final HBaseClassTestRule CLASS_RULE
=
47 HBaseClassTestRule
.forClass(TestGetReplicationLoad
.class);
// SLF4J logger for this test class.
49 private static final Logger LOG
= LoggerFactory
.getLogger(TestGetReplicationLoad
.class);
// Mini cluster handle; assigned in startCluster() from TEST_UTIL.getHBaseCluster().
51 private static SingleProcessHBaseCluster cluster
;
// Master handle; assigned in startCluster() from cluster.getMaster().
52 private static HMaster master
;
// Testing utility that owns the mini cluster; created in startCluster(), shut down in after().
53 private static HBaseTestingUtil TEST_UTIL
;
/**
 * {@code HMaster} subclass declared by this test.
 * NOTE(review): startCluster() passes {@code TestMasterMetrics.MyMaster.class} as the master
 * class, not this nested class - confirm whether this class is actually used anywhere.
 */
55 public static class MyMaster
extends HMaster
{
// Constructor taking the cluster Configuration; its body is not visible in this view -
// presumably it only delegates to the HMaster constructor, TODO confirm.
56 public MyMaster(Configuration conf
) throws IOException
, KeeperException
, InterruptedException
{
/**
 * Starts the mini cluster used by this test class and caches the cluster and master handles in
 * the static fields {@code cluster} and {@code master}.
 * NOTE(review): the JUnit lifecycle annotation for this method is not visible in this view -
 * presumably {@code @BeforeClass}; confirm.
 *
 * @throws Exception if the mini cluster cannot be started or the master does not become ready
 */
62 public static void startCluster() throws Exception
{
63 LOG
.info("Starting cluster");
64 TEST_UTIL
= new HBaseTestingUtil();
65 // Set master class and use default values for other options.
// NOTE(review): the master class comes from TestMasterMetrics, not from the MyMaster nested in
// this file - confirm that this cross-test dependency is intended.
66 StartTestingClusterOption option
= StartTestingClusterOption
.builder()
67 .masterClass(TestMasterMetrics
.MyMaster
.class).build();
68 TEST_UTIL
.startMiniCluster(option
);
69 cluster
= TEST_UTIL
.getHBaseCluster();
// Block until a master is active and ready before caching the master handle.
70 LOG
.info("Waiting for active/ready master");
71 cluster
.waitForActiveAndReadyMaster();
72 master
= cluster
.getMaster();
/**
 * Shuts the mini cluster down. The null guard keeps this safe when {@code TEST_UTIL} was never
 * assigned (it is only set inside startCluster()).
 * NOTE(review): the JUnit lifecycle annotation for this method is not visible in this view -
 * presumably {@code @AfterClass}; confirm.
 *
 * @throws Exception if shutting down the mini cluster fails
 */
76 public static void after() throws Exception
{
77 if (TEST_UTIL
!= null) {
78 TEST_UTIL
.shutdownMiniCluster();
/**
 * Builds two {@code ReplicationLoadSource} protobuf messages (the second with every numeric
 * metric one greater than the first), registers peers {@code test1} and {@code test2} with the
 * master's replication peer manager, hands a region server report to the master's RPC services,
 * and then asserts on the map returned by {@code master.getReplicationLoad}: two peers, one entry
 * for peer1, peer1's log queue size, and peer2's replication lag.
 * NOTE(review): several statements this test depends on are not visible in this view (see the
 * inline notes below) - confirm against the full file before changing anything here.
 *
 * @throws Exception if peer registration, the report call, or the load lookup fails
 */
83 public void testGetReplicationMetrics() throws Exception
{
// Peer ids and metric values used to populate the load-source messages.
// NOTE(review): replicationLag, editsRead and oPsShipped are used below but their declarations
// are not visible here; queueId is declared but has no visible use - confirm.
84 String peer1
= "test1", peer2
= "test2", queueId
="1";
85 long ageOfLastShippedOp
= 2,
87 timeStampOfLastShippedOp
= 4,
88 timeStampOfNextToReplicate
=5,
91 int sizeOfLogQueue
= 8;
92 boolean recovered
=false,
94 editsSinceRestart
=false;
// Report request addressed from the ServerName of the master at index 0 of the cluster.
95 RegionServerStatusProtos
.RegionServerReportRequest
.Builder request
=
96 RegionServerStatusProtos
.RegionServerReportRequest
.newBuilder();
97 ServerName serverName
= cluster
.getMaster(0).getServerName();
98 request
.setServer(ProtobufUtil
.toServerName(serverName
));
// Load source for peer1, carrying the base metric values.
99 ClusterStatusProtos
.ReplicationLoadSource rload1
= ClusterStatusProtos
.ReplicationLoadSource
100 .newBuilder().setPeerID(peer1
)
101 .setAgeOfLastShippedOp(ageOfLastShippedOp
)
102 .setReplicationLag(replicationLag
)
103 .setTimeStampOfLastShippedOp(timeStampOfLastShippedOp
)
104 .setSizeOfLogQueue(sizeOfLogQueue
)
105 .setTimeStampOfNextToReplicate(timeStampOfNextToReplicate
)
107 .setEditsRead(editsRead
)
108 .setOPsShipped(oPsShipped
)
110 .setRecovered(recovered
)
111 .setEditsSinceRestart(editsSinceRestart
)
// Second load source: every numeric metric is one greater than in rload1, so the two peers can
// be told apart in the assertions.
// NOTE(review): the newBuilder()/setPeerID call for rload2 is not visible in this view -
// presumably it uses peer2; confirm.
113 ClusterStatusProtos
.ReplicationLoadSource rload2
=
114 ClusterStatusProtos
.ReplicationLoadSource
117 .setAgeOfLastShippedOp(ageOfLastShippedOp
+ 1)
118 .setReplicationLag(replicationLag
+ 1)
119 .setTimeStampOfLastShippedOp(timeStampOfLastShippedOp
+ 1)
120 .setSizeOfLogQueue(sizeOfLogQueue
+ 1)
121 .setTimeStampOfNextToReplicate(timeStampOfNextToReplicate
+1)
123 .setEditsRead(editsRead
+1)
124 .setOPsShipped(oPsShipped
+1)
126 .setRecovered(recovered
)
127 .setEditsSinceRestart(editsSinceRestart
)
// Wrap both load sources in a single ServerLoad message.
// NOTE(review): no visible statement attaches `sl` to `request` - confirm the load is set on
// the request before the report is sent.
130 ClusterStatusProtos
.ServerLoad sl
= ClusterStatusProtos
.ServerLoad
.newBuilder()
131 .addReplLoadSource(rload1
).addReplLoadSource(rload2
).build();
// Register both peers with the master, each with cluster key "test"; the trailing `true` is
// presumably the enabled flag - TODO confirm against ReplicationPeerManager.addPeer.
133 master
.getReplicationPeerManager().addPeer(peer1
,
134 ReplicationPeerConfig
.newBuilder().setClusterKey("test").build(), true);
135 master
.getReplicationPeerManager().addPeer(peer2
,
136 ReplicationPeerConfig
.newBuilder().setClusterKey("test").build(), true);
// Hand the report straight to the master's RPC services; the first argument is passed as null.
137 master
.getMasterRpcServices().regionServerReport(null, request
.build());
// Ask the master for the replication load of that one server, then check the per-peer results.
138 HashMap
<String
, List
<Pair
<ServerName
, ReplicationLoadSource
>>> replicationLoad
=
139 master
.getReplicationLoad(new ServerName
[] { serverName
});
140 assertEquals("peer size ", 2, replicationLoad
.size());
141 assertEquals("load size ", 1, replicationLoad
.get(peer1
).size());
142 assertEquals("log queue size of peer1", sizeOfLogQueue
,
143 replicationLoad
.get(peer1
).get(0).getSecond().getSizeOfLogQueue());
144 assertEquals("replication lag of peer2", replicationLag
+ 1,
145 replicationLoad
.get(peer2
).get(0).getSecond().getReplicationLag());