# cygprofile: increase timeouts to allow showing web contents
# [chromium-blink-merge.git] / build / android / pylib / base / test_collection.py
# blobde510272bd71918d5371e0d27f9bf44ceae83649
# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import collections
import threading


class TestCollection(object):
  """A threadsafe collection of tests.

  Tests are handed out in the order in which they were added. Consumers
  iterate over the collection and call test_completed() once for every test
  they receive; iteration only ends once every test that was added has been
  reported as completed.

  Args:
    tests: List of tests to put in the collection.
  """

  def __init__(self, tests=None):
    self._lock = threading.Lock()
    # A deque gives O(1) removal from the front (list.pop(0) is O(n)).
    self._tests = collections.deque()
    # Number of tests added but not yet reported through test_completed().
    self._tests_in_progress = 0
    # Used to signal that an item is available or all items have been handled.
    self._item_available_or_all_done = threading.Event()
    for t in tests or []:
      self.add(t)

  def _pop(self):
    """Pop a test from the collection.

    Waits until a test is available or all tests have been handled. The
    underlying event is only set by add() and test_completed(), so on a
    collection to which nothing has been added yet this blocks until a test
    is added.

    Returns:
      A test or None if all tests have been handled.
    """
    while True:
      # Wait for a test to be available or all tests to have been handled.
      self._item_available_or_all_done.wait()
      with self._lock:
        # Check which of the two conditions triggered the signal.
        if self._tests_in_progress == 0:
          return None
        try:
          return self._tests.popleft()
        except IndexError:
          # Another thread beat us to the available test, wait again. The
          # event is cleared under the lock so a concurrent add() cannot have
          # its set() wiped out.
          self._item_available_or_all_done.clear()

  def add(self, test):
    """Add a test to the collection.

    Args:
      test: A test to add.
    """
    with self._lock:
      self._tests.append(test)
      self._item_available_or_all_done.set()
      self._tests_in_progress += 1

  def test_completed(self):
    """Indicate that a test has been fully handled."""
    with self._lock:
      self._tests_in_progress -= 1
      if self._tests_in_progress == 0:
        # All tests have been handled, signal all waiting threads.
        self._item_available_or_all_done.set()

  def __iter__(self):
    """Iterate through tests in the collection until all have been handled.

    Yields:
      The next available test. Iteration stops when _pop() returns None, so
      a test that is itself None would also end the iteration.
    """
    while True:
      r = self._pop()
      if r is None:
        break
      yield r

  def __len__(self):
    """Return the number of tests currently in the collection."""
    with self._lock:
      return len(self._tests)

  def test_names(self):
    """Return a list of the names of the tests currently in the collection."""
    with self._lock:
      return [t.test for t in self._tests]