Change soft-fail to use the config, rather than env
[rbx.git] / stdlib / rinda / tuplespace.rb
blob73e79bb401e1085c27723486a4a93503ed379d99
1 require 'monitor'
2 require 'thread'
3 require 'drb/drb'
4 require 'rinda/rinda'
6 module Rinda
8   ##
9   # A TupleEntry is a Tuple (i.e. a possible entry in some Tuplespace)
10   # together with expiry and cancellation data.
12   class TupleEntry
14     include DRbUndumped
16     attr_accessor :expires
18     ##
19     # Creates a TupleEntry based on +ary+ with an optional renewer or expiry
20     # time +sec+.
21     #
22     # A renewer must implement the +renew+ method which returns a Numeric,
23     # nil, or true to indicate when the tuple has expired.
25     def initialize(ary, sec=nil)
26       @cancel = false
27       @expires = nil
28       @tuple = make_tuple(ary)
29       @renewer = nil
30       renew(sec)
31     end
33     ##
34     # Marks this TupleEntry as canceled.
36     def cancel
37       @cancel = true
38     end
40     ##
41     # A TupleEntry is dead when it is canceled or expired.
43     def alive?
44       !canceled? && !expired?
45     end
47     ##
48     # Return the object which makes up the tuple itself: the Array
49     # or Hash.
51     def value; @tuple.value; end
53     ##
54     # Returns the canceled status.
56     def canceled?; @cancel; end
58     ##
59     # Has this tuple expired? (true/false).
60     #
61     # A tuple has expired when its expiry timer based on the +sec+ argument to
62     # #initialize runs out.
64     def expired?
65       return true unless @expires
66       return false if @expires > Time.now
67       return true if @renewer.nil?
68       renew(@renewer)
69       return true unless @expires
70       return @expires < Time.now
71     end
73     ##
74     # Reset the expiry time according to +sec_or_renewer+.  
75     #
76     # +nil+::    it is set to expire in the far future.
77     # +false+::  it has expired.
78     # Numeric::  it will expire in that many seconds.
79     #
80     # Otherwise the argument refers to some kind of renewer object
81     # which will reset its expiry time. 
83     def renew(sec_or_renewer)
84       sec, @renewer = get_renewer(sec_or_renewer)
85       @expires = make_expires(sec)
86     end
88     ##
89     # Returns an expiry Time based on +sec+ which can be one of:
90     # Numeric:: +sec+ seconds into the future
91     # +true+::  the expiry time is the start of 1970 (i.e. expired)
92     # +nil+::   it is  Tue Jan 19 03:14:07 GMT Standard Time 2038 (i.e. when
93     #           UNIX clocks will die)
95     def make_expires(sec=nil)
96       case sec
97       when Numeric
98         Time.now + sec
99       when true
100         Time.at(1)
101       when nil
102         Time.at(2**31-1)
103       end
104     end
106     ##
107     # Retrieves +key+ from the tuple.
109     def [](key)
110       @tuple[key]
111     end
113     ##
114     # Fetches +key+ from the tuple.
116     def fetch(key)
117       @tuple.fetch(key)
118     end
120     ##
121     # The size of the tuple.
123     def size
124       @tuple.size
125     end
127     ##
128     # Creates a Rinda::Tuple for +ary+.
130     def make_tuple(ary)
131       Rinda::Tuple.new(ary)
132     end
134     private
136     ##
137     # Returns a valid argument to make_expires and the renewer or nil.
138     #
139     # Given +true+, +nil+, or Numeric, returns that value and +nil+ (no actual
140     # renewer).  Otherwise it returns an expiry value from calling +it.renew+
141     # and the renewer.
143     def get_renewer(it)
144       case it
145       when Numeric, true, nil
146         return it, nil
147       else
148         begin
149           return it.renew, it
150         rescue Exception
151           return it, nil
152         end
153       end
154     end
156   end
158   ##
159   # A TemplateEntry is a Template together with expiry and cancellation data.
161   class TemplateEntry < TupleEntry
162     ##
163     # Matches this TemplateEntry against +tuple+.  See Template#match for
164     # details on how a Template matches a Tuple.
166     def match(tuple)
167       @tuple.match(tuple)
168     end
169     
170     alias === match
172     def make_tuple(ary) # :nodoc:
173       Rinda::Template.new(ary)
174     end
176   end
178   ##
179   # <i>Documentation?</i>
181   class WaitTemplateEntry < TemplateEntry
183     attr_reader :found
185     def initialize(place, ary, expires=nil)
186       super(ary, expires)
187       @place = place
188       @cond = place.new_cond
189       @found = nil
190     end
192     def cancel
193       super
194       signal
195     end
197     def wait
198       @cond.wait
199     end
201     def read(tuple)
202       @found = tuple
203       signal
204     end
206     def signal
207       @place.synchronize do
208         @cond.signal
209       end
210     end
212   end
214   ##
215   # A NotifyTemplateEntry is returned by TupleSpace#notify and is notified of
216   # TupleSpace changes.  You may receive either your subscribed event or the
217   # 'close' event when iterating over notifications.
218   #
219   # See TupleSpace#notify_event for valid notification types.
220   #
221   # == Example
222   #
223   #   ts = Rinda::TupleSpace.new
224   #   observer = ts.notify 'write', [nil]
225   #   
226   #   Thread.start do
227   #     observer.each { |t| p t }
228   #   end
229   #   
230   #   3.times { |i| ts.write [i] }
231   #
232   # Outputs:
233   #
234   #   ['write', [0]]
235   #   ['write', [1]]
236   #   ['write', [2]]
238   class NotifyTemplateEntry < TemplateEntry
240     ##
241     # Creates a new NotifyTemplateEntry that watches +place+ for +event+s that
242     # match +tuple+.
244     def initialize(place, event, tuple, expires=nil)
245       ary = [event, Rinda::Template.new(tuple)]
246       super(ary, expires)
247       @queue = Queue.new
248       @done = false
249     end
251     ##
252     # Called by TupleSpace to notify this NotifyTemplateEntry of a new event.
254     def notify(ev)
255       @queue.push(ev)
256     end
258     ##
259     # Retrieves a notification.  Raises RequestExpiredError when this
260     # NotifyTemplateEntry expires.
262     def pop
263       raise RequestExpiredError if @done
264       it = @queue.pop
265       @done = true if it[0] == 'close'
266       return it
267     end
269     ##
270     # Yields event/tuple pairs until this NotifyTemplateEntry expires.
272     def each # :yields: event, tuple
273       while !@done
274         it = pop
275         yield(it)
276       end
277     rescue 
278     ensure
279       cancel
280     end
282   end
284   ##
285   # TupleBag is an unordered collection of tuples. It is the basis
286   # of Tuplespace.
288   class TupleBag
290     def initialize # :nodoc:
291       @hash = {}
292     end
294     ##
295     # +true+ if the TupleBag to see if it has any expired entries.
297     def has_expires?
298       @hash.each do |k, v|
299         v.each do |tuple|
300           return true if tuple.expires
301         end
302       end
303       false
304     end
306     ##
307     # Add +ary+ to the TupleBag.
309     def push(ary)
310       size = ary.size
311       @hash[size] ||= []
312       @hash[size].push(ary)
313     end
315     ##
316     # Removes +ary+ from the TupleBag.
318     def delete(ary)
319       size = ary.size
320       @hash.fetch(size, []).delete(ary)
321     end
323     ##
324     # Finds all live tuples that match +template+.
326     def find_all(template)
327       @hash.fetch(template.size, []).find_all do |tuple|
328         tuple.alive? && template.match(tuple)
329       end
330     end
332     ##
333     # Finds a live tuple that matches +template+.
335     def find(template)
336       @hash.fetch(template.size, []).find do |tuple|
337         tuple.alive? && template.match(tuple)
338       end
339     end
341     ##
342     # Finds all tuples in the TupleBag which when treated as templates, match
343     # +tuple+ and are alive.
345     def find_all_template(tuple)
346       @hash.fetch(tuple.size, []).find_all do |template|
347         template.alive? && template.match(tuple)
348       end
349     end
351     ##
352     # Delete tuples which dead tuples from the TupleBag, returning the deleted
353     # tuples.
355     def delete_unless_alive
356       deleted = []
357       @hash.keys.each do |size|
358         ary = []
359         @hash[size].each do |tuple|
360           if tuple.alive?
361             ary.push(tuple)
362           else
363             deleted.push(tuple)
364           end
365         end
366         @hash[size] = ary
367       end
368       deleted
369     end
371   end
373   ##
374   # The Tuplespace manages access to the tuples it contains,
375   # ensuring mutual exclusion requirements are met.
376   #
377   # The +sec+ option for the write, take, move, read and notify methods may
378   # either be a number of seconds or a Renewer object.
380   class TupleSpace
382     include DRbUndumped
383     include MonitorMixin
385     ##
386     # Creates a new TupleSpace.  +period+ is used to control how often to look
387     # for dead tuples after modifications to the TupleSpace.
388     #
389     # If no dead tuples are found +period+ seconds after the last
390     # modification, the TupleSpace will stop looking for dead tuples.
392     def initialize(period=60)
393       super()
394       @bag = TupleBag.new
395       @read_waiter = TupleBag.new
396       @take_waiter = TupleBag.new
397       @notify_waiter = TupleBag.new
398       @period = period
399       @keeper = nil
400     end
402     ##
403     # Adds +tuple+
405     def write(tuple, sec=nil)
406       entry = TupleEntry.new(tuple, sec)
407       start_keeper
408       synchronize do
409         if entry.expired?
410           @read_waiter.find_all_template(entry).each do |template|
411             template.read(tuple)
412           end
413           notify_event('write', entry.value)
414           notify_event('delete', entry.value)
415         else
416           @bag.push(entry)
417           @read_waiter.find_all_template(entry).each do |template|
418             template.read(tuple)
419           end
420           @take_waiter.find_all_template(entry).each do |template|
421             template.signal
422           end
423           notify_event('write', entry.value)
424         end
425       end
426       entry
427     end
429     ##
430     # Removes +tuple+
432     def take(tuple, sec=nil, &block)
433       move(nil, tuple, sec, &block)
434     end
436     ##
437     # Moves +tuple+ to +port+.
439     def move(port, tuple, sec=nil)
440       template = WaitTemplateEntry.new(self, tuple, sec)
441       yield(template) if block_given?
442       start_keeper
443       synchronize do
444         entry = @bag.find(template)
445         if entry
446           port.push(entry.value) if port
447           @bag.delete(entry)
448           notify_event('take', entry.value)
449           return entry.value
450         end
451         raise RequestExpiredError if template.expired?
453         begin
454           @take_waiter.push(template)
455           while true
456             raise RequestCanceledError if template.canceled?
457             raise RequestExpiredError if template.expired?
458             entry = @bag.find(template)
459             if entry
460               port.push(entry.value) if port
461               @bag.delete(entry)
462               notify_event('take', entry.value)
463               return entry.value
464             end
465             template.wait
466           end
467         ensure
468           @take_waiter.delete(template)
469         end
470       end
471     end
473     ##
474     # Reads +tuple+, but does not remove it.
476     def read(tuple, sec=nil)
477       template = WaitTemplateEntry.new(self, tuple, sec)
478       yield(template) if block_given?
479       start_keeper
480       synchronize do
481         entry = @bag.find(template)
482         return entry.value if entry
483         raise RequestExpiredError if template.expired?
485         begin
486           @read_waiter.push(template)
487           template.wait
488           raise RequestCanceledError if template.canceled?
489           raise RequestExpiredError if template.expired?
490           return template.found
491         ensure
492           @read_waiter.delete(template)
493         end
494       end
495     end
497     ##
498     # Returns all tuples matching +tuple+.  Does not remove the found tuples.
500     def read_all(tuple)
501       template = WaitTemplateEntry.new(self, tuple, nil)
502       synchronize do
503         entry = @bag.find_all(template)
504         entry.collect do |e|
505           e.value
506         end
507       end
508     end
510     ##
511     # Registers for notifications of +event+.  Returns a NotifyTemplateEntry.
512     # See NotifyTemplateEntry for examples of how to listen for notifications.
513     #
514     # +event+ can be:
515     # 'write'::  A tuple was added
516     # 'take'::   A tuple was taken or moved
517     # 'delete':: A tuple was lost after being overwritten or expiring
518     #
519     # The TupleSpace will also notify you of the 'close' event when the
520     # NotifyTemplateEntry has expired.
522     def notify(event, tuple, sec=nil)
523       template = NotifyTemplateEntry.new(self, event, tuple, sec)
524       synchronize do
525         @notify_waiter.push(template)
526       end
527       template
528     end
530     private
532     ##
533     # Removes dead tuples.
535     def keep_clean
536       synchronize do
537         @read_waiter.delete_unless_alive.each do |e|
538           e.signal
539         end
540         @take_waiter.delete_unless_alive.each do |e|
541           e.signal
542         end
543         @notify_waiter.delete_unless_alive.each do |e|
544           e.notify(['close'])
545         end
546         @bag.delete_unless_alive.each do |e|
547           notify_event('delete', e.value)
548         end
549       end
550     end
552     ##
553     # Notifies all registered listeners for +event+ of a status change of
554     # +tuple+.
556     def notify_event(event, tuple)
557       ev = [event, tuple]
558       @notify_waiter.find_all_template(ev).each do |template|
559         template.notify(ev)
560       end
561     end
563     ##
564     # Creates a thread that scans the tuplespace for expired tuples.
566     def start_keeper
567       return if @keeper && @keeper.alive?
568       @keeper = Thread.new do
569         while need_keeper?
570           keep_clean
571           sleep(@period)
572         end
573       end
574     end
576     ##
577     # Checks the tuplespace to see if it needs cleaning.
579     def need_keeper?
580       return true if @bag.has_expires?
581       return true if @read_waiter.has_expires?
582       return true if @take_waiter.has_expires?
583       return true if @notify_waiter.has_expires?
584     end
586   end