summaryrefslogtreecommitdiff
path: root/lib/rinda/tuplespace.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/rinda/tuplespace.rb')
-rw-r--r--lib/rinda/tuplespace.rb356
1 files changed, 0 insertions, 356 deletions
diff --git a/lib/rinda/tuplespace.rb b/lib/rinda/tuplespace.rb
deleted file mode 100644
index 4bd4a42af5..0000000000
--- a/lib/rinda/tuplespace.rb
+++ /dev/null
@@ -1,356 +0,0 @@
-require 'monitor'
-require 'thread'
-require 'drb/drb'
-require 'rinda/rinda'
-
-module Rinda
- class TupleEntry
- include DRbUndumped
-
- def initialize(ary, sec=nil)
- @cancel = false
- @ary = make_tuple(ary)
- @renewer = nil
- renew(sec)
- end
- attr_accessor :expires
-
- def cancel
- @cancel = true
- end
-
- def alive?
- !canceled? && !expired?
- end
-
- def value; @ary.value; end
- def canceled?; @cancel; end
- def expired?
- return true unless @expires
- return false if @expires > Time.now
- return true if @renewer.nil?
- renew(@renewer)
- return true unless @expires
- return @expires < Time.now
- end
-
- def renew(sec_or_renewer)
- sec, @renewer = get_renewer(sec_or_renewer)
- @expires = make_expires(sec)
- end
-
- def make_expires(sec=nil)
- case sec
- when Numeric
- Time.now + sec
- when true
- Time.at(1)
- when nil
- Time.at(2**31-1)
- end
- end
-
- def [](key)
- @ary[key]
- end
-
- def size
- @ary.size
- end
-
- def make_tuple(ary)
- Rinda::Tuple.new(ary)
- end
-
- private
- def get_renewer(it)
- case it
- when Numeric, true, nil
- return it, nil
- else
- begin
- return it.renew, it
- rescue Exception
- return it, nil
- end
- end
- end
- end
-
- class TemplateEntry < TupleEntry
- def initialize(ary, expires=nil)
- super(ary, expires)
- @template = Rinda::Template.new(ary)
- end
-
- def match(tuple)
- @template.match(tuple)
- end
-
- def ===(tuple)
- match(tuple)
- end
-
- def make_tuple(ary)
- Rinda::Template.new(ary)
- end
- end
-
- class WaitTemplateEntry < TemplateEntry
- def initialize(place, ary, expires=nil)
- super(ary, expires)
- @place = place
- @cond = place.new_cond
- @found = nil
- end
- attr_reader :found
-
- def cancel
- super
- signal
- end
-
- def wait
- @cond.wait
- end
-
- def read(tuple)
- @found = tuple
- signal
- end
-
- def signal
- @place.synchronize do
- @cond.signal
- end
- end
- end
-
- class NotifyTemplateEntry < TemplateEntry
- def initialize(place, event, tuple, expires=nil)
- ary = [event, Rinda::Template.new(tuple)]
- super(ary, expires)
- @queue = Queue.new
- @done = false
- end
-
- def notify(ev)
- @queue.push(ev)
- end
-
- def pop
- raise RequestExpiredError if @done
- it = @queue.pop
- @done = true if it[0] == 'close'
- return it
- end
-
- def each
- while !@done
- it = pop
- yield(it)
- end
- rescue
- ensure
- cancel
- end
- end
-
- class TupleBag
- def initialize
- @hash = {}
- end
-
- def push(ary)
- size = ary.size
- @hash[size] ||= []
- @hash[size].push(ary)
- end
-
- def delete(ary)
- size = ary.size
- @hash.fetch(size, []).delete(ary)
- end
-
- def find_all(template)
- @hash.fetch(template.size, []).find_all do |tuple|
- tuple.alive? && template.match(tuple)
- end
- end
-
- def find(template)
- @hash.fetch(template.size, []).find do |tuple|
- tuple.alive? && template.match(tuple)
- end
- end
-
- def find_all_template(tuple)
- @hash.fetch(tuple.size, []).find_all do |template|
- template.alive? && template.match(tuple)
- end
- end
-
- def delete_unless_alive
- deleted = []
- @hash.keys.each do |size|
- ary = []
- @hash[size].each do |tuple|
- if tuple.alive?
- ary.push(tuple)
- else
- deleted.push(tuple)
- end
- end
- @hash[size] = ary
- end
- deleted
- end
- end
-
- class TupleSpace
- include DRbUndumped
- include MonitorMixin
- def initialize(timeout=60)
- super()
- @bag = TupleBag.new
- @read_waiter = TupleBag.new
- @take_waiter = TupleBag.new
- @notify_waiter = TupleBag.new
- @timeout = timeout
- @period = timeout * 2
- @keeper = keeper
- end
-
- def write(tuple, sec=nil)
- entry = TupleEntry.new(tuple, sec)
- synchronize do
- if entry.expired?
- @read_waiter.find_all_template(entry).each do |template|
- template.read(tuple)
- end
- notify_event('write', entry.value)
- notify_event('delete', entry.value)
- else
- @bag.push(entry)
- @read_waiter.find_all_template(entry).each do |template|
- template.read(tuple)
- end
- @take_waiter.find_all_template(entry).each do |template|
- template.signal
- end
- notify_event('write', entry.value)
- end
- end
- entry
- end
-
- def take(tuple, sec=nil, &block)
- move(nil, tuple, sec, &block)
- end
-
- def move(port, tuple, sec=nil)
- template = WaitTemplateEntry.new(self, tuple, sec)
- yield(template) if block_given?
- synchronize do
- entry = @bag.find(template)
- if entry
- port.push(entry.value) if port
- @bag.delete(entry)
- notify_event('take', entry.value)
- return entry.value
- end
- return nil if template.expired?
-
- begin
- @take_waiter.push(template)
- while true
- raise RequestCanceledError if template.canceled?
- raise RequestExpiredError if template.expired?
- entry = @bag.find(template)
- if entry
- port.push(entry.value) if port
- @bag.delete(entry)
- notify_event('take', entry.value)
- return entry.value
- end
- template.wait
- end
- ensure
- @take_waiter.delete(template)
- end
- end
- end
-
- def read(tuple, sec=nil)
- template = WaitTemplateEntry.new(self, tuple, sec)
- yield(template) if block_given?
- synchronize do
- entry = @bag.find(template)
- return entry.value if entry
- return nil if template.expired?
-
- begin
- @read_waiter.push(template)
- template.wait
- raise RequestCanceledError if template.canceled?
- raise RequestExpiredError if template.expired?
- return template.found
- ensure
- @read_waiter.delete(template)
- end
- end
- end
-
- def read_all(tuple)
- template = WaitTemplateEntry.new(self, tuple, nil)
- synchronize do
- entry = @bag.find_all(template)
- entry.collect do |e|
- e.value
- end
- end
- end
-
- def notify(event, tuple, sec=nil)
- template = NotifyTemplateEntry.new(self, event, tuple, sec)
- synchronize do
- @notify_waiter.push(template)
- end
- template
- end
-
- private
- def keep_clean
- synchronize do
- @read_waiter.delete_unless_alive.each do |e|
- e.signal
- end
- @take_waiter.delete_unless_alive.each do |e|
- e.signal
- end
- @notify_waiter.delete_unless_alive.each do |e|
- e.notify(['close'])
- end
- @bag.delete_unless_alive.each do |e|
- notify_event('delete', e.value)
- end
- end
- end
-
- def notify_event(event, tuple)
- ev = [event, tuple]
- @notify_waiter.find_all_template(ev).each do |template|
- template.notify(ev)
- end
- end
-
- def keeper
- Thread.new do
- loop do
- sleep(@period)
- keep_clean
- end
- end
- end
- end
-end