Diffstat (limited to 'thread.c')
-rw-r--r--	thread.c	1063
1 file changed, 612 insertions, 451 deletions
@@ -73,12 +73,12 @@
 #define TH_SCHED(th) (&(th)->ractor->threads.sched)
 
 #include "eval_intern.h"
-#include "gc.h"
 #include "hrtime.h"
 #include "internal.h"
 #include "internal/class.h"
 #include "internal/cont.h"
 #include "internal/error.h"
+#include "internal/gc.h"
 #include "internal/hash.h"
 #include "internal/io.h"
 #include "internal/object.h"
@@ -89,7 +89,7 @@
 #include "internal/time.h"
 #include "internal/warnings.h"
 #include "iseq.h"
-#include "mjit.h"
+#include "rjit.h"
 #include "ruby/debug.h"
 #include "ruby/io.h"
 #include "ruby/thread.h"
@@ -100,7 +100,7 @@
 #include "vm_debug.h"
 #include "vm_sync.h"
 
-#if USE_MJIT && defined(HAVE_SYS_WAIT_H)
+#if USE_RJIT && defined(HAVE_SYS_WAIT_H)
 #include <sys/wait.h>
 #endif
@@ -116,11 +116,6 @@
 static VALUE sym_immediate;
 static VALUE sym_on_blocking;
 static VALUE sym_never;
 
-enum SLEEP_FLAGS {
-    SLEEP_DEADLOCKABLE = 0x1,
-    SLEEP_SPURIOUS_CHECK = 0x2
-};
-
 #define THREAD_LOCAL_STORAGE_INITIALISED FL_USER13
 #define THREAD_LOCAL_STORAGE_INITIALISED_P(th) RB_FL_TEST_RAW((th), THREAD_LOCAL_STORAGE_INITIALISED)
@@ -134,8 +129,16 @@ rb_thread_local_storage(VALUE thread)
     return rb_ivar_get(thread, idLocals);
 }
 
-static int sleep_hrtime(rb_thread_t *, rb_hrtime_t, unsigned int fl);
+enum SLEEP_FLAGS {
+    SLEEP_DEADLOCKABLE   = 0x01,
+    SLEEP_SPURIOUS_CHECK = 0x02,
+    SLEEP_ALLOW_SPURIOUS = 0x04,
+    SLEEP_NO_CHECKINTS   = 0x08,
+};
+
 static void sleep_forever(rb_thread_t *th, unsigned int fl);
+static int sleep_hrtime(rb_thread_t *, rb_hrtime_t, unsigned int fl);
+
 static void rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker, VALUE timeout, rb_hrtime_t end);
 static int rb_threadptr_dead(rb_thread_t *th);
 static void rb_check_deadlock(rb_ractor_t *r);
@@ -143,18 +146,16 @@
 static int rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th);
 static const char *thread_status_name(rb_thread_t *th, int detail);
 static int hrtime_update_expire(rb_hrtime_t *, const rb_hrtime_t);
 NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
-static int consume_communication_pipe(int fd);
-static int check_signals_nogvl(rb_thread_t *, int sigwait_fd);
-void rb_sigwait_fd_migrate(rb_vm_t *); /* process.c */
+MAYBE_UNUSED(static int consume_communication_pipe(int fd));
 
-#define eKillSignal INT2FIX(0)
-#define eTerminateSignal INT2FIX(1)
 static volatile int system_working = 1;
+static rb_internal_thread_specific_key_t specific_key_count;
 
 struct waiting_fd {
     struct ccan_list_node wfd_node; /* <=> vm.waiting_fds */
     rb_thread_t *th;
     int fd;
+    struct rb_io_close_wait_list *busy;
 };
@@ -174,11 +175,11 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region);
 
 #define THREAD_BLOCKING_BEGIN(th) do { \
     struct rb_thread_sched * const sched = TH_SCHED(th); \
-    RB_GC_SAVE_MACHINE_CONTEXT(th); \
-    thread_sched_to_waiting(sched);
+    RB_VM_SAVE_MACHINE_CONTEXT(th); \
+    thread_sched_to_waiting((sched), (th));
 
 #define THREAD_BLOCKING_END(th) \
-    thread_sched_to_running(sched, th); \
+    thread_sched_to_running((sched), (th)); \
     rb_ractor_thread_switch(th->ractor, th); \
 } while(0)
@@ -196,6 +197,10 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region);
     if (blocking_region_begin(th, &__region, (ubf), (ubfarg), fail_if_interrupted) || \
         /* always return true unless fail_if_interrupted */ \
         !only_if_constant(fail_if_interrupted, TRUE)) { \
+        /* Important that this is inlined into the macro, and not part of \
+         * blocking_region_begin - see bug #20493 */ \
+        RB_VM_SAVE_MACHINE_CONTEXT(th); \
+        thread_sched_to_waiting(TH_SCHED(th), th); \
         exec; \
         blocking_region_end(th, &__region); \
     }; \
@@ -258,13 +263,8 @@ timeout_prepare(rb_hrtime_t **to, rb_hrtime_t *rel, rb_hrtime_t *end,
 }
 
 MAYBE_UNUSED(NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start)));
-void ruby_sigchld_handler(rb_vm_t *); /* signal.c */
-
-static void
-ubf_sigwait(void *ignore)
-{
-    rb_thread_wakeup_timer_thread(0);
-}
+MAYBE_UNUSED(static bool th_has_dedicated_nt(const rb_thread_t *th));
+MAYBE_UNUSED(static int waitfd_to_waiting_flag(int wfd_event));
 
 #include THREAD_IMPL_SRC
@@ -344,19 +344,23 @@ unblock_function_clear(rb_thread_t *th)
 static void
 rb_threadptr_interrupt_common(rb_thread_t *th, int trap)
 {
+    RUBY_DEBUG_LOG("th:%u trap:%d", rb_th_serial(th), trap);
+
     rb_native_mutex_lock(&th->interrupt_lock);
+    {
+        if (trap) {
+            RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
+        }
+        else {
+            RUBY_VM_SET_INTERRUPT(th->ec);
+        }
 
-    if (trap) {
-        RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
-    }
-    else {
-        RUBY_VM_SET_INTERRUPT(th->ec);
-    }
-    if (th->unblock.func != NULL) {
-        (th->unblock.func)(th->unblock.arg);
-    }
-    else {
-        /* none */
+        if (th->unblock.func != NULL) {
+            (th->unblock.func)(th->unblock.arg);
+        }
+        else {
+            /* none */
+        }
     }
     rb_native_mutex_unlock(&th->interrupt_lock);
 }
@@ -364,6 +368,7 @@ rb_threadptr_interrupt_common(rb_thread_t *th, int trap)
 void
 rb_threadptr_interrupt(rb_thread_t *th)
 {
+    RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
     rb_threadptr_interrupt_common(th, 0);
 }
@@ -382,7 +387,7 @@ terminate_all(rb_ractor_t *r, const rb_thread_t *main_thread)
         if (th != main_thread) {
             RUBY_DEBUG_LOG("terminate start th:%u status:%s", rb_th_serial(th), thread_status_name(th, TRUE));
 
-            rb_threadptr_pending_interrupt_enque(th, eTerminateSignal);
+            rb_threadptr_pending_interrupt_enque(th, RUBY_FATAL_THREAD_TERMINATED);
             rb_threadptr_interrupt(th);
 
             RUBY_DEBUG_LOG("terminate done th:%u status:%s", rb_th_serial(th), thread_status_name(th, TRUE));
@@ -411,11 +416,12 @@ rb_threadptr_join_list_wakeup(rb_thread_t *thread)
             rb_threadptr_interrupt(target_thread);
 
             switch (target_thread->status) {
-              case THREAD_STOPPED:
-              case THREAD_STOPPED_FOREVER:
-                target_thread->status = THREAD_RUNNABLE;
-              default:
-                break;
+              case THREAD_STOPPED:
+              case THREAD_STOPPED_FOREVER:
+                target_thread->status = THREAD_RUNNABLE;
+                break;
+              default:
+                break;
             }
         }
     }
@@ -428,7 +434,7 @@ rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th)
         rb_mutex_t *mutex = th->keeping_mutexes;
         th->keeping_mutexes = mutex->next_mutex;
 
-        /* rb_warn("mutex #<%p> remains to be locked by terminated thread", (void *)mutexes); */
+        // rb_warn("mutex #<%p> was not unlocked by thread #<%p>", (void *)mutex, (void*)th);
 
         const char *error_message = rb_mutex_unlock_th(mutex, th, mutex->fiber);
         if (error_message) rb_bug("invalid keeping_mutexes: %s", error_message);
@@ -509,25 +515,22 @@ thread_cleanup_func(void *th_ptr, int atfork)
      * Unfortunately, we can't release native threading resource at fork
      * because libc may have unstable locking state therefore touching
      * a threading resource may cause a deadlock.
