1 # Copyright 2015 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
5 """Defines the task controller library."""
17 #pylint: disable=relative-import
# Paths to the isolate.py and swarming.py client scripts, both located in
# common_lib.SWARMING_DIR. A generator (not a list comprehension) is unpacked
# so that no loop variable leaks into the module namespace under Python 2.
ISOLATE_PY, SWARMING_PY = (
    os.path.join(common_lib.SWARMING_DIR, script_name)
    for script_name in ('isolate.py', 'swarming.py'))
class Error(Exception):
  """Base exception type for this module."""


class ConnectionTimeoutError(Error):
  """Raised when a task machine does not connect within its timeout."""
# TaskController: controls one swarming task machine (create, wait for the
# connection, drive it over RPC, release).
# NOTE(review): this extraction of the class is incomplete. The class docstring
# below is never closed. The class attributes `_tasks` and `_task_count`
# (accessed through type(self) in __init__ and through cls in
# ReleaseAllTasks) are not defined anywhere in the visible text. Restore them
# from the original file.
35 class TaskController(object):
36 """Provisions, configures, and controls a task machine.
38 This class is an abstraction of a physical task machine. It provides an
39 end to end API for controlling a task machine. Operations on the task machine
40 are performed using the instance's "rpc" property. A simple end to end
41 scenario is as follows:
43 task = TaskController(...)
45 task.WaitForConnection()
46 proc = task.rpc.subprocess.Popen(['ls'])
47 print task.rpc.subprocess.GetStdout(proc)
def __init__(self, isolated_hash, dimensions, priority=100,
             idle_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS,
             connection_timeout_secs=common_lib.DEFAULT_TIMEOUT_SECS,
             verbosity='ERROR', name=None, run_id=None):
  """Registers and configures a task controller.

  Nothing is launched here; the task machine is started separately.

  Args:
    isolated_hash: Isolated hash for the task (stored as
        self._isolated_hash; presumably handed to swarming.py).
    dimensions: Dict of swarming dimension name -> value. Each pair is
        passed to swarming.py as --dimension.
    priority: Swarming task priority, passed as --priority.
    idle_timeout_secs: Passed to the task as --idle-timeout.
    connection_timeout_secs: Seconds WaitForConnection waits for the task to
        connect. If falsy, the --task-connection-timeout-secs command-line
        flag is used instead.
    verbosity: Level name (e.g. 'INFO') or numeric logging level for the
        task, passed to it as --verbosity.
    name: Task name; defaults to 'Task<N>', where N counts instances.
    run_id: Run identifier embedded in the swarming task name; defaults to
        the current local time formatted as %Y-%m-%d-%H-%M-%S.
  """
  assert isinstance(dimensions, dict)
  type(self)._tasks.append(self)
  type(self)._task_count += 1
  self.verbosity = verbosity
  self._name = name or 'Task%d' % type(self)._task_count
  self._priority = priority
  self._isolated_hash = isolated_hash
  self._idle_timeout_secs = idle_timeout_secs
  self._dimensions = dimensions
  self._connect_event = threading.Event()
  self._connected = False
  self._ip_address = None
  self._otp = self._CreateOTP()
  # Filled in by OnConnect or lazily by the cached accessors. They are
  # initialized here because other methods read them before a connection
  # exists, e.g. the `self._rpc is not None` release check and the
  # `if not self._platform` cache test.
  self._rpc = None
  self._output_dir = None
  self._platform = None
  self._executable = None

  run_id = run_id or datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
  self._task_name = '%s/%s/%s' % (
      os.path.splitext(sys.argv[0])[0], self._name, run_id)

  parser = argparse.ArgumentParser()
  parser.add_argument('--isolate-server')
  parser.add_argument('--swarming-server')
  # type=int: the value is formatted with %d and passed to Event.wait(), so a
  # raw string from the command line would break both.
  parser.add_argument('--task-connection-timeout-secs', type=int,
                      default=common_lib.DEFAULT_TIMEOUT_SECS)
  # parse_known_args: sys.argv also carries the test's own flags.
  args, _ = parser.parse_known_args()

  self._isolate_server = args.isolate_server
  self._swarming_server = args.swarming_server
  # The flag only applies when the caller passes a falsy timeout (assuming
  # DEFAULT_TIMEOUT_SECS is non-zero, the parameter default always wins).
  self._connection_timeout_secs = (connection_timeout_secs or
                                   args.task_connection_timeout_secs)
# NOTE(review): fragment; the enclosing method header is missing from this
# extraction (presumably a read-only `connected` property; verify). It
# returns self._connected. __init__ sets that flag to False, OnConnect sets it
# to True, and the release path resets it to False.
102 return self
._connected
def connect_event(self):
  """Returns the threading.Event that OnConnect sets once the task connects.

  The event is cleared again before the task machine is (re)created.
  """
  event = self._connect_event
  return event
# NOTE(review): fragment; the method header is missing from this extraction
# (presumably the getter half of a `verbosity` property; verify). It returns
# the level name that `def verbosity(self, level)` stores in self._verbosity.
114 return self
._verbosity
def verbosity(self, level):
  """Stores the task's logging verbosity as a level name.

  Args:
    level: A level name string such as 'INFO' or 'DEBUG', or a numeric
        logging level such as logging.INFO. Numeric levels are converted to
        their names with logging.getLevelName().
  """
  assert isinstance(level, (str, int))
  level_name = logging.getLevelName(level) if isinstance(level, int) else level
  self._verbosity = level_name  #pylint: disable=attribute-defined-outside-init
def output_dir(self):
  """Returns the task's output directory, fetched over RPC on first use.

  The answer of rpc.GetOutputDir() is cached in self._output_dir. A falsy
  answer is not treated as cached, so it is fetched again on the next call.
  """
  if self._output_dir:
    return self._output_dir
  self._output_dir = self.rpc.GetOutputDir()
  return self._output_dir
# NOTE(review): fragment; the method header is missing from this extraction
# (presumably a cached `platform` property). It returns self._platform and
# fills it from GetPlatform() over RPC when it is falsy. Unlike the sibling
# caches it calls `self._rpc` directly rather than `self.rpc`. Make sure
# `self._platform` is initialized (e.g. to None) in __init__, otherwise the
# first access raises AttributeError. Confirm both against the original.
136 if not self
._platform
:
137 self
._platform
= self
._rpc
.GetPlatform()
138 return self
._platform
def ip_address(self):
  """Returns the task machine's IP address.

  Uses the address recorded by OnConnect when one is set. Otherwise it asks
  the task via rpc.GetIpAddress() and caches the answer in self._ip_address.
  """
  if self._ip_address:
    return self._ip_address
  self._ip_address = self.rpc.GetIpAddress()
  return self._ip_address
def executable(self):
  """Returns the task's rpc.GetExecutable() result, cached after first use.

  The value is kept in self._executable. A falsy value is fetched again on
  the next call.
  """
  if self._executable:
    return self._executable
  self._executable = self.rpc.GetExecutable()
  return self._executable
# ReleaseAllTasks: walks every TaskController registered in cls._tasks
# (__init__ appends each new instance there and nothing visible removes them).
# NOTE(review): fragment; the loop body (original line 155, presumably a call
# that releases each task; verify) and, presumably, a @classmethod decorator
# (the first parameter is `cls`) are missing from this extraction.
153 def ReleaseAllTasks(cls
):
154 for task
in cls
._tasks
:
def Process(self, cmd, *args, **kwargs):
  """Returns a process.ControllerProcessWrapper for `cmd` on this task.

  The wrapper receives this task's `rpc` proxy followed by `cmd`. Any extra
  positional and keyword arguments are forwarded to it unchanged.
  """
  wrapper_cls = process.ControllerProcessWrapper
  return wrapper_cls(self.rpc, cmd, *args, **kwargs)
def _CreateOTP(self):
  """Creates the OTP string that identifies this task.

  Returns:
    A string of the form
    'task:<task name> controller:<hostname> test:<script basename>
    creation:<UTC creation time>'.
  """
  controller_name = socket.gethostname()
  test_name = os.path.basename(sys.argv[0])
  creation_time = datetime.datetime.utcnow()
  otp = 'task:%s controller:%s test:%s creation:%s' % (
      self._name, controller_name, test_name, creation_time)
  # NOTE(review): every field above is predictable, so this value works as an
  # identifier rather than a secret. If it is used to authenticate the task,
  # consider adding a random component; confirm how the task consumes it.
  return otp
# NOTE(review): fragment; the method header is missing from this extraction.
# It clears the connect event first, so a later WaitForConnection waits for a
# fresh OnConnect, and then launches the task via _ExecuteSwarming.
170 """Creates the task machine."""
171 logging
.info('Creating %s', self
.name
)
172 self
._connect
_event
.clear()
173 self
._ExecuteSwarming
()
def WaitForConnection(self):
  """Blocks until the task machine connects or the timeout expires.

  Waits up to self._connection_timeout_secs for the connect event, which
  OnConnect sets.

  Raises:
    ConnectionTimeoutError if the task doesn't connect in time.
  """
  timeout_secs = self._connection_timeout_secs
  logging.info('Waiting for %s to connect with a timeout of %d seconds',
               self._name, timeout_secs)
  self._connect_event.wait(timeout_secs)
  if self._connect_event.is_set():
    return
  raise ConnectionTimeoutError('%s failed to connect' % self.name)
# NOTE(review): fragment; the method header and the `try:` body (original
# lines 193-194, presumably the RPC Quit call) are missing from this
# extraction.
# When an RPC proxy exists and the task is connected, it first copies the
# task's output-dir files back with RetrieveOutputFiles. A socket.error or
# jsonrpclib.Fault is then treated as "task unreachable" and only logged.
# Finally it sets self._connected = False; from this text you cannot tell
# whether that line sits inside or after the `if`.
# `self._rpc` must be initialized in __init__ for the `is not None` check to
# be safe before OnConnect has run.
188 """Quits the task's RPC server so it can release the machine."""
189 if self
._rpc
is not None and self
._connected
:
190 logging
.info('Copying output-dir files to controller')
191 self
.RetrieveOutputFiles()
192 logging
.info('Releasing %s', self
._name
)
195 except (socket
.error
, jsonrpclib
.Fault
):
196 logging
.error('Unable to connect to %s to call Quit', self
.name
)
198 self
._connected
= False
# _ExecuteSwarming: builds the swarming.py command line for this task and runs
# it with _ExecuteProcess.
# NOTE(review): fragment; several lines of the command list (original lines
# 202-206, 209-210, 217-219, 221 and 225-226) are missing from this
# extraction. The visible parts are:
#   - --priority and --task-name;
#   - --isolate-server and --swarming, only when those servers were given;
#   - one --dimension key value triple per entry of self._dimensions;
#   - --controller, --verbosity, --idle-timeout and --output-dir, which look
#     like arguments forwarded to the task itself (verify).
# dict.iteritems() makes this Python 2 only.
200 def _ExecuteSwarming(self
):
201 """Executes swarming.py."""
207 '--priority', str(self
._priority
),
208 '--task-name', self
._task
_name
,
211 if self
._isolate
_server
:
212 cmd
.extend(['--isolate-server', self
._isolate
_server
])
213 if self
._swarming
_server
:
214 cmd
.extend(['--swarming', self
._swarming
_server
])
215 for key
, value
in self
._dimensions
.iteritems():
216 cmd
.extend(['--dimension', key
, value
])
220 '--controller', common_lib
.MY_IP
,
222 '--verbosity', self
._verbosity
,
223 '--idle-timeout', str(self
._idle
_timeout
_secs
),
224 '--output-dir', '${ISOLATED_OUTDIR}'
227 self
._ExecuteProcess
(cmd
)
# _ExecuteProcess: runs `cmd` (an argument list) to completion, capturing
# both output streams. stdout is discarded; stderr is kept for the failure
# branch.
# NOTE(review): fragment; the body of the final `if` (original lines 235-236,
# presumably raising an error that includes `stderr`) is missing from this
# extraction.
229 def _ExecuteProcess(self
, cmd
):
230 """Executes a process, waits for it to complete, and checks for success."""
231 logging
.debug('Running %s', ' '.join(cmd
))
232 p
= subprocess
.Popen(cmd
, stdout
=subprocess
.PIPE
, stderr
=subprocess
.PIPE
)
233 _
, stderr
= p
.communicate()
234 if p
.returncode
!= 0:
def OnConnect(self, ip_address):
  """Records a connection from the task machine.

  Steps, in order:
    1. Stores the task's IP address and marks the task connected.
    2. Stores rpc_server.RpcServer.Connect(<address>) as self._rpc.
    3. Logs the connection.
    4. Sets the connect event so WaitForConnection can return.

  Args:
    ip_address: The IP address the task connected from.
  """
  address = ip_address
  self._ip_address = address
  self._connected = True
  self._rpc = rpc_server.RpcServer.Connect(address)
  logging.info('%s connected from %s', self._name, address)
  # Signalled last so that waiters see self._rpc already populated.
  self._connect_event.set()
# RetrieveOutputFiles: copies each entry of the task's output dir (listed via
# rpc.ListDir) to common_lib.GetOutputDir() on the controller. Each local copy
# is named '<task name>.<file name>'; its contents come from rpc.ReadFile, and
# the file is opened with mode 'wb+'.
# NOTE(review): fragment; the loop header over `files` (original line 248,
# presumably `for fname in files:`) and everything after the `with open(...)`
# line are missing from this extraction.
# NOTE(review): `fname` comes from the task machine. If it can contain path
# separators, local_name could land outside the output dir; confirm that
# ListDir returns bare names.
245 def RetrieveOutputFiles(self
):
246 """Retrieves all files in the output-dir."""
247 files
= self
.rpc
.ListDir(self
.output_dir
)
249 remote_path
= self
.rpc
.PathJoin(self
.output_dir
, fname
)
250 local_name
= os
.path
.join(common_lib
.GetOutputDir(),
251 '%s.%s' % (self
.name
, fname
))
252 contents
= self
.rpc
.ReadFile(remote_path
)
253 with
open(local_name
, 'wb+') as fh
: