4 * September 3, 2007 -- Trevor Strohman
6 * BSD License (http://www.galagosearch.org/license)
9 package com
.yahoo
.pig
.impl
.galago
;
11 import com
.yahoo
.pig
.PigServer
.ExecType
;
12 import com
.yahoo
.pig
.data
.BagFactory
;
13 import com
.yahoo
.pig
.data
.DataBag
;
14 import com
.yahoo
.pig
.data
.DataCollector
;
15 import com
.yahoo
.pig
.data
.Datum
;
16 import com
.yahoo
.pig
.impl
.PigContext
;
17 import com
.yahoo
.pig
.impl
.eval
.EvalSpecPipe
;
18 import com
.yahoo
.pig
.impl
.eval
.groupby
.GroupBySpec
;
19 import galago
.tupleflow
.ExNihiloSource
;
20 import galago
.tupleflow
.IncompatibleProcessorException
;
21 import galago
.tupleflow
.Linkage
;
22 import galago
.tupleflow
.OutputClass
;
23 import galago
.tupleflow
.Parameters
.Value
;
24 import galago
.tupleflow
.Processor
;
25 import galago
.tupleflow
.Step
;
26 import galago
.tupleflow
.TupleFlowParameters
;
27 import galago
.tupleflow
.TypeReader
;
28 import galago
.tupleflow
.execution
.Verified
;
29 import java
.io
.IOException
;
30 import java
.util
.ArrayList
;
31 import java
.util
.List
;
/**
 * Tuple source (an {@code ExNihiloSource<Tuple>}) that joins several named
 * inputs: tuples are grouped on their first field, collected into one bag per
 * input, and the assembled tuple is handed to an evaluation pipe.
 * The declared output class is {@code com.yahoo.pig.impl.galago.Tuple}.
 */
39 @OutputClass(className
= "com.yahoo.pig.impl.galago.Tuple")
40 public class JoinTuples
implements ExNihiloSource
<Tuple
> {
// Downstream tuple processor (presumably wired up through setProcessor -- TODO confirm).
43 public Processor
<Tuple
> processor
;
// Collector built in the constructor from the "eval" spec; run() adds joined tuples to it.
45 DataCollector evalPipe
;
// One Input record per configured "input" entry, in configuration order.
46 ArrayList
<Input
> inputs
= new ArrayList
<Input
>();
// NOTE(review): the three fields below match the per-input members accessed
// elsewhere as input.group / input.reader / input.isInner -- confirm which
// class actually owns them.
51 public GroupBySpec group
;
52 public TypeReader
<Tuple
> reader
;
55 public boolean isInner
;
57 public void read() throws IOException
{
/**
 * Builds the join from the step parameters.
 *
 * Reads the "eval" string (default "") and every "input" entry from the
 * parameter XML. Each "input" entry supplies a "name" and a "group" spec;
 * one Input record is created per entry and appended to the inputs field.
 *
 * @param parameters step configuration; also supplies a type reader for each input name
 * @throws IOException declared by the signature (presumably from reader or
 *         pipe construction -- TODO confirm)
 */
62 @SuppressWarnings("unchecked")
63 public JoinTuples(TupleFlowParameters parameters
) throws IOException
{
// A fresh Pig context in GALAGO execution mode is shared by all specs built below.
64 context
= new PigContext(ExecType
.GALAGO
);
65 // BUGBUG: probably need to freeze-dry the context
66 String evalSpec
= parameters
.getXML().get("eval", "");
// This local list shadows the inputs field, hence the explicit this.inputs further down.
68 List
<Value
> inputs
= parameters
.getXML().list("input");
69 for (int i
= 0; i
< inputs
.size(); i
++) {
70 Input input
= new Input();
71 input
.name
= inputs
.get(i
).get("name");
72 String groupSpec
= inputs
.get(i
).get("group");
73 input
.group
= new GroupBySpec(context
, groupSpec
);
// The inner/outer flag is copied from the parsed group spec.
74 input
.isInner
= input
.group
.isInner
;
76 input
.bag
= BagFactory
.getInstance().getNewBag();
// The reader is looked up by the input's configured name.
77 input
.reader
= parameters
.getTypeReader(input
.name
);
78 this.inputs
.add(input
);
// The evaluation pipe ends in a JoinCollector instance.
81 evalPipe
= (new EvalSpecPipe(context
, evalSpec
)).collector(new JoinCollector());
/**
 * DataCollector subclass; an instance is passed to
 * {@code EvalSpecPipe.collector(...)} when the enclosing join is constructed.
 */
84 public class JoinCollector
extends DataCollector
{
/**
 * Receives one Pig tuple. The parameter type is written fully qualified,
 * presumably because the unqualified {@code Tuple} in this file refers to
 * this package's own tuple class -- TODO confirm.
 *
 * @param t the Pig tuple being collected
 * @throws IOException declared by the signature
 */
86 public void add(com
.yahoo
.pig
.data
.Tuple t
) throws IOException
{
// Starts a new, empty Tuple named g.
90 Tuple g
= new Tuple();
/**
 * Walks every input and tests whether it still holds a buffered head tuple
 * ({@code top != null}).
 * NOTE(review): presumably reports "not done" while any input has a pending
 * tuple -- confirm against the return statements.
 */
96 public boolean isDone() {
97 for (Input input
: inputs
) {
98 if (input
.top
!= null) {
/**
 * Initializes BagFactory with the directory named by the
 * {@code java.io.tmpdir} system property.
 * NOTE(review): File is used here, but java.io.File is not among this file's
 * import statements (only java.io.IOException is) -- confirm that it resolves.
 */
106 private void initializeTemporaryDirectory() {
107 String temporary
= System
.getProperty( "java.io.tmpdir" );
108 BagFactory
.init( new File(temporary
) );
/**
 * Drives the join: chooses a group key from the head tuples of the inputs,
 * gathers each input's tuples carrying that key into a bag, and hands the
 * assembled tuple to {@code evalPipe}.
 *
 * The result tuple has {@code 1 + inputs.size()} fields: field 0 is the group
 * key and field {@code i + 1} is the bag collected for input {@code i}.
 *
 * @throws IOException declared by the signature; the collector's add method
 *         is one declared source
 */
111 public void run() throws IOException
{
112 for (Input input
: inputs
) {
// Bag storage is pointed at the temporary directory before any bag is requested below.
116 initializeTemporaryDirectory();
119 // find the smallest tuple
120 Datum groupName
= null;
// Only inputs that still have a head tuple take part in choosing the key.
122 for (Input input
: inputs
) {
123 if (input
.top
!= null) {
// Field 0 of the head tuple is the candidate group key.
124 Datum first
= input
.top
.getField(0);
126 if (groupName
== null) {
// NOTE(review): this branch is taken when groupName sorts BEFORE first; check
// that the branch body is consistent with the "find the smallest" intent above.
128 } else if (groupName
.compareTo(first
) < 0) {
134 Tuple result
= new Tuple(1 + inputs
.size());
135 result
.setField(0, groupName
);
137 // now, add to the tuple
138 for (int i
= 0; i
< inputs
.size(); i
++) {
139 Input input
= inputs
.get(i
);
// A fresh local bag is created for every input and every group.
140 DataBag bag
= BagFactory
.getInstance().getNewBag();
// Consume head tuples for as long as their key equals the chosen group key.
142 while (input
.top
!= null && input
.top
.getField(0).equals(groupName
)) {
// Field 1 of the head tuple is cast to a Pig tuple and added to the bag.
143 bag
.add((com
.yahoo
.pig
.data
.Tuple
) input
.top
.getField(1));
148 result
.setField(i
+ 1, bag
);
151 boolean usableTuple
= true;
153 for (int i
= 0; i
< inputs
.size(); i
++) {
154 Input input
= inputs
.get(i
);
// NOTE(review): this tests input.bag, while the loop above fills a local bag
// and stores it in result -- confirm that input.bag is populated somewhere,
// otherwise every inner input would look empty at this point.
156 if (input
.isInner
&& input
.bag
.isEmpty()) {
// The assembled group tuple is passed on to the evaluation pipe.
162 evalPipe
.add(result
);
/**
 * Connects this source to the following step by delegating to
 * {@code Linkage.link(this, step)}.
 *
 * @param step the stage that follows this source
 * @throws IncompatibleProcessorException declared by the signature
 *         (presumably raised by Linkage.link when the step cannot accept this
 *         source's output -- TODO confirm)
 */
170 public void setProcessor(Step step
) throws IncompatibleProcessorException
{
171 Linkage
.link(this, step
);