diff options
author | Koichi Sasada <ko1@atdot.net> | 2023-04-10 10:53:13 +0900 |
---|---|---|
committer | Koichi Sasada <ko1@atdot.net> | 2023-10-12 14:47:01 +0900 |
commit | be1bbd5b7d40ad863ab35097765d3754726bbd54 (patch) | |
tree | 2995a0859bea1d6b2903dcd324f41869dbef14a1 /thread_pthread_mn.c | |
parent | 096ee0648e215915a3019c2cd68ba220d94eca12 (diff) |
M:N thread scheduler for Ractors
This patch introduces an M:N thread scheduler for the Ractor system.
In general, M:N thread scheduler employs N native threads (OS threads)
to manage M user-level threads (Ruby threads in this case).
On the Ruby interpreter, 1 native thread is provided for 1 Ractor
and all Ruby threads are managed by the native thread.
Since Ruby 1.9, the interpreter has used a 1:1 thread scheduler, which means
1 Ruby thread has 1 native thread. The M:N scheduler changes this strategy.
Because of compatibility issues (and stability issues in the implementation),
the main Ractor doesn't use the M:N scheduler by default. In other words,
threads on the main Ractor will be managed with 1:1 thread scheduler.
There are additional settings by environment variables:
`RUBY_MN_THREADS=1` enables M:N thread scheduler on the main ractor.
Note that non-main ractors use the M:N scheduler without this
configuration. With this configuration, single ractor applications
run threads on M:1 thread scheduler (green threads, user-level threads).
`RUBY_MAX_CPU=n` specifies maximum number of native threads for
M:N scheduler (default: 8).
This patch will be reverted soon if non-easy issues are found.
[Bug #19842]
Diffstat (limited to 'thread_pthread_mn.c')
-rw-r--r-- | thread_pthread_mn.c | 843 |
1 files changed, 843 insertions, 0 deletions
diff --git a/thread_pthread_mn.c b/thread_pthread_mn.c new file mode 100644 index 0000000000..64266683cd --- /dev/null +++ b/thread_pthread_mn.c @@ -0,0 +1,843 @@ +// included by "thread_pthread.c" + +#if USE_MN_THREADS + +static void timer_thread_unregister_waiting(rb_thread_t *th, int fd); + +static bool +timer_thread_cancel_waiting(rb_thread_t *th) +{ + bool canceled = false; + + if (th->sched.waiting_reason.flags) { + rb_native_mutex_lock(&timer_th.waiting_lock); + { + if (th->sched.waiting_reason.flags) { + canceled = true; + ccan_list_del_init(&th->sched.waiting_reason.node); + if (th->sched.waiting_reason.flags & (thread_sched_waiting_io_read | thread_sched_waiting_io_write)) { + timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd); + } + th->sched.waiting_reason.flags = thread_sched_waiting_none; + } + } + rb_native_mutex_unlock(&timer_th.waiting_lock); + } + + return canceled; +} + +static void +ubf_event_waiting(void *ptr) +{ + rb_thread_t *th = (rb_thread_t *)ptr; + struct rb_thread_sched *sched = TH_SCHED(th); + + RUBY_DEBUG_LOG("th:%u", rb_th_serial(th)); + + VM_ASSERT(th->nt == NULL || !th_has_dedicated_nt(th)); + + // only once. it is safe because th->interrupt_lock is already acquired. 
+ th->unblock.func = NULL; + th->unblock.arg = NULL; + + bool canceled = timer_thread_cancel_waiting(th); + + thread_sched_lock(sched, th); + { + if (sched->running == th) { + RUBY_DEBUG_LOG("not waiting yet"); + } + else if (canceled) { + thread_sched_to_ready_common(sched, th, true, false); + } + else { + RUBY_DEBUG_LOG("already not waiting"); + } + } + thread_sched_unlock(sched, th); +} + +static bool timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel); + +// return true if timed out +static bool +thread_sched_wait_events(struct rb_thread_sched *sched, rb_thread_t *th, int fd, enum thread_sched_waiting_flag events, rb_hrtime_t *rel) +{ + VM_ASSERT(!th_has_dedicated_nt(th)); // on SNT + + volatile bool timedout = false, need_cancel = false; + + if (timer_thread_register_waiting(th, fd, events, rel)) { + RUBY_DEBUG_LOG("wait fd:%d", fd); + + RB_VM_SAVE_MACHINE_CONTEXT(th); + setup_ubf(th, ubf_event_waiting, (void *)th); + + thread_sched_lock(sched, th); + { + if (th->sched.waiting_reason.flags == thread_sched_waiting_none) { + // already awaken + } + else if (RUBY_VM_INTERRUPTED(th->ec)) { + need_cancel = true; + } + else { + RUBY_DEBUG_LOG("sleep"); + + th->status = THREAD_STOPPED_FOREVER; + thread_sched_wakeup_next_thread(sched, th, true); + thread_sched_wait_running_turn(sched, th, true); + + RUBY_DEBUG_LOG("wakeup"); + } + + timedout = th->sched.waiting_reason.data.result == 0; + } + thread_sched_unlock(sched, th); + + if (need_cancel) { + timer_thread_cancel_waiting(th); + } + + setup_ubf(th, NULL, NULL); // TODO: maybe it is already NULL? 
+ + th->status = THREAD_RUNNABLE; + } + else { + RUBY_DEBUG_LOG("can not wait fd:%d", fd); + return false; + } + + VM_ASSERT(sched->running == th); + + return timedout; +} + +/// stack management + +#define MSTACK_CHUNK_SIZE (512 * 1024 * 1024) // 512MB +#define MSTACK_PAGE_SIZE 4096 +#define MSTACK_CHUNK_PAGE_NUM (MSTACK_CHUNK_SIZE / MSTACK_PAGE_SIZE - 1) // 1 is start redzone + +// 512MB chunk +// 131,072 pages (> 65,536) +// 0th page is Redzone. Start from 1st page. + +/* + * <--> machine stack + vm stack + * ---------------------------------- + * |HD...|RZ| ... |RZ| ... ... |RZ| + * <------------- 512MB -------------> + */ + +static struct nt_stack_chunk_header { + struct nt_stack_chunk_header *prev_chunk; + struct nt_stack_chunk_header *prev_free_chunk; + + uint16_t start_page; + uint16_t stack_count; + uint16_t uninitialized_stack_count; + + uint16_t free_stack_pos; + uint16_t free_stack[]; +} *nt_stack_chunks = NULL, + *nt_free_stack_chunks = NULL; + +struct nt_machine_stack_footer { + struct nt_stack_chunk_header *ch; + size_t index; +}; + +static rb_nativethread_lock_t nt_machine_stack_lock = RB_NATIVETHREAD_LOCK_INIT; + +#include <sys/mman.h> + +// vm_stack_size + machine_stack_size + 1 * (guard page size) +static inline size_t +nt_therad_stack_size(void) +{ + static size_t msz; + if (LIKELY(msz > 0)) return msz; + + rb_vm_t *vm = GET_VM(); + int sz = (int)(vm->default_params.thread_vm_stack_size + vm->default_params.thread_machine_stack_size + MSTACK_PAGE_SIZE); + int page_num = (sz + MSTACK_PAGE_SIZE - 1) / MSTACK_PAGE_SIZE; + msz = page_num * MSTACK_PAGE_SIZE; + return msz; +} + +static struct nt_stack_chunk_header * +nt_alloc_thread_stack_chunk(void) +{ + const char *m = (void *)mmap(NULL, MSTACK_CHUNK_SIZE, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE | MAP_STACK, -1, 0); + if (m == MAP_FAILED) { + return NULL; + } + + size_t msz = nt_therad_stack_size(); + int header_page_cnt = 1; + int stack_count = ((MSTACK_CHUNK_PAGE_NUM - header_page_cnt) 
* MSTACK_PAGE_SIZE) / msz; + int ch_size = sizeof(struct nt_stack_chunk_header) + sizeof(uint16_t) * stack_count; + + if (ch_size > MSTACK_PAGE_SIZE * header_page_cnt) { + header_page_cnt = (ch_size + MSTACK_PAGE_SIZE - 1) / MSTACK_PAGE_SIZE; + stack_count = ((MSTACK_CHUNK_PAGE_NUM - header_page_cnt) * MSTACK_PAGE_SIZE) / msz; + } + + VM_ASSERT(stack_count <= UINT16_MAX); + + struct nt_stack_chunk_header *ch = (struct nt_stack_chunk_header *)m; + + ch->start_page = header_page_cnt; + ch->prev_chunk = nt_stack_chunks; + ch->prev_free_chunk = nt_free_stack_chunks; + ch->uninitialized_stack_count = ch->stack_count = (uint16_t)stack_count; + ch->free_stack_pos = 0; + + RUBY_DEBUG_LOG("ch:%p start_page:%d stack_cnt:%d stack_size:%d", ch, (int)ch->start_page, (int)ch->stack_count, (int)msz); + + return ch; +} + +static void * +nt_stack_chunk_get_stack_start(struct nt_stack_chunk_header *ch, size_t idx) +{ + const char *m = (char *)ch; + return (void *)(m + ch->start_page * MSTACK_PAGE_SIZE + idx * nt_therad_stack_size()); +} + +static struct nt_machine_stack_footer * +nt_stack_chunk_get_msf(const rb_vm_t *vm, const char *mstack) +{ + // TODO: stack direction + const size_t msz = vm->default_params.thread_machine_stack_size; + return (struct nt_machine_stack_footer *)&mstack[msz - sizeof(struct nt_machine_stack_footer)]; +} + +static void * +nt_stack_chunk_get_stack(const rb_vm_t *vm, struct nt_stack_chunk_header *ch, size_t idx, void **vm_stack, void **machine_stack) +{ + // TODO: only support stack going down + // [VM ... <GUARD> machine stack ...] 
+ + const char *vstack, *mstack; + const char *guard_page; + vstack = nt_stack_chunk_get_stack_start(ch, idx); + guard_page = vstack + vm->default_params.thread_vm_stack_size; + mstack = guard_page + MSTACK_PAGE_SIZE; + + struct nt_machine_stack_footer *msf = nt_stack_chunk_get_msf(vm, mstack); + msf->ch = ch; + msf->index = idx; + +#if 0 + RUBY_DEBUG_LOG("msf:%p vstack:%p-%p guard_page:%p-%p mstack:%p-%p", msf, + vstack, (void *)(guard_page-1), + guard_page, (void *)(mstack-1), + mstack, (void *)(msf)); +#endif + + *vm_stack = (void *)vstack; + *machine_stack = (void *)mstack; + + return (void *)guard_page; +} + +RBIMPL_ATTR_MAYBE_UNUSED() +static void +nt_stack_chunk_dump(void) +{ + struct nt_stack_chunk_header *ch; + int i; + + fprintf(stderr, "** nt_stack_chunks\n"); + ch = nt_stack_chunks; + for (i=0; ch; i++, ch = ch->prev_chunk) { + fprintf(stderr, "%d %p free_pos:%d\n", i, (void *)ch, (int)ch->free_stack_pos); + } + + fprintf(stderr, "** nt_free_stack_chunks\n"); + ch = nt_free_stack_chunks; + for (i=0; ch; i++, ch = ch->prev_free_chunk) { + fprintf(stderr, "%d %p free_pos:%d\n", i, (void *)ch, (int)ch->free_stack_pos); + } +} + +static int +nt_guard_page(const char *p, size_t len) +{ + if (mprotect((void *)p, len, PROT_NONE) != -1) { + return 0; + } + else { + return errno; + } +} + +static int +nt_alloc_stack(rb_vm_t *vm, void **vm_stack, void **machine_stack) +{ + int err = 0; + + rb_native_mutex_lock(&nt_machine_stack_lock); + { + retry: + if (nt_free_stack_chunks) { + struct nt_stack_chunk_header *ch = nt_free_stack_chunks; + if (ch->free_stack_pos > 0) { + RUBY_DEBUG_LOG("free_stack_pos:%d", ch->free_stack_pos); + nt_stack_chunk_get_stack(vm, ch, ch->free_stack[--ch->free_stack_pos], vm_stack, machine_stack); + } + else if (ch->uninitialized_stack_count > 0) { + RUBY_DEBUG_LOG("uninitialized_stack_count:%d", ch->uninitialized_stack_count); + + size_t idx = ch->stack_count - ch->uninitialized_stack_count--; + void *guard_page = 
nt_stack_chunk_get_stack(vm, ch, idx, vm_stack, machine_stack); + err = nt_guard_page(guard_page, MSTACK_PAGE_SIZE); + } + else { + nt_free_stack_chunks = ch->prev_free_chunk; + ch->prev_free_chunk = NULL; + goto retry; + } + } + else { + struct nt_stack_chunk_header *p = nt_alloc_thread_stack_chunk(); + if (p == NULL) { + err = errno; + } + else { + nt_free_stack_chunks = nt_stack_chunks = p; + goto retry; + } + } + } + rb_native_mutex_unlock(&nt_machine_stack_lock); + + return err; +} + +static void +nt_free_stack(void *mstack) +{ + if (!mstack) return; + + rb_native_mutex_lock(&nt_machine_stack_lock); + { + struct nt_machine_stack_footer *msf = nt_stack_chunk_get_msf(GET_VM(), mstack); + struct nt_stack_chunk_header *ch = msf->ch; + int idx = (int)msf->index; + void *stack = nt_stack_chunk_get_stack_start(ch, idx); + + RUBY_DEBUG_LOG("stack:%p mstack:%p ch:%p index:%d", stack, mstack, ch, idx); + + if (ch->prev_free_chunk == NULL) { + ch->prev_free_chunk = nt_free_stack_chunks; + nt_free_stack_chunks = ch; + } + ch->free_stack[ch->free_stack_pos++] = idx; + + // clear the stack pages +#if defined(MADV_FREE) + int r = madvise(stack, nt_therad_stack_size(), MADV_FREE); +#elif defined(MADV_DONTNEED) + int r = madvise(stack, nt_therad_stack_size(), MADV_DONTNEED); +#else + int r = 0; +#endif + + if (r != 0) rb_bug("madvise errno:%d", errno); + } + rb_native_mutex_unlock(&nt_machine_stack_lock); +} + +static int +native_thread_check_and_create_shared(rb_vm_t *vm) +{ + bool need_to_make = false; + + rb_native_mutex_lock(&vm->ractor.sched.lock); + { + unsigned int snt_cnt = vm->ractor.sched.snt_cnt; + if (!vm->ractor.main_ractor->threads.sched.enable_mn_threads) snt_cnt++; // do not need snt for main ractor + + if (((int)snt_cnt < MINIMUM_SNT) || + (snt_cnt < vm->ractor.cnt && + snt_cnt < vm->ractor.sched.max_cpu)) { + + RUBY_DEBUG_LOG("added snt:%u dnt:%u ractor_cnt:%u grq_cnt:%u", + vm->ractor.sched.snt_cnt, + vm->ractor.sched.dnt_cnt, + vm->ractor.cnt, + 
vm->ractor.sched.grq_cnt); + + vm->ractor.sched.snt_cnt++; + need_to_make = true; + } + else { + RUBY_DEBUG_LOG("snt:%d ractor_cnt:%d", (int)vm->ractor.sched.snt_cnt, (int)vm->ractor.cnt); + } + } + rb_native_mutex_unlock(&vm->ractor.sched.lock); + + if (need_to_make) { + struct rb_native_thread *nt = native_thread_alloc(); + nt->vm = vm; + return native_thread_create0(nt); + } + else { + return 0; + } +} + +static COROUTINE +co_start(struct coroutine_context *from, struct coroutine_context *self) +{ + rb_thread_t *th = (rb_thread_t *)self->argument; + struct rb_thread_sched *sched = TH_SCHED(th); + VM_ASSERT(th->nt != NULL); + VM_ASSERT(th == sched->running); + VM_ASSERT(sched->lock_owner == NULL); + + // RUBY_DEBUG_LOG("th:%u", rb_th_serial(th)); + + thread_sched_set_lock_owner(sched, th); + thread_sched_add_running_thread(TH_SCHED(th), th); + thread_sched_unlock(sched, th); + { + call_thread_start_func_2(th); + } + thread_sched_lock(sched, NULL); + + RUBY_DEBUG_LOG("terminated th:%d", (int)th->serial); + + // Thread is terminated + + VM_ASSERT(!th_has_dedicated_nt(th)); + + rb_vm_t *vm = th->vm; + bool has_ready_ractor = vm->ractor.sched.grq_cnt > 0; // at least this ractor is not queued + + rb_thread_t *next_th = sched->running; + struct rb_native_thread *nt = th->nt; + native_thread_assign(NULL, th); + rb_ractor_set_current_ec(th->ractor, NULL); + + if (!has_ready_ractor && next_th && !next_th->nt) { + // switch to the next thread + thread_sched_set_lock_owner(sched, NULL); + thread_sched_switch0(th->sched.context, next_th, nt); + th->sched.finished = true; + } + else { + // switch to the next Ractor + th->sched.finished = true; + coroutine_transfer(self, nt->nt_context); + } + rb_bug("unreachable"); +} + +static int +native_thread_create_shared(rb_thread_t *th) +{ + // setup coroutine + rb_vm_t *vm = th->vm; + void *vm_stack = NULL, *machine_stack = NULL; + int err = nt_alloc_stack(vm, &vm_stack, &machine_stack); + if (err) return err; + + VM_ASSERT(vm_stack 
< machine_stack); + + // setup vm stack + size_t vm_stack_words = th->vm->default_params.thread_vm_stack_size/sizeof(VALUE); + rb_ec_initialize_vm_stack(th->ec, vm_stack, vm_stack_words); + + // setup machine stack + size_t machine_stack_size = vm->default_params.thread_machine_stack_size - sizeof(struct nt_machine_stack_footer); + th->ec->machine.stack_start = (void *)((uintptr_t)machine_stack + machine_stack_size); + th->ec->machine.stack_maxsize = machine_stack_size; // TODO + th->sched.context_stack = machine_stack; + + th->sched.context = ruby_xmalloc(sizeof(struct coroutine_context)); + coroutine_initialize(th->sched.context, co_start, machine_stack, machine_stack_size); + th->sched.context->argument = th; + + RUBY_DEBUG_LOG("th:%u vm_stack:%p machine_stack:%p", rb_th_serial(th), vm_stack, machine_stack); + thread_sched_to_ready(TH_SCHED(th), th); + + // setup nt + return native_thread_check_and_create_shared(th->vm); +} + +#else // USE_MN_THREADS + +static int +native_thread_create_shared(rb_thread_t *th) +{ + rb_bug("unreachable"); +} + +static bool +thread_sched_wait_events(struct rb_thread_sched *sched, rb_thread_t *th, int fd, enum thread_sched_waiting_flag events, rb_hrtime_t *rel) +{ + rb_bug("unreachable"); +} + +#endif // USE_MN_THREADS + +/// EPOLL specific code +#if HAVE_SYS_EPOLL_H && USE_MN_THREADS + +static bool +fd_readable_nonblock(int fd) +{ + struct pollfd pfd = { + .fd = fd, + .events = POLLIN, + }; + return poll(&pfd, 1, 0) != 0; +} + +static bool +fd_writable_nonblock(int fd) +{ + struct pollfd pfd = { + .fd = fd, + .events = POLLOUT, + }; + return poll(&pfd, 1, 0) != 0; +} + +static void +verify_waiting_list(void) +{ +#if VM_CHECK_MODE > 0 + rb_thread_t *wth, *prev_wth = NULL; + ccan_list_for_each(&timer_th.waiting, wth, sched.waiting_reason.node) { + // fprintf(stderr, "verify_waiting_list th:%u abs:%lu\n", rb_th_serial(wth), (unsigned long)wth->sched.waiting_reason.data.timeout); + if (prev_wth) { + rb_hrtime_t timeout = 
wth->sched.waiting_reason.data.timeout; + rb_hrtime_t prev_timeout = prev_wth->sched.waiting_reason.data.timeout; + VM_ASSERT(timeout == 0 || prev_timeout <= timeout); + } + prev_wth = wth; + } +#endif +} + +// return false if the fd is not waitable or not need to wait. +static bool +timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel) +{ + RUBY_DEBUG_LOG("th:%u fd:%d flag:%d rel:%lu", rb_th_serial(th), fd, flags, rel ? (unsigned long)*rel : 0); + + VM_ASSERT(th == NULL || TH_SCHED(th)->running == th); + VM_ASSERT(flags != 0); + + rb_hrtime_t abs = 0; // 0 means no timeout + + if (rel) { + if (*rel > 0) { + flags |= thread_sched_waiting_timeout; + } + else { + return false; + } + } + + if (rel && *rel > 0) { + flags |= thread_sched_waiting_timeout; + } + + __uint32_t epoll_events = 0; + if (flags & thread_sched_waiting_timeout) { + VM_ASSERT(rel != NULL); + abs = rb_hrtime_add(rb_hrtime_now(), *rel); + } + + if (flags & thread_sched_waiting_io_read) { + if (!(flags & thread_sched_waiting_io_force) && fd_readable_nonblock(fd)) { + RUBY_DEBUG_LOG("fd_readable_nonblock"); + return false; + } + else { + VM_ASSERT(fd >= 0); + epoll_events |= EPOLLIN; + } + } + + if (flags & thread_sched_waiting_io_write) { + if (!(flags & thread_sched_waiting_io_force) && fd_writable_nonblock(fd)) { + RUBY_DEBUG_LOG("fd_writable_nonblock"); + return false; + } + else { + VM_ASSERT(fd >= 0); + epoll_events |= EPOLLOUT; + } + } + + rb_native_mutex_lock(&timer_th.waiting_lock); + { + if (epoll_events) { + struct epoll_event event = { + .events = epoll_events, + .data = { + .ptr = (void *)th, + }, + }; + if (epoll_ctl(timer_th.epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) { + RUBY_DEBUG_LOG("failed (%d)", errno); + + switch (errno) { + case EBADF: + // the fd is closed? 
+ case EPERM: + // the fd doesn't support epoll + case EEXIST: + // the fd is already registerred by another thread + rb_native_mutex_unlock(&timer_th.waiting_lock); + return false; + default: + perror("epoll_ctl"); + rb_bug("register/epoll_ctl failed(fd:%d, errno:%d)", fd, errno); + } + } + RUBY_DEBUG_LOG("epoll_ctl(add, fd:%d, events:%d) success", fd, epoll_events); + } + + if (th) { + VM_ASSERT(th->sched.waiting_reason.flags == thread_sched_waiting_none); + + // setup waiting information + { + th->sched.waiting_reason.flags = flags; + th->sched.waiting_reason.data.timeout = abs; + th->sched.waiting_reason.data.fd = fd; + th->sched.waiting_reason.data.result = 0; + } + + if (abs == 0) { // no timeout + VM_ASSERT(!(flags & thread_sched_waiting_timeout)); + ccan_list_add_tail(&timer_th.waiting, &th->sched.waiting_reason.node); + } + else { + RUBY_DEBUG_LOG("abs:%lu", abs); + VM_ASSERT(flags & thread_sched_waiting_timeout); + + // insert th to sorted list (TODO: O(n)) + rb_thread_t *wth, *prev_wth = NULL; + + ccan_list_for_each(&timer_th.waiting, wth, sched.waiting_reason.node) { + if ((wth->sched.waiting_reason.flags & thread_sched_waiting_timeout) && + wth->sched.waiting_reason.data.timeout < abs) { + prev_wth = wth; + } + else { + break; + } + } + + if (prev_wth) { + ccan_list_add_after(&timer_th.waiting, &prev_wth->sched.waiting_reason.node, &th->sched.waiting_reason.node); + } + else { + ccan_list_add(&timer_th.waiting, &th->sched.waiting_reason.node); + } + + verify_waiting_list(); + + // update timeout seconds + timer_thread_wakeup(); + } + } + else { + VM_ASSERT(abs == 0); + } + } + rb_native_mutex_unlock(&timer_th.waiting_lock); + + return true; +} + +static void +timer_thread_unregister_waiting(rb_thread_t *th, int fd) +{ + RUBY_DEBUG_LOG("th:%u fd:%d", rb_th_serial(th), fd); + + // Linux 2.6.9 or later is needed to pass NULL as data. + if (epoll_ctl(timer_th.epoll_fd, EPOLL_CTL_DEL, fd, NULL) == -1) { + switch (errno) { + case EBADF: + // just ignore. 
maybe fd is closed. + break; + default: + perror("epoll_ctl"); + rb_bug("unregister/epoll_ctl fails. errno:%d", errno); + } + } +} + +static void +timer_thread_setup_nm(void) +{ + if ((timer_th.epoll_fd = epoll_create1(EPOLL_CLOEXEC)) == -1) rb_bug("epoll_create (errno:%d)", errno); + RUBY_DEBUG_LOG("comm_fds:%d/%d", timer_th.comm_fds[0], timer_th.comm_fds[1]); + RUBY_DEBUG_LOG("epoll_fd:%d", timer_th.epoll_fd); + + timer_thread_register_waiting(NULL, timer_th.comm_fds[0], thread_sched_waiting_io_read | thread_sched_waiting_io_force, NULL); +} + +/* + * The purpose of the timer thread: + * + * (1) Periodic checking + * (1-1) Provide time slice for active NTs + * (1-2) Check NT shortage + * (1-3) Periodic UBF (global) + * (1-4) Lazy GRQ deq start + * (2) Receive notification + * (2-1) async I/O termination + * (2-2) timeout + * (2-2-1) sleep(n) + * (2-2-2) timeout(n), I/O, ... + */ +static void +timer_thread_polling(rb_vm_t *vm) +{ + int r = epoll_wait(timer_th.epoll_fd, timer_th.finished_events, EPOLL_EVENTS_MAX, timer_thread_set_timeout(vm)); + + RUBY_DEBUG_LOG("r:%d errno:%d", r, errno); + + switch (r) { + case 0: // timeout + RUBY_DEBUG_LOG("timeout%s", ""); + + ractor_sched_lock(vm, NULL); + { + // (1-1) timeslice + timer_thread_check_timeslice(vm); + + // (1-4) lazy grq deq + if (vm->ractor.sched.grq_cnt > 0) { + RUBY_DEBUG_LOG("GRQ cnt: %u", vm->ractor.sched.grq_cnt); + rb_native_cond_signal(&vm->ractor.sched.cond); + } + } + ractor_sched_unlock(vm, NULL); + + // (1-2) + native_thread_check_and_create_shared(vm); + + break; + + case -1: + switch (errno) { + case EINTR: + // simply retry + break; + default: + perror("epoll_wait"); + rb_bug("epoll_wait errno:%d", errno); + } + break; + + default: + RUBY_DEBUG_LOG("%d event(s)", r); + + for (int i=0; i<r; i++) { + rb_thread_t *th = (rb_thread_t *)timer_th.finished_events[i].data.ptr; + + if (th == NULL) { + // wakeup timerthread + RUBY_DEBUG_LOG("comm from fd:%d", timer_th.comm_fds[1]); + 
consume_communication_pipe(timer_th.comm_fds[0]); + } + else { + // wakeup specific thread by IO + uint32_t events = timer_th.finished_events[i].events; + + RUBY_DEBUG_LOG("io event. wakeup_th:%u event:%s%s%s%s%s%s", + rb_th_serial(th), + (events & EPOLLIN) ? "in/" : "", + (events & EPOLLOUT) ? "out/" : "", + (events & EPOLLRDHUP) ? "RDHUP/" : "", + (events & EPOLLPRI) ? "pri/" : "", + (events & EPOLLERR) ? "err/" : "", + (events & EPOLLHUP) ? "hup/" : ""); + + rb_native_mutex_lock(&timer_th.waiting_lock); + { + if (th->sched.waiting_reason.flags) { + // delete from chain + ccan_list_del_init(&th->sched.waiting_reason.node); + timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd); + + th->sched.waiting_reason.flags = thread_sched_waiting_none; + th->sched.waiting_reason.data.fd = -1; + th->sched.waiting_reason.data.result = (int)events; + + timer_thread_wakeup_thread(th); + } + else { + // already released + } + } + rb_native_mutex_unlock(&timer_th.waiting_lock); + } + } + } +} + +#else // HAVE_SYS_EPOLL_H + +static void +timer_thread_setup_nm(void) +{ + // do nothing +} + +static void +timer_thread_polling(rb_vm_t *vm) +{ + int timeout = timer_thread_set_timeout(vm); + + struct pollfd pfd = { + .fd = timer_th.comm_fds[0], + .events = POLLIN, + }; + + int r = poll(&pfd, 1, timeout); + + switch (r) { + case 0: // timeout + rb_native_mutex_lock(&vm->ractor.sched.lock); + { + // (1-1) timeslice + timer_thread_check_timeslice(vm); + } + rb_native_mutex_unlock(&vm->ractor.sched.lock); + break; + + case -1: // error + switch (errno) { + case EINTR: + // simply retry + break; + default: + perror("poll"); + rb_bug("poll errno:%d", errno); + break; + } + + case 1: + consume_communication_pipe(timer_th.comm_fds[0]); + break; + + default: + rb_bug("unreachbale"); + } +} + +#endif // HAVE_SYS_EPOLL_H |