HBASE-20276 restore original shell REPL functionality where commands can return results
[hbase.git] / src / main / asciidoc / _chapters / mapreduce.adoc
blob61cff86993faf6e30b6d4720e3b75681d06e3367
1 ////
2 /**
3  *
4  * Licensed to the Apache Software Foundation (ASF) under one
5  * or more contributor license agreements.  See the NOTICE file
6  * distributed with this work for additional information
7  * regarding copyright ownership.  The ASF licenses this file
8  * to you under the Apache License, Version 2.0 (the
9  * "License"); you may not use this file except in compliance
10  * with the License.  You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  */
20 ////
22 [[mapreduce]]
23 = HBase and MapReduce
24 :doctype: book
25 :numbered:
26 :toc: left
27 :icons: font
28 :experimental:
30 Apache MapReduce is a software framework used to analyze large amounts of data. It is provided by link:https://hadoop.apache.org/[Apache Hadoop].
31 MapReduce itself is out of the scope of this document.
32 A good place to get started with MapReduce is https://hadoop.apache.org/docs/r2.6.0/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html.
33 MapReduce version 2 (MR2)is now part of link:https://hadoop.apache.org/docs/r2.3.0/hadoop-yarn/hadoop-yarn-site/[YARN].
35 This chapter discusses specific configuration steps you need to take to use MapReduce on data within HBase.
36 In addition, it discusses other interactions and issues between HBase and MapReduce
37 jobs. Finally, it discusses <<cascading,Cascading>>, an
38 link:http://www.cascading.org/[alternative API] for MapReduce.
40 .`mapred` and `mapreduce`
41 [NOTE]
42 ====
43 There are two mapreduce packages in HBase as in MapReduce itself: _org.apache.hadoop.hbase.mapred_ and _org.apache.hadoop.hbase.mapreduce_.
44 The former does old-style API and the latter the new mode.
45 The latter has more facility though you can usually find an equivalent in the older package.
46 Pick the package that goes with your MapReduce deploy.
47 When in doubt or starting over, pick _org.apache.hadoop.hbase.mapreduce_.
48 In the notes below, we refer to _o.a.h.h.mapreduce_ but replace with
49 _o.a.h.h.mapred_ if that is what you are using.
50 ====
52 [[hbase.mapreduce.classpath]]
53 == HBase, MapReduce, and the CLASSPATH
55 By default, MapReduce jobs deployed to a MapReduce cluster do not have access to
56 either the HBase configuration under `$HBASE_CONF_DIR` or the HBase classes.
58 To give the MapReduce jobs the access they need, you could add _hbase-site.xml_to _$HADOOP_HOME/conf_ and add HBase jars to the _$HADOOP_HOME/lib_ directory.
59 You would then need to copy these changes across your cluster. Or you could edit _$HADOOP_HOME/conf/hadoop-env.sh_ and add hbase dependencies to the `HADOOP_CLASSPATH` variable.
60 Neither of these approaches is recommended because it will pollute your Hadoop install with HBase references.
61 It also requires you restart the Hadoop cluster before Hadoop can use the HBase data.
63 The recommended approach is to let HBase add its dependency jars and use `HADOOP_CLASSPATH` or `-libjars`.
65 Since HBase `0.90.x`, HBase adds its dependency JARs to the job configuration itself.
66 The dependencies only need to be available on the local `CLASSPATH` and from here they'll be picked
67 up and bundled into the fat job jar deployed to the MapReduce cluster. A basic trick just passes
68 the full hbase classpath -- all hbase and dependent jars as well as configurations -- to the mapreduce
69 job runner letting hbase utility pick out from the full-on classpath what it needs adding them to the
70 MapReduce job configuration (See the source at `TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job)` for how this is done).
73 The following example runs the bundled HBase link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/RowCounter.html[RowCounter] MapReduce job against a table named `usertable`.
74 It sets into `HADOOP_CLASSPATH` the jars hbase needs to run in an MapReduce context (including configuration files such as hbase-site.xml).
75 Be sure to use the correct version of the HBase JAR for your system; replace the VERSION string in the below command line w/ the version of
76 your local hbase install.  The backticks (``` symbols) cause the shell to execute the sub-commands, setting the output of `hbase classpath` into `HADOOP_CLASSPATH`.
77 This example assumes you use a BASH-compatible shell.
79 [source,bash]
80 ----
81 $ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
82   ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-mapreduce-VERSION.jar \
83   org.apache.hadoop.hbase.mapreduce.RowCounter usertable
84 ----
86 The above command will launch a row counting mapreduce job against the hbase cluster that is pointed to by your local configuration on a cluster that the hadoop configs are pointing to.
88 The main for the `hbase-mapreduce.jar` is a Driver that lists a few basic mapreduce tasks that ship with hbase.
89 For example, presuming your install is hbase `2.0.0-SNAPSHOT`:
91 [source,bash]
92 ----
93 $ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
94   ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-mapreduce-2.0.0-SNAPSHOT.jar
95 An example program must be given as the first argument.
96 Valid program names are:
97   CellCounter: Count cells in HBase table.
98   WALPlayer: Replay WAL files.
99   completebulkload: Complete a bulk data load.
100   copytable: Export a table from local cluster to peer cluster.
101   export: Write table data to HDFS.
102   exportsnapshot: Export the specific snapshot to a given FileSystem.
103   import: Import data written by Export.
104   importtsv: Import data in TSV format.
105   rowcounter: Count rows in HBase table.
106   verifyrep: Compare the data from tables in two different clusters. WARNING: It doesn't work for incrementColumnValues'd cells since the timestamp is changed after being appended to the log.
108 ----
110 You can use the above listed shortnames for mapreduce jobs as in the below re-run of the row counter job (again, presuming your install is hbase `2.0.0-SNAPSHOT`):
112 [source,bash]
113 ----
114 $ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` \
115   ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-mapreduce-2.0.0-SNAPSHOT.jar \
116   rowcounter usertable
117 ----
119 You might find the more selective `hbase mapredcp` tool output of interest; it lists the minimum set of jars needed
120 to run a basic mapreduce job against an hbase install. It does not include configuration. You'll probably need to add
121 these if you want your MapReduce job to find the target cluster. You'll probably have to also add pointers to extra jars
122 once you start to do anything of substance. Just specify the extras by passing the system propery `-Dtmpjars` when
123 you run `hbase mapredcp`. 
125 For jobs that do not package their dependencies or call `TableMapReduceUtil#addDependencyJars`, the following command structure is necessary:
127 [source,bash]
128 ----
129 $ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf hadoop jar MyApp.jar MyJobMainClass -libjars $(${HBASE_HOME}/bin/hbase mapredcp | tr ':' ',') ...
130 ----
132 [NOTE]
133 ====
134 The example may not work if you are running HBase from its build directory rather than an installed location.
135 You may see an error like the following:
137 ----
138 java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.mapreduce.RowCounter$RowCounterMapper
139 ----
141 If this occurs, try modifying the command as follows, so that it uses the HBase JARs from the _target/_ directory within the build environment.
143 [source,bash]
144 ----
145 $ HADOOP_CLASSPATH=${HBASE_BUILD_HOME}/hbase-mapreduce/target/hbase-mapreduce-VERSION-SNAPSHOT.jar:`${HBASE_BUILD_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_BUILD_HOME}/hbase-mapreduce/target/hbase-mapreduce-VERSION-SNAPSHOT.jar rowcounter usertable
146 ----
147 ====
149 .Notice to MapReduce users of HBase between 0.96.1 and 0.98.4
150 [CAUTION]
151 ====
152 Some MapReduce jobs that use HBase fail to launch.
153 The symptom is an exception similar to the following:
155 ----
156 Exception in thread "main" java.lang.IllegalAccessError: class
157     com.google.protobuf.ZeroCopyLiteralByteString cannot access its superclass
158     com.google.protobuf.LiteralByteString
159     at java.lang.ClassLoader.defineClass1(Native Method)
160     at java.lang.ClassLoader.defineClass(ClassLoader.java:792)
161     at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
162     at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
163     at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
164     at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
165     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
166     at java.security.AccessController.doPrivileged(Native Method)
167     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
168     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
169     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
170     at
171     org.apache.hadoop.hbase.protobuf.ProtobufUtil.toScan(ProtobufUtil.java:818)
172     at
173     org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.convertScanToString(TableMapReduceUtil.java:433)
174     at
175     org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:186)
176     at
177     org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:147)
178     at
179     org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:270)
180     at
181     org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:100)
183 ----
185 This is caused by an optimization introduced in link:https://issues.apache.org/jira/browse/HBASE-9867[HBASE-9867] that inadvertently introduced a classloader dependency.
187 This affects both jobs using the `-libjars` option and "fat jar," those which package their runtime dependencies in a nested `lib` folder.
189 In order to satisfy the new classloader requirements, `hbase-protocol.jar` must be included in Hadoop's classpath.
190 See <<hbase.mapreduce.classpath>> for current recommendations for resolving classpath errors.
191 The following is included for historical purposes.
193 This can be resolved system-wide by including a reference to the `hbase-protocol.jar` in Hadoop's lib directory, via a symlink or by copying the jar into the new location.
195 This can also be achieved on a per-job launch basis by including it in the `HADOOP_CLASSPATH` environment variable at job submission time.
196 When launching jobs that package their dependencies, all three of the following job launching commands satisfy this requirement:
198 [source,bash]
199 ----
200 $ HADOOP_CLASSPATH=/path/to/hbase-protocol.jar:/path/to/hbase/conf hadoop jar MyJob.jar MyJobMainClass
201 $ HADOOP_CLASSPATH=$(hbase mapredcp):/path/to/hbase/conf hadoop jar MyJob.jar MyJobMainClass
202 $ HADOOP_CLASSPATH=$(hbase classpath) hadoop jar MyJob.jar MyJobMainClass
203 ----
205 For jars that do not package their dependencies, the following command structure is necessary:
207 [source,bash]
208 ----
209 $ HADOOP_CLASSPATH=$(hbase mapredcp):/etc/hbase/conf hadoop jar MyApp.jar MyJobMainClass -libjars $(hbase mapredcp | tr ':' ',') ...
210 ----
212 See also link:https://issues.apache.org/jira/browse/HBASE-10304[HBASE-10304] for further discussion of this issue.
213 ====
215 == MapReduce Scan Caching
217 TableMapReduceUtil now restores the option to set scanner caching (the number of rows which are cached before returning the result to the client) on the Scan object that is passed in.
218 This functionality was lost due to a bug in HBase 0.95 (link:https://issues.apache.org/jira/browse/HBASE-11558[HBASE-11558]), which is fixed for HBase 0.98.5 and 0.96.3.
219 The priority order for choosing the scanner caching is as follows:
221 . Caching settings which are set on the scan object.
222 . Caching settings which are specified via the configuration option `hbase.client.scanner.caching`, which can either be set manually in _hbase-site.xml_ or via the helper method `TableMapReduceUtil.setScannerCaching()`.
223 . The default value `HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING`, which is set to `100`.
225 Optimizing the caching settings is a balance between the time the client waits for a result and the number of sets of results the client needs to receive.
226 If the caching setting is too large, the client could end up waiting for a long time or the request could even time out.
227 If the setting is too small, the scan needs to return results in several pieces.
228 If you think of the scan as a shovel, a bigger cache setting is analogous to a bigger shovel, and a smaller cache setting is equivalent to more shoveling in order to fill the bucket.
230 The list of priorities mentioned above allows you to set a reasonable default, and override it for specific operations.
232 See the API documentation for link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Scan.html[Scan] for more details.
234 == Bundled HBase MapReduce Jobs
236 The HBase JAR also serves as a Driver for some bundled MapReduce jobs.
237 To learn about the bundled MapReduce jobs, run the following command.
239 [source,bash]
240 ----
241 $ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-mapreduce-VERSION.jar
242 An example program must be given as the first argument.
243 Valid program names are:
244   copytable: Export a table from local cluster to peer cluster
245   completebulkload: Complete a bulk data load.
246   export: Write table data to HDFS.
247   import: Import data written by Export.
248   importtsv: Import data in TSV format.
249   rowcounter: Count rows in HBase table
250 ----
252 Each of the valid program names are bundled MapReduce jobs.
253 To run one of the jobs, model your command after the following example.
255 [source,bash]
256 ----
257 $ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-mapreduce-VERSION.jar rowcounter myTable
258 ----
260 == HBase as a MapReduce Job Data Source and Data Sink
262 HBase can be used as a data source, link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html[TableInputFormat], and data sink, link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.html[TableOutputFormat] or link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.html[MultiTableOutputFormat], for MapReduce jobs.
263 Writing MapReduce jobs that read or write HBase, it is advisable to subclass link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableMapper.html[TableMapper]        and/or link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableReducer.html[TableReducer].
264 See the do-nothing pass-through classes link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/IdentityTableMapper.html[IdentityTableMapper] and link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/IdentityTableReducer.html[IdentityTableReducer] for basic usage.
265 For a more involved example, see link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/RowCounter.html[RowCounter] or review the `org.apache.hadoop.hbase.mapreduce.TestTableMapReduce` unit test.
267 If you run MapReduce jobs that use HBase as source or sink, need to specify source and sink table and column names in your configuration.
269 When you read from HBase, the `TableInputFormat` requests the list of regions from HBase and makes a map, which is either a `map-per-region` or `mapreduce.job.maps` map, whichever is smaller.
270 If your job only has two maps, raise `mapreduce.job.maps` to a number greater than the number of regions.
271 Maps will run on the adjacent TaskTracker/NodeManager if you are running a TaskTracer/NodeManager and RegionServer per node.
272 When writing to HBase, it may make sense to avoid the Reduce step and write back into HBase from within your map.
273 This approach works when your job does not need the sort and collation that MapReduce does on the map-emitted data.
274 On insert, HBase 'sorts' so there is no point double-sorting (and shuffling data around your MapReduce cluster) unless you need to.
275 If you do not need the Reduce, your map might emit counts of records processed for reporting at the end of the job, or set the number of Reduces to zero and use TableOutputFormat.
276 If running the Reduce step makes sense in your case, you should typically use multiple reducers so that load is spread across the HBase cluster.
278 A new HBase partitioner, the link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.html[HRegionPartitioner], can run as many reducers the number of existing regions.
279 The HRegionPartitioner is suitable when your table is large and your upload will not greatly alter the number of existing regions upon completion.
280 Otherwise use the default partitioner.
282 == Writing HFiles Directly During Bulk Import
284 If you are importing into a new table, you can bypass the HBase API and write your content directly to the filesystem, formatted into HBase data files (HFiles). Your import will run faster, perhaps an order of magnitude faster.
285 For more on how this mechanism works, see <<arch.bulk.load>>.
287 == RowCounter Example
289 The included link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/RowCounter.html[RowCounter] MapReduce job uses `TableInputFormat` and does a count of all rows in the specified table.
290 To run it, use the following command:
292 [source,bash]
293 ----
294 $ ./bin/hadoop jar hbase-X.X.X.jar
295 ----
297 This will invoke the HBase MapReduce Driver class.
298 Select `rowcounter` from the choice of jobs offered.
299 This will print rowcounter usage advice to standard output.
300 Specify the tablename, column to count, and output directory.
301 If you have classpath errors, see <<hbase.mapreduce.classpath>>.
303 [[splitter]]
304 == Map-Task Splitting
306 [[splitter.default]]
307 === The Default HBase MapReduce Splitter
309 When link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html[TableInputFormat] is used to source an HBase table in a MapReduce job, its splitter will make a map task for each region of the table.
310 Thus, if there are 100 regions in the table, there will be 100 map-tasks for the job - regardless of how many column families are selected in the Scan.
312 [[splitter.custom]]
313 === Custom Splitters
315 For those interested in implementing custom splitters, see the method `getSplits` in link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.html[TableInputFormatBase].
316 That is where the logic for map-task assignment resides.
318 [[mapreduce.example]]
319 == HBase MapReduce Examples
321 [[mapreduce.example.read]]
322 === HBase MapReduce Read Example
324 The following is an example of using HBase as a MapReduce source in read-only manner.
325 Specifically, there is a Mapper instance but no Reducer, and nothing is being emitted from the Mapper.
326 The job would be defined as follows...
328 [source,java]
329 ----
330 Configuration config = HBaseConfiguration.create();
331 Job job = new Job(config, "ExampleRead");
332 job.setJarByClass(MyReadJob.class);     // class that contains mapper
334 Scan scan = new Scan();
335 scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
336 scan.setCacheBlocks(false);  // don't set to true for MR jobs
337 // set other scan attrs
340 TableMapReduceUtil.initTableMapperJob(
341   tableName,        // input HBase table name
342   scan,             // Scan instance to control CF and attribute selection
343   MyMapper.class,   // mapper
344   null,             // mapper output key
345   null,             // mapper output value
346   job);
347 job.setOutputFormatClass(NullOutputFormat.class);   // because we aren't emitting anything from mapper
349 boolean b = job.waitForCompletion(true);
350 if (!b) {
351   throw new IOException("error with job!");
353 ----
355 ...and the mapper instance would extend link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableMapper.html[TableMapper]...
357 [source,java]
358 ----
359 public static class MyMapper extends TableMapper<Text, Text> {
361   public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException {
362     // process data for the row from the Result instance.
363    }
365 ----
367 [[mapreduce.example.readwrite]]
368 === HBase MapReduce Read/Write Example
370 The following is an example of using HBase both as a source and as a sink with MapReduce.
371 This example will simply copy data from one table to another.
373 [source,java]
374 ----
375 Configuration config = HBaseConfiguration.create();
376 Job job = new Job(config,"ExampleReadWrite");
377 job.setJarByClass(MyReadWriteJob.class);    // class that contains mapper
379 Scan scan = new Scan();
380 scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
381 scan.setCacheBlocks(false);  // don't set to true for MR jobs
382 // set other scan attrs
384 TableMapReduceUtil.initTableMapperJob(
385   sourceTable,      // input table
386   scan,             // Scan instance to control CF and attribute selection
387   MyMapper.class,   // mapper class
388   null,             // mapper output key
389   null,             // mapper output value
390   job);
391 TableMapReduceUtil.initTableReducerJob(
392   targetTable,      // output table
393   null,             // reducer class
394   job);
395 job.setNumReduceTasks(0);
397 boolean b = job.waitForCompletion(true);
398 if (!b) {
399     throw new IOException("error with job!");
401 ----
403 An explanation is required of what `TableMapReduceUtil` is doing, especially with the reducer. link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.html[TableOutputFormat] is being used as the outputFormat class, and several parameters are being set on the config (e.g., `TableOutputFormat.OUTPUT_TABLE`), as well as setting the reducer output key to `ImmutableBytesWritable` and reducer value to `Writable`.
404 These could be set by the programmer on the job and conf, but `TableMapReduceUtil` tries to make things easier.
406 The following is the example mapper, which will create a `Put` and matching the input `Result` and emit it.
407 Note: this is what the CopyTable utility does.
409 [source,java]
410 ----
411 public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put>  {
413   public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
414     // this example is just copying the data from the source table...
415       context.write(row, resultToPut(row,value));
416     }
418     private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
419       Put put = new Put(key.get());
420       for (KeyValue kv : result.raw()) {
421         put.add(kv);
422       }
423       return put;
424     }
426 ----
428 There isn't actually a reducer step, so `TableOutputFormat` takes care of sending the `Put` to the target table.
430 This is just an example, developers could choose not to use `TableOutputFormat` and connect to the target table themselves.
432 [[mapreduce.example.readwrite.multi]]
433 === HBase MapReduce Read/Write Example With Multi-Table Output
435 TODO: example for `MultiTableOutputFormat`.
437 [[mapreduce.example.summary]]
438 === HBase MapReduce Summary to HBase Example
440 The following example uses HBase as a MapReduce source and sink with a summarization step.
441 This example will count the number of distinct instances of a value in a table and write those summarized counts in another table.
443 [source,java]
444 ----
445 Configuration config = HBaseConfiguration.create();
446 Job job = new Job(config,"ExampleSummary");
447 job.setJarByClass(MySummaryJob.class);     // class that contains mapper and reducer
449 Scan scan = new Scan();
450 scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
451 scan.setCacheBlocks(false);  // don't set to true for MR jobs
452 // set other scan attrs
454 TableMapReduceUtil.initTableMapperJob(
455   sourceTable,        // input table
456   scan,               // Scan instance to control CF and attribute selection
457   MyMapper.class,     // mapper class
458   Text.class,         // mapper output key
459   IntWritable.class,  // mapper output value
460   job);
461 TableMapReduceUtil.initTableReducerJob(
462   targetTable,        // output table
463   MyTableReducer.class,    // reducer class
464   job);
465 job.setNumReduceTasks(1);   // at least one, adjust as required
467 boolean b = job.waitForCompletion(true);
468 if (!b) {
469   throw new IOException("error with job!");
471 ----
473 In this example mapper a column with a String-value is chosen as the value to summarize upon.
474 This value is used as the key to emit from the mapper, and an `IntWritable` represents an instance counter.
476 [source,java]
477 ----
478 public static class MyMapper extends TableMapper<Text, IntWritable>  {
479   public static final byte[] CF = "cf".getBytes();
480   public static final byte[] ATTR1 = "attr1".getBytes();
482   private final IntWritable ONE = new IntWritable(1);
483   private Text text = new Text();
485   public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
486     String val = new String(value.getValue(CF, ATTR1));
487     text.set(val);     // we can only emit Writables...
488     context.write(text, ONE);
489   }
491 ----
493 In the reducer, the "ones" are counted (just like any other MR example that does this), and then emits a `Put`.
495 [source,java]
496 ----
497 public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable>  {
498   public static final byte[] CF = "cf".getBytes();
499   public static final byte[] COUNT = "count".getBytes();
501   public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
502     int i = 0;
503     for (IntWritable val : values) {
504       i += val.get();
505     }
506     Put put = new Put(Bytes.toBytes(key.toString()));
507     put.add(CF, COUNT, Bytes.toBytes(i));
509     context.write(null, put);
510   }
512 ----
514 [[mapreduce.example.summary.file]]
515 === HBase MapReduce Summary to File Example
517 This very similar to the summary example above, with exception that this is using HBase as a MapReduce source but HDFS as the sink.
518 The differences are in the job setup and in the reducer.
519 The mapper remains the same.
521 [source,java]
522 ----
523 Configuration config = HBaseConfiguration.create();
524 Job job = new Job(config,"ExampleSummaryToFile");
525 job.setJarByClass(MySummaryFileJob.class);     // class that contains mapper and reducer
527 Scan scan = new Scan();
528 scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
529 scan.setCacheBlocks(false);  // don't set to true for MR jobs
530 // set other scan attrs
532 TableMapReduceUtil.initTableMapperJob(
533   sourceTable,        // input table
534   scan,               // Scan instance to control CF and attribute selection
535   MyMapper.class,     // mapper class
536   Text.class,         // mapper output key
537   IntWritable.class,  // mapper output value
538   job);
539 job.setReducerClass(MyReducer.class);    // reducer class
540 job.setNumReduceTasks(1);    // at least one, adjust as required
541 FileOutputFormat.setOutputPath(job, new Path("/tmp/mr/mySummaryFile"));  // adjust directories as required
543 boolean b = job.waitForCompletion(true);
544 if (!b) {
545   throw new IOException("error with job!");
547 ----
549 As stated above, the previous Mapper can run unchanged with this example.
550 As for the Reducer, it is a "generic" Reducer instead of extending TableMapper and emitting Puts.
552 [source,java]
553 ----
554 public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>  {
556   public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
557     int i = 0;
558     for (IntWritable val : values) {
559       i += val.get();
560     }
561     context.write(key, new IntWritable(i));
562   }
564 ----
566 [[mapreduce.example.summary.noreducer]]
567 === HBase MapReduce Summary to HBase Without Reducer
569 It is also possible to perform summaries without a reducer - if you use HBase as the reducer.
571 An HBase target table would need to exist for the job summary.
572 The Table method `incrementColumnValue` would be used to atomically increment values.
573 From a performance perspective, it might make sense to keep a Map of values with their values to be incremented for each map-task, and make one update per key at during the `cleanup` method of the mapper.
574 However, your mileage may vary depending on the number of rows to be processed and unique keys.
576 In the end, the summary results are in HBase.
578 [[mapreduce.example.summary.rdbms]]
579 === HBase MapReduce Summary to RDBMS
581 Sometimes it is more appropriate to generate summaries to an RDBMS.
582 For these cases, it is possible to generate summaries directly to an RDBMS via a custom reducer.
583 The `setup` method can connect to an RDBMS (the connection information can be passed via custom parameters in the context) and the cleanup method can close the connection.
585 It is critical to understand that number of reducers for the job affects the summarization implementation, and you'll have to design this into your reducer.
586 Specifically, whether it is designed to run as a singleton (one reducer) or multiple reducers.
587 Neither is right or wrong, it depends on your use-case.
588 Recognize that the more reducers that are assigned to the job, the more simultaneous connections to the RDBMS will be created - this will scale, but only to a point.
590 [source,java]
591 ----
592 public static class MyRdbmsReducer extends Reducer<Text, IntWritable, Text, IntWritable>  {
594   private Connection c = null;
596   public void setup(Context context) {
597     // create DB connection...
598   }
600   public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
601     // do summarization
602     // in this example the keys are Text, but this is just an example
603   }
605   public void cleanup(Context context) {
606     // close db connection
607   }
610 ----
612 In the end, the summary results are written to your RDBMS table/s.
614 [[mapreduce.htable.access]]
615 == Accessing Other HBase Tables in a MapReduce Job
617 Although the framework currently allows one HBase table as input to a MapReduce job, other HBase tables can be accessed as lookup tables, etc., in a MapReduce job via creating an Table instance in the setup method of the Mapper.
618 [source,java]
619 ----
620 public class MyMapper extends TableMapper<Text, LongWritable> {
621   private Table myOtherTable;
623   public void setup(Context context) {
624     // In here create a Connection to the cluster and save it or use the Connection
625     // from the existing table
626     myOtherTable = connection.getTable("myOtherTable");
627   }
629   public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
630     // process Result...
631     // use 'myOtherTable' for lookups
632   }
633 ----
635 [[mapreduce.specex]]
636 == Speculative Execution
638 It is generally advisable to turn off speculative execution for MapReduce jobs that use HBase as a source.
639 This can either be done on a per-Job basis through properties, or on the entire cluster.
640 Especially for longer running jobs, speculative execution will create duplicate map-tasks which will double-write your data to HBase; this is probably not what you want.
642 See <<spec.ex,spec.ex>> for more information.
644 [[cascading]]
645 == Cascading
647 link:http://www.cascading.org/[Cascading] is an alternative API for MapReduce, which
648 actually uses MapReduce, but allows you to write your MapReduce code in a simplified
649 way.
651 The following example shows a Cascading `Flow` which "sinks" data into an HBase cluster. The same
652 `hBaseTap` API could be used to "source" data as well.
654 [source, java]
655 ----
656 // read data from the default filesystem
657 // emits two fields: "offset" and "line"
658 Tap source = new Hfs( new TextLine(), inputFileLhs );
660 // store data in an HBase cluster
661 // accepts fields "num", "lower", and "upper"
662 // will automatically scope incoming fields to their proper familyname, "left" or "right"
663 Fields keyFields = new Fields( "num" );
664 String[] familyNames = {"left", "right"};
665 Fields[] valueFields = new Fields[] {new Fields( "lower" ), new Fields( "upper" ) };
666 Tap hBaseTap = new HBaseTap( "multitable", new HBaseScheme( keyFields, familyNames, valueFields ), SinkMode.REPLACE );
668 // a simple pipe assembly to parse the input into fields
669 // a real app would likely chain multiple Pipes together for more complex processing
670 Pipe parsePipe = new Each( "insert", new Fields( "line" ), new RegexSplitter( new Fields( "num", "lower", "upper" ), " " ) );
672 // "plan" a cluster executable Flow
673 // this connects the source Tap and hBaseTap (the sink Tap) to the parsePipe
674 Flow parseFlow = new FlowConnector( properties ).connect( source, hBaseTap, parsePipe );
676 // start the flow, and block until complete
677 parseFlow.complete();
679 // open an iterator on the HBase table we stuffed data into
680 TupleEntryIterator iterator = parseFlow.openSink();
682 while(iterator.hasNext())
683   {
684   // print out each tuple from HBase
685   System.out.println( "iterator.next() = " + iterator.next() );
686   }
688 iterator.close();
689 ----