Initial import into git.
[galago.git] / java / pig-galago / src / com / yahoo / pig / impl / galago / MakeReadSplits.java
blob3dc4421a323ca2a502bb6fd162e5bfd94c00a68a
1 /*
2 * MakeReadSplits
4 * September 7, 2007 -- Trevor Strohman
6 * BSD License (http://www.galagosearch.org/license)
7 */
9 package com.yahoo.pig.impl.galago;
11 import galago.tupleflow.ExNihiloSource;
12 import galago.tupleflow.IncompatibleProcessorException;
13 import galago.tupleflow.Linkage;
14 import galago.tupleflow.OutputClass;
15 import galago.tupleflow.Processor;
16 import galago.tupleflow.Step;
17 import galago.tupleflow.TupleFlowParameters;
18 import galago.tupleflow.execution.Verified;
19 import java.io.File;
20 import java.util.List;
21 import java.io.IOException;
23 /**
25 * @author trevor
27 @Verified
28 @OutputClass(className = "com.yahoo.pig.impl.galago.Tuple")
29 public class MakeReadSplits implements ExNihiloSource<Tuple> {
31 public Processor<Tuple> processor;
32 public List<String> filenames;
33 public long increment;
35 /** Creates a new instance of MakeReadSplits */
36 public MakeReadSplits(TupleFlowParameters parameters) {
37 filenames = parameters.getXML().stringList("filename");
38 increment = parameters.getXML().get("splitLength", 10 * 1024 * 1024);
41 public void run() throws IOException {
42 for (String filename : filenames) {
43 if (filename.endsWith(".gz")) {
44 // compressed files can't be split
45 Tuple t = new Tuple(4);
46 t.setField(0, filename + "-" + 0 + "-" + Long.MAX_VALUE);
47 t.setField(1, filename);
48 t.setField(2, 0);
49 t.setField(3, Long.MAX_VALUE);
50 } else {
51 long fileLength = new File(filename).length();
53 for (long start = 0; start < fileLength; start += increment) {
54 long end = Math.min(fileLength, start + increment);
56 Tuple t = new Tuple(4);
57 t.setField(0, filename + "-" + start + "-" + end);
58 t.setField(1, filename);
59 t.setField(2, start);
60 t.setField(3, end);
62 processor.process(t);
67 processor.close();
70 public void setProcessor(Step next) throws IncompatibleProcessorException {
71 Linkage.link(this, next);