Initial import into git.
[galago.git] / java / pig-galago / src / com / yahoo / pig / impl / galago / ReadTuples.java
blob6406faacfa18afd44c5ff9e1a832317505a383fe
/*
 * ReadTuples
 *
 * September 3, 2007 -- Trevor Strohman
 *
 * BSD License (http://www.galagosearch.org/license)
 */
9 package com.yahoo.pig.impl.galago;
11 import com.yahoo.pig.PigServer.ExecType;
12 import com.yahoo.pig.StorageFunc;
13 import com.yahoo.pig.data.DataAtom;
14 import com.yahoo.pig.impl.PigContext;
15 import com.yahoo.pig.impl.io.FileLocalizer;
16 import galago.tupleflow.InputClass;
17 import galago.tupleflow.OutputClass;
18 import galago.tupleflow.StandardStep;
19 import galago.tupleflow.TupleFlowParameters;
20 import galago.tupleflow.execution.ErrorHandler;
21 import java.io.File;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.lang.reflect.InvocationTargetException;
25 import java.util.zip.GZIPInputStream;
27 /**
29 * @author trevor
32 @InputClass(className = "com.yahoo.pig.impl.galago.Tuple")
33 @OutputClass(className = "com.yahoo.pig.impl.galago.Tuple")
34 public class ReadTuples extends StandardStep<Tuple, Tuple> {
36 String funcSpec;
38 /** Creates a new instance of ReadTuples */
39 public ReadTuples(TupleFlowParameters parameters) throws IOException, ClassNotFoundException, NoSuchMethodException, InstantiationException, IllegalAccessException, InvocationTargetException {
40 funcSpec = parameters.getXML().get("function");
43 public void process(Tuple t) throws IOException {
44 String filename = ((DataAtom) t.getField(1)).strval();
45 long offset = ((DataAtom) t.getField(2)).numval().longValue();
46 long end = ((DataAtom) t.getField(3)).numval().longValue();
47 StorageFunc function = null;
49 try {
50 function = (StorageFunc) PigContext.instantiateArgFunc(funcSpec);
51 } catch (Exception e) {
52 throw (IOException) new IOException( "Couldn't instantiate storage function" ).initCause(e);
55 InputStream stream = FileLocalizer.open(ExecType.LOCAL, filename);
57 if (filename.endsWith(".gz")) {
58 stream = new GZIPInputStream(stream);
61 stream.skip(offset);
62 function.bindTo(stream, offset, end);
63 com.yahoo.pig.data.Tuple tuple;
65 while ((tuple = function.getNext()) != null) {
66 Tuple wrapped = new Tuple();
67 wrapped.copyFrom(tuple);
68 processor.process(wrapped);
71 stream.close();
74 public static void verify(TupleFlowParameters parameters, ErrorHandler handler) {
75 if (!parameters.getXML().containsKey("function")) {
76 handler.addError("'function' is a required parameter of ReadTuples.");