diff options
author | normal <normal@b2dd03c8-39d4-4d8f-98ff-823fe69b080e> | 2018-08-13 21:34:20 +0000 |
---|---|---|
committer | normal <normal@b2dd03c8-39d4-4d8f-98ff-823fe69b080e> | 2018-08-13 21:34:20 +0000 |
commit | 48b6bd74e2febde095ac85d818e94c0e58677647 (patch) | |
tree | 73348a2cf6a66b1f1d6620b93109b7f7dfb0ca03 /thread.c | |
parent | 4d2e0fffb08f0418fa6995be2e15aad7ee11b048 (diff) |
thread_pthread.c: eliminate timer thread by restructuring GVL
This reverts commit 194a6a2c68e9c8a3536b24db18ceac87535a6051 (r64203).
Race conditions which caused the original reversion will be fixed
in the subsequent commit.
[ruby-core:88360] [Misc #14937]
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@64352 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
Diffstat (limited to 'thread.c')
-rw-r--r-- | thread.c | 397 |
1 files changed, 246 insertions, 151 deletions
@@ -106,8 +106,13 @@ static int rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th); static const char *thread_status_name(rb_thread_t *th, int detail); static void timespec_add(struct timespec *, const struct timespec *); static void timespec_sub(struct timespec *, const struct timespec *); +static int timespec_cmp(const struct timespec *a, const struct timespec *b); static int timespec_update_expire(struct timespec *, const struct timespec *); static void getclockofday(struct timespec *); +NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd)); +static int consume_communication_pipe(int fd); +static int check_signals_nogvl(rb_thread_t *, int sigwait_fd); +void rb_sigwait_fd_migrate(rb_vm_t *); /* process.c */ #define eKillSignal INT2FIX(0) #define eTerminateSignal INT2FIX(1) @@ -348,7 +353,14 @@ rb_thread_s_debug_set(VALUE self, VALUE val) #endif NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_start)); -static void timer_thread_function(void *); +static void timer_thread_function(void); +void ruby_sigchld_handler(rb_vm_t *); /* signal.c */ + +static void +ubf_sigwait(void *ignore) +{ + rb_thread_wakeup_timer_thread(0); +} #if defined(_WIN32) #include "thread_win32.c" @@ -373,6 +385,15 @@ static void timer_thread_function(void *); #error "unsupported thread type" #endif +/* + * TODO: somebody with win32 knowledge should be able to get rid of + * timer-thread by busy-waiting on signals. And it should be possible + * to make the GVL in thread_pthread.c be platform-independent. + */ +#ifndef BUSY_WAIT_SIGNALS +# define BUSY_WAIT_SIGNALS (0) +#endif + #if THREAD_DEBUG static int debug_mutex_initialized = 1; static rb_nativethread_lock_t debug_mutex; @@ -412,7 +433,6 @@ rb_vm_gvl_destroy(rb_vm_t *vm) { gvl_release(vm); gvl_destroy(vm); - rb_native_mutex_destroy(&vm->thread_destruct_lock); if (0) { /* may be held by running threads */ rb_native_mutex_destroy(&vm->waitpid_lock); @@ -773,10 +793,6 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_s rb_fiber_close(th->ec->fiber_ptr); } - rb_native_mutex_lock(&th->vm->thread_destruct_lock); - /* make sure vm->running_thread never point me after this point.*/ - th->vm->running_thread = NULL; - rb_native_mutex_unlock(&th->vm->thread_destruct_lock); thread_cleanup_func(th, FALSE); gvl_release(th->vm); @@ -2163,6 +2179,14 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing) /* signal handling */ if (trap_interrupt && (th == th->vm->main_thread)) { enum rb_thread_status prev_status = th->status; + int sigwait_fd = rb_sigwait_fd_get(th); + + if (sigwait_fd >= 0) { + (void)consume_communication_pipe(sigwait_fd); + ruby_sigchld_handler(th->vm); + rb_sigwait_fd_put(th, sigwait_fd); + rb_sigwait_fd_migrate(th->vm); + } th->status = THREAD_RUNNABLE; while ((sig = rb_get_next_signal()) != 0) { rb_signal_exec(th, sig); @@ -3840,86 +3864,95 @@ wait_retryable(int *result, int errnum, struct timespec *timeout, return FALSE; } -#define restore_fdset(fds1, fds2) \ - ((fds1) ? rb_fd_dup(fds1, fds2) : (void)0) - struct select_set { - rb_fdset_t read; - rb_fdset_t write; - rb_fdset_t except; + int max; + int sigwait_fd; + rb_thread_t *th; + rb_fdset_t *rset; + rb_fdset_t *wset; + rb_fdset_t *eset; + rb_fdset_t orig_rset; + rb_fdset_t orig_wset; + rb_fdset_t orig_eset; + struct timeval *timeout; }; -static size_t -select_set_memsize(const void *p) +static VALUE +select_set_free(VALUE p) { - return sizeof(struct select_set); + struct select_set *set = (struct select_set *)p; + + if (set->sigwait_fd >= 0) { + rb_sigwait_fd_put(set->th, set->sigwait_fd); + rb_sigwait_fd_migrate(set->th->vm); + } + + rb_fd_term(&set->orig_rset); + rb_fd_term(&set->orig_wset); + rb_fd_term(&set->orig_eset); + + return Qfalse; } -static void -select_set_free(void *p) +static const struct timespec * +sigwait_timeout(rb_thread_t *th, int sigwait_fd, const struct timespec *orig, + int *drained_p) { - struct select_set *orig = p; + static const struct timespec quantum = { 0, TIME_QUANTUM_USEC * 1000 }; - rb_fd_term(&orig->read); - rb_fd_term(&orig->write); - rb_fd_term(&orig->except); - xfree(orig); -} + if (sigwait_fd >= 0 && (!ubf_threads_empty() || BUSY_WAIT_SIGNALS)) { + *drained_p = check_signals_nogvl(th, sigwait_fd); + if (!orig || timespec_cmp(orig, &quantum) > 0) + return &quantum; + } -static const rb_data_type_t select_set_type = { - "select_set", - {NULL, select_set_free, select_set_memsize,}, - 0, 0, RUBY_TYPED_FREE_IMMEDIATELY -}; + return orig; +} -static int -do_select(int n, rb_fdset_t *const readfds, rb_fdset_t *const writefds, - rb_fdset_t *const exceptfds, struct timeval *timeout) +static VALUE +do_select(VALUE p) { + struct select_set *set = (struct select_set *)p; int MAYBE_UNUSED(result); int lerrno; struct timespec ts, end, *tsp; - rb_thread_t *th = GET_THREAD(); - VALUE o; - struct select_set *orig; - - o = TypedData_Make_Struct(0, struct select_set, &select_set_type, orig); + const struct timespec *to; + struct timeval tv; - timeout_prepare(&tsp, &ts, &end, timeout); + timeout_prepare(&tsp, &ts, &end, set->timeout); +#define restore_fdset(dst, src) \ + ((dst) ? rb_fd_dup(dst, src) : (void)0) #define do_select_update() \ - (restore_fdset(readfds, &orig->read), \ - restore_fdset(writefds, &orig->write), \ - restore_fdset(exceptfds, &orig->except), \ + (restore_fdset(set->rset, &set->orig_rset), \ + restore_fdset(set->wset, &set->orig_wset), \ + restore_fdset(set->eset, &set->orig_eset), \ TRUE) -#define fd_init_copy(f) \ - (f##fds) ? rb_fd_init_copy(&orig->f, f##fds) : rb_fd_no_init(&orig->f) - fd_init_copy(read); - fd_init_copy(write); - fd_init_copy(except); -#undef fd_init_copy - do { + int drained; lerrno = 0; - BLOCKING_REGION(th, { - result = native_fd_select(n, readfds, writefds, exceptfds, - timeval_for(timeout, tsp), th); + BLOCKING_REGION(set->th, { + to = sigwait_timeout(set->th, set->sigwait_fd, tsp, &drained); + result = native_fd_select(set->max, set->rset, set->wset, set->eset, + timeval_for(&tv, to), set->th); if (result < 0) lerrno = errno; - }, ubf_select, th, FALSE); + }, set->sigwait_fd >= 0 ? ubf_sigwait : ubf_select, set->th, FALSE); - RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may raise */ - } while (wait_retryable(&result, lerrno, tsp, &end) && do_select_update()); + if (set->sigwait_fd >= 0) { + if (result > 0 && rb_fd_isset(set->sigwait_fd, set->rset)) + result--; + (void)check_signals_nogvl(set->th, set->sigwait_fd); + } - /* didn't raise, perform cleanup ourselves */ - select_set_free(orig); - rb_gc_force_recycle(o); + RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); /* may raise */ + } while (wait_retryable(&result, lerrno, tsp, &end) && do_select_update()); if (result < 0) { errno = lerrno; } - return result; + return (VALUE)result; } static void @@ -3955,11 +3988,42 @@ rb_thread_fd_writable(int fd) return TRUE; } +static rb_fdset_t * +init_set_fd(int fd, rb_fdset_t *fds) +{ + if (fd < 0) { + return 0; + } + rb_fd_init(fds); + rb_fd_set(fd, fds); + + return fds; +} + int rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except, struct timeval *timeout) { - if (!read && !write && !except) { + struct select_set set; + + set.th = GET_THREAD(); + set.max = max; + set.sigwait_fd = rb_sigwait_fd_get(set.th); + set.rset = read; + set.wset = write; + set.eset = except; + set.timeout = timeout; + + if (set.sigwait_fd >= 0) { + if (set.rset) + rb_fd_set(set.sigwait_fd, set.rset); + else + set.rset = init_set_fd(set.sigwait_fd, &set.orig_rset); + if (set.sigwait_fd > set.max) { + set.max = set.sigwait_fd + 1; + } + } + if (!set.rset && !set.wset && !set.eset) { if (!timeout) { rb_thread_sleep_forever(); return 0; @@ -3968,16 +4032,23 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * return 0; } - if (read) { - rb_fd_resize(max - 1, read); - } - if (write) { - rb_fd_resize(max - 1, write); - } - if (except) { - rb_fd_resize(max - 1, except); - } - return do_select(max, read, write, except, timeout); +#define fd_init_copy(f) do { \ + if (set.f) { \ + rb_fd_resize(set.max - 1, set.f); \ + if (&set.orig_##f != set.f) { /* sigwait_fd */ \ + rb_fd_init_copy(&set.orig_##f, set.f); \ + } \ + } \ + else { \ + rb_fd_no_init(&set.orig_##f); \ + } \ + } while (0) + fd_init_copy(rset); + fd_init_copy(wset); + fd_init_copy(eset); +#undef fd_init_copy + + return (int)rb_ensure(do_select, (VALUE)&set, select_set_free, (VALUE)&set); } #ifdef USE_POLL @@ -3991,68 +4062,64 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * # define POLLERR_SET (0) #endif -#ifndef HAVE_PPOLL -/* TODO: don't ignore sigmask */ -static int -ruby_ppoll(struct pollfd *fds, nfds_t nfds, - const struct timespec *ts, const sigset_t *sigmask) -{ - int timeout_ms; - - if (ts) { - int tmp, tmp2; - - if (ts->tv_sec > INT_MAX/1000) - timeout_ms = INT_MAX; - else { - tmp = (int)(ts->tv_sec * 1000); - /* round up 1ns to 1ms to avoid excessive wakeups for <1ms sleep */ - tmp2 = (int)((ts->tv_nsec + 999999L) / (1000L * 1000L)); - if (INT_MAX - tmp < tmp2) - timeout_ms = INT_MAX; - else - timeout_ms = (int)(tmp + tmp2); - } - } - else - timeout_ms = -1; - - return poll(fds, nfds, timeout_ms); -} -# define ppoll(fds,nfds,ts,sigmask) ruby_ppoll((fds),(nfds),(ts),(sigmask)) -#endif - /* * returns a mask of events */ int rb_wait_for_single_fd(int fd, int events, struct timeval *timeout) { - struct pollfd fds; + struct pollfd fds[2]; int result = 0, lerrno; struct timespec ts, end, *tsp; + const struct timespec *to; + int drained; rb_thread_t *th = GET_THREAD(); + nfds_t nfds; + rb_unblock_function_t *ubf; timeout_prepare(&tsp, &ts, &end, timeout); - fds.fd = fd; - fds.events = (short)events; - + fds[0].fd = fd; + fds[0].events = (short)events; do { - fds.revents = 0; + fds[0].revents = 0; + fds[1].fd = rb_sigwait_fd_get(th); + + if (fds[1].fd >= 0) { + fds[1].events = POLLIN; + fds[1].revents = 0; + nfds = 2; + ubf = ubf_sigwait; + } + else { + nfds = 1; + ubf = ubf_select; + } + lerrno = 0; BLOCKING_REGION(th, { - result = ppoll(&fds, 1, tsp, NULL); + to = sigwait_timeout(th, fds[1].fd, tsp, &drained); + result = ppoll(fds, nfds, to, NULL); if (result < 0) lerrno = errno; - }, ubf_select, th, FALSE); + }, ubf, th, FALSE); + if (fds[1].fd >= 0) { + if (result > 0 && fds[1].revents) { + result--; + fds[1].revents = 0; + } + (void)check_signals_nogvl(th, fds[1].fd); + rb_sigwait_fd_put(th, fds[1].fd); + rb_sigwait_fd_migrate(th->vm); + } RUBY_VM_CHECK_INTS_BLOCKING(th->ec); } while (wait_retryable(&result, lerrno, tsp, &end)); + if (result < 0) { errno = lerrno; return -1; } - if (fds.revents & POLLNVAL) { + if (fds[0].revents & POLLNVAL) { errno = EBADF; return -1; } @@ -4062,32 +4129,20 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout) * Therefore we need to fix it up. */ result = 0; - if (fds.revents & POLLIN_SET) + if (fds[0].revents & POLLIN_SET) result |= RB_WAITFD_IN; - if (fds.revents & POLLOUT_SET) + if (fds[0].revents & POLLOUT_SET) result |= RB_WAITFD_OUT; - if (fds.revents & POLLEX_SET) + if (fds[0].revents & POLLEX_SET) result |= RB_WAITFD_PRI; /* all requested events are ready if there is an error */ - if (fds.revents & POLLERR_SET) + if (fds[0].revents & POLLERR_SET) result |= events; return result; } #else /* ! USE_POLL - implement rb_io_poll_fd() using select() */ -static rb_fdset_t * -init_set_fd(int fd, rb_fdset_t *fds) -{ - if (fd < 0) { - return 0; - } - rb_fd_init(fds); - rb_fd_set(fd, fds); - - return fds; -} - struct select_args { union { int fd; @@ -4168,10 +4223,6 @@ rb_gc_set_stack_end(VALUE **stack_end_p) } #endif - -/* signal.c */ -void ruby_sigchld_handler(rb_vm_t *); - /* * */ @@ -4187,36 +4238,81 @@ rb_threadptr_check_signal(rb_thread_t *mth) } static void -timer_thread_function(void *arg) +timer_thread_function(void) { - rb_vm_t *vm = GET_VM(); /* TODO: fix me for Multi-VM */ + volatile rb_execution_context_t *ec; - /* - * Tricky: thread_destruct_lock doesn't close a race against - * vm->running_thread switch. however it guarantees th->running_thread - * point to valid pointer or NULL. - */ - rb_native_mutex_lock(&vm->thread_destruct_lock); /* for time slice */ - if (vm->running_thread) { - RUBY_VM_SET_TIMER_INTERRUPT(vm->running_thread->ec); - } - rb_native_mutex_unlock(&vm->thread_destruct_lock); - - /* check signal */ - ruby_sigchld_handler(vm); - rb_threadptr_check_signal(vm->main_thread); + ec = ACCESS_ONCE(rb_execution_context_t *, + ruby_current_execution_context_ptr); + if (ec) RUBY_VM_SET_TIMER_INTERRUPT(ec); +} -#if 0 - /* prove profiler */ - if (vm->prove_profile.enable) { - rb_thread_t *th = vm->running_thread; +static void +async_bug_fd(const char *mesg, int errno_arg, int fd) +{ + char buff[64]; + size_t n = strlcpy(buff, mesg, sizeof(buff)); + if (n < sizeof(buff)-3) { + ruby_snprintf(buff+n, sizeof(buff)-n, "(%d)", fd); + } + rb_async_bug_errno(buff, errno_arg); +} - if (vm->during_gc) { - /* GC prove profiling */ +/* VM-dependent API is not available for this function */ +static int +consume_communication_pipe(int fd) +{ +#define CCP_READ_BUFF_SIZE 1024 + /* buffer can be shared because no one refers to them. */ + static char buff[CCP_READ_BUFF_SIZE]; + ssize_t result; + int ret = FALSE; /* for rb_sigwait_sleep */ + + while (1) { + result = read(fd, buff, sizeof(buff)); + if (result > 0) { + ret = TRUE; + if (result < (ssize_t)sizeof(buff)) { + return ret; + } } - } + else if (result == 0) { + return ret; + } + else if (result < 0) { + int e = errno; + switch (e) { + case EINTR: + continue; /* retry */ + case EAGAIN: +#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: #endif + return ret; + default: + async_bug_fd("consume_communication_pipe: read", e, fd); + } + } + } +} + +static int +check_signals_nogvl(rb_thread_t *th, int sigwait_fd) +{ + rb_vm_t *vm = GET_VM(); /* th may be 0 */ + int ret = consume_communication_pipe(sigwait_fd); + ubf_wakeup_all_threads(); + ruby_sigchld_handler(vm); + if (rb_signal_buff_size()) { + if (th == vm->main_thread) + /* no need to lock + wakeup if already in main thread */ + RUBY_VM_SET_TRAP_INTERRUPT(th->ec); + else + threadptr_trap_interrupt(vm->main_thread); + ret = TRUE; /* for SIGCHLD_LOSSY && rb_sigwait_sleep */ + } + return ret; } void @@ -5046,7 +5142,6 @@ Init_Thread(void) /* acquire global vm lock */ gvl_init(th->vm); gvl_acquire(th->vm, th); - rb_native_mutex_initialize(&th->vm->thread_destruct_lock); rb_native_mutex_initialize(&th->vm->waitpid_lock); rb_native_mutex_initialize(&th->interrupt_lock); |