/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.apache.hadoop.hbase.HBaseTestingUtility.countRows;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_RETRIES;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.SLEEP_TIME;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.row;
import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpToolBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithBulkLoadedData.class);

  private static final Logger LOG =
    LoggerFactory.getLogger(TestReplicationSyncUpToolWithBulkLoadedData.class);
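
  /**
   * Enable bulk load replication on the cluster, tag it with a replication cluster id, turn on
   * quotas, and install a source FS configuration provider so HFiles staged on a foreign HDFS can
   * be resolved.
   */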
  @Override
  protected void customizeClusterConf(Configuration conf) {
    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
    conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
    conf.set("hbase.replication.source.fs.conf.provider",
      TestSourceFSConfigurationProvider.class.getCanonicalName());
  }

  @Test
  public void testSyncUpTool() throws Exception {
    /**
     * Set up replication on master and one slave. Tables: t1_syncup and t2_syncup. Column
     * families: 'cf1' (replicated) and 'norep' (not replicated).
     */
    setupReplication();

    /**
     * Prepare 24 random hfile ranges required for creating hfiles.
     */
    Iterator<String> randomHFileRangeListIterator = null;
    Set<String> randomHFileRanges = new HashSet<>(24);
    for (int i = 0; i < 24; i++) {
      randomHFileRanges.add(UTIL1.getRandomUUID().toString());
    }
    List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
    Collections.sort(randomHFileRangeList);
    randomHFileRangeListIterator = randomHFileRangeList.iterator();

    /**
     * At master, load into t1_syncup: 50 rows into cf1, 50 rows from the other HDFS into cf1, and
     * 3 rows into norep. Load into t2_syncup: 100 rows into cf1, 100 rows from the other HDFS
     * into cf1, and 3 rows into norep. Verify the cf1 rows are correctly replicated to the slave.
     */
    loadAndReplicateHFiles(true, randomHFileRangeListIterator);

    /**
     * Verify that hfile load syncs up correctly:
     * step 1: stop hbase on the slave
     * step 2: at master, load another 100 rows into cf1 and 3 rows into norep of t1_syncup, and
     * another 200 rows into cf1 and 3 rows into norep of t2_syncup
     * step 3: stop hbase on the master, restart hbase on the slave
     * step 4: verify the slave still has the rows from before the load (t1_syncup: 100 rows in
     * cf1, t2_syncup: 200 rows in cf1)
     * step 5: run the syncup tool on the master
     * step 6: verify that the hfiles show up on the slave and 'norep' does not (t1_syncup: 200
     * rows in cf1, t2_syncup: 400 rows in cf1)
     */
    mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator);
  }
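
  /**
   * Load more HFiles while the target cluster is down, then swap the clusters and drive the
   * sync-up tool until the bulk-loaded rows appear on the target peer.
   */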
  private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator)
    throws Exception {
    LOG.debug("mimicSyncUpAfterBulkLoad");
    shutDownTargetHBaseCluster();

    loadAndReplicateHFiles(false, randomHFileRangeListIterator);

    int rowCount_ht1Source = countRows(ht1Source);
    assertEquals("t1_syncup has 206 rows on source, after bulk load of another 103 hfiles", 206,
      rowCount_ht1Source);

    int rowCount_ht2Source = countRows(ht2Source);
    assertEquals("t2_syncup has 406 rows on source, after bulk load of another 203 hfiles", 406,
      rowCount_ht2Source);

    shutDownSourceHBaseCluster();
    restartTargetHBaseCluster(1);

    Thread.sleep(SLEEP_TIME);

    // Before sync up
    int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
    int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
    assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCountHt1TargetAtPeer1);
    assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCountHt2TargetAtPeer1);

    // Run the sync up tool
    syncUp(UTIL1);

    // After sync up
    for (int i = 0; i < NB_RETRIES; i++) {
      syncUp(UTIL1);
      rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
      rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
      if (i == NB_RETRIES - 1) {
        if (rowCountHt1TargetAtPeer1 != 200 || rowCountHt2TargetAtPeer1 != 400) {
          // syncUp still failed. Let's look at the source in case anything is wrong there
          restartSourceHBaseCluster(1);
          rowCount_ht1Source = countRows(ht1Source);
          LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCount_ht1Source);
          rowCount_ht2Source = countRows(ht2Source);
          LOG.debug("t2_syncup should have 406 rows at source, and it is " + rowCount_ht2Source);
        }
        assertEquals("@Peer1 t1_syncup should be synced up and have 200 rows", 200,
          rowCountHt1TargetAtPeer1);
        assertEquals("@Peer1 t2_syncup should be synced up and have 400 rows", 400,
          rowCountHt2TargetAtPeer1);
      }
      if (rowCountHt1TargetAtPeer1 == 200 && rowCountHt2TargetAtPeer1 == 400) {
        LOG.info("SyncUpAfterBulkLoad succeeded at retry = " + i);
        break;
      } else {
        LOG.debug("SyncUpAfterBulkLoad failed at retry = " + i
          + ", with rowCountHt1TargetAtPeer1 = " + rowCountHt1TargetAtPeer1
          + " and rowCountHt2TargetAtPeer1 = " + rowCountHt2TargetAtPeer1);
      }
      Thread.sleep(SLEEP_TIME);
    }
  }
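
  /**
   * Bulk load a batch of HFiles into both tables: into cf1 locally and from the other HDFS
   * (replicated), and into norep (not replicated). Optionally block until the cf1 rows are
   * visible on the slave.
   */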
  private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave,
    Iterator<String> randomHFileRangeListIterator) throws Exception {
    LOG.debug("loadAndReplicateHFiles");

    // Load 50 + 50 + 3 hfiles to t1_syncup.
    byte[][][] hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source, hfileRanges, 50);

    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source,
      hfileRanges, 50);

    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht1Source,
      hfileRanges, 3);

    // Load 100 + 100 + 3 hfiles to t2_syncup.
    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source, hfileRanges, 100);

    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source,
      hfileRanges, 100);

    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht2Source,
      hfileRanges, 3);

    if (verifyReplicationOnSlave) {
      // Ensure replication completed: the 3 norep rows must not arrive on the slave.
      wait(ht1TargetAtPeer1, countRows(ht1Source) - 3,
        "t1_syncup has 103 rows on source, and 100 on slave1");

      wait(ht2TargetAtPeer1, countRows(ht2Source) - 3,
        "t2_syncup has 203 rows on source, and 200 on slave1");
    }
  }
  private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
    Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
    Path dir = UTIL1.getDataTestDirOnTestFS(testName);
    FileSystem fs = UTIL1.getTestFileSystem();
    dir = dir.makeQualified(fs);
    Path familyDir = new Path(dir, Bytes.toString(fam));

    int hfileIdx = 0;
    for (byte[][] range : hfileRanges) {
      byte[] from = range[0];
      byte[] to = range[1];
      HFileTestUtil.createHFile(UTIL1.getConfiguration(), fs,
        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
    }

    final TableName tableName = source.getName();
    BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
    loader.bulkLoad(tableName, dir);
  }
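
  /**
   * Create HFiles on the second cluster's HDFS (UTIL2) and bulk load them into the source table,
   * exercising the source FS configuration provider path.
   */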
  private void loadFromOtherHDFSAndValidateHFileReplication(String testName, byte[] row,
    byte[] fam, Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
    Path dir = UTIL2.getDataTestDirOnTestFS(testName);
    FileSystem fs = UTIL2.getTestFileSystem();
    dir = dir.makeQualified(fs);
    Path familyDir = new Path(dir, Bytes.toString(fam));

    int hfileIdx = 0;
    for (byte[][] range : hfileRanges) {
      byte[] from = range[0];
      byte[] to = range[1];
      HFileTestUtil.createHFile(UTIL2.getConfiguration(), fs,
        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
    }

    final TableName tableName = source.getName();
    // The load runs against the source cluster (UTIL1), reading the files from UTIL2's HDFS.
    BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
    loader.bulkLoad(tableName, dir);
  }
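
  /**
   * Poll the target table until it reaches the expected row count, asserting with {@code msg} on
   * the last retry.
   */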
  private void wait(Table target, int expectedCount, String msg)
    throws IOException, InterruptedException {
    for (int i = 0; i < NB_RETRIES; i++) {
      int rowCount = countRows(target);
      if (i == NB_RETRIES - 1) {
        assertEquals(msg, expectedCount, rowCount);
      }
      if (expectedCount == rowCount) {
        break;
      }
      Thread.sleep(SLEEP_TIME);
    }
  }
}