Don't show supervised user as "already on this device" while they're being imported.
[chromium-blink-merge.git] / build / android / pylib / base / test_dispatcher.py
blobf91996512865bf2db7393382cb7f25d2510edf57
1 # Copyright 2013 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
5 """Dispatches tests, either sharding or replicating them.
7 Performs the following steps:
8 * Create a test collection factory, using the given tests
9 - If sharding: test collection factory returns the same shared test collection
10 to all test runners
11 - If replicating: test collection factory returns a unique test collection to
12 each test runner, with the same set of tests in each.
13 * Create a test runner for each device.
14 * Run each test runner in its own thread, grabbing tests from the test
15 collection until there are no tests left.
16 """
18 # TODO(jbudorick) Deprecate and remove this class after any relevant parts have
19 # been ported to the new environment / test instance model.
21 import logging
22 import threading
24 from pylib import constants
25 from pylib.base import base_test_result
26 from pylib.base import test_collection
27 from pylib.device import device_errors
28 from pylib.utils import reraiser_thread
29 from pylib.utils import watchdog_timer
# Default watchdog timeout, in seconds (seven minutes).
DEFAULT_TIMEOUT = 60 * 7
35 class _ThreadSafeCounter(object):
36 """A threadsafe counter."""
38 def __init__(self):
39 self._lock = threading.Lock()
40 self._value = 0
42 def GetAndIncrement(self):
43 """Get the current value and increment it atomically.
45 Returns:
46 The value before incrementing.
47 """
48 with self._lock:
49 pre_increment = self._value
50 self._value += 1
51 return pre_increment
54 class _Test(object):
55 """Holds a test with additional metadata."""
57 def __init__(self, test, tries=0):
58 """Initializes the _Test object.
60 Args:
61 test: The test.
62 tries: Number of tries so far.
63 """
64 self.test = test
65 self.tries = tries
def _RunTestsFromQueue(runner, collection, out_results, watcher,
                       num_retries, tag_results_with_device=False):
  """Drains the collection, running each test it yields on the given runner.

  A TestRunResults object is appended to out_results for every test that is
  run. While a test is still eligible for a retry only its passing results are
  recorded and the retry is put back into the collection; otherwise the full
  results are recorded.

  Args:
    runner: A TestRunner object used to run the tests.
    collection: A TestCollection from which to get _Test objects to run.
    out_results: A list to add TestRunResults to.
    watcher: A watchdog_timer.WatchdogTimer object, used as a shared timeout.
    num_retries: Number of retries for a test.
    tag_results_with_device: If True, prefixes each result name with the last
      four characters of the serial of the device the test ran on. Used when
      replicating so that each device's copy of a test is recorded separately.

  Raises:
    device_errors.DeviceUnreachableError: If the runner's device is not online
      when a test is about to be run.
  """

  def TagTestRunResults(test_run_results):
    """Returns new results whose names carry the device serial's last 4 chars.

    Used when replicating tests to distinguish the same tests run on different
    devices. We use a set to store test results, so the hash (generated from
    name and tag) must be unique to be considered different results.
    """
    tagged = base_test_result.TestRunResults()
    for single_result in test_run_results.GetAll():
      tagged_name = '%s_%s' % (runner.device_serial[-4:],
                               single_result.GetName())
      single_result.SetName(tagged_name)
      tagged.AddResult(single_result)
    return tagged

  for queued in collection:
    # Each test taken from the collection resets the shared watchdog.
    watcher.Reset()
    try:
      if not runner.device.IsOnline():
        # Device is unresponsive, stop handling tests on this device.
        msg = 'Device %s is unresponsive.' % runner.device_serial
        logging.warning(msg)
        raise device_errors.DeviceUnreachableError(msg)

      result, retry = runner.RunTest(queued.test)
      if tag_results_with_device:
        result = TagTestRunResults(result)
      queued.tries += 1

      if not (retry and queued.tries <= num_retries):
        # All tests passed or retry limit reached. Either way, record results.
        out_results.append(result)
      else:
        # Retry non-passing results, only record passing results.
        passed_only = base_test_result.TestRunResults()
        passed_only.AddResults(result.GetPass())
        out_results.append(passed_only)
        logging.warning('Will retry test %s, try #%s.', retry, queued.tries)
        collection.add(_Test(test=retry, tries=queued.tries))
    except:
      # Deliberately bare: whatever went wrong, put the test back so another
      # device can pick it up, then let the exception propagate.
      collection.add(queued)
      raise
    finally:
      # Retries count as separate tasks so always mark the popped test as done.
      collection.test_completed()
