Initial import into git.
[galago.git] / java / pig-galago / src / com / yahoo / pig / builtin / PigStorage.java
blobce5f8dc2820712661f41e5f3adc05e5f5a366a83
1 /*
2 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
3 * See accompanying LICENSE file.
4 */
5 package com.yahoo.pig.builtin;
7 import java.io.BufferedReader;
8 import java.io.IOException;
9 import java.io.InputStream;
10 import java.io.InputStreamReader;
11 import java.io.OutputStream;
12 import java.text.SimpleDateFormat;
13 import java.util.Iterator;
15 import com.yahoo.pig.StorageFunc;
16 import com.yahoo.pig.PigServer;
17 import com.yahoo.pig.data.TimestampedTuple;
18 import com.yahoo.pig.data.Tuple;
19 import com.yahoo.pig.impl.io.FileLocalizer;
20 import com.yahoo.pig.impl.io.InputStreamPosition;
/**
 * A load/store function for delimited text. When loading, each line of input is split into
 * fields on a delimiter; when storing, each tuple is written out using the same delimiter. The
 * delimiter is given as a regular expression. See String.split(delimiter) and
 * http://java.sun.com/j2se/1.5.0/docs/api/java/util/regex/Pattern.html for more information.
 */
27 public class PigStorage extends StorageFunc {
28 Iterator<Tuple> i = null;
29 BufferedReader fr = null;
30 InputStreamPosition posInputStream = null;
31 InputStream fsis = null;
32 long start = Long.MIN_VALUE;
33 long end = Long.MAX_VALUE;
34 private String recordDel = "\n";
35 private String fieldDel = "\t";
37 boolean toTimestamp = false;
38 int timestampColumn = -1;
39 SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z");
40 //e.g., 26/Jan/2006:09:43:37 -0700, as found in apache web logs
42 public PigStorage() {
/**
 * Constructs a PigStorage with a custom field delimiter and the default newline record
 * delimiter.
 *
 * @param delimiter
 *            the field separator ("\t" is the default). It is handed to the Tuple constructor
 *            when parsing and to Tuple.toDelimitedString when writing. It is documented as a
 *            regular expression; see
 *            http://java.sun.com/j2se/1.5.0/docs/api/java/util/regex/Pattern.html for a complete
 *            explanation. NOTE(review): confirm regex handling against the Tuple implementation.
 */
public PigStorage(String delimiter) {
    fieldDel = delimiter;
}
/**
 * Constructs a PigStorage with explicit field and record delimiters.
 *
 * @param fieldDel
 *            separator placed between fields
 * @param recordDel
 *            separator appended after every record written by putNext. NOTE(review): in the
 *            visible code, loading splits records with BufferedReader.readLine() and never
 *            consults this value.
 */
public PigStorage(String fieldDel, String recordDel) {
    this.recordDel = recordDel;
    this.fieldDel = fieldDel;
}
/**
 * Constructs a PigStorage whose loader produces {@link TimestampedTuple}s. The timestamp field
 * is parsed with the default pattern held in the dateFormat field.
 *
 * @param delimiter
 *            separator placed between fields
 * @param recordDel
 *            separator appended after every record written by putNext
 * @param timestampColumn
 *            index of the timestamp field, as a decimal string. If it cannot be parsed, a
 *            warning naming the rejected value is printed to standard error and column 0 is
 *            used instead.
 */
public PigStorage(String delimiter, String recordDel, String timestampColumn) {
    this.fieldDel = delimiter;
    this.recordDel = recordDel;
    this.toTimestamp = true;
    try {
        this.timestampColumn = Integer.parseInt(timestampColumn);
    } catch (NumberFormatException e) {
        // Deliberately best-effort: fall back to column 0 instead of failing construction.
        // Report the rejected value and the parse error rather than a fabricated stack trace.
        System.err.println("Timestamp column should be a valid number, using 0 (got \""
                + timestampColumn + "\": " + e.getMessage() + ")");
        this.timestampColumn = 0;
    }
}
/**
 * Constructs a PigStorage whose loader produces {@link TimestampedTuple}s, parsing the timestamp
 * field with a caller-supplied date pattern.
 *
 * @param delimiter
 *            separator placed between fields
 * @param recordDel
 *            separator appended after every record written by putNext
 * @param timestampColumn
 *            index of the timestamp field, as a decimal string; invalid values fall back to 0
 *            exactly as in the three-argument constructor
 * @param dateFormat
 *            SimpleDateFormat pattern used to parse the timestamp field
 * @throws IllegalArgumentException
 *             if dateFormat is not a valid SimpleDateFormat pattern
 */
public PigStorage(String delimiter, String recordDel, String timestampColumn, String dateFormat) {
    // Share the delimiter setup and the timestamp-column parse/fallback logic instead of
    // duplicating it; only the date pattern differs from the three-argument form.
    this(delimiter, recordDel, timestampColumn);
    this.dateFormat = new SimpleDateFormat(dateFormat);
}
87 public Tuple getNext() throws IOException {
88 if (fsis == null || (posInputStream != null && posInputStream.getPosition() > end)) {
89 return null;
91 String line;
92 if((line = fr.readLine()) != null) {
93 if (!toTimestamp) {
94 Tuple t = new Tuple(line, fieldDel);
95 return t;
96 } else {
97 return new TimestampedTuple(line, fieldDel,timestampColumn,dateFormat);
100 return null;
103 public void bindTo(InputStream is, long offset, long end) throws IOException {
104 this.posInputStream = new InputStreamPosition(is, offset);
105 this.fr = new BufferedReader(new InputStreamReader(posInputStream));
106 this.start = offset;
107 this.end = end;
108 this.fsis = is;
110 // Since we are not block aligned we throw away the first
111 // record and cound on a different instance to read it
112 if (offset != 0) {
113 if(getNext() == null) {
114 throw new RuntimeException("null tuple returned from skip " + this.start + " " + this.end + " " + posInputStream.getPosition() );
119 OutputStream os;
120 @Override
121 public void bindTo(OutputStream os) throws IOException {
122 this.os = os;
125 @Override
126 public void putNext(Tuple f) throws IOException {
127 os.write((f.toDelimitedString(this.fieldDel) + this.recordDel).getBytes());
130 @Override
131 public void done() throws IOException {