diff options
Diffstat (limited to 'lib/drb')
| -rw-r--r-- | lib/drb/acl.rb | 179 | ||||
| -rw-r--r-- | lib/drb/drb.rb | 752 | ||||
| -rw-r--r-- | lib/drb/eq.rb | 5 | ||||
| -rw-r--r-- | lib/drb/extserv.rb | 50 | ||||
| -rw-r--r-- | lib/drb/extservm.rb | 75 | ||||
| -rw-r--r-- | lib/drb/gw.rb | 43 | ||||
| -rw-r--r-- | lib/drb/invokemethod.rb | 15 | ||||
| -rw-r--r-- | lib/drb/observer.rb | 24 | ||||
| -rw-r--r-- | lib/drb/ssl.rb | 342 | ||||
| -rw-r--r-- | lib/drb/timeridconv.rb | 110 | ||||
| -rw-r--r-- | lib/drb/unix.rb | 64 |
11 files changed, 1030 insertions, 629 deletions
diff --git a/lib/drb/acl.rb b/lib/drb/acl.rb index aa86dbe70f..b004656f09 100644 --- a/lib/drb/acl.rb +++ b/lib/drb/acl.rb @@ -1,5 +1,4 @@ -# acl-2.0 - simple Access Control List -# +# frozen_string_literal: false # Copyright (c) 2000,2002,2003 Masatoshi SEKI # # acl.rb is copyrighted free software by Masatoshi SEKI. @@ -7,78 +6,176 @@ require 'ipaddr' +## +# Simple Access Control Lists. +# +# Access control lists are composed of "allow" and "deny" halves to control +# access. Use "all" or "*" to match any address. To match a specific address +# use any address or address mask that IPAddr can understand. +# +# Example: +# +# list = %w[ +# deny all +# allow 192.168.1.1 +# allow ::ffff:192.168.1.2 +# allow 192.168.1.3 +# ] +# +# # From Socket#peeraddr, see also ACL#allow_socket? +# addr = ["AF_INET", 10, "lc630", "192.168.1.3"] +# +# acl = ACL.new +# p acl.allow_addr?(addr) # => true +# +# acl = ACL.new(list, ACL::DENY_ALLOW) +# p acl.allow_addr?(addr) # => true + class ACL + + ## + # The current version of ACL + VERSION=["2.0.0"] + + ## + # An entry in an ACL + class ACLEntry + + ## + # Creates a new entry using +str+. + # + # +str+ may be "*" or "all" to match any address, an IP address string + # to match a specific address, an IP address mask per IPAddr, or one + # containing "*" to match part of an IPv4 address. + # + # IPAddr::InvalidPrefixError may be raised when an IP network + # address with an invalid netmask/prefix is given. + def initialize(str) if str == '*' or str == 'all' - @pat = [:all] + @pat = [:all] + elsif str.include?('*') + @pat = [:name, dot_pat(str)] else - begin - @pat = [:ip, IPAddr.new(str)] - rescue ArgumentError - @pat = [:name, dot_pat(str)] - end + begin + @pat = [:ip, IPAddr.new(str)] + rescue IPAddr::InvalidPrefixError + # In this case, `str` shouldn't be a host name pattern + # because it contains a slash. + raise + rescue ArgumentError + @pat = [:name, dot_pat(str)] + end end end private + + ## + # Creates a regular expression to match IPv4 addresses + def dot_pat_str(str) list = str.split('.').collect { |s| - (s == '*') ? '.+' : s + (s == '*') ? '.+' : s } list.join("\\.") end private + + ## + # Creates a Regexp to match an address. + def dot_pat(str) - exp = "^" + dot_pat_str(str) + "$" - Regexp.new(exp) + /\A#{dot_pat_str(str)}\z/ end public + + ## + # Matches +addr+ against this entry. + def match(addr) case @pat[0] when :all - true + true when :ip - begin - ipaddr = IPAddr.new(addr[3]) - ipaddr = ipaddr.ipv4_mapped if @pat[1].ipv6? && ipaddr.ipv4? - rescue ArgumentError - return false - end - (@pat[1].include?(ipaddr)) ? true : false + begin + ipaddr = IPAddr.new(addr[3]) + ipaddr = ipaddr.ipv4_mapped if @pat[1].ipv6? && ipaddr.ipv4? + rescue ArgumentError + return false + end + (@pat[1].include?(ipaddr)) ? true : false when :name - (@pat[1] =~ addr[2]) ? true : false + (@pat[1] =~ addr[2]) ? true : false else - false + false end end end + ## + # A list of ACLEntry objects. Used to implement the allow and deny halves + # of an ACL + class ACLList + + ## + # Creates an empty ACLList + def initialize @list = [] end public + + ## + # Matches +addr+ against each ACLEntry in this list. + def match(addr) @list.each do |e| - return true if e.match(addr) + return true if e.match(addr) end false end public + + ## + # Adds +str+ as an ACLEntry in this list + def add(str) @list.push(ACLEntry.new(str)) end + end + ## + # Default to deny + DENY_ALLOW = 0 + + ## + # Default to allow + ALLOW_DENY = 1 + ## + # Creates a new ACL from +list+ with an evaluation +order+ of DENY_ALLOW or + # ALLOW_DENY. + # + # An ACL +list+ is an Array of "allow" or "deny" and an address or address + # mask or "all" or "*" to match any address: + # + # %w[ + # deny all + # allow 192.0.2.2 + # allow 192.0.2.128/26 + # ] + def initialize(list=nil, order = DENY_ALLOW) @order = order @deny = ACLList.new @@ -87,11 +184,22 @@ class ACL end public + + ## + # Allow connections from Socket +soc+? + def allow_socket?(soc) allow_addr?(soc.peeraddr) end public + + ## + # Allow connections from addrinfo +addr+? It must be formatted like + # Socket#peeraddr: + # + # ["AF_INET", 10, "lc630", "192.0.2.1"] + def allow_addr?(addr) case @order when DENY_ALLOW @@ -108,37 +216,24 @@ class ACL end public + + ## + # Adds +list+ of ACL entries to this ACL. + def install_list(list) i = 0 while i < list.size permission, domain = list.slice(i,2) case permission.downcase when 'allow' - @allow.add(domain) + @allow.add(domain) when 'deny' - @deny.add(domain) + @deny.add(domain) else - raise "Invalid ACL entry #{list.to_s}" + raise "Invalid ACL entry #{list}" end i += 2 end end -end - -if __FILE__ == $0 - # example - list = %w(deny all - allow 192.168.1.1 - allow ::ffff:192.168.1.2 - allow 192.168.1.3 - ) - - addr = ["AF_INET", 10, "lc630", "192.168.1.3"] - acl = ACL.new - p acl.allow_addr?(addr) - - acl = ACL.new(list, ACL::DENY_ALLOW) - p acl.allow_addr?(addr) end - diff --git a/lib/drb/drb.rb b/lib/drb/drb.rb index 25fbb3f788..e47b303aa4 100644 --- a/lib/drb/drb.rb +++ b/lib/drb/drb.rb @@ -1,3 +1,4 @@ +# frozen_string_literal: false # # = drb/drb.rb # @@ -18,7 +19,7 @@ # # The Ruby standard library contains the core classes of the dRuby package. # However, the full package also includes access control lists and the -# Rinda tuple-space distributed task management system, as well as a +# Rinda tuple-space distributed task management system, as well as a # large number of samples. The full dRuby package can be downloaded from # the dRuby home page (see *References*). # @@ -34,17 +35,11 @@ # [http://www2a.biglobe.ne.jp/~seki/ruby/druby.en.html] # The English version of the dRuby home page. # -# [http://www.chadfowler.com/ruby/drb.html] -# A quick tutorial introduction to using dRuby by Chad Fowler. +# [http://pragprog.com/book/sidruby/the-druby-book] +# The dRuby Book: Distributed and Parallel Computing with Ruby +# by Masatoshi Seki and Makoto Inoue # -# [http://www.linux-mag.com/2002-09/ruby_05.html] -# A tutorial introduction to dRuby in Linux Magazine by Dave Thomas. -# Includes a discussion of Rinda. -# -# [http://www.eng.cse.dmu.ac.uk/~hgs/ruby/dRuby/] -# Links to English-language Ruby material collected by Hugh Sasse. -# -# [http://www.rubycentral.com/book/ospace.html] +# [http://www.ruby-doc.org/docs/ProgrammingRuby/html/ospace.html] # The chapter from *Programming* *Ruby* by Dave Thomas and Andy Hunt # which discusses dRuby. # @@ -52,8 +47,7 @@ # Translation of presentation on Ruby by Masatoshi Seki. require 'socket' -require 'thread' -require 'fcntl' +require 'io/wait' require 'drb/eq' # @@ -121,7 +115,7 @@ require 'drb/eq' # are forwarded to the local object, as described in the discussion of # DRbObjects. This has semantics similar to the normal Ruby # pass-by-reference. -# +# # The easiest way to signal that we want an otherwise marshallable # object to be passed or returned as a DRbObject reference, rather # than marshalled and sent as a copy, is to include the @@ -135,7 +129,7 @@ require 'drb/eq' # passed back to the remote execution context to be collected, before # the collected values are finally returned to the local context as # the return value of the method invocation. -# +# # == Examples of usage # # For more dRuby samples, see the +samples+ directory in the full @@ -148,33 +142,33 @@ require 'drb/eq' # starting the server code first. # # ==== Server code -# +# # require 'drb/drb' -# +# # # The URI for the server to connect to -# URI="druby://localhost:8787" -# +# URI="druby://localhost:8787" +# # class TimeServer -# +# # def get_current_time # return Time.now # end -# +# # end -# +# # # The object that handles requests on the server # FRONT_OBJECT=TimeServer.new # # $SAFE = 1 # disable eval() and friends -# +# # DRb.start_service(URI, FRONT_OBJECT) # # Wait for the drb server thread to finish before exiting. # DRb.thread.join # # ==== Client code -# +# # require 'drb/drb' -# +# # # The URI to connect to # SERVER_URI="druby://localhost:8787" # @@ -183,44 +177,47 @@ require 'drb/eq' # # Not necessary for this small example, but will be required # # as soon as we pass a non-marshallable object as an argument # # to a dRuby call. +# # +# # Note: this must be called at least once per process to take any effect. +# # This is particularly important if your application forks. # DRb.start_service -# +# # timeserver = DRbObject.new_with_uri(SERVER_URI) -# puts timeserver.get_current_time +# puts timeserver.get_current_time # # === Remote objects under dRuby # # This example illustrates returning a reference to an object # from a dRuby call. The Logger instances live in the server # process. References to them are returned to the client process, -# where methods can be invoked upon them. These methods are +# where methods can be invoked upon them. These methods are # executed in the server process. # # ==== Server code -# +# # require 'drb/drb' -# +# # URI="druby://localhost:8787" -# +# # class Logger # # # Make dRuby send Logger instances as dRuby references, # # not copies. # include DRb::DRbUndumped -# +# # def initialize(n, fname) # @name = n # @filename = fname # end -# +# # def log(message) # File.open(@filename, "a") do |f| # f.puts("#{Time.now}: #{@name}: #{message}") # end # end -# +# # end -# +# # # We have a central object for creating and retrieving loggers. # # This retains a local reference to all loggers created. This # # is so an existing logger can be looked up by name, but also @@ -228,48 +225,48 @@ require 'drb/eq' # # reference to an object is not sufficient to prevent it being # # garbage collected! # class LoggerFactory -# +# # def initialize(bdir) # @basedir = bdir # @loggers = {} # end -# +# # def get_logger(name) # if !@loggers.has_key? name # # make the filename safe, then declare it to be so -# fname = name.gsub(/[.\/]/, "_").untaint +# fname = name.gsub(/[.\/\\\:]/, "_").untaint # @loggers[name] = Logger.new(name, @basedir + "/" + fname) # end # return @loggers[name] # end -# +# # end -# +# # FRONT_OBJECT=LoggerFactory.new("/tmp/dlog") # # $SAFE = 1 # disable eval() and friends -# +# # DRb.start_service(URI, FRONT_OBJECT) # DRb.thread.join # # ==== Client code # # require 'drb/drb' -# +# # SERVER_URI="druby://localhost:8787" # # DRb.start_service -# +# # log_service=DRbObject.new_with_uri(SERVER_URI) -# +# # ["loga", "logb", "logc"].each do |logname| -# +# # logger=log_service.get_logger(logname) -# +# # logger.log("Hello, world!") # logger.log("Goodbye, world!") # logger.log("=== EOT ===") -# +# # end # # == Security @@ -288,9 +285,9 @@ require 'drb/eq' # ro.instance_eval("`rm -rf *`") # # The dangers posed by instance_eval and friends are such that a -# DRbServer should generally be run with $SAFE set to at least -# level 1. This will disable eval() and related calls on strings -# passed across the wire. The sample usage code given above follows +# DRbServer should generally be run with $SAFE set to at least +# level 1. This will disable eval() and related calls on strings +# passed across the wire. The sample usage code given above follows # this practice. # # A DRbServer can be configured with an access control list to @@ -360,7 +357,7 @@ module DRb # # This, the default implementation, uses an object's local ObjectSpace # __id__ as its id. This means that an object's identification over - # drb remains valid only while that object instance remains alive + # drb remains valid only while that object instance remains alive # within the server runtime. # # For alternative mechanisms, see DRb::TimerIdConv in rdb/timeridconv.rb @@ -374,7 +371,7 @@ module DRb def to_obj(ref) ObjectSpace._id2ref(ref) end - + # Convert an object into a reference id. # # This implementation returns the object's __id__ in the local @@ -390,7 +387,7 @@ module DRb # called over drb, then the object remains in the server space # and a reference to the object is returned, rather than the # object being marshalled and moved into the client space. - module DRbUndumped + module DRbUndumped def _dump(dummy) # :nodoc: raise TypeError, 'can\'t dump' end @@ -424,7 +421,7 @@ module DRb def self._load(s) # :nodoc: Marshal::load(s) end - + def _dump(lv) # :nodoc: Marshal::dump(@unknown) end @@ -432,6 +429,8 @@ module DRb # An exception wrapping an error object class DRbRemoteError < DRbError + + # Creates a new remote error that wraps the Exception +error+ def initialize(error) @reason = error.class.to_s super("#{error.message} (#{error.class})") @@ -456,21 +455,21 @@ module DRb # +name+ attribute. The marshalled object is held in the +buf+ # attribute. class DRbUnknown - + # Create a new DRbUnknown object. # # +buf+ is a string containing a marshalled object that could not - # be unmarshalled. +err+ is the error message that was raised + # be unmarshalled. +err+ is the error message that was raised # when the unmarshalling failed. It is used to determine the # name of the unmarshalled object. def initialize(err, buf) case err.to_s when /uninitialized constant (\S+)/ - @name = $1 + @name = $1 when /undefined class\/module (\S+)/ - @name = $1 + @name = $1 else - @name = nil + @name = nil end @buf = buf end @@ -486,9 +485,9 @@ module DRb def self._load(s) # :nodoc: begin - Marshal::load(s) + Marshal::load(s) rescue NameError, ArgumentError - DRbUnknown.new($!, s) + DRbUnknown.new($!, s) end end @@ -499,7 +498,7 @@ module DRb # Attempt to load the wrapped marshalled object again. # # If the class of the object is now known locally, the object - # will be unmarshalled and returned. Otherwise, a new + # will be unmarshalled and returned. Otherwise, a new # but identical DRbUnknown object will be returned. def reload self.class._load(@buf) @@ -511,27 +510,36 @@ module DRb end end + # An Array wrapper that can be sent to another server via DRb. + # + # All entries in the array will be dumped or be references that point to + # the local server. + class DRbArray + + # Creates a new DRbArray that either dumps or wraps all the items in the + # Array +ary+ so they can be loaded by a remote DRb server. + def initialize(ary) - @ary = ary.collect { |obj| - if obj.kind_of? DRbUndumped - DRbObject.new(obj) - else - begin - Marshal.dump(obj) - obj - rescue - DRbObject.new(obj) - end - end + @ary = ary.collect { |obj| + if obj.kind_of? DRbUndumped + DRbObject.new(obj) + else + begin + Marshal.dump(obj) + obj + rescue + DRbObject.new(obj) + end + end } end - def self._load(s) + def self._load(s) # :nodoc: Marshal::load(s) end - def _dump(lv) + def _dump(lv) # :nodoc: Marshal.dump(@ary) end end @@ -554,16 +562,16 @@ module DRb def dump(obj, error=false) # :nodoc: obj = make_proxy(obj, error) if obj.kind_of? DRbUndumped begin - str = Marshal::dump(obj) + str = Marshal::dump(obj) rescue - str = Marshal::dump(make_proxy(obj, error)) + str = Marshal::dump(make_proxy(obj, error)) end [str.size].pack('N') + str end def load(soc) # :nodoc: begin - sz = soc.read(4) # sizeof (N) + sz = soc.read(4) # sizeof (N) rescue raise(DRbConnError, $!.message, $!.backtrace) end @@ -578,7 +586,7 @@ module DRb end raise(DRbConnError, 'connection closed') if str.nil? raise(DRbConnError, 'premature marshal format(can\'t read)') if str.size < sz - Thread.exclusive do + DRb.mutex.synchronize do begin save = Thread.current[:drb_untaint] Thread.current[:drb_untaint] = [] @@ -600,23 +608,23 @@ module DRb ary.push(dump(msg_id.id2name)) ary.push(dump(arg.length)) arg.each do |e| - ary.push(dump(e)) + ary.push(dump(e)) end ary.push(dump(b)) stream.write(ary.join('')) rescue raise(DRbConnError, $!.message, $!.backtrace) end - + def recv_request(stream) # :nodoc: ref = load(stream) ro = DRb.to_obj(ref) msg = load(stream) argc = load(stream) - raise ArgumentError, 'too many arguments' if @argc_limit < argc + raise(DRbConnError, "too many arguments") if @argc_limit < argc argv = Array.new(argc, nil) argc.times do |n| - argv[n] = load(stream) + argv[n] = load(stream) end block = load(stream) return ro, msg, argv, block @@ -635,7 +643,7 @@ module DRb end private - def make_proxy(obj, error=false) + def make_proxy(obj, error=false) # :nodoc: if error DRbRemoteError.new(obj) else @@ -656,10 +664,10 @@ module DRb # using configuration +config+. Return a # protocol instance for this listener. # [uri_option(uri, config)] Take a URI, possibly containing an option - # component (e.g. a trailing '?param=val'), + # component (e.g. a trailing '?param=val'), # and return a [uri, option] tuple. # - # All of these methods should raise a DRbBadScheme error if the URI + # All of these methods should raise a DRbBadScheme error if the URI # does not identify the protocol they support (e.g. "druby:" for # the standard Ruby protocol). This is how the DRbProtocol module, # given a URI, determines which protocol implementation serves that @@ -675,14 +683,14 @@ module DRb # # The protocol instance returned by #open must have the following methods: # - # [send_request (ref, msg_id, arg, b)] + # [send_request (ref, msg_id, arg, b)] # Send a request to +ref+ with the given message id and arguments. # This is most easily implemented by calling DRbMessage.send_request, # providing a stream that sits on top of the current protocol. # [recv_reply] # Receive a reply from the server and return it as a [success-boolean, # reply-value] pair. This is most easily implemented by calling - # DRb.recv_reply, providing a stream that sits on top of the + # DRb.recv_reply, providing a stream that sits on top of the # current protocol. # [alive?] # Is this connection still alive? @@ -725,44 +733,44 @@ module DRb # URI by raising a DRbBadScheme error. If no protocol recognises the # URI, then a DRbBadURI error is raised. If a protocol accepts the # URI, but an error occurs in opening it, a DRbConnError is raised. - def open(uri, config, first=true) + def open(uri, config, first=true) @protocol.each do |prot| - begin - return prot.open(uri, config) - rescue DRbBadScheme - rescue DRbConnError - raise($!) - rescue - raise(DRbConnError, "#{uri} - #{$!.inspect}") - end + begin + return prot.open(uri, config) + rescue DRbBadScheme + rescue DRbConnError + raise($!) + rescue + raise(DRbConnError, "#{uri} - #{$!.inspect}") + end end if first && (config[:auto_load] != false) - auto_load(uri, config) - return open(uri, config, false) + auto_load(uri) + return open(uri, config, false) end raise DRbBadURI, 'can\'t parse uri:' + uri end module_function :open - # Open a server listening for connections at +uri+ with + # Open a server listening for connections at +uri+ with # configuration +config+. # # The DRbProtocol module asks each registered protocol in turn to - # try to open a server at the URI. Each protocol signals that it does - # not handle that URI by raising a DRbBadScheme error. If no protocol - # recognises the URI, then a DRbBadURI error is raised. If a protocol - # accepts the URI, but an error occurs in opening it, the underlying + # try to open a server at the URI. Each protocol signals that it does + # not handle that URI by raising a DRbBadScheme error. If no protocol + # recognises the URI, then a DRbBadURI error is raised. If a protocol + # accepts the URI, but an error occurs in opening it, the underlying # error is passed on to the caller. def open_server(uri, config, first=true) @protocol.each do |prot| - begin - return prot.open_server(uri, config) - rescue DRbBadScheme - end + begin + return prot.open_server(uri, config) + rescue DRbBadScheme + end end if first && (config[:auto_load] != false) - auto_load(uri, config) - return open_server(uri, config, false) + auto_load(uri) + return open_server(uri, config, false) end raise DRbBadURI, 'can\'t parse uri:' + uri end @@ -773,60 +781,69 @@ module DRb # The DRbProtocol module asks each registered protocol in turn to # try to parse the URI. Each protocol signals that it does not handle that # URI by raising a DRbBadScheme error. If no protocol recognises the - # URI, then a DRbBadURI error is raised. + # URI, then a DRbBadURI error is raised. def uri_option(uri, config, first=true) @protocol.each do |prot| - begin - uri, opt = prot.uri_option(uri, config) - # opt = nil if opt == '' - return uri, opt - rescue DRbBadScheme - end + begin + uri, opt = prot.uri_option(uri, config) + # opt = nil if opt == '' + return uri, opt + rescue DRbBadScheme + end end if first && (config[:auto_load] != false) - auto_load(uri, config) + auto_load(uri) return uri_option(uri, config, false) end raise DRbBadURI, 'can\'t parse uri:' + uri end module_function :uri_option - def auto_load(uri, config) # :nodoc: + def auto_load(uri) # :nodoc: if uri =~ /^drb([a-z0-9]+):/ - require("drb/#{$1}") rescue nil + require("drb/#{$1}") rescue nil end end module_function :auto_load end - # The default drb protocol. + # The default drb protocol which communicates over a TCP socket. # - # Communicates over a TCP socket. + # The DRb TCP protocol URI looks like: + # <code>druby://<host>:<port>?<option></code>. The option is optional. + class DRbTCPSocket + # :stopdoc: private def self.parse_uri(uri) if uri =~ /^druby:\/\/(.*?):(\d+)(\?(.*))?$/ - host = $1 - port = $2.to_i - option = $4 - [host, port, option] + host = $1 + port = $2.to_i + option = $4 + [host, port, option] else - raise(DRbBadScheme, uri) unless uri =~ /^druby:/ - raise(DRbBadURI, 'can\'t parse uri:' + uri) + raise(DRbBadScheme, uri) unless uri =~ /^druby:/ + raise(DRbBadURI, 'can\'t parse uri:' + uri) end end public - # Open a client connection to +uri+ using configuration +config+. + # Open a client connection to +uri+ (DRb URI string) using configuration + # +config+. + # + # This can raise DRb::DRbBadScheme or DRb::DRbBadURI if +uri+ is not for a + # recognized protocol. See DRb::DRbServer.new for information on built-in + # URI protocols. def self.open(uri, config) - host, port, option = parse_uri(uri) + host, port, = parse_uri(uri) host.untaint port.untaint soc = TCPSocket.open(host, port) self.new(uri, soc, config) end + # Returns the hostname of this server def self.getservername host = Socket::gethostname begin @@ -836,35 +853,36 @@ module DRb end end + # For the families available for +host+, returns a TCPServer on +port+. + # If +port+ is 0 the first available port is used. IPv4 servers are + # preferred over IPv6 servers. def self.open_server_inaddr_any(host, port) - infos = Socket::getaddrinfo(host, nil, + infos = Socket::getaddrinfo(host, nil, Socket::AF_UNSPEC, - Socket::SOCK_STREAM, + Socket::SOCK_STREAM, 0, Socket::AI_PASSIVE) - family = infos.collect { |af, *_| af }.uniq - case family - when ['AF_INET'] - return TCPServer.open('0.0.0.0', port) - when ['AF_INET6'] - return TCPServer.open('::', port) - else - return TCPServer.open(port) - end + families = Hash[*infos.collect { |af, *_| af }.uniq.zip([]).flatten] + return TCPServer.open('0.0.0.0', port) if families.has_key?('AF_INET') + return TCPServer.open('::', port) if families.has_key?('AF_INET6') + return TCPServer.open(port) + # :stopdoc: end - # Open a server listening for connections at +uri+ using + # Open a server listening for connections at +uri+ using # configuration +config+. def self.open_server(uri, config) uri = 'druby://:0' unless uri - host, port, opt = parse_uri(uri) + host, port, _ = parse_uri(uri) + config = {:tcp_original_host => host}.update(config) if host.size == 0 host = getservername soc = open_server_inaddr_any(host, port) else - soc = TCPServer.open(host, port) + soc = TCPServer.open(host, port) end port = soc.addr[1] if port == 0 + config[:tcp_port] = port uri = "druby://#{host}:#{port}" self.new(uri, soc, config) end @@ -887,6 +905,7 @@ module DRb @acl = config[:tcp_acl] @msg = DRbMessage.new(config) set_sockopt(@socket) + @shutdown_pipe_r, @shutdown_pipe_w = IO.pipe end # Get the URI that we are connected to. @@ -897,7 +916,7 @@ module DRb def peeraddr @socket.peeraddr end - + # Get the socket. def stream; @socket; end @@ -905,7 +924,7 @@ module DRb def send_request(ref, msg_id, arg, b) @msg.send_request(stream, ref, msg_id, arg, b) end - + # On the server side, receive a request from the client. def recv_request @msg.recv_request(stream) @@ -931,36 +950,68 @@ module DRb # client-server session. def close if @socket - @socket.close - @socket = nil + @socket.close + @socket = nil + end + close_shutdown_pipe + end + + def close_shutdown_pipe + if @shutdown_pipe_r && !@shutdown_pipe_r.closed? + @shutdown_pipe_r.close + @shutdown_pipe_r = nil + end + if @shutdown_pipe_w && !@shutdown_pipe_w.closed? + @shutdown_pipe_w.close + @shutdown_pipe_w = nil end end - - # On the server side, for an instance returned by #open_server, + private :close_shutdown_pipe + + # On the server side, for an instance returned by #open_server, # accept a client connection and return a new instance to handle # the server's side of this client-server session. def accept while true - s = @socket.accept - break if (@acl ? @acl.allow_socket?(s) : true) - s.close + s = accept_or_shutdown + return nil unless s + break if (@acl ? @acl.allow_socket?(s) : true) + s.close + end + if @config[:tcp_original_host].to_s.size == 0 + uri = "druby://#{s.addr[3]}:#{@config[:tcp_port]}" + else + uri = @uri end - self.class.new(nil, s, @config) + self.class.new(uri, s, @config) + end + + def accept_or_shutdown + readables, = IO.select([@socket, @shutdown_pipe_r]) + if readables.include? @shutdown_pipe_r + return nil + end + @socket.accept + end + private :accept_or_shutdown + + # Graceful shutdown + def shutdown + @shutdown_pipe_w.close if @shutdown_pipe_w && !@shutdown_pipe_w.closed? end # Check to see if this connection is alive. def alive? return false unless @socket - if IO.select([@socket], nil, nil, 0) - close - return false + if @socket.to_io.wait_readable(0) + close + return false end true end def set_sockopt(soc) # :nodoc: soc.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) - soc.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) if defined? Fcntl::FD_CLOEXEC end end @@ -972,18 +1023,18 @@ module DRb def initialize(option) @option = option.to_s end - attr :option + attr_reader :option def to_s; @option; end - + def ==(other) return false unless DRbURIOption === other @option == other.option end - + def hash @option.hash end - + alias eql? == end @@ -1000,9 +1051,9 @@ module DRb # created to act as a stub for the remote referenced object. def self._load(s) uri, ref = Marshal.load(s) - + if DRb.here?(uri) - obj = DRb.to_obj(ref) + obj = DRb.to_obj(ref) if ((! obj.tainted?) && Thread.current[:drb_untaint]) Thread.current[:drb_untaint].push(obj) end @@ -1012,10 +1063,13 @@ module DRb self.new_with(uri, ref) end + # Creates a DRb::DRbObject given the reference information to the remote + # host +uri+ and object +ref+. + def self.new_with(uri, ref) it = self.allocate - it.instance_variable_set('@uri', uri) - it.instance_variable_set('@ref', ref) + it.instance_variable_set(:@uri, uri) + it.instance_variable_set(:@ref, ref) it end @@ -1040,17 +1094,17 @@ module DRb @uri = nil @ref = nil if obj.nil? - return if uri.nil? - @uri, option = DRbProtocol.uri_option(uri, DRb.config) - @ref = DRbURIOption.new(option) unless option.nil? + return if uri.nil? + @uri, option = DRbProtocol.uri_option(uri, DRb.config) + @ref = DRbURIOption.new(option) unless option.nil? else - @uri = uri ? uri : (DRb.uri rescue nil) - @ref = obj ? DRb.to_id(obj) : nil + @uri = uri ? uri : (DRb.uri rescue nil) + @ref = obj ? DRb.to_id(obj) : nil end end # Get the URI of the remote object. - def __drburi + def __drburi @uri end @@ -1062,6 +1116,7 @@ module DRb undef :to_s undef :to_a if respond_to?(:to_a) + # Routes respond_to? to the referenced remote object. def respond_to?(msg_id, priv=false) case msg_id when :_dump @@ -1073,12 +1128,12 @@ module DRb end end - # Routes method calls to the referenced object. + # Routes method calls to the referenced remote object. def method_missing(msg_id, *a, &b) if DRb.here?(@uri) - obj = DRb.to_obj(@ref) - DRb.current_server.check_insecure_method(obj, msg_id) - return obj.__send__(msg_id, *a, &b) + obj = DRb.to_obj(@ref) + DRb.current_server.check_insecure_method(obj, msg_id) + return obj.__send__(msg_id, *a, &b) end succ, result = self.class.with_friend(@uri) do @@ -1093,15 +1148,16 @@ module DRb raise result else bt = self.class.prepare_backtrace(@uri, result) - result.set_backtrace(bt + caller) + result.set_backtrace(bt + caller) raise result end end - def self.with_friend(uri) + # Given the +uri+ of another host executes the block provided. + def self.with_friend(uri) # :nodoc: friend = DRb.fetch_server(uri) return yield() unless friend - + save = Thread.current['DRb'] Thread.current['DRb'] = { 'server' => friend } return yield @@ -1109,11 +1165,13 @@ module DRb Thread.current['DRb'] = save if friend end - def self.prepare_backtrace(uri, result) + # Returns a modified backtrace from +result+ with the +uri+ where each call + # in the backtrace came from. + def self.prepare_backtrace(uri, result) # :nodoc: prefix = "(#{uri}) " bt = [] result.backtrace.each do |x| - break if /`__send__'$/ =~ x + break if /`__send__'$/ =~ x if /^\(druby:\/\// =~ x bt.push(x) else @@ -1146,41 +1204,41 @@ module DRb # not normally need to deal with it directly. class DRbConn POOL_SIZE = 16 # :nodoc: - @mutex = Mutex.new + @mutex = Thread::Mutex.new @pool = [] def self.open(remote_uri) # :nodoc: begin - conn = nil - - @mutex.synchronize do - #FIXME - new_pool = [] - @pool.each do |c| - if conn.nil? and c.uri == remote_uri - conn = c if c.alive? - else - new_pool.push c - end - end - @pool = new_pool - end - - conn = self.new(remote_uri) unless conn - succ, result = yield(conn) - return succ, result + conn = nil + + @mutex.synchronize do + #FIXME + new_pool = [] + @pool.each do |c| + if conn.nil? and c.uri == remote_uri + conn = c if c.alive? + else + new_pool.push c + end + end + @pool = new_pool + end + + conn = self.new(remote_uri) unless conn + succ, result = yield(conn) + return succ, result ensure - if conn - if succ - @mutex.synchronize do - @pool.unshift(conn) - @pool.pop.close while @pool.size > POOL_SIZE - end - else - conn.close - end - end + if conn + if succ + @mutex.synchronize do + @pool.unshift(conn) + @pool.pop.close while @pool.size > POOL_SIZE + end + else + conn.close + end + end end end @@ -1201,6 +1259,7 @@ module DRb end def alive? # :nodoc: + return false unless @protocol @protocol.alive? end end @@ -1239,9 +1298,9 @@ module DRb @@load_limit = sz end - # Set the default value for the :acl option. + # Set the default access control list to +acl+. The default ACL is +nil+. # - # See #new(). The initial default value is nil. + # See also DRb::ACL and #new() def self.default_acl(acl) @@acl = acl end @@ -1253,6 +1312,9 @@ module DRb @@idconv = idconv end + # Set the default safe level to +level+. The default safe level is 0 + # + # See #new for more information. def self.default_safe_level(level) @@safe_level = level end @@ -1263,19 +1325,19 @@ module DRb def self.verbose=(on) @@verbose = on end - + # Get the default value of the :verbose option. def self.verbose @@verbose end def self.make_config(hash={}) # :nodoc: - default_config = { - :idconv => @@idconv, - :verbose => @@verbose, - :tcp_acl => @@acl, - :load_limit => @@load_limit, - :argc_limit => @@argc_limit, + default_config = { + :idconv => @@idconv, + :verbose => @@verbose, + :tcp_acl => @@acl, + :load_limit => @@load_limit, + :argc_limit => @@argc_limit, :safe_level => @@safe_level } default_config.update(hash) @@ -1310,6 +1372,9 @@ module DRb # :argc_limit :: the maximum number of arguments to a remote # method accepted by the server. Defaults to # 256. + # :safe_level :: The safe level of the DRbServer. The attribute + # sets $SAFE for methods performed in the main_loop. + # Defaults to 0. # # The default values of these options can be modified on # a class-wide basis by the class methods #default_argc_limit, @@ -1326,18 +1391,19 @@ module DRb # The server will immediately start running in its own thread. def initialize(uri=nil, front=nil, config_or_acl=nil) if Hash === config_or_acl - config = config_or_acl.dup + config = config_or_acl.dup else - acl = config_or_acl || @@acl - config = { - :tcp_acl => acl - } + acl = config_or_acl || @@acl + config = { + :tcp_acl => acl + } end @config = self.class.make_config(config) @protocol = DRbProtocol.open_server(uri, @config) @uri = @protocol.uri + @exported_uri = [@uri] @front = front @idconv = @config[:idconv] @@ -1360,7 +1426,7 @@ module DRb attr_reader :thread # The front object of the DRbServer. - # + # # This object receives remote method calls made on the server's # URI alone, with an object id. attr_reader :front @@ -1368,6 +1434,10 @@ module DRb # The configuration of this DRbServer attr_reader :config + # The safe level for this server. This is a number corresponding to + # $SAFE. + # + # The default safe_level is 0 attr_reader :safe_level # Set whether to operate in verbose mode. @@ -1385,13 +1455,18 @@ module DRb @thread.alive? end + # Is +uri+ the URI for this server? + def here?(uri) + @exported_uri.include?(uri) + end + # Stop this server. def stop_service DRb.remove_server(self) if Thread.current['DRb'] && Thread.current['DRb']['server'] == self Thread.current['DRb']['stop_service'] = true else - @thread.kill + shutdown end end @@ -1409,30 +1484,30 @@ module DRb end private - def kill_sub_thread - Thread.new do - grp = ThreadGroup.new - grp.add(Thread.current) - list = @grp.list - while list.size > 0 - list.each do |th| - th.kill if th.alive? - end - list = @grp.list - end + + def shutdown + current = Thread.current + if @protocol.respond_to? :shutdown + @protocol.shutdown + else + [@thread, *@grp.list].each { |thread| + thread.kill unless thread == current # xxx: Thread#kill + } end + @thread.join unless @thread == current end + ## + # Starts the DRb main loop in a new thread. + def run Thread.start do - begin - while true - main_loop - end - ensure - @protocol.close if @protocol - kill_sub_thread - end + begin + while main_loop + end + ensure + @protocol.close if @protocol + end end end @@ -1453,7 +1528,7 @@ module DRb def any_to_s(obj) obj.to_s + ":#{obj.class}" rescue - sprintf("#<%s:0x%lx>", obj.class, obj.__id__) + sprintf("#<%s:0x%lx>", obj.class, obj.__id__) end # Check that a method is callable via dRuby. @@ -1461,37 +1536,37 @@ module DRb # +obj+ is the object we want to invoke the method on. +msg_id+ is the # method name, as a Symbol. # - # If the method is an insecure method (see #insecure_method?) a + # If the method is an insecure method (see #insecure_method?) a # SecurityError is thrown. If the method is private or undefined, # a NameError is thrown. def check_insecure_method(obj, msg_id) return true if Proc === obj && msg_id == :__drb_yield raise(ArgumentError, "#{any_to_s(msg_id)} is not a symbol") unless Symbol == msg_id.class raise(SecurityError, "insecure method `#{msg_id}'") if insecure_method?(msg_id) - - if obj.private_methods.include?(msg_id.to_s) - desc = any_to_s(obj) + + if obj.private_methods.include?(msg_id) + desc = any_to_s(obj) raise NoMethodError, "private method `#{msg_id}' called for #{desc}" - elsif obj.protected_methods.include?(msg_id.to_s) - desc = any_to_s(obj) + elsif obj.protected_methods.include?(msg_id) + desc = any_to_s(obj) raise NoMethodError, "protected method `#{msg_id}' called for #{desc}" else true end end public :check_insecure_method - + class InvokeMethod # :nodoc: def initialize(drb_server, client) - @drb_server = drb_server + @drb_server = drb_server @safe_level = drb_server.safe_level - @client = client + @client = client end def perform - @result = nil - @succ = false - setup_message + @result = nil + @succ = false + setup_message if $SAFE < @safe_level info = Thread.current['DRb'] @@ -1502,7 +1577,7 @@ module DRb perform_with_block }.value else - @result = Thread.new { + @result = Thread.new { Thread.current['DRb'] = info $SAFE = @safe_level perform_without_block @@ -1515,58 +1590,62 @@ module DRb @result = perform_without_block end end - @succ = true - if @msg_id == :to_ary && @result.class == Array - @result = DRbArray.new(@result) - end - return @succ, @result + @succ = true + if @msg_id == :to_ary && @result.class == Array + @result = DRbArray.new(@result) + end + return @succ, @result rescue StandardError, ScriptError, Interrupt - @result = $! - return @succ, @result + @result = $! + return @succ, @result end private def init_with_client - obj, msg, argv, block = @client.recv_request + obj, msg, argv, block = @client.recv_request @obj = obj @msg_id = msg.intern @argv = argv @block = block end - + def check_insecure_method @drb_server.check_insecure_method(@obj, @msg_id) end def setup_message - init_with_client - check_insecure_method + init_with_client + check_insecure_method end - + def perform_without_block - if Proc === @obj && @msg_id == :__drb_yield + if Proc === @obj && @msg_id == :__drb_yield if @argv.size == 1 - ary = @argv - else - ary = [@argv] - end - ary.collect(&@obj)[0] - else - @obj.__send__(@msg_id, *@argv) - end + ary = @argv + else + ary = [@argv] + end + ary.collect(&@obj)[0] + else + @obj.__send__(@msg_id, *@argv) + end end end - if RUBY_VERSION >= '1.8' - require 'drb/invokemethod' - class InvokeMethod - include InvokeMethod18Mixin - end - else - require 'drb/invokemethod16' - class InvokeMethod - include InvokeMethod16Mixin + require 'drb/invokemethod' + class InvokeMethod + include InvokeMethod18Mixin + end + + def error_print(exception) + exception.backtrace.inject(true) do |first, x| + if first + $stderr.puts "#{x}: #{exception} (#{exception.class})" + else + $stderr.puts "\tfrom #{x}" + end + false end end @@ -1578,30 +1657,34 @@ module DRb # returning responses, until the client closes the connection # or a local method call fails. def main_loop - Thread.start(@protocol.accept) do |client| - @grp.add Thread.current - Thread.current['DRb'] = { 'client' => client , - 'server' => self } - loop do - begin - succ = false - invoke_method = InvokeMethod.new(self, client) - succ, result = invoke_method.perform - if !succ && verbose - p result - result.backtrace.each do |x| - puts x - end - end - client.send_reply(succ, result) rescue nil - ensure + client0 = @protocol.accept + return nil if !client0 + Thread.start(client0) do |client| + @grp.add Thread.current + Thread.current['DRb'] = { 'client' => client , + 'server' => self } + DRb.mutex.synchronize do + client_uri = client.uri + @exported_uri << client_uri unless @exported_uri.include?(client_uri) + end + loop do + begin + succ = false + invoke_method = InvokeMethod.new(self, client) + succ, result = invoke_method.perform + error_print(result) if !succ && verbose + client.send_reply(succ, result) + rescue Exception => e + error_print(e) if verbose + ensure client.close unless succ if Thread.current['DRb']['stop_service'] - Thread.new { stop_service } + shutdown + break end break unless succ - end - end + end + end end end end @@ -1630,7 +1713,7 @@ module DRb # The primary local dRuby server. # - # This is the server created by the #start_service call. + # This is the server created by the #start_service call. attr_accessor :primary_server module_function :primary_server=, :primary_server @@ -1645,8 +1728,8 @@ module DRb # If the above rule fails to find a server, a DRbServerNotFound # error is raised. def current_server - drb = Thread.current['DRb'] - server = (drb && drb['server']) ? drb['server'] : @primary_server + drb = Thread.current['DRb'] + server = (drb && drb['server']) ? drb['server'] : @primary_server raise DRbServerNotFound unless server return server end @@ -1666,13 +1749,20 @@ module DRb # # This is the URI of the current server. See #current_server. def uri + drb = Thread.current['DRb'] + client = (drb && drb['client']) + if client + uri = client.uri + return uri if uri + end current_server.uri end module_function :uri # Is +uri+ the URI for the current local server? def here?(uri) - (current_server.uri rescue nil) == uri + current_server.here?(uri) rescue false + # (current_server.uri rescue nil) == uri end module_function :here? @@ -1686,7 +1776,7 @@ module DRb DRbServer.make_config end module_function :config - + # Get the front object of the current server. # # This raises a DRbServerNotFound error if there is no current server. @@ -1722,7 +1812,10 @@ module DRb end module_function :thread - # Set the default id conv object. + # Set the default id conversion object. + # + # This is expected to be an instance such as DRb::DRbIdConv that responds to + # #to_id and #to_obj that can convert objects to and from DRb references. # # See DRbServer#default_id_conv. def install_id_conv(idconv) @@ -1730,7 +1823,7 @@ module DRb end module_function :install_id_conv - # Set the default acl. + # Set the default ACL to +acl+. # # See DRb::DRbServer.default_acl. def install_acl(acl) @@ -1738,26 +1831,49 @@ module DRb end module_function :install_acl + @mutex = Thread::Mutex.new + def mutex # :nodoc: + @mutex + end + module_function :mutex + @server = {} + # Registers +server+ with DRb. + # + # This is called when a new DRb::DRbServer is created. + # + # If there is no primary server then +server+ becomes the primary server. + # + # Example: + # + # require 'drb' + # + # s = DRb::DRbServer.new # automatically calls regist_server + # DRb.fetch_server s.uri #=> #<DRb::DRbServer:0x...> def regist_server(server) @server[server.uri] = server - Thread.exclusive do + mutex.synchronize do @primary_server = server unless @primary_server end end module_function :regist_server + # Removes +server+ from the list of registered servers. def remove_server(server) @server.delete(server.uri) end module_function :remove_server - + + # Retrieves the server with the given +uri+. + # + # See also regist_server and remove_server. def fetch_server(uri) @server[uri] end module_function :fetch_server end +# :stopdoc: DRbObject = DRb::DRbObject DRbUndumped = DRb::DRbUndumped DRbIdConv = DRb::DRbIdConv diff --git a/lib/drb/eq.rb b/lib/drb/eq.rb index e24512d6a7..15ca5cae42 100644 --- a/lib/drb/eq.rb +++ b/lib/drb/eq.rb @@ -1,7 +1,6 @@ -require 'drb/drb' - +# frozen_string_literal: false module DRb - class DRbObject + class DRbObject # :nodoc: def ==(other) return false unless DRbObject === other (@ref == other.__drbref) && (@uri == other.__drburi) diff --git a/lib/drb/extserv.rb b/lib/drb/extserv.rb index 7da8130c2b..1cb1be4709 100644 --- a/lib/drb/extserv.rb +++ b/lib/drb/extserv.rb @@ -1,19 +1,25 @@ +# frozen_string_literal: false =begin external service - Copyright (c) 2000,2002 Masatoshi SEKI + Copyright (c) 2000,2002 Masatoshi SEKI =end require 'drb/drb' +require 'monitor' module DRb class ExtServ + include MonitorMixin include DRbUndumped def initialize(there, name, server=nil) + super() @server = server || DRb::primary_server @name = name ro = DRbObject.new(nil, there) - @invoker = ro.regist(name, DRbObject.new(self, @server.uri)) + synchronize do + @invoker = ro.regist(name, DRbObject.new(self, @server.uri)) + end end attr_reader :server @@ -22,11 +28,13 @@ module DRb end def stop_service - @invoker.unregist(@name) - server = @server - @server = nil - server.stop_service - true + synchronize do + @invoker.unregist(@name) + server = @server + @server = nil + server.stop_service + true + end end def alive? @@ -34,31 +42,3 @@ module DRb end end end - -if __FILE__ == $0 - class Foo - include DRbUndumped - - def initialize(str) - @str = str - end - - def hello(it) - "#{it}: #{self}" - end - - def to_s - @str - end - end - - cmd = ARGV.shift - case cmd - when 'itest1', 'itest2' - front = Foo.new(cmd) - manager = DRb::DRbServer.new(nil, front) - es = DRb::ExtServ.new(ARGV.shift, ARGV.shift, manager) - es.server.thread.join - end -end - diff --git a/lib/drb/extservm.rb b/lib/drb/extservm.rb index 7066f84c65..2d3c369bbf 100644 --- a/lib/drb/extservm.rb +++ b/lib/drb/extservm.rb @@ -1,14 +1,16 @@ +# frozen_string_literal: false =begin external service manager - Copyright (c) 2000 Masatoshi SEKI + Copyright (c) 2000 Masatoshi SEKI =end require 'drb/drb' -require 'thread' +require 'monitor' module DRb class ExtServManager include DRbUndumped + include MonitorMixin @@command = {} @@ -19,78 +21,73 @@ module DRb def self.command=(cmd) @@command = cmd end - + def initialize + super() + @cond = new_cond @servers = {} @waiting = [] - @queue = Queue.new + @queue = Thread::Queue.new @thread = invoke_thread @uri = nil end attr_accessor :uri def service(name) - while true - server = nil - Thread.exclusive do - server = @servers[name] if @servers[name] - end - return server if server && server.alive? - invoke_service(name) + synchronize do + while true + server = @servers[name] + return server if server && server.alive? # server may be `false' + invoke_service(name) + @cond.wait + end end end def regist(name, ro) - ary = nil - Thread.exclusive do - @servers[name] = ro - ary = @waiting - @waiting = [] - end - ary.each do |th| - begin - th.run - rescue ThreadError - end + synchronize do + @servers[name] = ro + @cond.signal end self end - + def unregist(name) - Thread.exclusive do - @servers.delete(name) + synchronize do + @servers.delete(name) end end private def invoke_thread Thread.new do - while true - name = @queue.pop - invoke_service_command(name, @@command[name]) - end + while true + name = @queue.pop + invoke_service_command(name, @@command[name]) + end end end def invoke_service(name) - Thread.critical = true - @waiting.push Thread.current - @queue.push name - Thread.stop + @queue.push(name) end def invoke_service_command(name, command) raise "invalid command. name: #{name}" unless command - Thread.exclusive do - return if @servers.include?(name) - @servers[name] = false + synchronize do + return if @servers.include?(name) + @servers[name] = false end uri = @uri || DRb.uri - if RUBY_PLATFORM =~ /mswin32/ && /NT/ =~ ENV["OS"] - system(%Q'cmd /c start "ruby" /b #{command} #{uri} #{name}') + if command.respond_to? :to_ary + command = command.to_ary + [uri, name] + pid = spawn(*command) else - system("#{command} #{uri} #{name} &") + pid = spawn("#{command} #{uri} #{name}") end + th = Process.detach(pid) + th[:drb_service] = name + th end end end diff --git a/lib/drb/gw.rb b/lib/drb/gw.rb index b7a5f5383f..d000507644 100644 --- a/lib/drb/gw.rb +++ b/lib/drb/gw.rb @@ -1,9 +1,38 @@ +# frozen_string_literal: false require 'drb/drb' require 'monitor' module DRb + + # Gateway id conversion forms a gateway between different DRb protocols or + # networks. + # + # The gateway needs to install this id conversion and create servers for + # each of the protocols or networks it will be a gateway between. It then + # needs to create a server that attaches to each of these networks. For + # example: + # + # require 'drb/drb' + # require 'drb/unix' + # require 'drb/gw' + # + # DRb.install_id_conv DRb::GWIdConv.new + # gw = DRb::GW.new + # s1 = DRb::DRbServer.new 'drbunix:/path/to/gateway', gw + # s2 = DRb::DRbServer.new 'druby://example:10000', gw + # + # s1.thread.join + # s2.thread.join + # + # Each client must register services with the gateway, for example: + # + # DRb.start_service 'drbunix:', nil # an anonymous server + # gw = DRbObject.new nil, 'drbunix:/path/to/gateway' + # gw[:unix] = some_service + # DRb.thread.join + class GWIdConv < DRbIdConv - def to_obj(ref) + def to_obj(ref) # :nodoc: if Array === ref && ref[0] == :DRbObject return DRbObject.new_with(ref[1], ref[2]) end @@ -11,19 +40,29 @@ module DRb end end + # The GW provides a synchronized store for participants in the gateway to + # communicate. + class GW include MonitorMixin + + # Creates a new GW + def initialize super() @hash = {} end + # Retrieves +key+ from the GW + def [](key) synchronize do @hash[key] end end + # Stores value +v+ at +key+ in the GW + def []=(key, v) synchronize do @hash[key] = v @@ -31,7 +70,7 @@ module DRb end end - class DRbObject + class DRbObject # :nodoc: def self._load(s) uri, ref = Marshal.load(s) if DRb.uri == uri diff --git a/lib/drb/invokemethod.rb b/lib/drb/invokemethod.rb index 412b2ab9b5..0fae6d52b6 100644 --- a/lib/drb/invokemethod.rb +++ b/lib/drb/invokemethod.rb @@ -1,15 +1,16 @@ +# frozen_string_literal: false # for ruby-1.8.0 -module DRb +module DRb # :nodoc: all class DRbServer module InvokeMethod18Mixin def block_yield(x) - if x.size == 1 && x[0].class == Array - x[0] = DRbArray.new(x[0]) - end - block_value = @block.call(*x) + if x.size == 1 && x[0].class == Array + x[0] = DRbArray.new(x[0]) + end + @block.call(*x) end - + def perform_with_block @obj.__send__(@msg_id, *@argv) do |*x| jump_error = nil @@ -20,8 +21,6 @@ module DRb end if jump_error case jump_error.reason - when :retry - retry when :break break(jump_error.exit_value) else diff --git a/lib/drb/observer.rb b/lib/drb/observer.rb index e7f1668c52..3ee15331a1 100644 --- a/lib/drb/observer.rb +++ b/lib/drb/observer.rb @@ -1,21 +1,25 @@ +# frozen_string_literal: false require 'observer' module DRb + # The Observable module extended to DRb. See Observable for details. module DRbObservable include Observable + # Notifies observers of a change in state. See also + # Observable#notify_observers def notify_observers(*arg) if defined? @observer_state and @observer_state - if defined? @observer_peers - for i in @observer_peers.dup - begin - i.update(*arg) - rescue - delete_observer(i) - end - end - end - @observer_state = false + if defined? @observer_peers + @observer_peers.each do |observer, method| + begin + observer.send(method, *arg) + rescue + delete_observer(observer) + end + end + end + @observer_state = false end end end diff --git a/lib/drb/ssl.rb b/lib/drb/ssl.rb index 58d6b7d1e0..45fe4f1f74 100644 --- a/lib/drb/ssl.rb +++ b/lib/drb/ssl.rb @@ -1,3 +1,4 @@ +# frozen_string_literal: false require 'socket' require 'openssl' require 'drb/drb' @@ -5,124 +6,248 @@ require 'singleton' module DRb + # The protocol for DRb over an SSL socket + # + # The URI for a DRb socket over SSL is: + # <code>drbssl://<host>:<port>?<option></code>. The option is optional class DRbSSLSocket < DRbTCPSocket + # SSLConfig handles the needed SSL information for establishing a + # DRbSSLSocket connection, including generating the X509 / RSA pair. + # + # An instance of this config can be passed to DRbSSLSocket.new, + # DRbSSLSocket.open and DRbSSLSocket.open_server + # + # See DRb::DRbSSLSocket::SSLConfig.new for more details class SSLConfig + # Default values for a SSLConfig instance. + # + # See DRb::DRbSSLSocket::SSLConfig.new for more details DEFAULT = { - :SSLCertificate => nil, - :SSLPrivateKey => nil, - :SSLClientCA => nil, - :SSLCACertificatePath => nil, - :SSLCACertificateFile => nil, - :SSLVerifyMode => ::OpenSSL::SSL::VERIFY_NONE, - :SSLVerifyDepth => nil, - :SSLVerifyCallback => nil, # custom verification + :SSLCertificate => nil, + :SSLPrivateKey => nil, + :SSLClientCA => nil, + :SSLCACertificatePath => nil, + :SSLCACertificateFile => nil, + :SSLTmpDhCallback => nil, + :SSLVerifyMode => ::OpenSSL::SSL::VERIFY_NONE, + :SSLVerifyDepth => nil, + :SSLVerifyCallback => nil, # custom verification :SSLCertificateStore => nil, - # Must specify if you use auto generated certificate. - :SSLCertName => nil, # e.g. [["CN","fqdn.example.com"]] - :SSLCertComment => "Generated by Ruby/OpenSSL" + # Must specify if you use auto generated certificate. + :SSLCertName => nil, # e.g. [["CN","fqdn.example.com"]] + :SSLCertComment => "Generated by Ruby/OpenSSL" } + # Create a new DRb::DRbSSLSocket::SSLConfig instance + # + # The DRb::DRbSSLSocket will take either a +config+ Hash or an instance + # of SSLConfig, and will setup the certificate for its session for the + # configuration. If want it to generate a generic certificate, the bare + # minimum is to provide the :SSLCertName + # + # === Config options + # + # From +config+ Hash: + # + # :SSLCertificate :: + # An instance of OpenSSL::X509::Certificate. If this is not provided, + # then a generic X509 is generated, with a correspond :SSLPrivateKey + # + # :SSLPrivateKey :: + # A private key instance, like OpenSSL::PKey::RSA. This key must be + # the key that signed the :SSLCertificate + # + # :SSLClientCA :: + # An OpenSSL::X509::Certificate, or Array of certificates that will + # used as ClientCAs in the SSL Context + # + # :SSLCACertificatePath :: + # A path to the directory of CA certificates. The certificates must + # be in PEM format. + # + # :SSLCACertificateFile :: + # A path to a CA certificate file, in PEM format. + # + # :SSLTmpDhCallback :: + # A DH callback. See OpenSSL::SSL::SSLContext.tmp_dh_callback + # + # :SSLVerifyMode :: + # This is the SSL verification mode. See OpenSSL::SSL::VERIFY_* for + # available modes. The default is OpenSSL::SSL::VERIFY_NONE + # + # :SSLVerifyDepth :: + # Number of CA certificates to walk, when verifying a certificate + # chain. + # + # :SSLVerifyCallback :: + # A callback to be used for additional verification. See + # OpenSSL::SSL::SSLContext.verify_callback + # + # :SSLCertificateStore :: + # A OpenSSL::X509::Store used for verification of certificates + # + # :SSLCertName :: + # Issuer name for the certificate. This is required when generating + # the certificate (if :SSLCertificate and :SSLPrivateKey were not + # given). The value of this is to be an Array of pairs: + # + # [["C", "Raleigh"], ["ST","North Carolina"], + # ["CN","fqdn.example.com"]] + # + # See also OpenSSL::X509::Name + # + # :SSLCertComment :: + # A comment to be used for generating the certificate. The default is + # "Generated by Ruby/OpenSSL" + # + # + # === Example + # + # These values can be added after the fact, like a Hash. + # + # require 'drb/ssl' + # c = DRb::DRbSSLSocket::SSLConfig.new {} + # c[:SSLCertificate] = + # OpenSSL::X509::Certificate.new(File.read('mycert.crt')) + # c[:SSLPrivateKey] = OpenSSL::PKey::RSA.new(File.read('mycert.key')) + # c[:SSLVerifyMode] = OpenSSL::SSL::VERIFY_PEER + # c[:SSLCACertificatePath] = "/etc/ssl/certs/" + # c.setup_certificate + # + # or + # + # require 'drb/ssl' + # c = DRb::DRbSSLSocket::SSLConfig.new({ + # :SSLCertName => [["CN" => DRb::DRbSSLSocket.getservername]] + # }) + # c.setup_certificate + # def initialize(config) - @config = config + @config = config @cert = config[:SSLCertificate] @pkey = config[:SSLPrivateKey] @ssl_ctx = nil end - def [](key); - @config[key] || DEFAULT[key] + # A convenience method to access the values like a Hash + def [](key); + @config[key] || DEFAULT[key] end + # Connect to IO +tcp+, with context of the current certificate + # configuration def connect(tcp) - ssl = ::OpenSSL::SSL::SSLSocket.new(tcp, @ssl_ctx) - ssl.sync = true - ssl.connect - ssl + ssl = ::OpenSSL::SSL::SSLSocket.new(tcp, @ssl_ctx) + ssl.sync = true + ssl.connect + ssl end - + + # Accept connection to IO +tcp+, with context of the current certificate + # configuration def accept(tcp) - ssl = OpenSSL::SSL::SSLSocket.new(tcp, @ssl_ctx) - ssl.sync = true - ssl.accept - ssl + ssl = OpenSSL::SSL::SSLSocket.new(tcp, @ssl_ctx) + ssl.sync = true + ssl.accept + ssl end - + + # Ensures that :SSLCertificate and :SSLPrivateKey have been provided + # or that a new certificate is generated with the other parameters + # provided. def setup_certificate if @cert && @pkey return end - rsa = OpenSSL::PKey::RSA.new(512){|p, n| - next unless self[:verbose] - case p - when 0; $stderr.putc "." # BN_generate_prime - when 1; $stderr.putc "+" # BN_generate_prime - when 2; $stderr.putc "*" # searching good prime, - # n = #of try, - # but also data from BN_generate_prime - when 3; $stderr.putc "\n" # found good prime, n==0 - p, n==1 - q, - # but also data from BN_generate_prime - else; $stderr.putc "*" # BN_generate_prime - end - } - - cert = OpenSSL::X509::Certificate.new - cert.version = 3 - cert.serial = 0 - name = OpenSSL::X509::Name.new(self[:SSLCertName]) - cert.subject = name - cert.issuer = name - cert.not_before = Time.now - cert.not_after = Time.now + (365*24*60*60) - cert.public_key = rsa.public_key - - ef = OpenSSL::X509::ExtensionFactory.new(nil,cert) - cert.extensions = [ - ef.create_extension("basicConstraints","CA:FALSE"), - ef.create_extension("subjectKeyIdentifier", "hash") ] - ef.issuer_certificate = cert - cert.add_extension(ef.create_extension("authorityKeyIdentifier", - "keyid:always,issuer:always")) - if comment = self[:SSLCertComment] - cert.add_extension(ef.create_extension("nsComment", comment)) - end - cert.sign(rsa, OpenSSL::Digest::SHA1.new) - - @cert = cert + rsa = OpenSSL::PKey::RSA.new(1024){|p, n| + next unless self[:verbose] + case p + when 0; $stderr.putc "." # BN_generate_prime + when 1; $stderr.putc "+" # BN_generate_prime + when 2; $stderr.putc "*" # searching good prime, + # n = #of try, + # but also data from BN_generate_prime + when 3; $stderr.putc "\n" # found good prime, n==0 - p, n==1 - q, + # but also data from BN_generate_prime + else; $stderr.putc "*" # BN_generate_prime + end + } + + cert = OpenSSL::X509::Certificate.new + cert.version = 3 + cert.serial = 0 + name = OpenSSL::X509::Name.new(self[:SSLCertName]) + cert.subject = name + cert.issuer = name + cert.not_before = Time.now + cert.not_after = Time.now + (365*24*60*60) + cert.public_key = rsa.public_key + + ef = OpenSSL::X509::ExtensionFactory.new(nil,cert) + cert.extensions = [ + ef.create_extension("basicConstraints","CA:FALSE"), + ef.create_extension("subjectKeyIdentifier", "hash") ] + ef.issuer_certificate = cert + cert.add_extension(ef.create_extension("authorityKeyIdentifier", + "keyid:always,issuer:always")) + if comment = self[:SSLCertComment] + cert.add_extension(ef.create_extension("nsComment", comment)) + end + cert.sign(rsa, OpenSSL::Digest::SHA1.new) + + @cert = cert @pkey = rsa end + # Establish the OpenSSL::SSL::SSLContext with the configuration + # parameters provided. def setup_ssl_context ctx = ::OpenSSL::SSL::SSLContext.new ctx.cert = @cert ctx.key = @pkey - ctx.client_ca = self[:SSLClientCA] - ctx.ca_path = self[:SSLCACertificatePath] - ctx.ca_file = self[:SSLCACertificateFile] - ctx.verify_mode = self[:SSLVerifyMode] - ctx.verify_depth = self[:SSLVerifyDepth] - ctx.verify_callback = self[:SSLVerifyCallback] + ctx.client_ca = self[:SSLClientCA] + ctx.ca_path = self[:SSLCACertificatePath] + ctx.ca_file = self[:SSLCACertificateFile] + ctx.tmp_dh_callback = self[:SSLTmpDhCallback] + ctx.verify_mode = self[:SSLVerifyMode] + ctx.verify_depth = self[:SSLVerifyDepth] + ctx.verify_callback = self[:SSLVerifyCallback] ctx.cert_store = self[:SSLCertificateStore] @ssl_ctx = ctx end end - def self.parse_uri(uri) + # Parse the dRuby +uri+ for an SSL connection. + # + # Expects drbssl://... + # + # Raises DRbBadScheme or DRbBadURI if +uri+ is not matching or malformed + def self.parse_uri(uri) # :nodoc: if uri =~ /^drbssl:\/\/(.*?):(\d+)(\?(.*))?$/ - host = $1 - port = $2.to_i - option = $4 - [host, port, option] + host = $1 + port = $2.to_i + option = $4 + [host, port, option] else - raise(DRbBadScheme, uri) unless uri =~ /^drbssl:/ - raise(DRbBadURI, 'can\'t parse uri:' + uri) + raise(DRbBadScheme, uri) unless uri =~ /^drbssl:/ + raise(DRbBadURI, 'can\'t parse uri:' + uri) end end + # Return an DRb::DRbSSLSocket instance as a client-side connection, + # with the SSL connected. This is called from DRb::start_service or while + # connecting to a remote object: + # + # DRb.start_service 'drbssl://localhost:0', front, config + # + # +uri+ is the URI we are connected to, + # <code>'drbssl://localhost:0'</code> above, +config+ is our + # configuration. Either a Hash or DRb::DRbSSLSocket::SSLConfig def self.open(uri, config) - host, port, option = parse_uri(uri) + host, port, = parse_uri(uri) host.untaint port.untaint soc = TCPSocket.open(host, port) @@ -132,59 +257,90 @@ module DRb self.new(uri, ssl, ssl_conf, true) end + # Returns a DRb::DRbSSLSocket instance as a server-side connection, with + # the SSL connected. This is called from DRb::start_service or while + # connecting to a remote object: + # + # DRb.start_service 'drbssl://localhost:0', front, config + # + # +uri+ is the URI we are connected to, + # <code>'drbssl://localhost:0'</code> above, +config+ is our + # configuration. Either a Hash or DRb::DRbSSLSocket::SSLConfig def self.open_server(uri, config) uri = 'drbssl://:0' unless uri - host, port, opt = parse_uri(uri) + host, port, = parse_uri(uri) if host.size == 0 host = getservername soc = open_server_inaddr_any(host, port) else - soc = TCPServer.open(host, port) + soc = TCPServer.open(host, port) end port = soc.addr[1] if port == 0 @uri = "drbssl://#{host}:#{port}" - + ssl_conf = SSLConfig.new(config) ssl_conf.setup_certificate ssl_conf.setup_ssl_context self.new(@uri, soc, ssl_conf, false) end - def self.uri_option(uri, config) + # This is a convenience method to parse +uri+ and separate out any + # additional options appended in the +uri+. + # + # Returns an option-less uri and the option => [uri,option] + # + # The +config+ is completely unused, so passing nil is sufficient. + def self.uri_option(uri, config) # :nodoc: host, port, option = parse_uri(uri) return "drbssl://#{host}:#{port}", option end + # Create a DRb::DRbSSLSocket instance. + # + # +uri+ is the URI we are connected to. + # +soc+ is the tcp socket we are bound to. + # +config+ is our configuration. Either a Hash or SSLConfig + # +is_established+ is a boolean of whether +soc+ is currently established + # + # This is called automatically based on the DRb protocol. def initialize(uri, soc, config, is_established) @ssl = is_established ? soc : nil super(uri, soc.to_io, config) end - - def stream; @ssl; end - def close + # Returns the SSL stream + def stream; @ssl; end # :nodoc: + + # Closes the SSL stream before closing the dRuby connection. + def close # :nodoc: if @ssl - @ssl.close - @ssl = nil + @ssl.close + @ssl = nil end super end - - def accept + + def accept # :nodoc: begin while true - soc = @socket.accept - break if (@acl ? @acl.allow_socket?(soc) : true) - soc.close + soc = accept_or_shutdown + return nil unless soc + break if (@acl ? @acl.allow_socket?(soc) : true) + soc.close + end + begin + ssl = @config.accept(soc) + rescue Exception + soc.close + raise end - ssl = @config.accept(soc) self.class.new(uri, ssl, @config, true) rescue OpenSSL::SSL::SSLError - warn("#{__FILE__}:#{__LINE__}: warning: #{$!.message} (#{$!.class})") if @config[:verbose] - retry + warn("#{$!.message} (#{$!.class})", uplevel: 0) if @config[:verbose] + retry end end end - + DRbProtocol.add_protocol(DRbSSLSocket) end diff --git a/lib/drb/timeridconv.rb b/lib/drb/timeridconv.rb index bb2c48d528..9ac7e1e69c 100644 --- a/lib/drb/timeridconv.rb +++ b/lib/drb/timeridconv.rb @@ -1,88 +1,94 @@ +# frozen_string_literal: false require 'drb/drb' require 'monitor' module DRb + + # Timer id conversion keeps objects alive for a certain amount of time after + # their last access. The default time period is 600 seconds and can be + # changed upon initialization. + # + # To use TimerIdConv: + # + # DRb.install_id_conv TimerIdConv.new 60 # one minute + class TimerIdConv < DRbIdConv - class TimerHolder2 + class TimerHolder2 # :nodoc: include MonitorMixin class InvalidIndexError < RuntimeError; end - def initialize(timeout=600) - super() - @sentinel = Object.new - @gc = {} - @curr = {} - @renew = {} - @timeout = timeout - @keeper = keeper + def initialize(keeping=600) + super() + @sentinel = Object.new + @gc = {} + @renew = {} + @keeping = keeping + @expires = nil end def add(obj) - synchronize do - key = obj.__id__ - @curr[key] = obj - return key - end + synchronize do + rotate + key = obj.__id__ + @renew[key] = obj + invoke_keeper + return key + end end - def fetch(key, dv=@sentinel) - synchronize do - obj = peek(key) - if obj == @sentinel - return dv unless dv == @sentinel - raise InvalidIndexError - end - @renew[key] = obj # KeepIt - return obj - end + def fetch(key) + synchronize do + rotate + obj = peek(key) + raise InvalidIndexError if obj == @sentinel + @renew[key] = obj # KeepIt + return obj + end end - def include?(key) - synchronize do - obj = peek(key) - return false if obj == @sentinel - true - end + private + def peek(key) + return @renew.fetch(key) { @gc.fetch(key, @sentinel) } end - def peek(key) - synchronize do - return @curr.fetch(key, @renew.fetch(key, @gc.fetch(key, @sentinel))) - end + def invoke_keeper + return if @expires + @expires = Time.now + @keeping + on_gc end - private - def alternate - synchronize do - @gc = @curr # GCed - @curr = @renew - @renew = {} - end + def on_gc + return unless Thread.main.alive? + return if @expires.nil? + Thread.new { rotate } if @expires < Time.now + ObjectSpace.define_finalizer(Object.new) {on_gc} end - def keeper - Thread.new do - loop do - size = alternate - sleep(@timeout) - end - end + def rotate + synchronize do + if @expires &.< Time.now + @gc = @renew # GCed + @renew = {} + @expires = @gc.empty? ? nil : Time.now + @keeping + end + end end end - def initialize(timeout=600) - @holder = TimerHolder2.new(timeout) + # Creates a new TimerIdConv which will hold objects for +keeping+ seconds. + def initialize(keeping=600) + @holder = TimerHolder2.new(keeping) end - def to_obj(ref) + def to_obj(ref) # :nodoc: return super if ref.nil? @holder.fetch(ref) rescue TimerHolder2::InvalidIndexError raise "invalid reference" end - def to_id(obj) + def to_id(obj) # :nodoc: return @holder.add(obj) end end diff --git a/lib/drb/unix.rb b/lib/drb/unix.rb index 989ec57eed..adacf6df5b 100644 --- a/lib/drb/unix.rb +++ b/lib/drb/unix.rb @@ -1,3 +1,4 @@ +# frozen_string_literal: false require 'socket' require 'drb/drb' require 'tmpdir' @@ -6,33 +7,39 @@ raise(LoadError, "UNIXServer is required") unless defined?(UNIXServer) module DRb + # Implements DRb over a UNIX socket + # + # DRb UNIX socket URIs look like <code>drbunix:<path>?<option></code>. The + # option is optional. + class DRbUNIXSocket < DRbTCPSocket + # :stopdoc: def self.parse_uri(uri) - if /^drbunix:(.*?)(\?(.*))?$/ =~ uri - filename = $1 - option = $3 - [filename, option] + if /^drbunix:(.*?)(\?(.*))?$/ =~ uri + filename = $1 + option = $3 + [filename, option] else - raise(DRbBadScheme, uri) unless uri =~ /^drbunix:/ - raise(DRbBadURI, 'can\'t parse uri:' + uri) + raise(DRbBadScheme, uri) unless uri =~ /^drbunix:/ + raise(DRbBadURI, 'can\'t parse uri:' + uri) end end def self.open(uri, config) - filename, option = parse_uri(uri) + filename, = parse_uri(uri) filename.untaint soc = UNIXSocket.open(filename) self.new(uri, soc, config) end def self.open_server(uri, config) - filename, option = parse_uri(uri) + filename, = parse_uri(uri) if filename.size == 0 - soc = temp_server + soc = temp_server filename = soc.path - uri = 'drbunix:' + soc.path + uri = 'drbunix:' + soc.path else - soc = UNIXServer.open(filename) + soc = UNIXServer.open(filename) end owner = config[:UNIXFileOwner] group = config[:UNIXFileGroup] @@ -59,7 +66,7 @@ module DRb @server_mode = server_mode @acl = nil end - + # import from tempfile.rb Max_try = 10 private @@ -67,18 +74,18 @@ module DRb tmpdir = Dir::tmpdir n = 0 while true - begin - tmpname = sprintf('%s/druby%d.%d', tmpdir, $$, n) - lock = tmpname + '.lock' - unless File.exist?(tmpname) or File.exist?(lock) - Dir.mkdir(lock) - break - end - rescue - raise "cannot generate tempfile `%s'" % tmpname if n >= Max_try - #sleep(1) - end - n += 1 + begin + tmpname = sprintf('%s/druby%d.%d', tmpdir, $$, n) + lock = tmpname + '.lock' + unless File.exist?(tmpname) or File.exist?(lock) + Dir.mkdir(lock) + break + end + rescue + raise "cannot generate tempfile `%s'" % tmpname if n >= Max_try + #sleep(1) + end + n += 1 end soc = UNIXServer.new(tmpname) Dir.rmdir(lock) @@ -88,21 +95,24 @@ module DRb public def close return unless @socket - path = @socket.path + path = @socket.path if @server_mode @socket.close File.unlink(path) if @server_mode @socket = nil + close_shutdown_pipe end def accept - s = @socket.accept + s = accept_or_shutdown + return nil unless s self.class.new(nil, s, @config) end def set_sockopt(soc) - soc.fcntl(Fcntl::F_SETFL, Fcntl::FD_CLOEXEC) if defined? Fcntl::FD_CLOEXEC + # no-op for now end end DRbProtocol.add_protocol(DRbUNIXSocket) + # :startdoc: end |
