summaryrefslogtreecommitdiff
path: root/ext/socket/lib/socket.rb
diff options
context:
space:
mode:
Diffstat (limited to 'ext/socket/lib/socket.rb')
-rw-r--r--ext/socket/lib/socket.rb453
1 files changed, 432 insertions, 21 deletions
diff --git a/ext/socket/lib/socket.rb b/ext/socket/lib/socket.rb
index d756a32a5a..d61de1e8e9 100644
--- a/ext/socket/lib/socket.rb
+++ b/ext/socket/lib/socket.rb
@@ -1,7 +1,11 @@
# frozen_string_literal: true
require 'socket.so'
-require 'io/wait'
+
+unless IO.method_defined?(:wait_writable, false)
+ # It's only required on older Rubies < v3.2:
+ require 'io/wait'
+end
class Addrinfo
# creates an Addrinfo object from the arguments.
@@ -197,7 +201,7 @@ class Addrinfo
sock = Socket.new(self.pfamily, self.socktype, self.protocol)
begin
sock.ipv6only! if self.ipv6?
- sock.setsockopt(:SOCKET, :REUSEADDR, 1)
+ sock.setsockopt(:SOCKET, :REUSEADDR, 1) unless self.pfamily == Socket::PF_UNIX
sock.bind(self)
sock.listen(backlog)
rescue Exception
@@ -329,9 +333,10 @@ class BasicSocket < IO
# _flags_ is zero or more of the +MSG_+ options.
# The result, _mesg_, is the data received.
#
- # When recvfrom(2) returns 0, Socket#recv_nonblock returns
- # an empty string as data.
- # The meaning depends on the socket: EOF on TCP, empty packet on UDP, etc.
+ # When recvfrom(2) returns 0, Socket#recv_nonblock returns nil.
+ # In most cases it means the connection was closed, but for UDP connections
+ # it may mean an empty packet was received, as the underlying API makes
+ # it impossible to distinguish these two cases.
#
# === Parameters
# * +maxlen+ - the number of bytes to receive from the socket
@@ -476,9 +481,10 @@ class Socket < BasicSocket
# The second element, _sender_addrinfo_, contains protocol-specific address
# information of the sender.
#
- # When recvfrom(2) returns 0, Socket#recvfrom_nonblock returns
- # an empty string as data.
- # The meaning depends on the socket: EOF on TCP, empty packet on UDP, etc.
+ # When recvfrom(2) returns 0, Socket#recv_nonblock returns nil.
+ # In most cases it means the connection was closed, but for UDP connections
+ # it may mean an empty packet was received, as the underlying API makes
+ # it impossible to distinguish these two cases.
#
# === Parameters
# * +maxlen+ - the maximum number of bytes to receive from the socket
@@ -593,6 +599,30 @@ class Socket < BasicSocket
__accept_nonblock(exception)
end
+ RESOLUTION_DELAY = 0.05
+ private_constant :RESOLUTION_DELAY
+
+ CONNECTION_ATTEMPT_DELAY = 0.25
+ private_constant :CONNECTION_ATTEMPT_DELAY
+
+ ADDRESS_FAMILIES = {
+ ipv6: Socket::AF_INET6,
+ ipv4: Socket::AF_INET
+ }.freeze
+ private_constant :ADDRESS_FAMILIES
+
+ HOSTNAME_RESOLUTION_QUEUE_UPDATED = 0
+ private_constant :HOSTNAME_RESOLUTION_QUEUE_UPDATED
+
+ IPV6_ADRESS_FORMAT = /\A(?i:(?:(?:[0-9A-F]{1,4}:){7}(?:[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){6}(?:[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:){1,5}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){5}(?:(?::[0-9A-F]{1,4}){1,2}|:(?:[0-9A-F]{1,4}:){1,4}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){4}(?:(?::[0-9A-F]{1,4}){1,3}|:(?:[0-9A-F]{1,4}:){1,3}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){3}(?:(?::[0-9A-F]{1,4}){1,4}|:(?:[0-9A-F]{1,4}:){1,2}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){2}(?:(?::[0-9A-F]{1,4}){1,5}|:(?:[0-9A-F]{1,4}:)[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){1}(?:(?::[0-9A-F]{1,4}){1,6}|:(?:[0-9A-F]{1,4}:){0,5}[0-9A-F]{1,4}|:)|(?:::(?:[0-9A-F]{1,4}:){0,7}[0-9A-F]{1,4}|::)))(?:%.+)?\z/
+ private_constant :IPV6_ADRESS_FORMAT
+
+ @tcp_fast_fallback = true
+
+ class << self
+ attr_accessor :tcp_fast_fallback
+ end
+
# :call-seq:
# Socket.tcp(host, port, local_host=nil, local_port=nil, [opts]) {|socket| ... }
# Socket.tcp(host, port, local_host=nil, local_port=nil, [opts])
@@ -606,7 +636,6 @@ class Socket < BasicSocket
# _opts_ may have following options:
#
# [:connect_timeout] specify the timeout in seconds.
- # [:resolv_timeout] specify the name resolution timeout in seconds.
#
# If a block is given, the block is called with the socket.
# The value of the block is returned.
@@ -619,8 +648,256 @@ class Socket < BasicSocket
# sock.close_write
# puts sock.read
# }
- #
- def self.tcp(host, port, local_host = nil, local_port = nil, connect_timeout: nil, resolv_timeout: nil) # :yield: socket
+ def self.tcp(host, port, local_host = nil, local_port = nil, connect_timeout: nil, resolv_timeout: nil, fast_fallback: tcp_fast_fallback, &) # :yield: socket
+ sock = if fast_fallback && !(host && ip_address?(host))
+ tcp_with_fast_fallback(host, port, local_host, local_port, connect_timeout:, resolv_timeout:)
+ else
+ tcp_without_fast_fallback(host, port, local_host, local_port, connect_timeout:, resolv_timeout:)
+ end
+
+ if block_given?
+ begin
+ yield sock
+ ensure
+ sock.close
+ end
+ else
+ sock
+ end
+ end
+
+ def self.tcp_with_fast_fallback(host, port, local_host = nil, local_port = nil, connect_timeout: nil, resolv_timeout: nil)
+ if local_host || local_port
+ local_addrinfos = Addrinfo.getaddrinfo(local_host, local_port, nil, :STREAM, timeout: resolv_timeout)
+ resolving_family_names = local_addrinfos.map { |lai| ADDRESS_FAMILIES.key(lai.afamily) }.uniq
+ else
+ local_addrinfos = []
+ resolving_family_names = ADDRESS_FAMILIES.keys
+ end
+
+ hostname_resolution_threads = []
+ resolution_store = HostnameResolutionStore.new(resolving_family_names)
+ connecting_sockets = {}
+ is_windows_environment ||= (RUBY_PLATFORM =~ /mswin|mingw|cygwin/)
+
+ now = current_clock_time
+ resolution_delay_expires_at = nil
+ connection_attempt_delay_expires_at = nil
+ user_specified_connect_timeout_at = nil
+ last_error = nil
+
+ if resolving_family_names.size == 1
+ family_name = resolving_family_names.first
+ addrinfos = Addrinfo.getaddrinfo(host, port, family_name, :STREAM, timeout: resolv_timeout)
+ resolution_store.add_resolved(family_name, addrinfos)
+ hostname_resolution_result = nil
+ hostname_resolution_notifier = nil
+ user_specified_resolv_timeout_at = nil
+ else
+ hostname_resolution_result = HostnameResolutionResult.new(resolving_family_names.size)
+ hostname_resolution_notifier = hostname_resolution_result.notifier
+
+ hostname_resolution_threads.concat(
+ resolving_family_names.map { |family|
+ thread_args = [family, host, port, hostname_resolution_result]
+ thread = Thread.new(*thread_args) { |*thread_args| resolve_hostname(*thread_args) }
+ Thread.pass
+ thread
+ }
+ )
+
+ user_specified_resolv_timeout_at = resolv_timeout ? now + resolv_timeout : Float::INFINITY
+ end
+
+ loop do
+ if resolution_store.any_addrinfos? &&
+ !resolution_delay_expires_at &&
+ !connection_attempt_delay_expires_at
+ while (addrinfo = resolution_store.get_addrinfo)
+ if local_addrinfos.any?
+ local_addrinfo = local_addrinfos.find { |lai| lai.afamily == addrinfo.afamily }
+
+ if local_addrinfo.nil? # Connecting addrinfoと同じアドレスファミリのLocal addrinfoがない
+ if resolution_store.any_addrinfos?
+ # Try other Addrinfo in next "while"
+ next
+ elsif connecting_sockets.any? || resolution_store.any_unresolved_family?
+ # Exit this "while" and wait for connections to be established or hostname resolution in next loop
+ # Or exit this "while" and wait for hostname resolution in next loop
+ break
+ else
+ raise SocketError.new 'no appropriate local address'
+ end
+ end
+ end
+
+ begin
+ if resolution_store.any_addrinfos? ||
+ connecting_sockets.any? ||
+ resolution_store.any_unresolved_family?
+ socket = Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol)
+ socket.bind(local_addrinfo) if local_addrinfo
+ result = socket.connect_nonblock(addrinfo, exception: false)
+ else
+ result = socket = local_addrinfo ?
+ addrinfo.connect_from(local_addrinfo, timeout: connect_timeout) :
+ addrinfo.connect(timeout: connect_timeout)
+ end
+
+ if result == :wait_writable
+ connection_attempt_delay_expires_at = now + CONNECTION_ATTEMPT_DELAY
+ if resolution_store.empty_addrinfos?
+ user_specified_connect_timeout_at = connect_timeout ? now + connect_timeout : Float::INFINITY
+ end
+
+ connecting_sockets[socket] = addrinfo
+ break
+ else
+ return socket # connection established
+ end
+ rescue SystemCallError => e
+ socket&.close
+ last_error = e
+
+ if resolution_store.any_addrinfos?
+ # Try other Addrinfo in next "while"
+ next
+ elsif connecting_sockets.any? || resolution_store.any_unresolved_family?
+ # Exit this "while" and wait for connections to be established or hostname resolution in next loop
+ # Or exit this "while" and wait for hostname resolution in next loop
+ break
+ else
+ raise last_error
+ end
+ end
+ end
+ end
+
+ ends_at =
+ if resolution_store.any_addrinfos?
+ resolution_delay_expires_at || connection_attempt_delay_expires_at
+ else
+ [user_specified_resolv_timeout_at, user_specified_connect_timeout_at].compact.max
+ end
+
+ hostname_resolved, writable_sockets, except_sockets = IO.select(
+ hostname_resolution_notifier,
+ connecting_sockets.keys,
+ # Use errorfds to wait for non-blocking connect failures on Windows
+ is_windows_environment ? connecting_sockets.keys : nil,
+ second_to_timeout(current_clock_time, ends_at),
+ )
+ now = current_clock_time
+ resolution_delay_expires_at = nil if expired?(now, resolution_delay_expires_at)
+ connection_attempt_delay_expires_at = nil if expired?(now, connection_attempt_delay_expires_at)
+
+ if writable_sockets&.any?
+ while (writable_socket = writable_sockets.pop)
+ is_connected = is_windows_environment || (
+ sockopt = writable_socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_ERROR)
+ sockopt.int.zero?
+ )
+
+ if is_connected
+ connecting_sockets.delete writable_socket
+ return writable_socket
+ else
+ failed_ai = connecting_sockets.delete writable_socket
+ writable_socket.close
+ ip_address = failed_ai.ipv6? ? "[#{failed_ai.ip_address}]" : failed_ai.ip_address
+ last_error = SystemCallError.new("connect(2) for #{ip_address}:#{failed_ai.ip_port}", sockopt.int)
+
+ if writable_sockets.any? ||
+ resolution_store.any_addrinfos? ||
+ connecting_sockets.any? ||
+ resolution_store.any_unresolved_family?
+ user_specified_connect_timeout_at = nil if connecting_sockets.empty?
+ # Try other writable socket in next "while"
+ # Or exit this "while" and try other connection attempt
+ # Or exit this "while" and wait for connections to be established or hostname resolution in next loop
+ # Or exit this "while" and wait for hostname resolution in next loop
+ else
+ raise last_error
+ end
+ end
+ end
+ end
+
+ if except_sockets&.any?
+ except_sockets.each do |except_socket|
+ failed_ai = connecting_sockets.delete except_socket
+ sockopt = except_socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_CONNECT_TIME)
+ except_socket.close
+ ip_address = failed_ai.ipv6? ? "[#{failed_ai.ip_address}]" : failed_ai.ip_address
+ last_error = SystemCallError.new("connect(2) for #{ip_address}:#{failed_ai.ip_port}", sockopt.int)
+
+ if writable_sockets.any? ||
+ resolution_store.any_addrinfos? ||
+ connecting_sockets.any? ||
+ resolution_store.any_unresolved_family?
+ user_specified_connect_timeout_at = nil if connecting_sockets.empty?
+ # Try other writable socket in next "while"
+ # Or exit this "while" and try other connection attempt
+ # Or exit this "while" and wait for connections to be established or hostname resolution in next loop
+ # Or exit this "while" and wait for hostname resolution in next loop
+ else
+ raise last_error
+ end
+ end
+ end
+
+ if hostname_resolved&.any?
+ while (family_and_result = hostname_resolution_result.get)
+ family_name, result = family_and_result
+
+ if result.is_a? Exception
+ resolution_store.add_error(family_name, result)
+
+ unless (Socket.const_defined?(:EAI_ADDRFAMILY)) &&
+ (result.is_a?(Socket::ResolutionError)) &&
+ (result.error_code == Socket::EAI_ADDRFAMILY)
+ last_error = result
+ end
+ else
+ resolution_store.add_resolved(family_name, result)
+ end
+ end
+
+ if resolution_store.resolved?(:ipv4)
+ if resolution_store.resolved?(:ipv6)
+ hostname_resolution_notifier = nil
+ resolution_delay_expires_at = nil
+ user_specified_resolv_timeout_at = nil
+ elsif resolution_store.resolved_successfully?(:ipv4)
+ resolution_delay_expires_at = now + RESOLUTION_DELAY
+ end
+ end
+ end
+
+ if resolution_store.empty_addrinfos?
+ if connecting_sockets.empty? && resolution_store.resolved_all_families?
+ raise last_error
+ end
+
+ if (expired?(now, user_specified_resolv_timeout_at) || resolution_store.resolved_all_families?) &&
+ (expired?(now, user_specified_connect_timeout_at) || connecting_sockets.empty?)
+ raise Errno::ETIMEDOUT, 'user specified timeout'
+ end
+ end
+ end
+ ensure
+ hostname_resolution_threads.each do |thread|
+ thread.exit
+ end
+
+ hostname_resolution_result&.close
+
+ connecting_sockets.each_key do |connecting_socket|
+ connecting_socket.close
+ end
+ end
+
+ def self.tcp_without_fast_fallback(host, port, local_host, local_port, connect_timeout:, resolv_timeout:)
last_error = nil
ret = nil
@@ -654,16 +931,149 @@ class Socket < BasicSocket
raise SocketError, "no appropriate local address"
end
end
- if block_given?
- begin
- yield ret
- ensure
- ret.close
+
+ ret
+ end
+ private_class_method :tcp_without_fast_fallback
+
+ def self.ip_address?(hostname)
+ hostname.match?(IPV6_ADRESS_FORMAT) || hostname.match?(/\A([0-9]{1,3}\.){3}[0-9]{1,3}\z/)
+ end
+ private_class_method :ip_address?
+
+ def self.resolve_hostname(family, host, port, hostname_resolution_result)
+ begin
+ resolved_addrinfos = Addrinfo.getaddrinfo(host, port, ADDRESS_FAMILIES[family], :STREAM)
+ hostname_resolution_result.add(family, resolved_addrinfos)
+ rescue => e
+ hostname_resolution_result.add(family, e)
+ end
+ end
+ private_class_method :resolve_hostname
+
+ def self.current_clock_time
+ Process.clock_gettime(Process::CLOCK_MONOTONIC)
+ end
+ private_class_method :current_clock_time
+
+ def self.second_to_timeout(started_at, ends_at)
+ return nil if ends_at == Float::INFINITY || ends_at.nil?
+
+ remaining = (ends_at - started_at)
+ remaining.negative? ? 0 : remaining
+ end
+ private_class_method :second_to_timeout
+
+ def self.expired?(started_at, ends_at)
+ second_to_timeout(started_at, ends_at)&.zero?
+ end
+ private_class_method :expired?
+
+ class HostnameResolutionResult
+ def initialize(size)
+ @size = size
+ @taken_count = 0
+ @rpipe, @wpipe = IO.pipe
+ @results = []
+ @mutex = Mutex.new
+ end
+
+ def notifier
+ [@rpipe]
+ end
+
+ def add(family, result)
+ @mutex.synchronize do
+ @results.push [family, result]
+ @wpipe.putc HOSTNAME_RESOLUTION_QUEUE_UPDATED
end
- else
- ret
+ end
+
+ def get
+ return nil if @results.empty?
+
+ res = nil
+
+ @mutex.synchronize do
+ @rpipe.getbyte
+ res = @results.shift
+ end
+
+ @taken_count += 1
+ close if @taken_count == @size
+ res
+ end
+
+ def close
+ @rpipe.close
+ @wpipe.close
+ end
+ end
+ private_constant :HostnameResolutionResult
+
+ class HostnameResolutionStore
+ PRIORITY_ON_V6 = [:ipv6, :ipv4]
+ PRIORITY_ON_V4 = [:ipv4, :ipv6]
+
+ def initialize(family_names)
+ @family_names = family_names
+ @addrinfo_dict = {}
+ @error_dict = {}
+ @last_family = nil
+ end
+
+ def add_resolved(family_name, addrinfos)
+ @addrinfo_dict[family_name] = addrinfos
+ end
+
+ def add_error(family_name, error)
+ @addrinfo_dict[family_name] = []
+ @error_dict[family_name] = error
+ end
+
+ def get_addrinfo
+ precedences =
+ case @last_family
+ when :ipv4, nil then PRIORITY_ON_V6
+ when :ipv6 then PRIORITY_ON_V4
+ end
+
+ precedences.each do |family_name|
+ addrinfo = @addrinfo_dict[family_name]&.shift
+ next unless addrinfo
+
+ @last_family = family_name
+ return addrinfo
+ end
+
+ nil
+ end
+
+ def empty_addrinfos?
+ @addrinfo_dict.all? { |_, addrinfos| addrinfos.empty? }
+ end
+
+ def any_addrinfos?
+ !empty_addrinfos?
+ end
+
+ def resolved?(family)
+ @addrinfo_dict.has_key? family
+ end
+
+ def resolved_successfully?(family)
+ resolved?(family) && !!@error_dict[family]
+ end
+
+ def resolved_all_families?
+ (@family_names - @addrinfo_dict.keys).empty?
+ end
+
+ def any_unresolved_family?
+ !resolved_all_families?
end
end
+ private_constant :HostnameResolutionStore
# :stopdoc:
def self.ip_sockets_port0(ai_list, reuseaddr)
@@ -1226,9 +1636,10 @@ class UDPSocket < IPSocket
# The first element of the results, _mesg_, is the data received.
# The second element, _sender_inet_addr_, is an array to represent the sender address.
#
- # When recvfrom(2) returns 0,
- # Socket#recvfrom_nonblock returns an empty string as data.
- # It means an empty packet.
+ # When recvfrom(2) returns 0, Socket#recv_nonblock returns nil.
+ # In most cases it means the connection was closed, but it may also mean
+ # an empty packet was received, as the underlying API makes
+ # it impossible to distinguish these two cases.
#
# === Parameters
# * +maxlen+ - the number of bytes to receive from the socket