diff options
Diffstat (limited to 'scheduler.c')
| -rw-r--r-- | scheduler.c | 156 |
1 files changed, 118 insertions, 38 deletions
diff --git a/scheduler.c b/scheduler.c index ddb205da88..7efd4274cb 100644 --- a/scheduler.c +++ b/scheduler.c @@ -9,6 +9,7 @@ **********************************************************************/ #include "vm_core.h" +#include "eval_intern.h" #include "ruby/fiber/scheduler.h" #include "ruby/io.h" #include "ruby/io/buffer.h" @@ -27,6 +28,8 @@ static ID id_scheduler_close; static ID id_block; static ID id_unblock; +static ID id_yield; + static ID id_timeout_after; static ID id_kernel_sleep; static ID id_process_wait; @@ -74,19 +77,6 @@ struct rb_fiber_scheduler_blocking_operation { volatile rb_atomic_t status; }; -static void -blocking_operation_mark(void *ptr) -{ - // No Ruby objects to mark in our struct -} - -static void -blocking_operation_free(void *ptr) -{ - rb_fiber_scheduler_blocking_operation_t *blocking_operation = (rb_fiber_scheduler_blocking_operation_t *)ptr; - ruby_xfree(blocking_operation); -} - static size_t blocking_operation_memsize(const void *ptr) { @@ -96,11 +86,11 @@ blocking_operation_memsize(const void *ptr) static const rb_data_type_t blocking_operation_data_type = { "Fiber::Scheduler::BlockingOperation", { - blocking_operation_mark, - blocking_operation_free, + NULL, // nothing to mark + RUBY_DEFAULT_FREE, blocking_operation_memsize, }, - 0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED + 0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_EMBEDDABLE }; /* @@ -290,13 +280,15 @@ rb_fiber_scheduler_blocking_operation_new(void *(*function)(void *), void *data, * * Hook methods are: * - * * #io_wait, #io_read, #io_write, #io_pread, #io_pwrite, and #io_select, #io_close + * * #io_wait, #io_read, #io_write, #io_pread, #io_pwrite #io_select, and #io_close * * #process_wait * * #kernel_sleep * * #timeout_after * * #address_resolve * * #block and #unblock * * #blocking_operation_wait + * * #fiber_interrupt + * * #yield * * (the list is expanded as Ruby developers make more methods having non-blocking calls) * * When not specified otherwise, the hook implementations are mandatory: if they are not @@ -320,6 +312,7 @@ Init_Fiber_Scheduler(void) id_block = rb_intern_const("block"); id_unblock = rb_intern_const("unblock"); + id_yield = rb_intern_const("yield"); id_timeout_after = rb_intern_const("timeout_after"); id_kernel_sleep = rb_intern_const("kernel_sleep"); @@ -367,6 +360,9 @@ Init_Fiber_Scheduler(void) rb_define_method(rb_cFiberScheduler, "unblock", rb_fiber_scheduler_unblock, 2); rb_define_method(rb_cFiberScheduler, "fiber", rb_fiber_scheduler_fiber, -2); rb_define_method(rb_cFiberScheduler, "blocking_operation_wait", rb_fiber_scheduler_blocking_operation_wait, -2); + rb_define_method(rb_cFiberScheduler, "yield", rb_fiber_scheduler_yield, 0); + rb_define_method(rb_cFiberScheduler, "fiber_interrupt", rb_fiber_scheduler_fiber_interrupt, 2); + rb_define_method(rb_cFiberScheduler, "io_close", rb_fiber_scheduler_io_close, 1); #endif } @@ -447,7 +443,7 @@ rb_fiber_scheduler_set(VALUE scheduler) } static VALUE -rb_fiber_scheduler_current_for_threadptr(rb_thread_t *thread) +fiber_scheduler_current_for_threadptr(rb_thread_t *thread) { RUBY_ASSERT(thread); @@ -459,15 +455,22 @@ rb_fiber_scheduler_current_for_threadptr(rb_thread_t *thread) } } -VALUE -rb_fiber_scheduler_current(void) +VALUE rb_fiber_scheduler_current(void) { - return rb_fiber_scheduler_current_for_threadptr(GET_THREAD()); + RUBY_ASSERT(ruby_thread_has_gvl_p()); + + return fiber_scheduler_current_for_threadptr(GET_THREAD()); } +// This function is allowed to be called without holding the GVL. VALUE rb_fiber_scheduler_current_for_thread(VALUE thread) { - return rb_fiber_scheduler_current_for_threadptr(rb_thread_ptr(thread)); + return fiber_scheduler_current_for_threadptr(rb_thread_ptr(thread)); +} + +VALUE rb_fiber_scheduler_current_for_threadptr(rb_thread_t *thread) +{ + return fiber_scheduler_current_for_threadptr(thread); } /* @@ -516,7 +519,7 @@ rb_fiber_scheduler_make_timeout(struct timeval *timeout) * Document-method: Fiber::Scheduler#kernel_sleep * call-seq: kernel_sleep(duration = nil) * - * Invoked by Kernel#sleep and Mutex#sleep and is expected to provide + * Invoked by Kernel#sleep and Thread::Mutex#sleep and is expected to provide * an implementation of sleeping in a non-blocking way. Implementation might * register the current fiber in some list of "which fiber wait until what * moment", call Fiber.yield to pass control, and then in #close resume @@ -535,6 +538,23 @@ rb_fiber_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv) return rb_funcallv(scheduler, id_kernel_sleep, argc, argv); } +/** + * Document-method: Fiber::Scheduler#yield + * call-seq: yield + * + * Yield to the scheduler, to be resumed on the next scheduling cycle. + */ +VALUE +rb_fiber_scheduler_yield(VALUE scheduler) +{ + // First try to call the scheduler's yield method, if it exists: + VALUE result = rb_check_funcall(scheduler, id_yield, 0, NULL); + if (!UNDEF_P(result)) return result; + + // Otherwise, we can emulate yield by sleeping for 0 seconds: + return rb_fiber_scheduler_kernel_sleep(scheduler, RB_INT2NUM(0)); +} + #if 0 /* * Document-method: Fiber::Scheduler#timeout_after @@ -558,7 +578,7 @@ rb_fiber_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv) * However, as a result of this design, if the +block+ does not invoke any * non-blocking operations, it will be impossible to interrupt it. If you * desire to provide predictable points for timeouts, consider adding - * +sleep(0)+. + * <tt>sleep(0)</tt>. * * If the block is executed successfully, its result will be returned. * @@ -613,7 +633,7 @@ rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags) * Document-method: Fiber::Scheduler#block * call-seq: block(blocker, timeout = nil) * - * Invoked by methods like Thread.join, and by Mutex, to signify that current + * Invoked by methods like Thread.join, and by Thread::Mutex, to signify that current * Fiber is blocked until further notice (e.g. #unblock) or until +timeout+ has * elapsed. * @@ -633,8 +653,8 @@ rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout) * Document-method: Fiber::Scheduler#unblock * call-seq: unblock(blocker, fiber) * - * Invoked to wake up Fiber previously blocked with #block (for example, Mutex#lock - * calls #block and Mutex#unlock calls #unblock). The scheduler should use + * Invoked to wake up Fiber previously blocked with #block (for example, Thread::Mutex#lock + * calls #block and Thread::Mutex#unlock calls #unblock). The scheduler should use * the +fiber+ parameter to understand which fiber is unblocked. * * +blocker+ is what was awaited for, but it is informational only (for debugging @@ -647,18 +667,36 @@ rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber) { RUBY_ASSERT(rb_obj_is_fiber(fiber)); + VALUE result; + enum ruby_tag_type state; + // `rb_fiber_scheduler_unblock` can be called from points where `errno` is expected to be preserved. Therefore, we should save and restore it. For example `io_binwrite` calls `rb_fiber_scheduler_unblock` and if `errno` is reset to 0 by user code, it will break the error handling in `io_write`. + // // If we explicitly preserve `errno` in `io_binwrite` and other similar functions (e.g. by returning it), this code is no longer needed. I hope in the future we will be able to remove it. int saved_errno = errno; -#ifdef RUBY_DEBUG + // We must prevent interrupts while invoking the unblock method, because otherwise fibers can be left permanently blocked if an interrupt occurs during the execution of user code. See also `rb_fiber_scheduler_fiber_interrupt`. rb_execution_context_t *ec = GET_EC(); - if (RUBY_VM_INTERRUPTED(ec)) { - rb_bug("rb_fiber_scheduler_unblock called with pending interrupt"); + int saved_interrupt_mask = ec->interrupt_mask; + ec->interrupt_mask |= PENDING_INTERRUPT_MASK; + + rb_control_frame_t *volatile cfp = ec->cfp; + EC_PUSH_TAG(ec); + if ((state = EC_EXEC_TAG()) == TAG_NONE) { + result = rb_funcall(scheduler, id_unblock, 2, blocker, fiber); } -#endif + else { + rb_vm_rewind_cfp(ec, cfp); + } + EC_POP_TAG(); - VALUE result = rb_funcall(scheduler, id_unblock, 2, blocker, fiber); + ec->interrupt_mask = saved_interrupt_mask; + + if (state) { + EC_JUMP_TAG(ec, state); + } + + RUBY_VM_CHECK_INTS(ec); errno = saved_errno; @@ -914,6 +952,8 @@ fiber_scheduler_io_pwrite(VALUE _argument) { VALUE rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset) { + + if (!rb_respond_to(scheduler, id_io_pwrite)) { return RUBY_Qundef; } @@ -977,6 +1017,14 @@ rb_fiber_scheduler_io_pwrite_memory(VALUE scheduler, VALUE io, rb_off_t from, co return result; } +/* + * Document-method: Fiber::Scheduler#io_close + * call-seq: io_close(fd) + * + * Invoked by Ruby's core methods to notify scheduler that the IO object is closed. Note that + * the method will receive an integer file descriptor of the closed object, not an object + * itself. + */ VALUE rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io) { @@ -1032,7 +1080,8 @@ rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname) * call-seq: blocking_operation_wait(blocking_operation) * * Invoked by Ruby's core methods to run a blocking operation in a non-blocking way. - * The blocking_operation is a Fiber::Scheduler::BlockingOperation that encapsulates the blocking operation. + * The blocking_operation is an opaque object that encapsulates the blocking operation + * and responds to a <tt>#call</tt> method without any arguments. * * If the scheduler doesn't implement this method, or if the scheduler doesn't execute * the blocking operation, Ruby will fall back to the non-scheduler implementation. @@ -1066,6 +1115,9 @@ VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*functi operation->data2 = NULL; operation->unblock_function = NULL; + // Ensure that the blocking operation remains visible until this point: + RB_GC_GUARD(blocking_operation); + // If the blocking operation was never executed, return Qundef to signal the caller to use rb_nogvl instead if (current_status == RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) { return Qundef; @@ -1074,20 +1126,48 @@ VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*functi return result; } +/* + * Document-method: Fiber::Scheduler#fiber_interrupt + * call-seq: fiber_interrupt(fiber, exception) + * + * Invoked by Ruby's core methods to notify the scheduler that the blocked fiber should be interrupted + * with an exception. For example, IO#close uses this method to interrupt fibers that are performing + * blocking IO operations. + * + */ VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception) { VALUE arguments[] = { fiber, exception }; -#ifdef RUBY_DEBUG + VALUE result; + enum ruby_tag_type state; + + // We must prevent interrupts while invoking the fiber_interrupt method, because otherwise fibers can be left permanently blocked if an interrupt occurs during the execution of user code. See also `rb_fiber_scheduler_unblock`. rb_execution_context_t *ec = GET_EC(); - if (RUBY_VM_INTERRUPTED(ec)) { - rb_bug("rb_fiber_scheduler_fiber_interrupt called with pending interrupt"); + int saved_interrupt_mask = ec->interrupt_mask; + ec->interrupt_mask |= PENDING_INTERRUPT_MASK; + + rb_control_frame_t *volatile cfp = ec->cfp; + EC_PUSH_TAG(ec); + if ((state = EC_EXEC_TAG()) == TAG_NONE) { + result = rb_check_funcall(scheduler, id_fiber_interrupt, 2, arguments); } -#endif + else { + rb_vm_rewind_cfp(ec, cfp); + } + EC_POP_TAG(); + + ec->interrupt_mask = saved_interrupt_mask; + + if (state) { + EC_JUMP_TAG(ec, state); + } + + RUBY_VM_CHECK_INTS(ec); - return rb_check_funcall(scheduler, id_fiber_interrupt, 2, arguments); + return result; } /* |
