From 646bcaa71742b78c6999fcecf133183a8a28a414 Mon Sep 17 00:00:00 2001 From: drbrain Date: Sat, 23 Mar 2013 20:10:11 +0000 Subject: * lib/rinda/ring.rb: Add multicast support to Rinda::RingFinger and Rinda::RingServer. [ruby-trunk - Bug #8073] * test/rinda/test_rinda.rb: Test for the above. * NEWS: Update with Rinda multicast support git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@39895 b2dd03c8-39d4-4d8f-98ff-823fe69b080e --- lib/rinda/ring.rb | 172 +++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 150 insertions(+), 22 deletions(-) (limited to 'lib/rinda') diff --git a/lib/rinda/ring.rb b/lib/rinda/ring.rb index 8a2a201c8a..fc85f44398 100644 --- a/lib/rinda/ring.rb +++ b/lib/rinda/ring.rb @@ -4,6 +4,7 @@ require 'drb/drb' require 'rinda/rinda' require 'thread' +require 'ipaddr' module Rinda @@ -26,26 +27,91 @@ module Rinda include DRbUndumped + ## + # Special renewer for the RingServer to allow shutdown + + class Renewer # :nodoc: + include DRbUndumped + + ## + # Set to false to shutdown future requests using this Renewer + + attr_accessor :renew + + def initialize # :nodoc: + @renew = true + end + + def renew # :nodoc: + @renew ? 1 : true + end + end + ## # Advertises +ts+ on the UDP broadcast address at +port+. - def initialize(ts, port=Ring_PORT) + def initialize(ts, addresses=[Socket::INADDR_ANY], port=Ring_PORT) + @port = port + + if Integer === addresses then + addresses, @port = [Socket::INADDR_ANY], addresses + end + + @renewer = Renewer.new + @ts = ts - @soc = UDPSocket.open - @soc.bind('', port) - @w_service = write_service - @r_service = reply_service + @sockets = addresses.map do |address| + make_socket(address) + end + + @w_services = write_services + @r_service = reply_service end ## - # Creates a thread that picks up UDP packets and passes them to do_write - # for decoding. + # Creates a socket at +address+ - def write_service - Thread.new do - loop do - msg = @soc.recv(1024) - do_write(msg) + def make_socket(address) + addrinfo = Addrinfo.udp(address, @port) + + socket = Socket.new(addrinfo.pfamily, addrinfo.socktype, + addrinfo.protocol) + + if addrinfo.ipv4_multicast? or addrinfo.ipv6_multicast? then + if Socket.const_defined?(:SO_REUSEPORT) then + socket.setsockopt(:SOCKET, :SO_REUSEPORT, true) + else + socket.setsockopt(:SOCKET, :SO_REUSEADDR, true) + end + + if addrinfo.ipv4_multicast? then + mreq = IPAddr.new(addrinfo.ip_address).hton + + IPAddr.new('0.0.0.0').hton + + socket.setsockopt(:IPPROTO_IP, :IP_ADD_MEMBERSHIP, mreq) + else + mreq = IPAddr.new(addrinfo.ip_address).hton + [0].pack('I') + + socket.setsockopt(:IPPROTO_IPV6, :IPV6_JOIN_GROUP, mreq) + end + end + + socket.bind(addrinfo) + + socket + end + + ## + # Creates threads that pick up UDP packets and passes them to do_write for + # decoding. + + def write_services + @sockets.map do |s| + Thread.new(s) do |socket| + loop do + msg = socket.recv(1024) + do_write(msg) + end end end end @@ -80,11 +146,28 @@ module Rinda # address of the local TupleSpace. def do_reply - tuple = @ts.take([:lookup_ring, DRbObject]) + tuple = @ts.take([:lookup_ring, DRbObject], @renewer) Thread.new { tuple[1].call(@ts) rescue nil} rescue end + ## + # Shuts down the RingServer + + def shutdown + @renewer.renew = false + + @w_services.each do |thread| + thread.kill + end + + @sockets.each do |socket| + socket.close + end + + @r_service.kill + end + end ## @@ -130,6 +213,18 @@ module Rinda attr_accessor :broadcast_list + ## + # Maximum number of hops for sent multicast packets (if using a multicast + # address in the broadcast list). The default is 1 (same as UDP + # broadcast). + + attr_accessor :multicast_hops + + ## + # The interface index to send IPv6 multicast packets from. + + attr_accessor :multicast_interface + ## # The port that RingFinger will send query packets to. @@ -149,6 +244,9 @@ module Rinda @port = port @primary = nil @rings = [] + + @multicast_hops = 1 + @multicast_interface = 0 end ## @@ -178,15 +276,7 @@ module Rinda msg = Marshal.dump([[:lookup_ring, DRbObject.new(block)], timeout]) @broadcast_list.each do |it| - soc = UDPSocket.open - begin - soc.setsockopt(Socket::SOL_SOCKET, Socket::SO_BROADCAST, true) - soc.send(msg, 0, it, @port) - rescue - nil - ensure - soc.close - end + send_message(it, msg) end sleep(timeout) end @@ -217,6 +307,44 @@ module Rinda @primary end + ## + # Creates a socket for +address+ with the appropriate multicast options + # for multicast addresses. + + def make_socket(address) # :nodoc: + addrinfo = Addrinfo.udp(address, @port) + + soc = Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol) + + if addrinfo.ipv4_multicast? then + soc.setsockopt(:IPPROTO_IP, :IP_MULTICAST_LOOP, true) + soc.setsockopt(:IPPROTO_IP, :IP_MULTICAST_TTL, + [@multicast_hops].pack('c')) + elsif addrinfo.ipv6_multicast? then + soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_LOOP, true) + soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_HOPS, + [@multicast_hops].pack('I')) + soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_IF, + [@multicast_interface].pack('I')) + else + soc.setsockopt(:SOL_SOCKET, :SO_BROADCAST, true) + end + + soc.connect(addrinfo) + + soc + end + + def send_message(address, message) # :nodoc: + soc = make_socket(address) + + soc.send(message, 0) + rescue + nil + ensure + soc.close if soc + end + end ## -- cgit v1.2.3