summaryrefslogtreecommitdiff
path: root/lib/thread.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/thread.rb')
-rw-r--r--lib/thread.rb296
1 files changed, 87 insertions, 209 deletions
diff --git a/lib/thread.rb b/lib/thread.rb
index 7df6a140f5..f3831a7425 100644
--- a/lib/thread.rb
+++ b/lib/thread.rb
@@ -1,10 +1,5 @@
#
-# NOTE:
-# This file is overwritten by ext/thread/lib/thread.rb unless ruby
-# is configured with --disable-fastthread.
-#
# thread.rb - thread support classes
-# $Date$
# by Yukihiro Matsumoto <matz@netlab.co.jp>
#
# Copyright (C) 2001 Yukihiro Matsumoto
@@ -13,147 +8,19 @@
#
unless defined? Thread
- fail "Thread not available for this ruby interpreter"
+ raise "Thread not available for this ruby interpreter"
end
-class Thread
- #
- # Wraps a block in Thread.critical, restoring the original value upon exit
- # from the critical section.
- #
- def Thread.exclusive
- _old = Thread.critical
- begin
- Thread.critical = true
- return yield
- ensure
- Thread.critical = _old
- end
+unless defined? ThreadError
+ class ThreadError < StandardError
end
end
-#
-# Mutex implements a simple semaphore that can be used to coordinate access to
-# shared data from multiple concurrent threads.
-#
-# Example:
-#
-# require 'thread'
-# semaphore = Mutex.new
-#
-# a = Thread.new {
-# semaphore.synchronize {
-# # access shared resource
-# }
-# }
-#
-# b = Thread.new {
-# semaphore.synchronize {
-# # access shared resource
-# }
-# }
-#
-class Mutex
- #
- # Creates a new Mutex
- #
- def initialize
- @waiting = []
- @locked = false;
- @waiting.taint # enable tainted comunication
- self.taint
- end
-
- #
- # Returns +true+ if this lock is currently held by some thread.
- #
- def locked?
- @locked
- end
-
- #
- # Attempts to obtain the lock and returns immediately. Returns +true+ if the
- # lock was granted.
- #
- def try_lock
- result = false
- Thread.critical = true
- unless @locked
- @locked = true
- result = true
- end
- Thread.critical = false
- result
- end
-
- #
- # Attempts to grab the lock and waits if it isn't available.
- #
- def lock
- while (Thread.critical = true; @locked)
- @waiting.push Thread.current
- Thread.stop
- end
- @locked = true
- Thread.critical = false
- self
- end
-
- #
- # Releases the lock. Returns +nil+ if ref wasn't locked.
- #
- def unlock
- return unless @locked
- Thread.critical = true
- @locked = false
- begin
- t = @waiting.shift
- t.wakeup if t
- rescue ThreadError
- retry
- end
- Thread.critical = false
- begin
- t.run if t
- rescue ThreadError
- end
- self
- end
-
- #
- # Obtains a lock, runs the block, and releases the lock when the block
- # completes. See the example under Mutex.
- #
- def synchronize
- lock
- begin
- yield
- ensure
- unlock
- end
- end
-
- #
- # If the mutex is locked, unlocks the mutex, wakes one waiting thread, and
- # yields in a critical section.
- #
- def exclusive_unlock
- return unless @locked
- Thread.exclusive do
- @locked = false
- begin
- t = @waiting.shift
- t.wakeup if t
- rescue ThreadError
- retry
- end
- yield
- end
- self
- end
+if $DEBUG
+ Thread.abort_on_exception = true
end
-#
+#
# ConditionVariable objects augment class Mutex. Using condition variables,
# it is possible to suspend while in the middle of a critical section until a
# resource becomes available.
@@ -164,7 +31,7 @@ end
#
# mutex = Mutex.new
# resource = ConditionVariable.new
-#
+#
# a = Thread.new {
# mutex.synchronize {
# # Thread 'a' now needs the resource
@@ -172,7 +39,7 @@ end
# # 'a' can now have the resource
# }
# }
-#
+#
# b = Thread.new {
# mutex.synchronize {
# # Thread 'b' has finished using the resource
@@ -186,49 +53,56 @@ class ConditionVariable
#
def initialize
@waiters = []
+ @waiters_mutex = Mutex.new
end
-
+
#
# Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
#
- def wait(mutex)
+ # If +timeout+ is given, this method returns after +timeout+ seconds passed,
+ # even if no other thread doesn't signal.
+ #
+ def wait(mutex, timeout=nil)
begin
- mutex.exclusive_unlock do
+ # TODO: mutex should not be used
+ @waiters_mutex.synchronize do
@waiters.push(Thread.current)
- Thread.stop
end
- ensure
- mutex.lock
+ mutex.sleep timeout
end
+ self
end
-
+
#
# Wakes up the first thread in line waiting for this lock.
#
def signal
begin
- t = @waiters.shift
+ t = @waiters_mutex.synchronize { @waiters.shift }
t.run if t
rescue ThreadError
retry
end
+ self
end
-
+
#
# Wakes up all threads waiting for this lock.
#
def broadcast
+ # TODO: imcomplete
waiters0 = nil
- Thread.exclusive do
+ @waiters_mutex.synchronize do
waiters0 = @waiters.dup
@waiters.clear
end
for t in waiters0
begin
- t.run
+ t.run
rescue ThreadError
end
end
+ self
end
end
@@ -238,9 +112,9 @@ end
# Example:
#
# require 'thread'
-#
+#
# queue = Queue.new
-#
+#
# producer = Thread.new do
# 5.times do |i|
# sleep rand(i) # simulate expense
@@ -248,7 +122,7 @@ end
# puts "#{i} produced"
# end
# end
-#
+#
# consumer = Thread.new do
# 5.times do |i|
# value = queue.pop
@@ -256,7 +130,7 @@ end
# puts "consumed #{value}"
# end
# end
-#
+#
# consumer.join
#
class Queue
@@ -269,26 +143,22 @@ class Queue
@que.taint # enable tainted comunication
@waiting.taint
self.taint
+ @mutex = Mutex.new
end
#
# Pushes +obj+ to the queue.
#
def push(obj)
- Thread.critical = true
- @que.push obj
- begin
- t = @waiting.shift
- t.wakeup if t
- rescue ThreadError
- retry
- ensure
- Thread.critical = false
- end
- begin
- t.run if t
- rescue ThreadError
- end
+ @mutex.synchronize{
+ @que.push obj
+ begin
+ t = @waiting.shift
+ t.wakeup if t
+ rescue ThreadError
+ retry
+ end
+ }
end
#
@@ -307,14 +177,17 @@ class Queue
# thread isn't suspended, and an exception is raised.
#
def pop(non_block=false)
- while (Thread.critical = true; @que.empty?)
- raise ThreadError, "queue empty" if non_block
- @waiting.push Thread.current
- Thread.stop
- end
- @que.shift
- ensure
- Thread.critical = false
+ @mutex.synchronize{
+ while true
+ if @que.empty?
+ raise ThreadError, "queue empty" if non_block
+ @waiting.push Thread.current
+ @mutex.sleep
+ else
+ return @que.shift
+ end
+ end
+ }
end
#
@@ -328,7 +201,7 @@ class Queue
alias deq pop
#
- # Returns +true+ is the queue is empty.
+ # Returns +true+ if the queue is empty.
#
def empty?
@que.empty?
@@ -367,7 +240,7 @@ end
#
# See Queue for an example of how a SizedQueue works.
#
-class SizedQueue<Queue
+class SizedQueue < Queue
#
# Creates a fixed-length queue with a maximum size of +max+.
#
@@ -390,14 +263,16 @@ class SizedQueue<Queue
# Sets the maximum size of the queue.
#
def max=(max)
- Thread.critical = true
- if max <= @max
- @max = max
- Thread.critical = false
- else
- diff = max - @max
- @max = max
- Thread.critical = false
+ diff = nil
+ @mutex.synchronize {
+ if max <= @max
+ @max = max
+ else
+ diff = max - @max
+ @max = max
+ end
+ }
+ if diff
diff.times do
begin
t = @queue_wait.shift
@@ -415,13 +290,21 @@ class SizedQueue<Queue
# until space becomes available.
#
def push(obj)
- Thread.critical = true
- while @que.length >= @max
- @queue_wait.push Thread.current
- Thread.stop
- Thread.critical = true
- end
- super
+ @mutex.synchronize{
+ while true
+ break if @que.length < @max
+ @queue_wait.push Thread.current
+ @mutex.sleep
+ end
+
+ @que.push obj
+ begin
+ t = @waiting.shift
+ t.wakeup if t
+ rescue ThreadError
+ retry
+ end
+ }
end
#
@@ -439,21 +322,16 @@ class SizedQueue<Queue
#
def pop(*args)
retval = super
- Thread.critical = true
- if @que.length < @max
- begin
- t = @queue_wait.shift
- t.wakeup if t
- rescue ThreadError
- retry
- ensure
- Thread.critical = false
- end
- begin
- t.run if t
- rescue ThreadError
+ @mutex.synchronize {
+ if @que.length < @max
+ begin
+ t = @queue_wait.shift
+ t.wakeup if t
+ rescue ThreadError
+ retry
+ end
end
- end
+ }
retval
end