diff options
Diffstat (limited to 'ext/socket/lib')
-rw-r--r-- | ext/socket/lib/socket.rb | 453 |
1 files changed, 432 insertions, 21 deletions
diff --git a/ext/socket/lib/socket.rb b/ext/socket/lib/socket.rb index d756a32a5a..d61de1e8e9 100644 --- a/ext/socket/lib/socket.rb +++ b/ext/socket/lib/socket.rb @@ -1,7 +1,11 @@ # frozen_string_literal: true require 'socket.so' -require 'io/wait' + +unless IO.method_defined?(:wait_writable, false) + # It's only required on older Rubies < v3.2: + require 'io/wait' +end class Addrinfo # creates an Addrinfo object from the arguments. @@ -197,7 +201,7 @@ class Addrinfo sock = Socket.new(self.pfamily, self.socktype, self.protocol) begin sock.ipv6only! if self.ipv6? - sock.setsockopt(:SOCKET, :REUSEADDR, 1) + sock.setsockopt(:SOCKET, :REUSEADDR, 1) unless self.pfamily == Socket::PF_UNIX sock.bind(self) sock.listen(backlog) rescue Exception @@ -329,9 +333,10 @@ class BasicSocket < IO # _flags_ is zero or more of the +MSG_+ options. # The result, _mesg_, is the data received. # - # When recvfrom(2) returns 0, Socket#recv_nonblock returns - # an empty string as data. - # The meaning depends on the socket: EOF on TCP, empty packet on UDP, etc. + # When recvfrom(2) returns 0, Socket#recv_nonblock returns nil. + # In most cases it means the connection was closed, but for UDP connections + # it may mean an empty packet was received, as the underlying API makes + # it impossible to distinguish these two cases. # # === Parameters # * +maxlen+ - the number of bytes to receive from the socket @@ -476,9 +481,10 @@ class Socket < BasicSocket # The second element, _sender_addrinfo_, contains protocol-specific address # information of the sender. # - # When recvfrom(2) returns 0, Socket#recvfrom_nonblock returns - # an empty string as data. - # The meaning depends on the socket: EOF on TCP, empty packet on UDP, etc. + # When recvfrom(2) returns 0, Socket#recv_nonblock returns nil. + # In most cases it means the connection was closed, but for UDP connections + # it may mean an empty packet was received, as the underlying API makes + # it impossible to distinguish these two cases. # # === Parameters # * +maxlen+ - the maximum number of bytes to receive from the socket @@ -593,6 +599,30 @@ class Socket < BasicSocket __accept_nonblock(exception) end + RESOLUTION_DELAY = 0.05 + private_constant :RESOLUTION_DELAY + + CONNECTION_ATTEMPT_DELAY = 0.25 + private_constant :CONNECTION_ATTEMPT_DELAY + + ADDRESS_FAMILIES = { + ipv6: Socket::AF_INET6, + ipv4: Socket::AF_INET + }.freeze + private_constant :ADDRESS_FAMILIES + + HOSTNAME_RESOLUTION_QUEUE_UPDATED = 0 + private_constant :HOSTNAME_RESOLUTION_QUEUE_UPDATED + + IPV6_ADRESS_FORMAT = /\A(?i:(?:(?:[0-9A-F]{1,4}:){7}(?:[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){6}(?:[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:){1,5}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){5}(?:(?::[0-9A-F]{1,4}){1,2}|:(?:[0-9A-F]{1,4}:){1,4}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){4}(?:(?::[0-9A-F]{1,4}){1,3}|:(?:[0-9A-F]{1,4}:){1,3}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){3}(?:(?::[0-9A-F]{1,4}){1,4}|:(?:[0-9A-F]{1,4}:){1,2}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){2}(?:(?::[0-9A-F]{1,4}){1,5}|:(?:[0-9A-F]{1,4}:)[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){1}(?:(?::[0-9A-F]{1,4}){1,6}|:(?:[0-9A-F]{1,4}:){0,5}[0-9A-F]{1,4}|:)|(?:::(?:[0-9A-F]{1,4}:){0,7}[0-9A-F]{1,4}|::)))(?:%.+)?\z/ + private_constant :IPV6_ADRESS_FORMAT + + @tcp_fast_fallback = true + + class << self + attr_accessor :tcp_fast_fallback + end + # :call-seq: # Socket.tcp(host, port, local_host=nil, local_port=nil, [opts]) {|socket| ... } # Socket.tcp(host, port, local_host=nil, local_port=nil, [opts]) @@ -606,7 +636,6 @@ class Socket < BasicSocket # _opts_ may have following options: # # [:connect_timeout] specify the timeout in seconds. - # [:resolv_timeout] specify the name resolution timeout in seconds. # # If a block is given, the block is called with the socket. # The value of the block is returned. @@ -619,8 +648,256 @@ class Socket < BasicSocket # sock.close_write # puts sock.read # } - # - def self.tcp(host, port, local_host = nil, local_port = nil, connect_timeout: nil, resolv_timeout: nil) # :yield: socket + def self.tcp(host, port, local_host = nil, local_port = nil, connect_timeout: nil, resolv_timeout: nil, fast_fallback: tcp_fast_fallback, &) # :yield: socket + sock = if fast_fallback && !(host && ip_address?(host)) + tcp_with_fast_fallback(host, port, local_host, local_port, connect_timeout:, resolv_timeout:) + else + tcp_without_fast_fallback(host, port, local_host, local_port, connect_timeout:, resolv_timeout:) + end + + if block_given? + begin + yield sock + ensure + sock.close + end + else + sock + end + end + + def self.tcp_with_fast_fallback(host, port, local_host = nil, local_port = nil, connect_timeout: nil, resolv_timeout: nil) + if local_host || local_port + local_addrinfos = Addrinfo.getaddrinfo(local_host, local_port, nil, :STREAM, timeout: resolv_timeout) + resolving_family_names = local_addrinfos.map { |lai| ADDRESS_FAMILIES.key(lai.afamily) }.uniq + else + local_addrinfos = [] + resolving_family_names = ADDRESS_FAMILIES.keys + end + + hostname_resolution_threads = [] + resolution_store = HostnameResolutionStore.new(resolving_family_names) + connecting_sockets = {} + is_windows_environment ||= (RUBY_PLATFORM =~ /mswin|mingw|cygwin/) + + now = current_clock_time + resolution_delay_expires_at = nil + connection_attempt_delay_expires_at = nil + user_specified_connect_timeout_at = nil + last_error = nil + + if resolving_family_names.size == 1 + family_name = resolving_family_names.first + addrinfos = Addrinfo.getaddrinfo(host, port, family_name, :STREAM, timeout: resolv_timeout) + resolution_store.add_resolved(family_name, addrinfos) + hostname_resolution_result = nil + hostname_resolution_notifier = nil + user_specified_resolv_timeout_at = nil + else + hostname_resolution_result = HostnameResolutionResult.new(resolving_family_names.size) + hostname_resolution_notifier = hostname_resolution_result.notifier + + hostname_resolution_threads.concat( + resolving_family_names.map { |family| + thread_args = [family, host, port, hostname_resolution_result] + thread = Thread.new(*thread_args) { |*thread_args| resolve_hostname(*thread_args) } + Thread.pass + thread + } + ) + + user_specified_resolv_timeout_at = resolv_timeout ? now + resolv_timeout : Float::INFINITY + end + + loop do + if resolution_store.any_addrinfos? && + !resolution_delay_expires_at && + !connection_attempt_delay_expires_at + while (addrinfo = resolution_store.get_addrinfo) + if local_addrinfos.any? + local_addrinfo = local_addrinfos.find { |lai| lai.afamily == addrinfo.afamily } + + if local_addrinfo.nil? # Connecting addrinfoと同じアドレスファミリのLocal addrinfoがない + if resolution_store.any_addrinfos? + # Try other Addrinfo in next "while" + next + elsif connecting_sockets.any? || resolution_store.any_unresolved_family? + # Exit this "while" and wait for connections to be established or hostname resolution in next loop + # Or exit this "while" and wait for hostname resolution in next loop + break + else + raise SocketError.new 'no appropriate local address' + end + end + end + + begin + if resolution_store.any_addrinfos? || + connecting_sockets.any? || + resolution_store.any_unresolved_family? + socket = Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol) + socket.bind(local_addrinfo) if local_addrinfo + result = socket.connect_nonblock(addrinfo, exception: false) + else + result = socket = local_addrinfo ? + addrinfo.connect_from(local_addrinfo, timeout: connect_timeout) : + addrinfo.connect(timeout: connect_timeout) + end + + if result == :wait_writable + connection_attempt_delay_expires_at = now + CONNECTION_ATTEMPT_DELAY + if resolution_store.empty_addrinfos? + user_specified_connect_timeout_at = connect_timeout ? now + connect_timeout : Float::INFINITY + end + + connecting_sockets[socket] = addrinfo + break + else + return socket # connection established + end + rescue SystemCallError => e + socket&.close + last_error = e + + if resolution_store.any_addrinfos? + # Try other Addrinfo in next "while" + next + elsif connecting_sockets.any? || resolution_store.any_unresolved_family? + # Exit this "while" and wait for connections to be established or hostname resolution in next loop + # Or exit this "while" and wait for hostname resolution in next loop + break + else + raise last_error + end + end + end + end + + ends_at = + if resolution_store.any_addrinfos? + resolution_delay_expires_at || connection_attempt_delay_expires_at + else + [user_specified_resolv_timeout_at, user_specified_connect_timeout_at].compact.max + end + + hostname_resolved, writable_sockets, except_sockets = IO.select( + hostname_resolution_notifier, + connecting_sockets.keys, + # Use errorfds to wait for non-blocking connect failures on Windows + is_windows_environment ? connecting_sockets.keys : nil, + second_to_timeout(current_clock_time, ends_at), + ) + now = current_clock_time + resolution_delay_expires_at = nil if expired?(now, resolution_delay_expires_at) + connection_attempt_delay_expires_at = nil if expired?(now, connection_attempt_delay_expires_at) + + if writable_sockets&.any? + while (writable_socket = writable_sockets.pop) + is_connected = is_windows_environment || ( + sockopt = writable_socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_ERROR) + sockopt.int.zero? + ) + + if is_connected + connecting_sockets.delete writable_socket + return writable_socket + else + failed_ai = connecting_sockets.delete writable_socket + writable_socket.close + ip_address = failed_ai.ipv6? ? "[#{failed_ai.ip_address}]" : failed_ai.ip_address + last_error = SystemCallError.new("connect(2) for #{ip_address}:#{failed_ai.ip_port}", sockopt.int) + + if writable_sockets.any? || + resolution_store.any_addrinfos? || + connecting_sockets.any? || + resolution_store.any_unresolved_family? + user_specified_connect_timeout_at = nil if connecting_sockets.empty? + # Try other writable socket in next "while" + # Or exit this "while" and try other connection attempt + # Or exit this "while" and wait for connections to be established or hostname resolution in next loop + # Or exit this "while" and wait for hostname resolution in next loop + else + raise last_error + end + end + end + end + + if except_sockets&.any? + except_sockets.each do |except_socket| + failed_ai = connecting_sockets.delete except_socket + sockopt = except_socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_CONNECT_TIME) + except_socket.close + ip_address = failed_ai.ipv6? ? "[#{failed_ai.ip_address}]" : failed_ai.ip_address + last_error = SystemCallError.new("connect(2) for #{ip_address}:#{failed_ai.ip_port}", sockopt.int) + + if writable_sockets.any? || + resolution_store.any_addrinfos? || + connecting_sockets.any? || + resolution_store.any_unresolved_family? + user_specified_connect_timeout_at = nil if connecting_sockets.empty? + # Try other writable socket in next "while" + # Or exit this "while" and try other connection attempt + # Or exit this "while" and wait for connections to be established or hostname resolution in next loop + # Or exit this "while" and wait for hostname resolution in next loop + else + raise last_error + end + end + end + + if hostname_resolved&.any? + while (family_and_result = hostname_resolution_result.get) + family_name, result = family_and_result + + if result.is_a? Exception + resolution_store.add_error(family_name, result) + + unless (Socket.const_defined?(:EAI_ADDRFAMILY)) && + (result.is_a?(Socket::ResolutionError)) && + (result.error_code == Socket::EAI_ADDRFAMILY) + last_error = result + end + else + resolution_store.add_resolved(family_name, result) + end + end + + if resolution_store.resolved?(:ipv4) + if resolution_store.resolved?(:ipv6) + hostname_resolution_notifier = nil + resolution_delay_expires_at = nil + user_specified_resolv_timeout_at = nil + elsif resolution_store.resolved_successfully?(:ipv4) + resolution_delay_expires_at = now + RESOLUTION_DELAY + end + end + end + + if resolution_store.empty_addrinfos? + if connecting_sockets.empty? && resolution_store.resolved_all_families? + raise last_error + end + + if (expired?(now, user_specified_resolv_timeout_at) || resolution_store.resolved_all_families?) && + (expired?(now, user_specified_connect_timeout_at) || connecting_sockets.empty?) + raise Errno::ETIMEDOUT, 'user specified timeout' + end + end + end + ensure + hostname_resolution_threads.each do |thread| + thread.exit + end + + hostname_resolution_result&.close + + connecting_sockets.each_key do |connecting_socket| + connecting_socket.close + end + end + + def self.tcp_without_fast_fallback(host, port, local_host, local_port, connect_timeout:, resolv_timeout:) last_error = nil ret = nil @@ -654,16 +931,149 @@ class Socket < BasicSocket raise SocketError, "no appropriate local address" end end - if block_given? - begin - yield ret - ensure - ret.close + + ret + end + private_class_method :tcp_without_fast_fallback + + def self.ip_address?(hostname) + hostname.match?(IPV6_ADRESS_FORMAT) || hostname.match?(/\A([0-9]{1,3}\.){3}[0-9]{1,3}\z/) + end + private_class_method :ip_address? + + def self.resolve_hostname(family, host, port, hostname_resolution_result) + begin + resolved_addrinfos = Addrinfo.getaddrinfo(host, port, ADDRESS_FAMILIES[family], :STREAM) + hostname_resolution_result.add(family, resolved_addrinfos) + rescue => e + hostname_resolution_result.add(family, e) + end + end + private_class_method :resolve_hostname + + def self.current_clock_time + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + private_class_method :current_clock_time + + def self.second_to_timeout(started_at, ends_at) + return nil if ends_at == Float::INFINITY || ends_at.nil? + + remaining = (ends_at - started_at) + remaining.negative? ? 0 : remaining + end + private_class_method :second_to_timeout + + def self.expired?(started_at, ends_at) + second_to_timeout(started_at, ends_at)&.zero? + end + private_class_method :expired? + + class HostnameResolutionResult + def initialize(size) + @size = size + @taken_count = 0 + @rpipe, @wpipe = IO.pipe + @results = [] + @mutex = Mutex.new + end + + def notifier + [@rpipe] + end + + def add(family, result) + @mutex.synchronize do + @results.push [family, result] + @wpipe.putc HOSTNAME_RESOLUTION_QUEUE_UPDATED end - else - ret + end + + def get + return nil if @results.empty? + + res = nil + + @mutex.synchronize do + @rpipe.getbyte + res = @results.shift + end + + @taken_count += 1 + close if @taken_count == @size + res + end + + def close + @rpipe.close + @wpipe.close + end + end + private_constant :HostnameResolutionResult + + class HostnameResolutionStore + PRIORITY_ON_V6 = [:ipv6, :ipv4] + PRIORITY_ON_V4 = [:ipv4, :ipv6] + + def initialize(family_names) + @family_names = family_names + @addrinfo_dict = {} + @error_dict = {} + @last_family = nil + end + + def add_resolved(family_name, addrinfos) + @addrinfo_dict[family_name] = addrinfos + end + + def add_error(family_name, error) + @addrinfo_dict[family_name] = [] + @error_dict[family_name] = error + end + + def get_addrinfo + precedences = + case @last_family + when :ipv4, nil then PRIORITY_ON_V6 + when :ipv6 then PRIORITY_ON_V4 + end + + precedences.each do |family_name| + addrinfo = @addrinfo_dict[family_name]&.shift + next unless addrinfo + + @last_family = family_name + return addrinfo + end + + nil + end + + def empty_addrinfos? + @addrinfo_dict.all? { |_, addrinfos| addrinfos.empty? } + end + + def any_addrinfos? + !empty_addrinfos? + end + + def resolved?(family) + @addrinfo_dict.has_key? family + end + + def resolved_successfully?(family) + resolved?(family) && !!@error_dict[family] + end + + def resolved_all_families? + (@family_names - @addrinfo_dict.keys).empty? + end + + def any_unresolved_family? + !resolved_all_families? end end + private_constant :HostnameResolutionStore # :stopdoc: def self.ip_sockets_port0(ai_list, reuseaddr) @@ -1226,9 +1636,10 @@ class UDPSocket < IPSocket # The first element of the results, _mesg_, is the data received. # The second element, _sender_inet_addr_, is an array to represent the sender address. # - # When recvfrom(2) returns 0, - # Socket#recvfrom_nonblock returns an empty string as data. - # It means an empty packet. + # When recvfrom(2) returns 0, Socket#recv_nonblock returns nil. + # In most cases it means the connection was closed, but it may also mean + # an empty packet was received, as the underlying API makes + # it impossible to distinguish these two cases. # # === Parameters # * +maxlen+ - the number of bytes to receive from the socket |