2 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
3 * See accompanying LICENSE file.
5 package com
.yahoo
.pig
.impl
.physicalLayer
;
7 import java
.io
.IOException
;
8 import java
.util
.HashMap
;
9 import java
.util
.Iterator
;
10 import java
.util
.LinkedList
;
13 import com
.yahoo
.pig
.data
.Tuple
;
16 * Right now putting in a very dumb implementation which replicates the queue
19 class POSplitMaster
extends POSplit
{
22 Map
<POSplit
, LinkedList
<Tuple
>> lists
;
24 public POSplitMaster(PhysicalOperator input
, int outputType
) {
26 inputs
= new PhysicalOperator
[1];
29 lists
= new HashMap
<POSplit
, LinkedList
<Tuple
>>();
30 registerSlave(this); // register self as one consumer
33 public Tuple
getNext() throws IOException
{
34 return slaveGetNext(this);
37 // should only be called by POSplitSlave or POSplitMaster
38 public void registerSlave(POSplit slave
) {
39 lists
.put(slave
, new LinkedList
<Tuple
>());
42 // should only be called by POSplitSlave or POSplitMaster
43 public Tuple
slaveGetNext(POSplit slave
) throws IOException
{
45 LinkedList
<Tuple
> list
= lists
.get(slave
);
49 Tuple t
= inputs
[0].getNext();
50 Iterator
<LinkedList
<Tuple
>> iter
= lists
.values().iterator();
52 while(iter
.hasNext()){
57 return list
.removeFirst();
61 private Tuple ReadFromBuffer(int pos) throws IOException {
62 while (buffer.size() <= pos)
63 buffer.add(inputs[0].getNext());
64 return buffer.get(pos);
68 private void ShrinkBuffer() {
69 Set<POSplit> slaves = positions.keySet();
71 // determine how many buffer positions we can safely remove
72 int n = buffer.size(); // begin optimistically
73 for (Iterator<POSplit> it = slaves.iterator(); it.hasNext();) {
74 int pos = positions.get(it.next()).intValue();
76 n = pos; // if our current "n" would screw up this slave, reduce it
79 // now, perform the removal
80 for (int i = 0; i < n; i++)
83 // update the positions based on the removal
85 for (Iterator<POSplit> it = slaves.iterator(); it.hasNext();) {
86 POSplit slave = it.next();
87 int pos = positions.get(slave).intValue();
88 positions.put(slave, new Integer(pos - n));