4 * September 3, 2007 -- Trevor Strohman
6 * BSD License (http://www.galagosearch.org/license)
9 package com
.yahoo
.pig
.impl
.galago
;
11 import com
.yahoo
.pig
.PigServer
.ExecType
;
12 import com
.yahoo
.pig
.StorageFunc
;
13 import com
.yahoo
.pig
.data
.DataAtom
;
14 import com
.yahoo
.pig
.impl
.PigContext
;
15 import com
.yahoo
.pig
.impl
.io
.FileLocalizer
;
16 import galago
.tupleflow
.InputClass
;
17 import galago
.tupleflow
.OutputClass
;
18 import galago
.tupleflow
.StandardStep
;
19 import galago
.tupleflow
.TupleFlowParameters
;
20 import galago
.tupleflow
.execution
.ErrorHandler
;
22 import java
.io
.IOException
;
23 import java
.io
.InputStream
;
24 import java
.lang
.reflect
.InvocationTargetException
;
25 import java
.util
.zip
.GZIPInputStream
;
32 @InputClass(className
= "com.yahoo.pig.impl.galago.Tuple")
33 @OutputClass(className
= "com.yahoo.pig.impl.galago.Tuple")
34 public class ReadTuples
extends StandardStep
<Tuple
, Tuple
> {
/**
 * Creates a new instance of ReadTuples.
 *
 * <p>Looks up the value stored under the {@code "function"} key of the
 * step's XML parameters and remembers it in {@code funcSpec}; that value is
 * later handed to {@code PigContext.instantiateArgFunc} by
 * {@code process}. The {@code verify} method of this class reports an
 * error when the key is absent.
 *
 * <p>The checked exceptions in the throws clause are part of the existing
 * constructor signature and are kept unchanged.
 *
 * @param parameters the TupleFlow parameters this step was configured with
 */
public ReadTuples(TupleFlowParameters parameters)
        throws IOException,
               ClassNotFoundException,
               NoSuchMethodException,
               InstantiationException,
               IllegalAccessException,
               InvocationTargetException {
    this.funcSpec = parameters.getXML().get("function");
}
43 public void process(Tuple t
) throws IOException
{
44 String filename
= ((DataAtom
) t
.getField(1)).strval();
45 long offset
= ((DataAtom
) t
.getField(2)).numval().longValue();
46 long end
= ((DataAtom
) t
.getField(3)).numval().longValue();
47 StorageFunc function
= null;
50 function
= (StorageFunc
) PigContext
.instantiateArgFunc(funcSpec
);
51 } catch (Exception e
) {
52 throw (IOException
) new IOException( "Couldn't instantiate storage function" ).initCause(e
);
55 InputStream stream
= FileLocalizer
.open(ExecType
.LOCAL
, filename
);
57 if (filename
.endsWith(".gz")) {
58 stream
= new GZIPInputStream(stream
);
62 function
.bindTo(stream
, offset
, end
);
63 com
.yahoo
.pig
.data
.Tuple tuple
;
65 while ((tuple
= function
.getNext()) != null) {
66 Tuple wrapped
= new Tuple();
67 wrapped
.copyFrom(tuple
);
68 processor
.process(wrapped
);
74 public static void verify(TupleFlowParameters parameters
, ErrorHandler handler
) {
75 if (!parameters
.getXML().containsKey("function")) {
76 handler
.addError("'function' is a required parameter of ReadTuples.");