def _SetUp(runner_factory, device, out_runners, threadsafe_counter):
  """Creates the test runner for one device and calls SetUp() on it.

  Note: if the device is unresponsive (a DeviceUnreachableError is raised while
  creating or setting up the runner), the failure is logged and no TestRunner
  is added to out_runners.

  Args:
    runner_factory: Callable that takes a device and index and returns a
      TestRunner object.
    device: The device serial number to set up.
    out_runners: List to add the successfully set up TestRunner object.
    threadsafe_counter: A _ThreadSafeCounter object used to get shard indices.
  """
  try:
    shard_index = threadsafe_counter.GetAndIncrement()
    logging.warning('Creating shard %s for device %s.', shard_index, device)
    new_runner = runner_factory(device, shard_index)
    new_runner.SetUp()
  except device_errors.DeviceUnreachableError as e:
    logging.warning('Failed to create shard for %s: [%s]', device, e)
  else:
    out_runners.append(new_runner)
def _RunAllTests(runners, test_collection_factory, num_retries, timeout=None,
                 tag_results_with_device=False):
  """Runs every test, using one worker thread per TestRunner.

  Args:
    runners: A list of TestRunner objects.
    test_collection_factory: A callable to generate a TestCollection object for
      each test runner.
    num_retries: Number of retries for a test.
    timeout: Watchdog timeout in seconds.
    tag_results_with_device: If True, the device a test ran on is encoded in
      the test name. Used when replicating to identify which device ran each
      copy of the test, and to ensure each copy of the test is recorded
      separately.

  Returns:
    A tuple of (TestRunResults object, exit code). The exit code is 0 when the
    combined results pass and constants.ERROR_EXIT_CODE otherwise.
  """
  logging.warning('Running tests with %s test runners.' % (len(runners)))
  collected = []
  run_results = base_test_result.TestRunResults()
  watcher = watchdog_timer.WatchdogTimer(timeout)
  test_collections = [test_collection_factory() for _ in runners]

  # Pair every runner with its collection; all workers append to `collected`.
  threads = []
  for runner, tc in zip(runners, test_collections):
    worker_args = [runner, tc, collected, watcher, num_retries,
                   tag_results_with_device]
    threads.append(reraiser_thread.ReraiserThread(
        _RunTestsFromQueue, worker_args, name=runner.device_serial[-4:]))

  workers = reraiser_thread.ReraiserThreadGroup(threads)
  workers.StartAll()

  # A DeviceUnreachableError surfacing from the workers is only logged here;
  # the exit code is decided further down from the accumulated results.
  try:
    workers.JoinAll(watcher)
  except device_errors.DeviceUnreachableError as e:
    logging.error(e)

  if any(len(tc) != 0 for tc in test_collections):
    logging.error('Only ran %d tests (all devices are likely offline).' %
                  len(collected))
    # Whatever is still sitting in a collection is reported as UNKNOWN.
    for tc in test_collections:
      run_results.AddResults(base_test_result.BaseTestResult(
          t, base_test_result.ResultType.UNKNOWN) for t in tc.test_names())

  for partial in collected:
    run_results.AddTestRunResults(partial)

  if run_results.DidRunPass():
    return (run_results, 0)
  return (run_results, constants.ERROR_EXIT_CODE)
def _CreateRunners(runner_factory, devices, timeout=None):
  """Builds one test runner per device, running _SetUp for each in parallel.

  Note: if a device is unresponsive the corresponding TestRunner will not be
  included in the returned list.

  Args:
    runner_factory: Callable that takes a device and index and returns a
      TestRunner object.
    devices: List of device serial numbers as strings.
    timeout: Watchdog timeout in seconds; passed to
      watchdog_timer.WatchdogTimer as-is (None when not given).

  Returns:
    A list of TestRunner objects.
  """
  logging.warning('Creating %s test runners.' % len(devices))
  runners = []
  counter = _ThreadSafeCounter()

  # One setup thread per device, named after the tail of the device serial.
  setup_threads = []
  for d in devices:
    setup_threads.append(reraiser_thread.ReraiserThread(
        _SetUp, [runner_factory, d, runners, counter], name=str(d)[-4:]))

  group = reraiser_thread.ReraiserThreadGroup(setup_threads)
  group.StartAll()
  group.JoinAll(watchdog_timer.WatchdogTimer(timeout))
  return runners
