Initial import into git.
[galago.git] / java / pig-galago / src / com / yahoo / pig / PigServer.java
blob2346f9e517fe03c72de27800703eac0aa25f9778
1 /*
2 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
3 * See accompanying LICENSE file.
4 */
5 package com.yahoo.pig;
7 import com.yahoo.pig.impl.galago.POGalago;
8 import java.io.IOException;
9 import java.io.InputStream;
10 import java.util.HashMap;
11 import java.util.Iterator;
12 import java.util.Map;
14 import org.apache.hadoop.dfs.DistributedFileSystem;
15 import org.apache.hadoop.fs.Path;
17 import com.yahoo.pig.builtin.BinStorage;
18 import com.yahoo.pig.builtin.PigStorage;
19 import com.yahoo.pig.data.*;
20 import com.yahoo.pig.impl.PigContext;
21 import com.yahoo.pig.impl.io.FileLocalizer;
22 import com.yahoo.pig.impl.logicalLayer.LORead;
23 import com.yahoo.pig.impl.logicalLayer.LOStore;
24 import com.yahoo.pig.impl.logicalLayer.LogicalOperator;
25 import com.yahoo.pig.impl.logicalLayer.LogicalPlan;
26 import com.yahoo.pig.impl.logicalLayer.LogicalPlanBuilder;
27 import com.yahoo.pig.impl.logicalLayer.parser.ParseException;
28 import com.yahoo.pig.impl.mapreduceExec.MapReduceLauncher;
29 import com.yahoo.pig.impl.physicalLayer.IntermedResult;
30 import com.yahoo.pig.impl.physicalLayer.POMapreduce;
31 import com.yahoo.pig.impl.physicalLayer.PhysicalPlan;
32 import com.yahoo.pig.impl.physicalLayer.POStore;
33 import com.yahoo.pig.impl.pigbodyExec.PigBodyLauncher;
35 /**
37 * This class is the program's connection to Pig. Typically a program will create a PigServer
38 * instance and call UseMapreduce(). The programmer then registers queries using registerQuery() and
39 * retrieves results using openIterator() or store().
42 public class PigServer {
44 /**
45 * The type of query execution
47 static public enum ExecType {
48 /**
49 * Run everything on the local machine
51 LOCAL,
52 /**
53 * Use the Hadoop Map/Reduce framework
55 MAPREDUCE,
56 /**
57 * Use the Experimental Hadoop framework; not available yet.
59 PIG,
60 /**
61 * Use the Galago TupleFlow framework
63 GALAGO;
66 private static ExecType parseExecType(String str) throws IOException {
67 String normStr = str.toLowerCase();
69 if (normStr.equals("local")) return ExecType.LOCAL;
70 if (normStr.equals("mapreduce")) return ExecType.MAPREDUCE;
71 if (normStr.equals("mapred")) return ExecType.MAPREDUCE;
72 if (normStr.equals("pig")) return ExecType.PIG;
73 if (normStr.equals("pigbody")) return ExecType.PIG;
74 if (normStr.equals("galago")) return ExecType.GALAGO;
76 throw new IOException("Unrecognized exec type: " + str);
78 /**
79 * a table mapping intermediate results to physical plans that read them (the state
80 * of how much has been read is maintained in PORead
82 Map<IntermedResult, PhysicalPlan> physicalPlans = new HashMap<IntermedResult, PhysicalPlan>();
83 /**
84 * a table mapping ID's to intermediate results
86 Map<String, IntermedResult> queryResults = new HashMap<String, IntermedResult>();
88 PigContext pigContext = null;
90 public PigServer(String execType) throws IOException {
91 this(parseExecType(execType));
94 public PigServer(String execType, String filesystemLocation, String jobtrackerLocation) throws IOException {
95 this(parseExecType(execType), filesystemLocation, jobtrackerLocation);
98 public PigServer(ExecType execType) {
99 this(execType, null, null);
102 public PigServer(ExecType execType, String filesystemLocation, String jobtrackerLocation) {
103 pigContext = new PigContext(execType);
105 // propagate the two locations to the configuration
106 if (filesystemLocation != null) MapReduceLauncher.setFilesystemLocation(filesystemLocation);
107 if (jobtrackerLocation != null) MapReduceLauncher.setJobtrackerLocation(jobtrackerLocation);
108 if (filesystemLocation != null) PigBodyLauncher.setFilesystemLocation(filesystemLocation);
109 if (jobtrackerLocation != null) PigBodyLauncher.setJobtrackerLocation(jobtrackerLocation);
112 public void debugOn() {
113 MapReduceLauncher.debug = true;
116 public void debugOff() {
117 MapReduceLauncher.debug = false;
121 * Defines an alias for the given function spec. This
122 * is useful for functions that require arguments to the
123 * constructor.
125 * @param alias - the new function alias to define.
126 * @param functionSpec - the name of the function and any arguments.
127 * It should have the form: classname('arg1', 'arg2', ...)
129 public void registerFunction(String function, String functionSpec) {
130 pigContext.registerFunction(function, functionSpec);
133 public void registerJar(String path) {
134 MapReduceLauncher.addJar(path);
138 * Register a query with the Pig runtime. The query is parsed and registered, but it is not
139 * executed until it is needed.
141 * @param query
142 * a Pig Latin expression to be evaluated.
143 * @return a handle to the query.
144 * @throws IOException
146 public void registerQuery(String query) throws IOException {
147 // Bugzilla Bug 1006706 -- ignore empty queries
148 //=============================================
149 if(query != null) {
150 query = query.trim();
151 if(query.length() == 0) return;
152 }else {
153 return;
156 // parse the query into a logical plan
157 LogicalPlan lp = null;
158 try {
159 lp = (new LogicalPlanBuilder(pigContext).parse(query, queryResults));
160 // System.out.println(lp.toString());
161 } catch (ParseException e) {
162 throw (IOException) new IOException(e.getMessage()).initCause(e);
165 if (lp.alias != null)
166 queryResults.put(lp.alias, new IntermedResult(lp));
169 public void registerSQL(String query) throws IOException {
170 registerQuery(sqlToPigLatin(query));
173 private String sqlToPigLatin(String sql) throws IOException {
174 // TODO
175 throw new IOException("SQL support not yet implemented.");
178 public void newRelation(String id) {
179 queryResults.put(id, new IntermedResult());
182 public void insertTuple(String id, Tuple t) throws IOException {
183 if (!queryResults.containsKey(id)) throw new IOException("Attempt to insert tuple into nonexistent relation.");
184 queryResults.get(id).add(t);
188 public Iterator<Tuple> openIterator(String id, boolean continueFromLast) throws IOException{
189 if (!continueFromLast)
190 return openIterator(id);
192 if (pigContext.getExecType()!= ExecType.LOCAL){
193 System.err.println("Streaming execution not supported in non-local mode.");
194 System.exit(-1);
197 if (!queryResults.containsKey(id))
198 throw new IOException("Invalid alias: " + id);
200 IntermedResult readFrom = (IntermedResult) queryResults.get(id);
202 if (readFrom.getOutputType()==LogicalOperator.FIXED){
203 //Its not a continuous plan, just let it go through the non-continuous channel
204 return openIterator(id);
207 PhysicalPlan pp = null;
209 if (!physicalPlans.containsKey(readFrom)){
211 //Not trying to do sharing, so won't compile and exec it as in the other cases
212 //First check if some other operator tried to execute this intermediate result
213 if (readFrom.executed() || readFrom.compiled()){
214 //Compile it again, so that POSlaves are not added in between query execution
215 readFrom.compile(queryResults);
217 LogicalPlan lp = new LogicalPlan(new LORead(pigContext, readFrom), pigContext);
218 pp = new PhysicalPlan(lp,queryResults);
219 physicalPlans.put(readFrom, pp);
220 }else{
221 pp = physicalPlans.get(readFrom);
224 return pp.exec(continueFromLast).content();
229 * Forces execution of query (and all queries from which it reads), in order to materialize
230 * result
232 public Iterator<Tuple> openIterator(String id) throws IOException {
233 if (!queryResults.containsKey(id))
234 throw new IOException("Invalid alias: " + id);
236 IntermedResult readFrom = (IntermedResult) queryResults.get(id);
238 readFrom.compile(queryResults);
239 readFrom.exec();
240 if (pigContext.getExecType() == ExecType.LOCAL)
241 return readFrom.read().content();
242 final BinStorage p = new BinStorage();
243 InputStream is = FileLocalizer.open(pigContext.getExecType(), readFrom.file);
244 p.bindTo(is, 0, Long.MAX_VALUE);
245 return new Iterator<Tuple>() {
246 Tuple t;
247 boolean atEnd;
249 public boolean hasNext() {
250 if (atEnd)
251 return false;
252 try {
253 if (t == null)
254 t = p.getNext();
255 if (t == null)
256 atEnd = true;
257 } catch (Exception e) {
258 e.printStackTrace();
259 t = null;
260 atEnd = true;
262 return !atEnd;
265 public Tuple next() {
266 Tuple next = t;
267 if (next != null) {
268 t = null;
269 return next;
271 try {
272 next = p.getNext();
273 } catch (Exception e) {
274 e.printStackTrace();
276 if (next == null)
277 atEnd = true;
278 return next;
281 public void remove() {
282 throw new RuntimeException("Removal not supported");
290 * Store an alias into a file
291 * @param id: The alias to store
292 * @param filename: The file to which to store to
293 * @throws IOException
296 public void store(String id, String filename) throws IOException {
297 store(id, filename, PigStorage.class.getName() + "()"); // SFPig is the default store function
301 * Continuous case of store: store the updates to particular alias into the given file
302 * @param id
303 * @param filename
304 * @throws IOException
307 public void update(String id, String filename, boolean append) throws IOException {
308 update(id, filename, PigStorage.class.getName() + "()", append);
312 * forces execution of query (and all queries from which it reads), in order to store result in file
314 public void store(String id, String filename, String func) throws IOException {
// Strip an optional pair of surrounding single quotes from the destination.
315 filename = removeQuotes(filename);
317 if (!queryResults.containsKey(id))
318 throw new IOException("Invalid alias: " + id);
320 IntermedResult readFrom = (IntermedResult) queryResults.get(id);
// Not yet executed: compile, point the plan's output at the destination,
// then execute -- the store happens as a side effect of execution.
322 if (!readFrom.executed()) {
// GALAGO: wrap the existing logical plan in an explicit LOStore node first.
323 if (pigContext.getExecType() == ExecType.GALAGO) {
324 LogicalOperator root = readFrom.lp.root();
325 LogicalOperator store = new LOStore( pigContext, root, filename, func, false );
326 LogicalPlan lp = new LogicalPlan( store, pigContext );
327 readFrom = new IntermedResult( lp );
329 readFrom.compile(queryResults);
// MAPREDUCE: set output file (and store func, when given) on the root job.
330 if (pigContext.getExecType() == ExecType.MAPREDUCE) {
331 POMapreduce pom = (POMapreduce)readFrom.pp.root;
332 if (func == null) {
333 pom.outputFile = filename;
334 } else {
335 pom.outputFile = filename;
336 pom.storeFunc = func;
339 readFrom.exec();
340 return;
342 //else if (func != null) {
343 // //XXX We need to fix this!
344 // throw new IOException("Cannot currently restore an evaluated result");
347 // TODO: following code assumes certain things about store/load functions
// Already executed, non-local: the result lives in DFS; move or copy it.
349 if (pigContext.getExecType() != ExecType.LOCAL) {
// No materialized file yet: write one directly at the destination.
350 if (readFrom.file == null) {
351 readFrom.toDFSFile(filename, func);
352 return;
// Normalize source/destination paths and note a local:-prefixed target.
354 String src = readFrom.file.substring(FileLocalizer.HADOOP_PREFIX.length());
355 String dst = filename;
356 boolean islocal = false;
357 if (dst.startsWith(FileLocalizer.LOCAL_PREFIX)) {
358 dst = dst.substring(FileLocalizer.LOCAL_PREFIX.length());
359 islocal = true;
361 if (dst.startsWith(FileLocalizer.HADOOP_PREFIX)) {
362 dst = dst.substring(FileLocalizer.HADOOP_PREFIX.length());
// Temporary result: promote it in place (copy out locally, or rename in DFS).
364 if (readFrom.isTemporary()) { // TODO: assumes we want to use same store func that was used last time we stored
365 if (islocal) {
366 MapReduceLauncher.copy(src, dst, islocal);
367 } else {
368 MapReduceLauncher.rename(src, dst);
370 readFrom.setPermanentFilename(dst);
// Permanent result: copy to a local destination.
// NOTE(review): this extraction appears to have dropped lines here (no
// visible handling for a non-local destination in this branch) -- verify
// against upstream history before editing this method.
371 } else {
372 if (islocal) {
373 MapReduceLauncher.copy(src, dst, islocal);
// Local mode: run a small LORead -> LOStore plan to write the file.
376 } else {
377 // generate a simple logical plan to store the results
378 LogicalPlan lp = new LogicalPlan(new LOStore(pigContext, new LORead(pigContext, readFrom), filename, func,false), pigContext);
380 // build a physical plan from the lp and then exec it,
381 // this will write the file to disk
382 (new PhysicalPlan(lp, queryResults)).exec(false);
387 * Continuous version of the store function above
390 public void update(String id, String filename, String func, boolean append) throws IOException{
393 if (pigContext.getExecType()!=ExecType.LOCAL){
394 System.err.println("Streaming execution not supported in non-local mode.");
395 System.exit(-1);
398 filename = removeQuotes(filename);
400 if (!queryResults.containsKey(id))
401 throw new IOException("Invalid alias: " + id);
403 IntermedResult readFrom = (IntermedResult) queryResults.get(id);
405 if (readFrom.getOutputType()==LogicalOperator.FIXED){
406 //Its not a continuous plan, just let it go through the non-continuous channel
407 store(id,filename,func);
408 return;
411 PhysicalPlan pp = null;
413 if (!physicalPlans.containsKey(readFrom)){
415 //Not trying to do sharing, so won't compile and exec it as in the other cases
416 //First check if some other operator tried to execute this intermediate result
417 if (readFrom.executed() || readFrom.compiled()){
418 //Compile it again, so that POSlaves are not added in between query execution
419 readFrom.compile(queryResults);
421 LogicalPlan lp = new LogicalPlan(new LORead(pigContext, readFrom), pigContext);
422 pp = new PhysicalPlan(lp,queryResults);
423 physicalPlans.put(readFrom, pp);
424 }else{
425 pp = physicalPlans.get(readFrom);
427 new PhysicalPlan(new POStore(pp.root,filename,func,append)).exec(true);
433 * Returns the unused byte capacity of an HDFS filesystem. This value does
434 * not take into account a replication factor, as that can vary from file
435 * to file. Thus if you are using this to determine if you data set will fit
436 * in the HDFS, you need to divide the result of this call by your specific replication
437 * setting.
438 * @return
439 * @throws IOException
441 public long capacity() throws IOException {
442 if (pigContext.getExecType() == ExecType.LOCAL) {
443 throw new IOException("capacity only supported for non-local execution");
444 } else {
445 DistributedFileSystem dfs = (DistributedFileSystem) MapReduceLauncher.getDfs();
446 return dfs.getRawCapacity() - dfs.getRawUsed();
451 * Returns the length of a file in bytes which exists in the HDFS (accounts for replication).
452 * @param filename
453 * @return
454 * @throws IOException
456 public long fileSize(String filename) throws IOException {
457 if (pigContext.getExecType() == ExecType.LOCAL) {
458 throw new IOException("stat only supported for non-local execution");
459 } else {
460 DistributedFileSystem dfs = (DistributedFileSystem) MapReduceLauncher.getDfs();
461 Path p = new Path(filename);
462 long len = dfs.getLength(p);
463 long replication = dfs.getDefaultReplication(); // did not work, for some reason: dfs.getReplication(p);
464 return len * replication;
468 public boolean existsFile(String filename) throws IOException {
469 if (pigContext.getExecType() == ExecType.LOCAL) {
470 throw new IOException("existsFile only supported for non-local execution");
471 } else {
472 DistributedFileSystem dfs = (DistributedFileSystem) MapReduceLauncher.getDfs();
473 return dfs.exists(new Path(filename));
477 public boolean deleteFile(String filename) throws IOException {
478 if (pigContext.getExecType() == ExecType.LOCAL) {
479 throw new IOException("deleteFile only supported for non-local execution");
480 } else {
481 DistributedFileSystem dfs = (DistributedFileSystem) MapReduceLauncher.getDfs();
482 return dfs.delete(new Path(filename));
486 public boolean renameFile(String source, String target) throws IOException {
487 if (pigContext.getExecType() == ExecType.LOCAL) {
488 throw new IOException("renameFile only supported for non-local execution");
489 } else {
490 DistributedFileSystem dfs = (DistributedFileSystem) MapReduceLauncher.getDfs();
491 return dfs.rename(new Path(source), new Path(target));
495 public boolean mkdirs(String dirs) throws IOException {
496 if (pigContext.getExecType() == ExecType.LOCAL) {
497 throw new IOException("renameFile only supported for non-local execution");
498 } else {
499 DistributedFileSystem dfs = (DistributedFileSystem) MapReduceLauncher.getDfs();
500 return dfs.mkdirs(new Path(dirs));
504 public String[] listPaths(String dir) throws IOException {
505 if (pigContext.getExecType() == ExecType.LOCAL) {
506 throw new IOException("renameFile only supported for non-local execution");
507 } else {
508 DistributedFileSystem dfs = (DistributedFileSystem) MapReduceLauncher.getDfs();
509 Path paths[] = dfs.listPaths(new Path(dir));
510 String strPaths[] = new String[paths.length];
511 for (int i = 0; i < paths.length; i++) {
512 strPaths[i] = paths[i].toString();
514 return strPaths;
518 public long totalHadoopTimeSpent() {
519 return MapReduceLauncher.totalHadoopTimeSpent;
522 // public static class PStat {
523 // // These are the members of the unix fstat and stat structure
524 // // we currently don't implement all of them, but try to provide
525 // // a mirror for the function and values for HDFS files
526 // // ====================================================
527 // // struct stat {
528 // // dev_t st_dev; /* device inode resides on */
529 // // ino_t st_ino; /* inode's number */
530 // // mode_t st_mode; /* inode protection mode */
531 // // nlink_t st_nlink; /* number or hard links to the file */
532 // // uid_t st_uid; /* user-id of owner */
533 // // gid_t st_gid; /* group-id of owner */
534 // // dev_t st_rdev; /* device type, for special file inode */
535 // // struct timespec st_atimespec; /* time of last access */
536 // // struct timespec st_mtimespec; /* time of last data modification */
537 // // struct timespec st_ctimespec; /* time of last file status change */
538 // // off_t st_size; /* file size, in bytes */
539 // // quad_t st_blocks; /* blocks allocated for file */
540 // // u_long st_blksize;/* optimal file sys I/O ops blocksize */
541 // // u_long st_flags; /* user defined flags for file */
542 // // u_long st_gen; /* file generation number */
543 // // };
544 // public long st_size;
545 // public long st_blocks;
546 // }
548 private String removeQuotes(String str) {
549 if (str.startsWith("\'") && str.endsWith("\'"))
550 return str.substring(1, str.length() - 1);
551 else
552 return str;