/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
/**
 * This class is only a base for other integration-level replication tests. Do not add tests here.
 * TestReplicationSmallTests is where tests that don't require bringing machines up/down should
 * go. All other tests should have their own classes and extend this one.
 */
public class TestReplicationBase {
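  // A minimal sketch of how a concrete test is expected to build on this base class
  // (hypothetical subclass and test names, not part of this file; the clusters, test
  // table and replication peer are all managed by the hooks below):
  //
  //   public class TestMySerialReplication extends TestReplicationBase {
  //     @Override
  //     protected boolean isSerialPeer() {
  //       return true; // run the shared scenarios against a serial peer
  //     }
  //
  //     @Test
  //     public void testPutAndDelete() throws Exception {
  //       runSimplePutDeleteTest();
  //     }
  //   }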
  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);
  private static Connection connection1;
  private static Connection connection2;
  protected static Configuration CONF_WITH_LOCALFS;

  protected static Admin hbaseAdmin;

  protected static Table htable1;
  protected static Table htable2;

  protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
  protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
  protected static Configuration CONF1 = UTIL1.getConfiguration();
  protected static Configuration CONF2 = UTIL2.getConfiguration();

  protected static int NUM_SLAVES1 = 1;
  protected static int NUM_SLAVES2 = 1;
  protected static final int NB_ROWS_IN_BATCH = 100;
  protected static final int NB_ROWS_IN_BIG_BATCH = NB_ROWS_IN_BATCH * 10;
  protected static final long SLEEP_TIME = 500;
  protected static final int NB_RETRIES = 50;
  protected static AtomicInteger replicateCount = new AtomicInteger();
  protected static volatile List<WAL.Entry> replicatedEntries = Lists.newArrayList();

  protected static final TableName tableName = TableName.valueOf("test");
  protected static final byte[] famName = Bytes.toBytes("f");
  protected static final byte[] row = Bytes.toBytes("row");
  protected static final byte[] noRepfamName = Bytes.toBytes("norep");
  protected static final String PEER_ID2 = "2";
  /**
   * Subclasses override this to run the shared scenarios against a serial replication peer.
   */
  protected boolean isSerialPeer() {
    return false;
  }

  /**
   * Subclasses override this to run the shared scenarios against a synchronous replication peer.
   */
  protected boolean isSyncPeer() {
    return false;
  }
  protected final void cleanUp() throws IOException, InterruptedException {
    // Starting and stopping replication can make us miss new logs,
    // rolling like this makes sure the most recent one gets added to the queue
    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster()
        .getRegionServerThreads()) {
      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    int rowCount = UTIL1.countRows(tableName);
    UTIL1.deleteTableData(tableName);
    // Truncating the table will send one Delete per row to the slave cluster
    // in an async fashion, which is why we cannot just call deleteTableData on
    // utility2 since late writes could make it to the slave in some way.
    // Instead, we truncate the first table and wait for all the Deletes to
    // make it to the slave.
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(rowCount);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't increment timeout if we make progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }
  protected static void waitForReplication(int expectedRows, int retries)
      throws IOException, InterruptedException {
    waitForReplication(htable2, expectedRows, retries);
  }

  protected static void waitForReplication(Table htable2, int expectedRows, int retries)
      throws IOException, InterruptedException {
    Scan scan;
    for (int i = 0; i < retries; i++) {
      scan = new Scan();
      if (i == retries - 1) {
        fail("Waited too much time for normal batch replication");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(expectedRows);
      scanner.close();
      if (res.length != expectedRows) {
        LOG.info("Only got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }
  protected static void loadData(String prefix, byte[] row) throws IOException {
    loadData(prefix, row, famName);
  }

  protected static void loadData(String prefix, byte[] row, byte[] familyName)
      throws IOException {
    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i)));
      put.addColumn(familyName, row, row);
      puts.add(put);
    }
    htable1.put(puts);
  }
  protected static void setupConfig(HBaseTestingUtility util, String znodeParent) {
    Configuration conf = util.getConfiguration();
    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
    // We don't want too many edits per batch sent to the ReplicationEndpoint to trigger
    // sufficient number of events. But we don't want to go too low because
    // HBaseInterClusterReplicationEndpoint partitions entries into batches and we want
    // more than one batch sent to the peer cluster for better testing.
    conf.setInt("replication.source.size.capacity", 102400);
    conf.setLong("replication.source.sleepforretries", 100);
    conf.setInt("hbase.regionserver.maxlogs", 10);
    conf.setLong("hbase.master.logcleaner.ttl", 10);
    conf.setInt("zookeeper.recovery.retry", 1);
    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf.setInt("replication.stats.thread.period.seconds", 5);
    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
    conf.setLong("replication.sleep.before.failover", 2000);
    conf.setInt("replication.source.maxretriesmultiplier", 10);
    conf.setFloat("replication.source.ratio", 1.0f);
    conf.setBoolean("replication.source.eof.autorecovery", true);
    conf.setLong("hbase.serial.replication.waiting.ms", 100);
  }
  static void configureClusters(HBaseTestingUtility util1, HBaseTestingUtility util2) {
    setupConfig(util1, "/1");
    setupConfig(util2, "/2");

    Configuration conf2 = util2.getConfiguration();
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
    conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
  }
  static void restartSourceCluster(int numSlaves) throws Exception {
    Closeables.close(hbaseAdmin, true);
    Closeables.close(htable1, true);
    UTIL1.shutdownMiniHBaseCluster();
    UTIL1.restartHBaseCluster(numSlaves);
    // Invalidate the cached connection state.
    CONF1 = UTIL1.getConfiguration();
    hbaseAdmin = UTIL1.getAdmin();
    Connection connection1 = UTIL1.getConnection();
    htable1 = connection1.getTable(tableName);
  }
  static void restartTargetHBaseCluster(int numSlaves) throws Exception {
    Closeables.close(htable2, true);
    UTIL2.restartHBaseCluster(numSlaves);
    // Invalidate the cached connection state.
    CONF2 = UTIL2.getConfiguration();
    htable2 = UTIL2.getConnection().getTable(tableName);
  }
  private static void startClusters() throws Exception {
    UTIL1.startMiniZKCluster();
    MiniZooKeeperCluster miniZK = UTIL1.getZkCluster();
    LOG.info("Setup first Zk");

    UTIL2.setZkCluster(miniZK);
    LOG.info("Setup second Zk");

    CONF_WITH_LOCALFS = HBaseConfiguration.create(CONF1);
    UTIL1.startMiniCluster(NUM_SLAVES1);
    // Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks
    // as a component in deciding maximum number of parallel batches to send to the peer cluster.
    UTIL2.startMiniCluster(NUM_SLAVES2);

    connection1 = ConnectionFactory.createConnection(CONF1);
    connection2 = ConnectionFactory.createConnection(CONF2);
    hbaseAdmin = connection1.getAdmin();

    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();

    try (Admin admin1 = connection1.getAdmin();
        Admin admin2 = connection2.getAdmin()) {
      admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
      admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
      UTIL1.waitUntilAllRegionsAssigned(tableName);
      htable1 = connection1.getTable(tableName);
      UTIL2.waitUntilAllRegionsAssigned(tableName);
      htable2 = connection2.getTable(tableName);
    }
  }
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    configureClusters(UTIL1, UTIL2);
    startClusters();
  }
  private boolean peerExist(String peerId) throws IOException {
    return hbaseAdmin.listReplicationPeers().stream()
      .anyMatch(p -> peerId.equals(p.getPeerId()));
  }
  @Before
  public void setUpBase() throws Exception {
    if (!peerExist(PEER_ID2)) {
      ReplicationPeerConfigBuilder builder =
        ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey())
          .setSerial(isSerialPeer())
          .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName());
      if (isSyncPeer()) {
        FileSystem fs2 = UTIL2.getTestFileSystem();
        // The remote wal dir is not important as we do not use it in DA state, here we only need
        // to confirm that a sync peer in DA state can still replicate data to remote cluster
        builder.setReplicateAllUserTables(false)
          .setTableCFsMap(ImmutableMap.of(tableName, ImmutableList.of()))
          .setRemoteWALDir(new Path("/RemoteWAL")
            .makeQualified(fs2.getUri(), fs2.getWorkingDirectory()).toUri().toString());
      }
      hbaseAdmin.addReplicationPeer(PEER_ID2, builder.build());
    }
  }
  @After
  public void tearDownBase() throws Exception {
    if (peerExist(PEER_ID2)) {
      hbaseAdmin.removeReplicationPeer(PEER_ID2);
    }
  }
  protected static void runSimplePutDeleteTest() throws IOException, InterruptedException {
    Put put = new Put(row);
    put.addColumn(famName, row, row);

    htable1 = UTIL1.getConnection().getTable(tableName);
    htable1.put(put);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.isEmpty()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(row, res.value());
        break;
      }
    }

    Delete del = new Delete(row);
    htable1.delete(del);

    get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        LOG.info("Row not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }
  protected static void runSmallBatchTest() throws IOException, InterruptedException {
    // normal Batch tests
    loadData("", row);

    Scan scan = new Scan();

    ResultScanner scanner1 = htable1.getScanner(scan);
    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
    scanner1.close();

    assertEquals(NB_ROWS_IN_BATCH, res1.length);

    waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
  }
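  // A minimal sketch (hypothetical helper, not part of the original class) of how a
  // concrete test can combine the helpers above with the endpoint counters declared
  // near the top of this class. It assumes the ReplicationEndpointTest endpoint below
  // is installed for PEER_ID2, which setUpBase() arranges.
  protected static void runBatchCountSketch() throws IOException, InterruptedException {
    loadData("", row); // write NB_ROWS_IN_BATCH rows to the source table
    waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES); // block until the sink sees them all
    if (replicateCount.get() < 1) {
      fail("Expected at least one replicated batch");
    }
  }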
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    if (htable2 != null) {
      htable2.close();
    }
    if (htable1 != null) {
      htable1.close();
    }
    if (hbaseAdmin != null) {
      hbaseAdmin.close();
    }

    if (connection2 != null) {
      connection2.close();
    }
    if (connection1 != null) {
      connection1.close();
    }
    UTIL2.shutdownMiniCluster();
    UTIL1.shutdownMiniCluster();
  }
  /**
   * Custom replication endpoint to keep track of replication status for tests.
   */
  public static class ReplicationEndpointTest extends HBaseInterClusterReplicationEndpoint {
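    // Each time the peer is (re-)installed, a fresh endpoint instance is constructed, so
    // the shared replicateCount is reset to 0 in the constructor below. replicatedEntries,
    // by contrast, is only ever appended to by replicate() and accumulates across instances.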
    public ReplicationEndpointTest() {
      replicateCount.set(0);
    }

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      replicateCount.incrementAndGet();
      replicatedEntries.addAll(replicateContext.getEntries());

      return super.replicate(replicateContext);
    }
  }
}