////
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
////

Apache MapReduce is a software framework used to analyze large amounts of data. It is provided by link:https://hadoop.apache.org/[Apache Hadoop].
MapReduce itself is out of the scope of this document.
A good place to get started with MapReduce is https://hadoop.apache.org/docs/r2.6.0/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html.
MapReduce version 2 (MR2) is now part of link:https://hadoop.apache.org/docs/r2.3.0/hadoop-yarn/hadoop-yarn-site/[YARN].

This chapter discusses specific configuration steps you need to take to use MapReduce on data within HBase.
In addition, it discusses other interactions and issues between HBase and MapReduce
jobs. Finally, it discusses <<cascading,Cascading>>, an
link:http://www.cascading.org/[alternative API] for MapReduce.

.`mapred` and `mapreduce`
There are two mapreduce packages in HBase, as in MapReduce itself: _org.apache.hadoop.hbase.mapred_ and _org.apache.hadoop.hbase.mapreduce_.
The former uses the old-style API and the latter the new one.
The latter has more functionality, though you can usually find an equivalent in the older package.
Pick the package that goes with your MapReduce deploy.
When in doubt or starting over, pick _org.apache.hadoop.hbase.mapreduce_.
In the notes below, we refer to _o.a.h.h.mapreduce_, but replace it with
_o.a.h.h.mapred_ if that is what you are using.

[[hbase.mapreduce.classpath]]
== HBase, MapReduce, and the CLASSPATH

By default, MapReduce jobs deployed to a MapReduce cluster do not have access to
either the HBase configuration under `$HBASE_CONF_DIR` or the HBase classes.

To give the MapReduce jobs the access they need, you could add _hbase-site.xml_ to _$HADOOP_HOME/conf_ and add HBase jars to the _$HADOOP_HOME/lib_ directory.
You would then need to copy these changes across your cluster. Or you could edit _$HADOOP_HOME/conf/hadoop-env.sh_ and add HBase dependencies to the `HADOOP_CLASSPATH` variable.
Neither of these approaches is recommended, because both pollute your Hadoop install with HBase references.
They also require you to restart the Hadoop cluster before Hadoop can use the HBase data.

The recommended approach is to let HBase add its dependency jars and use `HADOOP_CLASSPATH` or `-libjars`.

Since HBase `0.90.x`, HBase adds its dependency JARs to the job configuration itself.
The dependencies only need to be available on the local `CLASSPATH`; from there they are picked
up and bundled into the fat job jar deployed to the MapReduce cluster. A basic approach is to pass
the full HBase classpath (all HBase and dependent jars as well as configurations) to the MapReduce
job runner, and let the HBase utility pick out of that classpath what it needs and add it to the
MapReduce job configuration. (See the source of `TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)` for how this is done.)

The following example runs the bundled HBase link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/RowCounter.html[RowCounter] MapReduce job against a table named `usertable`.
It sets into `HADOOP_CLASSPATH` the jars HBase needs to run in a MapReduce context (including configuration files such as _hbase-site.xml_).
Be sure to use the correct version of the HBase JAR for your system; replace the VERSION string in the command line below with the version of
your local HBase install. The backticks (``` symbols) cause the shell to execute the sub-commands, setting the output of `hbase classpath` into `HADOOP_CLASSPATH`.
This example assumes you use a BASH-compatible shell.

[source,bash]
----
$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
  ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-mapreduce-VERSION.jar \
  org.apache.hadoop.hbase.mapreduce.RowCounter usertable
----

The command above launches a row-counting MapReduce job against the HBase cluster that your local HBase configuration points to, on the Hadoop cluster that your Hadoop configuration points to.

The main class of `hbase-mapreduce.jar` is a Driver that lists a few basic MapReduce tasks that ship with HBase.
For example, presuming your install is HBase `2.0.0-SNAPSHOT`:

[source,bash]
----
$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
  ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-mapreduce-2.0.0-SNAPSHOT.jar
An example program must be given as the first argument.
Valid program names are:
  CellCounter: Count cells in HBase table.
  WALPlayer: Replay WAL files.
  completebulkload: Complete a bulk data load.
  copytable: Export a table from local cluster to peer cluster.
  export: Write table data to HDFS.
  exportsnapshot: Export the specific snapshot to a given FileSystem.
  import: Import data written by Export.
  importtsv: Import data in TSV format.
  rowcounter: Count rows in HBase table.
  verifyrep: Compare the data from tables in two different clusters. WARNING: It doesn't work for incrementColumnValues'd cells since the timestamp is changed after being appended to the log.
----

You can use the short names listed above for MapReduce jobs, as in the following re-run of the row counter job (again, presuming your install is HBase `2.0.0-SNAPSHOT`):

[source,bash]
----
$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
  ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-mapreduce-2.0.0-SNAPSHOT.jar \
  rowcounter usertable
----

You might find the more selective `hbase mapredcp` tool output of interest; it lists the minimum set of jars needed
to run a basic MapReduce job against an HBase install. It does not include configuration, so you will probably need to add
the HBase configuration yourself if you want your MapReduce job to find the target cluster. Once you start doing anything
of substance, you will probably also have to add pointers to extra jars. Specify the extras by passing the system
property `-Dtmpjars` when you run `hbase mapredcp`.

For jobs that do not package their dependencies or call `TableMapReduceUtil#addDependencyJars`, the following command structure is necessary:

[source,bash]
----
$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf hadoop jar MyApp.jar MyJobMainClass -libjars $(${HBASE_HOME}/bin/hbase mapredcp | tr ':' ',') ...
----
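
Because `-libjars` takes a comma-separated list while `hbase mapredcp` prints a colon-separated classpath, the command above pipes the output through `tr ':' ','`. If you assemble launch arguments in Java instead of the shell, the conversion might look like the following sketch. `LibJars` and `toLibJars` are hypothetical names, the example paths are made up, and the sketch assumes the Unix `:` separator; unlike `tr`, it also drops empty entries such as the one a trailing `:` would produce.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/** Hypothetical helper mirroring `hbase mapredcp | tr ':' ','`. */
final class LibJars {
  private LibJars() {}

  /**
   * Turns a colon-separated classpath (as printed by `hbase mapredcp`)
   * into the comma-separated list that -libjars expects.
   * Empty entries (for example from a trailing ':') are dropped.
   */
  static String toLibJars(String classpath) {
    return Arrays.stream(classpath.trim().split(":"))
        .map(String::trim)
        .filter(entry -> !entry.isEmpty())
        .collect(Collectors.joining(","));
  }

  public static void main(String[] args) {
    String cp = "/opt/hbase/lib/hbase-client.jar:/opt/hbase/lib/hbase-common.jar";
    System.out.println(toLibJars(cp));
    // prints /opt/hbase/lib/hbase-client.jar,/opt/hbase/lib/hbase-common.jar
  }
}
```
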

The example may not work if you are running HBase from its build directory rather than an installed location.
You may see an error like the following:

----
java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.mapreduce.RowCounter$RowCounterMapper
----

If this occurs, try modifying the command as follows, so that it uses the HBase JARs from the _target/_ directory within the build environment.

[source,bash]
----
$ HADOOP_CLASSPATH=${HBASE_BUILD_HOME}/hbase-mapreduce/target/hbase-mapreduce-VERSION-SNAPSHOT.jar:`${HBASE_BUILD_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_BUILD_HOME}/hbase-mapreduce/target/hbase-mapreduce-VERSION-SNAPSHOT.jar rowcounter usertable
----

.Notice to MapReduce users of HBase between 0.96.1 and 0.98.4
Some MapReduce jobs that use HBase fail to launch.
The symptom is an exception similar to the following:

----
Exception in thread "main" java.lang.IllegalAccessError: class
    com.google.protobuf.ZeroCopyLiteralByteString cannot access its superclass
    com.google.protobuf.LiteralByteString
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:792)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.hadoop.hbase.protobuf.ProtobufUtil.toScan(ProtobufUtil.java:818)
    at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.convertScanToString(TableMapReduceUtil.java:433)
    at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:186)
    at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:147)
    at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:270)
    at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:100)
----

This is caused by an optimization introduced in link:https://issues.apache.org/jira/browse/HBASE-9867[HBASE-9867] that inadvertently introduced a classloader dependency.

This affects both jobs that use the `-libjars` option and "fat jar" jobs, which package their runtime dependencies in a nested `lib` folder.

In order to satisfy the new classloader requirements, `hbase-protocol.jar` must be included in Hadoop's classpath.
See <<hbase.mapreduce.classpath>> for current recommendations for resolving classpath errors.
The following is included for historical purposes.

This can be resolved system-wide by including a reference to the `hbase-protocol.jar` in Hadoop's lib directory, via a symlink or by copying the jar into the new location.

This can also be achieved on a per-job launch basis by including it in the `HADOOP_CLASSPATH` environment variable at job submission time.
When launching jobs that package their dependencies, all three of the following job launching commands satisfy this requirement:

[source,bash]
----
$ HADOOP_CLASSPATH=/path/to/hbase-protocol.jar:/path/to/hbase/conf hadoop jar MyJob.jar MyJobMainClass
$ HADOOP_CLASSPATH=$(hbase mapredcp):/path/to/hbase/conf hadoop jar MyJob.jar MyJobMainClass
$ HADOOP_CLASSPATH=$(hbase classpath) hadoop jar MyJob.jar MyJobMainClass
----

For jobs whose jars do not package their dependencies, the following command structure is necessary:

[source,bash]
----
$ HADOOP_CLASSPATH=$(hbase mapredcp):/etc/hbase/conf hadoop jar MyApp.jar MyJobMainClass -libjars $(hbase mapredcp | tr ':' ',') ...
----

See also link:https://issues.apache.org/jira/browse/HBASE-10304[HBASE-10304] for further discussion of this issue.

== MapReduce Scan Caching

TableMapReduceUtil now restores the option to set scanner caching (the number of rows which are cached before returning the result to the client) on the Scan object that is passed in.
This functionality was lost due to a bug in HBase 0.95 (link:https://issues.apache.org/jira/browse/HBASE-11558[HBASE-11558]), which is fixed for HBase 0.98.5 and 0.96.3.
The priority order for choosing the scanner caching is as follows:

. Caching settings which are set on the scan object.
. Caching settings which are specified via the configuration option `hbase.client.scanner.caching`, which can either be set manually in _hbase-site.xml_ or via the helper method `TableMapReduceUtil.setScannerCaching()`.
. The default value `HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING`, which is set to `100`.

Optimizing the caching settings is a balance between the time the client waits for a result and the number of sets of results the client needs to receive.
If the caching setting is too large, the client could end up waiting for a long time or the request could even time out.
If the setting is too small, the scan needs to return results in several pieces.
If you think of the scan as a shovel, a bigger cache setting is analogous to a bigger shovel, and a smaller cache setting is equivalent to more shoveling in order to fill the bucket.
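
To put rough numbers on the shovel analogy: if every call returns a full batch of `caching` rows, a scan over `rows` rows needs about ceil(rows / caching) calls to the RegionServers. The sketch below does only that arithmetic. `ScanRoundTrips` is a hypothetical name, and the model ignores other limits that can end a batch early (for example a maximum result size per call), so treat its numbers as an idealized lower bound rather than a measurement.

```java
/** Idealized model: number of scanner calls needed for a given caching value. */
final class ScanRoundTrips {
  private ScanRoundTrips() {}

  /** ceil(rows / caching); assumes every call returns a full batch except possibly the last. */
  static long roundTrips(long rows, int caching) {
    if (rows < 0 || caching <= 0) {
      throw new IllegalArgumentException("rows must be >= 0 and caching must be > 0");
    }
    return (rows + caching - 1) / caching;
  }

  public static void main(String[] args) {
    long rows = 1_000_000L;
    for (int caching : new int[] {1, 100, 500}) {
      System.out.println("caching=" + caching + " -> " + roundTrips(rows, caching) + " calls");
    }
  }
}
```

For one million rows, caching values of 1, 100, and 500 give 1,000,000, 10,000, and 2,000 calls respectively. Fewer calls means less per-call overhead, but each call carries more rows, which is where the waiting and time-out risk described above comes from.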

The list of priorities mentioned above allows you to set a reasonable default, and override it for specific operations.
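
The precedence above can be written as a small function. This is a hypothetical model of the documented order, not HBase's own code: `scanCaching` stands for a value set on the `Scan`, `configuredCaching` for `hbase.client.scanner.caching`, and `null` means "not set". The fallback of 100 is the default quoted in the list; check `HConstants` in your HBase version, because this default has not been the same in every release.

```java
/** Hypothetical model of the scanner-caching precedence described in the text. */
final class CachingPrecedence {
  /** Fallback used when nothing else is set; the value is the one quoted in the text. */
  static final int DEFAULT_CACHING = 100;

  private CachingPrecedence() {}

  /**
   * @param scanCaching       caching set on the Scan object, or null if unset
   * @param configuredCaching hbase.client.scanner.caching from the configuration, or null if unset
   */
  static int effectiveCaching(Integer scanCaching, Integer configuredCaching) {
    if (scanCaching != null) {
      return scanCaching;         // 1. a value on the Scan wins
    }
    if (configuredCaching != null) {
      return configuredCaching;   // 2. then the configuration option
    }
    return DEFAULT_CACHING;       // 3. finally the built-in default
  }
}
```

In practice this means a cluster-wide or job-wide value goes into the configuration, and a job that needs something different sets caching on its own `Scan`, which then takes precedence.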

See the API documentation for link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Scan.html[Scan] for more details.

== Bundled HBase MapReduce Jobs

The HBase JAR also serves as a Driver for some bundled MapReduce jobs.
To learn about the bundled MapReduce jobs, run the following command.

[source,bash]
----
$ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-mapreduce-VERSION.jar
An example program must be given as the first argument.
Valid program names are:
  copytable: Export a table from local cluster to peer cluster
  completebulkload: Complete a bulk data load.
  export: Write table data to HDFS.
  import: Import data written by Export.
  importtsv: Import data in TSV format.
  rowcounter: Count rows in HBase table
----

Each of the valid program names is a bundled MapReduce job.
To run one of the jobs, model your command after the following example.

[source,bash]
----
$ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-mapreduce-VERSION.jar rowcounter myTable
----

== HBase as a MapReduce Job Data Source and Data Sink

HBase can be used as a data source, link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html[TableInputFormat], and data sink, link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.html[TableOutputFormat] or link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.html[MultiTableOutputFormat], for MapReduce jobs.
When writing MapReduce jobs that read or write HBase, it is advisable to subclass link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableMapper.html[TableMapper] and/or link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableReducer.html[TableReducer].
See the do-nothing pass-through classes link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.html[IdentityTableMapper] and link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.html[IdentityTableReducer] for basic usage.
For a more involved example, see link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/RowCounter.html[RowCounter] or review the `org.apache.hadoop.hbase.mapreduce.TestTableMapReduce` unit test.

If you run MapReduce jobs that use HBase as source or sink, you need to specify source and sink table and column names in your configuration.

When you read from HBase, `TableInputFormat` requests the list of regions from HBase and creates either one map per region or `mapreduce.job.maps` maps, whichever is smaller.
If your job only has two maps, raise `mapreduce.job.maps` to a number greater than the number of regions.
If you run a TaskTracker/NodeManager and a RegionServer on each node, maps will run on the TaskTracker/NodeManager adjacent to the data.
When writing to HBase, it may make sense to avoid the Reduce step and write back into HBase from within your map.
This approach works when your job does not need the sort and collation that MapReduce does on the map-emitted data.
On insert, HBase 'sorts' so there is no point double-sorting (and shuffling data around your MapReduce cluster) unless you need to.
If you do not need the Reduce, your map might emit counts of records processed for reporting at the end of the job, or set the number of Reduces to zero and use TableOutputFormat.
If running the Reduce step makes sense in your case, you should typically use multiple reducers so that load is spread across the HBase cluster.

A new HBase partitioner, the link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.html[HRegionPartitioner], can run as many reducers as there are existing regions.
The HRegionPartitioner is suitable when your table is large and your upload will not greatly alter the number of existing regions upon completion.
Otherwise use the default partitioner.
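
The idea behind a region-aware partitioner is that each row key goes to the reducer that corresponds to the region holding that key, so each reducer writes into a single region. The sketch below models that with a sorted array of region start keys and a binary search. It is not `HRegionPartitioner` itself: the class name is hypothetical, it uses `String` keys (which order like HBase's unsigned byte comparison for plain ASCII keys), and the real partitioner looks regions up through the HBase client, with its own handling for jobs that have fewer reducers than regions.

```java
import java.util.Arrays;

/** Hypothetical model of region-aware partitioning (not HBase's HRegionPartitioner). */
final class RegionPartitionModel {
  private final String[] startKeys; // sorted; the first region starts at the empty key

  RegionPartitionModel(String... startKeys) {
    String[] sorted = startKeys.clone();
    Arrays.sort(sorted);
    if (sorted.length == 0 || !sorted[0].isEmpty()) {
      throw new IllegalArgumentException("the first region must start at the empty key");
    }
    this.startKeys = sorted;
  }

  /** Index of the region (and thus the reducer) whose key range contains rowKey. */
  int partitionFor(String rowKey) {
    int pos = Arrays.binarySearch(startKeys, rowKey);
    // Exact hit: rowKey is a region start key. Otherwise binarySearch returns
    // -(insertionPoint) - 1, and the owning region is the one just before insertionPoint.
    return pos >= 0 ? pos : -pos - 2;
  }

  /** One reducer per region in this model. */
  int numRegions() {
    return startKeys.length;
  }
}
```

In this model the start keys are captured once, when the partitioner is built, so regions created by splits during a large upload would not get their own reducer; that is one way to read the advice to use this partitioner only when the upload will not greatly change the region count.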

== Writing HFiles Directly During Bulk Import

If you are importing into a new table, you can bypass the HBase API and write your content directly to the filesystem, formatted into HBase data files (HFiles). Your import will run faster, perhaps an order of magnitude faster.
For more on how this mechanism works, see <<arch.bulk.load>>.

== RowCounter Example

The included link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/RowCounter.html[RowCounter] MapReduce job uses `TableInputFormat` and does a count of all rows in the specified table.
To run it, use the following command:

[source,bash]
----
$ ./bin/hadoop jar hbase-X.X.X.jar
----

This will invoke the HBase MapReduce Driver class.
Select `rowcounter` from the choice of jobs offered.
This will print rowcounter usage advice to standard output.
Specify the tablename, column to count, and output directory.
If you have classpath errors, see <<hbase.mapreduce.classpath>>.

== Map-Task Splitting

=== The Default HBase MapReduce Splitter

When link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html[TableInputFormat] is used to source an HBase table in a MapReduce job, its splitter will make a map task for each region of the table.
Thus, if there are 100 regions in the table, there will be 100 map tasks for the job, regardless of how many column families are selected in the Scan.

For those interested in implementing custom splitters, see the method `getSplits` in link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.html[TableInputFormatBase].
That is where the logic for map-task assignment resides.
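
The default rule (one split, and therefore one map task, per region, clipped to the scan's row range) can be modeled in a few lines. This is a simplified sketch of the idea behind `getSplits`, not a copy of it: regions are `[start, end)` ranges over `String` keys, an empty string means unbounded on that side, and `RegionSplitModel` with its `Range` class are hypothetical names. Column families play no part in the count, consistent with the text above.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of one-map-task-per-region splitting; all names are hypothetical. */
final class RegionSplitModel {
  private RegionSplitModel() {}

  /** A key range [start, end); an empty string means unbounded on that side. */
  static final class Range {
    final String start;
    final String end;

    Range(String start, String end) {
      this.start = start;
      this.end = end;
    }

    @Override
    public String toString() {
      return "[" + start + ", " + end + ")";
    }
  }

  /** One split per region overlapping [scanStart, scanStop), clipped to that scan range. */
  static List<Range> splitsFor(List<Range> regions, String scanStart, String scanStop) {
    List<Range> splits = new ArrayList<>();
    for (Range region : regions) {
      boolean endsBeforeScan = !region.end.isEmpty() && region.end.compareTo(scanStart) <= 0;
      boolean startsAfterScan = !scanStop.isEmpty() && region.start.compareTo(scanStop) >= 0;
      if (endsBeforeScan || startsAfterScan) {
        continue; // region does not overlap the scan: no map task for it
      }
      String start = region.start.compareTo(scanStart) >= 0 ? region.start : scanStart;
      String end;
      if (scanStop.isEmpty()) {
        end = region.end;
      } else if (region.end.isEmpty()) {
        end = scanStop;
      } else {
        end = region.end.compareTo(scanStop) <= 0 ? region.end : scanStop;
      }
      splits.add(new Range(start, end));
    }
    return splits;
  }
}
```

With three regions ending at `g`, at `p`, and at the end of the table, a full scan yields three splits, while a scan from `h` (inclusive) to `q` (exclusive) yields two: `[h, p)` and `[p, q)`.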

[[mapreduce.example]]
== HBase MapReduce Examples

[[mapreduce.example.read]]
=== HBase MapReduce Read Example

The following is an example of using HBase as a MapReduce source in a read-only manner.
Specifically, there is a Mapper instance but no Reducer, and nothing is being emitted from the Mapper.
The job would be defined as follows...

[source,java]
----
Configuration config = HBaseConfiguration.create();
Job job = Job.getInstance(config, "ExampleRead");
job.setJarByClass(MyReadJob.class);     // class that contains mapper

Scan scan = new Scan();
scan.setCaching(500);        // rows fetched per RPC; small values make MapReduce scans slow
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  tableName,        // input HBase table name
  scan,             // Scan instance to control CF and attribute selection
  MyMapper.class,   // mapper
  null,             // mapper output key
  null,             // mapper output value
  job);
job.setOutputFormatClass(NullOutputFormat.class);   // because we aren't emitting anything from mapper

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}
----

...and the mapper instance would extend link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableMapper.html[TableMapper]...

[source,java]
----
public static class MyMapper extends TableMapper<Text, Text> {

  public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException {
    // process data for the row from the Result instance.
  }
}
----

[[mapreduce.example.readwrite]]
=== HBase MapReduce Read/Write Example

The following is an example of using HBase both as a source and as a sink with MapReduce.
This example will simply copy data from one table to another.

[source,java]
----
Configuration config = HBaseConfiguration.create();
Job job = Job.getInstance(config, "ExampleReadWrite");
job.setJarByClass(MyReadWriteJob.class);    // class that contains mapper

Scan scan = new Scan();
scan.setCaching(500);        // rows fetched per RPC; small values make MapReduce scans slow
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,      // input table
  scan,             // Scan instance to control CF and attribute selection
  MyMapper.class,   // mapper class
  null,             // mapper output key
  null,             // mapper output value
  job);
TableMapReduceUtil.initTableReducerJob(
  targetTable,      // output table
  null,             // reducer class
  job);
job.setNumReduceTasks(0);

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}
----

It is worth explaining what `TableMapReduceUtil` is doing, especially with the reducer. link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.html[TableOutputFormat] is being used as the output format class, and several parameters are being set on the config (e.g., `TableOutputFormat.OUTPUT_TABLE`), as well as setting the reducer output key to `ImmutableBytesWritable` and the reducer output value to `Writable`.
These could be set by the programmer on the job and conf, but `TableMapReduceUtil` tries to make things easier.

The following is the example mapper, which creates a `Put` matching the input `Result` and emits it.
Note: this is what the CopyTable utility does.

[source,java]
----
public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {

  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    // this example is just copying the data from the source table...
    context.write(row, resultToPut(row, value));
  }

  private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
    Put put = new Put(key.get());
    // copy every cell of the source row into the Put
    for (Cell cell : result.rawCells()) {
      put.add(cell);
    }
    return put;
  }
}
----

There isn't actually a reducer step, so `TableOutputFormat` takes care of sending the `Put` to the target table.

This is just an example; developers could choose not to use `TableOutputFormat` and connect to the target table themselves.

[[mapreduce.example.readwrite.multi]]
=== HBase MapReduce Read/Write Example With Multi-Table Output

TODO: example for `MultiTableOutputFormat`.

[[mapreduce.example.summary]]
=== HBase MapReduce Summary to HBase Example

The following example uses HBase as a MapReduce source and sink with a summarization step.
This example will count the number of distinct instances of a value in a table and write those summarized counts in another table.

[source,java]
----
Configuration config = HBaseConfiguration.create();
Job job = Job.getInstance(config, "ExampleSummary");
job.setJarByClass(MySummaryJob.class);     // class that contains mapper and reducer

Scan scan = new Scan();
scan.setCaching(500);        // rows fetched per RPC; small values make MapReduce scans slow
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,        // input table
  scan,               // Scan instance to control CF and attribute selection
  MyMapper.class,     // mapper class
  Text.class,         // mapper output key
  IntWritable.class,  // mapper output value
  job);
TableMapReduceUtil.initTableReducerJob(
  targetTable,            // output table
  MyTableReducer.class,   // reducer class
  job);
job.setNumReduceTasks(1);   // at least one, adjust as required

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}
----

In this example mapper, a column with a String value is chosen as the value to summarize upon.
This value is used as the key to emit from the mapper, and an `IntWritable` represents an instance counter.

[source,java]
----
public static class MyMapper extends TableMapper<Text, IntWritable> {
  public static final byte[] CF = Bytes.toBytes("cf");
  public static final byte[] ATTR1 = Bytes.toBytes("attr1");

  private final IntWritable ONE = new IntWritable(1);
  private Text text = new Text();

  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    byte[] bytes = value.getValue(CF, ATTR1);
    if (bytes == null) {
      return;   // skip rows that do not have cf:attr1
    }
    text.set(Bytes.toString(bytes));   // we can only emit Writables...
    context.write(text, ONE);
  }
}
----

In the reducer, the "ones" are counted (just like in any other MR example that does this), and then a `Put` is emitted.

[source,java]
----
public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
  public static final byte[] CF = Bytes.toBytes("cf");
  public static final byte[] COUNT = Bytes.toBytes("count");

  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    Put put = new Put(Bytes.toBytes(key.toString()));
    put.addColumn(CF, COUNT, Bytes.toBytes(i));

    context.write(null, put);
  }
}
----

[[mapreduce.example.summary.file]]
=== HBase MapReduce Summary to File Example

This is very similar to the summary example above, except that it uses HBase as a MapReduce source but HDFS as the sink.
The differences are in the job setup and in the reducer.
The mapper remains the same.

[source,java]
----
Configuration config = HBaseConfiguration.create();
Job job = Job.getInstance(config, "ExampleSummaryToFile");
job.setJarByClass(MySummaryFileJob.class);     // class that contains mapper and reducer

Scan scan = new Scan();
scan.setCaching(500);        // rows fetched per RPC; small values make MapReduce scans slow
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,        // input table
  scan,               // Scan instance to control CF and attribute selection
  MyMapper.class,     // mapper class
  Text.class,         // mapper output key
  IntWritable.class,  // mapper output value
  job);
job.setReducerClass(MyReducer.class);    // reducer class
job.setNumReduceTasks(1);    // at least one, adjust as required
FileOutputFormat.setOutputPath(job, new Path("/tmp/mr/mySummaryFile"));  // adjust directories as required

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}
----

As stated above, the previous Mapper can run unchanged with this example.
As for the Reducer, it is a "generic" Reducer instead of extending TableReducer and emitting Puts.

[source,java]
----
public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int i = 0;
    for (IntWritable val : values) {
      i += val.get();
    }
    context.write(key, new IntWritable(i));
  }
}
----

[[mapreduce.example.summary.noreducer]]
=== HBase MapReduce Summary to HBase Without Reducer

It is also possible to perform summaries without a reducer, if you use HBase as the reducer.

An HBase target table would need to exist for the job summary.
The Table method `incrementColumnValue` would be used to atomically increment values.
From a performance perspective, it might make sense to keep a Map of keys and the amounts to increment them by for each map task, and make one update per key during the `cleanup` method of the mapper.
However, your mileage may vary depending on the number of rows to be processed and unique keys.

In the end, the summary results are in HBase.
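
The per-task aggregation idea can be sketched without any HBase classes. In the model below, `map` only updates an in-memory `HashMap`, and `cleanup` issues one increment per distinct key through a callback. In a real job, that callback is where you would call `Table.incrementColumnValue` on the summary table; the `InMapperSummary` class and the callback are hypothetical stand-ins.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical model of a mapper that summarizes in memory and flushes once per key. */
final class InMapperSummary {
  private final Map<String, Long> pending = new HashMap<>();
  private final BiConsumer<String, Long> incrementer; // stands in for Table.incrementColumnValue

  InMapperSummary(BiConsumer<String, Long> incrementer) {
    this.incrementer = incrementer;
  }

  /** Called once per input row: count one more occurrence of value. */
  void map(String value) {
    pending.merge(value, 1L, Long::sum);
  }

  /** Called once at the end of the map task: one increment per distinct key. */
  void cleanup() {
    pending.forEach(incrementer);
    pending.clear();
  }
}
```

Four input rows with two distinct values produce two increments instead of four. The saving grows with the ratio of rows to distinct keys, and so does the memory held until `cleanup` runs, which is the trade-off mentioned above.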

[[mapreduce.example.summary.rdbms]]
=== HBase MapReduce Summary to RDBMS

Sometimes it is more appropriate to generate summaries to an RDBMS.
For these cases, it is possible to generate summaries directly to an RDBMS via a custom reducer.
The `setup` method can connect to an RDBMS (the connection information can be passed via custom parameters in the context) and the `cleanup` method can close the connection.

It is critical to understand that the number of reducers for the job affects the summarization implementation, and you'll have to design this into your reducer.
Specifically, decide whether it is designed to run as a singleton (one reducer) or as multiple reducers.
Neither is right or wrong; it depends on your use case.
Recognize that the more reducers that are assigned to the job, the more simultaneous connections to the RDBMS will be created. This will scale, but only to a point.

[source,java]
----
public static class MyRdbmsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  private Connection c = null;   // a java.sql.Connection to the RDBMS

  public void setup(Context context) {
    // create DB connection...
  }

  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    // do summarization
    // in this example the keys are Text, but this is just an example
  }

  public void cleanup(Context context) {
    // close db connection
  }

}
----

In the end, the summary results are written to your RDBMS table/s.
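
To reason about how summary rows are spread across reducers, it helps to know that Hadoop's default `HashPartitioner` assigns each key with `(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks`. The model below applies that formula. `HashPartitionModel` is a hypothetical name, and `String.hashCode` stands in for `Text.hashCode`, which is computed differently, so the partition numbers it produces will not match those of a real job.

```java
/** Model of Hadoop's default hash partitioning, used to reason about reducer fan-out. */
final class HashPartitionModel {
  private HashPartitionModel() {}

  /** Same formula as HashPartitioner; String.hashCode stands in for Text.hashCode. */
  static int partition(String key, int numReduceTasks) {
    // Masking the sign bit keeps the result non-negative even for negative hash codes.
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
```

With one reducer every key lands in partition 0, so a single connection writes all summaries. With N reducers each key still goes to exactly one task, so no two connections write the same summary row, but N connections are open at the same time.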

[[mapreduce.htable.access]]
== Accessing Other HBase Tables in a MapReduce Job

Although the framework currently allows one HBase table as input to a MapReduce job, other HBase tables can be accessed as lookup tables, etc., in a MapReduce job by creating a Table instance in the setup method of the Mapper.

[source,java]
----
public class MyMapper extends TableMapper<Text, LongWritable> {
  private Connection connection;
  private Table myOtherTable;

  @Override
  protected void setup(Context context) throws IOException {
    // Create a Connection to the cluster once per task and keep it for the task's lifetime
    connection = ConnectionFactory.createConnection(context.getConfiguration());
    myOtherTable = connection.getTable(TableName.valueOf("myOtherTable"));
  }

  @Override
  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    // process Result...
    // use 'myOtherTable' for lookups
  }

  @Override
  protected void cleanup(Context context) throws IOException {
    // release the lookup table and the connection when the task finishes
    myOtherTable.close();
    connection.close();
  }
}
----

== Speculative Execution

It is generally advisable to turn off speculative execution for MapReduce jobs that use HBase as a source.
This can be done either on a per-job basis through properties, or for the entire cluster.
Especially for longer-running jobs, speculative execution will create duplicate map tasks, which will double-write your data to HBase; this is probably not what you want.

See <<spec.ex,spec.ex>> for more information.
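
Per job, speculative execution is controlled by the Hadoop properties `mapreduce.map.speculative` and `mapreduce.reduce.speculative`. If your driver runs through `ToolRunner`, so that generic options are parsed, one way to turn both off is to pass them as `-D` options before the job's own arguments. The sketch below only assembles such an argument list; `NoSpeculationArgs` is a hypothetical name, and in driver code you could equally set the two properties to `false` on the job's `Configuration`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper that prefixes Hadoop generic options disabling speculative execution. */
final class NoSpeculationArgs {
  private NoSpeculationArgs() {}

  static List<String> withSpeculationDisabled(String... jobArgs) {
    List<String> args = new ArrayList<>();
    // Generic options must come before the job's own arguments.
    args.addAll(Arrays.asList("-D", "mapreduce.map.speculative=false"));
    args.addAll(Arrays.asList("-D", "mapreduce.reduce.speculative=false"));
    args.addAll(Arrays.asList(jobArgs));
    return args;
  }
}
```

For example, `withSpeculationDisabled("myTable")` produces the arguments `-D mapreduce.map.speculative=false -D mapreduce.reduce.speculative=false myTable`.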

[[cascading]]
== Cascading

link:http://www.cascading.org/[Cascading] is an alternative API for MapReduce, which
actually uses MapReduce, but allows you to write your MapReduce code in a simplified
way.

The following example shows a Cascading `Flow` which "sinks" data into an HBase cluster. The same
`hBaseTap` API could be used to "source" data as well.

[source,java]
----
// `properties` (the Flow settings) and `inputFileLhs` (the input path) are assumed to be defined elsewhere

// read data from the default filesystem
// emits two fields: "offset" and "line"
Tap source = new Hfs( new TextLine(), inputFileLhs );

// store data in an HBase cluster
// accepts fields "num", "lower", and "upper"
// will automatically scope incoming fields to their proper familyname, "left" or "right"
Fields keyFields = new Fields( "num" );
String[] familyNames = {"left", "right"};
Fields[] valueFields = new Fields[] {new Fields( "lower" ), new Fields( "upper" ) };
Tap hBaseTap = new HBaseTap( "multitable", new HBaseScheme( keyFields, familyNames, valueFields ), SinkMode.REPLACE );

// a simple pipe assembly to parse the input into fields
// a real app would likely chain multiple Pipes together for more complex processing
Pipe parsePipe = new Each( "insert", new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ), " " ) );

// "plan" a cluster executable Flow
// this connects the source Tap and hBaseTap (the sink Tap) to the parsePipe
Flow parseFlow = new FlowConnector( properties ).connect( source, hBaseTap, parsePipe );

// start the flow, and block until complete
parseFlow.complete();

// open an iterator on the HBase table we stuffed data into
TupleEntryIterator iterator = parseFlow.openSink();

while( iterator.hasNext() ) {
  // print out each tuple from HBase
  System.out.println( "iterator.next() = " + iterator.next() );
}

iterator.close();
----