summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamuel Williams <samuel.williams@oriontransfer.co.nz>2021-12-23 12:20:09 +1300
committerGitHub <noreply@github.com>2021-12-23 12:20:09 +1300
commitbed920f0731a1a89a0e5fc7a7428d21be3ffb8a0 (patch)
tree928abd45556a6f2380211ef347ce0df9c0858db9
parent91c5c1c132994c9ca4540125d462988d83e37a6b (diff)
Add fiber scheduler hooks for `pread`/`pwrite`, and add support to `IO::Buffer`.
Notes
Notes: Merged: https://github.com/ruby/ruby/pull/5249 Merged-By: ioquatix <samuel@codeotaku.com>
-rw-r--r--common.mk1
-rw-r--r--include/ruby/fiber/scheduler.h26
-rw-r--r--include/ruby/io/buffer.h6
-rw-r--r--io_buffer.c173
-rw-r--r--scheduler.c27
-rw-r--r--test/ruby/test_io_buffer.rb57
6 files changed, 288 insertions, 2 deletions
diff --git a/common.mk b/common.mk
index 8fc0a590f8..8791069af7 100644
--- a/common.mk
+++ b/common.mk
@@ -7585,6 +7585,7 @@ io_buffer.$(OBJEXT): {$(VPATH)}backward/2/stdarg.h
io_buffer.$(OBJEXT): {$(VPATH)}config.h
io_buffer.$(OBJEXT): {$(VPATH)}defines.h
io_buffer.$(OBJEXT): {$(VPATH)}encoding.h
+io_buffer.$(OBJEXT): {$(VPATH)}fiber/scheduler.h
io_buffer.$(OBJEXT): {$(VPATH)}intern.h
io_buffer.$(OBJEXT): {$(VPATH)}internal.h
io_buffer.$(OBJEXT): {$(VPATH)}internal/anyargs.h
diff --git a/include/ruby/fiber/scheduler.h b/include/ruby/fiber/scheduler.h
index ff587e28c0..a255a1a712 100644
--- a/include/ruby/fiber/scheduler.h
+++ b/include/ruby/fiber/scheduler.h
@@ -262,6 +262,32 @@ VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t
VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length);
/**
+ * Nonblocking read from the passed IO at the specified offset.
+ *
+ * @param[in] scheduler Target scheduler.
+ * @param[out] io An io object to read from.
+ * @param[out] buffer Return buffer.
+ * @param[in] length Requested number of bytes to read.
+ * @param[in] offset The offset in the given IO to read the data from.
+ * @retval RUBY_Qundef `scheduler` doesn't have `#io_pread`.
+ * @return otherwise What `scheduler.io_pread` returns.
+ */
+VALUE rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, VALUE buffer, size_t length, off_t offset);
+
+/**
+ * Nonblocking write to the passed IO at the specified offset.
+ *
+ * @param[in] scheduler Target scheduler.
+ * @param[out] io An io object to write to.
+ * @param[in] buffer What to write.
+ * @param[in] length Number of bytes to write.
+ * @param[in] offset The offset in the given IO to write the data to.
+ * @retval RUBY_Qundef `scheduler` doesn't have `#io_pwrite`.
+ * @return otherwise What `scheduler.io_pwrite` returns.
+ */
+VALUE rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, VALUE buffer, size_t length, off_t offset);
+
+/**
* Nonblocking read from the passed IO using a native buffer.
*
* @param[in] scheduler Target scheduler.
diff --git a/include/ruby/io/buffer.h b/include/ruby/io/buffer.h
index 4826a7a76f..907fec20bb 100644
--- a/include/ruby/io/buffer.h
+++ b/include/ruby/io/buffer.h
@@ -80,6 +80,12 @@ VALUE rb_io_buffer_transfer(VALUE self);
void rb_io_buffer_resize(VALUE self, size_t size);
void rb_io_buffer_clear(VALUE self, uint8_t value, size_t offset, size_t length);
+// The length is the minimum required length.
+VALUE rb_io_buffer_read(VALUE self, VALUE io, size_t length);
+VALUE rb_io_buffer_pread(VALUE self, VALUE io, size_t length, off_t offset);
+VALUE rb_io_buffer_write(VALUE self, VALUE io, size_t length);
+VALUE rb_io_buffer_pwrite(VALUE self, VALUE io, size_t length, off_t offset);
+
RBIMPL_SYMBOL_EXPORT_END()
#endif /* RUBY_IO_BUFFER_T */
diff --git a/io_buffer.c b/io_buffer.c
index 4487cab773..fa28c59b32 100644
--- a/io_buffer.c
+++ b/io_buffer.c
@@ -8,6 +8,7 @@
#include "ruby/io.h"
#include "ruby/io/buffer.h"
+#include "ruby/fiber/scheduler.h"
#include "internal.h"
#include "internal/string.h"
@@ -1864,6 +1865,172 @@ size_t io_buffer_default_size(size_t page_size) {
return platform_agnostic_default_size;
}
+/*
+ * Read from `io` into this buffer.
+ *
+ * An installed fiber scheduler is offered the operation first through
+ * rb_fiber_scheduler_io_read; any answer other than Qundef is returned
+ * unchanged. Otherwise read(2) is issued directly on the descriptor.
+ *
+ * Note that `length` is only checked against the buffer with
+ * io_buffer_validate_range; the read(2) request itself is sized by the
+ * extent reported by io_buffer_get_bytes_for_writing.
+ *
+ * @param self   The IO::Buffer receiving the data.
+ * @param io     The IO object to read from.
+ * @param length Requested number of bytes.
+ * @return The scheduler hook's result, or the value built by
+ *         rb_fiber_scheduler_io_result from the read(2) result and errno.
+ */
+VALUE
+rb_io_buffer_read(VALUE self, VALUE io, size_t length)
+{
+    VALUE scheduler = rb_fiber_scheduler_current();
+
+    if (scheduler != Qnil) {
+        VALUE hooked = rb_fiber_scheduler_io_read(scheduler, io, self, length);
+
+        // Qundef means "not handled": fall through to the direct syscall.
+        if (hooked != Qundef) return hooked;
+    }
+
+    struct rb_io_buffer *buffer = NULL;
+    TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, buffer);
+
+    io_buffer_validate_range(buffer, 0, length);
+
+    int fd = rb_io_descriptor(io);
+
+    void *destination;
+    size_t capacity;
+    io_buffer_get_bytes_for_writing(buffer, &destination, &capacity);
+
+    ssize_t count = read(fd, destination, capacity);
+
+    return rb_fiber_scheduler_io_result(count, errno);
+}
+
+// Method body for IO::Buffer#read: unboxes `length` and delegates to
+// rb_io_buffer_read.
+static VALUE
+io_buffer_read(VALUE self, VALUE io, VALUE length)
+{
+    size_t requested = RB_NUM2SIZE(length);
+
+    return rb_io_buffer_read(self, io, requested);
+}
+
+/*
+ * Read from `io` at the absolute file `offset` into this buffer.
+ *
+ * An installed fiber scheduler is offered the operation first through
+ * rb_fiber_scheduler_io_pread; any answer other than Qundef is returned
+ * unchanged. Otherwise the read is performed directly on the descriptor,
+ * using pread(2) when available and an lseek/read/lseek emulation when not.
+ *
+ * `length` is checked against the buffer with io_buffer_validate_range; the
+ * read request itself is sized by the extent reported by
+ * io_buffer_get_bytes_for_writing.
+ *
+ * @param self   The IO::Buffer receiving the data.
+ * @param io     The IO object to read from.
+ * @param length Requested number of bytes.
+ * @param offset Position in the file to read from.
+ * @return The scheduler hook's result, or the value built by
+ *         rb_fiber_scheduler_io_result from the syscall result and errno.
+ */
+VALUE
+rb_io_buffer_pread(VALUE self, VALUE io, size_t length, off_t offset)
+{
+    VALUE scheduler = rb_fiber_scheduler_current();
+    if (scheduler != Qnil) {
+        VALUE result = rb_fiber_scheduler_io_pread(scheduler, io, self, length, offset);
+
+        if (result != Qundef) {
+            return result;
+        }
+    }
+
+    struct rb_io_buffer *data = NULL;
+    TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data);
+
+    io_buffer_validate_range(data, 0, length);
+
+    int descriptor = rb_io_descriptor(io);
+
+    void *base;
+    size_t size;
+    io_buffer_get_bytes_for_writing(data, &base, &size);
+
+#if defined(HAVE_PREAD)
+    ssize_t result = pread(descriptor, base, size, offset);
+    int error = errno;
+#else
+    // This emulation is not thread safe, but the GVL means it's unlikely to be a problem.
+    off_t current_offset = lseek(descriptor, 0, SEEK_CUR);
+    if (current_offset == (off_t)-1) {
+        return rb_fiber_scheduler_io_result(-1, errno);
+    }
+
+    if (lseek(descriptor, offset, SEEK_SET) == (off_t)-1) {
+        return rb_fiber_scheduler_io_result(-1, errno);
+    }
+
+    ssize_t result = read(descriptor, base, size);
+    // Capture errno now: the lseek that restores the position may overwrite it.
+    int error = errno;
+
+    if (lseek(descriptor, current_offset, SEEK_SET) == (off_t)-1) {
+        return rb_fiber_scheduler_io_result(-1, errno);
+    }
+#endif
+
+    return rb_fiber_scheduler_io_result(result, error);
+}
+
+// Method body for IO::Buffer#pread: unboxes `length` and `offset`, then
+// delegates to rb_io_buffer_pread.
+static VALUE
+io_buffer_pread(VALUE self, VALUE io, VALUE length, VALUE offset)
+{
+    size_t requested = RB_NUM2SIZE(length);
+    off_t position = NUM2OFFT(offset);
+
+    return rb_io_buffer_pread(self, io, requested, position);
+}
+
+/*
+ * Write `length` bytes from this buffer to `io`.
+ *
+ * An installed fiber scheduler is offered the operation first through
+ * rb_fiber_scheduler_io_write; any answer other than Qundef is returned
+ * unchanged. Otherwise write(2) is issued directly on the descriptor.
+ *
+ * @param self   The IO::Buffer supplying the data.
+ * @param io     The IO object to write to.
+ * @param length Number of bytes to write; checked against the buffer with
+ *               io_buffer_validate_range before the syscall.
+ * @return The scheduler hook's result, or the value built by
+ *         rb_fiber_scheduler_io_result from the write(2) result and errno.
+ */
+VALUE
+rb_io_buffer_write(VALUE self, VALUE io, size_t length)
+{
+    VALUE scheduler = rb_fiber_scheduler_current();
+
+    if (scheduler != Qnil) {
+        VALUE hooked = rb_fiber_scheduler_io_write(scheduler, io, self, length);
+
+        // Qundef means "not handled": fall through to the direct syscall.
+        if (hooked != Qundef) return hooked;
+    }
+
+    struct rb_io_buffer *buffer = NULL;
+    TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, buffer);
+
+    io_buffer_validate_range(buffer, 0, length);
+
+    int fd = rb_io_descriptor(io);
+
+    // Only the base pointer is used below; the reported extent is not needed
+    // because the request is sized by `length`.
+    const void *source;
+    size_t extent;
+    io_buffer_get_bytes_for_reading(buffer, &source, &extent);
+
+    ssize_t count = write(fd, source, length);
+
+    return rb_fiber_scheduler_io_result(count, errno);
+}
+
+// Method body for IO::Buffer#write: unboxes `length` and delegates to
+// rb_io_buffer_write.
+static VALUE
+io_buffer_write(VALUE self, VALUE io, VALUE length)
+{
+    size_t requested = RB_NUM2SIZE(length);
+
+    return rb_io_buffer_write(self, io, requested);
+}
+
+/*
+ * Write `length` bytes from this buffer to `io` at the absolute file `offset`.
+ *
+ * An installed fiber scheduler is offered the operation first through
+ * rb_fiber_scheduler_io_pwrite; any answer other than Qundef is returned
+ * unchanged. Otherwise the write is performed directly on the descriptor,
+ * using pwrite(2) when available and an lseek/write/lseek emulation when not.
+ *
+ * @param self   The IO::Buffer supplying the data.
+ * @param io     The IO object to write to.
+ * @param length Number of bytes to write; checked against the buffer with
+ *               io_buffer_validate_range before any syscall.
+ * @param offset Position in the file to write at.
+ * @return The scheduler hook's result, or the value built by
+ *         rb_fiber_scheduler_io_result from the syscall result and errno.
+ */
+VALUE
+rb_io_buffer_pwrite(VALUE self, VALUE io, size_t length, off_t offset)
+{
+    VALUE scheduler = rb_fiber_scheduler_current();
+    if (scheduler != Qnil) {
+        // Pass the raw off_t: rb_fiber_scheduler_io_pwrite takes an `off_t`
+        // and performs the conversion to a Ruby number itself.
+        VALUE result = rb_fiber_scheduler_io_pwrite(scheduler, io, self, length, offset);
+
+        if (result != Qundef) {
+            return result;
+        }
+    }
+
+    struct rb_io_buffer *data = NULL;
+    TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data);
+
+    io_buffer_validate_range(data, 0, length);
+
+    int descriptor = rb_io_descriptor(io);
+
+    const void *base;
+    size_t size;
+    io_buffer_get_bytes_for_reading(data, &base, &size);
+
+#if defined(HAVE_PWRITE)
+    ssize_t result = pwrite(descriptor, base, length, offset);
+    int error = errno;
+#else
+    // This emulation is not thread safe, but the GVL means it's unlikely to be a problem.
+    off_t current_offset = lseek(descriptor, 0, SEEK_CUR);
+    if (current_offset == (off_t)-1) {
+        return rb_fiber_scheduler_io_result(-1, errno);
+    }
+
+    if (lseek(descriptor, offset, SEEK_SET) == (off_t)-1) {
+        return rb_fiber_scheduler_io_result(-1, errno);
+    }
+
+    ssize_t result = write(descriptor, base, length);
+    // Capture errno now: the lseek that restores the position may overwrite it.
+    int error = errno;
+
+    if (lseek(descriptor, current_offset, SEEK_SET) == (off_t)-1) {
+        return rb_fiber_scheduler_io_result(-1, errno);
+    }
+#endif
+
+    return rb_fiber_scheduler_io_result(result, error);
+}
+
+// Method body for IO::Buffer#pwrite: unboxes `length` and `offset`, then
+// delegates to rb_io_buffer_pwrite.
+static VALUE
+io_buffer_pwrite(VALUE self, VALUE io, VALUE length, VALUE offset)
+{
+    size_t requested = RB_NUM2SIZE(length);
+    off_t position = NUM2OFFT(offset);
+
+    return rb_io_buffer_pwrite(self, io, requested, position);
+}
+
/*
* Document-class: IO::Buffer
*
@@ -2038,4 +2205,10 @@ Init_IO_Buffer(void)
rb_define_method(rb_cIOBuffer, "get_string", io_buffer_get_string, -1);
rb_define_method(rb_cIOBuffer, "set_string", io_buffer_set_string, -1);
+
+ // IO operations:
+ rb_define_method(rb_cIOBuffer, "read", io_buffer_read, 2);
+ rb_define_method(rb_cIOBuffer, "pread", io_buffer_pread, 3);
+ rb_define_method(rb_cIOBuffer, "write", io_buffer_write, 2);
+ rb_define_method(rb_cIOBuffer, "pwrite", io_buffer_pwrite, 3);
}
diff --git a/scheduler.c b/scheduler.c
index 51696ab18f..06658356b1 100644
--- a/scheduler.c
+++ b/scheduler.c
@@ -25,8 +25,8 @@ static ID id_timeout_after;
static ID id_kernel_sleep;
static ID id_process_wait;
-static ID id_io_read;
-static ID id_io_write;
+static ID id_io_read, id_io_pread;
+static ID id_io_write, id_io_pwrite;
static ID id_io_wait;
static ID id_io_close;
@@ -46,7 +46,10 @@ Init_Fiber_Scheduler(void)
id_process_wait = rb_intern_const("process_wait");
id_io_read = rb_intern_const("io_read");
+ id_io_pread = rb_intern_const("io_pread");
id_io_write = rb_intern_const("io_write");
+ id_io_pwrite = rb_intern_const("io_pwrite");
+
id_io_wait = rb_intern_const("io_wait");
id_io_close = rb_intern_const("io_close");
@@ -239,6 +242,16 @@ rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t lengt
}
VALUE
+rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, VALUE buffer, size_t length, off_t offset)
+{
+    // Argument vector for the scheduler's `io_pread` method:
+    // (io, buffer, length, offset), with the C integers converted to Ruby
+    // numbers.
+    VALUE arguments[4];
+    arguments[0] = io;
+    arguments[1] = buffer;
+    arguments[2] = SIZET2NUM(length);
+    arguments[3] = OFFT2NUM(offset);
+
+    // Dispatch through rb_check_funcall so a scheduler without `io_pread`
+    // is reported to the caller rather than invoked blindly.
+    return rb_check_funcall(scheduler, id_io_pread, 4, arguments);
+}
+
+VALUE
rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length)
{
VALUE arguments[] = {
@@ -249,6 +262,16 @@ rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t leng
}
VALUE
+rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, VALUE buffer, size_t length, off_t offset)
+{
+    // Argument vector for the scheduler's `io_pwrite` method:
+    // (io, buffer, length, offset), with the C integers converted to Ruby
+    // numbers.
+    VALUE arguments[4];
+    arguments[0] = io;
+    arguments[1] = buffer;
+    arguments[2] = SIZET2NUM(length);
+    arguments[3] = OFFT2NUM(offset);
+
+    // Dispatch through rb_check_funcall so a scheduler without `io_pwrite`
+    // is reported to the caller rather than invoked blindly.
+    return rb_check_funcall(scheduler, id_io_pwrite, 4, arguments);
+}
+
+VALUE
rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length)
{
VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);
diff --git a/test/ruby/test_io_buffer.rb b/test/ruby/test_io_buffer.rb
index 362845ec2a..7e3b467ed5 100644
--- a/test/ruby/test_io_buffer.rb
+++ b/test/ruby/test_io_buffer.rb
@@ -1,5 +1,7 @@
# frozen_string_literal: false
+require 'tempfile'
+
class TestIOBuffer < Test::Unit::TestCase
experimental = Warning[:experimental]
begin
@@ -270,4 +272,59 @@ class TestIOBuffer < Test::Unit::TestCase
input.close
end
+
+ def test_read
+ io = Tempfile.new
+ io.write("Hello World")
+ io.seek(0)
+
+ buffer = IO::Buffer.new(128)
+ buffer.read(io, 5)
+
+ assert_equal "Hello", buffer.get_string(0, 5)
+ ensure
+ io.close!
+ end
+
+ def test_write
+ io = Tempfile.new
+
+ buffer = IO::Buffer.new(128)
+ buffer.set_string("Hello")
+ buffer.write(io, 5)
+
+ io.seek(0)
+ assert_equal "Hello", io.read(5)
+ ensure
+ io.close!
+ end
+
+ def test_pread
+ io = Tempfile.new
+ io.write("Hello World")
+ io.seek(0)
+
+ buffer = IO::Buffer.new(128)
+ buffer.pread(io, 5, 6)
+
+ assert_equal "World", buffer.get_string(0, 5)
+ assert_equal 0, io.tell
+ ensure
+ io.close!
+ end
+
+ def test_pwrite
+ io = Tempfile.new
+
+ buffer = IO::Buffer.new(128)
+ buffer.set_string("World")
+ buffer.pwrite(io, 5, 6)
+
+ assert_equal 0, io.tell
+
+ io.seek(6)
+ assert_equal "World", io.read(5)
+ ensure
+ io.close!
+ end
end