1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
5 """RPC compatible subprocess-type module.
This module defines both a task-side process class and a controller-side
process wrapper for easier access to and usage of the task-side process.
17 #pylint: disable=relative-import
20 # Map swarming_client to use subprocess42
21 sys
.path
.append(common_lib
.SWARMING_DIR
)
23 from utils
import subprocess42
26 class ControllerProcessWrapper(object):
27 """Controller-side process wrapper class.
29 This class provides a more intuitive interface to task-side processes
30 than calling the methods directly using the RPC object.
33 def __init__(self
, rpc
, cmd
, verbose
=False, detached
=False, cwd
=None,
35 logging
.info('Creating a process with cmd=%s', cmd
)
37 self
._key
= rpc
.subprocess
.Process(cmd
, key
)
38 logging
.info('Process created with key=%s', self
._key
)
40 self
._rpc
.subprocess
.SetVerbose(self
._key
)
42 self
._rpc
.subprocess
.SetDetached(self
._key
)
44 self
._rpc
.subprocess
.SetCwd(self
._rpc
, cwd
)
45 self
._rpc
.subprocess
.Start(self
._key
)
52 logging
.debug('Terminating process %s', self
._key
)
53 return self
._rpc
.subprocess
.Terminate(self
._key
)
56 logging
.debug('Killing process %s', self
._key
)
57 self
._rpc
.subprocess
.Kill(self
._key
)
60 return self
._rpc
.subprocess
.Delete(self
._key
)
62 def GetReturncode(self
):
63 return self
._rpc
.subprocess
.GetReturncode(self
._key
)
66 """Returns all stdout since the last call to ReadStdout.
68 This call allows the user to read stdout while the process is running.
69 However each call will flush the local stdout buffer. In order to make
70 multiple calls to ReadStdout and to retain the entire output the results
71 of this call will need to be buffered in the calling code.
73 return self
._rpc
.subprocess
.ReadStdout(self
._key
)
76 """Returns all stderr read since the last call to ReadStderr.
78 See ReadStdout for additional details.
80 return self
._rpc
.subprocess
.ReadStderr(self
._key
)
83 """Returns the (stdout, stderr) since the last Read* call.
85 See ReadStdout for additional details.
87 return self
._rpc
.subprocess
.ReadOutput(self
._key
)
90 return self
._rpc
.subprocess
.Wait(self
._key
)
93 return self
._rpc
.subprocess
.Poll(self
._key
)
96 return self
._rpc
.subprocess
.GetPid(self
._key
)
100 class Process(object):
101 """Implements a task-side non-blocking subprocess.
This non-blocking subprocess allows the caller to continue operating while
still being able to interact with the subprocess using a key returned to
the caller at the time of creation.
107 Creation args are set via Set* methods called after calling Process but
108 before calling Start. This is due to a limitation of the XML-RPC
109 implementation not supporting keyword arguments.
114 _creation_lock
= threading
.Lock()
116 def __init__(self
, cmd
, key
):
124 self
.detached
= False
125 self
.data_lock
= threading
.Lock()
126 self
.stdout_file
= open(self
._CreateOutputFilename
('stdout'), 'wb+')
127 self
.stderr_file
= open(self
._CreateOutputFilename
('stderr'), 'wb+')
def _CreateOutputFilename(self, fname):
  """Returns the path '<output dir>/<key>.<fname>' for this process.

  The directory comes from common_lib.GetOutputDir(); the file name joins
  this process's key and `fname` with a dot.
  """
  output_dir = common_lib.GetOutputDir()
  basename = '%s.%s' % (self.key, fname)
  return os.path.join(output_dir, basename)
133 return '%r, cwd=%r, verbose=%r, detached=%r' % (
134 self
.cmd
, self
.cwd
, self
.verbose
, self
.detached
)
137 for pipe
, data
in self
.proc
.yield_any():
141 self
.stdout_file
.write(data
)
142 self
.stdout_file
.flush()
144 sys
.stdout
.write(data
)
147 self
.stderr_file
.write(data
)
148 self
.stderr_file
.flush()
150 sys
.stderr
.write(data
)
154 for key
in cls
._processes
:
158 def Process(cls
, cmd
, key
=None):
159 with cls
._creation
_lock
:
161 key
= 'Process%d' % cls
._process
_next
_id
162 cls
._process
_next
_id
+= 1
163 if key
in cls
._processes
:
164 raise KeyError('Key %s already in use' % key
)
165 logging
.debug('Creating process %s with cmd %r', key
, cmd
)
166 cls
._processes
[key
] = cls(cmd
, key
)
170 logging
.info('Starting process %s', self
)
171 self
.proc
= subprocess42
.Popen(self
.cmd
, stdout
=subprocess42
.PIPE
,
172 stderr
=subprocess42
.PIPE
,
173 detached
=self
.detached
, cwd
=self
.cwd
)
174 threading
.Thread(target
=self
._reader
).start()
178 cls
._processes
[key
]._Start
()
181 def SetCwd(cls
, key
, cwd
):
182 """Sets the process's cwd."""
183 logging
.debug('Setting %s cwd to %s', key
, cwd
)
184 cls
._processes
[key
].cwd
= cwd
187 def SetDetached(cls
, key
):
188 """Creates a detached process."""
189 logging
.debug('Setting %s.detached = True', key
)
190 cls
._processes
[key
].detached
= True
193 def SetVerbose(cls
, key
):
194 """Sets the stdout and stderr to be emitted locally."""
195 logging
.debug('Setting %s.verbose = True', key
)
196 cls
._processes
[key
].verbose
= True
199 def Terminate(cls
, key
):
200 logging
.debug('Terminating process %s', key
)
201 cls
._processes
[key
].proc
.terminate()
205 logging
.debug('Killing process %s', key
)
206 cls
._processes
[key
].proc
.kill()
209 def Delete(cls
, key
):
210 if cls
.GetReturncode(key
) is None:
211 logging
.warning('Killing %s before deleting it', key
)
213 logging
.debug('Deleting process %s', key
)
214 cls
._processes
.pop(key
)
217 def GetReturncode(cls
, key
):
218 return cls
._processes
[key
].proc
.returncode
221 def ReadStdout(cls
, key
):
222 """Returns all stdout since the last call to ReadStdout.
224 This call allows the user to read stdout while the process is running.
225 However each call will flush the local stdout buffer. In order to make
226 multiple calls to ReadStdout and to retain the entire output the results
227 of this call will need to be buffered in the calling code.
229 proc
= cls
._processes
[key
]
231 # Perform a "read" on the stdout data
237 def ReadStderr(cls
, key
):
238 """Returns all stderr read since the last call to ReadStderr.
240 See ReadStdout for additional details.
242 proc
= cls
._processes
[key
]
244 # Perform a "read" on the stderr data
250 def ReadOutput(cls
, key
):
251 """Returns the (stdout, stderr) since the last Read* call.
253 See ReadStdout for additional details.
255 return cls
.ReadStdout(key
), cls
.ReadStderr(key
)
259 return cls
._processes
[key
].proc
.wait()
263 return cls
._processes
[key
].proc
.poll()
266 def GetPid(cls
, key
):
267 return cls
._processes
[key
].proc
.pid