Initial import into git.
[galago.git] / java / pig-galago / src / com / yahoo / pig / impl / mapreduceExec / PigCombine.java
bloba25b658be9fe266785f16169532c61302eceaca4
1 /*
2 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
3 * See accompanying LICENSE file.
4 */
5 package com.yahoo.pig.impl.mapreduceExec;
7 import java.io.File;
8 import java.io.IOException;
9 import java.util.Iterator;
11 import org.apache.hadoop.io.WritableComparable;
12 import org.apache.hadoop.mapred.JobConf;
13 import org.apache.hadoop.mapred.OutputCollector;
14 import org.apache.hadoop.mapred.Reducer;
15 import org.apache.hadoop.mapred.Reporter;
17 import com.yahoo.pig.data.BagFactory;
18 import com.yahoo.pig.data.BigDataBag;
19 import com.yahoo.pig.data.DataCollector;
20 import com.yahoo.pig.data.Datum;
21 import com.yahoo.pig.data.IndexedTuple;
22 import com.yahoo.pig.data.Tuple;
23 import com.yahoo.pig.impl.PigContext;
24 import com.yahoo.pig.impl.eval.EvalSpecPipe;
25 import com.yahoo.pig.impl.eval.groupby.GroupBySpec;
public class PigCombine implements Reducer {

    private JobConf job;                          // job configuration, saved by configure()
    private CombineDataOutputCollector finalout;  // terminal collector that feeds the Hadoop OutputCollector
    private DataCollector evalPipe;               // head of the combine eval pipeline; built lazily on first reduce()
    private OutputCollector oc;                   // raw Hadoop output collector passed into reduce()
    private int index;                            // split index of the current input (from PigSplit)
    private int inputCount;                       // number of inputs parsed from the "pig.inputs" job property
    private BigDataBag bags[];                    // one bag per input, reused (cleared) across reduce() calls
    private File tmpdir;                          // scratch directory named after the map-reduce task id
38 public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
39 throws IOException {
41 try {
42 tmpdir = new File(job.get("mapred.task.id"));
43 tmpdir.mkdirs();
45 BagFactory.init(tmpdir);
46 PigContext pigContext = PigMapReduce.getPigContext();
47 if (evalPipe == null) {
48 inputCount = job.get("pig.inputs").split(";", -1).length;
49 oc = output;
50 finalout = new CombineDataOutputCollector(oc);
51 String evalSpec = job.get("pig.combineFunc", "");
52 EvalSpecPipe esp = new EvalSpecPipe(pigContext, evalSpec);
53 evalPipe = esp.collector(finalout);
54 //throw new RuntimeException("combine spec: " + evalSpec + " combine pipe: " + esp.toString());
56 inputCount = job.get("pig.inputs").split(";", -1).length;
58 bags = new BigDataBag[inputCount];
59 for (int i = 0; i < inputCount; i++) {
60 bags[i] = BagFactory.getInstance().getNewBigBag();
64 PigSplit split = PigInputFormat.PigRecordReader.getPigRecordReader().getPigFileSplit();
65 index = split.getIndex();
67 Datum groupName = ((Tuple) key).getField(0);
68 finalout.group = ((Tuple) key);
69 finalout.index = index;
71 Tuple t = new Tuple(1 + inputCount);
72 t.setField(0, groupName);
73 for (int i = 1; i < 1 + inputCount; i++) {
74 bags[i - 1].clear();
75 t.setField(i, bags[i - 1]);
78 while (values.hasNext()) {
79 IndexedTuple it = (IndexedTuple) values.next();
80 t.getBagField(it.index + 1).add(it);
82 for (int i = 0; i < inputCount; i++) { // XXX: shouldn't we only do this if INNER flag is set?
83 if (t.getBagField(1 + i).isEmpty())
84 return;
86 // throw new RuntimeException("combine input: " + t.toString());
87 evalPipe.add(t);
88 evalPipe.add(null); // EOF marker
89 } catch (Throwable tr) {
90 tr.printStackTrace();
91 RuntimeException exp = new RuntimeException(tr.getMessage());
92 exp.setStackTrace(tr.getStackTrace());
93 throw exp;
97 /**
98 * Just save off the JobConf for later use.
100 public void configure(JobConf job) {
101 this.job = job;
105 * Nothing happens here.
107 public void close() throws IOException {
110 private static class CombineDataOutputCollector extends DataCollector {
111 OutputCollector oc = null;
112 Tuple group = null;
113 int index = -1;
115 public CombineDataOutputCollector(OutputCollector oc) {
116 this.oc = oc;
119 public void add(Tuple t) throws IOException {
120 if (t == null) return; // EOF marker from eval pipeline; ignore
121 // throw new RuntimeException("combine collector input: " + t.toString());
122 oc.collect(group, new IndexedTuple(t.getTupleField(0),index));