summaryrefslogtreecommitdiff
path: root/lib/thread.rb
blob: 709aa83c25e542cd87eabe3413578799a0e0a2a5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
#
#		thread.rb - thread support classes
#			$Date$
#			by Yukihiro Matsumoto <matz@caelum.co.jp>
#

# Refuse to load on an interpreter that does not define the Thread class.
raise "Thread not available for this ruby interpreter" unless defined?(Thread)

# Fallback definition, used only when the interpreter has not already
# defined ThreadError.
class ThreadError < Exception; end unless defined?(ThreadError)

# When the interpreter runs in debug mode ($DEBUG), turn on
# Thread.abort_on_exception so a failure inside any thread is not silent.
Thread.abort_on_exception = true if $DEBUG

# Mutex - a mutual-exclusion lock built on Thread.critical.
#
# A thread calling #lock while the mutex is held appends itself to a waiting
# list and stops; #unlock releases the mutex and wakes the first waiter that
# is still alive.
class Mutex
  # Creates an unlocked mutex with an empty waiting list.
  def initialize
    @waiting = []
    @locked = false
  end

  # Returns true while the mutex is held, false otherwise.
  def locked?
    @locked
  end

  # Attempts to take the mutex without stopping the caller.
  # Returns true if the mutex was acquired, false if it was already held.
  def try_lock
    result = false
    Thread.critical = true
    unless @locked
      @locked = true
      result = true
    end
    Thread.critical = false
    result
  end

  # Takes the mutex, stopping the calling thread for as long as it is held
  # by someone else.  Returns self.
  def lock
    # The critical flag is set again on every pass, i.e. each time the thread
    # resumes after Thread.stop, before @locked is re-examined.
    while (Thread.critical = true; @locked)
      @waiting.push Thread.current
      Thread.stop
    end
    @locked = true
    Thread.critical = false
    self
  end

  # Releases the mutex and wakes the first live waiting thread, if any.
  # Returns self, or nil when the mutex was not locked.
  def unlock
    return unless @locked
    Thread.critical = true
    @locked = false
    # Skip waiters that died while queued: Thread#run on a dead thread raises
    # ThreadError, which would abort the unlock and leave the live waiters
    # queued behind it asleep.
    t = @waiting.shift
    t = @waiting.shift while t && !t.alive?
    Thread.critical = false
    begin
      t.run if t
    rescue ThreadError
      # The waiter died after it was selected; there is nothing to wake.
    end
    self
  end

  # Runs the given block while holding the mutex and returns the block's
  # value.  The mutex is released even if the block raises.
  def synchronize
    lock
    begin
      yield
    ensure
      unlock
    end
  end
end

# ConditionVariable - lets threads stop until another thread signals them.
#
# Waiting threads are kept in @waiters, which is guarded by its own Mutex.
class ConditionVariable
  # Creates a condition variable with no waiting threads.
  def initialize
    @waiters = []
    @waiters_mutex = Mutex.new
  end

  # Releases +mutex+, registers the calling thread as a waiter and stops it.
  # Once the thread resumes, +mutex+ is locked again.  Returns whatever
  # mutex.lock returns.
  def wait(mutex)
    mutex.unlock
    begin
      @waiters_mutex.synchronize {
        @waiters.push(Thread.current)
      }
      Thread.stop
    ensure
      # Re-acquire the mutex even when the wait is interrupted by an
      # exception: the caller (typically inside mutex.synchronize) will unlock
      # it afterwards and must actually own it at that point.
      relocked = mutex.lock
    end
    relocked
  end

  # Wakes the first waiting thread that is still alive.
  # Returns the result of Thread#run for that thread, or nil if none was woken.
  def signal
    @waiters_mutex.synchronize {
      woken = nil
      while (t = @waiters.shift)
        begin
          woken = t.run
          break
        rescue ThreadError
          # This waiter has died; hand the signal to the next one.
        end
      end
      woken
    }
  end

  # Wakes every waiting thread and empties the waiter list.
  # Returns the (now empty) waiter list.
  def broadcast
    @waiters_mutex.synchronize {
      @waiters.each do |t|
        begin
          t.run
        rescue ThreadError
          # A dead waiter must not keep the remaining ones from being woken.
        end
      end
      @waiters.clear
    }
  end
