/* included by thread.c */
#include "ccan/list/list.h"

static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
static VALUE rb_eClosedQueueError;

typedef struct rb_mutex_struct {
    rb_fiber_t *fiber;
    struct rb_mutex_struct *next_mutex;
    struct list_head waitq; /* protected by GVL */
} rb_mutex_t;

/* sync_waiter is always on-stack */
struct sync_waiter {
    VALUE self;
    rb_thread_t *th;
    rb_fiber_t *fiber;
    struct list_node node;
};

#define MUTEX_ALLOW_TRAP FL_USER1
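
/* A Mutex with MUTEX_ALLOW_TRAP set may be locked/unlocked from a signal
 * (trap) handler; otherwise do_mutex_lock() raises ThreadError when called
 * from trap context.  See rb_mutex_allow_trap() below. */

/* Wake up to `max` waiters on `head`.  A waiter running under a fiber
 * scheduler (and not blocking) is resumed through the scheduler's unblock
 * hook; any other waiter is woken by interrupting its thread. */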
static void
sync_wakeup(struct list_head *head, long max)
{
    struct sync_waiter *cur = 0, *next;

    list_for_each_safe(head, cur, next, node) {
        list_del_init(&cur->node);

        if (cur->th->status != THREAD_KILLED) {
            if (cur->th->scheduler != Qnil && rb_fiberptr_blocking(cur->fiber) == 0) {
                rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
            }
            else {
                rb_threadptr_interrupt(cur->th);
                cur->th->status = THREAD_RUNNABLE;
            }

            if (--max == 0) return;
        }
    }
}

static void
wakeup_one(struct list_head *head)
{
    sync_wakeup(head, 1);
}

static void
wakeup_all(struct list_head *head)
{
    sync_wakeup(head, LONG_MAX);
}
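
/* Thin wrappers used by the Queue, SizedQueue and ConditionVariable code
 * below: a single waiter is woken per push/pop/signal, and every waiter is
 * woken on close, clear, or broadcast. */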
#if defined(HAVE_WORKING_FORK)
static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
static void rb_mutex_abandon_locking_mutex(rb_thread_t *th);
#endif
static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber);

/*
 *  Document-class: Thread::Mutex
 *
 *  Thread::Mutex implements a simple semaphore that can be used to
 *  coordinate access to shared data from multiple concurrent threads.
 *
 *  Example:
 *
 *    semaphore = Thread::Mutex.new
 *
 *    a = Thread.new {
 *      semaphore.synchronize {
 *        # access shared resource
 *      }
 *    }
 *
 *    b = Thread.new {
 *      semaphore.synchronize {
 *        # access shared resource
 *      }
 *    }
 */
#define mutex_mark ((void(*)(void*))0)

static size_t
rb_mutex_num_waiting(rb_mutex_t *mutex)
{
    struct sync_waiter *w = 0;
    size_t n = 0;

    list_for_each(&mutex->waitq, w, node) {
        n++;
    }

    return n;
}

rb_thread_t* rb_fiber_threadptr(const rb_fiber_t *fiber);

static void
mutex_free(void *ptr)
{
    rb_mutex_t *mutex = ptr;
    if (mutex->fiber) {
        /* rb_warn("free locked mutex"); */
        const char *err = rb_mutex_unlock_th(mutex, rb_fiber_threadptr(mutex->fiber), mutex->fiber);
        if (err) rb_bug("%s", err);
    }
    ruby_xfree(ptr);
}

static size_t
mutex_memsize(const void *ptr)
{
    return sizeof(rb_mutex_t);
}

static const rb_data_type_t mutex_data_type = {
    "mutex",
    {mutex_mark, mutex_free, mutex_memsize,},
    0, 0, RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_FREE_IMMEDIATELY
};

static rb_mutex_t *
mutex_ptr(VALUE obj)
{
    rb_mutex_t *mutex;

    TypedData_Get_Struct(obj, rb_mutex_t, &mutex_data_type, mutex);

    return mutex;
}

VALUE
rb_obj_is_mutex(VALUE obj)
{
    return RBOOL(rb_typeddata_is_kind_of(obj, &mutex_data_type));
}

static VALUE
mutex_alloc(VALUE klass)
{
    VALUE obj;
    rb_mutex_t *mutex;

    obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex);

    list_head_init(&mutex->waitq);
    return obj;
}
/*
 *  call-seq:
 *     Thread::Mutex.new   -> mutex
 *
 *  Creates a new Mutex.
 */
static VALUE
mutex_initialize(VALUE self)
{
    return self;
}

VALUE
rb_mutex_new(void)
{
    return mutex_alloc(rb_cMutex);
}

/*
 * call-seq:
 *    mutex.locked?  -> true or false
 *
 * Returns +true+ if this lock is currently held by some thread.
 */
VALUE
rb_mutex_locked_p(VALUE self)
{
    rb_mutex_t *mutex = mutex_ptr(self);

    return RBOOL(mutex->fiber);
}

static void
thread_mutex_insert(rb_thread_t *thread, rb_mutex_t *mutex)
{
    if (thread->keeping_mutexes) {
        mutex->next_mutex = thread->keeping_mutexes;
    }

    thread->keeping_mutexes = mutex;
}
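
/* Unlink `mutex` from `thread`'s singly linked list of held mutexes by
 * walking a pointer-to-pointer cursor, so removing the list head needs no
 * special case. */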
static void
thread_mutex_remove(rb_thread_t *thread, rb_mutex_t *mutex)
{
    rb_mutex_t **keeping_mutexes = &thread->keeping_mutexes;

    while (*keeping_mutexes && *keeping_mutexes != mutex) {
        // Move to the next mutex in the list:
        keeping_mutexes = &(*keeping_mutexes)->next_mutex;
    }

    if (*keeping_mutexes) {
        *keeping_mutexes = mutex->next_mutex;
        mutex->next_mutex = NULL;
    }
}

static void
mutex_locked(rb_thread_t *th, VALUE self)
{
    rb_mutex_t *mutex = mutex_ptr(self);

    thread_mutex_insert(th, mutex);
}

/*
 * call-seq:
 *    mutex.try_lock  -> true or false
 *
 * Attempts to obtain the lock and returns immediately. Returns +true+ if the
 * lock was granted.
 */
VALUE
rb_mutex_trylock(VALUE self)
{
    rb_mutex_t *mutex = mutex_ptr(self);

    if (mutex->fiber == 0) {
        rb_fiber_t *fiber = GET_EC()->fiber_ptr;
        rb_thread_t *th = GET_THREAD();
        mutex->fiber = fiber;

        mutex_locked(th, self);
        return Qtrue;
    }

    return Qfalse;
}
/*
 * At maximum, only one thread can use cond_timedwait and watch deadlock
 * periodically. Multiple polling threads (i.e. concurrent deadlock checks)
 * introduce new race conditions. [Bug #6278] [ruby-core:44275]
 */
static const rb_thread_t *patrol_thread = NULL;

static VALUE
mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex)
{
    return RBOOL(mutex->fiber == fiber);
}

static VALUE
call_rb_fiber_scheduler_block(VALUE mutex)
{
    return rb_fiber_scheduler_block(rb_fiber_scheduler_current(), mutex, Qnil);
}

static VALUE
delete_from_waitq(VALUE value)
{
    struct sync_waiter *sync_waiter = (void *)value;
    list_del(&sync_waiter->node);

    return Qfalse;
}
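
/* Slow path for Mutex#lock.  A contended waiter parks itself on
 * mutex->waitq and then waits in one of two ways: under a fiber scheduler
 * it blocks through the scheduler's block hook, otherwise it releases the
 * GVL in native_sleep().  At most one "patrol" thread sleeps with a 100ms
 * timeout so that deadlock detection still runs while every other thread
 * is asleep. */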
static VALUE
do_mutex_lock(VALUE self, int interruptible_p)
{
    rb_execution_context_t *ec = GET_EC();
    rb_thread_t *th = ec->thread_ptr;
    rb_fiber_t *fiber = ec->fiber_ptr;
    rb_mutex_t *mutex = mutex_ptr(self);

    /* When running trap handler */
    if (!FL_TEST_RAW(self, MUTEX_ALLOW_TRAP) &&
        th->ec->interrupt_mask & TRAP_INTERRUPT_MASK) {
        rb_raise(rb_eThreadError, "can't be called from trap context");
    }

    if (rb_mutex_trylock(self) == Qfalse) {
        if (mutex->fiber == fiber) {
            rb_raise(rb_eThreadError, "deadlock; recursive locking");
        }

        while (mutex->fiber != fiber) {
            VALUE scheduler = rb_fiber_scheduler_current();
            if (scheduler != Qnil) {
                struct sync_waiter sync_waiter = {
                    .self = self,
                    .th = th,
                    .fiber = fiber
                };

                list_add_tail(&mutex->waitq, &sync_waiter.node);

                rb_ensure(call_rb_fiber_scheduler_block, self, delete_from_waitq, (VALUE)&sync_waiter);

                if (!mutex->fiber) {
                    mutex->fiber = fiber;
                }
            }
            else {
                enum rb_thread_status prev_status = th->status;
                rb_hrtime_t *timeout = 0;
                rb_hrtime_t rel = rb_msec2hrtime(100);

                th->status = THREAD_STOPPED_FOREVER;
                th->locking_mutex = self;
                rb_ractor_sleeper_threads_inc(th->ractor);
                /*
                 * Carefully! while some contended threads are in native_sleep(),
                 * ractor->sleeper is unstable value. we have to avoid both deadlock
                 * and busy loop.
                 */
                if ((rb_ractor_living_thread_num(th->ractor) == rb_ractor_sleeper_thread_num(th->ractor)) &&
                    !patrol_thread) {
                    timeout = &rel;
                    patrol_thread = th;
                }

                struct sync_waiter sync_waiter = {
                    .self = self,
                    .th = th,
                    .fiber = fiber
                };

                list_add_tail(&mutex->waitq, &sync_waiter.node);

                native_sleep(th, timeout); /* release GVL */

                list_del(&sync_waiter.node);

                if (!mutex->fiber) {
                    mutex->fiber = fiber;
                }

                if (patrol_thread == th)
                    patrol_thread = NULL;

                th->locking_mutex = Qfalse;
                if (mutex->fiber && timeout && !RUBY_VM_INTERRUPTED(th->ec)) {
                    rb_check_deadlock(th->ractor);
                }
                if (th->status == THREAD_STOPPED_FOREVER) {
                    th->status = prev_status;
                }
                rb_ractor_sleeper_threads_dec(th->ractor);
            }

            if (interruptible_p) {
                /* release mutex before checking for interrupts...as interrupt checking
                 * code might call rb_raise() */
                if (mutex->fiber == fiber) mutex->fiber = 0;
                RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may release mutex */
                if (!mutex->fiber) {
                    mutex->fiber = fiber;
                }
            }
        }

        if (mutex->fiber == fiber) mutex_locked(th, self);
    }

    if (mutex_owned_p(fiber, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned.");

    return self;
}
static VALUE
mutex_lock_uninterruptible(VALUE self)
{
    return do_mutex_lock(self, 0);
}

/*
 * call-seq:
 *    mutex.lock  -> self
 *
 * Attempts to grab the lock and waits if it isn't available.
 * Raises +ThreadError+ if +mutex+ was locked by the current thread.
 */
VALUE
rb_mutex_lock(VALUE self)
{
    return do_mutex_lock(self, 1);
}

/*
 * call-seq:
 *    mutex.owned?  -> true or false
 *
 * Returns +true+ if this lock is currently held by the current thread.
 */
VALUE
rb_mutex_owned_p(VALUE self)
{
    rb_fiber_t *fiber = GET_EC()->fiber_ptr;
    rb_mutex_t *mutex = mutex_ptr(self);

    return mutex_owned_p(fiber, mutex);
}
static const char *
rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber)
{
    const char *err = NULL;

    if (mutex->fiber == 0) {
        err = "Attempt to unlock a mutex which is not locked";
    }
    else if (mutex->fiber != fiber) {
        err = "Attempt to unlock a mutex which is locked by another thread/fiber";
    }
    else {
        struct sync_waiter *cur = 0, *next;

        mutex->fiber = 0;
        list_for_each_safe(&mutex->waitq, cur, next, node) {
            list_del_init(&cur->node);

            if (cur->th->scheduler != Qnil && rb_fiberptr_blocking(cur->fiber) == 0) {
                rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
                goto found;
            }
            else {
                switch (cur->th->status) {
                  case THREAD_RUNNABLE: /* from someone else calling Thread#run */
                  case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
                    rb_threadptr_interrupt(cur->th);
                    goto found;
                  case THREAD_STOPPED: /* probably impossible */
                    rb_bug("unexpected THREAD_STOPPED");
                  case THREAD_KILLED:
                    /* not sure about this, possible in exit GC? */
                    rb_bug("unexpected THREAD_KILLED");
                  default:
                    rb_bug("unexpected THREAD_STATUS %d", cur->th->status);
                }
            }
        }

      found:
        thread_mutex_remove(th, mutex);
    }

    return err;
}

/*
 * call-seq:
 *    mutex.unlock    -> self
 *
 * Releases the lock.
 * Raises +ThreadError+ if +mutex+ wasn't locked by the current thread.
 */
VALUE
rb_mutex_unlock(VALUE self)
{
    const char *err;
    rb_mutex_t *mutex = mutex_ptr(self);
    rb_thread_t *th = GET_THREAD();

    err = rb_mutex_unlock_th(mutex, th, GET_EC()->fiber_ptr);
    if (err) rb_raise(rb_eThreadError, "%s", err);

    return self;
}
#if defined(HAVE_WORKING_FORK)
static void
rb_mutex_abandon_keeping_mutexes(rb_thread_t *th)
{
    rb_mutex_abandon_all(th->keeping_mutexes);
    th->keeping_mutexes = NULL;
}

static void
rb_mutex_abandon_locking_mutex(rb_thread_t *th)
{
    if (th->locking_mutex) {
        rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);

        list_head_init(&mutex->waitq);
        th->locking_mutex = Qfalse;
    }
}

static void
rb_mutex_abandon_all(rb_mutex_t *mutexes)
{
    rb_mutex_t *mutex;

    while (mutexes) {
        mutex = mutexes;
        mutexes = mutex->next_mutex;
        mutex->fiber = 0;
        mutex->next_mutex = 0;
        list_head_init(&mutex->waitq);
    }
}
#endif
static VALUE
rb_mutex_sleep_forever(VALUE self)
{
    rb_thread_sleep_deadly_allow_spurious_wakeup(self);
    return Qnil;
}

static VALUE
rb_mutex_wait_for(VALUE time)
{
    rb_hrtime_t *rel = (rb_hrtime_t *)time;
    /* permit spurious check */
    return RBOOL(sleep_hrtime(GET_THREAD(), *rel, 0));
}

VALUE
rb_mutex_sleep(VALUE self, VALUE timeout)
{
    struct timeval t;
    VALUE woken = Qtrue;

    if (!NIL_P(timeout)) {
        t = rb_time_interval(timeout);
    }

    rb_mutex_unlock(self);
    time_t beg = time(0);

    VALUE scheduler = rb_fiber_scheduler_current();
    if (scheduler != Qnil) {
        rb_fiber_scheduler_kernel_sleep(scheduler, timeout);
        mutex_lock_uninterruptible(self);
    }
    else {
        if (NIL_P(timeout)) {
            rb_ensure(rb_mutex_sleep_forever, self, mutex_lock_uninterruptible, self);
        }
        else {
            rb_hrtime_t rel = rb_timeval2hrtime(&t);
            woken = rb_ensure(rb_mutex_wait_for, (VALUE)&rel, mutex_lock_uninterruptible, self);
        }
    }

    RUBY_VM_CHECK_INTS_BLOCKING(GET_EC());
    if (!woken) return Qnil;
    time_t end = time(0) - beg;
    return TIMET2NUM(end);
}
/*
 * call-seq:
 *    mutex.sleep(timeout = nil)    -> number or nil
 *
 * Releases the lock and sleeps +timeout+ seconds if it is given and
 * non-nil or forever.  Raises +ThreadError+ if +mutex+ wasn't locked by
 * the current thread.
 *
 * When the thread is next woken up, it will attempt to reacquire
 * the lock.
 *
 * Note that this method can wake up without an explicit Thread#wakeup call;
 * for example, it may be woken by a received signal.
 *
 * Returns the slept time in seconds if woken up, or +nil+ if timed out.
 */
static VALUE
mutex_sleep(int argc, VALUE *argv, VALUE self)
{
    VALUE timeout;

    timeout = rb_check_arity(argc, 0, 1) ? argv[0] : Qnil;
    return rb_mutex_sleep(self, timeout);
}
/*
 * call-seq:
 *    mutex.synchronize { ... }    -> result of the block
 *
 * Obtains a lock, runs the block, and releases the lock when the block
 * completes.  See the example under Thread::Mutex.
 */
VALUE
rb_mutex_synchronize(VALUE mutex, VALUE (*func)(VALUE arg), VALUE arg)
{
    rb_mutex_lock(mutex);
    return rb_ensure(func, arg, rb_mutex_unlock, mutex);
}
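
/* A minimal C-side usage sketch (the callback name and argument are
 * hypothetical): the callback runs with the mutex held, and rb_ensure()
 * guarantees the lock is released even if the callback raises.
 *
 *     static VALUE
 *     update_counter(VALUE arg)
 *     {
 *         // ... mutate state shared with other threads ...
 *         return Qnil;
 *     }
 *
 *     rb_mutex_synchronize(mutex, update_counter, (VALUE)0);
 */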
/*
 * call-seq:
 *    mutex.synchronize { ... }    -> result of the block
 *
 * Obtains a lock, runs the block, and releases the lock when the block
 * completes.  See the example under Thread::Mutex.
 */
static VALUE
rb_mutex_synchronize_m(VALUE self)
{
    if (!rb_block_given_p()) {
        rb_raise(rb_eThreadError, "must be called with a block");
    }

    return rb_mutex_synchronize(self, rb_yield, Qundef);
}

void rb_mutex_allow_trap(VALUE self, int val)
{
    Check_TypedStruct(self, &mutex_data_type);

    if (val)
        FL_SET_RAW(self, MUTEX_ALLOW_TRAP);
    else
        FL_UNSET_RAW(self, MUTEX_ALLOW_TRAP);
}
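
/* Queue */

/* The queue structs below are declared packed, so their embedded list heads
 * may be under-aligned; UNALIGNED_MEMBER_PTR is used instead of a plain `&`
 * whenever their addresses are taken. */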
#define queue_waitq(q) UNALIGNED_MEMBER_PTR(q, waitq)
PACKED_STRUCT_UNALIGNED(struct rb_queue {
    struct list_head waitq;
    rb_serial_t fork_gen;
    const VALUE que;
    int num_waiting;
});

#define szqueue_waitq(sq) UNALIGNED_MEMBER_PTR(sq, q.waitq)
#define szqueue_pushq(sq) UNALIGNED_MEMBER_PTR(sq, pushq)
PACKED_STRUCT_UNALIGNED(struct rb_szqueue {
    struct rb_queue q;
    int num_waiting_push;
    struct list_head pushq;
    long max;
});
static void
queue_mark(void *ptr)
{
    struct rb_queue *q = ptr;

    /* no need to mark threads in waitq, they are on stack */
    rb_gc_mark(q->que);
}

static size_t
queue_memsize(const void *ptr)
{
    return sizeof(struct rb_queue);
}

static const rb_data_type_t queue_data_type = {
    "queue",
    {queue_mark, RUBY_TYPED_DEFAULT_FREE, queue_memsize,},
    0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
};

static VALUE
queue_alloc(VALUE klass)
{
    VALUE obj;
    struct rb_queue *q;

    obj = TypedData_Make_Struct(klass, struct rb_queue, &queue_data_type, q);
    list_head_init(queue_waitq(q));
    return obj;
}

static int
queue_fork_check(struct rb_queue *q)
{
    rb_serial_t fork_gen = GET_VM()->fork_gen;

    if (q->fork_gen == fork_gen) {
        return 0;
    }
    /* forked children can't reach into parent thread stacks */
    q->fork_gen = fork_gen;
    list_head_init(queue_waitq(q));
    q->num_waiting = 0;
    return 1;
}

static struct rb_queue *
queue_ptr(VALUE obj)
{
    struct rb_queue *q;

    TypedData_Get_Struct(obj, struct rb_queue, &queue_data_type, q);
    queue_fork_check(q);

    return q;
}
#define QUEUE_CLOSED FL_USER5

static void
szqueue_mark(void *ptr)
{
    struct rb_szqueue *sq = ptr;

    queue_mark(&sq->q);
}

static size_t
szqueue_memsize(const void *ptr)
{
    return sizeof(struct rb_szqueue);
}

static const rb_data_type_t szqueue_data_type = {
    "sized_queue",
    {szqueue_mark, RUBY_TYPED_DEFAULT_FREE, szqueue_memsize,},
    0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
};

static VALUE
szqueue_alloc(VALUE klass)
{
    struct rb_szqueue *sq;
    VALUE obj = TypedData_Make_Struct(klass, struct rb_szqueue,
                                      &szqueue_data_type, sq);
    list_head_init(szqueue_waitq(sq));
    list_head_init(szqueue_pushq(sq));
    return obj;
}

static struct rb_szqueue *
szqueue_ptr(VALUE obj)
{
    struct rb_szqueue *sq;

    TypedData_Get_Struct(obj, struct rb_szqueue, &szqueue_data_type, sq);
    if (queue_fork_check(&sq->q)) {
        list_head_init(szqueue_pushq(sq));
        sq->num_waiting_push = 0;
    }

    return sq;
}
static VALUE
ary_buf_new(void)
{
    return rb_ary_tmp_new(1);
}

static VALUE
check_array(VALUE obj, VALUE ary)
{
    if (!RB_TYPE_P(ary, T_ARRAY)) {
        rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj);
    }
    return ary;
}

static long
queue_length(VALUE self, struct rb_queue *q)
{
    return RARRAY_LEN(check_array(self, q->que));
}

static int
queue_closed_p(VALUE self)
{
    return FL_TEST_RAW(self, QUEUE_CLOSED) != 0;
}

/*
 *  Document-class: ClosedQueueError
 *
 *  The exception class which will be raised when pushing into a closed
 *  Queue.  See Thread::Queue#close and Thread::SizedQueue#close.
 */
NORETURN(static void raise_closed_queue_error(VALUE self));

static void
raise_closed_queue_error(VALUE self)
{
    rb_raise(rb_eClosedQueueError, "queue closed");
}

static VALUE
queue_closed_result(VALUE self, struct rb_queue *q)
{
    assert(queue_length(self, q) == 0);
    return Qnil;
}
/*
 *  Document-class: Thread::Queue
 *
 *  The Thread::Queue class implements multi-producer, multi-consumer
 *  queues.  It is especially useful in threaded programming when
 *  information must be exchanged safely between multiple threads. The
 *  Thread::Queue class implements all the required locking semantics.
 *
 *  The class implements a FIFO (first in, first out) type of queue:
 *  the tasks added first are the first to be retrieved.
 *
 *  Example:
 *
 *    queue = Thread::Queue.new
 *
 *    producer = Thread.new do
 *      5.times do |i|
 *        sleep rand(i) # simulate expense
 *        queue << i
 *        puts "#{i} produced"
 *      end
 *    end
 *
 *    consumer = Thread.new do
 *      5.times do |i|
 *        value = queue.pop
 *        sleep rand(i/2) # simulate expense
 *        puts "consumed #{value}"
 *      end
 *    end
 *
 *    consumer.join
 */

/*
 * Document-method: Queue::new
 *
 * call-seq:
 *   Thread::Queue.new -> empty_queue
 *   Thread::Queue.new(enumerable) -> queue
 *
 * Creates a new queue instance, optionally using the contents of an
 * +enumerable+ for its initial state.
 *
 * Example:
 *
 *    q = Thread::Queue.new
 *    #=> #<Thread::Queue:0x00007ff7501110d0>
 *    q.empty?
 *    #=> true
 *
 *    q = Thread::Queue.new([1, 2, 3])
 *    #=> #<Thread::Queue:0x00007ff7500ec500>
 *    q.empty?
 *    #=> false
 *    q.pop
 *    #=> 1
 */
static VALUE
rb_queue_initialize(int argc, VALUE *argv, VALUE self)
{
    VALUE initial;
    struct rb_queue *q = queue_ptr(self);
    if ((argc = rb_scan_args(argc, argv, "01", &initial)) == 1) {
        initial = rb_to_array(initial);
    }
    RB_OBJ_WRITE(self, &q->que, ary_buf_new());
    list_head_init(queue_waitq(q));
    if (argc == 1) {
        rb_ary_concat(q->que, initial);
    }
    return self;
}

static VALUE
queue_do_push(VALUE self, struct rb_queue *q, VALUE obj)
{
    if (queue_closed_p(self)) {
        raise_closed_queue_error(self);
    }
    rb_ary_push(check_array(self, q->que), obj);
    wakeup_one(queue_waitq(q));
    return self;
}
/*
 * Document-method: Thread::Queue#close
 * call-seq:
 *   close
 *
 * Closes the queue. A closed queue cannot be re-opened.
 *
 * After the call to close completes, the following are true:
 *
 * - +closed?+ will return true
 *
 * - +close+ will be ignored.
 *
 * - calling enq/push/<< will raise a +ClosedQueueError+.
 *
 * - when +empty?+ is false, calling deq/pop/shift will return an object
 *   from the queue as usual.
 * - when +empty?+ is true, deq(false) will not suspend the thread and will
 *   return nil.  deq(true) will raise a +ThreadError+.
 *
 * ClosedQueueError is inherited from StopIteration, so you can break out of
 * a loop block with it.
 *
 * Example:
 *
 *    q = Thread::Queue.new
 *    Thread.new {
 *      while e = q.deq # wait for nil to break loop
 *        # ...
 *      end
 *    }
 *    q.close
 */
static VALUE
rb_queue_close(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);

    if (!queue_closed_p(self)) {
        FL_SET(self, QUEUE_CLOSED);

        wakeup_all(queue_waitq(q));
    }

    return self;
}

/*
 * Document-method: Thread::Queue#closed?
 * call-seq: closed?
 *
 * Returns +true+ if the queue is closed.
 */
static VALUE
rb_queue_closed_p(VALUE self)
{
    return RBOOL(queue_closed_p(self));
}

/*
 * Document-method: Thread::Queue#push
 * call-seq:
 *   push(object)
 *   enq(object)
 *   <<(object)
 *
 * Pushes the given +object+ to the queue.
 */
static VALUE
rb_queue_push(VALUE self, VALUE obj)
{
    return queue_do_push(self, queue_ptr(self), obj);
}
static VALUE
queue_sleep(VALUE self)
{
    rb_thread_sleep_deadly_allow_spurious_wakeup(self);
    return Qnil;
}

struct queue_waiter {
    struct sync_waiter w;
    union {
        struct rb_queue *q;
        struct rb_szqueue *sq;
    } as;
};

static VALUE
queue_sleep_done(VALUE p)
{
    struct queue_waiter *qw = (struct queue_waiter *)p;

    list_del(&qw->w.node);
    qw->as.q->num_waiting--;

    return Qfalse;
}

static VALUE
szqueue_sleep_done(VALUE p)
{
    struct queue_waiter *qw = (struct queue_waiter *)p;

    list_del(&qw->w.node);
    qw->as.sq->num_waiting_push--;

    return Qfalse;
}
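
/* Blocking-pop helper.  The waiter loops because wakeups can be spurious:
 * after each sleep it re-checks whether the queue is still empty (or has
 * been closed) before shifting an element off. */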
static VALUE
queue_do_pop(VALUE self, struct rb_queue *q, int should_block)
{
    check_array(self, q->que);

    while (RARRAY_LEN(q->que) == 0) {
        if (!should_block) {
            rb_raise(rb_eThreadError, "queue empty");
        }
        else if (queue_closed_p(self)) {
            return queue_closed_result(self, q);
        }
        else {
            rb_execution_context_t *ec = GET_EC();

            assert(RARRAY_LEN(q->que) == 0);
            assert(queue_closed_p(self) == 0);

            struct queue_waiter queue_waiter = {
                .w = {.self = self, .th = ec->thread_ptr, .fiber = ec->fiber_ptr},
                .as = {.q = q}
            };

            struct list_head *waitq = queue_waitq(q);

            list_add_tail(waitq, &queue_waiter.w.node);
            queue_waiter.as.q->num_waiting++;

            rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&queue_waiter);
        }
    }

    return rb_ary_shift(q->que);
}

static int
queue_pop_should_block(int argc, const VALUE *argv)
{
    int should_block = 1;
    rb_check_arity(argc, 0, 1);
    if (argc > 0) {
        should_block = !RTEST(argv[0]);
    }
    return should_block;
}
/*
 * Document-method: Thread::Queue#pop
 * call-seq:
 *   pop(non_block=false)
 *   deq(non_block=false)
 *   shift(non_block=false)
 *
 * Retrieves data from the queue.
 *
 * If the queue is empty, the calling thread is suspended until data is pushed
 * onto the queue. If +non_block+ is true, the thread isn't suspended, and
 * +ThreadError+ is raised.
 */
static VALUE
rb_queue_pop(int argc, VALUE *argv, VALUE self)
{
    int should_block = queue_pop_should_block(argc, argv);
    return queue_do_pop(self, queue_ptr(self), should_block);
}

/*
 * Document-method: Thread::Queue#empty?
 * call-seq: empty?
 *
 * Returns +true+ if the queue is empty.
 */
static VALUE
rb_queue_empty_p(VALUE self)
{
    return RBOOL(queue_length(self, queue_ptr(self)) == 0);
}

/*
 * Document-method: Thread::Queue#clear
 *
 * Removes all objects from the queue.
 */
static VALUE
rb_queue_clear(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);

    rb_ary_clear(check_array(self, q->que));
    return self;
}

/*
 * Document-method: Thread::Queue#length
 * call-seq:
 *   length
 *   size
 *
 * Returns the length of the queue.
 */
static VALUE
rb_queue_length(VALUE self)
{
    return LONG2NUM(queue_length(self, queue_ptr(self)));
}

/*
 * Document-method: Thread::Queue#num_waiting
 *
 * Returns the number of threads waiting on the queue.
 */
static VALUE
rb_queue_num_waiting(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);

    return INT2NUM(q->num_waiting);
}
/*
 *  Document-class: Thread::SizedQueue
 *
 *  This class represents queues of specified size capacity.  The push operation
 *  may be blocked if the capacity is full.
 *
 *  See Thread::Queue for an example of how a Thread::SizedQueue works.
 */

/*
 * Document-method: SizedQueue::new
 * call-seq: new(max)
 *
 * Creates a fixed-length queue with a maximum size of +max+.
 */
static VALUE
rb_szqueue_initialize(VALUE self, VALUE vmax)
{
    long max;
    struct rb_szqueue *sq = szqueue_ptr(self);

    max = NUM2LONG(vmax);
    if (max <= 0) {
        rb_raise(rb_eArgError, "queue size must be positive");
    }

    RB_OBJ_WRITE(self, &sq->q.que, ary_buf_new());
    list_head_init(szqueue_waitq(sq));
    list_head_init(szqueue_pushq(sq));
    sq->max = max;

    return self;
}
/*
 * Document-method: Thread::SizedQueue#close
 * call-seq:
 *   close
 *
 * Similar to Thread::Queue#close.
 *
 * The difference is the behavior with waiting enqueuing threads.
 *
 * If there are waiting enqueuing threads, they are interrupted by
 * raising ClosedQueueError('queue closed').
 */
static VALUE
rb_szqueue_close(VALUE self)
{
    if (!queue_closed_p(self)) {
        struct rb_szqueue *sq = szqueue_ptr(self);

        FL_SET(self, QUEUE_CLOSED);
        wakeup_all(szqueue_waitq(sq));
        wakeup_all(szqueue_pushq(sq));
    }
    return self;
}

/*
 * Document-method: Thread::SizedQueue#max
 *
 * Returns the maximum size of the queue.
 */
static VALUE
rb_szqueue_max_get(VALUE self)
{
    return LONG2NUM(szqueue_ptr(self)->max);
}
/*
 * Document-method: Thread::SizedQueue#max=
 * call-seq: max=(number)
 *
 * Sets the maximum size of the queue to the given +number+.
 */
static VALUE
rb_szqueue_max_set(VALUE self, VALUE vmax)
{
    long max = NUM2LONG(vmax);
    long diff = 0;
    struct rb_szqueue *sq = szqueue_ptr(self);

    if (max <= 0) {
        rb_raise(rb_eArgError, "queue size must be positive");
    }
    if (max > sq->max) {
        diff = max - sq->max;
    }
    sq->max = max;
    sync_wakeup(szqueue_pushq(sq), diff);
    return vmax;
}
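
/* Growing the capacity wakes up to `diff` blocked pushers, since exactly
 * that many additional elements can now be enqueued; shrinking wakes none
 * (diff stays 0). */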
static int
szqueue_push_should_block(int argc, const VALUE *argv)
{
    int should_block = 1;
    rb_check_arity(argc, 1, 2);
    if (argc > 1) {
        should_block = !RTEST(argv[1]);
    }
    return should_block;
}

/*
 * Document-method: Thread::SizedQueue#push
 * call-seq:
 *   push(object, non_block=false)
 *   enq(object, non_block=false)
 *   <<(object)
 *
 * Pushes +object+ to the queue.
 *
 * If there is no space left in the queue, waits until space becomes
 * available, unless +non_block+ is true.  If +non_block+ is true, the
 * thread isn't suspended, and +ThreadError+ is raised.
 */
static VALUE
rb_szqueue_push(int argc, VALUE *argv, VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);
    int should_block = szqueue_push_should_block(argc, argv);

    while (queue_length(self, &sq->q) >= sq->max) {
        if (!should_block) {
            rb_raise(rb_eThreadError, "queue full");
        }
        else if (queue_closed_p(self)) {
            break;
        }
        else {
            rb_execution_context_t *ec = GET_EC();
            struct queue_waiter queue_waiter = {
                .w = {.self = self, .th = ec->thread_ptr, .fiber = ec->fiber_ptr},
                .as = {.sq = sq}
            };

            struct list_head *pushq = szqueue_pushq(sq);

            list_add_tail(pushq, &queue_waiter.w.node);
            sq->num_waiting_push++;

            rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&queue_waiter);
        }
    }

    if (queue_closed_p(self)) {
        raise_closed_queue_error(self);
    }

    return queue_do_push(self, &sq->q, argv[0]);
}
static VALUE
szqueue_do_pop(VALUE self, int should_block)
{
    struct rb_szqueue *sq = szqueue_ptr(self);
    VALUE retval = queue_do_pop(self, &sq->q, should_block);

    if (queue_length(self, &sq->q) < sq->max) {
        wakeup_one(szqueue_pushq(sq));
    }

    return retval;
}

/*
 * Document-method: Thread::SizedQueue#pop
 * call-seq:
 *   pop(non_block=false)
 *   deq(non_block=false)
 *   shift(non_block=false)
 *
 * Retrieves data from the queue.
 *
 * If the queue is empty, the calling thread is suspended until data is pushed
 * onto the queue. If +non_block+ is true, the thread isn't suspended, and
 * +ThreadError+ is raised.
 */
static VALUE
rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
{
    int should_block = queue_pop_should_block(argc, argv);
    return szqueue_do_pop(self, should_block);
}
/*
 * Document-method: Thread::SizedQueue#clear
 *
 * Removes all objects from the queue.
 */
static VALUE
rb_szqueue_clear(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    rb_ary_clear(check_array(self, sq->q.que));
    wakeup_all(szqueue_pushq(sq));
    return self;
}

/*
 * Document-method: Thread::SizedQueue#length
 * call-seq:
 *   length
 *   size
 *
 * Returns the length of the queue.
 */
static VALUE
rb_szqueue_length(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    return LONG2NUM(queue_length(self, &sq->q));
}

/*
 * Document-method: Thread::SizedQueue#num_waiting
 *
 * Returns the number of threads waiting on the queue.
 */
static VALUE
rb_szqueue_num_waiting(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    return INT2NUM(sq->q.num_waiting + sq->num_waiting_push);
}
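
/* rb_szqueue_num_waiting() above intentionally counts both kinds of
 * waiters: threads blocked in pop on an empty queue and threads blocked
 * in push on a full one. */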
/*
 * Document-method: Thread::SizedQueue#empty?
 * call-seq: empty?
 *
 * Returns +true+ if the queue is empty.
 */
static VALUE
rb_szqueue_empty_p(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    return RBOOL(queue_length(self, &sq->q) == 0);
}


/* ConditionalVariable */
struct rb_condvar {
    struct list_head waitq;
    rb_serial_t fork_gen;
};

/*
 *  Document-class: Thread::ConditionVariable
 *
 *  ConditionVariable objects augment class Mutex. Using condition variables,
 *  it is possible to suspend while in the middle of a critical section until a
 *  resource becomes available.
 *
 *  Example:
 *
 *    mutex = Thread::Mutex.new
 *    resource = Thread::ConditionVariable.new
 *
 *    a = Thread.new {
 *      mutex.synchronize {
 *        # Thread 'a' now needs the resource
 *        resource.wait(mutex)
 *        # 'a' can now have the resource
 *      }
 *    }
 *
 *    b = Thread.new {
 *      mutex.synchronize {
 *        # Thread 'b' has finished using the resource
 *        resource.signal
 *      }
 *    }
 */
static size_t
condvar_memsize(const void *ptr)
{
    return sizeof(struct rb_condvar);
}

static const rb_data_type_t cv_data_type = {
    "condvar",
    {0, RUBY_TYPED_DEFAULT_FREE, condvar_memsize,},
    0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
};

static struct rb_condvar *
condvar_ptr(VALUE self)
{
    struct rb_condvar *cv;
    rb_serial_t fork_gen = GET_VM()->fork_gen;

    TypedData_Get_Struct(self, struct rb_condvar, &cv_data_type, cv);

    /* forked children can't reach into parent thread stacks */
    if (cv->fork_gen != fork_gen) {
        cv->fork_gen = fork_gen;
        list_head_init(&cv->waitq);
    }

    return cv;
}

static VALUE
condvar_alloc(VALUE klass)
{
    struct rb_condvar *cv;
    VALUE obj;

    obj = TypedData_Make_Struct(klass, struct rb_condvar, &cv_data_type, cv);
    list_head_init(&cv->waitq);

    return obj;
}

/*
 * Document-method: ConditionVariable::new
 *
 * Creates a new condition variable instance.
 */
static VALUE
rb_condvar_initialize(VALUE self)
{
    struct rb_condvar *cv = condvar_ptr(self);
    list_head_init(&cv->waitq);
    return self;
}
struct sleep_call {
    VALUE mutex;
    VALUE timeout;
};

static ID id_sleep;

static VALUE
do_sleep(VALUE args)
{
    struct sleep_call *p = (struct sleep_call *)args;
    return rb_funcallv(p->mutex, id_sleep, 1, &p->timeout);
}

/*
 * Document-method: Thread::ConditionVariable#wait
 * call-seq: wait(mutex, timeout=nil)
 *
 * Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
 *
 * If +timeout+ is given, this method returns after +timeout+ seconds have
 * passed, even if no other thread has signaled.
 *
 * Returns the slept result on +mutex+.
 */
static VALUE
rb_condvar_wait(int argc, VALUE *argv, VALUE self)
{
    rb_execution_context_t *ec = GET_EC();

    struct rb_condvar *cv = condvar_ptr(self);
    struct sleep_call args;

    rb_scan_args(argc, argv, "11", &args.mutex, &args.timeout);

    struct sync_waiter sync_waiter = {
        .self = args.mutex,
        .th = ec->thread_ptr,
        .fiber = ec->fiber_ptr
    };

    list_add_tail(&cv->waitq, &sync_waiter.node);
    return rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&sync_waiter);
}
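
/* A minimal usage sketch (`ready` is a hypothetical predicate guarded by
 * `mutex`): condition variables may wake spuriously, so the predicate is
 * re-checked in a loop.
 *
 *     mutex.synchronize {
 *       resource.wait(mutex) until ready
 *       # ... use the resource ...
 *     }
 */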
/*
 * Document-method: Thread::ConditionVariable#signal
 *
 * Wakes up the first thread in line waiting for this lock.
 */
static VALUE
rb_condvar_signal(VALUE self)
{
    struct rb_condvar *cv = condvar_ptr(self);
    wakeup_one(&cv->waitq);
    return self;
}

/*
 * Document-method: Thread::ConditionVariable#broadcast
 *
 * Wakes up all threads waiting for this lock.
 */
static VALUE
rb_condvar_broadcast(VALUE self)
{
    struct rb_condvar *cv = condvar_ptr(self);
    wakeup_all(&cv->waitq);
    return self;
}

NORETURN(static VALUE undumpable(VALUE obj));
/* :nodoc: */
static VALUE
undumpable(VALUE obj)
{
    rb_raise(rb_eTypeError, "can't dump %"PRIsVALUE, rb_obj_class(obj));
    UNREACHABLE_RETURN(Qnil);
}
static VALUE
define_thread_class(VALUE outer, const ID name, VALUE super)
{
    VALUE klass = rb_define_class_id_under(outer, name, super);
    rb_const_set(rb_cObject, name, klass);
    return klass;
}

void
Init_thread_sync(void)
{
#if defined(TEACH_RDOC) && TEACH_RDOC == 42
    rb_cMutex = rb_define_class_under(rb_cThread, "Mutex", rb_cObject);
    rb_cConditionVariable = rb_define_class_under(rb_cThread, "ConditionVariable", rb_cObject);
    rb_cQueue = rb_define_class_under(rb_cThread, "Queue", rb_cObject);
    rb_cSizedQueue = rb_define_class_under(rb_cThread, "SizedQueue", rb_cObject);
#endif
#define DEFINE_CLASS(name, super) \
    rb_c##name = define_thread_class(rb_cThread, rb_intern(#name), rb_c##super)
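
    /* Each DEFINE_CLASS() call both defines the class under Thread (e.g.
     * Thread::Mutex) and, via define_thread_class() above, sets a top-level
     * constant of the same name (e.g. ::Mutex). */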
    /* Mutex */
    DEFINE_CLASS(Mutex, Object);
    rb_define_alloc_func(rb_cMutex, mutex_alloc);
    rb_define_method(rb_cMutex, "initialize", mutex_initialize, 0);
    rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0);
    rb_define_method(rb_cMutex, "try_lock", rb_mutex_trylock, 0);
    rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0);
    rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0);
    rb_define_method(rb_cMutex, "sleep", mutex_sleep, -1);
    rb_define_method(rb_cMutex, "synchronize", rb_mutex_synchronize_m, 0);
    rb_define_method(rb_cMutex, "owned?", rb_mutex_owned_p, 0);

    /* Queue */
    DEFINE_CLASS(Queue, Object);
    rb_define_alloc_func(rb_cQueue, queue_alloc);

    rb_eClosedQueueError = rb_define_class("ClosedQueueError", rb_eStopIteration);

    rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, -1);
    rb_undef_method(rb_cQueue, "initialize_copy");
    rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0);
    rb_define_method(rb_cQueue, "close", rb_queue_close, 0);
    rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
    rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
    rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
    rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
    rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
    rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
    rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);

    rb_define_alias(rb_cQueue, "enq", "push");
    rb_define_alias(rb_cQueue, "<<", "push");
    rb_define_alias(rb_cQueue, "deq", "pop");
    rb_define_alias(rb_cQueue, "shift", "pop");
    rb_define_alias(rb_cQueue, "size", "length");

    /* SizedQueue */
    DEFINE_CLASS(SizedQueue, Queue);
    rb_define_alloc_func(rb_cSizedQueue, szqueue_alloc);

    rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
    rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, 0);
    rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
    rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
    rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, -1);
    rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1);
    rb_define_method(rb_cSizedQueue, "empty?", rb_szqueue_empty_p, 0);
    rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
    rb_define_method(rb_cSizedQueue, "length", rb_szqueue_length, 0);
    rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);

    rb_define_alias(rb_cSizedQueue, "enq", "push");
    rb_define_alias(rb_cSizedQueue, "<<", "push");
    rb_define_alias(rb_cSizedQueue, "deq", "pop");
    rb_define_alias(rb_cSizedQueue, "shift", "pop");
    rb_define_alias(rb_cSizedQueue, "size", "length");

    /* CVar */
    DEFINE_CLASS(ConditionVariable, Object);
    rb_define_alloc_func(rb_cConditionVariable, condvar_alloc);

    id_sleep = rb_intern("sleep");

    rb_define_method(rb_cConditionVariable, "initialize", rb_condvar_initialize, 0);
    rb_undef_method(rb_cConditionVariable, "initialize_copy");
    rb_define_method(rb_cConditionVariable, "marshal_dump", undumpable, 0);
    rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, -1);
    rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0);
    rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0);

    rb_provide("thread.rb");
}