diff options
Diffstat (limited to 'lib/bundler/worker.rb')
| -rw-r--r-- | lib/bundler/worker.rb | 55 |
1 files changed, 37 insertions, 18 deletions
diff --git a/lib/bundler/worker.rb b/lib/bundler/worker.rb index e91cfa7805..77f4f004aa 100644 --- a/lib/bundler/worker.rb +++ b/lib/bundler/worker.rb @@ -1,7 +1,5 @@ # frozen_string_literal: true -require "thread" - module Bundler class Worker POISON = Object.new @@ -23,20 +21,22 @@ module Bundler # @param func [Proc] job to run in inside the worker pool def initialize(size, name, func) @name = name - @request_queue = Queue.new - @response_queue = Queue.new + @request_queue = Thread::Queue.new + @request_queue_with_priority = Thread::Queue.new + @response_queue = Thread::Queue.new @func = func @size = size @threads = nil - SharedHelpers.trap("INT") { abort_threads } + @previous_interrupt_handler = nil end # Enqueue a request to be executed in the worker pool # # @param obj [String] mostly it is name of spec that should be downloaded - def enq(obj) + def enq(obj, priority: false) + queue = priority ? @request_queue_with_priority : @request_queue create_threads unless @threads - @request_queue.enq obj + queue.enq obj end # Retrieves results of job function being executed in worker pool @@ -50,11 +50,17 @@ module Bundler stop_threads end - private + private def process_queue(i) loop do - obj = @request_queue.deq + obj = begin + @request_queue_with_priority.deq(true) + rescue ThreadError + @request_queue.deq(false, timeout: 0.05) + end + + next if obj.nil? break if obj.equal? POISON @response_queue.enq apply_func(obj, i) end @@ -62,7 +68,7 @@ module Bundler def apply_func(obj, i) @func.call(obj, i) - rescue Exception => e + rescue Exception => e # rubocop:disable Lint/RescueException WrappedException.new(e) end @@ -70,13 +76,16 @@ module Bundler # so as worker threads after retrieving it, shut themselves down def stop_threads return unless @threads + @threads.each { @request_queue.enq POISON } @threads.each(&:join) + + remove_interrupt_handler + @threads = nil end def abort_threads - return unless @threads Bundler.ui.debug("\n#{caller.join("\n")}") @threads.each(&:exit) exit 1 @@ -86,21 +95,31 @@ module Bundler creation_errors = [] @threads = Array.new(@size) do |i| - begin - Thread.start { process_queue(i) }.tap do |thread| - thread.name = "#{name} Worker ##{i}" if thread.respond_to?(:name=) - end - rescue ThreadError => e - creation_errors << e - nil + Thread.start { process_queue(i) }.tap do |thread| + thread.name = "#{name} Worker ##{i}" end + rescue ThreadError => e + creation_errors << e + nil end.compact + add_interrupt_handler unless @threads.empty? + return if creation_errors.empty? message = "Failed to create threads for the #{name} worker: #{creation_errors.map(&:to_s).uniq.join(", ")}" raise ThreadCreationError, message if @threads.empty? Bundler.ui.info message end + + def add_interrupt_handler + @previous_interrupt_handler = trap("INT") { abort_threads } + end + + def remove_interrupt_handler + return unless @previous_interrupt_handler + + trap "INT", @previous_interrupt_handler + end end end |
