/*
 * Copyright (C) 2024 Mikulas Patocka
 *
 * This file is part of Ajla.
 *
 * Ajla is free software: you can redistribute it and/or modify it under the
 * terms of the GNU General Public License as published by the Free Software
 * Foundation, either version 3 of the License, or (at your option) any later
 * version.
 *
 * Ajla is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
 * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * Ajla. If not, see <https://www.gnu.org/licenses/>.
 */
shared_var unsigned nr_cpus;
shared_var unsigned nr_active_cpus;
shared_var uint32_t nr_cpus_override shared_init(0);
struct task_percpu {
	mutex_t waiting_list_mutex;
	struct list waiting_list;
};
struct thread_pointers {
	thread_t thread;
	struct task_percpu *tpc;
};

shared_var struct thread_pointers *thread_pointers;
shared_var refcount_t n_ex_controls;
shared_var refcount_t n_programs;
shared_var cond_t task_mutex;
shared_var unsigned n_deep_sleep;
shared_var struct list task_list;
shared_var tick_stamp_t task_list_stamp;
shared_var thread_volatile sig_atomic_t task_list_nonempty;
static tls_decl(struct task_percpu *, task_tls);

static void spawn_another_cpu(void);
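/*
 * An execution control becomes "useless" when no program references it any
 * more or when its result thunk has already been dereferenced. A task that
 * is inside an atomic section is not reported as useless; instead its
 * atomic_interrupted flag is set, presumably so that it can notice the
 * cancellation once it leaves the atomic section.
 */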
static bool task_is_useless(struct execution_control *ex)
{
	struct thunk *thunk = ex->thunk;
	if (refcount_is_one(&n_programs))
		goto useless;
	if (!thunk)
		return false;
	if (likely(thunk_tag_volatile(thunk) != THUNK_TAG_BLACKHOLE_DEREFERENCED))
		return false;
	address_lock(thunk, DEPTH_THUNK);
	if (unlikely(thunk_tag(thunk) != THUNK_TAG_BLACKHOLE_DEREFERENCED)) {
		address_unlock(thunk, DEPTH_THUNK);
		return false;
	}
	address_unlock(thunk, DEPTH_THUNK);
useless:
	if (unlikely(ex->atomic != 0)) {
		ex->atomic_interrupted = true;
		return false;
	}
	return true;
}
static bool task_useless(struct execution_control *ex)
{
	if (unlikely(task_is_useless(ex))) {
		pointer_t ptr = pointer_thunk(thunk_alloc_exception_error(error_ajla(EC_ASYNC, AJLA_ERROR_NOT_SUPPORTED), NULL, NULL, NULL pass_file_line));
		execution_control_terminate(ex, ptr);
		pointer_dereference(ptr);
		return true;
	}
	return false;
}
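/* Debug helper: dump the addresses of all execution controls queued on task_list. */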
void task_list_print(void)
{
	char *s;
	size_t l;
	struct list *t;

	str_init(&s, &l);
	list_for_each(t, &task_list) {
		struct execution_control *ex = get_struct(t, struct execution_control, wait[0].wait_entry);
		if (l) str_add_string(&s, &l, ", ");
		str_add_unsigned(&s, &l, ptr_to_num(ex), 16);
	}
	str_finish(&s, &l);
	debug("task_list: %s", s);
	mem_free(s);
}
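/*
 * task_submit() puts an execution control on the global run queue. If the
 * queue has stayed non-empty for at least two ticks, an additional worker
 * thread is started (spawn_another_cpu()); can_allocate_memory apparently
 * guards this path for callers that must not allocate.
 */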
void attr_fastcall task_submit(struct execution_control *ex, bool can_allocate_memory)
{
	ajla_assert(ex == frame_execution_control(ex->current_frame), (file_line, "task_submit: submitting task with improper execution control: %p != %p", ex, frame_execution_control(ex->current_frame)));

	cond_lock(&task_mutex);
	if (!task_list_nonempty) {
		task_list_stamp = tick_stamp;
	} else {
		if (tick_stamp - task_list_stamp >= 2 && likely(can_allocate_memory)) {
			spawn_another_cpu();
			task_list_stamp = tick_stamp;
		}
	}
	list_add(&task_list, &ex->wait[0].wait_entry);
	task_list_nonempty = 1;
	cond_unlock_signal(&task_mutex);
}
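/*
 * Pop one execution control from the run queue, or return NULL if it is
 * empty. Must be called with task_mutex held.
 */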
static struct execution_control *task_list_pop(void)
{
	struct execution_control *ex;
	if (!task_list_nonempty)
		return NULL;
	ex = get_struct(task_list.prev, struct execution_control, wait[0].wait_entry);
	list_del(&ex->wait[0].wait_entry);
	task_list_nonempty = !list_is_empty(&task_list);
	return ex;
}
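/*
 * Cooperative reschedule: exchange the currently running execution control
 * for one taken from the run queue. Returns the execution control to run
 * next (apparently the old one when there is nothing to switch to), or
 * POINTER_FOLLOW_THUNK_EXIT when the task at hand turned out to be useless.
 */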
void * attr_fastcall task_schedule(struct execution_control *old_ex)
{
	struct execution_control *new_ex;

	if (unlikely(task_useless(old_ex)))
		return POINTER_FOLLOW_THUNK_EXIT;

#ifndef THREAD_SANITIZER
	if (!task_list_nonempty)
		goto no_sched;
#endif

	cond_lock(&task_mutex);
	new_ex = task_list_pop();
	if (unlikely(!new_ex))
		goto unlock_no_sched;

	list_add(&task_list, &old_ex->wait[0].wait_entry);

	ajla_assert(new_ex != old_ex, (file_line, "task_schedule: submitting already submitted task"));

	task_list_nonempty = 1;
	cond_unlock(&task_mutex);

	if (unlikely(task_useless(new_ex)))
		return POINTER_FOLLOW_THUNK_EXIT;

	return new_ex;

unlock_no_sched:
	cond_unlock(&task_mutex);
#ifndef THREAD_SANITIZER
no_sched:
#endif
	return old_ex;
}
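/*
 * Each worker keeps a per-CPU list of execution controls that are parked
 * waiting for some event. waiting_list_add()/waiting_list_remove() link and
 * unlink an execution control on the list of the CPU it runs on; the list
 * is scanned by waiting_list_break() below.
 */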
void waiting_list_add(struct execution_control *ex)
{
	struct task_percpu *tpc = tls_get(struct task_percpu *, task_tls);

	mutex_lock(&tpc->waiting_list_mutex);
	list_add(&tpc->waiting_list, &ex->waiting_list_entry);
	ex->waiting_list_head = tpc;
	mutex_unlock(&tpc->waiting_list_mutex);
}
void waiting_list_remove(struct execution_control *ex)
{
	struct task_percpu *tpc = ex->waiting_list_head;

	mutex_lock(&tpc->waiting_list_mutex);
	list_del(&ex->waiting_list_entry);
	mutex_unlock(&tpc->waiting_list_mutex);

	ex->waiting_list_head = NULL;
}
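/*
 * Scan this CPU's waiting list: useless entries that can be acquired are
 * unlinked and resubmitted so they can terminate, and entries for which
 * ipret_break_waiting_chain() returns false are dropped from the list.
 * Returns true if the list is still non-empty afterwards.
 */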
bool waiting_list_break(void)
{
	struct task_percpu *tpc = tls_get(struct task_percpu *, task_tls);
	struct list *l;
	bool ret;

	os_signal_check_all();

again:
	mutex_lock(&tpc->waiting_list_mutex);
	list_for_each_back(l, &tpc->waiting_list) {
		struct execution_control *ex = get_struct(l, struct execution_control, waiting_list_entry);
		if (unlikely(task_is_useless(ex))) {
			if (execution_control_acquire(ex)) {
				mutex_unlock(&tpc->waiting_list_mutex);
				execution_control_unlink_and_submit(ex, true);
				goto again;
			}
		}
		if (!ipret_break_waiting_chain(ex->current_frame, ex->current_ip)) {
			list_del(&ex->waiting_list_entry);
			list_init(&ex->waiting_list_entry);
		}
	}
	ret = !list_is_empty(&tpc->waiting_list);
	mutex_unlock(&tpc->waiting_list_mutex);

	return ret;
}
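/*
 * Main loop of a worker: pop and run execution controls until none are left
 * (refcount_is_one(&n_ex_controls)). When the run queue is empty the worker
 * first breaks waiting chains and then either sleeps on task_mutex, counting
 * itself in n_deep_sleep, or, apparently in the threadless build, polls the
 * I/O multiplexer directly.
 */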
static void task_worker_core(void)
{
	if (unlikely(profiling)) {
		/* ... */
	}

	cond_lock(&task_mutex);
	while (likely(!refcount_is_one(&n_ex_controls))) {
		struct execution_control *ex;
		if (!(ex = task_list_pop())) {
			bool more;
			cond_unlock(&task_mutex);
			more = waiting_list_break();
			cond_lock(&task_mutex);
			if (!(ex = task_list_pop())) {
				if (likely(refcount_is_one(&n_ex_controls))) {
					break;
				}
#ifndef THREAD_NONE
				if (!more) {
					if (++n_deep_sleep == nr_active_cpus) {
						/* ... */
					}
					cond_wait(&task_mutex);
					if (n_deep_sleep-- == nr_active_cpus) {
						/* ... */
					}
				} else {
					cond_wait_us(&task_mutex, tick_us);
				}
#else
				cond_unlock(&task_mutex);
				iomux_check_all(more ? tick_us : IOMUX_INDEFINITE_WAIT);
				cond_lock(&task_mutex);
#endif
				continue;
			}
		}
		if (unlikely(profiling)) {
			/* ... */
		}
		cond_unlock(&task_mutex);
		if (likely(!task_useless(ex)))
			run(ex->current_frame, ex->current_ip);
		cond_lock(&task_mutex);
	}
	cond_unlock(&task_mutex);
}
static void set_per_thread_data(struct thread_pointers *tp)
{
	struct task_percpu *tpc;
	thread_set_id((int)(tp - thread_pointers));
	tpc = tp->tpc;
	tls_set(struct task_percpu *, task_tls, tpc);
}

thread_function_decl(task_worker,
	set_per_thread_data(arg);
	task_worker_core();
)
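/*
 * Start one more worker thread if fewer than nr_cpus are active. Called with
 * task_mutex held.
 */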
static void spawn_another_cpu(void)
{
	if (nr_active_cpus < nr_cpus) {
		ajla_error_t err;
		/*debug("spawning cpu %d", nr_active_cpus);*/
		if (unlikely(!thread_spawn(&thread_pointers[nr_active_cpus].thread, task_worker, &thread_pointers[nr_active_cpus], PRIORITY_COMPUTE, &err)))
			return;
		nr_active_cpus++;
	}
}
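/*
 * Scheduler entry point: the calling thread becomes worker 0, spawns the
 * remaining workers, runs the worker loop itself, and finally joins the
 * other threads.
 */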
void name(task_run)(void)
{
	unsigned i;

	set_per_thread_data(&thread_pointers[0]);

	cond_lock(&task_mutex);
	while (nr_active_cpus < nr_cpus)
		spawn_another_cpu();
	cond_unlock(&task_mutex);

	task_worker_core();

	cond_lock(&task_mutex);
	for (i = 1; i < nr_active_cpus; i++) {
		cond_unlock(&task_mutex);
		thread_join(&thread_pointers[i].thread);
		cond_lock(&task_mutex);
	}
	cond_unlock(&task_mutex);
}
void task_ex_control_started(void)
{
	refcount_inc(&n_ex_controls);
}

void task_ex_control_exited(void)
{
	ajla_assert_lo(!refcount_is_one(&n_ex_controls), (file_line, "task_ex_control_exit: n_ex_controls underflow"));
	refcount_add(&n_ex_controls, -1);
	if (unlikely(refcount_is_one(&n_ex_controls))) {
		cond_lock(&task_mutex);
		cond_unlock_broadcast(&task_mutex);
	}
}
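/*
 * n_programs counts the programs that are currently running; once it drops
 * back to its base count, task_is_useless() starts reporting outstanding
 * tasks as useless so they get torn down. A typical caller (sketch, not a
 * quote from the actual call sites) brackets a program like this:
 *
 *	task_program_started();
 *	...submit the program's initial execution control...
 *	task_program_exited();
 */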
void task_program_started(void)
{
	refcount_inc(&n_programs);
}

void task_program_exited(void)
{
	ajla_assert_lo(!refcount_is_one(&n_programs), (file_line, "task_program_exited: n_programs underflow"));
	refcount_add(&n_programs, -1);
}
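/*
 * Module setup and teardown: task_init() initializes the refcounts, the run
 * queue and one struct task_percpu (waiting list plus mutex) per potential
 * worker; task_done() asserts that no program leaked and frees it all again.
 */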
void name(task_init)(void)
{
	unsigned i;

	refcount_init(&n_ex_controls);
	refcount_init(&n_programs);
	n_deep_sleep = 0;
	cond_init(&task_mutex);
	list_init(&task_list);
	task_list_nonempty = 0;

	nr_cpus = thread_concurrency();
	if (nr_cpus_override)
		nr_cpus = nr_cpus_override;
	nr_active_cpus = 1;
	debug("concurrency: %u", nr_cpus);

	thread_pointers = mem_alloc_array_mayfail(mem_calloc_mayfail, struct thread_pointers *, 0, 0, nr_cpus, sizeof(struct thread_pointers), NULL);
	for (i = 0; i < nr_cpus; i++) {
		struct task_percpu *tpc;
		tpc = thread_pointers[i].tpc = mem_alloc(struct task_percpu *, sizeof(struct task_percpu));
		list_init(&tpc->waiting_list);
		mutex_init(&tpc->waiting_list_mutex);
	}
	tls_init(struct task_percpu *, task_tls);
}
void name(task_done)(void)
{
	unsigned i;

	ajla_assert_lo(refcount_is_one(&n_programs), (file_line, "task_done: programs leaked: %"PRIuMAX"", (uintmax_t)refcount_get_nonatomic(&n_programs)));

	tls_done(struct task_percpu *, task_tls);
	for (i = 0; i < nr_cpus; i++) {
		struct task_percpu *tpc = thread_pointers[i].tpc;
		mutex_done(&tpc->waiting_list_mutex);
		mem_free(tpc);
	}
	mem_free(thread_pointers);

	cond_done(&task_mutex);
}