summaryrefslogtreecommitdiff
path: root/ext/thread/thread.c
diff options
context:
space:
mode:
Diffstat (limited to 'ext/thread/thread.c')
-rw-r--r--ext/thread/thread.c60
1 files changed, 51 insertions, 9 deletions
diff --git a/ext/thread/thread.c b/ext/thread/thread.c
index ae08a61770..f59f704707 100644
--- a/ext/thread/thread.c
+++ b/ext/thread/thread.c
@@ -445,6 +445,12 @@ lock_mutex(Mutex *mutex)
}
static VALUE
+lock_mutex_call(VALUE mutex)
+{
+ return lock_mutex((Mutex *)mutex);
+}
+
+static VALUE
rb_mutex_lock(VALUE self)
{
Mutex *mutex;
@@ -513,6 +519,12 @@ unlock_mutex(Mutex *mutex)
}
static VALUE
+unlock_mutex_call(VALUE mutex)
+{
+ return unlock_mutex((Mutex *)mutex);
+}
+
+static VALUE
rb_mutex_unlock(VALUE self)
{
Mutex *mutex;
@@ -665,9 +677,18 @@ rb_condvar_alloc(VALUE klass)
*
*/
+static void condvar_wakeup(Mutex *mutex);
+
static void
wait_condvar(ConditionVariable *condvar, Mutex *mutex)
{
+ condvar_wakeup(mutex);
+ rb_ensure(wait_list, (VALUE)&condvar->waiting, lock_mutex_call, (VALUE)mutex);
+}
+
+static void
+condvar_wakeup(Mutex *mutex)
+{
VALUE waking;
rb_thread_critical = 1;
@@ -679,7 +700,6 @@ wait_condvar(ConditionVariable *condvar, Mutex *mutex)
if (RTEST(waking)) {
wake_thread(waking);
}
- rb_ensure(wait_list, (VALUE)&condvar->waiting, relock_mutex, (VALUE)mutex);
}
static VALUE
@@ -762,6 +782,13 @@ signal_condvar(ConditionVariable *condvar)
}
static VALUE
+signal_condvar_call(VALUE condvar)
+{
+ signal_condvar((ConditionVariable *)condvar);
+ return Qundef;
+}
+
+static VALUE
rb_condvar_signal(VALUE self)
{
ConditionVariable *condvar;
@@ -985,6 +1012,16 @@ rb_queue_num_waiting(VALUE self)
return result;
}
+static void
+wait_queue(ConditionVariable *condvar, Mutex *mutex)
+{
+ condvar_wakeup(mutex);
+ wait_list(&condvar->waiting);
+ lock_mutex(mutex);
+}
+
+static VALUE queue_pop_inner(VALUE arg);
+
/*
* Document-method: pop
* call_seq: pop(non_block=false)
@@ -1000,7 +1037,6 @@ rb_queue_pop(int argc, VALUE *argv, VALUE self)
{
Queue *queue;
int should_block;
- VALUE result;
Data_Get_Struct(self, Queue, queue);
if (argc == 0) {
@@ -1018,15 +1054,21 @@ rb_queue_pop(int argc, VALUE *argv, VALUE self)
}
while (!queue->values.entries) {
- wait_condvar(&queue->value_available, &queue->mutex);
+ wait_queue(&queue->value_available, &queue->mutex);
}
- result = shift_list(&queue->values);
+ return rb_ensure(queue_pop_inner, (VALUE)queue,
+ unlock_mutex_call, (VALUE)&queue->mutex);
+}
+
+static VALUE
+queue_pop_inner(VALUE arg)
+{
+ Queue *queue = (Queue *)arg;
+ VALUE result = shift_list(&queue->values);
if (queue->capacity && queue->values.size < queue->capacity) {
signal_condvar(&queue->space_available);
}
- unlock_mutex(&queue->mutex);
-
return result;
}
@@ -1046,11 +1088,11 @@ rb_queue_push(VALUE self, VALUE value)
lock_mutex(&queue->mutex);
while (queue->capacity && queue->values.size >= queue->capacity) {
- wait_condvar(&queue->space_available, &queue->mutex);
+ wait_queue(&queue->space_available, &queue->mutex);
}
push_list(&queue->values, value);
- signal_condvar(&queue->value_available);
- unlock_mutex(&queue->mutex);
+ rb_ensure(signal_condvar_call, (VALUE)&queue->value_available,
+ unlock_mutex_call, (VALUE)&queue->mutex);
return self;
}