summaryrefslogtreecommitdiff
path: root/cont.c
diff options
context:
space:
mode:
authorSamuel Williams <samuel.williams@oriontransfer.co.nz>2021-06-26 10:17:26 +1200
committerSamuel Williams <samuel.williams@oriontransfer.co.nz>2021-07-01 11:23:03 +1200
commit42130a64f02294dc8025af3a51bda518c67ab33d (patch)
treee81c181770e4cc9d3e87e960a25a870e9a4774f5 /cont.c
parent9c9531950c007872d7726f050a1dc0cb6f8f0490 (diff)
Replace copy coroutine with pthread implementation.
Diffstat (limited to 'cont.c')
-rw-r--r--cont.c141
1 files changed, 71 insertions, 70 deletions
diff --git a/cont.c b/cont.c
index 00ab1e9fd1..d72d29eaca 100644
--- a/cont.c
+++ b/cont.c
@@ -711,10 +711,51 @@ fiber_pool_stack_release(struct fiber_pool_stack * stack)
#endif
}
+void rb_fiber_start(rb_fiber_t*);
+
+static inline void
+ec_switch(rb_thread_t *th, rb_fiber_t *fiber)
+{
+ rb_execution_context_t *ec = &fiber->cont.saved_ec;
+ rb_ractor_set_current_ec(th->ractor, th->ec = ec);
+ // ruby_current_execution_context_ptr = th->ec = ec;
+
+ /*
+ * timer-thread may set trap interrupt on previous th->ec at any time;
+ * ensure we do not delay (or lose) the trap interrupt handling.
+ */
+ if (th->vm->ractor.main_thread == th &&
+ rb_signal_buff_size() > 0) {
+ RUBY_VM_SET_TRAP_INTERRUPT(ec);
+ }
+
+ VM_ASSERT(ec->fiber_ptr->cont.self == 0 || ec->vm_stack != NULL);
+}
+
+static inline void
+fiber_restore_thread(rb_thread_t *th, rb_fiber_t *fiber)
+{
+ ec_switch(th, fiber);
+ VM_ASSERT(th->ec->fiber_ptr == fiber);
+}
+
static COROUTINE
fiber_entry(struct coroutine_context * from, struct coroutine_context * to)
{
- rb_fiber_start();
+ rb_fiber_t *fiber = to->argument;
+ rb_thread_t *thread = fiber->cont.saved_ec.thread_ptr;
+
+#ifdef COROUTINE_PTHREAD_CONTEXT
+ ruby_thread_set_native(thread);
+#endif
+
+ fiber_restore_thread(thread, fiber);
+
+ rb_fiber_start(fiber);
+
+#ifndef COROUTINE_PTHREAD_CONTEXT
+ VM_UNREACHABLE(fiber_entry);
+#endif
}
// Initialize a fiber's coroutine's machine stack and vm stack.
@@ -731,22 +772,13 @@ fiber_initialize_coroutine(rb_fiber_t *fiber, size_t * vm_stack_size)
vm_stack = fiber_pool_stack_alloca(&fiber->stack, fiber_pool->vm_stack_size);
*vm_stack_size = fiber_pool->vm_stack_size;
-#ifdef COROUTINE_PRIVATE_STACK
- coroutine_initialize(&fiber->context, fiber_entry, fiber_pool_stack_base(&fiber->stack), fiber->stack.available, sec->machine.stack_start);
- // The stack for this execution context is still the main machine stack, so don't adjust it.
- // If this is not managed correctly, you will fail in `rb_ec_stack_check`.
-
- // We limit the machine stack usage to the fiber stack size.
- if (sec->machine.stack_maxsize > fiber->stack.available) {
- sec->machine.stack_maxsize = fiber->stack.available;
- }
-#else
coroutine_initialize(&fiber->context, fiber_entry, fiber_pool_stack_base(&fiber->stack), fiber->stack.available);
// The stack for this execution context is the one we allocated:
sec->machine.stack_start = fiber->stack.current;
sec->machine.stack_maxsize = fiber->stack.available;
-#endif
+
+ fiber->context.argument = (void*)fiber;
return vm_stack;
}
@@ -815,25 +847,6 @@ fiber_status_set(rb_fiber_t *fiber, enum fiber_status s)
fiber->status = s;
}
-static inline void
-ec_switch(rb_thread_t *th, rb_fiber_t *fiber)
-{
- rb_execution_context_t *ec = &fiber->cont.saved_ec;
- rb_ractor_set_current_ec(th->ractor, th->ec = ec);
- // ruby_current_execution_context_ptr = th->ec = ec;
-
- /*
- * timer-thread may set trap interrupt on previous th->ec at any time;
- * ensure we do not delay (or lose) the trap interrupt handling.
- */
- if (th->vm->ractor.main_thread == th &&
- rb_signal_buff_size() > 0) {
- RUBY_VM_SET_TRAP_INTERRUPT(ec);
- }
-
- VM_ASSERT(ec->fiber_ptr->cont.self == 0 || ec->vm_stack != NULL);
-}
-
static rb_context_t *
cont_ptr(VALUE obj)
{
@@ -1041,7 +1054,7 @@ fiber_free(void *ptr)
rb_fiber_t *fiber = ptr;
RUBY_FREE_ENTER("fiber");
- //if (DEBUG) fprintf(stderr, "fiber_free: %p[%p]\n", fiber, fiber->stack.base);
+ if (DEBUG) fprintf(stderr, "fiber_free: %p[%p]\n", fiber, fiber->stack.base);
if (fiber->cont.saved_ec.local_storage) {
rb_id_table_free(fiber->cont.saved_ec.local_storage);
@@ -1278,13 +1291,6 @@ cont_capture(volatile int *volatile stat)
COMPILER_WARNING_POP
static inline void
-fiber_restore_thread(rb_thread_t *th, rb_fiber_t *fiber)
-{
- ec_switch(th, fiber);
- VM_ASSERT(th->ec->fiber_ptr == fiber);
-}
-
-static inline void
cont_restore_thread(rb_context_t *cont)
{
rb_thread_t *th = GET_THREAD();
@@ -1326,7 +1332,6 @@ cont_restore_thread(rb_context_t *cont)
th->ec->cfp = sec->cfp;
th->ec->raised_flag = sec->raised_flag;
th->ec->tag = sec->tag;
- th->ec->protect_tag = sec->protect_tag;
th->ec->root_lep = sec->root_lep;
th->ec->root_svar = sec->root_svar;
th->ec->ensure_list = sec->ensure_list;
@@ -1367,13 +1372,17 @@ fiber_setcontext(rb_fiber_t *new_fiber, rb_fiber_t *old_fiber)
/* old_fiber->machine.stack_end should be NULL */
old_fiber->cont.saved_ec.machine.stack_end = NULL;
- /* restore thread context */
- fiber_restore_thread(th, new_fiber);
-
// if (DEBUG) fprintf(stderr, "fiber_setcontext: %p[%p] -> %p[%p]\n", old_fiber, old_fiber->stack.base, new_fiber, new_fiber->stack.base);
/* swap machine context */
- coroutine_transfer(&old_fiber->context, &new_fiber->context);
+ struct coroutine_context * from = coroutine_transfer(&old_fiber->context, &new_fiber->context);
+
+ if (from == NULL) {
+ rb_syserr_fail(errno, "coroutine_transfer");
+ }
+
+ /* restore thread context */
+ fiber_restore_thread(th, old_fiber);
// It's possible to get here, and new_fiber is already freed.
// if (DEBUG) fprintf(stderr, "fiber_setcontext: %p[%p] <- %p[%p]\n", old_fiber, old_fiber->stack.base, new_fiber, new_fiber->stack.base);
@@ -1670,9 +1679,6 @@ rb_cont_call(int argc, VALUE *argv, VALUE contval)
if (cont_thread_value(cont) != th->self) {
rb_raise(rb_eRuntimeError, "continuation called across threads");
}
- if (cont->saved_ec.protect_tag != th->ec->protect_tag) {
- rb_raise(rb_eRuntimeError, "continuation called across stack rewinding barrier");
- }
if (cont->saved_ec.fiber_ptr) {
if (th->ec->fiber_ptr != cont->saved_ec.fiber_ptr) {
rb_raise(rb_eRuntimeError, "continuation called across fiber");
@@ -2031,13 +2037,13 @@ rb_fiber_set_scheduler(VALUE klass, VALUE scheduler)
return rb_fiber_scheduler_set(scheduler);
}
-NORETURN(static void rb_fiber_terminate(rb_fiber_t *fiber, int need_interrupt, VALUE err));
+static void rb_fiber_terminate(rb_fiber_t *fiber, int need_interrupt, VALUE err);
void
-rb_fiber_start(void)
+rb_fiber_start(rb_fiber_t *fiber)
{
- rb_thread_t * volatile th = GET_THREAD();
- rb_fiber_t *fiber = th->ec->fiber_ptr;
+ rb_thread_t * volatile th = fiber->cont.saved_ec.thread_ptr;
+
rb_proc_t *proc;
enum ruby_tag_type state;
int need_interrupt = TRUE;
@@ -2084,7 +2090,6 @@ rb_fiber_start(void)
}
rb_fiber_terminate(fiber, need_interrupt, err);
- VM_UNREACHABLE(rb_fiber_start);
}
static rb_fiber_t *
@@ -2101,12 +2106,7 @@ root_fiber_alloc(rb_thread_t *th)
DATA_PTR(fiber_value) = fiber;
fiber->cont.self = fiber_value;
-#ifdef COROUTINE_PRIVATE_STACK
- fiber->stack = fiber_pool_stack_acquire(&shared_fiber_pool);
- coroutine_initialize_main(&fiber->context, fiber_pool_stack_base(&fiber->stack), fiber->stack.available, th->ec->machine.stack_start);
-#else
coroutine_initialize_main(&fiber->context);
-#endif
return fiber;
}
@@ -2255,10 +2255,8 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int kw_splat, VALUE
if (cont_thread_value(cont) != th->self) {
rb_raise(rb_eFiberError, "fiber called across threads");
}
- else if (cont->saved_ec.protect_tag != th->ec->protect_tag) {
- rb_raise(rb_eFiberError, "fiber called across stack rewinding barrier");
- }
- else if (FIBER_TERMINATED_P(fiber)) {
+
+ if (FIBER_TERMINATED_P(fiber)) {
value = rb_exc_new2(rb_eFiberError, "dead fiber called");
if (!FIBER_TERMINATED_P(th->ec->fiber_ptr)) {
@@ -2307,9 +2305,12 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int kw_splat, VALUE
fiber_store(fiber, th);
+ // We cannot free the stack until the pthread is joined:
+#ifndef COROUTINE_PTHREAD_CONTEXT
if (RTEST(resuming_fiber) && FIBER_TERMINATED_P(fiber)) {
fiber_stack_release(fiber);
}
+#endif
if (fiber_current()->blocking) {
th->blocking += 1;
@@ -2388,26 +2389,24 @@ rb_fiber_close(rb_fiber_t *fiber)
}
static void
-rb_fiber_terminate(rb_fiber_t *fiber, int need_interrupt, VALUE err)
+rb_fiber_terminate(rb_fiber_t *fiber, int need_interrupt, VALUE error)
{
VALUE value = fiber->cont.value;
- rb_fiber_t *next_fiber;
VM_ASSERT(FIBER_RESUMED_P(fiber));
rb_fiber_close(fiber);
- coroutine_destroy(&fiber->context);
-
fiber->cont.machine.stack = NULL;
fiber->cont.machine.stack_size = 0;
- next_fiber = return_fiber(true);
+ rb_fiber_t *next_fiber = return_fiber(true);
+
if (need_interrupt) RUBY_VM_SET_INTERRUPT(&next_fiber->cont.saved_ec);
- if (RTEST(err))
- fiber_switch(next_fiber, -1, &err, RB_NO_KEYWORDS, Qfalse, false);
+
+ if (RTEST(error))
+ fiber_switch(next_fiber, -1, &error, RB_NO_KEYWORDS, Qfalse, false);
else
fiber_switch(next_fiber, 1, &value, RB_NO_KEYWORDS, Qfalse, false);
- VM_UNREACHABLE(rb_fiber_terminate);
}
VALUE
@@ -2436,7 +2435,9 @@ rb_fiber_resume_kw(VALUE fiber_value, int argc, const VALUE *argv, int kw_splat)
rb_raise(rb_eFiberError, "attempt to resume a transferring fiber");
}
- return fiber_switch(fiber, argc, argv, kw_splat, fiber_value, false);
+ VALUE result = fiber_switch(fiber, argc, argv, kw_splat, fiber_value, false);
+
+ return result;
}
VALUE