summaryrefslogtreecommitdiff
path: root/thread.c
diff options
context:
space:
mode:
Diffstat (limited to 'thread.c')
-rw-r--r--thread.c182
1 files changed, 119 insertions, 63 deletions
diff --git a/thread.c b/thread.c
index 97e9561f3a..25dde73fd7 100644
--- a/thread.c
+++ b/thread.c
@@ -99,6 +99,7 @@
#include "ractor_core.h"
#include "vm_debug.h"
#include "vm_sync.h"
+#include "zjit.h"
#include "ccan/list/list.h"
@@ -170,7 +171,7 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio
#define THREAD_BLOCKING_BEGIN(th) do { \
struct rb_thread_sched * const sched = TH_SCHED(th); \
RB_VM_SAVE_MACHINE_CONTEXT(th); \
- thread_sched_to_waiting((sched), (th));
+ thread_sched_to_waiting((sched), (th), true);
#define THREAD_BLOCKING_END(th) \
thread_sched_to_running((sched), (th)); \
@@ -194,7 +195,7 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio
/* Important that this is inlined into the macro, and not part of \
* blocking_region_begin - see bug #20493 */ \
RB_VM_SAVE_MACHINE_CONTEXT(th); \
- thread_sched_to_waiting(TH_SCHED(th), th); \
+ thread_sched_to_waiting(TH_SCHED(th), th, false); \
exec; \
blocking_region_end(th, &__region); \
}; \
@@ -221,7 +222,18 @@ vm_check_ints_blocking(rb_execution_context_t *ec)
th->pending_interrupt_queue_checked = 0;
RUBY_VM_SET_INTERRUPT(ec);
}
- return rb_threadptr_execute_interrupts(th, 1);
+
+ int result = rb_threadptr_execute_interrupts(th, 1);
+
+ // When a signal is received, we yield to the scheduler as soon as possible:
+ if (result || RUBY_VM_INTERRUPTED(ec)) {
+ VALUE scheduler = rb_fiber_scheduler_current_for_threadptr(th);
+ if (scheduler != Qnil) {
+ rb_fiber_scheduler_yield(scheduler);
+ }
+ }
+
+ return result;
}
int
@@ -442,8 +454,8 @@ rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th)
th->keeping_mutexes = mutex->next_mutex;
// rb_warn("mutex #<%p> was not unlocked by thread #<%p>", (void *)mutex, (void*)th);
-
- const char *error_message = rb_mutex_unlock_th(mutex, th, mutex->fiber);
+ VM_ASSERT(mutex->ec_serial);
+ const char *error_message = rb_mutex_unlock_th(mutex, th, 0);
if (error_message) rb_bug("invalid keeping_mutexes: %s", error_message);
}
}
@@ -529,6 +541,15 @@ thread_cleanup_func(void *th_ptr, int atfork)
rb_native_mutex_destroy(&th->interrupt_lock);
}
+void
+rb_thread_free_native_thread(void *th_ptr)
+{
+ rb_thread_t *th = th_ptr;
+
+ native_thread_destroy_atfork(th->nt);
+ th->nt = NULL;
+}
+
static VALUE rb_threadptr_raise(rb_thread_t *, int, VALUE *);
static VALUE rb_thread_to_s(VALUE thread);
@@ -840,8 +861,9 @@ thread_create_core(VALUE thval, struct thread_create_params *params)
#endif
th->invoke_type = thread_invoke_type_ractor_proc;
th->ractor = params->g;
+ th->ec->ractor_id = rb_ractor_id(th->ractor);
th->ractor->threads.main = th;
- th->invoke_arg.proc.proc = rb_proc_isolate_bang(params->proc);
+ th->invoke_arg.proc.proc = rb_proc_isolate_bang(params->proc, Qnil);
th->invoke_arg.proc.args = INT2FIX(RARRAY_LENINT(params->args));
th->invoke_arg.proc.kw_splat = rb_keyword_given_p();
rb_ractor_send_parameters(ec, params->g, params->args);
@@ -1004,7 +1026,7 @@ rb_thread_create_ractor(rb_ractor_t *r, VALUE args, VALUE proc)
.args = args,
.proc = proc,
};
- return thread_create_core(rb_thread_alloc(rb_cThread), &params);;
+ return thread_create_core(rb_thread_alloc(rb_cThread), &params);
}
@@ -1055,7 +1077,7 @@ thread_join_sleep(VALUE arg)
}
while (!thread_finished(target_th)) {
- VALUE scheduler = rb_fiber_scheduler_current();
+ VALUE scheduler = rb_fiber_scheduler_current_for_threadptr(th);
if (!limit) {
if (scheduler != Qnil) {
@@ -1404,17 +1426,18 @@ rb_thread_sleep_deadly(void)
static void
rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker, VALUE timeout, rb_hrtime_t end)
{
- VALUE scheduler = rb_fiber_scheduler_current();
+ rb_thread_t *th = GET_THREAD();
+ VALUE scheduler = rb_fiber_scheduler_current_for_threadptr(th);
if (scheduler != Qnil) {
rb_fiber_scheduler_block(scheduler, blocker, timeout);
}
else {
RUBY_DEBUG_LOG("...");
if (end) {
- sleep_hrtime_until(GET_THREAD(), end, SLEEP_SPURIOUS_CHECK);
+ sleep_hrtime_until(th, end, SLEEP_SPURIOUS_CHECK);
}
else {
- sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE);
+ sleep_forever(th, SLEEP_DEADLOCKABLE);
}
}
}
@@ -2033,6 +2056,9 @@ rb_thread_io_blocking_region(struct rb_io *io, rb_blocking_function_t *func, voi
* created as Ruby thread (created by Thread.new or so). In other
* words, this function *DOES NOT* associate or convert a NON-Ruby
* thread to a Ruby thread.
+ *
+ * NOTE: If this thread has already acquired the GVL, then the method call
+ * is performed without acquiring or releasing the GVL (from Ruby 4.0).
*/
void *
rb_thread_call_with_gvl(void *(*func)(void *), void *data1)
@@ -2056,7 +2082,8 @@ rb_thread_call_with_gvl(void *(*func)(void *), void *data1)
prev_unblock = th->unblock;
if (brb == 0) {
- rb_bug("rb_thread_call_with_gvl: called by a thread which has GVL.");
+ /* the GVL is already acquired, call method directly */
+ return (*func)(data1);
}
blocking_region_end(th, brb);
@@ -2066,7 +2093,7 @@ rb_thread_call_with_gvl(void *(*func)(void *), void *data1)
int released = blocking_region_begin(th, brb, prev_unblock.func, prev_unblock.arg, FALSE);
RUBY_ASSERT_ALWAYS(released);
RB_VM_SAVE_MACHINE_CONTEXT(th);
- thread_sched_to_waiting(TH_SCHED(th), th);
+ thread_sched_to_waiting(TH_SCHED(th), th, true);
return r;
}
@@ -2711,11 +2738,18 @@ rb_threadptr_ready(rb_thread_t *th)
static VALUE
rb_threadptr_raise(rb_thread_t *target_th, int argc, VALUE *argv)
{
+ VALUE exc;
+
if (rb_threadptr_dead(target_th)) {
return Qnil;
}
- VALUE exception = rb_exception_setup(argc, argv);
+ if (argc == 0) {
+ exc = rb_exc_new(rb_eRuntimeError, 0, 0);
+ }
+ else {
+ exc = rb_make_exception(argc, argv);
+ }
/* making an exception object can switch thread,
so we need to check thread deadness again */
@@ -2723,7 +2757,8 @@ rb_threadptr_raise(rb_thread_t *target_th, int argc, VALUE *argv)
return Qnil;
}
- rb_threadptr_pending_interrupt_enque(target_th, exception);
+ rb_ec_setup_exception(GET_EC(), exc, Qundef);
+ rb_threadptr_pending_interrupt_enque(target_th, exc);
rb_threadptr_interrupt(target_th);
return Qnil;
@@ -2882,12 +2917,11 @@ rb_thread_fd_close(int fd)
/*
* call-seq:
- * thr.raise
- * thr.raise(string)
- * thr.raise(exception [, string [, array]])
+ * raise(exception, message = exception.to_s, backtrace = nil, cause: $!)
+ * raise(message = nil, cause: $!)
*
* Raises an exception from the given thread. The caller does not have to be
- * +thr+. See Kernel#raise for more information.
+ * +thr+. See Kernel#raise for more information on arguments.
*
* Thread.abort_on_exception = true
* a = Thread.new { sleep(200) }
@@ -2908,7 +2942,14 @@ thread_raise_m(int argc, VALUE *argv, VALUE self)
const rb_thread_t *current_th = GET_THREAD();
threadptr_check_pending_interrupt_queue(target_th);
- rb_threadptr_raise(target_th, argc, argv);
+
+ if (rb_threadptr_dead(target_th)) {
+ return Qnil;
+ }
+
+ VALUE exception = rb_exception_setup(argc, argv);
+ rb_threadptr_pending_interrupt_enque(target_th, exception);
+ rb_threadptr_interrupt(target_th);
/* To perform Thread.current.raise as Kernel.raise */
if (current_th == target_th) {
@@ -2926,7 +2967,10 @@ thread_raise_m(int argc, VALUE *argv, VALUE self)
*
* Terminates +thr+ and schedules another thread to be run, returning
* the terminated Thread. If this is the main thread, or the last
- * thread, exits the process.
+ * thread, exits the process. Note that the caller does not wait for
+ * the thread to terminate if the receiver is different from the currently
+ * running thread. The termination is asynchronous, and the thread can still
+ * run a small amount of ruby code before exiting.
*/
VALUE
@@ -4223,13 +4267,20 @@ rb_fd_init(rb_fdset_t *fds)
FD_ZERO(fds->fdset);
}
+static inline size_t
+fdset_memsize(int maxfd)
+{
+ size_t o = howmany(maxfd, NFDBITS) * sizeof(fd_mask);
+ if (o < sizeof(fd_set)) {
+ return sizeof(fd_set);
+ }
+ return o;
+}
+
void
rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src)
{
- size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask);
-
- if (size < sizeof(fd_set))
- size = sizeof(fd_set);
+ size_t size = fdset_memsize(rb_fd_max(src));
dst->maxfd = src->maxfd;
dst->fdset = xmalloc(size);
memcpy(dst->fdset, src->fdset, size);
@@ -4238,7 +4289,7 @@ rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src)
void
rb_fd_term(rb_fdset_t *fds)
{
- xfree(fds->fdset);
+ ruby_xfree_sized(fds->fdset, fdset_memsize(fds->maxfd));
fds->maxfd = 0;
fds->fdset = 0;
}
@@ -4253,14 +4304,11 @@ rb_fd_zero(rb_fdset_t *fds)
static void
rb_fd_resize(int n, rb_fdset_t *fds)
{
- size_t m = howmany(n + 1, NFDBITS) * sizeof(fd_mask);
- size_t o = howmany(fds->maxfd, NFDBITS) * sizeof(fd_mask);
-
- if (m < sizeof(fd_set)) m = sizeof(fd_set);
- if (o < sizeof(fd_set)) o = sizeof(fd_set);
+ size_t m = fdset_memsize(n + 1);
+ size_t o = fdset_memsize(fds->maxfd);
if (m > o) {
- fds->fdset = xrealloc(fds->fdset, m);
+ fds->fdset = ruby_xrealloc_sized(fds->fdset, m, o);
memset((char *)fds->fdset + o, 0, m - o);
}
if (n >= fds->maxfd) fds->maxfd = n + 1;
@@ -4290,23 +4338,18 @@ rb_fd_isset(int n, const rb_fdset_t *fds)
void
rb_fd_copy(rb_fdset_t *dst, const fd_set *src, int max)
{
- size_t size = howmany(max, NFDBITS) * sizeof(fd_mask);
-
- if (size < sizeof(fd_set)) size = sizeof(fd_set);
+ size_t size = fdset_memsize(max);
+ dst->fdset = ruby_xrealloc_sized(dst->fdset, size, fdset_memsize(dst->maxfd));
dst->maxfd = max;
- dst->fdset = xrealloc(dst->fdset, size);
memcpy(dst->fdset, src, size);
}
void
rb_fd_dup(rb_fdset_t *dst, const rb_fdset_t *src)
{
- size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask);
-
- if (size < sizeof(fd_set))
- size = sizeof(fd_set);
+ size_t size = fdset_memsize(rb_fd_max(src));
+ dst->fdset = ruby_xrealloc_sized(dst->fdset, size, fdset_memsize(dst->maxfd));
dst->maxfd = src->maxfd;
- dst->fdset = xrealloc(dst->fdset, size);
memcpy(dst->fdset, src->fdset, size);
}
@@ -4358,10 +4401,19 @@ rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src)
rb_fd_dup(dst, src);
}
+static inline size_t
+fdset_memsize(int capa)
+{
+ if (capa == FD_SETSIZE) {
+ return sizeof(fd_set);
+ }
+ return sizeof(unsigned int) + (capa * sizeof(SOCKET));
+}
+
void
rb_fd_term(rb_fdset_t *set)
{
- xfree(set->fdset);
+ ruby_xfree_sized(set->fdset, fdset_memsize(set->capa));
set->fdset = NULL;
set->capa = 0;
}
@@ -4574,7 +4626,7 @@ wait_for_single_fd_blocking_region(rb_thread_t *th, struct pollfd *fds, nfds_t n
* returns a mask of events
*/
static int
-thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout)
+thread_io_wait(rb_thread_t *th, struct rb_io *io, int fd, int events, struct timeval *timeout)
{
struct pollfd fds[1] = {{
.fd = fd,
@@ -4587,8 +4639,8 @@ thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout)
enum ruby_tag_type state;
volatile int lerrno;
- rb_execution_context_t *ec = GET_EC();
- rb_thread_t *th = rb_ec_thread_ptr(ec);
+ RUBY_ASSERT(th);
+ rb_execution_context_t *ec = th->ec;
if (io) {
blocking_operation.ec = ec;
@@ -4722,7 +4774,7 @@ init_set_fd(int fd, rb_fdset_t *fds)
}
static int
-thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout)
+thread_io_wait(rb_thread_t *th, struct rb_io *io, int fd, int events, struct timeval *timeout)
{
rb_fdset_t rfds, wfds, efds;
struct select_args args;
@@ -4731,7 +4783,7 @@ thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout)
struct rb_io_blocking_operation blocking_operation;
if (io) {
args.io = io;
- blocking_operation.ec = GET_EC();
+ blocking_operation.ec = th->ec;
rb_io_blocking_operation_enter(io, &blocking_operation);
args.blocking_operation = &blocking_operation;
}
@@ -4756,15 +4808,15 @@ thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout)
#endif /* ! USE_POLL */
int
-rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
+rb_thread_wait_for_single_fd(rb_thread_t *th, int fd, int events, struct timeval *timeout)
{
- return thread_io_wait(NULL, fd, events, timeout);
+ return thread_io_wait(th, NULL, fd, events, timeout);
}
int
-rb_thread_io_wait(struct rb_io *io, int events, struct timeval * timeout)
+rb_thread_io_wait(rb_thread_t *th, struct rb_io *io, int events, struct timeval * timeout)
{
- return thread_io_wait(io, io->fd, events, timeout);
+ return thread_io_wait(th, io, io->fd, events, timeout);
}
/*
@@ -4967,6 +5019,9 @@ static void
terminate_atfork_i(rb_thread_t *th, const rb_thread_t *current_th)
{
if (th != current_th) {
+ // Clear the scheduler as it is no longer operational:
+ th->scheduler = Qnil;
+
rb_native_mutex_initialize(&th->interrupt_lock);
rb_mutex_abandon_keeping_mutexes(th);
rb_mutex_abandon_locking_mutex(th);
@@ -4982,6 +5037,7 @@ rb_thread_atfork(void)
rb_threadptr_pending_interrupt_clear(th);
rb_thread_atfork_internal(th, terminate_atfork_i);
th->join_list = NULL;
+ th->scheduler = Qnil;
rb_fiber_atfork(th);
/* We don't want reproduce CVE-2003-0900. */
@@ -5251,7 +5307,7 @@ rb_thread_shield_owned(VALUE self)
rb_mutex_t *m = mutex_ptr(mutex);
- return m->fiber == GET_EC()->fiber_ptr;
+ return m->ec_serial == rb_ec_serial(GET_EC());
}
/*
@@ -5270,7 +5326,7 @@ rb_thread_shield_wait(VALUE self)
if (!mutex) return Qfalse;
m = mutex_ptr(mutex);
- if (m->fiber == GET_EC()->fiber_ptr) return Qnil;
+ if (m->ec_serial == rb_ec_serial(GET_EC())) return Qnil;
rb_thread_shield_waiting_inc(self);
rb_mutex_lock(mutex);
rb_thread_shield_waiting_dec(self);
@@ -5787,8 +5843,8 @@ debug_deadlock_check(rb_ractor_t *r, VALUE msg)
if (th->locking_mutex) {
rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
- rb_str_catf(msg, " mutex:%p cond:%"PRIuSIZE,
- (void *)mutex->fiber, rb_mutex_num_waiting(mutex));
+ rb_str_catf(msg, " mutex:%llu cond:%"PRIuSIZE,
+ (unsigned long long)mutex->ec_serial, rb_mutex_num_waiting(mutex));
}
{
@@ -5828,7 +5884,7 @@ rb_check_deadlock(rb_ractor_t *r)
}
else if (th->locking_mutex) {
rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
- if (mutex->fiber == th->ec->fiber_ptr || (!mutex->fiber && !ccan_list_empty(&mutex->waitq))) {
+ if (mutex->ec_serial == rb_ec_serial(th->ec) || (!mutex->ec_serial && !ccan_list_empty(&mutex->waitq))) {
found = 1;
}
}
@@ -5850,7 +5906,7 @@ static void
update_line_coverage(VALUE data, const rb_trace_arg_t *trace_arg)
{
const rb_control_frame_t *cfp = GET_EC()->cfp;
- VALUE coverage = rb_iseq_coverage(cfp->iseq);
+ VALUE coverage = rb_iseq_coverage(CFP_ISEQ(cfp));
if (RB_TYPE_P(coverage, T_ARRAY) && !RBASIC_CLASS(coverage)) {
VALUE lines = RARRAY_AREF(coverage, COVERAGE_INDEX_LINES);
if (lines) {
@@ -5860,7 +5916,7 @@ update_line_coverage(VALUE data, const rb_trace_arg_t *trace_arg)
VALUE num;
void rb_iseq_clear_event_flags(const rb_iseq_t *iseq, size_t pos, rb_event_flag_t reset);
if (GET_VM()->coverage_mode & COVERAGE_TARGET_ONESHOT_LINES) {
- rb_iseq_clear_event_flags(cfp->iseq, cfp->pc - ISEQ_BODY(cfp->iseq)->iseq_encoded - 1, RUBY_EVENT_COVERAGE_LINE);
+ rb_iseq_clear_event_flags(CFP_ISEQ(cfp), CFP_PC(cfp) - ISEQ_BODY(CFP_ISEQ(cfp))->iseq_encoded - 1, RUBY_EVENT_COVERAGE_LINE);
rb_ary_push(lines, LONG2FIX(line + 1));
return;
}
@@ -5881,12 +5937,12 @@ static void
update_branch_coverage(VALUE data, const rb_trace_arg_t *trace_arg)
{
const rb_control_frame_t *cfp = GET_EC()->cfp;
- VALUE coverage = rb_iseq_coverage(cfp->iseq);
+ VALUE coverage = rb_iseq_coverage(CFP_ISEQ(cfp));
if (RB_TYPE_P(coverage, T_ARRAY) && !RBASIC_CLASS(coverage)) {
VALUE branches = RARRAY_AREF(coverage, COVERAGE_INDEX_BRANCHES);
if (branches) {
- long pc = cfp->pc - ISEQ_BODY(cfp->iseq)->iseq_encoded - 1;
- long idx = FIX2INT(RARRAY_AREF(ISEQ_PC2BRANCHINDEX(cfp->iseq), pc)), count;
+ long pc = CFP_PC(cfp) - ISEQ_BODY(CFP_ISEQ(cfp))->iseq_encoded - 1;
+ long idx = FIX2INT(RARRAY_AREF(ISEQ_PC2BRANCHINDEX(CFP_ISEQ(cfp)), pc)), count;
VALUE counters = RARRAY_AREF(branches, 1);
VALUE num = RARRAY_AREF(counters, idx);
count = FIX2LONG(num) + 1;
@@ -6233,7 +6289,7 @@ threadptr_interrupt_exec_exec(rb_thread_t *th)
else {
(*task->func)(task->data);
}
- ruby_xfree(task);
+ SIZED_FREE(task);
}
else {
break;
@@ -6249,7 +6305,7 @@ threadptr_interrupt_exec_cleanup(rb_thread_t *th)
struct rb_interrupt_exec_task *task;
while ((task = ccan_list_pop(&th->interrupt_exec_tasks, struct rb_interrupt_exec_task, node)) != NULL) {
- ruby_xfree(task);
+ SIZED_FREE(task);
}
}
rb_native_mutex_unlock(&th->interrupt_lock);