diff options
Diffstat (limited to 'lib/bundler/vendor/connection_pool')
6 files changed, 169 insertions, 166 deletions
diff --git a/lib/bundler/vendor/connection_pool/.document b/lib/bundler/vendor/connection_pool/.document new file mode 100644 index 0000000000..0c43bbd6b3 --- /dev/null +++ b/lib/bundler/vendor/connection_pool/.document @@ -0,0 +1 @@ +# Vendored files do not need to be documented diff --git a/lib/bundler/vendor/connection_pool/lib/connection_pool.rb b/lib/bundler/vendor/connection_pool/lib/connection_pool.rb index fbcd26c765..317088a866 100644 --- a/lib/bundler/vendor/connection_pool/lib/connection_pool.rb +++ b/lib/bundler/vendor/connection_pool/lib/connection_pool.rb @@ -1,14 +1,20 @@ -require_relative 'connection_pool/version' -require_relative 'connection_pool/timed_stack' +require_relative "../../../vendored_timeout" +require_relative "connection_pool/version" +class Bundler::ConnectionPool + class Error < ::RuntimeError; end + + class PoolShuttingDownError < ::Bundler::ConnectionPool::Error; end + + class TimeoutError < ::Gem::Timeout::Error; end +end -# Generic connection pool class for e.g. sharing a limited number of network connections -# among many threads. Note: Connections are lazily created. +# Generic connection pool class for sharing a limited number of objects or network connections +# among many threads. Note: pool elements are lazily created. # # Example usage with block (faster): # # @pool = Bundler::ConnectionPool.new { Redis.new } -# # @pool.with do |redis| # redis.lpop('my-list') if redis.llen('my-list') > 0 # end @@ -30,33 +36,72 @@ require_relative 'connection_pool/timed_stack' # Accepts the following options: # - :size - number of connections to pool, defaults to 5 # - :timeout - amount of time to wait for a connection if none currently available, defaults to 5 seconds +# - :auto_reload_after_fork - automatically drop all connections after fork, defaults to true # class Bundler::ConnectionPool - DEFAULTS = {size: 5, timeout: 5} - - class Error < RuntimeError - end + DEFAULTS = {size: 5, timeout: 5, auto_reload_after_fork: true} def self.wrap(options, &block) Wrapper.new(options, &block) end + if Process.respond_to?(:fork) + INSTANCES = ObjectSpace::WeakMap.new + private_constant :INSTANCES + + def self.after_fork + INSTANCES.values.each do |pool| + next unless pool.auto_reload_after_fork + + # We're on after fork, so we know all other threads are dead. + # All we need to do is to ensure the main thread doesn't have a + # checked out connection + pool.checkin(force: true) + pool.reload do |connection| + # Unfortunately we don't know what method to call to close the connection, + # so we try the most common one. + connection.close if connection.respond_to?(:close) + end + end + nil + end + + if ::Process.respond_to?(:_fork) # MRI 3.1+ + module ForkTracker + def _fork + pid = super + if pid == 0 + Bundler::ConnectionPool.after_fork + end + pid + end + end + Process.singleton_class.prepend(ForkTracker) + end + else + INSTANCES = nil + private_constant :INSTANCES + + def self.after_fork + # noop + end + end + def initialize(options = {}, &block) - raise ArgumentError, 'Connection pool requires a block' unless block + raise ArgumentError, "Connection pool requires a block" unless block options = DEFAULTS.merge(options) - @size = options.fetch(:size) + @size = Integer(options.fetch(:size)) @timeout = options.fetch(:timeout) + @auto_reload_after_fork = options.fetch(:auto_reload_after_fork) @available = TimedStack.new(@size, &block) - @key = :"current-#{@available.object_id}" - @key_count = :"current-#{@available.object_id}-count" + @key = :"pool-#{@available.object_id}" + @key_count = :"pool-#{@available.object_id}-count" + INSTANCES[self] = self if INSTANCES end -if Thread.respond_to?(:handle_interrupt) - - # MRI def with(options = {}) Thread.handle_interrupt(Exception => :never) do conn = checkout(options) @@ -69,93 +114,62 @@ if Thread.respond_to?(:handle_interrupt) end end end - -else - - # jruby 1.7.x - def with(options = {}) - conn = checkout(options) - begin - yield conn - ensure - checkin - end - end - -end + alias_method :then, :with def checkout(options = {}) if ::Thread.current[@key] - ::Thread.current[@key_count]+= 1 + ::Thread.current[@key_count] += 1 ::Thread.current[@key] else - ::Thread.current[@key_count]= 1 - ::Thread.current[@key]= @available.pop(options[:timeout] || @timeout) + ::Thread.current[@key_count] = 1 + ::Thread.current[@key] = @available.pop(options[:timeout] || @timeout) end end - def checkin + def checkin(force: false) if ::Thread.current[@key] - if ::Thread.current[@key_count] == 1 + if ::Thread.current[@key_count] == 1 || force @available.push(::Thread.current[@key]) - ::Thread.current[@key]= nil + ::Thread.current[@key] = nil + ::Thread.current[@key_count] = nil else - ::Thread.current[@key_count]-= 1 + ::Thread.current[@key_count] -= 1 end - else - raise Bundler::ConnectionPool::Error, 'no connections are checked out' + elsif !force + raise Bundler::ConnectionPool::Error, "no connections are checked out" end nil end + ## + # Shuts down the Bundler::ConnectionPool by passing each connection to +block+ and + # then removing it from the pool. Attempting to checkout a connection after + # shutdown will raise +Bundler::ConnectionPool::PoolShuttingDownError+. + def shutdown(&block) @available.shutdown(&block) end - # Size of this connection pool - def size - @size + ## + # Reloads the Bundler::ConnectionPool by passing each connection to +block+ and then + # removing it the pool. Subsequent checkouts will create new connections as + # needed. + + def reload(&block) + @available.shutdown(reload: true, &block) end + # Size of this connection pool + attr_reader :size + # Automatically drop all connections after fork + attr_reader :auto_reload_after_fork + # Number of pool entries available for checkout at this instant. def available @available.length end - - private - - class Wrapper < ::BasicObject - METHODS = [:with, :pool_shutdown] - - def initialize(options = {}, &block) - @pool = options.fetch(:pool) { ::Bundler::ConnectionPool.new(options, &block) } - end - - def with(&block) - @pool.with(&block) - end - - def pool_shutdown(&block) - @pool.shutdown(&block) - end - - def pool_size - @pool.size - end - - def pool_available - @pool.available - end - - def respond_to?(id, *args) - METHODS.include?(id) || with { |c| c.respond_to?(id, *args) } - end - - def method_missing(name, *args, &block) - with do |connection| - connection.send(name, *args, &block) - end - end - end end + +require_relative "connection_pool/timed_stack" +require_relative "connection_pool/wrapper" diff --git a/lib/bundler/vendor/connection_pool/lib/connection_pool/monotonic_time.rb b/lib/bundler/vendor/connection_pool/lib/connection_pool/monotonic_time.rb deleted file mode 100644 index 5a9c4a27bb..0000000000 --- a/lib/bundler/vendor/connection_pool/lib/connection_pool/monotonic_time.rb +++ /dev/null @@ -1,66 +0,0 @@ -# Global monotonic clock from Concurrent Ruby 1.0. -# Copyright (c) Jerry D'Antonio -- released under the MIT license. -# Slightly modified; used with permission. -# https://github.com/ruby-concurrency/concurrent-ruby - -require 'thread' - -class Bundler::ConnectionPool - - class_definition = Class.new do - - if defined?(Process::CLOCK_MONOTONIC) - - # @!visibility private - def get_time - Process.clock_gettime(Process::CLOCK_MONOTONIC) - end - - elsif defined?(RUBY_ENGINE) && RUBY_ENGINE == 'jruby' - - # @!visibility private - def get_time - java.lang.System.nanoTime() / 1_000_000_000.0 - end - - else - - # @!visibility private - def initialize - @mutex = Mutex.new - @last_time = Time.now.to_f - end - - # @!visibility private - def get_time - @mutex.synchronize do - now = Time.now.to_f - if @last_time < now - @last_time = now - else # clock has moved back in time - @last_time += 0.000_001 - end - end - end - end - end - - ## - # Clock that cannot be set and represents monotonic time since - # some unspecified starting point. - # - # @!visibility private - GLOBAL_MONOTONIC_CLOCK = class_definition.new - private_constant :GLOBAL_MONOTONIC_CLOCK - - class << self - ## - # Returns the current time a tracked by the application monotonic clock. - # - # @return [Float] The current monotonic time when `since` not given else - # the elapsed monotonic time between `since` and the current time - def monotonic_time - GLOBAL_MONOTONIC_CLOCK.get_time - end - end -end diff --git a/lib/bundler/vendor/connection_pool/lib/connection_pool/timed_stack.rb b/lib/bundler/vendor/connection_pool/lib/connection_pool/timed_stack.rb index f3fe1e04ad..35d1d7cc35 100644 --- a/lib/bundler/vendor/connection_pool/lib/connection_pool/timed_stack.rb +++ b/lib/bundler/vendor/connection_pool/lib/connection_pool/timed_stack.rb @@ -1,13 +1,3 @@ -require 'thread' -require 'timeout' -require_relative 'monotonic_time' - -## -# Raised when you attempt to retrieve a connection from a pool that has been -# shut down. - -class Bundler::ConnectionPool::PoolShuttingDownError < RuntimeError; end - ## # The TimedStack manages a pool of homogeneous connections (or any resource # you wish to manage). Connections are created lazily up to a given maximum @@ -25,7 +15,7 @@ class Bundler::ConnectionPool::PoolShuttingDownError < RuntimeError; end # # conn = ts.pop # ts.pop timeout: 5 -# #=> raises Timeout::Error after 5 seconds +# #=> raises Bundler::ConnectionPool::TimeoutError after 5 seconds class Bundler::ConnectionPool::TimedStack attr_reader :max @@ -39,8 +29,8 @@ class Bundler::ConnectionPool::TimedStack @created = 0 @que = [] @max = size - @mutex = Mutex.new - @resource = ConditionVariable.new + @mutex = Thread::Mutex.new + @resource = Thread::ConditionVariable.new @shutdown_block = nil end @@ -64,7 +54,7 @@ class Bundler::ConnectionPool::TimedStack ## # Retrieves a connection from the stack. If a connection is available it is # immediately returned. If no connection is available within the given - # timeout a Timeout::Error is raised. + # timeout a Bundler::ConnectionPool::TimeoutError is raised. # # +:timeout+ is the only checked entry in +options+ and is preferred over # the +timeout+ argument (which will be removed in a future release). Other @@ -74,7 +64,7 @@ class Bundler::ConnectionPool::TimedStack options, timeout = timeout, 0.5 if Hash === timeout timeout = options.fetch :timeout, timeout - deadline = Bundler::ConnectionPool.monotonic_time + timeout + deadline = current_time + timeout @mutex.synchronize do loop do raise Bundler::ConnectionPool::PoolShuttingDownError if @shutdown_block @@ -83,25 +73,28 @@ class Bundler::ConnectionPool::TimedStack connection = try_create(options) return connection if connection - to_wait = deadline - Bundler::ConnectionPool.monotonic_time - raise Timeout::Error, "Waited #{timeout} sec" if to_wait <= 0 + to_wait = deadline - current_time + raise Bundler::ConnectionPool::TimeoutError, "Waited #{timeout} sec, #{length}/#{@max} available" if to_wait <= 0 @resource.wait(@mutex, to_wait) end end end ## - # Shuts down the TimedStack which prevents connections from being checked - # out. The +block+ is called once for each connection on the stack. + # Shuts down the TimedStack by passing each connection to +block+ and then + # removing it from the pool. Attempting to checkout a connection after + # shutdown will raise +Bundler::ConnectionPool::PoolShuttingDownError+ unless + # +:reload+ is +true+. - def shutdown(&block) - raise ArgumentError, "shutdown must receive a block" unless block_given? + def shutdown(reload: false, &block) + raise ArgumentError, "shutdown must receive a block" unless block @mutex.synchronize do @shutdown_block = block @resource.broadcast shutdown_connections + @shutdown_block = nil if reload end end @@ -121,6 +114,10 @@ class Bundler::ConnectionPool::TimedStack private + def current_time + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + ## # This is an extension point for TimedStack and is called with a mutex. # @@ -149,6 +146,7 @@ class Bundler::ConnectionPool::TimedStack conn = fetch_connection(options) @shutdown_block.call(conn) end + @created = 0 end ## diff --git a/lib/bundler/vendor/connection_pool/lib/connection_pool/version.rb b/lib/bundler/vendor/connection_pool/lib/connection_pool/version.rb index b149c0e242..384d6fc977 100644 --- a/lib/bundler/vendor/connection_pool/lib/connection_pool/version.rb +++ b/lib/bundler/vendor/connection_pool/lib/connection_pool/version.rb @@ -1,3 +1,3 @@ class Bundler::ConnectionPool - VERSION = "2.2.2" + VERSION = "2.4.1" end diff --git a/lib/bundler/vendor/connection_pool/lib/connection_pool/wrapper.rb b/lib/bundler/vendor/connection_pool/lib/connection_pool/wrapper.rb new file mode 100644 index 0000000000..dd796d1021 --- /dev/null +++ b/lib/bundler/vendor/connection_pool/lib/connection_pool/wrapper.rb @@ -0,0 +1,56 @@ +class Bundler::ConnectionPool + class Wrapper < ::BasicObject + METHODS = [:with, :pool_shutdown, :wrapped_pool] + + def initialize(options = {}, &block) + @pool = options.fetch(:pool) { ::Bundler::ConnectionPool.new(options, &block) } + end + + def wrapped_pool + @pool + end + + def with(&block) + @pool.with(&block) + end + + def pool_shutdown(&block) + @pool.shutdown(&block) + end + + def pool_size + @pool.size + end + + def pool_available + @pool.available + end + + def respond_to?(id, *args) + METHODS.include?(id) || with { |c| c.respond_to?(id, *args) } + end + + # rubocop:disable Style/MissingRespondToMissing + if ::RUBY_VERSION >= "3.0.0" + def method_missing(name, *args, **kwargs, &block) + with do |connection| + connection.send(name, *args, **kwargs, &block) + end + end + elsif ::RUBY_VERSION >= "2.7.0" + ruby2_keywords def method_missing(name, *args, &block) + with do |connection| + connection.send(name, *args, &block) + end + end + else + def method_missing(name, *args, &block) + with do |connection| + connection.send(name, *args, &block) + end + end + end + # rubocop:enable Style/MethodMissingSuper + # rubocop:enable Style/MissingRespondToMissing + end +end |