/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.hadoop
.hbase
.regionserver
;
20 import java
.io
.IOException
;
21 import java
.util
.ArrayList
;
22 import java
.util
.List
;
23 import java
.util
.Random
;
24 import java
.util
.concurrent
.ThreadLocalRandom
;
26 import org
.apache
.hadoop
.hbase
.CompareOperator
;
27 import org
.apache
.hadoop
.hbase
.DoNotRetryIOException
;
28 import org
.apache
.hadoop
.hbase
.HBaseClassTestRule
;
29 import org
.apache
.hadoop
.hbase
.HBaseTestingUtil
;
30 import org
.apache
.hadoop
.hbase
.StartTestingClusterOption
;
31 import org
.apache
.hadoop
.hbase
.TableName
;
32 import org
.apache
.hadoop
.hbase
.client
.Admin
;
33 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptor
;
34 import org
.apache
.hadoop
.hbase
.client
.ColumnFamilyDescriptorBuilder
;
35 import org
.apache
.hadoop
.hbase
.client
.Connection
;
36 import org
.apache
.hadoop
.hbase
.client
.Put
;
37 import org
.apache
.hadoop
.hbase
.client
.Result
;
38 import org
.apache
.hadoop
.hbase
.client
.ResultScanner
;
39 import org
.apache
.hadoop
.hbase
.client
.Scan
;
40 import org
.apache
.hadoop
.hbase
.client
.Table
;
41 import org
.apache
.hadoop
.hbase
.client
.TableDescriptor
;
42 import org
.apache
.hadoop
.hbase
.client
.TableDescriptorBuilder
;
43 import org
.apache
.hadoop
.hbase
.filter
.SingleColumnValueFilter
;
44 import org
.apache
.hadoop
.hbase
.io
.encoding
.DataBlockEncoding
;
45 import org
.apache
.hadoop
.hbase
.testclassification
.LargeTests
;
46 import org
.apache
.hadoop
.hbase
.testclassification
.RegionServerTests
;
47 import org
.apache
.hadoop
.hbase
.util
.Bytes
;
48 import org
.junit
.AfterClass
;
49 import org
.junit
.BeforeClass
;
50 import org
.junit
.ClassRule
;
51 import org
.junit
.Rule
;
52 import org
.junit
.Test
;
53 import org
.junit
.experimental
.categories
.Category
;
54 import org
.junit
.rules
.TestName
;
55 import org
.slf4j
.Logger
;
56 import org
.slf4j
.LoggerFactory
;
58 import org
.apache
.hbase
.thirdparty
.org
.apache
.commons
.cli
.CommandLine
;
59 import org
.apache
.hbase
.thirdparty
.org
.apache
.commons
.cli
.CommandLineParser
;
60 import org
.apache
.hbase
.thirdparty
.org
.apache
.commons
.cli
.GnuParser
;
61 import org
.apache
.hbase
.thirdparty
.org
.apache
.commons
.cli
.HelpFormatter
;
62 import org
.apache
.hbase
.thirdparty
.org
.apache
.commons
.cli
.Option
;
63 import org
.apache
.hbase
.thirdparty
.org
.apache
.commons
.cli
.Options
;
/**
 * Test performance improvement of joined scanners optimization:
 * https://issues.apache.org/jira/browse/HBASE-5416
 */
