2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.replication
;
20 import static org
.junit
.Assert
.assertArrayEquals
;
21 import static org
.junit
.Assert
.assertEquals
;
22 import static org
.junit
.Assert
.fail
;
23 import java
.io
.IOException
;
24 import java
.util
.List
;
25 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
26 import org
.apache
.hadoop
.hbase
.SingleProcessHBaseCluster
;
27 import org
.apache
.hadoop
.hbase
.Waiter
;
28 import org
.apache
.hadoop
.hbase
.client
.Get
;
29 import org
.apache
.hadoop
.hbase
.client
.Put
;
30 import org
.apache
.hadoop
.hbase
.client
.Result
;
31 import org
.apache
.hadoop
.hbase
.client
.ResultScanner
;
32 import org
.apache
.hadoop
.hbase
.client
.Scan
;
33 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
34 import org
.apache
.hadoop
.hbase
.testclassification
.ReplicationTests
;
35 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
36 import org
.apache
.hadoop
.hbase
.util
.JVMClusterUtil
;
37 import org
.junit
.Before
;
38 import org
.junit
.ClassRule
;
39 import org
.junit
.Test
;
40 import org
.junit
.experimental
.categories
.Category
;
41 import org
.junit
.runner
.RunWith
;
42 import org
.junit
.runners
.Parameterized
;
43 import org
.junit
.runners
.Parameterized
.Parameter
;
44 import org
.junit
.runners
.Parameterized
.Parameters
;
45 import org
.slf4j
.Logger
;
46 import org
.slf4j
.LoggerFactory
;
47 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.collect
.ImmutableList
;
50 * Test handling of changes to the number of a peer's regionservers.
52 @RunWith(Parameterized
.class)
53 @Category({ ReplicationTests
.class, LargeTests
.class })
54 public class TestReplicationChangingPeerRegionservers
extends TestReplicationBase
{
57 public static final HBaseClassTestRule CLASS_RULE
=
58 HBaseClassTestRule
.forClass(TestReplicationChangingPeerRegionservers
.class);
60 private static final Logger LOG
=
61 LoggerFactory
.getLogger(TestReplicationChangingPeerRegionservers
.class);
63 @SuppressWarnings("checkstyle:VisibilityModifier") @Parameter(0)
64 public boolean serialPeer
;
67 public boolean syncPeer
;
70 protected boolean isSerialPeer() {
75 protected boolean isSyncPeer() {
79 @Parameters(name
= "{index}: serialPeer={0}, syncPeer={1}")
80 public static List
<Object
[]> parameters() {
81 return ImmutableList
.of(new Object
[] { false, false }, new Object
[] { false, true },
82 new Object
[] { true, false }, new Object
[] { true, true });
86 public void setUp() throws Exception
{
87 // Starting and stopping replication can make us miss new logs,
88 // rolling like this makes sure the most recent one gets added to the queue
89 for (JVMClusterUtil
.RegionServerThread r
: UTIL1
.getHBaseCluster()
90 .getRegionServerThreads()) {
91 UTIL1
.getAdmin().rollWALWriter(r
.getRegionServer().getServerName());
93 UTIL1
.deleteTableData(tableName
);
94 // truncating the table will send one Delete per row to the slave cluster
95 // in an async fashion, which is why we cannot just call deleteTableData on
96 // utility2 since late writes could make it to the slave in some way.
97 // Instead, we truncate the first table and wait for all the Deletes to
98 // make it to the slave.
99 Scan scan
= new Scan();
101 for (int i
= 0; i
< NB_RETRIES
; i
++) {
102 if (i
== NB_RETRIES
- 1) {
103 fail("Waited too much time for truncate");
105 ResultScanner scanner
= htable2
.getScanner(scan
);
106 Result
[] res
= scanner
.next(NB_ROWS_IN_BIG_BATCH
);
108 if (res
.length
!= 0) {
109 if (res
.length
< lastCount
) {
110 i
--; // Don't increment timeout if we make progress
112 lastCount
= res
.length
;
113 LOG
.info("Still got " + res
.length
+ " rows");
114 Thread
.sleep(SLEEP_TIME
);
122 public void testChangingNumberOfPeerRegionServers() throws IOException
, InterruptedException
{
123 LOG
.info("testSimplePutDelete");
124 SingleProcessHBaseCluster peerCluster
= UTIL2
.getMiniHBaseCluster();
125 // This test wants two RS's up. We only run one generally so add one.
126 peerCluster
.startRegionServer();
127 Waiter
.waitFor(peerCluster
.getConfiguration(), 30000, new Waiter
.Predicate
<Exception
>() {
128 @Override public boolean evaluate() throws Exception
{
129 return peerCluster
.getLiveRegionServerThreads().size() > 1;
132 int numRS
= peerCluster
.getRegionServerThreads().size();
134 doPutTest(Bytes
.toBytes(1));
136 int rsToStop
= peerCluster
.getServerWithMeta() == 0 ?
1 : 0;
137 peerCluster
.stopRegionServer(rsToStop
);
138 peerCluster
.waitOnRegionServer(rsToStop
);
141 assertEquals(numRS
- 1, peerCluster
.getRegionServerThreads().size());
143 doPutTest(Bytes
.toBytes(2));
145 peerCluster
.startRegionServer();
148 assertEquals(numRS
, peerCluster
.getRegionServerThreads().size());
150 doPutTest(Bytes
.toBytes(3));
153 private void doPutTest(byte[] row
) throws IOException
, InterruptedException
{
154 Put put
= new Put(row
);
155 put
.addColumn(famName
, row
, row
);
157 if (htable1
== null) {
158 htable1
= UTIL1
.getConnection().getTable(tableName
);
163 Get get
= new Get(row
);
164 for (int i
= 0; i
< NB_RETRIES
; i
++) {
165 if (i
== NB_RETRIES
- 1) {
166 fail("Waited too much time for put replication");
168 Result res
= htable2
.get(get
);
170 LOG
.info("Row not available");
171 Thread
.sleep(SLEEP_TIME
);
173 assertArrayEquals(res
.value(), row
);