2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.apache
.hadoop
.hbase
.client
.MetricsConnection
.CLIENT_SIDE_METRICS_ENABLED_KEY
;
21 import static org
.junit
.Assert
.assertEquals
;
22 import static org
.junit
.Assert
.assertNotEquals
;
23 import static org
.junit
.Assert
.assertNotNull
;
24 import static org
.junit
.Assert
.assertTrue
;
26 import java
.io
.IOException
;
27 import java
.util
.concurrent
.CountDownLatch
;
28 import java
.util
.concurrent
.TimeUnit
;
29 import java
.util
.concurrent
.atomic
.AtomicLong
;
30 import org
.apache
.hadoop
.conf
.Configuration
;
31 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
32 import org
.apache
.hadoop
.hbase
.HConstants
;
33 import org
.apache
.hadoop
.hbase
.ServerName
;
34 import org
.apache
.hadoop
.hbase
.TableName
;
35 import org
.apache
.hadoop
.hbase
.client
.backoff
.ClientBackoffPolicy
;
36 import org
.apache
.hadoop
.hbase
.client
.backoff
.ExponentialClientBackoffPolicy
;
37 import org
.apache
.hadoop
.hbase
.client
.backoff
.ServerStatistics
;
38 import org
.apache
.hadoop
.hbase
.regionserver
.HRegionServer
;
39 import org
.apache
.hadoop
.hbase
.regionserver
.Region
;
40 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
41 import org
.apache
.hadoop
.hbase
.util
.EnvironmentEdgeManager
;
42 import org
.junit
.AfterClass
;
43 import org
.junit
.BeforeClass
;
44 import org
.junit
.Test
;
45 import org
.slf4j
.Logger
;
46 import org
.slf4j
.LoggerFactory
;
/**
 * Test that we can actually send and use region metrics to slow down client writes.
 */
