Initial import into git.
[galago.git] / java / pig-galago / src / com / yahoo / pig / impl / mapreduceExec / PigOutputFormat.java
blob 69318b2c8cc9d439831a52c0d1f755d78db4b605
/*
 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
 * See accompanying LICENSE file.
 */
package com.yahoo.pig.impl.mapreduceExec;
import java.io.IOException;
import java.io.ObjectInputStream;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;

import com.yahoo.pig.StorageFunc;
import com.yahoo.pig.builtin.PigStorage;
import com.yahoo.pig.data.Tuple;
import com.yahoo.pig.impl.PigContext;
import com.yahoo.pig.impl.logicalLayer.parser.QueryParser;
public class PigOutputFormat implements OutputFormat {
    public static final String PIG_OUTPUT_FUNC = "pig.output.func";
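
    /**
     * Builds the RecordWriter for a task's output file. The store function is looked up
     * from the job's "pig.storeFunc" property; when it is unset, PigStorage is used.
     */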
    public RecordWriter getRecordWriter(FileSystem fs, JobConf job, String name, Progressable progress)
            throws IOException {
        StorageFunc store;
        Path outputDir = job.getOutputPath();
        String storeFunc = job.get("pig.storeFunc", "");
        if (storeFunc.length() == 0) {
            store = new PigStorage();
        } else {
            try {
                store = (StorageFunc) PigContext.instantiateArgFunc(storeFunc);
            } catch (Exception e) {
                // Rethrow as an unchecked exception, preserving the original stack trace.
                RuntimeException re = new RuntimeException(e.getClass().getName() + ": " + e.getMessage());
                re.setStackTrace(e.getStackTrace());
                throw re;
            }
        }
        return new PigRecordWriter(fs, new Path(outputDir, name), store);
    }
    public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException {
        // TODO We really should validate things here
        return;
    }
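
    /**
     * Adapts a StorageFunc to Hadoop's RecordWriter interface so output tuples
     * are written through the configured store function.
     */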
    static public class PigRecordWriter implements RecordWriter {
        private FSDataOutputStream os = null;
        private StorageFunc sfunc = null;
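
        /**
         * Deletes any existing file at the target path, creates a fresh output stream,
         * and binds the store function to it.
         */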
        public PigRecordWriter(FileSystem fs, Path file, StorageFunc sfunc) throws IOException {
            this.sfunc = sfunc;
            fs.delete(file);
            this.os = fs.create(file);
            this.sfunc.bindTo(os);
        }
        /**
         * We only care about the values, so we are going to skip the keys when we write.
         *
         * @see org.apache.hadoop.mapred.RecordWriter#write(org.apache.hadoop.io.WritableComparable,
         *      org.apache.hadoop.io.Writable)
         */
        public void write(WritableComparable key, Writable value) throws IOException {
            this.sfunc.putNext((Tuple) value);
        }
        public void close(Reporter reporter) throws IOException {
            sfunc.done();
            os.close();
        }
    }
}
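
For reference, the sketch below (not part of the original file) shows how a job might be wired to this output format using the same pre-0.19 org.apache.hadoop.mapred API the class targets. The "pig.storeFunc" property, the PigStorage fallback, and the output-path lookup come from the code above; the output path and the store-function string are hypothetical examples, and the exact argument syntax accepted by PigContext.instantiateArgFunc is an assumption. Input paths and mapper/reducer setup are omitted.

// Minimal sketch, assuming the old mapred API used by PigOutputFormat above.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

public class PigOutputFormatExample {
    public static void main(String[] args) {
        JobConf job = new JobConf();
        job.setOutputFormat(PigOutputFormat.class);
        // getRecordWriter reads this back via job.getOutputPath(); path is a hypothetical example.
        job.setOutputPath(new Path("/tmp/pig-out"));
        // Unset or empty "pig.storeFunc" falls back to new PigStorage(); otherwise the string
        // is handed to PigContext.instantiateArgFunc (argument syntax is an assumption here).
        job.set("pig.storeFunc", "com.yahoo.pig.builtin.PigStorage");
        // Remaining job setup (input paths, mapper/reducer, job submission) omitted.
    }
}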