Sys.Signals module for a Variant type of signals (and a set_signal function that...
[ocaml.git] / otherlibs / threads / scheduler.c
blobc73ac67e81a002a94dc99f3d53b7fe2efc16668f
1 /***********************************************************************/
2 /* */
3 /* Objective Caml */
4 /* */
5 /* Xavier Leroy, projet Cristal, INRIA Rocquencourt */
6 /* */
7 /* Copyright 1996 Institut National de Recherche en Informatique et */
8 /* en Automatique. All rights reserved. This file is distributed */
9 /* under the terms of the GNU Library General Public License, with */
10 /* the special exception on linking described in file ../../LICENSE. */
11 /* */
12 /***********************************************************************/
14 /* $Id$ */
16 /* The thread scheduler */
18 #include <string.h>
19 #include <stdlib.h>
20 #include <stdio.h>
22 #include "alloc.h"
23 #include "backtrace.h"
24 #include "callback.h"
25 #include "config.h"
26 #include "fail.h"
27 #include "io.h"
28 #include "memory.h"
29 #include "misc.h"
30 #include "mlvalues.h"
31 #include "printexc.h"
32 #include "roots.h"
33 #include "signals.h"
34 #include "stacks.h"
35 #include "sys.h"
37 #if ! (defined(HAS_SELECT) && \
38 defined(HAS_SETITIMER) && \
39 defined(HAS_GETTIMEOFDAY) && \
40 (defined(HAS_WAITPID) || defined(HAS_WAIT4)))
41 #include "Cannot compile libthreads, system calls missing"
42 #endif
44 #include <errno.h>
45 #include <sys/time.h>
46 #include <sys/types.h>
47 #include <sys/wait.h>
48 #include <sys/stat.h>
49 #include <fcntl.h>
50 #ifdef HAS_UNISTD
51 #include <unistd.h>
52 #endif
53 #ifdef HAS_SYS_SELECT_H
54 #include <sys/select.h>
55 #endif
57 #ifndef HAS_WAITPID
58 #define waitpid(pid,status,opts) wait4(pid,status,opts,NULL)
59 #endif
61 #ifndef O_NONBLOCK
62 #define O_NONBLOCK O_NDELAY
63 #endif
65 /* Configuration */
67 /* Initial size of stack when a thread is created (4kB) */
68 #define Thread_stack_size (Stack_size / 4)
70 /* Max computation time before rescheduling, in microseconds (50ms) */
71 #define Thread_timeout 50000
73 /* The thread descriptors */
75 struct caml_thread_struct {
76 value ident; /* Unique id (for equality comparisons) */
77 struct caml_thread_struct * next; /* Double linking of threads */
78 struct caml_thread_struct * prev;
79 value * stack_low; /* The execution stack for this thread */
80 value * stack_high;
81 value * stack_threshold;
82 value * sp;
83 value * trapsp;
84 value backtrace_pos; /* The backtrace info for this thread */
85 code_t * backtrace_buffer;
86 value backtrace_last_exn;
87 value status; /* RUNNABLE, KILLED. etc (see below) */
88 value fd; /* File descriptor on which we're doing read or write */
89 value readfds, writefds, exceptfds;
90 /* Lists of file descriptors on which we're doing select() */
91 value delay; /* Time until which this thread is blocked */
92 value joining; /* Thread we're trying to join */
93 value waitpid; /* PID of process we're waiting for */
94 value retval; /* Value to return when thread resumes */
97 typedef struct caml_thread_struct * caml_thread_t;
99 #define RUNNABLE Val_int(0)
100 #define KILLED Val_int(1)
101 #define SUSPENDED Val_int(2)
102 #define BLOCKED_READ Val_int(4)
103 #define BLOCKED_WRITE Val_int(8)
104 #define BLOCKED_SELECT Val_int(16)
105 #define BLOCKED_DELAY Val_int(32)
106 #define BLOCKED_JOIN Val_int(64)
107 #define BLOCKED_WAIT Val_int(128)
109 #define RESUMED_WAKEUP Val_int(0)
110 #define RESUMED_DELAY Val_int(1)
111 #define RESUMED_JOIN Val_int(2)
112 #define RESUMED_IO Val_int(3)
114 #define TAG_RESUMED_SELECT 0
115 #define TAG_RESUMED_WAIT 1
117 #define NO_FDS Val_unit
118 #define NO_DELAY Val_unit
119 #define NO_JOINING Val_unit
120 #define NO_WAITPID Val_int(0)
122 #define DELAY_INFTY 1E30 /* +infty, for this purpose */
124 /* The thread currently active */
125 static caml_thread_t curr_thread = NULL;
126 /* Identifier for next thread creation */
127 static value next_ident = Val_int(0);
129 #define Assign(dst,src) modify((value *)&(dst), (value)(src))
131 /* Scan the stacks of the other threads */
133 static void (*prev_scan_roots_hook) (scanning_action);
135 static void thread_scan_roots(scanning_action action)
137 caml_thread_t th, start;
139 /* Scan all active descriptors */
140 start = curr_thread;
141 (*action)((value) curr_thread, (value *) &curr_thread);
142 /* Don't scan curr_thread->sp, this has already been done.
143 Don't scan local roots either, for the same reason. */
144 for (th = start->next; th != start; th = th->next) {
145 do_local_roots(action, th->sp, th->stack_high, NULL);
147 /* Hook */
148 if (prev_scan_roots_hook != NULL) (*prev_scan_roots_hook)(action);
151 /* Forward declarations for async I/O handling */
153 static int stdin_initial_status, stdout_initial_status, stderr_initial_status;
154 static void thread_restore_std_descr(void);
156 /* Initialize the thread machinery */
158 value thread_initialize(value unit) /* ML */
160 /* Protect against repeated initialization (PR#1325) */
161 if (curr_thread != NULL) return Val_unit;
162 /* Create a descriptor for the current thread */
163 curr_thread =
164 (caml_thread_t) alloc_shr(sizeof(struct caml_thread_struct)
165 / sizeof(value), 0);
166 curr_thread->ident = next_ident;
167 next_ident = Val_int(Int_val(next_ident) + 1);
168 curr_thread->next = curr_thread;
169 curr_thread->prev = curr_thread;
170 curr_thread->stack_low = stack_low;
171 curr_thread->stack_high = stack_high;
172 curr_thread->stack_threshold = stack_threshold;
173 curr_thread->sp = extern_sp;
174 curr_thread->trapsp = trapsp;
175 curr_thread->backtrace_pos = Val_int(backtrace_pos);
176 curr_thread->backtrace_buffer = backtrace_buffer;
177 caml_initialize (&curr_thread->backtrace_last_exn, backtrace_last_exn);
178 curr_thread->status = RUNNABLE;
179 curr_thread->fd = Val_int(0);
180 curr_thread->readfds = NO_FDS;
181 curr_thread->writefds = NO_FDS;
182 curr_thread->exceptfds = NO_FDS;
183 curr_thread->delay = NO_DELAY;
184 curr_thread->joining = NO_JOINING;
185 curr_thread->waitpid = NO_WAITPID;
186 curr_thread->retval = Val_unit;
187 /* Initialize GC */
188 prev_scan_roots_hook = scan_roots_hook;
189 scan_roots_hook = thread_scan_roots;
190 /* Set standard file descriptors to non-blocking mode */
191 stdin_initial_status = fcntl(0, F_GETFL);
192 stdout_initial_status = fcntl(1, F_GETFL);
193 stderr_initial_status = fcntl(2, F_GETFL);
194 if (stdin_initial_status != -1)
195 fcntl(0, F_SETFL, stdin_initial_status | O_NONBLOCK);
196 if (stdout_initial_status != -1)
197 fcntl(1, F_SETFL, stdout_initial_status | O_NONBLOCK);
198 if (stderr_initial_status != -1)
199 fcntl(2, F_SETFL, stderr_initial_status | O_NONBLOCK);
200 /* Register an at-exit function to restore the standard file descriptors */
201 atexit(thread_restore_std_descr);
202 return Val_unit;
205 /* Initialize the interval timer used for preemption */
207 value thread_initialize_preemption(value unit) /* ML */
209 struct itimerval timer;
211 timer.it_interval.tv_sec = 0;
212 timer.it_interval.tv_usec = Thread_timeout;
213 timer.it_value = timer.it_interval;
214 setitimer(ITIMER_VIRTUAL, &timer, NULL);
215 return Val_unit;
218 /* Create a thread */
220 value thread_new(value clos) /* ML */
222 caml_thread_t th;
223 /* Allocate the thread and its stack */
224 Begin_root(clos);
225 th = (caml_thread_t) alloc_shr(sizeof(struct caml_thread_struct)
226 / sizeof(value), 0);
227 End_roots();
228 th->ident = next_ident;
229 next_ident = Val_int(Int_val(next_ident) + 1);
230 th->stack_low = (value *) stat_alloc(Thread_stack_size);
231 th->stack_high = th->stack_low + Thread_stack_size / sizeof(value);
232 th->stack_threshold = th->stack_low + Stack_threshold / sizeof(value);
233 th->sp = th->stack_high;
234 th->trapsp = th->stack_high;
235 /* Set up a return frame that pretends we're applying the function to ().
236 This way, the next RETURN instruction will run the function. */
237 th->sp -= 5;
238 th->sp[0] = Val_unit; /* dummy local to be popped by RETURN 1 */
239 th->sp[1] = (value) Code_val(clos);
240 th->sp[2] = clos;
241 th->sp[3] = Val_long(0); /* no extra args */
242 th->sp[4] = Val_unit; /* the () argument */
243 /* Fake a C call frame */
244 th->sp--;
245 th->sp[0] = Val_unit; /* a dummy environment */
246 /* Finish initialization of th */
247 th->backtrace_pos = Val_int(0);
248 th->backtrace_buffer = NULL;
249 th->backtrace_last_exn = Val_unit;
250 /* The thread is initially runnable */
251 th->status = RUNNABLE;
252 th->fd = Val_int(0);
253 th->readfds = NO_FDS;
254 th->writefds = NO_FDS;
255 th->exceptfds = NO_FDS;
256 th->delay = NO_DELAY;
257 th->joining = NO_JOINING;
258 th->waitpid = NO_WAITPID;
259 th->retval = Val_unit;
260 /* Insert thread in doubly linked list of threads */
261 th->prev = curr_thread->prev;
262 th->next = curr_thread;
263 Assign(curr_thread->prev->next, th);
264 Assign(curr_thread->prev, th);
265 /* Return thread */
266 return (value) th;
269 /* Return the thread identifier */
271 value thread_id(value th) /* ML */
273 return ((caml_thread_t)th)->ident;
276 /* Return the current time as a floating-point number */
278 static double timeofday(void)
280 struct timeval tv;
281 gettimeofday(&tv, NULL);
282 return (double) tv.tv_sec + (double) tv.tv_usec * 1e-6;
285 /* Find a runnable thread and activate it */
287 #define FOREACH_THREAD(x) x = curr_thread; do { x = x->next;
288 #define END_FOREACH(x) } while (x != curr_thread)
290 static value alloc_process_status(int pid, int status);
291 static void add_fdlist_to_set(value fdl, fd_set *set);
292 static value inter_fdlist_set(value fdl, fd_set *set, int *count);
293 static void find_bad_fd(int fd, fd_set *set);
294 static void find_bad_fds(value fdl, fd_set *set);
296 static value schedule_thread(void)
298 caml_thread_t run_thread, th;
299 fd_set readfds, writefds, exceptfds;
300 double delay, now;
301 int need_select, need_wait;
303 /* Don't allow preemption during a callback */
304 if (callback_depth > 1) return curr_thread->retval;
306 /* Save the status of the current thread */
307 curr_thread->stack_low = stack_low;
308 curr_thread->stack_high = stack_high;
309 curr_thread->stack_threshold = stack_threshold;
310 curr_thread->sp = extern_sp;
311 curr_thread->trapsp = trapsp;
312 curr_thread->backtrace_pos = Val_int(backtrace_pos);
313 curr_thread->backtrace_buffer = backtrace_buffer;
314 caml_modify (&curr_thread->backtrace_last_exn, backtrace_last_exn);
316 try_again:
317 /* Find if a thread is runnable.
318 Build fdsets and delay for select.
319 See if some join or wait operations succeeded. */
320 run_thread = NULL;
321 FD_ZERO(&readfds);
322 FD_ZERO(&writefds);
323 FD_ZERO(&exceptfds);
324 delay = DELAY_INFTY;
325 now = -1.0;
326 need_select = 0;
327 need_wait = 0;
329 FOREACH_THREAD(th)
330 if (th->status <= SUSPENDED) continue;
332 if (th->status & (BLOCKED_READ - 1)) {
333 FD_SET(Int_val(th->fd), &readfds);
334 need_select = 1;
336 if (th->status & (BLOCKED_WRITE - 1)) {
337 FD_SET(Int_val(th->fd), &writefds);
338 need_select = 1;
340 if (th->status & (BLOCKED_SELECT - 1)) {
341 add_fdlist_to_set(th->readfds, &readfds);
342 add_fdlist_to_set(th->writefds, &writefds);
343 add_fdlist_to_set(th->exceptfds, &exceptfds);
344 need_select = 1;
346 if (th->status & (BLOCKED_DELAY - 1)) {
347 double th_delay;
348 if (now < 0.0) now = timeofday();
349 th_delay = Double_val(th->delay) - now;
350 if (th_delay <= 0) {
351 th->status = RUNNABLE;
352 Assign(th->retval,RESUMED_DELAY);
353 } else {
354 if (th_delay < delay) delay = th_delay;
357 if (th->status & (BLOCKED_JOIN - 1)) {
358 if (((caml_thread_t)(th->joining))->status == KILLED) {
359 th->status = RUNNABLE;
360 Assign(th->retval, RESUMED_JOIN);
363 if (th->status & (BLOCKED_WAIT - 1)) {
364 int status, pid;
365 pid = waitpid(Int_val(th->waitpid), &status, WNOHANG);
366 if (pid > 0) {
367 th->status = RUNNABLE;
368 Assign(th->retval, alloc_process_status(pid, status));
369 } else {
370 need_wait = 1;
373 END_FOREACH(th);
375 /* Find if a thread is runnable. */
376 run_thread = NULL;
377 FOREACH_THREAD(th)
378 if (th->status == RUNNABLE) { run_thread = th; break; }
379 END_FOREACH(th);
381 /* Do the select if needed */
382 if (need_select || run_thread == NULL) {
383 struct timeval delay_tv, * delay_ptr;
384 int retcode;
385 /* If a thread is blocked on wait, don't block forever */
386 if (need_wait && delay > Thread_timeout * 1e-6) {
387 delay = Thread_timeout * 1e-6;
389 /* Convert delay to a timeval */
390 /* If a thread is runnable, just poll */
391 if (run_thread != NULL) {
392 delay_tv.tv_sec = 0;
393 delay_tv.tv_usec = 0;
394 delay_ptr = &delay_tv;
396 else if (delay != DELAY_INFTY) {
397 delay_tv.tv_sec = (unsigned int) delay;
398 delay_tv.tv_usec = (delay - (double) delay_tv.tv_sec) * 1E6;
399 delay_ptr = &delay_tv;
401 else {
402 delay_ptr = NULL;
404 enter_blocking_section();
405 retcode = select(FD_SETSIZE, &readfds, &writefds, &exceptfds, delay_ptr);
406 leave_blocking_section();
407 if (retcode == -1)
408 switch (errno) {
409 case EINTR:
410 break;
411 case EBADF:
412 /* One of the descriptors in the sets was closed or is bad.
413 Find it using fstat() and wake up the threads waiting on it
414 so that they'll get an error when operating on it. */
415 FOREACH_THREAD(th)
416 if (th->status & (BLOCKED_READ - 1)) {
417 find_bad_fd(Int_val(th->fd), &readfds);
419 if (th->status & (BLOCKED_WRITE - 1)) {
420 find_bad_fd(Int_val(th->fd), &writefds);
422 if (th->status & (BLOCKED_SELECT - 1)) {
423 find_bad_fds(th->readfds, &readfds);
424 find_bad_fds(th->writefds, &writefds);
425 find_bad_fds(th->exceptfds, &exceptfds);
427 END_FOREACH(th);
428 retcode = FD_SETSIZE;
429 break;
430 default:
431 sys_error(NO_ARG);
433 if (retcode > 0) {
434 /* Some descriptors are ready.
435 Mark the corresponding threads runnable. */
436 FOREACH_THREAD(th)
437 if (retcode <= 0) break;
438 if ((th->status & (BLOCKED_READ - 1)) &&
439 FD_ISSET(Int_val(th->fd), &readfds)) {
440 Assign(th->retval, RESUMED_IO);
441 th->status = RUNNABLE;
442 if (run_thread == NULL) run_thread = th; /* Found one. */
443 /* Wake up only one thread per fd */
444 FD_CLR(Int_val(th->fd), &readfds);
445 retcode--;
447 if ((th->status & (BLOCKED_WRITE - 1)) &&
448 FD_ISSET(Int_val(th->fd), &writefds)) {
449 Assign(th->retval, RESUMED_IO);
450 th->status = RUNNABLE;
451 if (run_thread == NULL) run_thread = th; /* Found one. */
452 /* Wake up only one thread per fd */
453 FD_CLR(Int_val(th->fd), &readfds);
454 retcode--;
456 if (th->status & (BLOCKED_SELECT - 1)) {
457 value r = Val_unit, w = Val_unit, e = Val_unit;
458 Begin_roots3(r,w,e)
459 r = inter_fdlist_set(th->readfds, &readfds, &retcode);
460 w = inter_fdlist_set(th->writefds, &writefds, &retcode);
461 e = inter_fdlist_set(th->exceptfds, &exceptfds, &retcode);
462 if (r != NO_FDS || w != NO_FDS || e != NO_FDS) {
463 value retval = alloc_small(3, TAG_RESUMED_SELECT);
464 Field(retval, 0) = r;
465 Field(retval, 1) = w;
466 Field(retval, 2) = e;
467 Assign(th->retval, retval);
468 th->status = RUNNABLE;
469 if (run_thread == NULL) run_thread = th; /* Found one. */
471 End_roots();
473 END_FOREACH(th);
475 /* If we get here with run_thread still NULL, one of the following
476 may have happened:
477 - a delay has expired
478 - a wait() needs to be polled again
479 - the select() failed (e.g. was interrupted)
480 In these cases, we go through the loop once more to make the
481 corresponding threads runnable. */
482 if (run_thread == NULL &&
483 (delay != DELAY_INFTY || need_wait || retcode == -1))
484 goto try_again;
487 /* If we haven't something to run at that point, we're in big trouble. */
488 if (run_thread == NULL) invalid_argument("Thread: deadlock");
490 /* Free everything the thread was waiting on */
491 Assign(run_thread->readfds, NO_FDS);
492 Assign(run_thread->writefds, NO_FDS);
493 Assign(run_thread->exceptfds, NO_FDS);
494 Assign(run_thread->delay, NO_DELAY);
495 Assign(run_thread->joining, NO_JOINING);
496 run_thread->waitpid = NO_WAITPID;
498 /* Activate the thread */
499 curr_thread = run_thread;
500 stack_low = curr_thread->stack_low;
501 stack_high = curr_thread->stack_high;
502 stack_threshold = curr_thread->stack_threshold;
503 extern_sp = curr_thread->sp;
504 trapsp = curr_thread->trapsp;
505 backtrace_pos = Int_val(curr_thread->backtrace_pos);
506 backtrace_buffer = curr_thread->backtrace_buffer;
507 backtrace_last_exn = curr_thread->backtrace_last_exn;
508 return curr_thread->retval;
511 /* Since context switching is not allowed in callbacks, a thread that
512 blocks during a callback is a deadlock. */
514 static void check_callback(void)
516 if (callback_depth > 1)
517 caml_fatal_error("Thread: deadlock during callback");
520 /* Reschedule without suspending the current thread */
522 value thread_yield(value unit) /* ML */
524 Assert(curr_thread != NULL);
525 Assign(curr_thread->retval, Val_unit);
526 return schedule_thread();
529 /* Honor an asynchronous request for re-scheduling */
531 static void thread_reschedule(void)
533 value accu;
535 Assert(curr_thread != NULL);
536 /* Pop accu from event frame, making it look like a C_CALL frame
537 followed by a RETURN frame */
538 accu = *extern_sp++;
539 /* Reschedule */
540 Assign(curr_thread->retval, accu);
541 accu = schedule_thread();
542 /* Push accu below C_CALL frame so that it looks like an event frame */
543 *--extern_sp = accu;
546 /* Request a re-scheduling as soon as possible */
548 value thread_request_reschedule(value unit) /* ML */
550 async_action_hook = thread_reschedule;
551 something_to_do = 1;
552 return Val_unit;
555 /* Suspend the current thread */
557 value thread_sleep(value unit) /* ML */
559 Assert(curr_thread != NULL);
560 check_callback();
561 curr_thread->status = SUSPENDED;
562 return schedule_thread();
565 /* Suspend the current thread on a read() or write() request */
567 static value thread_wait_rw(int kind, value fd)
569 /* Don't do an error if we're not initialized yet
570 (we can be called from thread-safe Pervasives before initialization),
571 just return immediately. */
572 if (curr_thread == NULL) return RESUMED_WAKEUP;
573 /* As a special case, if we're in a callback, don't fail but block
574 the whole process till I/O is possible */
575 if (callback_depth > 1) {
576 fd_set fds;
577 FD_ZERO(&fds);
578 FD_SET(Int_val(fd), &fds);
579 switch(kind) {
580 case BLOCKED_READ: select(FD_SETSIZE, &fds, NULL, NULL, NULL); break;
581 case BLOCKED_WRITE: select(FD_SETSIZE, NULL, &fds, NULL, NULL); break;
583 return RESUMED_IO;
584 } else {
585 curr_thread->fd = fd;
586 curr_thread->status = kind;
587 return schedule_thread();
591 value thread_wait_read(value fd)
593 return thread_wait_rw(BLOCKED_READ, fd);
596 value thread_wait_write(value fd)
598 return thread_wait_rw(BLOCKED_WRITE, fd);
601 /* Suspend the current thread on a read() or write() request with timeout */
603 static value thread_wait_timed_rw(int kind, value arg)
605 double date;
607 check_callback();
608 curr_thread->fd = Field(arg, 0);
609 date = timeofday() + Double_val(Field(arg, 1));
610 Assign(curr_thread->delay, copy_double(date));
611 curr_thread->status = kind | BLOCKED_DELAY;
612 return schedule_thread();
615 value thread_wait_timed_read(value arg)
617 return thread_wait_timed_rw(BLOCKED_READ, arg);
620 value thread_wait_timed_write(value arg)
622 return thread_wait_timed_rw(BLOCKED_WRITE, arg);
625 /* Suspend the current thread on a select() request */
627 value thread_select(value arg) /* ML */
629 double date;
630 check_callback();
631 Assign(curr_thread->readfds, Field(arg, 0));
632 Assign(curr_thread->writefds, Field(arg, 1));
633 Assign(curr_thread->exceptfds, Field(arg, 2));
634 date = Double_val(Field(arg, 3));
635 if (date >= 0.0) {
636 date += timeofday();
637 Assign(curr_thread->delay, copy_double(date));
638 curr_thread->status = BLOCKED_SELECT | BLOCKED_DELAY;
639 } else {
640 curr_thread->status = BLOCKED_SELECT;
642 return schedule_thread();
645 /* Primitives to implement suspension on buffered channels */
647 value thread_inchan_ready(value vchan) /* ML */
649 struct channel * chan = Channel(vchan);
650 return Val_bool(chan->curr < chan->max);
653 value thread_outchan_ready(value vchan, value vsize) /* ML */
655 struct channel * chan = Channel(vchan);
656 intnat size = Long_val(vsize);
657 /* Negative size means we want to flush the buffer entirely */
658 if (size < 0) {
659 return Val_bool(chan->curr == chan->buff);
660 } else {
661 int free = chan->end - chan->curr;
662 if (chan->curr == chan->buff)
663 return Val_bool(size < free);
664 else
665 return Val_bool(size <= free);
669 /* Suspend the current thread for some time */
671 value thread_delay(value time) /* ML */
673 double date = timeofday() + Double_val(time);
674 Assert(curr_thread != NULL);
675 check_callback();
676 curr_thread->status = BLOCKED_DELAY;
677 Assign(curr_thread->delay, copy_double(date));
678 return schedule_thread();
681 /* Suspend the current thread until another thread terminates */
683 value thread_join(value th) /* ML */
685 check_callback();
686 Assert(curr_thread != NULL);
687 if (((caml_thread_t)th)->status == KILLED) return Val_unit;
688 curr_thread->status = BLOCKED_JOIN;
689 Assign(curr_thread->joining, th);
690 return schedule_thread();
693 /* Suspend the current thread until a Unix process exits */
695 value thread_wait_pid(value pid) /* ML */
697 Assert(curr_thread != NULL);
698 check_callback();
699 curr_thread->status = BLOCKED_WAIT;
700 curr_thread->waitpid = pid;
701 return schedule_thread();
704 /* Reactivate another thread */
706 value thread_wakeup(value thread) /* ML */
708 caml_thread_t th = (caml_thread_t) thread;
709 switch (th->status) {
710 case SUSPENDED:
711 th->status = RUNNABLE;
712 Assign(th->retval, RESUMED_WAKEUP);
713 break;
714 case KILLED:
715 failwith("Thread.wakeup: killed thread");
716 default:
717 failwith("Thread.wakeup: thread not suspended");
719 return Val_unit;
722 /* Return the current thread */
724 value thread_self(value unit) /* ML */
726 Assert(curr_thread != NULL);
727 return (value) curr_thread;
730 /* Kill a thread */
732 value thread_kill(value thread) /* ML */
734 value retval = Val_unit;
735 caml_thread_t th = (caml_thread_t) thread;
736 if (th->status == KILLED) failwith("Thread.kill: killed thread");
737 /* Don't paint ourselves in a corner */
738 if (th == th->next) failwith("Thread.kill: cannot kill the last thread");
739 /* This thread is no longer waiting on anything */
740 th->status = KILLED;
741 /* If this is the current thread, activate another one */
742 if (th == curr_thread) {
743 Begin_root(thread);
744 retval = schedule_thread();
745 th = (caml_thread_t) thread;
746 End_roots();
748 /* Remove thread from the doubly-linked list */
749 Assign(th->prev->next, th->next);
750 Assign(th->next->prev, th->prev);
751 /* Free its resources */
752 stat_free((char *) th->stack_low);
753 th->stack_low = NULL;
754 th->stack_high = NULL;
755 th->stack_threshold = NULL;
756 th->sp = NULL;
757 th->trapsp = NULL;
758 if (th->backtrace_buffer != NULL) {
759 free(th->backtrace_buffer);
760 th->backtrace_buffer = NULL;
762 return retval;
765 /* Print uncaught exception and backtrace */
767 value thread_uncaught_exception(value exn) /* ML */
769 char * msg = format_caml_exception(exn);
770 fprintf(stderr, "Thread %d killed on uncaught exception %s\n",
771 Int_val(curr_thread->ident), msg);
772 free(msg);
773 if (backtrace_active) print_exception_backtrace();
774 fflush(stderr);
775 return Val_unit;
778 /* Set a list of file descriptors in a fdset */
780 static void add_fdlist_to_set(value fdl, fd_set *set)
782 for (/*nothing*/; fdl != NO_FDS; fdl = Field(fdl, 1)) {
783 int fd = Int_val(Field(fdl, 0));
784 /* Ignore funky file descriptors, which can cause crashes */
785 if (fd >= 0 && fd < FD_SETSIZE) FD_SET(fd, set);
789 /* Build the intersection of a list and a fdset (the list of file descriptors
790 which are both in the list and in the fdset). */
792 static value inter_fdlist_set(value fdl, fd_set *set, int *count)
794 value res = Val_unit;
795 value cons;
797 Begin_roots2(fdl, res);
798 for (res = NO_FDS; fdl != NO_FDS; fdl = Field(fdl, 1)) {
799 int fd = Int_val(Field(fdl, 0));
800 if (FD_ISSET(fd, set)) {
801 cons = alloc_small(2, 0);
802 Field(cons, 0) = Val_int(fd);
803 Field(cons, 1) = res;
804 res = cons;
805 FD_CLR(fd, set); /* wake up only one thread per fd ready */
806 (*count)--;
809 End_roots();
810 return res;
813 /* Find closed file descriptors in a waiting list and set them to 1 in
814 the given fdset */
816 static void find_bad_fd(int fd, fd_set *set)
818 struct stat s;
819 if (fd >= 0 && fd < FD_SETSIZE && fstat(fd, &s) == -1 && errno == EBADF)
820 FD_SET(fd, set);
823 static void find_bad_fds(value fdl, fd_set *set)
825 for (/*nothing*/; fdl != NO_FDS; fdl = Field(fdl, 1))
826 find_bad_fd(Int_val(Field(fdl, 0)), set);
829 /* Auxiliary function for allocating the result of a waitpid() call */
831 #if !(defined(WIFEXITED) && defined(WEXITSTATUS) && defined(WIFSTOPPED) && \
832 defined(WSTOPSIG) && defined(WTERMSIG))
833 /* Assume old-style V7 status word */
834 #define WIFEXITED(status) (((status) & 0xFF) == 0)
835 #define WEXITSTATUS(status) (((status) >> 8) & 0xFF)
836 #define WIFSTOPPED(status) (((status) & 0xFF) == 0xFF)
837 #define WSTOPSIG(status) (((status) >> 8) & 0xFF)
838 #define WTERMSIG(status) ((status) & 0x3F)
839 #endif
841 #define TAG_WEXITED 0
842 #define TAG_WSIGNALED 1
843 #define TAG_WSTOPPED 2
845 static value alloc_process_status(int pid, int status)
847 value st, res;
849 if (WIFEXITED(status)) {
850 st = alloc_small(1, TAG_WEXITED);
851 Field(st, 0) = Val_int(WEXITSTATUS(status));
853 else if (WIFSTOPPED(status)) {
854 st = alloc_small(1, TAG_WSTOPPED);
855 Field(st, 0) = Val_int(WSTOPSIG(status));
857 else {
858 st = alloc_small(1, TAG_WSIGNALED);
859 Field(st, 0) = Val_int(WTERMSIG(status));
861 Begin_root(st);
862 res = alloc_small(2, TAG_RESUMED_WAIT);
863 Field(res, 0) = Val_int(pid);
864 Field(res, 1) = st;
865 End_roots();
866 return res;
869 /* Restore the standard file descriptors to their initial state */
871 static void thread_restore_std_descr(void)
873 if (stdin_initial_status != -1) fcntl(0, F_SETFL, stdin_initial_status);
874 if (stdout_initial_status != -1) fcntl(1, F_SETFL, stdout_initial_status);
875 if (stderr_initial_status != -1) fcntl(2, F_SETFL, stderr_initial_status);