diff options
Diffstat (limited to 'cont.c')
-rw-r--r-- | cont.c | 1084 |
1 files changed, 736 insertions, 348 deletions
@@ -16,17 +16,29 @@ #include <sys/mman.h> #endif +// On Solaris, madvise() is NOT declared for SUS (XPG4v2) or later, +// but MADV_* macros are defined when __EXTENSIONS__ is defined. +#ifdef NEED_MADVICE_PROTOTYPE_USING_CADDR_T +#include <sys/types.h> +extern int madvise(caddr_t, size_t, int); +#endif + #include COROUTINE_H #include "eval_intern.h" -#include "gc.h" #include "internal.h" #include "internal/cont.h" +#include "internal/thread.h" +#include "internal/error.h" +#include "internal/gc.h" #include "internal/proc.h" +#include "internal/sanitizers.h" #include "internal/warnings.h" #include "ruby/fiber/scheduler.h" -#include "mjit.h" +#include "rjit.h" +#include "yjit.h" #include "vm_core.h" +#include "vm_sync.h" #include "id_table.h" #include "ractor_core.h" @@ -55,6 +67,9 @@ static VALUE rb_cFiberPool; #define FIBER_POOL_INITIAL_SIZE 32 #define FIBER_POOL_ALLOCATION_MAXIMUM_SIZE 1024 #endif +#ifdef RB_EXPERIMENTAL_FIBER_POOL +#define FIBER_POOL_ALLOCATION_FREE +#endif enum context_type { CONTINUATION_CONTEXT = 0, @@ -162,7 +177,7 @@ struct fiber_pool { // A singly-linked list of allocations which contain 1 or more stacks each. struct fiber_pool_allocation * allocations; - // Provides O(1) stack "allocation": + // Free list that provides O(1) stack "allocation". struct fiber_pool_vacancy * vacancies; // The size of the stack allocations (excluding any guard page). @@ -174,16 +189,27 @@ struct fiber_pool { // The initial number of stacks to allocate. size_t initial_count; - // Whether to madvise(free) the stack or not: + // Whether to madvise(free) the stack or not. + // If this value is set to 1, the stack will be madvise(free)ed + // (or equivalent), where possible, when it is returned to the pool. int free_stacks; // The number of stacks that have been used in this pool. size_t used; - // The amount to allocate for the vm_stack: + // The amount to allocate for the vm_stack. size_t vm_stack_size; }; +// Continuation contexts used by JITs +struct rb_jit_cont { + rb_execution_context_t *ec; // continuation ec + struct rb_jit_cont *prev, *next; // used to form lists +}; + +// Doubly linked list for enumerating all on-stack ISEQs. +static struct rb_jit_cont *first_jit_cont; + typedef struct rb_context_struct { enum context_type type; int argc; @@ -201,22 +227,21 @@ typedef struct rb_context_struct { rb_execution_context_t saved_ec; rb_jmpbuf_t jmpbuf; rb_ensure_entry_t *ensure_array; - /* Pointer to MJIT info about the continuation. */ - struct mjit_cont *mjit_cont; + struct rb_jit_cont *jit_cont; // Continuation contexts for JITs } rb_context_t; - /* * Fiber status: - * [Fiber.new] ------> FIBER_CREATED - * | [Fiber#resume] - * v - * +--> FIBER_RESUMED ----+ - * [Fiber#resume] | | [Fiber.yield] | - * | v | - * +-- FIBER_SUSPENDED | [Terminate] - * | - * FIBER_TERMINATED <-+ + * [Fiber.new] ------> FIBER_CREATED ----> [Fiber#kill] --> | + * | [Fiber#resume] | + * v | + * +--> FIBER_RESUMED ----> [return] ------> | + * [Fiber#resume] | | [Fiber.yield/transfer] | + * [Fiber#transfer] | v | + * +--- FIBER_SUSPENDED --> [Fiber#kill] --> | + * | + * | + * FIBER_TERMINATED <-------------------+ */ enum fiber_status { FIBER_CREATED, @@ -242,18 +267,26 @@ struct rb_fiber_struct { unsigned int yielding : 1; unsigned int blocking : 1; + unsigned int killed : 1; + struct coroutine_context context; struct fiber_pool_stack stack; }; static struct fiber_pool shared_fiber_pool = {NULL, NULL, 0, 0, 0, 0}; -static ID fiber_initialize_keywords[2] = {0}; +void +rb_free_shared_fiber_pool(void) +{ + xfree(shared_fiber_pool.allocations); +} + +static ID fiber_initialize_keywords[3] = {0}; /* * FreeBSD require a first (i.e. addr) argument of mmap(2) is not NULL * if MAP_STACK is passed. - * http://www.FreeBSD.org/cgi/query-pr.cgi?pr=158755 + * https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=158755 */ #if defined(MAP_STACK) && !defined(__FreeBSD__) && !defined(__FreeBSD_kernel__) #define FIBER_STACK_FLAGS (MAP_PRIVATE | MAP_ANON | MAP_STACK) @@ -264,7 +297,6 @@ static ID fiber_initialize_keywords[2] = {0}; #define ERRNOMSG strerror(errno) // Locates the stack vacancy details for the given stack. -// Requires that fiber_pool_vacancy fits within one page. inline static struct fiber_pool_vacancy * fiber_pool_vacancy_pointer(void * base, size_t size) { @@ -275,6 +307,24 @@ fiber_pool_vacancy_pointer(void * base, size_t size) ); } +#if defined(COROUTINE_SANITIZE_ADDRESS) +// Compute the base pointer for a vacant stack, for the area which can be poisoned. +inline static void * +fiber_pool_stack_poison_base(struct fiber_pool_stack * stack) +{ + STACK_GROW_DIR_DETECTION; + + return (char*)stack->base + STACK_DIR_UPPER(RB_PAGE_SIZE, 0); +} + +// Compute the size of the vacant stack, for the area that can be poisoned. +inline static size_t +fiber_pool_stack_poison_size(struct fiber_pool_stack * stack) +{ + return stack->size - RB_PAGE_SIZE; +} +#endif + // Reset the current stack pointer and available size of the given stack. inline static void fiber_pool_stack_reset(struct fiber_pool_stack * stack) @@ -624,6 +674,10 @@ fiber_pool_stack_acquire(struct fiber_pool * fiber_pool) VM_ASSERT(vacancy); VM_ASSERT(vacancy->stack.base); +#if defined(COROUTINE_SANITIZE_ADDRESS) + __asan_unpoison_memory_region(fiber_pool_stack_poison_base(&vacancy->stack), fiber_pool_stack_poison_size(&vacancy->stack)); +#endif + // Take the top item from the free list: fiber_pool->used += 1; @@ -647,28 +701,54 @@ fiber_pool_stack_free(struct fiber_pool_stack * stack) // If this is not true, the vacancy information will almost certainly be destroyed: VM_ASSERT(size <= (stack->size - RB_PAGE_SIZE)); - if (DEBUG) fprintf(stderr, "fiber_pool_stack_free: %p+%"PRIuSIZE" [base=%p, size=%"PRIuSIZE"]\n", base, size, stack->base, stack->size); + int advice = stack->pool->free_stacks >> 1; + + if (DEBUG) fprintf(stderr, "fiber_pool_stack_free: %p+%"PRIuSIZE" [base=%p, size=%"PRIuSIZE"] advice=%d\n", base, size, stack->base, stack->size, advice); -#if VM_CHECK_MODE > 0 && defined(MADV_DONTNEED) + // The pages being used by the stack can be returned back to the system. + // That doesn't change the page mapping, but it does allow the system to + // reclaim the physical memory. + // Since we no longer care about the data itself, we don't need to page + // out to disk, since that is costly. Not all systems support that, so + // we try our best to select the most efficient implementation. + // In addition, it's actually slightly desirable to not do anything here, + // but that results in higher memory usage. + +#ifdef __wasi__ + // WebAssembly doesn't support madvise, so we just don't do anything. +#elif VM_CHECK_MODE > 0 && defined(MADV_DONTNEED) + if (!advice) advice = MADV_DONTNEED; // This immediately discards the pages and the memory is reset to zero. - madvise(base, size, MADV_DONTNEED); -#elif defined(POSIX_MADV_DONTNEED) - posix_madvise(base, size, POSIX_MADV_DONTNEED); + madvise(base, size, advice); #elif defined(MADV_FREE_REUSABLE) + if (!advice) advice = MADV_FREE_REUSABLE; + // Darwin / macOS / iOS. // Acknowledge the kernel down to the task info api we make this // page reusable for future use. - // As for MADV_FREE_REUSE below we ensure in the rare occasions the task was not + // As for MADV_FREE_REUSABLE below we ensure in the rare occasions the task was not // completed at the time of the call to re-iterate. - while (madvise(base, size, MADV_FREE_REUSABLE) == -1 && errno == EAGAIN); + while (madvise(base, size, advice) == -1 && errno == EAGAIN); #elif defined(MADV_FREE) - madvise(base, size, MADV_FREE); + if (!advice) advice = MADV_FREE; + // Recent Linux. + madvise(base, size, advice); #elif defined(MADV_DONTNEED) - madvise(base, size, MADV_DONTNEED); + if (!advice) advice = MADV_DONTNEED; + // Old Linux. + madvise(base, size, advice); +#elif defined(POSIX_MADV_DONTNEED) + if (!advice) advice = POSIX_MADV_DONTNEED; + // Solaris? + posix_madvise(base, size, advice); #elif defined(_WIN32) VirtualAlloc(base, size, MEM_RESET, PAGE_READWRITE); // Not available in all versions of Windows. //DiscardVirtualMemory(base, size); #endif + +#if defined(COROUTINE_SANITIZE_ADDRESS) + __asan_poison_memory_region(fiber_pool_stack_poison_base(stack), fiber_pool_stack_poison_size(stack)); +#endif } // Release and return a stack to the vacancy list. @@ -688,7 +768,7 @@ fiber_pool_stack_release(struct fiber_pool_stack * stack) fiber_pool_vacancy_reset(vacancy); // Push the vacancy into the vancancies list: - pool->vacancies = fiber_pool_vacancy_push(vacancy, stack->pool->vacancies); + pool->vacancies = fiber_pool_vacancy_push(vacancy, pool->vacancies); pool->used -= 1; #ifdef FIBER_POOL_ALLOCATION_FREE @@ -704,7 +784,8 @@ fiber_pool_stack_release(struct fiber_pool_stack * stack) fiber_pool_stack_free(&vacancy->stack); } #else - // This is entirely optional, but clears the dirty flag from the stack memory, so it won't get swapped to disk when there is memory pressure: + // This is entirely optional, but clears the dirty flag from the stack + // memory, so it won't get swapped to disk when there is memory pressure: if (stack->pool->free_stacks) { fiber_pool_stack_free(&vacancy->stack); } @@ -715,6 +796,9 @@ static inline void ec_switch(rb_thread_t *th, rb_fiber_t *fiber) { rb_execution_context_t *ec = &fiber->cont.saved_ec; +#ifdef RUBY_ASAN_ENABLED + ec->machine.asan_fake_stack_handle = asan_get_thread_fake_stack_handle(); +#endif rb_ractor_set_current_ec(th->ractor, th->ec = ec); // ruby_current_execution_context_ptr = th->ec = ec; @@ -741,6 +825,20 @@ static COROUTINE fiber_entry(struct coroutine_context * from, struct coroutine_context * to) { rb_fiber_t *fiber = to->argument; + +#if defined(COROUTINE_SANITIZE_ADDRESS) + // Address sanitizer will copy the previous stack base and stack size into + // the "from" fiber. `coroutine_initialize_main` doesn't generally know the + // stack bounds (base + size). Therefore, the main fiber `stack_base` and + // `stack_size` will be NULL/0. It's specifically important in that case to + // get the (base+size) of the previous fiber and save it, so that later when + // we return to the main coroutine, we don't supply (NULL, 0) to + // __sanitizer_start_switch_fiber which royally messes up the internal state + // of ASAN and causes (sometimes) the following message: + // "WARNING: ASan is ignoring requested __asan_handle_no_return" + __sanitizer_finish_switch_fiber(to->fake_stack, (const void**)&from->stack_base, &from->stack_size); +#endif + rb_thread_t *thread = fiber->cont.saved_ec.thread_ptr; #ifdef COROUTINE_PTHREAD_CONTEXT @@ -781,7 +879,8 @@ fiber_initialize_coroutine(rb_fiber_t *fiber, size_t * vm_stack_size) return vm_stack; } -// Release the stack from the fiber, it's execution context, and return it to the fiber pool. +// Release the stack from the fiber, it's execution context, and return it to +// the fiber pool. static void fiber_stack_release(rb_fiber_t * fiber) { @@ -927,13 +1026,8 @@ cont_mark(void *ptr) cont->machine.stack + cont->machine.stack_size); } else { - /* fiber */ - const rb_fiber_t *fiber = (rb_fiber_t*)cont; - - if (!FIBER_TERMINATED_P(fiber)) { - rb_gc_mark_locations(cont->machine.stack, - cont->machine.stack + cont->machine.stack_size); - } + /* fiber machine context is marked as part of rb_execution_context_mark, no need to + * do anything here. */ } } @@ -948,6 +1042,8 @@ fiber_is_root_p(const rb_fiber_t *fiber) } #endif +static void jit_cont_free(struct rb_jit_cont *cont); + static void cont_free(void *ptr) { @@ -968,10 +1064,8 @@ cont_free(void *ptr) RUBY_FREE_UNLESS_NULL(cont->saved_vm_stack.ptr); - if (mjit_enabled) { - VM_ASSERT(cont->mjit_cont != NULL); - mjit_cont_free(cont->mjit_cont); - } + VM_ASSERT(cont->jit_cont != NULL); + jit_cont_free(cont->jit_cont); /* free rb_cont_t or rb_fiber_t */ ruby_xfree(ptr); RUBY_FREE_LEAVE("cont"); @@ -1075,7 +1169,9 @@ fiber_memsize(const void *ptr) */ if (saved_ec->local_storage && fiber != th->root_fiber) { size += rb_id_table_memsize(saved_ec->local_storage); + size += rb_obj_memsize_of(saved_ec->storage); } + size += cont_memsize(&fiber->cont); return size; } @@ -1110,6 +1206,7 @@ cont_save_machine_stack(rb_thread_t *th, rb_context_t *cont) } FLUSH_REGISTER_WINDOWS; + asan_unpoison_memory_region(cont->machine.stack_src, size, false); MEMCPY(cont->machine.stack, cont->machine.stack_src, VALUE, size); } @@ -1134,15 +1231,127 @@ cont_save_thread(rb_context_t *cont, rb_thread_t *th) sec->machine.stack_end = NULL; } +static rb_nativethread_lock_t jit_cont_lock; + +// Register a new continuation with execution context `ec`. Return JIT info about +// the continuation. +static struct rb_jit_cont * +jit_cont_new(rb_execution_context_t *ec) +{ + struct rb_jit_cont *cont; + + // We need to use calloc instead of something like ZALLOC to avoid triggering GC here. + // When this function is called from rb_thread_alloc through rb_threadptr_root_fiber_setup, + // the thread is still being prepared and marking it causes SEGV. + cont = calloc(1, sizeof(struct rb_jit_cont)); + if (cont == NULL) + rb_memerror(); + cont->ec = ec; + + rb_native_mutex_lock(&jit_cont_lock); + if (first_jit_cont == NULL) { + cont->next = cont->prev = NULL; + } + else { + cont->prev = NULL; + cont->next = first_jit_cont; + first_jit_cont->prev = cont; + } + first_jit_cont = cont; + rb_native_mutex_unlock(&jit_cont_lock); + + return cont; +} + +// Unregister continuation `cont`. static void -cont_init_mjit_cont(rb_context_t *cont) +jit_cont_free(struct rb_jit_cont *cont) +{ + if (!cont) return; + + rb_native_mutex_lock(&jit_cont_lock); + if (cont == first_jit_cont) { + first_jit_cont = cont->next; + if (first_jit_cont != NULL) + first_jit_cont->prev = NULL; + } + else { + cont->prev->next = cont->next; + if (cont->next != NULL) + cont->next->prev = cont->prev; + } + rb_native_mutex_unlock(&jit_cont_lock); + + free(cont); +} + +// Call a given callback against all on-stack ISEQs. +void +rb_jit_cont_each_iseq(rb_iseq_callback callback, void *data) { - VM_ASSERT(cont->mjit_cont == NULL); - if (mjit_enabled) { - cont->mjit_cont = mjit_cont_new(&(cont->saved_ec)); + struct rb_jit_cont *cont; + for (cont = first_jit_cont; cont != NULL; cont = cont->next) { + if (cont->ec->vm_stack == NULL) + continue; + + const rb_control_frame_t *cfp = cont->ec->cfp; + while (!RUBY_VM_CONTROL_FRAME_STACK_OVERFLOW_P(cont->ec, cfp)) { + if (cfp->pc && cfp->iseq && imemo_type((VALUE)cfp->iseq) == imemo_iseq) { + callback(cfp->iseq, data); + } + cfp = RUBY_VM_PREVIOUS_CONTROL_FRAME(cfp); + } } } +#if USE_YJIT +// Update the jit_return of all CFPs to leave_exit unless it's leave_exception or not set. +// This prevents jit_exec_exception from jumping to the caller after invalidation. +void +rb_yjit_cancel_jit_return(void *leave_exit, void *leave_exception) +{ + struct rb_jit_cont *cont; + for (cont = first_jit_cont; cont != NULL; cont = cont->next) { + if (cont->ec->vm_stack == NULL) + continue; + + const rb_control_frame_t *cfp = cont->ec->cfp; + while (!RUBY_VM_CONTROL_FRAME_STACK_OVERFLOW_P(cont->ec, cfp)) { + if (cfp->jit_return && cfp->jit_return != leave_exception) { + ((rb_control_frame_t *)cfp)->jit_return = leave_exit; + } + cfp = RUBY_VM_PREVIOUS_CONTROL_FRAME(cfp); + } + } +} +#endif + +// Finish working with jit_cont. +void +rb_jit_cont_finish(void) +{ + struct rb_jit_cont *cont, *next; + for (cont = first_jit_cont; cont != NULL; cont = next) { + next = cont->next; + free(cont); // Don't use xfree because it's allocated by calloc. + } + rb_native_mutex_destroy(&jit_cont_lock); +} + +static void +cont_init_jit_cont(rb_context_t *cont) +{ + VM_ASSERT(cont->jit_cont == NULL); + // We always allocate this since YJIT may be enabled later + cont->jit_cont = jit_cont_new(&(cont->saved_ec)); +} + +struct rb_execution_context_struct * +rb_fiberptr_get_ec(struct rb_fiber_struct *fiber) +{ + return &fiber->cont.saved_ec; +} + static void cont_init(rb_context_t *cont, rb_thread_t *th) { @@ -1152,7 +1361,7 @@ cont_init(rb_context_t *cont, rb_thread_t *th) cont->saved_ec.local_storage = NULL; cont->saved_ec.local_storage_recursive_hash = Qnil; cont->saved_ec.local_storage_recursive_hash_for_trace = Qnil; - cont_init_mjit_cont(cont); + cont_init_jit_cont(cont); } static rb_context_t * @@ -1181,11 +1390,11 @@ rb_fiberptr_blocking(struct rb_fiber_struct *fiber) return fiber->blocking; } -// This is used for root_fiber because other fibers call cont_init_mjit_cont through cont_new. +// Initialize the jit_cont_lock void -rb_fiber_init_mjit_cont(struct rb_fiber_struct *fiber) +rb_jit_cont_init(void) { - cont_init_mjit_cont(&fiber->cont); + rb_native_mutex_initialize(&jit_cont_lock); } #if 0 @@ -1208,17 +1417,14 @@ show_vm_pcs(const rb_control_frame_t *cfp, while (cfp != end_of_cfp) { int pc = 0; if (cfp->iseq) { - pc = cfp->pc - cfp->iseq->body->iseq_encoded; + pc = cfp->pc - ISEQ_BODY(cfp->iseq)->iseq_encoded; } fprintf(stderr, "%2d pc: %d\n", i++, pc); cfp = RUBY_VM_PREVIOUS_CONTROL_FRAME(cfp); } } #endif -COMPILER_WARNING_PUSH -#ifdef __clang__ -COMPILER_WARNING_IGNORED(-Wduplicate-decl-specifier) -#endif + static VALUE cont_capture(volatile int *volatile stat) { @@ -1262,7 +1468,7 @@ cont_capture(volatile int *volatile stat) entry = cont->ensure_array = ALLOC_N(rb_ensure_entry_t,size+1); for (p=th->ec->ensure_list; p; p=p->next) { if (!p->entry.marker) - p->entry.marker = rb_ary_tmp_new(0); /* dummy object */ + p->entry.marker = rb_ary_hidden_new(0); /* dummy object */ *entry++ = p->entry; } entry->marker = 0; @@ -1283,7 +1489,6 @@ cont_capture(volatile int *volatile stat) return contval; } } -COMPILER_WARNING_POP static inline void cont_restore_thread(rb_context_t *cont) @@ -1361,17 +1566,24 @@ fiber_setcontext(rb_fiber_t *new_fiber, rb_fiber_t *old_fiber) } } - /* exchange machine_stack_start between old_fiber and new_fiber */ + /* these values are used in rb_gc_mark_machine_context to mark the fiber's stack. */ old_fiber->cont.saved_ec.machine.stack_start = th->ec->machine.stack_start; + old_fiber->cont.saved_ec.machine.stack_end = FIBER_TERMINATED_P(old_fiber) ? NULL : th->ec->machine.stack_end; - /* old_fiber->machine.stack_end should be NULL */ - old_fiber->cont.saved_ec.machine.stack_end = NULL; // if (DEBUG) fprintf(stderr, "fiber_setcontext: %p[%p] -> %p[%p]\n", (void*)old_fiber, old_fiber->stack.base, (void*)new_fiber, new_fiber->stack.base); +#if defined(COROUTINE_SANITIZE_ADDRESS) + __sanitizer_start_switch_fiber(FIBER_TERMINATED_P(old_fiber) ? NULL : &old_fiber->context.fake_stack, new_fiber->context.stack_base, new_fiber->context.stack_size); +#endif + /* swap machine context */ struct coroutine_context * from = coroutine_transfer(&old_fiber->context, &new_fiber->context); +#if defined(COROUTINE_SANITIZE_ADDRESS) + __sanitizer_finish_switch_fiber(old_fiber->context.fake_stack, NULL, NULL); +#endif + if (from == NULL) { rb_syserr_fail(errno, "coroutine_transfer"); } @@ -1391,7 +1603,7 @@ cont_restore_1(rb_context_t *cont) cont_restore_thread(cont); /* restore machine stack */ -#ifdef _M_AMD64 +#if defined(_M_AMD64) && !defined(__MINGW64__) { /* workaround for x64 SEH */ jmp_buf buf; @@ -1431,6 +1643,10 @@ cont_restore_0(rb_context_t *cont, VALUE *addr_in_prev_frame) if (&space[0] > end) { # ifdef HAVE_ALLOCA volatile VALUE *sp = ALLOCA_N(VALUE, &space[0] - end); + // We need to make sure that the stack pointer is moved, + // but some compilers may remove the allocation by optimization. + // We hope that the following read/write will prevent such an optimization. + *sp = Qfalse; space[0] = *sp; # else cont_restore_0(cont, &space[0]); @@ -1556,6 +1772,13 @@ rb_callcc(VALUE self) return rb_yield(val); } } +#ifdef RUBY_ASAN_ENABLED +/* callcc can't possibly work with ASAN; see bug #20273. Also this function + * definition below avoids a "defined and not used" warning. */ +MAYBE_UNUSED(static void notusing_callcc(void)) { rb_callcc(Qnil); } +# define rb_callcc rb_f_notimplement +#endif + static VALUE make_passing_arg(int argc, const VALUE *argv) @@ -1642,7 +1865,7 @@ rollback_ensure_stack(VALUE self,rb_ensure_list_t *current,rb_ensure_entry_t *ta /* push ensure stack */ for (j = 0; j < i; j++) { func = lookup_rollback_func(target[i - j - 1].e_proc); - if ((VALUE)func != Qundef) { + if (!UNDEF_P((VALUE)func)) { (*func)(target[i - j - 1].data2); } } @@ -1767,7 +1990,7 @@ rb_cont_call(int argc, VALUE *argv, VALUE contval) * the current thread, blocking and non-blocking fibers' behavior is identical. * * Ruby doesn't provide a scheduler class: it is expected to be implemented by - * the user and correspond to Fiber::SchedulerInterface. + * the user and correspond to Fiber::Scheduler. * * There is also Fiber.schedule method, which is expected to immediately perform * the given block in a non-blocking manner. Its actual implementation is up to @@ -1802,6 +2025,7 @@ fiber_t_alloc(VALUE fiber_value, unsigned int blocking) fiber->cont.self = fiber_value; fiber->cont.type = FIBER_CONTEXT; fiber->blocking = blocking; + fiber->killed = 0; cont_init(&fiber->cont, th); fiber->cont.saved_ec.fiber_ptr = fiber; @@ -1818,11 +2042,220 @@ fiber_t_alloc(VALUE fiber_value, unsigned int blocking) return fiber; } +static rb_fiber_t * +root_fiber_alloc(rb_thread_t *th) +{ + VALUE fiber_value = fiber_alloc(rb_cFiber); + rb_fiber_t *fiber = th->ec->fiber_ptr; + + VM_ASSERT(DATA_PTR(fiber_value) == NULL); + VM_ASSERT(fiber->cont.type == FIBER_CONTEXT); + VM_ASSERT(FIBER_RESUMED_P(fiber)); + + th->root_fiber = fiber; + DATA_PTR(fiber_value) = fiber; + fiber->cont.self = fiber_value; + + coroutine_initialize_main(&fiber->context); + + return fiber; +} + +static inline rb_fiber_t* +fiber_current(void) +{ + rb_execution_context_t *ec = GET_EC(); + if (ec->fiber_ptr->cont.self == 0) { + root_fiber_alloc(rb_ec_thread_ptr(ec)); + } + return ec->fiber_ptr; +} + +static inline VALUE +current_fiber_storage(void) +{ + rb_execution_context_t *ec = GET_EC(); + return ec->storage; +} + +static inline VALUE +inherit_fiber_storage(void) +{ + return rb_obj_dup(current_fiber_storage()); +} + +static inline void +fiber_storage_set(struct rb_fiber_struct *fiber, VALUE storage) +{ + fiber->cont.saved_ec.storage = storage; +} + +static inline VALUE +fiber_storage_get(rb_fiber_t *fiber, int allocate) +{ + VALUE storage = fiber->cont.saved_ec.storage; + if (storage == Qnil && allocate) { + storage = rb_hash_new(); + fiber_storage_set(fiber, storage); + } + return storage; +} + +static void +storage_access_must_be_from_same_fiber(VALUE self) +{ + rb_fiber_t *fiber = fiber_ptr(self); + rb_fiber_t *current = fiber_current(); + if (fiber != current) { + rb_raise(rb_eArgError, "Fiber storage can only be accessed from the Fiber it belongs to"); + } +} + +/** + * call-seq: fiber.storage -> hash (dup) + * + * Returns a copy of the storage hash for the fiber. The method can only be called on the + * Fiber.current. + */ +static VALUE +rb_fiber_storage_get(VALUE self) +{ + storage_access_must_be_from_same_fiber(self); + + VALUE storage = fiber_storage_get(fiber_ptr(self), FALSE); + + if (storage == Qnil) { + return Qnil; + } + else { + return rb_obj_dup(storage); + } +} + +static int +fiber_storage_validate_each(VALUE key, VALUE value, VALUE _argument) +{ + Check_Type(key, T_SYMBOL); + + return ST_CONTINUE; +} + +static void +fiber_storage_validate(VALUE value) +{ + // nil is an allowed value and will be lazily initialized. + if (value == Qnil) return; + + if (!RB_TYPE_P(value, T_HASH)) { + rb_raise(rb_eTypeError, "storage must be a hash"); + } + + if (RB_OBJ_FROZEN(value)) { + rb_raise(rb_eFrozenError, "storage must not be frozen"); + } + + rb_hash_foreach(value, fiber_storage_validate_each, Qundef); +} + +/** + * call-seq: fiber.storage = hash + * + * Sets the storage hash for the fiber. This feature is experimental + * and may change in the future. The method can only be called on the + * Fiber.current. + * + * You should be careful about using this method as you may inadvertently clear + * important fiber-storage state. You should mostly prefer to assign specific + * keys in the storage using Fiber::[]=. + * + * You can also use <tt>Fiber.new(storage: nil)</tt> to create a fiber with an empty + * storage. + * + * Example: + * + * while request = request_queue.pop + * # Reset the per-request state: + * Fiber.current.storage = nil + * handle_request(request) + * end + */ static VALUE -fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool, unsigned int blocking) +rb_fiber_storage_set(VALUE self, VALUE value) { + if (rb_warning_category_enabled_p(RB_WARN_CATEGORY_EXPERIMENTAL)) { + rb_category_warn(RB_WARN_CATEGORY_EXPERIMENTAL, + "Fiber#storage= is experimental and may be removed in the future!"); + } + + storage_access_must_be_from_same_fiber(self); + fiber_storage_validate(value); + + fiber_ptr(self)->cont.saved_ec.storage = rb_obj_dup(value); + return value; +} + +/** + * call-seq: Fiber[key] -> value + * + * Returns the value of the fiber storage variable identified by +key+. + * + * The +key+ must be a symbol, and the value is set by Fiber#[]= or + * Fiber#store. + * + * See also Fiber::[]=. + */ +static VALUE +rb_fiber_storage_aref(VALUE class, VALUE key) +{ + Check_Type(key, T_SYMBOL); + + VALUE storage = fiber_storage_get(fiber_current(), FALSE); + if (storage == Qnil) return Qnil; + + return rb_hash_aref(storage, key); +} + +/** + * call-seq: Fiber[key] = value + * + * Assign +value+ to the fiber storage variable identified by +key+. + * The variable is created if it doesn't exist. + * + * +key+ must be a Symbol, otherwise a TypeError is raised. + * + * See also Fiber::[]. + */ +static VALUE +rb_fiber_storage_aset(VALUE class, VALUE key, VALUE value) +{ + Check_Type(key, T_SYMBOL); + + VALUE storage = fiber_storage_get(fiber_current(), value != Qnil); + if (storage == Qnil) return Qnil; + + if (value == Qnil) { + return rb_hash_delete(storage, key); + } + else { + return rb_hash_aset(storage, key, value); + } +} + +static VALUE +fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool, unsigned int blocking, VALUE storage) +{ + if (storage == Qundef || storage == Qtrue) { + // The default, inherit storage (dup) from the current fiber: + storage = inherit_fiber_storage(); + } + else /* nil, hash, etc. */ { + fiber_storage_validate(storage); + storage = rb_obj_dup(storage); + } + rb_fiber_t *fiber = fiber_t_alloc(self, blocking); + fiber->cont.saved_ec.storage = storage; fiber->first_proc = proc; fiber->stack.base = NULL; fiber->stack.pool = fiber_pool; @@ -1855,54 +2288,90 @@ rb_fiber_pool_default(VALUE pool) return &shared_fiber_pool; } +VALUE rb_fiber_inherit_storage(struct rb_execution_context_struct *ec, struct rb_fiber_struct *fiber) +{ + VALUE storage = rb_obj_dup(ec->storage); + fiber->cont.saved_ec.storage = storage; + return storage; +} + /* :nodoc: */ static VALUE rb_fiber_initialize_kw(int argc, VALUE* argv, VALUE self, int kw_splat) { VALUE pool = Qnil; VALUE blocking = Qfalse; + VALUE storage = Qundef; if (kw_splat != RB_NO_KEYWORDS) { VALUE options = Qnil; - VALUE arguments[2] = {Qundef}; + VALUE arguments[3] = {Qundef}; argc = rb_scan_args_kw(kw_splat, argc, argv, ":", &options); - rb_get_kwargs(options, fiber_initialize_keywords, 0, 2, arguments); + rb_get_kwargs(options, fiber_initialize_keywords, 0, 3, arguments); - if (arguments[0] != Qundef) { + if (!UNDEF_P(arguments[0])) { blocking = arguments[0]; } - if (arguments[1] != Qundef) { + if (!UNDEF_P(arguments[1])) { pool = arguments[1]; } + + storage = arguments[2]; } - return fiber_initialize(self, rb_block_proc(), rb_fiber_pool_default(pool), RTEST(blocking)); + return fiber_initialize(self, rb_block_proc(), rb_fiber_pool_default(pool), RTEST(blocking), storage); } /* * call-seq: - * Fiber.new(blocking: false) { |*args| ... } -> fiber + * Fiber.new(blocking: false, storage: true) { |*args| ... } -> fiber * - * Creates new Fiber. Initially, the fiber is not running and can be resumed with - * #resume. Arguments to the first #resume call will be passed to the block: + * Creates new Fiber. Initially, the fiber is not running and can be resumed + * with #resume. Arguments to the first #resume call will be passed to the + * block: * - * f = Fiber.new do |initial| - * current = initial - * loop do - * puts "current: #{current.inspect}" - * current = Fiber.yield - * end - * end - * f.resume(100) # prints: current: 100 - * f.resume(1, 2, 3) # prints: current: [1, 2, 3] - * f.resume # prints: current: nil - * # ... and so on ... - * - * If <tt>blocking: false</tt> is passed to <tt>Fiber.new</tt>, _and_ current thread - * has a Fiber.scheduler defined, the Fiber becomes non-blocking (see "Non-blocking - * Fibers" section in class docs). + * f = Fiber.new do |initial| + * current = initial + * loop do + * puts "current: #{current.inspect}" + * current = Fiber.yield + * end + * end + * f.resume(100) # prints: current: 100 + * f.resume(1, 2, 3) # prints: current: [1, 2, 3] + * f.resume # prints: current: nil + * # ... and so on ... + * + * If <tt>blocking: false</tt> is passed to <tt>Fiber.new</tt>, _and_ current + * thread has a Fiber.scheduler defined, the Fiber becomes non-blocking (see + * "Non-blocking Fibers" section in class docs). + * + * If the <tt>storage</tt> is unspecified, the default is to inherit a copy of + * the storage from the current fiber. This is the same as specifying + * <tt>storage: true</tt>. + * + * Fiber[:x] = 1 + * Fiber.new do + * Fiber[:x] # => 1 + * Fiber[:x] = 2 + * end.resume + * Fiber[:x] # => 1 + * + * If the given <tt>storage</tt> is <tt>nil</tt>, this function will lazy + * initialize the internal storage, which starts as an empty hash. + * + * Fiber[:x] = "Hello World" + * Fiber.new(storage: nil) do + * Fiber[:x] # nil + * end + * + * Otherwise, the given <tt>storage</tt> is used as the new fiber's storage, + * and it must be an instance of Hash. + * + * Explicitly using <tt>storage: true</tt> is currently experimental and may + * change in the future. */ static VALUE rb_fiber_initialize(int argc, VALUE* argv, VALUE self) @@ -1911,9 +2380,15 @@ rb_fiber_initialize(int argc, VALUE* argv, VALUE self) } VALUE +rb_fiber_new_storage(rb_block_call_func_t func, VALUE obj, VALUE storage) +{ + return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), rb_fiber_pool_default(Qnil), 0, storage); +} + +VALUE rb_fiber_new(rb_block_call_func_t func, VALUE obj) { - return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), rb_fiber_pool_default(Qnil), 1); + return rb_fiber_new_storage(func, obj, Qtrue); } static VALUE @@ -1924,7 +2399,7 @@ rb_fiber_s_schedule_kw(int argc, VALUE* argv, int kw_splat) VALUE fiber = Qnil; if (scheduler != Qnil) { - fiber = rb_funcall_passing_block_kw(scheduler, rb_intern("fiber"), argc, argv, kw_splat); + fiber = rb_fiber_scheduler_fiber(scheduler, argc, argv, kw_splat); } else { rb_raise(rb_eRuntimeError, "No scheduler is available!"); @@ -1967,7 +2442,7 @@ rb_fiber_s_schedule_kw(int argc, VALUE* argv, int kw_splat) * * Note that the behavior described above is how the method is <em>expected</em> * to behave, actual behavior is up to the current scheduler's implementation of - * Fiber::SchedulerInterface#fiber method. Ruby doesn't enforce this method to + * Fiber::Scheduler#fiber method. Ruby doesn't enforce this method to * behave in any particular way. * * If the scheduler is not set, the method raises @@ -1986,7 +2461,7 @@ rb_fiber_s_schedule(int argc, VALUE *argv, VALUE obj) * * Returns the Fiber scheduler, that was last set for the current thread with Fiber.set_scheduler. * Returns +nil+ if no scheduler is set (which is the default), and non-blocking fibers' - # behavior is the same as blocking. + * behavior is the same as blocking. * (see "Non-blocking fibers" section in class docs for details about the scheduler concept). * */ @@ -2020,7 +2495,7 @@ rb_fiber_current_scheduler(VALUE klass) * thread will call scheduler's +close+ method on finalization (allowing the scheduler to * properly manage all non-finished fibers). * - * +scheduler+ can be an object of any class corresponding to Fiber::SchedulerInterface. Its + * +scheduler+ can be an object of any class corresponding to Fiber::Scheduler. Its * implementation is up to the user. * * See also the "Non-blocking fibers" section in class docs. @@ -2032,7 +2507,7 @@ rb_fiber_set_scheduler(VALUE klass, VALUE scheduler) return rb_fiber_scheduler_set(scheduler); } -static void rb_fiber_terminate(rb_fiber_t *fiber, int need_interrupt, VALUE err); +NORETURN(static void rb_fiber_terminate(rb_fiber_t *fiber, int need_interrupt, VALUE err)); void rb_fiber_start(rb_fiber_t *fiber) @@ -2041,7 +2516,6 @@ rb_fiber_start(rb_fiber_t *fiber) rb_proc_t *proc; enum ruby_tag_type state; - int need_interrupt = TRUE; VM_ASSERT(th->ec == GET_EC()); VM_ASSERT(FIBER_RESUMED_P(fiber)); @@ -2067,6 +2541,7 @@ rb_fiber_start(rb_fiber_t *fiber) } EC_POP_TAG(); + int need_interrupt = TRUE; VALUE err = Qfalse; if (state) { err = th->ec->errinfo; @@ -2075,54 +2550,37 @@ rb_fiber_start(rb_fiber_t *fiber) if (state == TAG_RAISE) { // noop... } + else if (state == TAG_FATAL && err == RUBY_FATAL_FIBER_KILLED) { + need_interrupt = FALSE; + err = Qfalse; + } else if (state == TAG_FATAL) { rb_threadptr_pending_interrupt_enque(th, err); } else { err = rb_vm_make_jump_tag_but_local_jump(state, err); } - need_interrupt = TRUE; } rb_fiber_terminate(fiber, need_interrupt, err); } -static rb_fiber_t * -root_fiber_alloc(rb_thread_t *th) -{ - VALUE fiber_value = fiber_alloc(rb_cFiber); - rb_fiber_t *fiber = th->ec->fiber_ptr; - - VM_ASSERT(DATA_PTR(fiber_value) == NULL); - VM_ASSERT(fiber->cont.type == FIBER_CONTEXT); - VM_ASSERT(fiber->status == FIBER_RESUMED); - - th->root_fiber = fiber; - DATA_PTR(fiber_value) = fiber; - fiber->cont.self = fiber_value; - - coroutine_initialize_main(&fiber->context); - - return fiber; -} - +// Set up a "root fiber", which is the fiber that every Ractor has. void rb_threadptr_root_fiber_setup(rb_thread_t *th) { - rb_fiber_t *fiber = ruby_mimmalloc(sizeof(rb_fiber_t)); + rb_fiber_t *fiber = ruby_mimcalloc(1, sizeof(rb_fiber_t)); if (!fiber) { rb_bug("%s", strerror(errno)); /* ... is it possible to call rb_bug here? */ } - MEMZERO(fiber, rb_fiber_t, 1); fiber->cont.type = FIBER_CONTEXT; fiber->cont.saved_ec.fiber_ptr = fiber; fiber->cont.saved_ec.thread_ptr = th; fiber->blocking = 1; + fiber->killed = 0; fiber_status_set(fiber, FIBER_RESUMED); /* skip CREATED */ th->ec = &fiber->cont.saved_ec; - // This skips mjit_cont_new for the initial thread because mjit_enabled is always false - // at this point. mjit_init calls rb_fiber_init_mjit_cont again for this root_fiber. - rb_fiber_init_mjit_cont(fiber); + cont_init_jit_cont(&fiber->cont); } void @@ -2132,12 +2590,12 @@ rb_threadptr_root_fiber_release(rb_thread_t *th) /* ignore. A root fiber object will free th->ec */ } else { - rb_execution_context_t *ec = GET_EC(); + rb_execution_context_t *ec = rb_current_execution_context(false); VM_ASSERT(th->ec->fiber_ptr->cont.type == FIBER_CONTEXT); VM_ASSERT(th->ec->fiber_ptr->cont.self == 0); - if (th->ec == ec) { + if (ec && th->ec == ec) { rb_ractor_set_current_ec(th->ractor, NULL); } fiber_free(th->ec->fiber_ptr); @@ -2157,16 +2615,6 @@ rb_threadptr_root_fiber_terminate(rb_thread_t *th) } static inline rb_fiber_t* -fiber_current(void) -{ - rb_execution_context_t *ec = GET_EC(); - if (ec->fiber_ptr->cont.self == 0) { - root_fiber_alloc(rb_ec_thread_ptr(ec)); - } - return ec->fiber_ptr; -} - -static inline rb_fiber_t* return_fiber(bool terminate) { rb_fiber_t *fiber = fiber_current(); @@ -2228,6 +2676,19 @@ fiber_store(rb_fiber_t *next_fiber, rb_thread_t *th) fiber_setcontext(next_fiber, fiber); } +static void +fiber_check_killed(rb_fiber_t *fiber) +{ + VM_ASSERT(fiber == fiber_current()); + + if (fiber->killed) { + rb_thread_t *thread = fiber->cont.saved_ec.thread_ptr; + + thread->ec->errinfo = RUBY_FATAL_FIBER_KILLED; + EC_JUMP_TAG(thread->ec, RUBY_TAG_FATAL); + } +} + static inline VALUE fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int kw_splat, rb_fiber_t *resuming_fiber, bool yielding) { @@ -2316,7 +2777,14 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int kw_splat, rb_fi current_fiber = th->ec->fiber_ptr; value = current_fiber->cont.value; - if (current_fiber->cont.argc == -1) rb_exc_raise(value); + + fiber_check_killed(current_fiber); + + if (current_fiber->cont.argc == -1) { + // Fiber#raise will trigger this path. + rb_exc_raise(value); + } + return value; } @@ -2343,7 +2811,62 @@ rb_fiber_transfer(VALUE fiber_value, int argc, const VALUE *argv) VALUE rb_fiber_blocking_p(VALUE fiber) { - return RBOOL(fiber_ptr(fiber)->blocking != 0); + return RBOOL(fiber_ptr(fiber)->blocking); +} + +static VALUE +fiber_blocking_yield(VALUE fiber_value) +{ + rb_fiber_t *fiber = fiber_ptr(fiber_value); + rb_thread_t * volatile th = fiber->cont.saved_ec.thread_ptr; + + VM_ASSERT(fiber->blocking == 0); + + // fiber->blocking is `unsigned int : 1`, so we use it as a boolean: + fiber->blocking = 1; + + // Once the fiber is blocking, and current, we increment the thread blocking state: + th->blocking += 1; + + return rb_yield(fiber_value); +} + +static VALUE +fiber_blocking_ensure(VALUE fiber_value) +{ + rb_fiber_t *fiber = fiber_ptr(fiber_value); + rb_thread_t * volatile th = fiber->cont.saved_ec.thread_ptr; + + // We are no longer blocking: + fiber->blocking = 0; + th->blocking -= 1; + + return Qnil; +} + +/* + * call-seq: + * Fiber.blocking{|fiber| ...} -> result + * + * Forces the fiber to be blocking for the duration of the block. Returns the + * result of the block. + * + * See the "Non-blocking fibers" section in class docs for details. + * + */ +VALUE +rb_fiber_blocking(VALUE class) +{ + VALUE fiber_value = rb_fiber_current(); + rb_fiber_t *fiber = fiber_ptr(fiber_value); + + // If we are already blocking, this is essentially a no-op: + if (fiber->blocking) { + return rb_yield(fiber_value); + } + else { + return rb_ensure(fiber_blocking_yield, fiber_value, fiber_blocking_ensure, fiber_value); + } } /* @@ -2401,6 +2924,7 @@ rb_fiber_terminate(rb_fiber_t *fiber, int need_interrupt, VALUE error) fiber_switch(next_fiber, -1, &error, RB_NO_KEYWORDS, NULL, false); else fiber_switch(next_fiber, 1, &value, RB_NO_KEYWORDS, NULL, false); + ruby_stop(0); } static VALUE @@ -2428,9 +2952,7 @@ fiber_resume_kw(rb_fiber_t *fiber, int argc, const VALUE *argv, int kw_splat) rb_raise(rb_eFiberError, "attempt to resume a transferring fiber"); } - VALUE result = fiber_switch(fiber, argc, argv, kw_splat, fiber, false); - - return result; + return fiber_switch(fiber, argc, argv, kw_splat, fiber, false); } VALUE @@ -2476,7 +2998,7 @@ rb_fiber_reset_root_local_storage(rb_thread_t *th) VALUE rb_fiber_alive_p(VALUE fiber_value) { - return FIBER_TERMINATED_P(fiber_ptr(fiber_value)) ? Qfalse : Qtrue; + return RBOOL(!FIBER_TERMINATED_P(fiber_ptr(fiber_value))); } /* @@ -2702,12 +3224,13 @@ rb_fiber_s_yield(int argc, VALUE *argv, VALUE klass) } static VALUE -fiber_raise(rb_fiber_t *fiber, int argc, const VALUE *argv) +fiber_raise(rb_fiber_t *fiber, VALUE exception) { - VALUE exception = rb_make_exception(argc, argv); - - if (fiber->resuming_fiber) { - rb_raise(rb_eFiberError, "attempt to raise a resuming fiber"); + if (fiber == fiber_current()) { + rb_exc_raise(exception); + } + else if (fiber->resuming_fiber) { + return fiber_raise(fiber->resuming_fiber, exception); } else if (FIBER_SUSPENDED_P(fiber) && !fiber->yielding) { return fiber_transfer_kw(fiber, -1, &exception, RB_NO_KEYWORDS); @@ -2720,7 +3243,9 @@ fiber_raise(rb_fiber_t *fiber, int argc, const VALUE *argv) VALUE rb_fiber_raise(VALUE fiber, int argc, const VALUE *argv) { - return fiber_raise(fiber_ptr(fiber), argc, argv); + VALUE exception = rb_make_exception(argc, argv); + + return fiber_raise(fiber_ptr(fiber), exception); } /* @@ -2743,6 +3268,10 @@ rb_fiber_raise(VALUE fiber, int argc, const VALUE *argv) * the exception, and the third parameter is an array of callback information. * Exceptions are caught by the +rescue+ clause of <code>begin...end</code> * blocks. + * + * Raises +FiberError+ if called on a Fiber belonging to another +Thread+. + * + * See Kernel#raise for more information. */ static VALUE rb_fiber_m_raise(int argc, VALUE *argv, VALUE self) @@ -2752,6 +3281,46 @@ rb_fiber_m_raise(int argc, VALUE *argv, VALUE self) /* * call-seq: + * fiber.kill -> nil + * + * Terminates the fiber by raising an uncatchable exception. + * It only terminates the given fiber and no other fiber, returning +nil+ to + * another fiber if that fiber was calling #resume or #transfer. + * + * <tt>Fiber#kill</tt> only interrupts another fiber when it is in Fiber.yield. + * If called on the current fiber then it raises that exception at the <tt>Fiber#kill</tt> call site. + * + * If the fiber has not been started, transition directly to the terminated state. + * + * If the fiber is already terminated, does nothing. + * + * Raises FiberError if called on a fiber belonging to another thread. + */ +static VALUE +rb_fiber_m_kill(VALUE self) +{ + rb_fiber_t *fiber = fiber_ptr(self); + + if (fiber->killed) return Qfalse; + fiber->killed = 1; + + if (fiber->status == FIBER_CREATED) { + fiber->status = FIBER_TERMINATED; + } + else if (fiber->status != FIBER_TERMINATED) { + if (fiber_current() == fiber) { + fiber_check_killed(fiber); + } + else { + fiber_raise(fiber_ptr(self), Qnil); + } + } + + return self; +} + +/* + * call-seq: * Fiber.current -> fiber * * Returns the current fiber. If you are not running in the context of @@ -2808,7 +3377,7 @@ fiber_pool_free(void *ptr) struct fiber_pool * fiber_pool = ptr; RUBY_FREE_ENTER("fiber_pool"); - fiber_pool_free_allocations(fiber_pool->allocations); + fiber_pool_allocation_free(fiber_pool->allocations); ruby_xfree(fiber_pool); RUBY_FREE_LEAVE("fiber_pool"); @@ -2834,9 +3403,9 @@ static const rb_data_type_t FiberPoolDataType = { static VALUE fiber_pool_alloc(VALUE klass) { - struct fiber_pool * fiber_pool = RB_ALLOC(struct fiber_pool); + struct fiber_pool *fiber_pool; - return TypedData_Wrap_Struct(klass, &FiberPoolDataType, fiber_pool); + return TypedData_Make_Struct(klass, struct fiber_pool, &FiberPoolDataType, fiber_pool); } static VALUE @@ -2850,7 +3419,7 @@ rb_fiber_pool_initialize(int argc, VALUE* argv, VALUE self) rb_scan_args(argc, argv, "03", &size, &count, &vm_stack_size); if (NIL_P(size)) { - size = INT2NUM(th->vm->default_params.fiber_machine_stack_size); + size = SIZET2NUM(th->vm->default_params.fiber_machine_stack_size); } if (NIL_P(count)) { @@ -2858,7 +3427,7 @@ rb_fiber_pool_initialize(int argc, VALUE* argv, VALUE self) } if (NIL_P(vm_stack_size)) { - vm_stack_size = INT2NUM(th->vm->default_params.fiber_vm_stack_size); + vm_stack_size = SIZET2NUM(th->vm->default_params.fiber_vm_stack_size); } TypedData_Get_Struct(self, struct fiber_pool, &FiberPoolDataType, fiber_pool); @@ -2882,193 +3451,6 @@ rb_fiber_pool_initialize(int argc, VALUE* argv, VALUE self) * fiber.resume #=> FiberError: dead fiber called */ -/* - * Document-class: Fiber::SchedulerInterface - * - * This is not an existing class, but documentation of the interface that Scheduler - * object should comply to in order to be used as argument to Fiber.scheduler and handle non-blocking - * fibers. See also the "Non-blocking fibers" section in Fiber class docs for explanations - * of some concepts. - * - * Scheduler's behavior and usage are expected to be as follows: - * - * * When the execution in the non-blocking Fiber reaches some blocking operation (like - * sleep, wait for a process, or a non-ready I/O), it calls some of the scheduler's - * hook methods, listed below. - * * Scheduler somehow registers what the current fiber is waiting on, and yields control - * to other fibers with Fiber.yield (so the fiber would be suspended while expecting its - * wait to end, and other fibers in the same thread can perform) - * * At the end of the current thread execution, the scheduler's method #close is called - * * The scheduler runs into a wait loop, checking all the blocked fibers (which it has - * registered on hook calls) and resuming them when the awaited resource is ready - * (e.g. I/O ready or sleep time elapsed). - * - * A typical implementation would probably rely for this closing loop on a gem like - * EventMachine[https://github.com/eventmachine/eventmachine] or - * Async[https://github.com/socketry/async]. - * - * This way concurrent execution will be achieved transparently for every - * individual Fiber's code. - * - * Hook methods are: - * - * * #io_wait - * * #process_wait - * * #kernel_sleep - * * #block and #unblock - * * (the list is expanded as Ruby developers make more methods having non-blocking calls) - * - * When not specified otherwise, the hook implementations are mandatory: if they are not - * implemented, the methods trying to call hook will fail. To provide backward compatibility, - * in the future hooks will be optional (if they are not implemented, due to the scheduler - * being created for the older Ruby version, the code which needs this hook will not fail, - * and will just behave in a blocking fashion). - * - * It is also strongly recommended that the scheduler implements the #fiber method, which is - * delegated to by Fiber.schedule. - * - * Sample _toy_ implementation of the scheduler can be found in Ruby's code, in - * <tt>test/fiber/scheduler.rb</tt> - * - */ - -#if 0 /* for RDoc */ -/* - * - * Document-method: Fiber::SchedulerInterface#close - * - * Called when the current thread exits. The scheduler is expected to implement this - * method in order to allow all waiting fibers to finalize their execution. - * - * The suggested pattern is to implement the main event loop in the #close method. - * - */ -static VALUE -rb_fiber_scheduler_interface_close(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#process_wait - * call-seq: process_wait(pid, flags) - * - * Invoked by Process::Status.wait in order to wait for a specified process. - * See that method description for arguments description. - * - * Suggested minimal implementation: - * - * Thread.new do - * Process::Status.wait(pid, flags) - * end.value - * - * This hook is optional: if it is not present in the current scheduler, - * Process::Status.wait will behave as a blocking method. - * - * Expected to return a Process::Status instance. - */ -static VALUE -rb_fiber_scheduler_interface_process_wait(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#io_wait - * call-seq: io_wait(io, events, timeout) - * - * Invoked by IO#wait, IO#wait_readable, IO#wait_writable to ask whether the - * specified descriptor is ready for specified events within - * the specified +timeout+. - * - * +events+ is a bit mask of <tt>IO::READABLE</tt>, <tt>IO::WRITABLE</tt>, and - * <tt>IO::PRIORITY</tt>. - * - * Suggested implementation should register which Fiber is waiting for which - * resources and immediately calling Fiber.yield to pass control to other - * fibers. Then, in the #close method, the scheduler might dispatch all the - * I/O resources to fibers waiting for it. - * - * Expected to return the subset of events that are ready immediately. - * - */ -static VALUE -rb_fiber_scheduler_interface_io_wait(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#kernel_sleep - * call-seq: kernel_sleep(duration = nil) - * - * Invoked by Kernel#sleep and Mutex#sleep and is expected to provide - * an implementation of sleeping in a non-blocking way. Implementation might - * register the current fiber in some list of "which fiber wait until what - * moment", call Fiber.yield to pass control, and then in #close resume - * the fibers whose wait period has elapsed. - * - */ -static VALUE -rb_fiber_scheduler_interface_kernel_sleep(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#block - * call-seq: block(blocker, timeout = nil) - * - * Invoked by methods like Thread.join, and by Mutex, to signify that current - * Fiber is blocked until further notice (e.g. #unblock) or until +timeout+ has - * elapsed. - * - * +blocker+ is what we are waiting on, informational only (for debugging and - * logging). There are no guarantee about its value. - * - * Expected to return boolean, specifying whether the blocking operation was - * successful or not. - */ -static VALUE -rb_fiber_scheduler_interface_block(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#unblock - * call-seq: unblock(blocker, fiber) - * - * Invoked to wake up Fiber previously blocked with #block (for example, Mutex#lock - * calls #block and Mutex#unlock calls #unblock). The scheduler should use - * the +fiber+ parameter to understand which fiber is unblocked. - * - * +blocker+ is what was awaited for, but it is informational only (for debugging - * and logging), and it is not guaranteed to be the same value as the +blocker+ for - * #block. - * - */ -static VALUE -rb_fiber_scheduler_interface_unblock(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#fiber - * call-seq: fiber(&block) - * - * Implementation of the Fiber.schedule. The method is <em>expected</em> to immediately - * run the given block of code in a separate non-blocking fiber, and to return that Fiber. - * - * Minimal suggested implementation is: - * - * def fiber(&block) - * fiber = Fiber.new(blocking: false, &block) - * fiber.resume - * fiber - * end - */ -static VALUE -rb_fiber_scheduler_interface_fiber(VALUE self) -{ -} -#endif - void Init_Cont(void) { @@ -3090,10 +3472,20 @@ Init_Cont(void) fiber_initialize_keywords[0] = rb_intern_const("blocking"); fiber_initialize_keywords[1] = rb_intern_const("pool"); + fiber_initialize_keywords[2] = rb_intern_const("storage"); const char *fiber_shared_fiber_pool_free_stacks = getenv("RUBY_SHARED_FIBER_POOL_FREE_STACKS"); if (fiber_shared_fiber_pool_free_stacks) { shared_fiber_pool.free_stacks = atoi(fiber_shared_fiber_pool_free_stacks); + + if (shared_fiber_pool.free_stacks < 0) { + rb_warn("Setting RUBY_SHARED_FIBER_POOL_FREE_STACKS to a negative value is not allowed."); + shared_fiber_pool.free_stacks = 0; + } + + if (shared_fiber_pool.free_stacks > 1) { + rb_warn("Setting RUBY_SHARED_FIBER_POOL_FREE_STACKS to a value greater than 1 is operating system specific, and may cause crashes."); + } } rb_cFiber = rb_define_class("Fiber", rb_cObject); @@ -3101,10 +3493,17 @@ Init_Cont(void) rb_eFiberError = rb_define_class("FiberError", rb_eStandardError); rb_define_singleton_method(rb_cFiber, "yield", rb_fiber_s_yield, -1); rb_define_singleton_method(rb_cFiber, "current", rb_fiber_s_current, 0); + rb_define_singleton_method(rb_cFiber, "blocking", rb_fiber_blocking, 0); + rb_define_singleton_method(rb_cFiber, "[]", rb_fiber_storage_aref, 1); + rb_define_singleton_method(rb_cFiber, "[]=", rb_fiber_storage_aset, 2); + rb_define_method(rb_cFiber, "initialize", rb_fiber_initialize, -1); rb_define_method(rb_cFiber, "blocking?", rb_fiber_blocking_p, 0); + rb_define_method(rb_cFiber, "storage", rb_fiber_storage_get, 0); + rb_define_method(rb_cFiber, "storage=", rb_fiber_storage_set, 1); rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1); rb_define_method(rb_cFiber, "raise", rb_fiber_m_raise, -1); + rb_define_method(rb_cFiber, "kill", rb_fiber_m_kill, 0); rb_define_method(rb_cFiber, "backtrace", rb_fiber_backtrace, -1); rb_define_method(rb_cFiber, "backtrace_locations", rb_fiber_backtrace_locations, -1); rb_define_method(rb_cFiber, "to_s", fiber_to_s, 0); @@ -3119,19 +3518,8 @@ Init_Cont(void) rb_define_singleton_method(rb_cFiber, "schedule", rb_fiber_s_schedule, -1); -#if 0 /* for RDoc */ - rb_cFiberScheduler = rb_define_class_under(rb_cFiber, "SchedulerInterface", rb_cObject); - rb_define_method(rb_cFiberScheduler, "close", rb_fiber_scheduler_interface_close, 0); - rb_define_method(rb_cFiberScheduler, "process_wait", rb_fiber_scheduler_interface_process_wait, 0); - rb_define_method(rb_cFiberScheduler, "io_wait", rb_fiber_scheduler_interface_io_wait, 0); - rb_define_method(rb_cFiberScheduler, "kernel_sleep", rb_fiber_scheduler_interface_kernel_sleep, 0); - rb_define_method(rb_cFiberScheduler, "block", rb_fiber_scheduler_interface_block, 0); - rb_define_method(rb_cFiberScheduler, "unblock", rb_fiber_scheduler_interface_unblock, 0); - rb_define_method(rb_cFiberScheduler, "fiber", rb_fiber_scheduler_interface_fiber, 0); -#endif - #ifdef RB_EXPERIMENTAL_FIBER_POOL - rb_cFiberPool = rb_define_class("Pool", rb_cFiber); + rb_cFiberPool = rb_define_class_under(rb_cFiber, "Pool", rb_cObject); rb_define_alloc_func(rb_cFiberPool, fiber_pool_alloc); rb_define_method(rb_cFiberPool, "initialize", rb_fiber_pool_initialize, -1); #endif |