2 # Copyright (c) 2012 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
6 """Utility script for emulating common unix commands."""
8 from __future__
import print_function
21 if sys
.version_info
< (2, 7, 0):
22 sys
.stderr
.write("python 2.7 or later is required run this script\n")
26 def IncludeFiles(filters
, files
):
27 """Filter files based on inclusion lists
29 Return a list of files which match and of the Unix shell-style wildcards
30 provided, or return all the files if no filter is provided.
35 for file_filter
in filters
:
36 match |
= set(fnmatch
.filter(files
, file_filter
))
37 return [name
for name
in files
if name
in match
]
40 def ExcludeFiles(filters
, files
):
41 """Filter files based on exclusions lists
43 Return a list of files which do not match any of the Unix shell-style
44 wildcards provided, or return all the files if no filter is provided.
49 for file_filter
in filters
:
50 excludes
= set(fnmatch
.filter(files
, file_filter
))
52 return [name
for name
in files
if name
not in match
]
55 def CopyPath(options
, src
, dst
):
56 """CopyPath from src to dst
58 Copy a fully specified src to a fully specified dst. If src and dst are
59 both files, the dst file is removed first to prevent error. If and include
60 or exclude list are provided, the destination is first matched against that
64 if not IncludeFiles(options
.includes
, [src
]):
68 if not ExcludeFiles(options
.excludes
, [src
]):
72 print('cp %s %s' % (src
, dst
))
74 # If the source is a single file, copy it individually
75 if os
.path
.isfile(src
):
76 # We can not copy over a directory with a file.
77 if os
.path
.exists(dst
):
78 if not os
.path
.isfile(dst
):
79 msg
= "cp: cannot overwrite non-file '%s' with file." % dst
81 # If the destination exists as a file, remove it before copying to avoid
85 # Now copy to the non-existent fully qualified target
89 # Otherwise it's a directory, ignore it unless allowed
90 if os
.path
.isdir(src
):
91 if not options
.recursive
:
92 print("cp: omitting directory '%s'" % src
)
95 # We can not copy over a file with a directory.
96 if os
.path
.exists(dst
):
97 if not os
.path
.isdir(dst
):
98 msg
= "cp: cannot overwrite non-directory '%s' with directory." % dst
101 # if it didn't exist, create the directory
104 # Now copy all members
105 for filename
in os
.listdir(src
):
106 srcfile
= os
.path
.join(src
, filename
)
107 dstfile
= os
.path
.join(dst
, filename
)
108 CopyPath(options
, srcfile
, dstfile
)
113 """A Unix cp style copy.
115 Copies multiple sources to a single destination using the normal cp
116 semantics. In addition, it support inclusion and exclusion filters which
117 allows the copy to skip certain types of files.
119 parser
= argparse
.ArgumentParser(usage
='cp [Options] sources... dest',
120 description
=Copy
.__doc
__)
122 '-R', '-r', '--recursive', dest
='recursive', action
='store_true',
124 help='copy directories recursively.')
126 '-v', '--verbose', dest
='verbose', action
='store_true',
128 help='verbose output.')
130 '--include', dest
='includes', action
='append', default
=[],
131 help='include files matching this expression.')
133 '--exclude', dest
='excludes', action
='append', default
=[],
134 help='exclude files matching this expression.')
135 parser
.add_argument('srcs', nargs
='+', help='files to copy')
136 parser
.add_argument('dest', help='destination')
138 options
= parser
.parse_args(args
)
141 for src
in options
.srcs
:
142 files
= glob
.glob(src
)
144 raise OSError('cp: no such file or directory: ' + src
)
146 src_list
.extend(files
)
149 # If the destination is a directory, then append the basename of the src
150 # to the destination.
151 if os
.path
.isdir(options
.dest
):
152 CopyPath(options
, src
, os
.path
.join(options
.dest
, os
.path
.basename(src
)))
154 CopyPath(options
, src
, options
.dest
)
158 """A Unix style mkdir."""
159 parser
= argparse
.ArgumentParser(usage
='mkdir [Options] DIRECTORY...',
160 description
=Mkdir
.__doc
__)
162 '-p', '--parents', dest
='parents', action
='store_true',
164 help='ignore existing parents, create parents as needed.')
166 '-v', '--verbose', dest
='verbose', action
='store_true',
168 help='verbose output.')
169 parser
.add_argument('dirs', nargs
='+', help='directory(s) to create')
171 options
= parser
.parse_args(args
)
173 for dst
in options
.dirs
:
175 print('mkdir %s' % dst
)
179 if os
.path
.isdir(dst
):
182 raise OSError('mkdir: Already exists: ' + dst
)
184 raise OSError('mkdir: Failed to create: ' + dst
)
188 def MovePath(options
, src
, dst
):
189 """MovePath from src to dst.
191 Moves the src to the dst much like the Unix style mv command, except it
192 only handles one source at a time. Because of possible temporary failures
193 do to locks (such as anti-virus software on Windows), the function will retry
196 # if the destination is not an existing directory, then overwrite it
197 if os
.path
.isdir(dst
):
198 dst
= os
.path
.join(dst
, os
.path
.basename(src
))
200 # If the destination exists, the remove it
201 if os
.path
.exists(dst
):
203 Remove(['-vfr', dst
])
204 if os
.path
.exists(dst
):
205 raise OSError('mv: FAILED TO REMOVE ' + dst
)
207 raise OSError('mv: already exists ' + dst
)
212 except OSError as error
:
213 print('Failed on %s with %s, retrying' % (src
, error
))
217 raise OSError('mv: ' + error
)
221 """A Unix style mv."""
223 parser
= argparse
.ArgumentParser(usage
='mv [Options] sources... dest',
224 description
=Move
.__doc
__)
226 '-v', '--verbose', dest
='verbose', action
='store_true',
228 help='verbose output.')
230 '-f', '--force', dest
='force', action
='store_true',
232 help='force, do not error it files already exist.')
233 parser
.add_argument('srcs', nargs
='+')
234 parser
.add_argument('dest')
236 options
= parser
.parse_args(args
)
239 print('mv %s %s' % (' '.join(options
.srcs
), options
.dest
))
241 for src
in options
.srcs
:
242 MovePath(options
, src
, options
.dest
)
249 Removes the list of paths. Because of possible temporary failures do to locks
250 (such as anti-virus software on Windows), the function will retry up to five
253 parser
= argparse
.ArgumentParser(usage
='rm [Options] PATHS...',
254 description
=Remove
.__doc
__)
256 '-R', '-r', '--recursive', dest
='recursive', action
='store_true',
258 help='remove directories recursively.')
260 '-v', '--verbose', dest
='verbose', action
='store_true',
262 help='verbose output.')
264 '-f', '--force', dest
='force', action
='store_true',
266 help='force, do not error it files does not exist.')
267 parser
.add_argument('files', nargs
='+')
268 options
= parser
.parse_args(args
)
271 for pattern
in options
.files
:
272 dst_files
= glob
.glob(pattern
)
274 # Ignore non existing files when using force
277 raise OSError('rm: no such file or directory: ' + pattern
)
279 for dst
in dst_files
:
283 if os
.path
.isfile(dst
) or os
.path
.islink(dst
):
286 # Check every time, since it may have been deleted after the
287 # previous failed attempt.
288 if os
.path
.isfile(dst
) or os
.path
.islink(dst
):
291 except OSError as error
:
292 print('Failed remove with %s, retrying' % error
)
296 raise OSError('rm: ' + str(error
))
298 if options
.recursive
:
301 if os
.path
.isdir(dst
):
304 except OSError as error
:
305 print('Failed rmtree with %s, retrying' % error
)
309 raise OSError('rm: ' + str(error
))
311 except OSError as error
:
317 def MakeZipPath(os_path
, isdir
, iswindows
):
318 """Changes a path into zipfile format.
320 # doctest doesn't seem to honor r'' strings, so the backslashes need to be
322 >>> MakeZipPath(r'C:\\users\\foobar\\blah', False, True)
324 >>> MakeZipPath('/tmp/tmpfoobar/something', False, False)
325 'tmp/tmpfoobar/something'
326 >>> MakeZipPath('./somefile.txt', False, False)
328 >>> MakeZipPath('somedir', True, False)
330 >>> MakeZipPath('../dir/filename.txt', False, False)
331 '../dir/filename.txt'
332 >>> MakeZipPath('dir/../filename.txt', False, False)
338 # zipfile paths are always posix-style. They also have the drive
339 # letter and leading slashes removed.
340 zip_path
= ntpath
.splitdrive(os_path
)[1].replace('\\', '/')
341 if zip_path
.startswith('/'):
342 zip_path
= zip_path
[1:]
343 zip_path
= posixpath
.normpath(zip_path
)
344 # zipfile also always appends a slash to a directory name.
350 def OSMakeZipPath(os_path
):
351 return MakeZipPath(os_path
, os
.path
.isdir(os_path
), sys
.platform
== 'win32')
357 Compresses the listed files.
359 parser
= argparse
.ArgumentParser(description
=Zip
.__doc
__)
361 '-r', dest
='recursive', action
='store_true',
363 help='recurse into directories')
365 '-q', dest
='quiet', action
='store_true',
367 help='quiet operation')
368 parser
.add_argument('zipfile')
369 parser
.add_argument('filenames', nargs
='+')
370 options
= parser
.parse_args(args
)
373 for filename
in options
.filenames
:
374 globbed_src_args
= glob
.glob(filename
)
375 if not globbed_src_args
:
376 if not options
.quiet
:
377 print('zip warning: name not matched: %s' % filename
)
379 for src_file
in globbed_src_args
:
380 src_file
= os
.path
.normpath(src_file
)
381 src_files
.append(src_file
)
382 if options
.recursive
and os
.path
.isdir(src_file
):
383 for root
, dirs
, files
in os
.walk(src_file
):
385 src_files
.append(os
.path
.join(root
, dirname
))
386 for filename
in files
:
387 src_files
.append(os
.path
.join(root
, filename
))
389 # zip_data represents a list of the data to be written or appended to the
390 # zip_stream. It is a list of tuples:
391 # (OS file path, zip path/zip file info, and file data)
392 # In all cases one of the |os path| or the |file data| will be None.
393 # |os path| is None when there is no OS file to write to the archive (i.e.
394 # the file data already existed in the archive). |file data| is None when the
395 # file is new (never existed in the archive) or being updated.
397 new_files_to_add
= [OSMakeZipPath(src_file
) for src_file
in src_files
]
398 zip_path_to_os_path_dict
= dict((new_files_to_add
[i
], src_files
[i
])
399 for i
in range(len(src_files
)))
401 if os
.path
.exists(options
.zipfile
):
402 with zipfile
.ZipFile(options
.zipfile
, 'r') as zip_stream
:
404 files_to_update
= set(new_files_to_add
).intersection(
405 set(zip_stream
.namelist()))
407 # As far as I can tell, there is no way to update a zip entry using
408 # zipfile; the best you can do is rewrite the archive.
409 # Iterate through the zipfile to maintain file order.
411 for zip_path
in zip_stream
.namelist():
412 if zip_path
in files_to_update
:
413 os_path
= zip_path_to_os_path_dict
[zip_path
]
414 zip_data
.append((os_path
, zip_path
, None))
415 new_files_to_add
.remove(zip_path
)
417 file_bytes
= zip_stream
.read(zip_path
)
418 file_info
= zip_stream
.getinfo(zip_path
)
419 zip_data
.append((None, file_info
, file_bytes
))
423 for zip_path
in new_files_to_add
:
424 zip_data
.append((zip_path_to_os_path_dict
[zip_path
], zip_path
, None))
427 print('zip error: Nothing to do! (%s)' % options
.zipfile
)
430 with zipfile
.ZipFile(options
.zipfile
, write_mode
,
431 zipfile
.ZIP_DEFLATED
) as zip_stream
:
432 for os_path
, file_info_or_zip_path
, file_bytes
in zip_data
:
433 if isinstance(file_info_or_zip_path
, zipfile
.ZipInfo
):
434 zip_path
= file_info_or_zip_path
.filename
436 zip_path
= file_info_or_zip_path
439 st
= os
.stat(os_path
)
440 if stat
.S_ISDIR(st
.st_mode
):
441 # Python 2.6 on the buildbots doesn't support writing directories to
442 # zip files. This was resolved in a later version of Python 2.6.
443 # We'll work around it by writing an empty file with the correct
444 # path. (This is basically what later versions do anyway.)
445 zip_info
= zipfile
.ZipInfo()
446 zip_info
.filename
= zip_path
447 zip_info
.date_time
= time
.localtime(st
.st_mtime
)[0:6]
448 zip_info
.compress_type
= zip_stream
.compression
449 zip_info
.flag_bits
= 0x00
450 zip_info
.external_attr
= (st
[0] & 0xFFFF) << 16L
452 zip_info
.compress_size
= 0
453 zip_info
.file_size
= 0
454 zip_stream
.writestr(zip_info
, '')
456 zip_stream
.write(os_path
, zip_path
)
458 zip_stream
.writestr(file_info_or_zip_path
, file_bytes
)
460 if not options
.quiet
:
461 if zip_path
in new_files_to_add
:
464 operation
= 'updating'
465 zip_info
= zip_stream
.getinfo(zip_path
)
466 if (zip_info
.compress_type
== zipfile
.ZIP_STORED
or
467 zip_info
.file_size
== 0):
468 print(' %s: %s (stored 0%%)' % (operation
, zip_path
))
469 elif zip_info
.compress_type
== zipfile
.ZIP_DEFLATED
:
470 print(' %s: %s (deflated %d%%)' % (operation
, zip_path
,
471 100 - zip_info
.compress_size
* 100 / zip_info
.file_size
))
476 def FindExeInPath(filename
):
477 env_path
= os
.environ
.get('PATH', '')
478 paths
= env_path
.split(os
.pathsep
)
480 def IsExecutableFile(path
):
481 return os
.path
.isfile(path
) and os
.access(path
, os
.X_OK
)
483 if os
.path
.sep
in filename
:
484 if IsExecutableFile(filename
):
488 filepath
= os
.path
.join(path
, filename
)
489 if IsExecutableFile(filepath
):
490 return os
.path
.abspath(os
.path
.join(path
, filename
))
494 """A Unix style which.
496 Looks for all arguments in the PATH environment variable, and prints their
497 path if they are executable files.
499 Note: If you pass an argument with a path to which, it will just test if it
500 is executable, not if it is in the path.
502 parser
= argparse
.ArgumentParser(description
=Which
.__doc
__)
503 parser
.add_argument('files', nargs
='+')
504 options
= parser
.parse_args(args
)
507 for filename
in options
.files
:
508 fullname
= FindExeInPath(filename
)
529 print('No command specified')
530 print('Available commands: %s' % ' '.join(FuncMap
))
533 func
= FuncMap
.get(func_name
)
535 print('Do not recognize command: %s' % func_name
)
536 print('Available commands: %s' % ' '.join(FuncMap
))
539 return func(args
[1:])
540 except KeyboardInterrupt:
541 print('%s: interrupted' % func_name
)
544 if __name__
== '__main__':
545 sys
.exit(main(sys
.argv
[1:]))