Diffstat (limited to 'thread_pthread_mn.c')
-rw-r--r--  thread_pthread_mn.c | 1061
1 file changed, 1061 insertions, 0 deletions
diff --git a/thread_pthread_mn.c b/thread_pthread_mn.c new file mode 100644 index 0000000000..b605d6a751 --- /dev/null +++ b/thread_pthread_mn.c @@ -0,0 +1,1061 @@ +// included by "thread_pthread.c" + +#if USE_MN_THREADS + +static void timer_thread_unregister_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags); + +static bool +timer_thread_cancel_waiting(rb_thread_t *th) +{ + bool canceled = false; + + if (th->sched.waiting_reason.flags) { + rb_native_mutex_lock(&timer_th.waiting_lock); + { + if (th->sched.waiting_reason.flags) { + canceled = true; + ccan_list_del_init(&th->sched.waiting_reason.node); + if (th->sched.waiting_reason.flags & (thread_sched_waiting_io_read | thread_sched_waiting_io_write)) { + timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd, th->sched.waiting_reason.flags); + } + th->sched.waiting_reason.flags = thread_sched_waiting_none; + } + } + rb_native_mutex_unlock(&timer_th.waiting_lock); + } + + return canceled; +} + +static void +ubf_event_waiting(void *ptr) +{ + rb_thread_t *th = (rb_thread_t *)ptr; + struct rb_thread_sched *sched = TH_SCHED(th); + + RUBY_DEBUG_LOG("th:%u", rb_th_serial(th)); + + VM_ASSERT(th->nt == NULL || !th_has_dedicated_nt(th)); + + // only once. it is safe because th->interrupt_lock is already acquired. + th->unblock.func = NULL; + th->unblock.arg = NULL; + + bool canceled = timer_thread_cancel_waiting(th); + + thread_sched_lock(sched, th); + { + if (sched->running == th) { + RUBY_DEBUG_LOG("not waiting yet"); + } + else if (canceled) { + thread_sched_to_ready_common(sched, th, true, false); + } + else { + RUBY_DEBUG_LOG("already not waiting"); + } + } + thread_sched_unlock(sched, th); +} + +static bool timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel); + +// return true if timed out +static bool +thread_sched_wait_events(struct rb_thread_sched *sched, rb_thread_t *th, int fd, enum thread_sched_waiting_flag events, rb_hrtime_t *rel) +{ + VM_ASSERT(!th_has_dedicated_nt(th)); // on SNT + + volatile bool timedout = false, need_cancel = false; + + if (timer_thread_register_waiting(th, fd, events, rel)) { + RUBY_DEBUG_LOG("wait fd:%d", fd); + + RB_VM_SAVE_MACHINE_CONTEXT(th); + setup_ubf(th, ubf_event_waiting, (void *)th); + + RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th); + + thread_sched_lock(sched, th); + { + if (th->sched.waiting_reason.flags == thread_sched_waiting_none) { + // already awaken + } + else if (RUBY_VM_INTERRUPTED(th->ec)) { + need_cancel = true; + } + else { + RUBY_DEBUG_LOG("sleep"); + + th->status = THREAD_STOPPED_FOREVER; + thread_sched_wakeup_next_thread(sched, th, true); + thread_sched_wait_running_turn(sched, th, true); + + RUBY_DEBUG_LOG("wakeup"); + } + + timedout = th->sched.waiting_reason.data.result == 0; + } + thread_sched_unlock(sched, th); + + if (need_cancel) { + timer_thread_cancel_waiting(th); + } + + setup_ubf(th, NULL, NULL); // TODO: maybe it is already NULL? 
+ + th->status = THREAD_RUNNABLE; + } + else { + RUBY_DEBUG_LOG("can not wait fd:%d", fd); + return false; + } + + VM_ASSERT(sched->running == th); + + return timedout; +} + +/// stack management + +static int +get_sysconf_page_size(void) +{ + static long page_size = 0; + + if (UNLIKELY(page_size == 0)) { + page_size = sysconf(_SC_PAGESIZE); + VM_ASSERT(page_size < INT_MAX); + } + return (int)page_size; +} + +#define MSTACK_CHUNK_SIZE (512 * 1024 * 1024) // 512MB +#define MSTACK_PAGE_SIZE get_sysconf_page_size() +#define MSTACK_CHUNK_PAGE_NUM (MSTACK_CHUNK_SIZE / MSTACK_PAGE_SIZE - 1) // 1 is start redzone + +// 512MB chunk +// 131,072 pages (> 65,536) +// 0th page is Redzone. Start from 1st page. + +/* + * <--> machine stack + vm stack + * ---------------------------------- + * |HD...|RZ| ... |RZ| ... ... |RZ| + * <------------- 512MB -------------> + */ + +static struct nt_stack_chunk_header { + struct nt_stack_chunk_header *prev_chunk; + struct nt_stack_chunk_header *prev_free_chunk; + + uint16_t start_page; + uint16_t stack_count; + uint16_t uninitialized_stack_count; + + uint16_t free_stack_pos; + uint16_t free_stack[]; +} *nt_stack_chunks = NULL, + *nt_free_stack_chunks = NULL; + +struct nt_machine_stack_footer { + struct nt_stack_chunk_header *ch; + size_t index; +}; + +static rb_nativethread_lock_t nt_machine_stack_lock = RB_NATIVETHREAD_LOCK_INIT; + +#include <sys/mman.h> + +// vm_stack_size + machine_stack_size + 1 * (guard page size) +static inline size_t +nt_thread_stack_size(void) +{ + static size_t msz; + if (LIKELY(msz > 0)) return msz; + + rb_vm_t *vm = GET_VM(); + int sz = (int)(vm->default_params.thread_vm_stack_size + vm->default_params.thread_machine_stack_size + MSTACK_PAGE_SIZE); + int page_num = roomof(sz, MSTACK_PAGE_SIZE); + msz = (size_t)page_num * MSTACK_PAGE_SIZE; + return msz; +} + +static struct nt_stack_chunk_header * +nt_alloc_thread_stack_chunk(void) +{ + int mmap_flags = MAP_ANONYMOUS | MAP_PRIVATE; +#if defined(MAP_STACK) && !defined(__FreeBSD__) && !defined(__FreeBSD_kernel__) + mmap_flags |= MAP_STACK; +#endif + + const char *m = (void *)mmap(NULL, MSTACK_CHUNK_SIZE, PROT_READ | PROT_WRITE, mmap_flags, -1, 0); + if (m == MAP_FAILED) { + return NULL; + } + + size_t msz = nt_thread_stack_size(); + int header_page_cnt = 1; + int stack_count = ((MSTACK_CHUNK_PAGE_NUM - header_page_cnt) * MSTACK_PAGE_SIZE) / msz; + int ch_size = sizeof(struct nt_stack_chunk_header) + sizeof(uint16_t) * stack_count; + + if (ch_size > MSTACK_PAGE_SIZE * header_page_cnt) { + header_page_cnt = (ch_size + MSTACK_PAGE_SIZE - 1) / MSTACK_PAGE_SIZE; + stack_count = ((MSTACK_CHUNK_PAGE_NUM - header_page_cnt) * MSTACK_PAGE_SIZE) / msz; + } + + VM_ASSERT(stack_count <= UINT16_MAX); + + struct nt_stack_chunk_header *ch = (struct nt_stack_chunk_header *)m; + + ch->start_page = header_page_cnt; + ch->prev_chunk = nt_stack_chunks; + ch->prev_free_chunk = nt_free_stack_chunks; + ch->uninitialized_stack_count = ch->stack_count = (uint16_t)stack_count; + ch->free_stack_pos = 0; + + RUBY_DEBUG_LOG("ch:%p start_page:%d stack_cnt:%d stack_size:%d", ch, (int)ch->start_page, (int)ch->stack_count, (int)msz); + + return ch; +} + +static void * +nt_stack_chunk_get_stack_start(struct nt_stack_chunk_header *ch, size_t idx) +{ + const char *m = (char *)ch; + return (void *)(m + ch->start_page * MSTACK_PAGE_SIZE + idx * nt_thread_stack_size()); +} + +static struct nt_machine_stack_footer * +nt_stack_chunk_get_msf(const rb_vm_t *vm, const char *mstack) +{ + // TODO: stack direction + const size_t msz = 
vm->default_params.thread_machine_stack_size; + return (struct nt_machine_stack_footer *)&mstack[msz - sizeof(struct nt_machine_stack_footer)]; +} + +static void * +nt_stack_chunk_get_stack(const rb_vm_t *vm, struct nt_stack_chunk_header *ch, size_t idx, void **vm_stack, void **machine_stack) +{ + // TODO: only support stack going down + // [VM ... <GUARD> machine stack ...] + + const char *vstack, *mstack; + const char *guard_page; + vstack = nt_stack_chunk_get_stack_start(ch, idx); + guard_page = vstack + vm->default_params.thread_vm_stack_size; + mstack = guard_page + MSTACK_PAGE_SIZE; + + struct nt_machine_stack_footer *msf = nt_stack_chunk_get_msf(vm, mstack); + msf->ch = ch; + msf->index = idx; + +#if 0 + RUBY_DEBUG_LOG("msf:%p vstack:%p-%p guard_page:%p-%p mstack:%p-%p", msf, + vstack, (void *)(guard_page-1), + guard_page, (void *)(mstack-1), + mstack, (void *)(msf)); +#endif + + *vm_stack = (void *)vstack; + *machine_stack = (void *)mstack; + + return (void *)guard_page; +} + +RBIMPL_ATTR_MAYBE_UNUSED() +static void +nt_stack_chunk_dump(void) +{ + struct nt_stack_chunk_header *ch; + int i; + + fprintf(stderr, "** nt_stack_chunks\n"); + ch = nt_stack_chunks; + for (i=0; ch; i++, ch = ch->prev_chunk) { + fprintf(stderr, "%d %p free_pos:%d\n", i, (void *)ch, (int)ch->free_stack_pos); + } + + fprintf(stderr, "** nt_free_stack_chunks\n"); + ch = nt_free_stack_chunks; + for (i=0; ch; i++, ch = ch->prev_free_chunk) { + fprintf(stderr, "%d %p free_pos:%d\n", i, (void *)ch, (int)ch->free_stack_pos); + } +} + +static int +nt_guard_page(const char *p, size_t len) +{ + if (mprotect((void *)p, len, PROT_NONE) != -1) { + return 0; + } + else { + return errno; + } +} + +static int +nt_alloc_stack(rb_vm_t *vm, void **vm_stack, void **machine_stack) +{ + int err = 0; + + rb_native_mutex_lock(&nt_machine_stack_lock); + { + retry: + if (nt_free_stack_chunks) { + struct nt_stack_chunk_header *ch = nt_free_stack_chunks; + if (ch->free_stack_pos > 0) { + RUBY_DEBUG_LOG("free_stack_pos:%d", ch->free_stack_pos); + nt_stack_chunk_get_stack(vm, ch, ch->free_stack[--ch->free_stack_pos], vm_stack, machine_stack); + } + else if (ch->uninitialized_stack_count > 0) { + RUBY_DEBUG_LOG("uninitialized_stack_count:%d", ch->uninitialized_stack_count); + + size_t idx = ch->stack_count - ch->uninitialized_stack_count--; + void *guard_page = nt_stack_chunk_get_stack(vm, ch, idx, vm_stack, machine_stack); + err = nt_guard_page(guard_page, MSTACK_PAGE_SIZE); + } + else { + nt_free_stack_chunks = ch->prev_free_chunk; + ch->prev_free_chunk = NULL; + goto retry; + } + } + else { + struct nt_stack_chunk_header *p = nt_alloc_thread_stack_chunk(); + if (p == NULL) { + err = errno; + } + else { + nt_free_stack_chunks = nt_stack_chunks = p; + goto retry; + } + } + } + rb_native_mutex_unlock(&nt_machine_stack_lock); + + return err; +} + +static void +nt_free_stack(void *mstack) +{ + if (!mstack) return; + + rb_native_mutex_lock(&nt_machine_stack_lock); + { + struct nt_machine_stack_footer *msf = nt_stack_chunk_get_msf(GET_VM(), mstack); + struct nt_stack_chunk_header *ch = msf->ch; + int idx = (int)msf->index; + void *stack = nt_stack_chunk_get_stack_start(ch, idx); + + RUBY_DEBUG_LOG("stack:%p mstack:%p ch:%p index:%d", stack, mstack, ch, idx); + + if (ch->prev_free_chunk == NULL) { + ch->prev_free_chunk = nt_free_stack_chunks; + nt_free_stack_chunks = ch; + } + ch->free_stack[ch->free_stack_pos++] = idx; + + // clear the stack pages +#if defined(MADV_FREE) + int r = madvise(stack, nt_thread_stack_size(), MADV_FREE); +#elif 
defined(MADV_DONTNEED) + int r = madvise(stack, nt_thread_stack_size(), MADV_DONTNEED); +#else + int r = 0; +#endif + + if (r != 0) rb_bug("madvise errno:%d", errno); + } + rb_native_mutex_unlock(&nt_machine_stack_lock); +} + +static int +native_thread_check_and_create_shared(rb_vm_t *vm) +{ + bool need_to_make = false; + + rb_native_mutex_lock(&vm->ractor.sched.lock); + { + unsigned int snt_cnt = vm->ractor.sched.snt_cnt; + if (!vm->ractor.main_ractor->threads.sched.enable_mn_threads) snt_cnt++; // do not need snt for main ractor + + if (((int)snt_cnt < MINIMUM_SNT) || + (snt_cnt < vm->ractor.cnt && + snt_cnt < vm->ractor.sched.max_cpu)) { + + RUBY_DEBUG_LOG("added snt:%u dnt:%u ractor_cnt:%u grq_cnt:%u", + vm->ractor.sched.snt_cnt, + vm->ractor.sched.dnt_cnt, + vm->ractor.cnt, + vm->ractor.sched.grq_cnt); + + vm->ractor.sched.snt_cnt++; + need_to_make = true; + } + else { + RUBY_DEBUG_LOG("snt:%d ractor_cnt:%d", (int)vm->ractor.sched.snt_cnt, (int)vm->ractor.cnt); + } + } + rb_native_mutex_unlock(&vm->ractor.sched.lock); + + if (need_to_make) { + struct rb_native_thread *nt = native_thread_alloc(); + nt->vm = vm; + return native_thread_create0(nt); + } + else { + return 0; + } +} + +static COROUTINE +co_start(struct coroutine_context *from, struct coroutine_context *self) +{ +#ifdef RUBY_ASAN_ENABLED + __sanitizer_finish_switch_fiber(self->fake_stack, + (const void**)&from->stack_base, &from->stack_size); +#endif + + rb_thread_t *th = (rb_thread_t *)self->argument; + struct rb_thread_sched *sched = TH_SCHED(th); + VM_ASSERT(th->nt != NULL); + VM_ASSERT(th == sched->running); + VM_ASSERT(sched->lock_owner == NULL); + + // RUBY_DEBUG_LOG("th:%u", rb_th_serial(th)); + + thread_sched_set_lock_owner(sched, th); + thread_sched_add_running_thread(TH_SCHED(th), th); + thread_sched_unlock(sched, th); + { + RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_RESUMED, th); + call_thread_start_func_2(th); + } + thread_sched_lock(sched, NULL); + + RUBY_DEBUG_LOG("terminated th:%d", (int)th->serial); + + // Thread is terminated + + struct rb_native_thread *nt = th->nt; + bool is_dnt = th_has_dedicated_nt(th); + native_thread_assign(NULL, th); + rb_ractor_set_current_ec(th->ractor, NULL); + + if (is_dnt) { + // SNT became DNT while running. 
Just return to the nt_context + + th->sched.finished = true; + coroutine_transfer0(self, nt->nt_context, true); + } + else { + rb_vm_t *vm = th->vm; + bool has_ready_ractor = vm->ractor.sched.grq_cnt > 0; // at least this ractor is not queued + rb_thread_t *next_th = sched->running; + + if (!has_ready_ractor && next_th && !next_th->nt) { + // switch to the next thread + thread_sched_set_lock_owner(sched, NULL); + thread_sched_switch0(th->sched.context, next_th, nt, true); + th->sched.finished = true; + } + else { + // switch to the next Ractor + th->sched.finished = true; + coroutine_transfer0(self, nt->nt_context, true); + } + } + + rb_bug("unreachable"); +} + +static int +native_thread_create_shared(rb_thread_t *th) +{ + // setup coroutine + rb_vm_t *vm = th->vm; + void *vm_stack = NULL, *machine_stack = NULL; + int err = nt_alloc_stack(vm, &vm_stack, &machine_stack); + if (err) return err; + + VM_ASSERT(vm_stack < machine_stack); + + // setup vm stack + size_t vm_stack_words = th->vm->default_params.thread_vm_stack_size/sizeof(VALUE); + rb_ec_initialize_vm_stack(th->ec, vm_stack, vm_stack_words); + + // setup machine stack + size_t machine_stack_size = vm->default_params.thread_machine_stack_size - sizeof(struct nt_machine_stack_footer); + th->ec->machine.stack_start = (void *)((uintptr_t)machine_stack + machine_stack_size); + th->ec->machine.stack_maxsize = machine_stack_size; // TODO + th->sched.context_stack = machine_stack; + + th->sched.context = ruby_xmalloc(sizeof(struct coroutine_context)); + coroutine_initialize(th->sched.context, co_start, machine_stack, machine_stack_size); + th->sched.context->argument = th; + + RUBY_DEBUG_LOG("th:%u vm_stack:%p machine_stack:%p", rb_th_serial(th), vm_stack, machine_stack); + thread_sched_to_ready(TH_SCHED(th), th); + + // setup nt + return native_thread_check_and_create_shared(th->vm); +} + +#else // USE_MN_THREADS + +static int +native_thread_create_shared(rb_thread_t *th) +{ + rb_bug("unreachable"); +} + +static bool +thread_sched_wait_events(struct rb_thread_sched *sched, rb_thread_t *th, int fd, enum thread_sched_waiting_flag events, rb_hrtime_t *rel) +{ + rb_bug("unreachable"); +} + +#endif // USE_MN_THREADS + +/// EPOLL/KQUEUE specific code +#if (HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H) && USE_MN_THREADS + +static bool +fd_readable_nonblock(int fd) +{ + struct pollfd pfd = { + .fd = fd, + .events = POLLIN, + }; + return poll(&pfd, 1, 0) != 0; +} + +static bool +fd_writable_nonblock(int fd) +{ + struct pollfd pfd = { + .fd = fd, + .events = POLLOUT, + }; + return poll(&pfd, 1, 0) != 0; +} + +static void +verify_waiting_list(void) +{ +#if VM_CHECK_MODE > 0 + rb_thread_t *wth, *prev_wth = NULL; + ccan_list_for_each(&timer_th.waiting, wth, sched.waiting_reason.node) { + // fprintf(stderr, "verify_waiting_list th:%u abs:%lu\n", rb_th_serial(wth), (unsigned long)wth->sched.waiting_reason.data.timeout); + if (prev_wth) { + rb_hrtime_t timeout = wth->sched.waiting_reason.data.timeout; + rb_hrtime_t prev_timeout = prev_wth->sched.waiting_reason.data.timeout; + VM_ASSERT(timeout == 0 || prev_timeout <= timeout); + } + prev_wth = wth; + } +#endif +} + +#if HAVE_SYS_EVENT_H // kqueue helpers + +static enum thread_sched_waiting_flag +kqueue_translate_filter_to_flags(int16_t filter) +{ + switch (filter) { + case EVFILT_READ: + return thread_sched_waiting_io_read; + case EVFILT_WRITE: + return thread_sched_waiting_io_write; + case EVFILT_TIMER: + return thread_sched_waiting_timeout; + default: + rb_bug("kevent filter:%d not supported", filter); + } +} 
+ +static int +kqueue_wait(rb_vm_t *vm) +{ + struct timespec calculated_timeout; + struct timespec *timeout = NULL; + int timeout_ms = timer_thread_set_timeout(vm); + + if (timeout_ms >= 0) { + calculated_timeout.tv_sec = timeout_ms / 1000; + calculated_timeout.tv_nsec = (timeout_ms % 1000) * 1000000; + timeout = &calculated_timeout; + } + + return kevent(timer_th.event_fd, NULL, 0, timer_th.finished_events, KQUEUE_EVENTS_MAX, timeout); +} + +static void +kqueue_create(void) +{ + if ((timer_th.event_fd = kqueue()) == -1) rb_bug("kqueue creation failed (errno:%d)", errno); + int flags = fcntl(timer_th.event_fd, F_GETFD); + if (flags == -1) { + rb_bug("kqueue GETFD failed (errno:%d)", errno); + } + + flags |= FD_CLOEXEC; + if (fcntl(timer_th.event_fd, F_SETFD, flags) == -1) { + rb_bug("kqueue SETFD failed (errno:%d)", errno); + } +} + +static void +kqueue_unregister_waiting(int fd, enum thread_sched_waiting_flag flags) +{ + if (flags) { + struct kevent ke[2]; + int num_events = 0; + + if (flags & thread_sched_waiting_io_read) { + EV_SET(&ke[num_events], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); + num_events++; + } + if (flags & thread_sched_waiting_io_write) { + EV_SET(&ke[num_events], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); + num_events++; + } + if (kevent(timer_th.event_fd, ke, num_events, NULL, 0, NULL) == -1) { + perror("kevent"); + rb_bug("unregister/kevent fails. errno:%d", errno); + } + } +} + +static bool +kqueue_already_registered(int fd) +{ + rb_thread_t *wth, *found_wth = NULL; + ccan_list_for_each(&timer_th.waiting, wth, sched.waiting_reason.node) { + // Similar to EEXIST in epoll_ctl, but more strict because it checks fd rather than flags + // for simplicity + if (wth->sched.waiting_reason.flags && wth->sched.waiting_reason.data.fd == fd) { + found_wth = wth; + break; + } + } + return found_wth != NULL; +} + +#endif // HAVE_SYS_EVENT_H + +// return false if the fd is not waitable or not need to wait. +static bool +timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel) +{ + RUBY_DEBUG_LOG("th:%u fd:%d flag:%d rel:%lu", rb_th_serial(th), fd, flags, rel ? 
(unsigned long)*rel : 0); + + VM_ASSERT(th == NULL || TH_SCHED(th)->running == th); + VM_ASSERT(flags != 0); + + rb_hrtime_t abs = 0; // 0 means no timeout + + if (rel) { + if (*rel > 0) { + flags |= thread_sched_waiting_timeout; + } + else { + return false; + } + } + + if (rel && *rel > 0) { + flags |= thread_sched_waiting_timeout; + } + +#if HAVE_SYS_EVENT_H + struct kevent ke[2]; + int num_events = 0; +#else + uint32_t epoll_events = 0; +#endif + if (flags & thread_sched_waiting_timeout) { + VM_ASSERT(rel != NULL); + abs = rb_hrtime_add(rb_hrtime_now(), *rel); + } + + if (flags & thread_sched_waiting_io_read) { + if (!(flags & thread_sched_waiting_io_force) && fd_readable_nonblock(fd)) { + RUBY_DEBUG_LOG("fd_readable_nonblock"); + return false; + } + else { + VM_ASSERT(fd >= 0); +#if HAVE_SYS_EVENT_H + EV_SET(&ke[num_events], fd, EVFILT_READ, EV_ADD, 0, 0, (void *)th); + num_events++; +#else + epoll_events |= EPOLLIN; +#endif + } + } + + if (flags & thread_sched_waiting_io_write) { + if (!(flags & thread_sched_waiting_io_force) && fd_writable_nonblock(fd)) { + RUBY_DEBUG_LOG("fd_writable_nonblock"); + return false; + } + else { + VM_ASSERT(fd >= 0); +#if HAVE_SYS_EVENT_H + EV_SET(&ke[num_events], fd, EVFILT_WRITE, EV_ADD, 0, 0, (void *)th); + num_events++; +#else + epoll_events |= EPOLLOUT; +#endif + } + } + + rb_native_mutex_lock(&timer_th.waiting_lock); + { +#if HAVE_SYS_EVENT_H + if (num_events > 0) { + if (kqueue_already_registered(fd)) { + rb_native_mutex_unlock(&timer_th.waiting_lock); + return false; + } + + if (kevent(timer_th.event_fd, ke, num_events, NULL, 0, NULL) == -1) { + RUBY_DEBUG_LOG("failed (%d)", errno); + + switch (errno) { + case EBADF: + // the fd is closed? + case EINTR: + // signal received? is there a sensible way to handle this? + default: + perror("kevent"); + rb_bug("register/kevent failed(fd:%d, errno:%d)", fd, errno); + } + } + RUBY_DEBUG_LOG("kevent(add, fd:%d) success", fd); + } +#else + if (epoll_events) { + struct epoll_event event = { + .events = epoll_events, + .data = { + .ptr = (void *)th, + }, + }; + if (epoll_ctl(timer_th.event_fd, EPOLL_CTL_ADD, fd, &event) == -1) { + RUBY_DEBUG_LOG("failed (%d)", errno); + + switch (errno) { + case EBADF: + // the fd is closed? 
+ case EPERM: + // the fd doesn't support epoll + case EEXIST: + // the fd is already registered by another thread + rb_native_mutex_unlock(&timer_th.waiting_lock); + return false; + default: + perror("epoll_ctl"); + rb_bug("register/epoll_ctl failed(fd:%d, errno:%d)", fd, errno); + } + } + RUBY_DEBUG_LOG("epoll_ctl(add, fd:%d, events:%d) success", fd, epoll_events); + } +#endif + + if (th) { + VM_ASSERT(th->sched.waiting_reason.flags == thread_sched_waiting_none); + + // setup waiting information + { + th->sched.waiting_reason.flags = flags; + th->sched.waiting_reason.data.timeout = abs; + th->sched.waiting_reason.data.fd = fd; + th->sched.waiting_reason.data.result = 0; + } + + if (abs == 0) { // no timeout + VM_ASSERT(!(flags & thread_sched_waiting_timeout)); + ccan_list_add_tail(&timer_th.waiting, &th->sched.waiting_reason.node); + } + else { + RUBY_DEBUG_LOG("abs:%lu", (unsigned long)abs); + VM_ASSERT(flags & thread_sched_waiting_timeout); + + // insert th to sorted list (TODO: O(n)) + rb_thread_t *wth, *prev_wth = NULL; + + ccan_list_for_each(&timer_th.waiting, wth, sched.waiting_reason.node) { + if ((wth->sched.waiting_reason.flags & thread_sched_waiting_timeout) && + wth->sched.waiting_reason.data.timeout < abs) { + prev_wth = wth; + } + else { + break; + } + } + + if (prev_wth) { + ccan_list_add_after(&timer_th.waiting, &prev_wth->sched.waiting_reason.node, &th->sched.waiting_reason.node); + } + else { + ccan_list_add(&timer_th.waiting, &th->sched.waiting_reason.node); + } + + verify_waiting_list(); + + // update timeout seconds + timer_thread_wakeup(); + } + } + else { + VM_ASSERT(abs == 0); + } + } + rb_native_mutex_unlock(&timer_th.waiting_lock); + + return true; +} + +static void +timer_thread_unregister_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags) +{ + RUBY_DEBUG_LOG("th:%u fd:%d", rb_th_serial(th), fd); +#if HAVE_SYS_EVENT_H + kqueue_unregister_waiting(fd, flags); +#else + // Linux 2.6.9 or later is needed to pass NULL as data. + if (epoll_ctl(timer_th.event_fd, EPOLL_CTL_DEL, fd, NULL) == -1) { + switch (errno) { + case EBADF: + // just ignore. maybe fd is closed. + break; + default: + perror("epoll_ctl"); + rb_bug("unregister/epoll_ctl fails. errno:%d", errno); + } + } +#endif +} + +static void +timer_thread_setup_mn(void) +{ +#if HAVE_SYS_EVENT_H + kqueue_create(); + RUBY_DEBUG_LOG("kqueue_fd:%d", timer_th.event_fd); +#else + if ((timer_th.event_fd = epoll_create1(EPOLL_CLOEXEC)) == -1) rb_bug("epoll_create (errno:%d)", errno); + RUBY_DEBUG_LOG("epoll_fd:%d", timer_th.event_fd); +#endif + RUBY_DEBUG_LOG("comm_fds:%d/%d", timer_th.comm_fds[0], timer_th.comm_fds[1]); + + timer_thread_register_waiting(NULL, timer_th.comm_fds[0], thread_sched_waiting_io_read | thread_sched_waiting_io_force, NULL); +} + +static int +event_wait(rb_vm_t *vm) +{ +#if HAVE_SYS_EVENT_H + int r = kqueue_wait(vm); +#else + int r = epoll_wait(timer_th.event_fd, timer_th.finished_events, EPOLL_EVENTS_MAX, timer_thread_set_timeout(vm)); +#endif + return r; +} + +/* + * The purpose of the timer thread: + * + * (1) Periodic checking + * (1-1) Provide time slice for active NTs + * (1-2) Check NT shortage + * (1-3) Periodic UBF (global) + * (1-4) Lazy GRQ deq start + * (2) Receive notification + * (2-1) async I/O termination + * (2-2) timeout + * (2-2-1) sleep(n) + * (2-2-2) timeout(n), I/O, ... 
+ */ +static void +timer_thread_polling(rb_vm_t *vm) +{ + int r = event_wait(vm); + + RUBY_DEBUG_LOG("r:%d errno:%d", r, errno); + + switch (r) { + case 0: // timeout + RUBY_DEBUG_LOG("timeout%s", ""); + + ractor_sched_lock(vm, NULL); + { + // (1-1) timeslice + timer_thread_check_timeslice(vm); + + // (1-4) lazy grq deq + if (vm->ractor.sched.grq_cnt > 0) { + RUBY_DEBUG_LOG("GRQ cnt: %u", vm->ractor.sched.grq_cnt); + rb_native_cond_signal(&vm->ractor.sched.cond); + } + } + ractor_sched_unlock(vm, NULL); + + // (1-2) + native_thread_check_and_create_shared(vm); + + break; + + case -1: + switch (errno) { + case EINTR: + // simply retry + break; + default: + perror("event_wait"); + rb_bug("event_wait errno:%d", errno); + } + break; + + default: + RUBY_DEBUG_LOG("%d event(s)", r); + +#if HAVE_SYS_EVENT_H + for (int i=0; i<r; i++) { + rb_thread_t *th = (rb_thread_t *)timer_th.finished_events[i].udata; + int fd = (int)timer_th.finished_events[i].ident; + int16_t filter = timer_th.finished_events[i].filter; + + if (th == NULL) { + // wakeup timerthread + RUBY_DEBUG_LOG("comm from fd:%d", timer_th.comm_fds[1]); + consume_communication_pipe(timer_th.comm_fds[0]); + } + else { + // wakeup specific thread by IO + RUBY_DEBUG_LOG("io event. wakeup_th:%u event:%s%s", + rb_th_serial(th), + (filter == EVFILT_READ) ? "read/" : "", + (filter == EVFILT_WRITE) ? "write/" : ""); + + rb_native_mutex_lock(&timer_th.waiting_lock); + { + if (th->sched.waiting_reason.flags) { + // delete from chain + ccan_list_del_init(&th->sched.waiting_reason.node); + timer_thread_unregister_waiting(th, fd, kqueue_translate_filter_to_flags(filter)); + + th->sched.waiting_reason.flags = thread_sched_waiting_none; + th->sched.waiting_reason.data.fd = -1; + th->sched.waiting_reason.data.result = filter; + + timer_thread_wakeup_thread(th); + } + else { + // already released + } + } + rb_native_mutex_unlock(&timer_th.waiting_lock); + } + } +#else + for (int i=0; i<r; i++) { + rb_thread_t *th = (rb_thread_t *)timer_th.finished_events[i].data.ptr; + + if (th == NULL) { + // wakeup timerthread + RUBY_DEBUG_LOG("comm from fd:%d", timer_th.comm_fds[1]); + consume_communication_pipe(timer_th.comm_fds[0]); + } + else { + // wakeup specific thread by IO + uint32_t events = timer_th.finished_events[i].events; + + RUBY_DEBUG_LOG("io event. wakeup_th:%u event:%s%s%s%s%s%s", + rb_th_serial(th), + (events & EPOLLIN) ? "in/" : "", + (events & EPOLLOUT) ? "out/" : "", + (events & EPOLLRDHUP) ? "RDHUP/" : "", + (events & EPOLLPRI) ? "pri/" : "", + (events & EPOLLERR) ? "err/" : "", + (events & EPOLLHUP) ? 
"hup/" : ""); + + rb_native_mutex_lock(&timer_th.waiting_lock); + { + if (th->sched.waiting_reason.flags) { + // delete from chain + ccan_list_del_init(&th->sched.waiting_reason.node); + timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd, th->sched.waiting_reason.flags); + + th->sched.waiting_reason.flags = thread_sched_waiting_none; + th->sched.waiting_reason.data.fd = -1; + th->sched.waiting_reason.data.result = (int)events; + + timer_thread_wakeup_thread(th); + } + else { + // already released + } + } + rb_native_mutex_unlock(&timer_th.waiting_lock); + } + } +#endif + } +} + +#else // HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H + +static void +timer_thread_setup_mn(void) +{ + // do nothing +} + +static void +timer_thread_polling(rb_vm_t *vm) +{ + int timeout = timer_thread_set_timeout(vm); + + struct pollfd pfd = { + .fd = timer_th.comm_fds[0], + .events = POLLIN, + }; + + int r = poll(&pfd, 1, timeout); + + switch (r) { + case 0: // timeout + rb_native_mutex_lock(&vm->ractor.sched.lock); + { + // (1-1) timeslice + timer_thread_check_timeslice(vm); + } + rb_native_mutex_unlock(&vm->ractor.sched.lock); + break; + + case -1: // error + switch (errno) { + case EINTR: + // simply retry + break; + default: + perror("poll"); + rb_bug("poll errno:%d", errno); + break; + } + + case 1: + consume_communication_pipe(timer_th.comm_fds[0]); + break; + + default: + rb_bug("unreachbale"); + } +} + +#endif // HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H |