Initial import into git.
[galago.git] / java / pig-galago / src / com / yahoo / pig / impl / eval / window / TimeWindowSpec.java
blob1f55aeb6a7d534ade356cff199c780481c863e4d
1 /*
2 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
3 * See accompanying LICENSE file.
4 */
5 package com.yahoo.pig.impl.eval.window;
7 import java.io.IOException;
8 import java.util.Iterator;
9 import java.util.LinkedList;
10 import java.util.List;
12 import com.yahoo.pig.data.DataCollector;
13 import com.yahoo.pig.data.TimestampedTuple;
14 import com.yahoo.pig.data.Tuple;
16 public class TimeWindowSpec extends WindowSpec {
17 double duration; // duration in seconds
18 List<TimestampedTuple> window;
20 public TimeWindowSpec(windowType type, double duration){
21 super(type);
22 this.duration = duration;
23 window = new LinkedList<TimestampedTuple>();
26 @Override
27 public DataCollector collector(final DataCollector output) {
28 return new DataCollector() {
30 public void add(Tuple t) throws IOException {
32 boolean changed = false;
34 if (!(((TimestampedTuple) t).isHeartbeat)) {
35 window.add((TimestampedTuple) t);
36 changed = true;
39 double expireTime = ((TimestampedTuple) t).timestamp - duration;
40 while (true) {
41 TimestampedTuple tail = window.get(0);
43 if (tail != null & tail.timestamp <= expireTime) {
44 window.remove(0);
45 changed = true;
46 } else {
47 break;
51 if (changed) {
52 // emit entire window content to output collector
53 for (Iterator<TimestampedTuple> it = window.iterator(); it.hasNext(); ) {
54 output.add(it.next());