diff options
author | yugui <yugui@b2dd03c8-39d4-4d8f-98ff-823fe69b080e> | 2008-08-25 15:02:05 +0000 |
---|---|---|
committer | yugui <yugui@b2dd03c8-39d4-4d8f-98ff-823fe69b080e> | 2008-08-25 15:02:05 +0000 |
commit | 0dc342de848a642ecce8db697b8fecd83a63e117 (patch) | |
tree | 2b7ed4724aff1f86073e4740134bda9c4aac1a39 /trunk/lib/thread.rb | |
parent | ef70cf7138ab8034b5b806f466e4b484b24f0f88 (diff) |
added tag v1_9_0_4
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/tags/v1_9_0_4@18845 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
Diffstat (limited to 'trunk/lib/thread.rb')
-rw-r--r-- | trunk/lib/thread.rb | 367 |
1 files changed, 367 insertions, 0 deletions
diff --git a/trunk/lib/thread.rb b/trunk/lib/thread.rb new file mode 100644 index 0000000000..68eeaf5b05 --- /dev/null +++ b/trunk/lib/thread.rb @@ -0,0 +1,367 @@ +# +# thread.rb - thread support classes +# by Yukihiro Matsumoto <matz@netlab.co.jp> +# +# Copyright (C) 2001 Yukihiro Matsumoto +# Copyright (C) 2000 Network Applied Communication Laboratory, Inc. +# Copyright (C) 2000 Information-technology Promotion Agency, Japan +# + +unless defined? Thread + raise "Thread not available for this ruby interpreter" +end + +unless defined? ThreadError + class ThreadError < StandardError + end +end + +if $DEBUG + Thread.abort_on_exception = true +end + +# +# ConditionVariable objects augment class Mutex. Using condition variables, +# it is possible to suspend while in the middle of a critical section until a +# resource becomes available. +# +# Example: +# +# require 'thread' +# +# mutex = Mutex.new +# resource = ConditionVariable.new +# +# a = Thread.new { +# mutex.synchronize { +# # Thread 'a' now needs the resource +# resource.wait(mutex) +# # 'a' can now have the resource +# } +# } +# +# b = Thread.new { +# mutex.synchronize { +# # Thread 'b' has finished using the resource +# resource.signal +# } +# } +# +class ConditionVariable + # + # Creates a new ConditionVariable + # + def initialize + @waiters = [] + @waiters_mutex = Mutex.new + end + + # + # Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup. + # + def wait(mutex) + begin + # TODO: mutex should not be used + @waiters_mutex.synchronize do + @waiters.push(Thread.current) + end + mutex.sleep + end + end + + # + # Wakes up the first thread in line waiting for this lock. + # + def signal + begin + t = @waiters_mutex.synchronize { @waiters.shift } + t.run if t + rescue ThreadError + retry + end + end + + # + # Wakes up all threads waiting for this lock. + # + def broadcast + # TODO: imcomplete + waiters0 = nil + @waiters_mutex.synchronize do + waiters0 = @waiters.dup + @waiters.clear + end + for t in waiters0 + begin + t.run + rescue ThreadError + end + end + end +end + +# +# This class provides a way to synchronize communication between threads. +# +# Example: +# +# require 'thread' +# +# queue = Queue.new +# +# producer = Thread.new do +# 5.times do |i| +# sleep rand(i) # simulate expense +# queue << i +# puts "#{i} produced" +# end +# end +# +# consumer = Thread.new do +# 5.times do |i| +# value = queue.pop +# sleep rand(i/2) # simulate expense +# puts "consumed #{value}" +# end +# end +# +# consumer.join +# +class Queue + # + # Creates a new queue. + # + def initialize + @que = [] + @waiting = [] + @que.taint # enable tainted comunication + @waiting.taint + self.taint + @mutex = Mutex.new + end + + # + # Pushes +obj+ to the queue. + # + def push(obj) + t = nil + @mutex.synchronize{ + @que.push obj + begin + t = @waiting.shift + t.wakeup if t + rescue ThreadError + retry + end + } + begin + t.run if t + rescue ThreadError + end + end + + # + # Alias of push + # + alias << push + + # + # Alias of push + # + alias enq push + + # + # Retrieves data from the queue. If the queue is empty, the calling thread is + # suspended until data is pushed onto the queue. If +non_block+ is true, the + # thread isn't suspended, and an exception is raised. + # + def pop(non_block=false) + while true + @mutex.synchronize{ + if @que.empty? + raise ThreadError, "queue empty" if non_block + @waiting.push Thread.current + @mutex.sleep + else + return @que.shift + end + } + end + end + + # + # Alias of pop + # + alias shift pop + + # + # Alias of pop + # + alias deq pop + + # + # Returns +true+ if the queue is empty. + # + def empty? + @que.empty? + end + + # + # Removes all objects from the queue. + # + def clear + @que.clear + end + + # + # Returns the length of the queue. + # + def length + @que.length + end + + # + # Alias of length. + # + alias size length + + # + # Returns the number of threads waiting on the queue. + # + def num_waiting + @waiting.size + end +end + +# +# This class represents queues of specified size capacity. The push operation +# may be blocked if the capacity is full. +# +# See Queue for an example of how a SizedQueue works. +# +class SizedQueue < Queue + # + # Creates a fixed-length queue with a maximum size of +max+. + # + def initialize(max) + raise ArgumentError, "queue size must be positive" unless max > 0 + @max = max + @queue_wait = [] + @queue_wait.taint # enable tainted comunication + super() + end + + # + # Returns the maximum size of the queue. + # + def max + @max + end + + # + # Sets the maximum size of the queue. + # + def max=(max) + diff = nil + @mutex.synchronize { + if max <= @max + @max = max + else + diff = max - @max + @max = max + end + } + if diff + diff.times do + begin + t = @queue_wait.shift + t.run if t + rescue ThreadError + retry + end + end + end + max + end + + # + # Pushes +obj+ to the queue. If there is no space left in the queue, waits + # until space becomes available. + # + def push(obj) + t = nil + @mutex.synchronize{ + while true + break if @que.length <= @max + @queue_wait.push Thread.current + @mutex.sleep + end + + @que.push obj + begin + t = @waiting.shift + t.wakeup if t + rescue ThreadError + retry + end + } + + begin + t.run if t + rescue ThreadError + end + end + + # + # Alias of push + # + alias << push + + # + # Alias of push + # + alias enq push + + # + # Retrieves data from the queue and runs a waiting thread, if any. + # + def pop(*args) + retval = super + t = nil + @mutex.synchronize { + if @que.length < @max + begin + t = @queue_wait.shift + t.wakeup if t + rescue ThreadError + retry + end + end + } + begin + t.run if t + rescue ThreadError + end + retval + end + + # + # Alias of pop + # + alias shift pop + + # + # Alias of pop + # + alias deq pop + + # + # Returns the number of threads waiting on the queue. + # + def num_waiting + @waiting.size + @queue_wait.size + end +end + +# Documentation comments: +# - How do you make RDoc inherit documentation from superclass? |