summaryrefslogtreecommitdiff
path: root/lib/rinda
diff options
context:
space:
mode:
Diffstat (limited to 'lib/rinda')
-rw-r--r--lib/rinda/rinda.rb129
-rw-r--r--lib/rinda/ring.rb164
-rw-r--r--lib/rinda/tuplespace.rb356
3 files changed, 0 insertions, 649 deletions
diff --git a/lib/rinda/rinda.rb b/lib/rinda/rinda.rb
deleted file mode 100644
index 273560b1de..0000000000
--- a/lib/rinda/rinda.rb
+++ /dev/null
@@ -1,129 +0,0 @@
-require 'thread'
-
-module Rinda
- class RequestCanceledError < ThreadError; end
- class RequestExpiredError < ThreadError; end
-
- class Tuple
- def initialize(ary_or_hash)
- if Hash === ary_or_hash
- init_with_hash(ary_or_hash)
- else
- init_with_ary(ary_or_hash)
- end
- end
-
- def size
- @tuple.size
- end
-
- def [](k)
- @tuple[k]
- end
-
- def each # FIXME
- if Hash === @tuple
- @tuple.each { |k, v| yield(k, v) }
- else
- @tuple.each_with_index { |v, k| yield(k, v) }
- end
- end
-
- def value
- @tuple
- end
-
- private
- def init_with_ary(ary)
- @tuple_size = ary.size
- @tuple = Array.new(@tuple_size)
- @tuple.size.times do |i|
- @tuple[i] = ary[i]
- end
- end
-
- def init_with_hash(hash)
- @tuple_size = hash[:size]
- @tuple = Hash.new
- hash.each do |k, v|
- next unless String === k
- @tuple[k] = v
- end
- end
- end
-
- class Template < Tuple
- def match(tuple)
- return false unless tuple.respond_to?(:size)
- return false unless tuple.respond_to?(:[])
- return false if @tuple_size && (@tuple_size != tuple.size)
- each do |k, v|
- next if v.nil?
- return false unless (v === tuple[k] rescue false)
- end
- return true
- end
-
- def ===(tuple)
- match(tuple)
- end
- end
-
- class DRbObjectTemplate
- def initialize(uri=nil, ref=nil)
- @drb_uri = uri
- @drb_ref = ref
- end
-
- def ===(ro)
- return true if super(ro)
- unless @drb_uri.nil?
- return false unless (@drb_uri === ro.__drburi rescue false)
- end
- unless @drb_ref.nil?
- return false unless (@drb_ref === ro.__drbref rescue false)
- end
- true
- end
- end
-
- class TupleSpaceProxy
- def initialize(ts)
- @ts = ts
- end
-
- def write(tuple, sec=nil)
- @ts.write(tuple, sec)
- end
-
- def take(tuple, sec=nil, &block)
- port = []
- @ts.move(DRbObject.new(port), tuple, sec, &block)
- port[0]
- end
-
- def read(tuple, sec=nil, &block)
- @ts.read(tuple, sec, &block)
- end
-
- def read_all(tuple)
- @ts.read_all(tuple)
- end
-
- def notify(ev, tuple, sec=nil)
- @ts.notify(ev, tuple, sec)
- end
- end
-
- class SimpleRenewer
- include DRbUndumped
- def initialize(sec=180)
- @sec = sec
- end
-
- def renew
- @sec
- end
- end
-end
-
diff --git a/lib/rinda/ring.rb b/lib/rinda/ring.rb
deleted file mode 100644
index 7152dfb72e..0000000000
--- a/lib/rinda/ring.rb
+++ /dev/null
@@ -1,164 +0,0 @@
-#
-# Note: Rinda::Ring API is unstable.
-#
-require 'drb/drb'
-require 'rinda/rinda'
-require 'thread'
-
-module Rinda
- Ring_PORT = 7647
- class RingServer
- include DRbUndumped
-
- def initialize(ts, port=Ring_PORT)
- @ts = ts
- @soc = UDPSocket.open
- @soc.bind('', port)
- @w_service = write_service
- @r_service = reply_service
- end
-
- def write_service
- Thread.new do
- loop do
- msg, addr = @soc.recvfrom(1024)
- do_write(msg)
- end
- end
- end
-
- def do_write(msg)
- Thread.new do
- begin
- tuple, sec = Marshal.load(msg)
- @ts.write(tuple, sec)
- rescue
- end
- end
- end
-
- def reply_service
- Thread.new do
- loop do
- do_reply
- end
- end
- end
-
- def do_reply
- tuple = @ts.take([:lookup_ring, DRbObject])
- Thread.new { tuple[1].call(@ts) rescue nil}
- rescue
- end
- end
-
- class RingFinger
- @@finger = nil
- def self.finger
- unless @@finger
- @@finger = self.new
- @@finger.lookup_ring_any
- end
- @@finger
- end
-
- def self.primary
- finger.primary
- end
-
- def self.to_a
- finger.to_a
- end
-
- @@broadcast_list = ['<broadcast>', 'localhost']
- def initialize(broadcast_list=@@broadcast_list, port=Ring_PORT)
- @broadcast_list = broadcast_list || ['localhost']
- @port = port
- @primary = nil
- @rings = []
- end
- attr_accessor :broadcast_list, :port, :primary
-
- def to_a
- @rings
- end
-
- def each
- lookup_ring_any unless @primary
- return unless @primary
- yield(@primary)
- @rings.each { |x| yield(x) }
- end
-
- def lookup_ring(timeout=5, &block)
- return lookup_ring_any(timeout) unless block_given?
-
- msg = Marshal.dump([[:lookup_ring, DRbObject.new(block)], timeout])
- @broadcast_list.each do |it|
- soc = UDPSocket.open
- begin
- soc.setsockopt(Socket::SOL_SOCKET, Socket::SO_BROADCAST, true)
- soc.send(msg, 0, it, @port)
- rescue
- nil
- ensure
- soc.close
- end
- end
- sleep(timeout)
- end
-
- def lookup_ring_any(timeout=5)
- queue = Queue.new
-
- th = Thread.new do
- self.lookup_ring(timeout) do |ts|
- queue.push(ts)
- end
- queue.push(nil)
- while it = queue.pop
- @rings.push(it)
- end
- end
-
- @primary = queue.pop
- raise('RingNotFound') if @primary.nil?
- @primary
- end
- end
-
- class RingProvider
- def initialize(klass, front, desc, renewer = nil)
- @tuple = [:name, klass, front, desc]
- @renewer = renewer || Rinda::SimpleRenewer.new
- end
-
- def provide
- ts = Rinda::RingFinger.primary
- ts.write(@tuple, @renewer)
- end
- end
-end
-
-if __FILE__ == $0
- DRb.start_service
- case ARGV.shift
- when 's'
- require 'rinda/tuplespace'
- ts = Rinda::TupleSpace.new
- place = Rinda::RingServer.new(ts)
- $stdin.gets
- when 'w'
- finger = Rinda::RingFinger.new(nil)
- finger.lookup_ring do |ts|
- p ts
- ts.write([:hello, :world])
- end
- when 'r'
- finger = Rinda::RingFinger.new(nil)
- finger.lookup_ring do |ts|
- p ts
- p ts.take([nil, nil])
- end
- end
-end
diff --git a/lib/rinda/tuplespace.rb b/lib/rinda/tuplespace.rb
deleted file mode 100644
index 4bd4a42af5..0000000000
--- a/lib/rinda/tuplespace.rb
+++ /dev/null
@@ -1,356 +0,0 @@
-require 'monitor'
-require 'thread'
-require 'drb/drb'
-require 'rinda/rinda'
-
-module Rinda
- class TupleEntry
- include DRbUndumped
-
- def initialize(ary, sec=nil)
- @cancel = false
- @ary = make_tuple(ary)
- @renewer = nil
- renew(sec)
- end
- attr_accessor :expires
-
- def cancel
- @cancel = true
- end
-
- def alive?
- !canceled? && !expired?
- end
-
- def value; @ary.value; end
- def canceled?; @cancel; end
- def expired?
- return true unless @expires
- return false if @expires > Time.now
- return true if @renewer.nil?
- renew(@renewer)
- return true unless @expires
- return @expires < Time.now
- end
-
- def renew(sec_or_renewer)
- sec, @renewer = get_renewer(sec_or_renewer)
- @expires = make_expires(sec)
- end
-
- def make_expires(sec=nil)
- case sec
- when Numeric
- Time.now + sec
- when true
- Time.at(1)
- when nil
- Time.at(2**31-1)
- end
- end
-
- def [](key)
- @ary[key]
- end
-
- def size
- @ary.size
- end
-
- def make_tuple(ary)
- Rinda::Tuple.new(ary)
- end
-
- private
- def get_renewer(it)
- case it
- when Numeric, true, nil
- return it, nil
- else
- begin
- return it.renew, it
- rescue Exception
- return it, nil
- end
- end
- end
- end
-
- class TemplateEntry < TupleEntry
- def initialize(ary, expires=nil)
- super(ary, expires)
- @template = Rinda::Template.new(ary)
- end
-
- def match(tuple)
- @template.match(tuple)
- end
-
- def ===(tuple)
- match(tuple)
- end
-
- def make_tuple(ary)
- Rinda::Template.new(ary)
- end
- end
-
- class WaitTemplateEntry < TemplateEntry
- def initialize(place, ary, expires=nil)
- super(ary, expires)
- @place = place
- @cond = place.new_cond
- @found = nil
- end
- attr_reader :found
-
- def cancel
- super
- signal
- end
-
- def wait
- @cond.wait
- end
-
- def read(tuple)
- @found = tuple
- signal
- end
-
- def signal
- @place.synchronize do
- @cond.signal
- end
- end
- end
-
- class NotifyTemplateEntry < TemplateEntry
- def initialize(place, event, tuple, expires=nil)
- ary = [event, Rinda::Template.new(tuple)]
- super(ary, expires)
- @queue = Queue.new
- @done = false
- end
-
- def notify(ev)
- @queue.push(ev)
- end
-
- def pop
- raise RequestExpiredError if @done
- it = @queue.pop
- @done = true if it[0] == 'close'
- return it
- end
-
- def each
- while !@done
- it = pop
- yield(it)
- end
- rescue
- ensure
- cancel
- end
- end
-
- class TupleBag
- def initialize
- @hash = {}
- end
-
- def push(ary)
- size = ary.size
- @hash[size] ||= []
- @hash[size].push(ary)
- end
-
- def delete(ary)
- size = ary.size
- @hash.fetch(size, []).delete(ary)
- end
-
- def find_all(template)
- @hash.fetch(template.size, []).find_all do |tuple|
- tuple.alive? && template.match(tuple)
- end
- end
-
- def find(template)
- @hash.fetch(template.size, []).find do |tuple|
- tuple.alive? && template.match(tuple)
- end
- end
-
- def find_all_template(tuple)
- @hash.fetch(tuple.size, []).find_all do |template|
- template.alive? && template.match(tuple)
- end
- end
-
- def delete_unless_alive
- deleted = []
- @hash.keys.each do |size|
- ary = []
- @hash[size].each do |tuple|
- if tuple.alive?
- ary.push(tuple)
- else
- deleted.push(tuple)
- end
- end
- @hash[size] = ary
- end
- deleted
- end
- end
-
- class TupleSpace
- include DRbUndumped
- include MonitorMixin
- def initialize(timeout=60)
- super()
- @bag = TupleBag.new
- @read_waiter = TupleBag.new
- @take_waiter = TupleBag.new
- @notify_waiter = TupleBag.new
- @timeout = timeout
- @period = timeout * 2
- @keeper = keeper
- end
-
- def write(tuple, sec=nil)
- entry = TupleEntry.new(tuple, sec)
- synchronize do
- if entry.expired?
- @read_waiter.find_all_template(entry).each do |template|
- template.read(tuple)
- end
- notify_event('write', entry.value)
- notify_event('delete', entry.value)
- else
- @bag.push(entry)
- @read_waiter.find_all_template(entry).each do |template|
- template.read(tuple)
- end
- @take_waiter.find_all_template(entry).each do |template|
- template.signal
- end
- notify_event('write', entry.value)
- end
- end
- entry
- end
-
- def take(tuple, sec=nil, &block)
- move(nil, tuple, sec, &block)
- end
-
- def move(port, tuple, sec=nil)
- template = WaitTemplateEntry.new(self, tuple, sec)
- yield(template) if block_given?
- synchronize do
- entry = @bag.find(template)
- if entry
- port.push(entry.value) if port
- @bag.delete(entry)
- notify_event('take', entry.value)
- return entry.value
- end
- return nil if template.expired?
-
- begin
- @take_waiter.push(template)
- while true
- raise RequestCanceledError if template.canceled?
- raise RequestExpiredError if template.expired?
- entry = @bag.find(template)
- if entry
- port.push(entry.value) if port
- @bag.delete(entry)
- notify_event('take', entry.value)
- return entry.value
- end
- template.wait
- end
- ensure
- @take_waiter.delete(template)
- end
- end
- end
-
- def read(tuple, sec=nil)
- template = WaitTemplateEntry.new(self, tuple, sec)
- yield(template) if block_given?
- synchronize do
- entry = @bag.find(template)
- return entry.value if entry
- return nil if template.expired?
-
- begin
- @read_waiter.push(template)
- template.wait
- raise RequestCanceledError if template.canceled?
- raise RequestExpiredError if template.expired?
- return template.found
- ensure
- @read_waiter.delete(template)
- end
- end
- end
-
- def read_all(tuple)
- template = WaitTemplateEntry.new(self, tuple, nil)
- synchronize do
- entry = @bag.find_all(template)
- entry.collect do |e|
- e.value
- end
- end
- end
-
- def notify(event, tuple, sec=nil)
- template = NotifyTemplateEntry.new(self, event, tuple, sec)
- synchronize do
- @notify_waiter.push(template)
- end
- template
- end
-
- private
- def keep_clean
- synchronize do
- @read_waiter.delete_unless_alive.each do |e|
- e.signal
- end
- @take_waiter.delete_unless_alive.each do |e|
- e.signal
- end
- @notify_waiter.delete_unless_alive.each do |e|
- e.notify(['close'])
- end
- @bag.delete_unless_alive.each do |e|
- notify_event('delete', e.value)
- end
- end
- end
-
- def notify_event(event, tuple)
- ev = [event, tuple]
- @notify_waiter.find_all_template(ev).each do |template|
- template.notify(ev)
- end
- end
-
- def keeper
- Thread.new do
- loop do
- sleep(@period)
- keep_clean
- end
- end
- end
- end
-end