Diffstat (limited to 'thread_pthread.c')
| mode | file | lines changed |
| -rw-r--r-- | thread_pthread.c | 3762 |
1 file changed, 2860 insertions, 902 deletions
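The hunks below drop the old GVL and timer-thread helpers and introduce the ractor-aware thread scheduler, including condition variables bound to CLOCK_MONOTONIC (condattr_monotonic) and an EINTR-tolerant timed wait (native_cond_timedwait). The POSIX pattern behind those two helpers reduces to the following minimal standalone sketch; the names here (cond_setup, cond_timedwait_rel, attr_mono_p) are illustrative, not Ruby's:

#include <errno.h>
#include <pthread.h>
#include <stdint.h>
#include <time.h>

/* Sketch of the condattr_monotonic / native_cond_timedwait pattern used in
 * the hunks below; not the code from the diff. */

static pthread_condattr_t attr_mono, *attr_mono_p; /* NULL => realtime clock */

static void
cond_setup(void)
{
    /* Fall back to the default (realtime) clock if CLOCK_MONOTONIC cannot
     * be attached to the condattr. */
    if (pthread_condattr_init(&attr_mono) == 0 &&
        pthread_condattr_setclock(&attr_mono, CLOCK_MONOTONIC) == 0) {
        attr_mono_p = &attr_mono;
    }
}

/* Wait up to rel_ns nanoseconds; returns 0 when signaled, ETIMEDOUT on timeout. */
static int
cond_timedwait_rel(pthread_cond_t *cond, pthread_mutex_t *mutex, uint64_t rel_ns)
{
    struct timespec abs;
    int r;

    /* Build an absolute deadline on the same clock the cond was created with. */
    clock_gettime(attr_mono_p ? CLOCK_MONOTONIC : CLOCK_REALTIME, &abs);
    abs.tv_sec  += rel_ns / 1000000000;
    abs.tv_nsec += rel_ns % 1000000000;
    if (abs.tv_nsec >= 1000000000) { abs.tv_sec++; abs.tv_nsec -= 1000000000; }

    /* Old Linux may return EINTR even though POSIX forbids it, so retry. */
    do {
        r = pthread_cond_timedwait(cond, mutex, &abs);
    } while (r == EINTR);

    return r;
}

For this to work, a condition variable waited on with cond_timedwait_rel() must itself be created with pthread_cond_init(cond, attr_mono_p), so that the deadline and the wait use the same clock.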
diff --git a/thread_pthread.c b/thread_pthread.c index 55e2856e43..9c7754067b 100644 --- a/thread_pthread.c +++ b/thread_pthread.c @@ -11,7 +11,8 @@ #ifdef THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION -#include "gc.h" +#include "internal/gc.h" +#include "internal/sanitizers.h" #ifdef HAVE_SYS_RESOURCE_H #include <sys/resource.h> @@ -19,498 +20,1841 @@ #ifdef HAVE_THR_STKSEGMENT #include <thread.h> #endif -#if HAVE_FCNTL_H +#if defined(HAVE_FCNTL_H) #include <fcntl.h> -#elif HAVE_SYS_FCNTL_H +#elif defined(HAVE_SYS_FCNTL_H) #include <sys/fcntl.h> #endif #ifdef HAVE_SYS_PRCTL_H #include <sys/prctl.h> #endif -#if defined(__native_client__) && defined(NACL_NEWLIB) -# include "nacl/select.h" -#endif -#if HAVE_POLL -#include <poll.h> -#endif #if defined(HAVE_SYS_TIME_H) #include <sys/time.h> #endif +#if defined(__HAIKU__) +#include <kernel/OS.h> +#endif +#ifdef __linux__ +#include <sys/syscall.h> /* for SYS_gettid */ +#endif +#include <time.h> +#include <signal.h> -static void native_mutex_lock(pthread_mutex_t *lock); -static void native_mutex_unlock(pthread_mutex_t *lock); -static int native_mutex_trylock(pthread_mutex_t *lock); -static void native_mutex_initialize(pthread_mutex_t *lock); -static void native_mutex_destroy(pthread_mutex_t *lock); -static void native_cond_signal(rb_thread_cond_t *cond); -static void native_cond_broadcast(rb_thread_cond_t *cond); -static void native_cond_wait(rb_thread_cond_t *cond, pthread_mutex_t *mutex); -static void native_cond_initialize(rb_thread_cond_t *cond, int flags); -static void native_cond_destroy(rb_thread_cond_t *cond); -static void rb_thread_wakeup_timer_thread_low(void); -static pthread_t timer_thread_id; +#if defined __APPLE__ +# include <AvailabilityMacros.h> +#endif -#define RB_CONDATTR_CLOCK_MONOTONIC 1 +#if defined(HAVE_SYS_EVENTFD_H) && defined(HAVE_EVENTFD) +# define USE_EVENTFD (1) +# include <sys/eventfd.h> +#else +# define USE_EVENTFD (0) +#endif -#if defined(HAVE_PTHREAD_CONDATTR_SETCLOCK) && defined(HAVE_CLOCKID_T) && \ +#if defined(HAVE_PTHREAD_CONDATTR_SETCLOCK) && \ defined(CLOCK_REALTIME) && defined(CLOCK_MONOTONIC) && \ - defined(HAVE_CLOCK_GETTIME) && defined(HAVE_PTHREAD_CONDATTR_INIT) -#define USE_MONOTONIC_COND 1 + defined(HAVE_CLOCK_GETTIME) +static pthread_condattr_t condattr_mono; +static pthread_condattr_t *condattr_monotonic = &condattr_mono; #else -#define USE_MONOTONIC_COND 0 +static const void *const condattr_monotonic = NULL; +#endif + +#include COROUTINE_H + +#ifndef HAVE_SYS_EVENT_H +#define HAVE_SYS_EVENT_H 0 #endif -#if defined(HAVE_POLL) && defined(HAVE_FCNTL) && defined(F_GETFL) && defined(F_SETFL) && defined(O_NONBLOCK) && !defined(__native_client__) -/* The timer thread sleeps while only one Ruby thread is running. */ -# define USE_SLEEPY_TIMER_THREAD 1 +#ifndef HAVE_SYS_EPOLL_H +#define HAVE_SYS_EPOLL_H 0 #else -# define USE_SLEEPY_TIMER_THREAD 0 +// force setting for debug +// #undef HAVE_SYS_EPOLL_H +// #define HAVE_SYS_EPOLL_H 0 #endif -#ifndef ARRAY_SIZE -#define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0])) +#ifndef USE_MN_THREADS + #if defined(__EMSCRIPTEN__) || defined(COROUTINE_PTHREAD_CONTEXT) + // on __EMSCRIPTEN__ provides epoll* declarations, but no implementations. + // on COROUTINE_PTHREAD_CONTEXT, it doesn't worth to use it. 
+ #define USE_MN_THREADS 0 + #elif HAVE_SYS_EPOLL_H + #include <sys/epoll.h> + #define USE_MN_THREADS 1 + #elif HAVE_SYS_EVENT_H + #include <sys/event.h> + #define USE_MN_THREADS 1 + #else + #define USE_MN_THREADS 0 + #endif #endif +#ifdef HAVE_SCHED_YIELD +#define native_thread_yield() (void)sched_yield() +#else +#define native_thread_yield() ((void)0) +#endif + +// native thread wrappers + +#define NATIVE_MUTEX_LOCK_DEBUG 0 +#define NATIVE_MUTEX_LOCK_DEBUG_YIELD 0 + static void -gvl_acquire_common(rb_vm_t *vm) +mutex_debug(const char *msg, void *lock) +{ + if (NATIVE_MUTEX_LOCK_DEBUG) { + int r; + static pthread_mutex_t dbglock = PTHREAD_MUTEX_INITIALIZER; + + if ((r = pthread_mutex_lock(&dbglock)) != 0) {exit(EXIT_FAILURE);} + fprintf(stdout, "%s: %p\n", msg, lock); + if ((r = pthread_mutex_unlock(&dbglock)) != 0) {exit(EXIT_FAILURE);} + } +} + +void +rb_native_mutex_lock(pthread_mutex_t *lock) +{ + int r; +#if NATIVE_MUTEX_LOCK_DEBUG_YIELD + native_thread_yield(); +#endif + mutex_debug("lock", lock); + if ((r = pthread_mutex_lock(lock)) != 0) { + rb_bug_errno("pthread_mutex_lock", r); + } +} + +void +rb_native_mutex_unlock(pthread_mutex_t *lock) +{ + int r; + mutex_debug("unlock", lock); + if ((r = pthread_mutex_unlock(lock)) != 0) { + rb_bug_errno("pthread_mutex_unlock", r); + } +} + +int +rb_native_mutex_trylock(pthread_mutex_t *lock) +{ + int r; + mutex_debug("trylock", lock); + if ((r = pthread_mutex_trylock(lock)) != 0) { + if (r == EBUSY) { + return EBUSY; + } + else { + rb_bug_errno("pthread_mutex_trylock", r); + } + } + return 0; +} + +void +rb_native_mutex_initialize(pthread_mutex_t *lock) +{ + int r = pthread_mutex_init(lock, 0); + mutex_debug("init", lock); + if (r != 0) { + rb_bug_errno("pthread_mutex_init", r); + } +} + +void +rb_native_mutex_destroy(pthread_mutex_t *lock) +{ + int r = pthread_mutex_destroy(lock); + mutex_debug("destroy", lock); + if (r != 0) { + rb_bug_errno("pthread_mutex_destroy", r); + } +} + +void +rb_native_cond_initialize(rb_nativethread_cond_t *cond) +{ + int r = pthread_cond_init(cond, condattr_monotonic); + if (r != 0) { + rb_bug_errno("pthread_cond_init", r); + } +} + +void +rb_native_cond_destroy(rb_nativethread_cond_t *cond) +{ + int r = pthread_cond_destroy(cond); + if (r != 0) { + rb_bug_errno("pthread_cond_destroy", r); + } +} + +/* + * In OS X 10.7 (Lion), pthread_cond_signal and pthread_cond_broadcast return + * EAGAIN after retrying 8192 times. You can see them in the following page: + * + * http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c + * + * The following rb_native_cond_signal and rb_native_cond_broadcast functions + * need to retrying until pthread functions don't return EAGAIN. + */ + +void +rb_native_cond_signal(rb_nativethread_cond_t *cond) +{ + int r; + do { + r = pthread_cond_signal(cond); + } while (r == EAGAIN); + if (r != 0) { + rb_bug_errno("pthread_cond_signal", r); + } +} + +void +rb_native_cond_broadcast(rb_nativethread_cond_t *cond) +{ + int r; + do { + r = pthread_cond_broadcast(cond); + } while (r == EAGAIN); + if (r != 0) { + rb_bug_errno("rb_native_cond_broadcast", r); + } +} + +void +rb_native_cond_wait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex) +{ + int r = pthread_cond_wait(cond, mutex); + if (r != 0) { + rb_bug_errno("pthread_cond_wait", r); + } +} + +static int +native_cond_timedwait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex, const rb_hrtime_t *abs) +{ + int r; + struct timespec ts; + + /* + * An old Linux may return EINTR. 
Even though POSIX says + * "These functions shall not return an error code of [EINTR]". + * http://pubs.opengroup.org/onlinepubs/009695399/functions/pthread_cond_timedwait.html + * Let's hide it from arch generic code. + */ + do { + rb_hrtime2timespec(&ts, abs); + r = pthread_cond_timedwait(cond, mutex, &ts); + } while (r == EINTR); + + if (r != 0 && r != ETIMEDOUT) { + rb_bug_errno("pthread_cond_timedwait", r); + } + + return r; +} + +static rb_hrtime_t +native_cond_timeout(rb_nativethread_cond_t *cond, const rb_hrtime_t rel) +{ + if (condattr_monotonic) { + return rb_hrtime_add(rb_hrtime_now(), rel); + } + else { + struct timespec ts; + + rb_timespec_now(&ts); + return rb_hrtime_add(rb_timespec2hrtime(&ts), rel); + } +} + +void +rb_native_cond_timedwait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex, unsigned long msec) +{ + rb_hrtime_t hrmsec = native_cond_timeout(cond, RB_HRTIME_PER_MSEC * msec); + native_cond_timedwait(cond, mutex, &hrmsec); +} + +// thread scheduling + +static rb_internal_thread_event_hook_t *rb_internal_thread_event_hooks = NULL; +static void rb_thread_execute_hooks(rb_event_flag_t event, rb_thread_t *th); + +#if 0 +static const char * +event_name(rb_event_flag_t event) { - if (vm->gvl.acquired) { + switch (event) { + case RUBY_INTERNAL_THREAD_EVENT_STARTED: + return "STARTED"; + case RUBY_INTERNAL_THREAD_EVENT_READY: + return "READY"; + case RUBY_INTERNAL_THREAD_EVENT_RESUMED: + return "RESUMED"; + case RUBY_INTERNAL_THREAD_EVENT_SUSPENDED: + return "SUSPENDED"; + case RUBY_INTERNAL_THREAD_EVENT_EXITED: + return "EXITED"; + } + return "no-event"; +} + +#define RB_INTERNAL_THREAD_HOOK(event, th) \ + if (UNLIKELY(rb_internal_thread_event_hooks)) { \ + fprintf(stderr, "[thread=%"PRIxVALUE"] %s in %s (%s:%d)\n", th->self, event_name(event), __func__, __FILE__, __LINE__); \ + rb_thread_execute_hooks(event, th); \ + } +#else +#define RB_INTERNAL_THREAD_HOOK(event, th) if (UNLIKELY(rb_internal_thread_event_hooks)) { rb_thread_execute_hooks(event, th); } +#endif + +static rb_serial_t current_fork_gen = 1; /* We can't use GET_VM()->fork_gen */ - vm->gvl.waiting++; - if (vm->gvl.waiting == 1) { - /* - * Wake up timer thread iff timer thread is slept. - * When timer thread is polling mode, we don't want to - * make confusing timer thread interval time. 
- */ - rb_thread_wakeup_timer_thread_low(); - } +#if defined(SIGVTALRM) && !defined(__EMSCRIPTEN__) +# define USE_UBF_LIST 1 +#endif + +static void threadptr_trap_interrupt(rb_thread_t *); + +static void native_thread_dedicated_inc(rb_vm_t *vm, rb_ractor_t *cr, struct rb_native_thread *nt); +static void native_thread_dedicated_dec(rb_vm_t *vm, rb_ractor_t *cr, struct rb_native_thread *nt); +static void native_thread_assign(struct rb_native_thread *nt, rb_thread_t *th); - while (vm->gvl.acquired) { - native_cond_wait(&vm->gvl.cond, &vm->gvl.lock); - } +static void ractor_sched_enq(rb_vm_t *vm, rb_ractor_t *r); +static void timer_thread_wakeup(void); +static void timer_thread_wakeup_locked(rb_vm_t *vm); +static void timer_thread_wakeup_force(void); +static void thread_sched_switch(rb_thread_t *cth, rb_thread_t *next_th); +static void coroutine_transfer0(struct coroutine_context *transfer_from, + struct coroutine_context *transfer_to, bool to_dead); - vm->gvl.waiting--; +#define thread_sched_dump(s) thread_sched_dump_(__FILE__, __LINE__, s) - if (vm->gvl.need_yield) { - vm->gvl.need_yield = 0; - native_cond_signal(&vm->gvl.switch_cond); - } +static bool +th_has_dedicated_nt(const rb_thread_t *th) +{ + // TODO: th->has_dedicated_nt + return th->nt->dedicated > 0; +} + +RBIMPL_ATTR_MAYBE_UNUSED() +static void +thread_sched_dump_(const char *file, int line, struct rb_thread_sched *sched) +{ + fprintf(stderr, "@%s:%d running:%d\n", file, line, sched->running ? (int)sched->running->serial : -1); + rb_thread_t *th; + int i = 0; + ccan_list_for_each(&sched->readyq, th, sched.node.readyq) { + i++; if (i>10) rb_bug("too many"); + fprintf(stderr, " ready:%d (%sNT:%d)\n", th->serial, + th->nt ? (th->nt->dedicated ? "D" : "S") : "x", + th->nt ? (int)th->nt->serial : -1); } +} + +#define ractor_sched_dump(s) ractor_sched_dump_(__FILE__, __LINE__, s) + +RBIMPL_ATTR_MAYBE_UNUSED() +static void +ractor_sched_dump_(const char *file, int line, rb_vm_t *vm) +{ + rb_ractor_t *r; - vm->gvl.acquired = 1; + fprintf(stderr, "ractor_sched_dump %s:%d\n", file, line); + + int i = 0; + ccan_list_for_each(&vm->ractor.sched.grq, r, threads.sched.grq_node) { + i++; + if (i>10) rb_bug("!!"); + fprintf(stderr, " %d ready:%d\n", i, rb_ractor_id(r)); + } } +#define thread_sched_lock(a, b) thread_sched_lock_(a, b, __FILE__, __LINE__) +#define thread_sched_unlock(a, b) thread_sched_unlock_(a, b, __FILE__, __LINE__) + static void -gvl_acquire(rb_vm_t *vm, rb_thread_t *th) +thread_sched_set_locked(struct rb_thread_sched *sched, rb_thread_t *th) { - native_mutex_lock(&vm->gvl.lock); - gvl_acquire_common(vm); - native_mutex_unlock(&vm->gvl.lock); +#if VM_CHECK_MODE > 0 + VM_ASSERT(sched->lock_owner == NULL); + + sched->lock_owner = th; +#endif } static void -gvl_release_common(rb_vm_t *vm) +thread_sched_set_unlocked(struct rb_thread_sched *sched, rb_thread_t *th) { - vm->gvl.acquired = 0; - if (vm->gvl.waiting > 0) - native_cond_signal(&vm->gvl.cond); +#if VM_CHECK_MODE > 0 + VM_ASSERT(sched->lock_owner == th); + + sched->lock_owner = NULL; +#endif } static void -gvl_release(rb_vm_t *vm) +thread_sched_lock_(struct rb_thread_sched *sched, rb_thread_t *th, const char *file, int line) { - native_mutex_lock(&vm->gvl.lock); - gvl_release_common(vm); - native_mutex_unlock(&vm->gvl.lock); + rb_native_mutex_lock(&sched->lock_); + +#if VM_CHECK_MODE + RUBY_DEBUG_LOG2(file, line, "r:%d th:%u", th ? 
(int)rb_ractor_id(th->ractor) : -1, rb_th_serial(th)); +#else + RUBY_DEBUG_LOG2(file, line, "th:%u", rb_th_serial(th)); +#endif + + thread_sched_set_locked(sched, th); } static void -gvl_yield(rb_vm_t *vm, rb_thread_t *th) +thread_sched_unlock_(struct rb_thread_sched *sched, rb_thread_t *th, const char *file, int line) { - native_mutex_lock(&vm->gvl.lock); + RUBY_DEBUG_LOG2(file, line, "th:%u", rb_th_serial(th)); + + thread_sched_set_unlocked(sched, th); + + rb_native_mutex_unlock(&sched->lock_); +} - gvl_release_common(vm); +static void +ASSERT_thread_sched_locked(struct rb_thread_sched *sched, rb_thread_t *th) +{ + VM_ASSERT(rb_native_mutex_trylock(&sched->lock_) == EBUSY); - /* An another thread is processing GVL yield. */ - if (UNLIKELY(vm->gvl.wait_yield)) { - while (vm->gvl.wait_yield) - native_cond_wait(&vm->gvl.switch_wait_cond, &vm->gvl.lock); - goto acquire; +#if VM_CHECK_MODE + if (th) { + VM_ASSERT(sched->lock_owner == th); } + else { + VM_ASSERT(sched->lock_owner != NULL); + } +#endif +} + +#define ractor_sched_lock(a, b) ractor_sched_lock_(a, b, __FILE__, __LINE__) +#define ractor_sched_unlock(a, b) ractor_sched_unlock_(a, b, __FILE__, __LINE__) - if (vm->gvl.waiting > 0) { - /* Wait until another thread task take GVL. */ - vm->gvl.need_yield = 1; - vm->gvl.wait_yield = 1; - while (vm->gvl.need_yield) - native_cond_wait(&vm->gvl.switch_cond, &vm->gvl.lock); - vm->gvl.wait_yield = 0; +RBIMPL_ATTR_MAYBE_UNUSED() +static unsigned int +rb_ractor_serial(const rb_ractor_t *r) +{ + if (r) { + return rb_ractor_id(r); } else { - native_mutex_unlock(&vm->gvl.lock); - sched_yield(); - native_mutex_lock(&vm->gvl.lock); + return 0; } +} + +static void +ractor_sched_set_locked(rb_vm_t *vm, rb_ractor_t *cr) +{ +#if VM_CHECK_MODE > 0 + VM_ASSERT(vm->ractor.sched.lock_owner == NULL); + VM_ASSERT(vm->ractor.sched.locked == false); + + vm->ractor.sched.lock_owner = cr; + vm->ractor.sched.locked = true; +#endif +} + +static void +ractor_sched_set_unlocked(rb_vm_t *vm, rb_ractor_t *cr) +{ +#if VM_CHECK_MODE > 0 + VM_ASSERT(vm->ractor.sched.locked); + VM_ASSERT(vm->ractor.sched.lock_owner == cr); - native_cond_broadcast(&vm->gvl.switch_wait_cond); - acquire: - gvl_acquire_common(vm); - native_mutex_unlock(&vm->gvl.lock); + vm->ractor.sched.locked = false; + vm->ractor.sched.lock_owner = NULL; +#endif } static void -gvl_init(rb_vm_t *vm) +ractor_sched_lock_(rb_vm_t *vm, rb_ractor_t *cr, const char *file, int line) { - native_mutex_initialize(&vm->gvl.lock); - native_cond_initialize(&vm->gvl.cond, RB_CONDATTR_CLOCK_MONOTONIC); - native_cond_initialize(&vm->gvl.switch_cond, RB_CONDATTR_CLOCK_MONOTONIC); - native_cond_initialize(&vm->gvl.switch_wait_cond, RB_CONDATTR_CLOCK_MONOTONIC); - vm->gvl.acquired = 0; - vm->gvl.waiting = 0; - vm->gvl.need_yield = 0; - vm->gvl.wait_yield = 0; + rb_native_mutex_lock(&vm->ractor.sched.lock); + +#if VM_CHECK_MODE + RUBY_DEBUG_LOG2(file, line, "cr:%u prev_owner:%u", rb_ractor_serial(cr), rb_ractor_serial(vm->ractor.sched.lock_owner)); +#else + RUBY_DEBUG_LOG2(file, line, "cr:%u", rb_ractor_serial(cr)); +#endif + + ractor_sched_set_locked(vm, cr); } static void -gvl_destroy(rb_vm_t *vm) +ractor_sched_unlock_(rb_vm_t *vm, rb_ractor_t *cr, const char *file, int line) { - native_cond_destroy(&vm->gvl.switch_wait_cond); - native_cond_destroy(&vm->gvl.switch_cond); - native_cond_destroy(&vm->gvl.cond); - native_mutex_destroy(&vm->gvl.lock); + RUBY_DEBUG_LOG2(file, line, "cr:%u", rb_ractor_serial(cr)); + + ractor_sched_set_unlocked(vm, cr); + 
rb_native_mutex_unlock(&vm->ractor.sched.lock); } static void -gvl_atfork(rb_vm_t *vm) +ASSERT_ractor_sched_locked(rb_vm_t *vm, rb_ractor_t *cr) { - gvl_init(vm); - gvl_acquire(vm, GET_THREAD()); + VM_ASSERT(rb_native_mutex_trylock(&vm->ractor.sched.lock) == EBUSY); + VM_ASSERT(vm->ractor.sched.locked); + VM_ASSERT(cr == NULL || vm->ractor.sched.lock_owner == cr); } -#define NATIVE_MUTEX_LOCK_DEBUG 0 +RBIMPL_ATTR_MAYBE_UNUSED() +static bool +ractor_sched_running_threads_contain_p(rb_vm_t *vm, rb_thread_t *th) +{ + rb_thread_t *rth; + ccan_list_for_each(&vm->ractor.sched.running_threads, rth, sched.node.running_threads) { + if (rth == th) return true; + } + return false; +} + +RBIMPL_ATTR_MAYBE_UNUSED() +static unsigned int +ractor_sched_running_threads_size(rb_vm_t *vm) +{ + rb_thread_t *th; + unsigned int i = 0; + ccan_list_for_each(&vm->ractor.sched.running_threads, th, sched.node.running_threads) { + i++; + } + return i; +} + +RBIMPL_ATTR_MAYBE_UNUSED() +static unsigned int +ractor_sched_timeslice_threads_size(rb_vm_t *vm) +{ + rb_thread_t *th; + unsigned int i = 0; + ccan_list_for_each(&vm->ractor.sched.timeslice_threads, th, sched.node.timeslice_threads) { + i++; + } + return i; +} + +RBIMPL_ATTR_MAYBE_UNUSED() +static bool +ractor_sched_timeslice_threads_contain_p(rb_vm_t *vm, rb_thread_t *th) +{ + rb_thread_t *rth; + ccan_list_for_each(&vm->ractor.sched.timeslice_threads, rth, sched.node.timeslice_threads) { + if (rth == th) return true; + } + return false; +} + +static void ractor_sched_barrier_join_signal_locked(rb_vm_t *vm); +// setup timeslice signals by the timer thread. static void -mutex_debug(const char *msg, pthread_mutex_t *lock) +thread_sched_setup_running_threads(struct rb_thread_sched *sched, rb_ractor_t *cr, rb_vm_t *vm, + rb_thread_t *add_th, rb_thread_t *del_th, rb_thread_t *add_timeslice_th) { - if (NATIVE_MUTEX_LOCK_DEBUG) { - int r; - static pthread_mutex_t dbglock = PTHREAD_MUTEX_INITIALIZER; +#if USE_RUBY_DEBUG_LOG + unsigned int prev_running_cnt = vm->ractor.sched.running_cnt; +#endif + + rb_thread_t *del_timeslice_th; + + if (del_th && sched->is_running_timeslice) { + del_timeslice_th = del_th; + sched->is_running_timeslice = false; + } + else { + del_timeslice_th = NULL; + } + + RUBY_DEBUG_LOG("+:%u -:%u +ts:%u -ts:%u", + rb_th_serial(add_th), rb_th_serial(del_th), + rb_th_serial(add_timeslice_th), rb_th_serial(del_timeslice_th)); + + ractor_sched_lock(vm, cr); + { + // update running_threads + if (del_th) { + VM_ASSERT(ractor_sched_running_threads_contain_p(vm, del_th)); + VM_ASSERT(del_timeslice_th != NULL || + !ractor_sched_timeslice_threads_contain_p(vm, del_th)); + + ccan_list_del_init(&del_th->sched.node.running_threads); + vm->ractor.sched.running_cnt--; + + if (UNLIKELY(vm->ractor.sched.barrier_waiting)) { + ractor_sched_barrier_join_signal_locked(vm); + } + sched->is_running = false; + } + + if (add_th) { + if (vm->ractor.sched.barrier_waiting) { + // TODO: GC barrier check? 
+ RUBY_DEBUG_LOG("barrier_waiting"); + RUBY_VM_SET_VM_BARRIER_INTERRUPT(add_th->ec); + } + + VM_ASSERT(!ractor_sched_running_threads_contain_p(vm, add_th)); + VM_ASSERT(!ractor_sched_timeslice_threads_contain_p(vm, add_th)); + + ccan_list_add(&vm->ractor.sched.running_threads, &add_th->sched.node.running_threads); + vm->ractor.sched.running_cnt++; + sched->is_running = true; + } + + if (add_timeslice_th) { + // update timeslice threads + int was_empty = ccan_list_empty(&vm->ractor.sched.timeslice_threads); + VM_ASSERT(!ractor_sched_timeslice_threads_contain_p(vm, add_timeslice_th)); + ccan_list_add(&vm->ractor.sched.timeslice_threads, &add_timeslice_th->sched.node.timeslice_threads); + sched->is_running_timeslice = true; + if (was_empty) { + timer_thread_wakeup_locked(vm); + } + } + + if (del_timeslice_th) { + VM_ASSERT(ractor_sched_timeslice_threads_contain_p(vm, del_timeslice_th)); + ccan_list_del_init(&del_timeslice_th->sched.node.timeslice_threads); + } - if ((r = pthread_mutex_lock(&dbglock)) != 0) {exit(EXIT_FAILURE);} - fprintf(stdout, "%s: %p\n", msg, (void *)lock); - if ((r = pthread_mutex_unlock(&dbglock)) != 0) {exit(EXIT_FAILURE);} + VM_ASSERT(ractor_sched_running_threads_size(vm) == vm->ractor.sched.running_cnt); + VM_ASSERT(ractor_sched_timeslice_threads_size(vm) <= vm->ractor.sched.running_cnt); } + ractor_sched_unlock(vm, cr); + + //RUBY_DEBUG_LOG("+:%u -:%u +ts:%u -ts:%u run:%u->%u", + // rb_th_serial(add_th), rb_th_serial(del_th), + // rb_th_serial(add_timeslice_th), rb_th_serial(del_timeslice_th), + RUBY_DEBUG_LOG("run:%u->%u", prev_running_cnt, vm->ractor.sched.running_cnt); } static void -native_mutex_lock(pthread_mutex_t *lock) +thread_sched_add_running_thread(struct rb_thread_sched *sched, rb_thread_t *th) { - int r; - mutex_debug("lock", lock); - if ((r = pthread_mutex_lock(lock)) != 0) { - rb_bug_errno("pthread_mutex_lock", r); + ASSERT_thread_sched_locked(sched, th); + VM_ASSERT(sched->running == th); + + rb_vm_t *vm = th->vm; + thread_sched_setup_running_threads(sched, th->ractor, vm, th, NULL, ccan_list_empty(&sched->readyq) ? NULL : th); +} + +static void +thread_sched_del_running_thread(struct rb_thread_sched *sched, rb_thread_t *th) +{ + ASSERT_thread_sched_locked(sched, th); + + rb_vm_t *vm = th->vm; + thread_sched_setup_running_threads(sched, th->ractor, vm, NULL, th, NULL); +} + +void +rb_add_running_thread(rb_thread_t *th) +{ + struct rb_thread_sched *sched = TH_SCHED(th); + + thread_sched_lock(sched, th); + { + thread_sched_add_running_thread(sched, th); + } + thread_sched_unlock(sched, th); +} + +void +rb_del_running_thread(rb_thread_t *th) +{ + struct rb_thread_sched *sched = TH_SCHED(th); + + thread_sched_lock(sched, th); + { + thread_sched_del_running_thread(sched, th); } + thread_sched_unlock(sched, th); } +// setup current or next running thread +// sched->running should be set only on this function. +// +// if th is NULL, there is no running threads. 
static void -native_mutex_unlock(pthread_mutex_t *lock) +thread_sched_set_running(struct rb_thread_sched *sched, rb_thread_t *th) { - int r; - mutex_debug("unlock", lock); - if ((r = pthread_mutex_unlock(lock)) != 0) { - rb_bug_errno("pthread_mutex_unlock", r); + RUBY_DEBUG_LOG("th:%u->th:%u", rb_th_serial(sched->running), rb_th_serial(th)); + VM_ASSERT(sched->running != th); + + sched->running = th; +} + +RBIMPL_ATTR_MAYBE_UNUSED() +static bool +thread_sched_readyq_contain_p(struct rb_thread_sched *sched, rb_thread_t *th) +{ + rb_thread_t *rth; + ccan_list_for_each(&sched->readyq, rth, sched.node.readyq) { + if (rth == th) return true; } + return false; } -static inline int -native_mutex_trylock(pthread_mutex_t *lock) +// deque thread from the ready queue. +// if the ready queue is empty, return NULL. +// +// return deque'ed running thread (or NULL). +static rb_thread_t * +thread_sched_deq(struct rb_thread_sched *sched) { - int r; - mutex_debug("trylock", lock); - if ((r = pthread_mutex_trylock(lock)) != 0) { - if (r == EBUSY) { - return EBUSY; - } - else { - rb_bug_errno("pthread_mutex_trylock", r); - } + ASSERT_thread_sched_locked(sched, NULL); + rb_thread_t *next_th; + + VM_ASSERT(sched->running != NULL); + + if (ccan_list_empty(&sched->readyq)) { + next_th = NULL; } - return 0; + else { + next_th = ccan_list_pop(&sched->readyq, rb_thread_t, sched.node.readyq); + + VM_ASSERT(sched->readyq_cnt > 0); + sched->readyq_cnt--; + ccan_list_node_init(&next_th->sched.node.readyq); + } + + RUBY_DEBUG_LOG("next_th:%u readyq_cnt:%d", rb_th_serial(next_th), sched->readyq_cnt); + + return next_th; } +// enqueue ready thread to the ready queue. static void -native_mutex_initialize(pthread_mutex_t *lock) +thread_sched_enq(struct rb_thread_sched *sched, rb_thread_t *ready_th) { - int r = pthread_mutex_init(lock, 0); - mutex_debug("init", lock); - if (r != 0) { - rb_bug_errno("pthread_mutex_init", r); + ASSERT_thread_sched_locked(sched, NULL); + RUBY_DEBUG_LOG("ready_th:%u readyq_cnt:%d", rb_th_serial(ready_th), sched->readyq_cnt); + + VM_ASSERT(sched->running != NULL); + VM_ASSERT(!thread_sched_readyq_contain_p(sched, ready_th)); + + if (sched->is_running) { + if (ccan_list_empty(&sched->readyq)) { + // add sched->running to timeslice + thread_sched_setup_running_threads(sched, ready_th->ractor, ready_th->vm, NULL, NULL, sched->running); + } } + else { + // ractor_sched lock is needed + // VM_ASSERT(!ractor_sched_timeslice_threads_contain_p(ready_th->vm, sched->running)); + } + + ccan_list_add_tail(&sched->readyq, &ready_th->sched.node.readyq); + sched->readyq_cnt++; } +// DNT: kick condvar +// SNT: TODO static void -native_mutex_destroy(pthread_mutex_t *lock) +thread_sched_wakeup_running_thread(struct rb_thread_sched *sched, rb_thread_t *next_th, bool will_switch) { - int r = pthread_mutex_destroy(lock); - mutex_debug("destroy", lock); - if (r != 0) { - rb_bug_errno("pthread_mutex_destroy", r); + ASSERT_thread_sched_locked(sched, NULL); + VM_ASSERT(sched->running == next_th); + + if (next_th) { + if (next_th->nt) { + if (th_has_dedicated_nt(next_th)) { + RUBY_DEBUG_LOG("pinning th:%u", next_th->serial); + rb_native_cond_signal(&next_th->nt->cond.readyq); + } + else { + // TODO + RUBY_DEBUG_LOG("th:%u is already running.", next_th->serial); + } + } + else { + if (will_switch) { + RUBY_DEBUG_LOG("th:%u (do nothing)", rb_th_serial(next_th)); + } + else { + RUBY_DEBUG_LOG("th:%u (enq)", rb_th_serial(next_th)); + ractor_sched_enq(next_th->vm, next_th->ractor); + } + } + } + else { + RUBY_DEBUG_LOG("no 
waiting threads%s", ""); } } +// waiting -> ready (locked) static void -native_cond_initialize(rb_thread_cond_t *cond, int flags) +thread_sched_to_ready_common(struct rb_thread_sched *sched, rb_thread_t *th, bool wakeup, bool will_switch) { -#ifdef HAVE_PTHREAD_COND_INIT - int r; -# if USE_MONOTONIC_COND - pthread_condattr_t attr; + RUBY_DEBUG_LOG("th:%u running:%u redyq_cnt:%d", rb_th_serial(th), rb_th_serial(sched->running), sched->readyq_cnt); + + VM_ASSERT(sched->running != th); + VM_ASSERT(!thread_sched_readyq_contain_p(sched, th)); + RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_READY, th); + + if (sched->running == NULL) { + thread_sched_set_running(sched, th); + if (wakeup) thread_sched_wakeup_running_thread(sched, th, will_switch); + } + else { + thread_sched_enq(sched, th); + } +} - pthread_condattr_init(&attr); +// waiting -> ready +// +// `th` had became "waiting" state by `thread_sched_to_waiting` +// and `thread_sched_to_ready` enqueue `th` to the thread ready queue. +RBIMPL_ATTR_MAYBE_UNUSED() +static void +thread_sched_to_ready(struct rb_thread_sched *sched, rb_thread_t *th) +{ + RUBY_DEBUG_LOG("th:%u", rb_th_serial(th)); - cond->clockid = CLOCK_REALTIME; - if (flags & RB_CONDATTR_CLOCK_MONOTONIC) { - r = pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); - if (r == 0) { - cond->clockid = CLOCK_MONOTONIC; - } + thread_sched_lock(sched, th); + { + thread_sched_to_ready_common(sched, th, true, false); } + thread_sched_unlock(sched, th); +} - r = pthread_cond_init(&cond->cond, &attr); - pthread_condattr_destroy(&attr); -# else - r = pthread_cond_init(&cond->cond, NULL); -# endif - if (r != 0) { - rb_bug_errno("pthread_cond_init", r); +// wait until sched->running is `th`. +static void +thread_sched_wait_running_turn(struct rb_thread_sched *sched, rb_thread_t *th, bool can_direct_transfer) +{ + RUBY_DEBUG_LOG("th:%u", rb_th_serial(th)); + + ASSERT_thread_sched_locked(sched, th); + VM_ASSERT(th == rb_ec_thread_ptr(rb_current_ec_noinline())); + + if (th != sched->running) { + // already deleted from running threads + // VM_ASSERT(!ractor_sched_running_threads_contain_p(th->vm, th)); // need locking + + // wait for execution right + rb_thread_t *next_th; + while((next_th = sched->running) != th) { + if (th_has_dedicated_nt(th)) { + RUBY_DEBUG_LOG("(nt) sleep th:%u running:%u", rb_th_serial(th), rb_th_serial(sched->running)); + + thread_sched_set_unlocked(sched, th); + { + RUBY_DEBUG_LOG("nt:%d cond:%p", th->nt->serial, &th->nt->cond.readyq); + rb_native_cond_wait(&th->nt->cond.readyq, &sched->lock_); + } + thread_sched_set_locked(sched, th); + + RUBY_DEBUG_LOG("(nt) wakeup %s", sched->running == th ? 
"success" : "failed"); + if (th == sched->running) { + rb_ractor_thread_switch(th->ractor, th, false); + } + } + else { + // search another ready thread + if (can_direct_transfer && + (next_th = sched->running) != NULL && + !next_th->nt // next_th is running or has dedicated nt + ) { + + RUBY_DEBUG_LOG("th:%u->%u (direct)", rb_th_serial(th), rb_th_serial(next_th)); + + thread_sched_set_unlocked(sched, th); + { + rb_ractor_set_current_ec(th->ractor, NULL); + thread_sched_switch(th, next_th); + } + thread_sched_set_locked(sched, th); + } + else { + // search another ready ractor + struct rb_native_thread *nt = th->nt; + native_thread_assign(NULL, th); + + RUBY_DEBUG_LOG("th:%u->%u (ractor scheduling)", rb_th_serial(th), rb_th_serial(next_th)); + + thread_sched_set_unlocked(sched, th); + { + rb_ractor_set_current_ec(th->ractor, NULL); + coroutine_transfer0(th->sched.context, nt->nt_context, false); + } + thread_sched_set_locked(sched, th); + } + + VM_ASSERT(rb_current_ec_noinline() == th->ec); + } + } + + VM_ASSERT(th->nt != NULL); + VM_ASSERT(rb_current_ec_noinline() == th->ec); + VM_ASSERT(th->sched.waiting_reason.flags == thread_sched_waiting_none); + + // add th to running threads + thread_sched_add_running_thread(sched, th); } - return; -#endif + // VM_ASSERT(ractor_sched_running_threads_contain_p(th->vm, th)); need locking + RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_RESUMED, th); } +// waiting -> ready -> running (locked) static void -native_cond_destroy(rb_thread_cond_t *cond) +thread_sched_to_running_common(struct rb_thread_sched *sched, rb_thread_t *th) { -#ifdef HAVE_PTHREAD_COND_INIT - int r = pthread_cond_destroy(&cond->cond); - if (r != 0) { - rb_bug_errno("pthread_cond_destroy", r); + RUBY_DEBUG_LOG("th:%u dedicated:%d", rb_th_serial(th), th_has_dedicated_nt(th)); + + VM_ASSERT(sched->running != th); + VM_ASSERT(th_has_dedicated_nt(th)); + VM_ASSERT(GET_THREAD() == th); + + native_thread_dedicated_dec(th->vm, th->ractor, th->nt); + + // waiting -> ready + thread_sched_to_ready_common(sched, th, false, false); + + if (sched->running == th) { + thread_sched_add_running_thread(sched, th); } -#endif + + // TODO: check SNT number + thread_sched_wait_running_turn(sched, th, false); } -/* - * In OS X 10.7 (Lion), pthread_cond_signal and pthread_cond_broadcast return - * EAGAIN after retrying 8192 times. You can see them in the following page: - * - * http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c - * - * The following native_cond_signal and native_cond_broadcast functions - * need to retrying until pthread functions don't return EAGAIN. - */ +// waiting -> ready -> running +// +// `th` had been waiting by `thread_sched_to_waiting()` +// and run a dedicated task (like waitpid and so on). +// After the dedicated task, this function is called +// to join a normal thread-scheduling. +static void +thread_sched_to_running(struct rb_thread_sched *sched, rb_thread_t *th) +{ + thread_sched_lock(sched, th); + { + thread_sched_to_running_common(sched, th); + } + thread_sched_unlock(sched, th); +} +// resume a next thread in the thread ready queue. +// +// deque next running thread from the ready thread queue and +// resume this thread if available. +// +// If the next therad has a dedicated native thraed, simply signal to resume. +// Otherwise, make the ractor ready and other nt will run the ractor and the thread. 
static void -native_cond_signal(rb_thread_cond_t *cond) +thread_sched_wakeup_next_thread(struct rb_thread_sched *sched, rb_thread_t *th, bool will_switch) { - int r; - do { - r = pthread_cond_signal(&cond->cond); - } while (r == EAGAIN); - if (r != 0) { - rb_bug_errno("pthread_cond_signal", r); + ASSERT_thread_sched_locked(sched, th); + + VM_ASSERT(sched->running == th); + VM_ASSERT(sched->running->nt != NULL); + + rb_thread_t *next_th = thread_sched_deq(sched); + + RUBY_DEBUG_LOG("next_th:%u", rb_th_serial(next_th)); + VM_ASSERT(th != next_th); + + thread_sched_set_running(sched, next_th); + VM_ASSERT(next_th == sched->running); + thread_sched_wakeup_running_thread(sched, next_th, will_switch); + + if (th != next_th) { + thread_sched_del_running_thread(sched, th); } } +// running -> dead (locked) static void -native_cond_broadcast(rb_thread_cond_t *cond) +thread_sched_to_dead_common(struct rb_thread_sched *sched, rb_thread_t *th) { - int r; - do { - r = pthread_cond_broadcast(&cond->cond); - } while (r == EAGAIN); - if (r != 0) { - rb_bug_errno("native_cond_broadcast", r); + RUBY_DEBUG_LOG("th:%u DNT:%d", rb_th_serial(th), th->nt->dedicated); + + RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th); + + thread_sched_wakeup_next_thread(sched, th, !th_has_dedicated_nt(th)); + + RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_EXITED, th); +} + +// running -> dead +static void +thread_sched_to_dead(struct rb_thread_sched *sched, rb_thread_t *th) +{ + thread_sched_lock(sched, th); + { + thread_sched_to_dead_common(sched, th); } + thread_sched_unlock(sched, th); } +// running -> waiting (locked) +// +// This thread will run dedicated task (th->nt->dedicated++). static void -native_cond_wait(rb_thread_cond_t *cond, pthread_mutex_t *mutex) +thread_sched_to_waiting_common(struct rb_thread_sched *sched, rb_thread_t *th) { - int r = pthread_cond_wait(&cond->cond, mutex); - if (r != 0) { - rb_bug_errno("pthread_cond_wait", r); + RUBY_DEBUG_LOG("th:%u DNT:%d", rb_th_serial(th), th->nt->dedicated); + + RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th); + + native_thread_dedicated_inc(th->vm, th->ractor, th->nt); + thread_sched_wakeup_next_thread(sched, th, false); +} + +// running -> waiting +// +// This thread will run a dedicated task. +static void +thread_sched_to_waiting(struct rb_thread_sched *sched, rb_thread_t *th) +{ + thread_sched_lock(sched, th); + { + thread_sched_to_waiting_common(sched, th); } + thread_sched_unlock(sched, th); } -static int -native_cond_timedwait(rb_thread_cond_t *cond, pthread_mutex_t *mutex, struct timespec *ts) +// mini utility func +// return true if any there are any interrupts +static bool +ubf_set(rb_thread_t *th, rb_unblock_function_t *func, void *arg) { - int r; + VM_ASSERT(func != NULL); - /* - * An old Linux may return EINTR. Even though POSIX says - * "These functions shall not return an error code of [EINTR]". - * http://pubs.opengroup.org/onlinepubs/009695399/functions/pthread_cond_timedwait.html - * Let's hide it from arch generic code. 
- */ - do { - r = pthread_cond_timedwait(&cond->cond, mutex, ts); - } while (r == EINTR); + retry: + if (RUBY_VM_INTERRUPTED(th->ec)) { + RUBY_DEBUG_LOG("interrupted:0x%x", th->ec->interrupt_flag); + return true; + } - if (r != 0 && r != ETIMEDOUT) { - rb_bug_errno("pthread_cond_timedwait", r); + rb_native_mutex_lock(&th->interrupt_lock); + { + if (!th->ec->raised_flag && RUBY_VM_INTERRUPTED(th->ec)) { + rb_native_mutex_unlock(&th->interrupt_lock); + goto retry; + } + + VM_ASSERT(th->unblock.func == NULL); + th->unblock.func = func; + th->unblock.arg = arg; } + rb_native_mutex_unlock(&th->interrupt_lock); - return r; + return false; } -#if SIZEOF_TIME_T == SIZEOF_LONG -typedef unsigned long unsigned_time_t; -#elif SIZEOF_TIME_T == SIZEOF_INT -typedef unsigned int unsigned_time_t; -#elif SIZEOF_TIME_T == SIZEOF_LONG_LONG -typedef unsigned LONG_LONG unsigned_time_t; -#else -# error cannot find integer type which size is same as time_t. -#endif +static void +ubf_clear(rb_thread_t *th) +{ + if (th->unblock.func) { + rb_native_mutex_lock(&th->interrupt_lock); + { + th->unblock.func = NULL; + th->unblock.arg = NULL; + } + rb_native_mutex_unlock(&th->interrupt_lock); + } +} + +static void +ubf_waiting(void *ptr) +{ + rb_thread_t *th = (rb_thread_t *)ptr; + struct rb_thread_sched *sched = TH_SCHED(th); + + // only once. it is safe because th->interrupt_lock is already acquired. + th->unblock.func = NULL; + th->unblock.arg = NULL; + + RUBY_DEBUG_LOG("th:%u", rb_th_serial(th)); + + thread_sched_lock(sched, th); + { + if (sched->running == th) { + // not sleeping yet. + } + else { + thread_sched_to_ready_common(sched, th, true, false); + } + } + thread_sched_unlock(sched, th); +} + +// running -> waiting +// +// This thread will sleep until other thread wakeup the thread. +static void +thread_sched_to_waiting_until_wakeup(struct rb_thread_sched *sched, rb_thread_t *th) +{ + RUBY_DEBUG_LOG("th:%u", rb_th_serial(th)); + + RB_VM_SAVE_MACHINE_CONTEXT(th); + + if (ubf_set(th, ubf_waiting, (void *)th)) { + return; + } + + RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th); + + thread_sched_lock(sched, th); + { + if (!RUBY_VM_INTERRUPTED(th->ec)) { + bool can_direct_transfer = !th_has_dedicated_nt(th); + // NOTE: th->status is set before and after this sleep outside of this function in `sleep_forever` + thread_sched_wakeup_next_thread(sched, th, can_direct_transfer); + thread_sched_wait_running_turn(sched, th, can_direct_transfer); + } + else { + RUBY_DEBUG_LOG("th:%u interrupted", rb_th_serial(th)); + } + } + thread_sched_unlock(sched, th); -#define TIMET_MAX (~(time_t)0 <= 0 ? (time_t)((~(unsigned_time_t)0) >> 1) : (time_t)(~(unsigned_time_t)0)) + ubf_clear(th); +} -static struct timespec -native_cond_timeout(rb_thread_cond_t *cond, struct timespec timeout_rel) +// run another thread in the ready queue. +// continue to run if there are no ready threads. 
+static void +thread_sched_yield(struct rb_thread_sched *sched, rb_thread_t *th) { - int ret; - struct timeval tv; - struct timespec timeout; - struct timespec now; + RUBY_DEBUG_LOG("th:%d sched->readyq_cnt:%d", (int)th->serial, sched->readyq_cnt); -#if USE_MONOTONIC_COND - if (cond->clockid == CLOCK_MONOTONIC) { - ret = clock_gettime(cond->clockid, &now); - if (ret != 0) - rb_sys_fail("clock_gettime()"); - goto out; + thread_sched_lock(sched, th); + { + if (!ccan_list_empty(&sched->readyq)) { + RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th); + thread_sched_wakeup_next_thread(sched, th, !th_has_dedicated_nt(th)); + bool can_direct_transfer = !th_has_dedicated_nt(th); + thread_sched_to_ready_common(sched, th, false, can_direct_transfer); + thread_sched_wait_running_turn(sched, th, can_direct_transfer); + th->status = THREAD_RUNNABLE; + } + else { + VM_ASSERT(sched->readyq_cnt == 0); + } } + thread_sched_unlock(sched, th); +} - if (cond->clockid != CLOCK_REALTIME) - rb_bug("unsupported clockid %"PRIdVALUE, (SIGNED_VALUE)cond->clockid); +void +rb_thread_sched_init(struct rb_thread_sched *sched, bool atfork) +{ + rb_native_mutex_initialize(&sched->lock_); + +#if VM_CHECK_MODE + sched->lock_owner = NULL; #endif - ret = gettimeofday(&tv, 0); - if (ret != 0) - rb_sys_fail(0); - now.tv_sec = tv.tv_sec; - now.tv_nsec = tv.tv_usec * 1000; + ccan_list_head_init(&sched->readyq); + sched->readyq_cnt = 0; -#if USE_MONOTONIC_COND - out: +#if USE_MN_THREADS + if (!atfork) sched->enable_mn_threads = true; // MN is enabled on Ractors +#endif +} + +static void +coroutine_transfer0(struct coroutine_context *transfer_from, struct coroutine_context *transfer_to, bool to_dead) +{ +#ifdef RUBY_ASAN_ENABLED + void **fake_stack = to_dead ? NULL : &transfer_from->fake_stack; + __sanitizer_start_switch_fiber(fake_stack, transfer_to->stack_base, transfer_to->stack_size); #endif - timeout.tv_sec = now.tv_sec; - timeout.tv_nsec = now.tv_nsec; - timeout.tv_sec += timeout_rel.tv_sec; - timeout.tv_nsec += timeout_rel.tv_nsec; - if (timeout.tv_nsec >= 1000*1000*1000) { - timeout.tv_sec++; - timeout.tv_nsec -= 1000*1000*1000; + RBIMPL_ATTR_MAYBE_UNUSED() + struct coroutine_context *returning_from = coroutine_transfer(transfer_from, transfer_to); + + /* if to_dead was passed, the caller is promising that this coroutine is finished and it should + * never be resumed! 
*/ + VM_ASSERT(!to_dead); +#ifdef RUBY_ASAN_ENABLED + __sanitizer_finish_switch_fiber(transfer_from->fake_stack, + (const void**)&returning_from->stack_base, &returning_from->stack_size); +#endif + +} + +static void +thread_sched_switch0(struct coroutine_context *current_cont, rb_thread_t *next_th, struct rb_native_thread *nt, bool to_dead) +{ + VM_ASSERT(!nt->dedicated); + VM_ASSERT(next_th->nt == NULL); + + RUBY_DEBUG_LOG("next_th:%u", rb_th_serial(next_th)); + + ruby_thread_set_native(next_th); + native_thread_assign(nt, next_th); + + coroutine_transfer0(current_cont, next_th->sched.context, to_dead); +} + +static void +thread_sched_switch(rb_thread_t *cth, rb_thread_t *next_th) +{ + struct rb_native_thread *nt = cth->nt; + native_thread_assign(NULL, cth); + RUBY_DEBUG_LOG("th:%u->%u on nt:%d", rb_th_serial(cth), rb_th_serial(next_th), nt->serial); + thread_sched_switch0(cth->sched.context, next_th, nt, cth->status == THREAD_KILLED); +} + +#if VM_CHECK_MODE > 0 +RBIMPL_ATTR_MAYBE_UNUSED() +static unsigned int +grq_size(rb_vm_t *vm, rb_ractor_t *cr) +{ + ASSERT_ractor_sched_locked(vm, cr); + + rb_ractor_t *r, *prev_r = NULL; + unsigned int i = 0; + + ccan_list_for_each(&vm->ractor.sched.grq, r, threads.sched.grq_node) { + i++; + + VM_ASSERT(r != prev_r); + prev_r = r; } + return i; +} +#endif - if (timeout.tv_sec < now.tv_sec) - timeout.tv_sec = TIMET_MAX; +static void +ractor_sched_enq(rb_vm_t *vm, rb_ractor_t *r) +{ + struct rb_thread_sched *sched = &r->threads.sched; + rb_ractor_t *cr = NULL; // timer thread can call this function - return timeout; + VM_ASSERT(sched->running != NULL); + VM_ASSERT(sched->running->nt == NULL); + + ractor_sched_lock(vm, cr); + { +#if VM_CHECK_MODE > 0 + // check if grq contains r + rb_ractor_t *tr; + ccan_list_for_each(&vm->ractor.sched.grq, tr, threads.sched.grq_node) { + VM_ASSERT(r != tr); + } +#endif + + ccan_list_add_tail(&vm->ractor.sched.grq, &sched->grq_node); + vm->ractor.sched.grq_cnt++; + VM_ASSERT(grq_size(vm, cr) == vm->ractor.sched.grq_cnt); + + RUBY_DEBUG_LOG("r:%u th:%u grq_cnt:%u", rb_ractor_id(r), rb_th_serial(sched->running), vm->ractor.sched.grq_cnt); + + rb_native_cond_signal(&vm->ractor.sched.cond); + + // ractor_sched_dump(vm); + } + ractor_sched_unlock(vm, cr); } -#define native_cleanup_push pthread_cleanup_push -#define native_cleanup_pop pthread_cleanup_pop -#ifdef HAVE_SCHED_YIELD -#define native_thread_yield() (void)sched_yield() + +#ifndef SNT_KEEP_SECONDS +#define SNT_KEEP_SECONDS 0 +#endif + +#ifndef MINIMUM_SNT +// make at least MINIMUM_SNT snts for debug. +#define MINIMUM_SNT 0 +#endif + +static rb_ractor_t * +ractor_sched_deq(rb_vm_t *vm, rb_ractor_t *cr) +{ + rb_ractor_t *r; + + ractor_sched_lock(vm, cr); + { + RUBY_DEBUG_LOG("empty? 
%d", ccan_list_empty(&vm->ractor.sched.grq)); + // ractor_sched_dump(vm); + + VM_ASSERT(rb_current_execution_context(false) == NULL); + VM_ASSERT(grq_size(vm, cr) == vm->ractor.sched.grq_cnt); + + while ((r = ccan_list_pop(&vm->ractor.sched.grq, rb_ractor_t, threads.sched.grq_node)) == NULL) { + RUBY_DEBUG_LOG("wait grq_cnt:%d", (int)vm->ractor.sched.grq_cnt); + +#if SNT_KEEP_SECONDS > 0 + rb_hrtime_t abs = rb_hrtime_add(rb_hrtime_now(), RB_HRTIME_PER_SEC * SNT_KEEP_SECONDS); + if (native_cond_timedwait(&vm->ractor.sched.cond, &vm->ractor.sched.lock, &abs) == ETIMEDOUT) { + RUBY_DEBUG_LOG("timeout, grq_cnt:%d", (int)vm->ractor.sched.grq_cnt); + VM_ASSERT(r == NULL); + vm->ractor.sched.snt_cnt--; + vm->ractor.sched.running_cnt--; + break; + } + else { + RUBY_DEBUG_LOG("wakeup grq_cnt:%d", (int)vm->ractor.sched.grq_cnt); + } #else -#define native_thread_yield() ((void)0) + ractor_sched_set_unlocked(vm, cr); + rb_native_cond_wait(&vm->ractor.sched.cond, &vm->ractor.sched.lock); + ractor_sched_set_locked(vm, cr); + + RUBY_DEBUG_LOG("wakeup grq_cnt:%d", (int)vm->ractor.sched.grq_cnt); +#endif + } + + VM_ASSERT(rb_current_execution_context(false) == NULL); + + if (r) { + VM_ASSERT(vm->ractor.sched.grq_cnt > 0); + vm->ractor.sched.grq_cnt--; + RUBY_DEBUG_LOG("r:%d grq_cnt:%u", (int)rb_ractor_id(r), vm->ractor.sched.grq_cnt); + } + else { + VM_ASSERT(SNT_KEEP_SECONDS > 0); + // timeout + } + } + ractor_sched_unlock(vm, cr); + + return r; +} + +void rb_ractor_lock_self(rb_ractor_t *r); +void rb_ractor_unlock_self(rb_ractor_t *r); + +// The current thread for a ractor is put to "sleep" (descheduled in the STOPPED_FOREVER state) waiting for +// a ractor action to wake it up. +void +rb_ractor_sched_wait(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf, void *ubf_arg) +{ + // ractor lock of cr is acquired + + RUBY_DEBUG_LOG("start%s", ""); + + rb_thread_t * volatile th = rb_ec_thread_ptr(ec); + struct rb_thread_sched *sched = TH_SCHED(th); + + if (ubf_set(th, ubf, ubf_arg)) { + // interrupted + return; + } + + thread_sched_lock(sched, th); + rb_ractor_unlock_self(cr); + { + // setup sleep + bool can_direct_transfer = !th_has_dedicated_nt(th); + RB_VM_SAVE_MACHINE_CONTEXT(th); + th->status = THREAD_STOPPED_FOREVER; + RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th); + thread_sched_wakeup_next_thread(sched, th, can_direct_transfer); + // sleep + thread_sched_wait_running_turn(sched, th, can_direct_transfer); + th->status = THREAD_RUNNABLE; + } + thread_sched_unlock(sched, th); + rb_ractor_lock_self(cr); + + ubf_clear(th); + + RUBY_DEBUG_LOG("end%s", ""); +} + +void +rb_ractor_sched_wakeup(rb_ractor_t *r, rb_thread_t *r_th) +{ + // ractor lock of r is NOT acquired + struct rb_thread_sched *sched = TH_SCHED(r_th); + + RUBY_DEBUG_LOG("r:%u th:%d", (unsigned int)rb_ractor_id(r), r_th->serial); + + thread_sched_lock(sched, r_th); + { + if (r_th->status == THREAD_STOPPED_FOREVER) { + thread_sched_to_ready_common(sched, r_th, true, false); + } + } + thread_sched_unlock(sched, r_th); +} + +static bool +ractor_sched_barrier_completed_p(rb_vm_t *vm) +{ + RUBY_DEBUG_LOG("run:%u wait:%u", vm->ractor.sched.running_cnt, vm->ractor.sched.barrier_waiting_cnt); + VM_ASSERT(vm->ractor.sched.running_cnt - 1 >= vm->ractor.sched.barrier_waiting_cnt); + + return (vm->ractor.sched.running_cnt - vm->ractor.sched.barrier_waiting_cnt) == 1; +} + +void +rb_ractor_sched_barrier_start(rb_vm_t *vm, rb_ractor_t *cr) +{ + VM_ASSERT(cr == GET_RACTOR()); + VM_ASSERT(vm->ractor.sync.lock_owner == 
cr); // VM is locked + VM_ASSERT(!vm->ractor.sched.barrier_waiting); + VM_ASSERT(vm->ractor.sched.barrier_waiting_cnt == 0); + VM_ASSERT(vm->ractor.sched.barrier_ractor == NULL); + VM_ASSERT(vm->ractor.sched.barrier_lock_rec == 0); + + RUBY_DEBUG_LOG("start serial:%u", vm->ractor.sched.barrier_serial); + + unsigned int lock_rec; + + ractor_sched_lock(vm, cr); + { + vm->ractor.sched.barrier_waiting = true; + vm->ractor.sched.barrier_ractor = cr; + vm->ractor.sched.barrier_lock_rec = vm->ractor.sync.lock_rec; + + // release VM lock + lock_rec = vm->ractor.sync.lock_rec; + vm->ractor.sync.lock_rec = 0; + vm->ractor.sync.lock_owner = NULL; + rb_native_mutex_unlock(&vm->ractor.sync.lock); + + // interrupts all running threads + rb_thread_t *ith; + ccan_list_for_each(&vm->ractor.sched.running_threads, ith, sched.node.running_threads) { + if (ith->ractor != cr) { + RUBY_DEBUG_LOG("barrier request to th:%u", rb_th_serial(ith)); + RUBY_VM_SET_VM_BARRIER_INTERRUPT(ith->ec); + } + } + + // wait for other ractors + while (!ractor_sched_barrier_completed_p(vm)) { + ractor_sched_set_unlocked(vm, cr); + rb_native_cond_wait(&vm->ractor.sched.barrier_complete_cond, &vm->ractor.sched.lock); + ractor_sched_set_locked(vm, cr); + } + + RUBY_DEBUG_LOG("completed seirial:%u", vm->ractor.sched.barrier_serial); + + // no other ractors are there + vm->ractor.sched.barrier_serial++; + vm->ractor.sched.barrier_waiting_cnt = 0; + rb_native_cond_broadcast(&vm->ractor.sched.barrier_release_cond); + + // acquire VM lock + rb_native_mutex_lock(&vm->ractor.sync.lock); + vm->ractor.sync.lock_rec = lock_rec; + vm->ractor.sync.lock_owner = cr; + } + + // do not release ractor_sched_lock and there is no newly added (resumed) thread + // thread_sched_setup_running_threads +} + +// called from vm_lock_leave if the vm_lock used for barrierred +void +rb_ractor_sched_barrier_end(rb_vm_t *vm, rb_ractor_t *cr) +{ + RUBY_DEBUG_LOG("serial:%u", (unsigned int)vm->ractor.sched.barrier_serial - 1); + VM_ASSERT(vm->ractor.sched.barrier_waiting); + VM_ASSERT(vm->ractor.sched.barrier_ractor); + VM_ASSERT(vm->ractor.sched.barrier_lock_rec > 0); + + vm->ractor.sched.barrier_waiting = false; + vm->ractor.sched.barrier_ractor = NULL; + vm->ractor.sched.barrier_lock_rec = 0; + ractor_sched_unlock(vm, cr); +} + +static void +ractor_sched_barrier_join_signal_locked(rb_vm_t *vm) +{ + if (ractor_sched_barrier_completed_p(vm)) { + rb_native_cond_signal(&vm->ractor.sched.barrier_complete_cond); + } +} + +static void +ractor_sched_barrier_join_wait_locked(rb_vm_t *vm, rb_thread_t *th) +{ + VM_ASSERT(vm->ractor.sched.barrier_waiting); + + unsigned int barrier_serial = vm->ractor.sched.barrier_serial; + + while (vm->ractor.sched.barrier_serial == barrier_serial) { + RUBY_DEBUG_LOG("sleep serial:%u", barrier_serial); + RB_VM_SAVE_MACHINE_CONTEXT(th); + + rb_ractor_t *cr = th->ractor; + ractor_sched_set_unlocked(vm, cr); + rb_native_cond_wait(&vm->ractor.sched.barrier_release_cond, &vm->ractor.sched.lock); + ractor_sched_set_locked(vm, cr); + + RUBY_DEBUG_LOG("wakeup serial:%u", barrier_serial); + } +} + +void +rb_ractor_sched_barrier_join(rb_vm_t *vm, rb_ractor_t *cr) +{ + VM_ASSERT(cr->threads.sched.running != NULL); // running ractor + VM_ASSERT(cr == GET_RACTOR()); + VM_ASSERT(vm->ractor.sync.lock_owner == NULL); // VM is locked, but owner == NULL + VM_ASSERT(vm->ractor.sched.barrier_waiting); // VM needs barrier sync + +#if USE_RUBY_DEBUG_LOG || VM_CHECK_MODE > 0 + unsigned int barrier_serial = vm->ractor.sched.barrier_serial; +#endif + + 
RUBY_DEBUG_LOG("join"); + + rb_native_mutex_unlock(&vm->ractor.sync.lock); + { + VM_ASSERT(vm->ractor.sched.barrier_waiting); // VM needs barrier sync + VM_ASSERT(vm->ractor.sched.barrier_serial == barrier_serial); + + ractor_sched_lock(vm, cr); + { + // running_cnt + vm->ractor.sched.barrier_waiting_cnt++; + RUBY_DEBUG_LOG("waiting_cnt:%u serial:%u", vm->ractor.sched.barrier_waiting_cnt, barrier_serial); + + ractor_sched_barrier_join_signal_locked(vm); + ractor_sched_barrier_join_wait_locked(vm, cr->threads.sched.running); + } + ractor_sched_unlock(vm, cr); + } + + rb_native_mutex_lock(&vm->ractor.sync.lock); + // VM locked here +} + +#if 0 +// TODO + +static void clear_thread_cache_altstack(void); + +static void +rb_thread_sched_destroy(struct rb_thread_sched *sched) +{ + /* + * only called once at VM shutdown (not atfork), another thread + * may still grab vm->gvl.lock when calling gvl_release at + * the end of thread_start_func_2 + */ + if (0) { + rb_native_mutex_destroy(&sched->lock); + } + clear_thread_cache_altstack(); +} +#endif + +#ifdef RB_THREAD_T_HAS_NATIVE_ID +static int +get_native_thread_id(void) +{ +#ifdef __linux__ + return (int)syscall(SYS_gettid); +#elif defined(__FreeBSD__) + return pthread_getthreadid_np(); +#endif +} #endif -#if defined(SIGVTALRM) && !defined(__CYGWIN__) && !defined(__SYMBIAN32__) -#define USE_SIGNAL_THREAD_LIST 1 +#if defined(HAVE_WORKING_FORK) +static void +thread_sched_atfork(struct rb_thread_sched *sched) +{ + current_fork_gen++; + rb_thread_sched_init(sched, true); + rb_thread_t *th = GET_THREAD(); + rb_vm_t *vm = GET_VM(); + + if (th_has_dedicated_nt(th)) { + vm->ractor.sched.snt_cnt = 0; + } + else { + vm->ractor.sched.snt_cnt = 1; + } + vm->ractor.sched.running_cnt = 0; + + rb_native_mutex_initialize(&vm->ractor.sched.lock); +#if VM_CHECK_MODE > 0 + vm->ractor.sched.lock_owner = NULL; + vm->ractor.sched.locked = false; +#endif + + // rb_native_cond_destroy(&vm->ractor.sched.cond); + rb_native_cond_initialize(&vm->ractor.sched.cond); + rb_native_cond_initialize(&vm->ractor.sched.barrier_complete_cond); + rb_native_cond_initialize(&vm->ractor.sched.barrier_release_cond); + + ccan_list_head_init(&vm->ractor.sched.grq); + ccan_list_head_init(&vm->ractor.sched.timeslice_threads); + ccan_list_head_init(&vm->ractor.sched.running_threads); + + VM_ASSERT(sched->is_running); + sched->is_running_timeslice = false; + + if (sched->running != th) { + thread_sched_to_running(sched, th); + } + else { + thread_sched_setup_running_threads(sched, th->ractor, vm, th, NULL, NULL); + } + +#ifdef RB_THREAD_T_HAS_NATIVE_ID + if (th->nt) { + th->nt->tid = get_native_thread_id(); + } #endif -#ifdef USE_SIGNAL_THREAD_LIST -static void add_signal_thread_list(rb_thread_t *th); -static void remove_signal_thread_list(rb_thread_t *th); -static rb_thread_lock_t signal_thread_list_lock; +} + #endif +#ifdef RB_THREAD_LOCAL_SPECIFIER +static RB_THREAD_LOCAL_SPECIFIER rb_thread_t *ruby_native_thread; +#else static pthread_key_t ruby_native_thread_key; +#endif static void null_func(int i) { /* null */ + // This function can be called from signal handler + // RUBY_DEBUG_LOG("i:%d", i); } -static rb_thread_t * +rb_thread_t * ruby_thread_from_native(void) { +#ifdef RB_THREAD_LOCAL_SPECIFIER + return ruby_native_thread; +#else return pthread_getspecific(ruby_native_thread_key); +#endif } -static int +int ruby_thread_set_native(rb_thread_t *th) { + if (th) { +#ifdef USE_UBF_LIST + ccan_list_node_init(&th->sched.node.ubf); +#endif + } + + // setup TLS + + if (th && th->ec) { + 
rb_ractor_set_current_ec(th->ractor, th->ec); + } +#ifdef RB_THREAD_LOCAL_SPECIFIER + ruby_native_thread = th; + return 1; +#else return pthread_setspecific(ruby_native_thread_key, th) == 0; +#endif } -static void native_thread_init(rb_thread_t *th); +static void native_thread_setup(struct rb_native_thread *nt); +static void native_thread_setup_on_thread(struct rb_native_thread *nt); void -Init_native_thread(void) +Init_native_thread(rb_thread_t *main_th) { - rb_thread_t *th = GET_THREAD(); +#if defined(HAVE_PTHREAD_CONDATTR_SETCLOCK) + if (condattr_monotonic) { + int r = pthread_condattr_init(condattr_monotonic); + if (r == 0) { + r = pthread_condattr_setclock(condattr_monotonic, CLOCK_MONOTONIC); + } + if (r) condattr_monotonic = NULL; + } +#endif - pthread_key_create(&ruby_native_thread_key, NULL); - th->thread_id = pthread_self(); - native_thread_init(th); -#ifdef USE_SIGNAL_THREAD_LIST - native_mutex_initialize(&signal_thread_list_lock); +#ifndef RB_THREAD_LOCAL_SPECIFIER + if (pthread_key_create(&ruby_native_thread_key, 0) == EAGAIN) { + rb_bug("pthread_key_create failed (ruby_native_thread_key)"); + } + if (pthread_key_create(&ruby_current_ec_key, 0) == EAGAIN) { + rb_bug("pthread_key_create failed (ruby_current_ec_key)"); + } #endif -#ifndef __native_client__ - posix_signal(SIGVTALRM, null_func); + ruby_posix_signal(SIGVTALRM, null_func); + + // setup vm + rb_vm_t *vm = main_th->vm; + rb_native_mutex_initialize(&vm->ractor.sched.lock); + rb_native_cond_initialize(&vm->ractor.sched.cond); + rb_native_cond_initialize(&vm->ractor.sched.barrier_complete_cond); + rb_native_cond_initialize(&vm->ractor.sched.barrier_release_cond); + + ccan_list_head_init(&vm->ractor.sched.grq); + ccan_list_head_init(&vm->ractor.sched.timeslice_threads); + ccan_list_head_init(&vm->ractor.sched.running_threads); + + // setup main thread + main_th->nt->thread_id = pthread_self(); + main_th->nt->serial = 1; +#ifdef RUBY_NT_SERIAL + ruby_nt_serial = 1; #endif + ruby_thread_set_native(main_th); + native_thread_setup(main_th->nt); + native_thread_setup_on_thread(main_th->nt); + + TH_SCHED(main_th)->running = main_th; + main_th->has_dedicated_nt = 1; + + thread_sched_setup_running_threads(TH_SCHED(main_th), main_th->ractor, vm, main_th, NULL, NULL); + + // setup main NT + main_th->nt->dedicated = 1; + main_th->nt->vm = vm; + + // setup mn + vm->ractor.sched.dnt_cnt = 1; +} + +extern int ruby_mn_threads_enabled; + +void +ruby_mn_threads_params(void) +{ + rb_vm_t *vm = GET_VM(); + rb_ractor_t *main_ractor = GET_RACTOR(); + + const char *mn_threads_cstr = getenv("RUBY_MN_THREADS"); + bool enable_mn_threads = false; + + if (USE_MN_THREADS && mn_threads_cstr && (enable_mn_threads = atoi(mn_threads_cstr) > 0)) { + // enabled + ruby_mn_threads_enabled = 1; + } + main_ractor->threads.sched.enable_mn_threads = enable_mn_threads; + + const char *max_cpu_cstr = getenv("RUBY_MAX_CPU"); + const int default_max_cpu = 8; // TODO: CPU num? 
+ int max_cpu = default_max_cpu; + + if (USE_MN_THREADS && max_cpu_cstr) { + int given_max_cpu = atoi(max_cpu_cstr); + if (given_max_cpu > 0) { + max_cpu = given_max_cpu; + } + } + + vm->ractor.sched.max_cpu = max_cpu; } static void -native_thread_init(rb_thread_t *th) +native_thread_dedicated_inc(rb_vm_t *vm, rb_ractor_t *cr, struct rb_native_thread *nt) { - native_cond_initialize(&th->native_thread_data.sleep_cond, RB_CONDATTR_CLOCK_MONOTONIC); - ruby_thread_set_native(th); + RUBY_DEBUG_LOG("nt:%d %d->%d", nt->serial, nt->dedicated, nt->dedicated + 1); + + if (nt->dedicated == 0) { + ractor_sched_lock(vm, cr); + { + vm->ractor.sched.snt_cnt--; + vm->ractor.sched.dnt_cnt++; + } + ractor_sched_unlock(vm, cr); + } + + nt->dedicated++; } static void -native_thread_destroy(rb_thread_t *th) +native_thread_dedicated_dec(rb_vm_t *vm, rb_ractor_t *cr, struct rb_native_thread *nt) { - native_cond_destroy(&th->native_thread_data.sleep_cond); + RUBY_DEBUG_LOG("nt:%d %d->%d", nt->serial, nt->dedicated, nt->dedicated - 1); + VM_ASSERT(nt->dedicated > 0); + nt->dedicated--; + + if (nt->dedicated == 0) { + ractor_sched_lock(vm, cr); + { + nt->vm->ractor.sched.snt_cnt++; + nt->vm->ractor.sched.dnt_cnt--; + } + ractor_sched_unlock(vm, cr); + } } -#ifndef USE_THREAD_CACHE -#define USE_THREAD_CACHE 0 +static void +native_thread_assign(struct rb_native_thread *nt, rb_thread_t *th) +{ +#if USE_RUBY_DEBUG_LOG + if (nt) { + if (th->nt) { + RUBY_DEBUG_LOG("th:%d nt:%d->%d", (int)th->serial, (int)th->nt->serial, (int)nt->serial); + } + else { + RUBY_DEBUG_LOG("th:%d nt:NULL->%d", (int)th->serial, (int)nt->serial); + } + } + else { + if (th->nt) { + RUBY_DEBUG_LOG("th:%d nt:%d->NULL", (int)th->serial, (int)th->nt->serial); + } + else { + RUBY_DEBUG_LOG("th:%d nt:NULL->NULL", (int)th->serial); + } + } #endif -#if USE_THREAD_CACHE -static rb_thread_t *register_cached_thread_and_wait(void); -#endif + th->nt = nt; +} + +static void +native_thread_destroy_atfork(struct rb_native_thread *nt) +{ + if (nt) { + /* We can't call rb_native_cond_destroy here because according to the + * specs of pthread_cond_destroy: + * + * Attempting to destroy a condition variable upon which other threads + * are currently blocked results in undefined behavior. + * + * Specifically, glibc's pthread_cond_destroy waits on all the other + * listeners. Since after forking all the threads are dead, the condition + * variable's listeners will never wake up, so it will hang forever. 
+ */ + + RB_ALTSTACK_FREE(nt->altstack); + ruby_xfree(nt->nt_context); + ruby_xfree(nt); + } +} + +static void +native_thread_destroy(struct rb_native_thread *nt) +{ + if (nt) { + rb_native_cond_destroy(&nt->cond.readyq); + + if (&nt->cond.readyq != &nt->cond.intr) { + rb_native_cond_destroy(&nt->cond.intr); + } + + native_thread_destroy_atfork(nt); + } +} #if defined HAVE_PTHREAD_GETATTR_NP || defined HAVE_PTHREAD_ATTR_GET_NP #define STACKADDR_AVAILABLE 1 #elif defined HAVE_PTHREAD_GET_STACKADDR_NP && defined HAVE_PTHREAD_GET_STACKSIZE_NP #define STACKADDR_AVAILABLE 1 #undef MAINSTACKADDR_AVAILABLE -#define MAINSTACKADDR_AVAILABLE 0 +#define MAINSTACKADDR_AVAILABLE 1 void *pthread_get_stackaddr_np(pthread_t); size_t pthread_get_stacksize_np(pthread_t); #elif defined HAVE_THR_STKSEGMENT || defined HAVE_PTHREAD_STACKSEG_NP #define STACKADDR_AVAILABLE 1 #elif defined HAVE_PTHREAD_GETTHRDS_NP #define STACKADDR_AVAILABLE 1 +#elif defined __HAIKU__ +#define STACKADDR_AVAILABLE 1 #endif #ifndef MAINSTACKADDR_AVAILABLE @@ -520,6 +1864,9 @@ size_t pthread_get_stacksize_np(pthread_t); # define MAINSTACKADDR_AVAILABLE 0 # endif #endif +#if MAINSTACKADDR_AVAILABLE && !defined(get_main_stack) +# define get_main_stack(addr, size) get_stack(addr, size) +#endif #ifdef STACKADDR_AVAILABLE /* @@ -542,7 +1889,11 @@ get_stack(void **addr, size_t *size) CHECK_ERR(pthread_attr_getstackaddr(&attr, addr)); CHECK_ERR(pthread_attr_getstacksize(&attr, size)); # endif +# ifdef HAVE_PTHREAD_ATTR_GETGUARDSIZE CHECK_ERR(pthread_attr_getguardsize(&attr, &guard)); +# else + guard = getpagesize(); +# endif *size -= guard; pthread_attr_destroy(&attr); #elif defined HAVE_PTHREAD_ATTR_GET_NP /* FreeBSD, DragonFly BSD, NetBSD */ @@ -551,12 +1902,11 @@ get_stack(void **addr, size_t *size) CHECK_ERR(pthread_attr_get_np(pthread_self(), &attr)); # ifdef HAVE_PTHREAD_ATTR_GETSTACK CHECK_ERR(pthread_attr_getstack(&attr, addr, size)); - STACK_DIR_UPPER((void)0, (void)(*addr = (char *)*addr + *size)); # else CHECK_ERR(pthread_attr_getstackaddr(&attr, addr)); CHECK_ERR(pthread_attr_getstacksize(&attr, size)); - STACK_DIR_UPPER((void)0, (void)(*addr = (char *)*addr + *size)); # endif + STACK_DIR_UPPER((void)0, (void)(*addr = (char *)*addr + *size)); pthread_attr_destroy(&attr); #elif (defined HAVE_PTHREAD_GET_STACKADDR_NP && defined HAVE_PTHREAD_GET_STACKSIZE_NP) /* MacOS X */ pthread_t th = pthread_self(); @@ -577,10 +1927,20 @@ get_stack(void **addr, size_t *size) char reg[256]; int regsiz=sizeof(reg); CHECK_ERR(pthread_getthrds_np(&th, PTHRDSINFO_QUERY_ALL, - &thinfo, sizeof(thinfo), - ®, ®siz)); + &thinfo, sizeof(thinfo), + ®, ®siz)); *addr = thinfo.__pi_stackaddr; - *size = thinfo.__pi_stacksize; + /* Must not use thinfo.__pi_stacksize for size. + It is around 3KB smaller than the correct size + calculated by thinfo.__pi_stackend - thinfo.__pi_stackaddr. */ + *size = thinfo.__pi_stackend - thinfo.__pi_stackaddr; + STACK_DIR_UPPER((void)0, (void)(*addr = (char *)*addr + *size)); +#elif defined __HAIKU__ + thread_info info; + STACK_GROW_DIR_DETECTION; + CHECK_ERR(get_thread_info(find_thread(NULL), &info)); + *addr = info.stack_base; + *size = (uintptr_t)info.stack_end - (uintptr_t)info.stack_base; STACK_DIR_UPPER((void)0, (void)(*addr = (char *)*addr + *size)); #else #error STACKADDR_AVAILABLE is defined but not implemented. 
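/* The comment in native_thread_destroy_atfork() above explains why
 * pthread_cond_destroy() must not be called on a condition variable whose
 * waiters died in fork(). A hedged sketch of the safer alternative this file
 * uses elsewhere (compare ubf_list_atfork() further down): register an atfork
 * child handler that simply re-initializes the primitives, since the old
 * waiters no longer exist in the child. Names here are illustrative. */
#include <pthread.h>
#include <unistd.h>

static pthread_mutex_t big_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  big_cond = PTHREAD_COND_INITIALIZER;

static void
reinit_in_child(void)
{
    /* waiters from the parent no longer exist here, so start over with
     * fresh primitives instead of destroying the old ones */
    pthread_mutex_init(&big_lock, NULL);
    pthread_cond_init(&big_cond, NULL);
}

int
main(void)
{
    pthread_atfork(NULL, NULL, reinit_in_child);
    if (fork() == 0) {
        pthread_mutex_lock(&big_lock);   /* usable again in the child */
        pthread_mutex_unlock(&big_lock);
        _exit(0);
    }
    return 0;
}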
@@ -591,12 +1951,9 @@ get_stack(void **addr, size_t *size) #endif static struct { - rb_thread_id_t id; + rb_nativethread_id_t id; size_t stack_maxsize; VALUE *stack_start; -#ifdef __ia64 - VALUE *register_stack_start; -#endif } native_main_thread; #ifdef STACK_END_ADDRESS @@ -613,97 +1970,94 @@ space_size(size_t stack_size) { size_t space_size = stack_size / RUBY_STACK_SPACE_RATIO; if (space_size > RUBY_STACK_SPACE_LIMIT) { - return RUBY_STACK_SPACE_LIMIT; + return RUBY_STACK_SPACE_LIMIT; } else { - return space_size; + return space_size; } } -#undef ruby_init_stack -/* Set stack bottom of Ruby implementation. - * - * You must call this function before any heap allocation by Ruby implementation. - * Or GC will break living objects */ -void -ruby_init_stack(volatile VALUE *addr -#ifdef __ia64 - , void *bsp -#endif - ) +static void +native_thread_init_main_thread_stack(void *addr) { native_main_thread.id = pthread_self(); +#ifdef RUBY_ASAN_ENABLED + addr = asan_get_real_stack_addr((void *)addr); +#endif + +#if MAINSTACKADDR_AVAILABLE + if (native_main_thread.stack_maxsize) return; + { + void* stackaddr; + size_t size; + if (get_main_stack(&stackaddr, &size) == 0) { + native_main_thread.stack_maxsize = size; + native_main_thread.stack_start = stackaddr; + goto bound_check; + } + } +#endif #ifdef STACK_END_ADDRESS native_main_thread.stack_start = STACK_END_ADDRESS; #else if (!native_main_thread.stack_start || STACK_UPPER((VALUE *)(void *)&addr, - native_main_thread.stack_start > addr, - native_main_thread.stack_start < addr)) { + native_main_thread.stack_start > (VALUE *)addr, + native_main_thread.stack_start < (VALUE *)addr)) { native_main_thread.stack_start = (VALUE *)addr; } #endif -#ifdef __ia64 - if (!native_main_thread.register_stack_start || - (VALUE*)bsp < native_main_thread.register_stack_start) { - native_main_thread.register_stack_start = (VALUE*)bsp; - } -#endif { +#if defined(HAVE_GETRLIMIT) #if defined(PTHREAD_STACK_DEFAULT) # if PTHREAD_STACK_DEFAULT < RUBY_STACK_SPACE*5 # error "PTHREAD_STACK_DEFAULT is too small" # endif - size_t size = PTHREAD_STACK_DEFAULT; + size_t size = PTHREAD_STACK_DEFAULT; #else - size_t size = RUBY_VM_THREAD_VM_STACK_SIZE; + size_t size = RUBY_VM_THREAD_VM_STACK_SIZE; #endif - size_t space = space_size(size); -#if MAINSTACKADDR_AVAILABLE - void* stackaddr; - STACK_GROW_DIR_DETECTION; - if (get_stack(&stackaddr, &size) == 0) { - space = STACK_DIR_UPPER((char *)addr - (char *)stackaddr, (char *)stackaddr - (char *)addr); - } - native_main_thread.stack_maxsize = size - space; -#elif defined(HAVE_GETRLIMIT) - int pagesize = getpagesize(); - struct rlimit rlim; + size_t space; + int pagesize = getpagesize(); + struct rlimit rlim; STACK_GROW_DIR_DETECTION; - if (getrlimit(RLIMIT_STACK, &rlim) == 0) { - size = (size_t)rlim.rlim_cur; - } - addr = native_main_thread.stack_start; - if (IS_STACK_DIR_UPPER()) { - space = ((size_t)((char *)addr + size) / pagesize) * pagesize - (size_t)addr; - } - else { - space = (size_t)addr - ((size_t)((char *)addr - size) / pagesize + 1) * pagesize; - } - native_main_thread.stack_maxsize = space; + if (getrlimit(RLIMIT_STACK, &rlim) == 0) { + size = (size_t)rlim.rlim_cur; + } + addr = native_main_thread.stack_start; + if (IS_STACK_DIR_UPPER()) { + space = ((size_t)((char *)addr + size) / pagesize) * pagesize - (size_t)addr; + } + else { + space = (size_t)addr - ((size_t)((char *)addr - size) / pagesize + 1) * pagesize; + } + native_main_thread.stack_maxsize = space; #endif } +#if MAINSTACKADDR_AVAILABLE + bound_check: +#endif 
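/* Standalone sketch of the HAVE_GETRLIMIT fallback used for the main thread
 * above: when no direct stack-address API is available, the usable stack size
 * is derived from the soft RLIMIT_STACK limit. Illustrative only; Ruby then
 * rounds the result to page boundaries as shown in the surrounding code. */
#include <stdio.h>
#include <sys/resource.h>

int
main(void)
{
    struct rlimit rlim;

    if (getrlimit(RLIMIT_STACK, &rlim) == 0) {
        if (rlim.rlim_cur == RLIM_INFINITY)
            puts("main stack: unlimited");
        else
            printf("main stack soft limit: %llu bytes\n",
                   (unsigned long long)rlim.rlim_cur);
    }
    return 0;
}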
/* If addr is out of range of main-thread stack range estimation, */ /* it should be on co-routine (alternative stack). [Feature #2294] */ { - void *start, *end; - STACK_GROW_DIR_DETECTION; + void *start, *end; + STACK_GROW_DIR_DETECTION; - if (IS_STACK_DIR_UPPER()) { - start = native_main_thread.stack_start; - end = (char *)native_main_thread.stack_start + native_main_thread.stack_maxsize; - } - else { - start = (char *)native_main_thread.stack_start - native_main_thread.stack_maxsize; - end = native_main_thread.stack_start; - } + if (IS_STACK_DIR_UPPER()) { + start = native_main_thread.stack_start; + end = (char *)native_main_thread.stack_start + native_main_thread.stack_maxsize; + } + else { + start = (char *)native_main_thread.stack_start - native_main_thread.stack_maxsize; + end = native_main_thread.stack_start; + } - if ((void *)addr < start || (void *)addr > end) { - /* out of range */ - native_main_thread.stack_start = (VALUE *)addr; - native_main_thread.stack_maxsize = 0; /* unknown */ - } + if ((void *)addr < start || (void *)addr > end) { + /* out of range */ + native_main_thread.stack_start = (VALUE *)addr; + native_main_thread.stack_maxsize = 0; /* unknown */ + } } } @@ -711,221 +2065,333 @@ ruby_init_stack(volatile VALUE *addr {int err = (expr); if (err) {rb_bug_errno(#expr, err);}} static int -native_thread_init_stack(rb_thread_t *th) +native_thread_init_stack(rb_thread_t *th, void *local_in_parent_frame) { - rb_thread_id_t curr = pthread_self(); + rb_nativethread_id_t curr = pthread_self(); +#ifdef RUBY_ASAN_ENABLED + local_in_parent_frame = asan_get_real_stack_addr(local_in_parent_frame); + th->ec->machine.asan_fake_stack_handle = asan_get_thread_fake_stack_handle(); +#endif + + if (!native_main_thread.id) { + /* This thread is the first thread, must be the main thread - + * configure the native_main_thread object */ + native_thread_init_main_thread_stack(local_in_parent_frame); + } if (pthread_equal(curr, native_main_thread.id)) { - th->machine_stack_start = native_main_thread.stack_start; - th->machine_stack_maxsize = native_main_thread.stack_maxsize; + th->ec->machine.stack_start = native_main_thread.stack_start; + th->ec->machine.stack_maxsize = native_main_thread.stack_maxsize; } else { #ifdef STACKADDR_AVAILABLE - void *start; - size_t size; - - if (get_stack(&start, &size) == 0) { - th->machine_stack_start = start; - th->machine_stack_maxsize = size; - } + if (th_has_dedicated_nt(th)) { + void *start; + size_t size; + + if (get_stack(&start, &size) == 0) { + uintptr_t diff = (uintptr_t)start - (uintptr_t)local_in_parent_frame; + th->ec->machine.stack_start = local_in_parent_frame; + th->ec->machine.stack_maxsize = size - diff; + } + } #else - rb_raise(rb_eNotImpError, "ruby engine can initialize only in the main thread"); + rb_raise(rb_eNotImpError, "ruby engine can initialize only in the main thread"); #endif } -#ifdef __ia64 - th->machine_register_stack_start = native_main_thread.register_stack_start; - th->machine_stack_maxsize /= 2; - th->machine_register_stack_maxsize = th->machine_stack_maxsize; -#endif + return 0; } -#ifndef __CYGWIN__ -#define USE_NATIVE_THREAD_INIT 1 -#endif +struct nt_param { + rb_vm_t *vm; + struct rb_native_thread *nt; +}; static void * -thread_start_func_1(void *th_ptr) +nt_start(void *ptr); + +static int +native_thread_create0(struct rb_native_thread *nt) { -#if USE_THREAD_CACHE - thread_start: -#endif - { - rb_thread_t *th = th_ptr; -#if !defined USE_NATIVE_THREAD_INIT - VALUE stack_start; -#endif + int err = 0; + pthread_attr_t attr; 
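/* A Linux/glibc-only sketch of the stack probing wrapped by get_stack() and
 * used by native_thread_init_stack() above: query the calling thread's stack
 * base and size via the GNU extension pthread_getattr_np(), subtracting the
 * guard area as the code above does. Assumes glibc; illustrative only. */
#define _GNU_SOURCE
#include <pthread.h>
#include <stdio.h>

int
main(void)
{
    pthread_attr_t attr;
    void *addr;
    size_t size, guard;

    if (pthread_getattr_np(pthread_self(), &attr) != 0) return 1;
    pthread_attr_getstack(&attr, &addr, &size);
    pthread_attr_getguardsize(&attr, &guard);
    pthread_attr_destroy(&attr);

    printf("stack addr=%p size=%zu usable=%zu\n", addr, size, size - guard);
    return 0;
}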
-#if defined USE_NATIVE_THREAD_INIT - native_thread_init_stack(th); -#endif - native_thread_init(th); - /* run */ -#if defined USE_NATIVE_THREAD_INIT - thread_start_func_2(th, th->machine_stack_start, rb_ia64_bsp()); -#else - thread_start_func_2(th, &stack_start, rb_ia64_bsp()); + const size_t stack_size = nt->vm->default_params.thread_machine_stack_size; + const size_t space = space_size(stack_size); + + nt->machine_stack_maxsize = stack_size - space; + +#ifdef USE_SIGALTSTACK + nt->altstack = rb_allocate_sigaltstack(); #endif + + CHECK_ERR(pthread_attr_init(&attr)); + +# ifdef PTHREAD_STACK_MIN + RUBY_DEBUG_LOG("stack size: %lu", (unsigned long)stack_size); + CHECK_ERR(pthread_attr_setstacksize(&attr, stack_size)); +# endif + +# ifdef HAVE_PTHREAD_ATTR_SETINHERITSCHED + CHECK_ERR(pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED)); +# endif + CHECK_ERR(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)); + + err = pthread_create(&nt->thread_id, &attr, nt_start, nt); + + RUBY_DEBUG_LOG("nt:%d err:%d", (int)nt->serial, err); + + CHECK_ERR(pthread_attr_destroy(&attr)); + + return err; +} + +static void +native_thread_setup(struct rb_native_thread *nt) +{ + // init cond + rb_native_cond_initialize(&nt->cond.readyq); + + if (&nt->cond.readyq != &nt->cond.intr) { + rb_native_cond_initialize(&nt->cond.intr); } -#if USE_THREAD_CACHE - if (1) { - /* cache thread */ - rb_thread_t *th; - if ((th = register_cached_thread_and_wait()) != 0) { - th_ptr = (void *)th; - th->thread_id = pthread_self(); - goto thread_start; - } - } +} + +static void +native_thread_setup_on_thread(struct rb_native_thread *nt) +{ + // init tid +#ifdef RB_THREAD_T_HAS_NATIVE_ID + nt->tid = get_native_thread_id(); #endif - return 0; + + // init signal handler + RB_ALTSTACK_INIT(nt->altstack, nt->altstack); } -struct cached_thread_entry { - volatile rb_thread_t **th_area; - rb_thread_cond_t *cond; - struct cached_thread_entry *next; -}; +static struct rb_native_thread * +native_thread_alloc(void) +{ + struct rb_native_thread *nt = ZALLOC(struct rb_native_thread); + native_thread_setup(nt); +#if USE_MN_THREADS + nt->nt_context = ruby_xmalloc(sizeof(struct coroutine_context)); +#endif -#if USE_THREAD_CACHE -static pthread_mutex_t thread_cache_lock = PTHREAD_MUTEX_INITIALIZER; -struct cached_thread_entry *cached_thread_root; +#if USE_RUBY_DEBUG_LOG + static rb_atomic_t nt_serial = 2; + nt->serial = RUBY_ATOMIC_FETCH_ADD(nt_serial, 1); +#endif + return nt; +} -static rb_thread_t * -register_cached_thread_and_wait(void) +static int +native_thread_create_dedicated(rb_thread_t *th) { - rb_thread_cond_t cond = { PTHREAD_COND_INITIALIZER, }; - volatile rb_thread_t *th_area = 0; - struct timeval tv; - struct timespec ts; - struct cached_thread_entry *entry = - (struct cached_thread_entry *)malloc(sizeof(struct cached_thread_entry)); + th->nt = native_thread_alloc(); + th->nt->vm = th->vm; + th->nt->running_thread = th; + th->nt->dedicated = 1; + + // vm stack + size_t vm_stack_word_size = th->vm->default_params.thread_vm_stack_size / sizeof(VALUE); + void *vm_stack = ruby_xmalloc(vm_stack_word_size * sizeof(VALUE)); + th->sched.malloc_stack = true; + rb_ec_initialize_vm_stack(th->ec, vm_stack, vm_stack_word_size); + th->sched.context_stack = vm_stack; - if (entry == 0) { - return 0; /* failed -> terminate thread immediately */ + + int err = native_thread_create0(th->nt); + if (!err) { + // setup + thread_sched_to_ready(TH_SCHED(th), th); } + return err; +} + +static void +call_thread_start_func_2(rb_thread_t *th) +{ + /* 
Capture the address of a local in this stack frame to mark the beginning of the + machine stack for this thread. This is required even if we can tell the real + stack beginning from the pthread API in native_thread_init_stack, because + glibc stores some of its own data on the stack before calling into user code + on a new thread, and replacing that data on fiber-switch would break it (see + bug #13887) */ + VALUE stack_start = 0; + VALUE *stack_start_addr = asan_get_real_stack_addr(&stack_start); + + native_thread_init_stack(th, stack_start_addr); + thread_start_func_2(th, th->ec->machine.stack_start); +} - gettimeofday(&tv, 0); - ts.tv_sec = tv.tv_sec + 60; - ts.tv_nsec = tv.tv_usec * 1000; +static void * +nt_start(void *ptr) +{ + struct rb_native_thread *nt = (struct rb_native_thread *)ptr; + rb_vm_t *vm = nt->vm; - pthread_mutex_lock(&thread_cache_lock); - { - entry->th_area = &th_area; - entry->cond = &cond; - entry->next = cached_thread_root; - cached_thread_root = entry; + native_thread_setup_on_thread(nt); - native_cond_timedwait(&cond, &thread_cache_lock, &ts); + // init tid +#ifdef RB_THREAD_T_HAS_NATIVE_ID + nt->tid = get_native_thread_id(); +#endif - { - struct cached_thread_entry *e = cached_thread_root; - struct cached_thread_entry *prev = cached_thread_root; +#if USE_RUBY_DEBUG_LOG && defined(RUBY_NT_SERIAL) + ruby_nt_serial = nt->serial; +#endif + + RUBY_DEBUG_LOG("nt:%u", nt->serial); - while (e) { - if (e == entry) { - if (prev == cached_thread_root) { - cached_thread_root = e->next; - } - else { - prev->next = e->next; - } - break; - } - prev = e; - e = e->next; - } - } + if (!nt->dedicated) { + coroutine_initialize_main(nt->nt_context); + } - free(entry); /* ok */ - native_cond_destroy(&cond); + while (1) { + if (nt->dedicated) { + // wait running turn + rb_thread_t *th = nt->running_thread; + struct rb_thread_sched *sched = TH_SCHED(th); + + RUBY_DEBUG_LOG("on dedicated th:%u", rb_th_serial(th)); + ruby_thread_set_native(th); + + thread_sched_lock(sched, th); + { + if (sched->running == th) { + thread_sched_add_running_thread(sched, th); + } + thread_sched_wait_running_turn(sched, th, false); + } + thread_sched_unlock(sched, th); + + // start threads + call_thread_start_func_2(th); + break; // TODO: allow to change to the SNT + } + else { + RUBY_DEBUG_LOG("check next"); + rb_ractor_t *r = ractor_sched_deq(vm, NULL); + + if (r) { + struct rb_thread_sched *sched = &r->threads.sched; + + thread_sched_lock(sched, NULL); + { + rb_thread_t *next_th = sched->running; + + if (next_th && next_th->nt == NULL) { + RUBY_DEBUG_LOG("nt:%d next_th:%d", (int)nt->serial, (int)next_th->serial); + thread_sched_switch0(nt->nt_context, next_th, nt, false); + } + else { + RUBY_DEBUG_LOG("no schedulable threads -- next_th:%p", next_th); + } + } + thread_sched_unlock(sched, NULL); + } + else { + // timeout -> deleted. 
+ break; + } + + if (nt->dedicated) { + // SNT becomes DNT while running + break; + } + } } - pthread_mutex_unlock(&thread_cache_lock); - return (rb_thread_t *)th_area; + return NULL; } + +static int native_thread_create_shared(rb_thread_t *th); + +#if USE_MN_THREADS +static void nt_free_stack(void *mstack); #endif -static int -use_cached_thread(rb_thread_t *th) -{ - int result = 0; -#if USE_THREAD_CACHE - struct cached_thread_entry *entry; - - if (cached_thread_root) { - pthread_mutex_lock(&thread_cache_lock); - entry = cached_thread_root; - { - if (cached_thread_root) { - cached_thread_root = entry->next; - *entry->th_area = th; - result = 1; - } - } - if (result) { - native_cond_signal(entry->cond); - } - pthread_mutex_unlock(&thread_cache_lock); +void +rb_threadptr_remove(rb_thread_t *th) +{ +#if USE_MN_THREADS + if (th->sched.malloc_stack) { + // dedicated + return; + } + else { + rb_vm_t *vm = th->vm; + th->sched.finished = false; + + RB_VM_LOCKING() { + ccan_list_add(&vm->ractor.sched.zombie_threads, &th->sched.node.zombie_threads); + } } #endif - return result; } -static int -native_thread_create(rb_thread_t *th) +void +rb_threadptr_sched_free(rb_thread_t *th) { - int err = 0; - - if (use_cached_thread(th)) { - thread_debug("create (use cached thread): %p\n", (void *)th); +#if USE_MN_THREADS + if (th->sched.malloc_stack) { + // has dedicated + ruby_xfree(th->sched.context_stack); + native_thread_destroy(th->nt); } else { - pthread_attr_t attr; - const size_t stack_size = th->vm->default_params.thread_machine_stack_size; - const size_t space = space_size(stack_size); + nt_free_stack(th->sched.context_stack); + // TODO: how to free nt and nt->altstack? + } - th->machine_stack_maxsize = stack_size - space; -#ifdef __ia64 - th->machine_stack_maxsize /= 2; - th->machine_register_stack_maxsize = th->machine_stack_maxsize; + ruby_xfree(th->sched.context); + th->sched.context = NULL; + // VM_ASSERT(th->sched.context == NULL); +#else + ruby_xfree(th->sched.context_stack); + native_thread_destroy(th->nt); #endif -#ifdef HAVE_PTHREAD_ATTR_INIT - CHECK_ERR(pthread_attr_init(&attr)); - -# ifdef PTHREAD_STACK_MIN - thread_debug("create - stack size: %lu\n", (unsigned long)stack_size); - CHECK_ERR(pthread_attr_setstacksize(&attr, stack_size)); -# endif - -# ifdef HAVE_PTHREAD_ATTR_SETINHERITSCHED - CHECK_ERR(pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED)); -# endif - CHECK_ERR(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)); + th->nt = NULL; +} - err = pthread_create(&th->thread_id, &attr, thread_start_func_1, th); -#else - err = pthread_create(&th->thread_id, NULL, thread_start_func_1, th); -#endif - thread_debug("create: %p (%d)\n", (void *)th, err); -#ifdef HAVE_PTHREAD_ATTR_INIT - CHECK_ERR(pthread_attr_destroy(&attr)); -#endif +void +rb_thread_sched_mark_zombies(rb_vm_t *vm) +{ + if (!ccan_list_empty(&vm->ractor.sched.zombie_threads)) { + rb_thread_t *zombie_th, *next_zombie_th; + ccan_list_for_each_safe(&vm->ractor.sched.zombie_threads, zombie_th, next_zombie_th, sched.node.zombie_threads) { + if (zombie_th->sched.finished) { + ccan_list_del_init(&zombie_th->sched.node.zombie_threads); + } + else { + rb_gc_mark(zombie_th->self); + } + } } - return err; } -static void -native_thread_join(pthread_t th) +static int +native_thread_create(rb_thread_t *th) { - int err = pthread_join(th, 0); - if (err) { - rb_raise(rb_eThreadError, "native_thread_join() failed (%d)", err); + VM_ASSERT(th->nt == 0); + RUBY_DEBUG_LOG("th:%d has_dnt:%d", th->serial, th->has_dedicated_nt); + 
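/* Condensed, standalone sketch of the creation pattern in
 * native_thread_create0() above: a detached pthread with an explicit machine
 * stack size, so it is never joined. Error handling is reduced to simple
 * checks here; the code above uses CHECK_ERR()/rb_bug_errno() instead. */
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static void *
entry(void *arg)
{
    (void)arg;
    puts("detached native thread running");
    return NULL;
}

int
main(void)
{
    pthread_t tid;
    pthread_attr_t attr;

    if (pthread_attr_init(&attr) != 0) return 1;
    pthread_attr_setstacksize(&attr, 1024 * 1024);                /* 1 MiB machine stack */
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

    if (pthread_create(&tid, &attr, entry, NULL) != 0) return 1;
    pthread_attr_destroy(&attr);

    sleep(1); /* crude: let the detached thread finish before main() exits */
    return 0;
}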
RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_STARTED, th); + + if (!th->ractor->threads.sched.enable_mn_threads) { + th->has_dedicated_nt = 1; } -} + if (th->has_dedicated_nt) { + return native_thread_create_dedicated(th); + } + else { + return native_thread_create_shared(th); + } +} #if USE_NATIVE_THREAD_PRIORITY @@ -937,19 +2403,19 @@ native_thread_apply_priority(rb_thread_t *th) int policy; int priority = 0 - th->priority; int max, min; - pthread_getschedparam(th->thread_id, &policy, &sp); + pthread_getschedparam(th->nt->thread_id, &policy, &sp); max = sched_get_priority_max(policy); min = sched_get_priority_min(policy); if (min > priority) { - priority = min; + priority = min; } else if (max < priority) { - priority = max; + priority = max; } sp.sched_priority = priority; - pthread_setschedparam(th->thread_id, policy, &sp); + pthread_setschedparam(th->nt->thread_id, policy, &sp); #else /* not touched */ #endif @@ -967,287 +2433,228 @@ static void ubf_pthread_cond_signal(void *ptr) { rb_thread_t *th = (rb_thread_t *)ptr; - thread_debug("ubf_pthread_cond_signal (%p)\n", (void *)th); - native_cond_signal(&th->native_thread_data.sleep_cond); + RUBY_DEBUG_LOG("th:%u on nt:%d", rb_th_serial(th), (int)th->nt->serial); + rb_native_cond_signal(&th->nt->cond.intr); } static void -native_sleep(rb_thread_t *th, struct timeval *timeout_tv) +native_cond_sleep(rb_thread_t *th, rb_hrtime_t *rel) { - struct timespec timeout; - pthread_mutex_t *lock = &th->interrupt_lock; - rb_thread_cond_t *cond = &th->native_thread_data.sleep_cond; - - if (timeout_tv) { - struct timespec timeout_rel; - - timeout_rel.tv_sec = timeout_tv->tv_sec; - timeout_rel.tv_nsec = timeout_tv->tv_usec * 1000; - - /* Solaris cond_timedwait() return EINVAL if an argument is greater than - * current_time + 100,000,000. So cut up to 100,000,000. This is - * considered as a kind of spurious wakeup. The caller to native_sleep - * should care about spurious wakeup. - * - * See also [Bug #1341] [ruby-core:29702] - * http://download.oracle.com/docs/cd/E19683-01/816-0216/6m6ngupgv/index.html - */ - if (timeout_rel.tv_sec > 100000000) { - timeout_rel.tv_sec = 100000000; - timeout_rel.tv_nsec = 0; - } - - timeout = native_cond_timeout(cond, timeout_rel); - } + rb_nativethread_lock_t *lock = &th->interrupt_lock; + rb_nativethread_cond_t *cond = &th->nt->cond.intr; + + /* Solaris cond_timedwait() return EINVAL if an argument is greater than + * current_time + 100,000,000. So cut up to 100,000,000. This is + * considered as a kind of spurious wakeup. The caller to native_sleep + * should care about spurious wakeup. + * + * See also [Bug #1341] [ruby-core:29702] + * http://download.oracle.com/docs/cd/E19683-01/816-0216/6m6ngupgv/index.html + */ + const rb_hrtime_t max = (rb_hrtime_t)100000000 * RB_HRTIME_PER_SEC; - GVL_UNLOCK_BEGIN(); + THREAD_BLOCKING_BEGIN(th); { - pthread_mutex_lock(lock); - th->unblock.func = ubf_pthread_cond_signal; - th->unblock.arg = th; + rb_native_mutex_lock(lock); + th->unblock.func = ubf_pthread_cond_signal; + th->unblock.arg = th; - if (RUBY_VM_INTERRUPTED(th)) { - /* interrupted. return immediate */ - thread_debug("native_sleep: interrupted before sleep\n"); - } - else { - if (!timeout_tv) - native_cond_wait(cond, lock); - else - native_cond_timedwait(cond, lock, &timeout); - } - th->unblock.func = 0; - th->unblock.arg = 0; + if (RUBY_VM_INTERRUPTED(th->ec)) { + /* interrupted. 
return immediate */ + RUBY_DEBUG_LOG("interrupted before sleep th:%u", rb_th_serial(th)); + } + else { + if (!rel) { + rb_native_cond_wait(cond, lock); + } + else { + rb_hrtime_t end; + + if (*rel > max) { + *rel = max; + } + + end = native_cond_timeout(cond, *rel); + native_cond_timedwait(cond, lock, &end); + } + } + th->unblock.func = 0; - pthread_mutex_unlock(lock); + rb_native_mutex_unlock(lock); } - GVL_UNLOCK_END(); + THREAD_BLOCKING_END(th); - thread_debug("native_sleep done\n"); + RUBY_DEBUG_LOG("done th:%u", rb_th_serial(th)); } -#ifdef USE_SIGNAL_THREAD_LIST -struct signal_thread_list { - rb_thread_t *th; - struct signal_thread_list *prev; - struct signal_thread_list *next; -}; - -static struct signal_thread_list signal_thread_list_anchor = { - 0, 0, 0, -}; - -#define FGLOCK(lock, body) do { \ - native_mutex_lock(lock); \ - { \ - body; \ - } \ - native_mutex_unlock(lock); \ -} while (0) +#ifdef USE_UBF_LIST +static CCAN_LIST_HEAD(ubf_list_head); +static rb_nativethread_lock_t ubf_list_lock = RB_NATIVETHREAD_LOCK_INIT; -#if 0 /* for debug */ static void -print_signal_list(char *str) +ubf_list_atfork(void) +{ + ccan_list_head_init(&ubf_list_head); + rb_native_mutex_initialize(&ubf_list_lock); +} + +RBIMPL_ATTR_MAYBE_UNUSED() +static bool +ubf_list_contain_p(rb_thread_t *th) { - struct signal_thread_list *list = - signal_thread_list_anchor.next; - thread_debug("list (%s)> ", str); - while (list) { - thread_debug("%p (%p), ", list->th, list->th->thread_id); - list = list->next; + rb_thread_t *list_th; + ccan_list_for_each(&ubf_list_head, list_th, sched.node.ubf) { + if (list_th == th) return true; } - thread_debug("\n"); + return false; } -#endif +/* The thread 'th' is registered to be trying unblock. */ static void -add_signal_thread_list(rb_thread_t *th) +register_ubf_list(rb_thread_t *th) { - if (!th->native_thread_data.signal_thread_list) { - FGLOCK(&signal_thread_list_lock, { - struct signal_thread_list *list = - malloc(sizeof(struct signal_thread_list)); - - if (list == 0) { - fprintf(stderr, "[FATAL] failed to allocate memory\n"); - exit(EXIT_FAILURE); - } + RUBY_DEBUG_LOG("th:%u", rb_th_serial(th)); + struct ccan_list_node *node = &th->sched.node.ubf; - list->th = th; + VM_ASSERT(th->unblock.func != NULL); - list->prev = &signal_thread_list_anchor; - list->next = signal_thread_list_anchor.next; - if (list->next) { - list->next->prev = list; - } - signal_thread_list_anchor.next = list; - th->native_thread_data.signal_thread_list = list; - }); + rb_native_mutex_lock(&ubf_list_lock); + { + // check not connected yet + if (ccan_list_empty((struct ccan_list_head*)node)) { + VM_ASSERT(!ubf_list_contain_p(th)); + ccan_list_add(&ubf_list_head, node); + } } + rb_native_mutex_unlock(&ubf_list_lock); + + timer_thread_wakeup(); } +/* The thread 'th' is unblocked. It no longer need to be registered. 
*/ static void -remove_signal_thread_list(rb_thread_t *th) +unregister_ubf_list(rb_thread_t *th) { - if (th->native_thread_data.signal_thread_list) { - FGLOCK(&signal_thread_list_lock, { - struct signal_thread_list *list = - (struct signal_thread_list *) - th->native_thread_data.signal_thread_list; + RUBY_DEBUG_LOG("th:%u", rb_th_serial(th)); + struct ccan_list_node *node = &th->sched.node.ubf; + + /* we can't allow re-entry into ubf_list_head */ + VM_ASSERT(th->unblock.func == NULL); - list->prev->next = list->next; - if (list->next) { - list->next->prev = list->prev; - } - th->native_thread_data.signal_thread_list = 0; - list->th = 0; - free(list); /* ok */ - }); + if (!ccan_list_empty((struct ccan_list_head*)node)) { + rb_native_mutex_lock(&ubf_list_lock); + { + VM_ASSERT(ubf_list_contain_p(th)); + ccan_list_del_init(node); + } + rb_native_mutex_unlock(&ubf_list_lock); } } +/* + * send a signal to intent that a target thread return from blocking syscall. + * Maybe any signal is ok, but we chose SIGVTALRM. + */ static void -ubf_select_each(rb_thread_t *th) +ubf_wakeup_thread(rb_thread_t *th) { - thread_debug("ubf_select_each (%p)\n", (void *)th->thread_id); - if (th) { - pthread_kill(th->thread_id, SIGVTALRM); - } + RUBY_DEBUG_LOG("th:%u thread_id:%p", rb_th_serial(th), (void *)th->nt->thread_id); + + pthread_kill(th->nt->thread_id, SIGVTALRM); } static void ubf_select(void *ptr) { rb_thread_t *th = (rb_thread_t *)ptr; - add_signal_thread_list(th); - if (pthread_self() != timer_thread_id) - rb_thread_wakeup_timer_thread(); /* activate timer thread */ - ubf_select_each(th); + RUBY_DEBUG_LOG("wakeup th:%u", rb_th_serial(th)); + ubf_wakeup_thread(th); + register_ubf_list(th); } -static void -ping_signal_thread_list(void) +static bool +ubf_threads_empty(void) { - if (signal_thread_list_anchor.next) { - FGLOCK(&signal_thread_list_lock, { - struct signal_thread_list *list; - - list = signal_thread_list_anchor.next; - while (list) { - ubf_select_each(list->th); - list = list->next; - } - }); - } + return ccan_list_empty(&ubf_list_head) != 0; } -static int -check_signal_thread_list(void) +static void +ubf_wakeup_all_threads(void) { - if (signal_thread_list_anchor.next) - return 1; - else - return 0; + rb_thread_t *th; + rb_native_mutex_lock(&ubf_list_lock); + { + ccan_list_for_each(&ubf_list_head, th, sched.node.ubf) { + ubf_wakeup_thread(th); + } + } + rb_native_mutex_unlock(&ubf_list_lock); } -#else /* USE_SIGNAL_THREAD_LIST */ -#define add_signal_thread_list(th) (void)(th) -#define remove_signal_thread_list(th) (void)(th) + +#else /* USE_UBF_LIST */ +#define register_ubf_list(th) (void)(th) +#define unregister_ubf_list(th) (void)(th) #define ubf_select 0 -static void ping_signal_thread_list(void) { return; } -static int check_signal_thread_list(void) { return 0; } -#endif /* USE_SIGNAL_THREAD_LIST */ +static void ubf_wakeup_all_threads(void) { return; } +static bool ubf_threads_empty(void) { return true; } +#define ubf_list_atfork() do {} while (0) +#endif /* USE_UBF_LIST */ #define TT_DEBUG 0 #define WRITE_CONST(fd, str) (void)(write((fd),(str),sizeof(str)-1)<0) -/* 100ms. 10ms is too small for user level thread scheduling - * on recent Linux (tested on 2.6.35) - */ -#define TIME_QUANTUM_USEC (100 * 1000) +void +rb_thread_wakeup_timer_thread(int sig) +{ + // This function can be called from signal handlers so that + // pthread_mutex_lock() should not be used. 
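/* Self-contained sketch of the unblocking-function (UBF) mechanism described
 * around ubf_wakeup_thread() above: a no-op SIGVTALRM handler installed
 * without SA_RESTART lets pthread_kill() force a thread out of a blocking
 * syscall with EINTR. Handler and helper names are illustrative, and the
 * sleep()-based synchronization is deliberately crude. */
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

static int fds[2];

static void on_sigvtalrm(int sig) { (void)sig; /* no-op: only interrupt */ }

static void *
blocker(void *arg)
{
    char c;
    (void)arg;
    if (read(fds[0], &c, 1) < 0 && errno == EINTR)
        puts("blocking read() interrupted by SIGVTALRM");
    return NULL;
}

int
main(void)
{
    pthread_t th;
    struct sigaction sa;

    memset(&sa, 0, sizeof(sa));
    sa.sa_handler = on_sigvtalrm;   /* no SA_RESTART: let the syscall fail with EINTR */
    sigemptyset(&sa.sa_mask);
    sigaction(SIGVTALRM, &sa, NULL);

    if (pipe(fds) != 0) return 1;
    pthread_create(&th, NULL, blocker, NULL);
    sleep(1);                       /* give the thread time to block in read() */
    pthread_kill(th, SIGVTALRM);
    pthread_join(th, NULL);
    return 0;
}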
-#if USE_SLEEPY_TIMER_THREAD -static int timer_thread_pipe[2] = {-1, -1}; -static int timer_thread_pipe_low[2] = {-1, -1}; /* low priority */ -static int timer_thread_pipe_owner_process; + // wakeup timer thread + timer_thread_wakeup_force(); -/* only use signal-safe system calls here */ -static void -rb_thread_wakeup_timer_thread_fd(int fd) -{ - ssize_t result; + // interrupt main thread if main thread is available + if (RUBY_ATOMIC_LOAD(system_working)) { + rb_vm_t *vm = GET_VM(); + rb_thread_t *main_th = vm->ractor.main_thread; - /* already opened */ - if (timer_thread_pipe_owner_process == getpid()) { - const char *buff = "!"; - retry: - if ((result = write(fd, buff, 1)) <= 0) { - switch (errno) { - case EINTR: goto retry; - case EAGAIN: -#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN - case EWOULDBLOCK: -#endif - break; - default: - rb_async_bug_errno("rb_thread_wakeup_timer_thread - write", errno); - } - } - if (TT_DEBUG) WRITE_CONST(2, "rb_thread_wakeup_timer_thread: write\n"); - } - else { - /* ignore wakeup */ - } -} + if (main_th) { + volatile rb_execution_context_t *main_th_ec = ACCESS_ONCE(rb_execution_context_t *, main_th->ec); -void -rb_thread_wakeup_timer_thread(void) -{ - rb_thread_wakeup_timer_thread_fd(timer_thread_pipe[1]); -} + if (main_th_ec) { + RUBY_VM_SET_TRAP_INTERRUPT(main_th_ec); -static void -rb_thread_wakeup_timer_thread_low(void) -{ - rb_thread_wakeup_timer_thread_fd(timer_thread_pipe_low[1]); + if (vm->ubf_async_safe && main_th->unblock.func) { + (main_th->unblock.func)(main_th->unblock.arg); + } + } + } + } } -/* VM-dependent API is not available for this function */ +#define CLOSE_INVALIDATE_PAIR(expr) \ + close_invalidate_pair(expr,"close_invalidate: "#expr) static void -consume_communication_pipe(int fd) +close_invalidate(int *fdp, const char *msg) { -#define CCP_READ_BUFF_SIZE 1024 - /* buffer can be shared because no one refers to them. 
*/ - static char buff[CCP_READ_BUFF_SIZE]; - ssize_t result; + int fd = *fdp; - while (1) { - result = read(fd, buff, sizeof(buff)); - if (result == 0) { - return; - } - else if (result < 0) { - switch (errno) { - case EINTR: - continue; /* retry */ - case EAGAIN: - return; - default: - rb_async_bug_errno("consume_communication_pipe: read\n", errno); - } - } + *fdp = -1; + if (close(fd) < 0) { + async_bug_fd(msg, errno, fd); } } static void -close_communication_pipe(int pipes[2]) +close_invalidate_pair(int fds[2], const char *msg) { - if (close(pipes[0]) < 0) { - rb_bug_errno("native_stop_timer_thread - close(ttp[0])", errno); + if (USE_EVENTFD && fds[0] == fds[1]) { + fds[1] = -1; // disable write port first + close_invalidate(&fds[0], msg); } - if (close(pipes[1]) < 0) { - rb_bug_errno("native_stop_timer_thread - close(ttp[1])", errno); + else { + close_invalidate(&fds[1], msg); + close_invalidate(&fds[0], msg); } - pipes[0] = pipes[1] = -1; } static void @@ -1258,26 +2665,41 @@ set_nonblock(int fd) oflags = fcntl(fd, F_GETFL); if (oflags == -1) - rb_sys_fail(0); + rb_sys_fail(0); oflags |= O_NONBLOCK; err = fcntl(fd, F_SETFL, oflags); if (err == -1) - rb_sys_fail(0); + rb_sys_fail(0); } +/* communication pipe with timer thread and signal handler */ static void setup_communication_pipe_internal(int pipes[2]) { int err; - if (pipes[0] != -1) { - /* close pipe of parent process */ - close_communication_pipe(pipes); + if (pipes[0] > 0 || pipes[1] > 0) { + VM_ASSERT(pipes[0] > 0); + VM_ASSERT(pipes[1] > 0); + return; } + /* + * Don't bother with eventfd on ancient Linux 2.6.22..2.6.26 which were + * missing EFD_* flags, they can fall back to pipe + */ +#if USE_EVENTFD && defined(EFD_NONBLOCK) && defined(EFD_CLOEXEC) + pipes[0] = pipes[1] = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC); + + if (pipes[0] >= 0) { + rb_update_max_fd(pipes[0]); + return; + } +#endif + err = rb_cloexec_pipe(pipes); if (err != 0) { - rb_bug_errno("setup_communication_pipe: Failed to create communication pipe for timer thread", errno); + rb_bug("can not create communication pipe"); } rb_update_max_fd(pipes[0]); rb_update_max_fd(pipes[1]); @@ -1285,211 +2707,491 @@ setup_communication_pipe_internal(int pipes[2]) set_nonblock(pipes[1]); } -/* communication pipe with timer thread and signal handler */ +#if !defined(SET_CURRENT_THREAD_NAME) && defined(__linux__) && defined(PR_SET_NAME) +# define SET_CURRENT_THREAD_NAME(name) prctl(PR_SET_NAME, name) +#endif + +enum { + THREAD_NAME_MAX = +#if defined(__linux__) + 16 +#elif defined(__APPLE__) +/* Undocumented, and main thread seems unlimited */ + 64 +#else + 16 +#endif +}; + +static VALUE threadptr_invoke_proc_location(rb_thread_t *th); + static void -setup_communication_pipe(void) +native_set_thread_name(rb_thread_t *th) { - if (timer_thread_pipe_owner_process == getpid()) { - /* already set up. */ - return; +#ifdef SET_CURRENT_THREAD_NAME + VALUE loc; + if (!NIL_P(loc = th->name)) { + SET_CURRENT_THREAD_NAME(RSTRING_PTR(loc)); + } + else if ((loc = threadptr_invoke_proc_location(th)) != Qnil) { + char *name, *p; + char buf[THREAD_NAME_MAX]; + size_t len; + int n; + + name = RSTRING_PTR(RARRAY_AREF(loc, 0)); + p = strrchr(name, '/'); /* show only the basename of the path. 
*/ + if (p && p[1]) + name = p + 1; + + n = snprintf(buf, sizeof(buf), "%s:%d", name, NUM2INT(RARRAY_AREF(loc, 1))); + RB_GC_GUARD(loc); + + len = (size_t)n; + if (len >= sizeof(buf)) { + buf[sizeof(buf)-2] = '*'; + buf[sizeof(buf)-1] = '\0'; + } + SET_CURRENT_THREAD_NAME(buf); } - setup_communication_pipe_internal(timer_thread_pipe); - setup_communication_pipe_internal(timer_thread_pipe_low); +#endif +} - /* validate pipe on this process */ - timer_thread_pipe_owner_process = getpid(); +static void +native_set_another_thread_name(rb_nativethread_id_t thread_id, VALUE name) +{ +#if defined SET_ANOTHER_THREAD_NAME || defined SET_CURRENT_THREAD_NAME + char buf[THREAD_NAME_MAX]; + const char *s = ""; +# if !defined SET_ANOTHER_THREAD_NAME + if (!pthread_equal(pthread_self(), thread_id)) return; +# endif + if (!NIL_P(name)) { + long n; + RSTRING_GETMEM(name, s, n); + if (n >= (int)sizeof(buf)) { + memcpy(buf, s, sizeof(buf)-1); + buf[sizeof(buf)-1] = '\0'; + s = buf; + } + } +# if defined SET_ANOTHER_THREAD_NAME + SET_ANOTHER_THREAD_NAME(thread_id, s); +# elif defined SET_CURRENT_THREAD_NAME + SET_CURRENT_THREAD_NAME(s); +# endif +#endif } -/** - * Let the timer thread sleep a while. - * - * The timer thread sleeps until woken up by rb_thread_wakeup_timer_thread() if only one Ruby thread is running. - * @pre the calling context is in the timer thread. - */ -static inline void -timer_thread_sleep(rb_global_vm_lock_t* gvl) +#if defined(RB_THREAD_T_HAS_NATIVE_ID) || defined(__APPLE__) +static VALUE +native_thread_native_thread_id(rb_thread_t *target_th) { - int result; - int need_polling; - struct pollfd pollfds[2]; + if (!target_th->nt) return Qnil; + +#ifdef RB_THREAD_T_HAS_NATIVE_ID + int tid = target_th->nt->tid; + if (tid == 0) return Qnil; + return INT2FIX(tid); +#elif defined(__APPLE__) + uint64_t tid; +/* The first condition is needed because MAC_OS_X_VERSION_10_6 + is not defined on 10.5, and while __POWERPC__ takes care of ppc/ppc64, + i386 will be broken without this. Note, 10.5 is supported with GCC upstream, + so it has C++17 and everything needed to build modern Ruby. 
*/ +# if (!defined(MAC_OS_X_VERSION_10_6) || \ + (MAC_OS_X_VERSION_MAX_ALLOWED < MAC_OS_X_VERSION_10_6) || \ + defined(__POWERPC__) /* never defined for PowerPC platforms */) + const bool no_pthread_threadid_np = true; +# define NO_PTHREAD_MACH_THREAD_NP 1 +# elif MAC_OS_X_VERSION_MIN_REQUIRED >= MAC_OS_X_VERSION_10_6 + const bool no_pthread_threadid_np = false; +# else +# if !(defined(__has_attribute) && __has_attribute(availability)) + /* __API_AVAILABLE macro does nothing on gcc */ + __attribute__((weak)) int pthread_threadid_np(pthread_t, uint64_t*); +# endif + /* Check weakly linked symbol */ + const bool no_pthread_threadid_np = !&pthread_threadid_np; +# endif + if (no_pthread_threadid_np) { + return ULL2NUM(pthread_mach_thread_np(pthread_self())); + } +# ifndef NO_PTHREAD_MACH_THREAD_NP + int e = pthread_threadid_np(target_th->nt->thread_id, &tid); + if (e != 0) rb_syserr_fail(e, "pthread_threadid_np"); + return ULL2NUM((unsigned long long)tid); +# endif +#endif +} +# define USE_NATIVE_THREAD_NATIVE_THREAD_ID 1 +#else +# define USE_NATIVE_THREAD_NATIVE_THREAD_ID 0 +#endif + +static struct { + rb_serial_t created_fork_gen; + pthread_t pthread_id; + + int comm_fds[2]; // r, w + +#if (HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H) && USE_MN_THREADS + int event_fd; // kernel event queue fd (epoll/kqueue) +#endif +#if HAVE_SYS_EPOLL_H && USE_MN_THREADS +#define EPOLL_EVENTS_MAX 0x10 + struct epoll_event finished_events[EPOLL_EVENTS_MAX]; +#elif HAVE_SYS_EVENT_H && USE_MN_THREADS +#define KQUEUE_EVENTS_MAX 0x10 + struct kevent finished_events[KQUEUE_EVENTS_MAX]; +#endif + + // waiting threads list + struct ccan_list_head waiting; // waiting threads in ractors + pthread_mutex_t waiting_lock; +} timer_th = { + .created_fork_gen = 0, +}; + +#define TIMER_THREAD_CREATED_P() (timer_th.created_fork_gen == current_fork_gen) + +static void timer_thread_check_timeslice(rb_vm_t *vm); +static int timer_thread_set_timeout(rb_vm_t *vm); +static void timer_thread_wakeup_thread(rb_thread_t *th, uint32_t event_serial); + +#include "thread_pthread_mn.c" + +static rb_thread_t * +thread_sched_waiting_thread(struct rb_thread_sched_waiting *w) +{ + if (w) { + return (rb_thread_t *)((size_t)w - offsetof(rb_thread_t, sched.waiting_reason)); + } + else { + return NULL; + } +} + +static int +timer_thread_set_timeout(rb_vm_t *vm) +{ +#if 0 + return 10; // ms +#else + int timeout = -1; + + ractor_sched_lock(vm, NULL); + { + if ( !ccan_list_empty(&vm->ractor.sched.timeslice_threads) // (1-1) Provide time slice for active NTs + || !ubf_threads_empty() // (1-3) Periodic UBF + || vm->ractor.sched.grq_cnt > 0 // (1-4) Lazy GRQ deq start + ) { + + RUBY_DEBUG_LOG("timeslice:%d ubf:%d grq:%d", + !ccan_list_empty(&vm->ractor.sched.timeslice_threads), + !ubf_threads_empty(), + (vm->ractor.sched.grq_cnt > 0)); + + timeout = 10; // ms + vm->ractor.sched.timeslice_wait_inf = false; + } + else { + vm->ractor.sched.timeslice_wait_inf = true; + } + } + ractor_sched_unlock(vm, NULL); + + // Always check waiting threads to find minimum timeout + // even when scheduler has work (grq_cnt > 0) + rb_native_mutex_lock(&timer_th.waiting_lock); + { + struct rb_thread_sched_waiting *w = ccan_list_top(&timer_th.waiting, struct rb_thread_sched_waiting, node); + rb_thread_t *th = thread_sched_waiting_thread(w); + + if (th && (th->sched.waiting_reason.flags & thread_sched_waiting_timeout)) { + rb_hrtime_t now = rb_hrtime_now(); + rb_hrtime_t hrrel = rb_hrtime_sub(th->sched.waiting_reason.data.timeout, now); + + RUBY_DEBUG_LOG("th:%u now:%lu rel:%lu", 
rb_th_serial(th), (unsigned long)now, (unsigned long)hrrel); + + // TODO: overflow? + int thread_timeout = (int)((hrrel + RB_HRTIME_PER_MSEC - 1) / RB_HRTIME_PER_MSEC); // ms + + // Use minimum of scheduler timeout and thread sleep timeout + if (timeout < 0 || thread_timeout < timeout) { + timeout = thread_timeout; + } + } + } + rb_native_mutex_unlock(&timer_th.waiting_lock); + + RUBY_DEBUG_LOG("timeout:%d inf:%d", timeout, (int)vm->ractor.sched.timeslice_wait_inf); + + // fprintf(stderr, "timeout:%d\n", timeout); + return timeout; +#endif +} - pollfds[0].fd = timer_thread_pipe[0]; - pollfds[0].events = POLLIN; - pollfds[1].fd = timer_thread_pipe_low[0]; - pollfds[1].events = POLLIN; +static void +timer_thread_check_signal(rb_vm_t *vm) +{ + // ruby_sigchld_handler(vm); TODO - need_polling = check_signal_thread_list(); + int signum = rb_signal_buff_size(); + if (UNLIKELY(signum > 0) && vm->ractor.main_thread) { + RUBY_DEBUG_LOG("signum:%d", signum); + threadptr_trap_interrupt(vm->ractor.main_thread); + } +} - if (gvl->waiting > 0 || need_polling) { - /* polling (TIME_QUANTUM_USEC usec) */ - result = poll(pollfds, 1, TIME_QUANTUM_USEC/1000); +static bool +timer_thread_check_exceed(rb_hrtime_t abs, rb_hrtime_t now) +{ + if (abs < now) { + return true; + } + else if (abs - now < RB_HRTIME_PER_MSEC) { + return true; // too short time } else { - /* wait (infinite) */ - result = poll(pollfds, ARRAY_SIZE(pollfds), -1); + return false; } +} + +static rb_thread_t * +timer_thread_deq_wakeup(rb_vm_t *vm, rb_hrtime_t now, uint32_t *event_serial) +{ + struct rb_thread_sched_waiting *w = ccan_list_top(&timer_th.waiting, struct rb_thread_sched_waiting, node); + + if (w != NULL && + (w->flags & thread_sched_waiting_timeout) && + timer_thread_check_exceed(w->data.timeout, now)) { - if (result == 0) { - /* maybe timeout */ + RUBY_DEBUG_LOG("wakeup th:%u", rb_th_serial(thread_sched_waiting_thread(w))); + + // delete from waiting list + ccan_list_del_init(&w->node); + + // setup result + w->flags = thread_sched_waiting_none; + w->data.result = 0; + + rb_thread_t *th = thread_sched_waiting_thread(w); + *event_serial = w->data.event_serial; + return th; } - else if (result > 0) { - consume_communication_pipe(timer_thread_pipe[0]); - consume_communication_pipe(timer_thread_pipe_low[0]); + + return NULL; +} + +static void +timer_thread_wakeup_thread_locked(struct rb_thread_sched *sched, rb_thread_t *th, uint32_t event_serial) +{ + if (sched->running != th && th->sched.event_serial == event_serial) { + thread_sched_to_ready_common(sched, th, true, false); } - else { /* result < 0 */ - switch (errno) { - case EBADF: - case EINVAL: - case ENOMEM: /* from Linux man */ - case EFAULT: /* from FreeBSD man */ - rb_async_bug_errno("thread_timer: select", errno); - default: - /* ignore */; - } +} + +static void +timer_thread_wakeup_thread(rb_thread_t *th, uint32_t event_serial) +{ + RUBY_DEBUG_LOG("th:%u", rb_th_serial(th)); + struct rb_thread_sched *sched = TH_SCHED(th); + + thread_sched_lock(sched, th); + { + timer_thread_wakeup_thread_locked(sched, th, event_serial); } + thread_sched_unlock(sched, th); } -#else /* USE_SLEEPY_TIMER_THREAD */ -# define PER_NANO 1000000000 -void rb_thread_wakeup_timer_thread(void) {} -static void rb_thread_wakeup_timer_thread_low(void) {} +static void +timer_thread_check_timeout(rb_vm_t *vm) +{ + rb_hrtime_t now = rb_hrtime_now(); + rb_thread_t *th; + uint32_t event_serial; -static pthread_mutex_t timer_thread_lock; -static rb_thread_cond_t timer_thread_cond; + 
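/* Worked example of the rounding used above for thread_timeout: a remaining
 * wait in nanoseconds (assuming rb_hrtime_t counts nanoseconds, as
 * RB_HRTIME_PER_MSEC suggests) is converted to a poll()-style millisecond
 * timeout by rounding up, so a 1 ns remainder still yields 1 ms instead of a
 * busy-looping 0. Constant and function names here are illustrative. */
#include <limits.h>
#include <stdint.h>
#include <stdio.h>

#define NSEC_PER_MSEC 1000000ULL

static int
hrtime_to_timeout_ms(uint64_t rel_ns)
{
    uint64_t ms = (rel_ns + NSEC_PER_MSEC - 1) / NSEC_PER_MSEC; /* ceiling */
    return ms > INT_MAX ? INT_MAX : (int)ms;
}

int
main(void)
{
    printf("%d %d %d\n",
           hrtime_to_timeout_ms(1),          /* -> 1  */
           hrtime_to_timeout_ms(10000000),   /* -> 10 */
           hrtime_to_timeout_ms(10000001));  /* -> 11 */
    return 0;
}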
rb_native_mutex_lock(&timer_th.waiting_lock); + { + while ((th = timer_thread_deq_wakeup(vm, now, &event_serial)) != NULL) { + rb_native_mutex_unlock(&timer_th.waiting_lock); + timer_thread_wakeup_thread(th, event_serial); + rb_native_mutex_lock(&timer_th.waiting_lock); + } + } + rb_native_mutex_unlock(&timer_th.waiting_lock); +} -static inline void -timer_thread_sleep(rb_global_vm_lock_t* unused) +static void +timer_thread_check_timeslice(rb_vm_t *vm) { - struct timespec ts; - ts.tv_sec = 0; - ts.tv_nsec = TIME_QUANTUM_USEC * 1000; - ts = native_cond_timeout(&timer_thread_cond, ts); + // TODO: check time + rb_thread_t *th; + ccan_list_for_each(&vm->ractor.sched.timeslice_threads, th, sched.node.timeslice_threads) { + RUBY_DEBUG_LOG("timeslice th:%u", rb_th_serial(th)); + RUBY_VM_SET_TIMER_INTERRUPT(th->ec); + } +} - native_cond_timedwait(&timer_thread_cond, &timer_thread_lock, &ts); +void +rb_assert_sig(void) +{ + sigset_t oldmask; + pthread_sigmask(0, NULL, &oldmask); + if (sigismember(&oldmask, SIGVTALRM)) { + rb_bug("!!!"); + } + else { + RUBY_DEBUG_LOG("ok"); + } } -#endif /* USE_SLEEPY_TIMER_THREAD */ static void * -thread_timer(void *p) +timer_thread_func(void *ptr) { - rb_global_vm_lock_t *gvl = (rb_global_vm_lock_t *)p; + rb_vm_t *vm = (rb_vm_t *)ptr; +#if defined(RUBY_NT_SERIAL) + ruby_nt_serial = (rb_atomic_t)-1; +#endif + + RUBY_DEBUG_LOG("started%s", ""); - if (TT_DEBUG) WRITE_CONST(2, "start timer thread\n"); + while (RUBY_ATOMIC_LOAD(system_working)) { + timer_thread_check_signal(vm); + timer_thread_check_timeout(vm); + ubf_wakeup_all_threads(); + + RUBY_DEBUG_LOG("system_working:%d", RUBY_ATOMIC_LOAD(system_working)); + timer_thread_polling(vm); + } -#if defined(__linux__) && defined(PR_SET_NAME) - prctl(PR_SET_NAME, "ruby-timer-thr"); + RUBY_DEBUG_LOG("terminated"); + return NULL; +} + +/* only use signal-safe system calls here */ +static void +signal_communication_pipe(int fd) +{ +#if USE_EVENTFD + const uint64_t buff = 1; +#else + const char buff = '!'; #endif + ssize_t result; -#if !USE_SLEEPY_TIMER_THREAD - native_mutex_initialize(&timer_thread_lock); - native_cond_initialize(&timer_thread_cond, RB_CONDATTR_CLOCK_MONOTONIC); - native_mutex_lock(&timer_thread_lock); + /* already opened */ + if (fd >= 0) { + retry: + if ((result = write(fd, &buff, sizeof(buff))) <= 0) { + int e = errno; + switch (e) { + case EINTR: goto retry; + case EAGAIN: +#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: #endif - while (system_working > 0) { + break; + default: + async_bug_fd("rb_thread_wakeup_timer_thread: write", e, fd); + } + } + if (TT_DEBUG) WRITE_CONST(2, "rb_thread_wakeup_timer_thread: write\n"); + } + else { + // ignore wakeup + } +} - /* timer function */ - ping_signal_thread_list(); - timer_thread_function(0); +static void +timer_thread_wakeup_force(void) +{ + // should not use RUBY_DEBUG_LOG() because it can be called within signal handlers. + signal_communication_pipe(timer_th.comm_fds[1]); +} - if (TT_DEBUG) WRITE_CONST(2, "tick\n"); +static void +timer_thread_wakeup_locked(rb_vm_t *vm) +{ + // should be locked before. 
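/* Sketch of the wakeup channel behind signal_communication_pipe() and
 * timer_thread_wakeup_force() above: a waiter polls the read end, and a
 * single write() from any thread (or from a signal handler, since write() is
 * async-signal-safe) wakes it. A plain pipe is used here for portability;
 * the code above prefers eventfd(2) when USE_EVENTFD is set. */
#include <poll.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static int comm_fds[2]; /* [0] = read end, [1] = write end */

static void *
waiter(void *arg)
{
    struct pollfd pfd = { .fd = comm_fds[0], .events = POLLIN };
    (void)arg;
    if (poll(&pfd, 1, -1) > 0) {
        char buf[16];
        ssize_t n = read(comm_fds[0], buf, sizeof(buf)); /* drain the token */
        (void)n;
        puts("woken up");
    }
    return NULL;
}

int
main(void)
{
    pthread_t th;

    if (pipe(comm_fds) != 0) return 1;
    pthread_create(&th, NULL, waiter, NULL);
    write(comm_fds[1], "!", 1); /* the wakeup itself */
    pthread_join(th, NULL);
    return 0;
}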
+ ASSERT_ractor_sched_locked(vm, NULL); - /* wait */ - timer_thread_sleep(gvl); + if (timer_th.created_fork_gen == current_fork_gen) { + if (vm->ractor.sched.timeslice_wait_inf) { + RUBY_DEBUG_LOG("wakeup with fd:%d", timer_th.comm_fds[1]); + timer_thread_wakeup_force(); + } + else { + RUBY_DEBUG_LOG("will be wakeup..."); + } } -#if !USE_SLEEPY_TIMER_THREAD - native_mutex_unlock(&timer_thread_lock); - native_cond_destroy(&timer_thread_cond); - native_mutex_destroy(&timer_thread_lock); -#endif +} - if (TT_DEBUG) WRITE_CONST(2, "finish timer thread\n"); - return NULL; +static void +timer_thread_wakeup(void) +{ + rb_vm_t *vm = GET_VM(); + + ractor_sched_lock(vm, NULL); + { + timer_thread_wakeup_locked(vm); + } + ractor_sched_unlock(vm, NULL); } static void rb_thread_create_timer_thread(void) { - if (!timer_thread_id) { - int err; -#ifdef HAVE_PTHREAD_ATTR_INIT - pthread_attr_t attr; + rb_serial_t created_fork_gen = timer_th.created_fork_gen; - err = pthread_attr_init(&attr); - if (err != 0) { - fprintf(stderr, "[FATAL] Failed to initialize pthread attr(errno: %d)\n", err); - exit(EXIT_FAILURE); - } -# ifdef PTHREAD_STACK_MIN - { - const size_t min_size = (4096 * 4); - /* Allocate the machine stack for the timer thread - * at least 16KB (4 pages). FreeBSD 8.2 AMD64 causes - * machine stack overflow only with PTHREAD_STACK_MIN. - */ - size_t stack_size = PTHREAD_STACK_MIN; /* may be dynamic, get only once */ - if (stack_size < min_size) stack_size = min_size; - if (THREAD_DEBUG) stack_size += BUFSIZ; - pthread_attr_setstacksize(&attr, stack_size); - } -# endif -#endif + RUBY_DEBUG_LOG("fork_gen create:%d current:%d", (int)created_fork_gen, (int)current_fork_gen); -#if USE_SLEEPY_TIMER_THREAD - setup_communication_pipe(); -#endif /* USE_SLEEPY_TIMER_THREAD */ + timer_th.created_fork_gen = current_fork_gen; - /* create timer thread */ - if (timer_thread_id) { - rb_bug("rb_thread_create_timer_thread: Timer thread was already created\n"); - } -#ifdef HAVE_PTHREAD_ATTR_INIT - err = pthread_create(&timer_thread_id, &attr, thread_timer, &GET_VM()->gvl); -#else - err = pthread_create(&timer_thread_id, NULL, thread_timer, &GET_VM()->gvl); -#endif - if (err != 0) { - fprintf(stderr, "[FATAL] Failed to create timer thread (errno: %d)\n", err); - exit(EXIT_FAILURE); - } -#ifdef HAVE_PTHREAD_ATTR_INIT - pthread_attr_destroy(&attr); + if (created_fork_gen != current_fork_gen) { + if (created_fork_gen != 0) { + RUBY_DEBUG_LOG("forked child process"); + + CLOSE_INVALIDATE_PAIR(timer_th.comm_fds); +#if HAVE_SYS_EPOLL_H && USE_MN_THREADS + close_invalidate(&timer_th.event_fd, "close event_fd"); #endif + rb_native_mutex_destroy(&timer_th.waiting_lock); + } + + ccan_list_head_init(&timer_th.waiting); + rb_native_mutex_initialize(&timer_th.waiting_lock); + + // open communication channel + setup_communication_pipe_internal(timer_th.comm_fds); + + // open event fd + timer_thread_setup_mn(); } + + pthread_create(&timer_th.pthread_id, NULL, timer_thread_func, GET_VM()); } static int -native_stop_timer_thread(int close_anyway) +native_stop_timer_thread(void) { - int stopped; - stopped = --system_working <= 0; + RUBY_ATOMIC_SET(system_working, 0); + + RUBY_DEBUG_LOG("wakeup send %d", timer_th.comm_fds[1]); + timer_thread_wakeup_force(); + RUBY_DEBUG_LOG("wakeup sent"); + pthread_join(timer_th.pthread_id, NULL); if (TT_DEBUG) fprintf(stderr, "stop timer thread\n"); - if (stopped) { - /* join */ - rb_thread_wakeup_timer_thread(); - native_thread_join(timer_thread_id); - if (TT_DEBUG) fprintf(stderr, "joined timer 
thread\n"); - timer_thread_id = 0; - /* close communication pipe */ - if (close_anyway) { - /* TODO: Uninstall all signal handlers or mask all signals. - * This pass is cleaning phase (terminate ruby process). - * To avoid such race, we skip to close communication - * pipe. OS will close it at process termination. - * It may not good practice, but pragmatic. - * We remain it is TODO. - */ - /* close_communication_pipe(); */ - } - } - return stopped; + return 1; } static void native_reset_timer_thread(void) { - if (TT_DEBUG) fprintf(stderr, "reset timer thread\n"); + // } #ifdef HAVE_SIGALTSTACK @@ -1502,26 +3204,35 @@ ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr) STACK_GROW_DIR_DETECTION; if (th) { - size = th->machine_stack_maxsize; - base = (char *)th->machine_stack_start - STACK_DIR_UPPER(0, size); + size = th->ec->machine.stack_maxsize; + base = (char *)th->ec->machine.stack_start - STACK_DIR_UPPER(0, size); } #ifdef STACKADDR_AVAILABLE else if (get_stack(&base, &size) == 0) { - STACK_DIR_UPPER((void)(base = (char *)base + size), (void)0); +# ifdef __APPLE__ + if (pthread_equal(th->nt->thread_id, native_main_thread.id)) { + struct rlimit rlim; + if (getrlimit(RLIMIT_STACK, &rlim) == 0 && rlim.rlim_cur > size) { + size = (size_t)rlim.rlim_cur; + } + } +# endif + base = (char *)base + STACK_DIR_UPPER(+size, -size); } #endif else { - return 0; + return 0; } + size /= RUBY_STACK_SPACE_RATIO; if (size > water_mark) size = water_mark; if (IS_STACK_DIR_UPPER()) { - if (size > ~(size_t)base+1) size = ~(size_t)base+1; - if (addr > base && addr <= (void *)((char *)base + size)) return 1; + if (size > ~(size_t)base+1) size = ~(size_t)base+1; + if (addr > base && addr <= (void *)((char *)base + size)) return 1; } else { - if (size > (size_t)base) size = (size_t)base; - if (addr > (void *)((char *)base - size) && addr <= base) return 1; + if (size > (size_t)base) size = (size_t)base; + if (addr > (void *)((char *)base - size) && addr <= base) return 1; } return 0; } @@ -1530,19 +3241,266 @@ ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr) int rb_reserved_fd_p(int fd) { -#if USE_SLEEPY_TIMER_THREAD - if (fd == timer_thread_pipe[0] || - fd == timer_thread_pipe[1] || - fd == timer_thread_pipe_low[0] || - fd == timer_thread_pipe_low[1]) { - return 1; + /* no false-positive if out-of-FD at startup */ + if (fd < 0) return 0; + + if (fd == timer_th.comm_fds[0] || + fd == timer_th.comm_fds[1] +#if (HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H) && USE_MN_THREADS + || fd == timer_th.event_fd +#endif + ) { + goto check_fork_gen; + } + return 0; + + check_fork_gen: + if (timer_th.created_fork_gen == current_fork_gen) { + /* async-signal-safe */ + return 1; } else { - return 0; + return 0; } -#else - return 0; +} + +rb_nativethread_id_t +rb_nativethread_self(void) +{ + return pthread_self(); +} + +#if defined(USE_POLL) && !defined(HAVE_PPOLL) +/* TODO: don't ignore sigmask */ +static int +ruby_ppoll(struct pollfd *fds, nfds_t nfds, + const struct timespec *ts, const sigset_t *sigmask) +{ + int timeout_ms; + + if (ts) { + int tmp, tmp2; + + if (ts->tv_sec > INT_MAX/1000) + timeout_ms = INT_MAX; + else { + tmp = (int)(ts->tv_sec * 1000); + /* round up 1ns to 1ms to avoid excessive wakeups for <1ms sleep */ + tmp2 = (int)((ts->tv_nsec + 999999L) / (1000L * 1000L)); + if (INT_MAX - tmp < tmp2) + timeout_ms = INT_MAX; + else + timeout_ms = (int)(tmp + tmp2); + } + } + else + timeout_ms = -1; + + return poll(fds, nfds, timeout_ms); +} +# define ppoll(fds,nfds,ts,sigmask) 
ruby_ppoll((fds),(nfds),(ts),(sigmask)) #endif + +/* + * Single CPU setups benefit from explicit sched_yield() before ppoll(), + * since threads may be too starved to enter the GVL waitqueue for + * us to detect contention. Instead, we want to kick other threads + * so they can run and possibly prevent us from entering slow paths + * in ppoll() or similar syscalls. + * + * Confirmed on FreeBSD 11.2 and Linux 4.19. + * [ruby-core:90417] [Bug #15398] + */ +#define THREAD_BLOCKING_YIELD(th) do { \ + const rb_thread_t *next_th; \ + struct rb_thread_sched *sched = TH_SCHED(th); \ + RB_VM_SAVE_MACHINE_CONTEXT(th); \ + thread_sched_to_waiting(sched, (th)); \ + next_th = sched->running; \ + rb_native_mutex_unlock(&sched->lock_); \ + native_thread_yield(); /* TODO: needed? */ \ + if (!next_th && rb_ractor_living_thread_num(th->ractor) > 1) { \ + native_thread_yield(); \ + } + +static void +native_sleep(rb_thread_t *th, rb_hrtime_t *rel) +{ + struct rb_thread_sched *sched = TH_SCHED(th); + + RUBY_DEBUG_LOG("rel:%d", rel ? (int)*rel : 0); + if (rel) { + if (th_has_dedicated_nt(th)) { + native_cond_sleep(th, rel); + } + else { + thread_sched_wait_events(sched, th, -1, thread_sched_waiting_timeout, rel); + } + } + else { + thread_sched_to_waiting_until_wakeup(sched, th); + } + + RUBY_DEBUG_LOG("wakeup"); +} + +// fork read-write lock (only for pthread) +static pthread_rwlock_t rb_thread_fork_rw_lock = PTHREAD_RWLOCK_INITIALIZER; + +void +rb_thread_release_fork_lock(void) +{ + int r; + if ((r = pthread_rwlock_unlock(&rb_thread_fork_rw_lock))) { + rb_bug_errno("pthread_rwlock_unlock", r); + } +} + +void +rb_thread_reset_fork_lock(void) +{ + int r; + if ((r = pthread_rwlock_destroy(&rb_thread_fork_rw_lock))) { + rb_bug_errno("pthread_rwlock_destroy", r); + } + + if ((r = pthread_rwlock_init(&rb_thread_fork_rw_lock, NULL))) { + rb_bug_errno("pthread_rwlock_init", r); + } +} + +void * +rb_thread_prevent_fork(void *(*func)(void *), void *data) +{ + int r; + if ((r = pthread_rwlock_rdlock(&rb_thread_fork_rw_lock))) { + rb_bug_errno("pthread_rwlock_rdlock", r); + } + void *result = func(data); + rb_thread_release_fork_lock(); + return result; +} + +void +rb_thread_acquire_fork_lock(void) +{ + int r; + if ((r = pthread_rwlock_wrlock(&rb_thread_fork_rw_lock))) { + rb_bug_errno("pthread_rwlock_wrlock", r); + } +} + +// thread internal event hooks (only for pthread) + +struct rb_internal_thread_event_hook { + rb_internal_thread_event_callback callback; + rb_event_flag_t event; + void *user_data; + + struct rb_internal_thread_event_hook *next; +}; + +static pthread_rwlock_t rb_internal_thread_event_hooks_rw_lock = PTHREAD_RWLOCK_INITIALIZER; + +rb_internal_thread_event_hook_t * +rb_internal_thread_add_event_hook(rb_internal_thread_event_callback callback, rb_event_flag_t internal_event, void *user_data) +{ + rb_internal_thread_event_hook_t *hook = ALLOC_N(rb_internal_thread_event_hook_t, 1); + hook->callback = callback; + hook->user_data = user_data; + hook->event = internal_event; + + int r; + if ((r = pthread_rwlock_wrlock(&rb_internal_thread_event_hooks_rw_lock))) { + rb_bug_errno("pthread_rwlock_wrlock", r); + } + + hook->next = rb_internal_thread_event_hooks; + ATOMIC_PTR_EXCHANGE(rb_internal_thread_event_hooks, hook); + + if ((r = pthread_rwlock_unlock(&rb_internal_thread_event_hooks_rw_lock))) { + rb_bug_errno("pthread_rwlock_unlock", r); + } + return hook; +} + +bool +rb_internal_thread_remove_event_hook(rb_internal_thread_event_hook_t * hook) +{ + int r; + if ((r = 
pthread_rwlock_wrlock(&rb_internal_thread_event_hooks_rw_lock))) { + rb_bug_errno("pthread_rwlock_wrlock", r); + } + + bool success = FALSE; + + if (rb_internal_thread_event_hooks == hook) { + ATOMIC_PTR_EXCHANGE(rb_internal_thread_event_hooks, hook->next); + success = TRUE; + } + else { + rb_internal_thread_event_hook_t *h = rb_internal_thread_event_hooks; + + do { + if (h->next == hook) { + h->next = hook->next; + success = TRUE; + break; + } + } while ((h = h->next)); + } + + if ((r = pthread_rwlock_unlock(&rb_internal_thread_event_hooks_rw_lock))) { + rb_bug_errno("pthread_rwlock_unlock", r); + } + + if (success) { + ruby_xfree(hook); + } + return success; +} + +static void +rb_thread_execute_hooks(rb_event_flag_t event, rb_thread_t *th) +{ + int r; + if ((r = pthread_rwlock_rdlock(&rb_internal_thread_event_hooks_rw_lock))) { + rb_bug_errno("pthread_rwlock_rdlock", r); + } + + if (rb_internal_thread_event_hooks) { + rb_internal_thread_event_hook_t *h = rb_internal_thread_event_hooks; + do { + if (h->event & event) { + rb_internal_thread_event_data_t event_data = { + .thread = th->self, + }; + (*h->callback)(event, &event_data, h->user_data); + } + } while((h = h->next)); + } + if ((r = pthread_rwlock_unlock(&rb_internal_thread_event_hooks_rw_lock))) { + rb_bug_errno("pthread_rwlock_unlock", r); + } +} + +// return true if the current thread acquires DNT. +// return false if the current thread already acquires DNT. +bool +rb_thread_lock_native_thread(void) +{ + rb_thread_t *th = GET_THREAD(); + bool is_snt = th->nt->dedicated == 0; + native_thread_dedicated_inc(th->vm, th->ractor, th->nt); + + return is_snt; +} + +void +rb_thread_malloc_stack_set(rb_thread_t *th, void *stack) +{ + th->sched.malloc_stack = true; + th->sched.context_stack = stack; } #endif /* THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION */ |
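/* Minimal sketch of the rwlock-protected hook list pattern used by the
 * internal thread event hooks above: writers take the write lock to link or
 * unlink hooks, readers take the read lock to walk the list and fire
 * callbacks, so concurrent readers never block each other. The types and
 * names (hook_t, add_hook, fire_hooks) are illustrative, not the Ruby API. */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct hook { void (*fn)(int); struct hook *next; } hook_t;

static hook_t *hooks;
static pthread_rwlock_t hooks_lock = PTHREAD_RWLOCK_INITIALIZER;

static void
add_hook(void (*fn)(int))
{
    hook_t *h = malloc(sizeof(*h));
    if (!h) return;
    h->fn = fn;
    pthread_rwlock_wrlock(&hooks_lock);
    h->next = hooks;
    hooks = h;
    pthread_rwlock_unlock(&hooks_lock);
}

static void
fire_hooks(int event)
{
    pthread_rwlock_rdlock(&hooks_lock);
    for (hook_t *h = hooks; h; h = h->next) h->fn(event);
    pthread_rwlock_unlock(&hooks_lock);
}

static void on_event(int e) { printf("event %d\n", e); }

int
main(void)
{
    add_hook(on_event);
    fire_hooks(42);
    return 0;
}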