69 @Category({RegionServerTests
.class, LargeTests
.class})
70 public class TestJoinedScanners
{
73 public static final HBaseClassTestRule CLASS_RULE
=
74 HBaseClassTestRule
.forClass(TestJoinedScanners
.class);
76 private static final Logger LOG
= LoggerFactory
.getLogger(TestJoinedScanners
.class);
78 private static final HBaseTestingUtil TEST_UTIL
= new HBaseTestingUtil();
80 private static final byte[] cf_essential
= Bytes
.toBytes("essential");
81 private static final byte[] cf_joined
= Bytes
.toBytes("joined");
82 private static final byte[] col_name
= Bytes
.toBytes("a");
83 private static final byte[] flag_yes
= Bytes
.toBytes("Y");
84 private static final byte[] flag_no
= Bytes
.toBytes("N");
86 private static DataBlockEncoding blockEncoding
= DataBlockEncoding
.FAST_DIFF
;
87 private static int selectionRatio
= 30;
88 private static int valueWidth
= 128 * 1024;
91 public TestName name
= new TestName();
94 public static void setUpBeforeClass() throws Exception
{
95 final int DEFAULT_BLOCK_SIZE
= 1024 * 1024;
96 TEST_UTIL
.getConfiguration().setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE
);
97 TEST_UTIL
.getConfiguration().setInt("dfs.replication", 1);
98 TEST_UTIL
.getConfiguration().setLong("hbase.hregion.max.filesize", 322122547200L);
100 String
[] dataNodeHosts
= new String
[] {"host1", "host2", "host3"};
101 int regionServersCount
= 3;
102 StartTestingClusterOption option
= StartTestingClusterOption
.builder()
103 .numRegionServers(regionServersCount
).dataNodeHosts(dataNodeHosts
).build();
104 TEST_UTIL
.startMiniCluster(option
);
108 public static void tearDownAfterClass() throws Exception
{
109 TEST_UTIL
.shutdownMiniCluster();
113 public void testJoinedScanners() throws Exception
{
114 byte[][] families
= {cf_essential
, cf_joined
};
116 final TableName tableName
= TableName
.valueOf(name
.getMethodName());
117 TableDescriptorBuilder builder
=
118 TableDescriptorBuilder
.newBuilder(tableName
);
119 for (byte[] family
: families
) {
120 ColumnFamilyDescriptor familyDescriptor
= ColumnFamilyDescriptorBuilder
.newBuilder(family
)
121 .setDataBlockEncoding(blockEncoding
).build();
122 builder
.setColumnFamily(familyDescriptor
);
124 TableDescriptor tableDescriptor
= builder
.build();
125 TEST_UTIL
.getAdmin().createTable(tableDescriptor
);
126 Table ht
= TEST_UTIL
.getConnection().getTable(tableName
);
128 long rows_to_insert
= 1000;
129 int insert_batch
= 20;
131 LOG
.info("Make " + Long
.toString(rows_to_insert
) + " rows, total size = " + Float
132 .toString(rows_to_insert
* valueWidth
/ 1024 / 1024) + " MB");
134 long time
= System
.nanoTime();
135 Random rand
= ThreadLocalRandom
.current();
136 byte[] val_large
= new byte[valueWidth
];
137 List
<Put
> puts
= new ArrayList
<>();
138 for (long i
= 0; i
< rows_to_insert
; i
++) {
139 Put put
= new Put(Bytes
.toBytes(Long
.toString(i
)));
140 if (rand
.nextInt(100) <= selectionRatio
) {
141 put
.addColumn(cf_essential
, col_name
, flag_yes
);
143 put
.addColumn(cf_essential
, col_name
, flag_no
);
145 put
.addColumn(cf_joined
, col_name
, val_large
);
147 if (puts
.size() >= insert_batch
) {
152 if (!puts
.isEmpty()) {
157 LOG
.info("Data generated in "
158 + Double
.toString((System
.nanoTime() - time
) / 1000000000.0) + " seconds");
161 for (int i
= 0; i
< 10; ++i
) {
162 runScanner(ht
, slow
);
169 private void runScanner(Table table
, boolean slow
) throws Exception
{
170 long time
= System
.nanoTime();
171 Scan scan
= new Scan();
172 scan
.addColumn(cf_essential
, col_name
);
173 scan
.addColumn(cf_joined
, col_name
);
175 SingleColumnValueFilter filter
= new SingleColumnValueFilter(
176 cf_essential
, col_name
, CompareOperator
.EQUAL
, flag_yes
);
177 filter
.setFilterIfMissing(true);
178 scan
.setFilter(filter
);
179 scan
.setLoadColumnFamiliesOnDemand(!slow
);
181 ResultScanner result_scanner
= table
.getScanner(scan
);
184 while ((res
= result_scanner
.next()) != null) {
188 double timeSec
= (System
.nanoTime() - time
) / 1000000000.0;
189 result_scanner
.close();
190 LOG
.info((slow ?
"Slow" : "Joined") + " scanner finished in " + Double
.toString(timeSec
)
191 + " seconds, got " + Long
.toString(rows_count
/2) + " rows");
194 private static Options options
= new Options();
197 * Command line interface:
199 * @throws IOException if there is a bug while reading from disk
201 public static void main(final String
[] args
) throws Exception
{
202 Option encodingOption
= new Option("e", "blockEncoding", true,
203 "Data block encoding; Default: FAST_DIFF");
204 encodingOption
.setRequired(false);
205 options
.addOption(encodingOption
);
207 Option ratioOption
= new Option("r", "selectionRatio", true,
208 "Ratio of selected rows using essential column family");
209 ratioOption
.setRequired(false);
210 options
.addOption(ratioOption
);
212 Option widthOption
= new Option("w", "valueWidth", true,
213 "Width of value for non-essential column family");
214 widthOption
.setRequired(false);
215 options
.addOption(widthOption
);
217 CommandLineParser parser
= new GnuParser();
218 CommandLine cmd
= parser
.parse(options
, args
);
219 if (args
.length
< 1) {
220 HelpFormatter formatter
= new HelpFormatter();
221 formatter
.printHelp("TestJoinedScanners", options
, true);
224 if (cmd
.hasOption("e")) {
225 blockEncoding
= DataBlockEncoding
.valueOf(cmd
.getOptionValue("e"));
227 if (cmd
.hasOption("r")) {
228 selectionRatio
= Integer
.parseInt(cmd
.getOptionValue("r"));
230 if (cmd
.hasOption("w")) {
231 valueWidth
= Integer
.parseInt(cmd
.getOptionValue("w"));
234 TestJoinedScanners test
= new TestJoinedScanners();
235 test
.testJoinedScanners();
238 @Test(expected
= DoNotRetryIOException
.class)
239 public void testWithReverseScan() throws Exception
{
240 try (Connection con
= TEST_UTIL
.getConnection(); Admin admin
= con
.getAdmin()) {
241 TableName tableName
= TableName
.valueOf(name
.getMethodName());
243 TableDescriptor tableDescriptor
= TableDescriptorBuilder
.newBuilder(tableName
)
244 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of("cf1"))
245 .setColumnFamily(ColumnFamilyDescriptorBuilder
.of("cf2"))
247 admin
.createTable(tableDescriptor
);
249 try (Table table
= con
.getTable(tableName
)) {
250 SingleColumnValueFilter filter
= new SingleColumnValueFilter(Bytes
.toBytes("cf1"),
251 Bytes
.toBytes("col"), CompareOperator
.EQUAL
, Bytes
.toBytes("val"));
252 filter
.setFilterIfMissing(true);
254 // Reverse scan with loading CFs on demand
255 Scan scan
= new Scan();
256 scan
.setFilter(filter
);
257 scan
.setReversed(true);
258 scan
.setLoadColumnFamiliesOnDemand(true);
260 try (ResultScanner scanner
= table
.getScanner(scan
)) {
261 // DoNotRetryIOException should occur