diff options
author | Samuel Williams <samuel.williams@oriontransfer.co.nz> | 2021-12-18 18:19:30 +1300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-18 18:19:30 +1300 |
commit | 42d32311541e58503b885b09b469948922650c66 (patch) | |
tree | 235c0405be8a02e2fb2612b5daf9694d01ebd2fe | |
parent | 922a81a99418c992f4039b27a8341f2ee96d2d0c (diff) |
Introduce io_result wrapper for passing `[-errno, size]` in VALUE.
Notes:
Merged: https://github.com/ruby/ruby/pull/5287
Merged-By: ioquatix <samuel@codeotaku.com>
-rw-r--r-- | include/ruby/fiber/scheduler.h | 54 | ||||
-rw-r--r-- | io.c | 121 | ||||
-rw-r--r-- | test/fiber/scheduler.rb | 82 | ||||
-rw-r--r-- | test/fiber/test_io.rb | 32 | ||||
-rw-r--r-- | test/fiber/test_io_buffer.rb | 114 |
5 files changed, 336 insertions, 67 deletions
diff --git a/include/ruby/fiber/scheduler.h b/include/ruby/fiber/scheduler.h index 8294442216..ff587e28c0 100644 --- a/include/ruby/fiber/scheduler.h +++ b/include/ruby/fiber/scheduler.h @@ -11,18 +11,68 @@ */ #include "ruby/internal/config.h" +#include <errno.h> + #ifdef STDC_HEADERS #include <stddef.h> /* size_t */ #endif #include "ruby/ruby.h" #include "ruby/internal/dllexport.h" +#include "ruby/internal/arithmetic.h" RBIMPL_SYMBOL_EXPORT_BEGIN() struct timeval; /** + * Wrap a `ssize_t` and `int errno` into a single `VALUE`. This interface should + * be used to safely capture results from system calls like `read` and `write`. + * + * You should use `rb_fiber_scheduler_io_result_apply` to unpack the result of + * this value and update `int errno`. + * + * You should not directly try to interpret the result value as it is considered + * an opaque representation. However, the general representation is an integer + * in the range of `[-int errno, size_t size]`. Linux generally restricts the + * result of system calls like `read` and `write` to `<= 2^31` which means this + * will typically fit within a single FIXNUM. + * + * @param[in] result The result of the system call. + * @param[in] error The value of `errno`. + * @return A `VALUE` which contains the result and/or errno. + */ +static inline VALUE +rb_fiber_scheduler_io_result(ssize_t result, int error) { + if (result == -1) { + return RB_INT2NUM(-error); + } else { + return RB_SIZE2NUM(result); + } +} + +/** + * Apply an io result to the local thread, returning the value of the original + * system call that created it and updating `int errno`. + * + * You should not directly try to interpret the result value as it is considered + * an opaque representation. + * + * @param[in] result The `VALUE` which contains an errno and/or result size. + * @post Updates `int errno` with the value if negative. + * @return The original result of the system call. 
+ */ +static inline ssize_t +rb_fiber_scheduler_io_result_apply(VALUE result) { + if (RB_FIXNUM_P(result) && RB_NUM2INT(result) < 0) { + errno = -RB_NUM2INT(result); + return -1; + } else { + return RB_NUM2SIZE(result); + } +} + +/** * Queries the current scheduler of the current thread that is calling this * function. * @@ -195,7 +245,7 @@ VALUE rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io); * @param[out] buffer Return buffer. * @param[in] length Requested number of bytes to read. * @retval RUBY_Qundef `scheduler` doesn't have `#io_read`. - * @return otherwise What `scheduler.io_read` returns. + * @return otherwise What `scheduler.io_read` returns `[-errno, size]`. */ VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length); @@ -207,7 +257,7 @@ VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t * @param[in] buffer What to write. * @param[in] length Number of bytes to write. * @retval RUBY_Qundef `scheduler` doesn't have `#io_write`. - * @return otherwise What `scheduler.io_write` returns. + * @return otherwise What `scheduler.io_write` returns `[-errno, size]`. 
*/ VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length); @@ -1138,14 +1138,14 @@ rb_read_internal(rb_io_t *fptr, void *buf, size_t count) { VALUE scheduler = rb_fiber_scheduler_current(); if (scheduler != Qnil) { - VALUE result = rb_fiber_scheduler_io_read_memory(scheduler, fptr->self, buf, count, 1); + VALUE result = rb_fiber_scheduler_io_read_memory(scheduler, fptr->self, buf, count, count); if (result != Qundef) { - ssize_t length = RB_NUM2SSIZE(result); + ssize_t length = rb_fiber_scheduler_io_result_apply(result); - if (length < 0) rb_sys_fail_path(fptr->pathv); + if (length < 0) rb_sys_fail_path(fptr->pathv); - return length; + return length; } } @@ -1165,14 +1165,10 @@ rb_write_internal(rb_io_t *fptr, const void *buf, size_t count) { VALUE scheduler = rb_fiber_scheduler_current(); if (scheduler != Qnil) { - VALUE result = rb_fiber_scheduler_io_write_memory(scheduler, fptr->self, buf, count, count); + VALUE result = rb_fiber_scheduler_io_write_memory(scheduler, fptr->self, buf, count, 0); if (result != Qundef) { - ssize_t length = RB_NUM2SSIZE(result); - - if (length < 0) rb_sys_fail_path(fptr->pathv); - - return length; + return rb_fiber_scheduler_io_result_apply(result); } } @@ -1182,33 +1178,34 @@ rb_write_internal(rb_io_t *fptr, const void *buf, size_t count) .capa = count }; - return (ssize_t)rb_thread_io_blocking_region(internal_write_func, &iis, fptr->fd); + if (fptr->write_lock && rb_mutex_owned_p(fptr->write_lock)) + return (ssize_t)rb_thread_call_without_gvl2(internal_write_func2, &iis, RUBY_UBF_IO, NULL); + else + return (ssize_t)rb_thread_io_blocking_region(internal_write_func, &iis, fptr->fd); } +#ifdef HAVE_WRITEV static ssize_t -rb_write_internal2(rb_io_t *fptr, const void *buf, size_t count) +rb_writev_internal(rb_io_t *fptr, const struct iovec *iov, int iovcnt) { - struct io_internal_write_struct iis = { - .fd = fptr->fd, - .buf = buf, - .capa = count - }; + VALUE scheduler = rb_fiber_scheduler_current(); 
+ if (scheduler != Qnil) { + for (int i = 0; i < iovcnt; i += 1) { + VALUE result = rb_fiber_scheduler_io_write_memory(scheduler, fptr->self, iov[i].iov_base, iov[i].iov_len, 0); - return (ssize_t)rb_thread_call_without_gvl2(internal_write_func2, &iis, - RUBY_UBF_IO, NULL); -} + if (result != Qundef) { + return rb_fiber_scheduler_io_result_apply(result); + } + } + } -#ifdef HAVE_WRITEV -static ssize_t -rb_writev_internal(int fd, const struct iovec *iov, int iovcnt) -{ struct io_internal_writev_struct iis = { - .fd = fd, + .fd = fptr->fd, .iov = iov, .iovcnt = iovcnt, }; - return (ssize_t)rb_thread_io_blocking_region(internal_writev_func, &iis, fd); + return (ssize_t)rb_thread_io_blocking_region(internal_writev_func, &iis, fptr->fd); } #endif @@ -1592,7 +1589,7 @@ io_binwrite_string(VALUE arg) iov[1].iov_base = (char *)p->ptr; iov[1].iov_len = p->length; - r = rb_writev_internal(fptr->fd, iov, 2); + r = rb_writev_internal(fptr, iov, 2); if (r < 0) return r; @@ -1654,56 +1651,49 @@ io_binwrite(VALUE str, const char *ptr, long len, rb_io_t *fptr, int nosync) if ((n = len) <= 0) return n; - VALUE scheduler = rb_fiber_scheduler_current(); - if (scheduler != Qnil) { - VALUE result = rb_fiber_scheduler_io_write_memory(scheduler, fptr->self, ptr, len, len); - - if (result != Qundef) { - ssize_t length = RB_NUM2SSIZE(result); - - if (length < 0) rb_sys_fail_path(fptr->pathv); - - return length; - } - } - if (fptr->wbuf.ptr == NULL && !(!nosync && (fptr->mode & FMODE_SYNC))) { fptr->wbuf.off = 0; fptr->wbuf.len = 0; fptr->wbuf.capa = IO_WBUF_CAPA_MIN; fptr->wbuf.ptr = ALLOC_N(char, fptr->wbuf.capa); fptr->write_lock = rb_mutex_new(); - rb_mutex_allow_trap(fptr->write_lock, 1); + rb_mutex_allow_trap(fptr->write_lock, 1); } + if ((!nosync && (fptr->mode & (FMODE_SYNC|FMODE_TTY))) || (fptr->wbuf.ptr && fptr->wbuf.capa <= fptr->wbuf.len + len)) { - struct binwrite_arg arg; + struct binwrite_arg arg; - arg.fptr = fptr; - arg.str = str; + arg.fptr = fptr; + arg.str = str; retry: - 
arg.ptr = ptr + offset; - arg.length = n; - if (fptr->write_lock) { + arg.ptr = ptr + offset; + arg.length = n; + + if (fptr->write_lock) { r = rb_mutex_synchronize(fptr->write_lock, io_binwrite_string, (VALUE)&arg); - } - else { - r = io_binwrite_string((VALUE)&arg); - } - /* xxx: other threads may modify given string. */ + } + else { + r = io_binwrite_string((VALUE)&arg); + } + + /* xxx: other threads may modify given string. */ if (r == n) return len; if (0 <= r) { offset += r; n -= r; errno = EAGAIN; - } - if (r == -2L) - return -1L; + } + + if (r == -2L) + return -1L; if (rb_io_maybe_wait_writable(errno, fptr->self, Qnil)) { rb_io_check_closed(fptr); - if (offset < len) - goto retry; + + if (offset < len) + goto retry; } + return -1L; } @@ -1712,8 +1702,10 @@ io_binwrite(VALUE str, const char *ptr, long len, rb_io_t *fptr, int nosync) MEMMOVE(fptr->wbuf.ptr, fptr->wbuf.ptr+fptr->wbuf.off, char, fptr->wbuf.len); fptr->wbuf.off = 0; } + MEMMOVE(fptr->wbuf.ptr+fptr->wbuf.off+fptr->wbuf.len, ptr+offset, char, len); fptr->wbuf.len += (int)len; + return len; } @@ -1853,7 +1845,7 @@ static VALUE call_writev_internal(VALUE arg) { struct binwritev_arg *p = (struct binwritev_arg *)arg; - return rb_writev_internal(p->fptr->fd, p->iov, p->iovcnt); + return rb_writev_internal(p->fptr, p->iov, p->iovcnt); } static long @@ -1906,7 +1898,7 @@ io_binwritev(struct iovec *iov, int iovcnt, rb_io_t *fptr) r = rb_mutex_synchronize(fptr->write_lock, call_writev_internal, (VALUE)&arg); } else { - r = rb_writev_internal(fptr->fd, iov, iovcnt); + r = rb_writev_internal(fptr, iov, iovcnt); } if (r >= 0) { @@ -2330,6 +2322,7 @@ fptr_wait_readable(rb_io_t *fptr) if (ret) rb_io_check_closed(fptr); + return ret; } @@ -3063,10 +3056,11 @@ read_internal_call(VALUE arg) VALUE scheduler = rb_fiber_scheduler_current(); if (scheduler != Qnil) { - VALUE result = rb_fiber_scheduler_io_read_memory(scheduler, iis->fptr->self, iis->buf, iis->capa, 1); + VALUE result = 
rb_fiber_scheduler_io_read_memory(scheduler, iis->fptr->self, iis->buf, iis->capa, 0); if (result != Qundef) { - return (VALUE)RB_NUM2SSIZE(result); + // This is actually returned as a pseudo-VALUE and later cast to a long: + return (VALUE)rb_fiber_scheduler_io_result_apply(result); } } @@ -4761,10 +4755,7 @@ finish_writeconv(rb_io_t *fptr, int noalloc) res = rb_econv_convert(fptr->writeconv, NULL, NULL, &dp, de, 0); while (dp-ds) { retry: - if (fptr->write_lock && rb_mutex_owned_p(fptr->write_lock)) - r = rb_write_internal2(fptr, ds, dp-ds); - else - r = rb_write_internal(fptr, ds, dp-ds); + r = rb_write_internal(fptr, ds, dp-ds); if (r == dp-ds) break; if (0 <= r) { diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb index a6e1325259..0e352a1cc9 100644 --- a/test/fiber/scheduler.rb +++ b/test/fiber/scheduler.rb @@ -192,6 +192,9 @@ class Scheduler end Fiber.yield + ensure + @readable.delete(io) + @writable.delete(io) end # Used for Kernel#sleep and Thread::Mutex#sleep @@ -257,6 +260,85 @@ class Scheduler end end +class IOBufferScheduler < Scheduler + EAGAIN = Errno::EAGAIN::Errno + + def io_read(io, buffer, length) + offset = 0 + + while true + maximum_size = buffer.size - offset + result = blocking{io.read_nonblock(maximum_size, exception: false)} + + # blocking{pp read: maximum_size, result: result, length: length} + + case result + when :wait_readable + if length > 0 + self.io_wait(io, IO::READABLE, nil) + else + return -EAGAIN + end + when :wait_writable + if length > 0 + self.io_wait(io, IO::WRITABLE, nil) + else + return -EAGAIN + end + else + break unless result + + buffer.copy(result, offset) + + size = result.bytesize + offset += size + break if size >= length + length -= size + end + end + + return offset + end + + def io_write(io, buffer, length) + offset = 0 + + while true + maximum_size = buffer.size - offset + + chunk = buffer.to_str(offset, maximum_size) + result = blocking{io.write_nonblock(chunk, exception: false)} + + # blocking{pp 
write: maximum_size, result: result, length: length} + + case result + when :wait_readable + if length > 0 + self.io_wait(io, IO::READABLE, nil) + else + return -EAGAIN + end + when :wait_writable + if length > 0 + self.io_wait(io, IO::WRITABLE, nil) + else + return -EAGAIN + end + else + offset += result + break if result >= length + length -= result + end + end + + return offset + end + + def blocking(&block) + Fiber.new(blocking: true, &block).resume + end +end + class BrokenUnblockScheduler < Scheduler def unblock(blocker, fiber) super diff --git a/test/fiber/test_io.rb b/test/fiber/test_io.rb index ce65a55f78..06a3c0dd11 100644 --- a/test/fiber/test_io.rb +++ b/test/fiber/test_io.rb @@ -140,4 +140,36 @@ class TestFiberIO < Test::Unit::TestCase server.close th.join end + + def test_read_write_blocking + skip "UNIXSocket is not defined!" unless defined?(UNIXSocket) + + i, o = UNIXSocket.pair + i.nonblock = false + o.nonblock = false + + message = nil + + thread = Thread.new do + # This scheduler provides non-blocking `io_read`/`io_write`: + scheduler = IOBufferScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + message = i.read(20) + i.close + end + + Fiber.schedule do + o.write("Hello World") + o.close + end + end + + thread.join + + assert_equal MESSAGE, message + assert_predicate(i, :closed?) + assert_predicate(o, :closed?) + end end diff --git a/test/fiber/test_io_buffer.rb b/test/fiber/test_io_buffer.rb new file mode 100644 index 0000000000..e48764242f --- /dev/null +++ b/test/fiber/test_io_buffer.rb @@ -0,0 +1,114 @@ +# frozen_string_literal: true +require 'test/unit' +require_relative 'scheduler' + +require 'timeout' + +class TestFiberIOBuffer < Test::Unit::TestCase + MESSAGE = "Hello World" + + def test_read_write_blocking + skip "UNIXSocket is not defined!" 
unless defined?(UNIXSocket) + + i, o = UNIXSocket.pair + i.nonblock = false + o.nonblock = false + + message = nil + + thread = Thread.new do + scheduler = IOBufferScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + message = i.read(20) + i.close + end + + Fiber.schedule do + o.write(MESSAGE) + o.close + end + end + + thread.join + + assert_equal MESSAGE, message + assert_predicate(i, :closed?) + assert_predicate(o, :closed?) + end + + def test_timeout_after + skip "UNIXSocket is not defined!" unless defined?(UNIXSocket) + + i, o = UNIXSocket.pair + i.nonblock = false + o.nonblock = false + + message = nil + error = nil + + thread = Thread.new do + scheduler = IOBufferScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + Timeout.timeout(0.1) do + message = i.read(20) + end + rescue Timeout::Error => error + # Assertions below. + ensure + i.close + end + end + + thread.join + + assert_nil message + assert_kind_of Timeout::Error, error + end + + def test_read_nonblock + skip "UNIXSocket is not defined!" unless defined?(UNIXSocket) + + i, o = UNIXSocket.pair + + message = nil + + thread = Thread.new do + scheduler = IOBufferScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + message = i.read_nonblock(20, exception: false) + i.close + end + end + + thread.join + + assert_equal :wait_readable, message + o.close + end + + def test_write_nonblock + skip "UNIXSocket is not defined!" unless defined?(UNIXSocket) + + i, o = UNIXSocket.pair + + thread = Thread.new do + scheduler = IOBufferScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + o.write_nonblock(MESSAGE, exception: false) + o.close + end + end + + thread.join + + assert_equal MESSAGE, i.read + end +end |