summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--MANIFEST3
-rw-r--r--lib/rinda/rinda.rb129
-rw-r--r--lib/rinda/ring.rb164
-rw-r--r--lib/rinda/tuplespace.rb356
4 files changed, 652 insertions, 0 deletions
diff --git a/MANIFEST b/MANIFEST
index db0eb00d8d..fbcf689271 100644
--- a/MANIFEST
+++ b/MANIFEST
@@ -273,6 +273,9 @@ lib/rexml/xmldecl.rb
lib/rexml/xmltokens.rb
lib/rexml/xpath.rb
lib/rexml/xpath_parser.rb
+lib/rinda/rinda.rb
+lib/rinda/ring.rb
+lib/rinda/tuplespace.rb
lib/rubyunit.rb
lib/runit/assert.rb
lib/runit/cui/testrunner.rb
diff --git a/lib/rinda/rinda.rb b/lib/rinda/rinda.rb
new file mode 100644
index 0000000000..72b0e45cdf
--- /dev/null
+++ b/lib/rinda/rinda.rb
@@ -0,0 +1,129 @@
+require 'thread'
+
+module Rinda
+ class RequestCanceledError < ThreadError; end
+ class RequestExpiredError < ThreadError; end
+
+ class Tuple
+ def initialize(ary_or_hash)
+ if Hash === ary_or_hash
+ init_with_hash(ary_or_hash)
+ else
+ init_with_ary(ary_or_hash)
+ end
+ end
+
+ def size
+ @tuple.size
+ end
+
+ def [](k)
+ @tuple[k]
+ end
+
+ def each # FIXME
+ if Hash === @tuple
+ @tuple.each { |k, v| yield(k, v) }
+ else
+ @tuple.each_with_index { |v, k| yield(k, v) }
+ end
+ end
+
+ def value
+ @tuple
+ end
+
+ private
+ def init_with_ary(ary)
+ @tuple_size = ary.size
+ @tuple = Array.new(@tuple_size)
+ @tuple.size.times do |i|
+ @tuple[i] = ary[i]
+ end
+ end
+
+ def init_with_hash(hash)
+ @tuple_size = hash[:size]
+ @tuple = Hash.new
+ hash.each do |k, v|
+ next unless String === k
+ @tuple[k] = v
+ end
+ end
+ end
+
+ class Template < Tuple
+ def match(tuple)
+ return false unless tuple.respond_to?(:size)
+ return false unless tuple.respond_to?(:[])
+ return false if @tuple_size && (@tuple_size != tuple.size)
+ each do |k, v|
+ next if v.nil?
+ return false unless (v === tuple[k] rescue false)
+ end
+ return true
+ end
+
+ def ===(tuple)
+ match(tuple)
+ end
+ end
+
+ class DRbObjectTemplate
+ def initialize(uri=nil, ref=nil)
+ @drb_uri = uri
+ @drb_ref = ref
+ end
+
+ def ===(ro)
+ return true if super(ro)
+ unless @drb_uri.nil?
+ return false unless (@drb_uri === ro.__drburi rescue false)
+ end
+ unless @drb_ref.nil?
+ return false unless (@drb_ref === ro.__drbref rescue false)
+ end
+ true
+ end
+ end
+
+ class TupleSpaceProxy
+ def initialize(ts)
+ @ts = ts
+ end
+
+ def write(tuple, sec=nil)
+ @ts.write(tuple, sec)
+ end
+
+ def take(tuple, sec=nil, &block)
+ port = []
+ @ts.move(DRbObject.new(port), tuple, sec, &block)
+ port[0]
+ end
+
+ def read(tuple, sec=nil)
+ @ts.read(tuple, sec)
+ end
+
+ def read_all(tuple)
+ @ts.read_all
+ end
+
+ def notify(ev, tuple, sec=nil)
+ @ts.notify(ev, tuple, sec)
+ end
+ end
+
+ class SimpleRenewer
+ include DRbUndumped
+ def initialize(sec=180)
+ @sec = sec
+ end
+
+ def renew
+ @sec
+ end
+ end
+end
+
diff --git a/lib/rinda/ring.rb b/lib/rinda/ring.rb
new file mode 100644
index 0000000000..7152dfb72e
--- /dev/null
+++ b/lib/rinda/ring.rb
@@ -0,0 +1,164 @@
+#
+# Note: Rinda::Ring API is unstable.
+#
+require 'drb/drb'
+require 'rinda/rinda'
+require 'thread'
+
+module Rinda
+ Ring_PORT = 7647
+ class RingServer
+ include DRbUndumped
+
+ def initialize(ts, port=Ring_PORT)
+ @ts = ts
+ @soc = UDPSocket.open
+ @soc.bind('', port)
+ @w_service = write_service
+ @r_service = reply_service
+ end
+
+ def write_service
+ Thread.new do
+ loop do
+ msg, addr = @soc.recvfrom(1024)
+ do_write(msg)
+ end
+ end
+ end
+
+ def do_write(msg)
+ Thread.new do
+ begin
+ tuple, sec = Marshal.load(msg)
+ @ts.write(tuple, sec)
+ rescue
+ end
+ end
+ end
+
+ def reply_service
+ Thread.new do
+ loop do
+ do_reply
+ end
+ end
+ end
+
+ def do_reply
+ tuple = @ts.take([:lookup_ring, DRbObject])
+ Thread.new { tuple[1].call(@ts) rescue nil}
+ rescue
+ end
+ end
+
+ class RingFinger
+ @@finger = nil
+ def self.finger
+ unless @@finger
+ @@finger = self.new
+ @@finger.lookup_ring_any
+ end
+ @@finger
+ end
+
+ def self.primary
+ finger.primary
+ end
+
+ def self.to_a
+ finger.to_a
+ end
+
+ @@broadcast_list = ['<broadcast>', 'localhost']
+ def initialize(broadcast_list=@@broadcast_list, port=Ring_PORT)
+ @broadcast_list = broadcast_list || ['localhost']
+ @port = port
+ @primary = nil
+ @rings = []
+ end
+ attr_accessor :broadcast_list, :port, :primary
+
+ def to_a
+ @rings
+ end
+
+ def each
+ lookup_ring_any unless @primary
+ return unless @primary
+ yield(@primary)
+ @rings.each { |x| yield(x) }
+ end
+
+ def lookup_ring(timeout=5, &block)
+ return lookup_ring_any(timeout) unless block_given?
+
+ msg = Marshal.dump([[:lookup_ring, DRbObject.new(block)], timeout])
+ @broadcast_list.each do |it|
+ soc = UDPSocket.open
+ begin
+ soc.setsockopt(Socket::SOL_SOCKET, Socket::SO_BROADCAST, true)
+ soc.send(msg, 0, it, @port)
+ rescue
+ nil
+ ensure
+ soc.close
+ end
+ end
+ sleep(timeout)
+ end
+
+ def lookup_ring_any(timeout=5)
+ queue = Queue.new
+
+ th = Thread.new do
+ self.lookup_ring(timeout) do |ts|
+ queue.push(ts)
+ end
+ queue.push(nil)
+ while it = queue.pop
+ @rings.push(it)
+ end
+ end
+
+ @primary = queue.pop
+ raise('RingNotFound') if @primary.nil?
+ @primary
+ end
+ end
+
+ class RingProvider
+ def initialize(klass, front, desc, renewer = nil)
+ @tuple = [:name, klass, front, desc]
+ @renewer = renewer || Rinda::SimpleRenewer.new
+ end
+
+ def provide
+ ts = Rinda::RingFinger.primary
+ ts.write(@tuple, @renewer)
+ end
+ end
+end
+
+if __FILE__ == $0
+ DRb.start_service
+ case ARGV.shift
+ when 's'
+ require 'rinda/tuplespace'
+ ts = Rinda::TupleSpace.new
+ place = Rinda::RingServer.new(ts)
+ $stdin.gets
+ when 'w'
+ finger = Rinda::RingFinger.new(nil)
+ finger.lookup_ring do |ts|
+ p ts
+ ts.write([:hello, :world])
+ end
+ when 'r'
+ finger = Rinda::RingFinger.new(nil)
+ finger.lookup_ring do |ts|
+ p ts
+ p ts.take([nil, nil])
+ end
+ end
+end
diff --git a/lib/rinda/tuplespace.rb b/lib/rinda/tuplespace.rb
new file mode 100644
index 0000000000..4bd4a42af5
--- /dev/null
+++ b/lib/rinda/tuplespace.rb
@@ -0,0 +1,356 @@
+require 'monitor'
+require 'thread'
+require 'drb/drb'
+require 'rinda/rinda'
+
+module Rinda
+ class TupleEntry
+ include DRbUndumped
+
+ def initialize(ary, sec=nil)
+ @cancel = false
+ @ary = make_tuple(ary)
+ @renewer = nil
+ renew(sec)
+ end
+ attr_accessor :expires
+
+ def cancel
+ @cancel = true
+ end
+
+ def alive?
+ !canceled? && !expired?
+ end
+
+ def value; @ary.value; end
+ def canceled?; @cancel; end
+ def expired?
+ return true unless @expires
+ return false if @expires > Time.now
+ return true if @renewer.nil?
+ renew(@renewer)
+ return true unless @expires
+ return @expires < Time.now
+ end
+
+ def renew(sec_or_renewer)
+ sec, @renewer = get_renewer(sec_or_renewer)
+ @expires = make_expires(sec)
+ end
+
+ def make_expires(sec=nil)
+ case sec
+ when Numeric
+ Time.now + sec
+ when true
+ Time.at(1)
+ when nil
+ Time.at(2**31-1)
+ end
+ end
+
+ def [](key)
+ @ary[key]
+ end
+
+ def size
+ @ary.size
+ end
+
+ def make_tuple(ary)
+ Rinda::Tuple.new(ary)
+ end
+
+ private
+ def get_renewer(it)
+ case it
+ when Numeric, true, nil
+ return it, nil
+ else
+ begin
+ return it.renew, it
+ rescue Exception
+ return it, nil
+ end
+ end
+ end
+ end
+
+ class TemplateEntry < TupleEntry
+ def initialize(ary, expires=nil)
+ super(ary, expires)
+ @template = Rinda::Template.new(ary)
+ end
+
+ def match(tuple)
+ @template.match(tuple)
+ end
+
+ def ===(tuple)
+ match(tuple)
+ end
+
+ def make_tuple(ary)
+ Rinda::Template.new(ary)
+ end
+ end
+
+ class WaitTemplateEntry < TemplateEntry
+ def initialize(place, ary, expires=nil)
+ super(ary, expires)
+ @place = place
+ @cond = place.new_cond
+ @found = nil
+ end
+ attr_reader :found
+
+ def cancel
+ super
+ signal
+ end
+
+ def wait
+ @cond.wait
+ end
+
+ def read(tuple)
+ @found = tuple
+ signal
+ end
+
+ def signal
+ @place.synchronize do
+ @cond.signal
+ end
+ end
+ end
+
+ class NotifyTemplateEntry < TemplateEntry
+ def initialize(place, event, tuple, expires=nil)
+ ary = [event, Rinda::Template.new(tuple)]
+ super(ary, expires)
+ @queue = Queue.new
+ @done = false
+ end
+
+ def notify(ev)
+ @queue.push(ev)
+ end
+
+ def pop
+ raise RequestExpiredError if @done
+ it = @queue.pop
+ @done = true if it[0] == 'close'
+ return it
+ end
+
+ def each
+ while !@done
+ it = pop
+ yield(it)
+ end
+ rescue
+ ensure
+ cancel
+ end
+ end
+
+ class TupleBag
+ def initialize
+ @hash = {}
+ end
+
+ def push(ary)
+ size = ary.size
+ @hash[size] ||= []
+ @hash[size].push(ary)
+ end
+
+ def delete(ary)
+ size = ary.size
+ @hash.fetch(size, []).delete(ary)
+ end
+
+ def find_all(template)
+ @hash.fetch(template.size, []).find_all do |tuple|
+ tuple.alive? && template.match(tuple)
+ end
+ end
+
+ def find(template)
+ @hash.fetch(template.size, []).find do |tuple|
+ tuple.alive? && template.match(tuple)
+ end
+ end
+
+ def find_all_template(tuple)
+ @hash.fetch(tuple.size, []).find_all do |template|
+ template.alive? && template.match(tuple)
+ end
+ end
+
+ def delete_unless_alive
+ deleted = []
+ @hash.keys.each do |size|
+ ary = []
+ @hash[size].each do |tuple|
+ if tuple.alive?
+ ary.push(tuple)
+ else
+ deleted.push(tuple)
+ end
+ end
+ @hash[size] = ary
+ end
+ deleted
+ end
+ end
+
+ class TupleSpace
+ include DRbUndumped
+ include MonitorMixin
+ def initialize(timeout=60)
+ super()
+ @bag = TupleBag.new
+ @read_waiter = TupleBag.new
+ @take_waiter = TupleBag.new
+ @notify_waiter = TupleBag.new
+ @timeout = timeout
+ @period = timeout * 2
+ @keeper = keeper
+ end
+
+ def write(tuple, sec=nil)
+ entry = TupleEntry.new(tuple, sec)
+ synchronize do
+ if entry.expired?
+ @read_waiter.find_all_template(entry).each do |template|
+ template.read(tuple)
+ end
+ notify_event('write', entry.value)
+ notify_event('delete', entry.value)
+ else
+ @bag.push(entry)
+ @read_waiter.find_all_template(entry).each do |template|
+ template.read(tuple)
+ end
+ @take_waiter.find_all_template(entry).each do |template|
+ template.signal
+ end
+ notify_event('write', entry.value)
+ end
+ end
+ entry
+ end
+
+ def take(tuple, sec=nil, &block)
+ move(nil, tuple, sec, &block)
+ end
+
+ def move(port, tuple, sec=nil)
+ template = WaitTemplateEntry.new(self, tuple, sec)
+ yield(template) if block_given?
+ synchronize do
+ entry = @bag.find(template)
+ if entry
+ port.push(entry.value) if port
+ @bag.delete(entry)
+ notify_event('take', entry.value)
+ return entry.value
+ end
+ return nil if template.expired?
+
+ begin
+ @take_waiter.push(template)
+ while true
+ raise RequestCanceledError if template.canceled?
+ raise RequestExpiredError if template.expired?
+ entry = @bag.find(template)
+ if entry
+ port.push(entry.value) if port
+ @bag.delete(entry)
+ notify_event('take', entry.value)
+ return entry.value
+ end
+ template.wait
+ end
+ ensure
+ @take_waiter.delete(template)
+ end
+ end
+ end
+
+ def read(tuple, sec=nil)
+ template = WaitTemplateEntry.new(self, tuple, sec)
+ yield(template) if block_given?
+ synchronize do
+ entry = @bag.find(template)
+ return entry.value if entry
+ return nil if template.expired?
+
+ begin
+ @read_waiter.push(template)
+ template.wait
+ raise RequestCanceledError if template.canceled?
+ raise RequestExpiredError if template.expired?
+ return template.found
+ ensure
+ @read_waiter.delete(template)
+ end
+ end
+ end
+
+ def read_all(tuple)
+ template = WaitTemplateEntry.new(self, tuple, nil)
+ synchronize do
+ entry = @bag.find_all(template)
+ entry.collect do |e|
+ e.value
+ end
+ end
+ end
+
+ def notify(event, tuple, sec=nil)
+ template = NotifyTemplateEntry.new(self, event, tuple, sec)
+ synchronize do
+ @notify_waiter.push(template)
+ end
+ template
+ end
+
+ private
+ def keep_clean
+ synchronize do
+ @read_waiter.delete_unless_alive.each do |e|
+ e.signal
+ end
+ @take_waiter.delete_unless_alive.each do |e|
+ e.signal
+ end
+ @notify_waiter.delete_unless_alive.each do |e|
+ e.notify(['close'])
+ end
+ @bag.delete_unless_alive.each do |e|
+ notify_event('delete', e.value)
+ end
+ end
+ end
+
+ def notify_event(event, tuple)
+ ev = [event, tuple]
+ @notify_waiter.find_all_template(ev).each do |template|
+ template.notify(ev)
+ end
+ end
+
+ def keeper
+ Thread.new do
+ loop do
+ sleep(@period)
+ keep_clean
+ end
+ end
+ end
+ end
+end