/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;
import static org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint.REPLICATION_DROP_ON_DELETED_TABLE_KEY;
import static org.junit.Assert.fail;

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationDroppedTables extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationDroppedTables.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationDroppedTables.class);
  private static final int ROWS_COUNT = 1000;

  @Before
  public void setUpBase() throws Exception {
    // Starting and stopping replication can make us miss new logs,
    // rolling like this makes sure the most recent one gets added to the queue
    for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster().getRegionServerThreads()) {
      UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
    }
    // Initialize the peer after wal rolling, so that we will abandon the stuck WALs.
    super.setUpBase();
    int rowCount = UTIL1.countRows(tableName);
    UTIL1.deleteTableData(tableName);
    // truncating the table will send one Delete per row to the slave cluster
    // in an async fashion, which is why we cannot just call deleteTableData on
    // utility2 since late writes could make it to the slave in some way.
    // Instead, we truncate the first table and wait for all the Deletes to
    // make it to the slave.
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(rowCount);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't increment timeout if we make progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
    // Set the max request size to a tiny 10K for dividing the replication WAL entries into
    // multiple batches. The default max request size is 256M, so all replication entries fit in a
    // single batch, but when replicating at the sink side, the batch is applied per region server
    // grouped by table name, so the WAL of the test table may apply first and then the
    // test_dropped table, and we would wrongly believe that replication is not stuck (HBASE-20475).
    CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 10 * 1024);
  }

  @Test
  public void testEditsStuckBehindDroppedTable() throws Exception {
    // Sanity check. Make sure that by default edits for dropped tables stall the replication
    // queue, even when the table(s) in question have been deleted on both ends.
    testEditsBehindDroppedTable(false, "test_dropped");
  }

  @Test
  public void testEditsDroppedWithDroppedTable() throws Exception {
    // Make sure that, when dropping on deleted tables is allowed, edits for dropped tables are
    // themselves dropped when the table(s) in question have been deleted on both ends.
    testEditsBehindDroppedTable(true, "test_dropped");
  }

  @Test
  public void testEditsDroppedWithDroppedTableNS() throws Exception {
    // also try with a namespace
    UTIL1.getAdmin().createNamespace(NamespaceDescriptor.create("NS").build());
    UTIL2.getAdmin().createNamespace(NamespaceDescriptor.create("NS").build());
    try {
      testEditsBehindDroppedTable(true, "NS:test_dropped");
    } finally {
      UTIL1.getAdmin().deleteNamespace("NS");
      UTIL2.getAdmin().deleteNamespace("NS");
    }
  }

  private byte[] generateRowKey(int id) {
    return Bytes.toBytes(String.format("NormalPut%03d", id));
  }

  private void testEditsBehindDroppedTable(boolean allowProceeding, String tName)
      throws Exception {
    CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding);
    CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);

    // make sure we have a single region server only, so that all
    // edits for all tables go there
    restartSourceCluster(1);

    TableName tablename = TableName.valueOf(tName);
    byte[] familyName = Bytes.toBytes("fam");
    byte[] row = Bytes.toBytes("row");

    TableDescriptor table =
        TableDescriptorBuilder
            .newBuilder(tablename).setColumnFamily(ColumnFamilyDescriptorBuilder
                .newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
            .build();

    Connection connection1 = ConnectionFactory.createConnection(UTIL1.getConfiguration());
    Connection connection2 = ConnectionFactory.createConnection(UTIL2.getConfiguration());
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table);
    }
    UTIL1.waitUntilAllRegionsAssigned(tablename);
    UTIL2.waitUntilAllRegionsAssigned(tablename);

    // now suspend replication
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.disableReplicationPeer(PEER_ID2);
    }

    // put some data (lead with 0 so the edit gets sorted before the other table's edits
    // in the replication batch); write a bunch of edits, making sure we fill a batch
    try (Table droppedTable = connection1.getTable(tablename)) {
      byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped");
      Put put = new Put(rowKey);
      put.addColumn(familyName, row, row);
      droppedTable.put(put);
    }

    try (Table table1 = connection1.getTable(tableName)) {
      for (int i = 0; i < ROWS_COUNT; i++) {
        Put put = new Put(generateRowKey(i)).addColumn(famName, row, row);
        table1.put(put);
      }
    }

    // drop the table on both clusters while the edits are still queued up
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.disableTable(tablename);
      admin1.deleteTable(tablename);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.disableTable(tablename);
      admin2.deleteTable(tablename);
    }

    // resume replication
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.enableReplicationPeer(PEER_ID2);
    }

    if (allowProceeding) {
      // in this case we'd expect the key to make it over
      verifyReplicationProceeded();
    } else {
      verifyReplicationStuck();
    }
    // just to be safe
    CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
  }

  @Test
  public void testEditsBehindDroppedTableTiming() throws Exception {
    CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
    CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);

    // make sure we have a single region server only, so that all
    // edits for all tables go there
    restartSourceCluster(1);

    TableName tablename = TableName.valueOf("testdroppedtimed");
    byte[] familyName = Bytes.toBytes("fam");
    byte[] row = Bytes.toBytes("row");

    TableDescriptor table =
        TableDescriptorBuilder
            .newBuilder(tablename).setColumnFamily(ColumnFamilyDescriptorBuilder
                .newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
            .build();

    Connection connection1 = ConnectionFactory.createConnection(CONF1);
    Connection connection2 = ConnectionFactory.createConnection(CONF2);
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.createTable(table);
    }
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.createTable(table);
    }
    UTIL1.waitUntilAllRegionsAssigned(tablename);
    UTIL2.waitUntilAllRegionsAssigned(tablename);

    // now suspend replication
    try (Admin admin1 = connection1.getAdmin()) {
      admin1.disableReplicationPeer(PEER_ID2);
    }

    // put some data (lead with 0 so the edit gets sorted before the other table's edits
    // in the replication batch); write a bunch of edits, making sure we fill a batch
    try (Table droppedTable = connection1.getTable(tablename)) {
      byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped");
      Put put = new Put(rowKey);
      put.addColumn(familyName, row, row);
      droppedTable.put(put);
    }

    try (Table table1 = connection1.getTable(tableName)) {
      for (int i = 0; i < ROWS_COUNT; i++) {
        Put put = new Put(generateRowKey(i)).addColumn(famName, row, row);
        table1.put(put);
      }
    }

    // drop the table on the sink cluster only for now
    try (Admin admin2 = connection2.getAdmin()) {
      admin2.disableTable(tablename);
      admin2.deleteTable(tablename);
    }

    // edit should still be stuck
    try (Admin admin1 = connection1.getAdmin()) {
      // enable the replication peer.
      admin1.enableReplicationPeer(PEER_ID2);
      // the source table still exists, replication should be stalled
      verifyReplicationStuck();

      admin1.disableTable(tablename);
      // still stuck, source table still exists
      verifyReplicationStuck();

      admin1.deleteTable(tablename);
      // now the source table is gone, replication should proceed and the
      // offending edits be dropped
      verifyReplicationProceeded();
    }
    // just to be safe
    CONF1.setBoolean(REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
  }

  private boolean peerHasAllNormalRows() throws IOException {
    try (ResultScanner scanner = htable2.getScanner(new Scan())) {
      Result[] results = scanner.next(ROWS_COUNT);
      if (results.length != ROWS_COUNT) {
        return false;
      }
      for (int i = 0; i < results.length; i++) {
        Assert.assertArrayEquals(generateRowKey(i), results[i].getRow());
      }
      return true;
    }
  }

  private void verifyReplicationProceeded() throws Exception {
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      if (!peerHasAllNormalRows()) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  private void verifyReplicationStuck() throws Exception {
    for (int i = 0; i < NB_RETRIES; i++) {
      if (peerHasAllNormalRows()) {
        fail("Edit should have been stuck behind dropped tables");
      } else {
        LOG.info("Row not replicated, let's wait a bit more...");
        Thread.sleep(SLEEP_TIME);
      }
    }
  }
}