1 # SPDX-FileCopyrightText: 2023 Blender Foundation
3 # SPDX-License-Identifier: GPL-2.0-or-later
5 from contextlib
import contextmanager
, nullcontext
7 from queue
import SimpleQueue
9 # Note: `bpy` cannot be imported here because this module is also used by the fbx2json.py and json2fbx.py scripts.
11 # For debugging/profiling purposes, can be modified at runtime to force single-threaded execution.
12 _MULTITHREADING_ENABLED
= True
13 # The concurrent.futures module may not work or may not be available on WebAssembly platforms wasm32-emscripten and
16 from concurrent
.futures
import ThreadPoolExecutor
17 except ModuleNotFoundError
:
18 _MULTITHREADING_ENABLED
= False
19 ThreadPoolExecutor
= None
22 # The module may be available, but not be fully functional. An error may be raised when attempting to start a
24 with
ThreadPoolExecutor() as tpe
:
25 # Attempt to start a thread by submitting a callable.
26 tpe
.submit(lambda: None)
28 # Assume that multithreading is not supported and fall back to single-threaded execution.
29 _MULTITHREADING_ENABLED
= False
33 """Get the number of cpus assigned to the current process if that information is available on this system.
34 If not available, get the total number of cpus.
35 If the cpu count is indeterminable, it is assumed that there is only 1 cpu available."""
36 sched_getaffinity
= getattr(os
, "sched_getaffinity", None)
37 if sched_getaffinity
is not None:
38 # Return the number of cpus assigned to the current process.
39 return len(sched_getaffinity(0))
40 count
= os
.cpu_count()
41 return count
if count
is not None else 1
44 class MultiThreadedTaskConsumer
:
45 """Helper class that encapsulates everything needed to run a function on separate threads, with a single-threaded
46 fallback if multithreading is not available.
48 Lower overhead than typical use of ThreadPoolExecutor because no Future objects are returned, which makes this class
49 more suitable to running many smaller tasks.
51 As with any threaded parallelization, because of Python's Global Interpreter Lock, only one thread can execute
52 Python code at a time, so threaded parallelization is only useful when the functions used release the GIL, such as
53 many IO related functions."""
54 # A special task value used to signal task consumer threads to shut down.
55 _SHUT_DOWN_THREADS
= object()
57 __slots__
= ("_consumer_function", "_shared_task_queue", "_task_consumer_futures", "_executor",
58 "_max_consumer_threads", "_shutting_down", "_max_queue_per_consumer")
60 def __init__(self
, consumer_function
, max_consumer_threads
, max_queue_per_consumer
=5):
61 # It's recommended to use MultiThreadedTaskConsumer.new_cpu_bound_cm() instead of creating new instances
63 # __init__ should only be called after checking _MULTITHREADING_ENABLED.
64 assert(_MULTITHREADING_ENABLED
)
65 # The function that will be called on separate threads to consume tasks.
66 self
._consumer
_function
= consumer_function
67 # All the threads share a single queue. This is a simplistic approach, but it is unlikely to be problematic
68 # unless the main thread is expected to wait a long time for the consumer threads to finish.
69 self
._shared
_task
_queue
= SimpleQueue()
70 # Reference to each thread is kept through the returned Future objects. This is used as part of determining when
71 # new threads should be started and is used to be able to receive and handle exceptions from the threads.
72 self
._task
_consumer
_futures
= []
73 # Create the executor.
74 self
._executor
= ThreadPoolExecutor(max_workers
=max_consumer_threads
)
75 # Technically the max workers of the executor is accessible through its `._max_workers`, but since it's private,
76 # meaning it could be changed without warning, we'll store the max workers/consumers ourselves.
77 self
._max
_consumer
_threads
= max_consumer_threads
78 # The maximum task queue size (before another consumer thread is started) increases by this amount with every
79 # additional consumer thread.
80 self
._max
_queue
_per
_consumer
= max_queue_per_consumer
81 # When shutting down the threads, this is set to True as an extra safeguard to prevent new tasks being
83 self
._shutting
_down
= False
86 def new_cpu_bound_cm(cls
, consumer_function
, other_cpu_bound_threads_in_use
=1, hard_max_threads
=32):
87 """Return a context manager that, when entered, returns a wrapper around `consumer_function` that schedules
88 `consumer_function` to be run on a separate thread.
90 If the system can't use multithreading, then the context manager's returned function will instead be the input
91 `consumer_function` argument, causing tasks to be run immediately on the calling thread.
93 When exiting the context manager, it waits for all scheduled tasks to complete and prevents the creation of new
94 tasks, similar to calling ThreadPoolExecutor.shutdown(). For these reasons, the wrapped function should only be
95 called from the thread that entered the context manager, otherwise there is no guarantee that all tasks will get
96 scheduled before the context manager exits.
98 Any task that fails with an exception will cause all task consumer threads to stop.
100 The maximum number of threads used matches the number of cpus available up to a maximum of `hard_max_threads`.
101 `hard_max_threads`'s default of 32 matches ThreadPoolExecutor's default behaviour.
103 The maximum number of threads used is decreased by `other_cpu_bound_threads_in_use`. Defaulting to `1`, assuming
104 that the calling thread will also be doing CPU-bound work.
106 Most IO-bound tasks can probably use a ThreadPoolExecutor directly instead because there will typically be fewer
107 tasks and, on average, each individual task will take longer.
108 If needed, `cls.new_cpu_bound_cm(consumer_function, -4)` could be suitable for lots of small IO-bound tasks,
109 because it ensures a minimum of 5 threads, like the default ThreadPoolExecutor."""
110 if _MULTITHREADING_ENABLED
:
111 max_threads
= get_cpu_count() - other_cpu_bound_threads_in_use
112 max_threads
= min(max_threads
, hard_max_threads
)
114 return cls(consumer_function
, max_threads
)._wrap
_executor
_cm
()
115 # Fall back to single-threaded.
116 return nullcontext(consumer_function
)
118 def _task_consumer_callable(self
):
119 """Callable that is run by each task consumer thread.
120 Signals the other task consumer threads to stop when stopped intentionally or when an exception occurs."""
123 # Blocks until it can get a task.
124 task_args
= self
._shared
_task
_queue
.get()
126 if task_args
is self
._SHUT
_DOWN
_THREADS
:
127 # This special value signals that it's time for all the threads to stop.
130 # Call the task consumer function.
131 self
._consumer
_function
(*task_args
)
133 # Either the thread has been told to shut down because it received _SHUT_DOWN_THREADS or an exception has
135 # Add _SHUT_DOWN_THREADS to the queue so that the other consumer threads will also shut down.
136 self
._shared
_task
_queue
.put(self
._SHUT
_DOWN
_THREADS
)
138 def _schedule_task(self
, *args
):
139 """Task consumer threads are only started as tasks are added.
141 To mitigate starting lots of threads if many tasks are scheduled in quick succession, new threads are only
142 started if the number of queued tasks grows too large.
144 This function is a slight misuse of ThreadPoolExecutor. Normally each task to be scheduled would be submitted
145 through ThreadPoolExecutor.submit, but doing so is noticeably slower for small tasks. We could start new Thread
146 instances manually without using ThreadPoolExecutor, but ThreadPoolExecutor gives us a higher level API for
147 waiting for threads to finish and handling exceptions without having to implement an API using Thread ourselves.
149 if self
._shutting
_down
:
150 # Shouldn't occur through normal usage.
151 raise RuntimeError("Cannot schedule new tasks after shutdown")
152 # Schedule the task by adding it to the task queue.
153 self
._shared
_task
_queue
.put(args
)
154 # Check if more consumer threads need to be added to account for the rate at which tasks are being scheduled
155 # compared to the rate at which tasks are being consumed.
156 current_consumer_count
= len(self
._task
_consumer
_futures
)
157 if current_consumer_count
< self
._max
_consumer
_threads
:
158 # The max queue size increases as new threads are added, otherwise, by the time the next task is added, it's
159 # likely that the queue size will still be over the max, causing another new thread to be added immediately.
160 # Increasing the max queue size whenever a new thread is started gives some time for the new thread to start
161 # up and begin consuming tasks before it's determined that another thread is needed.
162 max_queue_size_for_current_consumers
= self
._max
_queue
_per
_consumer
* current_consumer_count
164 if self
._shared
_task
_queue
.qsize() > max_queue_size_for_current_consumers
:
165 # Add a new consumer thread because the queue has grown too large.
166 self
._task
_consumer
_futures
.append(self
._executor
.submit(self
._task
_consumer
_callable
))
169 def _wrap_executor_cm(self
):
170 """Wrap the executor's context manager to instead return self._schedule_task and such that the threads
171 automatically start shutting down before the executor itself starts shutting down."""
173 # Exiting the context manager of the executor will wait for all threads to finish and prevent new
174 # threads from being created, as if its shutdown() method had been called.
177 yield self
._schedule
_task
180 self
._shutting
_down
= True
181 # Signal all consumer threads to finish up and shut down so that the executor can shut down.
182 # When this is run on the same thread that schedules new tasks, this guarantees that no more tasks will
183 # be scheduled after the consumer threads start to shut down.
184 self
._shared
_task
_queue
.put(self
._SHUT
_DOWN
_THREADS
)
186 # Because `self._executor` was entered with a context manager, it will wait for all the consumer threads
187 # to finish even if we propagate an exception from one of the threads here.
188 for future
in self
._task
_consumer
_futures
:
189 # .exception() waits for the future to finish and returns its raised exception or None.
190 ex
= future
.exception()
192 # If one of the threads raised an exception, propagate it to the main thread.
193 # Only the first exception will be propagated if there were multiple.