Bringing tree up to date.
[galago.git] / java / pig-galago / src / com / yahoo / pig / impl / galago / JoinTuples.java
blob 83fb91cb5554c348cb1e3d43f86d29c52cfecf2e
1 /*
2 * JoinTuples
4 * September 3, 2007 -- Trevor Strohman
6 * BSD License (http://www.galagosearch.org/license)
7 */
9 package com.yahoo.pig.impl.galago;
11 import com.yahoo.pig.PigServer.ExecType;
12 import com.yahoo.pig.data.BagFactory;
13 import com.yahoo.pig.data.DataBag;
14 import com.yahoo.pig.data.DataCollector;
15 import com.yahoo.pig.data.Datum;
16 import com.yahoo.pig.impl.PigContext;
17 import com.yahoo.pig.impl.eval.EvalSpecPipe;
18 import com.yahoo.pig.impl.eval.groupby.GroupBySpec;
19 import galago.tupleflow.ExNihiloSource;
20 import galago.tupleflow.IncompatibleProcessorException;
21 import galago.tupleflow.Linkage;
22 import galago.tupleflow.OutputClass;
23 import galago.tupleflow.Parameters.Value;
24 import galago.tupleflow.Processor;
25 import galago.tupleflow.Step;
26 import galago.tupleflow.TupleFlowParameters;
27 import galago.tupleflow.TypeReader;
28 import galago.tupleflow.execution.Verified;
29 import java.io.IOException;
30 import java.util.ArrayList;
31 import java.util.List;
32 import java.io.File;
34 /**
36 * @author trevor
38 @Verified
39 @OutputClass(className = "com.yahoo.pig.impl.galago.Tuple")
40 public class JoinTuples implements ExNihiloSource<Tuple> {
42 TypeReader[] readers;
43 public Processor<Tuple> processor;
44 PigContext context;
45 DataCollector evalPipe;
46 ArrayList<Input> inputs = new ArrayList<Input>();
48 public class Input {
50 public String name;
51 public GroupBySpec group;
52 public TypeReader<Tuple> reader;
53 public Tuple top;
54 public DataBag bag;
55 public boolean isInner;
57 public void read() throws IOException {
58 top = reader.read();
62 @SuppressWarnings("unchecked")
63 public JoinTuples(TupleFlowParameters parameters) throws IOException {
64 context = new PigContext(ExecType.GALAGO);
65 // BUGBUG: probably need to freeze-dry the context
66 String evalSpec = parameters.getXML().get("eval", "");
68 List<Value> inputs = parameters.getXML().list("input");
69 for (int i = 0; i < inputs.size(); i++) {
70 Input input = new Input();
71 input.name = inputs.get(i).get("name");
72 String groupSpec = inputs.get(i).get("group");
73 input.group = new GroupBySpec(context, groupSpec);
74 input.isInner = input.group.isInner;
75 input.top = null;
76 input.bag = BagFactory.getInstance().getNewBag();
77 input.reader = parameters.getTypeReader(input.name);
78 this.inputs.add(input);
81 evalPipe = (new EvalSpecPipe(context, evalSpec)).collector(new JoinCollector());
84 public class JoinCollector extends DataCollector {
86 public void add(com.yahoo.pig.data.Tuple t) throws IOException {
87 if (t == null) {
88 return;
90 Tuple g = new Tuple();
91 g.copyFrom(t);
92 processor.process(g);
96 public boolean isDone() {
97 for (Input input : inputs) {
98 if (input.top != null) {
99 return false;
103 return true;
106 private void initializeTemporaryDirectory() {
107 String temporary = System.getProperty( "java.io.tmpdir" );
108 BagFactory.init( new File(temporary) );
111 public void run() throws IOException {
112 for (Input input : inputs) {
113 input.read();
116 initializeTemporaryDirectory();
118 while (!isDone()) {
119 // find the smallest tuple
120 Datum groupName = null;
122 for (Input input : inputs) {
123 if (input.top != null) {
124 Datum first = input.top.getField(0);
126 if (groupName == null) {
127 groupName = first;
128 } else if (groupName.compareTo(first) < 0) {
129 groupName = first;
134 Tuple result = new Tuple(1 + inputs.size());
135 result.setField(0, groupName);
137 // now, add to the tuple
138 for (int i = 0; i < inputs.size(); i++) {
139 Input input = inputs.get(i);
140 DataBag bag = BagFactory.getInstance().getNewBag();
142 while (input.top != null && input.top.getField(0).equals(groupName)) {
143 bag.add((com.yahoo.pig.data.Tuple) input.top.getField(1));
144 input.read();
147 input.bag = bag;
148 result.setField(i + 1, bag);
151 boolean usableTuple = true;
153 for (int i = 0; i < inputs.size(); i++) {
154 Input input = inputs.get(i);
156 if (input.isInner && input.bag.isEmpty()) {
157 usableTuple = false;
161 if (usableTuple) {
162 evalPipe.add(result);
163 evalPipe.add(null);
167 processor.close();
170 public void setProcessor(Step step) throws IncompatibleProcessorException {
171 Linkage.link(this, step);