summaryrefslogtreecommitdiff
path: root/lib/rinda/tuplespace.rb
diff options
context:
space:
mode:
authorseki <seki@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2003-10-05 12:23:33 +0000
committerseki <seki@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2003-10-05 12:23:33 +0000
commitfde4c2dce890a774c5285d357c2f0ebe861e8b7c (patch)
tree09bb0a5af94d240039e3c09cbcf32a41b82565af /lib/rinda/tuplespace.rb
parent4aa8b47bb9a7ba33828b7a1b654a3c4ed1d8bdf9 (diff)
add rinda, (import from drb-2.0.4)
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@4696 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
Diffstat (limited to 'lib/rinda/tuplespace.rb')
-rw-r--r--lib/rinda/tuplespace.rb356
1 files changed, 356 insertions, 0 deletions
diff --git a/lib/rinda/tuplespace.rb b/lib/rinda/tuplespace.rb
new file mode 100644
index 0000000000..4bd4a42af5
--- /dev/null
+++ b/lib/rinda/tuplespace.rb
@@ -0,0 +1,356 @@
+require 'monitor'
+require 'thread'
+require 'drb/drb'
+require 'rinda/rinda'
+
+module Rinda
+ class TupleEntry
+ include DRbUndumped
+
+ def initialize(ary, sec=nil)
+ @cancel = false
+ @ary = make_tuple(ary)
+ @renewer = nil
+ renew(sec)
+ end
+ attr_accessor :expires
+
+ def cancel
+ @cancel = true
+ end
+
+ def alive?
+ !canceled? && !expired?
+ end
+
+ def value; @ary.value; end
+ def canceled?; @cancel; end
+ def expired?
+ return true unless @expires
+ return false if @expires > Time.now
+ return true if @renewer.nil?
+ renew(@renewer)
+ return true unless @expires
+ return @expires < Time.now
+ end
+
+ def renew(sec_or_renewer)
+ sec, @renewer = get_renewer(sec_or_renewer)
+ @expires = make_expires(sec)
+ end
+
+ def make_expires(sec=nil)
+ case sec
+ when Numeric
+ Time.now + sec
+ when true
+ Time.at(1)
+ when nil
+ Time.at(2**31-1)
+ end
+ end
+
+ def [](key)
+ @ary[key]
+ end
+
+ def size
+ @ary.size
+ end
+
+ def make_tuple(ary)
+ Rinda::Tuple.new(ary)
+ end
+
+ private
+ def get_renewer(it)
+ case it
+ when Numeric, true, nil
+ return it, nil
+ else
+ begin
+ return it.renew, it
+ rescue Exception
+ return it, nil
+ end
+ end
+ end
+ end
+
+ class TemplateEntry < TupleEntry
+ def initialize(ary, expires=nil)
+ super(ary, expires)
+ @template = Rinda::Template.new(ary)
+ end
+
+ def match(tuple)
+ @template.match(tuple)
+ end
+
+ def ===(tuple)
+ match(tuple)
+ end
+
+ def make_tuple(ary)
+ Rinda::Template.new(ary)
+ end
+ end
+
+ class WaitTemplateEntry < TemplateEntry
+ def initialize(place, ary, expires=nil)
+ super(ary, expires)
+ @place = place
+ @cond = place.new_cond
+ @found = nil
+ end
+ attr_reader :found
+
+ def cancel
+ super
+ signal
+ end
+
+ def wait
+ @cond.wait
+ end
+
+ def read(tuple)
+ @found = tuple
+ signal
+ end
+
+ def signal
+ @place.synchronize do
+ @cond.signal
+ end
+ end
+ end
+
+ class NotifyTemplateEntry < TemplateEntry
+ def initialize(place, event, tuple, expires=nil)
+ ary = [event, Rinda::Template.new(tuple)]
+ super(ary, expires)
+ @queue = Queue.new
+ @done = false
+ end
+
+ def notify(ev)
+ @queue.push(ev)
+ end
+
+ def pop
+ raise RequestExpiredError if @done
+ it = @queue.pop
+ @done = true if it[0] == 'close'
+ return it
+ end
+
+ def each
+ while !@done
+ it = pop
+ yield(it)
+ end
+ rescue
+ ensure
+ cancel
+ end
+ end
+
+ class TupleBag
+ def initialize
+ @hash = {}
+ end
+
+ def push(ary)
+ size = ary.size
+ @hash[size] ||= []
+ @hash[size].push(ary)
+ end
+
+ def delete(ary)
+ size = ary.size
+ @hash.fetch(size, []).delete(ary)
+ end
+
+ def find_all(template)
+ @hash.fetch(template.size, []).find_all do |tuple|
+ tuple.alive? && template.match(tuple)
+ end
+ end
+
+ def find(template)
+ @hash.fetch(template.size, []).find do |tuple|
+ tuple.alive? && template.match(tuple)
+ end
+ end
+
+ def find_all_template(tuple)
+ @hash.fetch(tuple.size, []).find_all do |template|
+ template.alive? && template.match(tuple)
+ end
+ end
+
+ def delete_unless_alive
+ deleted = []
+ @hash.keys.each do |size|
+ ary = []
+ @hash[size].each do |tuple|
+ if tuple.alive?
+ ary.push(tuple)
+ else
+ deleted.push(tuple)
+ end
+ end
+ @hash[size] = ary
+ end
+ deleted
+ end
+ end
+
+ class TupleSpace
+ include DRbUndumped
+ include MonitorMixin
+ def initialize(timeout=60)
+ super()
+ @bag = TupleBag.new
+ @read_waiter = TupleBag.new
+ @take_waiter = TupleBag.new
+ @notify_waiter = TupleBag.new
+ @timeout = timeout
+ @period = timeout * 2
+ @keeper = keeper
+ end
+
+ def write(tuple, sec=nil)
+ entry = TupleEntry.new(tuple, sec)
+ synchronize do
+ if entry.expired?
+ @read_waiter.find_all_template(entry).each do |template|
+ template.read(tuple)
+ end
+ notify_event('write', entry.value)
+ notify_event('delete', entry.value)
+ else
+ @bag.push(entry)
+ @read_waiter.find_all_template(entry).each do |template|
+ template.read(tuple)
+ end
+ @take_waiter.find_all_template(entry).each do |template|
+ template.signal
+ end
+ notify_event('write', entry.value)
+ end
+ end
+ entry
+ end
+
+ def take(tuple, sec=nil, &block)
+ move(nil, tuple, sec, &block)
+ end
+
+ def move(port, tuple, sec=nil)
+ template = WaitTemplateEntry.new(self, tuple, sec)
+ yield(template) if block_given?
+ synchronize do
+ entry = @bag.find(template)
+ if entry
+ port.push(entry.value) if port
+ @bag.delete(entry)
+ notify_event('take', entry.value)
+ return entry.value
+ end
+ return nil if template.expired?
+
+ begin
+ @take_waiter.push(template)
+ while true
+ raise RequestCanceledError if template.canceled?
+ raise RequestExpiredError if template.expired?
+ entry = @bag.find(template)
+ if entry
+ port.push(entry.value) if port
+ @bag.delete(entry)
+ notify_event('take', entry.value)
+ return entry.value
+ end
+ template.wait
+ end
+ ensure
+ @take_waiter.delete(template)
+ end
+ end
+ end
+
+ def read(tuple, sec=nil)
+ template = WaitTemplateEntry.new(self, tuple, sec)
+ yield(template) if block_given?
+ synchronize do
+ entry = @bag.find(template)
+ return entry.value if entry
+ return nil if template.expired?
+
+ begin
+ @read_waiter.push(template)
+ template.wait
+ raise RequestCanceledError if template.canceled?
+ raise RequestExpiredError if template.expired?
+ return template.found
+ ensure
+ @read_waiter.delete(template)
+ end
+ end
+ end
+
+ def read_all(tuple)
+ template = WaitTemplateEntry.new(self, tuple, nil)
+ synchronize do
+ entry = @bag.find_all(template)
+ entry.collect do |e|
+ e.value
+ end
+ end
+ end
+
+ def notify(event, tuple, sec=nil)
+ template = NotifyTemplateEntry.new(self, event, tuple, sec)
+ synchronize do
+ @notify_waiter.push(template)
+ end
+ template
+ end
+
+ private
+ def keep_clean
+ synchronize do
+ @read_waiter.delete_unless_alive.each do |e|
+ e.signal
+ end
+ @take_waiter.delete_unless_alive.each do |e|
+ e.signal
+ end
+ @notify_waiter.delete_unless_alive.each do |e|
+ e.notify(['close'])
+ end
+ @bag.delete_unless_alive.each do |e|
+ notify_event('delete', e.value)
+ end
+ end
+ end
+
+ def notify_event(event, tuple)
+ ev = [event, tuple]
+ @notify_waiter.find_all_template(ev).each do |template|
+ template.notify(ev)
+ end
+ end
+
+ def keeper
+ Thread.new do
+ loop do
+ sleep(@period)
+ keep_clean
+ end
+ end
+ end
+ end
+end