From 42130a64f02294dc8025af3a51bda518c67ab33d Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sat, 26 Jun 2021 10:17:26 +1200 Subject: Replace copy coroutine with pthread implementation. --- thread_sync.c | 85 ++++++++++++++++++++++++++--------------------------------- 1 file changed, 38 insertions(+), 47 deletions(-) (limited to 'thread_sync.c') diff --git a/thread_sync.c b/thread_sync.c index 2837f454f4..894235e9e1 100644 --- a/thread_sync.c +++ b/thread_sync.c @@ -1,6 +1,5 @@ /* included by thread.c */ #include "ccan/list/list.h" -#include "coroutine/Stack.h" static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable; static VALUE rb_eClosedQueueError; @@ -277,12 +276,10 @@ call_rb_fiber_scheduler_block(VALUE mutex) } static VALUE -delete_from_waitq(VALUE v) +delete_from_waitq(VALUE value) { - struct sync_waiter *w = (void *)v; - list_del(&w->node); - - COROUTINE_STACK_FREE(w); + struct sync_waiter *sync_waiter = (void *)value; + list_del(&sync_waiter->node); return Qnil; } @@ -309,14 +306,15 @@ do_mutex_lock(VALUE self, int interruptible_p) while (mutex->fiber != fiber) { VALUE scheduler = rb_fiber_scheduler_current(); if (scheduler != Qnil) { - COROUTINE_STACK_LOCAL(struct sync_waiter, w); - w->self = self; - w->th = th; - w->fiber = fiber; + struct sync_waiter sync_waiter = { + .self = self, + .th = th, + .fiber = fiber + }; - list_add_tail(&mutex->waitq, &w->node); + list_add_tail(&mutex->waitq, &sync_waiter.node); - rb_ensure(call_rb_fiber_scheduler_block, self, delete_from_waitq, (VALUE)w); + rb_ensure(call_rb_fiber_scheduler_block, self, delete_from_waitq, (VALUE)&sync_waiter); if (!mutex->fiber) { mutex->fiber = fiber; @@ -341,18 +339,17 @@ do_mutex_lock(VALUE self, int interruptible_p) patrol_thread = th; } - COROUTINE_STACK_LOCAL(struct sync_waiter, w); - w->self = self; - w->th = th; - w->fiber = fiber; + struct sync_waiter sync_waiter = { + .self = self, + .th = th, + .fiber = fiber + }; - list_add_tail(&mutex->waitq, &w->node); + list_add_tail(&mutex->waitq, &sync_waiter.node); native_sleep(th, timeout); /* release GVL */ - list_del(&w->node); - - COROUTINE_STACK_FREE(w); + list_del(&sync_waiter.node); if (!mutex->fiber) { mutex->fiber = fiber; @@ -984,8 +981,6 @@ queue_sleep_done(VALUE p) list_del(&qw->w.node); qw->as.q->num_waiting--; - COROUTINE_STACK_FREE(qw); - return Qfalse; } @@ -997,8 +992,6 @@ szqueue_sleep_done(VALUE p) list_del(&qw->w.node); qw->as.sq->num_waiting_push--; - COROUTINE_STACK_FREE(qw); - return Qfalse; } @@ -1020,17 +1013,15 @@ queue_do_pop(VALUE self, struct rb_queue *q, int should_block) assert(RARRAY_LEN(q->que) == 0); assert(queue_closed_p(self) == 0); - COROUTINE_STACK_LOCAL(struct queue_waiter, qw); + struct queue_waiter queue_waiter = { + .w = {.self = self, .th = ec->thread_ptr, .fiber = ec->fiber_ptr}, + .as = {.q = q} + }; - qw->w.self = self; - qw->w.th = ec->thread_ptr; - qw->w.fiber = ec->fiber_ptr; + list_add_tail(queue_waitq(queue_waiter.as.q), &queue_waiter.w.node); + queue_waiter.as.q->num_waiting++; - qw->as.q = q; - list_add_tail(queue_waitq(qw->as.q), &qw->w.node); - qw->as.q->num_waiting++; - - rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)qw); + rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&queue_waiter); } } @@ -1263,18 +1254,17 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self) } else { rb_execution_context_t *ec = GET_EC(); - COROUTINE_STACK_LOCAL(struct queue_waiter, qw); - struct list_head *pushq = szqueue_pushq(sq); + struct queue_waiter queue_waiter = { + .w = {.self = self, .th = ec->thread_ptr, .fiber = ec->fiber_ptr}, + .as = {.sq = sq} + }; - qw->w.self = self; - qw->w.th = ec->thread_ptr; - qw->w.fiber = ec->fiber_ptr; + struct list_head *pushq = szqueue_pushq(sq); - qw->as.sq = sq; - list_add_tail(pushq, &qw->w.node); + list_add_tail(pushq, &queue_waiter.w.node); sq->num_waiting_push++; - rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)qw); + rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&queue_waiter); } } @@ -1505,13 +1495,14 @@ rb_condvar_wait(int argc, VALUE *argv, VALUE self) rb_scan_args(argc, argv, "11", &args.mutex, &args.timeout); - COROUTINE_STACK_LOCAL(struct sync_waiter, w); - w->self = args.mutex; - w->th = ec->thread_ptr; - w->fiber = ec->fiber_ptr; + struct sync_waiter sync_waiter = { + .self = args.mutex, + .th = ec->thread_ptr, + .fiber = ec->fiber_ptr + }; - list_add_tail(&cv->waitq, &w->node); - rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)w); + list_add_tail(&cv->waitq, &sync_waiter.node); + rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&sync_waiter); return self; } -- cgit v1.2.3