diff options
Diffstat (limited to 'thread_pthread.c')
-rw-r--r-- | thread_pthread.c | 326 |
1 files changed, 282 insertions, 44 deletions
diff --git a/thread_pthread.c b/thread_pthread.c index 545cc2fa3b..eb639d06aa 100644 --- a/thread_pthread.c +++ b/thread_pthread.c @@ -34,6 +34,56 @@ #if defined(__HAIKU__) #include <kernel/OS.h> #endif +#include <time.h> +#include <signal.h> + +#if defined(SIGVTALRM) && !defined(__CYGWIN__) +# define USE_UBF_LIST 1 +#endif + +/* + * UBF_TIMER and ubf_list both use SIGVTALRM. + * UBF_TIMER is to close TOCTTOU signal race on programs + * without GVL contention blocking read/write to sockets. + * + * ubf_list wakeups may be triggered periodically by UBF_TIMER on + * gvl_yield. + */ +#define UBF_TIMER_NONE 0 +#define UBF_TIMER_POSIX 1 +#define UBF_TIMER_PTHREAD 2 + +#ifndef UBF_TIMER +# if defined(HAVE_TIMER_SETTIME) && defined(HAVE_TIMER_CREATE) && \ + defined(CLOCK_MONOTONIC) && defined(USE_UBF_LIST) + /* preferred */ +# define UBF_TIMER UBF_TIMER_POSIX +# elif defined(USE_UBF_LIST) + /* safe, but inefficient */ +# define UBF_TIMER UBF_TIMER_PTHREAD +# else + /* we'll be racy without SIGVTALRM for ubf_list */ +# define UBF_TIMER UBF_TIMER_NONE +# endif +#endif + +#if UBF_TIMER == UBF_TIMER_POSIX +static struct { + timer_t timerid; + rb_atomic_t armed; /* 0: disarmed, 1: arming, 2: armed */ + rb_pid_t owner; +} timer_posix; +#elif UBF_TIMER == UBF_TIMER_PTHREAD +static void *timer_pthread_fn(void *); +static struct { + int low[2]; + rb_atomic_t armed; /* boolean */ + rb_pid_t owner; + pthread_t thid; +} timer_pthread = { + { -1, -1 }, +}; +#endif void rb_native_mutex_lock(rb_nativethread_lock_t *lock); void rb_native_mutex_unlock(rb_nativethread_lock_t *lock); @@ -53,6 +103,7 @@ static int native_cond_timedwait(rb_nativethread_cond_t *, pthread_mutex_t *, static const struct timespec *sigwait_timeout(rb_thread_t *, int sigwait_fd, const struct timespec *, int *drained_p); +static void rb_timer_disarm(void); #define TIMER_THREAD_CREATED_P() (timer_thread_pipe.owner_process == getpid()) @@ -79,11 +130,30 @@ static const void *const condattr_monotonic = NULL; /* 100ms. 10ms is too small for user level thread scheduling * on recent Linux (tested on 2.6.35) */ -#define TIME_QUANTUM_USEC (100 * 1000) +#define TIME_QUANTUM_MSEC (100) +#define TIME_QUANTUM_USEC (TIME_QUANTUM_MSEC * 1000) +#define TIME_QUANTUM_NSEC (TIME_QUANTUM_USEC * 1000) static struct timespec native_cond_timeout(rb_nativethread_cond_t *, struct timespec rel); +/* + * Designate the next gvl.timer thread, favor the last thread in + * the waitq since it will be in waitq longest + */ +static int +designate_timer_thread(rb_vm_t *vm) +{ + native_thread_data_t *last; + + last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list); + if (last) { + rb_native_cond_signal(&last->cond.gvlq); + return TRUE; + } + return FALSE; +} + static void gvl_acquire_common(rb_vm_t *vm, rb_thread_t *th) { @@ -98,13 +168,16 @@ gvl_acquire_common(rb_vm_t *vm, rb_thread_t *th) static struct timespec ts; static int err = ETIMEDOUT; + /* take over timing from timer */ + rb_timer_disarm(); + /* * become designated timer thread to kick vm->gvl.acquired * periodically. Continue on old timeout if it expired: */ if (err == ETIMEDOUT) { ts.tv_sec = 0; - ts.tv_nsec = TIME_QUANTUM_USEC * 1000; + ts.tv_nsec = TIME_QUANTUM_NSEC; ts = native_cond_timeout(&nd->cond.gvlq, ts); } vm->gvl.timer = th; @@ -132,18 +205,8 @@ gvl_acquire_common(rb_vm_t *vm, rb_thread_t *th) } } vm->gvl.acquired = th; - /* - * Designate the next gvl.timer thread, favor the last thread in - * the waitq since it will be in waitq longest - */ if (!vm->gvl.timer) { - native_thread_data_t *last; - - last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list); - if (last) { - rb_native_cond_signal(&last->cond.gvlq); - } - else if (!ubf_threads_empty()) { + if (!designate_timer_thread(vm) && !ubf_threads_empty()) { rb_thread_wakeup_timer_thread(0); } } @@ -181,6 +244,7 @@ gvl_yield(rb_vm_t *vm, rb_thread_t *th) { native_thread_data_t *next; + ubf_wakeup_all_threads(); rb_native_mutex_lock(&vm->gvl.lock); next = gvl_release_common(vm); @@ -200,12 +264,6 @@ gvl_yield(rb_vm_t *vm, rb_thread_t *th) } else { rb_native_mutex_unlock(&vm->gvl.lock); - /* - * GVL was not contended when we released, so we have no potential - * contenders for reacquisition. Perhaps they are stuck in blocking - * region w/o GVL, too, so we kick them: - */ - ubf_wakeup_all_threads(); native_thread_yield(); rb_native_mutex_lock(&vm->gvl.lock); rb_native_cond_broadcast(&vm->gvl.switch_wait_cond); @@ -427,8 +485,7 @@ native_cond_timeout(rb_nativethread_cond_t *cond, struct timespec timeout_rel) #define native_cleanup_push pthread_cleanup_push #define native_cleanup_pop pthread_cleanup_pop -#if defined(SIGVTALRM) && !defined(__CYGWIN__) -#define USE_UBF_LIST 1 +#if defined(USE_UBF_LIST) static rb_nativethread_lock_t ubf_list_lock; #endif @@ -1189,7 +1246,10 @@ unregister_ubf_list(rb_thread_t *th) if (!list_empty((struct list_head*)node)) { rb_native_mutex_lock(&ubf_list_lock); - list_del_init(node); + list_del_init(node); + if (list_empty(&ubf_list_head) && !rb_signal_buff_size()) { + rb_timer_disarm(); + } rb_native_mutex_unlock(&ubf_list_lock); } } @@ -1222,13 +1282,7 @@ ubf_select(void *ptr) */ rb_native_mutex_lock(&vm->gvl.lock); if (!vm->gvl.timer) { - native_thread_data_t *last; - - last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list); - if (last) { - rb_native_cond_signal(&last->cond.gvlq); - } - else { + if (!designate_timer_thread(vm)) { rb_thread_wakeup_timer_thread(0); } } @@ -1310,18 +1364,54 @@ rb_thread_wakeup_timer_thread_fd(int fd) } } +static void +rb_timer_arm(rb_pid_t current) /* async signal safe */ +{ +#if UBF_TIMER == UBF_TIMER_POSIX + if (timer_posix.owner == current && !ATOMIC_CAS(timer_posix.armed, 0, 1)) { + struct itimerspec it; + + it.it_interval.tv_sec = it.it_value.tv_sec = 0; + it.it_interval.tv_nsec = it.it_value.tv_nsec = TIME_QUANTUM_NSEC; + + if (timer_settime(timer_posix.timerid, 0, &it, 0)) + rb_async_bug_errno("timer_settime (arm)", errno); + + switch (ATOMIC_CAS(timer_posix.armed, 1, 2)) { + case 0: + /* somebody requested a disarm while we were arming */ + it.it_interval.tv_nsec = it.it_value.tv_nsec = 0; + if (timer_settime(timer_posix.timerid, 0, &it, 0)) + rb_async_bug_errno("timer_settime (disarm)", errno); + + case 1: return; /* success */ + case 2: + rb_async_bug_errno("UBF_TIMER_POSIX state 2 unexpected", EINVAL); + default: + rb_async_bug_errno("UBF_TIMER_POSIX unknown state", ERANGE); + } + } +#elif UBF_TIMER == UBF_TIMER_PTHREAD + if (current == timer_pthread.owner) { + if (ATOMIC_EXCHANGE(timer_pthread.armed, 1) == 0) + rb_thread_wakeup_timer_thread_fd(timer_pthread.low[1]); + } +#endif +} + void rb_thread_wakeup_timer_thread(int sig) { /* must be safe inside sighandler, so no mutex */ - if (timer_thread_pipe.owner_process == getpid()) { + rb_pid_t current = getpid(); + if (timer_thread_pipe.owner_process == current) { rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]); /* * system_working check is required because vm and main_thread are * freed during shutdown */ - if (sig && system_working) { + if (sig && system_working > 0) { volatile rb_execution_context_t *ec; rb_vm_t *vm = GET_VM(); rb_thread_t *mth; @@ -1332,18 +1422,24 @@ rb_thread_wakeup_timer_thread(int sig) */ if (!vm) return; mth = vm->main_thread; - if (!mth || !system_working) return; + if (!mth || system_working <= 0) return; /* this relies on GC for grace period before cont_free */ ec = ACCESS_ONCE(rb_execution_context_t *, mth->ec); - if (ec) RUBY_VM_SET_TRAP_INTERRUPT(ec); + if (ec) { + RUBY_VM_SET_TRAP_INTERRUPT(ec); + rb_timer_arm(current); + } } + else if (sig == 0 && system_working > 0) { + rb_timer_arm(current); + } } } #define CLOSE_INVALIDATE(expr) \ - close_invalidate(&timer_thread_pipe.expr,"close_invalidate: "#expr) + close_invalidate(&expr,"close_invalidate: "#expr) static void close_invalidate(int *fdp, const char *msg) { @@ -1446,6 +1542,52 @@ native_set_another_thread_name(rb_nativethread_id_t thread_id, VALUE name) } static void +rb_timer_invalidate(void) +{ +#if UBF_TIMER == UBF_TIMER_PTHREAD + CLOSE_INVALIDATE(timer_pthread.low[0]); + CLOSE_INVALIDATE(timer_pthread.low[1]); +#endif +} + +static void +rb_timer_pthread_create(rb_pid_t current) +{ +#if UBF_TIMER == UBF_TIMER_PTHREAD + int err; + if (timer_pthread.owner == current) + return; + + if (setup_communication_pipe_internal(timer_pthread.low) < 0) + return; + + err = pthread_create(&timer_pthread.thid, 0, timer_pthread_fn, GET_VM()); + if (!err) + timer_pthread.owner = current; + else + rb_warn("pthread_create failed for timer: %s, signals racy", + strerror(err)); +#endif +} + +static void +rb_timer_create(rb_pid_t current) +{ +#if UBF_TIMER == UBF_TIMER_POSIX + struct sigevent sev; + + sev.sigev_notify = SIGEV_SIGNAL; + sev.sigev_signo = SIGVTALRM; + sev.sigev_value.sival_ptr = &timer_posix; + if (!timer_create(CLOCK_MONOTONIC, &sev, &timer_posix.timerid)) + timer_posix.owner = current; + else + rb_warn("timer_create failed: %s, signals racy", strerror(errno)); +#endif + rb_timer_pthread_create(current); +} + +static void rb_thread_create_timer_thread(void) { /* we only create the pipe, and lazy-spawn */ @@ -1453,17 +1595,65 @@ rb_thread_create_timer_thread(void) rb_pid_t owner = timer_thread_pipe.owner_process; if (owner && owner != current) { - CLOSE_INVALIDATE(normal[0]); - CLOSE_INVALIDATE(normal[1]); + CLOSE_INVALIDATE(timer_thread_pipe.normal[0]); + CLOSE_INVALIDATE(timer_thread_pipe.normal[1]); + rb_timer_invalidate(); } if (setup_communication_pipe_internal(timer_thread_pipe.normal) < 0) return; if (owner != current) { /* validate pipe on this process */ + rb_timer_create(current); sigwait_th = THREAD_INVALID; timer_thread_pipe.owner_process = current; } + else { + /* UBF_TIMER_PTHREAD needs to recreate after fork */ + rb_timer_pthread_create(current); + } +} + +static void +rb_timer_disarm(void) +{ +#if UBF_TIMER == UBF_TIMER_POSIX + static const struct itimerspec zero; + rb_atomic_t armed = ATOMIC_EXCHANGE(timer_posix.armed, 0); + + if (LIKELY(armed) == 0) return; + switch (armed) { + case 1: return; /* rb_timer_arm was arming and will disarm itself */ + case 2: + if (timer_settime(timer_posix.timerid, 0, &zero, 0)) + rb_bug_errno("timer_settime (disarm)", errno); + return; + default: + rb_bug("UBF_TIMER_POSIX bad state: %u\n", (unsigned)armed); + } +#elif UBF_TIMER == UBF_TIMER_PTHREAD + ATOMIC_SET(timer_pthread.armed, 0); +#endif +} + +static void +rb_timer_destroy(void) +{ +#if UBF_TIMER == UBF_TIMER_PTHREAD + rb_pid_t current = getpid(); + if (timer_pthread.owner == current) { + int err; + + timer_pthread.owner = 0; + rb_timer_disarm(); + rb_thread_wakeup_timer_thread_fd(timer_pthread.low[1]); + err = pthread_join(timer_pthread.thid, 0); + if (err) { + rb_raise(rb_eThreadError, "native_thread_join() failed (%d)", err); + } + } +#endif +/* no need to destroy real POSIX timers */ } static int @@ -1471,6 +1661,8 @@ native_stop_timer_thread(void) { int stopped; stopped = --system_working <= 0; + if (stopped) + rb_timer_destroy(); if (TT_DEBUG) fprintf(stderr, "stop timer thread\n"); return stopped; @@ -1529,14 +1721,18 @@ ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr) int rb_reserved_fd_p(int fd) { - if ((fd == timer_thread_pipe.normal[0] || - fd == timer_thread_pipe.normal[1]) && - timer_thread_pipe.owner_process == getpid()) { /* async-signal-safe */ +#if UBF_TIMER == UBF_TIMER_PTHREAD + if (fd == timer_pthread.low[0] || fd == timer_pthread.low[1]) + goto check_pid; +#endif + if (fd == timer_thread_pipe.normal[0] || fd == timer_thread_pipe.normal[1]) + goto check_pid; + + return 0; +check_pid: + if (timer_thread_pipe.owner_process == getpid()) /* async-signal-safe */ return 1; - } - else { - return 0; - } + return 0; } rb_nativethread_id_t @@ -1600,8 +1796,17 @@ rb_sleep_cond_put(rb_nativethread_cond_t *cond) int rb_sigwait_fd_get(const rb_thread_t *th) { - if (timer_thread_pipe.owner_process == getpid() && + rb_pid_t current = getpid(); + + if (timer_thread_pipe.owner_process == current && timer_thread_pipe.normal[0] >= 0) { + + /* + * no need to keep firing the timer if any thread is sleeping + * on the signal self-pipe + */ + rb_timer_disarm(); + if (ATOMIC_PTR_CAS(sigwait_th, THREAD_INVALID, th) == THREAD_INVALID) { return timer_thread_pipe.normal[0]; } @@ -1719,4 +1924,37 @@ native_sleep(rb_thread_t *th, struct timespec *timeout_rel) native_cond_sleep(th, timeout_rel); } } + +#if UBF_TIMER == UBF_TIMER_PTHREAD +static void * +timer_pthread_fn(void *p) +{ + rb_vm_t *vm = p; + pthread_t main_thread_id = vm->main_thread->thread_id; + struct pollfd pfd; + int timeout = -1; + + pfd.fd = timer_pthread.low[0]; + pfd.events = POLLIN; + + while (system_working > 0) { + (void)poll(&pfd, 1, timeout); + (void)consume_communication_pipe(pfd.fd); + + if (system_working > 0 && ATOMIC_CAS(timer_pthread.armed, 1, 1)) { + pthread_kill(main_thread_id, SIGVTALRM); + + if (rb_signal_buff_size() || !ubf_threads_empty()) { + timeout = TIME_QUANTUM_MSEC; + } + else { + ATOMIC_SET(timer_pthread.armed, 0); + timeout = -1; + } + } + } + + return 0; +} +#endif /* UBF_TIMER_PTHREAD */ #endif /* THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION */ |