summaryrefslogtreecommitdiff
path: root/thread_pthread.c
diff options
context:
space:
mode:
Diffstat (limited to 'thread_pthread.c')
-rw-r--r--thread_pthread.c2104
1 files changed, 1322 insertions, 782 deletions
diff --git a/thread_pthread.c b/thread_pthread.c
index 5295a52f28..6f0cc3d54c 100644
--- a/thread_pthread.c
+++ b/thread_pthread.c
@@ -12,6 +12,7 @@
#ifdef THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION
#include "gc.h"
+#include "mjit.h"
#ifdef HAVE_SYS_RESOURCE_H
#include <sys/resource.h>
@@ -19,174 +20,399 @@
#ifdef HAVE_THR_STKSEGMENT
#include <thread.h>
#endif
-#if HAVE_FCNTL_H
+#if defined(HAVE_FCNTL_H)
#include <fcntl.h>
-#elif HAVE_SYS_FCNTL_H
+#elif defined(HAVE_SYS_FCNTL_H)
#include <sys/fcntl.h>
#endif
#ifdef HAVE_SYS_PRCTL_H
#include <sys/prctl.h>
#endif
-#if defined(__native_client__) && defined(NACL_NEWLIB)
-# include "nacl/select.h"
-#endif
#if defined(HAVE_SYS_TIME_H)
#include <sys/time.h>
#endif
#if defined(__HAIKU__)
#include <kernel/OS.h>
#endif
+#ifdef __linux__
+#include <sys/syscall.h> /* for SYS_gettid */
+#endif
+#include <time.h>
+#include <signal.h>
+
+#if defined(HAVE_SYS_EVENTFD_H) && defined(HAVE_EVENTFD)
+# define USE_EVENTFD (1)
+# include <sys/eventfd.h>
+#else
+# define USE_EVENTFD (0)
+#endif
+
+#if defined(SIGVTALRM) && !defined(__CYGWIN__) && !defined(__EMSCRIPTEN__)
+# define USE_UBF_LIST 1
+#endif
+
+/*
+ * UBF_TIMER and ubf_list both use SIGVTALRM.
+ *
+ * UBF_TIMER has NOTHING to do with thread timeslices (TIMER_INTERRUPT_MASK)
+ *
+ * UBF_TIMER is to close TOCTTOU signal race on programs where we
+ * cannot rely on GVL contention (vm->gvl.timer) to perform wakeups
+ * while a thread is doing blocking I/O on sockets or pipes. With
+ * rb_thread_call_without_gvl and similar functions:
+ *
+ * (1) Check interrupts.
+ * (2) release GVL.
+ * (2a) signal received
+ * (3) call func with data1 (blocks for a long time without ubf_timer)
+ * (4) acquire GVL.
+ * Other Ruby threads can not run in parallel any more.
+ * (5) Check interrupts.
+ *
+ * We need UBF_TIMER to break out of (3) if (2a) happens.
+ *
+ * ubf_list wakeups may be triggered on gvl_yield.
+ *
+ * If we have vm->gvl.timer (on GVL contention), we don't need UBF_TIMER
+ * as it can perform the same tasks while doing timeslices.
+ */
+#define UBF_TIMER_NONE 0
+#define UBF_TIMER_POSIX 1
+#define UBF_TIMER_PTHREAD 2
+
+#ifndef UBF_TIMER
+# if defined(HAVE_TIMER_SETTIME) && defined(HAVE_TIMER_CREATE) && \
+ defined(CLOCK_MONOTONIC) && defined(USE_UBF_LIST)
+ /* preferred */
+# define UBF_TIMER UBF_TIMER_POSIX
+# elif defined(USE_UBF_LIST)
+ /* safe, but inefficient */
+# define UBF_TIMER UBF_TIMER_PTHREAD
+# else
+ /* we'll be racy without SIGVTALRM for ubf_list */
+# define UBF_TIMER UBF_TIMER_NONE
+# endif
+#endif
+
+enum rtimer_state {
+ /* alive, after timer_create: */
+ RTIMER_DISARM,
+ RTIMER_ARMING,
+ RTIMER_ARMED,
+
+ RTIMER_DEAD
+};
-static void native_mutex_lock(rb_nativethread_lock_t *lock);
-static void native_mutex_unlock(rb_nativethread_lock_t *lock);
-static int native_mutex_trylock(rb_nativethread_lock_t *lock);
-static void native_mutex_initialize(rb_nativethread_lock_t *lock);
-static void native_mutex_destroy(rb_nativethread_lock_t *lock);
-static void native_cond_signal(rb_nativethread_cond_t *cond);
-static void native_cond_broadcast(rb_nativethread_cond_t *cond);
-static void native_cond_wait(rb_nativethread_cond_t *cond, rb_nativethread_lock_t *mutex);
-static void native_cond_initialize(rb_nativethread_cond_t *cond, int flags);
-static void native_cond_destroy(rb_nativethread_cond_t *cond);
-static void rb_thread_wakeup_timer_thread_low(void);
+#if UBF_TIMER == UBF_TIMER_POSIX
+static const struct itimerspec zero;
static struct {
- pthread_t id;
- int created;
-} timer_thread;
-#define TIMER_THREAD_CREATED_P() (timer_thread.created != 0)
+ rb_atomic_t state_; /* rtimer_state */
+ rb_pid_t owner;
+ timer_t timerid;
+} timer_posix = {
+ /* .state = */ RTIMER_DEAD,
+};
-#define RB_CONDATTR_CLOCK_MONOTONIC 1
+#define TIMER_STATE_DEBUG 0
-#if defined(HAVE_PTHREAD_CONDATTR_SETCLOCK) && defined(HAVE_CLOCKID_T) && \
- defined(CLOCK_REALTIME) && defined(CLOCK_MONOTONIC) && \
- defined(HAVE_CLOCK_GETTIME) && defined(HAVE_PTHREAD_CONDATTR_INIT)
-#define USE_MONOTONIC_COND 1
+static const char *
+rtimer_state_name(enum rtimer_state state)
+{
+ switch (state) {
+ case RTIMER_DISARM: return "disarm";
+ case RTIMER_ARMING: return "arming";
+ case RTIMER_ARMED: return "armed";
+ case RTIMER_DEAD: return "dead";
+ default: rb_bug("unreachable");
+ }
+}
+
+static enum rtimer_state
+timer_state_exchange(enum rtimer_state state)
+{
+ enum rtimer_state prev = ATOMIC_EXCHANGE(timer_posix.state_, state);
+ if (TIMER_STATE_DEBUG) fprintf(stderr, "state (exc): %s->%s\n", rtimer_state_name(prev), rtimer_state_name(state));
+ return prev;
+}
+
+static enum rtimer_state
+timer_state_cas(enum rtimer_state expected_prev, enum rtimer_state state)
+{
+ enum rtimer_state prev = ATOMIC_CAS(timer_posix.state_, expected_prev, state);
+
+ if (TIMER_STATE_DEBUG) {
+ if (prev == expected_prev) {
+ fprintf(stderr, "state (cas): %s->%s\n", rtimer_state_name(prev), rtimer_state_name(state));
+ }
+ else {
+ fprintf(stderr, "state (cas): %s (expected:%s)\n", rtimer_state_name(prev), rtimer_state_name(expected_prev));
+ }
+ }
+
+ return prev;
+}
+
+#elif UBF_TIMER == UBF_TIMER_PTHREAD
+static void *timer_pthread_fn(void *);
+static struct {
+ int low[2];
+ rb_atomic_t armed; /* boolean */
+ rb_pid_t owner;
+ pthread_t thid;
+} timer_pthread = {
+ { -1, -1 },
+};
+#endif
+
+static const rb_hrtime_t *sigwait_timeout(rb_thread_t *, int sigwait_fd,
+ const rb_hrtime_t *,
+ int *drained_p);
+static void ubf_timer_disarm(void);
+static void threadptr_trap_interrupt(rb_thread_t *);
+static void clear_thread_cache_altstack(void);
+static void ubf_wakeup_all_threads(void);
+static int ubf_threads_empty(void);
+
+#define TIMER_THREAD_CREATED_P() (signal_self_pipe.owner_process == getpid())
+
+/* for testing, and in case we come across a platform w/o pipes: */
+#define BUSY_WAIT_SIGNALS (0)
+
+/*
+ * sigwait_th is the thread which owns sigwait_fd and sleeps on it
+ * (using ppoll). MJIT worker can be sigwait_th==0, so we initialize
+ * it to THREAD_INVALID at startup and fork time. It is the ONLY thread
+ * allowed to read from sigwait_fd, otherwise starvation can occur.
+ */
+#define THREAD_INVALID ((const rb_thread_t *)-1)
+static const rb_thread_t *sigwait_th;
+
+#ifdef HAVE_SCHED_YIELD
+#define native_thread_yield() (void)sched_yield()
#else
-#define USE_MONOTONIC_COND 0
+#define native_thread_yield() ((void)0)
#endif
-#if defined(HAVE_POLL) && defined(HAVE_FCNTL) && defined(F_GETFL) && defined(F_SETFL) && defined(O_NONBLOCK) && !defined(__native_client__)
-/* The timer thread sleeps while only one Ruby thread is running. */
-# define USE_SLEEPY_TIMER_THREAD 1
+#if defined(HAVE_PTHREAD_CONDATTR_SETCLOCK) && \
+ defined(CLOCK_REALTIME) && defined(CLOCK_MONOTONIC) && \
+ defined(HAVE_CLOCK_GETTIME)
+static pthread_condattr_t condattr_mono;
+static pthread_condattr_t *condattr_monotonic = &condattr_mono;
#else
-# define USE_SLEEPY_TIMER_THREAD 0
+static const void *const condattr_monotonic = NULL;
#endif
+/* 100ms. 10ms is too small for user level thread scheduling
+ * on recent Linux (tested on 2.6.35)
+ */
+#define TIME_QUANTUM_MSEC (100)
+#define TIME_QUANTUM_USEC (TIME_QUANTUM_MSEC * 1000)
+#define TIME_QUANTUM_NSEC (TIME_QUANTUM_USEC * 1000)
+
+static rb_hrtime_t native_cond_timeout(rb_nativethread_cond_t *, rb_hrtime_t);
+static int native_cond_timedwait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex, const rb_hrtime_t *abs);
+
+/*
+ * Designate the next gvl.timer thread, favor the last thread in
+ * the waitq since it will be in waitq longest
+ */
+static int
+designate_timer_thread(rb_global_vm_lock_t *gvl)
+{
+ native_thread_data_t *last;
+
+ last = list_tail(&gvl->waitq, native_thread_data_t, node.ubf);
+ if (last) {
+ rb_native_cond_signal(&last->cond.gvlq);
+ return TRUE;
+ }
+ return FALSE;
+}
+
+/*
+ * We become designated timer thread to kick vm->gvl.owner
+ * periodically. Continue on old timeout if it expired.
+ */
static void
-gvl_acquire_common(rb_vm_t *vm)
+do_gvl_timer(rb_global_vm_lock_t *gvl, rb_thread_t *th)
{
- if (vm->gvl.acquired) {
+ rb_vm_t *vm = GET_VM();
+ static rb_hrtime_t abs;
+ native_thread_data_t *nd = &th->native_thread_data;
- vm->gvl.waiting++;
- if (vm->gvl.waiting == 1) {
- /*
- * Wake up timer thread iff timer thread is slept.
- * When timer thread is polling mode, we don't want to
- * make confusing timer thread interval time.
- */
- rb_thread_wakeup_timer_thread_low();
- }
+ gvl->timer = th;
- while (vm->gvl.acquired) {
- native_cond_wait(&vm->gvl.cond, &vm->gvl.lock);
- }
+ /* take over wakeups from UBF_TIMER */
+ ubf_timer_disarm();
+
+ if (gvl->timer_err == ETIMEDOUT) {
+ abs = native_cond_timeout(&nd->cond.gvlq, TIME_QUANTUM_NSEC);
+ }
+ gvl->timer_err = native_cond_timedwait(&nd->cond.gvlq, &gvl->lock, &abs);
- vm->gvl.waiting--;
+ ubf_wakeup_all_threads();
+ ruby_sigchld_handler(vm);
- if (vm->gvl.need_yield) {
- vm->gvl.need_yield = 0;
- native_cond_signal(&vm->gvl.switch_cond);
- }
+ if (UNLIKELY(rb_signal_buff_size())) {
+ if (th == vm->ractor.main_thread) {
+ RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
+ }
+ else {
+ threadptr_trap_interrupt(vm->ractor.main_thread);
+ }
}
- vm->gvl.acquired = 1;
+ /*
+ * Timeslice. Warning: the process may fork while this
+ * thread is contending for GVL:
+ */
+ if (gvl->owner) {
+ // strictly speaking, accessing "gvl->owner" is not thread-safe
+ RUBY_VM_SET_TIMER_INTERRUPT(gvl->owner->ec);
+ }
+ gvl->timer = 0;
}
static void
-gvl_acquire(rb_vm_t *vm, rb_thread_t *th)
+gvl_acquire_common(rb_global_vm_lock_t *gvl, rb_thread_t *th)
{
- native_mutex_lock(&vm->gvl.lock);
- gvl_acquire_common(vm);
- native_mutex_unlock(&vm->gvl.lock);
+ if (gvl->owner) {
+ native_thread_data_t *nd = &th->native_thread_data;
+
+ VM_ASSERT(th->unblock.func == 0 &&
+ "we must not be in ubf_list and GVL waitq at the same time");
+
+ list_add_tail(&gvl->waitq, &nd->node.gvl);
+
+ do {
+ if (!gvl->timer) {
+ do_gvl_timer(gvl, th);
+ }
+ else {
+ rb_native_cond_wait(&nd->cond.gvlq, &gvl->lock);
+ }
+ } while (gvl->owner);
+
+ list_del_init(&nd->node.gvl);
+
+ if (gvl->need_yield) {
+ gvl->need_yield = 0;
+ rb_native_cond_signal(&gvl->switch_cond);
+ }
+ }
+ else { /* reset timer if uncontended */
+ gvl->timer_err = ETIMEDOUT;
+ }
+ gvl->owner = th;
+ if (!gvl->timer) {
+ if (!designate_timer_thread(gvl) && !ubf_threads_empty()) {
+ rb_thread_wakeup_timer_thread(-1);
+ }
+ }
}
static void
-gvl_release_common(rb_vm_t *vm)
+gvl_acquire(rb_global_vm_lock_t *gvl, rb_thread_t *th)
+{
+ rb_native_mutex_lock(&gvl->lock);
+ gvl_acquire_common(gvl, th);
+ rb_native_mutex_unlock(&gvl->lock);
+}
+
+static const native_thread_data_t *
+gvl_release_common(rb_global_vm_lock_t *gvl)
{
- vm->gvl.acquired = 0;
- if (vm->gvl.waiting > 0)
- native_cond_signal(&vm->gvl.cond);
+ native_thread_data_t *next;
+ gvl->owner = 0;
+ next = list_top(&gvl->waitq, native_thread_data_t, node.ubf);
+ if (next) rb_native_cond_signal(&next->cond.gvlq);
+
+ return next;
}
static void
-gvl_release(rb_vm_t *vm)
+gvl_release(rb_global_vm_lock_t *gvl)
{
- native_mutex_lock(&vm->gvl.lock);
- gvl_release_common(vm);
- native_mutex_unlock(&vm->gvl.lock);
+ rb_native_mutex_lock(&gvl->lock);
+ gvl_release_common(gvl);
+ rb_native_mutex_unlock(&gvl->lock);
}
static void
-gvl_yield(rb_vm_t *vm, rb_thread_t *th)
+gvl_yield(rb_global_vm_lock_t *gvl, rb_thread_t *th)
{
- native_mutex_lock(&vm->gvl.lock);
+ const native_thread_data_t *next;
- gvl_release_common(vm);
+ /*
+ * Perhaps other threads are stuck in blocking region w/o GVL, too,
+ * (perhaps looping in io_close_fptr) so we kick them:
+ */
+ ubf_wakeup_all_threads();
+ rb_native_mutex_lock(&gvl->lock);
+ next = gvl_release_common(gvl);
/* An another thread is processing GVL yield. */
- if (UNLIKELY(vm->gvl.wait_yield)) {
- while (vm->gvl.wait_yield)
- native_cond_wait(&vm->gvl.switch_wait_cond, &vm->gvl.lock);
- goto acquire;
- }
-
- if (vm->gvl.waiting > 0) {
- /* Wait until another thread task take GVL. */
- vm->gvl.need_yield = 1;
- vm->gvl.wait_yield = 1;
- while (vm->gvl.need_yield)
- native_cond_wait(&vm->gvl.switch_cond, &vm->gvl.lock);
- vm->gvl.wait_yield = 0;
+ if (UNLIKELY(gvl->wait_yield)) {
+ while (gvl->wait_yield)
+ rb_native_cond_wait(&gvl->switch_wait_cond, &gvl->lock);
+ }
+ else if (next) {
+ /* Wait until another thread task takes GVL. */
+ gvl->need_yield = 1;
+ gvl->wait_yield = 1;
+ while (gvl->need_yield)
+ rb_native_cond_wait(&gvl->switch_cond, &gvl->lock);
+ gvl->wait_yield = 0;
+ rb_native_cond_broadcast(&gvl->switch_wait_cond);
}
else {
- native_mutex_unlock(&vm->gvl.lock);
- sched_yield();
- native_mutex_lock(&vm->gvl.lock);
+ rb_native_mutex_unlock(&gvl->lock);
+ native_thread_yield();
+ rb_native_mutex_lock(&gvl->lock);
+ rb_native_cond_broadcast(&gvl->switch_wait_cond);
}
-
- native_cond_broadcast(&vm->gvl.switch_wait_cond);
- acquire:
- gvl_acquire_common(vm);
- native_mutex_unlock(&vm->gvl.lock);
+ gvl_acquire_common(gvl, th);
+ rb_native_mutex_unlock(&gvl->lock);
}
-static void
-gvl_init(rb_vm_t *vm)
+void
+rb_gvl_init(rb_global_vm_lock_t *gvl)
{
- native_mutex_initialize(&vm->gvl.lock);
- native_cond_initialize(&vm->gvl.cond, RB_CONDATTR_CLOCK_MONOTONIC);
- native_cond_initialize(&vm->gvl.switch_cond, RB_CONDATTR_CLOCK_MONOTONIC);
- native_cond_initialize(&vm->gvl.switch_wait_cond, RB_CONDATTR_CLOCK_MONOTONIC);
- vm->gvl.acquired = 0;
- vm->gvl.waiting = 0;
- vm->gvl.need_yield = 0;
- vm->gvl.wait_yield = 0;
+ rb_native_mutex_initialize(&gvl->lock);
+ rb_native_cond_initialize(&gvl->switch_cond);
+ rb_native_cond_initialize(&gvl->switch_wait_cond);
+ list_head_init(&gvl->waitq);
+ gvl->owner = 0;
+ gvl->timer = 0;
+ gvl->timer_err = ETIMEDOUT;
+ gvl->need_yield = 0;
+ gvl->wait_yield = 0;
}
static void
-gvl_destroy(rb_vm_t *vm)
+gvl_destroy(rb_global_vm_lock_t *gvl)
{
- native_cond_destroy(&vm->gvl.switch_wait_cond);
- native_cond_destroy(&vm->gvl.switch_cond);
- native_cond_destroy(&vm->gvl.cond);
- native_mutex_destroy(&vm->gvl.lock);
+ /*
+ * only called once at VM shutdown (not atfork), another thread
+ * may still grab vm->gvl.lock when calling gvl_release at
+ * the end of thread_start_func_2
+ */
+ if (0) {
+ rb_native_cond_destroy(&gvl->switch_wait_cond);
+ rb_native_cond_destroy(&gvl->switch_cond);
+ rb_native_mutex_destroy(&gvl->lock);
+ }
+ clear_thread_cache_altstack();
}
#if defined(HAVE_WORKING_FORK)
+static void thread_cache_reset(void);
static void
-gvl_atfork(rb_vm_t *vm)
+gvl_atfork(rb_global_vm_lock_t *gvl)
{
- gvl_init(vm);
- gvl_acquire(vm, GET_THREAD());
+ thread_cache_reset();
+ rb_gvl_init(gvl);
+ gvl_acquire(gvl, GET_THREAD());
}
#endif
@@ -205,8 +431,8 @@ mutex_debug(const char *msg, void *lock)
}
}
-static void
-native_mutex_lock(pthread_mutex_t *lock)
+void
+rb_native_mutex_lock(pthread_mutex_t *lock)
{
int r;
mutex_debug("lock", lock);
@@ -215,8 +441,8 @@ native_mutex_lock(pthread_mutex_t *lock)
}
}
-static void
-native_mutex_unlock(pthread_mutex_t *lock)
+void
+rb_native_mutex_unlock(pthread_mutex_t *lock)
{
int r;
mutex_debug("unlock", lock);
@@ -225,8 +451,8 @@ native_mutex_unlock(pthread_mutex_t *lock)
}
}
-static inline int
-native_mutex_trylock(pthread_mutex_t *lock)
+int
+rb_native_mutex_trylock(pthread_mutex_t *lock)
{
int r;
mutex_debug("trylock", lock);
@@ -241,8 +467,8 @@ native_mutex_trylock(pthread_mutex_t *lock)
return 0;
}
-static void
-native_mutex_initialize(pthread_mutex_t *lock)
+void
+rb_native_mutex_initialize(pthread_mutex_t *lock)
{
int r = pthread_mutex_init(lock, 0);
mutex_debug("init", lock);
@@ -251,8 +477,8 @@ native_mutex_initialize(pthread_mutex_t *lock)
}
}
-static void
-native_mutex_destroy(pthread_mutex_t *lock)
+void
+rb_native_mutex_destroy(pthread_mutex_t *lock)
{
int r = pthread_mutex_destroy(lock);
mutex_debug("destroy", lock);
@@ -261,46 +487,22 @@ native_mutex_destroy(pthread_mutex_t *lock)
}
}
-static void
-native_cond_initialize(rb_nativethread_cond_t *cond, int flags)
+void
+rb_native_cond_initialize(rb_nativethread_cond_t *cond)
{
-#ifdef HAVE_PTHREAD_COND_INIT
- int r;
-# if USE_MONOTONIC_COND
- pthread_condattr_t attr;
-
- pthread_condattr_init(&attr);
-
- cond->clockid = CLOCK_REALTIME;
- if (flags & RB_CONDATTR_CLOCK_MONOTONIC) {
- r = pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
- if (r == 0) {
- cond->clockid = CLOCK_MONOTONIC;
- }
- }
-
- r = pthread_cond_init(&cond->cond, &attr);
- pthread_condattr_destroy(&attr);
-# else
- r = pthread_cond_init(&cond->cond, NULL);
-# endif
+ int r = pthread_cond_init(cond, condattr_monotonic);
if (r != 0) {
rb_bug_errno("pthread_cond_init", r);
}
-
- return;
-#endif
}
-static void
-native_cond_destroy(rb_nativethread_cond_t *cond)
+void
+rb_native_cond_destroy(rb_nativethread_cond_t *cond)
{
-#ifdef HAVE_PTHREAD_COND_INIT
- int r = pthread_cond_destroy(&cond->cond);
+ int r = pthread_cond_destroy(cond);
if (r != 0) {
rb_bug_errno("pthread_cond_destroy", r);
}
-#endif
}
/*
@@ -309,47 +511,48 @@ native_cond_destroy(rb_nativethread_cond_t *cond)
*
* http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c
*
- * The following native_cond_signal and native_cond_broadcast functions
+ * The following rb_native_cond_signal and rb_native_cond_broadcast functions
* need to retrying until pthread functions don't return EAGAIN.
*/
-static void
-native_cond_signal(rb_nativethread_cond_t *cond)
+void
+rb_native_cond_signal(rb_nativethread_cond_t *cond)
{
int r;
do {
- r = pthread_cond_signal(&cond->cond);
+ r = pthread_cond_signal(cond);
} while (r == EAGAIN);
if (r != 0) {
rb_bug_errno("pthread_cond_signal", r);
}
}
-static void
-native_cond_broadcast(rb_nativethread_cond_t *cond)
+void
+rb_native_cond_broadcast(rb_nativethread_cond_t *cond)
{
int r;
do {
- r = pthread_cond_broadcast(&cond->cond);
+ r = pthread_cond_broadcast(cond);
} while (r == EAGAIN);
if (r != 0) {
- rb_bug_errno("native_cond_broadcast", r);
+ rb_bug_errno("rb_native_cond_broadcast", r);
}
}
-static void
-native_cond_wait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex)
+void
+rb_native_cond_wait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex)
{
- int r = pthread_cond_wait(&cond->cond, mutex);
+ int r = pthread_cond_wait(cond, mutex);
if (r != 0) {
rb_bug_errno("pthread_cond_wait", r);
}
}
static int
-native_cond_timedwait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *ts)
+native_cond_timedwait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex, const rb_hrtime_t *abs)
{
int r;
+ struct timespec ts;
/*
* An old Linux may return EINTR. Even though POSIX says
@@ -358,75 +561,46 @@ native_cond_timedwait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex, cons
* Let's hide it from arch generic code.
*/
do {
- r = pthread_cond_timedwait(&cond->cond, mutex, ts);
+ rb_hrtime2timespec(&ts, abs);
+ r = pthread_cond_timedwait(cond, mutex, &ts);
} while (r == EINTR);
if (r != 0 && r != ETIMEDOUT) {
- rb_bug_errno("pthread_cond_timedwait", r);
+ rb_bug_errno("pthread_cond_timedwait", r);
}
return r;
}
-static struct timespec
-native_cond_timeout(rb_nativethread_cond_t *cond, struct timespec timeout_rel)
+void
+rb_native_cond_timedwait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex, unsigned long msec)
{
- int ret;
- struct timeval tv;
- struct timespec timeout;
- struct timespec now;
+ rb_hrtime_t hrmsec = native_cond_timeout(cond, RB_HRTIME_PER_MSEC * msec);
+ native_cond_timedwait(cond, mutex, &hrmsec);
+}
-#if USE_MONOTONIC_COND
- if (cond->clockid == CLOCK_MONOTONIC) {
- ret = clock_gettime(cond->clockid, &now);
- if (ret != 0)
- rb_sys_fail("clock_gettime()");
- goto out;
+static rb_hrtime_t
+native_cond_timeout(rb_nativethread_cond_t *cond, const rb_hrtime_t rel)
+{
+ if (condattr_monotonic) {
+ return rb_hrtime_add(rb_hrtime_now(), rel);
}
+ else {
+ struct timespec ts;
- if (cond->clockid != CLOCK_REALTIME)
- rb_bug("unsupported clockid %"PRIdVALUE, (SIGNED_VALUE)cond->clockid);
-#endif
-
- ret = gettimeofday(&tv, 0);
- if (ret != 0)
- rb_sys_fail(0);
- now.tv_sec = tv.tv_sec;
- now.tv_nsec = tv.tv_usec * 1000;
-
-#if USE_MONOTONIC_COND
- out:
-#endif
- timeout.tv_sec = now.tv_sec;
- timeout.tv_nsec = now.tv_nsec;
- timeout.tv_sec += timeout_rel.tv_sec;
- timeout.tv_nsec += timeout_rel.tv_nsec;
-
- if (timeout.tv_nsec >= 1000*1000*1000) {
- timeout.tv_sec++;
- timeout.tv_nsec -= 1000*1000*1000;
+ rb_timespec_now(&ts);
+ return rb_hrtime_add(rb_timespec2hrtime(&ts), rel);
}
-
- if (timeout.tv_sec < now.tv_sec)
- timeout.tv_sec = TIMET_MAX;
-
- return timeout;
}
#define native_cleanup_push pthread_cleanup_push
#define native_cleanup_pop pthread_cleanup_pop
-#ifdef HAVE_SCHED_YIELD
-#define native_thread_yield() (void)sched_yield()
-#else
-#define native_thread_yield() ((void)0)
-#endif
-
-#if defined(SIGVTALRM) && !defined(__CYGWIN__)
-#define USE_UBF_LIST 1
-static rb_nativethread_lock_t ubf_list_lock;
-#endif
+#ifdef RB_THREAD_LOCAL_SPECIFIER
+static RB_THREAD_LOCAL_SPECIFIER rb_thread_t *ruby_native_thread;
+#else
static pthread_key_t ruby_native_thread_key;
+#endif
static void
null_func(int i)
@@ -434,61 +608,111 @@ null_func(int i)
/* null */
}
-static rb_thread_t *
+rb_thread_t *
ruby_thread_from_native(void)
{
+#ifdef RB_THREAD_LOCAL_SPECIFIER
+ return ruby_native_thread;
+#else
return pthread_getspecific(ruby_native_thread_key);
+#endif
}
-static int
+int
ruby_thread_set_native(rb_thread_t *th)
{
+ if (th && th->ec) {
+ rb_ractor_set_current_ec(th->ractor, th->ec);
+ }
+#ifdef RB_THREAD_LOCAL_SPECIFIER
+ ruby_native_thread = th;
+ return 1;
+#else
return pthread_setspecific(ruby_native_thread_key, th) == 0;
+#endif
}
static void native_thread_init(rb_thread_t *th);
void
-Init_native_thread(void)
+Init_native_thread(rb_thread_t *th)
{
- rb_thread_t *th = GET_THREAD();
+#if defined(HAVE_PTHREAD_CONDATTR_SETCLOCK)
+ if (condattr_monotonic) {
+ int r = pthread_condattr_init(condattr_monotonic);
+ if (r == 0) {
+ r = pthread_condattr_setclock(condattr_monotonic, CLOCK_MONOTONIC);
+ }
+ if (r) condattr_monotonic = NULL;
+ }
+#endif
- pthread_key_create(&ruby_native_thread_key, NULL);
+#ifndef RB_THREAD_LOCAL_SPECIFIER
+ if (pthread_key_create(&ruby_native_thread_key, 0) == EAGAIN) {
+ rb_bug("pthread_key_create failed (ruby_native_thread_key)");
+ }
+ if (pthread_key_create(&ruby_current_ec_key, 0) == EAGAIN) {
+ rb_bug("pthread_key_create failed (ruby_current_ec_key)");
+ }
+#endif
th->thread_id = pthread_self();
+ ruby_thread_set_native(th);
fill_thread_id_str(th);
native_thread_init(th);
-#ifdef USE_UBF_LIST
- native_mutex_initialize(&ubf_list_lock);
-#endif
-#ifndef __native_client__
posix_signal(SIGVTALRM, null_func);
+}
+
+#ifdef RB_THREAD_T_HAS_NATIVE_ID
+static int
+get_native_thread_id(void)
+{
+#ifdef __linux__
+ return (int)syscall(SYS_gettid);
+#elif defined(__FreeBSD__)
+ return pthread_getthreadid_np();
#endif
}
+#endif
static void
native_thread_init(rb_thread_t *th)
{
native_thread_data_t *nd = &th->native_thread_data;
+#ifdef RB_THREAD_T_HAS_NATIVE_ID
+ th->tid = get_native_thread_id();
+#endif
#ifdef USE_UBF_LIST
- list_node_init(&nd->ubf_list);
+ list_node_init(&nd->node.ubf);
#endif
- native_cond_initialize(&nd->sleep_cond, RB_CONDATTR_CLOCK_MONOTONIC);
- ruby_thread_set_native(th);
+ rb_native_cond_initialize(&nd->cond.gvlq);
+ if (&nd->cond.gvlq != &nd->cond.intr)
+ rb_native_cond_initialize(&nd->cond.intr);
}
+#ifndef USE_THREAD_CACHE
+#define USE_THREAD_CACHE 1
+#endif
+
static void
native_thread_destroy(rb_thread_t *th)
{
- native_cond_destroy(&th->native_thread_data.sleep_cond);
-}
+ native_thread_data_t *nd = &th->native_thread_data;
-#ifndef USE_THREAD_CACHE
-#define USE_THREAD_CACHE 0
-#endif
+ rb_native_cond_destroy(&nd->cond.gvlq);
+ if (&nd->cond.gvlq != &nd->cond.intr)
+ rb_native_cond_destroy(&nd->cond.intr);
+
+ /*
+ * prevent false positive from ruby_thread_has_gvl_p if that
+ * gets called from an interposing function wrapper
+ */
+ if (USE_THREAD_CACHE)
+ ruby_thread_set_native(0);
+}
#if USE_THREAD_CACHE
-static rb_thread_t *register_cached_thread_and_wait(void);
+static rb_thread_t *register_cached_thread_and_wait(void *);
#endif
#if defined HAVE_PTHREAD_GETATTR_NP || defined HAVE_PTHREAD_ATTR_GET_NP
@@ -505,51 +729,6 @@ size_t pthread_get_stacksize_np(pthread_t);
#define STACKADDR_AVAILABLE 1
#elif defined __HAIKU__
#define STACKADDR_AVAILABLE 1
-#elif defined __ia64 && defined _HPUX_SOURCE
-#include <sys/dyntune.h>
-
-#define STACKADDR_AVAILABLE 1
-
-/*
- * Do not lower the thread's stack to PTHREAD_STACK_MIN,
- * otherwise one would receive a 'sendsig: useracc failed.'
- * and a coredump.
- */
-#undef PTHREAD_STACK_MIN
-
-#define HAVE_PTHREAD_ATTR_GET_NP 1
-#undef HAVE_PTHREAD_ATTR_GETSTACK
-
-/*
- * As the PTHREAD_STACK_MIN is undefined and
- * no one touches the default stacksize,
- * it is just fine to use the default.
- */
-#define pthread_attr_get_np(thid, attr) 0
-
-/*
- * Using value of sp is very rough... To make it more real,
- * addr would need to be aligned to vps_pagesize.
- * The vps_pagesize is 'Default user page size (kBytes)'
- * and could be retrieved by gettune().
- */
-static int
-hpux_attr_getstackaddr(const pthread_attr_t *attr, void **addr)
-{
- static uint64_t pagesize;
- size_t size;
-
- if (!pagesize) {
- if (gettune("vps_pagesize", &pagesize)) {
- pagesize = 16;
- }
- pagesize *= 1024;
- }
- pthread_attr_getstacksize(attr, &size);
- *addr = (void *)((size_t)((char *)_Asm_get_sp() - size) & ~(pagesize - 1));
- return 0;
-}
-#define pthread_attr_getstackaddr(attr, addr) hpux_attr_getstackaddr(attr, addr)
#endif
#ifndef MAINSTACKADDR_AVAILABLE
@@ -584,7 +763,11 @@ get_stack(void **addr, size_t *size)
CHECK_ERR(pthread_attr_getstackaddr(&attr, addr));
CHECK_ERR(pthread_attr_getstacksize(&attr, size));
# endif
+# ifdef HAVE_PTHREAD_ATTR_GETGUARDSIZE
CHECK_ERR(pthread_attr_getguardsize(&attr, &guard));
+# else
+ guard = getpagesize();
+# endif
*size -= guard;
pthread_attr_destroy(&attr);
#elif defined HAVE_PTHREAD_ATTR_GET_NP /* FreeBSD, DragonFly BSD, NetBSD */
@@ -645,9 +828,6 @@ static struct {
rb_nativethread_id_t id;
size_t stack_maxsize;
VALUE *stack_start;
-#ifdef __ia64
- VALUE *register_stack_start;
-#endif
} native_main_thread;
#ifdef STACK_END_ADDRESS
@@ -728,24 +908,11 @@ reserve_stack(volatile char *limit, size_t size)
#endif
#undef ruby_init_stack
-/* Set stack bottom of Ruby implementation.
- *
- * You must call this function before any heap allocation by Ruby implementation.
- * Or GC will break living objects */
void
-ruby_init_stack(volatile VALUE *addr
-#ifdef __ia64
- , void *bsp
-#endif
- )
+ruby_init_stack(volatile VALUE *addr)
{
native_main_thread.id = pthread_self();
-#ifdef __ia64
- if (!native_main_thread.register_stack_start ||
- (VALUE*)bsp < native_main_thread.register_stack_start) {
- native_main_thread.register_stack_start = (VALUE*)bsp;
- }
-#endif
+
#if MAINSTACKADDR_AVAILABLE
if (native_main_thread.stack_maxsize) return;
{
@@ -832,8 +999,8 @@ native_thread_init_stack(rb_thread_t *th)
rb_nativethread_id_t curr = pthread_self();
if (pthread_equal(curr, native_main_thread.id)) {
- th->machine.stack_start = native_main_thread.stack_start;
- th->machine.stack_maxsize = native_main_thread.stack_maxsize;
+ th->ec->machine.stack_start = native_main_thread.stack_start;
+ th->ec->machine.stack_maxsize = native_main_thread.stack_maxsize;
}
else {
#ifdef STACKADDR_AVAILABLE
@@ -842,23 +1009,14 @@ native_thread_init_stack(rb_thread_t *th)
if (get_stack(&start, &size) == 0) {
uintptr_t diff = (uintptr_t)start - (uintptr_t)&curr;
- th->machine.stack_start = (VALUE *)&curr;
- th->machine.stack_maxsize = size - diff;
- }
-#elif defined get_stack_of
- if (!th->machine.stack_maxsize) {
- native_mutex_lock(&th->interrupt_lock);
- native_mutex_unlock(&th->interrupt_lock);
+ th->ec->machine.stack_start = (VALUE *)&curr;
+ th->ec->machine.stack_maxsize = size - diff;
}
#else
rb_raise(rb_eNotImpError, "ruby engine can initialize only in the main thread");
#endif
}
-#ifdef __ia64
- th->machine.register_stack_start = native_main_thread.register_stack_start;
- th->machine.stack_maxsize /= 2;
- th->machine.register_stack_maxsize = th->machine.stack_maxsize;
-#endif
+
return 0;
}
@@ -869,11 +1027,12 @@ native_thread_init_stack(rb_thread_t *th)
static void *
thread_start_func_1(void *th_ptr)
{
+ rb_thread_t *th = th_ptr;
+ RB_ALTSTACK_INIT(void *altstack, th->altstack);
#if USE_THREAD_CACHE
thread_start:
#endif
{
- rb_thread_t *th = th_ptr;
#if !defined USE_NATIVE_THREAD_INIT
VALUE stack_start;
#endif
@@ -885,108 +1044,124 @@ thread_start_func_1(void *th_ptr)
native_thread_init(th);
/* run */
#if defined USE_NATIVE_THREAD_INIT
- thread_start_func_2(th, th->machine.stack_start, rb_ia64_bsp());
+ thread_start_func_2(th, th->ec->machine.stack_start);
#else
- thread_start_func_2(th, &stack_start, rb_ia64_bsp());
+ thread_start_func_2(th, &stack_start);
#endif
}
#if USE_THREAD_CACHE
- if (1) {
- /* cache thread */
- rb_thread_t *th;
- if ((th = register_cached_thread_and_wait()) != 0) {
- th_ptr = (void *)th;
- th->thread_id = pthread_self();
- goto thread_start;
- }
+ /* cache thread */
+ if ((th = register_cached_thread_and_wait(RB_ALTSTACK(altstack))) != 0) {
+ goto thread_start;
}
+#else
+ RB_ALTSTACK_FREE(altstack);
#endif
return 0;
}
struct cached_thread_entry {
- volatile rb_thread_t **th_area;
- rb_nativethread_cond_t *cond;
- struct cached_thread_entry *next;
+ rb_nativethread_cond_t cond;
+ rb_nativethread_id_t thread_id;
+ rb_thread_t *th;
+ void *altstack;
+ struct list_node node;
};
-
#if USE_THREAD_CACHE
static rb_nativethread_lock_t thread_cache_lock = RB_NATIVETHREAD_LOCK_INIT;
-struct cached_thread_entry *cached_thread_root;
+static LIST_HEAD(cached_thread_head);
-static rb_thread_t *
-register_cached_thread_and_wait(void)
+# if defined(HAVE_WORKING_FORK)
+static void
+thread_cache_reset(void)
{
- rb_nativethread_cond_t cond = RB_NATIVETHREAD_COND_INIT;
- volatile rb_thread_t *th_area = 0;
- struct timeval tv;
- struct timespec ts;
- struct cached_thread_entry *entry =
- (struct cached_thread_entry *)malloc(sizeof(struct cached_thread_entry));
+ rb_native_mutex_initialize(&thread_cache_lock);
+ list_head_init(&cached_thread_head);
+}
+# endif
- if (entry == 0) {
- return 0; /* failed -> terminate thread immediately */
- }
+/*
+ * number of seconds to cache for, I think 1-5s is sufficient to obviate
+ * the need for thread pool in many network programs (taking into account
+ * worst case network latency across the globe) without wasting memory
+ */
+#ifndef THREAD_CACHE_TIME
+# define THREAD_CACHE_TIME ((rb_hrtime_t)3 * RB_HRTIME_PER_SEC)
+#endif
+
+static rb_thread_t *
+register_cached_thread_and_wait(void *altstack)
+{
+ rb_hrtime_t end = THREAD_CACHE_TIME;
+ struct cached_thread_entry entry;
- gettimeofday(&tv, 0);
- ts.tv_sec = tv.tv_sec + 60;
- ts.tv_nsec = tv.tv_usec * 1000;
+ rb_native_cond_initialize(&entry.cond);
+ entry.altstack = altstack;
+ entry.th = NULL;
+ entry.thread_id = pthread_self();
+ end = native_cond_timeout(&entry.cond, end);
- native_mutex_lock(&thread_cache_lock);
+ rb_native_mutex_lock(&thread_cache_lock);
{
- entry->th_area = &th_area;
- entry->cond = &cond;
- entry->next = cached_thread_root;
- cached_thread_root = entry;
-
- native_cond_timedwait(&cond, &thread_cache_lock, &ts);
-
- {
- struct cached_thread_entry *e, **prev = &cached_thread_root;
-
- while ((e = *prev) != 0) {
- if (e == entry) {
- *prev = e->next;
- break;
- }
- prev = &e->next;
- }
- }
+ list_add(&cached_thread_head, &entry.node);
+
+ native_cond_timedwait(&entry.cond, &thread_cache_lock, &end);
- free(entry); /* ok */
- native_cond_destroy(&cond);
+ if (entry.th == NULL) { /* unused */
+ list_del(&entry.node);
+ }
+ }
+ rb_native_mutex_unlock(&thread_cache_lock);
+
+ rb_native_cond_destroy(&entry.cond);
+ if (!entry.th) {
+ RB_ALTSTACK_FREE(entry.altstack);
}
- native_mutex_unlock(&thread_cache_lock);
- return (rb_thread_t *)th_area;
+ return entry.th;
}
+#else
+# if defined(HAVE_WORKING_FORK)
+static void thread_cache_reset(void) { }
+# endif
#endif
static int
use_cached_thread(rb_thread_t *th)
{
- int result = 0;
#if USE_THREAD_CACHE
struct cached_thread_entry *entry;
- if (cached_thread_root) {
- native_mutex_lock(&thread_cache_lock);
- entry = cached_thread_root;
- {
- if (cached_thread_root) {
- cached_thread_root = entry->next;
- *entry->th_area = th;
- result = 1;
- }
- }
- if (result) {
- native_cond_signal(entry->cond);
- }
- native_mutex_unlock(&thread_cache_lock);
+ rb_native_mutex_lock(&thread_cache_lock);
+ entry = list_pop(&cached_thread_head, struct cached_thread_entry, node);
+ if (entry) {
+ entry->th = th;
+ /* th->thread_id must be set before signal for Thread#name= */
+ th->thread_id = entry->thread_id;
+ fill_thread_id_str(th);
+ rb_native_cond_signal(&entry->cond);
}
+ rb_native_mutex_unlock(&thread_cache_lock);
+ return !!entry;
+#endif
+ return 0;
+}
+
+static void
+clear_thread_cache_altstack(void)
+{
+#if USE_THREAD_CACHE
+ struct cached_thread_entry *entry;
+
+ rb_native_mutex_lock(&thread_cache_lock);
+ list_for_each(&cached_thread_head, entry, node) {
+ void MAYBE_UNUSED(*altstack) = entry->altstack;
+ entry->altstack = 0;
+ RB_ALTSTACK_FREE(altstack);
+ }
+ rb_native_mutex_unlock(&thread_cache_lock);
#endif
- return result;
}
static int
@@ -998,22 +1173,15 @@ native_thread_create(rb_thread_t *th)
thread_debug("create (use cached thread): %p\n", (void *)th);
}
else {
-#ifdef HAVE_PTHREAD_ATTR_INIT
pthread_attr_t attr;
- pthread_attr_t *const attrp = &attr;
-#else
- pthread_attr_t *const attrp = NULL;
-#endif
- const size_t stack_size = th->vm->default_params.thread_machine_stack_size;
+ const size_t stack_size = th->vm->default_params.thread_machine_stack_size + th->vm->default_params.thread_vm_stack_size;
const size_t space = space_size(stack_size);
- th->machine.stack_maxsize = stack_size - space;
-#ifdef __ia64
- th->machine.stack_maxsize /= 2;
- th->machine.register_stack_maxsize = th->machine.stack_maxsize;
+#ifdef USE_SIGALTSTACK
+ th->altstack = rb_allocate_sigaltstack();
#endif
+ th->ec->machine.stack_maxsize = stack_size - space;
-#ifdef HAVE_PTHREAD_ATTR_INIT
CHECK_ERR(pthread_attr_init(&attr));
# ifdef PTHREAD_STACK_MIN
@@ -1025,41 +1193,16 @@ native_thread_create(rb_thread_t *th)
CHECK_ERR(pthread_attr_setinheritsched(&attr, PTHREAD_INHERIT_SCHED));
# endif
CHECK_ERR(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED));
-#endif
-#ifdef get_stack_of
- native_mutex_lock(&th->interrupt_lock);
-#endif
- err = pthread_create(&th->thread_id, attrp, thread_start_func_1, th);
-#ifdef get_stack_of
- if (!err) {
- get_stack_of(th->thread_id,
- &th->machine.stack_start,
- &th->machine.stack_maxsize);
- }
- native_mutex_unlock(&th->interrupt_lock);
-#endif
+
+ err = pthread_create(&th->thread_id, &attr, thread_start_func_1, th);
thread_debug("create: %p (%d)\n", (void *)th, err);
/* should be done in the created thread */
fill_thread_id_str(th);
-#ifdef HAVE_PTHREAD_ATTR_INIT
CHECK_ERR(pthread_attr_destroy(&attr));
-#endif
}
return err;
}
-#if USE_SLEEPY_TIMER_THREAD
-static void
-native_thread_join(pthread_t th)
-{
- int err = pthread_join(th, 0);
- if (err) {
- rb_raise(rb_eThreadError, "native_thread_join() failed (%d)", err);
- }
-}
-#endif
-
-
#if USE_NATIVE_THREAD_PRIORITY
static void
@@ -1101,77 +1244,80 @@ ubf_pthread_cond_signal(void *ptr)
{
rb_thread_t *th = (rb_thread_t *)ptr;
thread_debug("ubf_pthread_cond_signal (%p)\n", (void *)th);
- native_cond_signal(&th->native_thread_data.sleep_cond);
+ rb_native_cond_signal(&th->native_thread_data.cond.intr);
}
static void
-native_sleep(rb_thread_t *th, struct timeval *timeout_tv)
+native_cond_sleep(rb_thread_t *th, rb_hrtime_t *rel)
{
- struct timespec timeout;
rb_nativethread_lock_t *lock = &th->interrupt_lock;
- rb_nativethread_cond_t *cond = &th->native_thread_data.sleep_cond;
-
- if (timeout_tv) {
- struct timespec timeout_rel;
-
- timeout_rel.tv_sec = timeout_tv->tv_sec;
- timeout_rel.tv_nsec = timeout_tv->tv_usec * 1000;
-
- /* Solaris cond_timedwait() return EINVAL if an argument is greater than
- * current_time + 100,000,000. So cut up to 100,000,000. This is
- * considered as a kind of spurious wakeup. The caller to native_sleep
- * should care about spurious wakeup.
- *
- * See also [Bug #1341] [ruby-core:29702]
- * http://download.oracle.com/docs/cd/E19683-01/816-0216/6m6ngupgv/index.html
- */
- if (timeout_rel.tv_sec > 100000000) {
- timeout_rel.tv_sec = 100000000;
- timeout_rel.tv_nsec = 0;
- }
-
- timeout = native_cond_timeout(cond, timeout_rel);
- }
+ rb_nativethread_cond_t *cond = &th->native_thread_data.cond.intr;
+
+ /* Solaris cond_timedwait() return EINVAL if an argument is greater than
+ * current_time + 100,000,000. So cut up to 100,000,000. This is
+ * considered as a kind of spurious wakeup. The caller to native_sleep
+ * should care about spurious wakeup.
+ *
+ * See also [Bug #1341] [ruby-core:29702]
+ * http://download.oracle.com/docs/cd/E19683-01/816-0216/6m6ngupgv/index.html
+ */
+ const rb_hrtime_t max = (rb_hrtime_t)100000000 * RB_HRTIME_PER_SEC;
- GVL_UNLOCK_BEGIN();
+ GVL_UNLOCK_BEGIN(th);
{
- native_mutex_lock(lock);
+ rb_native_mutex_lock(lock);
th->unblock.func = ubf_pthread_cond_signal;
th->unblock.arg = th;
- if (RUBY_VM_INTERRUPTED(th)) {
+ if (RUBY_VM_INTERRUPTED(th->ec)) {
/* interrupted. return immediate */
thread_debug("native_sleep: interrupted before sleep\n");
}
else {
- if (!timeout_tv)
- native_cond_wait(cond, lock);
- else
- native_cond_timedwait(cond, lock, &timeout);
+ if (!rel) {
+ rb_native_cond_wait(cond, lock);
+ }
+ else {
+ rb_hrtime_t end;
+
+ if (*rel > max) {
+ *rel = max;
+ }
+
+ end = native_cond_timeout(cond, *rel);
+ native_cond_timedwait(cond, lock, &end);
+ }
}
th->unblock.func = 0;
- th->unblock.arg = 0;
- native_mutex_unlock(lock);
+ rb_native_mutex_unlock(lock);
}
- GVL_UNLOCK_END();
+ GVL_UNLOCK_END(th);
thread_debug("native_sleep done\n");
}
#ifdef USE_UBF_LIST
static LIST_HEAD(ubf_list_head);
+static rb_nativethread_lock_t ubf_list_lock = RB_NATIVETHREAD_LOCK_INIT;
+
+static void
+ubf_list_atfork(void)
+{
+ list_head_init(&ubf_list_head);
+ rb_native_mutex_initialize(&ubf_list_lock);
+}
/* The thread 'th' is registered to be trying unblock. */
static void
register_ubf_list(rb_thread_t *th)
{
- struct list_node *node = &th->native_thread_data.ubf_list;
+ struct list_node *node = &th->native_thread_data.node.ubf;
if (list_empty((struct list_head*)node)) {
- native_mutex_lock(&ubf_list_lock);
+ rb_native_mutex_lock(&ubf_list_lock);
list_add(&ubf_list_head, node);
- native_mutex_unlock(&ubf_list_lock);
+ rb_native_mutex_unlock(&ubf_list_lock);
}
}
@@ -1179,12 +1325,18 @@ register_ubf_list(rb_thread_t *th)
static void
unregister_ubf_list(rb_thread_t *th)
{
- struct list_node *node = &th->native_thread_data.ubf_list;
+ struct list_node *node = &th->native_thread_data.node.ubf;
+
+ /* we can't allow re-entry into ubf_list_head */
+ VM_ASSERT(th->unblock.func == 0);
if (!list_empty((struct list_head*)node)) {
- native_mutex_lock(&ubf_list_lock);
- list_del_init(node);
- native_mutex_unlock(&ubf_list_lock);
+ rb_native_mutex_lock(&ubf_list_lock);
+ list_del_init(node);
+ if (list_empty(&ubf_list_head) && !rb_signal_buff_size()) {
+ ubf_timer_disarm();
+ }
+ rb_native_mutex_unlock(&ubf_list_lock);
}
}
@@ -1196,25 +1348,42 @@ static void
ubf_wakeup_thread(rb_thread_t *th)
{
thread_debug("thread_wait_queue_wakeup (%"PRI_THREAD_ID")\n", thread_id_str(th));
- if (th)
- pthread_kill(th->thread_id, SIGVTALRM);
+ pthread_kill(th->thread_id, SIGVTALRM);
}
static void
ubf_select(void *ptr)
{
rb_thread_t *th = (rb_thread_t *)ptr;
+ rb_global_vm_lock_t *gvl = rb_ractor_gvl(th->ractor);
+ const rb_thread_t *cur = ruby_thread_from_native(); /* may be 0 */
+
register_ubf_list(th);
/*
* ubf_wakeup_thread() doesn't guarantee to wake up a target thread.
* Therefore, we repeatedly call ubf_wakeup_thread() until a target thread
- * exit from ubf function.
- * In the other hands, we shouldn't call rb_thread_wakeup_timer_thread()
- * if running on timer thread because it may make endless wakeups.
+ * exit from ubf function. We must have a timer to perform this operation.
+ * We use double-checked locking here because this function may be called
+ * while vm->gvl.lock is held in do_gvl_timer.
+ * There is also no need to start a timer if we're the designated
+ * sigwait_th thread, otherwise we can deadlock with a thread
+ * in unblock_function_clear.
*/
- if (!pthread_equal(pthread_self(), timer_thread.id))
- rb_thread_wakeup_timer_thread();
+ if (cur != gvl->timer && cur != sigwait_th) {
+ /*
+ * Double-checked locking above was to prevent nested locking
+ * by the SAME thread. We use trylock here to prevent deadlocks
+ * between DIFFERENT threads
+ */
+ if (rb_native_mutex_trylock(&gvl->lock) == 0) {
+ if (!gvl->timer) {
+ rb_thread_wakeup_timer_thread(-1);
+ }
+ rb_native_mutex_unlock(&gvl->lock);
+ }
+ }
+
ubf_wakeup_thread(th);
}
@@ -1228,14 +1397,15 @@ static void
ubf_wakeup_all_threads(void)
{
rb_thread_t *th;
+ native_thread_data_t *dat;
if (!ubf_threads_empty()) {
- native_mutex_lock(&ubf_list_lock);
- list_for_each(&ubf_list_head, th,
- native_thread_data.ubf_list) {
+ rb_native_mutex_lock(&ubf_list_lock);
+ list_for_each(&ubf_list_head, dat, node.ubf) {
+ th = container_of(dat, rb_thread_t, native_thread_data);
ubf_wakeup_thread(th);
}
- native_mutex_unlock(&ubf_list_lock);
+ rb_native_mutex_unlock(&ubf_list_lock);
}
}
@@ -1245,57 +1415,39 @@ ubf_wakeup_all_threads(void)
#define ubf_select 0
static void ubf_wakeup_all_threads(void) { return; }
static int ubf_threads_empty(void) { return 1; }
+#define ubf_list_atfork() do {} while (0)
#endif /* USE_UBF_LIST */
#define TT_DEBUG 0
#define WRITE_CONST(fd, str) (void)(write((fd),(str),sizeof(str)-1)<0)
-/* 100ms. 10ms is too small for user level thread scheduling
- * on recent Linux (tested on 2.6.35)
- */
-#define TIME_QUANTUM_USEC (100 * 1000)
-
-#if USE_SLEEPY_TIMER_THREAD
static struct {
- /*
- * Read end of each pipe is closed inside timer thread for shutdown
- * Write ends are closed by a normal Ruby thread during shutdown
- */
- int normal[2];
- int low[2];
+ /* pipes are closed in forked children when owner_process does not match */
+ int normal[2]; /* [0] == sigwait_fd */
+ int ub_main[2]; /* unblock main thread from native_ppoll_sleep */
/* volatile for signal handler use: */
volatile rb_pid_t owner_process;
- rb_atomic_t writing;
-} timer_thread_pipe = {
+} signal_self_pipe = {
+ {-1, -1},
{-1, -1},
- {-1, -1}, /* low priority */
};
-NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
-static void
-async_bug_fd(const char *mesg, int errno_arg, int fd)
-{
- char buff[64];
- size_t n = strlcpy(buff, mesg, sizeof(buff));
- if (n < sizeof(buff)-3) {
- ruby_snprintf(buff, sizeof(buff)-n, "(%d)", fd);
- }
- rb_async_bug_errno(buff, errno_arg);
-}
-
/* only use signal-safe system calls here */
static void
-rb_thread_wakeup_timer_thread_fd(volatile int *fdp)
+rb_thread_wakeup_timer_thread_fd(int fd)
{
+#if USE_EVENTFD
+ const uint64_t buff = 1;
+#else
+ const char buff = '!';
+#endif
ssize_t result;
- int fd = *fdp; /* access fdp exactly once here and do not reread fdp */
/* already opened */
- if (fd >= 0 && timer_thread_pipe.owner_process == getpid()) {
- static const char buff[1] = {'!'};
+ if (fd >= 0) {
retry:
- if ((result = write(fd, buff, 1)) <= 0) {
+ if ((result = write(fd, &buff, sizeof(buff))) <= 0) {
int e = errno;
switch (e) {
case EINTR: goto retry;
@@ -1315,64 +1467,113 @@ rb_thread_wakeup_timer_thread_fd(volatile int *fdp)
}
}
-void
-rb_thread_wakeup_timer_thread(void)
-{
- /* must be safe inside sighandler, so no mutex */
- if (timer_thread_pipe.owner_process == getpid()) {
- ATOMIC_INC(timer_thread_pipe.writing);
- rb_thread_wakeup_timer_thread_fd(&timer_thread_pipe.normal[1]);
- ATOMIC_DEC(timer_thread_pipe.writing);
- }
-}
-
+/*
+ * This ensures we get a SIGVTALRM in TIME_QUANTUM_MSEC if our
+ * process could not react to the original signal in time.
+ */
static void
-rb_thread_wakeup_timer_thread_low(void)
-{
- if (timer_thread_pipe.owner_process == getpid()) {
- ATOMIC_INC(timer_thread_pipe.writing);
- rb_thread_wakeup_timer_thread_fd(&timer_thread_pipe.low[1]);
- ATOMIC_DEC(timer_thread_pipe.writing);
+ubf_timer_arm(rb_pid_t current) /* async signal safe */
+{
+#if UBF_TIMER == UBF_TIMER_POSIX
+ if ((!current || timer_posix.owner == current) &&
+ timer_state_cas(RTIMER_DISARM, RTIMER_ARMING) == RTIMER_DISARM) {
+ struct itimerspec it;
+
+ it.it_interval.tv_sec = it.it_value.tv_sec = 0;
+ it.it_interval.tv_nsec = it.it_value.tv_nsec = TIME_QUANTUM_NSEC;
+
+ if (timer_settime(timer_posix.timerid, 0, &it, 0))
+ rb_async_bug_errno("timer_settime (arm)", errno);
+
+ switch (timer_state_cas(RTIMER_ARMING, RTIMER_ARMED)) {
+ case RTIMER_DISARM:
+ /* somebody requested a disarm while we were arming */
+ /* may race harmlessly with ubf_timer_destroy */
+ (void)timer_settime(timer_posix.timerid, 0, &zero, 0);
+
+ case RTIMER_ARMING: return; /* success */
+ case RTIMER_ARMED:
+ /*
+ * it is possible to have another thread disarm, and
+ * a third thread arm finish re-arming before we get
+ * here, so we wasted a syscall with timer_settime but
+ * probably unavoidable in a signal handler.
+ */
+ return;
+ case RTIMER_DEAD:
+ /* may race harmlessly with ubf_timer_destroy */
+ (void)timer_settime(timer_posix.timerid, 0, &zero, 0);
+ return;
+ default:
+ rb_async_bug_errno("UBF_TIMER_POSIX unknown state", ERANGE);
+ }
+ }
+#elif UBF_TIMER == UBF_TIMER_PTHREAD
+ if (!current || current == timer_pthread.owner) {
+ if (ATOMIC_EXCHANGE(timer_pthread.armed, 1) == 0)
+ rb_thread_wakeup_timer_thread_fd(timer_pthread.low[1]);
}
+#endif
}
-/* VM-dependent API is not available for this function */
-static void
-consume_communication_pipe(int fd)
+void
+rb_thread_wakeup_timer_thread(int sig)
{
-#define CCP_READ_BUFF_SIZE 1024
- /* buffer can be shared because no one refers to them. */
- static char buff[CCP_READ_BUFF_SIZE];
- ssize_t result;
+ rb_pid_t current;
- while (1) {
- result = read(fd, buff, sizeof(buff));
- if (result == 0) {
- return;
- }
- else if (result < 0) {
- int e = errno;
- switch (e) {
- case EINTR:
- continue; /* retry */
- case EAGAIN:
-#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
- case EWOULDBLOCK:
-#endif
- return;
- default:
- async_bug_fd("consume_communication_pipe: read", e, fd);
- }
- }
+ /* non-sighandler path */
+ if (sig <= 0) {
+ rb_thread_wakeup_timer_thread_fd(signal_self_pipe.normal[1]);
+ if (sig < 0) {
+ ubf_timer_arm(0);
+ }
+ return;
+ }
+
+ /* must be safe inside sighandler, so no mutex */
+ current = getpid();
+ if (signal_self_pipe.owner_process == current) {
+ rb_thread_wakeup_timer_thread_fd(signal_self_pipe.normal[1]);
+
+ /*
+ * system_working check is required because vm and main_thread are
+ * freed during shutdown
+ */
+ if (system_working > 0) {
+ volatile rb_execution_context_t *ec;
+ rb_vm_t *vm = GET_VM();
+ rb_thread_t *mth;
+
+ /*
+ * FIXME: root VM and main_thread should be static and not
+ * on heap for maximum safety (and startup/shutdown speed)
+ */
+ if (!vm) return;
+ mth = vm->ractor.main_thread;
+ if (!mth || system_working <= 0) return;
+
+ /* this relies on GC for grace period before cont_free */
+ ec = ACCESS_ONCE(rb_execution_context_t *, mth->ec);
+
+ if (ec) {
+ RUBY_VM_SET_TRAP_INTERRUPT(ec);
+ ubf_timer_arm(current);
+
+ /* some ubfs can interrupt single-threaded process directly */
+ if (vm->ubf_async_safe && mth->unblock.func) {
+ (mth->unblock.func)(mth->unblock.arg);
+ }
+ }
+ }
}
}
-#define CLOSE_INVALIDATE(expr) \
- close_invalidate(&timer_thread_pipe.expr,"close_invalidate: "#expr)
+#define CLOSE_INVALIDATE_PAIR(expr) \
+ close_invalidate_pair(expr,"close_invalidate: "#expr)
static void
-close_invalidate(volatile int *fdp, const char *msg)
+close_invalidate(int *fdp, const char *msg)
{
- int fd = *fdp; /* access fdp exactly once here and do not reread fdp */
+ int fd = *fdp;
*fdp = -1;
if (close(fd) < 0) {
@@ -1381,6 +1582,19 @@ close_invalidate(volatile int *fdp, const char *msg)
}
static void
+close_invalidate_pair(int fds[2], const char *msg)
+{
+ if (USE_EVENTFD && fds[0] == fds[1]) {
+ close_invalidate(&fds[0], msg);
+ fds[1] = -1;
+ }
+ else {
+ close_invalidate(&fds[0], msg);
+ close_invalidate(&fds[1], msg);
+ }
+}
+
+static void
set_nonblock(int fd)
{
int oflags;
@@ -1395,14 +1609,33 @@ set_nonblock(int fd)
rb_sys_fail(0);
}
+/* communication pipe with timer thread and signal handler */
static int
setup_communication_pipe_internal(int pipes[2])
{
int err;
+ if (pipes[0] >= 0 || pipes[1] >= 0) {
+ VM_ASSERT(pipes[0] >= 0);
+ VM_ASSERT(pipes[1] >= 0);
+ return 0;
+ }
+
+ /*
+ * Don't bother with eventfd on ancient Linux 2.6.22..2.6.26 which were
+ * missing EFD_* flags, they can fall back to pipe
+ */
+#if USE_EVENTFD && defined(EFD_NONBLOCK) && defined(EFD_CLOEXEC)
+ pipes[0] = pipes[1] = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC);
+ if (pipes[0] >= 0) {
+ rb_update_max_fd(pipes[0]);
+ return 0;
+ }
+#endif
+
err = rb_cloexec_pipe(pipes);
if (err != 0) {
- rb_warn("Failed to create communication pipe for timer thread: %s",
+ rb_warn("pipe creation failed for timer: %s, scheduling broken",
strerror(errno));
return -1;
}
@@ -1413,242 +1646,270 @@ setup_communication_pipe_internal(int pipes[2])
return 0;
}
-/* communication pipe with timer thread and signal handler */
-static int
-setup_communication_pipe(void)
-{
- VM_ASSERT(timer_thread_pipe.owner_process == 0);
- VM_ASSERT(timer_thread_pipe.normal[0] == -1);
- VM_ASSERT(timer_thread_pipe.normal[1] == -1);
- VM_ASSERT(timer_thread_pipe.low[0] == -1);
- VM_ASSERT(timer_thread_pipe.low[1] == -1);
+#if !defined(SET_CURRENT_THREAD_NAME) && defined(__linux__) && defined(PR_SET_NAME)
+# define SET_CURRENT_THREAD_NAME(name) prctl(PR_SET_NAME, name)
+#endif
- if (setup_communication_pipe_internal(timer_thread_pipe.normal) < 0) {
- return errno;
- }
- if (setup_communication_pipe_internal(timer_thread_pipe.low) < 0) {
- int e = errno;
- CLOSE_INVALIDATE(normal[0]);
- CLOSE_INVALIDATE(normal[1]);
- return e;
- }
+enum {
+ THREAD_NAME_MAX =
+#if defined(__linux__)
+ 16
+#elif defined(__APPLE__)
+/* Undocumented, and main thread seems unlimited */
+ 64
+#else
+ 16
+#endif
+};
- return 0;
-}
+static VALUE threadptr_invoke_proc_location(rb_thread_t *th);
-/**
- * Let the timer thread sleep a while.
- *
- * The timer thread sleeps until woken up by rb_thread_wakeup_timer_thread() if only one Ruby thread is running.
- * @pre the calling context is in the timer thread.
- */
-static inline void
-timer_thread_sleep(rb_global_vm_lock_t* gvl)
+static void
+native_set_thread_name(rb_thread_t *th)
{
- int result;
- int need_polling;
- struct pollfd pollfds[2];
-
- pollfds[0].fd = timer_thread_pipe.normal[0];
- pollfds[0].events = POLLIN;
- pollfds[1].fd = timer_thread_pipe.low[0];
- pollfds[1].events = POLLIN;
-
- need_polling = !ubf_threads_empty();
-
- if (gvl->waiting > 0 || need_polling) {
- /* polling (TIME_QUANTUM_USEC usec) */
- result = poll(pollfds, 1, TIME_QUANTUM_USEC/1000);
- }
- else {
- /* wait (infinite) */
- result = poll(pollfds, numberof(pollfds), -1);
- }
-
- if (result == 0) {
- /* maybe timeout */
- }
- else if (result > 0) {
- consume_communication_pipe(timer_thread_pipe.normal[0]);
- consume_communication_pipe(timer_thread_pipe.low[0]);
- }
- else { /* result < 0 */
- int e = errno;
- switch (e) {
- case EBADF:
- case EINVAL:
- case ENOMEM: /* from Linux man */
- case EFAULT: /* from FreeBSD man */
- rb_async_bug_errno("thread_timer: select", e);
- default:
- /* ignore */;
- }
+#ifdef SET_CURRENT_THREAD_NAME
+ VALUE loc;
+ if (!NIL_P(loc = th->name)) {
+ SET_CURRENT_THREAD_NAME(RSTRING_PTR(loc));
+ }
+ else if ((loc = threadptr_invoke_proc_location(th)) != Qnil) {
+ char *name, *p;
+ char buf[THREAD_NAME_MAX];
+ size_t len;
+ int n;
+
+ name = RSTRING_PTR(RARRAY_AREF(loc, 0));
+ p = strrchr(name, '/'); /* show only the basename of the path. */
+ if (p && p[1])
+ name = p + 1;
+
+ n = snprintf(buf, sizeof(buf), "%s:%d", name, NUM2INT(RARRAY_AREF(loc, 1)));
+ RB_GC_GUARD(loc);
+
+ len = (size_t)n;
+ if (len >= sizeof(buf)) {
+ buf[sizeof(buf)-2] = '*';
+ buf[sizeof(buf)-1] = '\0';
+ }
+ SET_CURRENT_THREAD_NAME(buf);
}
+#endif
}
-#else /* USE_SLEEPY_TIMER_THREAD */
-# define PER_NANO 1000000000
-void rb_thread_wakeup_timer_thread(void) {}
-static void rb_thread_wakeup_timer_thread_low(void) {}
-
-static rb_nativethread_lock_t timer_thread_lock;
-static rb_nativethread_cond_t timer_thread_cond;
-
-static inline void
-timer_thread_sleep(rb_global_vm_lock_t* unused)
+static void
+native_set_another_thread_name(rb_nativethread_id_t thread_id, VALUE name)
{
- struct timespec ts;
- ts.tv_sec = 0;
- ts.tv_nsec = TIME_QUANTUM_USEC * 1000;
- ts = native_cond_timeout(&timer_thread_cond, ts);
-
- native_cond_timedwait(&timer_thread_cond, &timer_thread_lock, &ts);
+#if defined SET_ANOTHER_THREAD_NAME || defined SET_CURRENT_THREAD_NAME
+ char buf[THREAD_NAME_MAX];
+ const char *s = "";
+# if !defined SET_ANOTHER_THREAD_NAME
+ if (!pthread_equal(pthread_self(), thread_id)) return;
+# endif
+ if (!NIL_P(name)) {
+ long n;
+ RSTRING_GETMEM(name, s, n);
+ if (n >= (int)sizeof(buf)) {
+ memcpy(buf, s, sizeof(buf)-1);
+ buf[sizeof(buf)-1] = '\0';
+ s = buf;
+ }
+ }
+# if defined SET_ANOTHER_THREAD_NAME
+ SET_ANOTHER_THREAD_NAME(thread_id, s);
+# elif defined SET_CURRENT_THREAD_NAME
+ SET_CURRENT_THREAD_NAME(s);
+# endif
+#endif
}
-#endif /* USE_SLEEPY_TIMER_THREAD */
-#if !defined(SET_CURRENT_THREAD_NAME) && defined(__linux__) && defined(PR_SET_NAME)
-# define SET_CURRENT_THREAD_NAME(name) prctl(PR_SET_NAME, name)
+#if defined(RB_THREAD_T_HAS_NATIVE_ID) || defined(__APPLE__)
+static VALUE
+native_thread_native_thread_id(rb_thread_t *target_th)
+{
+#ifdef RB_THREAD_T_HAS_NATIVE_ID
+ int tid = target_th->tid;
+ if (tid == 0) return Qnil;
+ return INT2FIX(tid);
+#elif defined(__APPLE__)
+ uint64_t tid;
+ int e = pthread_threadid_np(target_th->thread_id, &tid);
+ if (e != 0) rb_syserr_fail(e, "pthread_threadid_np");
+ return ULL2NUM((unsigned long long)tid);
+#endif
+}
+# define USE_NATIVE_THREAD_NATIVE_THREAD_ID 1
+#else
+# define USE_NATIVE_THREAD_NATIVE_THREAD_ID 0
#endif
static void
-native_set_thread_name(rb_thread_t *th)
+ubf_timer_invalidate(void)
{
-#ifdef SET_CURRENT_THREAD_NAME
- if (!th->first_func && th->first_proc) {
- VALUE loc;
- if (!NIL_P(loc = th->name)) {
- SET_CURRENT_THREAD_NAME(RSTRING_PTR(loc));
- }
- else if (!NIL_P(loc = rb_proc_location(th->first_proc))) {
- const VALUE *ptr = RARRAY_CONST_PTR(loc); /* [ String, Integer ] */
- char *name, *p;
- char buf[16];
- size_t len;
- int n;
-
- name = RSTRING_PTR(ptr[0]);
- p = strrchr(name, '/'); /* show only the basename of the path. */
- if (p && p[1])
- name = p + 1;
-
- n = snprintf(buf, sizeof(buf), "%s:%d", name, NUM2INT(ptr[1]));
- rb_gc_force_recycle(loc); /* acts as a GC guard, too */
-
- len = (size_t)n;
- if (len >= sizeof(buf)) {
- buf[sizeof(buf)-2] = '*';
- buf[sizeof(buf)-1] = '\0';
- }
- SET_CURRENT_THREAD_NAME(buf);
- }
- }
+#if UBF_TIMER == UBF_TIMER_PTHREAD
+ CLOSE_INVALIDATE_PAIR(timer_pthread.low);
#endif
}
-static void *
-thread_timer(void *p)
+static void
+ubf_timer_pthread_create(rb_pid_t current)
{
- rb_global_vm_lock_t *gvl = (rb_global_vm_lock_t *)p;
+#if UBF_TIMER == UBF_TIMER_PTHREAD
+ int err;
+ if (timer_pthread.owner == current)
+ return;
- if (TT_DEBUG) WRITE_CONST(2, "start timer thread\n");
+ if (setup_communication_pipe_internal(timer_pthread.low) < 0)
+ return;
-#ifdef SET_CURRENT_THREAD_NAME
- SET_CURRENT_THREAD_NAME("ruby-timer-thr");
+ err = pthread_create(&timer_pthread.thid, 0, timer_pthread_fn, GET_VM());
+ if (!err)
+ timer_pthread.owner = current;
+ else
+ rb_warn("pthread_create failed for timer: %s, signals racy",
+ strerror(err));
#endif
+}
-#if !USE_SLEEPY_TIMER_THREAD
- native_mutex_initialize(&timer_thread_lock);
- native_cond_initialize(&timer_thread_cond, RB_CONDATTR_CLOCK_MONOTONIC);
- native_mutex_lock(&timer_thread_lock);
-#endif
- while (system_working > 0) {
+static void
+ubf_timer_create(rb_pid_t current)
+{
+#if UBF_TIMER == UBF_TIMER_POSIX
+# if defined(__sun)
+# define UBF_TIMER_CLOCK CLOCK_REALTIME
+# else /* Tested Linux and FreeBSD: */
+# define UBF_TIMER_CLOCK CLOCK_MONOTONIC
+# endif
- /* timer function */
- ubf_wakeup_all_threads();
- timer_thread_function(0);
+ struct sigevent sev;
- if (TT_DEBUG) WRITE_CONST(2, "tick\n");
+ sev.sigev_notify = SIGEV_SIGNAL;
+ sev.sigev_signo = SIGVTALRM;
+ sev.sigev_value.sival_ptr = &timer_posix;
- /* wait */
- timer_thread_sleep(gvl);
+ if (!timer_create(UBF_TIMER_CLOCK, &sev, &timer_posix.timerid)) {
+ rb_atomic_t prev = timer_state_exchange(RTIMER_DISARM);
+
+ if (prev != RTIMER_DEAD) {
+ rb_bug("timer_posix was not dead: %u\n", (unsigned)prev);
+ }
+ timer_posix.owner = current;
+ }
+ else {
+ rb_warn("timer_create failed: %s, signals racy", strerror(errno));
}
-#if USE_SLEEPY_TIMER_THREAD
- CLOSE_INVALIDATE(normal[0]);
- CLOSE_INVALIDATE(low[0]);
-#else
- native_mutex_unlock(&timer_thread_lock);
- native_cond_destroy(&timer_thread_cond);
- native_mutex_destroy(&timer_thread_lock);
#endif
-
- if (TT_DEBUG) WRITE_CONST(2, "finish timer thread\n");
- return NULL;
+ if (UBF_TIMER == UBF_TIMER_PTHREAD)
+ ubf_timer_pthread_create(current);
}
static void
rb_thread_create_timer_thread(void)
{
- if (!timer_thread.created) {
- int err;
-#ifdef HAVE_PTHREAD_ATTR_INIT
- pthread_attr_t attr;
+ /* we only create the pipe, and lazy-spawn */
+ rb_pid_t current = getpid();
+ rb_pid_t owner = signal_self_pipe.owner_process;
- err = pthread_attr_init(&attr);
- if (err != 0) {
- rb_warn("pthread_attr_init failed for timer: %s, scheduling broken",
- strerror(err));
- return;
+ if (owner && owner != current) {
+ CLOSE_INVALIDATE_PAIR(signal_self_pipe.normal);
+ CLOSE_INVALIDATE_PAIR(signal_self_pipe.ub_main);
+ ubf_timer_invalidate();
+ }
+
+ if (setup_communication_pipe_internal(signal_self_pipe.normal) < 0) return;
+ if (setup_communication_pipe_internal(signal_self_pipe.ub_main) < 0) return;
+
+ ubf_timer_create(current);
+ if (owner != current) {
+ /* validate pipe on this process */
+ sigwait_th = THREAD_INVALID;
+ signal_self_pipe.owner_process = current;
+ }
+}
+
+static void
+ubf_timer_disarm(void)
+{
+#if UBF_TIMER == UBF_TIMER_POSIX
+ rb_atomic_t prev;
+
+ if (timer_posix.owner && timer_posix.owner != getpid()) return;
+ prev = timer_state_cas(RTIMER_ARMED, RTIMER_DISARM);
+ switch (prev) {
+ case RTIMER_DISARM: return; /* likely */
+ case RTIMER_ARMING: return; /* ubf_timer_arm will disarm itself */
+ case RTIMER_ARMED:
+ if (timer_settime(timer_posix.timerid, 0, &zero, 0)) {
+ int err = errno;
+
+ if (err == EINVAL) {
+ prev = timer_state_cas(RTIMER_DISARM, RTIMER_DISARM);
+
+ /* main thread may have killed the timer */
+ if (prev == RTIMER_DEAD) return;
+
+ rb_bug_errno("timer_settime (disarm)", err);
+ }
}
-# ifdef PTHREAD_STACK_MIN
- {
- const size_t min_size = (4096 * 4);
- /* Allocate the machine stack for the timer thread
- * at least 16KB (4 pages). FreeBSD 8.2 AMD64 causes
- * machine stack overflow only with PTHREAD_STACK_MIN.
- */
- size_t stack_size = PTHREAD_STACK_MIN; /* may be dynamic, get only once */
- if (stack_size < min_size) stack_size = min_size;
- if (THREAD_DEBUG) stack_size += BUFSIZ;
- pthread_attr_setstacksize(&attr, stack_size);
- }
-# endif
+ return;
+ case RTIMER_DEAD: return; /* stay dead */
+ default:
+ rb_bug("UBF_TIMER_POSIX bad state: %u\n", (unsigned)prev);
+ }
+
+#elif UBF_TIMER == UBF_TIMER_PTHREAD
+ ATOMIC_SET(timer_pthread.armed, 0);
#endif
+}
-#if USE_SLEEPY_TIMER_THREAD
- err = setup_communication_pipe();
- if (err != 0) {
- rb_warn("pipe creation failed for timer: %s, scheduling broken",
- strerror(err));
- return;
- }
-#endif /* USE_SLEEPY_TIMER_THREAD */
+static void
+ubf_timer_destroy(void)
+{
+#if UBF_TIMER == UBF_TIMER_POSIX
+ if (timer_posix.owner == getpid()) {
+ rb_atomic_t expect = RTIMER_DISARM;
+ size_t i, max = 10000000;
+
+ /* prevent signal handler from arming: */
+ for (i = 0; i < max; i++) {
+ switch (timer_state_cas(expect, RTIMER_DEAD)) {
+ case RTIMER_DISARM:
+ if (expect == RTIMER_DISARM) goto done;
+ expect = RTIMER_DISARM;
+ break;
+ case RTIMER_ARMING:
+ native_thread_yield(); /* let another thread finish arming */
+ expect = RTIMER_ARMED;
+ break;
+ case RTIMER_ARMED:
+ if (expect == RTIMER_ARMED) {
+ if (timer_settime(timer_posix.timerid, 0, &zero, 0))
+ rb_bug_errno("timer_settime (destroy)", errno);
+ goto done;
+ }
+ expect = RTIMER_ARMED;
+ break;
+ case RTIMER_DEAD:
+ rb_bug("RTIMER_DEAD unexpected");
+ }
+ }
+ rb_bug("timed out waiting for timer to arm");
+done:
+ if (timer_delete(timer_posix.timerid) < 0)
+ rb_sys_fail("timer_delete");
- /* create timer thread */
- if (timer_thread.created) {
- rb_bug("rb_thread_create_timer_thread: Timer thread was already created\n");
- }
-#ifdef HAVE_PTHREAD_ATTR_INIT
- err = pthread_create(&timer_thread.id, &attr, thread_timer, &GET_VM()->gvl);
- pthread_attr_destroy(&attr);
-#else
- err = pthread_create(&timer_thread.id, NULL, thread_timer, &GET_VM()->gvl);
-#endif
- if (err != 0) {
- rb_warn("pthread_create failed for timer: %s, scheduling broken",
- strerror(err));
-#if USE_SLEEPY_TIMER_THREAD
- CLOSE_INVALIDATE(normal[0]);
- CLOSE_INVALIDATE(normal[1]);
- CLOSE_INVALIDATE(low[0]);
- CLOSE_INVALIDATE(low[1]);
-#endif
- return;
- }
+ VM_ASSERT(timer_state_exchange(RTIMER_DEAD) == RTIMER_DEAD);
+ }
+#elif UBF_TIMER == UBF_TIMER_PTHREAD
+ int err;
- /* validate pipe on this process */
- timer_thread_pipe.owner_process = getpid();
- timer_thread.created = 1;
+ timer_pthread.owner = 0;
+ ubf_timer_disarm();
+ rb_thread_wakeup_timer_thread_fd(timer_pthread.low[1]);
+ err = pthread_join(timer_pthread.thid, 0);
+ if (err) {
+ rb_raise(rb_eThreadError, "native_thread_join() failed (%d)", err);
}
+#endif
}
static int
@@ -1656,37 +1917,10 @@ native_stop_timer_thread(void)
{
int stopped;
stopped = --system_working <= 0;
+ if (stopped)
+ ubf_timer_destroy();
if (TT_DEBUG) fprintf(stderr, "stop timer thread\n");
-#if USE_SLEEPY_TIMER_THREAD
- if (stopped) {
- /* prevent wakeups from signal handler ASAP */
- timer_thread_pipe.owner_process = 0;
-
- /*
- * however, the above was not enough: the FD may already be
- * captured and in the middle of a write while we are running,
- * so wait for that to finish:
- */
- while (ATOMIC_CAS(timer_thread_pipe.writing, (rb_atomic_t)0, 0)) {
- native_thread_yield();
- }
-
- /* stop writing ends of pipes so timer thread notices EOF */
- CLOSE_INVALIDATE(normal[1]);
- CLOSE_INVALIDATE(low[1]);
-
- /* timer thread will stop looping when system_working <= 0: */
- native_thread_join(timer_thread.id);
-
- /* timer thread will close the read end on exit: */
- VM_ASSERT(timer_thread_pipe.normal[0] == -1);
- VM_ASSERT(timer_thread_pipe.low[0] == -1);
-
- if (TT_DEBUG) fprintf(stderr, "joined timer thread\n");
- timer_thread.created = 0;
- }
-#endif
return stopped;
}
@@ -1720,8 +1954,8 @@ ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr)
else
#endif
if (th) {
- size = th->machine.stack_maxsize;
- base = (char *)th->machine.stack_start - STACK_DIR_UPPER(0, size);
+ size = th->ec->machine.stack_maxsize;
+ base = (char *)th->ec->machine.stack_start - STACK_DIR_UPPER(0, size);
}
else {
return 0;
@@ -1743,20 +1977,23 @@ ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr)
int
rb_reserved_fd_p(int fd)
{
-#if USE_SLEEPY_TIMER_THREAD
- if ((fd == timer_thread_pipe.normal[0] ||
- fd == timer_thread_pipe.normal[1] ||
- fd == timer_thread_pipe.low[0] ||
- fd == timer_thread_pipe.low[1]) &&
- timer_thread_pipe.owner_process == getpid()) { /* async-signal-safe */
+ /* no false-positive if out-of-FD at startup */
+ if (fd < 0)
+ return 0;
+
+#if UBF_TIMER == UBF_TIMER_PTHREAD
+ if (fd == timer_pthread.low[0] || fd == timer_pthread.low[1])
+ goto check_pid;
+#endif
+ if (fd == signal_self_pipe.normal[0] || fd == signal_self_pipe.normal[1])
+ goto check_pid;
+ if (fd == signal_self_pipe.ub_main[0] || fd == signal_self_pipe.ub_main[1])
+ goto check_pid;
+ return 0;
+check_pid:
+ if (signal_self_pipe.owner_process == getpid()) /* async-signal-safe */
return 1;
- }
- else {
- return 0;
- }
-#else
return 0;
-#endif
}
rb_nativethread_id_t
@@ -1765,4 +2002,307 @@ rb_nativethread_self(void)
return pthread_self();
}
+#if USE_MJIT
+/* A function that wraps actual worker function, for pthread abstraction. */
+static void *
+mjit_worker(void *arg)
+{
+ void (*worker_func)(void) = (void(*)(void))arg;
+
+#ifdef SET_CURRENT_THREAD_NAME
+ SET_CURRENT_THREAD_NAME("ruby-mjitworker"); /* 16 byte including NUL */
+#endif
+ worker_func();
+ return NULL;
+}
+
+/* Launch MJIT thread. Returns FALSE if it fails to create thread. */
+int
+rb_thread_create_mjit_thread(void (*worker_func)(void))
+{
+ pthread_attr_t attr;
+ pthread_t worker_pid;
+ int ret = FALSE;
+
+ if (pthread_attr_init(&attr) != 0) return ret;
+
+ /* jit_worker thread is not to be joined */
+ if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) == 0
+ && pthread_create(&worker_pid, &attr, mjit_worker, (void *)worker_func) == 0) {
+ ret = TRUE;
+ }
+ pthread_attr_destroy(&attr);
+ return ret;
+}
+#endif
+
+int
+rb_sigwait_fd_get(const rb_thread_t *th)
+{
+ if (signal_self_pipe.normal[0] >= 0) {
+ VM_ASSERT(signal_self_pipe.owner_process == getpid());
+ /*
+ * no need to keep firing the timer if any thread is sleeping
+ * on the signal self-pipe
+ */
+ ubf_timer_disarm();
+
+ if (ATOMIC_PTR_CAS(sigwait_th, THREAD_INVALID, th) == THREAD_INVALID) {
+ return signal_self_pipe.normal[0];
+ }
+ }
+ return -1; /* avoid thundering herd and work stealing/starvation */
+}
+
+void
+rb_sigwait_fd_put(const rb_thread_t *th, int fd)
+{
+ const rb_thread_t *old;
+
+ VM_ASSERT(signal_self_pipe.normal[0] == fd);
+ old = ATOMIC_PTR_EXCHANGE(sigwait_th, THREAD_INVALID);
+ if (old != th) assert(old == th);
+}
+
+#ifndef HAVE_PPOLL
+/* TODO: don't ignore sigmask */
+static int
+ruby_ppoll(struct pollfd *fds, nfds_t nfds,
+ const struct timespec *ts, const sigset_t *sigmask)
+{
+ int timeout_ms;
+
+ if (ts) {
+ int tmp, tmp2;
+
+ if (ts->tv_sec > INT_MAX/1000)
+ timeout_ms = INT_MAX;
+ else {
+ tmp = (int)(ts->tv_sec * 1000);
+ /* round up 1ns to 1ms to avoid excessive wakeups for <1ms sleep */
+ tmp2 = (int)((ts->tv_nsec + 999999L) / (1000L * 1000L));
+ if (INT_MAX - tmp < tmp2)
+ timeout_ms = INT_MAX;
+ else
+ timeout_ms = (int)(tmp + tmp2);
+ }
+ }
+ else
+ timeout_ms = -1;
+
+ return poll(fds, nfds, timeout_ms);
+}
+# define ppoll(fds,nfds,ts,sigmask) ruby_ppoll((fds),(nfds),(ts),(sigmask))
+#endif
+
+void
+rb_sigwait_sleep(rb_thread_t *th, int sigwait_fd, const rb_hrtime_t *rel)
+{
+ struct pollfd pfd;
+ struct timespec ts;
+
+ pfd.fd = sigwait_fd;
+ pfd.events = POLLIN;
+
+ if (!BUSY_WAIT_SIGNALS && ubf_threads_empty()) {
+ (void)ppoll(&pfd, 1, rb_hrtime2timespec(&ts, rel), 0);
+ check_signals_nogvl(th, sigwait_fd);
+ }
+ else {
+ rb_hrtime_t to = RB_HRTIME_MAX, end;
+ int n = 0;
+
+ if (rel) {
+ to = *rel;
+ end = rb_hrtime_add(rb_hrtime_now(), to);
+ }
+ /*
+ * tricky: this needs to return on spurious wakeup (no auto-retry).
+ * But we also need to distinguish between periodic quantum
+ * wakeups, so we care about the result of consume_communication_pipe
+ *
+ * We want to avoid spurious wakeup for Mutex#sleep compatibility
+ * [ruby-core:88102]
+ */
+ for (;;) {
+ const rb_hrtime_t *sto = sigwait_timeout(th, sigwait_fd, &to, &n);
+
+ if (n) return;
+ n = ppoll(&pfd, 1, rb_hrtime2timespec(&ts, sto), 0);
+ if (check_signals_nogvl(th, sigwait_fd))
+ return;
+ if (n || (th && RUBY_VM_INTERRUPTED(th->ec)))
+ return;
+ if (rel && hrtime_update_expire(&to, end))
+ return;
+ }
+ }
+}
+
+/*
+ * we need to guarantee wakeups from native_ppoll_sleep because
+ * ubf_select may not be going through ubf_list if other threads
+ * are all sleeping.
+ */
+static void
+ubf_ppoll_sleep(void *ignore)
+{
+ rb_thread_wakeup_timer_thread_fd(signal_self_pipe.ub_main[1]);
+}
+
+/*
+ * Single CPU setups benefit from explicit sched_yield() before ppoll(),
+ * since threads may be too starved to enter the GVL waitqueue for
+ * us to detect contention. Instead, we want to kick other threads
+ * so they can run and possibly prevent us from entering slow paths
+ * in ppoll() or similar syscalls.
+ *
+ * Confirmed on FreeBSD 11.2 and Linux 4.19.
+ * [ruby-core:90417] [Bug #15398]
+ */
+#define GVL_UNLOCK_BEGIN_YIELD(th) do { \
+ const native_thread_data_t *next; \
+ rb_global_vm_lock_t *gvl = rb_ractor_gvl(th->ractor); \
+ RB_GC_SAVE_MACHINE_CONTEXT(th); \
+ rb_native_mutex_lock(&gvl->lock); \
+ next = gvl_release_common(gvl); \
+ rb_native_mutex_unlock(&gvl->lock); \
+ if (!next && rb_ractor_living_thread_num(th->ractor) > 1) { \
+ native_thread_yield(); \
+ }
+
+/*
+ * This function does not exclusively acquire sigwait_fd, so it
+ * cannot safely read from it. However, it can be woken up in
+ * 4 ways:
+ *
+ * 1) ubf_ppoll_sleep (from another thread)
+ * 2) rb_thread_wakeup_timer_thread (from signal handler)
+ * 3) any unmasked signal hitting the process
+ * 4) periodic ubf timer wakeups (after 3)
+ */
+static void
+native_ppoll_sleep(rb_thread_t *th, rb_hrtime_t *rel)
+{
+ rb_native_mutex_lock(&th->interrupt_lock);
+ th->unblock.func = ubf_ppoll_sleep;
+ rb_native_mutex_unlock(&th->interrupt_lock);
+
+ GVL_UNLOCK_BEGIN_YIELD(th);
+
+ if (!RUBY_VM_INTERRUPTED(th->ec)) {
+ struct pollfd pfd[2];
+ struct timespec ts;
+
+ pfd[0].fd = signal_self_pipe.normal[0]; /* sigwait_fd */
+ pfd[1].fd = signal_self_pipe.ub_main[0];
+ pfd[0].events = pfd[1].events = POLLIN;
+ if (ppoll(pfd, 2, rb_hrtime2timespec(&ts, rel), 0) > 0) {
+ if (pfd[1].revents & POLLIN) {
+ (void)consume_communication_pipe(pfd[1].fd);
+ }
+ }
+ /*
+ * do not read the sigwait_fd, here, let uplevel callers
+ * or other threads that, otherwise we may steal and starve
+ * other threads
+ */
+ }
+ unblock_function_clear(th);
+ GVL_UNLOCK_END(th);
+}
+
+static void
+native_sleep(rb_thread_t *th, rb_hrtime_t *rel)
+{
+ int sigwait_fd = rb_sigwait_fd_get(th);
+ rb_ractor_blocking_threads_inc(th->ractor, __FILE__, __LINE__);
+
+ if (sigwait_fd >= 0) {
+ rb_native_mutex_lock(&th->interrupt_lock);
+ th->unblock.func = ubf_sigwait;
+ rb_native_mutex_unlock(&th->interrupt_lock);
+
+ GVL_UNLOCK_BEGIN_YIELD(th);
+
+ if (!RUBY_VM_INTERRUPTED(th->ec)) {
+ rb_sigwait_sleep(th, sigwait_fd, rel);
+ }
+ else {
+ check_signals_nogvl(th, sigwait_fd);
+ }
+ unblock_function_clear(th);
+ GVL_UNLOCK_END(th);
+ rb_sigwait_fd_put(th, sigwait_fd);
+ rb_sigwait_fd_migrate(th->vm);
+ }
+ else if (th == th->vm->ractor.main_thread) { /* always able to handle signals */
+ native_ppoll_sleep(th, rel);
+ }
+ else {
+ native_cond_sleep(th, rel);
+ }
+
+ rb_ractor_blocking_threads_dec(th->ractor, __FILE__, __LINE__);
+}
+
+#if UBF_TIMER == UBF_TIMER_PTHREAD
+static void *
+timer_pthread_fn(void *p)
+{
+ rb_vm_t *vm = p;
+ pthread_t main_thread_id = vm->ractor.main_thread->thread_id;
+ struct pollfd pfd;
+ int timeout = -1;
+ int ccp;
+
+ pfd.fd = timer_pthread.low[0];
+ pfd.events = POLLIN;
+
+ while (system_working > 0) {
+ (void)poll(&pfd, 1, timeout);
+ ccp = consume_communication_pipe(pfd.fd);
+
+ if (system_working > 0) {
+ if (ATOMIC_CAS(timer_pthread.armed, 1, 1)) {
+ pthread_kill(main_thread_id, SIGVTALRM);
+
+ if (rb_signal_buff_size() || !ubf_threads_empty()) {
+ timeout = TIME_QUANTUM_MSEC;
+ }
+ else {
+ ATOMIC_SET(timer_pthread.armed, 0);
+ timeout = -1;
+ }
+ }
+ else if (ccp) {
+ pthread_kill(main_thread_id, SIGVTALRM);
+ ATOMIC_SET(timer_pthread.armed, 0);
+ timeout = -1;
+ }
+ }
+ }
+
+ return 0;
+}
+#endif /* UBF_TIMER_PTHREAD */
+
+static VALUE
+ubf_caller(void *ignore)
+{
+ rb_thread_sleep_forever();
+
+ return Qfalse;
+}
+
+/*
+ * Called if and only if one thread is running, and
+ * the unblock function is NOT async-signal-safe
+ * This assumes USE_THREAD_CACHE is true for performance reasons
+ */
+static VALUE
+rb_thread_start_unblock_thread(void)
+{
+ return rb_thread_create(ubf_caller, 0);
+}
#endif /* THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION */