4 # VMActors represent Actors running in another VM. They are duck-typed to Actors
5 # and can be used interchangeably with them. They're useful for "scatter/gather"
6 # styles of programming, where a given dataset is partitioned among a number of
7 # VMs (each running in a native thread) so that respective portions of the data
8 # set are processed concurrently. They can also be used for simple load balancing
9 # between multiple copies of the same service running in different VMs.
12 @@actors_lock = Mutex.new
15 # Store the specified actor in the local Actor registry
17 @@actors_lock.synchronize { @@actors[actor.object_id] = actor }
20 # Process an incoming inter-VM actor message
# Routes an inter-VM actor command to its handler, forwarding the remaining
# operands unchanged:
#   :spawn   -> spawn(*operands)
#   :message -> process_message(*operands)
# NOTE(review): presumably the `when` arms switch on `command`; how any other
# command is handled cannot be seen from these lines — confirm.
21 def dispatch(command, *operands)
23 when :spawn then spawn(*operands)
24 when :message then process_message(*operands)
28 # Serialize an outgoing message. Why? Doesn't Marshal already handle this?
29 # Well, not really. One of the defining characteristics of Actors is that local
30 # ones behave identically to remote ones, be it on a different CPU on the
31 # same machine or a different machine entirely, located elsewhere on the network.
33 # Among other things, this means that we should be able to include Actor
34 # handles in messages, regardless of if the handle is to a local or remote
35 # Actor. A potential use case is synchronous communication, which requires
36 # we include the "return address" of an Actor in a message.
38 # In order to accomplish this, we need to translate any Actor handles in the
39 # outgoing message into a form that will be instantiated into a VMActor on the receiving VM.
42 # FIXME: This implementation is naive, and only examines Arrays and Tuples as
43 # potential containers. Obviously it's possible to transmit Actor objects in
44 # other ways (e.g. in a Hash, or an instance variable in an arbitrary object)
45 # which this implementation will miss. A better approach is needed.
# Recursively rewrites an outgoing message so that it can be sent to another VM.
#
# value - the message, or a nested element of it, to translate.
#
# Arrays are translated element by element. Tuples are converted to an Array,
# translated the same way, and rebuilt as a Tuple.
46 def serialize_message(value)
48 when Array then value.map { |child| serialize_message(child) }
49 when Tuple then Tuple[*serialize_message(value.to_a)]
# Registers `value` with VMActor.register and substitutes an address Tuple made
# of the :VMActor tag, this VM's id and the object's id.
# NOTE(review): presumably this arm is reached only for Actor handles, and
# other values pass through untouched — confirm.
51 VMActor.register value
52 Tuple[:VMActor, Rubinius::VM_ID, value.object_id]
57 # Process an incoming message, translating VMActor Tuples into VMActor objects
# Reverses the translation applied to outgoing messages.
#
# value - the received message, or a nested element of it.
#
# A value whose first element is :VMActor is read as [tag, vm_id, actor_id]
# and replaced by VMActor.new(vm_id, actor_id). Otherwise every child is
# translated recursively and the results are rebuilt into a Tuple.
# NOTE(review): presumably only Tuple values reach these lines and everything
# else is returned as-is — confirm.
58 def unserialize_message(value)
61 if value.first == :VMActor
# The tag itself is discarded; only the two ids are needed.
62 _, vm_id, actor_id = value
63 VMActor.new(vm_id, actor_id)
65 Tuple[*value.map { |child| unserialize_message(child) }]
75 # Create a new VMActor, performing either a faux-apply of a Class/Method/Argument
76 # combination or an evaluation of a string to create Actors on remote nodes.
77 # This is called directly by the remote VM itself. In order to create new VMActors
78 # use VMActor::Container#spawn_actor instead.
# Spawns a local Actor on behalf of another VM and reports the outcome back.
#
# container    - destination passed to Rubinius::VM.send_message for the reply.
# remote_actor - not used in the lines visible here; presumably identifies the
#                requesting Actor inside the reply — TODO confirm.
# operands     - either a single String of code to evaluate, or a class name,
#                a method name and any further arguments to apply.
#
# On success a Tuple[:spawn_reply, <object_id of the new Actor>] is sent to
# `container`; on any exception a Tuple[:spawn_error, <description>] is sent.
79 def spawn(container, remote_actor, *operands)
# The type of the first operand selects the spawn style.
80 actor = case operands.first
82 command = operands.first
# NOTE(review): `eval` runs a string received from another VM; this is only
# safe if every peer VM is trusted.
83 Actor.spawn { eval command }
85 klass, meth, *args = operands
# Resolve the class name to the constant it names before invoking the method.
86 klass = Object.const_lookup klass
88 Actor.spawn { klass.send(meth, *args) }
90 raise ArgumentError, "first argument must be a Symbol (for application) or String (for evaluation)"
95 Rubinius::VM.send_message(container, [
99 Tuple[:spawn_reply, actor.object_id]
# NOTE(review): rescuing Exception is very broad; it appears deliberate, so
# that any failure is reported to the requester as :spawn_error — confirm.
101 rescue Exception => ex
102 Rubinius::VM.send_message(container, [
# The description is the exception class, then the exception and its backtrace
# entries joined with "\n\t".
106 Tuple[:spawn_error, "#{ex.class}: #{[ex, *ex.backtrace].join("\n\t")}"]
110 # Process a message sent from a remote Actor and direct it to the appropriate local Actor.
# Delivers a message received from another VM to a locally registered Actor.
#
# actor_id - key under which the target Actor is stored in @@actors.
# message  - the serialized message; it is passed through unserialize_message
#            before delivery.
112 def process_message(actor_id, message)
# The registry is read while holding @@actors_lock.
113 actor = @@actors_lock.synchronize { @@actors[actor_id] }
114 #puts "sending #{unserialize_message(message).inspect} to #{actor.inspect}"
# NOTE(review): no nil check is visible for an id missing from the registry —
# confirm whether a lookup miss can happen here.
116 actor << unserialize_message(message)
# Builds a handle to an Actor living in another VM.
#
# container_id - stored in @container_id.
# actor_id     - stored in @actor_id.
# NOTE(review): presumably the id of the hosting VM and the remote Actor's
# object_id respectively — confirm against callers.
120 def initialize(container_id, actor_id)
121 @container_id, @actor_id = container_id, actor_id
124 # Send a message to a VMActor
126 Rubinius::VM.send_message(@container_id, [:actor, :message, @actor_id, VMActor.serialize_message(value)])
129 alias_method :send, :<<
135 # VMActor::Container wraps Rubinius's native multi-VM support. The newly
136 # created VM is automatically configured for supporting remote Actors, but
137 # can still be given arbitrary command line arguments.
142 # Start the message responder thread for a parent VM
# Starts a thread that runs process_messages and stores it in @@responder.
# Because of `||=`, an existing truthy @@responder is kept and no second
# thread is created.
# NOTE(review): the `||=` is not wrapped in a lock in this line — confirm
# whether concurrent first calls are possible.
143 def init_message_responder
144 @@responder ||= Thread.new(&method(:process_messages))
147 # Start message processing in a child VM
# Entry point for a child VM: announces readiness to the parent.
#
# parent_vm - destination passed to Rubinius::VM.send_message (defaults to nil).
# actor     - not used in the lines visible here; presumably the parent-side
#             Actor that should receive the notification — TODO confirm.
148 def child_message_responder(parent_vm = nil, actor = nil)
149 # Notify the parent VM that the current one is ready
150 Rubinius::VM.send_message(parent_vm, [
# The notification carries this VM's own id.
154 Tuple[:ready, Rubinius::VM_ID]
160 # Receive from the VM message queue
161 # Actor events get sent to VMActor.dispatch
163 Rubinius::VM.each_message do |command, *operands|
164 VMActor.dispatch(*operands) if command == :actor
169 # Create a new VMActor. Accepts command line arguments to pass to the new VM
# Spawns a new VM and blocks until that VM reports that it is ready.
#
# args - command line arguments handed to Rubinius::VM.spawn.
170 def initialize(*args)
# Make sure the responder thread exists and that the current Actor is in the
# local registry before the child is started.
171 Container.init_message_responder
172 VMActor.register Actor.current
174 # Ensure any -r's passed happen before we execute the message processor
# The child is told to run child_message_responder with this VM's id and the
# object_id of the current Actor.
177 "-e", "VMActor::Container.child_message_responder(#{Rubinius::VM_ID},#{Actor.current.object_id})",
180 @vm = Rubinius::VM.spawn(*args)
182 Actor.receive do |filter|
183 # Wait for new VM to become ready
# The empty block means the matching message is consumed without further action.
184 filter.when(Tuple[:ready, @vm.id]) {}
188 # Create a new Actor on a remote VM and obtain a handle to it. Accepts arguments
189 # specifying what code to execute in the context of the new Actor:
193 # The first approach accepts a class name and method name (as symbols) and an
194 # arbitrary number of arguments (or none). The remote Actor will invoke the
195 # given method of the given class with the given arguments and return a handle
196 # to the VMActor the method is running in the context of:
198 # container = VMActor::Container.new
199 # actor = container.spawn_actor :Kernel, :puts, 'hi'
203 # The second approach accepts a string to evaluate in the context of the newly created Actor:
206 # container = VMActor::Container.new
207 # actor = container.spawn_actor "puts 'hi'"
209 def spawn_actor(*args)
210 VMActor.register Actor.current
212 Rubinius::VM.send_message(@vm.id, [
216 Actor.current.object_id,
220 Actor.receive do |filter|
221 filter.when(Tuple[:spawn_reply, Object]) do |message|
222 _, actor_id = message
223 return VMActor.new(@vm.id, actor_id)
226 filter.when(Tuple[:spawn_error, Object]) do |message|