summaryrefslogtreecommitdiff
path: root/lib/drb/drb.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/drb/drb.rb')
-rw-r--r--lib/drb/drb.rb854
1 files changed, 854 insertions, 0 deletions
diff --git a/lib/drb/drb.rb b/lib/drb/drb.rb
new file mode 100644
index 00000000000..85cd3bc8986
--- /dev/null
+++ b/lib/drb/drb.rb
@@ -0,0 +1,854 @@
+=begin
+= distributed Ruby --- dRuby 2.0.4
+ Copyright (c) 1999-2003 Masatoshi SEKI
+ You can redistribute it and/or modify it under the same terms as Ruby.
+=end
+
+require 'socket'
+require 'thread'
+require 'fcntl'
+
+module DRb
+ class DRbError < RuntimeError; end
+ class DRbConnError < DRbError; end
+
+ class DRbIdConv
+ def to_obj(ref)
+ ObjectSpace._id2ref(ref)
+ end
+
+ def to_id(obj)
+ obj.nil? ? nil : obj.__id__
+ end
+ end
+
+ module DRbUndumped
+ def _dump(dummy)
+ raise TypeError, 'can\'t dump'
+ end
+ end
+
+ class DRbServerNotFound < DRbError; end
+ class DRbBadURI < DRbError; end
+ class DRbBadScheme < DRbError; end
+
+ class DRbUnknownError < DRbError
+ def initialize(unknown)
+ @unknown = unknown
+ super(unknown.name)
+ end
+ attr_reader :unknown
+
+ def self._load(s)
+ Marshal::load(s)
+ end
+
+ def _dump(lv)
+ Marshal::dump(@unknown)
+ end
+ end
+
+ class DRbUnknown
+ def initialize(err, buf)
+ case err
+ when /uninitialized constant (\S+)/
+ @name = $1
+ when /undefined class\/module (\S+)/
+ @name = $1
+ else
+ @name = nil
+ end
+ @buf = buf
+ end
+ attr_reader :name, :buf
+
+ def self._load(s)
+ begin
+ Marshal::load(s)
+ rescue NameError, ArgumentError
+ DRbUnknown.new($!, s)
+ end
+ end
+
+ def _dump(lv)
+ @buf
+ end
+
+ def reload
+ self.class._load(@buf)
+ end
+
+ def exception
+ DRbUnknownError.new(self)
+ end
+ end
+
+ class DRbMessage
+ def initialize(config)
+ @load_limit = config[:load_limit]
+ @argc_limit = config[:argc_limit]
+ end
+
+ def dump(obj)
+ obj = DRbObject.new(obj) if obj.kind_of? DRbUndumped
+ begin
+ str = Marshal::dump(obj)
+ rescue
+ str = Marshal::dump(DRbObject.new(obj))
+ end
+ [str.size].pack('N') + str
+ end
+
+ def load(soc)
+ sz = soc.read(4) # sizeof (N)
+ raise(DRbConnError, 'connection closed') if sz.nil?
+ raise(DRbConnError, 'premature header') if sz.size < 4
+ sz = sz.unpack('N')[0]
+ raise(DRbConnError, "too large packet #{sz}") if @load_limit < sz
+ str = soc.read(sz)
+ raise(DRbConnError, 'connection closed') if sz.nil?
+ raise(DRbConnError, 'premature marshal format(can\'t read)') if str.size < sz
+ begin
+ Marshal::load(str)
+ rescue NameError, ArgumentError
+ DRbUnknown.new($!, str)
+ end
+ end
+
+ def send_request(stream, ref, msg_id, arg, b)
+ ary = []
+ ary.push(dump(ref.__drbref))
+ ary.push(dump(msg_id.id2name))
+ ary.push(dump(arg.length))
+ arg.each do |e|
+ ary.push(dump(e))
+ end
+ ary.push(dump(b))
+ stream.write(ary.join(''))
+ end
+
+ def recv_request(stream)
+ ref = load(stream)
+ ro = DRb.to_obj(ref)
+ msg = load(stream)
+ argc = load(stream)
+ raise ArgumentError, 'too many arguments' if @argc_limit < argc
+ argv = Array.new(argc, nil)
+ argc.times do |n|
+ argv[n] = load(stream)
+ end
+ block = load(stream)
+ return ro, msg, argv, block
+ end
+
+ def send_reply(stream, succ, result)
+ stream.write(dump(succ) + dump(result))
+ end
+
+ def recv_reply(stream)
+ succ = load(stream)
+ result = load(stream)
+ [succ, result]
+ end
+ end
+
+ module DRbProtocol
+ module_function
+ def add_protocol(prot)
+ @protocol.push(prot)
+ end
+
+ module_function
+ def open(uri, config, first=true)
+ @protocol.each do |prot|
+ begin
+ return prot.open(uri, config)
+ rescue DRbBadScheme
+ rescue DRbConnError
+ raise($!)
+ rescue
+ raise(DRbConnError, "#{uri} - #{$!.inspect}")
+ end
+ end
+ if first && (config[:auto_load] != false)
+ auto_load(uri, config)
+ return open(uri, config, false)
+ end
+ raise DRbBadURI, 'can\'t parse uri:' + uri
+ end
+
+ module_function
+ def open_server(uri, config, first=true)
+ @protocol.each do |prot|
+ begin
+ return prot.open_server(uri, config)
+ rescue DRbBadScheme
+ end
+ end
+ if first && (config[:auto_load] != false)
+ auto_load(uri, config)
+ return open_server(uri, config, false)
+ end
+ raise DRbBadURI, 'can\'t parse uri:' + uri
+ end
+
+ module_function
+ def uri_option(uri, config, first=true)
+ @protocol.each do |prot|
+ begin
+ uri, opt = prot.uri_option(uri, config)
+ # opt = nil if opt == ''
+ return uri, opt
+ rescue DRbBadScheme
+ end
+ end
+ if first && (config[:auto_load] != false)
+ auto_load(uri, config)
+ return uri_option(uri, config, false)
+ end
+ raise DRbBadURI, 'can\'t parse uri:' + uri
+ end
+
+ module_function
+ def auto_load(uri, config)
+ if uri =~ /^drb([a-z0-9]+):/
+ require("drb/#{$1}") rescue nil
+ end
+ end
+ end
+
+ class DRbTCPSocket
+ private
+ def self.parse_uri(uri)
+ if uri =~ /^druby:\/\/(.*?):(\d+)(\?(.*))?$/
+ host = $1
+ port = $2.to_i
+ option = $4
+ [host, port, option]
+ else
+ raise(DRbBadScheme, uri) unless uri =~ /^druby:/
+ raise(DRbBadURI, 'can\'t parse uri:' + uri)
+ end
+ end
+
+ public
+ def self.open(uri, config)
+ host, port, option = parse_uri(uri)
+ host.untaint
+ port.untaint
+ soc = TCPSocket.open(host, port)
+ self.new(uri, soc, config)
+ end
+
+ def self.open_server(uri, config)
+ uri = 'druby://:0' unless uri
+ host, port, opt = parse_uri(uri)
+ if host.size == 0
+ soc = TCPServer.open(port)
+ host = Socket.gethostname
+ else
+ soc = TCPServer.open(host, port)
+ end
+ port = soc.addr[1] if port == 0
+ uri = "druby://#{host}:#{port}"
+ self.new(uri, soc, config)
+ end
+
+ def self.uri_option(uri, config)
+ host, port, option = parse_uri(uri)
+ return "druby://#{host}:#{port}", option
+ end
+
+ def initialize(uri, soc, config={})
+ @uri = uri
+ @socket = soc
+ @config = config
+ @acl = config[:tcp_acl]
+ @msg = DRbMessage.new(config)
+ set_sockopt(@socket)
+ end
+ attr_reader :uri
+
+ def peeraddr
+ @socket.peeraddr
+ end
+
+ def stream; @socket; end
+
+ def send_request(ref, msg_id, arg, b)
+ @msg.send_request(stream, ref, msg_id, arg, b)
+ end
+
+ def recv_request
+ @msg.recv_request(stream)
+ end
+
+ def send_reply(succ, result)
+ @msg.send_reply(stream, succ, result)
+ end
+
+ def recv_reply
+ @msg.recv_reply(stream)
+ end
+
+ public
+ def close
+ if @socket
+ @socket.close
+ @socket = nil
+ end
+ end
+
+ def accept
+ while true
+ s = @socket.accept
+ break if (@acl ? @acl.allow_socket?(s) : true)
+ s.close
+ end
+ self.class.new(nil, s, @config)
+ end
+
+ def alive?
+ return false unless @socket
+ if IO.select([@socket], nil, nil, 0)
+ close
+ return false
+ end
+ true
+ end
+
+ def set_sockopt(soc)
+ soc.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
+ soc.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) if defined? Fcntl::O_NONBLOCK
+ soc.fcntl(Fcntl::F_SETFL, Fcntl::FD_CLOEXEC) if defined? Fcntl::FD_CLOEXEC
+ end
+ end
+
+ module DRbProtocol
+ @protocol = [DRbTCPSocket] # default
+ end
+
+ class DRbURIOption
+ def initialize(option)
+ @option = option.to_s
+ end
+ attr :option
+ def to_s; @option; end
+
+ def ==(other)
+ return false unless DRbURIOption === other
+ @option == other.option
+ end
+
+ def hash
+ @option.hash
+ end
+
+ alias eql? ==
+ end
+
+ class DRbObject
+ def self._load(s)
+ uri, ref = Marshal.load(s)
+ if DRb.here?(uri)
+ return DRb.to_obj(ref)
+ end
+
+ it = self.new(nil)
+ it.reinit(uri, ref)
+ it
+ end
+
+ def self.new_with_uri(uri)
+ self.new(nil, uri)
+ end
+
+ def _dump(lv)
+ Marshal.dump([@uri, @ref])
+ end
+
+ def initialize(obj, uri=nil)
+ @uri = nil
+ @ref = nil
+ if obj.nil?
+ return if uri.nil?
+ @uri, option = DRbProtocol.uri_option(uri, DRb.config)
+ @ref = DRbURIOption.new(option) unless option.nil?
+ else
+ @uri = uri ? uri : (DRb.uri rescue nil)
+ @ref = obj ? DRb.to_id(obj) : nil
+ end
+ end
+
+ def reinit(uri, ref)
+ @uri = uri
+ @ref = ref
+ end
+
+ def __drburi
+ @uri
+ end
+
+ def __drbref
+ @ref
+ end
+
+ undef :to_s
+ undef :to_a
+ undef :respond_to?
+
+ def method_missing(msg_id, *a, &b)
+ if DRb.here?(@uri)
+ obj = DRb.to_obj(@ref)
+ DRb.current_server.check_insecure_method(obj, msg_id)
+ return obj.__send__(msg_id, *a, &b)
+ end
+
+ succ, result = DRbConn.open(@uri) do |conn|
+ conn.send_message(self, msg_id, a, b)
+ end
+ return result if succ
+ unless DRbUnknown === result
+ prefix = "(#{@uri}) "
+ bt = []
+ result.backtrace.each do |x|
+ break if /`__send__'$/ =~ x
+ if /^\(druby:\/\// =~ x
+ bt.push(x)
+ else
+ bt.push(prefix + x)
+ end
+ end
+ raise result, result.message, bt + caller
+ else
+ raise result
+ end
+ end
+ end
+
+ class DRbConn
+ POOL_SIZE = 16
+ @mutex = Mutex.new
+ @pool = []
+
+ def self.open(remote_uri)
+ begin
+ conn = nil
+
+ @mutex.synchronize do
+ #FIXME
+ new_pool = []
+ @pool.each do |c|
+ if conn.nil? and c.uri == remote_uri
+ conn = c if c.alive?
+ else
+ new_pool.push c
+ end
+ end
+ @pool = new_pool
+ end
+
+ conn = self.new(remote_uri) unless conn
+ succ, result = yield(conn)
+ return succ, result
+
+ ensure
+ @mutex.synchronize do
+ if @pool.size > POOL_SIZE or ! succ
+ conn.close if conn
+ else
+ @pool.unshift(conn)
+ end
+ end
+ end
+ end
+
+ def initialize(remote_uri)
+ @uri = remote_uri
+ @protocol = DRbProtocol.open(remote_uri, DRb.config)
+ end
+ attr_reader :uri
+
+ def send_message(ref, msg_id, arg, block)
+ @protocol.send_request(ref, msg_id, arg, block)
+ @protocol.recv_reply
+ end
+
+ def close
+ @protocol.close
+ @protocol = nil
+ end
+
+ def alive?
+ @protocol.alive?
+ end
+ end
+
+ class DRbServer
+ @@acl = nil
+ @@idconv = DRbIdConv.new
+ @@secondary_server = nil
+ @@argc_limit = 256
+ @@load_limit = 256 * 102400
+ @@verbose = false
+
+ def self.default_argc_limit(argc)
+ @@argc_limit = argc
+ end
+
+ def self.default_load_limit(sz)
+ @@load_limit = sz
+ end
+
+ def self.default_acl(acl)
+ @@acl = acl
+ end
+
+ def self.default_id_conv(idconv)
+ @@idconv = idconv
+ end
+
+ def self.verbose=(on)
+ @@verbose = on
+ end
+
+ def self.verbose
+ @@verbose
+ end
+
+ def self.make_config(hash={})
+ default_config = {
+ :idconv => @@idconv,
+ :verbose => @@verbose,
+ :tcp_acl => @@acl,
+ :load_limit => @@load_limit,
+ :argc_limit => @@argc_limit
+ }
+ default_config.update(hash)
+ end
+
+ def initialize(uri=nil, front=nil, config_or_acl=nil)
+ if Hash === config_or_acl
+ config = config_or_acl.dup
+ else
+ acl = config_or_acl || @@acl
+ config = {
+ :tcp_acl => acl
+ }
+ end
+
+ @config = self.class.make_config(config)
+
+ @protocol = DRbProtocol.open_server(uri, @config)
+ @uri = @protocol.uri
+
+ @front = front
+ @idconv = @config[:idconv]
+
+ @grp = ThreadGroup.new
+ @thread = run
+
+ Thread.exclusive do
+ DRb.primary_server = self unless DRb.primary_server
+ end
+ end
+ attr_reader :uri, :thread, :front
+ attr_reader :config
+
+ def verbose=(v); @config[:verbose]=v; end
+ def verbose; @config[:verbose]; end
+
+ def alive?
+ @thread.alive?
+ end
+
+ def stop_service
+ @thread.kill
+ end
+
+ def to_obj(ref)
+ return front if ref.nil?
+ return front[ref.to_s] if DRbURIOption === ref
+ @idconv.to_obj(ref)
+ end
+
+ def to_id(obj)
+ return nil if obj.__id__ == front.__id__
+ @idconv.to_id(obj)
+ end
+
+ private
+ def kill_sub_thread
+ Thread.new do
+ grp = ThreadGroup.new
+ grp.add(Thread.current)
+ list = @grp.list
+ while list.size > 0
+ list.each do |th|
+ th.kill if th.alive?
+ end
+ list = @grp.list
+ end
+ end
+ end
+
+ def run
+ Thread.start do
+ begin
+ while true
+ main_loop
+ end
+ ensure
+ @protocol.close if @protocol
+ kill_sub_thread
+ end
+ end
+ end
+
+ INSECURE_METHOD = [
+ :__send__
+ ]
+ def insecure_method?(msg_id)
+ INSECURE_METHOD.include?(msg_id)
+ end
+
+ def any_to_s(obj)
+ obj.to_s rescue sprintf("#<%s:0x%lx>", obj.class, obj.__id__)
+ end
+
+ def check_insecure_method(obj, msg_id)
+ return true if Proc === obj && msg_id == :__drb_yield
+ raise(ArgumentError, "#{any_to_s(msg_id)} is not a symbol") unless Symbol == msg_id.class
+ raise(SecurityError, "insecure method `#{msg_id}'") if insecure_method?(msg_id)
+ unless obj.respond_to?(msg_id)
+ desc = any_to_s(obj)
+ if desc.nil? || desc[0] == '#'
+ desc << ":#{obj.class}"
+ end
+
+ if obj.private_methods.include?(msg_id.to_s)
+ raise NameError, "private method `#{msg_id}' called for #{desc}"
+ else
+ raise NameError, "undefined method `#{msg_id}' called for #{desc}"
+ end
+ end
+ true
+ end
+ public :check_insecure_method
+
+ class InvokeMethod
+ def initialize(drb_server, client)
+ @drb_server = drb_server
+ @client = client
+ end
+
+ def perform
+ @result = nil
+ @succ = false
+ setup_message
+ if @block
+ @result = perform_with_block
+ else
+ @result = perform_without_block
+ end
+ @succ = true
+ return @succ, @result
+ rescue StandardError, ScriptError, Interrupt
+ @result = $!
+ return @succ, @result
+ end
+
+ private
+ def init_with_client
+ obj, msg, argv, block = @client.recv_request
+ @obj = obj
+ @msg_id = msg.intern
+ @argv = argv
+ @block = block
+ end
+
+ def check_insecure_method
+ @drb_server.check_insecure_method(@obj, @msg_id)
+ end
+
+ def setup_message
+ init_with_client
+ check_insecure_method
+ end
+
+ def perform_without_block
+ if Proc === @obj && @msg_id == :__drb_yield
+ if @argv.size == 1
+ ary = @argv
+ else
+ ary = [@argv]
+ end
+ ary.collect(&@obj)[0]
+ else
+ @obj.__send__(@msg_id, *@argv)
+ end
+ end
+
+ def rescue_local_jump(err)
+ case err.message
+ when /^retry/ # retry from proc-closure
+ return :retry
+ when /^break/ # break from proc-closure
+ return rescue_break(err)
+ else
+ return :unknown
+ end
+ end
+
+ end
+
+ if RUBY_VERSION >= '1.8'
+ require 'drb/invokemethod'
+ class InvokeMethod
+ include InvokeMethod18Mixin
+ end
+ else
+ module InvokeMethod16Mixin
+ def block_yield(x)
+ if x.class == Array
+ block_value = @block.__drb_yield(*x)
+ else
+ block_value = @block.__drb_yield(x)
+ end
+ end
+
+ def rescue_break(err)
+ return :break
+ end
+
+ def perform_with_block
+ @obj.__send__(@msg_id, *@argv) do |x|
+ jump_error = nil
+ begin
+ block_value = block_yield(x)
+ rescue LocalJumpError
+ jump_error = $!
+ end
+ if jump_error
+ reason ,= rescue_local_jump(jump_error)
+ case reason
+ when :retry
+ retry
+ when :break
+ break
+ else
+ raise jump_error
+ end
+ end
+ block_value
+ end
+ end
+ end
+
+ class InvokeMethod
+ include InvokeMethod16Mixin
+ end
+ end
+
+ def main_loop
+ Thread.start(@protocol.accept) do |client|
+ @grp.add Thread.current
+ Thread.current['DRb'] = { 'client' => client ,
+ 'server' => self }
+ loop do
+ begin
+ succ = false
+ invoke_method = InvokeMethod.new(self, client)
+ succ, result = invoke_method.perform
+ if !succ && verbose
+ p result
+ result.backtrace.each do |x|
+ puts x
+ end
+ end
+ client.send_reply(succ, result) rescue nil
+ ensure
+ unless succ
+ client.close
+ return
+ end
+ end
+ end
+ end
+ end
+ end
+
+ @primary_server = nil
+
+ def start_service(uri=nil, front=nil, config=nil)
+ @primary_server = DRbServer.new(uri, front, config)
+ end
+ module_function :start_service
+
+ attr_accessor :primary_server
+ module_function :primary_server=, :primary_server
+
+ def current_server
+ drb = Thread.current['DRb']
+ server = (drb && drb['server']) ? drb['server'] : @primary_server
+ raise DRbServerNotFound unless server
+ return server
+ end
+ module_function :current_server
+
+ def stop_service
+ @primary_server.stop_service if @primary_server
+ @primary_server = nil
+ end
+ module_function :stop_service
+
+ def uri
+ current_server.uri
+ end
+ module_function :uri
+
+ def here?(uri)
+ (current_server.uri rescue nil) == uri
+ end
+ module_function :here?
+
+ def config
+ current_server.config
+ rescue
+ DRbServer.make_config
+ end
+ module_function :config
+
+ def front
+ current_server.front
+ end
+ module_function :front
+
+ def to_obj(ref)
+ current_server.to_obj(ref)
+ end
+ def to_id(obj)
+ current_server.to_id(obj)
+ end
+ module_function :to_id
+ module_function :to_obj
+
+ def thread
+ @primary_server ? @primary_server.thread : nil
+ end
+ module_function :thread
+
+ def install_id_conv(idconv)
+ DRbServer.default_id_conv(idconv)
+ end
+ module_function :install_id_conv
+
+ def install_acl(acl)
+ DRbServer.default_acl(acl)
+ end
+ module_function :install_acl
+end
+
+DRbObject = DRb::DRbObject
+DRbUndumped = DRb::DRbUndumped
+DRbIdConv = DRb::DRbIdConv