4 * September 1, 2007 -- Trevor Strohman
6 * BSD License (http://www.galagosearch.org/license)
9 package com
.yahoo
.pig
.impl
.galago
;
11 import galago
.tupleflow
.ArrayInput
;
12 import galago
.tupleflow
.ArrayOutput
;
13 import galago
.tupleflow
.ExNihiloSource
;
14 import galago
.tupleflow
.IncompatibleProcessorException
;
15 import galago
.tupleflow
.Linkage
;
16 import galago
.tupleflow
.Order
;
17 import galago
.tupleflow
.Processor
;
18 import galago
.tupleflow
.ReaderSource
;
19 import galago
.tupleflow
.Step
;
20 import galago
.tupleflow
.Type
;
21 import galago
.tupleflow
.TypeReader
;
22 import java
.io
.EOFException
;
23 import java
.io
.IOException
;
24 import java
.util
.Collection
;
25 import java
.util
.Comparator
;
26 import java
.util
.PriorityQueue
;
32 public class Tuple
extends com
.yahoo
.pig
.data
.Tuple
33 implements Type
<Tuple
> {
/**
 * Constructs a tuple from a field count.
 * NOTE(review): the constructor body is not visible in this excerpt; presumably
 * {@code fieldCount} is forwarded to the superclass constructor -- TODO confirm.
 *
 * @param fieldCount a field count (exact use not visible here)
 */
38 public Tuple(int fieldCount
) {
/**
 * Constructs this Galago-side tuple from a {@code com.yahoo.pig.data.Tuple}.
 * NOTE(review): the constructor body is not visible in this excerpt; presumably
 * it copies or wraps the fields of {@code t} -- TODO confirm.
 *
 * @param t the source tuple
 */
42 public Tuple(com
.yahoo
.pig
.data
.Tuple t
) {
46 public Order
<Tuple
> getOrder(String
... fields
) {
47 return new TupleOrder( fields
);
50 public static class TupleOrder
implements Order
<Tuple
> {
/**
 * Creates an order from an order specification.
 * NOTE(review): the constructor body is not visible in this excerpt, so it is
 * unclear whether {@code orderSpec} is stored -- TODO confirm.
 *
 * @param orderSpec the order specification strings
 */
51 public TupleOrder( String
[] orderSpec
) {
/**
 * Returns the class this order applies to.
 * NOTE(review): body not visible in this excerpt; presumably returns
 * {@code Tuple.class} -- TODO confirm.
 */
54 public Class
<Tuple
> getOrderedClass() {
/**
 * Returns the order specification as an array of strings.
 * NOTE(review): body not visible in this excerpt; which array is returned
 * cannot be determined from here -- TODO confirm.
 */
58 public String
[] getOrderSpec() {
62 public int hash(Tuple tuple
) {
63 return tuple
.fields
.get(0).hashCode();
66 public Comparator
<Tuple
> lessThan() {
67 return new Comparator
<Tuple
>() {
68 public int compare( Tuple one
, Tuple two
) {
69 return one
.compareTo(two
);
74 public Comparator
<Tuple
> greaterThan() {
75 return new Comparator
<Tuple
>() {
76 public int compare( Tuple one
, Tuple two
) {
77 return -one
.compareTo(two
);
82 public ReaderSource
<Tuple
> orderedCombiner( final Collection
<TypeReader
<Tuple
>> readers
, final boolean closeOnExit
) {
83 return new TupleOrderedCombiner( readers
, closeOnExit
);
86 public static class TupleOrderedCombiner
implements ReaderSource
<Tuple
> {
// Receiver of the tuples emitted by run() via processor.process(...).
87 public Processor
<Tuple
> processor
;
// Input readers; read() and run() wrap each one in a ReaderIterator.
88 public Collection
<TypeReader
<Tuple
>> readers
;
// Queue of reader iterators; read() and run() poll it and re-offer an
// iterator while its read() keeps returning non-null.
89 public PriorityQueue
<ReaderIterator
> iterators
;
// Stored by the constructor. NOTE(review): no use of this flag is visible in
// this excerpt -- TODO confirm where it is consulted.
90 public boolean closeOnExit
;
// Set to true by the constructor; read() fills the queue while this is true
// and then assigns false.
91 public boolean uninitialized
;
/**
 * Creates a combiner over the given readers. The iterator queue starts empty
 * and the combiner is marked uninitialized.
 *
 * @param readers     the tuple readers to combine
 * @param closeOnExit flag stored in the {@code closeOnExit} field
 */
public TupleOrderedCombiner(Collection<TypeReader<Tuple>> readers, boolean closeOnExit) {
    this.uninitialized = true;
    this.iterators = new PriorityQueue<ReaderIterator>();
    this.closeOnExit = closeOnExit;
    this.readers = readers;
}
/**
 * Pairs a tuple reader with a {@code top} tuple and is comparable by that
 * tuple, so instances can be kept in a PriorityQueue.
 * NOTE(review): the declaration of {@code top} and the bodies of
 * {@code isDone()} and {@code read()} are not visible in this excerpt.
 */
100 public static class ReaderIterator
implements Comparable
<ReaderIterator
> {
// The underlying reader this iterator belongs to.
102 public TypeReader
<Tuple
> reader
;
// Stores the reader. NOTE(review): callers test `top` right after
// construction, so presumably the constructor also loads the first tuple --
// TODO confirm (remaining constructor lines are not visible).
104 public ReaderIterator( TypeReader
<Tuple
> reader
) throws IOException
{
105 this.reader
= reader
;
// NOTE(review): body not visible; presumably reports whether `top` is null.
109 public boolean isDone() {
// NOTE(review): body not visible; callers treat a null return as exhaustion
// of this iterator.
113 public Tuple
read() throws IOException
{
// Orders iterators by comparing their `top` tuples; `top` must be non-null
// on this instance for the call to succeed.
118 public int compareTo( ReaderIterator other
) {
119 return top
.compareTo( other
.top
);
/**
 * Pull-style access to the combined stream: takes the next tuple from the
 * iterator queue.
 * NOTE(review): the declaration of {@code result} and the return statement
 * are not visible in this excerpt.
 *
 * @throws IOException declared on this method; the visible calls that also
 *                     declare it are the ReaderIterator constructor and read()
 */
123 public Tuple
read() throws IOException
{
// While uninitialized: wrap every reader and queue those whose `top` is
// non-null.
124 if( uninitialized
) {
125 for( TypeReader
<Tuple
> reader
: readers
) {
126 ReaderIterator iterator
= new ReaderIterator( reader
);
128 if( iterator
.top
!= null ) {
129 iterators
.add( iterator
);
132 uninitialized
= false;
// Take the head iterator, remember its current `top` as the result, then
// advance it and put it back only if it produced another tuple.
137 if( iterators
.size() > 0 ) {
138 ReaderIterator iterator
= iterators
.poll();
139 result
= iterator
.top
;
141 if( iterator
.read() != null )
142 iterators
.offer( iterator
);
/**
 * Push-style access to the combined stream: queues an iterator for every
 * reader whose {@code top} is non-null, then repeatedly polls the queue and
 * hands each polled iterator's {@code top} to {@code processor.process}.
 * NOTE(review): the visible lines do not consult {@code uninitialized}, and
 * the lines following the loop are not visible in this excerpt.
 *
 * @throws IOException declared on this method (raised by reading or processing)
 */
148 public void run() throws IOException
{
149 for( TypeReader
<Tuple
> reader
: readers
) {
150 ReaderIterator iterator
= new ReaderIterator( reader
);
152 if( iterator
.top
!= null ) {
153 iterators
.add( iterator
);
// Drain the queue: emit the head iterator's tuple, advance it, and re-queue
// it while it still yields tuples.
157 while( iterators
.size() > 0 ) {
158 ReaderIterator iterator
= iterators
.poll();
159 processor
.process( iterator
.top
);
161 if( iterator
.read() != null ) {
162 iterators
.offer( iterator
);
170 public void setProcessor( Step step
) throws IncompatibleProcessorException
{
171 Linkage
.link(this,step
);
175 public Processor
<Tuple
> orderedWriter( final ArrayOutput output
) {
176 return new Processor
<Tuple
>() {
177 public void process( Tuple t
) throws IOException
{
178 t
.write( output
.getDataOutput() );
181 public void close() {}
185 public TypeReader
<Tuple
> orderedReader(final ArrayInput input
) {
186 return new TupleOrderedReader( input
);
189 public TypeReader
<Tuple
> orderedReader(ArrayInput input
, int bufferSize
) {
190 return orderedReader(input
);
193 public static class TupleOrderedReader
implements TypeReader
<Tuple
> {
// Receiver of the tuples that run() passes to processor.process(...).
194 public Processor
<Tuple
> processor
;
// Source that read() deserializes tuples from (via input.getDataInput()).
195 public ArrayInput input
;
/**
 * Creates a reader over the given input.
 * NOTE(review): the constructor body is not visible in this excerpt;
 * presumably it stores {@code input} in the {@code input} field -- TODO confirm.
 *
 * @param input the input to read tuples from
 */
197 public TupleOrderedReader( ArrayInput input
) {
/**
 * Reads one tuple: creates an empty {@code Tuple} and fills it with
 * {@code readFields} from {@code input.getDataInput()}.
 * NOTE(review): the return statements are not visible in this excerpt;
 * presumably the tuple is returned on success and null after an
 * {@code EOFException} -- TODO confirm.
 *
 * @throws IOException declared on this method; {@code EOFException} is caught
 *                     here
 */
201 public Tuple
read() throws IOException
{
203 Tuple t
= new Tuple();
204 t
.readFields( input
.getDataInput() );
// End of input is handled here rather than propagated to the caller.
206 } catch( EOFException e
) {
/**
 * Pushes tuples to {@code processor}.
 * NOTE(review): only the {@code processor.process(t)} call is visible in this
 * excerpt; the loop that obtains {@code t} and any cleanup are not shown.
 *
 * @throws IOException declared on this method
 */
211 public void run() throws IOException
{
218 processor
.process( t
);
224 public void setProcessor(Step step
) throws IncompatibleProcessorException
{
225 Linkage
.link( this, step
);