Initial import into git.
[galago.git] / java / pig-galago / src / com / yahoo / pig / impl / eval / EvalItemList.java
blob158de81277a502552563538f892695fa1ae1c83f
1 /*
2 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
3 * See accompanying LICENSE file.
4 */
5 package com.yahoo.pig.impl.eval;
7 import java.io.*;
8 import java.util.*;
10 import com.yahoo.pig.AlgebraicAtomEvalFunc;
11 import com.yahoo.pig.AlgebraicBagEvalFunc;
12 import com.yahoo.pig.AlgebraicTupleEvalFunc;
13 import com.yahoo.pig.EvalFunc;
14 import com.yahoo.pig.data.*;
15 import com.yahoo.pig.impl.PigContext;
16 import com.yahoo.pig.impl.eval.func.*;
17 import com.yahoo.pig.impl.logicalLayer.parser.ParseException;
18 import com.yahoo.pig.impl.logicalLayer.parser.QueryParser;
19 import com.yahoo.pig.impl.logicalLayer.schema.SchemaItem;
20 import com.yahoo.pig.impl.logicalLayer.schema.SchemaItemList;
22 public class EvalItemList extends EvalSpec {
23 public List<EvalItem> columns = null;
24 boolean isJustStar;
25 int driver;
26 PigContext pigContext;
27 LinkedList<CrossProductItem> pendingCrossProducts = new LinkedList<CrossProductItem>();
/**
 * Creates an empty EvalItemList; items are appended afterwards via {@link #add(EvalItem)}.
 *
 * @param pigContext context stored on this list and passed to items it creates
 */
public EvalItemList(PigContext pigContext) {
    columns = new ArrayList<EvalItem>();
    this.pigContext = pigContext;
}
37 /**
38 * Creates an EvalSpec from an input string, by parsing it
40 public EvalItemList(PigContext pigContext, String specStr) throws IOException, ParseException {
41 this.pigContext = pigContext;
42 ByteArrayInputStream in = new ByteArrayInputStream(specStr.getBytes());
43 QueryParser parser = new QueryParser(in, pigContext, null);
44 columns = parser.PEvalItemList().columns;
47 /**
48 * add an item to projection list
50 public void add(EvalItem item) {
51 columns.add(item);
54 public boolean isSimple(){
55 selectDriver();
56 if (driver==-1)
57 return true;
58 else
59 return false;
62 /**
63 * creates a data collector that executes this EvalItemList
65 public DataCollector collector(final DataCollector output) {
66 isJustStar = isJustStar();
67 selectDriver();
69 return new DataCollector(output) {
71 public void add(Tuple t) throws IOException {
72 if (t != null){
73 if (pendingCrossProducts.peek()!=null)
74 System.err.println(Thread.currentThread().getName() + ": Before eval, head of queue was " + pendingCrossProducts.peek() + " with input " + pendingCrossProducts.peek().cpiInput.hashCode());
75 eval(t, output);
76 if (pendingCrossProducts.peek()!=null)
77 System.err.println(Thread.currentThread().getName() + ": After eval, head of queue was " + pendingCrossProducts.peek() + " with input " + pendingCrossProducts.peek().cpiInput.hashCode());
79 }else{
80 for (EvalItem item: columns){
81 if (driver==-1 || columns.get(driver)!=item)
82 item.finish();
84 while (!pendingCrossProducts.isEmpty()){
85 CrossProductItem cpi = pendingCrossProducts.remove();
86 System.err.println(Thread.currentThread().getName() + ": Dequeuing " + cpi + "having input " + cpi.cpiInput.hashCode());
87 cpi.waitToBeReady();
88 cpi.exec();
90 if (driver!=-1)
91 columns.get(driver).finish();
92 synchronized(output){
93 while (output.isStale()){
94 try{
95 output.wait();
96 }catch(InterruptedException e){}
99 output.add(null); // pass on EOF
106 * only handles cases with no nested projection or subcols or functions
108 public Tuple simpleEval(Tuple input) throws IOException {
109 // make it go fast in the case of '*'
110 if (isJustStar)
111 return input;
113 ArrayList<EvalItem> items = expandStars(columns, input.arity());
115 Tuple output = new Tuple(items.size());
117 for (int i = 0; i < items.size(); i++) {
118 output.setField(i, items.get(i).simpleEval(input));
121 return output;
124 private class CrossProductItem{
125 DataBag[] toBeCrossed;
126 final Tuple cpiInput;
127 DataCollector output, assembleTuples;
129 public CrossProductItem(Tuple input, DataCollector output) throws IOException{
131 this.cpiInput = new Tuple(input.arity());
132 for (int i=0; i<input.arity(); i++){
133 cpiInput.setField(i, input.getField(i));
136 this.output = output;
138 // materialize data for all to-be-crossed items
139 // (except driver, which is done in streaming fashion)
140 toBeCrossed = new DataBag[columns.size()];
141 for (int i = 0; i < columns.size(); i++) {
142 if (i == driver)
143 continue;
145 EvalItem item = columns.get(i);
146 if (!item.isSimple()) {
147 toBeCrossed[i] = BagFactory.getInstance().getNewBag();
148 item.eval(input, toBeCrossed[i]);
153 public void initAssembler(){
154 if (assembleTuples!=null)
155 return;
157 final int numItems = columns.size();
159 assembleTuples = new DataCollector(output) {
161 public void add(Tuple t) throws IOException {
163 if (t == null) { // EOF
164 output.add(null);
165 return;
168 // create one iterator per to-be-crossed bag
169 Iterator[] its = new Iterator[numItems];
170 int numIts = 0;
171 for (int i = 0; i < numItems; i++) {
172 if (toBeCrossed[i] != null) {
173 numIts++;
174 its[i] = toBeCrossed[i].content();
175 if (!its[i].hasNext())
176 return; // one of inputs is empty, so cross-prod yields empty result
180 Tuple[] lastOutput = null;
181 Tuple[] outData = new Tuple[numItems];
183 boolean done = false;
184 while (!done) {
185 if (lastOutput == null) { // we're generating our first output
186 for (int i = 0; i < numItems; i++) {
187 if (i == driver)
188 outData[i] = t;
189 else if (toBeCrossed[i] == null)
190 outData[i] = new Tuple(columns.get(i).simpleEval(cpiInput));
191 else
192 outData[i] = (Tuple) its[i].next();
194 } else {
195 boolean needToAdvance = true;
197 for (int i = 0; i < numItems; i++) {
198 if (its[i] != null && needToAdvance) {
199 if (its[i].hasNext()) {
200 outData[i] = (Tuple) its[i].next();
201 needToAdvance = false;
202 } else {
203 its[i] = toBeCrossed[i].content(); // rewind iterator
204 outData[i] = (Tuple) its[i].next();
205 // still need to advance some other input..
207 } else {
208 outData[i] = lastOutput[i]; // use same value as last time
213 // check for completion:
214 done = true;
215 if (numIts > 0) { // done iff all iterators empty
216 for (int i = 0; i < numItems; i++) {
217 if (its[i] != null && its[i].hasNext()) {
218 done = false;
219 break;
224 output.add(new Tuple(outData));
226 lastOutput = outData;
233 public boolean isReady(){
234 for (int i=0; i<toBeCrossed.length; i++){
235 if (toBeCrossed[i]!=null && toBeCrossed[i].isStale())
236 return false;
239 return true;
242 public void waitToBeReady(){
243 for (int i=0; i<toBeCrossed.length; i++){
244 if (toBeCrossed[i]!=null){
245 synchronized(toBeCrossed[i]){
246 while (toBeCrossed[i].isStale()){
247 try{
248 toBeCrossed[i].wait();
249 }catch (InterruptedException e){}
257 public void exec() throws IOException{
258 initAssembler();
259 columns.get(driver).eval(cpiInput, assembleTuples);
260 System.err.println(Thread.currentThread().getName() + ": Executing driver on " + cpiInput);
261 output.markStale(false);
266 * Process the actual eval of this EvalItemList
267 * @param input
268 * @param output
269 * @throws IOException
271 private void eval(Tuple input, DataCollector output) throws IOException {
272 // make it go fast in the case of single item
273 if (columns.size() == 1) {
274 columns.get(0).eval(input, output);
275 return;
278 if (driver == -1) { // all items are simple, so do simpleEval()
279 output.add(simpleEval(input));
280 return;
283 // general case (use driver method):
284 CrossProductItem cpi = new CrossProductItem(input,output);
286 pendingCrossProducts.addLast(cpi);
289 //Since potentially can return without filling output, mark output as stale
290 //the exec method of CrossProductItem will mark output as not stale
291 output.markStale(true);
292 while (pendingCrossProducts.size() > 0 && pendingCrossProducts.peek().isReady()){
293 pendingCrossProducts.remove().exec();
297 private void selectDriver() {
298 driver = -1;
300 for (int i = 0; i < columns.size(); i++) {
301 EvalItem item = columns.get(i);
302 if (!item.isSimple()) {
303 if (item instanceof BagFuncEvalItem) { // trumps 'em all
304 driver = i;
305 return;
306 } else {
307 driver = i; // we'll use this as the driver, unless something better comes along
314 * convert each star column into a list of regular columns.
316 public ArrayList<EvalItem> expandStars(List<EvalItem> items, int nCols) {
317 ArrayList<EvalItem> output = new ArrayList<EvalItem>();
319 for (Iterator<EvalItem> it = items.iterator(); it.hasNext();) {
320 EvalItem item = it.next();
322 if (item instanceof StarEvalItem) {
323 for (int col = 0; col < nCols; col++)
324 output.add(new ColEvalItem(pigContext, col));
325 } else {
326 output.add(item);
329 return output;
333 * Determine if this instance of EvalItems is a candiate for algebraic
334 * evaluation. This means it contains an Algebraic Function, and does not
335 * contain repeated references to column used by the algebraic function.
336 * @return
338 public boolean amenableToCombiner() {
339 for (Iterator<EvalItem> it = columns.iterator(); it.hasNext();) {
340 EvalItem item = it.next();
341 if (item instanceof FuncEvalItem) {
342 EvalFunc func = ((FuncEvalItem) item).func;
343 if ((func instanceof AlgebraicBagEvalFunc || func instanceof AlgebraicAtomEvalFunc || func instanceof AlgebraicTupleEvalFunc)
344 && item.isSimple()) {
345 for (EvalItem arg : ((FuncEvalItem) item).args.columns) {
346 if (arg instanceof ColEvalItem) {
347 if (!checkForUnique((FuncEvalItem) item, (ColEvalItem) arg)) {
348 System.out.println("Column is not unique!");
349 return false; // repeated column reference, no dice
351 } else {
352 return false;
355 return true;
356 } else if (item instanceof StarEvalItem) {
357 System.out.println("Star precludes algebraic!");
358 return false;
362 return false;
366 * Helper function for amenableToCombiner(). This will recurse a function
367 * eval spec and ensure that input columns are not repeatedly reference.
368 * @param func
369 * @param col
370 * @return
372 public boolean checkForUnique(FuncEvalItem func, ColEvalItem col) {
373 for (EvalItem item : columns) {
374 if (item instanceof FuncEvalItem) {
375 FuncEvalItem fi = (FuncEvalItem) item;
376 for (EvalItem ei : fi.args.columns) {
377 if (ei instanceof ColEvalItem) {
378 // You will need to make sure you don't disqualify
379 // due to matching against the arguments
380 if (fi == func && ei == col) {
381 continue;
383 if (!fi.args.checkForUnique(func, col)) {
384 return false;
388 } else if (item instanceof ColEvalItem) {
389 ColEvalItem ci = (ColEvalItem) item;
390 if (ci.colNum == col.colNum) {
391 return false;
393 } else if (item instanceof StarEvalItem) {
394 return false;
397 return true;
400 public String toString() {
401 StringBuffer sb = new StringBuffer();
402 for (Iterator<EvalItem> it = columns.iterator(); it.hasNext();) {
403 sb.append(it.next());
404 if (it.hasNext())
405 sb.append(", ");
408 return sb.toString();
411 public List<String> getFuncs() {
412 List<String> funcs = new ArrayList<String>();
413 for (Iterator<EvalItem> it = columns.iterator(); it.hasNext();) {
414 funcs.addAll(it.next().getFuncs());
416 return funcs;
419 public boolean isJustStar() {
420 return (columns.size() == 1 && columns.get(0) instanceof StarEvalItem);
423 public SchemaItem mapInputSchema(SchemaItem input) {
424 SchemaItemList output = new SchemaItemList();
426 Set<String> aliases = new HashSet<String>();
427 Set<String> conflictedAliases = new HashSet<String>();
429 for (EvalItem item: columns) {
430 SchemaItem sItem = item.mapInputSchema(input);
432 if (item instanceof NestableEvalItem && ((NestableEvalItem)item).subColSpec!=null){
433 for(SchemaItem flattenedItem: sItem.flatten()){
435 if (aliases.contains(flattenedItem.alias))
436 conflictedAliases.add(flattenedItem.alias);
437 else
438 aliases.add(flattenedItem.alias);
440 }else{
441 if (aliases.contains(sItem.alias))
442 conflictedAliases.add(sItem.alias);
446 for (EvalItem item: columns) {
447 SchemaItem sItem = item.mapInputSchema(input);
449 if (item instanceof NestableEvalItem && ((NestableEvalItem)item).subColSpec!=null){
450 for (SchemaItem flattenedItem: sItem.flatten()){
451 if (conflictedAliases.contains(flattenedItem.alias)){
452 if (sItem.alias!=null){
453 flattenedItem.alias = sItem.alias + "::" + flattenedItem.alias;
456 output.add(flattenedItem);
458 }else{
459 output.add(sItem);
462 return output;
/**
 * Returns a comparator that orders tuples by this list's items, taken in order.
 * <ul>
 * <li>A star item compares the whole tuples.</li>
 * <li>A column item compares that field.</li>
 * <li>A constant item compares equal.</li>
 * </ul>
 * The first non-zero result decides; if none is found the tuples compare equal.
 * Any other item type makes compare() throw a RuntimeException.
 */
465 public Comparator<Tuple> getComparator() {
466 return new Comparator<Tuple>() {
// Captures the list object referenced by 'columns' when the comparator is created.
467 private List<EvalItem> cols = columns;
469 public int compare(Tuple t1, Tuple t2) {
470 for (Iterator<EvalItem> it = cols.iterator(); it.hasNext();) {
471 EvalItem itm = it.next();
473 int comp = 0;
475 if (itm instanceof StarEvalItem) {
476 comp = t1.compareTo(t2);
477 } else if (itm instanceof ColEvalItem) {
478 int colNum = ((ColEvalItem) itm).colNum;
479 try {
480 comp = t1.getField(colNum).compareTo(t2.getField(colNum));
481 } catch (IOException e) {
// NOTE(review): only the message survives; passing e as the cause would keep the stack trace.
482 throw new RuntimeException(e.getMessage());
484 } else if (itm instanceof ConstEvalItem) {
// A constant presumably has the same value for every tuple, so it never decides the order.
485 comp = 0;
486 } else {
487 throw new RuntimeException("Internal error: unexpected EvalItem type.");
// Stop at the first item that distinguishes the two tuples.
490 if (comp != 0)
491 return comp;
494 return 0;