summaryrefslogtreecommitdiff
path: root/thread.c
diff options
context:
space:
mode:
Diffstat (limited to 'thread.c')
-rw-r--r--thread.c3402
1 files changed, 2104 insertions, 1298 deletions
diff --git a/thread.c b/thread.c
index d74c731a82..b1b5dd0f53 100644
--- a/thread.c
+++ b/thread.c
@@ -20,6 +20,12 @@
model 3: Native Thread with fine grain lock
Using pthread and Ruby threads run concurrent or parallel.
+ model 4: M:N User:Native threads with Global VM lock
+ Combination of model 1 and 2
+
+ model 5: M:N User:Native thread with fine grain lock
+ Combination of model 1 and 3
+
------------------------------------------------------------------------
model 2:
@@ -57,13 +63,40 @@
/* for model 2 */
+#include "ruby/internal/config.h"
+
+#ifdef __linux__
+// Normally, gcc(1) translates calls to alloca() with inlined code. This is not done when either the -ansi, -std=c89, -std=c99, or the -std=c11 option is given and the header <alloca.h> is not included.
+# include <alloca.h>
+#endif
+
#include "eval_intern.h"
#include "gc.h"
-#include "timev.h"
+#include "hrtime.h"
+#include "internal.h"
+#include "internal/class.h"
+#include "internal/cont.h"
+#include "internal/error.h"
+#include "internal/hash.h"
+#include "internal/io.h"
+#include "internal/object.h"
+#include "internal/proc.h"
+#include "ruby/fiber/scheduler.h"
+#include "internal/signal.h"
+#include "internal/thread.h"
+#include "internal/time.h"
+#include "internal/warnings.h"
+#include "iseq.h"
+#include "mjit.h"
+#include "ruby/debug.h"
#include "ruby/io.h"
#include "ruby/thread.h"
#include "ruby/thread_native.h"
-#include "internal.h"
+#include "timev.h"
+#include "vm_core.h"
+#include "ractor_core.h"
+#include "vm_debug.h"
+#include "vm_sync.h"
#ifndef USE_NATIVE_THREAD_PRIORITY
#define USE_NATIVE_THREAD_PRIORITY 0
@@ -80,16 +113,37 @@ static VALUE rb_cThreadShield;
static VALUE sym_immediate;
static VALUE sym_on_blocking;
static VALUE sym_never;
-static ID id_locals;
-static void sleep_timeval(rb_thread_t *th, struct timeval time, int spurious_check);
-static void sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec, int spurious_check);
-static void sleep_forever(rb_thread_t *th, int nodeadlock, int spurious_check);
-static void rb_thread_sleep_deadly_allow_spurious_wakeup(void);
-static double timeofday(void);
+enum SLEEP_FLAGS {
+ SLEEP_DEADLOCKABLE = 0x1,
+ SLEEP_SPURIOUS_CHECK = 0x2
+};
+
+#define THREAD_LOCAL_STORAGE_INITIALISED FL_USER13
+#define THREAD_LOCAL_STORAGE_INITIALISED_P(th) RB_FL_TEST_RAW((th), THREAD_LOCAL_STORAGE_INITIALISED)
+
+static inline VALUE
+rb_thread_local_storage(VALUE thread)
+{
+ if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) {
+ rb_ivar_set(thread, idLocals, rb_hash_new());
+ RB_FL_SET_RAW(thread, THREAD_LOCAL_STORAGE_INITIALISED);
+ }
+ return rb_ivar_get(thread, idLocals);
+}
+
+static int sleep_hrtime(rb_thread_t *, rb_hrtime_t, unsigned int fl);
+static void sleep_forever(rb_thread_t *th, unsigned int fl);
+static void rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker);
static int rb_threadptr_dead(rb_thread_t *th);
-static void rb_check_deadlock(rb_vm_t *vm);
-static int rb_threadptr_pending_interrupt_empty_p(rb_thread_t *th);
+static void rb_check_deadlock(rb_ractor_t *r);
+static int rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th);
+static const char *thread_status_name(rb_thread_t *th, int detail);
+static int hrtime_update_expire(rb_hrtime_t *, const rb_hrtime_t);
+NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
+static int consume_communication_pipe(int fd);
+static int check_signals_nogvl(rb_thread_t *, int sigwait_fd);
+void rb_sigwait_fd_migrate(rb_vm_t *); /* process.c */
#define eKillSignal INT2FIX(0)
#define eTerminateSignal INT2FIX(1)
@@ -101,51 +155,28 @@ struct waiting_fd {
int fd;
};
-inline static void
-st_delete_wrap(st_table *table, st_data_t key)
-{
- st_delete(table, &key, 0);
-}
-
/********************************************************************************/
#define THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION
struct rb_blocking_region_buffer {
enum rb_thread_status prev_status;
- struct rb_unblock_callback oldubf;
};
-static int set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg,
- struct rb_unblock_callback *old, int fail_if_interrupted);
-static void reset_unblock_function(rb_thread_t *th, const struct rb_unblock_callback *old);
+static int unblock_function_set(rb_thread_t *th, rb_unblock_function_t *func, void *arg, int fail_if_interrupted);
+static void unblock_function_clear(rb_thread_t *th);
static inline int blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region,
rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted);
static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region);
-#ifdef __ia64
-#define RB_GC_SAVE_MACHINE_REGISTER_STACK(th) \
- do{(th)->machine.register_stack_end = rb_ia64_bsp();}while(0)
-#else
-#define RB_GC_SAVE_MACHINE_REGISTER_STACK(th)
-#endif
-#define RB_GC_SAVE_MACHINE_CONTEXT(th) \
- do { \
- FLUSH_REGISTER_WINDOWS; \
- RB_GC_SAVE_MACHINE_REGISTER_STACK(th); \
- setjmp((th)->machine.regs); \
- SET_MACHINE_STACK_END(&(th)->machine.stack_end); \
- } while (0)
-
-#define GVL_UNLOCK_BEGIN() do { \
- rb_thread_t *_th_stored = GET_THREAD(); \
- RB_GC_SAVE_MACHINE_CONTEXT(_th_stored); \
- gvl_release(_th_stored->vm);
+#define GVL_UNLOCK_BEGIN(th) do { \
+ RB_GC_SAVE_MACHINE_CONTEXT(th); \
+ gvl_release(rb_ractor_gvl(th->ractor));
-#define GVL_UNLOCK_END() \
- gvl_acquire(_th_stored->vm, _th_stored); \
- rb_thread_set_current(_th_stored); \
+#define GVL_UNLOCK_END(th) \
+ gvl_acquire(rb_ractor_gvl(th->ractor), th); \
+ rb_ractor_thread_switch(th->ractor, th); \
} while(0)
#ifdef __GNUC__
@@ -157,42 +188,76 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio
#else
#define only_if_constant(expr, notconst) notconst
#endif
-#define BLOCKING_REGION(exec, ubf, ubfarg, fail_if_interrupted) do { \
- rb_thread_t *__th = GET_THREAD(); \
+#define BLOCKING_REGION(th, exec, ubf, ubfarg, fail_if_interrupted) do { \
struct rb_blocking_region_buffer __region; \
- if (blocking_region_begin(__th, &__region, (ubf), (ubfarg), fail_if_interrupted) || \
+ if (blocking_region_begin(th, &__region, (ubf), (ubfarg), fail_if_interrupted) || \
/* always return true unless fail_if_interrupted */ \
!only_if_constant(fail_if_interrupted, TRUE)) { \
exec; \
- blocking_region_end(__th, &__region); \
+ blocking_region_end(th, &__region); \
}; \
} while(0)
-#define RUBY_VM_CHECK_INTS_BLOCKING(th) vm_check_ints_blocking(th)
-static inline void
-vm_check_ints_blocking(rb_thread_t *th)
+/*
+ * returns true if this thread was spuriously interrupted, false otherwise
+ * (e.g. hit by Thread#run or ran a Ruby-level Signal.trap handler)
+ */
+#define RUBY_VM_CHECK_INTS_BLOCKING(ec) vm_check_ints_blocking(ec)
+static inline int
+vm_check_ints_blocking(rb_execution_context_t *ec)
{
+ rb_thread_t *th = rb_ec_thread_ptr(ec);
+
if (LIKELY(rb_threadptr_pending_interrupt_empty_p(th))) {
- if (LIKELY(!RUBY_VM_INTERRUPTED_ANY(th))) return;
+ if (LIKELY(!RUBY_VM_INTERRUPTED_ANY(ec))) return FALSE;
}
else {
th->pending_interrupt_queue_checked = 0;
-
- RUBY_VM_SET_INTERRUPT(th);
+ RUBY_VM_SET_INTERRUPT(ec);
}
- rb_threadptr_execute_interrupts(th, 1);
+ return rb_threadptr_execute_interrupts(th, 1);
}
-static int
-vm_living_thread_num(rb_vm_t *vm)
+int
+rb_vm_check_ints_blocking(rb_execution_context_t *ec)
{
- return (int)vm->living_thread_num;
+ return vm_check_ints_blocking(ec);
+}
+
+/*
+ * poll() is supported by many OSes, but so far Linux is the only
+ * one we know of that supports using poll() in all places select()
+ * would work.
+ */
+#if defined(HAVE_POLL)
+# if defined(__linux__)
+# define USE_POLL
+# endif
+# if defined(__FreeBSD_version) && __FreeBSD_version >= 1100000
+# define USE_POLL
+ /* FreeBSD does not set POLLOUT when POLLHUP happens */
+# define POLLERR_SET (POLLHUP | POLLERR)
+# endif
+#endif
+
+static void
+timeout_prepare(rb_hrtime_t **to, rb_hrtime_t *rel, rb_hrtime_t *end,
+ const struct timeval *timeout)
+{
+ if (timeout) {
+ *rel = rb_timeval2hrtime(timeout);
+ *end = rb_hrtime_add(rb_hrtime_now(), *rel);
+ *to = rel;
+ }
+ else {
+ *to = 0;
+ }
}
#if THREAD_DEBUG
#ifdef HAVE_VA_ARGS_MACRO
void rb_thread_debug(const char *file, int line, const char *fmt, ...);
-#define thread_debug(fmt, ...) rb_thread_debug(__FILE__, __LINE__, fmt, ##__VA_ARGS__)
+#define thread_debug(...) rb_thread_debug(__FILE__, __LINE__, __VA_ARGS__)
#define POSITION_FORMAT "%s:%d:"
#define POSITION_ARGS ,file, line
#else
@@ -203,8 +268,9 @@ void rb_thread_debug(const char *fmt, ...);
#endif
# ifdef NON_SCALAR_THREAD_ID
-static const char *
-fill_thread_id_string(rb_nativethread_id_t thid, rb_thread_id_string_t buf)
+#define fill_thread_id_string ruby_fill_thread_id_string
+const char *
+ruby_fill_thread_id_string(rb_nativethread_id_t thid, rb_thread_id_string_t buf)
{
extern const char ruby_digitmap[];
size_t i;
@@ -241,7 +307,7 @@ static int rb_thread_debug_enabled;
*/
static VALUE
-rb_thread_s_debug(void)
+rb_thread_s_debug(VALUE _)
{
return INT2NUM(rb_thread_debug_enabled);
}
@@ -268,21 +334,24 @@ rb_thread_s_debug_set(VALUE self, VALUE val)
#endif
#ifndef fill_thread_id_str
-# define fill_thread_id_string(thid, buf) (thid)
+# define fill_thread_id_string(thid, buf) ((void *)(uintptr_t)(thid))
# define fill_thread_id_str(th) (void)0
-# define thread_id_str(th) ((void *)(th)->thread_id)
+# define thread_id_str(th) ((void *)(uintptr_t)(th)->thread_id)
# define PRI_THREAD_ID "p"
#endif
-#ifndef __ia64
-#define thread_start_func_2(th, st, rst) thread_start_func_2(th, st)
-#endif
-NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start,
- VALUE *register_stack_start));
-static void timer_thread_function(void *);
+NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start));
+void ruby_sigchld_handler(rb_vm_t *); /* signal.c */
+
+static void
+ubf_sigwait(void *ignore)
+{
+ rb_thread_wakeup_timer_thread(0);
+}
+
+#include THREAD_IMPL_SRC
#if defined(_WIN32)
-#include "thread_win32.c"
#define DEBUG_OUT() \
WaitForSingleObject(&debug_mutex, INFINITE); \
@@ -291,7 +360,6 @@ static void timer_thread_function(void *);
ReleaseMutex(&debug_mutex);
#elif defined(HAVE_PTHREAD_H)
-#include "thread_pthread.c"
#define DEBUG_OUT() \
pthread_mutex_lock(&debug_mutex); \
@@ -300,8 +368,19 @@ static void timer_thread_function(void *);
fflush(stdout); \
pthread_mutex_unlock(&debug_mutex);
-#else
-#error "unsupported thread type"
+#endif
+
+/*
+ * TODO: somebody with win32 knowledge should be able to get rid of
+ * timer-thread by busy-waiting on signals. And it should be possible
+ * to make the GVL in thread_pthread.c be platform-independent.
+ */
+#ifndef BUSY_WAIT_SIGNALS
+# define BUSY_WAIT_SIGNALS (0)
+#endif
+
+#ifndef USE_EVENTFD
+# define USE_EVENTFD (0)
#endif
#if THREAD_DEBUG
@@ -325,7 +404,7 @@ rb_thread_debug(
if (debug_mutex_initialized == 1) {
debug_mutex_initialized = 0;
- native_mutex_initialize(&debug_mutex);
+ rb_native_mutex_initialize(&debug_mutex);
}
va_start(args, fmt);
@@ -339,87 +418,88 @@ rb_thread_debug(
#include "thread_sync.c"
void
-rb_vm_gvl_destroy(rb_vm_t *vm)
+rb_vm_gvl_destroy(rb_global_vm_lock_t *gvl)
{
- gvl_release(vm);
- gvl_destroy(vm);
- native_mutex_destroy(&vm->thread_destruct_lock);
+ gvl_release(gvl);
+ gvl_destroy(gvl);
}
void
rb_nativethread_lock_initialize(rb_nativethread_lock_t *lock)
{
- native_mutex_initialize(lock);
+ rb_native_mutex_initialize(lock);
}
void
rb_nativethread_lock_destroy(rb_nativethread_lock_t *lock)
{
- native_mutex_destroy(lock);
+ rb_native_mutex_destroy(lock);
}
void
rb_nativethread_lock_lock(rb_nativethread_lock_t *lock)
{
- native_mutex_lock(lock);
+ rb_native_mutex_lock(lock);
}
void
rb_nativethread_lock_unlock(rb_nativethread_lock_t *lock)
{
- native_mutex_unlock(lock);
+ rb_native_mutex_unlock(lock);
}
static int
-set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg,
- struct rb_unblock_callback *old, int fail_if_interrupted)
+unblock_function_set(rb_thread_t *th, rb_unblock_function_t *func, void *arg, int fail_if_interrupted)
{
do {
if (fail_if_interrupted) {
- if (RUBY_VM_INTERRUPTED_ANY(th)) {
+ if (RUBY_VM_INTERRUPTED_ANY(th->ec)) {
return FALSE;
}
}
else {
- RUBY_VM_CHECK_INTS(th);
+ RUBY_VM_CHECK_INTS(th->ec);
}
- native_mutex_lock(&th->interrupt_lock);
- } while (!th->raised_flag && RUBY_VM_INTERRUPTED_ANY(th) &&
- (native_mutex_unlock(&th->interrupt_lock), TRUE));
+ rb_native_mutex_lock(&th->interrupt_lock);
+ } while (!th->ec->raised_flag && RUBY_VM_INTERRUPTED_ANY(th->ec) &&
+ (rb_native_mutex_unlock(&th->interrupt_lock), TRUE));
+
+ VM_ASSERT(th->unblock.func == NULL);
- if (old) *old = th->unblock;
th->unblock.func = func;
th->unblock.arg = arg;
- native_mutex_unlock(&th->interrupt_lock);
+ rb_native_mutex_unlock(&th->interrupt_lock);
return TRUE;
}
static void
-reset_unblock_function(rb_thread_t *th, const struct rb_unblock_callback *old)
+unblock_function_clear(rb_thread_t *th)
{
- native_mutex_lock(&th->interrupt_lock);
- th->unblock = *old;
- native_mutex_unlock(&th->interrupt_lock);
+ rb_native_mutex_lock(&th->interrupt_lock);
+ th->unblock.func = 0;
+ rb_native_mutex_unlock(&th->interrupt_lock);
}
static void
rb_threadptr_interrupt_common(rb_thread_t *th, int trap)
{
- native_mutex_lock(&th->interrupt_lock);
- if (trap)
- RUBY_VM_SET_TRAP_INTERRUPT(th);
- else
- RUBY_VM_SET_INTERRUPT(th);
- if (th->unblock.func) {
+ rb_native_mutex_lock(&th->interrupt_lock);
+
+ if (trap) {
+ RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
+ }
+ else {
+ RUBY_VM_SET_INTERRUPT(th->ec);
+ }
+ if (th->unblock.func != NULL) {
(th->unblock.func)(th->unblock.arg);
}
else {
/* none */
}
- native_cond_signal(&th->interrupt_cond);
- native_mutex_unlock(&th->interrupt_lock);
+ rb_native_mutex_unlock(&th->interrupt_lock);
}
void
@@ -428,75 +508,104 @@ rb_threadptr_interrupt(rb_thread_t *th)
rb_threadptr_interrupt_common(th, 0);
}
-void
-rb_threadptr_trap_interrupt(rb_thread_t *th)
+static void
+threadptr_trap_interrupt(rb_thread_t *th)
{
rb_threadptr_interrupt_common(th, 1);
}
static void
-terminate_all(rb_vm_t *vm, const rb_thread_t *main_thread)
+terminate_all(rb_ractor_t *r, const rb_thread_t *main_thread)
{
rb_thread_t *th = 0;
- list_for_each(&vm->living_threads, th, vmlt_node) {
- if (th != main_thread) {
- thread_debug("terminate_i: %p\n", (void *)th);
+ list_for_each(&r->threads.set, th, lt_node) {
+ if (th != main_thread) {
+ thread_debug("terminate_all: begin (thid: %"PRI_THREAD_ID", status: %s)\n",
+ thread_id_str(th), thread_status_name(th, TRUE));
rb_threadptr_pending_interrupt_enque(th, eTerminateSignal);
rb_threadptr_interrupt(th);
+ thread_debug("terminate_all: end (thid: %"PRI_THREAD_ID", status: %s)\n",
+ thread_id_str(th), thread_status_name(th, TRUE));
}
else {
- thread_debug("terminate_i: main thread (%p)\n", (void *)th);
+ thread_debug("terminate_all: main thread (%p)\n", (void *)th);
}
}
}
+static void
+rb_threadptr_join_list_wakeup(rb_thread_t *thread)
+{
+ while (thread->join_list) {
+ struct rb_waiting_list *join_list = thread->join_list;
+
+ // Consume the entry from the join list:
+ thread->join_list = join_list->next;
+
+ rb_thread_t *target_thread = join_list->thread;
+
+ if (target_thread->scheduler != Qnil && rb_fiberptr_blocking(join_list->fiber) == 0) {
+ rb_fiber_scheduler_unblock(target_thread->scheduler, target_thread->self, rb_fiberptr_self(join_list->fiber));
+ }
+ else {
+ rb_threadptr_interrupt(target_thread);
+
+ switch (target_thread->status) {
+ case THREAD_STOPPED:
+ case THREAD_STOPPED_FOREVER:
+ target_thread->status = THREAD_RUNNABLE;
+ default:
+ break;
+ }
+ }
+ }
+}
+
void
rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th)
{
- const char *err;
- rb_mutex_t *mutex;
- rb_mutex_t *mutexes = th->keeping_mutexes;
+ while (th->keeping_mutexes) {
+ rb_mutex_t *mutex = th->keeping_mutexes;
+ th->keeping_mutexes = mutex->next_mutex;
- while (mutexes) {
- mutex = mutexes;
- /* rb_warn("mutex #<%p> remains to be locked by terminated thread",
- mutexes); */
- mutexes = mutex->next_mutex;
- err = rb_mutex_unlock_th(mutex, th);
- if (err) rb_bug("invalid keeping_mutexes: %s", err);
+ /* rb_warn("mutex #<%p> remains to be locked by terminated thread", (void *)mutexes); */
+
+ const char *error_message = rb_mutex_unlock_th(mutex, th, mutex->fiber);
+ if (error_message) rb_bug("invalid keeping_mutexes: %s", error_message);
}
}
void
-rb_thread_terminate_all(void)
+rb_thread_terminate_all(rb_thread_t *th)
{
- rb_thread_t *volatile th = GET_THREAD(); /* main thread */
- rb_vm_t *volatile vm = th->vm;
+ rb_ractor_t *cr = th->ractor;
+ rb_execution_context_t * volatile ec = th->ec;
volatile int sleeping = 0;
- if (vm->main_thread != th) {
- rb_bug("rb_thread_terminate_all: called by child thread (%p, %p)",
- (void *)vm->main_thread, (void *)th);
+ if (cr->threads.main != th) {
+ rb_bug("rb_thread_terminate_all: called by child thread (%p, %p)",
+ (void *)cr->threads.main, (void *)th);
}
/* unlock all locking mutexes */
rb_threadptr_unlock_all_locking_mutexes(th);
- TH_PUSH_TAG(th);
- if (TH_EXEC_TAG() == 0) {
+ EC_PUSH_TAG(ec);
+ if (EC_EXEC_TAG() == TAG_NONE) {
retry:
thread_debug("rb_thread_terminate_all (main thread: %p)\n", (void *)th);
- terminate_all(vm, th);
+ terminate_all(cr, th);
- while (vm_living_thread_num(vm) > 1) {
- /*
+ while (rb_ractor_living_thread_num(cr) > 1) {
+ rb_hrtime_t rel = RB_HRTIME_PER_SEC;
+ /*q
* Thread exiting routine in thread_start_func_2 notify
* me when the last sub-thread exit.
*/
sleeping = 1;
- native_sleep(th, 0);
- RUBY_VM_CHECK_INTS_BLOCKING(th);
+ native_sleep(th, &rel);
+ RUBY_VM_CHECK_INTS_BLOCKING(ec);
sleeping = 0;
}
}
@@ -511,18 +620,21 @@ rb_thread_terminate_all(void)
goto retry;
}
}
- TH_POP_TAG();
+ EC_POP_TAG();
}
+void rb_threadptr_root_fiber_terminate(rb_thread_t *th);
+
static void
thread_cleanup_func_before_exec(void *th_ptr)
{
rb_thread_t *th = th_ptr;
th->status = THREAD_KILLED;
- th->machine.stack_start = th->machine.stack_end = 0;
-#ifdef __ia64
- th->machine.register_stack_start = th->machine.register_stack_end = 0;
-#endif
+
+ // The thread stack doesn't exist in the forked process:
+ th->ec->machine.stack_start = th->ec->machine.stack_end = NULL;
+
+ rb_threadptr_root_fiber_terminate(th);
}
static void
@@ -537,16 +649,20 @@ thread_cleanup_func(void *th_ptr, int atfork)
* Unfortunately, we can't release native threading resource at fork
* because libc may have unstable locking state therefore touching
* a threading resource may cause a deadlock.
+ *
+ * FIXME: Skipping native_mutex_destroy(pthread_mutex_destroy) is safe
+ * with NPTL, but native_thread_destroy calls pthread_cond_destroy
+ * which calls free(3), so there is a small memory leak atfork, here.
*/
if (atfork)
return;
- native_mutex_destroy(&th->interrupt_lock);
+ rb_native_mutex_destroy(&th->interrupt_lock);
native_thread_destroy(th);
}
static VALUE rb_threadptr_raise(rb_thread_t *, int, VALUE *);
-static VALUE rb_thread_inspect(VALUE thread);
+static VALUE rb_thread_to_s(VALUE thread);
void
ruby_thread_init_stack(rb_thread_t *th)
@@ -567,157 +683,305 @@ rb_vm_proc_local_ep(VALUE proc)
}
}
+// for ractor, defined in vm.c
+VALUE rb_vm_invoke_proc_with_self(rb_execution_context_t *ec, rb_proc_t *proc, VALUE self,
+ int argc, const VALUE *argv, int kw_splat, VALUE passed_block_handler);
+
+static VALUE
+thread_do_start_proc(rb_thread_t *th)
+{
+ VALUE args = th->invoke_arg.proc.args;
+ const VALUE *args_ptr;
+ int args_len;
+ VALUE procval = th->invoke_arg.proc.proc;
+ rb_proc_t *proc;
+ GetProcPtr(procval, proc);
+
+ th->ec->errinfo = Qnil;
+ th->ec->root_lep = rb_vm_proc_local_ep(procval);
+ th->ec->root_svar = Qfalse;
+
+ vm_check_ints_blocking(th->ec);
+
+ if (th->invoke_type == thread_invoke_type_ractor_proc) {
+ VALUE self = rb_ractor_self(th->ractor);
+ VM_ASSERT(FIXNUM_P(args));
+ args_len = FIX2INT(args);
+ args_ptr = ALLOCA_N(VALUE, args_len);
+ rb_ractor_receive_parameters(th->ec, th->ractor, args_len, (VALUE *)args_ptr);
+ vm_check_ints_blocking(th->ec);
+
+ return rb_vm_invoke_proc_with_self(
+ th->ec, proc, self,
+ args_len, args_ptr,
+ th->invoke_arg.proc.kw_splat,
+ VM_BLOCK_HANDLER_NONE
+ );
+ }
+ else {
+ args_len = RARRAY_LENINT(args);
+ if (args_len < 8) {
+ /* free proc.args if the length is enough small */
+ args_ptr = ALLOCA_N(VALUE, args_len);
+ MEMCPY((VALUE *)args_ptr, RARRAY_CONST_PTR_TRANSIENT(args), VALUE, args_len);
+ th->invoke_arg.proc.args = Qnil;
+ }
+ else {
+ args_ptr = RARRAY_CONST_PTR(args);
+ }
+
+ vm_check_ints_blocking(th->ec);
+
+ return rb_vm_invoke_proc(
+ th->ec, proc,
+ args_len, args_ptr,
+ th->invoke_arg.proc.kw_splat,
+ VM_BLOCK_HANDLER_NONE
+ );
+ }
+}
+
static void
-thread_do_start(rb_thread_t *th, VALUE args)
+thread_do_start(rb_thread_t *th)
{
native_set_thread_name(th);
- if (!th->first_func) {
- rb_proc_t *proc;
- GetProcPtr(th->first_proc, proc);
- th->errinfo = Qnil;
- th->root_lep = rb_vm_proc_local_ep(th->first_proc);
- th->root_svar = Qfalse;
- EXEC_EVENT_HOOK(th, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef);
- th->value = rb_vm_invoke_proc(th, proc,
- (int)RARRAY_LEN(args), RARRAY_CONST_PTR(args),
- VM_BLOCK_HANDLER_NONE);
- EXEC_EVENT_HOOK(th, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef);
- }
- else {
- th->value = (*th->first_func)((void *)args);
+ VALUE result = Qundef;
+
+ EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef);
+
+ switch (th->invoke_type) {
+ case thread_invoke_type_proc:
+ result = thread_do_start_proc(th);
+ break;
+
+ case thread_invoke_type_ractor_proc:
+ result = thread_do_start_proc(th);
+ rb_ractor_atexit(th->ec, result);
+ break;
+
+ case thread_invoke_type_func:
+ result = (*th->invoke_arg.func.func)(th->invoke_arg.func.arg);
+ break;
+
+ case thread_invoke_type_none:
+ rb_bug("unreachable");
}
+
+ rb_fiber_scheduler_set(Qnil);
+
+ th->value = result;
+
+ EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef);
}
+void rb_ec_clear_current_thread_trace_func(const rb_execution_context_t *ec);
+
static int
-thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_start)
+thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
{
- int state;
- VALUE args = th->first_args;
- rb_thread_list_t *join_list;
- rb_thread_t *main_th;
+ STACK_GROW_DIR_DETECTION;
+ enum ruby_tag_type state;
VALUE errinfo = Qnil;
-# ifdef USE_SIGALTSTACK
- void rb_register_sigaltstack(rb_thread_t *th);
+ size_t size = th->vm->default_params.thread_vm_stack_size / sizeof(VALUE);
+ rb_thread_t *ractor_main_th = th->ractor->threads.main;
+ VALUE * vm_stack = NULL;
- rb_register_sigaltstack(th);
-# endif
-
- if (th == th->vm->main_thread)
- rb_bug("thread_start_func_2 must not be used for main thread");
+ VM_ASSERT(th != th->vm->ractor.main_thread);
+ thread_debug("thread start: %p\n", (void *)th);
+ // setup native thread
+ gvl_acquire(rb_ractor_gvl(th->ractor), th);
ruby_thread_set_native(th);
- th->machine.stack_start = stack_start;
-#ifdef __ia64
- th->machine.register_stack_start = register_stack_start;
-#endif
- thread_debug("thread start: %p\n", (void *)th);
+ // setup ractor
+ if (rb_ractor_status_p(th->ractor, ractor_blocking)) {
+ RB_VM_LOCK();
+ {
+ rb_vm_ractor_blocking_cnt_dec(th->vm, th->ractor, __FILE__, __LINE__);
+ rb_ractor_t *r = th->ractor;
+ r->r_stdin = rb_io_prep_stdin();
+ r->r_stdout = rb_io_prep_stdout();
+ r->r_stderr = rb_io_prep_stderr();
+ }
+ RB_VM_UNLOCK();
+ }
- gvl_acquire(th->vm, th);
- {
- thread_debug("thread start (get lock): %p\n", (void *)th);
- rb_thread_set_current(th);
+ // This assertion is not passed on win32 env. Check it later.
+ // VM_ASSERT((size * sizeof(VALUE)) <= th->ec->machine.stack_maxsize);
- TH_PUSH_TAG(th);
- if ((state = EXEC_TAG()) == 0) {
- SAVE_ROOT_JMPBUF(th, thread_do_start(th, args));
- }
- else {
- errinfo = th->errinfo;
- if (state == TAG_FATAL) {
- /* fatal error within this thread, need to stop whole script */
- }
- else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) {
- /* exit on main_thread. */
- }
- else {
- if (th->report_on_exception) {
- VALUE mesg = rb_thread_inspect(th->self);
- rb_str_cat_cstr(mesg, " terminated with exception:\n");
- rb_write_error_str(mesg);
- rb_threadptr_error_print(th, errinfo);
- }
- if (th->vm->thread_abort_on_exception ||
- th->abort_on_exception || RTEST(ruby_debug)) {
- /* exit on main_thread */
- }
- else {
- errinfo = Qnil;
- }
- }
- th->value = Qnil;
- }
+ // setup VM and machine stack
+ vm_stack = alloca(size * sizeof(VALUE));
+ VM_ASSERT(vm_stack);
- th->status = THREAD_KILLED;
- thread_debug("thread end: %p\n", (void *)th);
+ rb_ec_initialize_vm_stack(th->ec, vm_stack, size);
+ th->ec->machine.stack_start = STACK_DIR_UPPER(vm_stack + size, vm_stack);
+ th->ec->machine.stack_maxsize -= size * sizeof(VALUE);
- main_th = th->vm->main_thread;
- if (main_th == th) {
- ruby_stop(0);
- }
- if (RB_TYPE_P(errinfo, T_OBJECT)) {
- /* treat with normal error object */
- rb_threadptr_raise(main_th, 1, &errinfo);
- }
- TH_POP_TAG();
+ thread_debug("thread start (get lock): %p\n", (void *)th);
- /* locking_mutex must be Qfalse */
- if (th->locking_mutex != Qfalse) {
- rb_bug("thread_start_func_2: locking_mutex must not be set (%p:%"PRIxVALUE")",
- (void *)th, th->locking_mutex);
- }
+ // Ensure that we are not joinable.
+ VM_ASSERT(th->value == Qundef);
- /* delete self other than main thread from living_threads */
- rb_vm_living_threads_remove(th->vm, th);
- if (main_th->status == THREAD_KILLED && rb_thread_alone()) {
- /* I'm last thread. wake up main thread from rb_thread_terminate_all */
- rb_threadptr_interrupt(main_th);
- }
+ EC_PUSH_TAG(th->ec);
- /* wake up joining threads */
- join_list = th->join_list;
- while (join_list) {
- rb_threadptr_interrupt(join_list->th);
- switch (join_list->th->status) {
- case THREAD_STOPPED: case THREAD_STOPPED_FOREVER:
- join_list->th->status = THREAD_RUNNABLE;
- default: break;
- }
- join_list = join_list->next;
- }
+ if ((state = EC_EXEC_TAG()) == TAG_NONE) {
+ SAVE_ROOT_JMPBUF(th, thread_do_start(th));
+ }
+ else {
+ errinfo = th->ec->errinfo;
+
+ VALUE exc = rb_vm_make_jump_tag_but_local_jump(state, Qundef);
+ if (!NIL_P(exc)) errinfo = exc;
+
+ if (state == TAG_FATAL) {
+ if (th->invoke_type == thread_invoke_type_ractor_proc) {
+ rb_ractor_atexit(th->ec, Qnil);
+ }
+ /* fatal error within this thread, need to stop whole script */
+ }
+ else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) {
+ /* exit on main_thread. */
+ }
+ else {
+ if (th->report_on_exception) {
+ VALUE mesg = rb_thread_to_s(th->self);
+ rb_str_cat_cstr(mesg, " terminated with exception (report_on_exception is true):\n");
+ rb_write_error_str(mesg);
+ rb_ec_error_print(th->ec, errinfo);
+ }
+
+ if (th->invoke_type == thread_invoke_type_ractor_proc) {
+ rb_ractor_atexit_exception(th->ec);
+ }
+
+ if (th->vm->thread_abort_on_exception ||
+ th->abort_on_exception || RTEST(ruby_debug)) {
+ /* exit on main_thread */
+ }
+ else {
+ errinfo = Qnil;
+ }
+ }
+ th->value = Qnil;
+ }
+
+ // The thread is effectively finished and can be joined.
+ VM_ASSERT(th->value != Qundef);
+
+ rb_threadptr_join_list_wakeup(th);
+ rb_threadptr_unlock_all_locking_mutexes(th);
- rb_threadptr_unlock_all_locking_mutexes(th);
- rb_check_deadlock(th->vm);
+ if (th->invoke_type == thread_invoke_type_ractor_proc) {
+ rb_thread_terminate_all(th);
+ rb_ractor_teardown(th->ec);
+ }
+
+ th->status = THREAD_KILLED;
+ thread_debug("thread end: %p\n", (void *)th);
- rb_thread_recycle_stack_release(th->stack);
- th->stack = NULL;
+ if (th->vm->ractor.main_thread == th) {
+ ruby_stop(0);
}
- native_mutex_lock(&th->vm->thread_destruct_lock);
- /* make sure vm->running_thread never point me after this point.*/
- th->vm->running_thread = NULL;
- native_mutex_unlock(&th->vm->thread_destruct_lock);
+
+ if (RB_TYPE_P(errinfo, T_OBJECT)) {
+ /* treat with normal error object */
+ rb_threadptr_raise(ractor_main_th, 1, &errinfo);
+ }
+
+ EC_POP_TAG();
+
+ rb_ec_clear_current_thread_trace_func(th->ec);
+
+ /* locking_mutex must be Qfalse */
+ if (th->locking_mutex != Qfalse) {
+ rb_bug("thread_start_func_2: locking_mutex must not be set (%p:%"PRIxVALUE")",
+ (void *)th, th->locking_mutex);
+ }
+
+ if (ractor_main_th->status == THREAD_KILLED &&
+ th->ractor->threads.cnt <= 2 /* main thread and this thread */) {
+ /* I'm last thread. wake up main thread from rb_thread_terminate_all */
+ rb_threadptr_interrupt(ractor_main_th);
+ }
+
+ rb_check_deadlock(th->ractor);
+
+ rb_fiber_close(th->ec->fiber_ptr);
+
thread_cleanup_func(th, FALSE);
- gvl_release(th->vm);
+ VM_ASSERT(th->ec->vm_stack == NULL);
+
+ if (th->invoke_type == thread_invoke_type_ractor_proc) {
+ // after rb_ractor_living_threads_remove()
+ // GC will happen anytime and this ractor can be collected (and destroy GVL).
+ // So gvl_release() should be before it.
+ gvl_release(rb_ractor_gvl(th->ractor));
+ rb_ractor_living_threads_remove(th->ractor, th);
+ }
+ else {
+ rb_ractor_living_threads_remove(th->ractor, th);
+ gvl_release(rb_ractor_gvl(th->ractor));
+ }
return 0;
}
+struct thread_create_params {
+ enum thread_invoke_type type;
+
+ // for normal proc thread
+ VALUE args;
+ VALUE proc;
+
+ // for ractor
+ rb_ractor_t *g;
+
+ // for func
+ VALUE (*fn)(void *);
+};
+
static VALUE
-thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS))
+thread_create_core(VALUE thval, struct thread_create_params *params)
{
- rb_thread_t *th, *current_th = GET_THREAD();
+ rb_execution_context_t *ec = GET_EC();
+ rb_thread_t *th = rb_thread_ptr(thval), *current_th = rb_ec_thread_ptr(ec);
int err;
if (OBJ_FROZEN(current_th->thgroup)) {
rb_raise(rb_eThreadError,
"can't start a new thread (frozen ThreadGroup)");
}
- GetThreadPtr(thval, th);
- /* setup thread environment */
- th->first_func = fn;
- th->first_proc = fn ? Qfalse : rb_block_proc();
- th->first_args = args; /* GC: shouldn't put before above line */
+ switch (params->type) {
+ case thread_invoke_type_proc:
+ th->invoke_type = thread_invoke_type_proc;
+ th->invoke_arg.proc.args = params->args;
+ th->invoke_arg.proc.proc = params->proc;
+ th->invoke_arg.proc.kw_splat = rb_keyword_given_p();
+ break;
+
+ case thread_invoke_type_ractor_proc:
+#if RACTOR_CHECK_MODE > 0
+ rb_ractor_setup_belonging_to(thval, rb_ractor_id(params->g));
+#endif
+ th->invoke_type = thread_invoke_type_ractor_proc;
+ th->ractor = params->g;
+ th->ractor->threads.main = th;
+ th->invoke_arg.proc.proc = rb_proc_isolate_bang(params->proc);
+ th->invoke_arg.proc.args = INT2FIX(RARRAY_LENINT(params->args));
+ th->invoke_arg.proc.kw_splat = rb_keyword_given_p();
+ rb_ractor_send_parameters(ec, params->g, params->args);
+ break;
+
+ case thread_invoke_type_func:
+ th->invoke_type = thread_invoke_type_func;
+ th->invoke_arg.func.func = params->fn;
+ th->invoke_arg.func.arg = (void *)params->args;
+ break;
+
+ default:
+ rb_bug("unreachable");
+ }
th->priority = current_th->priority;
th->thgroup = current_th->thgroup;
@@ -727,23 +991,23 @@ thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS))
th->pending_interrupt_mask_stack = rb_ary_dup(current_th->pending_interrupt_mask_stack);
RBASIC_CLEAR_CLASS(th->pending_interrupt_mask_stack);
- th->interrupt_mask = 0;
+ rb_native_mutex_initialize(&th->interrupt_lock);
+
+ RUBY_DEBUG_LOG("r:%u th:%p", rb_ractor_id(th->ractor), (void *)th);
- native_mutex_initialize(&th->interrupt_lock);
- native_cond_initialize(&th->interrupt_cond, RB_CONDATTR_CLOCK_MONOTONIC);
- th->report_on_exception = th->vm->thread_report_on_exception;
+ rb_ractor_living_threads_insert(th->ractor, th);
/* kick thread */
err = native_thread_create(th);
if (err) {
th->status = THREAD_KILLED;
- rb_raise(rb_eThreadError, "can't create Thread: %s", strerror(err));
+ rb_ractor_living_threads_remove(th->ractor, th);
+ rb_raise(rb_eThreadError, "can't create Thread: %s", strerror(err));
}
- rb_vm_living_threads_insert(th->vm, th);
return thval;
}
-#define threadptr_initialized(th) ((th)->first_args != 0)
+#define threadptr_initialized(th) ((th)->invoke_type != thread_invoke_type_none)
/*
* call-seq:
@@ -771,11 +1035,12 @@ thread_s_new(int argc, VALUE *argv, VALUE klass)
rb_thread_t *th;
VALUE thread = rb_thread_alloc(klass);
- if (GET_VM()->main_thread->status == THREAD_KILLED)
- rb_raise(rb_eThreadError, "can't alloc thread");
+ if (GET_RACTOR()->threads.main->status == THREAD_KILLED) {
+ rb_raise(rb_eThreadError, "can't alloc thread");
+ }
- rb_obj_call_init(thread, argc, argv);
- GetThreadPtr(thread, th);
+ rb_obj_call_init_kw(thread, argc, argv, RB_PASS_CALLED_KEYWORDS);
+ th = rb_thread_ptr(thread);
if (!threadptr_initialized(th)) {
rb_raise(rb_eThreadError, "uninitialized thread - check `%"PRIsVALUE"#initialize'",
klass);
@@ -796,145 +1061,218 @@ thread_s_new(int argc, VALUE *argv, VALUE klass)
static VALUE
thread_start(VALUE klass, VALUE args)
{
- return thread_create_core(rb_thread_alloc(klass), args, 0);
+ struct thread_create_params params = {
+ .type = thread_invoke_type_proc,
+ .args = args,
+ .proc = rb_block_proc(),
+ };
+ return thread_create_core(rb_thread_alloc(klass), &params);
+}
+
+static VALUE
+threadptr_invoke_proc_location(rb_thread_t *th)
+{
+ if (th->invoke_type == thread_invoke_type_proc) {
+ return rb_proc_location(th->invoke_arg.proc.proc);
+ }
+ else {
+ return Qnil;
+ }
}
/* :nodoc: */
static VALUE
thread_initialize(VALUE thread, VALUE args)
{
- rb_thread_t *th;
+ rb_thread_t *th = rb_thread_ptr(thread);
+
if (!rb_block_given_p()) {
- rb_raise(rb_eThreadError, "must be called with a block");
- }
- GetThreadPtr(thread, th);
- if (th->first_args) {
- VALUE proc = th->first_proc, loc;
- if (!proc || !RTEST(loc = rb_proc_location(proc))) {
+ rb_raise(rb_eThreadError, "must be called with a block");
+ }
+ else if (th->invoke_type != thread_invoke_type_none) {
+ VALUE loc = threadptr_invoke_proc_location(th);
+ if (!NIL_P(loc)) {
+ rb_raise(rb_eThreadError,
+ "already initialized thread - %"PRIsVALUE":%"PRIsVALUE,
+ RARRAY_AREF(loc, 0), RARRAY_AREF(loc, 1));
+ }
+ else {
rb_raise(rb_eThreadError, "already initialized thread");
}
- rb_raise(rb_eThreadError,
- "already initialized thread - %"PRIsVALUE":%"PRIsVALUE,
- RARRAY_AREF(loc, 0), RARRAY_AREF(loc, 1));
}
- return thread_create_core(thread, args, 0);
+ else {
+ struct thread_create_params params = {
+ .type = thread_invoke_type_proc,
+ .args = args,
+ .proc = rb_block_proc(),
+ };
+ return thread_create_core(thread, &params);
+ }
}
VALUE
-rb_thread_create(VALUE (*fn)(ANYARGS), void *arg)
+rb_thread_create(VALUE (*fn)(void *), void *arg)
{
- return thread_create_core(rb_thread_alloc(rb_cThread), (VALUE)arg, fn);
+ struct thread_create_params params = {
+ .type = thread_invoke_type_func,
+ .fn = fn,
+ .args = (VALUE)arg,
+ };
+ return thread_create_core(rb_thread_alloc(rb_cThread), &params);
}
+VALUE
+rb_thread_create_ractor(rb_ractor_t *g, VALUE args, VALUE proc)
+{
+ struct thread_create_params params = {
+ .type = thread_invoke_type_ractor_proc,
+ .g = g,
+ .args = args,
+ .proc = proc,
+ };
+ return thread_create_core(rb_thread_alloc(rb_cThread), &params);
+}
-/* +infty, for this purpose */
-#define DELAY_INFTY 1E30
struct join_arg {
- rb_thread_t *target, *waiting;
- double delay;
+ struct rb_waiting_list *waiter;
+ rb_thread_t *target;
+ VALUE timeout;
+ rb_hrtime_t *limit;
};
static VALUE
remove_from_join_list(VALUE arg)
{
struct join_arg *p = (struct join_arg *)arg;
- rb_thread_t *target_th = p->target, *th = p->waiting;
+ rb_thread_t *target_thread = p->target;
- if (target_th->status != THREAD_KILLED) {
- rb_thread_list_t **p = &target_th->join_list;
+ if (target_thread->status != THREAD_KILLED) {
+ struct rb_waiting_list **join_list = &target_thread->join_list;
- while (*p) {
- if ((*p)->th == th) {
- *p = (*p)->next;
- break;
- }
- p = &(*p)->next;
- }
+ while (*join_list) {
+ if (*join_list == p->waiter) {
+ *join_list = (*join_list)->next;
+ break;
+ }
+
+ join_list = &(*join_list)->next;
+ }
}
return Qnil;
}
+static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double);
+
+static int
+thread_finished(rb_thread_t *th)
+{
+ return th->status == THREAD_KILLED || th->value != Qundef;
+}
+
static VALUE
thread_join_sleep(VALUE arg)
{
struct join_arg *p = (struct join_arg *)arg;
- rb_thread_t *target_th = p->target, *th = p->waiting;
- const int forever = p->delay == DELAY_INFTY;
- const double limit = forever ? 0 : timeofday() + p->delay;
+ rb_thread_t *target_th = p->target, *th = p->waiter->thread;
+ rb_hrtime_t end = 0, *limit = p->limit;
- while (target_th->status != THREAD_KILLED) {
- if (forever) {
- sleep_forever(th, TRUE, FALSE);
- }
- else {
- double now = timeofday();
- if (now > limit) {
- thread_debug("thread_join: timeout (thid: %"PRI_THREAD_ID")\n",
- thread_id_str(target_th));
- return Qfalse;
- }
- sleep_wait_for_interrupt(th, limit - now, 0);
- }
- thread_debug("thread_join: interrupted (thid: %"PRI_THREAD_ID")\n",
- thread_id_str(target_th));
+ if (limit) {
+ end = rb_hrtime_add(*limit, rb_hrtime_now());
+ }
+
+ while (!thread_finished(target_th)) {
+ VALUE scheduler = rb_fiber_scheduler_current();
+
+ if (scheduler != Qnil) {
+ rb_fiber_scheduler_block(scheduler, target_th->self, p->timeout);
+ }
+ else if (!limit) {
+ th->status = THREAD_STOPPED_FOREVER;
+ rb_ractor_sleeper_threads_inc(th->ractor);
+ rb_check_deadlock(th->ractor);
+ native_sleep(th, 0);
+ rb_ractor_sleeper_threads_dec(th->ractor);
+ }
+ else {
+ if (hrtime_update_expire(limit, end)) {
+ thread_debug("thread_join: timeout (thid: %"PRI_THREAD_ID")\n",
+ thread_id_str(target_th));
+ return Qfalse;
+ }
+ th->status = THREAD_STOPPED;
+ native_sleep(th, limit);
+ }
+ RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
+ th->status = THREAD_RUNNABLE;
+ thread_debug("thread_join: interrupted (thid: %"PRI_THREAD_ID", status: %s)\n",
+ thread_id_str(target_th), thread_status_name(target_th, TRUE));
}
return Qtrue;
}
static VALUE
-thread_join(rb_thread_t *target_th, double delay)
+thread_join(rb_thread_t *target_th, VALUE timeout, rb_hrtime_t *limit)
{
- rb_thread_t *th = GET_THREAD();
- struct join_arg arg;
+ rb_execution_context_t *ec = GET_EC();
+ rb_thread_t *th = ec->thread_ptr;
+ rb_fiber_t *fiber = ec->fiber_ptr;
if (th == target_th) {
- rb_raise(rb_eThreadError, "Target thread must not be current thread");
- }
- if (GET_VM()->main_thread == target_th) {
- rb_raise(rb_eThreadError, "Target thread must not be main thread");
+ rb_raise(rb_eThreadError, "Target thread must not be current thread");
}
- arg.target = target_th;
- arg.waiting = th;
- arg.delay = delay;
+ if (th->ractor->threads.main == target_th) {
+ rb_raise(rb_eThreadError, "Target thread must not be main thread");
+ }
- thread_debug("thread_join (thid: %"PRI_THREAD_ID")\n", thread_id_str(target_th));
+ thread_debug("thread_join (thid: %"PRI_THREAD_ID", status: %s)\n",
+ thread_id_str(target_th), thread_status_name(target_th, TRUE));
if (target_th->status != THREAD_KILLED) {
- rb_thread_list_t list;
- list.next = target_th->join_list;
- list.th = th;
- target_th->join_list = &list;
- if (!rb_ensure(thread_join_sleep, (VALUE)&arg,
- remove_from_join_list, (VALUE)&arg)) {
- return Qnil;
- }
+ struct rb_waiting_list waiter;
+ waiter.next = target_th->join_list;
+ waiter.thread = th;
+ waiter.fiber = fiber;
+ target_th->join_list = &waiter;
+
+ struct join_arg arg;
+ arg.waiter = &waiter;
+ arg.target = target_th;
+ arg.timeout = timeout;
+ arg.limit = limit;
+
+ if (!rb_ensure(thread_join_sleep, (VALUE)&arg, remove_from_join_list, (VALUE)&arg)) {
+ return Qnil;
+ }
}
- thread_debug("thread_join: success (thid: %"PRI_THREAD_ID")\n",
- thread_id_str(target_th));
+ thread_debug("thread_join: success (thid: %"PRI_THREAD_ID", status: %s)\n",
+ thread_id_str(target_th), thread_status_name(target_th, TRUE));
- if (target_th->errinfo != Qnil) {
- VALUE err = target_th->errinfo;
+ if (target_th->ec->errinfo != Qnil) {
+ VALUE err = target_th->ec->errinfo;
- if (FIXNUM_P(err)) {
- switch (err) {
- case INT2FIX(TAG_FATAL):
- /* OK. killed. */
- break;
- default:
- rb_bug("thread_join: Fixnum (%d) should not reach here.", FIX2INT(err));
- }
- }
- else if (THROW_DATA_P(target_th->errinfo)) {
- rb_bug("thread_join: THROW_DATA should not reach here.");
- }
- else {
- /* normal exception */
- rb_exc_raise(err);
- }
+ if (FIXNUM_P(err)) {
+ switch (err) {
+ case INT2FIX(TAG_FATAL):
+ thread_debug("thread_join: terminated (thid: %"PRI_THREAD_ID", status: %s)\n",
+ thread_id_str(target_th), thread_status_name(target_th, TRUE));
+
+ /* OK. killed. */
+ break;
+ default:
+ rb_bug("thread_join: Fixnum (%d) should not reach here.", FIX2INT(err));
+ }
+ }
+ else if (THROW_DATA_P(target_th->ec->errinfo)) {
+ rb_bug("thread_join: THROW_DATA should not reach here.");
+ }
+ else {
+ /* normal exception */
+ rb_exc_raise(err);
+ }
}
return target_th->self;
}
@@ -981,18 +1319,30 @@ thread_join(rb_thread_t *target_th, double delay)
static VALUE
thread_join_m(int argc, VALUE *argv, VALUE self)
{
- rb_thread_t *target_th;
- double delay = DELAY_INFTY;
- VALUE limit;
+ VALUE timeout = Qnil;
+ rb_hrtime_t rel = 0, *limit = 0;
- GetThreadPtr(self, target_th);
+ if (rb_check_arity(argc, 0, 1)) {
+ timeout = argv[0];
+ }
- rb_scan_args(argc, argv, "01", &limit);
- if (!NIL_P(limit)) {
- delay = rb_num2dbl(limit);
+ // Convert the timeout eagerly, so it's always converted and deterministic
+ /*
+ * This supports INFINITY and negative values, so we can't use
+ * rb_time_interval right now...
+ */
+ if (NIL_P(timeout)) {
+ /* unlimited */
+ }
+ else if (FIXNUM_P(timeout)) {
+ rel = rb_sec2hrtime(NUM2TIMET(timeout));
+ limit = &rel;
+ }
+ else {
+ limit = double2hrtime(&rel, rb_num2dbl(timeout));
}
- return thread_join(target_th, delay);
+ return thread_join(rb_thread_ptr(self), timeout, limit);
}
/*
@@ -1012,9 +1362,12 @@ thread_join_m(int argc, VALUE *argv, VALUE self)
static VALUE
thread_value(VALUE self)
{
- rb_thread_t *th;
- GetThreadPtr(self, th);
- thread_join(th, DELAY_INFTY);
+ rb_thread_t *th = rb_thread_ptr(self);
+ thread_join(th, Qnil, 0);
+ if (th->value == Qundef) {
+ // If the thread is dead because we forked th->value is still Qundef.
+ return Qnil;
+ }
return th->value;
}
@@ -1023,186 +1376,190 @@ thread_value(VALUE self)
*/
/*
- * The type of tv_sec in struct timeval is time_t in POSIX.
- * But several systems violate POSIX.
- *
- * OpenBSD 5.2 (amd64):
- * time_t: int (signed 32bit integer)
- * tv_sec: long (signed 64bit integer)
- *
- * MinGW-w64 (x64):
- * time_t: long long (signed 64bit integer)
- * tv_sec: long (signed 32bit integer)
+ * Back when we used "struct timeval", not all platforms implemented
+ * tv_sec as time_t. Nowadays we use "struct timespec" and tv_sec
+ * seems to be implemented more consistently across platforms.
+ * At least other parts of our code hasn't had to deal with non-time_t
+ * tv_sec in timespec...
*/
-
-#if SIGNEDNESS_OF_TIME_T < 0 /* signed */
-# define TIMEVAL_SEC_MAX SIGNED_INTEGER_MAX(TYPEOF_TIMEVAL_TV_SEC)
-# define TIMEVAL_SEC_MIN SIGNED_INTEGER_MIN(TYPEOF_TIMEVAL_TV_SEC)
-#elif SIGNEDNESS_OF_TIME_T > 0 /* unsigned */
-# define TIMEVAL_SEC_MAX ((TYPEOF_TIMEVAL_TV_SEC)(~(unsigned_time_t)0))
-# define TIMEVAL_SEC_MIN ((TYPEOF_TIMEVAL_TV_SEC)0)
+#define TIMESPEC_SEC_MAX TIMET_MAX
+#define TIMESPEC_SEC_MIN TIMET_MIN
+
+COMPILER_WARNING_PUSH
+#if __has_warning("-Wimplicit-int-float-conversion")
+COMPILER_WARNING_IGNORED(-Wimplicit-int-float-conversion)
+#elif defined(_MSC_VER)
+/* C4305: 'initializing': truncation from '__int64' to 'const double' */
+COMPILER_WARNING_IGNORED(4305)
#endif
+static const double TIMESPEC_SEC_MAX_as_double = TIMESPEC_SEC_MAX;
+COMPILER_WARNING_POP
-static struct timeval
-double2timeval(double d)
+static rb_hrtime_t *
+double2hrtime(rb_hrtime_t *hrt, double d)
{
- /* assume timeval.tv_sec has same signedness as time_t */
- const double TIMEVAL_SEC_MAX_PLUS_ONE = (2*(double)(TIMEVAL_SEC_MAX/2+1));
-
- struct timeval time;
+ /* assume timespec.tv_sec has same signedness as time_t */
+ const double TIMESPEC_SEC_MAX_PLUS_ONE = 2.0 * (TIMESPEC_SEC_MAX_as_double / 2.0 + 1.0);
- if (TIMEVAL_SEC_MAX_PLUS_ONE <= d) {
- time.tv_sec = TIMEVAL_SEC_MAX;
- time.tv_usec = 999999;
+ if (TIMESPEC_SEC_MAX_PLUS_ONE <= d) {
+ return NULL;
}
- else if (d <= TIMEVAL_SEC_MIN) {
- time.tv_sec = TIMEVAL_SEC_MIN;
- time.tv_usec = 0;
+ else if (d <= 0) {
+ *hrt = 0;
}
else {
- time.tv_sec = (TYPEOF_TIMEVAL_TV_SEC)d;
- time.tv_usec = (int)((d - (time_t)d) * 1e6);
- if (time.tv_usec < 0) {
- time.tv_usec += (int)1e6;
- time.tv_sec -= 1;
- }
+ *hrt = (rb_hrtime_t)(d * (double)RB_HRTIME_PER_SEC);
}
- return time;
+ return hrt;
}
static void
-sleep_forever(rb_thread_t *th, int deadlockable, int spurious_check)
+getclockofday(struct timespec *ts)
+{
+#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
+ if (clock_gettime(CLOCK_MONOTONIC, ts) == 0)
+ return;
+#endif
+ rb_timespec_now(ts);
+}
+
+/*
+ * Don't inline this, since library call is already time consuming
+ * and we don't want "struct timespec" on stack too long for GC
+ */
+NOINLINE(rb_hrtime_t rb_hrtime_now(void));
+rb_hrtime_t
+rb_hrtime_now(void)
+{
+ struct timespec ts;
+
+ getclockofday(&ts);
+ return rb_timespec2hrtime(&ts);
+}
+
+static void
+sleep_forever(rb_thread_t *th, unsigned int fl)
{
enum rb_thread_status prev_status = th->status;
- enum rb_thread_status status = deadlockable ? THREAD_STOPPED_FOREVER : THREAD_STOPPED;
+ enum rb_thread_status status;
+ int woke;
+ status = fl & SLEEP_DEADLOCKABLE ? THREAD_STOPPED_FOREVER : THREAD_STOPPED;
th->status = status;
- RUBY_VM_CHECK_INTS_BLOCKING(th);
+ RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
while (th->status == status) {
- if (deadlockable) {
- th->vm->sleeper++;
- rb_check_deadlock(th->vm);
+ if (fl & SLEEP_DEADLOCKABLE) {
+ rb_ractor_sleeper_threads_inc(th->ractor);
+ rb_check_deadlock(th->ractor);
}
native_sleep(th, 0);
- if (deadlockable) {
- th->vm->sleeper--;
+ if (fl & SLEEP_DEADLOCKABLE) {
+ rb_ractor_sleeper_threads_dec(th->ractor);
}
- RUBY_VM_CHECK_INTS_BLOCKING(th);
- if (!spurious_check)
+ woke = vm_check_ints_blocking(th->ec);
+ if (woke && !(fl & SLEEP_SPURIOUS_CHECK))
break;
}
th->status = prev_status;
}
-static void
-getclockofday(struct timeval *tp)
+/*
+ * at least gcc 7.2 and 7.3 complains about "rb_hrtime_t end"
+ * being uninitialized, maybe other versions, too.
+ */
+COMPILER_WARNING_PUSH
+#if defined(__GNUC__) && __GNUC__ == 7 && __GNUC_MINOR__ <= 3
+COMPILER_WARNING_IGNORED(-Wmaybe-uninitialized)
+#endif
+#ifndef PRIu64
+#define PRIu64 PRI_64_PREFIX "u"
+#endif
+/*
+ * @end is the absolute time when @ts is set to expire
+ * Returns true if @end has past
+ * Updates @ts and returns false otherwise
+ */
+static int
+hrtime_update_expire(rb_hrtime_t *timeout, const rb_hrtime_t end)
{
-#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
- struct timespec ts;
+ rb_hrtime_t now = rb_hrtime_now();
- if (clock_gettime(CLOCK_MONOTONIC, &ts) == 0) {
- tp->tv_sec = ts.tv_sec;
- tp->tv_usec = (int)(ts.tv_nsec / 1000);
- }
- else
-#endif
- {
- gettimeofday(tp, NULL);
- }
+ if (now > end) return 1;
+ thread_debug("hrtime_update_expire: "
+ "%"PRIu64" > %"PRIu64"\n",
+ (uint64_t)end, (uint64_t)now);
+ *timeout = end - now;
+ return 0;
}
+COMPILER_WARNING_POP
-static void
-sleep_timeval(rb_thread_t *th, struct timeval tv, int spurious_check)
+static int
+sleep_hrtime(rb_thread_t *th, rb_hrtime_t rel, unsigned int fl)
{
- struct timeval to, tvn;
enum rb_thread_status prev_status = th->status;
-
- getclockofday(&to);
- if (TIMEVAL_SEC_MAX - tv.tv_sec < to.tv_sec)
- to.tv_sec = TIMEVAL_SEC_MAX;
- else
- to.tv_sec += tv.tv_sec;
- if ((to.tv_usec += tv.tv_usec) >= 1000000) {
- if (to.tv_sec == TIMEVAL_SEC_MAX)
- to.tv_usec = 999999;
- else {
- to.tv_sec++;
- to.tv_usec -= 1000000;
- }
- }
+ int woke;
+ rb_hrtime_t end = rb_hrtime_add(rb_hrtime_now(), rel);
th->status = THREAD_STOPPED;
- RUBY_VM_CHECK_INTS_BLOCKING(th);
+ RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
while (th->status == THREAD_STOPPED) {
- native_sleep(th, &tv);
- RUBY_VM_CHECK_INTS_BLOCKING(th);
- getclockofday(&tvn);
- if (to.tv_sec < tvn.tv_sec) break;
- if (to.tv_sec == tvn.tv_sec && to.tv_usec <= tvn.tv_usec) break;
- thread_debug("sleep_timeval: %"PRI_TIMET_PREFIX"d.%.6ld > %"PRI_TIMET_PREFIX"d.%.6ld\n",
- (time_t)to.tv_sec, (long)to.tv_usec,
- (time_t)tvn.tv_sec, (long)tvn.tv_usec);
- tv.tv_sec = to.tv_sec - tvn.tv_sec;
- if ((tv.tv_usec = to.tv_usec - tvn.tv_usec) < 0) {
- --tv.tv_sec;
- tv.tv_usec += 1000000;
- }
- if (!spurious_check)
+ native_sleep(th, &rel);
+ woke = vm_check_ints_blocking(th->ec);
+ if (woke && !(fl & SLEEP_SPURIOUS_CHECK))
+ break;
+ if (hrtime_update_expire(&rel, end))
break;
+ woke = 1;
}
th->status = prev_status;
+ return woke;
}
void
rb_thread_sleep_forever(void)
{
thread_debug("rb_thread_sleep_forever\n");
- sleep_forever(GET_THREAD(), FALSE, TRUE);
+ sleep_forever(GET_THREAD(), SLEEP_SPURIOUS_CHECK);
}
void
rb_thread_sleep_deadly(void)
{
thread_debug("rb_thread_sleep_deadly\n");
- sleep_forever(GET_THREAD(), TRUE, TRUE);
+ sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE|SLEEP_SPURIOUS_CHECK);
}
-static void
-rb_thread_sleep_deadly_allow_spurious_wakeup(void)
-{
- thread_debug("rb_thread_sleep_deadly_allow_spurious_wakeup\n");
- sleep_forever(GET_THREAD(), TRUE, FALSE);
-}
-
-static double
-timeofday(void)
+void
+rb_thread_sleep_interruptible(void)
{
-#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
- struct timespec tp;
+ rb_thread_t *th = GET_THREAD();
+ enum rb_thread_status prev_status = th->status;
- if (clock_gettime(CLOCK_MONOTONIC, &tp) == 0) {
- return (double)tp.tv_sec + (double)tp.tv_nsec * 1e-9;
- }
- else
-#endif
- {
- struct timeval tv;
- gettimeofday(&tv, NULL);
- return (double)tv.tv_sec + (double)tv.tv_usec * 1e-6;
- }
+ th->status = THREAD_STOPPED;
+ native_sleep(th, 0);
+ RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
+ th->status = prev_status;
}
static void
-sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec, int spurious_check)
+rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker)
{
- sleep_timeval(th, double2timeval(sleepsec), spurious_check);
+ VALUE scheduler = rb_fiber_scheduler_current();
+ if (scheduler != Qnil) {
+ rb_fiber_scheduler_block(scheduler, blocker, Qnil);
+ }
+ else {
+ thread_debug("rb_thread_sleep_deadly_allow_spurious_wakeup\n");
+ sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE);
+ }
}
void
rb_thread_wait_for(struct timeval time)
{
rb_thread_t *th = GET_THREAD();
- sleep_timeval(th, time, 1);
+
+ sleep_hrtime(th, rb_timeval2hrtime(&time), SLEEP_SPURIOUS_CHECK);
}
/*
@@ -1215,8 +1572,7 @@ rb_thread_wait_for(struct timeval time)
void
rb_thread_check_ints(void)
{
- rb_thread_t *th = GET_THREAD();
- RUBY_VM_CHECK_INTS_BLOCKING(th);
+ RUBY_VM_CHECK_INTS_BLOCKING(GET_EC());
}
/*
@@ -1233,9 +1589,7 @@ rb_thread_check_trap_pending(void)
int
rb_thread_interrupted(VALUE thval)
{
- rb_thread_t *th;
- GetThreadPtr(thval, th);
- return (int)RUBY_VM_INTERRUPTED(th);
+ return (int)RUBY_VM_INTERRUPTED(rb_thread_ptr(thval)->ec);
}
void
@@ -1245,7 +1599,7 @@ rb_thread_sleep(int sec)
}
static void
-rb_thread_schedule_limits(unsigned long limits_us)
+rb_thread_schedule_limits(uint32_t limits_us)
{
thread_debug("rb_thread_schedule\n");
if (!rb_thread_alone()) {
@@ -1254,8 +1608,8 @@ rb_thread_schedule_limits(unsigned long limits_us)
if (th->running_time_us >= limits_us) {
thread_debug("rb_thread_schedule/switch start\n");
RB_GC_SAVE_MACHINE_CONTEXT(th);
- gvl_yield(th->vm, th);
- rb_thread_set_current(th);
+ gvl_yield(rb_ractor_gvl(th->ractor), th);
+ rb_ractor_thread_switch(th->ractor, th);
thread_debug("rb_thread_schedule/switch done\n");
}
}
@@ -1264,10 +1618,8 @@ rb_thread_schedule_limits(unsigned long limits_us)
void
rb_thread_schedule(void)
{
- rb_thread_t *cur_th = GET_THREAD();
rb_thread_schedule_limits(0);
-
- RUBY_VM_CHECK_INTS(cur_th);
+ RUBY_VM_CHECK_INTS(GET_EC());
}
/* blocking region */
@@ -1277,12 +1629,13 @@ blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region,
rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted)
{
region->prev_status = th->status;
- if (set_unblock_function(th, ubf, arg, &region->oldubf, fail_if_interrupted)) {
+ if (unblock_function_set(th, ubf, arg, fail_if_interrupted)) {
th->blocking_region_buffer = region;
th->status = THREAD_STOPPED;
+ rb_ractor_blocking_threads_inc(th->ractor, __FILE__, __LINE__);
thread_debug("enter blocking region (%p)\n", (void *)th);
RB_GC_SAVE_MACHINE_CONTEXT(th);
- gvl_release(th->vm);
+ gvl_release(rb_ractor_gvl(th->ractor));
return TRUE;
}
else {
@@ -1293,38 +1646,61 @@ blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region,
static inline void
blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region)
{
- gvl_acquire(th->vm, th);
- rb_thread_set_current(th);
- thread_debug("leave blocking region (%p)\n", (void *)th);
+ /* entry to ubf_list still permitted at this point, make it impossible: */
+ unblock_function_clear(th);
+ /* entry to ubf_list impossible at this point, so unregister is safe: */
unregister_ubf_list(th);
+
+ gvl_acquire(rb_ractor_gvl(th->ractor), th);
+ rb_ractor_thread_switch(th->ractor, th);
+
+ thread_debug("leave blocking region (%p)\n", (void *)th);
th->blocking_region_buffer = 0;
- reset_unblock_function(th, &region->oldubf);
+ rb_ractor_blocking_threads_dec(th->ractor, __FILE__, __LINE__);
if (th->status == THREAD_STOPPED) {
th->status = region->prev_status;
}
}
-static void *
-call_without_gvl(void *(*func)(void *), void *data1,
- rb_unblock_function_t *ubf, void *data2, int fail_if_interrupted)
+void *
+rb_nogvl(void *(*func)(void *), void *data1,
+ rb_unblock_function_t *ubf, void *data2,
+ int flags)
{
void *val = 0;
-
- rb_thread_t *th = GET_THREAD();
+ rb_execution_context_t *ec = GET_EC();
+ rb_thread_t *th = rb_ec_thread_ptr(ec);
+ rb_vm_t *vm = rb_ec_vm_ptr(ec);
+ bool is_main_thread = vm->ractor.main_thread == th;
int saved_errno = 0;
+ VALUE ubf_th = Qfalse;
- if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) {
+ if ((ubf == RUBY_UBF_IO) || (ubf == RUBY_UBF_PROCESS)) {
ubf = ubf_select;
data2 = th;
}
+ else if (ubf && rb_ractor_living_thread_num(th->ractor) == 1 && is_main_thread) {
+ if (flags & RB_NOGVL_UBF_ASYNC_SAFE) {
+ vm->ubf_async_safe = 1;
+ }
+ else {
+ ubf_th = rb_thread_start_unblock_thread();
+ }
+ }
- BLOCKING_REGION({
+ BLOCKING_REGION(th, {
val = func(data1);
saved_errno = errno;
- }, ubf, data2, fail_if_interrupted);
+ }, ubf, data2, flags & RB_NOGVL_INTR_FAIL);
+
+ if (is_main_thread) vm->ubf_async_safe = 0;
- if (!fail_if_interrupted) {
- RUBY_VM_CHECK_INTS_BLOCKING(th);
+ if ((flags & RB_NOGVL_INTR_FAIL) == 0) {
+ RUBY_VM_CHECK_INTS_BLOCKING(ec);
+ }
+
+ if (ubf_th != Qfalse) {
+ thread_value(rb_thread_kill(ubf_th));
}
errno = saved_errno;
@@ -1421,47 +1797,59 @@ void *
rb_thread_call_without_gvl2(void *(*func)(void *), void *data1,
rb_unblock_function_t *ubf, void *data2)
{
- return call_without_gvl(func, data1, ubf, data2, TRUE);
+ return rb_nogvl(func, data1, ubf, data2, RB_NOGVL_INTR_FAIL);
}
void *
rb_thread_call_without_gvl(void *(*func)(void *data), void *data1,
rb_unblock_function_t *ubf, void *data2)
{
- return call_without_gvl(func, data1, ubf, data2, FALSE);
+ return rb_nogvl(func, data1, ubf, data2, 0);
}
VALUE
rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
{
volatile VALUE val = Qundef; /* shouldn't be used */
- rb_vm_t *vm = GET_VM();
- rb_thread_t *th = GET_THREAD();
+ rb_execution_context_t * volatile ec = GET_EC();
volatile int saved_errno = 0;
- int state;
- struct waiting_fd wfd;
+ enum ruby_tag_type state;
- wfd.fd = fd;
- wfd.th = th;
- list_add(&vm->waiting_fds, &wfd.wfd_node);
+ struct waiting_fd waiting_fd = {
+ .fd = fd,
+ .th = rb_ec_thread_ptr(ec)
+ };
- TH_PUSH_TAG(th);
- if ((state = EXEC_TAG()) == 0) {
- BLOCKING_REGION({
- val = func(data1);
- saved_errno = errno;
- }, ubf_select, th, FALSE);
+ RB_VM_LOCK_ENTER();
+ {
+ list_add(&rb_ec_vm_ptr(ec)->waiting_fds, &waiting_fd.wfd_node);
+ }
+ RB_VM_LOCK_LEAVE();
+
+ EC_PUSH_TAG(ec);
+ if ((state = EC_EXEC_TAG()) == TAG_NONE) {
+ BLOCKING_REGION(waiting_fd.th, {
+ val = func(data1);
+ saved_errno = errno;
+ }, ubf_select, waiting_fd.th, FALSE);
}
- TH_POP_TAG();
+ EC_POP_TAG();
- /* must be deleted before jump */
- list_del(&wfd.wfd_node);
+ /*
+ * must be deleted before jump
+ * this will delete either from waiting_fds or on-stack LIST_HEAD(busy)
+ */
+ RB_VM_LOCK_ENTER();
+ {
+ list_del(&waiting_fd.wfd_node);
+ }
+ RB_VM_LOCK_LEAVE();
if (state) {
- TH_JUMP_TAG(th, state);
+ EC_JUMP_TAG(ec, state);
}
/* TODO: check func() */
- RUBY_VM_CHECK_INTS_BLOCKING(th);
+ RUBY_VM_CHECK_INTS_BLOCKING(ec);
errno = saved_errno;
@@ -1509,7 +1897,7 @@ rb_thread_call_with_gvl(void *(*func)(void *), void *data1)
* because this thread is not Ruby's thread.
* What should we do?
*/
-
+ bp();
fprintf(stderr, "[BUG] rb_thread_call_with_gvl() is called by non-ruby thread\n");
exit(EXIT_FAILURE);
}
@@ -1525,7 +1913,8 @@ rb_thread_call_with_gvl(void *(*func)(void *), void *data1)
/* enter to Ruby world: You can access Ruby values, methods and so on. */
r = (*func)(data1);
/* leave from Ruby world: You can not access Ruby values, etc. */
- blocking_region_begin(th, brb, prev_unblock.func, prev_unblock.arg, FALSE);
+ int released = blocking_region_begin(th, brb, prev_unblock.func, prev_unblock.arg, FALSE);
+ RUBY_ASSERT_ALWAYS(released);
return r;
}
@@ -1618,19 +2007,23 @@ rb_threadptr_pending_interrupt_check_mask(rb_thread_t *th, VALUE err)
VALUE mask;
long mask_stack_len = RARRAY_LEN(th->pending_interrupt_mask_stack);
const VALUE *mask_stack = RARRAY_CONST_PTR(th->pending_interrupt_mask_stack);
- VALUE ancestors = rb_mod_ancestors(err); /* TODO: GC guard */
- long ancestors_len = RARRAY_LEN(ancestors);
- const VALUE *ancestors_ptr = RARRAY_CONST_PTR(ancestors);
- int i, j;
+ VALUE mod;
+ long i;
for (i=0; i<mask_stack_len; i++) {
mask = mask_stack[mask_stack_len-(i+1)];
- for (j=0; j<ancestors_len; j++) {
- VALUE klass = ancestors_ptr[j];
+ for (mod = err; mod; mod = RCLASS_SUPER(mod)) {
+ VALUE klass = mod;
VALUE sym;
- /* TODO: remove rb_intern() */
+ if (BUILTIN_TYPE(mod) == T_ICLASS) {
+ klass = RBASIC(mod)->klass;
+ }
+ else if (mod != RCLASS_ORIGIN(mod)) {
+ continue;
+ }
+
if ((sym = rb_hash_aref(mask, klass)) != Qnil) {
if (sym == sym_immediate) {
return INTERRUPT_IMMEDIATE;
@@ -1652,7 +2045,7 @@ rb_threadptr_pending_interrupt_check_mask(rb_thread_t *th, VALUE err)
}
static int
-rb_threadptr_pending_interrupt_empty_p(rb_thread_t *th)
+rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th)
{
return RARRAY_LEN(th->pending_interrupt_queue) == 0;
}
@@ -1707,8 +2100,8 @@ rb_threadptr_pending_interrupt_deque(rb_thread_t *th, enum handle_interrupt_timi
#endif
}
-int
-rb_threadptr_pending_interrupt_active_p(rb_thread_t *th)
+static int
+threadptr_pending_interrupt_active_p(rb_thread_t *th)
{
/*
* For optimization, we don't check async errinfo queue
@@ -1848,21 +2241,24 @@ handle_interrupt_arg_check_i(VALUE key, VALUE val, VALUE args)
* # all exceptions inherited from Exception are prohibited.
* }
*
+ * For handling all interrupts, use +Object+ and not +Exception+
+ * as the ExceptionClass, as kill/terminate interrupts are not handled by +Exception+.
*/
static VALUE
rb_thread_s_handle_interrupt(VALUE self, VALUE mask_arg)
{
VALUE mask;
- rb_thread_t *th = GET_THREAD();
+ rb_execution_context_t * volatile ec = GET_EC();
+ rb_thread_t * volatile th = rb_ec_thread_ptr(ec);
volatile VALUE r = Qnil;
- int state;
+ enum ruby_tag_type state;
if (!rb_block_given_p()) {
rb_raise(rb_eArgError, "block is needed.");
}
mask = 0;
- mask_arg = rb_convert_type(mask_arg, T_HASH, "Hash", "to_hash");
+ mask_arg = rb_to_hash_type(mask_arg);
rb_hash_foreach(mask_arg, handle_interrupt_arg_check_i, (VALUE)&mask);
if (!mask) {
return rb_yield(Qnil);
@@ -1871,25 +2267,25 @@ rb_thread_s_handle_interrupt(VALUE self, VALUE mask_arg)
rb_ary_push(th->pending_interrupt_mask_stack, mask);
if (!rb_threadptr_pending_interrupt_empty_p(th)) {
th->pending_interrupt_queue_checked = 0;
- RUBY_VM_SET_INTERRUPT(th);
+ RUBY_VM_SET_INTERRUPT(th->ec);
}
- TH_PUSH_TAG(th);
- if ((state = EXEC_TAG()) == 0) {
+ EC_PUSH_TAG(th->ec);
+ if ((state = EC_EXEC_TAG()) == TAG_NONE) {
r = rb_yield(Qnil);
}
- TH_POP_TAG();
+ EC_POP_TAG();
rb_ary_pop(th->pending_interrupt_mask_stack);
if (!rb_threadptr_pending_interrupt_empty_p(th)) {
th->pending_interrupt_queue_checked = 0;
- RUBY_VM_SET_INTERRUPT(th);
+ RUBY_VM_SET_INTERRUPT(th->ec);
}
- RUBY_VM_CHECK_INTS(th);
+ RUBY_VM_CHECK_INTS(th->ec);
if (state) {
- TH_JUMP_TAG(th, state);
+ EC_JUMP_TAG(th->ec, state);
}
return r;
@@ -1908,9 +2304,7 @@ rb_thread_s_handle_interrupt(VALUE self, VALUE mask_arg)
static VALUE
rb_thread_pending_interrupt_p(int argc, VALUE *argv, VALUE target_thread)
{
- rb_thread_t *target_th;
-
- GetThreadPtr(target_thread, target_th);
+ rb_thread_t *target_th = rb_thread_ptr(target_thread);
if (!target_th->pending_interrupt_queue) {
return Qfalse;
@@ -1918,20 +2312,14 @@ rb_thread_pending_interrupt_p(int argc, VALUE *argv, VALUE target_thread)
if (rb_threadptr_pending_interrupt_empty_p(target_th)) {
return Qfalse;
}
+ if (rb_check_arity(argc, 0, 1)) {
+ VALUE err = argv[0];
+ if (!rb_obj_is_kind_of(err, rb_cModule)) {
+ rb_raise(rb_eTypeError, "class or module required for rescue clause");
+ }
+ return RBOOL(rb_threadptr_pending_interrupt_include_p(target_th, err));
+ }
else {
- if (argc == 1) {
- VALUE err;
- rb_scan_args(argc, argv, "01", &err);
- if (!rb_obj_is_kind_of(err, rb_cModule)) {
- rb_raise(rb_eTypeError, "class or module required for rescue clause");
- }
- if (rb_threadptr_pending_interrupt_include_p(target_th, err)) {
- return Qtrue;
- }
- else {
- return Qfalse;
- }
- }
return Qtrue;
}
}
@@ -1999,66 +2387,86 @@ rb_thread_s_pending_interrupt_p(int argc, VALUE *argv, VALUE self)
return rb_thread_pending_interrupt_p(argc, argv, GET_THREAD()->self);
}
+NORETURN(static void rb_threadptr_to_kill(rb_thread_t *th));
+
static void
rb_threadptr_to_kill(rb_thread_t *th)
{
rb_threadptr_pending_interrupt_clear(th);
th->status = THREAD_RUNNABLE;
th->to_kill = 1;
- th->errinfo = INT2FIX(TAG_FATAL);
- TH_JUMP_TAG(th, TAG_FATAL);
+ th->ec->errinfo = INT2FIX(TAG_FATAL);
+ EC_JUMP_TAG(th->ec, TAG_FATAL);
}
static inline rb_atomic_t
threadptr_get_interrupts(rb_thread_t *th)
{
+ rb_execution_context_t *ec = th->ec;
rb_atomic_t interrupt;
rb_atomic_t old;
do {
- interrupt = th->interrupt_flag;
- old = ATOMIC_CAS(th->interrupt_flag, interrupt, interrupt & th->interrupt_mask);
+ interrupt = ec->interrupt_flag;
+ old = ATOMIC_CAS(ec->interrupt_flag, interrupt, interrupt & ec->interrupt_mask);
} while (old != interrupt);
- return interrupt & (rb_atomic_t)~th->interrupt_mask;
+ return interrupt & (rb_atomic_t)~ec->interrupt_mask;
}
-void
+MJIT_FUNC_EXPORTED int
rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
{
rb_atomic_t interrupt;
int postponed_job_interrupt = 0;
+ int ret = FALSE;
- if (th->raised_flag) return;
+ if (th->ec->raised_flag) return ret;
while ((interrupt = threadptr_get_interrupts(th)) != 0) {
int sig;
int timer_interrupt;
int pending_interrupt;
int trap_interrupt;
+ int terminate_interrupt;
timer_interrupt = interrupt & TIMER_INTERRUPT_MASK;
pending_interrupt = interrupt & PENDING_INTERRUPT_MASK;
postponed_job_interrupt = interrupt & POSTPONED_JOB_INTERRUPT_MASK;
trap_interrupt = interrupt & TRAP_INTERRUPT_MASK;
+ terminate_interrupt = interrupt & TERMINATE_INTERRUPT_MASK; // request from other ractors
+
+ if (interrupt & VM_BARRIER_INTERRUPT_MASK) {
+ RB_VM_LOCK_ENTER();
+ RB_VM_LOCK_LEAVE();
+ }
if (postponed_job_interrupt) {
rb_postponed_job_flush(th->vm);
}
/* signal handling */
- if (trap_interrupt && (th == th->vm->main_thread)) {
+ if (trap_interrupt && (th == th->vm->ractor.main_thread)) {
enum rb_thread_status prev_status = th->status;
+ int sigwait_fd = rb_sigwait_fd_get(th);
+
+ if (sigwait_fd >= 0) {
+ (void)consume_communication_pipe(sigwait_fd);
+ ruby_sigchld_handler(th->vm);
+ rb_sigwait_fd_put(th, sigwait_fd);
+ rb_sigwait_fd_migrate(th->vm);
+ }
th->status = THREAD_RUNNABLE;
while ((sig = rb_get_next_signal()) != 0) {
- rb_signal_exec(th, sig);
+ ret |= rb_signal_exec(th, sig);
}
th->status = prev_status;
}
/* exception from another thread */
- if (pending_interrupt && rb_threadptr_pending_interrupt_active_p(th)) {
+ if (pending_interrupt && threadptr_pending_interrupt_active_p(th)) {
VALUE err = rb_threadptr_pending_interrupt_deque(th, blocking_timing ? INTERRUPT_ON_BLOCKING : INTERRUPT_NONE);
thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err);
+ ret = TRUE;
if (err == Qundef) {
/* no error */
@@ -2066,9 +2474,13 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
else if (err == eKillSignal /* Thread#kill received */ ||
err == eTerminateSignal /* Terminate thread */ ||
err == INT2FIX(TAG_FATAL) /* Thread.exit etc. */ ) {
- rb_threadptr_to_kill(th);
+ terminate_interrupt = 1;
}
else {
+ if (err == th->vm->special_exceptions[ruby_error_stream_closed]) {
+ /* the only special exception to be queued across thread */
+ err = ruby_vm_special_exception_copy(err);
+ }
/* set runnable if th was slept. */
if (th->status == THREAD_STOPPED ||
th->status == THREAD_STOPPED_FOREVER)
@@ -2077,8 +2489,12 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
}
}
- if (timer_interrupt) {
- unsigned long limits_us = TIME_QUANTUM_USEC;
+ if (terminate_interrupt) {
+ rb_threadptr_to_kill(th);
+ }
+
+ if (timer_interrupt) {
+ uint32_t limits_us = TIME_QUANTUM_USEC;
if (th->priority > 0)
limits_us <<= th->priority;
@@ -2088,19 +2504,20 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
if (th->status == THREAD_RUNNABLE)
th->running_time_us += TIME_QUANTUM_USEC;
- EXEC_EVENT_HOOK(th, RUBY_INTERNAL_EVENT_SWITCH, th->cfp->self, 0, 0, 0, Qundef);
+ VM_ASSERT(th->ec->cfp);
+ EXEC_EVENT_HOOK(th->ec, RUBY_INTERNAL_EVENT_SWITCH, th->ec->cfp->self,
+ 0, 0, 0, Qundef);
rb_thread_schedule_limits(limits_us);
}
}
+ return ret;
}
void
rb_thread_execute_interrupts(VALUE thval)
{
- rb_thread_t *th;
- GetThreadPtr(thval, th);
- rb_threadptr_execute_interrupts(th, 1);
+ rb_threadptr_execute_interrupts(rb_thread_ptr(thval), 1);
}
static void
@@ -2109,14 +2526,12 @@ rb_threadptr_ready(rb_thread_t *th)
rb_threadptr_interrupt(th);
}
-void rb_threadptr_setup_exception(rb_thread_t *th, VALUE mesg, VALUE cause);
-
static VALUE
-rb_threadptr_raise(rb_thread_t *th, int argc, VALUE *argv)
+rb_threadptr_raise(rb_thread_t *target_th, int argc, VALUE *argv)
{
VALUE exc;
- if (rb_threadptr_dead(th)) {
+ if (rb_threadptr_dead(target_th)) {
return Qnil;
}
@@ -2129,13 +2544,13 @@ rb_threadptr_raise(rb_thread_t *th, int argc, VALUE *argv)
/* making an exception object can switch thread,
so we need to check thread deadness again */
- if (rb_threadptr_dead(th)) {
+ if (rb_threadptr_dead(target_th)) {
return Qnil;
}
- rb_threadptr_setup_exception(GET_THREAD(), exc, Qundef);
- rb_threadptr_pending_interrupt_enque(th, exc);
- rb_threadptr_interrupt(th);
+ rb_ec_setup_exception(GET_EC(), exc, Qundef);
+ rb_threadptr_pending_interrupt_enque(target_th, exc);
+ rb_threadptr_interrupt(target_th);
return Qnil;
}
@@ -2146,7 +2561,7 @@ rb_threadptr_signal_raise(rb_thread_t *th, int sig)
argv[0] = rb_eSignal;
argv[1] = INT2FIX(sig);
- rb_threadptr_raise(th->vm->main_thread, 2, argv);
+ rb_threadptr_raise(th->vm->ractor.main_thread, 2, argv);
}
void
@@ -2156,77 +2571,67 @@ rb_threadptr_signal_exit(rb_thread_t *th)
argv[0] = rb_eSystemExit;
argv[1] = rb_str_new2("exit");
- rb_threadptr_raise(th->vm->main_thread, 2, argv);
-}
-#if defined(POSIX_SIGNAL) && defined(SIGSEGV) && defined(HAVE_SIGALTSTACK)
-#define USE_SIGALTSTACK
-#endif
-
-NORETURN(void ruby_thread_stack_overflow(rb_thread_t *th));
-void
-ruby_thread_stack_overflow(rb_thread_t *th)
-{
- th->raised_flag = 0;
-#ifdef USE_SIGALTSTACK
- if (!rb_during_gc()) {
- rb_exc_raise(sysstack_error);
- }
-#endif
- th->errinfo = sysstack_error;
- TH_JUMP_TAG(th, TAG_RAISE);
+ // TODO: check signal raise deliverly
+ rb_threadptr_raise(th->vm->ractor.main_thread, 2, argv);
}
int
-rb_threadptr_set_raised(rb_thread_t *th)
+rb_ec_set_raised(rb_execution_context_t *ec)
{
- if (th->raised_flag & RAISED_EXCEPTION) {
+ if (ec->raised_flag & RAISED_EXCEPTION) {
return 1;
}
- th->raised_flag |= RAISED_EXCEPTION;
+ ec->raised_flag |= RAISED_EXCEPTION;
return 0;
}
int
-rb_threadptr_reset_raised(rb_thread_t *th)
+rb_ec_reset_raised(rb_execution_context_t *ec)
{
- if (!(th->raised_flag & RAISED_EXCEPTION)) {
+ if (!(ec->raised_flag & RAISED_EXCEPTION)) {
return 0;
}
- th->raised_flag &= ~RAISED_EXCEPTION;
+ ec->raised_flag &= ~RAISED_EXCEPTION;
return 1;
}
int
-rb_notify_fd_close(int fd)
+rb_notify_fd_close(int fd, struct list_head *busy)
{
rb_vm_t *vm = GET_THREAD()->vm;
- struct waiting_fd *wfd = 0;
- int busy;
-
- busy = 0;
- list_for_each(&vm->waiting_fds, wfd, wfd_node) {
- if (wfd->fd == fd) {
- rb_thread_t *th = wfd->th;
- VALUE err;
+ struct waiting_fd *wfd = 0, *next;
- busy = 1;
- if (!th) {
- continue;
- }
- wfd->th = 0;
- err = th->vm->special_exceptions[ruby_error_stream_closed];
- rb_threadptr_pending_interrupt_enque(th, err);
- rb_threadptr_interrupt(th);
- }
+ RB_VM_LOCK_ENTER();
+ {
+ list_for_each_safe(&vm->waiting_fds, wfd, next, wfd_node) {
+ if (wfd->fd == fd) {
+ rb_thread_t *th = wfd->th;
+ VALUE err;
+
+ list_del(&wfd->wfd_node);
+ list_add(busy, &wfd->wfd_node);
+
+ err = th->vm->special_exceptions[ruby_error_stream_closed];
+ rb_threadptr_pending_interrupt_enque(th, err);
+ rb_threadptr_interrupt(th);
+ }
+ }
}
- return busy;
+ RB_VM_LOCK_LEAVE();
+
+ return !list_empty(busy);
}
void
rb_thread_fd_close(int fd)
{
- while (rb_notify_fd_close(fd)) rb_thread_schedule();
+ struct list_head busy;
+
+ list_head_init(&busy);
+ if (rb_notify_fd_close(fd, &busy)) {
+ do rb_thread_schedule(); while (!list_empty(&busy));
+ }
}
/*
@@ -2253,15 +2658,15 @@ rb_thread_fd_close(int fd)
static VALUE
thread_raise_m(int argc, VALUE *argv, VALUE self)
{
- rb_thread_t *target_th;
- rb_thread_t *th = GET_THREAD();
- GetThreadPtr(self, target_th);
+ rb_thread_t *target_th = rb_thread_ptr(self);
+ const rb_thread_t *current_th = GET_THREAD();
+
threadptr_check_pending_interrupt_queue(target_th);
rb_threadptr_raise(target_th, argc, argv);
/* To perform Thread.current.raise as Kernel.raise */
- if (th == target_th) {
- RUBY_VM_CHECK_INTS(th);
+ if (current_th == target_th) {
+ RUBY_VM_CHECK_INTS(target_th->ec);
}
return Qnil;
}
@@ -2269,28 +2674,24 @@ thread_raise_m(int argc, VALUE *argv, VALUE self)
/*
* call-seq:
- * thr.exit -> thr or nil
- * thr.kill -> thr or nil
- * thr.terminate -> thr or nil
- *
- * Terminates +thr+ and schedules another thread to be run.
- *
- * If this thread is already marked to be killed, #exit returns the Thread.
+ * thr.exit -> thr
+ * thr.kill -> thr
+ * thr.terminate -> thr
*
- * If this is the main thread, or the last thread, exits the process.
+ * Terminates +thr+ and schedules another thread to be run, returning
+ * the terminated Thread. If this is the main thread, or the last
+ * thread, exits the process.
*/
VALUE
rb_thread_kill(VALUE thread)
{
- rb_thread_t *th;
-
- GetThreadPtr(thread, th);
+ rb_thread_t *th = rb_thread_ptr(thread);
if (th->to_kill || th->status == THREAD_KILLED) {
return thread;
}
- if (th == th->vm->main_thread) {
+ if (th == th->vm->ractor.main_thread) {
rb_exit(EXIT_SUCCESS);
}
@@ -2311,9 +2712,7 @@ rb_thread_kill(VALUE thread)
int
rb_thread_to_be_killed(VALUE thread)
{
- rb_thread_t *th;
-
- GetThreadPtr(thread, th);
+ rb_thread_t *th = rb_thread_ptr(thread);
if (th->to_kill || th->status == THREAD_KILLED) {
return TRUE;
@@ -2355,7 +2754,7 @@ rb_thread_s_kill(VALUE obj, VALUE th)
*/
static VALUE
-rb_thread_exit(void)
+rb_thread_exit(VALUE _)
{
rb_thread_t *th = GET_THREAD();
return rb_thread_kill(th->self);
@@ -2390,15 +2789,16 @@ rb_thread_wakeup(VALUE thread)
VALUE
rb_thread_wakeup_alive(VALUE thread)
{
- rb_thread_t *th;
- GetThreadPtr(thread, th);
+ rb_thread_t *target_th = rb_thread_ptr(thread);
+ if (target_th->status == THREAD_KILLED) return Qnil;
- if (th->status == THREAD_KILLED) {
- return Qnil;
+ rb_threadptr_ready(target_th);
+
+ if (target_th->status == THREAD_STOPPED ||
+ target_th->status == THREAD_STOPPED_FOREVER) {
+ target_th->status = THREAD_RUNNABLE;
}
- rb_threadptr_ready(th);
- if (th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER)
- th->status = THREAD_RUNNABLE;
+
return thread;
}
@@ -2433,6 +2833,17 @@ rb_thread_run(VALUE thread)
}
+VALUE
+rb_thread_stop(void)
+{
+ if (rb_thread_alone()) {
+ rb_raise(rb_eThreadError,
+ "stopping only thread\n\tnote: use sleep to stop forever");
+ }
+ rb_thread_sleep_deadly();
+ return Qnil;
+}
+
/*
* call-seq:
* Thread.stop -> nil
@@ -2448,19 +2859,21 @@ rb_thread_run(VALUE thread)
* #=> "abc"
*/
-VALUE
-rb_thread_stop(void)
+static VALUE
+thread_stop(VALUE _)
{
- if (rb_thread_alone()) {
- rb_raise(rb_eThreadError,
- "stopping only thread\n\tnote: use sleep to stop forever");
- }
- rb_thread_sleep_deadly();
- return Qnil;
+ return rb_thread_stop();
}
/********************************************************************/
+VALUE
+rb_thread_list(void)
+{
+ // TODO
+ return rb_ractor_thread_list(GET_RACTOR());
+}
+
/*
* call-seq:
* Thread.list -> array
@@ -2481,24 +2894,10 @@ rb_thread_stop(void)
* #<Thread:0x401bdf4c run>
*/
-VALUE
-rb_thread_list(void)
+static VALUE
+thread_list(VALUE _)
{
- VALUE ary = rb_ary_new();
- rb_vm_t *vm = GET_THREAD()->vm;
- rb_thread_t *th = 0;
-
- list_for_each(&vm->living_threads, th, vmlt_node) {
- switch (th->status) {
- case THREAD_RUNNABLE:
- case THREAD_STOPPED:
- case THREAD_STOPPED_FOREVER:
- rb_ary_push(ary, th->self);
- default:
- break;
- }
- }
- return ary;
+ return rb_thread_list();
}
VALUE
@@ -2525,7 +2924,7 @@ thread_s_current(VALUE klass)
VALUE
rb_thread_main(void)
{
- return GET_THREAD()->vm->main_thread->self;
+ return GET_RACTOR()->threads.main->self;
}
/*
@@ -2563,9 +2962,9 @@ rb_thread_s_main(VALUE klass)
*/
static VALUE
-rb_thread_s_abort_exc(void)
+rb_thread_s_abort_exc(VALUE _)
{
- return GET_THREAD()->vm->thread_abort_on_exception ? Qtrue : Qfalse;
+ return RBOOL(GET_THREAD()->vm->thread_abort_on_exception);
}
@@ -2625,9 +3024,7 @@ rb_thread_s_abort_exc_set(VALUE self, VALUE val)
static VALUE
rb_thread_abort_exc(VALUE thread)
{
- rb_thread_t *th;
- GetThreadPtr(thread, th);
- return th->abort_on_exception ? Qtrue : Qfalse;
+ return RBOOL(rb_thread_ptr(thread)->abort_on_exception);
}
@@ -2647,10 +3044,7 @@ rb_thread_abort_exc(VALUE thread)
static VALUE
rb_thread_abort_exc_set(VALUE thread, VALUE val)
{
- rb_thread_t *th;
-
- GetThreadPtr(thread, th);
- th->abort_on_exception = RTEST(val);
+ rb_thread_ptr(thread)->abort_on_exception = RTEST(val);
return val;
}
@@ -2661,21 +3055,46 @@ rb_thread_abort_exc_set(VALUE thread, VALUE val)
*
* Returns the status of the global ``report on exception'' condition.
*
- * The default is +false+.
+ * The default is +true+ since Ruby 2.5.
+ *
+ * All threads created when this flag is true will report
+ * a message on $stderr if an exception kills the thread.
+ *
+ * Thread.new { 1.times { raise } }
+ *
+ * will produce this output on $stderr:
*
- * When set to +true+, all threads will report the exception if an
- * exception is raised in any thread.
+ * #<Thread:...> terminated with exception (report_on_exception is true):
+ * Traceback (most recent call last):
+ * 2: from -e:1:in `block in <main>'
+ * 1: from -e:1:in `times'
+ *
+ * This is done to catch errors in threads early.
+ * In some cases, you might not want this output.
+ * There are multiple ways to avoid the extra output:
+ *
+ * * If the exception is not intended, the best is to fix the cause of
+ * the exception so it does not happen anymore.
+ * * If the exception is intended, it might be better to rescue it closer to
+ * where it is raised rather then let it kill the Thread.
+ * * If it is guaranteed the Thread will be joined with Thread#join or
+ * Thread#value, then it is safe to disable this report with
+ * <code>Thread.current.report_on_exception = false</code>
+ * when starting the Thread.
+ * However, this might handle the exception much later, or not at all
+ * if the Thread is never joined due to the parent thread being blocked, etc.
*
* See also ::report_on_exception=.
*
* There is also an instance level method to set this for a specific thread,
- * see #report_on_exception.
+ * see #report_on_exception=.
+ *
*/
static VALUE
-rb_thread_s_report_exc(void)
+rb_thread_s_report_exc(VALUE _)
{
- return GET_THREAD()->vm->thread_report_on_exception ? Qtrue : Qfalse;
+ return RBOOL(GET_THREAD()->vm->thread_report_on_exception);
}
@@ -2683,8 +3102,9 @@ rb_thread_s_report_exc(void)
* call-seq:
* Thread.report_on_exception= boolean -> true or false
*
- * When set to +true+, all threads will report the exception if an
- * exception is raised. Returns the new state.
+ * Returns the new state.
+ * When set to +true+, all threads created afterwards will inherit the
+ * condition and report a message on $stderr if an exception kills a thread:
*
* Thread.report_on_exception = true
* t1 = Thread.new do
@@ -2697,10 +3117,9 @@ rb_thread_s_report_exc(void)
* This will produce:
*
* In new thread
- * prog.rb:4: Exception from thread (RuntimeError)
- * from prog.rb:2:in `initialize'
- * from prog.rb:2:in `new'
- * from prog.rb:2
+ * #<Thread:...prog.rb:2> terminated with exception (report_on_exception is true):
+ * Traceback (most recent call last):
+ * prog.rb:4:in `block in <main>': Exception from thread (RuntimeError)
* In the main thread
*
* See also ::report_on_exception.
@@ -2719,25 +3138,70 @@ rb_thread_s_report_exc_set(VALUE self, VALUE val)
/*
* call-seq:
+ * Thread.ignore_deadlock -> true or false
+ *
+ * Returns the status of the global ``ignore deadlock'' condition.
+ * The default is +false+, so that deadlock conditions are not ignored.
+ *
+ * See also ::ignore_deadlock=.
+ *
+ */
+
+static VALUE
+rb_thread_s_ignore_deadlock(VALUE _)
+{
+ return RBOOL(GET_THREAD()->vm->thread_ignore_deadlock);
+}
+
+
+/*
+ * call-seq:
+ * Thread.ignore_deadlock = boolean -> true or false
+ *
+ * Returns the new state.
+ * When set to +true+, the VM will not check for deadlock conditions.
+ * It is only useful to set this if your application can break a
+ * deadlock condition via some other means, such as a signal.
+ *
+ * Thread.ignore_deadlock = true
+ * queue = Thread::Queue.new
+ *
+ * trap(:SIGUSR1){queue.push "Received signal"}
+ *
+ * # raises fatal error unless ignoring deadlock
+ * puts queue.pop
+ *
+ * See also ::ignore_deadlock.
+ */
+
+static VALUE
+rb_thread_s_ignore_deadlock_set(VALUE self, VALUE val)
+{
+ GET_THREAD()->vm->thread_ignore_deadlock = RTEST(val);
+ return val;
+}
+
+
+/*
+ * call-seq:
* thr.report_on_exception -> true or false
*
* Returns the status of the thread-local ``report on exception'' condition for
* this +thr+.
*
- * The default is +false+.
+ * The default value when creating a Thread is the value of
+ * the global flag Thread.report_on_exception.
*
* See also #report_on_exception=.
*
- * There is also a class level method to set this for all threads, see
- * ::report_on_exception.
+ * There is also a class level method to set this for all new threads, see
+ * ::report_on_exception=.
*/
static VALUE
rb_thread_report_exc(VALUE thread)
{
- rb_thread_t *th;
- GetThreadPtr(thread, th);
- return th->report_on_exception ? Qtrue : Qfalse;
+ return RBOOL(rb_thread_ptr(thread)->report_on_exception);
}
@@ -2745,22 +3209,19 @@ rb_thread_report_exc(VALUE thread)
* call-seq:
* thr.report_on_exception= boolean -> true or false
*
- * When set to +true+, all threads (including the main program) will
- * report the exception if an exception is raised in this +thr+.
+ * When set to +true+, a message is printed on $stderr if an exception
+ * kills this +thr+. See ::report_on_exception for details.
*
* See also #report_on_exception.
*
- * There is also a class level method to set this for all threads, see
+ * There is also a class level method to set this for all new threads, see
* ::report_on_exception=.
*/
static VALUE
rb_thread_report_exc_set(VALUE thread, VALUE val)
{
- rb_thread_t *th;
-
- GetThreadPtr(thread, th);
- th->report_on_exception = RTEST(val);
+ rb_thread_ptr(thread)->report_on_exception = RTEST(val);
return val;
}
@@ -2769,8 +3230,7 @@ rb_thread_report_exc_set(VALUE thread, VALUE val)
* call-seq:
* thr.group -> thgrp or nil
*
- * Returns the ThreadGroup which contains the given thread, or returns +nil+
- * if +thr+ is not a member of any group.
+ * Returns the ThreadGroup which contains the given thread.
*
* Thread.main.group #=> #<ThreadGroup:0x4029d914>
*/
@@ -2778,15 +3238,7 @@ rb_thread_report_exc_set(VALUE thread, VALUE val)
VALUE
rb_thread_group(VALUE thread)
{
- rb_thread_t *th;
- VALUE group;
- GetThreadPtr(thread, th);
- group = th->thgroup;
-
- if (!group) {
- group = Qnil;
- }
- return group;
+ return rb_thread_ptr(thread)->thgroup;
}
static const char *
@@ -2794,10 +3246,7 @@ thread_status_name(rb_thread_t *th, int detail)
{
switch (th->status) {
case THREAD_RUNNABLE:
- if (th->to_kill)
- return "aborting";
- else
- return "run";
+ return th->to_kill ? "aborting" : "run";
case THREAD_STOPPED_FOREVER:
if (detail) return "sleep_forever";
case THREAD_STOPPED:
@@ -2850,17 +3299,20 @@ rb_threadptr_dead(rb_thread_t *th)
static VALUE
rb_thread_status(VALUE thread)
{
- rb_thread_t *th;
- GetThreadPtr(thread, th);
+ rb_thread_t *target_th = rb_thread_ptr(thread);
- if (rb_threadptr_dead(th)) {
- if (!NIL_P(th->errinfo) && !FIXNUM_P(th->errinfo)
- /* TODO */ ) {
+ if (rb_threadptr_dead(target_th)) {
+ if (!NIL_P(target_th->ec->errinfo) &&
+ !FIXNUM_P(target_th->ec->errinfo)) {
return Qnil;
}
- return Qfalse;
+ else {
+ return Qfalse;
+ }
+ }
+ else {
+ return rb_str_new2(thread_status_name(target_th, FALSE));
}
- return rb_str_new2(thread_status_name(th, FALSE));
}
@@ -2881,12 +3333,7 @@ rb_thread_status(VALUE thread)
static VALUE
rb_thread_alive_p(VALUE thread)
{
- rb_thread_t *th;
- GetThreadPtr(thread, th);
-
- if (rb_threadptr_dead(th))
- return Qfalse;
- return Qtrue;
+ return RBOOL(!thread_finished(rb_thread_ptr(thread)));
}
/*
@@ -2906,35 +3353,12 @@ rb_thread_alive_p(VALUE thread)
static VALUE
rb_thread_stop_p(VALUE thread)
{
- rb_thread_t *th;
- GetThreadPtr(thread, th);
+ rb_thread_t *th = rb_thread_ptr(thread);
- if (rb_threadptr_dead(th))
- return Qtrue;
- if (th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER)
+ if (rb_threadptr_dead(th)) {
return Qtrue;
- return Qfalse;
-}
-
-/*
- * call-seq:
- * thr.safe_level -> integer
- *
- * Returns the safe level in effect for <i>thr</i>. Setting thread-local safe
- * levels can help when implementing sandboxes which run insecure code.
- *
- * thr = Thread.new { $SAFE = 1; sleep }
- * Thread.current.safe_level #=> 0
- * thr.safe_level #=> 1
- */
-
-static VALUE
-rb_thread_safe_level(VALUE thread)
-{
- rb_thread_t *th;
- GetThreadPtr(thread, th);
-
- return INT2NUM(th->safe_level);
+ }
+ return RBOOL(th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER);
}
/*
@@ -2947,9 +3371,7 @@ rb_thread_safe_level(VALUE thread)
static VALUE
rb_thread_getname(VALUE thread)
{
- rb_thread_t *th;
- GetThreadPtr(thread, th);
- return th->name;
+ return rb_thread_ptr(thread)->name;
}
/*
@@ -2963,11 +3385,8 @@ rb_thread_getname(VALUE thread)
static VALUE
rb_thread_setname(VALUE thread, VALUE name)
{
-#ifdef SET_ANOTHER_THREAD_NAME
- const char *s = "";
-#endif
- rb_thread_t *th;
- GetThreadPtr(thread, th);
+ rb_thread_t *target_th = rb_thread_ptr(thread);
+
if (!NIL_P(name)) {
rb_encoding *enc;
StringValueCStr(name);
@@ -2977,68 +3396,92 @@ rb_thread_setname(VALUE thread, VALUE name)
rb_enc_name(enc));
}
name = rb_str_new_frozen(name);
-#ifdef SET_ANOTHER_THREAD_NAME
- s = RSTRING_PTR(name);
-#endif
}
- th->name = name;
-#if defined(SET_ANOTHER_THREAD_NAME)
- if (threadptr_initialized(th)) {
- SET_ANOTHER_THREAD_NAME(th->thread_id, s);
+ target_th->name = name;
+ if (threadptr_initialized(target_th)) {
+ native_set_another_thread_name(target_th->thread_id, name);
}
-#endif
return name;
}
+#if USE_NATIVE_THREAD_NATIVE_THREAD_ID
/*
* call-seq:
- * thr.inspect -> string
+ * thr.native_thread_id -> integer
+ *
+ * Return the native thread ID which is used by the Ruby thread.
+ *
+ * The ID depends on the OS. (not POSIX thread ID returned by pthread_self(3))
+ * * On Linux it is TID returned by gettid(2).
+ * * On macOS it is the system-wide unique integral ID of thread returned
+ * by pthread_threadid_np(3).
+ * * On FreeBSD it is the unique integral ID of the thread returned by
+ * pthread_getthreadid_np(3).
+ * * On Windows it is the thread identifier returned by GetThreadId().
+ * * On other platforms, it raises NotImplementedError.
+ *
+ * NOTE:
+ * If the thread is not associated yet or already deassociated with a native
+ * thread, it returns _nil_.
+ * If the Ruby implementation uses M:N thread model, the ID may change
+ * depending on the timing.
+ */
+
+static VALUE
+rb_thread_native_thread_id(VALUE thread)
+{
+ rb_thread_t *target_th = rb_thread_ptr(thread);
+ if (rb_threadptr_dead(target_th)) return Qnil;
+ return native_thread_native_thread_id(target_th);
+}
+#else
+# define rb_thread_native_thread_id rb_f_notimplement
+#endif
+
+/*
+ * call-seq:
+ * thr.to_s -> string
*
* Dump the name, id, and status of _thr_ to a string.
*/
static VALUE
-rb_thread_inspect(VALUE thread)
+rb_thread_to_s(VALUE thread)
{
VALUE cname = rb_class_path(rb_obj_class(thread));
- rb_thread_t *th;
+ rb_thread_t *target_th = rb_thread_ptr(thread);
const char *status;
- VALUE str;
+ VALUE str, loc;
- GetThreadPtr(thread, th);
- status = thread_status_name(th, TRUE);
+ status = thread_status_name(target_th, TRUE);
str = rb_sprintf("#<%"PRIsVALUE":%p", cname, (void *)thread);
- if (!NIL_P(th->name)) {
- rb_str_catf(str, "@%"PRIsVALUE, th->name);
- }
- if (!th->first_func && th->first_proc) {
- VALUE loc = rb_proc_location(th->first_proc);
- if (!NIL_P(loc)) {
- const VALUE *ptr = RARRAY_CONST_PTR(loc);
- rb_str_catf(str, "@%"PRIsVALUE":%"PRIsVALUE, ptr[0], ptr[1]);
- rb_gc_force_recycle(loc);
- }
+ if (!NIL_P(target_th->name)) {
+ rb_str_catf(str, "@%"PRIsVALUE, target_th->name);
+ }
+ if ((loc = threadptr_invoke_proc_location(target_th)) != Qnil) {
+ rb_str_catf(str, " %"PRIsVALUE":%"PRIsVALUE,
+ RARRAY_AREF(loc, 0), RARRAY_AREF(loc, 1));
}
rb_str_catf(str, " %s>", status);
- OBJ_INFECT(str, thread);
return str;
}
/* variables for recursive traversals */
-static ID recursive_key;
+#define recursive_key id__recursive_key__
static VALUE
threadptr_local_aref(rb_thread_t *th, ID id)
{
if (id == recursive_key) {
- return th->local_storage_recursive_hash;
+ return th->ec->local_storage_recursive_hash;
}
else {
- st_data_t val;
+ VALUE val;
+ struct rb_id_table *local_storage = th->ec->local_storage;
- if (th->local_storage && st_lookup(th->local_storage, id, &val)) {
- return (VALUE)val;
+ if (local_storage != NULL && rb_id_table_lookup(local_storage, id, &val)) {
+ return val;
}
else {
return Qnil;
@@ -3049,9 +3492,7 @@ threadptr_local_aref(rb_thread_t *th, ID id)
VALUE
rb_thread_local_aref(VALUE thread, ID id)
{
- rb_thread_t *th;
- GetThreadPtr(thread, th);
- return threadptr_local_aref(th, id);
+ return threadptr_local_aref(rb_thread_ptr(thread), id);
}
/*
@@ -3122,38 +3563,88 @@ rb_thread_aref(VALUE thread, VALUE key)
return rb_thread_local_aref(thread, id);
}
+/*
+ * call-seq:
+ * thr.fetch(sym) -> obj
+ * thr.fetch(sym) { } -> obj
+ * thr.fetch(sym, default) -> obj
+ *
+ * Returns a fiber-local for the given key. If the key can't be
+ * found, there are several options: With no other arguments, it will
+ * raise a KeyError exception; if <i>default</i> is given, then that
+ * will be returned; if the optional code block is specified, then
+ * that will be run and its result returned. See Thread#[] and
+ * Hash#fetch.
+ */
static VALUE
-threadptr_local_aset(rb_thread_t *th, ID id, VALUE val)
+rb_thread_fetch(int argc, VALUE *argv, VALUE self)
{
+ VALUE key, val;
+ ID id;
+ rb_thread_t *target_th = rb_thread_ptr(self);
+ int block_given;
+
+ rb_check_arity(argc, 1, 2);
+ key = argv[0];
+
+ block_given = rb_block_given_p();
+ if (block_given && argc == 2) {
+ rb_warn("block supersedes default value argument");
+ }
+
+ id = rb_check_id(&key);
+
if (id == recursive_key) {
- th->local_storage_recursive_hash = val;
+ return target_th->ec->local_storage_recursive_hash;
+ }
+ else if (id && target_th->ec->local_storage &&
+ rb_id_table_lookup(target_th->ec->local_storage, id, &val)) {
return val;
}
- else if (NIL_P(val)) {
- if (!th->local_storage) return Qnil;
- st_delete_wrap(th->local_storage, id);
- return Qnil;
+ else if (block_given) {
+ return rb_yield(key);
+ }
+ else if (argc == 1) {
+ rb_key_err_raise(rb_sprintf("key not found: %+"PRIsVALUE, key), self, key);
}
else {
- if (!th->local_storage) {
- th->local_storage = st_init_numtable();
- }
- st_insert(th->local_storage, id, val);
+ return argv[1];
+ }
+}
+
+static VALUE
+threadptr_local_aset(rb_thread_t *th, ID id, VALUE val)
+{
+ if (id == recursive_key) {
+ th->ec->local_storage_recursive_hash = val;
return val;
}
+ else {
+ struct rb_id_table *local_storage = th->ec->local_storage;
+
+ if (NIL_P(val)) {
+ if (!local_storage) return Qnil;
+ rb_id_table_delete(local_storage, id);
+ return Qnil;
+ }
+ else {
+ if (local_storage == NULL) {
+ th->ec->local_storage = local_storage = rb_id_table_create(0);
+ }
+ rb_id_table_insert(local_storage, id, val);
+ return val;
+ }
+ }
}
VALUE
rb_thread_local_aset(VALUE thread, ID id, VALUE val)
{
- rb_thread_t *th;
- GetThreadPtr(thread, th);
-
if (OBJ_FROZEN(thread)) {
- rb_error_frozen("thread locals");
+ rb_frozen_error_raise(thread, "can't modify frozen thread locals");
}
- return threadptr_local_aset(th, id, val);
+ return threadptr_local_aset(rb_thread_ptr(thread), id, val);
}
/*
@@ -3208,7 +3699,10 @@ rb_thread_variable_get(VALUE thread, VALUE key)
{
VALUE locals;
- locals = rb_ivar_get(thread, id_locals);
+ if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) {
+ return Qnil;
+ }
+ locals = rb_thread_local_storage(thread);
return rb_hash_aref(locals, rb_to_symbol(key));
}
@@ -3222,16 +3716,16 @@ rb_thread_variable_get(VALUE thread, VALUE key)
*/
static VALUE
-rb_thread_variable_set(VALUE thread, VALUE id, VALUE val)
+rb_thread_variable_set(VALUE thread, VALUE key, VALUE val)
{
VALUE locals;
if (OBJ_FROZEN(thread)) {
- rb_error_frozen("thread locals");
+ rb_frozen_error_raise(thread, "can't modify frozen thread locals");
}
- locals = rb_ivar_get(thread, id_locals);
- return rb_hash_aset(locals, rb_to_symbol(id), val);
+ locals = rb_thread_local_storage(thread);
+ return rb_hash_aset(locals, rb_to_symbol(key), val);
}
/*
@@ -3250,31 +3744,28 @@ rb_thread_variable_set(VALUE thread, VALUE id, VALUE val)
static VALUE
rb_thread_key_p(VALUE self, VALUE key)
{
- rb_thread_t *th;
+ VALUE val;
ID id = rb_check_id(&key);
+ struct rb_id_table *local_storage = rb_thread_ptr(self)->ec->local_storage;
- GetThreadPtr(self, th);
-
- if (!id || !th->local_storage) {
+ if (!id || local_storage == NULL) {
return Qfalse;
}
- if (st_lookup(th->local_storage, id, 0)) {
- return Qtrue;
- }
- return Qfalse;
+ return RBOOL(rb_id_table_lookup(local_storage, id, &val));
}
-static int
-thread_keys_i(ID key, VALUE value, VALUE ary)
+static enum rb_id_table_iterator_result
+thread_keys_i(ID key, VALUE value, void *ary)
{
- rb_ary_push(ary, ID2SYM(key));
- return ST_CONTINUE;
+ rb_ary_push((VALUE)ary, ID2SYM(key));
+ return ID_TABLE_CONTINUE;
}
int
rb_thread_alone(void)
{
- return vm_living_thread_num(GET_VM()) == 1;
+ // TODO
+ return rb_ractor_living_thread_num(GET_RACTOR()) == 1;
}
/*
@@ -3294,12 +3785,11 @@ rb_thread_alone(void)
static VALUE
rb_thread_keys(VALUE self)
{
- rb_thread_t *th;
+ struct rb_id_table *local_storage = rb_thread_ptr(self)->ec->local_storage;
VALUE ary = rb_ary_new();
- GetThreadPtr(self, th);
- if (th->local_storage) {
- st_foreach(th->local_storage, thread_keys_i, ary);
+ if (local_storage) {
+ rb_id_table_foreach(local_storage, thread_keys_i, (void *)ary);
}
return ary;
}
@@ -3334,8 +3824,11 @@ rb_thread_variables(VALUE thread)
VALUE locals;
VALUE ary;
- locals = rb_ivar_get(thread, id_locals);
ary = rb_ary_new();
+ if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) {
+ return ary;
+ }
+ locals = rb_thread_local_storage(thread);
rb_hash_foreach(locals, keys_i, ary);
return ary;
@@ -3361,20 +3854,13 @@ static VALUE
rb_thread_variable_p(VALUE thread, VALUE key)
{
VALUE locals;
- ID id = rb_check_id(&key);
- if (!id) return Qfalse;
-
- locals = rb_ivar_get(thread, id_locals);
-
- if (!RHASH(locals)->ntbl)
+ if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) {
return Qfalse;
-
- if (st_lookup(RHASH(locals)->ntbl, ID2SYM(id), 0)) {
- return Qtrue;
}
+ locals = rb_thread_local_storage(thread);
- return Qfalse;
+ return RBOOL(rb_hash_lookup(locals, rb_to_symbol(key)) != Qnil);
}
/*
@@ -3395,9 +3881,7 @@ rb_thread_variable_p(VALUE thread, VALUE key)
static VALUE
rb_thread_priority(VALUE thread)
{
- rb_thread_t *th;
- GetThreadPtr(thread, th);
- return INT2NUM(th->priority);
+ return INT2NUM(rb_thread_ptr(thread)->priority);
}
@@ -3430,13 +3914,11 @@ rb_thread_priority(VALUE thread)
static VALUE
rb_thread_priority_set(VALUE thread, VALUE prio)
{
- rb_thread_t *th;
+ rb_thread_t *target_th = rb_thread_ptr(thread);
int priority;
- GetThreadPtr(thread, th);
-
#if USE_NATIVE_THREAD_PRIORITY
- th->priority = NUM2INT(prio);
+ target_th->priority = NUM2INT(prio);
native_thread_apply_priority(th);
#else
priority = NUM2INT(prio);
@@ -3446,9 +3928,9 @@ rb_thread_priority_set(VALUE thread, VALUE prio)
else if (priority < RUBY_THREAD_PRIORITY_MIN) {
priority = RUBY_THREAD_PRIORITY_MIN;
}
- th->priority = priority;
+ target_th->priority = (int8_t)priority;
#endif
- return INT2NUM(th->priority);
+ return INT2NUM(target_th->priority);
}
/* for IO */
@@ -3581,11 +4063,6 @@ rb_fd_dup(rb_fdset_t *dst, const rb_fdset_t *src)
memcpy(dst->fdset, src->fdset, size);
}
-#ifdef __native_client__
-int select(int nfds, fd_set *readfds, fd_set *writefds,
- fd_set *exceptfds, struct timeval *timeout);
-#endif
-
int
rb_fd_select(int n, rb_fdset_t *readfds, rb_fdset_t *writefds, rb_fdset_t *exceptfds, struct timeval *timeout)
{
@@ -3605,9 +4082,7 @@ rb_fd_select(int n, rb_fdset_t *readfds, rb_fdset_t *writefds, rb_fdset_t *excep
return select(n, r, w, e, timeout);
}
-#if defined __GNUC__ && __GNUC__ >= 6
-#define rb_fd_no_init(fds) ASSUME(!(fds)->maxfd)
-#endif
+#define rb_fd_no_init(fds) ((void)((fds)->fdset = 0), (void)((fds)->maxfd = 0))
#undef FD_ZERO
#undef FD_SET
@@ -3657,7 +4132,9 @@ rb_fd_set(int fd, rb_fdset_t *set)
}
if (set->fdset->fd_count >= (unsigned)set->capa) {
set->capa = (set->fdset->fd_count / FD_SETSIZE + 1) * FD_SETSIZE;
- set->fdset = xrealloc(set->fdset, sizeof(unsigned int) + sizeof(SOCKET) * set->capa);
+ set->fdset =
+ rb_xrealloc_mul_add(
+ set->fdset, set->capa, sizeof(SOCKET), sizeof(unsigned int));
}
set->fdset->fd_array[set->fdset->fd_count++] = s;
}
@@ -3672,159 +4149,203 @@ rb_fd_set(int fd, rb_fdset_t *set)
#define FD_CLR(i, f) rb_fd_clr((i), (f))
#define FD_ISSET(i, f) rb_fd_isset((i), (f))
+#define rb_fd_no_init(fds) (void)((fds)->fdset = 0)
+
#endif
#ifndef rb_fd_no_init
#define rb_fd_no_init(fds) (void)(fds)
#endif
-static inline int
-retryable(int e)
+static int
+wait_retryable(int *result, int errnum, rb_hrtime_t *rel, rb_hrtime_t end)
{
- if (e == EINTR) return TRUE;
+ if (*result < 0) {
+ switch (errnum) {
+ case EINTR:
#ifdef ERESTART
- if (e == ERESTART) return TRUE;
+ case ERESTART:
#endif
+ *result = 0;
+ if (rel && hrtime_update_expire(rel, end)) {
+ *rel = 0;
+ }
+ return TRUE;
+ }
+ return FALSE;
+ }
+ else if (*result == 0) {
+ /* check for spurious wakeup */
+ if (rel) {
+ return !hrtime_update_expire(rel, end);
+ }
+ return TRUE;
+ }
return FALSE;
}
-#define restore_fdset(fds1, fds2) \
- ((fds1) ? rb_fd_dup(fds1, fds2) : (void)0)
+struct select_set {
+ int max;
+ int sigwait_fd;
+ rb_thread_t *th;
+ rb_fdset_t *rset;
+ rb_fdset_t *wset;
+ rb_fdset_t *eset;
+ rb_fdset_t orig_rset;
+ rb_fdset_t orig_wset;
+ rb_fdset_t orig_eset;
+ struct timeval *timeout;
+};
+
+static VALUE
+select_set_free(VALUE p)
+{
+ struct select_set *set = (struct select_set *)p;
-static inline void
-update_timeval(struct timeval *timeout, double limit)
+ if (set->sigwait_fd >= 0) {
+ rb_sigwait_fd_put(set->th, set->sigwait_fd);
+ rb_sigwait_fd_migrate(set->th->vm);
+ }
+
+ rb_fd_term(&set->orig_rset);
+ rb_fd_term(&set->orig_wset);
+ rb_fd_term(&set->orig_eset);
+
+ return Qfalse;
+}
+
+static const rb_hrtime_t *
+sigwait_timeout(rb_thread_t *th, int sigwait_fd, const rb_hrtime_t *orig,
+ int *drained_p)
{
- if (timeout) {
- double d = limit - timeofday();
+ static const rb_hrtime_t quantum = TIME_QUANTUM_USEC * 1000;
- timeout->tv_sec = (time_t)d;
- timeout->tv_usec = (int)((d-(double)timeout->tv_sec)*1e6);
- if (timeout->tv_sec < 0) timeout->tv_sec = 0;
- if (timeout->tv_usec < 0) timeout->tv_usec = 0;
+ if (sigwait_fd >= 0 && (!ubf_threads_empty() || BUSY_WAIT_SIGNALS)) {
+ *drained_p = check_signals_nogvl(th, sigwait_fd);
+ if (!orig || *orig > quantum)
+ return &quantum;
}
+
+ return orig;
}
-static int
-do_select(int n, rb_fdset_t *readfds, rb_fdset_t *writefds,
- rb_fdset_t *exceptfds, struct timeval *timeout)
+#define sigwait_signals_fd(result, cond, sigwait_fd) \
+ (result > 0 && (cond) ? (result--, (sigwait_fd)) : -1)
+
+static VALUE
+do_select(VALUE p)
{
- int MAYBE_UNUSED(result);
+ struct select_set *set = (struct select_set *)p;
+ int result = 0;
int lerrno;
- rb_fdset_t MAYBE_UNUSED(orig_read);
- rb_fdset_t MAYBE_UNUSED(orig_write);
- rb_fdset_t MAYBE_UNUSED(orig_except);
- double limit = 0;
- struct timeval wait_rest;
- rb_thread_t *th = GET_THREAD();
+ rb_hrtime_t *to, rel, end = 0;
+ timeout_prepare(&to, &rel, &end, set->timeout);
+#define restore_fdset(dst, src) \
+ ((dst) ? rb_fd_dup(dst, src) : (void)0)
#define do_select_update() \
- (restore_fdset(readfds, &orig_read), \
- restore_fdset(writefds, &orig_write), \
- restore_fdset(exceptfds, &orig_except), \
- update_timeval(timeout, limit), \
+ (restore_fdset(set->rset, &set->orig_rset), \
+ restore_fdset(set->wset, &set->orig_wset), \
+ restore_fdset(set->eset, &set->orig_eset), \
TRUE)
- if (timeout) {
- limit = timeofday();
- limit += (double)timeout->tv_sec+(double)timeout->tv_usec*1e-6;
- wait_rest = *timeout;
- timeout = &wait_rest;
- }
-
-#define fd_init_copy(f) \
- (f##fds) ? rb_fd_init_copy(&orig_##f, f##fds) : rb_fd_no_init(&orig_##f)
- fd_init_copy(read);
- fd_init_copy(write);
- fd_init_copy(except);
-#undef fd_init_copy
-
do {
+ int drained;
lerrno = 0;
- BLOCKING_REGION({
- result = native_fd_select(n, readfds, writefds, exceptfds,
- timeout, th);
- if (result < 0) lerrno = errno;
- }, ubf_select, th, FALSE);
-
- RUBY_VM_CHECK_INTS_BLOCKING(th);
- } while (result < 0 && retryable(errno = lerrno) && do_select_update());
-
-#define fd_term(f) if (f##fds) rb_fd_term(&orig_##f)
- fd_term(read);
- fd_term(write);
- fd_term(except);
-#undef fd_term
-
- return result;
-}
-
-static void
-rb_thread_wait_fd_rw(int fd, int read)
-{
- int result = 0;
- int events = read ? RB_WAITFD_IN : RB_WAITFD_OUT;
-
- thread_debug("rb_thread_wait_fd_rw(%d, %s)\n", fd, read ? "read" : "write");
+ BLOCKING_REGION(set->th, {
+ const rb_hrtime_t *sto;
+ struct timeval tv;
+
+ sto = sigwait_timeout(set->th, set->sigwait_fd, to, &drained);
+ if (!RUBY_VM_INTERRUPTED(set->th->ec)) {
+ result = native_fd_select(set->max, set->rset, set->wset,
+ set->eset,
+ rb_hrtime2timeval(&tv, sto), set->th);
+ if (result < 0) lerrno = errno;
+ }
+ }, set->sigwait_fd >= 0 ? ubf_sigwait : ubf_select, set->th, TRUE);
+
+ if (set->sigwait_fd >= 0) {
+ int fd = sigwait_signals_fd(result,
+ rb_fd_isset(set->sigwait_fd, set->rset),
+ set->sigwait_fd);
+ (void)check_signals_nogvl(set->th, fd);
+ }
- if (fd < 0) {
- rb_raise(rb_eIOError, "closed stream");
- }
+ RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); /* may raise */
+ } while (wait_retryable(&result, lerrno, to, end) && do_select_update());
- result = rb_wait_for_single_fd(fd, events, NULL);
if (result < 0) {
- rb_sys_fail(0);
+ errno = lerrno;
}
- thread_debug("rb_thread_wait_fd_rw(%d, %s): done\n", fd, read ? "read" : "write");
+ return (VALUE)result;
}
-void
-rb_thread_wait_fd(int fd)
+static rb_fdset_t *
+init_set_fd(int fd, rb_fdset_t *fds)
{
- rb_thread_wait_fd_rw(fd, 1);
-}
+ if (fd < 0) {
+ return 0;
+ }
+ rb_fd_init(fds);
+ rb_fd_set(fd, fds);
-int
-rb_thread_fd_writable(int fd)
-{
- rb_thread_wait_fd_rw(fd, 0);
- return TRUE;
+ return fds;
}
int
rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except,
struct timeval *timeout)
{
- if (!read && !write && !except) {
- if (!timeout) {
- rb_thread_sleep_forever();
- return 0;
- }
- rb_thread_wait_for(*timeout);
- return 0;
- }
+ struct select_set set;
- if (read) {
- rb_fd_resize(max - 1, read);
- }
- if (write) {
- rb_fd_resize(max - 1, write);
- }
- if (except) {
- rb_fd_resize(max - 1, except);
+ set.th = GET_THREAD();
+ RUBY_VM_CHECK_INTS_BLOCKING(set.th->ec);
+ set.max = max;
+ set.rset = read;
+ set.wset = write;
+ set.eset = except;
+ set.timeout = timeout;
+
+ if (!set.rset && !set.wset && !set.eset) {
+ if (!timeout) {
+ rb_thread_sleep_forever();
+ return 0;
+ }
+ rb_thread_wait_for(*timeout);
+ return 0;
+ }
+
+ set.sigwait_fd = rb_sigwait_fd_get(set.th);
+ if (set.sigwait_fd >= 0) {
+ if (set.rset)
+ rb_fd_set(set.sigwait_fd, set.rset);
+ else
+ set.rset = init_set_fd(set.sigwait_fd, &set.orig_rset);
+ if (set.sigwait_fd >= set.max) {
+ set.max = set.sigwait_fd + 1;
+ }
}
- return do_select(max, read, write, except, timeout);
-}
+#define fd_init_copy(f) do { \
+ if (set.f) { \
+ rb_fd_resize(set.max - 1, set.f); \
+ if (&set.orig_##f != set.f) { /* sigwait_fd */ \
+ rb_fd_init_copy(&set.orig_##f, set.f); \
+ } \
+ } \
+ else { \
+ rb_fd_no_init(&set.orig_##f); \
+ } \
+ } while (0)
+ fd_init_copy(rset);
+ fd_init_copy(wset);
+ fd_init_copy(eset);
+#undef fd_init_copy
-/*
- * poll() is supported by many OSes, but so far Linux is the only
- * one we know of that supports using poll() in all places select()
- * would work.
- */
-#if defined(HAVE_POLL) && defined(__linux__)
-# define USE_POLL
-#endif
+ return (int)rb_ensure(do_select, (VALUE)&set, select_set_free, (VALUE)&set);
+}
#ifdef USE_POLL
@@ -3833,89 +4354,95 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
#define POLLOUT_SET (POLLWRBAND | POLLWRNORM | POLLOUT | POLLERR)
#define POLLEX_SET (POLLPRI)
-#ifndef HAVE_PPOLL
-/* TODO: don't ignore sigmask */
-int
-ppoll(struct pollfd *fds, nfds_t nfds,
- const struct timespec *ts, const sigset_t *sigmask)
-{
- int timeout_ms;
-
- if (ts) {
- int tmp, tmp2;
-
- if (ts->tv_sec > INT_MAX/1000)
- timeout_ms = -1;
- else {
- tmp = (int)(ts->tv_sec * 1000);
- tmp2 = (int)(ts->tv_nsec / (1000 * 1000));
- if (INT_MAX - tmp < tmp2)
- timeout_ms = -1;
- else
- timeout_ms = (int)(tmp + tmp2);
- }
- }
- else
- timeout_ms = -1;
-
- return poll(fds, nfds, timeout_ms);
-}
+#ifndef POLLERR_SET /* defined for FreeBSD for now */
+# define POLLERR_SET (0)
#endif
-static inline void
-update_timespec(struct timespec *timeout, double limit)
-{
- if (timeout) {
- double d = limit - timeofday();
-
- timeout->tv_sec = (long)d;
- timeout->tv_nsec = (long)((d-(double)timeout->tv_sec)*1e9);
- if (timeout->tv_sec < 0) timeout->tv_sec = 0;
- if (timeout->tv_nsec < 0) timeout->tv_nsec = 0;
- }
-}
-
/*
* returns a mask of events
*/
int
-rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
+rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
{
- struct pollfd fds;
- int result = 0, lerrno;
- double limit = 0;
- struct timespec ts;
- struct timespec *timeout = NULL;
- rb_thread_t *th = GET_THREAD();
+ struct pollfd fds[2];
+ int result = 0;
+ int drained;
+ nfds_t nfds;
+ rb_unblock_function_t *ubf;
+ struct waiting_fd wfd;
+ int state;
+ volatile int lerrno;
-#define poll_update() \
- (update_timespec(timeout, limit), \
- TRUE)
+ wfd.th = GET_THREAD();
+ wfd.fd = fd;
- if (tv) {
- ts.tv_sec = tv->tv_sec;
- ts.tv_nsec = tv->tv_usec * 1000;
- limit = timeofday();
- limit += (double)tv->tv_sec + (double)tv->tv_usec * 1e-6;
- timeout = &ts;
+ RB_VM_LOCK_ENTER();
+ {
+ list_add(&wfd.th->vm->waiting_fds, &wfd.wfd_node);
+ }
+ RB_VM_LOCK_LEAVE();
+
+ EC_PUSH_TAG(wfd.th->ec);
+ if ((state = EC_EXEC_TAG()) == TAG_NONE) {
+ rb_hrtime_t *to, rel, end = 0;
+ RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
+ timeout_prepare(&to, &rel, &end, timeout);
+ fds[0].fd = fd;
+ fds[0].events = (short)events;
+ fds[0].revents = 0;
+ do {
+ fds[1].fd = rb_sigwait_fd_get(wfd.th);
+
+ if (fds[1].fd >= 0) {
+ fds[1].events = POLLIN;
+ fds[1].revents = 0;
+ nfds = 2;
+ ubf = ubf_sigwait;
+ }
+ else {
+ nfds = 1;
+ ubf = ubf_select;
+ }
+
+ lerrno = 0;
+ BLOCKING_REGION(wfd.th, {
+ const rb_hrtime_t *sto;
+ struct timespec ts;
+
+ sto = sigwait_timeout(wfd.th, fds[1].fd, to, &drained);
+ if (!RUBY_VM_INTERRUPTED(wfd.th->ec)) {
+ result = ppoll(fds, nfds, rb_hrtime2timespec(&ts, sto), 0);
+ if (result < 0) lerrno = errno;
+ }
+ }, ubf, wfd.th, TRUE);
+
+ if (fds[1].fd >= 0) {
+ int fd1 = sigwait_signals_fd(result, fds[1].revents, fds[1].fd);
+ (void)check_signals_nogvl(wfd.th, fd1);
+ rb_sigwait_fd_put(wfd.th, fds[1].fd);
+ rb_sigwait_fd_migrate(wfd.th->vm);
+ }
+ RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
+ } while (wait_retryable(&result, lerrno, to, end));
+ }
+ EC_POP_TAG();
+
+ RB_VM_LOCK_ENTER();
+ {
+ list_del(&wfd.wfd_node);
}
+ RB_VM_LOCK_LEAVE();
- fds.fd = fd;
- fds.events = (short)events;
-
- do {
- fds.revents = 0;
- lerrno = 0;
- BLOCKING_REGION({
- result = ppoll(&fds, 1, timeout, NULL);
- if (result < 0) lerrno = errno;
- }, ubf_select, th, FALSE);
+ if (state) {
+ EC_JUMP_TAG(wfd.th->ec, state);
+ }
- RUBY_VM_CHECK_INTS_BLOCKING(th);
- } while (result < 0 && retryable(errno = lerrno) && poll_update());
- if (result < 0) return -1;
+ if (result < 0) {
+ errno = lerrno;
+ return -1;
+ }
- if (fds.revents & POLLNVAL) {
+ if (fds[0].revents & POLLNVAL) {
errno = EBADF;
return -1;
}
@@ -3925,25 +4452,20 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
* Therefore we need to fix it up.
*/
result = 0;
- if (fds.revents & POLLIN_SET)
+ if (fds[0].revents & POLLIN_SET)
result |= RB_WAITFD_IN;
- if (fds.revents & POLLOUT_SET)
+ if (fds[0].revents & POLLOUT_SET)
result |= RB_WAITFD_OUT;
- if (fds.revents & POLLEX_SET)
+ if (fds[0].revents & POLLEX_SET)
result |= RB_WAITFD_PRI;
+ /* all requested events are ready if there is an error */
+ if (fds[0].revents & POLLERR_SET)
+ result |= events;
+
return result;
}
#else /* ! USE_POLL - implement rb_io_poll_fd() using select() */
-static rb_fdset_t *
-init_set_fd(int fd, rb_fdset_t *fds)
-{
- rb_fd_init(fds);
- rb_fd_set(fd, fds);
-
- return fds;
-}
-
struct select_args {
union {
int fd;
@@ -3952,6 +4474,7 @@ struct select_args {
rb_fdset_t *read;
rb_fdset_t *write;
rb_fdset_t *except;
+ struct waiting_fd wfd;
struct timeval *tv;
};
@@ -3982,6 +4505,11 @@ select_single_cleanup(VALUE ptr)
{
struct select_args *args = (struct select_args *)ptr;
+ RB_VM_LOCK_ENTER();
+ {
+ list_del(&args->wfd.wfd_node);
+ }
+ RB_VM_LOCK_LEAVE();
if (args->read) rb_fd_term(args->read);
if (args->write) rb_fd_term(args->write);
if (args->except) rb_fd_term(args->except);
@@ -3990,7 +4518,7 @@ select_single_cleanup(VALUE ptr)
}
int
-rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
+rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
{
rb_fdset_t rfds, wfds, efds;
struct select_args args;
@@ -4001,7 +4529,15 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL;
- args.tv = tv;
+ args.tv = timeout;
+ args.wfd.fd = fd;
+ args.wfd.th = GET_THREAD();
+
+ RB_VM_LOCK_ENTER();
+ {
+ list_add(&args.wfd.th->vm->waiting_fds, &args.wfd.wfd_node);
+ }
+ RB_VM_LOCK_LEAVE();
r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr);
if (r == -1)
@@ -4024,7 +4560,6 @@ rb_gc_set_stack_end(VALUE **stack_end_p)
}
#endif
-
/*
*
*/
@@ -4035,39 +4570,88 @@ rb_threadptr_check_signal(rb_thread_t *mth)
/* mth must be main_thread */
if (rb_signal_buff_size() > 0) {
/* wakeup main thread */
- rb_threadptr_trap_interrupt(mth);
+ threadptr_trap_interrupt(mth);
}
}
static void
-timer_thread_function(void *arg)
+async_bug_fd(const char *mesg, int errno_arg, int fd)
{
- rb_vm_t *vm = GET_VM(); /* TODO: fix me for Multi-VM */
+ char buff[64];
+ size_t n = strlcpy(buff, mesg, sizeof(buff));
+ if (n < sizeof(buff)-3) {
+ ruby_snprintf(buff+n, sizeof(buff)-n, "(%d)", fd);
+ }
+ rb_async_bug_errno(buff, errno_arg);
+}
+
+/* VM-dependent API is not available for this function */
+static int
+consume_communication_pipe(int fd)
+{
+#if USE_EVENTFD
+ uint64_t buff[1];
+#else
+ /* buffer can be shared because no one refers to them. */
+ static char buff[1024];
+#endif
+ ssize_t result;
+ int ret = FALSE; /* for rb_sigwait_sleep */
/*
- * Tricky: thread_destruct_lock doesn't close a race against
- * vm->running_thread switch. however it guarantees th->running_thread
- * point to valid pointer or NULL.
+ * disarm UBF_TIMER before we read, because it can become
+ * re-armed at any time via sighandler and the pipe will refill
+ * We can disarm it because this thread is now processing signals
+ * and we do not want unnecessary SIGVTALRM
*/
- native_mutex_lock(&vm->thread_destruct_lock);
- /* for time slice */
- if (vm->running_thread)
- RUBY_VM_SET_TIMER_INTERRUPT(vm->running_thread);
- native_mutex_unlock(&vm->thread_destruct_lock);
-
- /* check signal */
- rb_threadptr_check_signal(vm->main_thread);
-
-#if 0
- /* prove profiler */
- if (vm->prove_profile.enable) {
- rb_thread_t *th = vm->running_thread;
-
- if (vm->during_gc) {
- /* GC prove profiling */
+ ubf_timer_disarm();
+
+ while (1) {
+ result = read(fd, buff, sizeof(buff));
+ if (result > 0) {
+ ret = TRUE;
+ if (USE_EVENTFD || result < (ssize_t)sizeof(buff)) {
+ return ret;
+ }
}
- }
+ else if (result == 0) {
+ return ret;
+ }
+ else if (result < 0) {
+ int e = errno;
+ switch (e) {
+ case EINTR:
+ continue; /* retry */
+ case EAGAIN:
+#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
#endif
+ return ret;
+ default:
+ async_bug_fd("consume_communication_pipe: read", e, fd);
+ }
+ }
+ }
+}
+
+static int
+check_signals_nogvl(rb_thread_t *th, int sigwait_fd)
+{
+ rb_vm_t *vm = GET_VM(); /* th may be 0 */
+ int ret = sigwait_fd >= 0 ? consume_communication_pipe(sigwait_fd) : FALSE;
+ ubf_wakeup_all_threads();
+ ruby_sigchld_handler(vm);
+ if (rb_signal_buff_size()) {
+ if (th == vm->ractor.main_thread) {
+ /* no need to lock + wakeup if already in main thread */
+ RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
+ }
+ else {
+ threadptr_trap_interrupt(vm->ractor.main_thread);
+ }
+ ret = TRUE; /* for SIGCHLD_LOSSY && rb_sigwait_sleep */
+ }
+ return ret;
}
void
@@ -4091,46 +4675,84 @@ rb_thread_start_timer_thread(void)
rb_thread_create_timer_thread();
}
-#if defined(HAVE_WORKING_FORK)
static int
clear_coverage_i(st_data_t key, st_data_t val, st_data_t dummy)
{
int i;
- VALUE lines = (VALUE)val;
+ VALUE coverage = (VALUE)val;
+ VALUE lines = RARRAY_AREF(coverage, COVERAGE_INDEX_LINES);
+ VALUE branches = RARRAY_AREF(coverage, COVERAGE_INDEX_BRANCHES);
- for (i = 0; i < RARRAY_LEN(lines); i++) {
- if (RARRAY_AREF(lines, i) != Qnil) {
- RARRAY_ASET(lines, i, INT2FIX(0));
+ if (lines) {
+ if (GET_VM()->coverage_mode & COVERAGE_TARGET_ONESHOT_LINES) {
+ rb_ary_clear(lines);
+ }
+ else {
+ int i;
+ for (i = 0; i < RARRAY_LEN(lines); i++) {
+ if (RARRAY_AREF(lines, i) != Qnil)
+ RARRAY_ASET(lines, i, INT2FIX(0));
+ }
+ }
+ }
+ if (branches) {
+ VALUE counters = RARRAY_AREF(branches, 1);
+ for (i = 0; i < RARRAY_LEN(counters); i++) {
+ RARRAY_ASET(counters, i, INT2FIX(0));
}
}
+
return ST_CONTINUE;
}
-static void
-clear_coverage(void)
+void
+rb_clear_coverages(void)
{
VALUE coverages = rb_get_coverages();
if (RTEST(coverages)) {
- st_foreach(rb_hash_tbl_raw(coverages), clear_coverage_i, 0);
+ rb_hash_foreach(coverages, clear_coverage_i, 0);
}
}
+#if defined(HAVE_WORKING_FORK)
+
static void
rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const rb_thread_t *))
{
rb_thread_t *i = 0;
rb_vm_t *vm = th->vm;
- vm->main_thread = th;
-
- gvl_atfork(th->vm);
-
- list_for_each(&vm->living_threads, i, vmlt_node) {
- atfork(i, th);
+ rb_ractor_t *r = th->ractor;
+ vm->ractor.main_ractor = r;
+ vm->ractor.main_thread = th;
+ r->threads.main = th;
+ r->status_ = ractor_created;
+
+ gvl_atfork(rb_ractor_gvl(th->ractor));
+ ubf_list_atfork();
+
+ // OK. Only this thread accesses:
+ list_for_each(&vm->ractor.set, r, vmlr_node) {
+ list_for_each(&r->threads.set, i, lt_node) {
+ atfork(i, th);
+ }
}
rb_vm_living_threads_init(vm);
- rb_vm_living_threads_insert(vm, th);
- vm->sleeper = 0;
- clear_coverage();
+
+ rb_ractor_atfork(vm, th);
+
+ /* may be held by MJIT threads in parent */
+ rb_native_mutex_initialize(&vm->waitpid_lock);
+ rb_native_mutex_initialize(&vm->workqueue_lock);
+
+ /* may be held by any thread in parent */
+ rb_native_mutex_initialize(&th->interrupt_lock);
+
+ vm->fork_gen++;
+ rb_ractor_sleeper_threads_clear(th->ractor);
+ rb_clear_coverages();
+
+ VM_ASSERT(vm->ractor.blocking_cnt == 0);
+ VM_ASSERT(vm->ractor.cnt == 1);
}
static void
@@ -4143,15 +4765,20 @@ terminate_atfork_i(rb_thread_t *th, const rb_thread_t *current_th)
}
}
+void rb_fiber_atfork(rb_thread_t *);
void
rb_thread_atfork(void)
{
rb_thread_t *th = GET_THREAD();
rb_thread_atfork_internal(th, terminate_atfork_i);
th->join_list = NULL;
+ rb_fiber_atfork(th);
/* We don't want reproduce CVE-2003-0900. */
rb_reset_random_seed();
+
+ /* For child, starting MJIT worker thread in this place which is safer than immediately after `after_fork_ruby`. */
+ mjit_child_after_fork();
}
static void
@@ -4193,7 +4820,7 @@ thgroup_memsize(const void *ptr)
static const rb_data_type_t thgroup_data_type = {
"thgroup",
- {NULL, RUBY_TYPED_DEFAULT_FREE, thgroup_memsize,},
+ {0, RUBY_TYPED_DEFAULT_FREE, thgroup_memsize,},
0, 0, RUBY_TYPED_FREE_IMMEDIATELY
};
@@ -4242,11 +4869,11 @@ static VALUE
thgroup_list(VALUE group)
{
VALUE ary = rb_ary_new();
- rb_vm_t *vm = GET_THREAD()->vm;
rb_thread_t *th = 0;
+ rb_ractor_t *r = GET_RACTOR();
- list_for_each(&vm->living_threads, th, vmlt_node) {
- if (th->thgroup == group) {
+ list_for_each(&r->threads.set, th, lt_node) {
+ if (th->thgroup == group) {
rb_ary_push(ary, th->self);
}
}
@@ -4264,8 +4891,8 @@ thgroup_list(VALUE group)
* New threads can still be started in an enclosed ThreadGroup.
*
* ThreadGroup::Default.enclose #=> #<ThreadGroup:0x4029d914>
- * thr = Thread::new { Thread.stop } #=> #<Thread:0x402a7210 sleep>
- * tg = ThreadGroup::new #=> #<ThreadGroup:0x402752d4>
+ * thr = Thread.new { Thread.stop } #=> #<Thread:0x402a7210 sleep>
+ * tg = ThreadGroup.new #=> #<ThreadGroup:0x402752d4>
* tg.add thr
* #=> ThreadError: can't move from the enclosed thread group
*/
@@ -4295,9 +4922,7 @@ thgroup_enclosed_p(VALUE group)
struct thgroup *data;
TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data);
- if (data->enclosed)
- return Qtrue;
- return Qfalse;
+ return RBOOL(data->enclosed);
}
@@ -4330,11 +4955,9 @@ thgroup_enclosed_p(VALUE group)
static VALUE
thgroup_add(VALUE group, VALUE thread)
{
- rb_thread_t *th;
+ rb_thread_t *target_th = rb_thread_ptr(thread);
struct thgroup *data;
- GetThreadPtr(thread, th);
-
if (OBJ_FROZEN(group)) {
rb_raise(rb_eThreadError, "can't move to the frozen thread group");
}
@@ -4343,20 +4966,16 @@ thgroup_add(VALUE group, VALUE thread)
rb_raise(rb_eThreadError, "can't move to the enclosed thread group");
}
- if (!th->thgroup) {
- return Qnil;
- }
-
- if (OBJ_FROZEN(th->thgroup)) {
+ if (OBJ_FROZEN(target_th->thgroup)) {
rb_raise(rb_eThreadError, "can't move from the frozen thread group");
}
- TypedData_Get_Struct(th->thgroup, struct thgroup, &thgroup_data_type, data);
+ TypedData_Get_Struct(target_th->thgroup, struct thgroup, &thgroup_data_type, data);
if (data->enclosed) {
rb_raise(rb_eThreadError,
"can't move from the enclosed thread group");
}
- th->thgroup = group;
+ target_th->thgroup = group;
return group;
}
@@ -4382,16 +5001,22 @@ thread_shield_alloc(VALUE klass)
}
#define GetThreadShieldPtr(obj) ((VALUE)rb_check_typeddata((obj), &thread_shield_data_type))
-#define THREAD_SHIELD_WAITING_MASK (FL_USER0|FL_USER1|FL_USER2|FL_USER3|FL_USER4|FL_USER5|FL_USER6|FL_USER7|FL_USER8|FL_USER9|FL_USER10|FL_USER11|FL_USER12|FL_USER13|FL_USER14|FL_USER15|FL_USER16|FL_USER17|FL_USER18|FL_USER19)
+#define THREAD_SHIELD_WAITING_MASK (((FL_USER19-1)&~(FL_USER0-1))|FL_USER19)
#define THREAD_SHIELD_WAITING_SHIFT (FL_USHIFT)
-#define rb_thread_shield_waiting(b) (int)((RBASIC(b)->flags&THREAD_SHIELD_WAITING_MASK)>>THREAD_SHIELD_WAITING_SHIFT)
+#define THREAD_SHIELD_WAITING_MAX (THREAD_SHIELD_WAITING_MASK>>THREAD_SHIELD_WAITING_SHIFT)
+STATIC_ASSERT(THREAD_SHIELD_WAITING_MAX, THREAD_SHIELD_WAITING_MAX <= UINT_MAX);
+static inline unsigned int
+rb_thread_shield_waiting(VALUE b)
+{
+ return ((RBASIC(b)->flags&THREAD_SHIELD_WAITING_MASK)>>THREAD_SHIELD_WAITING_SHIFT);
+}
static inline void
rb_thread_shield_waiting_inc(VALUE b)
{
unsigned int w = rb_thread_shield_waiting(b);
w++;
- if (w > (unsigned int)(THREAD_SHIELD_WAITING_MASK>>THREAD_SHIELD_WAITING_SHIFT))
+ if (w > THREAD_SHIELD_WAITING_MAX)
rb_raise(rb_eRuntimeError, "waiting count overflow");
RBASIC(b)->flags &= ~THREAD_SHIELD_WAITING_MASK;
RBASIC(b)->flags |= ((VALUE)w << THREAD_SHIELD_WAITING_SHIFT);
@@ -4415,6 +5040,17 @@ rb_thread_shield_new(void)
return thread_shield;
}
+bool
+rb_thread_shield_owned(VALUE self)
+{
+ VALUE mutex = GetThreadShieldPtr(self);
+ if (!mutex) return false;
+
+ rb_mutex_t *m = mutex_ptr(mutex);
+
+ return m->fiber == GET_EC()->fiber_ptr;
+}
+
/*
* Wait a thread shield.
*
@@ -4430,8 +5066,8 @@ rb_thread_shield_wait(VALUE self)
rb_mutex_t *m;
if (!mutex) return Qfalse;
- GetMutexPtr(mutex, m);
- if (m->th == GET_THREAD()) return Qnil;
+ m = mutex_ptr(mutex);
+ if (m->fiber == GET_EC()->fiber_ptr) return Qnil;
rb_thread_shield_waiting_inc(self);
rb_mutex_lock(mutex);
rb_thread_shield_waiting_dec(self);
@@ -4457,7 +5093,7 @@ rb_thread_shield_release(VALUE self)
{
VALUE mutex = thread_shield_get_mutex(self);
rb_mutex_unlock(mutex);
- return rb_thread_shield_waiting(self) > 0 ? Qtrue : Qfalse;
+ return RBOOL(rb_thread_shield_waiting(self) > 0);
}
/*
@@ -4469,19 +5105,19 @@ rb_thread_shield_destroy(VALUE self)
VALUE mutex = thread_shield_get_mutex(self);
DATA_PTR(self) = 0;
rb_mutex_unlock(mutex);
- return rb_thread_shield_waiting(self) > 0 ? Qtrue : Qfalse;
+ return RBOOL(rb_thread_shield_waiting(self) > 0);
}
static VALUE
threadptr_recursive_hash(rb_thread_t *th)
{
- return th->local_storage_recursive_hash;
+ return th->ec->local_storage_recursive_hash;
}
static void
threadptr_recursive_hash_set(rb_thread_t *th, VALUE hash)
{
- th->local_storage_recursive_hash = hash;
+ th->ec->local_storage_recursive_hash = hash;
}
ID rb_frame_last_func(void);
@@ -4507,29 +5143,29 @@ recursive_list_access(VALUE sym)
list = rb_hash_aref(hash, sym);
}
if (NIL_P(list) || !RB_TYPE_P(list, T_HASH)) {
- list = rb_hash_new();
+ list = rb_ident_hash_new();
rb_hash_aset(hash, sym, list);
}
return list;
}
/*
- * Returns Qtrue iff obj_id (or the pair <obj, paired_obj>) is already
+ * Returns Qtrue if and only if obj (or the pair <obj, paired_obj>) is already
* in the recursion list.
* Assumes the recursion list is valid.
*/
static VALUE
-recursive_check(VALUE list, VALUE obj_id, VALUE paired_obj_id)
+recursive_check(VALUE list, VALUE obj, VALUE paired_obj_id)
{
#if SIZEOF_LONG == SIZEOF_VOIDP
#define OBJ_ID_EQL(obj_id, other) ((obj_id) == (other))
#elif SIZEOF_LONG_LONG == SIZEOF_VOIDP
- #define OBJ_ID_EQL(obj_id, other) (RB_TYPE_P((obj_id), T_BIGNUM) ? \
+ #define OBJ_ID_EQL(obj_id, other) (RB_BIGNUM_TYPE_P((obj_id)) ? \
rb_big_eql((obj_id), (other)) : ((obj_id) == (other)))
#endif
- VALUE pair_list = rb_hash_lookup2(list, obj_id, Qundef);
+ VALUE pair_list = rb_hash_lookup2(list, obj, Qundef);
if (pair_list == Qundef)
return Qfalse;
if (paired_obj_id) {
@@ -4546,10 +5182,10 @@ recursive_check(VALUE list, VALUE obj_id, VALUE paired_obj_id)
}
/*
- * Pushes obj_id (or the pair <obj_id, paired_obj_id>) in the recursion list.
- * For a single obj_id, it sets list[obj_id] to Qtrue.
- * For a pair, it sets list[obj_id] to paired_obj_id if possible,
- * otherwise list[obj_id] becomes a hash like:
+ * Pushes obj (or the pair <obj, paired_obj>) in the recursion list.
+ * For a single obj, it sets list[obj] to Qtrue.
+ * For a pair, it sets list[obj] to paired_obj_id if possible,
+ * otherwise list[obj] becomes a hash like:
* {paired_obj_id_1 => true, paired_obj_id_2 => true, ... }
* Assumes the recursion list is valid.
*/
@@ -4577,10 +5213,10 @@ recursive_push(VALUE list, VALUE obj, VALUE paired_obj)
}
/*
- * Pops obj_id (or the pair <obj_id, paired_obj_id>) from the recursion list.
- * For a pair, if list[obj_id] is a hash, then paired_obj_id is
+ * Pops obj (or the pair <obj, paired_obj>) from the recursion list.
+ * For a pair, if list[obj] is a hash, then paired_obj_id is
* removed from the hash and no attempt is made to simplify
- * list[obj_id] from {only_one_paired_id => true} to only_one_paired_id
+ * list[obj] from {only_one_paired_id => true} to only_one_paired_id
* Assumes the recursion list is valid.
*/
@@ -4607,7 +5243,6 @@ struct exec_recursive_params {
VALUE (*func) (VALUE, VALUE, int);
VALUE list;
VALUE obj;
- VALUE objid;
VALUE pairid;
VALUE arg;
};
@@ -4639,54 +5274,56 @@ exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE pairid, VALUE
struct exec_recursive_params p;
int outermost;
p.list = recursive_list_access(sym);
- p.objid = rb_obj_id(obj);
p.obj = obj;
p.pairid = pairid;
p.arg = arg;
outermost = outer && !recursive_check(p.list, ID2SYM(recursive_key), 0);
- if (recursive_check(p.list, p.objid, pairid)) {
+ if (recursive_check(p.list, p.obj, pairid)) {
if (outer && !outermost) {
rb_throw_obj(p.list, p.list);
}
return (*func)(obj, arg, TRUE);
}
else {
- int state;
+ enum ruby_tag_type state;
p.func = func;
if (outermost) {
recursive_push(p.list, ID2SYM(recursive_key), 0);
- recursive_push(p.list, p.objid, p.pairid);
+ recursive_push(p.list, p.obj, p.pairid);
result = rb_catch_protect(p.list, exec_recursive_i, (VALUE)&p, &state);
- if (!recursive_pop(p.list, p.objid, p.pairid)) goto invalid;
+ if (!recursive_pop(p.list, p.obj, p.pairid)) goto invalid;
if (!recursive_pop(p.list, ID2SYM(recursive_key), 0)) goto invalid;
- if (state) JUMP_TAG(state);
+ if (state != TAG_NONE) EC_JUMP_TAG(GET_EC(), state);
if (result == p.list) {
result = (*func)(obj, arg, TRUE);
}
}
else {
volatile VALUE ret = Qundef;
- recursive_push(p.list, p.objid, p.pairid);
- PUSH_TAG();
- if ((state = EXEC_TAG()) == 0) {
+ recursive_push(p.list, p.obj, p.pairid);
+ EC_PUSH_TAG(GET_EC());
+ if ((state = EC_EXEC_TAG()) == TAG_NONE) {
ret = (*func)(obj, arg, FALSE);
}
- POP_TAG();
- if (!recursive_pop(p.list, p.objid, p.pairid)) {
- invalid:
- rb_raise(rb_eTypeError, "invalid inspect_tbl pair_list "
- "for %+"PRIsVALUE" in %+"PRIsVALUE,
- sym, rb_thread_current());
+ EC_POP_TAG();
+ if (!recursive_pop(p.list, p.obj, p.pairid)) {
+ goto invalid;
}
- if (state) JUMP_TAG(state);
+ if (state != TAG_NONE) EC_JUMP_TAG(GET_EC(), state);
result = ret;
}
}
*(volatile struct exec_recursive_params *)&p;
return result;
+
+ invalid:
+ rb_raise(rb_eTypeError, "invalid inspect_tbl pair_list "
+ "for %+"PRIsVALUE" in %+"PRIsVALUE,
+ sym, rb_thread_current());
+ UNREACHABLE_RETURN(Qundef);
}
/*
@@ -4708,7 +5345,7 @@ rb_exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg)
VALUE
rb_exec_recursive_paired(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE paired_obj, VALUE arg)
{
- return exec_recursive(func, obj, rb_obj_id(paired_obj), arg, 0);
+ return exec_recursive(func, obj, rb_memory_id(paired_obj), arg, 0);
}
/*
@@ -4732,12 +5369,12 @@ rb_exec_recursive_outer(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg)
VALUE
rb_exec_recursive_paired_outer(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE paired_obj, VALUE arg)
{
- return exec_recursive(func, obj, rb_obj_id(paired_obj), arg, 1);
+ return exec_recursive(func, obj, rb_memory_id(paired_obj), arg, 1);
}
/*
* call-seq:
- * thread.backtrace -> array
+ * thread.backtrace -> array or nil
*
* Returns the current backtrace of the target thread.
*
@@ -4766,6 +5403,16 @@ rb_thread_backtrace_locations_m(int argc, VALUE *argv, VALUE thval)
return rb_vm_thread_backtrace_locations(argc, argv, thval);
}
+void
+Init_Thread_Mutex(void)
+{
+ rb_thread_t *th = GET_THREAD();
+
+ rb_native_mutex_initialize(&th->vm->waitpid_lock);
+ rb_native_mutex_initialize(&th->vm->workqueue_lock);
+ rb_native_mutex_initialize(&th->interrupt_lock);
+}
+
/*
* Document-class: ThreadError
*
@@ -4784,31 +5431,29 @@ rb_thread_backtrace_locations_m(int argc, VALUE *argv, VALUE thval)
void
Init_Thread(void)
{
-#undef rb_intern
-#define rb_intern(str) rb_intern_const(str)
-
VALUE cThGroup;
rb_thread_t *th = GET_THREAD();
- sym_never = ID2SYM(rb_intern("never"));
- sym_immediate = ID2SYM(rb_intern("immediate"));
- sym_on_blocking = ID2SYM(rb_intern("on_blocking"));
- id_locals = rb_intern("locals");
+ sym_never = ID2SYM(rb_intern_const("never"));
+ sym_immediate = ID2SYM(rb_intern_const("immediate"));
+ sym_on_blocking = ID2SYM(rb_intern_const("on_blocking"));
rb_define_singleton_method(rb_cThread, "new", thread_s_new, -1);
rb_define_singleton_method(rb_cThread, "start", thread_start, -2);
rb_define_singleton_method(rb_cThread, "fork", thread_start, -2);
rb_define_singleton_method(rb_cThread, "main", rb_thread_s_main, 0);
rb_define_singleton_method(rb_cThread, "current", thread_s_current, 0);
- rb_define_singleton_method(rb_cThread, "stop", rb_thread_stop, 0);
+ rb_define_singleton_method(rb_cThread, "stop", thread_stop, 0);
rb_define_singleton_method(rb_cThread, "kill", rb_thread_s_kill, 1);
rb_define_singleton_method(rb_cThread, "exit", rb_thread_exit, 0);
rb_define_singleton_method(rb_cThread, "pass", thread_s_pass, 0);
- rb_define_singleton_method(rb_cThread, "list", rb_thread_list, 0);
+ rb_define_singleton_method(rb_cThread, "list", thread_list, 0);
rb_define_singleton_method(rb_cThread, "abort_on_exception", rb_thread_s_abort_exc, 0);
rb_define_singleton_method(rb_cThread, "abort_on_exception=", rb_thread_s_abort_exc_set, 1);
rb_define_singleton_method(rb_cThread, "report_on_exception", rb_thread_s_report_exc, 0);
rb_define_singleton_method(rb_cThread, "report_on_exception=", rb_thread_s_report_exc_set, 1);
+ rb_define_singleton_method(rb_cThread, "ignore_deadlock", rb_thread_s_ignore_deadlock, 0);
+ rb_define_singleton_method(rb_cThread, "ignore_deadlock=", rb_thread_s_ignore_deadlock_set, 1);
#if THREAD_DEBUG < 0
rb_define_singleton_method(rb_cThread, "DEBUG", rb_thread_s_debug, 0);
rb_define_singleton_method(rb_cThread, "DEBUG=", rb_thread_s_debug_set, 1);
@@ -4828,6 +5473,7 @@ Init_Thread(void)
rb_define_method(rb_cThread, "wakeup", rb_thread_wakeup, 0);
rb_define_method(rb_cThread, "[]", rb_thread_aref, 1);
rb_define_method(rb_cThread, "[]=", rb_thread_aset, 2);
+ rb_define_method(rb_cThread, "fetch", rb_thread_fetch, -1);
rb_define_method(rb_cThread, "key?", rb_thread_key_p, 1);
rb_define_method(rb_cThread, "keys", rb_thread_keys, 0);
rb_define_method(rb_cThread, "priority", rb_thread_priority, 0);
@@ -4843,16 +5489,18 @@ Init_Thread(void)
rb_define_method(rb_cThread, "abort_on_exception=", rb_thread_abort_exc_set, 1);
rb_define_method(rb_cThread, "report_on_exception", rb_thread_report_exc, 0);
rb_define_method(rb_cThread, "report_on_exception=", rb_thread_report_exc_set, 1);
- rb_define_method(rb_cThread, "safe_level", rb_thread_safe_level, 0);
rb_define_method(rb_cThread, "group", rb_thread_group, 0);
rb_define_method(rb_cThread, "backtrace", rb_thread_backtrace_m, -1);
rb_define_method(rb_cThread, "backtrace_locations", rb_thread_backtrace_locations_m, -1);
rb_define_method(rb_cThread, "name", rb_thread_getname, 0);
rb_define_method(rb_cThread, "name=", rb_thread_setname, 1);
- rb_define_method(rb_cThread, "inspect", rb_thread_inspect, 0);
+ rb_define_method(rb_cThread, "native_thread_id", rb_thread_native_thread_id, 0);
+ rb_define_method(rb_cThread, "to_s", rb_thread_to_s, 0);
+ rb_define_alias(rb_cThread, "inspect", "to_s");
- rb_vm_register_special_exception(ruby_error_stream_closed, rb_eIOError, "stream closed");
+ rb_vm_register_special_exception(ruby_error_stream_closed, rb_eIOError,
+ "stream closed in another thread");
cThGroup = rb_define_class("ThreadGroup", rb_cObject);
rb_define_alloc_func(cThGroup, thgroup_s_alloc);
@@ -4862,11 +5510,10 @@ Init_Thread(void)
rb_define_method(cThGroup, "add", thgroup_add, 1);
{
- th->thgroup = th->vm->thgroup_default = rb_obj_alloc(cThGroup);
+ th->thgroup = th->ractor->thgroup_default = rb_obj_alloc(cThGroup);
rb_define_const(cThGroup, "Default", th->thgroup);
}
- recursive_key = rb_intern("__recursive_key__");
rb_eThreadError = rb_define_class("ThreadError", rb_eStandardError);
/* init thread core */
@@ -4874,26 +5521,17 @@ Init_Thread(void)
/* main thread setting */
{
/* acquire global vm lock */
- gvl_init(th->vm);
- gvl_acquire(th->vm, th);
- native_mutex_initialize(&th->vm->thread_destruct_lock);
- native_mutex_initialize(&th->interrupt_lock);
- native_cond_initialize(&th->interrupt_cond,
- RB_CONDATTR_CLOCK_MONOTONIC);
+ rb_global_vm_lock_t *gvl = rb_ractor_gvl(th->ractor);
+ gvl_acquire(gvl, th);
th->pending_interrupt_queue = rb_ary_tmp_new(0);
th->pending_interrupt_queue_checked = 0;
th->pending_interrupt_mask_stack = rb_ary_tmp_new(0);
-
- th->interrupt_mask = 0;
}
}
rb_thread_create_timer_thread();
- /* suppress warnings on cygwin, mingw and mswin.*/
- (void)native_mutex_trylock;
-
Init_thread_sync();
}
@@ -4905,135 +5543,323 @@ ruby_native_thread_p(void)
return th != 0;
}
-VALUE rb_vm_backtrace_str_ary(rb_thread_t *th, long lev, long n);
static void
-debug_deadlock_check(rb_vm_t *vm, VALUE msg)
+debug_deadlock_check(rb_ractor_t *r, VALUE msg)
{
rb_thread_t *th = 0;
VALUE sep = rb_str_new_cstr("\n ");
rb_str_catf(msg, "\n%d threads, %d sleeps current:%p main thread:%p\n",
- vm_living_thread_num(vm), vm->sleeper, GET_THREAD(), vm->main_thread);
- list_for_each(&vm->living_threads, th, vmlt_node) {
- rb_str_catf(msg, "* %+"PRIsVALUE"\n rb_thread_t:%p "
- "native:%"PRI_THREAD_ID" int:%u",
- th->self, th, thread_id_str(th), th->interrupt_flag);
- if (th->locking_mutex) {
- rb_mutex_t *mutex;
- struct rb_thread_struct volatile *mth;
- int waiting;
- GetMutexPtr(th->locking_mutex, mutex);
-
- native_mutex_lock(&mutex->lock);
- mth = mutex->th;
- waiting = mutex->cond_waiting;
- native_mutex_unlock(&mutex->lock);
- rb_str_catf(msg, " mutex:%p cond:%d", mth, waiting);
- }
- {
- rb_thread_list_t *list = th->join_list;
- while (list) {
- rb_str_catf(msg, "\n depended by: tb_thread_id:%p", list->th);
- list = list->next;
- }
- }
- rb_str_catf(msg, "\n ");
- rb_str_concat(msg, rb_ary_join(rb_vm_backtrace_str_ary(th, 0, 0), sep));
- rb_str_catf(msg, "\n");
+ rb_ractor_living_thread_num(r), rb_ractor_sleeper_thread_num(r),
+ (void *)GET_THREAD(), (void *)r->threads.main);
+
+ list_for_each(&r->threads.set, th, lt_node) {
+ rb_str_catf(msg, "* %+"PRIsVALUE"\n rb_thread_t:%p "
+ "native:%"PRI_THREAD_ID" int:%u",
+ th->self, (void *)th, thread_id_str(th), th->ec->interrupt_flag);
+
+ if (th->locking_mutex) {
+ rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
+ rb_str_catf(msg, " mutex:%p cond:%"PRIuSIZE,
+ (void *)mutex->fiber, rb_mutex_num_waiting(mutex));
+ }
+
+ {
+ struct rb_waiting_list *list = th->join_list;
+ while (list) {
+ rb_str_catf(msg, "\n depended by: tb_thread_id:%p", (void *)list->thread);
+ list = list->next;
+ }
+ }
+ rb_str_catf(msg, "\n ");
+ rb_str_concat(msg, rb_ary_join(rb_ec_backtrace_str_ary(th->ec, 0, 0), sep));
+ rb_str_catf(msg, "\n");
}
}
static void
-rb_check_deadlock(rb_vm_t *vm)
+rb_check_deadlock(rb_ractor_t *r)
{
+ if (GET_THREAD()->vm->thread_ignore_deadlock) return;
+
int found = 0;
- rb_thread_t *th = 0;
+ rb_thread_t *th = NULL;
+ int sleeper_num = rb_ractor_sleeper_thread_num(r);
+ int ltnum = rb_ractor_living_thread_num(r);
- if (vm_living_thread_num(vm) > vm->sleeper) return;
- if (vm_living_thread_num(vm) < vm->sleeper) rb_bug("sleeper must not be more than vm_living_thread_num(vm)");
+ if (ltnum > sleeper_num) return;
+ if (ltnum < sleeper_num) rb_bug("sleeper must not be more than vm_living_thread_num(vm)");
if (patrol_thread && patrol_thread != GET_THREAD()) return;
- list_for_each(&vm->living_threads, th, vmlt_node) {
- if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th)) {
- found = 1;
- }
- else if (th->locking_mutex) {
- rb_mutex_t *mutex;
- GetMutexPtr(th->locking_mutex, mutex);
-
- native_mutex_lock(&mutex->lock);
- if (mutex->th == th || (!mutex->th && mutex->cond_waiting)) {
- found = 1;
- }
- native_mutex_unlock(&mutex->lock);
- }
- if (found)
- break;
+ list_for_each(&r->threads.set, th, lt_node) {
+ if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th->ec)) {
+ found = 1;
+ }
+ else if (th->locking_mutex) {
+ rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
+ if (mutex->fiber == th->ec->fiber_ptr || (!mutex->fiber && !list_empty(&mutex->waitq))) {
+ found = 1;
+ }
+ }
+ if (found)
+ break;
}
if (!found) {
VALUE argv[2];
argv[0] = rb_eFatal;
argv[1] = rb_str_new2("No live threads left. Deadlock?");
- debug_deadlock_check(vm, argv[1]);
- vm->sleeper--;
- rb_threadptr_raise(vm->main_thread, 2, argv);
+ debug_deadlock_check(r, argv[1]);
+ rb_ractor_sleeper_threads_dec(GET_RACTOR());
+ rb_threadptr_raise(r->threads.main, 2, argv);
}
}
static void
-update_coverage(rb_event_flag_t event, VALUE proc, VALUE self, ID id, VALUE klass)
+update_line_coverage(VALUE data, const rb_trace_arg_t *trace_arg)
{
- VALUE coverage = rb_iseq_coverage(GET_THREAD()->cfp->iseq);
+ const rb_control_frame_t *cfp = GET_EC()->cfp;
+ VALUE coverage = rb_iseq_coverage(cfp->iseq);
if (RB_TYPE_P(coverage, T_ARRAY) && !RBASIC_CLASS(coverage)) {
- long line = rb_sourceline() - 1;
- long count;
- VALUE num;
- if (line >= RARRAY_LEN(coverage)) { /* no longer tracked */
- return;
+ VALUE lines = RARRAY_AREF(coverage, COVERAGE_INDEX_LINES);
+ if (lines) {
+ long line = rb_sourceline() - 1;
+ long count;
+ VALUE num;
+ void rb_iseq_clear_event_flags(const rb_iseq_t *iseq, size_t pos, rb_event_flag_t reset);
+ if (GET_VM()->coverage_mode & COVERAGE_TARGET_ONESHOT_LINES) {
+ rb_iseq_clear_event_flags(cfp->iseq, cfp->pc - cfp->iseq->body->iseq_encoded - 1, RUBY_EVENT_COVERAGE_LINE);
+ rb_ary_push(lines, LONG2FIX(line + 1));
+ return;
+ }
+ if (line >= RARRAY_LEN(lines)) { /* no longer tracked */
+ return;
+ }
+ num = RARRAY_AREF(lines, line);
+ if (!FIXNUM_P(num)) return;
+ count = FIX2LONG(num) + 1;
+ if (POSFIXABLE(count)) {
+ RARRAY_ASET(lines, line, LONG2FIX(count));
+ }
}
- num = RARRAY_AREF(coverage, line);
- if (!FIXNUM_P(num)) return;
- count = FIX2LONG(num) + 1;
- if (POSFIXABLE(count)) {
- RARRAY_ASET(coverage, line, LONG2FIX(count));
+ }
+}
+
+static void
+update_branch_coverage(VALUE data, const rb_trace_arg_t *trace_arg)
+{
+ const rb_control_frame_t *cfp = GET_EC()->cfp;
+ VALUE coverage = rb_iseq_coverage(cfp->iseq);
+ if (RB_TYPE_P(coverage, T_ARRAY) && !RBASIC_CLASS(coverage)) {
+ VALUE branches = RARRAY_AREF(coverage, COVERAGE_INDEX_BRANCHES);
+ if (branches) {
+ long pc = cfp->pc - cfp->iseq->body->iseq_encoded - 1;
+ long idx = FIX2INT(RARRAY_AREF(ISEQ_PC2BRANCHINDEX(cfp->iseq), pc)), count;
+ VALUE counters = RARRAY_AREF(branches, 1);
+ VALUE num = RARRAY_AREF(counters, idx);
+ count = FIX2LONG(num) + 1;
+ if (POSFIXABLE(count)) {
+ RARRAY_ASET(counters, idx, LONG2FIX(count));
+ }
}
}
}
+const rb_method_entry_t *
+rb_resolve_me_location(const rb_method_entry_t *me, VALUE resolved_location[5])
+{
+ VALUE path, beg_pos_lineno, beg_pos_column, end_pos_lineno, end_pos_column;
+
+ if (!me->def) return NULL; // negative cme
+
+ retry:
+ switch (me->def->type) {
+ case VM_METHOD_TYPE_ISEQ: {
+ const rb_iseq_t *iseq = me->def->body.iseq.iseqptr;
+ rb_iseq_location_t *loc = &iseq->body->location;
+ path = rb_iseq_path(iseq);
+ beg_pos_lineno = INT2FIX(loc->code_location.beg_pos.lineno);
+ beg_pos_column = INT2FIX(loc->code_location.beg_pos.column);
+ end_pos_lineno = INT2FIX(loc->code_location.end_pos.lineno);
+ end_pos_column = INT2FIX(loc->code_location.end_pos.column);
+ break;
+ }
+ case VM_METHOD_TYPE_BMETHOD: {
+ const rb_iseq_t *iseq = rb_proc_get_iseq(me->def->body.bmethod.proc, 0);
+ if (iseq) {
+ rb_iseq_location_t *loc;
+ rb_iseq_check(iseq);
+ path = rb_iseq_path(iseq);
+ loc = &iseq->body->location;
+ beg_pos_lineno = INT2FIX(loc->code_location.beg_pos.lineno);
+ beg_pos_column = INT2FIX(loc->code_location.beg_pos.column);
+ end_pos_lineno = INT2FIX(loc->code_location.end_pos.lineno);
+ end_pos_column = INT2FIX(loc->code_location.end_pos.column);
+ break;
+ }
+ return NULL;
+ }
+ case VM_METHOD_TYPE_ALIAS:
+ me = me->def->body.alias.original_me;
+ goto retry;
+ case VM_METHOD_TYPE_REFINED:
+ me = me->def->body.refined.orig_me;
+ if (!me) return NULL;
+ goto retry;
+ default:
+ return NULL;
+ }
+
+ /* found */
+ if (RB_TYPE_P(path, T_ARRAY)) {
+ path = rb_ary_entry(path, 1);
+ if (!RB_TYPE_P(path, T_STRING)) return NULL; /* just for the case... */
+ }
+ if (resolved_location) {
+ resolved_location[0] = path;
+ resolved_location[1] = beg_pos_lineno;
+ resolved_location[2] = beg_pos_column;
+ resolved_location[3] = end_pos_lineno;
+ resolved_location[4] = end_pos_column;
+ }
+ return me;
+}
+
+static void
+update_method_coverage(VALUE me2counter, rb_trace_arg_t *trace_arg)
+{
+ const rb_control_frame_t *cfp = GET_EC()->cfp;
+ const rb_callable_method_entry_t *cme = rb_vm_frame_method_entry(cfp);
+ const rb_method_entry_t *me = (const rb_method_entry_t *)cme;
+ VALUE rcount;
+ long count;
+
+ me = rb_resolve_me_location(me, 0);
+ if (!me) return;
+
+ rcount = rb_hash_aref(me2counter, (VALUE) me);
+ count = FIXNUM_P(rcount) ? FIX2LONG(rcount) + 1 : 1;
+ if (POSFIXABLE(count)) {
+ rb_hash_aset(me2counter, (VALUE) me, LONG2FIX(count));
+ }
+}
+
VALUE
rb_get_coverages(void)
{
return GET_VM()->coverages;
}
+int
+rb_get_coverage_mode(void)
+{
+ return GET_VM()->coverage_mode;
+}
+
void
-rb_set_coverages(VALUE coverages)
+rb_set_coverages(VALUE coverages, int mode, VALUE me2counter)
{
GET_VM()->coverages = coverages;
- rb_add_event_hook(update_coverage, RUBY_EVENT_COVERAGE, Qnil);
+ GET_VM()->me2counter = me2counter;
+ GET_VM()->coverage_mode = mode;
}
-/* Make coverage arrays empty so old covered files are no longer tracked. */
-static int
-reset_coverage_i(st_data_t key, st_data_t val, st_data_t dummy)
+void
+rb_resume_coverages(void)
{
- VALUE coverage = (VALUE)val;
- rb_ary_clear(coverage);
- return ST_CONTINUE;
+ int mode = GET_VM()->coverage_mode;
+ VALUE me2counter = GET_VM()->me2counter;
+ rb_add_event_hook2((rb_event_hook_func_t) update_line_coverage, RUBY_EVENT_COVERAGE_LINE, Qnil, RUBY_EVENT_HOOK_FLAG_SAFE | RUBY_EVENT_HOOK_FLAG_RAW_ARG);
+ if (mode & COVERAGE_TARGET_BRANCHES) {
+ rb_add_event_hook2((rb_event_hook_func_t) update_branch_coverage, RUBY_EVENT_COVERAGE_BRANCH, Qnil, RUBY_EVENT_HOOK_FLAG_SAFE | RUBY_EVENT_HOOK_FLAG_RAW_ARG);
+ }
+ if (mode & COVERAGE_TARGET_METHODS) {
+ rb_add_event_hook2((rb_event_hook_func_t) update_method_coverage, RUBY_EVENT_CALL, me2counter, RUBY_EVENT_HOOK_FLAG_SAFE | RUBY_EVENT_HOOK_FLAG_RAW_ARG);
+ }
}
void
+rb_suspend_coverages(void)
+{
+ rb_remove_event_hook((rb_event_hook_func_t) update_line_coverage);
+ if (GET_VM()->coverage_mode & COVERAGE_TARGET_BRANCHES) {
+ rb_remove_event_hook((rb_event_hook_func_t) update_branch_coverage);
+ }
+ if (GET_VM()->coverage_mode & COVERAGE_TARGET_METHODS) {
+ rb_remove_event_hook((rb_event_hook_func_t) update_method_coverage);
+ }
+}
+
+/* Make coverage arrays empty so old covered files are no longer tracked. */
+void
rb_reset_coverages(void)
{
- VALUE coverages = rb_get_coverages();
- st_foreach(rb_hash_tbl_raw(coverages), reset_coverage_i, 0);
+ rb_clear_coverages();
+ rb_iseq_remove_coverage_all();
GET_VM()->coverages = Qfalse;
- rb_remove_event_hook(update_coverage);
}
VALUE
-rb_uninterruptible(VALUE (*b_proc)(ANYARGS), VALUE data)
+rb_default_coverage(int n)
+{
+ VALUE coverage = rb_ary_tmp_new_fill(3);
+ VALUE lines = Qfalse, branches = Qfalse;
+ int mode = GET_VM()->coverage_mode;
+
+ if (mode & COVERAGE_TARGET_LINES) {
+ lines = n > 0 ? rb_ary_tmp_new_fill(n) : rb_ary_tmp_new(0);
+ }
+ RARRAY_ASET(coverage, COVERAGE_INDEX_LINES, lines);
+
+ if (mode & COVERAGE_TARGET_BRANCHES) {
+ branches = rb_ary_tmp_new_fill(2);
+ /* internal data structures for branch coverage:
+ *
+ * { branch base node =>
+ * [base_type, base_first_lineno, base_first_column, base_last_lineno, base_last_column, {
+ * branch target id =>
+ * [target_type, target_first_lineno, target_first_column, target_last_lineno, target_last_column, target_counter_index],
+ * ...
+ * }],
+ * ...
+ * }
+ *
+ * Example:
+ * { NODE_CASE =>
+ * [1, 0, 4, 3, {
+ * NODE_WHEN => [2, 8, 2, 9, 0],
+ * NODE_WHEN => [3, 8, 3, 9, 1],
+ * ...
+ * }],
+ * ...
+ * }
+ */
+ VALUE structure = rb_hash_new();
+ rb_obj_hide(structure);
+ RARRAY_ASET(branches, 0, structure);
+ /* branch execution counters */
+ RARRAY_ASET(branches, 1, rb_ary_tmp_new(0));
+ }
+ RARRAY_ASET(coverage, COVERAGE_INDEX_BRANCHES, branches);
+
+ return coverage;
+}
+
+static VALUE
+uninterruptible_exit(VALUE v)
+{
+ rb_thread_t *cur_th = GET_THREAD();
+ rb_ary_pop(cur_th->pending_interrupt_mask_stack);
+
+ cur_th->pending_interrupt_queue_checked = 0;
+ if (!rb_threadptr_pending_interrupt_empty_p(cur_th)) {
+ RUBY_VM_SET_INTERRUPT(cur_th->ec);
+ }
+ return Qnil;
+}
+
+VALUE
+rb_uninterruptible(VALUE (*b_proc)(VALUE), VALUE data)
{
VALUE interrupt_mask = rb_ident_hash_new();
rb_thread_t *cur_th = GET_THREAD();
@@ -5042,28 +5868,8 @@ rb_uninterruptible(VALUE (*b_proc)(ANYARGS), VALUE data)
OBJ_FREEZE_RAW(interrupt_mask);
rb_ary_push(cur_th->pending_interrupt_mask_stack, interrupt_mask);
- return rb_ensure(b_proc, data, rb_ary_pop, cur_th->pending_interrupt_mask_stack);
-}
+ VALUE ret = rb_ensure(b_proc, data, uninterruptible_exit, Qnil);
-void
-ruby_kill(rb_pid_t pid, int sig)
-{
- int err;
- rb_thread_t *th = GET_THREAD();
-
- /*
- * When target pid is self, many caller assume signal will be
- * delivered immediately and synchronously.
- */
- {
- GVL_UNLOCK_BEGIN();
- native_mutex_lock(&th->interrupt_lock);
- err = kill(pid, sig);
- native_cond_wait(&th->interrupt_cond, &th->interrupt_lock);
- native_mutex_unlock(&th->interrupt_lock);
- GVL_UNLOCK_END();
- }
- if (err < 0) {
- rb_sys_fail(0);
- }
+ RUBY_VM_CHECK_INTS(cur_th->ec);
+ return ret;
}