HBASE-26567 Remove IndexType from ChunkCreator (#3947)
hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilder;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test for bulk load replication. Defines three clusters, with the following
 * replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active between
 * 2 and 3).
 *
 * For each of the defined test clusters, it performs a bulk load, asserting that values from the
 * bulk loaded file get replicated to the other two peers. Since we are doing 3 bulk loads, with
 * the given replication topology all these bulk loads should get replicated only once on each
 * peer. To assert this, this test defines a coprocessor with a postBulkLoadHFile hook and adds it
 * to all test table regions, on each of the clusters. This CP counts the number of times bulk
 * load actually gets invoked, certifying we are not entering the infinite loop condition
 * addressed by HBASE-22380.
 */
@Category({ ReplicationTests.class, MediumTests.class })
public class TestBulkLoadReplication extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBulkLoadReplication.class);

  protected static final Logger LOG =
    LoggerFactory.getLogger(TestBulkLoadReplication.class);

  private static final String PEER1_CLUSTER_ID = "peer1";
  private static final String PEER2_CLUSTER_ID = "peer2";
  private static final String PEER3_CLUSTER_ID = "peer3";

  private static final String PEER_ID1 = "1";
  private static final String PEER_ID3 = "3";

  private static AtomicInteger BULK_LOADS_COUNT;
  private static CountDownLatch BULK_LOAD_LATCH;

  protected static final HBaseTestingUtil UTIL3 = new HBaseTestingUtil();
  protected static final Configuration CONF3 = UTIL3.getConfiguration();

  private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");

  private static Table htable3;

  @Rule
  public TestName name = new TestName();

  @ClassRule
  public static TemporaryFolder testFolder = new TemporaryFolder();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID);
    setupBulkLoadConfigsForCluster(CONF2, PEER2_CLUSTER_ID);
    setupBulkLoadConfigsForCluster(CONF3, PEER3_CLUSTER_ID);
    setupConfig(UTIL3, "/3");
    TestReplicationBase.setUpBeforeClass();
    startThirdCluster();
  }

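  /**
   * Starts the third mini cluster on the same ZooKeeper quorum as UTIL1/UTIL2 and creates the
   * test table there, with a MOB-enabled, globally replicated column family.
   */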
  private static void startThirdCluster() throws Exception {
    LOG.info("Setup Zk to same one from UTIL1 and UTIL2");
    UTIL3.setZkCluster(UTIL1.getZkCluster());
    UTIL3.startMiniCluster(NUM_SLAVES1);

    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
        .setMobEnabled(true)
        .setMobThreshold(4000)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();

    Connection connection3 = ConnectionFactory.createConnection(CONF3);
    try (Admin admin3 = connection3.getAdmin()) {
      admin3.createTable(table, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE);
    }
    UTIL3.waitUntilAllRegionsAssigned(tableName);
    htable3 = connection3.getTable(tableName);
  }

  @Before
  @Override
  public void setUpBase() throws Exception {
    // "super.setUpBase()" already sets replication from 1->2,
    // then on the subsequent lines, sets 2->1, 2->3 and 3->2.
    // So we have the following topology: "1 <-> 2 <-> 3"
    super.setUpBase();
    ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1);
    ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2);
    ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3);
    // adds cluster1 as a remote peer on cluster2
    UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config);
    // adds cluster3 as a remote peer on cluster2
    UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
    // adds cluster2 as a remote peer on cluster3
    UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
    setupCoprocessor(UTIL1);
    setupCoprocessor(UTIL2);
    setupCoprocessor(UTIL3);
    BULK_LOADS_COUNT = new AtomicInteger(0);
  }

  private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtil util) {
    return ReplicationPeerConfig.newBuilder()
      .setClusterKey(util.getClusterKey()).setSerial(isSerialPeer()).build();
  }

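  /**
   * Loads the BulkReplicationTestObserver on every region of the test table in the given cluster
   * (if not already present), so bulk load invocations can be counted per cluster.
   */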
  private void setupCoprocessor(HBaseTestingUtil cluster) {
    cluster.getHBaseCluster().getRegions(tableName).forEach(r -> {
      try {
        TestBulkLoadReplication.BulkReplicationTestObserver cp = r.getCoprocessorHost()
          .findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
        if (cp == null) {
          r.getCoprocessorHost()
            .load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0,
              cluster.getConfiguration());
          cp = r.getCoprocessorHost()
            .findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
          cp.clusterName = cluster.getClusterKey();
        }
      } catch (Exception e) {
        LOG.error(e.getMessage(), e);
      }
    });
  }

  @After
  @Override
  public void tearDownBase() throws Exception {
    super.tearDownBase();
    UTIL2.getAdmin().removeReplicationPeer(PEER_ID1);
    UTIL2.getAdmin().removeReplicationPeer(PEER_ID3);
    UTIL3.getAdmin().removeReplicationPeer(PEER_ID2);
  }

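  /**
   * Enables bulk load replication for the given cluster configuration: sets the replication
   * cluster id and writes the cluster's hbase-site.xml under the shared REPLICATION_CONF_DIR,
   * so peers can resolve each other's configuration by cluster id.
   */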
  protected static void setupBulkLoadConfigsForCluster(Configuration config,
      String clusterReplicationId) throws Exception {
    config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
    File sourceConfigFolder = testFolder.newFolder(clusterReplicationId);
    File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath()
      + "/hbase-site.xml");
    config.writeXml(new FileOutputStream(sourceConfigFile));
    config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath());
  }

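  /**
   * Performs one bulk load per cluster and verifies each load is replicated to the other two
   * peers exactly once, i.e. the coprocessor sees 3 loads x 3 clusters = 9 invocations in total.
   */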
  @Test
  public void testBulkLoadReplicationActiveActive() throws Exception {
    Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
    Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
    Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName);
    byte[] row = Bytes.toBytes("001");
    byte[] value = Bytes.toBytes("v1");
    assertBulkLoadConditions(tableName, row, value, UTIL1, peer1TestTable,
      peer2TestTable, peer3TestTable);
    row = Bytes.toBytes("002");
    value = Bytes.toBytes("v2");
    assertBulkLoadConditions(tableName, row, value, UTIL2, peer1TestTable,
      peer2TestTable, peer3TestTable);
    row = Bytes.toBytes("003");
    value = Bytes.toBytes("v3");
    assertBulkLoadConditions(tableName, row, value, UTIL3, peer1TestTable,
      peer2TestTable, peer3TestTable);
    // Additional wait to make sure no extra bulk load happens
    Thread.sleep(400);
    // We have 3 bulk load events (1 initiated on each cluster).
    // Each event gets 3 counts (the originator cluster, plus the two peers),
    // so BULK_LOADS_COUNT expected value is 3 * 3 = 9.
    assertEquals(9, BULK_LOADS_COUNT.get());
  }

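  /**
   * Bulk loads a single row on the given cluster, waits for the load to be observed on all three
   * clusters (latch of 3: originator plus two peers) and asserts the value is visible everywhere.
   */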
  protected void assertBulkLoadConditions(TableName tableName, byte[] row, byte[] value,
      HBaseTestingUtil utility, Table... tables) throws Exception {
    BULK_LOAD_LATCH = new CountDownLatch(3);
    bulkLoadOnCluster(tableName, row, value, utility);
    assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
    assertTableHasValue(tables[0], row, value);
    assertTableHasValue(tables[1], row, value);
    assertTableHasValue(tables[2], row, value);
  }

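  /**
   * Creates a local HFile with the given row/value, copies it into the cluster's HDFS and runs
   * BulkLoadHFilesTool against it.
   */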
  protected void bulkLoadOnCluster(TableName tableName, byte[] row, byte[] value,
      HBaseTestingUtil cluster) throws Exception {
    String bulkLoadFilePath = createHFileForFamilies(row, value, cluster.getConfiguration());
    copyToHdfs(bulkLoadFilePath, cluster.getDFSCluster());
    BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration());
    bulkLoadHFilesTool.bulkLoad(tableName, BULK_LOAD_BASE_DIR);
  }

  private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception {
    Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR, "f");
    cluster.getFileSystem().mkdirs(bulkLoadDir);
    cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
  }

  protected void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
    Get get = new Get(row);
    Result result = table.get(get);
    assertTrue(result.advance());
    assertEquals(Bytes.toString(value), Bytes.toString(result.value()));
  }

  protected void assertTableNoValue(Table table, byte[] row, byte[] value) throws Exception {
    Get get = new Get(row);
    Result result = table.get(get);
    assertTrue(result.isEmpty());
  }

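  /**
   * Writes a single-cell HFile for the test family on the local filesystem and returns its
   * absolute path, so it can later be copied into HDFS and bulk loaded.
   */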
  private String createHFileForFamilies(byte[] row, byte[] value,
      Configuration clusterConfig) throws IOException {
    CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
    cellBuilder.setRow(row)
      .setFamily(TestReplicationBase.famName)
      .setQualifier(Bytes.toBytes("1"))
      .setValue(value)
      .setType(Cell.Type.Put);

    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig);
    // TODO We need a way to do this without creating files
    File hFileLocation = testFolder.newFile();
    FSDataOutputStream out =
      new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
    try {
      hFileFactory.withOutputStream(out);
      hFileFactory.withFileContext(new HFileContextBuilder().build());
      HFile.Writer writer = hFileFactory.create();
      try {
        writer.append(new KeyValue(cellBuilder.build()));
      } finally {
        writer.close();
      }
    } finally {
      out.close();
    }
    return hFileLocation.getAbsoluteFile().getAbsolutePath();
  }

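  /**
   * Region coprocessor that counts postBulkLoadHFile invocations: it increments the global
   * BULK_LOADS_COUNT and counts down BULK_LOAD_LATCH each time a bulk load completes on a region.
   */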
  public static class BulkReplicationTestObserver implements RegionCoprocessor {

    String clusterName;
    AtomicInteger bulkLoadCounts = new AtomicInteger();

    @Override
    public Optional<RegionObserver> getRegionObserver() {
      return Optional.of(new RegionObserver() {

        @Override
        public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
            List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths)
            throws IOException {
          BULK_LOAD_LATCH.countDown();
          BULK_LOADS_COUNT.incrementAndGet();
          LOG.debug("Another file bulk loaded. Total for {}: {}", clusterName,
            bulkLoadCounts.addAndGet(1));
        }
      });
    }
  }
}