1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
5 """RPC compatible subprocess-type module.
7 This module defined both a task-side process class as well as a controller-side
8 process wrapper for easier access and usage of the task-side process.
16 #pylint: disable=relative-import
19 # Map swarming_client to use subprocess42
20 sys
.path
.append(common_lib
.SWARMING_DIR
)
22 from utils
import subprocess42
25 class ControllerProcessWrapper(object):
26 """Controller-side process wrapper class.
28 This class provides a more intuitive interface to task-side processes
29 than calling the methods directly using the RPC object.
32 def __init__(self
, rpc
, cmd
, verbose
=False, detached
=False, cwd
=None):
34 self
._id
= rpc
.subprocess
.Process(cmd
)
36 self
._rpc
.subprocess
.SetVerbose(self
._id
)
38 self
._rpc
.subprocess
.SetDetached(self
._id
)
40 self
._rpc
.subprocess
.SetCwd(self
._rpc
, cwd
)
41 self
._rpc
.subprocess
.Start(self
._id
)
44 logging
.debug('Terminating process %s', self
._id
)
45 return self
._rpc
.subprocess
.Terminate(self
._id
)
48 logging
.debug('Killing process %s', self
._id
)
49 self
._rpc
.subprocess
.Kill(self
._id
)
52 return self
._rpc
.subprocess
.Delete(self
._id
)
54 def GetReturncode(self
):
55 return self
._rpc
.subprocess
.GetReturncode(self
._id
)
58 """Returns all stdout since the last call to ReadStdout.
60 This call allows the user to read stdout while the process is running.
61 However each call will flush the local stdout buffer. In order to make
62 multiple calls to ReadStdout and to retain the entire output the results
63 of this call will need to be buffered in the calling code.
65 return self
._rpc
.subprocess
.ReadStdout(self
._id
)
68 """Returns all stderr read since the last call to ReadStderr.
70 See ReadStdout for additional details.
72 return self
._rpc
.subprocess
.ReadStderr(self
._id
)
75 """Returns the (stdout, stderr) since the last Read* call.
77 See ReadStdout for additional details.
79 return self
._rpc
.subprocess
.ReadOutput(self
._id
)
82 return self
._rpc
.subprocess
.Wait(self
._id
)
85 return self
._rpc
.subprocess
.Poll(self
._id
)
88 return self
._rpc
.subprocess
.GetPid(self
._id
)
92 class Process(object):
93 """Implements a task-side non-blocking subprocess.
95 This non-blocking subprocess allows the caller to continue operating while
96 also able to interact with this subprocess based on a key returned to
97 the caller at the time of creation.
99 Creation args are set via Set* methods called after calling Process but
100 before calling Start. This is due to a limitation of the XML-RPC
101 implementation not supporting keyword arguments.
106 _creation_lock
= threading
.Lock()
108 def __init__(self
, cmd
):
115 self
.detached
= False
116 self
.data_lock
= threading
.Lock()
119 return '%r, cwd=%r, verbose=%r, detached=%r' % (
120 self
.cmd
, self
.cwd
, self
.verbose
, self
.detached
)
123 for pipe
, data
in self
.proc
.yield_any():
128 sys
.stdout
.write(data
)
132 sys
.stderr
.write(data
)
136 for key
in cls
._processes
:
140 def Process(cls
, cmd
):
141 with cls
._creation
_lock
:
142 key
= 'Process%d' % cls
._process
_next
_id
143 cls
._process
_next
_id
+= 1
144 logging
.debug('Creating process %s with cmd %r', key
, cmd
)
146 cls
._processes
[key
] = process
150 logging
.info('Starting process %s', self
)
151 self
.proc
= subprocess42
.Popen(self
.cmd
, stdout
=subprocess42
.PIPE
,
152 stderr
=subprocess42
.PIPE
,
153 detached
=self
.detached
, cwd
=self
.cwd
)
154 threading
.Thread(target
=self
._reader
).start()
158 cls
._processes
[key
]._Start
()
161 def SetCwd(cls
, key
, cwd
):
162 """Sets the process's cwd."""
163 logging
.debug('Setting %s cwd to %s', key
, cwd
)
164 cls
._processes
[key
].cwd
= cwd
167 def SetDetached(cls
, key
):
168 """Creates a detached process."""
169 logging
.debug('Setting %s.detached = True', key
)
170 cls
._processes
[key
].detached
= True
173 def SetVerbose(cls
, key
):
174 """Sets the stdout and stderr to be emitted locally."""
175 logging
.debug('Setting %s.verbose = True', key
)
176 cls
._processes
[key
].verbose
= True
179 def Terminate(cls
, key
):
180 logging
.debug('Terminating process %s', key
)
181 cls
._processes
[key
].proc
.terminate()
185 logging
.debug('Killing process %s', key
)
186 cls
._processes
[key
].proc
.kill()
189 def Delete(cls
, key
):
190 if cls
.GetReturncode(key
) is None:
191 logging
.warning('Killing %s before deleting it', key
)
193 logging
.debug('Deleting process %s', key
)
194 cls
._processes
.pop(key
)
197 def GetReturncode(cls
, key
):
198 return cls
._processes
[key
].proc
.returncode
201 def ReadStdout(cls
, key
):
202 """Returns all stdout since the last call to ReadStdout.
204 This call allows the user to read stdout while the process is running.
205 However each call will flush the local stdout buffer. In order to make
206 multiple calls to ReadStdout and to retain the entire output the results
207 of this call will need to be buffered in the calling code.
209 proc
= cls
._processes
[key
]
211 # Perform a "read" on the stdout data
217 def ReadStderr(cls
, key
):
218 """Returns all stderr read since the last call to ReadStderr.
220 See ReadStdout for additional details.
222 proc
= cls
._processes
[key
]
224 # Perform a "read" on the stderr data
230 def ReadOutput(cls
, key
):
231 """Returns the (stdout, stderr) since the last Read* call.
233 See ReadStdout for additional details.
235 return cls
.ReadStdout(key
), cls
.ReadStderr(key
)
239 return cls
._processes
[key
].proc
.wait()
243 return cls
._processes
[key
].proc
.poll()
246 def GetPid(cls
, key
):
247 return cls
._processes
[key
].proc
.pid