/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;

/**
 * This class is only a base for other integration-level replication tests. Do not add tests here.
 * TestReplicationSmallTests is where tests that don't require bringing machines up/down should
 * go. All other tests should have their own classes and extend this one.
 */
public class TestReplicationBase {
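  // A minimal subclass sketch (hypothetical class and test names, for illustration only):
  // concrete suites extend this base, reuse the shared helpers, and can flip the peer type
  // by overriding isSerialPeer()/isSyncPeer().
  //
  //   public class TestMyReplication extends TestReplicationBase {
  //     @Override
  //     protected boolean isSerialPeer() {
  //       return true; // ask setUpBase() to create a serial peer
  //     }
  //
  //     @Test
  //     public void testPutAndDelete() throws Exception {
  //       runSimplePutDeleteTest();
  //     }
  //   }
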
  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);

  protected static Configuration CONF_WITH_LOCALFS;

  protected static Admin hbaseAdmin;

  protected static Table htable1;
  protected static Table htable2;

  protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
  protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
  protected static Configuration CONF1 = UTIL1.getConfiguration();
  protected static Configuration CONF2 = UTIL2.getConfiguration();

  protected static final int NUM_SLAVES1 = 1;
  protected static final int NUM_SLAVES2 = 1;
  protected static final int NB_ROWS_IN_BATCH = 100;
  protected static final int NB_ROWS_IN_BIG_BATCH = NB_ROWS_IN_BATCH * 10;
  protected static final long SLEEP_TIME = 500;
  protected static final int NB_RETRIES = 50;

  protected static final TableName tableName = TableName.valueOf("test");
  protected static final byte[] famName = Bytes.toBytes("f");
  protected static final byte[] row = Bytes.toBytes("row");
  protected static final byte[] noRepfamName = Bytes.toBytes("norep");
  protected static final String PEER_ID2 = "2";

  protected boolean isSerialPeer() {
    return false;
  }

  protected boolean isSyncPeer() {
    return false;
  }

  protected final void cleanUp() throws IOException, InterruptedException {
    // Starting and stopping replication can make us miss new logs,
    // rolling like this makes sure the most recent one gets added to the queue
    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster()
        .getRegionServerThreads()) {
      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    int rowCount = UTIL1.countRows(tableName);
    UTIL1.deleteTableData(tableName);
    // truncating the table will send one Delete per row to the slave cluster
    // in an async fashion, which is why we cannot just call deleteTableData on
    // utility2 since late writes could make it to the slave in some way.
    // Instead, we truncate the first table and wait for all the Deletes to
    // make it to the slave.
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(rowCount);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't increment timeout if we make progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  protected static void waitForReplication(int expectedRows, int retries)
      throws IOException, InterruptedException {
    waitForReplication(htable2, expectedRows, retries);
  }

  protected static void waitForReplication(Table htable2, int expectedRows, int retries)
      throws IOException, InterruptedException {
    Scan scan;
    for (int i = 0; i < retries; i++) {
      scan = new Scan();
      if (i == retries - 1) {
        fail("Waited too much time for normal batch replication");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(expectedRows);
      scanner.close();
      if (res.length != expectedRows) {
        LOG.info("Only got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

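  // Writes NB_ROWS_IN_BATCH rows to the source table, keyed by prefix + row index, so repeated
  // loads with distinct prefixes do not overwrite each other.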
  protected static void loadData(String prefix, byte[] row) throws IOException {
    loadData(prefix, row, famName);
  }

  protected static void loadData(String prefix, byte[] row, byte[] familyName)
      throws IOException {
    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i)));
      put.addColumn(familyName, row, row);
      puts.add(put);
    }
    htable1.put(puts);
  }

  protected static void setupConfig(HBaseTestingUtility util, String znodeParent) {
    Configuration conf = util.getConfiguration();
    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
    // We don't want too many edits per batch sent to the ReplicationEndpoint to trigger
    // sufficient number of events. But we don't want to go too low because
    // HBaseInterClusterReplicationEndpoint partitions entries into batches and we want
    // more than one batch sent to the peer cluster for better testing.
    conf.setInt("replication.source.size.capacity", 102400);
    conf.setLong("replication.source.sleepforretries", 100);
    conf.setInt("hbase.regionserver.maxlogs", 10);
    conf.setLong("hbase.master.logcleaner.ttl", 10);
    conf.setInt("zookeeper.recovery.retry", 1);
    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf.setInt("replication.stats.thread.period.seconds", 5);
    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
    conf.setLong("replication.sleep.before.failover", 2000);
    conf.setInt("replication.source.maxretriesmultiplier", 10);
    conf.setFloat("replication.source.ratio", 1.0f);
    conf.setBoolean("replication.source.eof.autorecovery", true);
    conf.setLong("hbase.serial.replication.waiting.ms", 100);
  }

  static void configureClusters(HBaseTestingUtility util1, HBaseTestingUtility util2) {
    setupConfig(util1, "/1");
    setupConfig(util2, "/2");

    Configuration conf2 = util2.getConfiguration();
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
    conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
  }

  static void restartSourceCluster(int numSlaves) throws Exception {
    IOUtils.closeQuietly(hbaseAdmin, htable1);
    UTIL1.shutdownMiniHBaseCluster();
    UTIL1.restartHBaseCluster(numSlaves);
    // Invalidate the cached connection state.
    CONF1 = UTIL1.getConfiguration();
    hbaseAdmin = UTIL1.getAdmin();
    Connection connection1 = UTIL1.getConnection();
    htable1 = connection1.getTable(tableName);
  }

  static void restartTargetHBaseCluster(int numSlaves) throws Exception {
    IOUtils.closeQuietly(htable2);
    UTIL2.restartHBaseCluster(numSlaves);
    // Invalidate the cached connection state
    CONF2 = UTIL2.getConfiguration();
    htable2 = UTIL2.getConnection().getTable(tableName);
  }

  private static void startClusters() throws Exception {
    UTIL1.startMiniZKCluster();
    MiniZooKeeperCluster miniZK = UTIL1.getZkCluster();
    LOG.info("Setup first Zk");

    UTIL2.setZkCluster(miniZK);
    LOG.info("Setup second Zk");

    CONF_WITH_LOCALFS = HBaseConfiguration.create(CONF1);
    UTIL1.startMiniCluster(NUM_SLAVES1);
    // Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks
    // as a component in deciding maximum number of parallel batches to send to the peer cluster.
    UTIL2.startMiniCluster(NUM_SLAVES2);

    hbaseAdmin = ConnectionFactory.createConnection(CONF1).getAdmin();

    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();

    Connection connection1 = ConnectionFactory.createConnection(CONF1);
    Connection connection2 = ConnectionFactory.createConnection(CONF2);
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
    }
    UTIL1.waitUntilAllRegionsAssigned(tableName);
    UTIL2.waitUntilAllRegionsAssigned(tableName);
    htable1 = connection1.getTable(tableName);
    htable2 = connection2.getTable(tableName);
  }

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    configureClusters(UTIL1, UTIL2);
    startClusters();
  }

  private boolean peerExist(String peerId) throws IOException {
    return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId()));
  }

  @Before
  public void setUpBase() throws Exception {
    if (!peerExist(PEER_ID2)) {
      ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
        .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer());
      if (isSyncPeer()) {
        FileSystem fs2 = UTIL2.getTestFileSystem();
        // The remote wal dir is not important as we do not use it in DA state, here we only need
        // to confirm that a sync peer in DA state can still replicate data to remote cluster
        builder.setReplicateAllUserTables(false)
          .setTableCFsMap(ImmutableMap.of(tableName, ImmutableList.of()))
          .setRemoteWALDir(new Path("/RemoteWAL")
            .makeQualified(fs2.getUri(), fs2.getWorkingDirectory()).toUri().toString());
      }
      hbaseAdmin.addReplicationPeer(PEER_ID2, builder.build());
    }
  }

  @After
  public void tearDownBase() throws Exception {
    if (peerExist(PEER_ID2)) {
      hbaseAdmin.removeReplicationPeer(PEER_ID2);
    }
  }

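  // Shared scenario for subclasses: put a row on the source cluster, wait for it to appear on
  // the peer, then delete it and wait for the delete to replicate as well.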
  protected static void runSimplePutDeleteTest() throws IOException, InterruptedException {
    Put put = new Put(row);
    put.addColumn(famName, row, row);

    htable1 = UTIL1.getConnection().getTable(tableName);
    htable1.put(put);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.isEmpty()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(row, res.value());
        break;
      }
    }

    Delete del = new Delete(row);
    htable1.delete(del);

    get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        LOG.info("Row not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  protected static void runSmallBatchTest() throws IOException, InterruptedException {
    // normal Batch tests
    loadData("", row);

    Scan scan = new Scan();

    ResultScanner scanner1 = htable1.getScanner(scan);
    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
    scanner1.close();

    assertEquals(NB_ROWS_IN_BATCH, res1.length);

    waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    if (htable2 != null) {
      htable2.close();
    }
    if (htable1 != null) {
      htable1.close();
    }
    if (hbaseAdmin != null) {
      hbaseAdmin.close();
    }
    UTIL2.shutdownMiniCluster();
    UTIL1.shutdownMiniCluster();
  }
}