Fix to let documentation build.
[rox-lib.git] / python / rox / processes.py
blob907b56311c1bced43d8de5f3914c17ab51c9d84b
"""This module makes it easier to use other programs to process data.

The Process class provides the low-level interface, which you can extend
by subclassing. Processes run in the background, so users can still work
with your application while they are running. If you don't care about that,
you might like to look at Python's builtin popen2 module.

The PipeThroughCommand class extends Process to provide an easy way to
run other commands. It also, optionally, allows a stream of data to be fed
in to the process's standard input, and can collect the output to another
stream. Typical usage:

rox.processes.PipeThroughCommand(('echo', 'hello'), None, file('output', 'w')).wait()

This creates a new process, and execs 'echo hello' in it with output sent
to the file 'output' (any file-like object can be used). The wait() runs a
recursive mainloop, so that your application can still be used while the
command runs, but the wait() itself doesn't return until the command
completes.

Instead of using a tuple for the command, a string may be passed (eg, "echo
hello"). In this case, the shell is used to interpret the command, allowing
pipes, wildcards and so on. Be very careful of escaping in this case (think
about filenames containing spaces, quotes, apostrophes, etc).
"""
27 from rox import g, saving
29 import os, sys, fcntl
30 import signal
def _keep_on_exec(fd):
    """Reset the descriptor flags of 'fd' to zero.

    This clears close-on-exec, so the descriptor stays open in a program
    started with exec()."""
    no_flags = 0
    fcntl.fcntl(fd, fcntl.F_SETFD, no_flags)
class ChildError(Exception):
    """Raised when the child process reports an error.

    'message' is passed straight to Exception, so it becomes the text
    shown when the error is printed."""

    def __init__(self, message):
        Exception.__init__(self, message)
class ChildKilled(ChildError):
    """Raised when child died due to a call to the kill method.

    Takes no arguments; the message is fixed."""

    def __init__(self):
        reason = "Operation aborted at user's request"
        ChildError.__init__(self, reason)
