summaryrefslogtreecommitdiff
path: root/ext/socket/lib
diff options
context:
space:
mode:
authorMisaki Shioi <31817032+shioimm@users.noreply.github.com>2024-07-30 12:58:31 +0900
committerGitHub <noreply@github.com>2024-07-30 12:58:31 +0900
commitb3baa11ee90c3fc0a3c43f1ac3293558fbfe76cb (patch)
tree3d54588d814bd3bd4aa060f029cf994e399eae00 /ext/socket/lib
parent7ea678b24b88ec0db9c38d7763d9d1014b5ed46d (diff)
Improve Socket.tcp (#11187)
[Feature #20646]Improve Socket.tcp This is a proposed improvement to `Socket.tcp`, which has implemented Happy Eyeballs version 2 (RFC8305) in PR9374. 1. Background I implemented Happy Eyeballs version 2 (HEv2) for Socket.tcp in PR9374, but several issues have been identified: - `IO.select` waits for name resolution or connection establishment in v46w, but it does not consider the case where both events occur simultaneously when it returns a value. - In this case, Socket.tcp can only capture one event and needs to execute an unnecessary loop to capture the other one, calling `IO.select` one extra time. - `IO.select` waits for both IPv6/IPv4 name resolution (in start), but when it returns a value, it doesn't consider the case where name resolution for both address families is complete. - In this case, `Socket.tcp` can only obtain the addresses of one address family and needs to execute an unnecessary loop obtain the other addresses, calling `IO.select` one extra time. - The consideration for `connect_timeout` was insufficient. After initiating one or more connections, it raises a 'user specified timeout' after the `connect_timeout` period even if there were addresses that have been resolved and have not yet tried to connect. - It does not retry with another address in case of a connection failure. - It executes unnecessary state transitions even when an IP address is passed as the `host` argument. - The regex for IP addresses did not correctly specify the start and end. 2. Proposal & Outcome To overcome the aforementioned issues, this PR introduces the following changes: - Previously, each loop iteration represented a single state transition. This has been changed to execute all processes that meet the execution conditions within a single loop iteration. - This prevents unnecessary repeated loops and calling `IO.select` - Introduced logic to determine the timeout value set for `IO.select`. During the Resolution Delay and Connection Attempt Delay, the user-specified timeout is ignored. Otherwise, the timeout value is set to the larger of `resolv_timeout` and `connect_timeout`. - This ensures that the `connect_timeout` is only detected after attempting to connect to all resolved addresses. - Retry with another address in case of a connection failure. - This prevents unnecessary repeated loops upon connection failure. - Call `tcp_without_fast_fallback` when an IP address is passed as the host argument. - This prevents unnecessary state transitions when an IP address is passed. - Fixed regex for IP addresses. Additionally, the code has been reduced by over 100 lines, and redundancy has been minimized, which is expected to improve readability. 3. Performance No significant performance changes were observed in the happy case before and after the improvement. However, improvements in state transition deficiencies are expected to enhance performance in edge cases. ```ruby require 'socket' require 'benchmark' Benchmark.bmbm do |x| x.report('fast_fallback: true') do 30.times { Socket.tcp("www.ruby-lang.org", 80) } end x.report('fast_fallback: false') do # Ruby3.3時点と同じ 30.times { Socket.tcp("www.ruby-lang.org", 80, fast_fallback: false) } end end ``` Before: ``` ~/s/build ❯❯❯ ../install/bin/ruby ../ruby/test.rb user system total real fast_fallback: true 0.021315 0.040723 0.062038 ( 0.504866) fast_fallback: false 0.007553 0.026248 0.033801 ( 0.533211) ``` After: ``` ~/s/build ❯❯❯ ../install/bin/ruby ../ruby/test.rb user system total real fast_fallback: true 0.023081 0.040525 0.063606 ( 0.406219) fast_fallback: false 0.007302 0.025515 0.032817 ( 0.418680) ```
Diffstat (limited to 'ext/socket/lib')
-rw-r--r--ext/socket/lib/socket.rb729
1 files changed, 313 insertions, 416 deletions
diff --git a/ext/socket/lib/socket.rb b/ext/socket/lib/socket.rb
index e953077fe6..86f2a523eb 100644
--- a/ext/socket/lib/socket.rb
+++ b/ext/socket/lib/socket.rb
@@ -614,7 +614,7 @@ class Socket < BasicSocket
HOSTNAME_RESOLUTION_QUEUE_UPDATED = 0
private_constant :HOSTNAME_RESOLUTION_QUEUE_UPDATED
- IPV6_ADRESS_FORMAT = /(?i)(?:(?:[0-9A-F]{1,4}:){7}(?:[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){6}(?:[0-9A-F]{1,4}::[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:){1,5}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){5}(?::[0-9A-F]{1,4}::[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:){1,4}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){4}(?::[0-9A-F]{1,4}::[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:){1,3}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){3}(?::[0-9A-F]{1,4}::[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:){1,2}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){2}(?::[0-9A-F]{1,4}::[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:)[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){1}(?::[0-9A-F]{1,4}::[0-9A-F]{1,4}|::(?:[0-9A-F]{1,4}:){1,5}[0-9A-F]{1,4}|:)|::(?:[0-9A-F]{1,4}::[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:){1,6}[0-9A-F]{1,4}|:))(?:%.+)?/
+ IPV6_ADRESS_FORMAT = /\A(?i:(?:(?:[0-9A-F]{1,4}:){7}(?:[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){6}(?:[0-9A-F]{1,4}|:(?:[0-9A-F]{1,4}:){1,5}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){5}(?:(?::[0-9A-F]{1,4}){1,2}|:(?:[0-9A-F]{1,4}:){1,4}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){4}(?:(?::[0-9A-F]{1,4}){1,3}|:(?:[0-9A-F]{1,4}:){1,3}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){3}(?:(?::[0-9A-F]{1,4}){1,4}|:(?:[0-9A-F]{1,4}:){1,2}[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){2}(?:(?::[0-9A-F]{1,4}){1,5}|:(?:[0-9A-F]{1,4}:)[0-9A-F]{1,4}|:)|(?:[0-9A-F]{1,4}:){1}(?:(?::[0-9A-F]{1,4}){1,6}|:(?:[0-9A-F]{1,4}:){0,5}[0-9A-F]{1,4}|:)|(?:::(?:[0-9A-F]{1,4}:){0,7}[0-9A-F]{1,4}|::)))(?:%.+)?\z/
private_constant :IPV6_ADRESS_FORMAT
@tcp_fast_fallback = true
@@ -649,534 +649,431 @@ class Socket < BasicSocket
# puts sock.read
# }
def self.tcp(host, port, local_host = nil, local_port = nil, connect_timeout: nil, resolv_timeout: nil, fast_fallback: tcp_fast_fallback, &block) # :yield: socket
- unless fast_fallback
- return tcp_without_fast_fallback(host, port, local_host, local_port, connect_timeout:, resolv_timeout:, &block)
+ sock = if fast_fallback && !(host && ip_address?(host))
+ tcp_with_fast_fallback(host, port, local_host, local_port, connect_timeout:, resolv_timeout:, &block)
+ else
+ tcp_without_fast_fallback(host, port, local_host, local_port, connect_timeout:, resolv_timeout:, &block)
+ end
+
+ if block_given?
+ begin
+ yield sock
+ ensure
+ sock.close
+ end
+ else
+ sock
+ end
+ end
+
+ def self.tcp_with_fast_fallback(host, port, local_host = nil, local_port = nil, connect_timeout: nil, resolv_timeout: nil)
+ if local_host || local_port
+ local_addrinfos = Addrinfo.getaddrinfo(local_host, local_port, nil, :STREAM, timeout: resolv_timeout)
+ resolving_family_names = local_addrinfos.map { |lai| ADDRESS_FAMILIES.key(lai.afamily) }.uniq
+ else
+ local_addrinfos = []
+ resolving_family_names = ADDRESS_FAMILIES.keys
end
- # Happy Eyeballs' states
- # - :start
- # - :v6c
- # - :v4w
- # - :v4c
- # - :v46c
- # - :v46w
- # - :success
- # - :failure
- # - :timeout
-
- specified_family_name = nil
hostname_resolution_threads = []
- hostname_resolution_queue = nil
- hostname_resolution_waiting = nil
- hostname_resolution_expires_at = nil
- selectable_addrinfos = SelectableAddrinfos.new
- connecting_sockets = ConnectingSockets.new
- local_addrinfos = []
- connection_attempt_delay_expires_at = nil
- connection_attempt_started_at = nil
- state = :start
- connected_socket = nil
- last_error = nil
+ resolution_store = HostnameResolutionStore.new(resolving_family_names)
+ connecting_sockets = {}
is_windows_environment ||= (RUBY_PLATFORM =~ /mswin|mingw|cygwin/)
- ret = loop do
- case state
- when :start
- specified_family_name, next_state = host && specified_family_name_and_next_state(host)
+ now = current_clock_time
+ resolution_delay_expires_at = nil
+ connection_attempt_delay_expires_at = nil
+ user_specified_connect_timeout_at = nil
+ last_error = nil
- if local_host && local_port
- specified_family_name, next_state = specified_family_name_and_next_state(local_host) unless specified_family_name
- local_addrinfos = Addrinfo.getaddrinfo(local_host, local_port, ADDRESS_FAMILIES[specified_family_name], :STREAM, timeout: resolv_timeout)
- end
+ if resolving_family_names.size == 1
+ family_name = resolving_family_names.first
+ addrinfos = Addrinfo.getaddrinfo(host, port, family_name, :STREAM, timeout: resolv_timeout)
+ resolution_store.add_resolved(family_name, addrinfos)
+ hostname_resolution_result = nil
+ hostname_resolution_notifier = nil
+ user_specified_resolv_timeout_at = nil
+ else
+ hostname_resolution_result = HostnameResolutionResult.new(resolving_family_names.size)
+ hostname_resolution_notifier = hostname_resolution_result.notifier
+
+ hostname_resolution_threads.concat(
+ resolving_family_names.map { |family|
+ thread_args = [family, host, port, hostname_resolution_result]
+ thread = Thread.new(*thread_args) { |*thread_args| resolve_hostname(*thread_args) }
+ Thread.pass
+ thread
+ }
+ )
- if specified_family_name
- addrinfos = Addrinfo.getaddrinfo(host, port, ADDRESS_FAMILIES[specified_family_name], :STREAM, timeout: resolv_timeout)
- selectable_addrinfos.add(specified_family_name, addrinfos)
- hostname_resolution_queue = NoHostnameResolutionQueue.new
- state = next_state
- next
- end
+ user_specified_resolv_timeout_at = resolv_timeout ? now + resolv_timeout : Float::INFINITY
+ end
- resolving_family_names = ADDRESS_FAMILIES.keys
- hostname_resolution_queue = HostnameResolutionQueue.new(resolving_family_names.size)
- hostname_resolution_waiting = hostname_resolution_queue.waiting_pipe
- hostname_resolution_started_at = current_clocktime if resolv_timeout
- hostname_resolution_args = [host, port, hostname_resolution_queue]
-
- hostname_resolution_threads.concat(
- resolving_family_names.map { |family|
- thread_args = [family, *hostname_resolution_args]
- thread = Thread.new(*thread_args) { |*thread_args| hostname_resolution(*thread_args) }
- Thread.pass
- thread
- }
- )
-
- hostname_resolution_retry_count = resolving_family_names.size - 1
- hostname_resolution_expires_at = hostname_resolution_started_at + resolv_timeout if resolv_timeout
-
- while hostname_resolution_retry_count >= 0
- remaining = resolv_timeout ? second_to_timeout(hostname_resolution_started_at + resolv_timeout) : nil
- hostname_resolved, _, = IO.select(hostname_resolution_waiting, nil, nil, remaining)
-
- unless hostname_resolved
- state = :timeout
- break
+ loop do
+ if resolution_store.any_addrinfos? &&
+ !resolution_delay_expires_at &&
+ !connection_attempt_delay_expires_at
+ while (addrinfo = resolution_store.get_addrinfo)
+ if local_addrinfos.any?
+ local_addrinfo = local_addrinfos.find { |lai| lai.afamily == addrinfo.afamily }
+
+ if local_addrinfo.nil? # Connecting addrinfoと同じアドレスファミリのLocal addrinfoがない
+ if resolution_store.any_addrinfos?
+ # Try other Addrinfo in next "while"
+ next
+ elsif connecting_sockets.any? || resolution_store.any_unresolved_family?
+ # Exit this "while" and wait for connections to be established or hostname resolution in next loop
+ # Or exit this "while" and wait for hostname resolution in next loop
+ break
+ else
+ raise SocketError.new 'no appropriate local address'
+ end
+ end
end
- family_name, res = hostname_resolution_queue.get
-
- if res.is_a? Exception
- unless (Socket.const_defined?(:EAI_ADDRFAMILY)) &&
- (res.is_a?(Socket::ResolutionError)) &&
- (res.error_code == Socket::EAI_ADDRFAMILY)
- last_error = res
+ begin
+ if resolution_store.any_addrinfos? ||
+ connecting_sockets.any? ||
+ resolution_store.any_unresolved_family?
+ socket = Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol)
+ socket.bind(local_addrinfo) if local_addrinfo
+ result = socket.connect_nonblock(addrinfo, exception: false)
+ else
+ result = socket = local_addrinfo ?
+ addrinfo.connect_from(local_addrinfo, timeout: connect_timeout) :
+ addrinfo.connect(timeout: connect_timeout)
end
- if hostname_resolution_retry_count.zero?
- state = :failure
+ if result == :wait_writable
+ connection_attempt_delay_expires_at = now + CONNECTION_ATTEMPT_DELAY
+ if resolution_store.empty_addrinfos?
+ user_specified_connect_timeout_at = connect_timeout ? now + connect_timeout : Float::INFINITY
+ end
+
+ connecting_sockets[socket] = addrinfo
break
+ else
+ return socket # connection established
+ end
+ rescue SystemCallError => e
+ socket&.close
+ last_error = e
+
+ if resolution_store.any_addrinfos?
+ # Try other Addrinfo in next "while"
+ next
+ elsif connecting_sockets.any? || resolution_store.any_unresolved_family?
+ # Exit this "while" and wait for connections to be established or hostname resolution in next loop
+ # Or exit this "while" and wait for hostname resolution in next loop
+ break
+ else
+ raise last_error
end
- hostname_resolution_retry_count -= 1
- else
- state = case family_name
- when :ipv6 then :v6c
- when :ipv4 then hostname_resolution_queue.closed? ? :v4c : :v4w
- end
- selectable_addrinfos.add(family_name, res)
- last_error = nil
- break
end
end
+ end
- next
- when :v4w
- ipv6_resolved, _, = IO.select(hostname_resolution_waiting, nil, nil, RESOLUTION_DELAY)
-
- if ipv6_resolved
- family_name, res = hostname_resolution_queue.get
- selectable_addrinfos.add(family_name, res) unless res.is_a? Exception
- state = :v46c
+ ends_at =
+ if resolution_store.any_addrinfos?
+ resolution_delay_expires_at || connection_attempt_delay_expires_at
else
- state = :v4c
+ [user_specified_resolv_timeout_at, user_specified_connect_timeout_at].compact.max
end
- next
- when :v4c, :v6c, :v46c
- connection_attempt_started_at = current_clocktime unless connection_attempt_started_at
- addrinfo = selectable_addrinfos.get
-
- if local_addrinfos.any?
- local_addrinfo = local_addrinfos.find { |lai| lai.afamily == addrinfo.afamily }
-
- if local_addrinfo.nil?
- if selectable_addrinfos.empty? && connecting_sockets.empty? && hostname_resolution_queue.closed?
- last_error = SocketError.new 'no appropriate local address'
- state = :failure
- elsif selectable_addrinfos.any?
- # Try other Addrinfo in next loop
+ hostname_resolved, writable_sockets, except_sockets = IO.select(
+ hostname_resolution_notifier,
+ connecting_sockets.keys,
+ # Use errorfds to wait for non-blocking connect failures on Windows
+ is_windows_environment ? connecting_sockets.keys : nil,
+ second_to_timeout(current_clock_time, ends_at),
+ )
+ now = current_clock_time
+ resolution_delay_expires_at = nil if expired?(now, resolution_delay_expires_at)
+ connection_attempt_delay_expires_at = nil if expired?(now, connection_attempt_delay_expires_at)
+
+ if writable_sockets&.any?
+ while (writable_socket = writable_sockets.pop)
+ is_connected = is_windows_environment || (
+ sockopt = writable_socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_ERROR)
+ sockopt.int.zero?
+ )
+
+ if is_connected
+ connecting_sockets.delete writable_socket
+ return writable_socket
+ else
+ failed_ai = connecting_sockets.delete writable_socket
+ writable_socket.close
+ ip_address = failed_ai.ipv6? ? "[#{failed_ai.ip_address}]" : failed_ai.ip_address
+ last_error = SystemCallError.new("connect(2) for #{ip_address}:#{failed_ai.ip_port}", sockopt.int)
+
+ if writable_sockets.any? ||
+ resolution_store.any_addrinfos? ||
+ connecting_sockets.any? ||
+ resolution_store.any_unresolved_family?
+ user_specified_connect_timeout_at = nil if connecting_sockets.empty?
+ # Try other writable socket in next "while"
+ # Or exit this "while" and try other connection attempt
+ # Or exit this "while" and wait for connections to be established or hostname resolution in next loop
+ # Or exit this "while" and wait for hostname resolution in next loop
else
- if resolv_timeout && hostname_resolution_queue.opened? &&
- (current_clocktime >= hostname_resolution_expires_at)
- state = :timeout
- else
- # Wait for connection to be established or hostname resolution in next loop
- connection_attempt_delay_expires_at = nil
- state = :v46w
- end
+ raise last_error
end
- next
end
end
+ end
- connection_attempt_delay_expires_at = current_clocktime + CONNECTION_ATTEMPT_DELAY
-
- begin
- result = if specified_family_name && selectable_addrinfos.empty? &&
- connecting_sockets.empty? && hostname_resolution_queue.closed?
- local_addrinfo ?
- addrinfo.connect_from(local_addrinfo, timeout: connect_timeout) :
- addrinfo.connect(timeout: connect_timeout)
- else
- socket = Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol)
- socket.bind(local_addrinfo) if local_addrinfo
- socket.connect_nonblock(addrinfo, exception: false)
- end
-
- case result
- when 0
- connected_socket = socket
- state = :success
- when Socket
- connected_socket = result
- state = :success
- when :wait_writable
- connecting_sockets.add(socket, addrinfo)
- state = :v46w
- end
- rescue SystemCallError => e
- last_error = e
- socket.close if socket && !socket.closed?
-
- if selectable_addrinfos.empty? && connecting_sockets.empty? && hostname_resolution_queue.closed?
- state = :failure
- elsif selectable_addrinfos.any?
- # Try other Addrinfo in next loop
+ if except_sockets&.any?
+ except_sockets.each do |except_socket|
+ failed_ai = connecting_sockets.delete except_socket
+ sockopt = except_socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_CONNECT_TIME)
+ except_socket.close
+ ip_address = failed_ai.ipv6? ? "[#{failed_ai.ip_address}]" : failed_ai.ip_address
+ last_error = SystemCallError.new("connect(2) for #{ip_address}:#{failed_ai.ip_port}", sockopt.int)
+
+ if writable_sockets.any? ||
+ resolution_store.any_addrinfos? ||
+ connecting_sockets.any? ||
+ resolution_store.any_unresolved_family?
+ user_specified_connect_timeout_at = nil if connecting_sockets.empty?
+ # Try other writable socket in next "while"
+ # Or exit this "while" and try other connection attempt
+ # Or exit this "while" and wait for connections to be established or hostname resolution in next loop
+ # Or exit this "while" and wait for hostname resolution in next loop
else
- if resolv_timeout && hostname_resolution_queue.opened? &&
- (current_clocktime >= hostname_resolution_expires_at)
- state = :timeout
- else
- # Wait for connection to be established or hostname resolution in next loop
- connection_attempt_delay_expires_at = nil
- state = :v46w
- end
+ raise last_error
end
end
+ end
- next
- when :v46w
- if connect_timeout && second_to_timeout(connection_attempt_started_at + connect_timeout).zero?
- state = :timeout
- next
- end
-
- remaining = second_to_timeout(connection_attempt_delay_expires_at)
- hostname_resolution_waiting = hostname_resolution_waiting && hostname_resolution_queue.closed? ? nil : hostname_resolution_waiting
- hostname_resolved, connectable_sockets, = IO.select(hostname_resolution_waiting, connecting_sockets.all, nil, remaining)
-
- if connectable_sockets&.any?
- while (connectable_socket = connectable_sockets.pop)
- is_connected =
- if is_windows_environment
- sockopt = connectable_socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_CONNECT_TIME)
- sockopt.unpack('i').first >= 0
- else
- sockopt = connectable_socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_ERROR)
- sockopt.int.zero?
- end
-
- if is_connected
- connected_socket = connectable_socket
- connecting_sockets.delete connectable_socket
- connectable_sockets.each do |other_connectable_socket|
- other_connectable_socket.close unless other_connectable_socket.closed?
- end
- state = :success
- break
- else
- failed_ai = connecting_sockets.delete connectable_socket
- inspected_ip_address = failed_ai.ipv6? ? "[#{failed_ai.ip_address}]" : failed_ai.ip_address
- last_error = SystemCallError.new("connect(2) for #{inspected_ip_address}:#{failed_ai.ip_port}", sockopt.int)
- connectable_socket.close unless connectable_socket.closed?
+ if hostname_resolved&.any?
+ while (family_and_result = hostname_resolution_result.get)
+ family_name, result = family_and_result
- next if connectable_sockets.any?
+ if result.is_a? Exception
+ resolution_store.add_error(family_name, result)
- if selectable_addrinfos.empty? && connecting_sockets.empty? && hostname_resolution_queue.closed?
- state = :failure
- elsif selectable_addrinfos.any?
- # Wait for connection attempt delay timeout in next loop
- else
- if resolv_timeout && hostname_resolution_queue.opened? &&
- (current_clocktime >= hostname_resolution_expires_at)
- state = :timeout
- else
- # Wait for connection to be established or hostname resolution in next loop
- connection_attempt_delay_expires_at = nil
- end
- end
+ unless (Socket.const_defined?(:EAI_ADDRFAMILY)) &&
+ (result.is_a?(Socket::ResolutionError)) &&
+ (result.error_code == Socket::EAI_ADDRFAMILY)
+ last_error = result
end
- end
- elsif hostname_resolved&.any?
- family_name, res = hostname_resolution_queue.get
- selectable_addrinfos.add(family_name, res) unless res.is_a? Exception
- else
- if selectable_addrinfos.empty? && connecting_sockets.empty? && hostname_resolution_queue.closed?
- state = :failure
- elsif selectable_addrinfos.any?
- # Try other Addrinfo in next loop
- state = :v46c
else
- if resolv_timeout && hostname_resolution_queue.opened? &&
- (current_clocktime >= hostname_resolution_expires_at)
- state = :timeout
- else
- # Wait for connection to be established or hostname resolution in next loop
- connection_attempt_delay_expires_at = nil
- end
+ resolution_store.add_resolved(family_name, result)
end
end
- next
- when :success
- break connected_socket
- when :failure
- raise last_error
- when :timeout
- raise Errno::ETIMEDOUT, 'user specified timeout'
+ if resolution_store.resolved?(:ipv4)
+ if resolution_store.resolved?(:ipv6)
+ hostname_resolution_notifier = nil
+ resolution_delay_expires_at = nil
+ user_specified_resolv_timeout_at = nil
+ elsif resolution_store.resolved_successfully?(:ipv4)
+ resolution_delay_expires_at = now + RESOLUTION_DELAY
+ end
+ end
end
- end
- if block_given?
- begin
- yield ret
- ensure
- ret.close
+ if resolution_store.empty_addrinfos?
+ if connecting_sockets.empty? && resolution_store.resolved_all_families?
+ raise last_error
+ end
+
+ if (expired?(now, user_specified_resolv_timeout_at) || resolution_store.resolved_all_families?) &&
+ (expired?(now, user_specified_connect_timeout_at) || connecting_sockets.empty?)
+ raise Errno::ETIMEDOUT, 'user specified timeout'
+ end
end
- else
- ret
end
ensure
- if fast_fallback
- hostname_resolution_threads.each do |thread|
- thread&.exit
- end
-
- hostname_resolution_queue&.close_all
-
- connecting_sockets.each do |connecting_socket|
- connecting_socket.close unless connecting_socket.closed?
- end
+ hostname_resolution_threads.each do |thread|
+ thread.exit
end
- end
- def self.specified_family_name_and_next_state(hostname)
- if hostname.match?(IPV6_ADRESS_FORMAT) then [:ipv6, :v6c]
- elsif hostname.match?(/^([0-9]{1,3}\.){3}[0-9]{1,3}$/) then [:ipv4, :v4c]
- end
- end
- private_class_method :specified_family_name_and_next_state
+ hostname_resolution_result&.close
- def self.hostname_resolution(family, host, port, hostname_resolution_queue)
- begin
- resolved_addrinfos = Addrinfo.getaddrinfo(host, port, ADDRESS_FAMILIES[family], :STREAM)
- hostname_resolution_queue.add_resolved(family, resolved_addrinfos)
- rescue => e
- hostname_resolution_queue.add_error(family, e)
+ connecting_sockets.each_key do |connecting_socket|
+ connecting_socket.close
end
end
- private_class_method :hostname_resolution
-
- def self.second_to_timeout(ends_at)
- return 0 unless ends_at
- remaining = (ends_at - current_clocktime)
- remaining.negative? ? 0 : remaining
- end
- private_class_method :second_to_timeout
-
- def self.current_clocktime
- Process.clock_gettime(Process::CLOCK_MONOTONIC)
- end
- private_class_method :current_clocktime
-
- class SelectableAddrinfos
- PRIORITY_ON_V6 = [:ipv6, :ipv4]
- PRIORITY_ON_V4 = [:ipv4, :ipv6]
-
- def initialize
- @addrinfo_dict = {}
- @last_family = nil
- end
+ def self.tcp_without_fast_fallback(host, port, local_host, local_port, connect_timeout:, resolv_timeout:, &block)
+ last_error = nil
+ ret = nil
- def add(family_name, addrinfos)
- @addrinfo_dict[family_name] = addrinfos
+ local_addr_list = nil
+ if local_host != nil || local_port != nil
+ local_addr_list = Addrinfo.getaddrinfo(local_host, local_port, nil, :STREAM, nil)
end
- def get
- return nil if empty?
-
- if @addrinfo_dict.size == 1
- @addrinfo_dict.each { |_, addrinfos| return addrinfos.shift }
+ Addrinfo.foreach(host, port, nil, :STREAM, timeout: resolv_timeout) {|ai|
+ if local_addr_list
+ local_addr = local_addr_list.find {|local_ai| local_ai.afamily == ai.afamily }
+ next unless local_addr
+ else
+ local_addr = nil
end
-
- precedences = case @last_family
- when :ipv4, nil then PRIORITY_ON_V6
- when :ipv6 then PRIORITY_ON_V4
- end
-
- precedences.each do |family_name|
- addrinfo = @addrinfo_dict[family_name]&.shift
- next unless addrinfo
-
- @last_family = family_name
- return addrinfo
+ begin
+ sock = local_addr ?
+ ai.connect_from(local_addr, timeout: connect_timeout) :
+ ai.connect(timeout: connect_timeout)
+ rescue SystemCallError
+ last_error = $!
+ next
+ end
+ ret = sock
+ break
+ }
+ unless ret
+ if last_error
+ raise last_error
+ else
+ raise SocketError, "no appropriate local address"
end
end
- def empty?
- @addrinfo_dict.all? { |_, addrinfos| addrinfos.empty? }
- end
-
- def any?
- !empty?
- end
+ ret
end
- private_constant :SelectableAddrinfos
-
- class NoHostnameResolutionQueue
- def waiting_pipe
- nil
- end
+ private_class_method :tcp_without_fast_fallback
- def add_resolved(_, _)
- raise StandardError, "This #{self.class} cannot respond to:"
- end
+ def self.ip_address?(hostname)
+ hostname.match?(IPV6_ADRESS_FORMAT) || hostname.match?(/\A([0-9]{1,3}\.){3}[0-9]{1,3}\z/)
+ end
+ private_class_method :ip_address?
- def add_error(_, _)
- raise StandardError, "This #{self.class} cannot respond to:"
+ def self.resolve_hostname(family, host, port, hostname_resolution_result)
+ begin
+ resolved_addrinfos = Addrinfo.getaddrinfo(host, port, ADDRESS_FAMILIES[family], :STREAM)
+ hostname_resolution_result.add(family, resolved_addrinfos)
+ rescue => e
+ hostname_resolution_result.add(family, e)
end
+ end
+ private_class_method :resolve_hostname
- def get
- nil
- end
+ def self.current_clock_time
+ Process.clock_gettime(Process::CLOCK_MONOTONIC)
+ end
+ private_class_method :current_clock_time
- def opened?
- false
- end
+ def self.second_to_timeout(started_at, ends_at)
+ return nil if ends_at == Float::INFINITY || ends_at.nil?
- def closed?
- true
- end
+ remaining = (ends_at - started_at)
+ remaining.negative? ? 0 : remaining
+ end
+ private_class_method :second_to_timeout
- def close_all
- # Do nothing
- end
+ def self.expired?(started_at, ends_at)
+ second_to_timeout(started_at, ends_at)&.zero?
end
- private_constant :NoHostnameResolutionQueue
+ private_class_method :expired?
- class HostnameResolutionQueue
+ class HostnameResolutionResult
def initialize(size)
@size = size
@taken_count = 0
@rpipe, @wpipe = IO.pipe
- @queue = Queue.new
+ @results = []
@mutex = Mutex.new
end
- def waiting_pipe
+ def notifier
[@rpipe]
end
- def add_resolved(family, resolved_addrinfos)
+ def add(family, result)
@mutex.synchronize do
- @queue.push [family, resolved_addrinfos]
- @wpipe.putc HOSTNAME_RESOLUTION_QUEUE_UPDATED
- end
- end
-
- def add_error(family, error)
- @mutex.synchronize do
- @queue.push [family, error]
+ @results.push [family, result]
@wpipe.putc HOSTNAME_RESOLUTION_QUEUE_UPDATED
end
end
def get
- return nil if @queue.empty?
+ return nil if @results.empty?
res = nil
@mutex.synchronize do
@rpipe.getbyte
- res = @queue.pop
+ res = @results.shift
end
@taken_count += 1
- close_all if @taken_count == @size
+ close if @taken_count == @size
res
end
- def closed?
- @rpipe.closed?
+ def close
+ @rpipe.close
+ @wpipe.close
end
+ end
+ private_constant :HostnameResolutionResult
- def opened?
- !closed?
- end
+ class HostnameResolutionStore
+ PRIORITY_ON_V6 = [:ipv6, :ipv4]
+ PRIORITY_ON_V4 = [:ipv4, :ipv6]
- def close_all
- @queue.close unless @queue.closed?
- @rpipe.close unless @rpipe.closed?
- @wpipe.close unless @wpipe.closed?
+ def initialize(family_names)
+ @family_names = family_names
+ @addrinfo_dict = {}
+ @error_dict = {}
+ @last_family = nil
end
- end
- private_constant :HostnameResolutionQueue
- class ConnectingSockets
- def initialize
- @socket_dict = {}
+ def add_resolved(family_name, addrinfos)
+ @addrinfo_dict[family_name] = addrinfos
end
- def all
- @socket_dict.keys
+ def add_error(family_name, error)
+ @addrinfo_dict[family_name] = []
+ @error_dict[family_name] = error
end
- def add(socket, addrinfo)
- @socket_dict[socket] = addrinfo
- end
+ def get_addrinfo
+ precedences =
+ case @last_family
+ when :ipv4, nil then PRIORITY_ON_V6
+ when :ipv6 then PRIORITY_ON_V4
+ end
+
+ precedences.each do |family_name|
+ addrinfo = @addrinfo_dict[family_name]&.shift
+ next unless addrinfo
+
+ @last_family = family_name
+ return addrinfo
+ end
- def delete(socket)
- @socket_dict.delete socket
+ nil
end
- def empty?
- @socket_dict.empty?
+ def empty_addrinfos?
+ @addrinfo_dict.all? { |_, addrinfos| addrinfos.empty? }
end
- def each
- @socket_dict.keys.each do |socket|
- yield socket
- end
+ def any_addrinfos?
+ !empty_addrinfos?
end
- end
- private_constant :ConnectingSockets
- def self.tcp_without_fast_fallback(host, port, local_host, local_port, connect_timeout:, resolv_timeout:, &block)
- last_error = nil
- ret = nil
+ def resolved?(family)
+ @addrinfo_dict.has_key? family
+ end
- local_addr_list = nil
- if local_host != nil || local_port != nil
- local_addr_list = Addrinfo.getaddrinfo(local_host, local_port, nil, :STREAM, nil)
+ def resolved_successfully?(family)
+ resolved?(family) && !!@error_dict[family]
end
- Addrinfo.foreach(host, port, nil, :STREAM, timeout: resolv_timeout) {|ai|
- if local_addr_list
- local_addr = local_addr_list.find {|local_ai| local_ai.afamily == ai.afamily }
- next unless local_addr
- else
- local_addr = nil
- end
- begin
- sock = local_addr ?
- ai.connect_from(local_addr, timeout: connect_timeout) :
- ai.connect(timeout: connect_timeout)
- rescue SystemCallError
- last_error = $!
- next
- end
- ret = sock
- break
- }
- unless ret
- if last_error
- raise last_error
- else
- raise SocketError, "no appropriate local address"
- end
+ def resolved_all_families?
+ (@family_names - @addrinfo_dict.keys).empty?
end
- if block_given?
- begin
- yield ret
- ensure
- ret.close
- end
- else
- ret
+
+ def any_unresolved_family?
+ !resolved_all_families?
end
end
- private_class_method :tcp_without_fast_fallback
+ private_constant :HostnameResolutionStore
# :stopdoc:
def self.ip_sockets_port0(ai_list, reuseaddr)