2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.regionserver
;
20 import static org
.apache
.hadoop
.hbase
.HConstants
.REPLICATION_CLUSTER_ID
;
21 import static org
.apache
.hadoop
.hbase
.HConstants
.REPLICATION_CONF_DIR
;
22 import static org
.junit
.Assert
.assertEquals
;
23 import static org
.junit
.Assert
.assertTrue
;
26 import java
.io
.FileOutputStream
;
27 import java
.io
.IOException
;
28 import java
.util
.List
;
30 import java
.util
.Optional
;
31 import java
.util
.concurrent
.CountDownLatch
;
32 import java
.util
.concurrent
.TimeUnit
;
33 import java
.util
.concurrent
.atomic
.AtomicInteger
;
35 import org
.apache
.hadoop
.conf
.Configuration
;
36 import org
.apache
.hadoop
.fs
.FSDataOutputStream
;
37 import org
.apache
.hadoop
.fs
.Path
;
38 import org
.apache
.hadoop
.hbase
.Cell
;
39 import org
.apache
.hadoop
.hbase
.CellBuilder
;
40 import org
.apache
.hadoop
.hbase
.CellBuilderFactory
;
41 import org
.apache
.hadoop
.hbase
.CellBuilderType
;
42 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
43 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
44 import org
.apache
.hadoop
.hbase
.HConstants
;
45 import org
.apache
.hadoop
.hbase
.KeyValue
;
46 import org
.apache
.hadoop
.hbase
.TableName
;
47 import org
.apache
.hadoop
.hbase
.client
.Admin
;
48 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
49 import org
.apache
.hadoop
.hbase
.client
.Connection
;
50 import org
.apache
.hadoop
.hbase
.client
.ConnectionFactory
;
51 import org
.apache
.hadoop
.hbase
.client
.Get
;
52 import org
.apache
.hadoop
.hbase
.client
.Result
;
53 import org
.apache
.hadoop
.hbase
.client
.Table
;
54 import org
.apache
.hadoop
.hbase
.client
.TableDescriptor
;
55 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
56 import org
.apache
.hadoop
.hbase
.coprocessor
.ObserverContext
;
57 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessor
;
58 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionCoprocessorEnvironment
;
59 import org
.apache
.hadoop
.hbase
.coprocessor
.RegionObserver
;
60 import org
.apache
.hadoop
.hbase
.io
.hfile
.HFile
;
61 import org
.apache
.hadoop
.hbase
.io
.hfile
.HFileContextBuilder
;
62 import org
.apache
.hadoop
.hbase
.replication
.ReplicationPeerConfig
;
63 import org
.apache
.hadoop
.hbase
.replication
.TestReplicationBase
;
64 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
65 import org
.apache
.hadoop
.hbase
.testclassification
.ReplicationTests
;
66 import org
.apache
.hadoop
.hbase
.tool
.BulkLoadHFilesTool
;
67 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
68 import org
.apache
.hadoop
.hbase
.util
.Pair
;
69 import org
.apache
.hadoop
.hdfs
.MiniDFSCluster
;
70 import org
.junit
.After
;
71 import org
.junit
.Before
;
72 import org
.junit
.BeforeClass
;
73 import org
.junit
.ClassRule
;
74 import org
.junit
.Rule
;
75 import org
.junit
.Test
;
76 import org
.junit
.experimental
.categories
.Category
;
77 import org
.junit
.rules
.TemporaryFolder
;
78 import org
.junit
.rules
.TestName
;
79 import org
.slf4j
.Logger
;
80 import org
.slf4j
.LoggerFactory
;
/**
 * Integration test for bulk load replication. Defines three clusters, with the following
 * replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active between
 * 2 and 3).
 * <p>
 * For each of the defined test clusters, it performs a bulk load, asserting that the values in the
 * bulk loaded file get replicated to the other two peers. Since we are doing 3 bulk loads, with
 * the given replication topology all these bulk loads should get replicated only once on each
 * peer. To assert this, this test defines a postBulkLoadHFile coprocessor hook and adds it to all
 * test table regions, on each of the clusters. This CP counts the number of times a bulk load
 * actually gets invoked, certifying we are not entering the infinite loop condition addressed by
 * HBASE-22380.
 */