class Process:
    """This represents another process. You should subclass this
    and override the various methods. Use this when you want to
    run another process in the background, but still be able to
    communicate with it.

    Attributes set by this class:
    child          -- PID of the running child, or None if there is none.
    err_from_child -- read end of the pipe connected to the child's
                      stderr (only set once start() has forked).
    tag            -- main loop watch on err_from_child (only set while
                      the watch is installed)."""

    def __init__(self):
        self.child = None

    def start(self):
        """Create the subprocess. Calls pre_fork() and forks.
        The parent then calls parent_post_fork() and returns,
        while the child calls child_post_fork() and then
        child_run().

        If pre_fork(), the pipe creation or the fork itself raises,
        start_error() is called and the exception is re-raised."""

        assert self.child is None

        stderr_r = stderr_w = None

        try:
            self.pre_fork()
            stderr_r, stderr_w = os.pipe()
            child = os.fork()
        except:
            # Bare except is deliberate: clean up, then re-raise whatever
            # it was. Compare against None rather than testing truth,
            # because 0 is a valid descriptor number and must be closed too.
            if stderr_r is not None: os.close(stderr_r)
            if stderr_w is not None: os.close(stderr_w)
            self.start_error()
            raise

        if child == 0:
            # This is the child process
            try:
                try:
                    os.setpgid(0, 0)    # Start a new process group
                    os.close(stderr_r)

                    # Make the write end of the pipe our stderr
                    if stderr_w != 2:
                        os.dup2(stderr_w, 2)
                        os.close(stderr_w)

                    self.child_post_fork()
                    self.child_run()
                    raise Exception('child_run() returned!')
                except:
                    # Report through the (redirected) stderr so that the
                    # parent sees the traceback.
                    import traceback
                    traceback.print_exc()
            finally:
                # Never fall back into the parent's code in the child
                os._exit(1)
            assert 0

        self.child = child

        # This is the parent process
        os.close(stderr_w)
        self.err_from_child = stderr_r

        # Watch the read end of the pipe from the main loop, using
        # whichever API this version of the bindings provides.
        import gobject
        if not hasattr(gobject, 'io_add_watch'):
            self.tag = g.input_add_full(self.err_from_child,
                    g.gdk.INPUT_READ, self._got_errors)
        else:
            self.tag = gobject.io_add_watch(self.err_from_child,
                    gobject.IO_IN | gobject.IO_HUP | gobject.IO_ERR,
                    self._got_errors)

        self.parent_post_fork()

    def pre_fork(self):
        """This is called in 'start' just before forking into
        two processes. If you want to share a resource between
        both processes (eg, a pipe), create it here.
        Default method does nothing."""

    def parent_post_fork(self):
        """This is called in the parent after forking. Free the
        child part of any resources allocated in pre_fork().
        Also called if the fork or pre_fork() fails.
        Default method does nothing."""

    def child_post_fork(self):
        """Called in the child after forking. Release the parent
        part of any resources allocated in pre_fork().
        Also called (in the parent) if the fork or pre_fork()
        fails. Default method does nothing."""

    def start_error(self):
        """An error occurred before or during the fork (possibly
        in pre_fork()). Clean up. Default method calls
        parent_post_fork() and child_post_fork(). On returning,
        the original exception will be raised."""
        self.parent_post_fork()
        self.child_post_fork()

    def child_run(self):
        """Called in the child process (after child_post_fork()).
        Do whatever processing is required (perhaps exec another
        process). If you don't exec, call os._exit(n) when done.
        DO NOT make gtk calls in the child process, as it shares its
        parent's connection to the X server until you exec()."""
        os._exit(0)

    def kill(self, sig = signal.SIGTERM):
        """Send a signal to all processes in the child's process
        group. The default, SIGTERM, requests all the processes
        terminate. SIGKILL is more forceful."""
        assert self.child is not None
        # A negative PID addresses the whole process group
        os.kill(-self.child, sig)

    def got_error_output(self, data):
        """Read some characters from the child's stderr stream.
        The default method copies to our stderr. Note that 'data'
        isn't necessarily a complete line; it could be a single
        character, or several lines, etc."""
        sys.stderr.write(data)

    def _got_errors(self, source, cond):
        """Main loop callback for the watch on err_from_child.
        Passes any data read to got_error_output() and returns True
        to keep the watch. On end-of-file, closes the pipe, removes
        the watch, reaps the child, calls child_died() with its
        status and returns False."""
        got = os.read(self.err_from_child, 100)
        if got:
            self.got_error_output(got)
            return True

        # An empty read means the write end of the pipe has been closed
        os.close(self.err_from_child)
        g.input_remove(self.tag)
        del self.tag

        # Blocks until the child has actually exited
        pid, status = os.waitpid(self.child, 0)
        self.child = None
        self.child_died(status)
        return False

    def child_died(self, status):
        """Called when the child died (actually, when the child
        closes its end of the stderr pipe). The child process has
        already been reaped at this point; 'status' is the status
        returned by os.waitpid."""