-     *
-     * FIXME: Skipping native_mutex_destroy(pthread_mutex_destroy) is safe
-     * with NPTL, but native_thread_destroy calls pthread_cond_destroy
-     * which calls free(3), so there is a small memory leak atfork, here.
      */
-    if (atfork)
+    if (atfork) {
+        th->nt = NULL;
         return;
+    }
 
     rb_native_mutex_destroy(&th->interrupt_lock);
-    native_thread_destroy(th);
 }
 
 static VALUE rb_threadptr_raise(rb_thread_t *, int, VALUE *);
 static VALUE rb_thread_to_s(VALUE thread);
 
 void
-ruby_thread_init_stack(rb_thread_t *th)
+ruby_thread_init_stack(rb_thread_t *th, void *local_in_parent_frame)
 {
-    native_thread_init_stack(th);
+    native_thread_init_stack(th, local_in_parent_frame);
 }
 
 const VALUE *
@@ -583,7 +586,7 @@ thread_do_start_proc(rb_thread_t *th)
         if (args_len < 8) {
             /* free proc.args if the length is enough small */
             args_ptr = ALLOCA_N(VALUE, args_len);
-            MEMCPY((VALUE *)args_ptr, RARRAY_CONST_PTR_TRANSIENT(args), VALUE, args_len);
+            MEMCPY((VALUE *)args_ptr, RARRAY_CONST_PTR(args), VALUE, args_len);
             th->invoke_arg.proc.args = Qnil;
         }
         else {
@@ -601,14 +604,12 @@ thread_do_start_proc(rb_thread_t *th)
     }
 }
 
-static void
+static VALUE
 thread_do_start(rb_thread_t *th)
 {
     native_set_thread_name(th);
     VALUE result = Qundef;
 
-    EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef);
-
     switch (th->invoke_type) {
       case thread_invoke_type_proc:
         result = thread_do_start_proc(th);
@@ -627,34 +628,20 @@ thread_do_start(rb_thread_t *th)
         rb_bug("unreachable");
     }
 
-    rb_fiber_scheduler_set(Qnil);
-
-    th->value = result;
-
-    EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef);
+    return result;
 }
 
 void rb_ec_clear_current_thread_trace_func(const rb_execution_context_t *ec);
 
-#define thread_sched_to_dead thread_sched_to_waiting
-
 static int
 thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
 {
-    STACK_GROW_DIR_DETECTION;
+    RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
+    VM_ASSERT(th != th->vm->ractor.main_thread);
+
     enum ruby_tag_type state;
    VALUE errinfo = Qnil;
-    size_t size = th->vm->default_params.thread_vm_stack_size / sizeof(VALUE);
     rb_thread_t *ractor_main_th = th->ractor->threads.main;
-    VALUE * vm_stack = NULL;
-
-    VM_ASSERT(th != th->vm->ractor.main_thread);
-    RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
-
-    // setup native thread
-    thread_sched_to_running(TH_SCHED(th), th);
-    ruby_thread_set_native(th);
-
-    RUBY_DEBUG_LOG("got lock. th:%u", rb_th_serial(th));
 
     // setup ractor
     if (rb_ractor_status_p(th->ractor, ractor_blocking)) {
@@ -669,26 +656,34 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
         RB_VM_UNLOCK();
     }
 
-    // This assertion is not passed on win32 env. Check it later.
-    // VM_ASSERT((size * sizeof(VALUE)) <= th->ec->machine.stack_maxsize);
-
-    // setup VM and machine stack
-    vm_stack = alloca(size * sizeof(VALUE));
-    VM_ASSERT(vm_stack);
-
-    rb_ec_initialize_vm_stack(th->ec, vm_stack, size);
-    th->ec->machine.stack_start = STACK_DIR_UPPER(vm_stack + size, vm_stack);
-    th->ec->machine.stack_maxsize -= size * sizeof(VALUE);
-
     // Ensure that we are not joinable.
     VM_ASSERT(UNDEF_P(th->value));
 
+    int fiber_scheduler_closed = 0, event_thread_end_hooked = 0;
+    VALUE result = Qundef;
+
     EC_PUSH_TAG(th->ec);
+
     if ((state = EC_EXEC_TAG()) == TAG_NONE) {
-        SAVE_ROOT_JMPBUF(th, thread_do_start(th));
+        EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef);
+
+        result = thread_do_start(th);
     }
-    else {
+
+    if (!fiber_scheduler_closed) {
+        fiber_scheduler_closed = 1;
+        rb_fiber_scheduler_set(Qnil);
+    }
+
+    if (!event_thread_end_hooked) {
+        event_thread_end_hooked = 1;
+        EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef);
+    }
+
+    if (state == TAG_NONE) {
+        // This must be set AFTER doing all user-level code. At this point, the thread
+        // is effectively finished and calls to `Thread#join` will succeed.
+        th->value = result;
+    }
+    else {
         errinfo = th->ec->errinfo;
 
         VALUE exc = rb_vm_make_jump_tag_but_local_jump(state, Qundef);
@@ -776,12 +771,12 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
         // after rb_ractor_living_threads_remove()
        // GC will happen anytime and this ractor can be collected (and destroy GVL).
        // So gvl_release() should be before it.
-        thread_sched_to_dead(TH_SCHED(th));
+        thread_sched_to_dead(TH_SCHED(th), th);
         rb_ractor_living_threads_remove(th->ractor, th);
     }
     else {
         rb_ractor_living_threads_remove(th->ractor, th);
-        thread_sched_to_dead(TH_SCHED(th));
+        thread_sched_to_dead(TH_SCHED(th), th);
     }
 
     return 0;
@@ -801,6 +796,8 @@ struct thread_create_params {
     VALUE (*fn)(void *);
 };
 
