diff options
author | Samuel Williams <samuel.williams@oriontransfer.co.nz> | 2021-06-26 10:17:26 +1200 |
---|---|---|
committer | Samuel Williams <samuel.williams@oriontransfer.co.nz> | 2021-07-01 11:23:03 +1200 |
commit | 42130a64f02294dc8025af3a51bda518c67ab33d (patch) | |
tree | e81c181770e4cc9d3e87e960a25a870e9a4774f5 /cont.c | |
parent | 9c9531950c007872d7726f050a1dc0cb6f8f0490 (diff) |
Replace copy coroutine with pthread implementation.
Diffstat (limited to 'cont.c')
-rw-r--r-- | cont.c | 141 |
1 files changed, 71 insertions, 70 deletions
@@ -711,10 +711,51 @@ fiber_pool_stack_release(struct fiber_pool_stack * stack) #endif } +void rb_fiber_start(rb_fiber_t*); + +static inline void +ec_switch(rb_thread_t *th, rb_fiber_t *fiber) +{ + rb_execution_context_t *ec = &fiber->cont.saved_ec; + rb_ractor_set_current_ec(th->ractor, th->ec = ec); + // ruby_current_execution_context_ptr = th->ec = ec; + + /* + * timer-thread may set trap interrupt on previous th->ec at any time; + * ensure we do not delay (or lose) the trap interrupt handling. + */ + if (th->vm->ractor.main_thread == th && + rb_signal_buff_size() > 0) { + RUBY_VM_SET_TRAP_INTERRUPT(ec); + } + + VM_ASSERT(ec->fiber_ptr->cont.self == 0 || ec->vm_stack != NULL); +} + +static inline void +fiber_restore_thread(rb_thread_t *th, rb_fiber_t *fiber) +{ + ec_switch(th, fiber); + VM_ASSERT(th->ec->fiber_ptr == fiber); +} + static COROUTINE fiber_entry(struct coroutine_context * from, struct coroutine_context * to) { - rb_fiber_start(); + rb_fiber_t *fiber = to->argument; + rb_thread_t *thread = fiber->cont.saved_ec.thread_ptr; + +#ifdef COROUTINE_PTHREAD_CONTEXT + ruby_thread_set_native(thread); +#endif + + fiber_restore_thread(thread, fiber); + + rb_fiber_start(fiber); + +#ifndef COROUTINE_PTHREAD_CONTEXT + VM_UNREACHABLE(fiber_entry); +#endif } // Initialize a fiber's coroutine's machine stack and vm stack. @@ -731,22 +772,13 @@ fiber_initialize_coroutine(rb_fiber_t *fiber, size_t * vm_stack_size) vm_stack = fiber_pool_stack_alloca(&fiber->stack, fiber_pool->vm_stack_size); *vm_stack_size = fiber_pool->vm_stack_size; -#ifdef COROUTINE_PRIVATE_STACK - coroutine_initialize(&fiber->context, fiber_entry, fiber_pool_stack_base(&fiber->stack), fiber->stack.available, sec->machine.stack_start); - // The stack for this execution context is still the main machine stack, so don't adjust it. - // If this is not managed correctly, you will fail in `rb_ec_stack_check`. - - // We limit the machine stack usage to the fiber stack size. - if (sec->machine.stack_maxsize > fiber->stack.available) { - sec->machine.stack_maxsize = fiber->stack.available; - } -#else coroutine_initialize(&fiber->context, fiber_entry, fiber_pool_stack_base(&fiber->stack), fiber->stack.available); // The stack for this execution context is the one we allocated: sec->machine.stack_start = fiber->stack.current; sec->machine.stack_maxsize = fiber->stack.available; -#endif + + fiber->context.argument = (void*)fiber; return vm_stack; } @@ -815,25 +847,6 @@ fiber_status_set(rb_fiber_t *fiber, enum fiber_status s) fiber->status = s; } -static inline void -ec_switch(rb_thread_t *th, rb_fiber_t *fiber) -{ - rb_execution_context_t *ec = &fiber->cont.saved_ec; - rb_ractor_set_current_ec(th->ractor, th->ec = ec); - // ruby_current_execution_context_ptr = th->ec = ec; - - /* - * timer-thread may set trap interrupt on previous th->ec at any time; - * ensure we do not delay (or lose) the trap interrupt handling. - */ - if (th->vm->ractor.main_thread == th && - rb_signal_buff_size() > 0) { - RUBY_VM_SET_TRAP_INTERRUPT(ec); - } - - VM_ASSERT(ec->fiber_ptr->cont.self == 0 || ec->vm_stack != NULL); -} - static rb_context_t * cont_ptr(VALUE obj) { @@ -1041,7 +1054,7 @@ fiber_free(void *ptr) rb_fiber_t *fiber = ptr; RUBY_FREE_ENTER("fiber"); - //if (DEBUG) fprintf(stderr, "fiber_free: %p[%p]\n", fiber, fiber->stack.base); + if (DEBUG) fprintf(stderr, "fiber_free: %p[%p]\n", fiber, fiber->stack.base); if (fiber->cont.saved_ec.local_storage) { rb_id_table_free(fiber->cont.saved_ec.local_storage); @@ -1278,13 +1291,6 @@ cont_capture(volatile int *volatile stat) COMPILER_WARNING_POP static inline void -fiber_restore_thread(rb_thread_t *th, rb_fiber_t *fiber) -{ - ec_switch(th, fiber); - VM_ASSERT(th->ec->fiber_ptr == fiber); -} - -static inline void cont_restore_thread(rb_context_t *cont) { rb_thread_t *th = GET_THREAD(); @@ -1326,7 +1332,6 @@ cont_restore_thread(rb_context_t *cont) th->ec->cfp = sec->cfp; th->ec->raised_flag = sec->raised_flag; th->ec->tag = sec->tag; - th->ec->protect_tag = sec->protect_tag; th->ec->root_lep = sec->root_lep; th->ec->root_svar = sec->root_svar; th->ec->ensure_list = sec->ensure_list; @@ -1367,13 +1372,17 @@ fiber_setcontext(rb_fiber_t *new_fiber, rb_fiber_t *old_fiber) /* old_fiber->machine.stack_end should be NULL */ old_fiber->cont.saved_ec.machine.stack_end = NULL; - /* restore thread context */ - fiber_restore_thread(th, new_fiber); - // if (DEBUG) fprintf(stderr, "fiber_setcontext: %p[%p] -> %p[%p]\n", old_fiber, old_fiber->stack.base, new_fiber, new_fiber->stack.base); /* swap machine context */ - coroutine_transfer(&old_fiber->context, &new_fiber->context); + struct coroutine_context * from = coroutine_transfer(&old_fiber->context, &new_fiber->context); + + if (from == NULL) { + rb_syserr_fail(errno, "coroutine_transfer"); + } + + /* restore thread context */ + fiber_restore_thread(th, old_fiber); // It's possible to get here, and new_fiber is already freed. // if (DEBUG) fprintf(stderr, "fiber_setcontext: %p[%p] <- %p[%p]\n", old_fiber, old_fiber->stack.base, new_fiber, new_fiber->stack.base); @@ -1670,9 +1679,6 @@ rb_cont_call(int argc, VALUE *argv, VALUE contval) if (cont_thread_value(cont) != th->self) { rb_raise(rb_eRuntimeError, "continuation called across threads"); } - if (cont->saved_ec.protect_tag != th->ec->protect_tag) { - rb_raise(rb_eRuntimeError, "continuation called across stack rewinding barrier"); - } if (cont->saved_ec.fiber_ptr) { if (th->ec->fiber_ptr != cont->saved_ec.fiber_ptr) { rb_raise(rb_eRuntimeError, "continuation called across fiber"); @@ -2031,13 +2037,13 @@ rb_fiber_set_scheduler(VALUE klass, VALUE scheduler) return rb_fiber_scheduler_set(scheduler); } -NORETURN(static void rb_fiber_terminate(rb_fiber_t *fiber, int need_interrupt, VALUE err)); +static void rb_fiber_terminate(rb_fiber_t *fiber, int need_interrupt, VALUE err); void -rb_fiber_start(void) +rb_fiber_start(rb_fiber_t *fiber) { - rb_thread_t * volatile th = GET_THREAD(); - rb_fiber_t *fiber = th->ec->fiber_ptr; + rb_thread_t * volatile th = fiber->cont.saved_ec.thread_ptr; + rb_proc_t *proc; enum ruby_tag_type state; int need_interrupt = TRUE; @@ -2084,7 +2090,6 @@ rb_fiber_start(void) } rb_fiber_terminate(fiber, need_interrupt, err); - VM_UNREACHABLE(rb_fiber_start); } static rb_fiber_t * @@ -2101,12 +2106,7 @@ root_fiber_alloc(rb_thread_t *th) DATA_PTR(fiber_value) = fiber; fiber->cont.self = fiber_value; -#ifdef COROUTINE_PRIVATE_STACK - fiber->stack = fiber_pool_stack_acquire(&shared_fiber_pool); - coroutine_initialize_main(&fiber->context, fiber_pool_stack_base(&fiber->stack), fiber->stack.available, th->ec->machine.stack_start); -#else coroutine_initialize_main(&fiber->context); -#endif return fiber; } @@ -2255,10 +2255,8 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int kw_splat, VALUE if (cont_thread_value(cont) != th->self) { rb_raise(rb_eFiberError, "fiber called across threads"); } - else if (cont->saved_ec.protect_tag != th->ec->protect_tag) { - rb_raise(rb_eFiberError, "fiber called across stack rewinding barrier"); - } - else if (FIBER_TERMINATED_P(fiber)) { + + if (FIBER_TERMINATED_P(fiber)) { value = rb_exc_new2(rb_eFiberError, "dead fiber called"); if (!FIBER_TERMINATED_P(th->ec->fiber_ptr)) { @@ -2307,9 +2305,12 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int kw_splat, VALUE fiber_store(fiber, th); + // We cannot free the stack until the pthread is joined: +#ifndef COROUTINE_PTHREAD_CONTEXT if (RTEST(resuming_fiber) && FIBER_TERMINATED_P(fiber)) { fiber_stack_release(fiber); } +#endif if (fiber_current()->blocking) { th->blocking += 1; @@ -2388,26 +2389,24 @@ rb_fiber_close(rb_fiber_t *fiber) } static void -rb_fiber_terminate(rb_fiber_t *fiber, int need_interrupt, VALUE err) +rb_fiber_terminate(rb_fiber_t *fiber, int need_interrupt, VALUE error) { VALUE value = fiber->cont.value; - rb_fiber_t *next_fiber; VM_ASSERT(FIBER_RESUMED_P(fiber)); rb_fiber_close(fiber); - coroutine_destroy(&fiber->context); - fiber->cont.machine.stack = NULL; fiber->cont.machine.stack_size = 0; - next_fiber = return_fiber(true); + rb_fiber_t *next_fiber = return_fiber(true); + if (need_interrupt) RUBY_VM_SET_INTERRUPT(&next_fiber->cont.saved_ec); - if (RTEST(err)) - fiber_switch(next_fiber, -1, &err, RB_NO_KEYWORDS, Qfalse, false); + + if (RTEST(error)) + fiber_switch(next_fiber, -1, &error, RB_NO_KEYWORDS, Qfalse, false); else fiber_switch(next_fiber, 1, &value, RB_NO_KEYWORDS, Qfalse, false); - VM_UNREACHABLE(rb_fiber_terminate); } VALUE @@ -2436,7 +2435,9 @@ rb_fiber_resume_kw(VALUE fiber_value, int argc, const VALUE *argv, int kw_splat) rb_raise(rb_eFiberError, "attempt to resume a transferring fiber"); } - return fiber_switch(fiber, argc, argv, kw_splat, fiber_value, false); + VALUE result = fiber_switch(fiber, argc, argv, kw_splat, fiber_value, false); + + return result; } VALUE |