/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.apache.hadoop.hbase.HBaseTestingUtility.countRows;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_RETRIES;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.SLEEP_TIME;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.row;
import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpToolBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithBulkLoadedData.class);

  private static final Logger LOG =
      LoggerFactory.getLogger(TestReplicationSyncUpToolWithBulkLoadedData.class);
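
  /**
   * Enables bulk load replication and quotas on the test clusters, sets a replication cluster id,
   * and registers the test source FS configuration provider used when replicating bulk loaded
   * HFiles.
   */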
  @Override
  protected void customizeClusterConf(Configuration conf) {
    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
    conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
    conf.set("hbase.replication.source.fs.conf.provider",
      TestSourceFSConfigurationProvider.class.getCanonicalName());
  }

  @Test
  public void testSyncUpTool() throws Exception {
    /**
     * Set up replication between the master and one slave cluster. Tables: t1_syncup and
     * t2_syncup. Column family 'cf1' is replicated, 'norep' is not replicated.
     */
    setupReplication();

    /**
     * Prepare 24 random hfile range keys required for creating hfiles.
     */
    Iterator<String> randomHFileRangeListIterator = null;
    Set<String> randomHFileRanges = new HashSet<>(24);
    for (int i = 0; i < 24; i++) {
      randomHFileRanges.add(UTIL1.getRandomUUID().toString());
    }
    List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
    Collections.sort(randomHFileRangeList);
    randomHFileRangeListIterator = randomHFileRangeList.iterator();

    /**
     * At the master:
     *   t1_syncup: load 50 rows into cf1, 50 rows from the other HDFS into cf1, and 3 rows into
     *              norep.
     *   t2_syncup: load 100 rows into cf1, 100 rows from the other HDFS into cf1, and 3 rows into
     *              norep.
     * Then verify the cf1 rows are correctly replicated to the slave.
     */
    loadAndReplicateHFiles(true, randomHFileRangeListIterator);

    /**
     * Verify that the sync up tool replicates bulk loaded data:
     *   step 1: stop hbase on the slave.
     *   step 2: at the master, load another 100 rows into cf1 and 3 rows into norep of t1_syncup,
     *           and another 200 rows into cf1 and 3 rows into norep of t2_syncup.
     *   step 3: stop hbase on the master, restart hbase on the slave.
     *   step 4: verify the slave still has the rows from before the load (t1_syncup: 100 rows in
     *           cf1, t2_syncup: 200 rows in cf1).
     *   step 5: run the sync up tool on the master.
     *   step 6: verify that the hfiles show up on the slave and 'norep' does not (t1_syncup: 200
     *           rows in cf1, t2_syncup: 400 rows in cf1).
     */
    mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator);
  }
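
  /**
   * Shuts down the target cluster, bulk loads more HFiles on the source, then stops the source,
   * restarts the target, and runs the sync up tool until the bulk loaded rows of the replicated
   * family show up on the target.
   */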
  private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator)
      throws Exception {
    LOG.debug("mimicSyncUpAfterBulkLoad");
    shutDownTargetHBaseCluster();

    loadAndReplicateHFiles(false, randomHFileRangeListIterator);

    int rowCount_ht1Source = countRows(ht1Source);
    assertEquals("t1_syncup has 206 rows on source, after bulk load of another 103 hfiles", 206,
      rowCount_ht1Source);

    int rowCount_ht2Source = countRows(ht2Source);
    assertEquals("t2_syncup has 406 rows on source, after bulk load of another 203 hfiles", 406,
      rowCount_ht2Source);

    shutDownSourceHBaseCluster();
    restartTargetHBaseCluster(1);

    Thread.sleep(SLEEP_TIME);

    // Before sync up
    int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
    int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
    assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCountHt1TargetAtPeer1);
    assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCountHt2TargetAtPeer1);

    // Run sync up tool
    syncUp(UTIL1);

    // After sync up
    for (int i = 0; i < NB_RETRIES; i++) {
      syncUp(UTIL1);
      rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
      rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
      if (i == NB_RETRIES - 1) {
        if (rowCountHt1TargetAtPeer1 != 200 || rowCountHt2TargetAtPeer1 != 400) {
          // Sync up still failed. Dump the source row counts in case something is wrong there.
          restartSourceHBaseCluster(1);
          rowCount_ht1Source = countRows(ht1Source);
          LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCount_ht1Source);
          rowCount_ht2Source = countRows(ht2Source);
          LOG.debug("t2_syncup should have 406 rows at source, and it is " + rowCount_ht2Source);
        }
        assertEquals("@Peer1 t1_syncup should be synced up and have 200 rows", 200,
          rowCountHt1TargetAtPeer1);
        assertEquals("@Peer1 t2_syncup should be synced up and have 400 rows", 400,
          rowCountHt2TargetAtPeer1);
      }
      if (rowCountHt1TargetAtPeer1 == 200 && rowCountHt2TargetAtPeer1 == 400) {
        LOG.info("SyncUpAfterBulkLoad succeeded at retry = " + i);
        break;
      } else {
        LOG.debug("SyncUpAfterBulkLoad failed at retry = " + i +
          ", with rowCountHt1TargetAtPeer1 = " + rowCountHt1TargetAtPeer1 +
          " and rowCountHt2TargetAtPeer1 = " + rowCountHt2TargetAtPeer1);
      }
      Thread.sleep(SLEEP_TIME);
    }
  }
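
  /**
   * Bulk loads 50 + 50 + 3 rows (via hfiles) into t1_syncup and 100 + 100 + 3 rows into
   * t2_syncup, the last 3 of each going to the non-replicated family. Optionally waits until the
   * replicated rows show up on the slave.
   */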
  private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave,
      Iterator<String> randomHFileRangeListIterator) throws Exception {
    LOG.debug("loadAndReplicateHFiles");

    // Load 50 + 50 + 3 rows of hfiles into t1_syncup.
    byte[][][] hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source, hfileRanges, 50);

    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source,
      hfileRanges, 50);

    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht1Source,
      hfileRanges, 3);

    // Load 100 + 100 + 3 rows of hfiles into t2_syncup.
    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source, hfileRanges, 100);

    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source,
      hfileRanges, 100);

    hfileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
    loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht2Source,
      hfileRanges, 3);

    if (verifyReplicationOnSlave) {
      // Ensure replication completed; the 3 'norep' rows are not expected on the slave.
      wait(ht1TargetAtPeer1, countRows(ht1Source) - 3,
        "t1_syncup has 103 rows on source, and 100 on slave1");

      wait(ht2TargetAtPeer1, countRows(ht2Source) - 3,
        "t2_syncup has 203 rows on source, and 200 on slave1");
    }
  }
  private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
      Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
    Path dir = UTIL1.getDataTestDirOnTestFS(testName);
    FileSystem fs = UTIL1.getTestFileSystem();
    dir = dir.makeQualified(fs);
    Path familyDir = new Path(dir, Bytes.toString(fam));

    int hfileIdx = 0;
    for (byte[][] range : hfileRanges) {
      byte[] from = range[0];
      byte[] to = range[1];
      HFileTestUtil.createHFile(UTIL1.getConfiguration(), fs,
        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
    }

    final TableName tableName = source.getName();
    BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
    loader.bulkLoad(tableName, dir);
  }
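
  /**
   * Same as loadAndValidateHFileReplication, except the HFiles are staged on the second cluster's
   * HDFS (UTIL2) before being bulk loaded into the source table.
   */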
  private void loadFromOtherHDFSAndValidateHFileReplication(String testName, byte[] row,
      byte[] fam, Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
    Path dir = UTIL2.getDataTestDirOnTestFS(testName);
    FileSystem fs = UTIL2.getTestFileSystem();
    dir = dir.makeQualified(fs);
    Path familyDir = new Path(dir, Bytes.toString(fam));

    int hfileIdx = 0;
    for (byte[][] range : hfileRanges) {
      byte[] from = range[0];
      byte[] to = range[1];
      HFileTestUtil.createHFile(UTIL2.getConfiguration(), fs,
        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
    }

    final TableName tableName = source.getName();
    // Bulk load into the source cluster (UTIL1), even though the HFiles live on UTIL2's filesystem.
    BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
    loader.bulkLoad(tableName, dir);
  }
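
  /**
   * Polls the row count of the target table until it reaches expectedCount, sleeping between
   * retries; asserts with the given message on the last retry.
   */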
  private void wait(Table target, int expectedCount, String msg)
      throws IOException, InterruptedException {
    for (int i = 0; i < NB_RETRIES; i++) {
      int rowCount = countRows(target);
      if (i == NB_RETRIES - 1) {
        assertEquals(msg, expectedCount, rowCount);
      }
      if (expectedCount == rowCount) {
        break;
      }
      Thread.sleep(SLEEP_TIME);
    }
  }
}