HBASE-26039 TestReplicationKillRS is useless after HBASE-23956 (#3440)
hbase.git: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
/**
 * This class is only a base for other integration-level replication tests. Do not add tests here.
 * TestReplicationSmallTests is where tests that don't require bringing machines up/down should go.
 * All other tests should have their own classes and extend this one.
 */
public class TestReplicationBase {
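  // A minimal sketch of how subclasses are expected to use this base class; the class name and
  // test method below are hypothetical and only illustrate how the helpers fit together (real
  // subclasses also carry the usual HBaseClassTestRule / @Category boilerplate):
  //
  //   public class TestReplicationFoo extends TestReplicationBase {
  //     @Test
  //     public void testSimplePutDelete() throws Exception {
  //       runSimplePutDeleteTest();
  //       runSmallBatchTest();
  //     }
  //   }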
  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);
  private static Connection connection1;
  private static Connection connection2;
  protected static Configuration CONF_WITH_LOCALFS;

  protected static Admin hbaseAdmin;

  protected static Table htable1;
  protected static Table htable2;

  protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
  protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
  protected static Configuration CONF1 = UTIL1.getConfiguration();
  protected static Configuration CONF2 = UTIL2.getConfiguration();

  protected static int NUM_SLAVES1 = 1;
  protected static int NUM_SLAVES2 = 1;
  protected static final int NB_ROWS_IN_BATCH = 100;
  protected static final int NB_ROWS_IN_BIG_BATCH = NB_ROWS_IN_BATCH * 10;
  protected static final long SLEEP_TIME = 500;
  protected static final int NB_RETRIES = 50;
  protected static AtomicInteger replicateCount = new AtomicInteger();
  protected static volatile List<WAL.Entry> replicatedEntries = Lists.newArrayList();

  protected static final TableName tableName = TableName.valueOf("test");
  protected static final byte[] famName = Bytes.toBytes("f");
  protected static final byte[] row = Bytes.toBytes("row");
  protected static final byte[] noRepfamName = Bytes.toBytes("norep");
  protected static final String PEER_ID2 = "2";
  /** Subclasses override this to make the peer created in {@link #setUpBase()} a serial peer. */
  protected boolean isSerialPeer() {
    return false;
  }

  /** Subclasses override this to make the peer created in {@link #setUpBase()} a sync peer. */
  protected boolean isSyncPeer() {
    return false;
  }
  protected final void cleanUp() throws IOException, InterruptedException {
    // Starting and stopping replication can make us miss new logs,
    // rolling like this makes sure the most recent one gets added to the queue
    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster()
        .getRegionServerThreads()) {
      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    int rowCount = UTIL1.countRows(tableName);
    UTIL1.deleteTableData(tableName);
    // truncating the table will send one Delete per row to the slave cluster
    // in an async fashion, which is why we cannot just call deleteTableData on
    // utility2 since late writes could make it to the slave in some way.
    // Instead, we truncate the first table and wait for all the Deletes to
    // make it to the slave.
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(rowCount);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't increment timeout if we make progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }
  protected static void waitForReplication(int expectedRows, int retries)
      throws IOException, InterruptedException {
    waitForReplication(htable2, expectedRows, retries);
  }

  protected static void waitForReplication(Table htable2, int expectedRows, int retries)
      throws IOException, InterruptedException {
    Scan scan;
    for (int i = 0; i < retries; i++) {
      scan = new Scan();
      if (i == retries - 1) {
        fail("Waited too much time for normal batch replication");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(expectedRows);
      scanner.close();
      if (res.length != expectedRows) {
        LOG.info("Only got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }
  protected static void loadData(String prefix, byte[] row) throws IOException {
    loadData(prefix, row, famName);
  }

  protected static void loadData(String prefix, byte[] row, byte[] familyName) throws IOException {
    List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH);
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i)));
      put.addColumn(familyName, row, row);
      puts.add(put);
    }
    htable1.put(puts);
  }
  protected static void setupConfig(HBaseTestingUtility util, String znodeParent) {
    Configuration conf = util.getConfiguration();
    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
    // We don't want too many edits per batch sent to the ReplicationEndpoint to trigger
    // sufficient number of events. But we don't want to go too low because
    // HBaseInterClusterReplicationEndpoint partitions entries into batches and we want
    // more than one batch sent to the peer cluster for better testing.
    conf.setInt("replication.source.size.capacity", 102400);
    conf.setLong("replication.source.sleepforretries", 100);
    conf.setInt("hbase.regionserver.maxlogs", 10);
    conf.setLong("hbase.master.logcleaner.ttl", 10);
    conf.setInt("zookeeper.recovery.retry", 1);
    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf.setInt("replication.stats.thread.period.seconds", 5);
    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
    conf.setLong("replication.sleep.before.failover", 2000);
    conf.setInt("replication.source.maxretriesmultiplier", 10);
    conf.setFloat("replication.source.ratio", 1.0f);
    conf.setBoolean("replication.source.eof.autorecovery", true);
    conf.setLong("hbase.serial.replication.waiting.ms", 100);
  }
  static void configureClusters(HBaseTestingUtility util1, HBaseTestingUtility util2) {
    setupConfig(util1, "/1");
    setupConfig(util2, "/2");

    Configuration conf2 = util2.getConfiguration();
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
    conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
  }
  static void restartSourceCluster(int numSlaves) throws Exception {
    Closeables.close(hbaseAdmin, true);
    Closeables.close(htable1, true);
    UTIL1.shutdownMiniHBaseCluster();
    UTIL1.restartHBaseCluster(numSlaves);
    // Invalidate the cached connection state.
    CONF1 = UTIL1.getConfiguration();
    hbaseAdmin = UTIL1.getAdmin();
    Connection connection1 = UTIL1.getConnection();
    htable1 = connection1.getTable(tableName);
  }

  static void restartTargetHBaseCluster(int numSlaves) throws Exception {
    Closeables.close(htable2, true);
    UTIL2.restartHBaseCluster(numSlaves);
    // Invalidate the cached connection state.
    CONF2 = UTIL2.getConfiguration();
    htable2 = UTIL2.getConnection().getTable(tableName);
  }
  private static void startClusters() throws Exception {
    UTIL1.startMiniZKCluster();
    MiniZooKeeperCluster miniZK = UTIL1.getZkCluster();
    LOG.info("Setup first Zk");

    UTIL2.setZkCluster(miniZK);
    LOG.info("Setup second Zk");

    CONF_WITH_LOCALFS = HBaseConfiguration.create(CONF1);
    UTIL1.startMiniCluster(NUM_SLAVES1);
    // Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks
    // as a component in deciding maximum number of parallel batches to send to the peer cluster.
    UTIL2.startMiniCluster(NUM_SLAVES2);

    connection1 = ConnectionFactory.createConnection(CONF1);
    connection2 = ConnectionFactory.createConnection(CONF2);
    hbaseAdmin = connection1.getAdmin();

    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();

    try (
      Admin admin1 = connection1.getAdmin();
      Admin admin2 = connection2.getAdmin()) {
      admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
      admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
      UTIL1.waitUntilAllRegionsAssigned(tableName);
      htable1 = connection1.getTable(tableName);
      UTIL2.waitUntilAllRegionsAssigned(tableName);
      htable2 = connection2.getTable(tableName);
    }
  }
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    configureClusters(UTIL1, UTIL2);
    startClusters();
  }
  private boolean peerExist(String peerId) throws IOException {
    return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId()));
  }
  @Before
  public void setUpBase() throws Exception {
    if (!peerExist(PEER_ID2)) {
      ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
        .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer())
        .setReplicationEndpointImpl(ReplicationEndpointTest.class.getName());
      if (isSyncPeer()) {
        FileSystem fs2 = UTIL2.getTestFileSystem();
        // The remote wal dir is not important as we do not use it in DA state, here we only need
        // to confirm that a sync peer in DA state can still replicate data to remote cluster
        // asynchronously.
        builder.setReplicateAllUserTables(false)
          .setTableCFsMap(ImmutableMap.of(tableName, ImmutableList.of()))
          .setRemoteWALDir(new Path("/RemoteWAL")
            .makeQualified(fs2.getUri(), fs2.getWorkingDirectory()).toUri().toString());
      }
      hbaseAdmin.addReplicationPeer(PEER_ID2, builder.build());
    }
  }
  @After
  public void tearDownBase() throws Exception {
    if (peerExist(PEER_ID2)) {
      hbaseAdmin.removeReplicationPeer(PEER_ID2);
    }
  }
  protected static void runSimplePutDeleteTest() throws IOException, InterruptedException {
    Put put = new Put(row);
    put.addColumn(famName, row, row);

    htable1 = UTIL1.getConnection().getTable(tableName);
    htable1.put(put);

    Get get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.isEmpty()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(row, res.value());
        break;
      }
    }

    Delete del = new Delete(row);
    htable1.delete(del);

    get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        LOG.info("Row not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }
  protected static void runSmallBatchTest() throws IOException, InterruptedException {
    // normal Batch tests
    loadData("", row);

    Scan scan = new Scan();

    ResultScanner scanner1 = htable1.getScanner(scan);
    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
    scanner1.close();

    assertEquals(NB_ROWS_IN_BATCH, res1.length);

    waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
  }
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    if (htable2 != null) {
      htable2.close();
    }
    if (htable1 != null) {
      htable1.close();
    }
    if (hbaseAdmin != null) {
      hbaseAdmin.close();
    }

    if (connection2 != null) {
      connection2.close();
    }
    if (connection1 != null) {
      connection1.close();
    }
    UTIL2.shutdownMiniCluster();
    UTIL1.shutdownMiniCluster();
  }
  /**
   * Custom replication endpoint to keep track of replication status for tests.
   */
  public static class ReplicationEndpointTest extends HBaseInterClusterReplicationEndpoint {
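    // Tests that want to assert on what this endpoint actually shipped can read the static
    // replicateCount / replicatedEntries fields on TestReplicationBase after loading data,
    // for example (illustrative only, not part of this class):
    //   assertTrue(replicateCount.get() > 0);
    //   assertFalse(replicatedEntries.isEmpty());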
    public ReplicationEndpointTest() {
      replicateCount.set(0);
    }

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      replicateCount.incrementAndGet();
      replicatedEntries.addAll(replicateContext.getEntries());

      return super.replicate(replicateContext);
    }
  }
}