Initial import into git.
[galago.git] / java / pig-galago / src / com / yahoo / pig / impl / io / FileLocalizer.java
blobbe8921158ad33684b33d995e53c524674428a366
/*
 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
 * See accompanying LICENSE file.
 */
5 package com.yahoo.pig.impl.io;
7 import java.io.FileInputStream;
8 import java.io.FileNotFoundException;
9 import java.io.FileOutputStream;
10 import java.io.IOException;
11 import java.io.InputStream;
12 import java.io.OutputStream;
13 import java.util.Random;
14 import java.util.Stack;
16 import org.apache.hadoop.conf.Configuration;
17 import org.apache.hadoop.fs.FileSystem;
18 import org.apache.hadoop.fs.FileUtil;
19 import org.apache.hadoop.fs.Path;
21 import com.yahoo.pig.PigServer.ExecType;
22 import com.yahoo.pig.impl.mapreduceExec.MapReduceLauncher;
24 public class FileLocalizer {
25 static public final String LOCAL_PREFIX = "file:";
26 static public final String HADOOP_PREFIX = "hadoop:";
28 static FileSystem getDfs() throws IOException {
29 return MapReduceLauncher.getDfs();
32 static FileSystem getLfs() throws IOException {
33 return MapReduceLauncher.getLfs();
36 static Configuration getConf() throws IOException {
37 return MapReduceLauncher.getConf();
40 static class DFSInputStreamIterator extends InputStream {
41 InputStream current;
42 Path paths[];
43 int currentPath;
45 public DFSInputStreamIterator(Path paths[]) {
46 this.paths = paths;
49 private boolean isEOF() throws IOException {
50 if (current == null) {
51 if (currentPath == paths.length)
52 return true;
53 current = getDfs().open(paths[currentPath++]);
55 return false;
58 private void doNext() throws IOException {
59 current.close();
60 current = null;
63 public int read() throws IOException {
64 while (!isEOF()) {
65 int rc = current.read();
66 if (rc != -1)
67 return rc;
68 doNext();
70 return -1;
73 public int available() throws IOException {
74 if (isEOF())
75 return 0;
76 return current.available();
79 public void close() throws IOException {
80 if (current != null) {
81 current.close();
82 current = null;
84 currentPath = paths.length;
87 public int read(byte[] b, int off, int len) throws IOException {
88 int count = 0;
89 while (!isEOF() && len > 0) {
90 int rc = current.read(b, off, len);
91 if (rc <= 0) {
92 doNext();
93 continue;
95 off += rc;
96 len -= rc;
97 count += rc;
99 return count == 0 ? (isEOF() ? -1 : 0) : count;
102 public int read(byte[] b) throws IOException {
103 return read(b, 0, b.length);
106 public long skip(long n) throws IOException {
107 while (!isEOF() && n > 0) {
108 n -= current.skip(n);
110 return n;
115 static String checkDefaultPrefix(ExecType execType, String fileSpec) {
116 if (fileSpec.startsWith(HADOOP_PREFIX) || fileSpec.startsWith(LOCAL_PREFIX))
117 return fileSpec;
118 return (execType == ExecType.LOCAL ? LOCAL_PREFIX : HADOOP_PREFIX) + fileSpec;
121 static public InputStream open(ExecType execType, String fileSpec) throws IOException {
122 fileSpec = checkDefaultPrefix(execType, fileSpec);
123 if (fileSpec.startsWith(HADOOP_PREFIX)) {
124 init();
125 fileSpec = fileSpec.substring(HADOOP_PREFIX.length());
126 Path path = new Path(fileSpec);
127 if (getDfs().isFile(path))
128 return getDfs().open(path);
129 Path paths[] = getDfs().listPaths(path);
130 return new DFSInputStreamIterator(paths);
131 } else {
132 if (fileSpec.startsWith(LOCAL_PREFIX)) {
133 fileSpec = fileSpec.substring(LOCAL_PREFIX.length());
135 return new FileInputStream(fileSpec);
139 static public OutputStream create(ExecType execType, String fileSpec) throws IOException{
140 return create(execType,fileSpec,false);
143 static public OutputStream create(ExecType execType, String fileSpec, boolean append) throws IOException {
144 fileSpec = checkDefaultPrefix(execType, fileSpec);
145 if (fileSpec.startsWith(HADOOP_PREFIX)) {
146 init();
147 fileSpec = fileSpec.substring(HADOOP_PREFIX.length());
148 return getDfs().create(new Path(fileSpec));
149 } else {
150 if (fileSpec.startsWith(LOCAL_PREFIX)) {
151 fileSpec = fileSpec.substring(LOCAL_PREFIX.length());
153 return new FileOutputStream(fileSpec,append);
158 static Stack<Path> toDelete = new Stack<Path>();
159 static Random r = new Random();
160 static Path relativeRoot;
161 static boolean initialized = false;
162 static private void init() {
163 if (!initialized) {
164 initialized = true;
165 relativeRoot = new Path("/tmp/temp" + r.nextInt());
166 toDelete.push(relativeRoot);
167 Runtime.getRuntime().addShutdownHook(new Thread() {
168 public void run() {
169 while (!toDelete.empty()) {
170 try {
171 Path path = toDelete.pop();
172 getDfs().delete(path);
173 } catch (IOException e) {
174 e.printStackTrace();
182 public static synchronized Path getTemporaryPath(Path relative) throws IOException {
183 init();
184 if (relative == null) {
185 relative = relativeRoot;
187 if (!getDfs().exists(relativeRoot))
188 getDfs().mkdirs(relativeRoot);
189 Path path = new Path(relative, "tmp" + r.nextInt());
190 toDelete.push(path);
191 return path;
194 public static String hadoopify(String filename) throws IOException {
195 if (filename.startsWith(LOCAL_PREFIX)) {
196 filename = filename.substring(LOCAL_PREFIX.length());
198 Path localPath = new Path(filename);
199 if (!getLfs().exists(localPath)) {
200 throw new FileNotFoundException(filename);
202 Path newPath = getTemporaryPath(null);
203 int suffixStart = filename.lastIndexOf('.');
204 if (suffixStart != -1) {
205 newPath = newPath.suffix(filename.substring(suffixStart));
207 boolean success = FileUtil.copy(getLfs(), new Path(filename), getDfs(), newPath, false, getConf());
208 if (!success) {
209 throw new IOException("Couldn't copy " + filename);
211 return newPath.toString();
214 public static String fullPath(String filename) throws IOException {
215 if (filename.charAt(0) != '/') {
216 return new Path(getDfs().getWorkingDirectory(), filename).toString();
218 return filename;