2 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
3 * See accompanying LICENSE file.
5 package com
.yahoo
.pig
.impl
.eval
;
7 import java
.io
.ByteArrayInputStream
;
8 import java
.io
.IOException
;
11 import com
.yahoo
.pig
.AlgebraicAtomEvalFunc
;
12 import com
.yahoo
.pig
.AtomEvalFunc
;
13 import com
.yahoo
.pig
.EvalFunc
;
14 import com
.yahoo
.pig
.TupleEvalFunc
;
15 import com
.yahoo
.pig
.data
.*;
16 import com
.yahoo
.pig
.impl
.PigContext
;
17 import com
.yahoo
.pig
.impl
.eval
.filter
.FilterSpec
;
18 import com
.yahoo
.pig
.impl
.eval
.func
.AtomFuncEvalItem
;
19 import com
.yahoo
.pig
.impl
.eval
.func
.FuncEvalItem
;
20 import com
.yahoo
.pig
.impl
.eval
.func
.TupleFuncEvalItem
;
21 import com
.yahoo
.pig
.impl
.eval
.sad
.SADSpec
;
22 import com
.yahoo
.pig
.impl
.logicalLayer
.parser
.ParseException
;
23 import com
.yahoo
.pig
.impl
.logicalLayer
.parser
.QueryParser
;
24 import com
.yahoo
.pig
.impl
.logicalLayer
.schema
.SchemaItem
;
26 public class EvalSpecPipe
{
27 public List
<EvalSpec
> specs
;
28 PigContext pigContext
;
/**
 * Creates an empty pipe.
 *
 * @param pigContext the Pig context this pipe hands to the specs and
 *                   parsers it creates
 */
public EvalSpecPipe(PigContext pigContext) {
    specs = new ArrayList<EvalSpec>();
    this.pigContext = pigContext;
}
/**
 * Creates a pipe over an existing list of specs.
 * <p>
 * The list is stored by reference, not copied: later changes made through
 * the caller's reference are visible in this pipe and vice versa.
 *
 * @param pigContext the Pig context this pipe hands to the specs and
 *                   parsers it creates
 * @param specs      the specs making up the pipe, in the order they are applied
 */
public EvalSpecPipe(PigContext pigContext, List<EvalSpec> specs) {
    this.pigContext = pigContext;
    // The parameter shadows the field, so "this." is required. Without this
    // assignment the field stays null and every other method of the pipe
    // fails with a NullPointerException.
    this.specs = specs;
}
41 * Creates an EvalSpecPipe from an input string, by parsing it
43 public EvalSpecPipe(PigContext pigContext
, String specStr
) throws IOException
{
44 this.pigContext
= pigContext
;
45 ByteArrayInputStream in
= new ByteArrayInputStream(specStr
.getBytes());
46 QueryParser parser
= new QueryParser(in
, pigContext
, null);
48 specs
= parser
.PEvalSpecPipe().specs
;
49 } catch (ParseException e
) {
50 throw new IOException(e
.getMessage());
55 // creates a data collector that executes this EvalSpecPipe
56 public DataCollector
collector(DataCollector output
) {
58 // create datacollector pipeline
59 for (int i
= specs
.size() - 1; i
>= 0; i
--) {
60 output
= specs
.get(i
).collector(output
);
66 // creates a data collector, but ignores all SAD specs (to be used if SAD is applied to a tuple
68 public DataCollector
noSADCollector(DataCollector output
) {
70 // create datacollector pipeline w/out SAD's
71 for (int i
= specs
.size() - 1; i
>= 0; i
--) {
72 if (!(specs
.get(i
) instanceof SADSpec
)) {
73 output
= specs
.get(i
).collector(output
);
/**
 * Evaluates a single tuple by running it through every spec of this pipe,
 * first to last. Every spec must be an EvalItemList; the tuple returned by
 * one spec's simpleEval() becomes the input of the next.
 *
 * @param input the tuple to evaluate
 * @return presumably the tuple produced by the last spec -- the return
 *         statement is outside this view, TODO confirm
 * @throws IOException "Internal error: Invalid use of simpleEval()." as soon
 *         as a spec that is not an EvalItemList is reached
 */
80 public Tuple
simpleEval(Tuple input
) throws IOException
{
83 for (int i
= 0; i
< specs
.size(); i
++) {
84 EvalSpec spec
= specs
.get(i
);
85 if (!(spec
instanceof EvalItemList
)) {
86 throw new IOException("Internal error: Invalid use of simpleEval().");
// NOTE(review): t is declared outside this view; presumably it is
// initialised from input before the loop -- confirm.
89 t
= ((EvalItemList
) spec
).simpleEval(t
);
/**
 * Evaluates every tuple of a bag through this pipe.
 * A new bag is obtained from BagFactory, a collector chain ending in that
 * bag is built with collector(), each tuple of b is added to the chain, and
 * a final add(null) marks the end of input.
 *
 * @param b the bag whose tuples are evaluated
 * @return presumably the filled result bag (and presumably b itself for an
 *         empty pipe) -- the return statements are outside this view,
 *         TODO confirm
 * @throws IOException as declared; its sources are not visible here
 */
95 public DataBag
simpleEval(DataBag b
) throws IOException
{
97 if (specs
.isEmpty()) {
// NOTE(review): the body of this empty-pipe branch is outside this view.
101 // create datacollector pipeline
102 DataBag result
= BagFactory
.getInstance().getNewBag();
103 DataCollector collector
= collector(result
);
105 // push data into datacollector pipeline
106 for (Iterator
<Tuple
> it
= b
.content(); it
.hasNext();) {
107 collector
.add(it
.next());
109 collector
.add(null); // EOF
116 public Datum
simpleEval(Datum input
) throws IOException
{
118 if (input
instanceof DataBag
) {
119 return simpleEval((DataBag
) input
);
120 } else if (input
instanceof Tuple
) {
121 return simpleEval((Tuple
) input
);
123 throw new IOException("Error: cannot apply nested projection to DataAtom.");
/**
 * Appends a spec to this pipe, avoiding redundant "star" specs.
 * Visible here: when the pipe is non-empty, spec is not just-star, and the
 * current last spec is just-star, that last spec is removed first.
 * NOTE(review): the statements that append spec, and the handling of a
 * just-star spec added to a non-empty pipe, are outside this view; the
 * comment below suggests such a spec is simply dropped -- confirm.
 *
 * @param spec the spec to add
 */
127 public void add(EvalSpec spec
) {
129 // add spec to pipe, but avoid redundant "star" specs (just adds unnecessary eval overhead)
131 if (specs
.isEmpty()) {
133 } else if (!spec
.isJustStar()) {
135 int lastSpec
= specs
.size() - 1;
136 if (specs
.get(lastSpec
).isJustStar()) {
// lastSpec is an int, so this is List.remove(int): drop the trailing star spec by index
137 specs
.remove(lastSpec
);
/**
 * Iterates over otherPipe.specs() in order; the loop body (outside this
 * view) presumably appends each spec to this pipe, likely via
 * add(EvalSpec) -- TODO confirm.
 *
 * @param otherPipe the pipe whose specs are taken
 */
144 public void add(EvalSpecPipe otherPipe
) {
145 for (Iterator
<EvalSpec
> it
= otherPipe
.specs(); it
.hasNext();) {
150 public Iterator
<EvalSpec
> specs() {
151 return specs
.iterator();
/**
 * Renders the pipe as text, visiting the specs in pipe order.
 * EvalItemList specs are prefixed with "EVAL " and FilterSpec specs with
 * "FILTER ".
 * NOTE(review): the remainder of the loop body is outside this view, so
 * the full per-spec format (and any other prefixes) cannot be confirmed
 * here. sb is local to this method, so StringBuilder would suffice.
 *
 * @return the textual form of this pipe
 */
154 public String
toString() {
155 StringBuffer sb
= new StringBuffer();
156 for (Iterator
<EvalSpec
> it
= specs
.iterator(); it
.hasNext();) {
157 EvalSpec es
= it
.next();
159 if (es
instanceof EvalItemList
) sb
.append("EVAL ");
160 else if (es
instanceof FilterSpec
) sb
.append("FILTER ");
165 return sb
.toString();
168 public List
<String
> getFuncs() {
169 List
<String
> funcs
= new ArrayList
<String
>();
170 for (Iterator
<EvalSpec
> it
= specs
.iterator(); it
.hasNext();) {
171 funcs
.addAll(it
.next().getFuncs());
177 * This function will return a new EvalItemList which is
178 * the combine portion of an algebraic evaluation of the
179 * EvalSpecPipe instance. Before calling this method and
180 * using the result, users should call amenableToCombiner() to
181 * validate that the EvalSpecPipe *can* be used in an algebraic
/**
 * Builds the combine-side EvalItemList for algebraic evaluation of this pipe.
 * Precondition: amenableToCombiner() returned true. specs.get(0) is cast to
 * EvalItemList without a check, so an empty pipe fails with
 * IndexOutOfBoundsException and a non-EvalItemList first spec with
 * ClassCastException.
 * For a FuncEvalItem column whose function is an AlgebraicAtomEvalFunc
 * (examined only while `added` is false), a TupleFuncEvalItem is created
 * that runs the function's getInitial() over the original item's args.
 * NOTE(review): how fei and the other columns reach the combine list, and
 * where `added` is set, is outside this view.
 *
 * @return the combine-side EvalItemList
 */
185 public EvalItemList
makeIntoToCombine() {
186 // This function takes the first spec in the pipe
187 // and searches it for the first instance of an
188 // algebraic function to split the processing on
189 // =============================================
190 EvalItemList first
= (EvalItemList
) specs
.get(0);
191 EvalItemList combine
= new EvalItemList(pigContext
);
192 boolean added
= false;
193 for (Iterator
<EvalItem
> it
= first
.columns
.iterator(); it
.hasNext();) {
194 EvalItem item
= it
.next();
195 if (item
instanceof FuncEvalItem
&& !added
) {
196 EvalFunc func
= ((FuncEvalItem
) item
).func
;
197 // this is our algebraic function, create a new function and break
198 if (func
instanceof AlgebraicAtomEvalFunc
) {
199 TupleEvalFunc init
= ((AlgebraicAtomEvalFunc
) func
).getInitial();
200 FuncEvalItem fei
= new TupleFuncEvalItem(pigContext
, init
, ((FuncEvalItem
) item
).args
);
212 * This function will return a new EvalItemList which is
213 * the reduce portion of an algebraic evaluation of the
214 * EvalSpecPipe instance. Before calling this method and
215 * using the result, users should call amenableToCombiner() to
216 * validate that the EvalSpecPipe *can* be used in an algebraic
/**
 * Builds the reduce-side EvalItemList for algebraic evaluation of this pipe.
 * Precondition: amenableToCombiner() returned true; specs.get(0) is cast to
 * EvalItemList without a check.
 * A leading ColEvalItem is checked for being the group column (colNum 0 or
 * alias "group"); colPos starts at 1. For a FuncEvalItem column whose
 * function is an AlgebraicAtomEvalFunc (examined only while `added` is
 * false), an AtomFuncEvalItem is created that runs the function's
 * getFinal() with newargs as its argument list, and it inherits the
 * original item's nestedEval and subColSpec.
 * NOTE(review): the updates to colPos/firstItem/added, how cei (a
 * ColEvalItem at colPos) is placed into newargs, and how fei is added to
 * the reduce list are outside this view.
 *
 * @return the reduce-side EvalItemList
 */
220 public EvalItemList
makeIntoToReduce() {
221 // This function finds the proper algebraic function which was
222 // processed by makeIntoToCombine() and then creates the proper
223 // spec for the reduce portion of the job.
224 //========================================
225 EvalItemList first
= (EvalItemList
) specs
.get(0);
226 EvalItemList reduce
= new EvalItemList(pigContext
);
227 boolean firstItem
= true;
228 boolean added
= false;
229 int colPos
= 1; // if group was not specified
231 for (EvalItem item
: first
.columns
) {
232 // We have to see if the group column was used in the eval spec
233 // because it is implicitly included in the output of co-group
234 // and columns need to be shifted appropriately.
235 //======================================
236 if( item
instanceof ColEvalItem
&& firstItem
) {
237 ColEvalItem col
= (ColEvalItem
) item
;
// NOTE(review): the null guard below uses the non-short-circuit `&`, so
// col.alias.equals("group") is evaluated even when col.alias is null.
// Whenever this check runs on a column with colNum != 0 and a null alias
// it throws NullPointerException. It should be `&&`.
238 if(col
.colNum
== 0 || (col
.alias
!= null & col
.alias
.equals("group"))) {
241 } else if (item
instanceof FuncEvalItem
&& !added
) {
242 EvalFunc func
= ((FuncEvalItem
) item
).func
;
243 // this is our algebraic function!
244 if (func
instanceof AlgebraicAtomEvalFunc
) {
245 AtomEvalFunc fin
= ((AlgebraicAtomEvalFunc
) func
).getFinal();
246 ColEvalItem cei
= new ColEvalItem(pigContext
, colPos
);
247 EvalItemList newargs
= new EvalItemList(pigContext
);
250 FuncEvalItem fei
= new AtomFuncEvalItem(pigContext
, fin
, newargs
);
252 // don't forget to add the post-processing specs to the
253 // output of the algebraic function
254 //================================
255 fei
.nestedEval
= ((FuncEvalItem
) item
).nestedEval
;
256 fei
.subColSpec
= ((FuncEvalItem
) item
).subColSpec
;
271 * Process the pipe and determine if the pipe is a candidate for
272 * algebraic evaluation. This is mostly reliant on determining if
273 * the EvalItemLists contain algebraic functions and have unique column
/**
 * Reports whether this pipe can be split for algebraic (combiner)
 * evaluation. When the pipe is non-empty and its first spec is an
 * EvalItemList, the answer is delegated to that list's
 * amenableToCombiner().
 * NOTE(review): the result for any other pipe is outside this view;
 * presumably false -- confirm.
 *
 * @return whether makeIntoToCombine()/makeIntoToReduce() may be used
 */
277 public boolean amenableToCombiner() {
278 if (specs
.size() > 0 && specs
.get(0) instanceof EvalItemList
) {
279 EvalItemList first
= (EvalItemList
) specs
.get(0);
280 return first
.amenableToCombiner();
287 * Given an input schema, determine the output schema of this pipe
288 * as it operates on input tuples with the input schema.
292 public SchemaItem
mapInputSchema(SchemaItem input
) {
293 SchemaItem output
= input
;
294 for(EvalSpec spec
: specs
) {
295 output
= spec
.mapInputSchema(output
);