def _TearDownRunners(runners, timeout=None):
  """Calls TearDown() on every test runner, one thread per runner.

  Args:
    runners: A list of TestRunner objects.
    timeout: Watchdog timeout in seconds; passed to
      watchdog_timer.WatchdogTimer as-is (None when not given).
  """
  teardown_threads = []
  for runner in runners:
    teardown_threads.append(reraiser_thread.ReraiserThread(
        runner.TearDown, name=runner.device_serial[-4:]))

  group = reraiser_thread.ReraiserThreadGroup(teardown_threads)
  group.StartAll()
  group.JoinAll(watchdog_timer.WatchdogTimer(timeout))
def ApplyMaxPerRun(tests, max_per_run):
  """Rearrange the tests so that no group contains more than max_per_run tests.

  A string entry is treated as a group of test names joined by ':' and is
  split into consecutive groups of at most max_per_run names each, preserving
  order. Entries that are not strings are passed through unchanged.

  Args:
    tests: Iterable of tests. Each entry is either a ':'-separated string of
      test names or an arbitrary test object.
    max_per_run: Maximum number of tests allowed in a single string group.
      Must be a positive integer whenever a string group has to be split.

  Returns:
    A list of tests with no more than max_per_run per run.

  Raises:
    ValueError: If a string group is encountered and max_per_run is less
      than 1.
  """
  tests_expanded = []
  for test_group in tests:
    if not isinstance(test_group, str):
      # Do not split test objects which are not strings.
      tests_expanded.append(test_group)
      continue
    if max_per_run < 1:
      # A zero step already made range() raise ValueError; a negative step
      # produced an empty range and silently dropped the whole group.
      raise ValueError(
          'max_per_run must be a positive integer, got %r' % (max_per_run,))
    names = test_group.split(':')
    tests_expanded.extend(
        ':'.join(names[start:start + max_per_run])
        for start in range(0, len(names), max_per_run))
  return tests_expanded
def RunTests(tests, runner_factory, devices, shard=True,
             test_timeout=DEFAULT_TIMEOUT, setup_timeout=DEFAULT_TIMEOUT,
             num_retries=2, max_per_run=256):
  """Runs all tests on the attached devices, retrying tests that don't pass.

  Args:
    tests: List of tests to run.
    runner_factory: Callable that takes a device and index and returns a
      TestRunner object.
    devices: List of attached devices.
    shard: True if we should shard, False if we should replicate tests.
      - Sharding tests will distribute tests across all test runners through a
        shared test collection.
      - Replicating tests will copy all tests to each test runner through a
        unique test collection for each test runner.
    test_timeout: Watchdog timeout in seconds for running tests.
    setup_timeout: Watchdog timeout in seconds for creating and cleaning up
      test runners.
    num_retries: Number of retries for a test.
    max_per_run: Maximum number of tests to run in any group.

  Returns:
    A tuple of (base_test_result.TestRunResults object, exit code). When there
    are no tests, the results are empty and the exit code is
    constants.ERROR_EXIT_CODE.
  """
  if not tests:
    logging.critical('No tests to run.')
    return (base_test_result.TestRunResults(), constants.ERROR_EXIT_CODE)

  tests_expanded = ApplyMaxPerRun(tests, max_per_run)

  def _NewCollection():
    # Builds a fresh TestCollection wrapping every expanded test in a _Test.
    return test_collection.TestCollection([_Test(t) for t in tests_expanded])

  if shard:
    # Every runner receives the same TestCollection object, so they all draw
    # from a common pool of tests.
    shared_test_collection = _NewCollection()
    test_collection_factory = lambda: shared_test_collection
    log_string = 'sharded across devices'
  else:
    # Every runner receives its own TestCollection holding the same tests.
    test_collection_factory = _NewCollection
    log_string = 'replicated on each device'
  # Results only need a device tag when each device runs its own copy.
  tag_results_with_device = not shard

  logging.info('Will run %d tests (%s): %s',
               len(tests_expanded), log_string, str(tests_expanded))
  runners = _CreateRunners(runner_factory, devices, setup_timeout)
  try:
    return _RunAllTests(runners, test_collection_factory,
                        num_retries, test_timeout, tag_results_with_device)
  finally:
    # TearDown failures are logged and swallowed so they don't replace the
    # result (or exception) of the test run itself.
    try:
      _TearDownRunners(runners, setup_timeout)
    except device_errors.DeviceUnreachableError as e:
      logging.warning('Device unresponsive during TearDown: [%s]', e)
    except Exception as e:
      logging.error('Unexpected exception caught during TearDown: %s' % str(e))