2 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
3 * See accompanying LICENSE file.
5 package com
.yahoo
.pig
.impl
.eval
;
7 import java
.io
.IOException
;
8 import java
.util
.HashMap
;
9 import java
.util
.Iterator
;
12 import com
.yahoo
.pig
.BagEvalFunc
;
13 import com
.yahoo
.pig
.builtin
.ShellBagEvalFunc
;
14 import com
.yahoo
.pig
.data
.BagFactory
;
15 import com
.yahoo
.pig
.data
.DataAtom
;
16 import com
.yahoo
.pig
.data
.DataBag
;
17 import com
.yahoo
.pig
.data
.DataCollector
;
18 import com
.yahoo
.pig
.data
.Datum
;
19 import com
.yahoo
.pig
.data
.Tuple
;
20 import com
.yahoo
.pig
.impl
.PigContext
;
21 import com
.yahoo
.pig
.impl
.eval
.func
.FuncEvalItem
;
22 import com
.yahoo
.pig
.impl
.logicalLayer
.schema
.SchemaItem
;
24 public abstract class NestableEvalItem
extends EvalItem
{
25 public EvalSpecPipe nestedEval
= null;
26 public EvalItemList subColSpec
= null;
27 Map
<Datum
, DataCollector
> persistentCollectors
;
29 public NestableEvalItem(PigContext pigContext
) {
31 persistentCollectors
= new HashMap
<Datum
, DataCollector
>();
34 public void addNestedEvalSpec(EvalSpec spec
) {
35 if (nestedEval
== null) nestedEval
= new EvalSpecPipe(pigContext
);
40 public boolean isSimple() {
41 if (this instanceof FuncEvalItem
&& ((FuncEvalItem
)this).func
instanceof BagEvalFunc
)
43 return (subColSpec
== null);
46 protected void atomEval(DataAtom d
, DataCollector output
) throws IOException
{
47 if (subColSpec
!= null || nestedEval
!= null) throw new IOException("Illegal nested evaluation on DataAtom.");
49 output
.add(new Tuple(d
));
52 protected void tupleEval(Tuple d
, DataCollector output
) throws IOException
{
53 if (subColSpec
!= null) output
= subColSpec
.collector(output
);
54 else output
= tupleifyCollector(output
);
56 if (nestedEval
!= null) output
= nestedEval
.noSADCollector(output
);
61 protected DataCollector
setupBagProcessingPipeline(Datum amendKey
, DataCollector output
) {
63 if (amendKey
!= null && persistentCollectors
.containsKey(amendKey
)) {
64 // reuse existing processing pipeline
65 output
= persistentCollectors
.get(amendKey
);
67 // set up new processing pipeline
68 if (subColSpec
!= null) output
= subColSpec
.collector(output
);
69 else output
= bagifyCollector(output
);
71 if (nestedEval
!= null) output
= nestedEval
.collector(output
);
74 // if this is an amendable stream, remember this processing pipeline
75 if (amendKey
!= null) persistentCollectors
.put(amendKey
, output
);
81 protected void bagEval(DataBag d
, Datum amendKey
, DataCollector output
) throws IOException
{
83 output
= setupBagProcessingPipeline(amendKey
, output
);
85 // insert tuples into processing pipeline
86 for (Iterator
<Tuple
> it
= ((DataBag
) d
).content(); it
.hasNext(); ) {
87 output
.add(it
.next());
89 output
.add(null); // EOF marker
92 // wrap each input in a tuple
93 protected DataCollector
tupleifyCollector(final DataCollector output
) {
94 return new DataCollector(output
) {
96 public void add(Tuple t
) throws IOException
{
97 if (t
!= null) output
.add(new Tuple(t
));
98 else output
.add(null); // propagate EOF
104 // stuff inputs into a bag, and wrap bag in a tuple
105 protected DataCollector
bagifyCollector(final DataCollector output
) {
106 return new DataCollector(output
) {
109 public void add(Tuple t
) throws IOException
{
110 if (bag
== null) bag
= BagFactory
.getInstance().getNewBag();
115 output
.add(new Tuple(bag
));
116 output
.add(null); // EOF
/**
 * Subclass hook that maps the given input schema to the schema produced by
 * this item itself, without accounting for any nested evaluation spec.
 * mapInputSchema(input) passes its argument to this method and, when
 * nestedEval is non-null, feeds the returned schema through
 * nestedEval.mapInputSchema(...) to obtain the final output schema.
 *
 * @param input the input schema handed in by mapInputSchema
 * @return this item's output schema before nested evaluation is applied
 */
124 protected abstract SchemaItem
mapInputSchemaInitial(SchemaItem input
);
127 public SchemaItem
mapInputSchema(SchemaItem input
){
130 SchemaItem output
= mapInputSchemaInitial(input
);
131 if (nestedEval
!= null)
132 output
= nestedEval
.mapInputSchema(output
);