diff options
author | Samuel Williams <samuel.williams@oriontransfer.co.nz> | 2020-09-11 20:47:25 +1200 |
---|---|---|
committer | Samuel Williams <samuel.williams@oriontransfer.co.nz> | 2020-09-14 16:44:09 +1200 |
commit | 0f613cc5f1bbe319ab916be905de263523ef5402 (patch) | |
tree | 84f707d136241822a547a222e40a98efb57aca49 /thread_sync.c | |
parent | 1a0cfe28390ce5d46f7b854eaad2b9b979c160de (diff) |
Add support for ConditionVariable.
Notes
Notes:
Merged: https://github.com/ruby/ruby/pull/3434
Diffstat (limited to 'thread_sync.c')
-rw-r--r-- | thread_sync.c | 75 |
1 files changed, 44 insertions, 31 deletions
diff --git a/thread_sync.c b/thread_sync.c index 9dd3b32645..bd60231789 100644 --- a/thread_sync.c +++ b/thread_sync.c @@ -4,8 +4,16 @@ static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable; static VALUE rb_eClosedQueueError; +/* Mutex */ +typedef struct rb_mutex_struct { + rb_fiber_t *fiber; + struct rb_mutex_struct *next_mutex; + struct list_head waitq; /* protected by GVL */ +} rb_mutex_t; + /* sync_waiter is always on-stack */ struct sync_waiter { + VALUE self; rb_thread_t *th; rb_fiber_t *fiber; struct list_node node; @@ -19,12 +27,17 @@ sync_wakeup(struct list_head *head, long max) struct sync_waiter *cur = 0, *next; list_for_each_safe(head, cur, next, node) { - list_del_init(&cur->node); - if (cur->th->status != THREAD_KILLED) { - rb_threadptr_interrupt(cur->th); - cur->th->status = THREAD_RUNNABLE; - if (--max == 0) return; - } + list_del_init(&cur->node); + + if (cur->th->scheduler != Qnil) { + rb_scheduler_mutex_unlock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber)); + } + + if (cur->th->status != THREAD_KILLED) { + rb_threadptr_interrupt(cur->th); + cur->th->status = THREAD_RUNNABLE; + if (--max == 0) return; + } } } @@ -40,16 +53,6 @@ wakeup_all(struct list_head *head) sync_wakeup(head, LONG_MAX); } -/* Mutex */ - -typedef struct rb_mutex_struct { - VALUE self; - - rb_fiber_t *fiber; - struct rb_mutex_struct *next_mutex; - struct list_head waitq; /* protected by GVL */ -} rb_mutex_t; - #if defined(HAVE_WORKING_FORK) static void rb_mutex_abandon_all(rb_mutex_t *mutexes); static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th); @@ -151,7 +154,6 @@ mutex_alloc(VALUE klass) obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex); - mutex->self = obj; list_head_init(&mutex->waitq); return obj; } @@ -247,8 +249,8 @@ mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex) static VALUE do_mutex_lock(VALUE self, int interruptible_p) { - rb_thread_t *th = GET_THREAD(); rb_execution_context_t *ec = GET_EC(); + rb_thread_t *th = ec->thread_ptr; rb_fiber_t *fiber = ec->fiber_ptr; rb_mutex_t *mutex = mutex_ptr(self); @@ -260,6 +262,7 @@ do_mutex_lock(VALUE self, int interruptible_p) if (rb_mutex_trylock(self) == Qfalse) { struct sync_waiter w = { + .self = self, .th = th, .fiber = fiber }; @@ -398,7 +401,7 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber) list_del_init(&cur->node); if (cur->th->scheduler != Qnil) { - rb_scheduler_mutex_unlock(cur->th->scheduler, mutex->self, rb_fiberptr_self(cur->fiber)); + rb_scheduler_mutex_unlock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber)); } switch (cur->th->status) { @@ -498,7 +501,6 @@ rb_mutex_wait_for(VALUE time) VALUE rb_mutex_sleep(VALUE self, VALUE timeout) { - time_t beg, end; struct timeval t; if (!NIL_P(timeout)) { @@ -506,18 +508,23 @@ rb_mutex_sleep(VALUE self, VALUE timeout) } rb_mutex_unlock(self); - beg = time(0); - if (NIL_P(timeout)) { - rb_ensure(rb_mutex_sleep_forever, Qnil, mutex_lock_uninterruptible, self); + time_t beg = time(0); + + VALUE scheduler = rb_thread_current_scheduler(); + if (scheduler != Qnil) { + rb_scheduler_kernel_sleep(scheduler, timeout); + mutex_lock_uninterruptible(self); + } else { + if (NIL_P(timeout)) { + rb_ensure(rb_mutex_sleep_forever, Qnil, mutex_lock_uninterruptible, self); + } else { + rb_hrtime_t rel = rb_timeval2hrtime(&t); + rb_ensure(rb_mutex_wait_for, (VALUE)&rel, mutex_lock_uninterruptible, self); + } } - else { - rb_hrtime_t rel = rb_timeval2hrtime(&t); - rb_ensure(rb_mutex_wait_for, (VALUE)&rel, - mutex_lock_uninterruptible, self); - } RUBY_VM_CHECK_INTS_BLOCKING(GET_EC()); - end = time(0) - beg; + time_t end = time(0) - beg; return INT2FIX(end); } @@ -1429,13 +1436,19 @@ delete_from_waitq(VALUE v) static VALUE rb_condvar_wait(int argc, VALUE *argv, VALUE self) { + rb_execution_context_t *ec = GET_EC(); + struct rb_condvar *cv = condvar_ptr(self); struct sleep_call args; - struct sync_waiter w; rb_scan_args(argc, argv, "11", &args.mutex, &args.timeout); - w.th = GET_THREAD(); + struct sync_waiter w = { + .self = args.mutex, + .th = ec->thread_ptr, + .fiber = ec->fiber_ptr, + }; + list_add_tail(&cv->waitq, &w.node); rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&w); |