summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamuel Williams <samuel.williams@oriontransfer.co.nz>2020-09-11 20:47:25 +1200
committerSamuel Williams <samuel.williams@oriontransfer.co.nz>2020-09-14 16:44:09 +1200
commit0f613cc5f1bbe319ab916be905de263523ef5402 (patch)
tree84f707d136241822a547a222e40a98efb57aca49
parent1a0cfe28390ce5d46f7b854eaad2b9b979c160de (diff)
Add support for ConditionVariable.
Notes
Notes: Merged: https://github.com/ruby/ruby/pull/3434
-rw-r--r--test/fiber/scheduler.rb4
-rw-r--r--test/fiber/test_mutex.rb37
-rw-r--r--thread_sync.c75
3 files changed, 84 insertions, 32 deletions
diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb
index b03058a210..7003d88417 100644
--- a/test/fiber/scheduler.rb
+++ b/test/fiber/scheduler.rb
@@ -97,7 +97,9 @@ class Scheduler
end
def kernel_sleep(duration = nil)
- @waiting[Fiber.current] = current_time + duration
+ if duration
+ @waiting[Fiber.current] = current_time + duration
+ end
Fiber.yield
diff --git a/test/fiber/test_mutex.rb b/test/fiber/test_mutex.rb
index 393a44fc2f..21034128a1 100644
--- a/test/fiber/test_mutex.rb
+++ b/test/fiber/test_mutex.rb
@@ -47,6 +47,43 @@ class TestFiberMutex < Test::Unit::TestCase
thread.join
end
+ def test_condition_variable
+ mutex = Mutex.new
+ condition = ConditionVariable.new
+
+ signalled = 0
+
+ thread = Thread.new do
+ scheduler = Scheduler.new
+ Thread.current.scheduler = scheduler
+
+ Fiber.schedule do
+ mutex.synchronize do
+ 3.times do
+ condition.wait(mutex)
+ signalled += 1
+ end
+ end
+ end
+
+ Fiber.schedule do
+ 3.times do
+ mutex.synchronize do
+ condition.signal
+ end
+
+ sleep 0.1
+ end
+ end
+
+ scheduler.run
+ end
+
+ thread.join
+
+ assert signalled > 1
+ end
+
def test_mutex_deadlock
err = /No live threads left. Deadlock\?/
assert_in_out_err %W[-I#{__dir__} -], <<-RUBY, ['in synchronize'], err, success: false
diff --git a/thread_sync.c b/thread_sync.c
index 9dd3b32645..bd60231789 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -4,8 +4,16 @@
static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
static VALUE rb_eClosedQueueError;
+/* Mutex */
+typedef struct rb_mutex_struct {
+ rb_fiber_t *fiber;
+ struct rb_mutex_struct *next_mutex;
+ struct list_head waitq; /* protected by GVL */
+} rb_mutex_t;
+
/* sync_waiter is always on-stack */
struct sync_waiter {
+ VALUE self;
rb_thread_t *th;
rb_fiber_t *fiber;
struct list_node node;
@@ -19,12 +27,17 @@ sync_wakeup(struct list_head *head, long max)
struct sync_waiter *cur = 0, *next;
list_for_each_safe(head, cur, next, node) {
- list_del_init(&cur->node);
- if (cur->th->status != THREAD_KILLED) {
- rb_threadptr_interrupt(cur->th);
- cur->th->status = THREAD_RUNNABLE;
- if (--max == 0) return;
- }
+ list_del_init(&cur->node);
+
+ if (cur->th->scheduler != Qnil) {
+ rb_scheduler_mutex_unlock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
+ }
+
+ if (cur->th->status != THREAD_KILLED) {
+ rb_threadptr_interrupt(cur->th);
+ cur->th->status = THREAD_RUNNABLE;
+ if (--max == 0) return;
+ }
}
}
@@ -40,16 +53,6 @@ wakeup_all(struct list_head *head)
sync_wakeup(head, LONG_MAX);
}
-/* Mutex */
-
-typedef struct rb_mutex_struct {
- VALUE self;
-
- rb_fiber_t *fiber;
- struct rb_mutex_struct *next_mutex;
- struct list_head waitq; /* protected by GVL */
-} rb_mutex_t;
-
#if defined(HAVE_WORKING_FORK)
static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
@@ -151,7 +154,6 @@ mutex_alloc(VALUE klass)
obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex);
- mutex->self = obj;
list_head_init(&mutex->waitq);
return obj;
}
@@ -247,8 +249,8 @@ mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex)
static VALUE
do_mutex_lock(VALUE self, int interruptible_p)
{
- rb_thread_t *th = GET_THREAD();
rb_execution_context_t *ec = GET_EC();
+ rb_thread_t *th = ec->thread_ptr;
rb_fiber_t *fiber = ec->fiber_ptr;
rb_mutex_t *mutex = mutex_ptr(self);
@@ -260,6 +262,7 @@ do_mutex_lock(VALUE self, int interruptible_p)
if (rb_mutex_trylock(self) == Qfalse) {
struct sync_waiter w = {
+ .self = self,
.th = th,
.fiber = fiber
};
@@ -398,7 +401,7 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber)
list_del_init(&cur->node);
if (cur->th->scheduler != Qnil) {
- rb_scheduler_mutex_unlock(cur->th->scheduler, mutex->self, rb_fiberptr_self(cur->fiber));
+ rb_scheduler_mutex_unlock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
}
switch (cur->th->status) {
@@ -498,7 +501,6 @@ rb_mutex_wait_for(VALUE time)
VALUE
rb_mutex_sleep(VALUE self, VALUE timeout)
{
- time_t beg, end;
struct timeval t;
if (!NIL_P(timeout)) {
@@ -506,18 +508,23 @@ rb_mutex_sleep(VALUE self, VALUE timeout)
}
rb_mutex_unlock(self);
- beg = time(0);
- if (NIL_P(timeout)) {
- rb_ensure(rb_mutex_sleep_forever, Qnil, mutex_lock_uninterruptible, self);
+ time_t beg = time(0);
+
+ VALUE scheduler = rb_thread_current_scheduler();
+ if (scheduler != Qnil) {
+ rb_scheduler_kernel_sleep(scheduler, timeout);
+ mutex_lock_uninterruptible(self);
+ } else {
+ if (NIL_P(timeout)) {
+ rb_ensure(rb_mutex_sleep_forever, Qnil, mutex_lock_uninterruptible, self);
+ } else {
+ rb_hrtime_t rel = rb_timeval2hrtime(&t);
+ rb_ensure(rb_mutex_wait_for, (VALUE)&rel, mutex_lock_uninterruptible, self);
+ }
}
- else {
- rb_hrtime_t rel = rb_timeval2hrtime(&t);
- rb_ensure(rb_mutex_wait_for, (VALUE)&rel,
- mutex_lock_uninterruptible, self);
- }
RUBY_VM_CHECK_INTS_BLOCKING(GET_EC());
- end = time(0) - beg;
+ time_t end = time(0) - beg;
return INT2FIX(end);
}
@@ -1429,13 +1436,19 @@ delete_from_waitq(VALUE v)
static VALUE
rb_condvar_wait(int argc, VALUE *argv, VALUE self)
{
+ rb_execution_context_t *ec = GET_EC();
+
struct rb_condvar *cv = condvar_ptr(self);
struct sleep_call args;
- struct sync_waiter w;
rb_scan_args(argc, argv, "11", &args.mutex, &args.timeout);
- w.th = GET_THREAD();
+ struct sync_waiter w = {
+ .self = args.mutex,
+ .th = ec->thread_ptr,
+ .fiber = ec->fiber_ptr,
+ };
+
list_add_tail(&cv->waitq, &w.node);
rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&w);