summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamuel Williams <samuel.williams@oriontransfer.co.nz>2024-10-31 17:26:37 +1300
committerGitHub <noreply@github.com>2024-10-31 17:26:37 +1300
commit87fb44dff6409a19d12052cf0fc07ba80a4c45ac (patch)
tree1446cd9390356ae660c50ef7ae877bc57af8d567
parent550ac2f2edc07d1b63e3755233df0758a652b53f (diff)
Introduce Fiber Scheduler `blocking_region` hook. (#11963)
Notes
Notes: Merged-By: ioquatix <samuel@codeotaku.com>
-rw-r--r--NEWS.md6
-rw-r--r--common.mk1
-rw-r--r--include/ruby/fiber/scheduler.h20
-rw-r--r--scheduler.c63
-rw-r--r--test/fiber/scheduler.rb16
-rw-r--r--test/fiber/test_io.rb5
-rw-r--r--test/fiber/test_io_buffer.rb3
-rw-r--r--test/fiber/test_process.rb4
-rw-r--r--thread.c12
9 files changed, 118 insertions, 12 deletions
diff --git a/NEWS.md b/NEWS.md
index ac9f282714..3896d23fe5 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -57,6 +57,11 @@ Note: We're only listing outstanding class updates.
associated with the AST node. [[Feature #20624]]
* Add RubyVM::AbstractSyntaxTree::Location class which holds location information. [[Feature #20624]]
+* Fiber::Scheduler
+
+ * An optional `Fiber::Scheduler#blocking_region` hook allows blocking operations to be moved out of the event loop
+ in order to reduce latency and improve multi-core processor utilization. [[Feature #20855]]
+
## Stdlib updates
* Tempfile
@@ -212,3 +217,4 @@ details of the default gems or bundled gems.
[Feature #20497]: https://bugs.ruby-lang.org/issues/20497
[Feature #20624]: https://bugs.ruby-lang.org/issues/20624
[Feature #20775]: https://bugs.ruby-lang.org/issues/20775
+[Feature #20855]: https://bugs.ruby-lang.org/issues/20855
diff --git a/common.mk b/common.mk
index 4c48f1ebd0..28bb2b7d60 100644
--- a/common.mk
+++ b/common.mk
@@ -16648,6 +16648,7 @@ scheduler.$(OBJEXT): {$(VPATH)}scheduler.c
scheduler.$(OBJEXT): {$(VPATH)}shape.h
scheduler.$(OBJEXT): {$(VPATH)}st.h
scheduler.$(OBJEXT): {$(VPATH)}subst.h
+scheduler.$(OBJEXT): {$(VPATH)}thread.h
scheduler.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).h
scheduler.$(OBJEXT): {$(VPATH)}thread_native.h
scheduler.$(OBJEXT): {$(VPATH)}vm_core.h
diff --git a/include/ruby/fiber/scheduler.h b/include/ruby/fiber/scheduler.h
index 8f3d383330..bb18b5e01b 100644
--- a/include/ruby/fiber/scheduler.h
+++ b/include/ruby/fiber/scheduler.h
@@ -391,6 +391,26 @@ VALUE rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io);
*/
VALUE rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname);
+struct rb_fiber_scheduler_blocking_region_state {
+ void *result;
+ int saved_errno;
+};
+
+/**
+ * Defer the execution of the passed function to the scheduler.
+ *
+ * @param[in] scheduler Target scheduler.
+ * @param[in] function The function to run.
+ * @param[in] data The data to pass to the function.
+ * @param[in] unblock_function The unblock function to use to interrupt the operation.
+ * @param[in] data2 The data to pass to the unblock function.
+ * @param[in] flags Flags passed to `rb_nogvl`.
+ * @param[out] state The result and errno of the operation.
+ * @retval RUBY_Qundef `scheduler` doesn't have `#blocking_region`.
+ * @return otherwise What `scheduler.blocking_region` returns.
+ */
+VALUE rb_fiber_scheduler_blocking_region(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_region_state *state);
+
/**
* Create and schedule a non-blocking fiber.
*
diff --git a/scheduler.c b/scheduler.c
index 3159635dba..0d51a0d951 100644
--- a/scheduler.c
+++ b/scheduler.c
@@ -13,6 +13,9 @@
#include "ruby/io.h"
#include "ruby/io/buffer.h"
+#include "ruby/thread.h"
+
+// For `ruby_thread_has_gvl_p`.
#include "internal/thread.h"
static ID id_close;
@@ -33,6 +36,8 @@ static ID id_io_close;
static ID id_address_resolve;
+static ID id_blocking_region;
+
static ID id_fiber_schedule;
/*
@@ -109,6 +114,8 @@ Init_Fiber_Scheduler(void)
id_address_resolve = rb_intern_const("address_resolve");
+ id_blocking_region = rb_intern_const("blocking_region");
+
id_fiber_schedule = rb_intern_const("fiber");
#if 0 /* for RDoc */
@@ -693,6 +700,62 @@ rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
return rb_check_funcall(scheduler, id_address_resolve, 1, arguments);
}
+struct rb_blocking_region_arguments {
+ void *(*function)(void *);
+ void *data;
+ rb_unblock_function_t *unblock_function;
+ void *data2;
+ int flags;
+
+ struct rb_fiber_scheduler_blocking_region_state *state;
+};
+
+static VALUE
+rb_fiber_scheduler_blocking_region_proc(RB_BLOCK_CALL_FUNC_ARGLIST(value, _arguments))
+{
+ struct rb_blocking_region_arguments *arguments = (struct rb_blocking_region_arguments*)_arguments;
+
+ if (arguments->state == NULL) {
+ rb_raise(rb_eRuntimeError, "Blocking function was already invoked!");
+ }
+
+ arguments->state->result = rb_nogvl(arguments->function, arguments->data, arguments->unblock_function, arguments->data2, arguments->flags);
+ arguments->state->saved_errno = rb_errno();
+
+ // Make sure it's only invoked once.
+ arguments->state = NULL;
+
+ return Qnil;
+}
+
+/*
+ * Document-method: Fiber::Scheduler#blocking_region
+ * call-seq: blocking_region(work)
+ *
+ * Invoked by Ruby's core methods to run a blocking operation in a non-blocking way.
+ *
+ * Minimal suggested implementation is:
+ *
+ * def blocking_region(work)
+ * Thread.new(&work).join
+ * end
+ */
+VALUE rb_fiber_scheduler_blocking_region(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_region_state *state)
+{
+ struct rb_blocking_region_arguments arguments = {
+ .function = function,
+ .data = data,
+ .unblock_function = unblock_function,
+ .data2 = data2,
+ .flags = flags,
+ .state = state
+ };
+
+ VALUE proc = rb_proc_new(rb_fiber_scheduler_blocking_region_proc, (VALUE)&arguments);
+
+ return rb_check_funcall(scheduler, id_blocking_region, 1, &proc);
+}
+
/*
* Document-method: Fiber::Scheduler#fiber
* call-seq: fiber(&block)
diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb
index 3926226ca3..91fba0476e 100644
--- a/test/fiber/scheduler.rb
+++ b/test/fiber/scheduler.rb
@@ -309,6 +309,10 @@ class Scheduler
Addrinfo.getaddrinfo(hostname, nil).map(&:ip_address).uniq
end.value
end
+
+ def blocking_region(work)
+ Thread.new(&work).join
+ end
end
# This scheduler class implements `io_read` and `io_write` hooks which require
@@ -321,8 +325,7 @@ class IOBufferScheduler < Scheduler
io.nonblock = true
while true
- maximum_size = buffer.size - offset
- result = blocking{buffer.read(io, maximum_size, offset)}
+ result = blocking{buffer.read(io, 0, offset)}
if result > 0
total += result
@@ -349,8 +352,7 @@ class IOBufferScheduler < Scheduler
io.nonblock = true
while true
- maximum_size = buffer.size - offset
- result = blocking{buffer.write(io, maximum_size, offset)}
+ result = blocking{buffer.write(io, 0, offset)}
if result > 0
total += result
@@ -377,8 +379,7 @@ class IOBufferScheduler < Scheduler
io.nonblock = true
while true
- maximum_size = buffer.size - offset
- result = blocking{buffer.pread(io, from, maximum_size, offset)}
+ result = blocking{buffer.pread(io, from, 0, offset)}
if result > 0
total += result
@@ -406,8 +407,7 @@ class IOBufferScheduler < Scheduler
io.nonblock = true
while true
- maximum_size = buffer.size - offset
- result = blocking{buffer.pwrite(io, from, maximum_size, offset)}
+ result = blocking{buffer.pwrite(io, from, 0, offset)}
if result > 0
total += result
diff --git a/test/fiber/test_io.rb b/test/fiber/test_io.rb
index 4891c607f7..39e32c5987 100644
--- a/test/fiber/test_io.rb
+++ b/test/fiber/test_io.rb
@@ -153,12 +153,13 @@ class TestFiberIO < Test::Unit::TestCase
Fiber.set_scheduler scheduler
Fiber.schedule do
- message = i.read(20)
+ # We add 1 here, to force the read to block (testing that specific code path).
+ message = i.read(MESSAGE.bytesize + 1)
i.close
end
Fiber.schedule do
- o.write("Hello World")
+ o.write(MESSAGE)
o.close
end
end
diff --git a/test/fiber/test_io_buffer.rb b/test/fiber/test_io_buffer.rb
index a08b1ce1a9..19e6c1f88e 100644
--- a/test/fiber/test_io_buffer.rb
+++ b/test/fiber/test_io_buffer.rb
@@ -21,7 +21,8 @@ class TestFiberIOBuffer < Test::Unit::TestCase
Fiber.set_scheduler scheduler
Fiber.schedule do
- message = i.read(20)
+ # We add 1 here, to force the read to block (testing that specific code path).
+ message = i.read(MESSAGE.bytesize + 1)
i.close
end
diff --git a/test/fiber/test_process.rb b/test/fiber/test_process.rb
index a09b070c0a..f17f767704 100644
--- a/test/fiber/test_process.rb
+++ b/test/fiber/test_process.rb
@@ -59,12 +59,14 @@ class TestFiberProcess < Test::Unit::TestCase
def test_fork
omit 'fork not supported' unless Process.respond_to?(:fork)
+
+ pid = Process.fork{}
+
Thread.new do
scheduler = Scheduler.new
Fiber.set_scheduler scheduler
Fiber.schedule do
- pid = Process.fork {}
Process.wait(pid)
assert_predicate $?, :success?
diff --git a/thread.c b/thread.c
index 2a937ca278..91278e718d 100644
--- a/thread.c
+++ b/thread.c
@@ -1523,6 +1523,18 @@ rb_nogvl(void *(*func)(void *), void *data1,
rb_unblock_function_t *ubf, void *data2,
int flags)
{
+ VALUE scheduler = rb_fiber_scheduler_current();
+ if (scheduler != Qnil) {
+ struct rb_fiber_scheduler_blocking_region_state state;
+
+ VALUE result = rb_fiber_scheduler_blocking_region(scheduler, func, data1, ubf, data2, flags, &state);
+
+ if (!UNDEF_P(result)) {
+ rb_errno_set(state.saved_errno);
+ return state.result;
+ }
+ }
+
void *val = 0;
rb_execution_context_t *ec = GET_EC();
rb_thread_t *th = rb_ec_thread_ptr(ec);