archive: handle commits with an empty tree
[git/jnareb-git.git] / git_remote_helpers / util.py
blobfbbb01b14619c1d2ed6bcc8f304f019fbe98697f
1 #!/usr/bin/env python
3 """Misc. useful functionality used by the rest of this package.
5 This module provides common functionality used by the other modules in
6 this package.
8 """
10 import sys
11 import os
12 import subprocess
try:
    from subprocess import CalledProcessError
except ImportError:
    # This interpreter's subprocess module has no CalledProcessError, so
    # provide a stand-in modelled on python2.7:subprocess.py.
    class CalledProcessError(Exception):
        """Raised when a process run by check_call() or check_output()
        returns a non-zero exit status.

        Attributes:
            returncode -- the exit status of the process
            cmd        -- the command that was run
            output     -- output captured from the process, or None
        """

        def __init__(self, returncode, cmd, output=None):
            self.returncode = returncode
            self.cmd = cmd
            # Optional, so existing two-argument callers keep working.
            self.output = output

        def __str__(self):
            return "Command '%s' returned non-zero exit status %d" % (
                self.cmd, self.returncode)


# Whether or not to show debug messages (consulted by debug()).
DEBUG = False
def notify(msg, *args):
    """Print a message to stderr.

    msg is a %-style format string and args are the values interpolated
    into it.  A newline is appended to the formatted message.
    """
    # Use write() rather than the "print >> file" statement: the statement
    # form only exists in Python 2.
    sys.stderr.write(msg % args + "\n")
def debug(msg, *args):
    """Print a debug message to stderr when DEBUG is enabled.

    msg is a %-style format string and args are the values interpolated
    into it.  Nothing is formatted or written while the module-level
    DEBUG flag is false.
    """
    if DEBUG:
        # Use write() rather than the "print >> file" statement: the
        # statement form only exists in Python 2.
        sys.stderr.write(msg % args + "\n")
def error(msg, *args):
    """Print an error message to stderr.

    msg is a %-style format string and args are the values interpolated
    into it.  The output line is prefixed with "ERROR: ".
    """
    # Use write() rather than the "print >> file" statement: the statement
    # form only exists in Python 2.
    sys.stderr.write("ERROR: " + msg % args + "\n")
def warn(msg, *args):
    """Print a warning message to stderr.

    msg is a %-style format string and args are the values interpolated
    into it.  The output line is prefixed with "warning: ".
    """
    # Use write() rather than the "print >> file" statement: the statement
    # form only exists in Python 2.
    sys.stderr.write("warning: " + msg % args + "\n")
def die(msg, *args):
    """Report a fatal error and terminate the program.

    The message (a %-style format string plus its arguments) is handed
    to error(), after which the process exits with status 1.
    """
    error(msg, *args)
    sys.exit(1)
