/*
 * Copyright (c) 2007 Yahoo! Inc. All rights reserved.
 * See accompanying LICENSE file.
 */
5 package com
.yahoo
.pig
.impl
.io
;
7 import java
.io
.FileInputStream
;
8 import java
.io
.FileNotFoundException
;
9 import java
.io
.FileOutputStream
;
10 import java
.io
.IOException
;
11 import java
.io
.InputStream
;
12 import java
.io
.OutputStream
;
13 import java
.util
.Random
;
14 import java
.util
.Stack
;
16 import org
.apache
.hadoop
.conf
.Configuration
;
17 import org
.apache
.hadoop
.fs
.FileSystem
;
18 import org
.apache
.hadoop
.fs
.FileUtil
;
19 import org
.apache
.hadoop
.fs
.Path
;
21 import com
.yahoo
.pig
.PigServer
.ExecType
;
22 import com
.yahoo
.pig
.impl
.mapreduceExec
.MapReduceLauncher
;
24 public class FileLocalizer
{
25 static public final String LOCAL_PREFIX
= "file:";
26 static public final String HADOOP_PREFIX
= "hadoop:";
28 static FileSystem
getDfs() throws IOException
{
29 return MapReduceLauncher
.getDfs();
32 static FileSystem
getLfs() throws IOException
{
33 return MapReduceLauncher
.getLfs();
36 static Configuration
getConf() throws IOException
{
37 return MapReduceLauncher
.getConf();
// Presents a sequence of DFS files as one continuous InputStream: member
// files are opened lazily via getDfs() and read in array order.
// NOTE(review): the extraction dropped interior lines of this class (field
// declarations, several statements and closing braces); the comments below
// describe only the code that is visible here.
40 static class DFSInputStreamIterator
extends InputStream
{
// Takes the DFS paths to concatenate, in order. (Constructor body elided in
// this extraction — presumably just stores `paths`; confirm against source.)
45 public DFSInputStreamIterator(Path paths
[]) {
// True once every member file is exhausted; otherwise guarantees `current`
// is an open stream, advancing `currentPath` lazily to the next file.
49 private boolean isEOF() throws IOException
{
50 if (current
== null) {
// All paths consumed -> end of the concatenated stream. (The statement for
// this branch is elided here.)
51 if (currentPath
== paths
.length
)
// Open the next member file and advance the cursor (post-increment).
53 current
= getDfs().open(paths
[currentPath
++]);
// Finishes the current member so isEOF() opens the next one on demand.
// (Body elided in this extraction — confirm against source.)
58 private void doNext() throws IOException
{
// Single-byte read that transparently crosses member-file boundaries.
63 public int read() throws IOException
{
65 int rc
= current
.read();
// Bytes readable without blocking — from the currently open member only.
73 public int available() throws IOException
{
76 return current
.available();
// Close the active member and mark the iterator exhausted.
79 public void close() throws IOException
{
80 if (current
!= null) {
// Setting the cursor to paths.length makes isEOF() report true forever.
84 currentPath
= paths
.length
;
// Bulk read; visible loop keeps pulling from following files so a request
// spanning a member boundary is not cut short at one file's EOF.
87 public int read(byte[] b
, int off
, int len
) throws IOException
{
89 while (!isEOF() && len
> 0) {
90 int rc
= current
.read(b
, off
, len
);
// -1 only when nothing was read AND all members are exhausted; otherwise a
// zero count is reported as 0, per the InputStream.read contract.
99 return count
== 0 ?
(isEOF() ?
-1 : 0) : count
;
// Convenience overload: fill the whole buffer from offset 0.
102 public int read(byte[] b
) throws IOException
{
103 return read(b
, 0, b
.length
);
// Skip that crosses member boundaries, delegating to the active stream.
106 public long skip(long n
) throws IOException
{
107 while (!isEOF() && n
> 0) {
108 n
-= current
.skip(n
);
115 static String
checkDefaultPrefix(ExecType execType
, String fileSpec
) {
116 if (fileSpec
.startsWith(HADOOP_PREFIX
) || fileSpec
.startsWith(LOCAL_PREFIX
))
118 return (execType
== ExecType
.LOCAL ? LOCAL_PREFIX
: HADOOP_PREFIX
) + fileSpec
;
// Opens a file spec for reading in either DFS or local mode. A DFS path that
// is a directory is opened as the concatenation of its listed children.
// NOTE(review): the extraction dropped interior lines (original 124, 131,
// 134, 136+) — likely an init() call, closing braces, and the final
// fall-through for an unrecognized prefix; confirm against source.
121 static public InputStream
open(ExecType execType
, String fileSpec
) throws IOException
{
// Normalize: guarantee the spec carries a scheme prefix for the mode.
122 fileSpec
= checkDefaultPrefix(execType
, fileSpec
);
123 if (fileSpec
.startsWith(HADOOP_PREFIX
)) {
// Strip the "hadoop:" prefix before handing the bare path to the DFS.
125 fileSpec
= fileSpec
.substring(HADOOP_PREFIX
.length());
126 Path path
= new Path(fileSpec
);
// Plain file: open directly.
127 if (getDfs().isFile(path
))
128 return getDfs().open(path
);
// Directory: stream all contained paths back-to-back.
129 Path paths
[] = getDfs().listPaths(path
);
130 return new DFSInputStreamIterator(paths
);
132 if (fileSpec
.startsWith(LOCAL_PREFIX
)) {
// Strip the "file:" prefix and read from the local file system.
133 fileSpec
= fileSpec
.substring(LOCAL_PREFIX
.length());
135 return new FileInputStream(fileSpec
);
139 static public OutputStream
create(ExecType execType
, String fileSpec
) throws IOException
{
140 return create(execType
,fileSpec
,false);
// Opens a file spec for writing in either DFS or local mode.
// NOTE(review): the extraction dropped interior lines (original 146, 149,
// 152, 154+) — likely an init() call, closing braces, and the final
// fall-through for an unrecognized prefix; confirm against source. Also note
// the visible DFS branch ignores `append` — only the local branch honors it.
143 static public OutputStream
create(ExecType execType
, String fileSpec
, boolean append
) throws IOException
{
// Normalize: guarantee the spec carries a scheme prefix for the mode.
144 fileSpec
= checkDefaultPrefix(execType
, fileSpec
);
145 if (fileSpec
.startsWith(HADOOP_PREFIX
)) {
// Strip the "hadoop:" prefix before handing the bare path to the DFS.
147 fileSpec
= fileSpec
.substring(HADOOP_PREFIX
.length());
// DFS create always starts a fresh file here; `append` is not consulted.
148 return getDfs().create(new Path(fileSpec
));
150 if (fileSpec
.startsWith(LOCAL_PREFIX
)) {
// Strip the "file:" prefix and write to the local file system, honoring
// the append flag.
151 fileSpec
= fileSpec
.substring(LOCAL_PREFIX
.length());
153 return new FileOutputStream(fileSpec
,append
);
158 static Stack
<Path
> toDelete
= new Stack
<Path
>();
159 static Random r
= new Random();
160 static Path relativeRoot
;
161 static boolean initialized
= false;
// One-time setup of the session's temporary-path root plus a JVM shutdown
// hook that deletes everything queued in toDelete.
// NOTE(review): the extraction dropped interior lines (original 163-164,
// 168, 170, 174+) — presumably the `initialized` guard/assignment, the
// hook's run() signature, a try opener, and closing braces; confirm.
162 static private void init() {
// r.nextInt() may be negative, so the root can look like "/tmp/temp-123".
165 relativeRoot
= new Path("/tmp/temp" + r
.nextInt());
// The root itself is queued for deletion along with any paths made under it.
166 toDelete
.push(relativeRoot
);
// Best-effort cleanup at JVM exit: pop and delete every queued path.
167 Runtime
.getRuntime().addShutdownHook(new Thread() {
169 while (!toDelete
.empty()) {
171 Path path
= toDelete
.pop();
172 getDfs().delete(path
);
// Handler body elided in this extraction — can't tell whether the
// IOException is logged or swallowed; confirm against source.
173 } catch (IOException e
) {
// Allocates a fresh temporary DFS path, either under the given directory or
// (when relative is null) under the session's relativeRoot.
// NOTE(review): the extraction dropped lines (original 183, 186, 190-192) —
// likely an init() call, a closing brace, and the tail that registers the
// new path in toDelete and returns it; confirm against source.
182 public static synchronized Path
getTemporaryPath(Path relative
) throws IOException
{
// No base supplied: fall back to the session temp root.
184 if (relative
== null) {
185 relative
= relativeRoot
;
// Lazily create the temp root directory on the DFS.
187 if (!getDfs().exists(relativeRoot
))
188 getDfs().mkdirs(relativeRoot
);
// Unique name via a random suffix (may be negative, e.g. "tmp-42").
189 Path path
= new Path(relative
, "tmp" + r
.nextInt());
// Copies a local file up to a temporary DFS path (preserving the file's
// extension so downstream code can recognize it) and returns the DFS path.
// NOTE(review): the extraction dropped lines (original 197, 201, 206, 208,
// 210, 212-213) — closing braces and, before the throw, almost certainly an
// `if (!success)` guard; confirm against source.
194 public static String
hadoopify(String filename
) throws IOException
{
// Accept an optional "file:" prefix on the local name.
195 if (filename
.startsWith(LOCAL_PREFIX
)) {
196 filename
= filename
.substring(LOCAL_PREFIX
.length());
// Fail fast if the local source does not exist.
198 Path localPath
= new Path(filename
);
199 if (!getLfs().exists(localPath
)) {
200 throw new FileNotFoundException(filename
);
// Fresh temp destination under the session root.
202 Path newPath
= getTemporaryPath(null);
// Carry the source's extension (everything from the last '.') onto the
// temp name so file-type detection still works.
203 int suffixStart
= filename
.lastIndexOf('.');
204 if (suffixStart
!= -1) {
205 newPath
= newPath
.suffix(filename
.substring(suffixStart
));
// Copy local -> DFS without deleting the source (deleteSource = false).
207 boolean success
= FileUtil
.copy(getLfs(), new Path(filename
), getDfs(), newPath
, false, getConf());
// Reached only on failure (guard elided in this extraction).
209 throw new IOException("Couldn't copy " + filename
);
211 return newPath
.toString();
// Resolves a file name to an absolute DFS path: names not starting with '/'
// are taken relative to the DFS working directory.
// NOTE(review): this method is truncated at the end of the visible chunk —
// the absolute-path branch is not shown. Also, charAt(0) will throw
// StringIndexOutOfBoundsException on an empty filename; confirm callers
// never pass "".
214 public static String
fullPath(String filename
) throws IOException
{
215 if (filename
.charAt(0) != '/') {
216 return new Path(getDfs().getWorkingDirectory(), filename
).toString();