summaryrefslogtreecommitdiff
path: root/lib/rinda
diff options
context:
space:
mode:
Diffstat (limited to 'lib/rinda')
-rw-r--r--lib/rinda/rinda.gemspec28
-rw-r--r--lib/rinda/rinda.rb327
-rw-r--r--lib/rinda/ring.rb484
-rw-r--r--lib/rinda/tuplespace.rb641
4 files changed, 0 insertions, 1480 deletions
diff --git a/lib/rinda/rinda.gemspec b/lib/rinda/rinda.gemspec
deleted file mode 100644
index 1cc5a453d8..0000000000
--- a/lib/rinda/rinda.gemspec
+++ /dev/null
@@ -1,28 +0,0 @@
-Gem::Specification.new do |spec|
- spec.name = "rinda"
- spec.version = "0.1.0"
- spec.authors = ["Masatoshi SEKI"]
- spec.email = ["seki@ruby-lang.org"]
-
- spec.summary = %q{The Linda distributed computing paradigm in Ruby.}
- spec.description = %q{The Linda distributed computing paradigm in Ruby.}
- spec.homepage = "https://github.com/ruby/rinda"
- spec.required_ruby_version = Gem::Requirement.new(">= 2.3.0")
- spec.licenses = ["Ruby", "BSD-2-Clause"]
-
- spec.metadata["homepage_uri"] = spec.homepage
- spec.metadata["source_code_uri"] = spec.homepage
-
- # Specify which files should be added to the gem when it is released.
- # The `git ls-files -z` loads the files in the RubyGem that have been added into git.
- spec.files = Dir.chdir(File.expand_path('..', __FILE__)) do
- `git ls-files -z`.split("\x0").reject { |f| f.match(%r{^(test|spec|features)/}) }
- end
- spec.bindir = "exe"
- spec.executables = spec.files.grep(%r{^exe/}) { |f| File.basename(f) }
- spec.require_paths = ["lib"]
-
- spec.add_dependency "drb"
- spec.add_dependency "ipaddr"
- spec.add_dependency "forwardable"
-end
diff --git a/lib/rinda/rinda.rb b/lib/rinda/rinda.rb
deleted file mode 100644
index e762286d3b..0000000000
--- a/lib/rinda/rinda.rb
+++ /dev/null
@@ -1,327 +0,0 @@
-# frozen_string_literal: false
-require 'drb/drb'
-
-##
-# A module to implement the Linda distributed computing paradigm in Ruby.
-#
-# Rinda is part of DRb (dRuby).
-#
-# == Example(s)
-#
-# See the sample/drb/ directory in the Ruby distribution, from 1.8.2 onwards.
-#
-#--
-# TODO
-# == Introduction to Linda/rinda?
-#
-# == Why is this library separate from DRb?
-
-module Rinda
-
- ##
- # Rinda error base class
-
- class RindaError < RuntimeError; end
-
- ##
- # Raised when a hash-based tuple has an invalid key.
-
- class InvalidHashTupleKey < RindaError; end
-
- ##
- # Raised when trying to use a canceled tuple.
-
- class RequestCanceledError < ThreadError; end
-
- ##
- # Raised when trying to use an expired tuple.
-
- class RequestExpiredError < ThreadError; end
-
- ##
- # A tuple is the elementary object in Rinda programming.
- # Tuples may be matched against templates if the tuple and
- # the template are the same size.
-
- class Tuple
-
- ##
- # Creates a new Tuple from +ary_or_hash+ which must be an Array or Hash.
-
- def initialize(ary_or_hash)
- if hash?(ary_or_hash)
- init_with_hash(ary_or_hash)
- else
- init_with_ary(ary_or_hash)
- end
- end
-
- ##
- # The number of elements in the tuple.
-
- def size
- @tuple.size
- end
-
- ##
- # Accessor method for elements of the tuple.
-
- def [](k)
- @tuple[k]
- end
-
- ##
- # Fetches item +k+ from the tuple.
-
- def fetch(k)
- @tuple.fetch(k)
- end
-
- ##
- # Iterate through the tuple, yielding the index or key, and the
- # value, thus ensuring arrays are iterated similarly to hashes.
-
- def each # FIXME
- if Hash === @tuple
- @tuple.each { |k, v| yield(k, v) }
- else
- @tuple.each_with_index { |v, k| yield(k, v) }
- end
- end
-
- ##
- # Return the tuple itself
- def value
- @tuple
- end
-
- private
-
- def hash?(ary_or_hash)
- ary_or_hash.respond_to?(:keys)
- end
-
- ##
- # Munges +ary+ into a valid Tuple.
-
- def init_with_ary(ary)
- @tuple = Array.new(ary.size)
- @tuple.size.times do |i|
- @tuple[i] = ary[i]
- end
- end
-
- ##
- # Ensures +hash+ is a valid Tuple.
-
- def init_with_hash(hash)
- @tuple = Hash.new
- hash.each do |k, v|
- raise InvalidHashTupleKey unless String === k
- @tuple[k] = v
- end
- end
-
- end
-
- ##
- # Templates are used to match tuples in Rinda.
-
- class Template < Tuple
-
- ##
- # Matches this template against +tuple+. The +tuple+ must be the same
- # size as the template. An element with a +nil+ value in a template acts
- # as a wildcard, matching any value in the corresponding position in the
- # tuple. Elements of the template match the +tuple+ if the are #== or
- # #===.
- #
- # Template.new([:foo, 5]).match Tuple.new([:foo, 5]) # => true
- # Template.new([:foo, nil]).match Tuple.new([:foo, 5]) # => true
- # Template.new([String]).match Tuple.new(['hello']) # => true
- #
- # Template.new([:foo]).match Tuple.new([:foo, 5]) # => false
- # Template.new([:foo, 6]).match Tuple.new([:foo, 5]) # => false
- # Template.new([:foo, nil]).match Tuple.new([:foo]) # => false
- # Template.new([:foo, 6]).match Tuple.new([:foo]) # => false
-
- def match(tuple)
- return false unless tuple.respond_to?(:size)
- return false unless tuple.respond_to?(:fetch)
- return false unless self.size == tuple.size
- each do |k, v|
- begin
- it = tuple.fetch(k)
- rescue
- return false
- end
- next if v.nil?
- next if v == it
- next if v === it
- return false
- end
- return true
- end
-
- ##
- # Alias for #match.
-
- def ===(tuple)
- match(tuple)
- end
-
- end
-
- ##
- # <i>Documentation?</i>
-
- class DRbObjectTemplate
-
- ##
- # Creates a new DRbObjectTemplate that will match against +uri+ and +ref+.
-
- def initialize(uri=nil, ref=nil)
- @drb_uri = uri
- @drb_ref = ref
- end
-
- ##
- # This DRbObjectTemplate matches +ro+ if the remote object's drburi and
- # drbref are the same. +nil+ is used as a wildcard.
-
- def ===(ro)
- return true if super(ro)
- unless @drb_uri.nil?
- return false unless (@drb_uri === ro.__drburi rescue false)
- end
- unless @drb_ref.nil?
- return false unless (@drb_ref === ro.__drbref rescue false)
- end
- true
- end
-
- end
-
- ##
- # TupleSpaceProxy allows a remote Tuplespace to appear as local.
-
- class TupleSpaceProxy
- ##
- # A Port ensures that a moved tuple arrives properly at its destination
- # and does not get lost.
- #
- # See https://bugs.ruby-lang.org/issues/8125
-
- class Port # :nodoc:
- attr_reader :value
-
- def self.deliver
- port = new
-
- begin
- yield(port)
- ensure
- port.close
- end
-
- port.value
- end
-
- def initialize
- @open = true
- @value = nil
- end
-
- ##
- # Don't let the DRb thread push to it when remote sends tuple
-
- def close
- @open = false
- end
-
- ##
- # Stores +value+ and ensure it does not get marshaled multiple times.
-
- def push value
- raise 'port closed' unless @open
-
- @value = value
-
- nil # avoid Marshal
- end
- end
-
- ##
- # Creates a new TupleSpaceProxy to wrap +ts+.
-
- def initialize(ts)
- @ts = ts
- end
-
- ##
- # Adds +tuple+ to the proxied TupleSpace. See TupleSpace#write.
-
- def write(tuple, sec=nil)
- @ts.write(tuple, sec)
- end
-
- ##
- # Takes +tuple+ from the proxied TupleSpace. See TupleSpace#take.
-
- def take(tuple, sec=nil, &block)
- Port.deliver do |port|
- @ts.move(DRbObject.new(port), tuple, sec, &block)
- end
- end
-
- ##
- # Reads +tuple+ from the proxied TupleSpace. See TupleSpace#read.
-
- def read(tuple, sec=nil, &block)
- @ts.read(tuple, sec, &block)
- end
-
- ##
- # Reads all tuples matching +tuple+ from the proxied TupleSpace. See
- # TupleSpace#read_all.
-
- def read_all(tuple)
- @ts.read_all(tuple)
- end
-
- ##
- # Registers for notifications of event +ev+ on the proxied TupleSpace.
- # See TupleSpace#notify
-
- def notify(ev, tuple, sec=nil)
- @ts.notify(ev, tuple, sec)
- end
-
- end
-
- ##
- # An SimpleRenewer allows a TupleSpace to check if a TupleEntry is still
- # alive.
-
- class SimpleRenewer
-
- include DRbUndumped
-
- ##
- # Creates a new SimpleRenewer that keeps an object alive for another +sec+
- # seconds.
-
- def initialize(sec=180)
- @sec = sec
- end
-
- ##
- # Called by the TupleSpace to check if the object is still alive.
-
- def renew
- @sec
- end
- end
-
-end
-
diff --git a/lib/rinda/ring.rb b/lib/rinda/ring.rb
deleted file mode 100644
index 948cfaf208..0000000000
--- a/lib/rinda/ring.rb
+++ /dev/null
@@ -1,484 +0,0 @@
-# frozen_string_literal: false
-#
-# Note: Rinda::Ring API is unstable.
-#
-require 'drb/drb'
-require_relative 'rinda'
-require 'ipaddr'
-
-module Rinda
-
- ##
- # The default port Ring discovery will use.
-
- Ring_PORT = 7647
-
- ##
- # A RingServer allows a Rinda::TupleSpace to be located via UDP broadcasts.
- # Default service location uses the following steps:
- #
- # 1. A RingServer begins listening on the network broadcast UDP address.
- # 2. A RingFinger sends a UDP packet containing the DRb URI where it will
- # listen for a reply.
- # 3. The RingServer receives the UDP packet and connects back to the
- # provided DRb URI with the DRb service.
- #
- # A RingServer requires a TupleSpace:
- #
- # ts = Rinda::TupleSpace.new
- # rs = Rinda::RingServer.new
- #
- # RingServer can also listen on multicast addresses for announcements. This
- # allows multiple RingServers to run on the same host. To use network
- # broadcast and multicast:
- #
- # ts = Rinda::TupleSpace.new
- # rs = Rinda::RingServer.new ts, %w[Socket::INADDR_ANY, 239.0.0.1 ff02::1]
-
- class RingServer
-
- include DRbUndumped
-
- ##
- # Special renewer for the RingServer to allow shutdown
-
- class Renewer # :nodoc:
- include DRbUndumped
-
- ##
- # Set to false to shutdown future requests using this Renewer
-
- attr_writer :renew
-
- def initialize # :nodoc:
- @renew = true
- end
-
- def renew # :nodoc:
- @renew ? 1 : true
- end
- end
-
- ##
- # Advertises +ts+ on the given +addresses+ at +port+.
- #
- # If +addresses+ is omitted only the UDP broadcast address is used.
- #
- # +addresses+ can contain multiple addresses. If a multicast address is
- # given in +addresses+ then the RingServer will listen for multicast
- # queries.
- #
- # If you use IPv4 multicast you may need to set an address of the inbound
- # interface which joins a multicast group.
- #
- # ts = Rinda::TupleSpace.new
- # rs = Rinda::RingServer.new(ts, [['239.0.0.1', '9.5.1.1']])
- #
- # You can set addresses as an Array Object. The first element of the
- # Array is a multicast address and the second is an inbound interface
- # address. If the second is omitted then '0.0.0.0' is used.
- #
- # If you use IPv6 multicast you may need to set both the local interface
- # address and the inbound interface index:
- #
- # rs = Rinda::RingServer.new(ts, [['ff02::1', '::1', 1]])
- #
- # The first element is a multicast address and the second is an inbound
- # interface address. The third is an inbound interface index.
- #
- # At this time there is no easy way to get an interface index by name.
- #
- # If the second is omitted then '::1' is used.
- # If the third is omitted then 0 (default interface) is used.
-
- def initialize(ts, addresses=[Socket::INADDR_ANY], port=Ring_PORT)
- @port = port
-
- if Integer === addresses then
- addresses, @port = [Socket::INADDR_ANY], addresses
- end
-
- @renewer = Renewer.new
-
- @ts = ts
- @sockets = []
- addresses.each do |address|
- if Array === address
- make_socket(*address)
- else
- make_socket(address)
- end
- end
-
- @w_services = write_services
- @r_service = reply_service
- end
-
- ##
- # Creates a socket at +address+
- #
- # If +address+ is multicast address then +interface_address+ and
- # +multicast_interface+ can be set as optional.
- #
- # A created socket is bound to +interface_address+. If you use IPv4
- # multicast then the interface of +interface_address+ is used as the
- # inbound interface. If +interface_address+ is omitted or nil then
- # '0.0.0.0' or '::1' is used.
- #
- # If you use IPv6 multicast then +multicast_interface+ is used as the
- # inbound interface. +multicast_interface+ is a network interface index.
- # If +multicast_interface+ is omitted then 0 (default interface) is used.
-
- def make_socket(address, interface_address=nil, multicast_interface=0)
- addrinfo = Addrinfo.udp(address, @port)
-
- socket = Socket.new(addrinfo.pfamily, addrinfo.socktype,
- addrinfo.protocol)
-
- if addrinfo.ipv4_multicast? or addrinfo.ipv6_multicast? then
- if Socket.const_defined?(:SO_REUSEPORT) then
- socket.setsockopt(:SOCKET, :SO_REUSEPORT, true)
- else
- socket.setsockopt(:SOCKET, :SO_REUSEADDR, true)
- end
-
- if addrinfo.ipv4_multicast? then
- interface_address = '0.0.0.0' if interface_address.nil?
- socket.bind(Addrinfo.udp(interface_address, @port))
-
- mreq = IPAddr.new(addrinfo.ip_address).hton +
- IPAddr.new(interface_address).hton
-
- socket.setsockopt(:IPPROTO_IP, :IP_ADD_MEMBERSHIP, mreq)
- else
- interface_address = '::1' if interface_address.nil?
- socket.bind(Addrinfo.udp(interface_address, @port))
-
- mreq = IPAddr.new(addrinfo.ip_address).hton +
- [multicast_interface].pack('I')
-
- socket.setsockopt(:IPPROTO_IPV6, :IPV6_JOIN_GROUP, mreq)
- end
- else
- socket.bind(addrinfo)
- end
-
- socket
- rescue
- socket = socket.close if socket
- raise
- ensure
- @sockets << socket if socket
- end
-
- ##
- # Creates threads that pick up UDP packets and passes them to do_write for
- # decoding.
-
- def write_services
- @sockets.map do |s|
- Thread.new(s) do |socket|
- loop do
- msg = socket.recv(1024)
- do_write(msg)
- end
- end
- end
- end
-
- ##
- # Extracts the response URI from +msg+ and adds it to TupleSpace where it
- # will be picked up by +reply_service+ for notification.
-
- def do_write(msg)
- Thread.new do
- begin
- tuple, sec = Marshal.load(msg)
- @ts.write(tuple, sec)
- rescue
- end
- end
- end
-
- ##
- # Creates a thread that notifies waiting clients from the TupleSpace.
-
- def reply_service
- Thread.new do
- loop do
- do_reply
- end
- end
- end
-
- ##
- # Pulls lookup tuples out of the TupleSpace and sends their DRb object the
- # address of the local TupleSpace.
-
- def do_reply
- tuple = @ts.take([:lookup_ring, nil], @renewer)
- Thread.new { tuple[1].call(@ts) rescue nil}
- rescue
- end
-
- ##
- # Shuts down the RingServer
-
- def shutdown
- @renewer.renew = false
-
- @w_services.each do |thread|
- thread.kill
- thread.join
- end
-
- @sockets.each do |socket|
- socket.close
- end
-
- @r_service.kill
- @r_service.join
- end
-
- end
-
- ##
- # RingFinger is used by RingServer clients to discover the RingServer's
- # TupleSpace. Typically, all a client needs to do is call
- # RingFinger.primary to retrieve the remote TupleSpace, which it can then
- # begin using.
- #
- # To find the first available remote TupleSpace:
- #
- # Rinda::RingFinger.primary
- #
- # To create a RingFinger that broadcasts to a custom list:
- #
- # rf = Rinda::RingFinger.new ['localhost', '192.0.2.1']
- # rf.primary
- #
- # Rinda::RingFinger also understands multicast addresses and sets them up
- # properly. This allows you to run multiple RingServers on the same host:
- #
- # rf = Rinda::RingFinger.new ['239.0.0.1']
- # rf.primary
- #
- # You can set the hop count (or TTL) for multicast searches using
- # #multicast_hops.
- #
- # If you use IPv6 multicast you may need to set both an address and the
- # outbound interface index:
- #
- # rf = Rinda::RingFinger.new ['ff02::1']
- # rf.multicast_interface = 1
- # rf.primary
- #
- # At this time there is no easy way to get an interface index by name.
-
- class RingFinger
-
- @@broadcast_list = ['<broadcast>', 'localhost']
-
- @@finger = nil
-
- ##
- # Creates a singleton RingFinger and looks for a RingServer. Returns the
- # created RingFinger.
-
- def self.finger
- unless @@finger
- @@finger = self.new
- @@finger.lookup_ring_any
- end
- @@finger
- end
-
- ##
- # Returns the first advertised TupleSpace.
-
- def self.primary
- finger.primary
- end
-
- ##
- # Contains all discovered TupleSpaces except for the primary.
-
- def self.to_a
- finger.to_a
- end
-
- ##
- # The list of addresses where RingFinger will send query packets.
-
- attr_accessor :broadcast_list
-
- ##
- # Maximum number of hops for sent multicast packets (if using a multicast
- # address in the broadcast list). The default is 1 (same as UDP
- # broadcast).
-
- attr_accessor :multicast_hops
-
- ##
- # The interface index to send IPv6 multicast packets from.
-
- attr_accessor :multicast_interface
-
- ##
- # The port that RingFinger will send query packets to.
-
- attr_accessor :port
-
- ##
- # Contain the first advertised TupleSpace after lookup_ring_any is called.
-
- attr_accessor :primary
-
- ##
- # Creates a new RingFinger that will look for RingServers at +port+ on
- # the addresses in +broadcast_list+.
- #
- # If +broadcast_list+ contains a multicast address then multicast queries
- # will be made using the given multicast_hops and multicast_interface.
-
- def initialize(broadcast_list=@@broadcast_list, port=Ring_PORT)
- @broadcast_list = broadcast_list || ['localhost']
- @port = port
- @primary = nil
- @rings = []
-
- @multicast_hops = 1
- @multicast_interface = 0
- end
-
- ##
- # Contains all discovered TupleSpaces except for the primary.
-
- def to_a
- @rings
- end
-
- ##
- # Iterates over all discovered TupleSpaces starting with the primary.
-
- def each
- lookup_ring_any unless @primary
- return unless @primary
- yield(@primary)
- @rings.each { |x| yield(x) }
- end
-
- ##
- # Looks up RingServers waiting +timeout+ seconds. RingServers will be
- # given +block+ as a callback, which will be called with the remote
- # TupleSpace.
-
- def lookup_ring(timeout=5, &block)
- return lookup_ring_any(timeout) unless block_given?
-
- msg = Marshal.dump([[:lookup_ring, DRbObject.new(block)], timeout])
- @broadcast_list.each do |it|
- send_message(it, msg)
- end
- sleep(timeout)
- end
-
- ##
- # Returns the first found remote TupleSpace. Any further recovered
- # TupleSpaces can be found by calling +to_a+.
-
- def lookup_ring_any(timeout=5)
- queue = Thread::Queue.new
-
- Thread.new do
- self.lookup_ring(timeout) do |ts|
- queue.push(ts)
- end
- queue.push(nil)
- end
-
- @primary = queue.pop
- raise('RingNotFound') if @primary.nil?
-
- Thread.new do
- while it = queue.pop
- @rings.push(it)
- end
- end
-
- @primary
- end
-
- ##
- # Creates a socket for +address+ with the appropriate multicast options
- # for multicast addresses.
-
- def make_socket(address) # :nodoc:
- addrinfo = Addrinfo.udp(address, @port)
-
- soc = Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol)
- begin
- if addrinfo.ipv4_multicast? then
- soc.setsockopt(Socket::Option.ipv4_multicast_loop(1))
- soc.setsockopt(Socket::Option.ipv4_multicast_ttl(@multicast_hops))
- elsif addrinfo.ipv6_multicast? then
- soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_LOOP, true)
- soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_HOPS,
- [@multicast_hops].pack('I'))
- soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_IF,
- [@multicast_interface].pack('I'))
- else
- soc.setsockopt(:SOL_SOCKET, :SO_BROADCAST, true)
- end
-
- soc.connect(addrinfo)
- rescue Exception
- soc.close
- raise
- end
-
- soc
- end
-
- def send_message(address, message) # :nodoc:
- soc = make_socket(address)
-
- soc.send(message, 0)
- rescue
- nil
- ensure
- soc.close if soc
- end
-
- end
-
- ##
- # RingProvider uses a RingServer advertised TupleSpace as a name service.
- # TupleSpace clients can register themselves with the remote TupleSpace and
- # look up other provided services via the remote TupleSpace.
- #
- # Services are registered with a tuple of the format [:name, klass,
- # DRbObject, description].
-
- class RingProvider
-
- ##
- # Creates a RingProvider that will provide a +klass+ service running on
- # +front+, with a +description+. +renewer+ is optional.
-
- def initialize(klass, front, desc, renewer = nil)
- @tuple = [:name, klass, front, desc]
- @renewer = renewer || Rinda::SimpleRenewer.new
- end
-
- ##
- # Advertises this service on the primary remote TupleSpace.
-
- def provide
- ts = Rinda::RingFinger.primary
- ts.write(@tuple, @renewer)
- end
-
- end
-
-end
diff --git a/lib/rinda/tuplespace.rb b/lib/rinda/tuplespace.rb
deleted file mode 100644
index 6a41a7ba75..0000000000
--- a/lib/rinda/tuplespace.rb
+++ /dev/null
@@ -1,641 +0,0 @@
-# frozen_string_literal: false
-require 'monitor'
-require 'drb/drb'
-require_relative 'rinda'
-require 'forwardable'
-
-module Rinda
-
- ##
- # A TupleEntry is a Tuple (i.e. a possible entry in some Tuplespace)
- # together with expiry and cancellation data.
-
- class TupleEntry
-
- include DRbUndumped
-
- attr_accessor :expires
-
- ##
- # Creates a TupleEntry based on +ary+ with an optional renewer or expiry
- # time +sec+.
- #
- # A renewer must implement the +renew+ method which returns a Numeric,
- # nil, or true to indicate when the tuple has expired.
-
- def initialize(ary, sec=nil)
- @cancel = false
- @expires = nil
- @tuple = make_tuple(ary)
- @renewer = nil
- renew(sec)
- end
-
- ##
- # Marks this TupleEntry as canceled.
-
- def cancel
- @cancel = true
- end
-
- ##
- # A TupleEntry is dead when it is canceled or expired.
-
- def alive?
- !canceled? && !expired?
- end
-
- ##
- # Return the object which makes up the tuple itself: the Array
- # or Hash.
-
- def value; @tuple.value; end
-
- ##
- # Returns the canceled status.
-
- def canceled?; @cancel; end
-
- ##
- # Has this tuple expired? (true/false).
- #
- # A tuple has expired when its expiry timer based on the +sec+ argument to
- # #initialize runs out.
-
- def expired?
- return true unless @expires
- return false if @expires > Time.now
- return true if @renewer.nil?
- renew(@renewer)
- return true unless @expires
- return @expires < Time.now
- end
-
- ##
- # Reset the expiry time according to +sec_or_renewer+.
- #
- # +nil+:: it is set to expire in the far future.
- # +true+:: it has expired.
- # Numeric:: it will expire in that many seconds.
- #
- # Otherwise the argument refers to some kind of renewer object
- # which will reset its expiry time.
-
- def renew(sec_or_renewer)
- sec, @renewer = get_renewer(sec_or_renewer)
- @expires = make_expires(sec)
- end
-
- ##
- # Returns an expiry Time based on +sec+ which can be one of:
- # Numeric:: +sec+ seconds into the future
- # +true+:: the expiry time is the start of 1970 (i.e. expired)
- # +nil+:: it is Tue Jan 19 03:14:07 GMT Standard Time 2038 (i.e. when
- # UNIX clocks will die)
-
- def make_expires(sec=nil)
- case sec
- when Numeric
- Time.now + sec
- when true
- Time.at(1)
- when nil
- Time.at(2**31-1)
- end
- end
-
- ##
- # Retrieves +key+ from the tuple.
-
- def [](key)
- @tuple[key]
- end
-
- ##
- # Fetches +key+ from the tuple.
-
- def fetch(key)
- @tuple.fetch(key)
- end
-
- ##
- # The size of the tuple.
-
- def size
- @tuple.size
- end
-
- ##
- # Creates a Rinda::Tuple for +ary+.
-
- def make_tuple(ary)
- Rinda::Tuple.new(ary)
- end
-
- private
-
- ##
- # Returns a valid argument to make_expires and the renewer or nil.
- #
- # Given +true+, +nil+, or Numeric, returns that value and +nil+ (no actual
- # renewer). Otherwise it returns an expiry value from calling +it.renew+
- # and the renewer.
-
- def get_renewer(it)
- case it
- when Numeric, true, nil
- return it, nil
- else
- begin
- return it.renew, it
- rescue Exception
- return it, nil
- end
- end
- end
-
- end
-
- ##
- # A TemplateEntry is a Template together with expiry and cancellation data.
-
- class TemplateEntry < TupleEntry
- ##
- # Matches this TemplateEntry against +tuple+. See Template#match for
- # details on how a Template matches a Tuple.
-
- def match(tuple)
- @tuple.match(tuple)
- end
-
- alias === match
-
- def make_tuple(ary) # :nodoc:
- Rinda::Template.new(ary)
- end
-
- end
-
- ##
- # <i>Documentation?</i>
-
- class WaitTemplateEntry < TemplateEntry
-
- attr_reader :found
-
- def initialize(place, ary, expires=nil)
- super(ary, expires)
- @place = place
- @cond = place.new_cond
- @found = nil
- end
-
- def cancel
- super
- signal
- end
-
- def wait
- @cond.wait
- end
-
- def read(tuple)
- @found = tuple
- signal
- end
-
- def signal
- @place.synchronize do
- @cond.signal
- end
- end
-
- end
-
- ##
- # A NotifyTemplateEntry is returned by TupleSpace#notify and is notified of
- # TupleSpace changes. You may receive either your subscribed event or the
- # 'close' event when iterating over notifications.
- #
- # See TupleSpace#notify_event for valid notification types.
- #
- # == Example
- #
- # ts = Rinda::TupleSpace.new
- # observer = ts.notify 'write', [nil]
- #
- # Thread.start do
- # observer.each { |t| p t }
- # end
- #
- # 3.times { |i| ts.write [i] }
- #
- # Outputs:
- #
- # ['write', [0]]
- # ['write', [1]]
- # ['write', [2]]
-
- class NotifyTemplateEntry < TemplateEntry
-
- ##
- # Creates a new NotifyTemplateEntry that watches +place+ for +event+s that
- # match +tuple+.
-
- def initialize(place, event, tuple, expires=nil)
- ary = [event, Rinda::Template.new(tuple)]
- super(ary, expires)
- @queue = Thread::Queue.new
- @done = false
- end
-
- ##
- # Called by TupleSpace to notify this NotifyTemplateEntry of a new event.
-
- def notify(ev)
- @queue.push(ev)
- end
-
- ##
- # Retrieves a notification. Raises RequestExpiredError when this
- # NotifyTemplateEntry expires.
-
- def pop
- raise RequestExpiredError if @done
- it = @queue.pop
- @done = true if it[0] == 'close'
- return it
- end
-
- ##
- # Yields event/tuple pairs until this NotifyTemplateEntry expires.
-
- def each # :yields: event, tuple
- while !@done
- it = pop
- yield(it)
- end
- rescue
- ensure
- cancel
- end
-
- end
-
- ##
- # TupleBag is an unordered collection of tuples. It is the basis
- # of Tuplespace.
-
- class TupleBag
- class TupleBin
- extend Forwardable
- def_delegators '@bin', :find_all, :delete_if, :each, :empty?
-
- def initialize
- @bin = []
- end
-
- def add(tuple)
- @bin.push(tuple)
- end
-
- def delete(tuple)
- idx = @bin.rindex(tuple)
- @bin.delete_at(idx) if idx
- end
-
- def find
- @bin.reverse_each do |x|
- return x if yield(x)
- end
- nil
- end
- end
-
- def initialize # :nodoc:
- @hash = {}
- @enum = enum_for(:each_entry)
- end
-
- ##
- # +true+ if the TupleBag to see if it has any expired entries.
-
- def has_expires?
- @enum.find do |tuple|
- tuple.expires
- end
- end
-
- ##
- # Add +tuple+ to the TupleBag.
-
- def push(tuple)
- key = bin_key(tuple)
- @hash[key] ||= TupleBin.new
- @hash[key].add(tuple)
- end
-
- ##
- # Removes +tuple+ from the TupleBag.
-
- def delete(tuple)
- key = bin_key(tuple)
- bin = @hash[key]
- return nil unless bin
- bin.delete(tuple)
- @hash.delete(key) if bin.empty?
- tuple
- end
-
- ##
- # Finds all live tuples that match +template+.
- def find_all(template)
- bin_for_find(template).find_all do |tuple|
- tuple.alive? && template.match(tuple)
- end
- end
-
- ##
- # Finds a live tuple that matches +template+.
-
- def find(template)
- bin_for_find(template).find do |tuple|
- tuple.alive? && template.match(tuple)
- end
- end
-
- ##
- # Finds all tuples in the TupleBag which when treated as templates, match
- # +tuple+ and are alive.
-
- def find_all_template(tuple)
- @enum.find_all do |template|
- template.alive? && template.match(tuple)
- end
- end
-
- ##
- # Delete tuples which dead tuples from the TupleBag, returning the deleted
- # tuples.
-
- def delete_unless_alive
- deleted = []
- @hash.each do |key, bin|
- bin.delete_if do |tuple|
- if tuple.alive?
- false
- else
- deleted.push(tuple)
- true
- end
- end
- end
- deleted
- end
-
- private
- def each_entry(&blk)
- @hash.each do |k, v|
- v.each(&blk)
- end
- end
-
- def bin_key(tuple)
- head = tuple[0]
- if head.class == Symbol
- return head
- else
- false
- end
- end
-
- def bin_for_find(template)
- key = bin_key(template)
- key ? @hash.fetch(key, []) : @enum
- end
- end
-
- ##
- # The Tuplespace manages access to the tuples it contains,
- # ensuring mutual exclusion requirements are met.
- #
- # The +sec+ option for the write, take, move, read and notify methods may
- # either be a number of seconds or a Renewer object.
-
- class TupleSpace
-
- include DRbUndumped
- include MonitorMixin
-
- ##
- # Creates a new TupleSpace. +period+ is used to control how often to look
- # for dead tuples after modifications to the TupleSpace.
- #
- # If no dead tuples are found +period+ seconds after the last
- # modification, the TupleSpace will stop looking for dead tuples.
-
- def initialize(period=60)
- super()
- @bag = TupleBag.new
- @read_waiter = TupleBag.new
- @take_waiter = TupleBag.new
- @notify_waiter = TupleBag.new
- @period = period
- @keeper = nil
- end
-
- ##
- # Adds +tuple+
-
- def write(tuple, sec=nil)
- entry = create_entry(tuple, sec)
- synchronize do
- if entry.expired?
- @read_waiter.find_all_template(entry).each do |template|
- template.read(tuple)
- end
- notify_event('write', entry.value)
- notify_event('delete', entry.value)
- else
- @bag.push(entry)
- start_keeper if entry.expires
- @read_waiter.find_all_template(entry).each do |template|
- template.read(tuple)
- end
- @take_waiter.find_all_template(entry).each do |template|
- template.signal
- end
- notify_event('write', entry.value)
- end
- end
- entry
- end
-
- ##
- # Removes +tuple+
-
- def take(tuple, sec=nil, &block)
- move(nil, tuple, sec, &block)
- end
-
- ##
- # Moves +tuple+ to +port+.
-
- def move(port, tuple, sec=nil)
- template = WaitTemplateEntry.new(self, tuple, sec)
- yield(template) if block_given?
- synchronize do
- entry = @bag.find(template)
- if entry
- port.push(entry.value) if port
- @bag.delete(entry)
- notify_event('take', entry.value)
- return port ? nil : entry.value
- end
- raise RequestExpiredError if template.expired?
-
- begin
- @take_waiter.push(template)
- start_keeper if template.expires
- while true
- raise RequestCanceledError if template.canceled?
- raise RequestExpiredError if template.expired?
- entry = @bag.find(template)
- if entry
- port.push(entry.value) if port
- @bag.delete(entry)
- notify_event('take', entry.value)
- return port ? nil : entry.value
- end
- template.wait
- end
- ensure
- @take_waiter.delete(template)
- end
- end
- end
-
- ##
- # Reads +tuple+, but does not remove it.
-
- def read(tuple, sec=nil)
- template = WaitTemplateEntry.new(self, tuple, sec)
- yield(template) if block_given?
- synchronize do
- entry = @bag.find(template)
- return entry.value if entry
- raise RequestExpiredError if template.expired?
-
- begin
- @read_waiter.push(template)
- start_keeper if template.expires
- template.wait
- raise RequestCanceledError if template.canceled?
- raise RequestExpiredError if template.expired?
- return template.found
- ensure
- @read_waiter.delete(template)
- end
- end
- end
-
- ##
- # Returns all tuples matching +tuple+. Does not remove the found tuples.
-
- def read_all(tuple)
- template = WaitTemplateEntry.new(self, tuple, nil)
- synchronize do
- entry = @bag.find_all(template)
- entry.collect do |e|
- e.value
- end
- end
- end
-
- ##
- # Registers for notifications of +event+. Returns a NotifyTemplateEntry.
- # See NotifyTemplateEntry for examples of how to listen for notifications.
- #
- # +event+ can be:
- # 'write':: A tuple was added
- # 'take':: A tuple was taken or moved
- # 'delete':: A tuple was lost after being overwritten or expiring
- #
- # The TupleSpace will also notify you of the 'close' event when the
- # NotifyTemplateEntry has expired.
-
- def notify(event, tuple, sec=nil)
- template = NotifyTemplateEntry.new(self, event, tuple, sec)
- synchronize do
- @notify_waiter.push(template)
- end
- template
- end
-
- private
-
- def create_entry(tuple, sec)
- TupleEntry.new(tuple, sec)
- end
-
- ##
- # Removes dead tuples.
-
- def keep_clean
- synchronize do
- @read_waiter.delete_unless_alive.each do |e|
- e.signal
- end
- @take_waiter.delete_unless_alive.each do |e|
- e.signal
- end
- @notify_waiter.delete_unless_alive.each do |e|
- e.notify(['close'])
- end
- @bag.delete_unless_alive.each do |e|
- notify_event('delete', e.value)
- end
- end
- end
-
- ##
- # Notifies all registered listeners for +event+ of a status change of
- # +tuple+.
-
- def notify_event(event, tuple)
- ev = [event, tuple]
- @notify_waiter.find_all_template(ev).each do |template|
- template.notify(ev)
- end
- end
-
- ##
- # Creates a thread that scans the tuplespace for expired tuples.
-
- def start_keeper
- return if @keeper && @keeper.alive?
- @keeper = Thread.new do
- while true
- sleep(@period)
- synchronize do
- break unless need_keeper?
- keep_clean
- end
- end
- end
- end
-
- ##
- # Checks the tuplespace to see if it needs cleaning.
-
- def need_keeper?
- return true if @bag.has_expires?
- return true if @read_waiter.has_expires?
- return true if @take_waiter.has_expires?
- return true if @notify_waiter.has_expires?
- end
-
- end
-
-end
-