2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
73 @Ignore // Depends on Master being able to host regions. Needs fixing.
74 @Category(MediumTests
.class)
75 public class TestRegionServerReadRequestMetrics
{
78 public static final HBaseClassTestRule CLASS_RULE
=
79 HBaseClassTestRule
.forClass(TestRegionServerReadRequestMetrics
.class);
81 private static final Logger LOG
=
82 LoggerFactory
.getLogger(TestRegionServerReadRequestMetrics
.class);
83 private static final HBaseTestingUtility TEST_UTIL
= new HBaseTestingUtility();
84 private static final TableName TABLE_NAME
= TableName
.valueOf("test");
85 private static final byte[] CF1
= Bytes
.toBytes("c1");
86 private static final byte[] CF2
= Bytes
.toBytes("c2");
88 private static final byte[] ROW1
= Bytes
.toBytes("a");
89 private static final byte[] ROW2
= Bytes
.toBytes("b");
90 private static final byte[] ROW3
= Bytes
.toBytes("c");
91 private static final byte[] COL1
= Bytes
.toBytes("q1");
92 private static final byte[] COL2
= Bytes
.toBytes("q2");
93 private static final byte[] COL3
= Bytes
.toBytes("q3");
94 private static final byte[] VAL1
= Bytes
.toBytes("v1");
95 private static final byte[] VAL2
= Bytes
.toBytes("v2");
96 private static final byte[] VAL3
= Bytes
.toBytes(0L);
98 private static final int MAX_TRY
= 20;
99 private static final int SLEEP_MS
= 100;
100 private static final int TTL
= 1;
102 private static Admin admin
;
103 private static Collection
<ServerName
> serverNames
;
104 private static Table table
;
105 private static RegionInfo regionInfo
;
107 private static Map
<Metric
, Long
> requestsMap
= new HashMap
<>();
108 private static Map
<Metric
, Long
> requestsMapPrev
= new HashMap
<>();
111 public static void setUpOnce() throws Exception
{
112 // Default starts one regionserver only.
113 TEST_UTIL
.getConfiguration().setBoolean(LoadBalancer
.TABLES_ON_MASTER
, true);
114 // TEST_UTIL.getConfiguration().setBoolean(LoadBalancer.SYSTEM_TABLES_ON_MASTER, true);
115 TEST_UTIL
.startMiniCluster();
116 admin
= TEST_UTIL
.getAdmin();
117 serverNames
= admin
.getClusterMetrics(EnumSet
.of(Option
.LIVE_SERVERS
))
118 .getLiveServerMetrics().keySet();
119 table
= createTable();
121 List
<RegionInfo
> regions
= admin
.getRegions(TABLE_NAME
);
122 assertEquals("Table " + TABLE_NAME
+ " should have 1 region", 1, regions
.size());
123 regionInfo
= regions
.get(0);
125 for (Metric metric
: Metric
.values()) {
126 requestsMap
.put(metric
, 0L);
127 requestsMapPrev
.put(metric
, 0L);
131 private static Table
createTable() throws IOException
{
132 TableDescriptorBuilder builder
= TableDescriptorBuilder
.newBuilder(TABLE_NAME
);
133 builder
.setColumnFamily(ColumnFamilyDescriptorBuilder
.of(CF1
));
134 builder
.setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(CF2
).setTimeToLive(TTL
)
136 admin
.createTable(builder
.build());
137 return TEST_UTIL
.getConnection().getTable(TABLE_NAME
);
140 private static void testReadRequests(long resultCount
,
141 long expectedReadRequests
, long expectedFilteredReadRequests
)
142 throws IOException
, InterruptedException
{
144 System
.out
.println("requestsMapPrev = " + requestsMapPrev
);
145 System
.out
.println("requestsMap = " + requestsMap
);
147 assertEquals(expectedReadRequests
,
148 requestsMap
.get(Metric
.REGION_READ
) - requestsMapPrev
.get(Metric
.REGION_READ
));
149 boolean tablesOnMaster
= LoadBalancer
.isTablesOnMaster(TEST_UTIL
.getConfiguration());
150 if (tablesOnMaster
) {
151 // If NO tables on master, then the single regionserver in this test carries user-space
152 // tables and the meta table. The first time through, the read will be inflated by meta
153 // lookups. We don't know which test will be first through since junit randomizes. This
154 // method is used by a bunch of tests. Just do this check if master is hosting (system)
156 assertEquals(expectedReadRequests
,
157 requestsMap
.get(Metric
.SERVER_READ
) - requestsMapPrev
.get(Metric
.SERVER_READ
));
159 assertEquals(expectedFilteredReadRequests
,
160 requestsMap
.get(Metric
.FILTERED_REGION_READ
)
161 - requestsMapPrev
.get(Metric
.FILTERED_REGION_READ
));
162 assertEquals(expectedFilteredReadRequests
,
163 requestsMap
.get(Metric
.FILTERED_SERVER_READ
)
164 - requestsMapPrev
.get(Metric
.FILTERED_SERVER_READ
));
165 assertEquals(expectedReadRequests
, resultCount
);
168 private static void updateMetricsMap() throws IOException
, InterruptedException
{
169 for (Metric metric
: Metric
.values()) {
170 requestsMapPrev
.put(metric
, requestsMap
.get(metric
));
173 ServerMetrics serverMetrics
= null;
174 RegionMetrics regionMetricsOuter
= null;
175 boolean metricsUpdated
= false;
176 for (int i
= 0; i
< MAX_TRY
; i
++) {
177 for (ServerName serverName
: serverNames
) {
178 serverMetrics
= admin
.getClusterMetrics(EnumSet
.of(Option
.LIVE_SERVERS
))
179 .getLiveServerMetrics().get(serverName
);
181 Map
<byte[], RegionMetrics
> regionMetrics
= serverMetrics
.getRegionMetrics();
182 RegionMetrics regionMetric
= regionMetrics
.get(regionInfo
.getRegionName());
183 if (regionMetric
!= null) {
184 regionMetricsOuter
= regionMetric
;
185 for (Metric metric
: Metric
.values()) {
186 if (getReadRequest(serverMetrics
, regionMetric
, metric
) > requestsMapPrev
.get(metric
)) {
187 for (Metric metricInner
: Metric
.values()) {
188 requestsMap
.put(metricInner
, getReadRequest(serverMetrics
, regionMetric
,
191 metricsUpdated
= true;
197 if (metricsUpdated
) {
200 Thread
.sleep(SLEEP_MS
);
202 if (!metricsUpdated
) {
203 for (Metric metric
: Metric
.values()) {
204 requestsMap
.put(metric
, getReadRequest(serverMetrics
, regionMetricsOuter
, metric
));
209 private static long getReadRequest(ServerMetrics serverMetrics
, RegionMetrics regionMetrics
,
213 return regionMetrics
.getReadRequestCount();
215 return serverMetrics
.getRegionMetrics().get(regionMetrics
.getRegionName())
216 .getReadRequestCount();
217 case FILTERED_REGION_READ
:
218 return regionMetrics
.getFilteredReadRequestCount();
219 case FILTERED_SERVER_READ
:
220 return serverMetrics
.getRegionMetrics().get(regionMetrics
.getRegionName())
221 .getFilteredReadRequestCount();
223 throw new IllegalStateException();
227 private static void putData() throws IOException
{
231 put
.addColumn(CF1
, COL1
, VAL1
);
232 put
.addColumn(CF1
, COL2
, VAL2
);
233 put
.addColumn(CF1
, COL3
, VAL3
);
236 put
.addColumn(CF1
, COL1
, VAL2
); // put val2 instead of val1
237 put
.addColumn(CF1
, COL2
, VAL2
);
240 put
.addColumn(CF1
, COL1
, VAL1
);
241 put
.addColumn(CF1
, COL2
, VAL2
);
245 private static void putTTLExpiredData() throws IOException
, InterruptedException
{
249 put
.addColumn(CF2
, COL1
, VAL1
);
250 put
.addColumn(CF2
, COL2
, VAL2
);
253 Thread
.sleep(TTL
* 1000);
256 put
.addColumn(CF2
, COL1
, VAL1
);
257 put
.addColumn(CF2
, COL2
, VAL2
);
261 put
.addColumn(CF2
, COL1
, VAL1
);
262 put
.addColumn(CF2
, COL2
, VAL2
);
267 public static void tearDownOnce() throws Exception
{
268 TEST_UTIL
.shutdownMiniCluster();
272 public void testReadRequestsCountNotFiltered() throws Exception
{
282 try (ResultScanner scanner
= table
.getScanner(scan
)) {
284 for (Result ignore
: scanner
) {
287 testReadRequests(resultCount
, 3, 0);
291 scan
= new Scan(ROW2
, ROW3
);
292 try (ResultScanner scanner
= table
.getScanner(scan
)) {
294 for (Result ignore
: scanner
) {
297 testReadRequests(resultCount
, 1, 0);
302 Result result
= table
.get(get
);
303 resultCount
= result
.isEmpty() ?
0 : 1;
304 testReadRequests(resultCount
, 1, 0);
306 // test for increment
307 increment
= new Increment(ROW1
);
308 increment
.addColumn(CF1
, COL3
, 1);
309 result
= table
.increment(increment
);
310 resultCount
= result
.isEmpty() ?
0 : 1;
311 testReadRequests(resultCount
, 1, 0);
313 // test for checkAndPut
315 put
.addColumn(CF1
, COL2
, VAL2
);
316 boolean checkAndPut
=
317 table
.checkAndMutate(ROW1
, CF1
).qualifier(COL2
).ifEquals(VAL2
).thenPut(put
);
318 resultCount
= checkAndPut ?
1 : 0;
319 testReadRequests(resultCount
, 1, 0);
322 append
= new Append(ROW1
);
323 append
.addColumn(CF1
, COL2
, VAL2
);
324 result
= table
.append(append
);
325 resultCount
= result
.isEmpty() ?
0 : 1;
326 testReadRequests(resultCount
, 1, 0);
328 // test for checkAndMutate
330 put
.addColumn(CF1
, COL1
, VAL1
);
331 RowMutations rm
= new RowMutations(ROW1
);
333 boolean checkAndMutate
=
334 table
.checkAndMutate(ROW1
, CF1
).qualifier(COL1
).ifEquals(VAL1
).thenMutate(rm
);
335 resultCount
= checkAndMutate ?
1 : 0;
336 testReadRequests(resultCount
, 1, 0);
339 @Ignore // HBASE-19785
341 public void testReadRequestsCountWithFilter() throws Exception
{
347 scan
.setFilter(new SingleColumnValueFilter(CF1
, COL1
, CompareOperator
.EQUAL
, VAL1
));
348 try (ResultScanner scanner
= table
.getScanner(scan
)) {
350 for (Result ignore
: scanner
) {
353 testReadRequests(resultCount
, 2, 1);
358 scan
.setFilter(new RowFilter(CompareOperator
.EQUAL
, new BinaryComparator(ROW1
)));
359 try (ResultScanner scanner
= table
.getScanner(scan
)) {
361 for (Result ignore
: scanner
) {
364 testReadRequests(resultCount
, 1, 2);
368 scan
= new Scan(ROW2
, ROW3
);
369 scan
.setFilter(new RowFilter(CompareOperator
.EQUAL
, new BinaryComparator(ROW1
)));
370 try (ResultScanner scanner
= table
.getScanner(scan
)) {
372 for (Result ignore
: scanner
) {
375 testReadRequests(resultCount
, 0, 1);
378 // fixme filtered get should not increase readRequestsCount
379 // Get get = new Get(ROW2);
380 // get.setFilter(new SingleColumnValueFilter(CF1, COL1, CompareFilter.CompareOp.EQUAL, VAL1));
381 // Result result = table.get(get);
382 // resultCount = result.isEmpty() ? 0 : 1;
383 // testReadRequests(resultCount, 0, 1);
386 @Ignore // HBASE-19785
388 public void testReadRequestsCountWithDeletedRow() throws Exception
{
390 Delete delete
= new Delete(ROW3
);
391 table
.delete(delete
);
393 Scan scan
= new Scan();
394 try (ResultScanner scanner
= table
.getScanner(scan
)) {
396 for (Result ignore
: scanner
) {
399 testReadRequests(resultCount
, 2, 1);
402 Put put
= new Put(ROW3
);
403 put
.addColumn(CF1
, COL1
, VAL1
);
404 put
.addColumn(CF1
, COL2
, VAL2
);
410 public void testReadRequestsCountWithTTLExpiration() throws Exception
{
413 Scan scan
= new Scan();
415 try (ResultScanner scanner
= table
.getScanner(scan
)) {
417 for (Result ignore
: scanner
) {
420 testReadRequests(resultCount
, 2, 1);
424 @Ignore // See HBASE-19785
426 public void testReadRequestsWithCoprocessor() throws Exception
{
427 TableName tableName
= TableName
.valueOf("testReadRequestsWithCoprocessor");
428 TableDescriptorBuilder builder
= TableDescriptorBuilder
.newBuilder(tableName
);
429 builder
.setColumnFamily(ColumnFamilyDescriptorBuilder
.of(CF1
));
430 builder
.setCoprocessor(ScanRegionCoprocessor
.class.getName());
431 admin
.createTable(builder
.build());
434 TEST_UTIL
.waitTableAvailable(tableName
);
435 List
<RegionInfo
> regionInfos
= admin
.getRegions(tableName
);
436 assertEquals("Table " + TABLE_NAME
+ " should have 1 region", 1, regionInfos
.size());
437 boolean success
= true;
439 for (; i
< MAX_TRY
; i
++) {
441 testReadRequests(regionInfos
.get(0).getRegionName(), 3);
442 } catch (Throwable t
) {
443 LOG
.warn("Got exception when try " + i
+ " times", t
);
444 Thread
.sleep(SLEEP_MS
);
452 fail("Failed to get right read requests metric after try " + i
+ " times");
455 admin
.disableTable(tableName
);
456 admin
.deleteTable(tableName
);
460 private void testReadRequests(byte[] regionName
, int expectedReadRequests
) throws Exception
{
461 for (ServerName serverName
: serverNames
) {
462 ServerMetrics serverMetrics
= admin
.getClusterMetrics(
463 EnumSet
.of(Option
.LIVE_SERVERS
)).getLiveServerMetrics().get(serverName
);
464 Map
<byte[], RegionMetrics
> regionMetrics
= serverMetrics
.getRegionMetrics();
465 RegionMetrics regionMetric
= regionMetrics
.get(regionName
);
466 if (regionMetric
!= null) {
467 LOG
.debug("server read request is "
468 + serverMetrics
.getRegionMetrics().get(regionName
).getReadRequestCount()
469 + ", region read request is " + regionMetric
.getReadRequestCount());
470 assertEquals(3, serverMetrics
.getRegionMetrics().get(regionName
).getReadRequestCount());
471 assertEquals(3, regionMetric
.getReadRequestCount());
476 public static class ScanRegionCoprocessor
implements RegionCoprocessor
, RegionObserver
{
478 public Optional
<RegionObserver
> getRegionObserver() {
479 return Optional
.of(this);
483 public void postOpen(ObserverContext
<RegionCoprocessorEnvironment
> c
) {
484 RegionCoprocessorEnvironment env
= c
.getEnvironment();
485 Region region
= env
.getRegion();
488 RegionScanner scanner
= region
.getScanner(new Scan());
489 List
<Cell
> result
= new LinkedList
<>();
490 while (scanner
.next(result
)) {
493 } catch (Exception e
) {
494 LOG
.warn("Got exception in coprocessor", e
);
498 private void putData(Region region
) throws Exception
{
499 Put put
= new Put(ROW1
);
500 put
.addColumn(CF1
, COL1
, VAL1
);
503 put
.addColumn(CF1
, COL1
, VAL1
);
506 put
.addColumn(CF1
, COL1
, VAL1
);
511 private enum Metric
{REGION_READ
, SERVER_READ
, FILTERED_REGION_READ
, FILTERED_SERVER_READ
}