Initial import into git.
[galago.git] / java / pig-galago / src / com / yahoo / pig / impl / galago / POGalago.java
blobca34ad59f305f875ad1f59929acfde27ebc44bb4
1 /*
2 * POGalago
4 * September 1, 2007 -- Trevor Strohman
6 * BSD License (http://www.galagosearch.org/license)
7 */
9 package com.yahoo.pig.impl.galago;
11 import com.yahoo.pig.impl.physicalLayer.PhysicalOperator;
12 import com.yahoo.pig.data.Tuple;
13 import galago.tupleflow.Order;
14 import galago.tupleflow.TypeReader;
15 import galago.tupleflow.execution.ErrorStore;
16 import galago.tupleflow.execution.Job;
17 import galago.tupleflow.execution.JobExecutor;
18 import galago.tupleflow.execution.LocalStageExecutor;
19 import galago.tupleflow.execution.StageExecutor;
20 import galago.tupleflow.execution.StageExecutorFactory;
21 import java.io.IOException;
23 /**
25 * @author trevor
27 public class POGalago extends PhysicalOperator {
28 Job job;
29 JobExecutor context;
30 String[] orderSpec;
31 String dataSourceName;
32 TypeReader<com.yahoo.pig.impl.galago.Tuple> tupleReader;
34 String outputFilename;
35 String outputFunction;
37 /** Creates a new instance of POGalago */
38 public POGalago( Job job, String dataSourceName, String[] orderSpec ) {
39 this.job = job;
40 this.dataSourceName = dataSourceName;
41 this.orderSpec = orderSpec;
44 public POGalago( Job job ) {
45 this.job = job;
48 public Tuple getNext() throws IOException {
49 if( tupleReader != null )
50 return tupleReader.read();
51 return null;
54 @Override
55 public boolean open(boolean continueFromLast) throws IOException {
56 if( !super.open(continueFromLast) )
57 return false;
59 String temporaryStorage = System.getProperty( "gwd" );
61 if( temporaryStorage == null )
62 temporaryStorage = "/tmp/pig-galago";
64 String executorType = System.getProperty( "executorType" );
66 ErrorStore errorStore = new ErrorStore();
67 StageExecutor executor = StageExecutorFactory.newInstance( executorType );
69 job = JobExecutor.optimize(job);
70 System.out.println(job);
71 context = new JobExecutor( job, temporaryStorage, errorStore );
72 context.prepare();
74 try {
75 context.run( executor );
76 } catch( Exception e ) {
77 throw (IOException) new IOException( "Problems executing the Galago job." ).initCause(e);
80 executor.shutdown();
81 System.err.println( errorStore.toString() );
82 return true;
85 public void setOutput( String filename, String function ) {
86 outputFilename = filename;
87 outputFunction = function;