diff options
Diffstat (limited to 'ext/socket/lib/socket.rb')
| -rw-r--r-- | ext/socket/lib/socket.rb | 542 |
1 files changed, 503 insertions, 39 deletions
diff --git a/ext/socket/lib/socket.rb b/ext/socket/lib/socket.rb index 4ed2df23e6..36fcceaee9 100644 --- a/ext/socket/lib/socket.rb +++ b/ext/socket/lib/socket.rb @@ -1,7 +1,11 @@ # frozen_string_literal: true require 'socket.so' -require 'io/wait' + +unless IO.method_defined?(:wait_writable, false) + # It's only required on older Rubies < v3.2: + require 'io/wait' +end class Addrinfo # creates an Addrinfo object from the arguments. @@ -58,7 +62,7 @@ class Addrinfo break when :wait_writable sock.wait_writable(timeout) or - raise Errno::ETIMEDOUT, 'user specified timeout' + raise Errno::ETIMEDOUT, "user specified timeout for #{self.ip_address}:#{self.ip_port}" end while true else sock.connect(self) @@ -197,7 +201,7 @@ class Addrinfo sock = Socket.new(self.pfamily, self.socktype, self.protocol) begin sock.ipv6only! if self.ipv6? - sock.setsockopt(:SOCKET, :REUSEADDR, 1) + sock.setsockopt(:SOCKET, :REUSEADDR, 1) unless self.pfamily == Socket::PF_UNIX sock.bind(self) sock.listen(backlog) rescue Exception @@ -223,8 +227,8 @@ class Addrinfo # # #<Addrinfo: [::1]:80 TCP (:80)> # # #<Addrinfo: [::1]:80 UDP (:80)> # - def self.foreach(nodename, service, family=nil, socktype=nil, protocol=nil, flags=nil, &block) - Addrinfo.getaddrinfo(nodename, service, family, socktype, protocol, flags).each(&block) + def self.foreach(nodename, service, family=nil, socktype=nil, protocol=nil, flags=nil, timeout: nil, &block) + Addrinfo.getaddrinfo(nodename, service, family, socktype, protocol, flags, timeout: timeout).each(&block) end end @@ -329,9 +333,10 @@ class BasicSocket < IO # _flags_ is zero or more of the +MSG_+ options. # The result, _mesg_, is the data received. # - # When recvfrom(2) returns 0, Socket#recv_nonblock returns - # an empty string as data. - # The meaning depends on the socket: EOF on TCP, empty packet on UDP, etc. + # When recvfrom(2) returns 0, Socket#recv_nonblock returns nil. + # In most cases it means the connection was closed, but for UDP connections + # it may mean an empty packet was received, as the underlying API makes + # it impossible to distinguish these two cases. # # === Parameters # * +maxlen+ - the number of bytes to receive from the socket @@ -476,9 +481,10 @@ class Socket < BasicSocket # The second element, _sender_addrinfo_, contains protocol-specific address # information of the sender. # - # When recvfrom(2) returns 0, Socket#recvfrom_nonblock returns - # an empty string as data. - # The meaning depends on the socket: EOF on TCP, empty packet on UDP, etc. + # When recvfrom(2) returns 0, Socket#recv_nonblock returns nil. + # In most cases it means the connection was closed, but for UDP connections + # it may mean an empty packet was received, as the underlying API makes + # it impossible to distinguish these two cases. # # === Parameters # * +maxlen+ - the maximum number of bytes to receive from the socket @@ -593,19 +599,54 @@ class Socket < BasicSocket __accept_nonblock(exception) end + # :stopdoc: + RESOLUTION_DELAY = 0.05 + private_constant :RESOLUTION_DELAY + + CONNECTION_ATTEMPT_DELAY = 0.25 + private_constant :CONNECTION_ATTEMPT_DELAY + + ADDRESS_FAMILIES = { + ipv6: Socket::AF_INET6, + ipv4: Socket::AF_INET + }.freeze + private_constant :ADDRESS_FAMILIES + + HOSTNAME_RESOLUTION_QUEUE_UPDATED = 0 + private_constant :HOSTNAME_RESOLUTION_QUEUE_UPDATED + + IPV6_ADDRESS_FORMAT = /\A(?i:(?:(?:[0-9A-F]{1,4}:){7}(?:[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){6}(?:[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:){1,5}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){5}(?:(?::[0-9A-F]{1,4}){1,2}|:(?:[0-9A-F]{1,4}:){1,4}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){4}(?:(?::[0-9A-F]{1,4}){1,3}|:(?:[0-9A-F]{1,4}:){1,3}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){3}(?:(?::[0-9A-F]{1,4}){1,4}|:(?:[0-9A-F]{1,4}:){1,2}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){2}(?:(?::[0-9A-F]{1,4}){1,5}|:(?:[0-9A-F]{1,4}:)[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){1}(?:(?::[0-9A-F]{1,4}){1,6}|:(?:[0-9A-F]{1,4}:){0,5}[0-9A-F]{1,4}|:)|(?:::(?:[0-9A-F]{1,4}:){0,7}[0-9A-F]{1,4}|::)))(?:%.+)?\z/ + private_constant :IPV6_ADDRESS_FORMAT + # :startdoc: + # :call-seq: # Socket.tcp(host, port, local_host=nil, local_port=nil, [opts]) {|socket| ... } # Socket.tcp(host, port, local_host=nil, local_port=nil, [opts]) # # creates a new socket object connected to host:port using TCP/IP. # + # Starting from Ruby 3.4, this method operates according to the + # Happy Eyeballs Version 2 ({RFC 8305}[https://datatracker.ietf.org/doc/html/rfc8305]) + # algorithm by default. + # + # For details on Happy Eyeballs Version 2, + # see {Socket.tcp_fast_fallback=}[rdoc-ref:Socket.tcp_fast_fallback=]. + # + # To make it behave the same as in Ruby 3.3 and earlier, + # explicitly specify the option fast_fallback:false. + # Or, setting Socket.tcp_fast_fallback=false will disable + # Happy Eyeballs Version 2 not only for this method but for all Socket globally. + # # If local_host:local_port is given, # the socket is bound to it. # # The optional last argument _opts_ is options represented by a hash. # _opts_ may have following options: # - # [:connect_timeout] specify the timeout in seconds. + # [:resolv_timeout] Specifies the timeout in seconds from when the hostname resolution starts. + # [:connect_timeout] This method sequentially attempts connecting to all candidate destination addresses.<br>The +connect_timeout+ specifies the timeout in seconds from the start of the connection attempt to the last candidate.<br>By default, all connection attempts continue until the timeout occurs.<br>When +fast_fallback:false+ is explicitly specified,<br>a timeout is set for each connection attempt and any connection attempt that exceeds its timeout will be canceled. + # [:open_timeout] Specifies the timeout in seconds from the start of the method execution.<br>If this timeout is reached while there are still addresses that have not yet been attempted for connection, no further attempts will be made.<br>If this option is specified together with other timeout options, an +ArgumentError+ will be raised. + # [:fast_fallback] Enables the Happy Eyeballs Version 2 algorithm (enabled by default). # # If a block is given, the block is called with the socket. # The value of the block is returned. @@ -618,17 +659,296 @@ class Socket < BasicSocket # sock.close_write # puts sock.read # } - # - def self.tcp(host, port, local_host = nil, local_port = nil, connect_timeout: nil) # :yield: socket + def self.tcp(host, port, local_host = nil, local_port = nil, connect_timeout: nil, resolv_timeout: nil, open_timeout: nil, fast_fallback: tcp_fast_fallback, &) # :yield: socket + if open_timeout && (connect_timeout || resolv_timeout) + raise ArgumentError, "Cannot specify open_timeout along with connect_timeout or resolv_timeout" + end + + sock = if fast_fallback && !(host && ip_address?(host)) && !(local_port && local_port.to_i != 0) + tcp_with_fast_fallback(host, port, local_host, local_port, connect_timeout:, resolv_timeout:, open_timeout:) + else + tcp_without_fast_fallback(host, port, local_host, local_port, connect_timeout:, resolv_timeout:, open_timeout:) + end + + if block_given? + begin + yield sock + ensure + sock.close + end + else + sock + end + end + + # :stopdoc: + def self.tcp_with_fast_fallback(host, port, local_host = nil, local_port = nil, connect_timeout: nil, resolv_timeout: nil, open_timeout: nil) + if local_host || local_port + local_addrinfos = Addrinfo.getaddrinfo(local_host, local_port, nil, :STREAM, timeout: open_timeout || resolv_timeout) + resolving_family_names = local_addrinfos.map { |lai| ADDRESS_FAMILIES.key(lai.afamily) }.uniq + else + local_addrinfos = [] + resolving_family_names = ADDRESS_FAMILIES.keys + end + + hostname_resolution_threads = [] + resolution_store = HostnameResolutionStore.new(resolving_family_names) + connecting_sockets = {} + is_windows_environment ||= (RUBY_PLATFORM =~ /mswin|mingw|cygwin/) + + now = current_clock_time + starts_at = now + resolution_delay_expires_at = nil + connection_attempt_delay_expires_at = nil + user_specified_connect_timeout_at = nil + user_specified_open_timeout_at = open_timeout ? now + open_timeout : nil + last_error = nil + last_error_from_thread = false + + if resolving_family_names.size == 1 + family_name = resolving_family_names.first + addrinfos = Addrinfo.getaddrinfo(host, port, ADDRESS_FAMILIES[:family_name], :STREAM, timeout: open_timeout || resolv_timeout) + resolution_store.add_resolved(family_name, addrinfos) + hostname_resolution_result = nil + hostname_resolution_notifier = nil + user_specified_resolv_timeout_at = nil + else + hostname_resolution_result = HostnameResolutionResult.new(resolving_family_names.size) + hostname_resolution_notifier = hostname_resolution_result.notifier + + hostname_resolution_threads.concat( + resolving_family_names.map { |family| + thread_args = [family, host, port, hostname_resolution_result] + thread = Thread.new(*thread_args) { |*thread_args| resolve_hostname(*thread_args) } + Thread.pass + thread + } + ) + user_specified_resolv_timeout_at = resolv_timeout ? now + resolv_timeout : Float::INFINITY + end + + loop do + if resolution_store.any_addrinfos? && + !resolution_delay_expires_at && + !connection_attempt_delay_expires_at + while (addrinfo = resolution_store.get_addrinfo) + if local_addrinfos.any? + local_addrinfo = local_addrinfos.find { |lai| lai.afamily == addrinfo.afamily } + + if local_addrinfo.nil? + if resolution_store.any_addrinfos? + # Try other Addrinfo in next "while" + next + elsif connecting_sockets.any? || resolution_store.any_unresolved_family? + # Exit this "while" and wait for connections to be established or hostname resolution in next loop + # Or exit this "while" and wait for hostname resolution in next loop + break + else + raise SocketError.new 'no appropriate local address' + end + end + end + + begin + if resolution_store.any_addrinfos? || + connecting_sockets.any? || + resolution_store.any_unresolved_family? + socket = Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol) + socket.bind(local_addrinfo) if local_addrinfo + result = socket.connect_nonblock(addrinfo, exception: false) + else + timeout = + if open_timeout + t = open_timeout - (current_clock_time - starts_at) + t.negative? ? 0 : t + else + connect_timeout + end + result = socket = local_addrinfo ? + addrinfo.connect_from(local_addrinfo, timeout:) : + addrinfo.connect(timeout:) + end + + if result == :wait_writable + connection_attempt_delay_expires_at = now + CONNECTION_ATTEMPT_DELAY + if resolution_store.empty_addrinfos? + user_specified_connect_timeout_at = connect_timeout ? now + connect_timeout : Float::INFINITY + end + + connecting_sockets[socket] = addrinfo + break + else + return socket # connection established + end + rescue SystemCallError => e + socket&.close + last_error = e + + if resolution_store.any_addrinfos? + # Try other Addrinfo in next "while" + next + elsif connecting_sockets.any? || resolution_store.any_unresolved_family? + # Exit this "while" and wait for connections to be established or hostname resolution in next loop + # Or exit this "while" and wait for hostname resolution in next loop + break + else + raise last_error + end + end + end + end + + ends_at = + if resolution_store.any_addrinfos? + [(resolution_delay_expires_at || connection_attempt_delay_expires_at), + user_specified_open_timeout_at].compact.min + elsif user_specified_open_timeout_at + user_specified_open_timeout_at + else + [user_specified_resolv_timeout_at, user_specified_connect_timeout_at].compact.max + end + + hostname_resolved, writable_sockets, except_sockets = IO.select( + hostname_resolution_notifier, + connecting_sockets.keys, + # Use errorfds to wait for non-blocking connect failures on Windows + is_windows_environment ? connecting_sockets.keys : nil, + second_to_timeout(current_clock_time, ends_at), + ) + now = current_clock_time + resolution_delay_expires_at = nil if expired?(now, resolution_delay_expires_at) + connection_attempt_delay_expires_at = nil if expired?(now, connection_attempt_delay_expires_at) + + if writable_sockets&.any? + while (writable_socket = writable_sockets.pop) + is_connected = is_windows_environment || ( + sockopt = writable_socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_ERROR) + sockopt.int.zero? + ) + + if is_connected + connecting_sockets.delete writable_socket + return writable_socket + else + failed_ai = connecting_sockets.delete writable_socket + writable_socket.close + ip_address = failed_ai.ipv6? ? "[#{failed_ai.ip_address}]" : failed_ai.ip_address + last_error = SystemCallError.new("connect(2) for #{ip_address}:#{failed_ai.ip_port}", sockopt.int) + + if writable_sockets.any? || connecting_sockets.any? + # Try other writable socket in next "while" + # Or exit this "while" and wait for connections to be established or hostname resolution in next loop + elsif resolution_store.any_addrinfos? || resolution_store.any_unresolved_family? + # Exit this "while" and try other connection attempt + # Or exit this "while" and wait for hostname resolution in next loop + connection_attempt_delay_expires_at = nil + user_specified_connect_timeout_at = nil + else + raise last_error + end + end + end + end + + if except_sockets&.any? + except_sockets.each do |except_socket| + failed_ai = connecting_sockets.delete except_socket + sockopt = except_socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_ERROR) + except_socket.close + ip_address = failed_ai.ipv6? ? "[#{failed_ai.ip_address}]" : failed_ai.ip_address + last_error = SystemCallError.new("connect(2) for #{ip_address}:#{failed_ai.ip_port}", sockopt.int) + + if except_sockets.any? || connecting_sockets.any? + # Cleanup other except socket in next "each" + # Or exit this "while" and wait for connections to be established or hostname resolution in next loop + elsif resolution_store.any_addrinfos? || resolution_store.any_unresolved_family? + # Exit this "while" and try other connection attempt + # Or exit this "while" and wait for hostname resolution in next loop + connection_attempt_delay_expires_at = nil + user_specified_connect_timeout_at = nil + else + raise last_error + end + end + end + + if hostname_resolved&.any? + while (family_and_result = hostname_resolution_result.get) + family_name, result = family_and_result + + if result.is_a? Exception + resolution_store.add_error(family_name, result) + + unless (Socket.const_defined?(:EAI_ADDRFAMILY)) && + (result.is_a?(Socket::ResolutionError)) && + (result.error_code == Socket::EAI_ADDRFAMILY) + other = family_name == :ipv6 ? :ipv4 : :ipv6 + if !resolution_store.resolved?(other) || !resolution_store.resolved_successfully?(other) + last_error = result + last_error_from_thread = true + end + end + else + resolution_store.add_resolved(family_name, result) + end + end + + if resolution_store.resolved?(:ipv4) + if resolution_store.resolved?(:ipv6) + hostname_resolution_notifier = nil + resolution_delay_expires_at = nil + user_specified_resolv_timeout_at = nil + elsif resolution_store.resolved_successfully?(:ipv4) + resolution_delay_expires_at = now + RESOLUTION_DELAY + end + end + end + + if expired?(now, user_specified_open_timeout_at) + raise(IO::TimeoutError, "user specified timeout for #{host}:#{port}") + end + + if resolution_store.empty_addrinfos? + if connecting_sockets.empty? && resolution_store.resolved_all_families? + if last_error_from_thread + raise last_error.class, last_error.message, cause: last_error + else + raise last_error + end + end + + if (expired?(now, user_specified_resolv_timeout_at) || resolution_store.resolved_all_families?) && + (expired?(now, user_specified_connect_timeout_at) || connecting_sockets.empty?) + raise(IO::TimeoutError, "user specified timeout for #{host}:#{port}") + end + end + end + ensure + hostname_resolution_threads.each do |thread| + thread.exit + end + + hostname_resolution_result&.close + + connecting_sockets.each_key do |connecting_socket| + connecting_socket.close + end + end + private_class_method :tcp_with_fast_fallback + + def self.tcp_without_fast_fallback(host, port, local_host, local_port, connect_timeout:, resolv_timeout:, open_timeout:) last_error = nil ret = nil local_addr_list = nil if local_host != nil || local_port != nil - local_addr_list = Addrinfo.getaddrinfo(local_host, local_port, nil, :STREAM, nil) + local_addr_list = Addrinfo.getaddrinfo(local_host, local_port, nil, :STREAM, nil, timeout: open_timeout || resolv_timeout) end - Addrinfo.foreach(host, port, nil, :STREAM) {|ai| + timeout = open_timeout ? open_timeout : resolv_timeout + starts_at = current_clock_time + + Addrinfo.foreach(host, port, nil, :STREAM, timeout:) {|ai| if local_addr_list local_addr = local_addr_list.find {|local_ai| local_ai.afamily == ai.afamily } next unless local_addr @@ -636,9 +956,17 @@ class Socket < BasicSocket local_addr = nil end begin + timeout = + if open_timeout + t = open_timeout - (current_clock_time - starts_at) + t.negative? ? 0 : t + else + connect_timeout + end + sock = local_addr ? - ai.connect_from(local_addr, timeout: connect_timeout) : - ai.connect(timeout: connect_timeout) + ai.connect_from(local_addr, timeout:) : + ai.connect(timeout:) rescue SystemCallError last_error = $! next @@ -653,18 +981,150 @@ class Socket < BasicSocket raise SocketError, "no appropriate local address" end end - if block_given? - begin - yield ret - ensure - ret.close + + ret + end + private_class_method :tcp_without_fast_fallback + + def self.ip_address?(hostname) + hostname.match?(IPV6_ADDRESS_FORMAT) || hostname.match?(/\A([0-9]{1,3}\.){3}[0-9]{1,3}\z/) + end + private_class_method :ip_address? + + def self.resolve_hostname(family, host, port, hostname_resolution_result) + begin + resolved_addrinfos = Addrinfo.getaddrinfo(host, port, ADDRESS_FAMILIES[family], :STREAM) + hostname_resolution_result.add(family, resolved_addrinfos) + rescue => e + hostname_resolution_result.add(family, e) + end + end + private_class_method :resolve_hostname + + def self.current_clock_time + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + private_class_method :current_clock_time + + def self.second_to_timeout(started_at, ends_at) + return nil if ends_at == Float::INFINITY || ends_at.nil? + + remaining = (ends_at - started_at) + remaining.negative? ? 0 : remaining + end + private_class_method :second_to_timeout + + def self.expired?(started_at, ends_at) + second_to_timeout(started_at, ends_at)&.zero? + end + private_class_method :expired? + + class HostnameResolutionResult + def initialize(size) + @size = size + @taken_count = 0 + @rpipe, @wpipe = IO.pipe + @results = [] + @mutex = Mutex.new + end + + def notifier + [@rpipe] + end + + def add(family, result) + @mutex.synchronize do + @results.push [family, result] + @wpipe.putc HOSTNAME_RESOLUTION_QUEUE_UPDATED end - else - ret + end + + def get + return nil if @results.empty? + + res = nil + + @mutex.synchronize do + @rpipe.getbyte + res = @results.shift + end + + @taken_count += 1 + close if @taken_count == @size + res + end + + def close + @rpipe.close + @wpipe.close end end + private_constant :HostnameResolutionResult + + class HostnameResolutionStore + PRIORITY_ON_V6 = [:ipv6, :ipv4].freeze + PRIORITY_ON_V4 = [:ipv4, :ipv6].freeze + + def initialize(family_names) + @family_names = family_names + @addrinfo_dict = {} + @error_dict = {} + @last_family = nil + end + + def add_resolved(family_name, addrinfos) + @addrinfo_dict[family_name] = addrinfos + end + + def add_error(family_name, error) + @addrinfo_dict[family_name] = [] + @error_dict[family_name] = error + end + + def get_addrinfo + precedences = + case @last_family + when :ipv4, nil then PRIORITY_ON_V6 + when :ipv6 then PRIORITY_ON_V4 + end + + precedences.each do |family_name| + addrinfo = @addrinfo_dict[family_name]&.shift + next unless addrinfo + + @last_family = family_name + return addrinfo + end + + nil + end + + def empty_addrinfos? + @addrinfo_dict.all? { |_, addrinfos| addrinfos.empty? } + end + + def any_addrinfos? + !empty_addrinfos? + end + + def resolved?(family) + @addrinfo_dict.has_key? family + end + + def resolved_successfully?(family) + resolved?(family) && !@error_dict[family] + end + + def resolved_all_families? + (@family_names - @addrinfo_dict.keys).empty? + end + + def any_unresolved_family? + !resolved_all_families? + end + end + private_constant :HostnameResolutionStore - # :stopdoc: def self.ip_sockets_port0(ai_list, reuseaddr) sockets = [] begin @@ -697,9 +1157,7 @@ class Socket < BasicSocket end sockets end - class << self - private :ip_sockets_port0 - end + private_class_method :ip_sockets_port0 def self.tcp_server_sockets_port0(host) ai_list = Addrinfo.getaddrinfo(host, 0, nil, :STREAM, nil, Socket::AI_PASSIVE) @@ -1127,13 +1585,18 @@ class Socket < BasicSocket end end - class << self - private - - def unix_socket_abstract_name?(path) - /linux/ =~ RUBY_PLATFORM && /\A(\0|\z)/ =~ path + # :stopdoc: + if RUBY_PLATFORM.include?("linux") + def self.unix_socket_abstract_name?(path) + path.empty? or path.start_with?("\0") + end + else + def self.unix_socket_abstract_name?(path) + false end end + private_class_method :unix_socket_abstract_name? + # :startdoc: # creates a UNIX socket server on _path_. # It calls the block for each socket accepted. @@ -1173,7 +1636,7 @@ class Socket < BasicSocket # Returns 0 if successful, otherwise an exception is raised. # # === Parameter - # # +remote_sockaddr+ - the +struct+ sockaddr contained in a string or Addrinfo object + # * +remote_sockaddr+ - the +struct+ sockaddr contained in a string or Addrinfo object # # === Example: # # Pull down Google's web page @@ -1208,7 +1671,7 @@ class Socket < BasicSocket # return the symbol +:wait_writable+ instead. # # === See - # # Socket#connect + # * Socket#connect def connect_nonblock(addr, exception: true) __connect_nonblock(addr, exception) end @@ -1225,9 +1688,10 @@ class UDPSocket < IPSocket # The first element of the results, _mesg_, is the data received. # The second element, _sender_inet_addr_, is an array to represent the sender address. # - # When recvfrom(2) returns 0, - # Socket#recvfrom_nonblock returns an empty string as data. - # It means an empty packet. + # When recvfrom(2) returns 0, Socket#recv_nonblock returns nil. + # In most cases it means the connection was closed, but it may also mean + # an empty packet was received, as the underlying API makes + # it impossible to distinguish these two cases. # # === Parameters # * +maxlen+ - the number of bytes to receive from the socket |
