/*
 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
 * See accompanying LICENSE file.
 */
5 package com
.yahoo
.pig
.impl
.eval
.window
;
7 import java
.io
.IOException
;
8 import java
.util
.Iterator
;
9 import java
.util
.LinkedList
;
10 import java
.util
.List
;
12 import com
.yahoo
.pig
.data
.DataCollector
;
13 import com
.yahoo
.pig
.data
.TimestampedTuple
;
14 import com
.yahoo
.pig
.data
.Tuple
;
16 public class TimeWindowSpec
extends WindowSpec
{
17 double duration
; // duration in seconds
18 List
<TimestampedTuple
> window
;
20 public TimeWindowSpec(windowType type
, double duration
){
22 this.duration
= duration
;
23 window
= new LinkedList
<TimestampedTuple
>();
27 public DataCollector
collector(final DataCollector output
) {
28 return new DataCollector() {
30 public void add(Tuple t
) throws IOException
{
32 boolean changed
= false;
34 if (!(((TimestampedTuple
) t
).isHeartbeat
)) {
35 window
.add((TimestampedTuple
) t
);
39 double expireTime
= ((TimestampedTuple
) t
).timestamp
- duration
;
41 TimestampedTuple tail
= window
.get(0);
43 if (tail
!= null & tail
.timestamp
<= expireTime
) {
52 // emit entire window content to output collector
53 for (Iterator
<TimestampedTuple
> it
= window
.iterator(); it
.hasNext(); ) {
54 output
.add(it
.next());