Diffstat (limited to 'thread.c')
| -rw-r--r-- | thread.c | 4182 |
1 file changed, 2479 insertions, 1703 deletions
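This commit replaces the global GVL helpers (gvl_acquire/gvl_release, GVL_UNLOCK_BEGIN/END) with the per-ractor thread scheduler API (TH_SCHED(), thread_sched_to_waiting/thread_sched_to_running), reworks rb_nogvl() and BLOCKING_REGION, and centralizes resolution of the RUBY_UBF_IO/RUBY_UBF_PROCESS sentinels in rb_thread_resolve_unblock_function(). For context, the sketch below shows how a C extension typically drives this machinery from the outside. It is not part of the patch: struct read_args, blocking_read() and ext_read() are hypothetical names, and only rb_thread_call_without_gvl() and RUBY_UBF_IO are the public API that the diff touches.

/* Hypothetical extension code (not part of this patch): release the GVL
 * around a blocking read(2), using RUBY_UBF_IO so the blocked thread can
 * still be interrupted by Thread#kill or signals. */
#include <ruby.h>
#include <ruby/thread.h>
#include <unistd.h>

struct read_args { int fd; char *buf; size_t len; ssize_t result; };

static void *
blocking_read(void *ptr)
{
    struct read_args *args = ptr;        /* runs without the GVL */
    args->result = read(args->fd, args->buf, args->len);
    return NULL;
}

static VALUE
ext_read(VALUE self, VALUE fd)
{
    char buf[4096];
    struct read_args args = { NUM2INT(fd), buf, sizeof(buf), -1 };

    /* RUBY_UBF_IO is a sentinel; this patch resolves it to ubf_select in
     * rb_thread_resolve_unblock_function() before entering BLOCKING_REGION. */
    rb_thread_call_without_gvl(blocking_read, &args, RUBY_UBF_IO, NULL);

    if (args.result < 0) rb_sys_fail("read");
    return rb_str_new(buf, args.result);
}

The diff itself starts with the include reshuffle and the removal of the old THREAD_DEBUG tracing machinery.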
@@ -63,24 +63,44 @@ /* for model 2 */ -#include "ruby/config.h" -#include "ruby/io.h" +#include "ruby/internal/config.h" + +#ifdef __linux__ +// Normally, gcc(1) translates calls to alloca() with inlined code. This is not done when either the -ansi, -std=c89, -std=c99, or the -std=c11 option is given and the header <alloca.h> is not included. +# include <alloca.h> +#endif + +#define TH_SCHED(th) (&(th)->ractor->threads.sched) + #include "eval_intern.h" -#include "gc.h" -#include "timev.h" -#include "ruby/thread.h" -#include "ruby/thread_native.h" -#include "ruby/debug.h" +#include "hrtime.h" #include "internal.h" +#include "internal/class.h" +#include "internal/cont.h" +#include "internal/error.h" +#include "internal/eval.h" +#include "internal/gc.h" +#include "internal/hash.h" +#include "internal/io.h" +#include "internal/object.h" +#include "internal/proc.h" +#include "ruby/fiber/scheduler.h" +#include "internal/signal.h" +#include "internal/thread.h" +#include "internal/time.h" +#include "internal/warnings.h" #include "iseq.h" +#include "ruby/debug.h" +#include "ruby/io.h" +#include "ruby/thread.h" +#include "ruby/thread_native.h" +#include "timev.h" #include "vm_core.h" -#include "mjit.h" -#include "hrtime.h" +#include "ractor_core.h" +#include "vm_debug.h" +#include "vm_sync.h" -#ifdef __linux__ -// Normally, gcc(1) translates calls to alloca() with inlined code. This is not done when either the -ansi, -std=c89, -std=c99, or the -std=c11 option is given and the header <alloca.h> is not included. -#include <alloca.h> -#endif +#include "ccan/list/list.h" #ifndef USE_NATIVE_THREAD_PRIORITY #define USE_NATIVE_THREAD_PRIORITY 0 @@ -88,50 +108,49 @@ #define RUBY_THREAD_PRIORITY_MIN -3 #endif -#ifndef THREAD_DEBUG -#define THREAD_DEBUG 0 -#endif - static VALUE rb_cThreadShield; +static VALUE cThGroup; static VALUE sym_immediate; static VALUE sym_on_blocking; static VALUE sym_never; -static ID id_locals; + +static uint32_t thread_default_quantum_ms = 100; + +#define THREAD_LOCAL_STORAGE_INITIALISED FL_USER13 +#define THREAD_LOCAL_STORAGE_INITIALISED_P(th) RB_FL_TEST_RAW((th), THREAD_LOCAL_STORAGE_INITIALISED) + +static inline VALUE +rb_thread_local_storage(VALUE thread) +{ + if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) { + rb_ivar_set(thread, idLocals, rb_hash_new()); + RB_FL_SET_RAW(thread, THREAD_LOCAL_STORAGE_INITIALISED); + } + return rb_ivar_get(thread, idLocals); +} enum SLEEP_FLAGS { - SLEEP_DEADLOCKABLE = 0x1, - SLEEP_SPURIOUS_CHECK = 0x2 + SLEEP_DEADLOCKABLE = 0x01, + SLEEP_SPURIOUS_CHECK = 0x02, + SLEEP_ALLOW_SPURIOUS = 0x04, + SLEEP_NO_CHECKINTS = 0x08, }; -static void sleep_hrtime(rb_thread_t *, rb_hrtime_t, unsigned int fl); static void sleep_forever(rb_thread_t *th, unsigned int fl); -static void rb_thread_sleep_deadly_allow_spurious_wakeup(void); +static int sleep_hrtime(rb_thread_t *, rb_hrtime_t, unsigned int fl); + +static void rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker, VALUE timeout, rb_hrtime_t end); static int rb_threadptr_dead(rb_thread_t *th); -static void rb_check_deadlock(rb_vm_t *vm); +static void rb_check_deadlock(rb_ractor_t *r); static int rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th); static const char *thread_status_name(rb_thread_t *th, int detail); static int hrtime_update_expire(rb_hrtime_t *, const rb_hrtime_t); NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd)); -static int consume_communication_pipe(int fd); -static int check_signals_nogvl(rb_thread_t *, int sigwait_fd); -void 
rb_sigwait_fd_migrate(rb_vm_t *); /* process.c */ +MAYBE_UNUSED(static int consume_communication_pipe(int fd)); -#define eKillSignal INT2FIX(0) -#define eTerminateSignal INT2FIX(1) -static volatile int system_working = 1; - -struct waiting_fd { - struct list_node wfd_node; /* <=> vm.waiting_fds */ - rb_thread_t *th; - int fd; -}; - -inline static void -st_delete_wrap(st_table *table, st_data_t key) -{ - st_delete(table, &key, 0); -} +static rb_atomic_t system_working = 1; +static rb_internal_thread_specific_key_t specific_key_count; /********************************************************************************/ @@ -145,23 +164,17 @@ static int unblock_function_set(rb_thread_t *th, rb_unblock_function_t *func, vo static void unblock_function_clear(rb_thread_t *th); static inline int blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region, - rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted); + rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted); static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region); -#define RB_GC_SAVE_MACHINE_CONTEXT(th) \ - do { \ - FLUSH_REGISTER_WINDOWS; \ - setjmp((th)->ec->machine.regs); \ - SET_MACHINE_STACK_END(&(th)->ec->machine.stack_end); \ - } while (0) - -#define GVL_UNLOCK_BEGIN(th) do { \ - RB_GC_SAVE_MACHINE_CONTEXT(th); \ - gvl_release(th->vm); +#define THREAD_BLOCKING_BEGIN(th) do { \ + struct rb_thread_sched * const sched = TH_SCHED(th); \ + RB_VM_SAVE_MACHINE_CONTEXT(th); \ + thread_sched_to_waiting((sched), (th)); -#define GVL_UNLOCK_END(th) \ - gvl_acquire(th->vm, th); \ - rb_thread_set_current(th); \ +#define THREAD_BLOCKING_END(th) \ + thread_sched_to_running((sched), (th)); \ + rb_ractor_thread_switch(th->ractor, th, false); \ } while(0) #ifdef __GNUC__ @@ -176,10 +189,14 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio #define BLOCKING_REGION(th, exec, ubf, ubfarg, fail_if_interrupted) do { \ struct rb_blocking_region_buffer __region; \ if (blocking_region_begin(th, &__region, (ubf), (ubfarg), fail_if_interrupted) || \ - /* always return true unless fail_if_interrupted */ \ - !only_if_constant(fail_if_interrupted, TRUE)) { \ - exec; \ - blocking_region_end(th, &__region); \ + /* always return true unless fail_if_interrupted */ \ + !only_if_constant(fail_if_interrupted, TRUE)) { \ + /* Important that this is inlined into the macro, and not part of \ + * blocking_region_begin - see bug #20493 */ \ + RB_VM_SAVE_MACHINE_CONTEXT(th); \ + thread_sched_to_waiting(TH_SCHED(th), th); \ + exec; \ + blocking_region_end(th, &__region); \ }; \ } while(0) @@ -191,22 +208,37 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio static inline int vm_check_ints_blocking(rb_execution_context_t *ec) { +#ifdef RUBY_ASSERT_CRITICAL_SECTION + VM_ASSERT(ruby_assert_critical_section_entered == 0); +#endif + rb_thread_t *th = rb_ec_thread_ptr(ec); if (LIKELY(rb_threadptr_pending_interrupt_empty_p(th))) { - if (LIKELY(!RUBY_VM_INTERRUPTED_ANY(ec))) return FALSE; + if (LIKELY(!RUBY_VM_INTERRUPTED_ANY(ec))) return FALSE; } else { - th->pending_interrupt_queue_checked = 0; - RUBY_VM_SET_INTERRUPT(ec); + th->pending_interrupt_queue_checked = 0; + RUBY_VM_SET_INTERRUPT(ec); } - return rb_threadptr_execute_interrupts(th, 1); + + int result = rb_threadptr_execute_interrupts(th, 1); + + // When a signal is received, we yield to the scheduler as soon as possible: + if (result || RUBY_VM_INTERRUPTED(ec)) { + VALUE scheduler = 
rb_fiber_scheduler_current_for_threadptr(th); + if (scheduler != Qnil) { + rb_fiber_scheduler_yield(scheduler); + } + } + + return result; } -static int -vm_living_thread_num(const rb_vm_t *vm) +int +rb_vm_check_ints_blocking(rb_execution_context_t *ec) { - return vm->living_thread_num; + return vm_check_ints_blocking(ec); } /* @@ -239,124 +271,11 @@ timeout_prepare(rb_hrtime_t **to, rb_hrtime_t *rel, rb_hrtime_t *end, } } -#if THREAD_DEBUG -#ifdef HAVE_VA_ARGS_MACRO -void rb_thread_debug(const char *file, int line, const char *fmt, ...); -#define thread_debug(...) rb_thread_debug(__FILE__, __LINE__, __VA_ARGS__) -#define POSITION_FORMAT "%s:%d:" -#define POSITION_ARGS ,file, line -#else -void rb_thread_debug(const char *fmt, ...); -#define thread_debug rb_thread_debug -#define POSITION_FORMAT -#define POSITION_ARGS -#endif - -# ifdef NON_SCALAR_THREAD_ID -#define fill_thread_id_string ruby_fill_thread_id_string -const char * -ruby_fill_thread_id_string(rb_nativethread_id_t thid, rb_thread_id_string_t buf) -{ - extern const char ruby_digitmap[]; - size_t i; - - buf[0] = '0'; - buf[1] = 'x'; - for (i = 0; i < sizeof(thid); i++) { -# ifdef LITTLE_ENDIAN - size_t j = sizeof(thid) - i - 1; -# else - size_t j = i; -# endif - unsigned char c = (unsigned char)((char *)&thid)[j]; - buf[2 + i * 2] = ruby_digitmap[(c >> 4) & 0xf]; - buf[3 + i * 2] = ruby_digitmap[c & 0xf]; - } - buf[sizeof(rb_thread_id_string_t)-1] = '\0'; - return buf; -} -# define fill_thread_id_str(th) fill_thread_id_string((th)->thread_id, (th)->thread_id_string) -# define thread_id_str(th) ((th)->thread_id_string) -# define PRI_THREAD_ID "s" -# endif - -# if THREAD_DEBUG < 0 -static int rb_thread_debug_enabled; - -/* - * call-seq: - * Thread.DEBUG -> num - * - * Returns the thread debug level. Available only if compiled with - * THREAD_DEBUG=-1. - */ - -static VALUE -rb_thread_s_debug(void) -{ - return INT2NUM(rb_thread_debug_enabled); -} - -/* - * call-seq: - * Thread.DEBUG = num - * - * Sets the thread debug level. Available only if compiled with - * THREAD_DEBUG=-1. - */ - -static VALUE -rb_thread_s_debug_set(VALUE self, VALUE val) -{ - rb_thread_debug_enabled = RTEST(val) ? 
NUM2INT(val) : 0; - return val; -} -# else -# define rb_thread_debug_enabled THREAD_DEBUG -# endif -#else -#define thread_debug if(0)printf -#endif - -#ifndef fill_thread_id_str -# define fill_thread_id_string(thid, buf) ((void *)(uintptr_t)(thid)) -# define fill_thread_id_str(th) (void)0 -# define thread_id_str(th) ((void *)(uintptr_t)(th)->thread_id) -# define PRI_THREAD_ID "p" -#endif - -NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start)); -static void timer_thread_function(void); -void ruby_sigchld_handler(rb_vm_t *); /* signal.c */ - -static void -ubf_sigwait(void *ignore) -{ - rb_thread_wakeup_timer_thread(0); -} - -#if defined(_WIN32) -#include "thread_win32.c" +MAYBE_UNUSED(NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start))); +MAYBE_UNUSED(static bool th_has_dedicated_nt(const rb_thread_t *th)); +MAYBE_UNUSED(static int waitfd_to_waiting_flag(int wfd_event)); -#define DEBUG_OUT() \ - WaitForSingleObject(&debug_mutex, INFINITE); \ - printf(POSITION_FORMAT"%#lx - %s" POSITION_ARGS, GetCurrentThreadId(), buf); \ - fflush(stdout); \ - ReleaseMutex(&debug_mutex); - -#elif defined(HAVE_PTHREAD_H) -#include "thread_pthread.c" - -#define DEBUG_OUT() \ - pthread_mutex_lock(&debug_mutex); \ - printf(POSITION_FORMAT"%"PRI_THREAD_ID" - %s" POSITION_ARGS, \ - fill_thread_id_string(pthread_self(), thread_id_string), buf); \ - fflush(stdout); \ - pthread_mutex_unlock(&debug_mutex); - -#else -#error "unsupported thread type" -#endif +#include THREAD_IMPL_SRC /* * TODO: somebody with win32 knowledge should be able to get rid of @@ -371,53 +290,9 @@ ubf_sigwait(void *ignore) # define USE_EVENTFD (0) #endif -#if THREAD_DEBUG -static int debug_mutex_initialized = 1; -static rb_nativethread_lock_t debug_mutex; - -void -rb_thread_debug( -#ifdef HAVE_VA_ARGS_MACRO - const char *file, int line, -#endif - const char *fmt, ...) 
-{ - va_list args; - char buf[BUFSIZ]; -#ifdef NON_SCALAR_THREAD_ID - rb_thread_id_string_t thread_id_string; -#endif - - if (!rb_thread_debug_enabled) return; - - if (debug_mutex_initialized == 1) { - debug_mutex_initialized = 0; - rb_native_mutex_initialize(&debug_mutex); - } - - va_start(args, fmt); - vsnprintf(buf, BUFSIZ, fmt, args); - va_end(args); - - DEBUG_OUT(); -} -#endif - #include "thread_sync.c" void -rb_vm_gvl_destroy(rb_vm_t *vm) -{ - gvl_release(vm); - gvl_destroy(vm); - if (0) { - /* may be held by running threads */ - rb_native_mutex_destroy(&vm->waitpid_lock); - rb_native_mutex_destroy(&vm->workqueue_lock); - } -} - -void rb_nativethread_lock_initialize(rb_nativethread_lock_t *lock) { rb_native_mutex_initialize(lock); @@ -445,14 +320,14 @@ static int unblock_function_set(rb_thread_t *th, rb_unblock_function_t *func, void *arg, int fail_if_interrupted) { do { - if (fail_if_interrupted) { - if (RUBY_VM_INTERRUPTED_ANY(th->ec)) { - return FALSE; - } - } - else { - RUBY_VM_CHECK_INTS(th->ec); - } + if (fail_if_interrupted) { + if (RUBY_VM_INTERRUPTED_ANY(th->ec)) { + return FALSE; + } + } + else { + RUBY_VM_CHECK_INTS(th->ec); + } rb_native_mutex_lock(&th->interrupt_lock); } while (!th->ec->raised_flag && RUBY_VM_INTERRUPTED_ANY(th->ec) && @@ -471,89 +346,129 @@ static void unblock_function_clear(rb_thread_t *th) { rb_native_mutex_lock(&th->interrupt_lock); - th->unblock.func = NULL; + th->unblock.func = 0; rb_native_mutex_unlock(&th->interrupt_lock); } static void -rb_threadptr_interrupt_common(rb_thread_t *th, int trap) +threadptr_set_interrupt_locked(rb_thread_t *th, bool trap) { - rb_native_mutex_lock(&th->interrupt_lock); + // th->interrupt_lock should be acquired here + + RUBY_DEBUG_LOG("th:%u trap:%d", rb_th_serial(th), trap); + if (trap) { - RUBY_VM_SET_TRAP_INTERRUPT(th->ec); + RUBY_VM_SET_TRAP_INTERRUPT(th->ec); } else { - RUBY_VM_SET_INTERRUPT(th->ec); + RUBY_VM_SET_INTERRUPT(th->ec); } + if (th->unblock.func != NULL) { - (th->unblock.func)(th->unblock.arg); + (th->unblock.func)(th->unblock.arg); } else { - /* none */ + /* none */ + } +} + +static void +threadptr_set_interrupt(rb_thread_t *th, int trap) +{ + rb_native_mutex_lock(&th->interrupt_lock); + { + threadptr_set_interrupt_locked(th, trap); } rb_native_mutex_unlock(&th->interrupt_lock); } +/* Set interrupt flag on another thread or current thread, and call its UBF if it has one set */ void rb_threadptr_interrupt(rb_thread_t *th) { - rb_threadptr_interrupt_common(th, 0); + RUBY_DEBUG_LOG("th:%u", rb_th_serial(th)); + threadptr_set_interrupt(th, false); } static void threadptr_trap_interrupt(rb_thread_t *th) { - rb_threadptr_interrupt_common(th, 1); + threadptr_set_interrupt(th, true); } static void -terminate_all(rb_vm_t *vm, const rb_thread_t *main_thread) +terminate_all(rb_ractor_t *r, const rb_thread_t *main_thread) { rb_thread_t *th = 0; - list_for_each(&vm->living_threads, th, vmlt_node) { - if (th != main_thread) { - thread_debug("terminate_all: begin (thid: %"PRI_THREAD_ID", status: %s)\n", - thread_id_str(th), thread_status_name(th, TRUE)); - rb_threadptr_pending_interrupt_enque(th, eTerminateSignal); - rb_threadptr_interrupt(th); - thread_debug("terminate_all: end (thid: %"PRI_THREAD_ID", status: %s)\n", - thread_id_str(th), thread_status_name(th, TRUE)); - } - else { - thread_debug("terminate_all: main thread (%p)\n", (void *)th); - } + ccan_list_for_each(&r->threads.set, th, lt_node) { + if (th != main_thread) { + RUBY_DEBUG_LOG("terminate start th:%u status:%s", rb_th_serial(th), 
thread_status_name(th, TRUE)); + + rb_threadptr_pending_interrupt_enque(th, RUBY_FATAL_THREAD_TERMINATED); + rb_threadptr_interrupt(th); + + RUBY_DEBUG_LOG("terminate done th:%u status:%s", rb_th_serial(th), thread_status_name(th, TRUE)); + } + else { + RUBY_DEBUG_LOG("main thread th:%u", rb_th_serial(th)); + } + } +} + +static void +rb_threadptr_join_list_wakeup(rb_thread_t *thread) +{ + while (thread->join_list) { + struct rb_waiting_list *join_list = thread->join_list; + + // Consume the entry from the join list: + thread->join_list = join_list->next; + + rb_thread_t *target_thread = join_list->thread; + + if (target_thread->scheduler != Qnil && join_list->fiber) { + rb_fiber_scheduler_unblock(target_thread->scheduler, target_thread->self, rb_fiberptr_self(join_list->fiber)); + } + else { + rb_threadptr_interrupt(target_thread); + + switch (target_thread->status) { + case THREAD_STOPPED: + case THREAD_STOPPED_FOREVER: + target_thread->status = THREAD_RUNNABLE; + break; + default: + break; + } + } } } void rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th) { - const char *err; - rb_mutex_t *mutex; - rb_mutex_t *mutexes = th->keeping_mutexes; + while (th->keeping_mutexes) { + rb_mutex_t *mutex = th->keeping_mutexes; + th->keeping_mutexes = mutex->next_mutex; - while (mutexes) { - mutex = mutexes; - /* rb_warn("mutex #<%p> remains to be locked by terminated thread", - (void *)mutexes); */ - mutexes = mutex->next_mutex; - err = rb_mutex_unlock_th(mutex, th); - if (err) rb_bug("invalid keeping_mutexes: %s", err); + // rb_warn("mutex #<%p> was not unlocked by thread #<%p>", (void *)mutex, (void*)th); + VM_ASSERT(mutex->ec_serial); + const char *error_message = rb_mutex_unlock_th(mutex, th, 0); + if (error_message) rb_bug("invalid keeping_mutexes: %s", error_message); } } void -rb_thread_terminate_all(void) +rb_thread_terminate_all(rb_thread_t *th) { - rb_thread_t *volatile th = GET_THREAD(); /* main thread */ + rb_ractor_t *cr = th->ractor; rb_execution_context_t * volatile ec = th->ec; - rb_vm_t *volatile vm = th->vm; volatile int sleeping = 0; - if (vm->main_thread != th) { - rb_bug("rb_thread_terminate_all: called by child thread (%p, %p)", - (void *)vm->main_thread, (void *)th); + if (cr->threads.main != th) { + rb_bug("rb_thread_terminate_all: called by child thread (%p, %p)", + (void *)cr->threads.main, (void *)th); } /* unlock all locking mutexes */ @@ -562,45 +477,49 @@ rb_thread_terminate_all(void) EC_PUSH_TAG(ec); if (EC_EXEC_TAG() == TAG_NONE) { retry: - thread_debug("rb_thread_terminate_all (main thread: %p)\n", (void *)th); - terminate_all(vm, th); + RUBY_DEBUG_LOG("th:%u", rb_th_serial(th)); - while (vm_living_thread_num(vm) > 1) { + terminate_all(cr, th); + + while (rb_ractor_living_thread_num(cr) > 1) { rb_hrtime_t rel = RB_HRTIME_PER_SEC; - /* - * Thread exiting routine in thread_start_func_2 notify - * me when the last sub-thread exit. - */ - sleeping = 1; - native_sleep(th, &rel); - RUBY_VM_CHECK_INTS_BLOCKING(ec); - sleeping = 0; - } + /*q + * Thread exiting routine in thread_start_func_2 notify + * me when the last sub-thread exit. + */ + sleeping = 1; + native_sleep(th, &rel); + RUBY_VM_CHECK_INTS_BLOCKING(ec); + sleeping = 0; + } } else { - /* - * When caught an exception (e.g. Ctrl+C), let's broadcast - * kill request again to ensure killing all threads even - * if they are blocked on sleep, mutex, etc. - */ - if (sleeping) { - sleeping = 0; - goto retry; - } + /* + * When caught an exception (e.g. 
Ctrl+C), let's broadcast + * kill request again to ensure killing all threads even + * if they are blocked on sleep, mutex, etc. + */ + if (sleeping) { + sleeping = 0; + goto retry; + } } EC_POP_TAG(); } void rb_threadptr_root_fiber_terminate(rb_thread_t *th); +static void threadptr_interrupt_exec_cleanup(rb_thread_t *th); static void thread_cleanup_func_before_exec(void *th_ptr) { rb_thread_t *th = th_ptr; th->status = THREAD_KILLED; + // The thread stack doesn't exist in the forked process: th->ec->machine.stack_start = th->ec->machine.stack_end = NULL; + threadptr_interrupt_exec_cleanup(th); rb_threadptr_root_fiber_terminate(th); } @@ -612,29 +531,31 @@ thread_cleanup_func(void *th_ptr, int atfork) th->locking_mutex = Qfalse; thread_cleanup_func_before_exec(th_ptr); - /* - * Unfortunately, we can't release native threading resource at fork - * because libc may have unstable locking state therefore touching - * a threading resource may cause a deadlock. - * - * FIXME: Skipping native_mutex_destroy(pthread_mutex_destroy) is safe - * with NPTL, but native_thread_destroy calls pthread_cond_destroy - * which calls free(3), so there is a small memory leak atfork, here. - */ - if (atfork) - return; + if (atfork) { + native_thread_destroy_atfork(th->nt); + th->nt = NULL; + return; + } rb_native_mutex_destroy(&th->interrupt_lock); - native_thread_destroy(th); +} + +void +rb_thread_free_native_thread(void *th_ptr) +{ + rb_thread_t *th = th_ptr; + + native_thread_destroy_atfork(th->nt); + th->nt = NULL; } static VALUE rb_threadptr_raise(rb_thread_t *, int, VALUE *); static VALUE rb_thread_to_s(VALUE thread); void -ruby_thread_init_stack(rb_thread_t *th) +ruby_thread_init_stack(rb_thread_t *th, void *local_in_parent_frame) { - native_thread_init_stack(th); + native_thread_init_stack(th, local_in_parent_frame); } const VALUE * @@ -643,232 +564,351 @@ rb_vm_proc_local_ep(VALUE proc) const VALUE *ep = vm_proc_ep(proc); if (ep) { - return rb_vm_ep_local_ep(ep); + return rb_vm_ep_local_ep(ep); } else { - return NULL; + return NULL; } } -static void -thread_do_start(rb_thread_t *th) -{ - native_set_thread_name(th); - - if (th->invoke_type == thread_invoke_type_proc) { - VALUE args = th->invoke_arg.proc.args; - long args_len = RARRAY_LEN(args); - const VALUE *args_ptr; - VALUE procval = th->invoke_arg.proc.proc; - rb_proc_t *proc; - GetProcPtr(procval, proc); - - th->ec->errinfo = Qnil; - th->ec->root_lep = rb_vm_proc_local_ep(procval); - th->ec->root_svar = Qfalse; - - EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef); +// for ractor, defined in vm.c +VALUE rb_vm_invoke_proc_with_self(rb_execution_context_t *ec, rb_proc_t *proc, VALUE self, + int argc, const VALUE *argv, int kw_splat, VALUE passed_block_handler); +static VALUE +thread_do_start_proc(rb_thread_t *th) +{ + VALUE args = th->invoke_arg.proc.args; + const VALUE *args_ptr; + int args_len; + VALUE procval = th->invoke_arg.proc.proc; + rb_proc_t *proc; + GetProcPtr(procval, proc); + + th->ec->errinfo = Qnil; + th->ec->root_lep = rb_vm_proc_local_ep(procval); + th->ec->root_svar = Qfalse; + + vm_check_ints_blocking(th->ec); + + if (th->invoke_type == thread_invoke_type_ractor_proc) { + VALUE self = rb_ractor_self(th->ractor); + th->thgroup = th->ractor->thgroup_default = rb_obj_alloc(cThGroup); + + VM_ASSERT(FIXNUM_P(args)); + args_len = FIX2INT(args); + args_ptr = ALLOCA_N(VALUE, args_len); + rb_ractor_receive_parameters(th->ec, th->ractor, args_len, (VALUE *)args_ptr); + vm_check_ints_blocking(th->ec); + + return 
rb_vm_invoke_proc_with_self( + th->ec, proc, self, + args_len, args_ptr, + th->invoke_arg.proc.kw_splat, + VM_BLOCK_HANDLER_NONE + ); + } + else { + args_len = RARRAY_LENINT(args); if (args_len < 8) { /* free proc.args if the length is enough small */ args_ptr = ALLOCA_N(VALUE, args_len); - MEMCPY((VALUE *)args_ptr, RARRAY_CONST_PTR_TRANSIENT(args), VALUE, args_len); + MEMCPY((VALUE *)args_ptr, RARRAY_CONST_PTR(args), VALUE, args_len); th->invoke_arg.proc.args = Qnil; } else { args_ptr = RARRAY_CONST_PTR(args); } - th->value = rb_vm_invoke_proc(th->ec, proc, - (int)args_len, args_ptr, - VM_BLOCK_HANDLER_NONE); + vm_check_ints_blocking(th->ec); - EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef); + return rb_vm_invoke_proc( + th->ec, proc, + args_len, args_ptr, + th->invoke_arg.proc.kw_splat, + VM_BLOCK_HANDLER_NONE + ); } - else { - th->value = (*th->invoke_arg.func.func)(th->invoke_arg.func.arg); +} + +static VALUE +thread_do_start(rb_thread_t *th) +{ + native_set_thread_name(th); + VALUE result = Qundef; + + switch (th->invoke_type) { + case thread_invoke_type_proc: + result = thread_do_start_proc(th); + break; + + case thread_invoke_type_ractor_proc: + result = thread_do_start_proc(th); + rb_ractor_atexit(th->ec, result); + break; + + case thread_invoke_type_func: + result = (*th->invoke_arg.func.func)(th->invoke_arg.func.arg); + break; + + case thread_invoke_type_none: + rb_bug("unreachable"); } + + return result; } void rb_ec_clear_current_thread_trace_func(const rb_execution_context_t *ec); -rb_control_frame_t * -rb_vm_push_frame(rb_execution_context_t *sec, - const rb_iseq_t *iseq, - VALUE type, - VALUE self, - VALUE specval, - VALUE cref_or_me, - const VALUE *pc, - VALUE *sp, - int local_size, - int stack_max); static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start) { - STACK_GROW_DIR_DETECTION; + RUBY_DEBUG_LOG("th:%u", rb_th_serial(th)); + VM_ASSERT(th != th->vm->ractor.main_thread); + enum ruby_tag_type state; - rb_thread_list_t *join_list; - rb_thread_t *main_th; VALUE errinfo = Qnil; - size_t size = th->vm->default_params.thread_vm_stack_size / sizeof(VALUE); - VALUE * vm_stack = NULL; + rb_thread_t *ractor_main_th = th->ractor->threads.main; + + // setup ractor + if (rb_ractor_status_p(th->ractor, ractor_blocking)) { + RB_VM_LOCK(); + { + rb_vm_ractor_blocking_cnt_dec(th->vm, th->ractor, __FILE__, __LINE__); + rb_ractor_t *r = th->ractor; + r->r_stdin = rb_io_prep_stdin(); + r->r_stdout = rb_io_prep_stdout(); + r->r_stderr = rb_io_prep_stderr(); + } + RB_VM_UNLOCK(); + } + + // Ensure that we are not joinable. + VM_ASSERT(UNDEF_P(th->value)); + + int fiber_scheduler_closed = 0, event_thread_end_hooked = 0; + VALUE result = Qundef; + + EC_PUSH_TAG(th->ec); + + if ((state = EC_EXEC_TAG()) == TAG_NONE) { + EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef); - if (th == th->vm->main_thread) { - rb_bug("thread_start_func_2 must not be used for main thread"); + result = thread_do_start(th); } - thread_debug("thread start: %p\n", (void *)th); - VM_ASSERT((size * sizeof(VALUE)) <= th->ec->machine.stack_maxsize); + if (!fiber_scheduler_closed) { + fiber_scheduler_closed = 1; + rb_fiber_scheduler_set(Qnil); + } - vm_stack = alloca(size * sizeof(VALUE)); - VM_ASSERT(vm_stack); + if (!event_thread_end_hooked) { + event_thread_end_hooked = 1; + EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef); + } - gvl_acquire(th->vm, th); + if (state == TAG_NONE) { + // This must be set AFTER doing all user-level code. 
At this point, the thread is effectively finished and calls to `Thread#join` will succeed. + th->value = result; + } + else { + errinfo = th->ec->errinfo; - rb_ec_initialize_vm_stack(th->ec, vm_stack, size); - th->ec->machine.stack_start = STACK_DIR_UPPER(vm_stack + size, vm_stack); - th->ec->machine.stack_maxsize -= size * sizeof(VALUE); + VALUE exc = rb_vm_make_jump_tag_but_local_jump(state, Qundef); + if (!NIL_P(exc)) errinfo = exc; - ruby_thread_set_native(th); + if (state == TAG_FATAL) { + if (th->invoke_type == thread_invoke_type_ractor_proc) { + rb_ractor_atexit(th->ec, Qnil); + } + /* fatal error within this thread, need to stop whole script */ + } + else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) { + if (th->invoke_type == thread_invoke_type_ractor_proc) { + rb_ractor_atexit_exception(th->ec); + } - { - thread_debug("thread start (get lock): %p\n", (void *)th); - rb_thread_set_current(th); - - EC_PUSH_TAG(th->ec); - if ((state = EC_EXEC_TAG()) == TAG_NONE) { - SAVE_ROOT_JMPBUF(th, thread_do_start(th)); - } - else { - errinfo = th->ec->errinfo; - if (state == TAG_FATAL) { - /* fatal error within this thread, need to stop whole script */ - } - else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) { - /* exit on main_thread. */ - } - else { - if (th->report_on_exception) { - VALUE mesg = rb_thread_to_s(th->self); - rb_str_cat_cstr(mesg, " terminated with exception (report_on_exception is true):\n"); - rb_write_error_str(mesg); - rb_ec_error_print(th->ec, errinfo); - } - if (th->vm->thread_abort_on_exception || - th->abort_on_exception || RTEST(ruby_debug)) { - /* exit on main_thread */ - } - else { - errinfo = Qnil; - } - } - th->value = Qnil; - } - - th->status = THREAD_KILLED; - thread_debug("thread end: %p\n", (void *)th); - - main_th = th->vm->main_thread; - if (main_th == th) { - ruby_stop(0); - } - if (RB_TYPE_P(errinfo, T_OBJECT)) { - /* treat with normal error object */ - rb_threadptr_raise(main_th, 1, &errinfo); - } - EC_POP_TAG(); - - rb_ec_clear_current_thread_trace_func(th->ec); - - /* locking_mutex must be Qfalse */ - if (th->locking_mutex != Qfalse) { - rb_bug("thread_start_func_2: locking_mutex must not be set (%p:%"PRIxVALUE")", - (void *)th, th->locking_mutex); - } - - /* delete self other than main thread from living_threads */ - rb_vm_living_threads_remove(th->vm, th); - if (main_th->status == THREAD_KILLED && rb_thread_alone()) { - /* I'm last thread. wake up main thread from rb_thread_terminate_all */ - rb_threadptr_interrupt(main_th); - } - - /* wake up joining threads */ - join_list = th->join_list; - while (join_list) { - rb_threadptr_interrupt(join_list->th); - switch (join_list->th->status) { - case THREAD_STOPPED: case THREAD_STOPPED_FOREVER: - join_list->th->status = THREAD_RUNNABLE; - default: break; - } - join_list = join_list->next; - } - - rb_threadptr_unlock_all_locking_mutexes(th); - rb_check_deadlock(th->vm); - - rb_fiber_close(th->ec->fiber_ptr); + /* exit on main_thread. 
*/ + } + else { + if (th->report_on_exception) { + VALUE mesg = rb_thread_to_s(th->self); + rb_str_cat_cstr(mesg, " terminated with exception (report_on_exception is true):\n"); + rb_write_error_str(mesg); + rb_ec_error_print(th->ec, errinfo); + } + + if (th->invoke_type == thread_invoke_type_ractor_proc) { + rb_ractor_atexit_exception(th->ec); + } + + if (th->vm->thread_abort_on_exception || + th->abort_on_exception || RTEST(ruby_debug)) { + /* exit on main_thread */ + } + else { + errinfo = Qnil; + } + } + th->value = Qnil; + } + + // The thread is effectively finished and can be joined. + VM_ASSERT(!UNDEF_P(th->value)); + + rb_threadptr_join_list_wakeup(th); + rb_threadptr_unlock_all_locking_mutexes(th); + + if (th->invoke_type == thread_invoke_type_ractor_proc) { + rb_thread_terminate_all(th); + rb_ractor_teardown(th->ec); } + th->status = THREAD_KILLED; + RUBY_DEBUG_LOG("killed th:%u", rb_th_serial(th)); + + if (th->vm->ractor.main_thread == th) { + ruby_stop(0); + } + + if (RB_TYPE_P(errinfo, T_OBJECT)) { + /* treat with normal error object */ + rb_threadptr_raise(ractor_main_th, 1, &errinfo); + } + + EC_POP_TAG(); + + rb_ec_clear_current_thread_trace_func(th->ec); + + /* locking_mutex must be Qfalse */ + if (th->locking_mutex != Qfalse) { + rb_bug("thread_start_func_2: locking_mutex must not be set (%p:%"PRIxVALUE")", + (void *)th, th->locking_mutex); + } + + if (ractor_main_th->status == THREAD_KILLED && + th->ractor->threads.cnt <= 2 /* main thread and this thread */) { + /* I'm last thread. wake up main thread from rb_thread_terminate_all */ + rb_threadptr_interrupt(ractor_main_th); + } + + rb_check_deadlock(th->ractor); + + rb_fiber_close(th->ec->fiber_ptr); + thread_cleanup_func(th, FALSE); VM_ASSERT(th->ec->vm_stack == NULL); - gvl_release(th->vm); + if (th->invoke_type == thread_invoke_type_ractor_proc) { + // after rb_ractor_living_threads_remove() + // GC will happen anytime and this ractor can be collected (and destroy GVL). + // So gvl_release() should be before it. 
+ thread_sched_to_dead(TH_SCHED(th), th); + rb_ractor_living_threads_remove(th->ractor, th); + } + else { + rb_ractor_living_threads_remove(th->ractor, th); + thread_sched_to_dead(TH_SCHED(th), th); + } return 0; } +struct thread_create_params { + enum thread_invoke_type type; + + // for normal proc thread + VALUE args; + VALUE proc; + + // for ractor + rb_ractor_t *g; + + // for func + VALUE (*fn)(void *); +}; + +static void thread_specific_storage_alloc(rb_thread_t *th); + static VALUE -thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS)) +thread_create_core(VALUE thval, struct thread_create_params *params) { - rb_thread_t *th = rb_thread_ptr(thval), *current_th = GET_THREAD(); + rb_execution_context_t *ec = GET_EC(); + rb_thread_t *th = rb_thread_ptr(thval), *current_th = rb_ec_thread_ptr(ec); int err; + thread_specific_storage_alloc(th); + if (OBJ_FROZEN(current_th->thgroup)) { - rb_raise(rb_eThreadError, - "can't start a new thread (frozen ThreadGroup)"); + rb_raise(rb_eThreadError, + "can't start a new thread (frozen ThreadGroup)"); } - if (fn) { - th->invoke_type = thread_invoke_type_func; - th->invoke_arg.func.func = fn; - th->invoke_arg.func.arg = (void *)args; - } - else { + rb_fiber_inherit_storage(ec, th->ec->fiber_ptr); + + switch (params->type) { + case thread_invoke_type_proc: th->invoke_type = thread_invoke_type_proc; - th->invoke_arg.proc.proc = rb_block_proc(); - th->invoke_arg.proc.args = args; + th->invoke_arg.proc.args = params->args; + th->invoke_arg.proc.proc = params->proc; + th->invoke_arg.proc.kw_splat = rb_keyword_given_p(); + break; + + case thread_invoke_type_ractor_proc: +#if RACTOR_CHECK_MODE > 0 + rb_ractor_setup_belonging_to(thval, rb_ractor_id(params->g)); +#endif + th->invoke_type = thread_invoke_type_ractor_proc; + th->ractor = params->g; + th->ec->ractor_id = rb_ractor_id(th->ractor); + th->ractor->threads.main = th; + th->invoke_arg.proc.proc = rb_proc_isolate_bang(params->proc, Qnil); + th->invoke_arg.proc.args = INT2FIX(RARRAY_LENINT(params->args)); + th->invoke_arg.proc.kw_splat = rb_keyword_given_p(); + rb_ractor_send_parameters(ec, params->g, params->args); + break; + + case thread_invoke_type_func: + th->invoke_type = thread_invoke_type_func; + th->invoke_arg.func.func = params->fn; + th->invoke_arg.func.arg = (void *)params->args; + break; + + default: + rb_bug("unreachable"); } th->priority = current_th->priority; th->thgroup = current_th->thgroup; - th->pending_interrupt_queue = rb_ary_tmp_new(0); + th->pending_interrupt_queue = rb_ary_hidden_new(0); th->pending_interrupt_queue_checked = 0; th->pending_interrupt_mask_stack = rb_ary_dup(current_th->pending_interrupt_mask_stack); RBASIC_CLEAR_CLASS(th->pending_interrupt_mask_stack); rb_native_mutex_initialize(&th->interrupt_lock); + RUBY_DEBUG_LOG("r:%u th:%u", rb_ractor_id(th->ractor), rb_th_serial(th)); + + rb_ractor_living_threads_insert(th->ractor, th); + /* kick thread */ err = native_thread_create(th); if (err) { - th->status = THREAD_KILLED; - rb_raise(rb_eThreadError, "can't create Thread: %s", strerror(err)); + th->status = THREAD_KILLED; + rb_ractor_living_threads_remove(th->ractor, th); + rb_raise(rb_eThreadError, "can't create Thread: %s", strerror(err)); } - rb_vm_living_threads_insert(th->vm, th); return thval; } #define threadptr_initialized(th) ((th)->invoke_type != thread_invoke_type_none) /* - * call-seq: - * Thread.new { ... } -> thread - * Thread.new(*args, &proc) -> thread - * Thread.new(*args) { |args| ... } -> thread + * call-seq: + * Thread.new { ... 
} -> thread + * Thread.new(*args, &proc) -> thread + * Thread.new(*args) { |args| ... } -> thread * * Creates a new thread executing the given block. * @@ -890,14 +930,15 @@ thread_s_new(int argc, VALUE *argv, VALUE klass) rb_thread_t *th; VALUE thread = rb_thread_alloc(klass); - if (GET_VM()->main_thread->status == THREAD_KILLED) - rb_raise(rb_eThreadError, "can't alloc thread"); + if (GET_RACTOR()->threads.main->status == THREAD_KILLED) { + rb_raise(rb_eThreadError, "can't alloc thread"); + } - rb_obj_call_init(thread, argc, argv); + rb_obj_call_init_kw(thread, argc, argv, RB_PASS_CALLED_KEYWORDS); th = rb_thread_ptr(thread); if (!threadptr_initialized(th)) { - rb_raise(rb_eThreadError, "uninitialized thread - check `%"PRIsVALUE"#initialize'", - klass); + rb_raise(rb_eThreadError, "uninitialized thread - check '%"PRIsVALUE"#initialize'", + klass); } return thread; } @@ -915,7 +956,12 @@ thread_s_new(int argc, VALUE *argv, VALUE klass) static VALUE thread_start(VALUE klass, VALUE args) { - return thread_create_core(rb_thread_alloc(klass), args, 0); + struct thread_create_params params = { + .type = thread_invoke_type_proc, + .args = args, + .proc = rb_block_proc(), + }; + return thread_create_core(rb_thread_alloc(klass), ¶ms); } static VALUE @@ -950,19 +996,43 @@ thread_initialize(VALUE thread, VALUE args) } } else { - return thread_create_core(thread, args, NULL); + struct thread_create_params params = { + .type = thread_invoke_type_proc, + .args = args, + .proc = rb_block_proc(), + }; + return thread_create_core(thread, ¶ms); } } VALUE -rb_thread_create(VALUE (*fn)(ANYARGS), void *arg) +rb_thread_create(VALUE (*fn)(void *), void *arg) { - return thread_create_core(rb_thread_alloc(rb_cThread), (VALUE)arg, fn); + struct thread_create_params params = { + .type = thread_invoke_type_func, + .fn = fn, + .args = (VALUE)arg, + }; + return thread_create_core(rb_thread_alloc(rb_cThread), ¶ms); +} + +VALUE +rb_thread_create_ractor(rb_ractor_t *r, VALUE args, VALUE proc) +{ + struct thread_create_params params = { + .type = thread_invoke_type_ractor_proc, + .g = r, + .args = args, + .proc = proc, + }; + return thread_create_core(rb_thread_alloc(rb_cThread), ¶ms); } struct join_arg { - rb_thread_t *target, *waiting; + struct rb_waiting_list *waiter; + rb_thread_t *target; + VALUE timeout; rb_hrtime_t *limit; }; @@ -970,121 +1040,142 @@ static VALUE remove_from_join_list(VALUE arg) { struct join_arg *p = (struct join_arg *)arg; - rb_thread_t *target_th = p->target, *th = p->waiting; + rb_thread_t *target_thread = p->target; - if (target_th->status != THREAD_KILLED) { - rb_thread_list_t **p = &target_th->join_list; + if (target_thread->status != THREAD_KILLED) { + struct rb_waiting_list **join_list = &target_thread->join_list; - while (*p) { - if ((*p)->th == th) { - *p = (*p)->next; - break; - } - p = &(*p)->next; - } + while (*join_list) { + if (*join_list == p->waiter) { + *join_list = (*join_list)->next; + break; + } + + join_list = &(*join_list)->next; + } } return Qnil; } +static int +thread_finished(rb_thread_t *th) +{ + return th->status == THREAD_KILLED || !UNDEF_P(th->value); +} + static VALUE thread_join_sleep(VALUE arg) { struct join_arg *p = (struct join_arg *)arg; - rb_thread_t *target_th = p->target, *th = p->waiting; - rb_hrtime_t end = 0; - - if (p->limit) { - end = rb_hrtime_add(*p->limit, rb_hrtime_now()); - } - - while (target_th->status != THREAD_KILLED) { - if (!p->limit) { - th->status = THREAD_STOPPED_FOREVER; - th->vm->sleeper++; - rb_check_deadlock(th->vm); - native_sleep(th, 
0); - th->vm->sleeper--; - } - else { - if (hrtime_update_expire(p->limit, end)) { - thread_debug("thread_join: timeout (thid: %"PRI_THREAD_ID")\n", - thread_id_str(target_th)); - return Qfalse; - } - th->status = THREAD_STOPPED; - native_sleep(th, p->limit); - } - RUBY_VM_CHECK_INTS_BLOCKING(th->ec); - th->status = THREAD_RUNNABLE; - thread_debug("thread_join: interrupted (thid: %"PRI_THREAD_ID", status: %s)\n", - thread_id_str(target_th), thread_status_name(target_th, TRUE)); + rb_thread_t *target_th = p->target, *th = p->waiter->thread; + rb_hrtime_t end = 0, *limit = p->limit; + + if (limit) { + end = rb_hrtime_add(*limit, rb_hrtime_now()); } + + while (!thread_finished(target_th)) { + VALUE scheduler = rb_fiber_scheduler_current_for_threadptr(th); + + if (!limit) { + if (scheduler != Qnil) { + rb_fiber_scheduler_block(scheduler, target_th->self, Qnil); + } + else { + sleep_forever(th, SLEEP_DEADLOCKABLE | SLEEP_ALLOW_SPURIOUS | SLEEP_NO_CHECKINTS); + } + } + else { + if (hrtime_update_expire(limit, end)) { + RUBY_DEBUG_LOG("timeout target_th:%u", rb_th_serial(target_th)); + return Qfalse; + } + + if (scheduler != Qnil) { + VALUE timeout = rb_float_new(hrtime2double(*limit)); + rb_fiber_scheduler_block(scheduler, target_th->self, timeout); + } + else { + th->status = THREAD_STOPPED; + native_sleep(th, limit); + } + } + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + th->status = THREAD_RUNNABLE; + + RUBY_DEBUG_LOG("interrupted target_th:%u status:%s", rb_th_serial(target_th), thread_status_name(target_th, TRUE)); + } + return Qtrue; } static VALUE -thread_join(rb_thread_t *target_th, rb_hrtime_t *rel) +thread_join(rb_thread_t *target_th, VALUE timeout, rb_hrtime_t *limit) { - rb_thread_t *th = GET_THREAD(); - struct join_arg arg; + rb_execution_context_t *ec = GET_EC(); + rb_thread_t *th = ec->thread_ptr; + rb_fiber_t *fiber = ec->fiber_ptr; if (th == target_th) { - rb_raise(rb_eThreadError, "Target thread must not be current thread"); - } - if (GET_VM()->main_thread == target_th) { - rb_raise(rb_eThreadError, "Target thread must not be main thread"); + rb_raise(rb_eThreadError, "Target thread must not be current thread"); } - arg.target = target_th; - arg.waiting = th; - arg.limit = rel; + if (th->ractor->threads.main == target_th) { + rb_raise(rb_eThreadError, "Target thread must not be main thread"); + } - thread_debug("thread_join (thid: %"PRI_THREAD_ID", status: %s)\n", - thread_id_str(target_th), thread_status_name(target_th, TRUE)); + RUBY_DEBUG_LOG("target_th:%u status:%s", rb_th_serial(target_th), thread_status_name(target_th, TRUE)); if (target_th->status != THREAD_KILLED) { - rb_thread_list_t list; - list.next = target_th->join_list; - list.th = th; - target_th->join_list = &list; - if (!rb_ensure(thread_join_sleep, (VALUE)&arg, - remove_from_join_list, (VALUE)&arg)) { - return Qnil; - } + struct rb_waiting_list waiter; + waiter.next = target_th->join_list; + waiter.thread = th; + waiter.fiber = rb_fiberptr_blocking(fiber) ? 
NULL : fiber; + target_th->join_list = &waiter; + + struct join_arg arg; + arg.waiter = &waiter; + arg.target = target_th; + arg.timeout = timeout; + arg.limit = limit; + + if (!rb_ensure(thread_join_sleep, (VALUE)&arg, remove_from_join_list, (VALUE)&arg)) { + return Qnil; + } } - thread_debug("thread_join: success (thid: %"PRI_THREAD_ID", status: %s)\n", - thread_id_str(target_th), thread_status_name(target_th, TRUE)); + RUBY_DEBUG_LOG("success target_th:%u status:%s", rb_th_serial(target_th), thread_status_name(target_th, TRUE)); if (target_th->ec->errinfo != Qnil) { - VALUE err = target_th->ec->errinfo; - - if (FIXNUM_P(err)) { - switch (err) { - case INT2FIX(TAG_FATAL): - thread_debug("thread_join: terminated (thid: %"PRI_THREAD_ID", status: %s)\n", - thread_id_str(target_th), thread_status_name(target_th, TRUE)); - - /* OK. killed. */ - break; - default: - rb_bug("thread_join: Fixnum (%d) should not reach here.", FIX2INT(err)); - } - } - else if (THROW_DATA_P(target_th->ec->errinfo)) { - rb_bug("thread_join: THROW_DATA should not reach here."); - } - else { - /* normal exception */ - rb_exc_raise(err); - } + VALUE err = target_th->ec->errinfo; + + if (FIXNUM_P(err)) { + switch (err) { + case INT2FIX(TAG_FATAL): + RUBY_DEBUG_LOG("terminated target_th:%u status:%s", rb_th_serial(target_th), thread_status_name(target_th, TRUE)); + + /* OK. killed. */ + break; + default: + if (err == RUBY_FATAL_FIBER_KILLED) { // not integer constant so can't be a case expression + // root fiber killed in non-main thread + break; + } + rb_bug("thread_join: Fixnum (%d) should not reach here.", FIX2INT(err)); + } + } + else if (THROW_DATA_P(target_th->ec->errinfo)) { + rb_bug("thread_join: THROW_DATA should not reach here."); + } + else { + /* normal exception */ + rb_exc_raise(err); + } } return target_th->self; } -static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double); - /* * call-seq: * thr.join -> thr @@ -1127,25 +1218,30 @@ static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double); static VALUE thread_join_m(int argc, VALUE *argv, VALUE self) { - VALUE limit; - rb_hrtime_t rel, *to = 0; + VALUE timeout = Qnil; + rb_hrtime_t rel = 0, *limit = 0; + if (rb_check_arity(argc, 0, 1)) { + timeout = argv[0]; + } + + // Convert the timeout eagerly, so it's always converted and deterministic /* * This supports INFINITY and negative values, so we can't use * rb_time_interval right now... */ - if (!rb_check_arity(argc, 0, 1) || NIL_P(argv[0])) { + if (NIL_P(timeout)) { /* unlimited */ } - else if (FIXNUM_P(limit = argv[0])) { - rel = rb_sec2hrtime(NUM2TIMET(limit)); - to = &rel; + else if (FIXNUM_P(timeout)) { + rel = rb_sec2hrtime(NUM2TIMET(timeout)); + limit = &rel; } else { - to = double2hrtime(&rel, rb_num2dbl(limit)); + limit = double2hrtime(&rel, rb_num2dbl(timeout)); } - return thread_join(rb_thread_ptr(self), to); + return thread_join(rb_thread_ptr(self), timeout, limit); } /* @@ -1166,7 +1262,11 @@ static VALUE thread_value(VALUE self) { rb_thread_t *th = rb_thread_ptr(self); - thread_join(th, 0); + thread_join(th, Qnil, 0); + if (UNDEF_P(th->value)) { + // If the thread is dead because we forked th->value is still Qundef. + return Qnil; + } return th->value; } @@ -1174,34 +1274,6 @@ thread_value(VALUE self) * Thread Scheduling */ -/* - * Back when we used "struct timeval", not all platforms implemented - * tv_sec as time_t. Nowadays we use "struct timespec" and tv_sec - * seems to be implemented more consistently across platforms. 
- * At least other parts of our code hasn't had to deal with non-time_t - * tv_sec in timespec... - */ -#define TIMESPEC_SEC_MAX TIMET_MAX -#define TIMESPEC_SEC_MIN TIMET_MIN - -static rb_hrtime_t * -double2hrtime(rb_hrtime_t *hrt, double d) -{ - /* assume timespec.tv_sec has same signedness as time_t */ - const double TIMESPEC_SEC_MAX_PLUS_ONE = TIMET_MAX_PLUS_ONE; - - if (TIMESPEC_SEC_MAX_PLUS_ONE <= d) { - return NULL; - } - else if (d <= 0) { - *hrt = 0; - } - else { - *hrt = (rb_hrtime_t)(d * (double)RB_HRTIME_PER_SEC); - } - return hrt; -} - static void getclockofday(struct timespec *ts) { @@ -1226,32 +1298,6 @@ rb_hrtime_now(void) return rb_timespec2hrtime(&ts); } -static void -sleep_forever(rb_thread_t *th, unsigned int fl) -{ - enum rb_thread_status prev_status = th->status; - enum rb_thread_status status; - int woke; - - status = fl & SLEEP_DEADLOCKABLE ? THREAD_STOPPED_FOREVER : THREAD_STOPPED; - th->status = status; - RUBY_VM_CHECK_INTS_BLOCKING(th->ec); - while (th->status == status) { - if (fl & SLEEP_DEADLOCKABLE) { - th->vm->sleeper++; - rb_check_deadlock(th->vm); - } - native_sleep(th, 0); - if (fl & SLEEP_DEADLOCKABLE) { - th->vm->sleeper--; - } - woke = vm_check_ints_blocking(th->ec); - if (woke && !(fl & SLEEP_SPURIOUS_CHECK)) - break; - } - th->status = prev_status; -} - /* * at least gcc 7.2 and 7.3 complains about "rb_hrtime_t end" * being uninitialized, maybe other versions, too. @@ -1274,15 +1320,15 @@ hrtime_update_expire(rb_hrtime_t *timeout, const rb_hrtime_t end) rb_hrtime_t now = rb_hrtime_now(); if (now > end) return 1; - thread_debug("hrtime_update_expire: " - "%"PRIu64" > %"PRIu64"\n", - (uint64_t)end, (uint64_t)now); + + RUBY_DEBUG_LOG("%"PRIu64" > %"PRIu64"", (uint64_t)end, (uint64_t)now); + *timeout = end - now; return 0; } COMPILER_WARNING_POP -static void +static int sleep_hrtime(rb_thread_t *th, rb_hrtime_t rel, unsigned int fl) { enum rb_thread_status prev_status = th->status; @@ -1292,12 +1338,72 @@ sleep_hrtime(rb_thread_t *th, rb_hrtime_t rel, unsigned int fl) th->status = THREAD_STOPPED; RUBY_VM_CHECK_INTS_BLOCKING(th->ec); while (th->status == THREAD_STOPPED) { - native_sleep(th, &rel); - woke = vm_check_ints_blocking(th->ec); - if (woke && !(fl & SLEEP_SPURIOUS_CHECK)) - break; - if (hrtime_update_expire(&rel, end)) - break; + native_sleep(th, &rel); + woke = vm_check_ints_blocking(th->ec); + if (woke && !(fl & SLEEP_SPURIOUS_CHECK)) + break; + if (hrtime_update_expire(&rel, end)) + break; + woke = 1; + } + th->status = prev_status; + return woke; +} + +static int +sleep_hrtime_until(rb_thread_t *th, rb_hrtime_t end, unsigned int fl) +{ + enum rb_thread_status prev_status = th->status; + int woke; + rb_hrtime_t rel = rb_hrtime_sub(end, rb_hrtime_now()); + + th->status = THREAD_STOPPED; + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + while (th->status == THREAD_STOPPED) { + native_sleep(th, &rel); + woke = vm_check_ints_blocking(th->ec); + if (woke && !(fl & SLEEP_SPURIOUS_CHECK)) + break; + if (hrtime_update_expire(&rel, end)) + break; + woke = 1; + } + th->status = prev_status; + return woke; +} + +static void +sleep_forever(rb_thread_t *th, unsigned int fl) +{ + enum rb_thread_status prev_status = th->status; + enum rb_thread_status status; + int woke; + + status = fl & SLEEP_DEADLOCKABLE ? 
THREAD_STOPPED_FOREVER : THREAD_STOPPED; + th->status = status; + + if (!(fl & SLEEP_NO_CHECKINTS)) RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + + while (th->status == status) { + if (fl & SLEEP_DEADLOCKABLE) { + rb_ractor_sleeper_threads_inc(th->ractor); + rb_check_deadlock(th->ractor); + } + { + native_sleep(th, 0); + } + if (fl & SLEEP_DEADLOCKABLE) { + rb_ractor_sleeper_threads_dec(th->ractor); + } + if (fl & SLEEP_ALLOW_SPURIOUS) { + break; + } + + woke = vm_check_ints_blocking(th->ec); + + if (woke && !(fl & SLEEP_SPURIOUS_CHECK)) { + break; + } } th->status = prev_status; } @@ -1305,34 +1411,34 @@ sleep_hrtime(rb_thread_t *th, rb_hrtime_t rel, unsigned int fl) void rb_thread_sleep_forever(void) { - thread_debug("rb_thread_sleep_forever\n"); + RUBY_DEBUG_LOG("forever"); sleep_forever(GET_THREAD(), SLEEP_SPURIOUS_CHECK); } void rb_thread_sleep_deadly(void) { - thread_debug("rb_thread_sleep_deadly\n"); + RUBY_DEBUG_LOG("deadly"); sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE|SLEEP_SPURIOUS_CHECK); } -void -rb_thread_sleep_interruptible(void) -{ - rb_thread_t *th = GET_THREAD(); - enum rb_thread_status prev_status = th->status; - - th->status = THREAD_STOPPED; - native_sleep(th, 0); - RUBY_VM_CHECK_INTS_BLOCKING(th->ec); - th->status = prev_status; -} - static void -rb_thread_sleep_deadly_allow_spurious_wakeup(void) +rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker, VALUE timeout, rb_hrtime_t end) { - thread_debug("rb_thread_sleep_deadly_allow_spurious_wakeup\n"); - sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE); + rb_thread_t *th = GET_THREAD(); + VALUE scheduler = rb_fiber_scheduler_current_for_threadptr(th); + if (scheduler != Qnil) { + rb_fiber_scheduler_block(scheduler, blocker, timeout); + } + else { + RUBY_DEBUG_LOG("..."); + if (end) { + sleep_hrtime_until(th, end, SLEEP_SPURIOUS_CHECK); + } + else { + sleep_forever(th, SLEEP_DEADLOCKABLE); + } + } } void @@ -1343,6 +1449,12 @@ rb_thread_wait_for(struct timeval time) sleep_hrtime(th, rb_timeval2hrtime(&time), SLEEP_SPURIOUS_CHECK); } +void +rb_ec_check_ints(rb_execution_context_t *ec) +{ + RUBY_VM_CHECK_INTS_BLOCKING(ec); +} + /* * CAUTION: This function causes thread switching. * rb_thread_check_ints() check ruby's interrupts. 
@@ -1353,7 +1465,7 @@ rb_thread_wait_for(struct timeval time) void rb_thread_check_ints(void) { - RUBY_VM_CHECK_INTS_BLOCKING(GET_EC()); + rb_ec_check_ints(GET_EC()); } /* @@ -1382,17 +1494,19 @@ rb_thread_sleep(int sec) static void rb_thread_schedule_limits(uint32_t limits_us) { - thread_debug("rb_thread_schedule\n"); if (!rb_thread_alone()) { - rb_thread_t *th = GET_THREAD(); + rb_thread_t *th = GET_THREAD(); + RUBY_DEBUG_LOG("us:%u", (unsigned int)limits_us); + + if (th->running_time_us >= limits_us) { + RUBY_DEBUG_LOG("switch %s", "start"); - if (th->running_time_us >= limits_us) { - thread_debug("rb_thread_schedule/switch start\n"); - RB_GC_SAVE_MACHINE_CONTEXT(th); - gvl_yield(th->vm, th); - rb_thread_set_current(th); - thread_debug("rb_thread_schedule/switch done\n"); - } + RB_VM_SAVE_MACHINE_CONTEXT(th); + thread_sched_yield(TH_SCHED(th), th); + rb_ractor_thread_switch(th->ractor, th, true); + + RUBY_DEBUG_LOG("switch %s", "done"); + } } } @@ -1407,19 +1521,24 @@ rb_thread_schedule(void) static inline int blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region, - rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted) + rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted) { +#ifdef RUBY_ASSERT_CRITICAL_SECTION + VM_ASSERT(ruby_assert_critical_section_entered == 0); +#endif + VM_ASSERT(th == GET_THREAD()); + region->prev_status = th->status; if (unblock_function_set(th, ubf, arg, fail_if_interrupted)) { - th->blocking_region_buffer = region; - th->status = THREAD_STOPPED; - thread_debug("enter blocking region (%p)\n", (void *)th); - RB_GC_SAVE_MACHINE_CONTEXT(th); - gvl_release(th->vm); - return TRUE; + th->blocking_region_buffer = region; + th->status = THREAD_STOPPED; + rb_ractor_blocking_threads_inc(th->ractor, __FILE__, __LINE__); + + RUBY_DEBUG_LOG("thread_id:%p", (void *)th->nt->thread_id); + return TRUE; } else { - return FALSE; + return FALSE; } } @@ -1431,13 +1550,44 @@ blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region) /* entry to ubf_list impossible at this point, so unregister is safe: */ unregister_ubf_list(th); - gvl_acquire(th->vm, th); - rb_thread_set_current(th); - thread_debug("leave blocking region (%p)\n", (void *)th); + thread_sched_to_running(TH_SCHED(th), th); + rb_ractor_thread_switch(th->ractor, th, false); + th->blocking_region_buffer = 0; + rb_ractor_blocking_threads_dec(th->ractor, __FILE__, __LINE__); if (th->status == THREAD_STOPPED) { - th->status = region->prev_status; + th->status = region->prev_status; + } + + RUBY_DEBUG_LOG("end"); + +#ifndef _WIN32 + // GET_THREAD() clears WSAGetLastError() + VM_ASSERT(th == GET_THREAD()); +#endif +} + +/* + * Resolve sentinel unblock function values to their actual function pointers + * and appropriate data2 values. This centralizes the logic for handling + * RUBY_UBF_IO and RUBY_UBF_PROCESS sentinel values. 
+ * + * @param unblock_function Pointer to unblock function pointer (modified in place) + * @param data2 Pointer to data2 pointer (modified in place) + * @param thread Thread context for resolving data2 when needed + * @return true if sentinel values were resolved, false otherwise + */ +bool +rb_thread_resolve_unblock_function(rb_unblock_function_t **unblock_function, void **data2, struct rb_thread_struct *thread) +{ + rb_unblock_function_t *ubf = *unblock_function; + + if ((ubf == RUBY_UBF_IO) || (ubf == RUBY_UBF_PROCESS)) { + *unblock_function = ubf_select; + *data2 = thread; + return true; } + return false; } void * @@ -1445,41 +1595,49 @@ rb_nogvl(void *(*func)(void *), void *data1, rb_unblock_function_t *ubf, void *data2, int flags) { + if (flags & RB_NOGVL_OFFLOAD_SAFE) { + VALUE scheduler = rb_fiber_scheduler_current(); + if (scheduler != Qnil) { + struct rb_fiber_scheduler_blocking_operation_state state = {0}; + + VALUE result = rb_fiber_scheduler_blocking_operation_wait(scheduler, func, data1, ubf, data2, flags, &state); + + if (!UNDEF_P(result)) { + rb_errno_set(state.saved_errno); + return state.result; + } + } + } + void *val = 0; rb_execution_context_t *ec = GET_EC(); rb_thread_t *th = rb_ec_thread_ptr(ec); + rb_vm_t *vm = rb_ec_vm_ptr(ec); + bool is_main_thread = vm->ractor.main_thread == th; int saved_errno = 0; - VALUE ubf_th = Qfalse; - if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) { - ubf = ubf_select; - data2 = th; - } - else if (ubf && vm_living_thread_num(th->vm) == 1) { - if (RB_NOGVL_UBF_ASYNC_SAFE) { - th->vm->ubf_async_safe = 1; - } - else { - ubf_th = rb_thread_start_unblock_thread(); + rb_thread_resolve_unblock_function(&ubf, &data2, th); + + if (ubf && rb_ractor_living_thread_num(th->ractor) == 1 && is_main_thread) { + if (flags & RB_NOGVL_UBF_ASYNC_SAFE) { + vm->ubf_async_safe = 1; } } + rb_vm_t *volatile saved_vm = vm; BLOCKING_REGION(th, { - val = func(data1); - saved_errno = errno; + val = func(data1); + saved_errno = rb_errno(); }, ubf, data2, flags & RB_NOGVL_INTR_FAIL); + vm = saved_vm; - th->vm->ubf_async_safe = 0; + if (is_main_thread) vm->ubf_async_safe = 0; if ((flags & RB_NOGVL_INTR_FAIL) == 0) { - RUBY_VM_CHECK_INTS_BLOCKING(ec); + RUBY_VM_CHECK_INTS_BLOCKING(ec); } - if (ubf_th != Qfalse) { - thread_value(rb_thread_kill(ubf_th)); - } - - errno = saved_errno; + rb_errno_set(saved_errno); return val; } @@ -1571,57 +1729,305 @@ rb_nogvl(void *(*func)(void *), void *data1, */ void * rb_thread_call_without_gvl2(void *(*func)(void *), void *data1, - rb_unblock_function_t *ubf, void *data2) + rb_unblock_function_t *ubf, void *data2) { return rb_nogvl(func, data1, ubf, data2, RB_NOGVL_INTR_FAIL); } void * rb_thread_call_without_gvl(void *(*func)(void *data), void *data1, - rb_unblock_function_t *ubf, void *data2) + rb_unblock_function_t *ubf, void *data2) { return rb_nogvl(func, data1, ubf, data2, 0); } +static int +waitfd_to_waiting_flag(int wfd_event) +{ + return wfd_event << 1; +} + +static struct ccan_list_head * +rb_io_blocking_operations(struct rb_io *io) +{ + rb_serial_t fork_generation = GET_VM()->fork_gen; + + // On fork, all existing entries in this list (which are stack allocated) become invalid. + // Therefore, we re-initialize the list which clears it. + if (io->fork_generation != fork_generation) { + ccan_list_head_init(&io->blocking_operations); + io->fork_generation = fork_generation; + } + + return &io->blocking_operations; +} + +/* + * Registers a blocking operation for an IO object. 
This is used to track all threads and fibers + * that are currently blocked on this IO for reading, writing or other operations. + * + * When the IO is closed, all blocking operations will be notified via rb_fiber_scheduler_fiber_interrupt + * for fibers with a scheduler, or via rb_threadptr_interrupt for threads without a scheduler. + * + * @parameter io The IO object on which the operation will block + * @parameter blocking_operation The operation details including the execution context that will be blocked + */ +static void +rb_io_blocking_operation_enter(struct rb_io *io, struct rb_io_blocking_operation *blocking_operation) +{ + ccan_list_add(rb_io_blocking_operations(io), &blocking_operation->list); +} + +static void +rb_io_blocking_operation_pop(struct rb_io *io, struct rb_io_blocking_operation *blocking_operation) +{ + ccan_list_del(&blocking_operation->list); +} + +struct io_blocking_operation_arguments { + struct rb_io *io; + struct rb_io_blocking_operation *blocking_operation; +}; + +static VALUE +io_blocking_operation_exit(VALUE _arguments) +{ + struct io_blocking_operation_arguments *arguments = (void*)_arguments; + struct rb_io_blocking_operation *blocking_operation = arguments->blocking_operation; + + rb_io_blocking_operation_pop(arguments->io, blocking_operation); + + rb_io_t *io = arguments->io; + rb_thread_t *thread = io->closing_ec->thread_ptr; + rb_fiber_t *fiber = io->closing_ec->fiber_ptr; + + if (thread->scheduler != Qnil) { + // This can cause spurious wakeups... + rb_fiber_scheduler_unblock(thread->scheduler, io->self, rb_fiberptr_self(fiber)); + } + else { + rb_thread_wakeup(thread->self); + } + + return Qnil; +} + +/* + * Called when a blocking operation completes or is interrupted. Removes the operation from + * the IO's blocking_operations list and wakes up any waiting threads/fibers. + * + * If there's a wakeup_mutex (meaning an IO close is in progress), synchronizes the cleanup + * through that mutex to ensure proper coordination with the closing thread. + * + * @parameter io The IO object the operation was performed on + * @parameter blocking_operation The completed operation to clean up + */ +static void +rb_io_blocking_operation_exit(struct rb_io *io, struct rb_io_blocking_operation *blocking_operation) +{ + VALUE wakeup_mutex = io->wakeup_mutex; + + // Indicate that the blocking operation is no longer active: + blocking_operation->ec = NULL; + + if (RB_TEST(wakeup_mutex)) { + struct io_blocking_operation_arguments arguments = { + .io = io, + .blocking_operation = blocking_operation + }; + + rb_mutex_synchronize(wakeup_mutex, io_blocking_operation_exit, (VALUE)&arguments); + } + else { + // If there's no wakeup_mutex, we can safely remove the operation directly: + rb_io_blocking_operation_pop(io, blocking_operation); + } +} + +static VALUE +rb_thread_io_blocking_operation_ensure(VALUE _argument) +{ + struct io_blocking_operation_arguments *arguments = (void*)_argument; + + rb_io_blocking_operation_exit(arguments->io, arguments->blocking_operation); + + return Qnil; +} + +/* + * Executes a function that performs a blocking IO operation, while properly tracking + * the operation in the IO's blocking_operations list. This ensures proper cleanup + * and interruption handling if the IO is closed while blocked. + * + * The operation is automatically removed from the blocking_operations list when the function + * returns, whether normally or due to an exception. 
+ * + * @parameter self The IO object + * @parameter function The function to execute that will perform the blocking operation + * @parameter argument The argument to pass to the function + * @returns The result of the blocking operation function + */ VALUE -rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd) +rb_thread_io_blocking_operation(VALUE self, VALUE(*function)(VALUE), VALUE argument) +{ + struct rb_io *io; + RB_IO_POINTER(self, io); + + rb_execution_context_t *ec = GET_EC(); + struct rb_io_blocking_operation blocking_operation = { + .ec = ec, + }; + rb_io_blocking_operation_enter(io, &blocking_operation); + + struct io_blocking_operation_arguments io_blocking_operation_arguments = { + .io = io, + .blocking_operation = &blocking_operation + }; + + return rb_ensure(function, argument, rb_thread_io_blocking_operation_ensure, (VALUE)&io_blocking_operation_arguments); +} + +static bool +thread_io_mn_schedulable(rb_thread_t *th, int events, const struct timeval *timeout) +{ +#if defined(USE_MN_THREADS) && USE_MN_THREADS + return !th_has_dedicated_nt(th) && (events || timeout) && th->blocking; +#else + return false; +#endif +} + +// true if need retry +static bool +thread_io_wait_events(rb_thread_t *th, int fd, int events, const struct timeval *timeout) +{ +#if defined(USE_MN_THREADS) && USE_MN_THREADS + if (thread_io_mn_schedulable(th, events, timeout)) { + rb_hrtime_t rel, *prel; + + if (timeout) { + rel = rb_timeval2hrtime(timeout); + prel = &rel; + } + else { + prel = NULL; + } + + VM_ASSERT(prel || (events & (RB_WAITFD_IN | RB_WAITFD_OUT))); + + if (thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), prel)) { + // timeout + return false; + } + else { + return true; + } + } +#endif // defined(USE_MN_THREADS) && USE_MN_THREADS + return false; +} + +// assume read/write +static bool +blocking_call_retryable_p(int r, int eno) +{ + if (r != -1) return false; + + switch (eno) { + case EAGAIN: +#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: +#endif + return true; + default: + return false; + } +} + +bool +rb_thread_mn_schedulable(VALUE thval) +{ + rb_thread_t *th = rb_thread_ptr(thval); + return th->mn_schedulable; +} + +VALUE +rb_thread_io_blocking_call(struct rb_io* io, rb_blocking_function_t *func, void *data1, int events) { - volatile VALUE val = Qundef; /* shouldn't be used */ rb_execution_context_t * volatile ec = GET_EC(); + rb_thread_t * volatile th = rb_ec_thread_ptr(ec); + + RUBY_DEBUG_LOG("th:%u fd:%d ev:%d", rb_th_serial(th), io->fd, events); + + volatile VALUE val = Qundef; /* shouldn't be used */ volatile int saved_errno = 0; enum ruby_tag_type state; - struct waiting_fd wfd; + volatile bool prev_mn_schedulable = th->mn_schedulable; + th->mn_schedulable = thread_io_mn_schedulable(th, events, NULL); - wfd.fd = fd; - wfd.th = rb_ec_thread_ptr(ec); - list_add(&rb_ec_vm_ptr(ec)->waiting_fds, &wfd.wfd_node); + int fd = io->fd; - EC_PUSH_TAG(ec); - if ((state = EC_EXEC_TAG()) == TAG_NONE) { - BLOCKING_REGION(wfd.th, { - val = func(data1); - saved_errno = errno; - }, ubf_select, wfd.th, FALSE); + // `errno` is only valid when there is an actual error - but we can't + // extract that from the return value of `func` alone, so we clear any + // prior `errno` value here so that we can later check if it was set by + // `func` or not (as opposed to some previously set value). 
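To make the convention concrete, a hedged sketch of what a blocking function passed as func typically looks like (the struct and function names are invented for illustration): it returns the syscall result cast to VALUE and leaves errno alone on success, so that a -1 result with EAGAIN/EWOULDBLOCK drives the wait-and-retry path below, and a -1 result with ETIMEDOUT becomes IO::TimeoutError.

    #include <unistd.h>
    #include <ruby.h>

    struct my_read_args { int fd; void *buf; size_t capacity; };

    static VALUE
    my_read_func(void *ptr)
    {
        struct my_read_args *args = ptr;
        /* Runs without the GVL; only the return value and errno are reported back. */
        ssize_t result = read(args->fd, args->buf, args->capacity);
        return (VALUE)result;
    }

    /* Typical call site, assuming `io` is the struct rb_io being read:
     *   VALUE n = rb_thread_io_blocking_call(io, my_read_func, &args, RB_WAITFD_IN);
     */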
+ errno = 0; + + struct rb_io_blocking_operation blocking_operation = { + .ec = ec, + }; + rb_io_blocking_operation_enter(io, &blocking_operation); + + { + EC_PUSH_TAG(ec); + if ((state = EC_EXEC_TAG()) == TAG_NONE) { + volatile enum ruby_tag_type saved_state = state; /* for BLOCKING_REGION */ + retry: + BLOCKING_REGION(th, { + val = func(data1); + saved_errno = errno; + }, ubf_select, th, FALSE); + + RUBY_ASSERT(th == rb_ec_thread_ptr(ec)); + if (events && + blocking_call_retryable_p((int)val, saved_errno) && + thread_io_wait_events(th, fd, events, NULL)) { + RUBY_VM_CHECK_INTS_BLOCKING(ec); + goto retry; + } + + RUBY_VM_CHECK_INTS_BLOCKING(ec); + + state = saved_state; + } + EC_POP_TAG(); + + th = rb_ec_thread_ptr(ec); + th->mn_schedulable = prev_mn_schedulable; } - EC_POP_TAG(); - /* - * must be deleted before jump - * this will delete either from waiting_fds or on-stack LIST_HEAD(busy) - */ - list_del(&wfd.wfd_node); + rb_io_blocking_operation_exit(io, &blocking_operation); if (state) { - EC_JUMP_TAG(ec, state); + EC_JUMP_TAG(ec, state); + } + + // If the error was a timeout, we raise a specific exception for that: + if (saved_errno == ETIMEDOUT) { + rb_raise(rb_eIOTimeoutError, "Blocking operation timed out!"); } - /* TODO: check func() */ - RUBY_VM_CHECK_INTS_BLOCKING(ec); errno = saved_errno; return val; } +VALUE +rb_thread_io_blocking_region(struct rb_io *io, rb_blocking_function_t *func, void *data1) +{ + return rb_thread_io_blocking_call(io, func, data1, 0); +} + /* * rb_thread_call_with_gvl - re-enter the Ruby world after GVL release. * @@ -1649,6 +2055,9 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd) * created as Ruby thread (created by Thread.new or so). In other * words, this function *DOES NOT* associate or convert a NON-Ruby * thread to a Ruby thread. + * + * NOTE: If this thread has already acquired the GVL, then the method call + * is performed without acquiring or releasing the GVL (from Ruby 4.0). */ void * rb_thread_call_with_gvl(void *(*func)(void *), void *data1) @@ -1659,20 +2068,21 @@ rb_thread_call_with_gvl(void *(*func)(void *), void *data1) void *r; if (th == 0) { - /* Error has occurred, but we can't use rb_bug() - * because this thread is not Ruby's thread. + /* Error has occurred, but we can't use rb_bug() + * because this thread is not Ruby's thread. * What should we do? - */ - - fprintf(stderr, "[BUG] rb_thread_call_with_gvl() is called by non-ruby thread\n"); - exit(EXIT_FAILURE); + */ + bp(); + fprintf(stderr, "[BUG] rb_thread_call_with_gvl() is called by non-ruby thread\n"); + exit(EXIT_FAILURE); } brb = (struct rb_blocking_region_buffer *)th->blocking_region_buffer; prev_unblock = th->unblock; if (brb == 0) { - rb_bug("rb_thread_call_with_gvl: called by a thread which has GVL."); + /* the GVL is already acquired, call method directly */ + return (*func)(data1); } blocking_region_end(th, brb); @@ -1681,16 +2091,13 @@ rb_thread_call_with_gvl(void *(*func)(void *), void *data1) /* leave from Ruby world: You can not access Ruby values, etc. */ int released = blocking_region_begin(th, brb, prev_unblock.func, prev_unblock.arg, FALSE); RUBY_ASSERT_ALWAYS(released); + RB_VM_SAVE_MACHINE_CONTEXT(th); + thread_sched_to_waiting(TH_SCHED(th), th); return r; } /* * ruby_thread_has_gvl_p - check if current native thread has GVL. - * - *** - *** This API is EXPERIMENTAL! - *** We do not guarantee that this API remains in ruby 1.9.2 or later. 
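A short sketch of how the GVL entry points discussed above are typically paired in an extension (all names are invented; this is illustration, not part of the change): native work runs without the GVL via rb_thread_call_without_gvl(), and briefly re-enters the Ruby world with rb_thread_call_with_gvl() when it needs to touch Ruby objects.

    #include <ruby.h>
    #include <ruby/thread.h>

    /* Runs with the GVL re-acquired; safe to touch Ruby objects here. */
    static void *
    call_ruby_logger(void *message)
    {
        rb_funcall(rb_stderr, rb_intern("puts"), 1, rb_str_new_cstr((const char *)message));
        return NULL;
    }

    /* Runs without the GVL; must not touch Ruby objects directly. */
    static void *
    blocking_work(void *unused)
    {
        /* ... long-running native work ... */
        rb_thread_call_with_gvl(call_ruby_logger, (void *)"half way done");
        /* ... more native work ... */
        return NULL;
    }

    /* Driven from GVL-holding code as:
     *   rb_thread_call_without_gvl(blocking_work, NULL, RUBY_UBF_IO, NULL);
     */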
- *** */ int @@ -1699,10 +2106,10 @@ ruby_thread_has_gvl_p(void) rb_thread_t *th = ruby_thread_from_native(); if (th && th->blocking_region_buffer == 0) { - return 1; + return 1; } else { - return 0; + return 0; } } @@ -1756,7 +2163,7 @@ static void threadptr_check_pending_interrupt_queue(rb_thread_t *th) { if (!th->pending_interrupt_queue) { - rb_raise(rb_eThreadError, "uninitialized thread"); + rb_raise(rb_eThreadError, "uninitialized thread"); } } @@ -1768,6 +2175,23 @@ enum handle_interrupt_timing { }; static enum handle_interrupt_timing +rb_threadptr_pending_interrupt_from_symbol(rb_thread_t *th, VALUE sym) +{ + if (sym == sym_immediate) { + return INTERRUPT_IMMEDIATE; + } + else if (sym == sym_on_blocking) { + return INTERRUPT_ON_BLOCKING; + } + else if (sym == sym_never) { + return INTERRUPT_NEVER; + } + else { + rb_raise(rb_eThreadError, "unknown mask signature"); + } +} + +static enum handle_interrupt_timing rb_threadptr_pending_interrupt_check_mask(rb_thread_t *th, VALUE err) { VALUE mask; @@ -1777,35 +2201,34 @@ rb_threadptr_pending_interrupt_check_mask(rb_thread_t *th, VALUE err) long i; for (i=0; i<mask_stack_len; i++) { - mask = mask_stack[mask_stack_len-(i+1)]; - - for (mod = err; mod; mod = RCLASS_SUPER(mod)) { - VALUE klass = mod; - VALUE sym; - - if (BUILTIN_TYPE(mod) == T_ICLASS) { - klass = RBASIC(mod)->klass; - } - else if (mod != RCLASS_ORIGIN(mod)) { - continue; - } - - if ((sym = rb_hash_aref(mask, klass)) != Qnil) { - if (sym == sym_immediate) { - return INTERRUPT_IMMEDIATE; - } - else if (sym == sym_on_blocking) { - return INTERRUPT_ON_BLOCKING; - } - else if (sym == sym_never) { - return INTERRUPT_NEVER; - } - else { - rb_raise(rb_eThreadError, "unknown mask signature"); - } - } - } - /* try to next mask */ + mask = mask_stack[mask_stack_len-(i+1)]; + + if (SYMBOL_P(mask)) { + /* do not match RUBY_FATAL_THREAD_KILLED etc */ + if (err != rb_cInteger) { + return rb_threadptr_pending_interrupt_from_symbol(th, mask); + } + else { + continue; + } + } + + for (mod = err; mod; mod = RCLASS_SUPER(mod)) { + VALUE klass = mod; + VALUE sym; + + if (BUILTIN_TYPE(mod) == T_ICLASS) { + klass = RBASIC(mod)->klass; + } + else if (mod != RCLASS_ORIGIN(mod)) { + continue; + } + + if ((sym = rb_hash_aref(mask, klass)) != Qnil) { + return rb_threadptr_pending_interrupt_from_symbol(th, sym); + } + } + /* try to next mask */ } return INTERRUPT_NONE; } @@ -1821,10 +2244,10 @@ rb_threadptr_pending_interrupt_include_p(rb_thread_t *th, VALUE err) { int i; for (i=0; i<RARRAY_LEN(th->pending_interrupt_queue); i++) { - VALUE e = RARRAY_AREF(th->pending_interrupt_queue, i); - if (rb_class_inherited_p(e, err)) { - return TRUE; - } + VALUE e = RARRAY_AREF(th->pending_interrupt_queue, i); + if (rb_obj_is_kind_of(e, err)) { + return TRUE; + } } return FALSE; } @@ -1836,23 +2259,23 @@ rb_threadptr_pending_interrupt_deque(rb_thread_t *th, enum handle_interrupt_timi int i; for (i=0; i<RARRAY_LEN(th->pending_interrupt_queue); i++) { - VALUE err = RARRAY_AREF(th->pending_interrupt_queue, i); + VALUE err = RARRAY_AREF(th->pending_interrupt_queue, i); - enum handle_interrupt_timing mask_timing = rb_threadptr_pending_interrupt_check_mask(th, CLASS_OF(err)); + enum handle_interrupt_timing mask_timing = rb_threadptr_pending_interrupt_check_mask(th, CLASS_OF(err)); - switch (mask_timing) { - case INTERRUPT_ON_BLOCKING: - if (timing != INTERRUPT_ON_BLOCKING) { - break; - } - /* fall through */ - case INTERRUPT_NONE: /* default: IMMEDIATE */ - case INTERRUPT_IMMEDIATE: - 
rb_ary_delete_at(th->pending_interrupt_queue, i); - return err; - case INTERRUPT_NEVER: - break; - } + switch (mask_timing) { + case INTERRUPT_ON_BLOCKING: + if (timing != INTERRUPT_ON_BLOCKING) { + break; + } + /* fall through */ + case INTERRUPT_NONE: /* default: IMMEDIATE */ + case INTERRUPT_IMMEDIATE: + rb_ary_delete_at(th->pending_interrupt_queue, i); + return err; + case INTERRUPT_NEVER: + break; + } } th->pending_interrupt_queue_checked = 1; @@ -1860,7 +2283,7 @@ rb_threadptr_pending_interrupt_deque(rb_thread_t *th, enum handle_interrupt_timi #else VALUE err = rb_ary_shift(th->pending_interrupt_queue); if (rb_threadptr_pending_interrupt_empty_p(th)) { - th->pending_interrupt_queue_checked = 1; + th->pending_interrupt_queue_checked = 1; } return err; #endif @@ -1875,11 +2298,11 @@ threadptr_pending_interrupt_active_p(rb_thread_t *th) * since last check. */ if (th->pending_interrupt_queue_checked) { - return 0; + return 0; } if (rb_threadptr_pending_interrupt_empty_p(th)) { - return 0; + return 0; } return 1; @@ -1891,13 +2314,27 @@ handle_interrupt_arg_check_i(VALUE key, VALUE val, VALUE args) VALUE *maskp = (VALUE *)args; if (val != sym_immediate && val != sym_on_blocking && val != sym_never) { - rb_raise(rb_eArgError, "unknown mask signature"); + rb_raise(rb_eArgError, "unknown mask signature"); + } + + if (key == rb_eException && (UNDEF_P(*maskp) || NIL_P(*maskp))) { + *maskp = val; + return ST_CONTINUE; } - if (!*maskp) { - *maskp = rb_ident_hash_new(); + if (RTEST(*maskp)) { + if (!RB_TYPE_P(*maskp, T_HASH)) { + VALUE prev = *maskp; + *maskp = rb_ident_hash_new(); + if (SYMBOL_P(prev)) { + rb_hash_aset(*maskp, rb_eException, prev); + } + } + rb_hash_aset(*maskp, key, val); + } + else { + *maskp = Qfalse; } - rb_hash_aset(*maskp, key, val); return ST_CONTINUE; } @@ -1964,30 +2401,6 @@ handle_interrupt_arg_check_i(VALUE key, VALUE val, VALUE args) * resource allocation code. Then, the ensure block is where we can safely * deallocate your resources. * - * ==== Guarding from Timeout::Error - * - * In the next example, we will guard from the Timeout::Error exception. This - * will help prevent from leaking resources when Timeout::Error exceptions occur - * during normal ensure clause. For this example we use the help of the - * standard library Timeout, from lib/timeout.rb - * - * require 'timeout' - * Thread.handle_interrupt(Timeout::Error => :never) { - * timeout(10){ - * # Timeout::Error doesn't occur here - * Thread.handle_interrupt(Timeout::Error => :on_blocking) { - * # possible to be killed by Timeout::Error - * # while blocking operation - * } - * # Timeout::Error doesn't occur here - * } - * } - * - * In the first part of the +timeout+ block, we can rely on Timeout::Error being - * ignored. Then in the <code>Timeout::Error => :on_blocking</code> block, any - * operation that will block the calling thread is susceptible to a - * Timeout::Error exception being raised. - * * ==== Stack control settings * * It's possible to stack multiple levels of ::handle_interrupt blocks in order @@ -2007,49 +2420,63 @@ handle_interrupt_arg_check_i(VALUE key, VALUE val, VALUE args) * # all exceptions inherited from Exception are prohibited. * } * + * For handling all interrupts, use +Object+ and not +Exception+ + * as the ExceptionClass, as kill/terminate interrupts are not handled by +Exception+. 
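From C, the same masking can be requested through the documented Ruby API; a hedged sketch follows (the helper names are invented and this is not part of the patch). Masking Object rather than Exception also defers the internal kill/terminate requests, as noted above.

    #include <ruby.h>

    static VALUE
    critical_body(RB_BLOCK_CALL_FUNC_ARGLIST(yielded, data))
    {
        /* Work here is not interrupted by Thread#raise or Thread#kill. */
        return Qnil;
    }

    static VALUE
    with_interrupts_deferred(VALUE data)
    {
        VALUE mask = rb_hash_new();
        rb_hash_aset(mask, rb_cObject, ID2SYM(rb_intern("never")));
        return rb_block_call(rb_cThread, rb_intern("handle_interrupt"),
                             1, &mask, critical_body, data);
    }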
*/ static VALUE rb_thread_s_handle_interrupt(VALUE self, VALUE mask_arg) { - VALUE mask; + VALUE mask = Qundef; rb_execution_context_t * volatile ec = GET_EC(); rb_thread_t * volatile th = rb_ec_thread_ptr(ec); volatile VALUE r = Qnil; enum ruby_tag_type state; if (!rb_block_given_p()) { - rb_raise(rb_eArgError, "block is needed."); + rb_raise(rb_eArgError, "block is needed."); } - mask = 0; mask_arg = rb_to_hash_type(mask_arg); + + if (OBJ_FROZEN(mask_arg) && rb_hash_compare_by_id_p(mask_arg)) { + mask = Qnil; + } + rb_hash_foreach(mask_arg, handle_interrupt_arg_check_i, (VALUE)&mask); - if (!mask) { - return rb_yield(Qnil); + + if (UNDEF_P(mask)) { + return rb_yield(Qnil); } - OBJ_FREEZE_RAW(mask); + + if (!RTEST(mask)) { + mask = mask_arg; + } + else if (RB_TYPE_P(mask, T_HASH)) { + OBJ_FREEZE(mask); + } + rb_ary_push(th->pending_interrupt_mask_stack, mask); if (!rb_threadptr_pending_interrupt_empty_p(th)) { - th->pending_interrupt_queue_checked = 0; - RUBY_VM_SET_INTERRUPT(th->ec); + th->pending_interrupt_queue_checked = 0; + RUBY_VM_SET_INTERRUPT(th->ec); } EC_PUSH_TAG(th->ec); if ((state = EC_EXEC_TAG()) == TAG_NONE) { - r = rb_yield(Qnil); + r = rb_yield(Qnil); } EC_POP_TAG(); rb_ary_pop(th->pending_interrupt_mask_stack); if (!rb_threadptr_pending_interrupt_empty_p(th)) { - th->pending_interrupt_queue_checked = 0; - RUBY_VM_SET_INTERRUPT(th->ec); + th->pending_interrupt_queue_checked = 0; + RUBY_VM_SET_INTERRUPT(th->ec); } RUBY_VM_CHECK_INTS(th->ec); if (state) { - EC_JUMP_TAG(th->ec, state); + EC_JUMP_TAG(th->ec, state); } return r; @@ -2071,25 +2498,20 @@ rb_thread_pending_interrupt_p(int argc, VALUE *argv, VALUE target_thread) rb_thread_t *target_th = rb_thread_ptr(target_thread); if (!target_th->pending_interrupt_queue) { - return Qfalse; + return Qfalse; } if (rb_threadptr_pending_interrupt_empty_p(target_th)) { - return Qfalse; + return Qfalse; } if (rb_check_arity(argc, 0, 1)) { VALUE err = argv[0]; if (!rb_obj_is_kind_of(err, rb_cModule)) { rb_raise(rb_eTypeError, "class or module required for rescue clause"); } - if (rb_threadptr_pending_interrupt_include_p(target_th, err)) { - return Qtrue; - } - else { - return Qfalse; - } + return RBOOL(rb_threadptr_pending_interrupt_include_p(target_th, err)); } else { - return Qtrue; + return Qtrue; } } @@ -2161,6 +2583,7 @@ NORETURN(static void rb_threadptr_to_kill(rb_thread_t *th)); static void rb_threadptr_to_kill(rb_thread_t *th) { + VM_ASSERT(GET_THREAD() == th); rb_threadptr_pending_interrupt_clear(th); th->status = THREAD_RUNNABLE; th->to_kill = 1; @@ -2175,99 +2598,126 @@ threadptr_get_interrupts(rb_thread_t *th) rb_atomic_t interrupt; rb_atomic_t old; + old = ATOMIC_LOAD_RELAXED(ec->interrupt_flag); do { - interrupt = ec->interrupt_flag; - old = ATOMIC_CAS(ec->interrupt_flag, interrupt, interrupt & ec->interrupt_mask); + interrupt = old; + old = ATOMIC_CAS(ec->interrupt_flag, interrupt, interrupt & ec->interrupt_mask); } while (old != interrupt); return interrupt & (rb_atomic_t)~ec->interrupt_mask; } -MJIT_FUNC_EXPORTED int +static void threadptr_interrupt_exec_exec(rb_thread_t *th); + +// Execute interrupts on currently running thread +// In certain situations, calling this function will raise an exception. 
Some examples are: +// * during VM shutdown (`rb_ractor_terminate_all`) +// * Call to Thread#exit for current thread (`rb_thread_kill`) +// * Call to Thread#raise for current thread +int rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing) { rb_atomic_t interrupt; int postponed_job_interrupt = 0; int ret = FALSE; + VM_ASSERT(GET_THREAD() == th); + if (th->ec->raised_flag) return ret; while ((interrupt = threadptr_get_interrupts(th)) != 0) { - int sig; - int timer_interrupt; - int pending_interrupt; - int trap_interrupt; - - timer_interrupt = interrupt & TIMER_INTERRUPT_MASK; - pending_interrupt = interrupt & PENDING_INTERRUPT_MASK; - postponed_job_interrupt = interrupt & POSTPONED_JOB_INTERRUPT_MASK; - trap_interrupt = interrupt & TRAP_INTERRUPT_MASK; - - if (postponed_job_interrupt) { - rb_postponed_job_flush(th->vm); - } - - /* signal handling */ - if (trap_interrupt && (th == th->vm->main_thread)) { - enum rb_thread_status prev_status = th->status; - int sigwait_fd = rb_sigwait_fd_get(th); - - if (sigwait_fd >= 0) { - (void)consume_communication_pipe(sigwait_fd); - ruby_sigchld_handler(th->vm); - rb_sigwait_fd_put(th, sigwait_fd); - rb_sigwait_fd_migrate(th->vm); - } - th->status = THREAD_RUNNABLE; - while ((sig = rb_get_next_signal()) != 0) { - ret |= rb_signal_exec(th, sig); - } - th->status = prev_status; - } - - /* exception from another thread */ - if (pending_interrupt && threadptr_pending_interrupt_active_p(th)) { - VALUE err = rb_threadptr_pending_interrupt_deque(th, blocking_timing ? INTERRUPT_ON_BLOCKING : INTERRUPT_NONE); - thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err); + int sig; + int timer_interrupt; + int pending_interrupt; + int trap_interrupt; + int terminate_interrupt; + + timer_interrupt = interrupt & TIMER_INTERRUPT_MASK; + pending_interrupt = interrupt & PENDING_INTERRUPT_MASK; + postponed_job_interrupt = interrupt & POSTPONED_JOB_INTERRUPT_MASK; + trap_interrupt = interrupt & TRAP_INTERRUPT_MASK; + terminate_interrupt = interrupt & TERMINATE_INTERRUPT_MASK; // request from other ractors + + if (interrupt & VM_BARRIER_INTERRUPT_MASK) { + RB_VM_LOCKING(); + } + + if (postponed_job_interrupt) { + rb_postponed_job_flush(th->vm); + } + + if (trap_interrupt) { + /* signal handling */ + if (th == th->vm->ractor.main_thread) { + enum rb_thread_status prev_status = th->status; + + th->status = THREAD_RUNNABLE; + { + while ((sig = rb_get_next_signal()) != 0) { + ret |= rb_signal_exec(th, sig); + } + } + th->status = prev_status; + } + + if (!ccan_list_empty(&th->interrupt_exec_tasks)) { + enum rb_thread_status prev_status = th->status; + + th->status = THREAD_RUNNABLE; + { + threadptr_interrupt_exec_exec(th); + } + th->status = prev_status; + } + } + + /* exception from another thread */ + if (pending_interrupt && threadptr_pending_interrupt_active_p(th)) { + VALUE err = rb_threadptr_pending_interrupt_deque(th, blocking_timing ? INTERRUPT_ON_BLOCKING : INTERRUPT_NONE); + RUBY_DEBUG_LOG("err:%"PRIdVALUE, err); ret = TRUE; - if (err == Qundef) { - /* no error */ - } - else if (err == eKillSignal /* Thread#kill received */ || - err == eTerminateSignal /* Terminate thread */ || - err == INT2FIX(TAG_FATAL) /* Thread.exit etc. */ ) { - rb_threadptr_to_kill(th); - } - else { - if (err == th->vm->special_exceptions[ruby_error_stream_closed]) { - /* the only special exception to be queued across thread */ - err = ruby_vm_special_exception_copy(err); - } - /* set runnable if th was slept. 
*/ - if (th->status == THREAD_STOPPED || - th->status == THREAD_STOPPED_FOREVER) - th->status = THREAD_RUNNABLE; - rb_exc_raise(err); - } - } - - if (timer_interrupt) { - uint32_t limits_us = TIME_QUANTUM_USEC; - - if (th->priority > 0) - limits_us <<= th->priority; - else - limits_us >>= -th->priority; - - if (th->status == THREAD_RUNNABLE) - th->running_time_us += TIME_QUANTUM_USEC; + if (UNDEF_P(err)) { + /* no error */ + } + else if (err == RUBY_FATAL_THREAD_KILLED /* Thread#kill received */ || + err == RUBY_FATAL_THREAD_TERMINATED /* Terminate thread */ || + err == INT2FIX(TAG_FATAL) /* Thread.exit etc. */ ) { + terminate_interrupt = 1; + } + else { + if (err == th->vm->special_exceptions[ruby_error_stream_closed]) { + /* the only special exception to be queued across thread */ + err = ruby_vm_special_exception_copy(err); + } + /* set runnable if th was slept. */ + if (th->status == THREAD_STOPPED || + th->status == THREAD_STOPPED_FOREVER) + th->status = THREAD_RUNNABLE; + rb_exc_raise(err); + } + } + + if (terminate_interrupt) { + rb_threadptr_to_kill(th); + } + + if (timer_interrupt) { + uint32_t limits_us = thread_default_quantum_ms * 1000; + + if (th->priority > 0) + limits_us <<= th->priority; + else + limits_us >>= -th->priority; + + if (th->status == THREAD_RUNNABLE) + th->running_time_us += 10 * 1000; // 10ms = 10_000us // TODO: use macro VM_ASSERT(th->ec->cfp); - EXEC_EVENT_HOOK(th->ec, RUBY_INTERNAL_EVENT_SWITCH, th->ec->cfp->self, - 0, 0, 0, Qundef); + EXEC_EVENT_HOOK(th->ec, RUBY_INTERNAL_EVENT_SWITCH, th->ec->cfp->self, + 0, 0, 0, Qundef); - rb_thread_schedule_limits(limits_us); - } + rb_thread_schedule_limits(limits_us); + } } return ret; } @@ -2287,28 +2737,21 @@ rb_threadptr_ready(rb_thread_t *th) static VALUE rb_threadptr_raise(rb_thread_t *target_th, int argc, VALUE *argv) { - VALUE exc; - if (rb_threadptr_dead(target_th)) { - return Qnil; + return Qnil; } - if (argc == 0) { - exc = rb_exc_new(rb_eRuntimeError, 0, 0); - } - else { - exc = rb_make_exception(argc, argv); - } + VALUE exception = rb_exception_setup(argc, argv); /* making an exception object can switch thread, so we need to check thread deadness again */ if (rb_threadptr_dead(target_th)) { - return Qnil; + return Qnil; } - rb_ec_setup_exception(GET_EC(), exc, Qundef); - rb_threadptr_pending_interrupt_enque(target_th, exc); + rb_threadptr_pending_interrupt_enque(target_th, exception); rb_threadptr_interrupt(target_th); + return Qnil; } @@ -2319,7 +2762,7 @@ rb_threadptr_signal_raise(rb_thread_t *th, int sig) argv[0] = rb_eSignal; argv[1] = INT2FIX(sig); - rb_threadptr_raise(th->vm->main_thread, 2, argv); + rb_threadptr_raise(th->vm->ractor.main_thread, 2, argv); } void @@ -2329,14 +2772,16 @@ rb_threadptr_signal_exit(rb_thread_t *th) argv[0] = rb_eSystemExit; argv[1] = rb_str_new2("exit"); - rb_threadptr_raise(th->vm->main_thread, 2, argv); + + // TODO: check signal raise deliverly + rb_threadptr_raise(th->vm->ractor.main_thread, 2, argv); } int rb_ec_set_raised(rb_execution_context_t *ec) { if (ec->raised_flag & RAISED_EXCEPTION) { - return 1; + return 1; } ec->raised_flag |= RAISED_EXCEPTION; return 0; @@ -2346,53 +2791,128 @@ int rb_ec_reset_raised(rb_execution_context_t *ec) { if (!(ec->raised_flag & RAISED_EXCEPTION)) { - return 0; + return 0; } ec->raised_flag &= ~RAISED_EXCEPTION; return 1; } -int -rb_notify_fd_close(int fd, struct list_head *busy) +/* + * Thread-safe IO closing mechanism. + * + * When an IO is closed while other threads or fibers are blocked on it, we need to: + * 1. 
Track and notify all blocking operations through io->blocking_operations + * 2. Ensure only one thread can close at a time using io->closing_ec + * 3. Synchronize cleanup using wakeup_mutex + * + * The close process works as follows: + * - First check if any thread is already closing (io->closing_ec) + * - Set up wakeup_mutex for synchronization + * - Iterate through all blocking operations in io->blocking_operations + * - For each blocked fiber with a scheduler: + * - Notify via rb_fiber_scheduler_fiber_interrupt + * - For each blocked thread without a scheduler: + * - Enqueue IOError via rb_threadptr_pending_interrupt_enque + * - Wake via rb_threadptr_interrupt + * - Wait on wakeup_mutex until all operations are cleaned up + * - Only then clear closing state and allow actual close to proceed + */ +static VALUE +thread_io_close_notify_all(VALUE _io) { - rb_vm_t *vm = GET_THREAD()->vm; - struct waiting_fd *wfd = 0, *next; + struct rb_io *io = (struct rb_io *)_io; + + size_t count = 0; + rb_vm_t *vm = io->closing_ec->thread_ptr->vm; + VALUE error = vm->special_exceptions[ruby_error_stream_closed]; + + struct rb_io_blocking_operation *blocking_operation; + ccan_list_for_each(rb_io_blocking_operations(io), blocking_operation, list) { + rb_execution_context_t *ec = blocking_operation->ec; + + // If the operation is in progress, we need to interrupt it: + if (ec) { + rb_thread_t *thread = ec->thread_ptr; + + VALUE result = RUBY_Qundef; + if (thread->scheduler != Qnil) { + result = rb_fiber_scheduler_fiber_interrupt(thread->scheduler, rb_fiberptr_self(ec->fiber_ptr), error); + } + + if (result == RUBY_Qundef) { + // If the thread is not the current thread, we need to enqueue an error: + rb_threadptr_pending_interrupt_enque(thread, error); + rb_threadptr_interrupt(thread); + } + } - list_for_each_safe(&vm->waiting_fds, wfd, next, wfd_node) { - if (wfd->fd == fd) { - rb_thread_t *th = wfd->th; - VALUE err; + count += 1; + } - list_del(&wfd->wfd_node); - list_add(busy, &wfd->wfd_node); + return (VALUE)count; +} - err = th->vm->special_exceptions[ruby_error_stream_closed]; - rb_threadptr_pending_interrupt_enque(th, err); - rb_threadptr_interrupt(th); - } +size_t +rb_thread_io_close_interrupt(struct rb_io *io) +{ + // We guard this operation based on `io->closing_ec` -> only one thread will ever enter this function. 
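For orientation, the intended caller-side sequence (which lives on the io.c side and is only assumed here, not shown in this hunk) is roughly the following sketch:

    #include <ruby/io.h>

    /* Hypothetical close path, assuming `io` is the struct rb_io being closed. */
    static void
    example_close_blockers(struct rb_io *io)
    {
        /* Ask every registered blocking operation to stop... */
        size_t interrupted = rb_thread_io_close_interrupt(io);

        /* ...and wait on io->wakeup_mutex until they have all unregistered. */
        if (interrupted > 0) {
            rb_thread_io_close_wait(io);
        }

        /* Only now is it safe to actually close io->fd. */
    }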
+ if (io->closing_ec) { + return 0; + } + + // If there are no blocking operations, we are done: + if (ccan_list_empty(rb_io_blocking_operations(io))) { + return 0; } - return !list_empty(busy); + + // Otherwise, we are now closing the IO: + rb_execution_context_t *ec = GET_EC(); + io->closing_ec = ec; + + // This is used to ensure the correct execution context is woken up after the blocking operation is interrupted: + io->wakeup_mutex = rb_mutex_new(); + rb_mutex_allow_trap(io->wakeup_mutex, 1); + + // We need to use a mutex here as entering the fiber scheduler may cause a context switch: + VALUE result = rb_mutex_synchronize(io->wakeup_mutex, thread_io_close_notify_all, (VALUE)io); + + return (size_t)result; } void -rb_thread_fd_close(int fd) +rb_thread_io_close_wait(struct rb_io* io) { - struct list_head busy; + VALUE wakeup_mutex = io->wakeup_mutex; + + if (!RB_TEST(wakeup_mutex)) { + // There was nobody else using this file when we closed it, so we never bothered to allocate a mutex: + return; + } - list_head_init(&busy); - if (rb_notify_fd_close(fd, &busy)) { - do rb_thread_schedule(); while (!list_empty(&busy)); + rb_mutex_lock(wakeup_mutex); + while (!ccan_list_empty(rb_io_blocking_operations(io))) { + rb_mutex_sleep(wakeup_mutex, Qnil); } + rb_mutex_unlock(wakeup_mutex); + + // We are done closing: + io->wakeup_mutex = Qnil; + io->closing_ec = NULL; +} + +void +rb_thread_fd_close(int fd) +{ + rb_warn("rb_thread_fd_close is deprecated (and is now a no-op)."); } /* * call-seq: - * thr.raise - * thr.raise(string) - * thr.raise(exception [, string [, array]]) + * raise(exception, message = exception.to_s, backtrace = nil, cause: $!) + * raise(message = nil, cause: $!) * * Raises an exception from the given thread. The caller does not have to be - * +thr+. See Kernel#raise for more information. + * +thr+. See Kernel#raise for more information on arguments. * * Thread.abort_on_exception = true * a = Thread.new { sleep(200) } @@ -2417,7 +2937,7 @@ thread_raise_m(int argc, VALUE *argv, VALUE self) /* To perform Thread.current.raise as Kernel.raise */ if (current_th == target_th) { - RUBY_VM_CHECK_INTS(target_th->ec); + RUBY_VM_CHECK_INTS(target_th->ec); } return Qnil; } @@ -2431,42 +2951,46 @@ thread_raise_m(int argc, VALUE *argv, VALUE self) * * Terminates +thr+ and schedules another thread to be run, returning * the terminated Thread. If this is the main thread, or the last - * thread, exits the process. + * thread, exits the process. Note that the caller does not wait for + * the thread to terminate if the receiver is different from the currently + * running thread. The termination is asynchronous, and the thread can still + * run a small amount of ruby code before exiting. 
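A caller that must not continue until the target has actually finished therefore needs to pair the kill with a join; a minimal sketch (the helper name is invented, not part of this change):

    #include <ruby.h>

    static void
    kill_and_wait(VALUE thread)
    {
        rb_thread_kill(thread);                    /* queues the termination request */
        rb_funcall(thread, rb_intern("join"), 0);  /* blocks until it has really exited */
    }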
*/ VALUE rb_thread_kill(VALUE thread) { - rb_thread_t *th = rb_thread_ptr(thread); + rb_thread_t *target_th = rb_thread_ptr(thread); - if (th->to_kill || th->status == THREAD_KILLED) { - return thread; + if (target_th->to_kill || target_th->status == THREAD_KILLED) { + return thread; } - if (th == th->vm->main_thread) { - rb_exit(EXIT_SUCCESS); + if (target_th == target_th->vm->ractor.main_thread) { + rb_exit(EXIT_SUCCESS); } - thread_debug("rb_thread_kill: %p (%"PRI_THREAD_ID")\n", (void *)th, thread_id_str(th)); + RUBY_DEBUG_LOG("target_th:%u", rb_th_serial(target_th)); - if (th == GET_THREAD()) { - /* kill myself immediately */ - rb_threadptr_to_kill(th); + if (target_th == GET_THREAD()) { + /* kill myself immediately */ + rb_threadptr_to_kill(target_th); } else { - threadptr_check_pending_interrupt_queue(th); - rb_threadptr_pending_interrupt_enque(th, eKillSignal); - rb_threadptr_interrupt(th); + threadptr_check_pending_interrupt_queue(target_th); + rb_threadptr_pending_interrupt_enque(target_th, RUBY_FATAL_THREAD_KILLED); + rb_threadptr_interrupt(target_th); } + return thread; } int rb_thread_to_be_killed(VALUE thread) { - rb_thread_t *th = rb_thread_ptr(thread); + rb_thread_t *target_th = rb_thread_ptr(thread); - if (th->to_kill || th->status == THREAD_KILLED) { - return TRUE; + if (target_th->to_kill || target_th->status == THREAD_KILLED) { + return TRUE; } return FALSE; } @@ -2505,7 +3029,7 @@ rb_thread_s_kill(VALUE obj, VALUE th) */ static VALUE -rb_thread_exit(void) +rb_thread_exit(VALUE _) { rb_thread_t *th = GET_THREAD(); return rb_thread_kill(th->self); @@ -2532,7 +3056,7 @@ VALUE rb_thread_wakeup(VALUE thread) { if (!RTEST(rb_thread_wakeup_alive(thread))) { - rb_raise(rb_eThreadError, "killed thread"); + rb_raise(rb_eThreadError, "killed thread"); } return thread; } @@ -2546,8 +3070,8 @@ rb_thread_wakeup_alive(VALUE thread) rb_threadptr_ready(target_th); if (target_th->status == THREAD_STOPPED || - target_th->status == THREAD_STOPPED_FOREVER) { - target_th->status = THREAD_RUNNABLE; + target_th->status == THREAD_STOPPED_FOREVER) { + target_th->status = THREAD_RUNNABLE; } return thread; @@ -2584,6 +3108,17 @@ rb_thread_run(VALUE thread) } +VALUE +rb_thread_stop(void) +{ + if (rb_thread_alone()) { + rb_raise(rb_eThreadError, + "stopping only thread\n\tnote: use sleep to stop forever"); + } + rb_thread_sleep_deadly(); + return Qnil; +} + /* * call-seq: * Thread.stop -> nil @@ -2599,19 +3134,21 @@ rb_thread_run(VALUE thread) * #=> "abc" */ -VALUE -rb_thread_stop(void) +static VALUE +thread_stop(VALUE _) { - if (rb_thread_alone()) { - rb_raise(rb_eThreadError, - "stopping only thread\n\tnote: use sleep to stop forever"); - } - rb_thread_sleep_deadly(); - return Qnil; + return rb_thread_stop(); } /********************************************************************/ +VALUE +rb_thread_list(void) +{ + // TODO + return rb_ractor_thread_list(); +} + /* * call-seq: * Thread.list -> array @@ -2632,24 +3169,10 @@ rb_thread_stop(void) * #<Thread:0x401bdf4c run> */ -VALUE -rb_thread_list(void) +static VALUE +thread_list(VALUE _) { - VALUE ary = rb_ary_new(); - rb_vm_t *vm = GET_THREAD()->vm; - rb_thread_t *th = 0; - - list_for_each(&vm->living_threads, th, vmlt_node) { - switch (th->status) { - case THREAD_RUNNABLE: - case THREAD_STOPPED: - case THREAD_STOPPED_FOREVER: - rb_ary_push(ary, th->self); - default: - break; - } - } - return ary; + return rb_thread_list(); } VALUE @@ -2676,7 +3199,7 @@ thread_s_current(VALUE klass) VALUE rb_thread_main(void) { - return 
GET_THREAD()->vm->main_thread->self; + return GET_RACTOR()->threads.main->self; } /* @@ -2714,9 +3237,9 @@ rb_thread_s_main(VALUE klass) */ static VALUE -rb_thread_s_abort_exc(void) +rb_thread_s_abort_exc(VALUE _) { - return GET_THREAD()->vm->thread_abort_on_exception ? Qtrue : Qfalse; + return RBOOL(GET_THREAD()->vm->thread_abort_on_exception); } @@ -2776,7 +3299,7 @@ rb_thread_s_abort_exc_set(VALUE self, VALUE val) static VALUE rb_thread_abort_exc(VALUE thread) { - return rb_thread_ptr(thread)->abort_on_exception ? Qtrue : Qfalse; + return RBOOL(rb_thread_ptr(thread)->abort_on_exception); } @@ -2840,13 +3363,13 @@ rb_thread_abort_exc_set(VALUE thread, VALUE val) * * There is also an instance level method to set this for a specific thread, * see #report_on_exception=. - * + * */ static VALUE -rb_thread_s_report_exc(void) +rb_thread_s_report_exc(VALUE _) { - return GET_THREAD()->vm->thread_report_on_exception ? Qtrue : Qfalse; + return RBOOL(GET_THREAD()->vm->thread_report_on_exception); } @@ -2890,6 +3413,52 @@ rb_thread_s_report_exc_set(VALUE self, VALUE val) /* * call-seq: + * Thread.ignore_deadlock -> true or false + * + * Returns the status of the global ``ignore deadlock'' condition. + * The default is +false+, so that deadlock conditions are not ignored. + * + * See also ::ignore_deadlock=. + * + */ + +static VALUE +rb_thread_s_ignore_deadlock(VALUE _) +{ + return RBOOL(GET_THREAD()->vm->thread_ignore_deadlock); +} + + +/* + * call-seq: + * Thread.ignore_deadlock = boolean -> true or false + * + * Returns the new state. + * When set to +true+, the VM will not check for deadlock conditions. + * It is only useful to set this if your application can break a + * deadlock condition via some other means, such as a signal. + * + * Thread.ignore_deadlock = true + * queue = Thread::Queue.new + * + * trap(:SIGUSR1){queue.push "Received signal"} + * + * # raises fatal error unless ignoring deadlock + * puts queue.pop + * + * See also ::ignore_deadlock. + */ + +static VALUE +rb_thread_s_ignore_deadlock_set(VALUE self, VALUE val) +{ + GET_THREAD()->vm->thread_ignore_deadlock = RTEST(val); + return val; +} + + +/* + * call-seq: * thr.report_on_exception -> true or false * * Returns the status of the thread-local ``report on exception'' condition for @@ -2907,7 +3476,7 @@ rb_thread_s_report_exc_set(VALUE self, VALUE val) static VALUE rb_thread_report_exc(VALUE thread) { - return rb_thread_ptr(thread)->report_on_exception ? Qtrue : Qfalse; + return RBOOL(rb_thread_ptr(thread)->report_on_exception); } @@ -2936,8 +3505,7 @@ rb_thread_report_exc_set(VALUE thread, VALUE val) * call-seq: * thr.group -> thgrp or nil * - * Returns the ThreadGroup which contains the given thread, or returns +nil+ - * if +thr+ is not a member of any group. + * Returns the ThreadGroup which contains the given thread. * * Thread.main.group #=> #<ThreadGroup:0x4029d914> */ @@ -2945,8 +3513,7 @@ rb_thread_report_exc_set(VALUE thread, VALUE val) VALUE rb_thread_group(VALUE thread) { - VALUE group = rb_thread_ptr(thread)->thgroup; - return group == 0 ? Qnil : group; + return rb_thread_ptr(thread)->thgroup; } static const char * @@ -2954,15 +3521,15 @@ thread_status_name(rb_thread_t *th, int detail) { switch (th->status) { case THREAD_RUNNABLE: - return th->to_kill ? "aborting" : "run"; + return th->to_kill ? 
"aborting" : "run"; case THREAD_STOPPED_FOREVER: - if (detail) return "sleep_forever"; + if (detail) return "sleep_forever"; case THREAD_STOPPED: - return "sleep"; + return "sleep"; case THREAD_KILLED: - return "dead"; + return "dead"; default: - return "unknown"; + return "unknown"; } } @@ -3010,16 +3577,16 @@ rb_thread_status(VALUE thread) rb_thread_t *target_th = rb_thread_ptr(thread); if (rb_threadptr_dead(target_th)) { - if (!NIL_P(target_th->ec->errinfo) && - !FIXNUM_P(target_th->ec->errinfo)) { - return Qnil; - } - else { - return Qfalse; - } + if (!NIL_P(target_th->ec->errinfo) && + !FIXNUM_P(target_th->ec->errinfo)) { + return Qnil; + } + else { + return Qfalse; + } } else { - return rb_str_new2(thread_status_name(target_th, FALSE)); + return rb_str_new2(thread_status_name(target_th, FALSE)); } } @@ -3041,12 +3608,7 @@ rb_thread_status(VALUE thread) static VALUE rb_thread_alive_p(VALUE thread) { - if (rb_threadptr_dead(rb_thread_ptr(thread))) { - return Qfalse; - } - else { - return Qtrue; - } + return RBOOL(!thread_finished(rb_thread_ptr(thread))); } /* @@ -3069,31 +3631,9 @@ rb_thread_stop_p(VALUE thread) rb_thread_t *th = rb_thread_ptr(thread); if (rb_threadptr_dead(th)) { - return Qtrue; - } - else if (th->status == THREAD_STOPPED || - th->status == THREAD_STOPPED_FOREVER) { - return Qtrue; - } - else { - return Qfalse; + return Qtrue; } -} - -/* - * call-seq: - * thr.safe_level -> integer - * - * Returns the safe level. - * - * This method is obsolete because $SAFE is a process global state. - * Simply check $SAFE. - */ - -static VALUE -rb_thread_safe_level(VALUE thread) -{ - return UINT2NUM(rb_safe_level()); + return RBOOL(th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER); } /* @@ -3123,22 +3663,56 @@ rb_thread_setname(VALUE thread, VALUE name) rb_thread_t *target_th = rb_thread_ptr(thread); if (!NIL_P(name)) { - rb_encoding *enc; - StringValueCStr(name); - enc = rb_enc_get(name); - if (!rb_enc_asciicompat(enc)) { - rb_raise(rb_eArgError, "ASCII incompatible encoding (%s)", - rb_enc_name(enc)); - } - name = rb_str_new_frozen(name); + rb_encoding *enc; + StringValueCStr(name); + enc = rb_enc_get(name); + if (!rb_enc_asciicompat(enc)) { + rb_raise(rb_eArgError, "ASCII incompatible encoding (%s)", + rb_enc_name(enc)); + } + name = rb_str_new_frozen(name); } target_th->name = name; - if (threadptr_initialized(target_th)) { - native_set_another_thread_name(target_th->thread_id, name); + if (threadptr_initialized(target_th) && target_th->has_dedicated_nt) { + native_set_another_thread_name(target_th->nt->thread_id, name); } return name; } +#if USE_NATIVE_THREAD_NATIVE_THREAD_ID +/* + * call-seq: + * thr.native_thread_id -> integer + * + * Return the native thread ID which is used by the Ruby thread. + * + * The ID depends on the OS. (not POSIX thread ID returned by pthread_self(3)) + * * On Linux it is TID returned by gettid(2). + * * On macOS it is the system-wide unique integral ID of thread returned + * by pthread_threadid_np(3). + * * On FreeBSD it is the unique integral ID of the thread returned by + * pthread_getthreadid_np(3). + * * On Windows it is the thread identifier returned by GetThreadId(). + * * On other platforms, it raises NotImplementedError. + * + * NOTE: + * If the thread is not associated yet or already deassociated with a native + * thread, it returns _nil_. + * If the Ruby implementation uses M:N thread model, the ID may change + * depending on the timing. 
+ */ + +static VALUE +rb_thread_native_thread_id(VALUE thread) +{ + rb_thread_t *target_th = rb_thread_ptr(thread); + if (rb_threadptr_dead(target_th)) return Qnil; + return native_thread_native_thread_id(target_th); +} +#else +# define rb_thread_native_thread_id rb_f_notimplement +#endif + /* * call-seq: * thr.to_s -> string @@ -3160,35 +3734,33 @@ rb_thread_to_s(VALUE thread) rb_str_catf(str, "@%"PRIsVALUE, target_th->name); } if ((loc = threadptr_invoke_proc_location(target_th)) != Qnil) { - rb_str_catf(str, "@%"PRIsVALUE":%"PRIsVALUE, + rb_str_catf(str, " %"PRIsVALUE":%"PRIsVALUE, RARRAY_AREF(loc, 0), RARRAY_AREF(loc, 1)); - rb_gc_force_recycle(loc); } rb_str_catf(str, " %s>", status); - OBJ_INFECT(str, thread); return str; } /* variables for recursive traversals */ -static ID recursive_key; +#define recursive_key id__recursive_key__ static VALUE threadptr_local_aref(rb_thread_t *th, ID id) { if (id == recursive_key) { - return th->ec->local_storage_recursive_hash; + return th->ec->local_storage_recursive_hash; } else { - st_data_t val; - st_table *local_storage = th->ec->local_storage; + VALUE val; + struct rb_id_table *local_storage = th->ec->local_storage; - if (local_storage != NULL && st_lookup(local_storage, id, &val)) { - return (VALUE)val; - } - else { - return Qnil; - } + if (local_storage != NULL && rb_id_table_lookup(local_storage, id, &val)) { + return val; + } + else { + return Qnil; + } } } @@ -3292,26 +3864,26 @@ rb_thread_fetch(int argc, VALUE *argv, VALUE self) block_given = rb_block_given_p(); if (block_given && argc == 2) { - rb_warn("block supersedes default value argument"); + rb_warn("block supersedes default value argument"); } id = rb_check_id(&key); if (id == recursive_key) { - return target_th->ec->local_storage_recursive_hash; + return target_th->ec->local_storage_recursive_hash; } else if (id && target_th->ec->local_storage && - st_lookup(target_th->ec->local_storage, id, &val)) { - return val; + rb_id_table_lookup(target_th->ec->local_storage, id, &val)) { + return val; } else if (block_given) { - return rb_yield(key); + return rb_yield(key); } else if (argc == 1) { - rb_key_err_raise(rb_sprintf("key not found: %+"PRIsVALUE, key), self, key); + rb_key_err_raise(rb_sprintf("key not found: %+"PRIsVALUE, key), self, key); } else { - return argv[1]; + return argv[1]; } } @@ -3319,24 +3891,24 @@ static VALUE threadptr_local_aset(rb_thread_t *th, ID id, VALUE val) { if (id == recursive_key) { - th->ec->local_storage_recursive_hash = val; - return val; + th->ec->local_storage_recursive_hash = val; + return val; } else { - st_table *local_storage = th->ec->local_storage; + struct rb_id_table *local_storage = th->ec->local_storage; - if (NIL_P(val)) { - if (!local_storage) return Qnil; - st_delete_wrap(local_storage, id); - return Qnil; - } - else { - if (local_storage == NULL) { - th->ec->local_storage = local_storage = st_init_numtable(); - } - st_insert(local_storage, id, val); - return val; - } + if (NIL_P(val)) { + if (!local_storage) return Qnil; + rb_id_table_delete(local_storage, id); + return Qnil; + } + else { + if (local_storage == NULL) { + th->ec->local_storage = local_storage = rb_id_table_create(0); + } + rb_id_table_insert(local_storage, id, val); + return val; + } } } @@ -3401,9 +3973,13 @@ static VALUE rb_thread_variable_get(VALUE thread, VALUE key) { VALUE locals; + VALUE symbol = rb_to_symbol(key); - locals = rb_ivar_get(thread, id_locals); - return rb_hash_aref(locals, rb_to_symbol(key)); + if 
(LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) { + return Qnil; + } + locals = rb_thread_local_storage(thread); + return rb_hash_aref(locals, symbol); } /* @@ -3416,7 +3992,7 @@ rb_thread_variable_get(VALUE thread, VALUE key) */ static VALUE -rb_thread_variable_set(VALUE thread, VALUE id, VALUE val) +rb_thread_variable_set(VALUE thread, VALUE key, VALUE val) { VALUE locals; @@ -3424,8 +4000,8 @@ rb_thread_variable_set(VALUE thread, VALUE id, VALUE val) rb_frozen_error_raise(thread, "can't modify frozen thread locals"); } - locals = rb_ivar_get(thread, id_locals); - return rb_hash_aset(locals, rb_to_symbol(id), val); + locals = rb_thread_local_storage(thread); + return rb_hash_aset(locals, rb_to_symbol(key), val); } /* @@ -3444,31 +4020,28 @@ rb_thread_variable_set(VALUE thread, VALUE id, VALUE val) static VALUE rb_thread_key_p(VALUE self, VALUE key) { + VALUE val; ID id = rb_check_id(&key); - st_table *local_storage = rb_thread_ptr(self)->ec->local_storage; + struct rb_id_table *local_storage = rb_thread_ptr(self)->ec->local_storage; if (!id || local_storage == NULL) { - return Qfalse; - } - else if (st_lookup(local_storage, id, 0)) { - return Qtrue; - } - else { - return Qfalse; + return Qfalse; } + return RBOOL(rb_id_table_lookup(local_storage, id, &val)); } -static int -thread_keys_i(ID key, VALUE value, VALUE ary) +static enum rb_id_table_iterator_result +thread_keys_i(ID key, VALUE value, void *ary) { - rb_ary_push(ary, ID2SYM(key)); - return ST_CONTINUE; + rb_ary_push((VALUE)ary, ID2SYM(key)); + return ID_TABLE_CONTINUE; } int rb_thread_alone(void) { - return vm_living_thread_num(GET_VM()) == 1; + // TODO + return rb_ractor_living_thread_num(GET_RACTOR()) == 1; } /* @@ -3488,11 +4061,11 @@ rb_thread_alone(void) static VALUE rb_thread_keys(VALUE self) { - st_table *local_storage = rb_thread_ptr(self)->ec->local_storage; + struct rb_id_table *local_storage = rb_thread_ptr(self)->ec->local_storage; VALUE ary = rb_ary_new(); if (local_storage) { - st_foreach(local_storage, thread_keys_i, ary); + rb_id_table_foreach(local_storage, thread_keys_i, (void *)ary); } return ary; } @@ -3527,8 +4100,11 @@ rb_thread_variables(VALUE thread) VALUE locals; VALUE ary; - locals = rb_ivar_get(thread, id_locals); ary = rb_ary_new(); + if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) { + return ary; + } + locals = rb_thread_local_storage(thread); rb_hash_foreach(locals, keys_i, ary); return ary; @@ -3554,20 +4130,14 @@ static VALUE rb_thread_variable_p(VALUE thread, VALUE key) { VALUE locals; - ID id = rb_check_id(&key); + VALUE symbol = rb_to_symbol(key); - if (!id) return Qfalse; - - locals = rb_ivar_get(thread, id_locals); - - if (rb_hash_lookup(locals, ID2SYM(id)) != Qnil) { - return Qtrue; - } - else { + if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) { return Qfalse; } + locals = rb_thread_local_storage(thread); - return Qfalse; + return RBOOL(rb_hash_lookup(locals, symbol) != Qnil); } /* @@ -3630,10 +4200,10 @@ rb_thread_priority_set(VALUE thread, VALUE prio) #else priority = NUM2INT(prio); if (priority > RUBY_THREAD_PRIORITY_MAX) { - priority = RUBY_THREAD_PRIORITY_MAX; + priority = RUBY_THREAD_PRIORITY_MAX; } else if (priority < RUBY_THREAD_PRIORITY_MIN) { - priority = RUBY_THREAD_PRIORITY_MIN; + priority = RUBY_THREAD_PRIORITY_MIN; } target_th->priority = (int8_t)priority; #endif @@ -3656,13 +4226,11 @@ rb_thread_priority_set(VALUE thread, VALUE prio) * - OpenBSD 2.0 (src/sys/kern/sys_generic.c:1.4) * select(2) documents how to allocate fd_set dynamically. 
* http://www.openbsd.org/cgi-bin/man.cgi?query=select&manpath=OpenBSD+4.4 - * - HP-UX documents how to allocate fd_set dynamically. - * http://docs.hp.com/en/B2355-60105/select.2.html * - Solaris 8 has select_large_fdset * - Mac OS X 10.7 (Lion) * select(2) returns EINVAL if nfds is greater than FD_SET_SIZE and * _DARWIN_UNLIMITED_SELECT (or _DARWIN_C_SOURCE) isn't defined. - * http://developer.apple.com/library/mac/#releasenotes/Darwin/SymbolVariantsRelNotes/_index.html + * https://developer.apple.com/library/archive/releasenotes/Darwin/SymbolVariantsRelNotes/index.html * * When fd_set is not big enough to hold big file descriptors, * it should be allocated dynamically. @@ -3689,7 +4257,7 @@ rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src) size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask); if (size < sizeof(fd_set)) - size = sizeof(fd_set); + size = sizeof(fd_set); dst->maxfd = src->maxfd; dst->fdset = xmalloc(size); memcpy(dst->fdset, src->fdset, size); @@ -3698,7 +4266,7 @@ rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src) void rb_fd_term(rb_fdset_t *fds) { - if (fds->fdset) xfree(fds->fdset); + xfree(fds->fdset); fds->maxfd = 0; fds->fdset = 0; } @@ -3707,7 +4275,7 @@ void rb_fd_zero(rb_fdset_t *fds) { if (fds->fdset) - MEMZERO(fds->fdset, fd_mask, howmany(fds->maxfd, NFDBITS)); + MEMZERO(fds->fdset, fd_mask, howmany(fds->maxfd, NFDBITS)); } static void @@ -3720,8 +4288,8 @@ rb_fd_resize(int n, rb_fdset_t *fds) if (o < sizeof(fd_set)) o = sizeof(fd_set); if (m > o) { - fds->fdset = xrealloc(fds->fdset, m); - memset((char *)fds->fdset + o, 0, m - o); + fds->fdset = xrealloc(fds->fdset, m); + memset((char *)fds->fdset + o, 0, m - o); } if (n >= fds->maxfd) fds->maxfd = n + 1; } @@ -3764,7 +4332,7 @@ rb_fd_dup(rb_fdset_t *dst, const rb_fdset_t *src) size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask); if (size < sizeof(fd_set)) - size = sizeof(fd_set); + size = sizeof(fd_set); dst->maxfd = src->maxfd; dst->fdset = xrealloc(dst->fdset, size); memcpy(dst->fdset, src->fdset, size); @@ -3838,8 +4406,10 @@ rb_fd_set(int fd, rb_fdset_t *set) } } if (set->fdset->fd_count >= (unsigned)set->capa) { - set->capa = (set->fdset->fd_count / FD_SETSIZE + 1) * FD_SETSIZE; - set->fdset = xrealloc(set->fdset, sizeof(unsigned int) + sizeof(SOCKET) * set->capa); + set->capa = (set->fdset->fd_count / FD_SETSIZE + 1) * FD_SETSIZE; + set->fdset = + rb_xrealloc_mul_add( + set->fdset, set->capa, sizeof(SOCKET), sizeof(unsigned int)); } set->fdset->fd_array[set->fdset->fd_count++] = s; } @@ -3863,9 +4433,10 @@ rb_fd_set(int fd, rb_fdset_t *set) #endif static int -wait_retryable(int *result, int errnum, rb_hrtime_t *rel, rb_hrtime_t end) +wait_retryable(volatile int *result, int errnum, rb_hrtime_t *rel, rb_hrtime_t end) { - if (*result < 0) { + int r = *result; + if (r < 0) { switch (errnum) { case EINTR: #ifdef ERESTART @@ -3879,8 +4450,8 @@ wait_retryable(int *result, int errnum, rb_hrtime_t *rel, rb_hrtime_t end) } return FALSE; } - else if (*result == 0) { - /* check for spurious wakeup */ + else if (r == 0) { + /* check for spurious wakeup */ if (rel) { return !hrtime_update_expire(rel, end); } @@ -3891,7 +4462,6 @@ wait_retryable(int *result, int errnum, rb_hrtime_t *rel, rb_hrtime_t end) struct select_set { int max; - int sigwait_fd; rb_thread_t *th; rb_fdset_t *rset; rb_fdset_t *wset; @@ -3907,11 +4477,6 @@ select_set_free(VALUE p) { struct select_set *set = (struct select_set *)p; - if (set->sigwait_fd >= 0) { - rb_sigwait_fd_put(set->th, set->sigwait_fd); - 
rb_sigwait_fd_migrate(set->th->vm); - } - rb_fd_term(&set->orig_rset); rb_fd_term(&set->orig_wset); rb_fd_term(&set->orig_eset); @@ -3919,30 +4484,16 @@ select_set_free(VALUE p) return Qfalse; } -static const rb_hrtime_t * -sigwait_timeout(rb_thread_t *th, int sigwait_fd, const rb_hrtime_t *orig, - int *drained_p) -{ - static const rb_hrtime_t quantum = TIME_QUANTUM_USEC * 1000; - - if (sigwait_fd >= 0 && (!ubf_threads_empty() || BUSY_WAIT_SIGNALS)) { - *drained_p = check_signals_nogvl(th, sigwait_fd); - if (!orig || *orig > quantum) - return &quantum; - } - - return orig; -} - static VALUE do_select(VALUE p) { struct select_set *set = (struct select_set *)p; - int result = 0; + volatile int result = 0; int lerrno; rb_hrtime_t *to, rel, end = 0; timeout_prepare(&to, &rel, &end, set->timeout); + volatile rb_hrtime_t endtime = end; #define restore_fdset(dst, src) \ ((dst) ? rb_fd_dup(dst, src) : (void)0) #define do_select_update() \ @@ -3952,86 +4503,34 @@ do_select(VALUE p) TRUE) do { - int drained; - lerrno = 0; + lerrno = 0; - BLOCKING_REGION(set->th, { - const rb_hrtime_t *sto; + BLOCKING_REGION(set->th, { struct timeval tv; - sto = sigwait_timeout(set->th, set->sigwait_fd, to, &drained); if (!RUBY_VM_INTERRUPTED(set->th->ec)) { - result = native_fd_select(set->max, set->rset, set->wset, - set->eset, - rb_hrtime2timeval(&tv, sto), set->th); + result = native_fd_select(set->max, + set->rset, set->wset, set->eset, + rb_hrtime2timeval(&tv, to), set->th); if (result < 0) lerrno = errno; } - }, set->sigwait_fd >= 0 ? ubf_sigwait : ubf_select, set->th, TRUE); - - if (set->sigwait_fd >= 0) { - if (result > 0 && rb_fd_isset(set->sigwait_fd, set->rset)) - result--; - (void)check_signals_nogvl(set->th, set->sigwait_fd); - } + }, ubf_select, set->th, TRUE); RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); /* may raise */ - } while (wait_retryable(&result, lerrno, to, end) && do_select_update()); + } while (wait_retryable(&result, lerrno, to, endtime) && do_select_update()); - if (result < 0) { - errno = lerrno; - } + RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); - return (VALUE)result; -} - -static void -rb_thread_wait_fd_rw(int fd, int read) -{ - int result = 0; - int events = read ? RB_WAITFD_IN : RB_WAITFD_OUT; - - thread_debug("rb_thread_wait_fd_rw(%d, %s)\n", fd, read ? "read" : "write"); - - if (fd < 0) { - rb_raise(rb_eIOError, "closed stream"); - } - - result = rb_wait_for_single_fd(fd, events, NULL); if (result < 0) { - rb_sys_fail(0); - } - - thread_debug("rb_thread_wait_fd_rw(%d, %s): done\n", fd, read ? 
"read" : "write"); -} - -void -rb_thread_wait_fd(int fd) -{ - rb_thread_wait_fd_rw(fd, 1); -} - -int -rb_thread_fd_writable(int fd) -{ - rb_thread_wait_fd_rw(fd, 0); - return TRUE; -} - -static rb_fdset_t * -init_set_fd(int fd, rb_fdset_t *fds) -{ - if (fd < 0) { - return 0; + errno = lerrno; } - rb_fd_init(fds); - rb_fd_set(fd, fds); - return fds; + return (VALUE)result; } int rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except, - struct timeval *timeout) + struct timeval *timeout) { struct select_set set; @@ -4052,16 +4551,6 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * return 0; } - set.sigwait_fd = rb_sigwait_fd_get(set.th); - if (set.sigwait_fd >= 0) { - if (set.rset) - rb_fd_set(set.sigwait_fd, set.rset); - else - set.rset = init_set_fd(set.sigwait_fd, &set.orig_rset); - if (set.sigwait_fd >= set.max) { - set.max = set.sigwait_fd + 1; - } - } #define fd_init_copy(f) do { \ if (set.f) { \ rb_fd_resize(set.max - 1, set.f); \ @@ -4092,82 +4581,89 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * # define POLLERR_SET (0) #endif +static int +wait_for_single_fd_blocking_region(rb_thread_t *th, struct pollfd *fds, nfds_t nfds, + rb_hrtime_t *const to, volatile int *lerrno) +{ + struct timespec ts; + volatile int result = 0; + + *lerrno = 0; + BLOCKING_REGION(th, { + if (!RUBY_VM_INTERRUPTED(th->ec)) { + result = ppoll(fds, nfds, rb_hrtime2timespec(&ts, to), 0); + if (result < 0) *lerrno = errno; + } + }, ubf_select, th, TRUE); + return result; +} + /* * returns a mask of events */ -int -rb_wait_for_single_fd(int fd, int events, struct timeval *timeout) -{ - struct pollfd fds[2]; - int result = 0, lerrno; - rb_hrtime_t *to, rel, end = 0; - int drained; +static int +thread_io_wait(rb_thread_t *th, struct rb_io *io, int fd, int events, struct timeval *timeout) +{ + struct pollfd fds[1] = {{ + .fd = fd, + .events = (short)events, + .revents = 0, + }}; + volatile int result = 0; nfds_t nfds; - rb_unblock_function_t *ubf; - struct waiting_fd wfd; - int state; - - wfd.th = GET_THREAD(); - wfd.fd = fd; - list_add(&wfd.th->vm->waiting_fds, &wfd.wfd_node); - EC_PUSH_TAG(wfd.th->ec); - if ((state = EC_EXEC_TAG()) == TAG_NONE) { - RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec); - timeout_prepare(&to, &rel, &end, timeout); - fds[0].fd = fd; - fds[0].events = (short)events; - fds[0].revents = 0; - do { - fds[1].fd = rb_sigwait_fd_get(wfd.th); - - if (fds[1].fd >= 0) { - fds[1].events = POLLIN; - fds[1].revents = 0; - nfds = 2; - ubf = ubf_sigwait; - } - else { - nfds = 1; - ubf = ubf_select; - } + struct rb_io_blocking_operation blocking_operation; + enum ruby_tag_type state; + volatile int lerrno; - lerrno = 0; - BLOCKING_REGION(wfd.th, { - const rb_hrtime_t *sto; - struct timespec ts; + RUBY_ASSERT(th); + rb_execution_context_t *ec = th->ec; - sto = sigwait_timeout(wfd.th, fds[1].fd, to, &drained); - if (!RUBY_VM_INTERRUPTED(wfd.th->ec)) { - result = ppoll(fds, nfds, rb_hrtime2timespec(&ts, sto), 0); - if (result < 0) lerrno = errno; - } - }, ubf, wfd.th, TRUE); + if (io) { + blocking_operation.ec = ec; + rb_io_blocking_operation_enter(io, &blocking_operation); + } - if (fds[1].fd >= 0) { - if (result > 0 && fds[1].revents) { - result--; - } - (void)check_signals_nogvl(wfd.th, fds[1].fd); - rb_sigwait_fd_put(wfd.th, fds[1].fd); - rb_sigwait_fd_migrate(wfd.th->vm); - } - RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec); - } while (wait_retryable(&result, lerrno, to, end)); + if (timeout == NULL && 
thread_io_wait_events(th, fd, events, NULL)) { + // fd is readable + state = 0; + fds[0].revents = events; + errno = 0; } - EC_POP_TAG(); - list_del(&wfd.wfd_node); + else { + EC_PUSH_TAG(ec); + if ((state = EC_EXEC_TAG()) == TAG_NONE) { + rb_hrtime_t *to, rel, end = 0; + RUBY_VM_CHECK_INTS_BLOCKING(ec); + timeout_prepare(&to, &rel, &end, timeout); + do { + nfds = numberof(fds); + result = wait_for_single_fd_blocking_region(th, fds, nfds, to, &lerrno); + + RUBY_VM_CHECK_INTS_BLOCKING(ec); + } while (wait_retryable(&result, lerrno, to, end)); + + RUBY_VM_CHECK_INTS_BLOCKING(ec); + } + + EC_POP_TAG(); + } + + if (io) { + rb_io_blocking_operation_exit(io, &blocking_operation); + } + if (state) { - EC_JUMP_TAG(wfd.th->ec, state); + EC_JUMP_TAG(ec, state); } if (result < 0) { - errno = lerrno; - return -1; + errno = lerrno; + return -1; } if (fds[0].revents & POLLNVAL) { - errno = EBADF; - return -1; + errno = EBADF; + return -1; } /* @@ -4176,28 +4672,30 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout) */ result = 0; if (fds[0].revents & POLLIN_SET) - result |= RB_WAITFD_IN; + result |= RB_WAITFD_IN; if (fds[0].revents & POLLOUT_SET) - result |= RB_WAITFD_OUT; + result |= RB_WAITFD_OUT; if (fds[0].revents & POLLEX_SET) - result |= RB_WAITFD_PRI; + result |= RB_WAITFD_PRI; /* all requested events are ready if there is an error */ if (fds[0].revents & POLLERR_SET) - result |= events; + result |= events; return result; } #else /* ! USE_POLL - implement rb_io_poll_fd() using select() */ struct select_args { + struct rb_io *io; + struct rb_io_blocking_operation *blocking_operation; + union { - int fd; - int error; + int fd; + int error; } as; rb_fdset_t *read; rb_fdset_t *write; rb_fdset_t *except; - struct waiting_fd wfd; struct timeval *tv; }; @@ -4210,15 +4708,15 @@ select_single(VALUE ptr) r = rb_thread_fd_select(args->as.fd + 1, args->read, args->write, args->except, args->tv); if (r == -1) - args->as.error = errno; + args->as.error = errno; if (r > 0) { - r = 0; - if (args->read && rb_fd_isset(args->as.fd, args->read)) - r |= RB_WAITFD_IN; - if (args->write && rb_fd_isset(args->as.fd, args->write)) - r |= RB_WAITFD_OUT; - if (args->except && rb_fd_isset(args->as.fd, args->except)) - r |= RB_WAITFD_PRI; + r = 0; + if (args->read && rb_fd_isset(args->as.fd, args->read)) + r |= RB_WAITFD_IN; + if (args->write && rb_fd_isset(args->as.fd, args->write)) + r |= RB_WAITFD_OUT; + if (args->except && rb_fd_isset(args->as.fd, args->except)) + r |= RB_WAITFD_PRI; } return (VALUE)r; } @@ -4228,7 +4726,10 @@ select_single_cleanup(VALUE ptr) { struct select_args *args = (struct select_args *)ptr; - list_del(&args->wfd.wfd_node); + if (args->blocking_operation) { + rb_io_blocking_operation_exit(args->io, args->blocking_operation); + } + if (args->read) rb_fd_term(args->read); if (args->write) rb_fd_term(args->write); if (args->except) rb_fd_term(args->except); @@ -4236,31 +4737,64 @@ select_single_cleanup(VALUE ptr) return (VALUE)-1; } -int -rb_wait_for_single_fd(int fd, int events, struct timeval *tv) +static rb_fdset_t * +init_set_fd(int fd, rb_fdset_t *fds) +{ + if (fd < 0) { + return 0; + } + rb_fd_init(fds); + rb_fd_set(fd, fds); + + return fds; +} + +static int +thread_io_wait(rb_thread_t *th, struct rb_io *io, int fd, int events, struct timeval *timeout) { rb_fdset_t rfds, wfds, efds; struct select_args args; - int r; VALUE ptr = (VALUE)&args; + struct rb_io_blocking_operation blocking_operation; + if (io) { + args.io = io; + blocking_operation.ec = th->ec; + 
rb_io_blocking_operation_enter(io, &blocking_operation); + args.blocking_operation = &blocking_operation; + } + else { + args.io = NULL; + blocking_operation.ec = NULL; + args.blocking_operation = NULL; + } + args.as.fd = fd; args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL; args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL; args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL; - args.tv = tv; - args.wfd.fd = fd; - args.wfd.th = GET_THREAD(); + args.tv = timeout; - list_add(&args.wfd.th->vm->waiting_fds, &args.wfd.wfd_node); - r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr); - if (r == -1) - errno = args.as.error; + int result = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr); + if (result == -1) + errno = args.as.error; - return r; + return result; } #endif /* ! USE_POLL */ +int +rb_thread_wait_for_single_fd(rb_thread_t *th, int fd, int events, struct timeval *timeout) +{ + return thread_io_wait(th, NULL, fd, events, timeout); +} + +int +rb_thread_io_wait(rb_thread_t *th, struct rb_io *io, int events, struct timeval * timeout) +{ + return thread_io_wait(th, io, io->fd, events, timeout); +} + /* * for GC */ @@ -4270,7 +4804,12 @@ void rb_gc_set_stack_end(VALUE **stack_end_p) { VALUE stack_end; +COMPILER_WARNING_PUSH +#if RBIMPL_COMPILER_IS(GCC) +COMPILER_WARNING_IGNORED(-Wdangling-pointer); +#endif *stack_end_p = &stack_end; +COMPILER_WARNING_POP } #endif @@ -4283,29 +4822,18 @@ rb_threadptr_check_signal(rb_thread_t *mth) { /* mth must be main_thread */ if (rb_signal_buff_size() > 0) { - /* wakeup main thread */ - threadptr_trap_interrupt(mth); + /* wakeup main thread */ + threadptr_trap_interrupt(mth); } } static void -timer_thread_function(void) -{ - volatile rb_execution_context_t *ec; - - /* for time slice */ - ec = ACCESS_ONCE(rb_execution_context_t *, - ruby_current_execution_context_ptr); - if (ec) RUBY_VM_SET_TIMER_INTERRUPT(ec); -} - -static void async_bug_fd(const char *mesg, int errno_arg, int fd) { char buff[64]; size_t n = strlcpy(buff, mesg, sizeof(buff)); if (n < sizeof(buff)-3) { - ruby_snprintf(buff+n, sizeof(buff)-n, "(%d)", fd); + ruby_snprintf(buff+n, sizeof(buff)-n, "(%d)", fd); } rb_async_bug_errno(buff, errno_arg); } @@ -4323,65 +4851,44 @@ consume_communication_pipe(int fd) ssize_t result; int ret = FALSE; /* for rb_sigwait_sleep */ - /* - * disarm UBF_TIMER before we read, because it can become - * re-armed at any time via sighandler and the pipe will refill - * We can disarm it because this thread is now processing signals - * and we do not want unnecessary SIGVTALRM - */ - ubf_timer_disarm(); - while (1) { - result = read(fd, buff, sizeof(buff)); - if (result > 0) { - ret = TRUE; - if (USE_EVENTFD || result < (ssize_t)sizeof(buff)) { - return ret; - } - } - else if (result == 0) { - return ret; - } - else if (result < 0) { - int e = errno; - switch (e) { - case EINTR: - continue; /* retry */ - case EAGAIN: + result = read(fd, buff, sizeof(buff)); +#if USE_EVENTFD + RUBY_DEBUG_LOG("resultf:%d buff:%lu", (int)result, (unsigned long)buff[0]); +#else + RUBY_DEBUG_LOG("result:%d", (int)result); +#endif + if (result > 0) { + ret = TRUE; + if (USE_EVENTFD || result < (ssize_t)sizeof(buff)) { + return ret; + } + } + else if (result == 0) { + return ret; + } + else if (result < 0) { + int e = errno; + switch (e) { + case EINTR: + continue; /* retry */ + case EAGAIN: #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN - case EWOULDBLOCK: + case EWOULDBLOCK: #endif - return ret; - 
default: - async_bug_fd("consume_communication_pipe: read", e, fd); - } - } - } -} - -static int -check_signals_nogvl(rb_thread_t *th, int sigwait_fd) -{ - rb_vm_t *vm = GET_VM(); /* th may be 0 */ - int ret = consume_communication_pipe(sigwait_fd); - ubf_wakeup_all_threads(); - ruby_sigchld_handler(vm); - if (rb_signal_buff_size()) { - if (th == vm->main_thread) - /* no need to lock + wakeup if already in main thread */ - RUBY_VM_SET_TRAP_INTERRUPT(th->ec); - else - threadptr_trap_interrupt(vm->main_thread); - ret = TRUE; /* for SIGCHLD_LOSSY && rb_sigwait_sleep */ + return ret; + default: + async_bug_fd("consume_communication_pipe: read", e, fd); + } + } } - return ret; } void rb_thread_stop_timer_thread(void) { if (TIMER_THREAD_CREATED_P() && native_stop_timer_thread()) { - native_reset_timer_thread(); + native_reset_timer_thread(); } } @@ -4419,10 +4926,10 @@ clear_coverage_i(st_data_t key, st_data_t val, st_data_t dummy) } } if (branches) { - VALUE counters = RARRAY_AREF(branches, 1); - for (i = 0; i < RARRAY_LEN(counters); i++) { - RARRAY_ASET(counters, i, INT2FIX(0)); - } + VALUE counters = RARRAY_AREF(branches, 1); + for (i = 0; i < RARRAY_LEN(counters); i++) { + RARRAY_ASET(counters, i, INT2FIX(0)); + } } return ST_CONTINUE; @@ -4438,42 +4945,63 @@ rb_clear_coverages(void) } #if defined(HAVE_WORKING_FORK) + static void rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const rb_thread_t *)) { rb_thread_t *i = 0; rb_vm_t *vm = th->vm; - vm->main_thread = th; + rb_ractor_t *r = th->ractor; + vm->ractor.main_ractor = r; + vm->ractor.main_thread = th; + r->threads.main = th; + r->status_ = ractor_created; - gvl_atfork(th->vm); + thread_sched_atfork(TH_SCHED(th)); ubf_list_atfork(); + rb_signal_atfork(); - list_for_each(&vm->living_threads, i, vmlt_node) { - atfork(i, th); + // OK. Only this thread accesses: + ccan_list_for_each(&vm->ractor.set, r, vmlr_node) { + if (r != vm->ractor.main_ractor) { + rb_ractor_terminate_atfork(vm, r); + } + ccan_list_for_each(&r->threads.set, i, lt_node) { + atfork(i, th); + } } rb_vm_living_threads_init(vm); - rb_vm_living_threads_insert(vm, th); - /* may be held by MJIT threads in parent */ - rb_native_mutex_initialize(&vm->waitpid_lock); - rb_native_mutex_initialize(&vm->workqueue_lock); + rb_ractor_atfork(vm, th); + rb_vm_postponed_job_atfork(); /* may be held by any thread in parent */ rb_native_mutex_initialize(&th->interrupt_lock); + ccan_list_head_init(&th->interrupt_exec_tasks); vm->fork_gen++; - - vm->sleeper = 0; + rb_ractor_sleeper_threads_clear(th->ractor); rb_clear_coverages(); + + // restart timer thread (timer threads access to `vm->waitpid_lock` and so on. 
+ rb_thread_reset_timer_thread(); + rb_thread_start_timer_thread(); + + VM_ASSERT(vm->ractor.blocking_cnt == 0); + VM_ASSERT(vm->ractor.cnt == 1); } static void terminate_atfork_i(rb_thread_t *th, const rb_thread_t *current_th) { if (th != current_th) { - rb_mutex_abandon_keeping_mutexes(th); - rb_mutex_abandon_locking_mutex(th); - thread_cleanup_func(th, TRUE); + // Clear the scheduler as it is no longer operational: + th->scheduler = Qnil; + + rb_native_mutex_initialize(&th->interrupt_lock); + rb_mutex_abandon_keeping_mutexes(th); + rb_mutex_abandon_locking_mutex(th); + thread_cleanup_func(th, TRUE); } } @@ -4482,22 +5010,21 @@ void rb_thread_atfork(void) { rb_thread_t *th = GET_THREAD(); + rb_threadptr_pending_interrupt_clear(th); rb_thread_atfork_internal(th, terminate_atfork_i); th->join_list = NULL; + th->scheduler = Qnil; rb_fiber_atfork(th); /* We don't want reproduce CVE-2003-0900. */ rb_reset_random_seed(); - - /* For child, starting MJIT worker thread in this place which is safer than immediately after `after_fork_ruby`. */ - mjit_child_after_fork(); } static void terminate_atfork_before_exec_i(rb_thread_t *th, const rb_thread_t *current_th) { if (th != current_th) { - thread_cleanup_func_before_exec(th); + thread_cleanup_func_before_exec(th); } } @@ -4521,19 +5048,16 @@ rb_thread_atfork_before_exec(void) struct thgroup { int enclosed; - VALUE group; }; -static size_t -thgroup_memsize(const void *ptr) -{ - return sizeof(struct thgroup); -} - static const rb_data_type_t thgroup_data_type = { "thgroup", - {NULL, RUBY_TYPED_DEFAULT_FREE, thgroup_memsize,}, - 0, 0, RUBY_TYPED_FREE_IMMEDIATELY + { + 0, + RUBY_TYPED_DEFAULT_FREE, + NULL, // No external memory to report + }, + 0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_EMBEDDABLE }; /* @@ -4563,7 +5087,6 @@ thgroup_s_alloc(VALUE klass) group = TypedData_Make_Struct(klass, struct thgroup, &thgroup_data_type, data); data->enclosed = 0; - data->group = group; return group; } @@ -4581,13 +5104,13 @@ static VALUE thgroup_list(VALUE group) { VALUE ary = rb_ary_new(); - rb_vm_t *vm = GET_THREAD()->vm; rb_thread_t *th = 0; + rb_ractor_t *r = GET_RACTOR(); - list_for_each(&vm->living_threads, th, vmlt_node) { - if (th->thgroup == group) { - rb_ary_push(ary, th->self); - } + ccan_list_for_each(&r->threads.set, th, lt_node) { + if (th->thgroup == group) { + rb_ary_push(ary, th->self); + } } return ary; } @@ -4634,9 +5157,7 @@ thgroup_enclosed_p(VALUE group) struct thgroup *data; TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data); - if (data->enclosed) - return Qtrue; - return Qfalse; + return RBOOL(data->enclosed); } @@ -4673,24 +5194,20 @@ thgroup_add(VALUE group, VALUE thread) struct thgroup *data; if (OBJ_FROZEN(group)) { - rb_raise(rb_eThreadError, "can't move to the frozen thread group"); + rb_raise(rb_eThreadError, "can't move to the frozen thread group"); } TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data); if (data->enclosed) { - rb_raise(rb_eThreadError, "can't move to the enclosed thread group"); - } - - if (!target_th->thgroup) { - return Qnil; + rb_raise(rb_eThreadError, "can't move to the enclosed thread group"); } if (OBJ_FROZEN(target_th->thgroup)) { - rb_raise(rb_eThreadError, "can't move from the frozen thread group"); + rb_raise(rb_eThreadError, "can't move from the frozen thread group"); } TypedData_Get_Struct(target_th->thgroup, struct thgroup, &thgroup_data_type, data); if (data->enclosed) { - rb_raise(rb_eThreadError, - "can't move from the enclosed thread 
group"); + rb_raise(rb_eThreadError, + "can't move from the enclosed thread group"); } target_th->thgroup = group; @@ -4719,17 +5236,23 @@ thread_shield_alloc(VALUE klass) } #define GetThreadShieldPtr(obj) ((VALUE)rb_check_typeddata((obj), &thread_shield_data_type)) -#define THREAD_SHIELD_WAITING_MASK (FL_USER0|FL_USER1|FL_USER2|FL_USER3|FL_USER4|FL_USER5|FL_USER6|FL_USER7|FL_USER8|FL_USER9|FL_USER10|FL_USER11|FL_USER12|FL_USER13|FL_USER14|FL_USER15|FL_USER16|FL_USER17|FL_USER18|FL_USER19) +#define THREAD_SHIELD_WAITING_MASK (((FL_USER19-1)&~(FL_USER0-1))|FL_USER19) #define THREAD_SHIELD_WAITING_SHIFT (FL_USHIFT) -#define rb_thread_shield_waiting(b) ((RBASIC(b)->flags&THREAD_SHIELD_WAITING_MASK)>>THREAD_SHIELD_WAITING_SHIFT) +#define THREAD_SHIELD_WAITING_MAX (THREAD_SHIELD_WAITING_MASK>>THREAD_SHIELD_WAITING_SHIFT) +STATIC_ASSERT(THREAD_SHIELD_WAITING_MAX, THREAD_SHIELD_WAITING_MAX <= UINT_MAX); +static inline unsigned int +rb_thread_shield_waiting(VALUE b) +{ + return ((RBASIC(b)->flags&THREAD_SHIELD_WAITING_MASK)>>THREAD_SHIELD_WAITING_SHIFT); +} static inline void rb_thread_shield_waiting_inc(VALUE b) { - unsigned long w = rb_thread_shield_waiting(b); + unsigned int w = rb_thread_shield_waiting(b); w++; - if (w > (THREAD_SHIELD_WAITING_MASK>>THREAD_SHIELD_WAITING_SHIFT)) - rb_raise(rb_eRuntimeError, "waiting count overflow"); + if (w > THREAD_SHIELD_WAITING_MAX) + rb_raise(rb_eRuntimeError, "waiting count overflow"); RBASIC(b)->flags &= ~THREAD_SHIELD_WAITING_MASK; RBASIC(b)->flags |= ((VALUE)w << THREAD_SHIELD_WAITING_SHIFT); } @@ -4737,7 +5260,7 @@ rb_thread_shield_waiting_inc(VALUE b) static inline void rb_thread_shield_waiting_dec(VALUE b) { - unsigned long w = rb_thread_shield_waiting(b); + unsigned int w = rb_thread_shield_waiting(b); if (!w) rb_raise(rb_eRuntimeError, "waiting count underflow"); w--; RBASIC(b)->flags &= ~THREAD_SHIELD_WAITING_MASK; @@ -4752,6 +5275,17 @@ rb_thread_shield_new(void) return thread_shield; } +bool +rb_thread_shield_owned(VALUE self) +{ + VALUE mutex = GetThreadShieldPtr(self); + if (!mutex) return false; + + rb_mutex_t *m = mutex_ptr(mutex); + + return m->ec_serial == rb_ec_serial(GET_EC()); +} + /* * Wait a thread shield. * @@ -4768,7 +5302,7 @@ rb_thread_shield_wait(VALUE self) if (!mutex) return Qfalse; m = mutex_ptr(mutex); - if (m->th == GET_THREAD()) return Qnil; + if (m->ec_serial == rb_ec_serial(GET_EC())) return Qnil; rb_thread_shield_waiting_inc(self); rb_mutex_lock(mutex); rb_thread_shield_waiting_dec(self); @@ -4782,7 +5316,7 @@ thread_shield_get_mutex(VALUE self) { VALUE mutex = GetThreadShieldPtr(self); if (!mutex) - rb_raise(rb_eThreadError, "destroyed thread shield - %p", (void *)self); + rb_raise(rb_eThreadError, "destroyed thread shield - %p", (void *)self); return mutex; } @@ -4794,7 +5328,7 @@ rb_thread_shield_release(VALUE self) { VALUE mutex = thread_shield_get_mutex(self); rb_mutex_unlock(mutex); - return rb_thread_shield_waiting(self) > 0 ? Qtrue : Qfalse; + return RBOOL(rb_thread_shield_waiting(self) > 0); } /* @@ -4806,7 +5340,7 @@ rb_thread_shield_destroy(VALUE self) VALUE mutex = thread_shield_get_mutex(self); DATA_PTR(self) = 0; rb_mutex_unlock(mutex); - return rb_thread_shield_waiting(self) > 0 ? 
Qtrue : Qfalse; + return RBOOL(rb_thread_shield_waiting(self) > 0); } static VALUE @@ -4836,57 +5370,57 @@ recursive_list_access(VALUE sym) VALUE hash = threadptr_recursive_hash(th); VALUE list; if (NIL_P(hash) || !RB_TYPE_P(hash, T_HASH)) { - hash = rb_ident_hash_new(); - threadptr_recursive_hash_set(th, hash); - list = Qnil; + hash = rb_ident_hash_new(); + threadptr_recursive_hash_set(th, hash); + list = Qnil; } else { - list = rb_hash_aref(hash, sym); + list = rb_hash_aref(hash, sym); } if (NIL_P(list) || !RB_TYPE_P(list, T_HASH)) { - list = rb_hash_new(); - rb_hash_aset(hash, sym, list); + list = rb_ident_hash_new(); + rb_hash_aset(hash, sym, list); } return list; } /* - * Returns Qtrue iff obj_id (or the pair <obj, paired_obj>) is already + * Returns true if and only if obj (or the pair <obj, paired_obj>) is already * in the recursion list. * Assumes the recursion list is valid. */ -static VALUE -recursive_check(VALUE list, VALUE obj_id, VALUE paired_obj_id) +static bool +recursive_check(VALUE list, VALUE obj, VALUE paired_obj_id) { #if SIZEOF_LONG == SIZEOF_VOIDP #define OBJ_ID_EQL(obj_id, other) ((obj_id) == (other)) #elif SIZEOF_LONG_LONG == SIZEOF_VOIDP - #define OBJ_ID_EQL(obj_id, other) (RB_TYPE_P((obj_id), T_BIGNUM) ? \ + #define OBJ_ID_EQL(obj_id, other) (RB_BIGNUM_TYPE_P((obj_id)) ? \ rb_big_eql((obj_id), (other)) : ((obj_id) == (other))) #endif - VALUE pair_list = rb_hash_lookup2(list, obj_id, Qundef); - if (pair_list == Qundef) - return Qfalse; + VALUE pair_list = rb_hash_lookup2(list, obj, Qundef); + if (UNDEF_P(pair_list)) + return false; if (paired_obj_id) { - if (!RB_TYPE_P(pair_list, T_HASH)) { - if (!OBJ_ID_EQL(paired_obj_id, pair_list)) - return Qfalse; - } - else { - if (NIL_P(rb_hash_lookup(pair_list, paired_obj_id))) - return Qfalse; - } + if (!RB_TYPE_P(pair_list, T_HASH)) { + if (!OBJ_ID_EQL(paired_obj_id, pair_list)) + return false; + } + else { + if (NIL_P(rb_hash_lookup(pair_list, paired_obj_id))) + return false; + } } - return Qtrue; + return true; } /* - * Pushes obj_id (or the pair <obj_id, paired_obj_id>) in the recursion list. - * For a single obj_id, it sets list[obj_id] to Qtrue. - * For a pair, it sets list[obj_id] to paired_obj_id if possible, - * otherwise list[obj_id] becomes a hash like: + * Pushes obj (or the pair <obj, paired_obj>) in the recursion list. + * For a single obj, it sets list[obj] to Qtrue. + * For a pair, it sets list[obj] to paired_obj_id if possible, + * otherwise list[obj] becomes a hash like: * {paired_obj_id_1 => true, paired_obj_id_2 => true, ... } * Assumes the recursion list is valid. 
*/ @@ -4897,27 +5431,27 @@ recursive_push(VALUE list, VALUE obj, VALUE paired_obj) VALUE pair_list; if (!paired_obj) { - rb_hash_aset(list, obj, Qtrue); + rb_hash_aset(list, obj, Qtrue); } - else if ((pair_list = rb_hash_lookup2(list, obj, Qundef)) == Qundef) { - rb_hash_aset(list, obj, paired_obj); + else if (UNDEF_P(pair_list = rb_hash_lookup2(list, obj, Qundef))) { + rb_hash_aset(list, obj, paired_obj); } else { - if (!RB_TYPE_P(pair_list, T_HASH)){ - VALUE other_paired_obj = pair_list; - pair_list = rb_hash_new(); - rb_hash_aset(pair_list, other_paired_obj, Qtrue); - rb_hash_aset(list, obj, pair_list); - } - rb_hash_aset(pair_list, paired_obj, Qtrue); + if (!RB_TYPE_P(pair_list, T_HASH)){ + VALUE other_paired_obj = pair_list; + pair_list = rb_hash_new(); + rb_hash_aset(pair_list, other_paired_obj, Qtrue); + rb_hash_aset(list, obj, pair_list); + } + rb_hash_aset(pair_list, paired_obj, Qtrue); } } /* - * Pops obj_id (or the pair <obj_id, paired_obj_id>) from the recursion list. - * For a pair, if list[obj_id] is a hash, then paired_obj_id is + * Pops obj (or the pair <obj, paired_obj>) from the recursion list. + * For a pair, if list[obj] is a hash, then paired_obj_id is * removed from the hash and no attempt is made to simplify - * list[obj_id] from {only_one_paired_id => true} to only_one_paired_id + * list[obj] from {only_one_paired_id => true} to only_one_paired_id * Assumes the recursion list is valid. */ @@ -4925,16 +5459,16 @@ static int recursive_pop(VALUE list, VALUE obj, VALUE paired_obj) { if (paired_obj) { - VALUE pair_list = rb_hash_lookup2(list, obj, Qundef); - if (pair_list == Qundef) { - return 0; - } - if (RB_TYPE_P(pair_list, T_HASH)) { - rb_hash_delete_entry(pair_list, paired_obj); - if (!RHASH_EMPTY_P(pair_list)) { - return 1; /* keep hash until is empty */ - } - } + VALUE pair_list = rb_hash_lookup2(list, obj, Qundef); + if (UNDEF_P(pair_list)) { + return 0; + } + if (RB_TYPE_P(pair_list, T_HASH)) { + rb_hash_delete_entry(pair_list, paired_obj); + if (!RHASH_EMPTY_P(pair_list)) { + return 1; /* keep hash until is empty */ + } + } } rb_hash_delete_entry(list, obj); return 1; @@ -4944,7 +5478,6 @@ struct exec_recursive_params { VALUE (*func) (VALUE, VALUE, int); VALUE list; VALUE obj; - VALUE objid; VALUE pairid; VALUE arg; }; @@ -4960,7 +5493,7 @@ exec_recursive_i(RB_BLOCK_CALL_FUNC_ARGLIST(tag, data)) * Calls func(obj, arg, recursive), where recursive is non-zero if the * current method is called recursively on obj, or on the pair <obj, pairid> * If outer is 0, then the innermost func will be called with recursive set - * to Qtrue, otherwise the outermost func will be called. In the latter case, + * to true, otherwise the outermost func will be called. In the latter case, * all inner func are short-circuited by throw. * Implementation details: the value thrown is the recursive list which is * proper to the current method and unlikely to be caught anywhere else. @@ -4968,62 +5501,63 @@ exec_recursive_i(RB_BLOCK_CALL_FUNC_ARGLIST(tag, data)) */ static VALUE -exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE pairid, VALUE arg, int outer) +exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE pairid, VALUE arg, int outer, ID mid) { VALUE result = Qundef; - const ID mid = rb_frame_last_func(); const VALUE sym = mid ? 
ID2SYM(mid) : ID2SYM(idNULL); struct exec_recursive_params p; int outermost; p.list = recursive_list_access(sym); - p.objid = rb_memory_id(obj); p.obj = obj; p.pairid = pairid; p.arg = arg; outermost = outer && !recursive_check(p.list, ID2SYM(recursive_key), 0); - if (recursive_check(p.list, p.objid, pairid)) { - if (outer && !outermost) { - rb_throw_obj(p.list, p.list); - } - return (*func)(obj, arg, TRUE); + if (recursive_check(p.list, p.obj, pairid)) { + if (outer && !outermost) { + rb_throw_obj(p.list, p.list); + } + return (*func)(obj, arg, TRUE); } else { - enum ruby_tag_type state; - - p.func = func; - - if (outermost) { - recursive_push(p.list, ID2SYM(recursive_key), 0); - recursive_push(p.list, p.objid, p.pairid); - result = rb_catch_protect(p.list, exec_recursive_i, (VALUE)&p, &state); - if (!recursive_pop(p.list, p.objid, p.pairid)) goto invalid; - if (!recursive_pop(p.list, ID2SYM(recursive_key), 0)) goto invalid; - if (state != TAG_NONE) EC_JUMP_TAG(GET_EC(), state); - if (result == p.list) { - result = (*func)(obj, arg, TRUE); - } - } - else { - volatile VALUE ret = Qundef; - recursive_push(p.list, p.objid, p.pairid); - EC_PUSH_TAG(GET_EC()); - if ((state = EC_EXEC_TAG()) == TAG_NONE) { - ret = (*func)(obj, arg, FALSE); - } - EC_POP_TAG(); - if (!recursive_pop(p.list, p.objid, p.pairid)) { - invalid: - rb_raise(rb_eTypeError, "invalid inspect_tbl pair_list " - "for %+"PRIsVALUE" in %+"PRIsVALUE, - sym, rb_thread_current()); - } - if (state != TAG_NONE) EC_JUMP_TAG(GET_EC(), state); - result = ret; - } + enum ruby_tag_type state; + + p.func = func; + + if (outermost) { + recursive_push(p.list, ID2SYM(recursive_key), 0); + recursive_push(p.list, p.obj, p.pairid); + result = rb_catch_protect(p.list, exec_recursive_i, (VALUE)&p, &state); + if (!recursive_pop(p.list, p.obj, p.pairid)) goto invalid; + if (!recursive_pop(p.list, ID2SYM(recursive_key), 0)) goto invalid; + if (state != TAG_NONE) EC_JUMP_TAG(GET_EC(), state); + if (result == p.list) { + result = (*func)(obj, arg, TRUE); + } + } + else { + volatile VALUE ret = Qundef; + recursive_push(p.list, p.obj, p.pairid); + EC_PUSH_TAG(GET_EC()); + if ((state = EC_EXEC_TAG()) == TAG_NONE) { + ret = (*func)(obj, arg, FALSE); + } + EC_POP_TAG(); + if (!recursive_pop(p.list, p.obj, p.pairid)) { + goto invalid; + } + if (state != TAG_NONE) EC_JUMP_TAG(GET_EC(), state); + result = ret; + } } *(volatile struct exec_recursive_params *)&p; return result; + + invalid: + rb_raise(rb_eTypeError, "invalid inspect_tbl pair_list " + "for %+"PRIsVALUE" in %+"PRIsVALUE, + sym, rb_thread_current()); + UNREACHABLE_RETURN(Qundef); } /* @@ -5034,7 +5568,7 @@ exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE pairid, VALUE VALUE rb_exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg) { - return exec_recursive(func, obj, 0, arg, 0); + return exec_recursive(func, obj, 0, arg, 0, rb_frame_last_func()); } /* @@ -5045,36 +5579,42 @@ rb_exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg) VALUE rb_exec_recursive_paired(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE paired_obj, VALUE arg) { - return exec_recursive(func, obj, rb_memory_id(paired_obj), arg, 0); + return exec_recursive(func, obj, rb_memory_id(paired_obj), arg, 0, rb_frame_last_func()); } /* * If recursion is detected on the current method and obj, the outermost - * func will be called with (obj, arg, Qtrue). All inner func will be + * func will be called with (obj, arg, true). All inner func will be * short-circuited using throw. 
*/ VALUE rb_exec_recursive_outer(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg) { - return exec_recursive(func, obj, 0, arg, 1); + return exec_recursive(func, obj, 0, arg, 1, rb_frame_last_func()); +} + +VALUE +rb_exec_recursive_outer_mid(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg, ID mid) +{ + return exec_recursive(func, obj, 0, arg, 1, mid); } /* * If recursion is detected on the current method, obj and paired_obj, - * the outermost func will be called with (obj, arg, Qtrue). All inner + * the outermost func will be called with (obj, arg, true). All inner * func will be short-circuited using throw. */ VALUE rb_exec_recursive_paired_outer(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE paired_obj, VALUE arg) { - return exec_recursive(func, obj, rb_memory_id(paired_obj), arg, 1); + return exec_recursive(func, obj, rb_memory_id(paired_obj), arg, 1, rb_frame_last_func()); } /* * call-seq: - * thread.backtrace -> array + * thread.backtrace -> array or nil * * Returns the current backtrace of the target thread. * @@ -5103,6 +5643,15 @@ rb_thread_backtrace_locations_m(int argc, VALUE *argv, VALUE thval) return rb_vm_thread_backtrace_locations(argc, argv, thval); } +void +Init_Thread_Mutex(void) +{ + rb_thread_t *th = GET_THREAD(); + + rb_native_mutex_initialize(&th->vm->workqueue_lock); + rb_native_mutex_initialize(&th->interrupt_lock); +} + /* * Document-class: ThreadError * @@ -5121,35 +5670,28 @@ rb_thread_backtrace_locations_m(int argc, VALUE *argv, VALUE thval) void Init_Thread(void) { -#undef rb_intern -#define rb_intern(str) rb_intern_const(str) - - VALUE cThGroup; rb_thread_t *th = GET_THREAD(); - sym_never = ID2SYM(rb_intern("never")); - sym_immediate = ID2SYM(rb_intern("immediate")); - sym_on_blocking = ID2SYM(rb_intern("on_blocking")); - id_locals = rb_intern("locals"); + sym_never = ID2SYM(rb_intern_const("never")); + sym_immediate = ID2SYM(rb_intern_const("immediate")); + sym_on_blocking = ID2SYM(rb_intern_const("on_blocking")); rb_define_singleton_method(rb_cThread, "new", thread_s_new, -1); rb_define_singleton_method(rb_cThread, "start", thread_start, -2); rb_define_singleton_method(rb_cThread, "fork", thread_start, -2); rb_define_singleton_method(rb_cThread, "main", rb_thread_s_main, 0); rb_define_singleton_method(rb_cThread, "current", thread_s_current, 0); - rb_define_singleton_method(rb_cThread, "stop", rb_thread_stop, 0); + rb_define_singleton_method(rb_cThread, "stop", thread_stop, 0); rb_define_singleton_method(rb_cThread, "kill", rb_thread_s_kill, 1); rb_define_singleton_method(rb_cThread, "exit", rb_thread_exit, 0); rb_define_singleton_method(rb_cThread, "pass", thread_s_pass, 0); - rb_define_singleton_method(rb_cThread, "list", rb_thread_list, 0); + rb_define_singleton_method(rb_cThread, "list", thread_list, 0); rb_define_singleton_method(rb_cThread, "abort_on_exception", rb_thread_s_abort_exc, 0); rb_define_singleton_method(rb_cThread, "abort_on_exception=", rb_thread_s_abort_exc_set, 1); rb_define_singleton_method(rb_cThread, "report_on_exception", rb_thread_s_report_exc, 0); rb_define_singleton_method(rb_cThread, "report_on_exception=", rb_thread_s_report_exc_set, 1); -#if THREAD_DEBUG < 0 - rb_define_singleton_method(rb_cThread, "DEBUG", rb_thread_s_debug, 0); - rb_define_singleton_method(rb_cThread, "DEBUG=", rb_thread_s_debug_set, 1); -#endif + rb_define_singleton_method(rb_cThread, "ignore_deadlock", rb_thread_s_ignore_deadlock, 0); + rb_define_singleton_method(rb_cThread, "ignore_deadlock=", rb_thread_s_ignore_deadlock_set, 1); 
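[Editor's note, not part of the patch] The rb_exec_recursive() family above now takes the method id explicitly (`mid`), with the public entry points still defaulting to rb_frame_last_func(), and the recursion list is keyed on the object itself rather than its memory id. A minimal sketch of the calling convention from a C extension follows; it is illustrative only, and `my_inspect`/`my_inspect_body` are hypothetical names:

#include <ruby.h>

/* Called with recursive=TRUE when the same (method, object) pair is
 * already on the current thread's recursion list. */
static VALUE
my_inspect_body(VALUE ary, VALUE arg, int recursive)
{
    if (recursive) {
        return rb_str_new_cstr("[...]");   /* cut the cycle */
    }
    VALUE str = rb_str_new_cstr("[");
    for (long i = 0; i < RARRAY_LEN(ary); i++) {
        if (i > 0) rb_str_cat_cstr(str, ", ");
        rb_str_append(str, rb_inspect(RARRAY_AREF(ary, i)));
    }
    return rb_str_cat_cstr(str, "]");
}

static VALUE
my_inspect(VALUE self)
{
    /* recursion is keyed on the current method and `self`;
     * rb_exec_recursive_paired() keys on an <obj, paired_obj> pair instead. */
    return rb_exec_recursive(my_inspect_body, self, 0);
}
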
rb_define_singleton_method(rb_cThread, "handle_interrupt", rb_thread_s_handle_interrupt, 1); rb_define_singleton_method(rb_cThread, "pending_interrupt?", rb_thread_s_pending_interrupt_p, -1); rb_define_method(rb_cThread, "pending_interrupt?", rb_thread_pending_interrupt_p, -1); @@ -5181,18 +5723,18 @@ Init_Thread(void) rb_define_method(rb_cThread, "abort_on_exception=", rb_thread_abort_exc_set, 1); rb_define_method(rb_cThread, "report_on_exception", rb_thread_report_exc, 0); rb_define_method(rb_cThread, "report_on_exception=", rb_thread_report_exc_set, 1); - rb_define_method(rb_cThread, "safe_level", rb_thread_safe_level, 0); rb_define_method(rb_cThread, "group", rb_thread_group, 0); rb_define_method(rb_cThread, "backtrace", rb_thread_backtrace_m, -1); rb_define_method(rb_cThread, "backtrace_locations", rb_thread_backtrace_locations_m, -1); rb_define_method(rb_cThread, "name", rb_thread_getname, 0); rb_define_method(rb_cThread, "name=", rb_thread_setname, 1); + rb_define_method(rb_cThread, "native_thread_id", rb_thread_native_thread_id, 0); rb_define_method(rb_cThread, "to_s", rb_thread_to_s, 0); rb_define_alias(rb_cThread, "inspect", "to_s"); rb_vm_register_special_exception(ruby_error_stream_closed, rb_eIOError, - "stream closed in another thread"); + "stream closed in another thread"); cThGroup = rb_define_class("ThreadGroup", rb_cObject); rb_define_alloc_func(cThGroup, thgroup_s_alloc); @@ -5201,37 +5743,49 @@ Init_Thread(void) rb_define_method(cThGroup, "enclosed?", thgroup_enclosed_p, 0); rb_define_method(cThGroup, "add", thgroup_add, 1); + const char * ptr = getenv("RUBY_THREAD_TIMESLICE"); + + if (ptr) { + long quantum = strtol(ptr, NULL, 0); + if (quantum > 0 && !(SIZEOF_LONG > 4 && quantum > UINT32_MAX)) { + thread_default_quantum_ms = (uint32_t)quantum; + } + else if (0) { + fprintf(stderr, "Ignored RUBY_THREAD_TIMESLICE=%s\n", ptr); + } + } + { - th->thgroup = th->vm->thgroup_default = rb_obj_alloc(cThGroup); - rb_define_const(cThGroup, "Default", th->thgroup); + th->thgroup = th->ractor->thgroup_default = rb_obj_alloc(cThGroup); + rb_define_const(cThGroup, "Default", th->thgroup); } - recursive_key = rb_intern("__recursive_key__"); rb_eThreadError = rb_define_class("ThreadError", rb_eStandardError); /* init thread core */ { - /* main thread setting */ - { - /* acquire global vm lock */ - gvl_init(th->vm); - gvl_acquire(th->vm, th); - rb_native_mutex_initialize(&th->vm->waitpid_lock); - rb_native_mutex_initialize(&th->vm->workqueue_lock); - rb_native_mutex_initialize(&th->interrupt_lock); + /* main thread setting */ + { + /* acquire global vm lock */ +#ifdef HAVE_PTHREAD_NP_H + VM_ASSERT(TH_SCHED(th)->running == th); +#endif + // thread_sched_to_running() should not be called because + // it assumes blocked by thread_sched_to_waiting(). 
+ // thread_sched_to_running(sched, th); - th->pending_interrupt_queue = rb_ary_tmp_new(0); - th->pending_interrupt_queue_checked = 0; - th->pending_interrupt_mask_stack = rb_ary_tmp_new(0); - } + th->pending_interrupt_queue = rb_ary_hidden_new(0); + th->pending_interrupt_queue_checked = 0; + th->pending_interrupt_mask_stack = rb_ary_hidden_new(0); + } } rb_thread_create_timer_thread(); - /* suppress warnings on cygwin, mingw and mswin.*/ - (void)native_mutex_trylock; - Init_thread_sync(); + + // TODO: Suppress unused function warning for now + // if (0) rb_thread_sched_destroy(NULL); } int @@ -5242,68 +5796,85 @@ ruby_native_thread_p(void) return th != 0; } +#ifdef NON_SCALAR_THREAD_ID + #define thread_id_str(th) (NULL) +#else + #define thread_id_str(th) ((void *)(uintptr_t)(th)->nt->thread_id) +#endif + static void -debug_deadlock_check(rb_vm_t *vm, VALUE msg) +debug_deadlock_check(rb_ractor_t *r, VALUE msg) { rb_thread_t *th = 0; VALUE sep = rb_str_new_cstr("\n "); rb_str_catf(msg, "\n%d threads, %d sleeps current:%p main thread:%p\n", - vm_living_thread_num(vm), vm->sleeper, (void *)GET_THREAD(), (void *)vm->main_thread); - list_for_each(&vm->living_threads, th, vmlt_node) { - rb_str_catf(msg, "* %+"PRIsVALUE"\n rb_thread_t:%p " - "native:%"PRI_THREAD_ID" int:%u", - th->self, (void *)th, thread_id_str(th), th->ec->interrupt_flag); - if (th->locking_mutex) { - rb_mutex_t *mutex = mutex_ptr(th->locking_mutex); - rb_str_catf(msg, " mutex:%p cond:%"PRIuSIZE, - (void *)mutex->th, rb_mutex_num_waiting(mutex)); - } - { - rb_thread_list_t *list = th->join_list; - while (list) { - rb_str_catf(msg, "\n depended by: tb_thread_id:%p", (void *)list->th); - list = list->next; - } - } - rb_str_catf(msg, "\n "); - rb_str_concat(msg, rb_ary_join(rb_ec_backtrace_str_ary(th->ec, 0, 0), sep)); - rb_str_catf(msg, "\n"); + rb_ractor_living_thread_num(r), rb_ractor_sleeper_thread_num(r), + (void *)GET_THREAD(), (void *)r->threads.main); + + ccan_list_for_each(&r->threads.set, th, lt_node) { + rb_str_catf(msg, "* %+"PRIsVALUE"\n rb_thread_t:%p " + "native:%p int:%u", + th->self, (void *)th, th->nt ? 
thread_id_str(th) : "N/A", th->ec->interrupt_flag); + + if (th->locking_mutex) { + rb_mutex_t *mutex = mutex_ptr(th->locking_mutex); + rb_str_catf(msg, " mutex:%llu cond:%"PRIuSIZE, + (unsigned long long)mutex->ec_serial, rb_mutex_num_waiting(mutex)); + } + + { + struct rb_waiting_list *list = th->join_list; + while (list) { + rb_str_catf(msg, "\n depended by: tb_thread_id:%p", (void *)list->thread); + list = list->next; + } + } + rb_str_catf(msg, "\n "); + rb_str_concat(msg, rb_ary_join(rb_ec_backtrace_str_ary(th->ec, RUBY_BACKTRACE_START, RUBY_ALL_BACKTRACE_LINES), sep)); + rb_str_catf(msg, "\n"); } } static void -rb_check_deadlock(rb_vm_t *vm) +rb_check_deadlock(rb_ractor_t *r) { - int found = 0; - rb_thread_t *th = 0; + if (GET_THREAD()->vm->thread_ignore_deadlock) return; + +#ifdef RUBY_THREAD_PTHREAD_H + if (r->threads.sched.readyq_cnt > 0) return; +#endif - if (vm_living_thread_num(vm) > vm->sleeper) return; - if (vm_living_thread_num(vm) < vm->sleeper) rb_bug("sleeper must not be more than vm_living_thread_num(vm)"); - if (patrol_thread && patrol_thread != GET_THREAD()) return; + int sleeper_num = rb_ractor_sleeper_thread_num(r); + int ltnum = rb_ractor_living_thread_num(r); - list_for_each(&vm->living_threads, th, vmlt_node) { - if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th->ec)) { - found = 1; - } - else if (th->locking_mutex) { - rb_mutex_t *mutex = mutex_ptr(th->locking_mutex); + if (ltnum > sleeper_num) return; + if (ltnum < sleeper_num) rb_bug("sleeper must not be more than vm_living_thread_num(vm)"); - if (mutex->th == th || (!mutex->th && !list_empty(&mutex->waitq))) { - found = 1; - } - } - if (found) - break; + int found = 0; + rb_thread_t *th = NULL; + + ccan_list_for_each(&r->threads.set, th, lt_node) { + if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th->ec)) { + found = 1; + } + else if (th->locking_mutex) { + rb_mutex_t *mutex = mutex_ptr(th->locking_mutex); + if (mutex->ec_serial == rb_ec_serial(th->ec) || (!mutex->ec_serial && !ccan_list_empty(&mutex->waitq))) { + found = 1; + } + } + if (found) + break; } if (!found) { - VALUE argv[2]; - argv[0] = rb_eFatal; - argv[1] = rb_str_new2("No live threads left. Deadlock?"); - debug_deadlock_check(vm, argv[1]); - vm->sleeper--; - rb_threadptr_raise(vm->main_thread, 2, argv); + VALUE argv[2]; + argv[0] = rb_eFatal; + argv[1] = rb_str_new2("No live threads left. 
Deadlock?"); + debug_deadlock_check(r, argv[1]); + rb_ractor_sleeper_threads_dec(GET_RACTOR()); + rb_threadptr_raise(r->threads.main, 2, argv); } } @@ -5313,27 +5884,28 @@ update_line_coverage(VALUE data, const rb_trace_arg_t *trace_arg) const rb_control_frame_t *cfp = GET_EC()->cfp; VALUE coverage = rb_iseq_coverage(cfp->iseq); if (RB_TYPE_P(coverage, T_ARRAY) && !RBASIC_CLASS(coverage)) { - VALUE lines = RARRAY_AREF(coverage, COVERAGE_INDEX_LINES); - if (lines) { - long line = rb_sourceline() - 1; - long count; - VALUE num; + VALUE lines = RARRAY_AREF(coverage, COVERAGE_INDEX_LINES); + if (lines) { + long line = rb_sourceline() - 1; + VM_ASSERT(line >= 0); + long count; + VALUE num; void rb_iseq_clear_event_flags(const rb_iseq_t *iseq, size_t pos, rb_event_flag_t reset); if (GET_VM()->coverage_mode & COVERAGE_TARGET_ONESHOT_LINES) { - rb_iseq_clear_event_flags(cfp->iseq, cfp->pc - cfp->iseq->body->iseq_encoded - 1, RUBY_EVENT_COVERAGE_LINE); + rb_iseq_clear_event_flags(cfp->iseq, cfp->pc - ISEQ_BODY(cfp->iseq)->iseq_encoded - 1, RUBY_EVENT_COVERAGE_LINE); rb_ary_push(lines, LONG2FIX(line + 1)); return; } - if (line >= RARRAY_LEN(lines)) { /* no longer tracked */ - return; - } - num = RARRAY_AREF(lines, line); - if (!FIXNUM_P(num)) return; - count = FIX2LONG(num) + 1; - if (POSFIXABLE(count)) { - RARRAY_ASET(lines, line, LONG2FIX(count)); - } - } + if (line >= RARRAY_LEN(lines)) { /* no longer tracked */ + return; + } + num = RARRAY_AREF(lines, line); + if (!FIXNUM_P(num)) return; + count = FIX2LONG(num) + 1; + if (POSFIXABLE(count)) { + RARRAY_ASET(lines, line, LONG2FIX(count)); + } + } } } @@ -5343,17 +5915,17 @@ update_branch_coverage(VALUE data, const rb_trace_arg_t *trace_arg) const rb_control_frame_t *cfp = GET_EC()->cfp; VALUE coverage = rb_iseq_coverage(cfp->iseq); if (RB_TYPE_P(coverage, T_ARRAY) && !RBASIC_CLASS(coverage)) { - VALUE branches = RARRAY_AREF(coverage, COVERAGE_INDEX_BRANCHES); - if (branches) { - long pc = cfp->pc - cfp->iseq->body->iseq_encoded - 1; + VALUE branches = RARRAY_AREF(coverage, COVERAGE_INDEX_BRANCHES); + if (branches) { + long pc = cfp->pc - ISEQ_BODY(cfp->iseq)->iseq_encoded - 1; long idx = FIX2INT(RARRAY_AREF(ISEQ_PC2BRANCHINDEX(cfp->iseq), pc)), count; - VALUE counters = RARRAY_AREF(branches, 1); - VALUE num = RARRAY_AREF(counters, idx); - count = FIX2LONG(num) + 1; - if (POSFIXABLE(count)) { - RARRAY_ASET(counters, idx, LONG2FIX(count)); - } - } + VALUE counters = RARRAY_AREF(branches, 1); + VALUE num = RARRAY_AREF(counters, idx); + count = FIX2LONG(num) + 1; + if (POSFIXABLE(count)) { + RARRAY_ASET(counters, idx, LONG2FIX(count)); + } + } } } @@ -5362,55 +5934,57 @@ rb_resolve_me_location(const rb_method_entry_t *me, VALUE resolved_location[5]) { VALUE path, beg_pos_lineno, beg_pos_column, end_pos_lineno, end_pos_column; + if (!me->def) return NULL; // negative cme + retry: switch (me->def->type) { case VM_METHOD_TYPE_ISEQ: { - const rb_iseq_t *iseq = me->def->body.iseq.iseqptr; - rb_iseq_location_t *loc = &iseq->body->location; - path = rb_iseq_path(iseq); - beg_pos_lineno = INT2FIX(loc->code_location.beg_pos.lineno); - beg_pos_column = INT2FIX(loc->code_location.beg_pos.column); - end_pos_lineno = INT2FIX(loc->code_location.end_pos.lineno); - end_pos_column = INT2FIX(loc->code_location.end_pos.column); - break; + const rb_iseq_t *iseq = me->def->body.iseq.iseqptr; + rb_iseq_location_t *loc = &ISEQ_BODY(iseq)->location; + path = rb_iseq_path(iseq); + beg_pos_lineno = INT2FIX(loc->code_location.beg_pos.lineno); + beg_pos_column = 
INT2FIX(loc->code_location.beg_pos.column); + end_pos_lineno = INT2FIX(loc->code_location.end_pos.lineno); + end_pos_column = INT2FIX(loc->code_location.end_pos.column); + break; } case VM_METHOD_TYPE_BMETHOD: { const rb_iseq_t *iseq = rb_proc_get_iseq(me->def->body.bmethod.proc, 0); - if (iseq) { - rb_iseq_location_t *loc; - rb_iseq_check(iseq); - path = rb_iseq_path(iseq); - loc = &iseq->body->location; - beg_pos_lineno = INT2FIX(loc->code_location.beg_pos.lineno); - beg_pos_column = INT2FIX(loc->code_location.beg_pos.column); - end_pos_lineno = INT2FIX(loc->code_location.end_pos.lineno); - end_pos_column = INT2FIX(loc->code_location.end_pos.column); - break; - } - return NULL; + if (iseq) { + rb_iseq_location_t *loc; + rb_iseq_check(iseq); + path = rb_iseq_path(iseq); + loc = &ISEQ_BODY(iseq)->location; + beg_pos_lineno = INT2FIX(loc->code_location.beg_pos.lineno); + beg_pos_column = INT2FIX(loc->code_location.beg_pos.column); + end_pos_lineno = INT2FIX(loc->code_location.end_pos.lineno); + end_pos_column = INT2FIX(loc->code_location.end_pos.column); + break; + } + return NULL; } case VM_METHOD_TYPE_ALIAS: - me = me->def->body.alias.original_me; - goto retry; + me = me->def->body.alias.original_me; + goto retry; case VM_METHOD_TYPE_REFINED: - me = me->def->body.refined.orig_me; - if (!me) return NULL; - goto retry; + me = me->def->body.refined.orig_me; + if (!me) return NULL; + goto retry; default: - return NULL; + return NULL; } /* found */ if (RB_TYPE_P(path, T_ARRAY)) { - path = rb_ary_entry(path, 1); - if (!RB_TYPE_P(path, T_STRING)) return NULL; /* just for the case... */ + path = rb_ary_entry(path, 1); + if (!RB_TYPE_P(path, T_STRING)) return NULL; /* just for the case... */ } if (resolved_location) { - resolved_location[0] = path; - resolved_location[1] = beg_pos_lineno; - resolved_location[2] = beg_pos_column; - resolved_location[3] = end_pos_lineno; - resolved_location[4] = end_pos_column; + resolved_location[0] = path; + resolved_location[1] = beg_pos_lineno; + resolved_location[2] = beg_pos_column; + resolved_location[3] = end_pos_lineno; + resolved_location[4] = end_pos_column; } return me; } @@ -5430,7 +6004,7 @@ update_method_coverage(VALUE me2counter, rb_trace_arg_t *trace_arg) rcount = rb_hash_aref(me2counter, (VALUE) me); count = FIXNUM_P(rcount) ? 
FIX2LONG(rcount) + 1 : 1; if (POSFIXABLE(count)) { - rb_hash_aset(me2counter, (VALUE) me, LONG2FIX(count)); + rb_hash_aset(me2counter, (VALUE) me, LONG2FIX(count)); } } @@ -5450,76 +6024,278 @@ void rb_set_coverages(VALUE coverages, int mode, VALUE me2counter) { GET_VM()->coverages = coverages; + GET_VM()->me2counter = me2counter; GET_VM()->coverage_mode = mode; +} + +void +rb_resume_coverages(void) +{ + int mode = GET_VM()->coverage_mode; + VALUE me2counter = GET_VM()->me2counter; rb_add_event_hook2((rb_event_hook_func_t) update_line_coverage, RUBY_EVENT_COVERAGE_LINE, Qnil, RUBY_EVENT_HOOK_FLAG_SAFE | RUBY_EVENT_HOOK_FLAG_RAW_ARG); if (mode & COVERAGE_TARGET_BRANCHES) { - rb_add_event_hook2((rb_event_hook_func_t) update_branch_coverage, RUBY_EVENT_COVERAGE_BRANCH, Qnil, RUBY_EVENT_HOOK_FLAG_SAFE | RUBY_EVENT_HOOK_FLAG_RAW_ARG); + rb_add_event_hook2((rb_event_hook_func_t) update_branch_coverage, RUBY_EVENT_COVERAGE_BRANCH, Qnil, RUBY_EVENT_HOOK_FLAG_SAFE | RUBY_EVENT_HOOK_FLAG_RAW_ARG); } if (mode & COVERAGE_TARGET_METHODS) { - rb_add_event_hook2((rb_event_hook_func_t) update_method_coverage, RUBY_EVENT_CALL, me2counter, RUBY_EVENT_HOOK_FLAG_SAFE | RUBY_EVENT_HOOK_FLAG_RAW_ARG); + rb_add_event_hook2((rb_event_hook_func_t) update_method_coverage, RUBY_EVENT_CALL, me2counter, RUBY_EVENT_HOOK_FLAG_SAFE | RUBY_EVENT_HOOK_FLAG_RAW_ARG); } } -/* Make coverage arrays empty so old covered files are no longer tracked. */ void -rb_reset_coverages(void) +rb_suspend_coverages(void) { - rb_clear_coverages(); - rb_iseq_remove_coverage_all(); - GET_VM()->coverages = Qfalse; rb_remove_event_hook((rb_event_hook_func_t) update_line_coverage); if (GET_VM()->coverage_mode & COVERAGE_TARGET_BRANCHES) { - rb_remove_event_hook((rb_event_hook_func_t) update_branch_coverage); + rb_remove_event_hook((rb_event_hook_func_t) update_branch_coverage); } if (GET_VM()->coverage_mode & COVERAGE_TARGET_METHODS) { - rb_remove_event_hook((rb_event_hook_func_t) update_method_coverage); + rb_remove_event_hook((rb_event_hook_func_t) update_method_coverage); } } +/* Make coverage arrays empty so old covered files are no longer tracked. */ +void +rb_reset_coverages(void) +{ + rb_clear_coverages(); + rb_iseq_remove_coverage_all(); + GET_VM()->coverages = Qfalse; +} + VALUE rb_default_coverage(int n) { - VALUE coverage = rb_ary_tmp_new_fill(3); + VALUE coverage = rb_ary_hidden_new_fill(3); VALUE lines = Qfalse, branches = Qfalse; int mode = GET_VM()->coverage_mode; if (mode & COVERAGE_TARGET_LINES) { - lines = n > 0 ? rb_ary_tmp_new_fill(n) : rb_ary_tmp_new(0); + lines = n > 0 ? rb_ary_hidden_new_fill(n) : rb_ary_hidden_new(0); } RARRAY_ASET(coverage, COVERAGE_INDEX_LINES, lines); if (mode & COVERAGE_TARGET_BRANCHES) { - branches = rb_ary_tmp_new_fill(2); - /* internal data structures for branch coverage: - * - * [[base_type, base_first_lineno, base_first_column, base_last_lineno, base_last_column, - * target_type_1, target_first_lineno_1, target_first_column_1, target_last_lineno_1, target_last_column_1, target_counter_index_1, - * target_type_2, target_first_lineno_2, target_first_column_2, target_last_lineno_2, target_last_column_2, target_counter_index_2, ...], - * ...] - * - * Example: [[:case, 1, 0, 4, 3, - * :when, 2, 8, 2, 9, 0, - * :when, 3, 8, 3, 9, 1, ...], - * ...] 
- */ - RARRAY_ASET(branches, 0, rb_ary_tmp_new(0)); - /* branch execution counters */ - RARRAY_ASET(branches, 1, rb_ary_tmp_new(0)); + branches = rb_ary_hidden_new_fill(2); + /* internal data structures for branch coverage: + * + * { branch base node => + * [base_type, base_first_lineno, base_first_column, base_last_lineno, base_last_column, { + * branch target id => + * [target_type, target_first_lineno, target_first_column, target_last_lineno, target_last_column, target_counter_index], + * ... + * }], + * ... + * } + * + * Example: + * { NODE_CASE => + * [1, 0, 4, 3, { + * NODE_WHEN => [2, 8, 2, 9, 0], + * NODE_WHEN => [3, 8, 3, 9, 1], + * ... + * }], + * ... + * } + */ + VALUE structure = rb_hash_new(); + rb_obj_hide(structure); + RARRAY_ASET(branches, 0, structure); + /* branch execution counters */ + RARRAY_ASET(branches, 1, rb_ary_hidden_new(0)); } RARRAY_ASET(coverage, COVERAGE_INDEX_BRANCHES, branches); return coverage; } +static VALUE +uninterruptible_exit(VALUE v) +{ + rb_thread_t *cur_th = GET_THREAD(); + rb_ary_pop(cur_th->pending_interrupt_mask_stack); + + cur_th->pending_interrupt_queue_checked = 0; + if (!rb_threadptr_pending_interrupt_empty_p(cur_th)) { + RUBY_VM_SET_INTERRUPT(cur_th->ec); + } + return Qnil; +} + VALUE -rb_uninterruptible(VALUE (*b_proc)(ANYARGS), VALUE data) +rb_uninterruptible(VALUE (*b_proc)(VALUE), VALUE data) { VALUE interrupt_mask = rb_ident_hash_new(); rb_thread_t *cur_th = GET_THREAD(); rb_hash_aset(interrupt_mask, rb_cObject, sym_never); - OBJ_FREEZE_RAW(interrupt_mask); + OBJ_FREEZE(interrupt_mask); rb_ary_push(cur_th->pending_interrupt_mask_stack, interrupt_mask); - return rb_ensure(b_proc, data, rb_ary_pop, cur_th->pending_interrupt_mask_stack); + VALUE ret = rb_ensure(b_proc, data, uninterruptible_exit, Qnil); + + RUBY_VM_CHECK_INTS(cur_th->ec); + return ret; } + +static void +thread_specific_storage_alloc(rb_thread_t *th) +{ + VM_ASSERT(th->specific_storage == NULL); + + if (UNLIKELY(specific_key_count > 0)) { + th->specific_storage = ZALLOC_N(void *, RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX); + } +} + +rb_internal_thread_specific_key_t +rb_internal_thread_specific_key_create(void) +{ + rb_vm_t *vm = GET_VM(); + + if (specific_key_count == 0 && vm->ractor.cnt > 1) { + rb_raise(rb_eThreadError, "The first rb_internal_thread_specific_key_create() is called with multiple ractors"); + } + else if (specific_key_count > RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX) { + rb_raise(rb_eThreadError, "rb_internal_thread_specific_key_create() is called more than %d times", RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX); + } + else { + rb_internal_thread_specific_key_t key = specific_key_count++; + + if (key == 0) { + // allocate + rb_ractor_t *cr = GET_RACTOR(); + rb_thread_t *th; + + ccan_list_for_each(&cr->threads.set, th, lt_node) { + thread_specific_storage_alloc(th); + } + } + return key; + } +} + +// async and native thread safe. +void * +rb_internal_thread_specific_get(VALUE thread_val, rb_internal_thread_specific_key_t key) +{ + rb_thread_t *th = DATA_PTR(thread_val); + + VM_ASSERT(rb_thread_ptr(thread_val) == th); + VM_ASSERT(key < RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX); + VM_ASSERT(th->specific_storage); + + return th->specific_storage[key]; +} + +// async and native thread safe. 
+void +rb_internal_thread_specific_set(VALUE thread_val, rb_internal_thread_specific_key_t key, void *data) +{ + rb_thread_t *th = DATA_PTR(thread_val); + + VM_ASSERT(rb_thread_ptr(thread_val) == th); + VM_ASSERT(key < RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX); + VM_ASSERT(th->specific_storage); + + th->specific_storage[key] = data; +} + +// interrupt_exec + +struct rb_interrupt_exec_task { + struct ccan_list_node node; + + rb_interrupt_exec_func_t *func; + void *data; + enum rb_interrupt_exec_flag flags; +}; + +void +rb_threadptr_interrupt_exec_task_mark(rb_thread_t *th) +{ + struct rb_interrupt_exec_task *task; + + ccan_list_for_each(&th->interrupt_exec_tasks, task, node) { + if (task->flags & rb_interrupt_exec_flag_value_data) { + rb_gc_mark((VALUE)task->data); + } + } +} + +// native thread safe +// th should be available +void +rb_threadptr_interrupt_exec(rb_thread_t *th, rb_interrupt_exec_func_t *func, void *data, enum rb_interrupt_exec_flag flags) +{ + // should not use ALLOC + struct rb_interrupt_exec_task *task = ALLOC(struct rb_interrupt_exec_task); + *task = (struct rb_interrupt_exec_task) { + .flags = flags, + .func = func, + .data = data, + }; + + rb_native_mutex_lock(&th->interrupt_lock); + { + ccan_list_add_tail(&th->interrupt_exec_tasks, &task->node); + threadptr_set_interrupt_locked(th, true); + } + rb_native_mutex_unlock(&th->interrupt_lock); +} + +static void +threadptr_interrupt_exec_exec(rb_thread_t *th) +{ + while (1) { + struct rb_interrupt_exec_task *task; + + rb_native_mutex_lock(&th->interrupt_lock); + { + task = ccan_list_pop(&th->interrupt_exec_tasks, struct rb_interrupt_exec_task, node); + } + rb_native_mutex_unlock(&th->interrupt_lock); + + RUBY_DEBUG_LOG("task:%p", task); + + if (task) { + if (task->flags & rb_interrupt_exec_flag_new_thread) { + rb_thread_create(task->func, task->data); + } + else { + (*task->func)(task->data); + } + ruby_xfree(task); + } + else { + break; + } + } +} + +static void +threadptr_interrupt_exec_cleanup(rb_thread_t *th) +{ + rb_native_mutex_lock(&th->interrupt_lock); + { + struct rb_interrupt_exec_task *task; + + while ((task = ccan_list_pop(&th->interrupt_exec_tasks, struct rb_interrupt_exec_task, node)) != NULL) { + ruby_xfree(task); + } + } + rb_native_mutex_unlock(&th->interrupt_lock); +} + +// native thread safe +// func/data should be native thread safe +void +rb_ractor_interrupt_exec(struct rb_ractor_struct *target_r, + rb_interrupt_exec_func_t *func, void *data, enum rb_interrupt_exec_flag flags) +{ + RUBY_DEBUG_LOG("flags:%d", (int)flags); + + rb_thread_t *main_th = target_r->threads.main; + rb_threadptr_interrupt_exec(main_th, func, data, flags | rb_interrupt_exec_flag_new_thread); +} + |
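[Editor's note, not part of the patch] The per-thread specific storage added above (rb_internal_thread_specific_key_create/get/set) stores plain pointers and, per the comments in the patch, get/set are async and native-thread safe; creating the first key allocates the storage for the existing threads of the current ractor. A rough usage sketch from a C extension; `profile_key`, `struct profile_slot`, and `Init_profile_sketch` are made-up names, and the caller owns the slot's lifetime:

#include <ruby.h>
#include <ruby/thread.h>

static rb_internal_thread_specific_key_t profile_key;

struct profile_slot {
    unsigned long events;   /* hypothetical per-thread counter */
};

static void
profile_attach(VALUE thread)
{
    struct profile_slot *slot = ruby_xcalloc(1, sizeof(*slot));
    rb_internal_thread_specific_set(thread, profile_key, slot);
}

static VALUE
profile_events(VALUE self, VALUE thread)
{
    struct profile_slot *slot = rb_internal_thread_specific_get(thread, profile_key);
    return ULONG2NUM(slot ? slot->events : 0);
}

void
Init_profile_sketch(void)
{
    /* must run while only the main ractor exists, per the check above */
    profile_key = rb_internal_thread_specific_key_create();
    profile_attach(rb_thread_current());
    rb_define_singleton_method(rb_cThread, "profile_events", profile_events, 1);
}
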

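[Editor's note, not part of the patch] rb_uninterruptible() above now re-checks pending interrupts after the masked block returns instead of only popping the mask. It is an internal API (declared in internal/thread.h, not installed for extensions), so the sketch below repeats its prototype only to stay self-contained; `flush_buffers` stands in for whatever work must not be cut short by Thread#raise or Thread#kill:

#include <ruby.h>

/* internal API; prototype repeated here for the sketch only */
VALUE rb_uninterruptible(VALUE (*b_proc)(VALUE), VALUE data);

static VALUE
flush_buffers(VALUE io)          /* hypothetical critical section */
{
    return rb_funcall(io, rb_intern("flush"), 0);
}

static VALUE
flush_without_interrupts(VALUE io)
{
    /* asynchronous interrupts are deferred while flush_buffers runs;
     * the updated code performs the usual interrupt check on exit. */
    return rb_uninterruptible(flush_buffers, io);
}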