+static void thread_specific_storage_alloc(rb_thread_t *th);
+
 static VALUE
 thread_create_core(VALUE thval, struct thread_create_params *params)
 {
@@ -808,6 +805,8 @@ thread_create_core(VALUE thval, struct thread_create_params *params)
     rb_thread_t *th = rb_thread_ptr(thval), *current_th = rb_ec_thread_ptr(ec);
     int err;
 
+    thread_specific_storage_alloc(th);
+
     if (OBJ_FROZEN(current_th->thgroup)) {
         rb_raise(rb_eThreadError,
                  "can't start a new thread (frozen ThreadGroup)");
@@ -905,7 +904,7 @@ thread_s_new(int argc, VALUE *argv, VALUE klass)
     rb_obj_call_init_kw(thread, argc, argv, RB_PASS_CALLED_KEYWORDS);
     th = rb_thread_ptr(thread);
     if (!threadptr_initialized(th)) {
-        rb_raise(rb_eThreadError, "uninitialized thread - check `%"PRIsVALUE"#initialize'",
+        rb_raise(rb_eThreadError, "uninitialized thread - check '%"PRIsVALUE"#initialize'",
                 klass);
     }
     return thread;
@@ -985,11 +984,11 @@ rb_thread_create(VALUE (*fn)(void *), void *arg)
 }
 
 VALUE
-rb_thread_create_ractor(rb_ractor_t *g, VALUE args, VALUE proc)
+rb_thread_create_ractor(rb_ractor_t *r, VALUE args, VALUE proc)
 {
     struct thread_create_params params = {
         .type = thread_invoke_type_ractor_proc,
-        .g = g,
+        .g = r,
         .args = args,
         .proc = proc,
     };
@@ -1048,13 +1047,13 @@ thread_join_sleep(VALUE arg)
 
         if (scheduler != Qnil) {
             rb_fiber_scheduler_block(scheduler, target_th->self, p->timeout);
+            // Check if the target thread is finished after blocking:
+            if (thread_finished(target_th)) break;
+            // Otherwise, a timeout occurred:
+            else return Qfalse;
         }
         else if (!limit) {
-            th->status = THREAD_STOPPED_FOREVER;
-            rb_ractor_sleeper_threads_inc(th->ractor);
-            rb_check_deadlock(th->ractor);
-            native_sleep(th, 0);
-            rb_ractor_sleeper_threads_dec(th->ractor);
+            sleep_forever(th, SLEEP_DEADLOCKABLE | SLEEP_ALLOW_SPURIOUS | SLEEP_NO_CHECKINTS);
         }
         else {
             if (hrtime_update_expire(limit, end)) {
@@ -1069,6 +1068,7 @@ thread_join_sleep(VALUE arg)
             RUBY_DEBUG_LOG("interrupted target_th:%u status:%s", rb_th_serial(target_th), thread_status_name(target_th, TRUE));
         }
     }
+
     return Qtrue;
 }
@@ -1256,32 +1256,6 @@ rb_hrtime_now(void)
     return rb_timespec2hrtime(&ts);
 }
 
-static void
-sleep_forever(rb_thread_t *th, unsigned int fl)
-{
-    enum rb_thread_status prev_status = th->status;
-    enum rb_thread_status status;
-    int woke;
-
-    status = fl & SLEEP_DEADLOCKABLE ? THREAD_STOPPED_FOREVER : THREAD_STOPPED;
-    th->status = status;
-    RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
-    while (th->status == status) {
-        if (fl & SLEEP_DEADLOCKABLE) {
-            rb_ractor_sleeper_threads_inc(th->ractor);
-            rb_check_deadlock(th->ractor);
-        }
-        native_sleep(th, 0);
-        if (fl & SLEEP_DEADLOCKABLE) {
-            rb_ractor_sleeper_threads_dec(th->ractor);
-        }
-        woke = vm_check_ints_blocking(th->ec);
-        if (woke && !(fl & SLEEP_SPURIOUS_CHECK))
-            break;
-    }
-    th->status = prev_status;
-}
-
 /*
  * at least gcc 7.2 and 7.3 complains about "rb_hrtime_t end"
  * being uninitialized, maybe other versions, too.
@@ -1356,32 +1330,56 @@ sleep_hrtime_until(rb_thread_t *th, rb_hrtime_t end, unsigned int fl)
     return woke;
 }
 
+static void
+sleep_forever(rb_thread_t *th, unsigned int fl)
+{
+    enum rb_thread_status prev_status = th->status;
+    enum rb_thread_status status;
+    int woke;
+
+    status = fl & SLEEP_DEADLOCKABLE ? THREAD_STOPPED_FOREVER : THREAD_STOPPED;
+    th->status = status;
+
+    if (!(fl & SLEEP_NO_CHECKINTS)) RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
+
+    while (th->status == status) {
+        if (fl & SLEEP_DEADLOCKABLE) {
+            rb_ractor_sleeper_threads_inc(th->ractor);
+            rb_check_deadlock(th->ractor);
+        }
+        {
+            native_sleep(th, 0);
+        }
+        if (fl & SLEEP_DEADLOCKABLE) {
+            rb_ractor_sleeper_threads_dec(th->ractor);
+        }
+        if (fl & SLEEP_ALLOW_SPURIOUS) {
+            break;
+        }
+
+        woke = vm_check_ints_blocking(th->ec);
+
+        if (woke && !(fl & SLEEP_SPURIOUS_CHECK)) {
+            break;
+        }
+    }
+    th->status = prev_status;
+}
+
 void
 rb_thread_sleep_forever(void)
 {
-    RUBY_DEBUG_LOG("");
+    RUBY_DEBUG_LOG("forever");
     sleep_forever(GET_THREAD(), SLEEP_SPURIOUS_CHECK);
 }
 
 void
 rb_thread_sleep_deadly(void)
 {
-    RUBY_DEBUG_LOG("");
+    RUBY_DEBUG_LOG("deadly");
     sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE|SLEEP_SPURIOUS_CHECK);
 }
 
-void
-rb_thread_sleep_interruptible(void)
-{
-    rb_thread_t *th = GET_THREAD();
-    enum rb_thread_status prev_status = th->status;
-
-    th->status = THREAD_STOPPED;
-    native_sleep(th, 0);
-    RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
-    th->status = prev_status;
-}
-
 static void
 rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker, VALUE timeout, rb_hrtime_t end)
 {
@@ -1390,7 +1388,7 @@ rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker, VALUE timeout, rb_hrtime_t end)
         rb_fiber_scheduler_block(scheduler, blocker, timeout);
     }
     else {
-        RUBY_DEBUG_LOG("");
+        RUBY_DEBUG_LOG("...");
        if (end) {
             sleep_hrtime_until(GET_THREAD(), end, SLEEP_SPURIOUS_CHECK);
         }
@@ -1454,7 +1452,7 @@ rb_thread_schedule_limits(uint32_t limits_us)
         if (th->running_time_us >= limits_us) {
             RUBY_DEBUG_LOG("switch %s", "start");
 
-            RB_GC_SAVE_MACHINE_CONTEXT(th);
+            RB_VM_SAVE_MACHINE_CONTEXT(th);
             thread_sched_yield(TH_SCHED(th), th);
             rb_ractor_thread_switch(th->ractor, th);
@@ -1487,10 +1485,7 @@ blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region,
         th->status = THREAD_STOPPED;
         rb_ractor_blocking_threads_inc(th->ractor, __FILE__, __LINE__);
 
-        RUBY_DEBUG_LOG("");
-
-        RB_GC_SAVE_MACHINE_CONTEXT(th);
-        thread_sched_to_waiting(TH_SCHED(th));
+        RUBY_DEBUG_LOG("thread_id:%p", (void *)th->nt->thread_id);
         return TRUE;
     }
     else {
@@ -1515,8 +1510,12 @@ blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region)
         th->status = region->prev_status;
     }
 
-    RUBY_DEBUG_LOG("");
+    RUBY_DEBUG_LOG("end");
+
+#ifndef _WIN32
+    // GET_THREAD() clears WSAGetLastError()
     VM_ASSERT(th == GET_THREAD());
+#endif
 }
@@ -1540,15 +1539,14 @@ rb_nogvl(void *(*func)(void *), void *data1,
         if (flags & RB_NOGVL_UBF_ASYNC_SAFE) {
             vm->ubf_async_safe = 1;
         }
-        else {
-            ubf_th = rb_thread_start_unblock_thread();
-        }
     }
 
+    rb_vm_t *volatile saved_vm = vm;
     BLOCKING_REGION(th, {
         val = func(data1);
-        saved_errno = errno;
+        saved_errno = rb_errno();
     }, ubf, data2, flags & RB_NOGVL_INTR_FAIL);
+    vm = saved_vm;
 
     if (is_main_thread) vm->ubf_async_safe = 0;
@@ -1560,7 +1558,7 @@ rb_nogvl(void *(*func)(void *), void *data1,
         thread_value(rb_thread_kill(ubf_th));
     }
 
-    errno = saved_errno;
+    rb_errno_set(saved_errno);
 
     return val;
 }
@@ -1664,18 +1662,130 @@ rb_thread_call_without_gvl(void *(*func)(void *data), void *data1,
     return rb_nogvl(func, data1, ubf, data2, 0);
 }
 
+static int
+waitfd_to_waiting_flag(int wfd_event)
+{
+    return wfd_event << 1;
+}
+
+static void
+thread_io_setup_wfd(rb_thread_t *th, int fd, struct waiting_fd *wfd)
+{
+    wfd->fd = fd;
+    wfd->th = th;
+    wfd->busy = NULL;
+
+    RB_VM_LOCK_ENTER();
+    {
+        ccan_list_add(&th->vm->waiting_fds, &wfd->wfd_node);
+    }
+    RB_VM_LOCK_LEAVE();
+}
+
+static void
+thread_io_wake_pending_closer(struct waiting_fd *wfd)
+{
+    bool has_waiter = wfd->busy && RB_TEST(wfd->busy->wakeup_mutex);
+    if (has_waiter) {
+        rb_mutex_lock(wfd->busy->wakeup_mutex);
+    }
+
+    /* Needs to be protected with RB_VM_LOCK because we don't know if
+       wfd is on the global list of pending FD ops or if it's on a
+       struct rb_io_close_wait_list close-waiter. */
+    RB_VM_LOCK_ENTER();
+    ccan_list_del(&wfd->wfd_node);
+    RB_VM_LOCK_LEAVE();
+
+    if (has_waiter) {
+        rb_thread_t *th = rb_thread_ptr(wfd->busy->closing_thread);
+        if (th->scheduler != Qnil) {
+            rb_fiber_scheduler_unblock(th->scheduler, wfd->busy->closing_thread, wfd->busy->closing_fiber);
+        }
+        else {
+            rb_thread_wakeup(wfd->busy->closing_thread);
+        }
+        rb_mutex_unlock(wfd->busy->wakeup_mutex);
+    }
+}
+
+static bool
+thread_io_mn_schedulable(rb_thread_t *th, int events, const struct timeval *timeout)
+{
+#if defined(USE_MN_THREADS) && USE_MN_THREADS
+    return !th_has_dedicated_nt(th) && (events || timeout) && th->blocking;
+#else
+    return false;
+#endif
+}
+
+// true if need retry
+static bool
+thread_io_wait_events(rb_thread_t *th, int fd, int events, const struct timeval *timeout)
+{
+#if defined(USE_MN_THREADS) && USE_MN_THREADS
+    if (thread_io_mn_schedulable(th, events, timeout)) {
+        rb_hrtime_t rel, *prel;
+
+        if (timeout) {
+            rel = rb_timeval2hrtime(timeout);
+            prel = &rel;
+        }
+        else {
+            prel = NULL;
+        }
+
+        VM_ASSERT(prel || (events & (RB_WAITFD_IN | RB_WAITFD_OUT)));
+
+        if (thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), prel)) {
+            // timeout
+            return false;
+        }
+        else {
+            return true;
+        }
+    }
+#endif // defined(USE_MN_THREADS) && USE_MN_THREADS
+    return false;
+}
+
+// assume read/write
+static bool
+blocking_call_retryable_p(int r, int eno)
+{
+    if (r != -1) return false;
+
+    switch (eno) {
+      case EAGAIN:
+#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
+      case EWOULDBLOCK:
+#endif
+        return true;
+      default:
+        return false;
+    }
+}
+
+bool
+rb_thread_mn_schedulable(VALUE thval)
+{
+    rb_thread_t *th = rb_thread_ptr(thval);
+    return th->mn_schedulable;
+}
+
 VALUE
-rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
+rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, int events)
 {
+    rb_execution_context_t *volatile ec = GET_EC();
+    rb_thread_t *volatile th = rb_ec_thread_ptr(ec);
+
+    RUBY_DEBUG_LOG("th:%u fd:%d ev:%d", rb_th_serial(th), fd, events);
+
+    struct waiting_fd waiting_fd;
     volatile VALUE val = Qundef; /* shouldn't be used */
-    rb_execution_context_t * volatile ec = GET_EC();
     volatile int saved_errno = 0;
     enum ruby_tag_type state;
-
-    struct waiting_fd waiting_fd = {
-        .fd = fd,
-        .th = rb_ec_thread_ptr(ec)
-    };
+    bool prev_mn_schedulable = th->mn_schedulable;
+    th->mn_schedulable = thread_io_mn_schedulable(th, events, NULL);
 
     // `errno` is only valid when there is an actual error - but we can't
     // extract that from the return value of `func` alone, so we clear any
@@ -1683,30 +1793,36 @@ rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, int events)
     // `func` or not (as opposed to some previously set value).
     errno = 0;
 
-    RB_VM_LOCK_ENTER();
+    thread_io_setup_wfd(th, fd, &waiting_fd);
     {
-        ccan_list_add(&rb_ec_vm_ptr(ec)->waiting_fds, &waiting_fd.wfd_node);
-    }
-    RB_VM_LOCK_LEAVE();
+        EC_PUSH_TAG(ec);
+        if ((state = EC_EXEC_TAG()) == TAG_NONE) {
+            volatile enum ruby_tag_type saved_state = state; /* for BLOCKING_REGION */
+          retry:
+            BLOCKING_REGION(waiting_fd.th, {
+                val = func(data1);
+                saved_errno = errno;
+            }, ubf_select, waiting_fd.th, FALSE);
+
+            th = rb_ec_thread_ptr(ec);
+            if (events &&
+                blocking_call_retryable_p((int)val, saved_errno) &&
+                thread_io_wait_events(th, fd, events, NULL)) {
+                RUBY_VM_CHECK_INTS_BLOCKING(ec);
+                goto retry;
+            }
+            state = saved_state;
+        }
+        EC_POP_TAG();
 
-    EC_PUSH_TAG(ec);
-    if ((state = EC_EXEC_TAG()) == TAG_NONE) {
-        BLOCKING_REGION(waiting_fd.th, {
-            val = func(data1);
-            saved_errno = errno;
-        }, ubf_select, waiting_fd.th, FALSE);
+        th = rb_ec_thread_ptr(ec);
+        th->mn_schedulable = prev_mn_schedulable;
     }
-    EC_POP_TAG();
-
     /*
      * must be deleted before jump
-     * this will delete either from waiting_fds or on-stack CCAN_LIST_HEAD(busy)
+     * this will delete either from waiting_fds or on-stack struct rb_io_close_wait_list
     */
-    RB_VM_LOCK_ENTER();
-    {
-        ccan_list_del(&waiting_fd.wfd_node);
-    }
-    RB_VM_LOCK_LEAVE();
+    thread_io_wake_pending_closer(&waiting_fd);
 
     if (state) {
         EC_JUMP_TAG(ec, state);
@@ -1724,6 +1840,12 @@ rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, int events)
     return val;
 }
 
+VALUE
+rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
+{
+    return rb_thread_io_blocking_call(func, data1, fd, 0);
+}
+
 /*
  * rb_thread_call_with_gvl - re-enter the Ruby world after GVL release.
 *
@@ -1783,6 +1905,8 @@ rb_thread_call_with_gvl(void *(*func)(void *), void *data1)
     /* leave from Ruby world: You can not access Ruby values, etc. */
     int released = blocking_region_begin(th, brb, prev_unblock.func, prev_unblock.arg, FALSE);
     RUBY_ASSERT_ALWAYS(released);
+    RB_VM_SAVE_MACHINE_CONTEXT(th);
+    thread_sched_to_waiting(TH_SCHED(th), th);
     return r;
 }
@@ -1870,6 +1994,23 @@ enum handle_interrupt_timing {
 };
 
 static enum handle_interrupt_timing
+rb_threadptr_pending_interrupt_from_symbol(rb_thread_t *th, VALUE sym)
+{
+    if (sym == sym_immediate) {
+        return INTERRUPT_IMMEDIATE;
+    }
+    else if (sym == sym_on_blocking) {
+        return INTERRUPT_ON_BLOCKING;
+    }
+    else if (sym == sym_never) {
+        return INTERRUPT_NEVER;
+    }
+    else {
+        rb_raise(rb_eThreadError, "unknown mask signature");
+    }
+}
+
+static enum handle_interrupt_timing
 rb_threadptr_pending_interrupt_check_mask(rb_thread_t *th, VALUE err)
 {
     VALUE mask;
@@ -1881,6 +2022,16 @@ rb_threadptr_pending_interrupt_check_mask(rb_thread_t *th, VALUE err)
     for (i=0; i<mask_stack_len; i++) {
         mask = mask_stack[mask_stack_len-(i+1)];
 
+        if (SYMBOL_P(mask)) {
+            /* do not match RUBY_FATAL_THREAD_KILLED etc */
+            if (err != rb_cInteger) {
+                return rb_threadptr_pending_interrupt_from_symbol(th, mask);
+            }
+            else {
+                continue;
+            }
+        }
+
         for (mod = err; mod; mod = RCLASS_SUPER(mod)) {
             VALUE klass = mod;
             VALUE sym;
@@ -1893,18 +2044,7 @@ rb_threadptr_pending_interrupt_check_mask(rb_thread_t *th, VALUE err)
             }
 
             if ((sym = rb_hash_aref(mask, klass)) != Qnil) {
-                if (sym == sym_immediate) {
-                    return INTERRUPT_IMMEDIATE;
-                }
-                else if (sym == sym_on_blocking) {
-                    return INTERRUPT_ON_BLOCKING;
-                }
-                else if (sym == sym_never) {
-                    return INTERRUPT_NEVER;
-                }
-                else {
-                    rb_raise(rb_eThreadError, "unknown mask signature");
-                }
+                return rb_threadptr_pending_interrupt_from_symbol(th, sym);
             }
         }
         /* try to next mask */
@@ -1996,10 +2136,24 @@ handle_interrupt_arg_check_i(VALUE key, VALUE val, VALUE args)
        rb_raise(rb_eArgError, "unknown mask signature");
     }
 
-    if (!*maskp) {
-        *maskp = rb_ident_hash_new();
+    if (key == rb_eException && (UNDEF_P(*maskp) || NIL_P(*maskp))) {
+        *maskp = val;
+        return ST_CONTINUE;
+    }
+
+    if (RTEST(*maskp)) {
+        if (!RB_TYPE_P(*maskp, T_HASH)) {
+            VALUE prev = *maskp;
+            *maskp = rb_ident_hash_new();
+            if (SYMBOL_P(prev)) {
+                rb_hash_aset(*maskp, rb_eException, prev);
+            }
+        }
+        rb_hash_aset(*maskp, key, val);
+    }
+    else {
+        *maskp = Qfalse;
     }
-    rb_hash_aset(*maskp, key, val);
 
     return ST_CONTINUE;
 }
@@ -2066,30 +2220,6 @@ handle_interrupt_arg_check_i(VALUE key, VALUE val, VALUE args)
 *   resource allocation code. Then, the ensure block is where we can safely
 *   deallocate your resources.
 *
- * ==== Guarding from Timeout::Error
- *
- * In the next example, we will guard from the Timeout::Error exception. This
- * will help prevent from leaking resources when Timeout::Error exceptions occur
- * during normal ensure clause. For this example we use the help of the
- * standard library Timeout, from lib/timeout.rb
- *
- *   require 'timeout'
- *   Thread.handle_interrupt(Timeout::Error => :never) {
- *     timeout(10){
- *       # Timeout::Error doesn't occur here
- *       Thread.handle_interrupt(Timeout::Error => :on_blocking) {
- *         # possible to be killed by Timeout::Error
- *         # while blocking operation
- *       }
- *       # Timeout::Error doesn't occur here
- *     }
- *   }
- *
- * In the first part of the +timeout+ block, we can rely on Timeout::Error being
- * ignored. Then in the <code>Timeout::Error => :on_blocking</code> block, any
- * operation that will block the calling thread is susceptible to a
- * Timeout::Error exception being raised.
- *
 * ==== Stack control settings
 *
 * It's possible to stack multiple levels of ::handle_interrupt blocks in order
