1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
5 """RPC compatible subprocess-type module.
7 This module defines both a task-side process class as well as a controller-side
8 process wrapper for easier access and usage of the task-side process.
18 #pylint: disable=relative-import
21 # Map swarming_client to use subprocess42
22 sys
.path
.append(common_lib
.SWARMING_DIR
)
24 from utils
import subprocess42
27 class TimeoutError(Exception):
31 class ControllerProcessWrapper(object):
32 """Controller-side process wrapper class.
34 This class provides a more intuitive interface to task-side processes
35 than calling the methods directly using the RPC object.
def __init__(self, rpc, cmd, verbose=False, detached=False, cwd=None,
             key=None, shell=None):
  """Creates a task-side process through the RPC object and starts it.

  Options are applied with individual Set* RPC calls between creating the
  process and starting it, rather than being passed as keyword arguments.

  Args:
    rpc: RPC object whose `subprocess` attribute exposes the task-side
        process methods (Process, SetVerbose, SetDetached, SetCwd, SetShell,
        Start).
    cmd: The command for the process; forwarded unchanged.
    verbose: If true, SetVerbose is called for the new process.
    detached: If true, SetDetached is called for the new process.
    cwd: If set, forwarded to SetCwd for the new process.
    key: Optional key forwarded to rpc.subprocess.Process.
    shell: If true, SetShell is called for the new process.
  """
  logging.debug('Creating a process with cmd=%s', cmd)
  # Keep the RPC object: every later call on this wrapper goes through it.
  self._rpc = rpc
  self._key = rpc.subprocess.Process(cmd, key)
  logging.debug('Process created with key=%s', self._key)
  # Each Set* call can only switch an option on, so issue only the ones the
  # caller actually asked for; the defaults must leave the process untouched.
  if verbose:
    self._rpc.subprocess.SetVerbose(self._key)
  if detached:
    self._rpc.subprocess.SetDetached(self._key)
  if cwd:
    self._rpc.subprocess.SetCwd(self._key, cwd)
  if shell:
    self._rpc.subprocess.SetShell(self._key)
  self._rpc.subprocess.Start(self._key)
59 logging
.debug('Terminating process %s', self
._key
)
60 return self
._rpc
.subprocess
.Terminate(self
._key
)
63 logging
.debug('Killing process %s', self
._key
)
64 self
._rpc
.subprocess
.Kill(self
._key
)
67 return self
._rpc
.subprocess
.Delete(self
._key
)
def GetReturncode(self):
  """Returns the result of the task-side GetReturncode call for this key."""
  task_side = self._rpc.subprocess
  return task_side.GetReturncode(self._key)
73 """Returns all stdout since the last call to ReadStdout.
75 This call allows the user to read stdout while the process is running.
76 However each call will flush the local stdout buffer. In order to make
77 multiple calls to ReadStdout and to retain the entire output the results
78 of this call will need to be buffered in the calling code.
80 return self
._rpc
.subprocess
.ReadStdout(self
._key
)
83 """Returns all stderr read since the last call to ReadStderr.
85 See ReadStdout for additional details.
87 return self
._rpc
.subprocess
.ReadStderr(self
._key
)
90 """Returns the (stdout, stderr) since the last Read* call.
92 See ReadStdout for additional details.
94 return self
._rpc
.subprocess
.ReadOutput(self
._key
)
def Wait(self, timeout=None):
  """Forwards to the task-side Wait call and returns whatever it returns.

  Args:
    timeout: Passed unchanged, after this wrapper's key, to the task-side
        Wait call.
  """
  task_side = self._rpc.subprocess
  result = task_side.Wait(self._key, timeout)
  return result
100 return self
._rpc
.subprocess
.Poll(self
._key
)
103 return self
._rpc
.subprocess
.GetPid(self
._key
)
106 class Process(object):
107 """Implements a task-side non-blocking subprocess.
109 This non-blocking subprocess allows the caller to continue operating while
110 also able to interact with this subprocess based on a key returned to
111 the caller at the time of creation.
113 Creation args are set via Set* methods called after calling Process but
114 before calling Start. This is due to a limitation of the XML-RPC
115 implementation not supporting keyword arguments.
120 _creation_lock
= threading
.Lock()
122 def __init__(self
, cmd
, key
):
131 self
.detached
= False
132 self
.complete
= False
133 self
.data_lock
= threading
.Lock()
134 self
.stdout_file
= open(self
._CreateOutputFilename
('stdout'), 'wb+')
135 self
.stderr_file
= open(self
._CreateOutputFilename
('stderr'), 'wb+')
def _CreateOutputFilename(self, fname):
  """Returns the path '<key>.<fname>' inside common_lib.GetOutputDir().

  Args:
    fname: Suffix appended after this process's key and a dot.
  """
  output_dir = common_lib.GetOutputDir()
  basename = '%s.%s' % (self.key, fname)
  return os.path.join(output_dir, basename)
