Diffstat (limited to 'thread_pthread.c')
-rw-r--r--  thread_pthread.c | 712
1 file changed, 418 insertions, 294 deletions
diff --git a/thread_pthread.c b/thread_pthread.c index 6d2f55a957..9c7754067b 100644 --- a/thread_pthread.c +++ b/thread_pthread.c @@ -12,7 +12,7 @@ #ifdef THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION #include "internal/gc.h" -#include "rjit.h" +#include "internal/sanitizers.h" #ifdef HAVE_SYS_RESOURCE_H #include <sys/resource.h> @@ -90,9 +90,16 @@ static const void *const condattr_monotonic = NULL; #endif #endif +#ifdef HAVE_SCHED_YIELD +#define native_thread_yield() (void)sched_yield() +#else +#define native_thread_yield() ((void)0) +#endif + // native thread wrappers #define NATIVE_MUTEX_LOCK_DEBUG 0 +#define NATIVE_MUTEX_LOCK_DEBUG_YIELD 0 static void mutex_debug(const char *msg, void *lock) @@ -111,6 +118,9 @@ void rb_native_mutex_lock(pthread_mutex_t *lock) { int r; +#if NATIVE_MUTEX_LOCK_DEBUG_YIELD + native_thread_yield(); +#endif mutex_debug("lock", lock); if ((r = pthread_mutex_lock(lock)) != 0) { rb_bug_errno("pthread_mutex_lock", r); @@ -304,25 +314,12 @@ event_name(rb_event_flag_t event) static rb_serial_t current_fork_gen = 1; /* We can't use GET_VM()->fork_gen */ -#if defined(SIGVTALRM) && !defined(__CYGWIN__) && !defined(__EMSCRIPTEN__) +#if defined(SIGVTALRM) && !defined(__EMSCRIPTEN__) # define USE_UBF_LIST 1 #endif static void threadptr_trap_interrupt(rb_thread_t *); -#ifdef HAVE_SCHED_YIELD -#define native_thread_yield() (void)sched_yield() -#else -#define native_thread_yield() ((void)0) -#endif - -/* 100ms. 10ms is too small for user level thread scheduling - * on recent Linux (tested on 2.6.35) - */ -#define TIME_QUANTUM_MSEC (100) -#define TIME_QUANTUM_USEC (TIME_QUANTUM_MSEC * 1000) -#define TIME_QUANTUM_NSEC (TIME_QUANTUM_USEC * 1000) - static void native_thread_dedicated_inc(rb_vm_t *vm, rb_ractor_t *cr, struct rb_native_thread *nt); static void native_thread_dedicated_dec(rb_vm_t *vm, rb_ractor_t *cr, struct rb_native_thread *nt); static void native_thread_assign(struct rb_native_thread *nt, rb_thread_t *th); @@ -332,6 +329,8 @@ static void timer_thread_wakeup(void); static void timer_thread_wakeup_locked(rb_vm_t *vm); static void timer_thread_wakeup_force(void); static void thread_sched_switch(rb_thread_t *cth, rb_thread_t *next_th); +static void coroutine_transfer0(struct coroutine_context *transfer_from, + struct coroutine_context *transfer_to, bool to_dead); #define thread_sched_dump(s) thread_sched_dump_(__FILE__, __LINE__, s) @@ -379,40 +378,47 @@ ractor_sched_dump_(const char *file, int line, rb_vm_t *vm) #define thread_sched_unlock(a, b) thread_sched_unlock_(a, b, __FILE__, __LINE__) static void -thread_sched_lock_(struct rb_thread_sched *sched, rb_thread_t *th, const char *file, int line) +thread_sched_set_locked(struct rb_thread_sched *sched, rb_thread_t *th) { - rb_native_mutex_lock(&sched->lock_); - -#if VM_CHECK_MODE - RUBY_DEBUG_LOG2(file, line, "th:%u prev_owner:%u", rb_th_serial(th), rb_th_serial(sched->lock_owner)); +#if VM_CHECK_MODE > 0 VM_ASSERT(sched->lock_owner == NULL); + sched->lock_owner = th; -#else - RUBY_DEBUG_LOG2(file, line, "th:%u", rb_th_serial(th)); #endif } static void -thread_sched_unlock_(struct rb_thread_sched *sched, rb_thread_t *th, const char *file, int line) +thread_sched_set_unlocked(struct rb_thread_sched *sched, rb_thread_t *th) { - RUBY_DEBUG_LOG2(file, line, "th:%u", rb_th_serial(th)); - -#if VM_CHECK_MODE +#if VM_CHECK_MODE > 0 VM_ASSERT(sched->lock_owner == th); + sched->lock_owner = NULL; #endif - - rb_native_mutex_unlock(&sched->lock_); } static void -thread_sched_set_lock_owner(struct rb_thread_sched *sched, 
rb_thread_t *th) +thread_sched_lock_(struct rb_thread_sched *sched, rb_thread_t *th, const char *file, int line) { - RUBY_DEBUG_LOG("th:%u", rb_th_serial(th)); + rb_native_mutex_lock(&sched->lock_); -#if VM_CHECK_MODE > 0 - sched->lock_owner = th; +#if VM_CHECK_MODE + RUBY_DEBUG_LOG2(file, line, "r:%d th:%u", th ? (int)rb_ractor_id(th->ractor) : -1, rb_th_serial(th)); +#else + RUBY_DEBUG_LOG2(file, line, "th:%u", rb_th_serial(th)); #endif + + thread_sched_set_locked(sched, th); +} + +static void +thread_sched_unlock_(struct rb_thread_sched *sched, rb_thread_t *th, const char *file, int line) +{ + RUBY_DEBUG_LOG2(file, line, "th:%u", rb_th_serial(th)); + + thread_sched_set_unlocked(sched, th); + + rb_native_mutex_unlock(&sched->lock_); } static void @@ -435,7 +441,8 @@ ASSERT_thread_sched_locked(struct rb_thread_sched *sched, rb_thread_t *th) RBIMPL_ATTR_MAYBE_UNUSED() static unsigned int -rb_ractor_serial(const rb_ractor_t *r) { +rb_ractor_serial(const rb_ractor_t *r) +{ if (r) { return rb_ractor_id(r); } @@ -546,7 +553,6 @@ ractor_sched_timeslice_threads_contain_p(rb_vm_t *vm, rb_thread_t *th) } static void ractor_sched_barrier_join_signal_locked(rb_vm_t *vm); -static void ractor_sched_barrier_join_wait_locked(rb_vm_t *vm, rb_thread_t *th); // setup timeslice signals by the timer thread. static void @@ -589,11 +595,10 @@ thread_sched_setup_running_threads(struct rb_thread_sched *sched, rb_ractor_t *c } if (add_th) { - if (UNLIKELY(vm->ractor.sched.barrier_waiting)) { - RUBY_DEBUG_LOG("barrier-wait"); - - ractor_sched_barrier_join_signal_locked(vm); - ractor_sched_barrier_join_wait_locked(vm, add_th); + if (vm->ractor.sched.barrier_waiting) { + // TODO: GC barrier check? + RUBY_DEBUG_LOG("barrier_waiting"); + RUBY_VM_SET_VM_BARRIER_INTERRUPT(add_th->ec); } VM_ASSERT(!ractor_sched_running_threads_contain_p(vm, add_th)); @@ -625,20 +630,6 @@ thread_sched_setup_running_threads(struct rb_thread_sched *sched, rb_ractor_t *c } ractor_sched_unlock(vm, cr); - if (add_th && !del_th && UNLIKELY(vm->ractor.sync.lock_owner != NULL)) { - // it can be after barrier synchronization by another ractor - rb_thread_t *lock_owner = NULL; -#if VM_CHECK_MODE - lock_owner = sched->lock_owner; -#endif - thread_sched_unlock(sched, lock_owner); - { - RB_VM_LOCK_ENTER(); - RB_VM_LOCK_LEAVE(); - } - thread_sched_lock(sched, lock_owner); - } - //RUBY_DEBUG_LOG("+:%u -:%u +ts:%u -ts:%u run:%u->%u", // rb_th_serial(add_th), rb_th_serial(del_th), // rb_th_serial(add_timeslice_th), rb_th_serial(del_timeslice_th), @@ -757,7 +748,8 @@ thread_sched_enq(struct rb_thread_sched *sched, rb_thread_t *ready_th) } } else { - VM_ASSERT(!ractor_sched_timeslice_threads_contain_p(ready_th->vm, sched->running)); + // ractor_sched lock is needed + // VM_ASSERT(!ractor_sched_timeslice_threads_contain_p(ready_th->vm, sched->running)); } ccan_list_add_tail(&sched->readyq, &ready_th->sched.node.readyq); @@ -841,7 +833,7 @@ thread_sched_wait_running_turn(struct rb_thread_sched *sched, rb_thread_t *th, b RUBY_DEBUG_LOG("th:%u", rb_th_serial(th)); ASSERT_thread_sched_locked(sched, th); - VM_ASSERT(th == GET_THREAD()); + VM_ASSERT(th == rb_ec_thread_ptr(rb_current_ec_noinline())); if (th != sched->running) { // already deleted from running threads @@ -853,16 +845,16 @@ thread_sched_wait_running_turn(struct rb_thread_sched *sched, rb_thread_t *th, b if (th_has_dedicated_nt(th)) { RUBY_DEBUG_LOG("(nt) sleep th:%u running:%u", rb_th_serial(th), rb_th_serial(sched->running)); - thread_sched_set_lock_owner(sched, NULL); + 
thread_sched_set_unlocked(sched, th); { RUBY_DEBUG_LOG("nt:%d cond:%p", th->nt->serial, &th->nt->cond.readyq); rb_native_cond_wait(&th->nt->cond.readyq, &sched->lock_); } - thread_sched_set_lock_owner(sched, th); + thread_sched_set_locked(sched, th); RUBY_DEBUG_LOG("(nt) wakeup %s", sched->running == th ? "success" : "failed"); if (th == sched->running) { - rb_ractor_thread_switch(th->ractor, th); + rb_ractor_thread_switch(th->ractor, th, false); } } else { @@ -874,12 +866,12 @@ thread_sched_wait_running_turn(struct rb_thread_sched *sched, rb_thread_t *th, b RUBY_DEBUG_LOG("th:%u->%u (direct)", rb_th_serial(th), rb_th_serial(next_th)); - thread_sched_set_lock_owner(sched, NULL); + thread_sched_set_unlocked(sched, th); { rb_ractor_set_current_ec(th->ractor, NULL); thread_sched_switch(th, next_th); } - thread_sched_set_lock_owner(sched, th); + thread_sched_set_locked(sched, th); } else { // search another ready ractor @@ -888,20 +880,20 @@ thread_sched_wait_running_turn(struct rb_thread_sched *sched, rb_thread_t *th, b RUBY_DEBUG_LOG("th:%u->%u (ractor scheduling)", rb_th_serial(th), rb_th_serial(next_th)); - thread_sched_set_lock_owner(sched, NULL); + thread_sched_set_unlocked(sched, th); { rb_ractor_set_current_ec(th->ractor, NULL); - coroutine_transfer(th->sched.context, nt->nt_context); + coroutine_transfer0(th->sched.context, nt->nt_context, false); } - thread_sched_set_lock_owner(sched, th); + thread_sched_set_locked(sched, th); } - VM_ASSERT(GET_EC() == th->ec); + VM_ASSERT(rb_current_ec_noinline() == th->ec); } } VM_ASSERT(th->nt != NULL); - VM_ASSERT(GET_EC() == th->ec); + VM_ASSERT(rb_current_ec_noinline() == th->ec); VM_ASSERT(th->sched.waiting_reason.flags == thread_sched_waiting_none); // add th to running threads @@ -980,33 +972,16 @@ thread_sched_wakeup_next_thread(struct rb_thread_sched *sched, rb_thread_t *th, } } -// running -> waiting -// -// to_dead: false -// th will run dedicated task. -// run another ready thread. -// to_dead: true -// th will be dead. -// run another ready thread. +// running -> dead (locked) static void -thread_sched_to_waiting_common0(struct rb_thread_sched *sched, rb_thread_t *th, bool to_dead) +thread_sched_to_dead_common(struct rb_thread_sched *sched, rb_thread_t *th) { - RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th); + RUBY_DEBUG_LOG("th:%u DNT:%d", rb_th_serial(th), th->nt->dedicated); - if (!to_dead) native_thread_dedicated_inc(th->vm, th->ractor, th->nt); - - RUBY_DEBUG_LOG("%sth:%u", to_dead ? "to_dead " : "", rb_th_serial(th)); + RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th); - bool can_switch = to_dead ? 
!th_has_dedicated_nt(th) : false; - thread_sched_wakeup_next_thread(sched, th, can_switch); -} + thread_sched_wakeup_next_thread(sched, th, !th_has_dedicated_nt(th)); -// running -> dead (locked) -static void -thread_sched_to_dead_common(struct rb_thread_sched *sched, rb_thread_t *th) -{ - RUBY_DEBUG_LOG("dedicated:%d", th->nt->dedicated); - thread_sched_to_waiting_common0(sched, th, true); RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_EXITED, th); } @@ -1027,8 +1002,12 @@ thread_sched_to_dead(struct rb_thread_sched *sched, rb_thread_t *th) static void thread_sched_to_waiting_common(struct rb_thread_sched *sched, rb_thread_t *th) { - RUBY_DEBUG_LOG("dedicated:%d", th->nt->dedicated); - thread_sched_to_waiting_common0(sched, th, false); + RUBY_DEBUG_LOG("th:%u DNT:%d", rb_th_serial(th), th->nt->dedicated); + + RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th); + + native_thread_dedicated_inc(th->vm, th->ractor, th->nt); + thread_sched_wakeup_next_thread(sched, th, false); } // running -> waiting @@ -1045,15 +1024,45 @@ thread_sched_to_waiting(struct rb_thread_sched *sched, rb_thread_t *th) } // mini utility func -static void -setup_ubf(rb_thread_t *th, rb_unblock_function_t *func, void *arg) +// return true if any there are any interrupts +static bool +ubf_set(rb_thread_t *th, rb_unblock_function_t *func, void *arg) { + VM_ASSERT(func != NULL); + + retry: + if (RUBY_VM_INTERRUPTED(th->ec)) { + RUBY_DEBUG_LOG("interrupted:0x%x", th->ec->interrupt_flag); + return true; + } + rb_native_mutex_lock(&th->interrupt_lock); { + if (!th->ec->raised_flag && RUBY_VM_INTERRUPTED(th->ec)) { + rb_native_mutex_unlock(&th->interrupt_lock); + goto retry; + } + + VM_ASSERT(th->unblock.func == NULL); th->unblock.func = func; th->unblock.arg = arg; } rb_native_mutex_unlock(&th->interrupt_lock); + + return false; +} + +static void +ubf_clear(rb_thread_t *th) +{ + if (th->unblock.func) { + rb_native_mutex_lock(&th->interrupt_lock); + { + th->unblock.func = NULL; + th->unblock.arg = NULL; + } + rb_native_mutex_unlock(&th->interrupt_lock); + } } static void @@ -1089,7 +1098,10 @@ thread_sched_to_waiting_until_wakeup(struct rb_thread_sched *sched, rb_thread_t RUBY_DEBUG_LOG("th:%u", rb_th_serial(th)); RB_VM_SAVE_MACHINE_CONTEXT(th); - setup_ubf(th, ubf_waiting, (void *)th); + + if (ubf_set(th, ubf_waiting, (void *)th)) { + return; + } RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th); @@ -1097,6 +1109,7 @@ thread_sched_to_waiting_until_wakeup(struct rb_thread_sched *sched, rb_thread_t { if (!RUBY_VM_INTERRUPTED(th->ec)) { bool can_direct_transfer = !th_has_dedicated_nt(th); + // NOTE: th->status is set before and after this sleep outside of this function in `sleep_forever` thread_sched_wakeup_next_thread(sched, th, can_direct_transfer); thread_sched_wait_running_turn(sched, th, can_direct_transfer); } @@ -1106,7 +1119,7 @@ thread_sched_to_waiting_until_wakeup(struct rb_thread_sched *sched, rb_thread_t } thread_sched_unlock(sched, th); - setup_ubf(th, NULL, NULL); + ubf_clear(th); } // run another thread in the ready queue. 
@@ -1124,6 +1137,7 @@ thread_sched_yield(struct rb_thread_sched *sched, rb_thread_t *th) bool can_direct_transfer = !th_has_dedicated_nt(th); thread_sched_to_ready_common(sched, th, false, can_direct_transfer); thread_sched_wait_running_turn(sched, th, can_direct_transfer); + th->status = THREAD_RUNNABLE; } else { VM_ASSERT(sched->readyq_cnt == 0); @@ -1150,7 +1164,28 @@ rb_thread_sched_init(struct rb_thread_sched *sched, bool atfork) } static void -thread_sched_switch0(struct coroutine_context *current_cont, rb_thread_t *next_th, struct rb_native_thread *nt) +coroutine_transfer0(struct coroutine_context *transfer_from, struct coroutine_context *transfer_to, bool to_dead) +{ +#ifdef RUBY_ASAN_ENABLED + void **fake_stack = to_dead ? NULL : &transfer_from->fake_stack; + __sanitizer_start_switch_fiber(fake_stack, transfer_to->stack_base, transfer_to->stack_size); +#endif + + RBIMPL_ATTR_MAYBE_UNUSED() + struct coroutine_context *returning_from = coroutine_transfer(transfer_from, transfer_to); + + /* if to_dead was passed, the caller is promising that this coroutine is finished and it should + * never be resumed! */ + VM_ASSERT(!to_dead); +#ifdef RUBY_ASAN_ENABLED + __sanitizer_finish_switch_fiber(transfer_from->fake_stack, + (const void**)&returning_from->stack_base, &returning_from->stack_size); +#endif + +} + +static void +thread_sched_switch0(struct coroutine_context *current_cont, rb_thread_t *next_th, struct rb_native_thread *nt, bool to_dead) { VM_ASSERT(!nt->dedicated); VM_ASSERT(next_th->nt == NULL); @@ -1159,7 +1194,8 @@ thread_sched_switch0(struct coroutine_context *current_cont, rb_thread_t *next_t ruby_thread_set_native(next_th); native_thread_assign(nt, next_th); - coroutine_transfer(current_cont, next_th->sched.context); + + coroutine_transfer0(current_cont, next_th->sched.context, to_dead); } static void @@ -1168,7 +1204,7 @@ thread_sched_switch(rb_thread_t *cth, rb_thread_t *next_th) struct rb_native_thread *nt = cth->nt; native_thread_assign(NULL, cth); RUBY_DEBUG_LOG("th:%u->%u on nt:%d", rb_th_serial(cth), rb_th_serial(next_th), nt->serial); - thread_sched_switch0(cth->sched.context, next_th, nt); + thread_sched_switch0(cth->sched.context, next_th, nt, cth->status == THREAD_KILLED); } #if VM_CHECK_MODE > 0 @@ -1290,58 +1326,51 @@ ractor_sched_deq(rb_vm_t *vm, rb_ractor_t *cr) void rb_ractor_lock_self(rb_ractor_t *r); void rb_ractor_unlock_self(rb_ractor_t *r); +// The current thread for a ractor is put to "sleep" (descheduled in the STOPPED_FOREVER state) waiting for +// a ractor action to wake it up. 
void -rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf) +rb_ractor_sched_wait(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf, void *ubf_arg) { // ractor lock of cr is acquired - // r is sleeping statuss - rb_thread_t *th = rb_ec_thread_ptr(ec); + + RUBY_DEBUG_LOG("start%s", ""); + + rb_thread_t * volatile th = rb_ec_thread_ptr(ec); struct rb_thread_sched *sched = TH_SCHED(th); - cr->sync.wait.waiting_thread = th; // TODO: multi-thread - setup_ubf(th, ubf, (void *)cr); + if (ubf_set(th, ubf, ubf_arg)) { + // interrupted + return; + } thread_sched_lock(sched, th); + rb_ractor_unlock_self(cr); { - rb_ractor_unlock_self(cr); - { - if (RUBY_VM_INTERRUPTED(th->ec)) { - RUBY_DEBUG_LOG("interrupted"); - } - else if (cr->sync.wait.wakeup_status != wakeup_none) { - RUBY_DEBUG_LOG("awaken:%d", (int)cr->sync.wait.wakeup_status); - } - else { - // sleep - RB_VM_SAVE_MACHINE_CONTEXT(th); - th->status = THREAD_STOPPED_FOREVER; - - RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th); - - bool can_direct_transfer = !th_has_dedicated_nt(th); - thread_sched_wakeup_next_thread(sched, th, can_direct_transfer); - thread_sched_wait_running_turn(sched, th, can_direct_transfer); - th->status = THREAD_RUNNABLE; - // wakeup - } - } + // setup sleep + bool can_direct_transfer = !th_has_dedicated_nt(th); + RB_VM_SAVE_MACHINE_CONTEXT(th); + th->status = THREAD_STOPPED_FOREVER; + RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th); + thread_sched_wakeup_next_thread(sched, th, can_direct_transfer); + // sleep + thread_sched_wait_running_turn(sched, th, can_direct_transfer); + th->status = THREAD_RUNNABLE; } thread_sched_unlock(sched, th); + rb_ractor_lock_self(cr); - setup_ubf(th, NULL, NULL); + ubf_clear(th); - rb_ractor_lock_self(cr); - cr->sync.wait.waiting_thread = NULL; + RUBY_DEBUG_LOG("end%s", ""); } void -rb_ractor_sched_wakeup(rb_ractor_t *r) +rb_ractor_sched_wakeup(rb_ractor_t *r, rb_thread_t *r_th) { - rb_thread_t *r_th = r->sync.wait.waiting_thread; - // ractor lock of r is acquired + // ractor lock of r is NOT acquired struct rb_thread_sched *sched = TH_SCHED(r_th); - VM_ASSERT(r->sync.wait.wakeup_status != 0); + RUBY_DEBUG_LOG("r:%u th:%d", (unsigned int)rb_ractor_id(r), r_th->serial); thread_sched_lock(sched, r_th); { @@ -1357,6 +1386,7 @@ ractor_sched_barrier_completed_p(rb_vm_t *vm) { RUBY_DEBUG_LOG("run:%u wait:%u", vm->ractor.sched.running_cnt, vm->ractor.sched.barrier_waiting_cnt); VM_ASSERT(vm->ractor.sched.running_cnt - 1 >= vm->ractor.sched.barrier_waiting_cnt); + return (vm->ractor.sched.running_cnt - vm->ractor.sched.barrier_waiting_cnt) == 1; } @@ -1367,6 +1397,8 @@ rb_ractor_sched_barrier_start(rb_vm_t *vm, rb_ractor_t *cr) VM_ASSERT(vm->ractor.sync.lock_owner == cr); // VM is locked VM_ASSERT(!vm->ractor.sched.barrier_waiting); VM_ASSERT(vm->ractor.sched.barrier_waiting_cnt == 0); + VM_ASSERT(vm->ractor.sched.barrier_ractor == NULL); + VM_ASSERT(vm->ractor.sched.barrier_lock_rec == 0); RUBY_DEBUG_LOG("start serial:%u", vm->ractor.sched.barrier_serial); @@ -1375,46 +1407,60 @@ rb_ractor_sched_barrier_start(rb_vm_t *vm, rb_ractor_t *cr) ractor_sched_lock(vm, cr); { vm->ractor.sched.barrier_waiting = true; + vm->ractor.sched.barrier_ractor = cr; + vm->ractor.sched.barrier_lock_rec = vm->ractor.sync.lock_rec; // release VM lock lock_rec = vm->ractor.sync.lock_rec; vm->ractor.sync.lock_rec = 0; vm->ractor.sync.lock_owner = NULL; rb_native_mutex_unlock(&vm->ractor.sync.lock); - { - // interrupts all 
running threads - rb_thread_t *ith; - ccan_list_for_each(&vm->ractor.sched.running_threads, ith, sched.node.running_threads) { - if (ith->ractor != cr) { - RUBY_DEBUG_LOG("barrier int:%u", rb_th_serial(ith)); - RUBY_VM_SET_VM_BARRIER_INTERRUPT(ith->ec); - } - } - // wait for other ractors - while (!ractor_sched_barrier_completed_p(vm)) { - ractor_sched_set_unlocked(vm, cr); - rb_native_cond_wait(&vm->ractor.sched.barrier_complete_cond, &vm->ractor.sched.lock); - ractor_sched_set_locked(vm, cr); + // interrupts all running threads + rb_thread_t *ith; + ccan_list_for_each(&vm->ractor.sched.running_threads, ith, sched.node.running_threads) { + if (ith->ractor != cr) { + RUBY_DEBUG_LOG("barrier request to th:%u", rb_th_serial(ith)); + RUBY_VM_SET_VM_BARRIER_INTERRUPT(ith->ec); } } - } - ractor_sched_unlock(vm, cr); - // acquire VM lock - rb_native_mutex_lock(&vm->ractor.sync.lock); - vm->ractor.sync.lock_rec = lock_rec; - vm->ractor.sync.lock_owner = cr; + // wait for other ractors + while (!ractor_sched_barrier_completed_p(vm)) { + ractor_sched_set_unlocked(vm, cr); + rb_native_cond_wait(&vm->ractor.sched.barrier_complete_cond, &vm->ractor.sched.lock); + ractor_sched_set_locked(vm, cr); + } - RUBY_DEBUG_LOG("completed seirial:%u", vm->ractor.sched.barrier_serial); + RUBY_DEBUG_LOG("completed seirial:%u", vm->ractor.sched.barrier_serial); - ractor_sched_lock(vm, cr); - { - vm->ractor.sched.barrier_waiting = false; + // no other ractors are there vm->ractor.sched.barrier_serial++; vm->ractor.sched.barrier_waiting_cnt = 0; rb_native_cond_broadcast(&vm->ractor.sched.barrier_release_cond); + + // acquire VM lock + rb_native_mutex_lock(&vm->ractor.sync.lock); + vm->ractor.sync.lock_rec = lock_rec; + vm->ractor.sync.lock_owner = cr; } + + // do not release ractor_sched_lock and there is no newly added (resumed) thread + // thread_sched_setup_running_threads +} + +// called from vm_lock_leave if the vm_lock used for barrierred +void +rb_ractor_sched_barrier_end(rb_vm_t *vm, rb_ractor_t *cr) +{ + RUBY_DEBUG_LOG("serial:%u", (unsigned int)vm->ractor.sched.barrier_serial - 1); + VM_ASSERT(vm->ractor.sched.barrier_waiting); + VM_ASSERT(vm->ractor.sched.barrier_ractor); + VM_ASSERT(vm->ractor.sched.barrier_lock_rec > 0); + + vm->ractor.sched.barrier_waiting = false; + vm->ractor.sched.barrier_ractor = NULL; + vm->ractor.sched.barrier_lock_rec = 0; ractor_sched_unlock(vm, cr); } @@ -1530,6 +1576,12 @@ thread_sched_atfork(struct rb_thread_sched *sched) } vm->ractor.sched.running_cnt = 0; + rb_native_mutex_initialize(&vm->ractor.sched.lock); +#if VM_CHECK_MODE > 0 + vm->ractor.sched.lock_owner = NULL; + vm->ractor.sched.locked = false; +#endif + // rb_native_cond_destroy(&vm->ractor.sched.cond); rb_native_cond_initialize(&vm->ractor.sched.cond); rb_native_cond_initialize(&vm->ractor.sched.barrier_complete_cond); @@ -1755,6 +1807,27 @@ native_thread_assign(struct rb_native_thread *nt, rb_thread_t *th) } static void +native_thread_destroy_atfork(struct rb_native_thread *nt) +{ + if (nt) { + /* We can't call rb_native_cond_destroy here because according to the + * specs of pthread_cond_destroy: + * + * Attempting to destroy a condition variable upon which other threads + * are currently blocked results in undefined behavior. + * + * Specifically, glibc's pthread_cond_destroy waits on all the other + * listeners. Since after forking all the threads are dead, the condition + * variable's listeners will never wake up, so it will hang forever. 
+ */ + + RB_ALTSTACK_FREE(nt->altstack); + ruby_xfree(nt->nt_context); + ruby_xfree(nt); + } +} + +static void native_thread_destroy(struct rb_native_thread *nt) { if (nt) { @@ -1764,9 +1837,7 @@ native_thread_destroy(struct rb_native_thread *nt) rb_native_cond_destroy(&nt->cond.intr); } - RB_ALTSTACK_FREE(nt->altstack); - ruby_xfree(nt->nt_context); - ruby_xfree(nt); + native_thread_destroy_atfork(nt); } } @@ -1906,67 +1977,13 @@ space_size(size_t stack_size) } } -#ifdef __linux__ -static __attribute__((noinline)) void -reserve_stack(volatile char *limit, size_t size) -{ -# ifdef C_ALLOCA -# error needs alloca() -# endif - struct rlimit rl; - volatile char buf[0x100]; - enum {stack_check_margin = 0x1000}; /* for -fstack-check */ - - STACK_GROW_DIR_DETECTION; - - if (!getrlimit(RLIMIT_STACK, &rl) && rl.rlim_cur == RLIM_INFINITY) - return; - - if (size < stack_check_margin) return; - size -= stack_check_margin; - - size -= sizeof(buf); /* margin */ - if (IS_STACK_DIR_UPPER()) { - const volatile char *end = buf + sizeof(buf); - limit += size; - if (limit > end) { - /* |<-bottom (=limit(a)) top->| - * | .. |<-buf 256B |<-end | stack check | - * | 256B | =size= | margin (4KB)| - * | =size= limit(b)->| 256B | | - * | | alloca(sz) | | | - * | .. |<-buf |<-limit(c) [sz-1]->0> | | - */ - size_t sz = limit - end; - limit = alloca(sz); - limit[sz-1] = 0; - } - } - else { - limit -= size; - if (buf > limit) { - /* |<-top (=limit(a)) bottom->| - * | .. | 256B buf->| | stack check | - * | 256B | =size= | margin (4KB)| - * | =size= limit(b)->| 256B | | - * | | alloca(sz) | | | - * | .. | buf->| limit(c)-><0> | | - */ - size_t sz = buf - limit; - limit = alloca(sz); - limit[0] = 0; - } - } -} -#else -# define reserve_stack(limit, size) ((void)(limit), (void)(size)) -#endif - -#undef ruby_init_stack -void -ruby_init_stack(volatile VALUE *addr) +static void +native_thread_init_main_thread_stack(void *addr) { native_main_thread.id = pthread_self(); +#ifdef RUBY_ASAN_ENABLED + addr = asan_get_real_stack_addr((void *)addr); +#endif #if MAINSTACKADDR_AVAILABLE if (native_main_thread.stack_maxsize) return; @@ -1976,7 +1993,6 @@ ruby_init_stack(volatile VALUE *addr) if (get_main_stack(&stackaddr, &size) == 0) { native_main_thread.stack_maxsize = size; native_main_thread.stack_start = stackaddr; - reserve_stack(stackaddr, size); goto bound_check; } } @@ -1986,8 +2002,8 @@ ruby_init_stack(volatile VALUE *addr) #else if (!native_main_thread.stack_start || STACK_UPPER((VALUE *)(void *)&addr, - native_main_thread.stack_start > addr, - native_main_thread.stack_start < addr)) { + native_main_thread.stack_start > (VALUE *)addr, + native_main_thread.stack_start < (VALUE *)addr)) { native_main_thread.stack_start = (VALUE *)addr; } #endif @@ -2049,9 +2065,19 @@ ruby_init_stack(volatile VALUE *addr) {int err = (expr); if (err) {rb_bug_errno(#expr, err);}} static int -native_thread_init_stack(rb_thread_t *th) +native_thread_init_stack(rb_thread_t *th, void *local_in_parent_frame) { rb_nativethread_id_t curr = pthread_self(); +#ifdef RUBY_ASAN_ENABLED + local_in_parent_frame = asan_get_real_stack_addr(local_in_parent_frame); + th->ec->machine.asan_fake_stack_handle = asan_get_thread_fake_stack_handle(); +#endif + + if (!native_main_thread.id) { + /* This thread is the first thread, must be the main thread - + * configure the native_main_thread object */ + native_thread_init_main_thread_stack(local_in_parent_frame); + } if (pthread_equal(curr, native_main_thread.id)) { th->ec->machine.stack_start = 
native_main_thread.stack_start; @@ -2064,8 +2090,8 @@ native_thread_init_stack(rb_thread_t *th) size_t size; if (get_stack(&start, &size) == 0) { - uintptr_t diff = (uintptr_t)start - (uintptr_t)&curr; - th->ec->machine.stack_start = (VALUE *)&curr; + uintptr_t diff = (uintptr_t)start - (uintptr_t)local_in_parent_frame; + th->ec->machine.stack_start = local_in_parent_frame; th->ec->machine.stack_maxsize = size - diff; } } @@ -2176,16 +2202,28 @@ native_thread_create_dedicated(rb_thread_t *th) rb_ec_initialize_vm_stack(th->ec, vm_stack, vm_stack_word_size); th->sched.context_stack = vm_stack; - // setup - thread_sched_to_ready(TH_SCHED(th), th); - return native_thread_create0(th->nt); + int err = native_thread_create0(th->nt); + if (!err) { + // setup + thread_sched_to_ready(TH_SCHED(th), th); + } + return err; } static void call_thread_start_func_2(rb_thread_t *th) { - native_thread_init_stack(th); + /* Capture the address of a local in this stack frame to mark the beginning of the + machine stack for this thread. This is required even if we can tell the real + stack beginning from the pthread API in native_thread_init_stack, because + glibc stores some of its own data on the stack before calling into user code + on a new thread, and replacing that data on fiber-switch would break it (see + bug #13887) */ + VALUE stack_start = 0; + VALUE *stack_start_addr = asan_get_real_stack_addr(&stack_start); + + native_thread_init_stack(th, stack_start_addr); thread_start_func_2(th, th->ec->machine.stack_start); } @@ -2247,7 +2285,7 @@ nt_start(void *ptr) if (next_th && next_th->nt == NULL) { RUBY_DEBUG_LOG("nt:%d next_th:%d", (int)nt->serial, (int)next_th->serial); - thread_sched_switch0(nt->nt_context, next_th, nt); + thread_sched_switch0(nt->nt_context, next_th, nt, false); } else { RUBY_DEBUG_LOG("no schedulable threads -- next_th:%p", next_th); @@ -2259,6 +2297,11 @@ nt_start(void *ptr) // timeout -> deleted. break; } + + if (nt->dedicated) { + // SNT becomes DNT while running + break; + } } } @@ -2283,11 +2326,9 @@ rb_threadptr_remove(rb_thread_t *th) rb_vm_t *vm = th->vm; th->sched.finished = false; - RB_VM_LOCK_ENTER(); - { + RB_VM_LOCKING() { ccan_list_add(&vm->ractor.sched.zombie_threads, &th->sched.node.zombie_threads); } - RB_VM_LOCK_LEAVE(); } #endif } @@ -2306,10 +2347,9 @@ rb_threadptr_sched_free(rb_thread_t *th) // TODO: how to free nt and nt->altstack? 
} - if (th->sched.context) { - ruby_xfree(th->sched.context); - VM_ASSERT((th->sched.context = NULL) == NULL); - } + ruby_xfree(th->sched.context); + th->sched.context = NULL; + // VM_ASSERT(th->sched.context == NULL); #else ruby_xfree(th->sched.context_stack); native_thread_destroy(th->nt); @@ -2520,10 +2560,7 @@ ubf_wakeup_thread(rb_thread_t *th) { RUBY_DEBUG_LOG("th:%u thread_id:%p", rb_th_serial(th), (void *)th->nt->thread_id); - int r = pthread_kill(th->nt->thread_id, SIGVTALRM); - if (r != 0) { - rb_bug_errno("pthread_kill", r); - } + pthread_kill(th->nt->thread_id, SIGVTALRM); } static void @@ -2544,16 +2581,14 @@ ubf_threads_empty(void) static void ubf_wakeup_all_threads(void) { - if (!ubf_threads_empty()) { - rb_thread_t *th; - rb_native_mutex_lock(&ubf_list_lock); - { - ccan_list_for_each(&ubf_list_head, th, sched.node.ubf) { - ubf_wakeup_thread(th); - } + rb_thread_t *th; + rb_native_mutex_lock(&ubf_list_lock); + { + ccan_list_for_each(&ubf_list_head, th, sched.node.ubf) { + ubf_wakeup_thread(th); } - rb_native_mutex_unlock(&ubf_list_lock); } + rb_native_mutex_unlock(&ubf_list_lock); } #else /* USE_UBF_LIST */ @@ -2578,7 +2613,7 @@ rb_thread_wakeup_timer_thread(int sig) timer_thread_wakeup_force(); // interrupt main thread if main thread is available - if (system_working) { + if (RUBY_ATOMIC_LOAD(system_working)) { rb_vm_t *vm = GET_VM(); rb_thread_t *main_th = vm->ractor.main_thread; @@ -2822,10 +2857,21 @@ static struct { static void timer_thread_check_timeslice(rb_vm_t *vm); static int timer_thread_set_timeout(rb_vm_t *vm); -static void timer_thread_wakeup_thread(rb_thread_t *th); +static void timer_thread_wakeup_thread(rb_thread_t *th, uint32_t event_serial); #include "thread_pthread_mn.c" +static rb_thread_t * +thread_sched_waiting_thread(struct rb_thread_sched_waiting *w) +{ + if (w) { + return (rb_thread_t *)((size_t)w - offsetof(rb_thread_t, sched.waiting_reason)); + } + else { + return NULL; + } +} + static int timer_thread_set_timeout(rb_vm_t *vm) { @@ -2855,22 +2901,29 @@ timer_thread_set_timeout(rb_vm_t *vm) } ractor_sched_unlock(vm, NULL); - if (vm->ractor.sched.timeslice_wait_inf) { - rb_native_mutex_lock(&timer_th.waiting_lock); - { - rb_thread_t *th = ccan_list_top(&timer_th.waiting, rb_thread_t, sched.waiting_reason.node); - if (th && (th->sched.waiting_reason.flags & thread_sched_waiting_timeout)) { - rb_hrtime_t now = rb_hrtime_now(); - rb_hrtime_t hrrel = rb_hrtime_sub(th->sched.waiting_reason.data.timeout, now); + // Always check waiting threads to find minimum timeout + // even when scheduler has work (grq_cnt > 0) + rb_native_mutex_lock(&timer_th.waiting_lock); + { + struct rb_thread_sched_waiting *w = ccan_list_top(&timer_th.waiting, struct rb_thread_sched_waiting, node); + rb_thread_t *th = thread_sched_waiting_thread(w); + + if (th && (th->sched.waiting_reason.flags & thread_sched_waiting_timeout)) { + rb_hrtime_t now = rb_hrtime_now(); + rb_hrtime_t hrrel = rb_hrtime_sub(th->sched.waiting_reason.data.timeout, now); + + RUBY_DEBUG_LOG("th:%u now:%lu rel:%lu", rb_th_serial(th), (unsigned long)now, (unsigned long)hrrel); - RUBY_DEBUG_LOG("th:%u now:%lu rel:%lu", rb_th_serial(th), (unsigned long)now, (unsigned long)hrrel); + // TODO: overflow? + int thread_timeout = (int)((hrrel + RB_HRTIME_PER_MSEC - 1) / RB_HRTIME_PER_MSEC); // ms - // TODO: overflow? 
- timeout = (int)((hrrel + RB_HRTIME_PER_MSEC - 1) / RB_HRTIME_PER_MSEC); // ms + // Use minimum of scheduler timeout and thread sleep timeout + if (timeout < 0 || thread_timeout < timeout) { + timeout = thread_timeout; } } - rb_native_mutex_unlock(&timer_th.waiting_lock); } + rb_native_mutex_unlock(&timer_th.waiting_lock); RUBY_DEBUG_LOG("timeout:%d inf:%d", timeout, (int)vm->ractor.sched.timeslice_wait_inf); @@ -2906,23 +2959,25 @@ timer_thread_check_exceed(rb_hrtime_t abs, rb_hrtime_t now) } static rb_thread_t * -timer_thread_deq_wakeup(rb_vm_t *vm, rb_hrtime_t now) +timer_thread_deq_wakeup(rb_vm_t *vm, rb_hrtime_t now, uint32_t *event_serial) { - rb_thread_t *th = ccan_list_top(&timer_th.waiting, rb_thread_t, sched.waiting_reason.node); + struct rb_thread_sched_waiting *w = ccan_list_top(&timer_th.waiting, struct rb_thread_sched_waiting, node); - if (th != NULL && - (th->sched.waiting_reason.flags & thread_sched_waiting_timeout) && - timer_thread_check_exceed(th->sched.waiting_reason.data.timeout, now)) { + if (w != NULL && + (w->flags & thread_sched_waiting_timeout) && + timer_thread_check_exceed(w->data.timeout, now)) { - RUBY_DEBUG_LOG("wakeup th:%u", rb_th_serial(th)); + RUBY_DEBUG_LOG("wakeup th:%u", rb_th_serial(thread_sched_waiting_thread(w))); // delete from waiting list - ccan_list_del_init(&th->sched.waiting_reason.node); + ccan_list_del_init(&w->node); // setup result - th->sched.waiting_reason.flags = thread_sched_waiting_none; - th->sched.waiting_reason.data.result = 0; + w->flags = thread_sched_waiting_none; + w->data.result = 0; + rb_thread_t *th = thread_sched_waiting_thread(w); + *event_serial = w->data.event_serial; return th; } @@ -2930,19 +2985,22 @@ timer_thread_deq_wakeup(rb_vm_t *vm, rb_hrtime_t now) } static void -timer_thread_wakeup_thread(rb_thread_t *th) +timer_thread_wakeup_thread_locked(struct rb_thread_sched *sched, rb_thread_t *th, uint32_t event_serial) +{ + if (sched->running != th && th->sched.event_serial == event_serial) { + thread_sched_to_ready_common(sched, th, true, false); + } +} + +static void +timer_thread_wakeup_thread(rb_thread_t *th, uint32_t event_serial) { RUBY_DEBUG_LOG("th:%u", rb_th_serial(th)); struct rb_thread_sched *sched = TH_SCHED(th); thread_sched_lock(sched, th); { - if (sched->running != th) { - thread_sched_to_ready_common(sched, th, true, false); - } - else { - // will be release the execution right - } + timer_thread_wakeup_thread_locked(sched, th, event_serial); } thread_sched_unlock(sched, th); } @@ -2952,11 +3010,14 @@ timer_thread_check_timeout(rb_vm_t *vm) { rb_hrtime_t now = rb_hrtime_now(); rb_thread_t *th; + uint32_t event_serial; rb_native_mutex_lock(&timer_th.waiting_lock); { - while ((th = timer_thread_deq_wakeup(vm, now)) != NULL) { - timer_thread_wakeup_thread(th); + while ((th = timer_thread_deq_wakeup(vm, now, &event_serial)) != NULL) { + rb_native_mutex_unlock(&timer_th.waiting_lock); + timer_thread_wakeup_thread(th, event_serial); + rb_native_mutex_lock(&timer_th.waiting_lock); } } rb_native_mutex_unlock(&timer_th.waiting_lock); @@ -2996,12 +3057,12 @@ timer_thread_func(void *ptr) RUBY_DEBUG_LOG("started%s", ""); - while (system_working) { + while (RUBY_ATOMIC_LOAD(system_working)) { timer_thread_check_signal(vm); timer_thread_check_timeout(vm); ubf_wakeup_all_threads(); - RUBY_DEBUG_LOG("system_working:%d", system_working); + RUBY_DEBUG_LOG("system_working:%d", RUBY_ATOMIC_LOAD(system_working)); timer_thread_polling(vm); } @@ -3115,18 +3176,16 @@ rb_thread_create_timer_thread(void) static int 
native_stop_timer_thread(void) { - int stopped; - stopped = --system_working <= 0; + RUBY_ATOMIC_SET(system_working, 0); - if (stopped) { - RUBY_DEBUG_LOG("wakeup send %d", timer_th.comm_fds[1]); - timer_thread_wakeup_force(); - RUBY_DEBUG_LOG("wakeup sent"); - pthread_join(timer_th.pthread_id, NULL); - } + RUBY_DEBUG_LOG("wakeup send %d", timer_th.comm_fds[1]); + timer_thread_wakeup_force(); + RUBY_DEBUG_LOG("wakeup sent"); + pthread_join(timer_th.pthread_id, NULL); if (TT_DEBUG) fprintf(stderr, "stop timer thread\n"); - return stopped; + + return 1; } static void @@ -3144,8 +3203,12 @@ ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr) const size_t water_mark = 1024 * 1024; STACK_GROW_DIR_DETECTION; + if (th) { + size = th->ec->machine.stack_maxsize; + base = (char *)th->ec->machine.stack_start - STACK_DIR_UPPER(0, size); + } #ifdef STACKADDR_AVAILABLE - if (get_stack(&base, &size) == 0) { + else if (get_stack(&base, &size) == 0) { # ifdef __APPLE__ if (pthread_equal(th->nt->thread_id, native_main_thread.id)) { struct rlimit rlim; @@ -3156,15 +3219,11 @@ ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr) # endif base = (char *)base + STACK_DIR_UPPER(+size, -size); } - else #endif - if (th) { - size = th->ec->machine.stack_maxsize; - base = (char *)th->ec->machine.stack_start - STACK_DIR_UPPER(0, size); - } else { return 0; } + size /= RUBY_STACK_SPACE_RATIO; if (size > water_mark) size = water_mark; if (IS_STACK_DIR_UPPER()) { @@ -3285,6 +3344,52 @@ native_sleep(rb_thread_t *th, rb_hrtime_t *rel) RUBY_DEBUG_LOG("wakeup"); } +// fork read-write lock (only for pthread) +static pthread_rwlock_t rb_thread_fork_rw_lock = PTHREAD_RWLOCK_INITIALIZER; + +void +rb_thread_release_fork_lock(void) +{ + int r; + if ((r = pthread_rwlock_unlock(&rb_thread_fork_rw_lock))) { + rb_bug_errno("pthread_rwlock_unlock", r); + } +} + +void +rb_thread_reset_fork_lock(void) +{ + int r; + if ((r = pthread_rwlock_destroy(&rb_thread_fork_rw_lock))) { + rb_bug_errno("pthread_rwlock_destroy", r); + } + + if ((r = pthread_rwlock_init(&rb_thread_fork_rw_lock, NULL))) { + rb_bug_errno("pthread_rwlock_init", r); + } +} + +void * +rb_thread_prevent_fork(void *(*func)(void *), void *data) +{ + int r; + if ((r = pthread_rwlock_rdlock(&rb_thread_fork_rw_lock))) { + rb_bug_errno("pthread_rwlock_rdlock", r); + } + void *result = func(data); + rb_thread_release_fork_lock(); + return result; +} + +void +rb_thread_acquire_fork_lock(void) +{ + int r; + if ((r = pthread_rwlock_wrlock(&rb_thread_fork_rw_lock))) { + rb_bug_errno("pthread_rwlock_wrlock", r); + } +} + // thread internal event hooks (only for pthread) struct rb_internal_thread_event_hook { @@ -3379,4 +3484,23 @@ rb_thread_execute_hooks(rb_event_flag_t event, rb_thread_t *th) } } +// return true if the current thread acquires DNT. +// return false if the current thread already acquires DNT. +bool +rb_thread_lock_native_thread(void) +{ + rb_thread_t *th = GET_THREAD(); + bool is_snt = th->nt->dedicated == 0; + native_thread_dedicated_inc(th->vm, th->ractor, th->nt); + + return is_snt; +} + +void +rb_thread_malloc_stack_set(rb_thread_t *th, void *stack) +{ + th->sched.malloc_stack = true; + th->sched.context_stack = stack; +} + #endif /* THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION */ |
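
Editor's note: a minimal sketch of the AddressSanitizer fiber-switch annotation pattern that the new coroutine_transfer0 relies on. raw_transfer() is only a placeholder for Ruby's assembly coroutine_transfer primitive, and the struct/function names here are illustrative, not part of the Ruby source.

#include <stdbool.h>
#include <stddef.h>

#ifdef __SANITIZE_ADDRESS__   /* GCC-style macro; clang builds check __has_feature(address_sanitizer) */
# include <sanitizer/common_interface_defs.h>
#endif

struct switch_ctx {
    void *stack_base;    /* bottom of this coroutine's machine stack */
    size_t stack_size;
    void *fake_stack;    /* ASan fake-stack handle, preserved across switches */
};

/* placeholder for the real (assembly) stack-switching primitive */
extern struct switch_ctx *raw_transfer(struct switch_ctx *from, struct switch_ctx *to);

static struct switch_ctx *
annotated_transfer(struct switch_ctx *from, struct switch_ctx *to, bool from_is_dead)
{
#ifdef __SANITIZE_ADDRESS__
    /* Passing NULL lets ASan destroy the outgoing fake stack, which is only
     * valid when this coroutine will never be resumed (the to_dead case). */
    void **save = from_is_dead ? NULL : &from->fake_stack;
    __sanitizer_start_switch_fiber(save, to->stack_base, to->stack_size);
#endif

    struct switch_ctx *came_from = raw_transfer(from, to);

#ifdef __SANITIZE_ADDRESS__
    /* Running again on this coroutine: restore our fake stack and record where
     * control came from so ASan can track that stack too. */
    __sanitizer_finish_switch_fiber(from->fake_stack,
                                    (const void **)&came_from->stack_base,
                                    &came_from->stack_size);
#endif
    return came_from;
}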
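
Editor's note: a small sketch (not the Ruby helper itself) of the timeout folding done in the timer_thread_set_timeout hunk: the nearest thread deadline is converted to whole milliseconds, rounded up so a sleeper never wakes early, and merged with the scheduler's poll timeout, where -1 means "block indefinitely". The nanosecond hrtime unit below is an assumption matching hrtime.h.

#include <stdint.h>

#define HRTIME_PER_MSEC ((uint64_t)1000 * 1000)  /* hrtime ticks (ns) per millisecond (assumption) */

static int
fold_poll_timeout(int poll_timeout_ms, uint64_t rel_deadline_hrtime)
{
    /* ceiling division: even a 1 ns remainder costs a full millisecond */
    int deadline_ms = (int)((rel_deadline_hrtime + HRTIME_PER_MSEC - 1) / HRTIME_PER_MSEC);

    /* an infinite poll (-1) is always replaced; otherwise keep the smaller timeout */
    if (poll_timeout_ms < 0 || deadline_ms < poll_timeout_ms) {
        return deadline_ms;
    }
    return poll_timeout_ms;
}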
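
Editor's note: a self-contained sketch, under stated assumptions, of the fork read-write lock added at the end of the diff: fork-sensitive work runs under the read lock (rb_thread_prevent_fork), while the forking thread takes the write lock (rb_thread_acquire_fork_lock) so it waits for that work to drain. The wrappers below are illustrative; error handling (rb_bug_errno in the real code) is omitted for brevity.

#include <pthread.h>
#include <unistd.h>

static pthread_rwlock_t fork_lock = PTHREAD_RWLOCK_INITIALIZER;

static void *
prevent_fork(void *(*func)(void *), void *arg)
{
    pthread_rwlock_rdlock(&fork_lock);   /* many readers can run concurrently */
    void *ret = func(arg);
    pthread_rwlock_unlock(&fork_lock);
    return ret;
}

static pid_t
fork_with_lock(void)
{
    pthread_rwlock_wrlock(&fork_lock);   /* blocks until all readers have finished */
    pid_t pid = fork();
    if (pid == 0) {
        /* child: only this thread survives, so reset the lock to a known state,
         * mirroring rb_thread_reset_fork_lock (destroy + init) in the diff */
        pthread_rwlock_destroy(&fork_lock);
        pthread_rwlock_init(&fork_lock, NULL);
    }
    else {
        pthread_rwlock_unlock(&fork_lock);
    }
    return pid;
}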
