[lit] Remove unnecessary tracking of test_index
[llvm-complete.git] / utils / lit / lit / run.py
blob29335bd59ad0ad908dbdf5447b61d8483508774f
1 import multiprocessing
2 import time
4 import lit.Test
5 import lit.util
6 import lit.worker
# No-operation semaphore for supporting `None` for parallelism_groups.
#   lit_config.parallelism_groups['my_group'] = None
class NopSemaphore(object):
    """Semaphore stand-in whose `acquire` and `release` do nothing."""

    def acquire(self):
        """Return immediately without blocking or counting."""
        return None

    def release(self):
        """Return immediately; there is nothing to release."""
        return None
def create_run(tests, lit_config, workers, progress_callback, timeout=None):
    """Build the run object matching the requested worker count.

    Returns a `SerialRun` when `workers` equals 1 and a `ParallelRun`
    (which additionally receives `workers`) for every other value.
    """
    # TODO(yln) assert workers > 0
    if workers != 1:
        return ParallelRun(tests, lit_config, progress_callback, timeout,
                           workers)
    return SerialRun(tests, lit_config, progress_callback, timeout)
class Run(object):
    """A concrete, configured testing run.

    Subclasses provide the scheduling strategy by implementing `_execute`.
    """

    def __init__(self, tests, lit_config, progress_callback, timeout):
        self.tests = tests
        self.lit_config = lit_config
        self.progress_callback = progress_callback
        self.timeout = timeout
        # Created here (and reset by execute()) so that both attributes exist
        # even when execute() returns early or has not been called yet.
        self.failure_count = 0
        self.hit_max_failures = False

    def execute(self):
        """
        Execute the tests in the run and inform the caller of each individual
        result.

        The progress_callback will be invoked for each completed test.

        If timeout is non-None, it should be a time in seconds after which to
        stop executing tests.

        Returns the elapsed testing time (0.0 when there are no tests).

        Upon completion, each test in the run will have its result
        computed. Tests which were not actually executed (for any reason,
        including an exception escaping from `_execute`) will be given an
        UNRESOLVED result.
        """
        if not self.tests:
            return 0.0

        self.failure_count = 0
        self.hit_max_failures = False

        # A falsy timeout (None or 0) means "effectively unlimited".
        one_year = 365 * 24 * 60 * 60  # days * hours * minutes * seconds
        timeout = self.timeout or one_year

        start = time.time()
        deadline = start + timeout
        try:
            self._execute(deadline)
            end = time.time()
        finally:
            # Mark any tests that weren't run as UNRESOLVED. Doing this in a
            # `finally` keeps the documented guarantee even when the run is
            # aborted by an exception (which still propagates afterwards).
            for test in self.tests:
                if test.result is None:
                    test.setResult(
                        lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))

        return end - start

    def _execute(self, deadline):
        """Run `self.tests`; `deadline` is an absolute `time.time()` value.

        Must be overridden by subclasses.
        """
        raise NotImplementedError(
            '%s must implement _execute()' % type(self).__name__)

    def _consume_test_result(self, test, result):
        """Test completion callback for lit.worker.run_one_test

        Stores `result` on `test` in the parent process, invokes the progress
        callback for that test, and keeps track of the failure count so that
        the run can stop once `lit_config.maxFailures` is reached.
        """
        # Don't add any more test results after we've hit the maximum failure
        # count.  Otherwise we're racing with the main thread, which is going
        # to terminate the process pool soon.
        if self.hit_max_failures:
            return

        # Update the parent process copy of the test. This includes the result,
        # XFAILS, REQUIRES, and UNSUPPORTED statuses.
        test.setResult(result)

        self.progress_callback(test)

        # If too many tests have failed, notify the main thread that we've
        # stopped testing. A falsy maxFailures disables the limit.
        if result.code == lit.Test.FAIL:
            self.failure_count += 1
        if self.lit_config.maxFailures and \
                self.failure_count == self.lit_config.maxFailures:
            self.hit_max_failures = True
class SerialRun(Run):
    """Run that executes the tests one after another in this process.

    The constructor is inherited unchanged from `Run`:
    `SerialRun(tests, lit_config, progress_callback, timeout)`.
    """

    def _execute(self, deadline):
        """Execute tests in order until done, out of time, or out of budget.

        `deadline` is an absolute `time.time()` value. It is checked before
        each test is started, so a test that is already running is never
        interrupted; tests that were not started are left without a result.
        """
        for test in self.tests:
            # Honor the timeout: do not start new tests past the deadline.
            if time.time() >= deadline:
                break
            result = lit.worker._execute_test(test, self.lit_config)
            self._consume_test_result(test, result)
            if self.hit_max_failures:
                break
class ParallelRun(Run):
    """Run that distributes the tests over a `multiprocessing.Pool`."""

    def __init__(self, tests, lit_config, progress_callback, timeout, workers):
        super(ParallelRun, self).__init__(tests, lit_config, progress_callback, timeout)
        self.workers = workers

    def _execute(self, deadline):
        """Run all tests in a worker pool until done, timed out, or stopped.

        `deadline` is an absolute `time.time()` value. When it passes, or when
        the maximum failure count is hit, the pool is terminated so that the
        remaining queued tests are not executed. Exceptions raised by a worker
        are re-raised here after the pool has been terminated.
        """
        # One semaphore per parallelism group; `None` means "no limit".
        semaphores = {
            k: NopSemaphore() if v is None else
            multiprocessing.BoundedSemaphore(v) for k, v in
            self.lit_config.parallelism_groups.items()}

        # Start a process pool. Copy over the data shared between all test runs.
        # FIXME: Find a way to capture the worker process stderr. If the user
        # interrupts the workers before we make it into our task callback, they
        # will each raise a KeyboardInterrupt exception and print to stderr at
        # the same time.
        pool = multiprocessing.Pool(self.workers, lit.worker.initializer,
                                    (self.lit_config, semaphores))

        # Install a console-control signal handler on Windows.
        if lit.util.win32api is not None:
            def console_ctrl_handler(ctrl_type):
                print('\nCtrl-C detected, terminating.')
                pool.terminate()
                pool.join()
                lit.util.abort_now()
                return True
            lit.util.win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)

        try:
            # `t=test` binds the current test at lambda creation time.
            async_results = [pool.apply_async(lit.worker.run_one_test,
                                              args=(test,),
                                              callback=lambda r, t=test: self._consume_test_result(t, r))
                             for test in self.tests]
            pool.close()

            # Wait for all results to come in. The callback that runs in the
            # parent process will update the display.
            for a in async_results:
                # Clamp at zero so an expired deadline is a non-blocking poll.
                timeout = max(0, deadline - time.time())
                a.wait(timeout)
                if not a.ready():
                    # Deadline reached. `successful()` must not be called on a
                    # result that is not ready, and the pool is already closed,
                    # so terminate it to keep join() from running the backlog.
                    pool.terminate()
                    break
                if not a.successful():
                    a.get()  # Exceptions raised here come from the worker.
                if self.hit_max_failures:
                    # Stop the remaining queued tests instead of letting
                    # join() wait for all of them to finish.
                    pool.terminate()
                    break
        except BaseException:
            # Stop the workers and wait for any straggling results to come in
            # if we exited without waiting on every async result.
            pool.terminate()
            raise
        finally:
            pool.join()