require 'monitor' require 'thread' require 'drb/drb' require 'rinda/rinda' require 'enumerator' require 'forwardable' module Rinda ## # A TupleEntry is a Tuple (i.e. a possible entry in some Tuplespace) # together with expiry and cancellation data. class TupleEntry include DRbUndumped attr_accessor :expires ## # Creates a TupleEntry based on +ary+ with an optional renewer or expiry # time +sec+. # # A renewer must implement the +renew+ method which returns a Numeric, # nil, or true to indicate when the tuple has expired. def initialize(ary, sec=nil) @cancel = false @expires = nil @tuple = make_tuple(ary) @renewer = nil renew(sec) end ## # Marks this TupleEntry as canceled. def cancel @cancel = true end ## # A TupleEntry is dead when it is canceled or expired. def alive? !canceled? && !expired? end ## # Return the object which makes up the tuple itself: the Array # or Hash. def value; @tuple.value; end ## # Returns the canceled status. def canceled?; @cancel; end ## # Has this tuple expired? (true/false). # # A tuple has expired when its expiry timer based on the +sec+ argument to # #initialize runs out. def expired? return true unless @expires return false if @expires > Time.now return true if @renewer.nil? renew(@renewer) return true unless @expires return @expires < Time.now end ## # Reset the expiry time according to +sec_or_renewer+. # # +nil+:: it is set to expire in the far future. # +false+:: it has expired. # Numeric:: it will expire in that many seconds. # # Otherwise the argument refers to some kind of renewer object # which will reset its expiry time. def renew(sec_or_renewer) sec, @renewer = get_renewer(sec_or_renewer) @expires = make_expires(sec) end ## # Returns an expiry Time based on +sec+ which can be one of: # Numeric:: +sec+ seconds into the future # +true+:: the expiry time is the start of 1970 (i.e. expired) # +nil+:: it is Tue Jan 19 03:14:07 GMT Standard Time 2038 (i.e. when # UNIX clocks will die) def make_expires(sec=nil) case sec when Numeric Time.now + sec when true Time.at(1) when nil Time.at(2**31-1) end end ## # Retrieves +key+ from the tuple. def [](key) @tuple[key] end ## # Fetches +key+ from the tuple. def fetch(key) @tuple.fetch(key) end ## # The size of the tuple. def size @tuple.size end ## # Creates a Rinda::Tuple for +ary+. def make_tuple(ary) Rinda::Tuple.new(ary) end private ## # Returns a valid argument to make_expires and the renewer or nil. # # Given +true+, +nil+, or Numeric, returns that value and +nil+ (no actual # renewer). Otherwise it returns an expiry value from calling +it.renew+ # and the renewer. def get_renewer(it) case it when Numeric, true, nil return it, nil else begin return it.renew, it rescue Exception return it, nil end end end end ## # A TemplateEntry is a Template together with expiry and cancellation data. class TemplateEntry < TupleEntry ## # Matches this TemplateEntry against +tuple+. See Template#match for # details on how a Template matches a Tuple. def match(tuple) @tuple.match(tuple) end alias === match def make_tuple(ary) # :nodoc: Rinda::Template.new(ary) end end ## # Documentation? class WaitTemplateEntry < TemplateEntry attr_reader :found def initialize(place, ary, expires=nil) super(ary, expires) @place = place @cond = place.new_cond @found = nil end def cancel super signal end def wait @cond.wait end def read(tuple) @found = tuple signal end def signal @place.synchronize do @cond.signal end end end ## # A NotifyTemplateEntry is returned by TupleSpace#notify and is notified of # TupleSpace changes. You may receive either your subscribed event or the # 'close' event when iterating over notifications. # # See TupleSpace#notify_event for valid notification types. # # == Example # # ts = Rinda::TupleSpace.new # observer = ts.notify 'write', [nil] # # Thread.start do # observer.each { |t| p t } # end # # 3.times { |i| ts.write [i] } # # Outputs: # # ['write', [0]] # ['write', [1]] # ['write', [2]] class NotifyTemplateEntry < TemplateEntry ## # Creates a new NotifyTemplateEntry that watches +place+ for +event+s that # match +tuple+. def initialize(place, event, tuple, expires=nil) ary = [event, Rinda::Template.new(tuple)] super(ary, expires) @queue = Queue.new @done = false end ## # Called by TupleSpace to notify this NotifyTemplateEntry of a new event. def notify(ev) @queue.push(ev) end ## # Retrieves a notification. Raises RequestExpiredError when this # NotifyTemplateEntry expires. def pop raise RequestExpiredError if @done it = @queue.pop @done = true if it[0] == 'close' return it end ## # Yields event/tuple pairs until this NotifyTemplateEntry expires. def each # :yields: event, tuple while !@done it = pop yield(it) end rescue ensure cancel end end ## # TupleBag is an unordered collection of tuples. It is the basis # of Tuplespace. class TupleBag class TupleBin extend Forwardable def_delegators '@bin', :find_all, :delete_if, :each, :empty? def initialize @bin = [] end def add(tuple) @bin.push(tuple) end def delete(tuple) idx = @bin.rindex(tuple) @bin.delete_at(idx) if idx end def find(&blk) @bin.reverse_each do |x| return x if yield(x) end nil end end def initialize # :nodoc: @hash = {} @enum = Enumerable::Enumerator.new(self, :each_entry) end ## # +true+ if the TupleBag to see if it has any expired entries. def has_expires? @enum.find do |tuple| tuple.expires end end ## # Add +tuple+ to the TupleBag. def push(tuple) key = bin_key(tuple) @hash[key] ||= TupleBin.new @hash[key].add(tuple) end ## # Removes +tuple+ from the TupleBag. def delete(tuple) key = bin_key(tuple) bin = @hash[key] return nil unless bin bin.delete(tuple) @hash.delete(key) if bin.empty? tuple end ## # Finds all live tuples that match +template+. def find_all(template) bin_for_find(template).find_all do |tuple| tuple.alive? && template.match(tuple) end end ## # Finds a live tuple that matches +template+. def find(template) bin_for_find(template).find do |tuple| tuple.alive? && template.match(tuple) end end ## # Finds all tuples in the TupleBag which when treated as templates, match # +tuple+ and are alive. def find_all_template(tuple) @enum.find_all do |template| template.alive? && template.match(tuple) end end ## # Delete tuples which dead tuples from the TupleBag, returning the deleted # tuples. def delete_unless_alive deleted = [] @hash.each do |key, bin| bin.delete_if do |tuple| if tuple.alive? false else deleted.push(tuple) true end end end deleted end private def each_entry(&blk) @hash.each do |k, v| v.each(&blk) end end def bin_key(tuple) head = tuple[0] if head.class == Symbol return head else false end end def bin_for_find(template) key = bin_key(template) key ? @hash.fetch(key, []) : @enum end end ## # The Tuplespace manages access to the tuples it contains, # ensuring mutual exclusion requirements are met. # # The +sec+ option for the write, take, move, read and notify methods may # either be a number of seconds or a Renewer object. class TupleSpace include DRbUndumped include MonitorMixin ## # Creates a new TupleSpace. +period+ is used to control how often to look # for dead tuples after modifications to the TupleSpace. # # If no dead tuples are found +period+ seconds after the last # modification, the TupleSpace will stop looking for dead tuples. def initialize(period=60) super() @bag = TupleBag.new @read_waiter = TupleBag.new @take_waiter = TupleBag.new @notify_waiter = TupleBag.new @period = period @keeper = nil end ## # Adds +tuple+ def write(tuple, sec=nil) entry = create_entry(tuple, sec) synchronize do if entry.expired? @read_waiter.find_all_template(entry).each do |template| template.read(tuple) end notify_event('write', entry.value) notify_event('delete', entry.value) else @bag.push(entry) start_keeper if entry.expires @read_waiter.find_all_template(entry).each do |template| template.read(tuple) end @take_waiter.find_all_template(entry).each do |template| template.signal end notify_event('write', entry.value) end end entry end ## # Removes +tuple+ def take(tuple, sec=nil, &block) move(nil, tuple, sec, &block) end ## # Moves +tuple+ to +port+. def move(port, tuple, sec=nil) template = WaitTemplateEntry.new(self, tuple, sec) yield(template) if block_given? synchronize do entry = @bag.find(template) if entry port.push(entry.value) if port @bag.delete(entry) notify_event('take', entry.value) return entry.value end raise RequestExpiredError if template.expired? begin @take_waiter.push(template) start_keeper if template.expires while true raise RequestCanceledError if template.canceled? raise RequestExpiredError if template.expired? entry = @bag.find(template) if entry port.push(entry.value) if port @bag.delete(entry) notify_event('take', entry.value) return entry.value end template.wait end ensure @take_waiter.delete(template) end end end ## # Reads +tuple+, but does not remove it. def read(tuple, sec=nil) template = WaitTemplateEntry.new(self, tuple, sec) yield(template) if block_given? synchronize do entry = @bag.find(template) return entry.value if entry raise RequestExpiredError if template.expired? begin @read_waiter.push(template) start_keeper if template.expires template.wait raise RequestCanceledError if template.canceled? raise RequestExpiredError if template.expired? return template.found ensure @read_waiter.delete(template) end end end ## # Returns all tuples matching +tuple+. Does not remove the found tuples. def read_all(tuple) template = WaitTemplateEntry.new(self, tuple, nil) synchronize do entry = @bag.find_all(template) entry.collect do |e| e.value end end end ## # Registers for notifications of +event+. Returns a NotifyTemplateEntry. # See NotifyTemplateEntry for examples of how to listen for notifications. # # +event+ can be: # 'write':: A tuple was added # 'take':: A tuple was taken or moved # 'delete':: A tuple was lost after being overwritten or expiring # # The TupleSpace will also notify you of the 'close' event when the # NotifyTemplateEntry has expired. def notify(event, tuple, sec=nil) template = NotifyTemplateEntry.new(self, event, tuple, sec) synchronize do @notify_waiter.push(template) end template end private def create_entry(tuple, sec) TupleEntry.new(tuple, sec) end ## # Removes dead tuples. def keep_clean synchronize do @read_waiter.delete_unless_alive.each do |e| e.signal end @take_waiter.delete_unless_alive.each do |e| e.signal end @notify_waiter.delete_unless_alive.each do |e| e.notify(['close']) end @bag.delete_unless_alive.each do |e| notify_event('delete', e.value) end end end ## # Notifies all registered listeners for +event+ of a status change of # +tuple+. def notify_event(event, tuple) ev = [event, tuple] @notify_waiter.find_all_template(ev).each do |template| template.notify(ev) end end ## # Creates a thread that scans the tuplespace for expired tuples. def start_keeper return if @keeper && @keeper.alive? @keeper = Thread.new do while true sleep(@period) synchronize do break unless need_keeper? keep_clean end end end end ## # Checks the tuplespace to see if it needs cleaning. def need_keeper? return true if @bag.has_expires? return true if @read_waiter.has_expires? return true if @take_waiter.has_expires? return true if @notify_waiter.has_expires? end end end