summaryrefslogtreecommitdiff
path: root/thread.c
diff options
context:
space:
mode:
authornormal <normal@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2018-07-29 20:47:33 +0000
committernormal <normal@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2018-07-29 20:47:33 +0000
commit708bfd21156828526fe72de2cedecfaca6647dc1 (patch)
tree3a29a7e3edf47618e8cf8f1159d27993e12e66ce /thread.c
parent822e54a527b6be1f4cda7c48cf723e02b51fadc9 (diff)
thread_pthread: remove timer-thread by restructuring GVL
To reduce resource use and reduce CI failure, remove timer-thread. Single-threaded Ruby processes (including forked children) will never see extra thread overhead. This prevents glibc and jemalloc from going into multi-threaded mode and initializing locks or causing fragmentation via arena explosion. The GVL now implements its own wait-queue as a ccan/list to permit controlling wakeup order. Timeslice under contention is handled by a designated timer thread (similar to choosing a "patrol_thread" for current deadlock checking). There is now only one self-pipe, as wakeups for timeslice are done independently using condition variables. This reduces FD pressure slightly. Signal handling is performed directly by a Ruby Thread (instead of timer-thread) by exposing the signal self-pipe to callers of rb_thread_fd_select, native_sleep, rb_wait_for_single_fd, etc... Acquiring, using, and releasing the self-pipe are exposed via 4 new internal functions: 1) rb_sigwait_fd_get - exclusively acquire timer_thread_pipe.normal[0] 2) rb_sigwait_fd_sleep - sleep and wait for signal (and no other FDs) 3) rb_sigwait_fd_put - release acquired result from rb_sigwait_fd_get 4) rb_sigwait_fd_migrate - migrate signal handling to another thread after calling rb_sigwait_fd_put. rb_sigwait_fd_migrate is necessary for waitpid callers because only one thread can wait on the self-pipe at a time, otherwise a deadlock will occur if threads fight over the self-pipe. TRAP_INTERRUPT_MASK is now set for the main thread directly in the signal handler via rb_thread_wakeup_timer_thread. Originally, I wanted to use POSIX timers (timer_create/timer_settime) for this. Unfortunately, this proved unfeasible as Mutex#sleep resumes on spurious wakeups and test/thread/test_cv.rb::test_condvar_timed_wait failed. Using pthread_sigmask to mask out SIGVTALRM fixed that test, but test/fiddle/test_function.rb::test_nogvl_poll proved there'd be some unavoidable (and frequent) incompatibilities from that approach. 
Finally, this allows us to drop thread_destruct_lock and interrupt current ec directly. We don't need to rely on vm->thread_destruct_lock or a coherent vm->running_thread on any platform. Separate timer-thread for time slice and signal handling is relegated to thread_win32.c, now. [ruby-core:88088] [Misc #14937] git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@64107 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
Diffstat (limited to 'thread.c')
-rw-r--r--thread.c327
1 file changed, 206 insertions, 121 deletions
diff --git a/thread.c b/thread.c
index a7d48a4..01bfad1 100644
--- a/thread.c
+++ b/thread.c
@@ -108,6 +108,10 @@ static void timespec_add(struct timespec *, const struct timespec *);
static void timespec_sub(struct timespec *, const struct timespec *);
static int timespec_update_expire(struct timespec *, const struct timespec *);
static void getclockofday(struct timespec *);
+NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
+static void consume_communication_pipe(int fd);
+static void check_signals_nogvl(rb_thread_t *, int sigwait_fd);
+void rb_sigwait_fd_migrate(rb_vm_t *); /* process.c */
#define eKillSignal INT2FIX(0)
#define eTerminateSignal INT2FIX(1)
@@ -348,7 +352,14 @@ rb_thread_s_debug_set(VALUE self, VALUE val)
#endif
NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start,
VALUE *register_stack_start));
-static void timer_thread_function(void *);
+static void timer_thread_function(void);
+void ruby_sigchld_handler(rb_vm_t *); /* signal.c */
+
+static void
+ubf_sigwait(void *ignore)
+{
+ rb_thread_wakeup_timer_thread(0);
+}
#if defined(_WIN32)
#include "thread_win32.c"
@@ -412,7 +423,6 @@ rb_vm_gvl_destroy(rb_vm_t *vm)
{
gvl_release(vm);
gvl_destroy(vm);
- rb_native_mutex_destroy(&vm->thread_destruct_lock);
if (0) {
/* may be held by running threads */
rb_native_mutex_destroy(&vm->waitpid_lock);
@@ -773,10 +783,6 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_s
rb_fiber_close(th->ec->fiber_ptr);
}
- rb_native_mutex_lock(&th->vm->thread_destruct_lock);
- /* make sure vm->running_thread never point me after this point.*/
- th->vm->running_thread = NULL;
- rb_native_mutex_unlock(&th->vm->thread_destruct_lock);
thread_cleanup_func(th, FALSE);
gvl_release(th->vm);
@@ -2163,6 +2169,14 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
/* signal handling */
if (trap_interrupt && (th == th->vm->main_thread)) {
enum rb_thread_status prev_status = th->status;
+ int sigwait_fd = rb_sigwait_fd_get(th);
+
+ if (sigwait_fd >= 0) {
+ consume_communication_pipe(sigwait_fd);
+ ruby_sigchld_handler(th->vm);
+ rb_sigwait_fd_put(th, sigwait_fd);
+ rb_sigwait_fd_migrate(th->vm);
+ }
th->status = THREAD_RUNNABLE;
while ((sig = rb_get_next_signal()) != 0) {
rb_signal_exec(th, sig);
@@ -3840,86 +3854,75 @@ wait_retryable(int *result, int errnum, struct timespec *timeout,
return FALSE;
}
-#define restore_fdset(fds1, fds2) \
- ((fds1) ? rb_fd_dup(fds1, fds2) : (void)0)
-
struct select_set {
- rb_fdset_t read;
- rb_fdset_t write;
- rb_fdset_t except;
+ int max;
+ int sigwait_fd;
+ rb_thread_t *th;
+ rb_fdset_t *rset;
+ rb_fdset_t *wset;
+ rb_fdset_t *eset;
+ rb_fdset_t orig_rset;
+ rb_fdset_t orig_wset;
+ rb_fdset_t orig_eset;
+ struct timeval *timeout;
};
-static size_t
-select_set_memsize(const void *p)
+static VALUE
+select_set_free(VALUE p)
{
- return sizeof(struct select_set);
-}
+ struct select_set *set = (struct select_set *)p;
-static void
-select_set_free(void *p)
-{
- struct select_set *orig = p;
+ if (set->sigwait_fd >= 0) {
+ rb_sigwait_fd_put(set->th, set->sigwait_fd);
+ rb_sigwait_fd_migrate(set->th->vm);
+ }
- rb_fd_term(&orig->read);
- rb_fd_term(&orig->write);
- rb_fd_term(&orig->except);
- xfree(orig);
-}
+ rb_fd_term(&set->orig_rset);
+ rb_fd_term(&set->orig_wset);
+ rb_fd_term(&set->orig_eset);
-static const rb_data_type_t select_set_type = {
- "select_set",
- {NULL, select_set_free, select_set_memsize,},
- 0, 0, RUBY_TYPED_FREE_IMMEDIATELY
-};
+ return Qfalse;
+}
-static int
-do_select(int n, rb_fdset_t *const readfds, rb_fdset_t *const writefds,
- rb_fdset_t *const exceptfds, struct timeval *timeout)
+static VALUE
+do_select(VALUE p)
{
+ struct select_set *set = (struct select_set *)p;
int MAYBE_UNUSED(result);
int lerrno;
struct timespec ts, end, *tsp;
- rb_thread_t *th = GET_THREAD();
- VALUE o;
- struct select_set *orig;
-
- o = TypedData_Make_Struct(0, struct select_set, &select_set_type, orig);
- timeout_prepare(&tsp, &ts, &end, timeout);
+ timeout_prepare(&tsp, &ts, &end, set->timeout);
+#define restore_fdset(dst, src) \
+ ((dst) ? rb_fd_dup(dst, src) : (void)0)
#define do_select_update() \
- (restore_fdset(readfds, &orig->read), \
- restore_fdset(writefds, &orig->write), \
- restore_fdset(exceptfds, &orig->except), \
+ (restore_fdset(set->rset, &set->orig_rset), \
+ restore_fdset(set->wset, &set->orig_wset), \
+ restore_fdset(set->eset, &set->orig_eset), \
TRUE)
-#define fd_init_copy(f) \
- (f##fds) ? rb_fd_init_copy(&orig->f, f##fds) : rb_fd_no_init(&orig->f)
- fd_init_copy(read);
- fd_init_copy(write);
- fd_init_copy(except);
-#undef fd_init_copy
-
do {
lerrno = 0;
- BLOCKING_REGION(th, {
- result = native_fd_select(n, readfds, writefds, exceptfds,
- timeval_for(timeout, tsp), th);
+ BLOCKING_REGION(set->th, {
+ result = native_fd_select(set->max, set->rset, set->wset, set->eset,
+ timeval_for(set->timeout, tsp), set->th);
if (result < 0) lerrno = errno;
- }, ubf_select, th, FALSE);
+ }, set->sigwait_fd >= 0 ? ubf_sigwait : ubf_select, set->th, FALSE);
- RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may raise */
- } while (wait_retryable(&result, lerrno, tsp, &end) && do_select_update());
+ if (set->sigwait_fd >= 0 && rb_fd_isset(set->sigwait_fd, set->rset)) {
+ result--;
+ check_signals_nogvl(set->th, set->sigwait_fd);
+ }
- /* didn't raise, perform cleanup ourselves */
- select_set_free(orig);
- rb_gc_force_recycle(o);
+ RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); /* may raise */
+ } while (wait_retryable(&result, lerrno, tsp, &end) && do_select_update());
if (result < 0) {
errno = lerrno;
}
- return result;
+ return (VALUE)result;
}
static void
@@ -3955,11 +3958,42 @@ rb_thread_fd_writable(int fd)
return TRUE;
}
+static rb_fdset_t *
+init_set_fd(int fd, rb_fdset_t *fds)
+{
+ if (fd < 0) {
+ return 0;
+ }
+ rb_fd_init(fds);
+ rb_fd_set(fd, fds);
+
+ return fds;
+}
+
int
rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except,
struct timeval *timeout)
{
- if (!read && !write && !except) {
+ struct select_set set;
+
+ set.th = GET_THREAD();
+ set.max = max;
+ set.sigwait_fd = rb_sigwait_fd_get(set.th);
+ set.rset = read;
+ set.wset = write;
+ set.eset = except;
+ set.timeout = timeout;
+
+ if (set.sigwait_fd >= 0) {
+ if (set.rset)
+ rb_fd_set(set.sigwait_fd, set.rset);
+ else
+ set.rset = init_set_fd(set.sigwait_fd, &set.orig_rset);
+ if (set.sigwait_fd > set.max) {
+ set.max = set.sigwait_fd + 1;
+ }
+ }
+ if (!set.rset && !set.wset && !set.eset) {
if (!timeout) {
rb_thread_sleep_forever();
return 0;
@@ -3968,16 +4002,23 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
return 0;
}
- if (read) {
- rb_fd_resize(max - 1, read);
- }
- if (write) {
- rb_fd_resize(max - 1, write);
- }
- if (except) {
- rb_fd_resize(max - 1, except);
- }
- return do_select(max, read, write, except, timeout);
+#define fd_init_copy(f) do { \
+ if (set.f) { \
+ rb_fd_resize(set.max - 1, set.f); \
+ if (&set.orig_##f != set.f) { /* sigwait_fd */ \
+ rb_fd_init_copy(&set.orig_##f, set.f); \
+ } \
+ } \
+ else { \
+ rb_fd_no_init(&set.orig_##f); \
+ } \
+ } while (0)
+ fd_init_copy(rset);
+ fd_init_copy(wset);
+ fd_init_copy(eset);
+#undef fd_init_copy
+
+ return (int)rb_ensure(do_select, (VALUE)&set, select_set_free, (VALUE)&set);
}
#ifdef USE_POLL
@@ -4028,31 +4069,55 @@ ruby_ppoll(struct pollfd *fds, nfds_t nfds,
int
rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
{
- struct pollfd fds;
+ struct pollfd fds[2];
int result = 0, lerrno;
struct timespec ts, end, *tsp;
rb_thread_t *th = GET_THREAD();
+ nfds_t nfds;
+ rb_unblock_function_t *ubf;
timeout_prepare(&tsp, &ts, &end, timeout);
- fds.fd = fd;
- fds.events = (short)events;
-
+ fds[0].fd = fd;
+ fds[0].events = (short)events;
do {
- fds.revents = 0;
+ fds[0].revents = 0;
+ fds[1].fd = rb_sigwait_fd_get(th);
+
+ if (fds[1].fd >= 0) {
+ fds[1].events = POLLIN;
+ fds[1].revents = 0;
+ nfds = 2;
+ ubf = ubf_sigwait;
+ }
+ else {
+ nfds = 1;
+ ubf = ubf_select;
+ }
+
lerrno = 0;
BLOCKING_REGION(th, {
- result = ppoll(&fds, 1, tsp, NULL);
+ result = ppoll(fds, nfds, tsp, NULL);
if (result < 0) lerrno = errno;
- }, ubf_select, th, FALSE);
+ }, ubf, th, FALSE);
+ if (fds[1].fd >= 0) {
+ if (fds[1].revents) {
+ result--;
+ check_signals_nogvl(th, fds[1].fd);
+ fds[1].revents = 0;
+ }
+ rb_sigwait_fd_put(th, fds[1].fd);
+ rb_sigwait_fd_migrate(th->vm);
+ }
RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
} while (wait_retryable(&result, lerrno, tsp, &end));
+
if (result < 0) {
errno = lerrno;
return -1;
}
- if (fds.revents & POLLNVAL) {
+ if (fds[0].revents & POLLNVAL) {
errno = EBADF;
return -1;
}
@@ -4062,32 +4127,20 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
* Therefore we need to fix it up.
*/
result = 0;
- if (fds.revents & POLLIN_SET)
+ if (fds[0].revents & POLLIN_SET)
result |= RB_WAITFD_IN;
- if (fds.revents & POLLOUT_SET)
+ if (fds[0].revents & POLLOUT_SET)
result |= RB_WAITFD_OUT;
- if (fds.revents & POLLEX_SET)
+ if (fds[0].revents & POLLEX_SET)
result |= RB_WAITFD_PRI;
/* all requested events are ready if there is an error */
- if (fds.revents & POLLERR_SET)
+ if (fds[0].revents & POLLERR_SET)
result |= events;
return result;
}
#else /* ! USE_POLL - implement rb_io_poll_fd() using select() */
-static rb_fdset_t *
-init_set_fd(int fd, rb_fdset_t *fds)
-{
- if (fd < 0) {
- return 0;
- }
- rb_fd_init(fds);
- rb_fd_set(fd, fds);
-
- return fds;
-}
-
struct select_args {
union {
int fd;
@@ -4168,10 +4221,6 @@ rb_gc_set_stack_end(VALUE **stack_end_p)
}
#endif
-
-/* signal.c */
-void ruby_sigchld_handler(rb_vm_t *);
-
/*
*
*/
@@ -4187,36 +4236,73 @@ rb_threadptr_check_signal(rb_thread_t *mth)
}
static void
-timer_thread_function(void *arg)
+timer_thread_function(void)
{
- rb_vm_t *vm = GET_VM(); /* TODO: fix me for Multi-VM */
+ volatile rb_execution_context_t *ec;
- /*
- * Tricky: thread_destruct_lock doesn't close a race against
- * vm->running_thread switch. however it guarantees th->running_thread
- * point to valid pointer or NULL.
- */
- rb_native_mutex_lock(&vm->thread_destruct_lock);
/* for time slice */
- if (vm->running_thread) {
- RUBY_VM_SET_TIMER_INTERRUPT(vm->running_thread->ec);
- }
- rb_native_mutex_unlock(&vm->thread_destruct_lock);
+ ec = ACCESS_ONCE(rb_execution_context_t *,
+ ruby_current_execution_context_ptr);
+ if (ec) RUBY_VM_SET_TIMER_INTERRUPT(ec);
+}
- /* check signal */
- ruby_sigchld_handler(vm);
- rb_threadptr_check_signal(vm->main_thread);
+static void
+async_bug_fd(const char *mesg, int errno_arg, int fd)
+{
+ char buff[64];
+ size_t n = strlcpy(buff, mesg, sizeof(buff));
+ if (n < sizeof(buff)-3) {
+ ruby_snprintf(buff+n, sizeof(buff)-n, "(%d)", fd);
+ }
+ rb_async_bug_errno(buff, errno_arg);
+}
-#if 0
- /* prove profiler */
- if (vm->prove_profile.enable) {
- rb_thread_t *th = vm->running_thread;
+/* VM-dependent API is not available for this function */
+static void
+consume_communication_pipe(int fd)
+{
+#define CCP_READ_BUFF_SIZE 1024
+ /* buffer can be shared because no one refers to them. */
+ static char buff[CCP_READ_BUFF_SIZE];
+ ssize_t result;
- if (vm->during_gc) {
- /* GC prove profiling */
+ while (1) {
+ result = read(fd, buff, sizeof(buff));
+ if (result == 0) {
+ return;
}
- }
+ else if (result < 0) {
+ int e = errno;
+ switch (e) {
+ case EINTR:
+ continue; /* retry */
+ case EAGAIN:
+#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
#endif
+ return;
+ default:
+ async_bug_fd("consume_communication_pipe: read", e, fd);
+ }
+ }
+ }
+}
+
+static void
+check_signals_nogvl(rb_thread_t *th, int sigwait_fd)
+{
+ rb_vm_t *vm = GET_VM(); /* th may be 0 */
+
+ consume_communication_pipe(sigwait_fd);
+ ubf_wakeup_all_threads();
+ ruby_sigchld_handler(vm);
+ if (rb_signal_buff_size()) {
+ if (th == vm->main_thread)
+ /* no need to lock + wakeup if already in main thread */
+ RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
+ else
+ threadptr_trap_interrupt(vm->main_thread);
+ }
}
void
@@ -5046,7 +5132,6 @@ Init_Thread(void)
/* acquire global vm lock */
gvl_init(th->vm);
gvl_acquire(th->vm, th);
- rb_native_mutex_initialize(&th->vm->thread_destruct_lock);
rb_native_mutex_initialize(&th->vm->waitpid_lock);
rb_native_mutex_initialize(&th->interrupt_lock);