@@ -2115,7 +2245,7 @@ handle_interrupt_arg_check_i(VALUE key, VALUE val, VALUE args)
 static VALUE
 rb_thread_s_handle_interrupt(VALUE self, VALUE mask_arg)
 {
-    VALUE mask;
+    VALUE mask = Qundef;
     rb_execution_context_t * volatile ec = GET_EC();
     rb_thread_t * volatile th = rb_ec_thread_ptr(ec);
     volatile VALUE r = Qnil;
@@ -2125,13 +2255,25 @@ rb_thread_s_handle_interrupt(VALUE self, VALUE mask_arg)
         rb_raise(rb_eArgError, "block is needed.");
     }
 
-    mask = 0;
     mask_arg = rb_to_hash_type(mask_arg);
+
+    if (OBJ_FROZEN(mask_arg) && rb_hash_compare_by_id_p(mask_arg)) {
+        mask = Qnil;
+    }
+
     rb_hash_foreach(mask_arg, handle_interrupt_arg_check_i, (VALUE)&mask);
-    if (!mask) {
+
+    if (UNDEF_P(mask)) {
         return rb_yield(Qnil);
     }
-    OBJ_FREEZE_RAW(mask);
+
+    if (!RTEST(mask)) {
+        mask = mask_arg;
+    }
+    else if (RB_TYPE_P(mask, T_HASH)) {
+        OBJ_FREEZE(mask);
+    }
+
     rb_ary_push(th->pending_interrupt_mask_stack, mask);
     if (!rb_threadptr_pending_interrupt_empty_p(th)) {
         th->pending_interrupt_queue_checked = 0;
@@ -2281,13 +2423,7 @@ threadptr_get_interrupts(rb_thread_t *th)
     return interrupt & (rb_atomic_t)~ec->interrupt_mask;
 }
 
-#if USE_MJIT
-// process.c
-extern bool mjit_waitpid_finished;
-extern int mjit_waitpid_status;
-#endif
-
-MJIT_FUNC_EXPORTED int
+int
 rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
 {
     rb_atomic_t interrupt;
@@ -2321,41 +2457,27 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
         /* signal handling */
         if (trap_interrupt && (th == th->vm->ractor.main_thread)) {
             enum rb_thread_status prev_status = th->status;
-            int sigwait_fd = rb_sigwait_fd_get(th);
-
-            if (sigwait_fd >= 0) {
-                (void)consume_communication_pipe(sigwait_fd);
-                ruby_sigchld_handler(th->vm);
-                rb_sigwait_fd_put(th, sigwait_fd);
-                rb_sigwait_fd_migrate(th->vm);
-            }
             th->status = THREAD_RUNNABLE;
-            while ((sig = rb_get_next_signal()) != 0) {
-                ret |= rb_signal_exec(th, sig);
+            {
+                while ((sig = rb_get_next_signal()) != 0) {
+                    ret |= rb_signal_exec(th, sig);
+                }
             }
             th->status = prev_status;
         }
 
-#if USE_MJIT
-        // Handle waitpid_signal for MJIT issued by ruby_sigchld_handler. This needs to be done
-        // outside ruby_sigchld_handler to avoid recursively relying on the SIGCHLD handler.
-        if (mjit_waitpid_finished && th == th->vm->ractor.main_thread) {
-            mjit_waitpid_finished = false;
-            mjit_notify_waitpid(WIFEXITED(mjit_waitpid_status) ? WEXITSTATUS(mjit_waitpid_status) : -1);
-        }
-#endif
-
         /* exception from another thread */
         if (pending_interrupt && threadptr_pending_interrupt_active_p(th)) {
             VALUE err = rb_threadptr_pending_interrupt_deque(th, blocking_timing ? INTERRUPT_ON_BLOCKING : INTERRUPT_NONE);
-            RUBY_DEBUG_LOG("err:%"PRIdVALUE"\n", err);
+            RUBY_DEBUG_LOG("err:%"PRIdVALUE, err);
             ret = TRUE;
 
             if (UNDEF_P(err)) {
                 /* no error */
             }
-            else if (err == eKillSignal        /* Thread#kill received */   ||
-                     err == eTerminateSignal   /* Terminate thread */       ||
+            else if (err == RUBY_FATAL_THREAD_KILLED      /* Thread#kill received */ ||
+                     err == RUBY_FATAL_THREAD_TERMINATED  /* Terminate thread */     ||
                      err == INT2FIX(TAG_FATAL)            /* Thread.exit etc. */ ) {
                 terminate_interrupt = 1;
             }
@@ -2385,7 +2507,7 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
            limits_us >>= -th->priority;
 
         if (th->status == THREAD_RUNNABLE)
-            th->running_time_us += TIME_QUANTUM_USEC;
+            th->running_time_us += 10 * 1000; // 10ms = 10_000us // TODO: use macro
 
         VM_ASSERT(th->ec->cfp);
         EXEC_EVENT_HOOK(th->ec, RUBY_INTERNAL_EVENT_SWITCH, th->ec->cfp->self,
@@ -2480,10 +2602,13 @@ rb_ec_reset_raised(rb_execution_context_t *ec)
 }
 
 int
-rb_notify_fd_close(int fd, struct ccan_list_head *busy)
+rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
 {
     rb_vm_t *vm = GET_THREAD()->vm;
     struct waiting_fd *wfd = 0, *next;
+    ccan_list_head_init(&busy->pending_fd_users);
+    int has_any;
+    VALUE wakeup_mutex;
 
     RB_VM_LOCK_ENTER();
     {
@@ -2493,27 +2618,58 @@ rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
                VALUE err;
 
                 ccan_list_del(&wfd->wfd_node);
-                ccan_list_add(busy, &wfd->wfd_node);
+                ccan_list_add(&busy->pending_fd_users, &wfd->wfd_node);
+                wfd->busy = busy;
 
                 err = th->vm->special_exceptions[ruby_error_stream_closed];
                 rb_threadptr_pending_interrupt_enque(th, err);
                 rb_threadptr_interrupt(th);
             }
        }
    }
+
+    has_any = !ccan_list_empty(&busy->pending_fd_users);
+    busy->closing_thread = rb_thread_current();
+    busy->closing_fiber = rb_fiber_current();
+    wakeup_mutex = Qnil;
+    if (has_any) {
+        wakeup_mutex = rb_mutex_new();
+        RBASIC_CLEAR_CLASS(wakeup_mutex); /* hide from ObjectSpace */
+    }
+    busy->wakeup_mutex = wakeup_mutex;
+
     RB_VM_LOCK_LEAVE();
 
-    return !ccan_list_empty(busy);
+    /* If the caller didn't pass *busy as a pointer to something on the stack,
+       we need to guard this mutex object on _our_ C stack for the duration
+       of this function. */
+    RB_GC_GUARD(wakeup_mutex);
+    return has_any;
+}
+
+void
+rb_notify_fd_close_wait(struct rb_io_close_wait_list *busy)
+{
+    if (!RB_TEST(busy->wakeup_mutex)) {
+        /* There was nobody else using this file when we closed it, so we
+           never bothered to allocate a mutex */
+        return;
+    }
+
+    rb_mutex_lock(busy->wakeup_mutex);
+    while (!ccan_list_empty(&busy->pending_fd_users)) {
+        rb_mutex_sleep(busy->wakeup_mutex, Qnil);
+    }
+    rb_mutex_unlock(busy->wakeup_mutex);
 }
 
 void
 rb_thread_fd_close(int fd)
 {
-    struct ccan_list_head busy;
+    struct rb_io_close_wait_list busy;
 
-    ccan_list_head_init(&busy);
     if (rb_notify_fd_close(fd, &busy)) {
-        do rb_thread_schedule(); while (!ccan_list_empty(&busy));
+        rb_notify_fd_close_wait(&busy);
    }
 }
@@ -2586,7 +2742,7 @@ rb_thread_kill(VALUE thread)
    }
    else {
         threadptr_check_pending_interrupt_queue(target_th);
-        rb_threadptr_pending_interrupt_enque(target_th, eKillSignal);
+        rb_threadptr_pending_interrupt_enque(target_th, RUBY_FATAL_THREAD_KILLED);
         rb_threadptr_interrupt(target_th);
    }
@@ -2755,7 +2911,7 @@
 VALUE
 rb_thread_list(void)
 {
    // TODO
-    return rb_ractor_thread_list(GET_RACTOR());
+    return rb_ractor_thread_list();
 }
 
 /*
@@ -3282,7 +3438,7 @@ rb_thread_setname(VALUE thread, VALUE name)
        name = rb_str_new_frozen(name);
    }
    target_th->name = name;
-    if (threadptr_initialized(target_th)) {
+    if (threadptr_initialized(target_th) && target_th->has_dedicated_nt) {
        native_set_another_thread_name(target_th->nt->thread_id, name);
    }
    return name;
@@ -3582,12 +3738,13 @@
 static VALUE
 rb_thread_variable_get(VALUE thread, VALUE key)
 {
    VALUE locals;
+    VALUE symbol = rb_to_symbol(key);
 
    if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) {
        return Qnil;
    }
    locals = rb_thread_local_storage(thread);
-    return rb_hash_aref(locals, rb_to_symbol(key));
+    return rb_hash_aref(locals, symbol);
 }
 
 /*
@@ -3738,13 +3895,14 @@
 static VALUE
 rb_thread_variable_p(VALUE thread, VALUE key)
 {
    VALUE locals;
+    VALUE symbol = rb_to_symbol(key);
 
    if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) {
        return Qfalse;
    }
    locals = rb_thread_local_storage(thread);
 
-    return RBOOL(rb_hash_lookup(locals, rb_to_symbol(key)) != Qnil);
+    return RBOOL(rb_hash_lookup(locals, symbol) != Qnil);
 }
 
 /*
@@ -3873,7 +4031,7 @@ rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src)
 void
 rb_fd_term(rb_fdset_t *fds)
 {
-    if (fds->fdset) xfree(fds->fdset);
+    xfree(fds->fdset);
    fds->maxfd = 0;
    fds->fdset = 0;
 }
@@ -4040,9 +4198,10 @@ rb_fd_set(int fd, rb_fdset_t *set)
 #endif
 
 static int
-wait_retryable(int *result, int errnum, rb_hrtime_t *rel, rb_hrtime_t end)
+wait_retryable(volatile int *result, int errnum, rb_hrtime_t *rel, rb_hrtime_t end)
 {
-    if (*result < 0) {
+    int r = *result;
+    if (r < 0) {
        switch (errnum) {
          case EINTR:
 #ifdef ERESTART
@@ -4056,7 +4215,7 @@ wait_retryable(volatile int *result, int errnum, rb_hrtime_t *rel, rb_hrtime_t end)
        }
        return FALSE;
    }
-    else if (*result == 0) {
+    else if (r == 0) {
        /* check for spurious wakeup */
        if (rel) {
            return !hrtime_update_expire(rel, end);
@@ -4068,7 +4227,6 @@ wait_retryable(volatile int *result, int errnum, rb_hrtime_t *rel, rb_hrtime_t end)
 
 struct select_set {
    int max;
-    int sigwait_fd;
    rb_thread_t *th;
    rb_fdset_t *rset;
    rb_fdset_t *wset;
@@ -4084,11 +4242,6 @@ select_set_free(VALUE p)
 {
    struct select_set *set = (struct select_set *)p;
 
-    if (set->sigwait_fd >= 0) {
-        rb_sigwait_fd_put(set->th, set->sigwait_fd);
-        rb_sigwait_fd_migrate(set->th->vm);
-    }
-
    rb_fd_term(&set->orig_rset);
    rb_fd_term(&set->orig_wset);
    rb_fd_term(&set->orig_eset);
@@ -4096,33 +4249,16 @@ select_set_free(VALUE p)
    return Qfalse;
 }
 
-static const rb_hrtime_t *
-sigwait_timeout(rb_thread_t *th, int sigwait_fd, const rb_hrtime_t *orig,
-                int *drained_p)
-{
-    static const rb_hrtime_t quantum = TIME_QUANTUM_USEC * 1000;
-
-    if (sigwait_fd >= 0 && (!ubf_threads_empty() || BUSY_WAIT_SIGNALS)) {
-        *drained_p = check_signals_nogvl(th, sigwait_fd);
-        if (!orig || *orig > quantum)
-            return &quantum;
-    }
-
-    return orig;
-}
-
-#define sigwait_signals_fd(result, cond, sigwait_fd) \
-    (result > 0 && (cond) ? (result--, (sigwait_fd)) : -1)
-
 static VALUE
 do_select(VALUE p)
 {
    struct select_set *set = (struct select_set *)p;
-    int result = 0;
+    volatile int result = 0;
    int lerrno;
    rb_hrtime_t *to, rel, end = 0;
 
    timeout_prepare(&to, &rel, &end, set->timeout);
+    volatile rb_hrtime_t endtime = end;
 #define restore_fdset(dst, src) \
    ((dst) ? rb_fd_dup(dst, src) : (void)0)
 #define do_select_update() \
@@ -4132,31 +4268,21 @@ do_select(VALUE p)
     TRUE)
 
    do {
-        int drained;
        lerrno = 0;
 
        BLOCKING_REGION(set->th, {
-            const rb_hrtime_t *sto;
            struct timeval tv;
 
-            sto = sigwait_timeout(set->th, set->sigwait_fd, to, &drained);
            if (!RUBY_VM_INTERRUPTED(set->th->ec)) {
-                result = native_fd_select(set->max, set->rset, set->wset,
-                                          set->eset,
-                                          rb_hrtime2timeval(&tv, sto), set->th);
+                result = native_fd_select(set->max,
+                                          set->rset, set->wset, set->eset,
+                                          rb_hrtime2timeval(&tv, to), set->th);
                if (result < 0) lerrno = errno;
            }
-        }, set->sigwait_fd >= 0 ? ubf_sigwait : ubf_select, set->th, TRUE);
-
-        if (set->sigwait_fd >= 0) {
-            int fd = sigwait_signals_fd(result,
-                                        rb_fd_isset(set->sigwait_fd, set->rset),
-                                        set->sigwait_fd);
-            (void)check_signals_nogvl(set->th, fd);
-        }
+        }, ubf_select, set->th, TRUE);
 
        RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); /* may raise */
-    } while (wait_retryable(&result, lerrno, to, end) && do_select_update());
+    } while (wait_retryable(&result, lerrno, to, endtime) && do_select_update());
 
    if (result < 0) {
        errno = lerrno;
@@ -4165,18 +4291,6 @@ do_select(VALUE p)
    return (VALUE)result;
 }
 
