Make pychecker happier.
[rox-lib.git] / python / rox / processes.py
blobe83fa7603e62596a84f6cb83043dc61ea9633e26
"""This module makes it easier to use other programs to process data.

The Process class provides the low-level interface, which you can extend
by subclassing. Processes run in the background, so users can still work
with your application while they are running. If you don't care about that,
you might like to look at Python's builtin popen2 module.

The PipeThroughCommand class extends Process to provide an easy way to
run other commands. It also, optionally, allows a stream of data to be fed
into the process's standard input, and can collect the output to another
stream. Typical usage:

rox.processes.PipeThroughCommand(('echo', 'hello'), None, file('output', 'w')).wait()

This creates a new process, and execs 'echo hello' in it with output sent
to the file 'output' (any file-like object can be used). The wait() runs a
recursive mainloop, so that your application can still be used while the
command runs, but the wait() itself doesn't return until the command
completes.

Instead of using a tuple for the command, a string may be passed (e.g. "echo
hello"). In this case, the shell is used to interpret the command, allowing
pipes, wildcards and so on. Be very careful of escaping in this case (think
about filenames containing spaces, quotes, apostrophes, etc).
"""
27 from rox import g, saving
29 import os, sys, fcntl
30 import signal
32 def _keep_on_exec(fd): fcntl.fcntl(fd, fcntl.F_SETFD, 0)
class ChildError(Exception):
    """Raised when the child process reports an error."""

    def __init__(self, message):
        # Hand the message straight to the base class, which stores it as
        # the exception's only argument.
        Exception.__init__(self, message)
class ChildKilled(ChildError):
    """Raised when child died due to a call to the kill method."""

    def __init__(self):
        # The message is fixed; callers construct this with no arguments.
        reason = "Operation aborted at user's request"
        ChildError.__init__(self, reason)
class Process:
    """This represents another process. You should subclass this
    and override the various methods. Use this when you want to
    run another process in the background, but still be able to
    communicate with it."""

    def __init__(self):
        # PID of the forked child; None while no child is running.
        self.child = None

    def start(self):
        """Create the subprocess. Calls pre_fork() and forks.
        The parent then calls parent_post_fork() and returns,
        while the child calls child_post_fork() and then
        child_run().

        If pre_fork(), os.pipe() or os.fork() raises, any pipe ends
        already created are closed, start_error() is called and the
        original exception is re-raised."""

        assert self.child is None

        stderr_r = stderr_w = None

        try:
            self.pre_fork()
            stderr_r, stderr_w = os.pipe()
            child = os.fork()
        except:
            # Compare against None rather than testing truthiness:
            # 0 is a valid descriptor number and must be closed too.
            if stderr_r is not None: os.close(stderr_r)
            if stderr_w is not None: os.close(stderr_w)
            self.start_error()
            raise

        if child == 0:
            # This is the child process
            # (nested try blocks: a combined try/except/finally is not
            # available in the old Python versions this file supports)
            try:
                try:
                    os.setpgid(0, 0)        # Start a new process group
                    os.close(stderr_r)

                    # Make the write end of the pipe our stderr
                    if stderr_w != 2:
                        os.dup2(stderr_w, 2)
                        os.close(stderr_w)

                    self.child_post_fork()
                    self.child_run()
                    raise Exception('child_run() returned!')
                except:
                    # Report the problem on stderr, which now goes
                    # down the pipe to the parent.
                    import traceback
                    traceback.print_exc()
            finally:
                os._exit(1)
            # Not reached: the finally clause above always exits.
            assert 0

        self.child = child

        # This is the parent process
        os.close(stderr_w)
        self.err_from_child = stderr_r

        # Watch the read end of the pipe from the main loop, using
        # gobject.io_add_watch when it exists and the gtk call otherwise.
        import gobject
        if not hasattr(gobject, 'io_add_watch'):
            self.tag = g.input_add_full(self.err_from_child,
                        g.gdk.INPUT_READ, self._got_errors)
        else:
            self.tag = gobject.io_add_watch(self.err_from_child,
                    gobject.IO_IN | gobject.IO_HUP | gobject.IO_ERR,
                    self._got_errors)

        self.parent_post_fork()

    def pre_fork(self):
        """This is called in 'start' just before forking into
        two processes. If you want to share a resource between
        both processes (eg, a pipe), create it here.
        Default method does nothing."""

    def parent_post_fork(self):
        """This is called in the parent after forking. Free the
        child part of any resources allocated in pre_fork().
        Also called if the fork or pre_fork() fails.
        Default method does nothing."""

    def child_post_fork(self):
        """Called in the child after forking. Release the parent
        part of any resources allocated in pre_fork().
        Also called (in the parent) if the fork or pre_fork()
        fails. Default method does nothing."""

    def start_error(self):
        """An error occurred before or during the fork (possibly
        in pre_fork()). Clean up. Default method calls
        parent_post_fork() and child_post_fork(). On returning,
        the original exception will be raised."""
        self.parent_post_fork()
        self.child_post_fork()

    def child_run(self):
        """Called in the child process (after child_post_fork()).
        Do whatever processing is required (perhaps exec another
        process). If you don't exec, call os._exit(n) when done.
        DO NOT make gtk calls in the child process, as it shares its
        parent's connection to the X server until you exec()."""
        os._exit(0)

    def kill(self, sig = signal.SIGTERM):
        """Send a signal to all processes in the child's process
        group. The default, SIGTERM, requests all the processes
        terminate. SIGKILL is more forceful."""
        assert self.child is not None
        # A negative pid addresses the whole process group, which the
        # child created for itself with setpgid() in start().
        os.kill(-self.child, sig)

    def got_error_output(self, data):
        """Read some characters from the child's stderr stream.
        The default method copies to our stderr. Note that 'data'
        isn't necessarily a complete line; it could be a single
        character, or several lines, etc."""
        sys.stderr.write(data)

    def _got_errors(self, source, cond):
        """Main loop callback for the child's stderr pipe.

        Passes any data read to got_error_output() and returns 1.
        When the read returns nothing, closes the pipe, removes the
        watch, reaps the child and calls child_died() with its
        status."""
        got = os.read(self.err_from_child, 100)
        if got:
            self.got_error_output(got)
            return 1

        # Nothing read: treat as end of stream and tidy up.
        os.close(self.err_from_child)
        g.input_remove(self.tag)
        del self.tag

        pid, status = os.waitpid(self.child, 0)
        self.child = None
        self.child_died(status)

    def child_died(self, status):
        """Called when the child died (actually, when the child
        closes its end of the stderr pipe). The child process has
        already been reaped at this point; 'status' is the status
        returned by os.waitpid."""
