path: root/thread.c
Diffstat (limited to 'thread.c')
-rw-r--r--  thread.c  655
1 file changed, 426 insertions(+), 229 deletions(-)
diff --git a/thread.c b/thread.c
index d09767cb75..e66352c03f 100644
--- a/thread.c
+++ b/thread.c
@@ -78,6 +78,7 @@
#include "internal/class.h"
#include "internal/cont.h"
#include "internal/error.h"
+#include "internal/eval.h"
#include "internal/gc.h"
#include "internal/hash.h"
#include "internal/io.h"
@@ -99,6 +100,8 @@
#include "vm_debug.h"
#include "vm_sync.h"
+#include "ccan/list/list.h"
+
#ifndef USE_NATIVE_THREAD_PRIORITY
#define USE_NATIVE_THREAD_PRIORITY 0
#define RUBY_THREAD_PRIORITY_MAX 3
@@ -146,16 +149,9 @@ static int hrtime_update_expire(rb_hrtime_t *, const rb_hrtime_t);
NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
MAYBE_UNUSED(static int consume_communication_pipe(int fd));
-static volatile int system_working = 1;
+static rb_atomic_t system_working = 1;
static rb_internal_thread_specific_key_t specific_key_count;
-struct waiting_fd {
- struct ccan_list_node wfd_node; /* <=> vm.waiting_fds */
- rb_thread_t *th;
- int fd;
- struct rb_io_close_wait_list *busy;
-};
-
/********************************************************************************/
#define THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION
@@ -178,7 +174,7 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio
#define THREAD_BLOCKING_END(th) \
thread_sched_to_running((sched), (th)); \
- rb_ractor_thread_switch(th->ractor, th); \
+ rb_ractor_thread_switch(th->ractor, th, false); \
} while(0)
#ifdef __GNUC__
@@ -212,6 +208,10 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio
static inline int
vm_check_ints_blocking(rb_execution_context_t *ec)
{
+#ifdef RUBY_ASSERT_CRITICAL_SECTION
+ VM_ASSERT(ruby_assert_critical_section_entered == 0);
+#endif
+
rb_thread_t *th = rb_ec_thread_ptr(ec);
if (LIKELY(rb_threadptr_pending_interrupt_empty_p(th))) {
@@ -221,7 +221,18 @@ vm_check_ints_blocking(rb_execution_context_t *ec)
th->pending_interrupt_queue_checked = 0;
RUBY_VM_SET_INTERRUPT(ec);
}
- return rb_threadptr_execute_interrupts(th, 1);
+
+ int result = rb_threadptr_execute_interrupts(th, 1);
+
+ // When a signal is received, we yield to the scheduler as soon as possible:
+ if (result || RUBY_VM_INTERRUPTED(ec)) {
+ VALUE scheduler = rb_fiber_scheduler_current_for_threadptr(th);
+ if (scheduler != Qnil) {
+ rb_fiber_scheduler_yield(scheduler);
+ }
+ }
+
+ return result;
}
int
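Aside (not part of the patch): with this change, C code that polls for pending
interrupts via the public rb_thread_check_ints(), which funnels into
vm_check_ints_blocking() above, also gives a pending fiber scheduler a chance
to run. A minimal sketch of such a polling loop; process_chunk is hypothetical:

#include "ruby/ruby.h"

static void
process_chunk(VALUE chunk)
{
    (void)chunk; /* hypothetical work happens here */
}

static VALUE
process_all(VALUE self, VALUE chunks)
{
    for (long i = 0; i < RARRAY_LEN(chunks); i++) {
        process_chunk(RARRAY_AREF(chunks, i));
        /* May raise pending interrupts; after this patch it may also
         * yield to the thread's fiber scheduler: */
        rb_thread_check_ints();
    }
    return Qnil;
}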
@@ -340,7 +351,7 @@ unblock_function_clear(rb_thread_t *th)
}
static void
-threadptr_interrupt_locked(rb_thread_t *th, bool trap)
+threadptr_set_interrupt_locked(rb_thread_t *th, bool trap)
{
// th->interrupt_lock should be acquired here
@@ -362,26 +373,27 @@ threadptr_interrupt_locked(rb_thread_t *th, bool trap)
}
static void
-threadptr_interrupt(rb_thread_t *th, int trap)
+threadptr_set_interrupt(rb_thread_t *th, int trap)
{
rb_native_mutex_lock(&th->interrupt_lock);
{
- threadptr_interrupt_locked(th, trap);
+ threadptr_set_interrupt_locked(th, trap);
}
rb_native_mutex_unlock(&th->interrupt_lock);
}
+/* Set interrupt flag on another thread or current thread, and call its UBF if it has one set */
void
rb_threadptr_interrupt(rb_thread_t *th)
{
RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
- threadptr_interrupt(th, false);
+ threadptr_set_interrupt(th, false);
}
static void
threadptr_trap_interrupt(rb_thread_t *th)
{
- threadptr_interrupt(th, true);
+ threadptr_set_interrupt(th, true);
}
static void
@@ -441,8 +453,8 @@ rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th)
th->keeping_mutexes = mutex->next_mutex;
// rb_warn("mutex #<%p> was not unlocked by thread #<%p>", (void *)mutex, (void*)th);
-
- const char *error_message = rb_mutex_unlock_th(mutex, th, mutex->fiber);
+ VM_ASSERT(mutex->ec_serial);
+ const char *error_message = rb_mutex_unlock_th(mutex, th, 0);
if (error_message) rb_bug("invalid keeping_mutexes: %s", error_message);
}
}
@@ -519,12 +531,8 @@ thread_cleanup_func(void *th_ptr, int atfork)
th->locking_mutex = Qfalse;
thread_cleanup_func_before_exec(th_ptr);
- /*
- * Unfortunately, we can't release native threading resource at fork
- * because libc may have unstable locking state therefore touching
- * a threading resource may cause a deadlock.
- */
if (atfork) {
+ native_thread_destroy_atfork(th->nt);
th->nt = NULL;
return;
}
@@ -532,6 +540,15 @@ thread_cleanup_func(void *th_ptr, int atfork)
rb_native_mutex_destroy(&th->interrupt_lock);
}
+void
+rb_thread_free_native_thread(void *th_ptr)
+{
+ rb_thread_t *th = th_ptr;
+
+ native_thread_destroy_atfork(th->nt);
+ th->nt = NULL;
+}
+
static VALUE rb_threadptr_raise(rb_thread_t *, int, VALUE *);
static VALUE rb_thread_to_s(VALUE thread);
@@ -707,6 +724,10 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
/* fatal error within this thread, need to stop whole script */
}
else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) {
+ if (th->invoke_type == thread_invoke_type_ractor_proc) {
+ rb_ractor_atexit_exception(th->ec);
+ }
+
/* exit on main_thread. */
}
else {
@@ -839,8 +860,9 @@ thread_create_core(VALUE thval, struct thread_create_params *params)
#endif
th->invoke_type = thread_invoke_type_ractor_proc;
th->ractor = params->g;
+ th->ec->ractor_id = rb_ractor_id(th->ractor);
th->ractor->threads.main = th;
- th->invoke_arg.proc.proc = rb_proc_isolate_bang(params->proc);
+ th->invoke_arg.proc.proc = rb_proc_isolate_bang(params->proc, Qnil);
th->invoke_arg.proc.args = INT2FIX(RARRAY_LENINT(params->args));
th->invoke_arg.proc.kw_splat = rb_keyword_given_p();
rb_ractor_send_parameters(ec, params->g, params->args);
@@ -1003,7 +1025,7 @@ rb_thread_create_ractor(rb_ractor_t *r, VALUE args, VALUE proc)
.args = args,
.proc = proc,
};
- return thread_create_core(rb_thread_alloc(rb_cThread), &params);;
+ return thread_create_core(rb_thread_alloc(rb_cThread), &params);
}
@@ -1054,25 +1076,30 @@ thread_join_sleep(VALUE arg)
}
while (!thread_finished(target_th)) {
- VALUE scheduler = rb_fiber_scheduler_current();
+ VALUE scheduler = rb_fiber_scheduler_current_for_threadptr(th);
- if (scheduler != Qnil) {
- rb_fiber_scheduler_block(scheduler, target_th->self, p->timeout);
- // Check if the target thread is finished after blocking:
- if (thread_finished(target_th)) break;
- // Otherwise, a timeout occurred:
- else return Qfalse;
- }
- else if (!limit) {
- sleep_forever(th, SLEEP_DEADLOCKABLE | SLEEP_ALLOW_SPURIOUS | SLEEP_NO_CHECKINTS);
+ if (!limit) {
+ if (scheduler != Qnil) {
+ rb_fiber_scheduler_block(scheduler, target_th->self, Qnil);
+ }
+ else {
+ sleep_forever(th, SLEEP_DEADLOCKABLE | SLEEP_ALLOW_SPURIOUS | SLEEP_NO_CHECKINTS);
+ }
}
else {
if (hrtime_update_expire(limit, end)) {
RUBY_DEBUG_LOG("timeout target_th:%u", rb_th_serial(target_th));
return Qfalse;
}
- th->status = THREAD_STOPPED;
- native_sleep(th, limit);
+
+ if (scheduler != Qnil) {
+ VALUE timeout = rb_float_new(hrtime2double(*limit));
+ rb_fiber_scheduler_block(scheduler, target_th->self, timeout);
+ }
+ else {
+ th->status = THREAD_STOPPED;
+ native_sleep(th, limit);
+ }
}
RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
th->status = THREAD_RUNNABLE;
@@ -1131,6 +1158,10 @@ thread_join(rb_thread_t *target_th, VALUE timeout, rb_hrtime_t *limit)
/* OK. killed. */
break;
default:
+ if (err == RUBY_FATAL_FIBER_KILLED) { // not an integer constant, so it can't be a case label
+ // root fiber killed in non-main thread
+ break;
+ }
rb_bug("thread_join: Fixnum (%d) should not reach here.", FIX2INT(err));
}
}
@@ -1394,17 +1425,18 @@ rb_thread_sleep_deadly(void)
static void
rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker, VALUE timeout, rb_hrtime_t end)
{
- VALUE scheduler = rb_fiber_scheduler_current();
+ rb_thread_t *th = GET_THREAD();
+ VALUE scheduler = rb_fiber_scheduler_current_for_threadptr(th);
if (scheduler != Qnil) {
rb_fiber_scheduler_block(scheduler, blocker, timeout);
}
else {
RUBY_DEBUG_LOG("...");
if (end) {
- sleep_hrtime_until(GET_THREAD(), end, SLEEP_SPURIOUS_CHECK);
+ sleep_hrtime_until(th, end, SLEEP_SPURIOUS_CHECK);
}
else {
- sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE);
+ sleep_forever(th, SLEEP_DEADLOCKABLE);
}
}
}
@@ -1471,7 +1503,7 @@ rb_thread_schedule_limits(uint32_t limits_us)
RB_VM_SAVE_MACHINE_CONTEXT(th);
thread_sched_yield(TH_SCHED(th), th);
- rb_ractor_thread_switch(th->ractor, th);
+ rb_ractor_thread_switch(th->ractor, th, true);
RUBY_DEBUG_LOG("switch %s", "done");
}
@@ -1519,7 +1551,7 @@ blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region)
unregister_ubf_list(th);
thread_sched_to_running(TH_SCHED(th), th);
- rb_ractor_thread_switch(th->ractor, th);
+ rb_ractor_thread_switch(th->ractor, th, false);
th->blocking_region_buffer = 0;
rb_ractor_blocking_threads_dec(th->ractor, __FILE__, __LINE__);
@@ -1535,6 +1567,29 @@ blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region)
#endif
}
+/*
+ * Resolve sentinel unblock function values to their actual function pointers
+ * and appropriate data2 values. This centralizes the logic for handling
+ * RUBY_UBF_IO and RUBY_UBF_PROCESS sentinel values.
+ *
+ * @param unblock_function Pointer to unblock function pointer (modified in place)
+ * @param data2 Pointer to data2 pointer (modified in place)
+ * @param thread Thread context for resolving data2 when needed
+ * @return true if sentinel values were resolved, false otherwise
+ */
+bool
+rb_thread_resolve_unblock_function(rb_unblock_function_t **unblock_function, void **data2, struct rb_thread_struct *thread)
+{
+ rb_unblock_function_t *ubf = *unblock_function;
+
+ if ((ubf == RUBY_UBF_IO) || (ubf == RUBY_UBF_PROCESS)) {
+ *unblock_function = ubf_select;
+ *data2 = thread;
+ return true;
+ }
+ return false;
+}
+
void *
rb_nogvl(void *(*func)(void *), void *data1,
rb_unblock_function_t *ubf, void *data2,
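Aside: a hedged sketch of how an extension calls rb_nogvl() with the
RUBY_UBF_IO sentinel, which rb_thread_resolve_unblock_function() above now
resolves to ubf_select plus the calling thread; my_read and its argument
struct are illustrative, not part of this patch:

#include <unistd.h>
#include "ruby/thread.h"

struct read_arguments { int fd; void *buffer; size_t size; ssize_t result; };

static void *
blocking_read(void *ptr)
{
    struct read_arguments *args = ptr;
    args->result = read(args->fd, args->buffer, args->size);
    return NULL;
}

static ssize_t
my_read(int fd, void *buffer, size_t size)
{
    struct read_arguments args = { fd, buffer, size, -1 };
    /* RUBY_UBF_IO is a sentinel: the VM swaps in ubf_select and the
     * calling thread as data2 before blocking. */
    rb_nogvl(blocking_read, &args, RUBY_UBF_IO, NULL, 0);
    return args.result;
}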
@@ -1543,7 +1598,7 @@ rb_nogvl(void *(*func)(void *), void *data1,
if (flags & RB_NOGVL_OFFLOAD_SAFE) {
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
- struct rb_fiber_scheduler_blocking_operation_state state;
+ struct rb_fiber_scheduler_blocking_operation_state state = {0};
VALUE result = rb_fiber_scheduler_blocking_operation_wait(scheduler, func, data1, ubf, data2, flags, &state);
@@ -1561,11 +1616,9 @@ rb_nogvl(void *(*func)(void *), void *data1,
bool is_main_thread = vm->ractor.main_thread == th;
int saved_errno = 0;
- if ((ubf == RUBY_UBF_IO) || (ubf == RUBY_UBF_PROCESS)) {
- ubf = ubf_select;
- data2 = th;
- }
- else if (ubf && rb_ractor_living_thread_num(th->ractor) == 1 && is_main_thread) {
+ rb_thread_resolve_unblock_function(&ubf, &data2, th);
+
+ if (ubf && rb_ractor_living_thread_num(th->ractor) == 1 && is_main_thread) {
if (flags & RB_NOGVL_UBF_ASYNC_SAFE) {
vm->ubf_async_safe = 1;
}
@@ -1694,47 +1747,146 @@ waitfd_to_waiting_flag(int wfd_event)
return wfd_event << 1;
}
-static void
-thread_io_setup_wfd(rb_thread_t *th, int fd, struct waiting_fd *wfd)
+static struct ccan_list_head *
+rb_io_blocking_operations(struct rb_io *io)
{
- wfd->fd = fd;
- wfd->th = th;
- wfd->busy = NULL;
+ rb_serial_t fork_generation = GET_VM()->fork_gen;
- RB_VM_LOCK_ENTER();
- {
- ccan_list_add(&th->vm->waiting_fds, &wfd->wfd_node);
+ // On fork, all existing entries in this list (which are stack-allocated) become invalid.
+ // Therefore, we re-initialize the list, which clears it.
+ if (io->fork_generation != fork_generation) {
+ ccan_list_head_init(&io->blocking_operations);
+ io->fork_generation = fork_generation;
}
- RB_VM_LOCK_LEAVE();
+
+ return &io->blocking_operations;
+}
+
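Aside: the fork-generation check above is a lazy-invalidation pattern.
Reduced to its essentials it looks like this; the names are hypothetical:

#include <stdint.h>
#include "ccan/list/list.h"

struct tracked {
    uint64_t fork_generation;
    struct ccan_list_head waiters;
};

static struct ccan_list_head *
tracked_waiters(struct tracked *tracked, uint64_t current_generation)
{
    if (tracked->fork_generation != current_generation) {
        /* Stale entries lived on stacks that no longer exist in the
         * child process, so reset the list rather than walk it: */
        ccan_list_head_init(&tracked->waiters);
        tracked->fork_generation = current_generation;
    }
    return &tracked->waiters;
}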
+/*
+ * Registers a blocking operation for an IO object. This is used to track all threads and fibers
+ * that are currently blocked on this IO for reading, writing or other operations.
+ *
+ * When the IO is closed, all blocking operations will be notified via rb_fiber_scheduler_fiber_interrupt
+ * for fibers with a scheduler, or via rb_threadptr_interrupt for threads without a scheduler.
+ *
+ * @parameter io The IO object on which the operation will block
+ * @parameter blocking_operation The operation details including the execution context that will be blocked
+ */
+static void
+rb_io_blocking_operation_enter(struct rb_io *io, struct rb_io_blocking_operation *blocking_operation)
+{
+ ccan_list_add(rb_io_blocking_operations(io), &blocking_operation->list);
}
static void
-thread_io_wake_pending_closer(struct waiting_fd *wfd)
+rb_io_blocking_operation_pop(struct rb_io *io, struct rb_io_blocking_operation *blocking_operation)
{
- bool has_waiter = wfd->busy && RB_TEST(wfd->busy->wakeup_mutex);
- if (has_waiter) {
- rb_mutex_lock(wfd->busy->wakeup_mutex);
+ ccan_list_del(&blocking_operation->list);
+}
+
+struct io_blocking_operation_arguments {
+ struct rb_io *io;
+ struct rb_io_blocking_operation *blocking_operation;
+};
+
+static VALUE
+io_blocking_operation_exit(VALUE _arguments)
+{
+ struct io_blocking_operation_arguments *arguments = (void*)_arguments;
+ struct rb_io_blocking_operation *blocking_operation = arguments->blocking_operation;
+
+ rb_io_blocking_operation_pop(arguments->io, blocking_operation);
+
+ rb_io_t *io = arguments->io;
+ rb_thread_t *thread = io->closing_ec->thread_ptr;
+ rb_fiber_t *fiber = io->closing_ec->fiber_ptr;
+
+ if (thread->scheduler != Qnil) {
+ // This can cause spurious wakeups...
+ rb_fiber_scheduler_unblock(thread->scheduler, io->self, rb_fiberptr_self(fiber));
+ }
+ else {
+ rb_thread_wakeup(thread->self);
}
- /* Needs to be protected with RB_VM_LOCK because we don't know if
- wfd is on the global list of pending FD ops or if it's on a
- struct rb_io_close_wait_list close-waiter. */
- RB_VM_LOCK_ENTER();
- ccan_list_del(&wfd->wfd_node);
- RB_VM_LOCK_LEAVE();
+ return Qnil;
+}
- if (has_waiter) {
- rb_thread_t *th = rb_thread_ptr(wfd->busy->closing_thread);
- if (th->scheduler != Qnil) {
- rb_fiber_scheduler_unblock(th->scheduler, wfd->busy->closing_thread, wfd->busy->closing_fiber);
- }
- else {
- rb_thread_wakeup(wfd->busy->closing_thread);
- }
- rb_mutex_unlock(wfd->busy->wakeup_mutex);
+/*
+ * Called when a blocking operation completes or is interrupted. Removes the operation from
+ * the IO's blocking_operations list and wakes up any waiting threads/fibers.
+ *
+ * If there's a wakeup_mutex (meaning an IO close is in progress), synchronizes the cleanup
+ * through that mutex to ensure proper coordination with the closing thread.
+ *
+ * @parameter io The IO object the operation was performed on
+ * @parameter blocking_operation The completed operation to clean up
+ */
+static void
+rb_io_blocking_operation_exit(struct rb_io *io, struct rb_io_blocking_operation *blocking_operation)
+{
+ VALUE wakeup_mutex = io->wakeup_mutex;
+
+ // Indicate that the blocking operation is no longer active:
+ blocking_operation->ec = NULL;
+
+ if (RB_TEST(wakeup_mutex)) {
+ struct io_blocking_operation_arguments arguments = {
+ .io = io,
+ .blocking_operation = blocking_operation
+ };
+
+ rb_mutex_synchronize(wakeup_mutex, io_blocking_operation_exit, (VALUE)&arguments);
+ }
+ else {
+ // If there's no wakeup_mutex, we can safely remove the operation directly:
+ rb_io_blocking_operation_pop(io, blocking_operation);
}
}
+static VALUE
+rb_thread_io_blocking_operation_ensure(VALUE _argument)
+{
+ struct io_blocking_operation_arguments *arguments = (void*)_argument;
+
+ rb_io_blocking_operation_exit(arguments->io, arguments->blocking_operation);
+
+ return Qnil;
+}
+
+/*
+ * Executes a function that performs a blocking IO operation, while properly tracking
+ * the operation in the IO's blocking_operations list. This ensures proper cleanup
+ * and interruption handling if the IO is closed while blocked.
+ *
+ * The operation is automatically removed from the blocking_operations list when the function
+ * returns, whether normally or due to an exception.
+ *
+ * @parameter self The IO object
+ * @parameter function The function to execute that will perform the blocking operation
+ * @parameter argument The argument to pass to the function
+ * @returns The result of the blocking operation function
+ */
+VALUE
+rb_thread_io_blocking_operation(VALUE self, VALUE(*function)(VALUE), VALUE argument)
+{
+ struct rb_io *io;
+ RB_IO_POINTER(self, io);
+
+ rb_execution_context_t *ec = GET_EC();
+ struct rb_io_blocking_operation blocking_operation = {
+ .ec = ec,
+ };
+ rb_io_blocking_operation_enter(io, &blocking_operation);
+
+ struct io_blocking_operation_arguments io_blocking_operation_arguments = {
+ .io = io,
+ .blocking_operation = &blocking_operation
+ };
+
+ return rb_ensure(function, argument, rb_thread_io_blocking_operation_ensure, (VALUE)&io_blocking_operation_arguments);
+}
+
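Aside: a hedged sketch of a caller; io_wait_readable_operation is
hypothetical. The rb_ensure()-based wrapper guarantees the operation is
removed from io->blocking_operations even if the body raises:

static VALUE
io_wait_readable_operation(VALUE io)
{
    /* ... perform the actual blocking wait on the IO here ... */
    return Qtrue;
}

static VALUE
io_wait_readable(VALUE io)
{
    return rb_thread_io_blocking_operation(io, io_wait_readable_operation, io);
}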
static bool
thread_io_mn_schedulable(rb_thread_t *th, int events, const struct timeval *timeout)
{
@@ -1802,12 +1954,11 @@ rb_thread_mn_schedulable(VALUE thval)
VALUE
rb_thread_io_blocking_call(struct rb_io* io, rb_blocking_function_t *func, void *data1, int events)
{
- rb_execution_context_t *volatile ec = GET_EC();
- rb_thread_t *volatile th = rb_ec_thread_ptr(ec);
+ rb_execution_context_t * volatile ec = GET_EC();
+ rb_thread_t * volatile th = rb_ec_thread_ptr(ec);
RUBY_DEBUG_LOG("th:%u fd:%d ev:%d", rb_th_serial(th), io->fd, events);
- struct waiting_fd waiting_fd;
volatile VALUE val = Qundef; /* shouldn't be used */
volatile int saved_errno = 0;
enum ruby_tag_type state;
@@ -1822,7 +1973,11 @@ rb_thread_io_blocking_call(struct rb_io* io, rb_blocking_function_t *func, void
// `func` or not (as opposed to some previously set value).
errno = 0;
- thread_io_setup_wfd(th, fd, &waiting_fd);
+ struct rb_io_blocking_operation blocking_operation = {
+ .ec = ec,
+ };
+ rb_io_blocking_operation_enter(io, &blocking_operation);
+
{
EC_PUSH_TAG(ec);
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
@@ -1833,13 +1988,16 @@ rb_thread_io_blocking_call(struct rb_io* io, rb_blocking_function_t *func, void
saved_errno = errno;
}, ubf_select, th, FALSE);
- th = rb_ec_thread_ptr(ec);
+ RUBY_ASSERT(th == rb_ec_thread_ptr(ec));
if (events &&
blocking_call_retryable_p((int)val, saved_errno) &&
thread_io_wait_events(th, fd, events, NULL)) {
RUBY_VM_CHECK_INTS_BLOCKING(ec);
goto retry;
}
+
+ RUBY_VM_CHECK_INTS_BLOCKING(ec);
+
state = saved_state;
}
EC_POP_TAG();
@@ -1847,17 +2005,12 @@ rb_thread_io_blocking_call(struct rb_io* io, rb_blocking_function_t *func, void
th = rb_ec_thread_ptr(ec);
th->mn_schedulable = prev_mn_schedulable;
}
- /*
- * must be deleted before jump
- * this will delete either from waiting_fds or on-stack struct rb_io_close_wait_list
- */
- thread_io_wake_pending_closer(&waiting_fd);
+
+ rb_io_blocking_operation_exit(io, &blocking_operation);
if (state) {
EC_JUMP_TAG(ec, state);
}
- /* TODO: check func() */
- RUBY_VM_CHECK_INTS_BLOCKING(ec);
// If the error was a timeout, we raise a specific exception for that:
if (saved_errno == ETIMEDOUT) {
@@ -1902,6 +2055,9 @@ rb_thread_io_blocking_region(struct rb_io *io, rb_blocking_function_t *func, voi
* created as Ruby thread (created by Thread.new or so). In other
* words, this function *DOES NOT* associate or convert a NON-Ruby
* thread to a Ruby thread.
+ *
+ * NOTE: If this thread already holds the GVL, `func` is called directly,
+ * without releasing and re-acquiring the GVL (since Ruby 4.0).
*/
void *
rb_thread_call_with_gvl(void *(*func)(void *), void *data1)
@@ -1925,7 +2081,8 @@ rb_thread_call_with_gvl(void *(*func)(void *), void *data1)
prev_unblock = th->unblock;
if (brb == 0) {
- rb_bug("rb_thread_call_with_gvl: called by a thread which has GVL.");
+ /* the GVL is already acquired, call method directly */
+ return (*func)(data1);
}
blocking_region_end(th, brb);
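Aside: a hedged sketch of the newly permitted nested use; log_message and
notify are illustrative. Code that may or may not already hold the GVL can
now call rb_thread_call_with_gvl() unconditionally:

#include "ruby/thread.h"

static void *
log_message(void *data)
{
    rb_funcall(rb_stderr, rb_intern("puts"), 1, (VALUE)data);
    return NULL;
}

static void
notify(VALUE message)
{
    /* Previously this was an rb_bug() when the GVL was already held;
     * now the callback simply runs directly in that case: */
    rb_thread_call_with_gvl(log_message, (void *)message);
}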
@@ -2426,6 +2583,7 @@ NORETURN(static void rb_threadptr_to_kill(rb_thread_t *th));
static void
rb_threadptr_to_kill(rb_thread_t *th)
{
+ VM_ASSERT(GET_THREAD() == th);
rb_threadptr_pending_interrupt_clear(th);
th->status = THREAD_RUNNABLE;
th->to_kill = 1;
@@ -2440,8 +2598,9 @@ threadptr_get_interrupts(rb_thread_t *th)
rb_atomic_t interrupt;
rb_atomic_t old;
+ old = ATOMIC_LOAD_RELAXED(ec->interrupt_flag);
do {
- interrupt = ec->interrupt_flag;
+ interrupt = old;
old = ATOMIC_CAS(ec->interrupt_flag, interrupt, interrupt & ec->interrupt_mask);
} while (old != interrupt);
return interrupt & (rb_atomic_t)~ec->interrupt_mask;
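Aside: the revised loop is the canonical load-then-CAS retry shape. The same
pattern expressed with C11 atomics rather than the VM's ATOMIC_* macros
(sketch):

#include <stdatomic.h>

static unsigned int
fetch_and_mask(_Atomic unsigned int *flag, unsigned int mask)
{
    unsigned int old = atomic_load_explicit(flag, memory_order_relaxed);
    /* On failure, atomic_compare_exchange_weak reloads `old` with the
     * current value, so no separate re-read is needed: */
    while (!atomic_compare_exchange_weak(flag, &old, old & mask)) {
        /* retry */
    }
    return old & ~mask;
}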
@@ -2449,6 +2608,11 @@ threadptr_get_interrupts(rb_thread_t *th)
static void threadptr_interrupt_exec_exec(rb_thread_t *th);
+// Execute interrupts on the currently running thread.
+// In certain situations, calling this function will raise an exception. Some examples:
+// * during VM shutdown (`rb_ractor_terminate_all`)
+// * a call to Thread#exit for the current thread (`rb_thread_kill`)
+// * a call to Thread#raise for the current thread
int
rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
{
@@ -2456,6 +2620,8 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
int postponed_job_interrupt = 0;
int ret = FALSE;
+ VM_ASSERT(GET_THREAD() == th);
+
if (th->ec->raised_flag) return ret;
while ((interrupt = threadptr_get_interrupts(th)) != 0) {
@@ -2472,8 +2638,7 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
terminate_interrupt = interrupt & TERMINATE_INTERRUPT_MASK; // request from other ractors
if (interrupt & VM_BARRIER_INTERRUPT_MASK) {
- RB_VM_LOCK_ENTER();
- RB_VM_LOCK_LEAVE();
+ RB_VM_LOCKING();
}
if (postponed_job_interrupt) {
@@ -2572,18 +2737,11 @@ rb_threadptr_ready(rb_thread_t *th)
static VALUE
rb_threadptr_raise(rb_thread_t *target_th, int argc, VALUE *argv)
{
- VALUE exc;
-
if (rb_threadptr_dead(target_th)) {
return Qnil;
}
- if (argc == 0) {
- exc = rb_exc_new(rb_eRuntimeError, 0, 0);
- }
- else {
- exc = rb_make_exception(argc, argv);
- }
+ VALUE exception = rb_exception_setup(argc, argv);
/* making an exception object can switch thread,
so we need to check thread deadness again */
@@ -2591,9 +2749,9 @@ rb_threadptr_raise(rb_thread_t *target_th, int argc, VALUE *argv)
return Qnil;
}
- rb_ec_setup_exception(GET_EC(), exc, Qundef);
- rb_threadptr_pending_interrupt_enque(target_th, exc);
+ rb_threadptr_pending_interrupt_enque(target_th, exception);
rb_threadptr_interrupt(target_th);
+
return Qnil;
}
@@ -2639,86 +2797,122 @@ rb_ec_reset_raised(rb_execution_context_t *ec)
return 1;
}
-int
-rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
+/*
+ * Thread-safe IO closing mechanism.
+ *
+ * When an IO is closed while other threads or fibers are blocked on it, we need to:
+ * 1. Track and notify all blocking operations through io->blocking_operations
+ * 2. Ensure only one thread can close at a time using io->closing_ec
+ * 3. Synchronize cleanup using wakeup_mutex
+ *
+ * The close process works as follows:
+ * - First check if any thread is already closing (io->closing_ec)
+ * - Set up wakeup_mutex for synchronization
+ * - Iterate through all blocking operations in io->blocking_operations
+ * - For each blocked fiber with a scheduler:
+ * - Notify via rb_fiber_scheduler_fiber_interrupt
+ * - For each blocked thread without a scheduler:
+ * - Enqueue IOError via rb_threadptr_pending_interrupt_enque
+ * - Wake via rb_threadptr_interrupt
+ * - Wait on wakeup_mutex until all operations are cleaned up
+ * - Only then clear closing state and allow actual close to proceed
+ */
+static VALUE
+thread_io_close_notify_all(VALUE _io)
{
- rb_vm_t *vm = GET_THREAD()->vm;
- struct waiting_fd *wfd = 0, *next;
- ccan_list_head_init(&busy->pending_fd_users);
- int has_any;
- VALUE wakeup_mutex;
+ struct rb_io *io = (struct rb_io *)_io;
- RB_VM_LOCK_ENTER();
- {
- ccan_list_for_each_safe(&vm->waiting_fds, wfd, next, wfd_node) {
- if (wfd->fd == fd) {
- rb_thread_t *th = wfd->th;
- VALUE err;
-
- ccan_list_del(&wfd->wfd_node);
- ccan_list_add(&busy->pending_fd_users, &wfd->wfd_node);
-
- wfd->busy = busy;
- err = th->vm->special_exceptions[ruby_error_stream_closed];
- rb_threadptr_pending_interrupt_enque(th, err);
- rb_threadptr_interrupt(th);
+ size_t count = 0;
+ rb_vm_t *vm = io->closing_ec->thread_ptr->vm;
+ VALUE error = vm->special_exceptions[ruby_error_stream_closed];
+
+ struct rb_io_blocking_operation *blocking_operation;
+ ccan_list_for_each(rb_io_blocking_operations(io), blocking_operation, list) {
+ rb_execution_context_t *ec = blocking_operation->ec;
+
+ // If the operation is in progress, we need to interrupt it:
+ if (ec) {
+ rb_thread_t *thread = ec->thread_ptr;
+
+ VALUE result = RUBY_Qundef;
+ if (thread->scheduler != Qnil) {
+ result = rb_fiber_scheduler_fiber_interrupt(thread->scheduler, rb_fiberptr_self(ec->fiber_ptr), error);
+ }
+
+ if (result == RUBY_Qundef) {
+ // If the thread is not the current thread, we need to enqueue an error:
+ rb_threadptr_pending_interrupt_enque(thread, error);
+ rb_threadptr_interrupt(thread);
}
}
+
+ count += 1;
}
- has_any = !ccan_list_empty(&busy->pending_fd_users);
- busy->closing_thread = rb_thread_current();
- busy->closing_fiber = rb_fiber_current();
- wakeup_mutex = Qnil;
- if (has_any) {
- wakeup_mutex = rb_mutex_new();
- RBASIC_CLEAR_CLASS(wakeup_mutex); /* hide from ObjectSpace */
+ return (VALUE)count;
+}
+
+size_t
+rb_thread_io_close_interrupt(struct rb_io *io)
+{
+ // We guard this operation on `io->closing_ec`, so only one thread will ever enter this function.
+ if (io->closing_ec) {
+ return 0;
+ }
+
+ // If there are no blocking operations, we are done:
+ if (ccan_list_empty(rb_io_blocking_operations(io))) {
+ return 0;
}
- busy->wakeup_mutex = wakeup_mutex;
- RB_VM_LOCK_LEAVE();
+ // Otherwise, we are now closing the IO:
+ rb_execution_context_t *ec = GET_EC();
+ io->closing_ec = ec;
+
+ // This is used to ensure the correct execution context is woken up after the blocking operation is interrupted:
+ io->wakeup_mutex = rb_mutex_new();
+ rb_mutex_allow_trap(io->wakeup_mutex, 1);
+
+ // We need to use a mutex here as entering the fiber scheduler may cause a context switch:
+ VALUE result = rb_mutex_synchronize(io->wakeup_mutex, thread_io_close_notify_all, (VALUE)io);
- /* If the caller didn't pass *busy as a pointer to something on the stack,
- we need to guard this mutex object on _our_ C stack for the duration
- of this function. */
- RB_GC_GUARD(wakeup_mutex);
- return has_any;
+ return (size_t)result;
}
void
-rb_notify_fd_close_wait(struct rb_io_close_wait_list *busy)
+rb_thread_io_close_wait(struct rb_io* io)
{
- if (!RB_TEST(busy->wakeup_mutex)) {
- /* There was nobody else using this file when we closed it, so we
- never bothered to allocate a mutex*/
+ VALUE wakeup_mutex = io->wakeup_mutex;
+
+ if (!RB_TEST(wakeup_mutex)) {
+ // There was nobody else using this file when we closed it, so we never bothered to allocate a mutex:
return;
}
- rb_mutex_lock(busy->wakeup_mutex);
- while (!ccan_list_empty(&busy->pending_fd_users)) {
- rb_mutex_sleep(busy->wakeup_mutex, Qnil);
+ rb_mutex_lock(wakeup_mutex);
+ while (!ccan_list_empty(rb_io_blocking_operations(io))) {
+ rb_mutex_sleep(wakeup_mutex, Qnil);
}
- rb_mutex_unlock(busy->wakeup_mutex);
+ rb_mutex_unlock(wakeup_mutex);
+
+ // We are done closing:
+ io->wakeup_mutex = Qnil;
+ io->closing_ec = NULL;
}
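Aside: a hedged sketch of the closing side, mirroring how the io.c close path
is expected to drive this pair; io_close_blocking_operations is illustrative:

static void
io_close_blocking_operations(struct rb_io *io)
{
    /* Interrupt every registered blocking operation; returns the number
     * of operations that were notified: */
    if (rb_thread_io_close_interrupt(io)) {
        /* Sleep on io->wakeup_mutex until every interrupted thread or
         * fiber has removed itself from io->blocking_operations: */
        rb_thread_io_close_wait(io);
    }
}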
void
rb_thread_fd_close(int fd)
{
- struct rb_io_close_wait_list busy;
-
- if (rb_notify_fd_close(fd, &busy)) {
- rb_notify_fd_close_wait(&busy);
- }
+ rb_warn("rb_thread_fd_close is deprecated (and is now a no-op).");
}
/*
* call-seq:
- * thr.raise
- * thr.raise(string)
- * thr.raise(exception [, string [, array]])
+ * raise(exception, message = exception.to_s, backtrace = nil, cause: $!)
+ * raise(message = nil, cause: $!)
*
* Raises an exception from the given thread. The caller does not have to be
- * +thr+. See Kernel#raise for more information.
+ * +thr+. See Kernel#raise for more information on arguments.
*
* Thread.abort_on_exception = true
* a = Thread.new { sleep(200) }
@@ -2757,7 +2951,10 @@ thread_raise_m(int argc, VALUE *argv, VALUE self)
*
* Terminates +thr+ and schedules another thread to be run, returning
* the terminated Thread. If this is the main thread, or the last
- * thread, exits the process.
+ * thread, exits the process. Note that the caller does not wait for
+ * the thread to terminate if the receiver is different from the currently
+ * running thread. The termination is asynchronous, and the thread can still
+ * run a small amount of Ruby code before exiting.
*/
VALUE
@@ -4322,6 +4519,8 @@ do_select(VALUE p)
RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); /* may raise */
} while (wait_retryable(&result, lerrno, to, endtime) && do_select_update());
+ RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec);
+
if (result < 0) {
errno = lerrno;
}
@@ -4403,7 +4602,7 @@ wait_for_single_fd_blocking_region(rb_thread_t *th, struct pollfd *fds, nfds_t n
* returns a mask of events
*/
static int
-thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout)
+thread_io_wait(rb_thread_t *th, struct rb_io *io, int fd, int events, struct timeval *timeout)
{
struct pollfd fds[1] = {{
.fd = fd,
@@ -4412,14 +4611,17 @@ thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout)
}};
volatile int result = 0;
nfds_t nfds;
- struct waiting_fd wfd;
+ struct rb_io_blocking_operation blocking_operation;
enum ruby_tag_type state;
volatile int lerrno;
- rb_execution_context_t *ec = GET_EC();
- rb_thread_t *th = rb_ec_thread_ptr(ec);
+ RUBY_ASSERT(th);
+ rb_execution_context_t *ec = th->ec;
- thread_io_setup_wfd(th, fd, &wfd);
+ if (io) {
+ blocking_operation.ec = ec;
+ rb_io_blocking_operation_enter(io, &blocking_operation);
+ }
if (timeout == NULL && thread_io_wait_events(th, fd, events, NULL)) {
// fd is readable
@@ -4428,25 +4630,30 @@ thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout)
errno = 0;
}
else {
- EC_PUSH_TAG(wfd.th->ec);
+ EC_PUSH_TAG(ec);
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
rb_hrtime_t *to, rel, end = 0;
- RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
+ RUBY_VM_CHECK_INTS_BLOCKING(ec);
timeout_prepare(&to, &rel, &end, timeout);
do {
nfds = numberof(fds);
- result = wait_for_single_fd_blocking_region(wfd.th, fds, nfds, to, &lerrno);
+ result = wait_for_single_fd_blocking_region(th, fds, nfds, to, &lerrno);
- RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
+ RUBY_VM_CHECK_INTS_BLOCKING(ec);
} while (wait_retryable(&result, lerrno, to, end));
+
+ RUBY_VM_CHECK_INTS_BLOCKING(ec);
}
+
EC_POP_TAG();
}
- thread_io_wake_pending_closer(&wfd);
+ if (io) {
+ rb_io_blocking_operation_exit(io, &blocking_operation);
+ }
if (state) {
- EC_JUMP_TAG(wfd.th->ec, state);
+ EC_JUMP_TAG(ec, state);
}
if (result < 0) {
@@ -4479,6 +4686,9 @@ thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout)
}
#else /* ! USE_POLL - implement rb_io_poll_fd() using select() */
struct select_args {
+ struct rb_io *io;
+ struct rb_io_blocking_operation *blocking_operation;
+
union {
int fd;
int error;
@@ -4486,7 +4696,6 @@ struct select_args {
rb_fdset_t *read;
rb_fdset_t *write;
rb_fdset_t *except;
- struct waiting_fd wfd;
struct timeval *tv;
};
@@ -4517,7 +4726,10 @@ select_single_cleanup(VALUE ptr)
{
struct select_args *args = (struct select_args *)ptr;
- thread_io_wake_pending_closer(&args->wfd);
+ if (args->blocking_operation) {
+ rb_io_blocking_operation_exit(args->io, args->blocking_operation);
+ }
+
if (args->read) rb_fd_term(args->read);
if (args->write) rb_fd_term(args->write);
if (args->except) rb_fd_term(args->except);
@@ -4538,39 +4750,49 @@ init_set_fd(int fd, rb_fdset_t *fds)
}
static int
-thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout)
+thread_io_wait(rb_thread_t *th, struct rb_io *io, int fd, int events, struct timeval *timeout)
{
rb_fdset_t rfds, wfds, efds;
struct select_args args;
- int r;
VALUE ptr = (VALUE)&args;
- rb_thread_t *th = GET_THREAD();
+
+ struct rb_io_blocking_operation blocking_operation;
+ if (io) {
+ args.io = io;
+ blocking_operation.ec = th->ec;
+ rb_io_blocking_operation_enter(io, &blocking_operation);
+ args.blocking_operation = &blocking_operation;
+ }
+ else {
+ args.io = NULL;
+ blocking_operation.ec = NULL;
+ args.blocking_operation = NULL;
+ }
args.as.fd = fd;
args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL;
args.tv = timeout;
- thread_io_setup_wfd(th, fd, &args.wfd);
- r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr);
- if (r == -1)
+ int result = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr);
+ if (result == -1)
errno = args.as.error;
- return r;
+ return result;
}
#endif /* ! USE_POLL */
int
-rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
+rb_thread_wait_for_single_fd(rb_thread_t *th, int fd, int events, struct timeval *timeout)
{
- return thread_io_wait(NULL, fd, events, timeout);
+ return thread_io_wait(th, NULL, fd, events, timeout);
}
int
-rb_thread_io_wait(struct rb_io *io, int events, struct timeval * timeout)
+rb_thread_io_wait(rb_thread_t *th, struct rb_io *io, int events, struct timeval * timeout)
{
- return thread_io_wait(io, io->fd, events, timeout);
+ return thread_io_wait(th, io, io->fd, events, timeout);
}
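Aside: callers of these two entry points now pass the thread explicitly; a
sketch of the updated call shape, with wait_readable illustrative:

static int
wait_readable(struct rb_io *io)
{
    /* The current thread is now threaded through instead of being
     * re-derived via GET_THREAD() inside thread_io_wait: */
    return rb_thread_io_wait(GET_THREAD(), io, RB_WAITFD_IN, NULL);
}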
/*
@@ -4583,7 +4805,7 @@ rb_gc_set_stack_end(VALUE **stack_end_p)
{
VALUE stack_end;
COMPILER_WARNING_PUSH
-#if __has_warning("-Wdangling-pointer")
+#if RBIMPL_COMPILER_IS(GCC)
COMPILER_WARNING_IGNORED(-Wdangling-pointer);
#endif
*stack_end_p = &stack_end;
@@ -4737,9 +4959,13 @@ rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const r
thread_sched_atfork(TH_SCHED(th));
ubf_list_atfork();
+ rb_signal_atfork();
// OK. Only this thread accesses:
ccan_list_for_each(&vm->ractor.set, r, vmlr_node) {
+ if (r != vm->ractor.main_ractor) {
+ rb_ractor_terminate_atfork(vm, r);
+ }
ccan_list_for_each(&r->threads.set, i, lt_node) {
atfork(i, th);
}
@@ -4769,6 +4995,9 @@ static void
terminate_atfork_i(rb_thread_t *th, const rb_thread_t *current_th)
{
if (th != current_th) {
+ // Clear the scheduler as it is no longer operational:
+ th->scheduler = Qnil;
+
rb_native_mutex_initialize(&th->interrupt_lock);
rb_mutex_abandon_keeping_mutexes(th);
rb_mutex_abandon_locking_mutex(th);
@@ -4784,6 +5013,7 @@ rb_thread_atfork(void)
rb_threadptr_pending_interrupt_clear(th);
rb_thread_atfork_internal(th, terminate_atfork_i);
th->join_list = NULL;
+ th->scheduler = Qnil;
rb_fiber_atfork(th);
/* We don't want reproduce CVE-2003-0900. */
@@ -5053,7 +5283,7 @@ rb_thread_shield_owned(VALUE self)
rb_mutex_t *m = mutex_ptr(mutex);
- return m->fiber == GET_EC()->fiber_ptr;
+ return m->ec_serial == rb_ec_serial(GET_EC());
}
/*
@@ -5072,7 +5302,7 @@ rb_thread_shield_wait(VALUE self)
if (!mutex) return Qfalse;
m = mutex_ptr(mutex);
- if (m->fiber == GET_EC()->fiber_ptr) return Qnil;
+ if (m->ec_serial == rb_ec_serial(GET_EC())) return Qnil;
rb_thread_shield_waiting_inc(self);
rb_mutex_lock(mutex);
rb_thread_shield_waiting_dec(self);
@@ -5589,8 +5819,8 @@ debug_deadlock_check(rb_ractor_t *r, VALUE msg)
if (th->locking_mutex) {
rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
- rb_str_catf(msg, " mutex:%p cond:%"PRIuSIZE,
- (void *)mutex->fiber, rb_mutex_num_waiting(mutex));
+ rb_str_catf(msg, " mutex:%llu cond:%"PRIuSIZE,
+ (unsigned long long)mutex->ec_serial, rb_mutex_num_waiting(mutex));
}
{
@@ -5630,7 +5860,7 @@ rb_check_deadlock(rb_ractor_t *r)
}
else if (th->locking_mutex) {
rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
- if (mutex->fiber == th->ec->fiber_ptr || (!mutex->fiber && !ccan_list_empty(&mutex->waitq))) {
+ if (mutex->ec_serial == rb_ec_serial(th->ec) || (!mutex->ec_serial && !ccan_list_empty(&mutex->waitq))) {
found = 1;
}
}
@@ -5648,21 +5878,6 @@ rb_check_deadlock(rb_ractor_t *r)
}
}
-// Used for VM memsize reporting. Returns the size of a list of waiting_fd
-// structs. Defined here because the struct definition lives here as well.
-size_t
-rb_vm_memsize_waiting_fds(struct ccan_list_head *waiting_fds)
-{
- struct waiting_fd *waitfd = 0;
- size_t size = 0;
-
- ccan_list_for_each(waiting_fds, waitfd, wfd_node) {
- size += sizeof(struct waiting_fd);
- }
-
- return size;
-}
-
static void
update_line_coverage(VALUE data, const rb_trace_arg_t *trace_arg)
{
@@ -6024,7 +6239,7 @@ rb_threadptr_interrupt_exec(rb_thread_t *th, rb_interrupt_exec_func_t *func, voi
rb_native_mutex_lock(&th->interrupt_lock);
{
ccan_list_add_tail(&th->interrupt_exec_tasks, &task->node);
- threadptr_interrupt_locked(th, true);
+ threadptr_set_interrupt_locked(th, true);
}
rb_native_mutex_unlock(&th->interrupt_lock);
}
@@ -6041,8 +6256,15 @@ threadptr_interrupt_exec_exec(rb_thread_t *th)
}
rb_native_mutex_unlock(&th->interrupt_lock);
+ RUBY_DEBUG_LOG("task:%p", task);
+
if (task) {
- (*task->func)(task->data);
+ if (task->flags & rb_interrupt_exec_flag_new_thread) {
+ rb_thread_create(task->func, task->data);
+ }
+ else {
+ (*task->func)(task->data);
+ }
ruby_xfree(task);
}
else {
@@ -6065,40 +6287,15 @@ threadptr_interrupt_exec_cleanup(rb_thread_t *th)
rb_native_mutex_unlock(&th->interrupt_lock);
}
-struct interrupt_ractor_new_thread_data {
- rb_interrupt_exec_func_t *func;
- void *data;
-};
-
-static VALUE
-interrupt_ractor_new_thread_func(void *data)
-{
- struct interrupt_ractor_new_thread_data d = *(struct interrupt_ractor_new_thread_data *)data;
- ruby_xfree(data);
-
- d.func(d.data);
- return Qnil;
-}
-
-static VALUE
-interrupt_ractor_func(void *data)
-{
- rb_thread_create(interrupt_ractor_new_thread_func, data);
- return Qnil;
-}
-
// native thread safe
// func/data should be native thread safe
void
rb_ractor_interrupt_exec(struct rb_ractor_struct *target_r,
rb_interrupt_exec_func_t *func, void *data, enum rb_interrupt_exec_flag flags)
{
- struct interrupt_ractor_new_thread_data *d = ALLOC(struct interrupt_ractor_new_thread_data);
+ RUBY_DEBUG_LOG("flags:%d", (int)flags);
- d->func = func;
- d->data = data;
rb_thread_t *main_th = target_r->threads.main;
- rb_threadptr_interrupt_exec(main_th, interrupt_ractor_func, d, flags);
-
- // TODO MEMO: we can create a new thread in a ractor, but not sure how to do that now.
+ rb_threadptr_interrupt_exec(main_th, func, data, flags | rb_interrupt_exec_flag_new_thread);
}
+