#
# = rinda/tuplespace.rb
#
# Defines Rinda::TupleSpace and the helper classes it is built from:
#
# Rinda::TupleEntry::         a tuple plus expiry and cancellation state.
# Rinda::TemplateEntry::      a TupleEntry that can be matched against tuples.
# Rinda::WaitTemplateEntry::  a template a reader/taker can block on.
# Rinda::NotifyTemplateEntry:: a template that queues matching events.
# Rinda::TupleBag::           an unordered collection of entries.
#
# Example:
#
#   ts = Rinda::TupleSpace.new
#   ts.write([:greeting, 'hello'])
#   ts.take([:greeting, nil])   # => [:greeting, "hello"]
#

require 'monitor'
require 'thread'
require 'drb/drb'
require 'rinda/rinda'

module Rinda
  #
  # A TupleEntry is a Tuple (i.e. a possible entry in some Tuplespace)
  # together with expiry and cancellation data.
  #
  class TupleEntry
    include DRbUndumped

    # The Time at which this entry expires, or +nil+ when no expiry time
    # could be computed (such an entry is reported as expired).
    attr_accessor :expires

    # Wraps +ary+ in a tuple object (see #make_tuple) and sets the expiry
    # from +sec+, which is interpreted as described in #renew.
    def initialize(ary, sec=nil)
      @cancel = false
      @ary = make_tuple(ary)
      @renewer = nil
      renew(sec)
    end

    # Marks this entry as canceled.
    def cancel
      @cancel = true
    end

    # An entry is alive when it is neither canceled nor expired.
    def alive?
      !canceled? && !expired?
    end

    # Return the object which makes up the tuple itself: the Array
    # or Hash.
    def value; @ary.value; end

    # Has this entry been canceled? (true/false).
    def canceled?; @cancel; end

    # Has this tuple expired? (true/false).
    #
    # When the expiry time has passed and a renewer is attached, the
    # renewer is asked once for a new expiry before answering.
    def expired?
      return true unless @expires
      return false if @expires > Time.now
      return true if @renewer.nil?
      renew(@renewer)
      return true unless @expires
      return @expires < Time.now
    end

    # Reset the expiry data according to the supplied argument. If
    # the argument is:
    #
    # +nil+::    it is set to expire in the far future.
    # +true+::   it has expired.
    # Numeric::  it will expire in that many seconds.
    #
    # Otherwise the argument refers to some kind of renewer object
    # (anything responding to +renew+) which will reset its expiry time.
    def renew(sec_or_renewer)
      sec, @renewer = get_renewer(sec_or_renewer)
      @expires = make_expires(sec)
    end

    # Create an expiry time. Called with:
    #
    # +true+::   the expiry time is the start of 1970 (i.e. expired).
    # +nil+::    it is Tue Jan 19 03:14:07 GMT Standard Time 2038 (i.e. when
    #            UNIX clocks will die)
    # Numeric::  it is +sec+ seconds into the future.
    #
    # Any other argument yields +nil+, which #expired? treats as expired.
    #
    # NOTE(review): the +nil+ case stops being "far future" once the wall
    # clock passes 2038-01-19; the value is kept because it is observable
    # through #expires.
    def make_expires(sec=nil)
      case sec
      when Numeric
        Time.now + sec
      when true
        Time.at(1)
      when nil
        Time.at(2**31-1)
      end
    end

    # Accessor method for the tuple.
    def [](key)
      @ary[key]
    end

    # Fetches the item stored under +key+ by delegating to the wrapped tuple.
    def fetch(key)
      @ary.fetch(key)
    end

    # The size of the tuple.
    def size
      @ary.size
    end

    # Create a new tuple from the supplied object (array-like).
    def make_tuple(ary)
      Rinda::Tuple.new(ary)
    end

    private

    # Given +true+, +nil+, or +Numeric+, returns that (suitable input to
    # make_expires) and +nil+ (no actual +renewer+), else it returns the
    # time data from the supplied +renewer+ together with the renewer.
    #
    # If calling +renew+ on the argument fails with a StandardError the
    # argument itself is returned without a renewer; #make_expires then
    # yields +nil+ and the entry counts as expired. Only StandardError is
    # rescued so that Interrupt, SystemExit and similar are not swallowed.
    def get_renewer(arg)
      case arg
      when Numeric, true, nil
        return arg, nil
      else
        begin
          return arg.renew, arg
        rescue StandardError
          return arg, nil
        end
      end
    end
  end

  #
  # The same as a TupleEntry but with methods to do matching.
  #
  class TemplateEntry < TupleEntry
    # Builds the entry and a Rinda::Template from +ary+ used for matching.
    def initialize(ary, expires=nil)
      super(ary, expires)
      @template = Rinda::Template.new(ary)
    end

    # Does +tuple+ match this template? Delegates to Rinda::Template#match.
    def match(tuple)
      @template.match(tuple)
    end

    # An alias for #match.
    def ===(tuple)
      match(tuple)
    end

    # Create a new Template from the supplied object.
    def make_tuple(ary)
      Rinda::Template.new(ary)
    end
  end

  #
  # A TemplateEntry that a caller can block on until it is signaled.
  #
  # +place+ must respond to +new_cond+ and +synchronize+; TupleSpace (which
  # includes MonitorMixin) passes itself.
  #
  class WaitTemplateEntry < TemplateEntry
    # The tuple handed over by #read, or +nil+ if none has been delivered.
    attr_reader :found

    # Creates the entry and a condition variable obtained from +place+.
    def initialize(place, ary, expires=nil)
      super(ary, expires)
      @place = place
      @cond = place.new_cond
      @found = nil
    end

    # Cancels the entry and wakes up a waiter so it can notice.
    def cancel
      super
      signal
    end

    # Blocks on the condition variable. The callers in this file invoke it
    # from inside +place.synchronize+.
    def wait
      @cond.wait
    end

    # Records +tuple+ as the found value and wakes up a waiter.
    def read(tuple)
      @found = tuple
      signal
    end

    # Signals the condition variable while holding +place+'s lock.
    def signal
      @place.synchronize do
        @cond.signal
      end
    end
  end

  #
  # A TemplateEntry that collects notification events in a queue.
  #
  # The template is the pair <code>[event, Template.new(tuple)]</code>, so it
  # is matched against <code>[event_name, tuple]</code> pairs.
  #
  class NotifyTemplateEntry < TemplateEntry
    # +place+ is accepted for signature parity with WaitTemplateEntry but is
    # not used here.
    def initialize(place, event, tuple, expires=nil)
      ary = [event, Rinda::Template.new(tuple)]
      super(ary, expires)
      @queue = Queue.new
      @done = false
    end

    # Enqueues the event +ev+ for a later #pop.
    def notify(ev)
      @queue.push(ev)
    end

    # Returns the next queued event, blocking until one is available.
    # Raises RequestExpiredError once a 'close' event has been popped.
    def pop
      raise RequestExpiredError if @done
      event = @queue.pop
      @done = true if event[0] == 'close'
      return event
    end

    # Yields each event (including the final 'close' event) until the
    # entry is done, then cancels the entry.
    #
    # The bare +rescue+ is deliberate best-effort: a StandardError raised
    # by #pop or by the block ends the iteration silently.
    def each
      while !@done
        event = pop
        yield(event)
      end
    rescue
    ensure
      cancel
    end
  end

  #
  # TupleBag is an unordered collection of tuples. It is the basis
  # of Tuplespace.
  #
  # Entries are bucketed by their size so that lookups only scan entries
  # of the same size as the template or tuple being matched.
  #
  class TupleBag
    def initialize
      @hash = {}
    end

    # Add the object to the TupleBag.
    def push(ary)
      size = ary.size
      @hash[size] ||= []
      @hash[size].push(ary)
    end

    # Remove the object from the TupleBag.
    def delete(ary)
      size = ary.size
      @hash.fetch(size, []).delete(ary)
    end

    # Finds all tuples that match the template and are alive.
    def find_all(template)
      @hash.fetch(template.size, []).select do |tuple|
        tuple.alive? && template.match(tuple)
      end
    end

    # Finds a tuple that matches the template and is alive.
    def find(template)
      @hash.fetch(template.size, []).find do |tuple|
        tuple.alive? && template.match(tuple)
      end
    end

    # Finds all tuples in the TupleBag which when treated as
    # templates, match the supplied tuple and are alive.
    def find_all_template(tuple)
      @hash.fetch(tuple.size, []).select do |template|
        template.alive? && template.match(tuple)
      end
    end

    # Delete tuples which are not alive from the TupleBag. Returns
    # the list of tuples so deleted.
    def delete_unless_alive
      deleted = []
      @hash.keys.each do |size|
        alive, dead = @hash[size].partition { |tuple| tuple.alive? }
        @hash[size] = alive
        deleted.concat(dead)
      end
      deleted
    end
  end

  #
  # The Tuplespace manages access to the tuples it contains,
  # ensuring mutual exclusion requirements are met.
  #
  class TupleSpace
    include DRbUndumped
    include MonitorMixin

    # Creates an empty tuplespace and starts the keeper thread, which
    # removes dead entries every +period+ seconds.
    def initialize(period=60)
      super()
      @bag = TupleBag.new
      @read_waiter = TupleBag.new
      @take_waiter = TupleBag.new
      @notify_waiter = TupleBag.new
      @period = period
      @keeper = keeper
    end

    # Put a tuple into the tuplespace.
    #
    # +sec+ is the expiry argument described in TupleEntry#renew. A tuple
    # that is already expired is not stored; waiting readers still see it
    # and both a 'write' and a 'delete' event are emitted. Returns the
    # TupleEntry created for +tuple+.
    def write(tuple, sec=nil)
      entry = TupleEntry.new(tuple, sec)
      synchronize do
        if entry.expired?
          @read_waiter.find_all_template(entry).each do |template|
            template.read(tuple)
          end
          notify_event('write', entry.value)
          notify_event('delete', entry.value)
        else
          @bag.push(entry)
          @read_waiter.find_all_template(entry).each do |template|
            template.read(tuple)
          end
          @take_waiter.find_all_template(entry).each do |template|
            template.signal
          end
          notify_event('write', entry.value)
        end
      end
      entry
    end

    # Remove an entry from the Tuplespace.
    def take(tuple, sec=nil, &block)
      move(nil, tuple, sec, &block)
    end

    # Removes an entry matching +tuple+ and returns its value, first
    # pushing the value onto +port+ when +port+ is given. Blocks until a
    # match is available.
    #
    # If a block is given it is passed the WaitTemplateEntry before
    # waiting. Raises RequestExpiredError when the template
    # (built from +sec+) has expired and RequestCanceledError when it has
    # been canceled.
    def move(port, tuple, sec=nil)
      template = WaitTemplateEntry.new(self, tuple, sec)
      yield(template) if block_given?
      synchronize do
        entry = @bag.find(template)
        return remove_entry(entry, port) if entry
        raise RequestExpiredError if template.expired?

        begin
          @take_waiter.push(template)
          while true
            raise RequestCanceledError if template.canceled?
            raise RequestExpiredError if template.expired?
            entry = @bag.find(template)
            return remove_entry(entry, port) if entry
            template.wait
          end
        ensure
          @take_waiter.delete(template)
        end
      end
    end

    # Returns the value of an entry matching +tuple+ without removing it,
    # blocking until one is written if none is present.
    #
    # If a block is given it is passed the WaitTemplateEntry before
    # waiting. Raises RequestExpiredError or RequestCanceledError
    # as #move does.
    #
    # NOTE(review): the template is waited on exactly once; if the wait
    # returns without a tuple having been delivered and the template is
    # neither canceled nor expired, +nil+ is returned.
    def read(tuple, sec=nil)
      template = WaitTemplateEntry.new(self, tuple, sec)
      yield(template) if block_given?
      synchronize do
        entry = @bag.find(template)
        return entry.value if entry
        raise RequestExpiredError if template.expired?

        begin
          @read_waiter.push(template)
          template.wait
          raise RequestCanceledError if template.canceled?
          raise RequestExpiredError if template.expired?
          return template.found
        ensure
          @read_waiter.delete(template)
        end
      end
    end

    # Returns the values of all live entries matching +tuple+. Does not
    # block and does not remove anything.
    def read_all(tuple)
      template = WaitTemplateEntry.new(self, tuple, nil)
      synchronize do
        @bag.find_all(template).map do |entry|
          entry.value
        end
      end
    end

    # Registers for notifications of +event+ on tuples matching +tuple+
    # and returns the NotifyTemplateEntry from which events can be popped.
    # +sec+ is the expiry argument described in TupleEntry#renew.
    def notify(event, tuple, sec=nil)
      template = NotifyTemplateEntry.new(self, event, tuple, sec)
      synchronize do
        @notify_waiter.push(template)
      end
      template
    end

    private

    # Takes +entry+ out of the bag: pushes its value onto +port+ (when
    # given), deletes it, emits a 'take' event and returns the value.
    # Called from inside #synchronize by #move.
    def remove_entry(entry, port)
      port.push(entry.value) if port
      @bag.delete(entry)
      notify_event('take', entry.value)
      entry.value
    end

    # Removes dead entries from every bag: dead waiters are signaled so
    # they can raise, dead notifiers receive a 'close' event, and a
    # 'delete' event is emitted for each dead tuple.
    def keep_clean
      synchronize do
        @read_waiter.delete_unless_alive.each do |e|
          e.signal
        end
        @take_waiter.delete_unless_alive.each do |e|
          e.signal
        end
        @notify_waiter.delete_unless_alive.each do |e|
          e.notify(['close'])
        end
        @bag.delete_unless_alive.each do |e|
          notify_event('delete', e.value)
        end
      end
    end

    # Delivers <code>[event, tuple]</code> to every live notifier whose
    # template matches it.
    def notify_event(event, tuple)
      ev = [event, tuple]
      @notify_waiter.find_all_template(ev).each do |template|
        template.notify(ev)
      end
    end

    # Starts and returns the thread that runs #keep_clean every
    # <code>@period</code> seconds.
    def keeper
      Thread.new do
        loop do
          sleep(@period)
          keep_clean
        end
      end
    end
  end
end