author     Samuel Williams <samuel.williams@oriontransfer.co.nz>  2022-10-20 13:38:52 +1300
committer  GitHub <noreply@github.com>  2022-10-20 13:38:52 +1300
commit     7f175e564875b011efb43537907867dd08d659e8 (patch)
tree       b382a58d414e0c0014285958099a531d35e80086
parent     d9d9005a3a31d0df0b5432eba5d6f2b9bd647cb1 (diff)
Avoid missed wakeup with fiber scheduler and Fiber.blocking. (#6588)
* Ensure that blocked fibers don't prevent valid wakeups.
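
The race, roughly: a fiber registers itself on a waitq (Mutex, Queue, SizedQueue or
ConditionVariable) while non-blocking, but by the time another thread signals it, the
fiber may have entered a Fiber.blocking section, for example inside the scheduler's own
kernel_sleep. Because the wakeup path re-checked rb_fiberptr_blocking at signal time, it
could skip the scheduler's unblock hook and the fiber stayed parked forever. Capturing
the blocking state once, when the waiter is registered (blocking fibers are recorded
with a NULL fiber pointer), keeps the wakeup path consistent with how the fiber actually
went to sleep.

A condensed sketch of the pattern exercised by the new test, assuming the Scheduler
helpers from test/fiber/scheduler.rb (including the SleepingBlockingScheduler added
below) are loaded; the thread names are illustrative only:

    # Sketch only: assumes test/fiber/scheduler.rb (Scheduler, SleepingBlockingScheduler) is loaded.
    mutex = Thread::Mutex.new
    condition = Thread::ConditionVariable.new

    signaller = Thread.new do
      # Repeatedly signal the condition variable from a plain thread.
      loop do
        mutex.synchronize { condition.signal }
        sleep 0.0001
      end
    end

    waiter = Thread.new do
      Fiber.set_scheduler SleepingBlockingScheduler.new

      Fiber.schedule do
        mutex.synchronize do
          # Before this change, a signal arriving while the scheduler was inside
          # Fiber.blocking could bypass the scheduler's unblock hook, leaving this
          # wait suspended forever.
          condition.wait(mutex)
        end
      end
    end

    waiter.join(10) # a deadlocked run never finishes
    waiter.kill
    signaller.kill

The same registration-time capture is applied to thread join lists in thread.c and to
every sync_waiter/queue_waiter in thread_sync.c via the new nonblocking_fiber() helper.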
Notes:
    Merged-By: ioquatix <samuel@codeotaku.com>
-rw-r--r--  test/fiber/scheduler.rb       11
-rw-r--r--  test/fiber/test_scheduler.rb  42
-rw-r--r--  thread.c                       4
-rw-r--r--  thread_sync.c                 25
4 files changed, 72 insertions(+), 10 deletions(-)
diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb
index 3fd41ef6f1..204a297133 100644
--- a/test/fiber/scheduler.rb
+++ b/test/fiber/scheduler.rb
@@ -350,3 +350,14 @@ class SleepingUnblockScheduler < Scheduler
     sleep(0.1)
   end
 end
+
+class SleepingBlockingScheduler < Scheduler
+  def kernel_sleep(duration = nil)
+    # Deliberately sleep in a blocking state which can trigger a deadlock if the implementation is not correct.
+    Fiber.blocking{sleep 0.0001}
+
+    self.block(:sleep, duration)
+
+    return true
+  end
+end
diff --git a/test/fiber/test_scheduler.rb b/test/fiber/test_scheduler.rb
index 5a24bff04f..300d30ad63 100644
--- a/test/fiber/test_scheduler.rb
+++ b/test/fiber/test_scheduler.rb
@@ -138,4 +138,46 @@ class TestFiberScheduler < Test::Unit::TestCase
Object.send(:remove_const, :TestFiberSchedulerAutoload)
end
end
+
+  def test_deadlock
+    mutex = Thread::Mutex.new
+    condition = Thread::ConditionVariable.new
+    q = 0.0001
+
+    signaller = Thread.new do
+      loop do
+        mutex.synchronize do
+          condition.signal
+        end
+        sleep q
+      end
+    end
+
+    i = 0
+
+    thread = Thread.new do
+      scheduler = SleepingBlockingScheduler.new
+      Fiber.set_scheduler scheduler
+
+      Fiber.schedule do
+        10.times do
+          mutex.synchronize do
+            condition.wait(mutex)
+            sleep q
+            i += 1
+          end
+        end
+      end
+    end
+
+    # Wait for 10 seconds at most... if it doesn't finish, it's deadlocked.
+    thread.join(10)
+
+    # If it's deadlocked, it will never finish, so this will be 0.
+    assert_equal 10, i
+  ensure
+    # Make sure the threads are dead...
+    thread.kill
+    signaller.kill
+  end
end
diff --git a/thread.c b/thread.c
index e1b194861a..d8925e618e 100644
--- a/thread.c
+++ b/thread.c
@@ -404,7 +404,7 @@ rb_threadptr_join_list_wakeup(rb_thread_t *thread)
         rb_thread_t *target_thread = join_list->thread;
-        if (target_thread->scheduler != Qnil && rb_fiberptr_blocking(join_list->fiber) == 0) {
+        if (target_thread->scheduler != Qnil && join_list->fiber) {
             rb_fiber_scheduler_unblock(target_thread->scheduler, target_thread->self, rb_fiberptr_self(join_list->fiber));
         }
         else {
@@ -1091,7 +1091,7 @@ thread_join(rb_thread_t *target_th, VALUE timeout, rb_hrtime_t *limit)
         struct rb_waiting_list waiter;
         waiter.next = target_th->join_list;
         waiter.thread = th;
-        waiter.fiber = fiber;
+        waiter.fiber = rb_fiberptr_blocking(fiber) ? NULL : fiber;
         target_th->join_list = &waiter;
         struct join_arg arg;
diff --git a/thread_sync.c b/thread_sync.c
index 3888534468..2bcf59137e 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -20,6 +20,16 @@ struct sync_waiter {
     struct ccan_list_node node;
 };
+static inline rb_fiber_t*
+nonblocking_fiber(rb_fiber_t *fiber)
+{
+    if (rb_fiberptr_blocking(fiber)) {
+        return NULL;
+    }
+
+    return fiber;
+}
+
 struct queue_sleep_arg {
     VALUE self;
     VALUE timeout;
@@ -37,8 +47,7 @@ sync_wakeup(struct ccan_list_head *head, long max)
         ccan_list_del_init(&cur->node);
         if (cur->th->status != THREAD_KILLED) {
-
-            if (cur->th->scheduler != Qnil && rb_fiberptr_blocking(cur->fiber) == 0) {
+            if (cur->th->scheduler != Qnil && cur->fiber) {
                 rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
             }
             else {
@@ -306,7 +315,7 @@ do_mutex_lock(VALUE self, int interruptible_p)
                 struct sync_waiter sync_waiter = {
                     .self = self,
                     .th = th,
-                    .fiber = fiber
+                    .fiber = nonblocking_fiber(fiber)
                 };
                 ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);
@@ -339,7 +348,7 @@ do_mutex_lock(VALUE self, int interruptible_p)
                 struct sync_waiter sync_waiter = {
                     .self = self,
                     .th = th,
-                    .fiber = fiber
+                    .fiber = nonblocking_fiber(fiber)
                 };
                 ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);
@@ -437,7 +446,7 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber)
         ccan_list_for_each_safe(&mutex->waitq, cur, next, node) {
             ccan_list_del_init(&cur->node);
-            if (cur->th->scheduler != Qnil && rb_fiberptr_blocking(cur->fiber) == 0) {
+            if (cur->th->scheduler != Qnil && cur->fiber) {
                 rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
                 goto found;
             }
@@ -1051,7 +1060,7 @@ queue_do_pop(VALUE self, struct rb_queue *q, int should_block, VALUE timeout)
             assert(queue_closed_p(self) == 0);
             struct queue_waiter queue_waiter = {
-                .w = {.self = self, .th = ec->thread_ptr, .fiber = ec->fiber_ptr},
+                .w = {.self = self, .th = ec->thread_ptr, .fiber = nonblocking_fiber(ec->fiber_ptr)},
                 .as = {.q = q}
             };
@@ -1258,7 +1267,7 @@ rb_szqueue_push(rb_execution_context_t *ec, VALUE self, VALUE object, VALUE non_
         else {
             rb_execution_context_t *ec = GET_EC();
             struct queue_waiter queue_waiter = {
-                .w = {.self = self, .th = ec->thread_ptr, .fiber = ec->fiber_ptr},
+                .w = {.self = self, .th = ec->thread_ptr, .fiber = nonblocking_fiber(ec->fiber_ptr)},
                 .as = {.sq = sq}
             };
@@ -1491,7 +1500,7 @@ rb_condvar_wait(int argc, VALUE *argv, VALUE self)
     struct sync_waiter sync_waiter = {
         .self = args.mutex,
         .th = ec->thread_ptr,
-        .fiber = ec->fiber_ptr
+        .fiber = nonblocking_fiber(ec->fiber_ptr)
     };
     ccan_list_add_tail(&cv->waitq, &sync_waiter.node);