summaryrefslogtreecommitdiff
path: root/lib/thread.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/thread.rb')
-rw-r--r--lib/thread.rb163
1 files changed, 131 insertions, 32 deletions
diff --git a/lib/thread.rb b/lib/thread.rb
index 4f294cc..ec75144 100644
--- a/lib/thread.rb
+++ b/lib/thread.rb
@@ -9,14 +9,20 @@ unless defined? Thread
end
unless defined? ThreadError
- class ThreadError<Exception
+ class ThreadError<StandardError
end
end
+if $DEBUG
+ Thread.abort_on_exception = true
+end
+
class Mutex
def initialize
@waiting = []
- @locked = FALSE;
+ @locked = false;
+ @waiting.taint # enable tainted comunication
+ self.taint
end
def locked?
@@ -24,42 +30,39 @@ class Mutex
end
def try_lock
- result = FALSE
- Thread.critical = TRUE
+ result = false
+ Thread.critical = true
unless @locked
- @locked = TRUE
- result = TRUE
+ @locked = true
+ result = true
end
- Thread.critical = FALSE
+ Thread.critical = false
result
end
def lock
- while (Thread.critical = TRUE; @locked)
+ while (Thread.critical = true; @locked)
@waiting.push Thread.current
Thread.stop
end
- @locked = TRUE
- Thread.critical = FALSE
+ @locked = true
+ Thread.critical = false
self
end
def unlock
return unless @locked
Thread.critical = TRUE
- wait = @waiting
- @waiting = []
+ t = @waiting.shift
@locked = FALSE
Thread.critical = FALSE
- for w in wait
- w.run
- end
+ t.run if t
self
end
def synchronize
+ lock
begin
- lock
yield
ensure
unlock
@@ -67,37 +70,74 @@ class Mutex
end
end
+class ConditionVariable
+ def initialize
+ @waiters = []
+ @waiters_mutex = Mutex.new
+ @waiters.taint # enable tainted comunication
+ self.taint
+ end
+
+ def wait(mutex)
+ mutex.unlock
+ @waiters_mutex.synchronize {
+ @waiters.push(Thread.current)
+ }
+ Thread.stop
+ mutex.lock
+ end
+
+ def signal
+ @waiters_mutex.synchronize {
+ t = @waiters.shift
+ t.run if t
+ }
+ end
+
+ def broadcast
+ @waiters_mutex.synchronize {
+ for t in @waiters
+ t.run
+ end
+ @waiters.clear
+ }
+ end
+end
+
class Queue
def initialize
@que = []
@waiting = []
+ @que.taint # enable tainted comunication
+ @waiting.taint
+ self.taint
end
def push(obj)
- Thread.critical = TRUE
+ Thread.critical = true
@que.push obj
t = @waiting.shift
- Thread.critical = FALSE
+ Thread.critical = false
t.run if t
end
- def pop non_block=FALSE
- item = nil
- until item
- Thread.critical = TRUE
- if @que.length == 0
- if non_block
- Thread.critical = FALSE
- raise ThreadError, "queue empty"
+ def pop non_block=false
+ Thread.critical = true
+ begin
+ loop do
+ if @que.length == 0
+ if non_block
+ raise ThreadError, "queue empty"
+ end
+ @waiting.push Thread.current
+ Thread.stop
+ else
+ return @que.shift
end
- @waiting.push Thread.current
- Thread.stop
- else
- item = @que.shift
end
+ ensure
+ Thread.critical = false
end
- Thread.critical = FALSE
- item
end
def empty?
@@ -107,4 +147,63 @@ class Queue
def length
@que.length
end
+ alias size length
+
+
+ def num_waiting
+ @waiting.size
+ end
+end
+
+class SizedQueue<Queue
+ def initialize(max)
+ @max = max
+ @queue_wait = []
+ @queue_wait.taint # enable tainted comunication
+ super()
+ end
+
+ def max
+ @max
+ end
+
+ def max=(max)
+ Thread.critical = TRUE
+ if @max >= max
+ @max = max
+ Thread.critical = FALSE
+ else
+ diff = max - @max
+ @max = max
+ Thread.critical = FALSE
+ diff.times do
+ t = @queue_wait.shift
+ t.run if t
+ end
+ end
+ max
+ end
+
+ def push(obj)
+ Thread.critical = true
+ while @que.length >= @max
+ @queue_wait.push Thread.current
+ Thread.stop
+ Thread.critical = true
+ end
+ super
+ end
+
+ def pop(*args)
+ Thread.critical = true
+ if @que.length < @max
+ t = @queue_wait.shift
+ t.run if t
+ end
+ super
+ end
+
+ def num_waiting
+ @waiting.size + @queue_wait.size
+ end
end