summaryrefslogtreecommitdiff
path: root/thread_sync.c
diff options
context:
space:
mode:
authorBenoit Daloze <eregontp@gmail.com>2020-09-05 16:26:24 +1200
committerSamuel Williams <samuel.williams@oriontransfer.co.nz>2020-09-14 16:44:09 +1200
commit178c1b0922dc727897d81d7cfe9c97d5ffa97fd9 (patch)
tree113600e7e6a196b779bcac7529535597858f78a7 /thread_sync.c
parent9e0a48c7a31ecd39be0596d0517b9d521ae75282 (diff)
Make Mutex per-Fiber instead of per-Thread
* Enables Mutex to be used as synchronization between multiple Fibers of the same Thread. * With a Fiber scheduler we can yield to another Fiber on contended Mutex#lock instead of blocking the entire thread. * This also makes the behavior of Mutex consistent across CRuby, JRuby and TruffleRuby. * [Feature #16792]
Notes
Notes: Merged: https://github.com/ruby/ruby/pull/3434
Diffstat (limited to 'thread_sync.c')
-rw-r--r--thread_sync.c106
1 files changed, 69 insertions, 37 deletions
diff --git a/thread_sync.c b/thread_sync.c
index deb3858c31..cfdd62635a 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -7,6 +7,7 @@ static VALUE rb_eClosedQueueError;
/* sync_waiter is always on-stack */
struct sync_waiter {
rb_thread_t *th;
+ rb_fiber_t *fiber;
struct list_node node;
};
@@ -42,7 +43,9 @@ wakeup_all(struct list_head *head)
/* Mutex */
typedef struct rb_mutex_struct {
- rb_thread_t *th;
+ VALUE self;
+
+ rb_fiber_t *fiber;
struct rb_mutex_struct *next_mutex;
struct list_head waitq; /* protected by GVL */
} rb_mutex_t;
@@ -52,7 +55,7 @@ static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
static void rb_mutex_abandon_locking_mutex(rb_thread_t *th);
#endif
-static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th);
+static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber);
/*
* Document-class: Mutex
@@ -93,13 +96,15 @@ rb_mutex_num_waiting(rb_mutex_t *mutex)
return n;
}
+rb_thread_t* rb_fiber_threadptr(const rb_fiber_t *fiber);
+
static void
mutex_free(void *ptr)
{
rb_mutex_t *mutex = ptr;
- if (mutex->th) {
+ if (mutex->fiber) {
/* rb_warn("free locked mutex"); */
- const char *err = rb_mutex_unlock_th(mutex, mutex->th);
+ const char *err = rb_mutex_unlock_th(mutex, rb_fiber_threadptr(mutex->fiber), mutex->fiber);
if (err) rb_bug("%s", err);
}
ruby_xfree(ptr);
@@ -145,6 +150,8 @@ mutex_alloc(VALUE klass)
rb_mutex_t *mutex;
obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex);
+
+ mutex->self = obj;
list_head_init(&mutex->waitq);
return obj;
}
@@ -178,7 +185,7 @@ rb_mutex_locked_p(VALUE self)
{
rb_mutex_t *mutex = mutex_ptr(self);
- return mutex->th ? Qtrue : Qfalse;
+ return mutex->fiber ? Qtrue : Qfalse;
}
static void
@@ -191,7 +198,7 @@ mutex_locked(rb_thread_t *th, VALUE self)
}
th->keeping_mutexes = mutex;
- th->blocking += 1;
+ // th->blocking += 1;
}
/*
@@ -207,9 +214,10 @@ rb_mutex_trylock(VALUE self)
rb_mutex_t *mutex = mutex_ptr(self);
VALUE locked = Qfalse;
- if (mutex->th == 0) {
+ if (mutex->fiber == 0) {
+ rb_fiber_t *fiber = GET_EC()->fiber_ptr;
rb_thread_t *th = GET_THREAD();
- mutex->th = th;
+ mutex->fiber = fiber;
locked = Qtrue;
mutex_locked(th, self);
@@ -226,9 +234,9 @@ rb_mutex_trylock(VALUE self)
static const rb_thread_t *patrol_thread = NULL;
static VALUE
-mutex_owned_p(rb_thread_t *th, rb_mutex_t *mutex)
+mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex)
{
- if (mutex->th == th) {
+ if (mutex->fiber == fiber) {
return Qtrue;
}
else {
@@ -240,6 +248,8 @@ static VALUE
do_mutex_lock(VALUE self, int interruptible_p)
{
rb_thread_t *th = GET_THREAD();
+ rb_execution_context_t *ec = GET_EC();
+ rb_fiber_t *fiber = ec->fiber_ptr;
rb_mutex_t *mutex = mutex_ptr(self);
/* When running trap handler */
@@ -249,15 +259,33 @@ do_mutex_lock(VALUE self, int interruptible_p)
}
if (rb_mutex_trylock(self) == Qfalse) {
- struct sync_waiter w;
+ struct sync_waiter w = {
+ .th = th,
+ .fiber = fiber
+ };
- if (mutex->th == th) {
+ if (mutex->fiber == fiber) {
rb_raise(rb_eThreadError, "deadlock; recursive locking");
}
- w.th = th;
+ VALUE scheduler = rb_thread_current_scheduler();
+ while (mutex->fiber != fiber) {
+ if (scheduler != Qnil) {
+ list_add_tail(&mutex->waitq, &w.node);
+
+ rb_scheduler_mutex_lock(scheduler, self);
- while (mutex->th != th) {
+ list_del(&w.node);
+
+ if (!mutex->fiber) {
+ mutex->fiber = fiber;
+ break;
+ } else {
+ // Try again...
+ continue;
+ }
+ }
+
enum rb_thread_status prev_status = th->status;
rb_hrtime_t *timeout = 0;
rb_hrtime_t rel = rb_msec2hrtime(100);
@@ -277,18 +305,20 @@ do_mutex_lock(VALUE self, int interruptible_p)
}
list_add_tail(&mutex->waitq, &w.node);
- native_sleep(th, timeout); /* release GVL */
+
+ native_sleep(th, timeout); /* release GVL */
+
list_del(&w.node);
- if (!mutex->th) {
- mutex->th = th;
+ if (!mutex->fiber) {
+ mutex->fiber = fiber;
}
if (patrol_thread == th)
patrol_thread = NULL;
th->locking_mutex = Qfalse;
- if (mutex->th && timeout && !RUBY_VM_INTERRUPTED(th->ec)) {
+ if (mutex->fiber && timeout && !RUBY_VM_INTERRUPTED(th->ec)) {
rb_check_deadlock(th->ractor);
}
if (th->status == THREAD_STOPPED_FOREVER) {
@@ -299,22 +329,19 @@ do_mutex_lock(VALUE self, int interruptible_p)
if (interruptible_p) {
/* release mutex before checking for interrupts...as interrupt checking
* code might call rb_raise() */
- if (mutex->th == th) mutex->th = 0;
-
+ if (mutex->fiber == fiber) mutex->fiber = 0;
RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may release mutex */
- if (!mutex->th) {
- mutex->th = th;
- mutex_locked(th, self);
+ if (!mutex->fiber) {
+ mutex->fiber = fiber;
}
}
- else {
- if (mutex->th == th) mutex_locked(th, self);
- }
}
+
+ if (mutex->fiber == fiber) mutex_locked(th, self);
}
// assertion
- if (mutex_owned_p(th, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned.");
+ if (mutex_owned_p(fiber, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned.");
return self;
}
@@ -347,32 +374,37 @@ rb_mutex_lock(VALUE self)
VALUE
rb_mutex_owned_p(VALUE self)
{
- rb_thread_t *th = GET_THREAD();
+ rb_fiber_t *fiber = GET_EC()->fiber_ptr;
rb_mutex_t *mutex = mutex_ptr(self);
- return mutex_owned_p(th, mutex);
+ return mutex_owned_p(fiber, mutex);
}
static const char *
-rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th)
+rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber)
{
const char *err = NULL;
- if (mutex->th == 0) {
+ if (mutex->fiber == 0) {
err = "Attempt to unlock a mutex which is not locked";
}
- else if (mutex->th != th) {
- err = "Attempt to unlock a mutex which is locked by another thread";
+ else if (mutex->fiber != fiber) {
+ err = "Attempt to unlock a mutex which is locked by another thread/fiber";
}
else {
struct sync_waiter *cur = 0, *next;
rb_mutex_t **th_mutex = &th->keeping_mutexes;
- th->blocking -= 1;
+ // th->blocking -= 1;
- mutex->th = 0;
+ mutex->fiber = 0;
list_for_each_safe(&mutex->waitq, cur, next, node) {
list_del_init(&cur->node);
+
+ if (cur->th->scheduler != Qnil) {
+ rb_scheduler_mutex_unlock(cur->th->scheduler, mutex->self, rb_fiberptr_self(cur->fiber));
+ }
+
switch (cur->th->status) {
case THREAD_RUNNABLE: /* from someone else calling Thread#run */
case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
@@ -411,7 +443,7 @@ rb_mutex_unlock(VALUE self)
rb_mutex_t *mutex = mutex_ptr(self);
rb_thread_t *th = GET_THREAD();
- err = rb_mutex_unlock_th(mutex, th);
+ err = rb_mutex_unlock_th(mutex, th, GET_EC()->fiber_ptr);
if (err) rb_raise(rb_eThreadError, "%s", err);
return self;
@@ -444,7 +476,7 @@ rb_mutex_abandon_all(rb_mutex_t *mutexes)
while (mutexes) {
mutex = mutexes;
mutexes = mutex->next_mutex;
- mutex->th = 0;
+ mutex->fiber = 0;
mutex->next_mutex = 0;
list_head_init(&mutex->waitq);
}