summaryrefslogtreecommitdiff
path: root/lib/rinda
diff options
context:
space:
mode:
authordrbrain <drbrain@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2013-03-23 20:10:11 +0000
committerdrbrain <drbrain@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2013-03-23 20:10:11 +0000
commit646bcaa71742b78c6999fcecf133183a8a28a414 (patch)
tree79f6b6b5e4cb951f57ca04cbcc3ee703a4238654 /lib/rinda
parent362208c041ee8feed9262d0c7f2c78bf3caea6e5 (diff)
* lib/rinda/ring.rb: Add multicast support to Rinda::RingFinger and
Rinda::RingServer. [ruby-trunk - Bug #8073] * test/rinda/test_rinda.rb: Test for the above. * NEWS: Update with Rinda multicast support git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@39895 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
Diffstat (limited to 'lib/rinda')
-rw-r--r--lib/rinda/ring.rb172
1 files changed, 150 insertions, 22 deletions
diff --git a/lib/rinda/ring.rb b/lib/rinda/ring.rb
index 8a2a201..fc85f44 100644
--- a/lib/rinda/ring.rb
+++ b/lib/rinda/ring.rb
@@ -4,6 +4,7 @@
require 'drb/drb'
require 'rinda/rinda'
require 'thread'
+require 'ipaddr'
module Rinda
@@ -27,25 +28,90 @@ module Rinda
include DRbUndumped
##
+ # Special renewer for the RingServer to allow shutdown
+
+ class Renewer # :nodoc:
+ include DRbUndumped
+
+ ##
+ # Set to false to shutdown future requests using this Renewer
+
+ attr_accessor :renew
+
+ def initialize # :nodoc:
+ @renew = true
+ end
+
+ def renew # :nodoc:
+ @renew ? 1 : true
+ end
+ end
+
+ ##
# Advertises +ts+ on the UDP broadcast address at +port+.
- def initialize(ts, port=Ring_PORT)
+ def initialize(ts, addresses=[Socket::INADDR_ANY], port=Ring_PORT)
+ @port = port
+
+ if Integer === addresses then
+ addresses, @port = [Socket::INADDR_ANY], addresses
+ end
+
+ @renewer = Renewer.new
+
@ts = ts
- @soc = UDPSocket.open
- @soc.bind('', port)
- @w_service = write_service
- @r_service = reply_service
+ @sockets = addresses.map do |address|
+ make_socket(address)
+ end
+
+ @w_services = write_services
+ @r_service = reply_service
end
##
- # Creates a thread that picks up UDP packets and passes them to do_write
- # for decoding.
+ # Creates a socket at +address+
- def write_service
- Thread.new do
- loop do
- msg = @soc.recv(1024)
- do_write(msg)
+ def make_socket(address)
+ addrinfo = Addrinfo.udp(address, @port)
+
+ socket = Socket.new(addrinfo.pfamily, addrinfo.socktype,
+ addrinfo.protocol)
+
+ if addrinfo.ipv4_multicast? or addrinfo.ipv6_multicast? then
+ if Socket.const_defined?(:SO_REUSEPORT) then
+ socket.setsockopt(:SOCKET, :SO_REUSEPORT, true)
+ else
+ socket.setsockopt(:SOCKET, :SO_REUSEADDR, true)
+ end
+
+ if addrinfo.ipv4_multicast? then
+ mreq = IPAddr.new(addrinfo.ip_address).hton +
+ IPAddr.new('0.0.0.0').hton
+
+ socket.setsockopt(:IPPROTO_IP, :IP_ADD_MEMBERSHIP, mreq)
+ else
+ mreq = IPAddr.new(addrinfo.ip_address).hton + [0].pack('I')
+
+ socket.setsockopt(:IPPROTO_IPV6, :IPV6_JOIN_GROUP, mreq)
+ end
+ end
+
+ socket.bind(addrinfo)
+
+ socket
+ end
+
+ ##
+ # Creates threads that pick up UDP packets and passes them to do_write for
+ # decoding.
+
+ def write_services
+ @sockets.map do |s|
+ Thread.new(s) do |socket|
+ loop do
+ msg = socket.recv(1024)
+ do_write(msg)
+ end
end
end
end
@@ -80,11 +146,28 @@ module Rinda
# address of the local TupleSpace.
def do_reply
- tuple = @ts.take([:lookup_ring, DRbObject])
+ tuple = @ts.take([:lookup_ring, DRbObject], @renewer)
Thread.new { tuple[1].call(@ts) rescue nil}
rescue
end
+ ##
+ # Shuts down the RingServer
+
+ def shutdown
+ @renewer.renew = false
+
+ @w_services.each do |thread|
+ thread.kill
+ end
+
+ @sockets.each do |socket|
+ socket.close
+ end
+
+ @r_service.kill
+ end
+
end
##
@@ -131,6 +214,18 @@ module Rinda
attr_accessor :broadcast_list
##
+ # Maximum number of hops for sent multicast packets (if using a multicast
+ # address in the broadcast list). The default is 1 (same as UDP
+ # broadcast).
+
+ attr_accessor :multicast_hops
+
+ ##
+ # The interface index to send IPv6 multicast packets from.
+
+ attr_accessor :multicast_interface
+
+ ##
# The port that RingFinger will send query packets to.
attr_accessor :port
@@ -149,6 +244,9 @@ module Rinda
@port = port
@primary = nil
@rings = []
+
+ @multicast_hops = 1
+ @multicast_interface = 0
end
##
@@ -178,15 +276,7 @@ module Rinda
msg = Marshal.dump([[:lookup_ring, DRbObject.new(block)], timeout])
@broadcast_list.each do |it|
- soc = UDPSocket.open
- begin
- soc.setsockopt(Socket::SOL_SOCKET, Socket::SO_BROADCAST, true)
- soc.send(msg, 0, it, @port)
- rescue
- nil
- ensure
- soc.close
- end
+ send_message(it, msg)
end
sleep(timeout)
end
@@ -217,6 +307,44 @@ module Rinda
@primary
end
+ ##
+ # Creates a socket for +address+ with the appropriate multicast options
+ # for multicast addresses.
+
+ def make_socket(address) # :nodoc:
+ addrinfo = Addrinfo.udp(address, @port)
+
+ soc = Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol)
+
+ if addrinfo.ipv4_multicast? then
+ soc.setsockopt(:IPPROTO_IP, :IP_MULTICAST_LOOP, true)
+ soc.setsockopt(:IPPROTO_IP, :IP_MULTICAST_TTL,
+ [@multicast_hops].pack('c'))
+ elsif addrinfo.ipv6_multicast? then
+ soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_LOOP, true)
+ soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_HOPS,
+ [@multicast_hops].pack('I'))
+ soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_IF,
+ [@multicast_interface].pack('I'))
+ else
+ soc.setsockopt(:SOL_SOCKET, :SO_BROADCAST, true)
+ end
+
+ soc.connect(addrinfo)
+
+ soc
+ end
+
+ def send_message(address, message) # :nodoc:
+ soc = make_socket(address)
+
+ soc.send(message, 0)
+ rescue
+ nil
+ ensure
+ soc.close if soc
+ end
+
end
##