4 * September 7, 2007 -- Trevor Strohman
6 * BSD License (http://www.galagosearch.org/license)
9 package com
.yahoo
.pig
.impl
.galago
;
11 import galago
.tupleflow
.ExNihiloSource
;
12 import galago
.tupleflow
.IncompatibleProcessorException
;
13 import galago
.tupleflow
.Linkage
;
14 import galago
.tupleflow
.OutputClass
;
15 import galago
.tupleflow
.Processor
;
16 import galago
.tupleflow
.Step
;
17 import galago
.tupleflow
.TupleFlowParameters
;
18 import galago
.tupleflow
.execution
.Verified
;
20 import java
.util
.List
;
21 import java
.io
.IOException
;
28 @OutputClass(className
= "com.yahoo.pig.impl.galago.Tuple")
29 public class MakeReadSplits
implements ExNihiloSource
<Tuple
> {
31 public Processor
<Tuple
> processor
;
32 public List
<String
> filenames
;
33 public long increment
;
35 /**
 * Creates a new instance of MakeReadSplits, configured from the step's XML parameters.
 *
 * Reads the string list stored under "filename" into {@code filenames} and the
 * value stored under "splitLength" into {@code increment}.
 *
 * @param parameters source of the XML configuration queried through {@code getXML()}
 */
36 public MakeReadSplits(TupleFlowParameters parameters
) {
37 filenames
= parameters
.getXML().stringList("filename");
// 10 * 1024 * 1024 = 10485760; presumably the fallback used when "splitLength"
// is not configured -- TODO confirm against the getXML().get(key, default) contract.
38 increment
= parameters
.getXML().get("splitLength", 10 * 1024 * 1024);
/**
 * Builds one or more 4-field Tuples per configured file name, each describing a
 * range of the file.
 *
 * Names ending in ".gz" get a single tuple spanning 0 to Long.MAX_VALUE; for the
 * other path the file length is read and the range [0, fileLength) is cut into
 * pieces of at most {@code increment}.
 *
 * NOTE(review): the statements that set the remaining tuple fields and hand the
 * tuple on are not visible here -- presumably each tuple is passed to
 * {@code processor}; confirm.
 *
 * @throws IOException declared by the signature; no visible statement throws it
 */
41 public void run() throws IOException
{
42 for (String filename
: filenames
) {
43 if (filename
.endsWith(".gz")) {
44 // compressed files can't be split
45 Tuple t
= new Tuple(4);
// Field 0: identifier of the form <filename>-0-<Long.MAX_VALUE>.
46 t
.setField(0, filename
+ "-" + 0 + "-" + Long
.MAX_VALUE
);
// Field 1: the file name itself.
47 t
.setField(1, filename
);
// Field 3: Long.MAX_VALUE, the same value used as the end marker in the identifier.
// NOTE(review): no assignment to field 2 is visible -- presumably the start offset (0); confirm.
49 t
.setField(3, Long
.MAX_VALUE
);
// File.length() returns 0 for a path that does not exist, so such a file
// yields no iterations of the loop below.
// NOTE(review): java.io.File is not among the imports visible in this file -- confirm it is in scope.
51 long fileLength
= new File(filename
).length();
// Walk the file in steps of `increment`.
// NOTE(review): a non-positive increment would never advance start past
// fileLength, so this loop would not terminate for a non-empty file.
53 for (long start
= 0; start
< fileLength
; start
+= increment
) {
// Clamp the final piece so it does not run past the end of the file.
54 long end
= Math
.min(fileLength
, start
+ increment
);
56 Tuple t
= new Tuple(4);
// Field 0: identifier of the form <filename>-<start>-<end>.
57 t
.setField(0, filename
+ "-" + start
+ "-" + end
);
// Field 1: the file name itself.
58 t
.setField(1, filename
);
/**
 * Connects this source to the next step by delegating to
 * {@code Linkage.link(this, next)}.
 *
 * @param next the step to link after this one
 * @throws IncompatibleProcessorException declared by the signature; presumably
 *         raised by the link call when {@code next} cannot be attached -- confirm
 */
70 public void setProcessor(Step next
) throws IncompatibleProcessorException
{
71 Linkage
.link(this, next
);