/*
 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
 * See accompanying LICENSE file.
 */
5 package com
.yahoo
.pig
.impl
.mapreduceExec
;
8 import java
.io
.IOException
;
9 import java
.util
.Iterator
;
11 import org
.apache
.hadoop
.io
.WritableComparable
;
12 import org
.apache
.hadoop
.mapred
.JobConf
;
13 import org
.apache
.hadoop
.mapred
.OutputCollector
;
14 import org
.apache
.hadoop
.mapred
.Reducer
;
15 import org
.apache
.hadoop
.mapred
.Reporter
;
17 import com
.yahoo
.pig
.data
.BagFactory
;
18 import com
.yahoo
.pig
.data
.BigDataBag
;
19 import com
.yahoo
.pig
.data
.DataCollector
;
20 import com
.yahoo
.pig
.data
.Datum
;
21 import com
.yahoo
.pig
.data
.IndexedTuple
;
22 import com
.yahoo
.pig
.data
.Tuple
;
23 import com
.yahoo
.pig
.impl
.PigContext
;
24 import com
.yahoo
.pig
.impl
.eval
.EvalSpecPipe
;
25 import com
.yahoo
.pig
.impl
.eval
.groupby
.GroupBySpec
;
27 public class PigCombine
implements Reducer
{
30 private CombineDataOutputCollector finalout
;
31 private DataCollector evalPipe
;
32 private OutputCollector oc
;
34 private int inputCount
;
35 private BigDataBag bags
[];
38 public void reduce(WritableComparable key
, Iterator values
, OutputCollector output
, Reporter reporter
)
42 tmpdir
= new File(job
.get("mapred.task.id"));
45 BagFactory
.init(tmpdir
);
46 PigContext pigContext
= PigMapReduce
.getPigContext();
47 if (evalPipe
== null) {
48 inputCount
= job
.get("pig.inputs").split(";", -1).length
;
50 finalout
= new CombineDataOutputCollector(oc
);
51 String evalSpec
= job
.get("pig.combineFunc", "");
52 EvalSpecPipe esp
= new EvalSpecPipe(pigContext
, evalSpec
);
53 evalPipe
= esp
.collector(finalout
);
54 //throw new RuntimeException("combine spec: " + evalSpec + " combine pipe: " + esp.toString());
56 inputCount
= job
.get("pig.inputs").split(";", -1).length
;
58 bags
= new BigDataBag
[inputCount
];
59 for (int i
= 0; i
< inputCount
; i
++) {
60 bags
[i
] = BagFactory
.getInstance().getNewBigBag();
64 PigSplit split
= PigInputFormat
.PigRecordReader
.getPigRecordReader().getPigFileSplit();
65 index
= split
.getIndex();
67 Datum groupName
= ((Tuple
) key
).getField(0);
68 finalout
.group
= ((Tuple
) key
);
69 finalout
.index
= index
;
71 Tuple t
= new Tuple(1 + inputCount
);
72 t
.setField(0, groupName
);
73 for (int i
= 1; i
< 1 + inputCount
; i
++) {
75 t
.setField(i
, bags
[i
- 1]);
78 while (values
.hasNext()) {
79 IndexedTuple it
= (IndexedTuple
) values
.next();
80 t
.getBagField(it
.index
+ 1).add(it
);
82 for (int i
= 0; i
< inputCount
; i
++) { // XXX: shouldn't we only do this if INNER flag is set?
83 if (t
.getBagField(1 + i
).isEmpty())
86 // throw new RuntimeException("combine input: " + t.toString());
88 evalPipe
.add(null); // EOF marker
89 } catch (Throwable tr
) {
91 RuntimeException exp
= new RuntimeException(tr
.getMessage());
92 exp
.setStackTrace(tr
.getStackTrace());
98 * Just save off the JobConf for later use.
100 public void configure(JobConf job
) {
105 * Nothing happens here.
107 public void close() throws IOException
{
110 private static class CombineDataOutputCollector
extends DataCollector
{
111 OutputCollector oc
= null;
115 public CombineDataOutputCollector(OutputCollector oc
) {
119 public void add(Tuple t
) throws IOException
{
120 if (t
== null) return; // EOF marker from eval pipeline; ignore
121 // throw new RuntimeException("combine collector input: " + t.toString());
122 oc
.collect(group
, new IndexedTuple(t
.getTupleField(0),index
));