Initial import into git.
[galago.git] / java / pig-galago / src / com / yahoo / pig / impl / galago / MapTuples.java
blob702e299837a7a8970aadedc4ec8607618692f20e
1 /*
2 * MapTuples
4 * September 3, 2007 -- Trevor Strohman
6 * BSD License (http://www.galagosearch.org/license)
7 */
9 package com.yahoo.pig.impl.galago;
11 import com.yahoo.pig.PigServer.ExecType;
12 import com.yahoo.pig.data.DataCollector;
13 import com.yahoo.pig.data.Datum;
14 import com.yahoo.pig.impl.PigContext;
15 import com.yahoo.pig.impl.eval.EvalSpecPipe;
16 import com.yahoo.pig.impl.eval.groupby.GroupBySpec;
17 import galago.tupleflow.InputClass;
18 import galago.tupleflow.OutputClass;
19 import galago.tupleflow.StandardStep;
20 import galago.tupleflow.TupleFlowParameters;
21 import galago.tupleflow.execution.Verified;
22 import java.io.IOException;
24 /**
26 * @author trevor
29 @InputClass(className="com.yahoo.pig.impl.galago.Tuple")
30 @OutputClass(className="com.yahoo.pig.impl.galago.Tuple")
31 @Verified
32 public class MapTuples extends StandardStep<Tuple, Tuple> {
33 GroupBySpec group;
34 EvalSpecPipe evalPipeSpec;
35 DataCollector collector;
36 PigContext context = new PigContext( ExecType.GALAGO );
38 private class MapCollector extends DataCollector {
39 public void add(com.yahoo.pig.data.Tuple t) throws IOException {
40 if( t == null )
41 return;
43 Datum[] multGroups = group.eval(t);
45 for( int i=0; i<multGroups.length; i++ ) {
46 Datum key = multGroups[i];
47 Tuple result = new Tuple();
49 result.appendField(key);
50 result.appendTuple(new Tuple(t));
51 processor.process(result);
56 public MapTuples( TupleFlowParameters parameters ) throws IOException {
57 group = new GroupBySpec( context, parameters.getXML().get("group") );
58 collector = new MapCollector();
61 public void process(Tuple object) throws IOException {
62 collector.add(object);
65 public void close() throws IOException {
66 collector.add(null);
67 super.close();