Initial import into git.
[galago.git] / java / pig-galago / src / com / yahoo / pig / impl / eval / EvalSpecPipe.java
blob08d8195d9a6dc58eea4254cfb21222ac35d51db8
1 /*
2 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
3 * See accompanying LICENSE file.
4 */
5 package com.yahoo.pig.impl.eval;
7 import java.io.ByteArrayInputStream;
8 import java.io.IOException;
9 import java.util.*;
11 import com.yahoo.pig.AlgebraicAtomEvalFunc;
12 import com.yahoo.pig.AtomEvalFunc;
13 import com.yahoo.pig.EvalFunc;
14 import com.yahoo.pig.TupleEvalFunc;
15 import com.yahoo.pig.data.*;
16 import com.yahoo.pig.impl.PigContext;
17 import com.yahoo.pig.impl.eval.filter.FilterSpec;
18 import com.yahoo.pig.impl.eval.func.AtomFuncEvalItem;
19 import com.yahoo.pig.impl.eval.func.FuncEvalItem;
20 import com.yahoo.pig.impl.eval.func.TupleFuncEvalItem;
21 import com.yahoo.pig.impl.eval.sad.SADSpec;
22 import com.yahoo.pig.impl.logicalLayer.parser.ParseException;
23 import com.yahoo.pig.impl.logicalLayer.parser.QueryParser;
24 import com.yahoo.pig.impl.logicalLayer.schema.SchemaItem;
26 public class EvalSpecPipe {
27 public List<EvalSpec> specs;
28 PigContext pigContext;
/**
 * Creates a pipe that holds no specs yet.
 *
 * @param pigContext context stored on this pipe; it is later handed to the
 *                   eval items this class constructs
 */
public EvalSpecPipe(PigContext pigContext) {
    this.specs = new ArrayList<EvalSpec>();
    this.pigContext = pigContext;
}
/**
 * Creates a pipe over an existing list of specs.
 *
 * The list is stored by reference, not copied, so later changes made through
 * this pipe are visible to whoever else holds the list (and vice versa).
 *
 * @param pigContext context stored on this pipe
 * @param specs      the specs making up the pipe, in evaluation order
 */
public EvalSpecPipe(PigContext pigContext, List<EvalSpec> specs) {
    this.specs = specs;
    this.pigContext = pigContext;
}
40 /**
41 * Creates an EvalSpecPipe from an input string, by parsing it
43 public EvalSpecPipe(PigContext pigContext, String specStr) throws IOException {
44 this.pigContext = pigContext;
45 ByteArrayInputStream in = new ByteArrayInputStream(specStr.getBytes());
46 QueryParser parser = new QueryParser(in, pigContext, null);
47 try {
48 specs = parser.PEvalSpecPipe().specs;
49 } catch (ParseException e) {
50 throw new IOException(e.getMessage());
55 // creates a data collector that executes this EvalSpecPipe
56 public DataCollector collector(DataCollector output) {
58 // create datacollector pipeline
59 for (int i = specs.size() - 1; i >= 0; i--) {
60 output = specs.get(i).collector(output);
63 return output;
66 // creates a data collector, but ignores all SAD specs (to be used if SAD is applied to a tuple
67 // rather than a bag)
68 public DataCollector noSADCollector(DataCollector output) {
70 // create datacollector pipeline w/out SAD's
71 for (int i = specs.size() - 1; i >= 0; i--) {
72 if (!(specs.get(i) instanceof SADSpec)) {
73 output = specs.get(i).collector(output);
77 return output;
80 public Tuple simpleEval(Tuple input) throws IOException {
81 Tuple t = input;
83 for (int i = 0; i < specs.size(); i++) {
84 EvalSpec spec = specs.get(i);
85 if (!(spec instanceof EvalItemList)) {
86 throw new IOException("Internal error: Invalid use of simpleEval().");
89 t = ((EvalItemList) spec).simpleEval(t);
92 return t;
95 public DataBag simpleEval(DataBag b) throws IOException {
97 if (specs.isEmpty()) {
98 return b;
99 } else {
101 // create datacollector pipeline
102 DataBag result = BagFactory.getInstance().getNewBag();
103 DataCollector collector = collector(result);
105 // push data into datacollector pipeline
106 for (Iterator<Tuple> it = b.content(); it.hasNext();) {
107 collector.add(it.next());
109 collector.add(null); // EOF
111 return result;
116 public Datum simpleEval(Datum input) throws IOException {
118 if (input instanceof DataBag) {
119 return simpleEval((DataBag) input);
120 } else if (input instanceof Tuple) {
121 return simpleEval((Tuple) input);
122 } else {
123 throw new IOException("Error: cannot apply nested projection to DataAtom.");
127 public void add(EvalSpec spec) {
129 // add spec to pipe, but avoid redundant "star" specs (just adds unnecessary eval overhead)
131 if (specs.isEmpty()) {
132 specs.add(spec);
133 } else if (!spec.isJustStar()) {
135 int lastSpec = specs.size() - 1;
136 if (specs.get(lastSpec).isJustStar()) {
137 specs.remove(lastSpec);
140 specs.add(spec);
144 public void add(EvalSpecPipe otherPipe) {
145 for (Iterator<EvalSpec> it = otherPipe.specs(); it.hasNext();) {
146 add(it.next());
150 public Iterator<EvalSpec> specs() {
151 return specs.iterator();
154 public String toString() {
155 StringBuffer sb = new StringBuffer();
156 for (Iterator<EvalSpec> it = specs.iterator(); it.hasNext();) {
157 EvalSpec es = it.next();
158 sb.append("[");
159 if (es instanceof EvalItemList) sb.append("EVAL ");
160 else if (es instanceof FilterSpec) sb.append("FILTER ");
161 sb.append(es);
162 sb.append("]");
165 return sb.toString();
168 public List<String> getFuncs() {
169 List<String> funcs = new ArrayList<String>();
170 for (Iterator<EvalSpec> it = specs.iterator(); it.hasNext();) {
171 funcs.addAll(it.next().getFuncs());
173 return funcs;
177 * This function will return a new EvalItemList which is
178 * the combine portion of an algbraic evaluation of the
179 * EvalSpecPipe instance. Before calling this method and
180 * using the result, users should call amenableToCombiner() to
181 * validate that the EvalSpecPipe *can* be used in a algebraic
182 * manner.
183 * @return
185 public EvalItemList makeIntoToCombine() {
186 // This function takes the first spec in the pipe
187 // and searches it for the first instance of an
188 // algebraic function to split the processing on
189 // =============================================
190 EvalItemList first = (EvalItemList) specs.get(0);
191 EvalItemList combine = new EvalItemList(pigContext);
192 boolean added = false;
193 for (Iterator<EvalItem> it = first.columns.iterator(); it.hasNext();) {
194 EvalItem item = it.next();
195 if (item instanceof FuncEvalItem && !added) {
196 EvalFunc func = ((FuncEvalItem) item).func;
197 // this is our algebraic function, create a new function and break
198 if (func instanceof AlgebraicAtomEvalFunc) {
199 TupleEvalFunc init = ((AlgebraicAtomEvalFunc) func).getInitial();
200 FuncEvalItem fei = new TupleFuncEvalItem(pigContext, init, ((FuncEvalItem) item).args);
201 combine.add(fei);
202 added = true;
203 break;
206 //combine.add(item);
208 return combine;
212 * This function will return a new EvalItemList which is
213 * the reduce portion of an algbraic evaluation of the
214 * EvalSpecPipe instance. Before calling this method and
215 * using the result, users should call amenableToCombiner() to
216 * validate that the EvalSpecPipe *can* be used in a algebraic
217 * manner.
218 * @return
220 public EvalItemList makeIntoToReduce() {
221 // This function finds the proper algrebraic function which was
222 // processed by makeIntoToCombine() and then creates the proper
223 // spec for the reduce portion of the job.
224 //========================================
225 EvalItemList first = (EvalItemList) specs.get(0);
226 EvalItemList reduce = new EvalItemList(pigContext);
227 boolean firstItem = true;
228 boolean added = false;
229 int colPos = 1; // if group was not specificed
231 for (EvalItem item : first.columns) {
232 // We have to see if the group column was used in the eval spec
233 // because it is implicitly included in the output of co-group
234 // and columns need to be shifted appro.
235 //======================================
236 if( item instanceof ColEvalItem && firstItem) {
237 ColEvalItem col = (ColEvalItem) item;
238 if(col.colNum == 0 || (col.alias != null & col.alias.equals("group"))) {
239 colPos--;
241 } else if (item instanceof FuncEvalItem && !added) {
242 EvalFunc func = ((FuncEvalItem) item).func;
243 // this is our algebraic function!
244 if (func instanceof AlgebraicAtomEvalFunc) {
245 AtomEvalFunc fin = ((AlgebraicAtomEvalFunc) func).getFinal();
246 ColEvalItem cei = new ColEvalItem(pigContext, colPos);
247 EvalItemList newargs = new EvalItemList(pigContext);
249 newargs.add(cei);
250 FuncEvalItem fei = new AtomFuncEvalItem(pigContext, fin, newargs);
252 // don't forget to add the post-processing specs to the
253 // output of the algbraic function
254 //================================
255 fei.nestedEval = ((FuncEvalItem) item).nestedEval;
256 fei.subColSpec = ((FuncEvalItem) item).subColSpec;
257 reduce.add(fei);
258 added = true;
259 continue;
262 reduce.add(item);
263 colPos++;
264 firstItem = false;
266 return reduce;
271 * Process the pipe and determine if the pipe is a candidate for
272 * algebraic evaluation. This is mostly reliant on determining if
273 * the EvalItemLists contain algebraic functions and have unique column
274 * references.
275 * @return
277 public boolean amenableToCombiner() {
278 if (specs.size() > 0 && specs.get(0) instanceof EvalItemList) {
279 EvalItemList first = (EvalItemList) specs.get(0);
280 return first.amenableToCombiner();
281 } else {
282 return false;
287 * Given an input schema, determine the output schema of this pipe
288 * as it operates on input tuples with the input schema.
289 * @param input
290 * @return
292 public SchemaItem mapInputSchema(SchemaItem input) {
293 SchemaItem output = input;
294 for(EvalSpec spec : specs) {
295 output = spec.mapInputSchema(output);
297 return output;