summaryrefslogtreecommitdiff
path: root/thread_sync.c
diff options
context:
space:
mode:
authorSamuel Williams <samuel.williams@oriontransfer.co.nz>2020-09-11 20:47:25 +1200
committerSamuel Williams <samuel.williams@oriontransfer.co.nz>2020-09-14 16:44:09 +1200
commit0f613cc5f1bbe319ab916be905de263523ef5402 (patch)
tree84f707d136241822a547a222e40a98efb57aca49 /thread_sync.c
parent1a0cfe28390ce5d46f7b854eaad2b9b979c160de (diff)
Add support for ConditionVariable.
Notes
Notes: Merged: https://github.com/ruby/ruby/pull/3434
Diffstat (limited to 'thread_sync.c')
-rw-r--r--thread_sync.c75
1 files changed, 44 insertions, 31 deletions
diff --git a/thread_sync.c b/thread_sync.c
index 9dd3b32645..bd60231789 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -4,8 +4,16 @@
static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
static VALUE rb_eClosedQueueError;
+/* Mutex */
+typedef struct rb_mutex_struct {
+ rb_fiber_t *fiber;
+ struct rb_mutex_struct *next_mutex;
+ struct list_head waitq; /* protected by GVL */
+} rb_mutex_t;
+
/* sync_waiter is always on-stack */
struct sync_waiter {
+ VALUE self;
rb_thread_t *th;
rb_fiber_t *fiber;
struct list_node node;
@@ -19,12 +27,17 @@ sync_wakeup(struct list_head *head, long max)
struct sync_waiter *cur = 0, *next;
list_for_each_safe(head, cur, next, node) {
- list_del_init(&cur->node);
- if (cur->th->status != THREAD_KILLED) {
- rb_threadptr_interrupt(cur->th);
- cur->th->status = THREAD_RUNNABLE;
- if (--max == 0) return;
- }
+ list_del_init(&cur->node);
+
+ if (cur->th->scheduler != Qnil) {
+ rb_scheduler_mutex_unlock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
+ }
+
+ if (cur->th->status != THREAD_KILLED) {
+ rb_threadptr_interrupt(cur->th);
+ cur->th->status = THREAD_RUNNABLE;
+ if (--max == 0) return;
+ }
}
}
@@ -40,16 +53,6 @@ wakeup_all(struct list_head *head)
sync_wakeup(head, LONG_MAX);
}
-/* Mutex */
-
-typedef struct rb_mutex_struct {
- VALUE self;
-
- rb_fiber_t *fiber;
- struct rb_mutex_struct *next_mutex;
- struct list_head waitq; /* protected by GVL */
-} rb_mutex_t;
-
#if defined(HAVE_WORKING_FORK)
static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
@@ -151,7 +154,6 @@ mutex_alloc(VALUE klass)
obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex);
- mutex->self = obj;
list_head_init(&mutex->waitq);
return obj;
}
@@ -247,8 +249,8 @@ mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex)
static VALUE
do_mutex_lock(VALUE self, int interruptible_p)
{
- rb_thread_t *th = GET_THREAD();
rb_execution_context_t *ec = GET_EC();
+ rb_thread_t *th = ec->thread_ptr;
rb_fiber_t *fiber = ec->fiber_ptr;
rb_mutex_t *mutex = mutex_ptr(self);
@@ -260,6 +262,7 @@ do_mutex_lock(VALUE self, int interruptible_p)
if (rb_mutex_trylock(self) == Qfalse) {
struct sync_waiter w = {
+ .self = self,
.th = th,
.fiber = fiber
};
@@ -398,7 +401,7 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber)
list_del_init(&cur->node);
if (cur->th->scheduler != Qnil) {
- rb_scheduler_mutex_unlock(cur->th->scheduler, mutex->self, rb_fiberptr_self(cur->fiber));
+ rb_scheduler_mutex_unlock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
}
switch (cur->th->status) {
@@ -498,7 +501,6 @@ rb_mutex_wait_for(VALUE time)
VALUE
rb_mutex_sleep(VALUE self, VALUE timeout)
{
- time_t beg, end;
struct timeval t;
if (!NIL_P(timeout)) {
@@ -506,18 +508,23 @@ rb_mutex_sleep(VALUE self, VALUE timeout)
}
rb_mutex_unlock(self);
- beg = time(0);
- if (NIL_P(timeout)) {
- rb_ensure(rb_mutex_sleep_forever, Qnil, mutex_lock_uninterruptible, self);
+ time_t beg = time(0);
+
+ VALUE scheduler = rb_thread_current_scheduler();
+ if (scheduler != Qnil) {
+ rb_scheduler_kernel_sleep(scheduler, timeout);
+ mutex_lock_uninterruptible(self);
+ } else {
+ if (NIL_P(timeout)) {
+ rb_ensure(rb_mutex_sleep_forever, Qnil, mutex_lock_uninterruptible, self);
+ } else {
+ rb_hrtime_t rel = rb_timeval2hrtime(&t);
+ rb_ensure(rb_mutex_wait_for, (VALUE)&rel, mutex_lock_uninterruptible, self);
+ }
}
- else {
- rb_hrtime_t rel = rb_timeval2hrtime(&t);
- rb_ensure(rb_mutex_wait_for, (VALUE)&rel,
- mutex_lock_uninterruptible, self);
- }
RUBY_VM_CHECK_INTS_BLOCKING(GET_EC());
- end = time(0) - beg;
+ time_t end = time(0) - beg;
return INT2FIX(end);
}
@@ -1429,13 +1436,19 @@ delete_from_waitq(VALUE v)
static VALUE
rb_condvar_wait(int argc, VALUE *argv, VALUE self)
{
+ rb_execution_context_t *ec = GET_EC();
+
struct rb_condvar *cv = condvar_ptr(self);
struct sleep_call args;
- struct sync_waiter w;
rb_scan_args(argc, argv, "11", &args.mutex, &args.timeout);
- w.th = GET_THREAD();
+ struct sync_waiter w = {
+ .self = args.mutex,
+ .th = ec->thread_ptr,
+ .fiber = ec->fiber_ptr,
+ };
+
list_add_tail(&cv->waitq, &w.node);
rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&w);