4 * September 1, 2007 -- Trevor Strohman
6 * BSD License (http://www.galagosearch.org/license)
9 package com
.yahoo
.pig
.impl
.galago
;
11 import com
.yahoo
.pig
.impl
.physicalLayer
.PhysicalOperator
;
12 import com
.yahoo
.pig
.data
.Tuple
;
13 import galago
.tupleflow
.Order
;
14 import galago
.tupleflow
.TypeReader
;
15 import galago
.tupleflow
.execution
.ErrorStore
;
16 import galago
.tupleflow
.execution
.Job
;
17 import galago
.tupleflow
.execution
.JobExecutor
;
18 import galago
.tupleflow
.execution
.LocalStageExecutor
;
19 import galago
.tupleflow
.execution
.StageExecutor
;
20 import galago
.tupleflow
.execution
.StageExecutorFactory
;
21 import java
.io
.IOException
;
27 public class POGalago
extends PhysicalOperator
{
31 String dataSourceName
;
32 TypeReader
<com
.yahoo
.pig
.impl
.galago
.Tuple
> tupleReader
;
34 String outputFilename
;
35 String outputFunction
;
37 /** Creates a new instance of POGalago */
38 public POGalago( Job job
, String dataSourceName
, String
[] orderSpec
) {
40 this.dataSourceName
= dataSourceName
;
41 this.orderSpec
= orderSpec
;
44 public POGalago( Job job
) {
48 public Tuple
getNext() throws IOException
{
49 if( tupleReader
!= null )
50 return tupleReader
.read();
55 public boolean open(boolean continueFromLast
) throws IOException
{
56 if( !super.open(continueFromLast
) )
59 String temporaryStorage
= System
.getProperty( "gwd" );
61 if( temporaryStorage
== null )
62 temporaryStorage
= "/tmp/pig-galago";
64 String executorType
= System
.getProperty( "executorType" );
66 ErrorStore errorStore
= new ErrorStore();
67 StageExecutor executor
= StageExecutorFactory
.newInstance( executorType
);
69 job
= JobExecutor
.optimize(job
);
70 System
.out
.println(job
);
71 context
= new JobExecutor( job
, temporaryStorage
, errorStore
);
75 context
.run( executor
);
76 } catch( Exception e
) {
77 throw (IOException
) new IOException( "Problems executing the Galago job." ).initCause(e
);
81 System
.err
.println( errorStore
.toString() );
85 public void setOutput( String filename
, String function
) {
86 outputFilename
= filename
;
87 outputFunction
= function
;