From 194a6a2c68e9c8a3536b24db18ceac87535a6051 Mon Sep 17 00:00:00 2001 From: normal Date: Mon, 6 Aug 2018 05:22:00 +0000 Subject: thread_pthread.c: restore timer-thread for now :< [ruby-core:88306] Revert "process.c: ensure th->interrupt lock is held when migrating" This reverts commit 5ca416bdf6b6785cb20f139c2c514eda005fe42f (r64201) Revert "process.c (rb_waitpid): reduce sigwait_fd bouncing" This reverts commit 217bdd776fbeea3bfd0b9324eefbfcec3b1ccb3e (r64200). Revert "test/ruby/test_thread.rb (test_thread_timer_and_interrupt): add timeouts" This reverts commit 9f395f11202fc3c7edbd76f5aa6ce1f8a1e752a9 (r64199). Revert "thread_pthread.c (native_sleep): reduce ppoll sleeps" This reverts commit b3aa256c4d43d3d7e9975ec18eb127f45f623c9b (r64193). Revert "thread.c (consume_communication_pipe): do not retry after short read" This reverts commit 291a82f748de56e65fac10edefc51ec7a54a82d4 (r64185). Revert "test/ruby/test_io.rb (test_race_gets_and_close): timeout each thread" This reverts commit 3dbd8d1f66537f968f0461ed8547460b3b1241b3 (r64184). Revert "thread_pthread.c (gvl_acquire_common): persist timeout across calls" This reverts commit 8c2ae6e3ed072b06fc3cbc34fa8a14b2acbb49d5 (r64165). Revert "test/ruby/test_io.rb (test_race_gets_and_close): use SIGABRT on timeout" This reverts commit 931cda4db8afd6b544a8d85a6815765a9c417213 (r64135). Revert "thread_pthread.c (gvl_yield): do ubf wakeups when uncontended" This reverts commit 508f00314f46c08b6e9b0141c01355d24954260c (r64133). Revert "thread_pthread.h (native_thread_data): split condvars on some platforms" This reverts commit a038bf238bd9a24bf1e1622f618a27db261fc91b (r64124). Revert "process.c (waitpid_nogvl): prevent conflicting use of sleep_cond" This reverts commit 7018acc946882f21d519af7c42ccf84b22a46b27 (r64117). Revert "thread_pthread.c (rb_sigwait_sleep): th may be 0 from MJIT" This reverts commit 56491afc7916fb24f5c4dc2c632fb93fa7063992 (r64116). Revert "thread*.c: waiting on sigwait_fd performs periodic ubf wakeups" This reverts commit ab47a57a46e70634d049e4da20a5441c7a14cdec (r64115). Revert "thread_pthread.c (gvl_destroy): make no-op on GVL bits" This reverts commit 95cae748171f4754b97f4ba54da2ae62a8d484fd (r64114). Revert "thread_pthread.c (rb_sigwait_sleep): fix uninitialized poll set in UBF case" This reverts commit 4514362948fdb914c6138b12d961d92e9c0fee6c (r64113). Revert "thread_pthread.c (rb_sigwait_sleep): re-fix [Bug #5343] harder" This reverts commit 26b8a70bb309c7a367b9134045508b5b5a580a77 (r64111). Revert "thread.c: move ppoll wrapper into thread_pthread.c" This reverts commit 3dc7727d22fecbc355597edda25d2a245bf55ba1 (r64110). Revert "thread.c: move ppoll wrapper before thread_pthread.c" This reverts commit 2fa1e2e3c3c5c4b3ce84730dee4bcbe9d81b8e35 (r64109). Revert "thread_pthread.c (ubf_select): refix [Bug #5343]" This reverts commit 4c1ab82f0623eca91a95d2a44053be22bbce48ad (r64108). Revert "thread_win32.c: suppress warnings by -Wsuggest-attribute" This reverts commit 6a9b63e39075c53870933fbac5c1065f7d22047c (r64159). Revert "thread_pthread: remove timer-thread by restructuring GVL" This reverts commit 708bfd21156828526fe72de2cedecfaca6647dc1 (r64107). git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@64203 b2dd03c8-39d4-4d8f-98ff-823fe69b080e --- thread.c | 397 ++++++++++++++++++++++++--------------------------------------- 1 file changed, 151 insertions(+), 246 deletions(-) (limited to 'thread.c') diff --git a/thread.c b/thread.c index 9c7fcee05f..a7d48a464f 100644 --- a/thread.c +++ b/thread.c @@ -106,13 +106,8 @@ static int rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th); static const char *thread_status_name(rb_thread_t *th, int detail); static void timespec_add(struct timespec *, const struct timespec *); static void timespec_sub(struct timespec *, const struct timespec *); -static int timespec_cmp(const struct timespec *a, const struct timespec *b); static int timespec_update_expire(struct timespec *, const struct timespec *); static void getclockofday(struct timespec *); -NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd)); -static int consume_communication_pipe(int fd); -static int check_signals_nogvl(rb_thread_t *, int sigwait_fd); -void rb_sigwait_fd_migrate(rb_vm_t *); /* process.c */ #define eKillSignal INT2FIX(0) #define eTerminateSignal INT2FIX(1) @@ -353,14 +348,7 @@ rb_thread_s_debug_set(VALUE self, VALUE val) #endif NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_start)); -static void timer_thread_function(void); -void ruby_sigchld_handler(rb_vm_t *); /* signal.c */ - -static void -ubf_sigwait(void *ignore) -{ - rb_thread_wakeup_timer_thread(0); -} +static void timer_thread_function(void *); #if defined(_WIN32) #include "thread_win32.c" @@ -385,15 +373,6 @@ ubf_sigwait(void *ignore) #error "unsupported thread type" #endif -/* - * TODO: somebody with win32 knowledge should be able to get rid of - * timer-thread by busy-waiting on signals. And it should be possible - * to make the GVL in thread_pthread.c be platform-independent. - */ -#ifndef BUSY_WAIT_SIGNALS -# define BUSY_WAIT_SIGNALS (0) -#endif - #if THREAD_DEBUG static int debug_mutex_initialized = 1; static rb_nativethread_lock_t debug_mutex; @@ -433,6 +412,7 @@ rb_vm_gvl_destroy(rb_vm_t *vm) { gvl_release(vm); gvl_destroy(vm); + rb_native_mutex_destroy(&vm->thread_destruct_lock); if (0) { /* may be held by running threads */ rb_native_mutex_destroy(&vm->waitpid_lock); @@ -793,6 +773,10 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_s rb_fiber_close(th->ec->fiber_ptr); } + rb_native_mutex_lock(&th->vm->thread_destruct_lock); + /* make sure vm->running_thread never point me after this point.*/ + th->vm->running_thread = NULL; + rb_native_mutex_unlock(&th->vm->thread_destruct_lock); thread_cleanup_func(th, FALSE); gvl_release(th->vm); @@ -2179,14 +2163,6 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing) /* signal handling */ if (trap_interrupt && (th == th->vm->main_thread)) { enum rb_thread_status prev_status = th->status; - int sigwait_fd = rb_sigwait_fd_get(th); - - if (sigwait_fd >= 0) { - (void)consume_communication_pipe(sigwait_fd); - ruby_sigchld_handler(th->vm); - rb_sigwait_fd_put(th, sigwait_fd); - rb_sigwait_fd_migrate(th->vm); - } th->status = THREAD_RUNNABLE; while ((sig = rb_get_next_signal()) != 0) { rb_signal_exec(th, sig); @@ -3864,95 +3840,86 @@ wait_retryable(int *result, int errnum, struct timespec *timeout, return FALSE; } +#define restore_fdset(fds1, fds2) \ + ((fds1) ? rb_fd_dup(fds1, fds2) : (void)0) + struct select_set { - int max; - int sigwait_fd; - rb_thread_t *th; - rb_fdset_t *rset; - rb_fdset_t *wset; - rb_fdset_t *eset; - rb_fdset_t orig_rset; - rb_fdset_t orig_wset; - rb_fdset_t orig_eset; - struct timeval *timeout; + rb_fdset_t read; + rb_fdset_t write; + rb_fdset_t except; }; -static VALUE -select_set_free(VALUE p) +static size_t +select_set_memsize(const void *p) { - struct select_set *set = (struct select_set *)p; - - if (set->sigwait_fd >= 0) { - rb_sigwait_fd_put(set->th, set->sigwait_fd); - rb_sigwait_fd_migrate(set->th->vm); - } - - rb_fd_term(&set->orig_rset); - rb_fd_term(&set->orig_wset); - rb_fd_term(&set->orig_eset); - - return Qfalse; + return sizeof(struct select_set); } -static const struct timespec * -sigwait_timeout(rb_thread_t *th, int sigwait_fd, const struct timespec *orig, - int *drained_p) +static void +select_set_free(void *p) { - static const struct timespec quantum = { 0, TIME_QUANTUM_USEC * 1000 }; - - if (sigwait_fd >= 0 && (!ubf_threads_empty() || BUSY_WAIT_SIGNALS)) { - *drained_p = check_signals_nogvl(th, sigwait_fd); - if (!orig || timespec_cmp(orig, &quantum) > 0) - return &quantum; - } + struct select_set *orig = p; - return orig; + rb_fd_term(&orig->read); + rb_fd_term(&orig->write); + rb_fd_term(&orig->except); + xfree(orig); } -static VALUE -do_select(VALUE p) +static const rb_data_type_t select_set_type = { + "select_set", + {NULL, select_set_free, select_set_memsize,}, + 0, 0, RUBY_TYPED_FREE_IMMEDIATELY +}; + +static int +do_select(int n, rb_fdset_t *const readfds, rb_fdset_t *const writefds, + rb_fdset_t *const exceptfds, struct timeval *timeout) { - struct select_set *set = (struct select_set *)p; int MAYBE_UNUSED(result); int lerrno; struct timespec ts, end, *tsp; - const struct timespec *to; - struct timeval tv; + rb_thread_t *th = GET_THREAD(); + VALUE o; + struct select_set *orig; + + o = TypedData_Make_Struct(0, struct select_set, &select_set_type, orig); - timeout_prepare(&tsp, &ts, &end, set->timeout); -#define restore_fdset(dst, src) \ - ((dst) ? rb_fd_dup(dst, src) : (void)0) + timeout_prepare(&tsp, &ts, &end, timeout); #define do_select_update() \ - (restore_fdset(set->rset, &set->orig_rset), \ - restore_fdset(set->wset, &set->orig_wset), \ - restore_fdset(set->eset, &set->orig_eset), \ + (restore_fdset(readfds, &orig->read), \ + restore_fdset(writefds, &orig->write), \ + restore_fdset(exceptfds, &orig->except), \ TRUE) +#define fd_init_copy(f) \ + (f##fds) ? rb_fd_init_copy(&orig->f, f##fds) : rb_fd_no_init(&orig->f) + fd_init_copy(read); + fd_init_copy(write); + fd_init_copy(except); +#undef fd_init_copy + do { - int drained; lerrno = 0; - BLOCKING_REGION(set->th, { - to = sigwait_timeout(set->th, set->sigwait_fd, tsp, &drained); - result = native_fd_select(set->max, set->rset, set->wset, set->eset, - timeval_for(&tv, to), set->th); + BLOCKING_REGION(th, { + result = native_fd_select(n, readfds, writefds, exceptfds, + timeval_for(timeout, tsp), th); if (result < 0) lerrno = errno; - }, set->sigwait_fd >= 0 ? ubf_sigwait : ubf_select, set->th, FALSE); + }, ubf_select, th, FALSE); - if (set->sigwait_fd >= 0) { - if (result > 0 && rb_fd_isset(set->sigwait_fd, set->rset)) - result--; - (void)check_signals_nogvl(set->th, set->sigwait_fd); - } - - RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); /* may raise */ + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may raise */ } while (wait_retryable(&result, lerrno, tsp, &end) && do_select_update()); + /* didn't raise, perform cleanup ourselves */ + select_set_free(orig); + rb_gc_force_recycle(o); + if (result < 0) { errno = lerrno; } - return (VALUE)result; + return result; } static void @@ -3988,42 +3955,11 @@ rb_thread_fd_writable(int fd) return TRUE; } -static rb_fdset_t * -init_set_fd(int fd, rb_fdset_t *fds) -{ - if (fd < 0) { - return 0; - } - rb_fd_init(fds); - rb_fd_set(fd, fds); - - return fds; -} - int rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except, struct timeval *timeout) { - struct select_set set; - - set.th = GET_THREAD(); - set.max = max; - set.sigwait_fd = rb_sigwait_fd_get(set.th); - set.rset = read; - set.wset = write; - set.eset = except; - set.timeout = timeout; - - if (set.sigwait_fd >= 0) { - if (set.rset) - rb_fd_set(set.sigwait_fd, set.rset); - else - set.rset = init_set_fd(set.sigwait_fd, &set.orig_rset); - if (set.sigwait_fd > set.max) { - set.max = set.sigwait_fd + 1; - } - } - if (!set.rset && !set.wset && !set.eset) { + if (!read && !write && !except) { if (!timeout) { rb_thread_sleep_forever(); return 0; @@ -4032,23 +3968,16 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * return 0; } -#define fd_init_copy(f) do { \ - if (set.f) { \ - rb_fd_resize(set.max - 1, set.f); \ - if (&set.orig_##f != set.f) { /* sigwait_fd */ \ - rb_fd_init_copy(&set.orig_##f, set.f); \ - } \ - } \ - else { \ - rb_fd_no_init(&set.orig_##f); \ - } \ - } while (0) - fd_init_copy(rset); - fd_init_copy(wset); - fd_init_copy(eset); -#undef fd_init_copy - - return (int)rb_ensure(do_select, (VALUE)&set, select_set_free, (VALUE)&set); + if (read) { + rb_fd_resize(max - 1, read); + } + if (write) { + rb_fd_resize(max - 1, write); + } + if (except) { + rb_fd_resize(max - 1, except); + } + return do_select(max, read, write, except, timeout); } #ifdef USE_POLL @@ -4062,64 +3991,68 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * # define POLLERR_SET (0) #endif +#ifndef HAVE_PPOLL +/* TODO: don't ignore sigmask */ +static int +ruby_ppoll(struct pollfd *fds, nfds_t nfds, + const struct timespec *ts, const sigset_t *sigmask) +{ + int timeout_ms; + + if (ts) { + int tmp, tmp2; + + if (ts->tv_sec > INT_MAX/1000) + timeout_ms = INT_MAX; + else { + tmp = (int)(ts->tv_sec * 1000); + /* round up 1ns to 1ms to avoid excessive wakeups for <1ms sleep */ + tmp2 = (int)((ts->tv_nsec + 999999L) / (1000L * 1000L)); + if (INT_MAX - tmp < tmp2) + timeout_ms = INT_MAX; + else + timeout_ms = (int)(tmp + tmp2); + } + } + else + timeout_ms = -1; + + return poll(fds, nfds, timeout_ms); +} +# define ppoll(fds,nfds,ts,sigmask) ruby_ppoll((fds),(nfds),(ts),(sigmask)) +#endif + /* * returns a mask of events */ int rb_wait_for_single_fd(int fd, int events, struct timeval *timeout) { - struct pollfd fds[2]; + struct pollfd fds; int result = 0, lerrno; struct timespec ts, end, *tsp; - const struct timespec *to; - int drained; rb_thread_t *th = GET_THREAD(); - nfds_t nfds; - rb_unblock_function_t *ubf; timeout_prepare(&tsp, &ts, &end, timeout); - fds[0].fd = fd; - fds[0].events = (short)events; - do { - fds[0].revents = 0; - fds[1].fd = rb_sigwait_fd_get(th); - - if (fds[1].fd >= 0) { - fds[1].events = POLLIN; - fds[1].revents = 0; - nfds = 2; - ubf = ubf_sigwait; - } - else { - nfds = 1; - ubf = ubf_select; - } + fds.fd = fd; + fds.events = (short)events; + do { + fds.revents = 0; lerrno = 0; BLOCKING_REGION(th, { - to = sigwait_timeout(th, fds[1].fd, tsp, &drained); - result = ppoll(fds, nfds, to, NULL); + result = ppoll(&fds, 1, tsp, NULL); if (result < 0) lerrno = errno; - }, ubf, th, FALSE); + }, ubf_select, th, FALSE); - if (fds[1].fd >= 0) { - if (result > 0 && fds[1].revents) { - result--; - fds[1].revents = 0; - } - (void)check_signals_nogvl(th, fds[1].fd); - rb_sigwait_fd_put(th, fds[1].fd); - rb_sigwait_fd_migrate(th->vm); - } RUBY_VM_CHECK_INTS_BLOCKING(th->ec); } while (wait_retryable(&result, lerrno, tsp, &end)); - if (result < 0) { errno = lerrno; return -1; } - if (fds[0].revents & POLLNVAL) { + if (fds.revents & POLLNVAL) { errno = EBADF; return -1; } @@ -4129,20 +4062,32 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout) * Therefore we need to fix it up. */ result = 0; - if (fds[0].revents & POLLIN_SET) + if (fds.revents & POLLIN_SET) result |= RB_WAITFD_IN; - if (fds[0].revents & POLLOUT_SET) + if (fds.revents & POLLOUT_SET) result |= RB_WAITFD_OUT; - if (fds[0].revents & POLLEX_SET) + if (fds.revents & POLLEX_SET) result |= RB_WAITFD_PRI; /* all requested events are ready if there is an error */ - if (fds[0].revents & POLLERR_SET) + if (fds.revents & POLLERR_SET) result |= events; return result; } #else /* ! USE_POLL - implement rb_io_poll_fd() using select() */ +static rb_fdset_t * +init_set_fd(int fd, rb_fdset_t *fds) +{ + if (fd < 0) { + return 0; + } + rb_fd_init(fds); + rb_fd_set(fd, fds); + + return fds; +} + struct select_args { union { int fd; @@ -4223,6 +4168,10 @@ rb_gc_set_stack_end(VALUE **stack_end_p) } #endif + +/* signal.c */ +void ruby_sigchld_handler(rb_vm_t *); + /* * */ @@ -4238,81 +4187,36 @@ rb_threadptr_check_signal(rb_thread_t *mth) } static void -timer_thread_function(void) +timer_thread_function(void *arg) { - volatile rb_execution_context_t *ec; + rb_vm_t *vm = GET_VM(); /* TODO: fix me for Multi-VM */ + /* + * Tricky: thread_destruct_lock doesn't close a race against + * vm->running_thread switch. however it guarantees th->running_thread + * point to valid pointer or NULL. + */ + rb_native_mutex_lock(&vm->thread_destruct_lock); /* for time slice */ - ec = ACCESS_ONCE(rb_execution_context_t *, - ruby_current_execution_context_ptr); - if (ec) RUBY_VM_SET_TIMER_INTERRUPT(ec); -} - -static void -async_bug_fd(const char *mesg, int errno_arg, int fd) -{ - char buff[64]; - size_t n = strlcpy(buff, mesg, sizeof(buff)); - if (n < sizeof(buff)-3) { - ruby_snprintf(buff+n, sizeof(buff)-n, "(%d)", fd); - } - rb_async_bug_errno(buff, errno_arg); -} - -/* VM-dependent API is not available for this function */ -static int -consume_communication_pipe(int fd) -{ -#define CCP_READ_BUFF_SIZE 1024 - /* buffer can be shared because no one refers to them. */ - static char buff[CCP_READ_BUFF_SIZE]; - ssize_t result; - int ret = FALSE; /* for rb_sigwait_sleep */ - - while (1) { - result = read(fd, buff, sizeof(buff)); - if (result > 0) { - ret = TRUE; - if (result < (ssize_t)sizeof(buff)) { - return ret; - } - } - else if (result == 0) { - return ret; - } - else if (result < 0) { - int e = errno; - switch (e) { - case EINTR: - continue; /* retry */ - case EAGAIN: -#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN - case EWOULDBLOCK: -#endif - return ret; - default: - async_bug_fd("consume_communication_pipe: read", e, fd); - } - } + if (vm->running_thread) { + RUBY_VM_SET_TIMER_INTERRUPT(vm->running_thread->ec); } -} + rb_native_mutex_unlock(&vm->thread_destruct_lock); -static int -check_signals_nogvl(rb_thread_t *th, int sigwait_fd) -{ - rb_vm_t *vm = GET_VM(); /* th may be 0 */ - int ret = consume_communication_pipe(sigwait_fd); - ubf_wakeup_all_threads(); + /* check signal */ ruby_sigchld_handler(vm); - if (rb_signal_buff_size()) { - if (th == vm->main_thread) - /* no need to lock + wakeup if already in main thread */ - RUBY_VM_SET_TRAP_INTERRUPT(th->ec); - else - threadptr_trap_interrupt(vm->main_thread); - ret = TRUE; /* for SIGCHLD_LOSSY && rb_sigwait_sleep */ + rb_threadptr_check_signal(vm->main_thread); + +#if 0 + /* prove profiler */ + if (vm->prove_profile.enable) { + rb_thread_t *th = vm->running_thread; + + if (vm->during_gc) { + /* GC prove profiling */ + } } - return ret; +#endif } void @@ -5142,6 +5046,7 @@ Init_Thread(void) /* acquire global vm lock */ gvl_init(th->vm); gvl_acquire(th->vm, th); + rb_native_mutex_initialize(&th->vm->thread_destruct_lock); rb_native_mutex_initialize(&th->vm->waitpid_lock); rb_native_mutex_initialize(&th->interrupt_lock); -- cgit v1.2.3