summaryrefslogtreecommitdiff
path: root/ext/socket/lib/socket.rb
diff options
context:
space:
mode:
Diffstat (limited to 'ext/socket/lib/socket.rb')
-rw-r--r--ext/socket/lib/socket.rb542
1 files changed, 528 insertions, 14 deletions
diff --git a/ext/socket/lib/socket.rb b/ext/socket/lib/socket.rb
index d756a32a5a..e953077fe6 100644
--- a/ext/socket/lib/socket.rb
+++ b/ext/socket/lib/socket.rb
@@ -1,7 +1,11 @@
# frozen_string_literal: true
require 'socket.so'
-require 'io/wait'
+
+unless IO.method_defined?(:wait_writable, false)
+ # It's only required on older Rubies < v3.2:
+ require 'io/wait'
+end
class Addrinfo
# creates an Addrinfo object from the arguments.
@@ -197,7 +201,7 @@ class Addrinfo
sock = Socket.new(self.pfamily, self.socktype, self.protocol)
begin
sock.ipv6only! if self.ipv6?
- sock.setsockopt(:SOCKET, :REUSEADDR, 1)
+ sock.setsockopt(:SOCKET, :REUSEADDR, 1) unless self.pfamily == Socket::PF_UNIX
sock.bind(self)
sock.listen(backlog)
rescue Exception
@@ -329,9 +333,10 @@ class BasicSocket < IO
# _flags_ is zero or more of the +MSG_+ options.
# The result, _mesg_, is the data received.
#
- # When recvfrom(2) returns 0, Socket#recv_nonblock returns
- # an empty string as data.
- # The meaning depends on the socket: EOF on TCP, empty packet on UDP, etc.
+ # When recvfrom(2) returns 0, Socket#recv_nonblock returns nil.
+ # In most cases it means the connection was closed, but for UDP connections
+ # it may mean an empty packet was received, as the underlying API makes
+ # it impossible to distinguish these two cases.
#
# === Parameters
# * +maxlen+ - the number of bytes to receive from the socket
@@ -476,9 +481,10 @@ class Socket < BasicSocket
# The second element, _sender_addrinfo_, contains protocol-specific address
# information of the sender.
#
- # When recvfrom(2) returns 0, Socket#recvfrom_nonblock returns
- # an empty string as data.
- # The meaning depends on the socket: EOF on TCP, empty packet on UDP, etc.
+ # When recvfrom(2) returns 0, Socket#recv_nonblock returns nil.
+ # In most cases it means the connection was closed, but for UDP connections
+ # it may mean an empty packet was received, as the underlying API makes
+ # it impossible to distinguish these two cases.
#
# === Parameters
# * +maxlen+ - the maximum number of bytes to receive from the socket
@@ -593,6 +599,30 @@ class Socket < BasicSocket
__accept_nonblock(exception)
end
+ RESOLUTION_DELAY = 0.05
+ private_constant :RESOLUTION_DELAY
+
+ CONNECTION_ATTEMPT_DELAY = 0.25
+ private_constant :CONNECTION_ATTEMPT_DELAY
+
+ ADDRESS_FAMILIES = {
+ ipv6: Socket::AF_INET6,
+ ipv4: Socket::AF_INET
+ }.freeze
+ private_constant :ADDRESS_FAMILIES
+
+ HOSTNAME_RESOLUTION_QUEUE_UPDATED = 0
+ private_constant :HOSTNAME_RESOLUTION_QUEUE_UPDATED
+
+ IPV6_ADRESS_FORMAT = /(?i)(?:(?:[0-9A-F]{1,4}:){7}(?:[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){6}(?:[0-9A-F]{1,4}::[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:){1,5}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){5}(?::[0-9A-F]{1,4}::[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:){1,4}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){4}(?::[0-9A-F]{1,4}::[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:){1,3}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){3}(?::[0-9A-F]{1,4}::[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:){1,2}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){2}(?::[0-9A-F]{1,4}::[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:)[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){1}(?::[0-9A-F]{1,4}::[0-9A-F]{1,4}|::(?:[0-9A-F]{1,4}:){1,5}[0-9A-F]{1,4}|:)|::(?:[0-9A-F]{1,4}::[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:){1,6}[0-9A-F]{1,4}|:))(?:%.+)?/
+ private_constant :IPV6_ADRESS_FORMAT
+
+ @tcp_fast_fallback = true
+
+ class << self
+ attr_accessor :tcp_fast_fallback
+ end
+
# :call-seq:
# Socket.tcp(host, port, local_host=nil, local_port=nil, [opts]) {|socket| ... }
# Socket.tcp(host, port, local_host=nil, local_port=nil, [opts])
@@ -606,7 +636,6 @@ class Socket < BasicSocket
# _opts_ may have following options:
#
# [:connect_timeout] specify the timeout in seconds.
- # [:resolv_timeout] specify the name resolution timeout in seconds.
#
# If a block is given, the block is called with the socket.
# The value of the block is returned.
@@ -619,8 +648,491 @@ class Socket < BasicSocket
# sock.close_write
# puts sock.read
# }
- #
- def self.tcp(host, port, local_host = nil, local_port = nil, connect_timeout: nil, resolv_timeout: nil) # :yield: socket
+ def self.tcp(host, port, local_host = nil, local_port = nil, connect_timeout: nil, resolv_timeout: nil, fast_fallback: tcp_fast_fallback, &block) # :yield: socket
+ unless fast_fallback
+ return tcp_without_fast_fallback(host, port, local_host, local_port, connect_timeout:, resolv_timeout:, &block)
+ end
+
+ # Happy Eyeballs' states
+ # - :start
+ # - :v6c
+ # - :v4w
+ # - :v4c
+ # - :v46c
+ # - :v46w
+ # - :success
+ # - :failure
+ # - :timeout
+
+ specified_family_name = nil
+ hostname_resolution_threads = []
+ hostname_resolution_queue = nil
+ hostname_resolution_waiting = nil
+ hostname_resolution_expires_at = nil
+ selectable_addrinfos = SelectableAddrinfos.new
+ connecting_sockets = ConnectingSockets.new
+ local_addrinfos = []
+ connection_attempt_delay_expires_at = nil
+ connection_attempt_started_at = nil
+ state = :start
+ connected_socket = nil
+ last_error = nil
+ is_windows_environment ||= (RUBY_PLATFORM =~ /mswin|mingw|cygwin/)
+
+ ret = loop do
+ case state
+ when :start
+ specified_family_name, next_state = host && specified_family_name_and_next_state(host)
+
+ if local_host && local_port
+ specified_family_name, next_state = specified_family_name_and_next_state(local_host) unless specified_family_name
+ local_addrinfos = Addrinfo.getaddrinfo(local_host, local_port, ADDRESS_FAMILIES[specified_family_name], :STREAM, timeout: resolv_timeout)
+ end
+
+ if specified_family_name
+ addrinfos = Addrinfo.getaddrinfo(host, port, ADDRESS_FAMILIES[specified_family_name], :STREAM, timeout: resolv_timeout)
+ selectable_addrinfos.add(specified_family_name, addrinfos)
+ hostname_resolution_queue = NoHostnameResolutionQueue.new
+ state = next_state
+ next
+ end
+
+ resolving_family_names = ADDRESS_FAMILIES.keys
+ hostname_resolution_queue = HostnameResolutionQueue.new(resolving_family_names.size)
+ hostname_resolution_waiting = hostname_resolution_queue.waiting_pipe
+ hostname_resolution_started_at = current_clocktime if resolv_timeout
+ hostname_resolution_args = [host, port, hostname_resolution_queue]
+
+ hostname_resolution_threads.concat(
+ resolving_family_names.map { |family|
+ thread_args = [family, *hostname_resolution_args]
+ thread = Thread.new(*thread_args) { |*thread_args| hostname_resolution(*thread_args) }
+ Thread.pass
+ thread
+ }
+ )
+
+ hostname_resolution_retry_count = resolving_family_names.size - 1
+ hostname_resolution_expires_at = hostname_resolution_started_at + resolv_timeout if resolv_timeout
+
+ while hostname_resolution_retry_count >= 0
+ remaining = resolv_timeout ? second_to_timeout(hostname_resolution_started_at + resolv_timeout) : nil
+ hostname_resolved, _, = IO.select(hostname_resolution_waiting, nil, nil, remaining)
+
+ unless hostname_resolved
+ state = :timeout
+ break
+ end
+
+ family_name, res = hostname_resolution_queue.get
+
+ if res.is_a? Exception
+ unless (Socket.const_defined?(:EAI_ADDRFAMILY)) &&
+ (res.is_a?(Socket::ResolutionError)) &&
+ (res.error_code == Socket::EAI_ADDRFAMILY)
+ last_error = res
+ end
+
+ if hostname_resolution_retry_count.zero?
+ state = :failure
+ break
+ end
+ hostname_resolution_retry_count -= 1
+ else
+ state = case family_name
+ when :ipv6 then :v6c
+ when :ipv4 then hostname_resolution_queue.closed? ? :v4c : :v4w
+ end
+ selectable_addrinfos.add(family_name, res)
+ last_error = nil
+ break
+ end
+ end
+
+ next
+ when :v4w
+ ipv6_resolved, _, = IO.select(hostname_resolution_waiting, nil, nil, RESOLUTION_DELAY)
+
+ if ipv6_resolved
+ family_name, res = hostname_resolution_queue.get
+ selectable_addrinfos.add(family_name, res) unless res.is_a? Exception
+ state = :v46c
+ else
+ state = :v4c
+ end
+
+ next
+ when :v4c, :v6c, :v46c
+ connection_attempt_started_at = current_clocktime unless connection_attempt_started_at
+ addrinfo = selectable_addrinfos.get
+
+ if local_addrinfos.any?
+ local_addrinfo = local_addrinfos.find { |lai| lai.afamily == addrinfo.afamily }
+
+ if local_addrinfo.nil?
+ if selectable_addrinfos.empty? && connecting_sockets.empty? && hostname_resolution_queue.closed?
+ last_error = SocketError.new 'no appropriate local address'
+ state = :failure
+ elsif selectable_addrinfos.any?
+ # Try other Addrinfo in next loop
+ else
+ if resolv_timeout && hostname_resolution_queue.opened? &&
+ (current_clocktime >= hostname_resolution_expires_at)
+ state = :timeout
+ else
+ # Wait for connection to be established or hostname resolution in next loop
+ connection_attempt_delay_expires_at = nil
+ state = :v46w
+ end
+ end
+ next
+ end
+ end
+
+ connection_attempt_delay_expires_at = current_clocktime + CONNECTION_ATTEMPT_DELAY
+
+ begin
+ result = if specified_family_name && selectable_addrinfos.empty? &&
+ connecting_sockets.empty? && hostname_resolution_queue.closed?
+ local_addrinfo ?
+ addrinfo.connect_from(local_addrinfo, timeout: connect_timeout) :
+ addrinfo.connect(timeout: connect_timeout)
+ else
+ socket = Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol)
+ socket.bind(local_addrinfo) if local_addrinfo
+ socket.connect_nonblock(addrinfo, exception: false)
+ end
+
+ case result
+ when 0
+ connected_socket = socket
+ state = :success
+ when Socket
+ connected_socket = result
+ state = :success
+ when :wait_writable
+ connecting_sockets.add(socket, addrinfo)
+ state = :v46w
+ end
+ rescue SystemCallError => e
+ last_error = e
+ socket.close if socket && !socket.closed?
+
+ if selectable_addrinfos.empty? && connecting_sockets.empty? && hostname_resolution_queue.closed?
+ state = :failure
+ elsif selectable_addrinfos.any?
+ # Try other Addrinfo in next loop
+ else
+ if resolv_timeout && hostname_resolution_queue.opened? &&
+ (current_clocktime >= hostname_resolution_expires_at)
+ state = :timeout
+ else
+ # Wait for connection to be established or hostname resolution in next loop
+ connection_attempt_delay_expires_at = nil
+ state = :v46w
+ end
+ end
+ end
+
+ next
+ when :v46w
+ if connect_timeout && second_to_timeout(connection_attempt_started_at + connect_timeout).zero?
+ state = :timeout
+ next
+ end
+
+ remaining = second_to_timeout(connection_attempt_delay_expires_at)
+ hostname_resolution_waiting = hostname_resolution_waiting && hostname_resolution_queue.closed? ? nil : hostname_resolution_waiting
+ hostname_resolved, connectable_sockets, = IO.select(hostname_resolution_waiting, connecting_sockets.all, nil, remaining)
+
+ if connectable_sockets&.any?
+ while (connectable_socket = connectable_sockets.pop)
+ is_connected =
+ if is_windows_environment
+ sockopt = connectable_socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_CONNECT_TIME)
+ sockopt.unpack('i').first >= 0
+ else
+ sockopt = connectable_socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_ERROR)
+ sockopt.int.zero?
+ end
+
+ if is_connected
+ connected_socket = connectable_socket
+ connecting_sockets.delete connectable_socket
+ connectable_sockets.each do |other_connectable_socket|
+ other_connectable_socket.close unless other_connectable_socket.closed?
+ end
+ state = :success
+ break
+ else
+ failed_ai = connecting_sockets.delete connectable_socket
+ inspected_ip_address = failed_ai.ipv6? ? "[#{failed_ai.ip_address}]" : failed_ai.ip_address
+ last_error = SystemCallError.new("connect(2) for #{inspected_ip_address}:#{failed_ai.ip_port}", sockopt.int)
+ connectable_socket.close unless connectable_socket.closed?
+
+ next if connectable_sockets.any?
+
+ if selectable_addrinfos.empty? && connecting_sockets.empty? && hostname_resolution_queue.closed?
+ state = :failure
+ elsif selectable_addrinfos.any?
+ # Wait for connection attempt delay timeout in next loop
+ else
+ if resolv_timeout && hostname_resolution_queue.opened? &&
+ (current_clocktime >= hostname_resolution_expires_at)
+ state = :timeout
+ else
+ # Wait for connection to be established or hostname resolution in next loop
+ connection_attempt_delay_expires_at = nil
+ end
+ end
+ end
+ end
+ elsif hostname_resolved&.any?
+ family_name, res = hostname_resolution_queue.get
+ selectable_addrinfos.add(family_name, res) unless res.is_a? Exception
+ else
+ if selectable_addrinfos.empty? && connecting_sockets.empty? && hostname_resolution_queue.closed?
+ state = :failure
+ elsif selectable_addrinfos.any?
+ # Try other Addrinfo in next loop
+ state = :v46c
+ else
+ if resolv_timeout && hostname_resolution_queue.opened? &&
+ (current_clocktime >= hostname_resolution_expires_at)
+ state = :timeout
+ else
+ # Wait for connection to be established or hostname resolution in next loop
+ connection_attempt_delay_expires_at = nil
+ end
+ end
+ end
+
+ next
+ when :success
+ break connected_socket
+ when :failure
+ raise last_error
+ when :timeout
+ raise Errno::ETIMEDOUT, 'user specified timeout'
+ end
+ end
+
+ if block_given?
+ begin
+ yield ret
+ ensure
+ ret.close
+ end
+ else
+ ret
+ end
+ ensure
+ if fast_fallback
+ hostname_resolution_threads.each do |thread|
+ thread&.exit
+ end
+
+ hostname_resolution_queue&.close_all
+
+ connecting_sockets.each do |connecting_socket|
+ connecting_socket.close unless connecting_socket.closed?
+ end
+ end
+ end
+
+ def self.specified_family_name_and_next_state(hostname)
+ if hostname.match?(IPV6_ADRESS_FORMAT) then [:ipv6, :v6c]
+ elsif hostname.match?(/^([0-9]{1,3}\.){3}[0-9]{1,3}$/) then [:ipv4, :v4c]
+ end
+ end
+ private_class_method :specified_family_name_and_next_state
+
+ def self.hostname_resolution(family, host, port, hostname_resolution_queue)
+ begin
+ resolved_addrinfos = Addrinfo.getaddrinfo(host, port, ADDRESS_FAMILIES[family], :STREAM)
+ hostname_resolution_queue.add_resolved(family, resolved_addrinfos)
+ rescue => e
+ hostname_resolution_queue.add_error(family, e)
+ end
+ end
+ private_class_method :hostname_resolution
+
+ def self.second_to_timeout(ends_at)
+ return 0 unless ends_at
+
+ remaining = (ends_at - current_clocktime)
+ remaining.negative? ? 0 : remaining
+ end
+ private_class_method :second_to_timeout
+
+ def self.current_clocktime
+ Process.clock_gettime(Process::CLOCK_MONOTONIC)
+ end
+ private_class_method :current_clocktime
+
+ class SelectableAddrinfos
+ PRIORITY_ON_V6 = [:ipv6, :ipv4]
+ PRIORITY_ON_V4 = [:ipv4, :ipv6]
+
+ def initialize
+ @addrinfo_dict = {}
+ @last_family = nil
+ end
+
+ def add(family_name, addrinfos)
+ @addrinfo_dict[family_name] = addrinfos
+ end
+
+ def get
+ return nil if empty?
+
+ if @addrinfo_dict.size == 1
+ @addrinfo_dict.each { |_, addrinfos| return addrinfos.shift }
+ end
+
+ precedences = case @last_family
+ when :ipv4, nil then PRIORITY_ON_V6
+ when :ipv6 then PRIORITY_ON_V4
+ end
+
+ precedences.each do |family_name|
+ addrinfo = @addrinfo_dict[family_name]&.shift
+ next unless addrinfo
+
+ @last_family = family_name
+ return addrinfo
+ end
+ end
+
+ def empty?
+ @addrinfo_dict.all? { |_, addrinfos| addrinfos.empty? }
+ end
+
+ def any?
+ !empty?
+ end
+ end
+ private_constant :SelectableAddrinfos
+
+ class NoHostnameResolutionQueue
+ def waiting_pipe
+ nil
+ end
+
+ def add_resolved(_, _)
+ raise StandardError, "This #{self.class} cannot respond to:"
+ end
+
+ def add_error(_, _)
+ raise StandardError, "This #{self.class} cannot respond to:"
+ end
+
+ def get
+ nil
+ end
+
+ def opened?
+ false
+ end
+
+ def closed?
+ true
+ end
+
+ def close_all
+ # Do nothing
+ end
+ end
+ private_constant :NoHostnameResolutionQueue
+
+ class HostnameResolutionQueue
+ def initialize(size)
+ @size = size
+ @taken_count = 0
+ @rpipe, @wpipe = IO.pipe
+ @queue = Queue.new
+ @mutex = Mutex.new
+ end
+
+ def waiting_pipe
+ [@rpipe]
+ end
+
+ def add_resolved(family, resolved_addrinfos)
+ @mutex.synchronize do
+ @queue.push [family, resolved_addrinfos]
+ @wpipe.putc HOSTNAME_RESOLUTION_QUEUE_UPDATED
+ end
+ end
+
+ def add_error(family, error)
+ @mutex.synchronize do
+ @queue.push [family, error]
+ @wpipe.putc HOSTNAME_RESOLUTION_QUEUE_UPDATED
+ end
+ end
+
+ def get
+ return nil if @queue.empty?
+
+ res = nil
+
+ @mutex.synchronize do
+ @rpipe.getbyte
+ res = @queue.pop
+ end
+
+ @taken_count += 1
+ close_all if @taken_count == @size
+ res
+ end
+
+ def closed?
+ @rpipe.closed?
+ end
+
+ def opened?
+ !closed?
+ end
+
+ def close_all
+ @queue.close unless @queue.closed?
+ @rpipe.close unless @rpipe.closed?
+ @wpipe.close unless @wpipe.closed?
+ end
+ end
+ private_constant :HostnameResolutionQueue
+
+ class ConnectingSockets
+ def initialize
+ @socket_dict = {}
+ end
+
+ def all
+ @socket_dict.keys
+ end
+
+ def add(socket, addrinfo)
+ @socket_dict[socket] = addrinfo
+ end
+
+ def delete(socket)
+ @socket_dict.delete socket
+ end
+
+ def empty?
+ @socket_dict.empty?
+ end
+
+ def each
+ @socket_dict.keys.each do |socket|
+ yield socket
+ end
+ end
+ end
+ private_constant :ConnectingSockets
+
+ def self.tcp_without_fast_fallback(host, port, local_host, local_port, connect_timeout:, resolv_timeout:, &block)
last_error = nil
ret = nil
@@ -664,6 +1176,7 @@ class Socket < BasicSocket
ret
end
end
+ private_class_method :tcp_without_fast_fallback
# :stopdoc:
def self.ip_sockets_port0(ai_list, reuseaddr)
@@ -1226,9 +1739,10 @@ class UDPSocket < IPSocket
# The first element of the results, _mesg_, is the data received.
# The second element, _sender_inet_addr_, is an array to represent the sender address.
#
- # When recvfrom(2) returns 0,
- # Socket#recvfrom_nonblock returns an empty string as data.
- # It means an empty packet.
+ # When recvfrom(2) returns 0, Socket#recv_nonblock returns nil.
+ # In most cases it means the connection was closed, but it may also mean
+ # an empty packet was received, as the underlying API makes
+ # it impossible to distinguish these two cases.
#
# === Parameters
# * +maxlen+ - the number of bytes to receive from the socket