summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
authornobu <nobu@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2007-09-20 18:05:40 +0000
committernobu <nobu@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2007-09-20 18:05:40 +0000
commit02da16b16777bb21dcd7a91c895fb4fe0e2075a9 (patch)
tree595825d651d787a536ade33d51bfbbaffc6eace1 /ext
parentb21b9a46cfadc785f0001b2b34edc0392d79161b (diff)
* eval.c, intern.h, ext/thread/thread.c: should not free queue while
any live threads are waiting. [ruby-dev:30653] git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/branches/ruby_1_8@13476 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
Diffstat (limited to 'ext')
-rw-r--r--ext/thread/thread.c106
1 files changed, 54 insertions, 52 deletions
diff --git a/ext/thread/thread.c b/ext/thread/thread.c
index 700fc90bae..68f172220a 100644
--- a/ext/thread/thread.c
+++ b/ext/thread/thread.c
@@ -21,11 +21,12 @@ static VALUE rb_cSizedQueue;
static VALUE set_critical(VALUE value);
static VALUE
-thread_exclusive_do(void)
+thread_exclusive(VALUE (*func)(ANYARGS), VALUE arg)
{
- rb_thread_critical = 1;
+ VALUE critical = rb_thread_critical;
- return rb_yield(Qundef);
+ rb_thread_critical = 1;
+ return rb_ensure(func, arg, set_critical, (VALUE)critical);
}
/*
@@ -40,7 +41,7 @@ thread_exclusive_do(void)
static VALUE
rb_thread_exclusive(void)
{
- return rb_ensure(thread_exclusive_do, Qundef, set_critical, rb_thread_critical);
+ return thread_exclusive(rb_yield, Qundef);
}
typedef struct _Entry {
@@ -144,7 +145,7 @@ shift_list(List *list)
VALUE value;
entry = list->entries;
- if (!entry) return Qundef;
+ if (!entry) return Qnil;
list->entries = entry->next;
if (entry == list->last_entry) {
@@ -207,15 +208,16 @@ array_from_list(List const *list)
static VALUE
wake_thread(VALUE thread)
{
- return rb_rescue2(rb_thread_wakeup, thread,
- NULL, Qundef, rb_eThreadError, 0);
+ return rb_thread_wakeup_alive(thread);
}
static VALUE
run_thread(VALUE thread)
{
- return rb_rescue2(rb_thread_run, thread,
- NULL, Qundef, rb_eThreadError, 0);
+ thread = wake_thread(thread);
+ if (RTEST(thread) && !rb_thread_critical)
+ rb_thread_schedule();
+ return thread;
}
static VALUE
@@ -225,7 +227,7 @@ wake_one(List *list)
waking = Qnil;
while (list->entries && !RTEST(waking)) {
- waking = wake_thread(shift_list(list));
+ waking = wake_thread(shift_list(list));
}
return waking;
@@ -263,13 +265,12 @@ wait_list(List *list)
}
static void
-assert_no_survivors(List *waiting, const char *label, void *addr)
+kill_waiting_threads(List *waiting)
{
Entry *entry;
+
for (entry = waiting->entries; entry; entry = entry->next) {
- if (RTEST(wake_thread(entry->value))) {
- rb_bug("%s %p freed with live thread(s) waiting", label, addr);
- }
+ rb_thread_kill(entry->value);
}
}
@@ -303,6 +304,8 @@ typedef struct _Mutex {
List waiting;
} Mutex;
+#define MUTEX_LOCKED_P(mutex) (RTEST((mutex)->owner) && rb_thread_alive_p((mutex)->owner))
+
static void
mark_mutex(Mutex *mutex)
{
@@ -319,7 +322,7 @@ finalize_mutex(Mutex *mutex)
static void
free_mutex(Mutex *mutex)
{
- assert_no_survivors(&mutex->waiting, "mutex", mutex);
+ kill_waiting_threads(&mutex->waiting);
finalize_mutex(mutex);
xfree(mutex);
}
@@ -361,7 +364,7 @@ rb_mutex_locked_p(VALUE self)
{
Mutex *mutex;
Data_Get_Struct(self, Mutex, mutex);
- return RTEST(mutex->owner) ? Qtrue : Qfalse;
+ return MUTEX_LOCKED_P(mutex) ? Qtrue : Qfalse;
}
/*
@@ -380,7 +383,7 @@ rb_mutex_try_lock(VALUE self)
Data_Get_Struct(self, Mutex, mutex);
- if (RTEST(mutex->owner))
+ if (MUTEX_LOCKED_P(mutex))
return Qfalse;
mutex->owner = rb_thread_current();
@@ -403,11 +406,19 @@ lock_mutex(Mutex *mutex)
rb_thread_critical = 1;
- while (RTEST(mutex->owner)) {
- wait_list(&mutex->waiting);
- rb_thread_critical = 1;
+ if (!MUTEX_LOCKED_P(mutex)) {
+ mutex->owner = current;
+ }
+ else {
+ do {
+ wait_list(&mutex->waiting);
+ rb_thread_critical = 1;
+ if (!MUTEX_LOCKED_P(mutex)) {
+ mutex->owner = current;
+ break;
+ }
+ } while (mutex->owner != current);
}
- mutex->owner = current;
rb_thread_critical = 0;
return Qnil;
@@ -434,16 +445,12 @@ unlock_mutex_inner(Mutex *mutex)
{
VALUE waking;
- if (!RTEST(mutex->owner)) {
- rb_raise(rb_eThreadError, "not owner");
- }
-
if (mutex->owner != rb_thread_current()) {
rb_raise(rb_eThreadError, "not owner");
}
- mutex->owner = Qnil;
waking = wake_one(&mutex->waiting);
+ mutex->owner = waking;
return waking;
}
@@ -458,18 +465,13 @@ set_critical(VALUE value)
static VALUE
unlock_mutex(Mutex *mutex)
{
- VALUE waking;
+ VALUE waking = thread_exclusive(unlock_mutex_inner, (VALUE)mutex);
- rb_thread_critical = 1;
- waking = rb_ensure(unlock_mutex_inner, (VALUE)mutex, set_critical, 0);
-
- if (waking == Qundef) {
+ if (!RTEST(waking)) {
return Qfalse;
}
- if (RTEST(waking)) {
- run_thread(waking);
- }
+ run_thread(waking);
return Qtrue;
}
@@ -512,16 +514,13 @@ rb_mutex_exclusive_unlock(VALUE self)
VALUE waking;
Data_Get_Struct(self, Mutex, mutex);
- rb_thread_critical = 1;
- waking = rb_ensure(rb_mutex_exclusive_unlock_inner, (VALUE)mutex, set_critical, 0);
+ waking = thread_exclusive(rb_mutex_exclusive_unlock_inner, (VALUE)mutex);
- if (waking == Qundef) {
+ if (!RTEST(waking)) {
return Qnil;
}
- if (RTEST(waking)) {
- run_thread(waking);
- }
+ run_thread(waking);
return self;
}
@@ -592,7 +591,7 @@ finalize_condvar(ConditionVariable *condvar)
static void
free_condvar(ConditionVariable *condvar)
{
- assert_no_survivors(&condvar->waiting, "condition variable", condvar);
+ kill_waiting_threads(&condvar->waiting);
finalize_condvar(condvar);
xfree(condvar);
}
@@ -633,12 +632,17 @@ rb_condvar_alloc(VALUE klass)
static void
wait_condvar(ConditionVariable *condvar, Mutex *mutex)
{
+ VALUE waking;
+
rb_thread_critical = 1;
if (rb_thread_current() != mutex->owner) {
rb_thread_critical = 0;
rb_raise(rb_eThreadError, "not owner of the synchronization mutex");
}
- unlock_mutex_inner(mutex);
+ waking = unlock_mutex_inner(mutex);
+ if (RTEST(waking)) {
+ wake_thread(waking);
+ }
rb_ensure(wait_list, (VALUE)&condvar->waiting, lock_mutex, (VALUE)mutex);
}
@@ -697,8 +701,7 @@ rb_condvar_broadcast(VALUE self)
Data_Get_Struct(self, ConditionVariable, condvar);
- rb_thread_critical = 1;
- rb_ensure(wake_all, (VALUE)&condvar->waiting, set_critical, 0);
+ thread_exclusive(wake_all, (VALUE)&condvar->waiting);
rb_thread_schedule();
return self;
@@ -715,9 +718,8 @@ rb_condvar_broadcast(VALUE self)
static void
signal_condvar(ConditionVariable *condvar)
{
- VALUE waking;
- rb_thread_critical = 1;
- waking = rb_ensure(wake_one, (VALUE)&condvar->waiting, set_critical, 0);
+ VALUE waking = thread_exclusive(wake_one, (VALUE)&condvar->waiting);
+
if (RTEST(waking)) {
run_thread(waking);
}
@@ -792,9 +794,9 @@ finalize_queue(Queue *queue)
static void
free_queue(Queue *queue)
{
- assert_no_survivors(&queue->mutex.waiting, "queue", queue);
- assert_no_survivors(&queue->space_available.waiting, "queue(push)", queue);
- assert_no_survivors(&queue->value_available.waiting, "queue(pop)", queue);
+ kill_waiting_threads(&queue->mutex.waiting);
+ kill_waiting_threads(&queue->space_available.waiting);
+ kill_waiting_threads(&queue->value_available.waiting);
finalize_queue(queue);
xfree(queue);
}
@@ -835,10 +837,10 @@ rb_queue_marshal_load(VALUE self, VALUE data)
array = rb_marshal_load(data);
if (TYPE(array) != T_ARRAY) {
- rb_raise(rb_eRuntimeError, "expected Array of queue data");
+ rb_raise(rb_eTypeError, "expected Array of queue data");
}
if (RARRAY(array)->len < 1) {
- rb_raise(rb_eRuntimeError, "missing capacity value");
+ rb_raise(rb_eArgError, "missing capacity value");
}
queue->capacity = NUM2ULONG(rb_ary_shift(array));
push_multiple_list(&queue->values, RARRAY(array)->ptr, (unsigned)RARRAY(array)->len);