Roll src/third_party/WebKit d9c6159:8139f33 (svn 201974:201975)
[chromium-blink-merge.git] / testing / legion / process.py
blob6bcc5ea94832494c85b6374fa7ade92d9afbc156
1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
5 """RPC compatible subprocess-type module.
7 This module defines both a task-side process class as well as a controller-side
8 process wrapper for easier access and usage of the task-side process.
9 """
11 import logging
12 import os
13 import subprocess
14 import sys
15 import threading
16 import time
18 #pylint: disable=relative-import
19 import common_lib
21 # Map swarming_client to use subprocess42
22 sys.path.append(common_lib.SWARMING_DIR)
24 from utils import subprocess42
class TimeoutError(Exception):
  """Raised when a wait on a process exceeds its timeout.

  NOTE: on Python 3 this name shadows the builtin TimeoutError within this
  module; the name is kept so existing callers that catch it keep working.
  """
class ControllerProcessWrapper(object):
  """Controller-side process wrapper class.

  This class provides a more intuitive interface to task-side processes
  than calling the methods directly using the RPC object. Every method
  forwards to the same-named method on rpc.subprocess, passing the key
  that identifies the task-side process.
  """

  def __init__(self, rpc, cmd, verbose=False, detached=False, cwd=None,
               key=None, shell=None):
    """Creates, configures and starts a task-side process.

    Args:
      rpc: The RPC object; its `subprocess` attribute receives all calls.
      cmd: The command passed to rpc.subprocess.Process.
      verbose: If true, SetVerbose is called before starting.
      detached: If true, SetDetached is called before starting.
      cwd: If set, passed to SetCwd before starting.
      key: Optional key to identify the process; passed through to
          rpc.subprocess.Process, whose return value becomes self.key.
      shell: If true, SetShell is called before starting.
    """
    logging.debug('Creating a process with cmd=%s', cmd)
    self._rpc = rpc
    self._key = rpc.subprocess.Process(cmd, key)
    logging.debug('Process created with key=%s', self._key)
    # Options are applied through individual Set* calls made after creation
    # and before Start.
    if verbose:
      self._rpc.subprocess.SetVerbose(self._key)
    if detached:
      self._rpc.subprocess.SetDetached(self._key)
    if cwd:
      self._rpc.subprocess.SetCwd(self._key, cwd)
    if shell:
      self._rpc.subprocess.SetShell(self._key)
    self._rpc.subprocess.Start(self._key)

  @property
  def key(self):
    """The key identifying the task-side process."""
    return self._key

  def Terminate(self):
    """Forwards Terminate to the task side and returns its result."""
    logging.debug('Terminating process %s', self._key)
    return self._rpc.subprocess.Terminate(self._key)

  def Kill(self):
    """Forwards Kill to the task side and returns its result."""
    logging.debug('Killing process %s', self._key)
    # Return the RPC result, matching Terminate and the other forwarders.
    return self._rpc.subprocess.Kill(self._key)

  def Delete(self):
    """Forwards Delete to the task side and returns its result."""
    return self._rpc.subprocess.Delete(self._key)

  def GetReturncode(self):
    """Returns the result of the task-side GetReturncode call."""
    return self._rpc.subprocess.GetReturncode(self._key)

  def ReadStdout(self):
    """Returns all stdout since the last call to ReadStdout.

    This call allows the user to read stdout while the process is running.
    However each call will flush the local stdout buffer. In order to make
    multiple calls to ReadStdout and to retain the entire output the results
    of this call will need to be buffered in the calling code.
    """
    return self._rpc.subprocess.ReadStdout(self._key)

  def ReadStderr(self):
    """Returns all stderr read since the last call to ReadStderr.

    See ReadStdout for additional details.
    """
    return self._rpc.subprocess.ReadStderr(self._key)

  def ReadOutput(self):
    """Returns the (stdout, stderr) since the last Read* call.

    See ReadStdout for additional details.
    """
    return self._rpc.subprocess.ReadOutput(self._key)

  def Wait(self, timeout=None):
    """Forwards Wait(key, timeout) to the task side and returns its result."""
    return self._rpc.subprocess.Wait(self._key, timeout)

  def Poll(self):
    """Returns the result of the task-side Poll call."""
    return self._rpc.subprocess.Poll(self._key)

  def GetPid(self):
    """Returns the result of the task-side GetPid call."""
    return self._rpc.subprocess.GetPid(self._key)
