2 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
3 * See accompanying LICENSE file.
5 package com
.yahoo
.pig
.impl
.physicalLayer
;
7 import java
.io
.IOException
;
8 import java
.lang
.reflect
.Constructor
;
11 import com
.yahoo
.pig
.StorageFunc
;
12 import com
.yahoo
.pig
.builtin
.PigStorage
;
13 import com
.yahoo
.pig
.impl
.PigContext
;
14 import com
.yahoo
.pig
.impl
.logicalLayer
.LOCogroup
;
15 import com
.yahoo
.pig
.impl
.logicalLayer
.LOEval
;
16 import com
.yahoo
.pig
.impl
.logicalLayer
.LOLoad
;
17 import com
.yahoo
.pig
.impl
.logicalLayer
.LORead
;
18 import com
.yahoo
.pig
.impl
.logicalLayer
.LOStore
;
19 import com
.yahoo
.pig
.impl
.logicalLayer
.LOUnion
;
20 import com
.yahoo
.pig
.impl
.logicalLayer
.LogicalOperator
;
23 * LocalPlanCompliler will general an execution plan for ExecType.LOCAL jobs
24 * using the PhysicalOperator framework. It fulfills as similar role as the
25 * MapreducePlanCompiler, but is far more simple.
29 public class LocalPlanCompiler
extends PlanCompiler
{
31 protected LocalPlanCompiler(PigContext pigContext
) {
35 public PhysicalOperator
compile(LogicalOperator lo
, Map queryResults
) throws IOException
{
36 PhysicalOperator po
= compileOperator(lo
, queryResults
);
37 for (int i
= 0; i
< lo
.inputs
.length
; i
++) {
38 po
.inputs
[i
] = compile(lo
.inputs
[i
], queryResults
);
43 protected PhysicalOperator
compileOperator(LogicalOperator lo
, Map queryResults
) throws IOException
{
45 if (lo
instanceof LOEval
) {
46 return new POEval(((LOEval
) lo
).spec
,lo
.getOutputType());
47 } else if (lo
instanceof LOCogroup
) {
48 return new POCogroup(((LOCogroup
) lo
).specs
,lo
.getOutputType());
49 } else if (lo
instanceof LOLoad
) {
50 LOLoad lol
= (LOLoad
) lo
;
51 StorageFunc lf
= null;
52 // We compile the passed parameters in to a proper load function
53 // for local execution.
54 // ==============================
55 String loaderName
= lol
.lf
.getClass().getName();
57 Class loaderClass
= Class
.forName(loaderName
);
58 if (lol
.args
!= null && lol
.args
.length
> 0) {
59 String cleaned
[] = new String
[lol
.args
.length
];
60 Class paramTypes
[] = new Class
[lol
.args
.length
];
61 for (int i
= 0; i
< paramTypes
.length
; i
++) {
62 paramTypes
[i
] = String
.class;
63 cleaned
[i
] = lol
.args
[i
].replaceAll("\'", "");
65 Constructor c
= loaderClass
.getConstructor(paramTypes
);
66 lf
= (StorageFunc
) c
.newInstance((Object
[]) cleaned
);
68 lf
= (StorageFunc
) loaderClass
.newInstance();
70 } catch (Exception e
) {
72 lf
= new PigStorage();
74 return new POLoad(((LOLoad
) lo
).filename
, lf
,lo
.getOutputType());
75 } else if (lo
instanceof LORead
) {
76 IntermedResult readFrom
= ((LORead
) lo
).readFrom
;
78 if (readFrom
.executed()) {
79 // reading from a materialized databag; use PORead
80 return new PORead(readFrom
.read(),readFrom
.getOutputType());
82 if (readFrom
.compiled()) {
83 // other plan already compiled, so split its output
84 return new POSplitSlave(readFrom
.pp
.root
,readFrom
.getOutputType());
86 // other plan not compiled yet, so compile it and use it directly
87 readFrom
.compile(queryResults
);
88 return readFrom
.pp
.root
;
91 } else if (lo
instanceof LOStore
) {
92 LOStore los
= (LOStore
) lo
;
93 return new POStore(los
.filename
, los
.sf
,los
.append
);
94 } else if (lo
instanceof LOUnion
) {
95 return new POUnion(((LOUnion
) lo
).inputs
.length
,lo
.getOutputType());
97 throw new IOException("Unknown logical operator.");