2 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
3 * See accompanying LICENSE file.
5 package com
.yahoo
.pig
.builtin
;
7 import java
.io
.IOException
;
8 import java
.util
.Iterator
;
10 import com
.yahoo
.pig
.AlgebraicAtomEvalFunc
;
11 import com
.yahoo
.pig
.AtomEvalFunc
;
12 import com
.yahoo
.pig
.TupleEvalFunc
;
13 import com
.yahoo
.pig
.data
.DataAtom
;
14 import com
.yahoo
.pig
.data
.DataBag
;
15 import com
.yahoo
.pig
.data
.Tuple
;
16 import com
.yahoo
.pig
.impl
.logicalLayer
.schema
.SchemaField
;
17 import com
.yahoo
.pig
.impl
.logicalLayer
.schema
.SchemaItem
;
18 import com
.yahoo
.pig
.impl
.logicalLayer
.schema
.SchemaItemList
;
21 * Generates the average of the values of the first field of a tuple. This class is Algebraic in
22 * implementation, so if possible the execution will be split into a local and global application
24 public class AVG
extends AlgebraicAtomEvalFunc
{
26 public void exec(Tuple input
, DataAtom output
) throws IOException
{
27 double sum
= sum(input
);
28 double count
= count(input
);
37 public TupleEvalFunc
getInitial() {
41 public TupleEvalFunc
getIntermed() {
42 return new Intermed();
45 public AtomEvalFunc
getFinal() {
49 static public class Initial
extends TupleEvalFunc
{
50 public void exec(Tuple input
, Tuple output
) throws IOException
{
52 output
.appendField(new DataAtom(sum(input
)));
53 output
.appendField(new DataAtom(count(input
)));
54 output
.appendField(new DataAtom("processed by initial"));
55 } catch(RuntimeException t
) {
56 throw new RuntimeException(t
.getMessage() + ": " + input
);
61 static public class Intermed
extends TupleEvalFunc
{
62 public void exec(Tuple input
, Tuple output
) throws IOException
{
63 combine(input
.getBagField(0), output
);
67 static public class Final
extends AtomEvalFunc
{
68 public void exec(Tuple input
, DataAtom output
) throws IOException
{
69 Tuple combined
= new Tuple();
70 if(input
.getField(0) instanceof DataBag
) {
71 combine(input
.getBagField(0), combined
);
73 throw new RuntimeException("Bag not found in: " + input
);
76 //combined = input.getTupleField(0);
78 double sum
= combined
.getAtomField(0).numval();
79 double count
= combined
.getAtomField(1).numval();
89 static protected void combine(DataBag values
, Tuple output
) throws IOException
{
93 for (Iterator it
= values
.content(); it
.hasNext();) {
94 Tuple t
= (Tuple
) it
.next();
95 // if(!(t.getField(0) instanceof DataAtom)) {
96 // throw new RuntimeException("Unexpected Type: " + t.getField(0).getClass().getName() + " in " + t);
99 sum
+= t
.getAtomField(0).numval();
100 count
+= t
.getAtomField(1).numval();
103 output
.appendField(new DataAtom(sum
));
104 output
.appendField(new DataAtom(count
));
107 static protected long count(Tuple input
) throws IOException
{
108 DataBag values
= input
.getBagField(0);
111 return values
.cardinality();
114 static protected double sum(Tuple input
) throws IOException
{
115 DataBag values
= input
.getBagField(0);
118 for (Iterator it
= values
.content(); it
.hasNext();) {
119 Tuple t
= (Tuple
) it
.next();
120 sum
+= t
.getAtomField(0).numval();
127 public SchemaItem
outputSchema() {
128 SchemaItemList schema
= new SchemaItemList();
129 schema
.add(new SchemaField("average"));