class ProgressIndicator(object):

    """Simple progress indicator.

    Displayed as a spinning character by default, but can be customized
    by passing custom messages that override the spinning character.

    Each update is written after a carriage return so that it overwrites
    the previous one on the same line.

    """

    # Characters cycled through to draw the default spinner.
    States = ("|", "/", "-", "\\")

    def __init__(self, prefix="", f=sys.stdout):
        """Create a new ProgressIndicator, bound to the given file object."""
        self.n = 0             # Simple progress counter
        self.f = f             # Progress is written to this file object
        self.prev_len = 0      # Length of previous msg (to be overwritten)
        self.prefix = prefix   # Prefix prepended to each progress message
        self.prefix_lens = []  # Stack of prefix string lengths

    def pushprefix(self, prefix):
        """Append the given prefix onto the prefix stack."""
        # Remember the current length so popprefix() can truncate back.
        self.prefix_lens.append(len(self.prefix))
        self.prefix += prefix

    def popprefix(self):
        """Remove the last prefix from the prefix stack."""
        prev_len = self.prefix_lens.pop()
        self.prefix = self.prefix[:prev_len]

    def __call__(self, msg=None, lf=False):
        """Indicate progress, possibly with a custom message.

        Without msg, the next spinner character is shown.  If lf is
        true, the line is terminated so later output starts on a fresh
        line.
        """
        if msg is None:
            msg = self.States[self.n % len(self.States)]
        msg = self.prefix + msg
        # Left-justify to the previous message's width so that a shorter
        # message blanks out what is left of the longer one.  write() is
        # used instead of the Python-2-only "print >> f, ...," form, which
        # also injected a stray space between consecutive updates.
        self.f.write("\r%-*s" % (self.prev_len, msg))
        self.prev_len = len(msg.expandtabs())
        if lf:
            self.f.write("\n")
            self.prev_len = 0
        # Updates carry no newline, so push them out explicitly or a
        # buffered stream would not show them until much later.
        flush = getattr(self.f, "flush", None)
        if flush is not None:
            flush()
        self.n += 1

    def finish(self, msg="done", noprefix=False):
        """Finalize progress indication with the given message.

        If noprefix is true, the prefix is cleared before the final
        message is written.
        """
        if noprefix:
            self.prefix = ""
        self(msg, True)
def start_command(args, cwd=None, shell=False, add_env=None,
                  stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                  stderr=subprocess.PIPE):
    """Start the given command, and return a subprocess object.

    This provides a simpler interface to the subprocess module.

    args, cwd, shell, stdin, stdout and stderr are passed on to
    subprocess.Popen().  If add_env is given, it is a dict of variables
    added on top of a copy of the current environment for the child;
    otherwise no explicit environment is passed to Popen.

    The process is started line-buffered (bufsize = 1) with
    universal_newlines enabled.

    """
    env = None
    if add_env is not None:
        # Copy first so that the parent's own environment is not modified.
        env = os.environ.copy()
        env.update(add_env)
    return subprocess.Popen(args, bufsize=1, stdin=stdin, stdout=stdout,
                            stderr=stderr, cwd=cwd, shell=shell,
                            env=env, universal_newlines=True)
def run_command(args, cwd=None, shell=False, add_env=None,
                flag_error=True):
    """Run the given command to completion, and return its results.

    This provides a simpler interface to the subprocess module.

    The results are formatted as a 3-tuple: (exit_code, output, errors)

    If flag_error is enabled, error messages will be produced if the
    subprocess terminated with a non-zero exit code and/or stderr
    output.

    The other arguments are passed on to start_command().

    """
    process = start_command(args, cwd, shell, add_env)
    (output, errors) = process.communicate()
    exit_code = process.returncode
    if flag_error and (errors or exit_code):
        # args may be a single command string (e.g. with shell = True);
        # joining a string would put a space between every character.
        try:
            string_types = basestring
        except NameError:
            string_types = str
        if isinstance(args, string_types):
            cmd = args
        else:
            cmd = " ".join(args)
        if errors:
            error("'%s' returned errors:\n---\n%s---", cmd, errors)
        if exit_code:
            error("'%s' returned exit code %i", cmd, exit_code)
    return (exit_code, output, errors)
146 # from python2.7:subprocess.py
def call(*popenargs, **kwargs):
    """Run command with arguments.  Wait for command to complete, then
    return the returncode attribute.

    The arguments are the same as for the Popen constructor.  Example:

    retcode = call(["ls", "-l"])
    """
    return subprocess.Popen(*popenargs, **kwargs).wait()
158 # from python2.7:subprocess.py
def check_call(*popenargs, **kwargs):
    """Run command with arguments.  Wait for command to complete.  If
    the exit code was zero then return, otherwise raise
    CalledProcessError.  The CalledProcessError object will have the
    return code in the returncode attribute.

    The arguments are the same as for the Popen constructor.  Example:

    check_call(["ls", "-l"])
    """
    retcode = call(*popenargs, **kwargs)
    if retcode:
        # The command may have been given by keyword or positionally.
        cmd = kwargs.get("args")
        if cmd is None:
            cmd = popenargs[0]
        raise CalledProcessError(retcode, cmd)
    return 0
