/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.regionserver
;
20 import static org
.junit
.Assert
.*;
22 import java
.io
.IOException
;
23 import java
.util
.concurrent
.atomic
.AtomicBoolean
;
24 import java
.util
.concurrent
.atomic
.AtomicInteger
;
25 import java
.util
.concurrent
.atomic
.AtomicReference
;
26 import org
.apache
.hadoop
.conf
.Configuration
;
27 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
28 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
29 import org
.apache
.hadoop
.hbase
.HColumnDescriptor
;
30 import org
.apache
.hadoop
.hbase
.HConstants
;
31 import org
.apache
.hadoop
.hbase
.HTableDescriptor
;
32 import org
.apache
.hadoop
.hbase
.TableName
;
33 import org
.apache
.hadoop
.hbase
.Waiter
.Predicate
;
34 import org
.apache
.hadoop
.hbase
.client
.Admin
;
35 import org
.apache
.hadoop
.hbase
.client
.Connection
;
36 import org
.apache
.hadoop
.hbase
.client
.ConnectionFactory
;
37 import org
.apache
.hadoop
.hbase
.client
.Consistency
;
38 import org
.apache
.hadoop
.hbase
.client
.Get
;
39 import org
.apache
.hadoop
.hbase
.client
.Table
;
40 import org
.apache
.hadoop
.hbase
.replication
.regionserver
.TestRegionReplicaReplicationEndpoint
;
41 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
42 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
43 import org
.apache
.hadoop
.hbase
.util
.JVMClusterUtil
.RegionServerThread
;
44 import org
.apache
.hadoop
.hbase
.util
.ServerRegionReplicaUtil
;
45 import org
.apache
.hadoop
.hbase
.util
.Threads
;
46 import org
.junit
.After
;
47 import org
.junit
.Before
;
48 import org
.junit
.ClassRule
;
49 import org
.junit
.Rule
;
50 import org
.junit
.Test
;
51 import org
.junit
.experimental
.categories
.Category
;
52 import org
.junit
.rules
.TestName
;
53 import org
.slf4j
.Logger
;
54 import org
.slf4j
.LoggerFactory
;
/**
 * Tests failover of secondary region replicas.
 */
