From 8488d5b5b6cc9205e8e0641c514f1f2e38bf7d1e Mon Sep 17 00:00:00 2001 From: Masatoshi SEKI Date: Mon, 14 Oct 2019 20:30:22 +0900 Subject: Automatically close fds on fork (and GC). The connection pools are maintained at thread scope. --- lib/drb/drb.rb | 100 +++++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 76 insertions(+), 24 deletions(-) (limited to 'lib/drb') diff --git a/lib/drb/drb.rb b/lib/drb/drb.rb index 8c2ef63963..c90ff8e169 100644 --- a/lib/drb/drb.rb +++ b/lib/drb/drb.rb @@ -1211,6 +1211,49 @@ module DRb end end + class ThreadObject + include MonitorMixin + + def initialize(&blk) + super() + @wait_ev = new_cond + @req_ev = new_cond + @res_ev = new_cond + @status = :wait + @req = nil + @res = nil + @thread = Thread.new(self, &blk) + end + + def alive? + @thread.alive? + end + + def method_missing(msg, *arg, &blk) + synchronize do + @wait_ev.wait_until { @status == :wait } + @req = [msg] + arg + @status = :req + @req_ev.broadcast + @res_ev.wait_until { @status == :res } + value = @res + @req = @res = nil + @status = :wait + @wait_ev.broadcast + return value + end + end + + def _execute() + synchronize do + @req_ev.wait_until { @status == :req } + @res = yield(@req) + @status = :res + @res_ev.signal + end + end + end + # Class handling the connection between a DRbObject and the # server the real object lives on. # @@ -1222,31 +1265,45 @@ module DRb # not normally need to deal with it directly. class DRbConn POOL_SIZE = 16 # :nodoc: - @mutex = Thread::Mutex.new - @pool = [] - def self.open(remote_uri) # :nodoc: - begin - conn = nil - pid = $$ - - @mutex.synchronize do - #FIXME - new_pool = [] - @pool.each do |c| - if c.pid == pid - if conn.nil? and c.uri == remote_uri - conn = c if c.alive? - else - new_pool.push c + def self.make_pool + ThreadObject.new do |queue| + pool = [] + while true + queue._execute do |message| + case(message[0]) + when :take then + remote_uri = message[1] + conn = nil + new_pool = [] + pool.each do |c| + if conn.nil? and c.uri == remote_uri + conn = c if c.alive? + else + new_pool.push c + end end + pool = new_pool + conn + when :store then + conn = message[1] + pool.unshift(conn) + pool.pop.close while pool.size > POOL_SIZE + conn else - c.close + nil end end - @pool = new_pool end + end + end + @pool_proxy = make_pool + def self.open(remote_uri) # :nodoc: + begin + @pool_proxy = make_pool unless @pool_proxy.alive? + + conn = @pool_proxy.take(remote_uri) conn = self.new(remote_uri) unless conn succ, result = yield(conn) return succ, result @@ -1254,10 +1311,7 @@ module DRb ensure if conn if succ - @mutex.synchronize do - @pool.unshift(conn) - @pool.pop.close while @pool.size > POOL_SIZE - end + @pool_proxy.store(conn) else conn.close end @@ -1267,11 +1321,9 @@ module DRb def initialize(remote_uri) # :nodoc: @uri = remote_uri - @pid = $$ @protocol = DRbProtocol.open(remote_uri, DRb.config) end attr_reader :uri # :nodoc: - attr_reader :pid # :nodoc: def send_message(ref, msg_id, arg, block) # :nodoc: @protocol.send_request(ref, msg_id, arg, block) -- cgit v1.2.3