2 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
3 * See accompanying LICENSE file.
5 package com
.yahoo
.pig
.impl
.eval
.groupby
;
7 import java
.io
.ByteArrayInputStream
;
8 import java
.io
.IOException
;
11 import com
.yahoo
.pig
.GroupFunc
;
12 import com
.yahoo
.pig
.data
.*;
13 import com
.yahoo
.pig
.impl
.PigContext
;
14 import com
.yahoo
.pig
.impl
.eval
.EvalItemList
;
15 import com
.yahoo
.pig
.impl
.logicalLayer
.parser
.QueryParser
;
16 import com
.yahoo
.pig
.impl
.logicalLayer
.parser
.ParseException
;
17 import com
.yahoo
.pig
.impl
.logicalLayer
.schema
.SchemaItem
;
18 import com
.yahoo
.pig
.impl
.logicalLayer
.schema
.SchemaItemList
;
20 public class GroupBySpec
{
21 public GroupFunc func
;
22 public EvalItemList args
;
23 public boolean isInner
;
24 PigContext pigContext
;
26 public GroupBySpec(PigContext pigContext
, GroupFunc func
, EvalItemList args
, boolean isInner
) {
27 this.pigContext
= pigContext
;
30 this.isInner
= isInner
;
33 public Datum
[] eval(Tuple input
) throws IOException
{
35 return func
.exec(args
.simpleEval(input
));
36 } catch (IOException e
) {
37 System
.out
.println("Warning: group function " + func
.name() + " failed. Substituting default value (no groups).");
42 public GroupBySpec(PigContext pigContext
, String specStr
) throws IOException
{
43 this.pigContext
= pigContext
;
44 ByteArrayInputStream in
= new ByteArrayInputStream(specStr
.getBytes());
45 QueryParser parser
= new QueryParser(in
, pigContext
, null);
48 copyFrom
= parser
.GroupBySpec();
49 } catch (ParseException e
) {
50 throw new IOException(specStr
+ ":" + e
.getMessage());
52 this.func
= copyFrom
.func
;
53 this.args
= copyFrom
.args
;
54 this.isInner
= copyFrom
.isInner
;
57 public String
toString() {
58 return (func
+ "(" + args
+ ") " + (isInner?
"INNER" : "OUTER"));
61 public List
<String
> getFuncs() {
62 List
<String
> funcs
= new ArrayList
<String
>();
63 funcs
.add(func
.toString());
67 public SchemaItem
mapInputSchema(SchemaItem input
){
68 SchemaItem inputToFunc
;
69 if (args
.columns
.size() == 1)
70 inputToFunc
= args
.mapInputSchema(input
).schemaFor(0).copy();
72 inputToFunc
= args
.mapInputSchema(input
);
74 SchemaItem output
= func
.outputSchema(inputToFunc
);
77 output
= new SchemaItemList();