diff options
Diffstat (limited to 'ext/thread/thread.c')
-rw-r--r-- | ext/thread/thread.c | 98 |
1 files changed, 66 insertions, 32 deletions
diff --git a/ext/thread/thread.c b/ext/thread/thread.c index b0badf11b8..910b326696 100644 --- a/ext/thread/thread.c +++ b/ext/thread/thread.c @@ -12,6 +12,7 @@ #include <ruby.h> #include <intern.h> #include <rubysig.h> +#include <node.h> static VALUE rb_cMutex; static VALUE rb_cConditionVariable; @@ -207,15 +208,16 @@ array_from_list(List const *list) static VALUE wake_thread(VALUE thread) { - return rb_rescue2(rb_thread_wakeup, thread, - NULL, Qundef, rb_eThreadError, 0); + return rb_thread_wakeup_alive(thread); } static VALUE run_thread(VALUE thread) { - return rb_rescue2(rb_thread_run, thread, - NULL, Qundef, rb_eThreadError, 0); + thread = wake_thread(thread); + if (RTEST(thread) && !rb_thread_critical) + rb_thread_schedule(); + return thread; } static VALUE @@ -225,7 +227,9 @@ wake_one(List *list) waking = Qnil; while (list->entries && !RTEST(waking)) { - waking = wake_thread(shift_list(list)); + waking = shift_list(list); + if (waking == Qundef) break; + waking = wake_thread(waking); } return waking; @@ -266,10 +270,17 @@ static void assert_no_survivors(List *waiting, const char *label, void *addr) { Entry *entry; + VALUE ths = 0; + for (entry = waiting->entries; entry; entry = entry->next) { - if (RTEST(wake_thread(entry->value))) { - rb_bug("%s %p freed with live thread(s) waiting", label, addr); - } + if (RTEST(wake_thread(entry->value))) { + if (!ths) ths = rb_ary_new(); + rb_ary_push(ths, entry->value); + } + } + if (ths) { + rb_bug("%s %p freed with live thread(s) %s waiting", + label, addr, RSTRING_PTR(rb_inspect(ths))); } } @@ -303,6 +314,8 @@ typedef struct _Mutex { List waiting; } Mutex; +#define MUTEX_LOCKED_P(mutex) (RTEST((mutex)->owner) && rb_thread_alive_p((mutex)->owner)) + static void mark_mutex(Mutex *mutex) { @@ -361,7 +374,7 @@ rb_mutex_locked_p(VALUE self) { Mutex *mutex; Data_Get_Struct(self, Mutex, mutex); - return RTEST(mutex->owner) ? Qtrue : Qfalse; + return MUTEX_LOCKED_P(mutex) ? Qtrue : Qfalse; } /* @@ -380,7 +393,7 @@ rb_mutex_try_lock(VALUE self) Data_Get_Struct(self, Mutex, mutex); - if (RTEST(mutex->owner)) + if (MUTEX_LOCKED_P(mutex)) return Qfalse; mutex->owner = rb_thread_current(); @@ -403,11 +416,20 @@ lock_mutex(Mutex *mutex) rb_thread_critical = 1; - while (RTEST(mutex->owner)) { - wait_list(&mutex->waiting); - rb_thread_critical = 1; + if (!MUTEX_LOCKED_P(mutex)) { + mutex->owner = current; + } + else { + push_list(&mutex->waiting, current); + do { + rb_thread_stop(); + rb_thread_critical = 1; + if (!MUTEX_LOCKED_P(mutex)) { + mutex->owner = current; + break; + } + } while (mutex->owner != current); } - mutex->owner = current; rb_thread_critical = 0; return Qnil; @@ -422,6 +444,22 @@ rb_mutex_lock(VALUE self) return self; } +static VALUE +relock_mutex(Mutex *mutex) +{ + VALUE current = rb_thread_current(); + + switch (rb_thread_status(current)) { + case THREAD_RUNNABLE: + case THREAD_STOPPED: + lock_mutex(mutex); + break; + default: + break; + } + return Qundef; +} + /* * Document-method: unlock * @@ -434,16 +472,12 @@ unlock_mutex_inner(Mutex *mutex) { VALUE waking; - if (!RTEST(mutex->owner)) { - rb_raise(rb_eThreadError, "not owner"); - } - if (mutex->owner != rb_thread_current()) { rb_raise(rb_eThreadError, "not owner"); } - mutex->owner = Qnil; waking = wake_one(&mutex->waiting); + mutex->owner = waking; return waking; } @@ -462,14 +496,11 @@ unlock_mutex(Mutex *mutex) rb_thread_critical = 1; waking = rb_ensure(unlock_mutex_inner, (VALUE)mutex, set_critical, 0); - - if (waking == Qundef) { + if (!RTEST(waking)) { return Qfalse; } - if (RTEST(waking)) { - run_thread(waking); - } + run_thread(waking); return Qtrue; } @@ -515,13 +546,11 @@ rb_mutex_exclusive_unlock(VALUE self) rb_thread_critical = 1; waking = rb_ensure(rb_mutex_exclusive_unlock_inner, (VALUE)mutex, set_critical, 0); - if (waking == Qundef) { + if (waking == Qundef || !RTEST(waking)) { return Qnil; } - if (RTEST(waking)) { - run_thread(waking); - } + run_thread(waking); return self; } @@ -633,13 +662,18 @@ rb_condvar_alloc(VALUE klass) static void wait_condvar(ConditionVariable *condvar, Mutex *mutex) { + VALUE waking; + rb_thread_critical = 1; if (rb_thread_current() != mutex->owner) { rb_thread_critical = 0; rb_raise(rb_eThreadError, "not owner of the synchronization mutex"); } - unlock_mutex_inner(mutex); - rb_ensure(wait_list, (VALUE)&condvar->waiting, lock_mutex, (VALUE)mutex); + waking = unlock_mutex_inner(mutex); + if (RTEST(waking)) { + wake_thread(waking); + } + rb_ensure(wait_list, (VALUE)&condvar->waiting, relock_mutex, (VALUE)mutex); } static VALUE @@ -835,10 +869,10 @@ rb_queue_marshal_load(VALUE self, VALUE data) array = rb_marshal_load(data); if (TYPE(array) != T_ARRAY) { - rb_raise(rb_eRuntimeError, "expected Array of queue data"); + rb_raise(rb_eTypeError, "expected Array of queue data"); } if (RARRAY(array)->len < 1) { - rb_raise(rb_eRuntimeError, "missing capacity value"); + rb_raise(rb_eArgError, "missing capacity value"); } queue->capacity = NUM2ULONG(rb_ary_shift(array)); push_multiple_list(&queue->values, RARRAY(array)->ptr, (unsigned)RARRAY(array)->len); |