Fix up Rubinius specific library specs.
[rbx.git] / lib / thread.rb
blob7074795a6de883433522cd8a8c660f0102bd2279
2 #               thread.rb - thread support classes
3 #                       $Date: 2006-12-31 07:02:22 -0800 (Sun, 31 Dec 2006) $
4 #                       by Yukihiro Matsumoto <matz@netlab.co.jp>
6 # Copyright (C) 2001  Yukihiro Matsumoto
7 # Copyright (C) 2000  Network Applied Communication Laboratory, Inc.
8 # Copyright (C) 2000  Information-technology Promotion Agency, Japan
11 unless defined? Thread
12   raise "Thread not available for this ruby interpreter"
13 end
15 unless defined? ThreadError
16   class ThreadError < StandardError
17   end
18 end
20 if $DEBUG
21   Thread.abort_on_exception = true
22 end
24 class Thread
25   #
26   # Wraps a block in Thread.critical, restoring the original value upon exit
27   # from the critical section.
28   #
29   def Thread.exclusive
30     _old = Thread.critical
31     begin
32       Thread.critical = true
33       return yield
34     ensure
35       Thread.critical = _old
36     end
37   end
38 end
40 class Mutex
41   def initialize
42     @lock = Channel.new
43     @owner = nil
44     @waiters = []
45     @lock << nil
46   end
48   def locked?
49     @lock.receive
50     begin
51       !!@owner
52     ensure
53       @lock << nil
54     end
55   end
57   def try_lock
58     @lock.receive
59     begin
60       if @owner
61         false
62       else
63         @owner = Thread.current
64         true
65       end
66     ensure
67       @lock << nil
68     end
69   end
71   def lock
72     @lock.receive
73     begin
74       while @owner
75         wchan = Channel.new
76         @waiters.push wchan
77         @lock << nil
78         wchan.receive
79         @lock.receive
80       end
81       @owner = Thread.current
82       self
83     ensure
84       @lock << nil
85     end
86   end
88   def unlock
89     @lock.receive
90     begin
91       raise ThreadError, "Not owner" unless @owner == Thread.current
92       @owner = nil
93       @waiters.shift << nil unless @waiters.empty?
94       self
95     ensure
96       @lock << nil
97     end
98   end
100   def synchronize
101     lock
102     begin
103       yield
104     ensure
105       unlock
106     end
107   end
111 # ConditionVariable objects augment class Mutex. Using condition variables,
112 # it is possible to suspend while in the middle of a critical section until a
113 # resource becomes available.
115 # Example:
117 #   require 'thread'
119 #   mutex = Mutex.new
120 #   resource = ConditionVariable.new
121 #   
122 #   a = Thread.new {
123 #     mutex.synchronize {
124 #       # Thread 'a' now needs the resource
125 #       resource.wait(mutex)
126 #       # 'a' can now have the resource
127 #     }
128 #   }
129 #   
130 #   b = Thread.new {
131 #     mutex.synchronize {
132 #       # Thread 'b' has finished using the resource
133 #       resource.signal
134 #     }
135 #   }
137 class ConditionVariable
138   #
139   # Creates a new ConditionVariable
140   #
141   def initialize
142     @lock = Channel.new
143     @waiters = []
144     @lock << nil
145   end
146   
147   #
148   # Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
149   #
150   def wait(mutex, timeout=nil)
151     @lock.receive
152     begin
153       wchan = Channel.new
154       mutex.unlock
155       @waiters.push wchan
156       @lock << nil
157       if timeout
158         timeout_ms = (timeout*1000000).to_i
159         timeout_id = Scheduler.send_in_microseconds(wchan, timeout_ms, nil)
160       else
161         timeout_id = nil
162       end
163       signaled = wchan.receive
164       Scheduler.cancel(timeout_id) if timeout
165       mutex.lock
166       @lock.receive
167       unless signaled or @waiters.delete wchan
168         # we timed out, but got signaled afterwards (e.g. while waiting to
169         # acquire @lock), so pass that signal on to the next waiter
170         @waiters.shift << true unless @waiters.empty?
171       end
172       if timeout
173         !!signaled
174       else
175         nil
176       end
177     ensure
178       @lock << nil
179     end
180   end
181   
182   #
183   # Wakes up the first thread in line waiting for this lock.
184   #
185   def signal
186     @lock.receive
187     @waiters.shift << true unless @waiters.empty?
188     @lock << nil
189     nil
190   end
192   #
193   # Wakes up all threads waiting for this lock.
194   #
195   def broadcast
196     @lock.receive
197     @waiters.shift << true until @waiters.empty?
198     @lock << nil
199     nil
200   end
204 # This class provides a way to synchronize communication between threads.
206 # Example:
208 #   require 'thread'
209 #   
210 #   queue = Queue.new
211 #   
212 #   producer = Thread.new do
213 #     5.times do |i|
214 #       sleep rand(i) # simulate expense
215 #       queue << i
216 #       puts "#{i} produced"
217 #     end
218 #   end
219 #   
220 #   consumer = Thread.new do
221 #     5.times do |i|
222 #       value = queue.pop
223 #       sleep rand(i/2) # simulate expense
224 #       puts "consumed #{value}"
225 #     end
226 #   end
227 #   
228 #   consumer.join
230 class Queue
231   #
232   # Creates a new queue.
233   #
234   def initialize
235     @que = []
236     @que.taint          # enable tainted comunication
237     self.taint
238     @waiting = []
239     @waiting.taint
240     @mutex = Mutex.new
241     @resource = ConditionVariable.new
242   end
244   #
245   # Pushes +obj+ to the queue.
246   #
247   def push(obj)
248     @mutex.synchronize do
249       @que.push obj
250       @resource.signal
251     end
252   end
254   #
255   # Alias of push
256   #
257   alias << push
259   #
260   # Alias of push
261   #
262   alias enq push
264   #
265   # Retrieves data from the queue.  If the queue is empty, the calling thread is
266   # suspended until data is pushed onto the queue.  If +non_block+ is true, the
267   # thread isn't suspended, and an exception is raised.
268   #
269   def pop(non_block=false)
270     while true
271       @mutex.synchronize do
272         #FIXME: some code in net or somewhere violates encapsulation
273         #and demands that a waiting queue exist for Queue, as a result
274         #we have to do a linear search here to remove the current Thread.
275         @waiting.delete(Thread.current)
276         if @que.empty?
277           raise ThreadError, "queue empty" if non_block
278           @waiting.push Thread.current
279           @resource.wait(@mutex)
280         else
281           retval = @que.shift
282           @resource.signal
283           return retval
284         end
285       end
286     end
287   end
289   #
290   # Alias of pop
291   #
292   alias shift pop
294   #
295   # Alias of pop
296   #
297   alias deq pop
299   #
300   # Returns +true+ if the queue is empty.
301   #
302   def empty?
303     @que.empty?
304   end
306   #
307   # Removes all objects from the queue.
308   #
309   def clear
310     @que.clear
311   end
313   #
314   # Returns the length of the queue.
315   #
316   def length
317     @que.length
318   end
320   #
321   # Alias of length.
322   #
323   alias size length
325   #
326   # Returns the number of threads waiting on the queue.
327   #
328   def num_waiting
329     @waiting.size
330   end
334 # This class represents queues of specified size capacity.  The push operation
335 # may be blocked if the capacity is full.
337 # See Queue for an example of how a SizedQueue works.
339 class SizedQueue < Queue
340   #
341   # Creates a fixed-length queue with a maximum size of +max+.
342   #
343   def initialize(max)
344     raise ArgumentError, "queue size must be positive" unless max > 0
345     @max = max
346     @queue_wait = []
347     @queue_wait.taint           # enable tainted comunication
348     @size_mutex = Mutex.new
349     @sem = ConditionVariable.new
350     super()
351   end
353   #
354   # Returns the maximum size of the queue.
355   #
356   def max
357     @max
358   end
360   #
361   # Sets the maximum size of the queue.
362   #
363   def max=(max)
364     @size_mutex.synchronize do
365       @max = max
366       @sem.broadcast(@size_mutex)
367     end
368     max
369   end
371   #
372   # Pushes +obj+ to the queue.  If there is no space left in the queue, waits
373   # until space becomes available.
374   #
375   def push(obj)
376     while(true)
377       @size_mutex.synchronize do
378         @queue_wait.delete(Thread.current)
379         if(@que.size >= @max)
380           @queue_wait.push Thread.current
381           @sem.wait(@size_mutex)          
382         else
383           return super(obj)
384         end
385       end      
386     end
387   end
389   #
390   # Alias of push
391   #
392   alias << push
394   #
395   # Alias of push
396   #
397   alias enq push
399   #
400   # Retrieves data from the queue and runs a waiting thread, if any.
401   #
402   def pop(*args)
403     retval = super
404     
405     @size_mutex.synchronize do
406       if @que.size < @max
407         @sem.broadcast
408       end      
409     end
410     
411     return retval
412   end
414   #
415   # Alias of pop
416   #
417   alias shift pop
419   #
420   # Alias of pop
421   #
422   alias deq pop
424   #
425   # Returns the number of threads waiting on the queue.
426   #
427   def num_waiting
428     @waiting.size + @queue_wait.size
429   end
432 # Documentation comments:
433 #  - How do you make RDoc inherit documentation from superclass?