Initial import into git.
[galago.git] / java / pig-galago / src / com / yahoo / pig / impl / eval / NestableEvalItem.java
blob63d39b0b805ac56bbccf65035436110040c7046c
1 /*
2 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
3 * See accompanying LICENSE file.
4 */
5 package com.yahoo.pig.impl.eval;
7 import java.io.IOException;
8 import java.util.HashMap;
9 import java.util.Iterator;
10 import java.util.Map;
12 import com.yahoo.pig.BagEvalFunc;
13 import com.yahoo.pig.builtin.ShellBagEvalFunc;
14 import com.yahoo.pig.data.BagFactory;
15 import com.yahoo.pig.data.DataAtom;
16 import com.yahoo.pig.data.DataBag;
17 import com.yahoo.pig.data.DataCollector;
18 import com.yahoo.pig.data.Datum;
19 import com.yahoo.pig.data.Tuple;
20 import com.yahoo.pig.impl.PigContext;
21 import com.yahoo.pig.impl.eval.func.FuncEvalItem;
22 import com.yahoo.pig.impl.logicalLayer.schema.SchemaItem;
24 public abstract class NestableEvalItem extends EvalItem {
25 public EvalSpecPipe nestedEval = null;
26 public EvalItemList subColSpec = null;
27 Map<Datum, DataCollector> persistentCollectors;
29 public NestableEvalItem(PigContext pigContext) {
30 super(pigContext);
31 persistentCollectors = new HashMap<Datum, DataCollector>();
34 public void addNestedEvalSpec(EvalSpec spec) {
35 if (nestedEval == null) nestedEval = new EvalSpecPipe(pigContext);
37 nestedEval.add(spec);
40 public boolean isSimple() {
41 if (this instanceof FuncEvalItem && ((FuncEvalItem)this).func instanceof BagEvalFunc)
42 return false;
43 return (subColSpec == null);
46 protected void atomEval(DataAtom d, DataCollector output) throws IOException {
47 if (subColSpec != null || nestedEval != null) throw new IOException("Illegal nested evaluation on DataAtom.");
49 output.add(new Tuple(d));
52 protected void tupleEval(Tuple d, DataCollector output) throws IOException {
53 if (subColSpec != null) output = subColSpec.collector(output);
54 else output = tupleifyCollector(output);
56 if (nestedEval != null) output = nestedEval.noSADCollector(output);
58 output.add(d);
61 protected DataCollector setupBagProcessingPipeline(Datum amendKey, DataCollector output) {
63 if (amendKey != null && persistentCollectors.containsKey(amendKey)) {
64 // reuse existing processing pipeline
65 output = persistentCollectors.get(amendKey);
66 } else {
67 // set up new processing pipeline
68 if (subColSpec != null) output = subColSpec.collector(output);
69 else output = bagifyCollector(output);
71 if (nestedEval != null) output = nestedEval.collector(output);
74 // if this is an amendable stream, remember this processing pipeline
75 if (amendKey != null) persistentCollectors.put(amendKey, output);
78 return output;
81 protected void bagEval(DataBag d, Datum amendKey, DataCollector output) throws IOException {
83 output = setupBagProcessingPipeline(amendKey, output);
85 // insert tuples into processing pipeline
86 for (Iterator<Tuple> it = ((DataBag) d).content(); it.hasNext(); ) {
87 output.add(it.next());
89 output.add(null); // EOF marker
92 // wrap each input in a tuple
93 protected DataCollector tupleifyCollector(final DataCollector output) {
94 return new DataCollector(output) {
96 public void add(Tuple t) throws IOException {
97 if (t != null) output.add(new Tuple(t));
98 else output.add(null); // propagate EOF
104 // stuff inputs into a bag, and wrap bag in a tuple
105 protected DataCollector bagifyCollector(final DataCollector output) {
106 return new DataCollector(output) {
107 DataBag bag = null;
109 public void add(Tuple t) throws IOException {
110 if (bag == null) bag = BagFactory.getInstance().getNewBag();
112 if (t != null) {
113 bag.add(t);
114 } else {
115 output.add(new Tuple(bag));
116 output.add(null); // EOF
124 protected abstract SchemaItem mapInputSchemaInitial(SchemaItem input);
126 @Override
127 public SchemaItem mapInputSchema(SchemaItem input){
128 if (schema!=null)
129 return schema;
130 SchemaItem output = mapInputSchemaInitial(input);
131 if (nestedEval != null)
132 output = nestedEval.mapInputSchema(output);
134 return output;