1 """This module makes it easier to use other programs to process data.
3 The Process class provides the low-level interface, which you can extend
4 by subclassing. Processes run in the background, so users can still work
5 with your application while they are running. If you don't care about that,
6 you might like to look at Python's builtin popen2 module.
8 The PipeThroughCommand class extends Process to provide an easy way to
9 run other commands. It also, optionally, allows a stream of data to be fed
10 in to the process's standard input, and can collect the output to another
11 stream. Typical usage:
13 rox.processes.PipeThroughCommand(('echo', 'hello'), None, file('output', 'w')).wait()
15 This creates a new process, and execs 'echo hello' in it with output sent
16 to the file 'output' (any file-like object can be used). The wait() runs a
17 recursive mainloop, so that your application can still be used while the
18 command runs, but the wait() itself doesn't return until the command
21 Instead of using a tuple for the command, a string may be passed (eg, "echo
22 hello"). In this case, the shell is used to interpret the command, allowing
23 pipes, wildcards and so on. Be very careful of escaping in this case (think
24 about filenames containing spaces, quotes, apostrophes, etc).
27 from rox
import g
, saving
32 def _keep_on_exec(fd
): fcntl
.fcntl(fd
, fcntl
.F_SETFD
, 0)
34 class ChildError(Exception):
35 "Raised when the child process reports an error."
36 def __init__(self
, message
):
37 Exception.__init
__(self
, message
)
39 class ChildKilled(ChildError
):
40 "Raised when child died due to a call to the kill method."
42 ChildError
.__init
__(self
, "Operation aborted at user's request")
45 """This represents another process. You should subclass this
46 and override the various methods. Use this when you want to
47 run another process in the background, but still be able to
48 communicate with it."""
53 """Create the subprocess. Calls pre_fork() and forks.
54 The parent then calls parent_post_fork() and returns,
55 while the child calls child_post_fork() and then
58 assert self
.child
is None
60 stderr_r
= stderr_w
= None
64 stderr_r
, stderr_w
= os
.pipe()
67 if stderr_r
: os
.close(stderr_r
)
68 if stderr_w
: os
.close(stderr_w
)
73 # This is the child process
76 os
.setpgid(0, 0) # Start a new process group
83 self
.child_post_fork()
85 raise Exception('child_run() returned!')
95 # This is the parent process
97 self
.err_from_child
= stderr_r
100 if not hasattr(gobject
, 'io_add_watch'):
101 self
.tag
= g
.input_add_full(self
.err_from_child
,
102 g
.gdk
.INPUT_READ
, self
._got
_errors
)
104 self
.tag
= gobject
.io_add_watch(self
.err_from_child
,
105 gobject
.IO_IN | gobject
.IO_HUP | gobject
.IO_ERR
,
108 self
.parent_post_fork()
111 """This is called in 'start' just before forking into
112 two processes. If you want to share a resource between
113 both processes (eg, a pipe), create it here.
114 Default method does nothing."""
116 def parent_post_fork(self
):
117 """This is called in the parent after forking. Free the
118 child part of any resources allocated in pre_fork().
119 Also called if the fork or pre_fork() fails.
120 Default method does nothing."""
122 def child_post_fork(self
):
123 """Called in the child after forking. Release the parent
124 part of any resources allocated in pre_fork().
125 Also called (in the parent) if the fork or pre_fork()
126 fails. Default method does nothing."""
128 def start_error(self
):
129 """An error occurred before or during the fork (possibly
130 in pre_fork(). Clean up. Default method calls
131 parent_post_fork() and child_post_fork(). On returning,
132 the original exception will be raised."""
133 self
.parent_post_fork()
134 self
.child_post_fork()
137 """Called in the child process (after child_post_fork()).
138 Do whatever processing is required (perhaps exec another
139 process). If you don't exec, call os._exit(n) when done.
140 DO NOT make gtk calls in the child process, as it shares its
141 parent's connection to the X server until you exec()."""
144 def kill(self
, sig
= signal
.SIGTERM
):
145 """Send a signal to all processes in the child's process
146 group. The default, SIGTERM, requests all the processes
147 terminate. SIGKILL is more forceful."""
148 assert self
.child
is not None
149 os
.kill(-self
.child
, sig
)
151 def got_error_output(self
, data
):
152 """Read some characters from the child's stderr stream.
153 The default method copies to our stderr. Note that 'data'
154 isn't necessarily a complete line; it could be a single
155 character, or several lines, etc."""
156 sys
.stderr
.write(data
)
158 def _got_errors(self
, source
, cond
):
159 got
= os
.read(self
.err_from_child
, 100)
161 self
.got_error_output(got
)
164 os
.close(self
.err_from_child
)
165 g
.input_remove(self
.tag
)
168 pid
, status
= os
.waitpid(self
.child
, 0)
170 self
.child_died(status
)
173 def child_died(self
, status
):
174 """Called when the child died (actually, when the child
175 closes its end of the stderr pipe). The child process has
176 already been reaped at this point; 'status' is the status
177 returned by os.waitpid."""
179 class PipeThroughCommand(Process
):
180 def __init__(self
, command
, src
, dst
):
181 """Execute 'command' with src as stdin and writing to stream
182 dst. If either stream is not a fileno() stream, temporary files
183 will be used as required.
184 Either stream may be None if input or output is not required.
185 Call the wait() method to wait for the command to finish.
186 'command' may be a string (passed to os.system) or a list (os.execvp).
189 if src
is not None and not hasattr(src
, 'fileno'):
193 shutil
.copyfileobj(src
, new
)
196 Process
.__init
__(self
)
198 self
.command
= command
201 self
.tmp_stream
= None
207 self
.done
= False # bool or exception
211 # Output to 'dst' directly if it's a fileno stream. Otherwise,
212 # send output to a temporary file.
213 assert self
.tmp_stream
is None
216 if hasattr(self
.dst
, 'fileno'):
218 self
.tmp_stream
= self
.dst
220 self
.tmp_stream
= _Tmp()
222 def start_error(self
):
223 self
.tmp_stream
= None
226 """Assigns file descriptors and calls child_run_with_streams."""
227 src
= self
.src
or file('/dev/null', 'r')
229 os
.dup2(src
.fileno(), 0)
232 os
.lseek(0, 0, 0) # OpenBSD needs this, dunno why
237 os
.dup2(self
.tmp_stream
.fileno(), 1)
240 self
.child_run_with_streams()
243 def child_run_with_streams(self
):
244 """This is run by the child process. stdin and stdout have already been set up.
245 Should call exec() or os._exit() to finish. Default method execs self.command."""
246 # (basestr is python2.3 only)
247 if isinstance(self
.command
, str):
248 if os
.system(self
.command
) == 0:
249 os
._exit
(0) # No error code or signal
251 os
.execvp(self
.command
[0], self
.command
)
253 def parent_post_fork(self
):
254 if self
.dst
and self
.tmp_stream
is self
.dst
:
255 self
.tmp_stream
= None
257 def got_error_output(self
, data
):
260 def check_errors(self
, errors
, status
):
261 """Raise an exception here if errors (the string the child process wrote to stderr) or
262 status (the status from waitpid) seems to warrent it. It will be returned by wait()."""
264 raise ChildError("Errors from command '%s':\n%s" % (str(self
.command
), errors
))
265 raise ChildError("Command '%s' returned an error code (%d)!" % (str(self
.command
), status
))
267 def child_died(self
, status
):
268 errors
= self
.errors
.strip()
271 self
.done
= ChildKilled()
272 elif errors
or status
:
274 self
.check_errors(errors
, status
)
281 assert self
.done
is True or isinstance(self
.done
, Exception)
283 if self
.done
is True:
285 # If dst wasn't a fileno stream, copy from the temp file to it
287 self
.tmp_stream
.seek(0)
288 self
.dst
.write(self
.tmp_stream
.read())
290 self
.tmp_stream
= None
298 """Run a recursive mainloop until the command terminates.
299 Raises an exception on error."""
300 if self
.child
is None:
305 if self
.done
is not True:
308 def kill(self
, sig
= signal
.SIGTERM
):
310 Process
.kill(self
, sig
)
312 def _Tmp(mode
= 'w+b', suffix
= '-tmp'):
313 "Create a seekable, randomly named temp file (deleted automatically after use)."
316 return tempfile
.NamedTemporaryFile(mode
, suffix
= suffix
)
318 # python2.2 doesn't have NamedTemporaryFile...
322 name
= tempfile
.mktemp(`random
.randint(1, 1000000)`
+ suffix
)
324 fd
= os
.open(name
, os
.O_RDWR|os
.O_CREAT|os
.O_EXCL
, 0700)
325 tmp
= tempfile
.TemporaryFileWrapper(os
.fdopen(fd
, mode
), name
)
331 "Check that this module works."
334 error
= sys
.exc_info()[1]
335 print "(error reported was '%s')" % error
337 def pipe_through_command(command
, src
, dst
): PipeThroughCommand(command
, src
, dst
).wait()
339 print "Test _Tmp()..."
342 test_file
.write('Hello')
343 print >>test_file
, ' ',
345 os
.write(test_file
.fileno(), 'World')
348 assert test_file
.read() == 'Hello World'
350 print "Test pipe_through_command():"
352 print "Try an invalid command..."
354 pipe_through_command('bad_command_1234', None, None)
361 print "Try a valid command..."
362 pipe_through_command('exit 0', None, None)
364 print "Writing to a non-fileno stream..."
365 from cStringIO
import StringIO
367 pipe_through_command('echo Hello', None, a
)
368 assert a
.getvalue() == 'Hello\n'
370 print "Try with args..."
372 pipe_through_command(('echo', 'Hello'), None, a
)
373 assert a
.getvalue() == 'Hello\n'
375 print "Reading from a stream to a StringIO..."
376 test_file
.seek(1) # (ignored)
377 pipe_through_command('cat', test_file
, a
)
378 assert a
.getvalue() == 'Hello\nHello World'
380 print "Writing to a fileno stream..."
382 test_file
.truncate(0)
383 pipe_through_command('echo Foo', None, test_file
)
385 assert test_file
.read() == 'Foo\n'
387 print "Read and write fileno streams..."
392 test_file
.truncate(0)
393 pipe_through_command('cat', src
, test_file
)
395 assert test_file
.read() == '123'
397 print "Detect non-zero exit value..."
399 pipe_through_command('exit 1', None, None)
405 print "Detect writes to stderr..."
407 pipe_through_command('echo one >&2; sleep 2; echo two >&2', None, None)
413 print "Check tmp file is deleted..."
414 name
= test_file
.name
415 assert os
.path
.exists(name
)
417 assert not os
.path
.exists(name
)
419 print "Check we can kill a runaway proces..."
420 ptc
= PipeThroughCommand('sleep 100; exit 1', None, None)
423 g
.timeout_add(2000, stop
)
430 print "All tests passed!"
432 if __name__
== '__main__':