2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.replication
;
20 import static org
.junit
.Assert
.assertEquals
;
21 import static org
.junit
.Assert
.assertTrue
;
22 import java
.io
.IOException
;
23 import java
.util
.EnumSet
;
24 import java
.util
.List
;
25 import org
.apache
.hadoop
.hbase
.ClusterMetrics
;
26 import org
.apache
.hadoop
.hbase
.ClusterMetrics
.Option
;
27 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
28 import org
.apache
.hadoop
.hbase
.ServerMetrics
;
29 import org
.apache
.hadoop
.hbase
.ServerName
;
30 import org
.apache
.hadoop
.hbase
.Waiter
;
31 import org
.apache
.hadoop
.hbase
.client
.Admin
;
32 import org
.apache
.hadoop
.hbase
.client
.Put
;
33 import org
.apache
.hadoop
.hbase
.regionserver
.HRegionServer
;
34 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
35 import org
.apache
.hadoop
.hbase
.testclassification
.ReplicationTests
;
36 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
37 import org
.apache
.hadoop
.hbase
.util
.JVMClusterUtil
;
38 import org
.apache
.hadoop
.hbase
.util
.Threads
;
39 import org
.junit
.ClassRule
;
40 import org
.junit
.Test
;
41 import org
.junit
.experimental
.categories
.Category
;
42 import org
.slf4j
.Logger
;
43 import org
.slf4j
.LoggerFactory
;
45 @Category({ ReplicationTests
.class, MediumTests
.class })
46 public class TestReplicationStatus
extends TestReplicationBase
{
47 private static final Logger LOG
= LoggerFactory
.getLogger(TestReplicationStatus
.class);
50 public static final HBaseClassTestRule CLASS_RULE
=
51 HBaseClassTestRule
.forClass(TestReplicationStatus
.class);
54 * Test for HBASE-9531.
56 * put a few rows into htable1, which should be replicated to htable2 <br/>
57 * create a ClusterStatus instance 'status' from HBaseAdmin <br/>
58 * test : status.getLoad(server).getReplicationLoadSourceList() <br/>
59 * test : status.getLoad(server).getReplicationLoadSink()
62 public void testReplicationStatus() throws Exception
{
63 // This test wants two RS's up. We only run one generally so add one.
64 UTIL1
.getMiniHBaseCluster().startRegionServer();
65 Waiter
.waitFor(UTIL1
.getConfiguration(), 30000, new Waiter
.Predicate
<Exception
>() {
66 @Override public boolean evaluate() throws Exception
{
67 return UTIL1
.getMiniHBaseCluster().getLiveRegionServerThreads().size() > 1;
70 Admin hbaseAdmin
= UTIL1
.getAdmin();
71 // disable peer <= WHY? I DON'T GET THIS DISABLE BUT TEST FAILS W/O IT.
72 hbaseAdmin
.disableReplicationPeer(PEER_ID2
);
73 final byte[] qualName
= Bytes
.toBytes("q");
74 for (int i
= 0; i
< NB_ROWS_IN_BATCH
; i
++) {
75 Put p
= new Put(Bytes
.toBytes("row" + i
));
76 p
.addColumn(famName
, qualName
, Bytes
.toBytes("val" + i
));
79 LOG
.info("AFTER PUTS");
80 // TODO: Change this wait to a barrier. I tried waiting on replication stats to
81 // change but sleeping in main thread seems to mess up background replication.
82 // HACK! To address flakeyness.
84 ClusterMetrics metrics
= hbaseAdmin
.getClusterMetrics(EnumSet
.of(Option
.LIVE_SERVERS
));
85 for (JVMClusterUtil
.RegionServerThread thread
: UTIL1
.getHBaseCluster().
86 getRegionServerThreads()) {
87 ServerName server
= thread
.getRegionServer().getServerName();
88 assertTrue("" + server
, metrics
.getLiveServerMetrics().containsKey(server
));
89 ServerMetrics sm
= metrics
.getLiveServerMetrics().get(server
);
90 List
<ReplicationLoadSource
> rLoadSourceList
= sm
.getReplicationLoadSourceList();
91 ReplicationLoadSink rLoadSink
= sm
.getReplicationLoadSink();
93 // check SourceList only has one entry, because only has one peer
94 assertEquals("Failed to get ReplicationLoadSourceList " +
95 rLoadSourceList
+ ", " + server
,1, rLoadSourceList
.size());
96 assertEquals(PEER_ID2
, rLoadSourceList
.get(0).getPeerID());
98 // check Sink exist only as it is difficult to verify the value on the fly
99 assertTrue("failed to get ReplicationLoadSink.AgeOfLastShippedOp ",
100 (rLoadSink
.getAgeOfLastAppliedOp() >= 0));
101 assertTrue("failed to get ReplicationLoadSink.TimeStampsOfLastAppliedOp ",
102 (rLoadSink
.getTimestampsOfLastAppliedOp() >= 0));
105 // Stop rs1, then the queue of rs1 will be transfered to rs0
106 HRegionServer hrs
= UTIL1
.getHBaseCluster().getRegionServer(1);
107 hrs
.stop("Stop RegionServer");
108 while(!hrs
.isShutDown()) {
111 // To be sure it dead and references cleaned up. TODO: Change this to a barrier.
112 // I tried waiting on replication stats to change but sleeping in main thread
113 // seems to mess up background replication.
114 Threads
.sleep(10000);
115 ServerName server
= UTIL1
.getHBaseCluster().getRegionServer(0).getServerName();
116 List
<ReplicationLoadSource
> rLoadSourceList
= waitOnMetricsReport(1, server
);
117 // The remaining server should now have two queues -- the original and then the one that was
118 // added because of failover. The original should still be PEER_ID2 though.
119 assertEquals("Failed ReplicationLoadSourceList " + rLoadSourceList
, 2, rLoadSourceList
.size());
120 assertEquals(PEER_ID2
, rLoadSourceList
.get(0).getPeerID());
124 * Wait until Master shows metrics counts for ReplicationLoadSourceList that are
125 * greater than <code>greaterThan</code> for <code>serverName</code> before
126 * returning. We want to avoid case where RS hasn't yet updated Master before
127 * allowing test proceed.
128 * @param greaterThan size of replicationLoadSourceList must be greater before we proceed
130 private List
<ReplicationLoadSource
> waitOnMetricsReport(int greaterThan
, ServerName serverName
)
132 ClusterMetrics metrics
= hbaseAdmin
.getClusterMetrics(EnumSet
.of(Option
.LIVE_SERVERS
));
133 List
<ReplicationLoadSource
> list
=
134 metrics
.getLiveServerMetrics().get(serverName
).getReplicationLoadSourceList();
135 while(list
.size() <= greaterThan
) {