Diffstat (limited to 'thread_pthread_mn.c')
-rw-r--r--  thread_pthread_mn.c  249
1 file changed, 158 insertions(+), 91 deletions(-)
diff --git a/thread_pthread_mn.c b/thread_pthread_mn.c
index f44c8b18a2..72a5d8fce2 100644
--- a/thread_pthread_mn.c
+++ b/thread_pthread_mn.c
@@ -3,26 +3,23 @@
#if USE_MN_THREADS
static void timer_thread_unregister_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags);
+static void timer_thread_wakeup_thread_locked(struct rb_thread_sched *sched, rb_thread_t *th, uint32_t event_serial);
static bool
timer_thread_cancel_waiting(rb_thread_t *th)
{
bool canceled = false;
- if (th->sched.waiting_reason.flags) {
- rb_native_mutex_lock(&timer_th.waiting_lock);
- {
- if (th->sched.waiting_reason.flags) {
- canceled = true;
- ccan_list_del_init(&th->sched.waiting_reason.node);
- if (th->sched.waiting_reason.flags & (thread_sched_waiting_io_read | thread_sched_waiting_io_write)) {
- timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd, th->sched.waiting_reason.flags);
- }
- th->sched.waiting_reason.flags = thread_sched_waiting_none;
- }
+ rb_native_mutex_lock(&timer_th.waiting_lock);
+ {
+ if (th->sched.waiting_reason.flags) {
+ canceled = true;
+ ccan_list_del_init(&th->sched.waiting_reason.node);
+ timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd, th->sched.waiting_reason.flags);
+ th->sched.waiting_reason.flags = thread_sched_waiting_none;
}
- rb_native_mutex_unlock(&timer_th.waiting_lock);
}
+ rb_native_mutex_unlock(&timer_th.waiting_lock);
return canceled;
}
@@ -41,10 +38,10 @@ ubf_event_waiting(void *ptr)
th->unblock.func = NULL;
th->unblock.arg = NULL;
- bool canceled = timer_thread_cancel_waiting(th);
-
thread_sched_lock(sched, th);
{
+ bool canceled = timer_thread_cancel_waiting(th);
+
if (sched->running == th) {
RUBY_DEBUG_LOG("not waiting yet");
}
@@ -58,7 +55,7 @@ ubf_event_waiting(void *ptr)
thread_sched_unlock(sched, th);
}
-static bool timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel);
+static bool timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel, uint32_t event_serial);
// return true if timed out
static bool
@@ -68,20 +65,27 @@ thread_sched_wait_events(struct rb_thread_sched *sched, rb_thread_t *th, int fd,
volatile bool timedout = false, need_cancel = false;
- if (timer_thread_register_waiting(th, fd, events, rel)) {
- RUBY_DEBUG_LOG("wait fd:%d", fd);
+ uint32_t event_serial = ++th->sched.event_serial; // overflow is okay
- RB_VM_SAVE_MACHINE_CONTEXT(th);
- setup_ubf(th, ubf_event_waiting, (void *)th);
+ if (ubf_set(th, ubf_event_waiting, (void *)th)) {
+ return false;
+ }
- RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th);
+ thread_sched_lock(sched, th);
+ {
+ if (timer_thread_register_waiting(th, fd, events, rel, event_serial)) {
+ RUBY_DEBUG_LOG("wait fd:%d", fd);
+
+ RB_VM_SAVE_MACHINE_CONTEXT(th);
+
+ RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th);
- thread_sched_lock(sched, th);
- {
if (th->sched.waiting_reason.flags == thread_sched_waiting_none) {
- // already awaken
+ th->sched.event_serial++;
+ // timer thread has dequeued us already, but it won't try to wake us because we bumped our serial
}
else if (RUBY_VM_INTERRUPTED(th->ec)) {
+ th->sched.event_serial++; // make sure timer thread doesn't try to wake us
need_cancel = true;
}
else {
@@ -95,21 +99,22 @@ thread_sched_wait_events(struct rb_thread_sched *sched, rb_thread_t *th, int fd,
}
timedout = th->sched.waiting_reason.data.result == 0;
- }
- thread_sched_unlock(sched, th);
-
- if (need_cancel) {
- timer_thread_cancel_waiting(th);
- }
- setup_ubf(th, NULL, NULL); // TODO: maybe it is already NULL?
+ if (need_cancel) {
+ timer_thread_cancel_waiting(th);
+ }
- th->status = THREAD_RUNNABLE;
- }
- else {
- RUBY_DEBUG_LOG("can not wait fd:%d", fd);
- return false;
+ th->status = THREAD_RUNNABLE;
+ }
+ else {
+ RUBY_DEBUG_LOG("can not wait fd:%d", fd);
+ timedout = false;
+ }
}
+ thread_sched_unlock(sched, th);
+
+ // if ubf triggered between sched unlock and ubf clear, sched->running == th here
+ ubf_clear(th);
VM_ASSERT(sched->running == th);
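The hunk above replaces the bare wakeup handshake with a per-thread event serial: the waiter bumps th->sched.event_serial before registering, and bumps it again whenever it abandons the wait, so a timer-thread wakeup that still carries a stale serial is ignored. A minimal sketch of that serial-guard pattern, using plain pthreads and hypothetical names (struct waiter, wake_if_serial_matches) rather than the actual scheduler types:

#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>

struct waiter {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    uint32_t        serial;  // bumped whenever the waiter abandons a registration
    bool            woken;
};

// Waker side: deliver the wakeup only if the registration it saw is still current.
static void
wake_if_serial_matches(struct waiter *w, uint32_t registered_serial)
{
    pthread_mutex_lock(&w->lock);
    if (w->serial == registered_serial) {  // a stale serial means the waiter moved on
        w->woken = true;
        pthread_cond_signal(&w->cond);
    }
    pthread_mutex_unlock(&w->lock);
}

// Waiter side: bump the serial when giving up so a late wakeup becomes a no-op.
static void
abandon_wait(struct waiter *w)
{
    pthread_mutex_lock(&w->lock);
    w->serial++;
    pthread_mutex_unlock(&w->lock);
}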
@@ -194,6 +199,8 @@ nt_alloc_thread_stack_chunk(void)
return NULL;
}
+ ruby_annotate_mmap(m, MSTACK_CHUNK_SIZE, "Ruby:nt_alloc_thread_stack_chunk");
+
size_t msz = nt_thread_stack_size();
int header_page_cnt = 1;
int stack_count = ((MSTACK_CHUNK_PAGE_NUM - header_page_cnt) * MSTACK_PAGE_SIZE) / msz;
@@ -338,6 +345,30 @@ nt_alloc_stack(rb_vm_t *vm, void **vm_stack, void **machine_stack)
}
static void
+nt_madvise_free_or_dontneed(void *addr, size_t len)
+{
+ /* There is no real way to perform error handling here. MADV_FREE and
+ * MADV_DONTNEED are both documented to return little more than EINVAL for a
+ * wide variety of errors, so it is indistinguishable whether madvise fails
+ * because the parameters were bad, or because the kernel we're running on
+ * does not support the given advice. This kind of free-but-don't-unmap
+ * is best-effort anyway, so don't sweat it.
+ *
+ * n.b. A very common case of "the kernel doesn't support MADV_FREE and
+ * returns EINVAL" is running under the `rr` debugger; it makes all
+ * MADV_FREE calls return EINVAL. */
+
+#if defined(MADV_FREE)
+ int r = madvise(addr, len, MADV_FREE);
+ // Return on success, or else try MADV_DONTNEED
+ if (r == 0) return;
+#endif
+#if defined(MADV_DONTNEED)
+ madvise(addr, len, MADV_DONTNEED);
+#endif
+}
+
+static void
nt_free_stack(void *mstack)
{
if (!mstack) return;
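The comment in nt_madvise_free_or_dontneed() above explains why madvise's return value is ignored: EINVAL can mean either bad arguments or an unsupported advice (MADV_FREE under the rr debugger being a common case), and the release is best-effort anyway. A self-contained sketch of the same prefer-MADV_FREE-then-fall-back pattern on an anonymous mapping; the size and names here are illustrative only:

#include <sys/mman.h>
#include <stdio.h>
#include <string.h>

// Best-effort "release the pages but keep the mapping": prefer MADV_FREE,
// fall back to MADV_DONTNEED, and ignore errors since EINVAL is ambiguous.
static void
release_pages_best_effort(void *addr, size_t len)
{
#if defined(MADV_FREE)
    if (madvise(addr, len, MADV_FREE) == 0) return;
#endif
#if defined(MADV_DONTNEED)
    madvise(addr, len, MADV_DONTNEED);
#endif
}

int
main(void)
{
    size_t len = 1 << 20;  // 1 MiB, illustrative
    void *p = mmap(NULL, len, PROT_READ | PROT_WRITE,
                   MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
    if (p == MAP_FAILED) { perror("mmap"); return 1; }

    memset(p, 0xab, len);               // dirty the pages
    release_pages_best_effort(p, len);  // kernel may now reclaim them lazily
    munmap(p, len);
    return 0;
}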
@@ -358,19 +389,12 @@ nt_free_stack(void *mstack)
ch->free_stack[ch->free_stack_pos++] = idx;
// clear the stack pages
-#if defined(MADV_FREE)
- int r = madvise(stack, nt_thread_stack_size(), MADV_FREE);
-#elif defined(MADV_DONTNEED)
- int r = madvise(stack, nt_thread_stack_size(), MADV_DONTNEED);
-#else
- int r = 0;
-#endif
-
- if (r != 0) rb_bug("madvise errno:%d", errno);
+ nt_madvise_free_or_dontneed(stack, nt_thread_stack_size());
}
rb_native_mutex_unlock(&nt_machine_stack_lock);
}
+
static int
native_thread_check_and_create_shared(rb_vm_t *vm)
{
@@ -378,11 +402,15 @@ native_thread_check_and_create_shared(rb_vm_t *vm)
rb_native_mutex_lock(&vm->ractor.sched.lock);
{
- unsigned int snt_cnt = vm->ractor.sched.snt_cnt;
- if (!vm->ractor.main_ractor->threads.sched.enable_mn_threads) snt_cnt++; // do not need snt for main ractor
+ unsigned int schedulable_ractor_cnt = vm->ractor.cnt;
+ RUBY_ASSERT(schedulable_ractor_cnt >= 1);
+
+ if (!vm->ractor.main_ractor->threads.sched.enable_mn_threads)
+ schedulable_ractor_cnt--; // do not need snt for main ractor
+ unsigned int snt_cnt = vm->ractor.sched.snt_cnt;
if (((int)snt_cnt < MINIMUM_SNT) ||
- (snt_cnt < vm->ractor.cnt &&
+ (snt_cnt < schedulable_ractor_cnt &&
snt_cnt < vm->ractor.sched.max_cpu)) {
RUBY_DEBUG_LOG("added snt:%u dnt:%u ractor_cnt:%u grq_cnt:%u",
@@ -410,9 +438,19 @@ native_thread_check_and_create_shared(rb_vm_t *vm)
}
}
-static COROUTINE
+#ifdef __APPLE__
+# define co_start ruby_coroutine_start
+#else
+static
+#endif
+COROUTINE
co_start(struct coroutine_context *from, struct coroutine_context *self)
{
+#ifdef RUBY_ASAN_ENABLED
+ __sanitizer_finish_switch_fiber(self->fake_stack,
+ (const void**)&from->stack_base, &from->stack_size);
+#endif
+
rb_thread_t *th = (rb_thread_t *)self->argument;
struct rb_thread_sched *sched = TH_SCHED(th);
VM_ASSERT(th->nt != NULL);
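The RUBY_ASAN_ENABLED block added at the top of co_start() is the second half of AddressSanitizer's fiber-switch protocol: whoever switches stacks calls __sanitizer_start_switch_fiber() before the jump, and the code that gains control calls __sanitizer_finish_switch_fiber() before touching anything else. A minimal sketch of that pairing; switch_to_stack() is a hypothetical stand-in for the real coroutine transfer, not an API from this file:

#include <stddef.h>
#include <sanitizer/common_interface_defs.h>  // ASan fiber annotations

// Hypothetical context-switch primitive standing in for coroutine_transfer().
extern void switch_to_stack(const void *stack_bottom, size_t stack_size);

static void *saved_fake_stack;  // written by start, consumed by finish

static void
switch_with_asan(const void *target_bottom, size_t target_size)
{
    // Announce that we are about to leave the current stack for the target one.
    __sanitizer_start_switch_fiber(&saved_fake_stack, target_bottom, target_size);

    switch_to_stack(target_bottom, target_size);  // control leaves here...

    // ...and the first statement executed after resuming must be this call,
    // which is why co_start() performs it before using any locals.
    const void *old_bottom;
    size_t old_size;
    __sanitizer_finish_switch_fiber(saved_fake_stack, &old_bottom, &old_size);
}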
@@ -421,7 +459,7 @@ co_start(struct coroutine_context *from, struct coroutine_context *self)
// RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
- thread_sched_set_lock_owner(sched, th);
+ thread_sched_set_locked(sched, th);
thread_sched_add_running_thread(TH_SCHED(th), th);
thread_sched_unlock(sched, th);
{
@@ -434,27 +472,33 @@ co_start(struct coroutine_context *from, struct coroutine_context *self)
// Thread is terminated
- VM_ASSERT(!th_has_dedicated_nt(th));
-
- rb_vm_t *vm = th->vm;
- bool has_ready_ractor = vm->ractor.sched.grq_cnt > 0; // at least this ractor is not queued
-
- rb_thread_t *next_th = sched->running;
struct rb_native_thread *nt = th->nt;
+ bool is_dnt = th_has_dedicated_nt(th);
native_thread_assign(NULL, th);
rb_ractor_set_current_ec(th->ractor, NULL);
- if (!has_ready_ractor && next_th && !next_th->nt) {
- // switch to the next thread
- thread_sched_set_lock_owner(sched, NULL);
- thread_sched_switch0(th->sched.context, next_th, nt);
+ if (is_dnt) {
+ // SNT became DNT while running. Just return to the nt_context
+
th->sched.finished = true;
+ coroutine_transfer0(self, nt->nt_context, true);
}
else {
- // switch to the next Ractor
- th->sched.finished = true;
- coroutine_transfer(self, nt->nt_context);
+ rb_thread_t *next_th = sched->running;
+
+ if (next_th && !next_th->nt) {
+ // switch to the next thread
+ thread_sched_set_unlocked(sched, NULL);
+ th->sched.finished = true;
+ thread_sched_switch0(th->sched.context, next_th, nt, true);
+ }
+ else {
+ // switch to the next Ractor
+ th->sched.finished = true;
+ coroutine_transfer0(self, nt->nt_context, true);
+ }
}
+
rb_bug("unreachable");
}
@@ -533,15 +577,18 @@ static void
verify_waiting_list(void)
{
#if VM_CHECK_MODE > 0
- rb_thread_t *wth, *prev_wth = NULL;
- ccan_list_for_each(&timer_th.waiting, wth, sched.waiting_reason.node) {
+ struct rb_thread_sched_waiting *w, *prev_w = NULL;
+
+ // waiting list's timeout order should be [1, 2, 3, ..., 0, 0, 0]
+
+ ccan_list_for_each(&timer_th.waiting, w, node) {
// fprintf(stderr, "verify_waiting_list th:%u abs:%lu\n", rb_th_serial(wth), (unsigned long)wth->sched.waiting_reason.data.timeout);
- if (prev_wth) {
- rb_hrtime_t timeout = wth->sched.waiting_reason.data.timeout;
- rb_hrtime_t prev_timeout = prev_wth->sched.waiting_reason.data.timeout;
+ if (prev_w) {
+ rb_hrtime_t timeout = w->data.timeout;
+ rb_hrtime_t prev_timeout = prev_w->data.timeout;
VM_ASSERT(timeout == 0 || prev_timeout <= timeout);
}
- prev_wth = wth;
+ prev_w = w;
}
#endif
}
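The comment above states the waiting list's intended order: entries with real timeouts ascend, and the 0 ("no timeout") entries trail at the end, e.g. [1, 2, 3, ..., 0, 0, 0]. A tiny sketch of the same adjacent-pair check over a plain array instead of the real ccan_list; check_waiting_order() is an illustrative name, not a function from this file:

#include <assert.h>
#include <stddef.h>
#include <stdint.h>

// Mirrors the VM_ASSERT above: a 0 ("no timeout") entry is always allowed,
// otherwise adjacent timeouts must be non-decreasing.
static void
check_waiting_order(const uint64_t *timeouts, size_t n)
{
    for (size_t i = 1; i < n; i++) {
        assert(timeouts[i] == 0 || timeouts[i - 1] <= timeouts[i]);
    }
}

int
main(void)
{
    uint64_t ok[] = {1, 2, 3, 0, 0};  // sorted deadlines first, untimed waiters last
    check_waiting_order(ok, sizeof(ok) / sizeof(ok[0]));
    return 0;
}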
@@ -570,11 +617,17 @@ kqueue_wait(rb_vm_t *vm)
struct timespec *timeout = NULL;
int timeout_ms = timer_thread_set_timeout(vm);
- if (timeout_ms >= 0) {
+ if (timeout_ms > 0) {
calculated_timeout.tv_sec = timeout_ms / 1000;
calculated_timeout.tv_nsec = (timeout_ms % 1000) * 1000000;
timeout = &calculated_timeout;
}
+ else if (timeout_ms == 0) {
+ // struct timespec may have members beyond tv_sec and tv_nsec on some platforms,
+ // so zero the whole struct: kevent treats an all-zero timespec as an immediate timeout.
+ memset(&calculated_timeout, 0, sizeof(struct timespec));
+ timeout = &calculated_timeout;
+ }
return kevent(timer_th.event_fd, NULL, 0, timer_th.finished_events, KQUEUE_EVENTS_MAX, timeout);
}
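The hunk above splits the old timeout_ms >= 0 case: a positive value becomes a sec/nsec pair, while exactly 0 now yields a fully zeroed timespec so kevent() returns immediately instead of falling through to timeout = NULL, which would block indefinitely. A sketch of that three-way mapping as a standalone helper; ms_to_kevent_timeout() is an illustrative name, not part of this file:

#include <stddef.h>
#include <string.h>
#include <time.h>

// Maps a millisecond timeout to the pointer kevent() expects:
//   < 0  -> NULL           (block until an event arrives)
//   == 0 -> zeroed struct  (poll: return immediately)
//   > 0  -> sec/nsec split
// `ts` must outlive the kevent() call that receives the returned pointer.
static struct timespec *
ms_to_kevent_timeout(int timeout_ms, struct timespec *ts)
{
    if (timeout_ms < 0) return NULL;
    if (timeout_ms == 0) {
        memset(ts, 0, sizeof(*ts));  // zero the whole struct, as above
        return ts;
    }
    ts->tv_sec  = timeout_ms / 1000;
    ts->tv_nsec = (timeout_ms % 1000) * 1000000L;
    return ts;
}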
@@ -619,23 +672,24 @@ kqueue_unregister_waiting(int fd, enum thread_sched_waiting_flag flags)
static bool
kqueue_already_registered(int fd)
{
- rb_thread_t *wth, *found_wth = NULL;
- ccan_list_for_each(&timer_th.waiting, wth, sched.waiting_reason.node) {
+ struct rb_thread_sched_waiting *w, *found_w = NULL;
+
+ ccan_list_for_each(&timer_th.waiting, w, node) {
// Similar to EEXIST in epoll_ctl, but more strict because it checks fd rather than flags
// for simplicity
- if (wth->sched.waiting_reason.flags && wth->sched.waiting_reason.data.fd == fd) {
- found_wth = wth;
+ if (w->flags && w->data.fd == fd) {
+ found_w = w;
break;
}
}
- return found_wth != NULL;
+ return found_w != NULL;
}
#endif // HAVE_SYS_EVENT_H
// return false if the fd is not waitable or does not need to wait.
static bool
-timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel)
+timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel, uint32_t event_serial)
{
RUBY_DEBUG_LOG("th:%u fd:%d flag:%d rel:%lu", rb_th_serial(th), fd, flags, rel ? (unsigned long)*rel : 0);
@@ -741,7 +795,7 @@ timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting
case EPERM:
// the fd doesn't support epoll
case EEXIST:
- // the fd is already registerred by another thread
+ // the fd is already registered by another thread
rb_native_mutex_unlock(&timer_th.waiting_lock);
return false;
default:
@@ -762,6 +816,7 @@ timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting
th->sched.waiting_reason.data.timeout = abs;
th->sched.waiting_reason.data.fd = fd;
th->sched.waiting_reason.data.result = 0;
+ th->sched.waiting_reason.data.event_serial = event_serial;
}
if (abs == 0) { // no timeout
@@ -769,24 +824,24 @@ timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting
ccan_list_add_tail(&timer_th.waiting, &th->sched.waiting_reason.node);
}
else {
- RUBY_DEBUG_LOG("abs:%lu", abs);
+ RUBY_DEBUG_LOG("abs:%lu", (unsigned long)abs);
VM_ASSERT(flags & thread_sched_waiting_timeout);
// insert th to sorted list (TODO: O(n))
- rb_thread_t *wth, *prev_wth = NULL;
+ struct rb_thread_sched_waiting *w, *prev_w = NULL;
- ccan_list_for_each(&timer_th.waiting, wth, sched.waiting_reason.node) {
- if ((wth->sched.waiting_reason.flags & thread_sched_waiting_timeout) &&
- wth->sched.waiting_reason.data.timeout < abs) {
- prev_wth = wth;
+ ccan_list_for_each(&timer_th.waiting, w, node) {
+ if ((w->flags & thread_sched_waiting_timeout) &&
+ w->data.timeout < abs) {
+ prev_w = w;
}
else {
break;
}
}
- if (prev_wth) {
- ccan_list_add_after(&timer_th.waiting, &prev_wth->sched.waiting_reason.node, &th->sched.waiting_reason.node);
+ if (prev_w) {
+ ccan_list_add_after(&timer_th.waiting, &prev_w->node, &th->sched.waiting_reason.node);
}
else {
ccan_list_add(&timer_th.waiting, &th->sched.waiting_reason.node);
@@ -794,8 +849,8 @@ timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting
verify_waiting_list();
- // update timeout seconds
- timer_thread_wakeup();
+ // update timeout seconds; force a wakeup so the timer thread notices short deadlines
+ timer_thread_wakeup_force();
}
}
else {
@@ -810,6 +865,10 @@ timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting
static void
timer_thread_unregister_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags)
{
+ if (!(th->sched.waiting_reason.flags & (thread_sched_waiting_io_read | thread_sched_waiting_io_write))) {
+ return;
+ }
+
RUBY_DEBUG_LOG("th:%u fd:%d", rb_th_serial(th), fd);
#if HAVE_SYS_EVENT_H
kqueue_unregister_waiting(fd, flags);
@@ -840,7 +899,7 @@ timer_thread_setup_mn(void)
#endif
RUBY_DEBUG_LOG("comm_fds:%d/%d", timer_th.comm_fds[0], timer_th.comm_fds[1]);
- timer_thread_register_waiting(NULL, timer_th.comm_fds[0], thread_sched_waiting_io_read | thread_sched_waiting_io_force, NULL);
+ timer_thread_register_waiting(NULL, timer_th.comm_fds[0], thread_sched_waiting_io_read | thread_sched_waiting_io_force, NULL, 0);
}
static int
@@ -929,6 +988,8 @@ timer_thread_polling(rb_vm_t *vm)
(filter == EVFILT_READ) ? "read/" : "",
(filter == EVFILT_WRITE) ? "write/" : "");
+ struct rb_thread_sched *sched = TH_SCHED(th);
+ thread_sched_lock(sched, th);
rb_native_mutex_lock(&timer_th.waiting_lock);
{
if (th->sched.waiting_reason.flags) {
@@ -939,14 +1000,16 @@ timer_thread_polling(rb_vm_t *vm)
th->sched.waiting_reason.flags = thread_sched_waiting_none;
th->sched.waiting_reason.data.fd = -1;
th->sched.waiting_reason.data.result = filter;
+ uint32_t event_serial = th->sched.waiting_reason.data.event_serial;
- timer_thread_wakeup_thread(th);
+ timer_thread_wakeup_thread_locked(sched, th, event_serial);
}
else {
// already released
}
}
rb_native_mutex_unlock(&timer_th.waiting_lock);
+ thread_sched_unlock(sched, th);
}
}
#else
@@ -971,6 +1034,8 @@ timer_thread_polling(rb_vm_t *vm)
(events & EPOLLERR) ? "err/" : "",
(events & EPOLLHUP) ? "hup/" : "");
+ struct rb_thread_sched *sched = TH_SCHED(th);
+ thread_sched_lock(sched, th);
rb_native_mutex_lock(&timer_th.waiting_lock);
{
if (th->sched.waiting_reason.flags) {
@@ -981,14 +1046,16 @@ timer_thread_polling(rb_vm_t *vm)
th->sched.waiting_reason.flags = thread_sched_waiting_none;
th->sched.waiting_reason.data.fd = -1;
th->sched.waiting_reason.data.result = (int)events;
+ uint32_t event_serial = th->sched.waiting_reason.data.event_serial;
- timer_thread_wakeup_thread(th);
+ timer_thread_wakeup_thread_locked(sched, th, event_serial);
}
else {
// already released
}
}
rb_native_mutex_unlock(&timer_th.waiting_lock);
+ thread_sched_unlock(sched, th);
}
}
#endif
@@ -1017,12 +1084,12 @@ timer_thread_polling(rb_vm_t *vm)
switch (r) {
case 0: // timeout
- rb_native_mutex_lock(&vm->ractor.sched.lock);
+ ractor_sched_lock(vm, NULL);
{
// (1-1) timeslice
timer_thread_check_timeslice(vm);
}
- rb_native_mutex_unlock(&vm->ractor.sched.lock);
+ ractor_sched_unlock(vm, NULL);
break;
case -1: // error