diff options
Diffstat (limited to 'thread_sync.c')
-rw-r--r-- | thread_sync.c | 148 |
1 files changed, 59 insertions, 89 deletions
diff --git a/thread_sync.c b/thread_sync.c index e2f1b9ee64..6f7919867c 100644 --- a/thread_sync.c +++ b/thread_sync.c @@ -1,16 +1,21 @@ /* included by thread.c */ +#include "ccan/list/list.h" static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable; static VALUE rb_eClosedQueueError; /* Mutex */ +/* mutex_waiter is always on-stack */ +struct mutex_waiter { + rb_thread_t *th; + struct list_node node; +}; + typedef struct rb_mutex_struct { - rb_nativethread_lock_t lock; - rb_nativethread_cond_t cond; struct rb_thread_struct volatile *th; struct rb_mutex_struct *next_mutex; - int cond_waiting; + struct list_head waitq; /* protected by GVL */ int allow_trap; } rb_mutex_t; @@ -51,6 +56,19 @@ static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *t #define mutex_mark NULL +static size_t +rb_mutex_num_waiting(rb_mutex_t *mutex) +{ + struct mutex_waiter *w; + size_t n = 0; + + list_for_each(&mutex->waitq, w, node) { + n++; + } + + return n; +} + static void mutex_free(void *ptr) { @@ -60,8 +78,6 @@ mutex_free(void *ptr) const char *err = rb_mutex_unlock_th(mutex, mutex->th); if (err) rb_bug("%s", err); } - native_mutex_destroy(&mutex->lock); - native_cond_destroy(&mutex->cond); ruby_xfree(ptr); } @@ -95,8 +111,7 @@ mutex_alloc(VALUE klass) rb_mutex_t *mutex; obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex); - native_mutex_initialize(&mutex->lock); - native_cond_initialize(&mutex->cond, RB_CONDATTR_CLOCK_MONOTONIC); + list_head_init(&mutex->waitq); return obj; } @@ -158,7 +173,6 @@ rb_mutex_trylock(VALUE self) VALUE locked = Qfalse; GetMutexPtr(self, mutex); - native_mutex_lock(&mutex->lock); if (mutex->th == 0) { rb_thread_t *th = GET_THREAD(); mutex->th = th; @@ -166,61 +180,10 @@ rb_mutex_trylock(VALUE self) mutex_locked(th, self); } - native_mutex_unlock(&mutex->lock); return locked; } -static int -lock_func(rb_thread_t *th, rb_mutex_t *mutex, int timeout_ms) -{ - int interrupted = 0; - int err = 0; - - mutex->cond_waiting++; - for (;;) { - if (!mutex->th) { - mutex->th = th; - break; - } - if (RUBY_VM_INTERRUPTED(th)) { - interrupted = 1; - break; - } - if (err == ETIMEDOUT) { - interrupted = 2; - break; - } - - if (timeout_ms) { - struct timespec timeout_rel; - struct timespec timeout; - - timeout_rel.tv_sec = 0; - timeout_rel.tv_nsec = timeout_ms * 1000 * 1000; - timeout = native_cond_timeout(&mutex->cond, timeout_rel); - err = native_cond_timedwait(&mutex->cond, &mutex->lock, &timeout); - } - else { - native_cond_wait(&mutex->cond, &mutex->lock); - err = 0; - } - } - mutex->cond_waiting--; - - return interrupted; -} - -static void -lock_interrupt(void *ptr) -{ - rb_mutex_t *mutex = (rb_mutex_t *)ptr; - native_mutex_lock(&mutex->lock); - if (mutex->cond_waiting > 0) - native_cond_broadcast(&mutex->cond); - native_mutex_unlock(&mutex->lock); -} - /* * At maximum, only one thread can use cond_timedwait and watch deadlock * periodically. Multiple polling thread (i.e. concurrent deadlock check) @@ -248,45 +211,45 @@ rb_mutex_lock(VALUE self) } if (rb_mutex_trylock(self) == Qfalse) { + struct mutex_waiter w; + if (mutex->th == th) { rb_raise(rb_eThreadError, "deadlock; recursive locking"); } + w.th = th; + while (mutex->th != th) { - int interrupted; enum rb_thread_status prev_status = th->status; - volatile int timeout_ms = 0; - struct rb_unblock_callback oldubf; + struct timeval *timeout = 0; + struct timeval tv = { 0, 100000 }; /* 100ms */ - set_unblock_function(th, lock_interrupt, mutex, &oldubf, FALSE); th->status = THREAD_STOPPED_FOREVER; th->locking_mutex = self; - - native_mutex_lock(&mutex->lock); th->vm->sleeper++; /* - * Carefully! while some contended threads are in lock_func(), + * Carefully! while some contended threads are in native_sleep(), * vm->sleeper is unstable value. we have to avoid both deadlock * and busy loop. */ if ((vm_living_thread_num(th->vm) == th->vm->sleeper) && !patrol_thread) { - timeout_ms = 100; + timeout = &tv; patrol_thread = th; } - GVL_UNLOCK_BEGIN(); - interrupted = lock_func(th, mutex, (int)timeout_ms); - native_mutex_unlock(&mutex->lock); - GVL_UNLOCK_END(); + list_add_tail(&mutex->waitq, &w.node); + native_sleep(th, timeout); /* release GVL */ + list_del(&w.node); + if (!mutex->th) { + mutex->th = th; + } if (patrol_thread == th) patrol_thread = NULL; - reset_unblock_function(th, &oldubf); - th->locking_mutex = Qfalse; - if (mutex->th && interrupted == 2) { + if (mutex->th && timeout && !RUBY_VM_INTERRUPTED(th)) { rb_check_deadlock(th->vm); } if (th->status == THREAD_STOPPED_FOREVER) { @@ -296,9 +259,7 @@ rb_mutex_lock(VALUE self) if (mutex->th == th) mutex_locked(th, self); - if (interrupted) { - RUBY_VM_CHECK_INTS_BLOCKING(th); - } + RUBY_VM_CHECK_INTS_BLOCKING(th); } } return self; @@ -330,24 +291,32 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th) { const char *err = NULL; - native_mutex_lock(&mutex->lock); - if (mutex->th == 0) { err = "Attempt to unlock a mutex which is not locked"; } else if (mutex->th != th) { err = "Attempt to unlock a mutex which is locked by another thread"; - } - else { - mutex->th = 0; - if (mutex->cond_waiting > 0) - native_cond_signal(&mutex->cond); - } - - native_mutex_unlock(&mutex->lock); - - if (!err) { + } else { + struct mutex_waiter *cur = 0, *next = 0; rb_mutex_t *volatile *th_mutex = &th->keeping_mutexes; + + mutex->th = 0; + list_for_each_safe(&mutex->waitq, cur, next, node) { + list_del_init(&cur->node); + switch (cur->th->status) { + case THREAD_RUNNABLE: /* from someone else calling Thread#run */ + case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */ + rb_threadptr_interrupt(cur->th); + goto found; + case THREAD_STOPPED: /* probably impossible */ + rb_bug("unexpected THREAD_STOPPED"); + case THREAD_KILLED: + /* not sure about this, possible in exit GC? */ + rb_bug("unexpected THREAD_KILLED"); + continue; + } + } +found: while (*th_mutex != mutex) { th_mutex = &(*th_mutex)->next_mutex; } @@ -411,6 +380,7 @@ rb_mutex_abandon_all(rb_mutex_t *mutexes) mutexes = mutex->next_mutex; mutex->th = 0; mutex->next_mutex = 0; + list_head_init(&mutex->waitq); } } #endif |