Diffstat (limited to 'thread_pthread_mn.c')
-rw-r--r--    thread_pthread_mn.c    249
1 file changed, 158 insertions, 91 deletions
diff --git a/thread_pthread_mn.c b/thread_pthread_mn.c
index f44c8b18a2..72a5d8fce2 100644
--- a/thread_pthread_mn.c
+++ b/thread_pthread_mn.c
@@ -3,26 +3,23 @@
 #if USE_MN_THREADS
 
 static void timer_thread_unregister_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags);
+static void timer_thread_wakeup_thread_locked(struct rb_thread_sched *sched, rb_thread_t *th, uint32_t event_serial);
 
 static bool
 timer_thread_cancel_waiting(rb_thread_t *th)
 {
     bool canceled = false;
 
-    if (th->sched.waiting_reason.flags) {
-        rb_native_mutex_lock(&timer_th.waiting_lock);
-        {
-            if (th->sched.waiting_reason.flags) {
-                canceled = true;
-                ccan_list_del_init(&th->sched.waiting_reason.node);
-                if (th->sched.waiting_reason.flags & (thread_sched_waiting_io_read | thread_sched_waiting_io_write)) {
-                    timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd, th->sched.waiting_reason.flags);
-                }
-                th->sched.waiting_reason.flags = thread_sched_waiting_none;
-            }
+    rb_native_mutex_lock(&timer_th.waiting_lock);
+    {
+        if (th->sched.waiting_reason.flags) {
+            canceled = true;
+            ccan_list_del_init(&th->sched.waiting_reason.node);
+            timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd, th->sched.waiting_reason.flags);
+            th->sched.waiting_reason.flags = thread_sched_waiting_none;
         }
-        rb_native_mutex_unlock(&timer_th.waiting_lock);
     }
+    rb_native_mutex_unlock(&timer_th.waiting_lock);
 
     return canceled;
 }
@@ -41,10 +38,10 @@ ubf_event_waiting(void *ptr)
     th->unblock.func = NULL;
     th->unblock.arg = NULL;
 
-    bool canceled = timer_thread_cancel_waiting(th);
-
     thread_sched_lock(sched, th);
     {
+        bool canceled = timer_thread_cancel_waiting(th);
+
         if (sched->running == th) {
             RUBY_DEBUG_LOG("not waiting yet");
         }
@@ -58,7 +55,7 @@ ubf_event_waiting(void *ptr)
     thread_sched_unlock(sched, th);
 }
 
-static bool timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel);
+static bool timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel, uint32_t event_serial);
 
 // return true if timed out
 static bool
@@ -68,20 +65,27 @@ thread_sched_wait_events(struct rb_thread_sched *sched, rb_thread_t *th, int fd,
 
     volatile bool timedout = false, need_cancel = false;
 
-    if (timer_thread_register_waiting(th, fd, events, rel)) {
-        RUBY_DEBUG_LOG("wait fd:%d", fd);
+    uint32_t event_serial = ++th->sched.event_serial; // overflow is okay
 
-        RB_VM_SAVE_MACHINE_CONTEXT(th);
-        setup_ubf(th, ubf_event_waiting, (void *)th);
+    if (ubf_set(th, ubf_event_waiting, (void *)th)) {
+        return false;
+    }
 
-        RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th);
+    thread_sched_lock(sched, th);
+    {
+        if (timer_thread_register_waiting(th, fd, events, rel, event_serial)) {
+            RUBY_DEBUG_LOG("wait fd:%d", fd);
+
+            RB_VM_SAVE_MACHINE_CONTEXT(th);
+
+            RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th);
 
-        thread_sched_lock(sched, th);
-        {
             if (th->sched.waiting_reason.flags == thread_sched_waiting_none) {
-                // already awaken
+                th->sched.event_serial++;
+                // timer thread has dequeued us already, but it won't try to wake us because we bumped our serial
             }
             else if (RUBY_VM_INTERRUPTED(th->ec)) {
+                th->sched.event_serial++; // make sure timer thread doesn't try to wake us
                 need_cancel = true;
             }
             else {
@@ -95,21 +99,22 @@ thread_sched_wait_events(struct rb_thread_sched *sched, rb_thread_t *th, int fd,
             }
 
             timedout = th->sched.waiting_reason.data.result == 0;
-        }
-        thread_sched_unlock(sched, th);
-
-        if (need_cancel) {
-            timer_thread_cancel_waiting(th);
-        }
-
-        setup_ubf(th, NULL, NULL); // TODO: maybe it is already NULL?
+            if (need_cancel) {
+                timer_thread_cancel_waiting(th);
+            }
 
-        th->status = THREAD_RUNNABLE;
-    }
-    else {
-        RUBY_DEBUG_LOG("can not wait fd:%d", fd);
-        return false;
+            th->status = THREAD_RUNNABLE;
+        }
+        else {
+            RUBY_DEBUG_LOG("can not wait fd:%d", fd);
+            timedout = false;
+        }
     }
+    thread_sched_unlock(sched, th);
+
+    // if ubf triggered between sched unlock and ubf clear, sched->running == th here
+    ubf_clear(th);
 
     VM_ASSERT(sched->running == th);
 
@@ -194,6 +199,8 @@ nt_alloc_thread_stack_chunk(void)
         return NULL;
     }
 
+    ruby_annotate_mmap(m, MSTACK_CHUNK_SIZE, "Ruby:nt_alloc_thread_stack_chunk");
+
     size_t msz = nt_thread_stack_size();
     int header_page_cnt = 1;
     int stack_count = ((MSTACK_CHUNK_PAGE_NUM - header_page_cnt) * MSTACK_PAGE_SIZE) / msz;
@@ -338,6 +345,30 @@ nt_alloc_stack(rb_vm_t *vm, void **vm_stack, void **machine_stack)
 }
 
 static void
+nt_madvise_free_or_dontneed(void *addr, size_t len)
+{
+    /* There is no real way to perform error handling here. Both MADV_FREE
+     * and MADV_DONTNEED are both documented to pretty much only return EINVAL
+     * for a huge variety of errors. It's indistinguishable if madvise fails
+     * because the parameters were bad, or because the kernel we're running on
+     * does not support the given advice. This kind of free-but-don't-unmap
+     * is best-effort anyway, so don't sweat it.
+     *
+     * n.b. A very common case of "the kernel doesn't support MADV_FREE and
+     * returns EINVAL" is running under the `rr` debugger; it makes all
+     * MADV_FREE calls return EINVAL. */
+
+#if defined(MADV_FREE)
+    int r = madvise(addr, len, MADV_FREE);
+    // Return on success, or else try MADV_DONTNEED
+    if (r == 0) return;
+#endif
+#if defined(MADV_DONTNEED)
+    madvise(addr, len, MADV_DONTNEED);
+#endif
+}
+
+static void
 nt_free_stack(void *mstack)
 {
     if (!mstack) return;
@@ -358,19 +389,12 @@ nt_free_stack(void *mstack)
         ch->free_stack[ch->free_stack_pos++] = idx;
 
         // clear the stack pages
-#if defined(MADV_FREE)
-        int r = madvise(stack, nt_thread_stack_size(), MADV_FREE);
-#elif defined(MADV_DONTNEED)
-        int r = madvise(stack, nt_thread_stack_size(), MADV_DONTNEED);
-#else
-        int r = 0;
-#endif
-
-        if (r != 0) rb_bug("madvise errno:%d", errno);
+        nt_madvise_free_or_dontneed(stack, nt_thread_stack_size());
     }
     rb_native_mutex_unlock(&nt_machine_stack_lock);
 }
 
+
 static int
 native_thread_check_and_create_shared(rb_vm_t *vm)
 {
@@ -378,11 +402,15 @@ native_thread_check_and_create_shared(rb_vm_t *vm)
 
     rb_native_mutex_lock(&vm->ractor.sched.lock);
     {
-        unsigned int snt_cnt = vm->ractor.sched.snt_cnt;
-        if (!vm->ractor.main_ractor->threads.sched.enable_mn_threads) snt_cnt++; // do not need snt for main ractor
+        unsigned int schedulable_ractor_cnt = vm->ractor.cnt;
+        RUBY_ASSERT(schedulable_ractor_cnt >= 1);
+
+        if (!vm->ractor.main_ractor->threads.sched.enable_mn_threads)
+            schedulable_ractor_cnt--; // do not need snt for main ractor
 
+        unsigned int snt_cnt = vm->ractor.sched.snt_cnt;
         if (((int)snt_cnt < MINIMUM_SNT) ||
-            (snt_cnt < vm->ractor.cnt &&
+            (snt_cnt < schedulable_ractor_cnt &&
             snt_cnt < vm->ractor.sched.max_cpu)) {
 
            RUBY_DEBUG_LOG("added snt:%u dnt:%u ractor_cnt:%u grq_cnt:%u",
@@ -410,9 +438,19 @@
     }
 }
 
-static COROUTINE
+#ifdef __APPLE__
+# define co_start ruby_coroutine_start
+#else
+static
+#endif
+COROUTINE
 co_start(struct coroutine_context *from, struct coroutine_context *self)
 {
+#ifdef RUBY_ASAN_ENABLED
+    __sanitizer_finish_switch_fiber(self->fake_stack,
+                                    (const void**)&from->stack_base, &from->stack_size);
+#endif
+
     rb_thread_t *th = (rb_thread_t *)self->argument;
     struct rb_thread_sched *sched = TH_SCHED(th);
     VM_ASSERT(th->nt != NULL);
@@ -421,7 +459,7 @@ co_start(struct coroutine_context *from, struct coroutine_context *self)
 
     // RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
 
-    thread_sched_set_lock_owner(sched, th);
+    thread_sched_set_locked(sched, th);
     thread_sched_add_running_thread(TH_SCHED(th), th);
     thread_sched_unlock(sched, th);
     {
@@ -434,27 +472,33 @@ co_start(struct coroutine_context *from, struct coroutine_context *self)
 
     // Thread is terminated
 
-    VM_ASSERT(!th_has_dedicated_nt(th));
-
-    rb_vm_t *vm = th->vm;
-    bool has_ready_ractor = vm->ractor.sched.grq_cnt > 0; // at least this ractor is not queued
-
-    rb_thread_t *next_th = sched->running;
     struct rb_native_thread *nt = th->nt;
+    bool is_dnt = th_has_dedicated_nt(th);
     native_thread_assign(NULL, th);
     rb_ractor_set_current_ec(th->ractor, NULL);
 
-    if (!has_ready_ractor && next_th && !next_th->nt) {
-        // switch to the next thread
-        thread_sched_set_lock_owner(sched, NULL);
-        thread_sched_switch0(th->sched.context, next_th, nt);
+    if (is_dnt) {
+        // SNT became DNT while running. Just return to the nt_context
+        th->sched.finished = true;
+        coroutine_transfer0(self, nt->nt_context, true);
     }
     else {
-        // switch to the next Ractor
-        th->sched.finished = true;
-        coroutine_transfer(self, nt->nt_context);
+        rb_thread_t *next_th = sched->running;
+
+        if (next_th && !next_th->nt) {
+            // switch to the next thread
+            thread_sched_set_unlocked(sched, NULL);
+            th->sched.finished = true;
+            thread_sched_switch0(th->sched.context, next_th, nt, true);
+        }
+        else {
+            // switch to the next Ractor
+            th->sched.finished = true;
+            coroutine_transfer0(self, nt->nt_context, true);
+        }
    }
+
     rb_bug("unreachable");
 }
 
@@ -533,15 +577,18 @@ static void
 verify_waiting_list(void)
 {
 #if VM_CHECK_MODE > 0
-    rb_thread_t *wth, *prev_wth = NULL;
-    ccan_list_for_each(&timer_th.waiting, wth, sched.waiting_reason.node) {
+    struct rb_thread_sched_waiting *w, *prev_w = NULL;
+
+    // waiting list's timeout order should be [1, 2, 3, ..., 0, 0, 0]
+
+    ccan_list_for_each(&timer_th.waiting, w, node) {
         // fprintf(stderr, "verify_waiting_list th:%u abs:%lu\n", rb_th_serial(wth), (unsigned long)wth->sched.waiting_reason.data.timeout);
-        if (prev_wth) {
-            rb_hrtime_t timeout = wth->sched.waiting_reason.data.timeout;
-            rb_hrtime_t prev_timeout = prev_wth->sched.waiting_reason.data.timeout;
+        if (prev_w) {
+            rb_hrtime_t timeout = w->data.timeout;
+            rb_hrtime_t prev_timeout = w->data.timeout;
             VM_ASSERT(timeout == 0 || prev_timeout <= timeout);
         }
-        prev_wth = wth;
+        prev_w = w;
     }
 #endif
 }
@@ -570,11 +617,17 @@ kqueue_wait(rb_vm_t *vm)
     struct timespec *timeout = NULL;
     int timeout_ms = timer_thread_set_timeout(vm);
 
-    if (timeout_ms >= 0) {
+    if (timeout_ms > 0) {
         calculated_timeout.tv_sec = timeout_ms / 1000;
         calculated_timeout.tv_nsec = (timeout_ms % 1000) * 1000000;
         timeout = &calculated_timeout;
     }
+    else if (timeout_ms == 0) {
+        // Relying on the absence of other members of struct timespec is not strictly portable,
+        // and kevent needs a 0-valued timespec to mean immediate timeout.
+        memset(&calculated_timeout, 0, sizeof(struct timespec));
+        timeout = &calculated_timeout;
+    }
 
     return kevent(timer_th.event_fd, NULL, 0, timer_th.finished_events, KQUEUE_EVENTS_MAX, timeout);
 }
@@ -619,23 +672,24 @@ kqueue_unregister_waiting(int fd, enum thread_sched_waiting_flag flags)
 static bool
 kqueue_already_registered(int fd)
 {
-    rb_thread_t *wth, *found_wth = NULL;
-    ccan_list_for_each(&timer_th.waiting, wth, sched.waiting_reason.node) {
+    struct rb_thread_sched_waiting *w, *found_w = NULL;
+
+    ccan_list_for_each(&timer_th.waiting, w, node) {
         // Similar to EEXIST in epoll_ctl, but more strict because it checks fd rather than flags
         // for simplicity
-        if (wth->sched.waiting_reason.flags && wth->sched.waiting_reason.data.fd == fd) {
-            found_wth = wth;
+        if (w->flags && w->data.fd == fd) {
+            found_w = w;
             break;
         }
     }
-    return found_wth != NULL;
+    return found_w != NULL;
 }
 
 #endif // HAVE_SYS_EVENT_H
 
 // return false if the fd is not waitable or not need to wait.
 static bool
-timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel)
+timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel, uint32_t event_serial)
 {
     RUBY_DEBUG_LOG("th:%u fd:%d flag:%d rel:%lu", rb_th_serial(th), fd, flags, rel ? (unsigned long)*rel : 0);
 
@@ -741,7 +795,7 @@ timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting
                   case EPERM:
                     // the fd doesn't support epoll
                   case EEXIST:
-                    // the fd is already registerred by another thread
+                    // the fd is already registered by another thread
                     rb_native_mutex_unlock(&timer_th.waiting_lock);
                     return false;
                   default:
@@ -762,6 +816,7 @@ timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting
                 th->sched.waiting_reason.data.timeout = abs;
                 th->sched.waiting_reason.data.fd = fd;
                 th->sched.waiting_reason.data.result = 0;
+                th->sched.waiting_reason.data.event_serial = event_serial;
             }
 
             if (abs == 0) { // no timeout
@@ -769,24 +824,24 @@ timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting
                 ccan_list_add_tail(&timer_th.waiting, &th->sched.waiting_reason.node);
             }
             else {
-                RUBY_DEBUG_LOG("abs:%lu", abs);
+                RUBY_DEBUG_LOG("abs:%lu", (unsigned long)abs);
                 VM_ASSERT(flags & thread_sched_waiting_timeout);
 
                 // insert th to sorted list (TODO: O(n))
-                rb_thread_t *wth, *prev_wth = NULL;
+                struct rb_thread_sched_waiting *w, *prev_w = NULL;
 
-                ccan_list_for_each(&timer_th.waiting, wth, sched.waiting_reason.node) {
-                    if ((wth->sched.waiting_reason.flags & thread_sched_waiting_timeout) &&
-                        wth->sched.waiting_reason.data.timeout < abs) {
-                        prev_wth = wth;
+                ccan_list_for_each(&timer_th.waiting, w, node) {
+                    if ((w->flags & thread_sched_waiting_timeout) &&
+                        w->data.timeout < abs) {
+                        prev_w = w;
                     }
                     else {
                         break;
                     }
                 }
 
-                if (prev_wth) {
-                    ccan_list_add_after(&timer_th.waiting, &prev_wth->sched.waiting_reason.node, &th->sched.waiting_reason.node);
+                if (prev_w) {
+                    ccan_list_add_after(&timer_th.waiting, &prev_w->node, &th->sched.waiting_reason.node);
                 }
                 else {
                     ccan_list_add(&timer_th.waiting, &th->sched.waiting_reason.node);
@@ -794,8 +849,8 @@ timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting
 
                 verify_waiting_list();
 
-                // update timeout seconds
-                timer_thread_wakeup();
+                // update timeout seconds; force wake so timer thread notices short deadlines
+                timer_thread_wakeup_force();
             }
         }
         else {
@@ -810,6 +865,10 @@ timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting
 static void
 timer_thread_unregister_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags)
 {
+    if (!(th->sched.waiting_reason.flags & (thread_sched_waiting_io_read | thread_sched_waiting_io_write))) {
+        return;
+    }
+
     RUBY_DEBUG_LOG("th:%u fd:%d", rb_th_serial(th), fd);
 #if HAVE_SYS_EVENT_H
     kqueue_unregister_waiting(fd, flags);
@@ -840,7 +899,7 @@ timer_thread_setup_mn(void)
 #endif
     RUBY_DEBUG_LOG("comm_fds:%d/%d", timer_th.comm_fds[0], timer_th.comm_fds[1]);
 
-    timer_thread_register_waiting(NULL, timer_th.comm_fds[0], thread_sched_waiting_io_read | thread_sched_waiting_io_force, NULL);
+    timer_thread_register_waiting(NULL, timer_th.comm_fds[0], thread_sched_waiting_io_read | thread_sched_waiting_io_force, NULL, 0);
 }
 
 static int
@@ -929,6 +988,8 @@ timer_thread_polling(rb_vm_t *vm)
                                (filter == EVFILT_READ) ? "read/" : "",
                                (filter == EVFILT_WRITE) ? "write/" : "");
 
+                struct rb_thread_sched *sched = TH_SCHED(th);
+                thread_sched_lock(sched, th);
                 rb_native_mutex_lock(&timer_th.waiting_lock);
                 {
                     if (th->sched.waiting_reason.flags) {
@@ -939,14 +1000,16 @@ timer_thread_polling(rb_vm_t *vm)
                         th->sched.waiting_reason.flags = thread_sched_waiting_none;
                         th->sched.waiting_reason.data.fd = -1;
                         th->sched.waiting_reason.data.result = filter;
+                        uint32_t event_serial = th->sched.waiting_reason.data.event_serial;
 
-                        timer_thread_wakeup_thread(th);
+                        timer_thread_wakeup_thread_locked(sched, th, event_serial);
                     }
                     else {
                         // already released
                     }
                 }
                 rb_native_mutex_unlock(&timer_th.waiting_lock);
+                thread_sched_unlock(sched, th);
             }
         }
 #else
@@ -971,6 +1034,8 @@ timer_thread_polling(rb_vm_t *vm)
                                (events & EPOLLERR) ? "err/" : "",
                                (events & EPOLLHUP) ? "hup/" : "");
 
+                struct rb_thread_sched *sched = TH_SCHED(th);
+                thread_sched_lock(sched, th);
                 rb_native_mutex_lock(&timer_th.waiting_lock);
                 {
                     if (th->sched.waiting_reason.flags) {
@@ -981,14 +1046,16 @@ timer_thread_polling(rb_vm_t *vm)
                        th->sched.waiting_reason.flags = thread_sched_waiting_none;
                        th->sched.waiting_reason.data.fd = -1;
                        th->sched.waiting_reason.data.result = (int)events;
+                        uint32_t event_serial = th->sched.waiting_reason.data.event_serial;
 
-                        timer_thread_wakeup_thread(th);
+                        timer_thread_wakeup_thread_locked(sched, th, event_serial);
                     }
                     else {
                         // already released
                     }
                 }
                 rb_native_mutex_unlock(&timer_th.waiting_lock);
+                thread_sched_unlock(sched, th);
             }
         }
 #endif
@@ -1017,12 +1084,12 @@ timer_thread_polling(rb_vm_t *vm)
 
         switch (r) {
          case 0: // timeout
-            rb_native_mutex_lock(&vm->ractor.sched.lock);
+            ractor_sched_lock(vm, NULL);
            {
                // (1-1) timeslice
                timer_thread_check_timeslice(vm);
            }
-            rb_native_mutex_unlock(&vm->ractor.sched.lock);
+            ractor_sched_unlock(vm, NULL);
            break;
 
          case -1: // error
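
Note on the event_serial changes above: the waiter bumps th->sched.event_serial before registering, the serial travels with the registration, and the comments in the patch indicate the timer thread drops a wakeup whose serial no longer matches. The following is a minimal, self-contained sketch of that handshake, not the Ruby implementation; the names waiter, waiter_register and waiter_deliver are illustrative only.

    #include <stdint.h>
    #include <stdbool.h>

    struct waiter {
        uint32_t event_serial;  /* bumped by the waiting thread; wraparound is fine, only equality matters */
        bool woken;
    };

    /* waiting side: bump the serial before registering and remember the value used */
    static uint32_t
    waiter_register(struct waiter *w)
    {
        return ++w->event_serial;
    }

    /* timer-thread side: deliver a wakeup only if the registration is still current */
    static void
    waiter_deliver(struct waiter *w, uint32_t registered_serial)
    {
        if (w->event_serial != registered_serial) {
            return;  /* the waiter bumped its serial (already awake or interrupted); drop the stale event */
        }
        w->woken = true;  /* stand-in for the real wakeup call */
    }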
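The new nt_madvise_free_or_dontneed() prefers MADV_FREE and falls back to MADV_DONTNEED, deliberately treating errors as best-effort. A standalone sketch of the same pattern, assuming only <sys/mman.h>; the function name advise_stack_unused is illustrative, not from the Ruby sources.

    #include <stddef.h>
    #include <sys/mman.h>

    static void
    advise_stack_unused(void *addr, size_t len)
    {
    #if defined(MADV_FREE)
        /* lazy reclaim: the mapping stays, the kernel may take the pages back under pressure */
        if (madvise(addr, len, MADV_FREE) == 0) return;
    #endif
    #if defined(MADV_DONTNEED)
        /* eager fallback, e.g. when MADV_FREE reports EINVAL (older kernels, or under rr) */
        madvise(addr, len, MADV_DONTNEED);
    #endif
    }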
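kqueue_wait() above now distinguishes three cases for the kevent() timeout argument: NULL to block, a zeroed timespec to poll and return immediately, and a positive value converted from milliseconds. A sketch of that conversion outside the Ruby code base; kq, out and cap are illustrative parameters.

    #include <sys/types.h>
    #include <sys/event.h>
    #include <sys/time.h>
    #include <string.h>

    static int
    wait_kqueue(int kq, struct kevent *out, int cap, int timeout_ms)
    {
        struct timespec ts, *tsp = NULL;          /* NULL: block until an event arrives */

        if (timeout_ms == 0) {
            memset(&ts, 0, sizeof(ts));           /* zeroed timespec: poll and return at once */
            tsp = &ts;
        }
        else if (timeout_ms > 0) {
            ts.tv_sec = timeout_ms / 1000;
            ts.tv_nsec = (timeout_ms % 1000) * 1000000;
            tsp = &ts;
        }
        return kevent(kq, NULL, 0, out, cap, tsp);
    }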
