/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.client
;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

51 @Category({ ClientTests
.class, MediumTests
.class })
52 public class TestSeparateClientZKCluster
{
53 private static final Logger LOG
= LoggerFactory
.getLogger(TestSeparateClientZKCluster
.class);
54 private static final HBaseTestingUtil TEST_UTIL
= new HBaseTestingUtil();
55 private static final File clientZkDir
=
56 new File(TEST_UTIL
.getDataTestDir("TestSeparateClientZKCluster").toString());
57 private static final int ZK_SESSION_TIMEOUT
= 5000;
58 private static MiniZooKeeperCluster clientZkCluster
;
60 private final byte[] family
= Bytes
.toBytes("cf");
61 private final byte[] qualifier
= Bytes
.toBytes("c1");
62 private final byte[] row
= Bytes
.toBytes("row");
63 private final byte[] value
= Bytes
.toBytes("v1");
64 private final byte[] newVal
= Bytes
.toBytes("v2");
67 public TableNameTestRule name
= new TableNameTestRule();
70 public static final HBaseClassTestRule CLASS_RULE
=
71 HBaseClassTestRule
.forClass(TestSeparateClientZKCluster
.class);
74 public static void beforeAllTests() throws Exception
{
75 int clientZkPort
= 21828;
76 clientZkCluster
= new MiniZooKeeperCluster(TEST_UTIL
.getConfiguration());
77 clientZkCluster
.setDefaultClientPort(clientZkPort
);
78 clientZkCluster
.startup(clientZkDir
);
79 // reduce the retry number and start log counter
80 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
, 2);
81 TEST_UTIL
.getConfiguration().setInt("hbase.client.start.log.errors.counter", -1);
82 TEST_UTIL
.getConfiguration().setInt("zookeeper.recovery.retry", 1);
83 // core settings for testing client ZK cluster
84 TEST_UTIL
.getConfiguration().setClass(HConstants
.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY
,
85 ZKConnectionRegistry
.class, ConnectionRegistry
.class);
86 TEST_UTIL
.getConfiguration().set(HConstants
.CLIENT_ZOOKEEPER_QUORUM
, HConstants
.LOCALHOST
);
87 TEST_UTIL
.getConfiguration().setInt(HConstants
.CLIENT_ZOOKEEPER_CLIENT_PORT
, clientZkPort
);
88 // reduce zk session timeout to easier trigger session expiration
89 TEST_UTIL
.getConfiguration().setInt(HConstants
.ZK_SESSION_TIMEOUT
, ZK_SESSION_TIMEOUT
);
90 // Start a cluster with 2 masters and 3 regionservers.
91 StartTestingClusterOption option
=
92 StartTestingClusterOption
.builder().numMasters(2).numRegionServers(3).numDataNodes(3).build();
93 TEST_UTIL
.startMiniCluster(option
);
97 public static void afterAllTests() throws Exception
{
98 TEST_UTIL
.shutdownMiniCluster();
99 clientZkCluster
.shutdown();
100 FileUtils
.deleteDirectory(clientZkDir
);
104 public void testBasicOperation() throws Exception
{
105 TableName tn
= name
.getTableName();
107 Connection conn
= TEST_UTIL
.getConnection();
108 try (Admin admin
= conn
.getAdmin(); Table table
= conn
.getTable(tn
)) {
109 ColumnFamilyDescriptorBuilder cfDescBuilder
=
110 ColumnFamilyDescriptorBuilder
.newBuilder(family
);
111 TableDescriptorBuilder tableDescBuilder
=
112 TableDescriptorBuilder
.newBuilder(tn
).setColumnFamily(cfDescBuilder
.build());
113 admin
.createTable(tableDescBuilder
.build());
114 // test simple get and put
115 Put put
= new Put(row
);
116 put
.addColumn(family
, qualifier
, value
);
118 Get get
= new Get(row
);
119 Result result
= table
.get(get
);
120 LOG
.debug("Result: " + Bytes
.toString(result
.getValue(family
, qualifier
)));
121 assertArrayEquals(value
, result
.getValue(family
, qualifier
));
126 public void testMasterSwitch() throws Exception
{
127 // get an admin instance and issue some request first
128 Connection conn
= TEST_UTIL
.getConnection();
129 try (Admin admin
= conn
.getAdmin()) {
130 LOG
.debug("Tables: " + admin
.listTableDescriptors());
131 SingleProcessHBaseCluster cluster
= TEST_UTIL
.getHBaseCluster();
132 // switch active master
133 HMaster master
= cluster
.getMaster();
135 LOG
.info("Stopped master {}", master
.getServerName());
136 while (master
.isAlive()) {
139 LOG
.info("Shutdown master {}", master
.getServerName());
140 while (cluster
.getMaster() == null || !cluster
.getMaster().isInitialized()) {
141 LOG
.info("Get master {}",
142 cluster
.getMaster() == null ?
"null" : cluster
.getMaster().getServerName());
145 LOG
.info("Got master {}", cluster
.getMaster().getServerName());
146 // confirm client access still works
147 assertTrue(admin
.balance(BalanceRequest
.defaultInstance()).isBalancerRan());
152 public void testMetaRegionMove() throws Exception
{
153 TableName tn
= name
.getTableName();
155 Connection conn
= TEST_UTIL
.getConnection();
156 try (Admin admin
= conn
.getAdmin();
157 Table table
= conn
.getTable(tn
);
158 RegionLocator locator
= conn
.getRegionLocator(tn
)) {
159 SingleProcessHBaseCluster cluster
= TEST_UTIL
.getHBaseCluster();
160 ColumnFamilyDescriptorBuilder cfDescBuilder
=
161 ColumnFamilyDescriptorBuilder
.newBuilder(family
);
162 TableDescriptorBuilder tableDescBuilder
=
163 TableDescriptorBuilder
.newBuilder(tn
).setColumnFamily(cfDescBuilder
.build());
164 admin
.createTable(tableDescBuilder
.build());
165 // issue some requests to cache the region location
166 Put put
= new Put(row
);
167 put
.addColumn(family
, qualifier
, value
);
169 Get get
= new Get(row
);
170 Result result
= table
.get(get
);
171 // move meta region and confirm client could detect
172 ServerName destServerName
= null;
173 for (RegionServerThread rst
: cluster
.getLiveRegionServerThreads()) {
174 ServerName name
= rst
.getRegionServer().getServerName();
175 if (!name
.equals(cluster
.getServerHoldingMeta())) {
176 destServerName
= name
;
180 admin
.move(RegionInfoBuilder
.FIRST_META_REGIONINFO
.getEncodedNameAsBytes(), destServerName
);
181 LOG
.debug("Finished moving meta");
182 // invalidate client cache
183 RegionInfo region
= locator
.getRegionLocation(row
).getRegion();
184 ServerName currentServer
= cluster
.getServerHoldingRegion(tn
, region
.getRegionName());
185 for (RegionServerThread rst
: cluster
.getLiveRegionServerThreads()) {
186 ServerName name
= rst
.getRegionServer().getServerName();
187 if (!name
.equals(currentServer
)) {
188 destServerName
= name
;
192 admin
.move(region
.getEncodedNameAsBytes(), destServerName
);
193 LOG
.debug("Finished moving user region");
195 put
.addColumn(family
, qualifier
, newVal
);
197 result
= table
.get(get
);
198 LOG
.debug("Result: " + Bytes
.toString(result
.getValue(family
, qualifier
)));
199 assertArrayEquals(newVal
, result
.getValue(family
, qualifier
));
204 public void testMetaMoveDuringClientZkClusterRestart() throws Exception
{
205 TableName tn
= name
.getTableName();
207 Connection conn
= TEST_UTIL
.getConnection();
208 try (Admin admin
= conn
.getAdmin(); Table table
= conn
.getTable(tn
)) {
209 ColumnFamilyDescriptorBuilder cfDescBuilder
=
210 ColumnFamilyDescriptorBuilder
.newBuilder(family
);
211 TableDescriptorBuilder tableDescBuilder
=
212 TableDescriptorBuilder
.newBuilder(tn
).setColumnFamily(cfDescBuilder
.build());
213 admin
.createTable(tableDescBuilder
.build());
215 Put put
= new Put(row
);
216 put
.addColumn(family
, qualifier
, value
);
218 // invalid connection cache
219 conn
.clearRegionLocationCache();
220 // stop client zk cluster
221 clientZkCluster
.shutdown();
222 // stop current meta server and confirm the server shutdown process
223 // is not affected by client ZK crash
224 SingleProcessHBaseCluster cluster
= TEST_UTIL
.getHBaseCluster();
225 int metaServerId
= cluster
.getServerWithMeta();
226 HRegionServer metaServer
= cluster
.getRegionServer(metaServerId
);
227 metaServer
.stop("Stop current RS holding meta region");
228 while (metaServer
.isAlive()) {
231 // wait for meta region online
232 AssignmentTestingUtil
.waitForAssignment(cluster
.getMaster().getAssignmentManager(),
233 RegionInfoBuilder
.FIRST_META_REGIONINFO
);
234 // wait some long time to make sure we will retry sync data to client ZK until data set
236 clientZkCluster
.startup(clientZkDir
);
237 // new request should pass
238 Get get
= new Get(row
);
239 Result result
= table
.get(get
);
240 LOG
.debug("Result: " + Bytes
.toString(result
.getValue(family
, qualifier
)));
241 assertArrayEquals(value
, result
.getValue(family
, qualifier
));
246 public void testAsyncTable() throws Exception
{
247 TableName tn
= name
.getTableName();
248 ColumnFamilyDescriptorBuilder cfDescBuilder
= ColumnFamilyDescriptorBuilder
.newBuilder(family
);
249 TableDescriptorBuilder tableDescBuilder
=
250 TableDescriptorBuilder
.newBuilder(tn
).setColumnFamily(cfDescBuilder
.build());
251 try (AsyncConnection ASYNC_CONN
=
252 ConnectionFactory
.createAsyncConnection(TEST_UTIL
.getConfiguration()).get()) {
253 ASYNC_CONN
.getAdmin().createTable(tableDescBuilder
.build()).get();
254 AsyncTable
<?
> table
= ASYNC_CONN
.getTable(tn
);
256 Put put
= new Put(row
);
257 put
.addColumn(family
, qualifier
, value
);
258 table
.put(put
).get();
260 Get get
= new Get(row
);
261 Result result
= table
.get(get
).get();
262 LOG
.debug("Result: " + Bytes
.toString(result
.getValue(family
, qualifier
)));
263 assertArrayEquals(value
, result
.getValue(family
, qualifier
));
268 public void testChangeMetaReplicaCount() throws Exception
{
269 Admin admin
= TEST_UTIL
.getAdmin();
270 try (RegionLocator locator
=
271 TEST_UTIL
.getConnection().getRegionLocator(TableName
.META_TABLE_NAME
)) {
272 assertEquals(1, locator
.getAllRegionLocations().size());
273 HBaseTestingUtil
.setReplicas(admin
, TableName
.META_TABLE_NAME
, 3);
274 TEST_UTIL
.waitFor(30000, () -> locator
.getAllRegionLocations().size() == 3);
275 HBaseTestingUtil
.setReplicas(admin
, TableName
.META_TABLE_NAME
, 2);
276 TEST_UTIL
.waitFor(30000, () -> locator
.getAllRegionLocations().size() == 2);
277 HBaseTestingUtil
.setReplicas(admin
, TableName
.META_TABLE_NAME
, 1);
278 TEST_UTIL
.waitFor(30000, () -> locator
.getAllRegionLocations().size() == 1);