2 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
3 * See accompanying LICENSE file.
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.dfs.DistributedFileSystem;
import org.apache.hadoop.fs.Path;

import com.yahoo.pig.builtin.BinStorage;
import com.yahoo.pig.builtin.PigStorage;
import com.yahoo.pig.data.*;
import com.yahoo.pig.impl.PigContext;
import com.yahoo.pig.impl.galago.POGalago;
import com.yahoo.pig.impl.io.FileLocalizer;
import com.yahoo.pig.impl.logicalLayer.LORead;
import com.yahoo.pig.impl.logicalLayer.LOStore;
import com.yahoo.pig.impl.logicalLayer.LogicalOperator;
import com.yahoo.pig.impl.logicalLayer.LogicalPlan;
import com.yahoo.pig.impl.logicalLayer.LogicalPlanBuilder;
import com.yahoo.pig.impl.logicalLayer.parser.ParseException;
import com.yahoo.pig.impl.mapreduceExec.MapReduceLauncher;
import com.yahoo.pig.impl.physicalLayer.IntermedResult;
import com.yahoo.pig.impl.physicalLayer.POMapreduce;
import com.yahoo.pig.impl.physicalLayer.POStore;
import com.yahoo.pig.impl.physicalLayer.PhysicalPlan;
import com.yahoo.pig.impl.pigbodyExec.PigBodyLauncher;
37 * This class is the program's connection to Pig. Typically a program will create a PigServer
38 * instance and call UseMapreduce(). The programmer then registers queries using registerQuery() and
39 * retrieves results using openIterator() or store().
42 public class PigServer
{
/**
 * The type of query execution.
 */
static public enum ExecType {
    // NOTE(review): the enum constant declarations were lost in extraction.
    // Per parseExecType() below they include LOCAL, MAPREDUCE, PIG and GALAGO;
    // only their original per-constant comments survive:
    //   Run everything on the local machine
    //   Use the Hadoop Map/Reduce framework
    //   Use the Experimental Hadoop framework; not available yet.
    //   Use the Galago TupleFlow framework
66 private static ExecType
parseExecType(String str
) throws IOException
{
67 String normStr
= str
.toLowerCase();
69 if (normStr
.equals("local")) return ExecType
.LOCAL
;
70 if (normStr
.equals("mapreduce")) return ExecType
.MAPREDUCE
;
71 if (normStr
.equals("mapred")) return ExecType
.MAPREDUCE
;
72 if (normStr
.equals("pig")) return ExecType
.PIG
;
73 if (normStr
.equals("pigbody")) return ExecType
.PIG
;
74 if (normStr
.equals("galago")) return ExecType
.GALAGO
;
76 throw new IOException("Unrecognized exec type: " + str
);
/**
 * A table mapping intermediate results to physical plans that read them (the state
 * of how much has been read is maintained in PORead).
 */
Map<IntermedResult, PhysicalPlan> physicalPlans = new HashMap<IntermedResult, PhysicalPlan>();

/**
 * A table mapping ID's (aliases) to intermediate results.
 */
Map<String, IntermedResult> queryResults = new HashMap<String, IntermedResult>();

// Execution context (mode, registered functions); assigned by the constructors.
PigContext pigContext = null;
/**
 * Creates a server using the execution mode named by execType
 * ("local", "mapreduce"/"mapred", "pig"/"pigbody", or "galago").
 *
 * @param execType name of the execution mode; parsed by parseExecType.
 * @throws IOException if execType is not a recognized mode name.
 */
public PigServer(String execType) throws IOException {
    this(parseExecType(execType));
}
/**
 * Creates a server from an execution-mode name plus explicit filesystem and
 * jobtracker locations (null keeps the launcher defaults).
 *
 * @throws IOException if execType is not a recognized mode name.
 */
public PigServer(String execType, String filesystemLocation, String jobtrackerLocation) throws IOException {
    this(parseExecType(execType), filesystemLocation, jobtrackerLocation);
}
/** Creates a server for the given execution mode with default filesystem and jobtracker locations. */
public PigServer(ExecType execType) {
    this(execType, null, null);
}
/**
 * Creates a server for the given execution mode, optionally overriding the
 * filesystem and jobtracker locations used by both launchers.
 *
 * @param execType           execution mode for this server.
 * @param filesystemLocation filesystem location to use, or null to keep the default.
 * @param jobtrackerLocation jobtracker location to use, or null to keep the default.
 */
public PigServer(ExecType execType, String filesystemLocation, String jobtrackerLocation) {
    pigContext = new PigContext(execType);

    // propagate the two locations to the configuration of both launchers
    if (filesystemLocation != null) {
        MapReduceLauncher.setFilesystemLocation(filesystemLocation);
        PigBodyLauncher.setFilesystemLocation(filesystemLocation);
    }
    if (jobtrackerLocation != null) {
        MapReduceLauncher.setJobtrackerLocation(jobtrackerLocation);
        PigBodyLauncher.setJobtrackerLocation(jobtrackerLocation);
    }
}
/** Enables debug output from the map/reduce launcher. */
public void debugOn() {
    MapReduceLauncher.debug = true;
}
/** Disables debug output from the map/reduce launcher. */
public void debugOff() {
    MapReduceLauncher.debug = false;
}
/**
 * Defines an alias for the given function spec. This is useful for functions
 * that require arguments to the constructor.
 *
 * @param function - the new function alias to define.
 * @param functionSpec - the name of the function and any arguments.
 *                       It should have the form: classname('arg1', 'arg2', ...)
 */
public void registerFunction(String function, String functionSpec) {
    pigContext.registerFunction(function, functionSpec);
}
/**
 * Registers a jar with the map/reduce launcher so its classes are available to
 * subsequently executed queries. (Presumably shipped with submitted jobs —
 * confirm against MapReduceLauncher.addJar.)
 *
 * @param path path to the jar file.
 */
public void registerJar(String path) {
    MapReduceLauncher.addJar(path);
}
138 * Register a query with the Pig runtime. The query is parsed and registered, but it is not
139 * executed until it is needed.
142 * a Pig Latin expression to be evaluated.
143 * @return a handle to the query.
144 * @throws IOException
146 public void registerQuery(String query
) throws IOException
{
147 // Bugzilla Bug 1006706 -- ignore empty queries
148 //=============================================
150 query
= query
.trim();
151 if(query
.length() == 0) return;
156 // parse the query into a logical plan
157 LogicalPlan lp
= null;
159 lp
= (new LogicalPlanBuilder(pigContext
).parse(query
, queryResults
));
160 // System.out.println(lp.toString());
161 } catch (ParseException e
) {
162 throw (IOException
) new IOException(e
.getMessage()).initCause(e
);
165 if (lp
.alias
!= null)
166 queryResults
.put(lp
.alias
, new IntermedResult(lp
));
/**
 * Registers a SQL query by first translating it to Pig Latin.
 *
 * @throws IOException always at present, since sqlToPigLatin is unimplemented.
 */
public void registerSQL(String query) throws IOException {
    registerQuery(sqlToPigLatin(query));
}
// Placeholder for SQL-to-Pig-Latin translation; not yet implemented.
private String sqlToPigLatin(String sql) throws IOException {
    throw new IOException("SQL support not yet implemented.");
}
/** Creates a new, empty relation under the given id (replacing any existing entry). */
public void newRelation(String id) {
    queryResults.put(id, new IntermedResult());
}
182 public void insertTuple(String id
, Tuple t
) throws IOException
{
183 if (!queryResults
.containsKey(id
)) throw new IOException("Attempt to insert tuple into nonexistent relation.");
184 queryResults
.get(id
).add(t
);
/**
 * Opens an iterator over the alias, optionally continuing from where the previous
 * iteration left off (continuous/streaming plans; local execution mode only).
 *
 * @param id               the alias to iterate over.
 * @param continueFromLast if false, behaves exactly like openIterator(id).
 * @throws IOException if the alias is unknown or execution fails.
 */
public Iterator<Tuple> openIterator(String id, boolean continueFromLast) throws IOException {
    if (!continueFromLast)
        return openIterator(id);

    if (pigContext.getExecType() != ExecType.LOCAL) {
        System.err.println("Streaming execution not supported in non-local mode.");
        // NOTE(review): original lines 194-196 lost in extraction — presumably the
        // method aborts here (return null or throw); confirm before relying on
        // fall-through behavior.
    }

    if (!queryResults.containsKey(id))
        throw new IOException("Invalid alias: " + id);

    IntermedResult readFrom = (IntermedResult) queryResults.get(id);

    if (readFrom.getOutputType() == LogicalOperator.FIXED) {
        //Its not a continuous plan, just let it go through the non-continuous channel
        return openIterator(id);
    }

    PhysicalPlan pp = null;

    if (!physicalPlans.containsKey(readFrom)) {
        //Not trying to do sharing, so won't compile and exec it as in the other cases
        //First check if some other operator tried to execute this intermediate result
        if (readFrom.executed() || readFrom.compiled()) {
            //Compile it again, so that POSlaves are not added in between query execution
            readFrom.compile(queryResults);
        }
        LogicalPlan lp = new LogicalPlan(new LORead(pigContext, readFrom), pigContext);
        pp = new PhysicalPlan(lp, queryResults);
        physicalPlans.put(readFrom, pp);
    }
    // NOTE(review): original line 220 lost — likely "} else {"; without it the lookup
    // below simply re-fetches the plan that was just stored (same behavior).
    pp = physicalPlans.get(readFrom);

    return pp.exec(continueFromLast).content();
}
/**
 * Forces execution of the query for the given alias (and all queries from which it
 * reads), in order to materialize the result, and returns an iterator over its tuples.
 *
 * @throws IOException if the alias is unknown or execution fails.
 */
public Iterator<Tuple> openIterator(String id) throws IOException {
    if (!queryResults.containsKey(id))
        throw new IOException("Invalid alias: " + id);

    IntermedResult readFrom = (IntermedResult) queryResults.get(id);

    readFrom.compile(queryResults);
    // NOTE(review): original line 239 lost in extraction — possibly "readFrom.exec();";
    // without it the non-local path below would read a file that was never produced.

    // Local mode: the materialized result is available in memory.
    if (pigContext.getExecType() == ExecType.LOCAL)
        return readFrom.read().content();

    // Non-local mode: stream the stored result back from DFS via BinStorage.
    final BinStorage p = new BinStorage();
    InputStream is = FileLocalizer.open(pigContext.getExecType(), readFrom.file);
    p.bindTo(is, 0, Long.MAX_VALUE);
    return new Iterator<Tuple>() {
        public boolean hasNext() {
            // NOTE(review): body lost in extraction (original lines 250-256).
        } catch (Exception e) {
            // NOTE(review): handler body lost in extraction (lines 258-264).
        public Tuple next() {
            // NOTE(review): body lost in extraction (lines 266-272).
        } catch (Exception e) {
            // NOTE(review): handler body lost in extraction (lines 274-280).
        public void remove() {
            throw new RuntimeException("Removal not supported");
        }
    // NOTE(review): closing braces of the anonymous class lost (lines 283-287).
/**
 * Store an alias into a file using the default store function.
 * @param id: The alias to store
 * @param filename: The file to which to store to
 * @throws IOException
 */
public void store(String id, String filename) throws IOException {
    store(id, filename, PigStorage.class.getName() + "()"); // PigStorage is the default store function
}
/**
 * Continuous case of store: store the updates to a particular alias into the given
 * file, using PigStorage as the store function.
 *
 * @throws IOException if the alias is unknown or execution fails.
 */
public void update(String id, String filename, boolean append) throws IOException {
    update(id, filename, PigStorage.class.getName() + "()", append);
}
/**
 * Forces execution of query (and all queries from which it reads), in order to
 * store the result in a file.
 *
 * @param id       alias to materialize and store.
 * @param filename destination file; surrounding quotes are stripped, and the
 *                 FileLocalizer LOCAL_PREFIX / HADOOP_PREFIX markers are honored.
 * @param func     store-function spec used to serialize tuples.
 * @throws IOException if the alias is unknown or execution fails.
 */
public void store(String id, String filename, String func) throws IOException {
    filename = removeQuotes(filename);

    if (!queryResults.containsKey(id))
        throw new IOException("Invalid alias: " + id);

    IntermedResult readFrom = (IntermedResult) queryResults.get(id);

    if (!readFrom.executed()) {
        if (pigContext.getExecType() == ExecType.GALAGO) {
            // Galago: wrap the existing plan in an explicit LOStore before compiling.
            LogicalOperator root = readFrom.lp.root();
            LogicalOperator store = new LOStore(pigContext, root, filename, func, false);
            LogicalPlan lp = new LogicalPlan(store, pigContext);
            readFrom = new IntermedResult(lp);
        }
        readFrom.compile(queryResults);
        if (pigContext.getExecType() == ExecType.MAPREDUCE) {
            // Point the root map/reduce job directly at the requested output file.
            POMapreduce pom = (POMapreduce) readFrom.pp.root;
            // NOTE(review): original line 332 lost — likely the condition of an
            // if/else around the two outputFile assignments below.
            pom.outputFile = filename;
            // NOTE(review): original line 334 lost — likely the "else" branch opener.
            pom.outputFile = filename;
            pom.storeFunc = func;
            // NOTE(review): original lines 337-341 lost — likely exec of the plan
            // plus closing braces for the MAPREDUCE and !executed() blocks.
    //else if (func != null) {
    //    //XXX We need to fix this!
    //    throw new IOException("Cannot currently restore an evaluated result");

    // TODO: following code assumes certain things about store/load functions

    if (pigContext.getExecType() != ExecType.LOCAL) {
        if (readFrom.file == null) {
            readFrom.toDFSFile(filename, func);
            // NOTE(review): lines 352-353 lost — presumably "return; }".
        String src = readFrom.file.substring(FileLocalizer.HADOOP_PREFIX.length());
        String dst = filename;
        boolean islocal = false;
        if (dst.startsWith(FileLocalizer.LOCAL_PREFIX)) {
            dst = dst.substring(FileLocalizer.LOCAL_PREFIX.length());
            // NOTE(review): lines 359-360 lost — presumably "islocal = true; }".
        if (dst.startsWith(FileLocalizer.HADOOP_PREFIX)) {
            dst = dst.substring(FileLocalizer.HADOOP_PREFIX.length());
            // NOTE(review): line 363 lost — presumably "}".
        if (readFrom.isTemporary()) { // TODO: assumes we want to use same store func that was used last time we stored
            // NOTE(review): line 365 lost — presumably "if (islocal) {".
            MapReduceLauncher.copy(src, dst, islocal);
            // NOTE(review): line 367 lost — presumably "} else {".
            MapReduceLauncher.rename(src, dst);
            // NOTE(review): line 369 lost — presumably "}".
            readFrom.setPermanentFilename(dst);
            // NOTE(review): lines 371-372 lost — presumably "} else {".
            MapReduceLauncher.copy(src, dst, islocal);
            // NOTE(review): lines 374-376 lost — presumably closing braces and a return,
            // so the plan-building fallthrough below applies to local mode only.

    // generate a simple logical plan to store the results
    LogicalPlan lp = new LogicalPlan(new LOStore(pigContext, new LORead(pigContext, readFrom), filename, func, false), pigContext);

    // build a physical plan from the lp and then exec it,
    // this will write the file to disk
    (new PhysicalPlan(lp, queryResults)).exec(false);
}
/**
 * Continuous version of the store function above: writes updates to the alias
 * into the given file via a POStore. Local execution mode only.
 *
 * @throws IOException if the alias is unknown or execution fails.
 */
public void update(String id, String filename, String func, boolean append) throws IOException {
    if (pigContext.getExecType() != ExecType.LOCAL) {
        System.err.println("Streaming execution not supported in non-local mode.");
        // NOTE(review): original lines 395-397 lost — presumably the method aborts
        // here (return or throw); confirm before relying on fall-through.
    }

    filename = removeQuotes(filename);

    if (!queryResults.containsKey(id))
        throw new IOException("Invalid alias: " + id);

    IntermedResult readFrom = (IntermedResult) queryResults.get(id);

    if (readFrom.getOutputType() == LogicalOperator.FIXED) {
        //Its not a continuous plan, just let it go through the non-continuous channel
        store(id, filename, func);
        // NOTE(review): lines 408-410 lost — presumably "return; }".
    }

    PhysicalPlan pp = null;

    if (!physicalPlans.containsKey(readFrom)) {
        //Not trying to do sharing, so won't compile and exec it as in the other cases
        //First check if some other operator tried to execute this intermediate result
        if (readFrom.executed() || readFrom.compiled()) {
            //Compile it again, so that POSlaves are not added in between query execution
            readFrom.compile(queryResults);
        }
        LogicalPlan lp = new LogicalPlan(new LORead(pigContext, readFrom), pigContext);
        pp = new PhysicalPlan(lp, queryResults);
        physicalPlans.put(readFrom, pp);
    }
    // NOTE(review): original line 424 lost — likely "} else {"; without it the lookup
    // below simply re-fetches the plan that was just stored (same behavior).
    pp = physicalPlans.get(readFrom);

    // Wrap the plan in a POStore targeting the file (append or overwrite) and run it.
    new PhysicalPlan(new POStore(pp.root, filename, func, append)).exec(true);
}
433 * Returns the unused byte capacity of an HDFS filesystem. This value does
434 * not take into account a replication factor, as that can vary from file
435 * to file. Thus if you are using this to determine if you data set will fit
436 * in the HDFS, you need to divide the result of this call by your specific replication
439 * @throws IOException
441 public long capacity() throws IOException
{
442 if (pigContext
.getExecType() == ExecType
.LOCAL
) {
443 throw new IOException("capacity only supported for non-local execution");
445 DistributedFileSystem dfs
= (DistributedFileSystem
) MapReduceLauncher
.getDfs();
446 return dfs
.getRawCapacity() - dfs
.getRawUsed();
451 * Returns the length of a file in bytes which exists in the HDFS (accounts for replication).
454 * @throws IOException
456 public long fileSize(String filename
) throws IOException
{
457 if (pigContext
.getExecType() == ExecType
.LOCAL
) {
458 throw new IOException("stat only supported for non-local execution");
460 DistributedFileSystem dfs
= (DistributedFileSystem
) MapReduceLauncher
.getDfs();
461 Path p
= new Path(filename
);
462 long len
= dfs
.getLength(p
);
463 long replication
= dfs
.getDefaultReplication(); // did not work, for some reason: dfs.getReplication(p);
464 return len
* replication
;
468 public boolean existsFile(String filename
) throws IOException
{
469 if (pigContext
.getExecType() == ExecType
.LOCAL
) {
470 throw new IOException("existsFile only supported for non-local execution");
472 DistributedFileSystem dfs
= (DistributedFileSystem
) MapReduceLauncher
.getDfs();
473 return dfs
.exists(new Path(filename
));
477 public boolean deleteFile(String filename
) throws IOException
{
478 if (pigContext
.getExecType() == ExecType
.LOCAL
) {
479 throw new IOException("deleteFile only supported for non-local execution");
481 DistributedFileSystem dfs
= (DistributedFileSystem
) MapReduceLauncher
.getDfs();
482 return dfs
.delete(new Path(filename
));
486 public boolean renameFile(String source
, String target
) throws IOException
{
487 if (pigContext
.getExecType() == ExecType
.LOCAL
) {
488 throw new IOException("renameFile only supported for non-local execution");
490 DistributedFileSystem dfs
= (DistributedFileSystem
) MapReduceLauncher
.getDfs();
491 return dfs
.rename(new Path(source
), new Path(target
));
495 public boolean mkdirs(String dirs
) throws IOException
{
496 if (pigContext
.getExecType() == ExecType
.LOCAL
) {
497 throw new IOException("renameFile only supported for non-local execution");
499 DistributedFileSystem dfs
= (DistributedFileSystem
) MapReduceLauncher
.getDfs();
500 return dfs
.mkdirs(new Path(dirs
));
504 public String
[] listPaths(String dir
) throws IOException
{
505 if (pigContext
.getExecType() == ExecType
.LOCAL
) {
506 throw new IOException("renameFile only supported for non-local execution");
508 DistributedFileSystem dfs
= (DistributedFileSystem
) MapReduceLauncher
.getDfs();
509 Path paths
[] = dfs
.listPaths(new Path(dir
));
510 String strPaths
[] = new String
[paths
.length
];
511 for (int i
= 0; i
< paths
.length
; i
++) {
512 strPaths
[i
] = paths
[i
].toString();
/**
 * Returns the cumulative time spent in Hadoop job execution, as accumulated by
 * MapReduceLauncher.totalHadoopTimeSpent. (Unit presumed milliseconds — confirm
 * against MapReduceLauncher.)
 */
public long totalHadoopTimeSpent() {
    return MapReduceLauncher.totalHadoopTimeSpent;
}
522 // public static class PStat {
523 // // These are the members of the unix fstat and stat structure
524 // // we currently don't implement all of them, but try to provide
525 // // a mirror for the function and values for HDFS files
526 // // ====================================================
528 // // dev_t st_dev; /* device inode resides on */
529 // // ino_t st_ino; /* inode's number */
530 // // mode_t st_mode; /* inode protection mode */
531 // // nlink_t st_nlink; /* number or hard links to the file */
532 // // uid_t st_uid; /* user-id of owner */
533 // // gid_t st_gid; /* group-id of owner */
534 // // dev_t st_rdev; /* device type, for special file inode */
535 // // struct timespec st_atimespec; /* time of last access */
536 // // struct timespec st_mtimespec; /* time of last data modification */
537 // // struct timespec st_ctimespec; /* time of last file status change */
538 // // off_t st_size; /* file size, in bytes */
539 // // quad_t st_blocks; /* blocks allocated for file */
540 // // u_long st_blksize;/* optimal file sys I/O ops blocksize */
541 // // u_long st_flags; /* user defined flags for file */
542 // // u_long st_gen; /* file generation number */
544 // public long st_size;
545 // public long st_blocks;
548 private String
removeQuotes(String str
) {
549 if (str
.startsWith("\'") && str
.endsWith("\'"))
550 return str
.substring(1, str
.length() - 1);