2 # thread.rb - thread support classes
3 # $Date: 2006-12-31 07:02:22 -0800 (Sun, 31 Dec 2006) $
4 # by Yukihiro Matsumoto <matz@netlab.co.jp>
6 # Copyright (C) 2001 Yukihiro Matsumoto
7 # Copyright (C) 2000 Network Applied Communication Laboratory, Inc.
8 # Copyright (C) 2000 Information-technology Promotion Agency, Japan
11 unless defined? Thread
12 raise "Thread not available for this ruby interpreter"
15 unless defined? ThreadError
16 class ThreadError < StandardError
21 Thread.abort_on_exception = true
26 # Wraps a block in Thread.critical, restoring the original value upon exit
27 # from the critical section.
30 _old = Thread.critical
32 Thread.critical = true
35 Thread.critical = _old
63 @owner = Thread.current
81 @owner = Thread.current
91 raise ThreadError, "Not owner" unless @owner == Thread.current
93 @waiters.shift << nil unless @waiters.empty?
111 # ConditionVariable objects augment class Mutex. Using condition variables,
112 # it is possible to suspend while in the middle of a critical section until a
113 # resource becomes available.
120 # resource = ConditionVariable.new
123 # mutex.synchronize {
124 # # Thread 'a' now needs the resource
125 # resource.wait(mutex)
126 # # 'a' can now have the resource
131 # mutex.synchronize {
132 # # Thread 'b' has finished using the resource
137 class ConditionVariable
139 # Creates a new ConditionVariable
148 # Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
150 def wait(mutex, timeout=nil)
158 timeout_ms = (timeout*1000000).to_i
159 timeout_id = Scheduler.send_in_microseconds(wchan, timeout_ms, nil)
163 signaled = wchan.receive
164 Scheduler.cancel(timeout_id) if timeout
167 unless signaled or @waiters.delete wchan
168 # we timed out, but got signaled afterwards (e.g. while waiting to
169 # acquire @lock), so pass that signal on to the next waiter
170 @waiters.shift << true unless @waiters.empty?
183 # Wakes up the first thread in line waiting on this condition variable.
187 @waiters.shift << true unless @waiters.empty?
193 # Wakes up all threads waiting on this condition variable.
197 @waiters.shift << true until @waiters.empty?
204 # This class provides a way to synchronize communication between threads.
212 # producer = Thread.new do
214 # sleep rand(i) # simulate expense
216 # puts "#{i} produced"
220 # consumer = Thread.new do
223 # sleep rand(i/2) # simulate expense
224 # puts "consumed #{value}"
232 # Creates a new queue.
236 @que.taint # enable tainted communication
241 @resource = ConditionVariable.new
245 # Pushes +obj+ to the queue.
248 @mutex.synchronize do
265 # Retrieves data from the queue. If the queue is empty, the calling thread is
266 # suspended until data is pushed onto the queue. If +non_block+ is true, the
267 # thread isn't suspended, and an exception is raised.
269 def pop(non_block=false)
271 @mutex.synchronize do
272 #FIXME: some code in net or somewhere violates encapsulation
273 #and demands that a waiting queue exist for Queue, as a result
274 #we have to do a linear search here to remove the current Thread.
275 @waiting.delete(Thread.current)
277 raise ThreadError, "queue empty" if non_block
278 @waiting.push Thread.current
279 @resource.wait(@mutex)
300 # Returns +true+ if the queue is empty.
307 # Removes all objects from the queue.
314 # Returns the length of the queue.
326 # Returns the number of threads waiting on the queue.
334 # This class represents queues with a specified maximum capacity. The push
335 # operation blocks while the queue is full.
337 # See Queue for an example of how a SizedQueue works.
339 class SizedQueue < Queue
341 # Creates a fixed-length queue with a maximum size of +max+.
344 raise ArgumentError, "queue size must be positive" unless max > 0
347 @queue_wait.taint # enable tainted communication
348 @size_mutex = Mutex.new
349 @sem = ConditionVariable.new
354 # Returns the maximum size of the queue.
361 # Sets the maximum size of the queue.
364 @size_mutex.synchronize do
366 @sem.broadcast(@size_mutex)
372 # Pushes +obj+ to the queue. If there is no space left in the queue, waits
373 # until space becomes available.
377 @size_mutex.synchronize do
378 @queue_wait.delete(Thread.current)
379 if(@que.size >= @max)
380 @queue_wait.push Thread.current
381 @sem.wait(@size_mutex)
400 # Retrieves data from the queue and runs a waiting thread, if any.
405 @size_mutex.synchronize do
425 # Returns the number of threads waiting on the queue.
428 @waiting.size + @queue_wait.size
432 # Documentation comments:
433 # - How do you make RDoc inherit documentation from superclass?