106 class Process(object):
107 """Implements a task-side non-blocking subprocess.
109 This non-blocking subprocess allows the caller to continue operating while
110 also able to interact with this subprocess based on a key returned to
111 the caller at the time of creation.
113 Creation args are set via Set* methods called after calling Process but
114 before calling Start. This is due to a limitation of the XML-RPC
115 implementation not supporting keyword arguments.
118 _processes = {}
119 _process_next_id = 0
120 _creation_lock = threading.Lock()
122 def __init__(self, cmd, key):
123 self.stdout = ''
124 self.stderr = ''
125 self.key = key
126 self.cmd = cmd
127 self.proc = None
128 self.cwd = None
129 self.shell = False
130 self.verbose = False
131 self.detached = False
132 self.complete = False
133 self.data_lock = threading.Lock()
134 self.stdout_file = open(self._CreateOutputFilename('stdout'), 'wb+')
135 self.stderr_file = open(self._CreateOutputFilename('stderr'), 'wb+')
137 def _CreateOutputFilename(self, fname):
138 return os.path.join(common_lib.GetOutputDir(), '%s.%s' % (self.key, fname))
140 def __str__(self):
141 return '%r, cwd=%r, verbose=%r, detached=%r' % (
142 self.cmd, self.cwd, self.verbose, self.detached)
144 def _reader(self):
145 for pipe, data in self.proc.yield_any():
146 with self.data_lock:
147 if pipe == 'stdout':
148 self.stdout += data
149 self.stdout_file.write(data)
150 self.stdout_file.flush()
151 if self.verbose:
152 sys.stdout.write(data)
153 else:
154 self.stderr += data
155 self.stderr_file.write(data)
156 self.stderr_file.flush()
157 if self.verbose:
158 sys.stderr.write(data)
159 self.complete = True
161 @classmethod
162 def KillAll(cls):
163 for key in cls._processes:
164 cls.Kill(key)
166 @classmethod
167 def Process(cls, cmd, key=None):
168 with cls._creation_lock:
169 if not key:
170 key = 'Process%d' % cls._process_next_id
171 cls._process_next_id += 1
172 if key in cls._processes:
173 raise KeyError('Key %s already in use' % key)
174 logging.debug('Creating process %s with cmd %r', key, cmd)
175 cls._processes[key] = cls(cmd, key)
176 return key
178 def _Start(self):
179 logging.info('Starting process %s', self)
180 self.proc = subprocess42.Popen(self.cmd, stdout=subprocess42.PIPE,
181 stderr=subprocess42.PIPE,
182 detached=self.detached, cwd=self.cwd,
183 shell=self.shell)
184 threading.Thread(target=self._reader).start()
186 @classmethod
187 def Start(cls, key):
188 cls._processes[key]._Start()
190 @classmethod
191 def SetCwd(cls, key, cwd):
192 """Sets the process's cwd."""
193 logging.debug('Setting %s cwd to %s', key, cwd)
194 cls._processes[key].cwd = cwd
196 @classmethod
197 def SetShell(cls, key):
198 """Sets the process's shell arg to True."""
199 logging.debug('Setting %s.shell = True', key)
200 cls._processes[key].shell = True
202 @classmethod
203 def SetDetached(cls, key):
204 """Creates a detached process."""
205 logging.debug('Setting %s.detached = True', key)
206 cls._processes[key].detached = True
208 @classmethod
209 def SetVerbose(cls, key):
210 """Sets the stdout and stderr to be emitted locally."""
211 logging.debug('Setting %s.verbose = True', key)
212 cls._processes[key].verbose = True
214 @classmethod
215 def Terminate(cls, key):
216 logging.debug('Terminating process %s', key)
217 cls._processes[key].proc.terminate()
219 @classmethod
220 def Kill(cls, key):
221 logging.debug('Killing process %s', key)
222 cls._processes[key].proc.kill()
224 @classmethod
225 def Delete(cls, key):
226 if cls.GetReturncode(key) is None:
227 logging.warning('Killing %s before deleting it', key)
228 cls.Kill(key)
229 logging.debug('Deleting process %s', key)
230 cls._processes.pop(key)
232 @classmethod
233 def GetReturncode(cls, key):
234 return cls._processes[key].proc.returncode
236 @classmethod
237 def ReadStdout(cls, key):
238 """Returns all stdout since the last call to ReadStdout.
240 This call allows the user to read stdout while the process is running.
241 However each call will flush the local stdout buffer. In order to make
242 multiple calls to ReadStdout and to retain the entire output the results
243 of this call will need to be buffered in the calling code.
245 proc = cls._processes[key]
246 with proc.data_lock:
247 # Perform a "read" on the stdout data
248 stdout = proc.stdout
249 proc.stdout = ''
250 return stdout
252 @classmethod
253 def ReadStderr(cls, key):
254 """Returns all stderr read since the last call to ReadStderr.
256 See ReadStdout for additional details.
258 proc = cls._processes[key]
259 with proc.data_lock:
260 # Perform a "read" on the stderr data
261 stderr = proc.stderr
262 proc.stderr = ''
263 return stderr
265 @classmethod
266 def ReadOutput(cls, key):
267 """Returns the (stdout, stderr) since the last Read* call.
269 See ReadStdout for additional details.
271 return cls.ReadStdout(key), cls.ReadStderr(key)
273 @classmethod
274 def Wait(cls, key, timeout=None):
275 """Wait for the process to complete.
277 We wait for all of the output to be written before returning. This solves
278 a race condition found on Windows where the output can lag behind the
279 wait call.
281 Raises:
282 TimeoutError if the process doesn't finish in the specified timeout.
284 end = None if timeout is None else timeout + time.time()
285 while end is None or end > time.time():
286 if cls._processes[key].complete:
287 return
288 time.sleep(0.05)
289 raise TimeoutError()
291 @classmethod
292 def Poll(cls, key):
293 return cls._processes[key].proc.poll()
295 @classmethod
296 def GetPid(cls, key):
297 return cls._processes[key].proc.pid