summaryrefslogtreecommitdiff
path: root/thread_sync.c
diff options
context:
space:
mode:
Diffstat (limited to 'thread_sync.c')
-rw-r--r--thread_sync.c148
1 files changed, 59 insertions, 89 deletions
diff --git a/thread_sync.c b/thread_sync.c
index e2f1b9ee64..6f7919867c 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -1,16 +1,21 @@
/* included by thread.c */
+#include "ccan/list/list.h"
static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
static VALUE rb_eClosedQueueError;
/* Mutex */
+/* mutex_waiter is always on-stack */
+struct mutex_waiter {
+ rb_thread_t *th;
+ struct list_node node;
+};
+
typedef struct rb_mutex_struct {
- rb_nativethread_lock_t lock;
- rb_nativethread_cond_t cond;
struct rb_thread_struct volatile *th;
struct rb_mutex_struct *next_mutex;
- int cond_waiting;
+ struct list_head waitq; /* protected by GVL */
int allow_trap;
} rb_mutex_t;
@@ -51,6 +56,19 @@ static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *t
#define mutex_mark NULL
+static size_t
+rb_mutex_num_waiting(rb_mutex_t *mutex)
+{
+ struct mutex_waiter *w;
+ size_t n = 0;
+
+ list_for_each(&mutex->waitq, w, node) {
+ n++;
+ }
+
+ return n;
+}
+
static void
mutex_free(void *ptr)
{
@@ -60,8 +78,6 @@ mutex_free(void *ptr)
const char *err = rb_mutex_unlock_th(mutex, mutex->th);
if (err) rb_bug("%s", err);
}
- native_mutex_destroy(&mutex->lock);
- native_cond_destroy(&mutex->cond);
ruby_xfree(ptr);
}
@@ -95,8 +111,7 @@ mutex_alloc(VALUE klass)
rb_mutex_t *mutex;
obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex);
- native_mutex_initialize(&mutex->lock);
- native_cond_initialize(&mutex->cond, RB_CONDATTR_CLOCK_MONOTONIC);
+ list_head_init(&mutex->waitq);
return obj;
}
@@ -158,7 +173,6 @@ rb_mutex_trylock(VALUE self)
VALUE locked = Qfalse;
GetMutexPtr(self, mutex);
- native_mutex_lock(&mutex->lock);
if (mutex->th == 0) {
rb_thread_t *th = GET_THREAD();
mutex->th = th;
@@ -166,61 +180,10 @@ rb_mutex_trylock(VALUE self)
mutex_locked(th, self);
}
- native_mutex_unlock(&mutex->lock);
return locked;
}
-static int
-lock_func(rb_thread_t *th, rb_mutex_t *mutex, int timeout_ms)
-{
- int interrupted = 0;
- int err = 0;
-
- mutex->cond_waiting++;
- for (;;) {
- if (!mutex->th) {
- mutex->th = th;
- break;
- }
- if (RUBY_VM_INTERRUPTED(th)) {
- interrupted = 1;
- break;
- }
- if (err == ETIMEDOUT) {
- interrupted = 2;
- break;
- }
-
- if (timeout_ms) {
- struct timespec timeout_rel;
- struct timespec timeout;
-
- timeout_rel.tv_sec = 0;
- timeout_rel.tv_nsec = timeout_ms * 1000 * 1000;
- timeout = native_cond_timeout(&mutex->cond, timeout_rel);
- err = native_cond_timedwait(&mutex->cond, &mutex->lock, &timeout);
- }
- else {
- native_cond_wait(&mutex->cond, &mutex->lock);
- err = 0;
- }
- }
- mutex->cond_waiting--;
-
- return interrupted;
-}
-
-static void
-lock_interrupt(void *ptr)
-{
- rb_mutex_t *mutex = (rb_mutex_t *)ptr;
- native_mutex_lock(&mutex->lock);
- if (mutex->cond_waiting > 0)
- native_cond_broadcast(&mutex->cond);
- native_mutex_unlock(&mutex->lock);
-}
-
/*
* At maximum, only one thread can use cond_timedwait and watch deadlock
* periodically. Multiple polling thread (i.e. concurrent deadlock check)
@@ -248,45 +211,45 @@ rb_mutex_lock(VALUE self)
}
if (rb_mutex_trylock(self) == Qfalse) {
+ struct mutex_waiter w;
+
if (mutex->th == th) {
rb_raise(rb_eThreadError, "deadlock; recursive locking");
}
+ w.th = th;
+
while (mutex->th != th) {
- int interrupted;
enum rb_thread_status prev_status = th->status;
- volatile int timeout_ms = 0;
- struct rb_unblock_callback oldubf;
+ struct timeval *timeout = 0;
+ struct timeval tv = { 0, 100000 }; /* 100ms */
- set_unblock_function(th, lock_interrupt, mutex, &oldubf, FALSE);
th->status = THREAD_STOPPED_FOREVER;
th->locking_mutex = self;
-
- native_mutex_lock(&mutex->lock);
th->vm->sleeper++;
/*
- * Carefully! while some contended threads are in lock_func(),
+ * Carefully! while some contended threads are in native_sleep(),
* vm->sleeper is unstable value. we have to avoid both deadlock
* and busy loop.
*/
if ((vm_living_thread_num(th->vm) == th->vm->sleeper) &&
!patrol_thread) {
- timeout_ms = 100;
+ timeout = &tv;
patrol_thread = th;
}
- GVL_UNLOCK_BEGIN();
- interrupted = lock_func(th, mutex, (int)timeout_ms);
- native_mutex_unlock(&mutex->lock);
- GVL_UNLOCK_END();
+ list_add_tail(&mutex->waitq, &w.node);
+ native_sleep(th, timeout); /* release GVL */
+ list_del(&w.node);
+ if (!mutex->th) {
+ mutex->th = th;
+ }
if (patrol_thread == th)
patrol_thread = NULL;
- reset_unblock_function(th, &oldubf);
-
th->locking_mutex = Qfalse;
- if (mutex->th && interrupted == 2) {
+ if (mutex->th && timeout && !RUBY_VM_INTERRUPTED(th)) {
rb_check_deadlock(th->vm);
}
if (th->status == THREAD_STOPPED_FOREVER) {
@@ -296,9 +259,7 @@ rb_mutex_lock(VALUE self)
if (mutex->th == th) mutex_locked(th, self);
- if (interrupted) {
- RUBY_VM_CHECK_INTS_BLOCKING(th);
- }
+ RUBY_VM_CHECK_INTS_BLOCKING(th);
}
}
return self;
@@ -330,24 +291,32 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th)
{
const char *err = NULL;
- native_mutex_lock(&mutex->lock);
-
if (mutex->th == 0) {
err = "Attempt to unlock a mutex which is not locked";
}
else if (mutex->th != th) {
err = "Attempt to unlock a mutex which is locked by another thread";
- }
- else {
- mutex->th = 0;
- if (mutex->cond_waiting > 0)
- native_cond_signal(&mutex->cond);
- }
-
- native_mutex_unlock(&mutex->lock);
-
- if (!err) {
+ } else {
+ struct mutex_waiter *cur = 0, *next = 0;
rb_mutex_t *volatile *th_mutex = &th->keeping_mutexes;
+
+ mutex->th = 0;
+ list_for_each_safe(&mutex->waitq, cur, next, node) {
+ list_del_init(&cur->node);
+ switch (cur->th->status) {
+ case THREAD_RUNNABLE: /* from someone else calling Thread#run */
+ case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
+ rb_threadptr_interrupt(cur->th);
+ goto found;
+ case THREAD_STOPPED: /* probably impossible */
+ rb_bug("unexpected THREAD_STOPPED");
+ case THREAD_KILLED:
+ /* not sure about this, possible in exit GC? */
+ rb_bug("unexpected THREAD_KILLED");
+ continue;
+ }
+ }
+found:
while (*th_mutex != mutex) {
th_mutex = &(*th_mutex)->next_mutex;
}
@@ -411,6 +380,7 @@ rb_mutex_abandon_all(rb_mutex_t *mutexes)
mutexes = mutex->next_mutex;
mutex->th = 0;
mutex->next_mutex = 0;
+ list_head_init(&mutex->waitq);
}
}
#endif