/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.client
;
import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ForkJoinPool;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
55 * Class to test asynchronous replication admin operations when more than 1 cluster
57 @RunWith(Parameterized
.class)
58 @Category({LargeTests
.class, ClientTests
.class})
59 public class TestAsyncReplicationAdminApiWithClusters
extends TestAsyncAdminBase
{
62 public static final HBaseClassTestRule CLASS_RULE
=
63 HBaseClassTestRule
.forClass(TestAsyncReplicationAdminApiWithClusters
.class);
65 private final static String ID_SECOND
= "2";
67 private static HBaseTestingUtil TEST_UTIL2
;
68 private static Configuration conf2
;
69 private static AsyncAdmin admin2
;
70 private static AsyncConnection connection
;
73 public static void setUpBeforeClass() throws Exception
{
74 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_RPC_TIMEOUT_KEY
, 60000);
75 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_CLIENT_OPERATION_TIMEOUT
, 120000);
76 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
, 2);
77 TEST_UTIL
.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY
, 0);
78 TEST_UTIL
.startMiniCluster();
79 ASYNC_CONN
= ConnectionFactory
.createAsyncConnection(TEST_UTIL
.getConfiguration()).get();
81 conf2
= HBaseConfiguration
.create(TEST_UTIL
.getConfiguration());
82 conf2
.set(HConstants
.ZOOKEEPER_ZNODE_PARENT
, "/2");
83 TEST_UTIL2
= new HBaseTestingUtil(conf2
);
84 TEST_UTIL2
.startMiniCluster();
87 ConnectionFactory
.createAsyncConnection(TEST_UTIL2
.getConfiguration()).get();
88 admin2
= connection
.getAdmin();
90 ReplicationPeerConfig rpc
= ReplicationPeerConfig
.newBuilder()
91 .setClusterKey(TEST_UTIL2
.getClusterKey()).build();
92 ASYNC_CONN
.getAdmin().addReplicationPeer(ID_SECOND
, rpc
).join();
96 public static void clearUp() throws IOException
{
102 public void tearDown() throws Exception
{
103 Pattern pattern
= Pattern
.compile(tableName
.getNameAsString() + ".*");
104 cleanupTables(admin
, pattern
);
105 cleanupTables(admin2
, pattern
);
108 private void cleanupTables(AsyncAdmin admin
, Pattern pattern
) {
109 admin
.listTableNames(pattern
, false).whenCompleteAsync((tables
, err
) -> {
110 if (tables
!= null) {
111 tables
.forEach(table
-> {
113 admin
.disableTable(table
).join();
114 } catch (Exception e
) {
115 LOG
.debug("Table: " + tableName
+ " already disabled, so just deleting it.");
117 admin
.deleteTable(table
).join();
120 }, ForkJoinPool
.commonPool()).join();
123 private void createTableWithDefaultConf(AsyncAdmin admin
, TableName tableName
) {
124 TableDescriptorBuilder builder
= TableDescriptorBuilder
.newBuilder(tableName
);
125 builder
.setColumnFamily(ColumnFamilyDescriptorBuilder
.of(FAMILY
));
126 admin
.createTable(builder
.build()).join();
130 public void testEnableAndDisableTableReplication() throws Exception
{
131 // default replication scope is local
132 createTableWithDefaultConf(tableName
);
133 admin
.enableTableReplication(tableName
).join();
134 TableDescriptor tableDesc
= admin
.getDescriptor(tableName
).get();
135 for (ColumnFamilyDescriptor fam
: tableDesc
.getColumnFamilies()) {
136 assertEquals(HConstants
.REPLICATION_SCOPE_GLOBAL
, fam
.getScope());
139 admin
.disableTableReplication(tableName
).join();
140 tableDesc
= admin
.getDescriptor(tableName
).get();
141 for (ColumnFamilyDescriptor fam
: tableDesc
.getColumnFamilies()) {
142 assertEquals(HConstants
.REPLICATION_SCOPE_LOCAL
, fam
.getScope());
147 public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws Exception
{
148 // Only create table in source cluster
149 createTableWithDefaultConf(tableName
);
150 assertFalse(admin2
.tableExists(tableName
).get());
151 admin
.enableTableReplication(tableName
).join();
152 assertTrue(admin2
.tableExists(tableName
).get());
156 public void testEnableReplicationWhenTableDescriptorIsNotSameInClusters() throws Exception
{
157 createTableWithDefaultConf(admin
, tableName
);
158 createTableWithDefaultConf(admin2
, tableName
);
159 TableDescriptorBuilder builder
=
160 TableDescriptorBuilder
.newBuilder(admin
.getDescriptor(tableName
).get());
161 builder
.setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(Bytes
.toBytes("newFamily"))
163 admin2
.disableTable(tableName
).join();
164 admin2
.modifyTable(builder
.build()).join();
165 admin2
.enableTable(tableName
).join();
168 admin
.enableTableReplication(tableName
).join();
169 fail("Exception should be thrown if table descriptors in the clusters are not same.");
170 } catch (Exception ignored
) {
174 admin
.disableTable(tableName
).join();
175 admin
.modifyTable(builder
.build()).join();
176 admin
.enableTable(tableName
).join();
177 admin
.enableTableReplication(tableName
).join();
178 TableDescriptor tableDesc
= admin
.getDescriptor(tableName
).get();
179 for (ColumnFamilyDescriptor fam
: tableDesc
.getColumnFamilies()) {
180 assertEquals(HConstants
.REPLICATION_SCOPE_GLOBAL
, fam
.getScope());
185 public void testDisableReplicationForNonExistingTable() throws Exception
{
187 admin
.disableTableReplication(tableName
).join();
188 } catch (CompletionException e
) {
189 assertTrue(e
.getCause() instanceof TableNotFoundException
);
194 public void testEnableReplicationForNonExistingTable() throws Exception
{
196 admin
.enableTableReplication(tableName
).join();
197 } catch (CompletionException e
) {
198 assertTrue(e
.getCause() instanceof TableNotFoundException
);
203 public void testDisableReplicationWhenTableNameAsNull() throws Exception
{
205 admin
.disableTableReplication(null).join();
206 } catch (CompletionException e
) {
207 assertTrue(e
.getCause() instanceof IllegalArgumentException
);
212 public void testEnableReplicationWhenTableNameAsNull() throws Exception
{
214 admin
.enableTableReplication(null).join();
215 } catch (CompletionException e
) {
216 assertTrue(e
.getCause() instanceof IllegalArgumentException
);
221 * Test enable table replication should create table only in user explicit specified table-cfs.
225 public void testEnableReplicationForExplicitSetTableCfs() throws Exception
{
226 TableName tableName2
= TableName
.valueOf(tableName
.getNameAsString() + "2");
227 // Only create table in source cluster
228 createTableWithDefaultConf(tableName
);
229 createTableWithDefaultConf(tableName2
);
230 assertFalse("Table should not exists in the peer cluster",
231 admin2
.tableExists(tableName
).get());
232 assertFalse("Table should not exists in the peer cluster",
233 admin2
.tableExists(tableName2
).get());
235 Map
<TableName
, List
<String
>> tableCfs
= new HashMap
<>();
236 tableCfs
.put(tableName
, null);
237 ReplicationPeerConfigBuilder rpcBuilder
= ReplicationPeerConfig
238 .newBuilder(admin
.getReplicationPeerConfig(ID_SECOND
).get())
239 .setReplicateAllUserTables(false)
240 .setTableCFsMap(tableCfs
);
242 // Only add tableName to replication peer config
243 admin
.updateReplicationPeerConfig(ID_SECOND
, rpcBuilder
.build()).join();
244 admin
.enableTableReplication(tableName2
).join();
245 assertFalse("Table should not be created if user has set table cfs explicitly for the "
246 + "peer and this is not part of that collection", admin2
.tableExists(tableName2
).get());
248 // Add tableName2 to replication peer config, too
249 tableCfs
.put(tableName2
, null);
250 rpcBuilder
.setTableCFsMap(tableCfs
);
251 admin
.updateReplicationPeerConfig(ID_SECOND
, rpcBuilder
.build()).join();
252 admin
.enableTableReplication(tableName2
).join();
254 "Table should be created if user has explicitly added table into table cfs collection",
255 admin2
.tableExists(tableName2
).get());
257 rpcBuilder
.setTableCFsMap(null).setReplicateAllUserTables(true).build();
258 admin
.updateReplicationPeerConfig(ID_SECOND
, rpcBuilder
.build()).join();