Diffstat (limited to 'thread.c')
-rw-r--r--  thread.c  1608
1 file changed, 1086 insertions(+), 522 deletions(-)
diff --git a/thread.c b/thread.c
index 4ea9716139..e66352c03f 100644
--- a/thread.c
+++ b/thread.c
@@ -78,6 +78,7 @@
#include "internal/class.h"
#include "internal/cont.h"
#include "internal/error.h"
+#include "internal/eval.h"
#include "internal/gc.h"
#include "internal/hash.h"
#include "internal/io.h"
@@ -89,7 +90,6 @@
#include "internal/time.h"
#include "internal/warnings.h"
#include "iseq.h"
-#include "rjit.h"
#include "ruby/debug.h"
#include "ruby/io.h"
#include "ruby/thread.h"
@@ -100,9 +100,7 @@
#include "vm_debug.h"
#include "vm_sync.h"
-#if USE_RJIT && defined(HAVE_SYS_WAIT_H)
-#include <sys/wait.h>
-#endif
+#include "ccan/list/list.h"
#ifndef USE_NATIVE_THREAD_PRIORITY
#define USE_NATIVE_THREAD_PRIORITY 0
@@ -111,15 +109,13 @@
#endif
static VALUE rb_cThreadShield;
+static VALUE cThGroup;
static VALUE sym_immediate;
static VALUE sym_on_blocking;
static VALUE sym_never;
-enum SLEEP_FLAGS {
- SLEEP_DEADLOCKABLE = 0x1,
- SLEEP_SPURIOUS_CHECK = 0x2
-};
+static uint32_t thread_default_quantum_ms = 100;
#define THREAD_LOCAL_STORAGE_INITIALISED FL_USER13
#define THREAD_LOCAL_STORAGE_INITIALISED_P(th) RB_FL_TEST_RAW((th), THREAD_LOCAL_STORAGE_INITIALISED)
@@ -134,8 +130,16 @@ rb_thread_local_storage(VALUE thread)
return rb_ivar_get(thread, idLocals);
}
-static int sleep_hrtime(rb_thread_t *, rb_hrtime_t, unsigned int fl);
+enum SLEEP_FLAGS {
+ SLEEP_DEADLOCKABLE = 0x01,
+ SLEEP_SPURIOUS_CHECK = 0x02,
+ SLEEP_ALLOW_SPURIOUS = 0x04,
+ SLEEP_NO_CHECKINTS = 0x08,
+};
+
static void sleep_forever(rb_thread_t *th, unsigned int fl);
+static int sleep_hrtime(rb_thread_t *, rb_hrtime_t, unsigned int fl);
+
static void rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker, VALUE timeout, rb_hrtime_t end);
static int rb_threadptr_dead(rb_thread_t *th);
static void rb_check_deadlock(rb_ractor_t *r);
@@ -143,18 +147,10 @@ static int rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th);
static const char *thread_status_name(rb_thread_t *th, int detail);
static int hrtime_update_expire(rb_hrtime_t *, const rb_hrtime_t);
NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
-static int consume_communication_pipe(int fd);
-static int check_signals_nogvl(rb_thread_t *, int sigwait_fd);
-
-#define eKillSignal INT2FIX(0)
-#define eTerminateSignal INT2FIX(1)
-static volatile int system_working = 1;
+MAYBE_UNUSED(static int consume_communication_pipe(int fd));
-struct waiting_fd {
- struct ccan_list_node wfd_node; /* <=> vm.waiting_fds */
- rb_thread_t *th;
- int fd;
-};
+static rb_atomic_t system_working = 1;
+static rb_internal_thread_specific_key_t specific_key_count;
/********************************************************************************/
@@ -174,11 +170,11 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio
#define THREAD_BLOCKING_BEGIN(th) do { \
struct rb_thread_sched * const sched = TH_SCHED(th); \
RB_VM_SAVE_MACHINE_CONTEXT(th); \
- thread_sched_to_waiting(sched);
+ thread_sched_to_waiting((sched), (th));
#define THREAD_BLOCKING_END(th) \
- thread_sched_to_running(sched, th); \
- rb_ractor_thread_switch(th->ractor, th); \
+ thread_sched_to_running((sched), (th)); \
+ rb_ractor_thread_switch(th->ractor, th, false); \
} while(0)
#ifdef __GNUC__
@@ -195,6 +191,10 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio
if (blocking_region_begin(th, &__region, (ubf), (ubfarg), fail_if_interrupted) || \
/* always return true unless fail_if_interrupted */ \
!only_if_constant(fail_if_interrupted, TRUE)) { \
+ /* Important that this is inlined into the macro, and not part of \
+ * blocking_region_begin - see bug #20493 */ \
+ RB_VM_SAVE_MACHINE_CONTEXT(th); \
+ thread_sched_to_waiting(TH_SCHED(th), th); \
exec; \
blocking_region_end(th, &__region); \
}; \
@@ -208,6 +208,10 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio
static inline int
vm_check_ints_blocking(rb_execution_context_t *ec)
{
+#ifdef RUBY_ASSERT_CRITICAL_SECTION
+ VM_ASSERT(ruby_assert_critical_section_entered == 0);
+#endif
+
rb_thread_t *th = rb_ec_thread_ptr(ec);
if (LIKELY(rb_threadptr_pending_interrupt_empty_p(th))) {
@@ -217,7 +221,18 @@ vm_check_ints_blocking(rb_execution_context_t *ec)
th->pending_interrupt_queue_checked = 0;
RUBY_VM_SET_INTERRUPT(ec);
}
- return rb_threadptr_execute_interrupts(th, 1);
+
+ int result = rb_threadptr_execute_interrupts(th, 1);
+
+ // When a signal is received, we yield to the scheduler as soon as possible:
+ if (result || RUBY_VM_INTERRUPTED(ec)) {
+ VALUE scheduler = rb_fiber_scheduler_current_for_threadptr(th);
+ if (scheduler != Qnil) {
+ rb_fiber_scheduler_yield(scheduler);
+ }
+ }
+
+ return result;
}
int
@@ -257,12 +272,8 @@ timeout_prepare(rb_hrtime_t **to, rb_hrtime_t *rel, rb_hrtime_t *end,
}
MAYBE_UNUSED(NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start)));
-
-static void
-ubf_sigwait(void *ignore)
-{
- rb_thread_wakeup_timer_thread(0);
-}
+MAYBE_UNUSED(static bool th_has_dedicated_nt(const rb_thread_t *th));
+MAYBE_UNUSED(static int waitfd_to_waiting_flag(int wfd_event));
#include THREAD_IMPL_SRC
@@ -340,9 +351,11 @@ unblock_function_clear(rb_thread_t *th)
}
static void
-rb_threadptr_interrupt_common(rb_thread_t *th, int trap)
+threadptr_set_interrupt_locked(rb_thread_t *th, bool trap)
{
- rb_native_mutex_lock(&th->interrupt_lock);
+ // th->interrupt_lock should be acquired here
+
+ RUBY_DEBUG_LOG("th:%u trap:%d", rb_th_serial(th), trap);
if (trap) {
RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
@@ -350,25 +363,37 @@ rb_threadptr_interrupt_common(rb_thread_t *th, int trap)
else {
RUBY_VM_SET_INTERRUPT(th->ec);
}
+
if (th->unblock.func != NULL) {
(th->unblock.func)(th->unblock.arg);
}
else {
/* none */
}
+}
+
+static void
+threadptr_set_interrupt(rb_thread_t *th, int trap)
+{
+ rb_native_mutex_lock(&th->interrupt_lock);
+ {
+ threadptr_set_interrupt_locked(th, trap);
+ }
rb_native_mutex_unlock(&th->interrupt_lock);
}
+/* Set interrupt flag on another thread or current thread, and call its UBF if it has one set */
void
rb_threadptr_interrupt(rb_thread_t *th)
{
- rb_threadptr_interrupt_common(th, 0);
+ RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
+ threadptr_set_interrupt(th, false);
}
static void
threadptr_trap_interrupt(rb_thread_t *th)
{
- rb_threadptr_interrupt_common(th, 1);
+ threadptr_set_interrupt(th, true);
}
static void
@@ -380,7 +405,7 @@ terminate_all(rb_ractor_t *r, const rb_thread_t *main_thread)
if (th != main_thread) {
RUBY_DEBUG_LOG("terminate start th:%u status:%s", rb_th_serial(th), thread_status_name(th, TRUE));
- rb_threadptr_pending_interrupt_enque(th, eTerminateSignal);
+ rb_threadptr_pending_interrupt_enque(th, RUBY_FATAL_THREAD_TERMINATED);
rb_threadptr_interrupt(th);
RUBY_DEBUG_LOG("terminate done th:%u status:%s", rb_th_serial(th), thread_status_name(th, TRUE));
@@ -409,11 +434,12 @@ rb_threadptr_join_list_wakeup(rb_thread_t *thread)
rb_threadptr_interrupt(target_thread);
switch (target_thread->status) {
- case THREAD_STOPPED:
- case THREAD_STOPPED_FOREVER:
- target_thread->status = THREAD_RUNNABLE;
- default:
- break;
+ case THREAD_STOPPED:
+ case THREAD_STOPPED_FOREVER:
+ target_thread->status = THREAD_RUNNABLE;
+ break;
+ default:
+ break;
}
}
}
@@ -427,8 +453,8 @@ rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th)
th->keeping_mutexes = mutex->next_mutex;
// rb_warn("mutex #<%p> was not unlocked by thread #<%p>", (void *)mutex, (void*)th);
-
- const char *error_message = rb_mutex_unlock_th(mutex, th, mutex->fiber);
+ VM_ASSERT(mutex->ec_serial);
+ const char *error_message = rb_mutex_unlock_th(mutex, th, 0);
if (error_message) rb_bug("invalid keeping_mutexes: %s", error_message);
}
}
@@ -482,6 +508,7 @@ rb_thread_terminate_all(rb_thread_t *th)
}
void rb_threadptr_root_fiber_terminate(rb_thread_t *th);
+static void threadptr_interrupt_exec_cleanup(rb_thread_t *th);
static void
thread_cleanup_func_before_exec(void *th_ptr)
@@ -492,6 +519,7 @@ thread_cleanup_func_before_exec(void *th_ptr)
// The thread stack doesn't exist in the forked process:
th->ec->machine.stack_start = th->ec->machine.stack_end = NULL;
+ threadptr_interrupt_exec_cleanup(th);
rb_threadptr_root_fiber_terminate(th);
}
@@ -503,29 +531,31 @@ thread_cleanup_func(void *th_ptr, int atfork)
th->locking_mutex = Qfalse;
thread_cleanup_func_before_exec(th_ptr);
- /*
- * Unfortunately, we can't release native threading resource at fork
- * because libc may have unstable locking state therefore touching
- * a threading resource may cause a deadlock.
- *
- * FIXME: Skipping native_mutex_destroy(pthread_mutex_destroy) is safe
- * with NPTL, but native_thread_destroy calls pthread_cond_destroy
- * which calls free(3), so there is a small memory leak atfork, here.
- */
- if (atfork)
+ if (atfork) {
+ native_thread_destroy_atfork(th->nt);
+ th->nt = NULL;
return;
+ }
rb_native_mutex_destroy(&th->interrupt_lock);
- native_thread_destroy(th);
+}
+
+void
+rb_thread_free_native_thread(void *th_ptr)
+{
+ rb_thread_t *th = th_ptr;
+
+ native_thread_destroy_atfork(th->nt);
+ th->nt = NULL;
}
static VALUE rb_threadptr_raise(rb_thread_t *, int, VALUE *);
static VALUE rb_thread_to_s(VALUE thread);
void
-ruby_thread_init_stack(rb_thread_t *th)
+ruby_thread_init_stack(rb_thread_t *th, void *local_in_parent_frame)
{
- native_thread_init_stack(th);
+ native_thread_init_stack(th, local_in_parent_frame);
}
const VALUE *
@@ -563,6 +593,8 @@ thread_do_start_proc(rb_thread_t *th)
if (th->invoke_type == thread_invoke_type_ractor_proc) {
VALUE self = rb_ractor_self(th->ractor);
+ th->thgroup = th->ractor->thgroup_default = rb_obj_alloc(cThGroup);
+
VM_ASSERT(FIXNUM_P(args));
args_len = FIX2INT(args);
args_ptr = ALLOCA_N(VALUE, args_len);
@@ -581,7 +613,7 @@ thread_do_start_proc(rb_thread_t *th)
if (args_len < 8) {
/* free proc.args if the length is enough small */
args_ptr = ALLOCA_N(VALUE, args_len);
- MEMCPY((VALUE *)args_ptr, RARRAY_CONST_PTR_TRANSIENT(args), VALUE, args_len);
+ MEMCPY((VALUE *)args_ptr, RARRAY_CONST_PTR(args), VALUE, args_len);
th->invoke_arg.proc.args = Qnil;
}
else {
@@ -599,14 +631,12 @@ thread_do_start_proc(rb_thread_t *th)
}
}
-static void
+static VALUE
thread_do_start(rb_thread_t *th)
{
native_set_thread_name(th);
VALUE result = Qundef;
- EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef);
-
switch (th->invoke_type) {
case thread_invoke_type_proc:
result = thread_do_start_proc(th);
@@ -625,11 +655,7 @@ thread_do_start(rb_thread_t *th)
rb_bug("unreachable");
}
- rb_fiber_scheduler_set(Qnil);
-
- th->value = result;
-
- EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef);
+ return result;
}
void rb_ec_clear_current_thread_trace_func(const rb_execution_context_t *ec);
@@ -637,21 +663,12 @@ void rb_ec_clear_current_thread_trace_func(const rb_execution_context_t *ec);
static int
thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
{
- STACK_GROW_DIR_DETECTION;
+ RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
+ VM_ASSERT(th != th->vm->ractor.main_thread);
+
enum ruby_tag_type state;
VALUE errinfo = Qnil;
- size_t size = th->vm->default_params.thread_vm_stack_size / sizeof(VALUE);
rb_thread_t *ractor_main_th = th->ractor->threads.main;
- VALUE * vm_stack = NULL;
-
- VM_ASSERT(th != th->vm->ractor.main_thread);
- RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
-
- // setup native thread
- thread_sched_to_running(TH_SCHED(th), th);
- ruby_thread_set_native(th);
-
- RUBY_DEBUG_LOG("got lock. th:%u", rb_th_serial(th));
// setup ractor
if (rb_ractor_status_p(th->ractor, ractor_blocking)) {
@@ -666,24 +683,33 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
RB_VM_UNLOCK();
}
- // This assertion is not passed on win32 env. Check it later.
- // VM_ASSERT((size * sizeof(VALUE)) <= th->ec->machine.stack_maxsize);
-
- // setup VM and machine stack
- vm_stack = alloca(size * sizeof(VALUE));
- VM_ASSERT(vm_stack);
-
- rb_ec_initialize_vm_stack(th->ec, vm_stack, size);
- th->ec->machine.stack_start = STACK_DIR_UPPER(vm_stack + size, vm_stack);
- th->ec->machine.stack_maxsize -= size * sizeof(VALUE);
-
// Ensure that we are not joinable.
VM_ASSERT(UNDEF_P(th->value));
+ int fiber_scheduler_closed = 0, event_thread_end_hooked = 0;
+ VALUE result = Qundef;
+
EC_PUSH_TAG(th->ec);
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
- SAVE_ROOT_JMPBUF(th, thread_do_start(th));
+ EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef);
+
+ result = thread_do_start(th);
+ }
+
+ if (!fiber_scheduler_closed) {
+ fiber_scheduler_closed = 1;
+ rb_fiber_scheduler_set(Qnil);
+ }
+
+ if (!event_thread_end_hooked) {
+ event_thread_end_hooked = 1;
+ EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef);
+ }
+
+ if (state == TAG_NONE) {
+ // This must be set AFTER doing all user-level code. At this point, the thread is effectively finished and calls to `Thread#join` will succeed.
+ th->value = result;
}
else {
errinfo = th->ec->errinfo;
@@ -698,6 +724,10 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
/* fatal error within this thread, need to stop whole script */
}
else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) {
+ if (th->invoke_type == thread_invoke_type_ractor_proc) {
+ rb_ractor_atexit_exception(th->ec);
+ }
+
/* exit on main_thread. */
}
else {
@@ -773,12 +803,12 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
// after rb_ractor_living_threads_remove()
// GC will happen anytime and this ractor can be collected (and destroy GVL).
// So gvl_release() should be before it.
- thread_sched_to_dead(TH_SCHED(th));
+ thread_sched_to_dead(TH_SCHED(th), th);
rb_ractor_living_threads_remove(th->ractor, th);
}
else {
rb_ractor_living_threads_remove(th->ractor, th);
- thread_sched_to_dead(TH_SCHED(th));
+ thread_sched_to_dead(TH_SCHED(th), th);
}
return 0;
@@ -798,6 +828,8 @@ struct thread_create_params {
VALUE (*fn)(void *);
};
+static void thread_specific_storage_alloc(rb_thread_t *th);
+
static VALUE
thread_create_core(VALUE thval, struct thread_create_params *params)
{
@@ -805,6 +837,8 @@ thread_create_core(VALUE thval, struct thread_create_params *params)
rb_thread_t *th = rb_thread_ptr(thval), *current_th = rb_ec_thread_ptr(ec);
int err;
+ thread_specific_storage_alloc(th);
+
if (OBJ_FROZEN(current_th->thgroup)) {
rb_raise(rb_eThreadError,
"can't start a new thread (frozen ThreadGroup)");
@@ -826,8 +860,9 @@ thread_create_core(VALUE thval, struct thread_create_params *params)
#endif
th->invoke_type = thread_invoke_type_ractor_proc;
th->ractor = params->g;
+ th->ec->ractor_id = rb_ractor_id(th->ractor);
th->ractor->threads.main = th;
- th->invoke_arg.proc.proc = rb_proc_isolate_bang(params->proc);
+ th->invoke_arg.proc.proc = rb_proc_isolate_bang(params->proc, Qnil);
th->invoke_arg.proc.args = INT2FIX(RARRAY_LENINT(params->args));
th->invoke_arg.proc.kw_splat = rb_keyword_given_p();
rb_ractor_send_parameters(ec, params->g, params->args);
@@ -870,10 +905,10 @@ thread_create_core(VALUE thval, struct thread_create_params *params)
#define threadptr_initialized(th) ((th)->invoke_type != thread_invoke_type_none)
/*
- * call-seq:
- * Thread.new { ... } -> thread
- * Thread.new(*args, &proc) -> thread
- * Thread.new(*args) { |args| ... } -> thread
+ * call-seq:
+ * Thread.new { ... } -> thread
+ * Thread.new(*args, &proc) -> thread
+ * Thread.new(*args) { |args| ... } -> thread
*
* Creates a new thread executing the given block.
*
@@ -902,7 +937,7 @@ thread_s_new(int argc, VALUE *argv, VALUE klass)
rb_obj_call_init_kw(thread, argc, argv, RB_PASS_CALLED_KEYWORDS);
th = rb_thread_ptr(thread);
if (!threadptr_initialized(th)) {
- rb_raise(rb_eThreadError, "uninitialized thread - check `%"PRIsVALUE"#initialize'",
+ rb_raise(rb_eThreadError, "uninitialized thread - check '%"PRIsVALUE"#initialize'",
klass);
}
return thread;
@@ -982,11 +1017,11 @@ rb_thread_create(VALUE (*fn)(void *), void *arg)
}
VALUE
-rb_thread_create_ractor(rb_ractor_t *g, VALUE args, VALUE proc)
+rb_thread_create_ractor(rb_ractor_t *r, VALUE args, VALUE proc)
{
struct thread_create_params params = {
.type = thread_invoke_type_ractor_proc,
- .g = g,
+ .g = r,
.args = args,
.proc = proc,
};
@@ -1041,31 +1076,37 @@ thread_join_sleep(VALUE arg)
}
while (!thread_finished(target_th)) {
- VALUE scheduler = rb_fiber_scheduler_current();
+ VALUE scheduler = rb_fiber_scheduler_current_for_threadptr(th);
- if (scheduler != Qnil) {
- rb_fiber_scheduler_block(scheduler, target_th->self, p->timeout);
- }
- else if (!limit) {
- th->status = THREAD_STOPPED_FOREVER;
- rb_ractor_sleeper_threads_inc(th->ractor);
- rb_check_deadlock(th->ractor);
- native_sleep(th, 0);
- rb_ractor_sleeper_threads_dec(th->ractor);
+ if (!limit) {
+ if (scheduler != Qnil) {
+ rb_fiber_scheduler_block(scheduler, target_th->self, Qnil);
+ }
+ else {
+ sleep_forever(th, SLEEP_DEADLOCKABLE | SLEEP_ALLOW_SPURIOUS | SLEEP_NO_CHECKINTS);
+ }
}
else {
if (hrtime_update_expire(limit, end)) {
RUBY_DEBUG_LOG("timeout target_th:%u", rb_th_serial(target_th));
return Qfalse;
}
- th->status = THREAD_STOPPED;
- native_sleep(th, limit);
+
+ if (scheduler != Qnil) {
+ VALUE timeout = rb_float_new(hrtime2double(*limit));
+ rb_fiber_scheduler_block(scheduler, target_th->self, timeout);
+ }
+ else {
+ th->status = THREAD_STOPPED;
+ native_sleep(th, limit);
+ }
}
RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
th->status = THREAD_RUNNABLE;
RUBY_DEBUG_LOG("interrupted target_th:%u status:%s", rb_th_serial(target_th), thread_status_name(target_th, TRUE));
}
+
return Qtrue;
}
@@ -1117,6 +1158,10 @@ thread_join(rb_thread_t *target_th, VALUE timeout, rb_hrtime_t *limit)
/* OK. killed. */
break;
default:
+ if (err == RUBY_FATAL_FIBER_KILLED) { // not integer constant so can't be a case expression
+ // root fiber killed in non-main thread
+ break;
+ }
rb_bug("thread_join: Fixnum (%d) should not reach here.", FIX2INT(err));
}
}
@@ -1253,32 +1298,6 @@ rb_hrtime_now(void)
return rb_timespec2hrtime(&ts);
}
-static void
-sleep_forever(rb_thread_t *th, unsigned int fl)
-{
- enum rb_thread_status prev_status = th->status;
- enum rb_thread_status status;
- int woke;
-
- status = fl & SLEEP_DEADLOCKABLE ? THREAD_STOPPED_FOREVER : THREAD_STOPPED;
- th->status = status;
- RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
- while (th->status == status) {
- if (fl & SLEEP_DEADLOCKABLE) {
- rb_ractor_sleeper_threads_inc(th->ractor);
- rb_check_deadlock(th->ractor);
- }
- native_sleep(th, 0);
- if (fl & SLEEP_DEADLOCKABLE) {
- rb_ractor_sleeper_threads_dec(th->ractor);
- }
- woke = vm_check_ints_blocking(th->ec);
- if (woke && !(fl & SLEEP_SPURIOUS_CHECK))
- break;
- }
- th->status = prev_status;
-}
-
/*
* at least gcc 7.2 and 7.3 complains about "rb_hrtime_t end"
* being uninitialized, maybe other versions, too.
@@ -1353,34 +1372,71 @@ sleep_hrtime_until(rb_thread_t *th, rb_hrtime_t end, unsigned int fl)
return woke;
}
+static void
+sleep_forever(rb_thread_t *th, unsigned int fl)
+{
+ enum rb_thread_status prev_status = th->status;
+ enum rb_thread_status status;
+ int woke;
+
+ status = fl & SLEEP_DEADLOCKABLE ? THREAD_STOPPED_FOREVER : THREAD_STOPPED;
+ th->status = status;
+
+ if (!(fl & SLEEP_NO_CHECKINTS)) RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
+
+ while (th->status == status) {
+ if (fl & SLEEP_DEADLOCKABLE) {
+ rb_ractor_sleeper_threads_inc(th->ractor);
+ rb_check_deadlock(th->ractor);
+ }
+ {
+ native_sleep(th, 0);
+ }
+ if (fl & SLEEP_DEADLOCKABLE) {
+ rb_ractor_sleeper_threads_dec(th->ractor);
+ }
+ if (fl & SLEEP_ALLOW_SPURIOUS) {
+ break;
+ }
+
+ woke = vm_check_ints_blocking(th->ec);
+
+ if (woke && !(fl & SLEEP_SPURIOUS_CHECK)) {
+ break;
+ }
+ }
+ th->status = prev_status;
+}
+
void
rb_thread_sleep_forever(void)
{
- RUBY_DEBUG_LOG("");
+ RUBY_DEBUG_LOG("forever");
sleep_forever(GET_THREAD(), SLEEP_SPURIOUS_CHECK);
}
void
rb_thread_sleep_deadly(void)
{
- RUBY_DEBUG_LOG("");
+ RUBY_DEBUG_LOG("deadly");
sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE|SLEEP_SPURIOUS_CHECK);
}
static void
rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker, VALUE timeout, rb_hrtime_t end)
{
- VALUE scheduler = rb_fiber_scheduler_current();
+ rb_thread_t *th = GET_THREAD();
+ VALUE scheduler = rb_fiber_scheduler_current_for_threadptr(th);
if (scheduler != Qnil) {
rb_fiber_scheduler_block(scheduler, blocker, timeout);
}
else {
- RUBY_DEBUG_LOG("");
+ RUBY_DEBUG_LOG("...");
if (end) {
- sleep_hrtime_until(GET_THREAD(), end, SLEEP_SPURIOUS_CHECK);
+ sleep_hrtime_until(th, end, SLEEP_SPURIOUS_CHECK);
}
else {
- sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE);
+ sleep_forever(th, SLEEP_DEADLOCKABLE);
}
}
}
@@ -1393,6 +1449,12 @@ rb_thread_wait_for(struct timeval time)
sleep_hrtime(th, rb_timeval2hrtime(&time), SLEEP_SPURIOUS_CHECK);
}
+void
+rb_ec_check_ints(rb_execution_context_t *ec)
+{
+ RUBY_VM_CHECK_INTS_BLOCKING(ec);
+}
+
/*
* CAUTION: This function causes thread switching.
* rb_thread_check_ints() check ruby's interrupts.
@@ -1403,7 +1465,7 @@ rb_thread_wait_for(struct timeval time)
void
rb_thread_check_ints(void)
{
- RUBY_VM_CHECK_INTS_BLOCKING(GET_EC());
+ rb_ec_check_ints(GET_EC());
}
/*
@@ -1441,7 +1503,7 @@ rb_thread_schedule_limits(uint32_t limits_us)
RB_VM_SAVE_MACHINE_CONTEXT(th);
thread_sched_yield(TH_SCHED(th), th);
- rb_ractor_thread_switch(th->ractor, th);
+ rb_ractor_thread_switch(th->ractor, th, true);
RUBY_DEBUG_LOG("switch %s", "done");
}
@@ -1461,7 +1523,7 @@ static inline int
blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region,
rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted)
{
-#ifdef RUBY_VM_CRITICAL_SECTION
+#ifdef RUBY_ASSERT_CRITICAL_SECTION
VM_ASSERT(ruby_assert_critical_section_entered == 0);
#endif
VM_ASSERT(th == GET_THREAD());
@@ -1472,10 +1534,7 @@ blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region,
th->status = THREAD_STOPPED;
rb_ractor_blocking_threads_inc(th->ractor, __FILE__, __LINE__);
- RUBY_DEBUG_LOG("");
-
- RB_VM_SAVE_MACHINE_CONTEXT(th);
- thread_sched_to_waiting(TH_SCHED(th));
+ RUBY_DEBUG_LOG("thread_id:%p", (void *)th->nt->thread_id);
return TRUE;
}
else {
@@ -1492,7 +1551,7 @@ blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region)
unregister_ubf_list(th);
thread_sched_to_running(TH_SCHED(th), th);
- rb_ractor_thread_switch(th->ractor, th);
+ rb_ractor_thread_switch(th->ractor, th, false);
th->blocking_region_buffer = 0;
rb_ractor_blocking_threads_dec(th->ractor, __FILE__, __LINE__);
@@ -1500,8 +1559,35 @@ blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region)
th->status = region->prev_status;
}
- RUBY_DEBUG_LOG("");
+ RUBY_DEBUG_LOG("end");
+
+#ifndef _WIN32
+ // GET_THREAD() clears WSAGetLastError()
VM_ASSERT(th == GET_THREAD());
+#endif
+}
+
+/*
+ * Resolve sentinel unblock function values to their actual function pointers
+ * and appropriate data2 values. This centralizes the logic for handling
+ * RUBY_UBF_IO and RUBY_UBF_PROCESS sentinel values.
+ *
+ * @param unblock_function Pointer to unblock function pointer (modified in place)
+ * @param data2 Pointer to data2 pointer (modified in place)
+ * @param thread Thread context for resolving data2 when needed
+ * @return true if sentinel values were resolved, false otherwise
+ */
+bool
+rb_thread_resolve_unblock_function(rb_unblock_function_t **unblock_function, void **data2, struct rb_thread_struct *thread)
+{
+ rb_unblock_function_t *ubf = *unblock_function;
+
+ if ((ubf == RUBY_UBF_IO) || (ubf == RUBY_UBF_PROCESS)) {
+ *unblock_function = ubf_select;
+ *data2 = thread;
+ return true;
+ }
+ return false;
}
void *
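For illustration, a minimal caller-side sketch of the sentinel resolution that rb_thread_resolve_unblock_function() centralizes: an extension passes RUBY_UBF_IO (or RUBY_UBF_PROCESS) and the VM substitutes ubf_select with the current thread as data2. The extension function names below (do_blocking_read, ext_read) are hypothetical; only the public API calls are real.

    #include <ruby.h>
    #include <ruby/thread.h>
    #include <unistd.h>

    struct read_args { int fd; char *buf; size_t len; ssize_t result; };

    /* Runs with the GVL released; must not touch Ruby objects. */
    static void *
    do_blocking_read(void *ptr)
    {
        struct read_args *args = ptr;
        args->result = read(args->fd, args->buf, args->len);
        return NULL;
    }

    static VALUE
    ext_read(VALUE self, VALUE fd)
    {
        char buffer[4096];
        struct read_args args = { .fd = NUM2INT(fd), .buf = buffer, .len = sizeof(buffer) };

        /* RUBY_UBF_IO is a sentinel, not a callable function; it is resolved to
         * ubf_select with the current thread as data2 before blocking. */
        rb_thread_call_without_gvl(do_blocking_read, &args, RUBY_UBF_IO, NULL);

        if (args.result < 0) rb_sys_fail("read");
        return rb_str_new(buffer, args.result);
    }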
@@ -1509,31 +1595,41 @@ rb_nogvl(void *(*func)(void *), void *data1,
rb_unblock_function_t *ubf, void *data2,
int flags)
{
+ if (flags & RB_NOGVL_OFFLOAD_SAFE) {
+ VALUE scheduler = rb_fiber_scheduler_current();
+ if (scheduler != Qnil) {
+ struct rb_fiber_scheduler_blocking_operation_state state = {0};
+
+ VALUE result = rb_fiber_scheduler_blocking_operation_wait(scheduler, func, data1, ubf, data2, flags, &state);
+
+ if (!UNDEF_P(result)) {
+ rb_errno_set(state.saved_errno);
+ return state.result;
+ }
+ }
+ }
+
void *val = 0;
rb_execution_context_t *ec = GET_EC();
rb_thread_t *th = rb_ec_thread_ptr(ec);
rb_vm_t *vm = rb_ec_vm_ptr(ec);
bool is_main_thread = vm->ractor.main_thread == th;
int saved_errno = 0;
- VALUE ubf_th = Qfalse;
- if ((ubf == RUBY_UBF_IO) || (ubf == RUBY_UBF_PROCESS)) {
- ubf = ubf_select;
- data2 = th;
- }
- else if (ubf && rb_ractor_living_thread_num(th->ractor) == 1 && is_main_thread) {
+ rb_thread_resolve_unblock_function(&ubf, &data2, th);
+
+ if (ubf && rb_ractor_living_thread_num(th->ractor) == 1 && is_main_thread) {
if (flags & RB_NOGVL_UBF_ASYNC_SAFE) {
vm->ubf_async_safe = 1;
}
- else {
- ubf_th = rb_thread_start_unblock_thread();
- }
}
+ rb_vm_t *volatile saved_vm = vm;
BLOCKING_REGION(th, {
val = func(data1);
- saved_errno = errno;
+ saved_errno = rb_errno();
}, ubf, data2, flags & RB_NOGVL_INTR_FAIL);
+ vm = saved_vm;
if (is_main_thread) vm->ubf_async_safe = 0;
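A minimal sketch of the new offload path from the caller's side: a C extension that knows its worker function is safe to run off-thread can pass RB_NOGVL_OFFLOAD_SAFE, and when a fiber scheduler is active rb_nogvl() may dispatch the work through rb_fiber_scheduler_blocking_operation_wait() instead of blocking the current thread. The worker below (sum_range) and its wrapper are hypothetical.

    #include <ruby.h>
    #include <ruby/thread.h>

    struct job { unsigned long n; unsigned long result; };

    /* Pure C work with no Ruby object access, so it is safe to run without
     * the GVL or to be offloaded by the fiber scheduler. */
    static void *
    sum_range(void *ptr)
    {
        struct job *job = ptr;
        unsigned long acc = 0;
        for (unsigned long i = 0; i < job->n; i++) acc += i;
        job->result = acc;
        return NULL;
    }

    static VALUE
    ext_sum(VALUE self, VALUE n)
    {
        struct job job = { .n = NUM2ULONG(n), .result = 0 };

        /* No unblock function is needed for this CPU-bound work. */
        rb_nogvl(sum_range, &job, NULL, NULL, RB_NOGVL_OFFLOAD_SAFE);

        return ULONG2NUM(job.result);
    }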
@@ -1541,11 +1637,7 @@ rb_nogvl(void *(*func)(void *), void *data1,
RUBY_VM_CHECK_INTS_BLOCKING(ec);
}
- if (ubf_th != Qfalse) {
- thread_value(rb_thread_kill(ubf_th));
- }
-
- errno = saved_errno;
+ rb_errno_set(saved_errno);
return val;
}
@@ -1649,18 +1741,231 @@ rb_thread_call_without_gvl(void *(*func)(void *data), void *data1,
return rb_nogvl(func, data1, ubf, data2, 0);
}
+static int
+waitfd_to_waiting_flag(int wfd_event)
+{
+ return wfd_event << 1;
+}
+
+static struct ccan_list_head *
+rb_io_blocking_operations(struct rb_io *io)
+{
+ rb_serial_t fork_generation = GET_VM()->fork_gen;
+
+ // On fork, all existing entries in this list (which are stack allocated) become invalid.
+ // Therefore, we re-initialize the list which clears it.
+ if (io->fork_generation != fork_generation) {
+ ccan_list_head_init(&io->blocking_operations);
+ io->fork_generation = fork_generation;
+ }
+
+ return &io->blocking_operations;
+}
+
+/*
+ * Registers a blocking operation for an IO object. This is used to track all threads and fibers
+ * that are currently blocked on this IO for reading, writing or other operations.
+ *
+ * When the IO is closed, all blocking operations will be notified via rb_fiber_scheduler_fiber_interrupt
+ * for fibers with a scheduler, or via rb_threadptr_interrupt for threads without a scheduler.
+ *
+ * @parameter io The IO object on which the operation will block
+ * @parameter blocking_operation The operation details including the execution context that will be blocked
+ */
+static void
+rb_io_blocking_operation_enter(struct rb_io *io, struct rb_io_blocking_operation *blocking_operation)
+{
+ ccan_list_add(rb_io_blocking_operations(io), &blocking_operation->list);
+}
+
+static void
+rb_io_blocking_operation_pop(struct rb_io *io, struct rb_io_blocking_operation *blocking_operation)
+{
+ ccan_list_del(&blocking_operation->list);
+}
+
+struct io_blocking_operation_arguments {
+ struct rb_io *io;
+ struct rb_io_blocking_operation *blocking_operation;
+};
+
+static VALUE
+io_blocking_operation_exit(VALUE _arguments)
+{
+ struct io_blocking_operation_arguments *arguments = (void*)_arguments;
+ struct rb_io_blocking_operation *blocking_operation = arguments->blocking_operation;
+
+ rb_io_blocking_operation_pop(arguments->io, blocking_operation);
+
+ rb_io_t *io = arguments->io;
+ rb_thread_t *thread = io->closing_ec->thread_ptr;
+ rb_fiber_t *fiber = io->closing_ec->fiber_ptr;
+
+ if (thread->scheduler != Qnil) {
+ // This can cause spurious wakeups...
+ rb_fiber_scheduler_unblock(thread->scheduler, io->self, rb_fiberptr_self(fiber));
+ }
+ else {
+ rb_thread_wakeup(thread->self);
+ }
+
+ return Qnil;
+}
+
+/*
+ * Called when a blocking operation completes or is interrupted. Removes the operation from
+ * the IO's blocking_operations list and wakes up any waiting threads/fibers.
+ *
+ * If there's a wakeup_mutex (meaning an IO close is in progress), synchronizes the cleanup
+ * through that mutex to ensure proper coordination with the closing thread.
+ *
+ * @parameter io The IO object the operation was performed on
+ * @parameter blocking_operation The completed operation to clean up
+ */
+static void
+rb_io_blocking_operation_exit(struct rb_io *io, struct rb_io_blocking_operation *blocking_operation)
+{
+ VALUE wakeup_mutex = io->wakeup_mutex;
+
+ // Indicate that the blocking operation is no longer active:
+ blocking_operation->ec = NULL;
+
+ if (RB_TEST(wakeup_mutex)) {
+ struct io_blocking_operation_arguments arguments = {
+ .io = io,
+ .blocking_operation = blocking_operation
+ };
+
+ rb_mutex_synchronize(wakeup_mutex, io_blocking_operation_exit, (VALUE)&arguments);
+ }
+ else {
+ // If there's no wakeup_mutex, we can safely remove the operation directly:
+ rb_io_blocking_operation_pop(io, blocking_operation);
+ }
+}
+
+static VALUE
+rb_thread_io_blocking_operation_ensure(VALUE _argument)
+{
+ struct io_blocking_operation_arguments *arguments = (void*)_argument;
+
+ rb_io_blocking_operation_exit(arguments->io, arguments->blocking_operation);
+
+ return Qnil;
+}
+
+/*
+ * Executes a function that performs a blocking IO operation, while properly tracking
+ * the operation in the IO's blocking_operations list. This ensures proper cleanup
+ * and interruption handling if the IO is closed while blocked.
+ *
+ * The operation is automatically removed from the blocking_operations list when the function
+ * returns, whether normally or due to an exception.
+ *
+ * @parameter self The IO object
+ * @parameter function The function to execute that will perform the blocking operation
+ * @parameter argument The argument to pass to the function
+ * @returns The result of the blocking operation function
+ */
VALUE
-rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
+rb_thread_io_blocking_operation(VALUE self, VALUE(*function)(VALUE), VALUE argument)
+{
+ struct rb_io *io;
+ RB_IO_POINTER(self, io);
+
+ rb_execution_context_t *ec = GET_EC();
+ struct rb_io_blocking_operation blocking_operation = {
+ .ec = ec,
+ };
+ rb_io_blocking_operation_enter(io, &blocking_operation);
+
+ struct io_blocking_operation_arguments io_blocking_operation_arguments = {
+ .io = io,
+ .blocking_operation = &blocking_operation
+ };
+
+ return rb_ensure(function, argument, rb_thread_io_blocking_operation_ensure, (VALUE)&io_blocking_operation_arguments);
+}
+
+static bool
+thread_io_mn_schedulable(rb_thread_t *th, int events, const struct timeval *timeout)
+{
+#if defined(USE_MN_THREADS) && USE_MN_THREADS
+ return !th_has_dedicated_nt(th) && (events || timeout) && th->blocking;
+#else
+ return false;
+#endif
+}
+
+// true if need retry
+static bool
+thread_io_wait_events(rb_thread_t *th, int fd, int events, const struct timeval *timeout)
+{
+#if defined(USE_MN_THREADS) && USE_MN_THREADS
+ if (thread_io_mn_schedulable(th, events, timeout)) {
+ rb_hrtime_t rel, *prel;
+
+ if (timeout) {
+ rel = rb_timeval2hrtime(timeout);
+ prel = &rel;
+ }
+ else {
+ prel = NULL;
+ }
+
+ VM_ASSERT(prel || (events & (RB_WAITFD_IN | RB_WAITFD_OUT)));
+
+ if (thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), prel)) {
+ // timeout
+ return false;
+ }
+ else {
+ return true;
+ }
+ }
+#endif // defined(USE_MN_THREADS) && USE_MN_THREADS
+ return false;
+}
+
+// assume read/write
+static bool
+blocking_call_retryable_p(int r, int eno)
+{
+ if (r != -1) return false;
+
+ switch (eno) {
+ case EAGAIN:
+#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+ return true;
+ default:
+ return false;
+ }
+}
+
+bool
+rb_thread_mn_schedulable(VALUE thval)
+{
+ rb_thread_t *th = rb_thread_ptr(thval);
+ return th->mn_schedulable;
+}
+
+VALUE
+rb_thread_io_blocking_call(struct rb_io* io, rb_blocking_function_t *func, void *data1, int events)
{
- volatile VALUE val = Qundef; /* shouldn't be used */
rb_execution_context_t * volatile ec = GET_EC();
+ rb_thread_t * volatile th = rb_ec_thread_ptr(ec);
+
+ RUBY_DEBUG_LOG("th:%u fd:%d ev:%d", rb_th_serial(th), io->fd, events);
+
+ volatile VALUE val = Qundef; /* shouldn't be used */
volatile int saved_errno = 0;
enum ruby_tag_type state;
+ volatile bool prev_mn_schedulable = th->mn_schedulable;
+ th->mn_schedulable = thread_io_mn_schedulable(th, events, NULL);
- struct waiting_fd waiting_fd = {
- .fd = fd,
- .th = rb_ec_thread_ptr(ec)
- };
+ int fd = io->fd;
// `errno` is only valid when there is an actual error - but we can't
// extract that from the return value of `func` alone, so we clear any
@@ -1668,36 +1973,44 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
// `func` or not (as opposed to some previously set value).
errno = 0;
- RB_VM_LOCK_ENTER();
+ struct rb_io_blocking_operation blocking_operation = {
+ .ec = ec,
+ };
+ rb_io_blocking_operation_enter(io, &blocking_operation);
+
{
- ccan_list_add(&rb_ec_vm_ptr(ec)->waiting_fds, &waiting_fd.wfd_node);
- }
- RB_VM_LOCK_LEAVE();
+ EC_PUSH_TAG(ec);
+ if ((state = EC_EXEC_TAG()) == TAG_NONE) {
+ volatile enum ruby_tag_type saved_state = state; /* for BLOCKING_REGION */
+ retry:
+ BLOCKING_REGION(th, {
+ val = func(data1);
+ saved_errno = errno;
+ }, ubf_select, th, FALSE);
+
+ RUBY_ASSERT(th == rb_ec_thread_ptr(ec));
+ if (events &&
+ blocking_call_retryable_p((int)val, saved_errno) &&
+ thread_io_wait_events(th, fd, events, NULL)) {
+ RUBY_VM_CHECK_INTS_BLOCKING(ec);
+ goto retry;
+ }
- EC_PUSH_TAG(ec);
- if ((state = EC_EXEC_TAG()) == TAG_NONE) {
- BLOCKING_REGION(waiting_fd.th, {
- val = func(data1);
- saved_errno = errno;
- }, ubf_select, waiting_fd.th, FALSE);
- }
- EC_POP_TAG();
+ RUBY_VM_CHECK_INTS_BLOCKING(ec);
- /*
- * must be deleted before jump
- * this will delete either from waiting_fds or on-stack CCAN_LIST_HEAD(busy)
- */
- RB_VM_LOCK_ENTER();
- {
- ccan_list_del(&waiting_fd.wfd_node);
+ state = saved_state;
+ }
+ EC_POP_TAG();
+
+ th = rb_ec_thread_ptr(ec);
+ th->mn_schedulable = prev_mn_schedulable;
}
- RB_VM_LOCK_LEAVE();
+
+ rb_io_blocking_operation_exit(io, &blocking_operation);
if (state) {
EC_JUMP_TAG(ec, state);
}
- /* TODO: check func() */
- RUBY_VM_CHECK_INTS_BLOCKING(ec);
// If the error was a timeout, we raise a specific exception for that:
if (saved_errno == ETIMEDOUT) {
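To make the tracking wrapper above concrete, here is a minimal sketch of how VM-internal code could run a blocking step under rb_thread_io_blocking_operation(), so that the operation is registered on io->blocking_operations and can be interrupted if the IO is closed concurrently. The callback name (wait_readable_operation) and its use of rb_io_wait() are illustrative assumptions, not code from this patch.

    static VALUE
    wait_readable_operation(VALUE io)
    {
        /* Blocks until `io` is readable; a concurrent close can interrupt this
         * because the enclosing blocking operation is registered on the IO. */
        return rb_io_wait(io, RB_INT2NUM(RUBY_IO_READABLE), Qnil);
    }

    static VALUE
    tracked_wait_readable(VALUE io)
    {
        /* Registers the operation, calls the function, and always unregisters
         * via rb_ensure(), even if an exception is raised. */
        return rb_thread_io_blocking_operation(io, wait_readable_operation, io);
    }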
@@ -1709,6 +2022,12 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
return val;
}
+VALUE
+rb_thread_io_blocking_region(struct rb_io *io, rb_blocking_function_t *func, void *data1)
+{
+ return rb_thread_io_blocking_call(io, func, data1, 0);
+}
+
/*
* rb_thread_call_with_gvl - re-enter the Ruby world after GVL release.
*
@@ -1736,6 +2055,9 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
* created as Ruby thread (created by Thread.new or so). In other
* words, this function *DOES NOT* associate or convert a NON-Ruby
* thread to a Ruby thread.
+ *
+ * NOTE: If this thread has already acquired the GVL, then the method call
+ * is performed without acquiring or releasing the GVL (from Ruby 4.0).
*/
void *
rb_thread_call_with_gvl(void *(*func)(void *), void *data1)
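For context, the pattern this NOTE is about: a native function running with the GVL released re-enters the Ruby world through rb_thread_call_with_gvl(). With this change, calling it while the GVL is already held no longer triggers rb_bug() and simply invokes the function directly. The helper names below are illustrative.

    /* Runs with the GVL re-acquired: Ruby objects and APIs are usable here. */
    static void *
    report_progress(void *message)
    {
        rb_funcall(rb_stderr, rb_intern("puts"), 1, rb_str_new_cstr((const char *)message));
        return NULL;
    }

    /* Runs with the GVL released (e.g. via rb_thread_call_without_gvl). */
    static void *
    long_native_task(void *data)
    {
        /* ... native work, no Ruby objects ... */
        rb_thread_call_with_gvl(report_progress, (void *)"halfway done");
        /* ... more native work ... */
        return NULL;
    }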
@@ -1759,7 +2081,8 @@ rb_thread_call_with_gvl(void *(*func)(void *), void *data1)
prev_unblock = th->unblock;
if (brb == 0) {
- rb_bug("rb_thread_call_with_gvl: called by a thread which has GVL.");
+ /* the GVL is already acquired, call method directly */
+ return (*func)(data1);
}
blocking_region_end(th, brb);
@@ -1768,16 +2091,13 @@ rb_thread_call_with_gvl(void *(*func)(void *), void *data1)
/* leave from Ruby world: You can not access Ruby values, etc. */
int released = blocking_region_begin(th, brb, prev_unblock.func, prev_unblock.arg, FALSE);
RUBY_ASSERT_ALWAYS(released);
+ RB_VM_SAVE_MACHINE_CONTEXT(th);
+ thread_sched_to_waiting(TH_SCHED(th), th);
return r;
}
/*
* ruby_thread_has_gvl_p - check if current native thread has GVL.
- *
- ***
- *** This API is EXPERIMENTAL!
- *** We do not guarantee that this API remains in ruby 1.9.2 or later.
- ***
*/
int
@@ -1855,6 +2175,23 @@ enum handle_interrupt_timing {
};
static enum handle_interrupt_timing
+rb_threadptr_pending_interrupt_from_symbol(rb_thread_t *th, VALUE sym)
+{
+ if (sym == sym_immediate) {
+ return INTERRUPT_IMMEDIATE;
+ }
+ else if (sym == sym_on_blocking) {
+ return INTERRUPT_ON_BLOCKING;
+ }
+ else if (sym == sym_never) {
+ return INTERRUPT_NEVER;
+ }
+ else {
+ rb_raise(rb_eThreadError, "unknown mask signature");
+ }
+}
+
+static enum handle_interrupt_timing
rb_threadptr_pending_interrupt_check_mask(rb_thread_t *th, VALUE err)
{
VALUE mask;
@@ -1866,6 +2203,16 @@ rb_threadptr_pending_interrupt_check_mask(rb_thread_t *th, VALUE err)
for (i=0; i<mask_stack_len; i++) {
mask = mask_stack[mask_stack_len-(i+1)];
+ if (SYMBOL_P(mask)) {
+ /* do not match RUBY_FATAL_THREAD_KILLED etc */
+ if (err != rb_cInteger) {
+ return rb_threadptr_pending_interrupt_from_symbol(th, mask);
+ }
+ else {
+ continue;
+ }
+ }
+
for (mod = err; mod; mod = RCLASS_SUPER(mod)) {
VALUE klass = mod;
VALUE sym;
@@ -1878,18 +2225,7 @@ rb_threadptr_pending_interrupt_check_mask(rb_thread_t *th, VALUE err)
}
if ((sym = rb_hash_aref(mask, klass)) != Qnil) {
- if (sym == sym_immediate) {
- return INTERRUPT_IMMEDIATE;
- }
- else if (sym == sym_on_blocking) {
- return INTERRUPT_ON_BLOCKING;
- }
- else if (sym == sym_never) {
- return INTERRUPT_NEVER;
- }
- else {
- rb_raise(rb_eThreadError, "unknown mask signature");
- }
+ return rb_threadptr_pending_interrupt_from_symbol(th, sym);
}
}
/* try to next mask */
@@ -1981,10 +2317,24 @@ handle_interrupt_arg_check_i(VALUE key, VALUE val, VALUE args)
rb_raise(rb_eArgError, "unknown mask signature");
}
- if (!*maskp) {
- *maskp = rb_ident_hash_new();
+ if (key == rb_eException && (UNDEF_P(*maskp) || NIL_P(*maskp))) {
+ *maskp = val;
+ return ST_CONTINUE;
+ }
+
+ if (RTEST(*maskp)) {
+ if (!RB_TYPE_P(*maskp, T_HASH)) {
+ VALUE prev = *maskp;
+ *maskp = rb_ident_hash_new();
+ if (SYMBOL_P(prev)) {
+ rb_hash_aset(*maskp, rb_eException, prev);
+ }
+ }
+ rb_hash_aset(*maskp, key, val);
+ }
+ else {
+ *maskp = Qfalse;
}
- rb_hash_aset(*maskp, key, val);
return ST_CONTINUE;
}
@@ -2051,30 +2401,6 @@ handle_interrupt_arg_check_i(VALUE key, VALUE val, VALUE args)
* resource allocation code. Then, the ensure block is where we can safely
* deallocate your resources.
*
- * ==== Guarding from Timeout::Error
- *
- * In the next example, we will guard from the Timeout::Error exception. This
- * will help prevent from leaking resources when Timeout::Error exceptions occur
- * during normal ensure clause. For this example we use the help of the
- * standard library Timeout, from lib/timeout.rb
- *
- * require 'timeout'
- * Thread.handle_interrupt(Timeout::Error => :never) {
- * timeout(10){
- * # Timeout::Error doesn't occur here
- * Thread.handle_interrupt(Timeout::Error => :on_blocking) {
- * # possible to be killed by Timeout::Error
- * # while blocking operation
- * }
- * # Timeout::Error doesn't occur here
- * }
- * }
- *
- * In the first part of the +timeout+ block, we can rely on Timeout::Error being
- * ignored. Then in the <code>Timeout::Error => :on_blocking</code> block, any
- * operation that will block the calling thread is susceptible to a
- * Timeout::Error exception being raised.
- *
* ==== Stack control settings
*
* It's possible to stack multiple levels of ::handle_interrupt blocks in order
@@ -2100,7 +2426,7 @@ handle_interrupt_arg_check_i(VALUE key, VALUE val, VALUE args)
static VALUE
rb_thread_s_handle_interrupt(VALUE self, VALUE mask_arg)
{
- VALUE mask;
+ VALUE mask = Qundef;
rb_execution_context_t * volatile ec = GET_EC();
rb_thread_t * volatile th = rb_ec_thread_ptr(ec);
volatile VALUE r = Qnil;
@@ -2110,13 +2436,25 @@ rb_thread_s_handle_interrupt(VALUE self, VALUE mask_arg)
rb_raise(rb_eArgError, "block is needed.");
}
- mask = 0;
mask_arg = rb_to_hash_type(mask_arg);
+
+ if (OBJ_FROZEN(mask_arg) && rb_hash_compare_by_id_p(mask_arg)) {
+ mask = Qnil;
+ }
+
rb_hash_foreach(mask_arg, handle_interrupt_arg_check_i, (VALUE)&mask);
- if (!mask) {
+
+ if (UNDEF_P(mask)) {
return rb_yield(Qnil);
}
- OBJ_FREEZE_RAW(mask);
+
+ if (!RTEST(mask)) {
+ mask = mask_arg;
+ }
+ else if (RB_TYPE_P(mask, T_HASH)) {
+ OBJ_FREEZE(mask);
+ }
+
rb_ary_push(th->pending_interrupt_mask_stack, mask);
if (!rb_threadptr_pending_interrupt_empty_p(th)) {
th->pending_interrupt_queue_checked = 0;
@@ -2245,6 +2583,7 @@ NORETURN(static void rb_threadptr_to_kill(rb_thread_t *th));
static void
rb_threadptr_to_kill(rb_thread_t *th)
{
+ VM_ASSERT(GET_THREAD() == th);
rb_threadptr_pending_interrupt_clear(th);
th->status = THREAD_RUNNABLE;
th->to_kill = 1;
@@ -2259,13 +2598,21 @@ threadptr_get_interrupts(rb_thread_t *th)
rb_atomic_t interrupt;
rb_atomic_t old;
+ old = ATOMIC_LOAD_RELAXED(ec->interrupt_flag);
do {
- interrupt = ec->interrupt_flag;
+ interrupt = old;
old = ATOMIC_CAS(ec->interrupt_flag, interrupt, interrupt & ec->interrupt_mask);
} while (old != interrupt);
return interrupt & (rb_atomic_t)~ec->interrupt_mask;
}
+static void threadptr_interrupt_exec_exec(rb_thread_t *th);
+
+// Execute interrupts on currently running thread
+// In certain situations, calling this function will raise an exception. Some examples are:
+// * during VM shutdown (`rb_ractor_terminate_all`)
+// * Call to Thread#exit for current thread (`rb_thread_kill`)
+// * Call to Thread#raise for current thread
int
rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
{
@@ -2273,6 +2620,8 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
int postponed_job_interrupt = 0;
int ret = FALSE;
+ VM_ASSERT(GET_THREAD() == th);
+
if (th->ec->raised_flag) return ret;
while ((interrupt = threadptr_get_interrupts(th)) != 0) {
@@ -2289,41 +2638,49 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
terminate_interrupt = interrupt & TERMINATE_INTERRUPT_MASK; // request from other ractors
if (interrupt & VM_BARRIER_INTERRUPT_MASK) {
- RB_VM_LOCK_ENTER();
- RB_VM_LOCK_LEAVE();
+ RB_VM_LOCKING();
}
if (postponed_job_interrupt) {
rb_postponed_job_flush(th->vm);
}
- /* signal handling */
- if (trap_interrupt && (th == th->vm->ractor.main_thread)) {
- enum rb_thread_status prev_status = th->status;
- int sigwait_fd = rb_sigwait_fd_get(th);
+ if (trap_interrupt) {
+ /* signal handling */
+ if (th == th->vm->ractor.main_thread) {
+ enum rb_thread_status prev_status = th->status;
- if (sigwait_fd >= 0) {
- (void)consume_communication_pipe(sigwait_fd);
- rb_sigwait_fd_put(th, sigwait_fd);
+ th->status = THREAD_RUNNABLE;
+ {
+ while ((sig = rb_get_next_signal()) != 0) {
+ ret |= rb_signal_exec(th, sig);
+ }
+ }
+ th->status = prev_status;
}
- th->status = THREAD_RUNNABLE;
- while ((sig = rb_get_next_signal()) != 0) {
- ret |= rb_signal_exec(th, sig);
+
+ if (!ccan_list_empty(&th->interrupt_exec_tasks)) {
+ enum rb_thread_status prev_status = th->status;
+
+ th->status = THREAD_RUNNABLE;
+ {
+ threadptr_interrupt_exec_exec(th);
+ }
+ th->status = prev_status;
}
- th->status = prev_status;
}
/* exception from another thread */
if (pending_interrupt && threadptr_pending_interrupt_active_p(th)) {
VALUE err = rb_threadptr_pending_interrupt_deque(th, blocking_timing ? INTERRUPT_ON_BLOCKING : INTERRUPT_NONE);
- RUBY_DEBUG_LOG("err:%"PRIdVALUE"\n", err);
+ RUBY_DEBUG_LOG("err:%"PRIdVALUE, err);
ret = TRUE;
if (UNDEF_P(err)) {
/* no error */
}
- else if (err == eKillSignal /* Thread#kill received */ ||
- err == eTerminateSignal /* Terminate thread */ ||
+ else if (err == RUBY_FATAL_THREAD_KILLED /* Thread#kill received */ ||
+ err == RUBY_FATAL_THREAD_TERMINATED /* Terminate thread */ ||
err == INT2FIX(TAG_FATAL) /* Thread.exit etc. */ ) {
terminate_interrupt = 1;
}
@@ -2345,7 +2702,7 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
}
if (timer_interrupt) {
- uint32_t limits_us = TIME_QUANTUM_USEC;
+ uint32_t limits_us = thread_default_quantum_ms * 1000;
if (th->priority > 0)
limits_us <<= th->priority;
@@ -2353,7 +2710,7 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
limits_us >>= -th->priority;
if (th->status == THREAD_RUNNABLE)
- th->running_time_us += TIME_QUANTUM_USEC;
+ th->running_time_us += 10 * 1000; // 10ms = 10_000us // TODO: use macro
VM_ASSERT(th->ec->cfp);
EXEC_EVENT_HOOK(th->ec, RUBY_INTERNAL_EVENT_SWITCH, th->ec->cfp->self,
@@ -2380,18 +2737,11 @@ rb_threadptr_ready(rb_thread_t *th)
static VALUE
rb_threadptr_raise(rb_thread_t *target_th, int argc, VALUE *argv)
{
- VALUE exc;
-
if (rb_threadptr_dead(target_th)) {
return Qnil;
}
- if (argc == 0) {
- exc = rb_exc_new(rb_eRuntimeError, 0, 0);
- }
- else {
- exc = rb_make_exception(argc, argv);
- }
+ VALUE exception = rb_exception_setup(argc, argv);
/* making an exception object can switch thread,
so we need to check thread deadness again */
@@ -2399,9 +2749,9 @@ rb_threadptr_raise(rb_thread_t *target_th, int argc, VALUE *argv)
return Qnil;
}
- rb_ec_setup_exception(GET_EC(), exc, Qundef);
- rb_threadptr_pending_interrupt_enque(target_th, exc);
+ rb_threadptr_pending_interrupt_enque(target_th, exception);
rb_threadptr_interrupt(target_th);
+
return Qnil;
}
@@ -2447,52 +2797,122 @@ rb_ec_reset_raised(rb_execution_context_t *ec)
return 1;
}
-int
-rb_notify_fd_close(int fd, struct ccan_list_head *busy)
+/*
+ * Thread-safe IO closing mechanism.
+ *
+ * When an IO is closed while other threads or fibers are blocked on it, we need to:
+ * 1. Track and notify all blocking operations through io->blocking_operations
+ * 2. Ensure only one thread can close at a time using io->closing_ec
+ * 3. Synchronize cleanup using wakeup_mutex
+ *
+ * The close process works as follows:
+ * - First check if any thread is already closing (io->closing_ec)
+ * - Set up wakeup_mutex for synchronization
+ * - Iterate through all blocking operations in io->blocking_operations
+ * - For each blocked fiber with a scheduler:
+ * - Notify via rb_fiber_scheduler_fiber_interrupt
+ * - For each blocked thread without a scheduler:
+ * - Enqueue IOError via rb_threadptr_pending_interrupt_enque
+ * - Wake via rb_threadptr_interrupt
+ * - Wait on wakeup_mutex until all operations are cleaned up
+ * - Only then clear closing state and allow actual close to proceed
+ */
+static VALUE
+thread_io_close_notify_all(VALUE _io)
{
- rb_vm_t *vm = GET_THREAD()->vm;
- struct waiting_fd *wfd = 0, *next;
+ struct rb_io *io = (struct rb_io *)_io;
- RB_VM_LOCK_ENTER();
- {
- ccan_list_for_each_safe(&vm->waiting_fds, wfd, next, wfd_node) {
- if (wfd->fd == fd) {
- rb_thread_t *th = wfd->th;
- VALUE err;
+ size_t count = 0;
+ rb_vm_t *vm = io->closing_ec->thread_ptr->vm;
+ VALUE error = vm->special_exceptions[ruby_error_stream_closed];
+
+ struct rb_io_blocking_operation *blocking_operation;
+ ccan_list_for_each(rb_io_blocking_operations(io), blocking_operation, list) {
+ rb_execution_context_t *ec = blocking_operation->ec;
- ccan_list_del(&wfd->wfd_node);
- ccan_list_add(busy, &wfd->wfd_node);
+ // If the operation is in progress, we need to interrupt it:
+ if (ec) {
+ rb_thread_t *thread = ec->thread_ptr;
+
+ VALUE result = RUBY_Qundef;
+ if (thread->scheduler != Qnil) {
+ result = rb_fiber_scheduler_fiber_interrupt(thread->scheduler, rb_fiberptr_self(ec->fiber_ptr), error);
+ }
- err = th->vm->special_exceptions[ruby_error_stream_closed];
- rb_threadptr_pending_interrupt_enque(th, err);
- rb_threadptr_interrupt(th);
+ if (result == RUBY_Qundef) {
+ // If the thread is not the current thread, we need to enqueue an error:
+ rb_threadptr_pending_interrupt_enque(thread, error);
+ rb_threadptr_interrupt(thread);
}
}
+
+ count += 1;
+ }
+
+ return (VALUE)count;
+}
+
+size_t
+rb_thread_io_close_interrupt(struct rb_io *io)
+{
+ // We guard this operation based on `io->closing_ec` -> only one thread will ever enter this function.
+ if (io->closing_ec) {
+ return 0;
+ }
+
+ // If there are no blocking operations, we are done:
+ if (ccan_list_empty(rb_io_blocking_operations(io))) {
+ return 0;
}
- RB_VM_LOCK_LEAVE();
- return !ccan_list_empty(busy);
+ // Otherwise, we are now closing the IO:
+ rb_execution_context_t *ec = GET_EC();
+ io->closing_ec = ec;
+
+ // This is used to ensure the correct execution context is woken up after the blocking operation is interrupted:
+ io->wakeup_mutex = rb_mutex_new();
+ rb_mutex_allow_trap(io->wakeup_mutex, 1);
+
+ // We need to use a mutex here as entering the fiber scheduler may cause a context switch:
+ VALUE result = rb_mutex_synchronize(io->wakeup_mutex, thread_io_close_notify_all, (VALUE)io);
+
+ return (size_t)result;
}
void
-rb_thread_fd_close(int fd)
+rb_thread_io_close_wait(struct rb_io* io)
{
- struct ccan_list_head busy;
+ VALUE wakeup_mutex = io->wakeup_mutex;
- ccan_list_head_init(&busy);
- if (rb_notify_fd_close(fd, &busy)) {
- do rb_thread_schedule(); while (!ccan_list_empty(&busy));
+ if (!RB_TEST(wakeup_mutex)) {
+ // There was nobody else using this file when we closed it, so we never bothered to allocate a mutex:
+ return;
}
+
+ rb_mutex_lock(wakeup_mutex);
+ while (!ccan_list_empty(rb_io_blocking_operations(io))) {
+ rb_mutex_sleep(wakeup_mutex, Qnil);
+ }
+ rb_mutex_unlock(wakeup_mutex);
+
+ // We are done closing:
+ io->wakeup_mutex = Qnil;
+ io->closing_ec = NULL;
+}
+
+void
+rb_thread_fd_close(int fd)
+{
+ rb_warn("rb_thread_fd_close is deprecated (and is now a no-op).");
}
/*
* call-seq:
- * thr.raise
- * thr.raise(string)
- * thr.raise(exception [, string [, array]])
+ * raise(exception, message = exception.to_s, backtrace = nil, cause: $!)
+ * raise(message = nil, cause: $!)
*
* Raises an exception from the given thread. The caller does not have to be
- * +thr+. See Kernel#raise for more information.
+ * +thr+. See Kernel#raise for more information on arguments.
*
* Thread.abort_on_exception = true
* a = Thread.new { sleep(200) }
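Tying the close mechanism above together, the intended close-side sequence is: interrupt every registered blocking operation, wait for them all to drain, then perform the actual close. The caller sketched here (io_close_blocking_operations, presumably living in io.c) is an assumption for illustration; only rb_thread_io_close_interrupt() and rb_thread_io_close_wait() come from this patch.

    static void
    io_close_blocking_operations(struct rb_io *io)
    {
        /* Wake every thread/fiber currently blocked on this IO; returns the
         * number of blocking operations that were notified. */
        if (rb_thread_io_close_interrupt(io) > 0) {
            /* Sleep on io->wakeup_mutex until io->blocking_operations is empty,
             * then clear io->closing_ec and io->wakeup_mutex. */
            rb_thread_io_close_wait(io);
        }

        /* Only now is it safe to actually close io->fd. */
    }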
@@ -2531,7 +2951,10 @@ thread_raise_m(int argc, VALUE *argv, VALUE self)
*
* Terminates +thr+ and schedules another thread to be run, returning
* the terminated Thread. If this is the main thread, or the last
- * thread, exits the process.
+ * thread, exits the process. Note that the caller does not wait for
+ * the thread to terminate if the receiver is different from the currently
+ * running thread. The termination is asynchronous, and the thread can still
+ * run a small amount of ruby code before exiting.
*/
VALUE
@@ -2554,7 +2977,7 @@ rb_thread_kill(VALUE thread)
}
else {
threadptr_check_pending_interrupt_queue(target_th);
- rb_threadptr_pending_interrupt_enque(target_th, eKillSignal);
+ rb_threadptr_pending_interrupt_enque(target_th, RUBY_FATAL_THREAD_KILLED);
rb_threadptr_interrupt(target_th);
}
@@ -2723,7 +3146,7 @@ VALUE
rb_thread_list(void)
{
// TODO
- return rb_ractor_thread_list(GET_RACTOR());
+ return rb_ractor_thread_list();
}
/*
@@ -3250,7 +3673,7 @@ rb_thread_setname(VALUE thread, VALUE name)
name = rb_str_new_frozen(name);
}
target_th->name = name;
- if (threadptr_initialized(target_th)) {
+ if (threadptr_initialized(target_th) && target_th->has_dedicated_nt) {
native_set_another_thread_name(target_th->nt->thread_id, name);
}
return name;
@@ -3550,12 +3973,13 @@ static VALUE
rb_thread_variable_get(VALUE thread, VALUE key)
{
VALUE locals;
+ VALUE symbol = rb_to_symbol(key);
if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) {
return Qnil;
}
locals = rb_thread_local_storage(thread);
- return rb_hash_aref(locals, rb_to_symbol(key));
+ return rb_hash_aref(locals, symbol);
}
/*
@@ -3706,13 +4130,14 @@ static VALUE
rb_thread_variable_p(VALUE thread, VALUE key)
{
VALUE locals;
+ VALUE symbol = rb_to_symbol(key);
if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) {
return Qfalse;
}
locals = rb_thread_local_storage(thread);
- return RBOOL(rb_hash_lookup(locals, rb_to_symbol(key)) != Qnil);
+ return RBOOL(rb_hash_lookup(locals, symbol) != Qnil);
}
/*
@@ -3841,7 +4266,7 @@ rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src)
void
rb_fd_term(rb_fdset_t *fds)
{
- if (fds->fdset) xfree(fds->fdset);
+ xfree(fds->fdset);
fds->maxfd = 0;
fds->fdset = 0;
}
@@ -4008,9 +4433,10 @@ rb_fd_set(int fd, rb_fdset_t *set)
#endif
static int
-wait_retryable(int *result, int errnum, rb_hrtime_t *rel, rb_hrtime_t end)
+wait_retryable(volatile int *result, int errnum, rb_hrtime_t *rel, rb_hrtime_t end)
{
- if (*result < 0) {
+ int r = *result;
+ if (r < 0) {
switch (errnum) {
case EINTR:
#ifdef ERESTART
@@ -4024,7 +4450,7 @@ wait_retryable(int *result, int errnum, rb_hrtime_t *rel, rb_hrtime_t end)
}
return FALSE;
}
- else if (*result == 0) {
+ else if (r == 0) {
/* check for spurious wakeup */
if (rel) {
return !hrtime_update_expire(rel, end);
@@ -4036,7 +4462,6 @@ wait_retryable(int *result, int errnum, rb_hrtime_t *rel, rb_hrtime_t end)
struct select_set {
int max;
- int sigwait_fd;
rb_thread_t *th;
rb_fdset_t *rset;
rb_fdset_t *wset;
@@ -4052,10 +4477,6 @@ select_set_free(VALUE p)
{
struct select_set *set = (struct select_set *)p;
- if (set->sigwait_fd >= 0) {
- rb_sigwait_fd_put(set->th, set->sigwait_fd);
- }
-
rb_fd_term(&set->orig_rset);
rb_fd_term(&set->orig_wset);
rb_fd_term(&set->orig_eset);
@@ -4063,33 +4484,16 @@ select_set_free(VALUE p)
return Qfalse;
}
-static const rb_hrtime_t *
-sigwait_timeout(rb_thread_t *th, int sigwait_fd, const rb_hrtime_t *orig,
- int *drained_p)
-{
- static const rb_hrtime_t quantum = TIME_QUANTUM_USEC * 1000;
-
- if (sigwait_fd >= 0 && (!ubf_threads_empty() || BUSY_WAIT_SIGNALS)) {
- *drained_p = check_signals_nogvl(th, sigwait_fd);
- if (!orig || *orig > quantum)
- return &quantum;
- }
-
- return orig;
-}
-
-#define sigwait_signals_fd(result, cond, sigwait_fd) \
- (result > 0 && (cond) ? (result--, (sigwait_fd)) : -1)
-
static VALUE
do_select(VALUE p)
{
struct select_set *set = (struct select_set *)p;
- int result = 0;
+ volatile int result = 0;
int lerrno;
rb_hrtime_t *to, rel, end = 0;
timeout_prepare(&to, &rel, &end, set->timeout);
+ volatile rb_hrtime_t endtime = end;
#define restore_fdset(dst, src) \
((dst) ? rb_fd_dup(dst, src) : (void)0)
#define do_select_update() \
@@ -4099,31 +4503,23 @@ do_select(VALUE p)
TRUE)
do {
- int drained;
lerrno = 0;
BLOCKING_REGION(set->th, {
- const rb_hrtime_t *sto;
struct timeval tv;
- sto = sigwait_timeout(set->th, set->sigwait_fd, to, &drained);
if (!RUBY_VM_INTERRUPTED(set->th->ec)) {
- result = native_fd_select(set->max, set->rset, set->wset,
- set->eset,
- rb_hrtime2timeval(&tv, sto), set->th);
+ result = native_fd_select(set->max,
+ set->rset, set->wset, set->eset,
+ rb_hrtime2timeval(&tv, to), set->th);
if (result < 0) lerrno = errno;
}
- }, set->sigwait_fd >= 0 ? ubf_sigwait : ubf_select, set->th, TRUE);
-
- if (set->sigwait_fd >= 0) {
- int fd = sigwait_signals_fd(result,
- rb_fd_isset(set->sigwait_fd, set->rset),
- set->sigwait_fd);
- (void)check_signals_nogvl(set->th, fd);
- }
+ }, ubf_select, set->th, TRUE);
RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); /* may raise */
- } while (wait_retryable(&result, lerrno, to, end) && do_select_update());
+ } while (wait_retryable(&result, lerrno, to, endtime) && do_select_update());
+
+ RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec);
if (result < 0) {
errno = lerrno;
@@ -4132,18 +4528,6 @@ do_select(VALUE p)
return (VALUE)result;
}
-static rb_fdset_t *
-init_set_fd(int fd, rb_fdset_t *fds)
-{
- if (fd < 0) {
- return 0;
- }
- rb_fd_init(fds);
- rb_fd_set(fd, fds);
-
- return fds;
-}
-
int
rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except,
struct timeval *timeout)
@@ -4167,16 +4551,6 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
return 0;
}
- set.sigwait_fd = rb_sigwait_fd_get(set.th);
- if (set.sigwait_fd >= 0) {
- if (set.rset)
- rb_fd_set(set.sigwait_fd, set.rset);
- else
- set.rset = init_set_fd(set.sigwait_fd, &set.orig_rset);
- if (set.sigwait_fd >= set.max) {
- set.max = set.sigwait_fd + 1;
- }
- }
#define fd_init_copy(f) do { \
if (set.f) { \
rb_fd_resize(set.max - 1, set.f); \
@@ -4207,82 +4581,79 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
# define POLLERR_SET (0)
#endif
+static int
+wait_for_single_fd_blocking_region(rb_thread_t *th, struct pollfd *fds, nfds_t nfds,
+ rb_hrtime_t *const to, volatile int *lerrno)
+{
+ struct timespec ts;
+ volatile int result = 0;
+
+ *lerrno = 0;
+ BLOCKING_REGION(th, {
+ if (!RUBY_VM_INTERRUPTED(th->ec)) {
+ result = ppoll(fds, nfds, rb_hrtime2timespec(&ts, to), 0);
+ if (result < 0) *lerrno = errno;
+ }
+ }, ubf_select, th, TRUE);
+ return result;
+}
+
/*
* returns a mask of events
*/
-int
-rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
+static int
+thread_io_wait(rb_thread_t *th, struct rb_io *io, int fd, int events, struct timeval *timeout)
{
- struct pollfd fds[2];
- int result = 0;
- int drained;
+ struct pollfd fds[1] = {{
+ .fd = fd,
+ .events = (short)events,
+ .revents = 0,
+ }};
+ volatile int result = 0;
nfds_t nfds;
- rb_unblock_function_t *ubf;
- struct waiting_fd wfd;
- int state;
+ struct rb_io_blocking_operation blocking_operation;
+ enum ruby_tag_type state;
volatile int lerrno;
- wfd.th = GET_THREAD();
- wfd.fd = fd;
+ RUBY_ASSERT(th);
+ rb_execution_context_t *ec = th->ec;
- RB_VM_LOCK_ENTER();
- {
- ccan_list_add(&wfd.th->vm->waiting_fds, &wfd.wfd_node);
+ if (io) {
+ blocking_operation.ec = ec;
+ rb_io_blocking_operation_enter(io, &blocking_operation);
}
- RB_VM_LOCK_LEAVE();
- EC_PUSH_TAG(wfd.th->ec);
- if ((state = EC_EXEC_TAG()) == TAG_NONE) {
- rb_hrtime_t *to, rel, end = 0;
- RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
- timeout_prepare(&to, &rel, &end, timeout);
- fds[0].fd = fd;
- fds[0].events = (short)events;
- fds[0].revents = 0;
- do {
- fds[1].fd = rb_sigwait_fd_get(wfd.th);
-
- if (fds[1].fd >= 0) {
- fds[1].events = POLLIN;
- fds[1].revents = 0;
- nfds = 2;
- ubf = ubf_sigwait;
- }
- else {
- nfds = 1;
- ubf = ubf_select;
- }
+ if (timeout == NULL && thread_io_wait_events(th, fd, events, NULL)) {
+ // the fd is already ready for the requested events; no need to block
+ state = 0;
+ fds[0].revents = events;
+ errno = 0;
+ }
+ else {
+ EC_PUSH_TAG(ec);
+ if ((state = EC_EXEC_TAG()) == TAG_NONE) {
+ rb_hrtime_t *to, rel, end = 0;
+ RUBY_VM_CHECK_INTS_BLOCKING(ec);
+ timeout_prepare(&to, &rel, &end, timeout);
+ do {
+ nfds = numberof(fds);
+ result = wait_for_single_fd_blocking_region(th, fds, nfds, to, &lerrno);
- lerrno = 0;
- BLOCKING_REGION(wfd.th, {
- const rb_hrtime_t *sto;
- struct timespec ts;
+ RUBY_VM_CHECK_INTS_BLOCKING(ec);
+ } while (wait_retryable(&result, lerrno, to, end));
- sto = sigwait_timeout(wfd.th, fds[1].fd, to, &drained);
- if (!RUBY_VM_INTERRUPTED(wfd.th->ec)) {
- result = ppoll(fds, nfds, rb_hrtime2timespec(&ts, sto), 0);
- if (result < 0) lerrno = errno;
- }
- }, ubf, wfd.th, TRUE);
+ RUBY_VM_CHECK_INTS_BLOCKING(ec);
+ }
- if (fds[1].fd >= 0) {
- int fd1 = sigwait_signals_fd(result, fds[1].revents, fds[1].fd);
- (void)check_signals_nogvl(wfd.th, fd1);
- rb_sigwait_fd_put(wfd.th, fds[1].fd);
- }
- RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
- } while (wait_retryable(&result, lerrno, to, end));
+ EC_POP_TAG();
}
- EC_POP_TAG();
- RB_VM_LOCK_ENTER();
- {
- ccan_list_del(&wfd.wfd_node);
+ if (io) {
+ rb_io_blocking_operation_exit(io, &blocking_operation);
}
- RB_VM_LOCK_LEAVE();
if (state) {
- EC_JUMP_TAG(wfd.th->ec, state);
+ EC_JUMP_TAG(ec, state);
}
if (result < 0) {
@@ -4315,6 +4686,9 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
}
#else /* ! USE_POLL - implement rb_io_poll_fd() using select() */
struct select_args {
+ struct rb_io *io;
+ struct rb_io_blocking_operation *blocking_operation;
+
union {
int fd;
int error;
@@ -4322,7 +4696,6 @@ struct select_args {
rb_fdset_t *read;
rb_fdset_t *write;
rb_fdset_t *except;
- struct waiting_fd wfd;
struct timeval *tv;
};
@@ -4353,11 +4726,10 @@ select_single_cleanup(VALUE ptr)
{
struct select_args *args = (struct select_args *)ptr;
- RB_VM_LOCK_ENTER();
- {
- ccan_list_del(&args->wfd.wfd_node);
+ if (args->blocking_operation) {
+ rb_io_blocking_operation_exit(args->io, args->blocking_operation);
}
- RB_VM_LOCK_LEAVE();
+
if (args->read) rb_fd_term(args->read);
if (args->write) rb_fd_term(args->write);
if (args->except) rb_fd_term(args->except);
@@ -4365,36 +4737,64 @@ select_single_cleanup(VALUE ptr)
return (VALUE)-1;
}
-int
-rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
+static rb_fdset_t *
+init_set_fd(int fd, rb_fdset_t *fds)
+{
+ if (fd < 0) {
+ return 0;
+ }
+ rb_fd_init(fds);
+ rb_fd_set(fd, fds);
+
+ return fds;
+}
+
+static int
+thread_io_wait(rb_thread_t *th, struct rb_io *io, int fd, int events, struct timeval *timeout)
{
rb_fdset_t rfds, wfds, efds;
struct select_args args;
- int r;
VALUE ptr = (VALUE)&args;
+ struct rb_io_blocking_operation blocking_operation;
+ if (io) {
+ args.io = io;
+ blocking_operation.ec = th->ec;
+ rb_io_blocking_operation_enter(io, &blocking_operation);
+ args.blocking_operation = &blocking_operation;
+ }
+ else {
+ args.io = NULL;
+ blocking_operation.ec = NULL;
+ args.blocking_operation = NULL;
+ }
+
args.as.fd = fd;
args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL;
args.tv = timeout;
- args.wfd.fd = fd;
- args.wfd.th = GET_THREAD();
-
- RB_VM_LOCK_ENTER();
- {
- ccan_list_add(&args.wfd.th->vm->waiting_fds, &args.wfd.wfd_node);
- }
- RB_VM_LOCK_LEAVE();
- r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr);
- if (r == -1)
+ int result = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr);
+ if (result == -1)
errno = args.as.error;
- return r;
+ return result;
}
#endif /* ! USE_POLL */
+int
+rb_thread_wait_for_single_fd(rb_thread_t *th, int fd, int events, struct timeval *timeout)
+{
+ return thread_io_wait(th, NULL, fd, events, timeout);
+}
+
+int
+rb_thread_io_wait(rb_thread_t *th, struct rb_io *io, int events, struct timeval *timeout)
+{
+ return thread_io_wait(th, io, io->fd, events, timeout);
+}
+
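
The two wrappers above both funnel into thread_io_wait(); the io-aware variant additionally registers the wait as a blocking operation on the rb_io (via rb_io_blocking_operation_enter/exit), presumably so the wait can be interrupted when the IO is closed. A minimal internal-caller sketch follows; it is illustrative only, and the helper name, the RB_WAITFD_IN event and the 500 ms timeout are assumptions rather than part of this patch:

    /* Sketch: wait up to 500 ms for `io` to become readable, letting the
     * blocking-operation registration interrupt the wait on close. */
    static int
    example_wait_readable(struct rb_io *io)
    {
        struct timeval timeout = { .tv_sec = 0, .tv_usec = 500 * 1000 };
        int revents = rb_thread_io_wait(GET_THREAD(), io, RB_WAITFD_IN, &timeout);

        if (revents < 0) return -1;            /* errno is set on failure */
        return (revents & RB_WAITFD_IN) != 0;  /* 0 if the timeout expired */
    }
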
/*
* for GC
*/
@@ -4404,7 +4804,12 @@ void
rb_gc_set_stack_end(VALUE **stack_end_p)
{
VALUE stack_end;
+COMPILER_WARNING_PUSH
+#if RBIMPL_COMPILER_IS(GCC)
+COMPILER_WARNING_IGNORED(-Wdangling-pointer);
+#endif
*stack_end_p = &stack_end;
+COMPILER_WARNING_POP
}
#endif
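
rb_gc_set_stack_end() intentionally publishes the address of its own local variable, which newer GCC versions flag with -Wdangling-pointer; the COMPILER_WARNING_PUSH/IGNORED/POP pragmas above silence exactly that. The value is only ever consumed as an approximate stack boundary for conservative marking, as in this hedged sketch (the caller shown here is invented for illustration, not part of the patch):

    /* Sketch: capture an approximate "end of machine stack" marker. After the
     * call, stack_end points into rb_gc_set_stack_end()'s already-returned
     * frame; only the address itself is used, never the pointee. */
    static void
    example_record_stack_end(VALUE **out)
    {
        VALUE *stack_end;
        rb_gc_set_stack_end(&stack_end);
        *out = stack_end;   /* used as a conservative-scan boundary */
    }
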
@@ -4446,16 +4851,13 @@ consume_communication_pipe(int fd)
ssize_t result;
int ret = FALSE; /* for rb_sigwait_sleep */
- /*
- * disarm UBF_TIMER before we read, because it can become
- * re-armed at any time via sighandler and the pipe will refill
- * We can disarm it because this thread is now processing signals
- * and we do not want unnecessary SIGVTALRM
- */
- ubf_timer_disarm();
-
while (1) {
result = read(fd, buff, sizeof(buff));
+#if USE_EVENTFD
+ RUBY_DEBUG_LOG("resultf:%d buff:%lu", (int)result, (unsigned long)buff[0]);
+#else
+ RUBY_DEBUG_LOG("result:%d", (int)result);
+#endif
if (result > 0) {
ret = TRUE;
if (USE_EVENTFD || result < (ssize_t)sizeof(buff)) {
@@ -4482,25 +4884,6 @@ consume_communication_pipe(int fd)
}
}
-static int
-check_signals_nogvl(rb_thread_t *th, int sigwait_fd)
-{
- rb_vm_t *vm = GET_VM(); /* th may be 0 */
- int ret = sigwait_fd >= 0 ? consume_communication_pipe(sigwait_fd) : FALSE;
- ubf_wakeup_all_threads();
- if (rb_signal_buff_size()) {
- if (th == vm->ractor.main_thread) {
- /* no need to lock + wakeup if already in main thread */
- RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
- }
- else {
- threadptr_trap_interrupt(vm->ractor.main_thread);
- }
- ret = TRUE; /* for SIGCHLD_LOSSY && rb_sigwait_sleep */
- }
- return ret;
-}
-
void
rb_thread_stop_timer_thread(void)
{
@@ -4576,9 +4959,13 @@ rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const r
thread_sched_atfork(TH_SCHED(th));
ubf_list_atfork();
+ rb_signal_atfork();
// OK. Only this thread accesses:
ccan_list_for_each(&vm->ractor.set, r, vmlr_node) {
+ if (r != vm->ractor.main_ractor) {
+ rb_ractor_terminate_atfork(vm, r);
+ }
ccan_list_for_each(&r->threads.set, i, lt_node) {
atfork(i, th);
}
@@ -4586,17 +4973,20 @@ rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const r
rb_vm_living_threads_init(vm);
rb_ractor_atfork(vm, th);
-
- /* may be held by RJIT threads in parent */
- rb_native_mutex_initialize(&vm->workqueue_lock);
+ rb_vm_postponed_job_atfork();
/* may be held by any thread in parent */
rb_native_mutex_initialize(&th->interrupt_lock);
+ ccan_list_head_init(&th->interrupt_exec_tasks);
vm->fork_gen++;
rb_ractor_sleeper_threads_clear(th->ractor);
rb_clear_coverages();
+ // restart the timer thread (timer threads access `vm->waitpid_lock` and so on).
+ rb_thread_reset_timer_thread();
+ rb_thread_start_timer_thread();
+
VM_ASSERT(vm->ractor.blocking_cnt == 0);
VM_ASSERT(vm->ractor.cnt == 1);
}
@@ -4605,6 +4995,10 @@ static void
terminate_atfork_i(rb_thread_t *th, const rb_thread_t *current_th)
{
if (th != current_th) {
+ // Clear the scheduler as it is no longer operational:
+ th->scheduler = Qnil;
+
+ rb_native_mutex_initialize(&th->interrupt_lock);
rb_mutex_abandon_keeping_mutexes(th);
rb_mutex_abandon_locking_mutex(th);
thread_cleanup_func(th, TRUE);
@@ -4616,8 +5010,10 @@ void
rb_thread_atfork(void)
{
rb_thread_t *th = GET_THREAD();
+ rb_threadptr_pending_interrupt_clear(th);
rb_thread_atfork_internal(th, terminate_atfork_i);
th->join_list = NULL;
+ th->scheduler = Qnil;
rb_fiber_atfork(th);
/* We don't want reproduce CVE-2003-0900. */
@@ -4654,16 +5050,14 @@ struct thgroup {
int enclosed;
};
-static size_t
-thgroup_memsize(const void *ptr)
-{
- return sizeof(struct thgroup);
-}
-
static const rb_data_type_t thgroup_data_type = {
"thgroup",
- {0, RUBY_TYPED_DEFAULT_FREE, thgroup_memsize,},
- 0, 0, RUBY_TYPED_FREE_IMMEDIATELY
+ {
+ 0,
+ RUBY_TYPED_DEFAULT_FREE,
+ NULL, // No external memory to report
+ },
+ 0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_EMBEDDABLE
};
/*
@@ -4889,7 +5283,7 @@ rb_thread_shield_owned(VALUE self)
rb_mutex_t *m = mutex_ptr(mutex);
- return m->fiber == GET_EC()->fiber_ptr;
+ return m->ec_serial == rb_ec_serial(GET_EC());
}
/*
@@ -4908,7 +5302,7 @@ rb_thread_shield_wait(VALUE self)
if (!mutex) return Qfalse;
m = mutex_ptr(mutex);
- if (m->fiber == GET_EC()->fiber_ptr) return Qnil;
+ if (m->ec_serial == rb_ec_serial(GET_EC())) return Qnil;
rb_thread_shield_waiting_inc(self);
rb_mutex_lock(mutex);
rb_thread_shield_waiting_dec(self);
@@ -4991,12 +5385,12 @@ recursive_list_access(VALUE sym)
}
/*
- * Returns Qtrue if and only if obj (or the pair <obj, paired_obj>) is already
+ * Returns true if and only if obj (or the pair <obj, paired_obj>) is already
* in the recursion list.
* Assumes the recursion list is valid.
*/
-static VALUE
+static bool
recursive_check(VALUE list, VALUE obj, VALUE paired_obj_id)
{
#if SIZEOF_LONG == SIZEOF_VOIDP
@@ -5008,18 +5402,18 @@ recursive_check(VALUE list, VALUE obj, VALUE paired_obj_id)
VALUE pair_list = rb_hash_lookup2(list, obj, Qundef);
if (UNDEF_P(pair_list))
- return Qfalse;
+ return false;
if (paired_obj_id) {
if (!RB_TYPE_P(pair_list, T_HASH)) {
if (!OBJ_ID_EQL(paired_obj_id, pair_list))
- return Qfalse;
+ return false;
}
else {
if (NIL_P(rb_hash_lookup(pair_list, paired_obj_id)))
- return Qfalse;
+ return false;
}
}
- return Qtrue;
+ return true;
}
/*
@@ -5099,7 +5493,7 @@ exec_recursive_i(RB_BLOCK_CALL_FUNC_ARGLIST(tag, data))
* Calls func(obj, arg, recursive), where recursive is non-zero if the
* current method is called recursively on obj, or on the pair <obj, pairid>
* If outer is 0, then the innermost func will be called with recursive set
- * to Qtrue, otherwise the outermost func will be called. In the latter case,
+ * to true, otherwise the outermost func will be called. In the latter case,
* all inner func are short-circuited by throw.
* Implementation details: the value thrown is the recursive list which is
* proper to the current method and unlikely to be caught anywhere else.
@@ -5190,7 +5584,7 @@ rb_exec_recursive_paired(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE pai
/*
* If recursion is detected on the current method and obj, the outermost
- * func will be called with (obj, arg, Qtrue). All inner func will be
+ * func will be called with (obj, arg, true). All inner func will be
* short-circuited using throw.
*/
@@ -5208,7 +5602,7 @@ rb_exec_recursive_outer_mid(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE
/*
* If recursion is detected on the current method, obj and paired_obj,
- * the outermost func will be called with (obj, arg, Qtrue). All inner
+ * the outermost func will be called with (obj, arg, true). All inner
* func will be short-circuited using throw.
*/
@@ -5276,7 +5670,6 @@ Init_Thread_Mutex(void)
void
Init_Thread(void)
{
- VALUE cThGroup;
rb_thread_t *th = GET_THREAD();
sym_never = ID2SYM(rb_intern_const("never"));
@@ -5350,6 +5743,18 @@ Init_Thread(void)
rb_define_method(cThGroup, "enclosed?", thgroup_enclosed_p, 0);
rb_define_method(cThGroup, "add", thgroup_add, 1);
+ const char * ptr = getenv("RUBY_THREAD_TIMESLICE");
+
+ if (ptr) {
+ long quantum = strtol(ptr, NULL, 0);
+ if (quantum > 0 && !(SIZEOF_LONG > 4 && quantum > UINT32_MAX)) {
+ thread_default_quantum_ms = (uint32_t)quantum;
+ }
+ else if (0) {
+ fprintf(stderr, "Ignored RUBY_THREAD_TIMESLICE=%s\n", ptr);
+ }
+ }
+
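
The RUBY_THREAD_TIMESLICE block above overrides thread_default_quantum_ms (the 100 ms default declared near the top of the file): RUBY_THREAD_TIMESLICE=50 selects a 50 ms quantum, while zero, negative, or out-of-uint32 values are silently ignored. A standalone sketch of the same clamping rule, with invented variable names, for reference only:

    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>

    int
    main(void)
    {
        uint32_t quantum_ms = 100;  /* default, mirroring thread_default_quantum_ms */
        const char *ptr = getenv("RUBY_THREAD_TIMESLICE");

        if (ptr) {
            long quantum = strtol(ptr, NULL, 0);
            /* accept only positive values that fit in a uint32_t */
            if (quantum > 0 && (unsigned long)quantum <= UINT32_MAX) {
                quantum_ms = (uint32_t)quantum;
            }
        }
        printf("time slice: %u ms\n", (unsigned)quantum_ms);
        return 0;
    }
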
{
th->thgroup = th->ractor->thgroup_default = rb_obj_alloc(cThGroup);
rb_define_const(cThGroup, "Default", th->thgroup);
@@ -5362,8 +5767,12 @@ Init_Thread(void)
/* main thread setting */
{
/* acquire global vm lock */
- struct rb_thread_sched *sched = TH_SCHED(th);
- thread_sched_to_running(sched, th);
+#ifdef HAVE_PTHREAD_NP_H
+ VM_ASSERT(TH_SCHED(th)->running == th);
+#endif
+ // thread_sched_to_running() should not be called here because
+ // it assumes the caller was previously blocked by thread_sched_to_waiting().
+ // thread_sched_to_running(sched, th);
th->pending_interrupt_queue = rb_ary_hidden_new(0);
th->pending_interrupt_queue_checked = 0;
@@ -5374,6 +5783,9 @@ Init_Thread(void)
rb_thread_create_timer_thread();
Init_thread_sync();
+
+ // TODO: Suppress unused function warning for now
+ // if (0) rb_thread_sched_destroy(NULL);
}
int
@@ -5403,12 +5815,12 @@ debug_deadlock_check(rb_ractor_t *r, VALUE msg)
ccan_list_for_each(&r->threads.set, th, lt_node) {
rb_str_catf(msg, "* %+"PRIsVALUE"\n rb_thread_t:%p "
"native:%p int:%u",
- th->self, (void *)th, thread_id_str(th), th->ec->interrupt_flag);
+ th->self, (void *)th, th->nt ? thread_id_str(th) : "N/A", th->ec->interrupt_flag);
if (th->locking_mutex) {
rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
- rb_str_catf(msg, " mutex:%p cond:%"PRIuSIZE,
- (void *)mutex->fiber, rb_mutex_num_waiting(mutex));
+ rb_str_catf(msg, " mutex:%llu cond:%"PRIuSIZE,
+ (unsigned long long)mutex->ec_serial, rb_mutex_num_waiting(mutex));
}
{
@@ -5419,7 +5831,7 @@ debug_deadlock_check(rb_ractor_t *r, VALUE msg)
}
}
rb_str_catf(msg, "\n ");
- rb_str_concat(msg, rb_ary_join(rb_ec_backtrace_str_ary(th->ec, 0, 0), sep));
+ rb_str_concat(msg, rb_ary_join(rb_ec_backtrace_str_ary(th->ec, RUBY_BACKTRACE_START, RUBY_ALL_BACKTRACE_LINES), sep));
rb_str_catf(msg, "\n");
}
}
@@ -5429,14 +5841,18 @@ rb_check_deadlock(rb_ractor_t *r)
{
if (GET_THREAD()->vm->thread_ignore_deadlock) return;
- int found = 0;
- rb_thread_t *th = NULL;
+#ifdef RUBY_THREAD_PTHREAD_H
+ if (r->threads.sched.readyq_cnt > 0) return;
+#endif
+
int sleeper_num = rb_ractor_sleeper_thread_num(r);
int ltnum = rb_ractor_living_thread_num(r);
if (ltnum > sleeper_num) return;
if (ltnum < sleeper_num) rb_bug("sleeper must not be more than vm_living_thread_num(vm)");
- if (patrol_thread && patrol_thread != GET_THREAD()) return;
+
+ int found = 0;
+ rb_thread_t *th = NULL;
ccan_list_for_each(&r->threads.set, th, lt_node) {
if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th->ec)) {
@@ -5444,7 +5860,7 @@ rb_check_deadlock(rb_ractor_t *r)
}
else if (th->locking_mutex) {
rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
- if (mutex->fiber == th->ec->fiber_ptr || (!mutex->fiber && !ccan_list_empty(&mutex->waitq))) {
+ if (mutex->ec_serial == rb_ec_serial(th->ec) || (!mutex->ec_serial && !ccan_list_empty(&mutex->waitq))) {
found = 1;
}
}
@@ -5462,21 +5878,6 @@ rb_check_deadlock(rb_ractor_t *r)
}
}
-// Used for VM memsize reporting. Returns the size of a list of waiting_fd
-// structs. Defined here because the struct definition lives here as well.
-size_t
-rb_vm_memsize_waiting_fds(struct ccan_list_head *waiting_fds)
-{
- struct waiting_fd *waitfd = 0;
- size_t size = 0;
-
- ccan_list_for_each(waiting_fds, waitfd, wfd_node) {
- size += sizeof(struct waiting_fd);
- }
-
- return size;
-}
-
static void
update_line_coverage(VALUE data, const rb_trace_arg_t *trace_arg)
{
@@ -5486,6 +5887,7 @@ update_line_coverage(VALUE data, const rb_trace_arg_t *trace_arg)
VALUE lines = RARRAY_AREF(coverage, COVERAGE_INDEX_LINES);
if (lines) {
long line = rb_sourceline() - 1;
+ VM_ASSERT(line >= 0);
long count;
VALUE num;
void rb_iseq_clear_event_flags(const rb_iseq_t *iseq, size_t pos, rb_event_flag_t reset);
@@ -5727,7 +6129,7 @@ rb_uninterruptible(VALUE (*b_proc)(VALUE), VALUE data)
rb_thread_t *cur_th = GET_THREAD();
rb_hash_aset(interrupt_mask, rb_cObject, sym_never);
- OBJ_FREEZE_RAW(interrupt_mask);
+ OBJ_FREEZE(interrupt_mask);
rb_ary_push(cur_th->pending_interrupt_mask_stack, interrupt_mask);
VALUE ret = rb_ensure(b_proc, data, uninterruptible_exit, Qnil);
@@ -5735,3 +6137,165 @@ rb_uninterruptible(VALUE (*b_proc)(VALUE), VALUE data)
RUBY_VM_CHECK_INTS(cur_th->ec);
return ret;
}
+
+static void
+thread_specific_storage_alloc(rb_thread_t *th)
+{
+ VM_ASSERT(th->specific_storage == NULL);
+
+ if (UNLIKELY(specific_key_count > 0)) {
+ th->specific_storage = ZALLOC_N(void *, RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX);
+ }
+}
+
+rb_internal_thread_specific_key_t
+rb_internal_thread_specific_key_create(void)
+{
+ rb_vm_t *vm = GET_VM();
+
+ if (specific_key_count == 0 && vm->ractor.cnt > 1) {
+ rb_raise(rb_eThreadError, "The first rb_internal_thread_specific_key_create() is called with multiple ractors");
+ }
+ else if (specific_key_count >= RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX) {
+ rb_raise(rb_eThreadError, "rb_internal_thread_specific_key_create() is called more than %d times", RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX);
+ }
+ else {
+ rb_internal_thread_specific_key_t key = specific_key_count++;
+
+ if (key == 0) {
+ // allocate
+ rb_ractor_t *cr = GET_RACTOR();
+ rb_thread_t *th;
+
+ ccan_list_for_each(&cr->threads.set, th, lt_node) {
+ thread_specific_storage_alloc(th);
+ }
+ }
+ return key;
+ }
+}
+
+// async and native thread safe.
+void *
+rb_internal_thread_specific_get(VALUE thread_val, rb_internal_thread_specific_key_t key)
+{
+ rb_thread_t *th = DATA_PTR(thread_val);
+
+ VM_ASSERT(rb_thread_ptr(thread_val) == th);
+ VM_ASSERT(key < RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX);
+ VM_ASSERT(th->specific_storage);
+
+ return th->specific_storage[key];
+}
+
+// async and native thread safe.
+void
+rb_internal_thread_specific_set(VALUE thread_val, rb_internal_thread_specific_key_t key, void *data)
+{
+ rb_thread_t *th = DATA_PTR(thread_val);
+
+ VM_ASSERT(rb_thread_ptr(thread_val) == th);
+ VM_ASSERT(key < RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX);
+ VM_ASSERT(th->specific_storage);
+
+ th->specific_storage[key] = data;
+}
+
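+
+// Taken together, the three functions above give C extensions a small
+// fixed-size (RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX slots) per-thread pointer
+// store that is safe to touch from native threads. A minimal usage sketch,
+// assuming the declarations are visible to the extension (e.g. through
+// ruby/thread_native.h) and that the key is created before any additional
+// ractor exists; all names below are invented for illustration:
+//
+//     /* keep a per-thread counter in internal thread-specific storage */
+//     static rb_internal_thread_specific_key_t example_key;
+//
+//     static void
+//     example_init(void)
+//     {
+//         /* must run once, before other ractors are created */
+//         example_key = rb_internal_thread_specific_key_create();
+//     }
+//
+//     static void
+//     example_bump(VALUE thread)
+//     {
+//         uintptr_t n = (uintptr_t)rb_internal_thread_specific_get(thread, example_key);
+//         rb_internal_thread_specific_set(thread, example_key, (void *)(n + 1));
+//     }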
+// interrupt_exec
+
+struct rb_interrupt_exec_task {
+ struct ccan_list_node node;
+
+ rb_interrupt_exec_func_t *func;
+ void *data;
+ enum rb_interrupt_exec_flag flags;
+};
+
+void
+rb_threadptr_interrupt_exec_task_mark(rb_thread_t *th)
+{
+ struct rb_interrupt_exec_task *task;
+
+ ccan_list_for_each(&th->interrupt_exec_tasks, task, node) {
+ if (task->flags & rb_interrupt_exec_flag_value_data) {
+ rb_gc_mark((VALUE)task->data);
+ }
+ }
+}
+
+// native thread safe
+// th must be a valid, living thread
+void
+rb_threadptr_interrupt_exec(rb_thread_t *th, rb_interrupt_exec_func_t *func, void *data, enum rb_interrupt_exec_flag flags)
+{
+ // TODO: should not use ALLOC here (it is not safe from arbitrary native threads)
+ struct rb_interrupt_exec_task *task = ALLOC(struct rb_interrupt_exec_task);
+ *task = (struct rb_interrupt_exec_task) {
+ .flags = flags,
+ .func = func,
+ .data = data,
+ };
+
+ rb_native_mutex_lock(&th->interrupt_lock);
+ {
+ ccan_list_add_tail(&th->interrupt_exec_tasks, &task->node);
+ threadptr_set_interrupt_locked(th, true);
+ }
+ rb_native_mutex_unlock(&th->interrupt_lock);
+}
+
+static void
+threadptr_interrupt_exec_exec(rb_thread_t *th)
+{
+ while (1) {
+ struct rb_interrupt_exec_task *task;
+
+ rb_native_mutex_lock(&th->interrupt_lock);
+ {
+ task = ccan_list_pop(&th->interrupt_exec_tasks, struct rb_interrupt_exec_task, node);
+ }
+ rb_native_mutex_unlock(&th->interrupt_lock);
+
+ RUBY_DEBUG_LOG("task:%p", task);
+
+ if (task) {
+ if (task->flags & rb_interrupt_exec_flag_new_thread) {
+ rb_thread_create(task->func, task->data);
+ }
+ else {
+ (*task->func)(task->data);
+ }
+ ruby_xfree(task);
+ }
+ else {
+ break;
+ }
+ }
+}
+
+static void
+threadptr_interrupt_exec_cleanup(rb_thread_t *th)
+{
+ rb_native_mutex_lock(&th->interrupt_lock);
+ {
+ struct rb_interrupt_exec_task *task;
+
+ while ((task = ccan_list_pop(&th->interrupt_exec_tasks, struct rb_interrupt_exec_task, node)) != NULL) {
+ ruby_xfree(task);
+ }
+ }
+ rb_native_mutex_unlock(&th->interrupt_lock);
+}
+
+// native thread safe
+// func/data should be native thread safe
+void
+rb_ractor_interrupt_exec(struct rb_ractor_struct *target_r,
+ rb_interrupt_exec_func_t *func, void *data, enum rb_interrupt_exec_flag flags)
+{
+ RUBY_DEBUG_LOG("flags:%d", (int)flags);
+
+ rb_thread_t *main_th = target_r->threads.main;
+ rb_threadptr_interrupt_exec(main_th, func, data, flags | rb_interrupt_exec_flag_new_thread);
+}
+
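
rb_threadptr_interrupt_exec() appends a heap-allocated task to the target thread's interrupt_exec_tasks list under interrupt_lock and raises the thread's interrupt flag; threadptr_interrupt_exec_exec() later drains that list, invoking each callback inline or spawning a new thread when rb_interrupt_exec_flag_new_thread is set, and rb_ractor_interrupt_exec() always forces the new-thread path on the target ractor's main thread. A hedged caller sketch, assuming rb_interrupt_exec_func_t is a VALUE (*)(void *) callback as its use with rb_thread_create() above suggests; the callback, its data and the zero flags value are invented for illustration:

    /* Sketch: ask another ractor to run a native-thread-safe callback. */
    static VALUE
    example_task(void *data)
    {
        fprintf(stderr, "interrupt_exec ran: %s\n", (const char *)data);
        return Qnil;
    }

    static void
    example_poke_ractor(rb_ractor_t *target)
    {
        rb_ractor_interrupt_exec(target, example_task, (void *)"hello",
                                 (enum rb_interrupt_exec_flag)0 /* no extra flags */);
    }
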