summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cont.c11
-rw-r--r--internal/cont.h2
-rw-r--r--internal/scheduler.h3
-rw-r--r--scheduler.c14
-rw-r--r--spec/ruby/core/mutex/owned_spec.rb12
-rw-r--r--test/fiber/scheduler.rb46
-rw-r--r--test/fiber/test_mutex.rb38
-rw-r--r--thread.c11
-rw-r--r--thread_sync.c106
9 files changed, 194 insertions, 49 deletions
diff --git a/cont.c b/cont.c
index d228107b9a..0304f4c60e 100644
--- a/cont.c
+++ b/cont.c
@@ -851,6 +851,12 @@ NOINLINE(static VALUE cont_capture(volatile int *volatile stat));
if (!(th)->ec->tag) rb_raise(rb_eThreadError, "not running thread"); \
} while (0)
+rb_thread_t*
+rb_fiber_threadptr(const rb_fiber_t *fiber)
+{
+ return fiber->cont.saved_ec.thread_ptr;
+}
+
static VALUE
cont_thread_value(const rb_context_t *cont)
{
@@ -1146,6 +1152,11 @@ cont_new(VALUE klass)
return cont;
}
+VALUE rb_fiberptr_self(struct rb_fiber_struct *fiber)
+{
+ return fiber->cont.self;
+}
+
void
rb_fiber_init_mjit_cont(struct rb_fiber_struct *fiber)
{
diff --git a/internal/cont.h b/internal/cont.h
index 81874aa5c7..a365cbe978 100644
--- a/internal/cont.h
+++ b/internal/cont.h
@@ -20,4 +20,6 @@ void rb_fiber_reset_root_local_storage(struct rb_thread_struct *);
void ruby_register_rollback_func_for_ensure(VALUE (*ensure_func)(VALUE), VALUE (*rollback_func)(VALUE));
void rb_fiber_init_mjit_cont(struct rb_fiber_struct *fiber);
+VALUE rb_fiberptr_self(struct rb_fiber_struct *fiber);
+
#endif /* INTERNAL_CONT_H */
diff --git a/internal/scheduler.h b/internal/scheduler.h
index f5a41af064..44872e3b10 100644
--- a/internal/scheduler.h
+++ b/internal/scheduler.h
@@ -17,6 +17,9 @@ VALUE rb_scheduler_timeout(struct timeval *timeout);
VALUE rb_scheduler_kernel_sleep(VALUE scheduler, VALUE duration);
VALUE rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv);
+VALUE rb_scheduler_mutex_lock(VALUE scheduler, VALUE mutex);
+VALUE rb_scheduler_mutex_unlock(VALUE scheduler, VALUE mutex, VALUE fiber);
+
VALUE rb_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout);
VALUE rb_scheduler_io_wait_readable(VALUE scheduler, VALUE io);
VALUE rb_scheduler_io_wait_writable(VALUE scheduler, VALUE io);
diff --git a/scheduler.c b/scheduler.c
index 9821d07636..9ecc40cf6c 100644
--- a/scheduler.c
+++ b/scheduler.c
@@ -12,6 +12,8 @@
#include "ruby/io.h"
static ID id_kernel_sleep;
+static ID id_mutex_lock;
+static ID id_mutex_unlock;
static ID id_io_read;
static ID id_io_write;
static ID id_io_wait;
@@ -20,6 +22,8 @@ void
Init_Scheduler(void)
{
id_kernel_sleep = rb_intern_const("kernel_sleep");
+ id_mutex_lock = rb_intern_const("mutex_lock");
+ id_mutex_unlock = rb_intern_const("mutex_unlock");
id_io_read = rb_intern_const("io_read");
id_io_write = rb_intern_const("io_write");
id_io_wait = rb_intern_const("io_wait");
@@ -44,6 +48,16 @@ VALUE rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv)
return rb_funcallv(scheduler, id_kernel_sleep, argc, argv);
}
+VALUE rb_scheduler_mutex_lock(VALUE scheduler, VALUE mutex)
+{
+ return rb_funcall(scheduler, id_mutex_lock, 1, mutex);
+}
+
+VALUE rb_scheduler_mutex_unlock(VALUE scheduler, VALUE mutex, VALUE fiber)
+{
+ return rb_funcall(scheduler, id_mutex_unlock, 2, mutex, fiber);
+}
+
VALUE rb_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
{
return rb_funcall(scheduler, id_io_wait, 3, io, events, timeout);
diff --git a/spec/ruby/core/mutex/owned_spec.rb b/spec/ruby/core/mutex/owned_spec.rb
index e66062534e..f881622965 100644
--- a/spec/ruby/core/mutex/owned_spec.rb
+++ b/spec/ruby/core/mutex/owned_spec.rb
@@ -40,4 +40,16 @@ describe "Mutex#owned?" do
m.owned?.should be_false
end
end
+
+ ruby_version_is "2.8" do
+ it "is held per Fiber" do
+ m = Mutex.new
+ m.lock
+
+ Fiber.new do
+ m.locked?.should == true
+ m.owned?.should == false
+ end.resume
+ end
+ end
end
diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb
index 1f690b4c08..fa05daf886 100644
--- a/test/fiber/scheduler.rb
+++ b/test/fiber/scheduler.rb
@@ -14,6 +14,12 @@ class Scheduler
@readable = {}
@writable = {}
@waiting = {}
+
+ @urgent = nil
+
+ @lock = Mutex.new
+ @locking = 0
+ @ready = []
end
attr :readable
@@ -35,9 +41,11 @@ class Scheduler
end
def run
- while @readable.any? or @writable.any? or @waiting.any?
+ @urgent = IO.pipe
+
+ while @readable.any? or @writable.any? or @waiting.any? or @locking.positive?
# Can only handle file descriptors up to 1024...
- readable, writable = IO.select(@readable.keys, @writable.keys, [], next_timeout)
+ readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout)
# puts "readable: #{readable}" if readable&.any?
# puts "writable: #{writable}" if writable&.any?
@@ -63,7 +71,24 @@ class Scheduler
end
end
end
+
+ if @ready.any?
+ # Clear out the urgent notification pipe.
+ @urgent.first.read_nonblock(1024)
+
+ ready = nil
+
+ @lock.synchronize do
+ ready, @ready = @ready, Array.new
+ end
+
+ ready.each do |fiber|
+ fiber.resume
+ end
+ end
end
+ ensure
+ @urgent.each(&:close)
end
def current_time
@@ -95,6 +120,23 @@ class Scheduler
return true
end
+ def mutex_lock(mutex)
+ @locking += 1
+ Fiber.yield
+ ensure
+ @locking -= 1
+ end
+
+ def mutex_unlock(mutex, fiber)
+ @lock.synchronize do
+ @ready << fiber
+
+ if @urgent
+ @urgent.last.write('.')
+ end
+ end
+ end
+
def fiber(&block)
fiber = Fiber.new(blocking: false, &block)
diff --git a/test/fiber/test_mutex.rb b/test/fiber/test_mutex.rb
index 5179959a6a..393a44fc2f 100644
--- a/test/fiber/test_mutex.rb
+++ b/test/fiber/test_mutex.rb
@@ -14,7 +14,7 @@ class TestFiberMutex < Test::Unit::TestCase
assert_equal Thread.scheduler, scheduler
mutex.synchronize do
- assert_nil Thread.scheduler
+ assert Thread.scheduler
end
end
end
@@ -22,7 +22,35 @@ class TestFiberMutex < Test::Unit::TestCase
thread.join
end
+ def test_mutex_interleaved_locking
+ mutex = Mutex.new
+
+ thread = Thread.new do
+ scheduler = Scheduler.new
+ Thread.current.scheduler = scheduler
+
+ Fiber.schedule do
+ mutex.lock
+ sleep 0.1
+ mutex.unlock
+ end
+
+ Fiber.schedule do
+ mutex.lock
+ sleep 0.1
+ mutex.unlock
+ end
+
+ scheduler.run
+ end
+
+ thread.join
+ end
+
def test_mutex_deadlock
+ err = /No live threads left. Deadlock\?/
+ assert_in_out_err %W[-I#{__dir__} -], <<-RUBY, ['in synchronize'], err, success: false
+ require 'scheduler'
mutex = Mutex.new
thread = Thread.new do
@@ -30,18 +58,18 @@ class TestFiberMutex < Test::Unit::TestCase
Thread.current.scheduler = scheduler
Fiber.schedule do
- assert_equal Thread.scheduler, scheduler
+ raise unless Thread.scheduler == scheduler
mutex.synchronize do
+ puts 'in synchronize'
Fiber.yield
end
end
- assert_raise ThreadError do
- mutex.lock
- end
+ mutex.lock
end
thread.join
+ RUBY
end
end
diff --git a/thread.c b/thread.c
index d0ebfff882..c4ff5aafde 100644
--- a/thread.c
+++ b/thread.c
@@ -75,11 +75,13 @@
#include "hrtime.h"
#include "internal.h"
#include "internal/class.h"
+#include "internal/cont.h"
#include "internal/error.h"
#include "internal/hash.h"
#include "internal/io.h"
#include "internal/object.h"
#include "internal/proc.h"
+#include "internal/scheduler.h"
#include "internal/signal.h"
#include "internal/thread.h"
#include "internal/time.h"
@@ -548,7 +550,7 @@ rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th)
/* rb_warn("mutex #<%p> remains to be locked by terminated thread",
(void *)mutexes); */
mutexes = mutex->next_mutex;
- err = rb_mutex_unlock_th(mutex, th);
+ err = rb_mutex_unlock_th(mutex, th, mutex->fiber);
if (err) rb_bug("invalid keeping_mutexes: %s", err);
}
}
@@ -5040,7 +5042,7 @@ rb_thread_shield_wait(VALUE self)
if (!mutex) return Qfalse;
m = mutex_ptr(mutex);
- if (m->th == GET_THREAD()) return Qnil;
+ if (m->fiber == GET_EC()->fiber_ptr) return Qnil;
rb_thread_shield_waiting_inc(self);
rb_mutex_lock(mutex);
rb_thread_shield_waiting_dec(self);
@@ -5540,7 +5542,7 @@ debug_deadlock_check(rb_ractor_t *r, VALUE msg)
if (th->locking_mutex) {
rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
rb_str_catf(msg, " mutex:%p cond:%"PRIuSIZE,
- (void *)mutex->th, rb_mutex_num_waiting(mutex));
+ (void *)mutex->fiber, rb_mutex_num_waiting(mutex));
}
{
@@ -5574,8 +5576,7 @@ rb_check_deadlock(rb_ractor_t *r)
}
else if (th->locking_mutex) {
rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
-
- if (mutex->th == th || (!mutex->th && !list_empty(&mutex->waitq))) {
+ if (mutex->fiber == th->ec->fiber_ptr || (!mutex->fiber && !list_empty(&mutex->waitq))) {
found = 1;
}
}
diff --git a/thread_sync.c b/thread_sync.c
index deb3858c31..cfdd62635a 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -7,6 +7,7 @@ static VALUE rb_eClosedQueueError;
/* sync_waiter is always on-stack */
struct sync_waiter {
rb_thread_t *th;
+ rb_fiber_t *fiber;
struct list_node node;
};
@@ -42,7 +43,9 @@ wakeup_all(struct list_head *head)
/* Mutex */
typedef struct rb_mutex_struct {
- rb_thread_t *th;
+ VALUE self;
+
+ rb_fiber_t *fiber;
struct rb_mutex_struct *next_mutex;
struct list_head waitq; /* protected by GVL */
} rb_mutex_t;
@@ -52,7 +55,7 @@ static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
static void rb_mutex_abandon_locking_mutex(rb_thread_t *th);
#endif
-static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th);
+static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber);
/*
* Document-class: Mutex
@@ -93,13 +96,15 @@ rb_mutex_num_waiting(rb_mutex_t *mutex)
return n;
}
+rb_thread_t* rb_fiber_threadptr(const rb_fiber_t *fiber);
+
static void
mutex_free(void *ptr)
{
rb_mutex_t *mutex = ptr;
- if (mutex->th) {
+ if (mutex->fiber) {
/* rb_warn("free locked mutex"); */
- const char *err = rb_mutex_unlock_th(mutex, mutex->th);
+ const char *err = rb_mutex_unlock_th(mutex, rb_fiber_threadptr(mutex->fiber), mutex->fiber);
if (err) rb_bug("%s", err);
}
ruby_xfree(ptr);
@@ -145,6 +150,8 @@ mutex_alloc(VALUE klass)
rb_mutex_t *mutex;
obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex);
+
+ mutex->self = obj;
list_head_init(&mutex->waitq);
return obj;
}
@@ -178,7 +185,7 @@ rb_mutex_locked_p(VALUE self)
{
rb_mutex_t *mutex = mutex_ptr(self);
- return mutex->th ? Qtrue : Qfalse;
+ return mutex->fiber ? Qtrue : Qfalse;
}
static void
@@ -191,7 +198,7 @@ mutex_locked(rb_thread_t *th, VALUE self)
}
th->keeping_mutexes = mutex;
- th->blocking += 1;
+ // th->blocking += 1;
}
/*
@@ -207,9 +214,10 @@ rb_mutex_trylock(VALUE self)
rb_mutex_t *mutex = mutex_ptr(self);
VALUE locked = Qfalse;
- if (mutex->th == 0) {
+ if (mutex->fiber == 0) {
+ rb_fiber_t *fiber = GET_EC()->fiber_ptr;
rb_thread_t *th = GET_THREAD();
- mutex->th = th;
+ mutex->fiber = fiber;
locked = Qtrue;
mutex_locked(th, self);
@@ -226,9 +234,9 @@ rb_mutex_trylock(VALUE self)
static const rb_thread_t *patrol_thread = NULL;
static VALUE
-mutex_owned_p(rb_thread_t *th, rb_mutex_t *mutex)
+mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex)
{
- if (mutex->th == th) {
+ if (mutex->fiber == fiber) {
return Qtrue;
}
else {
@@ -240,6 +248,8 @@ static VALUE
do_mutex_lock(VALUE self, int interruptible_p)
{
rb_thread_t *th = GET_THREAD();
+ rb_execution_context_t *ec = GET_EC();
+ rb_fiber_t *fiber = ec->fiber_ptr;
rb_mutex_t *mutex = mutex_ptr(self);
/* When running trap handler */
@@ -249,15 +259,33 @@ do_mutex_lock(VALUE self, int interruptible_p)
}
if (rb_mutex_trylock(self) == Qfalse) {
- struct sync_waiter w;
+ struct sync_waiter w = {
+ .th = th,
+ .fiber = fiber
+ };
- if (mutex->th == th) {
+ if (mutex->fiber == fiber) {
rb_raise(rb_eThreadError, "deadlock; recursive locking");
}
- w.th = th;
+ VALUE scheduler = rb_thread_current_scheduler();
+ while (mutex->fiber != fiber) {
+ if (scheduler != Qnil) {
+ list_add_tail(&mutex->waitq, &w.node);
+
+ rb_scheduler_mutex_lock(scheduler, self);
- while (mutex->th != th) {
+ list_del(&w.node);
+
+ if (!mutex->fiber) {
+ mutex->fiber = fiber;
+ break;
+ } else {
+ // Try again...
+ continue;
+ }
+ }
+
enum rb_thread_status prev_status = th->status;
rb_hrtime_t *timeout = 0;
rb_hrtime_t rel = rb_msec2hrtime(100);
@@ -277,18 +305,20 @@ do_mutex_lock(VALUE self, int interruptible_p)
}
list_add_tail(&mutex->waitq, &w.node);
- native_sleep(th, timeout); /* release GVL */
+
+ native_sleep(th, timeout); /* release GVL */
+
list_del(&w.node);
- if (!mutex->th) {
- mutex->th = th;
+ if (!mutex->fiber) {
+ mutex->fiber = fiber;
}
if (patrol_thread == th)
patrol_thread = NULL;
th->locking_mutex = Qfalse;
- if (mutex->th && timeout && !RUBY_VM_INTERRUPTED(th->ec)) {
+ if (mutex->fiber && timeout && !RUBY_VM_INTERRUPTED(th->ec)) {
rb_check_deadlock(th->ractor);
}
if (th->status == THREAD_STOPPED_FOREVER) {
@@ -299,22 +329,19 @@ do_mutex_lock(VALUE self, int interruptible_p)
if (interruptible_p) {
/* release mutex before checking for interrupts...as interrupt checking
* code might call rb_raise() */
- if (mutex->th == th) mutex->th = 0;
-
+ if (mutex->fiber == fiber) mutex->fiber = 0;
RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may release mutex */
- if (!mutex->th) {
- mutex->th = th;
- mutex_locked(th, self);
+ if (!mutex->fiber) {
+ mutex->fiber = fiber;
}
}
- else {
- if (mutex->th == th) mutex_locked(th, self);
- }
}
+
+ if (mutex->fiber == fiber) mutex_locked(th, self);
}
// assertion
- if (mutex_owned_p(th, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned.");
+ if (mutex_owned_p(fiber, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned.");
return self;
}
@@ -347,32 +374,37 @@ rb_mutex_lock(VALUE self)
VALUE
rb_mutex_owned_p(VALUE self)
{
- rb_thread_t *th = GET_THREAD();
+ rb_fiber_t *fiber = GET_EC()->fiber_ptr;
rb_mutex_t *mutex = mutex_ptr(self);
- return mutex_owned_p(th, mutex);
+ return mutex_owned_p(fiber, mutex);
}
static const char *
-rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th)
+rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber)
{
const char *err = NULL;
- if (mutex->th == 0) {
+ if (mutex->fiber == 0) {
err = "Attempt to unlock a mutex which is not locked";
}
- else if (mutex->th != th) {
- err = "Attempt to unlock a mutex which is locked by another thread";
+ else if (mutex->fiber != fiber) {
+ err = "Attempt to unlock a mutex which is locked by another thread/fiber";
}
else {
struct sync_waiter *cur = 0, *next;
rb_mutex_t **th_mutex = &th->keeping_mutexes;
- th->blocking -= 1;
+ // th->blocking -= 1;
- mutex->th = 0;
+ mutex->fiber = 0;
list_for_each_safe(&mutex->waitq, cur, next, node) {
list_del_init(&cur->node);
+
+ if (cur->th->scheduler != Qnil) {
+ rb_scheduler_mutex_unlock(cur->th->scheduler, mutex->self, rb_fiberptr_self(cur->fiber));
+ }
+
switch (cur->th->status) {
case THREAD_RUNNABLE: /* from someone else calling Thread#run */
case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
@@ -411,7 +443,7 @@ rb_mutex_unlock(VALUE self)
rb_mutex_t *mutex = mutex_ptr(self);
rb_thread_t *th = GET_THREAD();
- err = rb_mutex_unlock_th(mutex, th);
+ err = rb_mutex_unlock_th(mutex, th, GET_EC()->fiber_ptr);
if (err) rb_raise(rb_eThreadError, "%s", err);
return self;
@@ -444,7 +476,7 @@ rb_mutex_abandon_all(rb_mutex_t *mutexes)
while (mutexes) {
mutex = mutexes;
mutexes = mutex->next_mutex;
- mutex->th = 0;
+ mutex->fiber = 0;
mutex->next_mutex = 0;
list_head_init(&mutex->waitq);
}