Bringing tree up to date.
[galago.git] / java / pig-galago / src / com / yahoo / pig / impl / galago / GalagoPlanCompiler.java
blobcfdaafcc666bdf040d7423bbd334f434b2bba865
1 /*
2 * GalagoPlanCompiler
4 * September 1, 2007 -- Trevor Strohman
6 * BSD License (http://www.galagosearch.org/license)
7 */
9 package com.yahoo.pig.impl.galago;
11 import com.yahoo.pig.impl.PigContext;
12 import com.yahoo.pig.impl.eval.groupby.GroupBySpec;
13 import com.yahoo.pig.impl.logicalLayer.LOCogroup;
14 import com.yahoo.pig.impl.logicalLayer.LOEval;
15 import com.yahoo.pig.impl.logicalLayer.LOLoad;
16 import com.yahoo.pig.impl.logicalLayer.LORead;
17 import com.yahoo.pig.impl.logicalLayer.LOStore;
18 import com.yahoo.pig.impl.logicalLayer.LOUnion;
19 import com.yahoo.pig.impl.logicalLayer.LogicalOperator;
20 import com.yahoo.pig.impl.physicalLayer.IntermedResult;
21 import com.yahoo.pig.impl.physicalLayer.PhysicalOperator;
22 import com.yahoo.pig.impl.physicalLayer.PlanCompiler;
23 import galago.tupleflow.Parameters;
24 import galago.tupleflow.execution.Job;
25 import galago.tupleflow.execution.Connection;
26 import galago.tupleflow.execution.ConnectionAssignmentType;
27 import galago.tupleflow.execution.ConnectionEndPoint;
28 import galago.tupleflow.execution.ConnectionPointType;
29 import galago.tupleflow.execution.InputStep;
30 import galago.tupleflow.execution.OutputStep;
31 import galago.tupleflow.execution.Stage;
32 import galago.tupleflow.execution.StageConnectionPoint;
33 import galago.tupleflow.execution.Step;
34 import galago.tupleflow.Parameters.Value;
35 import galago.tupleflow.execution.Connection;
36 import java.io.IOException;
37 import java.util.ArrayList;
38 import java.util.HashMap;
39 import java.util.Map;
41 /**
43 * @author trevor
45 public class GalagoPlanCompiler extends PlanCompiler {
46 PigContext pigContext;
47 int stageCounter = 0;
48 Job job = new Job();
49 HashMap<String, String> outputToStage = new HashMap<String, String>();
50 HashMap<String, Connection> connections = new HashMap<String, Connection>();
52 public GalagoPlanCompiler( PigContext pigContext ) {
53 super(pigContext);
55 if( System.getProperty( "hashCount" ) != null )
56 job.properties.put( "hashCount", System.getProperty("hashCount") );
59 private Stage buildStage( String stageType ) {
60 Stage s = new Stage();
61 s.name = stageType + stageCounter;
62 stageCounter++;
63 job.add(s);
64 return s;
67 private StageConnectionPoint buildPoint( ConnectionPointType type, String name ) {
68 return new StageConnectionPoint( type,
69 name,
70 "com.yahoo.pig.impl.galago.Tuple",
71 new String[0],
72 null );
75 private StageConnectionPoint buildOutput( String name ) {
76 return buildPoint( ConnectionPointType.Output, name );
79 private StageConnectionPoint buildInput( String name ) {
80 return buildPoint( ConnectionPointType.Input, name );
83 private void storeOutputLookup( String aliasName, String stageName ) {
84 outputToStage.put( aliasName, stageName );
87 public void addLoadStage( LOLoad load ) {
88 Stage splitStage = buildStage( "split" + load.alias );
89 String splitAlias = load.alias + "$split";
91 Parameters splitParameters = new Parameters();
92 splitParameters.add( "filename", load.filename );
94 Step splitGeneratorStep = new Step( null, "com.yahoo.pig.impl.galago.MakeReadSplits", splitParameters );
95 splitStage.steps.add( splitGeneratorStep );
96 splitStage.steps.add( new OutputStep( null, splitAlias ) );
97 splitStage.connections.put( splitAlias, buildOutput( splitAlias ) );
98 storeOutputLookup( splitAlias, splitStage.name );
100 Stage loadStage = buildStage( "load" + load.alias );
102 Parameters loadParameters = new Parameters();
103 loadParameters.add( "function", load.lf.toString() );
105 Step inputStep = new InputStep( null, splitAlias );
106 Step loadStep = new Step( null, "com.yahoo.pig.impl.galago.ReadTuples", loadParameters );
107 Step storeStep = new OutputStep( null, load.alias );
109 loadStage.steps.add( inputStep );
110 loadStage.steps.add(loadStep);
111 loadStage.steps.add(storeStep);
112 loadStage.connections.put( splitAlias, buildInput( splitAlias ) );
113 loadStage.connections.put( load.alias, buildOutput( load.alias ) );
115 storeOutputLookup( load.alias, loadStage.name );
116 createHashedConnection( loadStage, splitAlias );
119 private void addEvalStage(LOEval eval) {
120 assert eval.inputs.length == 1;
121 LogicalOperator input = eval.inputs[0];
122 Stage evalStage = buildStage( "eval" + input.alias );
124 Parameters evalParameters = new Parameters();
125 evalParameters.add( "spec", eval.spec.toString() );
127 Step inputStep = new InputStep( null, input.alias );
128 Step evalStep = new Step( null, "com.yahoo.pig.impl.galago.EvalTuples", evalParameters );
129 Step outputStep = new OutputStep( null, eval.alias );
131 evalStage.steps.add( inputStep );
132 evalStage.steps.add( evalStep );
133 evalStage.steps.add( outputStep );
135 evalStage.connections.put( input.alias, buildInput( input.alias ) );
136 evalStage.connections.put( eval.alias, buildOutput( eval.alias ) );
138 createEachConnection( evalStage, input.alias );
139 storeOutputLookup( eval.alias, evalStage.name );
140 connectNowhere( evalStage, eval.alias );
143 private ConnectionEndPoint createConnectionInput( String source ) {
144 assert outputToStage.containsKey( source );
145 return new ConnectionEndPoint( null, outputToStage.get( source ), source, ConnectionPointType.Input );
148 private ConnectionEndPoint createConnectionOutput( Stage destination, String source, String[] hash ) {
149 ConnectionAssignmentType type = (hash != null ? ConnectionAssignmentType.Each : ConnectionAssignmentType.Combined);
150 return createConnectionOutput( destination, source, hash, type );
153 private ConnectionEndPoint createConnectionOutput( Stage destination, String source, String[] hash, ConnectionAssignmentType type ) {
154 return new ConnectionEndPoint( null, destination.name, source, type, ConnectionPointType.Output );
157 private void createEachConnection( Stage destination, String source ) {
158 createConnection( destination, source, new String[0], null, ConnectionAssignmentType.Each );
161 private void createCombinedConnection( Stage destination, String source ) {
162 createConnection( destination, source, new String[0], null, ConnectionAssignmentType.Combined );
165 private void createHashedConnection( Stage destination, String source ) {
166 createConnection( destination, source, new String[0], new String[] { "+0" }, ConnectionAssignmentType.Each );
169 private void createConnection( Stage destination, String source, String[] order, String[] hash, ConnectionAssignmentType type ) {
170 assert outputToStage.containsKey( source ) : destination.name + " " + source;
172 if( !connections.containsKey(source) ) {
173 Connection c = new Connection( null, "com.yahoo.pig.impl.galago.Tuple", order, hash, -1 );
174 c.inputs.add( createConnectionInput( source ) );
175 connections.put( source, c );
178 Connection connection = connections.get( source );
179 connection.outputs.add( createConnectionOutput( destination, source, hash, type ) );
180 job.connections.add( connection );
183 private void connectNowhere( Stage outputStage, String outputPoint ) {
184 // BUGBUG: should remove this method
186 Connection connection = new Connection( null, "com.yahoo.pig.impl.galago.Tuple", new String[0], null, -1 );
187 connection.inputs.add( new ConnectionEndPoint( null, outputStage.name, outputPoint, ConnectionPointType.Input ) );
188 job.connections.add( connection );
192 private void addStoreStage(LOStore store) {
193 Stage s = buildStage( "store" + store.alias );
194 String inputName = store.inputs[0].alias;
195 Parameters parameters = new Parameters();
197 parameters.add( "filename", store.filename );
198 parameters.add( "function", store.sf );
199 Step inputStep = new InputStep( inputName );
200 Step storeStep = new Step( null, "com.yahoo.pig.impl.galago.StoreTuples", parameters );
202 s.steps.add( inputStep );
203 s.steps.add( storeStep );
204 s.connections.put( inputName, buildInput( inputName ) );
206 createCombinedConnection( s, inputName );
209 private void addGroupStage(LOCogroup group) {
210 ArrayList<String> outputNames = new ArrayList();
212 // Create one Map stage for each input to the COGROUP operator.
213 // A COGROUP looks like:
214 // COGROUP a by x, b by y, c by z
215 // This COGROUP ends up being 3 different map stages, one for each input,
216 // which performs the hashing, etc.
217 // Then, there's a final Reduce stage that aligns all the similar tuples
218 // and outputs the final tuple set.
220 Parameters reduceParameters = new Parameters();
221 ArrayList<Value> inputParamList = new ArrayList<Value>();
223 for( int i=0; i<group.inputs.length; i++ ) {
224 LogicalOperator input = group.inputs[i];
225 GroupBySpec spec = group.specs[i];
227 Stage inputStage = buildStage( "input" + input.alias );
228 String outputName = "map$" + stageCounter + "$" + input.alias + "$" + group.alias;
229 outputNames.add( outputName );
231 inputStage.connections.put( input.alias, buildInput( input.alias ) );
232 inputStage.connections.put( outputName, buildOutput( outputName ) );
233 storeOutputLookup( outputName, inputStage.name );
235 Parameters inputParameters = new Parameters();
236 inputParameters.add( "group", spec.toString() );
238 Step inputStep = new InputStep( null, input.alias );
239 Step mapStep = new Step( null, "com.yahoo.pig.impl.galago.MapTuples", inputParameters );
240 Step outputStep = new OutputStep( null, outputName );
242 inputStage.steps.add( inputStep );
243 inputStage.steps.add( mapStep );
244 inputStage.steps.add( outputStep );
246 createEachConnection( inputStage, input.alias );
248 Value root = new Value();
249 root.add( "name", outputName );
250 root.add( "group", spec.toString() );
251 inputParamList.add( root );
254 Stage reduceStage = buildStage( "reduce" + group.alias );
255 reduceParameters.add( "input", inputParamList );
256 reduceParameters.add( "eval", "" );
258 Step reduceStep = new Step( null, "com.yahoo.pig.impl.galago.JoinTuples", reduceParameters );
259 Step outputStep = new OutputStep( null, group.alias );
261 reduceStage.steps.add(reduceStep);
262 reduceStage.steps.add(outputStep);
264 // Connect the Map stages to the Reduce stage
265 for( String outputName : outputNames ) {
266 // This is the connection spec in the reduce stage
267 reduceStage.connections.put( outputName, buildInput( outputName ) );
268 // This builds the connection at the job level
269 createHashedConnection( reduceStage, outputName );
272 reduceStage.connections.put( group.alias, buildOutput( group.alias ) );
274 storeOutputLookup( group.alias, reduceStage.name );
275 connectNowhere( reduceStage, group.alias );
278 private void addUnionStage(LOUnion lOUnion) {
279 throw new UnsupportedOperationException("Not yet implemented");
282 private void addReadStage( LORead read, Map queryResults ) throws IOException {
283 IntermedResult intermediate = read.readFrom;
285 if( !intermediate.compiled() ) {
286 compile( intermediate.lp.root(), queryResults );
287 intermediate.setCompiled( true );
291 public PhysicalOperator compile(LogicalOperator lo, Map queryResults) throws IOException {
292 // each compile stage just creates something in the job
293 // that we can reference later.
294 for( LogicalOperator input : lo.inputs )
295 compile( input, queryResults );
297 if (lo instanceof LOLoad) {
298 addLoadStage( (LOLoad)lo );
299 } else if(lo instanceof LORead) {
300 LORead read = (LORead) lo;
301 addReadStage( read, queryResults );
302 } else if(lo instanceof LOStore) {
303 LOStore store = (LOStore) lo;
304 addStoreStage( store );
305 } else if(lo instanceof LOEval) {
306 LOEval eval = (LOEval)lo;
307 addEvalStage( eval );
308 } else if(lo instanceof LOCogroup) {
309 addGroupStage( (LOCogroup)lo );
310 } else if(lo instanceof LOUnion) {
311 addUnionStage( (LOUnion)lo );
312 } else {
313 throw new IOException( "unknown logical operator type " + lo.getClass().getName() );
316 return new POGalago( job );