51 public abstract class ClientPushbackTestBase
{
53 private static final Logger LOG
= LoggerFactory
.getLogger(ClientPushbackTestBase
.class);
54 protected static final HBaseTestingUtility UTIL
= new HBaseTestingUtility();
56 protected static final TableName tableName
= TableName
.valueOf("client-pushback");
57 private static final byte[] family
= Bytes
.toBytes("f");
58 private static final byte[] qualifier
= Bytes
.toBytes("q");
59 private static final long flushSizeBytes
= 512;
62 public static void setupCluster() throws Exception
{
63 Configuration conf
= UTIL
.getConfiguration();
64 // enable backpressure
65 conf
.setBoolean(HConstants
.ENABLE_CLIENT_BACKPRESSURE
, true);
66 // use the exponential backoff policy
67 conf
.setClass(ClientBackoffPolicy
.BACKOFF_POLICY_CLASS
, ExponentialClientBackoffPolicy
.class,
68 ClientBackoffPolicy
.class);
69 // turn the memstore size way down so we don't need to write a lot to see changes in memstore
71 conf
.setLong(HConstants
.HREGION_MEMSTORE_FLUSH_SIZE
, flushSizeBytes
);
72 // ensure we block the flushes when we are double that flushsize
73 conf
.setLong(HConstants
.HREGION_MEMSTORE_BLOCK_MULTIPLIER
,
74 HConstants
.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER
);
75 conf
.setBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY
, true);
76 UTIL
.startMiniCluster(1);
77 UTIL
.createTable(tableName
, family
);
81 public static void cleanupCluster() throws Exception
{
82 UTIL
.shutdownMiniCluster();
85 protected abstract ClientBackoffPolicy
getBackoffPolicy() throws IOException
;
87 protected abstract ServerStatisticTracker
getStatisticsTracker() throws IOException
;
89 protected abstract MetricsConnection
getConnectionMetrics() throws IOException
;
91 protected abstract void mutate(Put put
) throws IOException
;
93 protected abstract void mutate(Put put
, AtomicLong endTime
, CountDownLatch latch
)
96 protected abstract void mutateRow(RowMutations mutations
) throws IOException
;
99 public void testClientTracksServerPushback() throws Exception
{
100 HRegionServer rs
= UTIL
.getHBaseCluster().getRegionServer(0);
101 Region region
= rs
.getRegions(tableName
).get(0);
103 LOG
.debug("Writing some data to " + tableName
);
105 Put p
= new Put(Bytes
.toBytes("row"));
106 p
.addColumn(family
, qualifier
, Bytes
.toBytes("value1"));
109 // get the current load on RS. Hopefully memstore isn't flushed since we wrote the the data
110 int load
= (int) ((region
.getMemStoreHeapSize() * 100) / flushSizeBytes
);
111 LOG
.debug("Done writing some data to " + tableName
);
113 // get the stats for the region hosting our table
114 ClientBackoffPolicy backoffPolicy
= getBackoffPolicy();
115 assertTrue("Backoff policy is not correctly configured",
116 backoffPolicy
instanceof ExponentialClientBackoffPolicy
);
118 ServerStatisticTracker stats
= getStatisticsTracker();
119 assertNotNull("No stats configured for the client!", stats
);
120 // get the names so we can query the stats
121 ServerName server
= rs
.getServerName();
122 byte[] regionName
= region
.getRegionInfo().getRegionName();
124 // check to see we found some load on the memstore
125 ServerStatistics serverStats
= stats
.getStats(server
);
126 ServerStatistics
.RegionStatistics regionStats
= serverStats
.getStatsForRegion(regionName
);
127 assertEquals("We did not find some load on the memstore", load
,
128 regionStats
.getMemStoreLoadPercent());
129 // check that the load reported produces a nonzero delay
130 long backoffTime
= backoffPolicy
.getBackoffTime(server
, regionName
, serverStats
);
131 assertNotEquals("Reported load does not produce a backoff", 0, backoffTime
);
132 LOG
.debug("Backoff calculated for " + region
.getRegionInfo().getRegionNameAsString() + " @ " +
133 server
+ " is " + backoffTime
);
135 CountDownLatch latch
= new CountDownLatch(1);
136 AtomicLong endTime
= new AtomicLong();
137 long startTime
= EnvironmentEdgeManager
.currentTime();
138 mutate(p
, endTime
, latch
);
139 // Currently the ExponentialClientBackoffPolicy under these test conditions
140 // produces a backoffTime of 151 milliseconds. This is long enough so the
141 // wait and related checks below are reasonable. Revisit if the backoff
142 // time reported by above debug logging has significantly deviated.
143 MetricsConnection metrics
= getConnectionMetrics();
144 String name
= server
.getServerName() + "," + Bytes
.toStringBinary(regionName
);
145 MetricsConnection
.RegionStats rsStats
= metrics
.serverStats
.get(server
).get(regionName
);
146 assertEquals(name
, rsStats
.name
);
147 assertEquals(rsStats
.heapOccupancyHist
.getSnapshot().getMean(),
148 (double) regionStats
.getHeapOccupancyPercent(), 0.1);
149 assertEquals(rsStats
.memstoreLoadHist
.getSnapshot().getMean(),
150 (double) regionStats
.getMemStoreLoadPercent(), 0.1);
152 MetricsConnection
.RunnerStats runnerStats
= metrics
.runnerStats
;
154 assertEquals(1, runnerStats
.delayRunners
.getCount());
155 assertEquals(1, runnerStats
.normalRunners
.getCount());
156 assertEquals("", runnerStats
.delayIntevalHist
.getSnapshot().getMean(), (double) backoffTime
,
159 latch
.await(backoffTime
* 2, TimeUnit
.MILLISECONDS
);
160 assertNotEquals("AsyncProcess did not submit the work time", 0, endTime
.get());
161 assertTrue("AsyncProcess did not delay long enough", endTime
.get() - startTime
>= backoffTime
);
165 public void testMutateRowStats() throws IOException
{
166 HRegionServer rs
= UTIL
.getHBaseCluster().getRegionServer(0);
167 Region region
= rs
.getRegions(tableName
).get(0);
169 RowMutations mutations
= new RowMutations(Bytes
.toBytes("row"));
170 Put p
= new Put(Bytes
.toBytes("row"));
171 p
.addColumn(family
, qualifier
, Bytes
.toBytes("value2"));
173 mutateRow(mutations
);
175 ServerStatisticTracker stats
= getStatisticsTracker();
176 assertNotNull("No stats configured for the client!", stats
);
177 // get the names so we can query the stats
178 ServerName server
= rs
.getServerName();
179 byte[] regionName
= region
.getRegionInfo().getRegionName();
181 // check to see we found some load on the memstore
182 ServerStatistics serverStats
= stats
.getStats(server
);
183 ServerStatistics
.RegionStatistics regionStats
= serverStats
.getStatsForRegion(regionName
);
185 assertNotNull(regionStats
);
186 assertTrue(regionStats
.getMemStoreLoadPercent() > 0);