diff options
author | Jean Boussier <jean.boussier@gmail.com> | 2022-08-20 11:25:54 +0200 |
---|---|---|
committer | git <svn-admin@ruby-lang.org> | 2022-08-31 12:37:49 +0900 |
commit | 739380c97d2c6440da0d5ae07f9291b1e8cde1b2 (patch) | |
tree | 10db5e41472fbbb85ff20190f13c671310f5f96b | |
parent | 221a52368362a16570f5025a9a055534a49bfbe9 (diff) |
[ruby/net-protocol] Improve BufferedIO performance
`BufferedIO` is a bit inefficient for reading large responses because
it use the classic `buffer.slice!` technique which cause a lot of
unnecessary string copying.
This is particularly visible on line based protocol when reading
line by line.
Instead of repeatedly shifting the string, we can keep track of
which offset we're at, to know how many bytes are left in the buffer.
This change also open the door to further optimization by increasing
the buffer size, as previously `slice!` would get slower the larger
the buffer is.
Benchmark results:
```
=== 1k ===
Warming up --------------------------------------
1k 1.234k i/100ms
1k opt 1.283k i/100ms
Calculating -------------------------------------
1k 12.615k (± 0.9%) i/s - 64.168k in 5.086995s
1k opt 12.856k (± 0.9%) i/s - 65.433k in 5.090051s
Comparison:
1k: 12615.2 i/s
1k opt: 12856.0 i/s - 1.02x (± 0.00) faster
=== 10k ===
Warming up --------------------------------------
10k 1.165k i/100ms
10k opt 1.269k i/100ms
Calculating -------------------------------------
10k 11.550k (± 2.4%) i/s - 58.250k in 5.046378s
10k opt 12.736k (± 1.0%) i/s - 64.719k in 5.081969s
Comparison:
10k: 11550.3 i/s
10k opt: 12736.3 i/s - 1.10x (± 0.00) faster
=== 100k ===
Warming up --------------------------------------
100k 809.000 i/100ms
100k opt 926.000 i/100ms
Calculating -------------------------------------
100k 8.054k (± 3.0%) i/s - 40.450k in 5.028299s
100k opt 9.286k (± 2.2%) i/s - 47.226k in 5.088841s
Comparison:
100k: 8053.6 i/s
100k opt: 9285.5 i/s - 1.15x (± 0.00) faster
=== 1M ===
Warming up --------------------------------------
1M 249.000 i/100ms
1M opt 315.000 i/100ms
Calculating -------------------------------------
1M 2.448k (± 2.5%) i/s - 12.450k in 5.089744s
1M opt 3.119k (± 2.6%) i/s - 15.750k in 5.053772s
Comparison:
1M: 2447.8 i/s
1M opt: 3118.8 i/s - 1.27x (± 0.00) faster
```
Profiling before (1MB responses):
```
==================================
Mode: wall(1000)
Samples: 5276 (0.00% miss rate)
GC: 394 (7.47%)
==================================
TOTAL (pct) SAMPLES (pct) FRAME
1622 (30.7%) 1622 (30.7%) IO#wait_readable
777 (14.7%) 777 (14.7%) IO#read_nonblock
365 (6.9%) 365 (6.9%) (sweeping)
2705 (51.3%) 364 (6.9%) Net::BufferedIO#rbuf_fill
264 (5.0%) 264 (5.0%) String#index
223 (4.2%) 223 (4.2%) String#sub
221 (4.2%) 221 (4.2%) String#slice!
185 (3.5%) 185 (3.5%) String#split
108 (2.0%) 108 (2.0%) IO#write_nonblock
101 (1.9%) 101 (1.9%) String#downcase
66 (1.3%) 66 (1.3%) Net::BufferedIO#LOG
57 (1.1%) 57 (1.1%) String#count
51 (1.0%) 51 (1.0%) String#to_s
391 (7.4%) 50 (0.9%) Net::HTTPGenericRequest#write_header
50 (0.9%) 50 (0.9%) String#capitalize
49 (0.9%) 49 (0.9%) Array#join
47 (0.9%) 47 (0.9%) String#b
106 (2.0%) 36 (0.7%) Net::HTTPHeader#set_field
34 (0.6%) 34 (0.6%) Module#===
33 (0.6%) 33 (0.6%) String#[]
140 (2.7%) 29 (0.5%) Net::BufferedIO#write0
29 (0.5%) 29 (0.5%) (marking)
281 (5.3%) 27 (0.5%) Net::BufferedIO#rbuf_consume
1195 (22.6%) 25 (0.5%) Net::HTTPResponse#read_body
1024 (19.4%) 25 (0.5%) Net::HTTPResponse.each_response_header
86 (1.6%) 24 (0.5%) Net::HTTPHeader#set_field
23 (0.4%) 23 (0.4%) Net::HTTP#proxy_uri
51 (1.0%) 23 (0.4%) Net::HTTPHeader#initialize_http_header
2225 (42.2%) 22 (0.4%) Net::BufferedIO#readuntil
20 (0.4%) 20 (0.4%) Regexp#===
```
Profiling after (1MB responses):
```
==================================
Mode: wall(1000)
Samples: 15180 (0.00% miss rate)
GC: 1688 (11.12%)
==================================
TOTAL (pct) SAMPLES (pct) FRAME
4534 (29.9%) 4534 (29.9%) IO#read_nonblock
10650 (70.2%) 3944 (26.0%) Net::HTTPOpt::BufferedIOOpt#rbuf_fill
2101 (13.8%) 2101 (13.8%) IO#wait_readable
1442 (9.5%) 1442 (9.5%) (sweeping)
360 (2.4%) 360 (2.4%) String#sub
312 (2.1%) 312 (2.1%) String#split
265 (1.7%) 265 (1.7%) String#bytesize
246 (1.6%) 246 (1.6%) (marking)
151 (1.0%) 151 (1.0%) IO#write_nonblock
125 (0.8%) 125 (0.8%) String#downcase
116 (0.8%) 116 (0.8%) String#index
113 (0.7%) 113 (0.7%) Module#===
162 (1.1%) 89 (0.6%) Net::HTTPOpt::BufferedIOOpt#rbuf_consume_all_shareable!
158 (1.0%) 65 (0.4%) Net::HTTPHeader#set_field
63 (0.4%) 63 (0.4%) String#capitalize
63 (0.4%) 63 (0.4%) BasicObject#equal?
58 (0.4%) 58 (0.4%) Regexp#match
58 (0.4%) 58 (0.4%) String#[]
449 (3.0%) 56 (0.4%) Net::HTTPGenericRequest#write_header
53 (0.3%) 53 (0.3%) String#to_s
52 (0.3%) 52 (0.3%) Net::HTTPOpt::BufferedIOOpt#LOG
52 (0.3%) 52 (0.3%) String#count
44 (0.3%) 44 (0.3%) String#byteslice
44 (0.3%) 44 (0.3%) Array#join
1096 (7.2%) 42 (0.3%) Net::HTTPResponse.each_response_header
2617 (17.2%) 40 (0.3%) Net::HTTPOpt::BufferedIOOpt#readuntil
132 (0.9%) 30 (0.2%) Net::HTTPOpt::BufferedIOOpt#rbuf_consume
28 (0.2%) 28 (0.2%) Regexp#===
27 (0.2%) 27 (0.2%) Net::HTTP#proxy_uri
8862 (58.4%) 27 (0.2%) Net::HTTPResponse#read_body
````
Benchmark code:
```ruby
require "fileutils"
DIR = "/tmp/www"
FileUtils.mkdir_p(DIR)
HOST = "127.0.0.1"
PORT = 8080
CONF = <<~EOS
daemon off;
worker_processes 2;
events {
worker_connections 128;
}
http {
server_tokens off;
charset utf-8;
server {
server_name localhost;
listen #{HOST}:#{PORT};
keepalive_requests 10000000;
keepalive_timeout 3600s;
error_page 500 502 503 504 /50x.html;
location / {
root #{DIR};
}
}
}
EOS
File.write(File.join(DIR, "1k.txt"), 'a' * 1024)
File.write(File.join(DIR, "10k.txt"), 'a' * 1024 * 10)
File.write(File.join(DIR, "100k.txt"), 'a' * 1024 * 100)
File.write(File.join(DIR, "1M.txt"), 'a' * 1024 * 1024)
File.write(File.join(DIR, "nginx.conf"), CONF)
require "benchmark/ips"
require "net/http"
nginx_pid = Process.spawn('nginx', '-c', File.join(DIR, "nginx.conf"))
module Net
class HTTPOpt < HTTP
class BufferedIOOpt < ::Net::BufferedIO #:nodoc: internal use only
def initialize(io, read_timeout: 60, write_timeout: 60, continue_timeout: nil, debug_output: nil)
@io = io
@read_timeout = read_timeout
@write_timeout = write_timeout
@continue_timeout = continue_timeout
@debug_output = debug_output
@rbuf = ''.b
@rbuf_offset = 0
end
attr_reader :io
attr_accessor :read_timeout
attr_accessor :write_timeout
attr_accessor :continue_timeout
attr_accessor :debug_output
def inspect
"#<#{self.class} io=#{@io}>"
end
def eof?
@io.eof?
end
def closed?
@io.closed?
end
def close
@io.close
end
#
# Read
#
public
def read(len, dest = ''.b, ignore_eof = false)
LOG "reading #{len} bytes..."
read_bytes = 0
begin
while read_bytes + rbuf_size < len
if s = rbuf_consume_all_shareable!
read_bytes += s.bytesize
dest << s
end
rbuf_fill
end
s = rbuf_consume(len - read_bytes)
read_bytes += s.bytesize
dest << s
rescue EOFError
raise unless ignore_eof
end
LOG "read #{read_bytes} bytes"
dest
end
def read_all(dest = ''.b)
LOG 'reading all...'
read_bytes = 0
begin
while true
if s = rbuf_consume_all_shareable!
read_bytes += s.bytesize
dest << s
end
rbuf_fill
end
rescue EOFError
;
end
LOG "read #{read_bytes} bytes"
dest
end
def readuntil(terminator, ignore_eof = false)
offset = @rbuf_offset
begin
until idx = @rbuf.index(terminator, offset)
offset = @rbuf.bytesize
rbuf_fill
end
return rbuf_consume(idx + terminator.bytesize - @rbuf_offset)
rescue EOFError
raise unless ignore_eof
return rbuf_consume
end
end
def readline
readuntil("\n").chop
end
private
BUFSIZE = 1024 * 16
def rbuf_fill
tmp = @rbuf_empty ? @rbuf : nil
case rv = @io.read_nonblock(BUFSIZE, tmp, exception: false)
when String
@rbuf_empty = false
if rv.equal?(tmp)
@rbuf_offset = 0
else
@rbuf << rv
rv.clear
end
return
when :wait_readable
(io = @io.to_io).wait_readable(@read_timeout) or raise Net::ReadTimeout.new(io)
# continue looping
when :wait_writable
# OpenSSL::Buffering#read_nonblock may fail with IO::WaitWritable.
# http://www.openssl.org/support/faq.html#PROG10
(io = @io.to_io).wait_writable(@read_timeout) or raise Net::ReadTimeout.new(io)
# continue looping
when nil
raise EOFError, 'end of file reached'
end while true
end
def rbuf_flush
if @rbuf_empty
@rbuf.clear
@rbuf_offset = 0
end
nil
end
def rbuf_size
@rbuf.bytesize - @rbuf_offset
end
# Warning: this method may share the buffer to avoid
# copying. The caller must no longer use the returned
# string once rbuf_fill has been called again
def rbuf_consume_all_shareable!
@rbuf_empty = true
buf = if @rbuf_offset == 0
@rbuf
else
@rbuf.byteslice(@rbuf_offset..-1)
end
@rbuf_offset = @rbuf.bytesize
buf
end
def rbuf_consume(len = nil)
if @rbuf_offset == 0 && (len.nil? || len == @rbuf.bytesize)
s = @rbuf
@rbuf = ''.b
@rbuf_offset = 0
@rbuf_empty = true
elsif len.nil?
s = @rbuf.byteslice(@rbuf_offset..-1)
@rbuf = ''.b
@rbuf_offset = 0
@rbuf_empty = true
else
s = @rbuf.byteslice(@rbuf_offset, len)
@rbuf_offset += len
@rbuf_empty = @rbuf_offset == @rbuf.bytesize
rbuf_flush
end
@debug_output << %Q[-> #{s.dump}\n] if @debug_output
s
end
#
# Write
#
public
def write(*strs)
writing {
write0(*strs)
}
end
alias << write
def writeline(str)
writing {
write0 str + "\r\n"
}
end
private
def writing
@written_bytes = 0
@debug_output << '<- ' if @debug_output
yield
@debug_output << "\n" if @debug_output
bytes = @written_bytes
@written_bytes = nil
bytes
end
def write0(*strs)
@debug_output << strs.map(&:dump).join if @debug_output
orig_written_bytes = @written_bytes
strs.each_with_index do |str, i|
need_retry = true
case len = @io.write_nonblock(str, exception: false)
when Integer
@written_bytes += len
len -= str.bytesize
if len == 0
if strs.size == i+1
return @written_bytes - orig_written_bytes
else
need_retry = false
# next string
end
elsif len < 0
str = str.byteslice(len, -len)
else # len > 0
need_retry = false
# next string
end
# continue looping
when :wait_writable
(io = @io.to_io).wait_writable(@write_timeout) or raise Net::WriteTimeout.new(io)
# continue looping
end while need_retry
end
end
#
# Logging
#
private
def LOG_off
@save_debug_out = @debug_output
@debug_output = nil
end
def LOG_on
@debug_output = @save_debug_out
end
def LOG(msg)
return unless @debug_output
@debug_output << msg + "\n"
end
end
BufferedIO = BufferedIOOpt
# Unchanged from ruby 3.1.1, only allow to lookup the mofidied BufferedIO
def connect
if use_ssl?
# reference early to load OpenSSL before connecting,
# as OpenSSL may take time to load.
@ssl_context = OpenSSL::SSL::SSLContext.new
end
if proxy? then
conn_addr = proxy_address
conn_port = proxy_port
else
conn_addr = conn_address
conn_port = port
end
D "opening connection to #{conn_addr}:#{conn_port}..."
begin
s = Socket.tcp conn_addr, conn_port, @local_host, @local_port, connect_timeout: @open_timeout
rescue => e
e = Net::OpenTimeout.new(e) if e.is_a?(Errno::ETIMEDOUT) #for compatibility with previous versions
raise e, "Failed to open TCP connection to " +
"#{conn_addr}:#{conn_port} (#{e.message})"
end
s.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
D "opened"
if use_ssl?
if proxy?
plain_sock = BufferedIO.new(s, read_timeout: @read_timeout,
write_timeout: @write_timeout,
continue_timeout: @continue_timeout,
debug_output: @debug_output)
buf = "CONNECT #{conn_address}:#{@port} HTTP/#{HTTPVersion}\r\n"
buf << "Host: #{@address}:#{@port}\r\n"
if proxy_user
credential = ["#{proxy_user}:#{proxy_pass}"].pack('m0')
buf << "Proxy-Authorization: Basic #{credential}\r\n"
end
buf << "\r\n"
plain_sock.write(buf)
HTTPResponse.read_new(plain_sock).value
# assuming nothing left in buffers after successful CONNECT response
end
ssl_parameters = Hash.new
iv_list = instance_variables
SSL_IVNAMES.each_with_index do |ivname, i|
if iv_list.include?(ivname)
value = instance_variable_get(ivname)
unless value.nil?
ssl_parameters[SSL_ATTRIBUTES[i]] = value
end
end
end
@ssl_context.set_params(ssl_parameters)
@ssl_context.session_cache_mode =
OpenSSL::SSL::SSLContext::SESSION_CACHE_CLIENT |
OpenSSL::SSL::SSLContext::SESSION_CACHE_NO_INTERNAL_STORE
@ssl_context.session_new_cb = proc {|sock, sess| @ssl_session = sess }
D "starting SSL for #{conn_addr}:#{conn_port}..."
s = OpenSSL::SSL::SSLSocket.new(s, @ssl_context)
s.sync_close = true
# Server Name Indication (SNI) RFC 3546
s.hostname = @address if s.respond_to? :hostname=
if @ssl_session and
Process.clock_gettime(Process::CLOCK_REALTIME) < @ssl_session.time.to_f + @ssl_session.timeout
s.session = @ssl_session
end
ssl_socket_connect(s, @open_timeout)
if (@ssl_context.verify_mode != OpenSSL::SSL::VERIFY_NONE) && @ssl_context.verify_hostname
s.post_connection_check(@address)
end
D "SSL established, protocol: #{s.ssl_version}, cipher: #{s.cipher[0]}"
end
@socket = BufferedIO.new(s, read_timeout: @read_timeout,
write_timeout: @write_timeout,
continue_timeout: @continue_timeout,
debug_output: @debug_output)
@last_communicated = nil
on_connect
rescue => exception
if s
D "Conn close because of connect error #{exception}"
s.close
end
raise
end
private :connect
end
end
begin
sleep 0.2
connection = Net::HTTP.start(HOST, PORT)
connection.keep_alive_timeout = 3600
connection_opt = Net::HTTPOpt.start(HOST, PORT)
connection_opt.keep_alive_timeout = 3600
unless connection.request_get("/100k.txt").body == connection_opt.request_get("/100k.txt").body
abort("bug?")
end
if ARGV.first == "profile"
require 'stackprof'
require 'json'
StackProf.run(mode: :wall, out: "/tmp/stackprof-net-http.dump", raw: true) do
40_000.times do
connection.request_get("/1M.txt").body
end
end
File.write("/tmp/stackprof-net-http.json", JSON.dump(Marshal.load(File.binread("/tmp/stackprof-net-http.dump"))))
system("stackprof", "/tmp/stackprof-net-http.rb")
StackProf.run(mode: :wall, out: "/tmp/stackprof-net-http-opt.dump", raw: true) do
40_000.times do
connection_opt.request_get("/1M.txt").body
end
end
File.write("/tmp/stackprof-net-http-opt.json", JSON.dump(Marshal.load(File.binread("/tmp/stackprof-net-http-opt.dump"))))
system("stackprof", "/tmp/stackprof-net-http-opt.dump")
else
%w(1k 10k 100k 1M).each do |size|
puts "=== #{size} ==="
Benchmark.ips do |x|
path = "/#{size}.txt"
x.report("#{size}") { connection.request_get(path).body }
x.report("#{size} opt") { connection_opt.request_get(path).body }
x.compare!(order: :baseline)
end
puts
end
end
ensure
Process.kill('TERM', nginx_pid)
Process.wait(nginx_pid)
end
```
https://github.com/ruby/net-protocol/commit/781e400389
-rw-r--r-- | lib/net/protocol.rb | 82 |
1 files changed, 64 insertions, 18 deletions
diff --git a/lib/net/protocol.rb b/lib/net/protocol.rb index 822bc00574..c676854b67 100644 --- a/lib/net/protocol.rb +++ b/lib/net/protocol.rb @@ -120,6 +120,7 @@ module Net # :nodoc: @continue_timeout = continue_timeout @debug_output = debug_output @rbuf = ''.b + @rbuf_offset = 0 end attr_reader :io @@ -154,14 +155,15 @@ module Net # :nodoc: LOG "reading #{len} bytes..." read_bytes = 0 begin - while read_bytes + @rbuf.size < len - s = rbuf_consume(@rbuf.size) - read_bytes += s.size - dest << s + while read_bytes + rbuf_size < len + if s = rbuf_consume_all_shareable! + read_bytes += s.bytesize + dest << s + end rbuf_fill end s = rbuf_consume(len - read_bytes) - read_bytes += s.size + read_bytes += s.bytesize dest << s rescue EOFError raise unless ignore_eof @@ -175,9 +177,10 @@ module Net # :nodoc: read_bytes = 0 begin while true - s = rbuf_consume(@rbuf.size) - read_bytes += s.size - dest << s + if s = rbuf_consume_all_shareable! + read_bytes += s.bytesize + dest << s + end rbuf_fill end rescue EOFError @@ -188,14 +191,16 @@ module Net # :nodoc: end def readuntil(terminator, ignore_eof = false) + offset = @rbuf_offset begin - until idx = @rbuf.index(terminator) + until idx = @rbuf.index(terminator, offset) + offset = @rbuf.bytesize rbuf_fill end - return rbuf_consume(idx + terminator.size) + return rbuf_consume(idx + terminator.bytesize - @rbuf_offset) rescue EOFError raise unless ignore_eof - return rbuf_consume(@rbuf.size) + return rbuf_consume end end @@ -208,12 +213,16 @@ module Net # :nodoc: BUFSIZE = 1024 * 16 def rbuf_fill - tmp = @rbuf.empty? ? @rbuf : nil + tmp = @rbuf_empty ? @rbuf : nil case rv = @io.read_nonblock(BUFSIZE, tmp, exception: false) when String - return if rv.equal?(tmp) - @rbuf << rv - rv.clear + @rbuf_empty = false + if rv.equal?(tmp) + @rbuf_offset = 0 + else + @rbuf << rv + rv.clear + end return when :wait_readable (io = @io.to_io).wait_readable(@read_timeout) or raise Net::ReadTimeout.new(io) @@ -228,13 +237,50 @@ module Net # :nodoc: end while true end - def rbuf_consume(len) - if len == @rbuf.size + def rbuf_flush + if @rbuf_empty + @rbuf.clear + @rbuf_offset = 0 + end + nil + end + + def rbuf_size + @rbuf.bytesize - @rbuf_offset + end + + # Warning: this method may share the buffer to avoid + # copying. The caller must no longer use the returned + # string once rbuf_fill has been called again + def rbuf_consume_all_shareable! + @rbuf_empty = true + buf = if @rbuf_offset == 0 + @rbuf + else + @rbuf.byteslice(@rbuf_offset..-1) + end + @rbuf_offset = @rbuf.bytesize + buf + end + + def rbuf_consume(len = nil) + if @rbuf_offset == 0 && (len.nil? || len == @rbuf.bytesize) s = @rbuf @rbuf = ''.b + @rbuf_offset = 0 + @rbuf_empty = true + elsif len.nil? + s = @rbuf.byteslice(@rbuf_offset..-1) + @rbuf = ''.b + @rbuf_offset = 0 + @rbuf_empty = true else - s = @rbuf.slice!(0, len) + s = @rbuf.byteslice(@rbuf_offset, len) + @rbuf_offset += len + @rbuf_empty = @rbuf_offset == @rbuf.bytesize + rbuf_flush end + @debug_output << %Q[-> #{s.dump}\n] if @debug_output s end |