summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamuel Williams <samuel.williams@oriontransfer.co.nz>2021-12-18 18:19:30 +1300
committerGitHub <noreply@github.com>2021-12-18 18:19:30 +1300
commit42d32311541e58503b885b09b469948922650c66 (patch)
tree235c0405be8a02e2fb2612b5daf9694d01ebd2fe
parent922a81a99418c992f4039b27a8341f2ee96d2d0c (diff)
Introduce io_result wrapper for passing `[-errno, size]` in VALUE.
Notes
Notes: Merged: https://github.com/ruby/ruby/pull/5287 Merged-By: ioquatix <samuel@codeotaku.com>
-rw-r--r--include/ruby/fiber/scheduler.h54
-rw-r--r--io.c121
-rw-r--r--test/fiber/scheduler.rb82
-rw-r--r--test/fiber/test_io.rb32
-rw-r--r--test/fiber/test_io_buffer.rb114
5 files changed, 336 insertions, 67 deletions
diff --git a/include/ruby/fiber/scheduler.h b/include/ruby/fiber/scheduler.h
index 8294442216..ff587e28c0 100644
--- a/include/ruby/fiber/scheduler.h
+++ b/include/ruby/fiber/scheduler.h
@@ -11,18 +11,68 @@
*/
#include "ruby/internal/config.h"
+#include <errno.h>
+
#ifdef STDC_HEADERS
#include <stddef.h> /* size_t */
#endif
#include "ruby/ruby.h"
#include "ruby/internal/dllexport.h"
+#include "ruby/internal/arithmetic.h"
RBIMPL_SYMBOL_EXPORT_BEGIN()
struct timeval;
/**
+ * Wrap a `ssize_t` and `int errno` into a single `VALUE`. This interface should
+ * be used to safely capture results from system calls like `read` and `write`.
+ *
+ * You should use `rb_fiber_scheduler_io_result_apply` to unpack the result of
+ * this value and update `int errno`.
+ *
+ * You should not directly try to interpret the result value as it is considered
+ * an opaque representation. However, the general representation is an integer
+ * in the range of `[-int errno, size_t size]`. Linux generally restricts the
+ * result of system calls like `read` and `write` to `<= 2^31` which means this
+ * will typically fit within a single FIXNUM.
+ *
+ * @param[in] result The result of the system call.
+ * @param[in] error The value of `errno`.
+ * @return A `VALUE` which contains the result and/or errno.
+ */
+static inline VALUE
+rb_fiber_scheduler_io_result(ssize_t result, int error) {
+ if (result == -1) {
+ return RB_INT2NUM(-error);
+ } else {
+ return RB_SIZE2NUM(result);
+ }
+}
+
+/**
+ * Apply an io result to the local thread, returning the value of the orginal
+ * system call that created it and updating `int errno`.
+ *
+ * You should not directly try to interpret the result value as it is considered
+ * an opaque representation.
+ *
+ * @param[in] result The `VALUE` which contains an errno and/or result size.
+ * @post Updates `int errno` with the value if negative.
+ * @return The original result of the system call.
+ */
+static inline ssize_t
+rb_fiber_scheduler_io_result_apply(VALUE result) {
+ if (RB_FIXNUM_P(result) && RB_NUM2INT(result) < 0) {
+ errno = -RB_NUM2INT(result);
+ return -1;
+ } else {
+ return RB_NUM2SIZE(result);
+ }
+}
+
+/**
* Queries the current scheduler of the current thread that is calling this
* function.
*
@@ -195,7 +245,7 @@ VALUE rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io);
* @param[out] buffer Return buffer.
* @param[in] length Requested number of bytes to read.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_read`.
- * @return otherwise What `scheduler.io_read` returns.
+ * @return otherwise What `scheduler.io_read` returns `[-errno, size]`.
*/
VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length);
@@ -207,7 +257,7 @@ VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t
* @param[in] buffer What to write.
* @param[in] length Number of bytes to write.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_write`.
- * @return otherwise What `scheduler.io_write` returns.
+ * @return otherwise What `scheduler.io_write` returns `[-errno, size]`.
*/
VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length);
diff --git a/io.c b/io.c
index bc64938648..7ec63fcf71 100644
--- a/io.c
+++ b/io.c
@@ -1138,14 +1138,14 @@ rb_read_internal(rb_io_t *fptr, void *buf, size_t count)
{
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
- VALUE result = rb_fiber_scheduler_io_read_memory(scheduler, fptr->self, buf, count, 1);
+ VALUE result = rb_fiber_scheduler_io_read_memory(scheduler, fptr->self, buf, count, count);
if (result != Qundef) {
- ssize_t length = RB_NUM2SSIZE(result);
+ ssize_t length = rb_fiber_scheduler_io_result_apply(result);
- if (length < 0) rb_sys_fail_path(fptr->pathv);
+ if (length < 0) rb_sys_fail_path(fptr->pathv);
- return length;
+ return length;
}
}
@@ -1165,14 +1165,10 @@ rb_write_internal(rb_io_t *fptr, const void *buf, size_t count)
{
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
- VALUE result = rb_fiber_scheduler_io_write_memory(scheduler, fptr->self, buf, count, count);
+ VALUE result = rb_fiber_scheduler_io_write_memory(scheduler, fptr->self, buf, count, 0);
if (result != Qundef) {
- ssize_t length = RB_NUM2SSIZE(result);
-
- if (length < 0) rb_sys_fail_path(fptr->pathv);
-
- return length;
+ return rb_fiber_scheduler_io_result_apply(result);
}
}
@@ -1182,33 +1178,34 @@ rb_write_internal(rb_io_t *fptr, const void *buf, size_t count)
.capa = count
};
- return (ssize_t)rb_thread_io_blocking_region(internal_write_func, &iis, fptr->fd);
+ if (fptr->write_lock && rb_mutex_owned_p(fptr->write_lock))
+ return (ssize_t)rb_thread_call_without_gvl2(internal_write_func2, &iis, RUBY_UBF_IO, NULL);
+ else
+ return (ssize_t)rb_thread_io_blocking_region(internal_write_func, &iis, fptr->fd);
}
+#ifdef HAVE_WRITEV
static ssize_t
-rb_write_internal2(rb_io_t *fptr, const void *buf, size_t count)
+rb_writev_internal(rb_io_t *fptr, const struct iovec *iov, int iovcnt)
{
- struct io_internal_write_struct iis = {
- .fd = fptr->fd,
- .buf = buf,
- .capa = count
- };
+ VALUE scheduler = rb_fiber_scheduler_current();
+ if (scheduler != Qnil) {
+ for (int i = 0; i < iovcnt; i += 1) {
+ VALUE result = rb_fiber_scheduler_io_write_memory(scheduler, fptr->self, iov[i].iov_base, iov[i].iov_len, 0);
- return (ssize_t)rb_thread_call_without_gvl2(internal_write_func2, &iis,
- RUBY_UBF_IO, NULL);
-}
+ if (result != Qundef) {
+ return rb_fiber_scheduler_io_result_apply(result);
+ }
+ }
+ }
-#ifdef HAVE_WRITEV
-static ssize_t
-rb_writev_internal(int fd, const struct iovec *iov, int iovcnt)
-{
struct io_internal_writev_struct iis = {
- .fd = fd,
+ .fd = fptr->fd,
.iov = iov,
.iovcnt = iovcnt,
};
- return (ssize_t)rb_thread_io_blocking_region(internal_writev_func, &iis, fd);
+ return (ssize_t)rb_thread_io_blocking_region(internal_writev_func, &iis, fptr->fd);
}
#endif
@@ -1592,7 +1589,7 @@ io_binwrite_string(VALUE arg)
iov[1].iov_base = (char *)p->ptr;
iov[1].iov_len = p->length;
- r = rb_writev_internal(fptr->fd, iov, 2);
+ r = rb_writev_internal(fptr, iov, 2);
if (r < 0)
return r;
@@ -1654,56 +1651,49 @@ io_binwrite(VALUE str, const char *ptr, long len, rb_io_t *fptr, int nosync)
if ((n = len) <= 0) return n;
- VALUE scheduler = rb_fiber_scheduler_current();
- if (scheduler != Qnil) {
- VALUE result = rb_fiber_scheduler_io_write_memory(scheduler, fptr->self, ptr, len, len);
-
- if (result != Qundef) {
- ssize_t length = RB_NUM2SSIZE(result);
-
- if (length < 0) rb_sys_fail_path(fptr->pathv);
-
- return length;
- }
- }
-
if (fptr->wbuf.ptr == NULL && !(!nosync && (fptr->mode & FMODE_SYNC))) {
fptr->wbuf.off = 0;
fptr->wbuf.len = 0;
fptr->wbuf.capa = IO_WBUF_CAPA_MIN;
fptr->wbuf.ptr = ALLOC_N(char, fptr->wbuf.capa);
fptr->write_lock = rb_mutex_new();
- rb_mutex_allow_trap(fptr->write_lock, 1);
+ rb_mutex_allow_trap(fptr->write_lock, 1);
}
+
if ((!nosync && (fptr->mode & (FMODE_SYNC|FMODE_TTY))) ||
(fptr->wbuf.ptr && fptr->wbuf.capa <= fptr->wbuf.len + len)) {
- struct binwrite_arg arg;
+ struct binwrite_arg arg;
- arg.fptr = fptr;
- arg.str = str;
+ arg.fptr = fptr;
+ arg.str = str;
retry:
- arg.ptr = ptr + offset;
- arg.length = n;
- if (fptr->write_lock) {
+ arg.ptr = ptr + offset;
+ arg.length = n;
+
+ if (fptr->write_lock) {
r = rb_mutex_synchronize(fptr->write_lock, io_binwrite_string, (VALUE)&arg);
- }
- else {
- r = io_binwrite_string((VALUE)&arg);
- }
- /* xxx: other threads may modify given string. */
+ }
+ else {
+ r = io_binwrite_string((VALUE)&arg);
+ }
+
+ /* xxx: other threads may modify given string. */
if (r == n) return len;
if (0 <= r) {
offset += r;
n -= r;
errno = EAGAIN;
- }
- if (r == -2L)
- return -1L;
+ }
+
+ if (r == -2L)
+ return -1L;
if (rb_io_maybe_wait_writable(errno, fptr->self, Qnil)) {
rb_io_check_closed(fptr);
- if (offset < len)
- goto retry;
+
+ if (offset < len)
+ goto retry;
}
+
return -1L;
}
@@ -1712,8 +1702,10 @@ io_binwrite(VALUE str, const char *ptr, long len, rb_io_t *fptr, int nosync)
MEMMOVE(fptr->wbuf.ptr, fptr->wbuf.ptr+fptr->wbuf.off, char, fptr->wbuf.len);
fptr->wbuf.off = 0;
}
+
MEMMOVE(fptr->wbuf.ptr+fptr->wbuf.off+fptr->wbuf.len, ptr+offset, char, len);
fptr->wbuf.len += (int)len;
+
return len;
}
@@ -1853,7 +1845,7 @@ static VALUE
call_writev_internal(VALUE arg)
{
struct binwritev_arg *p = (struct binwritev_arg *)arg;
- return rb_writev_internal(p->fptr->fd, p->iov, p->iovcnt);
+ return rb_writev_internal(p->fptr, p->iov, p->iovcnt);
}
static long
@@ -1906,7 +1898,7 @@ io_binwritev(struct iovec *iov, int iovcnt, rb_io_t *fptr)
r = rb_mutex_synchronize(fptr->write_lock, call_writev_internal, (VALUE)&arg);
}
else {
- r = rb_writev_internal(fptr->fd, iov, iovcnt);
+ r = rb_writev_internal(fptr, iov, iovcnt);
}
if (r >= 0) {
@@ -2330,6 +2322,7 @@ fptr_wait_readable(rb_io_t *fptr)
if (ret)
rb_io_check_closed(fptr);
+
return ret;
}
@@ -3063,10 +3056,11 @@ read_internal_call(VALUE arg)
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
- VALUE result = rb_fiber_scheduler_io_read_memory(scheduler, iis->fptr->self, iis->buf, iis->capa, 1);
+ VALUE result = rb_fiber_scheduler_io_read_memory(scheduler, iis->fptr->self, iis->buf, iis->capa, 0);
if (result != Qundef) {
- return (VALUE)RB_NUM2SSIZE(result);
+ // This is actually returned as a pseudo-VALUE and later cast to a long:
+ return (VALUE)rb_fiber_scheduler_io_result_apply(result);
}
}
@@ -4761,10 +4755,7 @@ finish_writeconv(rb_io_t *fptr, int noalloc)
res = rb_econv_convert(fptr->writeconv, NULL, NULL, &dp, de, 0);
while (dp-ds) {
retry:
- if (fptr->write_lock && rb_mutex_owned_p(fptr->write_lock))
- r = rb_write_internal2(fptr, ds, dp-ds);
- else
- r = rb_write_internal(fptr, ds, dp-ds);
+ r = rb_write_internal(fptr, ds, dp-ds);
if (r == dp-ds)
break;
if (0 <= r) {
diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb
index a6e1325259..0e352a1cc9 100644
--- a/test/fiber/scheduler.rb
+++ b/test/fiber/scheduler.rb
@@ -192,6 +192,9 @@ class Scheduler
end
Fiber.yield
+ ensure
+ @readable.delete(io)
+ @writable.delete(io)
end
# Used for Kernel#sleep and Thread::Mutex#sleep
@@ -257,6 +260,85 @@ class Scheduler
end
end
+class IOBufferScheduler < Scheduler
+ EAGAIN = Errno::EAGAIN::Errno
+
+ def io_read(io, buffer, length)
+ offset = 0
+
+ while true
+ maximum_size = buffer.size - offset
+ result = blocking{io.read_nonblock(maximum_size, exception: false)}
+
+ # blocking{pp read: maximum_size, result: result, length: length}
+
+ case result
+ when :wait_readable
+ if length > 0
+ self.io_wait(io, IO::READABLE, nil)
+ else
+ return -EAGAIN
+ end
+ when :wait_writable
+ if length > 0
+ self.io_wait(io, IO::WRITABLE, nil)
+ else
+ return -EAGAIN
+ end
+ else
+ break unless result
+
+ buffer.copy(result, offset)
+
+ size = result.bytesize
+ offset += size
+ break if size >= length
+ length -= size
+ end
+ end
+
+ return offset
+ end
+
+ def io_write(io, buffer, length)
+ offset = 0
+
+ while true
+ maximum_size = buffer.size - offset
+
+ chunk = buffer.to_str(offset, maximum_size)
+ result = blocking{io.write_nonblock(chunk, exception: false)}
+
+ # blocking{pp write: maximum_size, result: result, length: length}
+
+ case result
+ when :wait_readable
+ if length > 0
+ self.io_wait(io, IO::READABLE, nil)
+ else
+ return -EAGAIN
+ end
+ when :wait_writable
+ if length > 0
+ self.io_wait(io, IO::WRITABLE, nil)
+ else
+ return -EAGAIN
+ end
+ else
+ offset += result
+ break if result >= length
+ length -= result
+ end
+ end
+
+ return offset
+ end
+
+ def blocking(&block)
+ Fiber.new(blocking: true, &block).resume
+ end
+end
+
class BrokenUnblockScheduler < Scheduler
def unblock(blocker, fiber)
super
diff --git a/test/fiber/test_io.rb b/test/fiber/test_io.rb
index ce65a55f78..06a3c0dd11 100644
--- a/test/fiber/test_io.rb
+++ b/test/fiber/test_io.rb
@@ -140,4 +140,36 @@ class TestFiberIO < Test::Unit::TestCase
server.close
th.join
end
+
+ def test_read_write_blocking
+ skip "UNIXSocket is not defined!" unless defined?(UNIXSocket)
+
+ i, o = UNIXSocket.pair
+ i.nonblock = false
+ o.nonblock = false
+
+ message = nil
+
+ thread = Thread.new do
+ # This scheduler provides non-blocking `io_read`/`io_write`:
+ scheduler = IOBufferScheduler.new
+ Fiber.set_scheduler scheduler
+
+ Fiber.schedule do
+ message = i.read(20)
+ i.close
+ end
+
+ Fiber.schedule do
+ o.write("Hello World")
+ o.close
+ end
+ end
+
+ thread.join
+
+ assert_equal MESSAGE, message
+ assert_predicate(i, :closed?)
+ assert_predicate(o, :closed?)
+ end
end
diff --git a/test/fiber/test_io_buffer.rb b/test/fiber/test_io_buffer.rb
new file mode 100644
index 0000000000..e48764242f
--- /dev/null
+++ b/test/fiber/test_io_buffer.rb
@@ -0,0 +1,114 @@
+# frozen_string_literal: true
+require 'test/unit'
+require_relative 'scheduler'
+
+require 'timeout'
+
+class TestFiberIOBuffer < Test::Unit::TestCase
+ MESSAGE = "Hello World"
+
+ def test_read_write_blocking
+ skip "UNIXSocket is not defined!" unless defined?(UNIXSocket)
+
+ i, o = UNIXSocket.pair
+ i.nonblock = false
+ o.nonblock = false
+
+ message = nil
+
+ thread = Thread.new do
+ scheduler = IOBufferScheduler.new
+ Fiber.set_scheduler scheduler
+
+ Fiber.schedule do
+ message = i.read(20)
+ i.close
+ end
+
+ Fiber.schedule do
+ o.write(MESSAGE)
+ o.close
+ end
+ end
+
+ thread.join
+
+ assert_equal MESSAGE, message
+ assert_predicate(i, :closed?)
+ assert_predicate(o, :closed?)
+ end
+
+ def test_timeout_after
+ skip "UNIXSocket is not defined!" unless defined?(UNIXSocket)
+
+ i, o = UNIXSocket.pair
+ i.nonblock = false
+ o.nonblock = false
+
+ message = nil
+ error = nil
+
+ thread = Thread.new do
+ scheduler = IOBufferScheduler.new
+ Fiber.set_scheduler scheduler
+
+ Fiber.schedule do
+ Timeout.timeout(0.1) do
+ message = i.read(20)
+ end
+ rescue Timeout::Error => error
+ # Assertions below.
+ ensure
+ i.close
+ end
+ end
+
+ thread.join
+
+ assert_nil message
+ assert_kind_of Timeout::Error, error
+ end
+
+ def test_read_nonblock
+ skip "UNIXSocket is not defined!" unless defined?(UNIXSocket)
+
+ i, o = UNIXSocket.pair
+
+ message = nil
+
+ thread = Thread.new do
+ scheduler = IOBufferScheduler.new
+ Fiber.set_scheduler scheduler
+
+ Fiber.schedule do
+ message = i.read_nonblock(20, exception: false)
+ i.close
+ end
+ end
+
+ thread.join
+
+ assert_equal :wait_readable, message
+ o.close
+ end
+
+ def test_write_nonblock
+ skip "UNIXSocket is not defined!" unless defined?(UNIXSocket)
+
+ i, o = UNIXSocket.pair
+
+ thread = Thread.new do
+ scheduler = IOBufferScheduler.new
+ Fiber.set_scheduler scheduler
+
+ Fiber.schedule do
+ o.write_nonblock(MESSAGE, exception: false)
+ o.close
+ end
+ end
+
+ thread.join
+
+ assert_equal MESSAGE, i.read
+ end
+end