* transcode.c (transcode_restartable): my_transcoder argument removed.
[ruby-svn.git] / lib / rinda / tuplespace.rb
blobb0409dde3b59199023f04cda59168856c3f02d1a
1 require 'monitor'
2 require 'thread'
3 require 'drb/drb'
4 require 'rinda/rinda'
5 require 'enumerator'
6 require 'forwardable'
8 module Rinda
10   ##
11   # A TupleEntry is a Tuple (i.e. a possible entry in some Tuplespace)
12   # together with expiry and cancellation data.
14   class TupleEntry
16     include DRbUndumped
18     attr_accessor :expires
20     ##
21     # Creates a TupleEntry based on +ary+ with an optional renewer or expiry
22     # time +sec+.
23     #
24     # A renewer must implement the +renew+ method which returns a Numeric,
25     # nil, or true to indicate when the tuple has expired.
27     def initialize(ary, sec=nil)
28       @cancel = false
29       @expires = nil
30       @tuple = make_tuple(ary)
31       @renewer = nil
32       renew(sec)
33     end
35     ##
36     # Marks this TupleEntry as canceled.
38     def cancel
39       @cancel = true
40     end
42     ##
43     # A TupleEntry is dead when it is canceled or expired.
45     def alive?
46       !canceled? && !expired?
47     end
49     ##
50     # Return the object which makes up the tuple itself: the Array
51     # or Hash.
53     def value; @tuple.value; end
55     ##
56     # Returns the canceled status.
58     def canceled?; @cancel; end
60     ##
61     # Has this tuple expired? (true/false).
62     #
63     # A tuple has expired when its expiry timer based on the +sec+ argument to
64     # #initialize runs out.
66     def expired?
67       return true unless @expires
68       return false if @expires > Time.now
69       return true if @renewer.nil?
70       renew(@renewer)
71       return true unless @expires
72       return @expires < Time.now
73     end
75     ##
76     # Reset the expiry time according to +sec_or_renewer+.  
77     #
78     # +nil+::    it is set to expire in the far future.
79     # +false+::  it has expired.
80     # Numeric::  it will expire in that many seconds.
81     #
82     # Otherwise the argument refers to some kind of renewer object
83     # which will reset its expiry time. 
85     def renew(sec_or_renewer)
86       sec, @renewer = get_renewer(sec_or_renewer)
87       @expires = make_expires(sec)
88     end
90     ##
91     # Returns an expiry Time based on +sec+ which can be one of:
92     # Numeric:: +sec+ seconds into the future
93     # +true+::  the expiry time is the start of 1970 (i.e. expired)
94     # +nil+::   it is  Tue Jan 19 03:14:07 GMT Standard Time 2038 (i.e. when
95     #           UNIX clocks will die)
97     def make_expires(sec=nil)
98       case sec
99       when Numeric
100         Time.now + sec
101       when true
102         Time.at(1)
103       when nil
104         Time.at(2**31-1)
105       end
106     end
108     ##
109     # Retrieves +key+ from the tuple.
111     def [](key)
112       @tuple[key]
113     end
115     ##
116     # Fetches +key+ from the tuple.
118     def fetch(key)
119       @tuple.fetch(key)
120     end
122     ##
123     # The size of the tuple.
125     def size
126       @tuple.size
127     end
129     ##
130     # Creates a Rinda::Tuple for +ary+.
132     def make_tuple(ary)
133       Rinda::Tuple.new(ary)
134     end
136     private
138     ##
139     # Returns a valid argument to make_expires and the renewer or nil.
140     #
141     # Given +true+, +nil+, or Numeric, returns that value and +nil+ (no actual
142     # renewer).  Otherwise it returns an expiry value from calling +it.renew+
143     # and the renewer.
145     def get_renewer(it)
146       case it
147       when Numeric, true, nil
148         return it, nil
149       else
150         begin
151           return it.renew, it
152         rescue Exception
153           return it, nil
154         end
155       end
156     end
158   end
160   ##
161   # A TemplateEntry is a Template together with expiry and cancellation data.
163   class TemplateEntry < TupleEntry
164     ##
165     # Matches this TemplateEntry against +tuple+.  See Template#match for
166     # details on how a Template matches a Tuple.
168     def match(tuple)
169       @tuple.match(tuple)
170     end
171     
172     alias === match
174     def make_tuple(ary) # :nodoc:
175       Rinda::Template.new(ary)
176     end
178   end
180   ##
181   # <i>Documentation?</i>
183   class WaitTemplateEntry < TemplateEntry
185     attr_reader :found
187     def initialize(place, ary, expires=nil)
188       super(ary, expires)
189       @place = place
190       @cond = place.new_cond
191       @found = nil
192     end
194     def cancel
195       super
196       signal
197     end
199     def wait
200       @cond.wait
201     end
203     def read(tuple)
204       @found = tuple
205       signal
206     end
208     def signal
209       @place.synchronize do
210         @cond.signal
211       end
212     end
214   end
216   ##
217   # A NotifyTemplateEntry is returned by TupleSpace#notify and is notified of
218   # TupleSpace changes.  You may receive either your subscribed event or the
219   # 'close' event when iterating over notifications.
220   #
221   # See TupleSpace#notify_event for valid notification types.
222   #
223   # == Example
224   #
225   #   ts = Rinda::TupleSpace.new
226   #   observer = ts.notify 'write', [nil]
227   #   
228   #   Thread.start do
229   #     observer.each { |t| p t }
230   #   end
231   #   
232   #   3.times { |i| ts.write [i] }
233   #
234   # Outputs:
235   #
236   #   ['write', [0]]
237   #   ['write', [1]]
238   #   ['write', [2]]
240   class NotifyTemplateEntry < TemplateEntry
242     ##
243     # Creates a new NotifyTemplateEntry that watches +place+ for +event+s that
244     # match +tuple+.
246     def initialize(place, event, tuple, expires=nil)
247       ary = [event, Rinda::Template.new(tuple)]
248       super(ary, expires)
249       @queue = Queue.new
250       @done = false
251     end
253     ##
254     # Called by TupleSpace to notify this NotifyTemplateEntry of a new event.
256     def notify(ev)
257       @queue.push(ev)
258     end
260     ##
261     # Retrieves a notification.  Raises RequestExpiredError when this
262     # NotifyTemplateEntry expires.
264     def pop
265       raise RequestExpiredError if @done
266       it = @queue.pop
267       @done = true if it[0] == 'close'
268       return it
269     end
271     ##
272     # Yields event/tuple pairs until this NotifyTemplateEntry expires.
274     def each # :yields: event, tuple
275       while !@done
276         it = pop
277         yield(it)
278       end
279     rescue 
280     ensure
281       cancel
282     end
284   end
286   ##
287   # TupleBag is an unordered collection of tuples. It is the basis
288   # of Tuplespace.
290   class TupleBag
291     class TupleBin
292       extend Forwardable
293       def_delegators '@bin', :find_all, :delete_if, :each, :empty?
295       def initialize
296         @bin = []
297       end
298       
299       def add(tuple)
300         @bin.push(tuple)
301       end
302       
303       def delete(tuple)
304         idx = @bin.rindex(tuple)
305         @bin.delete_at(idx) if idx
306       end
307       
308       def find(&blk)
309         @bin.reverse_each do |x|
310           return x if yield(x)
311         end
312         nil
313       end
314     end
316     def initialize # :nodoc:
317       @hash = {}
318       @enum = Enumerable::Enumerator.new(self, :each_entry)
319     end
321     ##
322     # +true+ if the TupleBag to see if it has any expired entries.
324     def has_expires?
325       @enum.find do |tuple|
326         tuple.expires
327       end
328     end
330     ##
331     # Add +tuple+ to the TupleBag.
333     def push(tuple)
334       key = bin_key(tuple)
335       @hash[key] ||= TupleBin.new
336       @hash[key].add(tuple)
337     end
339     ##
340     # Removes +tuple+ from the TupleBag.
342     def delete(tuple)
343       key = bin_key(tuple)
344       bin = @hash[key]
345       return nil unless bin
346       bin.delete(tuple)
347       @hash.delete(key) if bin.empty?
348       tuple
349     end
351     ##
352     # Finds all live tuples that match +template+.
353     def find_all(template)
354       bin_for_find(template).find_all do |tuple|
355         tuple.alive? && template.match(tuple)
356       end
357     end
359     ##
360     # Finds a live tuple that matches +template+.
362     def find(template)
363       bin_for_find(template).find do |tuple|
364         tuple.alive? && template.match(tuple)
365       end
366     end
368     ##
369     # Finds all tuples in the TupleBag which when treated as templates, match
370     # +tuple+ and are alive.
372     def find_all_template(tuple)
373       @enum.find_all do |template|
374         template.alive? && template.match(tuple)
375       end
376     end
378     ##
379     # Delete tuples which dead tuples from the TupleBag, returning the deleted
380     # tuples.
382     def delete_unless_alive
383       deleted = []
384       @hash.each do |key, bin|
385         bin.delete_if do |tuple|
386           if tuple.alive?
387             false
388           else
389             deleted.push(tuple)
390             true
391           end
392         end
393       end
394       deleted
395     end
397     private
398     def each_entry(&blk)
399       @hash.each do |k, v|
400         v.each(&blk)
401       end
402     end
404     def bin_key(tuple)
405       head = tuple[0]
406       if head.class == Symbol
407         return head
408       else
409         false
410       end
411     end
413     def bin_for_find(template)
414       key = bin_key(template)
415       key ? @hash.fetch(key, []) : @enum
416     end
417   end
419   ##
420   # The Tuplespace manages access to the tuples it contains,
421   # ensuring mutual exclusion requirements are met.
422   #
423   # The +sec+ option for the write, take, move, read and notify methods may
424   # either be a number of seconds or a Renewer object.
426   class TupleSpace
428     include DRbUndumped
429     include MonitorMixin
431     ##
432     # Creates a new TupleSpace.  +period+ is used to control how often to look
433     # for dead tuples after modifications to the TupleSpace.
434     #
435     # If no dead tuples are found +period+ seconds after the last
436     # modification, the TupleSpace will stop looking for dead tuples.
438     def initialize(period=60)
439       super()
440       @bag = TupleBag.new
441       @read_waiter = TupleBag.new
442       @take_waiter = TupleBag.new
443       @notify_waiter = TupleBag.new
444       @period = period
445       @keeper = nil
446     end
448     ##
449     # Adds +tuple+
451     def write(tuple, sec=nil)
452       entry = create_entry(tuple, sec)
453       synchronize do
454         if entry.expired?
455           @read_waiter.find_all_template(entry).each do |template|
456             template.read(tuple)
457           end
458           notify_event('write', entry.value)
459           notify_event('delete', entry.value)
460         else
461           @bag.push(entry)
462           start_keeper if entry.expires
463           @read_waiter.find_all_template(entry).each do |template|
464             template.read(tuple)
465           end
466           @take_waiter.find_all_template(entry).each do |template|
467             template.signal
468           end
469           notify_event('write', entry.value)
470         end
471       end
472       entry
473     end
475     ##
476     # Removes +tuple+
478     def take(tuple, sec=nil, &block)
479       move(nil, tuple, sec, &block)
480     end
482     ##
483     # Moves +tuple+ to +port+.
485     def move(port, tuple, sec=nil)
486       template = WaitTemplateEntry.new(self, tuple, sec)
487       yield(template) if block_given?
488       synchronize do
489         entry = @bag.find(template)
490         if entry
491           port.push(entry.value) if port
492           @bag.delete(entry)
493           notify_event('take', entry.value)
494           return entry.value
495         end
496         raise RequestExpiredError if template.expired?
498         begin
499           @take_waiter.push(template)
500           start_keeper if template.expires
501           while true
502             raise RequestCanceledError if template.canceled?
503             raise RequestExpiredError if template.expired?
504             entry = @bag.find(template)
505             if entry
506               port.push(entry.value) if port
507               @bag.delete(entry)
508               notify_event('take', entry.value)
509               return entry.value
510             end
511             template.wait
512           end
513         ensure
514           @take_waiter.delete(template)
515         end
516       end
517     end
519     ##
520     # Reads +tuple+, but does not remove it.
522     def read(tuple, sec=nil)
523       template = WaitTemplateEntry.new(self, tuple, sec)
524       yield(template) if block_given?
525       synchronize do
526         entry = @bag.find(template)
527         return entry.value if entry
528         raise RequestExpiredError if template.expired?
530         begin
531           @read_waiter.push(template)
532           start_keeper if template.expires
533           template.wait
534           raise RequestCanceledError if template.canceled?
535           raise RequestExpiredError if template.expired?
536           return template.found
537         ensure
538           @read_waiter.delete(template)
539         end
540       end
541     end
543     ##
544     # Returns all tuples matching +tuple+.  Does not remove the found tuples.
546     def read_all(tuple)
547       template = WaitTemplateEntry.new(self, tuple, nil)
548       synchronize do
549         entry = @bag.find_all(template)
550         entry.collect do |e|
551           e.value
552         end
553       end
554     end
556     ##
557     # Registers for notifications of +event+.  Returns a NotifyTemplateEntry.
558     # See NotifyTemplateEntry for examples of how to listen for notifications.
559     #
560     # +event+ can be:
561     # 'write'::  A tuple was added
562     # 'take'::   A tuple was taken or moved
563     # 'delete':: A tuple was lost after being overwritten or expiring
564     #
565     # The TupleSpace will also notify you of the 'close' event when the
566     # NotifyTemplateEntry has expired.
568     def notify(event, tuple, sec=nil)
569       template = NotifyTemplateEntry.new(self, event, tuple, sec)
570       synchronize do
571         @notify_waiter.push(template)
572       end
573       template
574     end
576     private
578     def create_entry(tuple, sec)
579       TupleEntry.new(tuple, sec)
580     end
582     ##
583     # Removes dead tuples.
585     def keep_clean
586       synchronize do
587         @read_waiter.delete_unless_alive.each do |e|
588           e.signal
589         end
590         @take_waiter.delete_unless_alive.each do |e|
591           e.signal
592         end
593         @notify_waiter.delete_unless_alive.each do |e|
594           e.notify(['close'])
595         end
596         @bag.delete_unless_alive.each do |e|
597           notify_event('delete', e.value)
598         end
599       end
600     end
602     ##
603     # Notifies all registered listeners for +event+ of a status change of
604     # +tuple+.
606     def notify_event(event, tuple)
607       ev = [event, tuple]
608       @notify_waiter.find_all_template(ev).each do |template|
609         template.notify(ev)
610       end
611     end
613     ##
614     # Creates a thread that scans the tuplespace for expired tuples.
616     def start_keeper
617       return if @keeper && @keeper.alive?
618       @keeper = Thread.new do
619         while true
620           sleep(@period)
621           synchronize do
622             break unless need_keeper?
623             keep_clean
624           end
625         end
626       end
627     end
629     ##
630     # Checks the tuplespace to see if it needs cleaning.
632     def need_keeper?
633       return true if @bag.has_expires?
634       return true if @read_waiter.has_expires?
635       return true if @take_waiter.has_expires?
636       return true if @notify_waiter.has_expires?
637     end
639   end