Initial import into git.
[galago.git] / java / pig-galago / src / com / yahoo / pig / builtin / AVG.java
blob30c882969757ec1307b37c554194ead2a39e36b8
1 /*
2 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
3 * See accompanying LICENSE file.
4 */
5 package com.yahoo.pig.builtin;
7 import java.io.IOException;
8 import java.util.Iterator;
10 import com.yahoo.pig.AlgebraicAtomEvalFunc;
11 import com.yahoo.pig.AtomEvalFunc;
12 import com.yahoo.pig.TupleEvalFunc;
13 import com.yahoo.pig.data.DataAtom;
14 import com.yahoo.pig.data.DataBag;
15 import com.yahoo.pig.data.Tuple;
16 import com.yahoo.pig.impl.logicalLayer.schema.SchemaField;
17 import com.yahoo.pig.impl.logicalLayer.schema.SchemaItem;
18 import com.yahoo.pig.impl.logicalLayer.schema.SchemaItemList;
20 /**
21 * Generates the average of the values of the first field of a tuple. This class is Algebraic in
22 * implementation, so if possible the execution will be split into a local and global application.
23 */
24 public class AVG extends AlgebraicAtomEvalFunc {
26 public void exec(Tuple input, DataAtom output) throws IOException {
27 double sum = sum(input);
28 double count = count(input);
30 double avg = 0;
31 if (count > 0)
32 avg = sum / count;
34 output.setValue(avg);
37 public TupleEvalFunc getInitial() {
38 return new Initial();
41 public TupleEvalFunc getIntermed() {
42 return new Intermed();
45 public AtomEvalFunc getFinal() {
46 return new Final();
49 static public class Initial extends TupleEvalFunc {
50 public void exec(Tuple input, Tuple output) throws IOException {
51 try {
52 output.appendField(new DataAtom(sum(input)));
53 output.appendField(new DataAtom(count(input)));
54 output.appendField(new DataAtom("processed by initial"));
55 } catch(RuntimeException t) {
56 throw new RuntimeException(t.getMessage() + ": " + input);
61 static public class Intermed extends TupleEvalFunc {
62 public void exec(Tuple input, Tuple output) throws IOException {
63 combine(input.getBagField(0), output);
67 static public class Final extends AtomEvalFunc {
68 public void exec(Tuple input, DataAtom output) throws IOException {
69 Tuple combined = new Tuple();
70 if(input.getField(0) instanceof DataBag) {
71 combine(input.getBagField(0), combined);
72 } else {
73 throw new RuntimeException("Bag not found in: " + input);
76 //combined = input.getTupleField(0);
78 double sum = combined.getAtomField(0).numval();
79 double count = combined.getAtomField(1).numval();
81 double avg = 0;
82 if (count > 0) {
83 avg = sum / count;
85 output.setValue(avg);
89 static protected void combine(DataBag values, Tuple output) throws IOException {
90 double sum = 0;
91 double count = 0;
93 for (Iterator it = values.content(); it.hasNext();) {
94 Tuple t = (Tuple) it.next();
95 // if(!(t.getField(0) instanceof DataAtom)) {
96 // throw new RuntimeException("Unexpected Type: " + t.getField(0).getClass().getName() + " in " + t);
97 // }
99 sum += t.getAtomField(0).numval();
100 count += t.getAtomField(1).numval();
103 output.appendField(new DataAtom(sum));
104 output.appendField(new DataAtom(count));
107 static protected long count(Tuple input) throws IOException {
108 DataBag values = input.getBagField(0);
111 return values.cardinality();
114 static protected double sum(Tuple input) throws IOException {
115 DataBag values = input.getBagField(0);
117 double sum = 0;
118 for (Iterator it = values.content(); it.hasNext();) {
119 Tuple t = (Tuple) it.next();
120 sum += t.getAtomField(0).numval();
123 return sum;
126 @Override
127 public SchemaItem outputSchema() {
128 SchemaItemList schema = new SchemaItemList();
129 schema.add(new SchemaField("average"));
130 return schema;