diff options
| author | Luke Gruber <luke.gruber@shopify.com> | 2026-01-26 14:34:37 -0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-01-26 14:34:37 -0500 |
| commit | 994257ab06072df38de024e70a60aa9a87e36089 (patch) | |
| tree | 17a803ca00869ee01041b318bc338356e72abddf | |
| parent | 3c634893e245c578181e8337b4025d1f673d77e8 (diff) | |
Prevent starvation when acquiring mutex over and over (#15877)
A thread that continually locks and unlocks a mutex `m` can starve the other threads if they are all on the waitq of `m`.
See https://bugs.ruby-lang.org/issues/21840 for more details.
Solution:
When a thread `T1` wakes up `T2` during mutex unlock, but `T1` or any other thread successfully acquires the mutex
before `T2` does, we record the acquiring thread's `running_time` at the moment of acquisition. Later, when that thread
unlocks the mutex, if its `running_time` is less than the saved value (i.e. it was reset during the critical section), we set it back to the saved value.
Fixes [Bug #21840]
| -rw-r--r-- | test/ruby/test_thread.rb | 33 | ||||
| -rw-r--r-- | thread_sync.c | 24 |
2 files changed, 55 insertions, 2 deletions
diff --git a/test/ruby/test_thread.rb b/test/ruby/test_thread.rb index b2d8e73693..60e3aa772a 100644 --- a/test/ruby/test_thread.rb +++ b/test/ruby/test_thread.rb @@ -1664,4 +1664,37 @@ q.pop assert_operator elapsed, :>=, 0.1, "sub-millisecond sleeps should not return immediately" end; end + + # [Bug #21840] + def test_mutex_owner_doesnt_starve_waiters + assert_ruby_status([], "#{<<~"begin;"}\n#{<<~'end;'}") + begin; + m = Mutex.new + + fib = lambda { |n| + return n if n <= 1 + fib(n - 1) + fib(n - 2) + } + + t1_running = false + t1 = Thread.new do + t1_running = true + loop do + fib(20) + m.synchronize do + File.open(__FILE__) { } # reset timeslice due to blocking operation + end + end + end + + loop until t1_running + + 3.times.map do + Thread.new do + m.synchronize do + end + end + end.each(&:join) + end; + end end diff --git a/thread_sync.c b/thread_sync.c index e3916c97cb..2963b6db73 100644 --- a/thread_sync.c +++ b/thread_sync.c @@ -10,6 +10,8 @@ typedef struct rb_mutex_struct { rb_thread_t *th; // even if the fiber is collected, we might need access to the thread in mutex_free struct rb_mutex_struct *next_mutex; struct ccan_list_head waitq; /* protected by GVL */ + uint32_t saved_running_time_us; + bool wait_waking; // Is there a thread waiting to be woken up by this mutex? Reset during every wakeup. } rb_mutex_t; /* sync_waiter is always on-stack */ @@ -212,8 +214,15 @@ mutex_locked(rb_mutex_t *mutex, rb_thread_t *th, rb_serial_t ec_serial) static inline bool do_mutex_trylock(rb_mutex_t *mutex, rb_thread_t *th, rb_serial_t ec_serial) { + // NOTE: we can successfully lock a mutex even if there are other threads waiting on it. First one to it wins. 
if (mutex->ec_serial == 0) { RUBY_DEBUG_LOG("%p ok", mutex); + if (mutex->wait_waking) { + // If we acquired `mutex` without contention and before the thread that was popped off the waitq, we're going + // to set our running_time back to what it was here during mutex unlock if it got reset during our critical + // section. This is to prevent starvation of other threads waiting on the mutex. + mutex->saved_running_time_us = th->running_time_us; + } mutex_locked(mutex, th, ec_serial); return true; @@ -350,7 +359,8 @@ do_mutex_lock(struct mutex_args *args, int interruptible_p) } ccan_list_del(&sync_waiter.node); - // unlocked by another thread while sleeping + // If mutex->ec_serial != 0, the mutex was locked by another thread before we had the chance to acquire it. + // We'll put ourselves on the waitq and sleep again. if (!mutex->ec_serial) { mutex_set_owner(mutex, th, ec_serial); } @@ -391,6 +401,7 @@ do_mutex_lock(struct mutex_args *args, int interruptible_p) if (saved_ints) th->ec->interrupt_flag = saved_ints; if (mutex->ec_serial == ec_serial) mutex_locked(mutex, th, ec_serial); + mutex->wait_waking = false; } RUBY_DEBUG_LOG("%p locked", mutex); @@ -454,6 +465,15 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_serial_t ec_serial) struct sync_waiter *cur = 0, *next; + + if (mutex->wait_waking) { + uint32_t saved = mutex->saved_running_time_us; + if (th->running_time_us < saved) { + th->running_time_us = saved; + } + } + + mutex->saved_running_time_us = 0; mutex->ec_serial = 0; thread_mutex_remove(th, mutex); @@ -469,6 +489,7 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_serial_t ec_serial) case THREAD_RUNNABLE: /* from someone else calling Thread#run */ case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */ RUBY_DEBUG_LOG("wakeup th:%u", rb_th_serial(cur->th)); + mutex->wait_waking = true; rb_threadptr_interrupt(cur->th); return NULL; case THREAD_STOPPED: /* probably impossible */ @@ -480,7 +501,6 @@ rb_mutex_unlock_th(rb_mutex_t 
*mutex, rb_thread_t *th, rb_serial_t ec_serial) } } } - // We did not find any threads to wake up, so we can just return with no error: return NULL; } |
