diff options
author | Benoit Daloze <eregontp@gmail.com> | 2020-09-17 14:45:44 +0200 |
---|---|---|
committer | Benoit Daloze <eregontp@gmail.com> | 2020-09-17 14:59:38 +0200 |
commit | 9472d160619861201bad91560c06248fef8362dc (patch) | |
tree | a2cf2586ca2c920c6f64f58971128982c8669dda | |
parent | 738a089b3ac55e5bfb26f9121ee73ffcb8c4a36b (diff) |
Call scheduler.block instead of scheduler.kernel_sleep for blocking Queue/SizedQueue operations
* scheduler.unblock was already already called before but with no corresponding scheduler.block
* add test that Queue#pop makes the scheduler wait until it gets an element.
-rw-r--r-- | test/fiber/scheduler.rb | 3 | ||||
-rw-r--r-- | test/fiber/test_mutex.rb | 25 | ||||
-rw-r--r-- | thread.c | 6 | ||||
-rw-r--r-- | thread_sync.c | 10 |
4 files changed, 35 insertions, 9 deletions
diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb index 1e7f60cdf0..f7a773ade1 100644 --- a/test/fiber/scheduler.rb +++ b/test/fiber/scheduler.rb @@ -93,6 +93,7 @@ class Scheduler end ensure @urgent.each(&:close) + @urgent = nil end def current_time @@ -139,7 +140,7 @@ class Scheduler end if io = @urgent&.last - @urgent.last.write_nonblock('.') + io.write_nonblock('.') end end diff --git a/test/fiber/test_mutex.rb b/test/fiber/test_mutex.rb index 1f53ae1a1f..baec559a70 100644 --- a/test/fiber/test_mutex.rb +++ b/test/fiber/test_mutex.rb @@ -115,6 +115,31 @@ class TestFiberMutex < Test::Unit::TestCase assert processed == 3 end + def test_queue_pop_waits + queue = Queue.new + running = false + + thread = Thread.new do + scheduler = Scheduler.new + Thread.current.scheduler = scheduler + + result = nil + Fiber.schedule do + result = queue.pop + end + + running = true + scheduler.run + result + end + + Thread.pass until running + sleep 0.1 + + queue << :done + assert_equal :done, thread.value + end + def test_mutex_deadlock err = /No live threads left. Deadlock\?/ assert_in_out_err %W[-I#{__dir__} -], <<-RUBY, ['in synchronize'], err, success: false @@ -134,7 +134,7 @@ rb_thread_local_storage(VALUE thread) static void sleep_hrtime(rb_thread_t *, rb_hrtime_t, unsigned int fl); static void sleep_forever(rb_thread_t *th, unsigned int fl); -static void rb_thread_sleep_deadly_allow_spurious_wakeup(void); +static void rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker); static int rb_threadptr_dead(rb_thread_t *th); static void rb_check_deadlock(rb_ractor_t *r); static int rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th); @@ -1485,11 +1485,11 @@ rb_thread_sleep_interruptible(void) } static void -rb_thread_sleep_deadly_allow_spurious_wakeup(void) +rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker) { VALUE scheduler = rb_thread_current_scheduler(); if (scheduler != Qnil) { - rb_scheduler_kernel_sleepv(scheduler, 0, NULL); + rb_scheduler_block(scheduler, blocker); } else { thread_debug("rb_thread_sleep_deadly_allow_spurious_wakeup\n"); sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE); diff --git a/thread_sync.c b/thread_sync.c index 5b1e4f288f..741bff6160 100644 --- a/thread_sync.c +++ b/thread_sync.c @@ -483,9 +483,9 @@ rb_mutex_abandon_all(rb_mutex_t *mutexes) #endif static VALUE -rb_mutex_sleep_forever(VALUE time) +rb_mutex_sleep_forever(VALUE self) { - rb_thread_sleep_deadly_allow_spurious_wakeup(); + rb_thread_sleep_deadly_allow_spurious_wakeup(self); return Qnil; } @@ -516,7 +516,7 @@ rb_mutex_sleep(VALUE self, VALUE timeout) mutex_lock_uninterruptible(self); } else { if (NIL_P(timeout)) { - rb_ensure(rb_mutex_sleep_forever, Qnil, mutex_lock_uninterruptible, self); + rb_ensure(rb_mutex_sleep_forever, self, mutex_lock_uninterruptible, self); } else { rb_hrtime_t rel = rb_timeval2hrtime(&t); rb_ensure(rb_mutex_wait_for, (VALUE)&rel, mutex_lock_uninterruptible, self); @@ -904,9 +904,9 @@ rb_queue_push(VALUE self, VALUE obj) } static VALUE -queue_sleep(VALUE arg) +queue_sleep(VALUE self) { - rb_thread_sleep_deadly_allow_spurious_wakeup(); + rb_thread_sleep_deadly_allow_spurious_wakeup(self); return Qnil; } |