94 @Category({ ReplicationTests
.class, MediumTests
.class})
95 public class TestBulkLoadReplication
extends TestReplicationBase
{
98 public static final HBaseClassTestRule CLASS_RULE
=
99 HBaseClassTestRule
.forClass(TestBulkLoadReplication
.class);
101 protected static final Logger LOG
=
102 LoggerFactory
.getLogger(TestBulkLoadReplication
.class);
104 private static final String PEER1_CLUSTER_ID
= "peer1";
105 private static final String PEER2_CLUSTER_ID
= "peer2";
106 private static final String PEER3_CLUSTER_ID
= "peer3";
108 private static final String PEER_ID1
= "1";
109 private static final String PEER_ID3
= "3";
111 private static AtomicInteger BULK_LOADS_COUNT
;
112 private static CountDownLatch BULK_LOAD_LATCH
;
114 protected static final HBaseTestingUtil UTIL3
= new HBaseTestingUtil();
115 protected static final Configuration CONF3
= UTIL3
.getConfiguration();
117 private static final Path BULK_LOAD_BASE_DIR
= new Path("/bulk_dir");
119 private static Table htable3
;
122 public TestName name
= new TestName();
125 public static TemporaryFolder testFolder
= new TemporaryFolder();
128 public static void setUpBeforeClass() throws Exception
{
129 setupBulkLoadConfigsForCluster(CONF1
, PEER1_CLUSTER_ID
);
130 setupBulkLoadConfigsForCluster(CONF2
, PEER2_CLUSTER_ID
);
131 setupBulkLoadConfigsForCluster(CONF3
, PEER3_CLUSTER_ID
);
132 setupConfig(UTIL3
, "/3");
133 TestReplicationBase
.setUpBeforeClass();
137 private static void startThirdCluster() throws Exception
{
138 LOG
.info("Setup Zk to same one from UTIL1 and UTIL2");
139 UTIL3
.setZkCluster(UTIL1
.getZkCluster());
140 UTIL3
.startMiniCluster(NUM_SLAVES1
);
142 TableDescriptor table
= TableDescriptorBuilder
.newBuilder(tableName
)
143 .setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(famName
)
145 .setMobThreshold(4000)
146 .setScope(HConstants
.REPLICATION_SCOPE_GLOBAL
).build())
147 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of(noRepfamName
)).build();
149 Connection connection3
= ConnectionFactory
.createConnection(CONF3
);
150 try (Admin admin3
= connection3
.getAdmin()) {
151 admin3
.createTable(table
, HBaseTestingUtil
.KEYS_FOR_HBA_CREATE_TABLE
);
153 UTIL3
.waitUntilAllRegionsAssigned(tableName
);
154 htable3
= connection3
.getTable(tableName
);
159 public void setUpBase() throws Exception
{
160 //"super.setUpBase()" already sets replication from 1->2,
161 //then on the subsequent lines, sets 2->1, 2->3 and 3->2.
162 //So we have following topology: "1 <-> 2 <->3"
164 ReplicationPeerConfig peer1Config
= getPeerConfigForCluster(UTIL1
);
165 ReplicationPeerConfig peer2Config
= getPeerConfigForCluster(UTIL2
);
166 ReplicationPeerConfig peer3Config
= getPeerConfigForCluster(UTIL3
);
167 //adds cluster1 as a remote peer on cluster2
168 UTIL2
.getAdmin().addReplicationPeer(PEER_ID1
, peer1Config
);
169 //adds cluster3 as a remote peer on cluster2
170 UTIL2
.getAdmin().addReplicationPeer(PEER_ID3
, peer3Config
);
171 //adds cluster2 as a remote peer on cluster3
172 UTIL3
.getAdmin().addReplicationPeer(PEER_ID2
, peer2Config
);
173 setupCoprocessor(UTIL1
);
174 setupCoprocessor(UTIL2
);
175 setupCoprocessor(UTIL3
);
176 BULK_LOADS_COUNT
= new AtomicInteger(0);
179 private ReplicationPeerConfig
getPeerConfigForCluster(HBaseTestingUtil util
) {
180 return ReplicationPeerConfig
.newBuilder()
181 .setClusterKey(util
.getClusterKey()).setSerial(isSerialPeer()).build();
184 private void setupCoprocessor(HBaseTestingUtil cluster
){
185 cluster
.getHBaseCluster().getRegions(tableName
).forEach(r
-> {
187 TestBulkLoadReplication
.BulkReplicationTestObserver cp
= r
.getCoprocessorHost().
188 findCoprocessor(TestBulkLoadReplication
.BulkReplicationTestObserver
.class);
190 r
.getCoprocessorHost().
191 load(TestBulkLoadReplication
.BulkReplicationTestObserver
.class, 0,
192 cluster
.getConfiguration());
193 cp
= r
.getCoprocessorHost().
194 findCoprocessor(TestBulkLoadReplication
.BulkReplicationTestObserver
.class);
195 cp
.clusterName
= cluster
.getClusterKey();
197 } catch (Exception e
){
198 LOG
.error(e
.getMessage(), e
);
205 public void tearDownBase() throws Exception
{
206 super.tearDownBase();
207 UTIL2
.getAdmin().removeReplicationPeer(PEER_ID1
);
208 UTIL2
.getAdmin().removeReplicationPeer(PEER_ID3
);
209 UTIL3
.getAdmin().removeReplicationPeer(PEER_ID2
);
212 protected static void setupBulkLoadConfigsForCluster(Configuration config
,
213 String clusterReplicationId
) throws Exception
{
214 config
.setBoolean(HConstants
.REPLICATION_BULKLOAD_ENABLE_KEY
, true);
215 config
.set(REPLICATION_CLUSTER_ID
, clusterReplicationId
);
216 File sourceConfigFolder
= testFolder
.newFolder(clusterReplicationId
);
217 File sourceConfigFile
= new File(sourceConfigFolder
.getAbsolutePath()
218 + "/hbase-site.xml");
219 config
.writeXml(new FileOutputStream(sourceConfigFile
));
220 config
.set(REPLICATION_CONF_DIR
, testFolder
.getRoot().getAbsolutePath());
224 public void testBulkLoadReplicationActiveActive() throws Exception
{
225 Table peer1TestTable
= UTIL1
.getConnection().getTable(TestReplicationBase
.tableName
);
226 Table peer2TestTable
= UTIL2
.getConnection().getTable(TestReplicationBase
.tableName
);
227 Table peer3TestTable
= UTIL3
.getConnection().getTable(TestReplicationBase
.tableName
);
228 byte[] row
= Bytes
.toBytes("001");
229 byte[] value
= Bytes
.toBytes("v1");
230 assertBulkLoadConditions(tableName
, row
, value
, UTIL1
, peer1TestTable
,
231 peer2TestTable
, peer3TestTable
);
232 row
= Bytes
.toBytes("002");
233 value
= Bytes
.toBytes("v2");
234 assertBulkLoadConditions(tableName
, row
, value
, UTIL2
, peer1TestTable
,
235 peer2TestTable
, peer3TestTable
);
236 row
= Bytes
.toBytes("003");
237 value
= Bytes
.toBytes("v3");
238 assertBulkLoadConditions(tableName
, row
, value
, UTIL3
, peer1TestTable
,
239 peer2TestTable
, peer3TestTable
);
240 //Additional wait to make sure no extra bulk load happens
242 //We have 3 bulk load events (1 initiated on each cluster).
243 //Each event gets 3 counts (the originator cluster, plus the two peers),
244 //so BULK_LOADS_COUNT expected value is 3 * 3 = 9.
245 assertEquals(9, BULK_LOADS_COUNT
.get());
249 protected void assertBulkLoadConditions(TableName tableName
, byte[] row
, byte[] value
,
250 HBaseTestingUtil utility
, Table
...tables
) throws Exception
{
251 BULK_LOAD_LATCH
= new CountDownLatch(3);
252 bulkLoadOnCluster(tableName
, row
, value
, utility
);
253 assertTrue(BULK_LOAD_LATCH
.await(1, TimeUnit
.MINUTES
));
254 assertTableHasValue(tables
[0], row
, value
);
255 assertTableHasValue(tables
[1], row
, value
);
256 assertTableHasValue(tables
[2], row
, value
);
259 protected void bulkLoadOnCluster(TableName tableName
, byte[] row
, byte[] value
,
260 HBaseTestingUtil cluster
) throws Exception
{
261 String bulkLoadFilePath
= createHFileForFamilies(row
, value
, cluster
.getConfiguration());
262 copyToHdfs(bulkLoadFilePath
, cluster
.getDFSCluster());
263 BulkLoadHFilesTool bulkLoadHFilesTool
= new BulkLoadHFilesTool(cluster
.getConfiguration());
264 bulkLoadHFilesTool
.bulkLoad(tableName
, BULK_LOAD_BASE_DIR
);
267 private void copyToHdfs(String bulkLoadFilePath
, MiniDFSCluster cluster
) throws Exception
{
268 Path bulkLoadDir
= new Path(BULK_LOAD_BASE_DIR
, "f");
269 cluster
.getFileSystem().mkdirs(bulkLoadDir
);
270 cluster
.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath
), bulkLoadDir
);
273 protected void assertTableHasValue(Table table
, byte[] row
, byte[] value
) throws Exception
{
274 Get get
= new Get(row
);
275 Result result
= table
.get(get
);
276 assertTrue(result
.advance());
277 assertEquals(Bytes
.toString(value
), Bytes
.toString(result
.value()));
280 protected void assertTableNoValue(Table table
, byte[] row
, byte[] value
) throws Exception
{
281 Get get
= new Get(row
);
282 Result result
= table
.get(get
);
283 assertTrue(result
.isEmpty());
286 private String
createHFileForFamilies(byte[] row
, byte[] value
,
287 Configuration clusterConfig
) throws IOException
{
288 CellBuilder cellBuilder
= CellBuilderFactory
.create(CellBuilderType
.DEEP_COPY
);
289 cellBuilder
.setRow(row
)
290 .setFamily(TestReplicationBase
.famName
)
291 .setQualifier(Bytes
.toBytes("1"))
293 .setType(Cell
.Type
.Put
);
295 HFile
.WriterFactory hFileFactory
= HFile
.getWriterFactoryNoCache(clusterConfig
);
296 // TODO We need a way to do this without creating files
297 File hFileLocation
= testFolder
.newFile();
298 FSDataOutputStream out
=
299 new FSDataOutputStream(new FileOutputStream(hFileLocation
), null);
301 hFileFactory
.withOutputStream(out
);
302 hFileFactory
.withFileContext(new HFileContextBuilder().build());
303 HFile
.Writer writer
= hFileFactory
.create();
305 writer
.append(new KeyValue(cellBuilder
.build()));
312 return hFileLocation
.getAbsoluteFile().getAbsolutePath();
315 public static class BulkReplicationTestObserver
implements RegionCoprocessor
{
318 AtomicInteger bulkLoadCounts
= new AtomicInteger();
321 public Optional
<RegionObserver
> getRegionObserver() {
322 return Optional
.of(new RegionObserver() {
325 public void postBulkLoadHFile(ObserverContext
<RegionCoprocessorEnvironment
> ctx
,
326 List
<Pair
<byte[], String
>> stagingFamilyPaths
, Map
<byte[], List
<Path
>> finalPaths
)
328 BULK_LOAD_LATCH
.countDown();
329 BULK_LOADS_COUNT
.incrementAndGet();
330 LOG
.debug("Another file bulk loaded. Total for {}: {}", clusterName
,
331 bulkLoadCounts
.addAndGet(1));