2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.apache
.hadoop
.hbase
.client
.AsyncConnectionConfiguration
.START_LOG_ERRORS_AFTER_COUNT_KEY
;
21 import static org
.junit
.Assert
.assertEquals
;
22 import static org
.junit
.Assert
.assertTrue
;
24 import java
.io
.IOException
;
25 import java
.util
.ArrayList
;
26 import java
.util
.Collection
;
27 import java
.util
.Collections
;
28 import java
.util
.EnumSet
;
29 import java
.util
.List
;
31 import org
.apache
.hadoop
.conf
.Configuration
;
32 import org
.apache
.hadoop
.hbase
.ClusterMetrics
;
33 import org
.apache
.hadoop
.hbase
.ClusterMetrics
.Option
;
34 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
35 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
36 import org
.apache
.hadoop
.hbase
.HConstants
;
37 import org
.apache
.hadoop
.hbase
.RegionMetrics
;
38 import org
.apache
.hadoop
.hbase
.ServerMetrics
;
39 import org
.apache
.hadoop
.hbase
.ServerName
;
40 import org
.apache
.hadoop
.hbase
.TableName
;
41 import org
.apache
.hadoop
.hbase
.regionserver
.HRegion
;
42 import org
.apache
.hadoop
.hbase
.regionserver
.HRegionServer
;
43 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
44 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
45 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
46 import org
.apache
.hadoop
.hbase
.wal
.AbstractFSWALProvider
;
47 import org
.junit
.BeforeClass
;
48 import org
.junit
.ClassRule
;
49 import org
.junit
.Test
;
50 import org
.junit
.experimental
.categories
.Category
;
51 import org
.junit
.runner
.RunWith
;
52 import org
.junit
.runners
.Parameterized
;
54 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.collect
.Lists
;
55 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.collect
.Maps
;
57 @RunWith(Parameterized
.class)
58 @Category({ ClientTests
.class, LargeTests
.class })
59 public class TestAsyncClusterAdminApi
extends TestAsyncAdminBase
{
62 public static final HBaseClassTestRule CLASS_RULE
=
63 HBaseClassTestRule
.forClass(TestAsyncClusterAdminApi
.class);
66 public static void setUpBeforeClass() throws Exception
{
68 setUpConfigurationFiles(TEST_UTIL
);
69 TEST_UTIL
.getConfiguration().setInt(HConstants
.MASTER_INFO_PORT
, 0);
70 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_RPC_TIMEOUT_KEY
, 60000);
71 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_CLIENT_OPERATION_TIMEOUT
, 120000);
72 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
, 2);
73 TEST_UTIL
.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY
, 0);
75 TEST_UTIL
.startMiniCluster(2);
76 ASYNC_CONN
= ConnectionFactory
.createAsyncConnection(TEST_UTIL
.getConfiguration()).get();
77 addResourceToRegionServerConfiguration(TEST_UTIL
);
81 public void testGetMasterInfoPort() throws Exception
{
82 assertEquals(TEST_UTIL
.getHBaseCluster().getMaster().getInfoServer().getPort(), (int) admin
83 .getMasterInfoPort().get());
87 public void testRegionServerOnlineConfigChange() throws Exception
{
88 replaceHBaseSiteXML();
89 admin
.getRegionServers().get().forEach(server
-> admin
.updateConfiguration(server
).join());
91 // Check the configuration of the RegionServers
92 TEST_UTIL
.getMiniHBaseCluster().getRegionServerThreads().forEach(thread
-> {
93 Configuration conf
= thread
.getRegionServer().getConfiguration();
94 assertEquals(1000, conf
.getInt("hbase.custom.config", 0));
97 restoreHBaseSiteXML();
101 public void testMasterOnlineConfigChange() throws Exception
{
102 replaceHBaseSiteXML();
103 ServerName master
= admin
.getMaster().get();
104 admin
.updateConfiguration(master
).join();
105 admin
.getBackupMasters().get()
106 .forEach(backupMaster
-> admin
.updateConfiguration(backupMaster
).join());
108 // Check the configuration of the Masters
109 TEST_UTIL
.getMiniHBaseCluster().getMasterThreads().forEach(thread
-> {
110 Configuration conf
= thread
.getMaster().getConfiguration();
111 assertEquals(1000, conf
.getInt("hbase.custom.config", 0));
114 restoreHBaseSiteXML();
118 public void testAllClusterOnlineConfigChange() throws IOException
{
119 replaceHBaseSiteXML();
120 admin
.updateConfiguration().join();
122 // Check the configuration of the Masters
123 TEST_UTIL
.getMiniHBaseCluster().getMasterThreads().forEach(thread
-> {
124 Configuration conf
= thread
.getMaster().getConfiguration();
125 assertEquals(1000, conf
.getInt("hbase.custom.config", 0));
128 // Check the configuration of the RegionServers
129 TEST_UTIL
.getMiniHBaseCluster().getRegionServerThreads().forEach(thread
-> {
130 Configuration conf
= thread
.getRegionServer().getConfiguration();
131 assertEquals(1000, conf
.getInt("hbase.custom.config", 0));
134 restoreHBaseSiteXML();
138 public void testRollWALWALWriter() throws Exception
{
139 setUpforLogRolling();
140 String className
= this.getClass().getName();
141 StringBuilder v
= new StringBuilder(className
);
142 while (v
.length() < 1000) {
145 byte[] value
= Bytes
.toBytes(v
.toString());
146 HRegionServer regionServer
= startAndWriteData(tableName
, value
);
147 LOG
.info("after writing there are "
148 + AbstractFSWALProvider
.getNumRolledLogFiles(regionServer
.getWAL(null)) + " log files");
151 for (HRegion r
: regionServer
.getOnlineRegionsLocalContext()) {
154 admin
.rollWALWriter(regionServer
.getServerName()).join();
155 int count
= AbstractFSWALProvider
.getNumRolledLogFiles(regionServer
.getWAL(null));
156 LOG
.info("after flushing all regions and rolling logs there are " +
157 count
+ " log files");
158 assertTrue(("actual count: " + count
), count
<= 2);
161 private void setUpforLogRolling() {
162 // Force a region split after every 768KB
163 TEST_UTIL
.getConfiguration().setLong(HConstants
.HREGION_MAX_FILESIZE
,
166 // We roll the log after every 32 writes
167 TEST_UTIL
.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);
169 TEST_UTIL
.getConfiguration().setInt(
170 "hbase.regionserver.logroll.errors.tolerated", 2);
171 TEST_UTIL
.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);
173 // For less frequently updated regions flush after every 2 flushes
174 TEST_UTIL
.getConfiguration().setInt(
175 "hbase.hregion.memstore.optionalflushcount", 2);
177 // We flush the cache after every 8192 bytes
178 TEST_UTIL
.getConfiguration().setInt(HConstants
.HREGION_MEMSTORE_FLUSH_SIZE
,
181 // Increase the amount of time between client retries
182 TEST_UTIL
.getConfiguration().setLong("hbase.client.pause", 10 * 1000);
184 // Reduce thread wake frequency so that other threads can get
186 TEST_UTIL
.getConfiguration().setInt(HConstants
.THREAD_WAKE_FREQUENCY
,
189 /**** configuration for testLogRollOnDatanodeDeath ****/
190 // lower the namenode & datanode heartbeat so the namenode
191 // quickly detects datanode failures
192 TEST_UTIL
.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
193 TEST_UTIL
.getConfiguration().setInt("dfs.heartbeat.interval", 1);
194 // the namenode might still try to choose the recently-dead datanode
195 // for a pipeline, so try to a new pipeline multiple times
196 TEST_UTIL
.getConfiguration().setInt("dfs.client.block.write.retries", 30);
197 TEST_UTIL
.getConfiguration().setInt(
198 "hbase.regionserver.hlog.tolerable.lowreplication", 2);
199 TEST_UTIL
.getConfiguration().setInt(
200 "hbase.regionserver.hlog.lowreplication.rolllimit", 3);
203 private HRegionServer
startAndWriteData(TableName tableName
, byte[] value
) throws Exception
{
204 createTableWithDefaultConf(tableName
);
205 AsyncTable
<?
> table
= ASYNC_CONN
.getTable(tableName
);
206 HRegionServer regionServer
= TEST_UTIL
.getRSForFirstRegionInTable(tableName
);
207 for (int i
= 1; i
<= 256; i
++) { // 256 writes should cause 8 log rolls
208 Put put
= new Put(Bytes
.toBytes("row" + String
.format("%1$04d", i
)));
209 put
.addColumn(FAMILY
, null, value
);
210 table
.put(put
).join();
212 // After every 32 writes sleep to let the log roller run
215 } catch (InterruptedException e
) {
224 public void testGetRegionLoads() throws Exception
{
225 // Turn off the balancer
226 admin
.balancerSwitch(false).join();
228 new TableName
[] { TableName
.valueOf(tableName
.getNameAsString() + "1"),
229 TableName
.valueOf(tableName
.getNameAsString() + "2"),
230 TableName
.valueOf(tableName
.getNameAsString() + "3") };
231 createAndLoadTable(tables
);
232 // Sleep to wait region server report
233 Thread
.sleep(TEST_UTIL
.getConfiguration().getInt("hbase.regionserver.msginterval", 3 * 1000) * 2);
234 // Check if regions match with the regionLoad from the server
235 Collection
<ServerName
> servers
= admin
.getRegionServers().get();
236 for (ServerName serverName
: servers
) {
237 List
<RegionInfo
> regions
= admin
.getRegions(serverName
).get();
238 checkRegionsAndRegionLoads(regions
, admin
.getRegionMetrics(serverName
).get());
241 // Check if regionLoad matches the table's regions and nothing is missed
242 for (TableName table
: tables
) {
243 List
<RegionInfo
> tableRegions
= admin
.getRegions(table
).get();
244 List
<RegionMetrics
> regionLoads
= Lists
.newArrayList();
245 for (ServerName serverName
: servers
) {
246 regionLoads
.addAll(admin
.getRegionMetrics(serverName
, table
).get());
248 checkRegionsAndRegionLoads(tableRegions
, regionLoads
);
251 // Check RegionLoad matches the regionLoad from ClusterStatus
252 ClusterMetrics clusterStatus
= admin
.getClusterMetrics(EnumSet
.of(Option
.LIVE_SERVERS
)).get();
253 assertEquals(servers
.size(), clusterStatus
.getLiveServerMetrics().size());
254 for (Map
.Entry
<ServerName
, ServerMetrics
> entry
:
255 clusterStatus
.getLiveServerMetrics().entrySet()) {
256 ServerName sn
= entry
.getKey();
257 ServerMetrics sm
= entry
.getValue();
258 compareRegionLoads(sm
.getRegionMetrics().values(), admin
.getRegionMetrics(sn
).get());
260 for (ServerName serverName
: clusterStatus
.getLiveServerMetrics().keySet()) {
261 ServerMetrics serverLoad
= clusterStatus
.getLiveServerMetrics().get(serverName
);
267 public void testGetRegionServers() throws Exception
{
268 List
<ServerName
> serverNames
= new ArrayList
<>(admin
.getRegionServers(true).get());
269 assertEquals(2, serverNames
.size());
271 List
<ServerName
> serversToDecom
= new ArrayList
<>();
272 ServerName serverToDecommission
= serverNames
.get(0);
274 serversToDecom
.add(serverToDecommission
);
275 admin
.decommissionRegionServers(serversToDecom
, false).join();
277 assertEquals(1, admin
.getRegionServers(true).get().size());
278 assertEquals(2, admin
.getRegionServers(false).get().size());
280 admin
.recommissionRegionServer(serverToDecommission
, Collections
.emptyList()).join();
282 assertEquals(2, admin
.getRegionServers(true).get().size());
283 assertEquals(2, admin
.getRegionServers(false).get().size());
286 private void compareRegionLoads(Collection
<RegionMetrics
> regionLoadCluster
,
287 Collection
<RegionMetrics
> regionLoads
) {
289 assertEquals("No of regionLoads from clusterStatus and regionloads from RS doesn't match",
290 regionLoadCluster
.size(), regionLoads
.size());
292 for (RegionMetrics loadCluster
: regionLoadCluster
) {
293 boolean matched
= false;
294 for (RegionMetrics load
: regionLoads
) {
295 if (Bytes
.equals(loadCluster
.getRegionName(), load
.getRegionName())) {
300 assertTrue("The contents of region load from cluster and server should match", matched
);
304 private void checkRegionsAndRegionLoads(Collection
<RegionInfo
> regions
,
305 Collection
<RegionMetrics
> regionLoads
) {
307 assertEquals("No of regions and regionloads doesn't match", regions
.size(), regionLoads
.size());
309 Map
<byte[], RegionMetrics
> regionLoadMap
= Maps
.newTreeMap(Bytes
.BYTES_COMPARATOR
);
310 for (RegionMetrics regionLoad
: regionLoads
) {
311 regionLoadMap
.put(regionLoad
.getRegionName(), regionLoad
);
313 for (RegionInfo info
: regions
) {
314 assertTrue("Region not in regionLoadMap region:" + info
.getRegionNameAsString()
315 + " regionMap: " + regionLoadMap
, regionLoadMap
.containsKey(info
.getRegionName()));
319 private void createAndLoadTable(TableName
[] tables
) {
320 for (TableName table
: tables
) {
321 TableDescriptorBuilder builder
= TableDescriptorBuilder
.newBuilder(table
);
322 builder
.setColumnFamily(ColumnFamilyDescriptorBuilder
.of(FAMILY
));
323 admin
.createTable(builder
.build(), Bytes
.toBytes("aaaaa"), Bytes
.toBytes("zzzzz"), 16).join();
324 AsyncTable
<?
> asyncTable
= ASYNC_CONN
.getTable(table
);
325 List
<Put
> puts
= new ArrayList
<>();
326 for (byte[] row
: HBaseTestingUtil
.ROWS
) {
327 puts
.add(new Put(row
).addColumn(FAMILY
, Bytes
.toBytes("q"), Bytes
.toBytes("v")));
329 asyncTable
.putAll(puts
).join();