From 3357d88ada8ad44ecf267f5f2a1fb1df4e5cb8f4 Mon Sep 17 00:00:00 2001 From: kosaki Date: Fri, 30 Nov 2012 18:55:09 +0000 Subject: * lib/thread.rb (SizedQueue#pop): rewrite by using ConditionVariable. * lib/thread.rb (SizedQueue#push): ditto. * lib/thread.rb (SizedQueue#max): ditto. * lib/thread.rb (Queue#pop): ditto. * lib/thread.rb (Queue#push): ditto. * lib/thread.rb (SizedQueue#num_waiting): adopt the above changes. * lib/thread.rb (SizedQueue#initialize): ditto. * lib/thread.rb (Queue#num_waiting): ditto. * lib/thread.rb (Queue#initialize): ditto. * test/thread/test_queue.rb (test_sized_queue_and_wakeup): ditto. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@38087 b2dd03c8-39d4-4d8f-98ff-823fe69b080e --- lib/thread.rb | 99 +++++++++++++++++++++++++---------------------------------- 1 file changed, 42 insertions(+), 57 deletions(-) (limited to 'lib/thread.rb') diff --git a/lib/thread.rb b/lib/thread.rb index f3dc9b15f9..e3aeed19fe 100644 --- a/lib/thread.rb +++ b/lib/thread.rb @@ -149,26 +149,23 @@ class Queue # def initialize @que = [] - @waiting = [] @que.taint # enable tainted communication - @waiting.taint + @num_waiting = 0 self.taint @mutex = Mutex.new + @cond = ConditionVariable.new end # # Pushes +obj+ to the queue. # def push(obj) - @mutex.synchronize{ - @que.push obj - begin - t = @waiting.shift - t.wakeup if t - rescue ThreadError - retry + Thread.async_interrupt_timing(StandardError => :on_blocking) do + @mutex.synchronize do + @que.push obj + @cond.signal end - } + end end # @@ -187,23 +184,26 @@ class Queue # thread isn't suspended, and an exception is raised. # def pop(non_block=false) - @mutex.synchronize{ - begin + Thread.async_interrupt_timing(StandardError => :on_blocking) do + @mutex.synchronize do while true if @que.empty? - raise ThreadError, "queue empty" if non_block - # @waiting.include? check is necessary for avoiding a race against - # Thread.wakeup [Bug 5195] - @waiting.push Thread.current unless @waiting.include?(Thread.current) - @mutex.sleep + if non_block + raise ThreadError, "queue empty" + else + begin + @num_waiting += 1 + @cond.wait @mutex + ensure + @num_waiting -= 1 + end + end else return @que.shift end end - ensure - @waiting.delete(Thread.current) end - } + end end # @@ -246,7 +246,7 @@ class Queue # Returns the number of threads waiting on the queue. # def num_waiting - @waiting.size + @num_waiting end end @@ -263,8 +263,8 @@ class SizedQueue < Queue def initialize(max) raise ArgumentError, "queue size must be positive" unless max > 0 @max = max - @queue_wait = [] - @queue_wait.taint # enable tainted comunication + @enque_cond = ConditionVariable.new + @num_enqueue_waiting = 0 super() end @@ -280,22 +280,15 @@ class SizedQueue < Queue # def max=(max) raise ArgumentError, "queue size must be positive" unless max > 0 - diff = nil - @mutex.synchronize { + + @mutex.synchronize do if max <= @max @max = max else diff = max - @max @max = max - end - } - if diff - diff.times do - begin - t = @queue_wait.shift - t.run if t - rescue ThreadError - retry + diff.times do + @enque_cond.signal end end end @@ -307,25 +300,22 @@ class SizedQueue < Queue # until space becomes available. # def push(obj) - @mutex.synchronize{ - begin + Thread.async_interrupt_timing(RuntimeError => :on_blocking) do + @mutex.synchronize do while true break if @que.length < @max - @queue_wait.push Thread.current unless @queue_wait.include?(Thread.current) - @mutex.sleep + @num_enqueue_waiting += 1 + begin + @enque_cond.wait @mutex + ensure + @num_enqueue_waiting -= 1 + end end - ensure - @queue_wait.delete(Thread.current) - end - @que.push obj - begin - t = @waiting.shift - t.wakeup if t - rescue ThreadError - retry + @que.push obj + @cond.signal end - } + end end # @@ -343,16 +333,11 @@ class SizedQueue < Queue # def pop(*args) retval = super - @mutex.synchronize { + @mutex.synchronize do if @que.length < @max - begin - t = @queue_wait.shift - t.wakeup if t - rescue ThreadError - retry - end + @enque_cond.signal end - } + end retval end @@ -370,7 +355,7 @@ class SizedQueue < Queue # Returns the number of threads waiting on the queue. # def num_waiting - @waiting.size + @queue_wait.size + @num_waiting + @num_enqueue_waiting end end -- cgit v1.2.3