diff options
Diffstat (limited to 'cont.c')
| -rw-r--r-- | cont.c | 527 |
1 files changed, 399 insertions, 128 deletions
@@ -30,6 +30,7 @@ extern int madvise(caddr_t, size_t, int); #include "internal/cont.h" #include "internal/thread.h" #include "internal/error.h" +#include "internal/eval.h" #include "internal/gc.h" #include "internal/proc.h" #include "internal/sanitizers.h" @@ -40,14 +41,20 @@ extern int madvise(caddr_t, size_t, int); #include "vm_sync.h" #include "id_table.h" #include "ractor_core.h" +#include "zjit.h" -static const int DEBUG = 0; +enum { + DEBUG = 0, + DEBUG_EXPAND = 0, + DEBUG_ACQUIRE = 0, +}; #define RB_PAGE_SIZE (pagesize) #define RB_PAGE_MASK (~(RB_PAGE_SIZE - 1)) static long pagesize; -static const rb_data_type_t cont_data_type, fiber_data_type; +static const rb_data_type_t rb_cont_data_type; +static const rb_data_type_t rb_fiber_data_type; static VALUE rb_cContinuation; static VALUE rb_cFiber; static VALUE rb_eFiberError; @@ -60,11 +67,11 @@ static VALUE rb_cFiberPool; // Defined in `coroutine/$arch/Context.h`: #ifdef COROUTINE_LIMITED_ADDRESS_SPACE #define FIBER_POOL_ALLOCATION_FREE -#define FIBER_POOL_INITIAL_SIZE 8 -#define FIBER_POOL_ALLOCATION_MAXIMUM_SIZE 32 +#define FIBER_POOL_MINIMUM_COUNT 8 +#define FIBER_POOL_MAXIMUM_ALLOCATIONS 32 #else -#define FIBER_POOL_INITIAL_SIZE 32 -#define FIBER_POOL_ALLOCATION_MAXIMUM_SIZE 1024 +#define FIBER_POOL_MINIMUM_COUNT 32 +#define FIBER_POOL_MAXIMUM_ALLOCATIONS 1024 #endif #ifdef RB_EXPERIMENTAL_FIBER_POOL #define FIBER_POOL_ALLOCATION_FREE @@ -77,6 +84,7 @@ enum context_type { struct cont_saved_vm_stack { VALUE *ptr; + size_t size; #ifdef CAPTURE_JUST_VALID_VM_STACK size_t slen; /* length of stack (head of ec->vm_stack) */ size_t clen; /* length of control frames (tail of ec->vm_stack) */ @@ -186,7 +194,11 @@ struct fiber_pool { size_t count; // The initial number of stacks to allocate. - size_t initial_count; + size_t minimum_count; + + // If positive, total stacks in this pool cannot exceed this (shared pool only: + // set via RUBY_SHARED_FIBER_POOL_MAXIMUM_COUNT). Expansion fails with errno EAGAIN. + size_t maximum_count; // Whether to madvise(free) the stack or not. // If this value is set to 1, the stack will be madvise(free)ed @@ -279,7 +291,7 @@ rb_free_shared_fiber_pool(void) struct fiber_pool_allocation *allocations = shared_fiber_pool.allocations; while (allocations) { struct fiber_pool_allocation *next = allocations->next; - xfree(allocations); + SIZED_FREE(allocations); allocations = next; } } @@ -467,11 +479,12 @@ fiber_pool_allocate_memory(size_t * count, size_t stride) // the system would allow (e.g. overcommit * physical memory + swap), we // divide count by two and try again. This condition should only be // encountered in edge cases, but we handle it here gracefully. - while (*count > 1) { + while (*count) { #if defined(_WIN32) void * base = VirtualAlloc(0, (*count)*stride, MEM_COMMIT, PAGE_READWRITE); if (!base) { + errno = rb_w32_map_errno(GetLastError()); *count = (*count) >> 1; } else { @@ -503,26 +516,53 @@ fiber_pool_allocate_memory(size_t * count, size_t stride) } // Given an existing fiber pool, expand it by the specified number of stacks. +// // @param count the maximum number of stacks to allocate. -// @return the allocated fiber pool. +// @return the new allocation on success, or NULL on failure with errno set. +// @raise NoMemoryError if the struct or memory allocation fails. +// +// Call from fiber_pool_stack_acquire_expand with VM lock held, or from +// fiber_pool_initialize before the pool is shared across threads. // @sa fiber_pool_allocation_free static struct fiber_pool_allocation * fiber_pool_expand(struct fiber_pool * fiber_pool, size_t count) { + if (count == 0) { + errno = EAGAIN; + return NULL; + } + STACK_GROW_DIR_DETECTION; size_t size = fiber_pool->size; size_t stride = size + RB_PAGE_SIZE; + // If the maximum number of stacks is set, and we have reached it, return NULL. + if (fiber_pool->maximum_count > 0) { + if (fiber_pool->count >= fiber_pool->maximum_count) { + errno = EAGAIN; + return NULL; + } + size_t remaining = fiber_pool->maximum_count - fiber_pool->count; + if (count > remaining) { + count = remaining; + } + } + + // Allocate metadata before mmap: ruby_xmalloc (RB_ALLOC) raises on failure and + // must not run after base is mapped, or the region would leak. + struct fiber_pool_allocation * allocation = RB_ALLOC(struct fiber_pool_allocation); + // Allocate the memory required for the stacks: void * base = fiber_pool_allocate_memory(&count, stride); if (base == NULL) { - rb_raise(rb_eFiberError, "can't alloc machine stack to fiber (%"PRIuSIZE" x %"PRIuSIZE" bytes): %s", count, size, ERRNOMSG); + if (!errno) errno = ENOMEM; + ruby_xfree(allocation); + return NULL; } struct fiber_pool_vacancy * vacancies = fiber_pool->vacancies; - struct fiber_pool_allocation * allocation = RB_ALLOC(struct fiber_pool_allocation); // Initialize fiber pool allocation: allocation->base = base; @@ -534,7 +574,7 @@ fiber_pool_expand(struct fiber_pool * fiber_pool, size_t count) #endif allocation->pool = fiber_pool; - if (DEBUG) { + if (DEBUG_EXPAND) { fprintf(stderr, "fiber_pool_expand(%"PRIuSIZE"): %p, %"PRIuSIZE"/%"PRIuSIZE" x [%"PRIuSIZE":%"PRIuSIZE"]\n", count, (void*)fiber_pool, fiber_pool->used, fiber_pool->count, size, fiber_pool->vm_stack_size); } @@ -543,21 +583,27 @@ fiber_pool_expand(struct fiber_pool * fiber_pool, size_t count) for (size_t i = 0; i < count; i += 1) { void * base = (char*)allocation->base + (stride * i); void * page = (char*)base + STACK_DIR_UPPER(size, 0); - #if defined(_WIN32) DWORD old_protect; if (!VirtualProtect(page, RB_PAGE_SIZE, PAGE_READWRITE | PAGE_GUARD, &old_protect)) { + int error = rb_w32_map_errno(GetLastError()); VirtualFree(allocation->base, 0, MEM_RELEASE); - rb_raise(rb_eFiberError, "can't set a guard page: %s", ERRNOMSG); + ruby_xfree(allocation); + errno = error; + return NULL; } #elif defined(__wasi__) // wasi-libc's mprotect emulation doesn't support PROT_NONE. (void)page; #else if (mprotect(page, RB_PAGE_SIZE, PROT_NONE) < 0) { + int error = errno; + if (!error) error = ENOMEM; munmap(allocation->base, count*stride); - rb_raise(rb_eFiberError, "can't set a guard page: %s", ERRNOMSG); + ruby_xfree(allocation); + errno = error; + return NULL; } #endif @@ -593,7 +639,7 @@ fiber_pool_expand(struct fiber_pool * fiber_pool, size_t count) // Initialize the specified fiber pool with the given number of stacks. // @param vm_stack_size The size of the vm stack to allocate. static void -fiber_pool_initialize(struct fiber_pool * fiber_pool, size_t size, size_t count, size_t vm_stack_size) +fiber_pool_initialize(struct fiber_pool * fiber_pool, size_t size, size_t minimum_count, size_t maximum_count, size_t vm_stack_size) { VM_ASSERT(vm_stack_size < size); @@ -601,13 +647,17 @@ fiber_pool_initialize(struct fiber_pool * fiber_pool, size_t size, size_t count, fiber_pool->vacancies = NULL; fiber_pool->size = ((size / RB_PAGE_SIZE) + 1) * RB_PAGE_SIZE; fiber_pool->count = 0; - fiber_pool->initial_count = count; + fiber_pool->minimum_count = minimum_count; + fiber_pool->maximum_count = maximum_count; fiber_pool->free_stacks = 1; fiber_pool->used = 0; - fiber_pool->vm_stack_size = vm_stack_size; - fiber_pool_expand(fiber_pool, count); + if (fiber_pool->minimum_count > 0) { + if (RB_UNLIKELY(!fiber_pool_expand(fiber_pool, fiber_pool->minimum_count))) { + rb_raise(rb_eFiberError, "can't allocate initial fiber stacks (%"PRIuSIZE" x %"PRIuSIZE" bytes): %s", fiber_pool->minimum_count, fiber_pool->size, strerror(errno)); + } + } } #ifdef FIBER_POOL_ALLOCATION_FREE @@ -651,49 +701,120 @@ fiber_pool_allocation_free(struct fiber_pool_allocation * allocation) allocation->pool->count -= allocation->count; - ruby_xfree(allocation); + SIZED_FREE(allocation); } #endif -// Acquire a stack from the given fiber pool. If none are available, allocate more. -static struct fiber_pool_stack -fiber_pool_stack_acquire(struct fiber_pool * fiber_pool) +// Number of stacks to request when expanding the pool (clamped to min/max). +static size_t +fiber_pool_stack_expand_count(const struct fiber_pool *pool) { - struct fiber_pool_vacancy * vacancy = fiber_pool_vacancy_pop(fiber_pool); + const size_t maximum_allocations = FIBER_POOL_MAXIMUM_ALLOCATIONS; + const size_t minimum_count = FIBER_POOL_MINIMUM_COUNT; - if (DEBUG) fprintf(stderr, "fiber_pool_stack_acquire: %p used=%"PRIuSIZE"\n", (void*)fiber_pool->vacancies, fiber_pool->used); + // We are going try and double the number of stacks in the pool: + size_t count = pool->count; + if (count > maximum_allocations) count = maximum_allocations; + if (count < minimum_count) count = minimum_count; - if (!vacancy) { - const size_t maximum = FIBER_POOL_ALLOCATION_MAXIMUM_SIZE; - const size_t minimum = fiber_pool->initial_count; + // If we have a maximum count, we need to clamp the number of stacks to the maximum: + if (pool->maximum_count > 0) { + if (pool->count >= pool->maximum_count) { + // No expansion is possible: + return 0; + } + + // Otherwise, compute the number of stacks we can allocate to bring us to the maximum: + size_t remaining = pool->maximum_count - pool->count; + if (count > remaining) { + count = remaining; + } + } - size_t count = fiber_pool->count; - if (count > maximum) count = maximum; - if (count < minimum) count = minimum; + return count; +} - fiber_pool_expand(fiber_pool, count); +// When the vacancy list is empty, grow the pool (and run GC only if mmap fails). Caller holds the VM lock. +// Returns NULL if expansion failed after GC + retry; errno is set. Otherwise returns a vacancy. +static struct fiber_pool_vacancy * +fiber_pool_stack_acquire_expand(struct fiber_pool *fiber_pool) +{ + size_t count = fiber_pool_stack_expand_count(fiber_pool); + + if (DEBUG_ACQUIRE) fprintf(stderr, "fiber_pool_stack_acquire: expanding fiber pool by %"PRIuSIZE" stacks\n", count); + + struct fiber_pool_vacancy *vacancy = NULL; + + if (RB_LIKELY(fiber_pool_expand(fiber_pool, count))) { + return fiber_pool_vacancy_pop(fiber_pool); + } + else { + if (DEBUG_ACQUIRE) fprintf(stderr, "fiber_pool_stack_acquire: expand failed (%s), collecting garbage\n", strerror(errno)); - // The free list should now contain some stacks: - VM_ASSERT(fiber_pool->vacancies); + rb_gc(); + // After running GC, the vacancy list may have some stacks: vacancy = fiber_pool_vacancy_pop(fiber_pool); + if (RB_LIKELY(vacancy)) { + return vacancy; + } + + // Recompute count as gc may have freed up some allocations: + count = fiber_pool_stack_expand_count(fiber_pool); + + // Try to expand the fiber pool again: + if (RB_LIKELY(fiber_pool_expand(fiber_pool, count))) { + return fiber_pool_vacancy_pop(fiber_pool); + } + else { + // Okay, we really failed to acquire a stack. Give up and return NULL with errno set: + return NULL; + } } +} + +// Acquire a stack from the given fiber pool. If none are available, allocate more. +static struct fiber_pool_stack +fiber_pool_stack_acquire(struct fiber_pool * fiber_pool) +{ + struct fiber_pool_vacancy * vacancy; + + unsigned int lev; + RB_VM_LOCK_ENTER_LEV(&lev); + { + // Fast path: try to acquire a stack from the vacancy list: + vacancy = fiber_pool_vacancy_pop(fiber_pool); + + if (DEBUG) fprintf(stderr, "fiber_pool_stack_acquire: %p used=%"PRIuSIZE"\n", (void*)fiber_pool->vacancies, fiber_pool->used); - VM_ASSERT(vacancy); - VM_ASSERT(vacancy->stack.base); + // Slow path: If the pool has no vacancies, expand first. Only run GC when expansion fails (e.g. mmap), so we can reclaim stacks from dead fibers before retrying: + if (RB_UNLIKELY(!vacancy)) { + vacancy = fiber_pool_stack_acquire_expand(fiber_pool); + + // If expansion failed, raise an error: + if (RB_UNLIKELY(!vacancy)) { + RB_VM_LOCK_LEAVE_LEV(&lev); + rb_raise(rb_eFiberError, "can't allocate fiber stack: %s", strerror(errno)); + } + } + + VM_ASSERT(vacancy); + VM_ASSERT(vacancy->stack.base); #if defined(COROUTINE_SANITIZE_ADDRESS) - __asan_unpoison_memory_region(fiber_pool_stack_poison_base(&vacancy->stack), fiber_pool_stack_poison_size(&vacancy->stack)); + __asan_unpoison_memory_region(fiber_pool_stack_poison_base(&vacancy->stack), fiber_pool_stack_poison_size(&vacancy->stack)); #endif - // Take the top item from the free list: - fiber_pool->used += 1; + // Take the top item from the free list: + fiber_pool->used += 1; #ifdef FIBER_POOL_ALLOCATION_FREE - vacancy->stack.allocation->used += 1; + vacancy->stack.allocation->used += 1; #endif - fiber_pool_stack_reset(&vacancy->stack); + fiber_pool_stack_reset(&vacancy->stack); + } + RB_VM_LOCK_LEAVE_LEV(&lev); return vacancy->stack; } @@ -910,6 +1031,17 @@ fiber_stack_release(rb_fiber_t * fiber) rb_ec_clear_vm_stack(ec); } +static void +fiber_stack_release_locked(rb_fiber_t *fiber) +{ + if (!ruby_vm_during_cleanup) { + // We can't try to acquire the VM lock here because MMTK calls free in its own native thread which has no ec. + // This assertion will fail on MMTK but we currently don't have CI for debug releases of MMTK, so we can assert for now. + ASSERT_vm_locking_with_barrier(); + } + fiber_stack_release(fiber); +} + static const char * fiber_status_name(enum fiber_status s) { @@ -931,7 +1063,9 @@ fiber_verify(const rb_fiber_t *fiber) switch (fiber->status) { case FIBER_RESUMED: - VM_ASSERT(fiber->cont.saved_ec.vm_stack != NULL); + if (fiber->cont.saved_ec.thread_ptr->self == 0) { + VM_ASSERT(fiber->cont.saved_ec.vm_stack != NULL); + } break; case FIBER_SUSPENDED: VM_ASSERT(fiber->cont.saved_ec.vm_stack != NULL); @@ -961,7 +1095,7 @@ cont_ptr(VALUE obj) { rb_context_t *cont; - TypedData_Get_Struct(obj, rb_context_t, &cont_data_type, cont); + TypedData_Get_Struct(obj, rb_context_t, &rb_cont_data_type, cont); return cont; } @@ -971,7 +1105,7 @@ fiber_ptr(VALUE obj) { rb_fiber_t *fiber; - TypedData_Get_Struct(obj, rb_fiber_t, &fiber_data_type, fiber); + TypedData_Get_Struct(obj, rb_fiber_t, &rb_fiber_data_type, fiber); if (!fiber) rb_raise(rb_eFiberError, "uninitialized fiber"); return fiber; @@ -1064,21 +1198,26 @@ cont_free(void *ptr) RUBY_FREE_ENTER("cont"); if (cont->type == CONTINUATION_CONTEXT) { - ruby_xfree(cont->saved_ec.vm_stack); - RUBY_FREE_UNLESS_NULL(cont->machine.stack); + SIZED_FREE_N(cont->saved_ec.vm_stack, cont->saved_ec.vm_stack_size); + SIZED_FREE_N(cont->machine.stack, cont->machine.stack_size); } else { rb_fiber_t *fiber = (rb_fiber_t*)cont; coroutine_destroy(&fiber->context); - fiber_stack_release(fiber); + fiber_stack_release_locked(fiber); } - RUBY_FREE_UNLESS_NULL(cont->saved_vm_stack.ptr); + SIZED_FREE_N(cont->saved_vm_stack.ptr, cont->saved_vm_stack.size); VM_ASSERT(cont->jit_cont != NULL); jit_cont_free(cont->jit_cont); /* free rb_cont_t or rb_fiber_t */ - ruby_xfree(ptr); + if (cont->type == CONTINUATION_CONTEXT) { + SIZED_FREE(cont); + } + else { + SIZED_FREE((rb_fiber_t *)cont); + } RUBY_FREE_LEAVE("cont"); } @@ -1119,12 +1258,7 @@ rb_fiber_update_self(rb_fiber_t *fiber) void rb_fiber_mark_self(const rb_fiber_t *fiber) { - if (fiber->cont.self) { - rb_gc_mark_movable(fiber->cont.self); - } - else { - rb_execution_context_mark(&fiber->cont.saved_ec); - } + rb_gc_mark_movable(fiber->cont.self); } static void @@ -1190,12 +1324,13 @@ fiber_memsize(const void *ptr) VALUE rb_obj_is_fiber(VALUE obj) { - return RBOOL(rb_typeddata_is_kind_of(obj, &fiber_data_type)); + return RBOOL(rb_typeddata_is_kind_of(obj, &rb_fiber_data_type)); } static void cont_save_machine_stack(rb_thread_t *th, rb_context_t *cont) { + const size_t old_stack_size = cont->machine.stack_size; size_t size; SET_MACHINE_STACK_END(&th->ec->machine.stack_end); @@ -1210,10 +1345,10 @@ cont_save_machine_stack(rb_thread_t *th, rb_context_t *cont) } if (cont->machine.stack) { - REALLOC_N(cont->machine.stack, VALUE, size); + SIZED_REALLOC_N(cont->machine.stack, VALUE, cont->machine.stack_size, old_stack_size); } else { - cont->machine.stack = ALLOC_N(VALUE, size); + cont->machine.stack = ALLOC_N(VALUE, cont->machine.stack_size); } FLUSH_REGISTER_WINDOWS; @@ -1221,9 +1356,23 @@ cont_save_machine_stack(rb_thread_t *th, rb_context_t *cont) MEMCPY(cont->machine.stack, cont->machine.stack_src, VALUE, size); } -static const rb_data_type_t cont_data_type = { +static void +cont_handle_weak_references(void *ptr) +{ + rb_context_t *cont = ptr; + + if (!cont) return; + + if (!rb_gc_handle_weak_references_alive_p(cont->saved_ec.gen_fields_cache.obj) || + !rb_gc_handle_weak_references_alive_p(cont->saved_ec.gen_fields_cache.fields_obj)) { + cont->saved_ec.gen_fields_cache.obj = Qundef; + cont->saved_ec.gen_fields_cache.fields_obj = Qundef; + } +} + +static const rb_data_type_t rb_cont_data_type = { "continuation", - {cont_mark, cont_free, cont_memsize, cont_compact}, + {cont_mark, cont_free, cont_memsize, cont_compact, cont_handle_weak_references}, 0, 0, RUBY_TYPED_FREE_IMMEDIATELY }; @@ -1254,7 +1403,7 @@ jit_cont_new(rb_execution_context_t *ec) // We need to use calloc instead of something like ZALLOC to avoid triggering GC here. // When this function is called from rb_thread_alloc through rb_threadptr_root_fiber_setup, // the thread is still being prepared and marking it causes SEGV. - cont = calloc(1, sizeof(struct rb_jit_cont)); + cont = ruby_mimcalloc(1, sizeof(struct rb_jit_cont)); if (cont == NULL) rb_memerror(); cont->ec = ec; @@ -1293,7 +1442,7 @@ jit_cont_free(struct rb_jit_cont *cont) } rb_native_mutex_unlock(&jit_cont_lock); - free(cont); + ruby_mimfree(cont); } // Call a given callback against all on-stack ISEQs. @@ -1307,8 +1456,11 @@ rb_jit_cont_each_iseq(rb_iseq_callback callback, void *data) const rb_control_frame_t *cfp = cont->ec->cfp; while (!RUBY_VM_CONTROL_FRAME_STACK_OVERFLOW_P(cont->ec, cfp)) { - if (cfp->pc && cfp->iseq && imemo_type((VALUE)cfp->iseq) == imemo_iseq) { - callback(cfp->iseq, data); + if (CFP_PC(cfp) && CFP_ISEQ(cfp)) { + const rb_iseq_t *iseq = CFP_ISEQ(cfp); + if (iseq && imemo_type((VALUE)iseq) == imemo_iseq) { + callback(iseq, data); + } } cfp = RUBY_VM_PREVIOUS_CONTROL_FRAME(cfp); } @@ -1344,7 +1496,7 @@ rb_jit_cont_finish(void) struct rb_jit_cont *cont, *next; for (cont = first_jit_cont; cont != NULL; cont = next) { next = cont->next; - free(cont); // Don't use xfree because it's allocated by calloc. + ruby_mimfree(cont); // Don't use xfree because it's allocated by mimcalloc. } rb_native_mutex_destroy(&jit_cont_lock); } @@ -1383,7 +1535,8 @@ cont_new(VALUE klass) rb_thread_t *th = GET_THREAD(); THREAD_MUST_BE_RUNNING(th); - contval = TypedData_Make_Struct(klass, rb_context_t, &cont_data_type, cont); + contval = TypedData_Make_Struct(klass, rb_context_t, &rb_cont_data_type, cont); + rb_gc_declare_weak_references(contval); cont->self = contval; cont_init(cont, th); return cont; @@ -1427,8 +1580,8 @@ show_vm_pcs(const rb_control_frame_t *cfp, int i=0; while (cfp != end_of_cfp) { int pc = 0; - if (cfp->iseq) { - pc = cfp->pc - ISEQ_BODY(cfp->iseq)->iseq_encoded; + if (CFP_ISEQ(cfp)) { + pc = cfp->pc - ISEQ_BODY(CFP_ISEQ(cfp))->iseq_encoded; } fprintf(stderr, "%2d pc: %d\n", i++, pc); cfp = RUBY_VM_PREVIOUS_CONTROL_FRAME(cfp); @@ -1452,6 +1605,7 @@ cont_capture(volatile int *volatile stat) #ifdef CAPTURE_JUST_VALID_VM_STACK cont->saved_vm_stack.slen = ec->cfp->sp - ec->vm_stack; cont->saved_vm_stack.clen = ec->vm_stack + ec->vm_stack_size - (VALUE*)ec->cfp; + cont->saved_vm_stack.size = cont->saved_vm_stack.slen + cont->saved_vm_stack.clen; cont->saved_vm_stack.ptr = ALLOC_N(VALUE, cont->saved_vm_stack.slen + cont->saved_vm_stack.clen); MEMCPY(cont->saved_vm_stack.ptr, ec->vm_stack, @@ -1461,6 +1615,7 @@ cont_capture(volatile int *volatile stat) VALUE, cont->saved_vm_stack.clen); #else + cont->saved_vm_stack.size = ec->vm_stack_size; cont->saved_vm_stack.ptr = ALLOC_N(VALUE, ec->vm_stack_size); MEMCPY(cont->saved_vm_stack.ptr, ec->vm_stack, VALUE, ec->vm_stack_size); #endif @@ -1511,6 +1666,51 @@ cont_restore_thread(rb_context_t *cont) rb_raise(rb_eRuntimeError, "can't call across trace_func"); } +#if defined(__wasm__) && !defined(__EMSCRIPTEN__) + if (th->ec->tag != sec->tag) { + /* find the lowest common ancestor tag of the current EC and the saved EC */ + + struct rb_vm_tag *lowest_common_ancestor = NULL; + size_t num_tags = 0; + size_t num_saved_tags = 0; + for (struct rb_vm_tag *tag = th->ec->tag; tag != NULL; tag = tag->prev) { + ++num_tags; + } + for (struct rb_vm_tag *tag = sec->tag; tag != NULL; tag = tag->prev) { + ++num_saved_tags; + } + + size_t min_tags = num_tags <= num_saved_tags ? num_tags : num_saved_tags; + + struct rb_vm_tag *tag = th->ec->tag; + while (num_tags > min_tags) { + tag = tag->prev; + --num_tags; + } + + struct rb_vm_tag *saved_tag = sec->tag; + while (num_saved_tags > min_tags) { + saved_tag = saved_tag->prev; + --num_saved_tags; + } + + while (min_tags > 0) { + if (tag == saved_tag) { + lowest_common_ancestor = tag; + break; + } + tag = tag->prev; + saved_tag = saved_tag->prev; + --min_tags; + } + + /* free all the jump buffers between the current EC's tag and the lowest common ancestor tag */ + for (struct rb_vm_tag *tag = th->ec->tag; tag != lowest_common_ancestor; tag = tag->prev) { + rb_vm_tag_jmpbuf_deinit(&tag->buf); + } + } +#endif + /* copy vm stack */ #ifdef CAPTURE_JUST_VALID_VM_STACK MEMCPY(th->ec->vm_stack, @@ -1917,16 +2117,38 @@ rb_cont_call(int argc, VALUE *argv, VALUE contval) * */ -static const rb_data_type_t fiber_data_type = { +static void +fiber_handle_weak_references(void *ptr) +{ + rb_fiber_t *fiber = ptr; + + if (!fiber) return; + + if (!rb_gc_handle_weak_references_alive_p(fiber->cont.saved_ec.gen_fields_cache.obj) || + !rb_gc_handle_weak_references_alive_p(fiber->cont.saved_ec.gen_fields_cache.fields_obj)) { + fiber->cont.saved_ec.gen_fields_cache.obj = Qundef; + fiber->cont.saved_ec.gen_fields_cache.fields_obj = Qundef; + } +} + +static const rb_data_type_t rb_fiber_data_type = { "fiber", - {fiber_mark, fiber_free, fiber_memsize, fiber_compact,}, + {fiber_mark, fiber_free, fiber_memsize, fiber_compact, fiber_handle_weak_references}, 0, 0, RUBY_TYPED_FREE_IMMEDIATELY }; static VALUE fiber_alloc(VALUE klass) { - return TypedData_Wrap_Struct(klass, &fiber_data_type, 0); + VALUE obj = TypedData_Wrap_Struct(klass, &rb_fiber_data_type, 0); + rb_gc_declare_weak_references(obj); + return obj; +} + +static rb_serial_t +next_ec_serial(rb_ractor_t *cr) +{ + return cr->next_ec_serial++; } static rb_fiber_t* @@ -1948,6 +2170,7 @@ fiber_t_alloc(VALUE fiber_value, unsigned int blocking) cont_init(&fiber->cont, th); fiber->cont.saved_ec.fiber_ptr = fiber; + fiber->cont.saved_ec.serial = next_ec_serial(th->ractor); rb_ec_clear_vm_stack(&fiber->cont.saved_ec); fiber->prev = NULL; @@ -1961,32 +2184,10 @@ fiber_t_alloc(VALUE fiber_value, unsigned int blocking) return fiber; } -static rb_fiber_t * -root_fiber_alloc(rb_thread_t *th) -{ - VALUE fiber_value = fiber_alloc(rb_cFiber); - rb_fiber_t *fiber = th->ec->fiber_ptr; - - VM_ASSERT(DATA_PTR(fiber_value) == NULL); - VM_ASSERT(fiber->cont.type == FIBER_CONTEXT); - VM_ASSERT(FIBER_RESUMED_P(fiber)); - - th->root_fiber = fiber; - DATA_PTR(fiber_value) = fiber; - fiber->cont.self = fiber_value; - - coroutine_initialize_main(&fiber->context); - - return fiber; -} - static inline rb_fiber_t* fiber_current(void) { rb_execution_context_t *ec = GET_EC(); - if (ec->fiber_ptr->cont.self == 0) { - root_fiber_alloc(rb_ec_thread_ptr(ec)); - } return ec->fiber_ptr; } @@ -2119,7 +2320,7 @@ rb_fiber_storage_set(VALUE self, VALUE value) * Returns the value of the fiber storage variable identified by +key+. * * The +key+ must be a symbol, and the value is set by Fiber#[]= or - * Fiber#store. + * Fiber#storage. * * See also Fiber::[]=. */ @@ -2488,21 +2689,36 @@ rb_fiber_start(rb_fiber_t *fiber) void rb_threadptr_root_fiber_setup(rb_thread_t *th) { - rb_fiber_t *fiber = ruby_mimcalloc(1, sizeof(rb_fiber_t)); + rb_fiber_t *fiber = ZALLOC(rb_fiber_t); if (!fiber) { rb_bug("%s", strerror(errno)); /* ... is it possible to call rb_bug here? */ } + fiber->cont.type = FIBER_CONTEXT; fiber->cont.saved_ec.fiber_ptr = fiber; + fiber->cont.saved_ec.serial = next_ec_serial(th->ractor); fiber->cont.saved_ec.thread_ptr = th; fiber->blocking = 1; fiber->killed = 0; fiber_status_set(fiber, FIBER_RESUMED); /* skip CREATED */ + + coroutine_initialize_main(&fiber->context); + th->ec = &fiber->cont.saved_ec; + cont_init_jit_cont(&fiber->cont); } void +rb_root_fiber_obj_setup(rb_thread_t *th) +{ + rb_fiber_t *fiber = th->ec->fiber_ptr; + VALUE fiber_value = fiber_alloc(rb_cFiber); + DATA_PTR(fiber_value) = fiber; + fiber->cont.self = fiber_value; +} + +void rb_threadptr_root_fiber_release(rb_thread_t *th) { if (th->root_fiber) { @@ -2572,15 +2788,7 @@ rb_fiber_current(void) static inline void fiber_store(rb_fiber_t *next_fiber, rb_thread_t *th) { - rb_fiber_t *fiber; - - if (th->ec->fiber_ptr != NULL) { - fiber = th->ec->fiber_ptr; - } - else { - /* create root fiber */ - fiber = root_fiber_alloc(th); - } + rb_fiber_t *fiber = th->ec->fiber_ptr; if (FIBER_CREATED_P(next_fiber)) { fiber_prepare_stack(next_fiber); @@ -2616,7 +2824,9 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int kw_splat, rb_fi rb_thread_t *th = GET_THREAD(); /* make sure the root_fiber object is available */ - if (th->root_fiber == NULL) root_fiber_alloc(th); + if (th->root_fiber == NULL) { + th->root_fiber = th->ec->fiber_ptr; + } if (th->ec->fiber_ptr == fiber) { /* ignore fiber context switch @@ -2681,8 +2891,10 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int kw_splat, rb_fi // We cannot free the stack until the pthread is joined: #ifndef COROUTINE_PTHREAD_CONTEXT - if (resuming_fiber && FIBER_TERMINATED_P(fiber)) { - fiber_stack_release(fiber); + if (FIBER_TERMINATED_P(fiber)) { + RB_VM_LOCKING() { + fiber_stack_release(fiber); + } } #endif @@ -2822,6 +3034,7 @@ void rb_fiber_close(rb_fiber_t *fiber) { fiber_status_set(fiber, FIBER_TERMINATED); + rb_ec_close(&fiber->cont.saved_ec); } static void @@ -3160,37 +3373,46 @@ fiber_raise(rb_fiber_t *fiber, VALUE exception) } VALUE -rb_fiber_raise(VALUE fiber, int argc, const VALUE *argv) +rb_fiber_raise(VALUE fiber, int argc, VALUE *argv) { - VALUE exception = rb_make_exception(argc, argv); + VALUE exception = rb_exception_setup(argc, argv); return fiber_raise(fiber_ptr(fiber), exception); } /* * call-seq: - * fiber.raise -> obj - * fiber.raise(string) -> obj - * fiber.raise(exception [, string [, array]]) -> obj + * raise(exception, message = exception.to_s, backtrace = nil, cause: $!) + * raise(message = nil, cause: $!) * * Raises an exception in the fiber at the point at which the last - * +Fiber.yield+ was called. If the fiber has not been started or has + * +Fiber.yield+ was called. + * + * f = Fiber.new { + * puts "Before the yield" + * Fiber.yield 1 # -- exception will be raised here + * puts "After the yield" + * } + * + * p f.resume + * f.raise "Gotcha" + * + * Output + * + * Before the first yield + * 1 + * t.rb:8:in 'Fiber.yield': Gotcha (RuntimeError) + * from t.rb:8:in 'block in <main>' + * + * If the fiber has not been started or has * already run to completion, raises +FiberError+. If the fiber is * yielding, it is resumed. If it is transferring, it is transferred into. * But if it is resuming, raises +FiberError+. * - * With no arguments, raises a +RuntimeError+. With a single +String+ - * argument, raises a +RuntimeError+ with the string as a message. Otherwise, - * the first parameter should be the name of an +Exception+ class (or an - * object that returns an +Exception+ object when sent an +exception+ - * message). The optional second parameter sets the message associated with - * the exception, and the third parameter is an array of callback information. - * Exceptions are caught by the +rescue+ clause of <code>begin...end</code> - * blocks. - * * Raises +FiberError+ if called on a Fiber belonging to another +Thread+. * - * See Kernel#raise for more information. + * See Kernel#raise for more information on arguments. + * */ static VALUE rb_fiber_m_raise(int argc, VALUE *argv, VALUE self) @@ -3285,6 +3507,8 @@ rb_fiber_atfork(rb_thread_t *th) th->root_fiber = th->ec->fiber_ptr; } th->root_fiber->prev = 0; + th->root_fiber->blocking = 1; + th->blocking = 1; } } #endif @@ -3297,7 +3521,7 @@ fiber_pool_free(void *ptr) RUBY_FREE_ENTER("fiber_pool"); fiber_pool_allocation_free(fiber_pool->allocations); - ruby_xfree(fiber_pool); + SIZED_FREE(fiber_pool); RUBY_FREE_LEAVE("fiber_pool"); } @@ -3351,7 +3575,7 @@ rb_fiber_pool_initialize(int argc, VALUE* argv, VALUE self) TypedData_Get_Struct(self, struct fiber_pool, &FiberPoolDataType, fiber_pool); - fiber_pool_initialize(fiber_pool, NUM2SIZET(size), NUM2SIZET(count), NUM2SIZET(vm_stack_size)); + fiber_pool_initialize(fiber_pool, NUM2SIZET(size), NUM2SIZET(count), 0, NUM2SIZET(vm_stack_size)); return self; } @@ -3370,6 +3594,46 @@ rb_fiber_pool_initialize(int argc, VALUE* argv, VALUE self) * fiber.resume #=> FiberError: dead fiber called */ +static size_t +shared_fiber_pool_minimum_count(void) +{ + size_t minimum_count = FIBER_POOL_MINIMUM_COUNT; + + const char *minimum_count_env = getenv("RUBY_SHARED_FIBER_POOL_MINIMUM_COUNT"); + if (minimum_count_env && minimum_count_env[0]) { + char *end; + unsigned long value = strtoul(minimum_count_env, &end, 10); + if (end != minimum_count_env && *end == '\0') { + minimum_count = (size_t)value; + } + else { + rb_warn("invalid RUBY_SHARED_FIBER_POOL_MINIMUM_COUNT=%s (expected a non-negative integer)", minimum_count_env); + } + } + + return minimum_count; +} + +static size_t +shared_fiber_pool_maximum_count(void) +{ + size_t maximum_count = 0; + + const char *maximum_count_env = getenv("RUBY_SHARED_FIBER_POOL_MAXIMUM_COUNT"); + if (maximum_count_env && maximum_count_env[0]) { + char *end; + unsigned long value = strtoul(maximum_count_env, &end, 10); + if (end != maximum_count_env && *end == '\0') { + maximum_count = (size_t)value; + } + else { + rb_warn("invalid RUBY_SHARED_FIBER_POOL_MAXIMUM_COUNT=%s (expected a non-negative integer)", maximum_count_env); + } + } + + return maximum_count; +} + void Init_Cont(void) { @@ -3387,7 +3651,11 @@ Init_Cont(void) #endif SET_MACHINE_STACK_END(&th->ec->machine.stack_end); - fiber_pool_initialize(&shared_fiber_pool, stack_size, FIBER_POOL_INITIAL_SIZE, vm_stack_size); + rb_eFiberError = rb_define_class("FiberError", rb_eStandardError); + + size_t minimum_count = shared_fiber_pool_minimum_count(); + size_t maximum_count = shared_fiber_pool_maximum_count(); + fiber_pool_initialize(&shared_fiber_pool, stack_size, minimum_count, maximum_count, vm_stack_size); fiber_initialize_keywords[0] = rb_intern_const("blocking"); fiber_initialize_keywords[1] = rb_intern_const("pool"); @@ -3409,7 +3677,6 @@ Init_Cont(void) rb_cFiber = rb_define_class("Fiber", rb_cObject); rb_define_alloc_func(rb_cFiber, fiber_alloc); - rb_eFiberError = rb_define_class("FiberError", rb_eStandardError); rb_define_singleton_method(rb_cFiber, "yield", rb_fiber_s_yield, -1); rb_define_singleton_method(rb_cFiber, "current", rb_fiber_s_current, 0); rb_define_singleton_method(rb_cFiber, "blocking", rb_fiber_blocking, 0); @@ -3437,6 +3704,10 @@ Init_Cont(void) rb_define_singleton_method(rb_cFiber, "schedule", rb_fiber_s_schedule, -1); + rb_thread_t *current_thread = rb_current_thread(); + RUBY_ASSERT(CLASS_OF(current_thread->ec->fiber_ptr->cont.self) == 0); + *(VALUE *)&((struct RBasic *)current_thread->ec->fiber_ptr->cont.self)->klass = rb_cFiber; + #ifdef RB_EXPERIMENTAL_FIBER_POOL /* * Document-class: Fiber::Pool |