end

# Queue - a first-in first-out queue for passing objects between threads.
#
# Threads that #pop from an empty queue are recorded in @waiting and stopped
# until a #push wakes them.
class Queue
  # Creates an empty queue with no waiting threads.
  def initialize
    @que = []
    @waiting = []
  end

  # Appends +obj+ to the queue and wakes the first live thread waiting in
  # #pop, if any.  Returns the result of Thread#run for that thread, or nil.
  def push(obj)
    Thread.critical = true
    @que.push obj
    # Skip consumers that died while waiting: Thread#run on a dead thread
    # raises ThreadError, which would fail the push and leave live consumers
    # asleep although an element is available.
    t = @waiting.shift
    t = @waiting.shift while t && !t.alive?
    Thread.critical = false
    begin
      t.run if t
    rescue ThreadError
      # The consumer died after it was selected; there is nothing to wake.
      nil
    end
  end

  # Removes and returns the oldest element.
  #
  # When the queue is empty the calling thread stops until an element is
  # pushed, unless +non_block+ is true, in which case ThreadError is raised.
  def pop(non_block = false)
    Thread.critical = true
    begin
      loop do
        if @que.empty?
          raise ThreadError, "queue empty" if non_block
          @waiting.push Thread.current
          Thread.stop
          # Thread.stop resets the critical flag; set it again so the
          # re-check and the shift below run inside the critical section.
          Thread.critical = true
        else
          return @que.shift
        end
      end
    ensure
      Thread.critical = false
    end
  end

  # Returns true when the queue holds no elements.
  def empty?
    @que.empty?
  end

  # Returns the number of elements currently in the queue.
  def length
    @que.length
  end
  alias size length

  # Returns the number of threads recorded as waiting in #pop.
  def num_waiting
    @waiting.size
  end
end

# SizedQueue - a Queue that holds at most #max elements.
#
# Threads that #push onto a full queue are recorded in @queue_wait and
# stopped until a #pop or a larger #max= makes room.
class SizedQueue<Queue
  # Creates an empty queue limited to +max+ elements.
  def initialize(max)
    @max = max
    @queue_wait = []
    super()
  end

  # The current capacity limit.
  attr_reader :max

  # Sets the capacity limit to +max+.  When the limit grows, one blocked
  # producer is woken for every added slot.  Returns +max+.
  def max=(max)
    Thread.critical = true
    added = max > @max ? max - @max : 0
    @max = max
    Thread.critical = false
    added.times { wake_next_producer }
    max
  end

  # Appends +obj+, stopping the calling thread while the queue is full.
  def push(obj)
    Thread.critical = true
    while @que.length >= @max
      @queue_wait.push Thread.current
      Thread.stop
      # Set the critical flag again before re-checking the length.
      Thread.critical = true
    end
    super
  end

  # Removes and returns the oldest element (arguments as for Queue#pop),
  # then wakes one blocked producer if the queue now has room.
  def pop(*args)
    retval = super
    # The room check must follow the removal: testing before it meant that a
    # pop from a full queue never woke the producer blocked on that queue.
    wake_next_producer if @que.length < @max
    retval
  end

  # Returns the number of threads recorded as waiting in #pop or #push.
  def num_waiting
    @waiting.size + @queue_wait.size
  end

  private

  # Removes the first live thread blocked in #push from @queue_wait and
  # wakes it.  Does nothing when no such thread exists.
  def wake_next_producer
    Thread.critical = true
    # Discard producers that died while blocked; Thread#run on a dead thread
    # raises ThreadError.
    t = @queue_wait.shift
    t = @queue_wait.shift while t && !t.alive?
    Thread.critical = false
    begin
      t.run if t
    rescue ThreadError
      # The producer died after it was selected; there is nothing to wake.
    end
  end
end