diff options
Diffstat (limited to 'lib/rinda/tuplespace.rb')
| -rw-r--r-- | lib/rinda/tuplespace.rb | 356 |
1 files changed, 0 insertions, 356 deletions
diff --git a/lib/rinda/tuplespace.rb b/lib/rinda/tuplespace.rb deleted file mode 100644 index 4bd4a42af5..0000000000 --- a/lib/rinda/tuplespace.rb +++ /dev/null @@ -1,356 +0,0 @@ -require 'monitor' -require 'thread' -require 'drb/drb' -require 'rinda/rinda' - -module Rinda - class TupleEntry - include DRbUndumped - - def initialize(ary, sec=nil) - @cancel = false - @ary = make_tuple(ary) - @renewer = nil - renew(sec) - end - attr_accessor :expires - - def cancel - @cancel = true - end - - def alive? - !canceled? && !expired? - end - - def value; @ary.value; end - def canceled?; @cancel; end - def expired? - return true unless @expires - return false if @expires > Time.now - return true if @renewer.nil? - renew(@renewer) - return true unless @expires - return @expires < Time.now - end - - def renew(sec_or_renewer) - sec, @renewer = get_renewer(sec_or_renewer) - @expires = make_expires(sec) - end - - def make_expires(sec=nil) - case sec - when Numeric - Time.now + sec - when true - Time.at(1) - when nil - Time.at(2**31-1) - end - end - - def [](key) - @ary[key] - end - - def size - @ary.size - end - - def make_tuple(ary) - Rinda::Tuple.new(ary) - end - - private - def get_renewer(it) - case it - when Numeric, true, nil - return it, nil - else - begin - return it.renew, it - rescue Exception - return it, nil - end - end - end - end - - class TemplateEntry < TupleEntry - def initialize(ary, expires=nil) - super(ary, expires) - @template = Rinda::Template.new(ary) - end - - def match(tuple) - @template.match(tuple) - end - - def ===(tuple) - match(tuple) - end - - def make_tuple(ary) - Rinda::Template.new(ary) - end - end - - class WaitTemplateEntry < TemplateEntry - def initialize(place, ary, expires=nil) - super(ary, expires) - @place = place - @cond = place.new_cond - @found = nil - end - attr_reader :found - - def cancel - super - signal - end - - def wait - @cond.wait - end - - def read(tuple) - @found = tuple - signal - end - - def signal - @place.synchronize do - @cond.signal - end - end - end - - class NotifyTemplateEntry < TemplateEntry - def initialize(place, event, tuple, expires=nil) - ary = [event, Rinda::Template.new(tuple)] - super(ary, expires) - @queue = Queue.new - @done = false - end - - def notify(ev) - @queue.push(ev) - end - - def pop - raise RequestExpiredError if @done - it = @queue.pop - @done = true if it[0] == 'close' - return it - end - - def each - while !@done - it = pop - yield(it) - end - rescue - ensure - cancel - end - end - - class TupleBag - def initialize - @hash = {} - end - - def push(ary) - size = ary.size - @hash[size] ||= [] - @hash[size].push(ary) - end - - def delete(ary) - size = ary.size - @hash.fetch(size, []).delete(ary) - end - - def find_all(template) - @hash.fetch(template.size, []).find_all do |tuple| - tuple.alive? && template.match(tuple) - end - end - - def find(template) - @hash.fetch(template.size, []).find do |tuple| - tuple.alive? && template.match(tuple) - end - end - - def find_all_template(tuple) - @hash.fetch(tuple.size, []).find_all do |template| - template.alive? && template.match(tuple) - end - end - - def delete_unless_alive - deleted = [] - @hash.keys.each do |size| - ary = [] - @hash[size].each do |tuple| - if tuple.alive? - ary.push(tuple) - else - deleted.push(tuple) - end - end - @hash[size] = ary - end - deleted - end - end - - class TupleSpace - include DRbUndumped - include MonitorMixin - def initialize(timeout=60) - super() - @bag = TupleBag.new - @read_waiter = TupleBag.new - @take_waiter = TupleBag.new - @notify_waiter = TupleBag.new - @timeout = timeout - @period = timeout * 2 - @keeper = keeper - end - - def write(tuple, sec=nil) - entry = TupleEntry.new(tuple, sec) - synchronize do - if entry.expired? - @read_waiter.find_all_template(entry).each do |template| - template.read(tuple) - end - notify_event('write', entry.value) - notify_event('delete', entry.value) - else - @bag.push(entry) - @read_waiter.find_all_template(entry).each do |template| - template.read(tuple) - end - @take_waiter.find_all_template(entry).each do |template| - template.signal - end - notify_event('write', entry.value) - end - end - entry - end - - def take(tuple, sec=nil, &block) - move(nil, tuple, sec, &block) - end - - def move(port, tuple, sec=nil) - template = WaitTemplateEntry.new(self, tuple, sec) - yield(template) if block_given? - synchronize do - entry = @bag.find(template) - if entry - port.push(entry.value) if port - @bag.delete(entry) - notify_event('take', entry.value) - return entry.value - end - return nil if template.expired? - - begin - @take_waiter.push(template) - while true - raise RequestCanceledError if template.canceled? - raise RequestExpiredError if template.expired? - entry = @bag.find(template) - if entry - port.push(entry.value) if port - @bag.delete(entry) - notify_event('take', entry.value) - return entry.value - end - template.wait - end - ensure - @take_waiter.delete(template) - end - end - end - - def read(tuple, sec=nil) - template = WaitTemplateEntry.new(self, tuple, sec) - yield(template) if block_given? - synchronize do - entry = @bag.find(template) - return entry.value if entry - return nil if template.expired? - - begin - @read_waiter.push(template) - template.wait - raise RequestCanceledError if template.canceled? - raise RequestExpiredError if template.expired? - return template.found - ensure - @read_waiter.delete(template) - end - end - end - - def read_all(tuple) - template = WaitTemplateEntry.new(self, tuple, nil) - synchronize do - entry = @bag.find_all(template) - entry.collect do |e| - e.value - end - end - end - - def notify(event, tuple, sec=nil) - template = NotifyTemplateEntry.new(self, event, tuple, sec) - synchronize do - @notify_waiter.push(template) - end - template - end - - private - def keep_clean - synchronize do - @read_waiter.delete_unless_alive.each do |e| - e.signal - end - @take_waiter.delete_unless_alive.each do |e| - e.signal - end - @notify_waiter.delete_unless_alive.each do |e| - e.notify(['close']) - end - @bag.delete_unless_alive.each do |e| - notify_event('delete', e.value) - end - end - end - - def notify_event(event, tuple) - ev = [event, tuple] - @notify_waiter.find_all_template(ev).each do |template| - template.notify(ev) - end - end - - def keeper - Thread.new do - loop do - sleep(@period) - keep_clean - end - end - end - end -end |
