4 * September 1, 2007 -- Trevor Strohman
6 * BSD License (http://www.galagosearch.org/license)
9 package com
.yahoo
.pig
.impl
.galago
;
11 import com
.yahoo
.pig
.impl
.PigContext
;
12 import com
.yahoo
.pig
.impl
.eval
.groupby
.GroupBySpec
;
13 import com
.yahoo
.pig
.impl
.logicalLayer
.LOCogroup
;
14 import com
.yahoo
.pig
.impl
.logicalLayer
.LOEval
;
15 import com
.yahoo
.pig
.impl
.logicalLayer
.LOLoad
;
16 import com
.yahoo
.pig
.impl
.logicalLayer
.LORead
;
17 import com
.yahoo
.pig
.impl
.logicalLayer
.LOStore
;
18 import com
.yahoo
.pig
.impl
.logicalLayer
.LOUnion
;
19 import com
.yahoo
.pig
.impl
.logicalLayer
.LogicalOperator
;
20 import com
.yahoo
.pig
.impl
.physicalLayer
.IntermedResult
;
21 import com
.yahoo
.pig
.impl
.physicalLayer
.PhysicalOperator
;
22 import com
.yahoo
.pig
.impl
.physicalLayer
.PlanCompiler
;
23 import galago
.tupleflow
.Parameters
;
24 import galago
.tupleflow
.execution
.Job
;
25 import galago
.tupleflow
.execution
.Connection
;
26 import galago
.tupleflow
.execution
.ConnectionAssignmentType
;
27 import galago
.tupleflow
.execution
.ConnectionEndPoint
;
28 import galago
.tupleflow
.execution
.ConnectionPointType
;
29 import galago
.tupleflow
.execution
.InputStep
;
30 import galago
.tupleflow
.execution
.OutputStep
;
31 import galago
.tupleflow
.execution
.Stage
;
32 import galago
.tupleflow
.execution
.StageConnectionPoint
;
33 import galago
.tupleflow
.execution
.Step
;
34 import galago
.tupleflow
.Parameters
.Value
;
35 import galago
.tupleflow
.execution
.Connection
;
36 import java
.io
.IOException
;
37 import java
.util
.ArrayList
;
38 import java
.util
.HashMap
;
/**
 * Plan compiler that turns Pig logical operators into stages and connections
 * of a Galago tupleflow job (see compile and the add*Stage methods).
 *
 * NOTE(review): the declarations of the job and stageCounter members used
 * throughout the class are not visible in this excerpt.
 */
45 public class GalagoPlanCompiler
extends PlanCompiler
{
46 PigContext pigContext
;
// Maps an output (alias) name to the name of the stage that produces it;
// filled by storeOutputLookup and read by createConnectionInput.
49 HashMap
<String
, String
> outputToStage
= new HashMap
<String
, String
>();
// Maps a source output name to the Connection created for it, so that every
// consumer of the same source is attached to the same Connection object.
50 HashMap
<String
, Connection
> connections
= new HashMap
<String
, Connection
>();
/**
 * Creates a compiler for the given Pig context.
 *
 * NOTE(review): the statements between the signature and the hashCount check
 * are not visible in this excerpt — presumably they store pigContext and
 * create the job; confirm against the full file.
 */
52 public GalagoPlanCompiler( PigContext pigContext
) {
// Forward the "hashCount" JVM system property to the job only when it is set.
55 if( System
.getProperty( "hashCount" ) != null )
56 job
.properties
.put( "hashCount", System
.getProperty("hashCount") );
/**
 * Creates a new Stage whose name is stageType followed by the current value
 * of stageCounter.
 *
 * NOTE(review): the remainder of this method (including its return) is not
 * visible in this excerpt; callers read stageCounter after calling it, so it
 * presumably advances the counter — confirm against the full file.
 */
59 private Stage
buildStage( String stageType
) {
60 Stage s
= new Stage();
61 s
.name
= stageType
+ stageCounter
;
/**
 * Builds a StageConnectionPoint of the given type; the point is declared with
 * the class name "com.yahoo.pig.impl.galago.Tuple".
 *
 * NOTE(review): the remaining constructor arguments are not visible in this
 * excerpt, so how name is passed cannot be confirmed from here.
 */
67 private StageConnectionPoint
buildPoint( ConnectionPointType type
, String name
) {
68 return new StageConnectionPoint( type
,
70 "com.yahoo.pig.impl.galago.Tuple",
75 private StageConnectionPoint
buildOutput( String name
) {
76 return buildPoint( ConnectionPointType
.Output
, name
);
79 private StageConnectionPoint
buildInput( String name
) {
80 return buildPoint( ConnectionPointType
.Input
, name
);
83 private void storeOutputLookup( String aliasName
, String stageName
) {
84 outputToStage
.put( aliasName
, stageName
);
87 public void addLoadStage( LOLoad load
) {
88 Stage splitStage
= buildStage( "split" + load
.alias
);
89 String splitAlias
= load
.alias
+ "$split";
91 Parameters splitParameters
= new Parameters();
92 splitParameters
.add( "filename", load
.filename
);
94 Step splitGeneratorStep
= new Step( null, "com.yahoo.pig.impl.galago.MakeReadSplits", splitParameters
);
95 splitStage
.steps
.add( splitGeneratorStep
);
96 splitStage
.steps
.add( new OutputStep( null, splitAlias
) );
97 splitStage
.connections
.put( splitAlias
, buildOutput( splitAlias
) );
98 storeOutputLookup( splitAlias
, splitStage
.name
);
100 Stage loadStage
= buildStage( "load" + load
.alias
);
102 Parameters loadParameters
= new Parameters();
103 loadParameters
.add( "function", load
.lf
.toString() );
105 Step inputStep
= new InputStep( null, splitAlias
);
106 Step loadStep
= new Step( null, "com.yahoo.pig.impl.galago.ReadTuples", loadParameters
);
107 Step storeStep
= new OutputStep( null, load
.alias
);
109 loadStage
.steps
.add( inputStep
);
110 loadStage
.steps
.add(loadStep
);
111 loadStage
.steps
.add(storeStep
);
112 loadStage
.connections
.put( splitAlias
, buildInput( splitAlias
) );
113 loadStage
.connections
.put( load
.alias
, buildOutput( load
.alias
) );
115 storeOutputLookup( load
.alias
, loadStage
.name
);
116 createHashedConnection( loadStage
, splitAlias
);
119 private void addEvalStage(LOEval eval
) {
120 assert eval
.inputs
.length
== 1;
121 LogicalOperator input
= eval
.inputs
[0];
122 Stage evalStage
= buildStage( "eval" + input
.alias
);
124 Parameters evalParameters
= new Parameters();
125 evalParameters
.add( "spec", eval
.spec
.toString() );
127 Step inputStep
= new InputStep( null, input
.alias
);
128 Step evalStep
= new Step( null, "com.yahoo.pig.impl.galago.EvalTuples", evalParameters
);
129 Step outputStep
= new OutputStep( null, eval
.alias
);
131 evalStage
.steps
.add( inputStep
);
132 evalStage
.steps
.add( evalStep
);
133 evalStage
.steps
.add( outputStep
);
135 evalStage
.connections
.put( input
.alias
, buildInput( input
.alias
) );
136 evalStage
.connections
.put( eval
.alias
, buildOutput( eval
.alias
) );
138 createEachConnection( evalStage
, input
.alias
);
139 storeOutputLookup( eval
.alias
, evalStage
.name
);
140 connectNowhere( evalStage
, eval
.alias
);
143 private ConnectionEndPoint
createConnectionInput( String source
) {
144 assert outputToStage
.containsKey( source
);
145 return new ConnectionEndPoint( null, outputToStage
.get( source
), source
, ConnectionPointType
.Input
);
148 private ConnectionEndPoint
createConnectionOutput( Stage destination
, String source
, String
[] hash
) {
149 ConnectionAssignmentType type
= (hash
!= null ? ConnectionAssignmentType
.Each
: ConnectionAssignmentType
.Combined
);
150 return createConnectionOutput( destination
, source
, hash
, type
);
153 private ConnectionEndPoint
createConnectionOutput( Stage destination
, String source
, String
[] hash
, ConnectionAssignmentType type
) {
154 return new ConnectionEndPoint( null, destination
.name
, source
, type
, ConnectionPointType
.Output
);
157 private void createEachConnection( Stage destination
, String source
) {
158 createConnection( destination
, source
, new String
[0], null, ConnectionAssignmentType
.Each
);
161 private void createCombinedConnection( Stage destination
, String source
) {
162 createConnection( destination
, source
, new String
[0], null, ConnectionAssignmentType
.Combined
);
165 private void createHashedConnection( Stage destination
, String source
) {
166 createConnection( destination
, source
, new String
[0], new String
[] { "+0" }, ConnectionAssignmentType
.Each
);
169 private void createConnection( Stage destination
, String source
, String
[] order
, String
[] hash
, ConnectionAssignmentType type
) {
170 assert outputToStage
.containsKey( source
) : destination
.name
+ " " + source
;
172 if( !connections
.containsKey(source
) ) {
173 Connection c
= new Connection( null, "com.yahoo.pig.impl.galago.Tuple", order
, hash
, -1 );
174 c
.inputs
.add( createConnectionInput( source
) );
175 connections
.put( source
, c
);
178 Connection connection
= connections
.get( source
);
179 connection
.outputs
.add( createConnectionOutput( destination
, source
, hash
, type
) );
180 job
.connections
.add( connection
);
183 private void connectNowhere( Stage outputStage
, String outputPoint
) {
184 // BUGBUG: should remove this method
186 Connection connection = new Connection( null, "com.yahoo.pig.impl.galago.Tuple", new String[0], null, -1 );
187 connection.inputs.add( new ConnectionEndPoint( null, outputStage.name, outputPoint, ConnectionPointType.Input ) );
188 job.connections.add( connection );
192 private void addStoreStage(LOStore store
) {
193 Stage s
= buildStage( "store" + store
.alias
);
194 String inputName
= store
.inputs
[0].alias
;
195 Parameters parameters
= new Parameters();
197 parameters
.add( "filename", store
.filename
);
198 parameters
.add( "function", store
.sf
);
199 Step inputStep
= new InputStep( inputName
);
200 Step storeStep
= new Step( null, "com.yahoo.pig.impl.galago.StoreTuples", parameters
);
202 s
.steps
.add( inputStep
);
203 s
.steps
.add( storeStep
);
204 s
.connections
.put( inputName
, buildInput( inputName
) );
206 createCombinedConnection( s
, inputName
);
209 private void addGroupStage(LOCogroup group
) {
210 ArrayList
<String
> outputNames
= new ArrayList();
212 // Create one Map stage for each input to the COGROUP operator.
213 // A COGROUP looks like:
214 // COGROUP a by x, b by y, c by z
215 // This COGROUP ends up being 3 different map stages, one for each input,
216 // which performs the hashing, etc.
217 // Then, there's a final Reduce stage that aligns all the similar tuples
218 // and outputs the final tuple set.
220 Parameters reduceParameters
= new Parameters();
221 ArrayList
<Value
> inputParamList
= new ArrayList
<Value
>();
223 for( int i
=0; i
<group
.inputs
.length
; i
++ ) {
224 LogicalOperator input
= group
.inputs
[i
];
225 GroupBySpec spec
= group
.specs
[i
];
227 Stage inputStage
= buildStage( "input" + input
.alias
);
228 String outputName
= "map$" + stageCounter
+ "$" + input
.alias
+ "$" + group
.alias
;
229 outputNames
.add( outputName
);
231 inputStage
.connections
.put( input
.alias
, buildInput( input
.alias
) );
232 inputStage
.connections
.put( outputName
, buildOutput( outputName
) );
233 storeOutputLookup( outputName
, inputStage
.name
);
235 Parameters inputParameters
= new Parameters();
236 inputParameters
.add( "group", spec
.toString() );
238 Step inputStep
= new InputStep( null, input
.alias
);
239 Step mapStep
= new Step( null, "com.yahoo.pig.impl.galago.MapTuples", inputParameters
);
240 Step outputStep
= new OutputStep( null, outputName
);
242 inputStage
.steps
.add( inputStep
);
243 inputStage
.steps
.add( mapStep
);
244 inputStage
.steps
.add( outputStep
);
246 createEachConnection( inputStage
, input
.alias
);
248 Value root
= new Value();
249 root
.add( "name", outputName
);
250 root
.add( "group", spec
.toString() );
251 inputParamList
.add( root
);
254 Stage reduceStage
= buildStage( "reduce" + group
.alias
);
255 reduceParameters
.add( "input", inputParamList
);
256 reduceParameters
.add( "eval", "" );
258 Step reduceStep
= new Step( null, "com.yahoo.pig.impl.galago.JoinTuples", reduceParameters
);
259 Step outputStep
= new OutputStep( null, group
.alias
);
261 reduceStage
.steps
.add(reduceStep
);
262 reduceStage
.steps
.add(outputStep
);
264 // Connect the Map stages to the Reduce stage
265 for( String outputName
: outputNames
) {
266 // This is the connection spec in the reduce stage
267 reduceStage
.connections
.put( outputName
, buildInput( outputName
) );
268 // This builds the connection at the job level
269 createHashedConnection( reduceStage
, outputName
);
272 reduceStage
.connections
.put( group
.alias
, buildOutput( group
.alias
) );
274 storeOutputLookup( group
.alias
, reduceStage
.name
);
275 connectNowhere( reduceStage
, group
.alias
);
278 private void addUnionStage(LOUnion lOUnion
) {
279 throw new UnsupportedOperationException("Not yet implemented");
282 private void addReadStage( LORead read
, Map queryResults
) throws IOException
{
283 IntermedResult intermediate
= read
.readFrom
;
285 if( !intermediate
.compiled() ) {
286 compile( intermediate
.lp
.root(), queryResults
);
287 intermediate
.setCompiled( true );
291 public PhysicalOperator
compile(LogicalOperator lo
, Map queryResults
) throws IOException
{
292 // each compile stage just creates something in the job
293 // that we can reference later.
294 for( LogicalOperator input
: lo
.inputs
)
295 compile( input
, queryResults
);
297 if (lo
instanceof LOLoad
) {
298 addLoadStage( (LOLoad
)lo
);
299 } else if(lo
instanceof LORead
) {
300 LORead read
= (LORead
) lo
;
301 addReadStage( read
, queryResults
);
302 } else if(lo
instanceof LOStore
) {
303 LOStore store
= (LOStore
) lo
;
304 addStoreStage( store
);
305 } else if(lo
instanceof LOEval
) {
306 LOEval eval
= (LOEval
)lo
;
307 addEvalStage( eval
);
308 } else if(lo
instanceof LOCogroup
) {
309 addGroupStage( (LOCogroup
)lo
);
310 } else if(lo
instanceof LOUnion
) {
311 addUnionStage( (LOUnion
)lo
);
313 throw new IOException( "unknown logical operator type " + lo
.getClass().getName() );
316 return new POGalago( job
);