# MIT License
#
# Copyright The SCons Foundation
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
# KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24 """Serial and Parallel classes to execute build tasks.
26 The Jobs class provides a higher level interface to start,
27 stop, and wait on jobs.
28 """

import SCons.compat

import logging
import os
import queue
import signal
import sys
import threading

from enum import Enum

import SCons.Errors
import SCons.Warnings

# The default stack size (in kilobytes) of the threads used to execute
# jobs in parallel.
#
# We use a stack size of 256 kilobytes. The default on some platforms
# is too large and prevents us from creating enough threads to fully
# parallelize the build. For example, the default stack size on Linux
# is 8 MBytes.

explicit_stack_size = None
default_stack_size = 256
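
# A minimal sketch of how these module-level knobs are consumed (assumption:
# the SCons startup code assigns `explicit_stack_size` from the --stack-size
# command-line option before any Jobs object is constructed):
#
#     import SCons.Taskmaster.Job as Job
#     Job.explicit_stack_size = 128   # kilobytes; None means use default_stack_size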

interrupt_msg = 'Build interrupted.'


class InterruptState:
    def __init__(self) -> None:
        self.interrupted = False

    def set(self) -> None:
        self.interrupted = True

    def __call__(self):
        return self.interrupted


class Jobs:
    """An instance of this class initializes N jobs, and provides
    methods for starting, stopping, and waiting on all N jobs.
    """

    def __init__(self, num, taskmaster) -> None:
        """
        Create 'num' jobs using the given taskmaster. The exact implementation
        used varies with the number of jobs requested and whether the
        `legacy_sched` value was passed to the `--experimental` option.
        """
        # Importing GetOption here instead of at top of file to avoid
        # circular imports
        # pylint: disable=import-outside-toplevel
        from SCons.Script import GetOption

        stack_size = explicit_stack_size
        if stack_size is None:
            stack_size = default_stack_size

        experimental_option = GetOption('experimental') or []
        if 'legacy_sched' in experimental_option:
            if num > 1:
                self.job = LegacyParallel(taskmaster, num, stack_size)
            else:
                self.job = Serial(taskmaster)
        else:
            self.job = NewParallel(taskmaster, num, stack_size)

        self.num_jobs = num

    def run(self, postfunc=lambda: None) -> None:
        """Run the jobs.

        postfunc() will be invoked after the jobs have run. It will be
        invoked even if the jobs are interrupted by a keyboard
        interrupt (well, in fact by a signal such as SIGINT,
        SIGTERM or SIGHUP). The execution of postfunc() is protected
        against keyboard interrupts and is guaranteed to run to
        completion."""
        self._setup_sig_handler()
        try:
            self.job.start()
        finally:
            postfunc()
            self._reset_sig_handler()

    def were_interrupted(self):
        """Returns whether the jobs were interrupted by a signal."""
        return self.job.interrupted()

    def _setup_sig_handler(self) -> None:
        """Setup an interrupt handler so that SCons can shut down cleanly in
        various conditions:

          a) SIGINT: Keyboard interrupt
          b) SIGTERM: kill or system shutdown
          c) SIGHUP: Controlling shell exiting

        We handle all of these cases by stopping the taskmaster. It
        turns out that it's very difficult to stop the build process
        by throwing asynchronously an exception such as
        KeyboardInterrupt. For example, the python Condition
        variables (threading.Condition) and queues do not seem to be
        asynchronous-exception-safe. It would require adding a whole
        bunch of try/finally blocks and except KeyboardInterrupt all
        over the place.

        Note also that we have to be careful to handle the case when
        SCons forks before executing another process. In that case, we
        want the child to exit immediately.
        """
        def handler(signum, stack, self=self, parentpid=os.getpid()) -> None:
            if os.getpid() == parentpid:
                self.job.taskmaster.stop()
                self.job.interrupted.set()
            else:
                os._exit(2)  # pylint: disable=protected-access

        self.old_sigint = signal.signal(signal.SIGINT, handler)
        self.old_sigterm = signal.signal(signal.SIGTERM, handler)
        try:
            self.old_sighup = signal.signal(signal.SIGHUP, handler)
        except AttributeError:
            pass
        if (self.old_sigint is None) or (self.old_sigterm is None) or \
                (hasattr(self, "old_sighup") and self.old_sighup is None):
            msg = "Overwriting previous signal handler which was not installed from Python. " + \
                "Will not be able to reinstate and so will return to default handler."
            SCons.Warnings.warn(SCons.Warnings.SConsWarning, msg)

    def _reset_sig_handler(self) -> None:
        """Restore the signal handlers to their previous state (before the
        call to _setup_sig_handler())."""
        sigint_to_use = self.old_sigint if self.old_sigint is not None else signal.SIG_DFL
        sigterm_to_use = self.old_sigterm if self.old_sigterm is not None else signal.SIG_DFL
        signal.signal(signal.SIGINT, sigint_to_use)
        signal.signal(signal.SIGTERM, sigterm_to_use)
        try:
            sighup_to_use = self.old_sighup if self.old_sighup is not None else signal.SIG_DFL
            signal.signal(signal.SIGHUP, sighup_to_use)
        except AttributeError:
            pass


class Serial:
    """This class is used to execute tasks in series, and is more efficient
    than Parallel, but is only appropriate for non-parallel builds. Only
    one instance of this class should be in existence at a time.

    This class is not thread safe.
    """

    def __init__(self, taskmaster) -> None:
        """Create a new serial job given a taskmaster.

        The taskmaster's next_task() method should return the next task
        that needs to be executed, or None if there are no more tasks. The
        taskmaster's executed() method will be called for each task when it
        is successfully executed, or failed() will be called if it failed to
        execute (e.g. execute() raised an exception)."""

        self.taskmaster = taskmaster
        self.interrupted = InterruptState()

    def start(self):
        """Start the job. This will begin pulling tasks from the taskmaster
        and executing them, and return when there are no more tasks. If a task
        fails to execute (i.e. execute() raises an exception), then the job will
        stop."""

        while True:
            task = self.taskmaster.next_task()

            if task is None:
                break

            try:
                task.prepare()
                if task.needs_execute():
                    task.execute()
            except Exception:
                if self.interrupted():
                    try:
                        raise SCons.Errors.BuildError(
                            task.targets[0], errstr=interrupt_msg)
                    except Exception:
                        task.exception_set()
                else:
                    task.exception_set()

                # Let the failed() callback function arrange for the
                # build to stop if that's appropriate.
                task.failed()
            else:
                task.executed()

            task.postprocess()
        self.taskmaster.cleanup()


class Worker(threading.Thread):
    """A worker thread waits on a task to be posted to its request queue,
    dequeues the task, executes it, and posts a tuple including the task
    and a boolean indicating whether the task executed successfully."""

    def __init__(self, requestQueue, resultsQueue, interrupted) -> None:
        super().__init__()
        self.daemon = True
        self.requestQueue = requestQueue
        self.resultsQueue = resultsQueue
        self.interrupted = interrupted
        self.start()

    def run(self):
        while True:
            task = self.requestQueue.get()

            if task is None:
                # The "None" value is used as a sentinel by
                # ThreadPool.cleanup(). This indicates that there
                # are no more tasks, so we should quit.
                break

            try:
                if self.interrupted():
                    raise SCons.Errors.BuildError(
                        task.targets[0], errstr=interrupt_msg)
                task.execute()
            except Exception:
                task.exception_set()
                ok = False
            else:
                ok = True

            self.resultsQueue.put((task, ok))


class ThreadPool:
    """This class is responsible for spawning and managing worker threads."""

    def __init__(self, num, stack_size, interrupted) -> None:
        """Create the request and reply queues, and 'num' worker threads.

        One must specify the stack size of the worker threads. The
        stack size is specified in kilobytes.
        """
        self.requestQueue = queue.Queue(0)
        self.resultsQueue = queue.Queue(0)

        try:
            prev_size = threading.stack_size(stack_size * 1024)
        except RuntimeError as e:
            # Only print a warning if the stack size has been explicitly set.
            if explicit_stack_size is not None:
                msg = "Setting stack size is unsupported by this version of Python:\n    " + \
                    e.args[0]
                SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
        except ValueError as e:
            msg = "Setting stack size failed:\n    " + str(e)
            SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)

        # Create worker threads
        self.workers = []
        for _ in range(num):
            worker = Worker(self.requestQueue, self.resultsQueue, interrupted)
            self.workers.append(worker)

        if 'prev_size' in locals():
            threading.stack_size(prev_size)

    def put(self, task) -> None:
        """Put task into request queue."""
        self.requestQueue.put(task)

    def get(self):
        """Remove and return a result tuple from the results queue."""
        return self.resultsQueue.get()

    def preparation_failed(self, task) -> None:
        self.resultsQueue.put((task, False))

    def cleanup(self) -> None:
        """
        Shuts down the thread pool, giving each worker thread a
        chance to shut down gracefully.
        """
        # For each worker thread, put a sentinel "None" value
        # on the requestQueue (indicating that there's no work
        # to be done) so that each worker thread will get one and
        # terminate gracefully.
        for _ in self.workers:
            self.requestQueue.put(None)

        # Wait for all of the workers to terminate.
        #
        # If we don't do this, later Python versions (2.4, 2.5) often
        # seem to raise exceptions during shutdown. This happens
        # in requestQueue.get(), as an assertion failure that
        # requestQueue.not_full is notified while not acquired,
        # seemingly because the main thread has shut down (or is
        # in the process of doing so) while the workers are still
        # trying to pull sentinels off the requestQueue.
        #
        # Normally these terminations should happen fairly quickly,
        # but we'll stick a one-second timeout on here just in case
        # someone gets hung.
        for worker in self.workers:
            worker.join(1.0)
        self.workers = []


class LegacyParallel:
    """This class is used to execute tasks in parallel, and is somewhat
    less efficient than Serial, but is appropriate for parallel builds.

    This class is thread safe.
    """

    def __init__(self, taskmaster, num, stack_size) -> None:
        """Create a new parallel job given a taskmaster.

        The taskmaster's next_task() method should return the next
        task that needs to be executed, or None if there are no more
        tasks. The taskmaster's executed() method will be called
        for each task when it is successfully executed, or failed()
        will be called if the task failed to execute (i.e. execute()
        raised an exception).

        Note: calls to taskmaster are serialized, but calls to
        execute() on distinct tasks are not serialized, because
        that is the whole point of parallel jobs: they can execute
        multiple tasks simultaneously. """

        self.taskmaster = taskmaster
        self.interrupted = InterruptState()
        self.tp = ThreadPool(num, stack_size, self.interrupted)

        self.maxjobs = num

    def start(self):
        """Start the job. This will begin pulling tasks from the
        taskmaster and executing them, and return when there are no
        more tasks. If a task fails to execute (i.e. execute() raises
        an exception), then the job will stop."""

        jobs = 0

        while True:
            # Start up as many available tasks as we're
            # allowed to.
            while jobs < self.maxjobs:
                task = self.taskmaster.next_task()
                if task is None:
                    break

                try:
                    # prepare task for execution
                    task.prepare()
                except Exception:
                    task.exception_set()
                    task.failed()
                    task.postprocess()
                else:
                    if task.needs_execute():
                        # dispatch task
                        self.tp.put(task)
                        jobs += 1
                    else:
                        task.executed()
                        task.postprocess()

            if not task and not jobs:
                break

            # Let any/all completed tasks finish up before we go
            # back and put the next batch of tasks on the queue.
            while True:
                task, ok = self.tp.get()
                jobs -= 1

                if ok:
                    task.executed()
                else:
                    if self.interrupted():
                        try:
                            raise SCons.Errors.BuildError(
                                task.targets[0], errstr=interrupt_msg)
                        except Exception:
                            task.exception_set()

                    # Let the failed() callback function arrange
                    # for the build to stop if that's appropriate.
                    task.failed()

                task.postprocess()

                if self.tp.resultsQueue.empty():
                    break

        self.tp.cleanup()
        self.taskmaster.cleanup()


# An experimental new parallel scheduler that uses a leaders/followers pattern.
class NewParallel:

    class State(Enum):
        READY = 0
        SEARCHING = 1
        STALLED = 2
        COMPLETED = 3
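
    # Descriptive summary of how the State values are used by _work() below:
    #   READY     -> SEARCHING  a worker acquires `can_search_cv` and starts
    #                           turning the taskmaster crank
    #   SEARCHING -> READY      an executable task was found; the searcher will
    #                           run it outside the lock while another thread searches
    #   SEARCHING -> STALLED    no task available, but jobs are still executing
    #                           and their results may unblock new work
    #   STALLED   -> READY      a thread returns with a completed task, so the
    #                           results queue should be drained again
    #   SEARCHING -> COMPLETED  no task available and no jobs outstanding; the
    #                           taskmaster walk is finished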

    class Worker(threading.Thread):
        def __init__(self, owner) -> None:
            super().__init__()
            self.daemon = True
            self.owner = owner
            self.start()

        def run(self) -> None:
            self.owner._work()

    class FakeLock(object):
        def lock(self):
            pass
        def unlock(self):
            pass
        def __enter__(self):
            pass
        def __exit__(self, *args):
            pass

    class FakeCondition(object):
        def __init__(self, lock):
            pass
        def wait(self):
            fatal()  # should never be reached: a lone worker never waits
        def notify(self):
            pass
        def notify_all(self):
            pass
        def __enter__(self):
            pass
        def __exit__(self, *args):
            pass
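
    # The Fake* classes above are substituted for real locks and condition
    # variables when max_workers == 1: a single worker has no concurrent access
    # to protect, so locking overhead is skipped, and FakeCondition.wait() is
    # deliberately an error because a lone thread that waits could never be woken.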

    def __init__(self, taskmaster, num, stack_size) -> None:
        self.taskmaster = taskmaster
        self.max_workers = num
        self.stack_size = stack_size
        self.interrupted = InterruptState()
        self.workers = []

        # The `tm_lock` is what ensures that we only have one
        # thread interacting with the taskmaster at a time. It
        # also protects access to our state that gets updated
        # concurrently. The `can_search_cv` is associated with
        # this mutex.
        self.tm_lock = (threading.Lock if self.max_workers > 1 else NewParallel.FakeLock)()

        # Guarded under `tm_lock`.
        self.jobs = 0
        self.state = NewParallel.State.READY

        # The `can_search_cv` is used to manage a leader /
        # follower pattern for access to the taskmaster, and to
        # awaken from stalls.
        self.can_search_cv = (threading.Condition if self.max_workers > 1 else NewParallel.FakeCondition)(self.tm_lock)

        # The queue of tasks that have completed execution. The
        # next thread to obtain `tm_lock` will retire them.
        self.results_queue_lock = (threading.Lock if self.max_workers > 1 else NewParallel.FakeLock)()
        self.results_queue = []

        if self.taskmaster.trace:
            self.trace = self._setup_logging()
        else:
            self.trace = False

    def _setup_logging(self):
        jl = logging.getLogger("Job")
        jl.setLevel(level=logging.DEBUG)
        jl.addHandler(self.taskmaster.trace.log_handler)
        return jl

    def trace_message(self, message) -> None:
        # This grabs the name of the function which calls trace_message()
        method_name = sys._getframe(1).f_code.co_name + "():"
        thread_id = threading.get_ident()
        self.trace.debug('%s.%s [Thread:%s] %s' % (type(self).__name__, method_name, thread_id, message))
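
    # Example of a resulting trace line (message portion only; the thread id
    # will vary and the logging handler may prepend its own formatting):
    #   NewParallel._work(): [Thread:139673231054592] Starting search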

    def start(self) -> None:
        if self.max_workers == 1:
            self._work()
        else:
            self._start_worker()
            while len(self.workers) > 0:
                self.workers[0].join()
                self.workers.pop(0)
        self.taskmaster.cleanup()

    def _maybe_start_worker(self) -> None:
        if self.max_workers > 1 and len(self.workers) < self.max_workers:
            if self.jobs >= len(self.workers):
                self._start_worker()

    def _start_worker(self) -> None:
        prev_size = self._adjust_stack_size()
        if self.trace:
            self.trace_message("Starting new worker thread")
        self.workers.append(NewParallel.Worker(self))
        self._restore_stack_size(prev_size)

    def _adjust_stack_size(self):
        try:
            prev_size = threading.stack_size(self.stack_size * 1024)
            return prev_size
        except AttributeError as e:
            # Only print a warning if the stack size has been
            # explicitly set.
            if explicit_stack_size is not None:
                msg = "Setting stack size is unsupported by this version of Python:\n    " + \
                    e.args[0]
                SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
        except ValueError as e:
            msg = "Setting stack size failed:\n    " + str(e)
            SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)

        return None

    def _restore_stack_size(self, prev_size) -> None:
        if prev_size is not None:
            threading.stack_size(prev_size)

    def _work(self):

        task = None

        while True:

            # Obtain `tm_lock`, granting exclusive access to the taskmaster.
            with self.can_search_cv:

                if self.trace:
                    self.trace_message("Gained exclusive access")

                # Capture whether we got here with `task` set,
                # then drop our reference to the task as we are no
                # longer interested in the actual object.
                completed_task = (task is not None)
                task = None

                # We will only have `completed_task` set here if
                # we have looped back after executing a task. If
                # we have completed a task and find that we are
                # stalled, we should speculatively indicate that
                # we are no longer stalled by transitioning to the
                # 'ready' state which will bypass the condition
                # wait so that we immediately process the results
                # queue and hopefully light up new
                # work. Otherwise, stay stalled, and we will wait
                # in the condvar. Some other thread will come back
                # here with a completed task.
                if self.state == NewParallel.State.STALLED and completed_task:
                    if self.trace:
                        self.trace_message("Detected stall with completed task, bypassing wait")
                    self.state = NewParallel.State.READY

                # Wait until we are neither searching nor stalled.
                while self.state == NewParallel.State.SEARCHING or self.state == NewParallel.State.STALLED:
                    if self.trace:
                        self.trace_message("Search already in progress, waiting")
                    self.can_search_cv.wait()

                # If someone set the completed flag, bail.
                if self.state == NewParallel.State.COMPLETED:
                    if self.trace:
                        self.trace_message("Completion detected, breaking from main loop")
                    break

                # Set the searching flag to indicate that a thread
                # is currently in the critical section for
                # taskmaster work.
                if self.trace:
                    self.trace_message("Starting search")
                self.state = NewParallel.State.SEARCHING

                # Bulk acquire the tasks in the results queue
                # under the result queue lock, then process them
                # all outside that lock. We need to process the
                # tasks in the results queue before looking for
                # new work because we might be unable to find new
                # work if we don't.
                results_queue = []
                with self.results_queue_lock:
                    results_queue, self.results_queue = self.results_queue, results_queue

                if self.trace:
                    self.trace_message(f"Found {len(results_queue)} completed tasks to process")
                for (rtask, rresult) in results_queue:
                    if rresult:
                        rtask.executed()
                    else:
                        if self.interrupted():
                            try:
                                raise SCons.Errors.BuildError(
                                    rtask.targets[0], errstr=interrupt_msg)
                            except Exception:
                                rtask.exception_set()

                        # Let the failed() callback function arrange
                        # for the build to stop if that's appropriate.
                        rtask.failed()

                    rtask.postprocess()
                    self.jobs -= 1

                # We are done with any task objects that were in
                # the results queue.
                results_queue.clear()

                # Now, turn the crank on the taskmaster until we
                # either run out of tasks, or find a task that
                # needs execution. If we run out of tasks, go idle
                # until results arrive if jobs are pending, or
                # mark the walk as complete if not.
                while self.state == NewParallel.State.SEARCHING:
                    if self.trace:
                        self.trace_message("Searching for new tasks")
                    task = self.taskmaster.next_task()

                    if task:
                        # We found a task. Walk it through the
                        # task lifecycle. If it does not need
                        # execution, just complete the task and
                        # look for the next one. Otherwise,
                        # indicate that we are no longer searching
                        # so we can drop out of this loop, execute
                        # the task outside the lock, and allow
                        # another thread in to search.
                        try:
                            task.prepare()
                        except Exception:
                            task.exception_set()
                            task.failed()
                            task.postprocess()
                        else:
                            if not task.needs_execute():
                                if self.trace:
                                    self.trace_message("Found internal task")
                                task.executed()
                                task.postprocess()
                            else:
                                self.jobs += 1
                                if self.trace:
                                    self.trace_message("Found task requiring execution")
                                self.state = NewParallel.State.READY
                                self.can_search_cv.notify()
                                # This thread will be busy taking care of
                                # `execute`ing this task. If we haven't
                                # reached the limit, spawn a new thread to
                                # turn the crank and find the next task.
                                self._maybe_start_worker()

                    else:
                        # We failed to find a task, so this thread
                        # cannot continue turning the taskmaster
                        # crank. We must exit the loop.
                        if self.jobs:
                            # No task was found, but there are
                            # outstanding jobs executing that
                            # might unblock new tasks when they
                            # complete. Transition to the stalled
                            # state. We do not need a notify,
                            # because we know there are threads
                            # outstanding that will re-enter the
                            # loop.
                            if self.trace:
                                self.trace_message("Found no task requiring execution, but have jobs: marking stalled")
                            self.state = NewParallel.State.STALLED
                        else:
                            # We didn't find a task and there are
                            # no jobs outstanding, so there is
                            # nothing that will ever return
                            # results which might unblock new
                            # tasks. We can conclude that the walk
                            # is complete. Update our state to
                            # note completion and awaken anyone
                            # sleeping on the condvar.
                            if self.trace:
                                self.trace_message("Found no task requiring execution, and have no jobs: marking complete")
                            self.state = NewParallel.State.COMPLETED
                            self.can_search_cv.notify_all()

            # We no longer hold `tm_lock` here. If we have a task,
            # we can now execute it. If there are threads waiting
            # to search, one of them can now begin turning the
            # taskmaster crank in NewParallel.
            if task:
                if self.trace:
                    self.trace_message("Executing task")
                ok = True
                try:
                    if self.interrupted():
                        raise SCons.Errors.BuildError(
                            task.targets[0], errstr=interrupt_msg)
                    task.execute()
                except Exception:
                    ok = False
                    task.exception_set()

                # Grab the results queue lock and enqueue the
                # executed task and state. The next thread into
                # the searching loop will complete the
                # postprocessing work under the taskmaster lock.
                if self.trace:
                    self.trace_message("Enqueueing executed task results")
                with self.results_queue_lock:
                    self.results_queue.append((task, ok))

            # Tricky state "fallthrough" here. We are going back
            # to the top of the loop, which behaves differently
            # depending on whether `task` is set. Do not perturb
            # the value of the `task` variable if you add new code
            # after this comment.

# Local Variables:
# tab-width:4
# indent-tabs-mode:nil
# End:
# vim: set expandtab tabstop=4 shiftwidth=4: