From 151f1241c665b0307234e931bec2c32bfea0138d Mon Sep 17 00:00:00 2001 From: seki Date: Wed, 18 Jun 2003 15:45:12 +0000 Subject: import from drb-2.0.4b3 git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@3959 b2dd03c8-39d4-4d8f-98ff-823fe69b080e --- MANIFEST | 10 + lib/drb.rb | 2 + lib/drb/drb.rb | 854 ++++++++++++++++++++++++++++++++++++++++++++++++ lib/drb/eq.rb | 16 + lib/drb/extserv.rb | 67 ++++ lib/drb/extservm.rb | 94 ++++++ lib/drb/gw.rb | 60 ++++ lib/drb/invokemethod.rb | 39 +++ lib/drb/observer.rb | 22 ++ lib/drb/timeridconv.rb | 91 ++++++ lib/drb/unix.rb | 98 ++++++ 11 files changed, 1353 insertions(+) create mode 100644 lib/drb.rb create mode 100644 lib/drb/drb.rb create mode 100644 lib/drb/eq.rb create mode 100644 lib/drb/extserv.rb create mode 100644 lib/drb/extservm.rb create mode 100644 lib/drb/gw.rb create mode 100644 lib/drb/invokemethod.rb create mode 100644 lib/drb/observer.rb create mode 100644 lib/drb/timeridconv.rb create mode 100644 lib/drb/unix.rb diff --git a/MANIFEST b/MANIFEST index 8d41fb2519..36b99b2175 100644 --- a/MANIFEST +++ b/MANIFEST @@ -120,6 +120,16 @@ lib/date/format.rb lib/date2.rb lib/debug.rb lib/delegate.rb +lib/drb.rb +lib/drb/drb.rb +lib/drb/eq.rb +lib/drb/extserv.rb +lib/drb/extservm.rb +lib/drb/gw.rb +lib/drb/invokemethod.rb +lib/drb/observer.rb +lib/drb/timeridconv.rb +lib/drb/unix.rb lib/e2mmap.rb lib/erb.rb lib/eregex.rb diff --git a/lib/drb.rb b/lib/drb.rb new file mode 100644 index 0000000000..93cc811e14 --- /dev/null +++ b/lib/drb.rb @@ -0,0 +1,2 @@ +require 'drb/drb' + diff --git a/lib/drb/drb.rb b/lib/drb/drb.rb new file mode 100644 index 0000000000..85cd3bc898 --- /dev/null +++ b/lib/drb/drb.rb @@ -0,0 +1,854 @@ +=begin += distributed Ruby --- dRuby 2.0.4 + Copyright (c) 1999-2003 Masatoshi SEKI + You can redistribute it and/or modify it under the same terms as Ruby. +=end + +require 'socket' +require 'thread' +require 'fcntl' + +module DRb + class DRbError < RuntimeError; end + class DRbConnError < DRbError; end + + class DRbIdConv + def to_obj(ref) + ObjectSpace._id2ref(ref) + end + + def to_id(obj) + obj.nil? ? nil : obj.__id__ + end + end + + module DRbUndumped + def _dump(dummy) + raise TypeError, 'can\'t dump' + end + end + + class DRbServerNotFound < DRbError; end + class DRbBadURI < DRbError; end + class DRbBadScheme < DRbError; end + + class DRbUnknownError < DRbError + def initialize(unknown) + @unknown = unknown + super(unknown.name) + end + attr_reader :unknown + + def self._load(s) + Marshal::load(s) + end + + def _dump(lv) + Marshal::dump(@unknown) + end + end + + class DRbUnknown + def initialize(err, buf) + case err + when /uninitialized constant (\S+)/ + @name = $1 + when /undefined class\/module (\S+)/ + @name = $1 + else + @name = nil + end + @buf = buf + end + attr_reader :name, :buf + + def self._load(s) + begin + Marshal::load(s) + rescue NameError, ArgumentError + DRbUnknown.new($!, s) + end + end + + def _dump(lv) + @buf + end + + def reload + self.class._load(@buf) + end + + def exception + DRbUnknownError.new(self) + end + end + + class DRbMessage + def initialize(config) + @load_limit = config[:load_limit] + @argc_limit = config[:argc_limit] + end + + def dump(obj) + obj = DRbObject.new(obj) if obj.kind_of? DRbUndumped + begin + str = Marshal::dump(obj) + rescue + str = Marshal::dump(DRbObject.new(obj)) + end + [str.size].pack('N') + str + end + + def load(soc) + sz = soc.read(4) # sizeof (N) + raise(DRbConnError, 'connection closed') if sz.nil? + raise(DRbConnError, 'premature header') if sz.size < 4 + sz = sz.unpack('N')[0] + raise(DRbConnError, "too large packet #{sz}") if @load_limit < sz + str = soc.read(sz) + raise(DRbConnError, 'connection closed') if sz.nil? + raise(DRbConnError, 'premature marshal format(can\'t read)') if str.size < sz + begin + Marshal::load(str) + rescue NameError, ArgumentError + DRbUnknown.new($!, str) + end + end + + def send_request(stream, ref, msg_id, arg, b) + ary = [] + ary.push(dump(ref.__drbref)) + ary.push(dump(msg_id.id2name)) + ary.push(dump(arg.length)) + arg.each do |e| + ary.push(dump(e)) + end + ary.push(dump(b)) + stream.write(ary.join('')) + end + + def recv_request(stream) + ref = load(stream) + ro = DRb.to_obj(ref) + msg = load(stream) + argc = load(stream) + raise ArgumentError, 'too many arguments' if @argc_limit < argc + argv = Array.new(argc, nil) + argc.times do |n| + argv[n] = load(stream) + end + block = load(stream) + return ro, msg, argv, block + end + + def send_reply(stream, succ, result) + stream.write(dump(succ) + dump(result)) + end + + def recv_reply(stream) + succ = load(stream) + result = load(stream) + [succ, result] + end + end + + module DRbProtocol + module_function + def add_protocol(prot) + @protocol.push(prot) + end + + module_function + def open(uri, config, first=true) + @protocol.each do |prot| + begin + return prot.open(uri, config) + rescue DRbBadScheme + rescue DRbConnError + raise($!) + rescue + raise(DRbConnError, "#{uri} - #{$!.inspect}") + end + end + if first && (config[:auto_load] != false) + auto_load(uri, config) + return open(uri, config, false) + end + raise DRbBadURI, 'can\'t parse uri:' + uri + end + + module_function + def open_server(uri, config, first=true) + @protocol.each do |prot| + begin + return prot.open_server(uri, config) + rescue DRbBadScheme + end + end + if first && (config[:auto_load] != false) + auto_load(uri, config) + return open_server(uri, config, false) + end + raise DRbBadURI, 'can\'t parse uri:' + uri + end + + module_function + def uri_option(uri, config, first=true) + @protocol.each do |prot| + begin + uri, opt = prot.uri_option(uri, config) + # opt = nil if opt == '' + return uri, opt + rescue DRbBadScheme + end + end + if first && (config[:auto_load] != false) + auto_load(uri, config) + return uri_option(uri, config, false) + end + raise DRbBadURI, 'can\'t parse uri:' + uri + end + + module_function + def auto_load(uri, config) + if uri =~ /^drb([a-z0-9]+):/ + require("drb/#{$1}") rescue nil + end + end + end + + class DRbTCPSocket + private + def self.parse_uri(uri) + if uri =~ /^druby:\/\/(.*?):(\d+)(\?(.*))?$/ + host = $1 + port = $2.to_i + option = $4 + [host, port, option] + else + raise(DRbBadScheme, uri) unless uri =~ /^druby:/ + raise(DRbBadURI, 'can\'t parse uri:' + uri) + end + end + + public + def self.open(uri, config) + host, port, option = parse_uri(uri) + host.untaint + port.untaint + soc = TCPSocket.open(host, port) + self.new(uri, soc, config) + end + + def self.open_server(uri, config) + uri = 'druby://:0' unless uri + host, port, opt = parse_uri(uri) + if host.size == 0 + soc = TCPServer.open(port) + host = Socket.gethostname + else + soc = TCPServer.open(host, port) + end + port = soc.addr[1] if port == 0 + uri = "druby://#{host}:#{port}" + self.new(uri, soc, config) + end + + def self.uri_option(uri, config) + host, port, option = parse_uri(uri) + return "druby://#{host}:#{port}", option + end + + def initialize(uri, soc, config={}) + @uri = uri + @socket = soc + @config = config + @acl = config[:tcp_acl] + @msg = DRbMessage.new(config) + set_sockopt(@socket) + end + attr_reader :uri + + def peeraddr + @socket.peeraddr + end + + def stream; @socket; end + + def send_request(ref, msg_id, arg, b) + @msg.send_request(stream, ref, msg_id, arg, b) + end + + def recv_request + @msg.recv_request(stream) + end + + def send_reply(succ, result) + @msg.send_reply(stream, succ, result) + end + + def recv_reply + @msg.recv_reply(stream) + end + + public + def close + if @socket + @socket.close + @socket = nil + end + end + + def accept + while true + s = @socket.accept + break if (@acl ? @acl.allow_socket?(s) : true) + s.close + end + self.class.new(nil, s, @config) + end + + def alive? + return false unless @socket + if IO.select([@socket], nil, nil, 0) + close + return false + end + true + end + + def set_sockopt(soc) + soc.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) + soc.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) if defined? Fcntl::O_NONBLOCK + soc.fcntl(Fcntl::F_SETFL, Fcntl::FD_CLOEXEC) if defined? Fcntl::FD_CLOEXEC + end + end + + module DRbProtocol + @protocol = [DRbTCPSocket] # default + end + + class DRbURIOption + def initialize(option) + @option = option.to_s + end + attr :option + def to_s; @option; end + + def ==(other) + return false unless DRbURIOption === other + @option == other.option + end + + def hash + @option.hash + end + + alias eql? == + end + + class DRbObject + def self._load(s) + uri, ref = Marshal.load(s) + if DRb.here?(uri) + return DRb.to_obj(ref) + end + + it = self.new(nil) + it.reinit(uri, ref) + it + end + + def self.new_with_uri(uri) + self.new(nil, uri) + end + + def _dump(lv) + Marshal.dump([@uri, @ref]) + end + + def initialize(obj, uri=nil) + @uri = nil + @ref = nil + if obj.nil? + return if uri.nil? + @uri, option = DRbProtocol.uri_option(uri, DRb.config) + @ref = DRbURIOption.new(option) unless option.nil? + else + @uri = uri ? uri : (DRb.uri rescue nil) + @ref = obj ? DRb.to_id(obj) : nil + end + end + + def reinit(uri, ref) + @uri = uri + @ref = ref + end + + def __drburi + @uri + end + + def __drbref + @ref + end + + undef :to_s + undef :to_a + undef :respond_to? + + def method_missing(msg_id, *a, &b) + if DRb.here?(@uri) + obj = DRb.to_obj(@ref) + DRb.current_server.check_insecure_method(obj, msg_id) + return obj.__send__(msg_id, *a, &b) + end + + succ, result = DRbConn.open(@uri) do |conn| + conn.send_message(self, msg_id, a, b) + end + return result if succ + unless DRbUnknown === result + prefix = "(#{@uri}) " + bt = [] + result.backtrace.each do |x| + break if /`__send__'$/ =~ x + if /^\(druby:\/\// =~ x + bt.push(x) + else + bt.push(prefix + x) + end + end + raise result, result.message, bt + caller + else + raise result + end + end + end + + class DRbConn + POOL_SIZE = 16 + @mutex = Mutex.new + @pool = [] + + def self.open(remote_uri) + begin + conn = nil + + @mutex.synchronize do + #FIXME + new_pool = [] + @pool.each do |c| + if conn.nil? and c.uri == remote_uri + conn = c if c.alive? + else + new_pool.push c + end + end + @pool = new_pool + end + + conn = self.new(remote_uri) unless conn + succ, result = yield(conn) + return succ, result + + ensure + @mutex.synchronize do + if @pool.size > POOL_SIZE or ! succ + conn.close if conn + else + @pool.unshift(conn) + end + end + end + end + + def initialize(remote_uri) + @uri = remote_uri + @protocol = DRbProtocol.open(remote_uri, DRb.config) + end + attr_reader :uri + + def send_message(ref, msg_id, arg, block) + @protocol.send_request(ref, msg_id, arg, block) + @protocol.recv_reply + end + + def close + @protocol.close + @protocol = nil + end + + def alive? + @protocol.alive? + end + end + + class DRbServer + @@acl = nil + @@idconv = DRbIdConv.new + @@secondary_server = nil + @@argc_limit = 256 + @@load_limit = 256 * 102400 + @@verbose = false + + def self.default_argc_limit(argc) + @@argc_limit = argc + end + + def self.default_load_limit(sz) + @@load_limit = sz + end + + def self.default_acl(acl) + @@acl = acl + end + + def self.default_id_conv(idconv) + @@idconv = idconv + end + + def self.verbose=(on) + @@verbose = on + end + + def self.verbose + @@verbose + end + + def self.make_config(hash={}) + default_config = { + :idconv => @@idconv, + :verbose => @@verbose, + :tcp_acl => @@acl, + :load_limit => @@load_limit, + :argc_limit => @@argc_limit + } + default_config.update(hash) + end + + def initialize(uri=nil, front=nil, config_or_acl=nil) + if Hash === config_or_acl + config = config_or_acl.dup + else + acl = config_or_acl || @@acl + config = { + :tcp_acl => acl + } + end + + @config = self.class.make_config(config) + + @protocol = DRbProtocol.open_server(uri, @config) + @uri = @protocol.uri + + @front = front + @idconv = @config[:idconv] + + @grp = ThreadGroup.new + @thread = run + + Thread.exclusive do + DRb.primary_server = self unless DRb.primary_server + end + end + attr_reader :uri, :thread, :front + attr_reader :config + + def verbose=(v); @config[:verbose]=v; end + def verbose; @config[:verbose]; end + + def alive? + @thread.alive? + end + + def stop_service + @thread.kill + end + + def to_obj(ref) + return front if ref.nil? + return front[ref.to_s] if DRbURIOption === ref + @idconv.to_obj(ref) + end + + def to_id(obj) + return nil if obj.__id__ == front.__id__ + @idconv.to_id(obj) + end + + private + def kill_sub_thread + Thread.new do + grp = ThreadGroup.new + grp.add(Thread.current) + list = @grp.list + while list.size > 0 + list.each do |th| + th.kill if th.alive? + end + list = @grp.list + end + end + end + + def run + Thread.start do + begin + while true + main_loop + end + ensure + @protocol.close if @protocol + kill_sub_thread + end + end + end + + INSECURE_METHOD = [ + :__send__ + ] + def insecure_method?(msg_id) + INSECURE_METHOD.include?(msg_id) + end + + def any_to_s(obj) + obj.to_s rescue sprintf("#<%s:0x%lx>", obj.class, obj.__id__) + end + + def check_insecure_method(obj, msg_id) + return true if Proc === obj && msg_id == :__drb_yield + raise(ArgumentError, "#{any_to_s(msg_id)} is not a symbol") unless Symbol == msg_id.class + raise(SecurityError, "insecure method `#{msg_id}'") if insecure_method?(msg_id) + unless obj.respond_to?(msg_id) + desc = any_to_s(obj) + if desc.nil? || desc[0] == '#' + desc << ":#{obj.class}" + end + + if obj.private_methods.include?(msg_id.to_s) + raise NameError, "private method `#{msg_id}' called for #{desc}" + else + raise NameError, "undefined method `#{msg_id}' called for #{desc}" + end + end + true + end + public :check_insecure_method + + class InvokeMethod + def initialize(drb_server, client) + @drb_server = drb_server + @client = client + end + + def perform + @result = nil + @succ = false + setup_message + if @block + @result = perform_with_block + else + @result = perform_without_block + end + @succ = true + return @succ, @result + rescue StandardError, ScriptError, Interrupt + @result = $! + return @succ, @result + end + + private + def init_with_client + obj, msg, argv, block = @client.recv_request + @obj = obj + @msg_id = msg.intern + @argv = argv + @block = block + end + + def check_insecure_method + @drb_server.check_insecure_method(@obj, @msg_id) + end + + def setup_message + init_with_client + check_insecure_method + end + + def perform_without_block + if Proc === @obj && @msg_id == :__drb_yield + if @argv.size == 1 + ary = @argv + else + ary = [@argv] + end + ary.collect(&@obj)[0] + else + @obj.__send__(@msg_id, *@argv) + end + end + + def rescue_local_jump(err) + case err.message + when /^retry/ # retry from proc-closure + return :retry + when /^break/ # break from proc-closure + return rescue_break(err) + else + return :unknown + end + end + + end + + if RUBY_VERSION >= '1.8' + require 'drb/invokemethod' + class InvokeMethod + include InvokeMethod18Mixin + end + else + module InvokeMethod16Mixin + def block_yield(x) + if x.class == Array + block_value = @block.__drb_yield(*x) + else + block_value = @block.__drb_yield(x) + end + end + + def rescue_break(err) + return :break + end + + def perform_with_block + @obj.__send__(@msg_id, *@argv) do |x| + jump_error = nil + begin + block_value = block_yield(x) + rescue LocalJumpError + jump_error = $! + end + if jump_error + reason ,= rescue_local_jump(jump_error) + case reason + when :retry + retry + when :break + break + else + raise jump_error + end + end + block_value + end + end + end + + class InvokeMethod + include InvokeMethod16Mixin + end + end + + def main_loop + Thread.start(@protocol.accept) do |client| + @grp.add Thread.current + Thread.current['DRb'] = { 'client' => client , + 'server' => self } + loop do + begin + succ = false + invoke_method = InvokeMethod.new(self, client) + succ, result = invoke_method.perform + if !succ && verbose + p result + result.backtrace.each do |x| + puts x + end + end + client.send_reply(succ, result) rescue nil + ensure + unless succ + client.close + return + end + end + end + end + end + end + + @primary_server = nil + + def start_service(uri=nil, front=nil, config=nil) + @primary_server = DRbServer.new(uri, front, config) + end + module_function :start_service + + attr_accessor :primary_server + module_function :primary_server=, :primary_server + + def current_server + drb = Thread.current['DRb'] + server = (drb && drb['server']) ? drb['server'] : @primary_server + raise DRbServerNotFound unless server + return server + end + module_function :current_server + + def stop_service + @primary_server.stop_service if @primary_server + @primary_server = nil + end + module_function :stop_service + + def uri + current_server.uri + end + module_function :uri + + def here?(uri) + (current_server.uri rescue nil) == uri + end + module_function :here? + + def config + current_server.config + rescue + DRbServer.make_config + end + module_function :config + + def front + current_server.front + end + module_function :front + + def to_obj(ref) + current_server.to_obj(ref) + end + def to_id(obj) + current_server.to_id(obj) + end + module_function :to_id + module_function :to_obj + + def thread + @primary_server ? @primary_server.thread : nil + end + module_function :thread + + def install_id_conv(idconv) + DRbServer.default_id_conv(idconv) + end + module_function :install_id_conv + + def install_acl(acl) + DRbServer.default_acl(acl) + end + module_function :install_acl +end + +DRbObject = DRb::DRbObject +DRbUndumped = DRb::DRbUndumped +DRbIdConv = DRb::DRbIdConv diff --git a/lib/drb/eq.rb b/lib/drb/eq.rb new file mode 100644 index 0000000000..e24512d6a7 --- /dev/null +++ b/lib/drb/eq.rb @@ -0,0 +1,16 @@ +require 'drb/drb' + +module DRb + class DRbObject + def ==(other) + return false unless DRbObject === other + (@ref == other.__drbref) && (@uri == other.__drburi) + end + + def hash + [@uri, @ref].hash + end + + alias eql? == + end +end diff --git a/lib/drb/extserv.rb b/lib/drb/extserv.rb new file mode 100644 index 0000000000..f05bae5264 --- /dev/null +++ b/lib/drb/extserv.rb @@ -0,0 +1,67 @@ +=begin + external service + Copyright (c) 2000,2002 Masatoshi SEKI +=end + +require 'drb/drb' + +module DRb + class ExtServ + include DRbUndumped + + def initialize(there, name, server=nil) + @server = server || DRb::primary_server + @name = name + ro = DRbObject.new(nil, there) + @invoker = ro.regist(name, DRbObject.new(self, @server.uri)) + end + attr_reader :server + + def front + DRbObject.new(nil, @server.uri) + end + + def stop_service + @invoker.unregist(@name) + server = @server + @server = nil + Thread.new do + sleep 1 + server.stop_service + end + true + end + + def alive? + @server ? @server.alive? : false + end + end +end + +if __FILE__ == $0 + class Foo + include DRbUndumped + + def initialize(str) + @str = str + end + + def hello(it) + "#{it}: #{self}" + end + + def to_s + @str + end + end + + cmd = ARGV.shift + case cmd + when 'itest1', 'itest2' + front = Foo.new(cmd) + manager = DRb::DRbServer.new(nil, front) + es = DRb::ExtService.new(ARGV.shift, ARGV.shift, manager) + es.server.thread.join + end +end + diff --git a/lib/drb/extservm.rb b/lib/drb/extservm.rb new file mode 100644 index 0000000000..50009dbae2 --- /dev/null +++ b/lib/drb/extservm.rb @@ -0,0 +1,94 @@ +=begin + external service manager + Copyright (c) 2000 Masatoshi SEKI +=end + +require 'drb/drb' +require 'thread' + +module DRb + class ExtServManager + include DRbUndumped + + @@command = {} + + def self.command + @@command + end + + def self.command=(cmd) + @@command = cmd + end + + def initialize + @servers = {} + @waiting = [] + @queue = Queue.new + @thread = invoke_thread + end + attr_reader :server + + def service(name) + while true + server = nil + Thread.exclusive do + server = @servers[name] if @servers[name] + end + return server if server && server.alive? + invoke_service(name) + end + end + + def regist(name, ro) + ary = nil + Thread.exclusive do + @servers[name] = ro + ary = @waiting + @waiting = [] + end + ary.each do |th| + begin + th.run + rescue ThreadError + end + end + self + end + + def unregist(name) + Thread.exclusive do + @servers.delete(name) + end + end + + private + def invoke_thread + Thread.new do + while true + name = @queue.pop + invoke_service_command(name, @@command[name]) + end + end + end + + def invoke_service(name) + Thread.critical = true + @waiting.push Thread.current + @queue.push name + Thread.stop + end + + def invoke_service_command(name, command) + raise "invalid command. name: #{name}" unless command + Thread.exclusive do + return if @servers.include?(name) + @servers[name] = false + end + if RUBY_PLATFORM =~ /mswin32/ + system("cmd /c start /b #{command} #{DRb.uri} #{name}") + else + system("#{command} #{DRb.uri} #{name} &") + end + end + end +end diff --git a/lib/drb/gw.rb b/lib/drb/gw.rb new file mode 100644 index 0000000000..012a2b0a11 --- /dev/null +++ b/lib/drb/gw.rb @@ -0,0 +1,60 @@ +require 'drb/drb' +require 'monitor' + +module DRb + class GWIdConv < DRbIdConv + def to_obj(ref) + if Array === ref && ref[0] == :DRbObject + it = DRbObject.new(nil) + it.reinit(ref[1], ref[2]) + return it + end + super(ref) + end + end + + class GW + include MonitorMixin + def initialize + super() + @hash = {} + end + + def [](key) + synchronize do + @hash[key] + end + end + + def []=(key, v) + synchronize do + @hash[key] = v + end + end + end + + class DRbObject + def self._load(s) + uri, ref = Marshal.load(s) + if DRb.uri == uri + return ref ? DRb.to_obj(ref) : DRb.front + end + + it = self.new(nil) + it.reinit(DRb.uri, [:DRbObject, uri, ref]) + it + end + + def _dump(lv) + if DRb.uri == @uri + if Array === @ref && @ref[0] == :DRbObject + Marshal.dump([@ref[1], @ref[2]]) + else + Marshal.dump([@uri, @ref]) # ?? + end + else + Marshal.dump([DRb.uri, [:DRbObject, @uri, @ref]]) + end + end + end +end diff --git a/lib/drb/invokemethod.rb b/lib/drb/invokemethod.rb new file mode 100644 index 0000000000..f8ce0ad09b --- /dev/null +++ b/lib/drb/invokemethod.rb @@ -0,0 +1,39 @@ + +# for ruby-1.8.0 + +module DRb + class DRbServer + module InvokeMethod18Mixin + def block_yield(x) + block_value = @block.call(*x) + end + + def rescue_break(err) + return :break, err.exit_value + end + + def perform_with_block + @obj.__send__(@msg_id, *@argv) do |*x| + jump_error = nil + begin + block_value = block_yield(x) + rescue LocalJumpError + jump_error = $! + end + if jump_error + reason, jump_value = rescue_local_jump(jump_error) + case reason + when :retry + retry + when :break + break(jump_value) + else + raise jump_error + end + end + block_value + end + end + end + end +end diff --git a/lib/drb/observer.rb b/lib/drb/observer.rb new file mode 100644 index 0000000000..e7f1668c52 --- /dev/null +++ b/lib/drb/observer.rb @@ -0,0 +1,22 @@ +require 'observer' + +module DRb + module DRbObservable + include Observable + + def notify_observers(*arg) + if defined? @observer_state and @observer_state + if defined? @observer_peers + for i in @observer_peers.dup + begin + i.update(*arg) + rescue + delete_observer(i) + end + end + end + @observer_state = false + end + end + end +end diff --git a/lib/drb/timeridconv.rb b/lib/drb/timeridconv.rb new file mode 100644 index 0000000000..bb2c48d528 --- /dev/null +++ b/lib/drb/timeridconv.rb @@ -0,0 +1,91 @@ +require 'drb/drb' +require 'monitor' + +module DRb + class TimerIdConv < DRbIdConv + class TimerHolder2 + include MonitorMixin + + class InvalidIndexError < RuntimeError; end + + def initialize(timeout=600) + super() + @sentinel = Object.new + @gc = {} + @curr = {} + @renew = {} + @timeout = timeout + @keeper = keeper + end + + def add(obj) + synchronize do + key = obj.__id__ + @curr[key] = obj + return key + end + end + + def fetch(key, dv=@sentinel) + synchronize do + obj = peek(key) + if obj == @sentinel + return dv unless dv == @sentinel + raise InvalidIndexError + end + @renew[key] = obj # KeepIt + return obj + end + end + + def include?(key) + synchronize do + obj = peek(key) + return false if obj == @sentinel + true + end + end + + def peek(key) + synchronize do + return @curr.fetch(key, @renew.fetch(key, @gc.fetch(key, @sentinel))) + end + end + + private + def alternate + synchronize do + @gc = @curr # GCed + @curr = @renew + @renew = {} + end + end + + def keeper + Thread.new do + loop do + size = alternate + sleep(@timeout) + end + end + end + end + + def initialize(timeout=600) + @holder = TimerHolder2.new(timeout) + end + + def to_obj(ref) + return super if ref.nil? + @holder.fetch(ref) + rescue TimerHolder2::InvalidIndexError + raise "invalid reference" + end + + def to_id(obj) + return @holder.add(obj) + end + end +end + +# DRb.install_id_conv(TimerIdConv.new) diff --git a/lib/drb/unix.rb b/lib/drb/unix.rb new file mode 100644 index 0000000000..f9a2f75acd --- /dev/null +++ b/lib/drb/unix.rb @@ -0,0 +1,98 @@ +require 'socket' +require 'drb/drb' + +module DRb + + class DRbUNIXSocket < DRbTCPSocket + def self.parse_uri(uri) + if /^drbunix:(.*?)(\?(.*))?$/ =~ uri + filename = $1 + option = $3 + [filename, option] + else + raise(DRbBadScheme, uri) unless uri =~ /^drbunix:/ + raise(DRbBadURI, 'can\'t parse uri:' + uri) + end + end + + def self.open(uri, config) + filename, option = parse_uri(uri) + filename.untaint + soc = UNIXSocket.open(filename) + self.new(uri, soc, config) + end + + def self.open_server(uri, config) + filename, option = parse_uri(uri) + if filename.size == 0 + soc = temp_server + uri = 'drbunix:' + soc.path + else + soc = UNIXServer.open(filename) + end + mode = config[:UNIXFileMode] + File.chmod(mode, filename) if mode + + self.new(uri, soc, config, true) + end + + def self.uri_option(uri, config) + filename, option = parse_uri(uri) + return "drbunix:#{filename}", option + end + + def initialize(uri, soc, config={}, server_mode = false) + super(uri, soc, config) + set_sockopt(@socket) + @server_mode = server_mode + @acl = nil + end + + # import from tempfile.rb + Max_try = 10 + private + def self.temp_server + tmpdir = ENV['TMPDIR'] || ENV['TMP'] || ENV['TEMP'] || '/tmp' + tmpdir = '/tmp' if $SAFE > 0 and tmpdir.tainted? + n = 0 + while true + begin + tmpname = sprintf('%s/druby%d.%d', tmpdir, $$, n) + lock = tmpname + '.lock' + unless File.exist?(tmpname) or File.exist?(lock) + Dir.mkdir(lock) + break + end + rescue + raise "cannot generate tempfile `%s'" % tmpname if n >= Max_try + #sleep(1) + end + n += 1 + end + soc = UNIXServer.new(tmpname) + Dir.rmdir(lock) + soc + end + + public + def close + return unless @socket + path = @socket.path + @socket.close + File.unlink(path) if @server_mode + @socket = nil + end + + def accept + s = @socket.accept + self.class.new(nil, s, @config) + end + + def set_sockopt(soc) + soc.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) if defined? Fcntl::O_NONBLOCK + soc.fcntl(Fcntl::F_SETFL, Fcntl::FD_CLOEXEC) if defined? Fcntl::FD_CLOEXEC + end + end + + DRbProtocol.add_protocol(DRbUNIXSocket) +end -- cgit v1.2.3