/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;
import static org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint.REPLICATION_DROP_ON_DELETED_TABLE_KEY;
import static org.junit.Assert.fail;

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationDroppedTables extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationDroppedTables.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationDroppedTables.class);
  private static final int ROWS_COUNT = 1000;

  @Before
  public void setUpBase() throws Exception {
    // Starting and stopping replication can make us miss new logs,
    // rolling like this makes sure the most recent one gets added to the queue
    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster()
        .getRegionServerThreads()) {
      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    // Initialize the peer after wal rolling, so that we will abandon the stuck WALs.
    super.setUpBase();
    int rowCount = UTIL1.countRows(tableName);
    UTIL1.deleteTableData(tableName);
    // Truncating the table sends one Delete per row to the slave cluster in an async fashion,
    // which is why we cannot just call deleteTableData on utility2 since late writes could still
    // make it to the slave. Instead, we truncate the first table and wait for all the Deletes to
    // make it to the slave.
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(rowCount);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't increment timeout if we make progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
    // Set the max request size to a tiny 10K to split the replication WAL entries into multiple
    // batches. The default max request size is 256M, so all replication entries would fit into a
    // single batch; on the sink side that batch is applied per region server, grouped by table
    // name, so the WAL entries of the test table may be applied first and those of the
    // test_dropped table later, which would make us believe replication is not stuck
    // (HBASE-20475).
    CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 10 * 1024);
  }

  @Test
  public void testEditsStuckBehindDroppedTable() throws Exception {
    // Sanity check: make sure that by default edits for dropped tables stall the replication
    // queue, even when the table(s) in question have been deleted on both ends.
    testEditsBehindDroppedTable(false, "test_dropped");
  }

  @Test
  public void testEditsDroppedWithDroppedTable() throws Exception {
    // Make sure that, when configured to do so, edits for dropped tables are themselves dropped
    // when the table(s) in question have been deleted on both ends.
    testEditsBehindDroppedTable(true, "test_dropped");
  }

  @Test
  public void testEditsDroppedWithDroppedTableNS() throws Exception {
    // also try with a namespace
    UTIL1.getAdmin().createNamespace(NamespaceDescriptor.create("NS").build());
    UTIL2.getAdmin().createNamespace(NamespaceDescriptor.create("NS").build());
    try {
      testEditsBehindDroppedTable(true, "NS:test_dropped");
    } finally {
      UTIL1.getAdmin().deleteNamespace("NS");
      UTIL2.getAdmin().deleteNamespace("NS");
    }
  }

  private byte[] generateRowKey(int id) {
    return Bytes.toBytes(String.format("NormalPut%03d", id));
  }
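
  /**
   * Shared scenario: with the peer disabled, write one edit to the soon-to-be-dropped table and a
   * batch of normal edits to the regular test table, drop the extra table on both clusters, then
   * re-enable the peer and check whether replication proceeds past the orphaned edit
   * (allowProceeding == true) or stays stuck behind it.
   */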
  private void testEditsBehindDroppedTable(boolean allowProceeding, String tName) throws Exception {
    CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding);
    CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);

    // make sure we have a single region server only, so that all
    // edits for all tables go there
    restartSourceCluster(1);

    TableName tablename = TableName.valueOf(tName);
    byte[] familyName = Bytes.toBytes("fam");
    byte[] row = Bytes.toBytes("row");

    TableDescriptor table =
      TableDescriptorBuilder
        .newBuilder(tablename).setColumnFamily(ColumnFamilyDescriptorBuilder
          .newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
        .build();

    Connection connection1 = ConnectionFactory.createConnection(UTIL1.getConfiguration());
    Connection connection2 = ConnectionFactory.createConnection(UTIL2.getConfiguration());
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table);
    }
    UTIL1.waitUntilAllRegionsAssigned(tablename);
    UTIL2.waitUntilAllRegionsAssigned(tablename);

    // now suspend replication
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.disableReplicationPeer(PEER_ID2);
    }

    // put some data (lead with 0 so the edit gets sorted before the other table's edits
    // in the replication batch), then write a bunch of edits, making sure we fill a batch
    try (Table droppedTable = connection1.getTable(tablename)) {
      byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped");
      Put put = new Put(rowKey);
      put.addColumn(familyName, row, row);
      droppedTable.put(put);
    }

    try (Table table1 = connection1.getTable(tableName)) {
      for (int i = 0; i < ROWS_COUNT; i++) {
        Put put = new Put(generateRowKey(i)).addColumn(famName, row, row);
        table1.put(put);
      }
    }

    try (Admin admin1 = connection1.getAdmin()) {
      admin1.disableTable(tablename);
      admin1.deleteTable(tablename);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.disableTable(tablename);
      admin2.deleteTable(tablename);
    }

    try (Admin admin1 = connection1.getAdmin()) {
      admin1.enableReplicationPeer(PEER_ID2);
    }

    if (allowProceeding) {
      // in this case we'd expect the key to make it over
      verifyReplicationProceeded();
    } else {
      verifyReplicationStuck();
    }
    // just to be safe
    CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
  }
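
  /**
   * Same scenario as above, but the table is dropped on the source cluster only after the peer
   * has been re-enabled: replication must stay stuck while the source table still exists and may
   * only proceed, dropping the offending edits, once it is gone.
   */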
  @Test
  public void testEditsBehindDroppedTableTiming() throws Exception {
    CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
    CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);

    // make sure we have a single region server only, so that all
    // edits for all tables go there
    restartSourceCluster(1);

    TableName tablename = TableName.valueOf("testdroppedtimed");
    byte[] familyName = Bytes.toBytes("fam");
    byte[] row = Bytes.toBytes("row");

    TableDescriptor table =
      TableDescriptorBuilder
        .newBuilder(tablename).setColumnFamily(ColumnFamilyDescriptorBuilder
          .newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
        .build();

    Connection connection1 = ConnectionFactory.createConnection(CONF1);
    Connection connection2 = ConnectionFactory.createConnection(CONF2);
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table);
    }
    UTIL1.waitUntilAllRegionsAssigned(tablename);
    UTIL2.waitUntilAllRegionsAssigned(tablename);

    // now suspend replication
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.disableReplicationPeer(PEER_ID2);
    }

    // put some data (lead with 0 so the edit gets sorted before the other table's edits
    // in the replication batch), then write a bunch of edits, making sure we fill a batch
    try (Table droppedTable = connection1.getTable(tablename)) {
      byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped");
      Put put = new Put(rowKey);
      put.addColumn(familyName, row, row);
      droppedTable.put(put);
    }

    try (Table table1 = connection1.getTable(tableName)) {
      for (int i = 0; i < ROWS_COUNT; i++) {
        Put put = new Put(generateRowKey(i)).addColumn(famName, row, row);
        table1.put(put);
      }
    }

    try (Admin admin2 = connection2.getAdmin()) {
      admin2.disableTable(tablename);
      admin2.deleteTable(tablename);
    }

    // edit should still be stuck
    try (Admin admin1 = connection1.getAdmin()) {
      // enable the replication peer.
      admin1.enableReplicationPeer(PEER_ID2);
      // the source table still exists, replication should be stalled
      verifyReplicationStuck();

      admin1.disableTable(tablename);
      // still stuck, source table still exists
      verifyReplicationStuck();

      admin1.deleteTable(tablename);
      // now the source table is gone, replication should proceed, the
      // offending edits be dropped
      verifyReplicationProceeded();
    }
    // just to be safe
    CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
  }
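
  /**
   * Returns true once the peer cluster holds all {@code ROWS_COUNT} normal rows, asserting that
   * they come back in row-key order.
   */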
  private boolean peerHasAllNormalRows() throws IOException {
    try (ResultScanner scanner = htable2.getScanner(new Scan())) {
      Result[] results = scanner.next(ROWS_COUNT);
      if (results.length != ROWS_COUNT) {
        return false;
      }
      for (int i = 0; i < results.length; i++) {
        Assert.assertArrayEquals(generateRowKey(i), results[i].getRow());
      }
      return true;
    }
  }
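
  /**
   * Waits up to {@code NB_RETRIES} rounds of {@code SLEEP_TIME} for all normal rows to show up on
   * the peer cluster; fails the test if they never arrive.
   */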
  private void verifyReplicationProceeded() throws Exception {
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      if (!peerHasAllNormalRows()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }
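
  /**
   * Polls for {@code NB_RETRIES} rounds and fails the test if the normal rows ever become visible
   * on the peer cluster, i.e. if replication was not actually stuck behind the dropped table.
   */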
  private void verifyReplicationStuck() throws Exception {
    for (int i = 0; i < NB_RETRIES; i++) {
      if (peerHasAllNormalRows()) {
        fail("Edit should have been stuck behind dropped tables");
      } else {
        LOG.info("Row not replicated, let's wait a bit more...");
        Thread.sleep(SLEEP_TIME);
      }
    }
  }
}