summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorMasatoshi SEKI <m_seki@mva.biglobe.ne.jp>2019-10-14 20:30:22 +0900
committerMasatoshi SEKI <m_seki@mva.biglobe.ne.jp>2019-10-14 20:30:22 +0900
commit8488d5b5b6cc9205e8e0641c514f1f2e38bf7d1e (patch)
tree8ea91a3be39771563ec14ca69f188940be3bfaa9 /lib
parenta4cf11c10fcaadfab738d8e98712551f701eef5a (diff)
Automatically close fds on fork (and GC). The connection pools are maintained at thread scope.
Diffstat (limited to 'lib')
-rw-r--r--lib/drb/drb.rb100
1 files changed, 76 insertions, 24 deletions
diff --git a/lib/drb/drb.rb b/lib/drb/drb.rb
index 8c2ef63963..c90ff8e169 100644
--- a/lib/drb/drb.rb
+++ b/lib/drb/drb.rb
@@ -1211,6 +1211,49 @@ module DRb
end
end
+ class ThreadObject
+ include MonitorMixin
+
+ def initialize(&blk)
+ super()
+ @wait_ev = new_cond
+ @req_ev = new_cond
+ @res_ev = new_cond
+ @status = :wait
+ @req = nil
+ @res = nil
+ @thread = Thread.new(self, &blk)
+ end
+
+ def alive?
+ @thread.alive?
+ end
+
+ def method_missing(msg, *arg, &blk)
+ synchronize do
+ @wait_ev.wait_until { @status == :wait }
+ @req = [msg] + arg
+ @status = :req
+ @req_ev.broadcast
+ @res_ev.wait_until { @status == :res }
+ value = @res
+ @req = @res = nil
+ @status = :wait
+ @wait_ev.broadcast
+ return value
+ end
+ end
+
+ def _execute()
+ synchronize do
+ @req_ev.wait_until { @status == :req }
+ @res = yield(@req)
+ @status = :res
+ @res_ev.signal
+ end
+ end
+ end
+
# Class handling the connection between a DRbObject and the
# server the real object lives on.
#
@@ -1222,31 +1265,45 @@ module DRb
# not normally need to deal with it directly.
class DRbConn
POOL_SIZE = 16 # :nodoc:
- @mutex = Thread::Mutex.new
- @pool = []
- def self.open(remote_uri) # :nodoc:
- begin
- conn = nil
- pid = $$
-
- @mutex.synchronize do
- #FIXME
- new_pool = []
- @pool.each do |c|
- if c.pid == pid
- if conn.nil? and c.uri == remote_uri
- conn = c if c.alive?
- else
- new_pool.push c
+ def self.make_pool
+ ThreadObject.new do |queue|
+ pool = []
+ while true
+ queue._execute do |message|
+ case(message[0])
+ when :take then
+ remote_uri = message[1]
+ conn = nil
+ new_pool = []
+ pool.each do |c|
+ if conn.nil? and c.uri == remote_uri
+ conn = c if c.alive?
+ else
+ new_pool.push c
+ end
end
+ pool = new_pool
+ conn
+ when :store then
+ conn = message[1]
+ pool.unshift(conn)
+ pool.pop.close while pool.size > POOL_SIZE
+ conn
else
- c.close
+ nil
end
end
- @pool = new_pool
end
+ end
+ end
+ @pool_proxy = make_pool
+ def self.open(remote_uri) # :nodoc:
+ begin
+ @pool_proxy = make_pool unless @pool_proxy.alive?
+
+ conn = @pool_proxy.take(remote_uri)
conn = self.new(remote_uri) unless conn
succ, result = yield(conn)
return succ, result
@@ -1254,10 +1311,7 @@ module DRb
ensure
if conn
if succ
- @mutex.synchronize do
- @pool.unshift(conn)
- @pool.pop.close while @pool.size > POOL_SIZE
- end
+ @pool_proxy.store(conn)
else
conn.close
end
@@ -1267,11 +1321,9 @@ module DRb
def initialize(remote_uri) # :nodoc:
@uri = remote_uri
- @pid = $$
@protocol = DRbProtocol.open(remote_uri, DRb.config)
end
attr_reader :uri # :nodoc:
- attr_reader :pid # :nodoc:
def send_message(ref, msg_id, arg, block) # :nodoc:
@protocol.send_request(ref, msg_id, arg, block)