/*
 * Optimized Ruby Mutex implementation, loosely based on thread.rb by
 * Yukihiro Matsumoto <matz@ruby-lang.org>
 *
 * Copyright 2006-2007 MenTaLguY <mental@rydia.net>
 *
 * RDoc taken from the original.
 *
 * This file is made available under the same terms as Ruby.
 */

/* Ruby 1.8 C API headers; rb_thread_critical comes from rubysig.h */
#include <ruby.h>
#include <intern.h>
#include <rubysig.h>

static VALUE rb_cMutex;
static VALUE rb_cConditionVariable;
static VALUE rb_cQueue;
static VALUE rb_cSizedQueue;

/* forward declaration: used as an ensure handler below */
static VALUE set_critical(VALUE value);

static VALUE
thread_exclusive_do(void)
{
    rb_thread_critical = 1;

    return rb_yield(Qundef);
}

/*
 * call-seq:
 *   Thread.exclusive { block }   => obj
 *
 * Wraps a block in Thread.critical, restoring the original value
 * upon exit from the critical section, and returns the value of the
 * block.
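 *
 * Illustrative example (an editorial addition, not part of the original
 * RDoc; +do_something+ stands in for arbitrary user code):
 *
 *   result = Thread.exclusive do
 *     # no other thread is scheduled while this block runs
 *     do_something
 *   end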
 */
static VALUE
rb_thread_exclusive(void)
{
    return rb_ensure(thread_exclusive_do, Qundef, set_critical, rb_thread_critical);
}

/* Node in a singly-linked FIFO list of VALUEs, used for wait queues. */
typedef struct _Entry {
    VALUE value;
    struct _Entry *next;
} Entry;

typedef struct _List {
    Entry *entries;       /* head of the list */
    Entry *last_entry;    /* tail, for O(1) append */
    Entry *entry_pool;    /* recycled entries */
    unsigned long size;
} List;

static void
init_list(List *list)
{
    list->entries = NULL;
    list->last_entry = NULL;
    list->entry_pool = NULL;
    list->size = 0;
}

static void
mark_list(List const *list)
{
    Entry *entry;
    for (entry = list->entries; entry; entry = entry->next) {
        rb_gc_mark(entry->value);
    }
}

static void
free_entries(Entry *first)
{
    Entry *next;
    while (first) {
        next = first->next;
        free(first);
        first = next;
    }
}

static void
finalize_list(List *list)
{
    free_entries(list->entries);
    free_entries(list->entry_pool);
}

static void
push_list(List *list, VALUE value)
{
    Entry *entry;

    if (list->entry_pool) {
        entry = list->entry_pool;
        list->entry_pool = entry->next;
    } else {
        entry = ALLOC(Entry);
    }

    entry->value = value;
    entry->next = NULL;

    if (list->last_entry) {
        list->last_entry->next = entry;
    } else {
        list->entries = entry;
    }
    list->last_entry = entry;

    ++list->size;
}

static void
push_multiple_list(List *list, VALUE *values, unsigned count)
{
    unsigned i;
    for (i = 0; i < count; i++) {
        push_list(list, values[i]);
    }
}

static void
recycle_entries(List *list, Entry *first_entry, Entry *last_entry)
{
#ifdef USE_MEM_POOLS
    last_entry->next = list->entry_pool;
    list->entry_pool = first_entry;
#else
    last_entry->next = NULL;
    free_entries(first_entry);
#endif
}

static VALUE
shift_list(List *list)
{
    Entry *entry;
    VALUE value;

    entry = list->entries;
    if (!entry) return Qundef;

    list->entries = entry->next;
    if (entry == list->last_entry) {
        list->last_entry = NULL;
    }

    --list->size;

    value = entry->value;
    recycle_entries(list, entry, entry);

    return value;
}

static void
remove_one(List *list, VALUE value)
{
    Entry **ref;
    Entry *entry;

    for (ref = &list->entries, entry = list->entries;
         entry != NULL;
         ref = &entry->next, entry = entry->next) {
        if (entry->value == value) {
            *ref = entry->next;
            --list->size;
            recycle_entries(list, entry, entry);
            break;
        }
    }
}

static void
clear_list(List *list)
{
    if (list->last_entry) {
        recycle_entries(list, list->entries, list->last_entry);
        list->entries = NULL;
        list->last_entry = NULL;
        list->size = 0;
    }
}

static VALUE
array_from_list(List const *list)
{
    VALUE ary;
    Entry *entry;

    ary = rb_ary_new();
    for (entry = list->entries; entry; entry = entry->next) {
        rb_ary_push(ary, entry->value);
    }
    return ary;
}

static VALUE
wake_thread(VALUE thread)
{
    return rb_rescue2(rb_thread_wakeup, thread,
                      NULL, Qundef, rb_eThreadError, 0);
}

static VALUE
run_thread(VALUE thread)
{
    return rb_rescue2(rb_thread_run, thread,
                      NULL, Qundef, rb_eThreadError, 0);
}

static VALUE
wake_one(List *list)
{
    VALUE waking;

    waking = Qnil;
    while (list->entries && !RTEST(waking)) {
        waking = wake_thread(shift_list(list));
    }

    return waking;
}

static VALUE
wake_all(List *list)
{
    while (list->entries) {
        wake_one(list);
    }

    return Qnil;
}

static VALUE
wait_list_inner(List *list)
{
    push_list(list, rb_thread_current());
    rb_thread_stop();
    return Qnil;
}

static VALUE
wait_list_cleanup(List *list)
{
    /* cleanup in case of spurious wakeups */
    remove_one(list, rb_thread_current());
    return Qnil;
}

static void
wait_list(List *list)
{
    rb_ensure(wait_list_inner, (VALUE)list, wait_list_cleanup, (VALUE)list);
}

static void
assert_no_survivors(List *waiting, const char *label, void *addr)
{
    Entry *entry;
    for (entry = waiting->entries; entry; entry = entry->next) {
        if (RTEST(wake_thread(entry->value))) {
            rb_bug("%s %p freed with live thread(s) waiting", label, addr);
        }
    }
}

/*
 * Document-class: Mutex
 *
 * Mutex implements a simple semaphore that can be used to coordinate access to
 * shared data from multiple concurrent threads.
 *
 * Example:
 *
 *   require 'thread'
 *   semaphore = Mutex.new
 *
 *   a = Thread.new {
 *     semaphore.synchronize {
 *       # access shared resource
 *     }
 *   }
 *
 *   b = Thread.new {
 *     semaphore.synchronize {
 *       # access shared resource
 *     }
 *   }
 */
typedef struct _Mutex {
    VALUE owner;      /* thread currently holding the lock, or Qnil when unlocked */
    List waiting;     /* threads blocked waiting for the lock */
} Mutex;

static void
mark_mutex(Mutex *mutex)
{
    rb_gc_mark(mutex->owner);
    mark_list(&mutex->waiting);
}

static void
finalize_mutex(Mutex *mutex)
{
    finalize_list(&mutex->waiting);
}

static void
free_mutex(Mutex *mutex)
{
    assert_no_survivors(&mutex->waiting, "mutex", mutex);
    finalize_mutex(mutex);
    free(mutex);
}

static void
init_mutex(Mutex *mutex)
{
    mutex->owner = Qnil;
    init_list(&mutex->waiting);
}

/*
 * Document-method: new
 * call-seq: Mutex.new
 *
 * Creates a new Mutex.
 */
static VALUE
rb_mutex_alloc(VALUE klass)
{
    Mutex *mutex;

    mutex = ALLOC(Mutex);
    init_mutex(mutex);

    return Data_Wrap_Struct(klass, mark_mutex, free_mutex, mutex);
}

/*
 * Document-method: locked?
 * call-seq: locked?
 *
 * Returns +true+ if this lock is currently held by some thread.
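 *
 * Illustrative example (an editorial addition, not part of the original RDoc):
 *
 *   m = Mutex.new
 *   m.locked?   #=> false
 *   m.lock
 *   m.locked?   #=> true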
 */
static VALUE
rb_mutex_locked_p(VALUE self)
{
    Mutex *mutex;
    Data_Get_Struct(self, Mutex, mutex);
    return RTEST(mutex->owner) ? Qtrue : Qfalse;
}

/*
 * Document-method: try_lock
 * call-seq: try_lock
 *
 * Attempts to obtain the lock and returns immediately. Returns +true+ if the
 * lock was granted.
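 *
 * Illustrative example (an editorial addition, not part of the original RDoc):
 *
 *   m = Mutex.new
 *   m.try_lock   #=> true
 *   m.try_lock   #=> false (the lock is already held)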
 */
static VALUE
rb_mutex_try_lock(VALUE self)
{
    Mutex *mutex;

    Data_Get_Struct(self, Mutex, mutex);

    if (RTEST(mutex->owner))
        return Qfalse;

    mutex->owner = rb_thread_current();
    return Qtrue;
}

/*
 * Document-method: lock
 * call-seq: lock
 *
 * Attempts to grab the lock and waits if it isn't available.
 */
static void
lock_mutex(Mutex *mutex)
{
    VALUE current;
    current = rb_thread_current();

    rb_thread_critical = 1;

    while (RTEST(mutex->owner)) {
        wait_list(&mutex->waiting);
        rb_thread_critical = 1;
    }
    mutex->owner = current;

    rb_thread_critical = 0;
}

static VALUE
rb_mutex_lock(VALUE self)
{
    Mutex *mutex;
    Data_Get_Struct(self, Mutex, mutex);
    lock_mutex(mutex);
    return self;
}

/*
 * Document-method: unlock
 * call-seq: unlock
 *
 * Releases the lock. Returns +nil+ if the mutex wasn't locked.
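 *
 * Illustrative example (an editorial addition, not part of the original RDoc):
 *
 *   m = Mutex.new
 *   m.unlock    #=> nil (the mutex was not locked)
 *   m.lock
 *   m.unlock    #=> m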
 */
static VALUE
unlock_mutex_inner(Mutex *mutex)
{
    VALUE waking;

    if (!RTEST(mutex->owner)) {
        return Qundef;
    }

    mutex->owner = Qnil;
    waking = wake_one(&mutex->waiting);

    return waking;
}

static VALUE
set_critical(VALUE value)
{
    rb_thread_critical = (int)value;
    return Qnil;
}

static VALUE
unlock_mutex(Mutex *mutex)
{
    VALUE waking;

    rb_thread_critical = 1;
    waking = rb_ensure(unlock_mutex_inner, (VALUE)mutex, set_critical, 0);

    if (waking == Qundef) {
        /* the mutex was not locked */
        return Qfalse;
    }

    if (RTEST(waking)) {
        run_thread(waking);
    }

    return Qtrue;
}

static VALUE
rb_mutex_unlock(VALUE self)
{
    Mutex *mutex;
    Data_Get_Struct(self, Mutex, mutex);

    if (RTEST(unlock_mutex(mutex))) {
        return self;
    } else {
        return Qnil;
    }
}

/*
 * Document-method: exclusive_unlock
 * call-seq: exclusive_unlock { ... }
 *
 * If the mutex is locked, unlocks the mutex, wakes one waiting thread, and
 * yields in a critical section.
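 *
 * Illustrative example (an editorial addition, not part of the original RDoc):
 *
 *   m.exclusive_unlock {
 *     # runs with Thread.critical set, after m has been released
 *   }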
 */
static VALUE
rb_mutex_exclusive_unlock_inner(Mutex *mutex)
{
    VALUE waking;
    waking = unlock_mutex_inner(mutex);
    rb_yield(Qundef);
    return waking;
}

static VALUE
rb_mutex_exclusive_unlock(VALUE self)
{
    Mutex *mutex;
    VALUE waking;
    Data_Get_Struct(self, Mutex, mutex);

    rb_thread_critical = 1;
    waking = rb_ensure(rb_mutex_exclusive_unlock_inner, (VALUE)mutex, set_critical, 0);

    if (waking == Qundef) {
        /* the mutex was not locked */
        return Qnil;
    }

    if (RTEST(waking)) {
        run_thread(waking);
    }

    return self;
}

/*
 * Document-method: synchronize
 * call-seq: synchronize { ... }
 *
 * Obtains a lock, runs the block, and releases the lock when the block
 * completes. See the example under Mutex.
 */
static VALUE
rb_mutex_synchronize(VALUE self)
{
    rb_mutex_lock(self);
    return rb_ensure(rb_yield, Qundef, rb_mutex_unlock, self);
}

/*
 * Document-class: ConditionVariable
 *
 * ConditionVariable objects augment class Mutex. Using condition variables,
 * it is possible to suspend while in the middle of a critical section until a
 * resource becomes available.
 *
 * Example:
 *
 *   require 'thread'
 *
 *   mutex = Mutex.new
 *   resource = ConditionVariable.new
 *
 *   a = Thread.new {
 *     mutex.synchronize {
 *       # Thread 'a' now needs the resource
 *       resource.wait(mutex)
 *       # 'a' can now have the resource
 *     }
 *   }
 *
 *   b = Thread.new {
 *     mutex.synchronize {
 *       # Thread 'b' has finished using the resource
 *       resource.signal
 *     }
 *   }
 */
typedef struct _ConditionVariable {
    List waiting;
} ConditionVariable;

static void
mark_condvar(ConditionVariable *condvar)
{
    mark_list(&condvar->waiting);
}

static void
finalize_condvar(ConditionVariable *condvar)
{
    finalize_list(&condvar->waiting);
}

static void
free_condvar(ConditionVariable *condvar)
{
    assert_no_survivors(&condvar->waiting, "condition variable", condvar);
    finalize_condvar(condvar);
    free(condvar);
}

static void
init_condvar(ConditionVariable *condvar)
{
    init_list(&condvar->waiting);
}

/*
 * Document-method: new
 * call-seq: ConditionVariable.new
 *
 * Creates a new ConditionVariable.
 */
static VALUE
rb_condvar_alloc(VALUE klass)
{
    ConditionVariable *condvar;

    condvar = ALLOC(ConditionVariable);
    init_condvar(condvar);

    return Data_Wrap_Struct(klass, mark_condvar, free_condvar, condvar);
}

/*
 * Document-method: wait
 * call-seq: wait(mutex)
 *
 * Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
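 *
 * Illustrative example (an editorial addition, not part of the original
 * RDoc; +ready?+ stands in for whatever condition is being waited on):
 *
 *   mutex.synchronize {
 *     resource.wait(mutex) until ready?
 *   }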
 */
static void
wait_condvar(ConditionVariable *condvar, Mutex *mutex)
{
    rb_thread_critical = 1;
    if (!RTEST(mutex->owner)) {
        rb_thread_critical = 0;
        return;
    }
    if (mutex->owner != rb_thread_current()) {
        rb_thread_critical = 0;
        rb_raise(rb_eThreadError, "Not owner");
    }
    mutex->owner = Qnil;
    wait_list(&condvar->waiting);

    lock_mutex(mutex);
}

static VALUE
legacy_exclusive_unlock(VALUE mutex)
{
    return rb_funcall(mutex, rb_intern("exclusive_unlock"), 0);
}

typedef struct {
    ConditionVariable *condvar;
    VALUE mutex;
} legacy_wait_args;

static VALUE
legacy_wait(VALUE unused, legacy_wait_args *args)
{
    wait_list(&args->condvar->waiting);
    rb_funcall(args->mutex, rb_intern("lock"), 0);
    return Qnil;
}

static VALUE
rb_condvar_wait(VALUE self, VALUE mutex_v)
{
    ConditionVariable *condvar;
    Data_Get_Struct(self, ConditionVariable, condvar);

    if (CLASS_OF(mutex_v) != rb_cMutex) {
        /* interoperate with legacy mutex */
        legacy_wait_args args;
        args.condvar = condvar;
        args.mutex = mutex_v;
        rb_iterate(legacy_exclusive_unlock, mutex_v, legacy_wait, (VALUE)&args);
    } else {
        Mutex *mutex;
        Data_Get_Struct(mutex_v, Mutex, mutex);
        wait_condvar(condvar, mutex);
    }

    return self;
}

/*
 * Document-method: broadcast
 * call-seq: broadcast
 *
 * Wakes up all threads waiting for this condition.
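 *
 * Illustrative example (an editorial addition, not part of the original RDoc):
 *
 *   mutex.synchronize {
 *     resource.broadcast   # wakes every thread blocked in resource.wait
 *   }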
 */
static VALUE
rb_condvar_broadcast(VALUE self)
{
    ConditionVariable *condvar;

    Data_Get_Struct(self, ConditionVariable, condvar);

    rb_thread_critical = 1;
    rb_ensure(wake_all, (VALUE)&condvar->waiting, set_critical, 0);
    rb_thread_schedule();

    return self;
}

/*
 * Document-method: signal
 * call-seq: signal
 *
 * Wakes up the first thread in line waiting for this condition.
 */
static void
signal_condvar(ConditionVariable *condvar)
{
    VALUE waking;

    rb_thread_critical = 1;
    waking = rb_ensure(wake_one, (VALUE)&condvar->waiting, set_critical, 0);
    if (RTEST(waking)) {
        run_thread(waking);
    }
}

static VALUE
rb_condvar_signal(VALUE self)
{
    ConditionVariable *condvar;
    Data_Get_Struct(self, ConditionVariable, condvar);
    signal_condvar(condvar);
    return self;
}

/*
 * Document-class: Queue
 *
 * This class provides a way to synchronize communication between threads.
 *
 * Example:
 *
 *   require 'thread'
 *
 *   queue = Queue.new
 *
 *   producer = Thread.new do
 *     5.times do |i|
 *       sleep rand(i) # simulate expense
 *       queue << i
 *       puts "#{i} produced"
 *     end
 *   end
 *
 *   consumer = Thread.new do
 *     5.times do |i|
 *       value = queue.pop
 *       sleep rand(i/2) # simulate expense
 *       puts "consumed #{value}"
 *     end
 *   end
 *
 *   consumer.join
 */
typedef struct _Queue {
    Mutex mutex;
    ConditionVariable value_available;
    ConditionVariable space_available;
    List values;
    unsigned long capacity;   /* 0 means unbounded */
} Queue;

static void
mark_queue(Queue *queue)
{
    mark_mutex(&queue->mutex);
    mark_condvar(&queue->value_available);
    mark_condvar(&queue->space_available);
    mark_list(&queue->values);
}

static void
finalize_queue(Queue *queue)
{
    finalize_mutex(&queue->mutex);
    finalize_condvar(&queue->value_available);
    finalize_condvar(&queue->space_available);
    finalize_list(&queue->values);
}

static void
free_queue(Queue *queue)
{
    assert_no_survivors(&queue->mutex.waiting, "queue", queue);
    assert_no_survivors(&queue->space_available.waiting, "queue", queue);
    assert_no_survivors(&queue->value_available.waiting, "queue", queue);
    finalize_queue(queue);
    free(queue);
}

static void
init_queue(Queue *queue)
{
    init_mutex(&queue->mutex);
    init_condvar(&queue->value_available);
    init_condvar(&queue->space_available);
    init_list(&queue->values);
    queue->capacity = 0;
}

/*
 * Document-method: new
 * call-seq: Queue.new
 *
 * Creates a new queue.
 */
static VALUE
rb_queue_alloc(VALUE klass)
{
    Queue *queue;

    queue = ALLOC(Queue);
    init_queue(queue);

    return Data_Wrap_Struct(klass, mark_queue, free_queue, queue);
}

static VALUE
rb_queue_marshal_load(VALUE self, VALUE data)
{
    Queue *queue;
    VALUE array;
    Data_Get_Struct(self, Queue, queue);

    array = rb_marshal_load(data);
    if (TYPE(array) != T_ARRAY) {
        rb_raise(rb_eRuntimeError, "expected Array of queue data");
    }
    if (RARRAY(array)->len < 1) {
        rb_raise(rb_eRuntimeError, "missing capacity value");
    }
    queue->capacity = NUM2ULONG(rb_ary_shift(array));
    push_multiple_list(&queue->values, RARRAY(array)->ptr, (unsigned)RARRAY(array)->len);

    return self;
}

static VALUE
rb_queue_marshal_dump(VALUE self)
{
    Queue *queue;
    VALUE array;
    Data_Get_Struct(self, Queue, queue);

    array = array_from_list(&queue->values);
    rb_ary_unshift(array, ULONG2NUM(queue->capacity));
    return rb_marshal_dump(array, Qnil);
}

/*
 * Document-method: clear
 * call-seq: clear
 *
 * Removes all objects from the queue.
 */
static VALUE
rb_queue_clear(VALUE self)
{
    Queue *queue;
    Data_Get_Struct(self, Queue, queue);

    lock_mutex(&queue->mutex);
    clear_list(&queue->values);
    signal_condvar(&queue->space_available);
    unlock_mutex(&queue->mutex);

    return self;
}

/*
 * Document-method: empty?
 * call-seq: empty?
 *
 * Returns +true+ if the queue is empty.
 */
static VALUE
rb_queue_empty_p(VALUE self)
{
    Queue *queue;
    VALUE result;
    Data_Get_Struct(self, Queue, queue);

    lock_mutex(&queue->mutex);
    result = queue->values.size == 0 ? Qtrue : Qfalse;
    unlock_mutex(&queue->mutex);

    return result;
}

/*
 * Document-method: length
 * call-seq: length
 *
 * Returns the length of the queue.
 */
static VALUE
rb_queue_length(VALUE self)
{
    Queue *queue;
    VALUE result;
    Data_Get_Struct(self, Queue, queue);

    lock_mutex(&queue->mutex);
    result = ULONG2NUM(queue->values.size);
    unlock_mutex(&queue->mutex);

    return result;
}

/*
 * Document-method: num_waiting
 * call-seq: num_waiting
 *
 * Returns the number of threads waiting on the queue.
 */
static VALUE
rb_queue_num_waiting(VALUE self)
{
    Queue *queue;
    VALUE result;
    Data_Get_Struct(self, Queue, queue);

    lock_mutex(&queue->mutex);
    result = ULONG2NUM(queue->value_available.waiting.size +
                       queue->space_available.waiting.size);
    unlock_mutex(&queue->mutex);

    return result;
}

/*
 * Document-method: pop
 * call-seq: pop(non_block=false)
 *
 * Retrieves data from the queue. If the queue is empty, the calling thread is
 * suspended until data is pushed onto the queue. If +non_block+ is true, the
 * thread isn't suspended, and an exception is raised.
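 *
 * Illustrative example (an editorial addition, not part of the original RDoc):
 *
 *   q = Queue.new
 *   q.push(:job)
 *   q.pop         #=> :job
 *   q.pop(true)   # raises ThreadError ("queue empty") since the queue is now empty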
 */
static VALUE
rb_queue_pop(int argc, VALUE *argv, VALUE self)
{
    Queue *queue;
    int should_block;
    VALUE result;
    Data_Get_Struct(self, Queue, queue);

    if (argc == 0) {
        should_block = 1;
    } else if (argc == 1) {
        should_block = !RTEST(argv[0]);
    } else {
        rb_raise(rb_eArgError, "wrong number of arguments (%d for 1)", argc);
    }

    lock_mutex(&queue->mutex);
    if (!queue->values.entries && !should_block) {
        unlock_mutex(&queue->mutex);
        rb_raise(rb_eThreadError, "queue empty");
    }

    while (!queue->values.entries) {
        wait_condvar(&queue->value_available, &queue->mutex);
    }

    result = shift_list(&queue->values);
    if (queue->capacity && queue->values.size < queue->capacity) {
        signal_condvar(&queue->space_available);
    }
    unlock_mutex(&queue->mutex);

    return result;
}

/*
 * Document-method: push
 * call-seq: push(obj)
 *
 * Pushes +obj+ to the queue.
 */
static VALUE
rb_queue_push(VALUE self, VALUE value)
{
    Queue *queue;
    Data_Get_Struct(self, Queue, queue);

    lock_mutex(&queue->mutex);
    while (queue->capacity && queue->values.size >= queue->capacity) {
        wait_condvar(&queue->space_available, &queue->mutex);
    }
    push_list(&queue->values, value);
    signal_condvar(&queue->value_available);
    unlock_mutex(&queue->mutex);

    return self;
}

/*
 * Document-class: SizedQueue
 *
 * This class represents queues of specified size capacity. The push operation
 * may be blocked if the capacity is full.
 *
 * See Queue for an example of how a SizedQueue works.
 */
/*
 * Document-method: new
 * call-seq: SizedQueue.new(max)
 *
 * Creates a fixed-length queue with a maximum size of +max+.
 */
/*
 * Document-method: max
 * call-seq: max
 *
 * Returns the maximum size of the queue.
 */
static VALUE
rb_sized_queue_max(VALUE self)
{
    Queue *queue;
    VALUE result;
    Data_Get_Struct(self, Queue, queue);

    lock_mutex(&queue->mutex);
    result = ULONG2NUM(queue->capacity);
    unlock_mutex(&queue->mutex);

    return result;
}

/*
 * Document-method: max=
 * call-seq: max=(size)
 *
 * Sets the maximum size of the queue.
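 *
 * Illustrative example (an editorial addition, not part of the original RDoc):
 *
 *   q = SizedQueue.new(1)
 *   q.max        #=> 1
 *   q.max = 5    # growing the limit signals threads blocked in push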
 */
static VALUE
rb_sized_queue_max_set(VALUE self, VALUE value)
{
    Queue *queue;
    unsigned long new_capacity;
    unsigned long difference;
    Data_Get_Struct(self, Queue, queue);

    new_capacity = NUM2ULONG(value);

    if (new_capacity < 1) {
        rb_raise(rb_eArgError, "value must be positive");
    }

    lock_mutex(&queue->mutex);
    if (queue->capacity && new_capacity > queue->capacity) {
        difference = new_capacity - queue->capacity;
    } else {
        difference = 0;
    }
    queue->capacity = new_capacity;

    /* wake up to +difference+ threads blocked waiting for space */
    for (; difference > 0; --difference) {
        signal_condvar(&queue->space_available);
    }
    unlock_mutex(&queue->mutex);

    return self;
}

/*
 * Document-method: push
 * call-seq: push(obj)
 *
 * Pushes +obj+ to the queue. If there is no space left in the queue, waits
 * until space becomes available.
 */
/*
 * Document-method: pop
 * call-seq: pop(non_block=false)
 *
 * Retrieves data from the queue and runs a waiting thread, if any.
 */
/* for marshalling mutexes and condvars */

static VALUE
dummy_load(VALUE self, VALUE string)
{
    return Qnil;
}

static VALUE
dummy_dump(VALUE self)
{
    return rb_str_new2("");
}

void
Init_thread(void)
{
    rb_define_singleton_method(rb_cThread, "exclusive", rb_thread_exclusive, 0);

    rb_cMutex = rb_define_class("Mutex", rb_cObject);
    rb_define_alloc_func(rb_cMutex, rb_mutex_alloc);
    rb_define_method(rb_cMutex, "marshal_load", dummy_load, 1);
    rb_define_method(rb_cMutex, "marshal_dump", dummy_dump, 0);
    rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0);
    rb_define_method(rb_cMutex, "try_lock", rb_mutex_try_lock, 0);
    rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0);
    rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0);
    rb_define_method(rb_cMutex, "exclusive_unlock", rb_mutex_exclusive_unlock, 0);
    rb_define_method(rb_cMutex, "synchronize", rb_mutex_synchronize, 0);

    rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject);
    rb_define_alloc_func(rb_cConditionVariable, rb_condvar_alloc);
    rb_define_method(rb_cConditionVariable, "marshal_load", dummy_load, 1);
    rb_define_method(rb_cConditionVariable, "marshal_dump", dummy_dump, 0);
    rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, 1);
    rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0);
    rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0);

    rb_cQueue = rb_define_class("Queue", rb_cObject);
    rb_define_alloc_func(rb_cQueue, rb_queue_alloc);
    rb_define_method(rb_cQueue, "marshal_load", rb_queue_marshal_load, 1);
    rb_define_method(rb_cQueue, "marshal_dump", rb_queue_marshal_dump, 0);
    rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
    rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
    rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
    rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
    rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
    rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
    rb_alias(rb_cQueue, rb_intern("enq"), rb_intern("push"));
    rb_alias(rb_cQueue, rb_intern("<<"), rb_intern("push"));
    rb_alias(rb_cQueue, rb_intern("deq"), rb_intern("pop"));
    rb_alias(rb_cQueue, rb_intern("shift"), rb_intern("pop"));
    rb_alias(rb_cQueue, rb_intern("size"), rb_intern("length"));

    rb_cSizedQueue = rb_define_class("SizedQueue", rb_cQueue);
    rb_define_method(rb_cSizedQueue, "initialize", rb_sized_queue_max_set, 1);
    rb_define_method(rb_cSizedQueue, "num_waiting", rb_queue_num_waiting, 0);
    rb_define_method(rb_cSizedQueue, "pop", rb_queue_pop, -1);
    rb_define_method(rb_cSizedQueue, "push", rb_queue_push, 1);
    rb_define_method(rb_cSizedQueue, "max", rb_sized_queue_max, 0);
    rb_define_method(rb_cSizedQueue, "max=", rb_sized_queue_max_set, 1);
    rb_alias(rb_cSizedQueue, rb_intern("enq"), rb_intern("push"));
    rb_alias(rb_cSizedQueue, rb_intern("<<"), rb_intern("push"));
    rb_alias(rb_cSizedQueue, rb_intern("deq"), rb_intern("pop"));
    rb_alias(rb_cSizedQueue, rb_intern("shift"), rb_intern("pop"));
}