Fix up Rubinius specific library specs.
[rbx.git] / lib / join.rb
blob93d4f851b687b69fe175b64c937f1ab047b8ca42
1 class Join
2   def self.new(&block)
3     raise ArgumentError, "No block given" unless block
4     c = Class.new self
6     c.instance_eval do
7       @max = -1
8       @indices = {}
9       @chords = []
10       @sync = {}
11     end
13     def c.chord(*names, &action)
14       raise ArgumentError, "No channel names specified" if names.empty?
15       raise ArgumentError, "No block given" unless action
16       names.map! { |n| n.to_sym }
17       indices = __defchannels__(*names)
18       mask = indices.inject(0) { |acc, i| acc | ( 1 << i ) }
19       @chords.push [ mask, indices, action ].freeze
20       names.each_with_index do |n, i|
21       end
22       self
23     end
25     def c.async(*names)
26       names.map! { |n| n.to_sym }
27       __defchannels__(*names).each { |index| @sync[index] = false }
28       self
29     end
31     def c.sync(*names)
32       names.map! { |n| n.to_sym }
33       __defchannels__(*names).each { |index| @sync[index] = true }
34       self
35     end
37     def c.__defchannels__(*names)
38       indices = names.map { |name| ( @indices[name] ||= ( @max += 1 ) ) }
39       names.each_with_index do |name, i|
40         eval <<-EOS
41           def #{ name }(value=nil)
42             __channel_send__( #{ indices[i] }, value )
43           end
44         EOS
45       end
46       indices
47     end
49     class << c
50       private :chord, :async, :sync, :__defchannels__
51     end 
53     c.class_eval(&block)
54     c.instance_eval { @chords.reverse! }
56     @chords.freeze
57     @sync.freeze
59     class << c
60       remove_method :chord, :async, :sync, :__defchannels__
61       def new(*args, &block)
62         obj = allocate
63         chords = @chords
64         max = @max
65         sync = @sync
66         obj.instance_eval do
67           @join_pending = (0..max).map { Array.new }
68           @join_pending_mask = 0
69           @join_chords = chords
70           @join_sync = sync
71           @join_lock = Channel.new
72           @join_lock.send nil
73           initialize(*args, &block)
74         end
75         obj
76       end
77     end
79     c
80   end
82   def __channel_send__(index, value)
83     if @join_sync[index]
84       reply = Channel.new
85       value = [value, reply]
86     end
87     @join_lock.receive
88     begin
89       @join_pending[index].push value
90       @join_pending_mask |= 1 << index
91       @join_chords.each do |mask, indices, action|
92         if mask & @join_pending_mask == mask
93           args = []
94           waiting = []
95           indices.each do |i|
96             pending = @join_pending[i]
97             value = pending.shift
98             if @join_sync[i]
99               args.push value.first
100               waiting.push value.last
101             else
102               args.push value
103             end
104             @join_pending_mask ^= 1 << i if pending.empty?
105           end
106           thread = Thread.new { instance_exec(*args, &action) }
107           waiting.each { |waiter| waiter.send thread }
108           break
109         end
110       end
111     ensure
112       @join_lock.send nil
113     end
114     if @join_sync[index]
115       reply.receive.value
116     else
117       self
118     end
119   end
120   private :__channel_send__