class PipeThroughCommand(Process):
    """A Process that runs a command with its stdin and stdout
    connected to the given streams, collecting whatever the command
    writes to stderr."""

    def __init__(self, command, src, dst):
        """Execute 'command' with src as stdin and writing to stream
        dst. If either stream is not a fileno() stream, temporary files
        will be used as required.
        Either stream may be None if input or output is not required.
        Call the wait() method to wait for the command to finish.
        'command' may be a string (passed to os.system) or a list (os.execvp).
        """

        if src is not None and not hasattr(src, 'fileno'):
            import shutil
            new = _Tmp()
            src.seek(0)
            shutil.copyfileobj(src, new)
            # The child reads the copy through its file descriptor, so
            # push buffered data out to the file before any fork happens.
            new.flush()
            src = new

        Process.__init__(self)

        self.command = command
        self.dst = dst
        self.src = src
        self.tmp_stream = None

        self.callback = None
        self.killed = 0
        self.errors = ""

        self.done = False       # bool or exception
        self.waiting = False

    def pre_fork(self):
        """Choose the stream the child's stdout will be attached to."""
        # Output to 'dst' directly if it's a fileno stream. Otherwise,
        # send output to a temporary file.
        assert self.tmp_stream is None

        # None is the documented "no output" value, so test for it
        # explicitly instead of relying on the stream's truth value.
        if self.dst is not None:
            if hasattr(self.dst, 'fileno'):
                self.dst.flush()
                self.tmp_stream = self.dst
            else:
                self.tmp_stream = _Tmp()

    def start_error(self):
        """Forget the output stream chosen in pre_fork()."""
        self.tmp_stream = None

    def child_run(self):
        """Assigns file descriptors and calls child_run_with_streams."""
        src = self.src
        if src is None:
            src = open('/dev/null', 'r')

        os.dup2(src.fileno(), 0)
        _keep_on_exec(0)
        try:
            # Rewind stdin, whatever position the source was left at.
            os.lseek(0, 0, 0)       # OpenBSD needs this, dunno why
        except OSError:
            # Best effort only: ignore a failed seek.
            pass

        if self.dst is not None:
            os.dup2(self.tmp_stream.fileno(), 1)
            _keep_on_exec(1)

        self.child_run_with_streams()
        # Only reached if child_run_with_streams() returned.
        os._exit(1)

    def child_run_with_streams(self):
        """This is run by the child process. stdin and stdout have already been set up.
        Should call exec() or os._exit() to finish. Default method execs self.command."""
        # (basestr is python2.3 only)
        if isinstance(self.command, str):
            if os.system(self.command) == 0:
                os._exit(0)     # No error code or signal
            # Otherwise return, and child_run() exits with status 1.
        else:
            os.execvp(self.command[0], self.command)

    def parent_post_fork(self):
        """Drop our reference to dst when the child writes to it directly,
        so child_died() only copies back from a real temporary file."""
        if self.dst is not None and self.tmp_stream is self.dst:
            self.tmp_stream = None

    def got_error_output(self, data):
        """Accumulate the child's stderr output in self.errors."""
        self.errors += data

    def check_errors(self, errors, status):
        """Raise an exception here if errors (the string the child process wrote to stderr) or
        status (the status from waitpid) seems to warrant it. It will be raised by wait()."""
        if errors:
            raise ChildError("Errors from command '%s':\n%s" % (str(self.command), errors))
        raise ChildError("Command '%s' returned an error code (%d)!" % (str(self.command), status))

    def child_died(self, status):
        """Record the outcome in self.done (True or an exception), copy
        any temporary output to dst on success, and quit the recursive
        mainloop if wait() is running."""
        errors = self.errors.strip()

        if self.killed:
            self.done = ChildKilled()
        elif errors or status:
            try:
                self.check_errors(errors, status)
                self.done = True
            except Exception:
                # Fetch the exception via sys.exc_info() so this parses
                # on every Python version the file may run under.
                self.done = sys.exc_info()[1]
        else:
            self.done = True

        assert self.done is True or isinstance(self.done, Exception)

        if self.done is True:
            # Success
            # If dst wasn't a fileno stream, copy from the temp file to it
            if self.tmp_stream is not None:
                self.tmp_stream.seek(0)
                self.dst.write(self.tmp_stream.read())

        self.tmp_stream = None

        if self.waiting:
            assert self.done
            self.waiting = False
            g.mainquit()

    def wait(self):
        """Run a recursive mainloop until the command terminates.
        Raises an exception on error."""
        if self.child is None:
            self.start()
        self.waiting = True
        while not self.done:
            g.mainloop()
        if self.done is not True:
            raise self.done

    def kill(self, sig = signal.SIGTERM):
        """Signal the child's process group, remembering that we did so,
        so that child_died() reports ChildKilled."""
        self.killed = 1
        Process.kill(self, sig)
