2 # Copyright (c) 2012 The Chromium Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
6 """Utility script for emulating common unix commands."""
8 from __future__
import print_function
22 if sys
.version_info
< (2, 7, 0):
23 sys
.stderr
.write("python 2.7 or later is required run this script\n")
def IncludeFiles(filters, files):
    """Filter files based on inclusion lists.

    Return a list of files which match any of the Unix shell-style wildcards
    provided, or return all the files if no filter is provided.

    Args:
        filters: Sequence of fnmatch-style patterns; may be empty.
        files: List of file names to filter.

    Returns:
        `files` itself when `filters` is empty; otherwise a new list holding
        the names that match at least one pattern, in their original order.
    """
    if not filters:
        return files

    match = set()
    for file_filter in filters:
        match.update(fnmatch.filter(files, file_filter))
    # Walk `files` rather than `match` so the caller's ordering is preserved.
    return [name for name in files if name in match]
def ExcludeFiles(filters, files):
    """Filter files based on exclusion lists.

    Return a list of files which do not match any of the Unix shell-style
    wildcards provided, or return all the files if no filter is provided.

    Args:
        filters: Sequence of fnmatch-style patterns; may be empty.
        files: List of file names to filter.

    Returns:
        `files` itself when `filters` is empty; otherwise a new list holding
        the names that match none of the patterns, in their original order.
    """
    if not filters:
        return files

    match = set()
    for file_filter in filters:
        excludes = set(fnmatch.filter(files, file_filter))
        # Accumulate across patterns: a name is dropped if ANY pattern hits.
        match |= excludes
    return [name for name in files if name not in match]
def CopyPath(options, src, dst):
    """CopyPath from src to dst.

    Copy a fully specified src to a fully specified dst. If src and dst are
    both files, the dst file is removed first to prevent error. If an include
    or exclude list is provided, src is first matched against that filter and
    skipped when it is rejected.

    Args:
        options: Object exposing `includes` and `excludes` (pattern lists),
            `recursive` and `verbose` (booleans).
        src: Path of the file or directory to copy.
        dst: Full destination path (not the containing directory).

    Raises:
        OSError: If a file would overwrite a non-file, or a directory would
            overwrite a non-directory.
    """
    import shutil

    # A path rejected by either filter is skipped silently.
    if options.includes and not IncludeFiles(options.includes, [src]):
        return
    if options.excludes and not ExcludeFiles(options.excludes, [src]):
        return

    # NOTE(review): the guard on this print was not visible in the reviewed
    # text; gating on the parser's --verbose flag is assumed - confirm.
    if options.verbose:
        print('cp %s %s' % (src, dst))

    # If the source is a single file, copy it individually
    if os.path.isfile(src):
        # We can not copy over a directory with a file.
        if os.path.exists(dst):
            if not os.path.isfile(dst):
                msg = "cp: cannot overwrite non-file '%s' with file." % dst
                raise OSError(msg)
            # If the destination exists as a file, remove it before copying to
            # avoid errors when overwriting it.
            os.remove(dst)

        # Now copy to the non-existent fully qualified target
        # NOTE(review): the exact copy call was not visible in the reviewed
        # text; shutil.copy (contents + permission bits) is assumed - confirm.
        shutil.copy(src, dst)
        return

    # Otherwise it's a directory, ignore it unless allowed
    if os.path.isdir(src):
        if not options.recursive:
            print("cp: omitting directory '%s'" % src)
            return

        # We can not copy over a file with a directory.
        if os.path.exists(dst):
            if not os.path.isdir(dst):
                msg = ("cp: cannot overwrite non-directory '%s' with "
                       "directory." % dst)
                raise OSError(msg)
        else:
            # if it didn't exist, create the directory
            os.makedirs(dst)

        # Now copy all members
        for filename in os.listdir(src):
            srcfile = os.path.join(src, filename)
            dstfile = os.path.join(dst, filename)
            CopyPath(options, srcfile, dstfile)
