summaryrefslogtreecommitdiff
path: root/lib/rinda/ring.rb
blob: 7152dfb72e0b3667acc2004da6dc9b629d235ad9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
#
# Note: Rinda::Ring API is unstable.
#
require 'drb/drb'
require 'rinda/rinda'
require 'thread'

module Rinda
  Ring_PORT = 7647
  # UDP front end for a tuple space.
  #
  # Construction binds a UDP socket and starts two background threads:
  # one turns every received datagram into a write on the tuple space, the
  # other takes [:lookup_ring, callback] tuples out of the tuple space and
  # calls the callback with the tuple space itself.
  #
  # NOTE(review): datagrams are passed straight to Marshal.load, so any host
  # that can reach the port decides what gets unmarshalled in this process.
  class RingServer
    include DRbUndumped

    # ts   - object the services call #write and #take on.
    # port - UDP port the socket is bound to.
    def initialize(ts, port=Ring_PORT)
      @ts = ts
      @soc = UDPSocket.open
      @soc.bind('', port)
      # Both services use @ts/@soc, so they are started last.
      @w_service = write_service
      @r_service = reply_service
    end

    # Starts and returns the thread that reads datagrams (up to 1024 bytes
    # each) and hands the payload to #do_write.  The sender address that
    # recvfrom also returns is not used.
    def write_service
      Thread.new do
        loop { do_write(@soc.recvfrom(1024).first) }
      end
    end

    # Unmarshals +msg+ into a (tuple, sec) pair and writes it to the tuple
    # space on a new thread.  Any StandardError is swallowed, in which case
    # the thread's value is nil.  Returns the thread.
    def do_write(msg)
      Thread.new(msg) do |packet|
        begin
          tuple, sec = Marshal.load(packet)
          @ts.write(tuple, sec)
        rescue StandardError
          nil
        end
      end
    end

    # Starts and returns the thread that calls #do_reply forever.
    def reply_service
      Thread.new { loop { do_reply } }
    end

    # Takes one [:lookup_ring, DRbObject] tuple and calls its second element
    # with the tuple space on a new thread; errors raised by the callback are
    # swallowed.  Returns the thread, or nil when the take (or starting the
    # thread) raised a StandardError.
    def do_reply
      request = @ts.take([:lookup_ring, DRbObject])
      Thread.new do
        begin
          request[1].call(@ts)
        rescue StandardError
          nil
        end
      end
    rescue StandardError
      nil
    end
  end

  # Client-side lookup of ring servers.
  #
  # A lookup sends a marshalled [:lookup_ring, callback] request by UDP to
  # every address in +broadcast_list+.  The first tuple space handed to the
  # callback becomes +primary+; tuple spaces that answer later are collected
  # in the array returned by #to_a.
  class RingFinger
    @@finger = nil

    # Returns the shared finger, creating it and running #lookup_ring_any on
    # first use.
    #
    # NOTE(review): the new finger is cached before the lookup runs, so if the
    # first lookup raises, later calls return a finger whose primary is nil.
    def self.finger
      unless @@finger
        @@finger = self.new
        @@finger.lookup_ring_any
      end
      @@finger
    end

    # Primary tuple space of the shared finger.
    def self.primary
      finger.primary
    end

    # Non-primary tuple spaces of the shared finger.
    def self.to_a
      finger.to_a
    end

    @@broadcast_list = ['<broadcast>', 'localhost']

    # broadcast_list - addresses the lookup request is sent to; nil means
    #                  ['localhost'].
    # port           - UDP port the request is sent to.
    def initialize(broadcast_list=@@broadcast_list, port=Ring_PORT)
      @broadcast_list = broadcast_list || ['localhost']
      @port = port
      @primary = nil
      @rings = []
    end
    attr_accessor :broadcast_list, :port, :primary

    # Tuple spaces that answered after the primary one.
    def to_a
      @rings
    end

    # Yields the primary tuple space, then every entry of #to_a.  Runs
    # #lookup_ring_any first when no primary is known yet.
    def each
      lookup_ring_any unless @primary
      return unless @primary
      yield(@primary)
      @rings.each { |x| yield(x) }
    end

    # With a block: sends the lookup request to every address in
    # broadcast_list, then sleeps +timeout+ seconds; the block is wrapped in
    # a DRbObject so that answering servers can call it.  Errors while
    # sending to one address are ignored so the remaining addresses are
    # still tried.
    #
    # Without a block: same as #lookup_ring_any(timeout).
    def lookup_ring(timeout=5, &block)
      return lookup_ring_any(timeout) unless block_given?

      msg = Marshal.dump([[:lookup_ring, DRbObject.new(block)], timeout])
      @broadcast_list.each do |addr|
        soc = UDPSocket.open
        begin
          soc.setsockopt(Socket::SOL_SOCKET, Socket::SO_BROADCAST, true)
          soc.send(msg, 0, addr, @port)
        rescue
          nil
        ensure
          soc.close
        end
      end
      sleep(timeout)
    end

    # Runs a lookup and returns the first tuple space that answers, storing
    # it as +primary+.  Later answers are appended to #to_a in the
    # background.
    #
    # Raises RuntimeError ('RingNotFound') when the lookup finishes (or
    # fails) without any answer.
    def lookup_ring_any(timeout=5)
      queue = Queue.new

      # Producer only: this thread must never pop from the queue, otherwise
      # it can steal the first answer, or the nil end marker, from the
      # caller blocked below.
      Thread.new do
        begin
          self.lookup_ring(timeout) do |ts|
            queue.push(ts)
          end
        ensure
          # Always release the waiting caller, even when the lookup raised.
          queue.push(nil)
        end
      end

      @primary = queue.pop
      raise('RingNotFound') if @primary.nil?

      # The primary has been taken; drain the remaining answers up to the
      # nil end marker.
      Thread.new do
        while ts = queue.pop
          @rings.push(ts)
        end
      end

      @primary
    end
  end

  # Announces a service by writing a [:name, klass, front, desc] tuple to the
  # primary tuple space found by Rinda::RingFinger.
  class RingProvider
    # klass, front, desc - stored, together with :name, as the tuple to write.
    # renewer            - second argument given to #write; a new
    #                      Rinda::SimpleRenewer is used when none is given.
    def initialize(klass, front, desc, renewer = nil)
      @renewer = renewer ? renewer : Rinda::SimpleRenewer.new
      @tuple = [:name, klass, front, desc]
    end

    # Writes the tuple to Rinda::RingFinger.primary and returns whatever that
    # write returns.
    def provide
      Rinda::RingFinger.primary.write(@tuple, @renewer)
    end
  end
end

# Command-line demo, run only when this file is executed directly.  The first
# argument selects the mode:
#   s - expose a new TupleSpace through a RingServer until stdin yields a
#       line or EOF
#   w - print each ring that answers the lookup and write [:hello, :world]
#       to it
#   r - print each ring that answers the lookup and print one two-element
#       tuple taken from it
# Any other argument (or none) only starts the DRb service.
if $0 == __FILE__
  DRb.start_service
  mode = ARGV.shift
  if mode == 's'
    require 'rinda/tuplespace'
    space = Rinda::TupleSpace.new
    server = Rinda::RingServer.new(space)
    $stdin.gets
  elsif mode == 'w'
    writer = Rinda::RingFinger.new(nil)
    writer.lookup_ring do |ring|
      p ring
      ring.write([:hello, :world])
    end
  elsif mode == 'r'
    reader = Rinda::RingFinger.new(nil)
    reader.lookup_ring do |ring|
      p ring
      p ring.take([nil, nil])
    end
  end
end