2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.apache
.hadoop
.hbase
.client
.AsyncProcess
.START_LOG_ERRORS_AFTER_COUNT_KEY
;
21 import static org
.junit
.Assert
.assertEquals
;
22 import static org
.junit
.Assert
.assertTrue
;
24 import java
.io
.IOException
;
25 import java
.nio
.file
.FileSystems
;
26 import java
.nio
.file
.Files
;
27 import java
.nio
.file
.Path
;
28 import java
.nio
.file
.StandardCopyOption
;
29 import java
.util
.ArrayList
;
30 import java
.util
.Collection
;
31 import java
.util
.EnumSet
;
32 import java
.util
.List
;
34 import org
.apache
.hadoop
.conf
.Configuration
;
35 import org
.apache
.hadoop
.hbase
.ClusterMetrics
;
36 import org
.apache
.hadoop
.hbase
.ClusterMetrics
.Option
;
37 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
38 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
39 import org
.apache
.hadoop
.hbase
.HConstants
;
40 import org
.apache
.hadoop
.hbase
.RegionMetrics
;
41 import org
.apache
.hadoop
.hbase
.ServerMetrics
;
42 import org
.apache
.hadoop
.hbase
.ServerName
;
43 import org
.apache
.hadoop
.hbase
.TableName
;
44 import org
.apache
.hadoop
.hbase
.regionserver
.HRegion
;
45 import org
.apache
.hadoop
.hbase
.regionserver
.HRegionServer
;
46 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
47 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
48 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
49 import org
.apache
.hadoop
.hbase
.wal
.AbstractFSWALProvider
;
50 import org
.junit
.BeforeClass
;
51 import org
.junit
.ClassRule
;
52 import org
.junit
.Test
;
53 import org
.junit
.experimental
.categories
.Category
;
54 import org
.junit
.runner
.RunWith
;
55 import org
.junit
.runners
.Parameterized
;
57 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.collect
.Lists
;
58 import org
.apache
.hbase
.thirdparty
.com
.google
.common
.collect
.Maps
;
60 @RunWith(Parameterized
.class)
61 @Category({ ClientTests
.class, LargeTests
.class })
62 public class TestAsyncClusterAdminApi
extends TestAsyncAdminBase
{
65 public static final HBaseClassTestRule CLASS_RULE
=
66 HBaseClassTestRule
.forClass(TestAsyncClusterAdminApi
.class);
68 private final Path cnfPath
= FileSystems
.getDefault().getPath("target/test-classes/hbase-site.xml");
69 private final Path cnf2Path
= FileSystems
.getDefault().getPath("target/test-classes/hbase-site2.xml");
70 private final Path cnf3Path
= FileSystems
.getDefault().getPath("target/test-classes/hbase-site3.xml");
73 public static void setUpBeforeClass() throws Exception
{
74 TEST_UTIL
.getConfiguration().setInt(HConstants
.MASTER_INFO_PORT
, 0);
75 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_RPC_TIMEOUT_KEY
, 60000);
76 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_CLIENT_OPERATION_TIMEOUT
, 120000);
77 TEST_UTIL
.getConfiguration().setInt(HConstants
.HBASE_CLIENT_RETRIES_NUMBER
, 2);
78 TEST_UTIL
.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY
, 0);
79 TEST_UTIL
.startMiniCluster(2);
80 ASYNC_CONN
= ConnectionFactory
.createAsyncConnection(TEST_UTIL
.getConfiguration()).get();
84 public void testGetMasterInfoPort() throws Exception
{
85 assertEquals(TEST_UTIL
.getHBaseCluster().getMaster().getInfoServer().getPort(), (int) admin
86 .getMasterInfoPort().get());
90 public void testRegionServerOnlineConfigChange() throws Exception
{
91 replaceHBaseSiteXML();
92 admin
.getRegionServers().get().forEach(server
-> admin
.updateConfiguration(server
).join());
94 // Check the configuration of the RegionServers
95 TEST_UTIL
.getMiniHBaseCluster().getRegionServerThreads().forEach(thread
-> {
96 Configuration conf
= thread
.getRegionServer().getConfiguration();
97 assertEquals(1000, conf
.getInt("hbase.custom.config", 0));
100 restoreHBaseSiteXML();
104 public void testMasterOnlineConfigChange() throws Exception
{
105 replaceHBaseSiteXML();
106 ServerName master
= admin
.getMaster().get();
107 admin
.updateConfiguration(master
).join();
108 admin
.getBackupMasters().get()
109 .forEach(backupMaster
-> admin
.updateConfiguration(backupMaster
).join());
111 // Check the configuration of the Masters
112 TEST_UTIL
.getMiniHBaseCluster().getMasterThreads().forEach(thread
-> {
113 Configuration conf
= thread
.getMaster().getConfiguration();
114 assertEquals(1000, conf
.getInt("hbase.custom.config", 0));
117 restoreHBaseSiteXML();
121 public void testAllClusterOnlineConfigChange() throws IOException
{
122 replaceHBaseSiteXML();
123 admin
.updateConfiguration().join();
125 // Check the configuration of the Masters
126 TEST_UTIL
.getMiniHBaseCluster().getMasterThreads().forEach(thread
-> {
127 Configuration conf
= thread
.getMaster().getConfiguration();
128 assertEquals(1000, conf
.getInt("hbase.custom.config", 0));
131 // Check the configuration of the RegionServers
132 TEST_UTIL
.getMiniHBaseCluster().getRegionServerThreads().forEach(thread
-> {
133 Configuration conf
= thread
.getRegionServer().getConfiguration();
134 assertEquals(1000, conf
.getInt("hbase.custom.config", 0));
137 restoreHBaseSiteXML();
140 private void replaceHBaseSiteXML() throws IOException
{
141 // make a backup of hbase-site.xml
142 Files
.copy(cnfPath
, cnf3Path
, StandardCopyOption
.REPLACE_EXISTING
);
143 // update hbase-site.xml by overwriting it
144 Files
.copy(cnf2Path
, cnfPath
, StandardCopyOption
.REPLACE_EXISTING
);
147 private void restoreHBaseSiteXML() throws IOException
{
148 // restore hbase-site.xml
149 Files
.copy(cnf3Path
, cnfPath
, StandardCopyOption
.REPLACE_EXISTING
);
153 public void testRollWALWALWriter() throws Exception
{
154 setUpforLogRolling();
155 String className
= this.getClass().getName();
156 StringBuilder v
= new StringBuilder(className
);
157 while (v
.length() < 1000) {
160 byte[] value
= Bytes
.toBytes(v
.toString());
161 HRegionServer regionServer
= startAndWriteData(tableName
, value
);
162 LOG
.info("after writing there are "
163 + AbstractFSWALProvider
.getNumRolledLogFiles(regionServer
.getWAL(null)) + " log files");
166 for (HRegion r
: regionServer
.getOnlineRegionsLocalContext()) {
169 admin
.rollWALWriter(regionServer
.getServerName()).join();
170 int count
= AbstractFSWALProvider
.getNumRolledLogFiles(regionServer
.getWAL(null));
171 LOG
.info("after flushing all regions and rolling logs there are " +
172 count
+ " log files");
173 assertTrue(("actual count: " + count
), count
<= 2);
176 private void setUpforLogRolling() {
177 // Force a region split after every 768KB
178 TEST_UTIL
.getConfiguration().setLong(HConstants
.HREGION_MAX_FILESIZE
,
181 // We roll the log after every 32 writes
182 TEST_UTIL
.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);
184 TEST_UTIL
.getConfiguration().setInt(
185 "hbase.regionserver.logroll.errors.tolerated", 2);
186 TEST_UTIL
.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);
188 // For less frequently updated regions flush after every 2 flushes
189 TEST_UTIL
.getConfiguration().setInt(
190 "hbase.hregion.memstore.optionalflushcount", 2);
192 // We flush the cache after every 8192 bytes
193 TEST_UTIL
.getConfiguration().setInt(HConstants
.HREGION_MEMSTORE_FLUSH_SIZE
,
196 // Increase the amount of time between client retries
197 TEST_UTIL
.getConfiguration().setLong("hbase.client.pause", 10 * 1000);
199 // Reduce thread wake frequency so that other threads can get
201 TEST_UTIL
.getConfiguration().setInt(HConstants
.THREAD_WAKE_FREQUENCY
,
204 /**** configuration for testLogRollOnDatanodeDeath ****/
205 // lower the namenode & datanode heartbeat so the namenode
206 // quickly detects datanode failures
207 TEST_UTIL
.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
208 TEST_UTIL
.getConfiguration().setInt("dfs.heartbeat.interval", 1);
209 // the namenode might still try to choose the recently-dead datanode
210 // for a pipeline, so try to a new pipeline multiple times
211 TEST_UTIL
.getConfiguration().setInt("dfs.client.block.write.retries", 30);
212 TEST_UTIL
.getConfiguration().setInt(
213 "hbase.regionserver.hlog.tolerable.lowreplication", 2);
214 TEST_UTIL
.getConfiguration().setInt(
215 "hbase.regionserver.hlog.lowreplication.rolllimit", 3);
218 private HRegionServer
startAndWriteData(TableName tableName
, byte[] value
) throws Exception
{
219 createTableWithDefaultConf(tableName
);
220 AsyncTable
<?
> table
= ASYNC_CONN
.getTable(tableName
);
221 HRegionServer regionServer
= TEST_UTIL
.getRSForFirstRegionInTable(tableName
);
222 for (int i
= 1; i
<= 256; i
++) { // 256 writes should cause 8 log rolls
223 Put put
= new Put(Bytes
.toBytes("row" + String
.format("%1$04d", i
)));
224 put
.addColumn(FAMILY
, null, value
);
225 table
.put(put
).join();
227 // After every 32 writes sleep to let the log roller run
230 } catch (InterruptedException e
) {
239 public void testGetRegionLoads() throws Exception
{
240 // Turn off the balancer
241 admin
.balancerSwitch(false).join();
243 new TableName
[] { TableName
.valueOf(tableName
.getNameAsString() + "1"),
244 TableName
.valueOf(tableName
.getNameAsString() + "2"),
245 TableName
.valueOf(tableName
.getNameAsString() + "3") };
246 createAndLoadTable(tables
);
247 // Sleep to wait region server report
248 Thread
.sleep(TEST_UTIL
.getConfiguration().getInt("hbase.regionserver.msginterval", 3 * 1000) * 2);
249 // Check if regions match with the regionLoad from the server
250 Collection
<ServerName
> servers
= admin
.getRegionServers().get();
251 for (ServerName serverName
: servers
) {
252 List
<RegionInfo
> regions
= admin
.getRegions(serverName
).get();
253 checkRegionsAndRegionLoads(regions
, admin
.getRegionMetrics(serverName
).get());
256 // Check if regionLoad matches the table's regions and nothing is missed
257 for (TableName table
: tables
) {
258 List
<RegionInfo
> tableRegions
= admin
.getRegions(table
).get();
259 List
<RegionMetrics
> regionLoads
= Lists
.newArrayList();
260 for (ServerName serverName
: servers
) {
261 regionLoads
.addAll(admin
.getRegionMetrics(serverName
, table
).get());
263 checkRegionsAndRegionLoads(tableRegions
, regionLoads
);
266 // Check RegionLoad matches the regionLoad from ClusterStatus
267 ClusterMetrics clusterStatus
= admin
.getClusterMetrics(EnumSet
.of(Option
.LIVE_SERVERS
)).get();
268 for (Map
.Entry
<ServerName
, ServerMetrics
> entry
:
269 clusterStatus
.getLiveServerMetrics().entrySet()) {
270 ServerName sn
= entry
.getKey();
271 ServerMetrics sm
= entry
.getValue();
272 compareRegionLoads(sm
.getRegionMetrics().values(), admin
.getRegionMetrics(sn
).get());
274 for (ServerName serverName
: clusterStatus
.getLiveServerMetrics().keySet()) {
275 ServerMetrics serverLoad
= clusterStatus
.getLiveServerMetrics().get(serverName
);
280 private void compareRegionLoads(Collection
<RegionMetrics
> regionLoadCluster
,
281 Collection
<RegionMetrics
> regionLoads
) {
283 assertEquals("No of regionLoads from clusterStatus and regionloads from RS doesn't match",
284 regionLoadCluster
.size(), regionLoads
.size());
286 for (RegionMetrics loadCluster
: regionLoadCluster
) {
287 boolean matched
= false;
288 for (RegionMetrics load
: regionLoads
) {
289 if (Bytes
.equals(loadCluster
.getRegionName(), load
.getRegionName())) {
294 assertTrue("The contents of region load from cluster and server should match", matched
);
298 private void checkRegionsAndRegionLoads(Collection
<RegionInfo
> regions
,
299 Collection
<RegionMetrics
> regionLoads
) {
301 assertEquals("No of regions and regionloads doesn't match", regions
.size(), regionLoads
.size());
303 Map
<byte[], RegionMetrics
> regionLoadMap
= Maps
.newTreeMap(Bytes
.BYTES_COMPARATOR
);
304 for (RegionMetrics regionLoad
: regionLoads
) {
305 regionLoadMap
.put(regionLoad
.getRegionName(), regionLoad
);
307 for (RegionInfo info
: regions
) {
308 assertTrue("Region not in regionLoadMap region:" + info
.getRegionNameAsString()
309 + " regionMap: " + regionLoadMap
, regionLoadMap
.containsKey(info
.getRegionName()));
313 private void createAndLoadTable(TableName
[] tables
) {
314 for (TableName table
: tables
) {
315 TableDescriptorBuilder builder
= TableDescriptorBuilder
.newBuilder(table
);
316 builder
.setColumnFamily(ColumnFamilyDescriptorBuilder
.of(FAMILY
));
317 admin
.createTable(builder
.build(), Bytes
.toBytes("aaaaa"), Bytes
.toBytes("zzzzz"), 16).join();
318 AsyncTable
<?
> asyncTable
= ASYNC_CONN
.getTable(table
);
319 List
<Put
> puts
= new ArrayList
<>();
320 for (byte[] row
: HBaseTestingUtility
.ROWS
) {
321 puts
.add(new Put(row
).addColumn(FAMILY
, Bytes
.toBytes("q"), Bytes
.toBytes("v")));
323 asyncTable
.putAll(puts
).join();