Fix up Rubinius specific library specs.
[rbx.git] / lib / vmactor.rb
blob87b2ffefd58d8cd40ee1f6e4c2a1c7a190f4b547
1 require 'thread'
2 require 'actor'
4 # VMActors represent Actors running in another VM.  They are duck-typed to Actors
5 # and can be used interchangably with them.  They're useful for "scatter/gather"
6 # styles of programming, where a given dataset is partitioned among a number of
7 # VMs (each running in a native thread) so that respective portions of the data
8 # set are processed concurrently.  They can also be used for simple load balancing
9 # between multiple copies of the same service running in different VMs.
10 class VMActor
11   @@actors = {}
12   @@actors_lock = Mutex.new
14   class << self
15     # Store the specified actor in the local Actor registry
16     def register(actor)
17       @@actors_lock.synchronize { @@actors[actor.object_id] = actor }
18     end
20     # Process an incoming inter-VM actor message
21     def dispatch(command, *operands)
22       case command
23       when :spawn then spawn(*operands)
24       when :message then process_message(*operands)
25       end
26     end
28     # Serialize an outgoing message.  Why?  Doesn't Marshal already handle this?
29     # Well, not really.  One of the definining characteristics of Actors local
30     # ones behave identically to remote ones, be it on a different CPU on the
31     # same machine or a different machine entirely, located elsewhere on the
32     # network.
33     # Among other things, this means that we should be able to include Actor
34     # handles in messages, regardless of if the handle is to a local or remote
35     # Actor.  A potential use case is synchronous communication, which requires
36     # we include the "return address" of an Actor in a message.
37     #
38     # In order to accomplish this, we need to translate any Actor handles in the
39     # outgoing message into a form that will be instantiated into a VMActor on
40     # the other side.
41     #
42     # FIXME: This implementation is naive, and only examines Arrays and Tuples as
43     # potential containers.  Obviously it's possible to transmit Actor objects in
44     # other ways (e.g. in a Hash, or an instance variable in an arbitrary object)
45     # which this implementation will miss.  A better approach is needed.
46     def serialize_message(value)
47       case value
48       when Array then value.map { |child| serialize_message(child) }
49       when Tuple then Tuple[*serialize_message(value.to_a)]
50       when Actor
51         VMActor.register value
52         Tuple[:VMActor, Rubinius::VM_ID, value.object_id]
53       else value
54       end
55     end
57     # Process an incoming message, translating VMActor Tuples into VMActor objects
58     def unserialize_message(value)
59       case value
60       when Tuple
61         if value.first == :VMActor
62           _, vm_id, actor_id = value
63           VMActor.new(vm_id, actor_id)
64         else
65           Tuple[*value.map { |child| unserialize_message(child) }]
66         end
67       else value
68       end
69     end
71     #######
72     private
73     #######
75     # Create a new VMActor, performing either a faux-apply of a Class/Method/Argument
76     # combination or an evaluation of a string to create Actors on remote nodes.
77     # This is called directly by the remote VM itself.  In order to create new VMActors
78     # use VMActor::Container#spawn_actor instead.
79     def spawn(container, remote_actor, *operands)
80       actor = case operands.first
81       when String
82         command = operands.first
83         Actor.spawn { eval command }
84       when Symbol
85         klass, meth, *args = operands
86         klass = Object.const_lookup klass
88         Actor.spawn { klass.send(meth, *args) }
89       else
90         raise ArgumentError, "first argument must be a Symbol (for application) or String (for evaluation)"
91       end
93       register(actor)
95       Rubinius::VM.send_message(container, [
96         :actor,
97         :message,
98         remote_actor,
99         Tuple[:spawn_reply, actor.object_id]
100       ])
101     rescue Exception => ex
102       Rubinius::VM.send_message(container, [
103         :actor,
104         :message,
105         remote_actor,
106         Tuple[:spawn_error, "#{ex.class}: #{[ex, *ex.backtrace].join("\n\t")}"]
107       ])
108     end
110     # Process a message sent from a remote Actor and direct it to the appropriate
111     # local one.
112     def process_message(actor_id, message)
113       actor = @@actors_lock.synchronize { @@actors[actor_id] }
114       #puts "sending #{unserialize_message(message).inspect} to #{actor.inspect}"
115       return unless actor
116       actor << unserialize_message(message)
117     end
118   end
120   def initialize(container_id, actor_id)
121     @container_id, @actor_id = container_id, actor_id
122   end
124   # Send a message to a VMActor
125   def <<(value)
126     Rubinius::VM.send_message(@container_id, [:actor, :message, @actor_id, VMActor.serialize_message(value)])
127   end
129   alias_method :send, :<<
131   #######
132   private
133   #######
135   # VMActor::Container wraps Rubinius's native multi-VM support.  The newly
136   # created VM is automatically configured for supporting remote Actors, but
137   # can still be given arbitrary command line arguments.
138   class Container
139     @@responder = nil
141     class << self
142       # Start the message responder thread for a parent VM
143       def init_message_responder
144         @@responder ||= Thread.new(&method(:process_messages))
145       end
147       # Start message processing in a child VM
148       def child_message_responder(parent_vm = nil, actor = nil)
149         # Notify the parent VM that the current one is ready
150         Rubinius::VM.send_message(parent_vm, [
151           :actor,
152           :message,
153           actor,
154           Tuple[:ready, Rubinius::VM_ID]
155         ])
157         process_messages
158       end
160       # Receive from the VM message queue
161       # Actor events get sent to VMActor.dispatch
162       def process_messages
163         Rubinius::VM.each_message do |command, *operands|
164           VMActor.dispatch(*operands) if command == :actor
165         end
166       end
167     end
169     # Create a new VMActor.  Accepts command line arguments to pass to the new VM
170     def initialize(*args)
171       Container.init_message_responder
172       VMActor.register Actor.current
174       # Ensure any -r's passed happen before we execute the message processor
175       args += [
176         "-rvmactor",
177         "-e", "VMActor::Container.child_message_responder(#{Rubinius::VM_ID},#{Actor.current.object_id})",
178       ]
180       @vm = Rubinius::VM.spawn(*args)
182       Actor.receive do |filter|
183         # Wait for new VM to become ready
184         filter.when(Tuple[:ready, @vm.id]) {}
185       end
186     end
188     # Create a new Actor on a remote VM and obtain a handle to it.  Accepts arguments
189     # specifying what code to execute in the context of the new Actor:
190     #
191     # == "Apply" style
192     #
193     # The first approach accepts a class name and method name (as symbols) and an
194     # arbitrary number of arguments (or none).  The remote Actor will invoke the
195     # given method of the given class with the given arguments and return a handle
196     # to the VMActor the method is running in the context of:
197     #
198     #  container = VMActor::Container.new
199     #  actor = container.spawn_actor :Kernel, :puts, 'hi'
200     #
201     # == "Eval" style
202     #
203     # The second approach accepts a string to evaluate in the context of the newly
204     # created actor:
205     #
206     #  container = VMActor::Container.new
207     #  actor = container.spawn_actor "puts 'hi'"
208     #
209     def spawn_actor(*args)
210       VMActor.register Actor.current
212       Rubinius::VM.send_message(@vm.id, [
213         :actor,
214         :spawn,
215         Rubinius::VM_ID,
216         Actor.current.object_id,
217         *args
218       ])
220       Actor.receive do |filter|
221         filter.when(Tuple[:spawn_reply, Object]) do |message|
222           _, actor_id = message
223           return VMActor.new(@vm.id, actor_id)
224         end
226         filter.when(Tuple[:spawn_error, Object]) do |message|
227           _, ex = message
228           raise ex
229         end
230       end
231     end
232   end