311 def _Tmp(mode = 'w+b', suffix = '-tmp'):
312 "Create a seekable, randomly named temp file (deleted automatically after use)."
313 import tempfile
314 try:
315 return tempfile.NamedTemporaryFile(mode, suffix = suffix)
316 except:
317 # python2.2 doesn't have NamedTemporaryFile...
318 pass
320 import random
321 name = tempfile.mktemp(`random.randint(1, 1000000)` + suffix)
323 fd = os.open(name, os.O_RDWR|os.O_CREAT|os.O_EXCL, 0700)
324 tmp = tempfile.TemporaryFileWrapper(os.fdopen(fd, mode), name)
325 tmp.name = name
326 return tmp
def _test():
    """Check that this module works."""

    def show():
        # Print whichever exception is currently being handled.
        print("(error reported was '%s')" % sys.exc_info()[1])

    def pipe_through_command(command, src, dst):
        PipeThroughCommand(command, src, dst).wait()

    print("Test _Tmp()...")

    # Mix buffered writes with a raw write on the descriptor.
    tmp = _Tmp()
    tmp.write('Hello')
    tmp.write(' ')
    tmp.flush()
    os.write(tmp.fileno(), 'World')

    tmp.seek(0)
    assert tmp.read() == 'Hello World'

    print("Test pipe_through_command():")

    print("Try an invalid command...")
    try:
        pipe_through_command('bad_command_1234', None, None)
        assert 0
    except ChildError:
        show()
    else:
        assert 0

    print("Try a valid command...")
    pipe_through_command('exit 0', None, None)

    print("Writing to a non-fileno stream...")
    from cStringIO import StringIO
    out = StringIO()
    pipe_through_command('echo Hello', None, out)
    assert out.getvalue() == 'Hello\n'

    print("Try with args...")
    out = StringIO()
    pipe_through_command(('echo', 'Hello'), None, out)
    assert out.getvalue() == 'Hello\n'

    print("Reading from a stream to a StringIO...")
    tmp.seek(1)     # (ignored)
    pipe_through_command('cat', tmp, out)
    assert out.getvalue() == 'Hello\nHello World'

    print("Writing to a fileno stream...")
    tmp.seek(0)
    tmp.truncate(0)
    pipe_through_command('echo Foo', None, tmp)
    tmp.seek(0)
    assert tmp.read() == 'Foo\n'

    print("Read and write fileno streams...")
    src = _Tmp()
    src.write('123')
    src.seek(0)
    tmp.seek(0)
    tmp.truncate(0)
    pipe_through_command('cat', src, tmp)
    tmp.seek(0)
    assert tmp.read() == '123'

    print("Detect non-zero exit value...")
    try:
        pipe_through_command('exit 1', None, None)
    except ChildError:
        show()
    else:
        assert 0

    print("Detect writes to stderr...")
    try:
        pipe_through_command('echo one >&2; sleep 2; echo two >&2', None, None)
    except ChildError:
        show()
    else:
        assert 0

    print("Check tmp file is deleted...")
    name = tmp.name
    assert os.path.exists(name)
    # Drop the last reference to the temp file object.
    tmp = None
    assert not os.path.exists(name)

    print("Check we can kill a runaway proces...")
    ptc = PipeThroughCommand('sleep 100; exit 1', None, None)

    def stop():
        ptc.kill()

    # Schedule the kill from the main loop that wait() runs.
    g.timeout_add(2000, stop)
    try:
        ptc.wait()
        assert 0
    except ChildKilled:
        pass

    print("All tests passed!")
# Run the module self-test when this file is executed directly.
if __name__ == '__main__':
    _test()