114 """A Unix cp style copy.
116 Copies multiple sources to a single destination using the normal cp
117 semantics. In addition, it support inclusion and exclusion filters which
118 allows the copy to skip certain types of files.
120 parser
= argparse
.ArgumentParser(usage
='cp [Options] sources... dest',
121 description
=Copy
.__doc
__)
123 '-R', '-r', '--recursive', dest
='recursive', action
='store_true',
125 help='copy directories recursively.')
127 '-v', '--verbose', dest
='verbose', action
='store_true',
129 help='verbose output.')
131 '--include', dest
='includes', action
='append', default
=[],
132 help='include files matching this expression.')
134 '--exclude', dest
='excludes', action
='append', default
=[],
135 help='exclude files matching this expression.')
136 parser
.add_argument('srcs', nargs
='+', help='files to copy')
137 parser
.add_argument('dest', help='destination')
139 options
= parser
.parse_args(args
)
142 for src
in options
.srcs
:
143 files
= glob
.glob(src
)
145 raise OSError('cp: no such file or directory: ' + src
)
147 src_list
.extend(files
)
150 # If the destination is a directory, then append the basename of the src
151 # to the destination.
152 if os
.path
.isdir(options
.dest
):
153 CopyPath(options
, src
, os
.path
.join(options
.dest
, os
.path
.basename(src
)))
155 CopyPath(options
, src
, options
.dest
)
159 """A Unix style mkdir."""
160 parser
= argparse
.ArgumentParser(usage
='mkdir [Options] DIRECTORY...',
161 description
=Mkdir
.__doc
__)
163 '-p', '--parents', dest
='parents', action
='store_true',
165 help='ignore existing parents, create parents as needed.')
167 '-v', '--verbose', dest
='verbose', action
='store_true',
169 help='verbose output.')
170 parser
.add_argument('dirs', nargs
='+', help='directory(s) to create')
172 options
= parser
.parse_args(args
)
174 for dst
in options
.dirs
:
176 print('mkdir %s' % dst
)
180 if os
.path
.isdir(dst
):
183 raise OSError('mkdir: Already exists: ' + dst
)
185 raise OSError('mkdir: Failed to create: ' + dst
)
def MovePath(options, src, dst, retries=5, retry_delay=5):
    """MovePath from src to dst.

    Moves the src to the dst much like the Unix style mv command, except it
    only handles one source at a time. Because of possible temporary failures
    due to locks (such as anti-virus software on Windows), the function will
    retry a failed move several times before giving up.

    Args:
        options: Object exposing a boolean `force` attribute; when true an
            existing destination is removed before the move.
        src: Path to move.
        dst: Destination path, or an existing directory to move src into.
        retries: Maximum number of move attempts.
        retry_delay: Seconds to wait after a failed attempt.
            NOTE(review): the delay used originally was not visible in the
            reviewed text; 5 seconds is assumed - confirm.

    Raises:
        OSError: If the destination exists and `force` is not set, if the
            destination could not be removed, or if every attempt failed.
    """
    import time

    # If the destination is an existing directory, move src inside it;
    # otherwise dst names the final path and may be overwritten below.
    if os.path.isdir(dst):
        dst = os.path.join(dst, os.path.basename(src))

    # If the destination exists, then remove it
    if os.path.exists(dst):
        if not options.force:
            raise OSError('mv: already exists ' + dst)
        Remove(['-vfr', dst])
        if os.path.exists(dst):
            raise OSError('mv: FAILED TO REMOVE ' + dst)

    last_error = None
    for _ in range(retries):
        try:
            # NOTE(review): the move call itself was not visible in the
            # reviewed text; os.rename is assumed - confirm.
            os.rename(src, dst)
            return
        except OSError as error:
            # Keep our own reference: Python 3 unbinds `error` as soon as the
            # except block ends, so it cannot be used after the loop.
            last_error = error
            print('Failed on %s with %s, retrying' % (src, error))
            time.sleep(retry_delay)

    # str() is required: concatenating a str with an exception object raises
    # TypeError and would mask the real failure.
    raise OSError('mv: ' + str(last_error))
222 """A Unix style mv."""
224 parser
= argparse
.ArgumentParser(usage
='mv [Options] sources... dest',
225 description
=Move
.__doc
__)
227 '-v', '--verbose', dest
='verbose', action
='store_true',
229 help='verbose output.')
231 '-f', '--force', dest
='force', action
='store_true',
233 help='force, do not error it files already exist.')
234 parser
.add_argument('srcs', nargs
='+')
235 parser
.add_argument('dest')
237 options
= parser
.parse_args(args
)
240 print('mv %s %s' % (' '.join(options
.srcs
), options
.dest
))
242 for src
in options
.srcs
:
243 MovePath(options
, src
, options
.dest
)
250 Removes the list of paths. Because of possible temporary failures do to locks
251 (such as anti-virus software on Windows), the function will retry up to five
254 parser
= argparse
.ArgumentParser(usage
='rm [Options] PATHS...',
255 description
=Remove
.__doc
__)
257 '-R', '-r', '--recursive', dest
='recursive', action
='store_true',
259 help='remove directories recursively.')
261 '-v', '--verbose', dest
='verbose', action
='store_true',
263 help='verbose output.')
265 '-f', '--force', dest
='force', action
='store_true',
267 help='force, do not error it files does not exist.')
268 parser
.add_argument('files', nargs
='+')
269 options
= parser
.parse_args(args
)
272 for pattern
in options
.files
:
273 dst_files
= glob
.glob(pattern
)
275 # Ignore non existing files when using force
278 raise OSError('rm: no such file or directory: ' + pattern
)
280 for dst
in dst_files
:
284 if os
.path
.isfile(dst
) or os
.path
.islink(dst
):
287 # Check every time, since it may have been deleted after the
288 # previous failed attempt.
289 if os
.path
.isfile(dst
) or os
.path
.islink(dst
):
292 except OSError as error
:
293 print('Failed remove with %s, retrying' % error
)
297 raise OSError('rm: ' + str(error
))
299 if options
.recursive
:
302 if os
.path
.isdir(dst
):
303 if sys
.platform
== 'win32':
304 # shutil.rmtree doesn't handle junctions properly. Let's just
305 # shell out to rd for this.
306 subprocess
.check_call([
307 'rd', '/s', '/q', os
.path
.normpath(dst
)], shell
=True)
311 except OSError as error
:
312 print('Failed rmtree with %s, retrying' % error
)
316 raise OSError('rm: ' + str(error
))
318 except OSError as error
:
def MakeZipPath(os_path, isdir, iswindows):
    """Changes a path into zipfile format.

    # doctest doesn't seem to honor r'' strings, so the backslashes need to be
    # escaped.
    >>> MakeZipPath(r'C:\\users\\foobar\\blah', False, True)
    'users/foobar/blah'
    >>> MakeZipPath('/tmp/tmpfoobar/something', False, False)
    'tmp/tmpfoobar/something'
    >>> MakeZipPath('./somefile.txt', False, False)
    'somefile.txt'
    >>> MakeZipPath('somedir', True, False)
    'somedir/'
    >>> MakeZipPath('../dir/filename.txt', False, False)
    '../dir/filename.txt'
    >>> MakeZipPath('dir/../filename.txt', False, False)
    'filename.txt'

    Args:
        os_path: Path in the host OS's notation.
        isdir: True if the path names a directory.
        iswindows: True if os_path uses Windows conventions (drive letter,
            backslash separators).

    Returns:
        The normalized, relative, forward-slash path; directories get a
        trailing slash.
    """
    zip_path = os_path
    if iswindows:
        import ntpath
        # zipfile paths are always posix-style. They also have the drive
        # letter and leading slashes removed.
        zip_path = ntpath.splitdrive(os_path)[1].replace('\\', '/')
    # Strip every leading slash, not just the first: posixpath.normpath keeps
    # a leading '/' (and a leading '//'), which would leave the entry absolute.
    zip_path = zip_path.lstrip('/')
    zip_path = posixpath.normpath(zip_path)
    # zipfile also always appends a slash to a directory name.
    if isdir:
        zip_path += '/'
    return zip_path
def OSMakeZipPath(os_path):
    """Return the zipfile-format name for a path on the running system.

    Calls MakeZipPath with `isdir` taken from os.path.isdir(os_path) and
    `iswindows` taken from whether sys.platform is 'win32'.
    """
    return MakeZipPath(os_path, os.path.isdir(os_path),
                       sys.platform == 'win32')
364 Compresses the listed files.
366 parser
= argparse
.ArgumentParser(description
=Zip
.__doc
__)
368 '-r', dest
='recursive', action
='store_true',
370 help='recurse into directories')
372 '-q', dest
='quiet', action
='store_true',
374 help='quiet operation')
375 parser
.add_argument('zipfile')
376 parser
.add_argument('filenames', nargs
='+')
377 options
= parser
.parse_args(args
)
380 for filename
in options
.filenames
:
381 globbed_src_args
= glob
.glob(filename
)
382 if not globbed_src_args
:
383 if not options
.quiet
:
384 print('zip warning: name not matched: %s' % filename
)
386 for src_file
in globbed_src_args
:
387 src_file
= os
.path
.normpath(src_file
)
388 src_files
.append(src_file
)
389 if options
.recursive
and os
.path
.isdir(src_file
):
390 for root
, dirs
, files
in os
.walk(src_file
):
392 src_files
.append(os
.path
.join(root
, dirname
))
393 for filename
in files
:
394 src_files
.append(os
.path
.join(root
, filename
))
396 # zip_data represents a list of the data to be written or appended to the
397 # zip_stream. It is a list of tuples:
398 # (OS file path, zip path/zip file info, and file data)
399 # In all cases one of the |os path| or the |file data| will be None.
400 # |os path| is None when there is no OS file to write to the archive (i.e.
401 # the file data already existed in the archive). |file data| is None when the
402 # file is new (never existed in the archive) or being updated.
404 new_files_to_add
= [OSMakeZipPath(src_file
) for src_file
in src_files
]
405 zip_path_to_os_path_dict
= dict((new_files_to_add
[i
], src_files
[i
])
406 for i
in range(len(src_files
)))
408 if os
.path
.exists(options
.zipfile
):
409 with zipfile
.ZipFile(options
.zipfile
, 'r') as zip_stream
:
411 files_to_update
= set(new_files_to_add
).intersection(
412 set(zip_stream
.namelist()))
414 # As far as I can tell, there is no way to update a zip entry using
415 # zipfile; the best you can do is rewrite the archive.
416 # Iterate through the zipfile to maintain file order.
418 for zip_path
in zip_stream
.namelist():
419 if zip_path
in files_to_update
:
420 os_path
= zip_path_to_os_path_dict
[zip_path
]
421 zip_data
.append((os_path
, zip_path
, None))
422 new_files_to_add
.remove(zip_path
)
424 file_bytes
= zip_stream
.read(zip_path
)
425 file_info
= zip_stream
.getinfo(zip_path
)
426 zip_data
.append((None, file_info
, file_bytes
))
430 for zip_path
in new_files_to_add
:
431 zip_data
.append((zip_path_to_os_path_dict
[zip_path
], zip_path
, None))
434 print('zip error: Nothing to do! (%s)' % options
.zipfile
)
437 with zipfile
.ZipFile(options
.zipfile
, write_mode
,
438 zipfile
.ZIP_DEFLATED
) as zip_stream
:
439 for os_path
, file_info_or_zip_path
, file_bytes
in zip_data
:
440 if isinstance(file_info_or_zip_path
, zipfile
.ZipInfo
):
441 zip_path
= file_info_or_zip_path
.filename
443 zip_path
= file_info_or_zip_path
446 st
= os
.stat(os_path
)
447 if stat
.S_ISDIR(st
.st_mode
):
448 # Python 2.6 on the buildbots doesn't support writing directories to
449 # zip files. This was resolved in a later version of Python 2.6.
450 # We'll work around it by writing an empty file with the correct
451 # path. (This is basically what later versions do anyway.)
452 zip_info
= zipfile
.ZipInfo()
453 zip_info
.filename
= zip_path
454 zip_info
.date_time
= time
.localtime(st
.st_mtime
)[0:6]
455 zip_info
.compress_type
= zip_stream
.compression
456 zip_info
.flag_bits
= 0x00
457 zip_info
.external_attr
= (st
[0] & 0xFFFF) << 16L
459 zip_info
.compress_size
= 0
460 zip_info
.file_size
= 0
461 zip_stream
.writestr(zip_info
, '')
463 zip_stream
.write(os_path
, zip_path
)
465 zip_stream
.writestr(file_info_or_zip_path
, file_bytes
)
467 if not options
.quiet
:
468 if zip_path
in new_files_to_add
:
471 operation
= 'updating'
472 zip_info
= zip_stream
.getinfo(zip_path
)
473 if (zip_info
.compress_type
== zipfile
.ZIP_STORED
or
474 zip_info
.file_size
== 0):
475 print(' %s: %s (stored 0%%)' % (operation
, zip_path
))
476 elif zip_info
.compress_type
== zipfile
.ZIP_DEFLATED
:
477 print(' %s: %s (deflated %d%%)' % (operation
, zip_path
,
478 100 - zip_info
.compress_size
* 100 / zip_info
.file_size
))
def FindExeInPath(filename):
    """Locate an executable file, searching PATH for bare command names.

    Args:
        filename: A command name, or a path containing os.path.sep.

    Returns:
        For a name containing a path separator: `filename` unchanged if it is
        an executable file, else None (PATH is not consulted). For a bare
        name: the absolute path of the first PATH entry that holds an
        executable file of that name, or None if there is none.
    """
    def IsExecutableFile(path):
        return os.path.isfile(path) and os.access(path, os.X_OK)

    # An explicit path is only tested as given; it is never looked up in PATH.
    if os.path.sep in filename:
        return filename if IsExecutableFile(filename) else None

    env_path = os.environ.get('PATH', '')
    for path in env_path.split(os.pathsep):
        filepath = os.path.join(path, filename)
        if IsExecutableFile(filepath):
            return os.path.abspath(filepath)
    return None
501 """A Unix style which.
503 Looks for all arguments in the PATH environment variable, and prints their
504 path if they are executable files.
506 Note: If you pass an argument with a path to which, it will just test if it
507 is executable, not if it is in the path.
509 parser
= argparse
.ArgumentParser(description
=Which
.__doc
__)
510 parser
.add_argument('files', nargs
='+')
511 options
= parser
.parse_args(args
)
514 for filename
in options
.files
:
515 fullname
= FindExeInPath(filename
)
536 print('No command specified')
537 print('Available commands: %s' % ' '.join(FuncMap
))
540 func
= FuncMap
.get(func_name
)
542 print('Do not recognize command: %s' % func_name
)
543 print('Available commands: %s' % ' '.join(FuncMap
))
546 return func(args
[1:])
547 except KeyboardInterrupt:
548 print('%s: interrupted' % func_name
)
551 if __name__
== '__main__':
552 sys
.exit(main(sys
.argv
[1:]))