Diffstat (limited to 'thread.c')
-rw-r--r--  thread.c  | 2551
1 file changed, 1285 insertions(+), 1266 deletions(-)
diff --git a/thread.c b/thread.c
index 1511977d4d..196f9092b4 100644
--- a/thread.c
+++ b/thread.c
@@ -70,13 +70,15 @@
# include <alloca.h>
#endif
+#define TH_SCHED(th) (&(th)->ractor->threads.sched)
+
#include "eval_intern.h"
-#include "gc.h"
#include "hrtime.h"
#include "internal.h"
#include "internal/class.h"
#include "internal/cont.h"
#include "internal/error.h"
+#include "internal/gc.h"
#include "internal/hash.h"
#include "internal/io.h"
#include "internal/object.h"
@@ -87,7 +89,7 @@
#include "internal/time.h"
#include "internal/warnings.h"
#include "iseq.h"
-#include "mjit.h"
+#include "rjit.h"
#include "ruby/debug.h"
#include "ruby/io.h"
#include "ruby/thread.h"
@@ -98,27 +100,22 @@
#include "vm_debug.h"
#include "vm_sync.h"
+#if USE_RJIT && defined(HAVE_SYS_WAIT_H)
+#include <sys/wait.h>
+#endif
+
#ifndef USE_NATIVE_THREAD_PRIORITY
#define USE_NATIVE_THREAD_PRIORITY 0
#define RUBY_THREAD_PRIORITY_MAX 3
#define RUBY_THREAD_PRIORITY_MIN -3
#endif
-#ifndef THREAD_DEBUG
-#define THREAD_DEBUG 0
-#endif
-
static VALUE rb_cThreadShield;
static VALUE sym_immediate;
static VALUE sym_on_blocking;
static VALUE sym_never;
-enum SLEEP_FLAGS {
- SLEEP_DEADLOCKABLE = 0x1,
- SLEEP_SPURIOUS_CHECK = 0x2
-};
-
#define THREAD_LOCAL_STORAGE_INITIALISED FL_USER13
#define THREAD_LOCAL_STORAGE_INITIALISED_P(th) RB_FL_TEST_RAW((th), THREAD_LOCAL_STORAGE_INITIALISED)
@@ -132,27 +129,33 @@ rb_thread_local_storage(VALUE thread)
return rb_ivar_get(thread, idLocals);
}
-static void sleep_hrtime(rb_thread_t *, rb_hrtime_t, unsigned int fl);
+enum SLEEP_FLAGS {
+ SLEEP_DEADLOCKABLE = 0x01,
+ SLEEP_SPURIOUS_CHECK = 0x02,
+ SLEEP_ALLOW_SPURIOUS = 0x04,
+ SLEEP_NO_CHECKINTS = 0x08,
+};
+
static void sleep_forever(rb_thread_t *th, unsigned int fl);
-static void rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker);
+static int sleep_hrtime(rb_thread_t *, rb_hrtime_t, unsigned int fl);
+
+static void rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker, VALUE timeout, rb_hrtime_t end);
static int rb_threadptr_dead(rb_thread_t *th);
static void rb_check_deadlock(rb_ractor_t *r);
static int rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th);
static const char *thread_status_name(rb_thread_t *th, int detail);
static int hrtime_update_expire(rb_hrtime_t *, const rb_hrtime_t);
NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
-static int consume_communication_pipe(int fd);
-static int check_signals_nogvl(rb_thread_t *, int sigwait_fd);
-void rb_sigwait_fd_migrate(rb_vm_t *); /* process.c */
+MAYBE_UNUSED(static int consume_communication_pipe(int fd));
-#define eKillSignal INT2FIX(0)
-#define eTerminateSignal INT2FIX(1)
static volatile int system_working = 1;
+static rb_internal_thread_specific_key_t specific_key_count;
struct waiting_fd {
- struct list_node wfd_node; /* <=> vm.waiting_fds */
+ struct ccan_list_node wfd_node; /* <=> vm.waiting_fds */
rb_thread_t *th;
int fd;
+ struct rb_io_close_wait_list *busy;
};
/********************************************************************************/
@@ -167,15 +170,16 @@ static int unblock_function_set(rb_thread_t *th, rb_unblock_function_t *func, vo
static void unblock_function_clear(rb_thread_t *th);
static inline int blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region,
- rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted);
+ rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted);
static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region);
-#define GVL_UNLOCK_BEGIN(th) do { \
- RB_GC_SAVE_MACHINE_CONTEXT(th); \
- gvl_release(rb_ractor_gvl(th->ractor));
+#define THREAD_BLOCKING_BEGIN(th) do { \
+ struct rb_thread_sched * const sched = TH_SCHED(th); \
+ RB_VM_SAVE_MACHINE_CONTEXT(th); \
+ thread_sched_to_waiting((sched), (th));
-#define GVL_UNLOCK_END(th) \
- gvl_acquire(rb_ractor_gvl(th->ractor), th); \
+#define THREAD_BLOCKING_END(th) \
+ thread_sched_to_running((sched), (th)); \
rb_ractor_thread_switch(th->ractor, th); \
} while(0)
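
The renamed pair hands the core to the per-ractor scheduler around a blocking wait. A minimal usage sketch follows; it is illustrative only, the condition variable and lock are hypothetical placeholders, and rb_native_cond_wait() stands in for any blocking primitive:

static void
example_blocking_wait(rb_thread_t *th, rb_nativethread_cond_t *cond, rb_nativethread_lock_t *lock)
{
    THREAD_BLOCKING_BEGIN(th);           /* save machine context, release the scheduler */
    {
        rb_native_cond_wait(cond, lock); /* block without holding the GVL */
    }
    THREAD_BLOCKING_END(th);             /* reacquire the scheduler, switch back to this ractor */
}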
@@ -191,10 +195,10 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio
#define BLOCKING_REGION(th, exec, ubf, ubfarg, fail_if_interrupted) do { \
struct rb_blocking_region_buffer __region; \
if (blocking_region_begin(th, &__region, (ubf), (ubfarg), fail_if_interrupted) || \
- /* always return true unless fail_if_interrupted */ \
- !only_if_constant(fail_if_interrupted, TRUE)) { \
- exec; \
- blocking_region_end(th, &__region); \
+ /* always return true unless fail_if_interrupted */ \
+ !only_if_constant(fail_if_interrupted, TRUE)) { \
+ exec; \
+ blocking_region_end(th, &__region); \
}; \
} while(0)
@@ -209,11 +213,11 @@ vm_check_ints_blocking(rb_execution_context_t *ec)
rb_thread_t *th = rb_ec_thread_ptr(ec);
if (LIKELY(rb_threadptr_pending_interrupt_empty_p(th))) {
- if (LIKELY(!RUBY_VM_INTERRUPTED_ANY(ec))) return FALSE;
+ if (LIKELY(!RUBY_VM_INTERRUPTED_ANY(ec))) return FALSE;
}
else {
- th->pending_interrupt_queue_checked = 0;
- RUBY_VM_SET_INTERRUPT(ec);
+ th->pending_interrupt_queue_checked = 0;
+ RUBY_VM_SET_INTERRUPT(ec);
}
return rb_threadptr_execute_interrupts(th, 1);
}
@@ -254,123 +258,11 @@ timeout_prepare(rb_hrtime_t **to, rb_hrtime_t *rel, rb_hrtime_t *end,
}
}
-#if THREAD_DEBUG
-#ifdef HAVE_VA_ARGS_MACRO
-void rb_thread_debug(const char *file, int line, const char *fmt, ...);
-#define thread_debug(...) rb_thread_debug(__FILE__, __LINE__, __VA_ARGS__)
-#define POSITION_FORMAT "%s:%d:"
-#define POSITION_ARGS ,file, line
-#else
-void rb_thread_debug(const char *fmt, ...);
-#define thread_debug rb_thread_debug
-#define POSITION_FORMAT
-#define POSITION_ARGS
-#endif
-
-# ifdef NON_SCALAR_THREAD_ID
-#define fill_thread_id_string ruby_fill_thread_id_string
-const char *
-ruby_fill_thread_id_string(rb_nativethread_id_t thid, rb_thread_id_string_t buf)
-{
- extern const char ruby_digitmap[];
- size_t i;
-
- buf[0] = '0';
- buf[1] = 'x';
- for (i = 0; i < sizeof(thid); i++) {
-# ifdef LITTLE_ENDIAN
- size_t j = sizeof(thid) - i - 1;
-# else
- size_t j = i;
-# endif
- unsigned char c = (unsigned char)((char *)&thid)[j];
- buf[2 + i * 2] = ruby_digitmap[(c >> 4) & 0xf];
- buf[3 + i * 2] = ruby_digitmap[c & 0xf];
- }
- buf[sizeof(rb_thread_id_string_t)-1] = '\0';
- return buf;
-}
-# define fill_thread_id_str(th) fill_thread_id_string((th)->thread_id, (th)->thread_id_string)
-# define thread_id_str(th) ((th)->thread_id_string)
-# define PRI_THREAD_ID "s"
-# endif
-
-# if THREAD_DEBUG < 0
-static int rb_thread_debug_enabled;
+MAYBE_UNUSED(NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start)));
+MAYBE_UNUSED(static bool th_has_dedicated_nt(const rb_thread_t *th));
+MAYBE_UNUSED(static int waitfd_to_waiting_flag(int wfd_event));
-/*
- * call-seq:
- * Thread.DEBUG -> num
- *
- * Returns the thread debug level. Available only if compiled with
- * THREAD_DEBUG=-1.
- */
-
-static VALUE
-rb_thread_s_debug(VALUE _)
-{
- return INT2NUM(rb_thread_debug_enabled);
-}
-
-/*
- * call-seq:
- * Thread.DEBUG = num
- *
- * Sets the thread debug level. Available only if compiled with
- * THREAD_DEBUG=-1.
- */
-
-static VALUE
-rb_thread_s_debug_set(VALUE self, VALUE val)
-{
- rb_thread_debug_enabled = RTEST(val) ? NUM2INT(val) : 0;
- return val;
-}
-# else
-# define rb_thread_debug_enabled THREAD_DEBUG
-# endif
-#else
-#define thread_debug if(0)printf
-#endif
-
-#ifndef fill_thread_id_str
-# define fill_thread_id_string(thid, buf) ((void *)(uintptr_t)(thid))
-# define fill_thread_id_str(th) (void)0
-# define thread_id_str(th) ((void *)(uintptr_t)(th)->thread_id)
-# define PRI_THREAD_ID "p"
-#endif
-
-NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start));
-void ruby_sigchld_handler(rb_vm_t *); /* signal.c */
-
-static void
-ubf_sigwait(void *ignore)
-{
- rb_thread_wakeup_timer_thread(0);
-}
-
-#if defined(_WIN32)
-#include "thread_win32.c"
-
-#define DEBUG_OUT() \
- WaitForSingleObject(&debug_mutex, INFINITE); \
- printf(POSITION_FORMAT"%#lx - %s" POSITION_ARGS, GetCurrentThreadId(), buf); \
- fflush(stdout); \
- ReleaseMutex(&debug_mutex);
-
-#elif defined(HAVE_PTHREAD_H)
-#include "thread_pthread.c"
-
-#define DEBUG_OUT() \
- pthread_mutex_lock(&debug_mutex); \
- printf(POSITION_FORMAT"%"PRI_THREAD_ID" - %s" POSITION_ARGS, \
- fill_thread_id_string(pthread_self(), thread_id_string), buf); \
- fflush(stdout); \
- pthread_mutex_unlock(&debug_mutex);
-
-#else
-#error "unsupported thread type"
-#endif
+#include THREAD_IMPL_SRC
/*
* TODO: somebody with win32 knowledge should be able to get rid of
@@ -385,48 +277,9 @@ ubf_sigwait(void *ignore)
# define USE_EVENTFD (0)
#endif
-#if THREAD_DEBUG
-static int debug_mutex_initialized = 1;
-static rb_nativethread_lock_t debug_mutex;
-
-void
-rb_thread_debug(
-#ifdef HAVE_VA_ARGS_MACRO
- const char *file, int line,
-#endif
- const char *fmt, ...)
-{
- va_list args;
- char buf[BUFSIZ];
-#ifdef NON_SCALAR_THREAD_ID
- rb_thread_id_string_t thread_id_string;
-#endif
-
- if (!rb_thread_debug_enabled) return;
-
- if (debug_mutex_initialized == 1) {
- debug_mutex_initialized = 0;
- rb_native_mutex_initialize(&debug_mutex);
- }
-
- va_start(args, fmt);
- vsnprintf(buf, BUFSIZ, fmt, args);
- va_end(args);
-
- DEBUG_OUT();
-}
-#endif
-
#include "thread_sync.c"
void
-rb_vm_gvl_destroy(rb_global_vm_lock_t *gvl)
-{
- gvl_release(gvl);
- gvl_destroy(gvl);
-}
-
-void
rb_nativethread_lock_initialize(rb_nativethread_lock_t *lock)
{
rb_native_mutex_initialize(lock);
@@ -454,14 +307,14 @@ static int
unblock_function_set(rb_thread_t *th, rb_unblock_function_t *func, void *arg, int fail_if_interrupted)
{
do {
- if (fail_if_interrupted) {
- if (RUBY_VM_INTERRUPTED_ANY(th->ec)) {
- return FALSE;
- }
- }
- else {
- RUBY_VM_CHECK_INTS(th->ec);
- }
+ if (fail_if_interrupted) {
+ if (RUBY_VM_INTERRUPTED_ANY(th->ec)) {
+ return FALSE;
+ }
+ }
+ else {
+ RUBY_VM_CHECK_INTS(th->ec);
+ }
rb_native_mutex_lock(&th->interrupt_lock);
} while (!th->ec->raised_flag && RUBY_VM_INTERRUPTED_ANY(th->ec) &&
@@ -487,19 +340,23 @@ unblock_function_clear(rb_thread_t *th)
static void
rb_threadptr_interrupt_common(rb_thread_t *th, int trap)
{
+ RUBY_DEBUG_LOG("th:%u trap:%d", rb_th_serial(th), trap);
+
rb_native_mutex_lock(&th->interrupt_lock);
+ {
+ if (trap) {
+ RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
+ }
+ else {
+ RUBY_VM_SET_INTERRUPT(th->ec);
+ }
- if (trap) {
- RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
- }
- else {
- RUBY_VM_SET_INTERRUPT(th->ec);
- }
- if (th->unblock.func != NULL) {
- (th->unblock.func)(th->unblock.arg);
- }
- else {
- /* none */
+ if (th->unblock.func != NULL) {
+ (th->unblock.func)(th->unblock.arg);
+ }
+ else {
+ /* none */
+ }
}
rb_native_mutex_unlock(&th->interrupt_lock);
}
@@ -507,6 +364,7 @@ rb_threadptr_interrupt_common(rb_thread_t *th, int trap)
void
rb_threadptr_interrupt(rb_thread_t *th)
{
+ RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
rb_threadptr_interrupt_common(th, 0);
}
@@ -521,18 +379,18 @@ terminate_all(rb_ractor_t *r, const rb_thread_t *main_thread)
{
rb_thread_t *th = 0;
- list_for_each(&r->threads.set, th, lt_node) {
+ ccan_list_for_each(&r->threads.set, th, lt_node) {
if (th != main_thread) {
- thread_debug("terminate_all: begin (thid: %"PRI_THREAD_ID", status: %s)\n",
- thread_id_str(th), thread_status_name(th, TRUE));
- rb_threadptr_pending_interrupt_enque(th, eTerminateSignal);
- rb_threadptr_interrupt(th);
- thread_debug("terminate_all: end (thid: %"PRI_THREAD_ID", status: %s)\n",
- thread_id_str(th), thread_status_name(th, TRUE));
- }
- else {
- thread_debug("terminate_all: main thread (%p)\n", (void *)th);
- }
+ RUBY_DEBUG_LOG("terminate start th:%u status:%s", rb_th_serial(th), thread_status_name(th, TRUE));
+
+ rb_threadptr_pending_interrupt_enque(th, RUBY_FATAL_THREAD_TERMINATED);
+ rb_threadptr_interrupt(th);
+
+ RUBY_DEBUG_LOG("terminate done th:%u status:%s", rb_th_serial(th), thread_status_name(th, TRUE));
+ }
+ else {
+ RUBY_DEBUG_LOG("main thread th:%u", rb_th_serial(th));
+ }
}
}
@@ -547,18 +405,19 @@ rb_threadptr_join_list_wakeup(rb_thread_t *thread)
rb_thread_t *target_thread = join_list->thread;
- if (target_thread->scheduler != Qnil && rb_fiberptr_blocking(join_list->fiber) == 0) {
+ if (target_thread->scheduler != Qnil && join_list->fiber) {
rb_fiber_scheduler_unblock(target_thread->scheduler, target_thread->self, rb_fiberptr_self(join_list->fiber));
}
else {
rb_threadptr_interrupt(target_thread);
switch (target_thread->status) {
- case THREAD_STOPPED:
- case THREAD_STOPPED_FOREVER:
- target_thread->status = THREAD_RUNNABLE;
- default:
- break;
+ case THREAD_STOPPED:
+ case THREAD_STOPPED_FOREVER:
+ target_thread->status = THREAD_RUNNABLE;
+ break;
+ default:
+ break;
}
}
}
@@ -571,7 +430,7 @@ rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th)
rb_mutex_t *mutex = th->keeping_mutexes;
th->keeping_mutexes = mutex->next_mutex;
- /* rb_warn("mutex #<%p> remains to be locked by terminated thread", (void *)mutexes); */
+ // rb_warn("mutex #<%p> was not unlocked by thread #<%p>", (void *)mutex, (void*)th);
const char *error_message = rb_mutex_unlock_th(mutex, th, mutex->fiber);
if (error_message) rb_bug("invalid keeping_mutexes: %s", error_message);
@@ -596,31 +455,32 @@ rb_thread_terminate_all(rb_thread_t *th)
EC_PUSH_TAG(ec);
if (EC_EXEC_TAG() == TAG_NONE) {
retry:
- thread_debug("rb_thread_terminate_all (main thread: %p)\n", (void *)th);
- terminate_all(cr, th);
+ RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
+
+ terminate_all(cr, th);
- while (rb_ractor_living_thread_num(cr) > 1) {
+ while (rb_ractor_living_thread_num(cr) > 1) {
rb_hrtime_t rel = RB_HRTIME_PER_SEC;
- /*q
- * Thread exiting routine in thread_start_func_2 notify
- * me when the last sub-thread exit.
- */
- sleeping = 1;
- native_sleep(th, &rel);
- RUBY_VM_CHECK_INTS_BLOCKING(ec);
- sleeping = 0;
- }
+ /*
+ * The thread exiting routine in thread_start_func_2 notifies
+ * me when the last sub-thread exits.
+ */
+ sleeping = 1;
+ native_sleep(th, &rel);
+ RUBY_VM_CHECK_INTS_BLOCKING(ec);
+ sleeping = 0;
+ }
}
else {
- /*
- * When caught an exception (e.g. Ctrl+C), let's broadcast
- * kill request again to ensure killing all threads even
- * if they are blocked on sleep, mutex, etc.
- */
- if (sleeping) {
- sleeping = 0;
- goto retry;
- }
+ /*
+ * When an exception is caught (e.g. Ctrl+C), broadcast the
+ * kill request again to ensure all threads are killed even
+ * if they are blocked on sleep, mutex, etc.
+ */
+ if (sleeping) {
+ sleeping = 0;
+ goto retry;
+ }
}
EC_POP_TAG();
}
@@ -632,6 +492,7 @@ thread_cleanup_func_before_exec(void *th_ptr)
{
rb_thread_t *th = th_ptr;
th->status = THREAD_KILLED;
+
// The thread stack doesn't exist in the forked process:
th->ec->machine.stack_start = th->ec->machine.stack_end = NULL;
@@ -650,25 +511,22 @@ thread_cleanup_func(void *th_ptr, int atfork)
* Unfortunately, we can't release native threading resource at fork
* because libc may have unstable locking state therefore touching
* a threading resource may cause a deadlock.
- *
- * FIXME: Skipping native_mutex_destroy(pthread_mutex_destroy) is safe
- * with NPTL, but native_thread_destroy calls pthread_cond_destroy
- * which calls free(3), so there is a small memory leak atfork, here.
*/
- if (atfork)
- return;
+ if (atfork) {
+ th->nt = NULL;
+ return;
+ }
rb_native_mutex_destroy(&th->interrupt_lock);
- native_thread_destroy(th);
}
static VALUE rb_threadptr_raise(rb_thread_t *, int, VALUE *);
static VALUE rb_thread_to_s(VALUE thread);
void
-ruby_thread_init_stack(rb_thread_t *th)
+ruby_thread_init_stack(rb_thread_t *th, void *local_in_parent_frame)
{
- native_thread_init_stack(th);
+ native_thread_init_stack(th, local_in_parent_frame);
}
const VALUE *
@@ -677,10 +535,10 @@ rb_vm_proc_local_ep(VALUE proc)
const VALUE *ep = vm_proc_ep(proc);
if (ep) {
- return rb_vm_ep_local_ep(ep);
+ return rb_vm_ep_local_ep(ep);
}
else {
- return NULL;
+ return NULL;
}
}
@@ -688,7 +546,7 @@ rb_vm_proc_local_ep(VALUE proc)
VALUE rb_vm_invoke_proc_with_self(rb_execution_context_t *ec, rb_proc_t *proc, VALUE self,
int argc, const VALUE *argv, int kw_splat, VALUE passed_block_handler);
-static void
+static VALUE
thread_do_start_proc(rb_thread_t *th)
{
VALUE args = th->invoke_arg.proc.args;
@@ -702,7 +560,6 @@ thread_do_start_proc(rb_thread_t *th)
th->ec->root_lep = rb_vm_proc_local_ep(procval);
th->ec->root_svar = Qfalse;
- EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef);
vm_check_ints_blocking(th->ec);
if (th->invoke_type == thread_invoke_type_ractor_proc) {
@@ -713,18 +570,19 @@ thread_do_start_proc(rb_thread_t *th)
rb_ractor_receive_parameters(th->ec, th->ractor, args_len, (VALUE *)args_ptr);
vm_check_ints_blocking(th->ec);
- // kick thread
- th->value = rb_vm_invoke_proc_with_self(th->ec, proc, self,
- args_len, args_ptr,
- th->invoke_arg.proc.kw_splat,
- VM_BLOCK_HANDLER_NONE);
+ return rb_vm_invoke_proc_with_self(
+ th->ec, proc, self,
+ args_len, args_ptr,
+ th->invoke_arg.proc.kw_splat,
+ VM_BLOCK_HANDLER_NONE
+ );
}
else {
args_len = RARRAY_LENINT(args);
if (args_len < 8) {
/* free proc.args if the length is enough small */
args_ptr = ALLOCA_N(VALUE, args_len);
- MEMCPY((VALUE *)args_ptr, RARRAY_CONST_PTR_TRANSIENT(args), VALUE, args_len);
+ MEMCPY((VALUE *)args_ptr, RARRAY_CONST_PTR(args), VALUE, args_len);
th->invoke_arg.proc.args = Qnil;
}
else {
@@ -733,63 +591,53 @@ thread_do_start_proc(rb_thread_t *th)
vm_check_ints_blocking(th->ec);
- // kick thread
- th->value = rb_vm_invoke_proc(th->ec, proc,
- args_len, args_ptr,
- th->invoke_arg.proc.kw_splat,
- VM_BLOCK_HANDLER_NONE);
- }
-
- EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef);
-
- if (th->invoke_type == thread_invoke_type_ractor_proc) {
- rb_ractor_atexit(th->ec, th->value);
+ return rb_vm_invoke_proc(
+ th->ec, proc,
+ args_len, args_ptr,
+ th->invoke_arg.proc.kw_splat,
+ VM_BLOCK_HANDLER_NONE
+ );
}
}
-static void
+static VALUE
thread_do_start(rb_thread_t *th)
{
native_set_thread_name(th);
+ VALUE result = Qundef;
switch (th->invoke_type) {
case thread_invoke_type_proc:
+ result = thread_do_start_proc(th);
+ break;
+
case thread_invoke_type_ractor_proc:
- thread_do_start_proc(th);
+ result = thread_do_start_proc(th);
+ rb_ractor_atexit(th->ec, result);
break;
+
case thread_invoke_type_func:
- th->value = (*th->invoke_arg.func.func)(th->invoke_arg.func.arg);
+ result = (*th->invoke_arg.func.func)(th->invoke_arg.func.arg);
break;
+
case thread_invoke_type_none:
rb_bug("unreachable");
}
- rb_fiber_scheduler_set(Qnil);
+ return result;
}
void rb_ec_clear_current_thread_trace_func(const rb_execution_context_t *ec);
-// io.c
-VALUE rb_io_prep_stdin(void);
-VALUE rb_io_prep_stdout(void);
-VALUE rb_io_prep_stderr(void);
-
static int
thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
{
- STACK_GROW_DIR_DETECTION;
+ RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
+ VM_ASSERT(th != th->vm->ractor.main_thread);
+
enum ruby_tag_type state;
VALUE errinfo = Qnil;
- size_t size = th->vm->default_params.thread_vm_stack_size / sizeof(VALUE);
rb_thread_t *ractor_main_th = th->ractor->threads.main;
- VALUE * vm_stack = NULL;
-
- VM_ASSERT(th != th->vm->ractor.main_thread);
- thread_debug("thread start: %p\n", (void *)th);
-
- // setup native thread
- gvl_acquire(rb_ractor_gvl(th->ractor), th);
- ruby_thread_set_native(th);
// setup ractor
if (rb_ractor_status_p(th->ractor, ractor_blocking)) {
@@ -804,27 +652,39 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
RB_VM_UNLOCK();
}
- // This assertion is not passed on win32 env. Check it later.
- // VM_ASSERT((size * sizeof(VALUE)) <= th->ec->machine.stack_maxsize);
-
- // setup VM and machine stack
- vm_stack = alloca(size * sizeof(VALUE));
- VM_ASSERT(vm_stack);
+ // Ensure that we are not joinable.
+ VM_ASSERT(UNDEF_P(th->value));
- rb_ec_initialize_vm_stack(th->ec, vm_stack, size);
- th->ec->machine.stack_start = STACK_DIR_UPPER(vm_stack + size, vm_stack);
- th->ec->machine.stack_maxsize -= size * sizeof(VALUE);
-
- thread_debug("thread start (get lock): %p\n", (void *)th);
+ int fiber_scheduler_closed = 0, event_thread_end_hooked = 0;
+ VALUE result = Qundef;
EC_PUSH_TAG(th->ec);
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
- SAVE_ROOT_JMPBUF(th, thread_do_start(th));
+ EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef);
+
+ result = thread_do_start(th);
}
- else {
+
+ if (!fiber_scheduler_closed) {
+ fiber_scheduler_closed = 1;
+ rb_fiber_scheduler_set(Qnil);
+ }
+
+ if (!event_thread_end_hooked) {
+ event_thread_end_hooked = 1;
+ EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef);
+ }
+
+ if (state == TAG_NONE) {
+ // This must be set AFTER doing all user-level code. At this point, the thread is effectively finished and calls to `Thread#join` will succeed.
+ th->value = result;
+ } else {
errinfo = th->ec->errinfo;
+ VALUE exc = rb_vm_make_jump_tag_but_local_jump(state, Qundef);
+ if (!NIL_P(exc)) errinfo = exc;
+
if (state == TAG_FATAL) {
if (th->invoke_type == thread_invoke_type_ractor_proc) {
rb_ractor_atexit(th->ec, Qnil);
@@ -857,13 +717,19 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
th->value = Qnil;
}
+ // The thread is effectively finished and can be joined.
+ VM_ASSERT(!UNDEF_P(th->value));
+
+ rb_threadptr_join_list_wakeup(th);
+ rb_threadptr_unlock_all_locking_mutexes(th);
+
if (th->invoke_type == thread_invoke_type_ractor_proc) {
rb_thread_terminate_all(th);
rb_ractor_teardown(th->ec);
}
th->status = THREAD_KILLED;
- thread_debug("thread end: %p\n", (void *)th);
+ RUBY_DEBUG_LOG("killed th:%u", rb_th_serial(th));
if (th->vm->ractor.main_thread == th) {
ruby_stop(0);
@@ -874,9 +740,6 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
rb_threadptr_raise(ractor_main_th, 1, &errinfo);
}
- rb_threadptr_join_list_wakeup(th);
- rb_threadptr_unlock_all_locking_mutexes(th);
-
EC_POP_TAG();
rb_ec_clear_current_thread_trace_func(th->ec);
@@ -904,12 +767,12 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
// after rb_ractor_living_threads_remove()
// GC will happen anytime and this ractor can be collected (and destroy GVL).
// So gvl_release() should be before it.
- gvl_release(rb_ractor_gvl(th->ractor));
+ thread_sched_to_dead(TH_SCHED(th), th);
rb_ractor_living_threads_remove(th->ractor, th);
}
else {
rb_ractor_living_threads_remove(th->ractor, th);
- gvl_release(rb_ractor_gvl(th->ractor));
+ thread_sched_to_dead(TH_SCHED(th), th);
}
return 0;
@@ -929,6 +792,8 @@ struct thread_create_params {
VALUE (*fn)(void *);
};
+static void thread_specific_storage_alloc(rb_thread_t *th);
+
static VALUE
thread_create_core(VALUE thval, struct thread_create_params *params)
{
@@ -936,11 +801,15 @@ thread_create_core(VALUE thval, struct thread_create_params *params)
rb_thread_t *th = rb_thread_ptr(thval), *current_th = rb_ec_thread_ptr(ec);
int err;
+ thread_specific_storage_alloc(th);
+
if (OBJ_FROZEN(current_th->thgroup)) {
- rb_raise(rb_eThreadError,
- "can't start a new thread (frozen ThreadGroup)");
+ rb_raise(rb_eThreadError,
+ "can't start a new thread (frozen ThreadGroup)");
}
+ rb_fiber_inherit_storage(ec, th->ec->fiber_ptr);
+
switch (params->type) {
case thread_invoke_type_proc:
th->invoke_type = thread_invoke_type_proc;
@@ -975,21 +844,21 @@ thread_create_core(VALUE thval, struct thread_create_params *params)
th->priority = current_th->priority;
th->thgroup = current_th->thgroup;
- th->pending_interrupt_queue = rb_ary_tmp_new(0);
+ th->pending_interrupt_queue = rb_ary_hidden_new(0);
th->pending_interrupt_queue_checked = 0;
th->pending_interrupt_mask_stack = rb_ary_dup(current_th->pending_interrupt_mask_stack);
RBASIC_CLEAR_CLASS(th->pending_interrupt_mask_stack);
rb_native_mutex_initialize(&th->interrupt_lock);
- RUBY_DEBUG_LOG("r:%u th:%p", rb_ractor_id(th->ractor), th);
+ RUBY_DEBUG_LOG("r:%u th:%u", rb_ractor_id(th->ractor), rb_th_serial(th));
rb_ractor_living_threads_insert(th->ractor, th);
/* kick thread */
err = native_thread_create(th);
if (err) {
- th->status = THREAD_KILLED;
+ th->status = THREAD_KILLED;
rb_ractor_living_threads_remove(th->ractor, th);
rb_raise(rb_eThreadError, "can't create Thread: %s", strerror(err));
}
@@ -1031,8 +900,8 @@ thread_s_new(int argc, VALUE *argv, VALUE klass)
rb_obj_call_init_kw(thread, argc, argv, RB_PASS_CALLED_KEYWORDS);
th = rb_thread_ptr(thread);
if (!threadptr_initialized(th)) {
- rb_raise(rb_eThreadError, "uninitialized thread - check `%"PRIsVALUE"#initialize'",
- klass);
+ rb_raise(rb_eThreadError, "uninitialized thread - check '%"PRIsVALUE"#initialize'",
+ klass);
}
return thread;
}
@@ -1111,11 +980,11 @@ rb_thread_create(VALUE (*fn)(void *), void *arg)
}
VALUE
-rb_thread_create_ractor(rb_ractor_t *g, VALUE args, VALUE proc)
+rb_thread_create_ractor(rb_ractor_t *r, VALUE args, VALUE proc)
{
struct thread_create_params params = {
.type = thread_invoke_type_ractor_proc,
- .g = g,
+ .g = r,
.args = args,
.proc = proc,
};
@@ -1127,6 +996,7 @@ struct join_arg {
struct rb_waiting_list *waiter;
rb_thread_t *target;
VALUE timeout;
+ rb_hrtime_t *limit;
};
static VALUE
@@ -1151,51 +1021,39 @@ remove_from_join_list(VALUE arg)
return Qnil;
}
-static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double);
+static int
+thread_finished(rb_thread_t *th)
+{
+ return th->status == THREAD_KILLED || !UNDEF_P(th->value);
+}
static VALUE
thread_join_sleep(VALUE arg)
{
struct join_arg *p = (struct join_arg *)arg;
rb_thread_t *target_th = p->target, *th = p->waiter->thread;
- rb_hrtime_t end = 0, rel = 0, *limit = 0;
-
- /*
- * This supports INFINITY and negative values, so we can't use
- * rb_time_interval right now...
- */
- if (p->timeout == Qnil) {
- /* unlimited */
- }
- else if (FIXNUM_P(p->timeout)) {
- rel = rb_sec2hrtime(NUM2TIMET(p->timeout));
- limit = &rel;
- }
- else {
- limit = double2hrtime(&rel, rb_num2dbl(p->timeout));
- }
+ rb_hrtime_t end = 0, *limit = p->limit;
if (limit) {
end = rb_hrtime_add(*limit, rb_hrtime_now());
}
- while (target_th->status != THREAD_KILLED) {
+ while (!thread_finished(target_th)) {
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
rb_fiber_scheduler_block(scheduler, target_th->self, p->timeout);
+ // Check if the target thread is finished after blocking:
+ if (thread_finished(target_th)) break;
+ // Otherwise, a timeout occurred:
+ else return Qfalse;
}
else if (!limit) {
- th->status = THREAD_STOPPED_FOREVER;
- rb_ractor_sleeper_threads_inc(th->ractor);
- rb_check_deadlock(th->ractor);
- native_sleep(th, 0);
- rb_ractor_sleeper_threads_dec(th->ractor);
+ sleep_forever(th, SLEEP_DEADLOCKABLE | SLEEP_ALLOW_SPURIOUS | SLEEP_NO_CHECKINTS);
}
else {
if (hrtime_update_expire(limit, end)) {
- thread_debug("thread_join: timeout (thid: %"PRI_THREAD_ID")\n",
- thread_id_str(target_th));
+ RUBY_DEBUG_LOG("timeout target_th:%u", rb_th_serial(target_th));
return Qfalse;
}
th->status = THREAD_STOPPED;
@@ -1203,14 +1061,15 @@ thread_join_sleep(VALUE arg)
}
RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
th->status = THREAD_RUNNABLE;
- thread_debug("thread_join: interrupted (thid: %"PRI_THREAD_ID", status: %s)\n",
- thread_id_str(target_th), thread_status_name(target_th, TRUE));
+
+ RUBY_DEBUG_LOG("interrupted target_th:%u status:%s", rb_th_serial(target_th), thread_status_name(target_th, TRUE));
}
+
return Qtrue;
}
static VALUE
-thread_join(rb_thread_t *target_th, VALUE timeout)
+thread_join(rb_thread_t *target_th, VALUE timeout, rb_hrtime_t *limit)
{
rb_execution_context_t *ec = GET_EC();
rb_thread_t *th = ec->thread_ptr;
@@ -1224,28 +1083,27 @@ thread_join(rb_thread_t *target_th, VALUE timeout)
rb_raise(rb_eThreadError, "Target thread must not be main thread");
}
- thread_debug("thread_join (thid: %"PRI_THREAD_ID", status: %s)\n",
- thread_id_str(target_th), thread_status_name(target_th, TRUE));
+ RUBY_DEBUG_LOG("target_th:%u status:%s", rb_th_serial(target_th), thread_status_name(target_th, TRUE));
if (target_th->status != THREAD_KILLED) {
struct rb_waiting_list waiter;
waiter.next = target_th->join_list;
waiter.thread = th;
- waiter.fiber = fiber;
+ waiter.fiber = rb_fiberptr_blocking(fiber) ? NULL : fiber;
target_th->join_list = &waiter;
struct join_arg arg;
arg.waiter = &waiter;
arg.target = target_th;
arg.timeout = timeout;
+ arg.limit = limit;
if (!rb_ensure(thread_join_sleep, (VALUE)&arg, remove_from_join_list, (VALUE)&arg)) {
return Qnil;
}
}
- thread_debug("thread_join: success (thid: %"PRI_THREAD_ID", status: %s)\n",
- thread_id_str(target_th), thread_status_name(target_th, TRUE));
+ RUBY_DEBUG_LOG("success target_th:%u status:%s", rb_th_serial(target_th), thread_status_name(target_th, TRUE));
if (target_th->ec->errinfo != Qnil) {
VALUE err = target_th->ec->errinfo;
@@ -1253,8 +1111,7 @@ thread_join(rb_thread_t *target_th, VALUE timeout)
if (FIXNUM_P(err)) {
switch (err) {
case INT2FIX(TAG_FATAL):
- thread_debug("thread_join: terminated (thid: %"PRI_THREAD_ID", status: %s)\n",
- thread_id_str(target_th), thread_status_name(target_th, TRUE));
+ RUBY_DEBUG_LOG("terminated target_th:%u status:%s", rb_th_serial(target_th), thread_status_name(target_th, TRUE));
/* OK. killed. */
break;
@@ -1316,23 +1173,29 @@ static VALUE
thread_join_m(int argc, VALUE *argv, VALUE self)
{
VALUE timeout = Qnil;
+ rb_hrtime_t rel = 0, *limit = 0;
if (rb_check_arity(argc, 0, 1)) {
timeout = argv[0];
}
// Convert the timeout eagerly, so it's always converted and deterministic
- if (timeout == Qnil) {
+ /*
+ * This supports INFINITY and negative values, so we can't use
+ * rb_time_interval right now...
+ */
+ if (NIL_P(timeout)) {
/* unlimited */
}
else if (FIXNUM_P(timeout)) {
- /* handled directly in thread_join_sleep() */
+ rel = rb_sec2hrtime(NUM2TIMET(timeout));
+ limit = &rel;
}
else {
- timeout = rb_to_float(timeout);
+ limit = double2hrtime(&rel, rb_num2dbl(timeout));
}
- return thread_join(rb_thread_ptr(self), timeout);
+ return thread_join(rb_thread_ptr(self), timeout, limit);
}
/*
@@ -1353,7 +1216,11 @@ static VALUE
thread_value(VALUE self)
{
rb_thread_t *th = rb_thread_ptr(self);
- thread_join(th, Qnil);
+ thread_join(th, Qnil, 0);
+ if (UNDEF_P(th->value)) {
+ // If the thread is dead because we forked, th->value is still Qundef.
+ return Qnil;
+ }
return th->value;
}
@@ -1361,44 +1228,6 @@ thread_value(VALUE self)
* Thread Scheduling
*/
-/*
- * Back when we used "struct timeval", not all platforms implemented
- * tv_sec as time_t. Nowadays we use "struct timespec" and tv_sec
- * seems to be implemented more consistently across platforms.
- * At least other parts of our code hasn't had to deal with non-time_t
- * tv_sec in timespec...
- */
-#define TIMESPEC_SEC_MAX TIMET_MAX
-#define TIMESPEC_SEC_MIN TIMET_MIN
-
-COMPILER_WARNING_PUSH
-#if __has_warning("-Wimplicit-int-float-conversion")
-COMPILER_WARNING_IGNORED(-Wimplicit-int-float-conversion)
-#elif defined(_MSC_VER)
-/* C4305: 'initializing': truncation from '__int64' to 'const double' */
-COMPILER_WARNING_IGNORED(4305)
-#endif
-static const double TIMESPEC_SEC_MAX_as_double = TIMESPEC_SEC_MAX;
-COMPILER_WARNING_POP
-
-static rb_hrtime_t *
-double2hrtime(rb_hrtime_t *hrt, double d)
-{
- /* assume timespec.tv_sec has same signedness as time_t */
- const double TIMESPEC_SEC_MAX_PLUS_ONE = 2.0 * (TIMESPEC_SEC_MAX_as_double / 2.0 + 1.0);
-
- if (TIMESPEC_SEC_MAX_PLUS_ONE <= d) {
- return NULL;
- }
- else if (d <= 0) {
- *hrt = 0;
- }
- else {
- *hrt = (rb_hrtime_t)(d * (double)RB_HRTIME_PER_SEC);
- }
- return hrt;
-}
-
static void
getclockofday(struct timespec *ts)
{
@@ -1423,32 +1252,6 @@ rb_hrtime_now(void)
return rb_timespec2hrtime(&ts);
}
-static void
-sleep_forever(rb_thread_t *th, unsigned int fl)
-{
- enum rb_thread_status prev_status = th->status;
- enum rb_thread_status status;
- int woke;
-
- status = fl & SLEEP_DEADLOCKABLE ? THREAD_STOPPED_FOREVER : THREAD_STOPPED;
- th->status = status;
- RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
- while (th->status == status) {
- if (fl & SLEEP_DEADLOCKABLE) {
- rb_ractor_sleeper_threads_inc(th->ractor);
- rb_check_deadlock(th->ractor);
- }
- native_sleep(th, 0);
- if (fl & SLEEP_DEADLOCKABLE) {
- rb_ractor_sleeper_threads_dec(th->ractor);
- }
- woke = vm_check_ints_blocking(th->ec);
- if (woke && !(fl & SLEEP_SPURIOUS_CHECK))
- break;
- }
- th->status = prev_status;
-}
-
/*
* at least gcc 7.2 and 7.3 complains about "rb_hrtime_t end"
* being uninitialized, maybe other versions, too.
@@ -1471,15 +1274,15 @@ hrtime_update_expire(rb_hrtime_t *timeout, const rb_hrtime_t end)
rb_hrtime_t now = rb_hrtime_now();
if (now > end) return 1;
- thread_debug("hrtime_update_expire: "
- "%"PRIu64" > %"PRIu64"\n",
- (uint64_t)end, (uint64_t)now);
+
+ RUBY_DEBUG_LOG("%"PRIu64" > %"PRIu64"", (uint64_t)end, (uint64_t)now);
+
*timeout = end - now;
return 0;
}
COMPILER_WARNING_POP
-static void
+static int
sleep_hrtime(rb_thread_t *th, rb_hrtime_t rel, unsigned int fl)
{
enum rb_thread_status prev_status = th->status;
@@ -1489,12 +1292,72 @@ sleep_hrtime(rb_thread_t *th, rb_hrtime_t rel, unsigned int fl)
th->status = THREAD_STOPPED;
RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
while (th->status == THREAD_STOPPED) {
- native_sleep(th, &rel);
- woke = vm_check_ints_blocking(th->ec);
- if (woke && !(fl & SLEEP_SPURIOUS_CHECK))
- break;
- if (hrtime_update_expire(&rel, end))
- break;
+ native_sleep(th, &rel);
+ woke = vm_check_ints_blocking(th->ec);
+ if (woke && !(fl & SLEEP_SPURIOUS_CHECK))
+ break;
+ if (hrtime_update_expire(&rel, end))
+ break;
+ woke = 1;
+ }
+ th->status = prev_status;
+ return woke;
+}
+
+static int
+sleep_hrtime_until(rb_thread_t *th, rb_hrtime_t end, unsigned int fl)
+{
+ enum rb_thread_status prev_status = th->status;
+ int woke;
+ rb_hrtime_t rel = rb_hrtime_sub(end, rb_hrtime_now());
+
+ th->status = THREAD_STOPPED;
+ RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
+ while (th->status == THREAD_STOPPED) {
+ native_sleep(th, &rel);
+ woke = vm_check_ints_blocking(th->ec);
+ if (woke && !(fl & SLEEP_SPURIOUS_CHECK))
+ break;
+ if (hrtime_update_expire(&rel, end))
+ break;
+ woke = 1;
+ }
+ th->status = prev_status;
+ return woke;
+}
+
+static void
+sleep_forever(rb_thread_t *th, unsigned int fl)
+{
+ enum rb_thread_status prev_status = th->status;
+ enum rb_thread_status status;
+ int woke;
+
+ status = fl & SLEEP_DEADLOCKABLE ? THREAD_STOPPED_FOREVER : THREAD_STOPPED;
+ th->status = status;
+
+ if (!(fl & SLEEP_NO_CHECKINTS)) RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
+
+ while (th->status == status) {
+ if (fl & SLEEP_DEADLOCKABLE) {
+ rb_ractor_sleeper_threads_inc(th->ractor);
+ rb_check_deadlock(th->ractor);
+ }
+ {
+ native_sleep(th, 0);
+ }
+ if (fl & SLEEP_DEADLOCKABLE) {
+ rb_ractor_sleeper_threads_dec(th->ractor);
+ }
+ if (fl & SLEEP_ALLOW_SPURIOUS) {
+ break;
+ }
+
+ woke = vm_check_ints_blocking(th->ec);
+
+ if (woke && !(fl & SLEEP_SPURIOUS_CHECK)) {
+ break;
+ }
}
th->status = prev_status;
}
@@ -1502,39 +1365,32 @@ sleep_hrtime(rb_thread_t *th, rb_hrtime_t rel, unsigned int fl)
void
rb_thread_sleep_forever(void)
{
- thread_debug("rb_thread_sleep_forever\n");
+ RUBY_DEBUG_LOG("forever");
sleep_forever(GET_THREAD(), SLEEP_SPURIOUS_CHECK);
}
void
rb_thread_sleep_deadly(void)
{
- thread_debug("rb_thread_sleep_deadly\n");
+ RUBY_DEBUG_LOG("deadly");
sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE|SLEEP_SPURIOUS_CHECK);
}
-void
-rb_thread_sleep_interruptible(void)
-{
- rb_thread_t *th = GET_THREAD();
- enum rb_thread_status prev_status = th->status;
-
- th->status = THREAD_STOPPED;
- native_sleep(th, 0);
- RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
- th->status = prev_status;
-}
-
static void
-rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker)
+rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker, VALUE timeout, rb_hrtime_t end)
{
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
- rb_fiber_scheduler_block(scheduler, blocker, Qnil);
+ rb_fiber_scheduler_block(scheduler, blocker, timeout);
}
else {
- thread_debug("rb_thread_sleep_deadly_allow_spurious_wakeup\n");
- sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE);
+ RUBY_DEBUG_LOG("...");
+ if (end) {
+ sleep_hrtime_until(GET_THREAD(), end, SLEEP_SPURIOUS_CHECK);
+ }
+ else {
+ sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE);
+ }
}
}
@@ -1585,17 +1441,19 @@ rb_thread_sleep(int sec)
static void
rb_thread_schedule_limits(uint32_t limits_us)
{
- thread_debug("rb_thread_schedule\n");
if (!rb_thread_alone()) {
- rb_thread_t *th = GET_THREAD();
+ rb_thread_t *th = GET_THREAD();
+ RUBY_DEBUG_LOG("us:%u", (unsigned int)limits_us);
- if (th->running_time_us >= limits_us) {
- thread_debug("rb_thread_schedule/switch start\n");
- RB_GC_SAVE_MACHINE_CONTEXT(th);
- gvl_yield(rb_ractor_gvl(th->ractor), th);
+ if (th->running_time_us >= limits_us) {
+ RUBY_DEBUG_LOG("switch %s", "start");
+
+ RB_VM_SAVE_MACHINE_CONTEXT(th);
+ thread_sched_yield(TH_SCHED(th), th);
rb_ractor_thread_switch(th->ractor, th);
- thread_debug("rb_thread_schedule/switch done\n");
- }
+
+ RUBY_DEBUG_LOG("switch %s", "done");
+ }
}
}
@@ -1610,20 +1468,27 @@ rb_thread_schedule(void)
static inline int
blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region,
- rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted)
+ rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted)
{
+#ifdef RUBY_VM_CRITICAL_SECTION
+ VM_ASSERT(ruby_assert_critical_section_entered == 0);
+#endif
+ VM_ASSERT(th == GET_THREAD());
+
region->prev_status = th->status;
if (unblock_function_set(th, ubf, arg, fail_if_interrupted)) {
- th->blocking_region_buffer = region;
- th->status = THREAD_STOPPED;
+ th->blocking_region_buffer = region;
+ th->status = THREAD_STOPPED;
rb_ractor_blocking_threads_inc(th->ractor, __FILE__, __LINE__);
- thread_debug("enter blocking region (%p)\n", (void *)th);
- RB_GC_SAVE_MACHINE_CONTEXT(th);
- gvl_release(rb_ractor_gvl(th->ractor));
- return TRUE;
+
+ RUBY_DEBUG_LOG("thread_id:%p", (void *)th->nt->thread_id);
+
+ RB_VM_SAVE_MACHINE_CONTEXT(th);
+ thread_sched_to_waiting(TH_SCHED(th), th);
+ return TRUE;
}
else {
- return FALSE;
+ return FALSE;
}
}
@@ -1635,15 +1500,21 @@ blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region)
/* entry to ubf_list impossible at this point, so unregister is safe: */
unregister_ubf_list(th);
- gvl_acquire(rb_ractor_gvl(th->ractor), th);
+ thread_sched_to_running(TH_SCHED(th), th);
rb_ractor_thread_switch(th->ractor, th);
- thread_debug("leave blocking region (%p)\n", (void *)th);
th->blocking_region_buffer = 0;
rb_ractor_blocking_threads_dec(th->ractor, __FILE__, __LINE__);
if (th->status == THREAD_STOPPED) {
- th->status = region->prev_status;
+ th->status = region->prev_status;
}
+
+ RUBY_DEBUG_LOG("end");
+
+#ifndef _WIN32
+ // GET_THREAD() clears WSAGetLastError()
+ VM_ASSERT(th == GET_THREAD());
+#endif
}
void *
@@ -1659,35 +1530,32 @@ rb_nogvl(void *(*func)(void *), void *data1,
int saved_errno = 0;
VALUE ubf_th = Qfalse;
- if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) {
- ubf = ubf_select;
- data2 = th;
+ if ((ubf == RUBY_UBF_IO) || (ubf == RUBY_UBF_PROCESS)) {
+ ubf = ubf_select;
+ data2 = th;
}
else if (ubf && rb_ractor_living_thread_num(th->ractor) == 1 && is_main_thread) {
if (flags & RB_NOGVL_UBF_ASYNC_SAFE) {
vm->ubf_async_safe = 1;
}
- else {
- ubf_th = rb_thread_start_unblock_thread();
- }
}
BLOCKING_REGION(th, {
- val = func(data1);
- saved_errno = errno;
+ val = func(data1);
+ saved_errno = rb_errno();
}, ubf, data2, flags & RB_NOGVL_INTR_FAIL);
if (is_main_thread) vm->ubf_async_safe = 0;
if ((flags & RB_NOGVL_INTR_FAIL) == 0) {
- RUBY_VM_CHECK_INTS_BLOCKING(ec);
+ RUBY_VM_CHECK_INTS_BLOCKING(ec);
}
if (ubf_th != Qfalse) {
thread_value(rb_thread_kill(ubf_th));
}
- errno = saved_errno;
+ rb_errno_set(saved_errno);
return val;
}
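
For callers outside the VM, the usual pattern is to push long-running, GVL-free work through rb_nogvl(). A hedged sketch is below; compute_checksum() and struct job are made-up illustrations, not part of this patch:

struct job { const char *buf; size_t len; unsigned long sum; };

static void *
compute_checksum(void *ptr)
{
    struct job *j = ptr;
    for (size_t i = 0; i < j->len; i++) j->sum += (unsigned char)j->buf[i];
    return NULL;                 /* long-running work that touches no Ruby objects */
}

static unsigned long
example_checksum(const char *buf, size_t len)
{
    struct job j = {buf, len, 0};
    /* RUBY_UBF_IO is remapped to ubf_select above, so the thread stays interruptible. */
    rb_nogvl(compute_checksum, &j, RUBY_UBF_IO, NULL, 0);
    return j.sum;
}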
@@ -1779,67 +1647,193 @@ rb_nogvl(void *(*func)(void *), void *data1,
*/
void *
rb_thread_call_without_gvl2(void *(*func)(void *), void *data1,
- rb_unblock_function_t *ubf, void *data2)
+ rb_unblock_function_t *ubf, void *data2)
{
return rb_nogvl(func, data1, ubf, data2, RB_NOGVL_INTR_FAIL);
}
void *
rb_thread_call_without_gvl(void *(*func)(void *data), void *data1,
- rb_unblock_function_t *ubf, void *data2)
+ rb_unblock_function_t *ubf, void *data2)
{
return rb_nogvl(func, data1, ubf, data2, 0);
}
-VALUE
-rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
+static int
+waitfd_to_waiting_flag(int wfd_event)
{
- volatile VALUE val = Qundef; /* shouldn't be used */
- rb_execution_context_t * volatile ec = GET_EC();
- volatile int saved_errno = 0;
- enum ruby_tag_type state;
- COROUTINE_STACK_LOCAL(struct waiting_fd, wfd);
+ return wfd_event << 1;
+}
+static void
+thread_io_setup_wfd(rb_thread_t *th, int fd, struct waiting_fd *wfd)
+{
wfd->fd = fd;
- wfd->th = rb_ec_thread_ptr(ec);
+ wfd->th = th;
+ wfd->busy = NULL;
RB_VM_LOCK_ENTER();
{
- list_add(&rb_ec_vm_ptr(ec)->waiting_fds, &wfd->wfd_node);
+ ccan_list_add(&th->vm->waiting_fds, &wfd->wfd_node);
}
RB_VM_LOCK_LEAVE();
+}
- EC_PUSH_TAG(ec);
- if ((state = EC_EXEC_TAG()) == TAG_NONE) {
- BLOCKING_REGION(wfd->th, {
- val = func(data1);
- saved_errno = errno;
- }, ubf_select, wfd->th, FALSE);
+static void
+thread_io_wake_pending_closer(struct waiting_fd *wfd)
+{
+ bool has_waiter = wfd->busy && RB_TEST(wfd->busy->wakeup_mutex);
+ if (has_waiter) {
+ rb_mutex_lock(wfd->busy->wakeup_mutex);
}
- EC_POP_TAG();
- /*
- * must be deleted before jump
- * this will delete either from waiting_fds or on-stack LIST_HEAD(busy)
- */
+ /* Needs to be protected with RB_VM_LOCK because we don't know if
+ wfd is on the global list of pending FD ops or if it's on a
+ struct rb_io_close_wait_list close-waiter. */
RB_VM_LOCK_ENTER();
+ ccan_list_del(&wfd->wfd_node);
+ RB_VM_LOCK_LEAVE();
+
+ if (has_waiter) {
+ rb_thread_wakeup(wfd->busy->closing_thread);
+ rb_mutex_unlock(wfd->busy->wakeup_mutex);
+ }
+}
+
+static bool
+thread_io_mn_schedulable(rb_thread_t *th, int events, const struct timeval *timeout)
+{
+#if defined(USE_MN_THREADS) && USE_MN_THREADS
+ return !th_has_dedicated_nt(th) && (events || timeout) && th->blocking;
+#else
+ return false;
+#endif
+}
+
+// returns true if the blocking call needs to be retried
+static bool
+thread_io_wait_events(rb_thread_t *th, int fd, int events, const struct timeval *timeout)
+{
+#if defined(USE_MN_THREADS) && USE_MN_THREADS
+ if (thread_io_mn_schedulable(th, events, timeout)) {
+ rb_hrtime_t rel, *prel;
+
+ if (timeout) {
+ rel = rb_timeval2hrtime(timeout);
+ prel = &rel;
+ }
+ else {
+ prel = NULL;
+ }
+
+ VM_ASSERT(prel || (events & (RB_WAITFD_IN | RB_WAITFD_OUT)));
+
+ if (thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), prel)) {
+ // timeout
+ return false;
+ }
+ else {
+ return true;
+ }
+ }
+#endif // defined(USE_MN_THREADS) && USE_MN_THREADS
+ return false;
+}
+
+// assume read/write
+static bool
+blocking_call_retryable_p(int r, int eno)
+{
+ if (r != -1) return false;
+
+ switch (eno) {
+ case EAGAIN:
+#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+ return true;
+ default:
+ return false;
+ }
+}
+
+bool
+rb_thread_mn_schedulable(VALUE thval)
+{
+ rb_thread_t *th = rb_thread_ptr(thval);
+ return th->mn_schedulable;
+}
+
+VALUE
+rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, int events)
+{
+ rb_execution_context_t * volatile ec = GET_EC();
+ rb_thread_t *th = rb_ec_thread_ptr(ec);
+
+ RUBY_DEBUG_LOG("th:%u fd:%d ev:%d", rb_th_serial(th), fd, events);
+
+ struct waiting_fd waiting_fd;
+ volatile VALUE val = Qundef; /* shouldn't be used */
+ volatile int saved_errno = 0;
+ enum ruby_tag_type state;
+ bool prev_mn_schedulable = th->mn_schedulable;
+ th->mn_schedulable = thread_io_mn_schedulable(th, events, NULL);
+
+ // `errno` is only valid when there is an actual error - but we can't
+ // extract that from the return value of `func` alone, so we clear any
+ // prior `errno` value here so that we can later check if it was set by
+ // `func` or not (as opposed to some previously set value).
+ errno = 0;
+
+ thread_io_setup_wfd(th, fd, &waiting_fd);
{
- list_del(&wfd->wfd_node);
- COROUTINE_STACK_FREE(wfd);
+ EC_PUSH_TAG(ec);
+ if ((state = EC_EXEC_TAG()) == TAG_NONE) {
+ retry:
+ BLOCKING_REGION(waiting_fd.th, {
+ val = func(data1);
+ saved_errno = errno;
+ }, ubf_select, waiting_fd.th, FALSE);
+
+ if (events &&
+ blocking_call_retryable_p((int)val, saved_errno) &&
+ thread_io_wait_events(th, fd, events, NULL)) {
+ RUBY_VM_CHECK_INTS_BLOCKING(ec);
+ goto retry;
+ }
+ }
+ EC_POP_TAG();
+
+ th->mn_schedulable = prev_mn_schedulable;
}
- RB_VM_LOCK_LEAVE();
+ /*
+ * must be deleted before jump
+ * this will delete either from waiting_fds or on-stack struct rb_io_close_wait_list
+ */
+ thread_io_wake_pending_closer(&waiting_fd);
if (state) {
- EC_JUMP_TAG(ec, state);
+ EC_JUMP_TAG(ec, state);
}
/* TODO: check func() */
RUBY_VM_CHECK_INTS_BLOCKING(ec);
+ // If the error was a timeout, we raise a specific exception for that:
+ if (saved_errno == ETIMEDOUT) {
+ rb_raise(rb_eIOTimeoutError, "Blocking operation timed out!");
+ }
+
errno = saved_errno;
return val;
}
+VALUE
+rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
+{
+ return rb_thread_io_blocking_call(func, data1, fd, 0);
+}
+
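
A sketch of how an I/O primitive might use the events-aware entry point. do_read() and its argument struct are hypothetical (and read() assumes <unistd.h>), while rb_thread_io_blocking_call() and RB_WAITFD_IN are the interfaces shown above:

struct read_arg { int fd; void *buf; size_t capa; };

static VALUE
do_read(void *ptr)
{
    struct read_arg *a = ptr;
    return (VALUE)read(a->fd, a->buf, a->capa);
}

static ssize_t
example_read(int fd, void *buf, size_t capa)
{
    struct read_arg a = {fd, buf, capa};
    /* With RB_WAITFD_IN, an M:N-schedulable thread can park on the fd and retry
     * after EAGAIN instead of pinning a native thread for the whole wait. */
    return (ssize_t)rb_thread_io_blocking_call(do_read, &a, fd, RB_WAITFD_IN);
}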
/*
* rb_thread_call_with_gvl - re-enter the Ruby world after GVL release.
*
@@ -1877,20 +1871,20 @@ rb_thread_call_with_gvl(void *(*func)(void *), void *data1)
void *r;
if (th == 0) {
- /* Error has occurred, but we can't use rb_bug()
- * because this thread is not Ruby's thread.
+ /* Error has occurred, but we can't use rb_bug()
+ * because this thread is not Ruby's thread.
* What should we do?
- */
+ */
bp();
- fprintf(stderr, "[BUG] rb_thread_call_with_gvl() is called by non-ruby thread\n");
- exit(EXIT_FAILURE);
+ fprintf(stderr, "[BUG] rb_thread_call_with_gvl() is called by non-ruby thread\n");
+ exit(EXIT_FAILURE);
}
brb = (struct rb_blocking_region_buffer *)th->blocking_region_buffer;
prev_unblock = th->unblock;
if (brb == 0) {
- rb_bug("rb_thread_call_with_gvl: called by a thread which has GVL.");
+ rb_bug("rb_thread_call_with_gvl: called by a thread which has GVL.");
}
blocking_region_end(th, brb);
@@ -1917,10 +1911,10 @@ ruby_thread_has_gvl_p(void)
rb_thread_t *th = ruby_thread_from_native();
if (th && th->blocking_region_buffer == 0) {
- return 1;
+ return 1;
}
else {
- return 0;
+ return 0;
}
}
@@ -1974,7 +1968,7 @@ static void
threadptr_check_pending_interrupt_queue(rb_thread_t *th)
{
if (!th->pending_interrupt_queue) {
- rb_raise(rb_eThreadError, "uninitialized thread");
+ rb_raise(rb_eThreadError, "uninitialized thread");
}
}
@@ -1986,6 +1980,23 @@ enum handle_interrupt_timing {
};
static enum handle_interrupt_timing
+rb_threadptr_pending_interrupt_from_symbol(rb_thread_t *th, VALUE sym)
+{
+ if (sym == sym_immediate) {
+ return INTERRUPT_IMMEDIATE;
+ }
+ else if (sym == sym_on_blocking) {
+ return INTERRUPT_ON_BLOCKING;
+ }
+ else if (sym == sym_never) {
+ return INTERRUPT_NEVER;
+ }
+ else {
+ rb_raise(rb_eThreadError, "unknown mask signature");
+ }
+}
+
+static enum handle_interrupt_timing
rb_threadptr_pending_interrupt_check_mask(rb_thread_t *th, VALUE err)
{
VALUE mask;
@@ -1995,35 +2006,34 @@ rb_threadptr_pending_interrupt_check_mask(rb_thread_t *th, VALUE err)
long i;
for (i=0; i<mask_stack_len; i++) {
- mask = mask_stack[mask_stack_len-(i+1)];
-
- for (mod = err; mod; mod = RCLASS_SUPER(mod)) {
- VALUE klass = mod;
- VALUE sym;
-
- if (BUILTIN_TYPE(mod) == T_ICLASS) {
- klass = RBASIC(mod)->klass;
- }
- else if (mod != RCLASS_ORIGIN(mod)) {
- continue;
- }
-
- if ((sym = rb_hash_aref(mask, klass)) != Qnil) {
- if (sym == sym_immediate) {
- return INTERRUPT_IMMEDIATE;
- }
- else if (sym == sym_on_blocking) {
- return INTERRUPT_ON_BLOCKING;
- }
- else if (sym == sym_never) {
- return INTERRUPT_NEVER;
- }
- else {
- rb_raise(rb_eThreadError, "unknown mask signature");
- }
- }
- }
- /* try to next mask */
+ mask = mask_stack[mask_stack_len-(i+1)];
+
+ if (SYMBOL_P(mask)) {
+ /* do not match RUBY_FATAL_THREAD_KILLED etc */
+ if (err != rb_cInteger) {
+ return rb_threadptr_pending_interrupt_from_symbol(th, mask);
+ }
+ else {
+ continue;
+ }
+ }
+
+ for (mod = err; mod; mod = RCLASS_SUPER(mod)) {
+ VALUE klass = mod;
+ VALUE sym;
+
+ if (BUILTIN_TYPE(mod) == T_ICLASS) {
+ klass = RBASIC(mod)->klass;
+ }
+ else if (mod != RCLASS_ORIGIN(mod)) {
+ continue;
+ }
+
+ if ((sym = rb_hash_aref(mask, klass)) != Qnil) {
+ return rb_threadptr_pending_interrupt_from_symbol(th, sym);
+ }
+ }
+ /* try to next mask */
}
return INTERRUPT_NONE;
}
@@ -2039,10 +2049,10 @@ rb_threadptr_pending_interrupt_include_p(rb_thread_t *th, VALUE err)
{
int i;
for (i=0; i<RARRAY_LEN(th->pending_interrupt_queue); i++) {
- VALUE e = RARRAY_AREF(th->pending_interrupt_queue, i);
- if (rb_class_inherited_p(e, err)) {
- return TRUE;
- }
+ VALUE e = RARRAY_AREF(th->pending_interrupt_queue, i);
+ if (rb_obj_is_kind_of(e, err)) {
+ return TRUE;
+ }
}
return FALSE;
}
@@ -2054,23 +2064,23 @@ rb_threadptr_pending_interrupt_deque(rb_thread_t *th, enum handle_interrupt_timi
int i;
for (i=0; i<RARRAY_LEN(th->pending_interrupt_queue); i++) {
- VALUE err = RARRAY_AREF(th->pending_interrupt_queue, i);
+ VALUE err = RARRAY_AREF(th->pending_interrupt_queue, i);
- enum handle_interrupt_timing mask_timing = rb_threadptr_pending_interrupt_check_mask(th, CLASS_OF(err));
+ enum handle_interrupt_timing mask_timing = rb_threadptr_pending_interrupt_check_mask(th, CLASS_OF(err));
- switch (mask_timing) {
- case INTERRUPT_ON_BLOCKING:
- if (timing != INTERRUPT_ON_BLOCKING) {
- break;
- }
- /* fall through */
- case INTERRUPT_NONE: /* default: IMMEDIATE */
- case INTERRUPT_IMMEDIATE:
- rb_ary_delete_at(th->pending_interrupt_queue, i);
- return err;
- case INTERRUPT_NEVER:
- break;
- }
+ switch (mask_timing) {
+ case INTERRUPT_ON_BLOCKING:
+ if (timing != INTERRUPT_ON_BLOCKING) {
+ break;
+ }
+ /* fall through */
+ case INTERRUPT_NONE: /* default: IMMEDIATE */
+ case INTERRUPT_IMMEDIATE:
+ rb_ary_delete_at(th->pending_interrupt_queue, i);
+ return err;
+ case INTERRUPT_NEVER:
+ break;
+ }
}
th->pending_interrupt_queue_checked = 1;
@@ -2078,7 +2088,7 @@ rb_threadptr_pending_interrupt_deque(rb_thread_t *th, enum handle_interrupt_timi
#else
VALUE err = rb_ary_shift(th->pending_interrupt_queue);
if (rb_threadptr_pending_interrupt_empty_p(th)) {
- th->pending_interrupt_queue_checked = 1;
+ th->pending_interrupt_queue_checked = 1;
}
return err;
#endif
@@ -2093,11 +2103,11 @@ threadptr_pending_interrupt_active_p(rb_thread_t *th)
* since last check.
*/
if (th->pending_interrupt_queue_checked) {
- return 0;
+ return 0;
}
if (rb_threadptr_pending_interrupt_empty_p(th)) {
- return 0;
+ return 0;
}
return 1;
@@ -2109,13 +2119,27 @@ handle_interrupt_arg_check_i(VALUE key, VALUE val, VALUE args)
VALUE *maskp = (VALUE *)args;
if (val != sym_immediate && val != sym_on_blocking && val != sym_never) {
- rb_raise(rb_eArgError, "unknown mask signature");
+ rb_raise(rb_eArgError, "unknown mask signature");
+ }
+
+ if (key == rb_eException && (UNDEF_P(*maskp) || NIL_P(*maskp))) {
+ *maskp = val;
+ return ST_CONTINUE;
}
- if (!*maskp) {
- *maskp = rb_ident_hash_new();
+ if (RTEST(*maskp)) {
+ if (!RB_TYPE_P(*maskp, T_HASH)) {
+ VALUE prev = *maskp;
+ *maskp = rb_ident_hash_new();
+ if (SYMBOL_P(prev)) {
+ rb_hash_aset(*maskp, rb_eException, prev);
+ }
+ }
+ rb_hash_aset(*maskp, key, val);
+ }
+ else {
+ *maskp = Qfalse;
}
- rb_hash_aset(*maskp, key, val);
return ST_CONTINUE;
}
@@ -2231,45 +2255,57 @@ handle_interrupt_arg_check_i(VALUE key, VALUE val, VALUE args)
static VALUE
rb_thread_s_handle_interrupt(VALUE self, VALUE mask_arg)
{
- VALUE mask;
+ VALUE mask = Qundef;
rb_execution_context_t * volatile ec = GET_EC();
rb_thread_t * volatile th = rb_ec_thread_ptr(ec);
volatile VALUE r = Qnil;
enum ruby_tag_type state;
if (!rb_block_given_p()) {
- rb_raise(rb_eArgError, "block is needed.");
+ rb_raise(rb_eArgError, "block is needed.");
}
- mask = 0;
mask_arg = rb_to_hash_type(mask_arg);
+
+ if (OBJ_FROZEN(mask_arg) && rb_hash_compare_by_id_p(mask_arg)) {
+ mask = Qnil;
+ }
+
rb_hash_foreach(mask_arg, handle_interrupt_arg_check_i, (VALUE)&mask);
- if (!mask) {
- return rb_yield(Qnil);
+
+ if (UNDEF_P(mask)) {
+ return rb_yield(Qnil);
+ }
+
+ if (!RTEST(mask)) {
+ mask = mask_arg;
}
- OBJ_FREEZE_RAW(mask);
+ else if (RB_TYPE_P(mask, T_HASH)) {
+ OBJ_FREEZE_RAW(mask);
+ }
+
rb_ary_push(th->pending_interrupt_mask_stack, mask);
if (!rb_threadptr_pending_interrupt_empty_p(th)) {
- th->pending_interrupt_queue_checked = 0;
- RUBY_VM_SET_INTERRUPT(th->ec);
+ th->pending_interrupt_queue_checked = 0;
+ RUBY_VM_SET_INTERRUPT(th->ec);
}
EC_PUSH_TAG(th->ec);
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
- r = rb_yield(Qnil);
+ r = rb_yield(Qnil);
}
EC_POP_TAG();
rb_ary_pop(th->pending_interrupt_mask_stack);
if (!rb_threadptr_pending_interrupt_empty_p(th)) {
- th->pending_interrupt_queue_checked = 0;
- RUBY_VM_SET_INTERRUPT(th->ec);
+ th->pending_interrupt_queue_checked = 0;
+ RUBY_VM_SET_INTERRUPT(th->ec);
}
RUBY_VM_CHECK_INTS(th->ec);
if (state) {
- EC_JUMP_TAG(th->ec, state);
+ EC_JUMP_TAG(th->ec, state);
}
return r;
@@ -2291,25 +2327,20 @@ rb_thread_pending_interrupt_p(int argc, VALUE *argv, VALUE target_thread)
rb_thread_t *target_th = rb_thread_ptr(target_thread);
if (!target_th->pending_interrupt_queue) {
- return Qfalse;
+ return Qfalse;
}
if (rb_threadptr_pending_interrupt_empty_p(target_th)) {
- return Qfalse;
+ return Qfalse;
}
if (rb_check_arity(argc, 0, 1)) {
VALUE err = argv[0];
if (!rb_obj_is_kind_of(err, rb_cModule)) {
rb_raise(rb_eTypeError, "class or module required for rescue clause");
}
- if (rb_threadptr_pending_interrupt_include_p(target_th, err)) {
- return Qtrue;
- }
- else {
- return Qfalse;
- }
+ return RBOOL(rb_threadptr_pending_interrupt_include_p(target_th, err));
}
else {
- return Qtrue;
+ return Qtrue;
}
}
@@ -2396,13 +2427,13 @@ threadptr_get_interrupts(rb_thread_t *th)
rb_atomic_t old;
do {
- interrupt = ec->interrupt_flag;
- old = ATOMIC_CAS(ec->interrupt_flag, interrupt, interrupt & ec->interrupt_mask);
+ interrupt = ec->interrupt_flag;
+ old = ATOMIC_CAS(ec->interrupt_flag, interrupt, interrupt & ec->interrupt_mask);
} while (old != interrupt);
return interrupt & (rb_atomic_t)~ec->interrupt_mask;
}
-MJIT_FUNC_EXPORTED int
+int
rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
{
rb_atomic_t interrupt;
@@ -2412,16 +2443,16 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
if (th->ec->raised_flag) return ret;
while ((interrupt = threadptr_get_interrupts(th)) != 0) {
- int sig;
- int timer_interrupt;
- int pending_interrupt;
- int trap_interrupt;
+ int sig;
+ int timer_interrupt;
+ int pending_interrupt;
+ int trap_interrupt;
int terminate_interrupt;
- timer_interrupt = interrupt & TIMER_INTERRUPT_MASK;
- pending_interrupt = interrupt & PENDING_INTERRUPT_MASK;
- postponed_job_interrupt = interrupt & POSTPONED_JOB_INTERRUPT_MASK;
- trap_interrupt = interrupt & TRAP_INTERRUPT_MASK;
+ timer_interrupt = interrupt & TIMER_INTERRUPT_MASK;
+ pending_interrupt = interrupt & PENDING_INTERRUPT_MASK;
+ postponed_job_interrupt = interrupt & POSTPONED_JOB_INTERRUPT_MASK;
+ trap_interrupt = interrupt & TRAP_INTERRUPT_MASK;
terminate_interrupt = interrupt & TERMINATE_INTERRUPT_MASK; // request from other ractors
if (interrupt & VM_BARRIER_INTERRUPT_MASK) {
@@ -2429,76 +2460,71 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
RB_VM_LOCK_LEAVE();
}
- if (postponed_job_interrupt) {
- rb_postponed_job_flush(th->vm);
- }
-
- /* signal handling */
- if (trap_interrupt && (th == th->vm->ractor.main_thread)) {
- enum rb_thread_status prev_status = th->status;
- int sigwait_fd = rb_sigwait_fd_get(th);
-
- if (sigwait_fd >= 0) {
- (void)consume_communication_pipe(sigwait_fd);
- ruby_sigchld_handler(th->vm);
- rb_sigwait_fd_put(th, sigwait_fd);
- rb_sigwait_fd_migrate(th->vm);
- }
- th->status = THREAD_RUNNABLE;
- while ((sig = rb_get_next_signal()) != 0) {
- ret |= rb_signal_exec(th, sig);
- }
- th->status = prev_status;
- }
-
- /* exception from another thread */
- if (pending_interrupt && threadptr_pending_interrupt_active_p(th)) {
- VALUE err = rb_threadptr_pending_interrupt_deque(th, blocking_timing ? INTERRUPT_ON_BLOCKING : INTERRUPT_NONE);
- thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err);
+ if (postponed_job_interrupt) {
+ rb_postponed_job_flush(th->vm);
+ }
+
+ /* signal handling */
+ if (trap_interrupt && (th == th->vm->ractor.main_thread)) {
+ enum rb_thread_status prev_status = th->status;
+
+ th->status = THREAD_RUNNABLE;
+ {
+ while ((sig = rb_get_next_signal()) != 0) {
+ ret |= rb_signal_exec(th, sig);
+ }
+ }
+ th->status = prev_status;
+ }
+
+ /* exception from another thread */
+ if (pending_interrupt && threadptr_pending_interrupt_active_p(th)) {
+ VALUE err = rb_threadptr_pending_interrupt_deque(th, blocking_timing ? INTERRUPT_ON_BLOCKING : INTERRUPT_NONE);
+ RUBY_DEBUG_LOG("err:%"PRIdVALUE, err);
ret = TRUE;
- if (err == Qundef) {
- /* no error */
- }
- else if (err == eKillSignal /* Thread#kill received */ ||
- err == eTerminateSignal /* Terminate thread */ ||
- err == INT2FIX(TAG_FATAL) /* Thread.exit etc. */ ) {
+ if (UNDEF_P(err)) {
+ /* no error */
+ }
+ else if (err == RUBY_FATAL_THREAD_KILLED /* Thread#kill received */ ||
+ err == RUBY_FATAL_THREAD_TERMINATED /* Terminate thread */ ||
+ err == INT2FIX(TAG_FATAL) /* Thread.exit etc. */ ) {
terminate_interrupt = 1;
- }
- else {
- if (err == th->vm->special_exceptions[ruby_error_stream_closed]) {
- /* the only special exception to be queued across thread */
- err = ruby_vm_special_exception_copy(err);
- }
- /* set runnable if th was slept. */
- if (th->status == THREAD_STOPPED ||
- th->status == THREAD_STOPPED_FOREVER)
- th->status = THREAD_RUNNABLE;
- rb_exc_raise(err);
- }
- }
+ }
+ else {
+ if (err == th->vm->special_exceptions[ruby_error_stream_closed]) {
+ /* the only special exception to be queued across thread */
+ err = ruby_vm_special_exception_copy(err);
+ }
+ /* set runnable if th was slept. */
+ if (th->status == THREAD_STOPPED ||
+ th->status == THREAD_STOPPED_FOREVER)
+ th->status = THREAD_RUNNABLE;
+ rb_exc_raise(err);
+ }
+ }
if (terminate_interrupt) {
rb_threadptr_to_kill(th);
}
if (timer_interrupt) {
- uint32_t limits_us = TIME_QUANTUM_USEC;
+ uint32_t limits_us = TIME_QUANTUM_USEC;
- if (th->priority > 0)
- limits_us <<= th->priority;
- else
- limits_us >>= -th->priority;
+ if (th->priority > 0)
+ limits_us <<= th->priority;
+ else
+ limits_us >>= -th->priority;
- if (th->status == THREAD_RUNNABLE)
- th->running_time_us += TIME_QUANTUM_USEC;
+ if (th->status == THREAD_RUNNABLE)
+ th->running_time_us += 10 * 1000; // 10ms = 10_000us // TODO: use macro
VM_ASSERT(th->ec->cfp);
- EXEC_EVENT_HOOK(th->ec, RUBY_INTERNAL_EVENT_SWITCH, th->ec->cfp->self,
- 0, 0, 0, Qundef);
+ EXEC_EVENT_HOOK(th->ec, RUBY_INTERNAL_EVENT_SWITCH, th->ec->cfp->self,
+ 0, 0, 0, Qundef);
- rb_thread_schedule_limits(limits_us);
- }
+ rb_thread_schedule_limits(limits_us);
+ }
}
return ret;
}
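/* A minimal sketch of the quantum scaling applied above, assuming a base
 * quantum of 100000us purely for illustration (the real base is
 * TIME_QUANTUM_USEC) and <stdint.h> for uint32_t. */
static uint32_t
example_quantum_for(int priority)
{
    uint32_t limits_us = 100000;       /* assumed base quantum (100ms) */
    if (priority > 0)
        limits_us <<= priority;        /* priority  3 ->  800000us */
    else
        limits_us >>= -priority;       /* priority -3 ->   12500us */
    return limits_us;
}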
@@ -2521,20 +2547,20 @@ rb_threadptr_raise(rb_thread_t *target_th, int argc, VALUE *argv)
VALUE exc;
if (rb_threadptr_dead(target_th)) {
- return Qnil;
+ return Qnil;
}
if (argc == 0) {
- exc = rb_exc_new(rb_eRuntimeError, 0, 0);
+ exc = rb_exc_new(rb_eRuntimeError, 0, 0);
}
else {
- exc = rb_make_exception(argc, argv);
+ exc = rb_make_exception(argc, argv);
}
/* making an exception object can switch thread,
so we need to check thread deadness again */
if (rb_threadptr_dead(target_th)) {
- return Qnil;
+ return Qnil;
}
rb_ec_setup_exception(GET_EC(), exc, Qundef);
@@ -2569,7 +2595,7 @@ int
rb_ec_set_raised(rb_execution_context_t *ec)
{
if (ec->raised_flag & RAISED_EXCEPTION) {
- return 1;
+ return 1;
}
ec->raised_flag |= RAISED_EXCEPTION;
return 0;
@@ -2579,47 +2605,80 @@ int
rb_ec_reset_raised(rb_execution_context_t *ec)
{
if (!(ec->raised_flag & RAISED_EXCEPTION)) {
- return 0;
+ return 0;
}
ec->raised_flag &= ~RAISED_EXCEPTION;
return 1;
}
int
-rb_notify_fd_close(int fd, struct list_head *busy)
+rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
{
rb_vm_t *vm = GET_THREAD()->vm;
struct waiting_fd *wfd = 0, *next;
+ ccan_list_head_init(&busy->pending_fd_users);
+ int has_any;
+ VALUE wakeup_mutex;
RB_VM_LOCK_ENTER();
{
- list_for_each_safe(&vm->waiting_fds, wfd, next, wfd_node) {
+ ccan_list_for_each_safe(&vm->waiting_fds, wfd, next, wfd_node) {
if (wfd->fd == fd) {
rb_thread_t *th = wfd->th;
VALUE err;
- list_del(&wfd->wfd_node);
- list_add(busy, &wfd->wfd_node);
+ ccan_list_del(&wfd->wfd_node);
+ ccan_list_add(&busy->pending_fd_users, &wfd->wfd_node);
+ wfd->busy = busy;
err = th->vm->special_exceptions[ruby_error_stream_closed];
rb_threadptr_pending_interrupt_enque(th, err);
rb_threadptr_interrupt(th);
}
}
}
+
+ has_any = !ccan_list_empty(&busy->pending_fd_users);
+ busy->closing_thread = rb_thread_current();
+ wakeup_mutex = Qnil;
+ if (has_any) {
+ wakeup_mutex = rb_mutex_new();
+ RBASIC_CLEAR_CLASS(wakeup_mutex); /* hide from ObjectSpace */
+ }
+ busy->wakeup_mutex = wakeup_mutex;
+
RB_VM_LOCK_LEAVE();
- return !list_empty(busy);
+ /* If the caller didn't pass *busy as a pointer to something on the stack,
+ we need to guard this mutex object on _our_ C stack for the duration
+ of this function. */
+ RB_GC_GUARD(wakeup_mutex);
+ return has_any;
+}
+
+void
+rb_notify_fd_close_wait(struct rb_io_close_wait_list *busy)
+{
+ if (!RB_TEST(busy->wakeup_mutex)) {
+ /* There was nobody else using this file when we closed it, so we
+ never bothered to allocate a mutex*/
+ return;
+ }
+
+ rb_mutex_lock(busy->wakeup_mutex);
+ while (!ccan_list_empty(&busy->pending_fd_users)) {
+ rb_mutex_sleep(busy->wakeup_mutex, Qnil);
+ }
+ rb_mutex_unlock(busy->wakeup_mutex);
}
void
rb_thread_fd_close(int fd)
{
- struct list_head busy;
+ struct rb_io_close_wait_list busy;
- list_head_init(&busy);
if (rb_notify_fd_close(fd, &busy)) {
- do rb_thread_schedule(); while (!list_empty(&busy));
+ rb_notify_fd_close_wait(&busy);
}
}
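/* Usage sketch for the two-phase close notification above, assuming it is
 * called with the GVL held by code that owns `fd`. rb_thread_fd_close()
 * raises IOError in every thread blocked on the descriptor and waits (via
 * the hidden wakeup mutex) until they have all left before returning. */
static void
example_close_owned_fd(int fd)
{
    rb_thread_fd_close(fd);  /* interrupt and wait for pending users of fd */
    close(fd);               /* assumes <unistd.h>; now safe to close */
}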
@@ -2655,7 +2714,7 @@ thread_raise_m(int argc, VALUE *argv, VALUE self)
/* To perform Thread.current.raise as Kernel.raise */
if (current_th == target_th) {
- RUBY_VM_CHECK_INTS(target_th->ec);
+ RUBY_VM_CHECK_INTS(target_th->ec);
}
return Qnil;
}
@@ -2675,36 +2734,37 @@ thread_raise_m(int argc, VALUE *argv, VALUE self)
VALUE
rb_thread_kill(VALUE thread)
{
- rb_thread_t *th = rb_thread_ptr(thread);
+ rb_thread_t *target_th = rb_thread_ptr(thread);
- if (th->to_kill || th->status == THREAD_KILLED) {
- return thread;
+ if (target_th->to_kill || target_th->status == THREAD_KILLED) {
+ return thread;
}
- if (th == th->vm->ractor.main_thread) {
- rb_exit(EXIT_SUCCESS);
+ if (target_th == target_th->vm->ractor.main_thread) {
+ rb_exit(EXIT_SUCCESS);
}
- thread_debug("rb_thread_kill: %p (%"PRI_THREAD_ID")\n", (void *)th, thread_id_str(th));
+ RUBY_DEBUG_LOG("target_th:%u", rb_th_serial(target_th));
- if (th == GET_THREAD()) {
- /* kill myself immediately */
- rb_threadptr_to_kill(th);
+ if (target_th == GET_THREAD()) {
+ /* kill myself immediately */
+ rb_threadptr_to_kill(target_th);
}
else {
- threadptr_check_pending_interrupt_queue(th);
- rb_threadptr_pending_interrupt_enque(th, eKillSignal);
- rb_threadptr_interrupt(th);
+ threadptr_check_pending_interrupt_queue(target_th);
+ rb_threadptr_pending_interrupt_enque(target_th, RUBY_FATAL_THREAD_KILLED);
+ rb_threadptr_interrupt(target_th);
}
+
return thread;
}
int
rb_thread_to_be_killed(VALUE thread)
{
- rb_thread_t *th = rb_thread_ptr(thread);
+ rb_thread_t *target_th = rb_thread_ptr(thread);
- if (th->to_kill || th->status == THREAD_KILLED) {
- return TRUE;
+ if (target_th->to_kill || target_th->status == THREAD_KILLED) {
+ return TRUE;
}
return FALSE;
}
@@ -2770,7 +2830,7 @@ VALUE
rb_thread_wakeup(VALUE thread)
{
if (!RTEST(rb_thread_wakeup_alive(thread))) {
- rb_raise(rb_eThreadError, "killed thread");
+ rb_raise(rb_eThreadError, "killed thread");
}
return thread;
}
@@ -2784,8 +2844,8 @@ rb_thread_wakeup_alive(VALUE thread)
rb_threadptr_ready(target_th);
if (target_th->status == THREAD_STOPPED ||
- target_th->status == THREAD_STOPPED_FOREVER) {
- target_th->status = THREAD_RUNNABLE;
+ target_th->status == THREAD_STOPPED_FOREVER) {
+ target_th->status = THREAD_RUNNABLE;
}
return thread;
@@ -2860,7 +2920,7 @@ VALUE
rb_thread_list(void)
{
// TODO
- return rb_ractor_thread_list(GET_RACTOR());
+ return rb_ractor_thread_list();
}
/*
@@ -2953,7 +3013,7 @@ rb_thread_s_main(VALUE klass)
static VALUE
rb_thread_s_abort_exc(VALUE _)
{
- return GET_THREAD()->vm->thread_abort_on_exception ? Qtrue : Qfalse;
+ return RBOOL(GET_THREAD()->vm->thread_abort_on_exception);
}
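/* RBOOL(expr), used throughout these conversions, is shorthand for the
 * ternary it replaces; a tiny sketch of the equivalence: */
static VALUE
example_rbool(int cond)
{
    VALUE old_style = cond ? Qtrue : Qfalse;
    VALUE new_style = RBOOL(cond);
    return RBOOL(old_style == new_style);  /* always Qtrue */
}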
@@ -3013,7 +3073,7 @@ rb_thread_s_abort_exc_set(VALUE self, VALUE val)
static VALUE
rb_thread_abort_exc(VALUE thread)
{
- return rb_thread_ptr(thread)->abort_on_exception ? Qtrue : Qfalse;
+ return RBOOL(rb_thread_ptr(thread)->abort_on_exception);
}
@@ -3083,7 +3143,7 @@ rb_thread_abort_exc_set(VALUE thread, VALUE val)
static VALUE
rb_thread_s_report_exc(VALUE _)
{
- return GET_THREAD()->vm->thread_report_on_exception ? Qtrue : Qfalse;
+ return RBOOL(GET_THREAD()->vm->thread_report_on_exception);
}
@@ -3139,7 +3199,7 @@ rb_thread_s_report_exc_set(VALUE self, VALUE val)
static VALUE
rb_thread_s_ignore_deadlock(VALUE _)
{
- return GET_THREAD()->vm->thread_ignore_deadlock ? Qtrue : Qfalse;
+ return RBOOL(GET_THREAD()->vm->thread_ignore_deadlock);
}
@@ -3153,7 +3213,7 @@ rb_thread_s_ignore_deadlock(VALUE _)
* deadlock condition via some other means, such as a signal.
*
* Thread.ignore_deadlock = true
- * queue = Queue.new
+ * queue = Thread::Queue.new
*
* trap(:SIGUSR1){queue.push "Received signal"}
*
@@ -3190,7 +3250,7 @@ rb_thread_s_ignore_deadlock_set(VALUE self, VALUE val)
static VALUE
rb_thread_report_exc(VALUE thread)
{
- return rb_thread_ptr(thread)->report_on_exception ? Qtrue : Qfalse;
+ return RBOOL(rb_thread_ptr(thread)->report_on_exception);
}
@@ -3235,15 +3295,15 @@ thread_status_name(rb_thread_t *th, int detail)
{
switch (th->status) {
case THREAD_RUNNABLE:
- return th->to_kill ? "aborting" : "run";
+ return th->to_kill ? "aborting" : "run";
case THREAD_STOPPED_FOREVER:
- if (detail) return "sleep_forever";
+ if (detail) return "sleep_forever";
case THREAD_STOPPED:
- return "sleep";
+ return "sleep";
case THREAD_KILLED:
- return "dead";
+ return "dead";
default:
- return "unknown";
+ return "unknown";
}
}
@@ -3291,16 +3351,16 @@ rb_thread_status(VALUE thread)
rb_thread_t *target_th = rb_thread_ptr(thread);
if (rb_threadptr_dead(target_th)) {
- if (!NIL_P(target_th->ec->errinfo) &&
- !FIXNUM_P(target_th->ec->errinfo)) {
- return Qnil;
- }
- else {
- return Qfalse;
- }
+ if (!NIL_P(target_th->ec->errinfo) &&
+ !FIXNUM_P(target_th->ec->errinfo)) {
+ return Qnil;
+ }
+ else {
+ return Qfalse;
+ }
}
else {
- return rb_str_new2(thread_status_name(target_th, FALSE));
+ return rb_str_new2(thread_status_name(target_th, FALSE));
}
}
@@ -3322,12 +3382,7 @@ rb_thread_status(VALUE thread)
static VALUE
rb_thread_alive_p(VALUE thread)
{
- if (rb_threadptr_dead(rb_thread_ptr(thread))) {
- return Qfalse;
- }
- else {
- return Qtrue;
- }
+ return RBOOL(!thread_finished(rb_thread_ptr(thread)));
}
/*
@@ -3350,15 +3405,9 @@ rb_thread_stop_p(VALUE thread)
rb_thread_t *th = rb_thread_ptr(thread);
if (rb_threadptr_dead(th)) {
- return Qtrue;
- }
- else if (th->status == THREAD_STOPPED ||
- th->status == THREAD_STOPPED_FOREVER) {
- return Qtrue;
- }
- else {
- return Qfalse;
+ return Qtrue;
}
+ return RBOOL(th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER);
}
/*
@@ -3388,18 +3437,18 @@ rb_thread_setname(VALUE thread, VALUE name)
rb_thread_t *target_th = rb_thread_ptr(thread);
if (!NIL_P(name)) {
- rb_encoding *enc;
- StringValueCStr(name);
- enc = rb_enc_get(name);
- if (!rb_enc_asciicompat(enc)) {
- rb_raise(rb_eArgError, "ASCII incompatible encoding (%s)",
- rb_enc_name(enc));
- }
- name = rb_str_new_frozen(name);
+ rb_encoding *enc;
+ StringValueCStr(name);
+ enc = rb_enc_get(name);
+ if (!rb_enc_asciicompat(enc)) {
+ rb_raise(rb_eArgError, "ASCII incompatible encoding (%s)",
+ rb_enc_name(enc));
+ }
+ name = rb_str_new_frozen(name);
}
target_th->name = name;
- if (threadptr_initialized(target_th)) {
- native_set_another_thread_name(target_th->thread_id, name);
+ if (threadptr_initialized(target_th) && target_th->has_dedicated_nt) {
+ native_set_another_thread_name(target_th->nt->thread_id, name);
}
return name;
}
@@ -3461,7 +3510,6 @@ rb_thread_to_s(VALUE thread)
if ((loc = threadptr_invoke_proc_location(target_th)) != Qnil) {
rb_str_catf(str, " %"PRIsVALUE":%"PRIsVALUE,
RARRAY_AREF(loc, 0), RARRAY_AREF(loc, 1));
- rb_gc_force_recycle(loc);
}
rb_str_catf(str, " %s>", status);
@@ -3469,24 +3517,24 @@ rb_thread_to_s(VALUE thread)
}
/* variables for recursive traversals */
-static ID recursive_key;
+#define recursive_key id__recursive_key__
static VALUE
threadptr_local_aref(rb_thread_t *th, ID id)
{
if (id == recursive_key) {
- return th->ec->local_storage_recursive_hash;
+ return th->ec->local_storage_recursive_hash;
}
else {
- VALUE val;
- struct rb_id_table *local_storage = th->ec->local_storage;
+ VALUE val;
+ struct rb_id_table *local_storage = th->ec->local_storage;
- if (local_storage != NULL && rb_id_table_lookup(local_storage, id, &val)) {
- return val;
- }
- else {
- return Qnil;
- }
+ if (local_storage != NULL && rb_id_table_lookup(local_storage, id, &val)) {
+ return val;
+ }
+ else {
+ return Qnil;
+ }
}
}
@@ -3590,26 +3638,26 @@ rb_thread_fetch(int argc, VALUE *argv, VALUE self)
block_given = rb_block_given_p();
if (block_given && argc == 2) {
- rb_warn("block supersedes default value argument");
+ rb_warn("block supersedes default value argument");
}
id = rb_check_id(&key);
if (id == recursive_key) {
- return target_th->ec->local_storage_recursive_hash;
+ return target_th->ec->local_storage_recursive_hash;
}
else if (id && target_th->ec->local_storage &&
- rb_id_table_lookup(target_th->ec->local_storage, id, &val)) {
- return val;
+ rb_id_table_lookup(target_th->ec->local_storage, id, &val)) {
+ return val;
}
else if (block_given) {
- return rb_yield(key);
+ return rb_yield(key);
}
else if (argc == 1) {
- rb_key_err_raise(rb_sprintf("key not found: %+"PRIsVALUE, key), self, key);
+ rb_key_err_raise(rb_sprintf("key not found: %+"PRIsVALUE, key), self, key);
}
else {
- return argv[1];
+ return argv[1];
}
}
@@ -3617,24 +3665,24 @@ static VALUE
threadptr_local_aset(rb_thread_t *th, ID id, VALUE val)
{
if (id == recursive_key) {
- th->ec->local_storage_recursive_hash = val;
- return val;
+ th->ec->local_storage_recursive_hash = val;
+ return val;
}
else {
- struct rb_id_table *local_storage = th->ec->local_storage;
+ struct rb_id_table *local_storage = th->ec->local_storage;
- if (NIL_P(val)) {
- if (!local_storage) return Qnil;
- rb_id_table_delete(local_storage, id);
- return Qnil;
- }
- else {
- if (local_storage == NULL) {
- th->ec->local_storage = local_storage = rb_id_table_create(0);
- }
- rb_id_table_insert(local_storage, id, val);
- return val;
- }
+ if (NIL_P(val)) {
+ if (!local_storage) return Qnil;
+ rb_id_table_delete(local_storage, id);
+ return Qnil;
+ }
+ else {
+ if (local_storage == NULL) {
+ th->ec->local_storage = local_storage = rb_id_table_create(0);
+ }
+ rb_id_table_insert(local_storage, id, val);
+ return val;
+ }
}
}
@@ -3750,14 +3798,9 @@ rb_thread_key_p(VALUE self, VALUE key)
struct rb_id_table *local_storage = rb_thread_ptr(self)->ec->local_storage;
if (!id || local_storage == NULL) {
- return Qfalse;
- }
- else if (rb_id_table_lookup(local_storage, id, &val)) {
- return Qtrue;
- }
- else {
- return Qfalse;
+ return Qfalse;
}
+ return RBOOL(rb_id_table_lookup(local_storage, id, &val));
}
static enum rb_id_table_iterator_result
@@ -3795,7 +3838,7 @@ rb_thread_keys(VALUE self)
VALUE ary = rb_ary_new();
if (local_storage) {
- rb_id_table_foreach(local_storage, thread_keys_i, (void *)ary);
+ rb_id_table_foreach(local_storage, thread_keys_i, (void *)ary);
}
return ary;
}
@@ -3866,14 +3909,7 @@ rb_thread_variable_p(VALUE thread, VALUE key)
}
locals = rb_thread_local_storage(thread);
- if (rb_hash_lookup(locals, rb_to_symbol(key)) != Qnil) {
- return Qtrue;
- }
- else {
- return Qfalse;
- }
-
- return Qfalse;
+ return RBOOL(rb_hash_lookup(locals, rb_to_symbol(key)) != Qnil);
}
/*
@@ -3936,10 +3972,10 @@ rb_thread_priority_set(VALUE thread, VALUE prio)
#else
priority = NUM2INT(prio);
if (priority > RUBY_THREAD_PRIORITY_MAX) {
- priority = RUBY_THREAD_PRIORITY_MAX;
+ priority = RUBY_THREAD_PRIORITY_MAX;
}
else if (priority < RUBY_THREAD_PRIORITY_MIN) {
- priority = RUBY_THREAD_PRIORITY_MIN;
+ priority = RUBY_THREAD_PRIORITY_MIN;
}
target_th->priority = (int8_t)priority;
#endif
@@ -3962,13 +3998,11 @@ rb_thread_priority_set(VALUE thread, VALUE prio)
* - OpenBSD 2.0 (src/sys/kern/sys_generic.c:1.4)
* select(2) documents how to allocate fd_set dynamically.
* http://www.openbsd.org/cgi-bin/man.cgi?query=select&manpath=OpenBSD+4.4
- * - HP-UX documents how to allocate fd_set dynamically.
- * http://docs.hp.com/en/B2355-60105/select.2.html
* - Solaris 8 has select_large_fdset
* - Mac OS X 10.7 (Lion)
* select(2) returns EINVAL if nfds is greater than FD_SET_SIZE and
* _DARWIN_UNLIMITED_SELECT (or _DARWIN_C_SOURCE) isn't defined.
- * http://developer.apple.com/library/mac/#releasenotes/Darwin/SymbolVariantsRelNotes/_index.html
+ * https://developer.apple.com/library/archive/releasenotes/Darwin/SymbolVariantsRelNotes/index.html
*
* When fd_set is not big enough to hold big file descriptors,
* it should be allocated dynamically.
@@ -3995,7 +4029,7 @@ rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src)
size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask);
if (size < sizeof(fd_set))
- size = sizeof(fd_set);
+ size = sizeof(fd_set);
dst->maxfd = src->maxfd;
dst->fdset = xmalloc(size);
memcpy(dst->fdset, src->fdset, size);
@@ -4004,7 +4038,7 @@ rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src)
void
rb_fd_term(rb_fdset_t *fds)
{
- if (fds->fdset) xfree(fds->fdset);
+ xfree(fds->fdset);
fds->maxfd = 0;
fds->fdset = 0;
}
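/* Lifecycle sketch for the dynamically sized rb_fdset_t handled above,
 * assuming `fd` is a valid descriptor. */
static void
example_fdset_roundtrip(int fd)
{
    rb_fdset_t set;
    rb_fd_init(&set);        /* start empty */
    rb_fd_set(fd, &set);     /* grows the bitmap on demand (rb_fd_resize) */
    if (rb_fd_isset(fd, &set)) {
        /* ... pass &set to rb_thread_fd_select() ... */
    }
    rb_fd_term(&set);        /* frees the heap-allocated fd_set */
}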
@@ -4013,7 +4047,7 @@ void
rb_fd_zero(rb_fdset_t *fds)
{
if (fds->fdset)
- MEMZERO(fds->fdset, fd_mask, howmany(fds->maxfd, NFDBITS));
+ MEMZERO(fds->fdset, fd_mask, howmany(fds->maxfd, NFDBITS));
}
static void
@@ -4026,8 +4060,8 @@ rb_fd_resize(int n, rb_fdset_t *fds)
if (o < sizeof(fd_set)) o = sizeof(fd_set);
if (m > o) {
- fds->fdset = xrealloc(fds->fdset, m);
- memset((char *)fds->fdset + o, 0, m - o);
+ fds->fdset = xrealloc(fds->fdset, m);
+ memset((char *)fds->fdset + o, 0, m - o);
}
if (n >= fds->maxfd) fds->maxfd = n + 1;
}
@@ -4070,7 +4104,7 @@ rb_fd_dup(rb_fdset_t *dst, const rb_fdset_t *src)
size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask);
if (size < sizeof(fd_set))
- size = sizeof(fd_set);
+ size = sizeof(fd_set);
dst->maxfd = src->maxfd;
dst->fdset = xrealloc(dst->fdset, size);
memcpy(dst->fdset, src->fdset, size);
@@ -4144,7 +4178,7 @@ rb_fd_set(int fd, rb_fdset_t *set)
}
}
if (set->fdset->fd_count >= (unsigned)set->capa) {
- set->capa = (set->fdset->fd_count / FD_SETSIZE + 1) * FD_SETSIZE;
+ set->capa = (set->fdset->fd_count / FD_SETSIZE + 1) * FD_SETSIZE;
set->fdset =
rb_xrealloc_mul_add(
set->fdset, set->capa, sizeof(SOCKET), sizeof(unsigned int));
@@ -4188,7 +4222,7 @@ wait_retryable(int *result, int errnum, rb_hrtime_t *rel, rb_hrtime_t end)
return FALSE;
}
else if (*result == 0) {
- /* check for spurious wakeup */
+ /* check for spurious wakeup */
if (rel) {
return !hrtime_update_expire(rel, end);
}
@@ -4199,7 +4233,6 @@ wait_retryable(int *result, int errnum, rb_hrtime_t *rel, rb_hrtime_t end)
struct select_set {
int max;
- int sigwait_fd;
rb_thread_t *th;
rb_fdset_t *rset;
rb_fdset_t *wset;
@@ -4215,11 +4248,6 @@ select_set_free(VALUE p)
{
struct select_set *set = (struct select_set *)p;
- if (set->sigwait_fd >= 0) {
- rb_sigwait_fd_put(set->th, set->sigwait_fd);
- rb_sigwait_fd_migrate(set->th->vm);
- }
-
rb_fd_term(&set->orig_rset);
rb_fd_term(&set->orig_wset);
rb_fd_term(&set->orig_eset);
@@ -4227,21 +4255,6 @@ select_set_free(VALUE p)
return Qfalse;
}
-static const rb_hrtime_t *
-sigwait_timeout(rb_thread_t *th, int sigwait_fd, const rb_hrtime_t *orig,
- int *drained_p)
-{
- static const rb_hrtime_t quantum = TIME_QUANTUM_USEC * 1000;
-
- if (sigwait_fd >= 0 && (!ubf_threads_empty() || BUSY_WAIT_SIGNALS)) {
- *drained_p = check_signals_nogvl(th, sigwait_fd);
- if (!orig || *orig > quantum)
- return &quantum;
- }
-
- return orig;
-}
-
static VALUE
do_select(VALUE p)
{
@@ -4260,90 +4273,32 @@ do_select(VALUE p)
TRUE)
do {
- int drained;
- lerrno = 0;
+ lerrno = 0;
- BLOCKING_REGION(set->th, {
- const rb_hrtime_t *sto;
+ BLOCKING_REGION(set->th, {
struct timeval tv;
- sto = sigwait_timeout(set->th, set->sigwait_fd, to, &drained);
if (!RUBY_VM_INTERRUPTED(set->th->ec)) {
- result = native_fd_select(set->max, set->rset, set->wset,
- set->eset,
- rb_hrtime2timeval(&tv, sto), set->th);
+ result = native_fd_select(set->max,
+ set->rset, set->wset, set->eset,
+ rb_hrtime2timeval(&tv, to), set->th);
if (result < 0) lerrno = errno;
}
- }, set->sigwait_fd >= 0 ? ubf_sigwait : ubf_select, set->th, TRUE);
-
- if (set->sigwait_fd >= 0) {
- if (result > 0 && rb_fd_isset(set->sigwait_fd, set->rset)) {
- result--;
- (void)check_signals_nogvl(set->th, set->sigwait_fd);
- }
- else {
- (void)check_signals_nogvl(set->th, -1);
- }
- }
+ }, ubf_select, set->th, TRUE);
RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); /* may raise */
} while (wait_retryable(&result, lerrno, to, end) && do_select_update());
if (result < 0) {
- errno = lerrno;
+ errno = lerrno;
}
return (VALUE)result;
}
-static void
-rb_thread_wait_fd_rw(int fd, int read)
-{
- int result = 0;
- int events = read ? RB_WAITFD_IN : RB_WAITFD_OUT;
-
- thread_debug("rb_thread_wait_fd_rw(%d, %s)\n", fd, read ? "read" : "write");
-
- if (fd < 0) {
- rb_raise(rb_eIOError, "closed stream");
- }
-
- result = rb_wait_for_single_fd(fd, events, NULL);
- if (result < 0) {
- rb_sys_fail(0);
- }
-
- thread_debug("rb_thread_wait_fd_rw(%d, %s): done\n", fd, read ? "read" : "write");
-}
-
-void
-rb_thread_wait_fd(int fd)
-{
- rb_thread_wait_fd_rw(fd, 1);
-}
-
-int
-rb_thread_fd_writable(int fd)
-{
- rb_thread_wait_fd_rw(fd, 0);
- return TRUE;
-}
-
-static rb_fdset_t *
-init_set_fd(int fd, rb_fdset_t *fds)
-{
- if (fd < 0) {
- return 0;
- }
- rb_fd_init(fds);
- rb_fd_set(fd, fds);
-
- return fds;
-}
-
int
rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except,
- struct timeval *timeout)
+ struct timeval *timeout)
{
struct select_set set;
@@ -4364,16 +4319,6 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
return 0;
}
- set.sigwait_fd = rb_sigwait_fd_get(set.th);
- if (set.sigwait_fd >= 0) {
- if (set.rset)
- rb_fd_set(set.sigwait_fd, set.rset);
- else
- set.rset = init_set_fd(set.sigwait_fd, &set.orig_rset);
- if (set.sigwait_fd >= set.max) {
- set.max = set.sigwait_fd + 1;
- }
- }
#define fd_init_copy(f) do { \
if (set.f) { \
rb_fd_resize(set.max - 1, set.f); \
@@ -4410,91 +4355,67 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
int
rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
{
- struct pollfd fds[2];
+ struct pollfd fds[1] = {{
+ .fd = fd,
+ .events = (short)events,
+ .revents = 0,
+ }};
int result = 0;
- int drained;
nfds_t nfds;
- rb_unblock_function_t *ubf;
struct waiting_fd wfd;
- int state;
+ enum ruby_tag_type state;
volatile int lerrno;
- wfd.th = GET_THREAD();
- wfd.fd = fd;
+ rb_execution_context_t *ec = GET_EC();
+ rb_thread_t *th = rb_ec_thread_ptr(ec);
- RB_VM_LOCK_ENTER();
- {
- list_add(&wfd.th->vm->waiting_fds, &wfd.wfd_node);
- }
- RB_VM_LOCK_LEAVE();
+ thread_io_setup_wfd(th, fd, &wfd);
- EC_PUSH_TAG(wfd.th->ec);
- if ((state = EC_EXEC_TAG()) == TAG_NONE) {
- rb_hrtime_t *to, rel, end = 0;
- RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
- timeout_prepare(&to, &rel, &end, timeout);
- fds[0].fd = fd;
- fds[0].events = (short)events;
- fds[0].revents = 0;
- do {
- fds[1].fd = rb_sigwait_fd_get(wfd.th);
-
- if (fds[1].fd >= 0) {
- fds[1].events = POLLIN;
- fds[1].revents = 0;
- nfds = 2;
- ubf = ubf_sigwait;
- }
- else {
+ if (timeout == NULL && thread_io_wait_events(th, fd, events, NULL)) {
+ // fd is readable
+ state = 0;
+ fds[0].revents = events;
+ errno = 0;
+ }
+ else {
+ EC_PUSH_TAG(wfd.th->ec);
+ if ((state = EC_EXEC_TAG()) == TAG_NONE) {
+ rb_hrtime_t *to, rel, end = 0;
+ RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
+ timeout_prepare(&to, &rel, &end, timeout);
+ do {
nfds = 1;
- ubf = ubf_select;
- }
- lerrno = 0;
- BLOCKING_REGION(wfd.th, {
- const rb_hrtime_t *sto;
- struct timespec ts;
+ lerrno = 0;
+ BLOCKING_REGION(wfd.th, {
+ struct timespec ts;
- sto = sigwait_timeout(wfd.th, fds[1].fd, to, &drained);
- if (!RUBY_VM_INTERRUPTED(wfd.th->ec)) {
- result = ppoll(fds, nfds, rb_hrtime2timespec(&ts, sto), 0);
- if (result < 0) lerrno = errno;
- }
- }, ubf, wfd.th, TRUE);
-
- if (fds[1].fd >= 0) {
- if (result > 0 && fds[1].revents) {
- result--;
- (void)check_signals_nogvl(wfd.th, fds[1].fd);
- } else {
- (void)check_signals_nogvl(wfd.th, -1);
- }
- rb_sigwait_fd_put(wfd.th, fds[1].fd);
- rb_sigwait_fd_migrate(wfd.th->vm);
- }
- RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
- } while (wait_retryable(&result, lerrno, to, end));
- }
- EC_POP_TAG();
+ if (!RUBY_VM_INTERRUPTED(wfd.th->ec)) {
+ result = ppoll(fds, nfds, rb_hrtime2timespec(&ts, to), 0);
+ if (result < 0) lerrno = errno;
+ }
+ }, ubf_select, wfd.th, TRUE);
- RB_VM_LOCK_ENTER();
- {
- list_del(&wfd.wfd_node);
+ RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
+ } while (wait_retryable(&result, lerrno, to, end));
+ }
+ EC_POP_TAG();
}
- RB_VM_LOCK_LEAVE();
+
+ thread_io_wake_pending_closer(&wfd);
if (state) {
EC_JUMP_TAG(wfd.th->ec, state);
}
if (result < 0) {
- errno = lerrno;
- return -1;
+ errno = lerrno;
+ return -1;
}
if (fds[0].revents & POLLNVAL) {
- errno = EBADF;
- return -1;
+ errno = EBADF;
+ return -1;
}
/*
@@ -4503,23 +4424,23 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
*/
result = 0;
if (fds[0].revents & POLLIN_SET)
- result |= RB_WAITFD_IN;
+ result |= RB_WAITFD_IN;
if (fds[0].revents & POLLOUT_SET)
- result |= RB_WAITFD_OUT;
+ result |= RB_WAITFD_OUT;
if (fds[0].revents & POLLEX_SET)
- result |= RB_WAITFD_PRI;
+ result |= RB_WAITFD_PRI;
/* all requested events are ready if there is an error */
if (fds[0].revents & POLLERR_SET)
- result |= events;
+ result |= events;
return result;
}
#else /* ! USE_POLL - implement rb_io_poll_fd() using select() */
struct select_args {
union {
- int fd;
- int error;
+ int fd;
+ int error;
} as;
rb_fdset_t *read;
rb_fdset_t *write;
@@ -4537,15 +4458,15 @@ select_single(VALUE ptr)
r = rb_thread_fd_select(args->as.fd + 1,
args->read, args->write, args->except, args->tv);
if (r == -1)
- args->as.error = errno;
+ args->as.error = errno;
if (r > 0) {
- r = 0;
- if (args->read && rb_fd_isset(args->as.fd, args->read))
- r |= RB_WAITFD_IN;
- if (args->write && rb_fd_isset(args->as.fd, args->write))
- r |= RB_WAITFD_OUT;
- if (args->except && rb_fd_isset(args->as.fd, args->except))
- r |= RB_WAITFD_PRI;
+ r = 0;
+ if (args->read && rb_fd_isset(args->as.fd, args->read))
+ r |= RB_WAITFD_IN;
+ if (args->write && rb_fd_isset(args->as.fd, args->write))
+ r |= RB_WAITFD_OUT;
+ if (args->except && rb_fd_isset(args->as.fd, args->except))
+ r |= RB_WAITFD_PRI;
}
return (VALUE)r;
}
@@ -4555,7 +4476,7 @@ select_single_cleanup(VALUE ptr)
{
struct select_args *args = (struct select_args *)ptr;
- list_del(&args->wfd.wfd_node);
+ thread_io_wake_pending_closer(&args->wfd);
if (args->read) rb_fd_term(args->read);
if (args->write) rb_fd_term(args->write);
if (args->except) rb_fd_term(args->except);
@@ -4563,6 +4484,18 @@ select_single_cleanup(VALUE ptr)
return (VALUE)-1;
}
+static rb_fdset_t *
+init_set_fd(int fd, rb_fdset_t *fds)
+{
+ if (fd < 0) {
+ return 0;
+ }
+ rb_fd_init(fds);
+ rb_fd_set(fd, fds);
+
+ return fds;
+}
+
int
rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
{
@@ -4570,24 +4503,19 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
struct select_args args;
int r;
VALUE ptr = (VALUE)&args;
+ rb_execution_context_t *ec = GET_EC();
+ rb_thread_t *th = rb_ec_thread_ptr(ec);
args.as.fd = fd;
args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL;
args.tv = timeout;
- args.wfd.fd = fd;
- args.wfd.th = GET_THREAD();
-
- RB_VM_LOCK_ENTER();
- {
- list_add(&args.wfd.th->vm->waiting_fds, &args.wfd.wfd_node);
- }
- RB_VM_LOCK_LEAVE();
+ thread_io_setup_wfd(th, fd, &args.wfd);
r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr);
if (r == -1)
- errno = args.as.error;
+ errno = args.as.error;
return r;
}
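/* Usage sketch for the single-fd wait implemented above, assuming the GVL
 * is held and `fd` is valid; a NULL timeout would wait indefinitely. */
static int
example_wait_readable(int fd)
{
    struct timeval tv = { 1, 0 };  /* wait at most one second */
    int ready = rb_thread_wait_for_single_fd(fd, RB_WAITFD_IN, &tv);
    if (ready < 0) rb_sys_fail("rb_thread_wait_for_single_fd");
    return ready & RB_WAITFD_IN;   /* non-zero once fd became readable */
}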
@@ -4615,8 +4543,8 @@ rb_threadptr_check_signal(rb_thread_t *mth)
{
/* mth must be main_thread */
if (rb_signal_buff_size() > 0) {
- /* wakeup main thread */
- threadptr_trap_interrupt(mth);
+ /* wakeup main thread */
+ threadptr_trap_interrupt(mth);
}
}
@@ -4626,7 +4554,7 @@ async_bug_fd(const char *mesg, int errno_arg, int fd)
char buff[64];
size_t n = strlcpy(buff, mesg, sizeof(buff));
if (n < sizeof(buff)-3) {
- ruby_snprintf(buff+n, sizeof(buff)-n, "(%d)", fd);
+ ruby_snprintf(buff+n, sizeof(buff)-n, "(%d)", fd);
}
rb_async_bug_errno(buff, errno_arg);
}
@@ -4644,67 +4572,44 @@ consume_communication_pipe(int fd)
ssize_t result;
int ret = FALSE; /* for rb_sigwait_sleep */
- /*
- * disarm UBF_TIMER before we read, because it can become
- * re-armed at any time via sighandler and the pipe will refill
- * We can disarm it because this thread is now processing signals
- * and we do not want unnecessary SIGVTALRM
- */
- ubf_timer_disarm();
-
while (1) {
- result = read(fd, buff, sizeof(buff));
- if (result > 0) {
- ret = TRUE;
- if (USE_EVENTFD || result < (ssize_t)sizeof(buff)) {
- return ret;
- }
- }
- else if (result == 0) {
- return ret;
- }
- else if (result < 0) {
- int e = errno;
- switch (e) {
- case EINTR:
- continue; /* retry */
- case EAGAIN:
-#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
- case EWOULDBLOCK:
+ result = read(fd, buff, sizeof(buff));
+#if USE_EVENTFD
+ RUBY_DEBUG_LOG("resultf:%d buff:%lu", (int)result, (unsigned long)buff[0]);
+#else
+ RUBY_DEBUG_LOG("result:%d", (int)result);
#endif
- return ret;
- default:
- async_bug_fd("consume_communication_pipe: read", e, fd);
- }
- }
- }
-}
-
-static int
-check_signals_nogvl(rb_thread_t *th, int sigwait_fd)
-{
- rb_vm_t *vm = GET_VM(); /* th may be 0 */
- int ret = sigwait_fd >= 0 ? consume_communication_pipe(sigwait_fd) : FALSE;
- ubf_wakeup_all_threads();
- ruby_sigchld_handler(vm);
- if (rb_signal_buff_size()) {
- if (th == vm->ractor.main_thread) {
- /* no need to lock + wakeup if already in main thread */
- RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
+ if (result > 0) {
+ ret = TRUE;
+ if (USE_EVENTFD || result < (ssize_t)sizeof(buff)) {
+ return ret;
+ }
}
- else {
- threadptr_trap_interrupt(vm->ractor.main_thread);
+ else if (result == 0) {
+ return ret;
+ }
+ else if (result < 0) {
+ int e = errno;
+ switch (e) {
+ case EINTR:
+ continue; /* retry */
+ case EAGAIN:
+#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+ return ret;
+ default:
+ async_bug_fd("consume_communication_pipe: read", e, fd);
+ }
}
- ret = TRUE; /* for SIGCHLD_LOSSY && rb_sigwait_sleep */
}
- return ret;
}
void
rb_thread_stop_timer_thread(void)
{
if (TIMER_THREAD_CREATED_P() && native_stop_timer_thread()) {
- native_reset_timer_thread();
+ native_reset_timer_thread();
}
}
@@ -4742,10 +4647,10 @@ clear_coverage_i(st_data_t key, st_data_t val, st_data_t dummy)
}
}
if (branches) {
- VALUE counters = RARRAY_AREF(branches, 1);
- for (i = 0; i < RARRAY_LEN(counters); i++) {
- RARRAY_ASET(counters, i, INT2FIX(0));
- }
+ VALUE counters = RARRAY_AREF(branches, 1);
+ for (i = 0; i < RARRAY_LEN(counters); i++) {
+ RARRAY_ASET(counters, i, INT2FIX(0));
+ }
}
return ST_CONTINUE;
@@ -4773,21 +4678,21 @@ rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const r
r->threads.main = th;
r->status_ = ractor_created;
- gvl_atfork(rb_ractor_gvl(th->ractor));
+ thread_sched_atfork(TH_SCHED(th));
ubf_list_atfork();
// OK. Only this thread accesses:
- list_for_each(&vm->ractor.set, r, vmlr_node) {
- list_for_each(&r->threads.set, i, lt_node) {
+ ccan_list_for_each(&vm->ractor.set, r, vmlr_node) {
+ ccan_list_for_each(&r->threads.set, i, lt_node) {
atfork(i, th);
}
}
rb_vm_living_threads_init(vm);
rb_ractor_atfork(vm, th);
+ rb_vm_postponed_job_atfork();
- /* may be held by MJIT threads in parent */
- rb_native_mutex_initialize(&vm->waitpid_lock);
+ /* may be held by RJIT threads in parent */
rb_native_mutex_initialize(&vm->workqueue_lock);
/* may be held by any thread in parent */
@@ -4797,6 +4702,10 @@ rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const r
rb_ractor_sleeper_threads_clear(th->ractor);
rb_clear_coverages();
+ // restart timer thread (timer threads access `vm->waitpid_lock` and so on).
+ rb_thread_reset_timer_thread();
+ rb_thread_start_timer_thread();
+
VM_ASSERT(vm->ractor.blocking_cnt == 0);
VM_ASSERT(vm->ractor.cnt == 1);
}
@@ -4805,9 +4714,9 @@ static void
terminate_atfork_i(rb_thread_t *th, const rb_thread_t *current_th)
{
if (th != current_th) {
- rb_mutex_abandon_keeping_mutexes(th);
- rb_mutex_abandon_locking_mutex(th);
- thread_cleanup_func(th, TRUE);
+ rb_mutex_abandon_keeping_mutexes(th);
+ rb_mutex_abandon_locking_mutex(th);
+ thread_cleanup_func(th, TRUE);
}
}
@@ -4816,22 +4725,20 @@ void
rb_thread_atfork(void)
{
rb_thread_t *th = GET_THREAD();
+ rb_threadptr_pending_interrupt_clear(th);
rb_thread_atfork_internal(th, terminate_atfork_i);
th->join_list = NULL;
rb_fiber_atfork(th);
/* We don't want reproduce CVE-2003-0900. */
rb_reset_random_seed();
-
- /* For child, starting MJIT worker thread in this place which is safer than immediately after `after_fork_ruby`. */
- mjit_child_after_fork();
}
static void
terminate_atfork_before_exec_i(rb_thread_t *th, const rb_thread_t *current_th)
{
if (th != current_th) {
- thread_cleanup_func_before_exec(th);
+ thread_cleanup_func_before_exec(th);
}
}
@@ -4855,19 +4762,16 @@ rb_thread_atfork_before_exec(void)
struct thgroup {
int enclosed;
- VALUE group;
};
-static size_t
-thgroup_memsize(const void *ptr)
-{
- return sizeof(struct thgroup);
-}
-
static const rb_data_type_t thgroup_data_type = {
"thgroup",
- {0, RUBY_TYPED_DEFAULT_FREE, thgroup_memsize,},
- 0, 0, RUBY_TYPED_FREE_IMMEDIATELY
+ {
+ 0,
+ RUBY_TYPED_DEFAULT_FREE,
+ NULL, // No external memory to report
+ },
+ 0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_EMBEDDABLE
};
/*
@@ -4897,7 +4801,6 @@ thgroup_s_alloc(VALUE klass)
group = TypedData_Make_Struct(klass, struct thgroup, &thgroup_data_type, data);
data->enclosed = 0;
- data->group = group;
return group;
}
@@ -4918,10 +4821,10 @@ thgroup_list(VALUE group)
rb_thread_t *th = 0;
rb_ractor_t *r = GET_RACTOR();
- list_for_each(&r->threads.set, th, lt_node) {
+ ccan_list_for_each(&r->threads.set, th, lt_node) {
if (th->thgroup == group) {
- rb_ary_push(ary, th->self);
- }
+ rb_ary_push(ary, th->self);
+ }
}
return ary;
}
@@ -4968,9 +4871,7 @@ thgroup_enclosed_p(VALUE group)
struct thgroup *data;
TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data);
- if (data->enclosed)
- return Qtrue;
- return Qfalse;
+ return RBOOL(data->enclosed);
}
@@ -5007,20 +4908,20 @@ thgroup_add(VALUE group, VALUE thread)
struct thgroup *data;
if (OBJ_FROZEN(group)) {
- rb_raise(rb_eThreadError, "can't move to the frozen thread group");
+ rb_raise(rb_eThreadError, "can't move to the frozen thread group");
}
TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data);
if (data->enclosed) {
- rb_raise(rb_eThreadError, "can't move to the enclosed thread group");
+ rb_raise(rb_eThreadError, "can't move to the enclosed thread group");
}
if (OBJ_FROZEN(target_th->thgroup)) {
- rb_raise(rb_eThreadError, "can't move from the frozen thread group");
+ rb_raise(rb_eThreadError, "can't move from the frozen thread group");
}
TypedData_Get_Struct(target_th->thgroup, struct thgroup, &thgroup_data_type, data);
if (data->enclosed) {
- rb_raise(rb_eThreadError,
- "can't move from the enclosed thread group");
+ rb_raise(rb_eThreadError,
+ "can't move from the enclosed thread group");
}
target_th->thgroup = group;
@@ -5065,7 +4966,7 @@ rb_thread_shield_waiting_inc(VALUE b)
unsigned int w = rb_thread_shield_waiting(b);
w++;
if (w > THREAD_SHIELD_WAITING_MAX)
- rb_raise(rb_eRuntimeError, "waiting count overflow");
+ rb_raise(rb_eRuntimeError, "waiting count overflow");
RBASIC(b)->flags &= ~THREAD_SHIELD_WAITING_MASK;
RBASIC(b)->flags |= ((VALUE)w << THREAD_SHIELD_WAITING_SHIFT);
}
@@ -5088,6 +4989,17 @@ rb_thread_shield_new(void)
return thread_shield;
}
+bool
+rb_thread_shield_owned(VALUE self)
+{
+ VALUE mutex = GetThreadShieldPtr(self);
+ if (!mutex) return false;
+
+ rb_mutex_t *m = mutex_ptr(mutex);
+
+ return m->fiber == GET_EC()->fiber_ptr;
+}
+
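/* Sketch of the intended ownership check, assuming `shield` was created by
 * rb_thread_shield_new() and may or may not be held by the current fiber. */
static void
example_release_if_owned(VALUE shield)
{
    if (rb_thread_shield_owned(shield)) {
        rb_thread_shield_release(shield);  /* we hold it: wake one waiter */
    }
}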
/*
* Wait a thread shield.
*
@@ -5118,7 +5030,7 @@ thread_shield_get_mutex(VALUE self)
{
VALUE mutex = GetThreadShieldPtr(self);
if (!mutex)
- rb_raise(rb_eThreadError, "destroyed thread shield - %p", (void *)self);
+ rb_raise(rb_eThreadError, "destroyed thread shield - %p", (void *)self);
return mutex;
}
@@ -5130,7 +5042,7 @@ rb_thread_shield_release(VALUE self)
{
VALUE mutex = thread_shield_get_mutex(self);
rb_mutex_unlock(mutex);
- return rb_thread_shield_waiting(self) > 0 ? Qtrue : Qfalse;
+ return RBOOL(rb_thread_shield_waiting(self) > 0);
}
/*
@@ -5142,7 +5054,7 @@ rb_thread_shield_destroy(VALUE self)
VALUE mutex = thread_shield_get_mutex(self);
DATA_PTR(self) = 0;
rb_mutex_unlock(mutex);
- return rb_thread_shield_waiting(self) > 0 ? Qtrue : Qfalse;
+ return RBOOL(rb_thread_shield_waiting(self) > 0);
}
static VALUE
@@ -5172,50 +5084,50 @@ recursive_list_access(VALUE sym)
VALUE hash = threadptr_recursive_hash(th);
VALUE list;
if (NIL_P(hash) || !RB_TYPE_P(hash, T_HASH)) {
- hash = rb_ident_hash_new();
- threadptr_recursive_hash_set(th, hash);
- list = Qnil;
+ hash = rb_ident_hash_new();
+ threadptr_recursive_hash_set(th, hash);
+ list = Qnil;
}
else {
- list = rb_hash_aref(hash, sym);
+ list = rb_hash_aref(hash, sym);
}
if (NIL_P(list) || !RB_TYPE_P(list, T_HASH)) {
- list = rb_ident_hash_new();
- rb_hash_aset(hash, sym, list);
+ list = rb_ident_hash_new();
+ rb_hash_aset(hash, sym, list);
}
return list;
}
/*
- * Returns Qtrue if and only if obj (or the pair <obj, paired_obj>) is already
+ * Returns true if and only if obj (or the pair <obj, paired_obj>) is already
* in the recursion list.
* Assumes the recursion list is valid.
*/
-static VALUE
+static bool
recursive_check(VALUE list, VALUE obj, VALUE paired_obj_id)
{
#if SIZEOF_LONG == SIZEOF_VOIDP
#define OBJ_ID_EQL(obj_id, other) ((obj_id) == (other))
#elif SIZEOF_LONG_LONG == SIZEOF_VOIDP
- #define OBJ_ID_EQL(obj_id, other) (RB_TYPE_P((obj_id), T_BIGNUM) ? \
+ #define OBJ_ID_EQL(obj_id, other) (RB_BIGNUM_TYPE_P((obj_id)) ? \
rb_big_eql((obj_id), (other)) : ((obj_id) == (other)))
#endif
VALUE pair_list = rb_hash_lookup2(list, obj, Qundef);
- if (pair_list == Qundef)
- return Qfalse;
+ if (UNDEF_P(pair_list))
+ return false;
if (paired_obj_id) {
- if (!RB_TYPE_P(pair_list, T_HASH)) {
- if (!OBJ_ID_EQL(paired_obj_id, pair_list))
- return Qfalse;
- }
- else {
- if (NIL_P(rb_hash_lookup(pair_list, paired_obj_id)))
- return Qfalse;
- }
+ if (!RB_TYPE_P(pair_list, T_HASH)) {
+ if (!OBJ_ID_EQL(paired_obj_id, pair_list))
+ return false;
+ }
+ else {
+ if (NIL_P(rb_hash_lookup(pair_list, paired_obj_id)))
+ return false;
+ }
}
- return Qtrue;
+ return true;
}
/*
@@ -5233,19 +5145,19 @@ recursive_push(VALUE list, VALUE obj, VALUE paired_obj)
VALUE pair_list;
if (!paired_obj) {
- rb_hash_aset(list, obj, Qtrue);
+ rb_hash_aset(list, obj, Qtrue);
}
- else if ((pair_list = rb_hash_lookup2(list, obj, Qundef)) == Qundef) {
- rb_hash_aset(list, obj, paired_obj);
+ else if (UNDEF_P(pair_list = rb_hash_lookup2(list, obj, Qundef))) {
+ rb_hash_aset(list, obj, paired_obj);
}
else {
- if (!RB_TYPE_P(pair_list, T_HASH)){
- VALUE other_paired_obj = pair_list;
- pair_list = rb_hash_new();
- rb_hash_aset(pair_list, other_paired_obj, Qtrue);
- rb_hash_aset(list, obj, pair_list);
- }
- rb_hash_aset(pair_list, paired_obj, Qtrue);
+ if (!RB_TYPE_P(pair_list, T_HASH)){
+ VALUE other_paired_obj = pair_list;
+ pair_list = rb_hash_new();
+ rb_hash_aset(pair_list, other_paired_obj, Qtrue);
+ rb_hash_aset(list, obj, pair_list);
+ }
+ rb_hash_aset(pair_list, paired_obj, Qtrue);
}
}
@@ -5261,16 +5173,16 @@ static int
recursive_pop(VALUE list, VALUE obj, VALUE paired_obj)
{
if (paired_obj) {
- VALUE pair_list = rb_hash_lookup2(list, obj, Qundef);
- if (pair_list == Qundef) {
- return 0;
- }
- if (RB_TYPE_P(pair_list, T_HASH)) {
- rb_hash_delete_entry(pair_list, paired_obj);
- if (!RHASH_EMPTY_P(pair_list)) {
- return 1; /* keep hash until is empty */
- }
- }
+ VALUE pair_list = rb_hash_lookup2(list, obj, Qundef);
+ if (UNDEF_P(pair_list)) {
+ return 0;
+ }
+ if (RB_TYPE_P(pair_list, T_HASH)) {
+ rb_hash_delete_entry(pair_list, paired_obj);
+ if (!RHASH_EMPTY_P(pair_list)) {
+ return 1; /* keep hash until is empty */
+ }
+ }
}
rb_hash_delete_entry(list, obj);
return 1;
@@ -5295,7 +5207,7 @@ exec_recursive_i(RB_BLOCK_CALL_FUNC_ARGLIST(tag, data))
* Calls func(obj, arg, recursive), where recursive is non-zero if the
* current method is called recursively on obj, or on the pair <obj, pairid>
* If outer is 0, then the innermost func will be called with recursive set
- * to Qtrue, otherwise the outermost func will be called. In the latter case,
+ * to true, otherwise the outermost func will be called. In the latter case,
* all inner func are short-circuited by throw.
* Implementation details: the value thrown is the recursive list which is
* proper to the current method and unlikely to be caught anywhere else.
@@ -5303,10 +5215,9 @@ exec_recursive_i(RB_BLOCK_CALL_FUNC_ARGLIST(tag, data))
*/
static VALUE
-exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE pairid, VALUE arg, int outer)
+exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE pairid, VALUE arg, int outer, ID mid)
{
VALUE result = Qundef;
- const ID mid = rb_frame_last_func();
const VALUE sym = mid ? ID2SYM(mid) : ID2SYM(idNULL);
struct exec_recursive_params p;
int outermost;
@@ -5317,41 +5228,41 @@ exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE pairid, VALUE
outermost = outer && !recursive_check(p.list, ID2SYM(recursive_key), 0);
if (recursive_check(p.list, p.obj, pairid)) {
- if (outer && !outermost) {
- rb_throw_obj(p.list, p.list);
- }
- return (*func)(obj, arg, TRUE);
+ if (outer && !outermost) {
+ rb_throw_obj(p.list, p.list);
+ }
+ return (*func)(obj, arg, TRUE);
}
else {
- enum ruby_tag_type state;
-
- p.func = func;
-
- if (outermost) {
- recursive_push(p.list, ID2SYM(recursive_key), 0);
- recursive_push(p.list, p.obj, p.pairid);
- result = rb_catch_protect(p.list, exec_recursive_i, (VALUE)&p, &state);
- if (!recursive_pop(p.list, p.obj, p.pairid)) goto invalid;
- if (!recursive_pop(p.list, ID2SYM(recursive_key), 0)) goto invalid;
- if (state != TAG_NONE) EC_JUMP_TAG(GET_EC(), state);
- if (result == p.list) {
- result = (*func)(obj, arg, TRUE);
- }
- }
- else {
- volatile VALUE ret = Qundef;
- recursive_push(p.list, p.obj, p.pairid);
- EC_PUSH_TAG(GET_EC());
- if ((state = EC_EXEC_TAG()) == TAG_NONE) {
- ret = (*func)(obj, arg, FALSE);
- }
- EC_POP_TAG();
- if (!recursive_pop(p.list, p.obj, p.pairid)) {
+ enum ruby_tag_type state;
+
+ p.func = func;
+
+ if (outermost) {
+ recursive_push(p.list, ID2SYM(recursive_key), 0);
+ recursive_push(p.list, p.obj, p.pairid);
+ result = rb_catch_protect(p.list, exec_recursive_i, (VALUE)&p, &state);
+ if (!recursive_pop(p.list, p.obj, p.pairid)) goto invalid;
+ if (!recursive_pop(p.list, ID2SYM(recursive_key), 0)) goto invalid;
+ if (state != TAG_NONE) EC_JUMP_TAG(GET_EC(), state);
+ if (result == p.list) {
+ result = (*func)(obj, arg, TRUE);
+ }
+ }
+ else {
+ volatile VALUE ret = Qundef;
+ recursive_push(p.list, p.obj, p.pairid);
+ EC_PUSH_TAG(GET_EC());
+ if ((state = EC_EXEC_TAG()) == TAG_NONE) {
+ ret = (*func)(obj, arg, FALSE);
+ }
+ EC_POP_TAG();
+ if (!recursive_pop(p.list, p.obj, p.pairid)) {
goto invalid;
- }
- if (state != TAG_NONE) EC_JUMP_TAG(GET_EC(), state);
- result = ret;
- }
+ }
+ if (state != TAG_NONE) EC_JUMP_TAG(GET_EC(), state);
+ result = ret;
+ }
}
*(volatile struct exec_recursive_params *)&p;
return result;
@@ -5371,7 +5282,7 @@ exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE pairid, VALUE
VALUE
rb_exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg)
{
- return exec_recursive(func, obj, 0, arg, 0);
+ return exec_recursive(func, obj, 0, arg, 0, rb_frame_last_func());
}
/*
@@ -5382,31 +5293,37 @@ rb_exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg)
VALUE
rb_exec_recursive_paired(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE paired_obj, VALUE arg)
{
- return exec_recursive(func, obj, rb_memory_id(paired_obj), arg, 0);
+ return exec_recursive(func, obj, rb_memory_id(paired_obj), arg, 0, rb_frame_last_func());
}
/*
* If recursion is detected on the current method and obj, the outermost
- * func will be called with (obj, arg, Qtrue). All inner func will be
+ * func will be called with (obj, arg, true). All inner func will be
* short-circuited using throw.
*/
VALUE
rb_exec_recursive_outer(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg)
{
- return exec_recursive(func, obj, 0, arg, 1);
+ return exec_recursive(func, obj, 0, arg, 1, rb_frame_last_func());
+}
+
+VALUE
+rb_exec_recursive_outer_mid(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg, ID mid)
+{
+ return exec_recursive(func, obj, 0, arg, 1, mid);
}
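/* Sketch of a hypothetical caller: the _mid variant pins the recursion
 * guard to an explicit method ID instead of rb_frame_last_func(), which is
 * useful for C helpers shared by several methods. Names below are
 * illustrative only. */
static VALUE
example_guarded_inspect(VALUE obj, VALUE arg, int recursive)
{
    if (recursive) return rb_str_new_cstr("[...]");
    return rb_inspect(obj);
}

static VALUE
example_recursion_safe_inspect(VALUE obj)
{
    return rb_exec_recursive_outer_mid(example_guarded_inspect, obj, 0,
                                       rb_intern("inspect"));
}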
/*
* If recursion is detected on the current method, obj and paired_obj,
- * the outermost func will be called with (obj, arg, Qtrue). All inner
+ * the outermost func will be called with (obj, arg, true). All inner
* func will be short-circuited using throw.
*/
VALUE
rb_exec_recursive_paired_outer(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE paired_obj, VALUE arg)
{
- return exec_recursive(func, obj, rb_memory_id(paired_obj), arg, 1);
+ return exec_recursive(func, obj, rb_memory_id(paired_obj), arg, 1, rb_frame_last_func());
}
/*
@@ -5445,7 +5362,6 @@ Init_Thread_Mutex(void)
{
rb_thread_t *th = GET_THREAD();
- rb_native_mutex_initialize(&th->vm->waitpid_lock);
rb_native_mutex_initialize(&th->vm->workqueue_lock);
rb_native_mutex_initialize(&th->interrupt_lock);
}
@@ -5491,10 +5407,6 @@ Init_Thread(void)
rb_define_singleton_method(rb_cThread, "report_on_exception=", rb_thread_s_report_exc_set, 1);
rb_define_singleton_method(rb_cThread, "ignore_deadlock", rb_thread_s_ignore_deadlock, 0);
rb_define_singleton_method(rb_cThread, "ignore_deadlock=", rb_thread_s_ignore_deadlock_set, 1);
-#if THREAD_DEBUG < 0
- rb_define_singleton_method(rb_cThread, "DEBUG", rb_thread_s_debug, 0);
- rb_define_singleton_method(rb_cThread, "DEBUG=", rb_thread_s_debug_set, 1);
-#endif
rb_define_singleton_method(rb_cThread, "handle_interrupt", rb_thread_s_handle_interrupt, 1);
rb_define_singleton_method(rb_cThread, "pending_interrupt?", rb_thread_s_pending_interrupt_p, -1);
rb_define_method(rb_cThread, "pending_interrupt?", rb_thread_pending_interrupt_p, -1);
@@ -5537,7 +5449,7 @@ Init_Thread(void)
rb_define_alias(rb_cThread, "inspect", "to_s");
rb_vm_register_special_exception(ruby_error_stream_closed, rb_eIOError,
- "stream closed in another thread");
+ "stream closed in another thread");
cThGroup = rb_define_class("ThreadGroup", rb_cObject);
rb_define_alloc_func(cThGroup, thgroup_s_alloc);
@@ -5548,29 +5460,35 @@ Init_Thread(void)
{
th->thgroup = th->ractor->thgroup_default = rb_obj_alloc(cThGroup);
- rb_define_const(cThGroup, "Default", th->thgroup);
+ rb_define_const(cThGroup, "Default", th->thgroup);
}
- recursive_key = rb_intern_const("__recursive_key__");
rb_eThreadError = rb_define_class("ThreadError", rb_eStandardError);
/* init thread core */
{
- /* main thread setting */
- {
- /* acquire global vm lock */
- rb_global_vm_lock_t *gvl = rb_ractor_gvl(th->ractor);
- gvl_acquire(gvl, th);
+ /* main thread setting */
+ {
+ /* acquire global vm lock */
+#ifdef HAVE_PTHREAD_NP_H
+ VM_ASSERT(TH_SCHED(th)->running == th);
+#endif
+ // thread_sched_to_running() should not be called because
+ // it assumes blocked by thread_sched_to_waiting().
+ // thread_sched_to_running(sched, th);
- th->pending_interrupt_queue = rb_ary_tmp_new(0);
- th->pending_interrupt_queue_checked = 0;
- th->pending_interrupt_mask_stack = rb_ary_tmp_new(0);
- }
+ th->pending_interrupt_queue = rb_ary_hidden_new(0);
+ th->pending_interrupt_queue_checked = 0;
+ th->pending_interrupt_mask_stack = rb_ary_hidden_new(0);
+ }
}
rb_thread_create_timer_thread();
Init_thread_sync();
+
+ // TODO: Suppress unused function warning for now
+ // if (0) rb_thread_sched_destroy(NULL);
}
int
@@ -5581,6 +5499,12 @@ ruby_native_thread_p(void)
return th != 0;
}
+#ifdef NON_SCALAR_THREAD_ID
+ #define thread_id_str(th) (NULL)
+#else
+ #define thread_id_str(th) ((void *)(uintptr_t)(th)->nt->thread_id)
+#endif
+
static void
debug_deadlock_check(rb_ractor_t *r, VALUE msg)
{
@@ -5588,13 +5512,13 @@ debug_deadlock_check(rb_ractor_t *r, VALUE msg)
VALUE sep = rb_str_new_cstr("\n ");
rb_str_catf(msg, "\n%d threads, %d sleeps current:%p main thread:%p\n",
- rb_ractor_living_thread_num(r), rb_ractor_sleeper_thread_num(r),
+ rb_ractor_living_thread_num(r), rb_ractor_sleeper_thread_num(r),
(void *)GET_THREAD(), (void *)r->threads.main);
- list_for_each(&r->threads.set, th, lt_node) {
+ ccan_list_for_each(&r->threads.set, th, lt_node) {
rb_str_catf(msg, "* %+"PRIsVALUE"\n rb_thread_t:%p "
- "native:%"PRI_THREAD_ID" int:%u",
- th->self, (void *)th, thread_id_str(th), th->ec->interrupt_flag);
+ "native:%p int:%u",
+ th->self, (void *)th, th->nt ? thread_id_str(th) : "N/A", th->ec->interrupt_flag);
if (th->locking_mutex) {
rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
@@ -5620,22 +5544,26 @@ rb_check_deadlock(rb_ractor_t *r)
{
if (GET_THREAD()->vm->thread_ignore_deadlock) return;
- int found = 0;
- rb_thread_t *th = NULL;
+#ifdef RUBY_THREAD_PTHREAD_H
+ if (r->threads.sched.readyq_cnt > 0) return;
+#endif
+
int sleeper_num = rb_ractor_sleeper_thread_num(r);
int ltnum = rb_ractor_living_thread_num(r);
if (ltnum > sleeper_num) return;
if (ltnum < sleeper_num) rb_bug("sleeper must not be more than vm_living_thread_num(vm)");
- if (patrol_thread && patrol_thread != GET_THREAD()) return;
- list_for_each(&r->threads.set, th, lt_node) {
+ int found = 0;
+ rb_thread_t *th = NULL;
+
+ ccan_list_for_each(&r->threads.set, th, lt_node) {
if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th->ec)) {
found = 1;
}
else if (th->locking_mutex) {
rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
- if (mutex->fiber == th->ec->fiber_ptr || (!mutex->fiber && !list_empty(&mutex->waitq))) {
+ if (mutex->fiber == th->ec->fiber_ptr || (!mutex->fiber && !ccan_list_empty(&mutex->waitq))) {
found = 1;
}
}
@@ -5644,42 +5572,57 @@ rb_check_deadlock(rb_ractor_t *r)
}
if (!found) {
- VALUE argv[2];
- argv[0] = rb_eFatal;
- argv[1] = rb_str_new2("No live threads left. Deadlock?");
- debug_deadlock_check(r, argv[1]);
+ VALUE argv[2];
+ argv[0] = rb_eFatal;
+ argv[1] = rb_str_new2("No live threads left. Deadlock?");
+ debug_deadlock_check(r, argv[1]);
rb_ractor_sleeper_threads_dec(GET_RACTOR());
- rb_threadptr_raise(r->threads.main, 2, argv);
+ rb_threadptr_raise(r->threads.main, 2, argv);
}
}
+// Used for VM memsize reporting. Returns the size of a list of waiting_fd
+// structs. Defined here because the struct definition lives here as well.
+size_t
+rb_vm_memsize_waiting_fds(struct ccan_list_head *waiting_fds)
+{
+ struct waiting_fd *waitfd = 0;
+ size_t size = 0;
+
+ ccan_list_for_each(waiting_fds, waitfd, wfd_node) {
+ size += sizeof(struct waiting_fd);
+ }
+
+ return size;
+}
+
static void
update_line_coverage(VALUE data, const rb_trace_arg_t *trace_arg)
{
const rb_control_frame_t *cfp = GET_EC()->cfp;
VALUE coverage = rb_iseq_coverage(cfp->iseq);
if (RB_TYPE_P(coverage, T_ARRAY) && !RBASIC_CLASS(coverage)) {
- VALUE lines = RARRAY_AREF(coverage, COVERAGE_INDEX_LINES);
- if (lines) {
- long line = rb_sourceline() - 1;
- long count;
- VALUE num;
+ VALUE lines = RARRAY_AREF(coverage, COVERAGE_INDEX_LINES);
+ if (lines) {
+ long line = rb_sourceline() - 1;
+ long count;
+ VALUE num;
void rb_iseq_clear_event_flags(const rb_iseq_t *iseq, size_t pos, rb_event_flag_t reset);
if (GET_VM()->coverage_mode & COVERAGE_TARGET_ONESHOT_LINES) {
- rb_iseq_clear_event_flags(cfp->iseq, cfp->pc - cfp->iseq->body->iseq_encoded - 1, RUBY_EVENT_COVERAGE_LINE);
+ rb_iseq_clear_event_flags(cfp->iseq, cfp->pc - ISEQ_BODY(cfp->iseq)->iseq_encoded - 1, RUBY_EVENT_COVERAGE_LINE);
rb_ary_push(lines, LONG2FIX(line + 1));
return;
}
- if (line >= RARRAY_LEN(lines)) { /* no longer tracked */
- return;
- }
- num = RARRAY_AREF(lines, line);
- if (!FIXNUM_P(num)) return;
- count = FIX2LONG(num) + 1;
- if (POSFIXABLE(count)) {
- RARRAY_ASET(lines, line, LONG2FIX(count));
- }
- }
+ if (line >= RARRAY_LEN(lines)) { /* no longer tracked */
+ return;
+ }
+ num = RARRAY_AREF(lines, line);
+ if (!FIXNUM_P(num)) return;
+ count = FIX2LONG(num) + 1;
+ if (POSFIXABLE(count)) {
+ RARRAY_ASET(lines, line, LONG2FIX(count));
+ }
+ }
}
}
@@ -5689,17 +5632,17 @@ update_branch_coverage(VALUE data, const rb_trace_arg_t *trace_arg)
const rb_control_frame_t *cfp = GET_EC()->cfp;
VALUE coverage = rb_iseq_coverage(cfp->iseq);
if (RB_TYPE_P(coverage, T_ARRAY) && !RBASIC_CLASS(coverage)) {
- VALUE branches = RARRAY_AREF(coverage, COVERAGE_INDEX_BRANCHES);
- if (branches) {
- long pc = cfp->pc - cfp->iseq->body->iseq_encoded - 1;
+ VALUE branches = RARRAY_AREF(coverage, COVERAGE_INDEX_BRANCHES);
+ if (branches) {
+ long pc = cfp->pc - ISEQ_BODY(cfp->iseq)->iseq_encoded - 1;
long idx = FIX2INT(RARRAY_AREF(ISEQ_PC2BRANCHINDEX(cfp->iseq), pc)), count;
- VALUE counters = RARRAY_AREF(branches, 1);
- VALUE num = RARRAY_AREF(counters, idx);
- count = FIX2LONG(num) + 1;
- if (POSFIXABLE(count)) {
- RARRAY_ASET(counters, idx, LONG2FIX(count));
- }
- }
+ VALUE counters = RARRAY_AREF(branches, 1);
+ VALUE num = RARRAY_AREF(counters, idx);
+ count = FIX2LONG(num) + 1;
+ if (POSFIXABLE(count)) {
+ RARRAY_ASET(counters, idx, LONG2FIX(count));
+ }
+ }
}
}
@@ -5713,52 +5656,52 @@ rb_resolve_me_location(const rb_method_entry_t *me, VALUE resolved_location[5])
retry:
switch (me->def->type) {
case VM_METHOD_TYPE_ISEQ: {
- const rb_iseq_t *iseq = me->def->body.iseq.iseqptr;
- rb_iseq_location_t *loc = &iseq->body->location;
- path = rb_iseq_path(iseq);
- beg_pos_lineno = INT2FIX(loc->code_location.beg_pos.lineno);
- beg_pos_column = INT2FIX(loc->code_location.beg_pos.column);
- end_pos_lineno = INT2FIX(loc->code_location.end_pos.lineno);
- end_pos_column = INT2FIX(loc->code_location.end_pos.column);
- break;
+ const rb_iseq_t *iseq = me->def->body.iseq.iseqptr;
+ rb_iseq_location_t *loc = &ISEQ_BODY(iseq)->location;
+ path = rb_iseq_path(iseq);
+ beg_pos_lineno = INT2FIX(loc->code_location.beg_pos.lineno);
+ beg_pos_column = INT2FIX(loc->code_location.beg_pos.column);
+ end_pos_lineno = INT2FIX(loc->code_location.end_pos.lineno);
+ end_pos_column = INT2FIX(loc->code_location.end_pos.column);
+ break;
}
case VM_METHOD_TYPE_BMETHOD: {
const rb_iseq_t *iseq = rb_proc_get_iseq(me->def->body.bmethod.proc, 0);
- if (iseq) {
- rb_iseq_location_t *loc;
- rb_iseq_check(iseq);
- path = rb_iseq_path(iseq);
- loc = &iseq->body->location;
- beg_pos_lineno = INT2FIX(loc->code_location.beg_pos.lineno);
- beg_pos_column = INT2FIX(loc->code_location.beg_pos.column);
- end_pos_lineno = INT2FIX(loc->code_location.end_pos.lineno);
- end_pos_column = INT2FIX(loc->code_location.end_pos.column);
- break;
- }
- return NULL;
+ if (iseq) {
+ rb_iseq_location_t *loc;
+ rb_iseq_check(iseq);
+ path = rb_iseq_path(iseq);
+ loc = &ISEQ_BODY(iseq)->location;
+ beg_pos_lineno = INT2FIX(loc->code_location.beg_pos.lineno);
+ beg_pos_column = INT2FIX(loc->code_location.beg_pos.column);
+ end_pos_lineno = INT2FIX(loc->code_location.end_pos.lineno);
+ end_pos_column = INT2FIX(loc->code_location.end_pos.column);
+ break;
+ }
+ return NULL;
}
case VM_METHOD_TYPE_ALIAS:
- me = me->def->body.alias.original_me;
- goto retry;
+ me = me->def->body.alias.original_me;
+ goto retry;
case VM_METHOD_TYPE_REFINED:
- me = me->def->body.refined.orig_me;
- if (!me) return NULL;
- goto retry;
+ me = me->def->body.refined.orig_me;
+ if (!me) return NULL;
+ goto retry;
default:
- return NULL;
+ return NULL;
}
/* found */
if (RB_TYPE_P(path, T_ARRAY)) {
- path = rb_ary_entry(path, 1);
- if (!RB_TYPE_P(path, T_STRING)) return NULL; /* just for the case... */
+ path = rb_ary_entry(path, 1);
+ if (!RB_TYPE_P(path, T_STRING)) return NULL; /* just for the case... */
}
if (resolved_location) {
- resolved_location[0] = path;
- resolved_location[1] = beg_pos_lineno;
- resolved_location[2] = beg_pos_column;
- resolved_location[3] = end_pos_lineno;
- resolved_location[4] = end_pos_column;
+ resolved_location[0] = path;
+ resolved_location[1] = beg_pos_lineno;
+ resolved_location[2] = beg_pos_column;
+ resolved_location[3] = end_pos_lineno;
+ resolved_location[4] = end_pos_column;
}
return me;
}
@@ -5778,7 +5721,7 @@ update_method_coverage(VALUE me2counter, rb_trace_arg_t *trace_arg)
rcount = rb_hash_aref(me2counter, (VALUE) me);
count = FIXNUM_P(rcount) ? FIX2LONG(rcount) + 1 : 1;
if (POSFIXABLE(count)) {
- rb_hash_aset(me2counter, (VALUE) me, LONG2FIX(count));
+ rb_hash_aset(me2counter, (VALUE) me, LONG2FIX(count));
}
}
@@ -5798,46 +5741,59 @@ void
rb_set_coverages(VALUE coverages, int mode, VALUE me2counter)
{
GET_VM()->coverages = coverages;
+ GET_VM()->me2counter = me2counter;
GET_VM()->coverage_mode = mode;
+}
+
+void
+rb_resume_coverages(void)
+{
+ int mode = GET_VM()->coverage_mode;
+ VALUE me2counter = GET_VM()->me2counter;
rb_add_event_hook2((rb_event_hook_func_t) update_line_coverage, RUBY_EVENT_COVERAGE_LINE, Qnil, RUBY_EVENT_HOOK_FLAG_SAFE | RUBY_EVENT_HOOK_FLAG_RAW_ARG);
if (mode & COVERAGE_TARGET_BRANCHES) {
- rb_add_event_hook2((rb_event_hook_func_t) update_branch_coverage, RUBY_EVENT_COVERAGE_BRANCH, Qnil, RUBY_EVENT_HOOK_FLAG_SAFE | RUBY_EVENT_HOOK_FLAG_RAW_ARG);
+ rb_add_event_hook2((rb_event_hook_func_t) update_branch_coverage, RUBY_EVENT_COVERAGE_BRANCH, Qnil, RUBY_EVENT_HOOK_FLAG_SAFE | RUBY_EVENT_HOOK_FLAG_RAW_ARG);
}
if (mode & COVERAGE_TARGET_METHODS) {
- rb_add_event_hook2((rb_event_hook_func_t) update_method_coverage, RUBY_EVENT_CALL, me2counter, RUBY_EVENT_HOOK_FLAG_SAFE | RUBY_EVENT_HOOK_FLAG_RAW_ARG);
+ rb_add_event_hook2((rb_event_hook_func_t) update_method_coverage, RUBY_EVENT_CALL, me2counter, RUBY_EVENT_HOOK_FLAG_SAFE | RUBY_EVENT_HOOK_FLAG_RAW_ARG);
}
}
-/* Make coverage arrays empty so old covered files are no longer tracked. */
void
-rb_reset_coverages(void)
+rb_suspend_coverages(void)
{
- rb_clear_coverages();
- rb_iseq_remove_coverage_all();
- GET_VM()->coverages = Qfalse;
rb_remove_event_hook((rb_event_hook_func_t) update_line_coverage);
if (GET_VM()->coverage_mode & COVERAGE_TARGET_BRANCHES) {
- rb_remove_event_hook((rb_event_hook_func_t) update_branch_coverage);
+ rb_remove_event_hook((rb_event_hook_func_t) update_branch_coverage);
}
if (GET_VM()->coverage_mode & COVERAGE_TARGET_METHODS) {
- rb_remove_event_hook((rb_event_hook_func_t) update_method_coverage);
+ rb_remove_event_hook((rb_event_hook_func_t) update_method_coverage);
}
}
+/* Make coverage arrays empty so old covered files are no longer tracked. */
+void
+rb_reset_coverages(void)
+{
+ rb_clear_coverages();
+ rb_iseq_remove_coverage_all();
+ GET_VM()->coverages = Qfalse;
+}
+
VALUE
rb_default_coverage(int n)
{
- VALUE coverage = rb_ary_tmp_new_fill(3);
+ VALUE coverage = rb_ary_hidden_new_fill(3);
VALUE lines = Qfalse, branches = Qfalse;
int mode = GET_VM()->coverage_mode;
if (mode & COVERAGE_TARGET_LINES) {
- lines = n > 0 ? rb_ary_tmp_new_fill(n) : rb_ary_tmp_new(0);
+ lines = n > 0 ? rb_ary_hidden_new_fill(n) : rb_ary_hidden_new(0);
}
RARRAY_ASET(coverage, COVERAGE_INDEX_LINES, lines);
if (mode & COVERAGE_TARGET_BRANCHES) {
- branches = rb_ary_tmp_new_fill(2);
+ branches = rb_ary_hidden_new_fill(2);
/* internal data structures for branch coverage:
*
* { branch base node =>
@@ -5861,9 +5817,9 @@ rb_default_coverage(int n)
*/
VALUE structure = rb_hash_new();
rb_obj_hide(structure);
- RARRAY_ASET(branches, 0, structure);
- /* branch execution counters */
- RARRAY_ASET(branches, 1, rb_ary_tmp_new(0));
+ RARRAY_ASET(branches, 0, structure);
+ /* branch execution counters */
+ RARRAY_ASET(branches, 1, rb_ary_hidden_new(0));
}
RARRAY_ASET(coverage, COVERAGE_INDEX_BRANCHES, branches);
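
(Not part of the diff.) The hunks above split the old set/reset coverage pair into separate set/resume and suspend/reset steps: rb_set_coverages() now only records the coverage table, mode, and method-counter hash on the VM, while rb_resume_coverages() installs the event hooks and rb_suspend_coverages() removes them without discarding collected data. A minimal sketch of the implied call order, written against the signatures visible in this diff; the caller, variable names, and me2counter hash shape are illustrative assumptions, not taken from the commit:

/* Illustrative sketch only; assumes thread.c's internal headers
 * (COVERAGE_TARGET_* and the rb_*_coverages declarations) are in scope. */
static void
coverage_session_sketch(void)
{
    VALUE coverages = rb_hash_new();         /* path => per-file coverage data */
    VALUE me2counter = rb_ident_hash_new();  /* method entry => call count (assumed shape) */
    int mode = COVERAGE_TARGET_LINES | COVERAGE_TARGET_BRANCHES;

    rb_set_coverages(coverages, mode, me2counter); /* record state on the VM */
    rb_resume_coverages();                         /* install line/branch/method hooks */

    /* ... run instrumented code; rb_default_coverage() supplies the hidden
     * per-file [lines, branches, ...] array as iseqs are compiled ... */

    rb_suspend_coverages();                        /* hooks removed, data kept */
    rb_reset_coverages();                          /* collected data discarded */
}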
@@ -5898,3 +5854,66 @@ rb_uninterruptible(VALUE (*b_proc)(VALUE), VALUE data)
RUBY_VM_CHECK_INTS(cur_th->ec);
return ret;
}
+
+static void
+thread_specific_storage_alloc(rb_thread_t *th)
+{
+ VM_ASSERT(th->specific_storage == NULL);
+
+ if (UNLIKELY(specific_key_count > 0)) {
+ th->specific_storage = ZALLOC_N(void *, RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX);
+ }
+}
+
+rb_internal_thread_specific_key_t
+rb_internal_thread_specific_key_create(void)
+{
+ rb_vm_t *vm = GET_VM();
+
+ if (specific_key_count == 0 && vm->ractor.cnt > 1) {
+ rb_raise(rb_eThreadError, "The first rb_internal_thread_specific_key_create() is called with multiple ractors");
+ }
+ else if (specific_key_count > RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX) {
+ rb_raise(rb_eThreadError, "rb_internal_thread_specific_key_create() is called more than %d times", RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX);
+ }
+ else {
+ rb_internal_thread_specific_key_t key = specific_key_count++;
+
+ if (key == 0) {
+ // allocate
+ rb_ractor_t *cr = GET_RACTOR();
+ rb_thread_t *th;
+
+ ccan_list_for_each(&cr->threads.set, th, lt_node) {
+ thread_specific_storage_alloc(th);
+ }
+ }
+ return key;
+ }
+}
+
+// async and native thread safe.
+void *
+rb_internal_thread_specific_get(VALUE thread_val, rb_internal_thread_specific_key_t key)
+{
+ rb_thread_t *th = DATA_PTR(thread_val);
+
+ VM_ASSERT(rb_thread_ptr(thread_val) == th);
+ VM_ASSERT(key < RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX);
+ VM_ASSERT(th->specific_storage);
+
+ return th->specific_storage[key];
+}
+
+// async and native thread safe.
+void
+rb_internal_thread_specific_set(VALUE thread_val, rb_internal_thread_specific_key_t key, void *data)
+{
+ rb_thread_t *th = DATA_PTR(thread_val);
+
+ VM_ASSERT(rb_thread_ptr(thread_val) == th);
+ VM_ASSERT(key < RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX);
+ VM_ASSERT(th->specific_storage);
+
+ th->specific_storage[key] = data;
+}
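
(Not part of the diff.) The final hunk adds a small thread-specific-storage API: a key is created once, per-thread pointer slots are allocated lazily, and rb_internal_thread_specific_get()/set() are documented as async and native-thread safe. A minimal usage sketch from the point of view of a C extension; the key name, payload struct, and the exact public header carrying the declarations are assumptions for illustration:

#include <stdlib.h>
#include "ruby/ruby.h"
#include "ruby/thread.h"   /* assumed location of the rb_internal_thread_specific_* declarations */

static rb_internal_thread_specific_key_t my_key;       /* hypothetical key */

struct my_per_thread_data { unsigned long events; };   /* hypothetical payload */

static void
attach_per_thread_data(VALUE thread)
{
    /* One slot per thread; the extension owns and frees the pointer, the VM does not. */
    if (!rb_internal_thread_specific_get(thread, my_key)) {
        struct my_per_thread_data *data = calloc(1, sizeof(*data));
        rb_internal_thread_specific_set(thread, my_key, data);
    }
}

void
Init_my_ext(void)
{
    /* Per the check in rb_internal_thread_specific_key_create(), the first key
     * must be created while only one Ractor exists. */
    my_key = rb_internal_thread_specific_key_create();
    attach_per_thread_data(rb_thread_current());
}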