2 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
3 * See accompanying LICENSE file.
5 package com
.yahoo
.pig
.impl
.eval
;
10 import com
.yahoo
.pig
.AlgebraicAtomEvalFunc
;
11 import com
.yahoo
.pig
.AlgebraicBagEvalFunc
;
12 import com
.yahoo
.pig
.AlgebraicTupleEvalFunc
;
13 import com
.yahoo
.pig
.EvalFunc
;
14 import com
.yahoo
.pig
.data
.*;
15 import com
.yahoo
.pig
.impl
.PigContext
;
16 import com
.yahoo
.pig
.impl
.eval
.func
.*;
17 import com
.yahoo
.pig
.impl
.logicalLayer
.parser
.ParseException
;
18 import com
.yahoo
.pig
.impl
.logicalLayer
.parser
.QueryParser
;
19 import com
.yahoo
.pig
.impl
.logicalLayer
.schema
.SchemaItem
;
20 import com
.yahoo
.pig
.impl
.logicalLayer
.schema
.SchemaItemList
;
22 public class EvalItemList
extends EvalSpec
{
23 public List
<EvalItem
> columns
= null;
26 PigContext pigContext
;
27 LinkedList
<CrossProductItem
> pendingCrossProducts
= new LinkedList
<CrossProductItem
>();
30 * Creates an empty EvalItemList
// Stores the supplied PigContext on this instance and starts out with an
// empty column list (a fresh ArrayList<EvalItem>).
32 public EvalItemList(PigContext pigContext
) {
33 this.pigContext
= pigContext
;
34 columns
= new ArrayList
<EvalItem
>();
38 * Creates an EvalSpec from an input string, by parsing it
// Builds the column list by parsing specStr: the string's bytes are fed to a
// QueryParser and the columns of the parsed PEvalItemList() result are adopted.
// Declares IOException and ParseException.
40 public EvalItemList(PigContext pigContext
, String specStr
) throws IOException
, ParseException
{
41 this.pigContext
= pigContext
;
// NOTE(review): getBytes() without an argument encodes with the platform
// default charset; confirm the parser decodes the stream the same way before
// changing either side.
42 ByteArrayInputStream in
= new ByteArrayInputStream(specStr
.getBytes());
// The third QueryParser argument is passed as null; its meaning is not
// visible from here.
43 QueryParser parser
= new QueryParser(in
, pigContext
, null);
// Only the columns field of the parsed result is kept.
44 columns
= parser
.PEvalItemList().columns
;
48 * add an item to projection list
50 public void add(EvalItem item
) {
54 public boolean isSimple(){
63 * creates a data collector that executes this EvalItemList
65 public DataCollector
collector(final DataCollector output
) {
66 isJustStar
= isJustStar();
69 return new DataCollector(output
) {
71 public void add(Tuple t
) throws IOException
{
73 if (pendingCrossProducts
.peek()!=null)
74 System
.err
.println(Thread
.currentThread().getName() + ": Before eval, head of queue was " + pendingCrossProducts
.peek() + " with input " + pendingCrossProducts
.peek().cpiInput
.hashCode());
76 if (pendingCrossProducts
.peek()!=null)
77 System
.err
.println(Thread
.currentThread().getName() + ": After eval, head of queue was " + pendingCrossProducts
.peek() + " with input " + pendingCrossProducts
.peek().cpiInput
.hashCode());
80 for (EvalItem item
: columns
){
81 if (driver
==-1 || columns
.get(driver
)!=item
)
84 while (!pendingCrossProducts
.isEmpty()){
85 CrossProductItem cpi
= pendingCrossProducts
.remove();
86 System
.err
.println(Thread
.currentThread().getName() + ": Dequeuing " + cpi
+ "having input " + cpi
.cpiInput
.hashCode());
91 columns
.get(driver
).finish();
93 while (output
.isStale()){
96 }catch(InterruptedException e
){}
99 output
.add(null); // pass on EOF
106 * only handles cases with no nested projection or subcols or functions
// Evaluates every column against the input tuple and stores the results
// positionally in a new Tuple. Declares IOException.
// NOTE(review): the '*' fast path announced by the comment below and the
// final return are not visible in this view of the file.
108 public Tuple
simpleEval(Tuple input
) throws IOException
{
109 // make it go fast in the case of '*'
// Star columns are expanded first, using the input's arity as the column count.
113 ArrayList
<EvalItem
> items
= expandStars(columns
, input
.arity());
// The result tuple has one field per expanded item.
115 Tuple output
= new Tuple(items
.size());
117 for (int i
= 0; i
< items
.size(); i
++) {
// Field i holds the simpleEval() result of expanded item i on the same input.
118 output
.setField(i
, items
.get(i
).simpleEval(input
));
124 private class CrossProductItem
{
125 DataBag
[] toBeCrossed
;
126 final Tuple cpiInput
;
127 DataCollector output
, assembleTuples
;
129 public CrossProductItem(Tuple input
, DataCollector output
) throws IOException
{
131 this.cpiInput
= new Tuple(input
.arity());
132 for (int i
=0; i
<input
.arity(); i
++){
133 cpiInput
.setField(i
, input
.getField(i
));
136 this.output
= output
;
138 // materialize data for all to-be-crossed items
139 // (except driver, which is done in streaming fashion)
140 toBeCrossed
= new DataBag
[columns
.size()];
141 for (int i
= 0; i
< columns
.size(); i
++) {
145 EvalItem item
= columns
.get(i
);
146 if (!item
.isSimple()) {
147 toBeCrossed
[i
] = BagFactory
.getInstance().getNewBag();
148 item
.eval(input
, toBeCrossed
[i
]);
153 public void initAssembler(){
154 if (assembleTuples
!=null)
157 final int numItems
= columns
.size();
159 assembleTuples
= new DataCollector(output
) {
161 public void add(Tuple t
) throws IOException
{
163 if (t
== null) { // EOF
168 // create one iterator per to-be-crossed bag
169 Iterator
[] its
= new Iterator
[numItems
];
171 for (int i
= 0; i
< numItems
; i
++) {
172 if (toBeCrossed
[i
] != null) {
174 its
[i
] = toBeCrossed
[i
].content();
175 if (!its
[i
].hasNext())
176 return; // one of inputs is empty, so cross-prod yields empty result
180 Tuple
[] lastOutput
= null;
181 Tuple
[] outData
= new Tuple
[numItems
];
183 boolean done
= false;
185 if (lastOutput
== null) { // we're generating our first output
186 for (int i
= 0; i
< numItems
; i
++) {
189 else if (toBeCrossed
[i
] == null)
190 outData
[i
] = new Tuple(columns
.get(i
).simpleEval(cpiInput
));
192 outData
[i
] = (Tuple
) its
[i
].next();
195 boolean needToAdvance
= true;
197 for (int i
= 0; i
< numItems
; i
++) {
198 if (its
[i
] != null && needToAdvance
) {
199 if (its
[i
].hasNext()) {
200 outData
[i
] = (Tuple
) its
[i
].next();
201 needToAdvance
= false;
203 its
[i
] = toBeCrossed
[i
].content(); // rewind iterator
204 outData
[i
] = (Tuple
) its
[i
].next();
205 // still need to advance some other input..
208 outData
[i
] = lastOutput
[i
]; // use same value as last time
213 // check for completion:
215 if (numIts
> 0) { // done iff all iterators empty
216 for (int i
= 0; i
< numItems
; i
++) {
217 if (its
[i
] != null && its
[i
].hasNext()) {
224 output
.add(new Tuple(outData
));
226 lastOutput
= outData
;
233 public boolean isReady(){
234 for (int i
=0; i
<toBeCrossed
.length
; i
++){
235 if (toBeCrossed
[i
]!=null && toBeCrossed
[i
].isStale())
242 public void waitToBeReady(){
243 for (int i
=0; i
<toBeCrossed
.length
; i
++){
244 if (toBeCrossed
[i
]!=null){
245 synchronized(toBeCrossed
[i
]){
246 while (toBeCrossed
[i
].isStale()){
248 toBeCrossed
[i
].wait();
249 }catch (InterruptedException e
){}
257 public void exec() throws IOException
{
259 columns
.get(driver
).eval(cpiInput
, assembleTuples
);
260 System
.err
.println(Thread
.currentThread().getName() + ": Executing driver on " + cpiInput
);
261 output
.markStale(false);
266 * Process the actual eval of this EvalItemList
269 * @throws IOException
// Evaluates this item list on one input tuple, sending results to output.
// Three visible paths: a single column, all-simple columns (driver == -1),
// and the general cross-product path driven through pendingCrossProducts.
// NOTE(review): the lines that follow each fast path (presumably an early
// return) are not visible in this view of the file.
271 private void eval(Tuple input
, DataCollector output
) throws IOException
{
272 // make it go fast in the case of single item
273 if (columns
.size() == 1) {
// The only column evaluates straight into the caller's collector.
274 columns
.get(0).eval(input
, output
);
278 if (driver
== -1) { // all items are simple, so do simpleEval()
279 output
.add(simpleEval(input
));
283 // general case (use driver method):
284 CrossProductItem cpi
= new CrossProductItem(input
,output
);
// New work is queued at the tail; the loop below consumes from the head.
286 pendingCrossProducts
.addLast(cpi
);
289 //Since potentially can return without filling output, mark output as stale
290 //the exec method of CrossProductItem will mark output as not stale
291 output
.markStale(true);
// Run queued items from the head for as long as the head reports isReady();
// an item that is not ready stops the loop and stays queued.
292 while (pendingCrossProducts
.size() > 0 && pendingCrossProducts
.peek().isReady()){
293 pendingCrossProducts
.remove().exec();
297 private void selectDriver() {
300 for (int i
= 0; i
< columns
.size(); i
++) {
301 EvalItem item
= columns
.get(i
);
302 if (!item
.isSimple()) {
303 if (item
instanceof BagFuncEvalItem
) { // trumps 'em all
307 driver
= i
; // we'll use this as the driver, unless something better comes along
314 * convert each star column into a list of regular columns.
// Builds a new list from items in which each StarEvalItem is replaced by
// nCols ColEvalItem entries, one per column index 0 .. nCols-1.
// NOTE(review): the handling of non-star items and the return statement are
// not visible in this view of the file.
316 public ArrayList
<EvalItem
> expandStars(List
<EvalItem
> items
, int nCols
) {
317 ArrayList
<EvalItem
> output
= new ArrayList
<EvalItem
>();
319 for (Iterator
<EvalItem
> it
= items
.iterator(); it
.hasNext();) {
320 EvalItem item
= it
.next();
322 if (item
instanceof StarEvalItem
) {
// One plain column reference per input column, created with this list's pigContext.
323 for (int col
= 0; col
< nCols
; col
++)
324 output
.add(new ColEvalItem(pigContext
, col
));
333 * Determine if this instance of EvalItems is a candidate for algebraic
334 * evaluation. This means it contains an Algebraic Function, and does not
335 * contain repeated references to column used by the algebraic function.
338 public boolean amenableToCombiner() {
339 for (Iterator
<EvalItem
> it
= columns
.iterator(); it
.hasNext();) {
340 EvalItem item
= it
.next();
341 if (item
instanceof FuncEvalItem
) {
342 EvalFunc func
= ((FuncEvalItem
) item
).func
;
343 if ((func
instanceof AlgebraicBagEvalFunc
|| func
instanceof AlgebraicAtomEvalFunc
|| func
instanceof AlgebraicTupleEvalFunc
)
344 && item
.isSimple()) {
345 for (EvalItem arg
: ((FuncEvalItem
) item
).args
.columns
) {
346 if (arg
instanceof ColEvalItem
) {
347 if (!checkForUnique((FuncEvalItem
) item
, (ColEvalItem
) arg
)) {
348 System
.out
.println("Column is not unique!");
349 return false; // repeated column reference, no dice
356 } else if (item
instanceof StarEvalItem
) {
357 System
.out
.println("Star precludes algebraic!");
366 * Helper function for amenableToCombiner(). This will recurse a function
367 * eval spec and ensure that input columns are not repeatedly referenced.
372 public boolean checkForUnique(FuncEvalItem func
, ColEvalItem col
) {
373 for (EvalItem item
: columns
) {
374 if (item
instanceof FuncEvalItem
) {
375 FuncEvalItem fi
= (FuncEvalItem
) item
;
376 for (EvalItem ei
: fi
.args
.columns
) {
377 if (ei
instanceof ColEvalItem
) {
378 // You will need to make sure you don't disqualify
379 // due to matching against the arguments
380 if (fi
== func
&& ei
== col
) {
383 if (!fi
.args
.checkForUnique(func
, col
)) {
388 } else if (item
instanceof ColEvalItem
) {
389 ColEvalItem ci
= (ColEvalItem
) item
;
390 if (ci
.colNum
== col
.colNum
) {
393 } else if (item
instanceof StarEvalItem
) {
400 public String
toString() {
401 StringBuffer sb
= new StringBuffer();
402 for (Iterator
<EvalItem
> it
= columns
.iterator(); it
.hasNext();) {
403 sb
.append(it
.next());
408 return sb
.toString();
// Gathers the getFuncs() results of every column, in column order, into a
// single new list of strings.
// NOTE(review): the return statement is not visible in this view of the
// file; presumably the accumulated list is returned.
411 public List
<String
> getFuncs() {
412 List
<String
> funcs
= new ArrayList
<String
>();
413 for (Iterator
<EvalItem
> it
= columns
.iterator(); it
.hasNext();) {
// Each column contributes all of the entries it reports.
414 funcs
.addAll(it
.next().getFuncs());
// True exactly when the list holds a single column and that column is a
// StarEvalItem.
419 public boolean isJustStar() {
420 return (columns
.size() == 1 && columns
.get(0) instanceof StarEvalItem
);
423 public SchemaItem
mapInputSchema(SchemaItem input
) {
424 SchemaItemList output
= new SchemaItemList();
426 Set
<String
> aliases
= new HashSet
<String
>();
427 Set
<String
> conflictedAliases
= new HashSet
<String
>();
429 for (EvalItem item
: columns
) {
430 SchemaItem sItem
= item
.mapInputSchema(input
);
432 if (item
instanceof NestableEvalItem
&& ((NestableEvalItem
)item
).subColSpec
!=null){
433 for(SchemaItem flattenedItem
: sItem
.flatten()){
435 if (aliases
.contains(flattenedItem
.alias
))
436 conflictedAliases
.add(flattenedItem
.alias
);
438 aliases
.add(flattenedItem
.alias
);
441 if (aliases
.contains(sItem
.alias
))
442 conflictedAliases
.add(sItem
.alias
);
446 for (EvalItem item
: columns
) {
447 SchemaItem sItem
= item
.mapInputSchema(input
);
449 if (item
instanceof NestableEvalItem
&& ((NestableEvalItem
)item
).subColSpec
!=null){
450 for (SchemaItem flattenedItem
: sItem
.flatten()){
451 if (conflictedAliases
.contains(flattenedItem
.alias
)){
452 if (sItem
.alias
!=null){
453 flattenedItem
.alias
= sItem
.alias
+ "::" + flattenedItem
.alias
;
456 output
.add(flattenedItem
);
465 public Comparator
<Tuple
> getComparator() {
466 return new Comparator
<Tuple
>() {
467 private List
<EvalItem
> cols
= columns
;
469 public int compare(Tuple t1
, Tuple t2
) {
470 for (Iterator
<EvalItem
> it
= cols
.iterator(); it
.hasNext();) {
471 EvalItem itm
= it
.next();
475 if (itm
instanceof StarEvalItem
) {
476 comp
= t1
.compareTo(t2
);
477 } else if (itm
instanceof ColEvalItem
) {
478 int colNum
= ((ColEvalItem
) itm
).colNum
;
480 comp
= t1
.getField(colNum
).compareTo(t2
.getField(colNum
));
481 } catch (IOException e
) {
482 throw new RuntimeException(e
.getMessage());
484 } else if (itm
instanceof ConstEvalItem
) {
487 throw new RuntimeException("Internal error: unexpected EvalItem type.");