1 # actor.rb - implementation of the actor model
3 # Copyright 2007-2008 MenTaLguY <mental@rydia.net>
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are met:
10 # * Redistributions of source code must retain the above copyright notice,
11 # thi slist of conditions and the following disclaimer.
12 # * Redistributions in binary form must reproduce the above copyright notice
13 # this list of conditions and the following disclaimer in the documentatio
14 # and/or other materials provided with the distribution.
15 # * Neither the name of the Evan Phoenix nor the names of its contributors
16 # may be used to endorse or promote products derived from this software
17 # without specific prior written permission.
19 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 # POSSIBILITY OF SUCH DAMAGE.
32 class DeadActorError < RuntimeError
35 def initialize(actor, reason)
48 alias_method :private_new, :new
51 @@registered_lock = Channel.new
53 @@registered_lock << nil
56 Thread.current[:__current_actor__] ||= private_new
59 # Spawn a new Actor that will run in its own thread
60 def spawn(*args, &block)
61 raise ArgumentError, "no block given" unless block
64 private_new do |actor|
65 Thread.current[:__current_actor__] = actor
72 alias_method :new, :spawn
74 # Atomically spawn an actor and link it to the current actor
75 def spawn_link(*args, &block)
76 current = self.current
77 link_complete = Channel.new
82 link_complete << Actor.current
89 # Polls for exit notifications
90 def check_for_interrupt
91 current._check_for_interrupt
95 # Waits until a matching message is received in the current actor's
96 # mailbox, and executes the appropriate action. May be interrupted by
98 def receive #:yields: filter
103 filter.when(ANY) { |m| m }
105 current._receive(filter)
108 # Send a "fake" exit notification to another actor, as if the current
109 # actor had exited with +reason+
110 def send_exit(recipient, reason)
111 recipient.notify_exited(current, reason)
115 # Link the current Actor to another one.
117 current = self.current
118 current.notify_link actor
119 actor.notify_link current
123 # Unlink the current Actor from another one
125 current = self.current
126 current.notify_unlink actor
127 actor.notify_unlink current
131 # Actors trapping exit do not die when an error occurs in an Actor they
132 # are linked to. Instead the exit message is sent to their regular
133 # mailbox in the form [:exit, actor, reason]. This allows certain
134 # Actors to supervise sets of others and restart them in the event
135 # of an error. Setting the trap flag may be interrupted by pending
136 # exit notifications.
138 def trap_exit=(value)
139 current._trap_exit = value
143 # Is the Actor trapping exit?
147 alias_method :trap_exit?, :trap_exit
149 # Lookup a locally named service
151 raise ArgumentError, "name must be a symbol" unless Symbol === name
152 @@registered_lock.receive
156 @@registered_lock << nil
159 alias_method :[], :lookup
161 # Register an Actor locally as a named service
162 def register(name, actor)
163 raise ArgumentError, "name must be a symbol" unless Symbol === name
164 unless actor.nil? or actor.is_a?(Actor)
165 raise ArgumentError, "only actors may be registered"
168 @@registered_lock.receive
171 @@registered.delete(name)
173 @@registered[name] = actor
176 @@registered_lock << nil
179 alias_method :[]=, :register
181 def _unregister(actor) #:nodoc:
182 @@registered_lock.receive
184 @@registered.delete_if { |n, a| actor.equal? a }
186 @@registered_lock << nil
205 @thread = Thread.current
210 watchdog { yield self }
212 Thread.new { watchdog { @thread.join } }
219 return self unless @alive
221 @action = @filter.action_for(message)
237 alias_method :<<, :send
239 def _check_for_interrupt #:nodoc:
243 raise @interrupts.shift unless @interrupts.empty?
249 def _receive(filter) #:nodoc:
257 raise @interrupts.shift unless @interrupts.empty?
259 for i in 0...(@mailbox.size)
260 message = @mailbox[i]
261 action = filter.action_for(message)
263 @mailbox.delete_at(i)
270 timeout_id = Scheduler.send_in_seconds(@ready, filter.timeout, true)
277 timed_out = @ready.receive
281 Scheduler.cancel(timeout_id)
283 @ready = Channel.new if @ready.receive
287 if not timed_out and @interrupts.empty?
291 @mailbox << @message if @action
296 raise @interrupts.shift unless @interrupts.empty?
303 filter.timeout_action.call
309 # Notify this actor that it's now linked to the given one; this is not
310 # intended to be used directly except by actor implementations. Most
311 # users will want to use Actor.link instead.
313 def notify_link(actor)
319 exit_reason = @exit_reason
320 @links << actor if alive and not @links.include? actor
324 actor.notify_exited(self, exit_reason) unless alive
328 # Notify this actor that it's now unlinked from the given one; this is
329 # not intended to be used directly except by actor implementations. Most
330 # users will want to use Actor.unlink instead.
332 def notify_unlink(actor)
335 return self unless @alive
343 # Notify this actor that one of the Actors it's linked to has exited;
344 # this is not intended to be used directly except by actor implementations.
345 # Most users will want to use Actor.send_exit instead.
347 def notify_exited(actor, reason)
350 return self unless @alive
353 send [:exit, actor, reason]
355 @interrupts << DeadActorError.new(actor, reason)
371 rescue Exception => reason
374 Actor._unregister(self)
380 @exit_reason = reason
386 links.each do |actor|
388 actor.notify_exited(self, reason)
397 unless Thread.current == @thread
398 raise ThreadError, "illegal cross-actor call"
401 private :check_thread
403 def _trap_exit=(value) #:nodoc:
407 raise @interrupts.shift unless @interrupts.empty?
414 def _trap_exit #:nodoc:
425 require 'actor/filter'