Add a test hook for AccountIdProvider.
[chromium-blink-merge.git] / testing / legion / process.py
blob356db6131bfa123c0a97f2bc1b7f0d908544ab22
1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
5 """RPC compatible subprocess-type module.
7 This module defined both a task-side process class as well as a controller-side
8 process wrapper for easier access and usage of the task-side process.
9 """
11 import logging
12 import subprocess
13 import sys
14 import threading
16 #pylint: disable=relative-import
17 import common_lib
19 # Map swarming_client to use subprocess42
20 sys.path.append(common_lib.SWARMING_DIR)
22 from utils import subprocess42
25 class ControllerProcessWrapper(object):
26 """Controller-side process wrapper class.
28 This class provides a more intuitive interface to task-side processes
29 than calling the methods directly using the RPC object.
30 """
32 def __init__(self, rpc, cmd, verbose=False, detached=False, cwd=None):
33 self._rpc = rpc
34 self._id = rpc.subprocess.Process(cmd)
35 if verbose:
36 self._rpc.subprocess.SetVerbose(self._id)
37 if detached:
38 self._rpc.subprocess.SetDetached(self._id)
39 if cwd:
40 self._rpc.subprocess.SetCwd(self._rpc, cwd)
41 self._rpc.subprocess.Start(self._id)
43 def Terminate(self):
44 logging.debug('Terminating process %s', self._id)
45 return self._rpc.subprocess.Terminate(self._id)
47 def Kill(self):
48 logging.debug('Killing process %s', self._id)
49 self._rpc.subprocess.Kill(self._id)
51 def Delete(self):
52 return self._rpc.subprocess.Delete(self._id)
54 def GetReturncode(self):
55 return self._rpc.subprocess.GetReturncode(self._id)
57 def ReadStdout(self):
58 """Returns all stdout since the last call to ReadStdout.
60 This call allows the user to read stdout while the process is running.
61 However each call will flush the local stdout buffer. In order to make
62 multiple calls to ReadStdout and to retain the entire output the results
63 of this call will need to be buffered in the calling code.
64 """
65 return self._rpc.subprocess.ReadStdout(self._id)
67 def ReadStderr(self):
68 """Returns all stderr read since the last call to ReadStderr.
70 See ReadStdout for additional details.
71 """
72 return self._rpc.subprocess.ReadStderr(self._id)
74 def ReadOutput(self):
75 """Returns the (stdout, stderr) since the last Read* call.
77 See ReadStdout for additional details.
78 """
79 return self._rpc.subprocess.ReadOutput(self._id)
81 def Wait(self):
82 return self._rpc.subprocess.Wait(self._id)
84 def Poll(self):
85 return self._rpc.subprocess.Poll(self._id)
87 def GetPid(self):
88 return self._rpc.subprocess.GetPid(self._id)
92 class Process(object):
93 """Implements a task-side non-blocking subprocess.
95 This non-blocking subprocess allows the caller to continue operating while
96 also able to interact with this subprocess based on a key returned to
97 the caller at the time of creation.
99 Creation args are set via Set* methods called after calling Process but
100 before calling Start. This is due to a limitation of the XML-RPC
101 implementation not supporting keyword arguments.
104 _processes = {}
105 _process_next_id = 0
106 _creation_lock = threading.Lock()
108 def __init__(self, cmd):
109 self.stdout = ''
110 self.stderr = ''
111 self.cmd = cmd
112 self.proc = None
113 self.cwd = None
114 self.verbose = False
115 self.detached = False
116 self.data_lock = threading.Lock()
118 def __str__(self):
119 return '%r, cwd=%r, verbose=%r, detached=%r' % (
120 self.cmd, self.cwd, self.verbose, self.detached)
122 def _reader(self):
123 for pipe, data in self.proc.yield_any():
124 with self.data_lock:
125 if pipe == 'stdout':
126 self.stdout += data
127 if self.verbose:
128 sys.stdout.write(data)
129 else:
130 self.stderr += data
131 if self.verbose:
132 sys.stderr.write(data)
134 @classmethod
135 def KillAll(cls):
136 for key in cls._processes:
137 cls.Kill(key)
139 @classmethod
140 def Process(cls, cmd):
141 with cls._creation_lock:
142 key = 'Process%d' % cls._process_next_id
143 cls._process_next_id += 1
144 logging.debug('Creating process %s with cmd %r', key, cmd)
145 process = cls(cmd)
146 cls._processes[key] = process
147 return key
149 def _Start(self):
150 logging.info('Starting process %s', self)
151 self.proc = subprocess42.Popen(self.cmd, stdout=subprocess42.PIPE,
152 stderr=subprocess42.PIPE,
153 detached=self.detached, cwd=self.cwd)
154 threading.Thread(target=self._reader).start()
156 @classmethod
157 def Start(cls, key):
158 cls._processes[key]._Start()
160 @classmethod
161 def SetCwd(cls, key, cwd):
162 """Sets the process's cwd."""
163 logging.debug('Setting %s cwd to %s', key, cwd)
164 cls._processes[key].cwd = cwd
166 @classmethod
167 def SetDetached(cls, key):
168 """Creates a detached process."""
169 logging.debug('Setting %s.detached = True', key)
170 cls._processes[key].detached = True
172 @classmethod
173 def SetVerbose(cls, key):
174 """Sets the stdout and stderr to be emitted locally."""
175 logging.debug('Setting %s.verbose = True', key)
176 cls._processes[key].verbose = True
178 @classmethod
179 def Terminate(cls, key):
180 logging.debug('Terminating process %s', key)
181 cls._processes[key].proc.terminate()
183 @classmethod
184 def Kill(cls, key):
185 logging.debug('Killing process %s', key)
186 cls._processes[key].proc.kill()
188 @classmethod
189 def Delete(cls, key):
190 if cls.GetReturncode(key) is None:
191 logging.warning('Killing %s before deleting it', key)
192 cls.Kill(key)
193 logging.debug('Deleting process %s', key)
194 cls._processes.pop(key)
196 @classmethod
197 def GetReturncode(cls, key):
198 return cls._processes[key].proc.returncode
200 @classmethod
201 def ReadStdout(cls, key):
202 """Returns all stdout since the last call to ReadStdout.
204 This call allows the user to read stdout while the process is running.
205 However each call will flush the local stdout buffer. In order to make
206 multiple calls to ReadStdout and to retain the entire output the results
207 of this call will need to be buffered in the calling code.
209 proc = cls._processes[key]
210 with proc.data_lock:
211 # Perform a "read" on the stdout data
212 stdout = proc.stdout
213 proc.stdout = ''
214 return stdout
216 @classmethod
217 def ReadStderr(cls, key):
218 """Returns all stderr read since the last call to ReadStderr.
220 See ReadStdout for additional details.
222 proc = cls._processes[key]
223 with proc.data_lock:
224 # Perform a "read" on the stderr data
225 stderr = proc.stderr
226 proc.stderr = ''
227 return stderr
229 @classmethod
230 def ReadOutput(cls, key):
231 """Returns the (stdout, stderr) since the last Read* call.
233 See ReadStdout for additional details.
235 return cls.ReadStdout(key), cls.ReadStderr(key)
237 @classmethod
238 def Wait(cls, key):
239 return cls._processes[key].proc.wait()
241 @classmethod
242 def Poll(cls, key):
243 return cls._processes[key].proc.poll()
245 @classmethod
246 def GetPid(cls, key):
247 return cls._processes[key].proc.pid