Initial import into git.
[galago.git] / java / pig-galago / src / com / yahoo / pig / impl / galago / Tuple.java
blobbe485ea0314a7e9f666f7afc161d1fb1c7ececb3
1 /*
2 * Tuple
4 * September 1, 2007 -- Trevor Strohman
6 * BSD License (http://www.galagosearch.org/license)
7 */
9 package com.yahoo.pig.impl.galago;
11 import galago.tupleflow.ArrayInput;
12 import galago.tupleflow.ArrayOutput;
13 import galago.tupleflow.ExNihiloSource;
14 import galago.tupleflow.IncompatibleProcessorException;
15 import galago.tupleflow.Linkage;
16 import galago.tupleflow.Order;
17 import galago.tupleflow.Processor;
18 import galago.tupleflow.ReaderSource;
19 import galago.tupleflow.Step;
20 import galago.tupleflow.Type;
21 import galago.tupleflow.TypeReader;
22 import java.io.EOFException;
23 import java.io.IOException;
24 import java.util.Collection;
25 import java.util.Comparator;
26 import java.util.PriorityQueue;
28 /**
30 * @author trevor
32 public class Tuple extends com.yahoo.pig.data.Tuple
33 implements Type<Tuple> {
34 public Tuple() {
35 super();
38 public Tuple(int fieldCount) {
39 super(fieldCount);
42 public Tuple(com.yahoo.pig.data.Tuple t) {
43 super(t);
46 public Order<Tuple> getOrder(String... fields) {
47 return new TupleOrder( fields );
50 public static class TupleOrder implements Order<Tuple> {
51 public TupleOrder( String[] orderSpec ) {
54 public Class<Tuple> getOrderedClass() {
55 return Tuple.class;
58 public String[] getOrderSpec() {
59 return new String[0];
62 public int hash(Tuple tuple) {
63 return tuple.fields.get(0).hashCode();
66 public Comparator<Tuple> lessThan() {
67 return new Comparator<Tuple>() {
68 public int compare( Tuple one, Tuple two ) {
69 return one.compareTo(two);
74 public Comparator<Tuple> greaterThan() {
75 return new Comparator<Tuple>() {
76 public int compare( Tuple one, Tuple two ) {
77 return -one.compareTo(two);
82 public ReaderSource<Tuple> orderedCombiner( final Collection<TypeReader<Tuple>> readers, final boolean closeOnExit) {
83 return new TupleOrderedCombiner( readers, closeOnExit );
86 public static class TupleOrderedCombiner implements ReaderSource<Tuple> {
87 public Processor<Tuple> processor;
88 public Collection<TypeReader<Tuple>> readers;
89 public PriorityQueue<ReaderIterator> iterators;
90 public boolean closeOnExit;
91 public boolean uninitialized;
93 public TupleOrderedCombiner( Collection<TypeReader<Tuple>> readers, boolean closeOnExit ) {
94 this.readers = readers;
95 this.closeOnExit = closeOnExit;
96 this.iterators = new PriorityQueue<ReaderIterator>();
97 this.uninitialized = true;
100 public static class ReaderIterator implements Comparable<ReaderIterator> {
101 public Tuple top;
102 public TypeReader<Tuple> reader;
104 public ReaderIterator( TypeReader<Tuple> reader ) throws IOException {
105 this.reader = reader;
106 read();
109 public boolean isDone() {
110 return top == null;
113 public Tuple read() throws IOException {
114 top = reader.read();
115 return top;
118 public int compareTo( ReaderIterator other ) {
119 return top.compareTo( other.top );
123 public Tuple read() throws IOException {
124 if( uninitialized ) {
125 for( TypeReader<Tuple> reader : readers ) {
126 ReaderIterator iterator = new ReaderIterator( reader );
128 if( iterator.top != null ) {
129 iterators.add( iterator );
132 uninitialized = false;
135 Tuple result = null;
137 if( iterators.size() > 0 ) {
138 ReaderIterator iterator = iterators.poll();
139 result = iterator.top;
141 if( iterator.read() != null )
142 iterators.offer( iterator );
145 return result;
148 public void run() throws IOException {
149 for( TypeReader<Tuple> reader : readers ) {
150 ReaderIterator iterator = new ReaderIterator( reader );
152 if( iterator.top != null ) {
153 iterators.add( iterator );
157 while( iterators.size() > 0 ) {
158 ReaderIterator iterator = iterators.poll();
159 processor.process( iterator.top );
161 if( iterator.read() != null ) {
162 iterators.offer( iterator );
166 if( closeOnExit )
167 processor.close();
170 public void setProcessor( Step step ) throws IncompatibleProcessorException {
171 Linkage.link(this,step);
175 public Processor<Tuple> orderedWriter( final ArrayOutput output ) {
176 return new Processor<Tuple>() {
177 public void process( Tuple t ) throws IOException {
178 t.write( output.getDataOutput() );
181 public void close() {}
185 public TypeReader<Tuple> orderedReader(final ArrayInput input) {
186 return new TupleOrderedReader( input );
189 public TypeReader<Tuple> orderedReader(ArrayInput input, int bufferSize) {
190 return orderedReader(input);
193 public static class TupleOrderedReader implements TypeReader<Tuple> {
194 public Processor<Tuple> processor;
195 public ArrayInput input;
197 public TupleOrderedReader( ArrayInput input ) {
198 this.input = input;
201 public Tuple read() throws IOException {
202 try {
203 Tuple t = new Tuple();
204 t.readFields( input.getDataInput() );
205 return t;
206 } catch( EOFException e ) {
207 return null;
211 public void run() throws IOException {
212 while(true) {
213 Tuple t = read();
215 if( t == null )
216 break;
218 processor.process( t );
221 processor.close();
224 public void setProcessor(Step step) throws IncompatibleProcessorException {
225 Linkage.link( this, step );