Added PipeThroughCommand class (copied from Archive, with some minor
[rox-lib.git] / python / rox / processes.py
bloba960a9dab48c98f97ae0b2545416efa991033da8
1 """This module makes it easier to use other programs to process data."""
3 from rox import g, saving
5 import os, sys, fcntl
6 import signal
8 def _keep_on_exec(fd): fcntl.fcntl(fd, fcntl.F_SETFD, 0)
10 class ChildError(Exception):
11 "Raised when the child process reports an error."
12 def __init__(self, message):
13 Exception.__init__(self, message)
15 class ChildKilled(ChildError):
16 "Raised when child died due to a call to the kill method."
17 def __init__(self):
18 ChildError.__init__(self, "Operation aborted at user's request")
20 class Process:
21 """This represents another process. You should subclass this
22 and override the various methods. Use this when you want to
23 run another process in the background, but still be able to
24 communicate with it."""
25 def __init__(self):
26 self.child = None
28 def start(self):
29 """Create the subprocess. Calls pre_fork() and forks.
30 The parent then calls parent_post_fork() and returns,
31 while the child calls child_post_fork() and then
32 child_run()."""
34 assert self.child is None
36 stderr_r = stderr_w = None
38 try:
39 self.pre_fork()
40 stderr_r, stderr_w = os.pipe()
41 child = os.fork()
42 except:
43 if stderr_r: os.close(stderr_r)
44 if stderr_w: os.close(stderr_w)
45 self.start_error()
46 raise
48 if child == 0:
49 # This is the child process
50 try:
51 try:
52 os.setpgid(0, 0) # Start a new process group
53 os.close(stderr_r)
55 if stderr_w != 2:
56 os.dup2(stderr_w, 2)
57 os.close(stderr_w)
59 self.child_post_fork()
60 self.child_run()
61 raise Exception('child_run() returned!')
62 except:
63 import traceback
64 traceback.print_exc()
65 finally:
66 os._exit(1)
67 assert 0
69 self.child = child
71 # This is the parent process
72 os.close(stderr_w)
73 self.err_from_child = stderr_r
75 import gobject
76 if not hasattr(gobject, 'io_add_watch'):
77 self.tag = g.input_add_full(self.err_from_child,
78 g.gdk.INPUT_READ, self._got_errors)
79 else:
80 self.tag = gobject.io_add_watch(self.err_from_child,
81 gobject.IO_IN | gobject.IO_HUP | gobject.IO_ERR,
82 self._got_errors)
84 self.parent_post_fork()
86 def pre_fork(self):
87 """This is called in 'start' just before forking into
88 two processes. If you want to share a resource between
89 both processes (eg, a pipe), create it here.
90 Default method does nothing."""
92 def parent_post_fork(self):
93 """This is called in the parent after forking. Free the
94 child part of any resources allocated in pre_fork().
95 Also called if the fork or pre_fork() fails.
96 Default method does nothing."""
98 def child_post_fork(self):
99 """Called in the child after forking. Release the parent
100 part of any resources allocated in pre_fork().
101 Also called (in the parent) if the fork or pre_fork()
102 fails. Default method does nothing."""
104 def start_error(self):
105 """An error occurred before or during the fork (possibly
106 in pre_fork(). Clean up. Default method calls
107 parent_post_fork() and child_post_fork(). On returning,
108 the original exception will be raised."""
109 self.parent_post_fork()
110 self.child_post_fork()
112 def child_run(self):
113 """Called in the child process (after child_post_fork()).
114 Do whatever processing is required (perhaps exec another
115 process). If you don't exec, call os._exit(n) when done.
116 DO NOT make gtk calls in the child process, as it shares its
117 parent's connection to the X server until you exec()."""
118 os._exit(0)
120 def kill(self, sig = signal.SIGTERM):
121 """Send a signal to all processes in the child's process
122 group. The default, SIGTERM, requests all the processes
123 terminate. SIGKILL is more forceful."""
124 assert self.child is not None
125 os.kill(-self.child, sig)
127 def got_error_output(self, data):
128 """Read some characters from the child's stderr stream.
129 The default method copies to our stderr. Note that 'data'
130 isn't necessarily a complete line; it could be a single
131 character, or several lines, etc."""
132 sys.stderr.write(data)
134 def _got_errors(self, source, cond):
135 got = os.read(self.err_from_child, 100)
136 if got:
137 self.got_error_output(got)
138 return 1
140 os.close(self.err_from_child)
141 g.input_remove(self.tag)
142 del self.tag
144 pid, status = os.waitpid(self.child, 0)
145 self.child = None
146 self.child_died(status)
148 def child_died(self, status):
149 """Called when the child died (actually, when the child
150 closes its end of the stderr pipe). The child process has
151 already been reaped at this point; 'status' is the status
152 returned by os.waitpid."""
154 class PipeThroughCommand(Process):
155 def __init__(self, command, src, dst):
156 """Execute 'command' with src as stdin and writing to stream
157 dst. If either stream is not a fileno() stream, temporary files
158 will be used as required.
159 Either stream may be None if input or output is not required.
160 Call the wait() method to wait for the command to finish.
161 'command' may be a string (passed to os.system) or a list (os.execvp).
164 if src is not None and not hasattr(src, 'fileno'):
165 import shutil
166 new = Tmp()
167 src.seek(0)
168 shutil.copyfileobj(src, new)
169 src = new
171 Process.__init__(self)
173 self.command = command
174 self.dst = dst
175 self.src = src
176 self.tmp_stream = None
178 self.callback = None
179 self.killed = 0
180 self.errors = ""
182 self.start()
184 def pre_fork(self):
185 # Output to 'dst' directly if it's a fileno stream. Otherwise,
186 # send output to a temporary file.
187 assert self.tmp_stream is None
189 if self.dst:
190 if hasattr(self.dst, 'fileno'):
191 self.dst.flush()
192 self.tmp_stream = self.dst
193 else:
194 self.tmp_stream = Tmp()
196 def start_error(self):
197 self.tmp_stream = None
199 def child_run(self):
200 src = self.src
202 if src:
203 os.dup2(src.fileno(), 0)
204 _keep_on_exec(0)
205 os.lseek(0, 0, 0) # OpenBSD needs this, dunno why
206 if self.dst:
207 os.dup2(self.tmp_stream.fileno(), 1)
208 _keep_on_exec(1)
210 # (basestr is python2.3 only)
211 if isinstance(self.command, str):
212 if os.system(self.command) == 0:
213 os._exit(0) # No error code or signal
214 else:
215 os.execvp(self.command[0], self.command)
216 os._exit(1)
218 def parent_post_fork(self):
219 if self.dst and self.tmp_stream is self.dst:
220 self.tmp_stream = None
222 def got_error_output(self, data):
223 self.errors += data
225 def child_died(self, status):
226 errors = self.errors.strip()
228 err = None
230 if self.killed:
231 err = ChildKilled
232 elif errors:
233 err = ChildError("Errors from command '%s':\n%s" % (self.command, errors))
234 elif status != 0:
235 err = ChildError("Command '%s' returned an error code!" % self.command)
237 # If dst wasn't a fileno stream, copy from the temp file to it
238 if not err and self.tmp_stream:
239 self.tmp_stream.seek(0)
240 self.dst.write(self.tmp_stream.read())
241 self.tmp_stream = None
243 self.callback(err)
245 def wait(self):
246 """Run a recursive mainloop until the command terminates.
247 Raises an exception on error."""
248 done = []
249 def set_done(exception):
250 done.append(exception)
251 g.mainquit()
252 self.callback = set_done
253 while not done:
254 g.mainloop()
255 exception, = done
256 if exception:
257 raise exception
259 def kill(self):
260 self.killed = 1
261 Process.kill(self)
263 def Tmp(mode = 'w+b', suffix = '-tmp'):
264 "Create a seekable, randomly named temp file (deleted automatically after use)."
265 import tempfile
266 try:
267 return tempfile.NamedTemporaryFile(mode, suffix = suffix)
268 except:
269 # python2.2 doesn't have NamedTemporaryFile...
270 pass
272 import random
273 name = tempfile.mktemp(`random.randint(1, 1000000)` + suffix)
275 fd = os.open(name, os.O_RDWR|os.O_CREAT|os.O_EXCL, 0700)
276 tmp = tempfile.TemporaryFileWrapper(os.fdopen(fd, mode), name)
277 tmp.name = name
278 return tmp
281 def _test():
282 "Check that this module works."
284 def show():
285 error = sys.exc_info()[1]
286 print "(error reported was '%s')" % error
288 def pipe_through_command(command, src, dst): PipeThroughCommand(command, src, dst).wait()
290 print "Test Tmp()..."
292 file = Tmp()
293 file.write('Hello')
294 print >>file, ' ',
295 file.flush()
296 os.write(file.fileno(), 'World')
298 file.seek(0)
299 assert file.read() == 'Hello World'
301 print "Test pipe_through_command():"
303 print "Try an invalid command..."
304 try:
305 pipe_through_command('bad_command_1234', None, None)
306 assert 0
307 except ChildError:
308 show()
309 else:
310 assert 0
312 print "Try a valid command..."
313 pipe_through_command('exit 0', None, None)
315 print "Writing to a non-fileno stream..."
316 from cStringIO import StringIO
317 a = StringIO()
318 pipe_through_command('echo Hello', None, a)
319 assert a.getvalue() == 'Hello\n'
321 print "Try with args..."
322 a = StringIO()
323 pipe_through_command(('echo', 'Hello'), None, a)
324 assert a.getvalue() == 'Hello\n'
326 print "Reading from a stream to a StringIO..."
327 file.seek(1) # (ignored)
328 pipe_through_command('cat', file, a)
329 assert a.getvalue() == 'Hello\nHello World'
331 print "Writing to a fileno stream..."
332 file.seek(0)
333 file.truncate(0)
334 pipe_through_command('echo Foo', None, file)
335 file.seek(0)
336 assert file.read() == 'Foo\n'
338 print "Read and write fileno streams..."
339 src = Tmp()
340 src.write('123')
341 src.seek(0)
342 file.seek(0)
343 file.truncate(0)
344 pipe_through_command('cat', src, file)
345 file.seek(0)
346 assert file.read() == '123'
348 print "Detect non-zero exit value..."
349 try:
350 pipe_through_command('exit 1', None, None)
351 except ChildError:
352 show()
353 else:
354 assert 0
356 print "Detect writes to stderr..."
357 try:
358 pipe_through_command('echo one >&2; sleep 2; echo two >&2', None, None)
359 except ChildError:
360 show()
361 else:
362 assert 0
364 print "Check tmp file is deleted..."
365 name = file.name
366 assert os.path.exists(name)
367 file = None
368 assert not os.path.exists(name)
370 print "Check we can kill a runaway proces..."
371 ptc = PipeThroughCommand('sleep 100; exit 1', None, None)
372 def stop():
373 ptc.kill()
374 g.timeout_add(2000, stop)
375 try:
376 ptc.wait()
377 assert 0
378 except ChildKilled:
379 pass
381 print "All tests passed!"
383 if __name__ == '__main__':
384 _test()