From 14cf95cff35612c6238790ad2f605530f69e9a44 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sun, 2 Jun 2019 12:49:58 +1200 Subject: Implement fiber pool for reduced fiber allocation overhead. Replace previous stack cache with fiber pool cache. The fiber pool allocates many stacks in a single memory region. Stack allocation becomes O(log N) and fiber creation is amortized O(1). Around 10x performance improvement was measured in micro-benchmarks. --- cont.c | 700 +++++++++++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 508 insertions(+), 192 deletions(-) (limited to 'cont.c') diff --git a/cont.c b/cont.c index 5e72c24b96..51f0a3a3d7 100644 --- a/cont.c +++ b/cont.c @@ -24,10 +24,20 @@ #include #endif +static const int DEBUG = 0; + #define RB_PAGE_SIZE (pagesize) #define RB_PAGE_MASK (~(RB_PAGE_SIZE - 1)) static long pagesize; +static const rb_data_type_t cont_data_type, fiber_data_type; +static VALUE rb_cContinuation; +static VALUE rb_cFiber; +static VALUE rb_eFiberError; +#ifdef RB_EXPERIMENTAL_FIBER_POOL +static VALUE rb_cFiberPool; +#endif + #define CAPTURE_JUST_VALID_VM_STACK 1 enum context_type { @@ -43,6 +53,99 @@ struct cont_saved_vm_stack { #endif }; +struct fiber_pool; + +// Represents a single stack. +struct fiber_pool_stack { + // A pointer to the memory allocation (lowest address) for the stack. + void * base; + + // The current stack pointer, taking into account the direction of the stack. + void * current; + + // The size of the stack including any guard pages. + size_t size; + + // The available stack capacity w.r.t. the current stack offset. + size_t available; + + // The pool this stack is managed by. + struct fiber_pool * pool; +}; + +// A singly linked list of vacant (unused) stacks. +// This structure is stored in the first page of a stack if it is not in use. +// @sa fiber_pool_vacancy_pointer +struct fiber_pool_vacancy { + // Details about the vacant stack: + struct fiber_pool_stack stack; + + // The next vacancy in the linked list. + struct fiber_pool_vacancy * next; +}; + +// Manages singly linked list of mapped regions of memory which contains 1 more more stack: +// +// base = +-------------------------------+-----------------------+ + +// |VM Stack |VM Stack | | | +// | | | | | +// | | | | | +// +-------------------------------+ | | +// |Machine Stack |Machine Stack | | | +// | | | | | +// | | | | | +// | | | . . . . | | size +// | | | | | +// | | | | | +// | | | | | +// | | | | | +// | | | | | +// +-------------------------------+ | | +// |Guard Page |Guard Page | | | +// +-------------------------------+-----------------------+ v +// +// +-------------------------------------------------------> +// +// count +// +struct fiber_pool_allocation { + // A pointer to the memory mapped region. + void * base; + + // The size of the individual stacks. + size_t size; + + // The number of stacks that were allocated. + size_t count; + + // The number of stacks used in this allocation. + // size_t used; + + // The next allocation in the linked list. + struct fiber_pool_allocation * next; +}; + +// A fiber pool manages vacant stacks to reduce the overhead of creating fibers. +struct fiber_pool { + // A singly-linked list of allocations which contain 1 or more stacks each. + struct fiber_pool_allocation * allocations; + + // Provides O(1) stack "allocation": + struct fiber_pool_vacancy * vacancies; + + // The size of the stack allocations including guard page. + size_t size; + + // The total number of stacks that have been allocated in this pool. + size_t count; + + // The number of stacks that have been used in this pool. + size_t used; + + // The amount to allocate for the vm_stack: + size_t vm_stack_size; +}; + typedef struct rb_context_struct { enum context_type type; int argc; @@ -57,7 +160,6 @@ typedef struct rb_context_struct { size_t stack_size; } machine; rb_execution_context_t saved_ec; - int free_vm_stack; rb_jmpbuf_t jmpbuf; rb_ensure_entry_t *ensure_array; /* Pointer to MJIT info about the continuation. */ @@ -102,18 +204,272 @@ struct rb_fiber_struct { unsigned int transferred : 1; struct coroutine_context context; - void *ss_sp; - size_t ss_size; + struct fiber_pool_stack stack; }; -#define MAX_MACHINE_STACK_CACHE 10 -static int machine_stack_cache_index = 0; -typedef struct machine_stack_cache_struct { - void *ptr; - size_t size; -} machine_stack_cache_t; -static machine_stack_cache_t machine_stack_cache[MAX_MACHINE_STACK_CACHE]; -static machine_stack_cache_t terminated_machine_stack; +static struct fiber_pool shared_fiber_pool = {NULL, NULL, 0, 0, 0, 0}; + +/* + * FreeBSD require a first (i.e. addr) argument of mmap(2) is not NULL + * if MAP_STACK is passed. + * http://www.FreeBSD.org/cgi/query-pr.cgi?pr=158755 + */ +#if defined(MAP_STACK) && !defined(__FreeBSD__) && !defined(__FreeBSD_kernel__) +#define FIBER_STACK_FLAGS (MAP_PRIVATE | MAP_ANON | MAP_STACK) +#else +#define FIBER_STACK_FLAGS (MAP_PRIVATE | MAP_ANON) +#endif + +#define ERRNOMSG strerror(errno) + +// Locates the stack vacancy details for the given stack. +// Requires that fiber_pool_vacancy fits within one page. +inline static struct fiber_pool_vacancy * +fiber_pool_vacancy_pointer(void * base, size_t size) +{ + STACK_GROW_DIR_DETECTION; + + return (struct fiber_pool_vacancy *)( + (char*)base + STACK_DIR_UPPER(0, size - RB_PAGE_SIZE) + ); +} + +// Given an existing fiber pool, expand it by the specified number of stacks. +static struct fiber_pool_allocation * +fiber_pool_expand(struct fiber_pool * fiber_pool, size_t count) +{ + size_t i; + struct fiber_pool_vacancy * vacancies = fiber_pool->vacancies; + struct fiber_pool_allocation * allocation = RB_ALLOC(struct fiber_pool_allocation); + + size_t size = fiber_pool->size; + + /* Initialize fiber pool */ + allocation->base = NULL; + allocation->size = size; + allocation->count = count; + + if (DEBUG) fprintf(stderr, "fiber_pool_expand(%zu): %p, %zu/%zu x [%zu:%zu]\n", count, fiber_pool, fiber_pool->used, fiber_pool->count, size, fiber_pool->vm_stack_size); + +#ifdef _WIN32 + DWORD old_protect; + + allocation->base = VirtualAlloc(0, count*size, MEM_COMMIT, PAGE_READWRITE); + + if (!allocation->base) { + rb_raise(rb_eFiberError, "can't alloc machine stack to fiber (%zu x %zu bytes): %s", count, size, ERRNOMSG); + } + + for (i = 0; i < count; i += 1) { + void * base = (char*)allocation->base + (size * i); + + if (!VirtualProtect(base, RB_PAGE_SIZE, PAGE_READWRITE | PAGE_GUARD, &old_protect)) { + VirtualFree(allocation->base, 0, MEM_RELEASE); + rb_raise(rb_eFiberError, "can't set a guard page: %s", ERRNOMSG); + } + + struct fiber_pool_vacancy * vacancy = fiber_pool_vacancy_pointer(base, size); + vacancy->stack.base = base; + vacancy->stack.current = (char*)base + size; + vacancy->stack.size = size; + vacancy->stack.available = size - pagesize; + vacancy->stack.pool = fiber_pool; + vacancy->next = vacancies; + vacancies = vacancy; + } +#else + STACK_GROW_DIR_DETECTION; + + errno = 0; + allocation->base = mmap(NULL, count*size, PROT_READ | PROT_WRITE, FIBER_STACK_FLAGS, -1, 0); + + if (allocation->base == MAP_FAILED) { + rb_raise(rb_eFiberError, "can't alloc machine stack to fiber (%zu x %zu bytes): %s", count, size, ERRNOMSG); + } + + for (i = 0; i < count; i += 1) { + void * base = (char*)allocation->base + (size * i); + void * page = (char*)base + STACK_DIR_UPPER(size - RB_PAGE_SIZE, 0); + + if (mprotect(page, RB_PAGE_SIZE, PROT_NONE) < 0) { + munmap(allocation->base, count*size); + rb_raise(rb_eFiberError, "can't set a guard page: %s", ERRNOMSG); + } + + struct fiber_pool_vacancy * vacancy = fiber_pool_vacancy_pointer(base, size); + vacancy->stack.base = base; + vacancy->stack.current = (char*)base + STACK_DIR_UPPER(0, size); + vacancy->stack.size = size; + vacancy->stack.available = size - pagesize; + vacancy->stack.pool = fiber_pool; + vacancy->next = vacancies; + vacancies = vacancy; + } +#endif + + // Insert the allocation into the head of the pool: + allocation->next = fiber_pool->allocations; + fiber_pool->allocations = allocation; + fiber_pool->vacancies = vacancies; + fiber_pool->count += count; + + return allocation; +} + +// Initialize the specified fiber pool with the given number of stacks. +// @param vm_stack_size The size of the vm stack to allocate. +static void +fiber_pool_initialize(struct fiber_pool * fiber_pool, size_t size, size_t count, size_t vm_stack_size) +{ + VM_ASSERT(vm_stack_size < size); + + fiber_pool->allocations = NULL; + fiber_pool->vacancies = NULL; + fiber_pool->size = ((size / RB_PAGE_SIZE) + 1) * RB_PAGE_SIZE; + fiber_pool->count = count; + fiber_pool->used = 0; + + fiber_pool->vm_stack_size = vm_stack_size; + + fiber_pool_expand(fiber_pool, count); +} + +#ifdef RB_EXPERIMENTAL_FIBER_POOL +// Free the list of fiber pool allocations. +static void +fiber_pool_free_allocations(struct fiber_pool_allocation * allocation) +{ + // If no stacks are being used, we can free this allocation: + // VM_ASSERT(allocation->used == 0); + +#ifdef _WIN32 + VirtualFree(allocation->base, 0, MEM_RELEASE); +#else + munmap(allocation->base, allocation->size * allocation->count); +#endif + allocation->base = NULL; + + if (allocation->next != NULL) { + fiber_pool_free_allocations(allocation->next); + } + + ruby_xfree(allocation); +} +#endif + +// Reset the current stack pointer and available size of the given stack. +inline static void +fiber_pool_stack_reset(struct fiber_pool_stack * stack) +{ + STACK_GROW_DIR_DETECTION; + + stack->current = (char*)stack->base + STACK_DIR_UPPER(0, stack->size); + stack->available = stack->size - RB_PAGE_SIZE; +} + +// A pointer to the base of the current unused portion of the stack. +inline static void * +fiber_pool_stack_base(struct fiber_pool_stack * stack) +{ + STACK_GROW_DIR_DETECTION; + + return STACK_DIR_UPPER(stack->current, (char*)stack->current - stack->available); +} + +// Allocate some memory from the stack. Used to allocate vm_stack inline with machine stack. +// @sa fiber_initialize_coroutine +inline static void * +fiber_pool_stack_alloca(struct fiber_pool_stack * stack, size_t offset) +{ + STACK_GROW_DIR_DETECTION; + + VM_ASSERT(stack->available >= offset); + + // The pointer to the memory being allocated: + void * pointer = STACK_DIR_UPPER(stack->current, (char*)stack->current - offset); + + // Move the stack pointer: + stack->current = STACK_DIR_UPPER((char*)stack->current + offset, (char*)stack->current - offset); + stack->available -= offset; + + return pointer; +} + +// Acquire a stack from the given fiber pool. If none are avilable, allocate more. +static struct fiber_pool_stack +fiber_pool_stack_acquire(struct fiber_pool * fiber_pool) { + struct fiber_pool_vacancy * vacancy = fiber_pool->vacancies; + + if (DEBUG) fprintf(stderr, "fiber_pool_stack_acquire: %p used=%zu\n", fiber_pool->vacancies, fiber_pool->used); + + if (!vacancy) { + size_t count = fiber_pool->count; + if (count > 1024) count = 1024; + + fiber_pool_expand(fiber_pool, count); + + // The free list should now contain some stacks: + VM_ASSERT(fiber_pool->vacancies); + + vacancy = fiber_pool->vacancies; + } + + // Take the top item from the free list: + fiber_pool->vacancies = vacancy->next; + fiber_pool->used += 1; + + fiber_pool_stack_reset(&vacancy->stack); + + return vacancy->stack; +} + +// Release and return a stack to the vacancy list. +static void +fiber_pool_stack_release(struct fiber_pool_stack stack) { + struct fiber_pool_vacancy * vacancy = fiber_pool_vacancy_pointer(stack.base, stack.size); + +#if defined(MADV_FREE) && defined(__linux__) + // Using madvise can make physical memory available to OS when there is memory pressure. + // But bencmarks show that this approach makes performance worse. + // madvise(vacancy->stack.base, vacancy->stack.size - RB_PAGE_SIZE, MADV_FREE); +#endif + + vacancy->stack = stack; + vacancy->next = stack.pool->vacancies; + stack.pool->vacancies = vacancy; + stack.pool->used -= 1; + + if (DEBUG) fprintf(stderr, "fiber_pool_stack_release: %p used=%zu\n", stack.base, stack.pool->used); +} + +static COROUTINE +fiber_entry(struct coroutine_context * from, struct coroutine_context * to) +{ + rb_fiber_start(); +} + +static VALUE * +fiber_initialize_machine_stack_context(rb_fiber_t *fiber, size_t * vm_stack_size) +{ + struct fiber_pool * fiber_pool = fiber->stack.pool; + rb_execution_context_t *sec = &fiber->cont.saved_ec; + void * vm_stack = NULL; + + STACK_GROW_DIR_DETECTION; + + VM_ASSERT(fiber_pool != NULL); + + fiber->stack = fiber_pool_stack_acquire(fiber_pool); + vm_stack = fiber_pool_stack_alloca(&fiber->stack, fiber_pool->vm_stack_size); + *vm_stack_size = fiber_pool->vm_stack_size; + + coroutine_initialize(&fiber->context, fiber_entry, fiber_pool_stack_base(&fiber->stack), fiber->stack.available); + + sec->machine.stack_start = fiber->stack.current; + sec->machine.stack_maxsize = fiber->stack.available; + + return vm_stack; +} static const char * fiber_status_name(enum fiber_status s) @@ -159,10 +515,10 @@ rb_ec_verify(const rb_execution_context_t *ec) } #endif -static void +inline static void fiber_status_set(rb_fiber_t *fiber, enum fiber_status s) { - if (0) fprintf(stderr, "fiber: %p, status: %s -> %s\n", (void *)fiber, fiber_status_name(fiber->status), fiber_status_name(s)); + if (DEBUG) fprintf(stderr, "fiber: %p, status: %s -> %s\n", (void *)fiber, fiber_status_name(fiber->status), fiber_status_name(s)); VM_ASSERT(!FIBER_TERMINATED_P(fiber)); VM_ASSERT(fiber->status != s); fiber_verify(fiber); @@ -187,11 +543,6 @@ ec_switch(rb_thread_t *th, rb_fiber_t *fiber) VM_ASSERT(ec->fiber_ptr->cont.self == 0 || ec->vm_stack != NULL); } -static const rb_data_type_t cont_data_type, fiber_data_type; -static VALUE rb_cContinuation; -static VALUE rb_cFiber; -static VALUE rb_eFiberError; - static rb_context_t * cont_ptr(VALUE obj) { @@ -288,26 +639,21 @@ cont_free(void *ptr) RUBY_FREE_ENTER("cont"); - if (cont->free_vm_stack) { - ruby_xfree(cont->saved_ec.vm_stack); - } - if (cont->type == CONTINUATION_CONTEXT) { + ruby_xfree(cont->saved_ec.vm_stack); ruby_xfree(cont->ensure_array); RUBY_FREE_UNLESS_NULL(cont->machine.stack); } else { rb_fiber_t *fiber = (rb_fiber_t*)cont; coroutine_destroy(&fiber->context); - if (fiber->ss_sp != NULL) { + if (fiber->stack.base != NULL) { if (fiber_is_root_p(fiber)) { rb_bug("Illegal root fiber parameter"); } -#ifdef _WIN32 - VirtualFree((void*)fiber->ss_sp, 0, MEM_RELEASE); -#else - munmap((void*)fiber->ss_sp, fiber->ss_size); -#endif - fiber->ss_sp = NULL; + if (fiber->stack.base) { + fiber_pool_stack_release(fiber->stack); + fiber->stack.base = NULL; + } } } @@ -667,93 +1013,6 @@ cont_restore_thread(rb_context_t *cont) } } -static COROUTINE -fiber_entry(struct coroutine_context * from, struct coroutine_context * to) -{ - rb_fiber_start(); -} - -/* - * FreeBSD require a first (i.e. addr) argument of mmap(2) is not NULL - * if MAP_STACK is passed. - * http://www.FreeBSD.org/cgi/query-pr.cgi?pr=158755 - */ -#if defined(MAP_STACK) && !defined(__FreeBSD__) && !defined(__FreeBSD_kernel__) -#define FIBER_STACK_FLAGS (MAP_PRIVATE | MAP_ANON | MAP_STACK) -#else -#define FIBER_STACK_FLAGS (MAP_PRIVATE | MAP_ANON) -#endif - -#define ERRNOMSG strerror(errno) - -static char* -fiber_machine_stack_alloc(size_t size) -{ - char *ptr; -#ifdef _WIN32 - DWORD old_protect; -#endif - - if (machine_stack_cache_index > 0) { - if (machine_stack_cache[machine_stack_cache_index - 1].size == (size / sizeof(VALUE))) { - ptr = machine_stack_cache[machine_stack_cache_index - 1].ptr; - machine_stack_cache_index--; - machine_stack_cache[machine_stack_cache_index].ptr = NULL; - machine_stack_cache[machine_stack_cache_index].size = 0; - } - else { - /* TODO handle multiple machine stack size */ - rb_bug("machine_stack_cache size is not canonicalized"); - } - } - else { -#ifdef _WIN32 - ptr = VirtualAlloc(0, size, MEM_COMMIT, PAGE_READWRITE); - - if (!ptr) { - rb_raise(rb_eFiberError, "can't allocate machine stack to fiber: %s", ERRNOMSG); - } - - if (!VirtualProtect(ptr, RB_PAGE_SIZE, PAGE_READWRITE | PAGE_GUARD, &old_protect)) { - rb_raise(rb_eFiberError, "can't set a guard page: %s", ERRNOMSG); - } -#else - void *page; - STACK_GROW_DIR_DETECTION; - - errno = 0; - ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, FIBER_STACK_FLAGS, -1, 0); - if (ptr == MAP_FAILED) { - rb_raise(rb_eFiberError, "can't alloc machine stack to fiber: %s", ERRNOMSG); - } - - /* guard page setup */ - page = ptr + STACK_DIR_UPPER(size - RB_PAGE_SIZE, 0); - if (mprotect(page, RB_PAGE_SIZE, PROT_NONE) < 0) { - rb_raise(rb_eFiberError, "can't set a guard page: %s", ERRNOMSG); - } -#endif - } - - return ptr; -} - -static void -fiber_initialize_machine_stack_context(rb_fiber_t *fiber, size_t size) -{ - rb_execution_context_t *sec = &fiber->cont.saved_ec; - - char *ptr; - STACK_GROW_DIR_DETECTION; - - ptr = fiber_machine_stack_alloc(size); - fiber->ss_sp = ptr; - fiber->ss_size = size; - coroutine_initialize(&fiber->context, fiber_entry, ptr, size); - sec->machine.stack_start = (VALUE*)(ptr + STACK_DIR_UPPER(0, size)); - sec->machine.stack_maxsize = size - RB_PAGE_SIZE; -} - NOINLINE(static void fiber_setcontext(rb_fiber_t *new_fiber, rb_fiber_t *old_fiber)); static void @@ -915,7 +1174,7 @@ cont_restore_0(rb_context_t *cont, VALUE *addr_in_prev_frame) * require "continuation" * callcc {|cont| * for i in 0..4 - * print "\n#{i}: " + * print "#{i}: " * for j in i*5...(i+1)*5 * cont.call() if j == 17 * printf "%3d", j @@ -1211,50 +1470,47 @@ rb_vm_push_frame(rb_execution_context_t *sec, int stack_max); static VALUE -fiber_init(VALUE fiber_value, VALUE proc) +fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool) +{ + rb_fiber_t *fiber = fiber_t_alloc(self); + + fiber->first_proc = proc; + fiber->stack.base = NULL; + fiber->stack.pool = fiber_pool; + + return self; +} + +static void +fiber_prepare_stack(rb_fiber_t *fiber) { - rb_fiber_t *fiber = fiber_t_alloc(fiber_value); rb_context_t *cont = &fiber->cont; rb_execution_context_t *sec = &cont->saved_ec; - rb_thread_t *cth = GET_THREAD(); - rb_vm_t *vm = cth->vm; - size_t fiber_vm_stack_size = vm->default_params.fiber_vm_stack_size; - size_t thread_vm_stack_size = vm->default_params.thread_vm_stack_size; - VALUE *vm_stack; + + size_t vm_stack_size = 0; + VALUE *vm_stack = fiber_initialize_machine_stack_context(fiber, &vm_stack_size); /* initialize cont */ cont->saved_vm_stack.ptr = NULL; - if (fiber_vm_stack_size == thread_vm_stack_size) { - vm_stack = rb_thread_recycle_stack(fiber_vm_stack_size / sizeof(VALUE)); - } - else { - vm_stack = ruby_xmalloc(fiber_vm_stack_size); - } - - cont->free_vm_stack = 1; - rb_ec_initialize_vm_stack(sec, vm_stack, fiber_vm_stack_size / sizeof(VALUE)); + rb_ec_initialize_vm_stack(sec, vm_stack, vm_stack_size / sizeof(VALUE)); sec->tag = NULL; sec->local_storage = NULL; sec->local_storage_recursive_hash = Qnil; sec->local_storage_recursive_hash_for_trace = Qnil; - - fiber->first_proc = proc; - - return fiber_value; } /* :nodoc: */ static VALUE -rb_fiber_init(VALUE fiber_value) +rb_fiber_initialize(int argc, VALUE* argv, VALUE self) { - return fiber_init(fiber_value, rb_block_proc()); + return fiber_initialize(self, rb_block_proc(), &shared_fiber_pool); } VALUE rb_fiber_new(VALUE (*func)(ANYARGS), VALUE obj) { - return fiber_init(fiber_alloc(rb_cFiber), rb_proc_new(func, obj)); + return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), &shared_fiber_pool); } static void rb_fiber_terminate(rb_fiber_t *fiber, int need_interrupt); @@ -1337,10 +1593,6 @@ rb_threadptr_root_fiber_setup(rb_thread_t *th) fiber->cont.saved_ec.thread_ptr = th; fiber_status_set(fiber, FIBER_RESUMED); /* skip CREATED */ th->ec = &fiber->cont.saved_ec; - - VM_ASSERT(fiber->cont.free_vm_stack == 0); - - /* NOTE: On WIN32, fiber_handle is not allocated yet. */ } void @@ -1411,6 +1663,20 @@ rb_fiber_current(void) return fiber_current()->cont.self; } +static void +fiber_stack_release(rb_fiber_t * fiber) +{ + rb_execution_context_t *ec = &fiber->cont.saved_ec; + + if (fiber->stack.base) { + fiber_pool_stack_release(fiber->stack); + fiber->stack.base = NULL; + } + + // The stack is no longer associated with this execution context: + rb_ec_clear_vm_stack(ec); +} + static inline VALUE fiber_store(rb_fiber_t *next_fiber, rb_thread_t *th) { @@ -1424,41 +1690,23 @@ fiber_store(rb_fiber_t *next_fiber, rb_thread_t *th) fiber = root_fiber_alloc(th); } - VM_ASSERT(FIBER_RESUMED_P(fiber) || FIBER_TERMINATED_P(fiber)); - VM_ASSERT(FIBER_RUNNABLE_P(next_fiber)); - if (FIBER_CREATED_P(next_fiber)) { - fiber_initialize_machine_stack_context(next_fiber, th->vm->default_params.fiber_machine_stack_size); + fiber_prepare_stack(next_fiber); } + VM_ASSERT(FIBER_RESUMED_P(fiber) || FIBER_TERMINATED_P(fiber)); + VM_ASSERT(FIBER_RUNNABLE_P(next_fiber)); + if (FIBER_RESUMED_P(fiber)) fiber_status_set(fiber, FIBER_SUSPENDED); fiber_status_set(next_fiber, FIBER_RESUMED); - fiber_setcontext(next_fiber, fiber); - if (terminated_machine_stack.ptr) { - if (machine_stack_cache_index < MAX_MACHINE_STACK_CACHE) { - machine_stack_cache[machine_stack_cache_index++] = terminated_machine_stack; - } - else { - if (terminated_machine_stack.ptr != fiber->cont.machine.stack) { -#ifdef _WIN32 - VirtualFree(terminated_machine_stack.ptr, 0, MEM_RELEASE); -#else - munmap((void*)terminated_machine_stack.ptr, terminated_machine_stack.size * sizeof(VALUE)); -#endif - } - else { - rb_bug("terminated fiber resumed"); - } - } - terminated_machine_stack.ptr = NULL; - terminated_machine_stack.size = 0; - } - fiber = th->ec->fiber_ptr; + + /* Raise an exception if that was the result of executing the fiber */ if (fiber->cont.argc == -1) rb_exc_raise(fiber->cont.value); + return fiber->cont.value; } @@ -1516,7 +1764,13 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int is_resume) cont->argc = argc; cont->value = make_passing_arg(argc, argv); + value = fiber_store(fiber, th); + + if (is_resume && FIBER_TERMINATED_P(fiber)) { + fiber_stack_release(fiber); + } + RUBY_VM_CHECK_INTS(th->ec); EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_FIBER_SWITCH, th->self, 0, 0, 0, Qnil); @@ -1533,44 +1787,27 @@ rb_fiber_transfer(VALUE fiber_value, int argc, const VALUE *argv) void rb_fiber_close(rb_fiber_t *fiber) { - rb_execution_context_t *ec = &fiber->cont.saved_ec; - VALUE *vm_stack = ec->vm_stack; - size_t stack_bytes = ec->vm_stack_size * sizeof(VALUE); - fiber_status_set(fiber, FIBER_TERMINATED); - if (fiber->cont.free_vm_stack) { - if (stack_bytes == rb_ec_vm_ptr(ec)->default_params.thread_vm_stack_size) { - rb_thread_recycle_stack_release(vm_stack); - } - else { - ruby_xfree(vm_stack); - } - } - - rb_ec_clear_vm_stack(ec); + fiber_stack_release(fiber); } static void rb_fiber_terminate(rb_fiber_t *fiber, int need_interrupt) { VALUE value = fiber->cont.value; - rb_fiber_t *ret_fiber; + rb_fiber_t *next_fiber; VM_ASSERT(FIBER_RESUMED_P(fiber)); rb_fiber_close(fiber); coroutine_destroy(&fiber->context); - /* Ruby must not switch to other thread until storing terminated_machine_stack */ - terminated_machine_stack.ptr = fiber->ss_sp; - terminated_machine_stack.size = fiber->ss_size / sizeof(VALUE); - fiber->ss_sp = NULL; fiber->cont.machine.stack = NULL; fiber->cont.machine.stack_size = 0; - ret_fiber = return_fiber(); - if (need_interrupt) RUBY_VM_SET_INTERRUPT(&ret_fiber->cont.saved_ec); - fiber_switch(ret_fiber, 1, &value, 0); + next_fiber = return_fiber(); + if (need_interrupt) RUBY_VM_SET_INTERRUPT(&next_fiber->cont.saved_ec); + fiber_switch(next_fiber, 1, &value, 0); } VALUE @@ -1792,6 +2029,74 @@ rb_fiber_atfork(rb_thread_t *th) } #endif +#ifdef RB_EXPERIMENTAL_FIBER_POOL +static void +fiber_pool_free(void *ptr) +{ + struct fiber_pool * fiber_pool = ptr; + RUBY_FREE_ENTER("fiber_pool"); + + fiber_pool_free_allocations(fiber_pool->allocations); + ruby_xfree(fiber_pool); + + RUBY_FREE_LEAVE("fiber_pool"); +} + +static size_t +fiber_pool_memsize(const void *ptr) +{ + const struct fiber_pool * fiber_pool = ptr; + size_t size = sizeof(*fiber_pool); + + size += fiber_pool->count * fiber_pool->size; + + return size; +} + +static const rb_data_type_t FiberPoolDataType = { + "fiber_pool", + {NULL, fiber_pool_free, fiber_pool_memsize,}, + 0, 0, RUBY_TYPED_FREE_IMMEDIATELY +}; + +static VALUE +fiber_pool_alloc(VALUE klass) +{ + struct fiber_pool * fiber_pool = RB_ALLOC(struct fiber_pool); + + return TypedData_Wrap_Struct(klass, &FiberPoolDataType, fiber_pool); +} + +static VALUE +rb_fiber_pool_initialize(int argc, VALUE* argv, VALUE self) +{ + rb_thread_t *th = GET_THREAD(); + VALUE size = Qnil, count = Qnil, vm_stack_size = Qnil; + struct fiber_pool * fiber_pool = NULL; + + // Maybe these should be keyworkd arguments. + rb_scan_args(argc, argv, "03", &size, &count, &vm_stack_size); + + if (NIL_P(size)) { + size = INT2NUM(th->vm->default_params.fiber_machine_stack_size); + } + + if (NIL_P(count)) { + count = INT2NUM(128); + } + + if (NIL_P(vm_stack_size)) { + vm_stack_size = INT2NUM(th->vm->default_params.fiber_vm_stack_size); + } + + TypedData_Get_Struct(self, struct fiber_pool, &FiberPoolDataType, fiber_pool); + + fiber_pool_initialize(fiber_pool, NUM2SIZET(size), NUM2SIZET(count), NUM2SIZET(vm_stack_size)); + + return self; +} +#endif + /* * Document-class: FiberError * @@ -1809,6 +2114,9 @@ void Init_Cont(void) { rb_thread_t *th = GET_THREAD(); + size_t vm_stack_size = th->vm->default_params.fiber_vm_stack_size; + size_t machine_stack_size = th->vm->default_params.fiber_machine_stack_size; + size_t stack_size = machine_stack_size + vm_stack_size; #ifdef _WIN32 SYSTEM_INFO info; @@ -1819,15 +2127,23 @@ Init_Cont(void) #endif SET_MACHINE_STACK_END(&th->ec->machine.stack_end); + fiber_pool_initialize(&shared_fiber_pool, stack_size, 8, vm_stack_size); + rb_cFiber = rb_define_class("Fiber", rb_cObject); rb_define_alloc_func(rb_cFiber, fiber_alloc); rb_eFiberError = rb_define_class("FiberError", rb_eStandardError); rb_define_singleton_method(rb_cFiber, "yield", rb_fiber_s_yield, -1); - rb_define_method(rb_cFiber, "initialize", rb_fiber_init, 0); + rb_define_method(rb_cFiber, "initialize", rb_fiber_initialize, -1); rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1); rb_define_method(rb_cFiber, "raise", rb_fiber_raise, -1); rb_define_method(rb_cFiber, "to_s", fiber_to_s, 0); rb_define_alias(rb_cFiber, "inspect", "to_s"); + +#ifdef RB_EXPERIMENTAL_FIBER_POOL + rb_cFiberPool = rb_define_class("Pool", rb_cFiber); + rb_define_alloc_func(rb_cFiberPool, fiber_pool_alloc); + rb_define_method(rb_cFiberPool, "initialize", rb_fiber_pool_initialize, -1); +#endif } RUBY_SYMBOL_EXPORT_BEGIN -- cgit v1.2.3