diff options
Diffstat (limited to 'lib/net/protocol.rb')
| -rw-r--r-- | lib/net/protocol.rb | 564 |
1 files changed, 337 insertions, 227 deletions
diff --git a/lib/net/protocol.rb b/lib/net/protocol.rb index 316f5e51a2..8c81298c0e 100644 --- a/lib/net/protocol.rb +++ b/lib/net/protocol.rb @@ -1,9 +1,10 @@ +# frozen_string_literal: true # # = net/protocol.rb # #-- -# Copyright (c) 1999-2003 Yukihiro Matsumoto -# Copyright (c) 1999-2003 Minero Aoki +# Copyright (c) 1999-2004 Yukihiro Matsumoto +# Copyright (c) 1999-2004 Minero Aoki # # written and maintained by Minero Aoki <aamine@loveruby.net> # @@ -20,20 +21,53 @@ require 'socket' require 'timeout' +require 'io/wait' module Net # :nodoc: class Protocol #:nodoc: internal use only + VERSION = "0.2.2" + private - def Protocol.protocol_param( name, val ) + def Protocol.protocol_param(name, val) module_eval(<<-End, __FILE__, __LINE__ + 1) def #{name} #{val} end End end + + def ssl_socket_connect(s, timeout) + if timeout + while true + raise Net::OpenTimeout if timeout <= 0 + start = Process.clock_gettime Process::CLOCK_MONOTONIC + # to_io is required because SSLSocket doesn't have wait_readable yet + case s.connect_nonblock(exception: false) + when :wait_readable; s.to_io.wait_readable(timeout) + when :wait_writable; s.to_io.wait_writable(timeout) + else; break + end + timeout -= Process.clock_gettime(Process::CLOCK_MONOTONIC) - start + end + else + s.connect + end + end + + tcp_socket_parameters = TCPSocket.instance_method(:initialize).parameters + TCP_SOCKET_NEW_HAS_OPEN_TIMEOUT = if tcp_socket_parameters != [[:rest]] + tcp_socket_parameters.include?([:key, :open_timeout]) + else + # Use Socket.tcp to find out since there is no parameters information for TCPSocket#initialize + # See discussion in https://github.com/ruby/net-http/pull/224 + Socket.method(:tcp).parameters.include?([:key, :open_timeout]) + end + private_constant :TCP_SOCKET_NEW_HAS_OPEN_TIMEOUT end + + # :stopdoc: class ProtocolError < StandardError; end class ProtoSyntaxError < ProtocolError; end class ProtoFatalError < ProtocolError; end @@ -43,338 +77,420 @@ module Net # :nodoc: class ProtoCommandError < ProtocolError; end class ProtoRetriableError < ProtocolError; end ProtocRetryError = ProtoRetriableError + # :startdoc: + + ## + # OpenTimeout, a subclass of Timeout::Error, is raised if a connection cannot + # be created within the open_timeout. + class OpenTimeout < Timeout::Error; end - class InternetMessageIO #:nodoc: internal use only + ## + # ReadTimeout, a subclass of Timeout::Error, is raised if a chunk of the + # response cannot be read within the read_timeout. - class << self - alias open new + class ReadTimeout < Timeout::Error + # :stopdoc: + def initialize(io = nil) + @io = io end + attr_reader :io - def initialize( addr, port, - open_timeout = nil, read_timeout = nil, - debug_output = nil ) - @address = addr - @port = port - @read_timeout = read_timeout - @debug_output = debug_output - @socket = nil - @rbuf = nil # read buffer - @wbuf = nil # write buffer - connect open_timeout - LOG 'opened' + def message + msg = super + if @io + msg = "#{msg} with #{@io.inspect}" + end + msg end + end - attr_reader :address - attr_reader :port + ## + # WriteTimeout, a subclass of Timeout::Error, is raised if a chunk of the + # response cannot be written within the write_timeout. Not raised on Windows. - def ip_address - return '' unless @socket - @socket.addr[3] + class WriteTimeout < Timeout::Error + # :stopdoc: + def initialize(io = nil) + @io = io end + attr_reader :io - attr_accessor :read_timeout + def message + msg = super + if @io + msg = "#{msg} with #{@io.inspect}" + end + msg + end + end - attr_reader :socket - def connect( open_timeout ) - LOG "opening connection to #{@address}..." - timeout(open_timeout) { - @socket = TCPsocket.new(@address, @port) - } - @rbuf = '' + class BufferedIO #:nodoc: internal use only + def initialize(io, read_timeout: 60, write_timeout: 60, continue_timeout: nil, debug_output: nil) + @io = io + @read_timeout = read_timeout + @write_timeout = write_timeout + @continue_timeout = continue_timeout + @debug_output = debug_output + @rbuf = ''.b + @rbuf_empty = true + @rbuf_offset = 0 end - private :connect - def close - if @socket - @socket.close - LOG 'closed' - else - LOG 'close call for already closed socket' - end - @socket = nil - @rbuf = '' + attr_reader :io + attr_accessor :read_timeout + attr_accessor :write_timeout + attr_accessor :continue_timeout + attr_accessor :debug_output + + def inspect + "#<#{self.class} io=#{@io}>" end - def reopen( open_timeout = nil ) - LOG 'reopening...' - close - connect open_timeout - LOG 'reopened' + def eof? + @io.eof? end def closed? - not @socket + @io.closed? end - def inspect - "#<#{self.class} #{closed?() ? 'closed' : 'opened'}>" + def close + @io.close end - ### - ### READ - ### + # + # Read + # public - def read( len, dest = '', ignore_eof = false ) + def read(len, dest = ''.b, ignore_eof = false) LOG "reading #{len} bytes..." - # LOG_off() # experimental: [ruby-list:38800] read_bytes = 0 begin - while read_bytes + @rbuf.size < len - read_bytes += rbuf_moveto(dest, @rbuf.size) + while read_bytes + rbuf_size < len + if s = rbuf_consume_all + read_bytes += s.bytesize + dest << s + end rbuf_fill end - rbuf_moveto dest, len - read_bytes + s = rbuf_consume(len - read_bytes) + read_bytes += s.bytesize + dest << s rescue EOFError raise unless ignore_eof end - # LOG_on() LOG "read #{read_bytes} bytes" dest end - def read_all( dest = '' ) + def read_all(dest = ''.b) LOG 'reading all...' - # LOG_off() # experimental: [ruby-list:38800] read_bytes = 0 begin while true - read_bytes += rbuf_moveto(dest, @rbuf.size) + if s = rbuf_consume_all + read_bytes += s.bytesize + dest << s + end rbuf_fill end rescue EOFError ; end - # LOG_on() LOG "read #{read_bytes} bytes" dest end - def readuntil( terminator, ignore_eof = false ) - dest = '' + def readuntil(terminator, ignore_eof = false) + offset = @rbuf_offset begin - until idx = @rbuf.index(terminator) + until idx = @rbuf.index(terminator, offset) + offset = @rbuf.bytesize rbuf_fill end - rbuf_moveto dest, idx + terminator.size + return rbuf_consume(idx + terminator.bytesize - @rbuf_offset) rescue EOFError raise unless ignore_eof - rbuf_moveto dest, @rbuf.size + return rbuf_consume end - dest end - + def readline readuntil("\n").chop end - def each_message_chunk - LOG 'reading message...' - LOG_off() - read_bytes = 0 - while (line = readuntil("\r\n")) != ".\r\n" - read_bytes += line.size - yield line.sub(/\A\./, '') + private + + BUFSIZE = 1024 * 16 + + def rbuf_fill + tmp = @rbuf_empty ? @rbuf : nil + case rv = @io.read_nonblock(BUFSIZE, tmp, exception: false) + when String + @rbuf_empty = false + if rv.equal?(tmp) + @rbuf_offset = 0 + else + @rbuf << rv + rv.clear + end + return + when :wait_readable + (io = @io.to_io).wait_readable(@read_timeout) or raise Net::ReadTimeout.new(io) + # continue looping + when :wait_writable + # OpenSSL::Buffering#read_nonblock may fail with IO::WaitWritable. + # http://www.openssl.org/support/faq.html#PROG10 + (io = @io.to_io).wait_writable(@read_timeout) or raise Net::ReadTimeout.new(io) + # continue looping + when nil + raise EOFError, 'end of file reached' + end while true + end + + def rbuf_flush + if @rbuf_empty + @rbuf.clear + @rbuf_offset = 0 end - LOG_on() - LOG "read message (#{read_bytes} bytes)" + nil end - - # *library private* (cannot handle 'break') - def each_list_item - while (str = readuntil("\r\n")) != ".\r\n" - yield str.chop - end + + def rbuf_size + @rbuf.bytesize - @rbuf_offset end - private + def rbuf_consume_all + rbuf_consume if rbuf_size > 0 + end - def rbuf_fill - until IO.select([@socket], nil, nil, @read_timeout) - raise TimeoutError, "socket read timeout (#{@read_timeout} sec)" + def rbuf_consume(len = nil) + if @rbuf_offset == 0 && (len.nil? || len == @rbuf.bytesize) + s = @rbuf + @rbuf = ''.b + @rbuf_offset = 0 + @rbuf_empty = true + elsif len.nil? + s = @rbuf.byteslice(@rbuf_offset..-1) + @rbuf = ''.b + @rbuf_offset = 0 + @rbuf_empty = true + else + s = @rbuf.byteslice(@rbuf_offset, len) + @rbuf_offset += len + @rbuf_empty = @rbuf_offset == @rbuf.bytesize + rbuf_flush end - @rbuf << @socket.sysread(1024) - end - def rbuf_moveto( dest, len ) - dest << (s = @rbuf.slice!(0, len)) @debug_output << %Q[-> #{s.dump}\n] if @debug_output - len + s end - ### - ### WRITE - ### + # + # Write + # public - def write( str ) + def write(*strs) writing { - write0 str + write0(*strs) } end - def writeline( str ) + alias << write + + def writeline(str) writing { write0 str + "\r\n" } end - def write_message( src ) - LOG "writing message from #{src.class}" - LOG_off() - len = using_each_crlf_line { - write_message_0 src - } - LOG_on() - LOG "wrote #{len} bytes" - len - end - - def write_message_by_block( &block ) - LOG 'writing message from block' - LOG_off() - len = using_each_crlf_line { - begin - block.call(WriteAdapter.new(self, :write_message_0)) - rescue LocalJumpError - # allow `break' from writer block - end - } - LOG_on() - LOG "wrote #{len} bytes" - len - end - private def writing @written_bytes = 0 @debug_output << '<- ' if @debug_output yield - @socket.flush @debug_output << "\n" if @debug_output bytes = @written_bytes @written_bytes = nil bytes end - def write0( str ) - @debug_output << str.dump if @debug_output - len = @socket.write(str) - @written_bytes += len - len + def write0(*strs) + @debug_output << strs.map(&:dump).join if @debug_output + orig_written_bytes = @written_bytes + strs.each_with_index do |str, i| + need_retry = true + case len = @io.write_nonblock(str, exception: false) + when Integer + @written_bytes += len + len -= str.bytesize + if len == 0 + if strs.size == i+1 + return @written_bytes - orig_written_bytes + else + need_retry = false + # next string + end + elsif len < 0 + str = str.byteslice(len, -len) + else # len > 0 + need_retry = false + # next string + end + # continue looping + when :wait_writable + (io = @io.to_io).wait_writable(@write_timeout) or raise Net::WriteTimeout.new(io) + # continue looping + end while need_retry + end + end + + # + # Logging + # + + private + + def LOG_off + @save_debug_out = @debug_output + @debug_output = nil + end + + def LOG_on + @debug_output = @save_debug_out + end + + def LOG(msg) + return unless @debug_output + @debug_output << msg + "\n" + end + end + + + class InternetMessageIO < BufferedIO #:nodoc: internal use only + def initialize(*, **) + super + @wbuf = nil end # - # Reads string from src calling :each, and write to @socket. - # Escapes '.' on the each line head. + # Read # - def write_message_0( src ) + + def each_message_chunk + LOG 'reading message...' + LOG_off() + read_bytes = 0 + while (line = readuntil("\r\n")) != ".\r\n" + read_bytes += line.size + yield line.delete_prefix('.') + end + LOG_on() + LOG "read message (#{read_bytes} bytes)" + end + + # *library private* (cannot handle 'break') + def each_list_item + while (str = readuntil("\r\n")) != ".\r\n" + yield str.chop + end + end + + def write_message_0(src) prev = @written_bytes each_crlf_line(src) do |line| - if line[0] == ?. - then write0 '.' + line - else write0 line - end + write0 dot_stuff(line) end @written_bytes - prev end # - # setup @wbuf for each_crlf_line. + # Write # - def using_each_crlf_line - writing { - @wbuf = '' - yield - if not @wbuf.empty? # unterminated last line - if @wbuf[-1] == ?\r - @wbuf.chop! - end - @wbuf.concat "\r\n" - write0 @wbuf - elsif @written_bytes == 0 # empty src - write0 "\r\n" - end - write0 ".\r\n" - @wbuf = nil + + def write_message(src) + LOG "writing message from #{src.class}" + LOG_off() + len = writing { + using_each_crlf_line { + write_message_0 src + } } + LOG_on() + LOG "wrote #{len} bytes" + len end - # - # extract a CR-LF-terminating-line from @wbuf and yield it. - # - def each_crlf_line( src ) - adding(src) do - beg = 0 - buf = @wbuf - while buf.index(/\n|\r\n|\r/, beg) - m = Regexp.last_match - if (m.begin(0) == buf.length - 1) and buf[-1] == ?\r - # "...\r" : can follow "\n..." - break + def write_message_by_block(&block) + LOG 'writing message from block' + LOG_off() + len = writing { + using_each_crlf_line { + begin + block.call(WriteAdapter.new(self.method(:write_message_0))) + rescue LocalJumpError + # allow `break' from writer block end - str = buf[beg ... m.begin(0)] - str.concat "\r\n" - yield str - beg = m.end(0) + } + } + LOG_on() + LOG "wrote #{len} bytes" + len + end + + private + + def dot_stuff(s) + s.sub(/\A\./, '..') + end + + def using_each_crlf_line + @wbuf = ''.b + yield + if not @wbuf.empty? # unterminated last line + write0 dot_stuff(@wbuf.chomp) + "\r\n" + elsif @written_bytes == 0 # empty src + write0 "\r\n" + end + write0 ".\r\n" + @wbuf = nil + end + + def each_crlf_line(src) + buffer_filling(@wbuf, src) do + while line = @wbuf.slice!(/\A[^\r\n]*(?:\n|\r(?:\n|(?!\z)))/) + yield line.chomp("\n") + "\r\n" end - @wbuf = buf[beg ... buf.length] end end - # - # Reads strings from SRC and add to @wbuf, then yield. - # - def adding( src ) + def buffer_filling(buf, src) case src when String # for speeding up. - 0.step(src.size - 1, 2048) do |i| - @wbuf << src[i,2048] + 0.step(src.size - 1, 1024) do |i| + buf << src[i, 1024] yield end - when File # for speeding up. - while s = src.read(2048) - s[0,0] = @wbuf - @wbuf = s + while s = src.read(1024) + buf << s yield end - else # generic reader - src.each do |s| - @wbuf << s - yield if @wbuf.size > 2048 + src.each do |str| + buf << str + yield if buf.size > 1024 end - yield unless @wbuf.empty? + yield unless buf.empty? end end - - ### - ### DEBUG - ### - - private - - def LOG_off - @save_debug_out = @debug_output - @debug_output = nil - end - - def LOG_on - @debug_output = @save_debug_out - end - - def LOG( msg ) - return unless @debug_output - @debug_output << msg - @debug_output << "\n" - end - end @@ -382,41 +498,38 @@ module Net # :nodoc: # The writer adapter class # class WriteAdapter - - def initialize( sock, mid ) - @socket = sock - @method_id = mid + # :stopdoc: + def initialize(writer) + @writer = writer end def inspect - "#<#{self.class} socket=#{@socket.inspect}>" + "#<#{self.class} writer=#{@writer.inspect}>" end - def write( str ) - @socket.__send__(@method_id, str) + def write(str) + @writer.call(str) end alias print write - def <<( str ) + def <<(str) write str self end - def puts( str = '' ) - write str.sub(/\n?/, "\n") + def puts(str = '') + write str.chomp("\n") + "\n" end - def printf( *args ) + def printf(*args) write sprintf(*args) end - end class ReadAdapter #:nodoc: internal use only - - def initialize( block ) + def initialize(block) @block = block end @@ -424,21 +537,18 @@ module Net # :nodoc: "#<#{self.class}>" end - def <<( str ) + def <<(str) call_block(str, &@block) if @block end private - # # This method is needed because @block must be called by yield, # not Proc#call. You can see difference when using `break' in # the block. - # - def call_block( str ) + def call_block(str) yield str end - end |
