diff options
Diffstat (limited to 'cont.c')
-rw-r--r-- | cont.c | 1188 |
1 files changed, 710 insertions, 478 deletions
@@ -26,14 +26,19 @@ extern int madvise(caddr_t, size_t, int); #include COROUTINE_H #include "eval_intern.h" -#include "gc.h" #include "internal.h" #include "internal/cont.h" +#include "internal/thread.h" +#include "internal/error.h" +#include "internal/gc.h" #include "internal/proc.h" +#include "internal/sanitizers.h" #include "internal/warnings.h" #include "ruby/fiber/scheduler.h" -#include "mjit.h" +#include "rjit.h" +#include "yjit.h" #include "vm_core.h" +#include "vm_sync.h" #include "id_table.h" #include "ractor_core.h" @@ -172,7 +177,7 @@ struct fiber_pool { // A singly-linked list of allocations which contain 1 or more stacks each. struct fiber_pool_allocation * allocations; - // Provides O(1) stack "allocation": + // Free list that provides O(1) stack "allocation". struct fiber_pool_vacancy * vacancies; // The size of the stack allocations (excluding any guard page). @@ -184,16 +189,27 @@ struct fiber_pool { // The initial number of stacks to allocate. size_t initial_count; - // Whether to madvise(free) the stack or not: + // Whether to madvise(free) the stack or not. + // If this value is set to 1, the stack will be madvise(free)ed + // (or equivalent), where possible, when it is returned to the pool. int free_stacks; // The number of stacks that have been used in this pool. size_t used; - // The amount to allocate for the vm_stack: + // The amount to allocate for the vm_stack. size_t vm_stack_size; }; +// Continuation contexts used by JITs +struct rb_jit_cont { + rb_execution_context_t *ec; // continuation ec + struct rb_jit_cont *prev, *next; // used to form lists +}; + +// Doubly linked list for enumerating all on-stack ISEQs. +static struct rb_jit_cont *first_jit_cont; + typedef struct rb_context_struct { enum context_type type; int argc; @@ -211,22 +227,21 @@ typedef struct rb_context_struct { rb_execution_context_t saved_ec; rb_jmpbuf_t jmpbuf; rb_ensure_entry_t *ensure_array; - /* Pointer to MJIT info about the continuation. */ - struct mjit_cont *mjit_cont; + struct rb_jit_cont *jit_cont; // Continuation contexts for JITs } rb_context_t; - /* * Fiber status: - * [Fiber.new] ------> FIBER_CREATED - * | [Fiber#resume] - * v - * +--> FIBER_RESUMED ----+ - * [Fiber#resume] | | [Fiber.yield] | - * | v | - * +-- FIBER_SUSPENDED | [Terminate] - * | - * FIBER_TERMINATED <-+ + * [Fiber.new] ------> FIBER_CREATED ----> [Fiber#kill] --> | + * | [Fiber#resume] | + * v | + * +--> FIBER_RESUMED ----> [return] ------> | + * [Fiber#resume] | | [Fiber.yield/transfer] | + * [Fiber#transfer] | v | + * +--- FIBER_SUSPENDED --> [Fiber#kill] --> | + * | + * | + * FIBER_TERMINATED <-------------------+ */ enum fiber_status { FIBER_CREATED, @@ -252,13 +267,21 @@ struct rb_fiber_struct { unsigned int yielding : 1; unsigned int blocking : 1; + unsigned int killed : 1; + struct coroutine_context context; struct fiber_pool_stack stack; }; static struct fiber_pool shared_fiber_pool = {NULL, NULL, 0, 0, 0, 0}; -static ID fiber_initialize_keywords[2] = {0}; +void +rb_free_shared_fiber_pool(void) +{ + xfree(shared_fiber_pool.allocations); +} + +static ID fiber_initialize_keywords[3] = {0}; /* * FreeBSD require a first (i.e. addr) argument of mmap(2) is not NULL @@ -274,7 +297,6 @@ static ID fiber_initialize_keywords[2] = {0}; #define ERRNOMSG strerror(errno) // Locates the stack vacancy details for the given stack. -// Requires that fiber_pool_vacancy fits within one page. inline static struct fiber_pool_vacancy * fiber_pool_vacancy_pointer(void * base, size_t size) { @@ -285,6 +307,24 @@ fiber_pool_vacancy_pointer(void * base, size_t size) ); } +#if defined(COROUTINE_SANITIZE_ADDRESS) +// Compute the base pointer for a vacant stack, for the area which can be poisoned. +inline static void * +fiber_pool_stack_poison_base(struct fiber_pool_stack * stack) +{ + STACK_GROW_DIR_DETECTION; + + return (char*)stack->base + STACK_DIR_UPPER(RB_PAGE_SIZE, 0); +} + +// Compute the size of the vacant stack, for the area that can be poisoned. +inline static size_t +fiber_pool_stack_poison_size(struct fiber_pool_stack * stack) +{ + return stack->size - RB_PAGE_SIZE; +} +#endif + // Reset the current stack pointer and available size of the given stack. inline static void fiber_pool_stack_reset(struct fiber_pool_stack * stack) @@ -634,6 +674,10 @@ fiber_pool_stack_acquire(struct fiber_pool * fiber_pool) VM_ASSERT(vacancy); VM_ASSERT(vacancy->stack.base); +#if defined(COROUTINE_SANITIZE_ADDRESS) + __asan_unpoison_memory_region(fiber_pool_stack_poison_base(&vacancy->stack), fiber_pool_stack_poison_size(&vacancy->stack)); +#endif + // Take the top item from the free list: fiber_pool->used += 1; @@ -657,28 +701,54 @@ fiber_pool_stack_free(struct fiber_pool_stack * stack) // If this is not true, the vacancy information will almost certainly be destroyed: VM_ASSERT(size <= (stack->size - RB_PAGE_SIZE)); - if (DEBUG) fprintf(stderr, "fiber_pool_stack_free: %p+%"PRIuSIZE" [base=%p, size=%"PRIuSIZE"]\n", base, size, stack->base, stack->size); + int advice = stack->pool->free_stacks >> 1; + + if (DEBUG) fprintf(stderr, "fiber_pool_stack_free: %p+%"PRIuSIZE" [base=%p, size=%"PRIuSIZE"] advice=%d\n", base, size, stack->base, stack->size, advice); -#if VM_CHECK_MODE > 0 && defined(MADV_DONTNEED) + // The pages being used by the stack can be returned back to the system. + // That doesn't change the page mapping, but it does allow the system to + // reclaim the physical memory. + // Since we no longer care about the data itself, we don't need to page + // out to disk, since that is costly. Not all systems support that, so + // we try our best to select the most efficient implementation. + // In addition, it's actually slightly desirable to not do anything here, + // but that results in higher memory usage. + +#ifdef __wasi__ + // WebAssembly doesn't support madvise, so we just don't do anything. +#elif VM_CHECK_MODE > 0 && defined(MADV_DONTNEED) + if (!advice) advice = MADV_DONTNEED; // This immediately discards the pages and the memory is reset to zero. - madvise(base, size, MADV_DONTNEED); -#elif defined(POSIX_MADV_DONTNEED) - posix_madvise(base, size, POSIX_MADV_DONTNEED); + madvise(base, size, advice); #elif defined(MADV_FREE_REUSABLE) + if (!advice) advice = MADV_FREE_REUSABLE; + // Darwin / macOS / iOS. // Acknowledge the kernel down to the task info api we make this // page reusable for future use. - // As for MADV_FREE_REUSE below we ensure in the rare occasions the task was not + // As for MADV_FREE_REUSABLE below we ensure in the rare occasions the task was not // completed at the time of the call to re-iterate. - while (madvise(base, size, MADV_FREE_REUSABLE) == -1 && errno == EAGAIN); + while (madvise(base, size, advice) == -1 && errno == EAGAIN); #elif defined(MADV_FREE) - madvise(base, size, MADV_FREE); + if (!advice) advice = MADV_FREE; + // Recent Linux. + madvise(base, size, advice); #elif defined(MADV_DONTNEED) - madvise(base, size, MADV_DONTNEED); + if (!advice) advice = MADV_DONTNEED; + // Old Linux. + madvise(base, size, advice); +#elif defined(POSIX_MADV_DONTNEED) + if (!advice) advice = POSIX_MADV_DONTNEED; + // Solaris? + posix_madvise(base, size, advice); #elif defined(_WIN32) VirtualAlloc(base, size, MEM_RESET, PAGE_READWRITE); // Not available in all versions of Windows. //DiscardVirtualMemory(base, size); #endif + +#if defined(COROUTINE_SANITIZE_ADDRESS) + __asan_poison_memory_region(fiber_pool_stack_poison_base(stack), fiber_pool_stack_poison_size(stack)); +#endif } // Release and return a stack to the vacancy list. @@ -698,7 +768,7 @@ fiber_pool_stack_release(struct fiber_pool_stack * stack) fiber_pool_vacancy_reset(vacancy); // Push the vacancy into the vancancies list: - pool->vacancies = fiber_pool_vacancy_push(vacancy, stack->pool->vacancies); + pool->vacancies = fiber_pool_vacancy_push(vacancy, pool->vacancies); pool->used -= 1; #ifdef FIBER_POOL_ALLOCATION_FREE @@ -714,7 +784,8 @@ fiber_pool_stack_release(struct fiber_pool_stack * stack) fiber_pool_stack_free(&vacancy->stack); } #else - // This is entirely optional, but clears the dirty flag from the stack memory, so it won't get swapped to disk when there is memory pressure: + // This is entirely optional, but clears the dirty flag from the stack + // memory, so it won't get swapped to disk when there is memory pressure: if (stack->pool->free_stacks) { fiber_pool_stack_free(&vacancy->stack); } @@ -725,6 +796,9 @@ static inline void ec_switch(rb_thread_t *th, rb_fiber_t *fiber) { rb_execution_context_t *ec = &fiber->cont.saved_ec; +#ifdef RUBY_ASAN_ENABLED + ec->machine.asan_fake_stack_handle = asan_get_thread_fake_stack_handle(); +#endif rb_ractor_set_current_ec(th->ractor, th->ec = ec); // ruby_current_execution_context_ptr = th->ec = ec; @@ -751,6 +825,20 @@ static COROUTINE fiber_entry(struct coroutine_context * from, struct coroutine_context * to) { rb_fiber_t *fiber = to->argument; + +#if defined(COROUTINE_SANITIZE_ADDRESS) + // Address sanitizer will copy the previous stack base and stack size into + // the "from" fiber. `coroutine_initialize_main` doesn't generally know the + // stack bounds (base + size). Therefore, the main fiber `stack_base` and + // `stack_size` will be NULL/0. It's specifically important in that case to + // get the (base+size) of the previous fiber and save it, so that later when + // we return to the main coroutine, we don't supply (NULL, 0) to + // __sanitizer_start_switch_fiber which royally messes up the internal state + // of ASAN and causes (sometimes) the following message: + // "WARNING: ASan is ignoring requested __asan_handle_no_return" + __sanitizer_finish_switch_fiber(to->fake_stack, (const void**)&from->stack_base, &from->stack_size); +#endif + rb_thread_t *thread = fiber->cont.saved_ec.thread_ptr; #ifdef COROUTINE_PTHREAD_CONTEXT @@ -791,7 +879,8 @@ fiber_initialize_coroutine(rb_fiber_t *fiber, size_t * vm_stack_size) return vm_stack; } -// Release the stack from the fiber, it's execution context, and return it to the fiber pool. +// Release the stack from the fiber, it's execution context, and return it to +// the fiber pool. static void fiber_stack_release(rb_fiber_t * fiber) { @@ -937,13 +1026,8 @@ cont_mark(void *ptr) cont->machine.stack + cont->machine.stack_size); } else { - /* fiber */ - const rb_fiber_t *fiber = (rb_fiber_t*)cont; - - if (!FIBER_TERMINATED_P(fiber)) { - rb_gc_mark_locations(cont->machine.stack, - cont->machine.stack + cont->machine.stack_size); - } + /* fiber machine context is marked as part of rb_execution_context_mark, no need to + * do anything here. */ } } @@ -958,6 +1042,8 @@ fiber_is_root_p(const rb_fiber_t *fiber) } #endif +static void jit_cont_free(struct rb_jit_cont *cont); + static void cont_free(void *ptr) { @@ -978,10 +1064,8 @@ cont_free(void *ptr) RUBY_FREE_UNLESS_NULL(cont->saved_vm_stack.ptr); - if (mjit_enabled) { - VM_ASSERT(cont->mjit_cont != NULL); - mjit_cont_free(cont->mjit_cont); - } + VM_ASSERT(cont->jit_cont != NULL); + jit_cont_free(cont->jit_cont); /* free rb_cont_t or rb_fiber_t */ ruby_xfree(ptr); RUBY_FREE_LEAVE("cont"); @@ -1085,7 +1169,9 @@ fiber_memsize(const void *ptr) */ if (saved_ec->local_storage && fiber != th->root_fiber) { size += rb_id_table_memsize(saved_ec->local_storage); + size += rb_obj_memsize_of(saved_ec->storage); } + size += cont_memsize(&fiber->cont); return size; } @@ -1120,6 +1206,7 @@ cont_save_machine_stack(rb_thread_t *th, rb_context_t *cont) } FLUSH_REGISTER_WINDOWS; + asan_unpoison_memory_region(cont->machine.stack_src, size, false); MEMCPY(cont->machine.stack, cont->machine.stack_src, VALUE, size); } @@ -1144,13 +1231,125 @@ cont_save_thread(rb_context_t *cont, rb_thread_t *th) sec->machine.stack_end = NULL; } +static rb_nativethread_lock_t jit_cont_lock; + +// Register a new continuation with execution context `ec`. Return JIT info about +// the continuation. +static struct rb_jit_cont * +jit_cont_new(rb_execution_context_t *ec) +{ + struct rb_jit_cont *cont; + + // We need to use calloc instead of something like ZALLOC to avoid triggering GC here. + // When this function is called from rb_thread_alloc through rb_threadptr_root_fiber_setup, + // the thread is still being prepared and marking it causes SEGV. + cont = calloc(1, sizeof(struct rb_jit_cont)); + if (cont == NULL) + rb_memerror(); + cont->ec = ec; + + rb_native_mutex_lock(&jit_cont_lock); + if (first_jit_cont == NULL) { + cont->next = cont->prev = NULL; + } + else { + cont->prev = NULL; + cont->next = first_jit_cont; + first_jit_cont->prev = cont; + } + first_jit_cont = cont; + rb_native_mutex_unlock(&jit_cont_lock); + + return cont; +} + +// Unregister continuation `cont`. static void -cont_init_mjit_cont(rb_context_t *cont) +jit_cont_free(struct rb_jit_cont *cont) { - VM_ASSERT(cont->mjit_cont == NULL); - if (mjit_enabled) { - cont->mjit_cont = mjit_cont_new(&(cont->saved_ec)); + if (!cont) return; + + rb_native_mutex_lock(&jit_cont_lock); + if (cont == first_jit_cont) { + first_jit_cont = cont->next; + if (first_jit_cont != NULL) + first_jit_cont->prev = NULL; + } + else { + cont->prev->next = cont->next; + if (cont->next != NULL) + cont->next->prev = cont->prev; } + rb_native_mutex_unlock(&jit_cont_lock); + + free(cont); +} + +// Call a given callback against all on-stack ISEQs. +void +rb_jit_cont_each_iseq(rb_iseq_callback callback, void *data) +{ + struct rb_jit_cont *cont; + for (cont = first_jit_cont; cont != NULL; cont = cont->next) { + if (cont->ec->vm_stack == NULL) + continue; + + const rb_control_frame_t *cfp = cont->ec->cfp; + while (!RUBY_VM_CONTROL_FRAME_STACK_OVERFLOW_P(cont->ec, cfp)) { + if (cfp->pc && cfp->iseq && imemo_type((VALUE)cfp->iseq) == imemo_iseq) { + callback(cfp->iseq, data); + } + cfp = RUBY_VM_PREVIOUS_CONTROL_FRAME(cfp); + } + } +} + +#if USE_YJIT +// Update the jit_return of all CFPs to leave_exit unless it's leave_exception or not set. +// This prevents jit_exec_exception from jumping to the caller after invalidation. +void +rb_yjit_cancel_jit_return(void *leave_exit, void *leave_exception) +{ + struct rb_jit_cont *cont; + for (cont = first_jit_cont; cont != NULL; cont = cont->next) { + if (cont->ec->vm_stack == NULL) + continue; + + const rb_control_frame_t *cfp = cont->ec->cfp; + while (!RUBY_VM_CONTROL_FRAME_STACK_OVERFLOW_P(cont->ec, cfp)) { + if (cfp->jit_return && cfp->jit_return != leave_exception) { + ((rb_control_frame_t *)cfp)->jit_return = leave_exit; + } + cfp = RUBY_VM_PREVIOUS_CONTROL_FRAME(cfp); + } + } +} +#endif + +// Finish working with jit_cont. +void +rb_jit_cont_finish(void) +{ + struct rb_jit_cont *cont, *next; + for (cont = first_jit_cont; cont != NULL; cont = next) { + next = cont->next; + free(cont); // Don't use xfree because it's allocated by calloc. + } + rb_native_mutex_destroy(&jit_cont_lock); +} + +static void +cont_init_jit_cont(rb_context_t *cont) +{ + VM_ASSERT(cont->jit_cont == NULL); + // We always allocate this since YJIT may be enabled later + cont->jit_cont = jit_cont_new(&(cont->saved_ec)); +} + +struct rb_execution_context_struct * +rb_fiberptr_get_ec(struct rb_fiber_struct *fiber) +{ + return &fiber->cont.saved_ec; } static void @@ -1162,7 +1361,7 @@ cont_init(rb_context_t *cont, rb_thread_t *th) cont->saved_ec.local_storage = NULL; cont->saved_ec.local_storage_recursive_hash = Qnil; cont->saved_ec.local_storage_recursive_hash_for_trace = Qnil; - cont_init_mjit_cont(cont); + cont_init_jit_cont(cont); } static rb_context_t * @@ -1191,11 +1390,11 @@ rb_fiberptr_blocking(struct rb_fiber_struct *fiber) return fiber->blocking; } -// This is used for root_fiber because other fibers call cont_init_mjit_cont through cont_new. +// Initialize the jit_cont_lock void -rb_fiber_init_mjit_cont(struct rb_fiber_struct *fiber) +rb_jit_cont_init(void) { - cont_init_mjit_cont(&fiber->cont); + rb_native_mutex_initialize(&jit_cont_lock); } #if 0 @@ -1225,10 +1424,7 @@ show_vm_pcs(const rb_control_frame_t *cfp, } } #endif -COMPILER_WARNING_PUSH -#ifdef __clang__ -COMPILER_WARNING_IGNORED(-Wduplicate-decl-specifier) -#endif + static VALUE cont_capture(volatile int *volatile stat) { @@ -1272,7 +1468,7 @@ cont_capture(volatile int *volatile stat) entry = cont->ensure_array = ALLOC_N(rb_ensure_entry_t,size+1); for (p=th->ec->ensure_list; p; p=p->next) { if (!p->entry.marker) - p->entry.marker = rb_ary_tmp_new(0); /* dummy object */ + p->entry.marker = rb_ary_hidden_new(0); /* dummy object */ *entry++ = p->entry; } entry->marker = 0; @@ -1293,7 +1489,6 @@ cont_capture(volatile int *volatile stat) return contval; } } -COMPILER_WARNING_POP static inline void cont_restore_thread(rb_context_t *cont) @@ -1371,17 +1566,24 @@ fiber_setcontext(rb_fiber_t *new_fiber, rb_fiber_t *old_fiber) } } - /* exchange machine_stack_start between old_fiber and new_fiber */ + /* these values are used in rb_gc_mark_machine_context to mark the fiber's stack. */ old_fiber->cont.saved_ec.machine.stack_start = th->ec->machine.stack_start; + old_fiber->cont.saved_ec.machine.stack_end = FIBER_TERMINATED_P(old_fiber) ? NULL : th->ec->machine.stack_end; - /* old_fiber->machine.stack_end should be NULL */ - old_fiber->cont.saved_ec.machine.stack_end = NULL; // if (DEBUG) fprintf(stderr, "fiber_setcontext: %p[%p] -> %p[%p]\n", (void*)old_fiber, old_fiber->stack.base, (void*)new_fiber, new_fiber->stack.base); +#if defined(COROUTINE_SANITIZE_ADDRESS) + __sanitizer_start_switch_fiber(FIBER_TERMINATED_P(old_fiber) ? NULL : &old_fiber->context.fake_stack, new_fiber->context.stack_base, new_fiber->context.stack_size); +#endif + /* swap machine context */ struct coroutine_context * from = coroutine_transfer(&old_fiber->context, &new_fiber->context); +#if defined(COROUTINE_SANITIZE_ADDRESS) + __sanitizer_finish_switch_fiber(old_fiber->context.fake_stack, NULL, NULL); +#endif + if (from == NULL) { rb_syserr_fail(errno, "coroutine_transfer"); } @@ -1401,7 +1603,7 @@ cont_restore_1(rb_context_t *cont) cont_restore_thread(cont); /* restore machine stack */ -#ifdef _M_AMD64 +#if defined(_M_AMD64) && !defined(__MINGW64__) { /* workaround for x64 SEH */ jmp_buf buf; @@ -1441,6 +1643,10 @@ cont_restore_0(rb_context_t *cont, VALUE *addr_in_prev_frame) if (&space[0] > end) { # ifdef HAVE_ALLOCA volatile VALUE *sp = ALLOCA_N(VALUE, &space[0] - end); + // We need to make sure that the stack pointer is moved, + // but some compilers may remove the allocation by optimization. + // We hope that the following read/write will prevent such an optimization. + *sp = Qfalse; space[0] = *sp; # else cont_restore_0(cont, &space[0]); @@ -1566,6 +1772,13 @@ rb_callcc(VALUE self) return rb_yield(val); } } +#ifdef RUBY_ASAN_ENABLED +/* callcc can't possibly work with ASAN; see bug #20273. Also this function + * definition below avoids a "defined and not used" warning. */ +MAYBE_UNUSED(static void notusing_callcc(void)) { rb_callcc(Qnil); } +# define rb_callcc rb_f_notimplement +#endif + static VALUE make_passing_arg(int argc, const VALUE *argv) @@ -1652,7 +1865,7 @@ rollback_ensure_stack(VALUE self,rb_ensure_list_t *current,rb_ensure_entry_t *ta /* push ensure stack */ for (j = 0; j < i; j++) { func = lookup_rollback_func(target[i - j - 1].e_proc); - if ((VALUE)func != Qundef) { + if (!UNDEF_P((VALUE)func)) { (*func)(target[i - j - 1].data2); } } @@ -1777,7 +1990,7 @@ rb_cont_call(int argc, VALUE *argv, VALUE contval) * the current thread, blocking and non-blocking fibers' behavior is identical. * * Ruby doesn't provide a scheduler class: it is expected to be implemented by - * the user and correspond to Fiber::SchedulerInterface. + * the user and correspond to Fiber::Scheduler. * * There is also Fiber.schedule method, which is expected to immediately perform * the given block in a non-blocking manner. Its actual implementation is up to @@ -1812,6 +2025,7 @@ fiber_t_alloc(VALUE fiber_value, unsigned int blocking) fiber->cont.self = fiber_value; fiber->cont.type = FIBER_CONTEXT; fiber->blocking = blocking; + fiber->killed = 0; cont_init(&fiber->cont, th); fiber->cont.saved_ec.fiber_ptr = fiber; @@ -1828,11 +2042,220 @@ fiber_t_alloc(VALUE fiber_value, unsigned int blocking) return fiber; } +static rb_fiber_t * +root_fiber_alloc(rb_thread_t *th) +{ + VALUE fiber_value = fiber_alloc(rb_cFiber); + rb_fiber_t *fiber = th->ec->fiber_ptr; + + VM_ASSERT(DATA_PTR(fiber_value) == NULL); + VM_ASSERT(fiber->cont.type == FIBER_CONTEXT); + VM_ASSERT(FIBER_RESUMED_P(fiber)); + + th->root_fiber = fiber; + DATA_PTR(fiber_value) = fiber; + fiber->cont.self = fiber_value; + + coroutine_initialize_main(&fiber->context); + + return fiber; +} + +static inline rb_fiber_t* +fiber_current(void) +{ + rb_execution_context_t *ec = GET_EC(); + if (ec->fiber_ptr->cont.self == 0) { + root_fiber_alloc(rb_ec_thread_ptr(ec)); + } + return ec->fiber_ptr; +} + +static inline VALUE +current_fiber_storage(void) +{ + rb_execution_context_t *ec = GET_EC(); + return ec->storage; +} + +static inline VALUE +inherit_fiber_storage(void) +{ + return rb_obj_dup(current_fiber_storage()); +} + +static inline void +fiber_storage_set(struct rb_fiber_struct *fiber, VALUE storage) +{ + fiber->cont.saved_ec.storage = storage; +} + +static inline VALUE +fiber_storage_get(rb_fiber_t *fiber, int allocate) +{ + VALUE storage = fiber->cont.saved_ec.storage; + if (storage == Qnil && allocate) { + storage = rb_hash_new(); + fiber_storage_set(fiber, storage); + } + return storage; +} + +static void +storage_access_must_be_from_same_fiber(VALUE self) +{ + rb_fiber_t *fiber = fiber_ptr(self); + rb_fiber_t *current = fiber_current(); + if (fiber != current) { + rb_raise(rb_eArgError, "Fiber storage can only be accessed from the Fiber it belongs to"); + } +} + +/** + * call-seq: fiber.storage -> hash (dup) + * + * Returns a copy of the storage hash for the fiber. The method can only be called on the + * Fiber.current. + */ static VALUE -fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool, unsigned int blocking) +rb_fiber_storage_get(VALUE self) { + storage_access_must_be_from_same_fiber(self); + + VALUE storage = fiber_storage_get(fiber_ptr(self), FALSE); + + if (storage == Qnil) { + return Qnil; + } + else { + return rb_obj_dup(storage); + } +} + +static int +fiber_storage_validate_each(VALUE key, VALUE value, VALUE _argument) +{ + Check_Type(key, T_SYMBOL); + + return ST_CONTINUE; +} + +static void +fiber_storage_validate(VALUE value) +{ + // nil is an allowed value and will be lazily initialized. + if (value == Qnil) return; + + if (!RB_TYPE_P(value, T_HASH)) { + rb_raise(rb_eTypeError, "storage must be a hash"); + } + + if (RB_OBJ_FROZEN(value)) { + rb_raise(rb_eFrozenError, "storage must not be frozen"); + } + + rb_hash_foreach(value, fiber_storage_validate_each, Qundef); +} + +/** + * call-seq: fiber.storage = hash + * + * Sets the storage hash for the fiber. This feature is experimental + * and may change in the future. The method can only be called on the + * Fiber.current. + * + * You should be careful about using this method as you may inadvertently clear + * important fiber-storage state. You should mostly prefer to assign specific + * keys in the storage using Fiber::[]=. + * + * You can also use <tt>Fiber.new(storage: nil)</tt> to create a fiber with an empty + * storage. + * + * Example: + * + * while request = request_queue.pop + * # Reset the per-request state: + * Fiber.current.storage = nil + * handle_request(request) + * end + */ +static VALUE +rb_fiber_storage_set(VALUE self, VALUE value) +{ + if (rb_warning_category_enabled_p(RB_WARN_CATEGORY_EXPERIMENTAL)) { + rb_category_warn(RB_WARN_CATEGORY_EXPERIMENTAL, + "Fiber#storage= is experimental and may be removed in the future!"); + } + + storage_access_must_be_from_same_fiber(self); + fiber_storage_validate(value); + + fiber_ptr(self)->cont.saved_ec.storage = rb_obj_dup(value); + return value; +} + +/** + * call-seq: Fiber[key] -> value + * + * Returns the value of the fiber storage variable identified by +key+. + * + * The +key+ must be a symbol, and the value is set by Fiber#[]= or + * Fiber#store. + * + * See also Fiber::[]=. + */ +static VALUE +rb_fiber_storage_aref(VALUE class, VALUE key) +{ + Check_Type(key, T_SYMBOL); + + VALUE storage = fiber_storage_get(fiber_current(), FALSE); + if (storage == Qnil) return Qnil; + + return rb_hash_aref(storage, key); +} + +/** + * call-seq: Fiber[key] = value + * + * Assign +value+ to the fiber storage variable identified by +key+. + * The variable is created if it doesn't exist. + * + * +key+ must be a Symbol, otherwise a TypeError is raised. + * + * See also Fiber::[]. + */ +static VALUE +rb_fiber_storage_aset(VALUE class, VALUE key, VALUE value) +{ + Check_Type(key, T_SYMBOL); + + VALUE storage = fiber_storage_get(fiber_current(), value != Qnil); + if (storage == Qnil) return Qnil; + + if (value == Qnil) { + return rb_hash_delete(storage, key); + } + else { + return rb_hash_aset(storage, key, value); + } +} + +static VALUE +fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool, unsigned int blocking, VALUE storage) +{ + if (storage == Qundef || storage == Qtrue) { + // The default, inherit storage (dup) from the current fiber: + storage = inherit_fiber_storage(); + } + else /* nil, hash, etc. */ { + fiber_storage_validate(storage); + storage = rb_obj_dup(storage); + } + rb_fiber_t *fiber = fiber_t_alloc(self, blocking); + fiber->cont.saved_ec.storage = storage; fiber->first_proc = proc; fiber->stack.base = NULL; fiber->stack.pool = fiber_pool; @@ -1865,54 +2288,90 @@ rb_fiber_pool_default(VALUE pool) return &shared_fiber_pool; } +VALUE rb_fiber_inherit_storage(struct rb_execution_context_struct *ec, struct rb_fiber_struct *fiber) +{ + VALUE storage = rb_obj_dup(ec->storage); + fiber->cont.saved_ec.storage = storage; + return storage; +} + /* :nodoc: */ static VALUE rb_fiber_initialize_kw(int argc, VALUE* argv, VALUE self, int kw_splat) { VALUE pool = Qnil; VALUE blocking = Qfalse; + VALUE storage = Qundef; if (kw_splat != RB_NO_KEYWORDS) { VALUE options = Qnil; - VALUE arguments[2] = {Qundef}; + VALUE arguments[3] = {Qundef}; argc = rb_scan_args_kw(kw_splat, argc, argv, ":", &options); - rb_get_kwargs(options, fiber_initialize_keywords, 0, 2, arguments); + rb_get_kwargs(options, fiber_initialize_keywords, 0, 3, arguments); - if (arguments[0] != Qundef) { + if (!UNDEF_P(arguments[0])) { blocking = arguments[0]; } - if (arguments[1] != Qundef) { + if (!UNDEF_P(arguments[1])) { pool = arguments[1]; } + + storage = arguments[2]; } - return fiber_initialize(self, rb_block_proc(), rb_fiber_pool_default(pool), RTEST(blocking)); + return fiber_initialize(self, rb_block_proc(), rb_fiber_pool_default(pool), RTEST(blocking), storage); } /* * call-seq: - * Fiber.new(blocking: false) { |*args| ... } -> fiber + * Fiber.new(blocking: false, storage: true) { |*args| ... } -> fiber * - * Creates new Fiber. Initially, the fiber is not running and can be resumed with - * #resume. Arguments to the first #resume call will be passed to the block: + * Creates new Fiber. Initially, the fiber is not running and can be resumed + * with #resume. Arguments to the first #resume call will be passed to the + * block: * - * f = Fiber.new do |initial| - * current = initial - * loop do - * puts "current: #{current.inspect}" - * current = Fiber.yield - * end - * end - * f.resume(100) # prints: current: 100 - * f.resume(1, 2, 3) # prints: current: [1, 2, 3] - * f.resume # prints: current: nil - * # ... and so on ... - * - * If <tt>blocking: false</tt> is passed to <tt>Fiber.new</tt>, _and_ current thread - * has a Fiber.scheduler defined, the Fiber becomes non-blocking (see "Non-blocking - * Fibers" section in class docs). + * f = Fiber.new do |initial| + * current = initial + * loop do + * puts "current: #{current.inspect}" + * current = Fiber.yield + * end + * end + * f.resume(100) # prints: current: 100 + * f.resume(1, 2, 3) # prints: current: [1, 2, 3] + * f.resume # prints: current: nil + * # ... and so on ... + * + * If <tt>blocking: false</tt> is passed to <tt>Fiber.new</tt>, _and_ current + * thread has a Fiber.scheduler defined, the Fiber becomes non-blocking (see + * "Non-blocking Fibers" section in class docs). + * + * If the <tt>storage</tt> is unspecified, the default is to inherit a copy of + * the storage from the current fiber. This is the same as specifying + * <tt>storage: true</tt>. + * + * Fiber[:x] = 1 + * Fiber.new do + * Fiber[:x] # => 1 + * Fiber[:x] = 2 + * end.resume + * Fiber[:x] # => 1 + * + * If the given <tt>storage</tt> is <tt>nil</tt>, this function will lazy + * initialize the internal storage, which starts as an empty hash. + * + * Fiber[:x] = "Hello World" + * Fiber.new(storage: nil) do + * Fiber[:x] # nil + * end + * + * Otherwise, the given <tt>storage</tt> is used as the new fiber's storage, + * and it must be an instance of Hash. + * + * Explicitly using <tt>storage: true</tt> is currently experimental and may + * change in the future. */ static VALUE rb_fiber_initialize(int argc, VALUE* argv, VALUE self) @@ -1921,9 +2380,15 @@ rb_fiber_initialize(int argc, VALUE* argv, VALUE self) } VALUE +rb_fiber_new_storage(rb_block_call_func_t func, VALUE obj, VALUE storage) +{ + return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), rb_fiber_pool_default(Qnil), 1, storage); +} + +VALUE rb_fiber_new(rb_block_call_func_t func, VALUE obj) { - return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), rb_fiber_pool_default(Qnil), 1); + return rb_fiber_new_storage(func, obj, Qtrue); } static VALUE @@ -1934,7 +2399,7 @@ rb_fiber_s_schedule_kw(int argc, VALUE* argv, int kw_splat) VALUE fiber = Qnil; if (scheduler != Qnil) { - fiber = rb_funcall_passing_block_kw(scheduler, rb_intern("fiber"), argc, argv, kw_splat); + fiber = rb_fiber_scheduler_fiber(scheduler, argc, argv, kw_splat); } else { rb_raise(rb_eRuntimeError, "No scheduler is available!"); @@ -1977,7 +2442,7 @@ rb_fiber_s_schedule_kw(int argc, VALUE* argv, int kw_splat) * * Note that the behavior described above is how the method is <em>expected</em> * to behave, actual behavior is up to the current scheduler's implementation of - * Fiber::SchedulerInterface#fiber method. Ruby doesn't enforce this method to + * Fiber::Scheduler#fiber method. Ruby doesn't enforce this method to * behave in any particular way. * * If the scheduler is not set, the method raises @@ -1996,7 +2461,7 @@ rb_fiber_s_schedule(int argc, VALUE *argv, VALUE obj) * * Returns the Fiber scheduler, that was last set for the current thread with Fiber.set_scheduler. * Returns +nil+ if no scheduler is set (which is the default), and non-blocking fibers' - # behavior is the same as blocking. + * behavior is the same as blocking. * (see "Non-blocking fibers" section in class docs for details about the scheduler concept). * */ @@ -2030,7 +2495,7 @@ rb_fiber_current_scheduler(VALUE klass) * thread will call scheduler's +close+ method on finalization (allowing the scheduler to * properly manage all non-finished fibers). * - * +scheduler+ can be an object of any class corresponding to Fiber::SchedulerInterface. Its + * +scheduler+ can be an object of any class corresponding to Fiber::Scheduler. Its * implementation is up to the user. * * See also the "Non-blocking fibers" section in class docs. @@ -2051,7 +2516,6 @@ rb_fiber_start(rb_fiber_t *fiber) rb_proc_t *proc; enum ruby_tag_type state; - int need_interrupt = TRUE; VM_ASSERT(th->ec == GET_EC()); VM_ASSERT(FIBER_RESUMED_P(fiber)); @@ -2077,6 +2541,7 @@ rb_fiber_start(rb_fiber_t *fiber) } EC_POP_TAG(); + int need_interrupt = TRUE; VALUE err = Qfalse; if (state) { err = th->ec->errinfo; @@ -2085,37 +2550,22 @@ rb_fiber_start(rb_fiber_t *fiber) if (state == TAG_RAISE) { // noop... } + else if (state == TAG_FATAL && err == RUBY_FATAL_FIBER_KILLED) { + need_interrupt = FALSE; + err = Qfalse; + } else if (state == TAG_FATAL) { rb_threadptr_pending_interrupt_enque(th, err); } else { err = rb_vm_make_jump_tag_but_local_jump(state, err); } - need_interrupt = TRUE; } rb_fiber_terminate(fiber, need_interrupt, err); } -static rb_fiber_t * -root_fiber_alloc(rb_thread_t *th) -{ - VALUE fiber_value = fiber_alloc(rb_cFiber); - rb_fiber_t *fiber = th->ec->fiber_ptr; - - VM_ASSERT(DATA_PTR(fiber_value) == NULL); - VM_ASSERT(fiber->cont.type == FIBER_CONTEXT); - VM_ASSERT(fiber->status == FIBER_RESUMED); - - th->root_fiber = fiber; - DATA_PTR(fiber_value) = fiber; - fiber->cont.self = fiber_value; - - coroutine_initialize_main(&fiber->context); - - return fiber; -} - +// Set up a "root fiber", which is the fiber that every Ractor has. void rb_threadptr_root_fiber_setup(rb_thread_t *th) { @@ -2128,11 +2578,10 @@ rb_threadptr_root_fiber_setup(rb_thread_t *th) fiber->cont.saved_ec.fiber_ptr = fiber; fiber->cont.saved_ec.thread_ptr = th; fiber->blocking = 1; + fiber->killed = 0; fiber_status_set(fiber, FIBER_RESUMED); /* skip CREATED */ th->ec = &fiber->cont.saved_ec; - // This skips mjit_cont_new for the initial thread because mjit_enabled is always false - // at this point. mjit_init calls rb_fiber_init_mjit_cont again for this root_fiber. - rb_fiber_init_mjit_cont(fiber); + cont_init_jit_cont(&fiber->cont); } void @@ -2142,12 +2591,12 @@ rb_threadptr_root_fiber_release(rb_thread_t *th) /* ignore. A root fiber object will free th->ec */ } else { - rb_execution_context_t *ec = GET_EC(); + rb_execution_context_t *ec = rb_current_execution_context(false); VM_ASSERT(th->ec->fiber_ptr->cont.type == FIBER_CONTEXT); VM_ASSERT(th->ec->fiber_ptr->cont.self == 0); - if (th->ec == ec) { + if (ec && th->ec == ec) { rb_ractor_set_current_ec(th->ractor, NULL); } fiber_free(th->ec->fiber_ptr); @@ -2167,16 +2616,6 @@ rb_threadptr_root_fiber_terminate(rb_thread_t *th) } static inline rb_fiber_t* -fiber_current(void) -{ - rb_execution_context_t *ec = GET_EC(); - if (ec->fiber_ptr->cont.self == 0) { - root_fiber_alloc(rb_ec_thread_ptr(ec)); - } - return ec->fiber_ptr; -} - -static inline rb_fiber_t* return_fiber(bool terminate) { rb_fiber_t *fiber = fiber_current(); @@ -2238,6 +2677,19 @@ fiber_store(rb_fiber_t *next_fiber, rb_thread_t *th) fiber_setcontext(next_fiber, fiber); } +static void +fiber_check_killed(rb_fiber_t *fiber) +{ + VM_ASSERT(fiber == fiber_current()); + + if (fiber->killed) { + rb_thread_t *thread = fiber->cont.saved_ec.thread_ptr; + + thread->ec->errinfo = RUBY_FATAL_FIBER_KILLED; + EC_JUMP_TAG(thread->ec, RUBY_TAG_FATAL); + } +} + static inline VALUE fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int kw_splat, rb_fiber_t *resuming_fiber, bool yielding) { @@ -2326,7 +2778,14 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int kw_splat, rb_fi current_fiber = th->ec->fiber_ptr; value = current_fiber->cont.value; - if (current_fiber->cont.argc == -1) rb_exc_raise(value); + + fiber_check_killed(current_fiber); + + if (current_fiber->cont.argc == -1) { + // Fiber#raise will trigger this path. + rb_exc_raise(value); + } + return value; } @@ -2353,7 +2812,62 @@ rb_fiber_transfer(VALUE fiber_value, int argc, const VALUE *argv) VALUE rb_fiber_blocking_p(VALUE fiber) { - return RBOOL(fiber_ptr(fiber)->blocking != 0); + return RBOOL(fiber_ptr(fiber)->blocking); +} + +static VALUE +fiber_blocking_yield(VALUE fiber_value) +{ + rb_fiber_t *fiber = fiber_ptr(fiber_value); + rb_thread_t * volatile th = fiber->cont.saved_ec.thread_ptr; + + VM_ASSERT(fiber->blocking == 0); + + // fiber->blocking is `unsigned int : 1`, so we use it as a boolean: + fiber->blocking = 1; + + // Once the fiber is blocking, and current, we increment the thread blocking state: + th->blocking += 1; + + return rb_yield(fiber_value); +} + +static VALUE +fiber_blocking_ensure(VALUE fiber_value) +{ + rb_fiber_t *fiber = fiber_ptr(fiber_value); + rb_thread_t * volatile th = fiber->cont.saved_ec.thread_ptr; + + // We are no longer blocking: + fiber->blocking = 0; + th->blocking -= 1; + + return Qnil; +} + +/* + * call-seq: + * Fiber.blocking{|fiber| ...} -> result + * + * Forces the fiber to be blocking for the duration of the block. Returns the + * result of the block. + * + * See the "Non-blocking fibers" section in class docs for details. + * + */ +VALUE +rb_fiber_blocking(VALUE class) +{ + VALUE fiber_value = rb_fiber_current(); + rb_fiber_t *fiber = fiber_ptr(fiber_value); + + // If we are already blocking, this is essentially a no-op: + if (fiber->blocking) { + return rb_yield(fiber_value); + } + else { + return rb_ensure(fiber_blocking_yield, fiber_value, fiber_blocking_ensure, fiber_value); + } } /* @@ -2439,9 +2953,7 @@ fiber_resume_kw(rb_fiber_t *fiber, int argc, const VALUE *argv, int kw_splat) rb_raise(rb_eFiberError, "attempt to resume a transferring fiber"); } - VALUE result = fiber_switch(fiber, argc, argv, kw_splat, fiber, false); - - return result; + return fiber_switch(fiber, argc, argv, kw_splat, fiber, false); } VALUE @@ -2713,14 +3225,9 @@ rb_fiber_s_yield(int argc, VALUE *argv, VALUE klass) } static VALUE -fiber_raise(rb_fiber_t *fiber, int argc, const VALUE *argv) +fiber_raise(rb_fiber_t *fiber, VALUE exception) { - VALUE exception = rb_make_exception(argc, argv); - - if (fiber->resuming_fiber) { - rb_raise(rb_eFiberError, "attempt to raise a resuming fiber"); - } - else if (FIBER_SUSPENDED_P(fiber) && !fiber->yielding) { + if (FIBER_SUSPENDED_P(fiber) && !fiber->yielding) { return fiber_transfer_kw(fiber, -1, &exception, RB_NO_KEYWORDS); } else { @@ -2731,7 +3238,9 @@ fiber_raise(rb_fiber_t *fiber, int argc, const VALUE *argv) VALUE rb_fiber_raise(VALUE fiber, int argc, const VALUE *argv) { - return fiber_raise(fiber_ptr(fiber), argc, argv); + VALUE exception = rb_make_exception(argc, argv); + + return fiber_raise(fiber_ptr(fiber), exception); } /* @@ -2754,6 +3263,10 @@ rb_fiber_raise(VALUE fiber, int argc, const VALUE *argv) * the exception, and the third parameter is an array of callback information. * Exceptions are caught by the +rescue+ clause of <code>begin...end</code> * blocks. + * + * Raises +FiberError+ if called on a Fiber belonging to another +Thread+. + * + * See Kernel#raise for more information. */ static VALUE rb_fiber_m_raise(int argc, VALUE *argv, VALUE self) @@ -2763,6 +3276,46 @@ rb_fiber_m_raise(int argc, VALUE *argv, VALUE self) /* * call-seq: + * fiber.kill -> nil + * + * Terminates the fiber by raising an uncatchable exception. + * It only terminates the given fiber and no other fiber, returning +nil+ to + * another fiber if that fiber was calling #resume or #transfer. + * + * <tt>Fiber#kill</tt> only interrupts another fiber when it is in Fiber.yield. + * If called on the current fiber then it raises that exception at the <tt>Fiber#kill</tt> call site. + * + * If the fiber has not been started, transition directly to the terminated state. + * + * If the fiber is already terminated, does nothing. + * + * Raises FiberError if called on a fiber belonging to another thread. + */ +static VALUE +rb_fiber_m_kill(VALUE self) +{ + rb_fiber_t *fiber = fiber_ptr(self); + + if (fiber->killed) return Qfalse; + fiber->killed = 1; + + if (fiber->status == FIBER_CREATED) { + fiber->status = FIBER_TERMINATED; + } + else if (fiber->status != FIBER_TERMINATED) { + if (fiber_current() == fiber) { + fiber_check_killed(fiber); + } + else { + fiber_raise(fiber_ptr(self), Qnil); + } + } + + return self; +} + +/* + * call-seq: * Fiber.current -> fiber * * Returns the current fiber. If you are not running in the context of @@ -2893,329 +3446,6 @@ rb_fiber_pool_initialize(int argc, VALUE* argv, VALUE self) * fiber.resume #=> FiberError: dead fiber called */ -/* - * Document-class: Fiber::SchedulerInterface - * - * This is not an existing class, but documentation of the interface that Scheduler - * object should comply to in order to be used as argument to Fiber.scheduler and handle non-blocking - * fibers. See also the "Non-blocking fibers" section in Fiber class docs for explanations - * of some concepts. - * - * Scheduler's behavior and usage are expected to be as follows: - * - * * When the execution in the non-blocking Fiber reaches some blocking operation (like - * sleep, wait for a process, or a non-ready I/O), it calls some of the scheduler's - * hook methods, listed below. - * * Scheduler somehow registers what the current fiber is waiting on, and yields control - * to other fibers with Fiber.yield (so the fiber would be suspended while expecting its - * wait to end, and other fibers in the same thread can perform) - * * At the end of the current thread execution, the scheduler's method #close is called - * * The scheduler runs into a wait loop, checking all the blocked fibers (which it has - * registered on hook calls) and resuming them when the awaited resource is ready - * (e.g. I/O ready or sleep time elapsed). - * - * A typical implementation would probably rely for this closing loop on a gem like - * EventMachine[https://github.com/eventmachine/eventmachine] or - * Async[https://github.com/socketry/async]. - * - * This way concurrent execution will be achieved transparently for every - * individual Fiber's code. - * - * Hook methods are: - * - * * #io_wait, #io_read, and #io_write - * * #process_wait - * * #kernel_sleep - * * #timeout_after - * * #address_resolve - * * #block and #unblock - * * (the list is expanded as Ruby developers make more methods having non-blocking calls) - * - * When not specified otherwise, the hook implementations are mandatory: if they are not - * implemented, the methods trying to call hook will fail. To provide backward compatibility, - * in the future hooks will be optional (if they are not implemented, due to the scheduler - * being created for the older Ruby version, the code which needs this hook will not fail, - * and will just behave in a blocking fashion). - * - * It is also strongly recommended that the scheduler implements the #fiber method, which is - * delegated to by Fiber.schedule. - * - * Sample _toy_ implementation of the scheduler can be found in Ruby's code, in - * <tt>test/fiber/scheduler.rb</tt> - * - */ - -#if 0 /* for RDoc */ -/* - * - * Document-method: Fiber::SchedulerInterface#close - * - * Called when the current thread exits. The scheduler is expected to implement this - * method in order to allow all waiting fibers to finalize their execution. - * - * The suggested pattern is to implement the main event loop in the #close method. - * - */ -static VALUE -rb_fiber_scheduler_interface_close(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#process_wait - * call-seq: process_wait(pid, flags) - * - * Invoked by Process::Status.wait in order to wait for a specified process. - * See that method description for arguments description. - * - * Suggested minimal implementation: - * - * Thread.new do - * Process::Status.wait(pid, flags) - * end.value - * - * This hook is optional: if it is not present in the current scheduler, - * Process::Status.wait will behave as a blocking method. - * - * Expected to return a Process::Status instance. - */ -static VALUE -rb_fiber_scheduler_interface_process_wait(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#io_wait - * call-seq: io_wait(io, events, timeout) - * - * Invoked by IO#wait, IO#wait_readable, IO#wait_writable to ask whether the - * specified descriptor is ready for specified events within - * the specified +timeout+. - * - * +events+ is a bit mask of <tt>IO::READABLE</tt>, <tt>IO::WRITABLE</tt>, and - * <tt>IO::PRIORITY</tt>. - * - * Suggested implementation should register which Fiber is waiting for which - * resources and immediately calling Fiber.yield to pass control to other - * fibers. Then, in the #close method, the scheduler might dispatch all the - * I/O resources to fibers waiting for it. - * - * Expected to return the subset of events that are ready immediately. - * - */ -static VALUE -rb_fiber_scheduler_interface_io_wait(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#io_read - * call-seq: io_read(io, buffer, length) -> read length or -errno - * - * Invoked by IO#read to read +length+ bytes from +io+ into a specified - * +buffer+ (see IO::Buffer). - * - * The +length+ argument is the "minimum length to be read". - * If the IO buffer size is 8KiB, but the +length+ is +1024+ (1KiB), up to - * 8KiB might be read, but at least 1KiB will be. - * Generally, the only case where less data than +length+ will be read is if - * there is an error reading the data. - * - * Specifying a +length+ of 0 is valid and means try reading at least once - * and return any available data. - * - * Suggested implementation should try to read from +io+ in a non-blocking - * manner and call #io_wait if the +io+ is not ready (which will yield control - * to other fibers). - * - * See IO::Buffer for an interface available to return data. - * - * Expected to return number of bytes read, or, in case of an error, <tt>-errno</tt> - * (negated number corresponding to system's error code). - * - * The method should be considered _experimental_. - */ -static VALUE -rb_fiber_scheduler_interface_io_read(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#io_write - * call-seq: io_write(io, buffer, length) -> written length or -errno - * - * Invoked by IO#write to write +length+ bytes to +io+ from - * from a specified +buffer+ (see IO::Buffer). - * - * The +length+ argument is the "(minimum) length to be written". - * If the IO buffer size is 8KiB, but the +length+ specified is 1024 (1KiB), - * at most 8KiB will be written, but at least 1KiB will be. - * Generally, the only case where less data than +length+ will be written is if - * there is an error writing the data. - * - * Specifying a +length+ of 0 is valid and means try writing at least once, - * as much data as possible. - * - * Suggested implementation should try to write to +io+ in a non-blocking - * manner and call #io_wait if the +io+ is not ready (which will yield control - * to other fibers). - * - * See IO::Buffer for an interface available to get data from buffer efficiently. - * - * Expected to return number of bytes written, or, in case of an error, <tt>-errno</tt> - * (negated number corresponding to system's error code). - * - * The method should be considered _experimental_. - */ -static VALUE -rb_fiber_scheduler_interface_io_write(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#kernel_sleep - * call-seq: kernel_sleep(duration = nil) - * - * Invoked by Kernel#sleep and Mutex#sleep and is expected to provide - * an implementation of sleeping in a non-blocking way. Implementation might - * register the current fiber in some list of "which fiber wait until what - * moment", call Fiber.yield to pass control, and then in #close resume - * the fibers whose wait period has elapsed. - * - */ -static VALUE -rb_fiber_scheduler_interface_kernel_sleep(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#address_resolve - * call-seq: address_resolve(hostname) -> array_of_strings or nil - * - * Invoked by any method that performs a non-reverse DNS lookup. The most - * notable method is Addrinfo.getaddrinfo, but there are many other. - * - * The method is expected to return an array of strings corresponding to ip - * addresses the +hostname+ is resolved to, or +nil+ if it can not be resolved. - * - * Fairly exhaustive list of all possible call-sites: - * - * - Addrinfo.getaddrinfo - * - Addrinfo.tcp - * - Addrinfo.udp - * - Addrinfo.ip - * - Addrinfo.new - * - Addrinfo.marshal_load - * - SOCKSSocket.new - * - TCPServer.new - * - TCPSocket.new - * - IPSocket.getaddress - * - TCPSocket.gethostbyname - * - UDPSocket#connect - * - UDPSocket#bind - * - UDPSocket#send - * - Socket.getaddrinfo - * - Socket.gethostbyname - * - Socket.pack_sockaddr_in - * - Socket.sockaddr_in - * - Socket.unpack_sockaddr_in - */ -static VALUE -rb_fiber_scheduler_interface_address_resolve(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#timeout_after - * call-seq: timeout_after(duration, exception_class, *exception_arguments, &block) -> result of block - * - * Invoked by Timeout.timeout to execute the given +block+ within the given - * +duration+. It can also be invoked directly by the scheduler or user code. - * - * Attempt to limit the execution time of a given +block+ to the given - * +duration+ if possible. When a non-blocking operation causes the +block+'s - * execution time to exceed the specified +duration+, that non-blocking - * operation should be interrupted by raising the specified +exception_class+ - * constructed with the given +exception_arguments+. - * - * General execution timeouts are often considered risky. This implementation - * will only interrupt non-blocking operations. This is by design because it's - * expected that non-blocking operations can fail for a variety of - * unpredictable reasons, so applications should already be robust in handling - * these conditions and by implication timeouts. - * - * However, as a result of this design, if the +block+ does not invoke any - * non-blocking operations, it will be impossible to interrupt it. If you - * desire to provide predictable points for timeouts, consider adding - * +sleep(0)+. - * - * If the block is executed successfully, its result will be returned. - * - * The exception will typically be raised using Fiber#raise. - */ -static VALUE -rb_fiber_scheduler_interface_timeout_after(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#block - * call-seq: block(blocker, timeout = nil) - * - * Invoked by methods like Thread.join, and by Mutex, to signify that current - * Fiber is blocked until further notice (e.g. #unblock) or until +timeout+ has - * elapsed. - * - * +blocker+ is what we are waiting on, informational only (for debugging and - * logging). There are no guarantee about its value. - * - * Expected to return boolean, specifying whether the blocking operation was - * successful or not. - */ -static VALUE -rb_fiber_scheduler_interface_block(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#unblock - * call-seq: unblock(blocker, fiber) - * - * Invoked to wake up Fiber previously blocked with #block (for example, Mutex#lock - * calls #block and Mutex#unlock calls #unblock). The scheduler should use - * the +fiber+ parameter to understand which fiber is unblocked. - * - * +blocker+ is what was awaited for, but it is informational only (for debugging - * and logging), and it is not guaranteed to be the same value as the +blocker+ for - * #block. - * - */ -static VALUE -rb_fiber_scheduler_interface_unblock(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#fiber - * call-seq: fiber(&block) - * - * Implementation of the Fiber.schedule. The method is <em>expected</em> to immediately - * run the given block of code in a separate non-blocking fiber, and to return that Fiber. - * - * Minimal suggested implementation is: - * - * def fiber(&block) - * fiber = Fiber.new(blocking: false, &block) - * fiber.resume - * fiber - * end - */ -static VALUE -rb_fiber_scheduler_interface_fiber(VALUE self) -{ -} -#endif - void Init_Cont(void) { @@ -3237,10 +3467,20 @@ Init_Cont(void) fiber_initialize_keywords[0] = rb_intern_const("blocking"); fiber_initialize_keywords[1] = rb_intern_const("pool"); + fiber_initialize_keywords[2] = rb_intern_const("storage"); const char *fiber_shared_fiber_pool_free_stacks = getenv("RUBY_SHARED_FIBER_POOL_FREE_STACKS"); if (fiber_shared_fiber_pool_free_stacks) { shared_fiber_pool.free_stacks = atoi(fiber_shared_fiber_pool_free_stacks); + + if (shared_fiber_pool.free_stacks < 0) { + rb_warn("Setting RUBY_SHARED_FIBER_POOL_FREE_STACKS to a negative value is not allowed."); + shared_fiber_pool.free_stacks = 0; + } + + if (shared_fiber_pool.free_stacks > 1) { + rb_warn("Setting RUBY_SHARED_FIBER_POOL_FREE_STACKS to a value greater than 1 is operating system specific, and may cause crashes."); + } } rb_cFiber = rb_define_class("Fiber", rb_cObject); @@ -3248,10 +3488,17 @@ Init_Cont(void) rb_eFiberError = rb_define_class("FiberError", rb_eStandardError); rb_define_singleton_method(rb_cFiber, "yield", rb_fiber_s_yield, -1); rb_define_singleton_method(rb_cFiber, "current", rb_fiber_s_current, 0); + rb_define_singleton_method(rb_cFiber, "blocking", rb_fiber_blocking, 0); + rb_define_singleton_method(rb_cFiber, "[]", rb_fiber_storage_aref, 1); + rb_define_singleton_method(rb_cFiber, "[]=", rb_fiber_storage_aset, 2); + rb_define_method(rb_cFiber, "initialize", rb_fiber_initialize, -1); rb_define_method(rb_cFiber, "blocking?", rb_fiber_blocking_p, 0); + rb_define_method(rb_cFiber, "storage", rb_fiber_storage_get, 0); + rb_define_method(rb_cFiber, "storage=", rb_fiber_storage_set, 1); rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1); rb_define_method(rb_cFiber, "raise", rb_fiber_m_raise, -1); + rb_define_method(rb_cFiber, "kill", rb_fiber_m_kill, 0); rb_define_method(rb_cFiber, "backtrace", rb_fiber_backtrace, -1); rb_define_method(rb_cFiber, "backtrace_locations", rb_fiber_backtrace_locations, -1); rb_define_method(rb_cFiber, "to_s", fiber_to_s, 0); @@ -3266,21 +3513,6 @@ Init_Cont(void) rb_define_singleton_method(rb_cFiber, "schedule", rb_fiber_s_schedule, -1); -#if 0 /* for RDoc */ - rb_cFiberScheduler = rb_define_class_under(rb_cFiber, "SchedulerInterface", rb_cObject); - rb_define_method(rb_cFiberScheduler, "close", rb_fiber_scheduler_interface_close, 0); - rb_define_method(rb_cFiberScheduler, "process_wait", rb_fiber_scheduler_interface_process_wait, 0); - rb_define_method(rb_cFiberScheduler, "io_wait", rb_fiber_scheduler_interface_io_wait, 0); - rb_define_method(rb_cFiberScheduler, "io_read", rb_fiber_scheduler_interface_io_read, 0); - rb_define_method(rb_cFiberScheduler, "io_write", rb_fiber_scheduler_interface_io_write, 0); - rb_define_method(rb_cFiberScheduler, "kernel_sleep", rb_fiber_scheduler_interface_kernel_sleep, 0); - rb_define_method(rb_cFiberScheduler, "address_resolve", rb_fiber_scheduler_interface_address_resolve, 0); - rb_define_method(rb_cFiberScheduler, "timeout_after", rb_fiber_scheduler_interface_timeout_after, 0); - rb_define_method(rb_cFiberScheduler, "block", rb_fiber_scheduler_interface_block, 0); - rb_define_method(rb_cFiberScheduler, "unblock", rb_fiber_scheduler_interface_unblock, 0); - rb_define_method(rb_cFiberScheduler, "fiber", rb_fiber_scheduler_interface_fiber, 0); -#endif - #ifdef RB_EXPERIMENTAL_FIBER_POOL rb_cFiberPool = rb_define_class_under(rb_cFiber, "Pool", rb_cObject); rb_define_alloc_func(rb_cFiberPool, fiber_pool_alloc); |