/*
 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
 * See accompanying LICENSE file.
 */
5 package com
.yahoo
.pig
.builtin
;
7 import java
.io
.BufferedReader
;
8 import java
.io
.IOException
;
9 import java
.io
.InputStream
;
10 import java
.io
.InputStreamReader
;
11 import java
.io
.OutputStream
;
12 import java
.text
.SimpleDateFormat
;
13 import java
.util
.Iterator
;
15 import com
.yahoo
.pig
.StorageFunc
;
16 import com
.yahoo
.pig
.PigServer
;
17 import com
.yahoo
.pig
.data
.TimestampedTuple
;
18 import com
.yahoo
.pig
.data
.Tuple
;
19 import com
.yahoo
.pig
.impl
.io
.FileLocalizer
;
20 import com
.yahoo
.pig
.impl
.io
.InputStreamPosition
;
/**
 * A load function that parses a line of input into fields using a delimiter to set the fields. The
 * delimiter is given as a regular expression. See String.split(delimiter) and
 * http://java.sun.com/j2se/1.5.0/docs/api/java/util/regex/Pattern.html for more information.
 */
27 public class PigStorage
extends StorageFunc
{
28 Iterator
<Tuple
> i
= null;
29 BufferedReader fr
= null;
30 InputStreamPosition posInputStream
= null;
31 InputStream fsis
= null;
32 long start
= Long
.MIN_VALUE
;
33 long end
= Long
.MAX_VALUE
;
34 private String recordDel
= "\n";
35 private String fieldDel
= "\t";
37 boolean toTimestamp
= false;
38 int timestampColumn
= -1;
39 SimpleDateFormat dateFormat
= new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z");
40 //e.g., 26/Jan/2006:09:43:37 -0700, as found in apache web logs
/**
 * Creates a PigStorage whose fields are separated by the given regular expression.
 * The record delimiter keeps its default value.
 *
 * @param delimiter
 *            regular expression used to separate fields ("\t" is the default when no
 *            delimiter is supplied). See
 *            http://java.sun.com/j2se/1.5.0/docs/api/java/util/regex/Pattern.html for the
 *            complete regular-expression syntax.
 */
public PigStorage(String delimiter) {
    fieldDel = delimiter;
}
/**
 * Creates a PigStorage with explicit field and record delimiters.
 *
 * @param fieldDel
 *            delimiter stored as the field separator
 * @param recordDel
 *            delimiter stored as the record separator (appended after each tuple by putNext)
 */
public PigStorage(String fieldDel, String recordDel) {
    // The two assignments are independent of each other.
    this.recordDel = recordDel;
    this.fieldDel = fieldDel;
}
/**
 * Creates a PigStorage with explicit field and record delimiters and a timestamp column.
 *
 * @param delimiter
 *            stored as the field delimiter
 * @param recordDel
 *            stored as the record delimiter
 * @param timestampColumn
 *            column index as a decimal string; parsed with Integer.parseInt. If it is not a
 *            valid integer, a stack trace is printed and column 0 is used instead.
 */
62 public PigStorage(String delimiter
, String recordDel
, String timestampColumn
){
63 this.fieldDel
= delimiter
;
64 this.recordDel
= recordDel
;
// Parse the column index supplied as text.
67 this.timestampColumn
= Integer
.parseInt(timestampColumn
);
68 }catch(NumberFormatException e
){
// The RuntimeException is created only so its stack trace can be printed; it is not
// thrown, and the caught exception `e` is not attached as its cause.
69 new RuntimeException("Timestamp column should be a valid number, using 0").printStackTrace();
// Fall back to the first column.
70 this.timestampColumn
= 0;
/**
 * Creates a PigStorage with explicit field and record delimiters, a timestamp column and a
 * custom date format.
 *
 * @param delimiter
 *            stored as the field delimiter
 * @param recordDel
 *            stored as the record delimiter
 * @param timestampColumn
 *            column index as a decimal string; parsed with Integer.parseInt. If it is not a
 *            valid integer, a stack trace is printed and column 0 is used instead.
 * @param dateFormat
 *            SimpleDateFormat pattern that replaces the default date format
 */
74 public PigStorage(String delimiter
, String recordDel
, String timestampColumn
, String dateFormat
){
75 this.fieldDel
= delimiter
;
76 this.recordDel
= recordDel
;
// Parse the column index supplied as text.
79 this.timestampColumn
= Integer
.parseInt(timestampColumn
);
80 }catch(NumberFormatException e
){
// The RuntimeException is created only so its stack trace can be printed; it is not
// thrown, and the caught exception `e` is not attached as its cause.
81 new RuntimeException("Timestamp column should be a valid number, using 0").printStackTrace();
// Fall back to the first column.
82 this.timestampColumn
= 0;
// The parameter `dateFormat` (a pattern string) shadows the field of the same name.
84 this.dateFormat
= new SimpleDateFormat(dateFormat
);
/**
 * Reads the next line from the bound reader and turns it into a tuple.
 *
 * @return a Tuple or a TimestampedTuple built from the line that was read
 * @throws IOException
 *             declared by this method; the visible code calls fr.readLine()
 */
87 public Tuple
getNext() throws IOException
{
// Guard: no input stream has been set (fsis is null), or the tracked read position has
// moved past `end`.
// NOTE(review): the statements inside this guard are not visible here - presumably the
// method returns null in this case; confirm.
88 if (fsis
== null || (posInputStream
!= null && posInputStream
.getPosition() > end
)) {
// readLine() returns null once the reader is exhausted.
92 if((line
= fr
.readLine()) != null) {
// Plain tuple: the line is handed to Tuple together with the field delimiter.
94 Tuple t
= new Tuple(line
, fieldDel
);
// Timestamped tuple: additionally receives the timestamp column index and the date format.
// NOTE(review): presumably selected by the toTimestamp flag - confirm.
97 return new TimestampedTuple(line
, fieldDel
,timestampColumn
,dateFormat
);
/**
 * Binds this storage function to an input stream.
 *
 * @param is
 *            the raw input stream; it is wrapped in an InputStreamPosition
 * @param offset
 *            passed to the InputStreamPosition constructor together with the stream
 * @param end
 *            NOTE(review): presumably the upper bound stored in this.end - the assignment is
 *            not visible here; confirm
 * @throws IOException
 *             declared by this method; the visible code calls getNext()
 */
103 public void bindTo(InputStream is
, long offset
, long end
) throws IOException
{
// Wrap the stream so the read position can be queried later.
104 this.posInputStream
= new InputStreamPosition(is
, offset
);
// No charset is passed to InputStreamReader, so the platform default charset is used.
105 this.fr
= new BufferedReader(new InputStreamReader(posInputStream
));
110 // Since we are not block aligned we throw away the first
111 // record and count on a different instance to read it
// Fail loudly if the skipped record is null, reporting both bounds and the current position.
113 if(getNext() == null) {
114 throw new RuntimeException("null tuple returned from skip " + this.start
+ " " + this.end
+ " " + posInputStream
.getPosition() );
/**
 * Binds this storage function to an output stream.
 * NOTE(review): the body is not visible here. putNext() writes to a variable named `os`,
 * so this method presumably stores the stream there - confirm.
 *
 * @param os
 *            the output stream to bind to
 * @throws IOException
 *             declared by this method
 */
121 public void bindTo(OutputStream os
) throws IOException
{
126 public void putNext(Tuple f
) throws IOException
{
127 os
.write((f
.toDelimitedString(this.fieldDel
) + this.recordDel
).getBytes());
131 public void done() throws IOException
{