Initial import into git.
[galago.git] / java / pig-galago / src / com / yahoo / pig / builtin / ShellBagEvalFunc.java
blob3f04864ace9ce2947210009a75c52d733b3479a0
1 /*
2 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
3 * See accompanying LICENSE file.
4 */
5 package com.yahoo.pig.builtin;
7 import java.io.ByteArrayOutputStream;
8 import java.io.IOException;
9 import java.io.InputStream;
10 import java.io.OutputStream;
11 import java.util.Queue;
12 import java.util.concurrent.LinkedBlockingQueue;
14 import com.yahoo.pig.BagEvalFunc;
15 import com.yahoo.pig.data.DataCollector;
16 import com.yahoo.pig.data.Tuple;
18 public class ShellBagEvalFunc extends BagEvalFunc {
19 byte groupDelim = '\n';
20 byte recordDelim = '\n';
21 byte fieldDelim = '\t';
22 String fieldDelimString = "\t";
23 OutputStream os;
24 InputStream is;
25 InputStream es;
26 String cmd;
28 LinkedBlockingQueue<DataCollector> bags = new LinkedBlockingQueue<DataCollector>();
31 public ShellBagEvalFunc(String cmd) {
32 this.cmd = cmd;
35 private class EndOfQueue extends DataCollector{
36 public void add(Tuple t){}
39 private void startProcess() throws IOException {
40 Process p = Runtime.getRuntime().exec(cmd);
41 is = p.getInputStream();
42 os = p.getOutputStream();
43 es = p.getErrorStream();
46 new Thread() {
47 public void run() {
48 byte b[] = new byte[256];
49 int rc;
50 try {
51 while((rc = es.read(b)) > 0) {
52 System.err.write(b, 0, rc);
54 } catch(Exception e) {
55 e.printStackTrace();
58 }.start();
61 new Thread() {
62 public void run() {
63 while(true){
64 DataCollector dc = bags.poll();
65 if (dc instanceof EndOfQueue)
66 break;
67 try {
68 readBag(dc);
69 } catch (IOException e) {
70 e.printStackTrace();
74 }.start();
77 @Override
78 public void exec(Tuple input, DataCollector output) throws IOException {
79 if (os == null) {
80 startProcess();
82 os.write(input.toDelimitedString(fieldDelimString).getBytes());
83 os.write(recordDelim);
84 os.write(groupDelim);
85 os.flush();
86 try{
87 bags.put(output);
88 }catch(InterruptedException e){}
90 //Since returning before ensuring that output is present
91 output.markStale(true);
95 public void finish(){
96 try{
97 os.close();
98 try{
99 bags.put(new EndOfQueue());
100 }catch(InterruptedException e){}
101 }catch(IOException e){
102 e.printStackTrace();
106 private void readBag(DataCollector output) throws IOException {
107 ByteArrayOutputStream baos = new ByteArrayOutputStream();
108 boolean inRecord = false;
109 int c;
110 while((c = is.read()) != -1) {
111 System.out.print(((char)c));
112 if ((inRecord == false) && (c == groupDelim)) {
113 output.add(null);
114 output.markStale(false);
115 return;
117 inRecord = true;
118 if (c == recordDelim) {
119 inRecord = false;
120 Tuple t = new Tuple(baos.toString(), fieldDelimString);
121 System.err.println(Thread.currentThread().getName() + ": Adding tuple " + t + " to collector " + output);
122 output.add(t);
123 baos = new ByteArrayOutputStream();
124 continue;
126 baos.write(c);