summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuke Gruber <luke.gruber@shopify.com>2026-01-26 14:34:37 -0500
committerGitHub <noreply@github.com>2026-01-26 14:34:37 -0500
commit994257ab06072df38de024e70a60aa9a87e36089 (patch)
tree17a803ca00869ee01041b318bc338356e72abddf
parent3c634893e245c578181e8337b4025d1f673d77e8 (diff)
Prevent starvation when acquiring mutex over and over (#15877)
Continually locking a mutex m can lead to starvation if all other threads are on the waitq of m. See https://bugs.ruby-lang.org/issues/21840 for more details. Solution: When a thread `T1` wakes up `T2` during mutex unlock but `T1` or any other thread successfully acquires the mutex before `T2` does, we record the acquiring thread's `running_time` at the moment of acquisition. Then, during unlock, if that thread's `running_time` is less than the saved value (i.e. it was reset during the critical section), we set it back to the saved value. Fixes [Bug #21840]
-rw-r--r--test/ruby/test_thread.rb33
-rw-r--r--thread_sync.c24
2 files changed, 55 insertions, 2 deletions
diff --git a/test/ruby/test_thread.rb b/test/ruby/test_thread.rb
index b2d8e73693..60e3aa772a 100644
--- a/test/ruby/test_thread.rb
+++ b/test/ruby/test_thread.rb
@@ -1664,4 +1664,37 @@ q.pop
assert_operator elapsed, :>=, 0.1, "sub-millisecond sleeps should not return immediately"
end;
end
+
+ # [Bug #21840]
+ def test_mutex_owner_doesnt_starve_waiters
+ assert_ruby_status([], "#{<<~"begin;"}\n#{<<~'end;'}")
+ begin;
+ m = Mutex.new
+
+ fib = lambda { |n|
+ return n if n <= 1
+ fib(n - 1) + fib(n - 2)
+ }
+
+ t1_running = false
+ t1 = Thread.new do
+ t1_running = true
+ loop do
+ fib(20)
+ m.synchronize do
+ File.open(__FILE__) { } # reset timeslice due to blocking operation
+ end
+ end
+ end
+
+ loop until t1_running
+
+ 3.times.map do
+ Thread.new do
+ m.synchronize do
+ end
+ end
+ end.each(&:join)
+ end;
+ end
end
diff --git a/thread_sync.c b/thread_sync.c
index e3916c97cb..2963b6db73 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -10,6 +10,8 @@ typedef struct rb_mutex_struct {
rb_thread_t *th; // even if the fiber is collected, we might need access to the thread in mutex_free
struct rb_mutex_struct *next_mutex;
struct ccan_list_head waitq; /* protected by GVL */
+ uint32_t saved_running_time_us;
+ bool wait_waking; // Is there a thread waiting to be woken up by this mutex? Reset during every wakeup.
} rb_mutex_t;
/* sync_waiter is always on-stack */
@@ -212,8 +214,15 @@ mutex_locked(rb_mutex_t *mutex, rb_thread_t *th, rb_serial_t ec_serial)
static inline bool
do_mutex_trylock(rb_mutex_t *mutex, rb_thread_t *th, rb_serial_t ec_serial)
{
+ // NOTE: we can successfully lock a mutex even if there are other threads waiting on it. First one to it wins.
if (mutex->ec_serial == 0) {
RUBY_DEBUG_LOG("%p ok", mutex);
+ if (mutex->wait_waking) {
+ // If we acquired `mutex` without contention and before the thread that was popped off the waitq, we're going
+ // to set our running_time back to what it was here during mutex unlock if it got reset during our critical
+ // section. This is to prevent starvation of other threads waiting on the mutex.
+ mutex->saved_running_time_us = th->running_time_us;
+ }
mutex_locked(mutex, th, ec_serial);
return true;
@@ -350,7 +359,8 @@ do_mutex_lock(struct mutex_args *args, int interruptible_p)
}
ccan_list_del(&sync_waiter.node);
- // unlocked by another thread while sleeping
+ // If mutex->ec_serial != 0, the mutex was locked by another thread before we had the chance to acquire it.
+ // We'll put ourselves on the waitq and sleep again.
if (!mutex->ec_serial) {
mutex_set_owner(mutex, th, ec_serial);
}
@@ -391,6 +401,7 @@ do_mutex_lock(struct mutex_args *args, int interruptible_p)
if (saved_ints) th->ec->interrupt_flag = saved_ints;
if (mutex->ec_serial == ec_serial) mutex_locked(mutex, th, ec_serial);
+ mutex->wait_waking = false;
}
RUBY_DEBUG_LOG("%p locked", mutex);
@@ -454,6 +465,15 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_serial_t ec_serial)
struct sync_waiter *cur = 0, *next;
+
+ if (mutex->wait_waking) {
+ uint32_t saved = mutex->saved_running_time_us;
+ if (th->running_time_us < saved) {
+ th->running_time_us = saved;
+ }
+ }
+
+ mutex->saved_running_time_us = 0;
mutex->ec_serial = 0;
thread_mutex_remove(th, mutex);
@@ -469,6 +489,7 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_serial_t ec_serial)
case THREAD_RUNNABLE: /* from someone else calling Thread#run */
case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
RUBY_DEBUG_LOG("wakeup th:%u", rb_th_serial(cur->th));
+ mutex->wait_waking = true;
rb_threadptr_interrupt(cur->th);
return NULL;
case THREAD_STOPPED: /* probably impossible */
@@ -480,7 +501,6 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_serial_t ec_serial)
}
}
}
-
// We did not find any threads to wake up, so we can just return with no error:
return NULL;
}