2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.replication
;
20 import static org
.junit
.Assert
.assertEquals
;
21 import static org
.junit
.Assert
.assertTrue
;
22 import java
.io
.IOException
;
23 import java
.util
.EnumSet
;
24 import java
.util
.List
;
25 import org
.apache
.hadoop
.hbase
.ClusterMetrics
;
26 import org
.apache
.hadoop
.hbase
.ClusterMetrics
.Option
;
27 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
28 import org
.apache
.hadoop
.hbase
.ServerMetrics
;
29 import org
.apache
.hadoop
.hbase
.ServerName
;
30 import org
.apache
.hadoop
.hbase
.Waiter
;
31 import org
.apache
.hadoop
.hbase
.client
.Admin
;
32 import org
.apache
.hadoop
.hbase
.client
.Put
;
33 import org
.apache
.hadoop
.hbase
.regionserver
.HRegionServer
;
34 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
35 import org
.apache
.hadoop
.hbase
.testclassification
.ReplicationTests
;
36 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
37 import org
.apache
.hadoop
.hbase
.util
.JVMClusterUtil
;
38 import org
.apache
.hadoop
.hbase
.util
.Threads
;
39 import org
.junit
.ClassRule
;
40 import org
.junit
.Test
;
41 import org
.junit
.experimental
.categories
.Category
;
42 import org
.slf4j
.Logger
;
43 import org
.slf4j
.LoggerFactory
;
45 @Category({ ReplicationTests
.class, MediumTests
.class })
46 public class TestReplicationStatus
extends TestReplicationBase
{
47 private static final Logger LOG
= LoggerFactory
.getLogger(TestReplicationStatus
.class);
50 public static final HBaseClassTestRule CLASS_RULE
=
51 HBaseClassTestRule
.forClass(TestReplicationStatus
.class);
53 static void insertRowsOnSource() throws IOException
{
54 final byte[] qualName
= Bytes
.toBytes("q");
55 for (int i
= 0; i
< NB_ROWS_IN_BATCH
; i
++) {
56 Put p
= new Put(Bytes
.toBytes("row" + i
));
57 p
.addColumn(famName
, qualName
, Bytes
.toBytes("val" + i
));
63 * Test for HBASE-9531.
65 * put a few rows into htable1, which should be replicated to htable2 <br/>
66 * create a ClusterMetrics instance 'metrics' from Admin <br/>
67 * test : metrics.getLiveServerMetrics().get(server).getReplicationLoadSourceList() <br/>
68 * test : metrics.getLiveServerMetrics().get(server).getReplicationLoadSink()
71 public void testReplicationStatus() throws Exception
{
72 // This test wants two RS's up. We only run one generally so add one.
73 UTIL1
.getMiniHBaseCluster().startRegionServer();
74 Waiter
.waitFor(UTIL1
.getConfiguration(), 30000, new Waiter
.Predicate
<Exception
>() {
75 @Override public boolean evaluate() throws Exception
{
76 return UTIL1
.getMiniHBaseCluster().getLiveRegionServerThreads().size() > 1;
79 Admin hbaseAdmin
= UTIL1
.getAdmin();
80 // disable peer <= WHY? I DON'T GET THIS DISABLE BUT TEST FAILS W/O IT.
81 hbaseAdmin
.disableReplicationPeer(PEER_ID2
);
83 LOG
.info("AFTER PUTS");
84 // TODO: Change this wait to a barrier. I tried waiting on replication stats to
85 // change but sleeping in main thread seems to mess up background replication.
86 // HACK! To address flakiness.
88 ClusterMetrics metrics
= hbaseAdmin
.getClusterMetrics(EnumSet
.of(Option
.LIVE_SERVERS
));
89 for (JVMClusterUtil
.RegionServerThread thread
: UTIL1
.getHBaseCluster().
90 getRegionServerThreads()) {
91 ServerName server
= thread
.getRegionServer().getServerName();
92 assertTrue("" + server
, metrics
.getLiveServerMetrics().containsKey(server
));
93 ServerMetrics sm
= metrics
.getLiveServerMetrics().get(server
);
94 List
<ReplicationLoadSource
> rLoadSourceList
= sm
.getReplicationLoadSourceList();
95 ReplicationLoadSink rLoadSink
= sm
.getReplicationLoadSink();
97 // check SourceList has only one entry, because there is only one peer
98 assertEquals("Failed to get ReplicationLoadSourceList " +
99 rLoadSourceList
+ ", " + server
,1, rLoadSourceList
.size());
100 assertEquals(PEER_ID2
, rLoadSourceList
.get(0).getPeerID());
102 // only check that the Sink exists, as it is difficult to verify the value on the fly
103 assertTrue("failed to get ReplicationLoadSink.AgeOfLastShippedOp ",
104 (rLoadSink
.getAgeOfLastAppliedOp() >= 0));
105 assertTrue("failed to get ReplicationLoadSink.TimeStampsOfLastAppliedOp ",
106 (rLoadSink
.getTimestampsOfLastAppliedOp() >= 0));
109 // Stop rs1, then the queue of rs1 will be transferred to rs0
110 HRegionServer hrs
= UTIL1
.getHBaseCluster().getRegionServer(1);
111 hrs
.stop("Stop RegionServer");
112 while(hrs
.isAlive()) {
115 // To be sure it is dead and its references are cleaned up. TODO: Change this to a barrier.
116 // I tried waiting on replication stats to change but sleeping in main thread
117 // seems to mess up background replication.
118 Threads
.sleep(10000);
119 ServerName server
= UTIL1
.getHBaseCluster().getRegionServer(0).getServerName();
120 List
<ReplicationLoadSource
> rLoadSourceList
= waitOnMetricsReport(1, server
);
121 // The remaining server should now have two queues -- the original and then the one that was
122 // added because of failover. The original should still be PEER_ID2 though.
123 assertEquals("Failed ReplicationLoadSourceList " + rLoadSourceList
, 2, rLoadSourceList
.size());
124 assertEquals(PEER_ID2
, rLoadSourceList
.get(0).getPeerID());
128 * Wait until Master shows metrics counts for ReplicationLoadSourceList that are
129 * greater than <code>greaterThan</code> for <code>serverName</code> before
130 * returning. We want to avoid the case where the RS hasn't yet updated the Master before
131 * allowing the test to proceed.
132 * @param greaterThan size of replicationLoadSourceList must be greater before we proceed
134 private List
<ReplicationLoadSource
> waitOnMetricsReport(int greaterThan
, ServerName serverName
)
136 ClusterMetrics metrics
= hbaseAdmin
.getClusterMetrics(EnumSet
.of(Option
.LIVE_SERVERS
));
137 List
<ReplicationLoadSource
> list
=
138 metrics
.getLiveServerMetrics().get(serverName
).getReplicationLoadSourceList();
139 while(list
.size() <= greaterThan
) {