class PipeThroughCommand(Process):
    """A Process that runs a command with given input and output (Python) streams.

    Attributes:
    command    -- the command, as passed to the constructor.
    src, dst   -- the input and output streams (either may be None).
    tmp_stream -- where the child's stdout goes while it runs; only kept
                  by the parent when it is a temporary file that must be
                  copied to dst afterwards.
    errors     -- everything the child has written to stderr so far.
    killed     -- set to 1 by kill().
    done       -- False while unfinished, then True on success or the
                  exception instance describing the failure.
    waiting    -- True while wait() is running its recursive mainloop."""

    def __init__(self, command, src, dst):
        """Execute 'command' with src as stdin and writing to stream
        dst. If either stream is not a fileno() stream, temporary files
        will be used as required.
        Either stream may be None if input or output is not required.
        Call the wait() method to wait for the command to finish.
        'command' may be a string (passed to os.system) or a list (os.execvp).
        """

        if src is not None and not hasattr(src, 'fileno'):
            # The child needs a real file descriptor: copy the data
            # into a temporary file first.
            import shutil
            new = _Tmp()
            src.seek(0)
            shutil.copyfileobj(src, new)
            # The child reads through the descriptor, not through this
            # file object, so buffered data must reach the file first.
            new.flush()
            src = new

        Process.__init__(self)

        self.command = command
        self.dst = dst
        self.src = src
        self.tmp_stream = None

        self.callback = None
        self.killed = 0
        self.errors = ""

        self.done = False       # bool or exception
        self.waiting = False

    def pre_fork(self):
        """Choose where the child's stdout will go."""
        # Output to 'dst' directly if it's a fileno stream. Otherwise,
        # send output to a temporary file.
        assert self.tmp_stream is None

        if self.dst:
            if hasattr(self.dst, 'fileno'):
                self.dst.flush()
                self.tmp_stream = self.dst
            else:
                self.tmp_stream = _Tmp()

    def start_error(self):
        """Starting failed: forget the output stream chosen in pre_fork()."""
        self.tmp_stream = None

    def child_run(self):
        """Assigns file descriptors and calls child_run_with_streams."""
        src = self.src or open('/dev/null', 'r')

        os.dup2(src.fileno(), 0)
        _keep_on_exec(0)
        try:
            os.lseek(0, 0, 0)   # OpenBSD needs this, dunno why
        except OSError:
            # Best effort only; stdin may not be seekable
            pass

        if self.dst:
            os.dup2(self.tmp_stream.fileno(), 1)
            _keep_on_exec(1)

        self.child_run_with_streams()
        # Only reached if child_run_with_streams() returned
        os._exit(1)

    def child_run_with_streams(self):
        """This is run by the child process. stdin and stdout have already been set up.
        Should call exec() or os._exit() to finish. Default method execs self.command."""
        # (basestr is python2.3 only)
        if isinstance(self.command, str):
            if os.system(self.command) == 0:
                os._exit(0)     # No error code or signal
            # Otherwise return, and let child_run() exit with status 1
        else:
            os.execvp(self.command[0], self.command)

    def parent_post_fork(self):
        """Drop our reference to the output stream when the child is
        writing straight to dst, as nothing needs copying afterwards."""
        if self.dst and self.tmp_stream is self.dst:
            self.tmp_stream = None

    def got_error_output(self, data):
        """Collect the child's stderr output for check_errors()."""
        self.errors += data

    def check_errors(self, errors, status):
        """Raise an exception here if errors (the string the child process wrote to stderr) or
        status (the status from waitpid) seems to warrant it. It will be returned by wait().
        The default method always raises ChildError; it is only called when
        at least one of errors and status is set."""
        if errors:
            raise ChildError("Errors from command '%s':\n%s" % (str(self.command), errors))
        raise ChildError("Command '%s' returned an error code (%d)!" % (str(self.command), status))

    def child_died(self, status):
        """Work out whether the command succeeded, record the result in
        self.done, deliver any output held in a temporary file to dst,
        and wake up wait() if it is running."""
        errors = self.errors.strip()

        if self.killed:
            self.done = ChildKilled()
        elif errors or status:
            try:
                self.check_errors(errors, status)
                self.done = True
            except Exception:
                # Keep the exception; wait() raises it later
                self.done = sys.exc_info()[1]
        else:
            self.done = True

        assert self.done is True or isinstance(self.done, Exception)

        if self.done is True:
            # Success
            # If dst wasn't a fileno stream, copy from the temp file to it
            if self.tmp_stream:
                self.tmp_stream.seek(0)
                self.dst.write(self.tmp_stream.read())

        self.tmp_stream = None

        if self.waiting:
            assert self.done
            self.waiting = False
            g.mainquit()

    def wait(self):
        """Run a recursive mainloop until the command terminates.
        Raises an exception on error. Starts the command first if that
        hasn't been done yet. If the command has already finished, the
        recorded result is reported again without running it a second
        time."""
        # self.child goes back to None once the child has been reaped,
        # so also check self.done to avoid starting the command again.
        if self.child is None and self.done is False:
            self.start()
        self.waiting = True
        while not self.done:
            g.mainloop()
        if self.done is not True:
            raise self.done

    def kill(self, sig = signal.SIGTERM):
        """Like Process.kill(), but also notes that the command was
        killed so that wait() raises ChildKilled."""
        self.killed = 1
        Process.kill(self, sig)
