Update to RDoc 2.1.0 r112
[rbx.git] / stdlib / ext / thread / thread.c
blobe9dde37a18f27fa2eb4725f5bc2a9e7c424474ab
1 /*
2 * Optimized Ruby Mutex implementation, loosely based on thread.rb by
3 * Yukihiro Matsumoto <matz@ruby-lang.org>
5 * Copyright 2006-2007 MenTaLguY <mental@rydia.net>
7 * RDoc taken from original.
9 * This file is made available under the same terms as Ruby.
12 #include <ruby.h>
13 #include <intern.h>
14 #include <rubysig.h>
16 static VALUE rb_cMutex;
17 static VALUE rb_cConditionVariable;
18 static VALUE rb_cQueue;
19 static VALUE rb_cSizedQueue;
21 static VALUE set_critical(VALUE value);
23 static VALUE
24 thread_exclusive_do(void)
26 rb_thread_critical = 1;
28 return rb_yield(Qundef);
32 * call-seq:
33 * Thread.exclusive { block } => obj
35 * Wraps a block in Thread.critical, restoring the original value
36 * upon exit from the critical section, and returns the value of the
37 * block.
40 static VALUE
41 rb_thread_exclusive(void)
43 return rb_ensure(thread_exclusive_do, Qundef, set_critical, rb_thread_critical);
46 typedef struct _Entry {
47 VALUE value;
48 struct _Entry *next;
49 } Entry;
51 typedef struct _List {
52 Entry *entries;
53 Entry *last_entry;
54 Entry *entry_pool;
55 unsigned long size;
56 } List;
58 static void
59 init_list(List *list)
61 list->entries = NULL;
62 list->last_entry = NULL;
63 list->entry_pool = NULL;
64 list->size = 0;
67 static void
68 mark_list(List *list)
70 Entry *entry;
71 for (entry = list->entries; entry; entry = entry->next) {
72 rb_gc_mark(entry->value);
76 static void
77 free_entries(Entry *first)
79 Entry *next;
80 while (first) {
81 next = first->next;
82 xfree(first);
83 first = next;
87 static void
88 finalize_list(List *list)
90 free_entries(list->entries);
91 free_entries(list->entry_pool);
94 static void
95 push_list(List *list, VALUE value)
97 Entry *entry;
99 if (list->entry_pool) {
100 entry = list->entry_pool;
101 list->entry_pool = entry->next;
102 } else {
103 entry = ALLOC(Entry);
106 entry->value = value;
107 entry->next = NULL;
109 if (list->last_entry) {
110 list->last_entry->next = entry;
111 } else {
112 list->entries = entry;
114 list->last_entry = entry;
116 ++list->size;
119 static void
120 push_multiple_list(List *list, VALUE *values, unsigned count)
122 unsigned i;
123 for (i = 0; i < count; i++) {
124 push_list(list, values[i]);
128 static void
129 recycle_entries(List *list, Entry *first_entry, Entry *last_entry)
131 #ifdef USE_MEM_POOLS
132 last_entry->next = list->entry_pool;
133 list->entry_pool = first_entry;
134 #else
135 last_entry->next = NULL;
136 free_entries(first_entry);
137 #endif
140 static VALUE
141 shift_list(List *list)
143 Entry *entry;
144 VALUE value;
146 entry = list->entries;
147 if (!entry) return Qundef;
149 list->entries = entry->next;
150 if (entry == list->last_entry) {
151 list->last_entry = NULL;
154 --list->size;
156 value = entry->value;
157 recycle_entries(list, entry, entry);
159 return value;
162 static void
163 remove_one(List *list, VALUE value)
165 Entry **ref;
166 Entry *entry;
168 for (ref = &list->entries, entry = list->entries;
169 entry != NULL;
170 ref = &entry->next, entry = entry->next) {
171 if (entry->value == value) {
172 *ref = entry->next;
173 recycle_entries(list, entry, entry);
174 break;
179 static void
180 clear_list(List *list)
182 if (list->last_entry) {
183 recycle_entries(list, list->entries, list->last_entry);
184 list->entries = NULL;
185 list->last_entry = NULL;
186 list->size = 0;
190 static VALUE
191 array_from_list(List const *list)
193 VALUE ary;
194 Entry *entry;
195 ary = rb_ary_new();
196 for (entry = list->entries; entry; entry = entry->next) {
197 rb_ary_push(ary, entry->value);
199 return ary;
202 static VALUE
203 wake_thread(VALUE thread)
205 return rb_rescue2(rb_thread_wakeup, thread,
206 NULL, Qundef, rb_eThreadError, 0);
209 static VALUE
210 run_thread(VALUE thread)
212 return rb_rescue2(rb_thread_run, thread,
213 NULL, Qundef, rb_eThreadError, 0);
216 static VALUE
217 wake_one(List *list)
219 VALUE waking;
221 waking = Qnil;
222 while (list->entries && !RTEST(waking)) {
223 waking = wake_thread(shift_list(list));
226 return waking;
229 static VALUE
230 wake_all(List *list)
232 while (list->entries) {
233 wake_one(list);
235 return Qnil;
238 static VALUE
239 wait_list_inner(List *list)
241 push_list(list, rb_thread_current());
242 rb_thread_stop();
243 return Qnil;
246 static VALUE
247 wait_list_cleanup(List *list)
249 /* cleanup in case of spurious wakeups */
250 remove_one(list, rb_thread_current());
251 return Qnil;
254 static void
255 wait_list(List *list)
257 rb_ensure(wait_list_inner, (VALUE)list, wait_list_cleanup, (VALUE)list);
260 static void
261 assert_no_survivors(List *waiting, const char *label, void *addr)
263 Entry *entry;
264 for (entry = waiting->entries; entry; entry = entry->next) {
265 if (RTEST(wake_thread(entry->value))) {
266 rb_bug("%s %p freed with live thread(s) waiting", label, addr);
272 * Document-class: Mutex
274 * Mutex implements a simple semaphore that can be used to coordinate access to
275 * shared data from multiple concurrent threads.
277 * Example:
279 * require 'thread'
280 * semaphore = Mutex.new
282 * a = Thread.new {
283 * semaphore.synchronize {
284 * # access shared resource
288 * b = Thread.new {
289 * semaphore.synchronize {
290 * # access shared resource
296 typedef struct _Mutex {
297 VALUE owner;
298 List waiting;
299 } Mutex;
301 static void
302 mark_mutex(Mutex *mutex)
304 rb_gc_mark(mutex->owner);
305 mark_list(&mutex->waiting);
308 static void
309 finalize_mutex(Mutex *mutex)
311 finalize_list(&mutex->waiting);
314 static void
315 free_mutex(Mutex *mutex)
317 assert_no_survivors(&mutex->waiting, "mutex", mutex);
318 finalize_mutex(mutex);
319 xfree(mutex);
322 static void
323 init_mutex(Mutex *mutex)
325 mutex->owner = Qnil;
326 init_list(&mutex->waiting);
330 * Document-method: new
331 * call-seq: Mutex.new
333 * Creates a new Mutex
337 static VALUE
338 rb_mutex_alloc(VALUE klass)
340 Mutex *mutex;
341 mutex = ALLOC(Mutex);
342 init_mutex(mutex);
343 return Data_Wrap_Struct(klass, mark_mutex, free_mutex, mutex);
347 * Document-method: locked?
348 * call-seq: locked?
350 * Returns +true+ if this lock is currently held by some thread.
354 static VALUE
355 rb_mutex_locked_p(VALUE self)
357 Mutex *mutex;
358 Data_Get_Struct(self, Mutex, mutex);
359 return RTEST(mutex->owner) ? Qtrue : Qfalse;
363 * Document-method: try_lock
364 * call-seq: try_lock
366 * Attempts to obtain the lock and returns immediately. Returns +true+ if the
367 * lock was granted.
371 static VALUE
372 rb_mutex_try_lock(VALUE self)
374 Mutex *mutex;
376 Data_Get_Struct(self, Mutex, mutex);
378 if (RTEST(mutex->owner))
379 return Qfalse;
381 mutex->owner = rb_thread_current();
382 return Qtrue;
386 * Document-method: lock
387 * call-seq: lock
389 * Attempts to grab the lock and waits if it isn't available.
393 static void
394 lock_mutex(Mutex *mutex)
396 VALUE current;
397 current = rb_thread_current();
399 rb_thread_critical = 1;
401 while (RTEST(mutex->owner)) {
402 wait_list(&mutex->waiting);
403 rb_thread_critical = 1;
405 mutex->owner = current;
407 rb_thread_critical = 0;
410 static VALUE
411 rb_mutex_lock(VALUE self)
413 Mutex *mutex;
414 Data_Get_Struct(self, Mutex, mutex);
415 lock_mutex(mutex);
416 return self;
420 * Document-method: unlock
422 * Releases the lock. Returns +nil+ if ref wasn't locked.
426 static VALUE
427 unlock_mutex_inner(Mutex *mutex)
429 VALUE waking;
431 if (!RTEST(mutex->owner)) {
432 return Qundef;
434 mutex->owner = Qnil;
435 waking = wake_one(&mutex->waiting);
437 return waking;
440 static VALUE
441 set_critical(VALUE value)
443 rb_thread_critical = (int)value;
444 return Qundef;
447 static VALUE
448 unlock_mutex(Mutex *mutex)
450 VALUE waking;
452 rb_thread_critical = 1;
453 waking = rb_ensure(unlock_mutex_inner, (VALUE)mutex, set_critical, 0);
455 if (waking == Qundef) {
456 return Qfalse;
459 if (RTEST(waking)) {
460 run_thread(waking);
463 return Qtrue;
466 static VALUE
467 rb_mutex_unlock(VALUE self)
469 Mutex *mutex;
470 Data_Get_Struct(self, Mutex, mutex);
472 if (RTEST(unlock_mutex(mutex))) {
473 return self;
474 } else {
475 return Qnil;
480 * Document-method: exclusive_unlock
481 * call-seq: exclusive_unlock { ... }
483 * If the mutex is locked, unlocks the mutex, wakes one waiting thread, and
484 * yields in a critical section.
488 static VALUE
489 rb_mutex_exclusive_unlock_inner(Mutex *mutex)
491 VALUE waking;
492 waking = unlock_mutex_inner(mutex);
493 rb_yield(Qundef);
494 return waking;
497 static VALUE
498 rb_mutex_exclusive_unlock(VALUE self)
500 Mutex *mutex;
501 VALUE waking;
502 Data_Get_Struct(self, Mutex, mutex);
504 rb_thread_critical = 1;
505 waking = rb_ensure(rb_mutex_exclusive_unlock_inner, (VALUE)mutex, set_critical, 0);
507 if (waking == Qundef) {
508 return Qnil;
511 if (RTEST(waking)) {
512 run_thread(waking);
515 return self;
519 * Document-method: synchronize
520 * call-seq: synchronize { ... }
522 * Obtains a lock, runs the block, and releases the lock when the block
523 * completes. See the example under Mutex.
527 static VALUE
528 rb_mutex_synchronize(VALUE self)
530 rb_mutex_lock(self);
531 return rb_ensure(rb_yield, Qundef, rb_mutex_unlock, self);
535 * Document-class: ConditionVariable
537 * ConditionVariable objects augment class Mutex. Using condition variables,
538 * it is possible to suspend while in the middle of a critical section until a
539 * resource becomes available.
541 * Example:
543 * require 'thread'
545 * mutex = Mutex.new
546 * resource = ConditionVariable.new
548 * a = Thread.new {
549 * mutex.synchronize {
550 * # Thread 'a' now needs the resource
551 * resource.wait(mutex)
552 * # 'a' can now have the resource
556 * b = Thread.new {
557 * mutex.synchronize {
558 * # Thread 'b' has finished using the resource
559 * resource.signal
565 typedef struct _ConditionVariable {
566 List waiting;
567 } ConditionVariable;
569 static void
570 mark_condvar(ConditionVariable *condvar)
572 mark_list(&condvar->waiting);
575 static void
576 finalize_condvar(ConditionVariable *condvar)
578 finalize_list(&condvar->waiting);
581 static void
582 free_condvar(ConditionVariable *condvar)
584 assert_no_survivors(&condvar->waiting, "condition variable", condvar);
585 finalize_condvar(condvar);
586 xfree(condvar);
589 static void
590 init_condvar(ConditionVariable *condvar)
592 init_list(&condvar->waiting);
596 * Document-method: new
597 * call-seq: ConditionVariable.new
599 * Creates a new ConditionVariable
603 static VALUE
604 rb_condvar_alloc(VALUE klass)
606 ConditionVariable *condvar;
608 condvar = ALLOC(ConditionVariable);
609 init_condvar(condvar);
611 return Data_Wrap_Struct(klass, mark_condvar, free_condvar, condvar);
615 * Document-method: wait
616 * call-seq: wait
618 * Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
622 static void
623 wait_condvar(ConditionVariable *condvar, Mutex *mutex)
625 rb_thread_critical = 1;
626 if (!RTEST(mutex->owner)) {
627 rb_thread_critical = 0;
628 return;
630 if (mutex->owner != rb_thread_current()) {
631 rb_thread_critical = 0;
632 rb_raise(rb_eThreadError, "Not owner");
634 mutex->owner = Qnil;
635 wait_list(&condvar->waiting);
637 lock_mutex(mutex);
640 static VALUE
641 legacy_exclusive_unlock(VALUE mutex)
643 return rb_funcall(mutex, rb_intern("exclusive_unlock"), 0);
646 typedef struct {
647 ConditionVariable *condvar;
648 VALUE mutex;
649 } legacy_wait_args;
651 static VALUE
652 legacy_wait(VALUE unused, legacy_wait_args *args)
654 wait_list(&args->condvar->waiting);
655 rb_funcall(args->mutex, rb_intern("lock"), 0);
656 return Qnil;
659 static VALUE
660 rb_condvar_wait(VALUE self, VALUE mutex_v)
662 ConditionVariable *condvar;
663 Data_Get_Struct(self, ConditionVariable, condvar);
665 if (CLASS_OF(mutex_v) != rb_cMutex) {
666 /* interoperate with legacy mutex */
667 legacy_wait_args args;
668 args.condvar = condvar;
669 args.mutex = mutex_v;
670 rb_iterate(legacy_exclusive_unlock, mutex_v, legacy_wait, (VALUE)&args);
671 } else {
672 Mutex *mutex;
673 Data_Get_Struct(mutex_v, Mutex, mutex);
674 wait_condvar(condvar, mutex);
677 return self;
681 * Document-method: broadcast
682 * call-seq: broadcast
684 * Wakes up all threads waiting for this condition.
688 static VALUE
689 rb_condvar_broadcast(VALUE self)
691 ConditionVariable *condvar;
693 Data_Get_Struct(self, ConditionVariable, condvar);
695 rb_thread_critical = 1;
696 rb_ensure(wake_all, (VALUE)&condvar->waiting, set_critical, 0);
697 rb_thread_schedule();
699 return self;
703 * Document-method: signal
704 * call-seq: signal
706 * Wakes up the first thread in line waiting for this condition.
710 static void
711 signal_condvar(ConditionVariable *condvar)
713 VALUE waking;
714 rb_thread_critical = 1;
715 waking = rb_ensure(wake_one, (VALUE)&condvar->waiting, set_critical, 0);
716 if (RTEST(waking)) {
717 run_thread(waking);
721 static VALUE
722 rb_condvar_signal(VALUE self)
724 ConditionVariable *condvar;
725 Data_Get_Struct(self, ConditionVariable, condvar);
726 signal_condvar(condvar);
727 return self;
731 * Document-class: Queue
733 * This class provides a way to synchronize communication between threads.
735 * Example:
737 * require 'thread'
739 * queue = Queue.new
741 * producer = Thread.new do
742 * 5.times do |i|
743 * sleep rand(i) # simulate expense
744 * queue << i
745 * puts "#{i} produced"
746 * end
747 * end
749 * consumer = Thread.new do
750 * 5.times do |i|
751 * value = queue.pop
752 * sleep rand(i/2) # simulate expense
753 * puts "consumed #{value}"
754 * end
755 * end
757 * consumer.join
761 typedef struct _Queue {
762 Mutex mutex;
763 ConditionVariable value_available;
764 ConditionVariable space_available;
765 List values;
766 unsigned long capacity;
767 } Queue;
769 static void
770 mark_queue(Queue *queue)
772 mark_mutex(&queue->mutex);
773 mark_condvar(&queue->value_available);
774 mark_condvar(&queue->space_available);
775 mark_list(&queue->values);
778 static void
779 finalize_queue(Queue *queue)
781 finalize_mutex(&queue->mutex);
782 finalize_condvar(&queue->value_available);
783 finalize_condvar(&queue->space_available);
784 finalize_list(&queue->values);
787 static void
788 free_queue(Queue *queue)
790 assert_no_survivors(&queue->mutex.waiting, "queue", queue);
791 assert_no_survivors(&queue->space_available.waiting, "queue", queue);
792 assert_no_survivors(&queue->value_available.waiting, "queue", queue);
793 finalize_queue(queue);
794 xfree(queue);
797 static void
798 init_queue(Queue *queue)
800 init_mutex(&queue->mutex);
801 init_condvar(&queue->value_available);
802 init_condvar(&queue->space_available);
803 init_list(&queue->values);
804 queue->capacity = 0;
808 * Document-method: new
809 * call-seq: new
811 * Creates a new queue.
815 static VALUE
816 rb_queue_alloc(VALUE klass)
818 Queue *queue;
819 queue = ALLOC(Queue);
820 init_queue(queue);
821 return Data_Wrap_Struct(klass, mark_queue, free_queue, queue);
824 static VALUE
825 rb_queue_marshal_load(VALUE self, VALUE data)
827 Queue *queue;
828 VALUE array;
829 Data_Get_Struct(self, Queue, queue);
831 array = rb_marshal_load(data);
832 if (TYPE(array) != T_ARRAY) {
833 rb_raise(rb_eRuntimeError, "expected Array of queue data");
835 if (RARRAY(array)->len < 1) {
836 rb_raise(rb_eRuntimeError, "missing capacity value");
838 queue->capacity = NUM2ULONG(rb_ary_shift(array));
839 push_multiple_list(&queue->values, RARRAY(array)->ptr, (unsigned)RARRAY(array)->len);
841 return self;
844 static VALUE
845 rb_queue_marshal_dump(VALUE self)
847 Queue *queue;
848 VALUE array;
849 Data_Get_Struct(self, Queue, queue);
851 array = array_from_list(&queue->values);
852 rb_ary_unshift(array, ULONG2NUM(queue->capacity));
853 return rb_marshal_dump(array, Qnil);
857 * Document-method: clear
858 * call-seq: clear
860 * Removes all objects from the queue.
864 static VALUE
865 rb_queue_clear(VALUE self)
867 Queue *queue;
868 Data_Get_Struct(self, Queue, queue);
870 lock_mutex(&queue->mutex);
871 clear_list(&queue->values);
872 signal_condvar(&queue->space_available);
873 unlock_mutex(&queue->mutex);
875 return self;
879 * Document-method: empty?
880 * call-seq: empty?
882 * Returns +true+ if the queue is empty.
886 static VALUE
887 rb_queue_empty_p(VALUE self)
889 Queue *queue;
890 VALUE result;
891 Data_Get_Struct(self, Queue, queue);
893 lock_mutex(&queue->mutex);
894 result = queue->values.size == 0 ? Qtrue : Qfalse;
895 unlock_mutex(&queue->mutex);
897 return result;
901 * Document-method: length
902 * call-seq: length
904 * Returns the length of the queue.
908 static VALUE
909 rb_queue_length(VALUE self)
911 Queue *queue;
912 VALUE result;
913 Data_Get_Struct(self, Queue, queue);
915 lock_mutex(&queue->mutex);
916 result = ULONG2NUM(queue->values.size);
917 unlock_mutex(&queue->mutex);
919 return result;
923 * Document-method: num_waiting
924 * call-seq: num_waiting
926 * Returns the number of threads waiting on the queue.
930 static VALUE
931 rb_queue_num_waiting(VALUE self)
933 Queue *queue;
934 VALUE result;
935 Data_Get_Struct(self, Queue, queue);
937 lock_mutex(&queue->mutex);
938 result = ULONG2NUM(queue->value_available.waiting.size +
939 queue->space_available.waiting.size);
940 unlock_mutex(&queue->mutex);
942 return result;
946 * Document-method: pop
947 * call_seq: pop(non_block=false)
949 * Retrieves data from the queue. If the queue is empty, the calling thread is
950 * suspended until data is pushed onto the queue. If +non_block+ is true, the
951 * thread isn't suspended, and an exception is raised.
955 static VALUE
956 rb_queue_pop(int argc, VALUE *argv, VALUE self)
958 Queue *queue;
959 int should_block;
960 VALUE result;
961 Data_Get_Struct(self, Queue, queue);
963 if (argc == 0) {
964 should_block = 1;
965 } else if (argc == 1) {
966 should_block = !RTEST(argv[0]);
967 } else {
968 rb_raise(rb_eArgError, "wrong number of arguments (%d for 1)", argc);
971 lock_mutex(&queue->mutex);
972 if (!queue->values.entries && !should_block) {
973 unlock_mutex(&queue->mutex);
974 rb_raise(rb_eThreadError, "queue empty");
977 while (!queue->values.entries) {
978 wait_condvar(&queue->value_available, &queue->mutex);
981 result = shift_list(&queue->values);
982 if (queue->capacity && queue->values.size < queue->capacity) {
983 signal_condvar(&queue->space_available);
985 unlock_mutex(&queue->mutex);
987 return result;
991 * Document-method: push
992 * call-seq: push(obj)
994 * Pushes +obj+ to the queue.
998 static VALUE
999 rb_queue_push(VALUE self, VALUE value)
1001 Queue *queue;
1002 Data_Get_Struct(self, Queue, queue);
1004 lock_mutex(&queue->mutex);
1005 while (queue->capacity && queue->values.size >= queue->capacity) {
1006 wait_condvar(&queue->space_available, &queue->mutex);
1008 push_list(&queue->values, value);
1009 signal_condvar(&queue->value_available);
1010 unlock_mutex(&queue->mutex);
1012 return self;
1016 * Document-class: SizedQueue
1018 * This class represents queues of specified size capacity. The push operation
1019 * may be blocked if the capacity is full.
1021 * See Queue for an example of how a SizedQueue works.
1026 * Document-method: new
1027 * call-seq: new
1029 * Creates a fixed-length queue with a maximum size of +max+.
1034 * Document-method: max
1035 * call-seq: max
1037 * Returns the maximum size of the queue.
1041 static VALUE
1042 rb_sized_queue_max(VALUE self)
1044 Queue *queue;
1045 VALUE result;
1046 Data_Get_Struct(self, Queue, queue);
1048 lock_mutex(&queue->mutex);
1049 result = ULONG2NUM(queue->capacity);
1050 unlock_mutex(&queue->mutex);
1052 return result;
1056 * Document-method: max=
1057 * call-seq: max=(size)
1059 * Sets the maximum size of the queue.
1063 static VALUE
1064 rb_sized_queue_max_set(VALUE self, VALUE value)
1066 Queue *queue;
1067 unsigned long new_capacity;
1068 unsigned long difference;
1069 Data_Get_Struct(self, Queue, queue);
1071 new_capacity = NUM2ULONG(value);
1073 if (new_capacity < 1) {
1074 rb_raise(rb_eArgError, "value must be positive");
1077 lock_mutex(&queue->mutex);
1078 if (queue->capacity && new_capacity > queue->capacity) {
1079 difference = new_capacity - queue->capacity;
1080 } else {
1081 difference = 0;
1083 queue->capacity = new_capacity;
1084 for (; difference > 0; --difference) {
1085 signal_condvar(&queue->space_available);
1087 unlock_mutex(&queue->mutex);
1089 return self;
1093 * Document-method: push
1094 * call-seq: push(obj)
1096 * Pushes +obj+ to the queue. If there is no space left in the queue, waits
1097 * until space becomes available.
1102 * Document-method: pop
1103 * call-seq: pop(non_block=false)
1105 * Retrieves data from the queue and runs a waiting thread, if any.
1109 /* for marshalling mutexes and condvars */
1111 static VALUE
1112 dummy_load(VALUE self, VALUE string)
1114 return Qnil;
1117 static VALUE
1118 dummy_dump(VALUE self)
1120 return rb_str_new2("");
1123 void
1124 Init_thread(void)
1126 rb_define_singleton_method(rb_cThread, "exclusive", rb_thread_exclusive, 0);
1128 rb_cMutex = rb_define_class("Mutex", rb_cObject);
1129 rb_define_alloc_func(rb_cMutex, rb_mutex_alloc);
1130 rb_define_method(rb_cMutex, "marshal_load", dummy_load, 1);
1131 rb_define_method(rb_cMutex, "marshal_dump", dummy_dump, 0);
1132 rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0);
1133 rb_define_method(rb_cMutex, "try_lock", rb_mutex_try_lock, 0);
1134 rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0);
1135 rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0);
1136 rb_define_method(rb_cMutex, "exclusive_unlock", rb_mutex_exclusive_unlock, 0);
1137 rb_define_method(rb_cMutex, "synchronize", rb_mutex_synchronize, 0);
1139 rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject);
1140 rb_define_alloc_func(rb_cConditionVariable, rb_condvar_alloc);
1141 rb_define_method(rb_cConditionVariable, "marshal_load", dummy_load, 1);
1142 rb_define_method(rb_cConditionVariable, "marshal_dump", dummy_dump, 0);
1143 rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, 1);
1144 rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0);
1145 rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0);
1147 rb_cQueue = rb_define_class("Queue", rb_cObject);
1148 rb_define_alloc_func(rb_cQueue, rb_queue_alloc);
1149 rb_define_method(rb_cQueue, "marshal_load", rb_queue_marshal_load, 1);
1150 rb_define_method(rb_cQueue, "marshal_dump", rb_queue_marshal_dump, 0);
1151 rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
1152 rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
1153 rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
1154 rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
1155 rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
1156 rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
1157 rb_alias(rb_cQueue, rb_intern("enq"), rb_intern("push"));
1158 rb_alias(rb_cQueue, rb_intern("<<"), rb_intern("push"));
1159 rb_alias(rb_cQueue, rb_intern("deq"), rb_intern("pop"));
1160 rb_alias(rb_cQueue, rb_intern("shift"), rb_intern("pop"));
1161 rb_alias(rb_cQueue, rb_intern("size"), rb_intern("length"));
1163 rb_cSizedQueue = rb_define_class("SizedQueue", rb_cQueue);
1164 rb_define_method(rb_cSizedQueue, "initialize", rb_sized_queue_max_set, 1);
1165 rb_define_method(rb_cSizedQueue, "num_waiting", rb_queue_num_waiting, 0);
1166 rb_define_method(rb_cSizedQueue, "pop", rb_queue_pop, -1);
1167 rb_define_method(rb_cSizedQueue, "push", rb_queue_push, 1);
1168 rb_define_method(rb_cSizedQueue, "max", rb_sized_queue_max, 0);
1169 rb_define_method(rb_cSizedQueue, "max=", rb_sized_queue_max_set, 1);
1170 rb_alias(rb_cSizedQueue, rb_intern("enq"), rb_intern("push"));
1171 rb_alias(rb_cSizedQueue, rb_intern("<<"), rb_intern("push"));
1172 rb_alias(rb_cSizedQueue, rb_intern("deq"), rb_intern("pop"));
1173 rb_alias(rb_cSizedQueue, rb_intern("shift"), rb_intern("pop"));