diff options
-rw-r--r-- | test/fiber/scheduler.rb | 26 | ||||
-rw-r--r-- | thread_sync.c | 86 |
2 files changed, 53 insertions, 59 deletions
diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb index fa05daf886..b03058a210 100644 --- a/test/fiber/scheduler.rb +++ b/test/fiber/scheduler.rb @@ -15,11 +15,9 @@ class Scheduler @writable = {} @waiting = {} - @urgent = nil - @lock = Mutex.new @locking = 0 - @ready = [] + @ready = Array.new end attr :readable @@ -51,11 +49,17 @@ class Scheduler # puts "writable: #{writable}" if writable&.any? readable&.each do |io| - @readable[io]&.resume + if fiber = @readable.delete(io) + fiber.resume + elsif io == @urgent.first + @urgent.first.read_nonblock(1024) + end end writable&.each do |io| - @writable[io]&.resume + if fiber = @writable.delete(io) + fiber.resume + end end if @waiting.any? @@ -73,9 +77,6 @@ class Scheduler end if @ready.any? - # Clear out the urgent notification pipe. - @urgent.first.read_nonblock(1024) - ready = nil @lock.synchronize do @@ -114,9 +115,6 @@ class Scheduler Fiber.yield - @readable.delete(io) - @writable.delete(io) - return true end @@ -130,10 +128,10 @@ class Scheduler def mutex_unlock(mutex, fiber) @lock.synchronize do @ready << fiber + end - if @urgent - @urgent.last.write('.') - end + if io = @urgent&.last + @urgent.last.write_nonblock('.') end end diff --git a/thread_sync.c b/thread_sync.c index cfdd62635a..9dd3b32645 100644 --- a/thread_sync.c +++ b/thread_sync.c @@ -264,12 +264,12 @@ do_mutex_lock(VALUE self, int interruptible_p) .fiber = fiber }; - if (mutex->fiber == fiber) { - rb_raise(rb_eThreadError, "deadlock; recursive locking"); - } + if (mutex->fiber == fiber) { + rb_raise(rb_eThreadError, "deadlock; recursive locking"); + } - VALUE scheduler = rb_thread_current_scheduler(); - while (mutex->fiber != fiber) { + while (mutex->fiber != fiber) { + VALUE scheduler = rb_thread_current_scheduler(); if (scheduler != Qnil) { list_add_tail(&mutex->waitq, &w.node); @@ -279,52 +279,48 @@ do_mutex_lock(VALUE self, int interruptible_p) if (!mutex->fiber) { mutex->fiber = fiber; - break; - } else { - // Try again... - continue; } - } - - enum rb_thread_status prev_status = th->status; - rb_hrtime_t *timeout = 0; - rb_hrtime_t rel = rb_msec2hrtime(100); - - th->status = THREAD_STOPPED_FOREVER; - th->locking_mutex = self; - rb_ractor_sleeper_threads_inc(th->ractor); - /* - * Carefully! while some contended threads are in native_sleep(), - * ractor->sleeper is unstable value. we have to avoid both deadlock - * and busy loop. - */ - if ((rb_ractor_living_thread_num(th->ractor) == rb_ractor_sleeper_thread_num(th->ractor)) && - !patrol_thread) { - timeout = &rel; - patrol_thread = th; - } + } else { + enum rb_thread_status prev_status = th->status; + rb_hrtime_t *timeout = 0; + rb_hrtime_t rel = rb_msec2hrtime(100); + + th->status = THREAD_STOPPED_FOREVER; + th->locking_mutex = self; + rb_ractor_sleeper_threads_inc(th->ractor); + /* + * Carefully! while some contended threads are in native_sleep(), + * ractor->sleeper is unstable value. we have to avoid both deadlock + * and busy loop. + */ + if ((rb_ractor_living_thread_num(th->ractor) == rb_ractor_sleeper_thread_num(th->ractor)) && + !patrol_thread) { + timeout = &rel; + patrol_thread = th; + } - list_add_tail(&mutex->waitq, &w.node); + list_add_tail(&mutex->waitq, &w.node); - native_sleep(th, timeout); /* release GVL */ + native_sleep(th, timeout); /* release GVL */ - list_del(&w.node); + list_del(&w.node); - if (!mutex->fiber) { - mutex->fiber = fiber; - } + if (!mutex->fiber) { + mutex->fiber = fiber; + } - if (patrol_thread == th) - patrol_thread = NULL; + if (patrol_thread == th) + patrol_thread = NULL; - th->locking_mutex = Qfalse; - if (mutex->fiber && timeout && !RUBY_VM_INTERRUPTED(th->ec)) { - rb_check_deadlock(th->ractor); - } - if (th->status == THREAD_STOPPED_FOREVER) { - th->status = prev_status; - } - rb_ractor_sleeper_threads_dec(th->ractor); + th->locking_mutex = Qfalse; + if (mutex->fiber && timeout && !RUBY_VM_INTERRUPTED(th->ec)) { + rb_check_deadlock(th->ractor); + } + if (th->status == THREAD_STOPPED_FOREVER) { + th->status = prev_status; + } + rb_ractor_sleeper_threads_dec(th->ractor); + } if (interruptible_p) { /* release mutex before checking for interrupts...as interrupt checking @@ -335,7 +331,7 @@ do_mutex_lock(VALUE self, int interruptible_p) mutex->fiber = fiber; } } - } + } if (mutex->fiber == fiber) mutex_locked(th, self); } |