summaryrefslogtreecommitdiff
path: root/scheduler.c
diff options
context:
space:
mode:
Diffstat (limited to 'scheduler.c')
-rw-r--r--scheduler.c592
1 files changed, 560 insertions, 32 deletions
diff --git a/scheduler.c b/scheduler.c
index 3159635dba..7efd4274cb 100644
--- a/scheduler.c
+++ b/scheduler.c
@@ -9,18 +9,27 @@
**********************************************************************/
#include "vm_core.h"
+#include "eval_intern.h"
#include "ruby/fiber/scheduler.h"
#include "ruby/io.h"
#include "ruby/io/buffer.h"
+#include "ruby/thread.h"
+
+// For `ruby_thread_has_gvl_p`:
#include "internal/thread.h"
+// For atomic operations:
+#include "ruby_atomic.h"
+
static ID id_close;
static ID id_scheduler_close;
static ID id_block;
static ID id_unblock;
+static ID id_yield;
+
static ID id_timeout_after;
static ID id_kernel_sleep;
static ID id_process_wait;
@@ -33,9 +42,216 @@ static ID id_io_close;
static ID id_address_resolve;
+static ID id_blocking_operation_wait;
+static ID id_fiber_interrupt;
+
static ID id_fiber_schedule;
+// Our custom blocking operation class
+static VALUE rb_cFiberSchedulerBlockingOperation;
+
+/*
+ * Custom blocking operation structure for blocking operations
+ * This replaces the use of Ruby procs to avoid use-after-free issues
+ * and provides a cleaner C API for native work pools.
+ */
+
+typedef enum {
+ RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED, // Submitted but not started
+ RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING, // Currently running
+ RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED, // Finished (success/error)
+ RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED // Cancelled
+} rb_fiber_blocking_operation_status_t;
+
+struct rb_fiber_scheduler_blocking_operation {
+ void *(*function)(void *);
+ void *data;
+
+ rb_unblock_function_t *unblock_function;
+ void *data2;
+
+ int flags;
+ struct rb_fiber_scheduler_blocking_operation_state *state;
+
+ // Execution status
+ volatile rb_atomic_t status;
+};
+
+static size_t
+blocking_operation_memsize(const void *ptr)
+{
+ return sizeof(rb_fiber_scheduler_blocking_operation_t);
+}
+
+static const rb_data_type_t blocking_operation_data_type = {
+ "Fiber::Scheduler::BlockingOperation",
+ {
+ NULL, // nothing to mark
+ RUBY_DEFAULT_FREE,
+ blocking_operation_memsize,
+ },
+ 0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_EMBEDDABLE
+};
+
+/*
+ * Allocate a new blocking operation
+ */
+static VALUE
+blocking_operation_alloc(VALUE klass)
+{
+ rb_fiber_scheduler_blocking_operation_t *blocking_operation;
+ VALUE obj = TypedData_Make_Struct(klass, rb_fiber_scheduler_blocking_operation_t, &blocking_operation_data_type, blocking_operation);
+
+ blocking_operation->function = NULL;
+ blocking_operation->data = NULL;
+ blocking_operation->unblock_function = NULL;
+ blocking_operation->data2 = NULL;
+ blocking_operation->flags = 0;
+ blocking_operation->state = NULL;
+ blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED;
+
+ return obj;
+}
+
+/*
+ * Get the blocking operation struct from a Ruby object
+ */
+static rb_fiber_scheduler_blocking_operation_t *
+get_blocking_operation(VALUE obj)
+{
+ rb_fiber_scheduler_blocking_operation_t *blocking_operation;
+ TypedData_Get_Struct(obj, rb_fiber_scheduler_blocking_operation_t, &blocking_operation_data_type, blocking_operation);
+ return blocking_operation;
+}
+
+/*
+ * Document-method: Fiber::Scheduler::BlockingOperation#call
+ *
+ * Execute the blocking operation. This method releases the GVL and calls
+ * the blocking function, then restores the errno value.
+ *
+ * Returns nil. The actual result is stored in the associated state object.
+ */
+static VALUE
+blocking_operation_call(VALUE self)
+{
+ rb_fiber_scheduler_blocking_operation_t *blocking_operation = get_blocking_operation(self);
+
+ if (blocking_operation->status != RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) {
+ rb_raise(rb_eRuntimeError, "Blocking operation has already been executed!");
+ }
+
+ if (blocking_operation->function == NULL) {
+ rb_raise(rb_eRuntimeError, "Blocking operation has no function to execute!");
+ }
+
+ if (blocking_operation->state == NULL) {
+ rb_raise(rb_eRuntimeError, "Blocking operation has no result object!");
+ }
+
+ // Mark as executing
+ blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING;
+
+ // Execute the blocking operation without GVL
+ blocking_operation->state->result = rb_nogvl(blocking_operation->function, blocking_operation->data,
+ blocking_operation->unblock_function, blocking_operation->data2,
+ blocking_operation->flags);
+ blocking_operation->state->saved_errno = rb_errno();
+
+ // Mark as completed
+ blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED;
+
+ return Qnil;
+}
+
+/*
+ * C API: Extract blocking operation struct from Ruby object (GVL required)
+ *
+ * This function safely extracts the opaque struct from a BlockingOperation VALUE
+ * while holding the GVL. The returned pointer can be passed to worker threads
+ * and used with rb_fiber_scheduler_blocking_operation_execute_opaque_nogvl.
+ *
+ * Returns the opaque struct pointer on success, NULL on error.
+ * Must be called while holding the GVL.
+ */
+rb_fiber_scheduler_blocking_operation_t *
+rb_fiber_scheduler_blocking_operation_extract(VALUE self)
+{
+ return get_blocking_operation(self);
+}
+
+/*
+ * C API: Execute blocking operation from opaque struct (GVL not required)
+ *
+ * This function executes a blocking operation using the opaque struct pointer
+ * obtained from rb_fiber_scheduler_blocking_operation_extract.
+ * It can be called from native threads without holding the GVL.
+ *
+ * Returns 0 on success, -1 on error.
+ */
+int
+rb_fiber_scheduler_blocking_operation_execute(rb_fiber_scheduler_blocking_operation_t *blocking_operation)
+{
+ if (blocking_operation == NULL) {
+ return -1;
+ }
+
+ if (blocking_operation->function == NULL || blocking_operation->state == NULL) {
+ return -1; // Invalid blocking operation
+ }
+
+ // Resolve sentinel values for unblock_function and data2:
+ rb_thread_resolve_unblock_function(&blocking_operation->unblock_function, &blocking_operation->data2, GET_THREAD());
+
+ // Atomically check if we can transition from QUEUED to EXECUTING
+ rb_atomic_t expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED;
+ if (RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING) != expected) {
+ // Already cancelled or in wrong state
+ return -1;
+ }
+
+ // Now we're executing - call the function
+ blocking_operation->state->result = blocking_operation->function(blocking_operation->data);
+ blocking_operation->state->saved_errno = errno;
+
+ // Atomically transition to completed (unless cancelled during execution)
+ expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING;
+ if (RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED) == expected) {
+ // Successfully completed
+ return 0;
+ } else {
+ // Was cancelled during execution
+ blocking_operation->state->saved_errno = EINTR;
+ return -1;
+ }
+}
+
/*
+ * C API: Create a new blocking operation
+ *
+ * This creates a blocking operation that can be executed by native work pools.
+ * The blocking operation holds references to the function and data safely.
+ */
+VALUE
+rb_fiber_scheduler_blocking_operation_new(void *(*function)(void *), void *data,
+ rb_unblock_function_t *unblock_function, void *data2,
+ int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
+{
+ VALUE self = blocking_operation_alloc(rb_cFiberSchedulerBlockingOperation);
+ rb_fiber_scheduler_blocking_operation_t *blocking_operation = get_blocking_operation(self);
+
+ blocking_operation->function = function;
+ blocking_operation->data = data;
+ blocking_operation->unblock_function = unblock_function;
+ blocking_operation->data2 = data2;
+ blocking_operation->flags = flags;
+ blocking_operation->state = state;
+
+ return self;
+}
+
+/*
+ *
* Document-class: Fiber::Scheduler
*
* This is not an existing class, but documentation of the interface that Scheduler
@@ -64,12 +280,15 @@ static ID id_fiber_schedule;
*
* Hook methods are:
*
- * * #io_wait, #io_read, #io_write, #io_pread, #io_pwrite, and #io_select, #io_close
+ * * #io_wait, #io_read, #io_write, #io_pread, #io_pwrite #io_select, and #io_close
* * #process_wait
* * #kernel_sleep
* * #timeout_after
* * #address_resolve
* * #block and #unblock
+ * * #blocking_operation_wait
+ * * #fiber_interrupt
+ * * #yield
* * (the list is expanded as Ruby developers make more methods having non-blocking calls)
*
* When not specified otherwise, the hook implementations are mandatory: if they are not
@@ -93,6 +312,7 @@ Init_Fiber_Scheduler(void)
id_block = rb_intern_const("block");
id_unblock = rb_intern_const("unblock");
+ id_yield = rb_intern_const("yield");
id_timeout_after = rb_intern_const("timeout_after");
id_kernel_sleep = rb_intern_const("kernel_sleep");
@@ -109,8 +329,20 @@ Init_Fiber_Scheduler(void)
id_address_resolve = rb_intern_const("address_resolve");
+ id_blocking_operation_wait = rb_intern_const("blocking_operation_wait");
+ id_fiber_interrupt = rb_intern_const("fiber_interrupt");
+
id_fiber_schedule = rb_intern_const("fiber");
+ // Define an anonymous BlockingOperation class for internal use only
+ // This is completely hidden from Ruby code and cannot be instantiated directly
+ rb_cFiberSchedulerBlockingOperation = rb_class_new(rb_cObject);
+ rb_define_alloc_func(rb_cFiberSchedulerBlockingOperation, blocking_operation_alloc);
+ rb_define_method(rb_cFiberSchedulerBlockingOperation, "call", blocking_operation_call, 0);
+
+ // Register the anonymous class as a GC root so it doesn't get collected
+ rb_gc_register_mark_object(rb_cFiberSchedulerBlockingOperation);
+
#if 0 /* for RDoc */
rb_cFiberScheduler = rb_define_class_under(rb_cFiber, "Scheduler", rb_cObject);
rb_define_method(rb_cFiberScheduler, "close", rb_fiber_scheduler_close, 0);
@@ -126,17 +358,21 @@ Init_Fiber_Scheduler(void)
rb_define_method(rb_cFiberScheduler, "timeout_after", rb_fiber_scheduler_timeout_after, 3);
rb_define_method(rb_cFiberScheduler, "block", rb_fiber_scheduler_block, 2);
rb_define_method(rb_cFiberScheduler, "unblock", rb_fiber_scheduler_unblock, 2);
- rb_define_method(rb_cFiberScheduler, "fiber", rb_fiber_scheduler, -2);
+ rb_define_method(rb_cFiberScheduler, "fiber", rb_fiber_scheduler_fiber, -2);
+ rb_define_method(rb_cFiberScheduler, "blocking_operation_wait", rb_fiber_scheduler_blocking_operation_wait, -2);
+ rb_define_method(rb_cFiberScheduler, "yield", rb_fiber_scheduler_yield, 0);
+ rb_define_method(rb_cFiberScheduler, "fiber_interrupt", rb_fiber_scheduler_fiber_interrupt, 2);
+ rb_define_method(rb_cFiberScheduler, "io_close", rb_fiber_scheduler_io_close, 1);
#endif
}
VALUE
rb_fiber_scheduler_get(void)
{
- VM_ASSERT(ruby_thread_has_gvl_p());
+ RUBY_ASSERT(ruby_thread_has_gvl_p());
rb_thread_t *thread = GET_THREAD();
- VM_ASSERT(thread);
+ RUBY_ASSERT(thread);
return thread->scheduler;
}
@@ -159,6 +395,10 @@ verify_interface(VALUE scheduler)
if (!rb_respond_to(scheduler, id_io_wait)) {
rb_raise(rb_eArgError, "Scheduler must implement #io_wait");
}
+
+ if (!rb_respond_to(scheduler, id_fiber_interrupt)) {
+ rb_warn("Scheduler should implement #fiber_interrupt");
+ }
}
static VALUE
@@ -179,10 +419,10 @@ fiber_scheduler_close_ensure(VALUE _thread)
VALUE
rb_fiber_scheduler_set(VALUE scheduler)
{
- VM_ASSERT(ruby_thread_has_gvl_p());
+ RUBY_ASSERT(ruby_thread_has_gvl_p());
rb_thread_t *thread = GET_THREAD();
- VM_ASSERT(thread);
+ RUBY_ASSERT(thread);
if (scheduler != Qnil) {
verify_interface(scheduler);
@@ -203,9 +443,9 @@ rb_fiber_scheduler_set(VALUE scheduler)
}
static VALUE
-rb_fiber_scheduler_current_for_threadptr(rb_thread_t *thread)
+fiber_scheduler_current_for_threadptr(rb_thread_t *thread)
{
- VM_ASSERT(thread);
+ RUBY_ASSERT(thread);
if (thread->blocking == 0) {
return thread->scheduler;
@@ -215,15 +455,22 @@ rb_fiber_scheduler_current_for_threadptr(rb_thread_t *thread)
}
}
-VALUE
-rb_fiber_scheduler_current(void)
+VALUE rb_fiber_scheduler_current(void)
{
- return rb_fiber_scheduler_current_for_threadptr(GET_THREAD());
+ RUBY_ASSERT(ruby_thread_has_gvl_p());
+
+ return fiber_scheduler_current_for_threadptr(GET_THREAD());
}
+// This function is allowed to be called without holding the GVL.
VALUE rb_fiber_scheduler_current_for_thread(VALUE thread)
{
- return rb_fiber_scheduler_current_for_threadptr(rb_thread_ptr(thread));
+ return fiber_scheduler_current_for_threadptr(rb_thread_ptr(thread));
+}
+
+VALUE rb_fiber_scheduler_current_for_threadptr(rb_thread_t *thread)
+{
+ return fiber_scheduler_current_for_threadptr(thread);
}
/*
@@ -239,7 +486,7 @@ VALUE rb_fiber_scheduler_current_for_thread(VALUE thread)
VALUE
rb_fiber_scheduler_close(VALUE scheduler)
{
- VM_ASSERT(ruby_thread_has_gvl_p());
+ RUBY_ASSERT(ruby_thread_has_gvl_p());
VALUE result;
@@ -262,7 +509,7 @@ VALUE
rb_fiber_scheduler_make_timeout(struct timeval *timeout)
{
if (timeout) {
- return rb_float_new((double)timeout->tv_sec + (0.000001f * timeout->tv_usec));
+ return rb_float_new((double)timeout->tv_sec + (0.000001 * timeout->tv_usec));
}
return Qnil;
@@ -272,7 +519,7 @@ rb_fiber_scheduler_make_timeout(struct timeval *timeout)
* Document-method: Fiber::Scheduler#kernel_sleep
* call-seq: kernel_sleep(duration = nil)
*
- * Invoked by Kernel#sleep and Mutex#sleep and is expected to provide
+ * Invoked by Kernel#sleep and Thread::Mutex#sleep and is expected to provide
* an implementation of sleeping in a non-blocking way. Implementation might
* register the current fiber in some list of "which fiber wait until what
* moment", call Fiber.yield to pass control, and then in #close resume
@@ -291,6 +538,23 @@ rb_fiber_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv)
return rb_funcallv(scheduler, id_kernel_sleep, argc, argv);
}
+/**
+ * Document-method: Fiber::Scheduler#yield
+ * call-seq: yield
+ *
+ * Yield to the scheduler, to be resumed on the next scheduling cycle.
+ */
+VALUE
+rb_fiber_scheduler_yield(VALUE scheduler)
+{
+ // First try to call the scheduler's yield method, if it exists:
+ VALUE result = rb_check_funcall(scheduler, id_yield, 0, NULL);
+ if (!UNDEF_P(result)) return result;
+
+ // Otherwise, we can emulate yield by sleeping for 0 seconds:
+ return rb_fiber_scheduler_kernel_sleep(scheduler, RB_INT2NUM(0));
+}
+
#if 0
/*
* Document-method: Fiber::Scheduler#timeout_after
@@ -314,7 +578,7 @@ rb_fiber_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv)
* However, as a result of this design, if the +block+ does not invoke any
* non-blocking operations, it will be impossible to interrupt it. If you
* desire to provide predictable points for timeouts, consider adding
- * +sleep(0)+.
+ * <tt>sleep(0)</tt>.
*
* If the block is executed successfully, its result will be returned.
*
@@ -369,7 +633,7 @@ rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
* Document-method: Fiber::Scheduler#block
* call-seq: block(blocker, timeout = nil)
*
- * Invoked by methods like Thread.join, and by Mutex, to signify that current
+ * Invoked by methods like Thread.join, and by Thread::Mutex, to signify that current
* Fiber is blocked until further notice (e.g. #unblock) or until +timeout+ has
* elapsed.
*
@@ -389,8 +653,8 @@ rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
* Document-method: Fiber::Scheduler#unblock
* call-seq: unblock(blocker, fiber)
*
- * Invoked to wake up Fiber previously blocked with #block (for example, Mutex#lock
- * calls #block and Mutex#unlock calls #unblock). The scheduler should use
+ * Invoked to wake up Fiber previously blocked with #block (for example, Thread::Mutex#lock
+ * calls #block and Thread::Mutex#unlock calls #unblock). The scheduler should use
* the +fiber+ parameter to understand which fiber is unblocked.
*
* +blocker+ is what was awaited for, but it is informational only (for debugging
@@ -401,9 +665,42 @@ rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
VALUE
rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
{
- VM_ASSERT(rb_obj_is_fiber(fiber));
+ RUBY_ASSERT(rb_obj_is_fiber(fiber));
+
+ VALUE result;
+ enum ruby_tag_type state;
+
+ // `rb_fiber_scheduler_unblock` can be called from points where `errno` is expected to be preserved. Therefore, we should save and restore it. For example `io_binwrite` calls `rb_fiber_scheduler_unblock` and if `errno` is reset to 0 by user code, it will break the error handling in `io_write`.
+ //
+ // If we explicitly preserve `errno` in `io_binwrite` and other similar functions (e.g. by returning it), this code is no longer needed. I hope in the future we will be able to remove it.
+ int saved_errno = errno;
+
+ // We must prevent interrupts while invoking the unblock method, because otherwise fibers can be left permanently blocked if an interrupt occurs during the execution of user code. See also `rb_fiber_scheduler_fiber_interrupt`.
+ rb_execution_context_t *ec = GET_EC();
+ int saved_interrupt_mask = ec->interrupt_mask;
+ ec->interrupt_mask |= PENDING_INTERRUPT_MASK;
+
+ rb_control_frame_t *volatile cfp = ec->cfp;
+ EC_PUSH_TAG(ec);
+ if ((state = EC_EXEC_TAG()) == TAG_NONE) {
+ result = rb_funcall(scheduler, id_unblock, 2, blocker, fiber);
+ }
+ else {
+ rb_vm_rewind_cfp(ec, cfp);
+ }
+ EC_POP_TAG();
- return rb_funcall(scheduler, id_unblock, 2, blocker, fiber);
+ ec->interrupt_mask = saved_interrupt_mask;
+
+ if (state) {
+ EC_JUMP_TAG(ec, state);
+ }
+
+ RUBY_VM_CHECK_INTS(ec);
+
+ errno = saved_errno;
+
+ return result;
}
/*
@@ -425,10 +722,25 @@ rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
* Expected to return the subset of events that are ready immediately.
*
*/
+static VALUE
+fiber_scheduler_io_wait(VALUE _argument) {
+ VALUE *arguments = (VALUE*)_argument;
+
+ return rb_funcallv(arguments[0], id_io_wait, 3, arguments + 1);
+}
+
VALUE
rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
{
- return rb_funcall(scheduler, id_io_wait, 3, io, events, timeout);
+ VALUE arguments[] = {
+ scheduler, io, events, timeout
+ };
+
+ if (rb_respond_to(scheduler, id_fiber_interrupt)) {
+ return rb_thread_io_blocking_operation(io, fiber_scheduler_io_wait, (VALUE)&arguments);
+ } else {
+ return fiber_scheduler_io_wait((VALUE)&arguments);
+ }
}
VALUE
@@ -498,18 +810,33 @@ VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv)
*
* The method should be considered _experimental_.
*/
+static VALUE
+fiber_scheduler_io_read(VALUE _argument) {
+ VALUE *arguments = (VALUE*)_argument;
+
+ return rb_funcallv(arguments[0], id_io_read, 4, arguments + 1);
+}
+
VALUE
rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
{
+ if (!rb_respond_to(scheduler, id_io_read)) {
+ return RUBY_Qundef;
+ }
+
VALUE arguments[] = {
- io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
+ scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
};
- return rb_check_funcall(scheduler, id_io_read, 4, arguments);
+ if (rb_respond_to(scheduler, id_fiber_interrupt)) {
+ return rb_thread_io_blocking_operation(io, fiber_scheduler_io_read, (VALUE)&arguments);
+ } else {
+ return fiber_scheduler_io_read((VALUE)&arguments);
+ }
}
/*
- * Document-method: Fiber::Scheduler#io_read
+ * Document-method: Fiber::Scheduler#io_pread
* call-seq: io_pread(io, buffer, from, length, offset) -> read length or -errno
*
* Invoked by IO#pread or IO::Buffer#pread to read +length+ bytes from +io+
@@ -522,18 +849,33 @@ rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t lengt
*
* The method should be considered _experimental_.
*/
+static VALUE
+fiber_scheduler_io_pread(VALUE _argument) {
+ VALUE *arguments = (VALUE*)_argument;
+
+ return rb_funcallv(arguments[0], id_io_pread, 5, arguments + 1);
+}
+
VALUE
rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
{
+ if (!rb_respond_to(scheduler, id_io_pread)) {
+ return RUBY_Qundef;
+ }
+
VALUE arguments[] = {
- io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
+ scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
};
- return rb_check_funcall(scheduler, id_io_pread, 5, arguments);
+ if (rb_respond_to(scheduler, id_fiber_interrupt)) {
+ return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pread, (VALUE)&arguments);
+ } else {
+ return fiber_scheduler_io_pread((VALUE)&arguments);
+ }
}
/*
- * Document-method: Scheduler#io_write
+ * Document-method: Fiber::Scheduler#io_write
* call-seq: io_write(io, buffer, length, offset) -> written length or -errno
*
* Invoked by IO#write or IO::Buffer#write to write +length+ bytes to +io+ from
@@ -560,14 +902,29 @@ rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buff
*
* The method should be considered _experimental_.
*/
+static VALUE
+fiber_scheduler_io_write(VALUE _argument) {
+ VALUE *arguments = (VALUE*)_argument;
+
+ return rb_funcallv(arguments[0], id_io_write, 4, arguments + 1);
+}
+
VALUE
rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
{
+ if (!rb_respond_to(scheduler, id_io_write)) {
+ return RUBY_Qundef;
+ }
+
VALUE arguments[] = {
- io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
+ scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
};
- return rb_check_funcall(scheduler, id_io_write, 4, arguments);
+ if (rb_respond_to(scheduler, id_fiber_interrupt)) {
+ return rb_thread_io_blocking_operation(io, fiber_scheduler_io_write, (VALUE)&arguments);
+ } else {
+ return fiber_scheduler_io_write((VALUE)&arguments);
+ }
}
/*
@@ -585,14 +942,31 @@ rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t leng
* The method should be considered _experimental_.
*
*/
+static VALUE
+fiber_scheduler_io_pwrite(VALUE _argument) {
+ VALUE *arguments = (VALUE*)_argument;
+
+ return rb_funcallv(arguments[0], id_io_pwrite, 5, arguments + 1);
+}
+
VALUE
rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
{
+
+
+ if (!rb_respond_to(scheduler, id_io_pwrite)) {
+ return RUBY_Qundef;
+ }
+
VALUE arguments[] = {
- io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
+ scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
};
- return rb_check_funcall(scheduler, id_io_pwrite, 5, arguments);
+ if (rb_respond_to(scheduler, id_fiber_interrupt)) {
+ return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pwrite, (VALUE)&arguments);
+ } else {
+ return fiber_scheduler_io_pwrite((VALUE)&arguments);
+ }
}
VALUE
@@ -643,6 +1017,14 @@ rb_fiber_scheduler_io_pwrite_memory(VALUE scheduler, VALUE io, rb_off_t from, co
return result;
}
+/*
+ * Document-method: Fiber::Scheduler#io_close
+ * call-seq: io_close(fd)
+ *
+ * Invoked by Ruby's core methods to notify scheduler that the IO object is closed. Note that
+ * the method will receive an integer file descriptor of the closed object, not an object
+ * itself.
+ */
VALUE
rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io)
{
@@ -694,6 +1076,101 @@ rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
}
/*
+ * Document-method: Fiber::Scheduler#blocking_operation_wait
+ * call-seq: blocking_operation_wait(blocking_operation)
+ *
+ * Invoked by Ruby's core methods to run a blocking operation in a non-blocking way.
+ * The blocking_operation is an opaque object that encapsulates the blocking operation
+ * and responds to a <tt>#call</tt> method without any arguments.
+ *
+ * If the scheduler doesn't implement this method, or if the scheduler doesn't execute
+ * the blocking operation, Ruby will fall back to the non-scheduler implementation.
+ *
+ * Minimal suggested implementation is:
+ *
+ * def blocking_operation_wait(blocking_operation)
+ * Thread.new { blocking_operation.call }.join
+ * end
+ */
+VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
+{
+ // Check if scheduler supports blocking_operation_wait before creating the object
+ if (!rb_respond_to(scheduler, id_blocking_operation_wait)) {
+ return Qundef;
+ }
+
+ // Create a new BlockingOperation with the blocking operation
+ VALUE blocking_operation = rb_fiber_scheduler_blocking_operation_new(function, data, unblock_function, data2, flags, state);
+
+ VALUE result = rb_funcall(scheduler, id_blocking_operation_wait, 1, blocking_operation);
+
+ // Get the operation data to check if it was executed
+ rb_fiber_scheduler_blocking_operation_t *operation = get_blocking_operation(blocking_operation);
+ rb_atomic_t current_status = RUBY_ATOMIC_LOAD(operation->status);
+
+ // Invalidate the operation now that we're done with it
+ operation->function = NULL;
+ operation->state = NULL;
+ operation->data = NULL;
+ operation->data2 = NULL;
+ operation->unblock_function = NULL;
+
+ // Ensure that the blocking operation remains visible until this point:
+ RB_GC_GUARD(blocking_operation);
+
+ // If the blocking operation was never executed, return Qundef to signal the caller to use rb_nogvl instead
+ if (current_status == RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) {
+ return Qundef;
+ }
+
+ return result;
+}
+
+/*
+ * Document-method: Fiber::Scheduler#fiber_interrupt
+ * call-seq: fiber_interrupt(fiber, exception)
+ *
+ * Invoked by Ruby's core methods to notify the scheduler that the blocked fiber should be interrupted
+ * with an exception. For example, IO#close uses this method to interrupt fibers that are performing
+ * blocking IO operations.
+ *
+ */
+VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception)
+{
+ VALUE arguments[] = {
+ fiber, exception
+ };
+
+ VALUE result;
+ enum ruby_tag_type state;
+
+ // We must prevent interrupts while invoking the fiber_interrupt method, because otherwise fibers can be left permanently blocked if an interrupt occurs during the execution of user code. See also `rb_fiber_scheduler_unblock`.
+ rb_execution_context_t *ec = GET_EC();
+ int saved_interrupt_mask = ec->interrupt_mask;
+ ec->interrupt_mask |= PENDING_INTERRUPT_MASK;
+
+ rb_control_frame_t *volatile cfp = ec->cfp;
+ EC_PUSH_TAG(ec);
+ if ((state = EC_EXEC_TAG()) == TAG_NONE) {
+ result = rb_check_funcall(scheduler, id_fiber_interrupt, 2, arguments);
+ }
+ else {
+ rb_vm_rewind_cfp(ec, cfp);
+ }
+ EC_POP_TAG();
+
+ ec->interrupt_mask = saved_interrupt_mask;
+
+ if (state) {
+ EC_JUMP_TAG(ec, state);
+ }
+
+ RUBY_VM_CHECK_INTS(ec);
+
+ return result;
+}
+
+/*
* Document-method: Fiber::Scheduler#fiber
* call-seq: fiber(&block)
*
@@ -713,3 +1190,54 @@ rb_fiber_scheduler_fiber(VALUE scheduler, int argc, VALUE *argv, int kw_splat)
{
return rb_funcall_passing_block_kw(scheduler, id_fiber_schedule, argc, argv, kw_splat);
}
+
+/*
+ * C API: Cancel a blocking operation
+ *
+ * This function cancels a blocking operation. If the operation is queued,
+ * it just marks it as cancelled. If it's executing, it marks it as cancelled
+ * and calls the unblock function to interrupt the operation.
+ *
+ * Returns 1 if unblock function was called, 0 if just marked cancelled, -1 on error.
+ */
+int
+rb_fiber_scheduler_blocking_operation_cancel(rb_fiber_scheduler_blocking_operation_t *blocking_operation)
+{
+ if (blocking_operation == NULL) {
+ return -1;
+ }
+
+ rb_atomic_t current_state = RUBY_ATOMIC_LOAD(blocking_operation->status);
+
+ switch (current_state) {
+ case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED:
+ // Work hasn't started - just mark as cancelled:
+ if (RUBY_ATOMIC_CAS(blocking_operation->status, current_state, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED) == current_state) {
+ // Successfully cancelled before execution:
+ return 0;
+ }
+ // Fall through if state changed between load and CAS
+
+ case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING:
+ // Work is running - mark cancelled AND call unblock function
+ if (RUBY_ATOMIC_CAS(blocking_operation->status, current_state, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED) != current_state) {
+ // State changed between load and CAS - operation may have completed:
+ return 0;
+ }
+ // Otherwise, we successfully marked it as cancelled, so we can call the unblock function:
+ rb_unblock_function_t *unblock_function = blocking_operation->unblock_function;
+ if (unblock_function) {
+ RUBY_ASSERT(unblock_function != (rb_unblock_function_t *)-1 && "unblock_function is still sentinel value -1, should have been resolved earlier");
+ blocking_operation->unblock_function(blocking_operation->data2);
+ }
+ // Cancelled during execution (unblock function called):
+ return 1;
+
+ case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED:
+ case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED:
+ // Already finished or cancelled:
+ return 0;
+ }
+
+ return 0;
+}