* io.c (rb_open_file): encoding in mode string was ignored if perm is
[ruby-svn.git] / lib / thread.rb
blob68eeaf5b05e790528c3b14bdee12495e274ef0c5
2 #               thread.rb - thread support classes
3 #                       by Yukihiro Matsumoto <matz@netlab.co.jp>
5 # Copyright (C) 2001  Yukihiro Matsumoto
6 # Copyright (C) 2000  Network Applied Communication Laboratory, Inc.
7 # Copyright (C) 2000  Information-technology Promotion Agency, Japan
10 unless defined? Thread
11   raise "Thread not available for this ruby interpreter"
12 end
14 unless defined? ThreadError
15   class ThreadError < StandardError
16   end
17 end
19 if $DEBUG
20   Thread.abort_on_exception = true
21 end
23
24 # ConditionVariable objects augment class Mutex. Using condition variables,
25 # it is possible to suspend while in the middle of a critical section until a
26 # resource becomes available.
28 # Example:
30 #   require 'thread'
32 #   mutex = Mutex.new
33 #   resource = ConditionVariable.new
34 #   
35 #   a = Thread.new {
36 #     mutex.synchronize {
37 #       # Thread 'a' now needs the resource
38 #       resource.wait(mutex)
39 #       # 'a' can now have the resource
40 #     }
41 #   }
42 #   
43 #   b = Thread.new {
44 #     mutex.synchronize {
45 #       # Thread 'b' has finished using the resource
46 #       resource.signal
47 #     }
48 #   }
50 class ConditionVariable
51   #
52   # Creates a new ConditionVariable
53   #
54   def initialize
55     @waiters = []
56     @waiters_mutex = Mutex.new
57   end
58   
59   #
60   # Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
61   #
62   def wait(mutex)
63     begin
64       # TODO: mutex should not be used
65       @waiters_mutex.synchronize do
66         @waiters.push(Thread.current)
67       end
68       mutex.sleep
69     end
70   end
71   
72   #
73   # Wakes up the first thread in line waiting for this lock.
74   #
75   def signal
76     begin
77       t = @waiters_mutex.synchronize { @waiters.shift }
78       t.run if t
79     rescue ThreadError
80       retry
81     end
82   end
83     
84   #
85   # Wakes up all threads waiting for this lock.
86   #
87   def broadcast
88     # TODO: imcomplete
89     waiters0 = nil
90     @waiters_mutex.synchronize do
91       waiters0 = @waiters.dup
92       @waiters.clear
93     end
94     for t in waiters0
95       begin
96         t.run
97       rescue ThreadError
98       end
99     end
100   end
104 # This class provides a way to synchronize communication between threads.
106 # Example:
108 #   require 'thread'
109 #   
110 #   queue = Queue.new
111 #   
112 #   producer = Thread.new do
113 #     5.times do |i|
114 #       sleep rand(i) # simulate expense
115 #       queue << i
116 #       puts "#{i} produced"
117 #     end
118 #   end
119 #   
120 #   consumer = Thread.new do
121 #     5.times do |i|
122 #       value = queue.pop
123 #       sleep rand(i/2) # simulate expense
124 #       puts "consumed #{value}"
125 #     end
126 #   end
127 #   
128 #   consumer.join
130 class Queue
131   #
132   # Creates a new queue.
133   #
134   def initialize
135     @que = []
136     @waiting = []
137     @que.taint          # enable tainted comunication
138     @waiting.taint
139     self.taint
140     @mutex = Mutex.new
141   end
143   #
144   # Pushes +obj+ to the queue.
145   #
146   def push(obj)
147     t = nil
148     @mutex.synchronize{
149       @que.push obj
150       begin
151         t = @waiting.shift
152         t.wakeup if t
153       rescue ThreadError
154         retry
155       end
156     }
157     begin
158       t.run if t
159     rescue ThreadError
160     end
161   end
163   #
164   # Alias of push
165   #
166   alias << push
168   #
169   # Alias of push
170   #
171   alias enq push
173   #
174   # Retrieves data from the queue.  If the queue is empty, the calling thread is
175   # suspended until data is pushed onto the queue.  If +non_block+ is true, the
176   # thread isn't suspended, and an exception is raised.
177   #
178   def pop(non_block=false)
179     while true
180       @mutex.synchronize{
181         if @que.empty?
182           raise ThreadError, "queue empty" if non_block
183           @waiting.push Thread.current
184           @mutex.sleep
185         else
186           return @que.shift
187         end
188       }
189     end
190   end
192   #
193   # Alias of pop
194   #
195   alias shift pop
197   #
198   # Alias of pop
199   #
200   alias deq pop
202   #
203   # Returns +true+ if the queue is empty.
204   #
205   def empty?
206     @que.empty?
207   end
209   #
210   # Removes all objects from the queue.
211   #
212   def clear
213     @que.clear
214   end
216   #
217   # Returns the length of the queue.
218   #
219   def length
220     @que.length
221   end
223   #
224   # Alias of length.
225   #
226   alias size length
228   #
229   # Returns the number of threads waiting on the queue.
230   #
231   def num_waiting
232     @waiting.size
233   end
237 # This class represents queues of specified size capacity.  The push operation
238 # may be blocked if the capacity is full.
240 # See Queue for an example of how a SizedQueue works.
242 class SizedQueue < Queue
243   #
244   # Creates a fixed-length queue with a maximum size of +max+.
245   #
246   def initialize(max)
247     raise ArgumentError, "queue size must be positive" unless max > 0
248     @max = max
249     @queue_wait = []
250     @queue_wait.taint           # enable tainted comunication
251     super()
252   end
254   #
255   # Returns the maximum size of the queue.
256   #
257   def max
258     @max
259   end
261   #
262   # Sets the maximum size of the queue.
263   #
264   def max=(max)
265     diff = nil
266     @mutex.synchronize {
267       if max <= @max
268         @max = max
269       else
270         diff = max - @max
271         @max = max
272       end
273     }
274     if diff
275       diff.times do
276         begin
277           t = @queue_wait.shift
278           t.run if t
279         rescue ThreadError
280           retry
281         end
282       end
283     end
284     max
285   end
287   #
288   # Pushes +obj+ to the queue.  If there is no space left in the queue, waits
289   # until space becomes available.
290   #
291   def push(obj)
292     t = nil
293     @mutex.synchronize{
294       while true
295         break if @que.length <= @max
296         @queue_wait.push Thread.current
297         @mutex.sleep
298       end
299     
300       @que.push obj
301       begin
302         t = @waiting.shift
303         t.wakeup if t
304       rescue ThreadError
305         retry
306       end
307     }
308     
309     begin
310       t.run if t
311     rescue ThreadError
312     end
313   end
315   #
316   # Alias of push
317   #
318   alias << push
320   #
321   # Alias of push
322   #
323   alias enq push
325   #
326   # Retrieves data from the queue and runs a waiting thread, if any.
327   #
328   def pop(*args)
329     retval = super
330     t = nil
331     @mutex.synchronize {
332       if @que.length < @max
333         begin
334           t = @queue_wait.shift
335           t.wakeup if t
336         rescue ThreadError
337           retry
338         end
339       end
340     }
341     begin
342       t.run if t
343     rescue ThreadError
344     end
345     retval
346   end
348   #
349   # Alias of pop
350   #
351   alias shift pop
353   #
354   # Alias of pop
355   #
356   alias deq pop
358   #
359   # Returns the number of threads waiting on the queue.
360   #
361   def num_waiting
362     @waiting.size + @queue_wait.size
363   end
366 # Documentation comments:
367 #  - How do you make RDoc inherit documentation from superclass?