Test and implement a wrapper for ‘subprocess.check_call’.
[dput.git] / dput / helper / dputhelper.py
blobb623b8381acdbb3ba8bcb7181a51643d4448998e
1 # -*- coding: utf-8; -*-
3 # dput/helper/dputhelper.py
4 # Part of ‘dput’, a Debian package upload toolkit.
6 # Copyright © 2015 Ben Finney <ben+python@benfinney.id.au>
7 # Copyright © 2009–2010 Y Giridhar Appaji Nag <appaji@debian.org>
8 # Copyright © 2007–2008 Thomas Viehmann <tv@beamnet.de>
10 # This is free software: you may copy, modify, and/or distribute this work
11 # under the terms of the GNU General Public License as published by the
12 # Free Software Foundation; version 2 of that license or any later version.
13 # No warranty expressed or implied. See the file ‘LICENSE.GPL-2’ for details.
15 """ Helper code for Dput. """
17 import os
18 import sys
19 import subprocess
20 import time
22 import pkg_resources
25 class DputException(Exception):
26 pass
29 class DputUploadFatalException(DputException):
30 pass
33 def spawnv(mode, file, args):
34 """ Wrap spawnv with error output. """
35 ret = os.spawnv(mode, file, args)
36 if ret == 127:
37 print "Error: Failed to execute '" + file + "'."
38 print " The file may not exist or not be executable."
39 elif ret != 0:
40 print "Warning: The execution of '" + file + "' as"
41 print " '" + " ".join(args) + "'"
42 print " returned a nonzero exit code."
43 return ret
46 # This wrapper is intended as a migration target for the existing
47 # `spawnv` wrapper, and will produce the same output for now.
48 def check_call(args, *posargs, **kwargs):
49 """ Wrap `subprocess.check_call` with error output. """
50 command_file_path = args[0]
51 try:
52 subprocess.check_call(args, *posargs, **kwargs)
53 exit_status = 0
54 except subprocess.CalledProcessError as exc:
55 exit_status = exc.returncode
56 if exit_status == 127:
57 sys.stderr.write(
58 "Error: Failed to execute '{path}'.\n"
59 " "
60 "The file may not exist or not be executable.\n".format(
61 path=command_file_path))
62 else:
63 sys.stderr.write(
64 "Warning: The execution of '{path}' as\n"
65 " '{command}'\n"
66 " returned a nonzero exit code.\n".format(
67 path=command_file_path, command=" ".join(args)))
68 return exit_status
71 class TimestampFile:
73 def __init__(self, f):
74 self.f = f
75 self.buf = ""
77 def __getattr__(self, name):
78 return getattr(self.f, name)
80 def write(self, s):
81 self.buf += s
82 idx = self.buf.find('\n')
83 while idx >= 0:
84 self.f.write(str(time.time()) + ': ' + self.buf[:idx + 1])
85 self.buf = self.buf[idx + 1:]
86 idx = self.buf.find('\n')
88 def close(self):
89 if self.buf:
90 self.f.write(str(time.time()) + ': ' + self.buf)
91 self.buf = ""
92 f.close()
95 class FileWithProgress:
96 """ Mimics a file (passed as f, an open file), but with progress.
98 FileWithProgress(f, args)
100 args:
101 * ptype = 1,2 is the type ("|/-\" or numeric), default 0 (no progress)
102 * progressf = file to output progress to (default sys.stdout)
103 * size = size of file (or -1, the default, to ignore)
104 for numeric output
105 * step = stepsize (default 1024)
109 def __init__(self, f, ptype=0, progressf=sys.stdout, size=-1, step=1024):
110 self.f = f
111 self.count = 0
112 self.lastupdate = 0
113 self.ptype = ptype
114 self.ppos = 0
115 self.progresschars = ['|', '/', '-', '\\']
116 self.progressf = progressf
117 self.size = size
118 self.step = step
119 self.closed = 0
121 def __getattr__(self, name):
122 return getattr(self.f, name)
124 def read(self, size=-1):
125 a = self.f.read(size)
126 self.count = self.count + len(a)
127 if (self.count - self.lastupdate) > 1024:
128 if self.ptype == 1:
129 self.ppos = (self.ppos + 1) % len(self.progresschars)
130 self.progressf.write(
131 (self.lastupdate != 0) * "\b" +
132 self.progresschars[self.ppos])
133 self.progressf.flush()
134 self.lastupdate = self.count
135 elif self.ptype == 2:
136 s = str(self.count // self.step) + "k"
137 if self.size >= 0:
138 s += (
139 '/' + str((self.size + self.step - 1) // self.step)
140 + 'k')
141 s += min(self.ppos - len(s), 0) * ' '
142 self.progressf.write(self.ppos * "\b" + s)
143 self.progressf.flush()
144 self.ppos = len(s)
145 return a
147 def close(self):
148 if not self.closed:
149 self.f.close()
150 self.closed = 1
151 if self.ptype == 1:
152 if self.lastupdate:
153 self.progressf.write("\b \b")
154 self.progressf.flush()
155 elif self.ptype == 2:
156 self.progressf.write(
157 self.ppos * "\b" + self.ppos * " " + self.ppos * "\b")
158 self.progressf.flush()
160 def __del__(self):
161 self.close()
164 def get_progname(argv=None):
165 """ Get the program name from the command line arguments.
167 :param argv: Sequence of command-line arguments.
168 Defaults to `sys.argv`.
169 :return: The program name used to invoke this program.
172 if argv is None:
173 argv = sys.argv
174 progname = os.path.basename(argv[0])
175 return progname
178 def get_distribution_version():
179 """ Get the version string for this distribution. """
180 distribution = pkg_resources.get_distribution("dput")
181 return distribution.version
184 def getopt(args, shortopts, longopts):
185 args = args[:]
186 optlist = []
187 while args and args[0].startswith('-'):
188 if args[0] == '--':
189 args = args[1:]
190 break
191 if args[0] == '-':
192 break
193 if args[0].startswith('--'):
194 opt = args.pop(0)[2:]
195 if '=' in opt:
196 opt, optarg = opt.split('=', 1)
197 else:
198 optarg = None
199 prefixmatch = [x for x in longopts if x.startswith(opt)]
200 if len(prefixmatch) == 0:
201 raise DputException('unknown option --%s' % opt)
202 elif len(prefixmatch) > 1:
203 raise DputException('non-unique prefix --%s' % opt)
204 opt = prefixmatch[0]
205 if opt.endswith('=='):
206 opt = opt[:-2]
207 optarg = optarg or ''
208 elif opt.endswith('='):
209 opt = opt[:-1]
210 if not optarg:
211 if not args:
212 raise DputException(
213 'option --%s requires argument' % opt)
214 optarg = args.pop(0)
215 else:
216 if optarg is not None:
217 raise DputException(
218 'option --%s does not take arguments' % opt)
219 optarg = ''
220 optlist.append(('--' + opt, optarg))
221 else:
222 s = args.pop(0)[1:]
223 while s:
224 pos = shortopts.find(s[0])
225 if pos == -1:
226 raise DputException('option -%s unknown' % s[0])
227 if pos + 1 >= len(shortopts) or shortopts[pos + 1] != ':':
228 optlist.append(('-' + s[0], ''))
229 s = s[1:]
230 elif len(s) > 1:
231 optlist.append(('-' + s[0], s[1:]))
232 s = ''
233 elif args:
234 optlist.append(('-' + s, args.pop(0)))
235 s = ''
236 else:
237 raise DputException('option -%s requires argument' % s)
238 return optlist, args
241 if __name__ == '__main__':
242 file_name = "dput.py"
243 file_path = os.path.join(os.path.dirname(__file__), os.pardir, file_name)
244 file_size = os.stat(file_path).st_size
245 for i in range(1, 3):
246 sys.stdout.write("Reading %s " % file_name)
247 sys.stdout.flush()
248 a = FileWithProgress(open(file_path), ptype=i, size=file_size)
249 b = ' '
250 while b:
251 b = a.read(4096)
252 time.sleep(1)
253 a.close()
254 print
257 # Local variables:
258 # coding: utf-8
259 # mode: python
260 # End:
261 # vim: fileencoding=utf-8 filetype=python :