summaryrefslogtreecommitdiff
path: root/lib/bundler/worker.rb
blob: 10139ed25bbbf052d628ce3b5d2aea6e4d62ba83 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# frozen_string_literal: true

module Bundler
  class Worker
    POISON = Object.new

    class WrappedException < StandardError
      attr_reader :exception
      def initialize(exn)
        @exception = exn
      end
    end

    # @return [String] the name of the worker
    attr_reader :name

    # Creates a worker pool of specified size
    #
    # @param size [Integer] Size of pool
    # @param name [String] name the name of the worker
    # @param func [Proc] job to run in inside the worker pool
    def initialize(size, name, func)
      @name = name
      @request_queue = Queue.new
      @response_queue = Queue.new
      @func = func
      @size = size
      @threads = nil
      SharedHelpers.trap("INT") { abort_threads }
    end

    # Enqueue a request to be executed in the worker pool
    #
    # @param obj [String] mostly it is name of spec that should be downloaded
    def enq(obj)
      create_threads unless @threads
      @request_queue.enq obj
    end

    # Retrieves results of job function being executed in worker pool
    def deq
      result = @response_queue.deq
      raise result.exception if result.is_a?(WrappedException)
      result
    end

    def stop
      stop_threads
    end

    private

    def process_queue(i)
      loop do
        obj = @request_queue.deq
        break if obj.equal? POISON
        @response_queue.enq apply_func(obj, i)
      end
    end

    def apply_func(obj, i)
      @func.call(obj, i)
    rescue Exception => e # rubocop:disable Lint/RescueException
      WrappedException.new(e)
    end

    # Stop the worker threads by sending a poison object down the request queue
    # so as worker threads after retrieving it, shut themselves down
    def stop_threads
      return unless @threads
      @threads.each { @request_queue.enq POISON }
      @threads.each(&:join)
      @threads = nil
    end

    def abort_threads
      return unless @threads
      Bundler.ui.debug("\n#{caller.join("\n")}")
      @threads.each(&:exit)
      exit 1
    end

    def create_threads
      creation_errors = []

      @threads = Array.new(@size) do |i|
        begin
          Thread.start { process_queue(i) }.tap do |thread|
            thread.name = "#{name} Worker ##{i}" if thread.respond_to?(:name=)
          end
        rescue ThreadError => e
          creation_errors << e
          nil
        end
      end.compact

      return if creation_errors.empty?

      message = "Failed to create threads for the #{name} worker: #{creation_errors.map(&:to_s).uniq.join(", ")}"
      raise ThreadCreationError, message if @threads.empty?
      Bundler.ui.info message
    end
  end
end