Initial import into git.
[galago.git] / java / pig-galago / src / com / yahoo / pig / impl / physicalLayer / POSplitMaster.java
blob5b692729947130186ac1c4d607170e9c26ff9352
1 /*
2 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
3 * See accompanying LICENSE file.
4 */
5 package com.yahoo.pig.impl.physicalLayer;
7 import java.io.IOException;
8 import java.util.HashMap;
9 import java.util.Iterator;
10 import java.util.LinkedList;
11 import java.util.Map;
13 import com.yahoo.pig.data.Tuple;
16 * For now this is a deliberately naive implementation that replicates the queue
17 * for every slave.
19 class POSplitMaster extends POSplit {
22 Map<POSplit, LinkedList<Tuple>> lists;
24 public POSplitMaster(PhysicalOperator input, int outputType) {
25 super(outputType);
26 inputs = new PhysicalOperator[1];
27 inputs[0] = input;
29 lists = new HashMap<POSplit, LinkedList<Tuple>>();
30 registerSlave(this); // register self as one consumer
33 public Tuple getNext() throws IOException {
34 return slaveGetNext(this);
37 // should only be called by POSplitSlave or POSplitMaster
38 public void registerSlave(POSplit slave) {
39 lists.put(slave, new LinkedList<Tuple>());
42 // should only be called by POSplitSlave or POSplitMaster
43 public Tuple slaveGetNext(POSplit slave) throws IOException {
45 LinkedList<Tuple> list = lists.get(slave);
47 if (list.isEmpty()){
48 //Get more input
49 Tuple t = inputs[0].getNext();
50 Iterator<LinkedList<Tuple>> iter = lists.values().iterator();
52 while(iter.hasNext()){
53 iter.next().add(t);
57 return list.removeFirst();
61 private Tuple ReadFromBuffer(int pos) throws IOException {
62 while (buffer.size() <= pos)
63 buffer.add(inputs[0].getNext());
64 return buffer.get(pos);
68 private void ShrinkBuffer() {
69 Set<POSplit> slaves = positions.keySet();
71 // determine how many buffer positions we can safely remove
72 int n = buffer.size(); // begin optimistically
73 for (Iterator<POSplit> it = slaves.iterator(); it.hasNext();) {
74 int pos = positions.get(it.next()).intValue();
75 if (pos < n)
76 n = pos; // if our current "n" would screw up this slave, reduce it
79 // now, perform the removal
80 for (int i = 0; i < n; i++)
81 buffer.removeFirst();
83 // update the positions based on the removal
84 if (n > 0) {
85 for (Iterator<POSplit> it = slaves.iterator(); it.hasNext();) {
86 POSplit slave = it.next();
87 int pos = positions.get(slave).intValue();
88 positions.put(slave, new Integer(pos - n));