diff options
author | hsbt <hsbt@b2dd03c8-39d4-4d8f-98ff-823fe69b080e> | 2018-11-02 23:07:56 +0000 |
---|---|---|
committer | hsbt <hsbt@b2dd03c8-39d4-4d8f-98ff-823fe69b080e> | 2018-11-02 23:07:56 +0000 |
commit | 59c8d50653480bef3f24517296e6ddf937fdf6bc (patch) | |
tree | df10aaf4f3307837fe3d1d129d66f6c0c7586bc5 /lib/bundler/worker.rb | |
parent | 7deb37777a230837e865e0a11fb8d7c1dc6d03ce (diff) |
Added bundler as default gems. Revisit [Feature #12733]
* bin/*, lib/bundler/*, lib/bundler.rb, spec/bundler, man/*:
Merge from latest stable branch of bundler/bundler repository and
added workaround patches. I will backport them into upstream.
* common.mk, defs/gmake.mk: Added `test-bundler` task for test suite
of bundler.
* tool/sync_default_gems.rb: Added sync task for bundler.
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@65509 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
Diffstat (limited to 'lib/bundler/worker.rb')
-rw-r--r-- | lib/bundler/worker.rb | 106 |
1 files changed, 106 insertions, 0 deletions
diff --git a/lib/bundler/worker.rb b/lib/bundler/worker.rb new file mode 100644 index 0000000000..e91cfa7805 --- /dev/null +++ b/lib/bundler/worker.rb @@ -0,0 +1,106 @@ +# frozen_string_literal: true + +require "thread" + +module Bundler + class Worker + POISON = Object.new + + class WrappedException < StandardError + attr_reader :exception + def initialize(exn) + @exception = exn + end + end + + # @return [String] the name of the worker + attr_reader :name + + # Creates a worker pool of specified size + # + # @param size [Integer] Size of pool + # @param name [String] name the name of the worker + # @param func [Proc] job to run in inside the worker pool + def initialize(size, name, func) + @name = name + @request_queue = Queue.new + @response_queue = Queue.new + @func = func + @size = size + @threads = nil + SharedHelpers.trap("INT") { abort_threads } + end + + # Enqueue a request to be executed in the worker pool + # + # @param obj [String] mostly it is name of spec that should be downloaded + def enq(obj) + create_threads unless @threads + @request_queue.enq obj + end + + # Retrieves results of job function being executed in worker pool + def deq + result = @response_queue.deq + raise result.exception if result.is_a?(WrappedException) + result + end + + def stop + stop_threads + end + + private + + def process_queue(i) + loop do + obj = @request_queue.deq + break if obj.equal? POISON + @response_queue.enq apply_func(obj, i) + end + end + + def apply_func(obj, i) + @func.call(obj, i) + rescue Exception => e + WrappedException.new(e) + end + + # Stop the worker threads by sending a poison object down the request queue + # so as worker threads after retrieving it, shut themselves down + def stop_threads + return unless @threads + @threads.each { @request_queue.enq POISON } + @threads.each(&:join) + @threads = nil + end + + def abort_threads + return unless @threads + Bundler.ui.debug("\n#{caller.join("\n")}") + @threads.each(&:exit) + exit 1 + end + + def create_threads + creation_errors = [] + + @threads = Array.new(@size) do |i| + begin + Thread.start { process_queue(i) }.tap do |thread| + thread.name = "#{name} Worker ##{i}" if thread.respond_to?(:name=) + end + rescue ThreadError => e + creation_errors << e + nil + end + end.compact + + return if creation_errors.empty? + + message = "Failed to create threads for the #{name} worker: #{creation_errors.map(&:to_s).uniq.join(", ")}" + raise ThreadCreationError, message if @threads.empty? + Bundler.ui.info message + end + end +end |