Fix up Rubinius specific library specs.
[rbx.git] / lib / actor.rb
blob146c416abfd4963ac661b636acda51761256cbf1
1 # actor.rb - implementation of the actor model
3 # Copyright 2007-2008  MenTaLguY <mental@rydia.net>
5 # All rights reserved.
6
7 # Redistribution and use in source and binary forms, with or without 
8 # modification, are permitted provided that the following conditions are met:
9
10 # * Redistributions of source code must retain the above copyright notice,
11 #   thi slist of conditions and the following disclaimer.
12 # * Redistributions in binary form must reproduce the above copyright notice
13 #   this list of conditions and the following disclaimer in the documentatio
14 #   and/or other materials provided with the distribution.
15 # * Neither the name of the Evan Phoenix nor the names of its contributors 
16 #   may be used to endorse or promote products derived from this software 
17 #   without specific prior written permission.
18
19 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
20 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
21 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 # POSSIBILITY OF SUCH DAMAGE.
31 class Actor
32   class DeadActorError < RuntimeError
33     attr_reader :actor
34     attr_reader :reason
35     def initialize(actor, reason)
36       super(reason)
37       @actor = actor
38       @reason = reason
39     end
40   end
42   ANY = Object.new
43   def ANY.===(other)
44     true
45   end
47   class << self
48     alias_method :private_new, :new
49     private :private_new
51     @@registered_lock = Channel.new
52     @@registered = {}
53     @@registered_lock << nil
54   
55     def current
56       Thread.current[:__current_actor__] ||= private_new
57     end
59     # Spawn a new Actor that will run in its own thread
60     def spawn(*args, &block)
61       raise ArgumentError, "no block given" unless block
62       spawned = Channel.new
63       Thread.new do
64         private_new do |actor|
65           Thread.current[:__current_actor__] = actor
66           spawned << actor
67           block.call *args
68         end
69       end
70       spawned.receive
71     end
72     alias_method :new, :spawn
74     # Atomically spawn an actor and link it to the current actor
75     def spawn_link(*args, &block)
76       current = self.current
77       link_complete = Channel.new
78       spawn do
79         begin
80           Actor.link(current)
81         ensure
82           link_complete << Actor.current
83         end
84         block.call *args
85       end
86       link_complete.receive
87     end
89     # Polls for exit notifications
90     def check_for_interrupt
91       current._check_for_interrupt
92       self
93     end
95     # Waits until a matching message is received in the current actor's
96     # mailbox, and executes the appropriate action.  May be interrupted by
97     # exit notifications.
98     def receive #:yields: filter
99       filter = Filter.new
100       if block_given?
101         yield filter
102       else
103         filter.when(ANY) { |m| m }
104       end
105       current._receive(filter)
106     end
108     # Send a "fake" exit notification to another actor, as if the current
109     # actor had exited with +reason+
110     def send_exit(recipient, reason)
111       recipient.notify_exited(current, reason)
112       self
113     end
114     
115     # Link the current Actor to another one.
116     def link(actor)
117       current = self.current
118       current.notify_link actor
119       actor.notify_link current
120       self
121     end
122     
123     # Unlink the current Actor from another one
124     def unlink(actor)
125       current = self.current
126       current.notify_unlink actor
127       actor.notify_unlink current
128       self
129     end
131     # Actors trapping exit do not die when an error occurs in an Actor they
132     # are linked to.  Instead the exit message is sent to their regular
133     # mailbox in the form [:exit, actor, reason].  This allows certain
134     # Actors to supervise sets of others and restart them in the event
135     # of an error.  Setting the trap flag may be interrupted by pending
136     # exit notifications.
137     #
138     def trap_exit=(value)
139       current._trap_exit = value
140       self
141     end
143     # Is the Actor trapping exit?
144     def trap_exit
145       current._trap_exit
146     end
147     alias_method :trap_exit?, :trap_exit
149     # Lookup a locally named service
150     def lookup(name)
151       raise ArgumentError, "name must be a symbol" unless Symbol === name
152       @@registered_lock.receive
153       begin
154         @@registered[name]
155       ensure
156         @@registered_lock << nil
157       end
158     end
159     alias_method :[], :lookup
161     # Register an Actor locally as a named service
162     def register(name, actor)
163       raise ArgumentError, "name must be a symbol" unless Symbol === name
164       unless actor.nil? or actor.is_a?(Actor)
165         raise ArgumentError, "only actors may be registered"
166       end
168       @@registered_lock.receive
169       begin
170         if actor.nil?
171           @@registered.delete(name)
172         else
173           @@registered[name] = actor
174         end
175       ensure
176         @@registered_lock << nil
177       end
178     end
179     alias_method :[]=, :register
181     def _unregister(actor) #:nodoc:
182       @@registered_lock.receive
183       begin
184         @@registered.delete_if { |n, a| actor.equal? a }
185       ensure
186         @@registered_lock << nil
187       end
188     end
189   end
191   def initialize
192     @lock = Channel.new
194     @filter = nil
195     @ready = Channel.new
196     @action = nil
197     @message = nil
199     @mailbox = []
200     @interrupts = []
201     @links = []
202     @alive = true
203     @exit_reason = nil
204     @trap_exit = false
205     @thread = Thread.current
207     @lock << nil
209     if block_given?
210       watchdog { yield self }
211     else
212       Thread.new { watchdog { @thread.join } }
213     end
214   end
216   def send(message)
217     @lock.receive
218     begin
219       return self unless @alive
220       if @filter
221         @action = @filter.action_for(message)
222         if @action
223           @filter = nil
224           @message = message
225           @ready << nil
226         else
227           @mailbox << message
228         end
229       else
230         @mailbox << message
231       end
232     ensure
233       @lock << nil
234     end
235     self
236   end
237   alias_method :<<, :send
239   def _check_for_interrupt #:nodoc:
240     check_thread
241     @lock.receive
242     begin
243       raise @interrupts.shift unless @interrupts.empty?
244     ensure
245       @lock << nil
246     end
247   end
249   def _receive(filter) #:nodoc:
250     check_thread
252     action = nil
253     message = nil
254     timed_out = false
255     @lock.receive
256     begin
257       raise @interrupts.shift unless @interrupts.empty?
259       for i in 0...(@mailbox.size)
260         message = @mailbox[i]
261         action = filter.action_for(message)
262         if action
263           @mailbox.delete_at(i)
264           break
265         end
266       end
268       unless action
269         if filter.timeout?
270           timeout_id = Scheduler.send_in_seconds(@ready, filter.timeout, true)
271         else
272           timeout_id = nil
273         end
274         @filter = filter
275         @lock << nil
276         begin
277           timed_out = @ready.receive
278         ensure
279           @lock.receive
280           if timeout_id
281             Scheduler.cancel(timeout_id)
282             @ready << nil
283             @ready = Channel.new if @ready.receive
284           end
285         end
287         if not timed_out and @interrupts.empty?
288           action = @action
289           message = @message
290         else
291           @mailbox << @message if @action
292         end
293         @action = nil
294         @message = nil
296         raise @interrupts.shift unless @interrupts.empty?
297       end
298     ensure
299       @lock << nil
300     end
302     if timed_out
303       filter.timeout_action.call
304     else
305       action.call message
306     end
307   end
309   # Notify this actor that it's now linked to the given one; this is not
310   # intended to be used directly except by actor implementations.  Most
311   # users will want to use Actor.link instead.
312   #
313   def notify_link(actor)
314     @lock.receive
315     alive = nil
316     exit_reason = nil
317     begin
318       alive = @alive
319       exit_reason = @exit_reason
320       @links << actor if alive and not @links.include? actor
321     ensure
322       @lock << nil
323     end
324     actor.notify_exited(self, exit_reason) unless alive
325     self
326   end
327   
328   # Notify this actor that it's now unlinked from the given one; this is
329   # not intended to be used directly except by actor implementations.  Most
330   # users will want to use Actor.unlink instead.
331   #
332   def notify_unlink(actor)
333     @lock.receive
334     begin
335       return self unless @alive
336       @links.delete(actor)
337     ensure
338       @lock << nil
339     end
340     self
341   end
342   
343   # Notify this actor that one of the Actors it's linked to has exited;
344   # this is not intended to be used directly except by actor implementations.
345   # Most users will want to use Actor.send_exit instead.
346   #
347   def notify_exited(actor, reason)
348     @lock.receive
349     begin
350       return self unless @alive
351       @links.delete(actor)
352       if @trap_exit
353         send [:exit, actor, reason]
354       elsif reason
355         @interrupts << DeadActorError.new(actor, reason)
356         if @filter
357           @filter = nil
358           @ready << nil
359         end
360       end
361     ensure
362       @lock << nil
363     end
364     self
365   end
367   def watchdog
368     reason = nil
369     begin
370       yield
371     rescue Exception => reason
372     ensure
373       links = nil
374       Actor._unregister(self)
375       @lock.receive
376       begin
377         @alive = false
378         @mailbox = nil
379         @interrupts = nil
380         @exit_reason = reason
381         links = @links
382         @links = nil
383       ensure
384         @lock << nil
385       end
386       links.each do |actor|
387         begin
388           actor.notify_exited(self, reason)
389         rescue Exception
390         end
391       end
392     end
393   end
394   private :watchdog
396   def check_thread
397     unless Thread.current == @thread
398       raise ThreadError, "illegal cross-actor call"
399     end
400   end
401   private :check_thread
402   
403   def _trap_exit=(value) #:nodoc:
404     check_thread
405     @lock.receive
406     begin
407       raise @interrupts.shift unless @interrupts.empty?
408       @trap_exit = !!value
409     ensure
410       @lock << nil
411     end
412   end
413   
414   def _trap_exit #:nodoc:
415     check_thread
416     @lock.receive
417     begin
418       @trap_exit
419     ensure
420       @lock << nil
421     end
422   end
425 require 'actor/filter'