diff options
Diffstat (limited to 'lib/thread.rb')
| -rw-r--r-- | lib/thread.rb | 296 |
1 files changed, 87 insertions, 209 deletions
diff --git a/lib/thread.rb b/lib/thread.rb index 7df6a140f5..f3831a7425 100644 --- a/lib/thread.rb +++ b/lib/thread.rb @@ -1,10 +1,5 @@ # -# NOTE: -# This file is overwritten by ext/thread/lib/thread.rb unless ruby -# is configured with --disable-fastthread. -# # thread.rb - thread support classes -# $Date$ # by Yukihiro Matsumoto <matz@netlab.co.jp> # # Copyright (C) 2001 Yukihiro Matsumoto @@ -13,147 +8,19 @@ # unless defined? Thread - fail "Thread not available for this ruby interpreter" + raise "Thread not available for this ruby interpreter" end -class Thread - # - # Wraps a block in Thread.critical, restoring the original value upon exit - # from the critical section. - # - def Thread.exclusive - _old = Thread.critical - begin - Thread.critical = true - return yield - ensure - Thread.critical = _old - end +unless defined? ThreadError + class ThreadError < StandardError end end -# -# Mutex implements a simple semaphore that can be used to coordinate access to -# shared data from multiple concurrent threads. -# -# Example: -# -# require 'thread' -# semaphore = Mutex.new -# -# a = Thread.new { -# semaphore.synchronize { -# # access shared resource -# } -# } -# -# b = Thread.new { -# semaphore.synchronize { -# # access shared resource -# } -# } -# -class Mutex - # - # Creates a new Mutex - # - def initialize - @waiting = [] - @locked = false; - @waiting.taint # enable tainted comunication - self.taint - end - - # - # Returns +true+ if this lock is currently held by some thread. - # - def locked? - @locked - end - - # - # Attempts to obtain the lock and returns immediately. Returns +true+ if the - # lock was granted. - # - def try_lock - result = false - Thread.critical = true - unless @locked - @locked = true - result = true - end - Thread.critical = false - result - end - - # - # Attempts to grab the lock and waits if it isn't available. - # - def lock - while (Thread.critical = true; @locked) - @waiting.push Thread.current - Thread.stop - end - @locked = true - Thread.critical = false - self - end - - # - # Releases the lock. Returns +nil+ if ref wasn't locked. - # - def unlock - return unless @locked - Thread.critical = true - @locked = false - begin - t = @waiting.shift - t.wakeup if t - rescue ThreadError - retry - end - Thread.critical = false - begin - t.run if t - rescue ThreadError - end - self - end - - # - # Obtains a lock, runs the block, and releases the lock when the block - # completes. See the example under Mutex. - # - def synchronize - lock - begin - yield - ensure - unlock - end - end - - # - # If the mutex is locked, unlocks the mutex, wakes one waiting thread, and - # yields in a critical section. - # - def exclusive_unlock - return unless @locked - Thread.exclusive do - @locked = false - begin - t = @waiting.shift - t.wakeup if t - rescue ThreadError - retry - end - yield - end - self - end +if $DEBUG + Thread.abort_on_exception = true end -# +# # ConditionVariable objects augment class Mutex. Using condition variables, # it is possible to suspend while in the middle of a critical section until a # resource becomes available. @@ -164,7 +31,7 @@ end # # mutex = Mutex.new # resource = ConditionVariable.new -# +# # a = Thread.new { # mutex.synchronize { # # Thread 'a' now needs the resource @@ -172,7 +39,7 @@ end # # 'a' can now have the resource # } # } -# +# # b = Thread.new { # mutex.synchronize { # # Thread 'b' has finished using the resource @@ -186,49 +53,56 @@ class ConditionVariable # def initialize @waiters = [] + @waiters_mutex = Mutex.new end - + # # Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup. # - def wait(mutex) + # If +timeout+ is given, this method returns after +timeout+ seconds passed, + # even if no other thread doesn't signal. + # + def wait(mutex, timeout=nil) begin - mutex.exclusive_unlock do + # TODO: mutex should not be used + @waiters_mutex.synchronize do @waiters.push(Thread.current) - Thread.stop end - ensure - mutex.lock + mutex.sleep timeout end + self end - + # # Wakes up the first thread in line waiting for this lock. # def signal begin - t = @waiters.shift + t = @waiters_mutex.synchronize { @waiters.shift } t.run if t rescue ThreadError retry end + self end - + # # Wakes up all threads waiting for this lock. # def broadcast + # TODO: imcomplete waiters0 = nil - Thread.exclusive do + @waiters_mutex.synchronize do waiters0 = @waiters.dup @waiters.clear end for t in waiters0 begin - t.run + t.run rescue ThreadError end end + self end end @@ -238,9 +112,9 @@ end # Example: # # require 'thread' -# +# # queue = Queue.new -# +# # producer = Thread.new do # 5.times do |i| # sleep rand(i) # simulate expense @@ -248,7 +122,7 @@ end # puts "#{i} produced" # end # end -# +# # consumer = Thread.new do # 5.times do |i| # value = queue.pop @@ -256,7 +130,7 @@ end # puts "consumed #{value}" # end # end -# +# # consumer.join # class Queue @@ -269,26 +143,22 @@ class Queue @que.taint # enable tainted comunication @waiting.taint self.taint + @mutex = Mutex.new end # # Pushes +obj+ to the queue. # def push(obj) - Thread.critical = true - @que.push obj - begin - t = @waiting.shift - t.wakeup if t - rescue ThreadError - retry - ensure - Thread.critical = false - end - begin - t.run if t - rescue ThreadError - end + @mutex.synchronize{ + @que.push obj + begin + t = @waiting.shift + t.wakeup if t + rescue ThreadError + retry + end + } end # @@ -307,14 +177,17 @@ class Queue # thread isn't suspended, and an exception is raised. # def pop(non_block=false) - while (Thread.critical = true; @que.empty?) - raise ThreadError, "queue empty" if non_block - @waiting.push Thread.current - Thread.stop - end - @que.shift - ensure - Thread.critical = false + @mutex.synchronize{ + while true + if @que.empty? + raise ThreadError, "queue empty" if non_block + @waiting.push Thread.current + @mutex.sleep + else + return @que.shift + end + end + } end # @@ -328,7 +201,7 @@ class Queue alias deq pop # - # Returns +true+ is the queue is empty. + # Returns +true+ if the queue is empty. # def empty? @que.empty? @@ -367,7 +240,7 @@ end # # See Queue for an example of how a SizedQueue works. # -class SizedQueue<Queue +class SizedQueue < Queue # # Creates a fixed-length queue with a maximum size of +max+. # @@ -390,14 +263,16 @@ class SizedQueue<Queue # Sets the maximum size of the queue. # def max=(max) - Thread.critical = true - if max <= @max - @max = max - Thread.critical = false - else - diff = max - @max - @max = max - Thread.critical = false + diff = nil + @mutex.synchronize { + if max <= @max + @max = max + else + diff = max - @max + @max = max + end + } + if diff diff.times do begin t = @queue_wait.shift @@ -415,13 +290,21 @@ class SizedQueue<Queue # until space becomes available. # def push(obj) - Thread.critical = true - while @que.length >= @max - @queue_wait.push Thread.current - Thread.stop - Thread.critical = true - end - super + @mutex.synchronize{ + while true + break if @que.length < @max + @queue_wait.push Thread.current + @mutex.sleep + end + + @que.push obj + begin + t = @waiting.shift + t.wakeup if t + rescue ThreadError + retry + end + } end # @@ -439,21 +322,16 @@ class SizedQueue<Queue # def pop(*args) retval = super - Thread.critical = true - if @que.length < @max - begin - t = @queue_wait.shift - t.wakeup if t - rescue ThreadError - retry - ensure - Thread.critical = false - end - begin - t.run if t - rescue ThreadError + @mutex.synchronize { + if @que.length < @max + begin + t = @queue_wait.shift + t.wakeup if t + rescue ThreadError + retry + end end - end + } retval end |
