# path: root/sample/drb/old_tuplespace.rb
# blob: 0da9fa84c32306bc3e229480d38fae17085f80f5 (plain)
#!/usr/local/bin/ruby
# TupleSpace
# Copyright (c) 1999-2000 Masatoshi SEKI
# You can redistribute it and/or modify it under the same terms as Ruby.

require 'thread'

# A small in-process tuple space.
#
# Tuples are array-like objects stored in per-length buckets, in insertion
# order.  Writers add tuples with #out; readers remove (#in) or copy out
# (#rd) the oldest tuple matching a template, optionally blocking until one
# arrives.
#
# All access to the store is serialized by a Mutex, and blocked readers wait
# on a ConditionVariable.  (Earlier revisions relied on Thread.critical and
# Object#taint, neither of which exists in current Ruby.)
class TupleSpace
  # A match pattern built from an array.  Entries that are nil or false are
  # wildcards; every other entry is compared to the tuple element at the same
  # position with ===, so classes, ranges and regexps work as patterns.
  class Template
    # list:: the pattern array.
    def initialize(list)
      @list = list
      # Positions that actually constrain the match (non-nil, non-false).
      @check_idx = []
      @list.each_with_index { |x, i| @check_idx.push(i) if x }
      @size = @list.size
    end

    # Number of elements in the pattern.
    attr_reader :size
    alias length size

    # Returns nil when +tuple+ has a different size than the template,
    # otherwise true or false depending on whether every constrained
    # position matches.
    def match(tuple)
      return nil if tuple.size != size
      @check_idx.all? { |i| @list[i] === tuple[i] }
    end
  end

  def initialize
    @que = {}                           # tuple length => array of tuples
    @mutex = Mutex.new                  # guards @que
    @arrived = ConditionVariable.new    # signalled whenever a tuple is added
  end

  # Adds each of +tuples+ to the space and wakes every blocked reader so it
  # can re-check its template.  Returns the array of tuples given.
  def out(*tuples)
    tuples.each do |tuple|
      @mutex.synchronize do
        put_que(tuple)
        @arrived.broadcast
      end
    end
  end
  alias put out
  alias write out

  # Removes and returns the oldest tuple matching +template+.
  #
  # When nothing matches: raises ThreadError ("queue empty") if +non_block+
  # is true, otherwise blocks until a matching tuple is written.
  def in(template, non_block=false)
    fetch(template, non_block, true)
  end
  alias get in
  alias take in

  # Like #in, but leaves the tuple in the space.  The read is done under the
  # lock without removing the tuple, so the tuple keeps its position and is
  # never transiently invisible to other readers.
  def rd(template, non_block=false)
    fetch(template, non_block, false)
  end
  alias read rd

  # Takes a tuple matching +template+ from this space and writes it to
  # +dest+ (anything responding to #out).  If writing to +dest+ raises a
  # StandardError the tuple is put back into this space and the error is
  # deliberately swallowed, so the tuple is not lost.
  # A ThreadError from the take itself (non-blocking, no match) propagates.
  def mv(dest, template, non_block=false)
    tuple = self.in(template, non_block)
    begin
      dest.out(tuple)
    rescue
      self.out(tuple)
    end
  end
  alias move mv

  private

  # Shared implementation of #in and #rd; +remove+ selects which.
  def fetch(template, non_block, remove)
    @mutex.synchronize do
      loop do
        tuple = get_que(template, remove)
        return tuple if tuple
        raise ThreadError, "queue empty" if non_block
        # Releases the mutex while sleeping; the loop re-checks on wakeup
        # because another reader may have taken the tuple first.
        @arrived.wait(@mutex)
      end
    end
  end

  # Returns the oldest stored tuple matching +template+, or nil if there is
  # none.  The tuple is deleted from its bucket when +remove+ is true.
  # Must be called with @mutex held.
  def get_que(template, remove=true)
    bucket = @que[template.length]
    return nil unless bucket

    pattern = Template.new(template)
    # Use the returned index rather than block-parameter side effects:
    # block parameters do not overwrite outer locals in Ruby >= 1.9.
    idx = bucket.index { |tuple| pattern.match(tuple) }
    return nil unless idx

    remove ? bucket.delete_at(idx) : bucket[idx]
  end

  # Appends +tuple+ to the bucket for its length.
  # Must be called with @mutex held.
  def put_que(tuple)
    (@que[tuple.length] ||= []).push(tuple)
  end
end

if __FILE__ == $0
  # Demo: clients post 'req' tuples, servers answer each with the square of
  # the requested number, and watchers periodically peek at pending requests.

  # Starts a worker thread that repeatedly takes a ['req', ac, num] tuple,
  # sleeps +id+ seconds, and replies with [ac, id, num, num * num].
  def server(ts, id)
    Thread.start do
      loop do
        _tag, ac, num = ts.in(['req', nil, nil])
        sleep id
        ts.out([ac, id, num, num * num])
      end
    end
  end

  # Starts a client thread that posts ten requests (the first one twice),
  # keyed by a private token object, then collects and prints the eleven
  # replies addressed to that token.
  def client(ts, n)
    Thread.start do
      ac = Object.new
      tuples = (1..10).map { |i| ['req', ac, i * 10 + n] }
      ts.out(*tuples)
      ts.out(tuples.first)
      puts "out: #{n}"
      11.times do
        ans = ts.in([ac, nil, nil, nil])
        puts "client(#{n}) server(#{ans[1]}) #{ans[2]} #{ans[3]}"
      end
    end
  end

  # Starts a thread that, once a second, prints a pending request without
  # blocking, or a notice when none is available.
  def watcher(ts)
    Thread.start do
      loop do
        begin
          sleep 1
          p ts.rd(['req', nil, nil], true)
        rescue ThreadError
          puts "'req' not found."
        end
      end
    end
  end

  ts = TupleSpace.new

  servers = (0..3).map { |n| server(ts, n) }
  clients = (1..6).map { |n| client(ts, n) }
  3.times { watcher(ts) }

  # Only the clients are awaited; server and watcher threads loop forever
  # and end when the main thread exits.
  clients.each(&:join)
end