diff options
Diffstat (limited to 'thread.c')
| -rw-r--r-- | thread.c | 763 |
1 files changed, 499 insertions, 264 deletions
@@ -78,6 +78,7 @@ #include "internal/class.h" #include "internal/cont.h" #include "internal/error.h" +#include "internal/eval.h" #include "internal/gc.h" #include "internal/hash.h" #include "internal/io.h" @@ -98,6 +99,9 @@ #include "ractor_core.h" #include "vm_debug.h" #include "vm_sync.h" +#include "zjit.h" + +#include "ccan/list/list.h" #ifndef USE_NATIVE_THREAD_PRIORITY #define USE_NATIVE_THREAD_PRIORITY 0 @@ -106,6 +110,7 @@ #endif static VALUE rb_cThreadShield; +static VALUE cThGroup; static VALUE sym_immediate; static VALUE sym_on_blocking; @@ -145,16 +150,9 @@ static int hrtime_update_expire(rb_hrtime_t *, const rb_hrtime_t); NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd)); MAYBE_UNUSED(static int consume_communication_pipe(int fd)); -static volatile int system_working = 1; +static rb_atomic_t system_working = 1; static rb_internal_thread_specific_key_t specific_key_count; -struct waiting_fd { - struct ccan_list_node wfd_node; /* <=> vm.waiting_fds */ - rb_thread_t *th; - int fd; - struct rb_io_close_wait_list *busy; -}; - /********************************************************************************/ #define THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION @@ -173,11 +171,11 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio #define THREAD_BLOCKING_BEGIN(th) do { \ struct rb_thread_sched * const sched = TH_SCHED(th); \ RB_VM_SAVE_MACHINE_CONTEXT(th); \ - thread_sched_to_waiting((sched), (th)); + thread_sched_to_waiting((sched), (th), true); #define THREAD_BLOCKING_END(th) \ thread_sched_to_running((sched), (th)); \ - rb_ractor_thread_switch(th->ractor, th); \ + rb_ractor_thread_switch(th->ractor, th, false); \ } while(0) #ifdef __GNUC__ @@ -197,7 +195,7 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio /* Important that this is inlined into the macro, and not part of \ * blocking_region_begin - see bug #20493 */ \ RB_VM_SAVE_MACHINE_CONTEXT(th); \ - thread_sched_to_waiting(TH_SCHED(th), th); \ + thread_sched_to_waiting(TH_SCHED(th), th, false); \ exec; \ blocking_region_end(th, &__region); \ }; \ @@ -211,6 +209,10 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio static inline int vm_check_ints_blocking(rb_execution_context_t *ec) { +#ifdef RUBY_ASSERT_CRITICAL_SECTION + VM_ASSERT(ruby_assert_critical_section_entered == 0); +#endif + rb_thread_t *th = rb_ec_thread_ptr(ec); if (LIKELY(rb_threadptr_pending_interrupt_empty_p(th))) { @@ -220,7 +222,18 @@ vm_check_ints_blocking(rb_execution_context_t *ec) th->pending_interrupt_queue_checked = 0; RUBY_VM_SET_INTERRUPT(ec); } - return rb_threadptr_execute_interrupts(th, 1); + + int result = rb_threadptr_execute_interrupts(th, 1); + + // When a signal is received, we yield to the scheduler as soon as possible: + if (result || RUBY_VM_INTERRUPTED(ec)) { + VALUE scheduler = rb_fiber_scheduler_current_for_threadptr(th); + if (scheduler != Qnil) { + rb_fiber_scheduler_yield(scheduler); + } + } + + return result; } int @@ -339,7 +352,7 @@ unblock_function_clear(rb_thread_t *th) } static void -threadptr_interrupt_locked(rb_thread_t *th, bool trap) +threadptr_set_interrupt_locked(rb_thread_t *th, bool trap) { // th->interrupt_lock should be acquired here @@ -361,26 +374,27 @@ threadptr_interrupt_locked(rb_thread_t *th, bool trap) } static void -threadptr_interrupt(rb_thread_t *th, int trap) +threadptr_set_interrupt(rb_thread_t *th, int trap) { rb_native_mutex_lock(&th->interrupt_lock); { - threadptr_interrupt_locked(th, trap); + threadptr_set_interrupt_locked(th, trap); } rb_native_mutex_unlock(&th->interrupt_lock); } +/* Set interrupt flag on another thread or current thread, and call its UBF if it has one set */ void rb_threadptr_interrupt(rb_thread_t *th) { RUBY_DEBUG_LOG("th:%u", rb_th_serial(th)); - threadptr_interrupt(th, false); + threadptr_set_interrupt(th, false); } static void threadptr_trap_interrupt(rb_thread_t *th) { - threadptr_interrupt(th, true); + threadptr_set_interrupt(th, true); } static void @@ -440,8 +454,8 @@ rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th) th->keeping_mutexes = mutex->next_mutex; // rb_warn("mutex #<%p> was not unlocked by thread #<%p>", (void *)mutex, (void*)th); - - const char *error_message = rb_mutex_unlock_th(mutex, th, mutex->fiber); + VM_ASSERT(mutex->ec_serial); + const char *error_message = rb_mutex_unlock_th(mutex, th, 0); if (error_message) rb_bug("invalid keeping_mutexes: %s", error_message); } } @@ -518,12 +532,8 @@ thread_cleanup_func(void *th_ptr, int atfork) th->locking_mutex = Qfalse; thread_cleanup_func_before_exec(th_ptr); - /* - * Unfortunately, we can't release native threading resource at fork - * because libc may have unstable locking state therefore touching - * a threading resource may cause a deadlock. - */ if (atfork) { + native_thread_destroy_atfork(th->nt); th->nt = NULL; return; } @@ -531,6 +541,15 @@ thread_cleanup_func(void *th_ptr, int atfork) rb_native_mutex_destroy(&th->interrupt_lock); } +void +rb_thread_free_native_thread(void *th_ptr) +{ + rb_thread_t *th = th_ptr; + + native_thread_destroy_atfork(th->nt); + th->nt = NULL; +} + static VALUE rb_threadptr_raise(rb_thread_t *, int, VALUE *); static VALUE rb_thread_to_s(VALUE thread); @@ -575,6 +594,8 @@ thread_do_start_proc(rb_thread_t *th) if (th->invoke_type == thread_invoke_type_ractor_proc) { VALUE self = rb_ractor_self(th->ractor); + th->thgroup = th->ractor->thgroup_default = rb_obj_alloc(cThGroup); + VM_ASSERT(FIXNUM_P(args)); args_len = FIX2INT(args); args_ptr = ALLOCA_N(VALUE, args_len); @@ -690,7 +711,8 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start) if (state == TAG_NONE) { // This must be set AFTER doing all user-level code. At this point, the thread is effectively finished and calls to `Thread#join` will succeed. th->value = result; - } else { + } + else { errinfo = th->ec->errinfo; VALUE exc = rb_vm_make_jump_tag_but_local_jump(state, Qundef); @@ -703,6 +725,10 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start) /* fatal error within this thread, need to stop whole script */ } else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) { + if (th->invoke_type == thread_invoke_type_ractor_proc) { + rb_ractor_atexit_exception(th->ec); + } + /* exit on main_thread. */ } else { @@ -835,8 +861,9 @@ thread_create_core(VALUE thval, struct thread_create_params *params) #endif th->invoke_type = thread_invoke_type_ractor_proc; th->ractor = params->g; + th->ec->ractor_id = rb_ractor_id(th->ractor); th->ractor->threads.main = th; - th->invoke_arg.proc.proc = rb_proc_isolate_bang(params->proc); + th->invoke_arg.proc.proc = rb_proc_isolate_bang(params->proc, Qnil); th->invoke_arg.proc.args = INT2FIX(RARRAY_LENINT(params->args)); th->invoke_arg.proc.kw_splat = rb_keyword_given_p(); rb_ractor_send_parameters(ec, params->g, params->args); @@ -1050,25 +1077,30 @@ thread_join_sleep(VALUE arg) } while (!thread_finished(target_th)) { - VALUE scheduler = rb_fiber_scheduler_current(); + VALUE scheduler = rb_fiber_scheduler_current_for_threadptr(th); - if (scheduler != Qnil) { - rb_fiber_scheduler_block(scheduler, target_th->self, p->timeout); - // Check if the target thread is finished after blocking: - if (thread_finished(target_th)) break; - // Otherwise, a timeout occurred: - else return Qfalse; - } - else if (!limit) { - sleep_forever(th, SLEEP_DEADLOCKABLE | SLEEP_ALLOW_SPURIOUS | SLEEP_NO_CHECKINTS); + if (!limit) { + if (scheduler != Qnil) { + rb_fiber_scheduler_block(scheduler, target_th->self, Qnil); + } + else { + sleep_forever(th, SLEEP_DEADLOCKABLE | SLEEP_ALLOW_SPURIOUS | SLEEP_NO_CHECKINTS); + } } else { if (hrtime_update_expire(limit, end)) { RUBY_DEBUG_LOG("timeout target_th:%u", rb_th_serial(target_th)); return Qfalse; } - th->status = THREAD_STOPPED; - native_sleep(th, limit); + + if (scheduler != Qnil) { + VALUE timeout = rb_float_new(hrtime2double(*limit)); + rb_fiber_scheduler_block(scheduler, target_th->self, timeout); + } + else { + th->status = THREAD_STOPPED; + native_sleep(th, limit); + } } RUBY_VM_CHECK_INTS_BLOCKING(th->ec); th->status = THREAD_RUNNABLE; @@ -1127,6 +1159,10 @@ thread_join(rb_thread_t *target_th, VALUE timeout, rb_hrtime_t *limit) /* OK. killed. */ break; default: + if (err == RUBY_FATAL_FIBER_KILLED) { // not integer constant so can't be a case expression + // root fiber killed in non-main thread + break; + } rb_bug("thread_join: Fixnum (%d) should not reach here.", FIX2INT(err)); } } @@ -1390,17 +1426,18 @@ rb_thread_sleep_deadly(void) static void rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker, VALUE timeout, rb_hrtime_t end) { - VALUE scheduler = rb_fiber_scheduler_current(); + rb_thread_t *th = GET_THREAD(); + VALUE scheduler = rb_fiber_scheduler_current_for_threadptr(th); if (scheduler != Qnil) { rb_fiber_scheduler_block(scheduler, blocker, timeout); } else { RUBY_DEBUG_LOG("..."); if (end) { - sleep_hrtime_until(GET_THREAD(), end, SLEEP_SPURIOUS_CHECK); + sleep_hrtime_until(th, end, SLEEP_SPURIOUS_CHECK); } else { - sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE); + sleep_forever(th, SLEEP_DEADLOCKABLE); } } } @@ -1467,7 +1504,7 @@ rb_thread_schedule_limits(uint32_t limits_us) RB_VM_SAVE_MACHINE_CONTEXT(th); thread_sched_yield(TH_SCHED(th), th); - rb_ractor_thread_switch(th->ractor, th); + rb_ractor_thread_switch(th->ractor, th, true); RUBY_DEBUG_LOG("switch %s", "done"); } @@ -1515,7 +1552,7 @@ blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region) unregister_ubf_list(th); thread_sched_to_running(TH_SCHED(th), th); - rb_ractor_thread_switch(th->ractor, th); + rb_ractor_thread_switch(th->ractor, th, false); th->blocking_region_buffer = 0; rb_ractor_blocking_threads_dec(th->ractor, __FILE__, __LINE__); @@ -1531,6 +1568,29 @@ blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region) #endif } +/* + * Resolve sentinel unblock function values to their actual function pointers + * and appropriate data2 values. This centralizes the logic for handling + * RUBY_UBF_IO and RUBY_UBF_PROCESS sentinel values. + * + * @param unblock_function Pointer to unblock function pointer (modified in place) + * @param data2 Pointer to data2 pointer (modified in place) + * @param thread Thread context for resolving data2 when needed + * @return true if sentinel values were resolved, false otherwise + */ +bool +rb_thread_resolve_unblock_function(rb_unblock_function_t **unblock_function, void **data2, struct rb_thread_struct *thread) +{ + rb_unblock_function_t *ubf = *unblock_function; + + if ((ubf == RUBY_UBF_IO) || (ubf == RUBY_UBF_PROCESS)) { + *unblock_function = ubf_select; + *data2 = thread; + return true; + } + return false; +} + void * rb_nogvl(void *(*func)(void *), void *data1, rb_unblock_function_t *ubf, void *data2, @@ -1539,7 +1599,7 @@ rb_nogvl(void *(*func)(void *), void *data1, if (flags & RB_NOGVL_OFFLOAD_SAFE) { VALUE scheduler = rb_fiber_scheduler_current(); if (scheduler != Qnil) { - struct rb_fiber_scheduler_blocking_operation_state state; + struct rb_fiber_scheduler_blocking_operation_state state = {0}; VALUE result = rb_fiber_scheduler_blocking_operation_wait(scheduler, func, data1, ubf, data2, flags, &state); @@ -1557,11 +1617,9 @@ rb_nogvl(void *(*func)(void *), void *data1, bool is_main_thread = vm->ractor.main_thread == th; int saved_errno = 0; - if ((ubf == RUBY_UBF_IO) || (ubf == RUBY_UBF_PROCESS)) { - ubf = ubf_select; - data2 = th; - } - else if (ubf && rb_ractor_living_thread_num(th->ractor) == 1 && is_main_thread) { + rb_thread_resolve_unblock_function(&ubf, &data2, th); + + if (ubf && rb_ractor_living_thread_num(th->ractor) == 1 && is_main_thread) { if (flags & RB_NOGVL_UBF_ASYNC_SAFE) { vm->ubf_async_safe = 1; } @@ -1690,46 +1748,146 @@ waitfd_to_waiting_flag(int wfd_event) return wfd_event << 1; } +static struct ccan_list_head * +rb_io_blocking_operations(struct rb_io *io) +{ + rb_serial_t fork_generation = GET_VM()->fork_gen; + + // On fork, all existing entries in this list (which are stack allocated) become invalid. + // Therefore, we re-initialize the list which clears it. + if (io->fork_generation != fork_generation) { + ccan_list_head_init(&io->blocking_operations); + io->fork_generation = fork_generation; + } + + return &io->blocking_operations; +} + +/* + * Registers a blocking operation for an IO object. This is used to track all threads and fibers + * that are currently blocked on this IO for reading, writing or other operations. + * + * When the IO is closed, all blocking operations will be notified via rb_fiber_scheduler_fiber_interrupt + * for fibers with a scheduler, or via rb_threadptr_interrupt for threads without a scheduler. + * + * @parameter io The IO object on which the operation will block + * @parameter blocking_operation The operation details including the execution context that will be blocked + */ static void -thread_io_setup_wfd(rb_thread_t *th, int fd, struct waiting_fd *wfd) +rb_io_blocking_operation_enter(struct rb_io *io, struct rb_io_blocking_operation *blocking_operation) { - wfd->fd = fd; - wfd->th = th; - wfd->busy = NULL; + ccan_list_add(rb_io_blocking_operations(io), &blocking_operation->list); +} - RB_VM_LOCK_ENTER(); - { - ccan_list_add(&th->vm->waiting_fds, &wfd->wfd_node); +static void +rb_io_blocking_operation_pop(struct rb_io *io, struct rb_io_blocking_operation *blocking_operation) +{ + ccan_list_del(&blocking_operation->list); +} + +struct io_blocking_operation_arguments { + struct rb_io *io; + struct rb_io_blocking_operation *blocking_operation; +}; + +static VALUE +io_blocking_operation_exit(VALUE _arguments) +{ + struct io_blocking_operation_arguments *arguments = (void*)_arguments; + struct rb_io_blocking_operation *blocking_operation = arguments->blocking_operation; + + rb_io_blocking_operation_pop(arguments->io, blocking_operation); + + rb_io_t *io = arguments->io; + rb_thread_t *thread = io->closing_ec->thread_ptr; + rb_fiber_t *fiber = io->closing_ec->fiber_ptr; + + if (thread->scheduler != Qnil) { + // This can cause spurious wakeups... + rb_fiber_scheduler_unblock(thread->scheduler, io->self, rb_fiberptr_self(fiber)); + } + else { + rb_thread_wakeup(thread->self); } - RB_VM_LOCK_LEAVE(); + + return Qnil; } +/* + * Called when a blocking operation completes or is interrupted. Removes the operation from + * the IO's blocking_operations list and wakes up any waiting threads/fibers. + * + * If there's a wakeup_mutex (meaning an IO close is in progress), synchronizes the cleanup + * through that mutex to ensure proper coordination with the closing thread. + * + * @parameter io The IO object the operation was performed on + * @parameter blocking_operation The completed operation to clean up + */ static void -thread_io_wake_pending_closer(struct waiting_fd *wfd) -{ - bool has_waiter = wfd->busy && RB_TEST(wfd->busy->wakeup_mutex); - if (has_waiter) { - rb_mutex_lock(wfd->busy->wakeup_mutex); - } - - /* Needs to be protected with RB_VM_LOCK because we don't know if - wfd is on the global list of pending FD ops or if it's on a - struct rb_io_close_wait_list close-waiter. */ - RB_VM_LOCK_ENTER(); - ccan_list_del(&wfd->wfd_node); - RB_VM_LOCK_LEAVE(); - - if (has_waiter) { - rb_thread_t *th = rb_thread_ptr(wfd->busy->closing_thread); - if (th->scheduler != Qnil) { - rb_fiber_scheduler_unblock(th->scheduler, wfd->busy->closing_thread, wfd->busy->closing_fiber); - } else { - rb_thread_wakeup(wfd->busy->closing_thread); - } - rb_mutex_unlock(wfd->busy->wakeup_mutex); +rb_io_blocking_operation_exit(struct rb_io *io, struct rb_io_blocking_operation *blocking_operation) +{ + VALUE wakeup_mutex = io->wakeup_mutex; + + // Indicate that the blocking operation is no longer active: + blocking_operation->ec = NULL; + + if (RB_TEST(wakeup_mutex)) { + struct io_blocking_operation_arguments arguments = { + .io = io, + .blocking_operation = blocking_operation + }; + + rb_mutex_synchronize(wakeup_mutex, io_blocking_operation_exit, (VALUE)&arguments); + } + else { + // If there's no wakeup_mutex, we can safely remove the operation directly: + rb_io_blocking_operation_pop(io, blocking_operation); } } +static VALUE +rb_thread_io_blocking_operation_ensure(VALUE _argument) +{ + struct io_blocking_operation_arguments *arguments = (void*)_argument; + + rb_io_blocking_operation_exit(arguments->io, arguments->blocking_operation); + + return Qnil; +} + +/* + * Executes a function that performs a blocking IO operation, while properly tracking + * the operation in the IO's blocking_operations list. This ensures proper cleanup + * and interruption handling if the IO is closed while blocked. + * + * The operation is automatically removed from the blocking_operations list when the function + * returns, whether normally or due to an exception. + * + * @parameter self The IO object + * @parameter function The function to execute that will perform the blocking operation + * @parameter argument The argument to pass to the function + * @returns The result of the blocking operation function + */ +VALUE +rb_thread_io_blocking_operation(VALUE self, VALUE(*function)(VALUE), VALUE argument) +{ + struct rb_io *io; + RB_IO_POINTER(self, io); + + rb_execution_context_t *ec = GET_EC(); + struct rb_io_blocking_operation blocking_operation = { + .ec = ec, + }; + rb_io_blocking_operation_enter(io, &blocking_operation); + + struct io_blocking_operation_arguments io_blocking_operation_arguments = { + .io = io, + .blocking_operation = &blocking_operation + }; + + return rb_ensure(function, argument, rb_thread_io_blocking_operation_ensure, (VALUE)&io_blocking_operation_arguments); +} + static bool thread_io_mn_schedulable(rb_thread_t *th, int events, const struct timeval *timeout) { @@ -1795,44 +1953,52 @@ rb_thread_mn_schedulable(VALUE thval) } VALUE -rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, int events) +rb_thread_io_blocking_call(struct rb_io* io, rb_blocking_function_t *func, void *data1, int events) { - rb_execution_context_t *volatile ec = GET_EC(); - rb_thread_t *volatile th = rb_ec_thread_ptr(ec); + rb_execution_context_t * volatile ec = GET_EC(); + rb_thread_t * volatile th = rb_ec_thread_ptr(ec); - RUBY_DEBUG_LOG("th:%u fd:%d ev:%d", rb_th_serial(th), fd, events); + RUBY_DEBUG_LOG("th:%u fd:%d ev:%d", rb_th_serial(th), io->fd, events); - struct waiting_fd waiting_fd; volatile VALUE val = Qundef; /* shouldn't be used */ volatile int saved_errno = 0; enum ruby_tag_type state; volatile bool prev_mn_schedulable = th->mn_schedulable; th->mn_schedulable = thread_io_mn_schedulable(th, events, NULL); + int fd = io->fd; + // `errno` is only valid when there is an actual error - but we can't // extract that from the return value of `func` alone, so we clear any // prior `errno` value here so that we can later check if it was set by // `func` or not (as opposed to some previously set value). errno = 0; - thread_io_setup_wfd(th, fd, &waiting_fd); + struct rb_io_blocking_operation blocking_operation = { + .ec = ec, + }; + rb_io_blocking_operation_enter(io, &blocking_operation); + { EC_PUSH_TAG(ec); if ((state = EC_EXEC_TAG()) == TAG_NONE) { volatile enum ruby_tag_type saved_state = state; /* for BLOCKING_REGION */ retry: - BLOCKING_REGION(waiting_fd.th, { + BLOCKING_REGION(th, { val = func(data1); saved_errno = errno; - }, ubf_select, waiting_fd.th, FALSE); + }, ubf_select, th, FALSE); - th = rb_ec_thread_ptr(ec); + RUBY_ASSERT(th == rb_ec_thread_ptr(ec)); if (events && blocking_call_retryable_p((int)val, saved_errno) && thread_io_wait_events(th, fd, events, NULL)) { RUBY_VM_CHECK_INTS_BLOCKING(ec); goto retry; } + + RUBY_VM_CHECK_INTS_BLOCKING(ec); + state = saved_state; } EC_POP_TAG(); @@ -1840,17 +2006,12 @@ rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, in th = rb_ec_thread_ptr(ec); th->mn_schedulable = prev_mn_schedulable; } - /* - * must be deleted before jump - * this will delete either from waiting_fds or on-stack struct rb_io_close_wait_list - */ - thread_io_wake_pending_closer(&waiting_fd); + + rb_io_blocking_operation_exit(io, &blocking_operation); if (state) { EC_JUMP_TAG(ec, state); } - /* TODO: check func() */ - RUBY_VM_CHECK_INTS_BLOCKING(ec); // If the error was a timeout, we raise a specific exception for that: if (saved_errno == ETIMEDOUT) { @@ -1863,9 +2024,9 @@ rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, in } VALUE -rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd) +rb_thread_io_blocking_region(struct rb_io *io, rb_blocking_function_t *func, void *data1) { - return rb_thread_io_blocking_call(func, data1, fd, 0); + return rb_thread_io_blocking_call(io, func, data1, 0); } /* @@ -1895,6 +2056,9 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd) * created as Ruby thread (created by Thread.new or so). In other * words, this function *DOES NOT* associate or convert a NON-Ruby * thread to a Ruby thread. + * + * NOTE: If this thread has already acquired the GVL, then the method call + * is performed without acquiring or releasing the GVL (from Ruby 4.0). */ void * rb_thread_call_with_gvl(void *(*func)(void *), void *data1) @@ -1918,7 +2082,8 @@ rb_thread_call_with_gvl(void *(*func)(void *), void *data1) prev_unblock = th->unblock; if (brb == 0) { - rb_bug("rb_thread_call_with_gvl: called by a thread which has GVL."); + /* the GVL is already acquired, call method directly */ + return (*func)(data1); } blocking_region_end(th, brb); @@ -1928,17 +2093,12 @@ rb_thread_call_with_gvl(void *(*func)(void *), void *data1) int released = blocking_region_begin(th, brb, prev_unblock.func, prev_unblock.arg, FALSE); RUBY_ASSERT_ALWAYS(released); RB_VM_SAVE_MACHINE_CONTEXT(th); - thread_sched_to_waiting(TH_SCHED(th), th); + thread_sched_to_waiting(TH_SCHED(th), th, true); return r; } /* * ruby_thread_has_gvl_p - check if current native thread has GVL. - * - *** - *** This API is EXPERIMENTAL! - *** We do not guarantee that this API remains in ruby 1.9.2 or later. - *** */ int @@ -2424,6 +2584,7 @@ NORETURN(static void rb_threadptr_to_kill(rb_thread_t *th)); static void rb_threadptr_to_kill(rb_thread_t *th) { + VM_ASSERT(GET_THREAD() == th); rb_threadptr_pending_interrupt_clear(th); th->status = THREAD_RUNNABLE; th->to_kill = 1; @@ -2438,8 +2599,9 @@ threadptr_get_interrupts(rb_thread_t *th) rb_atomic_t interrupt; rb_atomic_t old; + old = ATOMIC_LOAD_RELAXED(ec->interrupt_flag); do { - interrupt = ec->interrupt_flag; + interrupt = old; old = ATOMIC_CAS(ec->interrupt_flag, interrupt, interrupt & ec->interrupt_mask); } while (old != interrupt); return interrupt & (rb_atomic_t)~ec->interrupt_mask; @@ -2447,6 +2609,11 @@ threadptr_get_interrupts(rb_thread_t *th) static void threadptr_interrupt_exec_exec(rb_thread_t *th); +// Execute interrupts on currently running thread +// In certain situations, calling this function will raise an exception. Some examples are: +// * during VM shutdown (`rb_ractor_terminate_all`) +// * Call to Thread#exit for current thread (`rb_thread_kill`) +// * Call to Thread#raise for current thread int rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing) { @@ -2454,6 +2621,8 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing) int postponed_job_interrupt = 0; int ret = FALSE; + VM_ASSERT(GET_THREAD() == th); + if (th->ec->raised_flag) return ret; while ((interrupt = threadptr_get_interrupts(th)) != 0) { @@ -2470,8 +2639,7 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing) terminate_interrupt = interrupt & TERMINATE_INTERRUPT_MASK; // request from other ractors if (interrupt & VM_BARRIER_INTERRUPT_MASK) { - RB_VM_LOCK_ENTER(); - RB_VM_LOCK_LEAVE(); + RB_VM_LOCKING(); } if (postponed_job_interrupt) { @@ -2592,6 +2760,7 @@ rb_threadptr_raise(rb_thread_t *target_th, int argc, VALUE *argv) rb_ec_setup_exception(GET_EC(), exc, Qundef); rb_threadptr_pending_interrupt_enque(target_th, exc); rb_threadptr_interrupt(target_th); + return Qnil; } @@ -2637,86 +2806,122 @@ rb_ec_reset_raised(rb_execution_context_t *ec) return 1; } -int -rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy) +/* + * Thread-safe IO closing mechanism. + * + * When an IO is closed while other threads or fibers are blocked on it, we need to: + * 1. Track and notify all blocking operations through io->blocking_operations + * 2. Ensure only one thread can close at a time using io->closing_ec + * 3. Synchronize cleanup using wakeup_mutex + * + * The close process works as follows: + * - First check if any thread is already closing (io->closing_ec) + * - Set up wakeup_mutex for synchronization + * - Iterate through all blocking operations in io->blocking_operations + * - For each blocked fiber with a scheduler: + * - Notify via rb_fiber_scheduler_fiber_interrupt + * - For each blocked thread without a scheduler: + * - Enqueue IOError via rb_threadptr_pending_interrupt_enque + * - Wake via rb_threadptr_interrupt + * - Wait on wakeup_mutex until all operations are cleaned up + * - Only then clear closing state and allow actual close to proceed + */ +static VALUE +thread_io_close_notify_all(VALUE _io) { - rb_vm_t *vm = GET_THREAD()->vm; - struct waiting_fd *wfd = 0, *next; - ccan_list_head_init(&busy->pending_fd_users); - int has_any; - VALUE wakeup_mutex; + struct rb_io *io = (struct rb_io *)_io; - RB_VM_LOCK_ENTER(); - { - ccan_list_for_each_safe(&vm->waiting_fds, wfd, next, wfd_node) { - if (wfd->fd == fd) { - rb_thread_t *th = wfd->th; - VALUE err; - - ccan_list_del(&wfd->wfd_node); - ccan_list_add(&busy->pending_fd_users, &wfd->wfd_node); - - wfd->busy = busy; - err = th->vm->special_exceptions[ruby_error_stream_closed]; - rb_threadptr_pending_interrupt_enque(th, err); - rb_threadptr_interrupt(th); + size_t count = 0; + rb_vm_t *vm = io->closing_ec->thread_ptr->vm; + VALUE error = vm->special_exceptions[ruby_error_stream_closed]; + + struct rb_io_blocking_operation *blocking_operation; + ccan_list_for_each(rb_io_blocking_operations(io), blocking_operation, list) { + rb_execution_context_t *ec = blocking_operation->ec; + + // If the operation is in progress, we need to interrupt it: + if (ec) { + rb_thread_t *thread = ec->thread_ptr; + + VALUE result = RUBY_Qundef; + if (thread->scheduler != Qnil) { + result = rb_fiber_scheduler_fiber_interrupt(thread->scheduler, rb_fiberptr_self(ec->fiber_ptr), error); + } + + if (result == RUBY_Qundef) { + // If the thread is not the current thread, we need to enqueue an error: + rb_threadptr_pending_interrupt_enque(thread, error); + rb_threadptr_interrupt(thread); } } + + count += 1; } - has_any = !ccan_list_empty(&busy->pending_fd_users); - busy->closing_thread = rb_thread_current(); - busy->closing_fiber = rb_fiber_current(); - wakeup_mutex = Qnil; - if (has_any) { - wakeup_mutex = rb_mutex_new(); - RBASIC_CLEAR_CLASS(wakeup_mutex); /* hide from ObjectSpace */ + return (VALUE)count; +} + +size_t +rb_thread_io_close_interrupt(struct rb_io *io) +{ + // We guard this operation based on `io->closing_ec` -> only one thread will ever enter this function. + if (io->closing_ec) { + return 0; } - busy->wakeup_mutex = wakeup_mutex; - RB_VM_LOCK_LEAVE(); + // If there are no blocking operations, we are done: + if (ccan_list_empty(rb_io_blocking_operations(io))) { + return 0; + } + + // Otherwise, we are now closing the IO: + rb_execution_context_t *ec = GET_EC(); + io->closing_ec = ec; + + // This is used to ensure the correct execution context is woken up after the blocking operation is interrupted: + io->wakeup_mutex = rb_mutex_new(); + rb_mutex_allow_trap(io->wakeup_mutex, 1); + + // We need to use a mutex here as entering the fiber scheduler may cause a context switch: + VALUE result = rb_mutex_synchronize(io->wakeup_mutex, thread_io_close_notify_all, (VALUE)io); - /* If the caller didn't pass *busy as a pointer to something on the stack, - we need to guard this mutex object on _our_ C stack for the duration - of this function. */ - RB_GC_GUARD(wakeup_mutex); - return has_any; + return (size_t)result; } void -rb_notify_fd_close_wait(struct rb_io_close_wait_list *busy) +rb_thread_io_close_wait(struct rb_io* io) { - if (!RB_TEST(busy->wakeup_mutex)) { - /* There was nobody else using this file when we closed it, so we - never bothered to allocate a mutex*/ + VALUE wakeup_mutex = io->wakeup_mutex; + + if (!RB_TEST(wakeup_mutex)) { + // There was nobody else using this file when we closed it, so we never bothered to allocate a mutex: return; } - rb_mutex_lock(busy->wakeup_mutex); - while (!ccan_list_empty(&busy->pending_fd_users)) { - rb_mutex_sleep(busy->wakeup_mutex, Qnil); + rb_mutex_lock(wakeup_mutex); + while (!ccan_list_empty(rb_io_blocking_operations(io))) { + rb_mutex_sleep(wakeup_mutex, Qnil); } - rb_mutex_unlock(busy->wakeup_mutex); + rb_mutex_unlock(wakeup_mutex); + + // We are done closing: + io->wakeup_mutex = Qnil; + io->closing_ec = NULL; } void rb_thread_fd_close(int fd) { - struct rb_io_close_wait_list busy; - - if (rb_notify_fd_close(fd, &busy)) { - rb_notify_fd_close_wait(&busy); - } + rb_warn("rb_thread_fd_close is deprecated (and is now a no-op)."); } /* * call-seq: - * thr.raise - * thr.raise(string) - * thr.raise(exception [, string [, array]]) + * raise(exception, message = exception.to_s, backtrace = nil, cause: $!) + * raise(message = nil, cause: $!) * * Raises an exception from the given thread. The caller does not have to be - * +thr+. See Kernel#raise for more information. + * +thr+. See Kernel#raise for more information on arguments. * * Thread.abort_on_exception = true * a = Thread.new { sleep(200) } @@ -2737,7 +2942,14 @@ thread_raise_m(int argc, VALUE *argv, VALUE self) const rb_thread_t *current_th = GET_THREAD(); threadptr_check_pending_interrupt_queue(target_th); - rb_threadptr_raise(target_th, argc, argv); + + if (rb_threadptr_dead(target_th)) { + return Qnil; + } + + VALUE exception = rb_exception_setup(argc, argv); + rb_threadptr_pending_interrupt_enque(target_th, exception); + rb_threadptr_interrupt(target_th); /* To perform Thread.current.raise as Kernel.raise */ if (current_th == target_th) { @@ -2755,7 +2967,10 @@ thread_raise_m(int argc, VALUE *argv, VALUE self) * * Terminates +thr+ and schedules another thread to be run, returning * the terminated Thread. If this is the main thread, or the last - * thread, exits the process. + * thread, exits the process. Note that the caller does not wait for + * the thread to terminate if the receiver is different from the currently + * running thread. The termination is asynchronous, and the thread can still + * run a small amount of ruby code before exiting. */ VALUE @@ -4052,13 +4267,20 @@ rb_fd_init(rb_fdset_t *fds) FD_ZERO(fds->fdset); } +static inline size_t +fdset_memsize(int maxfd) +{ + size_t o = howmany(maxfd, NFDBITS) * sizeof(fd_mask); + if (o < sizeof(fd_set)) { + return sizeof(fd_set); + } + return o; +} + void rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src) { - size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask); - - if (size < sizeof(fd_set)) - size = sizeof(fd_set); + size_t size = fdset_memsize(rb_fd_max(src)); dst->maxfd = src->maxfd; dst->fdset = xmalloc(size); memcpy(dst->fdset, src->fdset, size); @@ -4067,7 +4289,7 @@ rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src) void rb_fd_term(rb_fdset_t *fds) { - xfree(fds->fdset); + ruby_xfree_sized(fds->fdset, fdset_memsize(fds->maxfd)); fds->maxfd = 0; fds->fdset = 0; } @@ -4082,14 +4304,11 @@ rb_fd_zero(rb_fdset_t *fds) static void rb_fd_resize(int n, rb_fdset_t *fds) { - size_t m = howmany(n + 1, NFDBITS) * sizeof(fd_mask); - size_t o = howmany(fds->maxfd, NFDBITS) * sizeof(fd_mask); - - if (m < sizeof(fd_set)) m = sizeof(fd_set); - if (o < sizeof(fd_set)) o = sizeof(fd_set); + size_t m = fdset_memsize(n + 1); + size_t o = fdset_memsize(fds->maxfd); if (m > o) { - fds->fdset = xrealloc(fds->fdset, m); + fds->fdset = ruby_xrealloc_sized(fds->fdset, m, o); memset((char *)fds->fdset + o, 0, m - o); } if (n >= fds->maxfd) fds->maxfd = n + 1; @@ -4119,23 +4338,18 @@ rb_fd_isset(int n, const rb_fdset_t *fds) void rb_fd_copy(rb_fdset_t *dst, const fd_set *src, int max) { - size_t size = howmany(max, NFDBITS) * sizeof(fd_mask); - - if (size < sizeof(fd_set)) size = sizeof(fd_set); + size_t size = fdset_memsize(max); + dst->fdset = ruby_xrealloc_sized(dst->fdset, size, fdset_memsize(dst->maxfd)); dst->maxfd = max; - dst->fdset = xrealloc(dst->fdset, size); memcpy(dst->fdset, src, size); } void rb_fd_dup(rb_fdset_t *dst, const rb_fdset_t *src) { - size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask); - - if (size < sizeof(fd_set)) - size = sizeof(fd_set); + size_t size = fdset_memsize(rb_fd_max(src)); + dst->fdset = ruby_xrealloc_sized(dst->fdset, size, fdset_memsize(dst->maxfd)); dst->maxfd = src->maxfd; - dst->fdset = xrealloc(dst->fdset, size); memcpy(dst->fdset, src->fdset, size); } @@ -4187,10 +4401,19 @@ rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src) rb_fd_dup(dst, src); } +static inline size_t +fdset_memsize(int capa) +{ + if (capa == FD_SETSIZE) { + return sizeof(fd_set); + } + return sizeof(unsigned int) + (capa * sizeof(SOCKET)); +} + void rb_fd_term(rb_fdset_t *set) { - xfree(set->fdset); + ruby_xfree_sized(set->fdset, fdset_memsize(set->capa)); set->fdset = NULL; set->capa = 0; } @@ -4320,6 +4543,8 @@ do_select(VALUE p) RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); /* may raise */ } while (wait_retryable(&result, lerrno, to, endtime) && do_select_update()); + RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); + if (result < 0) { errno = lerrno; } @@ -4400,8 +4625,8 @@ wait_for_single_fd_blocking_region(rb_thread_t *th, struct pollfd *fds, nfds_t n /* * returns a mask of events */ -int -rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout) +static int +thread_io_wait(rb_thread_t *th, struct rb_io *io, int fd, int events, struct timeval *timeout) { struct pollfd fds[1] = {{ .fd = fd, @@ -4410,14 +4635,17 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout) }}; volatile int result = 0; nfds_t nfds; - struct waiting_fd wfd; + struct rb_io_blocking_operation blocking_operation; enum ruby_tag_type state; volatile int lerrno; - rb_execution_context_t *ec = GET_EC(); - rb_thread_t *th = rb_ec_thread_ptr(ec); + RUBY_ASSERT(th); + rb_execution_context_t *ec = th->ec; - thread_io_setup_wfd(th, fd, &wfd); + if (io) { + blocking_operation.ec = ec; + rb_io_blocking_operation_enter(io, &blocking_operation); + } if (timeout == NULL && thread_io_wait_events(th, fd, events, NULL)) { // fd is readable @@ -4426,25 +4654,30 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout) errno = 0; } else { - EC_PUSH_TAG(wfd.th->ec); + EC_PUSH_TAG(ec); if ((state = EC_EXEC_TAG()) == TAG_NONE) { rb_hrtime_t *to, rel, end = 0; - RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec); + RUBY_VM_CHECK_INTS_BLOCKING(ec); timeout_prepare(&to, &rel, &end, timeout); do { nfds = numberof(fds); - result = wait_for_single_fd_blocking_region(wfd.th, fds, nfds, to, &lerrno); + result = wait_for_single_fd_blocking_region(th, fds, nfds, to, &lerrno); - RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec); + RUBY_VM_CHECK_INTS_BLOCKING(ec); } while (wait_retryable(&result, lerrno, to, end)); + + RUBY_VM_CHECK_INTS_BLOCKING(ec); } + EC_POP_TAG(); } - thread_io_wake_pending_closer(&wfd); + if (io) { + rb_io_blocking_operation_exit(io, &blocking_operation); + } if (state) { - EC_JUMP_TAG(wfd.th->ec, state); + EC_JUMP_TAG(ec, state); } if (result < 0) { @@ -4477,6 +4710,9 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout) } #else /* ! USE_POLL - implement rb_io_poll_fd() using select() */ struct select_args { + struct rb_io *io; + struct rb_io_blocking_operation *blocking_operation; + union { int fd; int error; @@ -4484,7 +4720,6 @@ struct select_args { rb_fdset_t *read; rb_fdset_t *write; rb_fdset_t *except; - struct waiting_fd wfd; struct timeval *tv; }; @@ -4515,7 +4750,10 @@ select_single_cleanup(VALUE ptr) { struct select_args *args = (struct select_args *)ptr; - thread_io_wake_pending_closer(&args->wfd); + if (args->blocking_operation) { + rb_io_blocking_operation_exit(args->io, args->blocking_operation); + } + if (args->read) rb_fd_term(args->read); if (args->write) rb_fd_term(args->write); if (args->except) rb_fd_term(args->except); @@ -4535,31 +4773,52 @@ init_set_fd(int fd, rb_fdset_t *fds) return fds; } -int -rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout) +static int +thread_io_wait(rb_thread_t *th, struct rb_io *io, int fd, int events, struct timeval *timeout) { rb_fdset_t rfds, wfds, efds; struct select_args args; - int r; VALUE ptr = (VALUE)&args; - rb_execution_context_t *ec = GET_EC(); - rb_thread_t *th = rb_ec_thread_ptr(ec); + + struct rb_io_blocking_operation blocking_operation; + if (io) { + args.io = io; + blocking_operation.ec = th->ec; + rb_io_blocking_operation_enter(io, &blocking_operation); + args.blocking_operation = &blocking_operation; + } + else { + args.io = NULL; + blocking_operation.ec = NULL; + args.blocking_operation = NULL; + } args.as.fd = fd; args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL; args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL; args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL; args.tv = timeout; - thread_io_setup_wfd(th, fd, &args.wfd); - r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr); - if (r == -1) + int result = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr); + if (result == -1) errno = args.as.error; - return r; + return result; } #endif /* ! USE_POLL */ +int +rb_thread_wait_for_single_fd(rb_thread_t *th, int fd, int events, struct timeval *timeout) +{ + return thread_io_wait(th, NULL, fd, events, timeout); +} + +int +rb_thread_io_wait(rb_thread_t *th, struct rb_io *io, int events, struct timeval * timeout) +{ + return thread_io_wait(th, io, io->fd, events, timeout); +} + /* * for GC */ @@ -4570,7 +4829,7 @@ rb_gc_set_stack_end(VALUE **stack_end_p) { VALUE stack_end; COMPILER_WARNING_PUSH -#if __has_warning("-Wdangling-pointer") +#if RBIMPL_COMPILER_IS(GCC) COMPILER_WARNING_IGNORED(-Wdangling-pointer); #endif *stack_end_p = &stack_end; @@ -4724,9 +4983,13 @@ rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const r thread_sched_atfork(TH_SCHED(th)); ubf_list_atfork(); + rb_signal_atfork(); // OK. Only this thread accesses: ccan_list_for_each(&vm->ractor.set, r, vmlr_node) { + if (r != vm->ractor.main_ractor) { + rb_ractor_terminate_atfork(vm, r); + } ccan_list_for_each(&r->threads.set, i, lt_node) { atfork(i, th); } @@ -4756,6 +5019,10 @@ static void terminate_atfork_i(rb_thread_t *th, const rb_thread_t *current_th) { if (th != current_th) { + // Clear the scheduler as it is no longer operational: + th->scheduler = Qnil; + + rb_native_mutex_initialize(&th->interrupt_lock); rb_mutex_abandon_keeping_mutexes(th); rb_mutex_abandon_locking_mutex(th); thread_cleanup_func(th, TRUE); @@ -4770,6 +5037,7 @@ rb_thread_atfork(void) rb_threadptr_pending_interrupt_clear(th); rb_thread_atfork_internal(th, terminate_atfork_i); th->join_list = NULL; + th->scheduler = Qnil; rb_fiber_atfork(th); /* We don't want reproduce CVE-2003-0900. */ @@ -5039,7 +5307,7 @@ rb_thread_shield_owned(VALUE self) rb_mutex_t *m = mutex_ptr(mutex); - return m->fiber == GET_EC()->fiber_ptr; + return m->ec_serial == rb_ec_serial(GET_EC()); } /* @@ -5058,7 +5326,7 @@ rb_thread_shield_wait(VALUE self) if (!mutex) return Qfalse; m = mutex_ptr(mutex); - if (m->fiber == GET_EC()->fiber_ptr) return Qnil; + if (m->ec_serial == rb_ec_serial(GET_EC())) return Qnil; rb_thread_shield_waiting_inc(self); rb_mutex_lock(mutex); rb_thread_shield_waiting_dec(self); @@ -5426,7 +5694,6 @@ Init_Thread_Mutex(void) void Init_Thread(void) { - VALUE cThGroup; rb_thread_t *th = GET_THREAD(); sym_never = ID2SYM(rb_intern_const("never")); @@ -5576,8 +5843,8 @@ debug_deadlock_check(rb_ractor_t *r, VALUE msg) if (th->locking_mutex) { rb_mutex_t *mutex = mutex_ptr(th->locking_mutex); - rb_str_catf(msg, " mutex:%p cond:%"PRIuSIZE, - (void *)mutex->fiber, rb_mutex_num_waiting(mutex)); + rb_str_catf(msg, " mutex:%llu cond:%"PRIuSIZE, + (unsigned long long)mutex->ec_serial, rb_mutex_num_waiting(mutex)); } { @@ -5617,7 +5884,7 @@ rb_check_deadlock(rb_ractor_t *r) } else if (th->locking_mutex) { rb_mutex_t *mutex = mutex_ptr(th->locking_mutex); - if (mutex->fiber == th->ec->fiber_ptr || (!mutex->fiber && !ccan_list_empty(&mutex->waitq))) { + if (mutex->ec_serial == rb_ec_serial(th->ec) || (!mutex->ec_serial && !ccan_list_empty(&mutex->waitq))) { found = 1; } } @@ -5635,35 +5902,21 @@ rb_check_deadlock(rb_ractor_t *r) } } -// Used for VM memsize reporting. Returns the size of a list of waiting_fd -// structs. Defined here because the struct definition lives here as well. -size_t -rb_vm_memsize_waiting_fds(struct ccan_list_head *waiting_fds) -{ - struct waiting_fd *waitfd = 0; - size_t size = 0; - - ccan_list_for_each(waiting_fds, waitfd, wfd_node) { - size += sizeof(struct waiting_fd); - } - - return size; -} - static void update_line_coverage(VALUE data, const rb_trace_arg_t *trace_arg) { const rb_control_frame_t *cfp = GET_EC()->cfp; - VALUE coverage = rb_iseq_coverage(cfp->iseq); + VALUE coverage = rb_iseq_coverage(CFP_ISEQ(cfp)); if (RB_TYPE_P(coverage, T_ARRAY) && !RBASIC_CLASS(coverage)) { VALUE lines = RARRAY_AREF(coverage, COVERAGE_INDEX_LINES); if (lines) { long line = rb_sourceline() - 1; + VM_ASSERT(line >= 0); long count; VALUE num; void rb_iseq_clear_event_flags(const rb_iseq_t *iseq, size_t pos, rb_event_flag_t reset); if (GET_VM()->coverage_mode & COVERAGE_TARGET_ONESHOT_LINES) { - rb_iseq_clear_event_flags(cfp->iseq, cfp->pc - ISEQ_BODY(cfp->iseq)->iseq_encoded - 1, RUBY_EVENT_COVERAGE_LINE); + rb_iseq_clear_event_flags(CFP_ISEQ(cfp), CFP_PC(cfp) - ISEQ_BODY(CFP_ISEQ(cfp))->iseq_encoded - 1, RUBY_EVENT_COVERAGE_LINE); rb_ary_push(lines, LONG2FIX(line + 1)); return; } @@ -5684,12 +5937,12 @@ static void update_branch_coverage(VALUE data, const rb_trace_arg_t *trace_arg) { const rb_control_frame_t *cfp = GET_EC()->cfp; - VALUE coverage = rb_iseq_coverage(cfp->iseq); + VALUE coverage = rb_iseq_coverage(CFP_ISEQ(cfp)); if (RB_TYPE_P(coverage, T_ARRAY) && !RBASIC_CLASS(coverage)) { VALUE branches = RARRAY_AREF(coverage, COVERAGE_INDEX_BRANCHES); if (branches) { - long pc = cfp->pc - ISEQ_BODY(cfp->iseq)->iseq_encoded - 1; - long idx = FIX2INT(RARRAY_AREF(ISEQ_PC2BRANCHINDEX(cfp->iseq), pc)), count; + long pc = CFP_PC(cfp) - ISEQ_BODY(CFP_ISEQ(cfp))->iseq_encoded - 1; + long idx = FIX2INT(RARRAY_AREF(ISEQ_PC2BRANCHINDEX(CFP_ISEQ(cfp)), pc)), count; VALUE counters = RARRAY_AREF(branches, 1); VALUE num = RARRAY_AREF(counters, idx); count = FIX2LONG(num) + 1; @@ -6010,7 +6263,7 @@ rb_threadptr_interrupt_exec(rb_thread_t *th, rb_interrupt_exec_func_t *func, voi rb_native_mutex_lock(&th->interrupt_lock); { ccan_list_add_tail(&th->interrupt_exec_tasks, &task->node); - threadptr_interrupt_locked(th, true); + threadptr_set_interrupt_locked(th, true); } rb_native_mutex_unlock(&th->interrupt_lock); } @@ -6027,9 +6280,16 @@ threadptr_interrupt_exec_exec(rb_thread_t *th) } rb_native_mutex_unlock(&th->interrupt_lock); + RUBY_DEBUG_LOG("task:%p", task); + if (task) { - (*task->func)(task->data); - ruby_xfree(task); + if (task->flags & rb_interrupt_exec_flag_new_thread) { + rb_thread_create(task->func, task->data); + } + else { + (*task->func)(task->data); + } + SIZED_FREE(task); } else { break; @@ -6045,46 +6305,21 @@ threadptr_interrupt_exec_cleanup(rb_thread_t *th) struct rb_interrupt_exec_task *task; while ((task = ccan_list_pop(&th->interrupt_exec_tasks, struct rb_interrupt_exec_task, node)) != NULL) { - ruby_xfree(task); + SIZED_FREE(task); } } rb_native_mutex_unlock(&th->interrupt_lock); } -struct interrupt_ractor_new_thread_data { - rb_interrupt_exec_func_t *func; - void *data; -}; - -static VALUE -interrupt_ractor_new_thread_func(void *data) -{ - struct interrupt_ractor_new_thread_data d = *(struct interrupt_ractor_new_thread_data *)data; - ruby_xfree(data); - - d.func(d.data); - return Qnil; -} - -static VALUE -interrupt_ractor_func(void *data) -{ - rb_thread_create(interrupt_ractor_new_thread_func, data); - return Qnil; -} - // native thread safe // func/data should be native thread safe void rb_ractor_interrupt_exec(struct rb_ractor_struct *target_r, rb_interrupt_exec_func_t *func, void *data, enum rb_interrupt_exec_flag flags) { - struct interrupt_ractor_new_thread_data *d = ALLOC(struct interrupt_ractor_new_thread_data); + RUBY_DEBUG_LOG("flags:%d", (int)flags); - d->func = func; - d->data = data; rb_thread_t *main_th = target_r->threads.main; - rb_threadptr_interrupt_exec(main_th, interrupt_ractor_func, d, flags); - - // TODO MEMO: we can create a new thread in a ractor, but not sure how to do that now. + rb_threadptr_interrupt_exec(main_th, func, data, flags | rb_interrupt_exec_flag_new_thread); } + |
