summaryrefslogtreecommitdiff
path: root/lib/thwait.rb
blob: d3b329911ffd51da4dad74d0fed0f87fc422dacd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#
#   thwait.rb - thread synchronization class
#   	$Release Version: 0.9 $
#   	$Revision: 1.3 $
#   	$Date: 1998/06/26 03:19:34 $
#   	by Keiju ISHITSUKA(Nihpon Rational Software Co.,Ltd.)
#
# --
#  feature:
#  provides synchronization for multiple threads.
#
#  class methods:
#  * ThreadsWait.all_waits(thread1,...)
#    waits until all of specified threads are terminated.
#    if a block is supplied for the method, evaluates it for
#    each thread termination.
#  * th = ThreadsWait.new(thread1,...)
#    creates synchronization object, specifying thread(s) to wait.
#  
#  methods:
#  * th.threads
#    list threads to be synchronized
#  * th.empty?
#    is there any thread to be synchronized.
#  * th.finished?
#    is there already terminated thread.
#  * th.join(thread1,...) 
#    wait for specified thread(s).
#  * th.join_nowait(threa1,...)
#    specifies thread(s) to wait.  non-blocking.
#  * th.next_wait
#    waits until any of specified threads is terminated.
#  * th.all_waits
#    waits until all of specified threads are terminated.
#    if a block is supplied for the method, evaluates it for
#    each thread termination.
#

require "thread.rb"
require "e2mmap.rb"

class ThreadsWait
  RCS_ID='-$Id: thwait.rb,v 1.3 1998/06/26 03:19:34 keiju Exp keiju $-'
  
  Exception2MessageMapper.extend_to(binding)
  def_exception("ErrNoWaitingThread", "No threads for waiting.")
  def_exception("ErrNoFinshedThread", "No finished threads.")
  
  def ThreadsWait.all_waits(*threads)
    tw = ThreadsWait.new(*threads)
    if iterator?
      tw.all_waits do
	|th|
	yield th
      end
    else
      tw.all_waits
    end
  end
  
  def initialize(*threads)
    @threads = []
    @wait_queue = Queue.new
    join_nowait(*threads) unless threads.empty?
  end
  
  # accessing
  #	threads - list threads to be synchronized
  attr :threads
  
  # testing
  #	empty?
  #	finished?

  # is there any thread to be synchronized.
  def empty?
    @threads.empty?
  end
  
  # is there already terminated thread.
  def finished?
    !@wait_queue.empty?
  end
  
  # main process:
  #	join
  #	join_nowait
  #	next_wait
  #	all_wait
  
  # adds thread(s) to join,  waits for any of waiting threads to terminate.
  def join(*threads)
    join_nowait(*threads)
    next_wait
  end
  
  # adds thread(s) to join, no wait.
  def join_nowait(*threads)
    @threads.concat threads
    for th in threads
      Thread.start do
	th = Thread.join(th)
	@wait_queue.push th
      end
    end
  end
  
  # waits for any of waiting threads to terminate
  # if there is no thread to wait, raises ErrNoWaitingThread.
  # if `nonblock' is true, and there is no terminated thread,
  # raises ErrNoFinishedThread.
  def next_wait(nonblock = nil)
    ThreadsWait.fail ErrNoWaitingThread if @threads.empty?
    begin
      @threads.delete(th = @wait_queue.pop(nonblock))
      th
    rescue ThreadError
      ThreadsWait.fail ErrNoFinshedThread
    end
  end
  
  # waits until all of specified threads are terminated.
  # if a block is supplied for the method, evaluates it for
  # each thread termination.
  def all_waits
    until @threads.empty?
      th = next_wait
      yield th if iterator?
    end
  end
end

ThWait = ThreadsWait