/*
 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
 * See accompanying LICENSE file.
 */
package com.yahoo.pig.impl.mapreduceExec;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.util.Iterator;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapRunnable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

import com.yahoo.pig.data.BagFactory;
import com.yahoo.pig.data.BigDataBag;
import com.yahoo.pig.data.DataCollector;
import com.yahoo.pig.data.Datum;
import com.yahoo.pig.data.IndexedTuple;
import com.yahoo.pig.data.Tuple;
import com.yahoo.pig.impl.PigContext;
import com.yahoo.pig.impl.eval.*;
import com.yahoo.pig.impl.eval.groupby.GroupBySpec;
import com.yahoo.pig.impl.mapreduceExec.PigOutputFormat.PigRecordWriter;
/**
 * This class is a wrapper of sorts for Pig Map/Reduce jobs. Both the Mapper and the Reducer are
 * implemented by this class. The methods of this class are driven by job configuration variables:
 * <dl>
 * <dt>pig.inputs</dt>
 * <dd>A semi-colon separated list of inputs. If an input uses a special parser, it is specified
 * by appending a colon and the name of the parser to the input. For example:
 * /tmp/names.txt;/tmp/logs.dat:com.yahoo.research.pig.parser.LogParser will parse /tmp/names.txt
 * using the default parser and /tmp/logs.dat using com.yahoo.research.pig.parser.LogParser.</dd>
 * <dt>pig.mapFuncs</dt>
 * <dd>A semi-colon separated list of function specifications to be applied to the inputs in the
 * Map phase. This list must have the same number of items as pig.inputs because each function
 * specification is matched to the corresponding input.</dd>
 * <dt>pig.groupFuncs</dt>
 * <dd>A semi-colon separated list of group functions. As with pig.mapFuncs, this list must have
 * the same number of items as pig.inputs because each group function is matched to the
 * corresponding input.</dd>
 * <dt>pig.reduceFunc</dt>
 * <dd>The function specification to be applied to the tuples passed into the Reduce phase.</dd>
 * </dl>
 *
 * @author breed
 */
public class PigMapReduce implements MapRunnable, Reducer {
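
    /*
     * Illustrative sketch only (not part of the original source): one way a driver could populate
     * the properties documented above before submitting a job that uses this class. The input
     * paths and parser class are taken from the example in the class comment; the map, group and
     * reduce specification strings are placeholders, since their syntax is defined by Pig's
     * eval/group machinery rather than here.
     *
     *   JobConf conf = new JobConf(PigMapReduce.class);
     *   conf.set("pig.inputs", "/tmp/names.txt;/tmp/logs.dat:com.yahoo.research.pig.parser.LogParser");
     *   conf.set("pig.mapFuncs", "<spec for names.txt>;<spec for logs.dat>");
     *   conf.set("pig.groupFuncs", "<group spec>;<group spec>");
     *   conf.set("pig.reduceFunc", "<reduce spec>");
     */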
    public static Reporter reporter = null;

    private JobConf job;
    private DataCollector evalPipe;
    private OutputCollector oc;
    private GroupBySpec group;
    private PigRecordWriter pigWriter;
    private int index;
    private int inputCount;
    private boolean isInner[];
    private BigDataBag bags[];
    private File tmpdir;
    /**
     * This function is called in MapTask by Hadoop as the Mapper.run() method. We basically pull
     * the tuples from our PigRecordReader (see ugly ThreadLocal hack), pipe the tuples through the
     * function pipeline and then close the writer.
     */
    public void run(RecordReader input, OutputCollector output, Reporter reporter) throws IOException {
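        // The "ugly ThreadLocal hack" mentioned in the comment above: the current split is not
        // passed in here; setupMapPipe() retrieves it via
        // PigInputFormat.PigRecordReader.getPigRecordReader(), which is presumably backed by a
        // thread-local set when the record reader was created.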
        PigMapReduce.reporter = reporter;

        oc = output;
        tmpdir = new File(job.get("mapred.task.id"));
        tmpdir.mkdirs();
        BagFactory.init(tmpdir);

        setupMapPipe(reporter);

        // allocate key & value instances that are re-used for all entries
        WritableComparable key = input.createKey();
        Writable value = input.createValue();
        while (input.next(key, value)) {
            evalPipe.add((Tuple) value);
        }
        evalPipe.add(null); // EOF marker

        if (pigWriter != null) {
            pigWriter.close(reporter);
        }
    }
    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
            throws IOException {

        PigMapReduce.reporter = reporter;

        try {
            tmpdir = new File(job.get("mapred.task.id"));
            tmpdir.mkdirs();

            BagFactory.init(tmpdir);

            oc = output;
            if (evalPipe == null) {
                setupReducePipe();
            }
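
            // Assemble the tuple handed to the reduce-side eval pipeline. As the code below
            // implies, its layout is (group key, bag for input 0, ..., bag for input n-1); each
            // incoming IndexedTuple records which input it came from and is routed into the
            // corresponding bag.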
            Datum groupName = ((Tuple) key).getField(0);
            Tuple t = new Tuple(1 + inputCount);
            t.setField(0, groupName);
            for (int i = 1; i < 1 + inputCount; i++) {
                bags[i - 1].clear();
                t.setField(i, bags[i - 1]);
            }

            while (values.hasNext()) {
                IndexedTuple it = (IndexedTuple) values.next();
                t.getBagField(it.index + 1).add(it);
            }

            for (int i = 0; i < inputCount; i++) {
                if (isInner[i] && t.getBagField(1 + i).isEmpty())
                    return;
            }

            evalPipe.add(t);
            evalPipe.add(null); // EOF marker
        } catch (Throwable tr) {
            tr.printStackTrace();
            RuntimeException exp = new RuntimeException(tr.getMessage());
            exp.setStackTrace(tr.getStackTrace());
            throw exp;
        }
    }
    /**
     * Just save off the JobConf for later use.
     */
    public void configure(JobConf job) {
        this.job = job;
    }

    /**
     * Nothing happens here.
     */
    public void close() throws IOException {
    }
    static PigContext getPigContext() {
        try {
            InputStream is = PigMapReduce.class.getResourceAsStream("/pigContext");
            if (is == null) throw new RuntimeException("/pigContext not found!");
            return (PigContext) new ObjectInputStream(is).readObject();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    private void setupMapPipe(Reporter reporter) throws IOException {
        PigSplit split = PigInputFormat.PigRecordReader.getPigRecordReader().getPigFileSplit();

        index = split.getIndex();
        String evalSpec = split.getApplyFunction();
        String groupSpec = split.getGroupFunction();
        PigContext pigContext = getPigContext();
        if (groupSpec.length() == 0) {
            // The task id format is tip_job_m_partition_try
            String taskidParts[] = job.get("mapred.task.id").split("_");
            String taskid = "map-" + taskidParts[taskidParts.length - 2];
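            // For example, a task id of the assumed form "tip_0002_m_000004_0" splits into
            // {"tip", "0002", "m", "000004", "0"}, so the writer name becomes "map-000004".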
            pigWriter = (PigRecordWriter) job.getOutputFormat().getRecordWriter(FileSystem.get(job), job, taskid,
                    reporter);
            oc = new OutputCollector() {
                public void collect(WritableComparable key, Writable value) throws IOException {
                    pigWriter.write(key, value);
                }
            };
        } else {
            group = new GroupBySpec(pigContext, groupSpec);
        }

        evalPipe = (new EvalSpecPipe(pigContext, evalSpec)).collector(new MapDataOutputCollector());
    }
    private void setupReducePipe() throws IOException {
        String evalSpec = job.get("pig.reduceFunc", "");
        String[] groupSpecs = job.get("pig.groupFuncs", "").split(";");
        PigContext pigContext = getPigContext();
        isInner = new boolean[groupSpecs.length];
        for (int i = 0; i < groupSpecs.length; i++) {
            isInner[i] = (new GroupBySpec(pigContext, groupSpecs[i])).isInner;
        }

        evalPipe = (new EvalSpecPipe(pigContext, evalSpec)).collector(new ReduceDataOutputCollector());

        inputCount = job.get("pig.inputs").split(";", -1).length;

        bags = new BigDataBag[inputCount];
        for (int i = 0; i < inputCount; i++) {
            bags[i] = BagFactory.getInstance().getNewBigBag();
        }
    }
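
    /**
     * Terminal collector for the map-side eval pipeline. When no group function was supplied,
     * tuples are written straight to the output (which setupMapPipe has pointed at a
     * PigRecordWriter); otherwise one (group key, IndexedTuple) pair is emitted per group value
     * produced by the GroupBySpec, with the input index recorded so reduce() can route the tuple
     * into the matching bag.
     */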
    class MapDataOutputCollector extends DataCollector {
        public void add(Tuple t) throws IOException {
            if (t == null) return; // EOF marker from eval pipeline; ignore

            if (group == null) {
                oc.collect(null, t);
            } else {
                Datum[] multGroups = group.eval(t);
                for (int i = 0; i < multGroups.length; i++) {
                    // wrap group label in a tuple, so it becomes writable.
                    oc.collect(new Tuple(multGroups[i]), new IndexedTuple(t, index));
                }
            }
        }
    }
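
    /**
     * Terminal collector for the reduce-side eval pipeline: tuples that survive the pipeline are
     * written to the job output as-is, with a null key.
     */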
    class ReduceDataOutputCollector extends DataCollector {
        public void add(Tuple t) throws IOException {
            if (t == null) return; // EOF marker from eval pipeline; ignore

            oc.collect(null, t);
        }
    }
}