/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertEquals;
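/**
 * Test for the ReplicationSyncUp tool: loads and mutates two tables on the source (Master)
 * cluster, bounces the clusters so the Slave misses the edits, then verifies that running
 * ReplicationSyncUp from the stopped Master ships the backlog of deletes and puts to the
 * restarted Slave.
 */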
@Category({ReplicationTests.class, LargeTests.class})
public class TestReplicationSyncUpTool extends TestReplicationBase {

  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSyncUpTool.class);

  private static final TableName t1_su = TableName.valueOf("t1_syncup");
  private static final TableName t2_su = TableName.valueOf("t2_syncup");

  protected static final byte[] famName = Bytes.toBytes("cf1");
  private static final byte[] qualName = Bytes.toBytes("q1");

  protected static final byte[] noRepfamName = Bytes.toBytes("norep");

  private HTableDescriptor t1_syncupSource, t1_syncupTarget;
  private HTableDescriptor t2_syncupSource, t2_syncupTarget;

  protected Table ht1Source, ht2Source, ht1TargetAtPeer1, ht2TargetAtPeer1;
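  // Cluster fixtures (utility1/utility2, conf1/conf2) and tuning constants
  // (NB_ROWS_IN_BATCH, NB_RETRIES, SLEEP_TIME) are inherited from TestReplicationBase.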
  @Before
  public void setUp() throws Exception {
    HColumnDescriptor fam;

    // Source-side descriptors: only 'cf1' is marked REPLICATION_SCOPE_GLOBAL,
    // so edits to 'norep' stay on the source cluster.
    t1_syncupSource = new HTableDescriptor(t1_su);
    fam = new HColumnDescriptor(famName);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    t1_syncupSource.addFamily(fam);
    fam = new HColumnDescriptor(noRepfamName);
    t1_syncupSource.addFamily(fam);

    // Target-side descriptors: same families, no replication scope needed.
    t1_syncupTarget = new HTableDescriptor(t1_su);
    fam = new HColumnDescriptor(famName);
    t1_syncupTarget.addFamily(fam);
    fam = new HColumnDescriptor(noRepfamName);
    t1_syncupTarget.addFamily(fam);

    t2_syncupSource = new HTableDescriptor(t2_su);
    fam = new HColumnDescriptor(famName);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    t2_syncupSource.addFamily(fam);
    fam = new HColumnDescriptor(noRepfamName);
    t2_syncupSource.addFamily(fam);

    t2_syncupTarget = new HTableDescriptor(t2_su);
    fam = new HColumnDescriptor(famName);
    t2_syncupTarget.addFamily(fam);
    fam = new HColumnDescriptor(noRepfamName);
    t2_syncupTarget.addFamily(fam);
  }
  /**
   * Add a row to a table in each cluster, check it's replicated, delete it,
   * check it's gone. Also check that the puts and deletes are not replicated
   * back to the originating cluster.
   */
  @Test
  public void testSyncUpTool() throws Exception {

    /**
     * Set up Replication: on Master and one Slave
     * Table: t1_syncup and t2_syncup
     * columnfamily:
     *    'cf1'  : replicated
     *    'norep': not replicated
     */
    setupReplication();

    /**
     * at Master:
     * t1_syncup: put 100 rows into cf1, and 1 row into norep
     * t2_syncup: put 200 rows into cf1, and 1 row into norep
     *
     * verify correctly replicated to Slave
     */
    putAndReplicateRows();

    /**
     * Verify delete works
     *
     * step 1: stop hbase on Slave
     *
     * step 2: at Master:
     *  t1_syncup: delete 50 rows from cf1
     *  t2_syncup: delete 100 rows from cf1
     *  no change on 'norep'
     *
     * step 3: stop hbase on Master, restart hbase on Slave
     *
     * step 4: verify Slave still has the rows from before the deletes
     *  t1_syncup: 100 rows from cf1
     *  t2_syncup: 200 rows from cf1
     *
     * step 5: run syncup tool on Master
     *
     * step 6: verify that the deletes show up on Slave
     *  t1_syncup: 50 rows from cf1
     *  t2_syncup: 100 rows from cf1
     *
     * verify correctly replicated to Slave
     */
    mimicSyncUpAfterDelete();

    /**
     * Verify put works
     *
     * step 1: stop hbase on Slave
     *
     * step 2: at Master:
     *  t1_syncup: put 100 rows into cf1
     *  t2_syncup: put 200 rows into cf1
     *  and put another row into 'norep'
     *  ATTN: puts to 'cf1' will overwrite existing rows, so end count will
     *  be 100 and 200 respectively
     *  puts to 'norep' will add a new row.
     *
     * step 3: stop hbase on Master, restart hbase on Slave
     *
     * step 4: verify Slave still has the rows from before the puts
     *  t1_syncup: 50 rows from cf1
     *  t2_syncup: 100 rows from cf1
     *
     * step 5: run syncup tool on Master
     *
     * step 6: verify that the puts show up on Slave
     *  and the 'norep' rows do not
     *  t1_syncup: 100 rows from cf1
     *  t2_syncup: 200 rows from cf1
     *
     * verify correctly replicated to Slave
     */
    mimicSyncUpAfterPut();
  }
  protected void setupReplication() throws Exception {
    ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
    ReplicationAdmin admin2 = new ReplicationAdmin(conf2);

    Admin ha = utility1.getAdmin();
    ha.createTable(t1_syncupSource);
    ha.createTable(t2_syncupSource);
    ha.close();

    ha = utility2.getAdmin();
    ha.createTable(t1_syncupTarget);
    ha.createTable(t2_syncupTarget);
    ha.close();

    Connection connection1 = ConnectionFactory.createConnection(utility1.getConfiguration());
    Connection connection2 = ConnectionFactory.createConnection(utility2.getConfiguration());

    // Get HTable from Master
    ht1Source = connection1.getTable(t1_su);
    ht2Source = connection1.getTable(t2_su);

    // Get HTable from Peer1
    ht1TargetAtPeer1 = connection2.getTable(t1_su);
    ht2TargetAtPeer1 = connection2.getTable(t2_su);

    /**
     * set M-S : Master: utility1 Slave1: utility2
     */
    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
    rpc.setClusterKey(utility2.getClusterKey());
    admin1.addPeer("1", rpc, null);

    admin1.close();
    admin2.close();
  }
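  // Note: ReplicationAdmin is deprecated in HBase 2. A rough sketch of the equivalent peer
  // setup through the Admin API (not what this test exercises) would be:
  //
  //   try (Connection conn = ConnectionFactory.createConnection(conf1);
  //       Admin admin = conn.getAdmin()) {
  //     ReplicationPeerConfig peer = new ReplicationPeerConfig();
  //     peer.setClusterKey(utility2.getClusterKey());
  //     admin.addReplicationPeer("1", peer);
  //   }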
  private void putAndReplicateRows() throws Exception {
    LOG.debug("putAndReplicateRows");
    // add rows to Master cluster,
    Put p;

    // 100 + 1 rows to t1_syncup
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
      ht1Source.put(p);
    }
    p = new Put(Bytes.toBytes("row" + 9999));
    p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9999));
    ht1Source.put(p);

    // 200 + 1 rows to t2_syncup
    for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
      p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
      ht2Source.put(p);
    }
    p = new Put(Bytes.toBytes("row" + 9999));
    p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9999));
    ht2Source.put(p);

    // ensure replication completed: poll the Slave until its row count catches up,
    // asserting on the final retry
    Thread.sleep(SLEEP_TIME);
    int rowCount_ht1Source = utility1.countRows(ht1Source);
    for (int i = 0; i < NB_RETRIES; i++) {
      int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
      if (i == NB_RETRIES - 1) {
        assertEquals("t1_syncup has 101 rows on source, and 100 on slave1", rowCount_ht1Source - 1,
          rowCount_ht1TargetAtPeer1);
      }
      if (rowCount_ht1Source - 1 == rowCount_ht1TargetAtPeer1) {
        break;
      }
      Thread.sleep(SLEEP_TIME);
    }

    int rowCount_ht2Source = utility1.countRows(ht2Source);
    for (int i = 0; i < NB_RETRIES; i++) {
      int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
      if (i == NB_RETRIES - 1) {
        assertEquals("t2_syncup has 201 rows on source, and 200 on slave1", rowCount_ht2Source - 1,
          rowCount_ht2TargetAtPeer1);
      }
      if (rowCount_ht2Source - 1 == rowCount_ht2TargetAtPeer1) {
        break;
      }
      Thread.sleep(SLEEP_TIME);
    }
  }
  private void mimicSyncUpAfterDelete() throws Exception {
    LOG.debug("mimicSyncUpAfterDelete");
    utility2.shutdownMiniHBaseCluster();

    List<Delete> list = new ArrayList<>();
    // delete half of the rows
    for (int i = 0; i < NB_ROWS_IN_BATCH / 2; i++) {
      String rowKey = "row" + i;
      Delete del = new Delete(Bytes.toBytes(rowKey));
      list.add(del);
    }
    ht1Source.delete(list);

    // Table#delete(List) removes successfully applied Deletes from the passed list,
    // so the same (now drained) list can be reused for t2_syncup.
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      String rowKey = "row" + i;
      Delete del = new Delete(Bytes.toBytes(rowKey));
      list.add(del);
    }
    ht2Source.delete(list);

    int rowCount_ht1Source = utility1.countRows(ht1Source);
    assertEquals("t1_syncup has 51 rows on source, after removing 50 of the replicated colfam", 51,
      rowCount_ht1Source);

    int rowCount_ht2Source = utility1.countRows(ht2Source);
    assertEquals("t2_syncup has 101 rows on source, after removing 100 of the replicated colfam",
      101, rowCount_ht2Source);

    utility1.shutdownMiniHBaseCluster();
    utility2.restartHBaseCluster(1);

    Thread.sleep(SLEEP_TIME);

    // before sync up
    int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
    int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
    assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1);
    assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1);

    // after sync up
    for (int i = 0; i < NB_RETRIES; i++) {
      syncUp(utility1);
      rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
      rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
      if (i == NB_RETRIES - 1) {
        if (rowCount_ht1TargetAtPeer1 != 50 || rowCount_ht2TargetAtPeer1 != 100) {
          // syncUp still failed. Let's look at the source in case anything is wrong there
          utility1.restartHBaseCluster(1);
          rowCount_ht1Source = utility1.countRows(ht1Source);
          LOG.debug("t1_syncup should have 51 rows at source, and it is " + rowCount_ht1Source);
          rowCount_ht2Source = utility1.countRows(ht2Source);
          LOG.debug("t2_syncup should have 101 rows at source, and it is " + rowCount_ht2Source);
        }
        assertEquals("@Peer1 t1_syncup should be synced up and have 50 rows", 50,
          rowCount_ht1TargetAtPeer1);
        assertEquals("@Peer1 t2_syncup should be synced up and have 100 rows", 100,
          rowCount_ht2TargetAtPeer1);
      }
      if (rowCount_ht1TargetAtPeer1 == 50 && rowCount_ht2TargetAtPeer1 == 100) {
        LOG.info("SyncUpAfterDelete succeeded at retry = " + i);
        break;
      } else {
        LOG.debug("SyncUpAfterDelete failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
          + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
          + rowCount_ht2TargetAtPeer1);
      }
      Thread.sleep(SLEEP_TIME);
    }
  }
  private void mimicSyncUpAfterPut() throws Exception {
    LOG.debug("mimicSyncUpAfterPut");
    utility1.restartHBaseCluster(1);
    utility2.shutdownMiniHBaseCluster();

    Put p;
    // another 100 + 1 rows to t1_syncup
    // we should see 100 + 2 rows now
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
      ht1Source.put(p);
    }
    p = new Put(Bytes.toBytes("row" + 9998));
    p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9998));
    ht1Source.put(p);

    // another 200 + 1 rows to t2_syncup
    // we should see 200 + 2 rows now
    for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
      p = new Put(Bytes.toBytes("row" + i));
      p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
      ht2Source.put(p);
    }
    p = new Put(Bytes.toBytes("row" + 9998));
    p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9998));
    ht2Source.put(p);

    int rowCount_ht1Source = utility1.countRows(ht1Source);
    assertEquals("t1_syncup has 102 rows on source", 102, rowCount_ht1Source);
    int rowCount_ht2Source = utility1.countRows(ht2Source);
    assertEquals("t2_syncup has 202 rows on source", 202, rowCount_ht2Source);

    utility1.shutdownMiniHBaseCluster();
    utility2.restartHBaseCluster(1);

    Thread.sleep(SLEEP_TIME);

    // before sync up
    int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
    int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
    assertEquals("@Peer1 t1_syncup should NOT be synced up yet and have 50 rows", 50,
      rowCount_ht1TargetAtPeer1);
    assertEquals("@Peer1 t2_syncup should NOT be synced up yet and have 100 rows", 100,
      rowCount_ht2TargetAtPeer1);

    // after sync up
    for (int i = 0; i < NB_RETRIES; i++) {
      syncUp(utility1);
      rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
      rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
      if (i == NB_RETRIES - 1) {
        if (rowCount_ht1TargetAtPeer1 != 100 || rowCount_ht2TargetAtPeer1 != 200) {
          // syncUp still failed. Let's look at the source in case anything is wrong there
          utility1.restartHBaseCluster(1);
          rowCount_ht1Source = utility1.countRows(ht1Source);
          LOG.debug("t1_syncup should have 102 rows at source, and it is " + rowCount_ht1Source);
          rowCount_ht2Source = utility1.countRows(ht2Source);
          LOG.debug("t2_syncup should have 202 rows at source, and it is " + rowCount_ht2Source);
        }
        assertEquals("@Peer1 t1_syncup should be synced up and have 100 rows", 100,
          rowCount_ht1TargetAtPeer1);
        assertEquals("@Peer1 t2_syncup should be synced up and have 200 rows", 200,
          rowCount_ht2TargetAtPeer1);
      }
      if (rowCount_ht1TargetAtPeer1 == 100 && rowCount_ht2TargetAtPeer1 == 200) {
        LOG.info("SyncUpAfterPut succeeded at retry = " + i);
        break;
      } else {
        LOG.debug("SyncUpAfterPut failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
          + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
          + rowCount_ht2TargetAtPeer1);
      }
      Thread.sleep(SLEEP_TIME);
    }
  }
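  // syncUp drives the ReplicationSyncUp tool in-process. Operationally, the same tool is
  // launched against a stopped source cluster roughly as (a sketch; exact invocation depends
  // on your install):
  //
  //   $ hbase org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp
  //
  // It reads the source cluster's replication queues and ships any unreplicated WAL edits
  // to the peer.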
  protected void syncUp(HBaseTestingUtility ut) throws Exception {
    ReplicationSyncUp.setConfigure(ut.getConfiguration());
    String[] arguments = new String[] { null };
    new ReplicationSyncUp().run(arguments);
  }
}