/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.replication
;
20 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
21 import org
.apache
.hadoop
.hbase
.HColumnDescriptor
;
22 import org
.apache
.hadoop
.hbase
.HConstants
;
23 import org
.apache
.hadoop
.hbase
.HTableDescriptor
;
24 import org
.apache
.hadoop
.hbase
.TableName
;
25 import org
.apache
.hadoop
.hbase
.client
.Admin
;
26 import org
.apache
.hadoop
.hbase
.client
.Connection
;
27 import org
.apache
.hadoop
.hbase
.client
.ConnectionFactory
;
28 import org
.apache
.hadoop
.hbase
.client
.Delete
;
29 import org
.apache
.hadoop
.hbase
.client
.Put
;
30 import org
.apache
.hadoop
.hbase
.client
.Table
;
31 import org
.apache
.hadoop
.hbase
.client
.replication
.ReplicationAdmin
;
32 import org
.apache
.hadoop
.hbase
.replication
.regionserver
.ReplicationSyncUp
;
33 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
34 import org
.apache
.hadoop
.hbase
.testclassification
.ReplicationTests
;
35 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
36 import org
.junit
.Before
;
37 import org
.junit
.Test
;
38 import org
.junit
.experimental
.categories
.Category
;
39 import org
.slf4j
.Logger
;
40 import org
.slf4j
.LoggerFactory
;
42 import java
.util
.ArrayList
;
43 import java
.util
.List
;
45 import static org
.junit
.Assert
.assertEquals
;
47 @Category({ReplicationTests
.class, LargeTests
.class})
48 public class TestReplicationSyncUpTool
extends TestReplicationBase
{
50 private static final Logger LOG
= LoggerFactory
.getLogger(TestReplicationSyncUpTool
.class);
52 private static final TableName t1_su
= TableName
.valueOf("t1_syncup");
53 private static final TableName t2_su
= TableName
.valueOf("t2_syncup");
55 protected static final byte[] famName
= Bytes
.toBytes("cf1");
56 private static final byte[] qualName
= Bytes
.toBytes("q1");
58 protected static final byte[] noRepfamName
= Bytes
.toBytes("norep");
60 private HTableDescriptor t1_syncupSource
, t1_syncupTarget
;
61 private HTableDescriptor t2_syncupSource
, t2_syncupTarget
;
63 protected Table ht1Source
, ht2Source
, ht1TargetAtPeer1
, ht2TargetAtPeer1
;
66 public void setUp() throws Exception
{
67 HColumnDescriptor fam
;
69 t1_syncupSource
= new HTableDescriptor(t1_su
);
70 fam
= new HColumnDescriptor(famName
);
71 fam
.setScope(HConstants
.REPLICATION_SCOPE_GLOBAL
);
72 t1_syncupSource
.addFamily(fam
);
73 fam
= new HColumnDescriptor(noRepfamName
);
74 t1_syncupSource
.addFamily(fam
);
76 t1_syncupTarget
= new HTableDescriptor(t1_su
);
77 fam
= new HColumnDescriptor(famName
);
78 t1_syncupTarget
.addFamily(fam
);
79 fam
= new HColumnDescriptor(noRepfamName
);
80 t1_syncupTarget
.addFamily(fam
);
82 t2_syncupSource
= new HTableDescriptor(t2_su
);
83 fam
= new HColumnDescriptor(famName
);
84 fam
.setScope(HConstants
.REPLICATION_SCOPE_GLOBAL
);
85 t2_syncupSource
.addFamily(fam
);
86 fam
= new HColumnDescriptor(noRepfamName
);
87 t2_syncupSource
.addFamily(fam
);
89 t2_syncupTarget
= new HTableDescriptor(t2_su
);
90 fam
= new HColumnDescriptor(famName
);
91 t2_syncupTarget
.addFamily(fam
);
92 fam
= new HColumnDescriptor(noRepfamName
);
93 t2_syncupTarget
.addFamily(fam
);
98 * Add a row to a table in each cluster, check it's replicated, delete it,
99 * check's gone Also check the puts and deletes are not replicated back to
100 * the originating cluster.
103 public void testSyncUpTool() throws Exception
{
106 * Set up Replication: on Master and one Slave
107 * Table: t1_syncup and t2_syncup
110 * 'norep': not replicated
116 * t1_syncup: put 100 rows into cf1, and 1 rows into norep
117 * t2_syncup: put 200 rows into cf1, and 1 rows into norep
119 * verify correctly replicated to slave
121 putAndReplicateRows();
124 * Verify delete works
126 * step 1: stop hbase on Slave
129 * t1_syncup: delete 50 rows from cf1
130 * t2_syncup: delete 100 rows from cf1
131 * no change on 'norep'
133 * step 3: stop hbase on master, restart hbase on Slave
135 * step 4: verify Slave still have the rows before delete
136 * t1_syncup: 100 rows from cf1
137 * t2_syncup: 200 rows from cf1
139 * step 5: run syncup tool on Master
141 * step 6: verify that delete show up on Slave
142 * t1_syncup: 50 rows from cf1
143 * t2_syncup: 100 rows from cf1
145 * verify correctly replicated to Slave
147 mimicSyncUpAfterDelete();
152 * step 1: stop hbase on Slave
155 * t1_syncup: put 100 rows from cf1
156 * t2_syncup: put 200 rows from cf1
157 * and put another row on 'norep'
158 * ATTN: put to 'cf1' will overwrite existing rows, so end count will
159 * be 100 and 200 respectively
160 * put to 'norep' will add a new row.
162 * step 3: stop hbase on master, restart hbase on Slave
164 * step 4: verify Slave still has the rows before put
165 * t1_syncup: 50 rows from cf1
166 * t2_syncup: 100 rows from cf1
168 * step 5: run syncup tool on Master
170 * step 6: verify that put show up on Slave
171 * and 'norep' does not
172 * t1_syncup: 100 rows from cf1
173 * t2_syncup: 200 rows from cf1
175 * verify correctly replicated to Slave
177 mimicSyncUpAfterPut();
180 protected void setupReplication() throws Exception
{
181 ReplicationAdmin admin1
= new ReplicationAdmin(conf1
);
182 ReplicationAdmin admin2
= new ReplicationAdmin(conf2
);
184 Admin ha
= utility1
.getAdmin();
185 ha
.createTable(t1_syncupSource
);
186 ha
.createTable(t2_syncupSource
);
189 ha
= utility2
.getAdmin();
190 ha
.createTable(t1_syncupTarget
);
191 ha
.createTable(t2_syncupTarget
);
194 Connection connection1
= ConnectionFactory
.createConnection(utility1
.getConfiguration());
195 Connection connection2
= ConnectionFactory
.createConnection(utility2
.getConfiguration());
197 // Get HTable from Master
198 ht1Source
= connection1
.getTable(t1_su
);
199 ht2Source
= connection1
.getTable(t2_su
);
201 // Get HTable from Peer1
202 ht1TargetAtPeer1
= connection2
.getTable(t1_su
);
203 ht2TargetAtPeer1
= connection2
.getTable(t2_su
);
206 * set M-S : Master: utility1 Slave1: utility2
208 ReplicationPeerConfig rpc
= new ReplicationPeerConfig();
209 rpc
.setClusterKey(utility2
.getClusterKey());
210 admin1
.addPeer("1", rpc
, null);
216 private void putAndReplicateRows() throws Exception
{
217 LOG
.debug("putAndReplicateRows");
218 // add rows to Master cluster,
221 // 100 + 1 row to t1_syncup
222 for (int i
= 0; i
< NB_ROWS_IN_BATCH
; i
++) {
223 p
= new Put(Bytes
.toBytes("row" + i
));
224 p
.addColumn(famName
, qualName
, Bytes
.toBytes("val" + i
));
227 p
= new Put(Bytes
.toBytes("row" + 9999));
228 p
.addColumn(noRepfamName
, qualName
, Bytes
.toBytes("val" + 9999));
231 // 200 + 1 row to t2_syncup
232 for (int i
= 0; i
< NB_ROWS_IN_BATCH
* 2; i
++) {
233 p
= new Put(Bytes
.toBytes("row" + i
));
234 p
.addColumn(famName
, qualName
, Bytes
.toBytes("val" + i
));
237 p
= new Put(Bytes
.toBytes("row" + 9999));
238 p
.addColumn(noRepfamName
, qualName
, Bytes
.toBytes("val" + 9999));
241 // ensure replication completed
242 Thread
.sleep(SLEEP_TIME
);
243 int rowCount_ht1Source
= utility1
.countRows(ht1Source
);
244 for (int i
= 0; i
< NB_RETRIES
; i
++) {
245 int rowCount_ht1TargetAtPeer1
= utility2
.countRows(ht1TargetAtPeer1
);
246 if (i
==NB_RETRIES
-1) {
247 assertEquals("t1_syncup has 101 rows on source, and 100 on slave1", rowCount_ht1Source
- 1,
248 rowCount_ht1TargetAtPeer1
);
250 if (rowCount_ht1Source
- 1 == rowCount_ht1TargetAtPeer1
) {
253 Thread
.sleep(SLEEP_TIME
);
256 int rowCount_ht2Source
= utility1
.countRows(ht2Source
);
257 for (int i
= 0; i
< NB_RETRIES
; i
++) {
258 int rowCount_ht2TargetAtPeer1
= utility2
.countRows(ht2TargetAtPeer1
);
259 if (i
==NB_RETRIES
-1) {
260 assertEquals("t2_syncup has 201 rows on source, and 200 on slave1", rowCount_ht2Source
- 1,
261 rowCount_ht2TargetAtPeer1
);
263 if (rowCount_ht2Source
- 1 == rowCount_ht2TargetAtPeer1
) {
266 Thread
.sleep(SLEEP_TIME
);
270 private void mimicSyncUpAfterDelete() throws Exception
{
271 LOG
.debug("mimicSyncUpAfterDelete");
272 utility2
.shutdownMiniHBaseCluster();
274 List
<Delete
> list
= new ArrayList
<>();
275 // delete half of the rows
276 for (int i
= 0; i
< NB_ROWS_IN_BATCH
/ 2; i
++) {
277 String rowKey
= "row" + i
;
278 Delete del
= new Delete(Bytes
.toBytes(rowKey
));
281 ht1Source
.delete(list
);
283 for (int i
= 0; i
< NB_ROWS_IN_BATCH
; i
++) {
284 String rowKey
= "row" + i
;
285 Delete del
= new Delete(Bytes
.toBytes(rowKey
));
288 ht2Source
.delete(list
);
290 int rowCount_ht1Source
= utility1
.countRows(ht1Source
);
291 assertEquals("t1_syncup has 51 rows on source, after remove 50 of the replicated colfam", 51,
294 int rowCount_ht2Source
= utility1
.countRows(ht2Source
);
295 assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam",
296 101, rowCount_ht2Source
);
298 utility1
.shutdownMiniHBaseCluster();
299 utility2
.restartHBaseCluster(1);
301 Thread
.sleep(SLEEP_TIME
);
304 int rowCount_ht1TargetAtPeer1
= utility2
.countRows(ht1TargetAtPeer1
);
305 int rowCount_ht2TargetAtPeer1
= utility2
.countRows(ht2TargetAtPeer1
);
306 assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1
);
307 assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1
);
310 for (int i
= 0; i
< NB_RETRIES
; i
++) {
312 rowCount_ht1TargetAtPeer1
= utility2
.countRows(ht1TargetAtPeer1
);
313 rowCount_ht2TargetAtPeer1
= utility2
.countRows(ht2TargetAtPeer1
);
314 if (i
== NB_RETRIES
- 1) {
315 if (rowCount_ht1TargetAtPeer1
!= 50 || rowCount_ht2TargetAtPeer1
!= 100) {
316 // syncUP still failed. Let's look at the source in case anything wrong there
317 utility1
.restartHBaseCluster(1);
318 rowCount_ht1Source
= utility1
.countRows(ht1Source
);
319 LOG
.debug("t1_syncup should have 51 rows at source, and it is " + rowCount_ht1Source
);
320 rowCount_ht2Source
= utility1
.countRows(ht2Source
);
321 LOG
.debug("t2_syncup should have 101 rows at source, and it is " + rowCount_ht2Source
);
323 assertEquals("@Peer1 t1_syncup should be sync up and have 50 rows", 50,
324 rowCount_ht1TargetAtPeer1
);
325 assertEquals("@Peer1 t2_syncup should be sync up and have 100 rows", 100,
326 rowCount_ht2TargetAtPeer1
);
328 if (rowCount_ht1TargetAtPeer1
== 50 && rowCount_ht2TargetAtPeer1
== 100) {
329 LOG
.info("SyncUpAfterDelete succeeded at retry = " + i
);
332 LOG
.debug("SyncUpAfterDelete failed at retry = " + i
+ ", with rowCount_ht1TargetPeer1 ="
333 + rowCount_ht1TargetAtPeer1
+ " and rowCount_ht2TargetAtPeer1 ="
334 + rowCount_ht2TargetAtPeer1
);
336 Thread
.sleep(SLEEP_TIME
);
340 private void mimicSyncUpAfterPut() throws Exception
{
341 LOG
.debug("mimicSyncUpAfterPut");
342 utility1
.restartHBaseCluster(1);
343 utility2
.shutdownMiniHBaseCluster();
346 // another 100 + 1 row to t1_syncup
347 // we should see 100 + 2 rows now
348 for (int i
= 0; i
< NB_ROWS_IN_BATCH
; i
++) {
349 p
= new Put(Bytes
.toBytes("row" + i
));
350 p
.addColumn(famName
, qualName
, Bytes
.toBytes("val" + i
));
353 p
= new Put(Bytes
.toBytes("row" + 9998));
354 p
.addColumn(noRepfamName
, qualName
, Bytes
.toBytes("val" + 9998));
357 // another 200 + 1 row to t1_syncup
358 // we should see 200 + 2 rows now
359 for (int i
= 0; i
< NB_ROWS_IN_BATCH
* 2; i
++) {
360 p
= new Put(Bytes
.toBytes("row" + i
));
361 p
.addColumn(famName
, qualName
, Bytes
.toBytes("val" + i
));
364 p
= new Put(Bytes
.toBytes("row" + 9998));
365 p
.addColumn(noRepfamName
, qualName
, Bytes
.toBytes("val" + 9998));
368 int rowCount_ht1Source
= utility1
.countRows(ht1Source
);
369 assertEquals("t1_syncup has 102 rows on source", 102, rowCount_ht1Source
);
370 int rowCount_ht2Source
= utility1
.countRows(ht2Source
);
371 assertEquals("t2_syncup has 202 rows on source", 202, rowCount_ht2Source
);
373 utility1
.shutdownMiniHBaseCluster();
374 utility2
.restartHBaseCluster(1);
376 Thread
.sleep(SLEEP_TIME
);
379 int rowCount_ht1TargetAtPeer1
= utility2
.countRows(ht1TargetAtPeer1
);
380 int rowCount_ht2TargetAtPeer1
= utility2
.countRows(ht2TargetAtPeer1
);
381 assertEquals("@Peer1 t1_syncup should be NOT sync up and have 50 rows", 50,
382 rowCount_ht1TargetAtPeer1
);
383 assertEquals("@Peer1 t2_syncup should be NOT sync up and have 100 rows", 100,
384 rowCount_ht2TargetAtPeer1
);
387 for (int i
= 0; i
< NB_RETRIES
; i
++) {
389 rowCount_ht1TargetAtPeer1
= utility2
.countRows(ht1TargetAtPeer1
);
390 rowCount_ht2TargetAtPeer1
= utility2
.countRows(ht2TargetAtPeer1
);
391 if (i
== NB_RETRIES
- 1) {
392 if (rowCount_ht1TargetAtPeer1
!= 100 || rowCount_ht2TargetAtPeer1
!= 200) {
393 // syncUP still failed. Let's look at the source in case anything wrong there
394 utility1
.restartHBaseCluster(1);
395 rowCount_ht1Source
= utility1
.countRows(ht1Source
);
396 LOG
.debug("t1_syncup should have 102 rows at source, and it is " + rowCount_ht1Source
);
397 rowCount_ht2Source
= utility1
.countRows(ht2Source
);
398 LOG
.debug("t2_syncup should have 202 rows at source, and it is " + rowCount_ht2Source
);
400 assertEquals("@Peer1 t1_syncup should be sync up and have 100 rows", 100,
401 rowCount_ht1TargetAtPeer1
);
402 assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200,
403 rowCount_ht2TargetAtPeer1
);
405 if (rowCount_ht1TargetAtPeer1
== 100 && rowCount_ht2TargetAtPeer1
== 200) {
406 LOG
.info("SyncUpAfterPut succeeded at retry = " + i
);
409 LOG
.debug("SyncUpAfterPut failed at retry = " + i
+ ", with rowCount_ht1TargetPeer1 ="
410 + rowCount_ht1TargetAtPeer1
+ " and rowCount_ht2TargetAtPeer1 ="
411 + rowCount_ht2TargetAtPeer1
);
413 Thread
.sleep(SLEEP_TIME
);
417 protected void syncUp(HBaseTestingUtility ut
) throws Exception
{
418 ReplicationSyncUp
.setConfigure(ut
.getConfiguration());
419 String
[] arguments
= new String
[] { null };
420 new ReplicationSyncUp().run(arguments
);