diff options
Diffstat (limited to 'thread.c')
| -rw-r--r-- | thread.c | 182 |
1 files changed, 119 insertions, 63 deletions
@@ -99,6 +99,7 @@ #include "ractor_core.h" #include "vm_debug.h" #include "vm_sync.h" +#include "zjit.h" #include "ccan/list/list.h" @@ -170,7 +171,7 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio #define THREAD_BLOCKING_BEGIN(th) do { \ struct rb_thread_sched * const sched = TH_SCHED(th); \ RB_VM_SAVE_MACHINE_CONTEXT(th); \ - thread_sched_to_waiting((sched), (th)); + thread_sched_to_waiting((sched), (th), true); #define THREAD_BLOCKING_END(th) \ thread_sched_to_running((sched), (th)); \ @@ -194,7 +195,7 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio /* Important that this is inlined into the macro, and not part of \ * blocking_region_begin - see bug #20493 */ \ RB_VM_SAVE_MACHINE_CONTEXT(th); \ - thread_sched_to_waiting(TH_SCHED(th), th); \ + thread_sched_to_waiting(TH_SCHED(th), th, false); \ exec; \ blocking_region_end(th, &__region); \ }; \ @@ -221,7 +222,18 @@ vm_check_ints_blocking(rb_execution_context_t *ec) th->pending_interrupt_queue_checked = 0; RUBY_VM_SET_INTERRUPT(ec); } - return rb_threadptr_execute_interrupts(th, 1); + + int result = rb_threadptr_execute_interrupts(th, 1); + + // When a signal is received, we yield to the scheduler as soon as possible: + if (result || RUBY_VM_INTERRUPTED(ec)) { + VALUE scheduler = rb_fiber_scheduler_current_for_threadptr(th); + if (scheduler != Qnil) { + rb_fiber_scheduler_yield(scheduler); + } + } + + return result; } int @@ -442,8 +454,8 @@ rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th) th->keeping_mutexes = mutex->next_mutex; // rb_warn("mutex #<%p> was not unlocked by thread #<%p>", (void *)mutex, (void*)th); - - const char *error_message = rb_mutex_unlock_th(mutex, th, mutex->fiber); + VM_ASSERT(mutex->ec_serial); + const char *error_message = rb_mutex_unlock_th(mutex, th, 0); if (error_message) rb_bug("invalid keeping_mutexes: %s", error_message); } } @@ -529,6 +541,15 @@ thread_cleanup_func(void *th_ptr, int atfork) rb_native_mutex_destroy(&th->interrupt_lock); } +void +rb_thread_free_native_thread(void *th_ptr) +{ + rb_thread_t *th = th_ptr; + + native_thread_destroy_atfork(th->nt); + th->nt = NULL; +} + static VALUE rb_threadptr_raise(rb_thread_t *, int, VALUE *); static VALUE rb_thread_to_s(VALUE thread); @@ -840,8 +861,9 @@ thread_create_core(VALUE thval, struct thread_create_params *params) #endif th->invoke_type = thread_invoke_type_ractor_proc; th->ractor = params->g; + th->ec->ractor_id = rb_ractor_id(th->ractor); th->ractor->threads.main = th; - th->invoke_arg.proc.proc = rb_proc_isolate_bang(params->proc); + th->invoke_arg.proc.proc = rb_proc_isolate_bang(params->proc, Qnil); th->invoke_arg.proc.args = INT2FIX(RARRAY_LENINT(params->args)); th->invoke_arg.proc.kw_splat = rb_keyword_given_p(); rb_ractor_send_parameters(ec, params->g, params->args); @@ -1004,7 +1026,7 @@ rb_thread_create_ractor(rb_ractor_t *r, VALUE args, VALUE proc) .args = args, .proc = proc, }; - return thread_create_core(rb_thread_alloc(rb_cThread), ¶ms);; + return thread_create_core(rb_thread_alloc(rb_cThread), ¶ms); } @@ -1055,7 +1077,7 @@ thread_join_sleep(VALUE arg) } while (!thread_finished(target_th)) { - VALUE scheduler = rb_fiber_scheduler_current(); + VALUE scheduler = rb_fiber_scheduler_current_for_threadptr(th); if (!limit) { if (scheduler != Qnil) { @@ -1404,17 +1426,18 @@ rb_thread_sleep_deadly(void) static void rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker, VALUE timeout, rb_hrtime_t end) { - VALUE scheduler = rb_fiber_scheduler_current(); + rb_thread_t *th = GET_THREAD(); + VALUE scheduler = rb_fiber_scheduler_current_for_threadptr(th); if (scheduler != Qnil) { rb_fiber_scheduler_block(scheduler, blocker, timeout); } else { RUBY_DEBUG_LOG("..."); if (end) { - sleep_hrtime_until(GET_THREAD(), end, SLEEP_SPURIOUS_CHECK); + sleep_hrtime_until(th, end, SLEEP_SPURIOUS_CHECK); } else { - sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE); + sleep_forever(th, SLEEP_DEADLOCKABLE); } } } @@ -2033,6 +2056,9 @@ rb_thread_io_blocking_region(struct rb_io *io, rb_blocking_function_t *func, voi * created as Ruby thread (created by Thread.new or so). In other * words, this function *DOES NOT* associate or convert a NON-Ruby * thread to a Ruby thread. + * + * NOTE: If this thread has already acquired the GVL, then the method call + * is performed without acquiring or releasing the GVL (from Ruby 4.0). */ void * rb_thread_call_with_gvl(void *(*func)(void *), void *data1) @@ -2056,7 +2082,8 @@ rb_thread_call_with_gvl(void *(*func)(void *), void *data1) prev_unblock = th->unblock; if (brb == 0) { - rb_bug("rb_thread_call_with_gvl: called by a thread which has GVL."); + /* the GVL is already acquired, call method directly */ + return (*func)(data1); } blocking_region_end(th, brb); @@ -2066,7 +2093,7 @@ rb_thread_call_with_gvl(void *(*func)(void *), void *data1) int released = blocking_region_begin(th, brb, prev_unblock.func, prev_unblock.arg, FALSE); RUBY_ASSERT_ALWAYS(released); RB_VM_SAVE_MACHINE_CONTEXT(th); - thread_sched_to_waiting(TH_SCHED(th), th); + thread_sched_to_waiting(TH_SCHED(th), th, true); return r; } @@ -2711,11 +2738,18 @@ rb_threadptr_ready(rb_thread_t *th) static VALUE rb_threadptr_raise(rb_thread_t *target_th, int argc, VALUE *argv) { + VALUE exc; + if (rb_threadptr_dead(target_th)) { return Qnil; } - VALUE exception = rb_exception_setup(argc, argv); + if (argc == 0) { + exc = rb_exc_new(rb_eRuntimeError, 0, 0); + } + else { + exc = rb_make_exception(argc, argv); + } /* making an exception object can switch thread, so we need to check thread deadness again */ @@ -2723,7 +2757,8 @@ rb_threadptr_raise(rb_thread_t *target_th, int argc, VALUE *argv) return Qnil; } - rb_threadptr_pending_interrupt_enque(target_th, exception); + rb_ec_setup_exception(GET_EC(), exc, Qundef); + rb_threadptr_pending_interrupt_enque(target_th, exc); rb_threadptr_interrupt(target_th); return Qnil; @@ -2882,12 +2917,11 @@ rb_thread_fd_close(int fd) /* * call-seq: - * thr.raise - * thr.raise(string) - * thr.raise(exception [, string [, array]]) + * raise(exception, message = exception.to_s, backtrace = nil, cause: $!) + * raise(message = nil, cause: $!) * * Raises an exception from the given thread. The caller does not have to be - * +thr+. See Kernel#raise for more information. + * +thr+. See Kernel#raise for more information on arguments. * * Thread.abort_on_exception = true * a = Thread.new { sleep(200) } @@ -2908,7 +2942,14 @@ thread_raise_m(int argc, VALUE *argv, VALUE self) const rb_thread_t *current_th = GET_THREAD(); threadptr_check_pending_interrupt_queue(target_th); - rb_threadptr_raise(target_th, argc, argv); + + if (rb_threadptr_dead(target_th)) { + return Qnil; + } + + VALUE exception = rb_exception_setup(argc, argv); + rb_threadptr_pending_interrupt_enque(target_th, exception); + rb_threadptr_interrupt(target_th); /* To perform Thread.current.raise as Kernel.raise */ if (current_th == target_th) { @@ -2926,7 +2967,10 @@ thread_raise_m(int argc, VALUE *argv, VALUE self) * * Terminates +thr+ and schedules another thread to be run, returning * the terminated Thread. If this is the main thread, or the last - * thread, exits the process. + * thread, exits the process. Note that the caller does not wait for + * the thread to terminate if the receiver is different from the currently + * running thread. The termination is asynchronous, and the thread can still + * run a small amount of ruby code before exiting. */ VALUE @@ -4223,13 +4267,20 @@ rb_fd_init(rb_fdset_t *fds) FD_ZERO(fds->fdset); } +static inline size_t +fdset_memsize(int maxfd) +{ + size_t o = howmany(maxfd, NFDBITS) * sizeof(fd_mask); + if (o < sizeof(fd_set)) { + return sizeof(fd_set); + } + return o; +} + void rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src) { - size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask); - - if (size < sizeof(fd_set)) - size = sizeof(fd_set); + size_t size = fdset_memsize(rb_fd_max(src)); dst->maxfd = src->maxfd; dst->fdset = xmalloc(size); memcpy(dst->fdset, src->fdset, size); @@ -4238,7 +4289,7 @@ rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src) void rb_fd_term(rb_fdset_t *fds) { - xfree(fds->fdset); + ruby_xfree_sized(fds->fdset, fdset_memsize(fds->maxfd)); fds->maxfd = 0; fds->fdset = 0; } @@ -4253,14 +4304,11 @@ rb_fd_zero(rb_fdset_t *fds) static void rb_fd_resize(int n, rb_fdset_t *fds) { - size_t m = howmany(n + 1, NFDBITS) * sizeof(fd_mask); - size_t o = howmany(fds->maxfd, NFDBITS) * sizeof(fd_mask); - - if (m < sizeof(fd_set)) m = sizeof(fd_set); - if (o < sizeof(fd_set)) o = sizeof(fd_set); + size_t m = fdset_memsize(n + 1); + size_t o = fdset_memsize(fds->maxfd); if (m > o) { - fds->fdset = xrealloc(fds->fdset, m); + fds->fdset = ruby_xrealloc_sized(fds->fdset, m, o); memset((char *)fds->fdset + o, 0, m - o); } if (n >= fds->maxfd) fds->maxfd = n + 1; @@ -4290,23 +4338,18 @@ rb_fd_isset(int n, const rb_fdset_t *fds) void rb_fd_copy(rb_fdset_t *dst, const fd_set *src, int max) { - size_t size = howmany(max, NFDBITS) * sizeof(fd_mask); - - if (size < sizeof(fd_set)) size = sizeof(fd_set); + size_t size = fdset_memsize(max); + dst->fdset = ruby_xrealloc_sized(dst->fdset, size, fdset_memsize(dst->maxfd)); dst->maxfd = max; - dst->fdset = xrealloc(dst->fdset, size); memcpy(dst->fdset, src, size); } void rb_fd_dup(rb_fdset_t *dst, const rb_fdset_t *src) { - size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask); - - if (size < sizeof(fd_set)) - size = sizeof(fd_set); + size_t size = fdset_memsize(rb_fd_max(src)); + dst->fdset = ruby_xrealloc_sized(dst->fdset, size, fdset_memsize(dst->maxfd)); dst->maxfd = src->maxfd; - dst->fdset = xrealloc(dst->fdset, size); memcpy(dst->fdset, src->fdset, size); } @@ -4358,10 +4401,19 @@ rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src) rb_fd_dup(dst, src); } +static inline size_t +fdset_memsize(int capa) +{ + if (capa == FD_SETSIZE) { + return sizeof(fd_set); + } + return sizeof(unsigned int) + (capa * sizeof(SOCKET)); +} + void rb_fd_term(rb_fdset_t *set) { - xfree(set->fdset); + ruby_xfree_sized(set->fdset, fdset_memsize(set->capa)); set->fdset = NULL; set->capa = 0; } @@ -4574,7 +4626,7 @@ wait_for_single_fd_blocking_region(rb_thread_t *th, struct pollfd *fds, nfds_t n * returns a mask of events */ static int -thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout) +thread_io_wait(rb_thread_t *th, struct rb_io *io, int fd, int events, struct timeval *timeout) { struct pollfd fds[1] = {{ .fd = fd, @@ -4587,8 +4639,8 @@ thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout) enum ruby_tag_type state; volatile int lerrno; - rb_execution_context_t *ec = GET_EC(); - rb_thread_t *th = rb_ec_thread_ptr(ec); + RUBY_ASSERT(th); + rb_execution_context_t *ec = th->ec; if (io) { blocking_operation.ec = ec; @@ -4722,7 +4774,7 @@ init_set_fd(int fd, rb_fdset_t *fds) } static int -thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout) +thread_io_wait(rb_thread_t *th, struct rb_io *io, int fd, int events, struct timeval *timeout) { rb_fdset_t rfds, wfds, efds; struct select_args args; @@ -4731,7 +4783,7 @@ thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout) struct rb_io_blocking_operation blocking_operation; if (io) { args.io = io; - blocking_operation.ec = GET_EC(); + blocking_operation.ec = th->ec; rb_io_blocking_operation_enter(io, &blocking_operation); args.blocking_operation = &blocking_operation; } @@ -4756,15 +4808,15 @@ thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout) #endif /* ! USE_POLL */ int -rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout) +rb_thread_wait_for_single_fd(rb_thread_t *th, int fd, int events, struct timeval *timeout) { - return thread_io_wait(NULL, fd, events, timeout); + return thread_io_wait(th, NULL, fd, events, timeout); } int -rb_thread_io_wait(struct rb_io *io, int events, struct timeval * timeout) +rb_thread_io_wait(rb_thread_t *th, struct rb_io *io, int events, struct timeval * timeout) { - return thread_io_wait(io, io->fd, events, timeout); + return thread_io_wait(th, io, io->fd, events, timeout); } /* @@ -4967,6 +5019,9 @@ static void terminate_atfork_i(rb_thread_t *th, const rb_thread_t *current_th) { if (th != current_th) { + // Clear the scheduler as it is no longer operational: + th->scheduler = Qnil; + rb_native_mutex_initialize(&th->interrupt_lock); rb_mutex_abandon_keeping_mutexes(th); rb_mutex_abandon_locking_mutex(th); @@ -4982,6 +5037,7 @@ rb_thread_atfork(void) rb_threadptr_pending_interrupt_clear(th); rb_thread_atfork_internal(th, terminate_atfork_i); th->join_list = NULL; + th->scheduler = Qnil; rb_fiber_atfork(th); /* We don't want reproduce CVE-2003-0900. */ @@ -5251,7 +5307,7 @@ rb_thread_shield_owned(VALUE self) rb_mutex_t *m = mutex_ptr(mutex); - return m->fiber == GET_EC()->fiber_ptr; + return m->ec_serial == rb_ec_serial(GET_EC()); } /* @@ -5270,7 +5326,7 @@ rb_thread_shield_wait(VALUE self) if (!mutex) return Qfalse; m = mutex_ptr(mutex); - if (m->fiber == GET_EC()->fiber_ptr) return Qnil; + if (m->ec_serial == rb_ec_serial(GET_EC())) return Qnil; rb_thread_shield_waiting_inc(self); rb_mutex_lock(mutex); rb_thread_shield_waiting_dec(self); @@ -5787,8 +5843,8 @@ debug_deadlock_check(rb_ractor_t *r, VALUE msg) if (th->locking_mutex) { rb_mutex_t *mutex = mutex_ptr(th->locking_mutex); - rb_str_catf(msg, " mutex:%p cond:%"PRIuSIZE, - (void *)mutex->fiber, rb_mutex_num_waiting(mutex)); + rb_str_catf(msg, " mutex:%llu cond:%"PRIuSIZE, + (unsigned long long)mutex->ec_serial, rb_mutex_num_waiting(mutex)); } { @@ -5828,7 +5884,7 @@ rb_check_deadlock(rb_ractor_t *r) } else if (th->locking_mutex) { rb_mutex_t *mutex = mutex_ptr(th->locking_mutex); - if (mutex->fiber == th->ec->fiber_ptr || (!mutex->fiber && !ccan_list_empty(&mutex->waitq))) { + if (mutex->ec_serial == rb_ec_serial(th->ec) || (!mutex->ec_serial && !ccan_list_empty(&mutex->waitq))) { found = 1; } } @@ -5850,7 +5906,7 @@ static void update_line_coverage(VALUE data, const rb_trace_arg_t *trace_arg) { const rb_control_frame_t *cfp = GET_EC()->cfp; - VALUE coverage = rb_iseq_coverage(cfp->iseq); + VALUE coverage = rb_iseq_coverage(CFP_ISEQ(cfp)); if (RB_TYPE_P(coverage, T_ARRAY) && !RBASIC_CLASS(coverage)) { VALUE lines = RARRAY_AREF(coverage, COVERAGE_INDEX_LINES); if (lines) { @@ -5860,7 +5916,7 @@ update_line_coverage(VALUE data, const rb_trace_arg_t *trace_arg) VALUE num; void rb_iseq_clear_event_flags(const rb_iseq_t *iseq, size_t pos, rb_event_flag_t reset); if (GET_VM()->coverage_mode & COVERAGE_TARGET_ONESHOT_LINES) { - rb_iseq_clear_event_flags(cfp->iseq, cfp->pc - ISEQ_BODY(cfp->iseq)->iseq_encoded - 1, RUBY_EVENT_COVERAGE_LINE); + rb_iseq_clear_event_flags(CFP_ISEQ(cfp), CFP_PC(cfp) - ISEQ_BODY(CFP_ISEQ(cfp))->iseq_encoded - 1, RUBY_EVENT_COVERAGE_LINE); rb_ary_push(lines, LONG2FIX(line + 1)); return; } @@ -5881,12 +5937,12 @@ static void update_branch_coverage(VALUE data, const rb_trace_arg_t *trace_arg) { const rb_control_frame_t *cfp = GET_EC()->cfp; - VALUE coverage = rb_iseq_coverage(cfp->iseq); + VALUE coverage = rb_iseq_coverage(CFP_ISEQ(cfp)); if (RB_TYPE_P(coverage, T_ARRAY) && !RBASIC_CLASS(coverage)) { VALUE branches = RARRAY_AREF(coverage, COVERAGE_INDEX_BRANCHES); if (branches) { - long pc = cfp->pc - ISEQ_BODY(cfp->iseq)->iseq_encoded - 1; - long idx = FIX2INT(RARRAY_AREF(ISEQ_PC2BRANCHINDEX(cfp->iseq), pc)), count; + long pc = CFP_PC(cfp) - ISEQ_BODY(CFP_ISEQ(cfp))->iseq_encoded - 1; + long idx = FIX2INT(RARRAY_AREF(ISEQ_PC2BRANCHINDEX(CFP_ISEQ(cfp)), pc)), count; VALUE counters = RARRAY_AREF(branches, 1); VALUE num = RARRAY_AREF(counters, idx); count = FIX2LONG(num) + 1; @@ -6233,7 +6289,7 @@ threadptr_interrupt_exec_exec(rb_thread_t *th) else { (*task->func)(task->data); } - ruby_xfree(task); + SIZED_FREE(task); } else { break; @@ -6249,7 +6305,7 @@ threadptr_interrupt_exec_cleanup(rb_thread_t *th) struct rb_interrupt_exec_task *task; while ((task = ccan_list_pop(&th->interrupt_exec_tasks, struct rb_interrupt_exec_task, node)) != NULL) { - ruby_xfree(task); + SIZED_FREE(task); } } rb_native_mutex_unlock(&th->interrupt_lock); |
