Initial import into git.
[galago.git] / java / pig-galago / src / com / yahoo / pig / impl / physicalLayer / POEval.java
blobc1044aa8009a8ad77c1b95f4d8bc2c5888f714ba
1 /*
2 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
3 * See accompanying LICENSE file.
4 */
5 package com.yahoo.pig.impl.physicalLayer;
7 import java.io.IOException;
9 import com.yahoo.pig.data.*;
10 import com.yahoo.pig.impl.eval.*;
11 import com.yahoo.pig.impl.util.DataBuffer;
13 // unary, non-blocking operator.
14 class POEval extends PhysicalOperator {
15 EvalSpecPipe spec;
16 private boolean alreadyOpen = false;
17 private DataBuffer buf = null;
18 private DataCollector evalPipeline = null;
19 private boolean inputDrained;
/**
 * Creates an eval operator that reads from a single upstream operator.
 *
 * @param input      operator stored as the sole entry of {@code inputs}
 * @param specIn     eval specification kept in {@code spec}
 * @param outputType output type forwarded to the superclass constructor
 */
public POEval(PhysicalOperator input, EvalSpecPipe specIn, int outputType) {
    super(outputType);
    // Unary operator: exactly one input slot.
    inputs = new PhysicalOperator[] { input };
    spec = specIn;
}
/**
 * Creates an eval operator whose single input slot is left {@code null}.
 *
 * @param specIn     eval specification kept in {@code spec}
 * @param outputType output type forwarded to the superclass constructor
 */
public POEval(EvalSpecPipe specIn, int outputType) {
    // Same initialization as the three-argument constructor, with no input attached.
    this(null, specIn, outputType);
}
37 public boolean open(boolean continueFromLast) throws IOException {
38 if (!super.open(continueFromLast)) return false;
40 if (buf==null)
41 buf = new DataBuffer();
42 if (evalPipeline == null)
43 evalPipeline = spec.collector(buf);
45 inputDrained = false;
47 return true;
50 public Tuple getNext() throws IOException {
51 while (true) {
52 if (buf.isEmpty()){
53 // no pending outputs, so look to input to provide more food
55 if (inputDrained){
56 return null;
59 Tuple nextTuple = inputs[0].getNext();
61 if (nextTuple == null)
62 inputDrained = true;
64 evalPipeline.add(nextTuple); // if nextTuple is null, then we're inserting an EOF marker
65 }else{
66 Tuple t = buf.removeFirst();
67 return t;