Add some missing owners for SBClientPhishing.* histograms.
[chromium-blink-merge.git] / testing / legion / task_controller.py
blob1725f8fbe18873672111e75fd24f515146c2c85b
1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
5 """Defines the task controller library."""
7 import argparse
8 import datetime
9 import logging
10 import os
11 import socket
12 import subprocess
13 import sys
14 import tempfile
15 import threading
17 #pylint: disable=relative-import
18 import common_lib
19 import process
20 import rpc_server
21 import jsonrpclib
23 ISOLATE_PY = os.path.join(common_lib.SWARMING_DIR, 'isolate.py')
24 SWARMING_PY = os.path.join(common_lib.SWARMING_DIR, 'swarming.py')
27 class Error(Exception):
28 pass
31 class ConnectionTimeoutError(Error):
32 pass
35 class TaskController(object):
36 """Provisions, configures, and controls a task machine.
38 This class is an abstraction of a physical task machine. It provides an
39 end to end API for controlling a task machine. Operations on the task machine
40 are performed using the instance's "rpc" property. A simple end to end
41 scenario is as follows:
43 task = TaskController(...)
44 task.Create()
45 task.WaitForConnection()
46 proc = task.rpc.subprocess.Popen(['ls'])
47 print task.rpc.subprocess.GetStdout(proc)
48 task.Release()
49 """
51 _task_count = 0
52 _tasks = []
54 def __init__(self, isolated_hash, dimensions, priority=100,
55 idle_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS,
56 connection_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS,
57 verbosity='ERROR', name=None, run_id=None):
58 assert isinstance(dimensions, dict)
59 type(self)._tasks.append(self)
60 type(self)._task_count += 1
61 self.verbosity = verbosity
62 self._name = name or 'Task%d' % type(self)._task_count
63 self._priority = priority
64 self._isolated_hash = isolated_hash
65 self._idle_timeout_secs = idle_timeout_secs
66 self._dimensions = dimensions
67 self._connect_event = threading.Event()
68 self._connected = False
69 self._ip_address = None
70 self._otp = self._CreateOTP()
71 self._rpc = None
72 self._output_dir = None
73 self._platform = None
74 self._executable = None
76 run_id = run_id or datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
77 self._task_name = '%s/%s/%s' % (
78 os.path.splitext(sys.argv[0])[0], self._name, run_id)
80 parser = argparse.ArgumentParser()
81 parser.add_argument('--isolate-server')
82 parser.add_argument('--swarming-server')
83 parser.add_argument('--task-connection-timeout-secs',
84 default=common_lib.DEFAULT_TIMEOUT_SECS)
85 args, _ = parser.parse_known_args()
87 self._isolate_server = args.isolate_server
88 self._swarming_server = args.swarming_server
89 self._connection_timeout_secs = (connection_timeout_secs or
90 args.task_connection_timeout_secs)
92 @property
93 def name(self):
94 return self._name
96 @property
97 def otp(self):
98 return self._otp
100 @property
101 def connected(self):
102 return self._connected
104 @property
105 def connect_event(self):
106 return self._connect_event
108 @property
109 def rpc(self):
110 return self._rpc
112 @property
113 def verbosity(self):
114 return self._verbosity
116 @verbosity.setter
117 def verbosity(self, level):
118 """Sets the verbosity level as a string.
120 Either a string ('INFO', 'DEBUG', etc) or a logging level (logging.INFO,
121 logging.DEBUG, etc) is allowed.
123 assert isinstance(level, (str, int))
124 if isinstance(level, int):
125 level = logging.getLevelName(level)
126 self._verbosity = level #pylint: disable=attribute-defined-outside-init
128 @property
129 def output_dir(self):
130 if not self._output_dir:
131 self._output_dir = self.rpc.GetOutputDir()
132 return self._output_dir
134 @property
135 def platform(self):
136 if not self._platform:
137 self._platform = self._rpc.GetPlatform()
138 return self._platform
140 @property
141 def ip_address(self):
142 if not self._ip_address:
143 self._ip_address = self.rpc.GetIpAddress()
144 return self._ip_address
146 @property
147 def executable(self):
148 if not self._executable:
149 self._executable = self.rpc.GetExecutable()
150 return self._executable
152 @classmethod
153 def ReleaseAllTasks(cls):
154 for task in cls._tasks:
155 task.Release()
157 def Process(self, cmd, *args, **kwargs):
158 return process.ControllerProcessWrapper(self.rpc, cmd, *args, **kwargs)
160 def _CreateOTP(self):
161 """Creates the OTP."""
162 controller_name = socket.gethostname()
163 test_name = os.path.basename(sys.argv[0])
164 creation_time = datetime.datetime.utcnow()
165 otp = 'task:%s controller:%s test:%s creation:%s' % (
166 self._name, controller_name, test_name, creation_time)
167 return otp
169 def Create(self):
170 """Creates the task machine."""
171 logging.info('Creating %s', self.name)
172 self._connect_event.clear()
173 self._ExecuteSwarming()
175 def WaitForConnection(self):
176 """Waits for the task machine to connect.
178 Raises:
179 ConnectionTimeoutError if the task doesn't connect in time.
181 logging.info('Waiting for %s to connect with a timeout of %d seconds',
182 self._name, self._connection_timeout_secs)
183 self._connect_event.wait(self._connection_timeout_secs)
184 if not self._connect_event.is_set():
185 raise ConnectionTimeoutError('%s failed to connect' % self.name)
187 def Release(self):
188 """Quits the task's RPC server so it can release the machine."""
189 if self._rpc is not None and self._connected:
190 logging.info('Copying output-dir files to controller')
191 self.RetrieveOutputFiles()
192 logging.info('Releasing %s', self._name)
193 try:
194 self._rpc.Quit()
195 except (socket.error, jsonrpclib.Fault):
196 logging.error('Unable to connect to %s to call Quit', self.name)
197 self._rpc = None
198 self._connected = False
200 def _ExecuteSwarming(self):
201 """Executes swarming.py."""
202 cmd = [
203 'python',
204 SWARMING_PY,
205 'trigger',
206 self._isolated_hash,
207 '--priority', str(self._priority),
208 '--task-name', self._task_name,
211 if self._isolate_server:
212 cmd.extend(['--isolate-server', self._isolate_server])
213 if self._swarming_server:
214 cmd.extend(['--swarming', self._swarming_server])
215 for key, value in self._dimensions.iteritems():
216 cmd.extend(['--dimension', key, value])
218 cmd.extend([
219 '--',
220 '--controller', common_lib.MY_IP,
221 '--otp', self._otp,
222 '--verbosity', self._verbosity,
223 '--idle-timeout', str(self._idle_timeout_secs),
224 '--output-dir', '${ISOLATED_OUTDIR}'
227 self._ExecuteProcess(cmd)
229 def _ExecuteProcess(self, cmd):
230 """Executes a process, waits for it to complete, and checks for success."""
231 logging.debug('Running %s', ' '.join(cmd))
232 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
233 _, stderr = p.communicate()
234 if p.returncode != 0:
235 raise Error(stderr)
237 def OnConnect(self, ip_address):
238 """Receives task ip address on connection."""
239 self._ip_address = ip_address
240 self._connected = True
241 self._rpc = rpc_server.RpcServer.Connect(self._ip_address)
242 logging.info('%s connected from %s', self._name, ip_address)
243 self._connect_event.set()
245 def RetrieveOutputFiles(self):
246 """Retrieves all files in the output-dir."""
247 files = self.rpc.ListDir(self.output_dir)
248 for fname in files:
249 remote_path = self.rpc.PathJoin(self.output_dir, fname)
250 local_name = os.path.join(common_lib.GetOutputDir(),
251 '%s.%s' % (self.name, fname))
252 contents = self.rpc.ReadFile(remote_path)
253 with open(local_name, 'wb+') as fh:
254 fh.write(contents)