diff options
Diffstat (limited to 'ext/socket/lib/socket.rb')
-rw-r--r-- | ext/socket/lib/socket.rb | 542 |
1 files changed, 528 insertions, 14 deletions
diff --git a/ext/socket/lib/socket.rb b/ext/socket/lib/socket.rb index d756a32a5a..e953077fe6 100644 --- a/ext/socket/lib/socket.rb +++ b/ext/socket/lib/socket.rb @@ -1,7 +1,11 @@ # frozen_string_literal: true require 'socket.so' -require 'io/wait' + +unless IO.method_defined?(:wait_writable, false) + # It's only required on older Rubies < v3.2: + require 'io/wait' +end class Addrinfo # creates an Addrinfo object from the arguments. @@ -197,7 +201,7 @@ class Addrinfo sock = Socket.new(self.pfamily, self.socktype, self.protocol) begin sock.ipv6only! if self.ipv6? - sock.setsockopt(:SOCKET, :REUSEADDR, 1) + sock.setsockopt(:SOCKET, :REUSEADDR, 1) unless self.pfamily == Socket::PF_UNIX sock.bind(self) sock.listen(backlog) rescue Exception @@ -329,9 +333,10 @@ class BasicSocket < IO # _flags_ is zero or more of the +MSG_+ options. # The result, _mesg_, is the data received. # - # When recvfrom(2) returns 0, Socket#recv_nonblock returns - # an empty string as data. - # The meaning depends on the socket: EOF on TCP, empty packet on UDP, etc. + # When recvfrom(2) returns 0, Socket#recv_nonblock returns nil. + # In most cases it means the connection was closed, but for UDP connections + # it may mean an empty packet was received, as the underlying API makes + # it impossible to distinguish these two cases. # # === Parameters # * +maxlen+ - the number of bytes to receive from the socket @@ -476,9 +481,10 @@ class Socket < BasicSocket # The second element, _sender_addrinfo_, contains protocol-specific address # information of the sender. # - # When recvfrom(2) returns 0, Socket#recvfrom_nonblock returns - # an empty string as data. - # The meaning depends on the socket: EOF on TCP, empty packet on UDP, etc. + # When recvfrom(2) returns 0, Socket#recv_nonblock returns nil. + # In most cases it means the connection was closed, but for UDP connections + # it may mean an empty packet was received, as the underlying API makes + # it impossible to distinguish these two cases. # # === Parameters # * +maxlen+ - the maximum number of bytes to receive from the socket @@ -593,6 +599,30 @@ class Socket < BasicSocket __accept_nonblock(exception) end + RESOLUTION_DELAY = 0.05 + private_constant :RESOLUTION_DELAY + + CONNECTION_ATTEMPT_DELAY = 0.25 + private_constant :CONNECTION_ATTEMPT_DELAY + + ADDRESS_FAMILIES = { + ipv6: Socket::AF_INET6, + ipv4: Socket::AF_INET + }.freeze + private_constant :ADDRESS_FAMILIES + + HOSTNAME_RESOLUTION_QUEUE_UPDATED = 0 + private_constant :HOSTNAME_RESOLUTION_QUEUE_UPDATED + + IPV6_ADRESS_FORMAT = /(?i)(?:(?:[0-9A-F]{1,4}:){7}(?:[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){6}(?:[0-9A-F]{1,4}::[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:){1,5}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){5}(?::[0-9A-F]{1,4}::[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:){1,4}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){4}(?::[0-9A-F]{1,4}::[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:){1,3}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){3}(?::[0-9A-F]{1,4}::[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:){1,2}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){2}(?::[0-9A-F]{1,4}::[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:)[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){1}(?::[0-9A-F]{1,4}::[0-9A-F]{1,4}|::(?:[0-9A-F]{1,4}:){1,5}[0-9A-F]{1,4}|:)|::(?:[0-9A-F]{1,4}::[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:){1,6}[0-9A-F]{1,4}|:))(?:%.+)?/ + private_constant :IPV6_ADRESS_FORMAT + + @tcp_fast_fallback = true + + class << self + attr_accessor :tcp_fast_fallback + end + # :call-seq: # Socket.tcp(host, port, local_host=nil, local_port=nil, [opts]) {|socket| ... } # Socket.tcp(host, port, local_host=nil, local_port=nil, [opts]) @@ -606,7 +636,6 @@ class Socket < BasicSocket # _opts_ may have following options: # # [:connect_timeout] specify the timeout in seconds. - # [:resolv_timeout] specify the name resolution timeout in seconds. # # If a block is given, the block is called with the socket. # The value of the block is returned. @@ -619,8 +648,491 @@ class Socket < BasicSocket # sock.close_write # puts sock.read # } - # - def self.tcp(host, port, local_host = nil, local_port = nil, connect_timeout: nil, resolv_timeout: nil) # :yield: socket + def self.tcp(host, port, local_host = nil, local_port = nil, connect_timeout: nil, resolv_timeout: nil, fast_fallback: tcp_fast_fallback, &block) # :yield: socket + unless fast_fallback + return tcp_without_fast_fallback(host, port, local_host, local_port, connect_timeout:, resolv_timeout:, &block) + end + + # Happy Eyeballs' states + # - :start + # - :v6c + # - :v4w + # - :v4c + # - :v46c + # - :v46w + # - :success + # - :failure + # - :timeout + + specified_family_name = nil + hostname_resolution_threads = [] + hostname_resolution_queue = nil + hostname_resolution_waiting = nil + hostname_resolution_expires_at = nil + selectable_addrinfos = SelectableAddrinfos.new + connecting_sockets = ConnectingSockets.new + local_addrinfos = [] + connection_attempt_delay_expires_at = nil + connection_attempt_started_at = nil + state = :start + connected_socket = nil + last_error = nil + is_windows_environment ||= (RUBY_PLATFORM =~ /mswin|mingw|cygwin/) + + ret = loop do + case state + when :start + specified_family_name, next_state = host && specified_family_name_and_next_state(host) + + if local_host && local_port + specified_family_name, next_state = specified_family_name_and_next_state(local_host) unless specified_family_name + local_addrinfos = Addrinfo.getaddrinfo(local_host, local_port, ADDRESS_FAMILIES[specified_family_name], :STREAM, timeout: resolv_timeout) + end + + if specified_family_name + addrinfos = Addrinfo.getaddrinfo(host, port, ADDRESS_FAMILIES[specified_family_name], :STREAM, timeout: resolv_timeout) + selectable_addrinfos.add(specified_family_name, addrinfos) + hostname_resolution_queue = NoHostnameResolutionQueue.new + state = next_state + next + end + + resolving_family_names = ADDRESS_FAMILIES.keys + hostname_resolution_queue = HostnameResolutionQueue.new(resolving_family_names.size) + hostname_resolution_waiting = hostname_resolution_queue.waiting_pipe + hostname_resolution_started_at = current_clocktime if resolv_timeout + hostname_resolution_args = [host, port, hostname_resolution_queue] + + hostname_resolution_threads.concat( + resolving_family_names.map { |family| + thread_args = [family, *hostname_resolution_args] + thread = Thread.new(*thread_args) { |*thread_args| hostname_resolution(*thread_args) } + Thread.pass + thread + } + ) + + hostname_resolution_retry_count = resolving_family_names.size - 1 + hostname_resolution_expires_at = hostname_resolution_started_at + resolv_timeout if resolv_timeout + + while hostname_resolution_retry_count >= 0 + remaining = resolv_timeout ? second_to_timeout(hostname_resolution_started_at + resolv_timeout) : nil + hostname_resolved, _, = IO.select(hostname_resolution_waiting, nil, nil, remaining) + + unless hostname_resolved + state = :timeout + break + end + + family_name, res = hostname_resolution_queue.get + + if res.is_a? Exception + unless (Socket.const_defined?(:EAI_ADDRFAMILY)) && + (res.is_a?(Socket::ResolutionError)) && + (res.error_code == Socket::EAI_ADDRFAMILY) + last_error = res + end + + if hostname_resolution_retry_count.zero? + state = :failure + break + end + hostname_resolution_retry_count -= 1 + else + state = case family_name + when :ipv6 then :v6c + when :ipv4 then hostname_resolution_queue.closed? ? :v4c : :v4w + end + selectable_addrinfos.add(family_name, res) + last_error = nil + break + end + end + + next + when :v4w + ipv6_resolved, _, = IO.select(hostname_resolution_waiting, nil, nil, RESOLUTION_DELAY) + + if ipv6_resolved + family_name, res = hostname_resolution_queue.get + selectable_addrinfos.add(family_name, res) unless res.is_a? Exception + state = :v46c + else + state = :v4c + end + + next + when :v4c, :v6c, :v46c + connection_attempt_started_at = current_clocktime unless connection_attempt_started_at + addrinfo = selectable_addrinfos.get + + if local_addrinfos.any? + local_addrinfo = local_addrinfos.find { |lai| lai.afamily == addrinfo.afamily } + + if local_addrinfo.nil? + if selectable_addrinfos.empty? && connecting_sockets.empty? && hostname_resolution_queue.closed? + last_error = SocketError.new 'no appropriate local address' + state = :failure + elsif selectable_addrinfos.any? + # Try other Addrinfo in next loop + else + if resolv_timeout && hostname_resolution_queue.opened? && + (current_clocktime >= hostname_resolution_expires_at) + state = :timeout + else + # Wait for connection to be established or hostname resolution in next loop + connection_attempt_delay_expires_at = nil + state = :v46w + end + end + next + end + end + + connection_attempt_delay_expires_at = current_clocktime + CONNECTION_ATTEMPT_DELAY + + begin + result = if specified_family_name && selectable_addrinfos.empty? && + connecting_sockets.empty? && hostname_resolution_queue.closed? + local_addrinfo ? + addrinfo.connect_from(local_addrinfo, timeout: connect_timeout) : + addrinfo.connect(timeout: connect_timeout) + else + socket = Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol) + socket.bind(local_addrinfo) if local_addrinfo + socket.connect_nonblock(addrinfo, exception: false) + end + + case result + when 0 + connected_socket = socket + state = :success + when Socket + connected_socket = result + state = :success + when :wait_writable + connecting_sockets.add(socket, addrinfo) + state = :v46w + end + rescue SystemCallError => e + last_error = e + socket.close if socket && !socket.closed? + + if selectable_addrinfos.empty? && connecting_sockets.empty? && hostname_resolution_queue.closed? + state = :failure + elsif selectable_addrinfos.any? + # Try other Addrinfo in next loop + else + if resolv_timeout && hostname_resolution_queue.opened? && + (current_clocktime >= hostname_resolution_expires_at) + state = :timeout + else + # Wait for connection to be established or hostname resolution in next loop + connection_attempt_delay_expires_at = nil + state = :v46w + end + end + end + + next + when :v46w + if connect_timeout && second_to_timeout(connection_attempt_started_at + connect_timeout).zero? + state = :timeout + next + end + + remaining = second_to_timeout(connection_attempt_delay_expires_at) + hostname_resolution_waiting = hostname_resolution_waiting && hostname_resolution_queue.closed? ? nil : hostname_resolution_waiting + hostname_resolved, connectable_sockets, = IO.select(hostname_resolution_waiting, connecting_sockets.all, nil, remaining) + + if connectable_sockets&.any? + while (connectable_socket = connectable_sockets.pop) + is_connected = + if is_windows_environment + sockopt = connectable_socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_CONNECT_TIME) + sockopt.unpack('i').first >= 0 + else + sockopt = connectable_socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_ERROR) + sockopt.int.zero? + end + + if is_connected + connected_socket = connectable_socket + connecting_sockets.delete connectable_socket + connectable_sockets.each do |other_connectable_socket| + other_connectable_socket.close unless other_connectable_socket.closed? + end + state = :success + break + else + failed_ai = connecting_sockets.delete connectable_socket + inspected_ip_address = failed_ai.ipv6? ? "[#{failed_ai.ip_address}]" : failed_ai.ip_address + last_error = SystemCallError.new("connect(2) for #{inspected_ip_address}:#{failed_ai.ip_port}", sockopt.int) + connectable_socket.close unless connectable_socket.closed? + + next if connectable_sockets.any? + + if selectable_addrinfos.empty? && connecting_sockets.empty? && hostname_resolution_queue.closed? + state = :failure + elsif selectable_addrinfos.any? + # Wait for connection attempt delay timeout in next loop + else + if resolv_timeout && hostname_resolution_queue.opened? && + (current_clocktime >= hostname_resolution_expires_at) + state = :timeout + else + # Wait for connection to be established or hostname resolution in next loop + connection_attempt_delay_expires_at = nil + end + end + end + end + elsif hostname_resolved&.any? + family_name, res = hostname_resolution_queue.get + selectable_addrinfos.add(family_name, res) unless res.is_a? Exception + else + if selectable_addrinfos.empty? && connecting_sockets.empty? && hostname_resolution_queue.closed? + state = :failure + elsif selectable_addrinfos.any? + # Try other Addrinfo in next loop + state = :v46c + else + if resolv_timeout && hostname_resolution_queue.opened? && + (current_clocktime >= hostname_resolution_expires_at) + state = :timeout + else + # Wait for connection to be established or hostname resolution in next loop + connection_attempt_delay_expires_at = nil + end + end + end + + next + when :success + break connected_socket + when :failure + raise last_error + when :timeout + raise Errno::ETIMEDOUT, 'user specified timeout' + end + end + + if block_given? + begin + yield ret + ensure + ret.close + end + else + ret + end + ensure + if fast_fallback + hostname_resolution_threads.each do |thread| + thread&.exit + end + + hostname_resolution_queue&.close_all + + connecting_sockets.each do |connecting_socket| + connecting_socket.close unless connecting_socket.closed? + end + end + end + + def self.specified_family_name_and_next_state(hostname) + if hostname.match?(IPV6_ADRESS_FORMAT) then [:ipv6, :v6c] + elsif hostname.match?(/^([0-9]{1,3}\.){3}[0-9]{1,3}$/) then [:ipv4, :v4c] + end + end + private_class_method :specified_family_name_and_next_state + + def self.hostname_resolution(family, host, port, hostname_resolution_queue) + begin + resolved_addrinfos = Addrinfo.getaddrinfo(host, port, ADDRESS_FAMILIES[family], :STREAM) + hostname_resolution_queue.add_resolved(family, resolved_addrinfos) + rescue => e + hostname_resolution_queue.add_error(family, e) + end + end + private_class_method :hostname_resolution + + def self.second_to_timeout(ends_at) + return 0 unless ends_at + + remaining = (ends_at - current_clocktime) + remaining.negative? ? 0 : remaining + end + private_class_method :second_to_timeout + + def self.current_clocktime + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + private_class_method :current_clocktime + + class SelectableAddrinfos + PRIORITY_ON_V6 = [:ipv6, :ipv4] + PRIORITY_ON_V4 = [:ipv4, :ipv6] + + def initialize + @addrinfo_dict = {} + @last_family = nil + end + + def add(family_name, addrinfos) + @addrinfo_dict[family_name] = addrinfos + end + + def get + return nil if empty? + + if @addrinfo_dict.size == 1 + @addrinfo_dict.each { |_, addrinfos| return addrinfos.shift } + end + + precedences = case @last_family + when :ipv4, nil then PRIORITY_ON_V6 + when :ipv6 then PRIORITY_ON_V4 + end + + precedences.each do |family_name| + addrinfo = @addrinfo_dict[family_name]&.shift + next unless addrinfo + + @last_family = family_name + return addrinfo + end + end + + def empty? + @addrinfo_dict.all? { |_, addrinfos| addrinfos.empty? } + end + + def any? + !empty? + end + end + private_constant :SelectableAddrinfos + + class NoHostnameResolutionQueue + def waiting_pipe + nil + end + + def add_resolved(_, _) + raise StandardError, "This #{self.class} cannot respond to:" + end + + def add_error(_, _) + raise StandardError, "This #{self.class} cannot respond to:" + end + + def get + nil + end + + def opened? + false + end + + def closed? + true + end + + def close_all + # Do nothing + end + end + private_constant :NoHostnameResolutionQueue + + class HostnameResolutionQueue + def initialize(size) + @size = size + @taken_count = 0 + @rpipe, @wpipe = IO.pipe + @queue = Queue.new + @mutex = Mutex.new + end + + def waiting_pipe + [@rpipe] + end + + def add_resolved(family, resolved_addrinfos) + @mutex.synchronize do + @queue.push [family, resolved_addrinfos] + @wpipe.putc HOSTNAME_RESOLUTION_QUEUE_UPDATED + end + end + + def add_error(family, error) + @mutex.synchronize do + @queue.push [family, error] + @wpipe.putc HOSTNAME_RESOLUTION_QUEUE_UPDATED + end + end + + def get + return nil if @queue.empty? + + res = nil + + @mutex.synchronize do + @rpipe.getbyte + res = @queue.pop + end + + @taken_count += 1 + close_all if @taken_count == @size + res + end + + def closed? + @rpipe.closed? + end + + def opened? + !closed? + end + + def close_all + @queue.close unless @queue.closed? + @rpipe.close unless @rpipe.closed? + @wpipe.close unless @wpipe.closed? + end + end + private_constant :HostnameResolutionQueue + + class ConnectingSockets + def initialize + @socket_dict = {} + end + + def all + @socket_dict.keys + end + + def add(socket, addrinfo) + @socket_dict[socket] = addrinfo + end + + def delete(socket) + @socket_dict.delete socket + end + + def empty? + @socket_dict.empty? + end + + def each + @socket_dict.keys.each do |socket| + yield socket + end + end + end + private_constant :ConnectingSockets + + def self.tcp_without_fast_fallback(host, port, local_host, local_port, connect_timeout:, resolv_timeout:, &block) last_error = nil ret = nil @@ -664,6 +1176,7 @@ class Socket < BasicSocket ret end end + private_class_method :tcp_without_fast_fallback # :stopdoc: def self.ip_sockets_port0(ai_list, reuseaddr) @@ -1226,9 +1739,10 @@ class UDPSocket < IPSocket # The first element of the results, _mesg_, is the data received. # The second element, _sender_inet_addr_, is an array to represent the sender address. # - # When recvfrom(2) returns 0, - # Socket#recvfrom_nonblock returns an empty string as data. - # It means an empty packet. + # When recvfrom(2) returns 0, Socket#recv_nonblock returns nil. + # In most cases it means the connection was closed, but it may also mean + # an empty packet was received, as the underlying API makes + # it impossible to distinguish these two cases. # # === Parameters # * +maxlen+ - the number of bytes to receive from the socket |