# Copyright The SCons Foundation
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY
# KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Serial and Parallel classes to execute build tasks.

The Jobs class provides a higher level interface to start,
stop, and wait on jobs.
"""

import logging
import os
import queue
import signal
import sys
import threading
from enum import Enum

import SCons.Errors
import SCons.Warnings

# The default stack size (in kilobytes) of the threads used to execute
# jobs.
#
# We use a stack size of 256 kilobytes. The default on some platforms
# is too large and prevents us from creating enough threads to fully
# parallelize the build. For example, the default stack size on Linux
# is 8 MBytes.

explicit_stack_size = None
default_stack_size = 256

interrupt_msg = 'Build interrupted.'


class InterruptState:
    """Tracks whether the build has been interrupted by a signal."""

    def __init__(self) -> None:
        self.interrupted = False

    def set(self) -> None:
        self.interrupted = True

    def __call__(self):
        return self.interrupted
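
# Illustrative sketch (not part of the build logic): InterruptState is a
# shared, callable flag. A signal handler sets it; worker threads poll it
# by calling the instance:
#
#   flag = InterruptState()
#   assert not flag()   # initially clear
#   flag.set()          # e.g. from the SIGINT handler installed below
#   assert flag()       # polling threads now observe the interrupt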


class Jobs:
    """An instance of this class initializes N jobs, and provides
    methods for starting, stopping, and waiting on all N jobs.
    """

    def __init__(self, num, taskmaster) -> None:
        """Create 'num' jobs using the given taskmaster.

        The exact implementation used varies with the number of jobs
        requested and the state of the `legacy_sched` experimental flag.
        """
        # Importing GetOption here instead of at top of file to avoid
        # circular imports.
        # pylint: disable=import-outside-toplevel
        from SCons.Script import GetOption

        stack_size = explicit_stack_size
        if stack_size is None:
            stack_size = default_stack_size

        experimental_option = GetOption('experimental') or []
        if 'legacy_sched' in experimental_option:
            if num > 1:
                self.job = LegacyParallel(taskmaster, num, stack_size)
            else:
                self.job = Serial(taskmaster)
        else:
            self.job = NewParallel(taskmaster, num, stack_size)

        self.num_jobs = num

    def run(self, postfunc=lambda: None) -> None:
        """Run the jobs.

        postfunc() will be invoked after the jobs have run. It will be
        invoked even if the jobs are interrupted by a keyboard
        interrupt (well, in fact by a signal such as SIGINT,
        SIGTERM or SIGHUP). The execution of postfunc() is protected
        against keyboard interrupts and is guaranteed to run to
        completion."""
        self._setup_sig_handler()
        try:
            self.job.start()
        finally:
            postfunc()
            self._reset_sig_handler()

    def were_interrupted(self):
        """Returns whether the jobs were interrupted by a signal."""
        return self.job.interrupted()
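
    # Illustrative usage sketch (`taskmaster` is a hypothetical stand-in
    # for a real SCons taskmaster object; not executed here):
    #
    #   jobs = Jobs(num=4, taskmaster=taskmaster)
    #   jobs.run(postfunc=lambda: print("done or interrupted"))
    #   if jobs.were_interrupted():
    #       print("build was interrupted by a signal")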

    def _setup_sig_handler(self) -> None:
        """Setup an interrupt handler so that SCons can shutdown cleanly in
        various conditions:

          a) SIGINT: Keyboard interrupt
          b) SIGTERM: kill or system shutdown
          c) SIGHUP: Controlling shell exiting

        We handle all of these cases by stopping the taskmaster. It
        turns out that it's very difficult to stop the build process
        by asynchronously throwing an exception such as
        KeyboardInterrupt. For example, the Python condition
        variables (threading.Condition) and queues do not seem to be
        asynchronous-exception-safe. It would require adding a whole
        bunch of try/finally blocks and except KeyboardInterrupt
        clauses all over the place.

        Note also that we have to be careful to handle the case when
        SCons forks before executing another process. In that case, we
        want the child to exit immediately.
        """

        def handler(signum, stack, self=self, parentpid=os.getpid()) -> None:
            if os.getpid() == parentpid:
                self.job.taskmaster.stop()
                self.job.interrupted.set()
            else:
                os._exit(2)  # pylint: disable=protected-access

        self.old_sigint = signal.signal(signal.SIGINT, handler)
        self.old_sigterm = signal.signal(signal.SIGTERM, handler)
        try:
            self.old_sighup = signal.signal(signal.SIGHUP, handler)
        except AttributeError:
            pass

        if (self.old_sigint is None) or (self.old_sigterm is None) or \
                (hasattr(self, "old_sighup") and self.old_sighup is None):
            msg = "Overwriting previous signal handler which was not installed from Python. " + \
                "Will not be able to reinstate and so will return to default handler."
            SCons.Warnings.warn(SCons.Warnings.SConsWarning, msg)

    def _reset_sig_handler(self) -> None:
        """Restore the signal handlers to their previous state (before the
        call to _setup_sig_handler())."""
        sigint_to_use = self.old_sigint if self.old_sigint is not None else signal.SIG_DFL
        sigterm_to_use = self.old_sigterm if self.old_sigterm is not None else signal.SIG_DFL
        signal.signal(signal.SIGINT, sigint_to_use)
        signal.signal(signal.SIGTERM, sigterm_to_use)
        try:
            sighup_to_use = self.old_sighup if self.old_sighup is not None else signal.SIG_DFL
            signal.signal(signal.SIGHUP, sighup_to_use)
        except AttributeError:
            pass
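
    # Minimal standalone sketch of the save/restore pattern used by the two
    # methods above (`my_handler` is hypothetical; assumes a POSIX platform
    # where SIGHUP exists):
    #
    #   old = signal.signal(signal.SIGINT, my_handler)  # install, remember old
    #   ...                                             # run protected section
    #   signal.signal(signal.SIGINT,
    #                 old if old is not None else signal.SIG_DFL)
    #
    # signal.signal() returns the previous handler, or None if the previous
    # handler was not installed from Python, hence the SIG_DFL fallback.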


class Serial:
    """This class is used to execute tasks in series, and is more efficient
    than Parallel, but is only appropriate for non-parallel builds. Only
    one instance of this class should be in existence at a time.

    This class is not thread safe.
    """

    def __init__(self, taskmaster) -> None:
        """Create a new serial job given a taskmaster.

        The taskmaster's next_task() method should return the next task
        that needs to be executed, or None if there are no more tasks. The
        taskmaster's executed() method will be called for each task when it
        is successfully executed, or failed() will be called if it failed to
        execute (e.g. execute() raised an exception)."""

        self.taskmaster = taskmaster
        self.interrupted = InterruptState()

    def start(self):
        """Start the job. This will begin pulling tasks from the taskmaster
        and executing them, and return when there are no more tasks. If a task
        fails to execute (i.e. execute() raises an exception), then the job will
        stop."""

        while True:
            task = self.taskmaster.next_task()

            if task is None:
                break

            try:
                task.prepare()
                if task.needs_execute():
                    task.execute()
            except Exception:
                if self.interrupted():
                    try:
                        raise SCons.Errors.BuildError(
                            task.targets[0], errstr=interrupt_msg)
                    except Exception:
                        task.exception_set()
                else:
                    task.exception_set()

                # Let the failed() callback function arrange for the
                # build to stop if that's appropriate.
                task.failed()
            else:
                task.executed()

            task.postprocess()

        self.taskmaster.cleanup()
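
    # Illustrative sketch of the minimal taskmaster protocol that start()
    # drives (hypothetical class, for exposition only; a real SCons
    # taskmaster also implements prepare/executed/failed on its tasks):
    #
    #   class OneShotTaskmaster:
    #       def __init__(self, tasks):
    #           self._tasks = list(tasks)
    #       def next_task(self):
    #           return self._tasks.pop(0) if self._tasks else None
    #       def stop(self):
    #           self._tasks.clear()
    #       def cleanup(self):
    #           pass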


class Worker(threading.Thread):
    """A worker thread waits on a task to be posted to its request queue,
    dequeues the task, executes it, and posts a tuple including the task
    and a boolean indicating whether the task executed successfully."""

    def __init__(self, requestQueue, resultsQueue, interrupted) -> None:
        super().__init__()
        self.daemon = True
        self.requestQueue = requestQueue
        self.resultsQueue = resultsQueue
        self.interrupted = interrupted
        self.start()

    def run(self):
        while True:
            task = self.requestQueue.get()

            if task is None:
                # The "None" value is used as a sentinel by
                # ThreadPool.cleanup(). This indicates that there
                # are no more tasks, so we should quit.
                break

            try:
                if self.interrupted():
                    raise SCons.Errors.BuildError(
                        task.targets[0], errstr=interrupt_msg)
                task.execute()
            except Exception:
                task.exception_set()
                ok = False
            else:
                ok = True

            self.resultsQueue.put((task, ok))
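
# Sketch of the round trip a Worker performs (hypothetical `some_task`,
# which must provide execute(), exception_set() and targets):
#
#   requests, results = queue.Queue(0), queue.Queue(0)
#   w = Worker(requests, results, InterruptState())  # thread starts itself
#   requests.put(some_task)     # worker dequeues and executes it
#   task, ok = results.get()    # blocks until the worker posts the result
#   requests.put(None)          # sentinel: the worker thread exits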


class ThreadPool:
    """This class is responsible for spawning and managing worker threads."""

    def __init__(self, num, stack_size, interrupted) -> None:
        """Create the request and reply queues, and 'num' worker threads.

        One must specify the stack size of the worker threads. The
        stack size is specified in kilobytes.
        """
        self.requestQueue = queue.Queue(0)
        self.resultsQueue = queue.Queue(0)

        try:
            prev_size = threading.stack_size(stack_size * 1024)
        except RuntimeError as e:
            # Only print a warning if the stack size has been explicitly set.
            if explicit_stack_size is not None:
                msg = "Setting stack size is unsupported by this version of Python:\n    " + \
                    str(e)
                SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
        except ValueError as e:
            msg = "Setting stack size failed:\n    " + str(e)
            SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)

        # Create worker threads
        self.workers = []
        for _ in range(num):
            worker = Worker(self.requestQueue, self.resultsQueue, interrupted)
            self.workers.append(worker)

        if 'prev_size' in locals():
            threading.stack_size(prev_size)
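
    # Note on the threading.stack_size() dance above: calling it with a new
    # size returns the *previous* size, so the old value can be restored
    # once the workers have been spawned. Standalone sketch:
    #
    #   prev = threading.stack_size(256 * 1024)  # set 256 KiB, remember old
    #   ...                                       # spawn threads here
    #   threading.stack_size(prev)                # later threads unaffected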

    def put(self, task) -> None:
        """Put task into request queue."""
        self.requestQueue.put(task)

    def get(self):
        """Remove and return a result tuple from the results queue."""
        return self.resultsQueue.get()

    def preparation_failed(self, task) -> None:
        self.resultsQueue.put((task, False))

    def cleanup(self) -> None:
        """
        Shuts down the thread pool, giving each worker thread a
        chance to shut down gracefully.
        """
        # For each worker thread, put a sentinel "None" value
        # on the requestQueue (indicating that there's no work
        # to be done) so that each worker thread will get one and
        # terminate gracefully.
        for _ in self.workers:
            self.requestQueue.put(None)

        # Wait for all of the workers to terminate.
        #
        # If we don't do this, later Python versions (2.4, 2.5) often
        # seem to raise exceptions during shutdown. This happens
        # in requestQueue.get(), as an assertion failure that
        # requestQueue.not_full is notified while not acquired,
        # seemingly because the main thread has shut down (or is
        # in the process of doing so) while the workers are still
        # trying to pull sentinels off the requestQueue.
        #
        # Normally these terminations should happen fairly quickly,
        # but we'll stick a one-second timeout on here just in case
        # someone gets hung.
        for worker in self.workers:
            worker.join(1.0)
        self.workers = []
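
    # Illustrative ThreadPool lifecycle (hypothetical task objects):
    #
    #   tp = ThreadPool(num=2, stack_size=256, interrupted=InterruptState())
    #   tp.put(task_a)
    #   tp.put(task_b)
    #   done_task, ok = tp.get()   # one result per put(), in completion order
    #   tp.cleanup()               # one None sentinel per worker, then join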


class LegacyParallel:
    """This class is used to execute tasks in parallel, and is somewhat
    less efficient than Serial, but is appropriate for parallel builds.

    This class is thread safe.
    """

    def __init__(self, taskmaster, num, stack_size) -> None:
        """Create a new parallel job given a taskmaster.

        The taskmaster's next_task() method should return the next
        task that needs to be executed, or None if there are no more
        tasks. The taskmaster's executed() method will be called
        for each task when it is successfully executed, or failed()
        will be called if the task failed to execute (i.e. execute()
        raised an exception).

        Note: calls to taskmaster are serialized, but calls to
        execute() on distinct tasks are not serialized, because
        that is the whole point of parallel jobs: they can execute
        multiple tasks simultaneously."""

        self.taskmaster = taskmaster
        self.interrupted = InterruptState()
        self.tp = ThreadPool(num, stack_size, self.interrupted)

        self.maxjobs = num

    def start(self):
        """Start the job. This will begin pulling tasks from the
        taskmaster and executing them, and return when there are no
        more tasks. If a task fails to execute (i.e. execute() raises
        an exception), then the job will stop."""

        jobs = 0

        while True:
            # Start up as many available tasks as we're
            # allowed to.
            while jobs < self.maxjobs:
                task = self.taskmaster.next_task()
                if task is None:
                    break

                try:
                    # prepare task for execution
                    task.prepare()
                except Exception:
                    task.exception_set()
                    task.failed()
                    task.postprocess()
                else:
                    if task.needs_execute():
                        # dispatch task
                        self.tp.put(task)
                        jobs += 1
                    else:
                        task.executed()
                        task.postprocess()

            if not task and not jobs:
                break

            # Let any/all completed tasks finish up before we go
            # back and put the next batch of tasks on the queue.
            while True:
                task, ok = self.tp.get()
                jobs -= 1

                if ok:
                    task.executed()
                else:
                    if self.interrupted():
                        try:
                            raise SCons.Errors.BuildError(
                                task.targets[0], errstr=interrupt_msg)
                        except Exception:
                            task.exception_set()

                    # Let the failed() callback function arrange
                    # for the build to stop if that's appropriate.
                    task.failed()

                task.postprocess()

                if self.tp.resultsQueue.empty():
                    break

        self.tp.cleanup()
        self.taskmaster.cleanup()


# An experimental new parallel scheduler that uses a leaders/followers pattern.
class NewParallel:

    class State(Enum):
        READY = 0
        SEARCHING = 1
        STALLED = 2
        COMPLETED = 3

    class Worker(threading.Thread):
        def __init__(self, owner) -> None:
            super().__init__()
            self.daemon = True
            self.owner = owner
            self.start()

        def run(self) -> None:
            self.owner._work()

    class FakeLock(object):
        """A no-op lock used when there is only one worker thread."""
        def __enter__(self):
            pass

        def __exit__(self, *args):
            pass

    class FakeCondition(object):
        """A no-op condition variable used when there is only one worker thread."""
        def __init__(self, lock):
            pass

        def notify(self):
            pass

        def notify_all(self):
            pass

        def __enter__(self):
            pass

        def __exit__(self, *args):
            pass
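
    # The fakes above let the single-worker case share the same code path as
    # the multi-worker case without paying for real locks. Sketch of the
    # selection idiom used in __init__ below:
    #
    #   lock_cls = threading.Lock if max_workers > 1 else NewParallel.FakeLock
    #   lock = lock_cls()   # either way, `with lock:` works unchanged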

    def __init__(self, taskmaster, num, stack_size) -> None:
        self.taskmaster = taskmaster
        self.max_workers = num
        self.stack_size = stack_size
        self.interrupted = InterruptState()
        self.workers = []

        # The `tm_lock` is what ensures that we only have one
        # thread interacting with the taskmaster at a time. It
        # also protects access to our state that gets updated
        # concurrently. The `can_search_cv` is associated with
        # this mutex.
        self.tm_lock = (threading.Lock if self.max_workers > 1 else NewParallel.FakeLock)()

        # Guarded under `tm_lock`.
        self.jobs = 0
        self.state = NewParallel.State.READY

        # The `can_search_cv` is used to manage a leader /
        # follower pattern for access to the taskmaster, and to
        # awaken from stalls.
        self.can_search_cv = (threading.Condition if self.max_workers > 1 else NewParallel.FakeCondition)(self.tm_lock)

        # The queue of tasks that have completed execution. The
        # next thread to obtain `tm_lock` will retire them.
        self.results_queue_lock = (threading.Lock if self.max_workers > 1 else NewParallel.FakeLock)()
        self.results_queue = []

        if self.taskmaster.trace:
            self.trace = self._setup_logging()
        else:
            self.trace = False

    def _setup_logging(self):
        jl = logging.getLogger("Job")
        jl.setLevel(level=logging.DEBUG)
        jl.addHandler(self.taskmaster.trace.log_handler)
        return jl

    def trace_message(self, message) -> None:
        # This grabs the name of the function which calls trace_message()
        method_name = sys._getframe(1).f_code.co_name + "():"
        thread_id = threading.get_ident()
        self.trace.debug('%s.%s [Thread:%s] %s' % (type(self).__name__, method_name, thread_id, message))
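
    # sys._getframe(1) walks one frame up the call stack, so the log line
    # reports the *caller* of trace_message(). Standalone sketch:
    #
    #   def caller_name():
    #       return sys._getframe(1).f_code.co_name
    #
    #   def outer():
    #       return caller_name()   # -> "outer"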

    def start(self) -> None:
        if self.max_workers == 1:
            self._work()
        else:
            self._start_worker()
            while len(self.workers) > 0:
                self.workers[0].join()
        self.taskmaster.cleanup()

    def _maybe_start_worker(self) -> None:
        if self.max_workers > 1 and len(self.workers) < self.max_workers:
            if self.jobs >= len(self.workers):
                self._start_worker()

    def _start_worker(self) -> None:
        prev_size = self._adjust_stack_size()
        if self.trace:
            self.trace_message("Starting new worker thread")
        self.workers.append(NewParallel.Worker(self))
        self._restore_stack_size(prev_size)

    def _adjust_stack_size(self):
        try:
            prev_size = threading.stack_size(self.stack_size * 1024)
            return prev_size
        except AttributeError as e:
            # Only print a warning if the stack size has been
            # explicitly set.
            if explicit_stack_size is not None:
                msg = "Setting stack size is unsupported by this version of Python:\n    " + \
                    str(e)
                SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)
        except ValueError as e:
            msg = "Setting stack size failed:\n    " + str(e)
            SCons.Warnings.warn(SCons.Warnings.StackSizeWarning, msg)

        return None

    def _restore_stack_size(self, prev_size) -> None:
        if prev_size is not None:
            threading.stack_size(prev_size)

    def _work(self):
        task = None

        while True:
            # Obtain `tm_lock`, granting exclusive access to the taskmaster.
            with self.can_search_cv:
                if self.trace:
                    self.trace_message("Gained exclusive access")

                # Capture whether we got here with `task` set,
                # then drop our reference to the task as we are no
                # longer interested in the actual object.
                completed_task = (task is not None)
                task = None

                # We will only have `completed_task` set here if
                # we have looped back after executing a task. If
                # we have completed a task and find that we are
                # stalled, we should speculatively indicate that
                # we are no longer stalled by transitioning to the
                # 'ready' state which will bypass the condition
                # wait so that we immediately process the results
                # queue and hopefully light up new
                # work. Otherwise, stay stalled, and we will wait
                # in the condvar. Some other thread will come back
                # here with a completed task.
                if self.state == NewParallel.State.STALLED and completed_task:
                    if self.trace:
                        self.trace_message("Detected stall with completed task, bypassing wait")
                    self.state = NewParallel.State.READY

                # Wait until we are neither searching nor stalled.
                while self.state == NewParallel.State.SEARCHING or self.state == NewParallel.State.STALLED:
                    if self.trace:
                        self.trace_message("Search already in progress, waiting")
                    self.can_search_cv.wait()

                # If someone set the completed flag, bail.
                if self.state == NewParallel.State.COMPLETED:
                    if self.trace:
                        self.trace_message("Completion detected, breaking from main loop")
                    break

                # Set the searching flag to indicate that a thread
                # is currently in the critical section for
                # taskmaster work.
                if self.trace:
                    self.trace_message("Starting search")
                self.state = NewParallel.State.SEARCHING

                # Bulk acquire the tasks in the results queue
                # under the result queue lock, then process them
                # all outside that lock. We need to process the
                # tasks in the results queue before looking for
                # new work because we might be unable to find new
                # work if we don't.
                results_queue = []
                with self.results_queue_lock:
                    results_queue, self.results_queue = self.results_queue, results_queue

                if self.trace:
                    self.trace_message(f"Found {len(results_queue)} completed tasks to process")
                for (rtask, rresult) in results_queue:
                    if rresult:
                        rtask.executed()
                    else:
                        if self.interrupted():
                            try:
                                raise SCons.Errors.BuildError(
                                    rtask.targets[0], errstr=interrupt_msg)
                            except Exception:
                                rtask.exception_set()

                        # Let the failed() callback function arrange
                        # for the build to stop if that's appropriate.
                        rtask.failed()

                    rtask.postprocess()
                    self.jobs -= 1

                # We are done with any task objects that were in
                # the results queue.
                results_queue.clear()

                # Now, turn the crank on the taskmaster until we
                # either run out of tasks, or find a task that
                # needs execution. If we run out of tasks, go idle
                # until results arrive if jobs are pending, or
                # mark the walk as complete if not.
                while self.state == NewParallel.State.SEARCHING:
                    if self.trace:
                        self.trace_message("Searching for new tasks")
                    task = self.taskmaster.next_task()

                    if task:
                        # We found a task. Walk it through the
                        # task lifecycle. If it does not need
                        # execution, just complete the task and
                        # look for the next one. Otherwise,
                        # indicate that we are no longer searching
                        # so we can drop out of this loop, execute
                        # the task outside the lock, and allow
                        # another thread in to search.
                        try:
                            task.prepare()
                        except Exception:
                            task.exception_set()
                            task.failed()
                            task.postprocess()
                        else:
                            if not task.needs_execute():
                                if self.trace:
                                    self.trace_message("Found internal task")
                                task.executed()
                                task.postprocess()
                            else:
                                self.jobs += 1
                                if self.trace:
                                    self.trace_message("Found task requiring execution")
                                self.state = NewParallel.State.READY
                                self.can_search_cv.notify()
                                # This thread will be busy taking care of
                                # `execute`ing this task. If we haven't
                                # reached the limit, spawn a new thread to
                                # turn the crank and find the next task.
                                self._maybe_start_worker()

                    else:
                        # We failed to find a task, so this thread
                        # cannot continue turning the taskmaster
                        # crank. We must exit the loop.
                        if self.jobs:
                            # No task was found, but there are
                            # outstanding jobs executing that
                            # might unblock new tasks when they
                            # complete. Transition to the stalled
                            # state. We do not need a notify,
                            # because we know there are threads
                            # outstanding that will re-enter the
                            # loop.
                            if self.trace:
                                self.trace_message("Found no task requiring execution, but have jobs: marking stalled")
                            self.state = NewParallel.State.STALLED
                        else:
                            # We didn't find a task and there are
                            # no jobs outstanding, so there is
                            # nothing that will ever return
                            # results which might unblock new
                            # tasks. We can conclude that the walk
                            # is complete. Update our state to
                            # note completion and awaken anyone
                            # sleeping on the condvar.
                            if self.trace:
                                self.trace_message("Found no task requiring execution, and have no jobs: marking complete")
                            self.state = NewParallel.State.COMPLETED
                            self.can_search_cv.notify_all()

            # We no longer hold `tm_lock` here. If we have a task,
            # we can now execute it. If there are threads waiting
            # to search, one of them can now begin turning the
            # taskmaster crank in NewParallel.
            if task:
                if self.trace:
                    self.trace_message("Executing task")
                ok = True
                try:
                    if self.interrupted():
                        raise SCons.Errors.BuildError(
                            task.targets[0], errstr=interrupt_msg)
                    task.execute()
                except Exception:
                    ok = False
                    task.exception_set()

                # Grab the results queue lock and enqueue the
                # executed task and state. The next thread into
                # the searching loop will complete the
                # postprocessing work under the taskmaster lock.
                if self.trace:
                    self.trace_message("Enqueueing executed task results")
                with self.results_queue_lock:
                    self.results_queue.append((task, ok))

            # Tricky state "fallthrough" here. We are going back
            # to the top of the loop, which behaves differently
            # depending on whether `task` is set. Do not perturb
            # the value of the `task` variable if you add new code
            # after this comment.
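
    # Summary of the state machine that _work() drives, for reference
    # (states are defined in NewParallel.State above):
    #
    #   READY     -> SEARCHING  one thread wins the lock and turns the crank
    #   SEARCHING -> READY      a task needing execution was found; notify()
    #   SEARCHING -> STALLED    no task, but jobs outstanding; wait for results
    #   SEARCHING -> COMPLETED  no task and no jobs; notify_all(), all exit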

# Local Variables:
# tab-width:4
# indent-tabs-mode:nil
# End:
# vim: set expandtab tabstop=4 shiftwidth=4: