Bringing tree up to date.
[galago.git] / java / pig-galago / src / com / yahoo / pig / builtin / BinStorage.java
blob6bfe492e15086b2e9dfab7d461b4fb39dca6c716
1 /*
2 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
3 * See accompanying LICENSE file.
4 */
5 package com.yahoo.pig.builtin;
7 import java.io.BufferedInputStream;
8 import java.io.BufferedOutputStream;
9 import java.io.DataInputStream;
10 import java.io.DataOutputStream;
11 import java.io.IOException;
12 import java.io.InputStream;
13 import java.io.OutputStream;
14 import java.util.Iterator;
16 import com.yahoo.pig.StorageFunc;
17 import com.yahoo.pig.PigServer;
18 import com.yahoo.pig.data.Tuple;
19 import com.yahoo.pig.impl.io.FileLocalizer;
20 import com.yahoo.pig.impl.io.InputStreamPosition;
22 public class BinStorage extends StorageFunc {
23 Iterator<Tuple> i = null;
24 DataInputStream in = null;
25 InputStreamPosition posInputStream = null;
26 InputStream fsis = null;
27 long end = Long.MAX_VALUE;
29 /**
30 * Simple binary nested reader format
32 public BinStorage() {
35 public Tuple getNext() throws IOException {
36 if (fsis == null || (posInputStream != null && posInputStream.getPosition() > end)) {
37 return null;
39 byte b = 0;
40 // skip to next record
41 while (true) {
42 b = (byte) in.read();
43 if(b != Tuple.RECORD_1 && b != -1) {
44 continue;
46 if(b == -1) return null;
47 b = (byte) in.read();
48 if(b != Tuple.RECORD_2 && b != -1) {
49 continue;
51 if(b == -1) return null;
52 b = (byte) in.read();
53 if(b != Tuple.RECORD_3 && b != -1) {
54 continue;
56 if(b == -1) return null;
57 break;
59 Tuple t = new Tuple();
60 t.readFields(in);
61 return t;
64 public void bindTo(InputStream is, long offset, long end) throws IOException {
65 this.posInputStream = new InputStreamPosition(is, offset);
66 this.in = new DataInputStream(new BufferedInputStream(posInputStream));
67 this.end = end;
68 this.fsis = is;
70 // Since we are not block aligned we throw away the first
71 // record and count on a different instance to read it
72 if (offset != 0) {
73 getNext();
78 DataOutputStream out = null;
80 @Override
81 public void bindTo(OutputStream os) throws IOException {
82 this.out = new DataOutputStream(new BufferedOutputStream(os));
85 @Override
86 public void done() throws IOException {
87 out.flush();
90 @Override
91 public void putNext(Tuple t) throws IOException {
92 out.write(Tuple.RECORD_1);
93 out.write(Tuple.RECORD_2);
94 out.write(Tuple.RECORD_3);
95 t.write(out);