178 # from python2.7:subprocess.py
def check_output(*popenargs, **kwargs):
    r"""Run command with arguments and return its output as a byte string.

    If the exit code was non-zero it raises a CalledProcessError.  The
    CalledProcessError object will have the return code in the returncode
    attribute and output in the output attribute.

    The arguments are the same as for the Popen constructor.  Example:

    >>> check_output(["ls", "-l", "/dev/null"])
    'crw-rw-rw- 1 root root 1, 3 Oct 18  2007 /dev/null\n'

    The stdout argument is not allowed as it is used internally.
    To capture standard error in the result, use stderr=STDOUT.

    >>> check_output(["/bin/sh", "-c",
    ...               "ls -l non_existent_file ; exit 0"],
    ...              stderr=STDOUT)
    'ls: non_existent_file: No such file or directory\n'
    """
    if 'stdout' in kwargs:
        raise ValueError('stdout argument not allowed, it will be overridden.')
    process = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs)
    output, unused_err = process.communicate()
    retcode = process.poll()
    if retcode:
        # The command may have been given by keyword or positionally.
        cmd = kwargs.get("args")
        if cmd is None:
            cmd = popenargs[0]
        # Use this module's CalledProcessError (not the subprocess
        # attribute) so the fallback class is used on interpreters that
        # lack it.  The output is attached afterwards because not every
        # version of the constructor accepts it as an argument.
        failure = CalledProcessError(retcode, cmd)
        failure.output = output
        raise failure
    return output
def file_reader_method(missing_ok=False):
    """Decorator for simplifying reading of files.

    If missing_ok is True, a failure to open a file for reading will
    not raise the usual IOError, but instead the wrapped method will be
    called with f == None.  The method must in this case properly
    handle f == None.

    """
    def _wrap(method):
        """Teach given method to handle both filenames and file objects.

        The given method must take a file object as its second argument
        (the first argument being 'self', of course).  This decorator
        will take a filename given as the second argument and promote
        it to a file object.

        """
        def _wrapped_method(self, filename, *args, **kwargs):
            # Anything with a read() method is treated as an open file.
            # The former isinstance(..., file) test only exists in
            # Python 2 and rejected file-like objects such as StringIO.
            opened_here = not hasattr(filename, "read")
            if opened_here:
                try:
                    f = open(filename, 'r')
                except IOError:
                    if not missing_ok:
                        raise
                    f = None
            else:
                f = filename
            try:
                return method(self, f, *args, **kwargs)
            finally:
                # Only close what was opened here; a file object passed
                # in by the caller is left open.
                if opened_here and f is not None:
                    f.close()
        return _wrapped_method
    return _wrap
def file_writer_method(method):
    """Decorator for simplifying writing of files.

    Enables the given method to handle both filenames and file objects.

    The given method must take a file object as its second argument
    (the first argument being 'self', of course).  This decorator will
    take a filename given as the second argument and promote it to a
    file object.  Missing parent directories of the filename are
    created before the file is opened for writing.

    """
    def _new_method(self, filename, *args, **kwargs):
        # Anything with a write() method is treated as an open file.  The
        # former isinstance(..., file) test only exists in Python 2 and
        # rejected file-like objects such as StringIO.
        opened_here = not hasattr(filename, "write")
        if opened_here:
            # Make sure the containing directory exists.  A bare filename
            # has an empty dirname, which os.makedirs() would reject.
            parent_dir = os.path.dirname(filename)
            if parent_dir and not os.path.isdir(parent_dir):
                os.makedirs(parent_dir)
            f = open(filename, 'w')
        else:
            f = filename
        try:
            return method(self, f, *args, **kwargs)
        finally:
            # Only close what was opened here; a file object passed in
            # by the caller is left open.
            if opened_here:
                f.close()
    return _new_method