diff options
Diffstat (limited to 'scheduler.c')
| -rw-r--r-- | scheduler.c | 592 |
1 files changed, 560 insertions, 32 deletions
diff --git a/scheduler.c b/scheduler.c index 3159635dba..7efd4274cb 100644 --- a/scheduler.c +++ b/scheduler.c @@ -9,18 +9,27 @@ **********************************************************************/ #include "vm_core.h" +#include "eval_intern.h" #include "ruby/fiber/scheduler.h" #include "ruby/io.h" #include "ruby/io/buffer.h" +#include "ruby/thread.h" + +// For `ruby_thread_has_gvl_p`: #include "internal/thread.h" +// For atomic operations: +#include "ruby_atomic.h" + static ID id_close; static ID id_scheduler_close; static ID id_block; static ID id_unblock; +static ID id_yield; + static ID id_timeout_after; static ID id_kernel_sleep; static ID id_process_wait; @@ -33,9 +42,216 @@ static ID id_io_close; static ID id_address_resolve; +static ID id_blocking_operation_wait; +static ID id_fiber_interrupt; + static ID id_fiber_schedule; +// Our custom blocking operation class +static VALUE rb_cFiberSchedulerBlockingOperation; + +/* + * Custom blocking operation structure for blocking operations + * This replaces the use of Ruby procs to avoid use-after-free issues + * and provides a cleaner C API for native work pools. + */ + +typedef enum { + RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED, // Submitted but not started + RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING, // Currently running + RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED, // Finished (success/error) + RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED // Cancelled +} rb_fiber_blocking_operation_status_t; + +struct rb_fiber_scheduler_blocking_operation { + void *(*function)(void *); + void *data; + + rb_unblock_function_t *unblock_function; + void *data2; + + int flags; + struct rb_fiber_scheduler_blocking_operation_state *state; + + // Execution status + volatile rb_atomic_t status; +}; + +static size_t +blocking_operation_memsize(const void *ptr) +{ + return sizeof(rb_fiber_scheduler_blocking_operation_t); +} + +static const rb_data_type_t blocking_operation_data_type = { + "Fiber::Scheduler::BlockingOperation", + { + NULL, // nothing to mark + RUBY_DEFAULT_FREE, + blocking_operation_memsize, + }, + 0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_EMBEDDABLE +}; + +/* + * Allocate a new blocking operation + */ +static VALUE +blocking_operation_alloc(VALUE klass) +{ + rb_fiber_scheduler_blocking_operation_t *blocking_operation; + VALUE obj = TypedData_Make_Struct(klass, rb_fiber_scheduler_blocking_operation_t, &blocking_operation_data_type, blocking_operation); + + blocking_operation->function = NULL; + blocking_operation->data = NULL; + blocking_operation->unblock_function = NULL; + blocking_operation->data2 = NULL; + blocking_operation->flags = 0; + blocking_operation->state = NULL; + blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED; + + return obj; +} + +/* + * Get the blocking operation struct from a Ruby object + */ +static rb_fiber_scheduler_blocking_operation_t * +get_blocking_operation(VALUE obj) +{ + rb_fiber_scheduler_blocking_operation_t *blocking_operation; + TypedData_Get_Struct(obj, rb_fiber_scheduler_blocking_operation_t, &blocking_operation_data_type, blocking_operation); + return blocking_operation; +} + +/* + * Document-method: Fiber::Scheduler::BlockingOperation#call + * + * Execute the blocking operation. This method releases the GVL and calls + * the blocking function, then restores the errno value. + * + * Returns nil. The actual result is stored in the associated state object. + */ +static VALUE +blocking_operation_call(VALUE self) +{ + rb_fiber_scheduler_blocking_operation_t *blocking_operation = get_blocking_operation(self); + + if (blocking_operation->status != RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) { + rb_raise(rb_eRuntimeError, "Blocking operation has already been executed!"); + } + + if (blocking_operation->function == NULL) { + rb_raise(rb_eRuntimeError, "Blocking operation has no function to execute!"); + } + + if (blocking_operation->state == NULL) { + rb_raise(rb_eRuntimeError, "Blocking operation has no result object!"); + } + + // Mark as executing + blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING; + + // Execute the blocking operation without GVL + blocking_operation->state->result = rb_nogvl(blocking_operation->function, blocking_operation->data, + blocking_operation->unblock_function, blocking_operation->data2, + blocking_operation->flags); + blocking_operation->state->saved_errno = rb_errno(); + + // Mark as completed + blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED; + + return Qnil; +} + +/* + * C API: Extract blocking operation struct from Ruby object (GVL required) + * + * This function safely extracts the opaque struct from a BlockingOperation VALUE + * while holding the GVL. The returned pointer can be passed to worker threads + * and used with rb_fiber_scheduler_blocking_operation_execute_opaque_nogvl. + * + * Returns the opaque struct pointer on success, NULL on error. + * Must be called while holding the GVL. + */ +rb_fiber_scheduler_blocking_operation_t * +rb_fiber_scheduler_blocking_operation_extract(VALUE self) +{ + return get_blocking_operation(self); +} + +/* + * C API: Execute blocking operation from opaque struct (GVL not required) + * + * This function executes a blocking operation using the opaque struct pointer + * obtained from rb_fiber_scheduler_blocking_operation_extract. + * It can be called from native threads without holding the GVL. + * + * Returns 0 on success, -1 on error. + */ +int +rb_fiber_scheduler_blocking_operation_execute(rb_fiber_scheduler_blocking_operation_t *blocking_operation) +{ + if (blocking_operation == NULL) { + return -1; + } + + if (blocking_operation->function == NULL || blocking_operation->state == NULL) { + return -1; // Invalid blocking operation + } + + // Resolve sentinel values for unblock_function and data2: + rb_thread_resolve_unblock_function(&blocking_operation->unblock_function, &blocking_operation->data2, GET_THREAD()); + + // Atomically check if we can transition from QUEUED to EXECUTING + rb_atomic_t expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED; + if (RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING) != expected) { + // Already cancelled or in wrong state + return -1; + } + + // Now we're executing - call the function + blocking_operation->state->result = blocking_operation->function(blocking_operation->data); + blocking_operation->state->saved_errno = errno; + + // Atomically transition to completed (unless cancelled during execution) + expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING; + if (RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED) == expected) { + // Successfully completed + return 0; + } else { + // Was cancelled during execution + blocking_operation->state->saved_errno = EINTR; + return -1; + } +} + /* + * C API: Create a new blocking operation + * + * This creates a blocking operation that can be executed by native work pools. + * The blocking operation holds references to the function and data safely. + */ +VALUE +rb_fiber_scheduler_blocking_operation_new(void *(*function)(void *), void *data, + rb_unblock_function_t *unblock_function, void *data2, + int flags, struct rb_fiber_scheduler_blocking_operation_state *state) +{ + VALUE self = blocking_operation_alloc(rb_cFiberSchedulerBlockingOperation); + rb_fiber_scheduler_blocking_operation_t *blocking_operation = get_blocking_operation(self); + + blocking_operation->function = function; + blocking_operation->data = data; + blocking_operation->unblock_function = unblock_function; + blocking_operation->data2 = data2; + blocking_operation->flags = flags; + blocking_operation->state = state; + + return self; +} + +/* + * * Document-class: Fiber::Scheduler * * This is not an existing class, but documentation of the interface that Scheduler @@ -64,12 +280,15 @@ static ID id_fiber_schedule; * * Hook methods are: * - * * #io_wait, #io_read, #io_write, #io_pread, #io_pwrite, and #io_select, #io_close + * * #io_wait, #io_read, #io_write, #io_pread, #io_pwrite #io_select, and #io_close * * #process_wait * * #kernel_sleep * * #timeout_after * * #address_resolve * * #block and #unblock + * * #blocking_operation_wait + * * #fiber_interrupt + * * #yield * * (the list is expanded as Ruby developers make more methods having non-blocking calls) * * When not specified otherwise, the hook implementations are mandatory: if they are not @@ -93,6 +312,7 @@ Init_Fiber_Scheduler(void) id_block = rb_intern_const("block"); id_unblock = rb_intern_const("unblock"); + id_yield = rb_intern_const("yield"); id_timeout_after = rb_intern_const("timeout_after"); id_kernel_sleep = rb_intern_const("kernel_sleep"); @@ -109,8 +329,20 @@ Init_Fiber_Scheduler(void) id_address_resolve = rb_intern_const("address_resolve"); + id_blocking_operation_wait = rb_intern_const("blocking_operation_wait"); + id_fiber_interrupt = rb_intern_const("fiber_interrupt"); + id_fiber_schedule = rb_intern_const("fiber"); + // Define an anonymous BlockingOperation class for internal use only + // This is completely hidden from Ruby code and cannot be instantiated directly + rb_cFiberSchedulerBlockingOperation = rb_class_new(rb_cObject); + rb_define_alloc_func(rb_cFiberSchedulerBlockingOperation, blocking_operation_alloc); + rb_define_method(rb_cFiberSchedulerBlockingOperation, "call", blocking_operation_call, 0); + + // Register the anonymous class as a GC root so it doesn't get collected + rb_gc_register_mark_object(rb_cFiberSchedulerBlockingOperation); + #if 0 /* for RDoc */ rb_cFiberScheduler = rb_define_class_under(rb_cFiber, "Scheduler", rb_cObject); rb_define_method(rb_cFiberScheduler, "close", rb_fiber_scheduler_close, 0); @@ -126,17 +358,21 @@ Init_Fiber_Scheduler(void) rb_define_method(rb_cFiberScheduler, "timeout_after", rb_fiber_scheduler_timeout_after, 3); rb_define_method(rb_cFiberScheduler, "block", rb_fiber_scheduler_block, 2); rb_define_method(rb_cFiberScheduler, "unblock", rb_fiber_scheduler_unblock, 2); - rb_define_method(rb_cFiberScheduler, "fiber", rb_fiber_scheduler, -2); + rb_define_method(rb_cFiberScheduler, "fiber", rb_fiber_scheduler_fiber, -2); + rb_define_method(rb_cFiberScheduler, "blocking_operation_wait", rb_fiber_scheduler_blocking_operation_wait, -2); + rb_define_method(rb_cFiberScheduler, "yield", rb_fiber_scheduler_yield, 0); + rb_define_method(rb_cFiberScheduler, "fiber_interrupt", rb_fiber_scheduler_fiber_interrupt, 2); + rb_define_method(rb_cFiberScheduler, "io_close", rb_fiber_scheduler_io_close, 1); #endif } VALUE rb_fiber_scheduler_get(void) { - VM_ASSERT(ruby_thread_has_gvl_p()); + RUBY_ASSERT(ruby_thread_has_gvl_p()); rb_thread_t *thread = GET_THREAD(); - VM_ASSERT(thread); + RUBY_ASSERT(thread); return thread->scheduler; } @@ -159,6 +395,10 @@ verify_interface(VALUE scheduler) if (!rb_respond_to(scheduler, id_io_wait)) { rb_raise(rb_eArgError, "Scheduler must implement #io_wait"); } + + if (!rb_respond_to(scheduler, id_fiber_interrupt)) { + rb_warn("Scheduler should implement #fiber_interrupt"); + } } static VALUE @@ -179,10 +419,10 @@ fiber_scheduler_close_ensure(VALUE _thread) VALUE rb_fiber_scheduler_set(VALUE scheduler) { - VM_ASSERT(ruby_thread_has_gvl_p()); + RUBY_ASSERT(ruby_thread_has_gvl_p()); rb_thread_t *thread = GET_THREAD(); - VM_ASSERT(thread); + RUBY_ASSERT(thread); if (scheduler != Qnil) { verify_interface(scheduler); @@ -203,9 +443,9 @@ rb_fiber_scheduler_set(VALUE scheduler) } static VALUE -rb_fiber_scheduler_current_for_threadptr(rb_thread_t *thread) +fiber_scheduler_current_for_threadptr(rb_thread_t *thread) { - VM_ASSERT(thread); + RUBY_ASSERT(thread); if (thread->blocking == 0) { return thread->scheduler; @@ -215,15 +455,22 @@ rb_fiber_scheduler_current_for_threadptr(rb_thread_t *thread) } } -VALUE -rb_fiber_scheduler_current(void) +VALUE rb_fiber_scheduler_current(void) { - return rb_fiber_scheduler_current_for_threadptr(GET_THREAD()); + RUBY_ASSERT(ruby_thread_has_gvl_p()); + + return fiber_scheduler_current_for_threadptr(GET_THREAD()); } +// This function is allowed to be called without holding the GVL. VALUE rb_fiber_scheduler_current_for_thread(VALUE thread) { - return rb_fiber_scheduler_current_for_threadptr(rb_thread_ptr(thread)); + return fiber_scheduler_current_for_threadptr(rb_thread_ptr(thread)); +} + +VALUE rb_fiber_scheduler_current_for_threadptr(rb_thread_t *thread) +{ + return fiber_scheduler_current_for_threadptr(thread); } /* @@ -239,7 +486,7 @@ VALUE rb_fiber_scheduler_current_for_thread(VALUE thread) VALUE rb_fiber_scheduler_close(VALUE scheduler) { - VM_ASSERT(ruby_thread_has_gvl_p()); + RUBY_ASSERT(ruby_thread_has_gvl_p()); VALUE result; @@ -262,7 +509,7 @@ VALUE rb_fiber_scheduler_make_timeout(struct timeval *timeout) { if (timeout) { - return rb_float_new((double)timeout->tv_sec + (0.000001f * timeout->tv_usec)); + return rb_float_new((double)timeout->tv_sec + (0.000001 * timeout->tv_usec)); } return Qnil; @@ -272,7 +519,7 @@ rb_fiber_scheduler_make_timeout(struct timeval *timeout) * Document-method: Fiber::Scheduler#kernel_sleep * call-seq: kernel_sleep(duration = nil) * - * Invoked by Kernel#sleep and Mutex#sleep and is expected to provide + * Invoked by Kernel#sleep and Thread::Mutex#sleep and is expected to provide * an implementation of sleeping in a non-blocking way. Implementation might * register the current fiber in some list of "which fiber wait until what * moment", call Fiber.yield to pass control, and then in #close resume @@ -291,6 +538,23 @@ rb_fiber_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv) return rb_funcallv(scheduler, id_kernel_sleep, argc, argv); } +/** + * Document-method: Fiber::Scheduler#yield + * call-seq: yield + * + * Yield to the scheduler, to be resumed on the next scheduling cycle. + */ +VALUE +rb_fiber_scheduler_yield(VALUE scheduler) +{ + // First try to call the scheduler's yield method, if it exists: + VALUE result = rb_check_funcall(scheduler, id_yield, 0, NULL); + if (!UNDEF_P(result)) return result; + + // Otherwise, we can emulate yield by sleeping for 0 seconds: + return rb_fiber_scheduler_kernel_sleep(scheduler, RB_INT2NUM(0)); +} + #if 0 /* * Document-method: Fiber::Scheduler#timeout_after @@ -314,7 +578,7 @@ rb_fiber_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv) * However, as a result of this design, if the +block+ does not invoke any * non-blocking operations, it will be impossible to interrupt it. If you * desire to provide predictable points for timeouts, consider adding - * +sleep(0)+. + * <tt>sleep(0)</tt>. * * If the block is executed successfully, its result will be returned. * @@ -369,7 +633,7 @@ rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags) * Document-method: Fiber::Scheduler#block * call-seq: block(blocker, timeout = nil) * - * Invoked by methods like Thread.join, and by Mutex, to signify that current + * Invoked by methods like Thread.join, and by Thread::Mutex, to signify that current * Fiber is blocked until further notice (e.g. #unblock) or until +timeout+ has * elapsed. * @@ -389,8 +653,8 @@ rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout) * Document-method: Fiber::Scheduler#unblock * call-seq: unblock(blocker, fiber) * - * Invoked to wake up Fiber previously blocked with #block (for example, Mutex#lock - * calls #block and Mutex#unlock calls #unblock). The scheduler should use + * Invoked to wake up Fiber previously blocked with #block (for example, Thread::Mutex#lock + * calls #block and Thread::Mutex#unlock calls #unblock). The scheduler should use * the +fiber+ parameter to understand which fiber is unblocked. * * +blocker+ is what was awaited for, but it is informational only (for debugging @@ -401,9 +665,42 @@ rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout) VALUE rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber) { - VM_ASSERT(rb_obj_is_fiber(fiber)); + RUBY_ASSERT(rb_obj_is_fiber(fiber)); + + VALUE result; + enum ruby_tag_type state; + + // `rb_fiber_scheduler_unblock` can be called from points where `errno` is expected to be preserved. Therefore, we should save and restore it. For example `io_binwrite` calls `rb_fiber_scheduler_unblock` and if `errno` is reset to 0 by user code, it will break the error handling in `io_write`. + // + // If we explicitly preserve `errno` in `io_binwrite` and other similar functions (e.g. by returning it), this code is no longer needed. I hope in the future we will be able to remove it. + int saved_errno = errno; + + // We must prevent interrupts while invoking the unblock method, because otherwise fibers can be left permanently blocked if an interrupt occurs during the execution of user code. See also `rb_fiber_scheduler_fiber_interrupt`. + rb_execution_context_t *ec = GET_EC(); + int saved_interrupt_mask = ec->interrupt_mask; + ec->interrupt_mask |= PENDING_INTERRUPT_MASK; + + rb_control_frame_t *volatile cfp = ec->cfp; + EC_PUSH_TAG(ec); + if ((state = EC_EXEC_TAG()) == TAG_NONE) { + result = rb_funcall(scheduler, id_unblock, 2, blocker, fiber); + } + else { + rb_vm_rewind_cfp(ec, cfp); + } + EC_POP_TAG(); - return rb_funcall(scheduler, id_unblock, 2, blocker, fiber); + ec->interrupt_mask = saved_interrupt_mask; + + if (state) { + EC_JUMP_TAG(ec, state); + } + + RUBY_VM_CHECK_INTS(ec); + + errno = saved_errno; + + return result; } /* @@ -425,10 +722,25 @@ rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber) * Expected to return the subset of events that are ready immediately. * */ +static VALUE +fiber_scheduler_io_wait(VALUE _argument) { + VALUE *arguments = (VALUE*)_argument; + + return rb_funcallv(arguments[0], id_io_wait, 3, arguments + 1); +} + VALUE rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout) { - return rb_funcall(scheduler, id_io_wait, 3, io, events, timeout); + VALUE arguments[] = { + scheduler, io, events, timeout + }; + + if (rb_respond_to(scheduler, id_fiber_interrupt)) { + return rb_thread_io_blocking_operation(io, fiber_scheduler_io_wait, (VALUE)&arguments); + } else { + return fiber_scheduler_io_wait((VALUE)&arguments); + } } VALUE @@ -498,18 +810,33 @@ VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv) * * The method should be considered _experimental_. */ +static VALUE +fiber_scheduler_io_read(VALUE _argument) { + VALUE *arguments = (VALUE*)_argument; + + return rb_funcallv(arguments[0], id_io_read, 4, arguments + 1); +} + VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset) { + if (!rb_respond_to(scheduler, id_io_read)) { + return RUBY_Qundef; + } + VALUE arguments[] = { - io, buffer, SIZET2NUM(length), SIZET2NUM(offset) + scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset) }; - return rb_check_funcall(scheduler, id_io_read, 4, arguments); + if (rb_respond_to(scheduler, id_fiber_interrupt)) { + return rb_thread_io_blocking_operation(io, fiber_scheduler_io_read, (VALUE)&arguments); + } else { + return fiber_scheduler_io_read((VALUE)&arguments); + } } /* - * Document-method: Fiber::Scheduler#io_read + * Document-method: Fiber::Scheduler#io_pread * call-seq: io_pread(io, buffer, from, length, offset) -> read length or -errno * * Invoked by IO#pread or IO::Buffer#pread to read +length+ bytes from +io+ @@ -522,18 +849,33 @@ rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t lengt * * The method should be considered _experimental_. */ +static VALUE +fiber_scheduler_io_pread(VALUE _argument) { + VALUE *arguments = (VALUE*)_argument; + + return rb_funcallv(arguments[0], id_io_pread, 5, arguments + 1); +} + VALUE rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset) { + if (!rb_respond_to(scheduler, id_io_pread)) { + return RUBY_Qundef; + } + VALUE arguments[] = { - io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset) + scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset) }; - return rb_check_funcall(scheduler, id_io_pread, 5, arguments); + if (rb_respond_to(scheduler, id_fiber_interrupt)) { + return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pread, (VALUE)&arguments); + } else { + return fiber_scheduler_io_pread((VALUE)&arguments); + } } /* - * Document-method: Scheduler#io_write + * Document-method: Fiber::Scheduler#io_write * call-seq: io_write(io, buffer, length, offset) -> written length or -errno * * Invoked by IO#write or IO::Buffer#write to write +length+ bytes to +io+ from @@ -560,14 +902,29 @@ rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buff * * The method should be considered _experimental_. */ +static VALUE +fiber_scheduler_io_write(VALUE _argument) { + VALUE *arguments = (VALUE*)_argument; + + return rb_funcallv(arguments[0], id_io_write, 4, arguments + 1); +} + VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset) { + if (!rb_respond_to(scheduler, id_io_write)) { + return RUBY_Qundef; + } + VALUE arguments[] = { - io, buffer, SIZET2NUM(length), SIZET2NUM(offset) + scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset) }; - return rb_check_funcall(scheduler, id_io_write, 4, arguments); + if (rb_respond_to(scheduler, id_fiber_interrupt)) { + return rb_thread_io_blocking_operation(io, fiber_scheduler_io_write, (VALUE)&arguments); + } else { + return fiber_scheduler_io_write((VALUE)&arguments); + } } /* @@ -585,14 +942,31 @@ rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t leng * The method should be considered _experimental_. * */ +static VALUE +fiber_scheduler_io_pwrite(VALUE _argument) { + VALUE *arguments = (VALUE*)_argument; + + return rb_funcallv(arguments[0], id_io_pwrite, 5, arguments + 1); +} + VALUE rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset) { + + + if (!rb_respond_to(scheduler, id_io_pwrite)) { + return RUBY_Qundef; + } + VALUE arguments[] = { - io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset) + scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset) }; - return rb_check_funcall(scheduler, id_io_pwrite, 5, arguments); + if (rb_respond_to(scheduler, id_fiber_interrupt)) { + return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pwrite, (VALUE)&arguments); + } else { + return fiber_scheduler_io_pwrite((VALUE)&arguments); + } } VALUE @@ -643,6 +1017,14 @@ rb_fiber_scheduler_io_pwrite_memory(VALUE scheduler, VALUE io, rb_off_t from, co return result; } +/* + * Document-method: Fiber::Scheduler#io_close + * call-seq: io_close(fd) + * + * Invoked by Ruby's core methods to notify scheduler that the IO object is closed. Note that + * the method will receive an integer file descriptor of the closed object, not an object + * itself. + */ VALUE rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io) { @@ -694,6 +1076,101 @@ rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname) } /* + * Document-method: Fiber::Scheduler#blocking_operation_wait + * call-seq: blocking_operation_wait(blocking_operation) + * + * Invoked by Ruby's core methods to run a blocking operation in a non-blocking way. + * The blocking_operation is an opaque object that encapsulates the blocking operation + * and responds to a <tt>#call</tt> method without any arguments. + * + * If the scheduler doesn't implement this method, or if the scheduler doesn't execute + * the blocking operation, Ruby will fall back to the non-scheduler implementation. + * + * Minimal suggested implementation is: + * + * def blocking_operation_wait(blocking_operation) + * Thread.new { blocking_operation.call }.join + * end + */ +VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state) +{ + // Check if scheduler supports blocking_operation_wait before creating the object + if (!rb_respond_to(scheduler, id_blocking_operation_wait)) { + return Qundef; + } + + // Create a new BlockingOperation with the blocking operation + VALUE blocking_operation = rb_fiber_scheduler_blocking_operation_new(function, data, unblock_function, data2, flags, state); + + VALUE result = rb_funcall(scheduler, id_blocking_operation_wait, 1, blocking_operation); + + // Get the operation data to check if it was executed + rb_fiber_scheduler_blocking_operation_t *operation = get_blocking_operation(blocking_operation); + rb_atomic_t current_status = RUBY_ATOMIC_LOAD(operation->status); + + // Invalidate the operation now that we're done with it + operation->function = NULL; + operation->state = NULL; + operation->data = NULL; + operation->data2 = NULL; + operation->unblock_function = NULL; + + // Ensure that the blocking operation remains visible until this point: + RB_GC_GUARD(blocking_operation); + + // If the blocking operation was never executed, return Qundef to signal the caller to use rb_nogvl instead + if (current_status == RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) { + return Qundef; + } + + return result; +} + +/* + * Document-method: Fiber::Scheduler#fiber_interrupt + * call-seq: fiber_interrupt(fiber, exception) + * + * Invoked by Ruby's core methods to notify the scheduler that the blocked fiber should be interrupted + * with an exception. For example, IO#close uses this method to interrupt fibers that are performing + * blocking IO operations. + * + */ +VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception) +{ + VALUE arguments[] = { + fiber, exception + }; + + VALUE result; + enum ruby_tag_type state; + + // We must prevent interrupts while invoking the fiber_interrupt method, because otherwise fibers can be left permanently blocked if an interrupt occurs during the execution of user code. See also `rb_fiber_scheduler_unblock`. + rb_execution_context_t *ec = GET_EC(); + int saved_interrupt_mask = ec->interrupt_mask; + ec->interrupt_mask |= PENDING_INTERRUPT_MASK; + + rb_control_frame_t *volatile cfp = ec->cfp; + EC_PUSH_TAG(ec); + if ((state = EC_EXEC_TAG()) == TAG_NONE) { + result = rb_check_funcall(scheduler, id_fiber_interrupt, 2, arguments); + } + else { + rb_vm_rewind_cfp(ec, cfp); + } + EC_POP_TAG(); + + ec->interrupt_mask = saved_interrupt_mask; + + if (state) { + EC_JUMP_TAG(ec, state); + } + + RUBY_VM_CHECK_INTS(ec); + + return result; +} + +/* * Document-method: Fiber::Scheduler#fiber * call-seq: fiber(&block) * @@ -713,3 +1190,54 @@ rb_fiber_scheduler_fiber(VALUE scheduler, int argc, VALUE *argv, int kw_splat) { return rb_funcall_passing_block_kw(scheduler, id_fiber_schedule, argc, argv, kw_splat); } + +/* + * C API: Cancel a blocking operation + * + * This function cancels a blocking operation. If the operation is queued, + * it just marks it as cancelled. If it's executing, it marks it as cancelled + * and calls the unblock function to interrupt the operation. + * + * Returns 1 if unblock function was called, 0 if just marked cancelled, -1 on error. + */ +int +rb_fiber_scheduler_blocking_operation_cancel(rb_fiber_scheduler_blocking_operation_t *blocking_operation) +{ + if (blocking_operation == NULL) { + return -1; + } + + rb_atomic_t current_state = RUBY_ATOMIC_LOAD(blocking_operation->status); + + switch (current_state) { + case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED: + // Work hasn't started - just mark as cancelled: + if (RUBY_ATOMIC_CAS(blocking_operation->status, current_state, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED) == current_state) { + // Successfully cancelled before execution: + return 0; + } + // Fall through if state changed between load and CAS + + case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING: + // Work is running - mark cancelled AND call unblock function + if (RUBY_ATOMIC_CAS(blocking_operation->status, current_state, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED) != current_state) { + // State changed between load and CAS - operation may have completed: + return 0; + } + // Otherwise, we successfully marked it as cancelled, so we can call the unblock function: + rb_unblock_function_t *unblock_function = blocking_operation->unblock_function; + if (unblock_function) { + RUBY_ASSERT(unblock_function != (rb_unblock_function_t *)-1 && "unblock_function is still sentinel value -1, should have been resolved earlier"); + blocking_operation->unblock_function(blocking_operation->data2); + } + // Cancelled during execution (unblock function called): + return 1; + + case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED: + case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED: + // Already finished or cancelled: + return 0; + } + + return 0; +} |
