diff options
Diffstat (limited to 'thread.c')
| -rw-r--r-- | thread.c | 3402 |
1 files changed, 2104 insertions, 1298 deletions
@@ -20,6 +20,12 @@ model 3: Native Thread with fine grain lock Using pthread and Ruby threads run concurrent or parallel. + model 4: M:N User:Native threads with Global VM lock + Combination of model 1 and 2 + + model 5: M:N User:Native thread with fine grain lock + Combination of model 1 and 3 + ------------------------------------------------------------------------ model 2: @@ -57,13 +63,40 @@ /* for model 2 */ +#include "ruby/internal/config.h" + +#ifdef __linux__ +// Normally, gcc(1) translates calls to alloca() with inlined code. This is not done when either the -ansi, -std=c89, -std=c99, or the -std=c11 option is given and the header <alloca.h> is not included. +# include <alloca.h> +#endif + #include "eval_intern.h" #include "gc.h" -#include "timev.h" +#include "hrtime.h" +#include "internal.h" +#include "internal/class.h" +#include "internal/cont.h" +#include "internal/error.h" +#include "internal/hash.h" +#include "internal/io.h" +#include "internal/object.h" +#include "internal/proc.h" +#include "ruby/fiber/scheduler.h" +#include "internal/signal.h" +#include "internal/thread.h" +#include "internal/time.h" +#include "internal/warnings.h" +#include "iseq.h" +#include "mjit.h" +#include "ruby/debug.h" #include "ruby/io.h" #include "ruby/thread.h" #include "ruby/thread_native.h" -#include "internal.h" +#include "timev.h" +#include "vm_core.h" +#include "ractor_core.h" +#include "vm_debug.h" +#include "vm_sync.h" #ifndef USE_NATIVE_THREAD_PRIORITY #define USE_NATIVE_THREAD_PRIORITY 0 @@ -80,16 +113,37 @@ static VALUE rb_cThreadShield; static VALUE sym_immediate; static VALUE sym_on_blocking; static VALUE sym_never; -static ID id_locals; -static void sleep_timeval(rb_thread_t *th, struct timeval time, int spurious_check); -static void sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec, int spurious_check); -static void sleep_forever(rb_thread_t *th, int nodeadlock, int spurious_check); -static void rb_thread_sleep_deadly_allow_spurious_wakeup(void); -static double timeofday(void); +enum SLEEP_FLAGS { + SLEEP_DEADLOCKABLE = 0x1, + SLEEP_SPURIOUS_CHECK = 0x2 +}; + +#define THREAD_LOCAL_STORAGE_INITIALISED FL_USER13 +#define THREAD_LOCAL_STORAGE_INITIALISED_P(th) RB_FL_TEST_RAW((th), THREAD_LOCAL_STORAGE_INITIALISED) + +static inline VALUE +rb_thread_local_storage(VALUE thread) +{ + if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) { + rb_ivar_set(thread, idLocals, rb_hash_new()); + RB_FL_SET_RAW(thread, THREAD_LOCAL_STORAGE_INITIALISED); + } + return rb_ivar_get(thread, idLocals); +} + +static int sleep_hrtime(rb_thread_t *, rb_hrtime_t, unsigned int fl); +static void sleep_forever(rb_thread_t *th, unsigned int fl); +static void rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker); static int rb_threadptr_dead(rb_thread_t *th); -static void rb_check_deadlock(rb_vm_t *vm); -static int rb_threadptr_pending_interrupt_empty_p(rb_thread_t *th); +static void rb_check_deadlock(rb_ractor_t *r); +static int rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th); +static const char *thread_status_name(rb_thread_t *th, int detail); +static int hrtime_update_expire(rb_hrtime_t *, const rb_hrtime_t); +NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd)); +static int consume_communication_pipe(int fd); +static int check_signals_nogvl(rb_thread_t *, int sigwait_fd); +void rb_sigwait_fd_migrate(rb_vm_t *); /* process.c */ #define eKillSignal INT2FIX(0) #define eTerminateSignal INT2FIX(1) @@ -101,51 +155,28 @@ struct waiting_fd { int fd; }; -inline static void -st_delete_wrap(st_table *table, st_data_t key) -{ - st_delete(table, &key, 0); -} - /********************************************************************************/ #define THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION struct rb_blocking_region_buffer { enum rb_thread_status prev_status; - struct rb_unblock_callback oldubf; }; -static int set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg, - struct rb_unblock_callback *old, int fail_if_interrupted); -static void reset_unblock_function(rb_thread_t *th, const struct rb_unblock_callback *old); +static int unblock_function_set(rb_thread_t *th, rb_unblock_function_t *func, void *arg, int fail_if_interrupted); +static void unblock_function_clear(rb_thread_t *th); static inline int blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region, rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted); static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region); -#ifdef __ia64 -#define RB_GC_SAVE_MACHINE_REGISTER_STACK(th) \ - do{(th)->machine.register_stack_end = rb_ia64_bsp();}while(0) -#else -#define RB_GC_SAVE_MACHINE_REGISTER_STACK(th) -#endif -#define RB_GC_SAVE_MACHINE_CONTEXT(th) \ - do { \ - FLUSH_REGISTER_WINDOWS; \ - RB_GC_SAVE_MACHINE_REGISTER_STACK(th); \ - setjmp((th)->machine.regs); \ - SET_MACHINE_STACK_END(&(th)->machine.stack_end); \ - } while (0) - -#define GVL_UNLOCK_BEGIN() do { \ - rb_thread_t *_th_stored = GET_THREAD(); \ - RB_GC_SAVE_MACHINE_CONTEXT(_th_stored); \ - gvl_release(_th_stored->vm); +#define GVL_UNLOCK_BEGIN(th) do { \ + RB_GC_SAVE_MACHINE_CONTEXT(th); \ + gvl_release(rb_ractor_gvl(th->ractor)); -#define GVL_UNLOCK_END() \ - gvl_acquire(_th_stored->vm, _th_stored); \ - rb_thread_set_current(_th_stored); \ +#define GVL_UNLOCK_END(th) \ + gvl_acquire(rb_ractor_gvl(th->ractor), th); \ + rb_ractor_thread_switch(th->ractor, th); \ } while(0) #ifdef __GNUC__ @@ -157,42 +188,76 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio #else #define only_if_constant(expr, notconst) notconst #endif -#define BLOCKING_REGION(exec, ubf, ubfarg, fail_if_interrupted) do { \ - rb_thread_t *__th = GET_THREAD(); \ +#define BLOCKING_REGION(th, exec, ubf, ubfarg, fail_if_interrupted) do { \ struct rb_blocking_region_buffer __region; \ - if (blocking_region_begin(__th, &__region, (ubf), (ubfarg), fail_if_interrupted) || \ + if (blocking_region_begin(th, &__region, (ubf), (ubfarg), fail_if_interrupted) || \ /* always return true unless fail_if_interrupted */ \ !only_if_constant(fail_if_interrupted, TRUE)) { \ exec; \ - blocking_region_end(__th, &__region); \ + blocking_region_end(th, &__region); \ }; \ } while(0) -#define RUBY_VM_CHECK_INTS_BLOCKING(th) vm_check_ints_blocking(th) -static inline void -vm_check_ints_blocking(rb_thread_t *th) +/* + * returns true if this thread was spuriously interrupted, false otherwise + * (e.g. hit by Thread#run or ran a Ruby-level Signal.trap handler) + */ +#define RUBY_VM_CHECK_INTS_BLOCKING(ec) vm_check_ints_blocking(ec) +static inline int +vm_check_ints_blocking(rb_execution_context_t *ec) { + rb_thread_t *th = rb_ec_thread_ptr(ec); + if (LIKELY(rb_threadptr_pending_interrupt_empty_p(th))) { - if (LIKELY(!RUBY_VM_INTERRUPTED_ANY(th))) return; + if (LIKELY(!RUBY_VM_INTERRUPTED_ANY(ec))) return FALSE; } else { th->pending_interrupt_queue_checked = 0; - - RUBY_VM_SET_INTERRUPT(th); + RUBY_VM_SET_INTERRUPT(ec); } - rb_threadptr_execute_interrupts(th, 1); + return rb_threadptr_execute_interrupts(th, 1); } -static int -vm_living_thread_num(rb_vm_t *vm) +int +rb_vm_check_ints_blocking(rb_execution_context_t *ec) { - return (int)vm->living_thread_num; + return vm_check_ints_blocking(ec); +} + +/* + * poll() is supported by many OSes, but so far Linux is the only + * one we know of that supports using poll() in all places select() + * would work. + */ +#if defined(HAVE_POLL) +# if defined(__linux__) +# define USE_POLL +# endif +# if defined(__FreeBSD_version) && __FreeBSD_version >= 1100000 +# define USE_POLL + /* FreeBSD does not set POLLOUT when POLLHUP happens */ +# define POLLERR_SET (POLLHUP | POLLERR) +# endif +#endif + +static void +timeout_prepare(rb_hrtime_t **to, rb_hrtime_t *rel, rb_hrtime_t *end, + const struct timeval *timeout) +{ + if (timeout) { + *rel = rb_timeval2hrtime(timeout); + *end = rb_hrtime_add(rb_hrtime_now(), *rel); + *to = rel; + } + else { + *to = 0; + } } #if THREAD_DEBUG #ifdef HAVE_VA_ARGS_MACRO void rb_thread_debug(const char *file, int line, const char *fmt, ...); -#define thread_debug(fmt, ...) rb_thread_debug(__FILE__, __LINE__, fmt, ##__VA_ARGS__) +#define thread_debug(...) rb_thread_debug(__FILE__, __LINE__, __VA_ARGS__) #define POSITION_FORMAT "%s:%d:" #define POSITION_ARGS ,file, line #else @@ -203,8 +268,9 @@ void rb_thread_debug(const char *fmt, ...); #endif # ifdef NON_SCALAR_THREAD_ID -static const char * -fill_thread_id_string(rb_nativethread_id_t thid, rb_thread_id_string_t buf) +#define fill_thread_id_string ruby_fill_thread_id_string +const char * +ruby_fill_thread_id_string(rb_nativethread_id_t thid, rb_thread_id_string_t buf) { extern const char ruby_digitmap[]; size_t i; @@ -241,7 +307,7 @@ static int rb_thread_debug_enabled; */ static VALUE -rb_thread_s_debug(void) +rb_thread_s_debug(VALUE _) { return INT2NUM(rb_thread_debug_enabled); } @@ -268,21 +334,24 @@ rb_thread_s_debug_set(VALUE self, VALUE val) #endif #ifndef fill_thread_id_str -# define fill_thread_id_string(thid, buf) (thid) +# define fill_thread_id_string(thid, buf) ((void *)(uintptr_t)(thid)) # define fill_thread_id_str(th) (void)0 -# define thread_id_str(th) ((void *)(th)->thread_id) +# define thread_id_str(th) ((void *)(uintptr_t)(th)->thread_id) # define PRI_THREAD_ID "p" #endif -#ifndef __ia64 -#define thread_start_func_2(th, st, rst) thread_start_func_2(th, st) -#endif -NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start, - VALUE *register_stack_start)); -static void timer_thread_function(void *); +NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start)); +void ruby_sigchld_handler(rb_vm_t *); /* signal.c */ + +static void +ubf_sigwait(void *ignore) +{ + rb_thread_wakeup_timer_thread(0); +} + +#include THREAD_IMPL_SRC #if defined(_WIN32) -#include "thread_win32.c" #define DEBUG_OUT() \ WaitForSingleObject(&debug_mutex, INFINITE); \ @@ -291,7 +360,6 @@ static void timer_thread_function(void *); ReleaseMutex(&debug_mutex); #elif defined(HAVE_PTHREAD_H) -#include "thread_pthread.c" #define DEBUG_OUT() \ pthread_mutex_lock(&debug_mutex); \ @@ -300,8 +368,19 @@ static void timer_thread_function(void *); fflush(stdout); \ pthread_mutex_unlock(&debug_mutex); -#else -#error "unsupported thread type" +#endif + +/* + * TODO: somebody with win32 knowledge should be able to get rid of + * timer-thread by busy-waiting on signals. And it should be possible + * to make the GVL in thread_pthread.c be platform-independent. + */ +#ifndef BUSY_WAIT_SIGNALS +# define BUSY_WAIT_SIGNALS (0) +#endif + +#ifndef USE_EVENTFD +# define USE_EVENTFD (0) #endif #if THREAD_DEBUG @@ -325,7 +404,7 @@ rb_thread_debug( if (debug_mutex_initialized == 1) { debug_mutex_initialized = 0; - native_mutex_initialize(&debug_mutex); + rb_native_mutex_initialize(&debug_mutex); } va_start(args, fmt); @@ -339,87 +418,88 @@ rb_thread_debug( #include "thread_sync.c" void -rb_vm_gvl_destroy(rb_vm_t *vm) +rb_vm_gvl_destroy(rb_global_vm_lock_t *gvl) { - gvl_release(vm); - gvl_destroy(vm); - native_mutex_destroy(&vm->thread_destruct_lock); + gvl_release(gvl); + gvl_destroy(gvl); } void rb_nativethread_lock_initialize(rb_nativethread_lock_t *lock) { - native_mutex_initialize(lock); + rb_native_mutex_initialize(lock); } void rb_nativethread_lock_destroy(rb_nativethread_lock_t *lock) { - native_mutex_destroy(lock); + rb_native_mutex_destroy(lock); } void rb_nativethread_lock_lock(rb_nativethread_lock_t *lock) { - native_mutex_lock(lock); + rb_native_mutex_lock(lock); } void rb_nativethread_lock_unlock(rb_nativethread_lock_t *lock) { - native_mutex_unlock(lock); + rb_native_mutex_unlock(lock); } static int -set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg, - struct rb_unblock_callback *old, int fail_if_interrupted) +unblock_function_set(rb_thread_t *th, rb_unblock_function_t *func, void *arg, int fail_if_interrupted) { do { if (fail_if_interrupted) { - if (RUBY_VM_INTERRUPTED_ANY(th)) { + if (RUBY_VM_INTERRUPTED_ANY(th->ec)) { return FALSE; } } else { - RUBY_VM_CHECK_INTS(th); + RUBY_VM_CHECK_INTS(th->ec); } - native_mutex_lock(&th->interrupt_lock); - } while (!th->raised_flag && RUBY_VM_INTERRUPTED_ANY(th) && - (native_mutex_unlock(&th->interrupt_lock), TRUE)); + rb_native_mutex_lock(&th->interrupt_lock); + } while (!th->ec->raised_flag && RUBY_VM_INTERRUPTED_ANY(th->ec) && + (rb_native_mutex_unlock(&th->interrupt_lock), TRUE)); + + VM_ASSERT(th->unblock.func == NULL); - if (old) *old = th->unblock; th->unblock.func = func; th->unblock.arg = arg; - native_mutex_unlock(&th->interrupt_lock); + rb_native_mutex_unlock(&th->interrupt_lock); return TRUE; } static void -reset_unblock_function(rb_thread_t *th, const struct rb_unblock_callback *old) +unblock_function_clear(rb_thread_t *th) { - native_mutex_lock(&th->interrupt_lock); - th->unblock = *old; - native_mutex_unlock(&th->interrupt_lock); + rb_native_mutex_lock(&th->interrupt_lock); + th->unblock.func = 0; + rb_native_mutex_unlock(&th->interrupt_lock); } static void rb_threadptr_interrupt_common(rb_thread_t *th, int trap) { - native_mutex_lock(&th->interrupt_lock); - if (trap) - RUBY_VM_SET_TRAP_INTERRUPT(th); - else - RUBY_VM_SET_INTERRUPT(th); - if (th->unblock.func) { + rb_native_mutex_lock(&th->interrupt_lock); + + if (trap) { + RUBY_VM_SET_TRAP_INTERRUPT(th->ec); + } + else { + RUBY_VM_SET_INTERRUPT(th->ec); + } + if (th->unblock.func != NULL) { (th->unblock.func)(th->unblock.arg); } else { /* none */ } - native_cond_signal(&th->interrupt_cond); - native_mutex_unlock(&th->interrupt_lock); + rb_native_mutex_unlock(&th->interrupt_lock); } void @@ -428,75 +508,104 @@ rb_threadptr_interrupt(rb_thread_t *th) rb_threadptr_interrupt_common(th, 0); } -void -rb_threadptr_trap_interrupt(rb_thread_t *th) +static void +threadptr_trap_interrupt(rb_thread_t *th) { rb_threadptr_interrupt_common(th, 1); } static void -terminate_all(rb_vm_t *vm, const rb_thread_t *main_thread) +terminate_all(rb_ractor_t *r, const rb_thread_t *main_thread) { rb_thread_t *th = 0; - list_for_each(&vm->living_threads, th, vmlt_node) { - if (th != main_thread) { - thread_debug("terminate_i: %p\n", (void *)th); + list_for_each(&r->threads.set, th, lt_node) { + if (th != main_thread) { + thread_debug("terminate_all: begin (thid: %"PRI_THREAD_ID", status: %s)\n", + thread_id_str(th), thread_status_name(th, TRUE)); rb_threadptr_pending_interrupt_enque(th, eTerminateSignal); rb_threadptr_interrupt(th); + thread_debug("terminate_all: end (thid: %"PRI_THREAD_ID", status: %s)\n", + thread_id_str(th), thread_status_name(th, TRUE)); } else { - thread_debug("terminate_i: main thread (%p)\n", (void *)th); + thread_debug("terminate_all: main thread (%p)\n", (void *)th); } } } +static void +rb_threadptr_join_list_wakeup(rb_thread_t *thread) +{ + while (thread->join_list) { + struct rb_waiting_list *join_list = thread->join_list; + + // Consume the entry from the join list: + thread->join_list = join_list->next; + + rb_thread_t *target_thread = join_list->thread; + + if (target_thread->scheduler != Qnil && rb_fiberptr_blocking(join_list->fiber) == 0) { + rb_fiber_scheduler_unblock(target_thread->scheduler, target_thread->self, rb_fiberptr_self(join_list->fiber)); + } + else { + rb_threadptr_interrupt(target_thread); + + switch (target_thread->status) { + case THREAD_STOPPED: + case THREAD_STOPPED_FOREVER: + target_thread->status = THREAD_RUNNABLE; + default: + break; + } + } + } +} + void rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th) { - const char *err; - rb_mutex_t *mutex; - rb_mutex_t *mutexes = th->keeping_mutexes; + while (th->keeping_mutexes) { + rb_mutex_t *mutex = th->keeping_mutexes; + th->keeping_mutexes = mutex->next_mutex; - while (mutexes) { - mutex = mutexes; - /* rb_warn("mutex #<%p> remains to be locked by terminated thread", - mutexes); */ - mutexes = mutex->next_mutex; - err = rb_mutex_unlock_th(mutex, th); - if (err) rb_bug("invalid keeping_mutexes: %s", err); + /* rb_warn("mutex #<%p> remains to be locked by terminated thread", (void *)mutexes); */ + + const char *error_message = rb_mutex_unlock_th(mutex, th, mutex->fiber); + if (error_message) rb_bug("invalid keeping_mutexes: %s", error_message); } } void -rb_thread_terminate_all(void) +rb_thread_terminate_all(rb_thread_t *th) { - rb_thread_t *volatile th = GET_THREAD(); /* main thread */ - rb_vm_t *volatile vm = th->vm; + rb_ractor_t *cr = th->ractor; + rb_execution_context_t * volatile ec = th->ec; volatile int sleeping = 0; - if (vm->main_thread != th) { - rb_bug("rb_thread_terminate_all: called by child thread (%p, %p)", - (void *)vm->main_thread, (void *)th); + if (cr->threads.main != th) { + rb_bug("rb_thread_terminate_all: called by child thread (%p, %p)", + (void *)cr->threads.main, (void *)th); } /* unlock all locking mutexes */ rb_threadptr_unlock_all_locking_mutexes(th); - TH_PUSH_TAG(th); - if (TH_EXEC_TAG() == 0) { + EC_PUSH_TAG(ec); + if (EC_EXEC_TAG() == TAG_NONE) { retry: thread_debug("rb_thread_terminate_all (main thread: %p)\n", (void *)th); - terminate_all(vm, th); + terminate_all(cr, th); - while (vm_living_thread_num(vm) > 1) { - /* + while (rb_ractor_living_thread_num(cr) > 1) { + rb_hrtime_t rel = RB_HRTIME_PER_SEC; + /*q * Thread exiting routine in thread_start_func_2 notify * me when the last sub-thread exit. */ sleeping = 1; - native_sleep(th, 0); - RUBY_VM_CHECK_INTS_BLOCKING(th); + native_sleep(th, &rel); + RUBY_VM_CHECK_INTS_BLOCKING(ec); sleeping = 0; } } @@ -511,18 +620,21 @@ rb_thread_terminate_all(void) goto retry; } } - TH_POP_TAG(); + EC_POP_TAG(); } +void rb_threadptr_root_fiber_terminate(rb_thread_t *th); + static void thread_cleanup_func_before_exec(void *th_ptr) { rb_thread_t *th = th_ptr; th->status = THREAD_KILLED; - th->machine.stack_start = th->machine.stack_end = 0; -#ifdef __ia64 - th->machine.register_stack_start = th->machine.register_stack_end = 0; -#endif + + // The thread stack doesn't exist in the forked process: + th->ec->machine.stack_start = th->ec->machine.stack_end = NULL; + + rb_threadptr_root_fiber_terminate(th); } static void @@ -537,16 +649,20 @@ thread_cleanup_func(void *th_ptr, int atfork) * Unfortunately, we can't release native threading resource at fork * because libc may have unstable locking state therefore touching * a threading resource may cause a deadlock. + * + * FIXME: Skipping native_mutex_destroy(pthread_mutex_destroy) is safe + * with NPTL, but native_thread_destroy calls pthread_cond_destroy + * which calls free(3), so there is a small memory leak atfork, here. */ if (atfork) return; - native_mutex_destroy(&th->interrupt_lock); + rb_native_mutex_destroy(&th->interrupt_lock); native_thread_destroy(th); } static VALUE rb_threadptr_raise(rb_thread_t *, int, VALUE *); -static VALUE rb_thread_inspect(VALUE thread); +static VALUE rb_thread_to_s(VALUE thread); void ruby_thread_init_stack(rb_thread_t *th) @@ -567,157 +683,305 @@ rb_vm_proc_local_ep(VALUE proc) } } +// for ractor, defined in vm.c +VALUE rb_vm_invoke_proc_with_self(rb_execution_context_t *ec, rb_proc_t *proc, VALUE self, + int argc, const VALUE *argv, int kw_splat, VALUE passed_block_handler); + +static VALUE +thread_do_start_proc(rb_thread_t *th) +{ + VALUE args = th->invoke_arg.proc.args; + const VALUE *args_ptr; + int args_len; + VALUE procval = th->invoke_arg.proc.proc; + rb_proc_t *proc; + GetProcPtr(procval, proc); + + th->ec->errinfo = Qnil; + th->ec->root_lep = rb_vm_proc_local_ep(procval); + th->ec->root_svar = Qfalse; + + vm_check_ints_blocking(th->ec); + + if (th->invoke_type == thread_invoke_type_ractor_proc) { + VALUE self = rb_ractor_self(th->ractor); + VM_ASSERT(FIXNUM_P(args)); + args_len = FIX2INT(args); + args_ptr = ALLOCA_N(VALUE, args_len); + rb_ractor_receive_parameters(th->ec, th->ractor, args_len, (VALUE *)args_ptr); + vm_check_ints_blocking(th->ec); + + return rb_vm_invoke_proc_with_self( + th->ec, proc, self, + args_len, args_ptr, + th->invoke_arg.proc.kw_splat, + VM_BLOCK_HANDLER_NONE + ); + } + else { + args_len = RARRAY_LENINT(args); + if (args_len < 8) { + /* free proc.args if the length is enough small */ + args_ptr = ALLOCA_N(VALUE, args_len); + MEMCPY((VALUE *)args_ptr, RARRAY_CONST_PTR_TRANSIENT(args), VALUE, args_len); + th->invoke_arg.proc.args = Qnil; + } + else { + args_ptr = RARRAY_CONST_PTR(args); + } + + vm_check_ints_blocking(th->ec); + + return rb_vm_invoke_proc( + th->ec, proc, + args_len, args_ptr, + th->invoke_arg.proc.kw_splat, + VM_BLOCK_HANDLER_NONE + ); + } +} + static void -thread_do_start(rb_thread_t *th, VALUE args) +thread_do_start(rb_thread_t *th) { native_set_thread_name(th); - if (!th->first_func) { - rb_proc_t *proc; - GetProcPtr(th->first_proc, proc); - th->errinfo = Qnil; - th->root_lep = rb_vm_proc_local_ep(th->first_proc); - th->root_svar = Qfalse; - EXEC_EVENT_HOOK(th, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef); - th->value = rb_vm_invoke_proc(th, proc, - (int)RARRAY_LEN(args), RARRAY_CONST_PTR(args), - VM_BLOCK_HANDLER_NONE); - EXEC_EVENT_HOOK(th, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef); - } - else { - th->value = (*th->first_func)((void *)args); + VALUE result = Qundef; + + EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef); + + switch (th->invoke_type) { + case thread_invoke_type_proc: + result = thread_do_start_proc(th); + break; + + case thread_invoke_type_ractor_proc: + result = thread_do_start_proc(th); + rb_ractor_atexit(th->ec, result); + break; + + case thread_invoke_type_func: + result = (*th->invoke_arg.func.func)(th->invoke_arg.func.arg); + break; + + case thread_invoke_type_none: + rb_bug("unreachable"); } + + rb_fiber_scheduler_set(Qnil); + + th->value = result; + + EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef); } +void rb_ec_clear_current_thread_trace_func(const rb_execution_context_t *ec); + static int -thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_start) +thread_start_func_2(rb_thread_t *th, VALUE *stack_start) { - int state; - VALUE args = th->first_args; - rb_thread_list_t *join_list; - rb_thread_t *main_th; + STACK_GROW_DIR_DETECTION; + enum ruby_tag_type state; VALUE errinfo = Qnil; -# ifdef USE_SIGALTSTACK - void rb_register_sigaltstack(rb_thread_t *th); + size_t size = th->vm->default_params.thread_vm_stack_size / sizeof(VALUE); + rb_thread_t *ractor_main_th = th->ractor->threads.main; + VALUE * vm_stack = NULL; - rb_register_sigaltstack(th); -# endif - - if (th == th->vm->main_thread) - rb_bug("thread_start_func_2 must not be used for main thread"); + VM_ASSERT(th != th->vm->ractor.main_thread); + thread_debug("thread start: %p\n", (void *)th); + // setup native thread + gvl_acquire(rb_ractor_gvl(th->ractor), th); ruby_thread_set_native(th); - th->machine.stack_start = stack_start; -#ifdef __ia64 - th->machine.register_stack_start = register_stack_start; -#endif - thread_debug("thread start: %p\n", (void *)th); + // setup ractor + if (rb_ractor_status_p(th->ractor, ractor_blocking)) { + RB_VM_LOCK(); + { + rb_vm_ractor_blocking_cnt_dec(th->vm, th->ractor, __FILE__, __LINE__); + rb_ractor_t *r = th->ractor; + r->r_stdin = rb_io_prep_stdin(); + r->r_stdout = rb_io_prep_stdout(); + r->r_stderr = rb_io_prep_stderr(); + } + RB_VM_UNLOCK(); + } - gvl_acquire(th->vm, th); - { - thread_debug("thread start (get lock): %p\n", (void *)th); - rb_thread_set_current(th); + // This assertion is not passed on win32 env. Check it later. + // VM_ASSERT((size * sizeof(VALUE)) <= th->ec->machine.stack_maxsize); - TH_PUSH_TAG(th); - if ((state = EXEC_TAG()) == 0) { - SAVE_ROOT_JMPBUF(th, thread_do_start(th, args)); - } - else { - errinfo = th->errinfo; - if (state == TAG_FATAL) { - /* fatal error within this thread, need to stop whole script */ - } - else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) { - /* exit on main_thread. */ - } - else { - if (th->report_on_exception) { - VALUE mesg = rb_thread_inspect(th->self); - rb_str_cat_cstr(mesg, " terminated with exception:\n"); - rb_write_error_str(mesg); - rb_threadptr_error_print(th, errinfo); - } - if (th->vm->thread_abort_on_exception || - th->abort_on_exception || RTEST(ruby_debug)) { - /* exit on main_thread */ - } - else { - errinfo = Qnil; - } - } - th->value = Qnil; - } + // setup VM and machine stack + vm_stack = alloca(size * sizeof(VALUE)); + VM_ASSERT(vm_stack); - th->status = THREAD_KILLED; - thread_debug("thread end: %p\n", (void *)th); + rb_ec_initialize_vm_stack(th->ec, vm_stack, size); + th->ec->machine.stack_start = STACK_DIR_UPPER(vm_stack + size, vm_stack); + th->ec->machine.stack_maxsize -= size * sizeof(VALUE); - main_th = th->vm->main_thread; - if (main_th == th) { - ruby_stop(0); - } - if (RB_TYPE_P(errinfo, T_OBJECT)) { - /* treat with normal error object */ - rb_threadptr_raise(main_th, 1, &errinfo); - } - TH_POP_TAG(); + thread_debug("thread start (get lock): %p\n", (void *)th); - /* locking_mutex must be Qfalse */ - if (th->locking_mutex != Qfalse) { - rb_bug("thread_start_func_2: locking_mutex must not be set (%p:%"PRIxVALUE")", - (void *)th, th->locking_mutex); - } + // Ensure that we are not joinable. + VM_ASSERT(th->value == Qundef); - /* delete self other than main thread from living_threads */ - rb_vm_living_threads_remove(th->vm, th); - if (main_th->status == THREAD_KILLED && rb_thread_alone()) { - /* I'm last thread. wake up main thread from rb_thread_terminate_all */ - rb_threadptr_interrupt(main_th); - } + EC_PUSH_TAG(th->ec); - /* wake up joining threads */ - join_list = th->join_list; - while (join_list) { - rb_threadptr_interrupt(join_list->th); - switch (join_list->th->status) { - case THREAD_STOPPED: case THREAD_STOPPED_FOREVER: - join_list->th->status = THREAD_RUNNABLE; - default: break; - } - join_list = join_list->next; - } + if ((state = EC_EXEC_TAG()) == TAG_NONE) { + SAVE_ROOT_JMPBUF(th, thread_do_start(th)); + } + else { + errinfo = th->ec->errinfo; + + VALUE exc = rb_vm_make_jump_tag_but_local_jump(state, Qundef); + if (!NIL_P(exc)) errinfo = exc; + + if (state == TAG_FATAL) { + if (th->invoke_type == thread_invoke_type_ractor_proc) { + rb_ractor_atexit(th->ec, Qnil); + } + /* fatal error within this thread, need to stop whole script */ + } + else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) { + /* exit on main_thread. */ + } + else { + if (th->report_on_exception) { + VALUE mesg = rb_thread_to_s(th->self); + rb_str_cat_cstr(mesg, " terminated with exception (report_on_exception is true):\n"); + rb_write_error_str(mesg); + rb_ec_error_print(th->ec, errinfo); + } + + if (th->invoke_type == thread_invoke_type_ractor_proc) { + rb_ractor_atexit_exception(th->ec); + } + + if (th->vm->thread_abort_on_exception || + th->abort_on_exception || RTEST(ruby_debug)) { + /* exit on main_thread */ + } + else { + errinfo = Qnil; + } + } + th->value = Qnil; + } + + // The thread is effectively finished and can be joined. + VM_ASSERT(th->value != Qundef); + + rb_threadptr_join_list_wakeup(th); + rb_threadptr_unlock_all_locking_mutexes(th); - rb_threadptr_unlock_all_locking_mutexes(th); - rb_check_deadlock(th->vm); + if (th->invoke_type == thread_invoke_type_ractor_proc) { + rb_thread_terminate_all(th); + rb_ractor_teardown(th->ec); + } + + th->status = THREAD_KILLED; + thread_debug("thread end: %p\n", (void *)th); - rb_thread_recycle_stack_release(th->stack); - th->stack = NULL; + if (th->vm->ractor.main_thread == th) { + ruby_stop(0); } - native_mutex_lock(&th->vm->thread_destruct_lock); - /* make sure vm->running_thread never point me after this point.*/ - th->vm->running_thread = NULL; - native_mutex_unlock(&th->vm->thread_destruct_lock); + + if (RB_TYPE_P(errinfo, T_OBJECT)) { + /* treat with normal error object */ + rb_threadptr_raise(ractor_main_th, 1, &errinfo); + } + + EC_POP_TAG(); + + rb_ec_clear_current_thread_trace_func(th->ec); + + /* locking_mutex must be Qfalse */ + if (th->locking_mutex != Qfalse) { + rb_bug("thread_start_func_2: locking_mutex must not be set (%p:%"PRIxVALUE")", + (void *)th, th->locking_mutex); + } + + if (ractor_main_th->status == THREAD_KILLED && + th->ractor->threads.cnt <= 2 /* main thread and this thread */) { + /* I'm last thread. wake up main thread from rb_thread_terminate_all */ + rb_threadptr_interrupt(ractor_main_th); + } + + rb_check_deadlock(th->ractor); + + rb_fiber_close(th->ec->fiber_ptr); + thread_cleanup_func(th, FALSE); - gvl_release(th->vm); + VM_ASSERT(th->ec->vm_stack == NULL); + + if (th->invoke_type == thread_invoke_type_ractor_proc) { + // after rb_ractor_living_threads_remove() + // GC will happen anytime and this ractor can be collected (and destroy GVL). + // So gvl_release() should be before it. + gvl_release(rb_ractor_gvl(th->ractor)); + rb_ractor_living_threads_remove(th->ractor, th); + } + else { + rb_ractor_living_threads_remove(th->ractor, th); + gvl_release(rb_ractor_gvl(th->ractor)); + } return 0; } +struct thread_create_params { + enum thread_invoke_type type; + + // for normal proc thread + VALUE args; + VALUE proc; + + // for ractor + rb_ractor_t *g; + + // for func + VALUE (*fn)(void *); +}; + static VALUE -thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS)) +thread_create_core(VALUE thval, struct thread_create_params *params) { - rb_thread_t *th, *current_th = GET_THREAD(); + rb_execution_context_t *ec = GET_EC(); + rb_thread_t *th = rb_thread_ptr(thval), *current_th = rb_ec_thread_ptr(ec); int err; if (OBJ_FROZEN(current_th->thgroup)) { rb_raise(rb_eThreadError, "can't start a new thread (frozen ThreadGroup)"); } - GetThreadPtr(thval, th); - /* setup thread environment */ - th->first_func = fn; - th->first_proc = fn ? Qfalse : rb_block_proc(); - th->first_args = args; /* GC: shouldn't put before above line */ + switch (params->type) { + case thread_invoke_type_proc: + th->invoke_type = thread_invoke_type_proc; + th->invoke_arg.proc.args = params->args; + th->invoke_arg.proc.proc = params->proc; + th->invoke_arg.proc.kw_splat = rb_keyword_given_p(); + break; + + case thread_invoke_type_ractor_proc: +#if RACTOR_CHECK_MODE > 0 + rb_ractor_setup_belonging_to(thval, rb_ractor_id(params->g)); +#endif + th->invoke_type = thread_invoke_type_ractor_proc; + th->ractor = params->g; + th->ractor->threads.main = th; + th->invoke_arg.proc.proc = rb_proc_isolate_bang(params->proc); + th->invoke_arg.proc.args = INT2FIX(RARRAY_LENINT(params->args)); + th->invoke_arg.proc.kw_splat = rb_keyword_given_p(); + rb_ractor_send_parameters(ec, params->g, params->args); + break; + + case thread_invoke_type_func: + th->invoke_type = thread_invoke_type_func; + th->invoke_arg.func.func = params->fn; + th->invoke_arg.func.arg = (void *)params->args; + break; + + default: + rb_bug("unreachable"); + } th->priority = current_th->priority; th->thgroup = current_th->thgroup; @@ -727,23 +991,23 @@ thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS)) th->pending_interrupt_mask_stack = rb_ary_dup(current_th->pending_interrupt_mask_stack); RBASIC_CLEAR_CLASS(th->pending_interrupt_mask_stack); - th->interrupt_mask = 0; + rb_native_mutex_initialize(&th->interrupt_lock); + + RUBY_DEBUG_LOG("r:%u th:%p", rb_ractor_id(th->ractor), (void *)th); - native_mutex_initialize(&th->interrupt_lock); - native_cond_initialize(&th->interrupt_cond, RB_CONDATTR_CLOCK_MONOTONIC); - th->report_on_exception = th->vm->thread_report_on_exception; + rb_ractor_living_threads_insert(th->ractor, th); /* kick thread */ err = native_thread_create(th); if (err) { th->status = THREAD_KILLED; - rb_raise(rb_eThreadError, "can't create Thread: %s", strerror(err)); + rb_ractor_living_threads_remove(th->ractor, th); + rb_raise(rb_eThreadError, "can't create Thread: %s", strerror(err)); } - rb_vm_living_threads_insert(th->vm, th); return thval; } -#define threadptr_initialized(th) ((th)->first_args != 0) +#define threadptr_initialized(th) ((th)->invoke_type != thread_invoke_type_none) /* * call-seq: @@ -771,11 +1035,12 @@ thread_s_new(int argc, VALUE *argv, VALUE klass) rb_thread_t *th; VALUE thread = rb_thread_alloc(klass); - if (GET_VM()->main_thread->status == THREAD_KILLED) - rb_raise(rb_eThreadError, "can't alloc thread"); + if (GET_RACTOR()->threads.main->status == THREAD_KILLED) { + rb_raise(rb_eThreadError, "can't alloc thread"); + } - rb_obj_call_init(thread, argc, argv); - GetThreadPtr(thread, th); + rb_obj_call_init_kw(thread, argc, argv, RB_PASS_CALLED_KEYWORDS); + th = rb_thread_ptr(thread); if (!threadptr_initialized(th)) { rb_raise(rb_eThreadError, "uninitialized thread - check `%"PRIsVALUE"#initialize'", klass); @@ -796,145 +1061,218 @@ thread_s_new(int argc, VALUE *argv, VALUE klass) static VALUE thread_start(VALUE klass, VALUE args) { - return thread_create_core(rb_thread_alloc(klass), args, 0); + struct thread_create_params params = { + .type = thread_invoke_type_proc, + .args = args, + .proc = rb_block_proc(), + }; + return thread_create_core(rb_thread_alloc(klass), ¶ms); +} + +static VALUE +threadptr_invoke_proc_location(rb_thread_t *th) +{ + if (th->invoke_type == thread_invoke_type_proc) { + return rb_proc_location(th->invoke_arg.proc.proc); + } + else { + return Qnil; + } } /* :nodoc: */ static VALUE thread_initialize(VALUE thread, VALUE args) { - rb_thread_t *th; + rb_thread_t *th = rb_thread_ptr(thread); + if (!rb_block_given_p()) { - rb_raise(rb_eThreadError, "must be called with a block"); - } - GetThreadPtr(thread, th); - if (th->first_args) { - VALUE proc = th->first_proc, loc; - if (!proc || !RTEST(loc = rb_proc_location(proc))) { + rb_raise(rb_eThreadError, "must be called with a block"); + } + else if (th->invoke_type != thread_invoke_type_none) { + VALUE loc = threadptr_invoke_proc_location(th); + if (!NIL_P(loc)) { + rb_raise(rb_eThreadError, + "already initialized thread - %"PRIsVALUE":%"PRIsVALUE, + RARRAY_AREF(loc, 0), RARRAY_AREF(loc, 1)); + } + else { rb_raise(rb_eThreadError, "already initialized thread"); } - rb_raise(rb_eThreadError, - "already initialized thread - %"PRIsVALUE":%"PRIsVALUE, - RARRAY_AREF(loc, 0), RARRAY_AREF(loc, 1)); } - return thread_create_core(thread, args, 0); + else { + struct thread_create_params params = { + .type = thread_invoke_type_proc, + .args = args, + .proc = rb_block_proc(), + }; + return thread_create_core(thread, ¶ms); + } } VALUE -rb_thread_create(VALUE (*fn)(ANYARGS), void *arg) +rb_thread_create(VALUE (*fn)(void *), void *arg) { - return thread_create_core(rb_thread_alloc(rb_cThread), (VALUE)arg, fn); + struct thread_create_params params = { + .type = thread_invoke_type_func, + .fn = fn, + .args = (VALUE)arg, + }; + return thread_create_core(rb_thread_alloc(rb_cThread), ¶ms); } +VALUE +rb_thread_create_ractor(rb_ractor_t *g, VALUE args, VALUE proc) +{ + struct thread_create_params params = { + .type = thread_invoke_type_ractor_proc, + .g = g, + .args = args, + .proc = proc, + }; + return thread_create_core(rb_thread_alloc(rb_cThread), ¶ms); +} -/* +infty, for this purpose */ -#define DELAY_INFTY 1E30 struct join_arg { - rb_thread_t *target, *waiting; - double delay; + struct rb_waiting_list *waiter; + rb_thread_t *target; + VALUE timeout; + rb_hrtime_t *limit; }; static VALUE remove_from_join_list(VALUE arg) { struct join_arg *p = (struct join_arg *)arg; - rb_thread_t *target_th = p->target, *th = p->waiting; + rb_thread_t *target_thread = p->target; - if (target_th->status != THREAD_KILLED) { - rb_thread_list_t **p = &target_th->join_list; + if (target_thread->status != THREAD_KILLED) { + struct rb_waiting_list **join_list = &target_thread->join_list; - while (*p) { - if ((*p)->th == th) { - *p = (*p)->next; - break; - } - p = &(*p)->next; - } + while (*join_list) { + if (*join_list == p->waiter) { + *join_list = (*join_list)->next; + break; + } + + join_list = &(*join_list)->next; + } } return Qnil; } +static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double); + +static int +thread_finished(rb_thread_t *th) +{ + return th->status == THREAD_KILLED || th->value != Qundef; +} + static VALUE thread_join_sleep(VALUE arg) { struct join_arg *p = (struct join_arg *)arg; - rb_thread_t *target_th = p->target, *th = p->waiting; - const int forever = p->delay == DELAY_INFTY; - const double limit = forever ? 0 : timeofday() + p->delay; + rb_thread_t *target_th = p->target, *th = p->waiter->thread; + rb_hrtime_t end = 0, *limit = p->limit; - while (target_th->status != THREAD_KILLED) { - if (forever) { - sleep_forever(th, TRUE, FALSE); - } - else { - double now = timeofday(); - if (now > limit) { - thread_debug("thread_join: timeout (thid: %"PRI_THREAD_ID")\n", - thread_id_str(target_th)); - return Qfalse; - } - sleep_wait_for_interrupt(th, limit - now, 0); - } - thread_debug("thread_join: interrupted (thid: %"PRI_THREAD_ID")\n", - thread_id_str(target_th)); + if (limit) { + end = rb_hrtime_add(*limit, rb_hrtime_now()); + } + + while (!thread_finished(target_th)) { + VALUE scheduler = rb_fiber_scheduler_current(); + + if (scheduler != Qnil) { + rb_fiber_scheduler_block(scheduler, target_th->self, p->timeout); + } + else if (!limit) { + th->status = THREAD_STOPPED_FOREVER; + rb_ractor_sleeper_threads_inc(th->ractor); + rb_check_deadlock(th->ractor); + native_sleep(th, 0); + rb_ractor_sleeper_threads_dec(th->ractor); + } + else { + if (hrtime_update_expire(limit, end)) { + thread_debug("thread_join: timeout (thid: %"PRI_THREAD_ID")\n", + thread_id_str(target_th)); + return Qfalse; + } + th->status = THREAD_STOPPED; + native_sleep(th, limit); + } + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + th->status = THREAD_RUNNABLE; + thread_debug("thread_join: interrupted (thid: %"PRI_THREAD_ID", status: %s)\n", + thread_id_str(target_th), thread_status_name(target_th, TRUE)); } return Qtrue; } static VALUE -thread_join(rb_thread_t *target_th, double delay) +thread_join(rb_thread_t *target_th, VALUE timeout, rb_hrtime_t *limit) { - rb_thread_t *th = GET_THREAD(); - struct join_arg arg; + rb_execution_context_t *ec = GET_EC(); + rb_thread_t *th = ec->thread_ptr; + rb_fiber_t *fiber = ec->fiber_ptr; if (th == target_th) { - rb_raise(rb_eThreadError, "Target thread must not be current thread"); - } - if (GET_VM()->main_thread == target_th) { - rb_raise(rb_eThreadError, "Target thread must not be main thread"); + rb_raise(rb_eThreadError, "Target thread must not be current thread"); } - arg.target = target_th; - arg.waiting = th; - arg.delay = delay; + if (th->ractor->threads.main == target_th) { + rb_raise(rb_eThreadError, "Target thread must not be main thread"); + } - thread_debug("thread_join (thid: %"PRI_THREAD_ID")\n", thread_id_str(target_th)); + thread_debug("thread_join (thid: %"PRI_THREAD_ID", status: %s)\n", + thread_id_str(target_th), thread_status_name(target_th, TRUE)); if (target_th->status != THREAD_KILLED) { - rb_thread_list_t list; - list.next = target_th->join_list; - list.th = th; - target_th->join_list = &list; - if (!rb_ensure(thread_join_sleep, (VALUE)&arg, - remove_from_join_list, (VALUE)&arg)) { - return Qnil; - } + struct rb_waiting_list waiter; + waiter.next = target_th->join_list; + waiter.thread = th; + waiter.fiber = fiber; + target_th->join_list = &waiter; + + struct join_arg arg; + arg.waiter = &waiter; + arg.target = target_th; + arg.timeout = timeout; + arg.limit = limit; + + if (!rb_ensure(thread_join_sleep, (VALUE)&arg, remove_from_join_list, (VALUE)&arg)) { + return Qnil; + } } - thread_debug("thread_join: success (thid: %"PRI_THREAD_ID")\n", - thread_id_str(target_th)); + thread_debug("thread_join: success (thid: %"PRI_THREAD_ID", status: %s)\n", + thread_id_str(target_th), thread_status_name(target_th, TRUE)); - if (target_th->errinfo != Qnil) { - VALUE err = target_th->errinfo; + if (target_th->ec->errinfo != Qnil) { + VALUE err = target_th->ec->errinfo; - if (FIXNUM_P(err)) { - switch (err) { - case INT2FIX(TAG_FATAL): - /* OK. killed. */ - break; - default: - rb_bug("thread_join: Fixnum (%d) should not reach here.", FIX2INT(err)); - } - } - else if (THROW_DATA_P(target_th->errinfo)) { - rb_bug("thread_join: THROW_DATA should not reach here."); - } - else { - /* normal exception */ - rb_exc_raise(err); - } + if (FIXNUM_P(err)) { + switch (err) { + case INT2FIX(TAG_FATAL): + thread_debug("thread_join: terminated (thid: %"PRI_THREAD_ID", status: %s)\n", + thread_id_str(target_th), thread_status_name(target_th, TRUE)); + + /* OK. killed. */ + break; + default: + rb_bug("thread_join: Fixnum (%d) should not reach here.", FIX2INT(err)); + } + } + else if (THROW_DATA_P(target_th->ec->errinfo)) { + rb_bug("thread_join: THROW_DATA should not reach here."); + } + else { + /* normal exception */ + rb_exc_raise(err); + } } return target_th->self; } @@ -981,18 +1319,30 @@ thread_join(rb_thread_t *target_th, double delay) static VALUE thread_join_m(int argc, VALUE *argv, VALUE self) { - rb_thread_t *target_th; - double delay = DELAY_INFTY; - VALUE limit; + VALUE timeout = Qnil; + rb_hrtime_t rel = 0, *limit = 0; - GetThreadPtr(self, target_th); + if (rb_check_arity(argc, 0, 1)) { + timeout = argv[0]; + } - rb_scan_args(argc, argv, "01", &limit); - if (!NIL_P(limit)) { - delay = rb_num2dbl(limit); + // Convert the timeout eagerly, so it's always converted and deterministic + /* + * This supports INFINITY and negative values, so we can't use + * rb_time_interval right now... + */ + if (NIL_P(timeout)) { + /* unlimited */ + } + else if (FIXNUM_P(timeout)) { + rel = rb_sec2hrtime(NUM2TIMET(timeout)); + limit = &rel; + } + else { + limit = double2hrtime(&rel, rb_num2dbl(timeout)); } - return thread_join(target_th, delay); + return thread_join(rb_thread_ptr(self), timeout, limit); } /* @@ -1012,9 +1362,12 @@ thread_join_m(int argc, VALUE *argv, VALUE self) static VALUE thread_value(VALUE self) { - rb_thread_t *th; - GetThreadPtr(self, th); - thread_join(th, DELAY_INFTY); + rb_thread_t *th = rb_thread_ptr(self); + thread_join(th, Qnil, 0); + if (th->value == Qundef) { + // If the thread is dead because we forked th->value is still Qundef. + return Qnil; + } return th->value; } @@ -1023,186 +1376,190 @@ thread_value(VALUE self) */ /* - * The type of tv_sec in struct timeval is time_t in POSIX. - * But several systems violate POSIX. - * - * OpenBSD 5.2 (amd64): - * time_t: int (signed 32bit integer) - * tv_sec: long (signed 64bit integer) - * - * MinGW-w64 (x64): - * time_t: long long (signed 64bit integer) - * tv_sec: long (signed 32bit integer) + * Back when we used "struct timeval", not all platforms implemented + * tv_sec as time_t. Nowadays we use "struct timespec" and tv_sec + * seems to be implemented more consistently across platforms. + * At least other parts of our code hasn't had to deal with non-time_t + * tv_sec in timespec... */ - -#if SIGNEDNESS_OF_TIME_T < 0 /* signed */ -# define TIMEVAL_SEC_MAX SIGNED_INTEGER_MAX(TYPEOF_TIMEVAL_TV_SEC) -# define TIMEVAL_SEC_MIN SIGNED_INTEGER_MIN(TYPEOF_TIMEVAL_TV_SEC) -#elif SIGNEDNESS_OF_TIME_T > 0 /* unsigned */ -# define TIMEVAL_SEC_MAX ((TYPEOF_TIMEVAL_TV_SEC)(~(unsigned_time_t)0)) -# define TIMEVAL_SEC_MIN ((TYPEOF_TIMEVAL_TV_SEC)0) +#define TIMESPEC_SEC_MAX TIMET_MAX +#define TIMESPEC_SEC_MIN TIMET_MIN + +COMPILER_WARNING_PUSH +#if __has_warning("-Wimplicit-int-float-conversion") +COMPILER_WARNING_IGNORED(-Wimplicit-int-float-conversion) +#elif defined(_MSC_VER) +/* C4305: 'initializing': truncation from '__int64' to 'const double' */ +COMPILER_WARNING_IGNORED(4305) #endif +static const double TIMESPEC_SEC_MAX_as_double = TIMESPEC_SEC_MAX; +COMPILER_WARNING_POP -static struct timeval -double2timeval(double d) +static rb_hrtime_t * +double2hrtime(rb_hrtime_t *hrt, double d) { - /* assume timeval.tv_sec has same signedness as time_t */ - const double TIMEVAL_SEC_MAX_PLUS_ONE = (2*(double)(TIMEVAL_SEC_MAX/2+1)); - - struct timeval time; + /* assume timespec.tv_sec has same signedness as time_t */ + const double TIMESPEC_SEC_MAX_PLUS_ONE = 2.0 * (TIMESPEC_SEC_MAX_as_double / 2.0 + 1.0); - if (TIMEVAL_SEC_MAX_PLUS_ONE <= d) { - time.tv_sec = TIMEVAL_SEC_MAX; - time.tv_usec = 999999; + if (TIMESPEC_SEC_MAX_PLUS_ONE <= d) { + return NULL; } - else if (d <= TIMEVAL_SEC_MIN) { - time.tv_sec = TIMEVAL_SEC_MIN; - time.tv_usec = 0; + else if (d <= 0) { + *hrt = 0; } else { - time.tv_sec = (TYPEOF_TIMEVAL_TV_SEC)d; - time.tv_usec = (int)((d - (time_t)d) * 1e6); - if (time.tv_usec < 0) { - time.tv_usec += (int)1e6; - time.tv_sec -= 1; - } + *hrt = (rb_hrtime_t)(d * (double)RB_HRTIME_PER_SEC); } - return time; + return hrt; } static void -sleep_forever(rb_thread_t *th, int deadlockable, int spurious_check) +getclockofday(struct timespec *ts) +{ +#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) + if (clock_gettime(CLOCK_MONOTONIC, ts) == 0) + return; +#endif + rb_timespec_now(ts); +} + +/* + * Don't inline this, since library call is already time consuming + * and we don't want "struct timespec" on stack too long for GC + */ +NOINLINE(rb_hrtime_t rb_hrtime_now(void)); +rb_hrtime_t +rb_hrtime_now(void) +{ + struct timespec ts; + + getclockofday(&ts); + return rb_timespec2hrtime(&ts); +} + +static void +sleep_forever(rb_thread_t *th, unsigned int fl) { enum rb_thread_status prev_status = th->status; - enum rb_thread_status status = deadlockable ? THREAD_STOPPED_FOREVER : THREAD_STOPPED; + enum rb_thread_status status; + int woke; + status = fl & SLEEP_DEADLOCKABLE ? THREAD_STOPPED_FOREVER : THREAD_STOPPED; th->status = status; - RUBY_VM_CHECK_INTS_BLOCKING(th); + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); while (th->status == status) { - if (deadlockable) { - th->vm->sleeper++; - rb_check_deadlock(th->vm); + if (fl & SLEEP_DEADLOCKABLE) { + rb_ractor_sleeper_threads_inc(th->ractor); + rb_check_deadlock(th->ractor); } native_sleep(th, 0); - if (deadlockable) { - th->vm->sleeper--; + if (fl & SLEEP_DEADLOCKABLE) { + rb_ractor_sleeper_threads_dec(th->ractor); } - RUBY_VM_CHECK_INTS_BLOCKING(th); - if (!spurious_check) + woke = vm_check_ints_blocking(th->ec); + if (woke && !(fl & SLEEP_SPURIOUS_CHECK)) break; } th->status = prev_status; } -static void -getclockofday(struct timeval *tp) +/* + * at least gcc 7.2 and 7.3 complains about "rb_hrtime_t end" + * being uninitialized, maybe other versions, too. + */ +COMPILER_WARNING_PUSH +#if defined(__GNUC__) && __GNUC__ == 7 && __GNUC_MINOR__ <= 3 +COMPILER_WARNING_IGNORED(-Wmaybe-uninitialized) +#endif +#ifndef PRIu64 +#define PRIu64 PRI_64_PREFIX "u" +#endif +/* + * @end is the absolute time when @ts is set to expire + * Returns true if @end has past + * Updates @ts and returns false otherwise + */ +static int +hrtime_update_expire(rb_hrtime_t *timeout, const rb_hrtime_t end) { -#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) - struct timespec ts; + rb_hrtime_t now = rb_hrtime_now(); - if (clock_gettime(CLOCK_MONOTONIC, &ts) == 0) { - tp->tv_sec = ts.tv_sec; - tp->tv_usec = (int)(ts.tv_nsec / 1000); - } - else -#endif - { - gettimeofday(tp, NULL); - } + if (now > end) return 1; + thread_debug("hrtime_update_expire: " + "%"PRIu64" > %"PRIu64"\n", + (uint64_t)end, (uint64_t)now); + *timeout = end - now; + return 0; } +COMPILER_WARNING_POP -static void -sleep_timeval(rb_thread_t *th, struct timeval tv, int spurious_check) +static int +sleep_hrtime(rb_thread_t *th, rb_hrtime_t rel, unsigned int fl) { - struct timeval to, tvn; enum rb_thread_status prev_status = th->status; - - getclockofday(&to); - if (TIMEVAL_SEC_MAX - tv.tv_sec < to.tv_sec) - to.tv_sec = TIMEVAL_SEC_MAX; - else - to.tv_sec += tv.tv_sec; - if ((to.tv_usec += tv.tv_usec) >= 1000000) { - if (to.tv_sec == TIMEVAL_SEC_MAX) - to.tv_usec = 999999; - else { - to.tv_sec++; - to.tv_usec -= 1000000; - } - } + int woke; + rb_hrtime_t end = rb_hrtime_add(rb_hrtime_now(), rel); th->status = THREAD_STOPPED; - RUBY_VM_CHECK_INTS_BLOCKING(th); + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); while (th->status == THREAD_STOPPED) { - native_sleep(th, &tv); - RUBY_VM_CHECK_INTS_BLOCKING(th); - getclockofday(&tvn); - if (to.tv_sec < tvn.tv_sec) break; - if (to.tv_sec == tvn.tv_sec && to.tv_usec <= tvn.tv_usec) break; - thread_debug("sleep_timeval: %"PRI_TIMET_PREFIX"d.%.6ld > %"PRI_TIMET_PREFIX"d.%.6ld\n", - (time_t)to.tv_sec, (long)to.tv_usec, - (time_t)tvn.tv_sec, (long)tvn.tv_usec); - tv.tv_sec = to.tv_sec - tvn.tv_sec; - if ((tv.tv_usec = to.tv_usec - tvn.tv_usec) < 0) { - --tv.tv_sec; - tv.tv_usec += 1000000; - } - if (!spurious_check) + native_sleep(th, &rel); + woke = vm_check_ints_blocking(th->ec); + if (woke && !(fl & SLEEP_SPURIOUS_CHECK)) + break; + if (hrtime_update_expire(&rel, end)) break; + woke = 1; } th->status = prev_status; + return woke; } void rb_thread_sleep_forever(void) { thread_debug("rb_thread_sleep_forever\n"); - sleep_forever(GET_THREAD(), FALSE, TRUE); + sleep_forever(GET_THREAD(), SLEEP_SPURIOUS_CHECK); } void rb_thread_sleep_deadly(void) { thread_debug("rb_thread_sleep_deadly\n"); - sleep_forever(GET_THREAD(), TRUE, TRUE); + sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE|SLEEP_SPURIOUS_CHECK); } -static void -rb_thread_sleep_deadly_allow_spurious_wakeup(void) -{ - thread_debug("rb_thread_sleep_deadly_allow_spurious_wakeup\n"); - sleep_forever(GET_THREAD(), TRUE, FALSE); -} - -static double -timeofday(void) +void +rb_thread_sleep_interruptible(void) { -#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) - struct timespec tp; + rb_thread_t *th = GET_THREAD(); + enum rb_thread_status prev_status = th->status; - if (clock_gettime(CLOCK_MONOTONIC, &tp) == 0) { - return (double)tp.tv_sec + (double)tp.tv_nsec * 1e-9; - } - else -#endif - { - struct timeval tv; - gettimeofday(&tv, NULL); - return (double)tv.tv_sec + (double)tv.tv_usec * 1e-6; - } + th->status = THREAD_STOPPED; + native_sleep(th, 0); + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + th->status = prev_status; } static void -sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec, int spurious_check) +rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker) { - sleep_timeval(th, double2timeval(sleepsec), spurious_check); + VALUE scheduler = rb_fiber_scheduler_current(); + if (scheduler != Qnil) { + rb_fiber_scheduler_block(scheduler, blocker, Qnil); + } + else { + thread_debug("rb_thread_sleep_deadly_allow_spurious_wakeup\n"); + sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE); + } } void rb_thread_wait_for(struct timeval time) { rb_thread_t *th = GET_THREAD(); - sleep_timeval(th, time, 1); + + sleep_hrtime(th, rb_timeval2hrtime(&time), SLEEP_SPURIOUS_CHECK); } /* @@ -1215,8 +1572,7 @@ rb_thread_wait_for(struct timeval time) void rb_thread_check_ints(void) { - rb_thread_t *th = GET_THREAD(); - RUBY_VM_CHECK_INTS_BLOCKING(th); + RUBY_VM_CHECK_INTS_BLOCKING(GET_EC()); } /* @@ -1233,9 +1589,7 @@ rb_thread_check_trap_pending(void) int rb_thread_interrupted(VALUE thval) { - rb_thread_t *th; - GetThreadPtr(thval, th); - return (int)RUBY_VM_INTERRUPTED(th); + return (int)RUBY_VM_INTERRUPTED(rb_thread_ptr(thval)->ec); } void @@ -1245,7 +1599,7 @@ rb_thread_sleep(int sec) } static void -rb_thread_schedule_limits(unsigned long limits_us) +rb_thread_schedule_limits(uint32_t limits_us) { thread_debug("rb_thread_schedule\n"); if (!rb_thread_alone()) { @@ -1254,8 +1608,8 @@ rb_thread_schedule_limits(unsigned long limits_us) if (th->running_time_us >= limits_us) { thread_debug("rb_thread_schedule/switch start\n"); RB_GC_SAVE_MACHINE_CONTEXT(th); - gvl_yield(th->vm, th); - rb_thread_set_current(th); + gvl_yield(rb_ractor_gvl(th->ractor), th); + rb_ractor_thread_switch(th->ractor, th); thread_debug("rb_thread_schedule/switch done\n"); } } @@ -1264,10 +1618,8 @@ rb_thread_schedule_limits(unsigned long limits_us) void rb_thread_schedule(void) { - rb_thread_t *cur_th = GET_THREAD(); rb_thread_schedule_limits(0); - - RUBY_VM_CHECK_INTS(cur_th); + RUBY_VM_CHECK_INTS(GET_EC()); } /* blocking region */ @@ -1277,12 +1629,13 @@ blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region, rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted) { region->prev_status = th->status; - if (set_unblock_function(th, ubf, arg, ®ion->oldubf, fail_if_interrupted)) { + if (unblock_function_set(th, ubf, arg, fail_if_interrupted)) { th->blocking_region_buffer = region; th->status = THREAD_STOPPED; + rb_ractor_blocking_threads_inc(th->ractor, __FILE__, __LINE__); thread_debug("enter blocking region (%p)\n", (void *)th); RB_GC_SAVE_MACHINE_CONTEXT(th); - gvl_release(th->vm); + gvl_release(rb_ractor_gvl(th->ractor)); return TRUE; } else { @@ -1293,38 +1646,61 @@ blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region, static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region) { - gvl_acquire(th->vm, th); - rb_thread_set_current(th); - thread_debug("leave blocking region (%p)\n", (void *)th); + /* entry to ubf_list still permitted at this point, make it impossible: */ + unblock_function_clear(th); + /* entry to ubf_list impossible at this point, so unregister is safe: */ unregister_ubf_list(th); + + gvl_acquire(rb_ractor_gvl(th->ractor), th); + rb_ractor_thread_switch(th->ractor, th); + + thread_debug("leave blocking region (%p)\n", (void *)th); th->blocking_region_buffer = 0; - reset_unblock_function(th, ®ion->oldubf); + rb_ractor_blocking_threads_dec(th->ractor, __FILE__, __LINE__); if (th->status == THREAD_STOPPED) { th->status = region->prev_status; } } -static void * -call_without_gvl(void *(*func)(void *), void *data1, - rb_unblock_function_t *ubf, void *data2, int fail_if_interrupted) +void * +rb_nogvl(void *(*func)(void *), void *data1, + rb_unblock_function_t *ubf, void *data2, + int flags) { void *val = 0; - - rb_thread_t *th = GET_THREAD(); + rb_execution_context_t *ec = GET_EC(); + rb_thread_t *th = rb_ec_thread_ptr(ec); + rb_vm_t *vm = rb_ec_vm_ptr(ec); + bool is_main_thread = vm->ractor.main_thread == th; int saved_errno = 0; + VALUE ubf_th = Qfalse; - if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) { + if ((ubf == RUBY_UBF_IO) || (ubf == RUBY_UBF_PROCESS)) { ubf = ubf_select; data2 = th; } + else if (ubf && rb_ractor_living_thread_num(th->ractor) == 1 && is_main_thread) { + if (flags & RB_NOGVL_UBF_ASYNC_SAFE) { + vm->ubf_async_safe = 1; + } + else { + ubf_th = rb_thread_start_unblock_thread(); + } + } - BLOCKING_REGION({ + BLOCKING_REGION(th, { val = func(data1); saved_errno = errno; - }, ubf, data2, fail_if_interrupted); + }, ubf, data2, flags & RB_NOGVL_INTR_FAIL); + + if (is_main_thread) vm->ubf_async_safe = 0; - if (!fail_if_interrupted) { - RUBY_VM_CHECK_INTS_BLOCKING(th); + if ((flags & RB_NOGVL_INTR_FAIL) == 0) { + RUBY_VM_CHECK_INTS_BLOCKING(ec); + } + + if (ubf_th != Qfalse) { + thread_value(rb_thread_kill(ubf_th)); } errno = saved_errno; @@ -1421,47 +1797,59 @@ void * rb_thread_call_without_gvl2(void *(*func)(void *), void *data1, rb_unblock_function_t *ubf, void *data2) { - return call_without_gvl(func, data1, ubf, data2, TRUE); + return rb_nogvl(func, data1, ubf, data2, RB_NOGVL_INTR_FAIL); } void * rb_thread_call_without_gvl(void *(*func)(void *data), void *data1, rb_unblock_function_t *ubf, void *data2) { - return call_without_gvl(func, data1, ubf, data2, FALSE); + return rb_nogvl(func, data1, ubf, data2, 0); } VALUE rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd) { volatile VALUE val = Qundef; /* shouldn't be used */ - rb_vm_t *vm = GET_VM(); - rb_thread_t *th = GET_THREAD(); + rb_execution_context_t * volatile ec = GET_EC(); volatile int saved_errno = 0; - int state; - struct waiting_fd wfd; + enum ruby_tag_type state; - wfd.fd = fd; - wfd.th = th; - list_add(&vm->waiting_fds, &wfd.wfd_node); + struct waiting_fd waiting_fd = { + .fd = fd, + .th = rb_ec_thread_ptr(ec) + }; - TH_PUSH_TAG(th); - if ((state = EXEC_TAG()) == 0) { - BLOCKING_REGION({ - val = func(data1); - saved_errno = errno; - }, ubf_select, th, FALSE); + RB_VM_LOCK_ENTER(); + { + list_add(&rb_ec_vm_ptr(ec)->waiting_fds, &waiting_fd.wfd_node); + } + RB_VM_LOCK_LEAVE(); + + EC_PUSH_TAG(ec); + if ((state = EC_EXEC_TAG()) == TAG_NONE) { + BLOCKING_REGION(waiting_fd.th, { + val = func(data1); + saved_errno = errno; + }, ubf_select, waiting_fd.th, FALSE); } - TH_POP_TAG(); + EC_POP_TAG(); - /* must be deleted before jump */ - list_del(&wfd.wfd_node); + /* + * must be deleted before jump + * this will delete either from waiting_fds or on-stack LIST_HEAD(busy) + */ + RB_VM_LOCK_ENTER(); + { + list_del(&waiting_fd.wfd_node); + } + RB_VM_LOCK_LEAVE(); if (state) { - TH_JUMP_TAG(th, state); + EC_JUMP_TAG(ec, state); } /* TODO: check func() */ - RUBY_VM_CHECK_INTS_BLOCKING(th); + RUBY_VM_CHECK_INTS_BLOCKING(ec); errno = saved_errno; @@ -1509,7 +1897,7 @@ rb_thread_call_with_gvl(void *(*func)(void *), void *data1) * because this thread is not Ruby's thread. * What should we do? */ - + bp(); fprintf(stderr, "[BUG] rb_thread_call_with_gvl() is called by non-ruby thread\n"); exit(EXIT_FAILURE); } @@ -1525,7 +1913,8 @@ rb_thread_call_with_gvl(void *(*func)(void *), void *data1) /* enter to Ruby world: You can access Ruby values, methods and so on. */ r = (*func)(data1); /* leave from Ruby world: You can not access Ruby values, etc. */ - blocking_region_begin(th, brb, prev_unblock.func, prev_unblock.arg, FALSE); + int released = blocking_region_begin(th, brb, prev_unblock.func, prev_unblock.arg, FALSE); + RUBY_ASSERT_ALWAYS(released); return r; } @@ -1618,19 +2007,23 @@ rb_threadptr_pending_interrupt_check_mask(rb_thread_t *th, VALUE err) VALUE mask; long mask_stack_len = RARRAY_LEN(th->pending_interrupt_mask_stack); const VALUE *mask_stack = RARRAY_CONST_PTR(th->pending_interrupt_mask_stack); - VALUE ancestors = rb_mod_ancestors(err); /* TODO: GC guard */ - long ancestors_len = RARRAY_LEN(ancestors); - const VALUE *ancestors_ptr = RARRAY_CONST_PTR(ancestors); - int i, j; + VALUE mod; + long i; for (i=0; i<mask_stack_len; i++) { mask = mask_stack[mask_stack_len-(i+1)]; - for (j=0; j<ancestors_len; j++) { - VALUE klass = ancestors_ptr[j]; + for (mod = err; mod; mod = RCLASS_SUPER(mod)) { + VALUE klass = mod; VALUE sym; - /* TODO: remove rb_intern() */ + if (BUILTIN_TYPE(mod) == T_ICLASS) { + klass = RBASIC(mod)->klass; + } + else if (mod != RCLASS_ORIGIN(mod)) { + continue; + } + if ((sym = rb_hash_aref(mask, klass)) != Qnil) { if (sym == sym_immediate) { return INTERRUPT_IMMEDIATE; @@ -1652,7 +2045,7 @@ rb_threadptr_pending_interrupt_check_mask(rb_thread_t *th, VALUE err) } static int -rb_threadptr_pending_interrupt_empty_p(rb_thread_t *th) +rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th) { return RARRAY_LEN(th->pending_interrupt_queue) == 0; } @@ -1707,8 +2100,8 @@ rb_threadptr_pending_interrupt_deque(rb_thread_t *th, enum handle_interrupt_timi #endif } -int -rb_threadptr_pending_interrupt_active_p(rb_thread_t *th) +static int +threadptr_pending_interrupt_active_p(rb_thread_t *th) { /* * For optimization, we don't check async errinfo queue @@ -1848,21 +2241,24 @@ handle_interrupt_arg_check_i(VALUE key, VALUE val, VALUE args) * # all exceptions inherited from Exception are prohibited. * } * + * For handling all interrupts, use +Object+ and not +Exception+ + * as the ExceptionClass, as kill/terminate interrupts are not handled by +Exception+. */ static VALUE rb_thread_s_handle_interrupt(VALUE self, VALUE mask_arg) { VALUE mask; - rb_thread_t *th = GET_THREAD(); + rb_execution_context_t * volatile ec = GET_EC(); + rb_thread_t * volatile th = rb_ec_thread_ptr(ec); volatile VALUE r = Qnil; - int state; + enum ruby_tag_type state; if (!rb_block_given_p()) { rb_raise(rb_eArgError, "block is needed."); } mask = 0; - mask_arg = rb_convert_type(mask_arg, T_HASH, "Hash", "to_hash"); + mask_arg = rb_to_hash_type(mask_arg); rb_hash_foreach(mask_arg, handle_interrupt_arg_check_i, (VALUE)&mask); if (!mask) { return rb_yield(Qnil); @@ -1871,25 +2267,25 @@ rb_thread_s_handle_interrupt(VALUE self, VALUE mask_arg) rb_ary_push(th->pending_interrupt_mask_stack, mask); if (!rb_threadptr_pending_interrupt_empty_p(th)) { th->pending_interrupt_queue_checked = 0; - RUBY_VM_SET_INTERRUPT(th); + RUBY_VM_SET_INTERRUPT(th->ec); } - TH_PUSH_TAG(th); - if ((state = EXEC_TAG()) == 0) { + EC_PUSH_TAG(th->ec); + if ((state = EC_EXEC_TAG()) == TAG_NONE) { r = rb_yield(Qnil); } - TH_POP_TAG(); + EC_POP_TAG(); rb_ary_pop(th->pending_interrupt_mask_stack); if (!rb_threadptr_pending_interrupt_empty_p(th)) { th->pending_interrupt_queue_checked = 0; - RUBY_VM_SET_INTERRUPT(th); + RUBY_VM_SET_INTERRUPT(th->ec); } - RUBY_VM_CHECK_INTS(th); + RUBY_VM_CHECK_INTS(th->ec); if (state) { - TH_JUMP_TAG(th, state); + EC_JUMP_TAG(th->ec, state); } return r; @@ -1908,9 +2304,7 @@ rb_thread_s_handle_interrupt(VALUE self, VALUE mask_arg) static VALUE rb_thread_pending_interrupt_p(int argc, VALUE *argv, VALUE target_thread) { - rb_thread_t *target_th; - - GetThreadPtr(target_thread, target_th); + rb_thread_t *target_th = rb_thread_ptr(target_thread); if (!target_th->pending_interrupt_queue) { return Qfalse; @@ -1918,20 +2312,14 @@ rb_thread_pending_interrupt_p(int argc, VALUE *argv, VALUE target_thread) if (rb_threadptr_pending_interrupt_empty_p(target_th)) { return Qfalse; } + if (rb_check_arity(argc, 0, 1)) { + VALUE err = argv[0]; + if (!rb_obj_is_kind_of(err, rb_cModule)) { + rb_raise(rb_eTypeError, "class or module required for rescue clause"); + } + return RBOOL(rb_threadptr_pending_interrupt_include_p(target_th, err)); + } else { - if (argc == 1) { - VALUE err; - rb_scan_args(argc, argv, "01", &err); - if (!rb_obj_is_kind_of(err, rb_cModule)) { - rb_raise(rb_eTypeError, "class or module required for rescue clause"); - } - if (rb_threadptr_pending_interrupt_include_p(target_th, err)) { - return Qtrue; - } - else { - return Qfalse; - } - } return Qtrue; } } @@ -1999,66 +2387,86 @@ rb_thread_s_pending_interrupt_p(int argc, VALUE *argv, VALUE self) return rb_thread_pending_interrupt_p(argc, argv, GET_THREAD()->self); } +NORETURN(static void rb_threadptr_to_kill(rb_thread_t *th)); + static void rb_threadptr_to_kill(rb_thread_t *th) { rb_threadptr_pending_interrupt_clear(th); th->status = THREAD_RUNNABLE; th->to_kill = 1; - th->errinfo = INT2FIX(TAG_FATAL); - TH_JUMP_TAG(th, TAG_FATAL); + th->ec->errinfo = INT2FIX(TAG_FATAL); + EC_JUMP_TAG(th->ec, TAG_FATAL); } static inline rb_atomic_t threadptr_get_interrupts(rb_thread_t *th) { + rb_execution_context_t *ec = th->ec; rb_atomic_t interrupt; rb_atomic_t old; do { - interrupt = th->interrupt_flag; - old = ATOMIC_CAS(th->interrupt_flag, interrupt, interrupt & th->interrupt_mask); + interrupt = ec->interrupt_flag; + old = ATOMIC_CAS(ec->interrupt_flag, interrupt, interrupt & ec->interrupt_mask); } while (old != interrupt); - return interrupt & (rb_atomic_t)~th->interrupt_mask; + return interrupt & (rb_atomic_t)~ec->interrupt_mask; } -void +MJIT_FUNC_EXPORTED int rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing) { rb_atomic_t interrupt; int postponed_job_interrupt = 0; + int ret = FALSE; - if (th->raised_flag) return; + if (th->ec->raised_flag) return ret; while ((interrupt = threadptr_get_interrupts(th)) != 0) { int sig; int timer_interrupt; int pending_interrupt; int trap_interrupt; + int terminate_interrupt; timer_interrupt = interrupt & TIMER_INTERRUPT_MASK; pending_interrupt = interrupt & PENDING_INTERRUPT_MASK; postponed_job_interrupt = interrupt & POSTPONED_JOB_INTERRUPT_MASK; trap_interrupt = interrupt & TRAP_INTERRUPT_MASK; + terminate_interrupt = interrupt & TERMINATE_INTERRUPT_MASK; // request from other ractors + + if (interrupt & VM_BARRIER_INTERRUPT_MASK) { + RB_VM_LOCK_ENTER(); + RB_VM_LOCK_LEAVE(); + } if (postponed_job_interrupt) { rb_postponed_job_flush(th->vm); } /* signal handling */ - if (trap_interrupt && (th == th->vm->main_thread)) { + if (trap_interrupt && (th == th->vm->ractor.main_thread)) { enum rb_thread_status prev_status = th->status; + int sigwait_fd = rb_sigwait_fd_get(th); + + if (sigwait_fd >= 0) { + (void)consume_communication_pipe(sigwait_fd); + ruby_sigchld_handler(th->vm); + rb_sigwait_fd_put(th, sigwait_fd); + rb_sigwait_fd_migrate(th->vm); + } th->status = THREAD_RUNNABLE; while ((sig = rb_get_next_signal()) != 0) { - rb_signal_exec(th, sig); + ret |= rb_signal_exec(th, sig); } th->status = prev_status; } /* exception from another thread */ - if (pending_interrupt && rb_threadptr_pending_interrupt_active_p(th)) { + if (pending_interrupt && threadptr_pending_interrupt_active_p(th)) { VALUE err = rb_threadptr_pending_interrupt_deque(th, blocking_timing ? INTERRUPT_ON_BLOCKING : INTERRUPT_NONE); thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err); + ret = TRUE; if (err == Qundef) { /* no error */ @@ -2066,9 +2474,13 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing) else if (err == eKillSignal /* Thread#kill received */ || err == eTerminateSignal /* Terminate thread */ || err == INT2FIX(TAG_FATAL) /* Thread.exit etc. */ ) { - rb_threadptr_to_kill(th); + terminate_interrupt = 1; } else { + if (err == th->vm->special_exceptions[ruby_error_stream_closed]) { + /* the only special exception to be queued across thread */ + err = ruby_vm_special_exception_copy(err); + } /* set runnable if th was slept. */ if (th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER) @@ -2077,8 +2489,12 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing) } } - if (timer_interrupt) { - unsigned long limits_us = TIME_QUANTUM_USEC; + if (terminate_interrupt) { + rb_threadptr_to_kill(th); + } + + if (timer_interrupt) { + uint32_t limits_us = TIME_QUANTUM_USEC; if (th->priority > 0) limits_us <<= th->priority; @@ -2088,19 +2504,20 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing) if (th->status == THREAD_RUNNABLE) th->running_time_us += TIME_QUANTUM_USEC; - EXEC_EVENT_HOOK(th, RUBY_INTERNAL_EVENT_SWITCH, th->cfp->self, 0, 0, 0, Qundef); + VM_ASSERT(th->ec->cfp); + EXEC_EVENT_HOOK(th->ec, RUBY_INTERNAL_EVENT_SWITCH, th->ec->cfp->self, + 0, 0, 0, Qundef); rb_thread_schedule_limits(limits_us); } } + return ret; } void rb_thread_execute_interrupts(VALUE thval) { - rb_thread_t *th; - GetThreadPtr(thval, th); - rb_threadptr_execute_interrupts(th, 1); + rb_threadptr_execute_interrupts(rb_thread_ptr(thval), 1); } static void @@ -2109,14 +2526,12 @@ rb_threadptr_ready(rb_thread_t *th) rb_threadptr_interrupt(th); } -void rb_threadptr_setup_exception(rb_thread_t *th, VALUE mesg, VALUE cause); - static VALUE -rb_threadptr_raise(rb_thread_t *th, int argc, VALUE *argv) +rb_threadptr_raise(rb_thread_t *target_th, int argc, VALUE *argv) { VALUE exc; - if (rb_threadptr_dead(th)) { + if (rb_threadptr_dead(target_th)) { return Qnil; } @@ -2129,13 +2544,13 @@ rb_threadptr_raise(rb_thread_t *th, int argc, VALUE *argv) /* making an exception object can switch thread, so we need to check thread deadness again */ - if (rb_threadptr_dead(th)) { + if (rb_threadptr_dead(target_th)) { return Qnil; } - rb_threadptr_setup_exception(GET_THREAD(), exc, Qundef); - rb_threadptr_pending_interrupt_enque(th, exc); - rb_threadptr_interrupt(th); + rb_ec_setup_exception(GET_EC(), exc, Qundef); + rb_threadptr_pending_interrupt_enque(target_th, exc); + rb_threadptr_interrupt(target_th); return Qnil; } @@ -2146,7 +2561,7 @@ rb_threadptr_signal_raise(rb_thread_t *th, int sig) argv[0] = rb_eSignal; argv[1] = INT2FIX(sig); - rb_threadptr_raise(th->vm->main_thread, 2, argv); + rb_threadptr_raise(th->vm->ractor.main_thread, 2, argv); } void @@ -2156,77 +2571,67 @@ rb_threadptr_signal_exit(rb_thread_t *th) argv[0] = rb_eSystemExit; argv[1] = rb_str_new2("exit"); - rb_threadptr_raise(th->vm->main_thread, 2, argv); -} -#if defined(POSIX_SIGNAL) && defined(SIGSEGV) && defined(HAVE_SIGALTSTACK) -#define USE_SIGALTSTACK -#endif - -NORETURN(void ruby_thread_stack_overflow(rb_thread_t *th)); -void -ruby_thread_stack_overflow(rb_thread_t *th) -{ - th->raised_flag = 0; -#ifdef USE_SIGALTSTACK - if (!rb_during_gc()) { - rb_exc_raise(sysstack_error); - } -#endif - th->errinfo = sysstack_error; - TH_JUMP_TAG(th, TAG_RAISE); + // TODO: check signal raise deliverly + rb_threadptr_raise(th->vm->ractor.main_thread, 2, argv); } int -rb_threadptr_set_raised(rb_thread_t *th) +rb_ec_set_raised(rb_execution_context_t *ec) { - if (th->raised_flag & RAISED_EXCEPTION) { + if (ec->raised_flag & RAISED_EXCEPTION) { return 1; } - th->raised_flag |= RAISED_EXCEPTION; + ec->raised_flag |= RAISED_EXCEPTION; return 0; } int -rb_threadptr_reset_raised(rb_thread_t *th) +rb_ec_reset_raised(rb_execution_context_t *ec) { - if (!(th->raised_flag & RAISED_EXCEPTION)) { + if (!(ec->raised_flag & RAISED_EXCEPTION)) { return 0; } - th->raised_flag &= ~RAISED_EXCEPTION; + ec->raised_flag &= ~RAISED_EXCEPTION; return 1; } int -rb_notify_fd_close(int fd) +rb_notify_fd_close(int fd, struct list_head *busy) { rb_vm_t *vm = GET_THREAD()->vm; - struct waiting_fd *wfd = 0; - int busy; - - busy = 0; - list_for_each(&vm->waiting_fds, wfd, wfd_node) { - if (wfd->fd == fd) { - rb_thread_t *th = wfd->th; - VALUE err; + struct waiting_fd *wfd = 0, *next; - busy = 1; - if (!th) { - continue; - } - wfd->th = 0; - err = th->vm->special_exceptions[ruby_error_stream_closed]; - rb_threadptr_pending_interrupt_enque(th, err); - rb_threadptr_interrupt(th); - } + RB_VM_LOCK_ENTER(); + { + list_for_each_safe(&vm->waiting_fds, wfd, next, wfd_node) { + if (wfd->fd == fd) { + rb_thread_t *th = wfd->th; + VALUE err; + + list_del(&wfd->wfd_node); + list_add(busy, &wfd->wfd_node); + + err = th->vm->special_exceptions[ruby_error_stream_closed]; + rb_threadptr_pending_interrupt_enque(th, err); + rb_threadptr_interrupt(th); + } + } } - return busy; + RB_VM_LOCK_LEAVE(); + + return !list_empty(busy); } void rb_thread_fd_close(int fd) { - while (rb_notify_fd_close(fd)) rb_thread_schedule(); + struct list_head busy; + + list_head_init(&busy); + if (rb_notify_fd_close(fd, &busy)) { + do rb_thread_schedule(); while (!list_empty(&busy)); + } } /* @@ -2253,15 +2658,15 @@ rb_thread_fd_close(int fd) static VALUE thread_raise_m(int argc, VALUE *argv, VALUE self) { - rb_thread_t *target_th; - rb_thread_t *th = GET_THREAD(); - GetThreadPtr(self, target_th); + rb_thread_t *target_th = rb_thread_ptr(self); + const rb_thread_t *current_th = GET_THREAD(); + threadptr_check_pending_interrupt_queue(target_th); rb_threadptr_raise(target_th, argc, argv); /* To perform Thread.current.raise as Kernel.raise */ - if (th == target_th) { - RUBY_VM_CHECK_INTS(th); + if (current_th == target_th) { + RUBY_VM_CHECK_INTS(target_th->ec); } return Qnil; } @@ -2269,28 +2674,24 @@ thread_raise_m(int argc, VALUE *argv, VALUE self) /* * call-seq: - * thr.exit -> thr or nil - * thr.kill -> thr or nil - * thr.terminate -> thr or nil - * - * Terminates +thr+ and schedules another thread to be run. - * - * If this thread is already marked to be killed, #exit returns the Thread. + * thr.exit -> thr + * thr.kill -> thr + * thr.terminate -> thr * - * If this is the main thread, or the last thread, exits the process. + * Terminates +thr+ and schedules another thread to be run, returning + * the terminated Thread. If this is the main thread, or the last + * thread, exits the process. */ VALUE rb_thread_kill(VALUE thread) { - rb_thread_t *th; - - GetThreadPtr(thread, th); + rb_thread_t *th = rb_thread_ptr(thread); if (th->to_kill || th->status == THREAD_KILLED) { return thread; } - if (th == th->vm->main_thread) { + if (th == th->vm->ractor.main_thread) { rb_exit(EXIT_SUCCESS); } @@ -2311,9 +2712,7 @@ rb_thread_kill(VALUE thread) int rb_thread_to_be_killed(VALUE thread) { - rb_thread_t *th; - - GetThreadPtr(thread, th); + rb_thread_t *th = rb_thread_ptr(thread); if (th->to_kill || th->status == THREAD_KILLED) { return TRUE; @@ -2355,7 +2754,7 @@ rb_thread_s_kill(VALUE obj, VALUE th) */ static VALUE -rb_thread_exit(void) +rb_thread_exit(VALUE _) { rb_thread_t *th = GET_THREAD(); return rb_thread_kill(th->self); @@ -2390,15 +2789,16 @@ rb_thread_wakeup(VALUE thread) VALUE rb_thread_wakeup_alive(VALUE thread) { - rb_thread_t *th; - GetThreadPtr(thread, th); + rb_thread_t *target_th = rb_thread_ptr(thread); + if (target_th->status == THREAD_KILLED) return Qnil; - if (th->status == THREAD_KILLED) { - return Qnil; + rb_threadptr_ready(target_th); + + if (target_th->status == THREAD_STOPPED || + target_th->status == THREAD_STOPPED_FOREVER) { + target_th->status = THREAD_RUNNABLE; } - rb_threadptr_ready(th); - if (th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER) - th->status = THREAD_RUNNABLE; + return thread; } @@ -2433,6 +2833,17 @@ rb_thread_run(VALUE thread) } +VALUE +rb_thread_stop(void) +{ + if (rb_thread_alone()) { + rb_raise(rb_eThreadError, + "stopping only thread\n\tnote: use sleep to stop forever"); + } + rb_thread_sleep_deadly(); + return Qnil; +} + /* * call-seq: * Thread.stop -> nil @@ -2448,19 +2859,21 @@ rb_thread_run(VALUE thread) * #=> "abc" */ -VALUE -rb_thread_stop(void) +static VALUE +thread_stop(VALUE _) { - if (rb_thread_alone()) { - rb_raise(rb_eThreadError, - "stopping only thread\n\tnote: use sleep to stop forever"); - } - rb_thread_sleep_deadly(); - return Qnil; + return rb_thread_stop(); } /********************************************************************/ +VALUE +rb_thread_list(void) +{ + // TODO + return rb_ractor_thread_list(GET_RACTOR()); +} + /* * call-seq: * Thread.list -> array @@ -2481,24 +2894,10 @@ rb_thread_stop(void) * #<Thread:0x401bdf4c run> */ -VALUE -rb_thread_list(void) +static VALUE +thread_list(VALUE _) { - VALUE ary = rb_ary_new(); - rb_vm_t *vm = GET_THREAD()->vm; - rb_thread_t *th = 0; - - list_for_each(&vm->living_threads, th, vmlt_node) { - switch (th->status) { - case THREAD_RUNNABLE: - case THREAD_STOPPED: - case THREAD_STOPPED_FOREVER: - rb_ary_push(ary, th->self); - default: - break; - } - } - return ary; + return rb_thread_list(); } VALUE @@ -2525,7 +2924,7 @@ thread_s_current(VALUE klass) VALUE rb_thread_main(void) { - return GET_THREAD()->vm->main_thread->self; + return GET_RACTOR()->threads.main->self; } /* @@ -2563,9 +2962,9 @@ rb_thread_s_main(VALUE klass) */ static VALUE -rb_thread_s_abort_exc(void) +rb_thread_s_abort_exc(VALUE _) { - return GET_THREAD()->vm->thread_abort_on_exception ? Qtrue : Qfalse; + return RBOOL(GET_THREAD()->vm->thread_abort_on_exception); } @@ -2625,9 +3024,7 @@ rb_thread_s_abort_exc_set(VALUE self, VALUE val) static VALUE rb_thread_abort_exc(VALUE thread) { - rb_thread_t *th; - GetThreadPtr(thread, th); - return th->abort_on_exception ? Qtrue : Qfalse; + return RBOOL(rb_thread_ptr(thread)->abort_on_exception); } @@ -2647,10 +3044,7 @@ rb_thread_abort_exc(VALUE thread) static VALUE rb_thread_abort_exc_set(VALUE thread, VALUE val) { - rb_thread_t *th; - - GetThreadPtr(thread, th); - th->abort_on_exception = RTEST(val); + rb_thread_ptr(thread)->abort_on_exception = RTEST(val); return val; } @@ -2661,21 +3055,46 @@ rb_thread_abort_exc_set(VALUE thread, VALUE val) * * Returns the status of the global ``report on exception'' condition. * - * The default is +false+. + * The default is +true+ since Ruby 2.5. + * + * All threads created when this flag is true will report + * a message on $stderr if an exception kills the thread. + * + * Thread.new { 1.times { raise } } + * + * will produce this output on $stderr: * - * When set to +true+, all threads will report the exception if an - * exception is raised in any thread. + * #<Thread:...> terminated with exception (report_on_exception is true): + * Traceback (most recent call last): + * 2: from -e:1:in `block in <main>' + * 1: from -e:1:in `times' + * + * This is done to catch errors in threads early. + * In some cases, you might not want this output. + * There are multiple ways to avoid the extra output: + * + * * If the exception is not intended, the best is to fix the cause of + * the exception so it does not happen anymore. + * * If the exception is intended, it might be better to rescue it closer to + * where it is raised rather then let it kill the Thread. + * * If it is guaranteed the Thread will be joined with Thread#join or + * Thread#value, then it is safe to disable this report with + * <code>Thread.current.report_on_exception = false</code> + * when starting the Thread. + * However, this might handle the exception much later, or not at all + * if the Thread is never joined due to the parent thread being blocked, etc. * * See also ::report_on_exception=. * * There is also an instance level method to set this for a specific thread, - * see #report_on_exception. + * see #report_on_exception=. + * */ static VALUE -rb_thread_s_report_exc(void) +rb_thread_s_report_exc(VALUE _) { - return GET_THREAD()->vm->thread_report_on_exception ? Qtrue : Qfalse; + return RBOOL(GET_THREAD()->vm->thread_report_on_exception); } @@ -2683,8 +3102,9 @@ rb_thread_s_report_exc(void) * call-seq: * Thread.report_on_exception= boolean -> true or false * - * When set to +true+, all threads will report the exception if an - * exception is raised. Returns the new state. + * Returns the new state. + * When set to +true+, all threads created afterwards will inherit the + * condition and report a message on $stderr if an exception kills a thread: * * Thread.report_on_exception = true * t1 = Thread.new do @@ -2697,10 +3117,9 @@ rb_thread_s_report_exc(void) * This will produce: * * In new thread - * prog.rb:4: Exception from thread (RuntimeError) - * from prog.rb:2:in `initialize' - * from prog.rb:2:in `new' - * from prog.rb:2 + * #<Thread:...prog.rb:2> terminated with exception (report_on_exception is true): + * Traceback (most recent call last): + * prog.rb:4:in `block in <main>': Exception from thread (RuntimeError) * In the main thread * * See also ::report_on_exception. @@ -2719,25 +3138,70 @@ rb_thread_s_report_exc_set(VALUE self, VALUE val) /* * call-seq: + * Thread.ignore_deadlock -> true or false + * + * Returns the status of the global ``ignore deadlock'' condition. + * The default is +false+, so that deadlock conditions are not ignored. + * + * See also ::ignore_deadlock=. + * + */ + +static VALUE +rb_thread_s_ignore_deadlock(VALUE _) +{ + return RBOOL(GET_THREAD()->vm->thread_ignore_deadlock); +} + + +/* + * call-seq: + * Thread.ignore_deadlock = boolean -> true or false + * + * Returns the new state. + * When set to +true+, the VM will not check for deadlock conditions. + * It is only useful to set this if your application can break a + * deadlock condition via some other means, such as a signal. + * + * Thread.ignore_deadlock = true + * queue = Thread::Queue.new + * + * trap(:SIGUSR1){queue.push "Received signal"} + * + * # raises fatal error unless ignoring deadlock + * puts queue.pop + * + * See also ::ignore_deadlock. + */ + +static VALUE +rb_thread_s_ignore_deadlock_set(VALUE self, VALUE val) +{ + GET_THREAD()->vm->thread_ignore_deadlock = RTEST(val); + return val; +} + + +/* + * call-seq: * thr.report_on_exception -> true or false * * Returns the status of the thread-local ``report on exception'' condition for * this +thr+. * - * The default is +false+. + * The default value when creating a Thread is the value of + * the global flag Thread.report_on_exception. * * See also #report_on_exception=. * - * There is also a class level method to set this for all threads, see - * ::report_on_exception. + * There is also a class level method to set this for all new threads, see + * ::report_on_exception=. */ static VALUE rb_thread_report_exc(VALUE thread) { - rb_thread_t *th; - GetThreadPtr(thread, th); - return th->report_on_exception ? Qtrue : Qfalse; + return RBOOL(rb_thread_ptr(thread)->report_on_exception); } @@ -2745,22 +3209,19 @@ rb_thread_report_exc(VALUE thread) * call-seq: * thr.report_on_exception= boolean -> true or false * - * When set to +true+, all threads (including the main program) will - * report the exception if an exception is raised in this +thr+. + * When set to +true+, a message is printed on $stderr if an exception + * kills this +thr+. See ::report_on_exception for details. * * See also #report_on_exception. * - * There is also a class level method to set this for all threads, see + * There is also a class level method to set this for all new threads, see * ::report_on_exception=. */ static VALUE rb_thread_report_exc_set(VALUE thread, VALUE val) { - rb_thread_t *th; - - GetThreadPtr(thread, th); - th->report_on_exception = RTEST(val); + rb_thread_ptr(thread)->report_on_exception = RTEST(val); return val; } @@ -2769,8 +3230,7 @@ rb_thread_report_exc_set(VALUE thread, VALUE val) * call-seq: * thr.group -> thgrp or nil * - * Returns the ThreadGroup which contains the given thread, or returns +nil+ - * if +thr+ is not a member of any group. + * Returns the ThreadGroup which contains the given thread. * * Thread.main.group #=> #<ThreadGroup:0x4029d914> */ @@ -2778,15 +3238,7 @@ rb_thread_report_exc_set(VALUE thread, VALUE val) VALUE rb_thread_group(VALUE thread) { - rb_thread_t *th; - VALUE group; - GetThreadPtr(thread, th); - group = th->thgroup; - - if (!group) { - group = Qnil; - } - return group; + return rb_thread_ptr(thread)->thgroup; } static const char * @@ -2794,10 +3246,7 @@ thread_status_name(rb_thread_t *th, int detail) { switch (th->status) { case THREAD_RUNNABLE: - if (th->to_kill) - return "aborting"; - else - return "run"; + return th->to_kill ? "aborting" : "run"; case THREAD_STOPPED_FOREVER: if (detail) return "sleep_forever"; case THREAD_STOPPED: @@ -2850,17 +3299,20 @@ rb_threadptr_dead(rb_thread_t *th) static VALUE rb_thread_status(VALUE thread) { - rb_thread_t *th; - GetThreadPtr(thread, th); + rb_thread_t *target_th = rb_thread_ptr(thread); - if (rb_threadptr_dead(th)) { - if (!NIL_P(th->errinfo) && !FIXNUM_P(th->errinfo) - /* TODO */ ) { + if (rb_threadptr_dead(target_th)) { + if (!NIL_P(target_th->ec->errinfo) && + !FIXNUM_P(target_th->ec->errinfo)) { return Qnil; } - return Qfalse; + else { + return Qfalse; + } + } + else { + return rb_str_new2(thread_status_name(target_th, FALSE)); } - return rb_str_new2(thread_status_name(th, FALSE)); } @@ -2881,12 +3333,7 @@ rb_thread_status(VALUE thread) static VALUE rb_thread_alive_p(VALUE thread) { - rb_thread_t *th; - GetThreadPtr(thread, th); - - if (rb_threadptr_dead(th)) - return Qfalse; - return Qtrue; + return RBOOL(!thread_finished(rb_thread_ptr(thread))); } /* @@ -2906,35 +3353,12 @@ rb_thread_alive_p(VALUE thread) static VALUE rb_thread_stop_p(VALUE thread) { - rb_thread_t *th; - GetThreadPtr(thread, th); + rb_thread_t *th = rb_thread_ptr(thread); - if (rb_threadptr_dead(th)) - return Qtrue; - if (th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER) + if (rb_threadptr_dead(th)) { return Qtrue; - return Qfalse; -} - -/* - * call-seq: - * thr.safe_level -> integer - * - * Returns the safe level in effect for <i>thr</i>. Setting thread-local safe - * levels can help when implementing sandboxes which run insecure code. - * - * thr = Thread.new { $SAFE = 1; sleep } - * Thread.current.safe_level #=> 0 - * thr.safe_level #=> 1 - */ - -static VALUE -rb_thread_safe_level(VALUE thread) -{ - rb_thread_t *th; - GetThreadPtr(thread, th); - - return INT2NUM(th->safe_level); + } + return RBOOL(th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER); } /* @@ -2947,9 +3371,7 @@ rb_thread_safe_level(VALUE thread) static VALUE rb_thread_getname(VALUE thread) { - rb_thread_t *th; - GetThreadPtr(thread, th); - return th->name; + return rb_thread_ptr(thread)->name; } /* @@ -2963,11 +3385,8 @@ rb_thread_getname(VALUE thread) static VALUE rb_thread_setname(VALUE thread, VALUE name) { -#ifdef SET_ANOTHER_THREAD_NAME - const char *s = ""; -#endif - rb_thread_t *th; - GetThreadPtr(thread, th); + rb_thread_t *target_th = rb_thread_ptr(thread); + if (!NIL_P(name)) { rb_encoding *enc; StringValueCStr(name); @@ -2977,68 +3396,92 @@ rb_thread_setname(VALUE thread, VALUE name) rb_enc_name(enc)); } name = rb_str_new_frozen(name); -#ifdef SET_ANOTHER_THREAD_NAME - s = RSTRING_PTR(name); -#endif } - th->name = name; -#if defined(SET_ANOTHER_THREAD_NAME) - if (threadptr_initialized(th)) { - SET_ANOTHER_THREAD_NAME(th->thread_id, s); + target_th->name = name; + if (threadptr_initialized(target_th)) { + native_set_another_thread_name(target_th->thread_id, name); } -#endif return name; } +#if USE_NATIVE_THREAD_NATIVE_THREAD_ID /* * call-seq: - * thr.inspect -> string + * thr.native_thread_id -> integer + * + * Return the native thread ID which is used by the Ruby thread. + * + * The ID depends on the OS. (not POSIX thread ID returned by pthread_self(3)) + * * On Linux it is TID returned by gettid(2). + * * On macOS it is the system-wide unique integral ID of thread returned + * by pthread_threadid_np(3). + * * On FreeBSD it is the unique integral ID of the thread returned by + * pthread_getthreadid_np(3). + * * On Windows it is the thread identifier returned by GetThreadId(). + * * On other platforms, it raises NotImplementedError. + * + * NOTE: + * If the thread is not associated yet or already deassociated with a native + * thread, it returns _nil_. + * If the Ruby implementation uses M:N thread model, the ID may change + * depending on the timing. + */ + +static VALUE +rb_thread_native_thread_id(VALUE thread) +{ + rb_thread_t *target_th = rb_thread_ptr(thread); + if (rb_threadptr_dead(target_th)) return Qnil; + return native_thread_native_thread_id(target_th); +} +#else +# define rb_thread_native_thread_id rb_f_notimplement +#endif + +/* + * call-seq: + * thr.to_s -> string * * Dump the name, id, and status of _thr_ to a string. */ static VALUE -rb_thread_inspect(VALUE thread) +rb_thread_to_s(VALUE thread) { VALUE cname = rb_class_path(rb_obj_class(thread)); - rb_thread_t *th; + rb_thread_t *target_th = rb_thread_ptr(thread); const char *status; - VALUE str; + VALUE str, loc; - GetThreadPtr(thread, th); - status = thread_status_name(th, TRUE); + status = thread_status_name(target_th, TRUE); str = rb_sprintf("#<%"PRIsVALUE":%p", cname, (void *)thread); - if (!NIL_P(th->name)) { - rb_str_catf(str, "@%"PRIsVALUE, th->name); - } - if (!th->first_func && th->first_proc) { - VALUE loc = rb_proc_location(th->first_proc); - if (!NIL_P(loc)) { - const VALUE *ptr = RARRAY_CONST_PTR(loc); - rb_str_catf(str, "@%"PRIsVALUE":%"PRIsVALUE, ptr[0], ptr[1]); - rb_gc_force_recycle(loc); - } + if (!NIL_P(target_th->name)) { + rb_str_catf(str, "@%"PRIsVALUE, target_th->name); + } + if ((loc = threadptr_invoke_proc_location(target_th)) != Qnil) { + rb_str_catf(str, " %"PRIsVALUE":%"PRIsVALUE, + RARRAY_AREF(loc, 0), RARRAY_AREF(loc, 1)); } rb_str_catf(str, " %s>", status); - OBJ_INFECT(str, thread); return str; } /* variables for recursive traversals */ -static ID recursive_key; +#define recursive_key id__recursive_key__ static VALUE threadptr_local_aref(rb_thread_t *th, ID id) { if (id == recursive_key) { - return th->local_storage_recursive_hash; + return th->ec->local_storage_recursive_hash; } else { - st_data_t val; + VALUE val; + struct rb_id_table *local_storage = th->ec->local_storage; - if (th->local_storage && st_lookup(th->local_storage, id, &val)) { - return (VALUE)val; + if (local_storage != NULL && rb_id_table_lookup(local_storage, id, &val)) { + return val; } else { return Qnil; @@ -3049,9 +3492,7 @@ threadptr_local_aref(rb_thread_t *th, ID id) VALUE rb_thread_local_aref(VALUE thread, ID id) { - rb_thread_t *th; - GetThreadPtr(thread, th); - return threadptr_local_aref(th, id); + return threadptr_local_aref(rb_thread_ptr(thread), id); } /* @@ -3122,38 +3563,88 @@ rb_thread_aref(VALUE thread, VALUE key) return rb_thread_local_aref(thread, id); } +/* + * call-seq: + * thr.fetch(sym) -> obj + * thr.fetch(sym) { } -> obj + * thr.fetch(sym, default) -> obj + * + * Returns a fiber-local for the given key. If the key can't be + * found, there are several options: With no other arguments, it will + * raise a KeyError exception; if <i>default</i> is given, then that + * will be returned; if the optional code block is specified, then + * that will be run and its result returned. See Thread#[] and + * Hash#fetch. + */ static VALUE -threadptr_local_aset(rb_thread_t *th, ID id, VALUE val) +rb_thread_fetch(int argc, VALUE *argv, VALUE self) { + VALUE key, val; + ID id; + rb_thread_t *target_th = rb_thread_ptr(self); + int block_given; + + rb_check_arity(argc, 1, 2); + key = argv[0]; + + block_given = rb_block_given_p(); + if (block_given && argc == 2) { + rb_warn("block supersedes default value argument"); + } + + id = rb_check_id(&key); + if (id == recursive_key) { - th->local_storage_recursive_hash = val; + return target_th->ec->local_storage_recursive_hash; + } + else if (id && target_th->ec->local_storage && + rb_id_table_lookup(target_th->ec->local_storage, id, &val)) { return val; } - else if (NIL_P(val)) { - if (!th->local_storage) return Qnil; - st_delete_wrap(th->local_storage, id); - return Qnil; + else if (block_given) { + return rb_yield(key); + } + else if (argc == 1) { + rb_key_err_raise(rb_sprintf("key not found: %+"PRIsVALUE, key), self, key); } else { - if (!th->local_storage) { - th->local_storage = st_init_numtable(); - } - st_insert(th->local_storage, id, val); + return argv[1]; + } +} + +static VALUE +threadptr_local_aset(rb_thread_t *th, ID id, VALUE val) +{ + if (id == recursive_key) { + th->ec->local_storage_recursive_hash = val; return val; } + else { + struct rb_id_table *local_storage = th->ec->local_storage; + + if (NIL_P(val)) { + if (!local_storage) return Qnil; + rb_id_table_delete(local_storage, id); + return Qnil; + } + else { + if (local_storage == NULL) { + th->ec->local_storage = local_storage = rb_id_table_create(0); + } + rb_id_table_insert(local_storage, id, val); + return val; + } + } } VALUE rb_thread_local_aset(VALUE thread, ID id, VALUE val) { - rb_thread_t *th; - GetThreadPtr(thread, th); - if (OBJ_FROZEN(thread)) { - rb_error_frozen("thread locals"); + rb_frozen_error_raise(thread, "can't modify frozen thread locals"); } - return threadptr_local_aset(th, id, val); + return threadptr_local_aset(rb_thread_ptr(thread), id, val); } /* @@ -3208,7 +3699,10 @@ rb_thread_variable_get(VALUE thread, VALUE key) { VALUE locals; - locals = rb_ivar_get(thread, id_locals); + if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) { + return Qnil; + } + locals = rb_thread_local_storage(thread); return rb_hash_aref(locals, rb_to_symbol(key)); } @@ -3222,16 +3716,16 @@ rb_thread_variable_get(VALUE thread, VALUE key) */ static VALUE -rb_thread_variable_set(VALUE thread, VALUE id, VALUE val) +rb_thread_variable_set(VALUE thread, VALUE key, VALUE val) { VALUE locals; if (OBJ_FROZEN(thread)) { - rb_error_frozen("thread locals"); + rb_frozen_error_raise(thread, "can't modify frozen thread locals"); } - locals = rb_ivar_get(thread, id_locals); - return rb_hash_aset(locals, rb_to_symbol(id), val); + locals = rb_thread_local_storage(thread); + return rb_hash_aset(locals, rb_to_symbol(key), val); } /* @@ -3250,31 +3744,28 @@ rb_thread_variable_set(VALUE thread, VALUE id, VALUE val) static VALUE rb_thread_key_p(VALUE self, VALUE key) { - rb_thread_t *th; + VALUE val; ID id = rb_check_id(&key); + struct rb_id_table *local_storage = rb_thread_ptr(self)->ec->local_storage; - GetThreadPtr(self, th); - - if (!id || !th->local_storage) { + if (!id || local_storage == NULL) { return Qfalse; } - if (st_lookup(th->local_storage, id, 0)) { - return Qtrue; - } - return Qfalse; + return RBOOL(rb_id_table_lookup(local_storage, id, &val)); } -static int -thread_keys_i(ID key, VALUE value, VALUE ary) +static enum rb_id_table_iterator_result +thread_keys_i(ID key, VALUE value, void *ary) { - rb_ary_push(ary, ID2SYM(key)); - return ST_CONTINUE; + rb_ary_push((VALUE)ary, ID2SYM(key)); + return ID_TABLE_CONTINUE; } int rb_thread_alone(void) { - return vm_living_thread_num(GET_VM()) == 1; + // TODO + return rb_ractor_living_thread_num(GET_RACTOR()) == 1; } /* @@ -3294,12 +3785,11 @@ rb_thread_alone(void) static VALUE rb_thread_keys(VALUE self) { - rb_thread_t *th; + struct rb_id_table *local_storage = rb_thread_ptr(self)->ec->local_storage; VALUE ary = rb_ary_new(); - GetThreadPtr(self, th); - if (th->local_storage) { - st_foreach(th->local_storage, thread_keys_i, ary); + if (local_storage) { + rb_id_table_foreach(local_storage, thread_keys_i, (void *)ary); } return ary; } @@ -3334,8 +3824,11 @@ rb_thread_variables(VALUE thread) VALUE locals; VALUE ary; - locals = rb_ivar_get(thread, id_locals); ary = rb_ary_new(); + if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) { + return ary; + } + locals = rb_thread_local_storage(thread); rb_hash_foreach(locals, keys_i, ary); return ary; @@ -3361,20 +3854,13 @@ static VALUE rb_thread_variable_p(VALUE thread, VALUE key) { VALUE locals; - ID id = rb_check_id(&key); - if (!id) return Qfalse; - - locals = rb_ivar_get(thread, id_locals); - - if (!RHASH(locals)->ntbl) + if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) { return Qfalse; - - if (st_lookup(RHASH(locals)->ntbl, ID2SYM(id), 0)) { - return Qtrue; } + locals = rb_thread_local_storage(thread); - return Qfalse; + return RBOOL(rb_hash_lookup(locals, rb_to_symbol(key)) != Qnil); } /* @@ -3395,9 +3881,7 @@ rb_thread_variable_p(VALUE thread, VALUE key) static VALUE rb_thread_priority(VALUE thread) { - rb_thread_t *th; - GetThreadPtr(thread, th); - return INT2NUM(th->priority); + return INT2NUM(rb_thread_ptr(thread)->priority); } @@ -3430,13 +3914,11 @@ rb_thread_priority(VALUE thread) static VALUE rb_thread_priority_set(VALUE thread, VALUE prio) { - rb_thread_t *th; + rb_thread_t *target_th = rb_thread_ptr(thread); int priority; - GetThreadPtr(thread, th); - #if USE_NATIVE_THREAD_PRIORITY - th->priority = NUM2INT(prio); + target_th->priority = NUM2INT(prio); native_thread_apply_priority(th); #else priority = NUM2INT(prio); @@ -3446,9 +3928,9 @@ rb_thread_priority_set(VALUE thread, VALUE prio) else if (priority < RUBY_THREAD_PRIORITY_MIN) { priority = RUBY_THREAD_PRIORITY_MIN; } - th->priority = priority; + target_th->priority = (int8_t)priority; #endif - return INT2NUM(th->priority); + return INT2NUM(target_th->priority); } /* for IO */ @@ -3581,11 +4063,6 @@ rb_fd_dup(rb_fdset_t *dst, const rb_fdset_t *src) memcpy(dst->fdset, src->fdset, size); } -#ifdef __native_client__ -int select(int nfds, fd_set *readfds, fd_set *writefds, - fd_set *exceptfds, struct timeval *timeout); -#endif - int rb_fd_select(int n, rb_fdset_t *readfds, rb_fdset_t *writefds, rb_fdset_t *exceptfds, struct timeval *timeout) { @@ -3605,9 +4082,7 @@ rb_fd_select(int n, rb_fdset_t *readfds, rb_fdset_t *writefds, rb_fdset_t *excep return select(n, r, w, e, timeout); } -#if defined __GNUC__ && __GNUC__ >= 6 -#define rb_fd_no_init(fds) ASSUME(!(fds)->maxfd) -#endif +#define rb_fd_no_init(fds) ((void)((fds)->fdset = 0), (void)((fds)->maxfd = 0)) #undef FD_ZERO #undef FD_SET @@ -3657,7 +4132,9 @@ rb_fd_set(int fd, rb_fdset_t *set) } if (set->fdset->fd_count >= (unsigned)set->capa) { set->capa = (set->fdset->fd_count / FD_SETSIZE + 1) * FD_SETSIZE; - set->fdset = xrealloc(set->fdset, sizeof(unsigned int) + sizeof(SOCKET) * set->capa); + set->fdset = + rb_xrealloc_mul_add( + set->fdset, set->capa, sizeof(SOCKET), sizeof(unsigned int)); } set->fdset->fd_array[set->fdset->fd_count++] = s; } @@ -3672,159 +4149,203 @@ rb_fd_set(int fd, rb_fdset_t *set) #define FD_CLR(i, f) rb_fd_clr((i), (f)) #define FD_ISSET(i, f) rb_fd_isset((i), (f)) +#define rb_fd_no_init(fds) (void)((fds)->fdset = 0) + #endif #ifndef rb_fd_no_init #define rb_fd_no_init(fds) (void)(fds) #endif -static inline int -retryable(int e) +static int +wait_retryable(int *result, int errnum, rb_hrtime_t *rel, rb_hrtime_t end) { - if (e == EINTR) return TRUE; + if (*result < 0) { + switch (errnum) { + case EINTR: #ifdef ERESTART - if (e == ERESTART) return TRUE; + case ERESTART: #endif + *result = 0; + if (rel && hrtime_update_expire(rel, end)) { + *rel = 0; + } + return TRUE; + } + return FALSE; + } + else if (*result == 0) { + /* check for spurious wakeup */ + if (rel) { + return !hrtime_update_expire(rel, end); + } + return TRUE; + } return FALSE; } -#define restore_fdset(fds1, fds2) \ - ((fds1) ? rb_fd_dup(fds1, fds2) : (void)0) +struct select_set { + int max; + int sigwait_fd; + rb_thread_t *th; + rb_fdset_t *rset; + rb_fdset_t *wset; + rb_fdset_t *eset; + rb_fdset_t orig_rset; + rb_fdset_t orig_wset; + rb_fdset_t orig_eset; + struct timeval *timeout; +}; + +static VALUE +select_set_free(VALUE p) +{ + struct select_set *set = (struct select_set *)p; -static inline void -update_timeval(struct timeval *timeout, double limit) + if (set->sigwait_fd >= 0) { + rb_sigwait_fd_put(set->th, set->sigwait_fd); + rb_sigwait_fd_migrate(set->th->vm); + } + + rb_fd_term(&set->orig_rset); + rb_fd_term(&set->orig_wset); + rb_fd_term(&set->orig_eset); + + return Qfalse; +} + +static const rb_hrtime_t * +sigwait_timeout(rb_thread_t *th, int sigwait_fd, const rb_hrtime_t *orig, + int *drained_p) { - if (timeout) { - double d = limit - timeofday(); + static const rb_hrtime_t quantum = TIME_QUANTUM_USEC * 1000; - timeout->tv_sec = (time_t)d; - timeout->tv_usec = (int)((d-(double)timeout->tv_sec)*1e6); - if (timeout->tv_sec < 0) timeout->tv_sec = 0; - if (timeout->tv_usec < 0) timeout->tv_usec = 0; + if (sigwait_fd >= 0 && (!ubf_threads_empty() || BUSY_WAIT_SIGNALS)) { + *drained_p = check_signals_nogvl(th, sigwait_fd); + if (!orig || *orig > quantum) + return &quantum; } + + return orig; } -static int -do_select(int n, rb_fdset_t *readfds, rb_fdset_t *writefds, - rb_fdset_t *exceptfds, struct timeval *timeout) +#define sigwait_signals_fd(result, cond, sigwait_fd) \ + (result > 0 && (cond) ? (result--, (sigwait_fd)) : -1) + +static VALUE +do_select(VALUE p) { - int MAYBE_UNUSED(result); + struct select_set *set = (struct select_set *)p; + int result = 0; int lerrno; - rb_fdset_t MAYBE_UNUSED(orig_read); - rb_fdset_t MAYBE_UNUSED(orig_write); - rb_fdset_t MAYBE_UNUSED(orig_except); - double limit = 0; - struct timeval wait_rest; - rb_thread_t *th = GET_THREAD(); + rb_hrtime_t *to, rel, end = 0; + timeout_prepare(&to, &rel, &end, set->timeout); +#define restore_fdset(dst, src) \ + ((dst) ? rb_fd_dup(dst, src) : (void)0) #define do_select_update() \ - (restore_fdset(readfds, &orig_read), \ - restore_fdset(writefds, &orig_write), \ - restore_fdset(exceptfds, &orig_except), \ - update_timeval(timeout, limit), \ + (restore_fdset(set->rset, &set->orig_rset), \ + restore_fdset(set->wset, &set->orig_wset), \ + restore_fdset(set->eset, &set->orig_eset), \ TRUE) - if (timeout) { - limit = timeofday(); - limit += (double)timeout->tv_sec+(double)timeout->tv_usec*1e-6; - wait_rest = *timeout; - timeout = &wait_rest; - } - -#define fd_init_copy(f) \ - (f##fds) ? rb_fd_init_copy(&orig_##f, f##fds) : rb_fd_no_init(&orig_##f) - fd_init_copy(read); - fd_init_copy(write); - fd_init_copy(except); -#undef fd_init_copy - do { + int drained; lerrno = 0; - BLOCKING_REGION({ - result = native_fd_select(n, readfds, writefds, exceptfds, - timeout, th); - if (result < 0) lerrno = errno; - }, ubf_select, th, FALSE); - - RUBY_VM_CHECK_INTS_BLOCKING(th); - } while (result < 0 && retryable(errno = lerrno) && do_select_update()); - -#define fd_term(f) if (f##fds) rb_fd_term(&orig_##f) - fd_term(read); - fd_term(write); - fd_term(except); -#undef fd_term - - return result; -} - -static void -rb_thread_wait_fd_rw(int fd, int read) -{ - int result = 0; - int events = read ? RB_WAITFD_IN : RB_WAITFD_OUT; - - thread_debug("rb_thread_wait_fd_rw(%d, %s)\n", fd, read ? "read" : "write"); + BLOCKING_REGION(set->th, { + const rb_hrtime_t *sto; + struct timeval tv; + + sto = sigwait_timeout(set->th, set->sigwait_fd, to, &drained); + if (!RUBY_VM_INTERRUPTED(set->th->ec)) { + result = native_fd_select(set->max, set->rset, set->wset, + set->eset, + rb_hrtime2timeval(&tv, sto), set->th); + if (result < 0) lerrno = errno; + } + }, set->sigwait_fd >= 0 ? ubf_sigwait : ubf_select, set->th, TRUE); + + if (set->sigwait_fd >= 0) { + int fd = sigwait_signals_fd(result, + rb_fd_isset(set->sigwait_fd, set->rset), + set->sigwait_fd); + (void)check_signals_nogvl(set->th, fd); + } - if (fd < 0) { - rb_raise(rb_eIOError, "closed stream"); - } + RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); /* may raise */ + } while (wait_retryable(&result, lerrno, to, end) && do_select_update()); - result = rb_wait_for_single_fd(fd, events, NULL); if (result < 0) { - rb_sys_fail(0); + errno = lerrno; } - thread_debug("rb_thread_wait_fd_rw(%d, %s): done\n", fd, read ? "read" : "write"); + return (VALUE)result; } -void -rb_thread_wait_fd(int fd) +static rb_fdset_t * +init_set_fd(int fd, rb_fdset_t *fds) { - rb_thread_wait_fd_rw(fd, 1); -} + if (fd < 0) { + return 0; + } + rb_fd_init(fds); + rb_fd_set(fd, fds); -int -rb_thread_fd_writable(int fd) -{ - rb_thread_wait_fd_rw(fd, 0); - return TRUE; + return fds; } int rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except, struct timeval *timeout) { - if (!read && !write && !except) { - if (!timeout) { - rb_thread_sleep_forever(); - return 0; - } - rb_thread_wait_for(*timeout); - return 0; - } + struct select_set set; - if (read) { - rb_fd_resize(max - 1, read); - } - if (write) { - rb_fd_resize(max - 1, write); - } - if (except) { - rb_fd_resize(max - 1, except); + set.th = GET_THREAD(); + RUBY_VM_CHECK_INTS_BLOCKING(set.th->ec); + set.max = max; + set.rset = read; + set.wset = write; + set.eset = except; + set.timeout = timeout; + + if (!set.rset && !set.wset && !set.eset) { + if (!timeout) { + rb_thread_sleep_forever(); + return 0; + } + rb_thread_wait_for(*timeout); + return 0; + } + + set.sigwait_fd = rb_sigwait_fd_get(set.th); + if (set.sigwait_fd >= 0) { + if (set.rset) + rb_fd_set(set.sigwait_fd, set.rset); + else + set.rset = init_set_fd(set.sigwait_fd, &set.orig_rset); + if (set.sigwait_fd >= set.max) { + set.max = set.sigwait_fd + 1; + } } - return do_select(max, read, write, except, timeout); -} +#define fd_init_copy(f) do { \ + if (set.f) { \ + rb_fd_resize(set.max - 1, set.f); \ + if (&set.orig_##f != set.f) { /* sigwait_fd */ \ + rb_fd_init_copy(&set.orig_##f, set.f); \ + } \ + } \ + else { \ + rb_fd_no_init(&set.orig_##f); \ + } \ + } while (0) + fd_init_copy(rset); + fd_init_copy(wset); + fd_init_copy(eset); +#undef fd_init_copy -/* - * poll() is supported by many OSes, but so far Linux is the only - * one we know of that supports using poll() in all places select() - * would work. - */ -#if defined(HAVE_POLL) && defined(__linux__) -# define USE_POLL -#endif + return (int)rb_ensure(do_select, (VALUE)&set, select_set_free, (VALUE)&set); +} #ifdef USE_POLL @@ -3833,89 +4354,95 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * #define POLLOUT_SET (POLLWRBAND | POLLWRNORM | POLLOUT | POLLERR) #define POLLEX_SET (POLLPRI) -#ifndef HAVE_PPOLL -/* TODO: don't ignore sigmask */ -int -ppoll(struct pollfd *fds, nfds_t nfds, - const struct timespec *ts, const sigset_t *sigmask) -{ - int timeout_ms; - - if (ts) { - int tmp, tmp2; - - if (ts->tv_sec > INT_MAX/1000) - timeout_ms = -1; - else { - tmp = (int)(ts->tv_sec * 1000); - tmp2 = (int)(ts->tv_nsec / (1000 * 1000)); - if (INT_MAX - tmp < tmp2) - timeout_ms = -1; - else - timeout_ms = (int)(tmp + tmp2); - } - } - else - timeout_ms = -1; - - return poll(fds, nfds, timeout_ms); -} +#ifndef POLLERR_SET /* defined for FreeBSD for now */ +# define POLLERR_SET (0) #endif -static inline void -update_timespec(struct timespec *timeout, double limit) -{ - if (timeout) { - double d = limit - timeofday(); - - timeout->tv_sec = (long)d; - timeout->tv_nsec = (long)((d-(double)timeout->tv_sec)*1e9); - if (timeout->tv_sec < 0) timeout->tv_sec = 0; - if (timeout->tv_nsec < 0) timeout->tv_nsec = 0; - } -} - /* * returns a mask of events */ int -rb_wait_for_single_fd(int fd, int events, struct timeval *tv) +rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout) { - struct pollfd fds; - int result = 0, lerrno; - double limit = 0; - struct timespec ts; - struct timespec *timeout = NULL; - rb_thread_t *th = GET_THREAD(); + struct pollfd fds[2]; + int result = 0; + int drained; + nfds_t nfds; + rb_unblock_function_t *ubf; + struct waiting_fd wfd; + int state; + volatile int lerrno; -#define poll_update() \ - (update_timespec(timeout, limit), \ - TRUE) + wfd.th = GET_THREAD(); + wfd.fd = fd; - if (tv) { - ts.tv_sec = tv->tv_sec; - ts.tv_nsec = tv->tv_usec * 1000; - limit = timeofday(); - limit += (double)tv->tv_sec + (double)tv->tv_usec * 1e-6; - timeout = &ts; + RB_VM_LOCK_ENTER(); + { + list_add(&wfd.th->vm->waiting_fds, &wfd.wfd_node); + } + RB_VM_LOCK_LEAVE(); + + EC_PUSH_TAG(wfd.th->ec); + if ((state = EC_EXEC_TAG()) == TAG_NONE) { + rb_hrtime_t *to, rel, end = 0; + RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec); + timeout_prepare(&to, &rel, &end, timeout); + fds[0].fd = fd; + fds[0].events = (short)events; + fds[0].revents = 0; + do { + fds[1].fd = rb_sigwait_fd_get(wfd.th); + + if (fds[1].fd >= 0) { + fds[1].events = POLLIN; + fds[1].revents = 0; + nfds = 2; + ubf = ubf_sigwait; + } + else { + nfds = 1; + ubf = ubf_select; + } + + lerrno = 0; + BLOCKING_REGION(wfd.th, { + const rb_hrtime_t *sto; + struct timespec ts; + + sto = sigwait_timeout(wfd.th, fds[1].fd, to, &drained); + if (!RUBY_VM_INTERRUPTED(wfd.th->ec)) { + result = ppoll(fds, nfds, rb_hrtime2timespec(&ts, sto), 0); + if (result < 0) lerrno = errno; + } + }, ubf, wfd.th, TRUE); + + if (fds[1].fd >= 0) { + int fd1 = sigwait_signals_fd(result, fds[1].revents, fds[1].fd); + (void)check_signals_nogvl(wfd.th, fd1); + rb_sigwait_fd_put(wfd.th, fds[1].fd); + rb_sigwait_fd_migrate(wfd.th->vm); + } + RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec); + } while (wait_retryable(&result, lerrno, to, end)); + } + EC_POP_TAG(); + + RB_VM_LOCK_ENTER(); + { + list_del(&wfd.wfd_node); } + RB_VM_LOCK_LEAVE(); - fds.fd = fd; - fds.events = (short)events; - - do { - fds.revents = 0; - lerrno = 0; - BLOCKING_REGION({ - result = ppoll(&fds, 1, timeout, NULL); - if (result < 0) lerrno = errno; - }, ubf_select, th, FALSE); + if (state) { + EC_JUMP_TAG(wfd.th->ec, state); + } - RUBY_VM_CHECK_INTS_BLOCKING(th); - } while (result < 0 && retryable(errno = lerrno) && poll_update()); - if (result < 0) return -1; + if (result < 0) { + errno = lerrno; + return -1; + } - if (fds.revents & POLLNVAL) { + if (fds[0].revents & POLLNVAL) { errno = EBADF; return -1; } @@ -3925,25 +4452,20 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv) * Therefore we need to fix it up. */ result = 0; - if (fds.revents & POLLIN_SET) + if (fds[0].revents & POLLIN_SET) result |= RB_WAITFD_IN; - if (fds.revents & POLLOUT_SET) + if (fds[0].revents & POLLOUT_SET) result |= RB_WAITFD_OUT; - if (fds.revents & POLLEX_SET) + if (fds[0].revents & POLLEX_SET) result |= RB_WAITFD_PRI; + /* all requested events are ready if there is an error */ + if (fds[0].revents & POLLERR_SET) + result |= events; + return result; } #else /* ! USE_POLL - implement rb_io_poll_fd() using select() */ -static rb_fdset_t * -init_set_fd(int fd, rb_fdset_t *fds) -{ - rb_fd_init(fds); - rb_fd_set(fd, fds); - - return fds; -} - struct select_args { union { int fd; @@ -3952,6 +4474,7 @@ struct select_args { rb_fdset_t *read; rb_fdset_t *write; rb_fdset_t *except; + struct waiting_fd wfd; struct timeval *tv; }; @@ -3982,6 +4505,11 @@ select_single_cleanup(VALUE ptr) { struct select_args *args = (struct select_args *)ptr; + RB_VM_LOCK_ENTER(); + { + list_del(&args->wfd.wfd_node); + } + RB_VM_LOCK_LEAVE(); if (args->read) rb_fd_term(args->read); if (args->write) rb_fd_term(args->write); if (args->except) rb_fd_term(args->except); @@ -3990,7 +4518,7 @@ select_single_cleanup(VALUE ptr) } int -rb_wait_for_single_fd(int fd, int events, struct timeval *tv) +rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout) { rb_fdset_t rfds, wfds, efds; struct select_args args; @@ -4001,7 +4529,15 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv) args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL; args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL; args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL; - args.tv = tv; + args.tv = timeout; + args.wfd.fd = fd; + args.wfd.th = GET_THREAD(); + + RB_VM_LOCK_ENTER(); + { + list_add(&args.wfd.th->vm->waiting_fds, &args.wfd.wfd_node); + } + RB_VM_LOCK_LEAVE(); r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr); if (r == -1) @@ -4024,7 +4560,6 @@ rb_gc_set_stack_end(VALUE **stack_end_p) } #endif - /* * */ @@ -4035,39 +4570,88 @@ rb_threadptr_check_signal(rb_thread_t *mth) /* mth must be main_thread */ if (rb_signal_buff_size() > 0) { /* wakeup main thread */ - rb_threadptr_trap_interrupt(mth); + threadptr_trap_interrupt(mth); } } static void -timer_thread_function(void *arg) +async_bug_fd(const char *mesg, int errno_arg, int fd) { - rb_vm_t *vm = GET_VM(); /* TODO: fix me for Multi-VM */ + char buff[64]; + size_t n = strlcpy(buff, mesg, sizeof(buff)); + if (n < sizeof(buff)-3) { + ruby_snprintf(buff+n, sizeof(buff)-n, "(%d)", fd); + } + rb_async_bug_errno(buff, errno_arg); +} + +/* VM-dependent API is not available for this function */ +static int +consume_communication_pipe(int fd) +{ +#if USE_EVENTFD + uint64_t buff[1]; +#else + /* buffer can be shared because no one refers to them. */ + static char buff[1024]; +#endif + ssize_t result; + int ret = FALSE; /* for rb_sigwait_sleep */ /* - * Tricky: thread_destruct_lock doesn't close a race against - * vm->running_thread switch. however it guarantees th->running_thread - * point to valid pointer or NULL. + * disarm UBF_TIMER before we read, because it can become + * re-armed at any time via sighandler and the pipe will refill + * We can disarm it because this thread is now processing signals + * and we do not want unnecessary SIGVTALRM */ - native_mutex_lock(&vm->thread_destruct_lock); - /* for time slice */ - if (vm->running_thread) - RUBY_VM_SET_TIMER_INTERRUPT(vm->running_thread); - native_mutex_unlock(&vm->thread_destruct_lock); - - /* check signal */ - rb_threadptr_check_signal(vm->main_thread); - -#if 0 - /* prove profiler */ - if (vm->prove_profile.enable) { - rb_thread_t *th = vm->running_thread; - - if (vm->during_gc) { - /* GC prove profiling */ + ubf_timer_disarm(); + + while (1) { + result = read(fd, buff, sizeof(buff)); + if (result > 0) { + ret = TRUE; + if (USE_EVENTFD || result < (ssize_t)sizeof(buff)) { + return ret; + } } - } + else if (result == 0) { + return ret; + } + else if (result < 0) { + int e = errno; + switch (e) { + case EINTR: + continue; /* retry */ + case EAGAIN: +#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN + case EWOULDBLOCK: #endif + return ret; + default: + async_bug_fd("consume_communication_pipe: read", e, fd); + } + } + } +} + +static int +check_signals_nogvl(rb_thread_t *th, int sigwait_fd) +{ + rb_vm_t *vm = GET_VM(); /* th may be 0 */ + int ret = sigwait_fd >= 0 ? consume_communication_pipe(sigwait_fd) : FALSE; + ubf_wakeup_all_threads(); + ruby_sigchld_handler(vm); + if (rb_signal_buff_size()) { + if (th == vm->ractor.main_thread) { + /* no need to lock + wakeup if already in main thread */ + RUBY_VM_SET_TRAP_INTERRUPT(th->ec); + } + else { + threadptr_trap_interrupt(vm->ractor.main_thread); + } + ret = TRUE; /* for SIGCHLD_LOSSY && rb_sigwait_sleep */ + } + return ret; } void @@ -4091,46 +4675,84 @@ rb_thread_start_timer_thread(void) rb_thread_create_timer_thread(); } -#if defined(HAVE_WORKING_FORK) static int clear_coverage_i(st_data_t key, st_data_t val, st_data_t dummy) { int i; - VALUE lines = (VALUE)val; + VALUE coverage = (VALUE)val; + VALUE lines = RARRAY_AREF(coverage, COVERAGE_INDEX_LINES); + VALUE branches = RARRAY_AREF(coverage, COVERAGE_INDEX_BRANCHES); - for (i = 0; i < RARRAY_LEN(lines); i++) { - if (RARRAY_AREF(lines, i) != Qnil) { - RARRAY_ASET(lines, i, INT2FIX(0)); + if (lines) { + if (GET_VM()->coverage_mode & COVERAGE_TARGET_ONESHOT_LINES) { + rb_ary_clear(lines); + } + else { + int i; + for (i = 0; i < RARRAY_LEN(lines); i++) { + if (RARRAY_AREF(lines, i) != Qnil) + RARRAY_ASET(lines, i, INT2FIX(0)); + } + } + } + if (branches) { + VALUE counters = RARRAY_AREF(branches, 1); + for (i = 0; i < RARRAY_LEN(counters); i++) { + RARRAY_ASET(counters, i, INT2FIX(0)); } } + return ST_CONTINUE; } -static void -clear_coverage(void) +void +rb_clear_coverages(void) { VALUE coverages = rb_get_coverages(); if (RTEST(coverages)) { - st_foreach(rb_hash_tbl_raw(coverages), clear_coverage_i, 0); + rb_hash_foreach(coverages, clear_coverage_i, 0); } } +#if defined(HAVE_WORKING_FORK) + static void rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const rb_thread_t *)) { rb_thread_t *i = 0; rb_vm_t *vm = th->vm; - vm->main_thread = th; - - gvl_atfork(th->vm); - - list_for_each(&vm->living_threads, i, vmlt_node) { - atfork(i, th); + rb_ractor_t *r = th->ractor; + vm->ractor.main_ractor = r; + vm->ractor.main_thread = th; + r->threads.main = th; + r->status_ = ractor_created; + + gvl_atfork(rb_ractor_gvl(th->ractor)); + ubf_list_atfork(); + + // OK. Only this thread accesses: + list_for_each(&vm->ractor.set, r, vmlr_node) { + list_for_each(&r->threads.set, i, lt_node) { + atfork(i, th); + } } rb_vm_living_threads_init(vm); - rb_vm_living_threads_insert(vm, th); - vm->sleeper = 0; - clear_coverage(); + + rb_ractor_atfork(vm, th); + + /* may be held by MJIT threads in parent */ + rb_native_mutex_initialize(&vm->waitpid_lock); + rb_native_mutex_initialize(&vm->workqueue_lock); + + /* may be held by any thread in parent */ + rb_native_mutex_initialize(&th->interrupt_lock); + + vm->fork_gen++; + rb_ractor_sleeper_threads_clear(th->ractor); + rb_clear_coverages(); + + VM_ASSERT(vm->ractor.blocking_cnt == 0); + VM_ASSERT(vm->ractor.cnt == 1); } static void @@ -4143,15 +4765,20 @@ terminate_atfork_i(rb_thread_t *th, const rb_thread_t *current_th) } } +void rb_fiber_atfork(rb_thread_t *); void rb_thread_atfork(void) { rb_thread_t *th = GET_THREAD(); rb_thread_atfork_internal(th, terminate_atfork_i); th->join_list = NULL; + rb_fiber_atfork(th); /* We don't want reproduce CVE-2003-0900. */ rb_reset_random_seed(); + + /* For child, starting MJIT worker thread in this place which is safer than immediately after `after_fork_ruby`. */ + mjit_child_after_fork(); } static void @@ -4193,7 +4820,7 @@ thgroup_memsize(const void *ptr) static const rb_data_type_t thgroup_data_type = { "thgroup", - {NULL, RUBY_TYPED_DEFAULT_FREE, thgroup_memsize,}, + {0, RUBY_TYPED_DEFAULT_FREE, thgroup_memsize,}, 0, 0, RUBY_TYPED_FREE_IMMEDIATELY }; @@ -4242,11 +4869,11 @@ static VALUE thgroup_list(VALUE group) { VALUE ary = rb_ary_new(); - rb_vm_t *vm = GET_THREAD()->vm; rb_thread_t *th = 0; + rb_ractor_t *r = GET_RACTOR(); - list_for_each(&vm->living_threads, th, vmlt_node) { - if (th->thgroup == group) { + list_for_each(&r->threads.set, th, lt_node) { + if (th->thgroup == group) { rb_ary_push(ary, th->self); } } @@ -4264,8 +4891,8 @@ thgroup_list(VALUE group) * New threads can still be started in an enclosed ThreadGroup. * * ThreadGroup::Default.enclose #=> #<ThreadGroup:0x4029d914> - * thr = Thread::new { Thread.stop } #=> #<Thread:0x402a7210 sleep> - * tg = ThreadGroup::new #=> #<ThreadGroup:0x402752d4> + * thr = Thread.new { Thread.stop } #=> #<Thread:0x402a7210 sleep> + * tg = ThreadGroup.new #=> #<ThreadGroup:0x402752d4> * tg.add thr * #=> ThreadError: can't move from the enclosed thread group */ @@ -4295,9 +4922,7 @@ thgroup_enclosed_p(VALUE group) struct thgroup *data; TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data); - if (data->enclosed) - return Qtrue; - return Qfalse; + return RBOOL(data->enclosed); } @@ -4330,11 +4955,9 @@ thgroup_enclosed_p(VALUE group) static VALUE thgroup_add(VALUE group, VALUE thread) { - rb_thread_t *th; + rb_thread_t *target_th = rb_thread_ptr(thread); struct thgroup *data; - GetThreadPtr(thread, th); - if (OBJ_FROZEN(group)) { rb_raise(rb_eThreadError, "can't move to the frozen thread group"); } @@ -4343,20 +4966,16 @@ thgroup_add(VALUE group, VALUE thread) rb_raise(rb_eThreadError, "can't move to the enclosed thread group"); } - if (!th->thgroup) { - return Qnil; - } - - if (OBJ_FROZEN(th->thgroup)) { + if (OBJ_FROZEN(target_th->thgroup)) { rb_raise(rb_eThreadError, "can't move from the frozen thread group"); } - TypedData_Get_Struct(th->thgroup, struct thgroup, &thgroup_data_type, data); + TypedData_Get_Struct(target_th->thgroup, struct thgroup, &thgroup_data_type, data); if (data->enclosed) { rb_raise(rb_eThreadError, "can't move from the enclosed thread group"); } - th->thgroup = group; + target_th->thgroup = group; return group; } @@ -4382,16 +5001,22 @@ thread_shield_alloc(VALUE klass) } #define GetThreadShieldPtr(obj) ((VALUE)rb_check_typeddata((obj), &thread_shield_data_type)) -#define THREAD_SHIELD_WAITING_MASK (FL_USER0|FL_USER1|FL_USER2|FL_USER3|FL_USER4|FL_USER5|FL_USER6|FL_USER7|FL_USER8|FL_USER9|FL_USER10|FL_USER11|FL_USER12|FL_USER13|FL_USER14|FL_USER15|FL_USER16|FL_USER17|FL_USER18|FL_USER19) +#define THREAD_SHIELD_WAITING_MASK (((FL_USER19-1)&~(FL_USER0-1))|FL_USER19) #define THREAD_SHIELD_WAITING_SHIFT (FL_USHIFT) -#define rb_thread_shield_waiting(b) (int)((RBASIC(b)->flags&THREAD_SHIELD_WAITING_MASK)>>THREAD_SHIELD_WAITING_SHIFT) +#define THREAD_SHIELD_WAITING_MAX (THREAD_SHIELD_WAITING_MASK>>THREAD_SHIELD_WAITING_SHIFT) +STATIC_ASSERT(THREAD_SHIELD_WAITING_MAX, THREAD_SHIELD_WAITING_MAX <= UINT_MAX); +static inline unsigned int +rb_thread_shield_waiting(VALUE b) +{ + return ((RBASIC(b)->flags&THREAD_SHIELD_WAITING_MASK)>>THREAD_SHIELD_WAITING_SHIFT); +} static inline void rb_thread_shield_waiting_inc(VALUE b) { unsigned int w = rb_thread_shield_waiting(b); w++; - if (w > (unsigned int)(THREAD_SHIELD_WAITING_MASK>>THREAD_SHIELD_WAITING_SHIFT)) + if (w > THREAD_SHIELD_WAITING_MAX) rb_raise(rb_eRuntimeError, "waiting count overflow"); RBASIC(b)->flags &= ~THREAD_SHIELD_WAITING_MASK; RBASIC(b)->flags |= ((VALUE)w << THREAD_SHIELD_WAITING_SHIFT); @@ -4415,6 +5040,17 @@ rb_thread_shield_new(void) return thread_shield; } +bool +rb_thread_shield_owned(VALUE self) +{ + VALUE mutex = GetThreadShieldPtr(self); + if (!mutex) return false; + + rb_mutex_t *m = mutex_ptr(mutex); + + return m->fiber == GET_EC()->fiber_ptr; +} + /* * Wait a thread shield. * @@ -4430,8 +5066,8 @@ rb_thread_shield_wait(VALUE self) rb_mutex_t *m; if (!mutex) return Qfalse; - GetMutexPtr(mutex, m); - if (m->th == GET_THREAD()) return Qnil; + m = mutex_ptr(mutex); + if (m->fiber == GET_EC()->fiber_ptr) return Qnil; rb_thread_shield_waiting_inc(self); rb_mutex_lock(mutex); rb_thread_shield_waiting_dec(self); @@ -4457,7 +5093,7 @@ rb_thread_shield_release(VALUE self) { VALUE mutex = thread_shield_get_mutex(self); rb_mutex_unlock(mutex); - return rb_thread_shield_waiting(self) > 0 ? Qtrue : Qfalse; + return RBOOL(rb_thread_shield_waiting(self) > 0); } /* @@ -4469,19 +5105,19 @@ rb_thread_shield_destroy(VALUE self) VALUE mutex = thread_shield_get_mutex(self); DATA_PTR(self) = 0; rb_mutex_unlock(mutex); - return rb_thread_shield_waiting(self) > 0 ? Qtrue : Qfalse; + return RBOOL(rb_thread_shield_waiting(self) > 0); } static VALUE threadptr_recursive_hash(rb_thread_t *th) { - return th->local_storage_recursive_hash; + return th->ec->local_storage_recursive_hash; } static void threadptr_recursive_hash_set(rb_thread_t *th, VALUE hash) { - th->local_storage_recursive_hash = hash; + th->ec->local_storage_recursive_hash = hash; } ID rb_frame_last_func(void); @@ -4507,29 +5143,29 @@ recursive_list_access(VALUE sym) list = rb_hash_aref(hash, sym); } if (NIL_P(list) || !RB_TYPE_P(list, T_HASH)) { - list = rb_hash_new(); + list = rb_ident_hash_new(); rb_hash_aset(hash, sym, list); } return list; } /* - * Returns Qtrue iff obj_id (or the pair <obj, paired_obj>) is already + * Returns Qtrue if and only if obj (or the pair <obj, paired_obj>) is already * in the recursion list. * Assumes the recursion list is valid. */ static VALUE -recursive_check(VALUE list, VALUE obj_id, VALUE paired_obj_id) +recursive_check(VALUE list, VALUE obj, VALUE paired_obj_id) { #if SIZEOF_LONG == SIZEOF_VOIDP #define OBJ_ID_EQL(obj_id, other) ((obj_id) == (other)) #elif SIZEOF_LONG_LONG == SIZEOF_VOIDP - #define OBJ_ID_EQL(obj_id, other) (RB_TYPE_P((obj_id), T_BIGNUM) ? \ + #define OBJ_ID_EQL(obj_id, other) (RB_BIGNUM_TYPE_P((obj_id)) ? \ rb_big_eql((obj_id), (other)) : ((obj_id) == (other))) #endif - VALUE pair_list = rb_hash_lookup2(list, obj_id, Qundef); + VALUE pair_list = rb_hash_lookup2(list, obj, Qundef); if (pair_list == Qundef) return Qfalse; if (paired_obj_id) { @@ -4546,10 +5182,10 @@ recursive_check(VALUE list, VALUE obj_id, VALUE paired_obj_id) } /* - * Pushes obj_id (or the pair <obj_id, paired_obj_id>) in the recursion list. - * For a single obj_id, it sets list[obj_id] to Qtrue. - * For a pair, it sets list[obj_id] to paired_obj_id if possible, - * otherwise list[obj_id] becomes a hash like: + * Pushes obj (or the pair <obj, paired_obj>) in the recursion list. + * For a single obj, it sets list[obj] to Qtrue. + * For a pair, it sets list[obj] to paired_obj_id if possible, + * otherwise list[obj] becomes a hash like: * {paired_obj_id_1 => true, paired_obj_id_2 => true, ... } * Assumes the recursion list is valid. */ @@ -4577,10 +5213,10 @@ recursive_push(VALUE list, VALUE obj, VALUE paired_obj) } /* - * Pops obj_id (or the pair <obj_id, paired_obj_id>) from the recursion list. - * For a pair, if list[obj_id] is a hash, then paired_obj_id is + * Pops obj (or the pair <obj, paired_obj>) from the recursion list. + * For a pair, if list[obj] is a hash, then paired_obj_id is * removed from the hash and no attempt is made to simplify - * list[obj_id] from {only_one_paired_id => true} to only_one_paired_id + * list[obj] from {only_one_paired_id => true} to only_one_paired_id * Assumes the recursion list is valid. */ @@ -4607,7 +5243,6 @@ struct exec_recursive_params { VALUE (*func) (VALUE, VALUE, int); VALUE list; VALUE obj; - VALUE objid; VALUE pairid; VALUE arg; }; @@ -4639,54 +5274,56 @@ exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE pairid, VALUE struct exec_recursive_params p; int outermost; p.list = recursive_list_access(sym); - p.objid = rb_obj_id(obj); p.obj = obj; p.pairid = pairid; p.arg = arg; outermost = outer && !recursive_check(p.list, ID2SYM(recursive_key), 0); - if (recursive_check(p.list, p.objid, pairid)) { + if (recursive_check(p.list, p.obj, pairid)) { if (outer && !outermost) { rb_throw_obj(p.list, p.list); } return (*func)(obj, arg, TRUE); } else { - int state; + enum ruby_tag_type state; p.func = func; if (outermost) { recursive_push(p.list, ID2SYM(recursive_key), 0); - recursive_push(p.list, p.objid, p.pairid); + recursive_push(p.list, p.obj, p.pairid); result = rb_catch_protect(p.list, exec_recursive_i, (VALUE)&p, &state); - if (!recursive_pop(p.list, p.objid, p.pairid)) goto invalid; + if (!recursive_pop(p.list, p.obj, p.pairid)) goto invalid; if (!recursive_pop(p.list, ID2SYM(recursive_key), 0)) goto invalid; - if (state) JUMP_TAG(state); + if (state != TAG_NONE) EC_JUMP_TAG(GET_EC(), state); if (result == p.list) { result = (*func)(obj, arg, TRUE); } } else { volatile VALUE ret = Qundef; - recursive_push(p.list, p.objid, p.pairid); - PUSH_TAG(); - if ((state = EXEC_TAG()) == 0) { + recursive_push(p.list, p.obj, p.pairid); + EC_PUSH_TAG(GET_EC()); + if ((state = EC_EXEC_TAG()) == TAG_NONE) { ret = (*func)(obj, arg, FALSE); } - POP_TAG(); - if (!recursive_pop(p.list, p.objid, p.pairid)) { - invalid: - rb_raise(rb_eTypeError, "invalid inspect_tbl pair_list " - "for %+"PRIsVALUE" in %+"PRIsVALUE, - sym, rb_thread_current()); + EC_POP_TAG(); + if (!recursive_pop(p.list, p.obj, p.pairid)) { + goto invalid; } - if (state) JUMP_TAG(state); + if (state != TAG_NONE) EC_JUMP_TAG(GET_EC(), state); result = ret; } } *(volatile struct exec_recursive_params *)&p; return result; + + invalid: + rb_raise(rb_eTypeError, "invalid inspect_tbl pair_list " + "for %+"PRIsVALUE" in %+"PRIsVALUE, + sym, rb_thread_current()); + UNREACHABLE_RETURN(Qundef); } /* @@ -4708,7 +5345,7 @@ rb_exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg) VALUE rb_exec_recursive_paired(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE paired_obj, VALUE arg) { - return exec_recursive(func, obj, rb_obj_id(paired_obj), arg, 0); + return exec_recursive(func, obj, rb_memory_id(paired_obj), arg, 0); } /* @@ -4732,12 +5369,12 @@ rb_exec_recursive_outer(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg) VALUE rb_exec_recursive_paired_outer(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE paired_obj, VALUE arg) { - return exec_recursive(func, obj, rb_obj_id(paired_obj), arg, 1); + return exec_recursive(func, obj, rb_memory_id(paired_obj), arg, 1); } /* * call-seq: - * thread.backtrace -> array + * thread.backtrace -> array or nil * * Returns the current backtrace of the target thread. * @@ -4766,6 +5403,16 @@ rb_thread_backtrace_locations_m(int argc, VALUE *argv, VALUE thval) return rb_vm_thread_backtrace_locations(argc, argv, thval); } +void +Init_Thread_Mutex(void) +{ + rb_thread_t *th = GET_THREAD(); + + rb_native_mutex_initialize(&th->vm->waitpid_lock); + rb_native_mutex_initialize(&th->vm->workqueue_lock); + rb_native_mutex_initialize(&th->interrupt_lock); +} + /* * Document-class: ThreadError * @@ -4784,31 +5431,29 @@ rb_thread_backtrace_locations_m(int argc, VALUE *argv, VALUE thval) void Init_Thread(void) { -#undef rb_intern -#define rb_intern(str) rb_intern_const(str) - VALUE cThGroup; rb_thread_t *th = GET_THREAD(); - sym_never = ID2SYM(rb_intern("never")); - sym_immediate = ID2SYM(rb_intern("immediate")); - sym_on_blocking = ID2SYM(rb_intern("on_blocking")); - id_locals = rb_intern("locals"); + sym_never = ID2SYM(rb_intern_const("never")); + sym_immediate = ID2SYM(rb_intern_const("immediate")); + sym_on_blocking = ID2SYM(rb_intern_const("on_blocking")); rb_define_singleton_method(rb_cThread, "new", thread_s_new, -1); rb_define_singleton_method(rb_cThread, "start", thread_start, -2); rb_define_singleton_method(rb_cThread, "fork", thread_start, -2); rb_define_singleton_method(rb_cThread, "main", rb_thread_s_main, 0); rb_define_singleton_method(rb_cThread, "current", thread_s_current, 0); - rb_define_singleton_method(rb_cThread, "stop", rb_thread_stop, 0); + rb_define_singleton_method(rb_cThread, "stop", thread_stop, 0); rb_define_singleton_method(rb_cThread, "kill", rb_thread_s_kill, 1); rb_define_singleton_method(rb_cThread, "exit", rb_thread_exit, 0); rb_define_singleton_method(rb_cThread, "pass", thread_s_pass, 0); - rb_define_singleton_method(rb_cThread, "list", rb_thread_list, 0); + rb_define_singleton_method(rb_cThread, "list", thread_list, 0); rb_define_singleton_method(rb_cThread, "abort_on_exception", rb_thread_s_abort_exc, 0); rb_define_singleton_method(rb_cThread, "abort_on_exception=", rb_thread_s_abort_exc_set, 1); rb_define_singleton_method(rb_cThread, "report_on_exception", rb_thread_s_report_exc, 0); rb_define_singleton_method(rb_cThread, "report_on_exception=", rb_thread_s_report_exc_set, 1); + rb_define_singleton_method(rb_cThread, "ignore_deadlock", rb_thread_s_ignore_deadlock, 0); + rb_define_singleton_method(rb_cThread, "ignore_deadlock=", rb_thread_s_ignore_deadlock_set, 1); #if THREAD_DEBUG < 0 rb_define_singleton_method(rb_cThread, "DEBUG", rb_thread_s_debug, 0); rb_define_singleton_method(rb_cThread, "DEBUG=", rb_thread_s_debug_set, 1); @@ -4828,6 +5473,7 @@ Init_Thread(void) rb_define_method(rb_cThread, "wakeup", rb_thread_wakeup, 0); rb_define_method(rb_cThread, "[]", rb_thread_aref, 1); rb_define_method(rb_cThread, "[]=", rb_thread_aset, 2); + rb_define_method(rb_cThread, "fetch", rb_thread_fetch, -1); rb_define_method(rb_cThread, "key?", rb_thread_key_p, 1); rb_define_method(rb_cThread, "keys", rb_thread_keys, 0); rb_define_method(rb_cThread, "priority", rb_thread_priority, 0); @@ -4843,16 +5489,18 @@ Init_Thread(void) rb_define_method(rb_cThread, "abort_on_exception=", rb_thread_abort_exc_set, 1); rb_define_method(rb_cThread, "report_on_exception", rb_thread_report_exc, 0); rb_define_method(rb_cThread, "report_on_exception=", rb_thread_report_exc_set, 1); - rb_define_method(rb_cThread, "safe_level", rb_thread_safe_level, 0); rb_define_method(rb_cThread, "group", rb_thread_group, 0); rb_define_method(rb_cThread, "backtrace", rb_thread_backtrace_m, -1); rb_define_method(rb_cThread, "backtrace_locations", rb_thread_backtrace_locations_m, -1); rb_define_method(rb_cThread, "name", rb_thread_getname, 0); rb_define_method(rb_cThread, "name=", rb_thread_setname, 1); - rb_define_method(rb_cThread, "inspect", rb_thread_inspect, 0); + rb_define_method(rb_cThread, "native_thread_id", rb_thread_native_thread_id, 0); + rb_define_method(rb_cThread, "to_s", rb_thread_to_s, 0); + rb_define_alias(rb_cThread, "inspect", "to_s"); - rb_vm_register_special_exception(ruby_error_stream_closed, rb_eIOError, "stream closed"); + rb_vm_register_special_exception(ruby_error_stream_closed, rb_eIOError, + "stream closed in another thread"); cThGroup = rb_define_class("ThreadGroup", rb_cObject); rb_define_alloc_func(cThGroup, thgroup_s_alloc); @@ -4862,11 +5510,10 @@ Init_Thread(void) rb_define_method(cThGroup, "add", thgroup_add, 1); { - th->thgroup = th->vm->thgroup_default = rb_obj_alloc(cThGroup); + th->thgroup = th->ractor->thgroup_default = rb_obj_alloc(cThGroup); rb_define_const(cThGroup, "Default", th->thgroup); } - recursive_key = rb_intern("__recursive_key__"); rb_eThreadError = rb_define_class("ThreadError", rb_eStandardError); /* init thread core */ @@ -4874,26 +5521,17 @@ Init_Thread(void) /* main thread setting */ { /* acquire global vm lock */ - gvl_init(th->vm); - gvl_acquire(th->vm, th); - native_mutex_initialize(&th->vm->thread_destruct_lock); - native_mutex_initialize(&th->interrupt_lock); - native_cond_initialize(&th->interrupt_cond, - RB_CONDATTR_CLOCK_MONOTONIC); + rb_global_vm_lock_t *gvl = rb_ractor_gvl(th->ractor); + gvl_acquire(gvl, th); th->pending_interrupt_queue = rb_ary_tmp_new(0); th->pending_interrupt_queue_checked = 0; th->pending_interrupt_mask_stack = rb_ary_tmp_new(0); - - th->interrupt_mask = 0; } } rb_thread_create_timer_thread(); - /* suppress warnings on cygwin, mingw and mswin.*/ - (void)native_mutex_trylock; - Init_thread_sync(); } @@ -4905,135 +5543,323 @@ ruby_native_thread_p(void) return th != 0; } -VALUE rb_vm_backtrace_str_ary(rb_thread_t *th, long lev, long n); static void -debug_deadlock_check(rb_vm_t *vm, VALUE msg) +debug_deadlock_check(rb_ractor_t *r, VALUE msg) { rb_thread_t *th = 0; VALUE sep = rb_str_new_cstr("\n "); rb_str_catf(msg, "\n%d threads, %d sleeps current:%p main thread:%p\n", - vm_living_thread_num(vm), vm->sleeper, GET_THREAD(), vm->main_thread); - list_for_each(&vm->living_threads, th, vmlt_node) { - rb_str_catf(msg, "* %+"PRIsVALUE"\n rb_thread_t:%p " - "native:%"PRI_THREAD_ID" int:%u", - th->self, th, thread_id_str(th), th->interrupt_flag); - if (th->locking_mutex) { - rb_mutex_t *mutex; - struct rb_thread_struct volatile *mth; - int waiting; - GetMutexPtr(th->locking_mutex, mutex); - - native_mutex_lock(&mutex->lock); - mth = mutex->th; - waiting = mutex->cond_waiting; - native_mutex_unlock(&mutex->lock); - rb_str_catf(msg, " mutex:%p cond:%d", mth, waiting); - } - { - rb_thread_list_t *list = th->join_list; - while (list) { - rb_str_catf(msg, "\n depended by: tb_thread_id:%p", list->th); - list = list->next; - } - } - rb_str_catf(msg, "\n "); - rb_str_concat(msg, rb_ary_join(rb_vm_backtrace_str_ary(th, 0, 0), sep)); - rb_str_catf(msg, "\n"); + rb_ractor_living_thread_num(r), rb_ractor_sleeper_thread_num(r), + (void *)GET_THREAD(), (void *)r->threads.main); + + list_for_each(&r->threads.set, th, lt_node) { + rb_str_catf(msg, "* %+"PRIsVALUE"\n rb_thread_t:%p " + "native:%"PRI_THREAD_ID" int:%u", + th->self, (void *)th, thread_id_str(th), th->ec->interrupt_flag); + + if (th->locking_mutex) { + rb_mutex_t *mutex = mutex_ptr(th->locking_mutex); + rb_str_catf(msg, " mutex:%p cond:%"PRIuSIZE, + (void *)mutex->fiber, rb_mutex_num_waiting(mutex)); + } + + { + struct rb_waiting_list *list = th->join_list; + while (list) { + rb_str_catf(msg, "\n depended by: tb_thread_id:%p", (void *)list->thread); + list = list->next; + } + } + rb_str_catf(msg, "\n "); + rb_str_concat(msg, rb_ary_join(rb_ec_backtrace_str_ary(th->ec, 0, 0), sep)); + rb_str_catf(msg, "\n"); } } static void -rb_check_deadlock(rb_vm_t *vm) +rb_check_deadlock(rb_ractor_t *r) { + if (GET_THREAD()->vm->thread_ignore_deadlock) return; + int found = 0; - rb_thread_t *th = 0; + rb_thread_t *th = NULL; + int sleeper_num = rb_ractor_sleeper_thread_num(r); + int ltnum = rb_ractor_living_thread_num(r); - if (vm_living_thread_num(vm) > vm->sleeper) return; - if (vm_living_thread_num(vm) < vm->sleeper) rb_bug("sleeper must not be more than vm_living_thread_num(vm)"); + if (ltnum > sleeper_num) return; + if (ltnum < sleeper_num) rb_bug("sleeper must not be more than vm_living_thread_num(vm)"); if (patrol_thread && patrol_thread != GET_THREAD()) return; - list_for_each(&vm->living_threads, th, vmlt_node) { - if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th)) { - found = 1; - } - else if (th->locking_mutex) { - rb_mutex_t *mutex; - GetMutexPtr(th->locking_mutex, mutex); - - native_mutex_lock(&mutex->lock); - if (mutex->th == th || (!mutex->th && mutex->cond_waiting)) { - found = 1; - } - native_mutex_unlock(&mutex->lock); - } - if (found) - break; + list_for_each(&r->threads.set, th, lt_node) { + if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th->ec)) { + found = 1; + } + else if (th->locking_mutex) { + rb_mutex_t *mutex = mutex_ptr(th->locking_mutex); + if (mutex->fiber == th->ec->fiber_ptr || (!mutex->fiber && !list_empty(&mutex->waitq))) { + found = 1; + } + } + if (found) + break; } if (!found) { VALUE argv[2]; argv[0] = rb_eFatal; argv[1] = rb_str_new2("No live threads left. Deadlock?"); - debug_deadlock_check(vm, argv[1]); - vm->sleeper--; - rb_threadptr_raise(vm->main_thread, 2, argv); + debug_deadlock_check(r, argv[1]); + rb_ractor_sleeper_threads_dec(GET_RACTOR()); + rb_threadptr_raise(r->threads.main, 2, argv); } } static void -update_coverage(rb_event_flag_t event, VALUE proc, VALUE self, ID id, VALUE klass) +update_line_coverage(VALUE data, const rb_trace_arg_t *trace_arg) { - VALUE coverage = rb_iseq_coverage(GET_THREAD()->cfp->iseq); + const rb_control_frame_t *cfp = GET_EC()->cfp; + VALUE coverage = rb_iseq_coverage(cfp->iseq); if (RB_TYPE_P(coverage, T_ARRAY) && !RBASIC_CLASS(coverage)) { - long line = rb_sourceline() - 1; - long count; - VALUE num; - if (line >= RARRAY_LEN(coverage)) { /* no longer tracked */ - return; + VALUE lines = RARRAY_AREF(coverage, COVERAGE_INDEX_LINES); + if (lines) { + long line = rb_sourceline() - 1; + long count; + VALUE num; + void rb_iseq_clear_event_flags(const rb_iseq_t *iseq, size_t pos, rb_event_flag_t reset); + if (GET_VM()->coverage_mode & COVERAGE_TARGET_ONESHOT_LINES) { + rb_iseq_clear_event_flags(cfp->iseq, cfp->pc - cfp->iseq->body->iseq_encoded - 1, RUBY_EVENT_COVERAGE_LINE); + rb_ary_push(lines, LONG2FIX(line + 1)); + return; + } + if (line >= RARRAY_LEN(lines)) { /* no longer tracked */ + return; + } + num = RARRAY_AREF(lines, line); + if (!FIXNUM_P(num)) return; + count = FIX2LONG(num) + 1; + if (POSFIXABLE(count)) { + RARRAY_ASET(lines, line, LONG2FIX(count)); + } } - num = RARRAY_AREF(coverage, line); - if (!FIXNUM_P(num)) return; - count = FIX2LONG(num) + 1; - if (POSFIXABLE(count)) { - RARRAY_ASET(coverage, line, LONG2FIX(count)); + } +} + +static void +update_branch_coverage(VALUE data, const rb_trace_arg_t *trace_arg) +{ + const rb_control_frame_t *cfp = GET_EC()->cfp; + VALUE coverage = rb_iseq_coverage(cfp->iseq); + if (RB_TYPE_P(coverage, T_ARRAY) && !RBASIC_CLASS(coverage)) { + VALUE branches = RARRAY_AREF(coverage, COVERAGE_INDEX_BRANCHES); + if (branches) { + long pc = cfp->pc - cfp->iseq->body->iseq_encoded - 1; + long idx = FIX2INT(RARRAY_AREF(ISEQ_PC2BRANCHINDEX(cfp->iseq), pc)), count; + VALUE counters = RARRAY_AREF(branches, 1); + VALUE num = RARRAY_AREF(counters, idx); + count = FIX2LONG(num) + 1; + if (POSFIXABLE(count)) { + RARRAY_ASET(counters, idx, LONG2FIX(count)); + } } } } +const rb_method_entry_t * +rb_resolve_me_location(const rb_method_entry_t *me, VALUE resolved_location[5]) +{ + VALUE path, beg_pos_lineno, beg_pos_column, end_pos_lineno, end_pos_column; + + if (!me->def) return NULL; // negative cme + + retry: + switch (me->def->type) { + case VM_METHOD_TYPE_ISEQ: { + const rb_iseq_t *iseq = me->def->body.iseq.iseqptr; + rb_iseq_location_t *loc = &iseq->body->location; + path = rb_iseq_path(iseq); + beg_pos_lineno = INT2FIX(loc->code_location.beg_pos.lineno); + beg_pos_column = INT2FIX(loc->code_location.beg_pos.column); + end_pos_lineno = INT2FIX(loc->code_location.end_pos.lineno); + end_pos_column = INT2FIX(loc->code_location.end_pos.column); + break; + } + case VM_METHOD_TYPE_BMETHOD: { + const rb_iseq_t *iseq = rb_proc_get_iseq(me->def->body.bmethod.proc, 0); + if (iseq) { + rb_iseq_location_t *loc; + rb_iseq_check(iseq); + path = rb_iseq_path(iseq); + loc = &iseq->body->location; + beg_pos_lineno = INT2FIX(loc->code_location.beg_pos.lineno); + beg_pos_column = INT2FIX(loc->code_location.beg_pos.column); + end_pos_lineno = INT2FIX(loc->code_location.end_pos.lineno); + end_pos_column = INT2FIX(loc->code_location.end_pos.column); + break; + } + return NULL; + } + case VM_METHOD_TYPE_ALIAS: + me = me->def->body.alias.original_me; + goto retry; + case VM_METHOD_TYPE_REFINED: + me = me->def->body.refined.orig_me; + if (!me) return NULL; + goto retry; + default: + return NULL; + } + + /* found */ + if (RB_TYPE_P(path, T_ARRAY)) { + path = rb_ary_entry(path, 1); + if (!RB_TYPE_P(path, T_STRING)) return NULL; /* just for the case... */ + } + if (resolved_location) { + resolved_location[0] = path; + resolved_location[1] = beg_pos_lineno; + resolved_location[2] = beg_pos_column; + resolved_location[3] = end_pos_lineno; + resolved_location[4] = end_pos_column; + } + return me; +} + +static void +update_method_coverage(VALUE me2counter, rb_trace_arg_t *trace_arg) +{ + const rb_control_frame_t *cfp = GET_EC()->cfp; + const rb_callable_method_entry_t *cme = rb_vm_frame_method_entry(cfp); + const rb_method_entry_t *me = (const rb_method_entry_t *)cme; + VALUE rcount; + long count; + + me = rb_resolve_me_location(me, 0); + if (!me) return; + + rcount = rb_hash_aref(me2counter, (VALUE) me); + count = FIXNUM_P(rcount) ? FIX2LONG(rcount) + 1 : 1; + if (POSFIXABLE(count)) { + rb_hash_aset(me2counter, (VALUE) me, LONG2FIX(count)); + } +} + VALUE rb_get_coverages(void) { return GET_VM()->coverages; } +int +rb_get_coverage_mode(void) +{ + return GET_VM()->coverage_mode; +} + void -rb_set_coverages(VALUE coverages) +rb_set_coverages(VALUE coverages, int mode, VALUE me2counter) { GET_VM()->coverages = coverages; - rb_add_event_hook(update_coverage, RUBY_EVENT_COVERAGE, Qnil); + GET_VM()->me2counter = me2counter; + GET_VM()->coverage_mode = mode; } -/* Make coverage arrays empty so old covered files are no longer tracked. */ -static int -reset_coverage_i(st_data_t key, st_data_t val, st_data_t dummy) +void +rb_resume_coverages(void) { - VALUE coverage = (VALUE)val; - rb_ary_clear(coverage); - return ST_CONTINUE; + int mode = GET_VM()->coverage_mode; + VALUE me2counter = GET_VM()->me2counter; + rb_add_event_hook2((rb_event_hook_func_t) update_line_coverage, RUBY_EVENT_COVERAGE_LINE, Qnil, RUBY_EVENT_HOOK_FLAG_SAFE | RUBY_EVENT_HOOK_FLAG_RAW_ARG); + if (mode & COVERAGE_TARGET_BRANCHES) { + rb_add_event_hook2((rb_event_hook_func_t) update_branch_coverage, RUBY_EVENT_COVERAGE_BRANCH, Qnil, RUBY_EVENT_HOOK_FLAG_SAFE | RUBY_EVENT_HOOK_FLAG_RAW_ARG); + } + if (mode & COVERAGE_TARGET_METHODS) { + rb_add_event_hook2((rb_event_hook_func_t) update_method_coverage, RUBY_EVENT_CALL, me2counter, RUBY_EVENT_HOOK_FLAG_SAFE | RUBY_EVENT_HOOK_FLAG_RAW_ARG); + } } void +rb_suspend_coverages(void) +{ + rb_remove_event_hook((rb_event_hook_func_t) update_line_coverage); + if (GET_VM()->coverage_mode & COVERAGE_TARGET_BRANCHES) { + rb_remove_event_hook((rb_event_hook_func_t) update_branch_coverage); + } + if (GET_VM()->coverage_mode & COVERAGE_TARGET_METHODS) { + rb_remove_event_hook((rb_event_hook_func_t) update_method_coverage); + } +} + +/* Make coverage arrays empty so old covered files are no longer tracked. */ +void rb_reset_coverages(void) { - VALUE coverages = rb_get_coverages(); - st_foreach(rb_hash_tbl_raw(coverages), reset_coverage_i, 0); + rb_clear_coverages(); + rb_iseq_remove_coverage_all(); GET_VM()->coverages = Qfalse; - rb_remove_event_hook(update_coverage); } VALUE -rb_uninterruptible(VALUE (*b_proc)(ANYARGS), VALUE data) +rb_default_coverage(int n) +{ + VALUE coverage = rb_ary_tmp_new_fill(3); + VALUE lines = Qfalse, branches = Qfalse; + int mode = GET_VM()->coverage_mode; + + if (mode & COVERAGE_TARGET_LINES) { + lines = n > 0 ? rb_ary_tmp_new_fill(n) : rb_ary_tmp_new(0); + } + RARRAY_ASET(coverage, COVERAGE_INDEX_LINES, lines); + + if (mode & COVERAGE_TARGET_BRANCHES) { + branches = rb_ary_tmp_new_fill(2); + /* internal data structures for branch coverage: + * + * { branch base node => + * [base_type, base_first_lineno, base_first_column, base_last_lineno, base_last_column, { + * branch target id => + * [target_type, target_first_lineno, target_first_column, target_last_lineno, target_last_column, target_counter_index], + * ... + * }], + * ... + * } + * + * Example: + * { NODE_CASE => + * [1, 0, 4, 3, { + * NODE_WHEN => [2, 8, 2, 9, 0], + * NODE_WHEN => [3, 8, 3, 9, 1], + * ... + * }], + * ... + * } + */ + VALUE structure = rb_hash_new(); + rb_obj_hide(structure); + RARRAY_ASET(branches, 0, structure); + /* branch execution counters */ + RARRAY_ASET(branches, 1, rb_ary_tmp_new(0)); + } + RARRAY_ASET(coverage, COVERAGE_INDEX_BRANCHES, branches); + + return coverage; +} + +static VALUE +uninterruptible_exit(VALUE v) +{ + rb_thread_t *cur_th = GET_THREAD(); + rb_ary_pop(cur_th->pending_interrupt_mask_stack); + + cur_th->pending_interrupt_queue_checked = 0; + if (!rb_threadptr_pending_interrupt_empty_p(cur_th)) { + RUBY_VM_SET_INTERRUPT(cur_th->ec); + } + return Qnil; +} + +VALUE +rb_uninterruptible(VALUE (*b_proc)(VALUE), VALUE data) { VALUE interrupt_mask = rb_ident_hash_new(); rb_thread_t *cur_th = GET_THREAD(); @@ -5042,28 +5868,8 @@ rb_uninterruptible(VALUE (*b_proc)(ANYARGS), VALUE data) OBJ_FREEZE_RAW(interrupt_mask); rb_ary_push(cur_th->pending_interrupt_mask_stack, interrupt_mask); - return rb_ensure(b_proc, data, rb_ary_pop, cur_th->pending_interrupt_mask_stack); -} + VALUE ret = rb_ensure(b_proc, data, uninterruptible_exit, Qnil); -void -ruby_kill(rb_pid_t pid, int sig) -{ - int err; - rb_thread_t *th = GET_THREAD(); - - /* - * When target pid is self, many caller assume signal will be - * delivered immediately and synchronously. - */ - { - GVL_UNLOCK_BEGIN(); - native_mutex_lock(&th->interrupt_lock); - err = kill(pid, sig); - native_cond_wait(&th->interrupt_cond, &th->interrupt_lock); - native_mutex_unlock(&th->interrupt_lock); - GVL_UNLOCK_END(); - } - if (err < 0) { - rb_sys_fail(0); - } + RUBY_VM_CHECK_INTS(cur_th->ec); + return ret; } |