59 @Category(LargeTests
.class)
60 public class TestRegionReplicaFailover
{
63 public static final HBaseClassTestRule CLASS_RULE
=
64 HBaseClassTestRule
.forClass(TestRegionReplicaFailover
.class);
66 private static final Logger LOG
=
67 LoggerFactory
.getLogger(TestRegionReplicaReplicationEndpoint
.class);
69 private static final HBaseTestingUtility HTU
= new HBaseTestingUtility();
71 private static final int NB_SERVERS
= 3;
73 protected final byte[][] families
= new byte[][] {HBaseTestingUtility
.fam1
,
74 HBaseTestingUtility
.fam2
, HBaseTestingUtility
.fam3
};
75 protected final byte[] fam
= HBaseTestingUtility
.fam1
;
76 protected final byte[] qual1
= Bytes
.toBytes("qual1");
77 protected final byte[] value1
= Bytes
.toBytes("value1");
78 protected final byte[] row
= Bytes
.toBytes("rowA");
79 protected final byte[] row2
= Bytes
.toBytes("rowB");
81 @Rule public TestName name
= new TestName();
83 private HTableDescriptor htd
;
86 public void before() throws Exception
{
87 Configuration conf
= HTU
.getConfiguration();
88 // Up the handlers; this test needs more than usual.
89 conf
.setInt(HConstants
.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT
, 10);
90 conf
.setBoolean(ServerRegionReplicaUtil
.REGION_REPLICA_REPLICATION_CONF_KEY
, true);
91 conf
.setBoolean(ServerRegionReplicaUtil
.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY
, true);
92 conf
.setInt("replication.stats.thread.period.seconds", 5);
93 conf
.setBoolean("hbase.tests.use.shortcircuit.reads", false);
95 HTU
.startMiniCluster(NB_SERVERS
);
96 htd
= HTU
.createTableDescriptor(
97 TableName
.valueOf(name
.getMethodName().substring(0, name
.getMethodName().length()-3)),
98 HColumnDescriptor
.DEFAULT_MIN_VERSIONS
, 3, HConstants
.FOREVER
,
99 HColumnDescriptor
.DEFAULT_KEEP_DELETED
);
100 htd
.setRegionReplication(3);
101 HTU
.getAdmin().createTable(htd
);
105 public void after() throws Exception
{
106 HTU
.deleteTableIfAny(htd
.getTableName());
107 HTU
.shutdownMiniCluster();
111 * Tests the case where a newly created table with region replicas and no data, the secondary
112 * region replicas are available to read immediately.
115 public void testSecondaryRegionWithEmptyRegion() throws IOException
{
116 // Create a new table with region replication, don't put any data. Test that the secondary
117 // region replica is available to read.
118 try (Connection connection
= ConnectionFactory
.createConnection(HTU
.getConfiguration());
119 Table table
= connection
.getTable(htd
.getTableName())) {
121 Get get
= new Get(row
);
122 get
.setConsistency(Consistency
.TIMELINE
);
124 table
.get(get
); // this should not block
129 * Tests the case where if there is some data in the primary region, reopening the region replicas
130 * (enable/disable table, etc) makes the region replicas readable.
131 * @throws IOException
134 public void testSecondaryRegionWithNonEmptyRegion() throws IOException
{
135 // Create a new table with region replication and load some data
136 // than disable and enable the table again and verify the data from secondary
137 try (Connection connection
= ConnectionFactory
.createConnection(HTU
.getConfiguration());
138 Table table
= connection
.getTable(htd
.getTableName())) {
140 HTU
.loadNumericRows(table
, fam
, 0, 1000);
142 HTU
.getAdmin().disableTable(htd
.getTableName());
143 HTU
.getAdmin().enableTable(htd
.getTableName());
145 HTU
.verifyNumericRows(table
, fam
, 0, 1000, 1);
150 * Tests the case where killing a primary region with unflushed data recovers
153 public void testPrimaryRegionKill() throws Exception
{
154 try (Connection connection
= ConnectionFactory
.createConnection(HTU
.getConfiguration());
155 Table table
= connection
.getTable(htd
.getTableName())) {
157 HTU
.loadNumericRows(table
, fam
, 0, 1000);
159 // wal replication is async, we have to wait until the replication catches up, or we timeout
160 verifyNumericRowsWithTimeout(table
, fam
, 0, 1000, 1, 30000);
161 verifyNumericRowsWithTimeout(table
, fam
, 0, 1000, 2, 30000);
163 // we should not have flushed files now, but data in memstores of primary and secondary
164 // kill the primary region replica now, and ensure that when it comes back up, we can still
165 // read from it the same data from primary and secondaries
166 boolean aborted
= false;
167 for (RegionServerThread rs
: HTU
.getMiniHBaseCluster().getRegionServerThreads()) {
168 for (Region r
: rs
.getRegionServer().getRegions(htd
.getTableName())) {
169 if (r
.getRegionInfo().getReplicaId() == 0) {
170 LOG
.info("Aborting region server hosting primary region replica");
171 rs
.getRegionServer().abort("for test");
179 // wal replication is async, we have to wait until the replication catches up, or we timeout
180 verifyNumericRowsWithTimeout(table
, fam
, 0, 1000, 0, 30000);
181 verifyNumericRowsWithTimeout(table
, fam
, 0, 1000, 1, 30000);
182 verifyNumericRowsWithTimeout(table
, fam
, 0, 1000, 2, 30000);
185 // restart the region server
186 HTU
.getMiniHBaseCluster().startRegionServer();
189 /** wal replication is async, we have to wait until the replication catches up, or we timeout
191 private void verifyNumericRowsWithTimeout(final Table table
, final byte[] f
, final int startRow
,
192 final int endRow
, final int replicaId
, final long timeout
) throws Exception
{
194 HTU
.waitFor(timeout
, new Predicate
<Exception
>() {
196 public boolean evaluate() throws Exception
{
198 HTU
.verifyNumericRows(table
, f
, startRow
, endRow
, replicaId
);
200 } catch (AssertionError ae
) {
205 } catch (Throwable t
) {
206 // ignore this, but redo the verify do get the actual exception
207 HTU
.verifyNumericRows(table
, f
, startRow
, endRow
, replicaId
);
212 * Tests the case where killing a secondary region with unflushed data recovers, and the replica
213 * becomes available to read again shortly.
216 public void testSecondaryRegionKill() throws Exception
{
217 try (Connection connection
= ConnectionFactory
.createConnection(HTU
.getConfiguration());
218 Table table
= connection
.getTable(htd
.getTableName())) {
219 HTU
.loadNumericRows(table
, fam
, 0, 1000);
221 // wait for some time to ensure that async wal replication does it's magic
222 verifyNumericRowsWithTimeout(table
, fam
, 0, 1000, 1, 30000);
223 verifyNumericRowsWithTimeout(table
, fam
, 0, 1000, 2, 30000);
225 // we should not have flushed files now, but data in memstores of primary and secondary
226 // kill the secondary region replica now, and ensure that when it comes back up, we can still
227 // read from it the same data
228 boolean aborted
= false;
229 for (RegionServerThread rs
: HTU
.getMiniHBaseCluster().getRegionServerThreads()) {
230 for (Region r
: rs
.getRegionServer().getRegions(htd
.getTableName())) {
231 if (r
.getRegionInfo().getReplicaId() == 1) {
232 LOG
.info("Aborting region server hosting secondary region replica");
233 rs
.getRegionServer().abort("for test");
243 HTU
.verifyNumericRows(table
, fam
, 0, 1000, 1);
244 HTU
.verifyNumericRows(table
, fam
, 0, 1000, 2);
247 // restart the region server
248 HTU
.getMiniHBaseCluster().startRegionServer();
252 * Tests the case where there are 3 region replicas and the primary is continuously accepting
253 * new writes while one of the secondaries is killed. Verification is done for both of the
254 * secondary replicas.
257 public void testSecondaryRegionKillWhilePrimaryIsAcceptingWrites() throws Exception
{
258 try (Connection connection
= ConnectionFactory
.createConnection(HTU
.getConfiguration());
259 Table table
= connection
.getTable(htd
.getTableName());
260 Admin admin
= connection
.getAdmin()) {
261 // start a thread to do the loading of primary
262 HTU
.loadNumericRows(table
, fam
, 0, 1000); // start with some base
263 admin
.flush(table
.getName());
264 HTU
.loadNumericRows(table
, fam
, 1000, 2000);
266 final AtomicReference
<Throwable
> ex
= new AtomicReference
<>(null);
267 final AtomicBoolean done
= new AtomicBoolean(false);
268 final AtomicInteger key
= new AtomicInteger(2000);
270 Thread loader
= new Thread() {
273 while (!done
.get()) {
275 HTU
.loadNumericRows(table
, fam
, key
.get(), key
.get()+1000);
277 } catch (Throwable e
) {
278 ex
.compareAndSet(null, e
);
285 Thread aborter
= new Thread() {
289 boolean aborted
= false;
290 for (RegionServerThread rs
: HTU
.getMiniHBaseCluster().getRegionServerThreads()) {
291 for (Region r
: rs
.getRegionServer().getRegions(htd
.getTableName())) {
292 if (r
.getRegionInfo().getReplicaId() == 1) {
293 LOG
.info("Aborting region server hosting secondary region replica");
294 rs
.getRegionServer().abort("for test");
300 } catch (Throwable e
) {
301 ex
.compareAndSet(null, e
);
311 assertNull(ex
.get());
313 assertTrue(key
.get() > 1000); // assert that the test is working as designed
314 LOG
.info("Loaded up to key :" + key
.get());
315 verifyNumericRowsWithTimeout(table
, fam
, 0, key
.get(), 0, 30000);
316 verifyNumericRowsWithTimeout(table
, fam
, 0, key
.get(), 1, 30000);
317 verifyNumericRowsWithTimeout(table
, fam
, 0, key
.get(), 2, 30000);
320 // restart the region server
321 HTU
.getMiniHBaseCluster().startRegionServer();
325 * Tests the case where we are creating a table with a lot of regions and replicas. Opening region
326 * replicas should not block handlers on RS indefinitely.
329 public void testLotsOfRegionReplicas() throws IOException
{
330 int numRegions
= NB_SERVERS
* 20;
331 int regionReplication
= 10;
332 String tableName
= htd
.getTableName().getNameAsString() + "2";
333 htd
= HTU
.createTableDescriptor(TableName
.valueOf(tableName
),
334 HColumnDescriptor
.DEFAULT_MIN_VERSIONS
, 3, HConstants
.FOREVER
,
335 HColumnDescriptor
.DEFAULT_KEEP_DELETED
);
336 htd
.setRegionReplication(regionReplication
);
338 // dont care about splits themselves too much
339 byte[] startKey
= Bytes
.toBytes("aaa");
340 byte[] endKey
= Bytes
.toBytes("zzz");
341 byte[][] splits
= HTU
.getRegionSplitStartKeys(startKey
, endKey
, numRegions
);
342 HTU
.getAdmin().createTable(htd
, startKey
, endKey
, numRegions
);
344 try (Connection connection
= ConnectionFactory
.createConnection(HTU
.getConfiguration());
345 Table table
= connection
.getTable(htd
.getTableName())) {
347 for (int i
= 1; i
< splits
.length
; i
++) {
348 for (int j
= 0; j
< regionReplication
; j
++) {
349 Get get
= new Get(splits
[i
]);
350 get
.setConsistency(Consistency
.TIMELINE
);
352 table
.get(get
); // this should not block. Regions should be coming online
357 HTU
.deleteTableIfAny(TableName
.valueOf(tableName
));