HBASE-26787 TestRegionReplicaReplicationError should inject error in replicateToRepli...
[hbase.git] / hbase-mapreduce / src / test / java / org / apache / hadoop / hbase / ScanPerformanceEvaluation.java
bloba9ce959c6f9abdd2c669fcd8755da5e988c6b578
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
19 package org.apache.hadoop.hbase;
21 import java.io.IOException;
22 import java.util.concurrent.TimeUnit;
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.fs.FSDataInputStream;
25 import org.apache.hadoop.fs.FileSystem;
26 import org.apache.hadoop.fs.Path;
27 import org.apache.hadoop.hbase.client.Connection;
28 import org.apache.hadoop.hbase.client.ConnectionFactory;
29 import org.apache.hadoop.hbase.client.Result;
30 import org.apache.hadoop.hbase.client.ResultScanner;
31 import org.apache.hadoop.hbase.client.Scan;
32 import org.apache.hadoop.hbase.client.Table;
33 import org.apache.hadoop.hbase.client.TableSnapshotScanner;
34 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
35 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
36 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
37 import org.apache.hadoop.hbase.mapreduce.TableMapper;
38 import org.apache.hadoop.hbase.util.AbstractHBaseTool;
39 import org.apache.hadoop.hbase.util.CommonFSUtils;
40 import org.apache.hadoop.io.NullWritable;
41 import org.apache.hadoop.mapreduce.Counters;
42 import org.apache.hadoop.mapreduce.Job;
43 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
44 import org.apache.hadoop.util.StringUtils;
45 import org.apache.hadoop.util.ToolRunner;
46 import org.apache.yetus.audience.InterfaceAudience;
48 import org.apache.hbase.thirdparty.com.google.common.base.Stopwatch;
49 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
51 /**
52 * A simple performance evaluation tool for single client and MR scans
53 * and snapshot scans.
55 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
56 public class ScanPerformanceEvaluation extends AbstractHBaseTool {
58 private static final String HBASE_COUNTER_GROUP_NAME = "HBaseCounters";
60 private String type;
61 private String file;
62 private String tablename;
63 private String snapshotName;
64 private String restoreDir;
65 private String caching;
67 @Override
68 public void setConf(Configuration conf) {
69 super.setConf(conf);
70 Path rootDir;
71 try {
72 rootDir = CommonFSUtils.getRootDir(conf);
73 rootDir.getFileSystem(conf);
74 } catch (IOException ex) {
75 throw new RuntimeException(ex);
79 @Override
80 protected void addOptions() {
81 this.addRequiredOptWithArg("t", "type", "the type of the test. One of the following: streaming|scan|snapshotscan|scanmapreduce|snapshotscanmapreduce");
82 this.addOptWithArg("f", "file", "the filename to read from");
83 this.addOptWithArg("tn", "table", "the tablename to read from");
84 this.addOptWithArg("sn", "snapshot", "the snapshot name to read from");
85 this.addOptWithArg("rs", "restoredir", "the directory to restore the snapshot");
86 this.addOptWithArg("ch", "caching", "scanner caching value");
89 @Override
90 protected void processOptions(CommandLine cmd) {
91 type = cmd.getOptionValue("type");
92 file = cmd.getOptionValue("file");
93 tablename = cmd.getOptionValue("table");
94 snapshotName = cmd.getOptionValue("snapshot");
95 restoreDir = cmd.getOptionValue("restoredir");
96 caching = cmd.getOptionValue("caching");
99 protected void testHdfsStreaming(Path filename) throws IOException {
100 byte[] buf = new byte[1024];
101 FileSystem fs = filename.getFileSystem(getConf());
103 // read the file from start to finish
104 Stopwatch fileOpenTimer = Stopwatch.createUnstarted();
105 Stopwatch streamTimer = Stopwatch.createUnstarted();
107 fileOpenTimer.start();
108 FSDataInputStream in = fs.open(filename);
109 fileOpenTimer.stop();
111 long totalBytes = 0;
112 streamTimer.start();
113 while (true) {
114 int read = in.read(buf);
115 if (read < 0) {
116 break;
118 totalBytes += read;
120 streamTimer.stop();
122 double throughput = (double)totalBytes / streamTimer.elapsed(TimeUnit.SECONDS);
124 System.out.println("HDFS streaming: ");
125 System.out.println("total time to open: " +
126 fileOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
127 System.out.println("total time to read: " + streamTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
128 System.out.println("total bytes: " + totalBytes + " bytes ("
129 + StringUtils.humanReadableInt(totalBytes) + ")");
130 System.out.println("throghput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
133 private Scan getScan() {
134 Scan scan = new Scan(); // default scan settings
135 scan.setCacheBlocks(false);
136 scan.readVersions(1);
137 scan.setScanMetricsEnabled(true);
138 if (caching != null) {
139 scan.setCaching(Integer.parseInt(caching));
142 return scan;
145 public void testScan() throws IOException {
146 Stopwatch tableOpenTimer = Stopwatch.createUnstarted();
147 Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
148 Stopwatch scanTimer = Stopwatch.createUnstarted();
150 tableOpenTimer.start();
151 Connection connection = ConnectionFactory.createConnection(getConf());
152 Table table = connection.getTable(TableName.valueOf(tablename));
153 tableOpenTimer.stop();
155 Scan scan = getScan();
156 scanOpenTimer.start();
157 ResultScanner scanner = table.getScanner(scan);
158 scanOpenTimer.stop();
160 long numRows = 0;
161 long numCells = 0;
162 scanTimer.start();
163 while (true) {
164 Result result = scanner.next();
165 if (result == null) {
166 break;
168 numRows++;
170 numCells += result.rawCells().length;
172 scanTimer.stop();
173 scanner.close();
174 table.close();
175 connection.close();
177 ScanMetrics metrics = scanner.getScanMetrics();
178 long totalBytes = metrics.countOfBytesInResults.get();
179 double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS);
180 double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS);
181 double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS);
183 System.out.println("HBase scan: ");
184 System.out.println("total time to open table: " +
185 tableOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
186 System.out.println("total time to open scanner: " +
187 scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
188 System.out.println("total time to scan: " +
189 scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
191 System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
193 System.out.println("total bytes: " + totalBytes + " bytes ("
194 + StringUtils.humanReadableInt(totalBytes) + ")");
195 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
196 System.out.println("total rows : " + numRows);
197 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
198 System.out.println("total cells : " + numCells);
199 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
203 public void testSnapshotScan() throws IOException {
204 Stopwatch snapshotRestoreTimer = Stopwatch.createUnstarted();
205 Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
206 Stopwatch scanTimer = Stopwatch.createUnstarted();
208 Path restoreDir = new Path(this.restoreDir);
210 snapshotRestoreTimer.start();
211 restoreDir.getFileSystem(conf).delete(restoreDir, true);
212 snapshotRestoreTimer.stop();
214 Scan scan = getScan();
215 scanOpenTimer.start();
216 TableSnapshotScanner scanner = new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
217 scanOpenTimer.stop();
219 long numRows = 0;
220 long numCells = 0;
221 scanTimer.start();
222 while (true) {
223 Result result = scanner.next();
224 if (result == null) {
225 break;
227 numRows++;
229 numCells += result.rawCells().length;
231 scanTimer.stop();
232 scanner.close();
234 ScanMetrics metrics = scanner.getScanMetrics();
235 long totalBytes = metrics.countOfBytesInResults.get();
236 double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS);
237 double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS);
238 double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS);
240 System.out.println("HBase scan snapshot: ");
241 System.out.println("total time to restore snapshot: " +
242 snapshotRestoreTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
243 System.out.println("total time to open scanner: " +
244 scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
245 System.out.println("total time to scan: " +
246 scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
248 System.out.println("Scan metrics:\n" + metrics.getMetricsMap());
250 System.out.println("total bytes: " + totalBytes + " bytes ("
251 + StringUtils.humanReadableInt(totalBytes) + ")");
252 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
253 System.out.println("total rows : " + numRows);
254 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
255 System.out.println("total cells : " + numCells);
256 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
260 public static enum ScanCounter {
261 NUM_ROWS,
262 NUM_CELLS,
265 public static class MyMapper<KEYOUT, VALUEOUT> extends TableMapper<KEYOUT, VALUEOUT> {
266 @Override
267 protected void map(ImmutableBytesWritable key, Result value,
268 Context context) throws IOException,
269 InterruptedException {
270 context.getCounter(ScanCounter.NUM_ROWS).increment(1);
271 context.getCounter(ScanCounter.NUM_CELLS).increment(value.rawCells().length);
275 public void testScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
276 Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
277 Stopwatch scanTimer = Stopwatch.createUnstarted();
279 Scan scan = getScan();
281 String jobName = "testScanMapReduce";
283 Job job = new Job(conf);
284 job.setJobName(jobName);
286 job.setJarByClass(getClass());
288 TableMapReduceUtil.initTableMapperJob(
289 this.tablename,
290 scan,
291 MyMapper.class,
292 NullWritable.class,
293 NullWritable.class,
297 job.setNumReduceTasks(0);
298 job.setOutputKeyClass(NullWritable.class);
299 job.setOutputValueClass(NullWritable.class);
300 job.setOutputFormatClass(NullOutputFormat.class);
302 scanTimer.start();
303 job.waitForCompletion(true);
304 scanTimer.stop();
306 Counters counters = job.getCounters();
307 long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
308 long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
310 long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
311 double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS);
312 double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS);
313 double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS);
315 System.out.println("HBase scan mapreduce: ");
316 System.out.println("total time to open scanner: " +
317 scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
318 System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
320 System.out.println("total bytes: " + totalBytes + " bytes ("
321 + StringUtils.humanReadableInt(totalBytes) + ")");
322 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
323 System.out.println("total rows : " + numRows);
324 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
325 System.out.println("total cells : " + numCells);
326 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
329 public void testSnapshotScanMapReduce() throws IOException, InterruptedException, ClassNotFoundException {
330 Stopwatch scanOpenTimer = Stopwatch.createUnstarted();
331 Stopwatch scanTimer = Stopwatch.createUnstarted();
333 Scan scan = getScan();
335 String jobName = "testSnapshotScanMapReduce";
337 Job job = new Job(conf);
338 job.setJobName(jobName);
340 job.setJarByClass(getClass());
342 TableMapReduceUtil.initTableSnapshotMapperJob(
343 this.snapshotName,
344 scan,
345 MyMapper.class,
346 NullWritable.class,
347 NullWritable.class,
348 job,
349 true,
350 new Path(restoreDir)
353 job.setNumReduceTasks(0);
354 job.setOutputKeyClass(NullWritable.class);
355 job.setOutputValueClass(NullWritable.class);
356 job.setOutputFormatClass(NullOutputFormat.class);
358 scanTimer.start();
359 job.waitForCompletion(true);
360 scanTimer.stop();
362 Counters counters = job.getCounters();
363 long numRows = counters.findCounter(ScanCounter.NUM_ROWS).getValue();
364 long numCells = counters.findCounter(ScanCounter.NUM_CELLS).getValue();
366 long totalBytes = counters.findCounter(HBASE_COUNTER_GROUP_NAME, "BYTES_IN_RESULTS").getValue();
367 double throughput = (double)totalBytes / scanTimer.elapsed(TimeUnit.SECONDS);
368 double throughputRows = (double)numRows / scanTimer.elapsed(TimeUnit.SECONDS);
369 double throughputCells = (double)numCells / scanTimer.elapsed(TimeUnit.SECONDS);
371 System.out.println("HBase scan mapreduce: ");
372 System.out.println("total time to open scanner: " +
373 scanOpenTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
374 System.out.println("total time to scan: " + scanTimer.elapsed(TimeUnit.MILLISECONDS) + " ms");
376 System.out.println("total bytes: " + totalBytes + " bytes ("
377 + StringUtils.humanReadableInt(totalBytes) + ")");
378 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughput) + "B/s");
379 System.out.println("total rows : " + numRows);
380 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputRows) + " rows/s");
381 System.out.println("total cells : " + numCells);
382 System.out.println("throughput : " + StringUtils.humanReadableInt((long)throughputCells) + " cells/s");
385 @Override
386 protected int doWork() throws Exception {
387 if (type.equals("streaming")) {
388 testHdfsStreaming(new Path(file));
389 } else if (type.equals("scan")){
390 testScan();
391 } else if (type.equals("snapshotscan")) {
392 testSnapshotScan();
393 } else if (type.equals("scanmapreduce")) {
394 testScanMapReduce();
395 } else if (type.equals("snapshotscanmapreduce")) {
396 testSnapshotScanMapReduce();
398 return 0;
401 public static void main (String[] args) throws Exception {
402 int ret = ToolRunner.run(HBaseConfiguration.create(), new ScanPerformanceEvaluation(), args);
403 System.exit(ret);