diff options
-rw-r--r-- | test/fiber/scheduler.rb | 16 | ||||
-rw-r--r-- | test/fiber/test_thread.rb | 39 | ||||
-rw-r--r-- | thread.c | 75 | ||||
-rw-r--r-- | vm.c | 17 |
4 files changed, 111 insertions, 36 deletions
diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb index 0553e386f5..df82c2ff93 100644 --- a/test/fiber/scheduler.rb +++ b/test/fiber/scheduler.rb @@ -100,8 +100,10 @@ class Scheduler self.run ensure - @urgent.each(&:close) - @urgent = nil + if @urgent + @urgent.each(&:close) + @urgent = nil + end @closed = true @@ -196,3 +198,13 @@ class BrokenUnblockScheduler < Scheduler raise "Broken unblock!" end end + +class SleepingUnblockScheduler < Scheduler + # This method is invoked when the thread is exiting. + def unblock(blocker, fiber) + super + + # This changes the current thread state to `THREAD_RUNNING` which causes `thread_join_sleep` to hang. + sleep(0.1) + end +end diff --git a/test/fiber/test_thread.rb b/test/fiber/test_thread.rb index 843604b5f1..5c25c43de2 100644 --- a/test/fiber/test_thread.rb +++ b/test/fiber/test_thread.rb @@ -20,6 +20,31 @@ class TestFiberThread < Test::Unit::TestCase assert_equal :done, thread.value end + def test_thread_join_implicit + sleeping = false + finished = false + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + sleeping = true + sleep(0.1) + finished = true + end + + :done + end + + Thread.pass until sleeping + + thread.join + + assert_equal :done, thread.value + assert finished, "Scheduler thread's task should be finished!" + end + def test_thread_join_blocking thread = Thread.new do scheduler = Scheduler.new @@ -66,4 +91,18 @@ class TestFiberThread < Test::Unit::TestCase thread.join end end + + def test_thread_join_hang + thread = Thread.new do + scheduler = SleepingUnblockScheduler.new + + Fiber.set_scheduler scheduler + + Fiber.schedule do + Thread.new{sleep(0.01)}.value + end + end + + thread.join + end end @@ -631,6 +631,7 @@ thread_cleanup_func_before_exec(void *th_ptr) { rb_thread_t *th = th_ptr; th->status = THREAD_KILLED; + // The thread stack doesn't exist in the forked process: th->ec->machine.stack_start = th->ec->machine.stack_end = NULL; @@ -687,7 +688,7 @@ rb_vm_proc_local_ep(VALUE proc) VALUE rb_vm_invoke_proc_with_self(rb_execution_context_t *ec, rb_proc_t *proc, VALUE self, int argc, const VALUE *argv, int kw_splat, VALUE passed_block_handler); -static void +static VALUE thread_do_start_proc(rb_thread_t *th) { VALUE args = th->invoke_arg.proc.args; @@ -701,7 +702,6 @@ thread_do_start_proc(rb_thread_t *th) th->ec->root_lep = rb_vm_proc_local_ep(procval); th->ec->root_svar = Qfalse; - EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef); vm_check_ints_blocking(th->ec); if (th->invoke_type == thread_invoke_type_ractor_proc) { @@ -712,11 +712,12 @@ thread_do_start_proc(rb_thread_t *th) rb_ractor_receive_parameters(th->ec, th->ractor, args_len, (VALUE *)args_ptr); vm_check_ints_blocking(th->ec); - // kick thread - th->value = rb_vm_invoke_proc_with_self(th->ec, proc, self, - args_len, args_ptr, - th->invoke_arg.proc.kw_splat, - VM_BLOCK_HANDLER_NONE); + return rb_vm_invoke_proc_with_self( + th->ec, proc, self, + args_len, args_ptr, + th->invoke_arg.proc.kw_splat, + VM_BLOCK_HANDLER_NONE + ); } else { args_len = RARRAY_LENINT(args); @@ -732,17 +733,12 @@ thread_do_start_proc(rb_thread_t *th) vm_check_ints_blocking(th->ec); - // kick thread - th->value = rb_vm_invoke_proc(th->ec, proc, - args_len, args_ptr, - th->invoke_arg.proc.kw_splat, - VM_BLOCK_HANDLER_NONE); - } - - EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef); - - if (th->invoke_type == thread_invoke_type_ractor_proc) { - rb_ractor_atexit(th->ec, th->value); + return rb_vm_invoke_proc( + th->ec, proc, + args_len, args_ptr, + th->invoke_arg.proc.kw_splat, + VM_BLOCK_HANDLER_NONE + ); } } @@ -750,20 +746,33 @@ static void thread_do_start(rb_thread_t *th) { native_set_thread_name(th); + VALUE result = Qundef; + + EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef); switch (th->invoke_type) { case thread_invoke_type_proc: + result = thread_do_start_proc(th); + break; + case thread_invoke_type_ractor_proc: - thread_do_start_proc(th); + result = thread_do_start_proc(th); + rb_ractor_atexit(th->ec, result); break; + case thread_invoke_type_func: - th->value = (*th->invoke_arg.func.func)(th->invoke_arg.func.arg); + result = (*th->invoke_arg.func.func)(th->invoke_arg.func.arg); break; + case thread_invoke_type_none: rb_bug("unreachable"); } rb_scheduler_set(Qnil); + + th->value = result; + + EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef); } void rb_ec_clear_current_thread_trace_func(const rb_execution_context_t *ec); @@ -816,6 +825,9 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start) thread_debug("thread start (get lock): %p\n", (void *)th); + // Ensure that we are not joinable. + VM_ASSERT(th->value == Qundef); + EC_PUSH_TAG(th->ec); if ((state = EC_EXEC_TAG()) == TAG_NONE) { @@ -855,6 +867,12 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start) th->value = Qnil; } + // The thread is effectively finished and can be joined. + VM_ASSERT(th->value != Qundef); + + rb_threadptr_join_list_wakeup(th); + rb_threadptr_unlock_all_locking_mutexes(th); + if (th->invoke_type == thread_invoke_type_ractor_proc) { rb_thread_terminate_all(th); rb_ractor_teardown(th->ec); @@ -872,9 +890,6 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start) rb_threadptr_raise(ractor_main_th, 1, &errinfo); } - rb_threadptr_join_list_wakeup(th); - rb_threadptr_unlock_all_locking_mutexes(th); - EC_POP_TAG(); rb_ec_clear_current_thread_trace_func(th->ec); @@ -1151,6 +1166,12 @@ remove_from_join_list(VALUE arg) static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double); +static int +thread_finished(rb_thread_t *th) +{ + return th->status == THREAD_KILLED || th->value != Qundef; +} + static VALUE thread_join_sleep(VALUE arg) { @@ -1177,7 +1198,7 @@ thread_join_sleep(VALUE arg) end = rb_hrtime_add(*limit, rb_hrtime_now()); } - while (target_th->status != THREAD_KILLED) { + while (!thread_finished(target_th)) { VALUE scheduler = rb_scheduler_current(); if (scheduler != Qnil) { @@ -3318,11 +3339,11 @@ rb_thread_status(VALUE thread) static VALUE rb_thread_alive_p(VALUE thread) { - if (rb_threadptr_dead(rb_thread_ptr(thread))) { - return Qfalse; + if (thread_finished(rb_thread_ptr(thread))) { + return Qfalse; } else { - return Qtrue; + return Qtrue; } } @@ -3083,6 +3083,8 @@ th_init(rb_thread_t *th, VALUE self) th->thread_id_string[0] = '\0'; #endif + th->value = Qundef; + #if OPT_CALL_THREADED_CODE th->retval = Qundef; #endif @@ -3095,16 +3097,17 @@ static VALUE ruby_thread_init(VALUE self) { rb_thread_t *th = GET_THREAD(); - rb_thread_t *targe_th = rb_thread_ptr(self); + rb_thread_t *target_th = rb_thread_ptr(self); rb_vm_t *vm = th->vm; - targe_th->vm = vm; - th_init(targe_th, self); + target_th->vm = vm; + th_init(target_th, self); + + target_th->top_wrapper = 0; + target_th->top_self = rb_vm_top_self(); + target_th->ec->root_svar = Qfalse; + target_th->ractor = th->ractor; - targe_th->top_wrapper = 0; - targe_th->top_self = rb_vm_top_self(); - targe_th->ec->root_svar = Qfalse; - targe_th->ractor = th->ractor; return self; } |