141 return '%r, cwd=%r, verbose=%r, detached=%r' % (
142 self
.cmd
, self
.cwd
, self
.verbose
, self
.detached
)
145 for pipe
, data
in self
.proc
.yield_any():
149 self
.stdout_file
.write(data
)
150 self
.stdout_file
.flush()
152 sys
.stdout
.write(data
)
155 self
.stderr_file
.write(data
)
156 self
.stderr_file
.flush()
158 sys
.stderr
.write(data
)
163 for key
in cls
._processes
:
def Process(cls, cmd, key=None):
  """Creates a process object, registers it and returns its key.

  Args:
    cmd: The command for the new process; handed to cls(cmd, key).
    key: Optional key to register the process under. When it is omitted (or
        otherwise falsy) the next 'Process<N>' key is generated instead.

  Returns:
    The key under which the new object was stored in cls._processes.

  Raises:
    KeyError: if the key is already present in cls._processes.
  """
  # Hold the lock across id allocation, the duplicate check and the insert.
  with cls._creation_lock:
    # Only generate a key when the caller did not supply one; otherwise the
    # requested key would be silently discarded.
    if not key:
      key = 'Process%d' % cls._process_next_id
      cls._process_next_id += 1
    if key in cls._processes:
      raise KeyError('Key %s already in use' % key)
    logging.debug('Creating process %s with cmd %r', key, cmd)
    cls._processes[key] = cls(cmd, key)
    # Callers need the key to address the process in every later call.
    return key
179 logging
.info('Starting process %s', self
)
180 self
.proc
= subprocess42
.Popen(self
.cmd
, stdout
=subprocess42
.PIPE
,
181 stderr
=subprocess42
.PIPE
,
182 detached
=self
.detached
, cwd
=self
.cwd
,
184 threading
.Thread(target
=self
._reader
).start()
188 cls
._processes
[key
]._Start
()
def SetCwd(cls, key, cwd):
  """Stores cwd on the process registered under key.

  Args:
    key: Key of an entry in cls._processes.
    cwd: Value assigned to that entry's `cwd` attribute.
  """
  logging.debug('Setting %s cwd to %s', key, cwd)
  entry = cls._processes[key]
  entry.cwd = cwd
def SetShell(cls, key):
  """Turns on the `shell` attribute of the process registered under key."""
  logging.debug('Setting %s.shell = True', key)
  setattr(cls._processes[key], 'shell', True)
def SetDetached(cls, key):
  """Marks the process registered under key as detached."""
  logging.debug('Setting %s.detached = True', key)
  entry = cls._processes[key]
  entry.detached = True
def SetVerbose(cls, key):
  """Turns on the `verbose` attribute of the process registered under key."""
  logging.debug('Setting %s.verbose = True', key)
  setattr(cls._processes[key], 'verbose', True)
def Terminate(cls, key):
  """Calls terminate() on the subprocess of the process registered as key."""
  logging.debug('Terminating process %s', key)
  entry = cls._processes[key]
  entry.proc.terminate()
221 logging
.debug('Killing process %s', key
)
222 cls
._processes
[key
].proc
.kill()
def Delete(cls, key):
  """Removes the process registered under key, killing it first if needed.

  Args:
    key: Key of an entry in cls._processes.
  """
  # A returncode of None presumably means the process has not exited yet.
  if cls.GetReturncode(key) is None:
    logging.warning('Killing %s before deleting it', key)
    # Actually perform the kill announced above, so no live subprocess is
    # left behind once its registry entry is gone.
    cls.Kill(key)
  logging.debug('Deleting process %s', key)
  cls._processes.pop(key)
def GetReturncode(cls, key):
  """Returns the returncode attribute of the subprocess registered as key."""
  entry = cls._processes[key]
  return entry.proc.returncode
237 def ReadStdout(cls
, key
):
238 """Returns all stdout since the last call to ReadStdout.
240 This call allows the user to read stdout while the process is running.
241 However each call will flush the local stdout buffer. In order to make
242 multiple calls to ReadStdout and to retain the entire output the results
243 of this call will need to be buffered in the calling code.
245 proc
= cls
._processes
[key
]
247 # Perform a "read" on the stdout data
253 def ReadStderr(cls
, key
):
254 """Returns all stderr read since the last call to ReadStderr.
256 See ReadStdout for additional details.
258 proc
= cls
._processes
[key
]
260 # Perform a "read" on the stderr data
def ReadOutput(cls, key):
  """Returns a (stdout, stderr) pair for the process registered under key.

  The pair is built from cls.ReadStdout(key) followed by cls.ReadStderr(key),
  so whatever those two calls return for the key is what the caller gets.
  """
  stdout_data = cls.ReadStdout(key)
  stderr_data = cls.ReadStderr(key)
  return stdout_data, stderr_data
274 def Wait(cls
, key
, timeout
=None):
275 """Wait for the process to complete.
277 We wait for all of the output to be written before returning. This solves
278 a race condition found on Windows where the output can lag behind the
282 TimeoutError if the process doesn't finish in the specified timeout.
284 end
= None if timeout
is None else timeout
+ time
.time()
285 while end
is None or end
> time
.time():
286 if cls
._processes
[key
].complete
:
293 return cls
._processes
[key
].proc
.poll()
def GetPid(cls, key):
  """Returns the pid attribute of the subprocess registered under key."""
  entry = cls._processes[key]
  return entry.proc.pid