summaryrefslogtreecommitdiff
path: root/scheduler.c
diff options
context:
space:
mode:
Diffstat (limited to 'scheduler.c')
-rw-r--r--scheduler.c156
1 files changed, 118 insertions, 38 deletions
diff --git a/scheduler.c b/scheduler.c
index ddb205da88..7efd4274cb 100644
--- a/scheduler.c
+++ b/scheduler.c
@@ -9,6 +9,7 @@
**********************************************************************/
#include "vm_core.h"
+#include "eval_intern.h"
#include "ruby/fiber/scheduler.h"
#include "ruby/io.h"
#include "ruby/io/buffer.h"
@@ -27,6 +28,8 @@ static ID id_scheduler_close;
static ID id_block;
static ID id_unblock;
+static ID id_yield;
+
static ID id_timeout_after;
static ID id_kernel_sleep;
static ID id_process_wait;
@@ -74,19 +77,6 @@ struct rb_fiber_scheduler_blocking_operation {
volatile rb_atomic_t status;
};
-static void
-blocking_operation_mark(void *ptr)
-{
- // No Ruby objects to mark in our struct
-}
-
-static void
-blocking_operation_free(void *ptr)
-{
- rb_fiber_scheduler_blocking_operation_t *blocking_operation = (rb_fiber_scheduler_blocking_operation_t *)ptr;
- ruby_xfree(blocking_operation);
-}
-
static size_t
blocking_operation_memsize(const void *ptr)
{
@@ -96,11 +86,11 @@ blocking_operation_memsize(const void *ptr)
static const rb_data_type_t blocking_operation_data_type = {
"Fiber::Scheduler::BlockingOperation",
{
- blocking_operation_mark,
- blocking_operation_free,
+ NULL, // nothing to mark
+ RUBY_DEFAULT_FREE,
blocking_operation_memsize,
},
- 0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED
+ 0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_EMBEDDABLE
};
/*
@@ -290,13 +280,15 @@ rb_fiber_scheduler_blocking_operation_new(void *(*function)(void *), void *data,
*
* Hook methods are:
*
- * * #io_wait, #io_read, #io_write, #io_pread, #io_pwrite, and #io_select, #io_close
+ * * #io_wait, #io_read, #io_write, #io_pread, #io_pwrite #io_select, and #io_close
* * #process_wait
* * #kernel_sleep
* * #timeout_after
* * #address_resolve
* * #block and #unblock
* * #blocking_operation_wait
+ * * #fiber_interrupt
+ * * #yield
* * (the list is expanded as Ruby developers make more methods having non-blocking calls)
*
* When not specified otherwise, the hook implementations are mandatory: if they are not
@@ -320,6 +312,7 @@ Init_Fiber_Scheduler(void)
id_block = rb_intern_const("block");
id_unblock = rb_intern_const("unblock");
+ id_yield = rb_intern_const("yield");
id_timeout_after = rb_intern_const("timeout_after");
id_kernel_sleep = rb_intern_const("kernel_sleep");
@@ -367,6 +360,9 @@ Init_Fiber_Scheduler(void)
rb_define_method(rb_cFiberScheduler, "unblock", rb_fiber_scheduler_unblock, 2);
rb_define_method(rb_cFiberScheduler, "fiber", rb_fiber_scheduler_fiber, -2);
rb_define_method(rb_cFiberScheduler, "blocking_operation_wait", rb_fiber_scheduler_blocking_operation_wait, -2);
+ rb_define_method(rb_cFiberScheduler, "yield", rb_fiber_scheduler_yield, 0);
+ rb_define_method(rb_cFiberScheduler, "fiber_interrupt", rb_fiber_scheduler_fiber_interrupt, 2);
+ rb_define_method(rb_cFiberScheduler, "io_close", rb_fiber_scheduler_io_close, 1);
#endif
}
@@ -447,7 +443,7 @@ rb_fiber_scheduler_set(VALUE scheduler)
}
static VALUE
-rb_fiber_scheduler_current_for_threadptr(rb_thread_t *thread)
+fiber_scheduler_current_for_threadptr(rb_thread_t *thread)
{
RUBY_ASSERT(thread);
@@ -459,15 +455,22 @@ rb_fiber_scheduler_current_for_threadptr(rb_thread_t *thread)
}
}
-VALUE
-rb_fiber_scheduler_current(void)
+VALUE rb_fiber_scheduler_current(void)
{
- return rb_fiber_scheduler_current_for_threadptr(GET_THREAD());
+ RUBY_ASSERT(ruby_thread_has_gvl_p());
+
+ return fiber_scheduler_current_for_threadptr(GET_THREAD());
}
+// This function is allowed to be called without holding the GVL.
VALUE rb_fiber_scheduler_current_for_thread(VALUE thread)
{
- return rb_fiber_scheduler_current_for_threadptr(rb_thread_ptr(thread));
+ return fiber_scheduler_current_for_threadptr(rb_thread_ptr(thread));
+}
+
+VALUE rb_fiber_scheduler_current_for_threadptr(rb_thread_t *thread)
+{
+ return fiber_scheduler_current_for_threadptr(thread);
}
/*
@@ -516,7 +519,7 @@ rb_fiber_scheduler_make_timeout(struct timeval *timeout)
* Document-method: Fiber::Scheduler#kernel_sleep
* call-seq: kernel_sleep(duration = nil)
*
- * Invoked by Kernel#sleep and Mutex#sleep and is expected to provide
+ * Invoked by Kernel#sleep and Thread::Mutex#sleep and is expected to provide
* an implementation of sleeping in a non-blocking way. Implementation might
* register the current fiber in some list of "which fiber wait until what
* moment", call Fiber.yield to pass control, and then in #close resume
@@ -535,6 +538,23 @@ rb_fiber_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv)
return rb_funcallv(scheduler, id_kernel_sleep, argc, argv);
}
+/**
+ * Document-method: Fiber::Scheduler#yield
+ * call-seq: yield
+ *
+ * Yield to the scheduler, to be resumed on the next scheduling cycle.
+ */
+VALUE
+rb_fiber_scheduler_yield(VALUE scheduler)
+{
+ // First try to call the scheduler's yield method, if it exists:
+ VALUE result = rb_check_funcall(scheduler, id_yield, 0, NULL);
+ if (!UNDEF_P(result)) return result;
+
+ // Otherwise, we can emulate yield by sleeping for 0 seconds:
+ return rb_fiber_scheduler_kernel_sleep(scheduler, RB_INT2NUM(0));
+}
+
#if 0
/*
* Document-method: Fiber::Scheduler#timeout_after
@@ -558,7 +578,7 @@ rb_fiber_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv)
* However, as a result of this design, if the +block+ does not invoke any
* non-blocking operations, it will be impossible to interrupt it. If you
* desire to provide predictable points for timeouts, consider adding
- * +sleep(0)+.
+ * <tt>sleep(0)</tt>.
*
* If the block is executed successfully, its result will be returned.
*
@@ -613,7 +633,7 @@ rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
* Document-method: Fiber::Scheduler#block
* call-seq: block(blocker, timeout = nil)
*
- * Invoked by methods like Thread.join, and by Mutex, to signify that current
+ * Invoked by methods like Thread.join, and by Thread::Mutex, to signify that current
* Fiber is blocked until further notice (e.g. #unblock) or until +timeout+ has
* elapsed.
*
@@ -633,8 +653,8 @@ rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
* Document-method: Fiber::Scheduler#unblock
* call-seq: unblock(blocker, fiber)
*
- * Invoked to wake up Fiber previously blocked with #block (for example, Mutex#lock
- * calls #block and Mutex#unlock calls #unblock). The scheduler should use
+ * Invoked to wake up Fiber previously blocked with #block (for example, Thread::Mutex#lock
+ * calls #block and Thread::Mutex#unlock calls #unblock). The scheduler should use
* the +fiber+ parameter to understand which fiber is unblocked.
*
* +blocker+ is what was awaited for, but it is informational only (for debugging
@@ -647,18 +667,36 @@ rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
{
RUBY_ASSERT(rb_obj_is_fiber(fiber));
+ VALUE result;
+ enum ruby_tag_type state;
+
// `rb_fiber_scheduler_unblock` can be called from points where `errno` is expected to be preserved. Therefore, we should save and restore it. For example `io_binwrite` calls `rb_fiber_scheduler_unblock` and if `errno` is reset to 0 by user code, it will break the error handling in `io_write`.
+ //
// If we explicitly preserve `errno` in `io_binwrite` and other similar functions (e.g. by returning it), this code is no longer needed. I hope in the future we will be able to remove it.
int saved_errno = errno;
-#ifdef RUBY_DEBUG
+ // We must prevent interrupts while invoking the unblock method, because otherwise fibers can be left permanently blocked if an interrupt occurs during the execution of user code. See also `rb_fiber_scheduler_fiber_interrupt`.
rb_execution_context_t *ec = GET_EC();
- if (RUBY_VM_INTERRUPTED(ec)) {
- rb_bug("rb_fiber_scheduler_unblock called with pending interrupt");
+ int saved_interrupt_mask = ec->interrupt_mask;
+ ec->interrupt_mask |= PENDING_INTERRUPT_MASK;
+
+ rb_control_frame_t *volatile cfp = ec->cfp;
+ EC_PUSH_TAG(ec);
+ if ((state = EC_EXEC_TAG()) == TAG_NONE) {
+ result = rb_funcall(scheduler, id_unblock, 2, blocker, fiber);
}
-#endif
+ else {
+ rb_vm_rewind_cfp(ec, cfp);
+ }
+ EC_POP_TAG();
- VALUE result = rb_funcall(scheduler, id_unblock, 2, blocker, fiber);
+ ec->interrupt_mask = saved_interrupt_mask;
+
+ if (state) {
+ EC_JUMP_TAG(ec, state);
+ }
+
+ RUBY_VM_CHECK_INTS(ec);
errno = saved_errno;
@@ -914,6 +952,8 @@ fiber_scheduler_io_pwrite(VALUE _argument) {
VALUE
rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
{
+
+
if (!rb_respond_to(scheduler, id_io_pwrite)) {
return RUBY_Qundef;
}
@@ -977,6 +1017,14 @@ rb_fiber_scheduler_io_pwrite_memory(VALUE scheduler, VALUE io, rb_off_t from, co
return result;
}
+/*
+ * Document-method: Fiber::Scheduler#io_close
+ * call-seq: io_close(fd)
+ *
+ * Invoked by Ruby's core methods to notify scheduler that the IO object is closed. Note that
+ * the method will receive an integer file descriptor of the closed object, not an object
+ * itself.
+ */
VALUE
rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io)
{
@@ -1032,7 +1080,8 @@ rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
* call-seq: blocking_operation_wait(blocking_operation)
*
* Invoked by Ruby's core methods to run a blocking operation in a non-blocking way.
- * The blocking_operation is a Fiber::Scheduler::BlockingOperation that encapsulates the blocking operation.
+ * The blocking_operation is an opaque object that encapsulates the blocking operation
+ * and responds to a <tt>#call</tt> method without any arguments.
*
* If the scheduler doesn't implement this method, or if the scheduler doesn't execute
* the blocking operation, Ruby will fall back to the non-scheduler implementation.
@@ -1066,6 +1115,9 @@ VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*functi
operation->data2 = NULL;
operation->unblock_function = NULL;
+ // Ensure that the blocking operation remains visible until this point:
+ RB_GC_GUARD(blocking_operation);
+
// If the blocking operation was never executed, return Qundef to signal the caller to use rb_nogvl instead
if (current_status == RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) {
return Qundef;
@@ -1074,20 +1126,48 @@ VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*functi
return result;
}
+/*
+ * Document-method: Fiber::Scheduler#fiber_interrupt
+ * call-seq: fiber_interrupt(fiber, exception)
+ *
+ * Invoked by Ruby's core methods to notify the scheduler that the blocked fiber should be interrupted
+ * with an exception. For example, IO#close uses this method to interrupt fibers that are performing
+ * blocking IO operations.
+ *
+ */
VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception)
{
VALUE arguments[] = {
fiber, exception
};
-#ifdef RUBY_DEBUG
+ VALUE result;
+ enum ruby_tag_type state;
+
+ // We must prevent interrupts while invoking the fiber_interrupt method, because otherwise fibers can be left permanently blocked if an interrupt occurs during the execution of user code. See also `rb_fiber_scheduler_unblock`.
rb_execution_context_t *ec = GET_EC();
- if (RUBY_VM_INTERRUPTED(ec)) {
- rb_bug("rb_fiber_scheduler_fiber_interrupt called with pending interrupt");
+ int saved_interrupt_mask = ec->interrupt_mask;
+ ec->interrupt_mask |= PENDING_INTERRUPT_MASK;
+
+ rb_control_frame_t *volatile cfp = ec->cfp;
+ EC_PUSH_TAG(ec);
+ if ((state = EC_EXEC_TAG()) == TAG_NONE) {
+ result = rb_check_funcall(scheduler, id_fiber_interrupt, 2, arguments);
}
-#endif
+ else {
+ rb_vm_rewind_cfp(ec, cfp);
+ }
+ EC_POP_TAG();
+
+ ec->interrupt_mask = saved_interrupt_mask;
+
+ if (state) {
+ EC_JUMP_TAG(ec, state);
+ }
+
+ RUBY_VM_CHECK_INTS(ec);
- return rb_check_funcall(scheduler, id_fiber_interrupt, 2, arguments);
+ return result;
}
/*