-static rb_fdset_t *
-init_set_fd(int fd, rb_fdset_t *fds)
-{
-    if (fd < 0) {
-        return 0;
-    }
-    rb_fd_init(fds);
-    rb_fd_set(fd, fds);
-
-    return fds;
-}
-
 int
 rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except,
                    struct timeval *timeout)
@@ -4200,16 +4314,6 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except,
        return 0;
    }
 
-    set.sigwait_fd = rb_sigwait_fd_get(set.th);
-    if (set.sigwait_fd >= 0) {
-        if (set.rset)
-            rb_fd_set(set.sigwait_fd, set.rset);
-        else
-            set.rset = init_set_fd(set.sigwait_fd, &set.orig_rset);
-        if (set.sigwait_fd >= set.max) {
-            set.max = set.sigwait_fd + 1;
-        }
-    }
 #define fd_init_copy(f) do { \
        if (set.f) { \
            rb_fd_resize(set.max - 1, set.f); \
@@ -4240,80 +4344,68 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except,
 #  define POLLERR_SET (0)
 #endif
 
+static int
+wait_for_single_fd_blocking_region(rb_thread_t *th, struct pollfd *fds, nfds_t nfds,
+                                   rb_hrtime_t *const to, volatile int *lerrno)
+{
+    struct timespec ts;
+    volatile int result = 0;
+
+    *lerrno = 0;
+    BLOCKING_REGION(th, {
+        if (!RUBY_VM_INTERRUPTED(th->ec)) {
+            result = ppoll(fds, nfds, rb_hrtime2timespec(&ts, to), 0);
+            if (result < 0) *lerrno = errno;
+        }
+    }, ubf_select, th, TRUE);
+    return result;
+}
+
 /*
 * returns a mask of events
 */
 int
 rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
 {
-    struct pollfd fds[2];
-    int result = 0;
-    int drained;
+    struct pollfd fds[1] = {{
+        .fd = fd,
+        .events = (short)events,
+        .revents = 0,
+    }};
+    volatile int result = 0;
    nfds_t nfds;
-    rb_unblock_function_t *ubf;
    struct waiting_fd wfd;
-    int state;
+    enum ruby_tag_type state;
    volatile int lerrno;
 
-    wfd.th = GET_THREAD();
-    wfd.fd = fd;
-
-    RB_VM_LOCK_ENTER();
-    {
-        ccan_list_add(&wfd.th->vm->waiting_fds, &wfd.wfd_node);
-    }
-    RB_VM_LOCK_LEAVE();
-
-    EC_PUSH_TAG(wfd.th->ec);
-    if ((state = EC_EXEC_TAG()) == TAG_NONE) {
-        rb_hrtime_t *to, rel, end = 0;
-        RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
-        timeout_prepare(&to, &rel, &end, timeout);
-        fds[0].fd = fd;
-        fds[0].events = (short)events;
-        fds[0].revents = 0;
-        do {
-            fds[1].fd = rb_sigwait_fd_get(wfd.th);
-
-            if (fds[1].fd >= 0) {
-                fds[1].events = POLLIN;
-                fds[1].revents = 0;
-                nfds = 2;
-                ubf = ubf_sigwait;
-            }
-            else {
-                nfds = 1;
-                ubf = ubf_select;
-            }
-
-            lerrno = 0;
-            BLOCKING_REGION(wfd.th, {
-                const rb_hrtime_t *sto;
-                struct timespec ts;
+    rb_execution_context_t *ec = GET_EC();
+    rb_thread_t *th = rb_ec_thread_ptr(ec);
 
-                sto = sigwait_timeout(wfd.th, fds[1].fd, to, &drained);
-                if (!RUBY_VM_INTERRUPTED(wfd.th->ec)) {
-                    result = ppoll(fds, nfds, rb_hrtime2timespec(&ts, sto), 0);
-                    if (result < 0) lerrno = errno;
-                }
-            }, ubf, wfd.th, TRUE);
+    thread_io_setup_wfd(th, fd, &wfd);
 
-            if (fds[1].fd >= 0) {
-                int fd1 = sigwait_signals_fd(result, fds[1].revents, fds[1].fd);
-                (void)check_signals_nogvl(wfd.th, fd1);
-                rb_sigwait_fd_put(wfd.th, fds[1].fd);
-                rb_sigwait_fd_migrate(wfd.th->vm);
-            }
-            RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
-        } while (wait_retryable(&result, lerrno, to, end));
+    if (timeout == NULL && thread_io_wait_events(th, fd, events, NULL)) {
+        // fd is readable
+        state = 0;
+        fds[0].revents = events;
+        errno = 0;
    }
-    EC_POP_TAG();
+    else {
+        EC_PUSH_TAG(wfd.th->ec);
+        if ((state = EC_EXEC_TAG()) == TAG_NONE) {
+            rb_hrtime_t *to, rel, end = 0;
+            RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
+            timeout_prepare(&to, &rel, &end, timeout);
+            do {
+                nfds = numberof(fds);
+                result = wait_for_single_fd_blocking_region(wfd.th, fds, nfds, to, &lerrno);
 
-    RB_VM_LOCK_ENTER();
-    {
-        ccan_list_del(&wfd.wfd_node);
+                RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
+            } while (wait_retryable(&result, lerrno, to, end));
+        }
+        EC_POP_TAG();
    }
-    RB_VM_LOCK_LEAVE();
+
+    thread_io_wake_pending_closer(&wfd);
 
    if (state) {
        EC_JUMP_TAG(wfd.th->ec, state);
@@ -4387,11 +4479,7 @@ select_single_cleanup(VALUE ptr)
 {
    struct select_args *args = (struct select_args *)ptr;
 
-    RB_VM_LOCK_ENTER();
-    {
-        ccan_list_del(&args->wfd.wfd_node);
-    }
-    RB_VM_LOCK_LEAVE();
+    thread_io_wake_pending_closer(&args->wfd);
    if (args->read) rb_fd_term(args->read);
    if (args->write) rb_fd_term(args->write);
    if (args->except) rb_fd_term(args->except);
@@ -4399,6 +4487,18 @@ select_single_cleanup(VALUE ptr)
    return (VALUE)-1;
 }
 
+static rb_fdset_t *
+init_set_fd(int fd, rb_fdset_t *fds)
+{
+    if (fd < 0) {
+        return 0;
+    }
+    rb_fd_init(fds);
+    rb_fd_set(fd, fds);
+
+    return fds;
+}
+
 int
 rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
 {
@@ -4406,20 +4506,15 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
    struct select_args args;
    int r;
    VALUE ptr = (VALUE)&args;
+    rb_execution_context_t *ec = GET_EC();
+    rb_thread_t *th = rb_ec_thread_ptr(ec);
 
    args.as.fd = fd;
    args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
    args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
    args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL;
    args.tv = timeout;
-    args.wfd.fd = fd;
-    args.wfd.th = GET_THREAD();
-
-    RB_VM_LOCK_ENTER();
-    {
-        ccan_list_add(&args.wfd.th->vm->waiting_fds, &args.wfd.wfd_node);
-    }
-    RB_VM_LOCK_LEAVE();
+    thread_io_setup_wfd(th, fd, &args.wfd);
 
    r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr);
    if (r == -1)
@@ -4438,7 +4533,12 @@
 void
 rb_gc_set_stack_end(VALUE **stack_end_p)
 {
    VALUE stack_end;
+COMPILER_WARNING_PUSH
+#if __has_warning("-Wdangling-pointer")
+COMPILER_WARNING_IGNORED(-Wdangling-pointer);
+#endif
    *stack_end_p = &stack_end;
+COMPILER_WARNING_POP
 }
 #endif
@@ -4480,16 +4580,13 @@ consume_communication_pipe(int fd)
    ssize_t result;
    int ret = FALSE; /* for rb_sigwait_sleep */
 
-    /*
-     * disarm UBF_TIMER before we read, because it can become
-     * re-armed at any time via sighandler and the pipe will refill
-     * We can disarm it because this thread is now processing signals
-     * and we do not want unnecessary SIGVTALRM
-     */
-    ubf_timer_disarm();
-
    while (1) {
        result = read(fd, buff, sizeof(buff));
+#if USE_EVENTFD
+        RUBY_DEBUG_LOG("resultf:%d buff:%lu", (int)result, (unsigned long)buff[0]);
+#else
+        RUBY_DEBUG_LOG("result:%d", (int)result);
+#endif
        if (result > 0) {
            ret = TRUE;
            if (USE_EVENTFD || result < (ssize_t)sizeof(buff)) {
@@ -4516,26 +4613,6 @@ consume_communication_pipe(int fd)
    }
 }
 
-static int
-check_signals_nogvl(rb_thread_t *th, int sigwait_fd)
-{
-    rb_vm_t *vm = GET_VM(); /* th may be 0 */
-    int ret = sigwait_fd >= 0 ? consume_communication_pipe(sigwait_fd) : FALSE;
-    ubf_wakeup_all_threads();
-    ruby_sigchld_handler(vm);
-    if (rb_signal_buff_size()) {
-        if (th == vm->ractor.main_thread) {
-            /* no need to lock + wakeup if already in main thread */
-            RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
-        }
-        else {
-            threadptr_trap_interrupt(vm->ractor.main_thread);
-        }
-        ret = TRUE; /* for SIGCHLD_LOSSY && rb_sigwait_sleep */
-    }
-    return ret;
-}
-
 void
 rb_thread_stop_timer_thread(void)
 {
@@ -4621,9 +4698,9 @@ rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const rb_thread_t *))
    rb_vm_living_threads_init(vm);
 
    rb_ractor_atfork(vm, th);
+    rb_vm_postponed_job_atfork();
 
-    /* may be held by MJIT threads in parent */
-    rb_native_mutex_initialize(&vm->waitpid_lock);
+    /* may be held by RJIT threads in parent */
    rb_native_mutex_initialize(&vm->workqueue_lock);
 
    /* may be held by any thread in parent */
@@ -4633,6 +4710,10 @@ rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const rb_thread_t *))
    rb_ractor_sleeper_threads_clear(th->ractor);
    rb_clear_coverages();
 
+    // restart timer thread (timer threads access to `vm->waitpid_lock` and so on.
+    rb_thread_reset_timer_thread();
+    rb_thread_start_timer_thread();
+
    VM_ASSERT(vm->ractor.blocking_cnt == 0);
    VM_ASSERT(vm->ractor.cnt == 1);
 }
@@ -4652,15 +4733,13 @@
 void
 rb_thread_atfork(void)
 {
    rb_thread_t *th = GET_THREAD();
+    rb_threadptr_pending_interrupt_clear(th);
    rb_thread_atfork_internal(th, terminate_atfork_i);
    th->join_list = NULL;
    rb_fiber_atfork(th);
 
    /* We don't want reproduce CVE-2003-0900. */
    rb_reset_random_seed();
-
-    /* For child, starting MJIT worker thread in this place which is safer than immediately after `after_fork_ruby`. */
-    mjit_child_after_fork();
 }
 
 static void
@@ -4693,16 +4772,14 @@ struct thgroup {
    int enclosed;
 };
 
-static size_t
-thgroup_memsize(const void *ptr)
-{
-    return sizeof(struct thgroup);
-}
-
 static const rb_data_type_t thgroup_data_type = {
    "thgroup",
-    {0, RUBY_TYPED_DEFAULT_FREE, thgroup_memsize,},
-    0, 0, RUBY_TYPED_FREE_IMMEDIATELY
+    {
+        0,
+        RUBY_TYPED_DEFAULT_FREE,
+        NULL, // No external memory to report
+    },
+    0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_EMBEDDABLE
 };
 
 /*
@@ -4920,6 +4997,17 @@ rb_thread_shield_new(void)
    return thread_shield;
 }
 
+bool
+rb_thread_shield_owned(VALUE self)
+{
+    VALUE mutex = GetThreadShieldPtr(self);
+    if (!mutex) return false;
+
+    rb_mutex_t *m = mutex_ptr(mutex);
+
+    return m->fiber == GET_EC()->fiber_ptr;
+}
+
 /*
 * Wait a thread shield.
 *
@@ -5019,12 +5107,12 @@ recursive_list_access(VALUE sym)
 }
 
 /*
- * Returns Qtrue if and only if obj (or the pair <obj, paired_obj>) is already
+ * Returns true if and only if obj (or the pair <obj, paired_obj>) is already
 * in the recursion list.
 * Assumes the recursion list is valid.
 */
 
-static VALUE
+static bool
 recursive_check(VALUE list, VALUE obj, VALUE paired_obj_id)
 {
 #if SIZEOF_LONG == SIZEOF_VOIDP
@@ -5036,18 +5124,18 @@ recursive_check(VALUE list, VALUE obj, VALUE paired_obj_id)
    VALUE pair_list = rb_hash_lookup2(list, obj, Qundef);
 
    if (UNDEF_P(pair_list))
-        return Qfalse;
+        return false;
    if (paired_obj_id) {
        if (!RB_TYPE_P(pair_list, T_HASH)) {
            if (!OBJ_ID_EQL(paired_obj_id, pair_list))
-                return Qfalse;
+                return false;
        }
        else {
            if (NIL_P(rb_hash_lookup(pair_list, paired_obj_id)))
-                return Qfalse;
+                return false;
        }
    }
-    return Qtrue;
+    return true;
 }
@@ -5127,7 +5215,7 @@ exec_recursive_i(RB_BLOCK_CALL_FUNC_ARGLIST(tag, data))
 * Calls func(obj, arg, recursive), where recursive is non-zero if the
 * current method is called recursively on obj, or on the pair <obj, pairid>
 * If outer is 0, then the innermost func will be called with recursive set
- * to Qtrue, otherwise the outermost func will be called. In the latter case,
+ * to true, otherwise the outermost func will be called. In the latter case,
 * all inner func are short-circuited by throw.
 * Implementation details: the value thrown is the recursive list which is
 * proper to the current method and unlikely to be caught anywhere else.
@@ -5218,7 +5306,7 @@ rb_exec_recursive_paired(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE pairid, VALUE arg)
 
 /*
 * If recursion is detected on the current method and obj, the outermost
- * func will be called with (obj, arg, Qtrue). All inner func will be
+ * func will be called with (obj, arg, true). All inner func will be
 * short-circuited using throw.
 */
@@ -5236,7 +5324,7 @@ rb_exec_recursive_outer_mid(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg, ID mid)
 
 /*
 * If recursion is detected on the current method, obj and paired_obj,
- * the outermost func will be called with (obj, arg, Qtrue). All inner
+ * the outermost func will be called with (obj, arg, true). All inner
 * func will be short-circuited using throw.
 */
@@ -5282,7 +5370,6 @@ Init_Thread_Mutex(void)
 {
    rb_thread_t *th = GET_THREAD();
 
-    rb_native_mutex_initialize(&th->vm->waitpid_lock);
    rb_native_mutex_initialize(&th->vm->workqueue_lock);
    rb_native_mutex_initialize(&th->interrupt_lock);
 }
@@ -5391,8 +5478,12 @@ Init_Thread(void)
    /* main thread setting */
    {
        /* acquire global vm lock */
-        struct rb_thread_sched *sched = TH_SCHED(th);
-        thread_sched_to_running(sched, th);
+#ifdef HAVE_PTHREAD_NP_H
+        VM_ASSERT(TH_SCHED(th)->running == th);
+#endif
+        // thread_sched_to_running() should not be called because
+        // it assumes blocked by thread_sched_to_waiting().
+        // thread_sched_to_running(sched, th);
 
        th->pending_interrupt_queue = rb_ary_hidden_new(0);
        th->pending_interrupt_queue_checked = 0;
@@ -5403,6 +5494,9 @@ Init_Thread(void)
    rb_thread_create_timer_thread();
 
    Init_thread_sync();
+
+    // TODO: Suppress unused function warning for now
+    // if (0) rb_thread_sched_destroy(NULL);
 }
 
 int
@@ -5432,7 +5526,7 @@ debug_deadlock_check(rb_ractor_t *r, VALUE msg)
    ccan_list_for_each(&r->threads.set, th, lt_node) {
        rb_str_catf(msg, "* %+"PRIsVALUE"\n    rb_thread_t:%p "
                    "native:%p int:%u",
-                    th->self, (void *)th, thread_id_str(th), th->ec->interrupt_flag);
+                    th->self, (void *)th, th->nt ? thread_id_str(th) : "N/A", th->ec->interrupt_flag);
 
        if (th->locking_mutex) {
            rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
@@ -5458,14 +5552,18 @@ rb_check_deadlock(rb_ractor_t *r)
 {
    if (GET_THREAD()->vm->thread_ignore_deadlock) return;
 
-    int found = 0;
-    rb_thread_t *th = NULL;
+#ifdef RUBY_THREAD_PTHREAD_H
+    if (r->threads.sched.readyq_cnt > 0) return;
+#endif
+
    int sleeper_num = rb_ractor_sleeper_thread_num(r);
    int ltnum = rb_ractor_living_thread_num(r);
 
    if (ltnum > sleeper_num) return;
    if (ltnum < sleeper_num) rb_bug("sleeper must not be more than vm_living_thread_num(vm)");
+
    if (patrol_thread && patrol_thread != GET_THREAD()) return;
+
+    int found = 0;
+    rb_thread_t *th = NULL;
 
    ccan_list_for_each(&r->threads.set, th, lt_node) {
        if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th->ec)) {
@@ -5756,7 +5854,7 @@ rb_uninterruptible(VALUE (*b_proc)(VALUE), VALUE data)
    rb_thread_t *cur_th = GET_THREAD();
 
    rb_hash_aset(interrupt_mask, rb_cObject, sym_never);
-    OBJ_FREEZE_RAW(interrupt_mask);
+    OBJ_FREEZE(interrupt_mask);
    rb_ary_push(cur_th->pending_interrupt_mask_stack, interrupt_mask);
 
    VALUE ret = rb_ensure(b_proc, data, uninterruptible_exit, Qnil);
@@ -5764,3 +5862,66 @@ rb_uninterruptible(VALUE (*b_proc)(VALUE), VALUE data)
    RUBY_VM_CHECK_INTS(cur_th->ec);
    return ret;
 }
+
+static void
+thread_specific_storage_alloc(rb_thread_t *th)
+{
+    VM_ASSERT(th->specific_storage == NULL);
+
+    if (UNLIKELY(specific_key_count > 0)) {
+        th->specific_storage = ZALLOC_N(void *, RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX);
+    }
+}
+
+rb_internal_thread_specific_key_t
+rb_internal_thread_specific_key_create(void)
+{
+    rb_vm_t *vm = GET_VM();
+
+    if (specific_key_count == 0 && vm->ractor.cnt > 1) {
+        rb_raise(rb_eThreadError, "The first rb_internal_thread_specific_key_create() is called with multiple ractors");
+    }
+    else if (specific_key_count > RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX) {
+        rb_raise(rb_eThreadError, "rb_internal_thread_specific_key_create() is called more than %d times", RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX);
+    }
+    else {
+        rb_internal_thread_specific_key_t key = specific_key_count++;
+
+        if (key == 0) {
+            // allocate
+            rb_ractor_t *cr = GET_RACTOR();
+            rb_thread_t *th;
+
+            ccan_list_for_each(&cr->threads.set, th, lt_node) {
+                thread_specific_storage_alloc(th);
+            }
+        }
+        return key;
+    }
+}
+
+// async and native thread safe.
+void *
+rb_internal_thread_specific_get(VALUE thread_val, rb_internal_thread_specific_key_t key)
+{
+    rb_thread_t *th = DATA_PTR(thread_val);
+
+    VM_ASSERT(rb_thread_ptr(thread_val) == th);
+    VM_ASSERT(key < RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX);
+    VM_ASSERT(th->specific_storage);
+
+    return th->specific_storage[key];
+}
+
+// async and native thread safe.
+void
+rb_internal_thread_specific_set(VALUE thread_val, rb_internal_thread_specific_key_t key, void *data)
+{
+    rb_thread_t *th = DATA_PTR(thread_val);
+
+    VM_ASSERT(rb_thread_ptr(thread_val) == th);
+    VM_ASSERT(key < RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX);
+    VM_ASSERT(th->specific_storage);
+
+    th->specific_storage[key] = data;
+}