Update activity log owners
[chromium-blink-merge.git] / testing / legion / process.py
blobf3cabb5d94b4ab9885aa72b2de67b722e75386a5
1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
5 """RPC compatible subprocess-type module.
7 This module defined both a task-side process class as well as a controller-side
8 process wrapper for easier access and usage of the task-side process.
9 """
11 import logging
12 import os
13 import subprocess
14 import sys
15 import threading
17 #pylint: disable=relative-import
18 import common_lib
20 # Map swarming_client to use subprocess42
21 sys.path.append(common_lib.SWARMING_DIR)
23 from utils import subprocess42
26 class ControllerProcessWrapper(object):
27 """Controller-side process wrapper class.
29 This class provides a more intuitive interface to task-side processes
30 than calling the methods directly using the RPC object.
31 """
33 def __init__(self, rpc, cmd, verbose=False, detached=False, cwd=None,
34 key=None):
35 logging.info('Creating a process with cmd=%s', cmd)
36 self._rpc = rpc
37 self._key = rpc.subprocess.Process(cmd, key)
38 logging.info('Process created with key=%s', self._key)
39 if verbose:
40 self._rpc.subprocess.SetVerbose(self._key)
41 if detached:
42 self._rpc.subprocess.SetDetached(self._key)
43 if cwd:
44 self._rpc.subprocess.SetCwd(self._rpc, cwd)
45 self._rpc.subprocess.Start(self._key)
47 @property
48 def key(self):
49 return self._key
51 def Terminate(self):
52 logging.debug('Terminating process %s', self._key)
53 return self._rpc.subprocess.Terminate(self._key)
55 def Kill(self):
56 logging.debug('Killing process %s', self._key)
57 self._rpc.subprocess.Kill(self._key)
59 def Delete(self):
60 return self._rpc.subprocess.Delete(self._key)
62 def GetReturncode(self):
63 return self._rpc.subprocess.GetReturncode(self._key)
65 def ReadStdout(self):
66 """Returns all stdout since the last call to ReadStdout.
68 This call allows the user to read stdout while the process is running.
69 However each call will flush the local stdout buffer. In order to make
70 multiple calls to ReadStdout and to retain the entire output the results
71 of this call will need to be buffered in the calling code.
72 """
73 return self._rpc.subprocess.ReadStdout(self._key)
75 def ReadStderr(self):
76 """Returns all stderr read since the last call to ReadStderr.
78 See ReadStdout for additional details.
79 """
80 return self._rpc.subprocess.ReadStderr(self._key)
82 def ReadOutput(self):
83 """Returns the (stdout, stderr) since the last Read* call.
85 See ReadStdout for additional details.
86 """
87 return self._rpc.subprocess.ReadOutput(self._key)
89 def Wait(self):
90 return self._rpc.subprocess.Wait(self._key)
92 def Poll(self):
93 return self._rpc.subprocess.Poll(self._key)
95 def GetPid(self):
96 return self._rpc.subprocess.GetPid(self._key)
100 class Process(object):
101 """Implements a task-side non-blocking subprocess.
103 This non-blocking subprocess allows the caller to continue operating while
104 also able to interact with this subprocess based on a key returned to
105 the caller at the time of creation.
107 Creation args are set via Set* methods called after calling Process but
108 before calling Start. This is due to a limitation of the XML-RPC
109 implementation not supporting keyword arguments.
112 _processes = {}
113 _process_next_id = 0
114 _creation_lock = threading.Lock()
116 def __init__(self, cmd, key):
117 self.stdout = ''
118 self.stderr = ''
119 self.key = key
120 self.cmd = cmd
121 self.proc = None
122 self.cwd = None
123 self.verbose = False
124 self.detached = False
125 self.data_lock = threading.Lock()
126 self.stdout_file = open(self._CreateOutputFilename('stdout'), 'wb+')
127 self.stderr_file = open(self._CreateOutputFilename('stderr'), 'wb+')
129 def _CreateOutputFilename(self, fname):
130 return os.path.join(common_lib.GetOutputDir(), '%s.%s' % (self.key, fname))
132 def __str__(self):
133 return '%r, cwd=%r, verbose=%r, detached=%r' % (
134 self.cmd, self.cwd, self.verbose, self.detached)
136 def _reader(self):
137 for pipe, data in self.proc.yield_any():
138 with self.data_lock:
139 if pipe == 'stdout':
140 self.stdout += data
141 self.stdout_file.write(data)
142 self.stdout_file.flush()
143 if self.verbose:
144 sys.stdout.write(data)
145 else:
146 self.stderr += data
147 self.stderr_file.write(data)
148 self.stderr_file.flush()
149 if self.verbose:
150 sys.stderr.write(data)
152 @classmethod
153 def KillAll(cls):
154 for key in cls._processes:
155 cls.Kill(key)
157 @classmethod
158 def Process(cls, cmd, key=None):
159 with cls._creation_lock:
160 if not key:
161 key = 'Process%d' % cls._process_next_id
162 cls._process_next_id += 1
163 if key in cls._processes:
164 raise KeyError('Key %s already in use' % key)
165 logging.debug('Creating process %s with cmd %r', key, cmd)
166 cls._processes[key] = cls(cmd, key)
167 return key
169 def _Start(self):
170 logging.info('Starting process %s', self)
171 self.proc = subprocess42.Popen(self.cmd, stdout=subprocess42.PIPE,
172 stderr=subprocess42.PIPE,
173 detached=self.detached, cwd=self.cwd)
174 threading.Thread(target=self._reader).start()
176 @classmethod
177 def Start(cls, key):
178 cls._processes[key]._Start()
180 @classmethod
181 def SetCwd(cls, key, cwd):
182 """Sets the process's cwd."""
183 logging.debug('Setting %s cwd to %s', key, cwd)
184 cls._processes[key].cwd = cwd
186 @classmethod
187 def SetDetached(cls, key):
188 """Creates a detached process."""
189 logging.debug('Setting %s.detached = True', key)
190 cls._processes[key].detached = True
192 @classmethod
193 def SetVerbose(cls, key):
194 """Sets the stdout and stderr to be emitted locally."""
195 logging.debug('Setting %s.verbose = True', key)
196 cls._processes[key].verbose = True
198 @classmethod
199 def Terminate(cls, key):
200 logging.debug('Terminating process %s', key)
201 cls._processes[key].proc.terminate()
203 @classmethod
204 def Kill(cls, key):
205 logging.debug('Killing process %s', key)
206 cls._processes[key].proc.kill()
208 @classmethod
209 def Delete(cls, key):
210 if cls.GetReturncode(key) is None:
211 logging.warning('Killing %s before deleting it', key)
212 cls.Kill(key)
213 logging.debug('Deleting process %s', key)
214 cls._processes.pop(key)
216 @classmethod
217 def GetReturncode(cls, key):
218 return cls._processes[key].proc.returncode
220 @classmethod
221 def ReadStdout(cls, key):
222 """Returns all stdout since the last call to ReadStdout.
224 This call allows the user to read stdout while the process is running.
225 However each call will flush the local stdout buffer. In order to make
226 multiple calls to ReadStdout and to retain the entire output the results
227 of this call will need to be buffered in the calling code.
229 proc = cls._processes[key]
230 with proc.data_lock:
231 # Perform a "read" on the stdout data
232 stdout = proc.stdout
233 proc.stdout = ''
234 return stdout
236 @classmethod
237 def ReadStderr(cls, key):
238 """Returns all stderr read since the last call to ReadStderr.
240 See ReadStdout for additional details.
242 proc = cls._processes[key]
243 with proc.data_lock:
244 # Perform a "read" on the stderr data
245 stderr = proc.stderr
246 proc.stderr = ''
247 return stderr
249 @classmethod
250 def ReadOutput(cls, key):
251 """Returns the (stdout, stderr) since the last Read* call.
253 See ReadStdout for additional details.
255 return cls.ReadStdout(key), cls.ReadStderr(key)
257 @classmethod
258 def Wait(cls, key):
259 return cls._processes[key].proc.wait()
261 @classmethod
262 def Poll(cls, key):
263 return cls._processes[key].proc.poll()
265 @classmethod
266 def GetPid(cls, key):
267 return cls._processes[key].proc.pid