Roll src/third_party/WebKit e0eac24:489c548 (svn 193311:193320)
[chromium-blink-merge.git] / testing / legion / task_controller.py
blob9514f44fddd89a0ad6dae6b67ccc4b086417d37a
1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
5 """Defines the task controller library."""
7 import argparse
8 import datetime
9 import logging
10 import os
11 import socket
12 import subprocess
13 import sys
14 import tempfile
15 import threading
16 import xmlrpclib
18 #pylint: disable=relative-import
19 import common_lib
20 import process
# Paths to the isolate.py and swarming.py scripts that live directly under
# common_lib.SWARMING_DIR.
ISOLATE_PY, SWARMING_PY = (
    os.path.join(common_lib.SWARMING_DIR, script_name)
    for script_name in ('isolate.py', 'swarming.py'))
class Error(Exception):
  """Base class for the exceptions raised by the task controller library."""
class ConnectionTimeoutError(Error):
  """Raised when a task machine does not connect within the timeout."""
class TaskController(object):
  """Provisions, configures, and controls a task machine.

  This class is an abstraction of a physical task machine. It provides an
  end to end API for controlling a task machine. Operations on the task machine
  are performed using the instance's "rpc" property. A simple end to end
  scenario is as follows:

  task = TaskController(...)
  task.Create()
  task.WaitForConnection()
  proc = task.rpc.subprocess.Popen(['ls'])
  print task.rpc.subprocess.GetStdout(proc)
  task.Release()
  """

  # Number of controllers constructed so far; used for default task names.
  _task_count = 0
  # Every fully constructed controller, so ReleaseAllTasks() can reach them.
  _tasks = []

  def __init__(self, isolated_hash, dimensions, priority=100,
               idle_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS,
               connection_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS,
               verbosity='ERROR', name=None, run_id=None):
    """Initializes the controller; nothing is triggered until Create().

    Args:
      isolated_hash: Positional argument given to "swarming.py trigger".
      dimensions: A dict; each item is passed as "--dimension key value".
      priority: Value of swarming's --priority flag.
      idle_timeout_secs: Forwarded to the task as --idle-timeout.
      connection_timeout_secs: Seconds WaitForConnection() waits. When falsy,
          the --task-connection-timeout-secs command line flag is used.
      verbosity: Level name or logging level int, forwarded as --verbosity.
      name: Name of the task; defaults to 'Task<N>'.
      run_id: Last component of the swarming task name; defaults to the
          current local time.
    """
    assert isinstance(dimensions, dict)
    type(self)._task_count += 1
    self.verbosity = verbosity
    self._name = name or 'Task%d' % type(self)._task_count
    self._priority = priority
    self._isolated_hash = isolated_hash
    self._idle_timeout_secs = idle_timeout_secs
    self._dimensions = dimensions
    self._connect_event = threading.Event()
    self._connected = False
    self._ip_address = None
    self._otp = self._CreateOTP()
    self._rpc = None

    run_id = run_id or datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
    self._task_name = '%s/%s/%s' % (
        os.path.splitext(sys.argv[0])[0], self._name, run_id)

    parser = argparse.ArgumentParser()
    parser.add_argument('--isolate-server')
    parser.add_argument('--swarming-server')
    # type=int: without it a value supplied on the command line is a str,
    # which breaks the '%d' log format and Event.wait() in WaitForConnection.
    parser.add_argument('--task-connection-timeout-secs', type=int,
                        default=common_lib.DEFAULT_TIMEOUT_SECS)
    args, _ = parser.parse_known_args()

    self._isolate_server = args.isolate_server
    self._swarming_server = args.swarming_server
    self._connection_timeout_secs = (connection_timeout_secs or
                                     args.task_connection_timeout_secs)

    # Register last so ReleaseAllTasks() never sees a half-built instance
    # when something above raises.
    type(self)._tasks.append(self)

  @property
  def name(self):
    """The task's name."""
    return self._name

  @property
  def otp(self):
    """The OTP string created for this task."""
    return self._otp

  @property
  def connected(self):
    """True between OnConnect() and Release()."""
    return self._connected

  @property
  def connect_event(self):
    """The threading.Event that OnConnect() sets."""
    return self._connect_event

  @property
  def rpc(self):
    """The RPC connection to the task, or None when not connected."""
    return self._rpc

  @property
  def verbosity(self):
    """The verbosity level as a string."""
    return self._verbosity

  @verbosity.setter
  def verbosity(self, level):
    """Sets the verbosity level as a string.

    Either a string ('INFO', 'DEBUG', etc) or a logging level (logging.INFO,
    logging.DEBUG, etc) is allowed.
    """
    assert isinstance(level, (str, int))
    if isinstance(level, int):
      level = logging.getLevelName(level)
    self._verbosity = level  #pylint: disable=attribute-defined-outside-init

  @classmethod
  def ReleaseAllTasks(cls):
    """Calls Release() on every registered task controller."""
    for task in cls._tasks:
      task.Release()

  def Process(self, cmd, verbose=False, detached=False, cwd=None):
    """Returns a process.ControllerProcessWrapper bound to this task's rpc."""
    return process.ControllerProcessWrapper(
        self.rpc, cmd, verbose, detached, cwd)

  def _CreateOTP(self):
    """Creates the OTP.

    The OTP is a string made of the task name, this machine's host name, the
    base name of the running script and the UTC creation time.
    """
    controller_name = socket.gethostname()
    test_name = os.path.basename(sys.argv[0])
    creation_time = datetime.datetime.utcnow()
    otp = 'task:%s controller:%s test:%s creation:%s' % (
        self._name, controller_name, test_name, creation_time)
    return otp

  def Create(self):
    """Creates the task machine."""
    logging.info('Creating %s', self.name)
    self._connect_event.clear()
    self._ExecuteSwarming()

  def WaitForConnection(self):
    """Waits for the task machine to connect.

    Raises:
      ConnectionTimeoutError if the task doesn't connect in time.
    """
    logging.info('Waiting for %s to connect with a timeout of %d seconds',
                 self._name, self._connection_timeout_secs)
    self._connect_event.wait(self._connection_timeout_secs)
    if not self._connect_event.is_set():
      raise ConnectionTimeoutError('%s failed to connect' % self.name)

  def Release(self):
    """Quits the task's RPC server so it can release the machine."""
    # Clear the connection state before talking to the task so the controller
    # is left disconnected even if Quit() raises something unexpected.
    rpc, self._rpc = self._rpc, None
    was_connected, self._connected = self._connected, False
    if rpc is None or not was_connected:
      return

    logging.info('Releasing %s', self._name)
    try:
      rpc.Quit()
    except (socket.error, xmlrpclib.Fault):
      logging.error('Unable to connect to %s to call Quit', self.name)

  def _ExecuteSwarming(self):
    """Executes swarming.py."""
    cmd = [
        'python',
        SWARMING_PY,
        'trigger',
        self._isolated_hash,
        '--priority', str(self._priority),
        '--task-name', self._task_name,
        ]

    if self._isolate_server:
      cmd.extend(['--isolate-server', self._isolate_server])
    if self._swarming_server:
      cmd.extend(['--swarming', self._swarming_server])
    for key, value in self._dimensions.items():
      cmd.extend(['--dimension', key, value])

    # Everything after '--' is appended as arguments for the task itself.
    cmd.extend([
        '--',
        '--controller', common_lib.MY_IP,
        '--otp', self._otp,
        '--verbosity', self._verbosity,
        '--idle-timeout', str(self._idle_timeout_secs),
        ])

    self._ExecuteProcess(cmd)

  def _ExecuteProcess(self, cmd):
    """Executes a process, waits for it to complete, and checks for success.

    Raises:
      Error: with the process's stderr if it exits with a non-zero code.
    """
    logging.debug('Running %s', ' '.join(cmd))
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    _, stderr = p.communicate()
    if p.returncode != 0:
      raise Error(stderr)

  def OnConnect(self, ip_address):
    """Receives task ip address on connection."""
    self._ip_address = ip_address
    # Open the RPC connection before flagging the task as connected, so that
    # "connected" is never True while "rpc" is still None (including when
    # ConnectToServer raises).
    self._rpc = common_lib.ConnectToServer(self._ip_address)
    self._connected = True
    logging.info('%s connected from %s', self._name, ip_address)
    self._connect_event.set()