2 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
3 * See accompanying LICENSE file.
5 package com
.yahoo
.pig
.builtin
;
7 import java
.io
.ByteArrayOutputStream
;
8 import java
.io
.IOException
;
9 import java
.io
.InputStream
;
10 import java
.io
.OutputStream
;
11 import java
.util
.Queue
;
12 import java
.util
.concurrent
.LinkedBlockingQueue
;
14 import com
.yahoo
.pig
.BagEvalFunc
;
15 import com
.yahoo
.pig
.data
.DataCollector
;
16 import com
.yahoo
.pig
.data
.Tuple
;
18 public class ShellBagEvalFunc
extends BagEvalFunc
{
// Byte that readBag compares against each byte read from the child process
// while no record is in progress. Note that it has the same value as
// recordDelim ('\n').
19 byte groupDelim
= '\n';
// Byte written to the child process after each serialized input tuple in
// exec; readBag also tests incoming bytes against it before building a Tuple
// from the bytes buffered so far.
20 byte recordDelim
= '\n';
// Field separator as a single byte. No use of this field is visible in this
// part of the file -- NOTE(review): confirm whether it is still needed.
21 byte fieldDelim
= '\t';
// Field separator as a String: passed to Tuple.toDelimitedString when input
// is written to the process, and to the Tuple constructor when process output
// is parsed back into tuples.
22 String fieldDelimString
= "\t";
// Queue of DataCollectors. An EndOfQueue instance is put on it, and polled
// entries are checked with instanceof EndOfQueue.
28 LinkedBlockingQueue
<DataCollector
> bags
= new LinkedBlockingQueue
<DataCollector
>();
31 public ShellBagEvalFunc(String cmd
) {
35 private class EndOfQueue
extends DataCollector
{
// No-op: any tuple passed in is discarded. EndOfQueue instances are
// recognized via instanceof when polled from the bags queue.
36 public void add(Tuple t
){}
39 private void startProcess() throws IOException
{
40 Process p
= Runtime
.getRuntime().exec(cmd
);
41 is
= p
.getInputStream();
42 os
= p
.getOutputStream();
43 es
= p
.getErrorStream();
48 byte b[] = new byte[256];
51 while((rc = es.read(b)) > 0) {
52 System.err.write(b, 0, rc);
54 } catch(Exception e) {
64 DataCollector dc
= bags
.poll();
65 if (dc
instanceof EndOfQueue
)
69 } catch (IOException e
) {
78 public void exec(Tuple input
, DataCollector output
) throws IOException
{
82 os
.write(input
.toDelimitedString(fieldDelimString
).getBytes());
83 os
.write(recordDelim
);
88 }catch(InterruptedException e
){}
90 //Since returning before ensuring that output is present
91 output
.markStale(true);
99 bags
.put(new EndOfQueue());
100 }catch(InterruptedException e
){}
101 }catch(IOException e
){
106 private void readBag(DataCollector output
) throws IOException
{
107 ByteArrayOutputStream baos
= new ByteArrayOutputStream();
108 boolean inRecord
= false;
110 while((c
= is
.read()) != -1) {
111 System
.out
.print(((char)c
));
112 if ((inRecord
== false) && (c
== groupDelim
)) {
114 output
.markStale(false);
118 if (c
== recordDelim
) {
120 Tuple t
= new Tuple(baos
.toString(), fieldDelimString
);
121 System
.err
.println(Thread
.currentThread().getName() + ": Adding tuple " + t
+ " to collector " + output
);
123 baos
= new ByteArrayOutputStream();