diff options
Diffstat (limited to 'lib/rinda')
| -rw-r--r-- | lib/rinda/rinda.rb | 129 | ||||
| -rw-r--r-- | lib/rinda/ring.rb | 164 | ||||
| -rw-r--r-- | lib/rinda/tuplespace.rb | 356 |
3 files changed, 0 insertions, 649 deletions
diff --git a/lib/rinda/rinda.rb b/lib/rinda/rinda.rb deleted file mode 100644 index 273560b1de..0000000000 --- a/lib/rinda/rinda.rb +++ /dev/null @@ -1,129 +0,0 @@ -require 'thread' - -module Rinda - class RequestCanceledError < ThreadError; end - class RequestExpiredError < ThreadError; end - - class Tuple - def initialize(ary_or_hash) - if Hash === ary_or_hash - init_with_hash(ary_or_hash) - else - init_with_ary(ary_or_hash) - end - end - - def size - @tuple.size - end - - def [](k) - @tuple[k] - end - - def each # FIXME - if Hash === @tuple - @tuple.each { |k, v| yield(k, v) } - else - @tuple.each_with_index { |v, k| yield(k, v) } - end - end - - def value - @tuple - end - - private - def init_with_ary(ary) - @tuple_size = ary.size - @tuple = Array.new(@tuple_size) - @tuple.size.times do |i| - @tuple[i] = ary[i] - end - end - - def init_with_hash(hash) - @tuple_size = hash[:size] - @tuple = Hash.new - hash.each do |k, v| - next unless String === k - @tuple[k] = v - end - end - end - - class Template < Tuple - def match(tuple) - return false unless tuple.respond_to?(:size) - return false unless tuple.respond_to?(:[]) - return false if @tuple_size && (@tuple_size != tuple.size) - each do |k, v| - next if v.nil? - return false unless (v === tuple[k] rescue false) - end - return true - end - - def ===(tuple) - match(tuple) - end - end - - class DRbObjectTemplate - def initialize(uri=nil, ref=nil) - @drb_uri = uri - @drb_ref = ref - end - - def ===(ro) - return true if super(ro) - unless @drb_uri.nil? - return false unless (@drb_uri === ro.__drburi rescue false) - end - unless @drb_ref.nil? - return false unless (@drb_ref === ro.__drbref rescue false) - end - true - end - end - - class TupleSpaceProxy - def initialize(ts) - @ts = ts - end - - def write(tuple, sec=nil) - @ts.write(tuple, sec) - end - - def take(tuple, sec=nil, &block) - port = [] - @ts.move(DRbObject.new(port), tuple, sec, &block) - port[0] - end - - def read(tuple, sec=nil, &block) - @ts.read(tuple, sec, &block) - end - - def read_all(tuple) - @ts.read_all(tuple) - end - - def notify(ev, tuple, sec=nil) - @ts.notify(ev, tuple, sec) - end - end - - class SimpleRenewer - include DRbUndumped - def initialize(sec=180) - @sec = sec - end - - def renew - @sec - end - end -end - diff --git a/lib/rinda/ring.rb b/lib/rinda/ring.rb deleted file mode 100644 index 7152dfb72e..0000000000 --- a/lib/rinda/ring.rb +++ /dev/null @@ -1,164 +0,0 @@ -# -# Note: Rinda::Ring API is unstable. -# -require 'drb/drb' -require 'rinda/rinda' -require 'thread' - -module Rinda - Ring_PORT = 7647 - class RingServer - include DRbUndumped - - def initialize(ts, port=Ring_PORT) - @ts = ts - @soc = UDPSocket.open - @soc.bind('', port) - @w_service = write_service - @r_service = reply_service - end - - def write_service - Thread.new do - loop do - msg, addr = @soc.recvfrom(1024) - do_write(msg) - end - end - end - - def do_write(msg) - Thread.new do - begin - tuple, sec = Marshal.load(msg) - @ts.write(tuple, sec) - rescue - end - end - end - - def reply_service - Thread.new do - loop do - do_reply - end - end - end - - def do_reply - tuple = @ts.take([:lookup_ring, DRbObject]) - Thread.new { tuple[1].call(@ts) rescue nil} - rescue - end - end - - class RingFinger - @@finger = nil - def self.finger - unless @@finger - @@finger = self.new - @@finger.lookup_ring_any - end - @@finger - end - - def self.primary - finger.primary - end - - def self.to_a - finger.to_a - end - - @@broadcast_list = ['<broadcast>', 'localhost'] - def initialize(broadcast_list=@@broadcast_list, port=Ring_PORT) - @broadcast_list = broadcast_list || ['localhost'] - @port = port - @primary = nil - @rings = [] - end - attr_accessor :broadcast_list, :port, :primary - - def to_a - @rings - end - - def each - lookup_ring_any unless @primary - return unless @primary - yield(@primary) - @rings.each { |x| yield(x) } - end - - def lookup_ring(timeout=5, &block) - return lookup_ring_any(timeout) unless block_given? - - msg = Marshal.dump([[:lookup_ring, DRbObject.new(block)], timeout]) - @broadcast_list.each do |it| - soc = UDPSocket.open - begin - soc.setsockopt(Socket::SOL_SOCKET, Socket::SO_BROADCAST, true) - soc.send(msg, 0, it, @port) - rescue - nil - ensure - soc.close - end - end - sleep(timeout) - end - - def lookup_ring_any(timeout=5) - queue = Queue.new - - th = Thread.new do - self.lookup_ring(timeout) do |ts| - queue.push(ts) - end - queue.push(nil) - while it = queue.pop - @rings.push(it) - end - end - - @primary = queue.pop - raise('RingNotFound') if @primary.nil? - @primary - end - end - - class RingProvider - def initialize(klass, front, desc, renewer = nil) - @tuple = [:name, klass, front, desc] - @renewer = renewer || Rinda::SimpleRenewer.new - end - - def provide - ts = Rinda::RingFinger.primary - ts.write(@tuple, @renewer) - end - end -end - -if __FILE__ == $0 - DRb.start_service - case ARGV.shift - when 's' - require 'rinda/tuplespace' - ts = Rinda::TupleSpace.new - place = Rinda::RingServer.new(ts) - $stdin.gets - when 'w' - finger = Rinda::RingFinger.new(nil) - finger.lookup_ring do |ts| - p ts - ts.write([:hello, :world]) - end - when 'r' - finger = Rinda::RingFinger.new(nil) - finger.lookup_ring do |ts| - p ts - p ts.take([nil, nil]) - end - end -end diff --git a/lib/rinda/tuplespace.rb b/lib/rinda/tuplespace.rb deleted file mode 100644 index 4bd4a42af5..0000000000 --- a/lib/rinda/tuplespace.rb +++ /dev/null @@ -1,356 +0,0 @@ -require 'monitor' -require 'thread' -require 'drb/drb' -require 'rinda/rinda' - -module Rinda - class TupleEntry - include DRbUndumped - - def initialize(ary, sec=nil) - @cancel = false - @ary = make_tuple(ary) - @renewer = nil - renew(sec) - end - attr_accessor :expires - - def cancel - @cancel = true - end - - def alive? - !canceled? && !expired? - end - - def value; @ary.value; end - def canceled?; @cancel; end - def expired? - return true unless @expires - return false if @expires > Time.now - return true if @renewer.nil? - renew(@renewer) - return true unless @expires - return @expires < Time.now - end - - def renew(sec_or_renewer) - sec, @renewer = get_renewer(sec_or_renewer) - @expires = make_expires(sec) - end - - def make_expires(sec=nil) - case sec - when Numeric - Time.now + sec - when true - Time.at(1) - when nil - Time.at(2**31-1) - end - end - - def [](key) - @ary[key] - end - - def size - @ary.size - end - - def make_tuple(ary) - Rinda::Tuple.new(ary) - end - - private - def get_renewer(it) - case it - when Numeric, true, nil - return it, nil - else - begin - return it.renew, it - rescue Exception - return it, nil - end - end - end - end - - class TemplateEntry < TupleEntry - def initialize(ary, expires=nil) - super(ary, expires) - @template = Rinda::Template.new(ary) - end - - def match(tuple) - @template.match(tuple) - end - - def ===(tuple) - match(tuple) - end - - def make_tuple(ary) - Rinda::Template.new(ary) - end - end - - class WaitTemplateEntry < TemplateEntry - def initialize(place, ary, expires=nil) - super(ary, expires) - @place = place - @cond = place.new_cond - @found = nil - end - attr_reader :found - - def cancel - super - signal - end - - def wait - @cond.wait - end - - def read(tuple) - @found = tuple - signal - end - - def signal - @place.synchronize do - @cond.signal - end - end - end - - class NotifyTemplateEntry < TemplateEntry - def initialize(place, event, tuple, expires=nil) - ary = [event, Rinda::Template.new(tuple)] - super(ary, expires) - @queue = Queue.new - @done = false - end - - def notify(ev) - @queue.push(ev) - end - - def pop - raise RequestExpiredError if @done - it = @queue.pop - @done = true if it[0] == 'close' - return it - end - - def each - while !@done - it = pop - yield(it) - end - rescue - ensure - cancel - end - end - - class TupleBag - def initialize - @hash = {} - end - - def push(ary) - size = ary.size - @hash[size] ||= [] - @hash[size].push(ary) - end - - def delete(ary) - size = ary.size - @hash.fetch(size, []).delete(ary) - end - - def find_all(template) - @hash.fetch(template.size, []).find_all do |tuple| - tuple.alive? && template.match(tuple) - end - end - - def find(template) - @hash.fetch(template.size, []).find do |tuple| - tuple.alive? && template.match(tuple) - end - end - - def find_all_template(tuple) - @hash.fetch(tuple.size, []).find_all do |template| - template.alive? && template.match(tuple) - end - end - - def delete_unless_alive - deleted = [] - @hash.keys.each do |size| - ary = [] - @hash[size].each do |tuple| - if tuple.alive? - ary.push(tuple) - else - deleted.push(tuple) - end - end - @hash[size] = ary - end - deleted - end - end - - class TupleSpace - include DRbUndumped - include MonitorMixin - def initialize(timeout=60) - super() - @bag = TupleBag.new - @read_waiter = TupleBag.new - @take_waiter = TupleBag.new - @notify_waiter = TupleBag.new - @timeout = timeout - @period = timeout * 2 - @keeper = keeper - end - - def write(tuple, sec=nil) - entry = TupleEntry.new(tuple, sec) - synchronize do - if entry.expired? - @read_waiter.find_all_template(entry).each do |template| - template.read(tuple) - end - notify_event('write', entry.value) - notify_event('delete', entry.value) - else - @bag.push(entry) - @read_waiter.find_all_template(entry).each do |template| - template.read(tuple) - end - @take_waiter.find_all_template(entry).each do |template| - template.signal - end - notify_event('write', entry.value) - end - end - entry - end - - def take(tuple, sec=nil, &block) - move(nil, tuple, sec, &block) - end - - def move(port, tuple, sec=nil) - template = WaitTemplateEntry.new(self, tuple, sec) - yield(template) if block_given? - synchronize do - entry = @bag.find(template) - if entry - port.push(entry.value) if port - @bag.delete(entry) - notify_event('take', entry.value) - return entry.value - end - return nil if template.expired? - - begin - @take_waiter.push(template) - while true - raise RequestCanceledError if template.canceled? - raise RequestExpiredError if template.expired? - entry = @bag.find(template) - if entry - port.push(entry.value) if port - @bag.delete(entry) - notify_event('take', entry.value) - return entry.value - end - template.wait - end - ensure - @take_waiter.delete(template) - end - end - end - - def read(tuple, sec=nil) - template = WaitTemplateEntry.new(self, tuple, sec) - yield(template) if block_given? - synchronize do - entry = @bag.find(template) - return entry.value if entry - return nil if template.expired? - - begin - @read_waiter.push(template) - template.wait - raise RequestCanceledError if template.canceled? - raise RequestExpiredError if template.expired? - return template.found - ensure - @read_waiter.delete(template) - end - end - end - - def read_all(tuple) - template = WaitTemplateEntry.new(self, tuple, nil) - synchronize do - entry = @bag.find_all(template) - entry.collect do |e| - e.value - end - end - end - - def notify(event, tuple, sec=nil) - template = NotifyTemplateEntry.new(self, event, tuple, sec) - synchronize do - @notify_waiter.push(template) - end - template - end - - private - def keep_clean - synchronize do - @read_waiter.delete_unless_alive.each do |e| - e.signal - end - @take_waiter.delete_unless_alive.each do |e| - e.signal - end - @notify_waiter.delete_unless_alive.each do |e| - e.notify(['close']) - end - @bag.delete_unless_alive.each do |e| - notify_event('delete', e.value) - end - end - end - - def notify_event(event, tuple) - ev = [event, tuple] - @notify_waiter.find_all_template(ev).each do |template| - template.notify(ev) - end - end - - def keeper - Thread.new do - loop do - sleep(@period) - keep_clean - end - end - end - end -end |
