summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--scheduler.c24
-rw-r--r--test/fiber/scheduler.rb8
-rw-r--r--test/fiber/test_scheduler.rb18
-rw-r--r--test/fiber/test_sleep.rb22
-rw-r--r--test/fiber/test_thread.rb20
-rw-r--r--thread.c156
6 files changed, 167 insertions, 81 deletions
diff --git a/scheduler.c b/scheduler.c
index 88db433f1e..66cbfc6f10 100644
--- a/scheduler.c
+++ b/scheduler.c
@@ -49,12 +49,36 @@ rb_scheduler_get(void)
return thread->scheduler;
}
+static void
+verify_interface(VALUE scheduler)
+{
+ if (!rb_respond_to(scheduler, id_block)) {
+ rb_raise(rb_eArgError, "Scheduler must implement #block!");
+ }
+
+ if (!rb_respond_to(scheduler, id_unblock)) {
+ rb_raise(rb_eArgError, "Scheduler must implement #unblock!");
+ }
+
+ if (!rb_respond_to(scheduler, id_kernel_sleep)) {
+ rb_raise(rb_eArgError, "Scheduler must implement #kernel_sleep!");
+ }
+
+ if (!rb_respond_to(scheduler, id_io_wait)) {
+ rb_raise(rb_eArgError, "Scheduler must implement #io_wait!");
+ }
+}
+
VALUE
rb_scheduler_set(VALUE scheduler)
{
rb_thread_t *thread = GET_THREAD();
VM_ASSERT(thread);
+ if (scheduler != Qnil) {
+ verify_interface(scheduler);
+ }
+
// We invoke Scheduler#close when setting it to something else, to ensure the previous scheduler runs to completion before changing the scheduler. That way, we do not need to consider interactions, e.g., of a Fiber from the previous scheduler with the new scheduler.
if (thread->scheduler != Qnil) {
rb_scheduler_close(thread->scheduler);
diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb
index b3c3eaff59..0553e386f5 100644
--- a/test/fiber/scheduler.rb
+++ b/test/fiber/scheduler.rb
@@ -188,3 +188,11 @@ class Scheduler
return fiber
end
end
+
+class BrokenUnblockScheduler < Scheduler
+ def unblock(blocker, fiber)
+ super
+
+ raise "Broken unblock!"
+ end
+end
diff --git a/test/fiber/test_scheduler.rb b/test/fiber/test_scheduler.rb
index 72bde9fcc3..d1fb89d693 100644
--- a/test/fiber/test_scheduler.rb
+++ b/test/fiber/test_scheduler.rb
@@ -66,9 +66,23 @@ class TestFiberScheduler < Test::Unit::TestCase
RUBY
end
- def test_optional_close
+ def test_minimal_interface
+ scheduler = Object.new
+
+ def scheduler.block
+ end
+
+ def scheduler.unblock
+ end
+
+ def scheduler.io_wait
+ end
+
+ def scheduler.kernel_sleep
+ end
+
thread = Thread.new do
- Fiber.set_scheduler Object.new
+ Fiber.set_scheduler scheduler
end
thread.join
diff --git a/test/fiber/test_sleep.rb b/test/fiber/test_sleep.rb
index e882766345..844369740f 100644
--- a/test/fiber/test_sleep.rb
+++ b/test/fiber/test_sleep.rb
@@ -43,4 +43,26 @@ class TestFiberSleep < Test::Unit::TestCase
assert_operator seconds, :>=, 2, "actual: %p" % seconds
end
+
+ def test_broken_sleep
+ thread = Thread.new do
+ Thread.current.report_on_exception = false
+
+ scheduler = Scheduler.new
+
+ def scheduler.kernel_sleep(duration = nil)
+ raise "Broken sleep!"
+ end
+
+ Fiber.set_scheduler scheduler
+
+ Fiber.schedule do
+ sleep 0
+ end
+ end
+
+ assert_raise(RuntimeError) do
+ thread.join
+ end
+ end
end
diff --git a/test/fiber/test_thread.rb b/test/fiber/test_thread.rb
index 5fc80f0e6c..b7323d7237 100644
--- a/test/fiber/test_thread.rb
+++ b/test/fiber/test_thread.rb
@@ -42,4 +42,24 @@ class TestFiberThread < Test::Unit::TestCase
assert_equal :done, thread.value
end
+
+ def test_broken_unblock
+ thread = Thread.new do
+ Thread.current.report_on_exception = false
+
+ scheduler = BrokenUnblockScheduler.new
+
+ Fiber.set_scheduler scheduler
+
+ Fiber.schedule do
+ Thread.new{}.join
+ end
+
+ scheduler.run
+ end
+
+ assert_raise(RuntimeError) do
+ thread.join
+ end
+ end
end
diff --git a/thread.c b/thread.c
index ec7a9b760f..508772c8fe 100644
--- a/thread.c
+++ b/thread.c
@@ -539,9 +539,12 @@ terminate_all(rb_ractor_t *r, const rb_thread_t *main_thread)
static void
rb_threadptr_join_list_wakeup(rb_thread_t *thread)
{
- struct rb_waiting_list *join_list = thread->join_list;
+ while (thread->join_list) {
+ struct rb_waiting_list *join_list = thread->join_list;
+
+ // Consume the entry from the join list:
+ thread->join_list = join_list->next;
- while (join_list) {
rb_thread_t *target_thread = join_list->thread;
if (target_thread->scheduler != Qnil && rb_fiberptr_blocking(join_list->fiber) == 0) {
@@ -557,25 +560,20 @@ rb_threadptr_join_list_wakeup(rb_thread_t *thread)
break;
}
}
-
- join_list = join_list->next;
}
}
void
rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th)
{
- const char *err;
- rb_mutex_t *mutex;
- rb_mutex_t *mutexes = th->keeping_mutexes;
+ while (th->keeping_mutexes) {
+ rb_mutex_t *mutex = th->keeping_mutexes;
+ th->keeping_mutexes = mutex->next_mutex;
+
+ /* rb_warn("mutex #<%p> remains to be locked by terminated thread", (void *)mutexes); */
- while (mutexes) {
- mutex = mutexes;
- /* rb_warn("mutex #<%p> remains to be locked by terminated thread",
- (void *)mutexes); */
- mutexes = mutex->next_mutex;
- err = rb_mutex_unlock_th(mutex, th, mutex->fiber);
- if (err) rb_bug("invalid keeping_mutexes: %s", err);
+ const char *error_message = rb_mutex_unlock_th(mutex, th, mutex->fiber);
+ if (error_message) rb_bug("invalid keeping_mutexes: %s", error_message);
}
}
@@ -816,87 +814,87 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
th->ec->machine.stack_start = STACK_DIR_UPPER(vm_stack + size, vm_stack);
th->ec->machine.stack_maxsize -= size * sizeof(VALUE);
- {
- thread_debug("thread start (get lock): %p\n", (void *)th);
+ thread_debug("thread start (get lock): %p\n", (void *)th);
- EC_PUSH_TAG(th->ec);
- if ((state = EC_EXEC_TAG()) == TAG_NONE) {
- SAVE_ROOT_JMPBUF(th, thread_do_start(th));
- }
- else {
- errinfo = th->ec->errinfo;
+ EC_PUSH_TAG(th->ec);
- if (state == TAG_FATAL) {
- if (th->invoke_type == thread_invoke_type_ractor_proc) {
- rb_ractor_atexit(th->ec, Qnil);
- }
- /* fatal error within this thread, need to stop whole script */
- }
- else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) {
- /* exit on main_thread. */
- }
- else {
- if (th->report_on_exception) {
- VALUE mesg = rb_thread_to_s(th->self);
- rb_str_cat_cstr(mesg, " terminated with exception (report_on_exception is true):\n");
- rb_write_error_str(mesg);
- rb_ec_error_print(th->ec, errinfo);
- }
+ if ((state = EC_EXEC_TAG()) == TAG_NONE) {
+ SAVE_ROOT_JMPBUF(th, thread_do_start(th));
+ } else {
+ errinfo = th->ec->errinfo;
- if (th->invoke_type == thread_invoke_type_ractor_proc) {
- rb_ractor_atexit_exception(th->ec);
- }
+ if (state == TAG_FATAL) {
+ if (th->invoke_type == thread_invoke_type_ractor_proc) {
+ rb_ractor_atexit(th->ec, Qnil);
+ }
+ /* fatal error within this thread, need to stop whole script */
+ }
+ else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) {
+ /* exit on main_thread. */
+ }
+ else {
+ if (th->report_on_exception) {
+ VALUE mesg = rb_thread_to_s(th->self);
+ rb_str_cat_cstr(mesg, " terminated with exception (report_on_exception is true):\n");
+ rb_write_error_str(mesg);
+ rb_ec_error_print(th->ec, errinfo);
+ }
- if (th->vm->thread_abort_on_exception ||
- th->abort_on_exception || RTEST(ruby_debug)) {
- /* exit on main_thread */
- }
- else {
- errinfo = Qnil;
- }
- }
- th->value = Qnil;
- }
+ if (th->invoke_type == thread_invoke_type_ractor_proc) {
+ rb_ractor_atexit_exception(th->ec);
+ }
- if (th->invoke_type == thread_invoke_type_ractor_proc) {
- rb_thread_terminate_all(th);
- rb_ractor_teardown(th->ec);
+ if (th->vm->thread_abort_on_exception ||
+ th->abort_on_exception || RTEST(ruby_debug)) {
+ /* exit on main_thread */
+ }
+ else {
+ errinfo = Qnil;
+ }
}
+ th->value = Qnil;
+ }
- th->status = THREAD_KILLED;
- thread_debug("thread end: %p\n", (void *)th);
+ if (th->invoke_type == thread_invoke_type_ractor_proc) {
+ rb_thread_terminate_all(th);
+ rb_ractor_teardown(th->ec);
+ }
- if (th->vm->ractor.main_thread == th) {
- ruby_stop(0);
- }
+ th->status = THREAD_KILLED;
+ thread_debug("thread end: %p\n", (void *)th);
- if (RB_TYPE_P(errinfo, T_OBJECT)) {
- /* treat with normal error object */
- rb_threadptr_raise(ractor_main_th, 1, &errinfo);
- }
- EC_POP_TAG();
+ if (th->vm->ractor.main_thread == th) {
+ ruby_stop(0);
+ }
- rb_ec_clear_current_thread_trace_func(th->ec);
+ if (RB_TYPE_P(errinfo, T_OBJECT)) {
+ /* treat with normal error object */
+ rb_threadptr_raise(ractor_main_th, 1, &errinfo);
+ }
- /* locking_mutex must be Qfalse */
- if (th->locking_mutex != Qfalse) {
- rb_bug("thread_start_func_2: locking_mutex must not be set (%p:%"PRIxVALUE")",
- (void *)th, th->locking_mutex);
- }
+ rb_threadptr_join_list_wakeup(th);
+ rb_threadptr_unlock_all_locking_mutexes(th);
- if (ractor_main_th->status == THREAD_KILLED &&
- th->ractor->threads.cnt <= 2 /* main thread and this thread */) {
- /* I'm last thread. wake up main thread from rb_thread_terminate_all */
- rb_threadptr_interrupt(ractor_main_th);
- }
+ EC_POP_TAG();
+
+ rb_ec_clear_current_thread_trace_func(th->ec);
- rb_threadptr_join_list_wakeup(th);
- rb_threadptr_unlock_all_locking_mutexes(th);
- rb_check_deadlock(th->ractor);
+ /* locking_mutex must be Qfalse */
+ if (th->locking_mutex != Qfalse) {
+ rb_bug("thread_start_func_2: locking_mutex must not be set (%p:%"PRIxVALUE")",
+ (void *)th, th->locking_mutex);
+ }
- rb_fiber_close(th->ec->fiber_ptr);
+ if (ractor_main_th->status == THREAD_KILLED &&
+ th->ractor->threads.cnt <= 2 /* main thread and this thread */) {
+ /* I'm last thread. wake up main thread from rb_thread_terminate_all */
+ rb_threadptr_interrupt(ractor_main_th);
}
+ rb_check_deadlock(th->ractor);
+
+ rb_fiber_close(th->ec->fiber_ptr);
+
thread_cleanup_func(th, FALSE);
VM_ASSERT(th->ec->vm_stack == NULL);