Initial import into git.
[galago.git] / java / pig-galago / src / com / yahoo / pig / impl / physicalLayer / LocalPlanCompiler.java
blob97a1e565bbf2714cd0c869a5495fc458a5d65b9e
1 /*
2 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
3 * See accompanying LICENSE file.
4 */
5 package com.yahoo.pig.impl.physicalLayer;
7 import java.io.IOException;
8 import java.lang.reflect.Constructor;
9 import java.util.Map;
11 import com.yahoo.pig.StorageFunc;
12 import com.yahoo.pig.builtin.PigStorage;
13 import com.yahoo.pig.impl.PigContext;
14 import com.yahoo.pig.impl.logicalLayer.LOCogroup;
15 import com.yahoo.pig.impl.logicalLayer.LOEval;
16 import com.yahoo.pig.impl.logicalLayer.LOLoad;
17 import com.yahoo.pig.impl.logicalLayer.LORead;
18 import com.yahoo.pig.impl.logicalLayer.LOStore;
19 import com.yahoo.pig.impl.logicalLayer.LOUnion;
20 import com.yahoo.pig.impl.logicalLayer.LogicalOperator;
22 /**
23 * LocalPlanCompliler will general an execution plan for ExecType.LOCAL jobs
24 * using the PhysicalOperator framework. It fulfills as similar role as the
25 * MapreducePlanCompiler, but is far more simple.
26 * @author dnm
29 public class LocalPlanCompiler extends PlanCompiler {
31 protected LocalPlanCompiler(PigContext pigContext) {
32 super(pigContext);
34 @Override
35 public PhysicalOperator compile(LogicalOperator lo, Map queryResults) throws IOException {
36 PhysicalOperator po = compileOperator(lo, queryResults);
37 for (int i = 0; i < lo.inputs.length; i++) {
38 po.inputs[i] = compile(lo.inputs[i], queryResults);
40 return po;
43 protected PhysicalOperator compileOperator(LogicalOperator lo, Map queryResults) throws IOException {
45 if (lo instanceof LOEval) {
46 return new POEval(((LOEval) lo).spec,lo.getOutputType());
47 } else if (lo instanceof LOCogroup) {
48 return new POCogroup(((LOCogroup) lo).specs,lo.getOutputType());
49 } else if (lo instanceof LOLoad) {
50 LOLoad lol = (LOLoad) lo;
51 StorageFunc lf = null;
52 // We compile the passed parameters in to a proper load function
53 // for local execution.
54 // ==============================
55 String loaderName = lol.lf.getClass().getName();
56 try {
57 Class loaderClass = Class.forName(loaderName);
58 if (lol.args != null && lol.args.length > 0) {
59 String cleaned[] = new String[lol.args.length];
60 Class paramTypes[] = new Class[lol.args.length];
61 for (int i = 0; i < paramTypes.length; i++) {
62 paramTypes[i] = String.class;
63 cleaned[i] = lol.args[i].replaceAll("\'", "");
65 Constructor c = loaderClass.getConstructor(paramTypes);
66 lf = (StorageFunc) c.newInstance((Object[]) cleaned);
67 } else {
68 lf = (StorageFunc) loaderClass.newInstance();
70 } catch (Exception e) {
71 e.printStackTrace();
72 lf = new PigStorage();
74 return new POLoad(((LOLoad) lo).filename, lf,lo.getOutputType());
75 } else if (lo instanceof LORead) {
76 IntermedResult readFrom = ((LORead) lo).readFrom;
78 if (readFrom.executed()) {
79 // reading from a materialized databag; use PORead
80 return new PORead(readFrom.read(),readFrom.getOutputType());
81 } else {
82 if (readFrom.compiled()) {
83 // other plan already compiled, so split its output
84 return new POSplitSlave(readFrom.pp.root,readFrom.getOutputType());
85 } else {
86 // other plan not compiled yet, so compile it and use it directly
87 readFrom.compile(queryResults);
88 return readFrom.pp.root;
91 } else if (lo instanceof LOStore) {
92 LOStore los = (LOStore) lo;
93 return new POStore(los.filename, los.sf,los.append);
94 } else if (lo instanceof LOUnion) {
95 return new POUnion(((LOUnion) lo).inputs.length,lo.getOutputType());
96 } else {
97 throw new IOException("Unknown logical operator.");