2 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
3 * See accompanying LICENSE file.
5 package com
.yahoo
.pig
.impl
.mapreduceExec
;
7 import java
.io
.IOException
;
8 import java
.io
.ObjectInputStream
;
10 import org
.apache
.hadoop
.fs
.FSDataOutputStream
;
11 import org
.apache
.hadoop
.fs
.FileSystem
;
12 import org
.apache
.hadoop
.fs
.Path
;
13 import org
.apache
.hadoop
.io
.Writable
;
14 import org
.apache
.hadoop
.io
.WritableComparable
;
15 import org
.apache
.hadoop
.mapred
.JobConf
;
16 import org
.apache
.hadoop
.mapred
.OutputFormat
;
17 import org
.apache
.hadoop
.mapred
.RecordWriter
;
18 import org
.apache
.hadoop
.mapred
.Reporter
;
19 import org
.apache
.hadoop
.util
.Progressable
;
21 import com
.yahoo
.pig
.StorageFunc
;
22 import com
.yahoo
.pig
.builtin
.PigStorage
;
23 import com
.yahoo
.pig
.data
.Tuple
;
24 import com
.yahoo
.pig
.impl
.PigContext
;
25 import com
.yahoo
.pig
.impl
.logicalLayer
.parser
.QueryParser
;
27 public class PigOutputFormat
implements OutputFormat
{
28 public static final String PIG_OUTPUT_FUNC
= "pig.output.func";
30 public RecordWriter
getRecordWriter(FileSystem fs
, JobConf job
, String name
, Progressable progress
)
33 Path outputDir
= job
.getOutputPath();
34 String storeFunc
= job
.get("pig.storeFunc", "");
35 if (storeFunc
.length() == 0) {
36 store
= new PigStorage();
39 store
= (StorageFunc
) PigContext
.instantiateArgFunc(storeFunc
);
40 } catch (Exception e
) {
41 RuntimeException re
= new RuntimeException(e
.getClass().getName() + ": " + e
.getMessage());
42 re
.setStackTrace(e
.getStackTrace());
46 return new PigRecordWriter(fs
, new Path(outputDir
, name
), store
);
49 public void checkOutputSpecs(FileSystem fs
, JobConf job
) throws IOException
{
50 // TODO We really should validate things here
54 static public class PigRecordWriter
implements RecordWriter
{
55 private FSDataOutputStream os
= null;
56 private StorageFunc sfunc
= null;
/**
 * Creates the output file and binds the storage function to it.
 *
 * @param fs    file system on which {@code file} is created
 * @param file  path of the output file
 * @param sfunc storage function that will receive the tuples; bound to the new stream
 * @throws IOException if the file cannot be created or the storage function cannot be bound
 */
public PigRecordWriter(FileSystem fs, Path file, StorageFunc sfunc) throws IOException {
    // Store the storage function first: binding goes through the field, which is
    // null until it is assigned here.
    this.sfunc = sfunc;
    this.os = fs.create(file);
    this.sfunc.bindTo(os);
}
66 * We only care about the values, so we are going to skip the keys when we write.
68 * @see org.apache.hadoop.mapred.RecordWriter#write(org.apache.hadoop.io.WritableComparable,
69 * org.apache.hadoop.io.Writable)
71 public void write(WritableComparable key
, Writable value
) throws IOException
{
72 this.sfunc
.putNext((Tuple
) value
);
75 public void close(Reporter reporter
) throws IOException
{