/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
72 @Ignore // Depends on Master being able to host regions. Needs fixing.
73 @Category(MediumTests
.class)
74 public class TestRegionServerReadRequestMetrics
{
77 public static final HBaseClassTestRule CLASS_RULE
=
78 HBaseClassTestRule
.forClass(TestRegionServerReadRequestMetrics
.class);
80 private static final Logger LOG
=
81 LoggerFactory
.getLogger(TestRegionServerReadRequestMetrics
.class);
82 private static final HBaseTestingUtil TEST_UTIL
= new HBaseTestingUtil();
83 private static final TableName TABLE_NAME
= TableName
.valueOf("test");
84 private static final byte[] CF1
= Bytes
.toBytes("c1");
85 private static final byte[] CF2
= Bytes
.toBytes("c2");
87 private static final byte[] ROW1
= Bytes
.toBytes("a");
88 private static final byte[] ROW2
= Bytes
.toBytes("b");
89 private static final byte[] ROW3
= Bytes
.toBytes("c");
90 private static final byte[] COL1
= Bytes
.toBytes("q1");
91 private static final byte[] COL2
= Bytes
.toBytes("q2");
92 private static final byte[] COL3
= Bytes
.toBytes("q3");
93 private static final byte[] VAL1
= Bytes
.toBytes("v1");
94 private static final byte[] VAL2
= Bytes
.toBytes("v2");
95 private static final byte[] VAL3
= Bytes
.toBytes(0L);
97 private static final int MAX_TRY
= 20;
98 private static final int SLEEP_MS
= 100;
99 private static final int TTL
= 1;
101 private static Admin admin
;
102 private static Collection
<ServerName
> serverNames
;
103 private static Table table
;
104 private static RegionInfo regionInfo
;
106 private static Map
<Metric
, Long
> requestsMap
= new HashMap
<>();
107 private static Map
<Metric
, Long
> requestsMapPrev
= new HashMap
<>();
110 public static void setUpOnce() throws Exception
{
111 TEST_UTIL
.startMiniCluster();
112 admin
= TEST_UTIL
.getAdmin();
113 serverNames
= admin
.getClusterMetrics(EnumSet
.of(Option
.LIVE_SERVERS
))
114 .getLiveServerMetrics().keySet();
115 table
= createTable();
117 List
<RegionInfo
> regions
= admin
.getRegions(TABLE_NAME
);
118 assertEquals("Table " + TABLE_NAME
+ " should have 1 region", 1, regions
.size());
119 regionInfo
= regions
.get(0);
121 for (Metric metric
: Metric
.values()) {
122 requestsMap
.put(metric
, 0L);
123 requestsMapPrev
.put(metric
, 0L);
127 private static Table
createTable() throws IOException
{
128 TableDescriptorBuilder builder
= TableDescriptorBuilder
.newBuilder(TABLE_NAME
);
129 builder
.setColumnFamily(ColumnFamilyDescriptorBuilder
.of(CF1
));
130 builder
.setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(CF2
).setTimeToLive(TTL
)
132 admin
.createTable(builder
.build());
133 return TEST_UTIL
.getConnection().getTable(TABLE_NAME
);
136 private static void testReadRequests(long resultCount
,
137 long expectedReadRequests
, long expectedFilteredReadRequests
)
138 throws IOException
, InterruptedException
{
140 System
.out
.println("requestsMapPrev = " + requestsMapPrev
);
141 System
.out
.println("requestsMap = " + requestsMap
);
143 assertEquals(expectedReadRequests
,
144 requestsMap
.get(Metric
.REGION_READ
) - requestsMapPrev
.get(Metric
.REGION_READ
));
145 assertEquals(expectedFilteredReadRequests
,
146 requestsMap
.get(Metric
.FILTERED_REGION_READ
)
147 - requestsMapPrev
.get(Metric
.FILTERED_REGION_READ
));
148 assertEquals(expectedFilteredReadRequests
,
149 requestsMap
.get(Metric
.FILTERED_SERVER_READ
)
150 - requestsMapPrev
.get(Metric
.FILTERED_SERVER_READ
));
151 assertEquals(expectedReadRequests
, resultCount
);
154 private static void updateMetricsMap() throws IOException
, InterruptedException
{
155 for (Metric metric
: Metric
.values()) {
156 requestsMapPrev
.put(metric
, requestsMap
.get(metric
));
159 ServerMetrics serverMetrics
= null;
160 RegionMetrics regionMetricsOuter
= null;
161 boolean metricsUpdated
= false;
162 for (int i
= 0; i
< MAX_TRY
; i
++) {
163 for (ServerName serverName
: serverNames
) {
164 serverMetrics
= admin
.getClusterMetrics(EnumSet
.of(Option
.LIVE_SERVERS
))
165 .getLiveServerMetrics().get(serverName
);
167 Map
<byte[], RegionMetrics
> regionMetrics
= serverMetrics
.getRegionMetrics();
168 RegionMetrics regionMetric
= regionMetrics
.get(regionInfo
.getRegionName());
169 if (regionMetric
!= null) {
170 regionMetricsOuter
= regionMetric
;
171 for (Metric metric
: Metric
.values()) {
172 if (getReadRequest(serverMetrics
, regionMetric
, metric
) > requestsMapPrev
.get(metric
)) {
173 for (Metric metricInner
: Metric
.values()) {
174 requestsMap
.put(metricInner
, getReadRequest(serverMetrics
, regionMetric
,
177 metricsUpdated
= true;
183 if (metricsUpdated
) {
186 Thread
.sleep(SLEEP_MS
);
188 if (!metricsUpdated
) {
189 for (Metric metric
: Metric
.values()) {
190 requestsMap
.put(metric
, getReadRequest(serverMetrics
, regionMetricsOuter
, metric
));
195 private static long getReadRequest(ServerMetrics serverMetrics
, RegionMetrics regionMetrics
,
199 return regionMetrics
.getReadRequestCount();
201 return serverMetrics
.getRegionMetrics().get(regionMetrics
.getRegionName())
202 .getReadRequestCount();
203 case FILTERED_REGION_READ
:
204 return regionMetrics
.getFilteredReadRequestCount();
205 case FILTERED_SERVER_READ
:
206 return serverMetrics
.getRegionMetrics().get(regionMetrics
.getRegionName())
207 .getFilteredReadRequestCount();
209 throw new IllegalStateException();
213 private static void putData() throws IOException
{
217 put
.addColumn(CF1
, COL1
, VAL1
);
218 put
.addColumn(CF1
, COL2
, VAL2
);
219 put
.addColumn(CF1
, COL3
, VAL3
);
222 put
.addColumn(CF1
, COL1
, VAL2
); // put val2 instead of val1
223 put
.addColumn(CF1
, COL2
, VAL2
);
226 put
.addColumn(CF1
, COL1
, VAL1
);
227 put
.addColumn(CF1
, COL2
, VAL2
);
231 private static void putTTLExpiredData() throws IOException
, InterruptedException
{
235 put
.addColumn(CF2
, COL1
, VAL1
);
236 put
.addColumn(CF2
, COL2
, VAL2
);
239 Thread
.sleep(TTL
* 1000);
242 put
.addColumn(CF2
, COL1
, VAL1
);
243 put
.addColumn(CF2
, COL2
, VAL2
);
247 put
.addColumn(CF2
, COL1
, VAL1
);
248 put
.addColumn(CF2
, COL2
, VAL2
);
253 public static void tearDownOnce() throws Exception
{
254 TEST_UTIL
.shutdownMiniCluster();
258 public void testReadRequestsCountNotFiltered() throws Exception
{
268 try (ResultScanner scanner
= table
.getScanner(scan
)) {
270 for (Result ignore
: scanner
) {
273 testReadRequests(resultCount
, 3, 0);
277 scan
= new Scan().withStartRow(ROW2
).withStopRow(ROW3
);
278 try (ResultScanner scanner
= table
.getScanner(scan
)) {
280 for (Result ignore
: scanner
) {
283 testReadRequests(resultCount
, 1, 0);
288 Result result
= table
.get(get
);
289 resultCount
= result
.isEmpty() ?
0 : 1;
290 testReadRequests(resultCount
, 1, 0);
292 // test for increment
293 increment
= new Increment(ROW1
);
294 increment
.addColumn(CF1
, COL3
, 1);
295 result
= table
.increment(increment
);
296 resultCount
= result
.isEmpty() ?
0 : 1;
297 testReadRequests(resultCount
, 1, 0);
299 // test for checkAndPut
301 put
.addColumn(CF1
, COL2
, VAL2
);
302 boolean checkAndPut
=
303 table
.checkAndMutate(ROW1
, CF1
).qualifier(COL2
).ifEquals(VAL2
).thenPut(put
);
304 resultCount
= checkAndPut ?
1 : 0;
305 testReadRequests(resultCount
, 1, 0);
308 append
= new Append(ROW1
);
309 append
.addColumn(CF1
, COL2
, VAL2
);
310 result
= table
.append(append
);
311 resultCount
= result
.isEmpty() ?
0 : 1;
312 testReadRequests(resultCount
, 1, 0);
314 // test for checkAndMutate
316 put
.addColumn(CF1
, COL1
, VAL1
);
317 RowMutations rm
= new RowMutations(ROW1
);
319 boolean checkAndMutate
=
320 table
.checkAndMutate(ROW1
, CF1
).qualifier(COL1
).ifEquals(VAL1
).thenMutate(rm
);
321 resultCount
= checkAndMutate ?
1 : 0;
322 testReadRequests(resultCount
, 1, 0);
325 @Ignore // HBASE-19785
327 public void testReadRequestsCountWithFilter() throws Exception
{
333 scan
.setFilter(new SingleColumnValueFilter(CF1
, COL1
, CompareOperator
.EQUAL
, VAL1
));
334 try (ResultScanner scanner
= table
.getScanner(scan
)) {
336 for (Result ignore
: scanner
) {
339 testReadRequests(resultCount
, 2, 1);
344 scan
.setFilter(new RowFilter(CompareOperator
.EQUAL
, new BinaryComparator(ROW1
)));
345 try (ResultScanner scanner
= table
.getScanner(scan
)) {
347 for (Result ignore
: scanner
) {
350 testReadRequests(resultCount
, 1, 2);
354 scan
= new Scan().withStartRow(ROW2
).withStopRow(ROW3
);
355 scan
.setFilter(new RowFilter(CompareOperator
.EQUAL
, new BinaryComparator(ROW1
)));
356 try (ResultScanner scanner
= table
.getScanner(scan
)) {
358 for (Result ignore
: scanner
) {
361 testReadRequests(resultCount
, 0, 1);
364 // fixme filtered get should not increase readRequestsCount
365 // Get get = new Get(ROW2);
366 // get.setFilter(new SingleColumnValueFilter(CF1, COL1, CompareFilter.CompareOp.EQUAL, VAL1));
367 // Result result = table.get(get);
368 // resultCount = result.isEmpty() ? 0 : 1;
369 // testReadRequests(resultCount, 0, 1);
372 @Ignore // HBASE-19785
374 public void testReadRequestsCountWithDeletedRow() throws Exception
{
376 Delete delete
= new Delete(ROW3
);
377 table
.delete(delete
);
379 Scan scan
= new Scan();
380 try (ResultScanner scanner
= table
.getScanner(scan
)) {
382 for (Result ignore
: scanner
) {
385 testReadRequests(resultCount
, 2, 1);
388 Put put
= new Put(ROW3
);
389 put
.addColumn(CF1
, COL1
, VAL1
);
390 put
.addColumn(CF1
, COL2
, VAL2
);
396 public void testReadRequestsCountWithTTLExpiration() throws Exception
{
399 Scan scan
= new Scan();
401 try (ResultScanner scanner
= table
.getScanner(scan
)) {
403 for (Result ignore
: scanner
) {
406 testReadRequests(resultCount
, 2, 1);
410 @Ignore // See HBASE-19785
412 public void testReadRequestsWithCoprocessor() throws Exception
{
413 TableName tableName
= TableName
.valueOf("testReadRequestsWithCoprocessor");
414 TableDescriptorBuilder builder
= TableDescriptorBuilder
.newBuilder(tableName
);
415 builder
.setColumnFamily(ColumnFamilyDescriptorBuilder
.of(CF1
));
416 builder
.setCoprocessor(ScanRegionCoprocessor
.class.getName());
417 admin
.createTable(builder
.build());
420 TEST_UTIL
.waitTableAvailable(tableName
);
421 List
<RegionInfo
> regionInfos
= admin
.getRegions(tableName
);
422 assertEquals("Table " + TABLE_NAME
+ " should have 1 region", 1, regionInfos
.size());
423 boolean success
= true;
425 for (; i
< MAX_TRY
; i
++) {
427 testReadRequests(regionInfos
.get(0).getRegionName(), 3);
428 } catch (Throwable t
) {
429 LOG
.warn("Got exception when try " + i
+ " times", t
);
430 Thread
.sleep(SLEEP_MS
);
438 fail("Failed to get right read requests metric after try " + i
+ " times");
441 admin
.disableTable(tableName
);
442 admin
.deleteTable(tableName
);
446 private void testReadRequests(byte[] regionName
, int expectedReadRequests
) throws Exception
{
447 for (ServerName serverName
: serverNames
) {
448 ServerMetrics serverMetrics
= admin
.getClusterMetrics(
449 EnumSet
.of(Option
.LIVE_SERVERS
)).getLiveServerMetrics().get(serverName
);
450 Map
<byte[], RegionMetrics
> regionMetrics
= serverMetrics
.getRegionMetrics();
451 RegionMetrics regionMetric
= regionMetrics
.get(regionName
);
452 if (regionMetric
!= null) {
453 LOG
.debug("server read request is "
454 + serverMetrics
.getRegionMetrics().get(regionName
).getReadRequestCount()
455 + ", region read request is " + regionMetric
.getReadRequestCount());
456 assertEquals(3, serverMetrics
.getRegionMetrics().get(regionName
).getReadRequestCount());
457 assertEquals(3, regionMetric
.getReadRequestCount());
462 public static class ScanRegionCoprocessor
implements RegionCoprocessor
, RegionObserver
{
464 public Optional
<RegionObserver
> getRegionObserver() {
465 return Optional
.of(this);
469 public void postOpen(ObserverContext
<RegionCoprocessorEnvironment
> c
) {
470 RegionCoprocessorEnvironment env
= c
.getEnvironment();
471 Region region
= env
.getRegion();
474 RegionScanner scanner
= region
.getScanner(new Scan());
475 List
<Cell
> result
= new LinkedList
<>();
476 while (scanner
.next(result
)) {
479 } catch (Exception e
) {
480 LOG
.warn("Got exception in coprocessor", e
);
484 private void putData(Region region
) throws Exception
{
485 Put put
= new Put(ROW1
);
486 put
.addColumn(CF1
, COL1
, VAL1
);
489 put
.addColumn(CF1
, COL1
, VAL1
);
492 put
.addColumn(CF1
, COL1
, VAL1
);
497 private enum Metric
{REGION_READ
, SERVER_READ
, FILTERED_REGION_READ
, FILTERED_SERVER_READ
}