ssa: delete unused label skip_must_be_flat
[ajla.git] / task.c
blob580e13b30e68948ee75e6d64dc82d12d63c34578
/*
 * Copyright (C) 2024 Mikulas Patocka
 *
 * This file is part of Ajla.
 *
 * Ajla is free software: you can redistribute it and/or modify it under the
 * terms of the GNU General Public License as published by the Free Software
 * Foundation, either version 3 of the License, or (at your option) any later
 * version.
 *
 * Ajla is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
 * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * Ajla. If not, see <https://www.gnu.org/licenses/>.
 */
19 #include "ajla.h"
21 #ifndef FILE_OMIT
23 #include "list.h"
24 #include "thread.h"
25 #include "ipret.h"
26 #include "refcount.h"
27 #include "tick.h"
28 #include "iomux.h"
29 #include "timer.h"
30 #include "ipfn.h"
32 #include "task.h"
34 #ifdef HAVE_SIGNAL_H
35 #include <signal.h>
36 #endif
38 shared_var unsigned nr_cpus;
39 shared_var unsigned nr_active_cpus;
40 shared_var uint32_t nr_cpus_override shared_init(0);
42 struct task_percpu {
43 mutex_t waiting_list_mutex;
44 struct list waiting_list;
47 struct thread_pointers {
48 #ifndef THREAD_NONE
49 thread_t thread;
50 #endif
51 struct task_percpu *tpc;
54 shared_var struct thread_pointers *thread_pointers;
56 shared_var refcount_t n_ex_controls;
57 shared_var refcount_t n_programs;
58 shared_var cond_t task_mutex;
59 shared_var unsigned n_deep_sleep;
60 shared_var struct list task_list;
61 shared_var tick_stamp_t task_list_stamp;
62 shared_var thread_volatile sig_atomic_t task_list_nonempty;
64 static tls_decl(struct task_percpu *, task_tls);
66 static void spawn_another_cpu(void);
68 static bool task_is_useless(struct execution_control *ex)
70 struct thunk *thunk = ex->thunk;
71 if (refcount_is_one(&n_programs))
72 goto ret_true;
73 if (unlikely(!thunk))
74 return false;
75 if (likely(thunk_tag_volatile(thunk) != THUNK_TAG_BLACKHOLE_DEREFERENCED))
76 return false;
77 address_lock(thunk, DEPTH_THUNK);
78 if (unlikely(thunk_tag(thunk) != THUNK_TAG_BLACKHOLE_DEREFERENCED)) {
79 address_unlock(thunk, DEPTH_THUNK);
80 return false;
82 address_unlock(thunk, DEPTH_THUNK);
83 ret_true:
84 if (unlikely(ex->atomic != 0)) {
85 ex->atomic_interrupted = true;
86 return false;
88 return true;
91 static bool task_useless(struct execution_control *ex)
93 if (unlikely(task_is_useless(ex))) {
94 pointer_t ptr = pointer_thunk(thunk_alloc_exception_error(error_ajla(EC_ASYNC, AJLA_ERROR_NOT_SUPPORTED), NULL, NULL, NULL pass_file_line));
95 execution_control_terminate(ex, ptr);
96 pointer_dereference(ptr);
97 return true;
99 return false;
#if 0
/*
 * Disabled debugging aid: formats the address of every execution control on
 * task_list as a comma-separated hexadecimal list.  As written, the string
 * is built and then freed without being output anywhere.
 */
void task_list_print(void)
{
	char *text;
	size_t len;
	struct list *pos;

	str_init(&text, &len);
	list_for_each(pos, &task_list) {
		struct execution_control *ex = get_struct(pos, struct execution_control, wait[0].wait_entry);

		/* Separator before every element but the first. */
		if (len)
			str_add_string(&text, &len, ", ");
		str_add_unsigned(&text, &len, ptr_to_num(ex), 16);
	}
	str_finish(&text, &len);
	mem_free(text);
}
#endif
119 void attr_fastcall task_submit(struct execution_control *ex, bool can_allocate_memory)
121 ajla_assert(ex == frame_execution_control(ex->current_frame), (file_line, "task_submit: submitting task with improper execution control: %p != %p", ex, frame_execution_control(ex->current_frame)));
123 cond_lock(&task_mutex);
124 if (!task_list_nonempty) {
125 task_list_stamp = tick_stamp;
126 } else {
127 if (tick_stamp - task_list_stamp >= 2 && likely(can_allocate_memory)) {
128 spawn_another_cpu();
129 task_list_stamp = tick_stamp;
132 list_add(&task_list, &ex->wait[0].wait_entry);
133 task_list_nonempty = 1;
134 cond_unlock_signal(&task_mutex);
137 static struct execution_control *task_list_pop(void)
139 struct execution_control *ex;
140 if (!task_list_nonempty)
141 return NULL;
142 ex = get_struct(task_list.prev, struct execution_control, wait[0].wait_entry);
143 list_del(&ex->wait[0].wait_entry);
144 task_list_nonempty = !list_is_empty(&task_list);
145 return ex;
148 void * attr_fastcall task_schedule(struct execution_control *old_ex)
150 struct execution_control *new_ex;
152 if (unlikely(task_useless(old_ex)))
153 return POINTER_FOLLOW_THUNK_EXIT;
155 #ifndef THREAD_SANITIZER
156 if (!task_list_nonempty)
157 goto no_sched;
158 #endif
160 cond_lock(&task_mutex);
161 new_ex = task_list_pop();
162 if (unlikely(!new_ex))
163 goto unlock_no_sched;
165 list_add(&task_list, &old_ex[0].wait->wait_entry);
167 ajla_assert(new_ex != old_ex, (file_line, "task_schedule: submitting already submitted task"));
169 task_list_nonempty = 1;
170 cond_unlock(&task_mutex);
172 if (unlikely(task_useless(new_ex)))
173 return POINTER_FOLLOW_THUNK_EXIT;
175 return new_ex;
177 unlock_no_sched:
178 cond_unlock(&task_mutex);
179 #ifndef THREAD_SANITIZER
180 no_sched:
181 #endif
182 return old_ex;
185 void waiting_list_add(struct execution_control *ex)
187 struct task_percpu *tpc = tls_get(struct task_percpu *, task_tls);
189 mutex_lock(&tpc->waiting_list_mutex);
190 list_add(&tpc->waiting_list, &ex->waiting_list_entry);
191 ex->waiting_list_head = tpc;
192 mutex_unlock(&tpc->waiting_list_mutex);
195 void waiting_list_remove(struct execution_control *ex)
197 struct task_percpu *tpc = ex->waiting_list_head;
199 mutex_lock(&tpc->waiting_list_mutex);
200 list_del(&ex->waiting_list_entry);
201 mutex_unlock(&tpc->waiting_list_mutex);
203 #ifdef DEBUG
204 ex->waiting_list_head = NULL;
205 #endif
208 bool waiting_list_break(void)
210 struct task_percpu *tpc = tls_get(struct task_percpu *, task_tls);
211 struct list *l;
212 bool ret;
214 #ifdef THREAD_NONE
215 timer_check_all();
216 os_proc_check_all();
217 os_signal_check_all();
218 iomux_check_all(0);
219 #endif
221 again:
222 mutex_lock(&tpc->waiting_list_mutex);
224 list_for_each_back(l, &tpc->waiting_list) {
225 struct execution_control *ex = get_struct(l, struct execution_control, waiting_list_entry);
226 if (unlikely(task_is_useless(ex))) {
227 if (execution_control_acquire(ex)) {
228 mutex_unlock(&tpc->waiting_list_mutex);
229 execution_control_unlink_and_submit(ex, true);
230 goto again;
233 if (!ipret_break_waiting_chain(ex->current_frame, ex->current_ip)) {
234 l = l->next;
235 list_del(&ex->waiting_list_entry);
236 list_init(&ex->waiting_list_entry);
240 ret = !list_is_empty(&tpc->waiting_list);
241 mutex_unlock(&tpc->waiting_list_mutex);
243 return ret;
246 static void task_worker_core(void)
248 if (unlikely(profiling))
249 profile_unblock();
250 cond_lock(&task_mutex);
251 while (likely(!refcount_is_one(&n_ex_controls))) {
252 struct execution_control *ex;
253 if (!(ex = task_list_pop())) {
254 bool more;
255 cond_unlock(&task_mutex);
256 more = waiting_list_break();
257 cond_lock(&task_mutex);
258 if (!(ex = task_list_pop())) {
259 if (likely(refcount_is_one(&n_ex_controls))) {
260 break;
262 #ifndef THREAD_NONE
263 if (!more) {
264 if (++n_deep_sleep == nr_active_cpus) {
265 tick_suspend();
267 cond_wait(&task_mutex);
268 if (n_deep_sleep-- == nr_active_cpus) {
269 tick_resume();
271 } else {
272 cond_wait_us(&task_mutex, tick_us);
274 #else
275 cond_unlock(&task_mutex);
276 if (!more)
277 tick_suspend();
278 iomux_check_all(more ? tick_us : IOMUX_INDEFINITE_WAIT);
279 if (!more)
280 tick_resume();
281 cond_lock(&task_mutex);
282 #endif
283 if (unlikely(profiling))
284 profile_unblock();
285 continue;
288 cond_unlock(&task_mutex);
289 if (likely(!task_useless(ex)))
290 run(ex->current_frame, ex->current_ip);
291 cond_lock(&task_mutex);
293 cond_unlock(&task_mutex);
296 static void set_per_thread_data(struct thread_pointers *tp)
298 struct task_percpu *tpc;
299 thread_set_id((int)(tp - thread_pointers));
300 tpc = tp->tpc;
301 tls_set(struct task_percpu *, task_tls, tpc);
304 #ifndef THREAD_NONE
305 thread_function_decl(task_worker,
306 set_per_thread_data(arg);
307 task_worker_core();
309 #endif
311 static void spawn_another_cpu(void)
313 #ifndef THREAD_NONE
314 if (nr_active_cpus < nr_cpus) {
315 ajla_error_t err;
316 /*debug("spawning cpu %d", nr_active_cpus);*/
317 if (unlikely(!thread_spawn(&thread_pointers[nr_active_cpus].thread, task_worker, &thread_pointers[nr_active_cpus], PRIORITY_COMPUTE, &err)))
318 return;
319 nr_active_cpus++;
321 #endif
324 void name(task_run)(void)
326 #ifndef THREAD_NONE
327 unsigned i;
328 #endif
329 nr_active_cpus = 1;
330 set_per_thread_data(&thread_pointers[0]);
331 #if 0
332 cond_lock(&task_mutex);
333 while (nr_active_cpus < nr_cpus)
334 spawn_another_cpu();
335 cond_unlock(&task_mutex);
336 #endif
337 task_worker_core();
338 #ifndef THREAD_NONE
339 cond_lock(&task_mutex);
340 for (i = 1; i < nr_active_cpus; i++) {
341 cond_unlock(&task_mutex);
342 thread_join(&thread_pointers[i].thread);
343 cond_lock(&task_mutex);
345 cond_unlock(&task_mutex);
346 #endif
349 void task_ex_control_started(void)
351 refcount_inc(&n_ex_controls);
354 void task_ex_control_exited(void)
356 ajla_assert_lo(!refcount_is_one(&n_ex_controls), (file_line, "task_ex_control_exit: n_ex_controls underflow"));
357 refcount_add(&n_ex_controls, -1);
358 if (unlikely(refcount_is_one(&n_ex_controls))) {
359 cond_lock(&task_mutex);
360 cond_unlock_broadcast(&task_mutex);
364 void task_program_started(void)
366 refcount_inc(&n_programs);
369 void task_program_exited(void)
371 ajla_assert_lo(!refcount_is_one(&n_programs), (file_line, "task_program_exited: n_programs underflow"));
372 refcount_add(&n_programs, -1);
376 void name(task_init)(void)
378 unsigned i;
380 refcount_init(&n_ex_controls);
381 refcount_init(&n_programs);
382 n_deep_sleep = 0;
383 cond_init(&task_mutex);
384 list_init(&task_list);
385 task_list_nonempty = 0;
387 nr_cpus = thread_concurrency();
388 #ifndef THREAD_NONE
389 if (nr_cpus_override)
390 nr_cpus = nr_cpus_override;
391 #endif
392 #ifdef DEBUG_INFO
393 debug("concurrency: %u", nr_cpus);
394 #endif
396 thread_pointers = mem_alloc_array_mayfail(mem_calloc_mayfail, struct thread_pointers *, 0, 0, nr_cpus, sizeof(struct thread_pointers), NULL);
397 for (i = 0; i < nr_cpus; i++) {
398 struct task_percpu *tpc;
399 tpc = thread_pointers[i].tpc = mem_alloc(struct task_percpu *, sizeof(struct task_percpu));
400 list_init(&tpc->waiting_list);
401 mutex_init(&tpc->waiting_list_mutex);
403 tls_init(struct task_percpu *, task_tls);
406 void name(task_done)(void)
408 unsigned i;
410 ajla_assert_lo(refcount_is_one(&n_programs), (file_line, "task_done: programs leaked: %"PRIuMAX"", (uintmax_t)refcount_get_nonatomic(&n_programs)));
412 tls_done(struct task_percpu *, task_tls);
413 for (i = 0; i < nr_cpus; i++) {
414 struct task_percpu *tpc = thread_pointers[i].tpc;
415 mutex_done(&tpc->waiting_list_mutex);
416 mem_free(tpc);
418 mem_free(thread_pointers);
420 cond_done(&task_mutex);
423 #endif