/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.client
;
20 import static org
.junit
.Assert
.assertEquals
;
21 import static org
.junit
.Assert
.assertNull
;
23 import java
.io
.IOException
;
24 import java
.util
.ArrayList
;
25 import java
.util
.Arrays
;
26 import java
.util
.List
;
27 import java
.util
.concurrent
.ForkJoinPool
;
28 import org
.apache
.commons
.io
.IOUtils
;
29 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
30 import org
.apache
.hadoop
.hbase
.HBaseTestingUtility
;
31 import org
.apache
.hadoop
.hbase
.PrivateCellUtil
;
32 import org
.apache
.hadoop
.hbase
.TableName
;
33 import org
.apache
.hadoop
.hbase
.client
.metrics
.ScanMetrics
;
34 import org
.apache
.hadoop
.hbase
.testclassification
.ClientTests
;
35 import org
.apache
.hadoop
.hbase
.testclassification
.MediumTests
;
36 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
37 import org
.apache
.hadoop
.hbase
.util
.Pair
;
38 import org
.junit
.AfterClass
;
39 import org
.junit
.BeforeClass
;
40 import org
.junit
.ClassRule
;
41 import org
.junit
.Test
;
42 import org
.junit
.experimental
.categories
.Category
;
43 import org
.junit
.runner
.RunWith
;
44 import org
.junit
.runners
.Parameterized
;
45 import org
.junit
.runners
.Parameterized
.Parameter
;
46 import org
.junit
.runners
.Parameterized
.Parameters
;
48 @RunWith(Parameterized
.class)
49 @Category({ MediumTests
.class, ClientTests
.class })
50 public class TestAsyncTableScanMetrics
{
53 public static final HBaseClassTestRule CLASS_RULE
=
54 HBaseClassTestRule
.forClass(TestAsyncTableScanMetrics
.class);
56 private static final HBaseTestingUtility UTIL
= new HBaseTestingUtility();
58 private static final TableName TABLE_NAME
= TableName
.valueOf("ScanMetrics");
60 private static final byte[] CF
= Bytes
.toBytes("cf");
62 private static final byte[] CQ
= Bytes
.toBytes("cq");
64 private static final byte[] VALUE
= Bytes
.toBytes("value");
66 private static AsyncConnection CONN
;
68 private static int NUM_REGIONS
;
71 private interface ScanWithMetrics
{
72 Pair
<List
<Result
>, ScanMetrics
> scan(Scan scan
) throws Exception
;
76 public String methodName
;
79 public ScanWithMetrics method
;
81 @Parameters(name
= "{index}: scan={0}")
82 public static List
<Object
[]> params() {
83 ScanWithMetrics doScanWithRawAsyncTable
= TestAsyncTableScanMetrics
::doScanWithRawAsyncTable
;
84 ScanWithMetrics doScanWithAsyncTableScan
= TestAsyncTableScanMetrics
::doScanWithAsyncTableScan
;
85 ScanWithMetrics doScanWithAsyncTableScanner
=
86 TestAsyncTableScanMetrics
::doScanWithAsyncTableScanner
;
87 return Arrays
.asList(new Object
[] { "doScanWithRawAsyncTable", doScanWithRawAsyncTable
},
88 new Object
[] { "doScanWithAsyncTableScan", doScanWithAsyncTableScan
},
89 new Object
[] { "doScanWithAsyncTableScanner", doScanWithAsyncTableScanner
});
93 public static void setUp() throws Exception
{
94 UTIL
.startMiniCluster(3);
95 // Create 3 rows in the table, with rowkeys starting with "zzz*" so that
96 // scan are forced to hit all the regions.
97 try (Table table
= UTIL
.createMultiRegionTable(TABLE_NAME
, CF
)) {
98 table
.put(Arrays
.asList(new Put(Bytes
.toBytes("zzz1")).addColumn(CF
, CQ
, VALUE
),
99 new Put(Bytes
.toBytes("zzz2")).addColumn(CF
, CQ
, VALUE
),
100 new Put(Bytes
.toBytes("zzz3")).addColumn(CF
, CQ
, VALUE
)));
102 CONN
= ConnectionFactory
.createAsyncConnection(UTIL
.getConfiguration()).get();
103 NUM_REGIONS
= UTIL
.getHBaseCluster().getRegions(TABLE_NAME
).size();
107 public static void tearDown() throws Exception
{
108 IOUtils
.closeQuietly(CONN
);
109 UTIL
.shutdownMiniCluster();
112 private static Pair
<List
<Result
>, ScanMetrics
> doScanWithRawAsyncTable(Scan scan
)
113 throws IOException
, InterruptedException
{
114 BufferingScanResultConsumer consumer
= new BufferingScanResultConsumer();
115 CONN
.getTable(TABLE_NAME
).scan(scan
, consumer
);
116 List
<Result
> results
= new ArrayList
<>();
117 for (Result result
; (result
= consumer
.take()) != null;) {
120 return Pair
.newPair(results
, consumer
.getScanMetrics());
123 private static Pair
<List
<Result
>, ScanMetrics
> doScanWithAsyncTableScan(Scan scan
)
125 SimpleScanResultConsumer consumer
= new SimpleScanResultConsumer();
126 CONN
.getTable(TABLE_NAME
, ForkJoinPool
.commonPool()).scan(scan
, consumer
);
127 return Pair
.newPair(consumer
.getAll(), consumer
.getScanMetrics());
130 private static Pair
<List
<Result
>, ScanMetrics
> doScanWithAsyncTableScanner(Scan scan
)
132 try (ResultScanner scanner
=
133 CONN
.getTable(TABLE_NAME
, ForkJoinPool
.commonPool()).getScanner(scan
)) {
134 List
<Result
> results
= new ArrayList
<>();
135 for (Result result
; (result
= scanner
.next()) != null;) {
138 return Pair
.newPair(results
, scanner
.getScanMetrics());
143 public void testNoScanMetrics() throws Exception
{
144 Pair
<List
<Result
>, ScanMetrics
> pair
= method
.scan(new Scan());
145 assertEquals(3, pair
.getFirst().size());
146 assertNull(pair
.getSecond());
150 public void testScanMetrics() throws Exception
{
151 Pair
<List
<Result
>, ScanMetrics
> pair
= method
.scan(new Scan().setScanMetricsEnabled(true));
152 List
<Result
> results
= pair
.getFirst();
153 assertEquals(3, results
.size());
154 long bytes
= results
.stream().flatMap(r
-> Arrays
.asList(r
.rawCells()).stream())
155 .mapToLong(c
-> PrivateCellUtil
.estimatedSerializedSizeOf(c
)).sum();
156 ScanMetrics scanMetrics
= pair
.getSecond();
157 assertEquals(NUM_REGIONS
, scanMetrics
.countOfRegions
.get());
158 assertEquals(bytes
, scanMetrics
.countOfBytesInResults
.get());
159 assertEquals(NUM_REGIONS
, scanMetrics
.countOfRPCcalls
.get());
160 // also assert a server side metric to ensure that we have published them into the client side
162 assertEquals(3, scanMetrics
.countOfRowsScanned
.get());