/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableSnapshotScanner;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Stopwatch;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;

/**
 * A simple performance evaluation tool for single client and MR scans.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class ScanPerformanceEvaluation extends AbstractHBaseTool {
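
  // Example invocations (illustrative sketch only; the table, snapshot, and directory names
  // are assumptions, and the exact launcher depends on your deployment):
  //   hbase org.apache.hadoop.hbase.ScanPerformanceEvaluation -t scan -tn TestTable -ch 1000
  //   hbase org.apache.hadoop.hbase.ScanPerformanceEvaluation -t snapshotscan \
  //       -sn TestSnapshot -rs /tmp/snapshot-restore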

  private static final String HBASE_COUNTER_GROUP_NAME = "HBaseCounters";

  private String type;
  private String file;
  private String tablename;
  private String snapshotName;
  private String restoreDir;
  private String caching;

  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    Path rootDir;
    try {
      rootDir = CommonFSUtils.getRootDir(conf);
      rootDir.getFileSystem(conf);
    } catch (IOException ex) {
      throw new RuntimeException(ex);
    }
  }

  @Override
  protected void addOptions() {
    this.addRequiredOptWithArg("t", "type", "the type of the test. One of the following: "
      + "streaming|scan|snapshotscan|scanmapreduce|snapshotscanmapreduce");
    this.addOptWithArg("f", "file", "the filename to read from");
    this.addOptWithArg("tn", "table", "the tablename to read from");
    this.addOptWithArg("sn", "snapshot", "the snapshot name to read from");
    this.addOptWithArg("rs", "restoredir", "the directory to restore the snapshot");
    this.addOptWithArg("ch", "caching", "scanner caching value");
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    type = cmd.getOptionValue("type");
    file = cmd.getOptionValue("file");
    tablename = cmd.getOptionValue("table");
    snapshotName = cmd.getOptionValue("snapshot");
    restoreDir = cmd.getOptionValue("restoredir");
    caching = cmd.getOptionValue("caching");
  }
  protected void testHdfsStreaming(Path filename) throws IOException {
    byte[] buf = new byte[1024];
    FileSystem fs = filename.getFileSystem(getConf());

    // read the file from start to finish
    Stopwatch fileOpenTimer = Stopwatch.createUnstarted();
    Stopwatch streamTimer = Stopwatch.createUnstarted();

    fileOpenTimer.start();
    FSDataInputStream in = fs.open(filename);
    fileOpenTimer.stop();

    long totalBytes = 0;
    streamTimer.start();
    while (true) {
      int read = in.read(buf);
      if (read < 0) {
        break;
      }
      totalBytes += read;
    }
    streamTimer.stop();

    double throughput = (double) totalBytes / streamTimer.elapsed(TimeUnit.SECONDS);

    System.out.println("HDFS streaming: ");
    System.out.println("total time to open: " + fileOpenTimer.elapsed(TimeUnit.MILLISECONDS)
      + " ms");
    System.out.println("total time to read: " + streamTimer.elapsed(TimeUnit.MILLISECONDS)
      + " ms");
    System.out.println("total bytes: " + totalBytes + " bytes ("
      + StringUtils.humanReadableInt(totalBytes) + ")");
    System.out.println("throughput : " + StringUtils.humanReadableInt((long) throughput) + "B/s");
  }
  private Scan getScan() {
    Scan scan = new Scan(); // default scan settings
    scan.setCacheBlocks(false);
    scan.readVersions(1);
    scan.setScanMetricsEnabled(true);
    if (caching != null) {
      scan.setCaching(Integer.parseInt(caching));
    }

    return scan;
  }
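
  /**
   * Single-client scan over a live table: opens a Connection, scans the whole table through a
   * ResultScanner, and reports table-open, scanner-open, and scan times plus byte/row/cell
   * throughput computed from the scan metrics.
   */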
  public void testScan() throws IOException {
    Stopwatch tableOpenTimer = Stopwatch.createUnstarted();
    Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
    Stopwatch scanTimer = Stopwatch.createUnstarted();

    tableOpenTimer.start();
    Connection connection = ConnectionFactory.createConnection(getConf());
    Table table = connection.getTable(TableName.valueOf(tablename));
    tableOpenTimer.stop();

    Scan scan = getScan();
    scanOpenTimer.start();
    ResultScanner scanner = table.getScanner(scan);
    scanOpenTimer.stop();

    long numRows = 0;
    long numCells = 0;
    scanTimer.start();
    while (true) {
      Result result = scanner.next();
      if (result == null) {
        break;
      }
      numRows++;
      numCells += result.rawCells().length;
    }
    scanTimer.stop();
    scanner.close();
    table.close();
    connection.close();

    ScanMetrics metrics = scanner.getScanMetrics();
    long totalBytes = metrics.countOfBytesInResults.get();
    double throughput = (double) totalBytes / scanTimer.elapsed(TimeUnit.SECONDS);
    double throughputRows = (double) numRows / scanTimer.elapsed(TimeUnit.SECONDS);
    double throughputCells = (double) numCells / scanTimer.elapsed(TimeUnit.SECONDS);

    System.out.println("HBase scan: ");
    System.out.println("total time to open table: "
      + tableOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
    System.out.println("total time to open scanner: "
      + scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
    System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");

    System.out.println("Scan metrics:\n" + metrics.getMetricsMap());

    System.out.println("total bytes: " + totalBytes + " bytes ("
      + StringUtils.humanReadableInt(totalBytes) + ")");
    System.out.println("throughput : " + StringUtils.humanReadableInt((long) throughput) + "B/s");
    System.out.println("total rows : " + numRows);
    System.out.println("throughput : " + StringUtils.humanReadableInt((long) throughputRows)
      + " rows/s");
    System.out.println("total cells : " + numCells);
    System.out.println("throughput : " + StringUtils.humanReadableInt((long) throughputCells)
      + " cells/s");
  }
  public void testSnapshotScan() throws IOException {
    Stopwatch snapshotRestoreTimer = Stopwatch.createUnstarted();
    Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
    Stopwatch scanTimer = Stopwatch.createUnstarted();

    Path restoreDir = new Path(this.restoreDir);

    snapshotRestoreTimer.start();
    restoreDir.getFileSystem(conf).delete(restoreDir, true);
    snapshotRestoreTimer.stop();

    Scan scan = getScan();
    scanOpenTimer.start();
    TableSnapshotScanner scanner = new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
    scanOpenTimer.stop();

    long numRows = 0;
    long numCells = 0;
    scanTimer.start();
    while (true) {
      Result result = scanner.next();
      if (result == null) {
        break;
      }
      numRows++;
      numCells += result.rawCells().length;
    }
    scanTimer.stop();
    scanner.close();

    ScanMetrics metrics = scanner.getScanMetrics();
    long totalBytes = metrics.countOfBytesInResults.get();
    double throughput = (double) totalBytes / scanTimer.elapsed(TimeUnit.SECONDS);
    double throughputRows = (double) numRows / scanTimer.elapsed(TimeUnit.SECONDS);
    double throughputCells = (double) numCells / scanTimer.elapsed(TimeUnit.SECONDS);

    System.out.println("HBase scan snapshot: ");
    System.out.println("total time to restore snapshot: "
      + snapshotRestoreTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
    System.out.println("total time to open scanner: "
      + scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
    System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");

    System.out.println("Scan metrics:\n" + metrics.getMetricsMap());

    System.out.println("total bytes: " + totalBytes + " bytes ("
      + StringUtils.humanReadableInt(totalBytes) + ")");
    System.out.println("throughput : " + StringUtils.humanReadableInt((long) throughput) + "B/s");
    System.out.println("total rows : " + numRows);
    System.out.println("throughput : " + StringUtils.humanReadableInt((long) throughputRows)
      + " rows/s");
    System.out.println("total cells : " + numCells);
    System.out.println("throughput : " + StringUtils.humanReadableInt((long) throughputCells)
      + " cells/s");
  }
  public static enum ScanCounter {
    NUM_ROWS,
    NUM_CELLS,
  }

  public static class MyMapper<KEYOUT, VALUEOUT> extends TableMapper<KEYOUT, VALUEOUT> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
      throws IOException, InterruptedException {
      context.getCounter(ScanCounter.NUM_ROWS).increment(1);
      context.getCounter(ScanCounter.NUM_CELLS).increment(value.rawCells().length);
    }
  }
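
  /**
   * Distributed scan over a live table: a map-only job (no reducers, NullOutputFormat) whose
   * mapper only bumps the row/cell counters; throughput is derived from the job counters.
   * BYTES_IN_RESULTS is published by the HBase input format under the "HBaseCounters" group.
   * (scanOpenTimer is kept for symmetry with the other tests but is never started here, so it
   * reports 0 ms.)
   */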
  public void testScanMapReduce()
    throws IOException, InterruptedException, ClassNotFoundException {
    Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
    Stopwatch scanTimer = Stopwatch.createUnstarted();

    Scan scan = getScan();

    String jobName = "testScanMapReduce";

    Job job = new Job(conf);
    job.setJobName(jobName);

    job.setJarByClass(getClass());

    TableMapReduceUtil.initTableMapperJob(
      this.tablename,
      scan,
      MyMapper.class,
      NullWritable.class,
      NullWritable.class,
      job
    );

    job.setNumReduceTasks(0);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);
    job.setOutputFormatClass(NullOutputFormat.class);

    scanTimer.start();
    job.waitForCompletion(true);
    scanTimer.stop();

    Counters counters = job.getCounters();
    long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
    long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();

    long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
    double throughput = (double) totalBytes / scanTimer.elapsed(TimeUnit.SECONDS);
    double throughputRows = (double) numRows / scanTimer.elapsed(TimeUnit.SECONDS);
    double throughputCells = (double) numCells / scanTimer.elapsed(TimeUnit.SECONDS);

    System.out.println("HBase scan mapreduce: ");
    System.out.println("total time to open scanner: "
      + scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
    System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");

    System.out.println("total bytes: " + totalBytes + " bytes ("
      + StringUtils.humanReadableInt(totalBytes) + ")");
    System.out.println("throughput : " + StringUtils.humanReadableInt((long) throughput) + "B/s");
    System.out.println("total rows : " + numRows);
    System.out.println("throughput : " + StringUtils.humanReadableInt((long) throughputRows)
      + " rows/s");
    System.out.println("total cells : " + numCells);
    System.out.println("throughput : " + StringUtils.humanReadableInt((long) throughputCells)
      + " cells/s");
  }
  public void testSnapshotScanMapReduce()
    throws IOException, InterruptedException, ClassNotFoundException {
    Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
    Stopwatch scanTimer = Stopwatch.createUnstarted();

    Scan scan = getScan();

    String jobName = "testSnapshotScanMapReduce";

    Job job = new Job(conf);
    job.setJobName(jobName);

    job.setJarByClass(getClass());

    TableMapReduceUtil.initTableSnapshotMapperJob(
      this.snapshotName,
      scan,
      MyMapper.class,
      NullWritable.class,
      NullWritable.class,
      job,
      true,
      new Path(restoreDir)
    );

    job.setNumReduceTasks(0);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);
    job.setOutputFormatClass(NullOutputFormat.class);

    scanTimer.start();
    job.waitForCompletion(true);
    scanTimer.stop();

    Counters counters = job.getCounters();
    long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
    long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();

    long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
    double throughput = (double) totalBytes / scanTimer.elapsed(TimeUnit.SECONDS);
    double throughputRows = (double) numRows / scanTimer.elapsed(TimeUnit.SECONDS);
    double throughputCells = (double) numCells / scanTimer.elapsed(TimeUnit.SECONDS);

    System.out.println("HBase snapshot scan mapreduce: ");
    System.out.println("total time to open scanner: "
      + scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
    System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");

    System.out.println("total bytes: " + totalBytes + " bytes ("
      + StringUtils.humanReadableInt(totalBytes) + ")");
    System.out.println("throughput : " + StringUtils.humanReadableInt((long) throughput) + "B/s");
    System.out.println("total rows : " + numRows);
    System.out.println("throughput : " + StringUtils.humanReadableInt((long) throughputRows)
      + " rows/s");
    System.out.println("total cells : " + numCells);
    System.out.println("throughput : " + StringUtils.humanReadableInt((long) throughputCells)
      + " cells/s");
  }
  @Override
  protected int doWork() throws Exception {
    if (type.equals("streaming")) {
      testHdfsStreaming(new Path(file));
    } else if (type.equals("scan")) {
      testScan();
    } else if (type.equals("snapshotscan")) {
      testSnapshotScan();
    } else if (type.equals("scanmapreduce")) {
      testScanMapReduce();
    } else if (type.equals("snapshotscanmapreduce")) {
      testSnapshotScanMapReduce();
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int ret = ToolRunner.run(HBaseConfiguration.create(), new ScanPerformanceEvaluation(), args);
    System.exit(ret);
  }
}