diff options
Diffstat (limited to 'lib/net/protocol.rb')
| -rw-r--r-- | lib/net/protocol.rb | 933 |
1 files changed, 356 insertions, 577 deletions
diff --git a/lib/net/protocol.rb b/lib/net/protocol.rb index 9da1ad8909..8c81298c0e 100644 --- a/lib/net/protocol.rb +++ b/lib/net/protocol.rb @@ -1,233 +1,73 @@ -=begin - -= net/protocol.rb - -Copyright (c) 1999-2002 Yukihiro Matsumoto - -written & maintained by Minero Aoki <aamine@loveruby.net> - -This program is free software. You can re-distribute and/or -modify this program under the same terms as Ruby itself, -Ruby Distribute License or GNU General Public License. - -NOTE: You can find Japanese version of this document in -the doc/net directory of the standard ruby interpreter package. - -$Id$ - -=end +# frozen_string_literal: true +# +# = net/protocol.rb +# +#-- +# Copyright (c) 1999-2004 Yukihiro Matsumoto +# Copyright (c) 1999-2004 Minero Aoki +# +# written and maintained by Minero Aoki <aamine@loveruby.net> +# +# This program is free software. You can re-distribute and/or +# modify this program under the same terms as Ruby itself, +# Ruby Distribute License or GNU General Public License. +# +# $Id$ +#++ +# +# WARNING: This file is going to remove. +# Do not rely on the implementation written in this file. +# require 'socket' require 'timeout' +require 'io/wait' +module Net # :nodoc: -module Net - - class Protocol - - Version = '1.2.3' - Revision = '$Revision$'.slice(/[\d\.]+/) - - - class << self - - def port - default_port - end - - private - - def protocol_param( name, val ) - module_eval <<-EOS, __FILE__, __LINE__ + 1 - def self.#{name.id2name} - #{val} - end - EOS - end - - end - - - # - # --- Configuration Staffs for Sub Classes --- - # - # class method default_port - # class method command_type - # class method socket_type - # - # private method do_start - # private method do_finish - # - # private method conn_address - # private method conn_port - # - - - def Protocol.start( address, port = nil, *args ) - instance = new(address, port) - if block_given? - instance.start(*args) { - return yield(instance) - } - else - instance.start(*args) - instance - end - end - - def initialize( addr, port = nil ) - @address = addr - @port = port || self.class.default_port - - @command = nil - @socket = nil - - @started = false - - @open_timeout = 30 - @read_timeout = 60 - - @debug_output = nil - end - - attr_reader :address - attr_reader :port - - attr_reader :command - attr_reader :socket - - attr_accessor :open_timeout - - attr_reader :read_timeout - - def read_timeout=( sec ) - @socket.read_timeout = sec if @socket - @read_timeout = sec - end - - def started? - @started - end - - alias active? started? - - def set_debug_output( arg ) # un-documented - @debug_output = arg - end + class Protocol #:nodoc: internal use only + VERSION = "0.2.2" - def inspect - "#<#{self.class} #{@address}:#{@port} open=#{active?}>" + private + def Protocol.protocol_param(name, val) + module_eval(<<-End, __FILE__, __LINE__ + 1) + def #{name} + #{val} + end + End end - # - # open - # - - def start( *args ) - @started and raise IOError, 'protocol has been opened already' - - if block_given? - begin - do_start(*args) - @started = true - return yield(self) - ensure - finish if @started + def ssl_socket_connect(s, timeout) + if timeout + while true + raise Net::OpenTimeout if timeout <= 0 + start = Process.clock_gettime Process::CLOCK_MONOTONIC + # to_io is required because SSLSocket doesn't have wait_readable yet + case s.connect_nonblock(exception: false) + when :wait_readable; s.to_io.wait_readable(timeout) + when :wait_writable; s.to_io.wait_writable(timeout) + else; break + end + timeout -= Process.clock_gettime(Process::CLOCK_MONOTONIC) - start end + else + s.connect end - - do_start(*args) - @started = true - self - end - - private - - # abstract do_start() - - def conn_socket - @socket = self.class.socket_type.open( - conn_address(), conn_port(), - @open_timeout, @read_timeout, @debug_output ) - on_connect - end - - alias conn_address address - alias conn_port port - - def reconn_socket - @socket.reopen @open_timeout - on_connect - end - - def conn_command - @command = self.class.command_type.new(@socket) - end - - def on_connect - end - - # - # close - # - - public - - def finish - raise IOError, 'closing already closed protocol' unless @started - do_finish - @started = false - nil - end - - private - - # abstract do_finish() - - def disconn_command - @command.quit if @command and not @command.critical? - @command = nil - end - - def disconn_socket - @socket.close if @socket and not @socket.closed? - @socket = nil - end - - end - - Session = Protocol - - - class Response - - def initialize( ctype, code, msg ) - @code_type = ctype - @code = code - @message = msg - super() - end - - attr_reader :code_type - attr_reader :code - attr_reader :message - alias msg message - - def inspect - "#<#{self.class} #{@code}>" - end - - def error! - raise error_type().new(code + ' ' + @message.dump, self) end - def error_type - @code_type.error_type + tcp_socket_parameters = TCPSocket.instance_method(:initialize).parameters + TCP_SOCKET_NEW_HAS_OPEN_TIMEOUT = if tcp_socket_parameters != [[:rest]] + tcp_socket_parameters.include?([:key, :open_timeout]) + else + # Use Socket.tcp to find out since there is no parameters information for TCPSocket#initialize + # See discussion in https://github.com/ruby/net-http/pull/224 + Socket.method(:tcp).parameters.include?([:key, :open_timeout]) end - + private_constant :TCP_SOCKET_NEW_HAS_OPEN_TIMEOUT end + # :stopdoc: class ProtocolError < StandardError; end class ProtoSyntaxError < ProtocolError; end class ProtoFatalError < ProtocolError; end @@ -237,515 +77,459 @@ module Net class ProtoCommandError < ProtocolError; end class ProtoRetriableError < ProtocolError; end ProtocRetryError = ProtoRetriableError + # :startdoc: - class ProtocolError - - def initialize( msg, resp ) - super msg - @response = resp - end - - attr_reader :response - alias data response - - def inspect - "#<#{self.class} #{self.message}>" - end - - end - + ## + # OpenTimeout, a subclass of Timeout::Error, is raised if a connection cannot + # be created within the open_timeout. - class Code - - def initialize( paren, err ) - @parents = [self] + paren - @error_type = err - end - - def parents - @parents.dup - end + class OpenTimeout < Timeout::Error; end - attr_reader :error_type + ## + # ReadTimeout, a subclass of Timeout::Error, is raised if a chunk of the + # response cannot be read within the read_timeout. - def inspect - "#<#{self.class} #{sprintf '0x%x', __id__}>" + class ReadTimeout < Timeout::Error + # :stopdoc: + def initialize(io = nil) + @io = io end + attr_reader :io - def ===( response ) - response.code_type.parents.each do |c| - return true if c == self + def message + msg = super + if @io + msg = "#{msg} with #{@io.inspect}" end - false - end - - def mkchild( err = nil ) - self.class.new(@parents, err || @error_type) + msg end - end - - ReplyCode = Code.new([], ProtoUnknownError) - InformationCode = ReplyCode.mkchild(ProtoUnknownError) - SuccessCode = ReplyCode.mkchild(ProtoUnknownError) - ContinueCode = ReplyCode.mkchild(ProtoUnknownError) - ErrorCode = ReplyCode.mkchild(ProtocolError) - SyntaxErrorCode = ErrorCode.mkchild(ProtoSyntaxError) - FatalErrorCode = ErrorCode.mkchild(ProtoFatalError) - ServerErrorCode = ErrorCode.mkchild(ProtoServerError) - AuthErrorCode = ErrorCode.mkchild(ProtoAuthError) - RetriableCode = ReplyCode.mkchild(ProtoRetriableError) - UnknownCode = ReplyCode.mkchild(ProtoUnknownError) - - - class Command - - def initialize( sock ) - @socket = sock - @last_reply = nil - @atomic = false - end - - attr_accessor :socket - attr_reader :last_reply - def inspect - "#<#{self.class} socket=#{@socket.inspect} critical=#{@atomic}>" - end - - # abstract quit() - - private + ## + # WriteTimeout, a subclass of Timeout::Error, is raised if a chunk of the + # response cannot be written within the write_timeout. Not raised on Windows. - def check_reply( *oks ) - @last_reply = get_reply() - reply_must @last_reply, *oks + class WriteTimeout < Timeout::Error + # :stopdoc: + def initialize(io = nil) + @io = io end + attr_reader :io - # abstract get_reply() - - def reply_must( rep, *oks ) - oks.each do |i| - return rep if i === rep + def message + msg = super + if @io + msg = "#{msg} with #{@io.inspect}" end - rep.error! - end - - def getok( line, expect = SuccessCode ) - @socket.writeline line - check_reply expect + msg end - - # - # critical session - # - - public - - def critical? - @atomic - end - - def error_ok - @atomic = false - end - - private - - def atomic - @atomic = true - ret = yield - @atomic = false - ret - end - end - class InternetMessageIO - - class << self - alias open new - end - - def initialize( addr, port, otime = nil, rtime = nil, dout = nil ) - @address = addr - @port = port - @read_timeout = rtime - @debug_output = dout - - @socket = nil - @rbuf = nil - - connect otime - D 'opened' - end - - attr_reader :address - attr_reader :port - - def ip_address - @socket or return '' - @socket.addr[3] + class BufferedIO #:nodoc: internal use only + def initialize(io, read_timeout: 60, write_timeout: 60, continue_timeout: nil, debug_output: nil) + @io = io + @read_timeout = read_timeout + @write_timeout = write_timeout + @continue_timeout = continue_timeout + @debug_output = debug_output + @rbuf = ''.b + @rbuf_empty = true + @rbuf_offset = 0 end + attr_reader :io attr_accessor :read_timeout + attr_accessor :write_timeout + attr_accessor :continue_timeout + attr_accessor :debug_output - attr_reader :socket - - def connect( otime ) - D "opening connection to #{@address}..." - timeout(otime) { - @socket = TCPsocket.new(@address, @port) - } - @rbuf = '' - end - private :connect - - def close - if @socket - @socket.close - D 'closed' - else - D 'close call for already closed socket' - end - @socket = nil - @rbuf = '' + def inspect + "#<#{self.class} io=#{@io}>" end - def reopen( otime = nil ) - D 'reopening...' - close - connect otime - D 'reopened' + def eof? + @io.eof? end def closed? - not @socket + @io.closed? end - def inspect - "#<#{self.class} #{closed? ? 'closed' : 'opened'}>" + def close + @io.close end - ### - ### READ - ### + # + # Read + # public - def read( len, dest = '', ignore = false ) - D_off "reading #{len} bytes..." - - rsize = 0 + def read(len, dest = ''.b, ignore_eof = false) + LOG "reading #{len} bytes..." + read_bytes = 0 begin - while rsize + @rbuf.size < len - rsize += rbuf_moveto(dest, @rbuf.size) + while read_bytes + rbuf_size < len + if s = rbuf_consume_all + read_bytes += s.bytesize + dest << s + end rbuf_fill end - rbuf_moveto dest, len - rsize + s = rbuf_consume(len - read_bytes) + read_bytes += s.bytesize + dest << s rescue EOFError - raise unless ignore + raise unless ignore_eof end - - D_on "read #{len} bytes" + LOG "read #{read_bytes} bytes" dest end - def read_all( dest = '' ) - D_off 'reading all...' - - rsize = 0 + def read_all(dest = ''.b) + LOG 'reading all...' + read_bytes = 0 begin while true - rsize += rbuf_moveto(dest, @rbuf.size) + if s = rbuf_consume_all + read_bytes += s.bytesize + dest << s + end rbuf_fill end rescue EOFError ; end - - D_on "read #{rsize} bytes" + LOG "read #{read_bytes} bytes" dest end - def readuntil( target, ignore = false ) - dest = '' + def readuntil(terminator, ignore_eof = false) + offset = @rbuf_offset begin - until idx = @rbuf.index(target) + until idx = @rbuf.index(terminator, offset) + offset = @rbuf.bytesize rbuf_fill end - rbuf_moveto dest, idx + target.size + return rbuf_consume(idx + terminator.bytesize - @rbuf_offset) rescue EOFError - raise unless ignore - rbuf_moveto dest, @rbuf.size + raise unless ignore_eof + return rbuf_consume end - dest end - + def readline - ret = readuntil("\n") - ret.chop! - ret + readuntil("\n").chop end private - BLOCK_SIZE = 1024 + BUFSIZE = 1024 * 16 def rbuf_fill - until IO.select [@socket], nil, nil, @read_timeout - on_read_timeout + tmp = @rbuf_empty ? @rbuf : nil + case rv = @io.read_nonblock(BUFSIZE, tmp, exception: false) + when String + @rbuf_empty = false + if rv.equal?(tmp) + @rbuf_offset = 0 + else + @rbuf << rv + rv.clear + end + return + when :wait_readable + (io = @io.to_io).wait_readable(@read_timeout) or raise Net::ReadTimeout.new(io) + # continue looping + when :wait_writable + # OpenSSL::Buffering#read_nonblock may fail with IO::WaitWritable. + # http://www.openssl.org/support/faq.html#PROG10 + (io = @io.to_io).wait_writable(@read_timeout) or raise Net::ReadTimeout.new(io) + # continue looping + when nil + raise EOFError, 'end of file reached' + end while true + end + + def rbuf_flush + if @rbuf_empty + @rbuf.clear + @rbuf_offset = 0 end - @rbuf << @socket.sysread(BLOCK_SIZE) + nil end - def on_read_timeout - raise TimeoutError, "socket read timeout (#{@read_timeout} sec)" + def rbuf_size + @rbuf.bytesize - @rbuf_offset end - def rbuf_moveto( dest, len ) - dest << (s = @rbuf.slice!(0, len)) - @debug_output << %Q[-> #{s.dump}\n] if @debug_output - len + def rbuf_consume_all + rbuf_consume if rbuf_size > 0 end - # - # message read - # - - public - - def read_message_to( dest ) - D_off 'reading text...' - - rsize = 0 - while (str = readuntil("\r\n")) != ".\r\n" - rsize += str.size - dest << str.sub(/\A\./, '') + def rbuf_consume(len = nil) + if @rbuf_offset == 0 && (len.nil? || len == @rbuf.bytesize) + s = @rbuf + @rbuf = ''.b + @rbuf_offset = 0 + @rbuf_empty = true + elsif len.nil? + s = @rbuf.byteslice(@rbuf_offset..-1) + @rbuf = ''.b + @rbuf_offset = 0 + @rbuf_empty = true + else + s = @rbuf.byteslice(@rbuf_offset, len) + @rbuf_offset += len + @rbuf_empty = @rbuf_offset == @rbuf.bytesize + rbuf_flush end - D_on "read #{rsize} bytes" - dest - end - - # private use only (cannot handle 'break') - def each_list_item - while (str = readuntil("\r\n")) != ".\r\n" - yield str.chop - end + @debug_output << %Q[-> #{s.dump}\n] if @debug_output + s end - - ### - ### WRITE - ### - # - # basic write + # Write # public - def write( str ) + def write(*strs) writing { - do_write str + write0(*strs) } end - def writeline( str ) + alias << write + + def writeline(str) writing { - do_write str + "\r\n" + write0 str + "\r\n" } end private def writing - @writtensize = 0 + @written_bytes = 0 @debug_output << '<- ' if @debug_output yield - @socket.flush @debug_output << "\n" if @debug_output - @writtensize - end - - def do_write( str ) - @debug_output << str.dump if @debug_output - @writtensize += (n = @socket.write(str)) - n + bytes = @written_bytes + @written_bytes = nil + bytes + end + + def write0(*strs) + @debug_output << strs.map(&:dump).join if @debug_output + orig_written_bytes = @written_bytes + strs.each_with_index do |str, i| + need_retry = true + case len = @io.write_nonblock(str, exception: false) + when Integer + @written_bytes += len + len -= str.bytesize + if len == 0 + if strs.size == i+1 + return @written_bytes - orig_written_bytes + else + need_retry = false + # next string + end + elsif len < 0 + str = str.byteslice(len, -len) + else # len > 0 + need_retry = false + # next string + end + # continue looping + when :wait_writable + (io = @io.to_io).wait_writable(@write_timeout) or raise Net::WriteTimeout.new(io) + # continue looping + end while need_retry + end end # - # message write + # Logging # - public + private - def write_message( src ) - D_off "writing text from #{src.class}" + def LOG_off + @save_debug_out = @debug_output + @debug_output = nil + end - wsize = using_each_crlf_line { - wpend_in src - } + def LOG_on + @debug_output = @save_debug_out + end - D_on "wrote #{wsize} bytes text" - wsize + def LOG(msg) + return unless @debug_output + @debug_output << msg + "\n" end + end - def through_message - D_off 'writing text from block' - wsize = using_each_crlf_line { - yield WriteAdapter.new(self, :wpend_in) - } + class InternetMessageIO < BufferedIO #:nodoc: internal use only + def initialize(*, **) + super + @wbuf = nil + end + + # + # Read + # - D_on "wrote #{wsize} bytes text" - wsize + def each_message_chunk + LOG 'reading message...' + LOG_off() + read_bytes = 0 + while (line = readuntil("\r\n")) != ".\r\n" + read_bytes += line.size + yield line.delete_prefix('.') + end + LOG_on() + LOG "read message (#{read_bytes} bytes)" end - private + # *library private* (cannot handle 'break') + def each_list_item + while (str = readuntil("\r\n")) != ".\r\n" + yield str.chop + end + end - def wpend_in( src ) - line = nil - pre = @writtensize + def write_message_0(src) + prev = @written_bytes each_crlf_line(src) do |line| - do_write '.' if line[0] == ?. - do_write line + write0 dot_stuff(line) end - - @writtensize - pre + @written_bytes - prev end - def using_each_crlf_line - writing { - @wbuf = '' + # + # Write + # - yield + def write_message(src) + LOG "writing message from #{src.class}" + LOG_off() + len = writing { + using_each_crlf_line { + write_message_0 src + } + } + LOG_on() + LOG "wrote #{len} bytes" + len + end - if not @wbuf.empty? # unterminated last line - if @wbuf[-1] == ?\r - @wbuf.chop! - end - @wbuf.concat "\r\n" - do_write @wbuf - elsif @writtensize == 0 # empty src - do_write "\r\n" + def write_message_by_block(&block) + LOG 'writing message from block' + LOG_off() + len = writing { + using_each_crlf_line { + begin + block.call(WriteAdapter.new(self.method(:write_message_0))) + rescue LocalJumpError + # allow `break' from writer block end - do_write ".\r\n" - - @wbuf = nil + } } + LOG_on() + LOG "wrote #{len} bytes" + len end - def each_crlf_line( src ) - str = m = beg = nil + private + + def dot_stuff(s) + s.sub(/\A\./, '..') + end - adding(src) do - beg = 0 - buf = @wbuf - while buf.index(/\n|\r\n|\r/, beg) - m = Regexp.last_match - if m.begin(0) == buf.size - 1 and buf[-1] == ?\r - # "...\r" : can follow "\n..." - break - end - str = buf[ beg ... m.begin(0) ] - str.concat "\r\n" - yield str - beg = m.end(0) - end - @wbuf = buf[ beg ... buf.size ] + def using_each_crlf_line + @wbuf = ''.b + yield + if not @wbuf.empty? # unterminated last line + write0 dot_stuff(@wbuf.chomp) + "\r\n" + elsif @written_bytes == 0 # empty src + write0 "\r\n" end + write0 ".\r\n" + @wbuf = nil end - def adding( src ) - i = s = nil + def each_crlf_line(src) + buffer_filling(@wbuf, src) do + while line = @wbuf.slice!(/\A[^\r\n]*(?:\n|\r(?:\n|(?!\z)))/) + yield line.chomp("\n") + "\r\n" + end + end + end + def buffer_filling(buf, src) case src - when String - 0.step(src.size - 1, 2048) do |i| - @wbuf << src[i,2048] + when String # for speeding up. + 0.step(src.size - 1, 1024) do |i| + buf << src[i, 1024] yield end - - when File - while s = src.read(2048) - s[0,0] = @wbuf - @wbuf = s + when File # for speeding up. + while s = src.read(1024) + buf << s yield end - - else - src.each do |s| - @wbuf << s - yield if @wbuf.size > 2048 + else # generic reader + src.each do |str| + buf << str + yield if buf.size > 1024 end - yield unless @wbuf.empty? + yield unless buf.empty? end end - - ### - ### DEBUG - ### - - private - - def D_off( msg ) - D msg - @savedo, @debug_output = @debug_output, nil - end - - def D_on( msg ) - @debug_output = @savedo - D msg - end - - def D( msg ) - @debug_output or return - @debug_output << msg - @debug_output << "\n" - end - end + # + # The writer adapter class + # class WriteAdapter - - def initialize( sock, mid ) - @socket = sock - @mid = mid + # :stopdoc: + def initialize(writer) + @writer = writer end def inspect - "#<#{self.class} socket=#{@socket.inspect}>" + "#<#{self.class} writer=#{@writer.inspect}>" end - def write( str ) - @socket.__send__ @mid, str + def write(str) + @writer.call(str) end alias print write - def <<( str ) + def <<(str) write str self end - def puts( str = '' ) - write str.sub(/\n?/, "\n") + def puts(str = '') + write str.chomp("\n") + "\n" end - def printf( *args ) + def printf(*args) write sprintf(*args) end - end - class ReadAdapter - - def initialize( block ) + class ReadAdapter #:nodoc: internal use only + def initialize(block) @block = block end @@ -753,28 +537,23 @@ module Net "#<#{self.class}>" end - def <<( str ) + def <<(str) call_block(str, &@block) if @block end private - def call_block( str ) + # This method is needed because @block must be called by yield, + # not Proc#call. You can see difference when using `break' in + # the block. + def call_block(str) yield str end - end - # for backward compatibility - module NetPrivate - Response = ::Net::Response - Command = ::Net::Command + module NetPrivate #:nodoc: obsolete Socket = ::Net::InternetMessageIO - BufferedSocket = ::Net::InternetMessageIO - WriteAdapter = ::Net::WriteAdapter - ReadAdapter = ::Net::ReadAdapter end - BufferedSocket = ::Net::InternetMessageIO end # module Net |
