4 * September 3, 2007 -- Trevor Strohman
6 * BSD License (http://www.galagosearch.org/license)
9 package com
.yahoo
.pig
.impl
.galago
;
11 import com
.yahoo
.pig
.PigServer
.ExecType
;
12 import com
.yahoo
.pig
.data
.DataCollector
;
13 import com
.yahoo
.pig
.data
.Datum
;
14 import com
.yahoo
.pig
.impl
.PigContext
;
15 import com
.yahoo
.pig
.impl
.eval
.EvalSpecPipe
;
16 import com
.yahoo
.pig
.impl
.eval
.groupby
.GroupBySpec
;
17 import galago
.tupleflow
.InputClass
;
18 import galago
.tupleflow
.OutputClass
;
19 import galago
.tupleflow
.StandardStep
;
20 import galago
.tupleflow
.TupleFlowParameters
;
21 import galago
.tupleflow
.execution
.Verified
;
22 import java
.io
.IOException
;
29 @InputClass(className
="com.yahoo.pig.impl.galago.Tuple")
30 @OutputClass(className
="com.yahoo.pig.impl.galago.Tuple")
32 public class MapTuples
extends StandardStep
<Tuple
, Tuple
> {
34 EvalSpecPipe evalPipeSpec
;
35 DataCollector collector
;
36 PigContext context
= new PigContext( ExecType
.GALAGO
);
38 private class MapCollector
extends DataCollector
{
39 public void add(com
.yahoo
.pig
.data
.Tuple t
) throws IOException
{
43 Datum
[] multGroups
= group
.eval(t
);
45 for( int i
=0; i
<multGroups
.length
; i
++ ) {
46 Datum key
= multGroups
[i
];
47 Tuple result
= new Tuple();
49 result
.appendField(key
);
50 result
.appendTuple(new Tuple(t
));
51 processor
.process(result
);
56 public MapTuples( TupleFlowParameters parameters
) throws IOException
{
57 group
= new GroupBySpec( context
, parameters
.getXML().get("group") );
58 collector
= new MapCollector();
61 public void process(Tuple object
) throws IOException
{
62 collector
.add(object
);
65 public void close() throws IOException
{