/*
 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
 * See accompanying LICENSE file.
 */
package com.yahoo.pig.impl.mapreduceExec;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.util.Iterator;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapRunnable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

import com.yahoo.pig.data.BagFactory;
import com.yahoo.pig.data.BigDataBag;
import com.yahoo.pig.data.DataCollector;
import com.yahoo.pig.data.Datum;
import com.yahoo.pig.data.IndexedTuple;
import com.yahoo.pig.data.Tuple;
import com.yahoo.pig.impl.PigContext;
import com.yahoo.pig.impl.eval.*;
import com.yahoo.pig.impl.eval.groupby.GroupBySpec;
import com.yahoo.pig.impl.mapreduceExec.PigOutputFormat.PigRecordWriter;
/**
 * This class is a wrapper of sorts for Pig Map/Reduce jobs. Both the Mapper and the Reducer are
 * implemented by this class. The methods of this class are driven by job configuration variables:
 * <dl>
 * <dt>pig.inputs</dt>
 * <dd>A semi-colon separated list of inputs. If an input uses a special parser, it will be
 * specified by adding a colon and the name of the parser to the input. For example:
 * /tmp/names.txt;/tmp/logs.dat:com.yahoo.research.pig.parser.LogParser will parse /tmp/names.txt
 * using the default parser and /tmp/logs.dat using com.yahoo.research.pig.parser.LogParser.</dd>
 * <dt>pig.mapFuncs</dt>
 * <dd>A semi-colon separated list of functions-specifications to be applied to the inputs in the
 * Map phase. This list must have the same number of items as pig.inputs because each
 * functions-specification is matched to the corresponding input.</dd>
 * <dt>pig.groupFuncs</dt>
 * <dd>A semi-colon separated list of group functions. As with pig.mapFuncs, this list must have
 * the same number of items as pig.inputs because each group function is matched to the
 * corresponding input.</dd>
 * <dt>pig.reduceFuncs</dt>
 * <dd>The functions-specification to be applied to the tuples passed into the Reduce phase.</dd>
 * </dl>
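 *
 * <p>As an illustration only (not taken from this file), a driver could wire such a job up
 * roughly as below; the input paths and the spec strings held in the <code>...Spec</code>
 * variables are hypothetical placeholders:
 * <pre>
 *   JobConf job = new JobConf(PigMapReduce.class);
 *   job.set("pig.inputs", "/tmp/names.txt;/tmp/logs.dat:com.yahoo.research.pig.parser.LogParser");
 *   // one entry per input, matched by position
 *   job.set("pig.mapFuncs", namesMapSpec + ";" + logsMapSpec);
 *   job.set("pig.groupFuncs", namesGroupSpec + ";" + logsGroupSpec);
 *   job.set("pig.reduceFuncs", reduceSpec);
 *   job.setMapRunnerClass(PigMapReduce.class);
 *   job.setReducerClass(PigMapReduce.class);
 * </pre>
 */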
public class PigMapReduce implements MapRunnable, Reducer {

    public static Reporter reporter = null;

    private JobConf job;            // saved by configure() and read throughout
    private DataCollector evalPipe;
    private OutputCollector oc;
    private GroupBySpec group;
    private PigRecordWriter pigWriter;
    private int index;              // which input this map task is processing
    private int inputCount;
    private boolean isInner[];
    private BigDataBag bags[];
    private File tmpdir;            // per-task scratch directory handed to BagFactory
    /**
     * This function is called in MapTask by Hadoop as the Mapper.run() method. We basically pull
     * the tuples from our PigRecordReader (see ugly ThreadLocal hack), pipe the tuples through the
     * function pipeline and then close the writer.
     */
    public void run(RecordReader input, OutputCollector output, Reporter reporter) throws IOException {
        PigMapReduce.reporter = reporter;

        tmpdir = new File(job.get("mapred.task.id"));

        BagFactory.init(tmpdir);

        setupMapPipe(reporter);

        // allocate key & value instances that are re-used for all entries
        WritableComparable key = input.createKey();
        Writable value = input.createValue();
        while (input.next(key, value)) {
            evalPipe.add((Tuple) value);
        }
        evalPipe.add(null); // EOF marker

        if (pigWriter != null) {
            pigWriter.close(reporter);
        }
    }
    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
            throws IOException {
        try {
            PigMapReduce.reporter = reporter;

            tmpdir = new File(job.get("mapred.task.id"));

            BagFactory.init(tmpdir);

            oc = output;
            if (evalPipe == null) {
                setupReducePipe();
            }

            // Build the tuple handed to the reduce pipeline: field 0 is the group label,
            // fields 1..inputCount hold one bag of tuples per input.
            Datum groupName = ((Tuple) key).getField(0);
            Tuple t = new Tuple(1 + inputCount);
            t.setField(0, groupName);
            for (int i = 1; i < 1 + inputCount; i++) {
                bags[i - 1].clear();
                t.setField(i, bags[i - 1]);
            }

            while (values.hasNext()) {
                IndexedTuple it = (IndexedTuple) values.next();
                t.getBagField(it.index + 1).add(it);
            }
            for (int i = 0; i < inputCount; i++) {
                if (isInner[i] && t.getBagField(1 + i).isEmpty())
                    return; // inner grouping with no tuples from this input: drop the group
            }

            evalPipe.add(t);
            evalPipe.add(null); // EOF marker
        } catch (Throwable tr) {
            tr.printStackTrace();
            RuntimeException exp = new RuntimeException(tr.getMessage());
            exp.setStackTrace(tr.getStackTrace());
            throw exp;
        }
    }
    /**
     * Just save off the JobConf for later use.
     */
    public void configure(JobConf job) {
        this.job = job;
    }

    /**
     * Nothing happens here.
     */
    public void close() throws IOException {
    }
    static PigContext getPigContext() {
        try {
            // The PigContext is shipped with the job as the classpath resource /pigContext;
            // deserialize it so the eval pipelines can be reconstructed on this node.
            InputStream is = PigMapReduce.class.getResourceAsStream("/pigContext");
            if (is == null) throw new RuntimeException("/pigContext not found!");
            return (PigContext) new ObjectInputStream(is).readObject();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    private void setupMapPipe(Reporter reporter) throws IOException {
        PigSplit split = PigInputFormat.PigRecordReader.getPigRecordReader().getPigFileSplit();

        index = split.getIndex();
        String evalSpec = split.getApplyFunction();
        String groupSpec = split.getGroupFunction();
        PigContext pigContext = getPigContext();
        if (groupSpec.length() == 0) {
            // No group function for this input: map output is written directly to the output
            // file through a record writer instead of being collected for a reduce.
            String taskidParts[] = job.get("mapred.task.id").split("_"); // The format is tip_job_m_partition_try
            String taskid = "map-" + taskidParts[taskidParts.length - 2];
            pigWriter = (PigRecordWriter) job.getOutputFormat().getRecordWriter(FileSystem.get(job), job, taskid,
                    reporter);
            oc = new OutputCollector() {
                public void collect(WritableComparable key, Writable value) throws IOException {
                    pigWriter.write(key, value);
                }
            };
        } else {
            group = new GroupBySpec(pigContext, groupSpec);
        }
        evalPipe = (new EvalSpecPipe(pigContext, evalSpec)).collector(new MapDataOutputCollector());
    }
    private void setupReducePipe() throws IOException {
        String evalSpec = job.get("pig.reduceFunc", "");
        String[] groupSpecs = job.get("pig.groupFuncs", "").split(";");
        PigContext pigContext = getPigContext();
        isInner = new boolean[groupSpecs.length];
        for (int i = 0; i < groupSpecs.length; i++) {
            isInner[i] = (new GroupBySpec(pigContext, groupSpecs[i])).isInner;
        }

        evalPipe = (new EvalSpecPipe(pigContext, evalSpec)).collector(new ReduceDataOutputCollector());

        inputCount = job.get("pig.inputs").split(";", -1).length;

        bags = new BigDataBag[inputCount];
        for (int i = 0; i < inputCount; i++) {
            bags[i] = BagFactory.getInstance().getNewBigBag();
        }
    }
    class MapDataOutputCollector extends DataCollector {
        public void add(Tuple t) throws IOException {
            if (t == null) return; // EOF marker from eval pipeline; ignore

            if (group == null) {
                // No grouping for this input: pass the tuple straight to the record-writer-backed
                // collector set up in setupMapPipe().
                oc.collect(null, t);
            } else {
                Datum[] multGroups = group.eval(t);
                for (int i = 0; i < multGroups.length; i++) {
                    // wrap group label in a tuple, so it becomes writable.
                    oc.collect(new Tuple(multGroups[i]), new IndexedTuple(t, index));
                }
            }
        }
    }
    class ReduceDataOutputCollector extends DataCollector {
        public void add(Tuple t) throws IOException {
            if (t == null) return; // EOF marker from eval pipeline; ignore