diff options
Diffstat (limited to 'cont.c')
| -rw-r--r-- | cont.c | 1909 |
1 files changed, 1254 insertions, 655 deletions
@@ -16,27 +16,45 @@ #include <sys/mman.h> #endif +// On Solaris, madvise() is NOT declared for SUS (XPG4v2) or later, +// but MADV_* macros are defined when __EXTENSIONS__ is defined. +#ifdef NEED_MADVICE_PROTOTYPE_USING_CADDR_T +#include <sys/types.h> +extern int madvise(caddr_t, size_t, int); +#endif + #include COROUTINE_H #include "eval_intern.h" -#include "gc.h" #include "internal.h" #include "internal/cont.h" +#include "internal/thread.h" +#include "internal/error.h" +#include "internal/eval.h" +#include "internal/gc.h" #include "internal/proc.h" +#include "internal/sanitizers.h" #include "internal/warnings.h" #include "ruby/fiber/scheduler.h" -#include "mjit.h" +#include "yjit.h" #include "vm_core.h" +#include "vm_sync.h" #include "id_table.h" #include "ractor_core.h" +#include "zjit.h" -static const int DEBUG = 0; +enum { + DEBUG = 0, + DEBUG_EXPAND = 0, + DEBUG_ACQUIRE = 0, +}; #define RB_PAGE_SIZE (pagesize) #define RB_PAGE_MASK (~(RB_PAGE_SIZE - 1)) static long pagesize; -static const rb_data_type_t cont_data_type, fiber_data_type; +static const rb_data_type_t rb_cont_data_type; +static const rb_data_type_t rb_fiber_data_type; static VALUE rb_cContinuation; static VALUE rb_cFiber; static VALUE rb_eFiberError; @@ -49,11 +67,14 @@ static VALUE rb_cFiberPool; // Defined in `coroutine/$arch/Context.h`: #ifdef COROUTINE_LIMITED_ADDRESS_SPACE #define FIBER_POOL_ALLOCATION_FREE -#define FIBER_POOL_INITIAL_SIZE 8 -#define FIBER_POOL_ALLOCATION_MAXIMUM_SIZE 32 +#define FIBER_POOL_MINIMUM_COUNT 8 +#define FIBER_POOL_MAXIMUM_ALLOCATIONS 32 #else -#define FIBER_POOL_INITIAL_SIZE 32 -#define FIBER_POOL_ALLOCATION_MAXIMUM_SIZE 1024 +#define FIBER_POOL_MINIMUM_COUNT 32 +#define FIBER_POOL_MAXIMUM_ALLOCATIONS 1024 +#endif +#ifdef RB_EXPERIMENTAL_FIBER_POOL +#define FIBER_POOL_ALLOCATION_FREE #endif enum context_type { @@ -63,6 +84,7 @@ enum context_type { struct cont_saved_vm_stack { VALUE *ptr; + size_t size; #ifdef CAPTURE_JUST_VALID_VM_STACK size_t slen; /* length of stack (head of ec->vm_stack) */ size_t clen; /* length of control frames (tail of ec->vm_stack) */ @@ -162,7 +184,7 @@ struct fiber_pool { // A singly-linked list of allocations which contain 1 or more stacks each. struct fiber_pool_allocation * allocations; - // Provides O(1) stack "allocation": + // Free list that provides O(1) stack "allocation". struct fiber_pool_vacancy * vacancies; // The size of the stack allocations (excluding any guard page). @@ -172,18 +194,33 @@ struct fiber_pool { size_t count; // The initial number of stacks to allocate. - size_t initial_count; + size_t minimum_count; + + // If positive, total stacks in this pool cannot exceed this (shared pool only: + // set via RUBY_SHARED_FIBER_POOL_MAXIMUM_COUNT). Expansion fails with errno EAGAIN. + size_t maximum_count; - // Whether to madvise(free) the stack or not: + // Whether to madvise(free) the stack or not. + // If this value is set to 1, the stack will be madvise(free)ed + // (or equivalent), where possible, when it is returned to the pool. int free_stacks; // The number of stacks that have been used in this pool. size_t used; - // The amount to allocate for the vm_stack: + // The amount to allocate for the vm_stack. size_t vm_stack_size; }; +// Continuation contexts used by JITs +struct rb_jit_cont { + rb_execution_context_t *ec; // continuation ec + struct rb_jit_cont *prev, *next; // used to form lists +}; + +// Doubly linked list for enumerating all on-stack ISEQs. +static struct rb_jit_cont *first_jit_cont; + typedef struct rb_context_struct { enum context_type type; int argc; @@ -200,23 +237,21 @@ typedef struct rb_context_struct { } machine; rb_execution_context_t saved_ec; rb_jmpbuf_t jmpbuf; - rb_ensure_entry_t *ensure_array; - /* Pointer to MJIT info about the continuation. */ - struct mjit_cont *mjit_cont; + struct rb_jit_cont *jit_cont; // Continuation contexts for JITs } rb_context_t; - /* * Fiber status: - * [Fiber.new] ------> FIBER_CREATED - * | [Fiber#resume] - * v - * +--> FIBER_RESUMED ----+ - * [Fiber#resume] | | [Fiber.yield] | - * | v | - * +-- FIBER_SUSPENDED | [Terminate] - * | - * FIBER_TERMINATED <-+ + * [Fiber.new] ------> FIBER_CREATED ----> [Fiber#kill] --> | + * | [Fiber#resume] | + * v | + * +--> FIBER_RESUMED ----> [return] ------> | + * [Fiber#resume] | | [Fiber.yield/transfer] | + * [Fiber#transfer] | v | + * +--- FIBER_SUSPENDED --> [Fiber#kill] --> | + * | + * | + * FIBER_TERMINATED <-------------------+ */ enum fiber_status { FIBER_CREATED, @@ -235,25 +270,38 @@ struct rb_fiber_struct { rb_context_t cont; VALUE first_proc; struct rb_fiber_struct *prev; - VALUE resuming_fiber; + struct rb_fiber_struct *resuming_fiber; BITFIELD(enum fiber_status, status, 2); /* Whether the fiber is allowed to implicitly yield. */ unsigned int yielding : 1; unsigned int blocking : 1; + unsigned int killed : 1; + struct coroutine_context context; struct fiber_pool_stack stack; }; static struct fiber_pool shared_fiber_pool = {NULL, NULL, 0, 0, 0, 0}; -static ID fiber_initialize_keywords[2] = {0}; +void +rb_free_shared_fiber_pool(void) +{ + struct fiber_pool_allocation *allocations = shared_fiber_pool.allocations; + while (allocations) { + struct fiber_pool_allocation *next = allocations->next; + SIZED_FREE(allocations); + allocations = next; + } +} + +static ID fiber_initialize_keywords[3] = {0}; /* * FreeBSD require a first (i.e. addr) argument of mmap(2) is not NULL * if MAP_STACK is passed. - * http://www.FreeBSD.org/cgi/query-pr.cgi?pr=158755 + * https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=158755 */ #if defined(MAP_STACK) && !defined(__FreeBSD__) && !defined(__FreeBSD_kernel__) #define FIBER_STACK_FLAGS (MAP_PRIVATE | MAP_ANON | MAP_STACK) @@ -264,7 +312,6 @@ static ID fiber_initialize_keywords[2] = {0}; #define ERRNOMSG strerror(errno) // Locates the stack vacancy details for the given stack. -// Requires that fiber_pool_vacancy fits within one page. inline static struct fiber_pool_vacancy * fiber_pool_vacancy_pointer(void * base, size_t size) { @@ -275,6 +322,24 @@ fiber_pool_vacancy_pointer(void * base, size_t size) ); } +#if defined(COROUTINE_SANITIZE_ADDRESS) +// Compute the base pointer for a vacant stack, for the area which can be poisoned. +inline static void * +fiber_pool_stack_poison_base(struct fiber_pool_stack * stack) +{ + STACK_GROW_DIR_DETECTION; + + return (char*)stack->base + STACK_DIR_UPPER(RB_PAGE_SIZE, 0); +} + +// Compute the size of the vacant stack, for the area that can be poisoned. +inline static size_t +fiber_pool_stack_poison_size(struct fiber_pool_stack * stack) +{ + return stack->size - RB_PAGE_SIZE; +} +#endif + // Reset the current stack pointer and available size of the given stack. inline static void fiber_pool_stack_reset(struct fiber_pool_stack * stack) @@ -414,11 +479,12 @@ fiber_pool_allocate_memory(size_t * count, size_t stride) // the system would allow (e.g. overcommit * physical memory + swap), we // divide count by two and try again. This condition should only be // encountered in edge cases, but we handle it here gracefully. - while (*count > 1) { + while (*count) { #if defined(_WIN32) void * base = VirtualAlloc(0, (*count)*stride, MEM_COMMIT, PAGE_READWRITE); if (!base) { + errno = rb_w32_map_errno(GetLastError()); *count = (*count) >> 1; } else { @@ -426,18 +492,20 @@ fiber_pool_allocate_memory(size_t * count, size_t stride) } #else errno = 0; - void * base = mmap(NULL, (*count)*stride, PROT_READ | PROT_WRITE, FIBER_STACK_FLAGS, -1, 0); + size_t mmap_size = (*count)*stride; + void * base = mmap(NULL, mmap_size, PROT_READ | PROT_WRITE, FIBER_STACK_FLAGS, -1, 0); if (base == MAP_FAILED) { // If the allocation fails, count = count / 2, and try again. *count = (*count) >> 1; } else { + ruby_annotate_mmap(base, mmap_size, "Ruby:fiber_pool_allocate_memory"); #if defined(MADV_FREE_REUSE) // On Mac MADV_FREE_REUSE is necessary for the task_info api // to keep the accounting accurate as possible when a page is marked as reusable // it can possibly not occurring at first call thus re-iterating if necessary. - while (madvise(base, (*count)*stride, MADV_FREE_REUSE) == -1 && errno == EAGAIN); + while (madvise(base, mmap_size, MADV_FREE_REUSE) == -1 && errno == EAGAIN); #endif return base; } @@ -448,26 +516,53 @@ fiber_pool_allocate_memory(size_t * count, size_t stride) } // Given an existing fiber pool, expand it by the specified number of stacks. +// // @param count the maximum number of stacks to allocate. -// @return the allocated fiber pool. +// @return the new allocation on success, or NULL on failure with errno set. +// @raise NoMemoryError if the struct or memory allocation fails. +// +// Call from fiber_pool_stack_acquire_expand with VM lock held, or from +// fiber_pool_initialize before the pool is shared across threads. // @sa fiber_pool_allocation_free static struct fiber_pool_allocation * fiber_pool_expand(struct fiber_pool * fiber_pool, size_t count) { + if (count == 0) { + errno = EAGAIN; + return NULL; + } + STACK_GROW_DIR_DETECTION; size_t size = fiber_pool->size; size_t stride = size + RB_PAGE_SIZE; + // If the maximum number of stacks is set, and we have reached it, return NULL. + if (fiber_pool->maximum_count > 0) { + if (fiber_pool->count >= fiber_pool->maximum_count) { + errno = EAGAIN; + return NULL; + } + size_t remaining = fiber_pool->maximum_count - fiber_pool->count; + if (count > remaining) { + count = remaining; + } + } + + // Allocate metadata before mmap: ruby_xmalloc (RB_ALLOC) raises on failure and + // must not run after base is mapped, or the region would leak. + struct fiber_pool_allocation * allocation = RB_ALLOC(struct fiber_pool_allocation); + // Allocate the memory required for the stacks: void * base = fiber_pool_allocate_memory(&count, stride); if (base == NULL) { - rb_raise(rb_eFiberError, "can't alloc machine stack to fiber (%"PRIuSIZE" x %"PRIuSIZE" bytes): %s", count, size, ERRNOMSG); + if (!errno) errno = ENOMEM; + ruby_xfree(allocation); + return NULL; } struct fiber_pool_vacancy * vacancies = fiber_pool->vacancies; - struct fiber_pool_allocation * allocation = RB_ALLOC(struct fiber_pool_allocation); // Initialize fiber pool allocation: allocation->base = base; @@ -479,7 +574,7 @@ fiber_pool_expand(struct fiber_pool * fiber_pool, size_t count) #endif allocation->pool = fiber_pool; - if (DEBUG) { + if (DEBUG_EXPAND) { fprintf(stderr, "fiber_pool_expand(%"PRIuSIZE"): %p, %"PRIuSIZE"/%"PRIuSIZE" x [%"PRIuSIZE":%"PRIuSIZE"]\n", count, (void*)fiber_pool, fiber_pool->used, fiber_pool->count, size, fiber_pool->vm_stack_size); } @@ -488,18 +583,27 @@ fiber_pool_expand(struct fiber_pool * fiber_pool, size_t count) for (size_t i = 0; i < count; i += 1) { void * base = (char*)allocation->base + (stride * i); void * page = (char*)base + STACK_DIR_UPPER(size, 0); - #if defined(_WIN32) DWORD old_protect; if (!VirtualProtect(page, RB_PAGE_SIZE, PAGE_READWRITE | PAGE_GUARD, &old_protect)) { + int error = rb_w32_map_errno(GetLastError()); VirtualFree(allocation->base, 0, MEM_RELEASE); - rb_raise(rb_eFiberError, "can't set a guard page: %s", ERRNOMSG); + ruby_xfree(allocation); + errno = error; + return NULL; } +#elif defined(__wasi__) + // wasi-libc's mprotect emulation doesn't support PROT_NONE. + (void)page; #else if (mprotect(page, RB_PAGE_SIZE, PROT_NONE) < 0) { + int error = errno; + if (!error) error = ENOMEM; munmap(allocation->base, count*stride); - rb_raise(rb_eFiberError, "can't set a guard page: %s", ERRNOMSG); + ruby_xfree(allocation); + errno = error; + return NULL; } #endif @@ -535,7 +639,7 @@ fiber_pool_expand(struct fiber_pool * fiber_pool, size_t count) // Initialize the specified fiber pool with the given number of stacks. // @param vm_stack_size The size of the vm stack to allocate. static void -fiber_pool_initialize(struct fiber_pool * fiber_pool, size_t size, size_t count, size_t vm_stack_size) +fiber_pool_initialize(struct fiber_pool * fiber_pool, size_t size, size_t minimum_count, size_t maximum_count, size_t vm_stack_size) { VM_ASSERT(vm_stack_size < size); @@ -543,13 +647,17 @@ fiber_pool_initialize(struct fiber_pool * fiber_pool, size_t size, size_t count, fiber_pool->vacancies = NULL; fiber_pool->size = ((size / RB_PAGE_SIZE) + 1) * RB_PAGE_SIZE; fiber_pool->count = 0; - fiber_pool->initial_count = count; + fiber_pool->minimum_count = minimum_count; + fiber_pool->maximum_count = maximum_count; fiber_pool->free_stacks = 1; fiber_pool->used = 0; - fiber_pool->vm_stack_size = vm_stack_size; - fiber_pool_expand(fiber_pool, count); + if (fiber_pool->minimum_count > 0) { + if (RB_UNLIKELY(!fiber_pool_expand(fiber_pool, fiber_pool->minimum_count))) { + rb_raise(rb_eFiberError, "can't allocate initial fiber stacks (%"PRIuSIZE" x %"PRIuSIZE" bytes): %s", fiber_pool->minimum_count, fiber_pool->size, strerror(errno)); + } + } } #ifdef FIBER_POOL_ALLOCATION_FREE @@ -561,7 +669,7 @@ fiber_pool_allocation_free(struct fiber_pool_allocation * allocation) VM_ASSERT(allocation->used == 0); - if (DEBUG) fprintf(stderr, "fiber_pool_allocation_free: %p base=%p count=%"PRIuSIZE"\n", allocation, allocation->base, allocation->count); + if (DEBUG) fprintf(stderr, "fiber_pool_allocation_free: %p base=%p count=%"PRIuSIZE"\n", (void*)allocation, allocation->base, allocation->count); size_t i; for (i = 0; i < allocation->count; i += 1) { @@ -593,45 +701,120 @@ fiber_pool_allocation_free(struct fiber_pool_allocation * allocation) allocation->pool->count -= allocation->count; - ruby_xfree(allocation); + SIZED_FREE(allocation); } #endif -// Acquire a stack from the given fiber pool. If none are available, allocate more. -static struct fiber_pool_stack -fiber_pool_stack_acquire(struct fiber_pool * fiber_pool) +// Number of stacks to request when expanding the pool (clamped to min/max). +static size_t +fiber_pool_stack_expand_count(const struct fiber_pool *pool) { - struct fiber_pool_vacancy * vacancy = fiber_pool_vacancy_pop(fiber_pool); + const size_t maximum_allocations = FIBER_POOL_MAXIMUM_ALLOCATIONS; + const size_t minimum_count = FIBER_POOL_MINIMUM_COUNT; + + // We are going try and double the number of stacks in the pool: + size_t count = pool->count; + if (count > maximum_allocations) count = maximum_allocations; + if (count < minimum_count) count = minimum_count; + + // If we have a maximum count, we need to clamp the number of stacks to the maximum: + if (pool->maximum_count > 0) { + if (pool->count >= pool->maximum_count) { + // No expansion is possible: + return 0; + } + + // Otherwise, compute the number of stacks we can allocate to bring us to the maximum: + size_t remaining = pool->maximum_count - pool->count; + if (count > remaining) { + count = remaining; + } + } + + return count; +} - if (DEBUG) fprintf(stderr, "fiber_pool_stack_acquire: %p used=%"PRIuSIZE"\n", (void*)fiber_pool->vacancies, fiber_pool->used); +// When the vacancy list is empty, grow the pool (and run GC only if mmap fails). Caller holds the VM lock. +// Returns NULL if expansion failed after GC + retry; errno is set. Otherwise returns a vacancy. +static struct fiber_pool_vacancy * +fiber_pool_stack_acquire_expand(struct fiber_pool *fiber_pool) +{ + size_t count = fiber_pool_stack_expand_count(fiber_pool); - if (!vacancy) { - const size_t maximum = FIBER_POOL_ALLOCATION_MAXIMUM_SIZE; - const size_t minimum = fiber_pool->initial_count; + if (DEBUG_ACQUIRE) fprintf(stderr, "fiber_pool_stack_acquire: expanding fiber pool by %"PRIuSIZE" stacks\n", count); - size_t count = fiber_pool->count; - if (count > maximum) count = maximum; - if (count < minimum) count = minimum; + struct fiber_pool_vacancy *vacancy = NULL; - fiber_pool_expand(fiber_pool, count); + if (RB_LIKELY(fiber_pool_expand(fiber_pool, count))) { + return fiber_pool_vacancy_pop(fiber_pool); + } + else { + if (DEBUG_ACQUIRE) fprintf(stderr, "fiber_pool_stack_acquire: expand failed (%s), collecting garbage\n", strerror(errno)); - // The free list should now contain some stacks: - VM_ASSERT(fiber_pool->vacancies); + rb_gc(); + // After running GC, the vacancy list may have some stacks: vacancy = fiber_pool_vacancy_pop(fiber_pool); + if (RB_LIKELY(vacancy)) { + return vacancy; + } + + // Recompute count as gc may have freed up some allocations: + count = fiber_pool_stack_expand_count(fiber_pool); + + // Try to expand the fiber pool again: + if (RB_LIKELY(fiber_pool_expand(fiber_pool, count))) { + return fiber_pool_vacancy_pop(fiber_pool); + } + else { + // Okay, we really failed to acquire a stack. Give up and return NULL with errno set: + return NULL; + } } +} - VM_ASSERT(vacancy); - VM_ASSERT(vacancy->stack.base); +// Acquire a stack from the given fiber pool. If none are available, allocate more. +static struct fiber_pool_stack +fiber_pool_stack_acquire(struct fiber_pool * fiber_pool) +{ + struct fiber_pool_vacancy * vacancy; + + unsigned int lev; + RB_VM_LOCK_ENTER_LEV(&lev); + { + // Fast path: try to acquire a stack from the vacancy list: + vacancy = fiber_pool_vacancy_pop(fiber_pool); + + if (DEBUG) fprintf(stderr, "fiber_pool_stack_acquire: %p used=%"PRIuSIZE"\n", (void*)fiber_pool->vacancies, fiber_pool->used); + + // Slow path: If the pool has no vacancies, expand first. Only run GC when expansion fails (e.g. mmap), so we can reclaim stacks from dead fibers before retrying: + if (RB_UNLIKELY(!vacancy)) { + vacancy = fiber_pool_stack_acquire_expand(fiber_pool); + + // If expansion failed, raise an error: + if (RB_UNLIKELY(!vacancy)) { + RB_VM_LOCK_LEAVE_LEV(&lev); + rb_raise(rb_eFiberError, "can't allocate fiber stack: %s", strerror(errno)); + } + } - // Take the top item from the free list: - fiber_pool->used += 1; + VM_ASSERT(vacancy); + VM_ASSERT(vacancy->stack.base); + +#if defined(COROUTINE_SANITIZE_ADDRESS) + __asan_unpoison_memory_region(fiber_pool_stack_poison_base(&vacancy->stack), fiber_pool_stack_poison_size(&vacancy->stack)); +#endif + + // Take the top item from the free list: + fiber_pool->used += 1; #ifdef FIBER_POOL_ALLOCATION_FREE - vacancy->stack.allocation->used += 1; + vacancy->stack.allocation->used += 1; #endif - fiber_pool_stack_reset(&vacancy->stack); + fiber_pool_stack_reset(&vacancy->stack); + } + RB_VM_LOCK_LEAVE_LEV(&lev); return vacancy->stack; } @@ -647,28 +830,54 @@ fiber_pool_stack_free(struct fiber_pool_stack * stack) // If this is not true, the vacancy information will almost certainly be destroyed: VM_ASSERT(size <= (stack->size - RB_PAGE_SIZE)); - if (DEBUG) fprintf(stderr, "fiber_pool_stack_free: %p+%"PRIuSIZE" [base=%p, size=%"PRIuSIZE"]\n", base, size, stack->base, stack->size); + int advice = stack->pool->free_stacks >> 1; -#if VM_CHECK_MODE > 0 && defined(MADV_DONTNEED) + if (DEBUG) fprintf(stderr, "fiber_pool_stack_free: %p+%"PRIuSIZE" [base=%p, size=%"PRIuSIZE"] advice=%d\n", base, size, stack->base, stack->size, advice); + + // The pages being used by the stack can be returned back to the system. + // That doesn't change the page mapping, but it does allow the system to + // reclaim the physical memory. + // Since we no longer care about the data itself, we don't need to page + // out to disk, since that is costly. Not all systems support that, so + // we try our best to select the most efficient implementation. + // In addition, it's actually slightly desirable to not do anything here, + // but that results in higher memory usage. + +#ifdef __wasi__ + // WebAssembly doesn't support madvise, so we just don't do anything. +#elif VM_CHECK_MODE > 0 && defined(MADV_DONTNEED) + if (!advice) advice = MADV_DONTNEED; // This immediately discards the pages and the memory is reset to zero. - madvise(base, size, MADV_DONTNEED); -#elif defined(POSIX_MADV_DONTNEED) - posix_madvise(base, size, POSIX_MADV_DONTNEED); + madvise(base, size, advice); #elif defined(MADV_FREE_REUSABLE) + if (!advice) advice = MADV_FREE_REUSABLE; + // Darwin / macOS / iOS. // Acknowledge the kernel down to the task info api we make this // page reusable for future use. - // As for MADV_FREE_REUSE below we ensure in the rare occasions the task was not + // As for MADV_FREE_REUSABLE below we ensure in the rare occasions the task was not // completed at the time of the call to re-iterate. - while (madvise(base, size, MADV_FREE_REUSABLE) == -1 && errno == EAGAIN); + while (madvise(base, size, advice) == -1 && errno == EAGAIN); #elif defined(MADV_FREE) - madvise(base, size, MADV_FREE); + if (!advice) advice = MADV_FREE; + // Recent Linux. + madvise(base, size, advice); #elif defined(MADV_DONTNEED) - madvise(base, size, MADV_DONTNEED); + if (!advice) advice = MADV_DONTNEED; + // Old Linux. + madvise(base, size, advice); +#elif defined(POSIX_MADV_DONTNEED) + if (!advice) advice = POSIX_MADV_DONTNEED; + // Solaris? + posix_madvise(base, size, advice); #elif defined(_WIN32) VirtualAlloc(base, size, MEM_RESET, PAGE_READWRITE); // Not available in all versions of Windows. //DiscardVirtualMemory(base, size); #endif + +#if defined(COROUTINE_SANITIZE_ADDRESS) + __asan_poison_memory_region(fiber_pool_stack_poison_base(stack), fiber_pool_stack_poison_size(stack)); +#endif } // Release and return a stack to the vacancy list. @@ -688,7 +897,7 @@ fiber_pool_stack_release(struct fiber_pool_stack * stack) fiber_pool_vacancy_reset(vacancy); // Push the vacancy into the vancancies list: - pool->vacancies = fiber_pool_vacancy_push(vacancy, stack->pool->vacancies); + pool->vacancies = fiber_pool_vacancy_push(vacancy, pool->vacancies); pool->used -= 1; #ifdef FIBER_POOL_ALLOCATION_FREE @@ -704,17 +913,78 @@ fiber_pool_stack_release(struct fiber_pool_stack * stack) fiber_pool_stack_free(&vacancy->stack); } #else - // This is entirely optional, but clears the dirty flag from the stack memory, so it won't get swapped to disk when there is memory pressure: + // This is entirely optional, but clears the dirty flag from the stack + // memory, so it won't get swapped to disk when there is memory pressure: if (stack->pool->free_stacks) { fiber_pool_stack_free(&vacancy->stack); } #endif } +static inline void +ec_switch(rb_thread_t *th, rb_fiber_t *fiber) +{ + rb_execution_context_t *ec = &fiber->cont.saved_ec; +#ifdef RUBY_ASAN_ENABLED + ec->machine.asan_fake_stack_handle = asan_get_thread_fake_stack_handle(); +#endif + rb_ractor_set_current_ec(th->ractor, th->ec = ec); + // ruby_current_execution_context_ptr = th->ec = ec; + + /* + * timer-thread may set trap interrupt on previous th->ec at any time; + * ensure we do not delay (or lose) the trap interrupt handling. + */ + if (th->vm->ractor.main_thread == th && + rb_signal_buff_size() > 0) { + RUBY_VM_SET_TRAP_INTERRUPT(ec); + } + + VM_ASSERT(ec->fiber_ptr->cont.self == 0 || ec->vm_stack != NULL); +} + +static inline void +fiber_restore_thread(rb_thread_t *th, rb_fiber_t *fiber) +{ + ec_switch(th, fiber); + VM_ASSERT(th->ec->fiber_ptr == fiber); +} + +#ifndef COROUTINE_DECL +# define COROUTINE_DECL COROUTINE +#endif +NORETURN(static COROUTINE_DECL fiber_entry(struct coroutine_context * from, struct coroutine_context * to)); static COROUTINE fiber_entry(struct coroutine_context * from, struct coroutine_context * to) { - rb_fiber_start(); + rb_fiber_t *fiber = to->argument; + +#if defined(COROUTINE_SANITIZE_ADDRESS) + // Address sanitizer will copy the previous stack base and stack size into + // the "from" fiber. `coroutine_initialize_main` doesn't generally know the + // stack bounds (base + size). Therefore, the main fiber `stack_base` and + // `stack_size` will be NULL/0. It's specifically important in that case to + // get the (base+size) of the previous fiber and save it, so that later when + // we return to the main coroutine, we don't supply (NULL, 0) to + // __sanitizer_start_switch_fiber which royally messes up the internal state + // of ASAN and causes (sometimes) the following message: + // "WARNING: ASan is ignoring requested __asan_handle_no_return" + __sanitizer_finish_switch_fiber(to->fake_stack, (const void**)&from->stack_base, &from->stack_size); +#endif + + rb_thread_t *thread = fiber->cont.saved_ec.thread_ptr; + +#ifdef COROUTINE_PTHREAD_CONTEXT + ruby_thread_set_native(thread); +#endif + + fiber_restore_thread(thread, fiber); + + rb_fiber_start(fiber); + +#ifndef COROUTINE_PTHREAD_CONTEXT + VM_UNREACHABLE(fiber_entry); +#endif } // Initialize a fiber's coroutine's machine stack and vm stack. @@ -731,27 +1001,19 @@ fiber_initialize_coroutine(rb_fiber_t *fiber, size_t * vm_stack_size) vm_stack = fiber_pool_stack_alloca(&fiber->stack, fiber_pool->vm_stack_size); *vm_stack_size = fiber_pool->vm_stack_size; -#ifdef COROUTINE_PRIVATE_STACK - coroutine_initialize(&fiber->context, fiber_entry, fiber_pool_stack_base(&fiber->stack), fiber->stack.available, sec->machine.stack_start); - // The stack for this execution context is still the main machine stack, so don't adjust it. - // If this is not managed correctly, you will fail in `rb_ec_stack_check`. - - // We limit the machine stack usage to the fiber stack size. - if (sec->machine.stack_maxsize > fiber->stack.available) { - sec->machine.stack_maxsize = fiber->stack.available; - } -#else coroutine_initialize(&fiber->context, fiber_entry, fiber_pool_stack_base(&fiber->stack), fiber->stack.available); // The stack for this execution context is the one we allocated: sec->machine.stack_start = fiber->stack.current; sec->machine.stack_maxsize = fiber->stack.available; -#endif + + fiber->context.argument = (void*)fiber; return vm_stack; } -// Release the stack from the fiber, it's execution context, and return it to the fiber pool. +// Release the stack from the fiber, it's execution context, and return it to +// the fiber pool. static void fiber_stack_release(rb_fiber_t * fiber) { @@ -769,6 +1031,17 @@ fiber_stack_release(rb_fiber_t * fiber) rb_ec_clear_vm_stack(ec); } +static void +fiber_stack_release_locked(rb_fiber_t *fiber) +{ + if (!ruby_vm_during_cleanup) { + // We can't try to acquire the VM lock here because MMTK calls free in its own native thread which has no ec. + // This assertion will fail on MMTK but we currently don't have CI for debug releases of MMTK, so we can assert for now. + ASSERT_vm_locking_with_barrier(); + } + fiber_stack_release(fiber); +} + static const char * fiber_status_name(enum fiber_status s) { @@ -790,7 +1063,9 @@ fiber_verify(const rb_fiber_t *fiber) switch (fiber->status) { case FIBER_RESUMED: - VM_ASSERT(fiber->cont.saved_ec.vm_stack != NULL); + if (fiber->cont.saved_ec.thread_ptr->self == 0) { + VM_ASSERT(fiber->cont.saved_ec.vm_stack != NULL); + } break; case FIBER_SUSPENDED: VM_ASSERT(fiber->cont.saved_ec.vm_stack != NULL); @@ -815,31 +1090,12 @@ fiber_status_set(rb_fiber_t *fiber, enum fiber_status s) fiber->status = s; } -static inline void -ec_switch(rb_thread_t *th, rb_fiber_t *fiber) -{ - rb_execution_context_t *ec = &fiber->cont.saved_ec; - rb_ractor_set_current_ec(th->ractor, th->ec = ec); - // ruby_current_execution_context_ptr = th->ec = ec; - - /* - * timer-thread may set trap interrupt on previous th->ec at any time; - * ensure we do not delay (or lose) the trap interrupt handling. - */ - if (th->vm->ractor.main_thread == th && - rb_signal_buff_size() > 0) { - RUBY_VM_SET_TRAP_INTERRUPT(ec); - } - - VM_ASSERT(ec->fiber_ptr->cont.self == 0 || ec->vm_stack != NULL); -} - static rb_context_t * cont_ptr(VALUE obj) { rb_context_t *cont; - TypedData_Get_Struct(obj, rb_context_t, &cont_data_type, cont); + TypedData_Get_Struct(obj, rb_context_t, &rb_cont_data_type, cont); return cont; } @@ -849,7 +1105,7 @@ fiber_ptr(VALUE obj) { rb_fiber_t *fiber; - TypedData_Get_Struct(obj, rb_fiber_t, &fiber_data_type, fiber); + TypedData_Get_Struct(obj, rb_fiber_t, &rb_fiber_data_type, fiber); if (!fiber) rb_raise(rb_eFiberError, "uninitialized fiber"); return fiber; @@ -916,13 +1172,8 @@ cont_mark(void *ptr) cont->machine.stack + cont->machine.stack_size); } else { - /* fiber */ - const rb_fiber_t *fiber = (rb_fiber_t*)cont; - - if (!FIBER_TERMINATED_P(fiber)) { - rb_gc_mark_locations(cont->machine.stack, - cont->machine.stack + cont->machine.stack_size); - } + /* fiber machine context is marked as part of rb_execution_context_mark, no need to + * do anything here. */ } } @@ -937,6 +1188,8 @@ fiber_is_root_p(const rb_fiber_t *fiber) } #endif +static void jit_cont_free(struct rb_jit_cont *cont); + static void cont_free(void *ptr) { @@ -945,24 +1198,26 @@ cont_free(void *ptr) RUBY_FREE_ENTER("cont"); if (cont->type == CONTINUATION_CONTEXT) { - ruby_xfree(cont->saved_ec.vm_stack); - ruby_xfree(cont->ensure_array); - RUBY_FREE_UNLESS_NULL(cont->machine.stack); + SIZED_FREE_N(cont->saved_ec.vm_stack, cont->saved_ec.vm_stack_size); + SIZED_FREE_N(cont->machine.stack, cont->machine.stack_size); } else { rb_fiber_t *fiber = (rb_fiber_t*)cont; coroutine_destroy(&fiber->context); - fiber_stack_release(fiber); + fiber_stack_release_locked(fiber); } - RUBY_FREE_UNLESS_NULL(cont->saved_vm_stack.ptr); + SIZED_FREE_N(cont->saved_vm_stack.ptr, cont->saved_vm_stack.size); - if (mjit_enabled) { - VM_ASSERT(cont->mjit_cont != NULL); - mjit_cont_free(cont->mjit_cont); - } + VM_ASSERT(cont->jit_cont != NULL); + jit_cont_free(cont->jit_cont); /* free rb_cont_t or rb_fiber_t */ - ruby_xfree(ptr); + if (cont->type == CONTINUATION_CONTEXT) { + SIZED_FREE(cont); + } + else { + SIZED_FREE((rb_fiber_t *)cont); + } RUBY_FREE_LEAVE("cont"); } @@ -1003,12 +1258,7 @@ rb_fiber_update_self(rb_fiber_t *fiber) void rb_fiber_mark_self(const rb_fiber_t *fiber) { - if (fiber->cont.self) { - rb_gc_mark_movable(fiber->cont.self); - } - else { - rb_execution_context_mark(&fiber->cont.saved_ec); - } + rb_gc_mark_movable(fiber->cont.self); } static void @@ -1041,7 +1291,7 @@ fiber_free(void *ptr) rb_fiber_t *fiber = ptr; RUBY_FREE_ENTER("fiber"); - //if (DEBUG) fprintf(stderr, "fiber_free: %p[%p]\n", fiber, fiber->stack.base); + if (DEBUG) fprintf(stderr, "fiber_free: %p[%p]\n", (void *)fiber, fiber->stack.base); if (fiber->cont.saved_ec.local_storage) { rb_id_table_free(fiber->cont.saved_ec.local_storage); @@ -1064,7 +1314,9 @@ fiber_memsize(const void *ptr) */ if (saved_ec->local_storage && fiber != th->root_fiber) { size += rb_id_table_memsize(saved_ec->local_storage); + size += rb_obj_memsize_of(saved_ec->storage); } + size += cont_memsize(&fiber->cont); return size; } @@ -1072,17 +1324,13 @@ fiber_memsize(const void *ptr) VALUE rb_obj_is_fiber(VALUE obj) { - if (rb_typeddata_is_kind_of(obj, &fiber_data_type)) { - return Qtrue; - } - else { - return Qfalse; - } + return RBOOL(rb_typeddata_is_kind_of(obj, &rb_fiber_data_type)); } static void cont_save_machine_stack(rb_thread_t *th, rb_context_t *cont) { + const size_t old_stack_size = cont->machine.stack_size; size_t size; SET_MACHINE_STACK_END(&th->ec->machine.stack_end); @@ -1097,19 +1345,34 @@ cont_save_machine_stack(rb_thread_t *th, rb_context_t *cont) } if (cont->machine.stack) { - REALLOC_N(cont->machine.stack, VALUE, size); + SIZED_REALLOC_N(cont->machine.stack, VALUE, cont->machine.stack_size, old_stack_size); } else { - cont->machine.stack = ALLOC_N(VALUE, size); + cont->machine.stack = ALLOC_N(VALUE, cont->machine.stack_size); } FLUSH_REGISTER_WINDOWS; + asan_unpoison_memory_region(cont->machine.stack_src, size, false); MEMCPY(cont->machine.stack, cont->machine.stack_src, VALUE, size); } -static const rb_data_type_t cont_data_type = { +static void +cont_handle_weak_references(void *ptr) +{ + rb_context_t *cont = ptr; + + if (!cont) return; + + if (!rb_gc_handle_weak_references_alive_p(cont->saved_ec.gen_fields_cache.obj) || + !rb_gc_handle_weak_references_alive_p(cont->saved_ec.gen_fields_cache.fields_obj)) { + cont->saved_ec.gen_fields_cache.obj = Qundef; + cont->saved_ec.gen_fields_cache.fields_obj = Qundef; + } +} + +static const rb_data_type_t rb_cont_data_type = { "continuation", - {cont_mark, cont_free, cont_memsize, cont_compact}, + {cont_mark, cont_free, cont_memsize, cont_compact, cont_handle_weak_references}, 0, 0, RUBY_TYPED_FREE_IMMEDIATELY }; @@ -1128,13 +1391,128 @@ cont_save_thread(rb_context_t *cont, rb_thread_t *th) sec->machine.stack_end = NULL; } +static rb_nativethread_lock_t jit_cont_lock; + +// Register a new continuation with execution context `ec`. Return JIT info about +// the continuation. +static struct rb_jit_cont * +jit_cont_new(rb_execution_context_t *ec) +{ + struct rb_jit_cont *cont; + + // We need to use calloc instead of something like ZALLOC to avoid triggering GC here. + // When this function is called from rb_thread_alloc through rb_threadptr_root_fiber_setup, + // the thread is still being prepared and marking it causes SEGV. + cont = ruby_mimcalloc(1, sizeof(struct rb_jit_cont)); + if (cont == NULL) + rb_memerror(); + cont->ec = ec; + + rb_native_mutex_lock(&jit_cont_lock); + if (first_jit_cont == NULL) { + cont->next = cont->prev = NULL; + } + else { + cont->prev = NULL; + cont->next = first_jit_cont; + first_jit_cont->prev = cont; + } + first_jit_cont = cont; + rb_native_mutex_unlock(&jit_cont_lock); + + return cont; +} + +// Unregister continuation `cont`. static void -cont_init_mjit_cont(rb_context_t *cont) +jit_cont_free(struct rb_jit_cont *cont) +{ + if (!cont) return; + + rb_native_mutex_lock(&jit_cont_lock); + if (cont == first_jit_cont) { + first_jit_cont = cont->next; + if (first_jit_cont != NULL) + first_jit_cont->prev = NULL; + } + else { + cont->prev->next = cont->next; + if (cont->next != NULL) + cont->next->prev = cont->prev; + } + rb_native_mutex_unlock(&jit_cont_lock); + + ruby_mimfree(cont); +} + +// Call a given callback against all on-stack ISEQs. +void +rb_jit_cont_each_iseq(rb_iseq_callback callback, void *data) +{ + struct rb_jit_cont *cont; + for (cont = first_jit_cont; cont != NULL; cont = cont->next) { + if (cont->ec->vm_stack == NULL) + continue; + + const rb_control_frame_t *cfp = cont->ec->cfp; + while (!RUBY_VM_CONTROL_FRAME_STACK_OVERFLOW_P(cont->ec, cfp)) { + if (CFP_PC(cfp) && CFP_ISEQ(cfp)) { + const rb_iseq_t *iseq = CFP_ISEQ(cfp); + if (iseq && imemo_type((VALUE)iseq) == imemo_iseq) { + callback(iseq, data); + } + } + cfp = RUBY_VM_PREVIOUS_CONTROL_FRAME(cfp); + } + } +} + +#if USE_YJIT +// Update the jit_return of all CFPs to leave_exit unless it's leave_exception or not set. +// This prevents jit_exec_exception from jumping to the caller after invalidation. +void +rb_yjit_cancel_jit_return(void *leave_exit, void *leave_exception) +{ + struct rb_jit_cont *cont; + for (cont = first_jit_cont; cont != NULL; cont = cont->next) { + if (cont->ec->vm_stack == NULL) + continue; + + const rb_control_frame_t *cfp = cont->ec->cfp; + while (!RUBY_VM_CONTROL_FRAME_STACK_OVERFLOW_P(cont->ec, cfp)) { + if (cfp->jit_return && cfp->jit_return != leave_exception) { + ((rb_control_frame_t *)cfp)->jit_return = leave_exit; + } + cfp = RUBY_VM_PREVIOUS_CONTROL_FRAME(cfp); + } + } +} +#endif + +// Finish working with jit_cont. +void +rb_jit_cont_finish(void) { - VM_ASSERT(cont->mjit_cont == NULL); - if (mjit_enabled) { - cont->mjit_cont = mjit_cont_new(&(cont->saved_ec)); + struct rb_jit_cont *cont, *next; + for (cont = first_jit_cont; cont != NULL; cont = next) { + next = cont->next; + ruby_mimfree(cont); // Don't use xfree because it's allocated by mimcalloc. } + rb_native_mutex_destroy(&jit_cont_lock); +} + +static void +cont_init_jit_cont(rb_context_t *cont) +{ + VM_ASSERT(cont->jit_cont == NULL); + // We always allocate this since YJIT may be enabled later + cont->jit_cont = jit_cont_new(&(cont->saved_ec)); +} + +struct rb_execution_context_struct * +rb_fiberptr_get_ec(struct rb_fiber_struct *fiber) +{ + return &fiber->cont.saved_ec; } static void @@ -1146,7 +1524,7 @@ cont_init(rb_context_t *cont, rb_thread_t *th) cont->saved_ec.local_storage = NULL; cont->saved_ec.local_storage_recursive_hash = Qnil; cont->saved_ec.local_storage_recursive_hash_for_trace = Qnil; - cont_init_mjit_cont(cont); + cont_init_jit_cont(cont); } static rb_context_t * @@ -1157,27 +1535,30 @@ cont_new(VALUE klass) rb_thread_t *th = GET_THREAD(); THREAD_MUST_BE_RUNNING(th); - contval = TypedData_Make_Struct(klass, rb_context_t, &cont_data_type, cont); + contval = TypedData_Make_Struct(klass, rb_context_t, &rb_cont_data_type, cont); + rb_gc_declare_weak_references(contval); cont->self = contval; cont_init(cont, th); return cont; } -VALUE rb_fiberptr_self(struct rb_fiber_struct *fiber) +VALUE +rb_fiberptr_self(struct rb_fiber_struct *fiber) { return fiber->cont.self; } -unsigned int rb_fiberptr_blocking(struct rb_fiber_struct *fiber) +unsigned int +rb_fiberptr_blocking(struct rb_fiber_struct *fiber) { return fiber->blocking; } -// This is used for root_fiber because other fibers call cont_init_mjit_cont through cont_new. +// Initialize the jit_cont_lock void -rb_fiber_init_mjit_cont(struct rb_fiber_struct *fiber) +rb_jit_cont_init(void) { - cont_init_mjit_cont(&fiber->cont); + rb_native_mutex_initialize(&jit_cont_lock); } #if 0 @@ -1199,18 +1580,15 @@ show_vm_pcs(const rb_control_frame_t *cfp, int i=0; while (cfp != end_of_cfp) { int pc = 0; - if (cfp->iseq) { - pc = cfp->pc - cfp->iseq->body->iseq_encoded; + if (CFP_ISEQ(cfp)) { + pc = cfp->pc - ISEQ_BODY(CFP_ISEQ(cfp))->iseq_encoded; } fprintf(stderr, "%2d pc: %d\n", i++, pc); cfp = RUBY_VM_PREVIOUS_CONTROL_FRAME(cfp); } } #endif -COMPILER_WARNING_PUSH -#ifdef __clang__ -COMPILER_WARNING_IGNORED(-Wduplicate-decl-specifier) -#endif + static VALUE cont_capture(volatile int *volatile stat) { @@ -1227,6 +1605,7 @@ cont_capture(volatile int *volatile stat) #ifdef CAPTURE_JUST_VALID_VM_STACK cont->saved_vm_stack.slen = ec->cfp->sp - ec->vm_stack; cont->saved_vm_stack.clen = ec->vm_stack + ec->vm_stack_size - (VALUE*)ec->cfp; + cont->saved_vm_stack.size = cont->saved_vm_stack.slen + cont->saved_vm_stack.clen; cont->saved_vm_stack.ptr = ALLOC_N(VALUE, cont->saved_vm_stack.slen + cont->saved_vm_stack.clen); MEMCPY(cont->saved_vm_stack.ptr, ec->vm_stack, @@ -1236,6 +1615,7 @@ cont_capture(volatile int *volatile stat) VALUE, cont->saved_vm_stack.clen); #else + cont->saved_vm_stack.size = ec->vm_stack_size; cont->saved_vm_stack.ptr = ALLOC_N(VALUE, ec->vm_stack_size); MEMCPY(cont->saved_vm_stack.ptr, ec->vm_stack, VALUE, ec->vm_stack_size); #endif @@ -1244,22 +1624,6 @@ cont_capture(volatile int *volatile stat) VM_ASSERT(cont->saved_ec.cfp != NULL); cont_save_machine_stack(th, cont); - /* backup ensure_list to array for search in another context */ - { - rb_ensure_list_t *p; - int size = 0; - rb_ensure_entry_t *entry; - for (p=th->ec->ensure_list; p; p=p->next) - size++; - entry = cont->ensure_array = ALLOC_N(rb_ensure_entry_t,size+1); - for (p=th->ec->ensure_list; p; p=p->next) { - if (!p->entry.marker) - p->entry.marker = rb_ary_tmp_new(0); /* dummy object */ - *entry++ = p->entry; - } - entry->marker = 0; - } - if (ruby_setjmp(cont->jmpbuf)) { VALUE value; @@ -1275,14 +1639,6 @@ cont_capture(volatile int *volatile stat) return contval; } } -COMPILER_WARNING_POP - -static inline void -fiber_restore_thread(rb_thread_t *th, rb_fiber_t *fiber) -{ - ec_switch(th, fiber); - VM_ASSERT(th->ec->fiber_ptr == fiber); -} static inline void cont_restore_thread(rb_context_t *cont) @@ -1310,6 +1666,51 @@ cont_restore_thread(rb_context_t *cont) rb_raise(rb_eRuntimeError, "can't call across trace_func"); } +#if defined(__wasm__) && !defined(__EMSCRIPTEN__) + if (th->ec->tag != sec->tag) { + /* find the lowest common ancestor tag of the current EC and the saved EC */ + + struct rb_vm_tag *lowest_common_ancestor = NULL; + size_t num_tags = 0; + size_t num_saved_tags = 0; + for (struct rb_vm_tag *tag = th->ec->tag; tag != NULL; tag = tag->prev) { + ++num_tags; + } + for (struct rb_vm_tag *tag = sec->tag; tag != NULL; tag = tag->prev) { + ++num_saved_tags; + } + + size_t min_tags = num_tags <= num_saved_tags ? num_tags : num_saved_tags; + + struct rb_vm_tag *tag = th->ec->tag; + while (num_tags > min_tags) { + tag = tag->prev; + --num_tags; + } + + struct rb_vm_tag *saved_tag = sec->tag; + while (num_saved_tags > min_tags) { + saved_tag = saved_tag->prev; + --num_saved_tags; + } + + while (min_tags > 0) { + if (tag == saved_tag) { + lowest_common_ancestor = tag; + break; + } + tag = tag->prev; + saved_tag = saved_tag->prev; + --min_tags; + } + + /* free all the jump buffers between the current EC's tag and the lowest common ancestor tag */ + for (struct rb_vm_tag *tag = th->ec->tag; tag != lowest_common_ancestor; tag = tag->prev) { + rb_vm_tag_jmpbuf_deinit(&tag->buf); + } + } +#endif + /* copy vm stack */ #ifdef CAPTURE_JUST_VALID_VM_STACK MEMCPY(th->ec->vm_stack, @@ -1326,10 +1727,8 @@ cont_restore_thread(rb_context_t *cont) th->ec->cfp = sec->cfp; th->ec->raised_flag = sec->raised_flag; th->ec->tag = sec->tag; - th->ec->protect_tag = sec->protect_tag; th->ec->root_lep = sec->root_lep; th->ec->root_svar = sec->root_svar; - th->ec->ensure_list = sec->ensure_list; th->ec->errinfo = sec->errinfo; VM_ASSERT(th->ec->vm_stack != NULL); @@ -1361,22 +1760,33 @@ fiber_setcontext(rb_fiber_t *new_fiber, rb_fiber_t *old_fiber) } } - /* exchange machine_stack_start between old_fiber and new_fiber */ + /* these values are used in rb_gc_mark_machine_context to mark the fiber's stack. */ old_fiber->cont.saved_ec.machine.stack_start = th->ec->machine.stack_start; + old_fiber->cont.saved_ec.machine.stack_end = FIBER_TERMINATED_P(old_fiber) ? NULL : th->ec->machine.stack_end; - /* old_fiber->machine.stack_end should be NULL */ - old_fiber->cont.saved_ec.machine.stack_end = NULL; - /* restore thread context */ - fiber_restore_thread(th, new_fiber); + // if (DEBUG) fprintf(stderr, "fiber_setcontext: %p[%p] -> %p[%p]\n", (void*)old_fiber, old_fiber->stack.base, (void*)new_fiber, new_fiber->stack.base); - // if (DEBUG) fprintf(stderr, "fiber_setcontext: %p[%p] -> %p[%p]\n", old_fiber, old_fiber->stack.base, new_fiber, new_fiber->stack.base); +#if defined(COROUTINE_SANITIZE_ADDRESS) + __sanitizer_start_switch_fiber(FIBER_TERMINATED_P(old_fiber) ? NULL : &old_fiber->context.fake_stack, new_fiber->context.stack_base, new_fiber->context.stack_size); +#endif /* swap machine context */ - coroutine_transfer(&old_fiber->context, &new_fiber->context); + struct coroutine_context * from = coroutine_transfer(&old_fiber->context, &new_fiber->context); + +#if defined(COROUTINE_SANITIZE_ADDRESS) + __sanitizer_finish_switch_fiber(old_fiber->context.fake_stack, NULL, NULL); +#endif + + if (from == NULL) { + rb_syserr_fail(errno, "coroutine_transfer"); + } + + /* restore thread context */ + fiber_restore_thread(th, old_fiber); // It's possible to get here, and new_fiber is already freed. - // if (DEBUG) fprintf(stderr, "fiber_setcontext: %p[%p] <- %p[%p]\n", old_fiber, old_fiber->stack.base, new_fiber, new_fiber->stack.base); + // if (DEBUG) fprintf(stderr, "fiber_setcontext: %p[%p] <- %p[%p]\n", (void*)old_fiber, old_fiber->stack.base, (void*)new_fiber, new_fiber->stack.base); } NOINLINE(NORETURN(static void cont_restore_1(rb_context_t *))); @@ -1387,9 +1797,9 @@ cont_restore_1(rb_context_t *cont) cont_restore_thread(cont); /* restore machine stack */ -#ifdef _M_AMD64 +#if (defined(_M_AMD64) && !defined(__MINGW64__)) || defined(_M_ARM64) { - /* workaround for x64 SEH */ + /* workaround for x64 and arm64 SEH on Windows */ jmp_buf buf; setjmp(buf); _JUMP_BUFFER *bp = (void*)&cont->jmpbuf; @@ -1427,6 +1837,10 @@ cont_restore_0(rb_context_t *cont, VALUE *addr_in_prev_frame) if (&space[0] > end) { # ifdef HAVE_ALLOCA volatile VALUE *sp = ALLOCA_N(VALUE, &space[0] - end); + // We need to make sure that the stack pointer is moved, + // but some compilers may remove the allocation by optimization. + // We hope that the following read/write will prevent such an optimization. + *sp = Qfalse; space[0] = *sp; # else cont_restore_0(cont, &space[0]); @@ -1552,6 +1966,13 @@ rb_callcc(VALUE self) return rb_yield(val); } } +#ifdef RUBY_ASAN_ENABLED +/* callcc can't possibly work with ASAN; see bug #20273. Also this function + * definition below avoids a "defined and not used" warning. */ +MAYBE_UNUSED(static void notusing_callcc(void)) { rb_callcc(Qnil); } +# define rb_callcc rb_f_notimplement +#endif + static VALUE make_passing_arg(int argc, const VALUE *argv) @@ -1570,80 +1991,6 @@ make_passing_arg(int argc, const VALUE *argv) typedef VALUE e_proc(VALUE); -/* CAUTION!! : Currently, error in rollback_func is not supported */ -/* same as rb_protect if set rollback_func to NULL */ -void -ruby_register_rollback_func_for_ensure(e_proc *ensure_func, e_proc *rollback_func) -{ - st_table **table_p = &GET_VM()->ensure_rollback_table; - if (UNLIKELY(*table_p == NULL)) { - *table_p = st_init_numtable(); - } - st_insert(*table_p, (st_data_t)ensure_func, (st_data_t)rollback_func); -} - -static inline e_proc * -lookup_rollback_func(e_proc *ensure_func) -{ - st_table *table = GET_VM()->ensure_rollback_table; - st_data_t val; - if (table && st_lookup(table, (st_data_t)ensure_func, &val)) - return (e_proc *) val; - return (e_proc *) Qundef; -} - - -static inline void -rollback_ensure_stack(VALUE self,rb_ensure_list_t *current,rb_ensure_entry_t *target) -{ - rb_ensure_list_t *p; - rb_ensure_entry_t *entry; - size_t i, j; - size_t cur_size; - size_t target_size; - size_t base_point; - e_proc *func; - - cur_size = 0; - for (p=current; p; p=p->next) - cur_size++; - target_size = 0; - for (entry=target; entry->marker; entry++) - target_size++; - - /* search common stack point */ - p = current; - base_point = cur_size; - while (base_point) { - if (target_size >= base_point && - p->entry.marker == target[target_size - base_point].marker) - break; - base_point --; - p = p->next; - } - - /* rollback function check */ - for (i=0; i < target_size - base_point; i++) { - if (!lookup_rollback_func(target[i].e_proc)) { - rb_raise(rb_eRuntimeError, "continuation called from out of critical rb_ensure scope"); - } - } - /* pop ensure stack */ - while (cur_size > base_point) { - /* escape from ensure block */ - (*current->entry.e_proc)(current->entry.data2); - current = current->next; - cur_size--; - } - /* push ensure stack */ - for (j = 0; j < i; j++) { - func = lookup_rollback_func(target[i - j - 1].e_proc); - if ((VALUE)func != Qundef) { - (*func)(target[i - j - 1].data2); - } - } -} - NORETURN(static VALUE rb_cont_call(int argc, VALUE *argv, VALUE contval)); /* @@ -1670,15 +2017,11 @@ rb_cont_call(int argc, VALUE *argv, VALUE contval) if (cont_thread_value(cont) != th->self) { rb_raise(rb_eRuntimeError, "continuation called across threads"); } - if (cont->saved_ec.protect_tag != th->ec->protect_tag) { - rb_raise(rb_eRuntimeError, "continuation called across stack rewinding barrier"); - } if (cont->saved_ec.fiber_ptr) { if (th->ec->fiber_ptr != cont->saved_ec.fiber_ptr) { rb_raise(rb_eRuntimeError, "continuation called across fiber"); } } - rollback_ensure_stack(contval, th->ec->ensure_list, cont->ensure_array); cont->argc = argc; cont->value = make_passing_arg(argc, argv); @@ -1755,10 +2098,10 @@ rb_cont_call(int argc, VALUE *argv, VALUE contval) * == Non-blocking Fibers * * The concept of <em>non-blocking fiber</em> was introduced in Ruby 3.0. - * A non-blocking fiber, when reaching a operation that would normally block + * A non-blocking fiber, when reaching an operation that would normally block * the fiber (like <code>sleep</code>, or wait for another process or I/O) - # will yield control to other fibers and allow the <em>scheduler</em> to - # handle blocking and waking up (resuming) this fiber when it can proceed. + * will yield control to other fibers and allow the <em>scheduler</em> to + * handle blocking and waking up (resuming) this fiber when it can proceed. * * For a Fiber to behave as non-blocking, it need to be created in Fiber.new with * <tt>blocking: false</tt> (which is the default), and Fiber.scheduler @@ -1766,7 +2109,7 @@ rb_cont_call(int argc, VALUE *argv, VALUE contval) * the current thread, blocking and non-blocking fibers' behavior is identical. * * Ruby doesn't provide a scheduler class: it is expected to be implemented by - * the user and correspond to Fiber::SchedulerInterface. + * the user and correspond to Fiber::Scheduler. * * There is also Fiber.schedule method, which is expected to immediately perform * the given block in a non-blocking manner. Its actual implementation is up to @@ -1774,16 +2117,38 @@ rb_cont_call(int argc, VALUE *argv, VALUE contval) * */ -static const rb_data_type_t fiber_data_type = { +static void +fiber_handle_weak_references(void *ptr) +{ + rb_fiber_t *fiber = ptr; + + if (!fiber) return; + + if (!rb_gc_handle_weak_references_alive_p(fiber->cont.saved_ec.gen_fields_cache.obj) || + !rb_gc_handle_weak_references_alive_p(fiber->cont.saved_ec.gen_fields_cache.fields_obj)) { + fiber->cont.saved_ec.gen_fields_cache.obj = Qundef; + fiber->cont.saved_ec.gen_fields_cache.fields_obj = Qundef; + } +} + +static const rb_data_type_t rb_fiber_data_type = { "fiber", - {fiber_mark, fiber_free, fiber_memsize, fiber_compact,}, + {fiber_mark, fiber_free, fiber_memsize, fiber_compact, fiber_handle_weak_references}, 0, 0, RUBY_TYPED_FREE_IMMEDIATELY }; static VALUE fiber_alloc(VALUE klass) { - return TypedData_Wrap_Struct(klass, &fiber_data_type, 0); + VALUE obj = TypedData_Wrap_Struct(klass, &rb_fiber_data_type, 0); + rb_gc_declare_weak_references(obj); + return obj; +} + +static rb_serial_t +next_ec_serial(rb_ractor_t *cr) +{ + return cr->next_ec_serial++; } static rb_fiber_t* @@ -1801,9 +2166,11 @@ fiber_t_alloc(VALUE fiber_value, unsigned int blocking) fiber->cont.self = fiber_value; fiber->cont.type = FIBER_CONTEXT; fiber->blocking = blocking; + fiber->killed = 0; cont_init(&fiber->cont, th); fiber->cont.saved_ec.fiber_ptr = fiber; + fiber->cont.saved_ec.serial = next_ec_serial(th->ractor); rb_ec_clear_vm_stack(&fiber->cont.saved_ec); fiber->prev = NULL; @@ -1817,11 +2184,198 @@ fiber_t_alloc(VALUE fiber_value, unsigned int blocking) return fiber; } +static inline rb_fiber_t* +fiber_current(void) +{ + rb_execution_context_t *ec = GET_EC(); + return ec->fiber_ptr; +} + +static inline VALUE +current_fiber_storage(void) +{ + rb_execution_context_t *ec = GET_EC(); + return ec->storage; +} + +static inline VALUE +inherit_fiber_storage(void) +{ + return rb_obj_dup(current_fiber_storage()); +} + +static inline void +fiber_storage_set(struct rb_fiber_struct *fiber, VALUE storage) +{ + fiber->cont.saved_ec.storage = storage; +} + +static inline VALUE +fiber_storage_get(rb_fiber_t *fiber, int allocate) +{ + VALUE storage = fiber->cont.saved_ec.storage; + if (storage == Qnil && allocate) { + storage = rb_hash_new(); + fiber_storage_set(fiber, storage); + } + return storage; +} + +static void +storage_access_must_be_from_same_fiber(VALUE self) +{ + rb_fiber_t *fiber = fiber_ptr(self); + rb_fiber_t *current = fiber_current(); + if (fiber != current) { + rb_raise(rb_eArgError, "Fiber storage can only be accessed from the Fiber it belongs to"); + } +} + +/** + * call-seq: fiber.storage -> hash (dup) + * + * Returns a copy of the storage hash for the fiber. The method can only be called on the + * Fiber.current. + */ static VALUE -fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool, unsigned int blocking) +rb_fiber_storage_get(VALUE self) { + storage_access_must_be_from_same_fiber(self); + + VALUE storage = fiber_storage_get(fiber_ptr(self), FALSE); + + if (storage == Qnil) { + return Qnil; + } + else { + return rb_obj_dup(storage); + } +} + +static int +fiber_storage_validate_each(VALUE key, VALUE value, VALUE _argument) +{ + Check_Type(key, T_SYMBOL); + + return ST_CONTINUE; +} + +static void +fiber_storage_validate(VALUE value) +{ + // nil is an allowed value and will be lazily initialized. + if (value == Qnil) return; + + if (!RB_TYPE_P(value, T_HASH)) { + rb_raise(rb_eTypeError, "storage must be a hash"); + } + + if (RB_OBJ_FROZEN(value)) { + rb_raise(rb_eFrozenError, "storage must not be frozen"); + } + + rb_hash_foreach(value, fiber_storage_validate_each, Qundef); +} + +/** + * call-seq: fiber.storage = hash + * + * Sets the storage hash for the fiber. This feature is experimental + * and may change in the future. The method can only be called on the + * Fiber.current. + * + * You should be careful about using this method as you may inadvertently clear + * important fiber-storage state. You should mostly prefer to assign specific + * keys in the storage using Fiber::[]=. + * + * You can also use <tt>Fiber.new(storage: nil)</tt> to create a fiber with an empty + * storage. + * + * Example: + * + * while request = request_queue.pop + * # Reset the per-request state: + * Fiber.current.storage = nil + * handle_request(request) + * end + */ +static VALUE +rb_fiber_storage_set(VALUE self, VALUE value) +{ + if (rb_warning_category_enabled_p(RB_WARN_CATEGORY_EXPERIMENTAL)) { + rb_category_warn(RB_WARN_CATEGORY_EXPERIMENTAL, + "Fiber#storage= is experimental and may be removed in the future!"); + } + + storage_access_must_be_from_same_fiber(self); + fiber_storage_validate(value); + + fiber_ptr(self)->cont.saved_ec.storage = rb_obj_dup(value); + return value; +} + +/** + * call-seq: Fiber[key] -> value + * + * Returns the value of the fiber storage variable identified by +key+. + * + * The +key+ must be a symbol, and the value is set by Fiber#[]= or + * Fiber#storage. + * + * See also Fiber::[]=. + */ +static VALUE +rb_fiber_storage_aref(VALUE class, VALUE key) +{ + key = rb_to_symbol(key); + + VALUE storage = fiber_storage_get(fiber_current(), FALSE); + if (storage == Qnil) return Qnil; + + return rb_hash_aref(storage, key); +} + +/** + * call-seq: Fiber[key] = value + * + * Assign +value+ to the fiber storage variable identified by +key+. + * The variable is created if it doesn't exist. + * + * +key+ must be a Symbol, otherwise a TypeError is raised. + * + * See also Fiber::[]. + */ +static VALUE +rb_fiber_storage_aset(VALUE class, VALUE key, VALUE value) +{ + key = rb_to_symbol(key); + + VALUE storage = fiber_storage_get(fiber_current(), value != Qnil); + if (storage == Qnil) return Qnil; + + if (value == Qnil) { + return rb_hash_delete(storage, key); + } + else { + return rb_hash_aset(storage, key, value); + } +} + +static VALUE +fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool, unsigned int blocking, VALUE storage) +{ + if (storage == Qundef || storage == Qtrue) { + // The default, inherit storage (dup) from the current fiber: + storage = inherit_fiber_storage(); + } + else /* nil, hash, etc. */ { + fiber_storage_validate(storage); + storage = rb_obj_dup(storage); + } + rb_fiber_t *fiber = fiber_t_alloc(self, blocking); + fiber->cont.saved_ec.storage = storage; fiber->first_proc = proc; fiber->stack.base = NULL; fiber->stack.pool = fiber_pool; @@ -1854,54 +2408,90 @@ rb_fiber_pool_default(VALUE pool) return &shared_fiber_pool; } +VALUE rb_fiber_inherit_storage(struct rb_execution_context_struct *ec, struct rb_fiber_struct *fiber) +{ + VALUE storage = rb_obj_dup(ec->storage); + fiber->cont.saved_ec.storage = storage; + return storage; +} + /* :nodoc: */ static VALUE rb_fiber_initialize_kw(int argc, VALUE* argv, VALUE self, int kw_splat) { VALUE pool = Qnil; VALUE blocking = Qfalse; + VALUE storage = Qundef; if (kw_splat != RB_NO_KEYWORDS) { VALUE options = Qnil; - VALUE arguments[2] = {Qundef}; + VALUE arguments[3] = {Qundef}; argc = rb_scan_args_kw(kw_splat, argc, argv, ":", &options); - rb_get_kwargs(options, fiber_initialize_keywords, 0, 2, arguments); + rb_get_kwargs(options, fiber_initialize_keywords, 0, 3, arguments); - if (arguments[0] != Qundef) { + if (!UNDEF_P(arguments[0])) { blocking = arguments[0]; } - if (arguments[1] != Qundef) { + if (!UNDEF_P(arguments[1])) { pool = arguments[1]; } + + storage = arguments[2]; } - return fiber_initialize(self, rb_block_proc(), rb_fiber_pool_default(pool), RTEST(blocking)); + return fiber_initialize(self, rb_block_proc(), rb_fiber_pool_default(pool), RTEST(blocking), storage); } /* * call-seq: - * Fiber.new(blocking: false) { |*args| ... } -> fiber + * Fiber.new(blocking: false, storage: true) { |*args| ... } -> fiber * - * Creates new Fiber. Initially, the fiber is not running and can be resumed with - * #resume. Arguments to the first #resume call will be passed to the block: + * Creates new Fiber. Initially, the fiber is not running and can be resumed + * with #resume. Arguments to the first #resume call will be passed to the + * block: * - * f = Fiber.new do |initial| - * current = initial - * loop do - * puts "current: #{current.inspect}" - * current = Fiber.yield - * end - * end - * f.resume(100) # prints: current: 100 - * f.resume(1, 2, 3) # prints: current: [1, 2, 3] - * f.resume # prints: current: nil - * # ... and so on ... - * - * If <tt>blocking: false</tt> is passed to <tt>Fiber.new</tt>, _and_ current thread - * has a Fiber.scheduler defined, the Fiber becomes non-blocking (see "Non-blocking - * Fibers" section in class docs). + * f = Fiber.new do |initial| + * current = initial + * loop do + * puts "current: #{current.inspect}" + * current = Fiber.yield + * end + * end + * f.resume(100) # prints: current: 100 + * f.resume(1, 2, 3) # prints: current: [1, 2, 3] + * f.resume # prints: current: nil + * # ... and so on ... + * + * If <tt>blocking: false</tt> is passed to <tt>Fiber.new</tt>, _and_ current + * thread has a Fiber.scheduler defined, the Fiber becomes non-blocking (see + * "Non-blocking Fibers" section in class docs). + * + * If the <tt>storage</tt> is unspecified, the default is to inherit a copy of + * the storage from the current fiber. This is the same as specifying + * <tt>storage: true</tt>. + * + * Fiber[:x] = 1 + * Fiber.new do + * Fiber[:x] # => 1 + * Fiber[:x] = 2 + * end.resume + * Fiber[:x] # => 1 + * + * If the given <tt>storage</tt> is <tt>nil</tt>, this function will lazy + * initialize the internal storage, which starts as an empty hash. + * + * Fiber[:x] = "Hello World" + * Fiber.new(storage: nil) do + * Fiber[:x] # nil + * end + * + * Otherwise, the given <tt>storage</tt> is used as the new fiber's storage, + * and it must be an instance of Hash. + * + * Explicitly using <tt>storage: true</tt> is currently experimental and may + * change in the future. */ static VALUE rb_fiber_initialize(int argc, VALUE* argv, VALUE self) @@ -1910,9 +2500,15 @@ rb_fiber_initialize(int argc, VALUE* argv, VALUE self) } VALUE +rb_fiber_new_storage(rb_block_call_func_t func, VALUE obj, VALUE storage) +{ + return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), rb_fiber_pool_default(Qnil), 0, storage); +} + +VALUE rb_fiber_new(rb_block_call_func_t func, VALUE obj) { - return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), rb_fiber_pool_default(Qnil), 1); + return rb_fiber_new_storage(func, obj, Qtrue); } static VALUE @@ -1923,7 +2519,7 @@ rb_fiber_s_schedule_kw(int argc, VALUE* argv, int kw_splat) VALUE fiber = Qnil; if (scheduler != Qnil) { - fiber = rb_funcall_passing_block_kw(scheduler, rb_intern("fiber"), argc, argv, kw_splat); + fiber = rb_fiber_scheduler_fiber(scheduler, argc, argv, kw_splat); } else { rb_raise(rb_eRuntimeError, "No scheduler is available!"); @@ -1966,7 +2562,7 @@ rb_fiber_s_schedule_kw(int argc, VALUE* argv, int kw_splat) * * Note that the behavior described above is how the method is <em>expected</em> * to behave, actual behavior is up to the current scheduler's implementation of - * Fiber::SchedulerInterface#fiber method. Ruby doesn't enforce this method to + * Fiber::Scheduler#fiber method. Ruby doesn't enforce this method to * behave in any particular way. * * If the scheduler is not set, the method raises @@ -1985,7 +2581,7 @@ rb_fiber_s_schedule(int argc, VALUE *argv, VALUE obj) * * Returns the Fiber scheduler, that was last set for the current thread with Fiber.set_scheduler. * Returns +nil+ if no scheduler is set (which is the default), and non-blocking fibers' - # behavior is the same as blocking. + * behavior is the same as blocking. * (see "Non-blocking fibers" section in class docs for details about the scheduler concept). * */ @@ -2000,7 +2596,7 @@ rb_fiber_s_scheduler(VALUE klass) * Fiber.current_scheduler -> obj or nil * * Returns the Fiber scheduler, that was last set for the current thread with Fiber.set_scheduler - * iff the current fiber is non-blocking. + * if and only if the current fiber is non-blocking. * */ static VALUE @@ -2019,7 +2615,7 @@ rb_fiber_current_scheduler(VALUE klass) * thread will call scheduler's +close+ method on finalization (allowing the scheduler to * properly manage all non-finished fibers). * - * +scheduler+ can be an object of any class corresponding to Fiber::SchedulerInterface. Its + * +scheduler+ can be an object of any class corresponding to Fiber::Scheduler. Its * implementation is up to the user. * * See also the "Non-blocking fibers" section in class docs. @@ -2034,14 +2630,12 @@ rb_fiber_set_scheduler(VALUE klass, VALUE scheduler) NORETURN(static void rb_fiber_terminate(rb_fiber_t *fiber, int need_interrupt, VALUE err)); void -rb_fiber_start(void) +rb_fiber_start(rb_fiber_t *fiber) { - rb_thread_t * volatile th = GET_THREAD(); - rb_fiber_t *fiber = th->ec->fiber_ptr; + rb_thread_t * volatile th = fiber->cont.saved_ec.thread_ptr; + rb_proc_t *proc; enum ruby_tag_type state; - int need_interrupt = TRUE; - VALUE err = Qfalse; VM_ASSERT(th->ec == GET_EC()); VM_ASSERT(FIBER_RESUMED_P(fiber)); @@ -2067,6 +2661,8 @@ rb_fiber_start(void) } EC_POP_TAG(); + int need_interrupt = TRUE; + VALUE err = Qfalse; if (state) { err = th->ec->errinfo; VM_ASSERT(FIBER_RESUMED_P(fiber)); @@ -2074,60 +2670,52 @@ rb_fiber_start(void) if (state == TAG_RAISE) { // noop... } + else if (state == TAG_FATAL && err == RUBY_FATAL_FIBER_KILLED) { + need_interrupt = FALSE; + err = Qfalse; + } else if (state == TAG_FATAL) { rb_threadptr_pending_interrupt_enque(th, err); } else { err = rb_vm_make_jump_tag_but_local_jump(state, err); } - need_interrupt = TRUE; } rb_fiber_terminate(fiber, need_interrupt, err); - VM_UNREACHABLE(rb_fiber_start); -} - -static rb_fiber_t * -root_fiber_alloc(rb_thread_t *th) -{ - VALUE fiber_value = fiber_alloc(rb_cFiber); - rb_fiber_t *fiber = th->ec->fiber_ptr; - - VM_ASSERT(DATA_PTR(fiber_value) == NULL); - VM_ASSERT(fiber->cont.type == FIBER_CONTEXT); - VM_ASSERT(fiber->status == FIBER_RESUMED); - - th->root_fiber = fiber; - DATA_PTR(fiber_value) = fiber; - fiber->cont.self = fiber_value; - -#ifdef COROUTINE_PRIVATE_STACK - fiber->stack = fiber_pool_stack_acquire(&shared_fiber_pool); - coroutine_initialize_main(&fiber->context, fiber_pool_stack_base(&fiber->stack), fiber->stack.available, th->ec->machine.stack_start); -#else - coroutine_initialize_main(&fiber->context); -#endif - - return fiber; } +// Set up a "root fiber", which is the fiber that every Ractor has. void rb_threadptr_root_fiber_setup(rb_thread_t *th) { - rb_fiber_t *fiber = ruby_mimmalloc(sizeof(rb_fiber_t)); + rb_fiber_t *fiber = ZALLOC(rb_fiber_t); if (!fiber) { rb_bug("%s", strerror(errno)); /* ... is it possible to call rb_bug here? */ } - MEMZERO(fiber, rb_fiber_t, 1); + fiber->cont.type = FIBER_CONTEXT; fiber->cont.saved_ec.fiber_ptr = fiber; + fiber->cont.saved_ec.serial = next_ec_serial(th->ractor); fiber->cont.saved_ec.thread_ptr = th; fiber->blocking = 1; + fiber->killed = 0; fiber_status_set(fiber, FIBER_RESUMED); /* skip CREATED */ + + coroutine_initialize_main(&fiber->context); + th->ec = &fiber->cont.saved_ec; - // This skips mjit_cont_new for the initial thread because mjit_enabled is always false - // at this point. mjit_init calls rb_fiber_init_mjit_cont again for this root_fiber. - rb_fiber_init_mjit_cont(fiber); + + cont_init_jit_cont(&fiber->cont); +} + +void +rb_root_fiber_obj_setup(rb_thread_t *th) +{ + rb_fiber_t *fiber = th->ec->fiber_ptr; + VALUE fiber_value = fiber_alloc(rb_cFiber); + DATA_PTR(fiber_value) = fiber; + fiber->cont.self = fiber_value; } void @@ -2137,12 +2725,12 @@ rb_threadptr_root_fiber_release(rb_thread_t *th) /* ignore. A root fiber object will free th->ec */ } else { - rb_execution_context_t *ec = GET_EC(); + rb_execution_context_t *ec = rb_current_execution_context(false); VM_ASSERT(th->ec->fiber_ptr->cont.type == FIBER_CONTEXT); VM_ASSERT(th->ec->fiber_ptr->cont.self == 0); - if (th->ec == ec) { + if (ec && th->ec == ec) { rb_ractor_set_current_ec(th->ractor, NULL); } fiber_free(th->ec->fiber_ptr); @@ -2162,16 +2750,6 @@ rb_threadptr_root_fiber_terminate(rb_thread_t *th) } static inline rb_fiber_t* -fiber_current(void) -{ - rb_execution_context_t *ec = GET_EC(); - if (ec->fiber_ptr->cont.self == 0) { - root_fiber_alloc(rb_ec_thread_ptr(ec)); - } - return ec->fiber_ptr; -} - -static inline rb_fiber_t* return_fiber(bool terminate) { rb_fiber_t *fiber = fiber_current(); @@ -2179,7 +2757,7 @@ return_fiber(bool terminate) if (prev) { fiber->prev = NULL; - prev->resuming_fiber = Qnil; + prev->resuming_fiber = NULL; return prev; } else { @@ -2193,9 +2771,7 @@ return_fiber(bool terminate) VM_ASSERT(root_fiber != NULL); // search resuming fiber - for (fiber = root_fiber; - RTEST(fiber->resuming_fiber); - fiber = fiber_ptr(fiber->resuming_fiber)) { + for (fiber = root_fiber; fiber->resuming_fiber; fiber = fiber->resuming_fiber) { } return fiber; @@ -2212,15 +2788,7 @@ rb_fiber_current(void) static inline void fiber_store(rb_fiber_t *next_fiber, rb_thread_t *th) { - rb_fiber_t *fiber; - - if (th->ec->fiber_ptr != NULL) { - fiber = th->ec->fiber_ptr; - } - else { - /* create root fiber */ - fiber = root_fiber_alloc(th); - } + rb_fiber_t *fiber = th->ec->fiber_ptr; if (FIBER_CREATED_P(next_fiber)) { fiber_prepare_stack(next_fiber); @@ -2235,15 +2803,30 @@ fiber_store(rb_fiber_t *next_fiber, rb_thread_t *th) fiber_setcontext(next_fiber, fiber); } +static void +fiber_check_killed(rb_fiber_t *fiber) +{ + VM_ASSERT(fiber == fiber_current()); + + if (fiber->killed) { + rb_thread_t *thread = fiber->cont.saved_ec.thread_ptr; + + thread->ec->errinfo = RUBY_FATAL_FIBER_KILLED; + EC_JUMP_TAG(thread->ec, RUBY_TAG_FATAL); + } +} + static inline VALUE -fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int kw_splat, VALUE resuming_fiber, bool yielding) +fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int kw_splat, rb_fiber_t *resuming_fiber, bool yielding) { VALUE value; rb_context_t *cont = &fiber->cont; rb_thread_t *th = GET_THREAD(); /* make sure the root_fiber object is available */ - if (th->root_fiber == NULL) root_fiber_alloc(th); + if (th->root_fiber == NULL) { + th->root_fiber = th->ec->fiber_ptr; + } if (th->ec->fiber_ptr == fiber) { /* ignore fiber context switch @@ -2255,10 +2838,8 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int kw_splat, VALUE if (cont_thread_value(cont) != th->self) { rb_raise(rb_eFiberError, "fiber called across threads"); } - else if (cont->saved_ec.protect_tag != th->ec->protect_tag) { - rb_raise(rb_eFiberError, "fiber called across stack rewinding barrier"); - } - else if (FIBER_TERMINATED_P(fiber)) { + + if (FIBER_TERMINATED_P(fiber)) { value = rb_exc_new2(rb_eFiberError, "dead fiber called"); if (!FIBER_TERMINATED_P(th->ec->fiber_ptr)) { @@ -2285,8 +2866,9 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int kw_splat, VALUE rb_fiber_t *current_fiber = fiber_current(); - VM_ASSERT(!RTEST(current_fiber->resuming_fiber)); - if (RTEST(resuming_fiber)) { + VM_ASSERT(!current_fiber->resuming_fiber); + + if (resuming_fiber) { current_fiber->resuming_fiber = resuming_fiber; fiber->prev = fiber_current(); fiber->yielding = 0; @@ -2307,9 +2889,14 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int kw_splat, VALUE fiber_store(fiber, th); - if (RTEST(resuming_fiber) && FIBER_TERMINATED_P(fiber)) { - fiber_stack_release(fiber); + // We cannot free the stack until the pthread is joined: +#ifndef COROUTINE_PTHREAD_CONTEXT + if (FIBER_TERMINATED_P(fiber)) { + RB_VM_LOCKING() { + fiber_stack_release(fiber); + } } +#endif if (fiber_current()->blocking) { th->blocking += 1; @@ -2321,14 +2908,21 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int kw_splat, VALUE current_fiber = th->ec->fiber_ptr; value = current_fiber->cont.value; - if (current_fiber->cont.argc == -1) rb_exc_raise(value); + + fiber_check_killed(current_fiber); + + if (current_fiber->cont.argc == -1) { + // Fiber#raise will trigger this path. + rb_exc_raise(value); + } + return value; } VALUE rb_fiber_transfer(VALUE fiber_value, int argc, const VALUE *argv) { - return fiber_switch(fiber_ptr(fiber_value), argc, argv, RB_NO_KEYWORDS, Qfalse, false); + return fiber_switch(fiber_ptr(fiber_value), argc, argv, RB_NO_KEYWORDS, NULL, false); } /* @@ -2348,7 +2942,62 @@ rb_fiber_transfer(VALUE fiber_value, int argc, const VALUE *argv) VALUE rb_fiber_blocking_p(VALUE fiber) { - return (fiber_ptr(fiber)->blocking == 0) ? Qfalse : Qtrue; + return RBOOL(fiber_ptr(fiber)->blocking); +} + +static VALUE +fiber_blocking_yield(VALUE fiber_value) +{ + rb_fiber_t *fiber = fiber_ptr(fiber_value); + rb_thread_t * volatile th = fiber->cont.saved_ec.thread_ptr; + + VM_ASSERT(fiber->blocking == 0); + + // fiber->blocking is `unsigned int : 1`, so we use it as a boolean: + fiber->blocking = 1; + + // Once the fiber is blocking, and current, we increment the thread blocking state: + th->blocking += 1; + + return rb_yield(fiber_value); +} + +static VALUE +fiber_blocking_ensure(VALUE fiber_value) +{ + rb_fiber_t *fiber = fiber_ptr(fiber_value); + rb_thread_t * volatile th = fiber->cont.saved_ec.thread_ptr; + + // We are no longer blocking: + fiber->blocking = 0; + th->blocking -= 1; + + return Qnil; +} + +/* + * call-seq: + * Fiber.blocking{|fiber| ...} -> result + * + * Forces the fiber to be blocking for the duration of the block. Returns the + * result of the block. + * + * See the "Non-blocking fibers" section in class docs for details. + * + */ +VALUE +rb_fiber_blocking(VALUE class) +{ + VALUE fiber_value = rb_fiber_current(); + rb_fiber_t *fiber = fiber_ptr(fiber_value); + + // If we are already blocking, this is essentially a no-op: + if (fiber->blocking) { + return rb_yield(fiber_value); + } + else { + return rb_ensure(fiber_blocking_yield, fiber_value, fiber_blocking_ensure, fiber_value); + } } /* @@ -2385,35 +3034,34 @@ void rb_fiber_close(rb_fiber_t *fiber) { fiber_status_set(fiber, FIBER_TERMINATED); + rb_ec_close(&fiber->cont.saved_ec); } static void -rb_fiber_terminate(rb_fiber_t *fiber, int need_interrupt, VALUE err) +rb_fiber_terminate(rb_fiber_t *fiber, int need_interrupt, VALUE error) { VALUE value = fiber->cont.value; - rb_fiber_t *next_fiber; VM_ASSERT(FIBER_RESUMED_P(fiber)); rb_fiber_close(fiber); - coroutine_destroy(&fiber->context); - fiber->cont.machine.stack = NULL; fiber->cont.machine.stack_size = 0; - next_fiber = return_fiber(true); + rb_fiber_t *next_fiber = return_fiber(true); + if (need_interrupt) RUBY_VM_SET_INTERRUPT(&next_fiber->cont.saved_ec); - if (RTEST(err)) - fiber_switch(next_fiber, -1, &err, RB_NO_KEYWORDS, Qfalse, false); + + if (RTEST(error)) + fiber_switch(next_fiber, -1, &error, RB_NO_KEYWORDS, NULL, false); else - fiber_switch(next_fiber, 1, &value, RB_NO_KEYWORDS, Qfalse, false); - VM_UNREACHABLE(rb_fiber_terminate); + fiber_switch(next_fiber, 1, &value, RB_NO_KEYWORDS, NULL, false); + ruby_stop(0); } -VALUE -rb_fiber_resume_kw(VALUE fiber_value, int argc, const VALUE *argv, int kw_splat) +static VALUE +fiber_resume_kw(rb_fiber_t *fiber, int argc, const VALUE *argv, int kw_splat) { - rb_fiber_t *fiber = fiber_ptr(fiber_value); rb_fiber_t *current_fiber = fiber_current(); if (argc == -1 && FIBER_CREATED_P(fiber)) { @@ -2428,7 +3076,7 @@ rb_fiber_resume_kw(VALUE fiber_value, int argc, const VALUE *argv, int kw_splat) else if (fiber->prev != NULL) { rb_raise(rb_eFiberError, "attempt to resume a resumed fiber (double resume)"); } - else if (RTEST(fiber->resuming_fiber)) { + else if (fiber->resuming_fiber) { rb_raise(rb_eFiberError, "attempt to resume a resuming fiber"); } else if (fiber->prev == NULL && @@ -2436,25 +3084,31 @@ rb_fiber_resume_kw(VALUE fiber_value, int argc, const VALUE *argv, int kw_splat) rb_raise(rb_eFiberError, "attempt to resume a transferring fiber"); } - return fiber_switch(fiber, argc, argv, kw_splat, fiber_value, false); + return fiber_switch(fiber, argc, argv, kw_splat, fiber, false); } VALUE -rb_fiber_resume(VALUE fiber_value, int argc, const VALUE *argv) +rb_fiber_resume_kw(VALUE self, int argc, const VALUE *argv, int kw_splat) { - return rb_fiber_resume_kw(fiber_value, argc, argv, RB_NO_KEYWORDS); + return fiber_resume_kw(fiber_ptr(self), argc, argv, kw_splat); +} + +VALUE +rb_fiber_resume(VALUE self, int argc, const VALUE *argv) +{ + return fiber_resume_kw(fiber_ptr(self), argc, argv, RB_NO_KEYWORDS); } VALUE rb_fiber_yield_kw(int argc, const VALUE *argv, int kw_splat) { - return fiber_switch(return_fiber(false), argc, argv, kw_splat, Qfalse, true); + return fiber_switch(return_fiber(false), argc, argv, kw_splat, NULL, true); } VALUE rb_fiber_yield(int argc, const VALUE *argv) { - return fiber_switch(return_fiber(false), argc, argv, RB_NO_KEYWORDS, Qfalse, true); + return fiber_switch(return_fiber(false), argc, argv, RB_NO_KEYWORDS, NULL, true); } void @@ -2476,7 +3130,7 @@ rb_fiber_reset_root_local_storage(rb_thread_t *th) VALUE rb_fiber_alive_p(VALUE fiber_value) { - return FIBER_TERMINATED_P(fiber_ptr(fiber_value)) ? Qfalse : Qtrue; + return RBOOL(!FIBER_TERMINATED_P(fiber_ptr(fiber_value))); } /* @@ -2500,45 +3154,6 @@ rb_fiber_m_resume(int argc, VALUE *argv, VALUE fiber) return rb_fiber_resume_kw(fiber, argc, argv, rb_keyword_given_p()); } -static VALUE rb_fiber_transfer_kw(VALUE fiber_value, int argc, VALUE *argv, int kw_splat); - -/* - * call-seq: - * fiber.raise -> obj - * fiber.raise(string) -> obj - * fiber.raise(exception [, string [, array]]) -> obj - * - * Raises an exception in the fiber at the point at which the last - * +Fiber.yield+ was called. If the fiber has not been started or has - * already run to completion, raises +FiberError+. If the fiber is - * yielding, it is resumed. If it is transferring, it is transferred into. - * But if it is resuming, raises +FiberError+. - * - * With no arguments, raises a +RuntimeError+. With a single +String+ - * argument, raises a +RuntimeError+ with the string as a message. Otherwise, - * the first parameter should be the name of an +Exception+ class (or an - * object that returns an +Exception+ object when sent an +exception+ - * message). The optional second parameter sets the message associated with - * the exception, and the third parameter is an array of callback information. - * Exceptions are caught by the +rescue+ clause of <code>begin...end</code> - * blocks. - */ -static VALUE -rb_fiber_raise(int argc, VALUE *argv, VALUE fiber_value) -{ - rb_fiber_t *fiber = fiber_ptr(fiber_value); - VALUE exc = rb_make_exception(argc, argv); - if (RTEST(fiber->resuming_fiber)) { - rb_raise(rb_eFiberError, "attempt to raise a resuming fiber"); - } - else if (FIBER_SUSPENDED_P(fiber) && !fiber->yielding) { - return rb_fiber_transfer_kw(fiber_value, -1, &exc, RB_NO_KEYWORDS); - } - else { - return rb_fiber_resume_kw(fiber_value, -1, &exc, RB_NO_KEYWORDS); - } -} - /* * call-seq: * fiber.backtrace -> array @@ -2699,22 +3314,29 @@ rb_fiber_backtrace_locations(int argc, VALUE *argv, VALUE fiber) * */ static VALUE -rb_fiber_m_transfer(int argc, VALUE *argv, VALUE fiber_value) +rb_fiber_m_transfer(int argc, VALUE *argv, VALUE self) { - return rb_fiber_transfer_kw(fiber_value, argc, argv, rb_keyword_given_p()); + return rb_fiber_transfer_kw(self, argc, argv, rb_keyword_given_p()); } static VALUE -rb_fiber_transfer_kw(VALUE fiber_value, int argc, VALUE *argv, int kw_splat) +fiber_transfer_kw(rb_fiber_t *fiber, int argc, const VALUE *argv, int kw_splat) { - rb_fiber_t *fiber = fiber_ptr(fiber_value); - if (RTEST(fiber->resuming_fiber)) { - rb_raise(rb_eFiberError, "attempt to transfer to a resuming fiber"); - } - if (fiber->yielding) { - rb_raise(rb_eFiberError, "attempt to transfer to a yielding fiber"); - } - return fiber_switch(fiber, argc, argv, kw_splat, Qfalse, false); + if (fiber->resuming_fiber) { + rb_raise(rb_eFiberError, "attempt to transfer to a resuming fiber"); + } + + if (fiber->yielding) { + rb_raise(rb_eFiberError, "attempt to transfer to a yielding fiber"); + } + + return fiber_switch(fiber, argc, argv, kw_splat, NULL, false); +} + +VALUE +rb_fiber_transfer_kw(VALUE self, int argc, const VALUE *argv, int kw_splat) +{ + return fiber_transfer_kw(fiber_ptr(self), argc, argv, kw_splat); } /* @@ -2733,6 +3355,111 @@ rb_fiber_s_yield(int argc, VALUE *argv, VALUE klass) return rb_fiber_yield_kw(argc, argv, rb_keyword_given_p()); } +static VALUE +fiber_raise(rb_fiber_t *fiber, VALUE exception) +{ + if (fiber == fiber_current()) { + rb_exc_raise(exception); + } + else if (fiber->resuming_fiber) { + return fiber_raise(fiber->resuming_fiber, exception); + } + else if (FIBER_SUSPENDED_P(fiber) && !fiber->yielding) { + return fiber_transfer_kw(fiber, -1, &exception, RB_NO_KEYWORDS); + } + else { + return fiber_resume_kw(fiber, -1, &exception, RB_NO_KEYWORDS); + } +} + +VALUE +rb_fiber_raise(VALUE fiber, int argc, VALUE *argv) +{ + VALUE exception = rb_exception_setup(argc, argv); + + return fiber_raise(fiber_ptr(fiber), exception); +} + +/* + * call-seq: + * raise(exception, message = exception.to_s, backtrace = nil, cause: $!) + * raise(message = nil, cause: $!) + * + * Raises an exception in the fiber at the point at which the last + * +Fiber.yield+ was called. + * + * f = Fiber.new { + * puts "Before the yield" + * Fiber.yield 1 # -- exception will be raised here + * puts "After the yield" + * } + * + * p f.resume + * f.raise "Gotcha" + * + * Output + * + * Before the first yield + * 1 + * t.rb:8:in 'Fiber.yield': Gotcha (RuntimeError) + * from t.rb:8:in 'block in <main>' + * + * If the fiber has not been started or has + * already run to completion, raises +FiberError+. If the fiber is + * yielding, it is resumed. If it is transferring, it is transferred into. + * But if it is resuming, raises +FiberError+. + * + * Raises +FiberError+ if called on a Fiber belonging to another +Thread+. + * + * See Kernel#raise for more information on arguments. + * + */ +static VALUE +rb_fiber_m_raise(int argc, VALUE *argv, VALUE self) +{ + return rb_fiber_raise(self, argc, argv); +} + +/* + * call-seq: + * fiber.kill -> nil + * + * Terminates the fiber by raising an uncatchable exception. + * It only terminates the given fiber and no other fiber, returning +nil+ to + * another fiber if that fiber was calling #resume or #transfer. + * + * <tt>Fiber#kill</tt> only interrupts another fiber when it is in Fiber.yield. + * If called on the current fiber then it raises that exception at the <tt>Fiber#kill</tt> call site. + * + * If the fiber has not been started, transition directly to the terminated state. + * + * If the fiber is already terminated, does nothing. + * + * Raises FiberError if called on a fiber belonging to another thread. + */ +static VALUE +rb_fiber_m_kill(VALUE self) +{ + rb_fiber_t *fiber = fiber_ptr(self); + + if (fiber->killed) return Qfalse; + fiber->killed = 1; + + if (fiber->status == FIBER_CREATED) { + fiber->status = FIBER_TERMINATED; + } + else if (fiber->status != FIBER_TERMINATED) { + if (fiber_current() == fiber) { + fiber_check_killed(fiber); + } + else { + fiber_raise(fiber_ptr(self), Qnil); + } + } + + return self; +} + /* * call-seq: * Fiber.current -> fiber @@ -2753,7 +3480,7 @@ fiber_to_s(VALUE fiber_value) const rb_proc_t *proc; char status_info[0x20]; - if (RTEST(fiber->resuming_fiber)) { + if (fiber->resuming_fiber) { snprintf(status_info, 0x20, " (%s by resuming)", fiber_status_name(fiber->status)); } else { @@ -2780,6 +3507,8 @@ rb_fiber_atfork(rb_thread_t *th) th->root_fiber = th->ec->fiber_ptr; } th->root_fiber->prev = 0; + th->root_fiber->blocking = 1; + th->blocking = 1; } } #endif @@ -2791,8 +3520,8 @@ fiber_pool_free(void *ptr) struct fiber_pool * fiber_pool = ptr; RUBY_FREE_ENTER("fiber_pool"); - fiber_pool_free_allocations(fiber_pool->allocations); - ruby_xfree(fiber_pool); + fiber_pool_allocation_free(fiber_pool->allocations); + SIZED_FREE(fiber_pool); RUBY_FREE_LEAVE("fiber_pool"); } @@ -2817,9 +3546,9 @@ static const rb_data_type_t FiberPoolDataType = { static VALUE fiber_pool_alloc(VALUE klass) { - struct fiber_pool * fiber_pool = RB_ALLOC(struct fiber_pool); + struct fiber_pool *fiber_pool; - return TypedData_Wrap_Struct(klass, &FiberPoolDataType, fiber_pool); + return TypedData_Make_Struct(klass, struct fiber_pool, &FiberPoolDataType, fiber_pool); } static VALUE @@ -2833,7 +3562,7 @@ rb_fiber_pool_initialize(int argc, VALUE* argv, VALUE self) rb_scan_args(argc, argv, "03", &size, &count, &vm_stack_size); if (NIL_P(size)) { - size = INT2NUM(th->vm->default_params.fiber_machine_stack_size); + size = SIZET2NUM(th->vm->default_params.fiber_machine_stack_size); } if (NIL_P(count)) { @@ -2841,12 +3570,12 @@ rb_fiber_pool_initialize(int argc, VALUE* argv, VALUE self) } if (NIL_P(vm_stack_size)) { - vm_stack_size = INT2NUM(th->vm->default_params.fiber_vm_stack_size); + vm_stack_size = SIZET2NUM(th->vm->default_params.fiber_vm_stack_size); } TypedData_Get_Struct(self, struct fiber_pool, &FiberPoolDataType, fiber_pool); - fiber_pool_initialize(fiber_pool, NUM2SIZET(size), NUM2SIZET(count), NUM2SIZET(vm_stack_size)); + fiber_pool_initialize(fiber_pool, NUM2SIZET(size), NUM2SIZET(count), 0, NUM2SIZET(vm_stack_size)); return self; } @@ -2865,192 +3594,45 @@ rb_fiber_pool_initialize(int argc, VALUE* argv, VALUE self) * fiber.resume #=> FiberError: dead fiber called */ -/* - * Document-class: Fiber::SchedulerInterface - * - * This is not an existing class, but documentation of the interface that Scheduler - * object should comply to in order to be used as argument to Fiber.scheduler and handle non-blocking - * fibers. See also the "Non-blocking fibers" section in Fiber class docs for explanations - * of some concepts. - * - * Scheduler's behavior and usage are expected to be as follows: - * - * * When the execution in the non-blocking Fiber reaches some blocking operation (like - * sleep, wait for a process, or a non-ready I/O), it calls some of the scheduler's - * hook methods, listed below. - * * Scheduler somehow registers what the current fiber is waiting on, and yields control - * to other fibers with Fiber.yield (so the fiber would be suspended while expecting its - * wait to end, and other fibers in the same thread can perform) - * * At the end of the current thread execution, the scheduler's method #close is called - * * The scheduler runs into a wait loop, checking all the blocked fibers (which it has - * registered on hook calls) and resuming them when the awaited resource is ready - * (e.g. I/O ready or sleep time elapsed). - * - * A typical implementation would probably rely for this closing loop on a gem like - * EventMachine[https://github.com/eventmachine/eventmachine] or - * Async[https://github.com/socketry/async]. - * - * This way concurrent execution will be achieved transparently for every - * individual Fiber's code. - * - * Hook methods are: - * - * * #io_wait - * * #process_wait - * * #kernel_sleep - * * #block and #unblock - * * (the list is expanded as Ruby developers make more methods having non-blocking calls) - * - * When not specified otherwise, the hook implementations are mandatory: if they are not - * implemented, the methods trying to call hook will fail. To provide backward compatibility, - * in the future hooks will be optional (if they are not implemented, due to the scheduler - * being created for the older Ruby version, the code which needs this hook will not fail, - * and will just behave in a blocking fashion). - * - * It is also strongly recommended that the scheduler implements the #fiber method, which is - * delegated to by Fiber.schedule. - * - * Sample _toy_ implementation of the scheduler can be found in Ruby's code, in - * <tt>test/fiber/scheduler.rb</tt> - * - */ - -#if 0 /* for RDoc */ -/* - * - * Document-method: Fiber::SchedulerInterface#close - * - * Called when the current thread exits. The scheduler is expected to implement this - * method in order to allow all waiting fibers to finalize their execution. - * - * The suggested pattern is to implement the main event loop in the #close method. - * - */ -static VALUE -rb_fiber_scheduler_interface_close(VALUE self) -{ -} - -/* - * Document-method: SchedulerInterface#process_wait - * call-seq: process_wait(pid, flags) - * - * Invoked by Process::Status.wait in order to wait for a specified process. - * See that method description for arguments description. - * - * Suggested minimal implementation: - * - * Thread.new do - * Process::Status.wait(pid, flags) - * end.value - * - * This hook is optional: if it is not present in the current scheduler, - * Process::Status.wait will behave as a blocking method. - * - * Expected to return a Process::Status instance. - */ -static VALUE -rb_fiber_scheduler_interface_process_wait(VALUE self) +static size_t +shared_fiber_pool_minimum_count(void) { -} + size_t minimum_count = FIBER_POOL_MINIMUM_COUNT; -/* - * Document-method: SchedulerInterface#io_wait - * call-seq: io_wait(io, events, timeout) - * - * Invoked by IO#wait, IO#wait_readable, IO#wait_writable to ask whether the - * specified descriptor is ready for specified events within - * the specified +timeout+. - * - * +events+ is a bit mask of <tt>IO::READABLE</tt>, <tt>IO::WRITABLE</tt>, and - * <tt>IO::PRIORITY</tt>. - * - * Suggested implementation should register which Fiber is waiting for which - * resources and immediately calling Fiber.yield to pass control to other - * fibers. Then, in the #close method, the scheduler might dispatch all the - * I/O resources to fibers waiting for it. - * - * Expected to return the subset of events that are ready immediately. - * - */ -static VALUE -rb_fiber_scheduler_interface_io_wait(VALUE self) -{ -} + const char *minimum_count_env = getenv("RUBY_SHARED_FIBER_POOL_MINIMUM_COUNT"); + if (minimum_count_env && minimum_count_env[0]) { + char *end; + unsigned long value = strtoul(minimum_count_env, &end, 10); + if (end != minimum_count_env && *end == '\0') { + minimum_count = (size_t)value; + } + else { + rb_warn("invalid RUBY_SHARED_FIBER_POOL_MINIMUM_COUNT=%s (expected a non-negative integer)", minimum_count_env); + } + } -/* - * Document-method: SchedulerInterface#kernel_sleep - * call-seq: kernel_sleep(duration = nil) - * - * Invoked by Kernel#sleep and Mutex#sleep and is expected to provide - * an implementation of sleeping in a non-blocking way. Implementation might - * register the current fiber in some list of "which fiber wait until what - * moment", call Fiber.yield to pass control, and then in #close resume - * the fibers whose wait period has elapsed. - * - */ -static VALUE -rb_fiber_scheduler_interface_kernel_sleep(VALUE self) -{ + return minimum_count; } -/* - * Document-method: SchedulerInterface#block - * call-seq: block(blocker, timeout = nil) - * - * Invoked by methods like Thread.join, and by Mutex, to signify that current - * Fiber is blocked until further notice (e.g. #unblock) or until +timeout+ has - * elapsed. - * - * +blocker+ is what we are waiting on, informational only (for debugging and - * logging). There are no guarantee about its value. - * - * Expected to return boolean, specifying whether the blocking operation was - * successful or not. - */ -static VALUE -rb_fiber_scheduler_interface_block(VALUE self) +static size_t +shared_fiber_pool_maximum_count(void) { -} + size_t maximum_count = 0; -/* - * Document-method: SchedulerInterface#unblock - * call-seq: unblock(blocker, fiber) - * - * Invoked to wake up Fiber previously blocked with #block (for example, Mutex#lock - * calls #block and Mutex#unlock calls #unblock). The scheduler should use - * the +fiber+ parameter to understand which fiber is unblocked. - * - * +blocker+ is what was awaited for, but it is informational only (for debugging - * and logging), and it is not guaranteed to be the same value as the +blocker+ for - * #block. - * - */ -static VALUE -rb_fiber_scheduler_interface_unblock(VALUE self) -{ -} + const char *maximum_count_env = getenv("RUBY_SHARED_FIBER_POOL_MAXIMUM_COUNT"); + if (maximum_count_env && maximum_count_env[0]) { + char *end; + unsigned long value = strtoul(maximum_count_env, &end, 10); + if (end != maximum_count_env && *end == '\0') { + maximum_count = (size_t)value; + } + else { + rb_warn("invalid RUBY_SHARED_FIBER_POOL_MAXIMUM_COUNT=%s (expected a non-negative integer)", maximum_count_env); + } + } -/* - * Document-method: SchedulerInterface#fiber - * call-seq: fiber(&block) - * - * Implementation of the Fiber.schedule. The method is <em>expected</em> to immediately - * run the given block of code in a separate non-blocking fiber, and to return that Fiber. - * - * Minimal suggested implementation is: - * - * def fiber(&block) - * fiber = Fiber.new(blocking: false, &block) - * fiber.resume - * fiber - * end - */ -static VALUE -rb_fiber_scheduler_interface_fiber(VALUE self) -{ + return maximum_count; } -#endif void Init_Cont(void) @@ -3069,25 +3651,45 @@ Init_Cont(void) #endif SET_MACHINE_STACK_END(&th->ec->machine.stack_end); - fiber_pool_initialize(&shared_fiber_pool, stack_size, FIBER_POOL_INITIAL_SIZE, vm_stack_size); + rb_eFiberError = rb_define_class("FiberError", rb_eStandardError); + + size_t minimum_count = shared_fiber_pool_minimum_count(); + size_t maximum_count = shared_fiber_pool_maximum_count(); + fiber_pool_initialize(&shared_fiber_pool, stack_size, minimum_count, maximum_count, vm_stack_size); fiber_initialize_keywords[0] = rb_intern_const("blocking"); fiber_initialize_keywords[1] = rb_intern_const("pool"); + fiber_initialize_keywords[2] = rb_intern_const("storage"); const char *fiber_shared_fiber_pool_free_stacks = getenv("RUBY_SHARED_FIBER_POOL_FREE_STACKS"); if (fiber_shared_fiber_pool_free_stacks) { shared_fiber_pool.free_stacks = atoi(fiber_shared_fiber_pool_free_stacks); + + if (shared_fiber_pool.free_stacks < 0) { + rb_warn("Setting RUBY_SHARED_FIBER_POOL_FREE_STACKS to a negative value is not allowed."); + shared_fiber_pool.free_stacks = 0; + } + + if (shared_fiber_pool.free_stacks > 1) { + rb_warn("Setting RUBY_SHARED_FIBER_POOL_FREE_STACKS to a value greater than 1 is operating system specific, and may cause crashes."); + } } rb_cFiber = rb_define_class("Fiber", rb_cObject); rb_define_alloc_func(rb_cFiber, fiber_alloc); - rb_eFiberError = rb_define_class("FiberError", rb_eStandardError); rb_define_singleton_method(rb_cFiber, "yield", rb_fiber_s_yield, -1); rb_define_singleton_method(rb_cFiber, "current", rb_fiber_s_current, 0); + rb_define_singleton_method(rb_cFiber, "blocking", rb_fiber_blocking, 0); + rb_define_singleton_method(rb_cFiber, "[]", rb_fiber_storage_aref, 1); + rb_define_singleton_method(rb_cFiber, "[]=", rb_fiber_storage_aset, 2); + rb_define_method(rb_cFiber, "initialize", rb_fiber_initialize, -1); rb_define_method(rb_cFiber, "blocking?", rb_fiber_blocking_p, 0); + rb_define_method(rb_cFiber, "storage", rb_fiber_storage_get, 0); + rb_define_method(rb_cFiber, "storage=", rb_fiber_storage_set, 1); rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1); - rb_define_method(rb_cFiber, "raise", rb_fiber_raise, -1); + rb_define_method(rb_cFiber, "raise", rb_fiber_m_raise, -1); + rb_define_method(rb_cFiber, "kill", rb_fiber_m_kill, 0); rb_define_method(rb_cFiber, "backtrace", rb_fiber_backtrace, -1); rb_define_method(rb_cFiber, "backtrace_locations", rb_fiber_backtrace_locations, -1); rb_define_method(rb_cFiber, "to_s", fiber_to_s, 0); @@ -3102,19 +3704,16 @@ Init_Cont(void) rb_define_singleton_method(rb_cFiber, "schedule", rb_fiber_s_schedule, -1); -#if 0 /* for RDoc */ - rb_cFiberScheduler = rb_define_class_under(rb_cFiber, "SchedulerInterface", rb_cObject); - rb_define_method(rb_cFiberScheduler, "close", rb_fiber_scheduler_interface_close, 0); - rb_define_method(rb_cFiberScheduler, "process_wait", rb_fiber_scheduler_interface_process_wait, 0); - rb_define_method(rb_cFiberScheduler, "io_wait", rb_fiber_scheduler_interface_io_wait, 0); - rb_define_method(rb_cFiberScheduler, "kernel_sleep", rb_fiber_scheduler_interface_kernel_sleep, 0); - rb_define_method(rb_cFiberScheduler, "block", rb_fiber_scheduler_interface_block, 0); - rb_define_method(rb_cFiberScheduler, "unblock", rb_fiber_scheduler_interface_unblock, 0); - rb_define_method(rb_cFiberScheduler, "fiber", rb_fiber_scheduler_interface_fiber, 0); -#endif + rb_thread_t *current_thread = rb_current_thread(); + RUBY_ASSERT(CLASS_OF(current_thread->ec->fiber_ptr->cont.self) == 0); + *(VALUE *)&((struct RBasic *)current_thread->ec->fiber_ptr->cont.self)->klass = rb_cFiber; #ifdef RB_EXPERIMENTAL_FIBER_POOL - rb_cFiberPool = rb_define_class("Pool", rb_cFiber); + /* + * Document-class: Fiber::Pool + * :nodoc: experimental + */ + rb_cFiberPool = rb_define_class_under(rb_cFiber, "Pool", rb_cObject); rb_define_alloc_func(rb_cFiberPool, fiber_pool_alloc); rb_define_method(rb_cFiberPool, "initialize", rb_fiber_pool_initialize, -1); #endif |
