diff options
Diffstat (limited to 'thread_pthread.c')
| -rw-r--r-- | thread_pthread.c | 2104 |
1 files changed, 1322 insertions, 782 deletions
diff --git a/thread_pthread.c b/thread_pthread.c index 5295a52f28..6f0cc3d54c 100644 --- a/thread_pthread.c +++ b/thread_pthread.c @@ -12,6 +12,7 @@ #ifdef THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION #include "gc.h" +#include "mjit.h" #ifdef HAVE_SYS_RESOURCE_H #include <sys/resource.h> @@ -19,174 +20,399 @@ #ifdef HAVE_THR_STKSEGMENT #include <thread.h> #endif -#if HAVE_FCNTL_H +#if defined(HAVE_FCNTL_H) #include <fcntl.h> -#elif HAVE_SYS_FCNTL_H +#elif defined(HAVE_SYS_FCNTL_H) #include <sys/fcntl.h> #endif #ifdef HAVE_SYS_PRCTL_H #include <sys/prctl.h> #endif -#if defined(__native_client__) && defined(NACL_NEWLIB) -# include "nacl/select.h" -#endif #if defined(HAVE_SYS_TIME_H) #include <sys/time.h> #endif #if defined(__HAIKU__) #include <kernel/OS.h> #endif +#ifdef __linux__ +#include <sys/syscall.h> /* for SYS_gettid */ +#endif +#include <time.h> +#include <signal.h> + +#if defined(HAVE_SYS_EVENTFD_H) && defined(HAVE_EVENTFD) +# define USE_EVENTFD (1) +# include <sys/eventfd.h> +#else +# define USE_EVENTFD (0) +#endif + +#if defined(SIGVTALRM) && !defined(__CYGWIN__) && !defined(__EMSCRIPTEN__) +# define USE_UBF_LIST 1 +#endif + +/* + * UBF_TIMER and ubf_list both use SIGVTALRM. + * + * UBF_TIMER has NOTHING to do with thread timeslices (TIMER_INTERRUPT_MASK) + * + * UBF_TIMER is to close TOCTTOU signal race on programs where we + * cannot rely on GVL contention (vm->gvl.timer) to perform wakeups + * while a thread is doing blocking I/O on sockets or pipes. With + * rb_thread_call_without_gvl and similar functions: + * + * (1) Check interrupts. + * (2) release GVL. + * (2a) signal received + * (3) call func with data1 (blocks for a long time without ubf_timer) + * (4) acquire GVL. + * Other Ruby threads can not run in parallel any more. + * (5) Check interrupts. + * + * We need UBF_TIMER to break out of (3) if (2a) happens. + * + * ubf_list wakeups may be triggered on gvl_yield. + * + * If we have vm->gvl.timer (on GVL contention), we don't need UBF_TIMER + * as it can perform the same tasks while doing timeslices. + */ +#define UBF_TIMER_NONE 0 +#define UBF_TIMER_POSIX 1 +#define UBF_TIMER_PTHREAD 2 + +#ifndef UBF_TIMER +# if defined(HAVE_TIMER_SETTIME) && defined(HAVE_TIMER_CREATE) && \ + defined(CLOCK_MONOTONIC) && defined(USE_UBF_LIST) + /* preferred */ +# define UBF_TIMER UBF_TIMER_POSIX +# elif defined(USE_UBF_LIST) + /* safe, but inefficient */ +# define UBF_TIMER UBF_TIMER_PTHREAD +# else + /* we'll be racy without SIGVTALRM for ubf_list */ +# define UBF_TIMER UBF_TIMER_NONE +# endif +#endif + +enum rtimer_state { + /* alive, after timer_create: */ + RTIMER_DISARM, + RTIMER_ARMING, + RTIMER_ARMED, + + RTIMER_DEAD +}; -static void native_mutex_lock(rb_nativethread_lock_t *lock); -static void native_mutex_unlock(rb_nativethread_lock_t *lock); -static int native_mutex_trylock(rb_nativethread_lock_t *lock); -static void native_mutex_initialize(rb_nativethread_lock_t *lock); -static void native_mutex_destroy(rb_nativethread_lock_t *lock); -static void native_cond_signal(rb_nativethread_cond_t *cond); -static void native_cond_broadcast(rb_nativethread_cond_t *cond); -static void native_cond_wait(rb_nativethread_cond_t *cond, rb_nativethread_lock_t *mutex); -static void native_cond_initialize(rb_nativethread_cond_t *cond, int flags); -static void native_cond_destroy(rb_nativethread_cond_t *cond); -static void rb_thread_wakeup_timer_thread_low(void); +#if UBF_TIMER == UBF_TIMER_POSIX +static const struct itimerspec zero; static struct { - pthread_t id; - int created; -} timer_thread; -#define TIMER_THREAD_CREATED_P() (timer_thread.created != 0) + rb_atomic_t state_; /* rtimer_state */ + rb_pid_t owner; + timer_t timerid; +} timer_posix = { + /* .state = */ RTIMER_DEAD, +}; -#define RB_CONDATTR_CLOCK_MONOTONIC 1 +#define TIMER_STATE_DEBUG 0 -#if defined(HAVE_PTHREAD_CONDATTR_SETCLOCK) && defined(HAVE_CLOCKID_T) && \ - defined(CLOCK_REALTIME) && defined(CLOCK_MONOTONIC) && \ - defined(HAVE_CLOCK_GETTIME) && defined(HAVE_PTHREAD_CONDATTR_INIT) -#define USE_MONOTONIC_COND 1 +static const char * +rtimer_state_name(enum rtimer_state state) +{ + switch (state) { + case RTIMER_DISARM: return "disarm"; + case RTIMER_ARMING: return "arming"; + case RTIMER_ARMED: return "armed"; + case RTIMER_DEAD: return "dead"; + default: rb_bug("unreachable"); + } +} + +static enum rtimer_state +timer_state_exchange(enum rtimer_state state) +{ + enum rtimer_state prev = ATOMIC_EXCHANGE(timer_posix.state_, state); + if (TIMER_STATE_DEBUG) fprintf(stderr, "state (exc): %s->%s\n", rtimer_state_name(prev), rtimer_state_name(state)); + return prev; +} + +static enum rtimer_state +timer_state_cas(enum rtimer_state expected_prev, enum rtimer_state state) +{ + enum rtimer_state prev = ATOMIC_CAS(timer_posix.state_, expected_prev, state); + + if (TIMER_STATE_DEBUG) { + if (prev == expected_prev) { + fprintf(stderr, "state (cas): %s->%s\n", rtimer_state_name(prev), rtimer_state_name(state)); + } + else { + fprintf(stderr, "state (cas): %s (expected:%s)\n", rtimer_state_name(prev), rtimer_state_name(expected_prev)); + } + } + + return prev; +} + +#elif UBF_TIMER == UBF_TIMER_PTHREAD +static void *timer_pthread_fn(void *); +static struct { + int low[2]; + rb_atomic_t armed; /* boolean */ + rb_pid_t owner; + pthread_t thid; +} timer_pthread = { + { -1, -1 }, +}; +#endif + +static const rb_hrtime_t *sigwait_timeout(rb_thread_t *, int sigwait_fd, + const rb_hrtime_t *, + int *drained_p); +static void ubf_timer_disarm(void); +static void threadptr_trap_interrupt(rb_thread_t *); +static void clear_thread_cache_altstack(void); +static void ubf_wakeup_all_threads(void); +static int ubf_threads_empty(void); + +#define TIMER_THREAD_CREATED_P() (signal_self_pipe.owner_process == getpid()) + +/* for testing, and in case we come across a platform w/o pipes: */ +#define BUSY_WAIT_SIGNALS (0) + +/* + * sigwait_th is the thread which owns sigwait_fd and sleeps on it + * (using ppoll). MJIT worker can be sigwait_th==0, so we initialize + * it to THREAD_INVALID at startup and fork time. It is the ONLY thread + * allowed to read from sigwait_fd, otherwise starvation can occur. + */ +#define THREAD_INVALID ((const rb_thread_t *)-1) +static const rb_thread_t *sigwait_th; + +#ifdef HAVE_SCHED_YIELD +#define native_thread_yield() (void)sched_yield() #else -#define USE_MONOTONIC_COND 0 +#define native_thread_yield() ((void)0) #endif -#if defined(HAVE_POLL) && defined(HAVE_FCNTL) && defined(F_GETFL) && defined(F_SETFL) && defined(O_NONBLOCK) && !defined(__native_client__) -/* The timer thread sleeps while only one Ruby thread is running. */ -# define USE_SLEEPY_TIMER_THREAD 1 +#if defined(HAVE_PTHREAD_CONDATTR_SETCLOCK) && \ + defined(CLOCK_REALTIME) && defined(CLOCK_MONOTONIC) && \ + defined(HAVE_CLOCK_GETTIME) +static pthread_condattr_t condattr_mono; +static pthread_condattr_t *condattr_monotonic = &condattr_mono; #else -# define USE_SLEEPY_TIMER_THREAD 0 +static const void *const condattr_monotonic = NULL; #endif +/* 100ms. 10ms is too small for user level thread scheduling + * on recent Linux (tested on 2.6.35) + */ +#define TIME_QUANTUM_MSEC (100) +#define TIME_QUANTUM_USEC (TIME_QUANTUM_MSEC * 1000) +#define TIME_QUANTUM_NSEC (TIME_QUANTUM_USEC * 1000) + +static rb_hrtime_t native_cond_timeout(rb_nativethread_cond_t *, rb_hrtime_t); +static int native_cond_timedwait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex, const rb_hrtime_t *abs); + +/* + * Designate the next gvl.timer thread, favor the last thread in + * the waitq since it will be in waitq longest + */ +static int +designate_timer_thread(rb_global_vm_lock_t *gvl) +{ + native_thread_data_t *last; + + last = list_tail(&gvl->waitq, native_thread_data_t, node.ubf); + if (last) { + rb_native_cond_signal(&last->cond.gvlq); + return TRUE; + } + return FALSE; +} + +/* + * We become designated timer thread to kick vm->gvl.owner + * periodically. Continue on old timeout if it expired. + */ static void -gvl_acquire_common(rb_vm_t *vm) +do_gvl_timer(rb_global_vm_lock_t *gvl, rb_thread_t *th) { - if (vm->gvl.acquired) { + rb_vm_t *vm = GET_VM(); + static rb_hrtime_t abs; + native_thread_data_t *nd = &th->native_thread_data; - vm->gvl.waiting++; - if (vm->gvl.waiting == 1) { - /* - * Wake up timer thread iff timer thread is slept. - * When timer thread is polling mode, we don't want to - * make confusing timer thread interval time. - */ - rb_thread_wakeup_timer_thread_low(); - } + gvl->timer = th; - while (vm->gvl.acquired) { - native_cond_wait(&vm->gvl.cond, &vm->gvl.lock); - } + /* take over wakeups from UBF_TIMER */ + ubf_timer_disarm(); + + if (gvl->timer_err == ETIMEDOUT) { + abs = native_cond_timeout(&nd->cond.gvlq, TIME_QUANTUM_NSEC); + } + gvl->timer_err = native_cond_timedwait(&nd->cond.gvlq, &gvl->lock, &abs); - vm->gvl.waiting--; + ubf_wakeup_all_threads(); + ruby_sigchld_handler(vm); - if (vm->gvl.need_yield) { - vm->gvl.need_yield = 0; - native_cond_signal(&vm->gvl.switch_cond); - } + if (UNLIKELY(rb_signal_buff_size())) { + if (th == vm->ractor.main_thread) { + RUBY_VM_SET_TRAP_INTERRUPT(th->ec); + } + else { + threadptr_trap_interrupt(vm->ractor.main_thread); + } } - vm->gvl.acquired = 1; + /* + * Timeslice. Warning: the process may fork while this + * thread is contending for GVL: + */ + if (gvl->owner) { + // strictly speaking, accessing "gvl->owner" is not thread-safe + RUBY_VM_SET_TIMER_INTERRUPT(gvl->owner->ec); + } + gvl->timer = 0; } static void -gvl_acquire(rb_vm_t *vm, rb_thread_t *th) +gvl_acquire_common(rb_global_vm_lock_t *gvl, rb_thread_t *th) { - native_mutex_lock(&vm->gvl.lock); - gvl_acquire_common(vm); - native_mutex_unlock(&vm->gvl.lock); + if (gvl->owner) { + native_thread_data_t *nd = &th->native_thread_data; + + VM_ASSERT(th->unblock.func == 0 && + "we must not be in ubf_list and GVL waitq at the same time"); + + list_add_tail(&gvl->waitq, &nd->node.gvl); + + do { + if (!gvl->timer) { + do_gvl_timer(gvl, th); + } + else { + rb_native_cond_wait(&nd->cond.gvlq, &gvl->lock); + } + } while (gvl->owner); + + list_del_init(&nd->node.gvl); + + if (gvl->need_yield) { + gvl->need_yield = 0; + rb_native_cond_signal(&gvl->switch_cond); + } + } + else { /* reset timer if uncontended */ + gvl->timer_err = ETIMEDOUT; + } + gvl->owner = th; + if (!gvl->timer) { + if (!designate_timer_thread(gvl) && !ubf_threads_empty()) { + rb_thread_wakeup_timer_thread(-1); + } + } } static void -gvl_release_common(rb_vm_t *vm) +gvl_acquire(rb_global_vm_lock_t *gvl, rb_thread_t *th) +{ + rb_native_mutex_lock(&gvl->lock); + gvl_acquire_common(gvl, th); + rb_native_mutex_unlock(&gvl->lock); +} + +static const native_thread_data_t * +gvl_release_common(rb_global_vm_lock_t *gvl) { - vm->gvl.acquired = 0; - if (vm->gvl.waiting > 0) - native_cond_signal(&vm->gvl.cond); + native_thread_data_t *next; + gvl->owner = 0; + next = list_top(&gvl->waitq, native_thread_data_t, node.ubf); + if (next) rb_native_cond_signal(&next->cond.gvlq); + + return next; } static void -gvl_release(rb_vm_t *vm) +gvl_release(rb_global_vm_lock_t *gvl) { - native_mutex_lock(&vm->gvl.lock); - gvl_release_common(vm); - native_mutex_unlock(&vm->gvl.lock); + rb_native_mutex_lock(&gvl->lock); + gvl_release_common(gvl); + rb_native_mutex_unlock(&gvl->lock); } static void -gvl_yield(rb_vm_t *vm, rb_thread_t *th) +gvl_yield(rb_global_vm_lock_t *gvl, rb_thread_t *th) { - native_mutex_lock(&vm->gvl.lock); + const native_thread_data_t *next; - gvl_release_common(vm); + /* + * Perhaps other threads are stuck in blocking region w/o GVL, too, + * (perhaps looping in io_close_fptr) so we kick them: + */ + ubf_wakeup_all_threads(); + rb_native_mutex_lock(&gvl->lock); + next = gvl_release_common(gvl); /* An another thread is processing GVL yield. */ - if (UNLIKELY(vm->gvl.wait_yield)) { - while (vm->gvl.wait_yield) - native_cond_wait(&vm->gvl.switch_wait_cond, &vm->gvl.lock); - goto acquire; - } - - if (vm->gvl.waiting > 0) { - /* Wait until another thread task take GVL. */ - vm->gvl.need_yield = 1; - vm->gvl.wait_yield = 1; - while (vm->gvl.need_yield) - native_cond_wait(&vm->gvl.switch_cond, &vm->gvl.lock); - vm->gvl.wait_yield = 0; + if (UNLIKELY(gvl->wait_yield)) { + while (gvl->wait_yield) + rb_native_cond_wait(&gvl->switch_wait_cond, &gvl->lock); + } + else if (next) { + /* Wait until another thread task takes GVL. */ + gvl->need_yield = 1; + gvl->wait_yield = 1; + while (gvl->need_yield) + rb_native_cond_wait(&gvl->switch_cond, &gvl->lock); + gvl->wait_yield = 0; + rb_native_cond_broadcast(&gvl->switch_wait_cond); } else { - native_mutex_unlock(&vm->gvl.lock); - sched_yield(); - native_mutex_lock(&vm->gvl.lock); + rb_native_mutex_unlock(&gvl->lock); + native_thread_yield(); + rb_native_mutex_lock(&gvl->lock); + rb_native_cond_broadcast(&gvl->switch_wait_cond); } - - native_cond_broadcast(&vm->gvl.switch_wait_cond); - acquire: - gvl_acquire_common(vm); - native_mutex_unlock(&vm->gvl.lock); + gvl_acquire_common(gvl, th); + rb_native_mutex_unlock(&gvl->lock); } -static void -gvl_init(rb_vm_t *vm) +void +rb_gvl_init(rb_global_vm_lock_t *gvl) { - native_mutex_initialize(&vm->gvl.lock); - native_cond_initialize(&vm->gvl.cond, RB_CONDATTR_CLOCK_MONOTONIC); - native_cond_initialize(&vm->gvl.switch_cond, RB_CONDATTR_CLOCK_MONOTONIC); - native_cond_initialize(&vm->gvl.switch_wait_cond, RB_CONDATTR_CLOCK_MONOTONIC); - vm->gvl.acquired = 0; - vm->gvl.waiting = 0; - vm->gvl.need_yield = 0; - vm->gvl.wait_yield = 0; + rb_native_mutex_initialize(&gvl->lock); + rb_native_cond_initialize(&gvl->switch_cond); + rb_native_cond_initialize(&gvl->switch_wait_cond); + list_head_init(&gvl->waitq); + gvl->owner = 0; + gvl->timer = 0; + gvl->timer_err = ETIMEDOUT; + gvl->need_yield = 0; + gvl->wait_yield = 0; } static void -gvl_destroy(rb_vm_t *vm) +gvl_destroy(rb_global_vm_lock_t *gvl) { - native_cond_destroy(&vm->gvl.switch_wait_cond); - native_cond_destroy(&vm->gvl.switch_cond); - native_cond_destroy(&vm->gvl.cond); - native_mutex_destroy(&vm->gvl.lock); + /* + * only called once at VM shutdown (not atfork), another thread + * may still grab vm->gvl.lock when calling gvl_release at + * the end of thread_start_func_2 + */ + if (0) { + rb_native_cond_destroy(&gvl->switch_wait_cond); + rb_native_cond_destroy(&gvl->switch_cond); + rb_native_mutex_destroy(&gvl->lock); + } + clear_thread_cache_altstack(); } #if defined(HAVE_WORKING_FORK) +static void thread_cache_reset(void); static void -gvl_atfork(rb_vm_t *vm) +gvl_atfork(rb_global_vm_lock_t *gvl) { - gvl_init(vm); - gvl_acquire(vm, GET_THREAD()); + thread_cache_reset(); + rb_gvl_init(gvl); + gvl_acquire(gvl, GET_THREAD()); } #endif @@ -205,8 +431,8 @@ mutex_debug(const char *msg, void *lock) } } -static void -native_mutex_lock(pthread_mutex_t *lock) +void +rb_native_mutex_lock(pthread_mutex_t *lock) { int r; mutex_debug("lock", lock); @@ -215,8 +441,8 @@ native_mutex_lock(pthread_mutex_t *lock) } } -static void -native_mutex_unlock(pthread_mutex_t *lock) +void +rb_native_mutex_unlock(pthread_mutex_t *lock) { int r; mutex_debug("unlock", lock); @@ -225,8 +451,8 @@ native_mutex_unlock(pthread_mutex_t *lock) } } -static inline int -native_mutex_trylock(pthread_mutex_t *lock) +int +rb_native_mutex_trylock(pthread_mutex_t *lock) { int r; mutex_debug("trylock", lock); @@ -241,8 +467,8 @@ native_mutex_trylock(pthread_mutex_t *lock) return 0; } -static void -native_mutex_initialize(pthread_mutex_t *lock) +void +rb_native_mutex_initialize(pthread_mutex_t *lock) { int r = pthread_mutex_init(lock, 0); mutex_debug("init", lock); @@ -251,8 +477,8 @@ native_mutex_initialize(pthread_mutex_t *lock) } } -static void -native_mutex_destroy(pthread_mutex_t *lock) +void +rb_native_mutex_destroy(pthread_mutex_t *lock) { int r = pthread_mutex_destroy(lock); mutex_debug("destroy", lock); @@ -261,46 +487,22 @@ native_mutex_destroy(pthread_mutex_t *lock) } } -static void -native_cond_initialize(rb_nativethread_cond_t *cond, int flags) +void +rb_native_cond_initialize(rb_nativethread_cond_t *cond) { -#ifdef HAVE_PTHREAD_COND_INIT - int r; -# if USE_MONOTONIC_COND - pthread_condattr_t attr; - - pthread_condattr_init(&attr); - - cond->clockid = CLOCK_REALTIME; - if (flags & RB_CONDATTR_CLOCK_MONOTONIC) { - r = pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); - if (r == 0) { - cond->clockid = CLOCK_MONOTONIC; - } - } - - r = pthread_cond_init(&cond->cond, &attr); - pthread_condattr_destroy(&attr); -# else - r = pthread_cond_init(&cond->cond, NULL); -# endif + int r = pthread_cond_init(cond, condattr_monotonic); if (r != 0) { rb_bug_errno("pthread_cond_init", r); } - - return; -#endif } -static void -native_cond_destroy(rb_nativethread_cond_t *cond) +void +rb_native_cond_destroy(rb_nativethread_cond_t *cond) { -#ifdef HAVE_PTHREAD_COND_INIT - int r = pthread_cond_destroy(&cond->cond); + int r = pthread_cond_destroy(cond); if (r != 0) { rb_bug_errno("pthread_cond_destroy", r); } -#endif } /* @@ -309,47 +511,48 @@ native_cond_destroy(rb_nativethread_cond_t *cond) * * http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c * - * The following native_cond_signal and native_cond_broadcast functions + * The following rb_native_cond_signal and rb_native_cond_broadcast functions * need to retrying until pthread functions don't return EAGAIN. */ -static void -native_cond_signal(rb_nativethread_cond_t *cond) +void +rb_native_cond_signal(rb_nativethread_cond_t *cond) { int r; do { - r = pthread_cond_signal(&cond->cond); + r = pthread_cond_signal(cond); } while (r == EAGAIN); if (r != 0) { rb_bug_errno("pthread_cond_signal", r); } } -static void -native_cond_broadcast(rb_nativethread_cond_t *cond) +void +rb_native_cond_broadcast(rb_nativethread_cond_t *cond) { int r; do { - r = pthread_cond_broadcast(&cond->cond); + r = pthread_cond_broadcast(cond); } while (r == EAGAIN); if (r != 0) { - rb_bug_errno("native_cond_broadcast", r); + rb_bug_errno("rb_native_cond_broadcast", r); } } -static void -native_cond_wait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex) +void +rb_native_cond_wait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex) { - int r = pthread_cond_wait(&cond->cond, mutex); + int r = pthread_cond_wait(cond, mutex); if (r != 0) { rb_bug_errno("pthread_cond_wait", r); } } static int -native_cond_timedwait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *ts) +native_cond_timedwait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex, const rb_hrtime_t *abs) { int r; + struct timespec ts; /* * An old Linux may return EINTR. Even though POSIX says @@ -358,75 +561,46 @@ native_cond_timedwait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex, cons * Let's hide it from arch generic code. */ do { - r = pthread_cond_timedwait(&cond->cond, mutex, ts); + rb_hrtime2timespec(&ts, abs); + r = pthread_cond_timedwait(cond, mutex, &ts); } while (r == EINTR); if (r != 0 && r != ETIMEDOUT) { - rb_bug_errno("pthread_cond_timedwait", r); + rb_bug_errno("pthread_cond_timedwait", r); } return r; } -static struct timespec -native_cond_timeout(rb_nativethread_cond_t *cond, struct timespec timeout_rel) +void +rb_native_cond_timedwait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex, unsigned long msec) { - int ret; - struct timeval tv; - struct timespec timeout; - struct timespec now; + rb_hrtime_t hrmsec = native_cond_timeout(cond, RB_HRTIME_PER_MSEC * msec); + native_cond_timedwait(cond, mutex, &hrmsec); +} -#if USE_MONOTONIC_COND - if (cond->clockid == CLOCK_MONOTONIC) { - ret = clock_gettime(cond->clockid, &now); - if (ret != 0) - rb_sys_fail("clock_gettime()"); - goto out; +static rb_hrtime_t +native_cond_timeout(rb_nativethread_cond_t *cond, const rb_hrtime_t rel) +{ + if (condattr_monotonic) { + return rb_hrtime_add(rb_hrtime_now(), rel); } + else { + struct timespec ts; - if (cond->clockid != CLOCK_REALTIME) - rb_bug("unsupported clockid %"PRIdVALUE, (SIGNED_VALUE)cond->clockid); -#endif - - ret = gettimeofday(&tv, 0); - if (ret != 0) - rb_sys_fail(0); - now.tv_sec = tv.tv_sec; - now.tv_nsec = tv.tv_usec * 1000; - -#if USE_MONOTONIC_COND - out: -#endif - timeout.tv_sec = now.tv_sec; - timeout.tv_nsec = now.tv_nsec; - timeout.tv_sec += timeout_rel.tv_sec; - timeout.tv_nsec += timeout_rel.tv_nsec; - - if (timeout.tv_nsec >= 1000*1000*1000) { - timeout.tv_sec++; - timeout.tv_nsec -= 1000*1000*1000; + rb_timespec_now(&ts); + return rb_hrtime_add(rb_timespec2hrtime(&ts), rel); } - - if (timeout.tv_sec < now.tv_sec) - timeout.tv_sec = TIMET_MAX; - - return timeout; } #define native_cleanup_push pthread_cleanup_push #define native_cleanup_pop pthread_cleanup_pop -#ifdef HAVE_SCHED_YIELD -#define native_thread_yield() (void)sched_yield() -#else -#define native_thread_yield() ((void)0) -#endif - -#if defined(SIGVTALRM) && !defined(__CYGWIN__) -#define USE_UBF_LIST 1 -static rb_nativethread_lock_t ubf_list_lock; -#endif +#ifdef RB_THREAD_LOCAL_SPECIFIER +static RB_THREAD_LOCAL_SPECIFIER rb_thread_t *ruby_native_thread; +#else static pthread_key_t ruby_native_thread_key; +#endif static void null_func(int i) @@ -434,61 +608,111 @@ null_func(int i) /* null */ } -static rb_thread_t * +rb_thread_t * ruby_thread_from_native(void) { +#ifdef RB_THREAD_LOCAL_SPECIFIER + return ruby_native_thread; +#else return pthread_getspecific(ruby_native_thread_key); +#endif } -static int +int ruby_thread_set_native(rb_thread_t *th) { + if (th && th->ec) { + rb_ractor_set_current_ec(th->ractor, th->ec); + } +#ifdef RB_THREAD_LOCAL_SPECIFIER + ruby_native_thread = th; + return 1; +#else return pthread_setspecific(ruby_native_thread_key, th) == 0; +#endif } static void native_thread_init(rb_thread_t *th); void -Init_native_thread(void) +Init_native_thread(rb_thread_t *th) { - rb_thread_t *th = GET_THREAD(); +#if defined(HAVE_PTHREAD_CONDATTR_SETCLOCK) + if (condattr_monotonic) { + int r = pthread_condattr_init(condattr_monotonic); + if (r == 0) { + r = pthread_condattr_setclock(condattr_monotonic, CLOCK_MONOTONIC); + } + if (r) condattr_monotonic = NULL; + } +#endif - pthread_key_create(&ruby_native_thread_key, NULL); +#ifndef RB_THREAD_LOCAL_SPECIFIER + if (pthread_key_create(&ruby_native_thread_key, 0) == EAGAIN) { + rb_bug("pthread_key_create failed (ruby_native_thread_key)"); + } + if (pthread_key_create(&ruby_current_ec_key, 0) == EAGAIN) { + rb_bug("pthread_key_create failed (ruby_current_ec_key)"); + } +#endif th->thread_id = pthread_self(); + ruby_thread_set_native(th); fill_thread_id_str(th); native_thread_init(th); -#ifdef USE_UBF_LIST - native_mutex_initialize(&ubf_list_lock); -#endif -#ifndef __native_client__ posix_signal(SIGVTALRM, null_func); +} + +#ifdef RB_THREAD_T_HAS_NATIVE_ID +static int +get_native_thread_id(void) +{ +#ifdef __linux__ + return (int)syscall(SYS_gettid); +#elif defined(__FreeBSD__) + return pthread_getthreadid_np(); #endif } +#endif static void native_thread_init(rb_thread_t *th) { native_thread_data_t *nd = &th->native_thread_data; +#ifdef RB_THREAD_T_HAS_NATIVE_ID + th->tid = get_native_thread_id(); +#endif #ifdef USE_UBF_LIST - list_node_init(&nd->ubf_list); + list_node_init(&nd->node.ubf); #endif - native_cond_initialize(&nd->sleep_cond, RB_CONDATTR_CLOCK_MONOTONIC); - ruby_thread_set_native(th); + rb_native_cond_initialize(&nd->cond.gvlq); + if (&nd->cond.gvlq != &nd->cond.intr) + rb_native_cond_initialize(&nd->cond.intr); } +#ifndef USE_THREAD_CACHE +#define USE_THREAD_CACHE 1 +#endif + static void native_thread_destroy(rb_thread_t *th) { - native_cond_destroy(&th->native_thread_data.sleep_cond); -} + native_thread_data_t *nd = &th->native_thread_data; -#ifndef USE_THREAD_CACHE -#define USE_THREAD_CACHE 0 -#endif + rb_native_cond_destroy(&nd->cond.gvlq); + if (&nd->cond.gvlq != &nd->cond.intr) + rb_native_cond_destroy(&nd->cond.intr); + + /* + * prevent false positive from ruby_thread_has_gvl_p if that + * gets called from an interposing function wrapper + */ + if (USE_THREAD_CACHE) + ruby_thread_set_native(0); +} #if USE_THREAD_CACHE -static rb_thread_t *register_cached_thread_and_wait(void); +static rb_thread_t *register_cached_thread_and_wait(void *); #endif #if defined HAVE_PTHREAD_GETATTR_NP || defined HAVE_PTHREAD_ATTR_GET_NP @@ -505,51 +729,6 @@ size_t pthread_get_stacksize_np(pthread_t); #define STACKADDR_AVAILABLE 1 #elif defined __HAIKU__ #define STACKADDR_AVAILABLE 1 -#elif defined __ia64 && defined _HPUX_SOURCE -#include <sys/dyntune.h> - -#define STACKADDR_AVAILABLE 1 - -/* - * Do not lower the thread's stack to PTHREAD_STACK_MIN, - * otherwise one would receive a 'sendsig: useracc failed.' - * and a coredump. - */ -#undef PTHREAD_STACK_MIN - -#define HAVE_PTHREAD_ATTR_GET_NP 1 -#undef HAVE_PTHREAD_ATTR_GETSTACK - -/* - * As the PTHREAD_STACK_MIN is undefined and - * no one touches the default stacksize, - * it is just fine to use the default. - */ -#define pthread_attr_get_np(thid, attr) 0 - -/* - * Using value of sp is very rough... To make it more real, - * addr would need to be aligned to vps_pagesize. - * The vps_pagesize is 'Default user page size (kBytes)' - * and could be retrieved by gettune(). - */ -static int -hpux_attr_getstackaddr(const pthread_attr_t *attr, void **addr) -{ - static uint64_t pagesize; - size_t size; - - if (!pagesize) { - if (gettune("vps_pagesize", &pagesize)) { - pagesize = 16; - } - pagesize *= 1024; - } - pthread_attr_getstacksize(attr, &size); - *addr = (void *)((size_t)((char *)_Asm_get_sp() - size) & ~(pagesize - 1)); - return 0; -} -#define pthread_attr_getstackaddr(attr, addr) hpux_attr_getstackaddr(attr, addr) #endif #ifndef MAINSTACKADDR_AVAILABLE @@ -584,7 +763,11 @@ get_stack(void **addr, size_t *size) CHECK_ERR(pthread_attr_getstackaddr(&attr, addr)); CHECK_ERR(pthread_attr_getstacksize(&attr, size)); # endif +# ifdef HAVE_PTHREAD_ATTR_GETGUARDSIZE CHECK_ERR(pthread_attr_getguardsize(&attr, &guard)); +# else + guard = getpagesize(); +# endif *size -= guard; pthread_attr_destroy(&attr); #elif defined HAVE_PTHREAD_ATTR_GET_NP /* FreeBSD, DragonFly BSD, NetBSD */ @@ -645,9 +828,6 @@ static struct { rb_nativethread_id_t id; size_t stack_maxsize; VALUE *stack_start; -#ifdef __ia64 - VALUE *register_stack_start; -#endif } native_main_thread; #ifdef STACK_END_ADDRESS @@ -728,24 +908,11 @@ reserve_stack(volatile char *limit, size_t size) #endif #undef ruby_init_stack -/* Set stack bottom of Ruby implementation. - * - * You must call this function before any heap allocation by Ruby implementation. - * Or GC will break living objects */ void -ruby_init_stack(volatile VALUE *addr -#ifdef __ia64 - , void *bsp -#endif - ) +ruby_init_stack(volatile VALUE *addr) { native_main_thread.id = pthread_self(); -#ifdef __ia64 - if (!native_main_thread.register_stack_start || - (VALUE*)bsp < native_main_thread.register_stack_start) { - native_main_thread.register_stack_start = (VALUE*)bsp; - } -#endif + #if MAINSTACKADDR_AVAILABLE if (native_main_thread.stack_maxsize) return; { @@ -832,8 +999,8 @@ native_thread_init_stack(rb_thread_t *th) rb_nativethread_id_t curr = pthread_self(); if (pthread_equal(curr, native_main_thread.id)) { - th->machine.stack_start = native_main_thread.stack_start; - th->machine.stack_maxsize = native_main_thread.stack_maxsize; + th->ec->machine.stack_start = native_main_thread.stack_start; + th->ec->machine.stack_maxsize = native_main_thread.stack_maxsize; } else { #ifdef STACKADDR_AVAILABLE @@ -842,23 +1009,14 @@ native_thread_init_stack(rb_thread_t *th) if (get_stack(&start, &size) == 0) { uintptr_t diff = (uintptr_t)start - (uintptr_t)&curr; - th->machine.stack_start = (VALUE *)&curr; - th->machine.stack_maxsize = size - diff; - } -#elif defined get_stack_of - if (!th->machine.stack_maxsize) { - native_mutex_lock(&th->interrupt_lock); - native_mutex_unlock(&th->interrupt_lock); + th->ec->machine.stack_start = (VALUE *)&curr; + th->ec->machine.stack_maxsize = size - diff; } #else rb_raise(rb_eNotImpError, "ruby engine can initialize only in the main thread"); #endif } -#ifdef __ia64 - th->machine.register_stack_start = native_main_thread.register_stack_start; - th->machine.stack_maxsize /= 2; - th->machine.register_stack_maxsize = th->machine.stack_maxsize; -#endif + return 0; } @@ -869,11 +1027,12 @@ native_thread_init_stack(rb_thread_t *th) static void * thread_start_func_1(void *th_ptr) { + rb_thread_t *th = th_ptr; + RB_ALTSTACK_INIT(void *altstack, th->altstack); #if USE_THREAD_CACHE thread_start: #endif { - rb_thread_t *th = th_ptr; #if !defined USE_NATIVE_THREAD_INIT VALUE stack_start; #endif @@ -885,108 +1044,124 @@ thread_start_func_1(void *th_ptr) native_thread_init(th); /* run */ #if defined USE_NATIVE_THREAD_INIT - thread_start_func_2(th, th->machine.stack_start, rb_ia64_bsp()); + thread_start_func_2(th, th->ec->machine.stack_start); #else - thread_start_func_2(th, &stack_start, rb_ia64_bsp()); + thread_start_func_2(th, &stack_start); #endif } #if USE_THREAD_CACHE - if (1) { - /* cache thread */ - rb_thread_t *th; - if ((th = register_cached_thread_and_wait()) != 0) { - th_ptr = (void *)th; - th->thread_id = pthread_self(); - goto thread_start; - } + /* cache thread */ + if ((th = register_cached_thread_and_wait(RB_ALTSTACK(altstack))) != 0) { + goto thread_start; } +#else + RB_ALTSTACK_FREE(altstack); #endif return 0; } struct cached_thread_entry { - volatile rb_thread_t **th_area; - rb_nativethread_cond_t *cond; - struct cached_thread_entry *next; + rb_nativethread_cond_t cond; + rb_nativethread_id_t thread_id; + rb_thread_t *th; + void *altstack; + struct list_node node; }; - #if USE_THREAD_CACHE static rb_nativethread_lock_t thread_cache_lock = RB_NATIVETHREAD_LOCK_INIT; -struct cached_thread_entry *cached_thread_root; +static LIST_HEAD(cached_thread_head); -static rb_thread_t * -register_cached_thread_and_wait(void) +# if defined(HAVE_WORKING_FORK) +static void +thread_cache_reset(void) { - rb_nativethread_cond_t cond = RB_NATIVETHREAD_COND_INIT; - volatile rb_thread_t *th_area = 0; - struct timeval tv; - struct timespec ts; - struct cached_thread_entry *entry = - (struct cached_thread_entry *)malloc(sizeof(struct cached_thread_entry)); + rb_native_mutex_initialize(&thread_cache_lock); + list_head_init(&cached_thread_head); +} +# endif - if (entry == 0) { - return 0; /* failed -> terminate thread immediately */ - } +/* + * number of seconds to cache for, I think 1-5s is sufficient to obviate + * the need for thread pool in many network programs (taking into account + * worst case network latency across the globe) without wasting memory + */ +#ifndef THREAD_CACHE_TIME +# define THREAD_CACHE_TIME ((rb_hrtime_t)3 * RB_HRTIME_PER_SEC) +#endif + +static rb_thread_t * +register_cached_thread_and_wait(void *altstack) +{ + rb_hrtime_t end = THREAD_CACHE_TIME; + struct cached_thread_entry entry; - gettimeofday(&tv, 0); - ts.tv_sec = tv.tv_sec + 60; - ts.tv_nsec = tv.tv_usec * 1000; + rb_native_cond_initialize(&entry.cond); + entry.altstack = altstack; + entry.th = NULL; + entry.thread_id = pthread_self(); + end = native_cond_timeout(&entry.cond, end); - native_mutex_lock(&thread_cache_lock); + rb_native_mutex_lock(&thread_cache_lock); { - entry->th_area = &th_area; - entry->cond = &cond; - entry->next = cached_thread_root; - cached_thread_root = entry; - - native_cond_timedwait(&cond, &thread_cache_lock, &ts); - - { - struct cached_thread_entry *e, **prev = &cached_thread_root; - - while ((e = *prev) != 0) { - if (e == entry) { - *prev = e->next; - break; - } - prev = &e->next; - } - } + list_add(&cached_thread_head, &entry.node); + + native_cond_timedwait(&entry.cond, &thread_cache_lock, &end); - free(entry); /* ok */ - native_cond_destroy(&cond); + if (entry.th == NULL) { /* unused */ + list_del(&entry.node); + } + } + rb_native_mutex_unlock(&thread_cache_lock); + + rb_native_cond_destroy(&entry.cond); + if (!entry.th) { + RB_ALTSTACK_FREE(entry.altstack); } - native_mutex_unlock(&thread_cache_lock); - return (rb_thread_t *)th_area; + return entry.th; } +#else +# if defined(HAVE_WORKING_FORK) +static void thread_cache_reset(void) { } +# endif #endif static int use_cached_thread(rb_thread_t *th) { - int result = 0; #if USE_THREAD_CACHE struct cached_thread_entry *entry; - if (cached_thread_root) { - native_mutex_lock(&thread_cache_lock); - entry = cached_thread_root; - { - if (cached_thread_root) { - cached_thread_root = entry->next; - *entry->th_area = th; - result = 1; - } - } - if (result) { - native_cond_signal(entry->cond); - } - native_mutex_unlock(&thread_cache_lock); + rb_native_mutex_lock(&thread_cache_lock); + entry = list_pop(&cached_thread_head, struct cached_thread_entry, node); + if (entry) { + entry->th = th; + /* th->thread_id must be set before signal for Thread#name= */ + th->thread_id = entry->thread_id; + fill_thread_id_str(th); + rb_native_cond_signal(&entry->cond); } + rb_native_mutex_unlock(&thread_cache_lock); + return !!entry; +#endif + return 0; +} + +static void +clear_thread_cache_altstack(void) +{ +#if USE_THREAD_CACHE + struct cached_thread_entry *entry; + + rb_native_mutex_lock(&thread_cache_lock); + list_for_each(&cached_thread_head, entry, node) { + void MAYBE_UNUSED(*altstack) = entry->altstack; + entry->altstack = 0; + RB_ALTSTACK_FREE(altstack); + } + rb_native_mutex_unlock(&thread_cache_lock); #endif - return result; } static int @@ -998,22 +1173,15 @@ native_thread_create(rb_thread_t *th) thread_debug("create (use cached thread): %p\n", (void *)th); } else { -#ifdef HAVE_PTHREAD_ATTR_INIT pthread_attr_t attr; - pthread_attr_t *const attrp = &attr; -#else - pthread_attr_t *const attrp = NULL; -#endif - const size_t stack_size = th->vm->default_params.thread_machine_stack_size; + const size_t stack_size = th->vm->default_params.thread_machine_stack_size + th->vm->default_params.thread_vm_stack_size; const size_t space = space_size(stack_size); - th->machine.stack_maxsize = stack_size - space; -#ifdef __ia64 - th->machine.stack_maxsize /= 2; - th->machine.register_stack_maxsize = th->machine.stack_maxsize; +#ifdef USE_SIGALTSTACK + th->altstack = rb_allocate_sigaltstack(); #endif + th->ec->machine.stack_maxsize = stack_size - space; -#ifdef HAVE_PTHREAD_ATTR_INIT CHECK_ERR(pthread_attr_init(&attr)); # ifdef PTHREAD_STACK_MIN @@ -1025,41 +1193,16 @@ native_thread_create(rb_thread_t *th) CHECK_ERR(pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED)); # endif CHECK_ERR(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)); -#endif -#ifdef get_stack_of - native_mutex_lock(&th->interrupt_lock); -#endif - err = pthread_create(&th->thread_id, attrp, thread_start_func_1, th); -#ifdef get_stack_of - if (!err) { - get_stack_of(th->thread_id, - &th->machine.stack_start, - &th->machine.stack_maxsize); - } - native_mutex_unlock(&th->interrupt_lock); -#endif + + err = pthread_create(&th->thread_id, &attr, thread_start_func_1, th); thread_debug("create: %p (%d)\n", (void *)th, err); /* should be done in the created thread */ fill_thread_id_str(th); -#ifdef HAVE_PTHREAD_ATTR_INIT CHECK_ERR(pthread_attr_destroy(&attr)); -#endif } return err; } -#if USE_SLEEPY_TIMER_THREAD -static void -native_thread_join(pthread_t th) -{ - int err = pthread_join(th, 0); - if (err) { - rb_raise(rb_eThreadError, "native_thread_join() failed (%d)", err); - } -} -#endif - - #if USE_NATIVE_THREAD_PRIORITY static void @@ -1101,77 +1244,80 @@ ubf_pthread_cond_signal(void *ptr) { rb_thread_t *th = (rb_thread_t *)ptr; thread_debug("ubf_pthread_cond_signal (%p)\n", (void *)th); - native_cond_signal(&th->native_thread_data.sleep_cond); + rb_native_cond_signal(&th->native_thread_data.cond.intr); } static void -native_sleep(rb_thread_t *th, struct timeval *timeout_tv) +native_cond_sleep(rb_thread_t *th, rb_hrtime_t *rel) { - struct timespec timeout; rb_nativethread_lock_t *lock = &th->interrupt_lock; - rb_nativethread_cond_t *cond = &th->native_thread_data.sleep_cond; - - if (timeout_tv) { - struct timespec timeout_rel; - - timeout_rel.tv_sec = timeout_tv->tv_sec; - timeout_rel.tv_nsec = timeout_tv->tv_usec * 1000; - - /* Solaris cond_timedwait() return EINVAL if an argument is greater than - * current_time + 100,000,000. So cut up to 100,000,000. This is - * considered as a kind of spurious wakeup. The caller to native_sleep - * should care about spurious wakeup. - * - * See also [Bug #1341] [ruby-core:29702] - * http://download.oracle.com/docs/cd/E19683-01/816-0216/6m6ngupgv/index.html - */ - if (timeout_rel.tv_sec > 100000000) { - timeout_rel.tv_sec = 100000000; - timeout_rel.tv_nsec = 0; - } - - timeout = native_cond_timeout(cond, timeout_rel); - } + rb_nativethread_cond_t *cond = &th->native_thread_data.cond.intr; + + /* Solaris cond_timedwait() return EINVAL if an argument is greater than + * current_time + 100,000,000. So cut up to 100,000,000. This is + * considered as a kind of spurious wakeup. The caller to native_sleep + * should care about spurious wakeup. + * + * See also [Bug #1341] [ruby-core:29702] + * http://download.oracle.com/docs/cd/E19683-01/816-0216/6m6ngupgv/index.html + */ + const rb_hrtime_t max = (rb_hrtime_t)100000000 * RB_HRTIME_PER_SEC; - GVL_UNLOCK_BEGIN(); + GVL_UNLOCK_BEGIN(th); { - native_mutex_lock(lock); + rb_native_mutex_lock(lock); th->unblock.func = ubf_pthread_cond_signal; th->unblock.arg = th; - if (RUBY_VM_INTERRUPTED(th)) { + if (RUBY_VM_INTERRUPTED(th->ec)) { /* interrupted. return immediate */ thread_debug("native_sleep: interrupted before sleep\n"); } else { - if (!timeout_tv) - native_cond_wait(cond, lock); - else - native_cond_timedwait(cond, lock, &timeout); + if (!rel) { + rb_native_cond_wait(cond, lock); + } + else { + rb_hrtime_t end; + + if (*rel > max) { + *rel = max; + } + + end = native_cond_timeout(cond, *rel); + native_cond_timedwait(cond, lock, &end); + } } th->unblock.func = 0; - th->unblock.arg = 0; - native_mutex_unlock(lock); + rb_native_mutex_unlock(lock); } - GVL_UNLOCK_END(); + GVL_UNLOCK_END(th); thread_debug("native_sleep done\n"); } #ifdef USE_UBF_LIST static LIST_HEAD(ubf_list_head); +static rb_nativethread_lock_t ubf_list_lock = RB_NATIVETHREAD_LOCK_INIT; + +static void +ubf_list_atfork(void) +{ + list_head_init(&ubf_list_head); + rb_native_mutex_initialize(&ubf_list_lock); +} /* The thread 'th' is registered to be trying unblock. */ static void register_ubf_list(rb_thread_t *th) { - struct list_node *node = &th->native_thread_data.ubf_list; + struct list_node *node = &th->native_thread_data.node.ubf; if (list_empty((struct list_head*)node)) { - native_mutex_lock(&ubf_list_lock); + rb_native_mutex_lock(&ubf_list_lock); list_add(&ubf_list_head, node); - native_mutex_unlock(&ubf_list_lock); + rb_native_mutex_unlock(&ubf_list_lock); } } @@ -1179,12 +1325,18 @@ register_ubf_list(rb_thread_t *th) static void unregister_ubf_list(rb_thread_t *th) { - struct list_node *node = &th->native_thread_data.ubf_list; + struct list_node *node = &th->native_thread_data.node.ubf; + + /* we can't allow re-entry into ubf_list_head */ + VM_ASSERT(th->unblock.func == 0); if (!list_empty((struct list_head*)node)) { - native_mutex_lock(&ubf_list_lock); - list_del_init(node); - native_mutex_unlock(&ubf_list_lock); + rb_native_mutex_lock(&ubf_list_lock); + list_del_init(node); + if (list_empty(&ubf_list_head) && !rb_signal_buff_size()) { + ubf_timer_disarm(); + } + rb_native_mutex_unlock(&ubf_list_lock); } } @@ -1196,25 +1348,42 @@ static void ubf_wakeup_thread(rb_thread_t *th) { thread_debug("thread_wait_queue_wakeup (%"PRI_THREAD_ID")\n", thread_id_str(th)); - if (th) - pthread_kill(th->thread_id, SIGVTALRM); + pthread_kill(th->thread_id, SIGVTALRM); } static void ubf_select(void *ptr) { rb_thread_t *th = (rb_thread_t *)ptr; + rb_global_vm_lock_t *gvl = rb_ractor_gvl(th->ractor); + const rb_thread_t *cur = ruby_thread_from_native(); /* may be 0 */ + register_ubf_list(th); /* * ubf_wakeup_thread() doesn't guarantee to wake up a target thread. * Therefore, we repeatedly call ubf_wakeup_thread() until a target thread - * exit from ubf function. - * In the other hands, we shouldn't call rb_thread_wakeup_timer_thread() - * if running on timer thread because it may make endless wakeups. + * exit from ubf function. We must have a timer to perform this operation. + * We use double-checked locking here because this function may be called + * while vm->gvl.lock is held in do_gvl_timer. + * There is also no need to start a timer if we're the designated + * sigwait_th thread, otherwise we can deadlock with a thread + * in unblock_function_clear. */ - if (!pthread_equal(pthread_self(), timer_thread.id)) - rb_thread_wakeup_timer_thread(); + if (cur != gvl->timer && cur != sigwait_th) { + /* + * Double-checked locking above was to prevent nested locking + * by the SAME thread. We use trylock here to prevent deadlocks + * between DIFFERENT threads + */ + if (rb_native_mutex_trylock(&gvl->lock) == 0) { + if (!gvl->timer) { + rb_thread_wakeup_timer_thread(-1); + } + rb_native_mutex_unlock(&gvl->lock); + } + } + ubf_wakeup_thread(th); } @@ -1228,14 +1397,15 @@ static void ubf_wakeup_all_threads(void) { rb_thread_t *th; + native_thread_data_t *dat; if (!ubf_threads_empty()) { - native_mutex_lock(&ubf_list_lock); - list_for_each(&ubf_list_head, th, - native_thread_data.ubf_list) { + rb_native_mutex_lock(&ubf_list_lock); + list_for_each(&ubf_list_head, dat, node.ubf) { + th = container_of(dat, rb_thread_t, native_thread_data); ubf_wakeup_thread(th); } - native_mutex_unlock(&ubf_list_lock); + rb_native_mutex_unlock(&ubf_list_lock); } } @@ -1245,57 +1415,39 @@ ubf_wakeup_all_threads(void) #define ubf_select 0 static void ubf_wakeup_all_threads(void) { return; } static int ubf_threads_empty(void) { return 1; } +#define ubf_list_atfork() do {} while (0) #endif /* USE_UBF_LIST */ #define TT_DEBUG 0 #define WRITE_CONST(fd, str) (void)(write((fd),(str),sizeof(str)-1)<0) -/* 100ms. 10ms is too small for user level thread scheduling - * on recent Linux (tested on 2.6.35) - */ -#define TIME_QUANTUM_USEC (100 * 1000) - -#if USE_SLEEPY_TIMER_THREAD static struct { - /* - * Read end of each pipe is closed inside timer thread for shutdown - * Write ends are closed by a normal Ruby thread during shutdown - */ - int normal[2]; - int low[2]; + /* pipes are closed in forked children when owner_process does not match */ + int normal[2]; /* [0] == sigwait_fd */ + int ub_main[2]; /* unblock main thread from native_ppoll_sleep */ /* volatile for signal handler use: */ volatile rb_pid_t owner_process; - rb_atomic_t writing; -} timer_thread_pipe = { +} signal_self_pipe = { + {-1, -1}, {-1, -1}, - {-1, -1}, /* low priority */ }; -NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd)); -static void -async_bug_fd(const char *mesg, int errno_arg, int fd) -{ - char buff[64]; - size_t n = strlcpy(buff, mesg, sizeof(buff)); - if (n < sizeof(buff)-3) { - ruby_snprintf(buff, sizeof(buff)-n, "(%d)", fd); - } - rb_async_bug_errno(buff, errno_arg); -} - /* only use signal-safe system calls here */ static void -rb_thread_wakeup_timer_thread_fd(volatile int *fdp) +rb_thread_wakeup_timer_thread_fd(int fd) { +#if USE_EVENTFD + const uint64_t buff = 1; +#else + const char buff = '!'; +#endif ssize_t result; - int fd = *fdp; /* access fdp exactly once here and do not reread fdp */ /* already opened */ - if (fd >= 0 && timer_thread_pipe.owner_process == getpid()) { - static const char buff[1] = {'!'}; + if (fd >= 0) { retry: - if ((result = write(fd, buff, 1)) <= 0) { + if ((result = write(fd, &buff, sizeof(buff))) <= 0) { int e = errno; switch (e) { case EINTR: goto retry; @@ -1315,64 +1467,113 @@ rb_thread_wakeup_timer_thread_fd(volatile int *fdp) } } -void -rb_thread_wakeup_timer_thread(void) -{ - /* must be safe inside sighandler, so no mutex */ - if (timer_thread_pipe.owner_process == getpid()) { - ATOMIC_INC(timer_thread_pipe.writing); - rb_thread_wakeup_timer_thread_fd(&timer_thread_pipe.normal[1]); - ATOMIC_DEC(timer_thread_pipe.writing); - } -} - +/* + * This ensures we get a SIGVTALRM in TIME_QUANTUM_MSEC if our + * process could not react to the original signal in time. + */ static void -rb_thread_wakeup_timer_thread_low(void) -{ - if (timer_thread_pipe.owner_process == getpid()) { - ATOMIC_INC(timer_thread_pipe.writing); - rb_thread_wakeup_timer_thread_fd(&timer_thread_pipe.low[1]); - ATOMIC_DEC(timer_thread_pipe.writing); +ubf_timer_arm(rb_pid_t current) /* async signal safe */ +{ +#if UBF_TIMER == UBF_TIMER_POSIX + if ((!current || timer_posix.owner == current) && + timer_state_cas(RTIMER_DISARM, RTIMER_ARMING) == RTIMER_DISARM) { + struct itimerspec it; + + it.it_interval.tv_sec = it.it_value.tv_sec = 0; + it.it_interval.tv_nsec = it.it_value.tv_nsec = TIME_QUANTUM_NSEC; + + if (timer_settime(timer_posix.timerid, 0, &it, 0)) + rb_async_bug_errno("timer_settime (arm)", errno); + + switch (timer_state_cas(RTIMER_ARMING, RTIMER_ARMED)) { + case RTIMER_DISARM: + /* somebody requested a disarm while we were arming */ + /* may race harmlessly with ubf_timer_destroy */ + (void)timer_settime(timer_posix.timerid, 0, &zero, 0); + + case RTIMER_ARMING: return; /* success */ + case RTIMER_ARMED: + /* + * it is possible to have another thread disarm, and + * a third thread arm finish re-arming before we get + * here, so we wasted a syscall with timer_settime but + * probably unavoidable in a signal handler. + */ + return; + case RTIMER_DEAD: + /* may race harmlessly with ubf_timer_destroy */ + (void)timer_settime(timer_posix.timerid, 0, &zero, 0); + return; + default: + rb_async_bug_errno("UBF_TIMER_POSIX unknown state", ERANGE); + } + } +#elif UBF_TIMER == UBF_TIMER_PTHREAD + if (!current || current == timer_pthread.owner) { + if (ATOMIC_EXCHANGE(timer_pthread.armed, 1) == 0) + rb_thread_wakeup_timer_thread_fd(timer_pthread.low[1]); } +#endif } -/* VM-dependent API is not available for this function */ -static void -consume_communication_pipe(int fd) +void +rb_thread_wakeup_timer_thread(int sig) { -#define CCP_READ_BUFF_SIZE 1024 - /* buffer can be shared because no one refers to them. */ - static char buff[CCP_READ_BUFF_SIZE]; - ssize_t result; + rb_pid_t current; - while (1) { - result = read(fd, buff, sizeof(buff)); - if (result == 0) { - return; - } - else if (result < 0) { - int e = errno; - switch (e) { - case EINTR: - continue; /* retry */ - case EAGAIN: -#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN - case EWOULDBLOCK: -#endif - return; - default: - async_bug_fd("consume_communication_pipe: read", e, fd); - } - } + /* non-sighandler path */ + if (sig <= 0) { + rb_thread_wakeup_timer_thread_fd(signal_self_pipe.normal[1]); + if (sig < 0) { + ubf_timer_arm(0); + } + return; + } + + /* must be safe inside sighandler, so no mutex */ + current = getpid(); + if (signal_self_pipe.owner_process == current) { + rb_thread_wakeup_timer_thread_fd(signal_self_pipe.normal[1]); + + /* + * system_working check is required because vm and main_thread are + * freed during shutdown + */ + if (system_working > 0) { + volatile rb_execution_context_t *ec; + rb_vm_t *vm = GET_VM(); + rb_thread_t *mth; + + /* + * FIXME: root VM and main_thread should be static and not + * on heap for maximum safety (and startup/shutdown speed) + */ + if (!vm) return; + mth = vm->ractor.main_thread; + if (!mth || system_working <= 0) return; + + /* this relies on GC for grace period before cont_free */ + ec = ACCESS_ONCE(rb_execution_context_t *, mth->ec); + + if (ec) { + RUBY_VM_SET_TRAP_INTERRUPT(ec); + ubf_timer_arm(current); + + /* some ubfs can interrupt single-threaded process directly */ + if (vm->ubf_async_safe && mth->unblock.func) { + (mth->unblock.func)(mth->unblock.arg); + } + } + } } } -#define CLOSE_INVALIDATE(expr) \ - close_invalidate(&timer_thread_pipe.expr,"close_invalidate: "#expr) +#define CLOSE_INVALIDATE_PAIR(expr) \ + close_invalidate_pair(expr,"close_invalidate: "#expr) static void -close_invalidate(volatile int *fdp, const char *msg) +close_invalidate(int *fdp, const char *msg) { - int fd = *fdp; /* access fdp exactly once here and do not reread fdp */ + int fd = *fdp; *fdp = -1; if (close(fd) < 0) { @@ -1381,6 +1582,19 @@ close_invalidate(volatile int *fdp, const char *msg) } static void +close_invalidate_pair(int fds[2], const char *msg) +{ + if (USE_EVENTFD && fds[0] == fds[1]) { + close_invalidate(&fds[0], msg); + fds[1] = -1; + } + else { + close_invalidate(&fds[0], msg); + close_invalidate(&fds[1], msg); + } +} + +static void set_nonblock(int fd) { int oflags; @@ -1395,14 +1609,33 @@ set_nonblock(int fd) rb_sys_fail(0); } +/* communication pipe with timer thread and signal handler */ static int setup_communication_pipe_internal(int pipes[2]) { int err; + if (pipes[0] >= 0 || pipes[1] >= 0) { + VM_ASSERT(pipes[0] >= 0); + VM_ASSERT(pipes[1] >= 0); + return 0; + } + + /* + * Don't bother with eventfd on ancient Linux 2.6.22..2.6.26 which were + * missing EFD_* flags, they can fall back to pipe + */ +#if USE_EVENTFD && defined(EFD_NONBLOCK) && defined(EFD_CLOEXEC) + pipes[0] = pipes[1] = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC); + if (pipes[0] >= 0) { + rb_update_max_fd(pipes[0]); + return 0; + } +#endif + err = rb_cloexec_pipe(pipes); if (err != 0) { - rb_warn("Failed to create communication pipe for timer thread: %s", + rb_warn("pipe creation failed for timer: %s, scheduling broken", strerror(errno)); return -1; } @@ -1413,242 +1646,270 @@ setup_communication_pipe_internal(int pipes[2]) return 0; } -/* communication pipe with timer thread and signal handler */ -static int -setup_communication_pipe(void) -{ - VM_ASSERT(timer_thread_pipe.owner_process == 0); - VM_ASSERT(timer_thread_pipe.normal[0] == -1); - VM_ASSERT(timer_thread_pipe.normal[1] == -1); - VM_ASSERT(timer_thread_pipe.low[0] == -1); - VM_ASSERT(timer_thread_pipe.low[1] == -1); +#if !defined(SET_CURRENT_THREAD_NAME) && defined(__linux__) && defined(PR_SET_NAME) +# define SET_CURRENT_THREAD_NAME(name) prctl(PR_SET_NAME, name) +#endif - if (setup_communication_pipe_internal(timer_thread_pipe.normal) < 0) { - return errno; - } - if (setup_communication_pipe_internal(timer_thread_pipe.low) < 0) { - int e = errno; - CLOSE_INVALIDATE(normal[0]); - CLOSE_INVALIDATE(normal[1]); - return e; - } +enum { + THREAD_NAME_MAX = +#if defined(__linux__) + 16 +#elif defined(__APPLE__) +/* Undocumented, and main thread seems unlimited */ + 64 +#else + 16 +#endif +}; - return 0; -} +static VALUE threadptr_invoke_proc_location(rb_thread_t *th); -/** - * Let the timer thread sleep a while. - * - * The timer thread sleeps until woken up by rb_thread_wakeup_timer_thread() if only one Ruby thread is running. - * @pre the calling context is in the timer thread. - */ -static inline void -timer_thread_sleep(rb_global_vm_lock_t* gvl) +static void +native_set_thread_name(rb_thread_t *th) { - int result; - int need_polling; - struct pollfd pollfds[2]; - - pollfds[0].fd = timer_thread_pipe.normal[0]; - pollfds[0].events = POLLIN; - pollfds[1].fd = timer_thread_pipe.low[0]; - pollfds[1].events = POLLIN; - - need_polling = !ubf_threads_empty(); - - if (gvl->waiting > 0 || need_polling) { - /* polling (TIME_QUANTUM_USEC usec) */ - result = poll(pollfds, 1, TIME_QUANTUM_USEC/1000); - } - else { - /* wait (infinite) */ - result = poll(pollfds, numberof(pollfds), -1); - } - - if (result == 0) { - /* maybe timeout */ - } - else if (result > 0) { - consume_communication_pipe(timer_thread_pipe.normal[0]); - consume_communication_pipe(timer_thread_pipe.low[0]); - } - else { /* result < 0 */ - int e = errno; - switch (e) { - case EBADF: - case EINVAL: - case ENOMEM: /* from Linux man */ - case EFAULT: /* from FreeBSD man */ - rb_async_bug_errno("thread_timer: select", e); - default: - /* ignore */; - } +#ifdef SET_CURRENT_THREAD_NAME + VALUE loc; + if (!NIL_P(loc = th->name)) { + SET_CURRENT_THREAD_NAME(RSTRING_PTR(loc)); + } + else if ((loc = threadptr_invoke_proc_location(th)) != Qnil) { + char *name, *p; + char buf[THREAD_NAME_MAX]; + size_t len; + int n; + + name = RSTRING_PTR(RARRAY_AREF(loc, 0)); + p = strrchr(name, '/'); /* show only the basename of the path. */ + if (p && p[1]) + name = p + 1; + + n = snprintf(buf, sizeof(buf), "%s:%d", name, NUM2INT(RARRAY_AREF(loc, 1))); + RB_GC_GUARD(loc); + + len = (size_t)n; + if (len >= sizeof(buf)) { + buf[sizeof(buf)-2] = '*'; + buf[sizeof(buf)-1] = '\0'; + } + SET_CURRENT_THREAD_NAME(buf); } +#endif } -#else /* USE_SLEEPY_TIMER_THREAD */ -# define PER_NANO 1000000000 -void rb_thread_wakeup_timer_thread(void) {} -static void rb_thread_wakeup_timer_thread_low(void) {} - -static rb_nativethread_lock_t timer_thread_lock; -static rb_nativethread_cond_t timer_thread_cond; - -static inline void -timer_thread_sleep(rb_global_vm_lock_t* unused) +static void +native_set_another_thread_name(rb_nativethread_id_t thread_id, VALUE name) { - struct timespec ts; - ts.tv_sec = 0; - ts.tv_nsec = TIME_QUANTUM_USEC * 1000; - ts = native_cond_timeout(&timer_thread_cond, ts); - - native_cond_timedwait(&timer_thread_cond, &timer_thread_lock, &ts); +#if defined SET_ANOTHER_THREAD_NAME || defined SET_CURRENT_THREAD_NAME + char buf[THREAD_NAME_MAX]; + const char *s = ""; +# if !defined SET_ANOTHER_THREAD_NAME + if (!pthread_equal(pthread_self(), thread_id)) return; +# endif + if (!NIL_P(name)) { + long n; + RSTRING_GETMEM(name, s, n); + if (n >= (int)sizeof(buf)) { + memcpy(buf, s, sizeof(buf)-1); + buf[sizeof(buf)-1] = '\0'; + s = buf; + } + } +# if defined SET_ANOTHER_THREAD_NAME + SET_ANOTHER_THREAD_NAME(thread_id, s); +# elif defined SET_CURRENT_THREAD_NAME + SET_CURRENT_THREAD_NAME(s); +# endif +#endif } -#endif /* USE_SLEEPY_TIMER_THREAD */ -#if !defined(SET_CURRENT_THREAD_NAME) && defined(__linux__) && defined(PR_SET_NAME) -# define SET_CURRENT_THREAD_NAME(name) prctl(PR_SET_NAME, name) +#if defined(RB_THREAD_T_HAS_NATIVE_ID) || defined(__APPLE__) +static VALUE +native_thread_native_thread_id(rb_thread_t *target_th) +{ +#ifdef RB_THREAD_T_HAS_NATIVE_ID + int tid = target_th->tid; + if (tid == 0) return Qnil; + return INT2FIX(tid); +#elif defined(__APPLE__) + uint64_t tid; + int e = pthread_threadid_np(target_th->thread_id, &tid); + if (e != 0) rb_syserr_fail(e, "pthread_threadid_np"); + return ULL2NUM((unsigned long long)tid); +#endif +} +# define USE_NATIVE_THREAD_NATIVE_THREAD_ID 1 +#else +# define USE_NATIVE_THREAD_NATIVE_THREAD_ID 0 #endif static void -native_set_thread_name(rb_thread_t *th) +ubf_timer_invalidate(void) { -#ifdef SET_CURRENT_THREAD_NAME - if (!th->first_func && th->first_proc) { - VALUE loc; - if (!NIL_P(loc = th->name)) { - SET_CURRENT_THREAD_NAME(RSTRING_PTR(loc)); - } - else if (!NIL_P(loc = rb_proc_location(th->first_proc))) { - const VALUE *ptr = RARRAY_CONST_PTR(loc); /* [ String, Integer ] */ - char *name, *p; - char buf[16]; - size_t len; - int n; - - name = RSTRING_PTR(ptr[0]); - p = strrchr(name, '/'); /* show only the basename of the path. */ - if (p && p[1]) - name = p + 1; - - n = snprintf(buf, sizeof(buf), "%s:%d", name, NUM2INT(ptr[1])); - rb_gc_force_recycle(loc); /* acts as a GC guard, too */ - - len = (size_t)n; - if (len >= sizeof(buf)) { - buf[sizeof(buf)-2] = '*'; - buf[sizeof(buf)-1] = '\0'; - } - SET_CURRENT_THREAD_NAME(buf); - } - } +#if UBF_TIMER == UBF_TIMER_PTHREAD + CLOSE_INVALIDATE_PAIR(timer_pthread.low); #endif } -static void * -thread_timer(void *p) +static void +ubf_timer_pthread_create(rb_pid_t current) { - rb_global_vm_lock_t *gvl = (rb_global_vm_lock_t *)p; +#if UBF_TIMER == UBF_TIMER_PTHREAD + int err; + if (timer_pthread.owner == current) + return; - if (TT_DEBUG) WRITE_CONST(2, "start timer thread\n"); + if (setup_communication_pipe_internal(timer_pthread.low) < 0) + return; -#ifdef SET_CURRENT_THREAD_NAME - SET_CURRENT_THREAD_NAME("ruby-timer-thr"); + err = pthread_create(&timer_pthread.thid, 0, timer_pthread_fn, GET_VM()); + if (!err) + timer_pthread.owner = current; + else + rb_warn("pthread_create failed for timer: %s, signals racy", + strerror(err)); #endif +} -#if !USE_SLEEPY_TIMER_THREAD - native_mutex_initialize(&timer_thread_lock); - native_cond_initialize(&timer_thread_cond, RB_CONDATTR_CLOCK_MONOTONIC); - native_mutex_lock(&timer_thread_lock); -#endif - while (system_working > 0) { +static void +ubf_timer_create(rb_pid_t current) +{ +#if UBF_TIMER == UBF_TIMER_POSIX +# if defined(__sun) +# define UBF_TIMER_CLOCK CLOCK_REALTIME +# else /* Tested Linux and FreeBSD: */ +# define UBF_TIMER_CLOCK CLOCK_MONOTONIC +# endif - /* timer function */ - ubf_wakeup_all_threads(); - timer_thread_function(0); + struct sigevent sev; - if (TT_DEBUG) WRITE_CONST(2, "tick\n"); + sev.sigev_notify = SIGEV_SIGNAL; + sev.sigev_signo = SIGVTALRM; + sev.sigev_value.sival_ptr = &timer_posix; - /* wait */ - timer_thread_sleep(gvl); + if (!timer_create(UBF_TIMER_CLOCK, &sev, &timer_posix.timerid)) { + rb_atomic_t prev = timer_state_exchange(RTIMER_DISARM); + + if (prev != RTIMER_DEAD) { + rb_bug("timer_posix was not dead: %u\n", (unsigned)prev); + } + timer_posix.owner = current; + } + else { + rb_warn("timer_create failed: %s, signals racy", strerror(errno)); } -#if USE_SLEEPY_TIMER_THREAD - CLOSE_INVALIDATE(normal[0]); - CLOSE_INVALIDATE(low[0]); -#else - native_mutex_unlock(&timer_thread_lock); - native_cond_destroy(&timer_thread_cond); - native_mutex_destroy(&timer_thread_lock); #endif - - if (TT_DEBUG) WRITE_CONST(2, "finish timer thread\n"); - return NULL; + if (UBF_TIMER == UBF_TIMER_PTHREAD) + ubf_timer_pthread_create(current); } static void rb_thread_create_timer_thread(void) { - if (!timer_thread.created) { - int err; -#ifdef HAVE_PTHREAD_ATTR_INIT - pthread_attr_t attr; + /* we only create the pipe, and lazy-spawn */ + rb_pid_t current = getpid(); + rb_pid_t owner = signal_self_pipe.owner_process; - err = pthread_attr_init(&attr); - if (err != 0) { - rb_warn("pthread_attr_init failed for timer: %s, scheduling broken", - strerror(err)); - return; + if (owner && owner != current) { + CLOSE_INVALIDATE_PAIR(signal_self_pipe.normal); + CLOSE_INVALIDATE_PAIR(signal_self_pipe.ub_main); + ubf_timer_invalidate(); + } + + if (setup_communication_pipe_internal(signal_self_pipe.normal) < 0) return; + if (setup_communication_pipe_internal(signal_self_pipe.ub_main) < 0) return; + + ubf_timer_create(current); + if (owner != current) { + /* validate pipe on this process */ + sigwait_th = THREAD_INVALID; + signal_self_pipe.owner_process = current; + } +} + +static void +ubf_timer_disarm(void) +{ +#if UBF_TIMER == UBF_TIMER_POSIX + rb_atomic_t prev; + + if (timer_posix.owner && timer_posix.owner != getpid()) return; + prev = timer_state_cas(RTIMER_ARMED, RTIMER_DISARM); + switch (prev) { + case RTIMER_DISARM: return; /* likely */ + case RTIMER_ARMING: return; /* ubf_timer_arm will disarm itself */ + case RTIMER_ARMED: + if (timer_settime(timer_posix.timerid, 0, &zero, 0)) { + int err = errno; + + if (err == EINVAL) { + prev = timer_state_cas(RTIMER_DISARM, RTIMER_DISARM); + + /* main thread may have killed the timer */ + if (prev == RTIMER_DEAD) return; + + rb_bug_errno("timer_settime (disarm)", err); + } } -# ifdef PTHREAD_STACK_MIN - { - const size_t min_size = (4096 * 4); - /* Allocate the machine stack for the timer thread - * at least 16KB (4 pages). FreeBSD 8.2 AMD64 causes - * machine stack overflow only with PTHREAD_STACK_MIN. - */ - size_t stack_size = PTHREAD_STACK_MIN; /* may be dynamic, get only once */ - if (stack_size < min_size) stack_size = min_size; - if (THREAD_DEBUG) stack_size += BUFSIZ; - pthread_attr_setstacksize(&attr, stack_size); - } -# endif + return; + case RTIMER_DEAD: return; /* stay dead */ + default: + rb_bug("UBF_TIMER_POSIX bad state: %u\n", (unsigned)prev); + } + +#elif UBF_TIMER == UBF_TIMER_PTHREAD + ATOMIC_SET(timer_pthread.armed, 0); #endif +} -#if USE_SLEEPY_TIMER_THREAD - err = setup_communication_pipe(); - if (err != 0) { - rb_warn("pipe creation failed for timer: %s, scheduling broken", - strerror(err)); - return; - } -#endif /* USE_SLEEPY_TIMER_THREAD */ +static void +ubf_timer_destroy(void) +{ +#if UBF_TIMER == UBF_TIMER_POSIX + if (timer_posix.owner == getpid()) { + rb_atomic_t expect = RTIMER_DISARM; + size_t i, max = 10000000; + + /* prevent signal handler from arming: */ + for (i = 0; i < max; i++) { + switch (timer_state_cas(expect, RTIMER_DEAD)) { + case RTIMER_DISARM: + if (expect == RTIMER_DISARM) goto done; + expect = RTIMER_DISARM; + break; + case RTIMER_ARMING: + native_thread_yield(); /* let another thread finish arming */ + expect = RTIMER_ARMED; + break; + case RTIMER_ARMED: + if (expect == RTIMER_ARMED) { + if (timer_settime(timer_posix.timerid, 0, &zero, 0)) + rb_bug_errno("timer_settime (destroy)", errno); + goto done; + } + expect = RTIMER_ARMED; + break; + case RTIMER_DEAD: + rb_bug("RTIMER_DEAD unexpected"); + } + } + rb_bug("timed out waiting for timer to arm"); +done: + if (timer_delete(timer_posix.timerid) < 0) + rb_sys_fail("timer_delete"); - /* create timer thread */ - if (timer_thread.created) { - rb_bug("rb_thread_create_timer_thread: Timer thread was already created\n"); - } -#ifdef HAVE_PTHREAD_ATTR_INIT - err = pthread_create(&timer_thread.id, &attr, thread_timer, &GET_VM()->gvl); - pthread_attr_destroy(&attr); -#else - err = pthread_create(&timer_thread.id, NULL, thread_timer, &GET_VM()->gvl); -#endif - if (err != 0) { - rb_warn("pthread_create failed for timer: %s, scheduling broken", - strerror(err)); -#if USE_SLEEPY_TIMER_THREAD - CLOSE_INVALIDATE(normal[0]); - CLOSE_INVALIDATE(normal[1]); - CLOSE_INVALIDATE(low[0]); - CLOSE_INVALIDATE(low[1]); -#endif - return; - } + VM_ASSERT(timer_state_exchange(RTIMER_DEAD) == RTIMER_DEAD); + } +#elif UBF_TIMER == UBF_TIMER_PTHREAD + int err; - /* validate pipe on this process */ - timer_thread_pipe.owner_process = getpid(); - timer_thread.created = 1; + timer_pthread.owner = 0; + ubf_timer_disarm(); + rb_thread_wakeup_timer_thread_fd(timer_pthread.low[1]); + err = pthread_join(timer_pthread.thid, 0); + if (err) { + rb_raise(rb_eThreadError, "native_thread_join() failed (%d)", err); } +#endif } static int @@ -1656,37 +1917,10 @@ native_stop_timer_thread(void) { int stopped; stopped = --system_working <= 0; + if (stopped) + ubf_timer_destroy(); if (TT_DEBUG) fprintf(stderr, "stop timer thread\n"); -#if USE_SLEEPY_TIMER_THREAD - if (stopped) { - /* prevent wakeups from signal handler ASAP */ - timer_thread_pipe.owner_process = 0; - - /* - * however, the above was not enough: the FD may already be - * captured and in the middle of a write while we are running, - * so wait for that to finish: - */ - while (ATOMIC_CAS(timer_thread_pipe.writing, (rb_atomic_t)0, 0)) { - native_thread_yield(); - } - - /* stop writing ends of pipes so timer thread notices EOF */ - CLOSE_INVALIDATE(normal[1]); - CLOSE_INVALIDATE(low[1]); - - /* timer thread will stop looping when system_working <= 0: */ - native_thread_join(timer_thread.id); - - /* timer thread will close the read end on exit: */ - VM_ASSERT(timer_thread_pipe.normal[0] == -1); - VM_ASSERT(timer_thread_pipe.low[0] == -1); - - if (TT_DEBUG) fprintf(stderr, "joined timer thread\n"); - timer_thread.created = 0; - } -#endif return stopped; } @@ -1720,8 +1954,8 @@ ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr) else #endif if (th) { - size = th->machine.stack_maxsize; - base = (char *)th->machine.stack_start - STACK_DIR_UPPER(0, size); + size = th->ec->machine.stack_maxsize; + base = (char *)th->ec->machine.stack_start - STACK_DIR_UPPER(0, size); } else { return 0; @@ -1743,20 +1977,23 @@ ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr) int rb_reserved_fd_p(int fd) { -#if USE_SLEEPY_TIMER_THREAD - if ((fd == timer_thread_pipe.normal[0] || - fd == timer_thread_pipe.normal[1] || - fd == timer_thread_pipe.low[0] || - fd == timer_thread_pipe.low[1]) && - timer_thread_pipe.owner_process == getpid()) { /* async-signal-safe */ + /* no false-positive if out-of-FD at startup */ + if (fd < 0) + return 0; + +#if UBF_TIMER == UBF_TIMER_PTHREAD + if (fd == timer_pthread.low[0] || fd == timer_pthread.low[1]) + goto check_pid; +#endif + if (fd == signal_self_pipe.normal[0] || fd == signal_self_pipe.normal[1]) + goto check_pid; + if (fd == signal_self_pipe.ub_main[0] || fd == signal_self_pipe.ub_main[1]) + goto check_pid; + return 0; +check_pid: + if (signal_self_pipe.owner_process == getpid()) /* async-signal-safe */ return 1; - } - else { - return 0; - } -#else return 0; -#endif } rb_nativethread_id_t @@ -1765,4 +2002,307 @@ rb_nativethread_self(void) return pthread_self(); } +#if USE_MJIT +/* A function that wraps actual worker function, for pthread abstraction. */ +static void * +mjit_worker(void *arg) +{ + void (*worker_func)(void) = (void(*)(void))arg; + +#ifdef SET_CURRENT_THREAD_NAME + SET_CURRENT_THREAD_NAME("ruby-mjitworker"); /* 16 byte including NUL */ +#endif + worker_func(); + return NULL; +} + +/* Launch MJIT thread. Returns FALSE if it fails to create thread. */ +int +rb_thread_create_mjit_thread(void (*worker_func)(void)) +{ + pthread_attr_t attr; + pthread_t worker_pid; + int ret = FALSE; + + if (pthread_attr_init(&attr) != 0) return ret; + + /* jit_worker thread is not to be joined */ + if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) == 0 + && pthread_create(&worker_pid, &attr, mjit_worker, (void *)worker_func) == 0) { + ret = TRUE; + } + pthread_attr_destroy(&attr); + return ret; +} +#endif + +int +rb_sigwait_fd_get(const rb_thread_t *th) +{ + if (signal_self_pipe.normal[0] >= 0) { + VM_ASSERT(signal_self_pipe.owner_process == getpid()); + /* + * no need to keep firing the timer if any thread is sleeping + * on the signal self-pipe + */ + ubf_timer_disarm(); + + if (ATOMIC_PTR_CAS(sigwait_th, THREAD_INVALID, th) == THREAD_INVALID) { + return signal_self_pipe.normal[0]; + } + } + return -1; /* avoid thundering herd and work stealing/starvation */ +} + +void +rb_sigwait_fd_put(const rb_thread_t *th, int fd) +{ + const rb_thread_t *old; + + VM_ASSERT(signal_self_pipe.normal[0] == fd); + old = ATOMIC_PTR_EXCHANGE(sigwait_th, THREAD_INVALID); + if (old != th) assert(old == th); +} + +#ifndef HAVE_PPOLL +/* TODO: don't ignore sigmask */ +static int +ruby_ppoll(struct pollfd *fds, nfds_t nfds, + const struct timespec *ts, const sigset_t *sigmask) +{ + int timeout_ms; + + if (ts) { + int tmp, tmp2; + + if (ts->tv_sec > INT_MAX/1000) + timeout_ms = INT_MAX; + else { + tmp = (int)(ts->tv_sec * 1000); + /* round up 1ns to 1ms to avoid excessive wakeups for <1ms sleep */ + tmp2 = (int)((ts->tv_nsec + 999999L) / (1000L * 1000L)); + if (INT_MAX - tmp < tmp2) + timeout_ms = INT_MAX; + else + timeout_ms = (int)(tmp + tmp2); + } + } + else + timeout_ms = -1; + + return poll(fds, nfds, timeout_ms); +} +# define ppoll(fds,nfds,ts,sigmask) ruby_ppoll((fds),(nfds),(ts),(sigmask)) +#endif + +void +rb_sigwait_sleep(rb_thread_t *th, int sigwait_fd, const rb_hrtime_t *rel) +{ + struct pollfd pfd; + struct timespec ts; + + pfd.fd = sigwait_fd; + pfd.events = POLLIN; + + if (!BUSY_WAIT_SIGNALS && ubf_threads_empty()) { + (void)ppoll(&pfd, 1, rb_hrtime2timespec(&ts, rel), 0); + check_signals_nogvl(th, sigwait_fd); + } + else { + rb_hrtime_t to = RB_HRTIME_MAX, end; + int n = 0; + + if (rel) { + to = *rel; + end = rb_hrtime_add(rb_hrtime_now(), to); + } + /* + * tricky: this needs to return on spurious wakeup (no auto-retry). + * But we also need to distinguish between periodic quantum + * wakeups, so we care about the result of consume_communication_pipe + * + * We want to avoid spurious wakeup for Mutex#sleep compatibility + * [ruby-core:88102] + */ + for (;;) { + const rb_hrtime_t *sto = sigwait_timeout(th, sigwait_fd, &to, &n); + + if (n) return; + n = ppoll(&pfd, 1, rb_hrtime2timespec(&ts, sto), 0); + if (check_signals_nogvl(th, sigwait_fd)) + return; + if (n || (th && RUBY_VM_INTERRUPTED(th->ec))) + return; + if (rel && hrtime_update_expire(&to, end)) + return; + } + } +} + +/* + * we need to guarantee wakeups from native_ppoll_sleep because + * ubf_select may not be going through ubf_list if other threads + * are all sleeping. + */ +static void +ubf_ppoll_sleep(void *ignore) +{ + rb_thread_wakeup_timer_thread_fd(signal_self_pipe.ub_main[1]); +} + +/* + * Single CPU setups benefit from explicit sched_yield() before ppoll(), + * since threads may be too starved to enter the GVL waitqueue for + * us to detect contention. Instead, we want to kick other threads + * so they can run and possibly prevent us from entering slow paths + * in ppoll() or similar syscalls. + * + * Confirmed on FreeBSD 11.2 and Linux 4.19. + * [ruby-core:90417] [Bug #15398] + */ +#define GVL_UNLOCK_BEGIN_YIELD(th) do { \ + const native_thread_data_t *next; \ + rb_global_vm_lock_t *gvl = rb_ractor_gvl(th->ractor); \ + RB_GC_SAVE_MACHINE_CONTEXT(th); \ + rb_native_mutex_lock(&gvl->lock); \ + next = gvl_release_common(gvl); \ + rb_native_mutex_unlock(&gvl->lock); \ + if (!next && rb_ractor_living_thread_num(th->ractor) > 1) { \ + native_thread_yield(); \ + } + +/* + * This function does not exclusively acquire sigwait_fd, so it + * cannot safely read from it. However, it can be woken up in + * 4 ways: + * + * 1) ubf_ppoll_sleep (from another thread) + * 2) rb_thread_wakeup_timer_thread (from signal handler) + * 3) any unmasked signal hitting the process + * 4) periodic ubf timer wakeups (after 3) + */ +static void +native_ppoll_sleep(rb_thread_t *th, rb_hrtime_t *rel) +{ + rb_native_mutex_lock(&th->interrupt_lock); + th->unblock.func = ubf_ppoll_sleep; + rb_native_mutex_unlock(&th->interrupt_lock); + + GVL_UNLOCK_BEGIN_YIELD(th); + + if (!RUBY_VM_INTERRUPTED(th->ec)) { + struct pollfd pfd[2]; + struct timespec ts; + + pfd[0].fd = signal_self_pipe.normal[0]; /* sigwait_fd */ + pfd[1].fd = signal_self_pipe.ub_main[0]; + pfd[0].events = pfd[1].events = POLLIN; + if (ppoll(pfd, 2, rb_hrtime2timespec(&ts, rel), 0) > 0) { + if (pfd[1].revents & POLLIN) { + (void)consume_communication_pipe(pfd[1].fd); + } + } + /* + * do not read the sigwait_fd, here, let uplevel callers + * or other threads that, otherwise we may steal and starve + * other threads + */ + } + unblock_function_clear(th); + GVL_UNLOCK_END(th); +} + +static void +native_sleep(rb_thread_t *th, rb_hrtime_t *rel) +{ + int sigwait_fd = rb_sigwait_fd_get(th); + rb_ractor_blocking_threads_inc(th->ractor, __FILE__, __LINE__); + + if (sigwait_fd >= 0) { + rb_native_mutex_lock(&th->interrupt_lock); + th->unblock.func = ubf_sigwait; + rb_native_mutex_unlock(&th->interrupt_lock); + + GVL_UNLOCK_BEGIN_YIELD(th); + + if (!RUBY_VM_INTERRUPTED(th->ec)) { + rb_sigwait_sleep(th, sigwait_fd, rel); + } + else { + check_signals_nogvl(th, sigwait_fd); + } + unblock_function_clear(th); + GVL_UNLOCK_END(th); + rb_sigwait_fd_put(th, sigwait_fd); + rb_sigwait_fd_migrate(th->vm); + } + else if (th == th->vm->ractor.main_thread) { /* always able to handle signals */ + native_ppoll_sleep(th, rel); + } + else { + native_cond_sleep(th, rel); + } + + rb_ractor_blocking_threads_dec(th->ractor, __FILE__, __LINE__); +} + +#if UBF_TIMER == UBF_TIMER_PTHREAD +static void * +timer_pthread_fn(void *p) +{ + rb_vm_t *vm = p; + pthread_t main_thread_id = vm->ractor.main_thread->thread_id; + struct pollfd pfd; + int timeout = -1; + int ccp; + + pfd.fd = timer_pthread.low[0]; + pfd.events = POLLIN; + + while (system_working > 0) { + (void)poll(&pfd, 1, timeout); + ccp = consume_communication_pipe(pfd.fd); + + if (system_working > 0) { + if (ATOMIC_CAS(timer_pthread.armed, 1, 1)) { + pthread_kill(main_thread_id, SIGVTALRM); + + if (rb_signal_buff_size() || !ubf_threads_empty()) { + timeout = TIME_QUANTUM_MSEC; + } + else { + ATOMIC_SET(timer_pthread.armed, 0); + timeout = -1; + } + } + else if (ccp) { + pthread_kill(main_thread_id, SIGVTALRM); + ATOMIC_SET(timer_pthread.armed, 0); + timeout = -1; + } + } + } + + return 0; +} +#endif /* UBF_TIMER_PTHREAD */ + +static VALUE +ubf_caller(void *ignore) +{ + rb_thread_sleep_forever(); + + return Qfalse; +} + +/* + * Called if and only if one thread is running, and + * the unblock function is NOT async-signal-safe + * This assumes USE_THREAD_CACHE is true for performance reasons + */ +static VALUE +rb_thread_start_unblock_thread(void) +{ + return rb_thread_create(ubf_caller, 0); +} #endif /* THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION */ |
