From 1415653c84602b3ac4ee449858783aacd8af81ed Mon Sep 17 00:00:00 2001 From: zverok Date: Sat, 12 Dec 2020 18:55:17 +0200 Subject: Redocument non-blocking Fibers and scheduler * Document Fiber's method related to scheduling; * Extend Fiber's class docs with concepts of non-blocking fibers; * Introduce "imaginary" (documentation-only) class Fiber::SchedulerInterface to properly document how scheduler's methods should look. --- cont.c | 340 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 339 insertions(+), 1 deletion(-) diff --git a/cont.c b/cont.c index cd3dfba248..2a43c8d2e3 100644 --- a/cont.c +++ b/cont.c @@ -1734,6 +1734,28 @@ rb_cont_call(int argc, VALUE *argv, VALUE contval) * 1000000 * FiberError: dead fiber called * + * == Non-blocking Fibers + * + * Since Ruby 3.0, the concept of non-blocking fiber was introduced. + * Non-blocking fiber, when reaching any potentially blocking operation (like + * sleep, wait for another process, wait for I/O data to be ready), instead + * of just freezing itself and all execution in the thread, yields control + * to other fibers, and allows the scheduler to handle waiting and waking + * (resuming) the fiber when it can proceed. + * + * For Fiber to behave as non-blocking, it should be created in Fiber.new with + * blocking: false (which is the default now), and Fiber.scheduler + * should be set with Fiber.set_scheduler. If Fiber.scheduler is not set in + * the current thread, blocking and non-blocking fiber's behavior is identical. + * + * Ruby doesn't provide a scheduler class: it is expected to be implemented by + * the user and correspond to Fiber::SchedulerInterface. + * + * There is also Fiber.schedule method, which is expected to immediately perform + * passed block in a non-blocking manner (but its actual implementation is up to + * the scheduler). + * + * */ static const rb_data_type_t fiber_data_type = { @@ -1842,7 +1864,29 @@ rb_fiber_initialize_kw(int argc, VALUE* argv, VALUE self, int kw_splat) return fiber_initialize(self, rb_block_proc(), rb_fiber_pool_default(pool), RTEST(blocking)); } -/* :nodoc: */ +/* + * call-seq: + * Fiber.new(blocking: false) { |*args| ... } -> fiber + * + * Creates new Fiber. Initially, fiber is not running, but can be resumed with + * #resume. Arguments to the first #resume call would be passed to the block: + * + * f = Fiber.new do |initial| + * current = initial + * loop do + * puts "current: #{current.inspect}" + * current = Fiber.yield + * end + * end + * f.resume(100) # prints: current: 100 + * f.resume(1, 2, 3) # prints: current: [1, 2, 3] + * f.resume # prints: current: nil + * # ... and so on ... + * + * if blocking: false is passed to the Fiber.new, _and_ current thread + * has Fiber.scheduler defined, the Fiber becames non-blocking (see "Non-blocking + * fibers" section in class docs). + */ static VALUE rb_fiber_initialize(int argc, VALUE* argv, VALUE self) { @@ -1871,18 +1915,84 @@ rb_f_fiber_kw(int argc, VALUE* argv, int kw_splat) return fiber; } +/* + * call-seq: + * Fiber.schedule { |*args| ... } -> fiber + * + * The method is expected to immediately run the provided block of code in a + * separate non-blocking fiber. + * + * puts "Go to sleep!" + * + * Fiber.set_scheduler(MyScheduler.new) + * + * Fiber.schedule do + * puts "Going to sleep" + * sleep(1) + * puts "I slept well" + * end + * + * puts "Wakey-wakey, sleepyhead" + * + * Assuming MyScheduler is properly implemented, this program will produce: + * + * Go to sleep! + * Going to sleep + * Wakey-wakey, sleepyhead + * ...1 sec pause here... + * I slept well + * + * ...e.g. on the first blocking operation inside the Fiber (sleep(1)), + * the control is yielded at the outside code (main fiber), and at the end + * of the execution, the scheduler takes care of properly resuming all the + * blocked fibers. + * + * Note that the behavior described above is how the method is expected + * to behave, actual behavior is up to the current scheduler's implementation of + * Fiber::SchedulerInterface#fiber method. Ruby doesn't enforce this method to + * behave in any particular way. + * + * If the scheduler is not set, the method raises + * RuntimeError (No scheduler is available!). + * + */ static VALUE rb_f_fiber(int argc, VALUE *argv, VALUE obj) { return rb_f_fiber_kw(argc, argv, rb_keyword_given_p()); } +/* + * call-seq: + * Fiber.scheduler -> obj or nil + * + * Fiber scheduler, set in the current thread with Fiber.set_scheduler. If the scheduler + * is +nil+ (which is the default), non-blocking fibers behavior is the same as blocking. + * (see "Non-blocking fibers" section in class docs for details about the scheduler concept). + * + */ static VALUE rb_fiber_scheduler(VALUE klass) { return rb_scheduler_get(); } +/* + * call-seq: + * Fiber.set_scheduler(scheduler) -> scheduler + * + * Sets Fiber scheduler for the current thread. If the scheduler is set, non-blocking + * fibers (created by Fiber.new with blocking: false, or by Fiber.schedule) + * call that scheduler's hook methods on potentially blocking operations, and the current + * thread will call scheduler's +close+ method on finalization (allowing the scheduler to + * properly manage all non-finished fibers). + * + * +scheduler+ can be an object of any class corresponding to Fiber::SchedulerInterface. Its + * implementation is up to the user. + * + * See also the "Non-blocking fibers" section in class docs. + * + */ static VALUE rb_fiber_set_scheduler(VALUE klass, VALUE scheduler) { @@ -2196,12 +2306,44 @@ rb_fiber_transfer(VALUE fiber_value, int argc, const VALUE *argv) return fiber_switch(fiber_ptr(fiber_value), argc, argv, RB_NO_KEYWORDS, Qfalse, false); } +/* + * call-seq: + * fiber.blocking? -> true or false + * + * Returns +true+ if +fiber+ is blocking and +false+ otherwise. + * Fiber is non-blocking if it was created via passing blocking: false + * to Fiber.new, or via Fiber.schedule. + * + * Note, that even if the method returns +false+, Fiber behaves differently + * only if Fiber.scheduler is set in the current thread. + * + * See the "Non-blocking fibers" section in class docs for details. + * + */ VALUE rb_fiber_blocking_p(VALUE fiber) { return (fiber_ptr(fiber)->blocking == 0) ? Qfalse : Qtrue; } +/* + * call-seq: + * Fiber.blocking? -> false or number + * + * Returns +false+ if the current fiber is non-blocking. + * Fiber is non-blocking if it was created via passing blocking: false + * to Fiber.new, or via Fiber.schedule. + * + * If the current Fiber is blocking, the method, unlike usual + * predicate methods, returns a *number* of blocking fibers currently + * running (TBD: always 1?). + * + * Note, that even if the method returns +false+, Fiber behaves differently + * only if Fiber.scheduler is set in the current thread. + * + * See the "Non-blocking fibers" section in class docs for details. + * + */ static VALUE rb_f_fiber_blocking_p(VALUE klass) { @@ -2707,6 +2849,191 @@ rb_fiber_pool_initialize(int argc, VALUE* argv, VALUE self) * fiber.resume #=> FiberError: dead fiber called */ +/* + * Document-class: Fiber::SchedulerInterface + * + * This is not an existing class, but documentation of the interface that Scheduler + * object should comply in order to be used as Fiber.scheduler and handle non-blocking + * fibers. See also the "Non-blocking fibers" section in Fiber class docs for explanations + * of some concepts. + * + * Scheduler's behavior and usage are expected to be as follows: + * + * * When the execution in the non-blocking Fiber reaches some blocking operation (like + * sleep, wait for a process, or a non-ready I/O), it calls some of the scheduler's + * hook methods, listed below. + * * Scheduler somehow registers what the current fiber is waited for, and yields control + * to other fibers with Fiber.yield (so the fiber would be suspended while expecting its + * wait to end, and other fibers in the same thread can perform) + * * At the end of the current thread execution, the scheduler's method #close is called + * * The scheduler runs into a wait loop, checking all the blocked fibers (which it has + * registered on hook calls) and resuming them when the awaited resource is ready (I/O + * ready, sleep time passed). + * + * A typical implementation would probably rely for this closing loop on a gem like + * EventMachine[https://github.com/eventmachine/eventmachine] or + * Async[https://github.com/socketry/async]. + * + * This way concurrent execution will be achieved in a way that is transparent for every + * individual Fiber's code. + * + * Hook methods are: + * + * * #io_wait + * * #process_wait + * * #kernel_sleep + * * #block and #unblock + * * (the list is expanded as Ruby developers make more methods having non-blocking calls) + * + * When not specified otherwise, the hook implementations are mandatory: if they are not + * implemented, the methods trying to call hook will fail. To provide backward compatibility, + * in the future hooks will be optional (if they are not implemented, due to the scheduler + * being created for the older Ruby version, the code which needs this hook will not fail, + * and will just behave in a blocking fashion). + * + * It is also strongly suggested that the scheduler implement the #fiber method, which is + * delegated to by Fiber.schedule. + * + * Sample _toy_ implementation of the scheduler can be found in Ruby's code, in + * test/fiber/scheduler.rb + * + */ + +#if 0 /* for RDoc */ +/* + * + * Document-method: Fiber::SchedulerInterface#close + * + * Called when the current thread exits. The scheduler is expected to implement this + * method in order to allow all waiting fibers to finalize their execution. + * + * The suggested pattern is to implement the main event loop in the #close method. + * + */ +static VALUE +rb_fiber_scheduler_interface_close(VALUE self) +{ +} + +/* + * Document-method: SchedulerInterface#process_wait + * call-seq: process_wait(pid, flags) + * + * Invoked by Process::Status.wait in order to wait for a specified process. + * See that method description for arguments description. + * + * Suggested minimal implementation: + * + * Thread.new do + * Process::Status.wait(pid, flags) + * end.value + * + * This hook is optional: if it is not present in the current scheduler, + * Process::Status.wait will behave as a blocking method. + * + * Expected to returns a Process::Status instance. + */ +static VALUE +rb_fiber_scheduler_interface_process_wait(VALUE self) +{ +} + +/* + * Document-method: SchedulerInterface#io_wait + * call-seq: io_wait(io, events, timeout) + * + * Invoked by IO#wait, IO#wait_readable, IO#wait_writable to ask whether the + * specified descriptor is ready for specified events within + * the specified +timeout+. + * + * +events+ is a bit mask of IO::READABLE, IO::WRITABLE, and + * IO::PRIORITY. + * + * Suggested implementation should register which Fiber is waiting for which + * resources and immediately calling Fiber.yield to pass control to other + * fibers. Then, in the #close method, the scheduler might dispatch all the + * I/O resources to fibers waiting for it. + * + * Expected to return the subset of events that are ready immediately. + * + */ +static VALUE +rb_fiber_scheduler_interface_io_wait(VALUE self) +{ +} + +/* + * Document-method: SchedulerInterface#kernel_sleep + * call-seq: kernel_sleep(duration = nil) + * + * Invoked by Kernel#sleep and Mutex#sleep and is expected to provide + * an implementation of sleeping in a non-blocking way. Implementation might + * register the current fiber in some list of "what fiber waits till what + * moment", call Fiber.yield to pass control, and then in #close resume + * the fibers whose wait period have ended. + * + */ +static VALUE +rb_fiber_scheduler_interface_kernel_sleep(VALUE self) +{ +} + +/* + * Document-method: SchedulerInterface#block + * call-seq: block(blocker, timeout = nil) + * + * Invoked by methods like Thread.join, and by Mutex, to signify that current + * Fiber is blocked till further notice (e.g. #unblock) or till +timeout+ will + * pass. + * + * +blocker+ is what we are waiting on, informational only (for debugging and + * logging). There are no guarantees about its value. + * + * Expected to return boolean, specifying whether the blocking operation was + * successful or not. + */ +static VALUE +rb_fiber_scheduler_interface_block(VALUE self) +{ +} + +/* + * Document-method: SchedulerInterface#unblock + * call-seq: unblock(blocker, fiber) + * + * Invoked to wake up Fiber previously blocked with #block (for example, Mutex#lock + * calls #block and Mutex#unlock calls #unblock). The scheduler should use + * the +fiber+ parameter to understand which fiber is unblocked. + * + * +blocker+ is what was awaited for, but it is informational only (for debugging + * and logging), and it is not guaranteed to be the same value as the +blocker+ for + * #block. + * + */ +static VALUE +rb_fiber_scheduler_interface_unblock(VALUE self) +{ +} + +/* + * Document-method: SchedulerInterface#fiber + * call-seq: fiber(&block) + * + * Implementation of the Fiber.schedule. The method is expected to immediately + * run passed block of code in a separate non-blocking fiber, and to return that Fiber. + * + * Minimal suggested implementation is: + * + * def fiber(&block) + * Fiber.new(blocking: false, &block).tap(&:resume) + * end + */ +static VALUE +rb_fiber_scheduler_interface_fiber(VALUE self) +{ +} +#endif + void Init_Cont(void) { @@ -2754,6 +3081,17 @@ Init_Cont(void) rb_define_singleton_method(rb_cFiber, "schedule", rb_f_fiber, -1); //rb_define_global_function("Fiber", rb_f_fiber, -1); +#if 0 /* for RDoc */ + rb_cFiberScheduler = rb_define_class_under(rb_cFiber, "SchedulerInterface", rb_cObject); + rb_define_method(rb_cFiberScheduler, "close", rb_fiber_scheduler_interface_close, 0); + rb_define_method(rb_cFiberScheduler, "process_wait", rb_fiber_scheduler_interface_process_wait, 0); + rb_define_method(rb_cFiberScheduler, "io_wait", rb_fiber_scheduler_interface_io_wait, 0); + rb_define_method(rb_cFiberScheduler, "kernel_sleep", rb_fiber_scheduler_interface_kernel_sleep, 0); + rb_define_method(rb_cFiberScheduler, "block", rb_fiber_scheduler_interface_block, 0); + rb_define_method(rb_cFiberScheduler, "unblock", rb_fiber_scheduler_interface_unblock, 0); + rb_define_method(rb_cFiberScheduler, "fiber", rb_fiber_scheduler_interface_fiber, 0); +#endif + #ifdef RB_EXPERIMENTAL_FIBER_POOL rb_cFiberPool = rb_define_class("Pool", rb_cFiber); rb_define_alloc_func(rb_cFiberPool, fiber_pool_alloc); -- cgit v1.2.3