def _Tmp(mode = 'w+b', suffix = '-tmp'):
    """Create a seekable, randomly named temp file (deleted automatically after use).

    'mode' is the mode the file is opened with and 'suffix' is the end
    of the generated file name. The path is available as the returned
    object's 'name' attribute. Any error raised while creating the file
    is passed on to the caller."""
    import tempfile

    # Only take the legacy path when NamedTemporaryFile is missing
    # (python2.2 doesn't have it). A genuine failure to create the file
    # must reach the caller instead of being swallowed.
    make_named = getattr(tempfile, 'NamedTemporaryFile', None)
    if make_named is not None:
        return make_named(mode, suffix = suffix)

    import random
    import stat
    name = tempfile.mktemp(repr(random.randint(1, 1000000)) + suffix)

    # O_EXCL: fail rather than open a file someone else created first.
    # S_IRWXU: accessible by the owner only.
    fd = os.open(name, os.O_RDWR|os.O_CREAT|os.O_EXCL, stat.S_IRWXU)
    tmp = tempfile.TemporaryFileWrapper(os.fdopen(fd, mode), name)
    tmp.name = name
    return tmp
def _test():
    """Self-test: exercise _Tmp() and PipeThroughCommand, printing progress.

    Any failure shows up as an uncaught exception (usually an
    AssertionError). The checks run real shell commands and take several
    seconds, because two of them sleep."""

    def show():
        # Report the exception currently being handled
        error = sys.exc_info()[1]
        print("(error reported was '%s')" % error)

    def pipe_through_command(command, src, dst):
        PipeThroughCommand(command, src, dst).wait()

    def expect_child_error(command):
        # The command must fail with ChildError; anything else is a bug
        try:
            pipe_through_command(command, None, None)
        except ChildError:
            show()
        else:
            assert 0

    print("Test _Tmp()...")

    test_file = _Tmp()
    test_file.write('Hello')
    print >>test_file, ' ',
    test_file.flush()
    # Write behind the file object's back, straight to the descriptor
    os.write(test_file.fileno(), 'World')

    test_file.seek(0)
    assert test_file.read() == 'Hello World'

    print("Test pipe_through_command():")

    print("Try an invalid command...")
    expect_child_error('bad_command_1234')

    print("Try a valid command...")
    pipe_through_command('exit 0', None, None)

    print("Writing to a non-fileno stream...")
    from cStringIO import StringIO
    a = StringIO()
    pipe_through_command('echo Hello', None, a)
    assert a.getvalue() == 'Hello\n'

    print("Try with args...")
    a = StringIO()
    pipe_through_command(('echo', 'Hello'), None, a)
    assert a.getvalue() == 'Hello\n'

    print("Reading from a stream to a StringIO...")
    test_file.seek(1)           # (ignored)
    pipe_through_command('cat', test_file, a)
    # 'a' still holds the output of the previous command
    assert a.getvalue() == 'Hello\nHello World'

    print("Writing to a fileno stream...")
    test_file.seek(0)
    test_file.truncate(0)
    pipe_through_command('echo Foo', None, test_file)
    test_file.seek(0)
    assert test_file.read() == 'Foo\n'

    print("Read and write fileno streams...")
    src = _Tmp()
    src.write('123')
    src.seek(0)
    test_file.seek(0)
    test_file.truncate(0)
    pipe_through_command('cat', src, test_file)
    test_file.seek(0)
    assert test_file.read() == '123'

    print("Detect non-zero exit value...")
    expect_child_error('exit 1')

    print("Detect writes to stderr...")
    expect_child_error('echo one >&2; sleep 2; echo two >&2')

    print("Check tmp file is deleted...")
    name = test_file.name
    assert os.path.exists(name)
    # Dropping the last reference should remove the file
    test_file = None
    assert not os.path.exists(name)

    print("Check we can kill a runaway proces...")
    ptc = PipeThroughCommand('sleep 100; exit 1', None, None)
    def abort():
        ptc.kill()
    g.timeout_add(2000, abort)
    try:
        ptc.wait()
    except ChildKilled:
        pass
    else:
        assert 0

    print("All tests passed!")
# Run the self-test when this file is executed directly.
if __name__ == "__main__":
    _test()