Diffstat (limited to 'thread_pthread.c')
-rw-r--r--  thread_pthread.c | 326
1 file changed, 282 insertions, 44 deletions
diff --git a/thread_pthread.c b/thread_pthread.c
index 545cc2fa3b..eb639d06aa 100644
--- a/thread_pthread.c
+++ b/thread_pthread.c
@@ -34,6 +34,56 @@
#if defined(__HAIKU__)
#include <kernel/OS.h>
#endif
+#include <time.h>
+#include <signal.h>
+
+#if defined(SIGVTALRM) && !defined(__CYGWIN__)
+# define USE_UBF_LIST 1
+#endif
+
+/*
+ * UBF_TIMER and ubf_list both use SIGVTALRM.
+ * UBF_TIMER exists to close the TOCTTOU signal race for programs
+ * that have no GVL contention but block on read/write to sockets.
+ *
+ * ubf_list wakeups may be triggered periodically by UBF_TIMER on
+ * gvl_yield.
+ */
+#define UBF_TIMER_NONE 0
+#define UBF_TIMER_POSIX 1
+#define UBF_TIMER_PTHREAD 2
+
+#ifndef UBF_TIMER
+# if defined(HAVE_TIMER_SETTIME) && defined(HAVE_TIMER_CREATE) && \
+ defined(CLOCK_MONOTONIC) && defined(USE_UBF_LIST)
+ /* preferred */
+# define UBF_TIMER UBF_TIMER_POSIX
+# elif defined(USE_UBF_LIST)
+ /* safe, but inefficient */
+# define UBF_TIMER UBF_TIMER_PTHREAD
+# else
+ /* we'll be racy without SIGVTALRM for ubf_list */
+# define UBF_TIMER UBF_TIMER_NONE
+# endif
+#endif
+
+#if UBF_TIMER == UBF_TIMER_POSIX
+static struct {
+ timer_t timerid;
+ rb_atomic_t armed; /* 0: disarmed, 1: arming, 2: armed */
+ rb_pid_t owner;
+} timer_posix;
+#elif UBF_TIMER == UBF_TIMER_PTHREAD
+static void *timer_pthread_fn(void *);
+static struct {
+ int low[2];
+ rb_atomic_t armed; /* boolean */
+ rb_pid_t owner;
+ pthread_t thid;
+} timer_pthread = {
+ { -1, -1 },
+};
+#endif
void rb_native_mutex_lock(rb_nativethread_lock_t *lock);
void rb_native_mutex_unlock(rb_nativethread_lock_t *lock);
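The UBF_TIMER_POSIX path added above is built on the POSIX per-process timer API. As a standalone sketch (not part of this patch), the following program creates a CLOCK_MONOTONIC timer that delivers SIGVTALRM and arms it with the same 100ms quantum the VM uses; the handler and tick counter are purely illustrative, and older glibc needs -lrt at link time.

    #include <signal.h>
    #include <stdio.h>
    #include <string.h>
    #include <time.h>
    #include <unistd.h>

    static volatile sig_atomic_t ticks;

    /* async-signal-safe: only bumps a counter, much as Ruby's handler only sets interrupt flags */
    static void on_vtalrm(int sig) { (void)sig; ticks++; }

    int main(void)
    {
        struct sigaction sa;
        struct sigevent sev;
        struct itimerspec it;
        timer_t timerid;

        memset(&sa, 0, sizeof(sa));
        sa.sa_handler = on_vtalrm;
        sigaction(SIGVTALRM, &sa, NULL);

        /* deliver SIGVTALRM on every expiration, as rb_timer_create() requests */
        memset(&sev, 0, sizeof(sev));
        sev.sigev_notify = SIGEV_SIGNAL;
        sev.sigev_signo = SIGVTALRM;
        if (timer_create(CLOCK_MONOTONIC, &sev, &timerid)) return 1;

        /* arm: first expiration and period are both 100ms (TIME_QUANTUM_NSEC) */
        it.it_value.tv_sec = it.it_interval.tv_sec = 0;
        it.it_value.tv_nsec = it.it_interval.tv_nsec = 100 * 1000 * 1000;
        timer_settime(timerid, 0, &it, NULL);

        while (ticks < 5) pause();      /* each delivery wakes pause() */
        printf("got %d ticks\n", (int)ticks);

        /* disarm by loading a zeroed itimerspec, exactly what rb_timer_disarm() does */
        memset(&it, 0, sizeof(it));
        timer_settime(timerid, 0, &it, NULL);
        timer_delete(timerid);
        return 0;
    }
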
@@ -53,6 +103,7 @@ static int native_cond_timedwait(rb_nativethread_cond_t *, pthread_mutex_t *,
static const struct timespec *sigwait_timeout(rb_thread_t *, int sigwait_fd,
const struct timespec *,
int *drained_p);
+static void rb_timer_disarm(void);
#define TIMER_THREAD_CREATED_P() (timer_thread_pipe.owner_process == getpid())
@@ -79,11 +130,30 @@ static const void *const condattr_monotonic = NULL;
/* 100ms. 10ms is too small for user level thread scheduling
* on recent Linux (tested on 2.6.35)
*/
-#define TIME_QUANTUM_USEC (100 * 1000)
+#define TIME_QUANTUM_MSEC (100)
+#define TIME_QUANTUM_USEC (TIME_QUANTUM_MSEC * 1000)
+#define TIME_QUANTUM_NSEC (TIME_QUANTUM_USEC * 1000)
static struct timespec native_cond_timeout(rb_nativethread_cond_t *,
struct timespec rel);
+/*
+ * Designate the next gvl.timer thread; favor the last thread in
+ * the waitq since it will have been in the waitq longest
+ */
+static int
+designate_timer_thread(rb_vm_t *vm)
+{
+ native_thread_data_t *last;
+
+ last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list);
+ if (last) {
+ rb_native_cond_signal(&last->cond.gvlq);
+ return TRUE;
+ }
+ return FALSE;
+}
+
static void
gvl_acquire_common(rb_vm_t *vm, rb_thread_t *th)
{
@@ -98,13 +168,16 @@ gvl_acquire_common(rb_vm_t *vm, rb_thread_t *th)
static struct timespec ts;
static int err = ETIMEDOUT;
+ /* take over timing from timer */
+ rb_timer_disarm();
+
/*
* become designated timer thread to kick vm->gvl.acquired
* periodically. Continue on old timeout if it expired:
*/
if (err == ETIMEDOUT) {
ts.tv_sec = 0;
- ts.tv_nsec = TIME_QUANTUM_USEC * 1000;
+ ts.tv_nsec = TIME_QUANTUM_NSEC;
ts = native_cond_timeout(&nd->cond.gvlq, ts);
}
vm->gvl.timer = th;
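native_cond_timeout() (declared above, defined elsewhere in this file) turns the relative 100ms quantum into an absolute deadline for the condvar wait. A minimal sketch of that conversion, assuming the condition variable is bound to CLOCK_MONOTONIC; the helper name is illustrative, not Ruby's:

    #include <time.h>

    /* illustrative only: absolute deadline = now (CLOCK_MONOTONIC) + rel */
    static struct timespec
    abs_deadline(struct timespec rel)
    {
        struct timespec ts;

        clock_gettime(CLOCK_MONOTONIC, &ts);
        ts.tv_sec += rel.tv_sec;
        ts.tv_nsec += rel.tv_nsec;
        if (ts.tv_nsec >= 1000000000L) {  /* carry nanoseconds into seconds */
            ts.tv_nsec -= 1000000000L;
            ts.tv_sec++;
        }
        return ts;
    }
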
@@ -132,18 +205,8 @@ gvl_acquire_common(rb_vm_t *vm, rb_thread_t *th)
}
}
vm->gvl.acquired = th;
- /*
- * Designate the next gvl.timer thread, favor the last thread in
- * the waitq since it will be in waitq longest
- */
if (!vm->gvl.timer) {
- native_thread_data_t *last;
-
- last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list);
- if (last) {
- rb_native_cond_signal(&last->cond.gvlq);
- }
- else if (!ubf_threads_empty()) {
+ if (!designate_timer_thread(vm) && !ubf_threads_empty()) {
rb_thread_wakeup_timer_thread(0);
}
}
@@ -181,6 +244,7 @@ gvl_yield(rb_vm_t *vm, rb_thread_t *th)
{
native_thread_data_t *next;
+ ubf_wakeup_all_threads();
rb_native_mutex_lock(&vm->gvl.lock);
next = gvl_release_common(vm);
@@ -200,12 +264,6 @@ gvl_yield(rb_vm_t *vm, rb_thread_t *th)
}
else {
rb_native_mutex_unlock(&vm->gvl.lock);
- /*
- * GVL was not contended when we released, so we have no potential
- * contenders for reacquisition. Perhaps they are stuck in blocking
- * region w/o GVL, too, so we kick them:
- */
- ubf_wakeup_all_threads();
native_thread_yield();
rb_native_mutex_lock(&vm->gvl.lock);
rb_native_cond_broadcast(&vm->gvl.switch_wait_cond);
@@ -427,8 +485,7 @@ native_cond_timeout(rb_nativethread_cond_t *cond, struct timespec timeout_rel)
#define native_cleanup_push pthread_cleanup_push
#define native_cleanup_pop pthread_cleanup_pop
-#if defined(SIGVTALRM) && !defined(__CYGWIN__)
-#define USE_UBF_LIST 1
+#if defined(USE_UBF_LIST)
static rb_nativethread_lock_t ubf_list_lock;
#endif
@@ -1189,7 +1246,10 @@ unregister_ubf_list(rb_thread_t *th)
if (!list_empty((struct list_head*)node)) {
rb_native_mutex_lock(&ubf_list_lock);
- list_del_init(node);
+ list_del_init(node);
+ if (list_empty(&ubf_list_head) && !rb_signal_buff_size()) {
+ rb_timer_disarm();
+ }
rb_native_mutex_unlock(&ubf_list_lock);
}
}
@@ -1222,13 +1282,7 @@ ubf_select(void *ptr)
*/
rb_native_mutex_lock(&vm->gvl.lock);
if (!vm->gvl.timer) {
- native_thread_data_t *last;
-
- last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list);
- if (last) {
- rb_native_cond_signal(&last->cond.gvlq);
- }
- else {
+ if (!designate_timer_thread(vm)) {
rb_thread_wakeup_timer_thread(0);
}
}
@@ -1310,18 +1364,54 @@ rb_thread_wakeup_timer_thread_fd(int fd)
}
}
+static void
+rb_timer_arm(rb_pid_t current) /* async signal safe */
+{
+#if UBF_TIMER == UBF_TIMER_POSIX
+ if (timer_posix.owner == current && !ATOMIC_CAS(timer_posix.armed, 0, 1)) {
+ struct itimerspec it;
+
+ it.it_interval.tv_sec = it.it_value.tv_sec = 0;
+ it.it_interval.tv_nsec = it.it_value.tv_nsec = TIME_QUANTUM_NSEC;
+
+ if (timer_settime(timer_posix.timerid, 0, &it, 0))
+ rb_async_bug_errno("timer_settime (arm)", errno);
+
+ switch (ATOMIC_CAS(timer_posix.armed, 1, 2)) {
+ case 0:
+ /* somebody requested a disarm while we were arming */
+ it.it_interval.tv_nsec = it.it_value.tv_nsec = 0;
+ if (timer_settime(timer_posix.timerid, 0, &it, 0))
+ rb_async_bug_errno("timer_settime (disarm)", errno);
+          /* fall through: the arm was just undone, nothing left to do */
+ case 1: return; /* success */
+ case 2:
+ rb_async_bug_errno("UBF_TIMER_POSIX state 2 unexpected", EINVAL);
+ default:
+ rb_async_bug_errno("UBF_TIMER_POSIX unknown state", ERANGE);
+ }
+ }
+#elif UBF_TIMER == UBF_TIMER_PTHREAD
+ if (current == timer_pthread.owner) {
+ if (ATOMIC_EXCHANGE(timer_pthread.armed, 1) == 0)
+ rb_thread_wakeup_timer_thread_fd(timer_pthread.low[1]);
+ }
+#endif
+}
+
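The armed field cycles through 0 (disarmed), 1 (arming) and 2 (armed), and the second CAS in rb_timer_arm() is what keeps a disarm requested mid-arming from being lost. Here is a self-contained sketch of the same protocol using C11 atomics instead of Ruby's ATOMIC_* macros; arm_timer() and cancel_timer() are stand-ins for the real timer_settime() calls:

    #include <stdatomic.h>

    enum { DISARMED = 0, ARMING = 1, ARMED = 2 };
    static _Atomic unsigned timer_state = DISARMED;

    static void arm_timer(void)    { /* timer_settime() with a 100ms period */ }
    static void cancel_timer(void) { /* timer_settime() with a zeroed itimerspec */ }

    /* like rb_timer_arm(): only the thread that wins DISARMED->ARMING arms */
    static void timer_arm(void)
    {
        unsigned expect = DISARMED;

        if (!atomic_compare_exchange_strong(&timer_state, &expect, ARMING))
            return;             /* already armed, or another thread is arming */
        arm_timer();

        expect = ARMING;
        if (!atomic_compare_exchange_strong(&timer_state, &expect, ARMED))
            cancel_timer();     /* a concurrent disarm reset us; undo the arm */
    }

    /* like rb_timer_disarm(): ARMING means the arming thread will undo itself */
    static void timer_disarm(void)
    {
        if (atomic_exchange(&timer_state, DISARMED) == ARMED)
            cancel_timer();     /* it was fully armed, so shut the timer off */
    }
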
void
rb_thread_wakeup_timer_thread(int sig)
{
/* must be safe inside sighandler, so no mutex */
- if (timer_thread_pipe.owner_process == getpid()) {
+ rb_pid_t current = getpid();
+ if (timer_thread_pipe.owner_process == current) {
rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]);
/*
* system_working check is required because vm and main_thread are
* freed during shutdown
*/
- if (sig && system_working) {
+ if (sig && system_working > 0) {
volatile rb_execution_context_t *ec;
rb_vm_t *vm = GET_VM();
rb_thread_t *mth;
@@ -1332,18 +1422,24 @@ rb_thread_wakeup_timer_thread(int sig)
*/
if (!vm) return;
mth = vm->main_thread;
- if (!mth || !system_working) return;
+ if (!mth || system_working <= 0) return;
/* this relies on GC for grace period before cont_free */
ec = ACCESS_ONCE(rb_execution_context_t *, mth->ec);
- if (ec) RUBY_VM_SET_TRAP_INTERRUPT(ec);
+ if (ec) {
+ RUBY_VM_SET_TRAP_INTERRUPT(ec);
+ rb_timer_arm(current);
+ }
}
+ else if (sig == 0 && system_working > 0) {
+ rb_timer_arm(current);
+ }
}
}
#define CLOSE_INVALIDATE(expr) \
- close_invalidate(&timer_thread_pipe.expr,"close_invalidate: "#expr)
+ close_invalidate(&expr,"close_invalidate: "#expr)
static void
close_invalidate(int *fdp, const char *msg)
{
@@ -1446,6 +1542,52 @@ native_set_another_thread_name(rb_nativethread_id_t thread_id, VALUE name)
}
static void
+rb_timer_invalidate(void)
+{
+#if UBF_TIMER == UBF_TIMER_PTHREAD
+ CLOSE_INVALIDATE(timer_pthread.low[0]);
+ CLOSE_INVALIDATE(timer_pthread.low[1]);
+#endif
+}
+
+static void
+rb_timer_pthread_create(rb_pid_t current)
+{
+#if UBF_TIMER == UBF_TIMER_PTHREAD
+ int err;
+ if (timer_pthread.owner == current)
+ return;
+
+ if (setup_communication_pipe_internal(timer_pthread.low) < 0)
+ return;
+
+ err = pthread_create(&timer_pthread.thid, 0, timer_pthread_fn, GET_VM());
+ if (!err)
+ timer_pthread.owner = current;
+ else
+ rb_warn("pthread_create failed for timer: %s, signals racy",
+ strerror(err));
+#endif
+}
+
+static void
+rb_timer_create(rb_pid_t current)
+{
+#if UBF_TIMER == UBF_TIMER_POSIX
+ struct sigevent sev;
+
+ sev.sigev_notify = SIGEV_SIGNAL;
+ sev.sigev_signo = SIGVTALRM;
+ sev.sigev_value.sival_ptr = &timer_posix;
+ if (!timer_create(CLOCK_MONOTONIC, &sev, &timer_posix.timerid))
+ timer_posix.owner = current;
+ else
+ rb_warn("timer_create failed: %s, signals racy", strerror(errno));
+#endif
+ rb_timer_pthread_create(current);
+}
+
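setup_communication_pipe_internal() (outside this diff) is reused above to build the UBF_TIMER_PTHREAD self-pipe. The usual shape of such a helper is a non-blocking, close-on-exec pipe; the sketch below shows that pattern under that assumption, with make_self_pipe being an illustrative name rather than the function in thread_pthread.c:

    #include <fcntl.h>
    #include <unistd.h>

    /* illustrative only: a non-blocking, close-on-exec self-pipe */
    static int make_self_pipe(int fds[2])
    {
    #ifdef HAVE_PIPE2
        if (pipe2(fds, O_NONBLOCK | O_CLOEXEC) < 0) return -1;
    #else
        int i;

        if (pipe(fds) < 0) return -1;
        for (i = 0; i < 2; i++) {
            fcntl(fds[i], F_SETFL, fcntl(fds[i], F_GETFL) | O_NONBLOCK);
            fcntl(fds[i], F_SETFD, FD_CLOEXEC);
        }
    #endif
        return 0;
    }
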
+static void
rb_thread_create_timer_thread(void)
{
/* we only create the pipe, and lazy-spawn */
@@ -1453,17 +1595,65 @@ rb_thread_create_timer_thread(void)
rb_pid_t owner = timer_thread_pipe.owner_process;
if (owner && owner != current) {
- CLOSE_INVALIDATE(normal[0]);
- CLOSE_INVALIDATE(normal[1]);
+ CLOSE_INVALIDATE(timer_thread_pipe.normal[0]);
+ CLOSE_INVALIDATE(timer_thread_pipe.normal[1]);
+ rb_timer_invalidate();
}
if (setup_communication_pipe_internal(timer_thread_pipe.normal) < 0) return;
if (owner != current) {
/* validate pipe on this process */
+ rb_timer_create(current);
sigwait_th = THREAD_INVALID;
timer_thread_pipe.owner_process = current;
}
+ else {
+ /* UBF_TIMER_PTHREAD needs to recreate after fork */
+ rb_timer_pthread_create(current);
+ }
+}
+
+static void
+rb_timer_disarm(void)
+{
+#if UBF_TIMER == UBF_TIMER_POSIX
+ static const struct itimerspec zero;
+ rb_atomic_t armed = ATOMIC_EXCHANGE(timer_posix.armed, 0);
+
+ if (LIKELY(armed == 0)) return;
+ switch (armed) {
+ case 1: return; /* rb_timer_arm was arming and will disarm itself */
+ case 2:
+ if (timer_settime(timer_posix.timerid, 0, &zero, 0))
+ rb_bug_errno("timer_settime (disarm)", errno);
+ return;
+ default:
+ rb_bug("UBF_TIMER_POSIX bad state: %u\n", (unsigned)armed);
+ }
+#elif UBF_TIMER == UBF_TIMER_PTHREAD
+ ATOMIC_SET(timer_pthread.armed, 0);
+#endif
+}
+
+static void
+rb_timer_destroy(void)
+{
+#if UBF_TIMER == UBF_TIMER_PTHREAD
+ rb_pid_t current = getpid();
+ if (timer_pthread.owner == current) {
+ int err;
+
+ timer_pthread.owner = 0;
+ rb_timer_disarm();
+ rb_thread_wakeup_timer_thread_fd(timer_pthread.low[1]);
+ err = pthread_join(timer_pthread.thid, 0);
+ if (err) {
+ rb_raise(rb_eThreadError, "native_thread_join() failed (%d)", err);
+ }
+ }
+#endif
+/* no need to destroy real POSIX timers */
}
static int
@@ -1471,6 +1661,8 @@ native_stop_timer_thread(void)
{
int stopped;
stopped = --system_working <= 0;
+ if (stopped)
+ rb_timer_destroy();
if (TT_DEBUG) fprintf(stderr, "stop timer thread\n");
return stopped;
@@ -1529,14 +1721,18 @@ ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr)
int
rb_reserved_fd_p(int fd)
{
- if ((fd == timer_thread_pipe.normal[0] ||
- fd == timer_thread_pipe.normal[1]) &&
- timer_thread_pipe.owner_process == getpid()) { /* async-signal-safe */
+#if UBF_TIMER == UBF_TIMER_PTHREAD
+ if (fd == timer_pthread.low[0] || fd == timer_pthread.low[1])
+ goto check_pid;
+#endif
+ if (fd == timer_thread_pipe.normal[0] || fd == timer_thread_pipe.normal[1])
+ goto check_pid;
+
+ return 0;
+check_pid:
+ if (timer_thread_pipe.owner_process == getpid()) /* async-signal-safe */
return 1;
- }
- else {
- return 0;
- }
+ return 0;
}
rb_nativethread_id_t
@@ -1600,8 +1796,17 @@ rb_sleep_cond_put(rb_nativethread_cond_t *cond)
int
rb_sigwait_fd_get(const rb_thread_t *th)
{
- if (timer_thread_pipe.owner_process == getpid() &&
+ rb_pid_t current = getpid();
+
+ if (timer_thread_pipe.owner_process == current &&
timer_thread_pipe.normal[0] >= 0) {
+
+ /*
+ * no need to keep firing the timer if any thread is sleeping
+ * on the signal self-pipe
+ */
+ rb_timer_disarm();
+
if (ATOMIC_PTR_CAS(sigwait_th, THREAD_INVALID, th) == THREAD_INVALID) {
return timer_thread_pipe.normal[0];
}
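Ownership of the signal self-pipe is handed out through a single compare-and-swap on sigwait_th, so at most one thread sleeps on it at a time (the matching release, rb_sigwait_fd_put(), is not shown in this hunk). A generic sketch of that claim/release idiom with C11 atomics, using illustrative names and NULL as the "free" sentinel:

    #include <stdatomic.h>
    #include <stddef.h>

    static _Atomic(void *) fd_owner;    /* NULL means the fd is free */

    /* claim: succeeds for exactly one thread; the rest must sleep elsewhere */
    static int claim_fd(void *self)
    {
        void *expect = NULL;
        return atomic_compare_exchange_strong(&fd_owner, &expect, self);
    }

    /* release: only the current owner hands the fd back */
    static void release_fd(void *self)
    {
        void *expect = self;
        atomic_compare_exchange_strong(&fd_owner, &expect, NULL);
    }
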
@@ -1719,4 +1924,37 @@ native_sleep(rb_thread_t *th, struct timespec *timeout_rel)
native_cond_sleep(th, timeout_rel);
}
}
+
+#if UBF_TIMER == UBF_TIMER_PTHREAD
+static void *
+timer_pthread_fn(void *p)
+{
+ rb_vm_t *vm = p;
+ pthread_t main_thread_id = vm->main_thread->thread_id;
+ struct pollfd pfd;
+ int timeout = -1;
+
+ pfd.fd = timer_pthread.low[0];
+ pfd.events = POLLIN;
+
+ while (system_working > 0) {
+ (void)poll(&pfd, 1, timeout);
+ (void)consume_communication_pipe(pfd.fd);
+
+ if (system_working > 0 && ATOMIC_CAS(timer_pthread.armed, 1, 1)) {
+ pthread_kill(main_thread_id, SIGVTALRM);
+
+ if (rb_signal_buff_size() || !ubf_threads_empty()) {
+ timeout = TIME_QUANTUM_MSEC;
+ }
+ else {
+ ATOMIC_SET(timer_pthread.armed, 0);
+ timeout = -1;
+ }
+ }
+ }
+
+ return 0;
+}
+#endif /* UBF_TIMER_PTHREAD */
#endif /* THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION */