/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.File;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

48 @Category({ ClientTests
.class, MediumTests
.class })
49 public class TestSeparateClientZKCluster
{
50 private static final Logger LOG
= LoggerFactory
.getLogger(TestSeparateClientZKCluster
.class);
51 private static final HBaseTestingUtility TEST_UTIL
= new HBaseTestingUtility();
52 private static final File clientZkDir
=
53 new File(TEST_UTIL
.getDataTestDir("TestSeparateClientZKCluster").toString());
54 private static final int ZK_SESSION_TIMEOUT
= 5000;
55 private static MiniZooKeeperCluster clientZkCluster
;
57 private final byte[] family
= Bytes
.toBytes("cf");
58 private final byte[] qualifier
= Bytes
.toBytes("c1");
59 private final byte[] row
= Bytes
.toBytes("row");
60 private final byte[] value
= Bytes
.toBytes("v1");
61 private final byte[] newVal
= Bytes
.toBytes("v2");
64 public TestName name
= new TestName();
67 public static final HBaseClassTestRule CLASS_RULE
=
68 HBaseClassTestRule
.forClass(TestSeparateClientZKCluster
.class);
71 public static void beforeAllTests() throws Exception
{
72 int clientZkPort
= 21828;
73 clientZkCluster
= new MiniZooKeeperCluster(TEST_UTIL
.getConfiguration());
74 clientZkCluster
.setDefaultClientPort(clientZkPort
);
75 clientZkCluster
.startup(clientZkDir
);
76 // reduce the retry number and start log counter
77 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
, 2);
78 TEST_UTIL
.getConfiguration().setInt("hbase.client.start.log.errors.counter", -1);
79 TEST_UTIL
.getConfiguration().setInt("zookeeper.recovery.retry", 1);
80 // core settings for testing client ZK cluster
81 TEST_UTIL
.getConfiguration().set(HConstants
.CLIENT_ZOOKEEPER_QUORUM
, HConstants
.LOCALHOST
);
82 TEST_UTIL
.getConfiguration().setInt(HConstants
.CLIENT_ZOOKEEPER_CLIENT_PORT
, clientZkPort
);
83 // reduce zk session timeout to easier trigger session expiration
84 TEST_UTIL
.getConfiguration().setInt(HConstants
.ZK_SESSION_TIMEOUT
, ZK_SESSION_TIMEOUT
);
85 // Start a cluster with 2 masters and 3 regionservers.
86 StartMiniClusterOption option
= StartMiniClusterOption
.builder()
87 .numMasters(2).numRegionServers(3).numDataNodes(3).build();
88 TEST_UTIL
.startMiniCluster(option
);
92 public static void afterAllTests() throws Exception
{
93 TEST_UTIL
.shutdownMiniCluster();
94 clientZkCluster
.shutdown();
95 FileUtils
.deleteDirectory(clientZkDir
);
99 public void testBasicOperation() throws Exception
{
100 TableName tn
= TableName
.valueOf(name
.getMethodName());
102 Connection conn
= TEST_UTIL
.getConnection();
103 try (Admin admin
= conn
.getAdmin(); Table table
= conn
.getTable(tn
)) {
104 ColumnFamilyDescriptorBuilder cfDescBuilder
=
105 ColumnFamilyDescriptorBuilder
.newBuilder(family
);
106 TableDescriptorBuilder tableDescBuilder
=
107 TableDescriptorBuilder
.newBuilder(tn
).setColumnFamily(cfDescBuilder
.build());
108 admin
.createTable(tableDescBuilder
.build());
109 // test simple get and put
110 Put put
= new Put(row
);
111 put
.addColumn(family
, qualifier
, value
);
113 Get get
= new Get(row
);
114 Result result
= table
.get(get
);
115 LOG
.debug("Result: " + Bytes
.toString(result
.getValue(family
, qualifier
)));
116 Assert
.assertArrayEquals(value
, result
.getValue(family
, qualifier
));
121 public void testMasterSwitch() throws Exception
{
122 // get an admin instance and issue some request first
123 Connection conn
= TEST_UTIL
.getConnection();
124 try (Admin admin
= conn
.getAdmin()) {
125 LOG
.debug("Tables: " + admin
.listTableDescriptors());
126 MiniHBaseCluster cluster
= TEST_UTIL
.getHBaseCluster();
127 // switch active master
128 HMaster master
= cluster
.getMaster();
130 LOG
.info("Stopped master {}", master
.getServerName());
131 while (!master
.isShutDown()) {
134 LOG
.info("Shutdown master {}", master
.getServerName());
135 while (cluster
.getMaster() == null || !cluster
.getMaster().isInitialized()) {
136 LOG
.info("Get master {}", cluster
.getMaster() == null?
"null":
137 cluster
.getMaster().getServerName());
140 LOG
.info("Got master {}", cluster
.getMaster().getServerName());
141 // confirm client access still works
142 Assert
.assertTrue(admin
.balance(false));
147 public void testMetaRegionMove() throws Exception
{
148 TableName tn
= TableName
.valueOf(name
.getMethodName());
150 Connection conn
= TEST_UTIL
.getConnection();
151 try (Admin admin
= conn
.getAdmin();
152 Table table
= conn
.getTable(tn
);
153 RegionLocator locator
= conn
.getRegionLocator(tn
)) {
154 MiniHBaseCluster cluster
= TEST_UTIL
.getHBaseCluster();
155 ColumnFamilyDescriptorBuilder cfDescBuilder
=
156 ColumnFamilyDescriptorBuilder
.newBuilder(family
);
157 TableDescriptorBuilder tableDescBuilder
=
158 TableDescriptorBuilder
.newBuilder(tn
).setColumnFamily(cfDescBuilder
.build());
159 admin
.createTable(tableDescBuilder
.build());
160 // issue some requests to cache the region location
161 Put put
= new Put(row
);
162 put
.addColumn(family
, qualifier
, value
);
164 Get get
= new Get(row
);
165 Result result
= table
.get(get
);
166 // move meta region and confirm client could detect
167 ServerName destServerName
= null;
168 for (RegionServerThread rst
: cluster
.getLiveRegionServerThreads()) {
169 ServerName name
= rst
.getRegionServer().getServerName();
170 if (!name
.equals(cluster
.getServerHoldingMeta())) {
171 destServerName
= name
;
175 admin
.move(RegionInfoBuilder
.FIRST_META_REGIONINFO
.getEncodedNameAsBytes(), destServerName
);
176 LOG
.debug("Finished moving meta");
177 // invalidate client cache
178 RegionInfo region
= locator
.getRegionLocation(row
).getRegion();
179 ServerName currentServer
= cluster
.getServerHoldingRegion(tn
, region
.getRegionName());
180 for (RegionServerThread rst
: cluster
.getLiveRegionServerThreads()) {
181 ServerName name
= rst
.getRegionServer().getServerName();
182 if (!name
.equals(currentServer
)) {
183 destServerName
= name
;
187 admin
.move(region
.getEncodedNameAsBytes(), destServerName
);
188 LOG
.debug("Finished moving user region");
190 put
.addColumn(family
, qualifier
, newVal
);
192 result
= table
.get(get
);
193 LOG
.debug("Result: " + Bytes
.toString(result
.getValue(family
, qualifier
)));
194 Assert
.assertArrayEquals(newVal
, result
.getValue(family
, qualifier
));
199 public void testMetaMoveDuringClientZkClusterRestart() throws Exception
{
200 TableName tn
= TableName
.valueOf(name
.getMethodName());
202 Connection conn
= TEST_UTIL
.getConnection();
203 try (Admin admin
= conn
.getAdmin(); Table table
= conn
.getTable(tn
)) {
204 ColumnFamilyDescriptorBuilder cfDescBuilder
=
205 ColumnFamilyDescriptorBuilder
.newBuilder(family
);
206 TableDescriptorBuilder tableDescBuilder
=
207 TableDescriptorBuilder
.newBuilder(tn
).setColumnFamily(cfDescBuilder
.build());
208 admin
.createTable(tableDescBuilder
.build());
210 Put put
= new Put(row
);
211 put
.addColumn(family
, qualifier
, value
);
213 // invalid connection cache
214 conn
.clearRegionLocationCache();
215 // stop client zk cluster
216 clientZkCluster
.shutdown();
217 // stop current meta server and confirm the server shutdown process
218 // is not affected by client ZK crash
219 MiniHBaseCluster cluster
= TEST_UTIL
.getHBaseCluster();
220 int metaServerId
= cluster
.getServerWithMeta();
221 HRegionServer metaServer
= cluster
.getRegionServer(metaServerId
);
222 metaServer
.stop("Stop current RS holding meta region");
223 while (!metaServer
.isShutDown()) {
226 // wait for meta region online
227 AssignmentTestingUtil
.waitForAssignment(cluster
.getMaster().getAssignmentManager(),
228 RegionInfoBuilder
.FIRST_META_REGIONINFO
);
229 // wait some long time to make sure we will retry sync data to client ZK until data set
231 clientZkCluster
.startup(clientZkDir
);
232 // new request should pass
233 Get get
= new Get(row
);
234 Result result
= table
.get(get
);
235 LOG
.debug("Result: " + Bytes
.toString(result
.getValue(family
, qualifier
)));
236 Assert
.assertArrayEquals(value
, result
.getValue(family
, qualifier
));
241 public void testAsyncTable() throws Exception
{
242 TableName tn
= TableName
.valueOf(name
.getMethodName());
243 ColumnFamilyDescriptorBuilder cfDescBuilder
= ColumnFamilyDescriptorBuilder
.newBuilder(family
);
244 TableDescriptorBuilder tableDescBuilder
=
245 TableDescriptorBuilder
.newBuilder(tn
).setColumnFamily(cfDescBuilder
.build());
246 try (AsyncConnection ASYNC_CONN
=
247 ConnectionFactory
.createAsyncConnection(TEST_UTIL
.getConfiguration()).get()) {
248 ASYNC_CONN
.getAdmin().createTable(tableDescBuilder
.build()).get();
249 AsyncTable
<?
> table
= ASYNC_CONN
.getTable(tn
);
251 Put put
= new Put(row
);
252 put
.addColumn(family
, qualifier
, value
);
253 table
.put(put
).get();
255 Get get
= new Get(row
);
256 Result result
= table
.get(get
).get();
257 LOG
.debug("Result: " + Bytes
.toString(result
.getValue(family
, qualifier
)));
258 Assert
.assertArrayEquals(value
, result
.getValue(family
, qualifier
));