From 02da16b16777bb21dcd7a91c895fb4fe0e2075a9 Mon Sep 17 00:00:00 2001 From: nobu Date: Thu, 20 Sep 2007 18:05:40 +0000 Subject: * eval.c, intern.h, ext/thread/thread.c: should not free queue while any live threads are waiting. [ruby-dev:30653] git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/branches/ruby_1_8@13476 b2dd03c8-39d4-4d8f-98ff-823fe69b080e --- ChangeLog | 5 +++ eval.c | 14 +++++-- ext/thread/thread.c | 106 ++++++++++++++++++++++++++-------------------------- intern.h | 2 + 4 files changed, 72 insertions(+), 55 deletions(-) diff --git a/ChangeLog b/ChangeLog index 6ae570373b..0800afb9fa 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,8 @@ +Fri Sep 21 03:05:35 2007 Nobuyoshi Nakada + + * eval.c, intern.h, ext/thread/thread.c: should not free queue while + any live threads are waiting. [ruby-dev:30653] + Fri Sep 21 01:46:19 2007 Nobuyoshi Nakada * string.c (str_alloc): defaults to null_str instead of NULL. diff --git a/eval.c b/eval.c index a1f4eb3eb9..5903dc97c1 100644 --- a/eval.c +++ b/eval.c @@ -6826,7 +6826,6 @@ rb_load(fname, wrap) volatile VALUE self = ruby_top_self; NODE *volatile last_node; NODE *saved_cref = ruby_cref; - TMP_PROTECT; if (wrap && ruby_safe_level >= 4) { StringValue(fname); @@ -11233,11 +11232,20 @@ rb_thread_list() VALUE rb_thread_wakeup(thread) VALUE thread; +{ + if (!RTEST(rb_thread_wakeup_alive(thread))) + rb_raise(rb_eThreadError, "killed thread"); + return thread; +} + +VALUE +rb_thread_wakeup_alive(thread) + VALUE thread; { rb_thread_t th = rb_thread_check(thread); if (th->status == THREAD_KILLED) - rb_raise(rb_eThreadError, "killed thread"); + return Qnil; rb_thread_ready(th); return thread; @@ -12173,7 +12181,7 @@ rb_thread_status(thread) * thr.alive? #=> false */ -static VALUE +VALUE rb_thread_alive_p(thread) VALUE thread; { diff --git a/ext/thread/thread.c b/ext/thread/thread.c index 700fc90bae..68f172220a 100644 --- a/ext/thread/thread.c +++ b/ext/thread/thread.c @@ -21,11 +21,12 @@ static VALUE rb_cSizedQueue; static VALUE set_critical(VALUE value); static VALUE -thread_exclusive_do(void) +thread_exclusive(VALUE (*func)(ANYARGS), VALUE arg) { - rb_thread_critical = 1; + VALUE critical = rb_thread_critical; - return rb_yield(Qundef); + rb_thread_critical = 1; + return rb_ensure(func, arg, set_critical, (VALUE)critical); } /* @@ -40,7 +41,7 @@ thread_exclusive_do(void) static VALUE rb_thread_exclusive(void) { - return rb_ensure(thread_exclusive_do, Qundef, set_critical, rb_thread_critical); + return thread_exclusive(rb_yield, Qundef); } typedef struct _Entry { @@ -144,7 +145,7 @@ shift_list(List *list) VALUE value; entry = list->entries; - if (!entry) return Qundef; + if (!entry) return Qnil; list->entries = entry->next; if (entry == list->last_entry) { @@ -207,15 +208,16 @@ array_from_list(List const *list) static VALUE wake_thread(VALUE thread) { - return rb_rescue2(rb_thread_wakeup, thread, - NULL, Qundef, rb_eThreadError, 0); + return rb_thread_wakeup_alive(thread); } static VALUE run_thread(VALUE thread) { - return rb_rescue2(rb_thread_run, thread, - NULL, Qundef, rb_eThreadError, 0); + thread = wake_thread(thread); + if (RTEST(thread) && !rb_thread_critical) + rb_thread_schedule(); + return thread; } static VALUE @@ -225,7 +227,7 @@ wake_one(List *list) waking = Qnil; while (list->entries && !RTEST(waking)) { - waking = wake_thread(shift_list(list)); + waking = wake_thread(shift_list(list)); } return waking; @@ -263,13 +265,12 @@ wait_list(List *list) } static void -assert_no_survivors(List *waiting, const char *label, void *addr) +kill_waiting_threads(List *waiting) { Entry *entry; + for (entry = waiting->entries; entry; entry = entry->next) { - if (RTEST(wake_thread(entry->value))) { - rb_bug("%s %p freed with live thread(s) waiting", label, addr); - } + rb_thread_kill(entry->value); } } @@ -303,6 +304,8 @@ typedef struct _Mutex { List waiting; } Mutex; +#define MUTEX_LOCKED_P(mutex) (RTEST((mutex)->owner) && rb_thread_alive_p((mutex)->owner)) + static void mark_mutex(Mutex *mutex) { @@ -319,7 +322,7 @@ finalize_mutex(Mutex *mutex) static void free_mutex(Mutex *mutex) { - assert_no_survivors(&mutex->waiting, "mutex", mutex); + kill_waiting_threads(&mutex->waiting); finalize_mutex(mutex); xfree(mutex); } @@ -361,7 +364,7 @@ rb_mutex_locked_p(VALUE self) { Mutex *mutex; Data_Get_Struct(self, Mutex, mutex); - return RTEST(mutex->owner) ? Qtrue : Qfalse; + return MUTEX_LOCKED_P(mutex) ? Qtrue : Qfalse; } /* @@ -380,7 +383,7 @@ rb_mutex_try_lock(VALUE self) Data_Get_Struct(self, Mutex, mutex); - if (RTEST(mutex->owner)) + if (MUTEX_LOCKED_P(mutex)) return Qfalse; mutex->owner = rb_thread_current(); @@ -403,11 +406,19 @@ lock_mutex(Mutex *mutex) rb_thread_critical = 1; - while (RTEST(mutex->owner)) { - wait_list(&mutex->waiting); - rb_thread_critical = 1; + if (!MUTEX_LOCKED_P(mutex)) { + mutex->owner = current; + } + else { + do { + wait_list(&mutex->waiting); + rb_thread_critical = 1; + if (!MUTEX_LOCKED_P(mutex)) { + mutex->owner = current; + break; + } + } while (mutex->owner != current); } - mutex->owner = current; rb_thread_critical = 0; return Qnil; @@ -434,16 +445,12 @@ unlock_mutex_inner(Mutex *mutex) { VALUE waking; - if (!RTEST(mutex->owner)) { - rb_raise(rb_eThreadError, "not owner"); - } - if (mutex->owner != rb_thread_current()) { rb_raise(rb_eThreadError, "not owner"); } - mutex->owner = Qnil; waking = wake_one(&mutex->waiting); + mutex->owner = waking; return waking; } @@ -458,18 +465,13 @@ set_critical(VALUE value) static VALUE unlock_mutex(Mutex *mutex) { - VALUE waking; + VALUE waking = thread_exclusive(unlock_mutex_inner, (VALUE)mutex); - rb_thread_critical = 1; - waking = rb_ensure(unlock_mutex_inner, (VALUE)mutex, set_critical, 0); - - if (waking == Qundef) { + if (!RTEST(waking)) { return Qfalse; } - if (RTEST(waking)) { - run_thread(waking); - } + run_thread(waking); return Qtrue; } @@ -512,16 +514,13 @@ rb_mutex_exclusive_unlock(VALUE self) VALUE waking; Data_Get_Struct(self, Mutex, mutex); - rb_thread_critical = 1; - waking = rb_ensure(rb_mutex_exclusive_unlock_inner, (VALUE)mutex, set_critical, 0); + waking = thread_exclusive(rb_mutex_exclusive_unlock_inner, (VALUE)mutex); - if (waking == Qundef) { + if (!RTEST(waking)) { return Qnil; } - if (RTEST(waking)) { - run_thread(waking); - } + run_thread(waking); return self; } @@ -592,7 +591,7 @@ finalize_condvar(ConditionVariable *condvar) static void free_condvar(ConditionVariable *condvar) { - assert_no_survivors(&condvar->waiting, "condition variable", condvar); + kill_waiting_threads(&condvar->waiting); finalize_condvar(condvar); xfree(condvar); } @@ -633,12 +632,17 @@ rb_condvar_alloc(VALUE klass) static void wait_condvar(ConditionVariable *condvar, Mutex *mutex) { + VALUE waking; + rb_thread_critical = 1; if (rb_thread_current() != mutex->owner) { rb_thread_critical = 0; rb_raise(rb_eThreadError, "not owner of the synchronization mutex"); } - unlock_mutex_inner(mutex); + waking = unlock_mutex_inner(mutex); + if (RTEST(waking)) { + wake_thread(waking); + } rb_ensure(wait_list, (VALUE)&condvar->waiting, lock_mutex, (VALUE)mutex); } @@ -697,8 +701,7 @@ rb_condvar_broadcast(VALUE self) Data_Get_Struct(self, ConditionVariable, condvar); - rb_thread_critical = 1; - rb_ensure(wake_all, (VALUE)&condvar->waiting, set_critical, 0); + thread_exclusive(wake_all, (VALUE)&condvar->waiting); rb_thread_schedule(); return self; @@ -715,9 +718,8 @@ rb_condvar_broadcast(VALUE self) static void signal_condvar(ConditionVariable *condvar) { - VALUE waking; - rb_thread_critical = 1; - waking = rb_ensure(wake_one, (VALUE)&condvar->waiting, set_critical, 0); + VALUE waking = thread_exclusive(wake_one, (VALUE)&condvar->waiting); + if (RTEST(waking)) { run_thread(waking); } @@ -792,9 +794,9 @@ finalize_queue(Queue *queue) static void free_queue(Queue *queue) { - assert_no_survivors(&queue->mutex.waiting, "queue", queue); - assert_no_survivors(&queue->space_available.waiting, "queue(push)", queue); - assert_no_survivors(&queue->value_available.waiting, "queue(pop)", queue); + kill_waiting_threads(&queue->mutex.waiting); + kill_waiting_threads(&queue->space_available.waiting); + kill_waiting_threads(&queue->value_available.waiting); finalize_queue(queue); xfree(queue); } @@ -835,10 +837,10 @@ rb_queue_marshal_load(VALUE self, VALUE data) array = rb_marshal_load(data); if (TYPE(array) != T_ARRAY) { - rb_raise(rb_eRuntimeError, "expected Array of queue data"); + rb_raise(rb_eTypeError, "expected Array of queue data"); } if (RARRAY(array)->len < 1) { - rb_raise(rb_eRuntimeError, "missing capacity value"); + rb_raise(rb_eArgError, "missing capacity value"); } queue->capacity = NUM2ULONG(rb_ary_shift(array)); push_multiple_list(&queue->values, RARRAY(array)->ptr, (unsigned)RARRAY(array)->len); diff --git a/intern.h b/intern.h index 0dc3bffe55..47da14bb7a 100644 --- a/intern.h +++ b/intern.h @@ -207,8 +207,10 @@ void rb_thread_sleep _((int)); void rb_thread_sleep_forever _((void)); VALUE rb_thread_stop _((void)); VALUE rb_thread_wakeup _((VALUE)); +VALUE rb_thread_wakeup_alive _((VALUE)); VALUE rb_thread_run _((VALUE)); VALUE rb_thread_kill _((VALUE)); +VALUE rb_thread_alive_p _((VALUE)); VALUE rb_thread_create _((VALUE (*)(ANYARGS), void*)); void rb_thread_interrupt _((void)); void rb_thread_trap_eval _((VALUE, int, int)); -- cgit v1.2.3