summaryrefslogtreecommitdiff
path: root/ext/thread/thread.c
diff options
context:
space:
mode:
Diffstat (limited to 'ext/thread/thread.c')
-rw-r--r--ext/thread/thread.c60
1 files changed, 51 insertions, 9 deletions
diff --git a/ext/thread/thread.c b/ext/thread/thread.c
index ac81b2e3ba..298d7d08ca 100644
--- a/ext/thread/thread.c
+++ b/ext/thread/thread.c
@@ -440,6 +440,12 @@ lock_mutex(Mutex *mutex)
}
static VALUE
+lock_mutex_call(VALUE mutex)
+{
+ return lock_mutex((Mutex *)mutex);
+}
+
+static VALUE
rb_mutex_lock(VALUE self)
{
Mutex *mutex;
@@ -492,6 +498,12 @@ unlock_mutex(Mutex *mutex)
}
static VALUE
+unlock_mutex_call(VALUE mutex)
+{
+ return unlock_mutex((Mutex *)mutex);
+}
+
+static VALUE
rb_mutex_unlock(VALUE self)
{
Mutex *mutex;
@@ -644,9 +656,18 @@ rb_condvar_alloc(VALUE klass)
*
*/
+static void condvar_wakeup(Mutex *mutex);
+
static void
wait_condvar(ConditionVariable *condvar, Mutex *mutex)
{
+ condvar_wakeup(mutex);
+ rb_ensure(wait_list, (VALUE)&condvar->waiting, lock_mutex_call, (VALUE)mutex);
+}
+
+static void
+condvar_wakeup(Mutex *mutex)
+{
VALUE waking;
rb_thread_critical = 1;
@@ -658,7 +679,6 @@ wait_condvar(ConditionVariable *condvar, Mutex *mutex)
if (RTEST(waking)) {
wake_thread(waking);
}
- rb_ensure(wait_list, (VALUE)&condvar->waiting, lock_mutex, (VALUE)mutex);
}
static VALUE
@@ -741,6 +761,13 @@ signal_condvar(ConditionVariable *condvar)
}
static VALUE
+signal_condvar_call(VALUE condvar)
+{
+ signal_condvar((ConditionVariable *)condvar);
+ return Qundef;
+}
+
+static VALUE
rb_condvar_signal(VALUE self)
{
ConditionVariable *condvar;
@@ -964,6 +991,16 @@ rb_queue_num_waiting(VALUE self)
return result;
}
+static void
+wait_queue(ConditionVariable *condvar, Mutex *mutex)
+{
+ condvar_wakeup(mutex);
+ wait_list(&condvar->waiting);
+ lock_mutex(mutex);
+}
+
+static VALUE queue_pop_inner(VALUE arg);
+
/*
* Document-method: pop
* call_seq: pop(non_block=false)
@@ -979,7 +1016,6 @@ rb_queue_pop(int argc, VALUE *argv, VALUE self)
{
Queue *queue;
int should_block;
- VALUE result;
Data_Get_Struct(self, Queue, queue);
if (argc == 0) {
@@ -997,15 +1033,21 @@ rb_queue_pop(int argc, VALUE *argv, VALUE self)
}
while (!queue->values.entries) {
- wait_condvar(&queue->value_available, &queue->mutex);
+ wait_queue(&queue->value_available, &queue->mutex);
}
- result = shift_list(&queue->values);
+ return rb_ensure(queue_pop_inner, (VALUE)queue,
+ unlock_mutex_call, (VALUE)&queue->mutex);
+}
+
+static VALUE
+queue_pop_inner(VALUE arg)
+{
+ Queue *queue = (Queue *)arg;
+ VALUE result = shift_list(&queue->values);
if (queue->capacity && queue->values.size < queue->capacity) {
signal_condvar(&queue->space_available);
}
- unlock_mutex(&queue->mutex);
-
return result;
}
@@ -1025,11 +1067,11 @@ rb_queue_push(VALUE self, VALUE value)
lock_mutex(&queue->mutex);
while (queue->capacity && queue->values.size >= queue->capacity) {
- wait_condvar(&queue->space_available, &queue->mutex);
+ wait_queue(&queue->space_available, &queue->mutex);
}
push_list(&queue->values, value);
- signal_condvar(&queue->value_available);
- unlock_mutex(&queue->mutex);
+ rb_ensure(signal_condvar_call, (VALUE)&queue->value_available,
+ unlock_mutex_call, (VALUE)&queue->mutex);
return self;
}