Diffstat (limited to 'thread.c')
-rw-r--r--  thread.c  7039
1 file changed, 4514 insertions, 2525 deletions
diff --git a/thread.c b/thread.c
index 0ba41c7679..e66352c03f 100644
--- a/thread.c
+++ b/thread.c
@@ -20,6 +20,12 @@
model 3: Native Thread with fine grain lock
Using pthread and Ruby threads run concurrent or parallel.
+ model 4: M:N User:Native threads with Global VM lock
+ Combination of model 1 and 2
+
+ model 5: M:N User:Native thread with fine grain lock
+ Combination of model 1 and 3
+
------------------------------------------------------------------------
model 2:
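
For orientation: under models 2 and 4, Ruby code runs only while the Global
VM Lock (GVL) is held, so C extensions are expected to release it around
blocking work. A minimal sketch using the public rb_thread_call_without_gvl
API from ruby/thread.h (blocking_read and read_all are illustrative helpers,
not part of this diff):

    #include <ruby/thread.h>
    #include <unistd.h>

    struct read_args { int fd; char buf[4096]; ssize_t n; };

    /* Runs with the GVL released: must not touch Ruby objects or raise. */
    static void *
    blocking_read(void *p)
    {
        struct read_args *a = p;
        a->n = read(a->fd, a->buf, sizeof(a->buf)); /* may block; other threads keep running */
        return NULL;
    }

    static ssize_t
    read_all(int fd, struct read_args *a)
    {
        a->fd = fd;
        /* RUBY_UBF_IO asks the VM to interrupt the blocked I/O on Thread#kill etc. */
        rb_thread_call_without_gvl(blocking_read, a, RUBY_UBF_IO, NULL);
        return a->n;
    }
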
@@ -42,10 +48,59 @@
*/
+/*
+ * FD_SET, FD_CLR and FD_ISSET have a small sanity check when using glibc
+ * 2.15 or later with _FORTIFY_SOURCE > 0.
+ * However, the check is wrong. Even though Linux's select(2) supports
+ * large fd sizes (> FD_SETSIZE), it wrongly assumes fd is always less
+ * than FD_SETSIZE (i.e. 1024). When HAVE_RB_FD_INIT is enabled, the
+ * check misfires and makes the program abort. Therefore we need to
+ * disable FORTIFY_SOURCE until glibc fixes it.
+ */
+#undef _FORTIFY_SOURCE
+#undef __USE_FORTIFY_LEVEL
+#define __USE_FORTIFY_LEVEL 0
+
/* for model 2 */
+#include "ruby/internal/config.h"
+
+#ifdef __linux__
+// Normally, gcc(1) translates calls to alloca() with inlined code. This is
+// not done when either the -ansi, -std=c89, -std=c99, or the -std=c11 option
+// is given and the header <alloca.h> is not included.
+# include <alloca.h>
+#endif
+
+#define TH_SCHED(th) (&(th)->ractor->threads.sched)
+
#include "eval_intern.h"
-#include "gc.h"
+#include "hrtime.h"
+#include "internal.h"
+#include "internal/class.h"
+#include "internal/cont.h"
+#include "internal/error.h"
+#include "internal/eval.h"
+#include "internal/gc.h"
+#include "internal/hash.h"
+#include "internal/io.h"
+#include "internal/object.h"
+#include "internal/proc.h"
+#include "ruby/fiber/scheduler.h"
+#include "internal/signal.h"
+#include "internal/thread.h"
+#include "internal/time.h"
+#include "internal/warnings.h"
+#include "iseq.h"
+#include "ruby/debug.h"
+#include "ruby/io.h"
+#include "ruby/thread.h"
+#include "ruby/thread_native.h"
+#include "timev.h"
+#include "vm_core.h"
+#include "ractor_core.h"
+#include "vm_debug.h"
+#include "vm_sync.h"
+
+#include "ccan/list/list.h"
#ifndef USE_NATIVE_THREAD_PRIORITY
#define USE_NATIVE_THREAD_PRIORITY 0
@@ -53,733 +108,1165 @@
#define RUBY_THREAD_PRIORITY_MIN -3
#endif
-#ifndef THREAD_DEBUG
-#define THREAD_DEBUG 0
-#endif
-
-VALUE rb_cMutex;
-VALUE rb_cBarrier;
+static VALUE rb_cThreadShield;
+static VALUE cThGroup;
-static void sleep_timeval(rb_thread_t *th, struct timeval time);
-static void sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec);
-static void sleep_forever(rb_thread_t *th, int nodeadlock);
-static double timeofday(void);
-struct timeval rb_time_interval(VALUE);
-static int rb_threadptr_dead(rb_thread_t *th);
+static VALUE sym_immediate;
+static VALUE sym_on_blocking;
+static VALUE sym_never;
-static void rb_check_deadlock(rb_vm_t *vm);
+static uint32_t thread_default_quantum_ms = 100;
-int rb_signal_buff_size(void);
-void rb_signal_exec(rb_thread_t *th, int sig);
-void rb_disable_interrupt(void);
-void rb_thread_stop_timer_thread(void);
+#define THREAD_LOCAL_STORAGE_INITIALISED FL_USER13
+#define THREAD_LOCAL_STORAGE_INITIALISED_P(th) RB_FL_TEST_RAW((th), THREAD_LOCAL_STORAGE_INITIALISED)
-static const VALUE eKillSignal = INT2FIX(0);
-static const VALUE eTerminateSignal = INT2FIX(1);
-static volatile int system_working = 1;
-
-inline static void
-st_delete_wrap(st_table *table, st_data_t key)
+static inline VALUE
+rb_thread_local_storage(VALUE thread)
{
- st_delete(table, &key, 0);
+ if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) {
+ rb_ivar_set(thread, idLocals, rb_hash_new());
+ RB_FL_SET_RAW(thread, THREAD_LOCAL_STORAGE_INITIALISED);
+ }
+ return rb_ivar_get(thread, idLocals);
}
+enum SLEEP_FLAGS {
+ SLEEP_DEADLOCKABLE = 0x01,
+ SLEEP_SPURIOUS_CHECK = 0x02,
+ SLEEP_ALLOW_SPURIOUS = 0x04,
+ SLEEP_NO_CHECKINTS = 0x08,
+};
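
The flags are combined bitwise. For example, rb_thread_sleep_deadly further
down in this diff sleeps with both deadlock detection and spurious-wakeup
filtering enabled:

    sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE | SLEEP_SPURIOUS_CHECK);
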
+
+static void sleep_forever(rb_thread_t *th, unsigned int fl);
+static int sleep_hrtime(rb_thread_t *, rb_hrtime_t, unsigned int fl);
+
+static void rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker, VALUE timeout, rb_hrtime_t end);
+static int rb_threadptr_dead(rb_thread_t *th);
+static void rb_check_deadlock(rb_ractor_t *r);
+static int rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th);
+static const char *thread_status_name(rb_thread_t *th, int detail);
+static int hrtime_update_expire(rb_hrtime_t *, const rb_hrtime_t);
+NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
+MAYBE_UNUSED(static int consume_communication_pipe(int fd));
+
+static rb_atomic_t system_working = 1;
+static rb_internal_thread_specific_key_t specific_key_count;
+
/********************************************************************************/
#define THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION
struct rb_blocking_region_buffer {
enum rb_thread_status prev_status;
- struct rb_unblock_callback oldubf;
};
-static void set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg,
- struct rb_unblock_callback *old);
-static void reset_unblock_function(rb_thread_t *th, const struct rb_unblock_callback *old);
+static int unblock_function_set(rb_thread_t *th, rb_unblock_function_t *func, void *arg, int fail_if_interrupted);
+static void unblock_function_clear(rb_thread_t *th);
+static inline int blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region,
+ rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted);
static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region);
-#define RB_GC_SAVE_MACHINE_CONTEXT(th) \
- do { \
- rb_gc_save_machine_context(th); \
- SET_MACHINE_STACK_END(&(th)->machine_stack_end); \
- } while (0)
-
-#define GVL_UNLOCK_BEGIN() do { \
- rb_thread_t *_th_stored = GET_THREAD(); \
- RB_GC_SAVE_MACHINE_CONTEXT(_th_stored); \
- native_mutex_unlock(&_th_stored->vm->global_vm_lock)
+#define THREAD_BLOCKING_BEGIN(th) do { \
+ struct rb_thread_sched * const sched = TH_SCHED(th); \
+ RB_VM_SAVE_MACHINE_CONTEXT(th); \
+ thread_sched_to_waiting((sched), (th));
-#define GVL_UNLOCK_END() \
- native_mutex_lock(&_th_stored->vm->global_vm_lock); \
- rb_thread_set_current(_th_stored); \
+#define THREAD_BLOCKING_END(th) \
+ thread_sched_to_running((sched), (th)); \
+ rb_ractor_thread_switch(th->ractor, th, false); \
} while(0)
-#define BLOCKING_REGION_CORE(exec) do { \
- GVL_UNLOCK_BEGIN(); {\
- exec; \
- } \
- GVL_UNLOCK_END(); \
-} while(0);
-
-#define blocking_region_begin(th, region, func, arg) \
- do { \
- (region)->prev_status = (th)->status; \
- (th)->blocking_region_buffer = (region); \
- set_unblock_function((th), (func), (arg), &(region)->oldubf); \
- (th)->status = THREAD_STOPPED; \
- thread_debug("enter blocking region (%p)\n", (void *)(th)); \
- RB_GC_SAVE_MACHINE_CONTEXT(th); \
- native_mutex_unlock(&(th)->vm->global_vm_lock); \
- } while (0)
-
-#define BLOCKING_REGION(exec, ubf, ubfarg) do { \
- rb_thread_t *__th = GET_THREAD(); \
+#ifdef __GNUC__
+#ifdef HAVE_BUILTIN___BUILTIN_CHOOSE_EXPR_CONSTANT_P
+#define only_if_constant(expr, notconst) __builtin_choose_expr(__builtin_constant_p(expr), (expr), (notconst))
+#else
+#define only_if_constant(expr, notconst) (__builtin_constant_p(expr) ? (expr) : (notconst))
+#endif
+#else
+#define only_if_constant(expr, notconst) notconst
+#endif
+#define BLOCKING_REGION(th, exec, ubf, ubfarg, fail_if_interrupted) do { \
struct rb_blocking_region_buffer __region; \
- blocking_region_begin(__th, &__region, ubf, ubfarg); \
- exec; \
- blocking_region_end(__th, &__region); \
- RUBY_VM_CHECK_INTS(); \
+ if (blocking_region_begin(th, &__region, (ubf), (ubfarg), fail_if_interrupted) || \
+ /* always return true unless fail_if_interrupted */ \
+ !only_if_constant(fail_if_interrupted, TRUE)) { \
+ /* Important that this is inlined into the macro, and not part of \
+ * blocking_region_begin - see bug #20493 */ \
+ RB_VM_SAVE_MACHINE_CONTEXT(th); \
+ thread_sched_to_waiting(TH_SCHED(th), th); \
+ exec; \
+ blocking_region_end(th, &__region); \
+ }; \
} while(0)
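
To see the macro's moving parts together, here is a sketch of the call shape
rb_nogvl uses near the end of this diff (val, func, data1, data2, saved_errno
and flags are the caller's locals; a sketch, not a verbatim excerpt):

    BLOCKING_REGION(th, {
        val = func(data1);
        saved_errno = rb_errno();
    }, ubf, data2, flags & RB_NOGVL_INTR_FAIL);

The exec block only runs once blocking_region_begin has succeeded (or, when
fail_if_interrupted is a compile-time FALSE, unconditionally), after the
machine context is saved and the scheduler has marked this thread as waiting.
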
-#if THREAD_DEBUG
-#ifdef HAVE_VA_ARGS_MACRO
-void rb_thread_debug(const char *file, int line, const char *fmt, ...);
-#define thread_debug(fmt, ...) rb_thread_debug(__FILE__, __LINE__, fmt, ##__VA_ARGS__)
-#define POSITION_FORMAT "%s:%d:"
-#define POSITION_ARGS ,file, line
-#else
-void rb_thread_debug(const char *fmt, ...);
-#define thread_debug rb_thread_debug
-#define POSITION_FORMAT
-#define POSITION_ARGS
+/*
+ * returns true if this thread was spuriously interrupted, false otherwise
+ * (e.g. it was hit by Thread#run, or a Ruby-level Signal.trap handler ran)
+ */
+#define RUBY_VM_CHECK_INTS_BLOCKING(ec) vm_check_ints_blocking(ec)
+static inline int
+vm_check_ints_blocking(rb_execution_context_t *ec)
+{
+#ifdef RUBY_ASSERT_CRITICAL_SECTION
+ VM_ASSERT(ruby_assert_critical_section_entered == 0);
#endif
-# if THREAD_DEBUG < 0
-static int rb_thread_debug_enabled;
+ rb_thread_t *th = rb_ec_thread_ptr(ec);
-/*
- * call-seq:
- * Thread.DEBUG => num
- *
- * Returns the thread debug level. Available only if compiled with
- * THREAD_DEBUG=-1.
- */
+ if (LIKELY(rb_threadptr_pending_interrupt_empty_p(th))) {
+ if (LIKELY(!RUBY_VM_INTERRUPTED_ANY(ec))) return FALSE;
+ }
+ else {
+ th->pending_interrupt_queue_checked = 0;
+ RUBY_VM_SET_INTERRUPT(ec);
+ }
-static VALUE
-rb_thread_s_debug(void)
+ int result = rb_threadptr_execute_interrupts(th, 1);
+
+ // When a signal is received, we yield to the scheduler as soon as possible:
+ if (result || RUBY_VM_INTERRUPTED(ec)) {
+ VALUE scheduler = rb_fiber_scheduler_current_for_threadptr(th);
+ if (scheduler != Qnil) {
+ rb_fiber_scheduler_yield(scheduler);
+ }
+ }
+
+ return result;
+}
+
+int
+rb_vm_check_ints_blocking(rb_execution_context_t *ec)
{
- return INT2NUM(rb_thread_debug_enabled);
+ return vm_check_ints_blocking(ec);
}
/*
- * call-seq:
- * Thread.DEBUG = num
- *
- * Sets the thread debug level. Available only if compiled with
- * THREAD_DEBUG=-1.
+ * poll() is supported by many OSes, but so far Linux is the only
+ * one we know of that supports using poll() in all places select()
+ * would work.
*/
+#if defined(HAVE_POLL)
+# if defined(__linux__)
+# define USE_POLL
+# endif
+# if defined(__FreeBSD_version) && __FreeBSD_version >= 1100000
+# define USE_POLL
+ /* FreeBSD does not set POLLOUT when POLLHUP happens */
+# define POLLERR_SET (POLLHUP | POLLERR)
+# endif
+#endif
-static VALUE
-rb_thread_s_debug_set(VALUE self, VALUE val)
+static void
+timeout_prepare(rb_hrtime_t **to, rb_hrtime_t *rel, rb_hrtime_t *end,
+ const struct timeval *timeout)
{
- rb_thread_debug_enabled = RTEST(val) ? NUM2INT(val) : 0;
- return val;
+ if (timeout) {
+ *rel = rb_timeval2hrtime(timeout);
+ *end = rb_hrtime_add(rb_hrtime_now(), *rel);
+ *to = rel;
+ }
+ else {
+ *to = 0;
+ }
}
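
A small usage sketch (hypothetical caller; the select/poll wrappers later in
this file follow the same shape). A NULL timeval means wait forever; otherwise
*to points at the remaining relative time and end holds the absolute deadline:

    rb_hrtime_t rel, end, *to;
    struct timeval tv = { .tv_sec = 1, .tv_usec = 500000 }; /* 1.5 seconds */

    timeout_prepare(&to, &rel, &end, &tv);  /* to == &rel, end == now + 1.5s */
    timeout_prepare(&to, &rel, &end, NULL); /* to == NULL: no deadline */
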
-# else
-# define rb_thread_debug_enabled THREAD_DEBUG
-# endif
-#else
-#define thread_debug if(0)printf
-#endif
-#ifndef __ia64
-#define thread_start_func_2(th, st, rst) thread_start_func_2(th, st)
-#endif
-NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start,
- VALUE *register_stack_start));
-static void timer_thread_function(void *);
+MAYBE_UNUSED(NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start)));
+MAYBE_UNUSED(static bool th_has_dedicated_nt(const rb_thread_t *th));
+MAYBE_UNUSED(static int waitfd_to_waiting_flag(int wfd_event));
+
+#include THREAD_IMPL_SRC
-#if defined(_WIN32)
-#include "thread_win32.c"
+/*
+ * TODO: somebody with win32 knowledge should be able to get rid of
+ * timer-thread by busy-waiting on signals. And it should be possible
+ * to make the GVL in thread_pthread.c platform-independent.
+ */
+#ifndef BUSY_WAIT_SIGNALS
+# define BUSY_WAIT_SIGNALS (0)
+#endif
-#define DEBUG_OUT() \
- WaitForSingleObject(&debug_mutex, INFINITE); \
- printf(POSITION_FORMAT"%p - %s" POSITION_ARGS, GetCurrentThreadId(), buf); \
- fflush(stdout); \
- ReleaseMutex(&debug_mutex);
+#ifndef USE_EVENTFD
+# define USE_EVENTFD (0)
+#endif
-#elif defined(HAVE_PTHREAD_H)
-#include "thread_pthread.c"
+#include "thread_sync.c"
-#define DEBUG_OUT() \
- pthread_mutex_lock(&debug_mutex); \
- printf(POSITION_FORMAT"%#"PRIxVALUE" - %s" POSITION_ARGS, (VALUE)pthread_self(), buf); \
- fflush(stdout); \
- pthread_mutex_unlock(&debug_mutex);
+void
+rb_nativethread_lock_initialize(rb_nativethread_lock_t *lock)
+{
+ rb_native_mutex_initialize(lock);
+}
-#else
-#error "unsupported thread type"
-#endif
+void
+rb_nativethread_lock_destroy(rb_nativethread_lock_t *lock)
+{
+ rb_native_mutex_destroy(lock);
+}
-#if THREAD_DEBUG
-static int debug_mutex_initialized = 1;
-static rb_thread_lock_t debug_mutex;
+void
+rb_nativethread_lock_lock(rb_nativethread_lock_t *lock)
+{
+ rb_native_mutex_lock(lock);
+}
void
-rb_thread_debug(
-#ifdef HAVE_VA_ARGS_MACRO
- const char *file, int line,
-#endif
- const char *fmt, ...)
+rb_nativethread_lock_unlock(rb_nativethread_lock_t *lock)
{
- va_list args;
- char buf[BUFSIZ];
+ rb_native_mutex_unlock(lock);
+}
+
+static int
+unblock_function_set(rb_thread_t *th, rb_unblock_function_t *func, void *arg, int fail_if_interrupted)
+{
+ do {
+ if (fail_if_interrupted) {
+ if (RUBY_VM_INTERRUPTED_ANY(th->ec)) {
+ return FALSE;
+ }
+ }
+ else {
+ RUBY_VM_CHECK_INTS(th->ec);
+ }
- if (!rb_thread_debug_enabled) return;
+ rb_native_mutex_lock(&th->interrupt_lock);
+ } while (!th->ec->raised_flag && RUBY_VM_INTERRUPTED_ANY(th->ec) &&
+ (rb_native_mutex_unlock(&th->interrupt_lock), TRUE));
- if (debug_mutex_initialized == 1) {
- debug_mutex_initialized = 0;
- native_mutex_initialize(&debug_mutex);
- }
+ VM_ASSERT(th->unblock.func == NULL);
- va_start(args, fmt);
- vsnprintf(buf, BUFSIZ, fmt, args);
- va_end(args);
+ th->unblock.func = func;
+ th->unblock.arg = arg;
+ rb_native_mutex_unlock(&th->interrupt_lock);
- DEBUG_OUT();
+ return TRUE;
}
-#endif
+static void
+unblock_function_clear(rb_thread_t *th)
+{
+ rb_native_mutex_lock(&th->interrupt_lock);
+ th->unblock.func = 0;
+ rb_native_mutex_unlock(&th->interrupt_lock);
+}
static void
-set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg,
- struct rb_unblock_callback *old)
+threadptr_set_interrupt_locked(rb_thread_t *th, bool trap)
{
- check_ints:
- RUBY_VM_CHECK_INTS(); /* check signal or so */
- native_mutex_lock(&th->interrupt_lock);
- if (th->interrupt_flag) {
- native_mutex_unlock(&th->interrupt_lock);
- goto check_ints;
+ // th->interrupt_lock should be acquired here
+
+ RUBY_DEBUG_LOG("th:%u trap:%d", rb_th_serial(th), trap);
+
+ if (trap) {
+ RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
}
else {
- if (old) *old = th->unblock;
- th->unblock.func = func;
- th->unblock.arg = arg;
+ RUBY_VM_SET_INTERRUPT(th->ec);
+ }
+
+ if (th->unblock.func != NULL) {
+ (th->unblock.func)(th->unblock.arg);
+ }
+ else {
+ /* none */
}
- native_mutex_unlock(&th->interrupt_lock);
}
static void
-reset_unblock_function(rb_thread_t *th, const struct rb_unblock_callback *old)
+threadptr_set_interrupt(rb_thread_t *th, int trap)
{
- native_mutex_lock(&th->interrupt_lock);
- th->unblock = *old;
- native_mutex_unlock(&th->interrupt_lock);
+ rb_native_mutex_lock(&th->interrupt_lock);
+ {
+ threadptr_set_interrupt_locked(th, trap);
+ }
+ rb_native_mutex_unlock(&th->interrupt_lock);
}
-static void
+/* Set interrupt flag on another thread or current thread, and call its UBF if it has one set */
+void
rb_threadptr_interrupt(rb_thread_t *th)
{
- native_mutex_lock(&th->interrupt_lock);
- RUBY_VM_SET_INTERRUPT(th);
- if (th->unblock.func) {
- (th->unblock.func)(th->unblock.arg);
- }
- else {
- /* none */
- }
- native_mutex_unlock(&th->interrupt_lock);
+ RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
+ threadptr_set_interrupt(th, false);
}
+static void
+threadptr_trap_interrupt(rb_thread_t *th)
+{
+ threadptr_set_interrupt(th, true);
+}
-static int
-terminate_i(st_data_t key, st_data_t val, rb_thread_t *main_thread)
+static void
+terminate_all(rb_ractor_t *r, const rb_thread_t *main_thread)
{
- VALUE thval = key;
- rb_thread_t *th;
- GetThreadPtr(thval, th);
+ rb_thread_t *th = 0;
+
+ ccan_list_for_each(&r->threads.set, th, lt_node) {
+ if (th != main_thread) {
+ RUBY_DEBUG_LOG("terminate start th:%u status:%s", rb_th_serial(th), thread_status_name(th, TRUE));
+
+ rb_threadptr_pending_interrupt_enque(th, RUBY_FATAL_THREAD_TERMINATED);
+ rb_threadptr_interrupt(th);
- if (th != main_thread) {
- thread_debug("terminate_i: %p\n", (void *)th);
- rb_threadptr_interrupt(th);
- th->thrown_errinfo = eTerminateSignal;
- th->status = THREAD_TO_KILL;
+ RUBY_DEBUG_LOG("terminate done th:%u status:%s", rb_th_serial(th), thread_status_name(th, TRUE));
+ }
+ else {
+ RUBY_DEBUG_LOG("main thread th:%u", rb_th_serial(th));
+ }
}
- else {
- thread_debug("terminate_i: main thread (%p)\n", (void *)th);
+}
+
+static void
+rb_threadptr_join_list_wakeup(rb_thread_t *thread)
+{
+ while (thread->join_list) {
+ struct rb_waiting_list *join_list = thread->join_list;
+
+ // Consume the entry from the join list:
+ thread->join_list = join_list->next;
+
+ rb_thread_t *target_thread = join_list->thread;
+
+ if (target_thread->scheduler != Qnil && join_list->fiber) {
+ rb_fiber_scheduler_unblock(target_thread->scheduler, target_thread->self, rb_fiberptr_self(join_list->fiber));
+ }
+ else {
+ rb_threadptr_interrupt(target_thread);
+
+ switch (target_thread->status) {
+ case THREAD_STOPPED:
+ case THREAD_STOPPED_FOREVER:
+ target_thread->status = THREAD_RUNNABLE;
+ break;
+ default:
+ break;
+ }
+ }
}
- return ST_CONTINUE;
}
-typedef struct rb_mutex_struct
+void
+rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th)
{
- rb_thread_lock_t lock;
- rb_thread_cond_t cond;
- struct rb_thread_struct volatile *th;
- volatile int cond_waiting, cond_notified;
- struct rb_mutex_struct *next_mutex;
-} mutex_t;
+ while (th->keeping_mutexes) {
+ rb_mutex_t *mutex = th->keeping_mutexes;
+ th->keeping_mutexes = mutex->next_mutex;
-static void rb_mutex_unlock_all(mutex_t *mutex, rb_thread_t *th);
-static void rb_mutex_abandon_all(mutex_t *mutexes);
+ // rb_warn("mutex #<%p> was not unlocked by thread #<%p>", (void *)mutex, (void*)th);
+ VM_ASSERT(mutex->ec_serial);
+ const char *error_message = rb_mutex_unlock_th(mutex, th, 0);
+ if (error_message) rb_bug("invalid keeping_mutexes: %s", error_message);
+ }
+}
void
-rb_thread_terminate_all(void)
+rb_thread_terminate_all(rb_thread_t *th)
{
- rb_thread_t *th = GET_THREAD(); /* main thread */
- rb_vm_t *vm = th->vm;
- VALUE einfo = vm->main_thread->errinfo;
- int state;
+ rb_ractor_t *cr = th->ractor;
+ rb_execution_context_t * volatile ec = th->ec;
+ volatile int sleeping = 0;
- if (vm->main_thread != th) {
- rb_bug("rb_thread_terminate_all: called by child thread (%p, %p)",
- (void *)vm->main_thread, (void *)th);
+ if (cr->threads.main != th) {
+ rb_bug("rb_thread_terminate_all: called by child thread (%p, %p)",
+ (void *)cr->threads.main, (void *)th);
}
/* unlock all locking mutexes */
- if (th->keeping_mutexes) {
- rb_mutex_unlock_all(th->keeping_mutexes, GET_THREAD());
+ rb_threadptr_unlock_all_locking_mutexes(th);
+
+ EC_PUSH_TAG(ec);
+ if (EC_EXEC_TAG() == TAG_NONE) {
+ retry:
+ RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
+
+ terminate_all(cr, th);
+
+ while (rb_ractor_living_thread_num(cr) > 1) {
+ rb_hrtime_t rel = RB_HRTIME_PER_SEC;
+ /*
+ * The thread-exit routine in thread_start_func_2 notifies
+ * me when the last sub-thread exits.
+ */
+ sleeping = 1;
+ native_sleep(th, &rel);
+ RUBY_VM_CHECK_INTS_BLOCKING(ec);
+ sleeping = 0;
+ }
}
-
- thread_debug("rb_thread_terminate_all (main thread: %p)\n", (void *)th);
- st_foreach(vm->living_threads, terminate_i, (st_data_t)th);
-
- while (!rb_thread_alone()) {
- PUSH_TAG();
- if ((state = EXEC_TAG()) == 0) {
- rb_thread_schedule();
- }
- POP_TAG();
- if (state && einfo != vm->main_thread->errinfo &&
- RUBY_VM_SET_INTERRUPT(vm->main_thread)) {
- break;
- }
+ else {
+ /*
+ * When caught an exception (e.g. Ctrl+C), let's broadcast
+ * kill request again to ensure killing all threads even
+ * if they are blocked on sleep, mutex, etc.
+ */
+ if (sleeping) {
+ sleeping = 0;
+ goto retry;
+ }
}
- rb_thread_stop_timer_thread();
+ EC_POP_TAG();
}
+void rb_threadptr_root_fiber_terminate(rb_thread_t *th);
+static void threadptr_interrupt_exec_cleanup(rb_thread_t *th);
+
static void
thread_cleanup_func_before_exec(void *th_ptr)
{
rb_thread_t *th = th_ptr;
th->status = THREAD_KILLED;
- th->machine_stack_start = th->machine_stack_end = 0;
-#ifdef __ia64
- th->machine_register_stack_start = th->machine_register_stack_end = 0;
-#endif
+
+ // The thread stack doesn't exist in the forked process:
+ th->ec->machine.stack_start = th->ec->machine.stack_end = NULL;
+
+ threadptr_interrupt_exec_cleanup(th);
+ rb_threadptr_root_fiber_terminate(th);
}
static void
-thread_cleanup_func(void *th_ptr)
+thread_cleanup_func(void *th_ptr, int atfork)
{
rb_thread_t *th = th_ptr;
- /* unlock all locking mutexes */
- if (th->keeping_mutexes) {
- rb_mutex_unlock_all(th->keeping_mutexes, th);
- th->keeping_mutexes = NULL;
- }
th->locking_mutex = Qfalse;
thread_cleanup_func_before_exec(th_ptr);
- native_thread_destroy(th);
+
+ if (atfork) {
+ native_thread_destroy_atfork(th->nt);
+ th->nt = NULL;
+ return;
+ }
+
+ rb_native_mutex_destroy(&th->interrupt_lock);
+}
+
+void
+rb_thread_free_native_thread(void *th_ptr)
+{
+ rb_thread_t *th = th_ptr;
+
+ native_thread_destroy_atfork(th->nt);
+ th->nt = NULL;
}
-extern void ruby_error_print(void);
static VALUE rb_threadptr_raise(rb_thread_t *, int, VALUE *);
-void rb_thread_recycle_stack_release(VALUE *);
+static VALUE rb_thread_to_s(VALUE thread);
void
-ruby_thread_init_stack(rb_thread_t *th)
+ruby_thread_init_stack(rb_thread_t *th, void *local_in_parent_frame)
{
- native_thread_init_stack(th);
+ native_thread_init_stack(th, local_in_parent_frame);
}
-static int
-thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_start)
+const VALUE *
+rb_vm_proc_local_ep(VALUE proc)
+{
+ const VALUE *ep = vm_proc_ep(proc);
+
+ if (ep) {
+ return rb_vm_ep_local_ep(ep);
+ }
+ else {
+ return NULL;
+ }
+}
+
+// for ractor, defined in vm.c
+VALUE rb_vm_invoke_proc_with_self(rb_execution_context_t *ec, rb_proc_t *proc, VALUE self,
+ int argc, const VALUE *argv, int kw_splat, VALUE passed_block_handler);
+
+static VALUE
+thread_do_start_proc(rb_thread_t *th)
{
- int state;
- VALUE args = th->first_args;
+ VALUE args = th->invoke_arg.proc.args;
+ const VALUE *args_ptr;
+ int args_len;
+ VALUE procval = th->invoke_arg.proc.proc;
rb_proc_t *proc;
- rb_thread_t *join_th;
- rb_thread_t *main_th;
+ GetProcPtr(procval, proc);
+
+ th->ec->errinfo = Qnil;
+ th->ec->root_lep = rb_vm_proc_local_ep(procval);
+ th->ec->root_svar = Qfalse;
+
+ vm_check_ints_blocking(th->ec);
+
+ if (th->invoke_type == thread_invoke_type_ractor_proc) {
+ VALUE self = rb_ractor_self(th->ractor);
+ th->thgroup = th->ractor->thgroup_default = rb_obj_alloc(cThGroup);
+
+ VM_ASSERT(FIXNUM_P(args));
+ args_len = FIX2INT(args);
+ args_ptr = ALLOCA_N(VALUE, args_len);
+ rb_ractor_receive_parameters(th->ec, th->ractor, args_len, (VALUE *)args_ptr);
+ vm_check_ints_blocking(th->ec);
+
+ return rb_vm_invoke_proc_with_self(
+ th->ec, proc, self,
+ args_len, args_ptr,
+ th->invoke_arg.proc.kw_splat,
+ VM_BLOCK_HANDLER_NONE
+ );
+ }
+ else {
+ args_len = RARRAY_LENINT(args);
+ if (args_len < 8) {
+ /* free proc.args if the length is small enough */
+ args_ptr = ALLOCA_N(VALUE, args_len);
+ MEMCPY((VALUE *)args_ptr, RARRAY_CONST_PTR(args), VALUE, args_len);
+ th->invoke_arg.proc.args = Qnil;
+ }
+ else {
+ args_ptr = RARRAY_CONST_PTR(args);
+ }
+
+ vm_check_ints_blocking(th->ec);
+
+ return rb_vm_invoke_proc(
+ th->ec, proc,
+ args_len, args_ptr,
+ th->invoke_arg.proc.kw_splat,
+ VM_BLOCK_HANDLER_NONE
+ );
+ }
+}
+
+static VALUE
+thread_do_start(rb_thread_t *th)
+{
+ native_set_thread_name(th);
+ VALUE result = Qundef;
+
+ switch (th->invoke_type) {
+ case thread_invoke_type_proc:
+ result = thread_do_start_proc(th);
+ break;
+
+ case thread_invoke_type_ractor_proc:
+ result = thread_do_start_proc(th);
+ rb_ractor_atexit(th->ec, result);
+ break;
+
+ case thread_invoke_type_func:
+ result = (*th->invoke_arg.func.func)(th->invoke_arg.func.arg);
+ break;
+
+ case thread_invoke_type_none:
+ rb_bug("unreachable");
+ }
+
+ return result;
+}
+
+void rb_ec_clear_current_thread_trace_func(const rb_execution_context_t *ec);
+
+static int
+thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
+{
+ RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
+ VM_ASSERT(th != th->vm->ractor.main_thread);
+
+ enum ruby_tag_type state;
VALUE errinfo = Qnil;
+ rb_thread_t *ractor_main_th = th->ractor->threads.main;
+
+ // setup ractor
+ if (rb_ractor_status_p(th->ractor, ractor_blocking)) {
+ RB_VM_LOCK();
+ {
+ rb_vm_ractor_blocking_cnt_dec(th->vm, th->ractor, __FILE__, __LINE__);
+ rb_ractor_t *r = th->ractor;
+ r->r_stdin = rb_io_prep_stdin();
+ r->r_stdout = rb_io_prep_stdout();
+ r->r_stderr = rb_io_prep_stderr();
+ }
+ RB_VM_UNLOCK();
+ }
- ruby_thread_set_native(th);
+ // Ensure that we are not joinable.
+ VM_ASSERT(UNDEF_P(th->value));
- th->machine_stack_start = stack_start;
-#ifdef __ia64
- th->machine_register_stack_start = register_stack_start;
-#endif
- thread_debug("thread start: %p\n", (void *)th);
+ int fiber_scheduler_closed = 0, event_thread_end_hooked = 0;
+ VALUE result = Qundef;
- native_mutex_lock(&th->vm->global_vm_lock);
- {
- thread_debug("thread start (get lock): %p\n", (void *)th);
- rb_thread_set_current(th);
-
- TH_PUSH_TAG(th);
- if ((state = EXEC_TAG()) == 0) {
- SAVE_ROOT_JMPBUF(th, {
- if (!th->first_func) {
- GetProcPtr(th->first_proc, proc);
- th->errinfo = Qnil;
- th->local_lfp = proc->block.lfp;
- th->local_svar = Qnil;
- th->value = rb_vm_invoke_proc(th, proc, proc->block.self,
- (int)RARRAY_LEN(args), RARRAY_PTR(args), 0);
- }
- else {
- th->value = (*th->first_func)((void *)args);
- }
- });
- }
- else {
- errinfo = th->errinfo;
- if (NIL_P(errinfo)) errinfo = rb_errinfo();
- if (state == TAG_FATAL) {
- /* fatal error within this thread, need to stop whole script */
- }
- else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) {
- if (th->safe_level >= 4) {
- th->errinfo = rb_exc_new3(rb_eSecurityError,
- rb_sprintf("Insecure exit at level %d", th->safe_level));
- errinfo = Qnil;
- }
- }
- else if (th->safe_level < 4 &&
- (th->vm->thread_abort_on_exception ||
- th->abort_on_exception || RTEST(ruby_debug))) {
- /* exit on main_thread */
- }
- else {
- errinfo = Qnil;
- }
- th->value = Qnil;
- }
-
- th->status = THREAD_KILLED;
- thread_debug("thread end: %p\n", (void *)th);
-
- main_th = th->vm->main_thread;
- if (th != main_th) {
- if (TYPE(errinfo) == T_OBJECT) {
- /* treat with normal error object */
- rb_threadptr_raise(main_th, 1, &errinfo);
- }
- }
- TH_POP_TAG();
-
- /* locking_mutex must be Qfalse */
- if (th->locking_mutex != Qfalse) {
- rb_bug("thread_start_func_2: locking_mutex must not be set (%p:%"PRIxVALUE")",
- (void *)th, th->locking_mutex);
- }
-
- /* delete self other than main thread from living_threads */
- if (th != main_th) {
- st_delete_wrap(th->vm->living_threads, th->self);
- }
-
- /* wake up joining threads */
- join_th = th->join_list_head;
- while (join_th) {
- if (join_th == main_th) errinfo = Qnil;
- rb_threadptr_interrupt(join_th);
- switch (join_th->status) {
- case THREAD_STOPPED: case THREAD_STOPPED_FOREVER:
- join_th->status = THREAD_RUNNABLE;
- default: break;
- }
- join_th = join_th->join_list_next;
- }
-
- if (!th->root_fiber) {
- rb_thread_recycle_stack_release(th->stack);
- th->stack = 0;
- }
- }
- thread_cleanup_func(th);
- if (th != main_th) rb_check_deadlock(th->vm);
- if (th->vm->main_thread == th) {
- ruby_cleanup(state);
- }
- native_mutex_unlock(&th->vm->global_vm_lock);
+ EC_PUSH_TAG(th->ec);
+
+ if ((state = EC_EXEC_TAG()) == TAG_NONE) {
+ EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef);
+
+ result = thread_do_start(th);
+ }
+
+ if (!fiber_scheduler_closed) {
+ fiber_scheduler_closed = 1;
+ rb_fiber_scheduler_set(Qnil);
+ }
+
+ if (!event_thread_end_hooked) {
+ event_thread_end_hooked = 1;
+ EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef);
+ }
+
+ if (state == TAG_NONE) {
+ // This must be set AFTER doing all user-level code. At this point, the thread is effectively finished and calls to `Thread#join` will succeed.
+ th->value = result;
+ }
+ else {
+ errinfo = th->ec->errinfo;
+
+ VALUE exc = rb_vm_make_jump_tag_but_local_jump(state, Qundef);
+ if (!NIL_P(exc)) errinfo = exc;
+
+ if (state == TAG_FATAL) {
+ if (th->invoke_type == thread_invoke_type_ractor_proc) {
+ rb_ractor_atexit(th->ec, Qnil);
+ }
+ /* fatal error within this thread, need to stop whole script */
+ }
+ else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) {
+ if (th->invoke_type == thread_invoke_type_ractor_proc) {
+ rb_ractor_atexit_exception(th->ec);
+ }
+
+ /* exit on main_thread. */
+ }
+ else {
+ if (th->report_on_exception) {
+ VALUE mesg = rb_thread_to_s(th->self);
+ rb_str_cat_cstr(mesg, " terminated with exception (report_on_exception is true):\n");
+ rb_write_error_str(mesg);
+ rb_ec_error_print(th->ec, errinfo);
+ }
+
+ if (th->invoke_type == thread_invoke_type_ractor_proc) {
+ rb_ractor_atexit_exception(th->ec);
+ }
+
+ if (th->vm->thread_abort_on_exception ||
+ th->abort_on_exception || RTEST(ruby_debug)) {
+ /* exit on main_thread */
+ }
+ else {
+ errinfo = Qnil;
+ }
+ }
+ th->value = Qnil;
+ }
+
+ // The thread is effectively finished and can be joined.
+ VM_ASSERT(!UNDEF_P(th->value));
+
+ rb_threadptr_join_list_wakeup(th);
+ rb_threadptr_unlock_all_locking_mutexes(th);
+
+ if (th->invoke_type == thread_invoke_type_ractor_proc) {
+ rb_thread_terminate_all(th);
+ rb_ractor_teardown(th->ec);
+ }
+
+ th->status = THREAD_KILLED;
+ RUBY_DEBUG_LOG("killed th:%u", rb_th_serial(th));
+
+ if (th->vm->ractor.main_thread == th) {
+ ruby_stop(0);
+ }
+
+ if (RB_TYPE_P(errinfo, T_OBJECT)) {
+ /* treat with normal error object */
+ rb_threadptr_raise(ractor_main_th, 1, &errinfo);
+ }
+
+ EC_POP_TAG();
+
+ rb_ec_clear_current_thread_trace_func(th->ec);
+
+ /* locking_mutex must be Qfalse */
+ if (th->locking_mutex != Qfalse) {
+ rb_bug("thread_start_func_2: locking_mutex must not be set (%p:%"PRIxVALUE")",
+ (void *)th, th->locking_mutex);
+ }
+
+ if (ractor_main_th->status == THREAD_KILLED &&
+ th->ractor->threads.cnt <= 2 /* main thread and this thread */) {
+ /* I'm the last thread; wake the main thread sleeping in rb_thread_terminate_all */
+ rb_threadptr_interrupt(ractor_main_th);
+ }
+
+ rb_check_deadlock(th->ractor);
+
+ rb_fiber_close(th->ec->fiber_ptr);
+
+ thread_cleanup_func(th, FALSE);
+ VM_ASSERT(th->ec->vm_stack == NULL);
+
+ if (th->invoke_type == thread_invoke_type_ractor_proc) {
+ // after rb_ractor_living_threads_remove()
+ // GC will happen anytime and this ractor can be collected (and destroy GVL).
+ // So gvl_release() should be before it.
+ thread_sched_to_dead(TH_SCHED(th), th);
+ rb_ractor_living_threads_remove(th->ractor, th);
+ }
+ else {
+ rb_ractor_living_threads_remove(th->ractor, th);
+ thread_sched_to_dead(TH_SCHED(th), th);
+ }
return 0;
}
+struct thread_create_params {
+ enum thread_invoke_type type;
+
+ // for normal proc thread
+ VALUE args;
+ VALUE proc;
+
+ // for ractor
+ rb_ractor_t *g;
+
+ // for func
+ VALUE (*fn)(void *);
+};
+
+static void thread_specific_storage_alloc(rb_thread_t *th);
+
static VALUE
-thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS))
+thread_create_core(VALUE thval, struct thread_create_params *params)
{
- rb_thread_t *th;
+ rb_execution_context_t *ec = GET_EC();
+ rb_thread_t *th = rb_thread_ptr(thval), *current_th = rb_ec_thread_ptr(ec);
int err;
- if (OBJ_FROZEN(GET_THREAD()->thgroup)) {
- rb_raise(rb_eThreadError,
- "can't start a new thread (frozen ThreadGroup)");
+ thread_specific_storage_alloc(th);
+
+ if (OBJ_FROZEN(current_th->thgroup)) {
+ rb_raise(rb_eThreadError,
+ "can't start a new thread (frozen ThreadGroup)");
+ }
+
+ rb_fiber_inherit_storage(ec, th->ec->fiber_ptr);
+
+ switch (params->type) {
+ case thread_invoke_type_proc:
+ th->invoke_type = thread_invoke_type_proc;
+ th->invoke_arg.proc.args = params->args;
+ th->invoke_arg.proc.proc = params->proc;
+ th->invoke_arg.proc.kw_splat = rb_keyword_given_p();
+ break;
+
+ case thread_invoke_type_ractor_proc:
+#if RACTOR_CHECK_MODE > 0
+ rb_ractor_setup_belonging_to(thval, rb_ractor_id(params->g));
+#endif
+ th->invoke_type = thread_invoke_type_ractor_proc;
+ th->ractor = params->g;
+ th->ec->ractor_id = rb_ractor_id(th->ractor);
+ th->ractor->threads.main = th;
+ th->invoke_arg.proc.proc = rb_proc_isolate_bang(params->proc, Qnil);
+ th->invoke_arg.proc.args = INT2FIX(RARRAY_LENINT(params->args));
+ th->invoke_arg.proc.kw_splat = rb_keyword_given_p();
+ rb_ractor_send_parameters(ec, params->g, params->args);
+ break;
+
+ case thread_invoke_type_func:
+ th->invoke_type = thread_invoke_type_func;
+ th->invoke_arg.func.func = params->fn;
+ th->invoke_arg.func.arg = (void *)params->args;
+ break;
+
+ default:
+ rb_bug("unreachable");
}
- GetThreadPtr(thval, th);
- /* setup thread environment */
- th->first_func = fn;
- th->first_proc = fn ? Qfalse : rb_block_proc();
- th->first_args = args; /* GC: shouldn't put before above line */
+ th->priority = current_th->priority;
+ th->thgroup = current_th->thgroup;
- th->priority = GET_THREAD()->priority;
- th->thgroup = GET_THREAD()->thgroup;
+ th->pending_interrupt_queue = rb_ary_hidden_new(0);
+ th->pending_interrupt_queue_checked = 0;
+ th->pending_interrupt_mask_stack = rb_ary_dup(current_th->pending_interrupt_mask_stack);
+ RBASIC_CLEAR_CLASS(th->pending_interrupt_mask_stack);
+
+ rb_native_mutex_initialize(&th->interrupt_lock);
+
+ RUBY_DEBUG_LOG("r:%u th:%u", rb_ractor_id(th->ractor), rb_th_serial(th));
+
+ rb_ractor_living_threads_insert(th->ractor, th);
- native_mutex_initialize(&th->interrupt_lock);
/* kick thread */
- st_insert(th->vm->living_threads, thval, (st_data_t) th->thread_id);
err = native_thread_create(th);
if (err) {
- st_delete_wrap(th->vm->living_threads, th->self);
- th->status = THREAD_KILLED;
- rb_raise(rb_eThreadError, "can't create Thread (%d)", err);
+ th->status = THREAD_KILLED;
+ rb_ractor_living_threads_remove(th->ractor, th);
+ rb_raise(rb_eThreadError, "can't create Thread: %s", strerror(err));
}
return thval;
}
-/* :nodoc: */
+#define threadptr_initialized(th) ((th)->invoke_type != thread_invoke_type_none)
+
+/*
+ * call-seq:
+ * Thread.new { ... } -> thread
+ * Thread.new(*args, &proc) -> thread
+ * Thread.new(*args) { |args| ... } -> thread
+ *
+ * Creates a new thread executing the given block.
+ *
+ * Any +args+ given to ::new will be passed to the block:
+ *
+ * arr = []
+ * a, b, c = 1, 2, 3
+ * Thread.new(a,b,c) { |d,e,f| arr << d << e << f }.join
+ * arr #=> [1, 2, 3]
+ *
+ * A ThreadError exception is raised if ::new is called without a block.
+ *
+ * If you're going to subclass Thread, be sure to call super in your
+ * +initialize+ method, otherwise a ThreadError will be raised.
+ */
static VALUE
thread_s_new(int argc, VALUE *argv, VALUE klass)
{
rb_thread_t *th;
VALUE thread = rb_thread_alloc(klass);
- rb_obj_call_init(thread, argc, argv);
- GetThreadPtr(thread, th);
- if (!th->first_args) {
- rb_raise(rb_eThreadError, "uninitialized thread - check `%s#initialize'",
- rb_class2name(klass));
+
+ if (GET_RACTOR()->threads.main->status == THREAD_KILLED) {
+ rb_raise(rb_eThreadError, "can't alloc thread");
+ }
+
+ rb_obj_call_init_kw(thread, argc, argv, RB_PASS_CALLED_KEYWORDS);
+ th = rb_thread_ptr(thread);
+ if (!threadptr_initialized(th)) {
+ rb_raise(rb_eThreadError, "uninitialized thread - check '%"PRIsVALUE"#initialize'",
+ klass);
}
return thread;
}
/*
* call-seq:
- * Thread.start([args]*) {|args| block } => thread
- * Thread.fork([args]*) {|args| block } => thread
+ * Thread.start([args]*) {|args| block } -> thread
+ * Thread.fork([args]*) {|args| block } -> thread
*
- * Basically the same as <code>Thread::new</code>. However, if class
- * <code>Thread</code> is subclassed, then calling <code>start</code> in that
- * subclass will not invoke the subclass's <code>initialize</code> method.
+ * Basically the same as ::new. However, if class Thread is subclassed, then
+ * calling +start+ in that subclass will not invoke the subclass's
+ * +initialize+ method.
*/
static VALUE
thread_start(VALUE klass, VALUE args)
{
- return thread_create_core(rb_thread_alloc(klass), args, 0);
+ struct thread_create_params params = {
+ .type = thread_invoke_type_proc,
+ .args = args,
+ .proc = rb_block_proc(),
+ };
+ return thread_create_core(rb_thread_alloc(klass), &params);
+}
+
+static VALUE
+threadptr_invoke_proc_location(rb_thread_t *th)
+{
+ if (th->invoke_type == thread_invoke_type_proc) {
+ return rb_proc_location(th->invoke_arg.proc.proc);
+ }
+ else {
+ return Qnil;
+ }
}
/* :nodoc: */
static VALUE
thread_initialize(VALUE thread, VALUE args)
{
- rb_thread_t *th;
+ rb_thread_t *th = rb_thread_ptr(thread);
+
if (!rb_block_given_p()) {
- rb_raise(rb_eThreadError, "must be called with a block");
- }
- GetThreadPtr(thread, th);
- if (th->first_args) {
- VALUE rb_proc_location(VALUE self);
- VALUE proc = th->first_proc, line, loc;
- const char *file;
- if (!proc || !RTEST(loc = rb_proc_location(proc))) {
+ rb_raise(rb_eThreadError, "must be called with a block");
+ }
+ else if (th->invoke_type != thread_invoke_type_none) {
+ VALUE loc = threadptr_invoke_proc_location(th);
+ if (!NIL_P(loc)) {
+ rb_raise(rb_eThreadError,
+ "already initialized thread - %"PRIsVALUE":%"PRIsVALUE,
+ RARRAY_AREF(loc, 0), RARRAY_AREF(loc, 1));
+ }
+ else {
rb_raise(rb_eThreadError, "already initialized thread");
}
- file = RSTRING_PTR(RARRAY_PTR(loc)[0]);
- if (NIL_P(line = RARRAY_PTR(loc)[1])) {
- rb_raise(rb_eThreadError, "already initialized thread - %s",
- file);
- }
- rb_raise(rb_eThreadError, "already initialized thread - %s:%d",
- file, NUM2INT(line));
}
- return thread_create_core(thread, args, 0);
+ else {
+ struct thread_create_params params = {
+ .type = thread_invoke_type_proc,
+ .args = args,
+ .proc = rb_block_proc(),
+ };
+ return thread_create_core(thread, &params);
+ }
}
VALUE
-rb_thread_create(VALUE (*fn)(ANYARGS), void *arg)
+rb_thread_create(VALUE (*fn)(void *), void *arg)
{
- return thread_create_core(rb_thread_alloc(rb_cThread), (VALUE)arg, fn);
+ struct thread_create_params params = {
+ .type = thread_invoke_type_func,
+ .fn = fn,
+ .args = (VALUE)arg,
+ };
+ return thread_create_core(rb_thread_alloc(rb_cThread), &params);
}
+VALUE
+rb_thread_create_ractor(rb_ractor_t *r, VALUE args, VALUE proc)
+{
+ struct thread_create_params params = {
+ .type = thread_invoke_type_ractor_proc,
+ .g = r,
+ .args = args,
+ .proc = proc,
+ };
+ return thread_create_core(rb_thread_alloc(rb_cThread), &params);
+}
-/* +infty, for this purpose */
-#define DELAY_INFTY 1E30
struct join_arg {
- rb_thread_t *target, *waiting;
- double limit;
- int forever;
+ struct rb_waiting_list *waiter;
+ rb_thread_t *target;
+ VALUE timeout;
+ rb_hrtime_t *limit;
};
static VALUE
remove_from_join_list(VALUE arg)
{
struct join_arg *p = (struct join_arg *)arg;
- rb_thread_t *target_th = p->target, *th = p->waiting;
+ rb_thread_t *target_thread = p->target;
- if (target_th->status != THREAD_KILLED) {
- rb_thread_t **pth = &target_th->join_list_head;
+ if (target_thread->status != THREAD_KILLED) {
+ struct rb_waiting_list **join_list = &target_thread->join_list;
+
+ while (*join_list) {
+ if (*join_list == p->waiter) {
+ *join_list = (*join_list)->next;
+ break;
+ }
- while (*pth) {
- if (*pth == th) {
- *pth = th->join_list_next;
- break;
- }
- pth = &(*pth)->join_list_next;
- }
+ join_list = &(*join_list)->next;
+ }
}
return Qnil;
}
+static int
+thread_finished(rb_thread_t *th)
+{
+ return th->status == THREAD_KILLED || !UNDEF_P(th->value);
+}
+
static VALUE
thread_join_sleep(VALUE arg)
{
struct join_arg *p = (struct join_arg *)arg;
- rb_thread_t *target_th = p->target, *th = p->waiting;
- double now, limit = p->limit;
-
- while (target_th->status != THREAD_KILLED) {
- if (p->forever) {
- sleep_forever(th, 1);
- }
- else {
- now = timeofday();
- if (now > limit) {
- thread_debug("thread_join: timeout (thid: %p)\n",
- (void *)target_th->thread_id);
- return Qfalse;
- }
- sleep_wait_for_interrupt(th, limit - now);
- }
- thread_debug("thread_join: interrupted (thid: %p)\n",
- (void *)target_th->thread_id);
+ rb_thread_t *target_th = p->target, *th = p->waiter->thread;
+ rb_hrtime_t end = 0, *limit = p->limit;
+
+ if (limit) {
+ end = rb_hrtime_add(*limit, rb_hrtime_now());
}
+
+ while (!thread_finished(target_th)) {
+ VALUE scheduler = rb_fiber_scheduler_current_for_threadptr(th);
+
+ if (!limit) {
+ if (scheduler != Qnil) {
+ rb_fiber_scheduler_block(scheduler, target_th->self, Qnil);
+ }
+ else {
+ sleep_forever(th, SLEEP_DEADLOCKABLE | SLEEP_ALLOW_SPURIOUS | SLEEP_NO_CHECKINTS);
+ }
+ }
+ else {
+ if (hrtime_update_expire(limit, end)) {
+ RUBY_DEBUG_LOG("timeout target_th:%u", rb_th_serial(target_th));
+ return Qfalse;
+ }
+
+ if (scheduler != Qnil) {
+ VALUE timeout = rb_float_new(hrtime2double(*limit));
+ rb_fiber_scheduler_block(scheduler, target_th->self, timeout);
+ }
+ else {
+ th->status = THREAD_STOPPED;
+ native_sleep(th, limit);
+ }
+ }
+ RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
+ th->status = THREAD_RUNNABLE;
+
+ RUBY_DEBUG_LOG("interrupted target_th:%u status:%s", rb_th_serial(target_th), thread_status_name(target_th, TRUE));
+ }
+
return Qtrue;
}
static VALUE
-thread_join(rb_thread_t *target_th, double delay)
+thread_join(rb_thread_t *target_th, VALUE timeout, rb_hrtime_t *limit)
{
- rb_thread_t *th = GET_THREAD();
- struct join_arg arg;
+ rb_execution_context_t *ec = GET_EC();
+ rb_thread_t *th = ec->thread_ptr;
+ rb_fiber_t *fiber = ec->fiber_ptr;
- arg.target = target_th;
- arg.waiting = th;
- arg.limit = timeofday() + delay;
- arg.forever = delay == DELAY_INFTY;
+ if (th == target_th) {
+ rb_raise(rb_eThreadError, "Target thread must not be current thread");
+ }
- thread_debug("thread_join (thid: %p)\n", (void *)target_th->thread_id);
+ if (th->ractor->threads.main == target_th) {
+ rb_raise(rb_eThreadError, "Target thread must not be main thread");
+ }
+
+ RUBY_DEBUG_LOG("target_th:%u status:%s", rb_th_serial(target_th), thread_status_name(target_th, TRUE));
if (target_th->status != THREAD_KILLED) {
- th->join_list_next = target_th->join_list_head;
- target_th->join_list_head = th;
- if (!rb_ensure(thread_join_sleep, (VALUE)&arg,
- remove_from_join_list, (VALUE)&arg)) {
- return Qnil;
- }
- }
-
- thread_debug("thread_join: success (thid: %p)\n",
- (void *)target_th->thread_id);
-
- if (target_th->errinfo != Qnil) {
- VALUE err = target_th->errinfo;
-
- if (FIXNUM_P(err)) {
- /* */
- }
- else if (TYPE(target_th->errinfo) == T_NODE) {
- rb_exc_raise(rb_vm_make_jump_tag_but_local_jump(
- GET_THROWOBJ_STATE(err), GET_THROWOBJ_VAL(err)));
- }
- else {
- /* normal exception */
- rb_exc_raise(err);
- }
+ struct rb_waiting_list waiter;
+ waiter.next = target_th->join_list;
+ waiter.thread = th;
+ waiter.fiber = rb_fiberptr_blocking(fiber) ? NULL : fiber;
+ target_th->join_list = &waiter;
+
+ struct join_arg arg;
+ arg.waiter = &waiter;
+ arg.target = target_th;
+ arg.timeout = timeout;
+ arg.limit = limit;
+
+ if (!rb_ensure(thread_join_sleep, (VALUE)&arg, remove_from_join_list, (VALUE)&arg)) {
+ return Qnil;
+ }
+ }
+
+ RUBY_DEBUG_LOG("success target_th:%u status:%s", rb_th_serial(target_th), thread_status_name(target_th, TRUE));
+
+ if (target_th->ec->errinfo != Qnil) {
+ VALUE err = target_th->ec->errinfo;
+
+ if (FIXNUM_P(err)) {
+ switch (err) {
+ case INT2FIX(TAG_FATAL):
+ RUBY_DEBUG_LOG("terminated target_th:%u status:%s", rb_th_serial(target_th), thread_status_name(target_th, TRUE));
+
+ /* OK. killed. */
+ break;
+ default:
+ if (err == RUBY_FATAL_FIBER_KILLED) { // not integer constant so can't be a case expression
+ // root fiber killed in non-main thread
+ break;
+ }
+ rb_bug("thread_join: Fixnum (%d) should not reach here.", FIX2INT(err));
+ }
+ }
+ else if (THROW_DATA_P(target_th->ec->errinfo)) {
+ rb_bug("thread_join: THROW_DATA should not reach here.");
+ }
+ else {
+ /* normal exception */
+ rb_exc_raise(err);
+ }
}
return target_th->self;
}
/*
* call-seq:
- * thr.join => thr
- * thr.join(limit) => thr
+ * thr.join -> thr
+ * thr.join(limit) -> thr
*
- * The calling thread will suspend execution and run <i>thr</i>. Does not
- * return until <i>thr</i> exits or until <i>limit</i> seconds have passed. If
- * the time limit expires, <code>nil</code> will be returned, otherwise
- * <i>thr</i> is returned.
+ * The calling thread will suspend execution and run this +thr+.
*
- * Any threads not joined will be killed when the main program exits. If
- * <i>thr</i> had previously raised an exception and the
- * <code>abort_on_exception</code> and <code>$DEBUG</code> flags are not set
- * (so the exception has not yet been processed) it will be processed at this
- * time.
+ * Does not return until +thr+ exits or until the given +limit+ seconds have
+ * passed.
*
- * a = Thread.new { print "a"; sleep(10); print "b"; print "c" }
- * x = Thread.new { print "x"; Thread.pass; print "y"; print "z" }
- * x.join # Let x thread finish, a will be killed on exit.
+ * If the time limit expires, +nil+ will be returned, otherwise +thr+ is
+ * returned.
*
- * <em>produces:</em>
+ * Any threads not joined will be killed when the main program exits.
*
- * axyz
+ * If +thr+ had previously raised an exception and the ::abort_on_exception
+ * or $DEBUG flags are not set (so the exception has not yet been processed),
+ * it will be processed at this time.
*
- * The following example illustrates the <i>limit</i> parameter.
+ * a = Thread.new { print "a"; sleep(10); print "b"; print "c" }
+ * x = Thread.new { print "x"; Thread.pass; print "y"; print "z" }
+ * x.join # Let thread x finish, thread a will be killed on exit.
+ * #=> "axyz"
+ *
+ * The following example illustrates the +limit+ parameter.
*
* y = Thread.new { 4.times { sleep 0.1; puts 'tick... ' }}
* puts "Waiting" until y.join(0.15)
*
- * <em>produces:</em>
+ * This will produce:
*
* tick...
* Waiting
* tick...
- * Waitingtick...
- *
- *
+ * Waiting
+ * tick...
* tick...
*/
static VALUE
thread_join_m(int argc, VALUE *argv, VALUE self)
{
- rb_thread_t *target_th;
- double delay = DELAY_INFTY;
- VALUE limit;
+ VALUE timeout = Qnil;
+ rb_hrtime_t rel = 0, *limit = 0;
- GetThreadPtr(self, target_th);
+ if (rb_check_arity(argc, 0, 1)) {
+ timeout = argv[0];
+ }
- rb_scan_args(argc, argv, "01", &limit);
- if (!NIL_P(limit)) {
- delay = rb_num2dbl(limit);
+ // Convert the timeout eagerly, so it's always converted and deterministic
+ /*
+ * This supports INFINITY and negative values, so we can't use
+ * rb_time_interval right now...
+ */
+ if (NIL_P(timeout)) {
+ /* unlimited */
+ }
+ else if (FIXNUM_P(timeout)) {
+ rel = rb_sec2hrtime(NUM2TIMET(timeout));
+ limit = &rel;
+ }
+ else {
+ limit = double2hrtime(&rel, rb_num2dbl(timeout));
}
- return thread_join(target_th, delay);
+ return thread_join(rb_thread_ptr(self), timeout, limit);
}
/*
* call-seq:
- * thr.value => obj
+ * thr.value -> obj
*
- * Waits for <i>thr</i> to complete (via <code>Thread#join</code>) and returns
- * its value.
+ * Waits for +thr+ to complete, using #join, and returns its value or raises
+ * the exception which terminated the thread.
*
* a = Thread.new { 2 + 2 }
* a.value #=> 4
+ *
+ * b = Thread.new { raise 'something went wrong' }
+ * b.value #=> RuntimeError: something went wrong
*/
static VALUE
thread_value(VALUE self)
{
- rb_thread_t *th;
- GetThreadPtr(self, th);
- thread_join(th, DELAY_INFTY);
+ rb_thread_t *th = rb_thread_ptr(self);
+ thread_join(th, Qnil, 0);
+ if (UNDEF_P(th->value)) {
+ // If the thread is dead because we forked, th->value is still Qundef.
+ return Qnil;
+ }
return th->value;
}
@@ -787,149 +1274,185 @@ thread_value(VALUE self)
* Thread Scheduling
*/
-static struct timeval
-double2timeval(double d)
+static void
+getclockofday(struct timespec *ts)
{
- struct timeval time;
+#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
+ if (clock_gettime(CLOCK_MONOTONIC, ts) == 0)
+ return;
+#endif
+ rb_timespec_now(ts);
+}
- time.tv_sec = (int)d;
- time.tv_usec = (int)((d - (int)d) * 1e6);
- if (time.tv_usec < 0) {
- time.tv_usec += (int)1e6;
- time.tv_sec -= 1;
- }
- return time;
+/*
+ * Don't inline this, since library call is already time consuming
+ * and we don't want "struct timespec" on stack too long for GC
+ */
+NOINLINE(rb_hrtime_t rb_hrtime_now(void));
+rb_hrtime_t
+rb_hrtime_now(void)
+{
+ struct timespec ts;
+
+ getclockofday(&ts);
+ return rb_timespec2hrtime(&ts);
}
-static void
-sleep_forever(rb_thread_t *th, int deadlockable)
+/*
+ * at least gcc 7.2 and 7.3 complains about "rb_hrtime_t end"
+ * being uninitialized, maybe other versions, too.
+ */
+COMPILER_WARNING_PUSH
+#if defined(__GNUC__) && __GNUC__ == 7 && __GNUC_MINOR__ <= 3
+COMPILER_WARNING_IGNORED(-Wmaybe-uninitialized)
+#endif
+#ifndef PRIu64
+#define PRIu64 PRI_64_PREFIX "u"
+#endif
+/*
+ * @end is the absolute time at which @timeout expires.
+ * Returns true if @end has passed;
+ * otherwise updates @timeout and returns false.
+ */
+static int
+hrtime_update_expire(rb_hrtime_t *timeout, const rb_hrtime_t end)
+{
+ rb_hrtime_t now = rb_hrtime_now();
+
+ if (now > end) return 1;
+
+ RUBY_DEBUG_LOG("%"PRIu64" > %"PRIu64"", (uint64_t)end, (uint64_t)now);
+
+ *timeout = end - now;
+ return 0;
+}
+COMPILER_WARNING_POP
+
+static int
+sleep_hrtime(rb_thread_t *th, rb_hrtime_t rel, unsigned int fl)
{
enum rb_thread_status prev_status = th->status;
+ int woke;
+ rb_hrtime_t end = rb_hrtime_add(rb_hrtime_now(), rel);
- th->status = deadlockable ? THREAD_STOPPED_FOREVER : THREAD_STOPPED;
- do {
- if (deadlockable) {
- th->vm->sleeper++;
- rb_check_deadlock(th->vm);
- }
- native_sleep(th, 0);
- if (deadlockable) {
- th->vm->sleeper--;
- }
- RUBY_VM_CHECK_INTS();
- } while (th->status == THREAD_STOPPED_FOREVER);
+ th->status = THREAD_STOPPED;
+ RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
+ while (th->status == THREAD_STOPPED) {
+ native_sleep(th, &rel);
+ woke = vm_check_ints_blocking(th->ec);
+ if (woke && !(fl & SLEEP_SPURIOUS_CHECK))
+ break;
+ if (hrtime_update_expire(&rel, end))
+ break;
+ woke = 1;
+ }
th->status = prev_status;
+ return woke;
}
-static void
-getclockofday(struct timeval *tp)
+static int
+sleep_hrtime_until(rb_thread_t *th, rb_hrtime_t end, unsigned int fl)
{
-#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
- struct timespec ts;
+ enum rb_thread_status prev_status = th->status;
+ int woke;
+ rb_hrtime_t rel = rb_hrtime_sub(end, rb_hrtime_now());
- if (clock_gettime(CLOCK_MONOTONIC, &ts) == 0) {
- tp->tv_sec = ts.tv_sec;
- tp->tv_usec = ts.tv_nsec / 1000;
- } else
-#endif
- {
- gettimeofday(tp, NULL);
+ th->status = THREAD_STOPPED;
+ RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
+ while (th->status == THREAD_STOPPED) {
+ native_sleep(th, &rel);
+ woke = vm_check_ints_blocking(th->ec);
+ if (woke && !(fl & SLEEP_SPURIOUS_CHECK))
+ break;
+ if (hrtime_update_expire(&rel, end))
+ break;
+ woke = 1;
}
+ th->status = prev_status;
+ return woke;
}
static void
-sleep_timeval(rb_thread_t *th, struct timeval tv)
+sleep_forever(rb_thread_t *th, unsigned int fl)
{
- struct timeval to, tvn;
enum rb_thread_status prev_status = th->status;
+ enum rb_thread_status status;
+ int woke;
- getclockofday(&to);
- to.tv_sec += tv.tv_sec;
- if ((to.tv_usec += tv.tv_usec) >= 1000000) {
- to.tv_sec++;
- to.tv_usec -= 1000000;
- }
+ status = fl & SLEEP_DEADLOCKABLE ? THREAD_STOPPED_FOREVER : THREAD_STOPPED;
+ th->status = status;
- th->status = THREAD_STOPPED;
- do {
- native_sleep(th, &tv);
- RUBY_VM_CHECK_INTS();
- getclockofday(&tvn);
- if (to.tv_sec < tvn.tv_sec) break;
- if (to.tv_sec == tvn.tv_sec && to.tv_usec <= tvn.tv_usec) break;
- thread_debug("sleep_timeval: %ld.%.6ld > %ld.%.6ld\n",
- (long)to.tv_sec, (long)to.tv_usec,
- (long)tvn.tv_sec, (long)tvn.tv_usec);
- tv.tv_sec = to.tv_sec - tvn.tv_sec;
- if ((tv.tv_usec = to.tv_usec - tvn.tv_usec) < 0) {
- --tv.tv_sec;
- tv.tv_usec += 1000000;
- }
- } while (th->status == THREAD_STOPPED);
+ if (!(fl & SLEEP_NO_CHECKINTS)) RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
+
+ while (th->status == status) {
+ if (fl & SLEEP_DEADLOCKABLE) {
+ rb_ractor_sleeper_threads_inc(th->ractor);
+ rb_check_deadlock(th->ractor);
+ }
+ {
+ native_sleep(th, 0);
+ }
+ if (fl & SLEEP_DEADLOCKABLE) {
+ rb_ractor_sleeper_threads_dec(th->ractor);
+ }
+ if (fl & SLEEP_ALLOW_SPURIOUS) {
+ break;
+ }
+
+ woke = vm_check_ints_blocking(th->ec);
+
+ if (woke && !(fl & SLEEP_SPURIOUS_CHECK)) {
+ break;
+ }
+ }
th->status = prev_status;
}
void
rb_thread_sleep_forever(void)
{
- thread_debug("rb_thread_sleep_forever\n");
- sleep_forever(GET_THREAD(), 0);
+ RUBY_DEBUG_LOG("forever");
+ sleep_forever(GET_THREAD(), SLEEP_SPURIOUS_CHECK);
}
-static void
+void
rb_thread_sleep_deadly(void)
{
- thread_debug("rb_thread_sleep_deadly\n");
- sleep_forever(GET_THREAD(), 1);
-}
-
-static double
-timeofday(void)
-{
-#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
- struct timespec tp;
-
- if (clock_gettime(CLOCK_MONOTONIC, &tp) == 0) {
- return (double)tp.tv_sec + (double)tp.tv_nsec * 1e-9;
- } else
-#endif
- {
- struct timeval tv;
- gettimeofday(&tv, NULL);
- return (double)tv.tv_sec + (double)tv.tv_usec * 1e-6;
- }
-}
-
-static void
-sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec)
-{
- sleep_timeval(th, double2timeval(sleepsec));
+ RUBY_DEBUG_LOG("deadly");
+ sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE|SLEEP_SPURIOUS_CHECK);
}
static void
-sleep_for_polling(rb_thread_t *th)
+rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker, VALUE timeout, rb_hrtime_t end)
{
- struct timeval time;
- time.tv_sec = 0;
- time.tv_usec = 100 * 1000; /* 0.1 sec */
- sleep_timeval(th, time);
+ rb_thread_t *th = GET_THREAD();
+ VALUE scheduler = rb_fiber_scheduler_current_for_threadptr(th);
+ if (scheduler != Qnil) {
+ rb_fiber_scheduler_block(scheduler, blocker, timeout);
+ }
+ else {
+ RUBY_DEBUG_LOG("...");
+ if (end) {
+ sleep_hrtime_until(th, end, SLEEP_SPURIOUS_CHECK);
+ }
+ else {
+ sleep_forever(th, SLEEP_DEADLOCKABLE);
+ }
+ }
}
void
rb_thread_wait_for(struct timeval time)
{
rb_thread_t *th = GET_THREAD();
- sleep_timeval(th, time);
+
+ sleep_hrtime(th, rb_timeval2hrtime(&time), SLEEP_SPURIOUS_CHECK);
}
void
-rb_thread_polling(void)
+rb_ec_check_ints(rb_execution_context_t *ec)
{
- RUBY_VM_CHECK_INTS();
- if (!rb_thread_alone()) {
- rb_thread_t *th = GET_THREAD();
- sleep_for_polling(th);
- }
+ RUBY_VM_CHECK_INTS_BLOCKING(ec);
}
/*
@@ -942,7 +1465,7 @@ rb_thread_polling(void)
void
rb_thread_check_ints(void)
{
- RUBY_VM_CHECK_INTS();
+ rb_ec_check_ints(GET_EC());
}
/*
@@ -952,49 +1475,37 @@ rb_thread_check_ints(void)
int
rb_thread_check_trap_pending(void)
{
- return GET_THREAD()->exec_signal != 0;
+ return rb_signal_buff_size() != 0;
}
/* This function can be called in blocking region. */
int
rb_thread_interrupted(VALUE thval)
{
- rb_thread_t *th;
- GetThreadPtr(thval, th);
- return RUBY_VM_INTERRUPTED(th);
+ return (int)RUBY_VM_INTERRUPTED(rb_thread_ptr(thval)->ec);
}
-struct timeval rb_time_timeval(VALUE);
-
void
rb_thread_sleep(int sec)
{
rb_thread_wait_for(rb_time_timeval(INT2FIX(sec)));
}
-static void rb_threadptr_execute_interrupts_rec(rb_thread_t *, int);
-
static void
-rb_thread_schedule_rec(int sched_depth)
+rb_thread_schedule_limits(uint32_t limits_us)
{
- thread_debug("rb_thread_schedule\n");
if (!rb_thread_alone()) {
- rb_thread_t *th = GET_THREAD();
+ rb_thread_t *th = GET_THREAD();
+ RUBY_DEBUG_LOG("us:%u", (unsigned int)limits_us);
- thread_debug("rb_thread_schedule/switch start\n");
+ if (th->running_time_us >= limits_us) {
+ RUBY_DEBUG_LOG("switch %s", "start");
- RB_GC_SAVE_MACHINE_CONTEXT(th);
- native_mutex_unlock(&th->vm->global_vm_lock);
- {
- native_thread_yield();
- }
- native_mutex_lock(&th->vm->global_vm_lock);
+ RB_VM_SAVE_MACHINE_CONTEXT(th);
+ thread_sched_yield(TH_SCHED(th), th);
+ rb_ractor_thread_switch(th->ractor, th, true);
- rb_thread_set_current(th);
- thread_debug("rb_thread_schedule/switch done\n");
-
- if (!sched_depth && UNLIKELY(GET_THREAD()->interrupt_flag)) {
- rb_threadptr_execute_interrupts_rec(GET_THREAD(), sched_depth+1);
+ RUBY_DEBUG_LOG("switch %s", "done");
}
}
}
@@ -1002,120 +1513,528 @@ rb_thread_schedule_rec(int sched_depth)
void
rb_thread_schedule(void)
{
- rb_thread_schedule_rec(0);
+ rb_thread_schedule_limits(0);
+ RUBY_VM_CHECK_INTS(GET_EC());
}
/* blocking region */
+static inline int
+blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region,
+ rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted)
+{
+#ifdef RUBY_ASSERT_CRITICAL_SECTION
+ VM_ASSERT(ruby_assert_critical_section_entered == 0);
+#endif
+ VM_ASSERT(th == GET_THREAD());
+
+ region->prev_status = th->status;
+ if (unblock_function_set(th, ubf, arg, fail_if_interrupted)) {
+ th->blocking_region_buffer = region;
+ th->status = THREAD_STOPPED;
+ rb_ractor_blocking_threads_inc(th->ractor, __FILE__, __LINE__);
+
+ RUBY_DEBUG_LOG("thread_id:%p", (void *)th->nt->thread_id);
+ return TRUE;
+ }
+ else {
+ return FALSE;
+ }
+}
+
static inline void
blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region)
{
- native_mutex_lock(&th->vm->global_vm_lock);
- rb_thread_set_current(th);
- thread_debug("leave blocking region (%p)\n", (void *)th);
- remove_signal_thread_list(th);
+ /* entry to ubf_list still permitted at this point, make it impossible: */
+ unblock_function_clear(th);
+ /* entry to ubf_list impossible at this point, so unregister is safe: */
+ unregister_ubf_list(th);
+
+ thread_sched_to_running(TH_SCHED(th), th);
+ rb_ractor_thread_switch(th->ractor, th, false);
+
th->blocking_region_buffer = 0;
- reset_unblock_function(th, &region->oldubf);
+ rb_ractor_blocking_threads_dec(th->ractor, __FILE__, __LINE__);
if (th->status == THREAD_STOPPED) {
- th->status = region->prev_status;
+ th->status = region->prev_status;
}
+
+ RUBY_DEBUG_LOG("end");
+
+#ifndef _WIN32
+ // GET_THREAD() clears WSAGetLastError()
+ VM_ASSERT(th == GET_THREAD());
+#endif
}
-struct rb_blocking_region_buffer *
-rb_thread_blocking_region_begin(void)
+/*
+ * Resolve sentinel unblock function values to their actual function pointers
+ * and appropriate data2 values. This centralizes the logic for handling
+ * RUBY_UBF_IO and RUBY_UBF_PROCESS sentinel values.
+ *
+ * @param unblock_function Pointer to unblock function pointer (modified in place)
+ * @param data2 Pointer to data2 pointer (modified in place)
+ * @param thread Thread context for resolving data2 when needed
+ * @return true if sentinel values were resolved, false otherwise
+ */
+bool
+rb_thread_resolve_unblock_function(rb_unblock_function_t **unblock_function, void **data2, struct rb_thread_struct *thread)
{
- rb_thread_t *th = GET_THREAD();
- struct rb_blocking_region_buffer *region = ALLOC(struct rb_blocking_region_buffer);
- blocking_region_begin(th, region, ubf_select, th);
- return region;
+ rb_unblock_function_t *ubf = *unblock_function;
+
+ if ((ubf == RUBY_UBF_IO) || (ubf == RUBY_UBF_PROCESS)) {
+ *unblock_function = ubf_select;
+ *data2 = thread;
+ return true;
+ }
+ return false;
}
-void
-rb_thread_blocking_region_end(struct rb_blocking_region_buffer *region)
+void *
+rb_nogvl(void *(*func)(void *), void *data1,
+ rb_unblock_function_t *ubf, void *data2,
+ int flags)
{
- rb_thread_t *th = GET_THREAD();
- blocking_region_end(th, region);
- xfree(region);
- RUBY_VM_CHECK_INTS();
+ if (flags & RB_NOGVL_OFFLOAD_SAFE) {
+ VALUE scheduler = rb_fiber_scheduler_current();
+ if (scheduler != Qnil) {
+ struct rb_fiber_scheduler_blocking_operation_state state = {0};
+
+ VALUE result = rb_fiber_scheduler_blocking_operation_wait(scheduler, func, data1, ubf, data2, flags, &state);
+
+ if (!UNDEF_P(result)) {
+ rb_errno_set(state.saved_errno);
+ return state.result;
+ }
+ }
+ }
+
+ void *val = 0;
+ rb_execution_context_t *ec = GET_EC();
+ rb_thread_t *th = rb_ec_thread_ptr(ec);
+ rb_vm_t *vm = rb_ec_vm_ptr(ec);
+ bool is_main_thread = vm->ractor.main_thread == th;
+ int saved_errno = 0;
+
+ rb_thread_resolve_unblock_function(&ubf, &data2, th);
+
+ if (ubf && rb_ractor_living_thread_num(th->ractor) == 1 && is_main_thread) {
+ if (flags & RB_NOGVL_UBF_ASYNC_SAFE) {
+ vm->ubf_async_safe = 1;
+ }
+ }
+
+ rb_vm_t *volatile saved_vm = vm;
+ BLOCKING_REGION(th, {
+ val = func(data1);
+ saved_errno = rb_errno();
+ }, ubf, data2, flags & RB_NOGVL_INTR_FAIL);
+ vm = saved_vm;
+
+ if (is_main_thread) vm->ubf_async_safe = 0;
+
+ if ((flags & RB_NOGVL_INTR_FAIL) == 0) {
+ RUBY_VM_CHECK_INTS_BLOCKING(ec);
+ }
+
+ rb_errno_set(saved_errno);
+
+ return val;
}
/*
- * rb_thread_blocking_region - permit concurrent/parallel execution.
+ * rb_thread_call_without_gvl - permit concurrent/parallel execution.
+ * rb_thread_call_without_gvl2 - permit concurrent/parallel execution
+ * without interrupt process.
*
- * This function does:
- * (1) release GVL.
+ * rb_thread_call_without_gvl() does:
+ * (1) Check interrupts.
+ * (2) release GVL.
* Other Ruby threads may run in parallel.
- * (2) call func with data1.
- * (3) acquire GVL.
+ * (3) call func with data1
+ * (4) acquire GVL.
* Other Ruby threads can not run in parallel any more.
+ * (5) Check interrupts.
+ *
+ * rb_thread_call_without_gvl2() does:
+ * (1) Check interrupt and return if interrupted.
+ * (2) release GVL.
+ * (3) call func with data1 and a pointer to the flags.
+ * (4) acquire GVL.
+ *
+ * If another thread interrupts this thread (Thread#kill, signal delivery,
+ * VM-shutdown request, and so on), `ubf()' is called (`ubf()' means
+ * "un-blocking function"). `ubf()' should interrupt `func()' execution by
+ * toggling a cancellation flag, canceling the invocation of a call inside
+ * `func()' or similar. Note that `ubf()' may not be called with the GVL.
+ *
+ * There are built-in ubfs and you can specify these ubfs:
+ *
+ * * RUBY_UBF_IO: ubf for IO operation
+ * * RUBY_UBF_PROCESS: ubf for process operation
+ *
+ * However, we can not guarantee our built-in ubfs interrupt your `func()'
+ * correctly. Be careful to use rb_thread_call_without_gvl(). If you don't
+ * provide proper ubf(), your program will not stop for Control+C or other
+ * shutdown events.
+ *
+ * "Check interrupts" on above list means checking asynchronous
+ * interrupt events (such as Thread#kill, signal delivery, VM-shutdown
+ * request, and so on) and calling corresponding procedures
+ * (such as `trap' for signals, raise an exception for Thread#raise).
+ * If `func()' has finished and interrupts were received, you may want to
+ * skip the interrupt check. For example, assume the following func() reads
+ * data from a file.
+ *
+ * read_func(...) {
+ * // (a) before read
+ * read(buffer); // (b) reading
+ * // (c) after read
+ * }
+ *
+ * If an interrupt occurs at (a) or (b), then `ubf()' cancels this
+ * `read_func()' and interrupts are checked. However, if an interrupt occurs
+ * at (c), after the *read* operation has completed, checking interrupts is
+ * harmful because it causes an irrevocable side effect: the read data will
+ * vanish. To avoid this problem, `read_func()' should be used with
+ * `rb_thread_call_without_gvl2()'.
+ *
+ * If `rb_thread_call_without_gvl2()' detects an interrupt, it returns
+ * immediately. This function does not report when the execution was
+ * interrupted: there are four possible timings, (a), (b), (c) and before
+ * calling read_func(). You need to record the progress of read_func() and
+ * check it after `rb_thread_call_without_gvl2()' returns. You may also need
+ * to call `rb_thread_check_ints()' at appropriate points, or your program
+ * will not be able to run handlers such as `trap' properly.
+ *
+ * NOTE: You can not execute most of the Ruby C API nor touch Ruby
+ * objects in `func()' and `ubf()', including raising an
+ * exception, because the current thread doesn't hold the GVL
+ * (this causes synchronization problems). If you need to
+ * call Ruby functions, either use rb_thread_call_with_gvl()
+ * or read the source code of the C APIs and confirm their
+ * safety yourself.
+ *
+ * NOTE: In short, this API is difficult to use safely. I recommend you
+ * use other approaches if you can. We lack experience with using this
+ * API. Please report any problems related to it.
+ *
+ * NOTE: Releasing and re-acquiring the GVL may be expensive operations
+ * for a short-running `func()'. Be sure to benchmark, and use this
+ * mechanism only when `func()' consumes enough time.
+ *
+ * Safe C API:
+ * * rb_thread_interrupted() - check interrupt flag
+ * * ruby_xmalloc(), ruby_xrealloc(), ruby_xfree() -
+ * they will work without GVL, and may acquire GVL when GC is needed.
+ */
+void *
+rb_thread_call_without_gvl2(void *(*func)(void *), void *data1,
+ rb_unblock_function_t *ubf, void *data2)
+{
+ return rb_nogvl(func, data1, ubf, data2, RB_NOGVL_INTR_FAIL);
+}
+
+void *
+rb_thread_call_without_gvl(void *(*func)(void *data), void *data1,
+ rb_unblock_function_t *ubf, void *data2)
+{
+ return rb_nogvl(func, data1, ubf, data2, 0);
+}
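+
+/*
+ * An illustrative sketch (not part of this change) of the typical calling
+ * pattern: run a blocking read(2) outside the GVL, interruptible via the
+ * built-in IO ubf. `read_args', `blocking_read' and `read_without_gvl'
+ * are hypothetical names:
+ *
+ *   struct read_args { int fd; void *buf; size_t len; ssize_t result; };
+ *
+ *   static void *
+ *   blocking_read(void *ptr)
+ *   {
+ *       struct read_args *args = ptr;
+ *       args->result = read(args->fd, args->buf, args->len);
+ *       return NULL;
+ *   }
+ *
+ *   static ssize_t
+ *   read_without_gvl(int fd, void *buf, size_t len)
+ *   {
+ *       struct read_args args = {fd, buf, len, -1};
+ *       rb_thread_call_without_gvl(blocking_read, &args, RUBY_UBF_IO, NULL);
+ *       return args.result;
+ *   }
+ */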
+
+static int
+waitfd_to_waiting_flag(int wfd_event)
+{
+ return wfd_event << 1;
+}
+
+static struct ccan_list_head *
+rb_io_blocking_operations(struct rb_io *io)
+{
+ rb_serial_t fork_generation = GET_VM()->fork_gen;
+
+ // On fork, all existing entries in this list (which are stack allocated) become invalid.
+ // Therefore, we re-initialize the list which clears it.
+ if (io->fork_generation != fork_generation) {
+ ccan_list_head_init(&io->blocking_operations);
+ io->fork_generation = fork_generation;
+ }
+
+ return &io->blocking_operations;
+}
+
+/*
+ * Registers a blocking operation for an IO object. This is used to track all threads and fibers
+ * that are currently blocked on this IO for reading, writing or other operations.
*
- * If another thread interrupts this thread (Thread#kill, signal delivery,
- * VM-shutdown request, and so on), `ubf()' is called (`ubf()' means
- * "un-blocking function"). `ubf()' should interrupt `func()' execution.
+ * When the IO is closed, all blocking operations will be notified via rb_fiber_scheduler_fiber_interrupt
+ * for fibers with a scheduler, or via rb_threadptr_interrupt for threads without a scheduler.
*
- * There are built-in ubfs and you can specify these ubfs.
- * However, we can not guarantee our built-in ubfs interrupt
- * your `func()' correctly. Be careful to use rb_thread_blocking_region().
+ * @parameter io The IO object on which the operation will block
+ * @parameter blocking_operation The operation details including the execution context that will be blocked
+ */
+static void
+rb_io_blocking_operation_enter(struct rb_io *io, struct rb_io_blocking_operation *blocking_operation)
+{
+ ccan_list_add(rb_io_blocking_operations(io), &blocking_operation->list);
+}
+
+static void
+rb_io_blocking_operation_pop(struct rb_io *io, struct rb_io_blocking_operation *blocking_operation)
+{
+ ccan_list_del(&blocking_operation->list);
+}
+
+struct io_blocking_operation_arguments {
+ struct rb_io *io;
+ struct rb_io_blocking_operation *blocking_operation;
+};
+
+static VALUE
+io_blocking_operation_exit(VALUE _arguments)
+{
+ struct io_blocking_operation_arguments *arguments = (void*)_arguments;
+ struct rb_io_blocking_operation *blocking_operation = arguments->blocking_operation;
+
+ rb_io_blocking_operation_pop(arguments->io, blocking_operation);
+
+ rb_io_t *io = arguments->io;
+ rb_thread_t *thread = io->closing_ec->thread_ptr;
+ rb_fiber_t *fiber = io->closing_ec->fiber_ptr;
+
+ if (thread->scheduler != Qnil) {
+ // This can cause spurious wakeups...
+ rb_fiber_scheduler_unblock(thread->scheduler, io->self, rb_fiberptr_self(fiber));
+ }
+ else {
+ rb_thread_wakeup(thread->self);
+ }
+
+ return Qnil;
+}
+
+/*
+ * Called when a blocking operation completes or is interrupted. Removes the operation from
+ * the IO's blocking_operations list and wakes up any waiting threads/fibers.
*
- * * RUBY_UBF_IO: ubf for IO operation
- * * RUBY_UBF_PROCESS: ubf for process operation
+ * If there's a wakeup_mutex (meaning an IO close is in progress), synchronizes the cleanup
+ * through that mutex to ensure proper coordination with the closing thread.
*
- * NOTE: You can not execute most of Ruby C API and touch Ruby
- * objects in `func()' and `ubf()', including raising an
- * exception, because current thread doesn't acquire GVL
- * (cause synchronization problem). If you need to do it,
- * read source code of C APIs and confirm by yourself.
+ * @parameter io The IO object the operation was performed on
+ * @parameter blocking_operation The completed operation to clean up
+ */
+static void
+rb_io_blocking_operation_exit(struct rb_io *io, struct rb_io_blocking_operation *blocking_operation)
+{
+ VALUE wakeup_mutex = io->wakeup_mutex;
+
+ // Indicate that the blocking operation is no longer active:
+ blocking_operation->ec = NULL;
+
+ if (RB_TEST(wakeup_mutex)) {
+ struct io_blocking_operation_arguments arguments = {
+ .io = io,
+ .blocking_operation = blocking_operation
+ };
+
+ rb_mutex_synchronize(wakeup_mutex, io_blocking_operation_exit, (VALUE)&arguments);
+ }
+ else {
+ // If there's no wakeup_mutex, we can safely remove the operation directly:
+ rb_io_blocking_operation_pop(io, blocking_operation);
+ }
+}
+
+static VALUE
+rb_thread_io_blocking_operation_ensure(VALUE _argument)
+{
+ struct io_blocking_operation_arguments *arguments = (void*)_argument;
+
+ rb_io_blocking_operation_exit(arguments->io, arguments->blocking_operation);
+
+ return Qnil;
+}
+
+/*
+ * Executes a function that performs a blocking IO operation, while properly tracking
+ * the operation in the IO's blocking_operations list. This ensures proper cleanup
+ * and interruption handling if the IO is closed while blocked.
*
- * NOTE: In short, this API is difficult to use safely. I recommend you
- * use other ways if you have. We lack experiences to use this API.
- * Please report your problem related on it.
+ * The operation is automatically removed from the blocking_operations list when the function
+ * returns, whether normally or due to an exception.
*
- * Safe C API:
- * * rb_thread_interrupted() - check interrupt flag
- * * ruby_xalloc(), ruby_xrealloc(), ruby_xfree() -
- * if they called without GVL, acquire GVL automatically.
+ * @parameter self The IO object
+ * @parameter function The function to execute that will perform the blocking operation
+ * @parameter argument The argument to pass to the function
+ * @returns The result of the blocking operation function
*/
VALUE
-rb_thread_blocking_region(
- rb_blocking_function_t *func, void *data1,
- rb_unblock_function_t *ubf, void *data2)
+rb_thread_io_blocking_operation(VALUE self, VALUE(*function)(VALUE), VALUE argument)
{
- VALUE val;
- rb_thread_t *th = GET_THREAD();
+ struct rb_io *io;
+ RB_IO_POINTER(self, io);
+
+ rb_execution_context_t *ec = GET_EC();
+ struct rb_io_blocking_operation blocking_operation = {
+ .ec = ec,
+ };
+ rb_io_blocking_operation_enter(io, &blocking_operation);
+
+ struct io_blocking_operation_arguments io_blocking_operation_arguments = {
+ .io = io,
+ .blocking_operation = &blocking_operation
+ };
+
+ return rb_ensure(function, argument, rb_thread_io_blocking_operation_ensure, (VALUE)&io_blocking_operation_arguments);
+}
+
+static bool
+thread_io_mn_schedulable(rb_thread_t *th, int events, const struct timeval *timeout)
+{
+#if defined(USE_MN_THREADS) && USE_MN_THREADS
+ return !th_has_dedicated_nt(th) && (events || timeout) && th->blocking;
+#else
+ return false;
+#endif
+}
+
+// true if need retry
+static bool
+thread_io_wait_events(rb_thread_t *th, int fd, int events, const struct timeval *timeout)
+{
+#if defined(USE_MN_THREADS) && USE_MN_THREADS
+ if (thread_io_mn_schedulable(th, events, timeout)) {
+ rb_hrtime_t rel, *prel;
- if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) {
- ubf = ubf_select;
- data2 = th;
+ if (timeout) {
+ rel = rb_timeval2hrtime(timeout);
+ prel = &rel;
+ }
+ else {
+ prel = NULL;
+ }
+
+ VM_ASSERT(prel || (events & (RB_WAITFD_IN | RB_WAITFD_OUT)));
+
+ if (thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), prel)) {
+ // timeout
+ return false;
+ }
+ else {
+ return true;
+ }
}
+#endif // defined(USE_MN_THREADS) && USE_MN_THREADS
+ return false;
+}
- BLOCKING_REGION({
- val = func(data1);
- }, ubf, data2);
+// assume read/write
+static bool
+blocking_call_retryable_p(int r, int eno)
+{
+ if (r != -1) return false;
- return val;
+ switch (eno) {
+ case EAGAIN:
+#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+ return true;
+ default:
+ return false;
+ }
+}
+
+bool
+rb_thread_mn_schedulable(VALUE thval)
+{
+ rb_thread_t *th = rb_thread_ptr(thval);
+ return th->mn_schedulable;
}
-/* alias of rb_thread_blocking_region() */
+VALUE
+rb_thread_io_blocking_call(struct rb_io* io, rb_blocking_function_t *func, void *data1, int events)
+{
+ rb_execution_context_t * volatile ec = GET_EC();
+ rb_thread_t * volatile th = rb_ec_thread_ptr(ec);
+
+ RUBY_DEBUG_LOG("th:%u fd:%d ev:%d", rb_th_serial(th), io->fd, events);
+
+ volatile VALUE val = Qundef; /* shouldn't be used */
+ volatile int saved_errno = 0;
+ enum ruby_tag_type state;
+ volatile bool prev_mn_schedulable = th->mn_schedulable;
+ th->mn_schedulable = thread_io_mn_schedulable(th, events, NULL);
+
+ int fd = io->fd;
+
+ // `errno` is only valid when there is an actual error - but we can't
+ // extract that from the return value of `func` alone, so we clear any
+ // prior `errno` value here so that we can later check if it was set by
+ // `func` or not (as opposed to some previously set value).
+ errno = 0;
+
+ struct rb_io_blocking_operation blocking_operation = {
+ .ec = ec,
+ };
+ rb_io_blocking_operation_enter(io, &blocking_operation);
+
+ {
+ EC_PUSH_TAG(ec);
+ if ((state = EC_EXEC_TAG()) == TAG_NONE) {
+ volatile enum ruby_tag_type saved_state = state; /* for BLOCKING_REGION */
+ retry:
+ BLOCKING_REGION(th, {
+ val = func(data1);
+ saved_errno = errno;
+ }, ubf_select, th, FALSE);
+
+ RUBY_ASSERT(th == rb_ec_thread_ptr(ec));
+ if (events &&
+ blocking_call_retryable_p((int)val, saved_errno) &&
+ thread_io_wait_events(th, fd, events, NULL)) {
+ RUBY_VM_CHECK_INTS_BLOCKING(ec);
+ goto retry;
+ }
+
+ RUBY_VM_CHECK_INTS_BLOCKING(ec);
+
+ state = saved_state;
+ }
+ EC_POP_TAG();
+
+ th = rb_ec_thread_ptr(ec);
+ th->mn_schedulable = prev_mn_schedulable;
+ }
+
+ rb_io_blocking_operation_exit(io, &blocking_operation);
+
+ if (state) {
+ EC_JUMP_TAG(ec, state);
+ }
+
+ // If the error was a timeout, we raise a specific exception for that:
+ if (saved_errno == ETIMEDOUT) {
+ rb_raise(rb_eIOTimeoutError, "Blocking operation timed out!");
+ }
+
+ errno = saved_errno;
+
+ return val;
+}
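+
+/*
+ * An illustrative sketch (not part of this change) of driving
+ * rb_thread_io_blocking_call() for a read. `io_read_arg' and
+ * `io_read_func' are hypothetical; `io' is a struct rb_io *:
+ *
+ *   struct io_read_arg { int fd; void *buf; size_t capacity; };
+ *
+ *   static VALUE
+ *   io_read_func(void *ptr)
+ *   {
+ *       struct io_read_arg *arg = ptr;
+ *       return (VALUE)read(arg->fd, arg->buf, arg->capacity);
+ *   }
+ *
+ *   // With RB_WAITFD_IN, an EAGAIN/EWOULDBLOCK result waits for
+ *   // readability and retries func instead of returning the error:
+ *   struct io_read_arg arg = {io->fd, buffer, capacity};
+ *   ssize_t n = (ssize_t)rb_thread_io_blocking_call(io, io_read_func, &arg, RB_WAITFD_IN);
+ */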
VALUE
-rb_thread_call_without_gvl(
- rb_blocking_function_t *func, void *data1,
- rb_unblock_function_t *ubf, void *data2)
+rb_thread_io_blocking_region(struct rb_io *io, rb_blocking_function_t *func, void *data1)
{
- return rb_thread_blocking_region(func, data1, ubf, data2);
+ return rb_thread_io_blocking_call(io, func, data1, 0);
}
/*
- * rb_thread_call_with_gvl - re-enter into Ruby world while releasing GVL.
+ * rb_thread_call_with_gvl - re-enter the Ruby world after GVL release.
*
- ***
- *** This API is EXPERIMENTAL!
- *** We do not guarantee that this API remains in ruby 1.9.2 or later.
- ***
- *
- * While releasing GVL using rb_thread_blocking_region() or
- * rb_thread_call_without_gvl(), you can not access Ruby values or invoke methods.
- * If you need to access it, you must use this function rb_thread_call_with_gvl().
+ * After releasing the GVL using
+ * rb_thread_call_without_gvl() you can not access Ruby values or invoke
+ * methods. If you need to access Ruby, you must use this function,
+ * rb_thread_call_with_gvl().
*
* This function rb_thread_call_with_gvl() does:
* (1) acquire GVL.
@@ -1124,18 +2043,21 @@ rb_thread_call_without_gvl(
* (4) return a value which is returned at (2).
*
* NOTE: You should not return Ruby object at (2) because such Object
- * will not marked.
+ * will not be marked.
*
- * NOTE: If an exception is raised in `func', this function "DOES NOT"
+ * NOTE: If an exception is raised in `func', this function DOES NOT
 * protect (catch) the exception. If you have any resources
 * which should be freed before throwing the exception, you need to use
 * rb_protect() in `func' and return a value which represents that an
- * exception is raised.
+ * exception was raised.
+ *
+ * NOTE: This function should not be called by a thread which was not
+ * created as a Ruby thread (created by Thread.new or similar). In other
+ * words, this function *DOES NOT* associate or convert a NON-Ruby
+ * thread to a Ruby thread.
*
- * NOTE: This functions should not be called by a thread which
- * is not created as Ruby thread (created by Thread.new or so).
- * In other words, this function *DOES NOT* associate
- * NON-Ruby thread to Ruby thread.
+ * NOTE: If this thread already holds the GVL, then `func' is called
+ * directly, without releasing or re-acquiring the GVL (from Ruby 4.0).
*/
void *
rb_thread_call_with_gvl(void *(*func)(void *), void *data1)
@@ -1146,37 +2068,36 @@ rb_thread_call_with_gvl(void *(*func)(void *), void *data1)
void *r;
if (th == 0) {
- /* Error is occurred, but we can't use rb_bug()
- * because this thread is not Ruby's thread.
+ /* Error has occurred, but we can't use rb_bug()
+ * because this thread is not Ruby's thread.
* What should we do?
- */
-
- fprintf(stderr, "[BUG] rb_thread_call_with_gvl() is called by non-ruby thread\n");
- exit(1);
+ */
+ bp();
+ fprintf(stderr, "[BUG] rb_thread_call_with_gvl() is called by non-ruby thread\n");
+ exit(EXIT_FAILURE);
}
brb = (struct rb_blocking_region_buffer *)th->blocking_region_buffer;
prev_unblock = th->unblock;
if (brb == 0) {
- rb_bug("rb_thread_call_with_gvl: called by a thread which has GVL.");
+ /* the GVL is already acquired, call method directly */
+ return (*func)(data1);
}
blocking_region_end(th, brb);
/* enter to Ruby world: You can access Ruby values, methods and so on. */
r = (*func)(data1);
/* leave from Ruby world: You can not access Ruby values, etc. */
- blocking_region_begin(th, brb, prev_unblock.func, prev_unblock.arg);
+ int released = blocking_region_begin(th, brb, prev_unblock.func, prev_unblock.arg, FALSE);
+ RUBY_ASSERT_ALWAYS(released);
+ RB_VM_SAVE_MACHINE_CONTEXT(th);
+ thread_sched_to_waiting(TH_SCHED(th), th);
return r;
}
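
/*
 * An illustrative sketch (not part of this change): calling back into Ruby
 * from a `func()' that runs without the GVL. `log_progress' and
 * `long_computation' are hypothetical:
 *
 *   static void *
 *   log_progress(void *message)
 *   {
 *       rb_funcall(rb_stderr, rb_intern("puts"), 1,
 *                  rb_str_new_cstr((const char *)message));
 *       return NULL;
 *   }
 *
 *   static void *
 *   long_computation(void *data)
 *   {
 *       // ... heavy work while the GVL is released ...
 *       rb_thread_call_with_gvl(log_progress, (void *)"halfway done");
 *       // ... more heavy work ...
 *       return NULL;
 *   }
 */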
/*
* ruby_thread_has_gvl_p - check if current native thread has GVL.
- *
- ***
- *** This API is EXPERIMENTAL!
- *** We do not guarantee that this API remains in ruby 1.9.2 or later.
- ***
*/
int
@@ -1185,31 +2106,19 @@ ruby_thread_has_gvl_p(void)
rb_thread_t *th = ruby_thread_from_native();
if (th && th->blocking_region_buffer == 0) {
- return 1;
+ return 1;
}
else {
- return 0;
+ return 0;
}
}
/*
- * call-seq:
- * Thread.pass => nil
- *
- * Invokes the thread scheduler to pass execution to another thread.
- *
- * a = Thread.new { print "a"; Thread.pass;
- * print "b"; Thread.pass;
- * print "c" }
- * b = Thread.new { print "x"; Thread.pass;
- * print "y"; Thread.pass;
- * print "z" }
- * a.join
- * b.join
- *
- * <em>produces:</em>
+ * call-seq:
+ * Thread.pass -> nil
*
- * axbycz
+ * Give the thread scheduler a hint to pass execution to another thread.
+ * A running thread may or may not switch; it depends on the OS and processor.
*/
static VALUE
@@ -1219,89 +2128,605 @@ thread_s_pass(VALUE klass)
return Qnil;
}
+/*****************************************************/
+
/*
- *
+ * rb_threadptr_pending_interrupt_* - manage asynchronous error queue
+ *
+ * Async events such as an exception thrown by Thread#raise,
+ * Thread#kill and thread termination (after main thread termination)
+ * will be queued to th->pending_interrupt_queue.
+ * - clear: clear the queue.
+ * - enque: enqueue err object into queue.
+ * - deque: dequeue err object from queue.
+ * - active_p: return 1 if the queue should be checked.
+ *
+ * All rb_threadptr_pending_interrupt_* functions are called by
+ * a thread which holds the GVL, of course.
+ * Note that all "rb_" prefixed APIs need the GVL to be called.
*/
+void
+rb_threadptr_pending_interrupt_clear(rb_thread_t *th)
+{
+ rb_ary_clear(th->pending_interrupt_queue);
+}
+
+void
+rb_threadptr_pending_interrupt_enque(rb_thread_t *th, VALUE v)
+{
+ rb_ary_push(th->pending_interrupt_queue, v);
+ th->pending_interrupt_queue_checked = 0;
+}
+
static void
-rb_threadptr_execute_interrupts_rec(rb_thread_t *th, int sched_depth)
-{
- if (GET_VM()->main_thread == th) {
- while (rb_signal_buff_size() && !th->exec_signal) native_thread_yield();
- }
-
- if (th->raised_flag) return;
-
- while (th->interrupt_flag) {
- enum rb_thread_status status = th->status;
- int timer_interrupt = th->interrupt_flag & 0x01;
- int finalizer_interrupt = th->interrupt_flag & 0x04;
-
- th->status = THREAD_RUNNABLE;
- th->interrupt_flag = 0;
-
- /* signal handling */
- if (th->exec_signal) {
- int sig = th->exec_signal;
- th->exec_signal = 0;
- rb_signal_exec(th, sig);
- }
-
- /* exception from another thread */
- if (th->thrown_errinfo) {
- VALUE err = th->thrown_errinfo;
- th->thrown_errinfo = 0;
- thread_debug("rb_thread_execute_interrupts: %ld\n", err);
-
- if (err == eKillSignal || err == eTerminateSignal) {
- th->errinfo = INT2FIX(TAG_FATAL);
- TH_JUMP_TAG(th, TAG_FATAL);
- }
- else {
- rb_exc_raise(err);
- }
- }
- th->status = status;
-
- if (finalizer_interrupt) {
- rb_gc_finalize_deferred();
- }
-
- if (!sched_depth && timer_interrupt) {
- sched_depth++;
- EXEC_EVENT_HOOK(th, RUBY_EVENT_SWITCH, th->cfp->self, 0, 0);
-
- if (th->slice > 0) {
- th->slice--;
- }
- else {
- reschedule:
- rb_thread_schedule_rec(sched_depth+1);
- if (th->slice < 0) {
- th->slice++;
- goto reschedule;
- }
- else {
- th->slice = th->priority;
- }
- }
- }
+threadptr_check_pending_interrupt_queue(rb_thread_t *th)
+{
+ if (!th->pending_interrupt_queue) {
+ rb_raise(rb_eThreadError, "uninitialized thread");
}
}
-void
-rb_threadptr_execute_interrupts(rb_thread_t *th)
+enum handle_interrupt_timing {
+ INTERRUPT_NONE,
+ INTERRUPT_IMMEDIATE,
+ INTERRUPT_ON_BLOCKING,
+ INTERRUPT_NEVER
+};
+
+static enum handle_interrupt_timing
+rb_threadptr_pending_interrupt_from_symbol(rb_thread_t *th, VALUE sym)
{
- rb_threadptr_execute_interrupts_rec(th, 0);
+ if (sym == sym_immediate) {
+ return INTERRUPT_IMMEDIATE;
+ }
+ else if (sym == sym_on_blocking) {
+ return INTERRUPT_ON_BLOCKING;
+ }
+ else if (sym == sym_never) {
+ return INTERRUPT_NEVER;
+ }
+ else {
+ rb_raise(rb_eThreadError, "unknown mask signature");
+ }
}
-void
-rb_gc_mark_threads(void)
+static enum handle_interrupt_timing
+rb_threadptr_pending_interrupt_check_mask(rb_thread_t *th, VALUE err)
{
- /* TODO: remove */
+ VALUE mask;
+ long mask_stack_len = RARRAY_LEN(th->pending_interrupt_mask_stack);
+ const VALUE *mask_stack = RARRAY_CONST_PTR(th->pending_interrupt_mask_stack);
+ VALUE mod;
+ long i;
+
+ for (i=0; i<mask_stack_len; i++) {
+ mask = mask_stack[mask_stack_len-(i+1)];
+
+ if (SYMBOL_P(mask)) {
+ /* do not match RUBY_FATAL_THREAD_KILLED etc */
+ if (err != rb_cInteger) {
+ return rb_threadptr_pending_interrupt_from_symbol(th, mask);
+ }
+ else {
+ continue;
+ }
+ }
+
+ for (mod = err; mod; mod = RCLASS_SUPER(mod)) {
+ VALUE klass = mod;
+ VALUE sym;
+
+ if (BUILTIN_TYPE(mod) == T_ICLASS) {
+ klass = RBASIC(mod)->klass;
+ }
+ else if (mod != RCLASS_ORIGIN(mod)) {
+ continue;
+ }
+
+ if ((sym = rb_hash_aref(mask, klass)) != Qnil) {
+ return rb_threadptr_pending_interrupt_from_symbol(th, sym);
+ }
+ }
+ /* try the next mask */
+ }
+ return INTERRUPT_NONE;
}
-/*****************************************************/
+static int
+rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th)
+{
+ return RARRAY_LEN(th->pending_interrupt_queue) == 0;
+}
+
+static int
+rb_threadptr_pending_interrupt_include_p(rb_thread_t *th, VALUE err)
+{
+ int i;
+ for (i=0; i<RARRAY_LEN(th->pending_interrupt_queue); i++) {
+ VALUE e = RARRAY_AREF(th->pending_interrupt_queue, i);
+ if (rb_obj_is_kind_of(e, err)) {
+ return TRUE;
+ }
+ }
+ return FALSE;
+}
+
+static VALUE
+rb_threadptr_pending_interrupt_deque(rb_thread_t *th, enum handle_interrupt_timing timing)
+{
+#if 1 /* 1 to enable Thread#handle_interrupt, 0 to ignore it */
+ int i;
+
+ for (i=0; i<RARRAY_LEN(th->pending_interrupt_queue); i++) {
+ VALUE err = RARRAY_AREF(th->pending_interrupt_queue, i);
+
+ enum handle_interrupt_timing mask_timing = rb_threadptr_pending_interrupt_check_mask(th, CLASS_OF(err));
+
+ switch (mask_timing) {
+ case INTERRUPT_ON_BLOCKING:
+ if (timing != INTERRUPT_ON_BLOCKING) {
+ break;
+ }
+ /* fall through */
+ case INTERRUPT_NONE: /* default: IMMEDIATE */
+ case INTERRUPT_IMMEDIATE:
+ rb_ary_delete_at(th->pending_interrupt_queue, i);
+ return err;
+ case INTERRUPT_NEVER:
+ break;
+ }
+ }
+
+ th->pending_interrupt_queue_checked = 1;
+ return Qundef;
+#else
+ VALUE err = rb_ary_shift(th->pending_interrupt_queue);
+ if (rb_threadptr_pending_interrupt_empty_p(th)) {
+ th->pending_interrupt_queue_checked = 1;
+ }
+ return err;
+#endif
+}
+
+static int
+threadptr_pending_interrupt_active_p(rb_thread_t *th)
+{
+ /*
+ * For optimization, we don't check async errinfo queue
+ * if the queue and the thread interrupt mask were not changed
+ * since the last check.
+ */
+ if (th->pending_interrupt_queue_checked) {
+ return 0;
+ }
+
+ if (rb_threadptr_pending_interrupt_empty_p(th)) {
+ return 0;
+ }
+
+ return 1;
+}
+
+static int
+handle_interrupt_arg_check_i(VALUE key, VALUE val, VALUE args)
+{
+ VALUE *maskp = (VALUE *)args;
+
+ if (val != sym_immediate && val != sym_on_blocking && val != sym_never) {
+ rb_raise(rb_eArgError, "unknown mask signature");
+ }
+
+ if (key == rb_eException && (UNDEF_P(*maskp) || NIL_P(*maskp))) {
+ *maskp = val;
+ return ST_CONTINUE;
+ }
+
+ if (RTEST(*maskp)) {
+ if (!RB_TYPE_P(*maskp, T_HASH)) {
+ VALUE prev = *maskp;
+ *maskp = rb_ident_hash_new();
+ if (SYMBOL_P(prev)) {
+ rb_hash_aset(*maskp, rb_eException, prev);
+ }
+ }
+ rb_hash_aset(*maskp, key, val);
+ }
+ else {
+ *maskp = Qfalse;
+ }
+
+ return ST_CONTINUE;
+}
+
+/*
+ * call-seq:
+ * Thread.handle_interrupt(hash) { ... } -> result of the block
+ *
+ * Changes asynchronous interrupt timing.
+ *
+ * _interrupt_ means an asynchronous event and its corresponding procedure,
+ * triggered by Thread#raise, Thread#kill, signal trap (not supported yet)
+ * and main thread termination (if the main thread terminates, then all
+ * other threads will be killed).
+ *
+ * The given +hash+ has pairs like <code>ExceptionClass =>
+ * :TimingSymbol</code>, where ExceptionClass is the interrupt handled by
+ * the given block. The TimingSymbol can be one of the following symbols:
+ *
+ * [+:immediate+] Invoke interrupts immediately.
+ * [+:on_blocking+] Invoke interrupts while in a _BlockingOperation_.
+ * [+:never+] Never invoke interrupts.
+ *
+ * _BlockingOperation_ means an operation that will block the calling thread,
+ * such as read and write. On the CRuby implementation, a _BlockingOperation_
+ * is any operation executed without the GVL.
+ *
+ * Masked asynchronous interrupts are delayed until they are enabled.
+ * This method is similar to sigprocmask(3).
+ *
+ * === NOTE
+ *
+ * Asynchronous interrupts are difficult to use.
+ *
+ * If you need to communicate between threads, please consider using another mechanism such as Queue.
+ *
+ * Or use them only with a deep understanding of this method.
+ *
+ * === Usage
+ *
+ * In this example, we can guard from Thread#raise exceptions.
+ *
+ * Using the +:never+ TimingSymbol the RuntimeError exception will always be
+ * ignored in the first block of the main thread. In the second
+ * ::handle_interrupt block we can purposefully handle RuntimeError exceptions.
+ *
+ * th = Thread.new do
+ * Thread.handle_interrupt(RuntimeError => :never) {
+ * begin
+ * # You can write resource allocation code safely.
+ * Thread.handle_interrupt(RuntimeError => :immediate) {
+ * # ...
+ * }
+ * ensure
+ * # You can write resource deallocation code safely.
+ * end
+ * }
+ * end
+ * Thread.pass
+ * # ...
+ * th.raise "stop"
+ *
+ * While we are ignoring the RuntimeError exception, it's safe to write our
+ * resource allocation code. Then, the ensure block is where we can safely
+ * deallocate our resources.
+ *
+ * ==== Stack control settings
+ *
+ * It's possible to stack multiple levels of ::handle_interrupt blocks in order
+ * to control more than one ExceptionClass and TimingSymbol at a time.
+ *
+ * Thread.handle_interrupt(FooError => :never) {
+ * Thread.handle_interrupt(BarError => :never) {
+ * # FooError and BarError are prohibited.
+ * }
+ * }
+ *
+ * ==== Inheritance with ExceptionClass
+ *
+ * All exceptions inherited from the ExceptionClass parameter will be considered.
+ *
+ * Thread.handle_interrupt(Exception => :never) {
+ * # all exceptions inherited from Exception are prohibited.
+ * }
+ *
+ * For handling all interrupts, use +Object+ and not +Exception+
+ * as the ExceptionClass, as kill/terminate interrupts are not handled by +Exception+.
+ */
+static VALUE
+rb_thread_s_handle_interrupt(VALUE self, VALUE mask_arg)
+{
+ VALUE mask = Qundef;
+ rb_execution_context_t * volatile ec = GET_EC();
+ rb_thread_t * volatile th = rb_ec_thread_ptr(ec);
+ volatile VALUE r = Qnil;
+ enum ruby_tag_type state;
+
+ if (!rb_block_given_p()) {
+ rb_raise(rb_eArgError, "block is needed.");
+ }
+
+ mask_arg = rb_to_hash_type(mask_arg);
+
+ if (OBJ_FROZEN(mask_arg) && rb_hash_compare_by_id_p(mask_arg)) {
+ mask = Qnil;
+ }
+
+ rb_hash_foreach(mask_arg, handle_interrupt_arg_check_i, (VALUE)&mask);
+
+ if (UNDEF_P(mask)) {
+ return rb_yield(Qnil);
+ }
+
+ if (!RTEST(mask)) {
+ mask = mask_arg;
+ }
+ else if (RB_TYPE_P(mask, T_HASH)) {
+ OBJ_FREEZE(mask);
+ }
+
+ rb_ary_push(th->pending_interrupt_mask_stack, mask);
+ if (!rb_threadptr_pending_interrupt_empty_p(th)) {
+ th->pending_interrupt_queue_checked = 0;
+ RUBY_VM_SET_INTERRUPT(th->ec);
+ }
+
+ EC_PUSH_TAG(th->ec);
+ if ((state = EC_EXEC_TAG()) == TAG_NONE) {
+ r = rb_yield(Qnil);
+ }
+ EC_POP_TAG();
+
+ rb_ary_pop(th->pending_interrupt_mask_stack);
+ if (!rb_threadptr_pending_interrupt_empty_p(th)) {
+ th->pending_interrupt_queue_checked = 0;
+ RUBY_VM_SET_INTERRUPT(th->ec);
+ }
+
+ RUBY_VM_CHECK_INTS(th->ec);
+
+ if (state) {
+ EC_JUMP_TAG(th->ec, state);
+ }
+
+ return r;
+}
+
+/*
+ * call-seq:
+ * target_thread.pending_interrupt?(error = nil) -> true/false
+ *
+ * Returns whether or not the asynchronous queue is empty for the target thread.
+ *
+ * If +error+ is given, then check only for +error+ type deferred events.
+ *
+ * See ::pending_interrupt? for more information.
+ */
+static VALUE
+rb_thread_pending_interrupt_p(int argc, VALUE *argv, VALUE target_thread)
+{
+ rb_thread_t *target_th = rb_thread_ptr(target_thread);
+
+ if (!target_th->pending_interrupt_queue) {
+ return Qfalse;
+ }
+ if (rb_threadptr_pending_interrupt_empty_p(target_th)) {
+ return Qfalse;
+ }
+ if (rb_check_arity(argc, 0, 1)) {
+ VALUE err = argv[0];
+ if (!rb_obj_is_kind_of(err, rb_cModule)) {
+ rb_raise(rb_eTypeError, "class or module required for rescue clause");
+ }
+ return RBOOL(rb_threadptr_pending_interrupt_include_p(target_th, err));
+ }
+ else {
+ return Qtrue;
+ }
+}
+
+/*
+ * call-seq:
+ * Thread.pending_interrupt?(error = nil) -> true/false
+ *
+ * Returns whether or not the asynchronous queue is empty.
+ *
+ * Since Thread::handle_interrupt can be used to defer asynchronous events,
+ * this method can be used to determine if there are any deferred events.
+ *
+ * If you find this method returns true, then you may finish the +:never+ block.
+ *
+ * For example, the following method processes deferred asynchronous events
+ * immediately.
+ *
+ * def Thread.kick_interrupt_immediately
+ * Thread.handle_interrupt(Object => :immediate) {
+ * Thread.pass
+ * }
+ * end
+ *
+ * If +error+ is given, then check only for +error+ type deferred events.
+ *
+ * === Usage
+ *
+ * th = Thread.new{
+ * Thread.handle_interrupt(RuntimeError => :on_blocking){
+ * while true
+ * ...
+ * # reach safe point to invoke interrupt
+ * if Thread.pending_interrupt?
+ * Thread.handle_interrupt(Object => :immediate){}
+ * end
+ * ...
+ * end
+ * }
+ * }
+ * ...
+ * th.raise # stop thread
+ *
+ * This example can also be written as the following, which you should use to
+ * avoid asynchronous interrupts.
+ *
+ * flag = true
+ * th = Thread.new{
+ * Thread.handle_interrupt(RuntimeError => :on_blocking){
+ * while true
+ * ...
+ * # reach safe point to invoke interrupt
+ * break if flag == false
+ * ...
+ * end
+ * }
+ * }
+ * ...
+ * flag = false # stop thread
+ */
+
+static VALUE
+rb_thread_s_pending_interrupt_p(int argc, VALUE *argv, VALUE self)
+{
+ return rb_thread_pending_interrupt_p(argc, argv, GET_THREAD()->self);
+}
+
+NORETURN(static void rb_threadptr_to_kill(rb_thread_t *th));
+
+static void
+rb_threadptr_to_kill(rb_thread_t *th)
+{
+ VM_ASSERT(GET_THREAD() == th);
+ rb_threadptr_pending_interrupt_clear(th);
+ th->status = THREAD_RUNNABLE;
+ th->to_kill = 1;
+ th->ec->errinfo = INT2FIX(TAG_FATAL);
+ EC_JUMP_TAG(th->ec, TAG_FATAL);
+}
+
+static inline rb_atomic_t
+threadptr_get_interrupts(rb_thread_t *th)
+{
+ rb_execution_context_t *ec = th->ec;
+ rb_atomic_t interrupt;
+ rb_atomic_t old;
+
+ old = ATOMIC_LOAD_RELAXED(ec->interrupt_flag);
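+ // Atomically take the deliverable (unmasked) bits out of
+ // ec->interrupt_flag, leaving masked bits pending; retry the CAS if
+ // another thread modified the flags in the meantime.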
+ do {
+ interrupt = old;
+ old = ATOMIC_CAS(ec->interrupt_flag, interrupt, interrupt & ec->interrupt_mask);
+ } while (old != interrupt);
+ return interrupt & (rb_atomic_t)~ec->interrupt_mask;
+}
+
+static void threadptr_interrupt_exec_exec(rb_thread_t *th);
+
+// Execute interrupts on currently running thread
+// In certain situations, calling this function will raise an exception. Some examples are:
+// * during VM shutdown (`rb_ractor_terminate_all`)
+// * Call to Thread#exit for current thread (`rb_thread_kill`)
+// * Call to Thread#raise for current thread
+int
+rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
+{
+ rb_atomic_t interrupt;
+ int postponed_job_interrupt = 0;
+ int ret = FALSE;
+
+ VM_ASSERT(GET_THREAD() == th);
+
+ if (th->ec->raised_flag) return ret;
+
+ while ((interrupt = threadptr_get_interrupts(th)) != 0) {
+ int sig;
+ int timer_interrupt;
+ int pending_interrupt;
+ int trap_interrupt;
+ int terminate_interrupt;
+
+ timer_interrupt = interrupt & TIMER_INTERRUPT_MASK;
+ pending_interrupt = interrupt & PENDING_INTERRUPT_MASK;
+ postponed_job_interrupt = interrupt & POSTPONED_JOB_INTERRUPT_MASK;
+ trap_interrupt = interrupt & TRAP_INTERRUPT_MASK;
+ terminate_interrupt = interrupt & TERMINATE_INTERRUPT_MASK; // request from other ractors
+
+ if (interrupt & VM_BARRIER_INTERRUPT_MASK) {
+ RB_VM_LOCKING();
+ }
+
+ if (postponed_job_interrupt) {
+ rb_postponed_job_flush(th->vm);
+ }
+
+ if (trap_interrupt) {
+ /* signal handling */
+ if (th == th->vm->ractor.main_thread) {
+ enum rb_thread_status prev_status = th->status;
+
+ th->status = THREAD_RUNNABLE;
+ {
+ while ((sig = rb_get_next_signal()) != 0) {
+ ret |= rb_signal_exec(th, sig);
+ }
+ }
+ th->status = prev_status;
+ }
+
+ if (!ccan_list_empty(&th->interrupt_exec_tasks)) {
+ enum rb_thread_status prev_status = th->status;
+
+ th->status = THREAD_RUNNABLE;
+ {
+ threadptr_interrupt_exec_exec(th);
+ }
+ th->status = prev_status;
+ }
+ }
+
+ /* exception from another thread */
+ if (pending_interrupt && threadptr_pending_interrupt_active_p(th)) {
+ VALUE err = rb_threadptr_pending_interrupt_deque(th, blocking_timing ? INTERRUPT_ON_BLOCKING : INTERRUPT_NONE);
+ RUBY_DEBUG_LOG("err:%"PRIdVALUE, err);
+ ret = TRUE;
+
+ if (UNDEF_P(err)) {
+ /* no error */
+ }
+ else if (err == RUBY_FATAL_THREAD_KILLED /* Thread#kill received */ ||
+ err == RUBY_FATAL_THREAD_TERMINATED /* Terminate thread */ ||
+ err == INT2FIX(TAG_FATAL) /* Thread.exit etc. */ ) {
+ terminate_interrupt = 1;
+ }
+ else {
+ if (err == th->vm->special_exceptions[ruby_error_stream_closed]) {
+ /* the only special exception to be queued across thread */
+ err = ruby_vm_special_exception_copy(err);
+ }
+ /* set runnable if th was slept. */
+ if (th->status == THREAD_STOPPED ||
+ th->status == THREAD_STOPPED_FOREVER)
+ th->status = THREAD_RUNNABLE;
+ rb_exc_raise(err);
+ }
+ }
+
+ if (terminate_interrupt) {
+ rb_threadptr_to_kill(th);
+ }
+
+ if (timer_interrupt) {
+ uint32_t limits_us = thread_default_quantum_ms * 1000;
+
+ if (th->priority > 0)
+ limits_us <<= th->priority;
+ else
+ limits_us >>= -th->priority;
+
+ if (th->status == THREAD_RUNNABLE)
+ th->running_time_us += 10 * 1000; // 10ms = 10_000us // TODO: use macro
+
+ VM_ASSERT(th->ec->cfp);
+ EXEC_EVENT_HOOK(th->ec, RUBY_INTERNAL_EVENT_SWITCH, th->ec->cfp->self,
+ 0, 0, 0, Qundef);
+
+ rb_thread_schedule_limits(limits_us);
+ }
+ }
+ return ret;
+}
+
+void
+rb_thread_execute_interrupts(VALUE thval)
+{
+ rb_threadptr_execute_interrupts(rb_thread_ptr(thval), 1);
+}
static void
rb_threadptr_ready(rb_thread_t *th)
@@ -1310,23 +2735,23 @@ rb_threadptr_ready(rb_thread_t *th)
}
static VALUE
-rb_threadptr_raise(rb_thread_t *th, int argc, VALUE *argv)
+rb_threadptr_raise(rb_thread_t *target_th, int argc, VALUE *argv)
{
- VALUE exc;
-
- again:
- if (rb_threadptr_dead(th)) {
- return Qnil;
+ if (rb_threadptr_dead(target_th)) {
+ return Qnil;
}
- if (th->thrown_errinfo != 0 || th->raised_flag) {
- rb_thread_schedule();
- goto again;
+ VALUE exception = rb_exception_setup(argc, argv);
+
+ /* making an exception object can switch thread,
+ so we need to check thread deadness again */
+ if (rb_threadptr_dead(target_th)) {
+ return Qnil;
}
- exc = rb_make_exception(argc, argv);
- th->thrown_errinfo = exc;
- rb_threadptr_ready(th);
+ rb_threadptr_pending_interrupt_enque(target_th, exception);
+ rb_threadptr_interrupt(target_th);
+
return Qnil;
}
@@ -1337,7 +2762,7 @@ rb_threadptr_signal_raise(rb_thread_t *th, int sig)
argv[0] = rb_eSignal;
argv[1] = INT2FIX(sig);
- rb_threadptr_raise(th->vm->main_thread, 2, argv);
+ rb_threadptr_raise(th->vm->ractor.main_thread, 2, argv);
}
void
@@ -1347,66 +2772,153 @@ rb_threadptr_signal_exit(rb_thread_t *th)
argv[0] = rb_eSystemExit;
argv[1] = rb_str_new2("exit");
- rb_threadptr_raise(th->vm->main_thread, 2, argv);
-}
-#if defined(POSIX_SIGNAL) && defined(SIGSEGV) && defined(HAVE_SIGALTSTACK)
-#define USE_SIGALTSTACK
-#endif
-
-void
-ruby_thread_stack_overflow(rb_thread_t *th)
-{
- th->raised_flag = 0;
-#ifdef USE_SIGALTSTACK
- th->raised_flag = 0;
- rb_exc_raise(sysstack_error);
-#else
- th->errinfo = sysstack_error;
- TH_JUMP_TAG(th, TAG_RAISE);
-#endif
+ // TODO: check signal raise delivery
+ rb_threadptr_raise(th->vm->ractor.main_thread, 2, argv);
}
int
-rb_threadptr_set_raised(rb_thread_t *th)
+rb_ec_set_raised(rb_execution_context_t *ec)
{
- if (th->raised_flag & RAISED_EXCEPTION) {
- return 1;
+ if (ec->raised_flag & RAISED_EXCEPTION) {
+ return 1;
}
- th->raised_flag |= RAISED_EXCEPTION;
+ ec->raised_flag |= RAISED_EXCEPTION;
return 0;
}
int
-rb_threadptr_reset_raised(rb_thread_t *th)
+rb_ec_reset_raised(rb_execution_context_t *ec)
{
- if (!(th->raised_flag & RAISED_EXCEPTION)) {
- return 0;
+ if (!(ec->raised_flag & RAISED_EXCEPTION)) {
+ return 0;
}
- th->raised_flag &= ~RAISED_EXCEPTION;
+ ec->raised_flag &= ~RAISED_EXCEPTION;
return 1;
}
+/*
+ * Thread-safe IO closing mechanism.
+ *
+ * When an IO is closed while other threads or fibers are blocked on it, we need to:
+ * 1. Track and notify all blocking operations through io->blocking_operations
+ * 2. Ensure only one thread can close at a time using io->closing_ec
+ * 3. Synchronize cleanup using wakeup_mutex
+ *
+ * The close process works as follows:
+ * - First check if any thread is already closing (io->closing_ec)
+ * - Set up wakeup_mutex for synchronization
+ * - Iterate through all blocking operations in io->blocking_operations
+ * - For each blocked fiber with a scheduler:
+ * - Notify via rb_fiber_scheduler_fiber_interrupt
+ * - For each blocked thread without a scheduler:
+ * - Enqueue IOError via rb_threadptr_pending_interrupt_enque
+ * - Wake via rb_threadptr_interrupt
+ * - Wait on wakeup_mutex until all operations are cleaned up
+ * - Only then clear closing state and allow actual close to proceed
+ */
+static VALUE
+thread_io_close_notify_all(VALUE _io)
+{
+ struct rb_io *io = (struct rb_io *)_io;
+
+ size_t count = 0;
+ rb_vm_t *vm = io->closing_ec->thread_ptr->vm;
+ VALUE error = vm->special_exceptions[ruby_error_stream_closed];
+
+ struct rb_io_blocking_operation *blocking_operation;
+ ccan_list_for_each(rb_io_blocking_operations(io), blocking_operation, list) {
+ rb_execution_context_t *ec = blocking_operation->ec;
+
+ // If the operation is in progress, we need to interrupt it:
+ if (ec) {
+ rb_thread_t *thread = ec->thread_ptr;
+
+ VALUE result = RUBY_Qundef;
+ if (thread->scheduler != Qnil) {
+ result = rb_fiber_scheduler_fiber_interrupt(thread->scheduler, rb_fiberptr_self(ec->fiber_ptr), error);
+ }
+
+ if (result == RUBY_Qundef) {
+ // If the thread is not the current thread, we need to enqueue an error:
+ rb_threadptr_pending_interrupt_enque(thread, error);
+ rb_threadptr_interrupt(thread);
+ }
+ }
+
+ count += 1;
+ }
+
+ return (VALUE)count;
+}
+
+size_t
+rb_thread_io_close_interrupt(struct rb_io *io)
+{
+ // We guard this operation based on `io->closing_ec` -> only one thread will ever enter this function.
+ if (io->closing_ec) {
+ return 0;
+ }
+
+ // If there are no blocking operations, we are done:
+ if (ccan_list_empty(rb_io_blocking_operations(io))) {
+ return 0;
+ }
+
+ // Otherwise, we are now closing the IO:
+ rb_execution_context_t *ec = GET_EC();
+ io->closing_ec = ec;
+
+ // This is used to ensure the correct execution context is woken up after the blocking operation is interrupted:
+ io->wakeup_mutex = rb_mutex_new();
+ rb_mutex_allow_trap(io->wakeup_mutex, 1);
+
+ // We need to use a mutex here as entering the fiber scheduler may cause a context switch:
+ VALUE result = rb_mutex_synchronize(io->wakeup_mutex, thread_io_close_notify_all, (VALUE)io);
+
+ return (size_t)result;
+}
+
+void
+rb_thread_io_close_wait(struct rb_io* io)
+{
+ VALUE wakeup_mutex = io->wakeup_mutex;
+
+ if (!RB_TEST(wakeup_mutex)) {
+ // There was nobody else using this file when we closed it, so we never bothered to allocate a mutex:
+ return;
+ }
+
+ rb_mutex_lock(wakeup_mutex);
+ while (!ccan_list_empty(rb_io_blocking_operations(io))) {
+ rb_mutex_sleep(wakeup_mutex, Qnil);
+ }
+ rb_mutex_unlock(wakeup_mutex);
+
+ // We are done closing:
+ io->wakeup_mutex = Qnil;
+ io->closing_ec = NULL;
+}
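+
+/*
+ * An illustrative sketch (not part of this change) of how an IO close path
+ * could drive the two functions above; `io_close_blocking_operations' is a
+ * hypothetical helper:
+ *
+ *   static void
+ *   io_close_blocking_operations(struct rb_io *io)
+ *   {
+ *       // Interrupt every thread/fiber currently blocked on this IO:
+ *       rb_thread_io_close_interrupt(io);
+ *
+ *       // Wait until all of them have removed themselves from
+ *       // io->blocking_operations; returns immediately if no
+ *       // wakeup_mutex was ever allocated:
+ *       rb_thread_io_close_wait(io);
+ *   }
+ */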
+
void
rb_thread_fd_close(int fd)
{
- /* TODO: fix me */
+ rb_warn("rb_thread_fd_close is deprecated (and is now a no-op).");
}
/*
* call-seq:
- * thr.raise
- * thr.raise(string)
- * thr.raise(exception [, string [, array]])
+ * raise(exception, message = exception.to_s, backtrace = nil, cause: $!)
+ * raise(message = nil, cause: $!)
*
- * Raises an exception (see <code>Kernel::raise</code>) from <i>thr</i>. The
- * caller does not have to be <i>thr</i>.
+ * Raises an exception from the given thread. The caller does not have to be
+ * +thr+. See Kernel#raise for more information on arguments.
*
* Thread.abort_on_exception = true
* a = Thread.new { sleep(200) }
* a.raise("Gotcha")
*
- * <em>produces:</em>
+ * This will produce:
*
* prog.rb:3: Gotcha (RuntimeError)
* from prog.rb:2:in `initialize'
@@ -1417,57 +2929,77 @@ rb_thread_fd_close(int fd)
static VALUE
thread_raise_m(int argc, VALUE *argv, VALUE self)
{
- rb_thread_t *th;
- GetThreadPtr(self, th);
- rb_threadptr_raise(th, argc, argv);
+ rb_thread_t *target_th = rb_thread_ptr(self);
+ const rb_thread_t *current_th = GET_THREAD();
+
+ threadptr_check_pending_interrupt_queue(target_th);
+ rb_threadptr_raise(target_th, argc, argv);
+
+ /* To perform Thread.current.raise as Kernel.raise */
+ if (current_th == target_th) {
+ RUBY_VM_CHECK_INTS(target_th->ec);
+ }
return Qnil;
}
/*
* call-seq:
- * thr.exit => thr or nil
- * thr.kill => thr or nil
- * thr.terminate => thr or nil
- *
- * Terminates <i>thr</i> and schedules another thread to be run. If this thread
- * is already marked to be killed, <code>exit</code> returns the
- * <code>Thread</code>. If this is the main thread, or the last thread, exits
- * the process.
+ * thr.exit -> thr
+ * thr.kill -> thr
+ * thr.terminate -> thr
+ *
+ * Terminates +thr+ and schedules another thread to be run, returning
+ * the terminated Thread. If this is the main thread, or the last
+ * thread, exits the process. Note that the caller does not wait for
+ * the thread to terminate if the receiver is different from the currently
+ * running thread. The termination is asynchronous, and the thread can still
+ * run a small amount of ruby code before exiting.
*/
VALUE
rb_thread_kill(VALUE thread)
{
- rb_thread_t *th;
-
- GetThreadPtr(thread, th);
+ rb_thread_t *target_th = rb_thread_ptr(thread);
- if (th != GET_THREAD() && th->safe_level < 4) {
- rb_secure(4);
+ if (target_th->to_kill || target_th->status == THREAD_KILLED) {
+ return thread;
}
- if (th->status == THREAD_TO_KILL || th->status == THREAD_KILLED) {
- return thread;
- }
- if (th == th->vm->main_thread) {
- rb_exit(EXIT_SUCCESS);
+ if (target_th == target_th->vm->ractor.main_thread) {
+ rb_exit(EXIT_SUCCESS);
}
- thread_debug("rb_thread_kill: %p (%p)\n", (void *)th, (void *)th->thread_id);
+ RUBY_DEBUG_LOG("target_th:%u", rb_th_serial(target_th));
- rb_threadptr_interrupt(th);
- th->thrown_errinfo = eKillSignal;
- th->status = THREAD_TO_KILL;
+ if (target_th == GET_THREAD()) {
+ /* kill myself immediately */
+ rb_threadptr_to_kill(target_th);
+ }
+ else {
+ threadptr_check_pending_interrupt_queue(target_th);
+ rb_threadptr_pending_interrupt_enque(target_th, RUBY_FATAL_THREAD_KILLED);
+ rb_threadptr_interrupt(target_th);
+ }
return thread;
}
+int
+rb_thread_to_be_killed(VALUE thread)
+{
+ rb_thread_t *target_th = rb_thread_ptr(thread);
+
+ if (target_th->to_kill || target_th->status == THREAD_KILLED) {
+ return TRUE;
+ }
+ return FALSE;
+}
/*
* call-seq:
- * Thread.kill(thread) => thread
+ * Thread.kill(thread) -> thread
*
- * Causes the given <em>thread</em> to exit (see <code>Thread::exit</code>).
+ * Causes the given +thread+ to exit, see also Thread::exit.
*
* count = 0
* a = Thread.new { loop { count += 1 } }
@@ -1486,70 +3018,85 @@ rb_thread_s_kill(VALUE obj, VALUE th)
/*
* call-seq:
- * Thread.exit => thread
+ * Thread.exit -> thread
*
* Terminates the currently running thread and schedules another thread to be
- * run. If this thread is already marked to be killed, <code>exit</code>
- * returns the <code>Thread</code>. If this is the main thread, or the last
- * thread, exit the process.
+ * run.
+ *
+ * If this thread is already marked to be killed, ::exit returns the Thread.
+ *
+ * If this is the main thread, or the last thread, exit the process.
*/
static VALUE
-rb_thread_exit(void)
+rb_thread_exit(VALUE _)
{
- return rb_thread_kill(GET_THREAD()->self);
+ rb_thread_t *th = GET_THREAD();
+ return rb_thread_kill(th->self);
}
/*
* call-seq:
- * thr.wakeup => thr
+ * thr.wakeup -> thr
+ *
+ * Marks a given thread as eligible for scheduling; however, it may still
+ * remain blocked on I/O.
*
- * Marks <i>thr</i> as eligible for scheduling (it may still remain blocked on
- * I/O, however). Does not invoke the scheduler (see <code>Thread#run</code>).
+ * *Note:* This does not invoke the scheduler, see #run for more information.
*
* c = Thread.new { Thread.stop; puts "hey!" }
+ * sleep 0.1 while c.status!='sleep'
* c.wakeup
- *
- * <em>produces:</em>
- *
- * hey!
+ * c.join
+ * #=> "hey!"
*/
VALUE
rb_thread_wakeup(VALUE thread)
{
- rb_thread_t *th;
- GetThreadPtr(thread, th);
-
- if (th->status == THREAD_KILLED) {
- rb_raise(rb_eThreadError, "killed thread");
+ if (!RTEST(rb_thread_wakeup_alive(thread))) {
+ rb_raise(rb_eThreadError, "killed thread");
}
- rb_threadptr_ready(th);
- if (th->status != THREAD_TO_KILL) {
- th->status = THREAD_RUNNABLE;
+ return thread;
+}
+
+VALUE
+rb_thread_wakeup_alive(VALUE thread)
+{
+ rb_thread_t *target_th = rb_thread_ptr(thread);
+ if (target_th->status == THREAD_KILLED) return Qnil;
+
+ rb_threadptr_ready(target_th);
+
+ if (target_th->status == THREAD_STOPPED ||
+ target_th->status == THREAD_STOPPED_FOREVER) {
+ target_th->status = THREAD_RUNNABLE;
}
+
return thread;
}
/*
* call-seq:
- * thr.run => thr
+ * thr.run -> thr
*
- * Wakes up <i>thr</i>, making it eligible for scheduling.
+ * Wakes up +thr+, making it eligible for scheduling.
*
* a = Thread.new { puts "a"; Thread.stop; puts "c" }
- * Thread.pass
+ * sleep 0.1 while a.status!='sleep'
* puts "Got here"
* a.run
* a.join
*
- * <em>produces:</em>
+ * This will produce:
*
* a
* Got here
* c
+ *
+ * See also the instance method #wakeup.
*/
VALUE
@@ -1561,69 +3108,60 @@ rb_thread_run(VALUE thread)
}
+VALUE
+rb_thread_stop(void)
+{
+ if (rb_thread_alone()) {
+ rb_raise(rb_eThreadError,
+ "stopping only thread\n\tnote: use sleep to stop forever");
+ }
+ rb_thread_sleep_deadly();
+ return Qnil;
+}
+
/*
* call-seq:
- * Thread.stop => nil
+ * Thread.stop -> nil
*
* Stops execution of the current thread, putting it into a ``sleep'' state,
* and schedules execution of another thread.
*
* a = Thread.new { print "a"; Thread.stop; print "c" }
- * Thread.pass
+ * sleep 0.1 while a.status!='sleep'
* print "b"
* a.run
* a.join
- *
- * <em>produces:</em>
- *
- * abc
+ * #=> "abc"
*/
-VALUE
-rb_thread_stop(void)
+static VALUE
+thread_stop(VALUE _)
{
- if (rb_thread_alone()) {
- rb_raise(rb_eThreadError,
- "stopping only thread\n\tnote: use sleep to stop forever");
- }
- rb_thread_sleep_deadly();
- return Qnil;
+ return rb_thread_stop();
}
-static int
-thread_list_i(st_data_t key, st_data_t val, void *data)
-{
- VALUE ary = (VALUE)data;
- rb_thread_t *th;
- GetThreadPtr((VALUE)key, th);
+/********************************************************************/
- switch (th->status) {
- case THREAD_RUNNABLE:
- case THREAD_STOPPED:
- case THREAD_STOPPED_FOREVER:
- case THREAD_TO_KILL:
- rb_ary_push(ary, th->self);
- default:
- break;
- }
- return ST_CONTINUE;
+VALUE
+rb_thread_list(void)
+{
+ // TODO
+ return rb_ractor_thread_list();
}
-/********************************************************************/
-
/*
* call-seq:
- * Thread.list => array
+ * Thread.list -> array
*
- * Returns an array of <code>Thread</code> objects for all threads that are
- * either runnable or stopped.
+ * Returns an array of Thread objects for all threads that are either runnable
+ * or stopped.
*
* Thread.new { sleep(200) }
* Thread.new { 1000000.times {|i| i*i } }
* Thread.new { Thread.stop }
* Thread.list.each {|t| p t}
*
- * <em>produces:</em>
+ * This will produce:
*
* #<Thread:0x401b3e84 sleep>
* #<Thread:0x401b3f38 run>
@@ -1631,12 +3169,10 @@ thread_list_i(st_data_t key, st_data_t val, void *data)
* #<Thread:0x401bdf4c run>
*/
-VALUE
-rb_thread_list(void)
+static VALUE
+thread_list(VALUE _)
{
- VALUE ary = rb_ary_new();
- st_foreach(GET_THREAD()->vm->living_threads, thread_list_i, ary);
- return ary;
+ return rb_thread_list();
}
VALUE
@@ -1647,7 +3183,7 @@ rb_thread_current(void)
/*
* call-seq:
- * Thread.current => thread
+ * Thread.current -> thread
*
* Returns the currently executing thread.
*
@@ -1663,12 +3199,12 @@ thread_s_current(VALUE klass)
VALUE
rb_thread_main(void)
{
- return GET_THREAD()->vm->main_thread->self;
+ return GET_RACTOR()->threads.main->self;
}
/*
* call-seq:
- * Thread.main => thread
+ * Thread.main -> thread
*
* Returns the main thread.
*/
@@ -1682,29 +3218,38 @@ rb_thread_s_main(VALUE klass)
/*
* call-seq:
- * Thread.abort_on_exception => true or false
- *
- * Returns the status of the global ``abort on exception'' condition. The
- * default is <code>false</code>. When set to <code>true</code>, or if the
- * global <code>$DEBUG</code> flag is <code>true</code> (perhaps because the
- * command line option <code>-d</code> was specified) all threads will abort
- * (the process will <code>exit(0)</code>) if an exception is raised in any
- * thread. See also <code>Thread::abort_on_exception=</code>.
+ * Thread.abort_on_exception -> true or false
+ *
+ * Returns the status of the global ``abort on exception'' condition.
+ *
+ * The default is +false+.
+ *
+ * When set to +true+, if any thread is aborted by an exception, the
+ * raised exception will be re-raised in the main thread.
+ *
+ * Can also be specified by the global $DEBUG flag or command line option
+ * +-d+.
+ *
+ * See also ::abort_on_exception=.
+ *
+ * There is also an instance level method to set this for a specific thread,
+ * see #abort_on_exception.
*/
static VALUE
-rb_thread_s_abort_exc(void)
+rb_thread_s_abort_exc(VALUE _)
{
- return GET_THREAD()->vm->thread_abort_on_exception ? Qtrue : Qfalse;
+ return RBOOL(GET_THREAD()->vm->thread_abort_on_exception);
}
/*
* call-seq:
- * Thread.abort_on_exception= boolean => true or false
+ * Thread.abort_on_exception= boolean -> true or false
*
- * When set to <code>true</code>, all threads will abort if an exception is
- * raised. Returns the new state.
+ * When set to +true+, if any thread is aborted by an exception, the
+ * raised exception will be re-raised in the main thread.
+ * Returns the new state.
*
* Thread.abort_on_exception = true
* t1 = Thread.new do
@@ -1714,19 +3259,23 @@ rb_thread_s_abort_exc(void)
* sleep(1)
* puts "not reached"
*
- * <em>produces:</em>
+ * This will produce:
*
* In new thread
* prog.rb:4: Exception from thread (RuntimeError)
* from prog.rb:2:in `initialize'
* from prog.rb:2:in `new'
* from prog.rb:2
+ *
+ * See also ::abort_on_exception.
+ *
+ * There is also an instance level method to set this for a specific thread,
+ * see #abort_on_exception=.
*/
static VALUE
rb_thread_s_abort_exc_set(VALUE self, VALUE val)
{
- rb_secure(4);
GET_THREAD()->vm->thread_abort_on_exception = RTEST(val);
return val;
}
@@ -1734,49 +3283,229 @@ rb_thread_s_abort_exc_set(VALUE self, VALUE val)
/*
* call-seq:
- * thr.abort_on_exception => true or false
+ * thr.abort_on_exception -> true or false
*
* Returns the status of the thread-local ``abort on exception'' condition for
- * <i>thr</i>. The default is <code>false</code>. See also
- * <code>Thread::abort_on_exception=</code>.
+ * this +thr+.
+ *
+ * The default is +false+.
+ *
+ * See also #abort_on_exception=.
+ *
+ * There is also a class level method to set this for all threads, see
+ * ::abort_on_exception.
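+ *
+ * For example, with the documented +false+ default:
+ *
+ *    Thread.new { }.abort_on_exception   #=> false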
*/
static VALUE
rb_thread_abort_exc(VALUE thread)
{
- rb_thread_t *th;
- GetThreadPtr(thread, th);
- return th->abort_on_exception ? Qtrue : Qfalse;
+ return RBOOL(rb_thread_ptr(thread)->abort_on_exception);
}
/*
* call-seq:
- * thr.abort_on_exception= boolean => true or false
+ * thr.abort_on_exception= boolean -> true or false
+ *
+ * When set to +true+, if this +thr+ is aborted by an exception, the
+ * raised exception will be re-raised in the main thread.
*
- * When set to <code>true</code>, causes all threads (including the main
- * program) to abort if an exception is raised in <i>thr</i>. The process will
- * effectively <code>exit(0)</code>.
+ * See also #abort_on_exception.
+ *
+ * There is also a class level method to set this for all threads, see
+ * ::abort_on_exception=.
*/
static VALUE
rb_thread_abort_exc_set(VALUE thread, VALUE val)
{
- rb_thread_t *th;
- rb_secure(4);
+ rb_thread_ptr(thread)->abort_on_exception = RTEST(val);
+ return val;
+}
+
+
+/*
+ * call-seq:
+ * Thread.report_on_exception -> true or false
+ *
+ * Returns the status of the global ``report on exception'' condition.
+ *
+ * The default is +true+ since Ruby 2.5.
+ *
+ * All threads created when this flag is true will report
+ * a message on $stderr if an exception kills the thread.
+ *
+ * Thread.new { 1.times { raise } }
+ *
+ * will produce this output on $stderr:
+ *
+ * #<Thread:...> terminated with exception (report_on_exception is true):
+ * Traceback (most recent call last):
+ * 2: from -e:1:in `block in <main>'
+ * 1: from -e:1:in `times'
+ *
+ * This is done to catch errors in threads early.
+ * In some cases, you might not want this output.
+ * There are multiple ways to avoid the extra output:
+ *
+ * * If the exception is not intended, the best is to fix the cause of
+ * the exception so it does not happen anymore.
+ * * If the exception is intended, it might be better to rescue it closer to
+ *    where it is raised rather than letting it kill the Thread.
+ * * If it is guaranteed the Thread will be joined with Thread#join or
+ * Thread#value, then it is safe to disable this report with
+ * <code>Thread.current.report_on_exception = false</code>
+ * when starting the Thread.
+ * However, this might handle the exception much later, or not at all
+ * if the Thread is never joined due to the parent thread being blocked, etc.
+ *
+ * See also ::report_on_exception=.
+ *
+ * There is also an instance level method to set this for a specific thread,
+ * see #report_on_exception=.
+ *
+ */
- GetThreadPtr(thread, th);
- th->abort_on_exception = RTEST(val);
+static VALUE
+rb_thread_s_report_exc(VALUE _)
+{
+ return RBOOL(GET_THREAD()->vm->thread_report_on_exception);
+}
+
+
+/*
+ * call-seq:
+ * Thread.report_on_exception= boolean -> true or false
+ *
+ * Returns the new state.
+ * When set to +true+, all threads created afterwards will inherit the
+ * condition and report a message on $stderr if an exception kills a thread:
+ *
+ * Thread.report_on_exception = true
+ * t1 = Thread.new do
+ * puts "In new thread"
+ * raise "Exception from thread"
+ * end
+ * sleep(1)
+ * puts "In the main thread"
+ *
+ * This will produce:
+ *
+ * In new thread
+ * #<Thread:...prog.rb:2> terminated with exception (report_on_exception is true):
+ * Traceback (most recent call last):
+ * prog.rb:4:in `block in <main>': Exception from thread (RuntimeError)
+ * In the main thread
+ *
+ * See also ::report_on_exception.
+ *
+ * There is also an instance level method to set this for a specific thread,
+ * see #report_on_exception=.
+ */
+
+static VALUE
+rb_thread_s_report_exc_set(VALUE self, VALUE val)
+{
+ GET_THREAD()->vm->thread_report_on_exception = RTEST(val);
return val;
}
/*
* call-seq:
- * thr.group => thgrp or nil
+ * Thread.ignore_deadlock -> true or false
+ *
+ * Returns the status of the global ``ignore deadlock'' condition.
+ * The default is +false+, so that deadlock conditions are not ignored.
+ *
+ * See also ::ignore_deadlock=.
+ *
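+ * For example, with the documented +false+ default:
+ *
+ *    Thread.ignore_deadlock   #=> false
+ *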
+ */
+
+static VALUE
+rb_thread_s_ignore_deadlock(VALUE _)
+{
+ return RBOOL(GET_THREAD()->vm->thread_ignore_deadlock);
+}
+
+
+/*
+ * call-seq:
+ * Thread.ignore_deadlock = boolean -> true or false
+ *
+ * Returns the new state.
+ * When set to +true+, the VM will not check for deadlock conditions.
+ * It is only useful to set this if your application can break a
+ * deadlock condition via some other means, such as a signal.
+ *
+ * Thread.ignore_deadlock = true
+ * queue = Thread::Queue.new
*
- * Returns the <code>ThreadGroup</code> which contains <i>thr</i>, or nil if
- * the thread is not a member of any group.
+ * trap(:SIGUSR1){queue.push "Received signal"}
+ *
+ * # raises fatal error unless ignoring deadlock
+ * puts queue.pop
+ *
+ * See also ::ignore_deadlock.
+ */
+
+static VALUE
+rb_thread_s_ignore_deadlock_set(VALUE self, VALUE val)
+{
+ GET_THREAD()->vm->thread_ignore_deadlock = RTEST(val);
+ return val;
+}
+
+
+/*
+ * call-seq:
+ * thr.report_on_exception -> true or false
+ *
+ * Returns the status of the thread-local ``report on exception'' condition for
+ * this +thr+.
+ *
+ * The default value when creating a Thread is the value of
+ * the global flag Thread.report_on_exception.
+ *
+ * See also #report_on_exception=.
+ *
+ * There is also a class level method to set this for all new threads, see
+ * ::report_on_exception=.
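+ *
+ * For example, with the global flag at its +true+ default:
+ *
+ *    Thread.new { }.report_on_exception   #=> true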
+ */
+
+static VALUE
+rb_thread_report_exc(VALUE thread)
+{
+ return RBOOL(rb_thread_ptr(thread)->report_on_exception);
+}
+
+
+/*
+ * call-seq:
+ * thr.report_on_exception= boolean -> true or false
+ *
+ * When set to +true+, a message is printed on $stderr if an exception
+ * kills this +thr+. See ::report_on_exception for details.
+ *
+ * See also #report_on_exception.
+ *
+ * There is also a class level method to set this for all new threads, see
+ * ::report_on_exception=.
+ */
+
+static VALUE
+rb_thread_report_exc_set(VALUE thread, VALUE val)
+{
+ rb_thread_ptr(thread)->report_on_exception = RTEST(val);
+ return val;
+}
+
+
+/*
+ * call-seq:
+ * thr.group -> thgrp or nil
+ *
+ * Returns the ThreadGroup which contains the given thread.
*
* Thread.main.group #=> #<ThreadGroup:0x4029d914>
*/
@@ -1784,32 +3513,23 @@ rb_thread_abort_exc_set(VALUE thread, VALUE val)
VALUE
rb_thread_group(VALUE thread)
{
- rb_thread_t *th;
- VALUE group;
- GetThreadPtr(thread, th);
- group = th->thgroup;
-
- if (!group) {
- group = Qnil;
- }
- return group;
+ return rb_thread_ptr(thread)->thgroup;
}
static const char *
-thread_status_name(enum rb_thread_status status)
+thread_status_name(rb_thread_t *th, int detail)
{
- switch (status) {
+ switch (th->status) {
case THREAD_RUNNABLE:
- return "run";
- case THREAD_STOPPED:
+ return th->to_kill ? "aborting" : "run";
case THREAD_STOPPED_FOREVER:
- return "sleep";
- case THREAD_TO_KILL:
- return "aborting";
+ if (detail) return "sleep_forever";
+ case THREAD_STOPPED:
+ return "sleep";
case THREAD_KILLED:
- return "dead";
+ return "dead";
default:
- return "unknown";
+ return "unknown";
}
}
@@ -1822,13 +3542,20 @@ rb_threadptr_dead(rb_thread_t *th)
/*
* call-seq:
- * thr.status => string, false or nil
+ * thr.status -> string, false or nil
*
- * Returns the status of <i>thr</i>: ``<code>sleep</code>'' if <i>thr</i> is
- * sleeping or waiting on I/O, ``<code>run</code>'' if <i>thr</i> is executing,
- * ``<code>aborting</code>'' if <i>thr</i> is aborting, <code>false</code> if
- * <i>thr</i> terminated normally, and <code>nil</code> if <i>thr</i>
- * terminated with an exception.
+ * Returns the status of +thr+.
+ *
+ * [<tt>"sleep"</tt>]
+ * Returned if this thread is sleeping or waiting on I/O
+ * [<tt>"run"</tt>]
+ * When this thread is executing
+ * [<tt>"aborting"</tt>]
+ * If this thread is aborting
+ * [+false+]
+ * When this thread is terminated normally
+ * [+nil+]
+ * If terminated with an exception.
*
* a = Thread.new { raise("die now") }
* b = Thread.new { Thread.stop }
@@ -1840,206 +3567,449 @@ rb_threadptr_dead(rb_thread_t *th)
* c.status #=> false
* d.status #=> "aborting"
* Thread.current.status #=> "run"
+ *
+ * See also the instance methods #alive? and #stop?
*/
static VALUE
rb_thread_status(VALUE thread)
{
- rb_thread_t *th;
- GetThreadPtr(thread, th);
+ rb_thread_t *target_th = rb_thread_ptr(thread);
- if (rb_threadptr_dead(th)) {
- if (!NIL_P(th->errinfo) && !FIXNUM_P(th->errinfo)
- /* TODO */ ) {
- return Qnil;
- }
- return Qfalse;
+ if (rb_threadptr_dead(target_th)) {
+ if (!NIL_P(target_th->ec->errinfo) &&
+ !FIXNUM_P(target_th->ec->errinfo)) {
+ return Qnil;
+ }
+ else {
+ return Qfalse;
+ }
+ }
+ else {
+ return rb_str_new2(thread_status_name(target_th, FALSE));
}
- return rb_str_new2(thread_status_name(th->status));
}
/*
* call-seq:
- * thr.alive? => true or false
+ * thr.alive? -> true or false
*
- * Returns <code>true</code> if <i>thr</i> is running or sleeping.
+ * Returns +true+ if +thr+ is running or sleeping.
*
* thr = Thread.new { }
* thr.join #=> #<Thread:0x401b3fb0 dead>
* Thread.current.alive? #=> true
* thr.alive? #=> false
+ *
+ * See also #stop? and #status.
*/
static VALUE
rb_thread_alive_p(VALUE thread)
{
- rb_thread_t *th;
- GetThreadPtr(thread, th);
-
- if (rb_threadptr_dead(th))
- return Qfalse;
- return Qtrue;
+ return RBOOL(!thread_finished(rb_thread_ptr(thread)));
}
/*
* call-seq:
- * thr.stop? => true or false
+ * thr.stop? -> true or false
*
- * Returns <code>true</code> if <i>thr</i> is dead or sleeping.
+ * Returns +true+ if +thr+ is dead or sleeping.
*
* a = Thread.new { Thread.stop }
* b = Thread.current
* a.stop? #=> true
* b.stop? #=> false
+ *
+ * See also #alive? and #status.
*/
static VALUE
rb_thread_stop_p(VALUE thread)
{
- rb_thread_t *th;
- GetThreadPtr(thread, th);
+ rb_thread_t *th = rb_thread_ptr(thread);
- if (rb_threadptr_dead(th))
- return Qtrue;
- if (th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER)
- return Qtrue;
- return Qfalse;
+ if (rb_threadptr_dead(th)) {
+ return Qtrue;
+ }
+ return RBOOL(th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER);
}
/*
- * call-seq:
- * thr.safe_level => integer
+ * call-seq:
+ * thr.name -> string
*
- * Returns the safe level in effect for <i>thr</i>. Setting thread-local safe
- * levels can help when implementing sandboxes which run insecure code.
+ * Returns the name of the thread.
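+ *
+ * For example:
+ *
+ *    Thread.current.name = "worker"
+ *    Thread.current.name   #=> "worker"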
+ */
+
+static VALUE
+rb_thread_getname(VALUE thread)
+{
+ return rb_thread_ptr(thread)->name;
+}
+
+/*
+ * call-seq:
+ * thr.name=(name) -> string
*
- * thr = Thread.new { $SAFE = 3; sleep }
- * Thread.current.safe_level #=> 0
- * thr.safe_level #=> 3
+ * Sets the name of the Ruby thread to the given string.
+ * On some platforms, the name may also be set for the underlying pthread
+ * and/or be visible to the kernel.
*/
static VALUE
-rb_thread_safe_level(VALUE thread)
+rb_thread_setname(VALUE thread, VALUE name)
{
- rb_thread_t *th;
- GetThreadPtr(thread, th);
+ rb_thread_t *target_th = rb_thread_ptr(thread);
- return INT2NUM(th->safe_level);
+ if (!NIL_P(name)) {
+ rb_encoding *enc;
+ StringValueCStr(name);
+ enc = rb_enc_get(name);
+ if (!rb_enc_asciicompat(enc)) {
+ rb_raise(rb_eArgError, "ASCII incompatible encoding (%s)",
+ rb_enc_name(enc));
+ }
+ name = rb_str_new_frozen(name);
+ }
+ target_th->name = name;
+ if (threadptr_initialized(target_th) && target_th->has_dedicated_nt) {
+ native_set_another_thread_name(target_th->nt->thread_id, name);
+ }
+ return name;
}
+#if USE_NATIVE_THREAD_NATIVE_THREAD_ID
/*
* call-seq:
- * thr.inspect => string
+ * thr.native_thread_id -> integer
+ *
+ * Return the native thread ID which is used by the Ruby thread.
+ *
+ * The ID depends on the OS (it is not the POSIX thread ID returned by
+ * pthread_self(3)).
+ * * On Linux it is TID returned by gettid(2).
+ * * On macOS it is the system-wide unique integral ID of the thread returned
+ * by pthread_threadid_np(3).
+ * * On FreeBSD it is the unique integral ID of the thread returned by
+ * pthread_getthreadid_np(3).
+ * * On Windows it is the thread identifier returned by GetThreadId().
+ * * On other platforms, it raises NotImplementedError.
+ *
+ * NOTE:
+ * If the thread is not yet associated with, or has already been dissociated
+ * from, a native thread, this method returns +nil+.
+ * If the Ruby implementation uses an M:N thread model, the ID may change
+ * depending on the timing.
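+ *
+ * For example (the value is OS-dependent; shown here for illustration only):
+ *
+ *    Thread.current.native_thread_id   #=> 22446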
+ */
+
+static VALUE
+rb_thread_native_thread_id(VALUE thread)
+{
+ rb_thread_t *target_th = rb_thread_ptr(thread);
+ if (rb_threadptr_dead(target_th)) return Qnil;
+ return native_thread_native_thread_id(target_th);
+}
+#else
+# define rb_thread_native_thread_id rb_f_notimplement
+#endif
+
+/*
+ * call-seq:
+ * thr.to_s -> string
*
* Dump the name, id, and status of _thr_ to a string.
*/
static VALUE
-rb_thread_inspect(VALUE thread)
+rb_thread_to_s(VALUE thread)
{
- const char *cname = rb_obj_classname(thread);
- rb_thread_t *th;
+ VALUE cname = rb_class_path(rb_obj_class(thread));
+ rb_thread_t *target_th = rb_thread_ptr(thread);
const char *status;
- VALUE str;
+ VALUE str, loc;
- GetThreadPtr(thread, th);
- status = thread_status_name(th->status);
- str = rb_sprintf("#<%s:%p %s>", cname, (void *)thread, status);
- OBJ_INFECT(str, thread);
+ status = thread_status_name(target_th, TRUE);
+ str = rb_sprintf("#<%"PRIsVALUE":%p", cname, (void *)thread);
+ if (!NIL_P(target_th->name)) {
+ rb_str_catf(str, "@%"PRIsVALUE, target_th->name);
+ }
+ if ((loc = threadptr_invoke_proc_location(target_th)) != Qnil) {
+ rb_str_catf(str, " %"PRIsVALUE":%"PRIsVALUE,
+ RARRAY_AREF(loc, 0), RARRAY_AREF(loc, 1));
+ }
+ rb_str_catf(str, " %s>", status);
return str;
}
-VALUE
-rb_thread_local_aref(VALUE thread, ID id)
-{
- rb_thread_t *th;
- VALUE val;
+/* variables for recursive traversals */
+#define recursive_key id__recursive_key__
- GetThreadPtr(thread, th);
- if (rb_safe_level() >= 4 && th != GET_THREAD()) {
- rb_raise(rb_eSecurityError, "Insecure: thread locals");
- }
- if (!th->local_storage) {
- return Qnil;
+static VALUE
+threadptr_local_aref(rb_thread_t *th, ID id)
+{
+ if (id == recursive_key) {
+ return th->ec->local_storage_recursive_hash;
}
- if (st_lookup(th->local_storage, id, &val)) {
- return val;
+ else {
+ VALUE val;
+ struct rb_id_table *local_storage = th->ec->local_storage;
+
+ if (local_storage != NULL && rb_id_table_lookup(local_storage, id, &val)) {
+ return val;
+ }
+ else {
+ return Qnil;
+ }
}
- return Qnil;
+}
+
+VALUE
+rb_thread_local_aref(VALUE thread, ID id)
+{
+ return threadptr_local_aref(rb_thread_ptr(thread), id);
}
/*
* call-seq:
- * thr[sym] => obj or nil
+ * thr[sym] -> obj or nil
+ *
+ * Attribute Reference---Returns the value of a fiber-local variable (current thread's root fiber
+ * if not explicitly inside a Fiber), using either a symbol or a string name.
+ * If the specified variable does not exist, returns +nil+.
+ *
+ * [
+ * Thread.new { Thread.current["name"] = "A" },
+ * Thread.new { Thread.current[:name] = "B" },
+ * Thread.new { Thread.current["name"] = "C" }
+ * ].each do |th|
+ * th.join
+ * puts "#{th.inspect}: #{th[:name]}"
+ * end
*
- * Attribute Reference---Returns the value of a thread-local variable, using
- * either a symbol or a string name. If the specified variable does not exist,
- * returns <code>nil</code>.
+ * This will produce:
+ *
+ * #<Thread:0x00000002a54220 dead>: A
+ * #<Thread:0x00000002a541a8 dead>: B
+ * #<Thread:0x00000002a54130 dead>: C
+ *
+ * Thread#[] and Thread#[]= are not thread-local but fiber-local.
+ * This confusion did not exist in Ruby 1.8 because fibers were only
+ * introduced in Ruby 1.9.
+ * Ruby 1.9 made these methods fiber-local in order to preserve the
+ * following idiom for dynamic scope.
+ *
+ * def meth(newvalue)
+ * begin
+ * oldvalue = Thread.current[:name]
+ * Thread.current[:name] = newvalue
+ * yield
+ * ensure
+ * Thread.current[:name] = oldvalue
+ * end
+ * end
*
- * a = Thread.new { Thread.current["name"] = "A"; Thread.stop }
- * b = Thread.new { Thread.current[:name] = "B"; Thread.stop }
- * c = Thread.new { Thread.current["name"] = "C"; Thread.stop }
- * Thread.list.each {|x| puts "#{x.inspect}: #{x[:name]}" }
+ * The idiom would not work as a dynamic scope if the methods were
+ * thread-local and a given block switched fibers.
*
- * <em>produces:</em>
+ * f = Fiber.new {
+ * meth(1) {
+ * Fiber.yield
+ * }
+ * }
+ * meth(2) {
+ * f.resume
+ * }
+ * f.resume
+ * p Thread.current[:name]
+ * #=> nil if fiber-local
+ *   #=> 2 if thread-local (the value 2 leaks outside of the meth method)
+ *
+ * For thread-local variables, please see #thread_variable_get and
+ * #thread_variable_set.
*
- * #<Thread:0x401b3b3c sleep>: C
- * #<Thread:0x401b3bc8 sleep>: B
- * #<Thread:0x401b3c68 sleep>: A
- * #<Thread:0x401bdf4c run>:
*/
static VALUE
-rb_thread_aref(VALUE thread, VALUE id)
+rb_thread_aref(VALUE thread, VALUE key)
{
- return rb_thread_local_aref(thread, rb_to_id(id));
+ ID id = rb_check_id(&key);
+ if (!id) return Qnil;
+ return rb_thread_local_aref(thread, id);
}
-VALUE
-rb_thread_local_aset(VALUE thread, ID id, VALUE val)
+/*
+ * call-seq:
+ * thr.fetch(sym) -> obj
+ * thr.fetch(sym) { } -> obj
+ * thr.fetch(sym, default) -> obj
+ *
+ * Returns a fiber-local for the given key. If the key can't be
+ * found, there are several options: With no other arguments, it will
+ * raise a KeyError exception; if +default+ is given, then that
+ * will be returned; if the optional code block is specified, then
+ * that will be run and its result returned. See Thread#[] and
+ * Hash#fetch.
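+ *
+ * For example:
+ *
+ *    Thread.current[:name] = "A"
+ *    Thread.current.fetch(:name)               #=> "A"
+ *    Thread.current.fetch(:other, "B")         #=> "B"
+ *    Thread.current.fetch(:other) {|key| key } #=> :other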
+ */
+static VALUE
+rb_thread_fetch(int argc, VALUE *argv, VALUE self)
{
- rb_thread_t *th;
- GetThreadPtr(thread, th);
+ VALUE key, val;
+ ID id;
+ rb_thread_t *target_th = rb_thread_ptr(self);
+ int block_given;
- if (rb_safe_level() >= 4 && th != GET_THREAD()) {
- rb_raise(rb_eSecurityError, "Insecure: can't modify thread locals");
+ rb_check_arity(argc, 1, 2);
+ key = argv[0];
+
+ block_given = rb_block_given_p();
+ if (block_given && argc == 2) {
+ rb_warn("block supersedes default value argument");
}
- if (OBJ_FROZEN(thread)) {
- rb_error_frozen("thread locals");
+
+ id = rb_check_id(&key);
+
+ if (id == recursive_key) {
+ return target_th->ec->local_storage_recursive_hash;
}
- if (!th->local_storage) {
- th->local_storage = st_init_numtable();
+ else if (id && target_th->ec->local_storage &&
+ rb_id_table_lookup(target_th->ec->local_storage, id, &val)) {
+ return val;
}
- if (NIL_P(val)) {
- st_delete_wrap(th->local_storage, id);
- return Qnil;
+ else if (block_given) {
+ return rb_yield(key);
}
- st_insert(th->local_storage, id, val);
- return val;
+ else if (argc == 1) {
+ rb_key_err_raise(rb_sprintf("key not found: %+"PRIsVALUE, key), self, key);
+ }
+ else {
+ return argv[1];
+ }
+}
+
+static VALUE
+threadptr_local_aset(rb_thread_t *th, ID id, VALUE val)
+{
+ if (id == recursive_key) {
+ th->ec->local_storage_recursive_hash = val;
+ return val;
+ }
+ else {
+ struct rb_id_table *local_storage = th->ec->local_storage;
+
+ if (NIL_P(val)) {
+ if (!local_storage) return Qnil;
+ rb_id_table_delete(local_storage, id);
+ return Qnil;
+ }
+ else {
+ if (local_storage == NULL) {
+ th->ec->local_storage = local_storage = rb_id_table_create(0);
+ }
+ rb_id_table_insert(local_storage, id, val);
+ return val;
+ }
+ }
+}
+
+VALUE
+rb_thread_local_aset(VALUE thread, ID id, VALUE val)
+{
+ if (OBJ_FROZEN(thread)) {
+ rb_frozen_error_raise(thread, "can't modify frozen thread locals");
+ }
+
+ return threadptr_local_aset(rb_thread_ptr(thread), id, val);
}
/*
* call-seq:
- * thr[sym] = obj => obj
+ * thr[sym] = obj -> obj
*
- * Attribute Assignment---Sets or creates the value of a thread-local variable,
- * using either a symbol or a string. See also <code>Thread#[]</code>.
+ * Attribute Assignment---Sets or creates the value of a fiber-local variable,
+ * using either a symbol or a string.
+ *
+ * See also Thread#[].
+ *
+ * For thread-local variables, please see #thread_variable_set and
+ * #thread_variable_get.
*/
static VALUE
-rb_thread_aset(VALUE self, ID id, VALUE val)
+rb_thread_aset(VALUE self, VALUE id, VALUE val)
{
return rb_thread_local_aset(self, rb_to_id(id), val);
}
/*
* call-seq:
- * thr.key?(sym) => true or false
+ * thr.thread_variable_get(key) -> obj or nil
+ *
+ * Returns the value of a thread local variable that has been set. Note that
+ * these are different from fiber local values. For fiber local values,
+ * please see Thread#[] and Thread#[]=.
+ *
+ * Thread local values are carried along with threads, and do not respect
+ * fibers. For example:
+ *
+ * Thread.new {
+ * Thread.current.thread_variable_set("foo", "bar") # set a thread local
+ * Thread.current["foo"] = "bar" # set a fiber local
+ *
+ * Fiber.new {
+ * Fiber.yield [
+ * Thread.current.thread_variable_get("foo"), # get the thread local
+ * Thread.current["foo"], # get the fiber local
+ * ]
+ * }.resume
+ * }.join.value # => ['bar', nil]
+ *
+ * The value "bar" is returned for the thread local, where nil is returned
+ * for the fiber local. The fiber is executed in the same thread, so the
+ * thread local values are available.
+ */
+
+static VALUE
+rb_thread_variable_get(VALUE thread, VALUE key)
+{
+ VALUE locals;
+ VALUE symbol = rb_to_symbol(key);
+
+ if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) {
+ return Qnil;
+ }
+ locals = rb_thread_local_storage(thread);
+ return rb_hash_aref(locals, symbol);
+}
+
+/*
+ * call-seq:
+ * thr.thread_variable_set(key, value)
+ *
+ * Sets a thread local with +key+ to +value+. Note that these are local to
+ * threads, and not to fibers. Please see Thread#thread_variable_get and
+ * Thread#[] for more information.
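+ *
+ * For example:
+ *
+ *    Thread.current.thread_variable_set(:debug, true)   #=> true
+ *    Thread.current.thread_variable_get(:debug)         #=> true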
+ */
+
+static VALUE
+rb_thread_variable_set(VALUE thread, VALUE key, VALUE val)
+{
+ VALUE locals;
+
+ if (OBJ_FROZEN(thread)) {
+ rb_frozen_error_raise(thread, "can't modify frozen thread locals");
+ }
+
+ locals = rb_thread_local_storage(thread);
+ return rb_hash_aset(locals, rb_to_symbol(key), val);
+}
+
+/*
+ * call-seq:
+ * thr.key?(sym) -> true or false
*
- * Returns <code>true</code> if the given string (or symbol) exists as a
- * thread-local variable.
+ * Returns +true+ if the given string (or symbol) exists as a fiber-local
+ * variable.
*
* me = Thread.current
* me[:oliver] = "a"
@@ -2050,49 +4020,35 @@ rb_thread_aset(VALUE self, ID id, VALUE val)
static VALUE
rb_thread_key_p(VALUE self, VALUE key)
{
- rb_thread_t *th;
- ID id = rb_to_id(key);
-
- GetThreadPtr(self, th);
+ VALUE val;
+ ID id = rb_check_id(&key);
+ struct rb_id_table *local_storage = rb_thread_ptr(self)->ec->local_storage;
- if (!th->local_storage) {
- return Qfalse;
- }
- if (st_lookup(th->local_storage, id, 0)) {
- return Qtrue;
+ if (!id || local_storage == NULL) {
+ return Qfalse;
}
- return Qfalse;
+ return RBOOL(rb_id_table_lookup(local_storage, id, &val));
}
-static int
-thread_keys_i(ID key, VALUE value, VALUE ary)
-{
- rb_ary_push(ary, ID2SYM(key));
- return ST_CONTINUE;
-}
-
-static int
-vm_living_thread_num(rb_vm_t *vm)
+static enum rb_id_table_iterator_result
+thread_keys_i(ID key, VALUE value, void *ary)
{
- return vm->living_threads->num_entries;
+ rb_ary_push((VALUE)ary, ID2SYM(key));
+ return ID_TABLE_CONTINUE;
}
int
rb_thread_alone(void)
{
- int num = 1;
- if (GET_THREAD()->vm->living_threads) {
- num = vm_living_thread_num(GET_THREAD()->vm);
- thread_debug("rb_thread_alone: %d\n", num);
- }
- return num == 1;
+ // TODO
+ return rb_ractor_living_thread_num(GET_RACTOR()) == 1;
}
/*
* call-seq:
- * thr.keys => array
+ * thr.keys -> array
*
- * Returns an an array of the names of the thread-local variables (as Symbols).
+ * Returns an array of the names of the fiber-local variables (as Symbols).
*
* thr = Thread.new do
* Thread.current[:cat] = 'meow'
@@ -2105,24 +4061,96 @@ rb_thread_alone(void)
static VALUE
rb_thread_keys(VALUE self)
{
- rb_thread_t *th;
+ struct rb_id_table *local_storage = rb_thread_ptr(self)->ec->local_storage;
VALUE ary = rb_ary_new();
- GetThreadPtr(self, th);
- if (th->local_storage) {
- st_foreach(th->local_storage, thread_keys_i, ary);
+ if (local_storage) {
+ rb_id_table_foreach(local_storage, thread_keys_i, (void *)ary);
}
return ary;
}
+static int
+keys_i(VALUE key, VALUE value, VALUE ary)
+{
+ rb_ary_push(ary, key);
+ return ST_CONTINUE;
+}
+
/*
* call-seq:
- * thr.priority => integer
+ * thr.thread_variables -> array
+ *
+ * Returns an array of the names of the thread-local variables (as Symbols).
+ *
+ * thr = Thread.new do
+ * Thread.current.thread_variable_set(:cat, 'meow')
+ * Thread.current.thread_variable_set("dog", 'woof')
+ * end
+ * thr.join #=> #<Thread:0x401b3f10 dead>
+ * thr.thread_variables #=> [:dog, :cat]
+ *
+ * Note that these are not fiber local variables. Please see Thread#[] and
+ * Thread#thread_variable_get for more details.
+ */
+
+static VALUE
+rb_thread_variables(VALUE thread)
+{
+ VALUE locals;
+ VALUE ary;
+
+ ary = rb_ary_new();
+ if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) {
+ return ary;
+ }
+ locals = rb_thread_local_storage(thread);
+ rb_hash_foreach(locals, keys_i, ary);
+
+ return ary;
+}
+
+/*
+ * call-seq:
+ * thr.thread_variable?(key) -> true or false
+ *
+ * Returns +true+ if the given string (or symbol) exists as a thread-local
+ * variable.
+ *
+ * me = Thread.current
+ * me.thread_variable_set(:oliver, "a")
+ * me.thread_variable?(:oliver) #=> true
+ * me.thread_variable?(:stanley) #=> false
+ *
+ * Note that these are not fiber local variables. Please see Thread#[] and
+ * Thread#thread_variable_get for more details.
+ */
+
+static VALUE
+rb_thread_variable_p(VALUE thread, VALUE key)
+{
+ VALUE locals;
+ VALUE symbol = rb_to_symbol(key);
+
+ if (LIKELY(!THREAD_LOCAL_STORAGE_INITIALISED_P(thread))) {
+ return Qfalse;
+ }
+ locals = rb_thread_local_storage(thread);
+
+ return RBOOL(rb_hash_lookup(locals, symbol) != Qnil);
+}
+
+/*
+ * call-seq:
+ * thr.priority -> integer
*
* Returns the priority of <i>thr</i>. Default is inherited from the
 * current thread that created the new thread, or zero for the
- * initial main thread; higher-priority threads will run before
- * lower-priority threads.
+ * initial main thread; higher-priority threads will run more frequently
+ * than lower-priority threads (but lower-priority threads can also run).
+ *
+ * This is just a hint for the Ruby thread scheduler. It may be ignored on
+ * some platforms.
*
* Thread.current.priority #=> 0
*/
@@ -2130,18 +4158,20 @@ rb_thread_keys(VALUE self)
static VALUE
rb_thread_priority(VALUE thread)
{
- rb_thread_t *th;
- GetThreadPtr(thread, th);
- return INT2NUM(th->priority);
+ return INT2NUM(rb_thread_ptr(thread)->priority);
}
/*
* call-seq:
- * thr.priority= integer => thr
+ * thr.priority= integer -> thr
*
* Sets the priority of <i>thr</i> to <i>integer</i>. Higher-priority threads
- * will run before lower-priority threads.
+ * will run more frequently than lower-priority threads (but lower-priority
+ * threads can also run).
+ *
+ * This is just a hint for the Ruby thread scheduler. It may be ignored on
+ * some platforms.
*
* count1 = count2 = 0
* a = Thread.new do
@@ -2161,27 +4191,23 @@ rb_thread_priority(VALUE thread)
static VALUE
rb_thread_priority_set(VALUE thread, VALUE prio)
{
- rb_thread_t *th;
+ rb_thread_t *target_th = rb_thread_ptr(thread);
int priority;
- GetThreadPtr(thread, th);
-
- rb_secure(4);
#if USE_NATIVE_THREAD_PRIORITY
- th->priority = NUM2INT(prio);
+ target_th->priority = NUM2INT(prio);
- native_thread_apply_priority(th);
+ native_thread_apply_priority(target_th);
#else
priority = NUM2INT(prio);
if (priority > RUBY_THREAD_PRIORITY_MAX) {
- priority = RUBY_THREAD_PRIORITY_MAX;
+ priority = RUBY_THREAD_PRIORITY_MAX;
}
else if (priority < RUBY_THREAD_PRIORITY_MIN) {
- priority = RUBY_THREAD_PRIORITY_MIN;
+ priority = RUBY_THREAD_PRIORITY_MIN;
}
- th->priority = priority;
- th->slice = priority;
+ target_th->priority = (int8_t)priority;
#endif
- return INT2NUM(th->priority);
+ return INT2NUM(target_th->priority);
}
/* for IO */
@@ -2200,9 +4226,11 @@ rb_thread_priority_set(VALUE thread, VALUE prio)
* - OpenBSD 2.0 (src/sys/kern/sys_generic.c:1.4)
* select(2) documents how to allocate fd_set dynamically.
* http://www.openbsd.org/cgi-bin/man.cgi?query=select&manpath=OpenBSD+4.4
- * - HP-UX documents how to allocate fd_set dynamically.
- * http://docs.hp.com/en/B2355-60105/select.2.html
* - Solaris 8 has select_large_fdset
+ * - Mac OS X 10.7 (Lion)
+ *   select(2) returns EINVAL if nfds is greater than FD_SETSIZE and
+ * _DARWIN_UNLIMITED_SELECT (or _DARWIN_C_SOURCE) isn't defined.
+ * https://developer.apple.com/library/archive/releasenotes/Darwin/SymbolVariantsRelNotes/index.html
*
* When fd_set is not big enough to hold big file descriptors,
* it should be allocated dynamically.
@@ -2216,7 +4244,7 @@ rb_thread_priority_set(VALUE thread, VALUE prio)
*/
void
-rb_fd_init(volatile rb_fdset_t *fds)
+rb_fd_init(rb_fdset_t *fds)
{
fds->maxfd = 0;
fds->fdset = ALLOC(fd_set);
@@ -2224,9 +4252,21 @@ rb_fd_init(volatile rb_fdset_t *fds)
}
void
+rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src)
+{
+ size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask);
+
+ if (size < sizeof(fd_set))
+ size = sizeof(fd_set);
+ dst->maxfd = src->maxfd;
+ dst->fdset = xmalloc(size);
+ memcpy(dst->fdset, src->fdset, size);
+}
+
+void
rb_fd_term(rb_fdset_t *fds)
{
- if (fds->fdset) xfree(fds->fdset);
+ xfree(fds->fdset);
fds->maxfd = 0;
fds->fdset = 0;
}
@@ -2234,10 +4274,8 @@ rb_fd_term(rb_fdset_t *fds)
void
rb_fd_zero(rb_fdset_t *fds)
{
- if (fds->fdset) {
- MEMZERO(fds->fdset, fd_mask, howmany(fds->maxfd, NFDBITS));
- FD_ZERO(fds->fdset);
- }
+ if (fds->fdset)
+ MEMZERO(fds->fdset, fd_mask, howmany(fds->maxfd, NFDBITS));
}
static void
@@ -2250,8 +4288,8 @@ rb_fd_resize(int n, rb_fdset_t *fds)
if (o < sizeof(fd_set)) o = sizeof(fd_set);
if (m > o) {
- fds->fdset = realloc(fds->fdset, m);
- memset((char *)fds->fdset + o, 0, m - o);
+ fds->fdset = xrealloc(fds->fdset, m);
+ memset((char *)fds->fdset + o, 0, m - o);
}
if (n >= fds->maxfd) fds->maxfd = n + 1;
}
@@ -2284,10 +4322,22 @@ rb_fd_copy(rb_fdset_t *dst, const fd_set *src, int max)
if (size < sizeof(fd_set)) size = sizeof(fd_set);
dst->maxfd = max;
- dst->fdset = realloc(dst->fdset, size);
+ dst->fdset = xrealloc(dst->fdset, size);
memcpy(dst->fdset, src, size);
}
+void
+rb_fd_dup(rb_fdset_t *dst, const rb_fdset_t *src)
+{
+ size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask);
+
+ if (size < sizeof(fd_set))
+ size = sizeof(fd_set);
+ dst->maxfd = src->maxfd;
+ dst->fdset = xrealloc(dst->fdset, size);
+ memcpy(dst->fdset, src->fdset, size);
+}
+
int
rb_fd_select(int n, rb_fdset_t *readfds, rb_fdset_t *writefds, rb_fdset_t *exceptfds, struct timeval *timeout)
{
@@ -2307,20 +4357,22 @@ rb_fd_select(int n, rb_fdset_t *readfds, rb_fdset_t *writefds, rb_fdset_t *excep
return select(n, r, w, e, timeout);
}
+#define rb_fd_no_init(fds) ((void)((fds)->fdset = 0), (void)((fds)->maxfd = 0))
+
#undef FD_ZERO
#undef FD_SET
#undef FD_CLR
#undef FD_ISSET
#define FD_ZERO(f) rb_fd_zero(f)
-#define FD_SET(i, f) rb_fd_set(i, f)
-#define FD_CLR(i, f) rb_fd_clr(i, f)
-#define FD_ISSET(i, f) rb_fd_isset(i, f)
+#define FD_SET(i, f) rb_fd_set((i), (f))
+#define FD_CLR(i, f) rb_fd_clr((i), (f))
+#define FD_ISSET(i, f) rb_fd_isset((i), (f))
#elif defined(_WIN32)
void
-rb_fd_init(volatile rb_fdset_t *set)
+rb_fd_init(rb_fdset_t *set)
{
set->capa = FD_SETSIZE;
set->fdset = ALLOC(fd_set);
@@ -2328,6 +4380,13 @@ rb_fd_init(volatile rb_fdset_t *set)
}
void
+rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src)
+{
+ rb_fd_init(dst);
+ rb_fd_dup(dst, src);
+}
+
+void
rb_fd_term(rb_fdset_t *set)
{
xfree(set->fdset);
@@ -2347,8 +4406,10 @@ rb_fd_set(int fd, rb_fdset_t *set)
}
}
if (set->fdset->fd_count >= (unsigned)set->capa) {
- set->capa = (set->fdset->fd_count / FD_SETSIZE + 1) * FD_SETSIZE;
- set->fdset = xrealloc(set->fdset, sizeof(unsigned int) + sizeof(SOCKET) * set->capa);
+ set->capa = (set->fdset->fd_count / FD_SETSIZE + 1) * FD_SETSIZE;
+ set->fdset =
+ rb_xrealloc_mul_add(
+ set->fdset, set->capa, sizeof(SOCKET), sizeof(unsigned int));
}
set->fdset->fd_array[set->fdset->fd_count++] = s;
}
@@ -2359,240 +4420,380 @@ rb_fd_set(int fd, rb_fdset_t *set)
#undef FD_ISSET
#define FD_ZERO(f) rb_fd_zero(f)
-#define FD_SET(i, f) rb_fd_set(i, f)
-#define FD_CLR(i, f) rb_fd_clr(i, f)
-#define FD_ISSET(i, f) rb_fd_isset(i, f)
+#define FD_SET(i, f) rb_fd_set((i), (f))
+#define FD_CLR(i, f) rb_fd_clr((i), (f))
+#define FD_ISSET(i, f) rb_fd_isset((i), (f))
+#define rb_fd_no_init(fds) (void)((fds)->fdset = 0)
+
+#endif
+
+#ifndef rb_fd_no_init
+#define rb_fd_no_init(fds) (void)(fds)
#endif
-#if defined(__CYGWIN__) || defined(_WIN32)
-static long
-cmp_tv(const struct timeval *a, const struct timeval *b)
+static int
+wait_retryable(volatile int *result, int errnum, rb_hrtime_t *rel, rb_hrtime_t end)
{
- long d = (a->tv_sec - b->tv_sec);
- return (d != 0) ? d : (a->tv_usec - b->tv_usec);
+ int r = *result;
+ if (r < 0) {
+ switch (errnum) {
+ case EINTR:
+#ifdef ERESTART
+ case ERESTART:
+#endif
+ *result = 0;
+ if (rel && hrtime_update_expire(rel, end)) {
+ *rel = 0;
+ }
+ return TRUE;
+ }
+ return FALSE;
+ }
+ else if (r == 0) {
+ /* check for spurious wakeup */
+ if (rel) {
+ return !hrtime_update_expire(rel, end);
+ }
+ return TRUE;
+ }
+ return FALSE;
}
-static int
-subtract_tv(struct timeval *rest, const struct timeval *wait)
+struct select_set {
+ int max;
+ rb_thread_t *th;
+ rb_fdset_t *rset;
+ rb_fdset_t *wset;
+ rb_fdset_t *eset;
+ rb_fdset_t orig_rset;
+ rb_fdset_t orig_wset;
+ rb_fdset_t orig_eset;
+ struct timeval *timeout;
+};
+
+static VALUE
+select_set_free(VALUE p)
{
- while (rest->tv_usec < wait->tv_usec) {
- if (rest->tv_sec <= wait->tv_sec) {
- return 0;
- }
- rest->tv_sec -= 1;
- rest->tv_usec += 1000 * 1000;
+ struct select_set *set = (struct select_set *)p;
+
+ rb_fd_term(&set->orig_rset);
+ rb_fd_term(&set->orig_wset);
+ rb_fd_term(&set->orig_eset);
+
+ return Qfalse;
+}
+
+static VALUE
+do_select(VALUE p)
+{
+ struct select_set *set = (struct select_set *)p;
+ volatile int result = 0;
+ int lerrno;
+ rb_hrtime_t *to, rel, end = 0;
+
+ timeout_prepare(&to, &rel, &end, set->timeout);
+ volatile rb_hrtime_t endtime = end;
+#define restore_fdset(dst, src) \
+ ((dst) ? rb_fd_dup(dst, src) : (void)0)
+#define do_select_update() \
+ (restore_fdset(set->rset, &set->orig_rset), \
+ restore_fdset(set->wset, &set->orig_wset), \
+ restore_fdset(set->eset, &set->orig_eset), \
+ TRUE)
+
+ do {
+ lerrno = 0;
+
+ BLOCKING_REGION(set->th, {
+ struct timeval tv;
+
+ if (!RUBY_VM_INTERRUPTED(set->th->ec)) {
+ result = native_fd_select(set->max,
+ set->rset, set->wset, set->eset,
+ rb_hrtime2timeval(&tv, to), set->th);
+ if (result < 0) lerrno = errno;
+ }
+ }, ubf_select, set->th, TRUE);
+
+ RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec); /* may raise */
+ } while (wait_retryable(&result, lerrno, to, endtime) && do_select_update());
+
+ RUBY_VM_CHECK_INTS_BLOCKING(set->th->ec);
+
+ if (result < 0) {
+ errno = lerrno;
}
- rest->tv_sec -= wait->tv_sec;
- rest->tv_usec -= wait->tv_usec;
- return 1;
+
+ return (VALUE)result;
}
+
+int
+rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except,
+ struct timeval *timeout)
+{
+ struct select_set set;
+
+ set.th = GET_THREAD();
+ RUBY_VM_CHECK_INTS_BLOCKING(set.th->ec);
+ set.max = max;
+ set.rset = read;
+ set.wset = write;
+ set.eset = except;
+ set.timeout = timeout;
+
+ if (!set.rset && !set.wset && !set.eset) {
+ if (!timeout) {
+ rb_thread_sleep_forever();
+ return 0;
+ }
+ rb_thread_wait_for(*timeout);
+ return 0;
+ }
+
+#define fd_init_copy(f) do { \
+ if (set.f) { \
+ rb_fd_resize(set.max - 1, set.f); \
+ if (&set.orig_##f != set.f) { /* sigwait_fd */ \
+ rb_fd_init_copy(&set.orig_##f, set.f); \
+ } \
+ } \
+ else { \
+ rb_fd_no_init(&set.orig_##f); \
+ } \
+ } while (0)
+ fd_init_copy(rset);
+ fd_init_copy(wset);
+ fd_init_copy(eset);
+#undef fd_init_copy
+
+ return (int)rb_ensure(do_select, (VALUE)&set, select_set_free, (VALUE)&set);
+}
+
+#ifdef USE_POLL
+
+/* Same as the Linux kernel. TODO: make a platform-independent definition. */
+#define POLLIN_SET (POLLRDNORM | POLLRDBAND | POLLIN | POLLHUP | POLLERR)
+#define POLLOUT_SET (POLLWRBAND | POLLWRNORM | POLLOUT | POLLERR)
+#define POLLEX_SET (POLLPRI)
+
+#ifndef POLLERR_SET /* defined for FreeBSD for now */
+# define POLLERR_SET (0)
#endif
static int
-do_select(int n, fd_set *read, fd_set *write, fd_set *except,
- struct timeval *timeout)
+wait_for_single_fd_blocking_region(rb_thread_t *th, struct pollfd *fds, nfds_t nfds,
+ rb_hrtime_t *const to, volatile int *lerrno)
{
- int result, lerrno;
- fd_set orig_read, orig_write, orig_except;
-#if defined __GNUC__ && defined __x86_64__
-#define FAKE_FD_ZERO(f) (*(int *)&(f)=0) /* suppress lots of warnings */
-#else
-#define FAKE_FD_ZERO(f) ((void)0)
-#endif
+ struct timespec ts;
+ volatile int result = 0;
-#ifndef linux
- double limit = 0;
- struct timeval wait_rest;
-# if defined(__CYGWIN__) || defined(_WIN32)
- struct timeval start_time;
-# endif
+ *lerrno = 0;
+ BLOCKING_REGION(th, {
+ if (!RUBY_VM_INTERRUPTED(th->ec)) {
+ result = ppoll(fds, nfds, rb_hrtime2timespec(&ts, to), 0);
+ if (result < 0) *lerrno = errno;
+ }
+ }, ubf_select, th, TRUE);
+ return result;
+}
- if (timeout) {
-# if defined(__CYGWIN__) || defined(_WIN32)
- gettimeofday(&start_time, NULL);
- limit = (double)start_time.tv_sec + (double)start_time.tv_usec*1e-6;
-# else
- limit = timeofday();
-# endif
- limit += (double)timeout->tv_sec+(double)timeout->tv_usec*1e-6;
- wait_rest = *timeout;
- timeout = &wait_rest;
- }
-#endif
+/*
+ * returns a mask of events
+ */
+static int
+thread_io_wait(rb_thread_t *th, struct rb_io *io, int fd, int events, struct timeval *timeout)
+{
+ struct pollfd fds[1] = {{
+ .fd = fd,
+ .events = (short)events,
+ .revents = 0,
+ }};
+ volatile int result = 0;
+ nfds_t nfds;
+ struct rb_io_blocking_operation blocking_operation;
+ enum ruby_tag_type state;
+ volatile int lerrno;
- if (read) orig_read = *read; else FAKE_FD_ZERO(orig_read);
- if (write) orig_write = *write; else FAKE_FD_ZERO(orig_write);
- if (except) orig_except = *except; else FAKE_FD_ZERO(orig_except);
+ RUBY_ASSERT(th);
+ rb_execution_context_t *ec = th->ec;
-#undef FAKE_FD_ZERO
+ if (io) {
+ blocking_operation.ec = ec;
+ rb_io_blocking_operation_enter(io, &blocking_operation);
+ }
- retry:
- lerrno = 0;
+ if (timeout == NULL && thread_io_wait_events(th, fd, events, NULL)) {
+ // fd is readable
+ state = 0;
+ fds[0].revents = events;
+ errno = 0;
+ }
+ else {
+ EC_PUSH_TAG(ec);
+ if ((state = EC_EXEC_TAG()) == TAG_NONE) {
+ rb_hrtime_t *to, rel, end = 0;
+ RUBY_VM_CHECK_INTS_BLOCKING(ec);
+ timeout_prepare(&to, &rel, &end, timeout);
+ do {
+ nfds = numberof(fds);
+ result = wait_for_single_fd_blocking_region(th, fds, nfds, to, &lerrno);
+
+ RUBY_VM_CHECK_INTS_BLOCKING(ec);
+ } while (wait_retryable(&result, lerrno, to, end));
+
+ RUBY_VM_CHECK_INTS_BLOCKING(ec);
+ }
-#if defined(__CYGWIN__) || defined(_WIN32)
- {
- int finish = 0;
- /* polling duration: 100ms */
- struct timeval wait_100ms, *wait;
- wait_100ms.tv_sec = 0;
- wait_100ms.tv_usec = 100 * 1000; /* 100 ms */
-
- do {
- wait = (timeout == 0 || cmp_tv(&wait_100ms, timeout) > 0) ? &wait_100ms : timeout;
- BLOCKING_REGION({
- do {
- result = select(n, read, write, except, wait);
- if (result < 0) lerrno = errno;
- if (result != 0) break;
-
- if (read) *read = orig_read;
- if (write) *write = orig_write;
- if (except) *except = orig_except;
- wait = &wait_100ms;
- if (timeout) {
- struct timeval elapsed;
- gettimeofday(&elapsed, NULL);
- subtract_tv(&elapsed, &start_time);
- if (!subtract_tv(timeout, &elapsed)) {
- finish = 1;
- break;
- }
- if (cmp_tv(&wait_100ms, timeout) < 0) wait = timeout;
- }
- } while (__th->interrupt_flag == 0);
- }, 0, 0);
- } while (result == 0 && !finish);
+ EC_POP_TAG();
}
-#else
- BLOCKING_REGION({
- result = select(n, read, write, except, timeout);
- if (result < 0) lerrno = errno;
- }, ubf_select, GET_THREAD());
-#endif
- errno = lerrno;
+ if (io) {
+ rb_io_blocking_operation_exit(io, &blocking_operation);
+ }
+
+ if (state) {
+ EC_JUMP_TAG(ec, state);
+ }
if (result < 0) {
- switch (errno) {
- case EINTR:
-#ifdef ERESTART
- case ERESTART:
-#endif
- if (read) *read = orig_read;
- if (write) *write = orig_write;
- if (except) *except = orig_except;
-#ifndef linux
- if (timeout) {
- double d = limit - timeofday();
-
- wait_rest.tv_sec = (unsigned int)d;
- wait_rest.tv_usec = (int)((d-(double)wait_rest.tv_sec)*1e6);
- if (wait_rest.tv_sec < 0) wait_rest.tv_sec = 0;
- if (wait_rest.tv_usec < 0) wait_rest.tv_usec = 0;
- }
-#endif
- goto retry;
- default:
- break;
- }
+ errno = lerrno;
+ return -1;
}
+
+ if (fds[0].revents & POLLNVAL) {
+ errno = EBADF;
+ return -1;
+ }
+
+ /*
+ * POLLIN and POLLOUT have different meanings from select(2)'s read/write bits.
+ * Therefore we need to fix them up.
+ */
+ result = 0;
+ if (fds[0].revents & POLLIN_SET)
+ result |= RB_WAITFD_IN;
+ if (fds[0].revents & POLLOUT_SET)
+ result |= RB_WAITFD_OUT;
+ if (fds[0].revents & POLLEX_SET)
+ result |= RB_WAITFD_PRI;
+
+ /* all requested events are ready if there is an error */
+ if (fds[0].revents & POLLERR_SET)
+ result |= events;
+
return result;
}
+#else /* ! USE_POLL - implement rb_io_poll_fd() using select() */
+struct select_args {
+ struct rb_io *io;
+ struct rb_io_blocking_operation *blocking_operation;
+
+ union {
+ int fd;
+ int error;
+ } as;
+ rb_fdset_t *read;
+ rb_fdset_t *write;
+ rb_fdset_t *except;
+ struct timeval *tv;
+};
-static void
-rb_thread_wait_fd_rw(int fd, int read)
+static VALUE
+select_single(VALUE ptr)
{
- int result = 0;
- thread_debug("rb_thread_wait_fd_rw(%d, %s)\n", fd, read ? "read" : "write");
+ struct select_args *args = (struct select_args *)ptr;
+ int r;
- if (fd < 0) {
- rb_raise(rb_eIOError, "closed stream");
+ r = rb_thread_fd_select(args->as.fd + 1,
+ args->read, args->write, args->except, args->tv);
+ if (r == -1)
+ args->as.error = errno;
+ if (r > 0) {
+ r = 0;
+ if (args->read && rb_fd_isset(args->as.fd, args->read))
+ r |= RB_WAITFD_IN;
+ if (args->write && rb_fd_isset(args->as.fd, args->write))
+ r |= RB_WAITFD_OUT;
+ if (args->except && rb_fd_isset(args->as.fd, args->except))
+ r |= RB_WAITFD_PRI;
}
- if (rb_thread_alone()) return;
- while (result <= 0) {
- rb_fdset_t set;
- rb_fd_init(&set);
- FD_SET(fd, &set);
-
- if (read) {
- result = do_select(fd + 1, rb_fd_ptr(&set), 0, 0, 0);
- }
- else {
- result = do_select(fd + 1, 0, rb_fd_ptr(&set), 0, 0);
- }
+ return (VALUE)r;
+}
- rb_fd_term(&set);
+static VALUE
+select_single_cleanup(VALUE ptr)
+{
+ struct select_args *args = (struct select_args *)ptr;
- if (result < 0) {
- rb_sys_fail(0);
- }
+ if (args->blocking_operation) {
+ rb_io_blocking_operation_exit(args->io, args->blocking_operation);
}
- thread_debug("rb_thread_wait_fd_rw(%d, %s): done\n", fd, read ? "read" : "write");
-}
+ if (args->read) rb_fd_term(args->read);
+ if (args->write) rb_fd_term(args->write);
+ if (args->except) rb_fd_term(args->except);
-void
-rb_thread_wait_fd(int fd)
-{
- rb_thread_wait_fd_rw(fd, 1);
+ return (VALUE)-1;
}
-int
-rb_thread_fd_writable(int fd)
+static rb_fdset_t *
+init_set_fd(int fd, rb_fdset_t *fds)
{
- rb_thread_wait_fd_rw(fd, 0);
- return TRUE;
+ if (fd < 0) {
+ return 0;
+ }
+ rb_fd_init(fds);
+ rb_fd_set(fd, fds);
+
+ return fds;
}
-int
-rb_thread_select(int max, fd_set * read, fd_set * write, fd_set * except,
- struct timeval *timeout)
+static int
+thread_io_wait(rb_thread_t *th, struct rb_io *io, int fd, int events, struct timeval *timeout)
{
- if (!read && !write && !except) {
- if (!timeout) {
- rb_thread_sleep_forever();
- return 0;
- }
- rb_thread_wait_for(*timeout);
- return 0;
+ rb_fdset_t rfds, wfds, efds;
+ struct select_args args;
+ VALUE ptr = (VALUE)&args;
+
+ struct rb_io_blocking_operation blocking_operation;
+ if (io) {
+ args.io = io;
+ blocking_operation.ec = th->ec;
+ rb_io_blocking_operation_enter(io, &blocking_operation);
+ args.blocking_operation = &blocking_operation;
}
else {
- return do_select(max, read, write, except, timeout);
+ args.io = NULL;
+ blocking_operation.ec = NULL;
+ args.blocking_operation = NULL;
}
-}
+ args.as.fd = fd;
+ args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
+ args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
+ args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL;
+ args.tv = timeout;
-int
-rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except,
- struct timeval *timeout)
-{
- fd_set *r = NULL, *w = NULL, *e = NULL;
+ int result = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr);
+ if (result == -1)
+ errno = args.as.error;
- if (!read && !write && !except) {
- if (!timeout) {
- rb_thread_sleep_forever();
- return 0;
- }
- rb_thread_wait_for(*timeout);
- return 0;
- }
+ return result;
+}
+#endif /* ! USE_POLL */
- if (read) {
- rb_fd_resize(max - 1, read);
- r = rb_fd_ptr(read);
- }
- if (write) {
- rb_fd_resize(max - 1, write);
- w = rb_fd_ptr(write);
- }
- if (except) {
- rb_fd_resize(max - 1, except);
- e = rb_fd_ptr(except);
- }
- return do_select(max, r, w, e, timeout);
+int
+rb_thread_wait_for_single_fd(rb_thread_t *th, int fd, int events, struct timeval *timeout)
+{
+ return thread_io_wait(th, NULL, fd, events, timeout);
}
+int
+rb_thread_io_wait(rb_thread_t *th, struct rb_io *io, int events, struct timeval * timeout)
+{
+ return thread_io_wait(th, io, io->fd, events, timeout);
+}
/*
* for GC
@@ -2603,65 +4804,91 @@ void
rb_gc_set_stack_end(VALUE **stack_end_p)
{
VALUE stack_end;
+COMPILER_WARNING_PUSH
+#if RBIMPL_COMPILER_IS(GCC)
+COMPILER_WARNING_IGNORED(-Wdangling-pointer);
+#endif
*stack_end_p = &stack_end;
+COMPILER_WARNING_POP
}
#endif
-void
-rb_gc_save_machine_context(rb_thread_t *th)
-{
- FLUSH_REGISTER_WINDOWS;
-#ifdef __ia64
- th->machine_register_stack_end = rb_ia64_bsp();
-#endif
- setjmp(th->machine_regs);
-}
-
/*
*
*/
-int rb_get_next_signal(void);
+void
+rb_threadptr_check_signal(rb_thread_t *mth)
+{
+ /* mth must be main_thread */
+ if (rb_signal_buff_size() > 0) {
+ /* wakeup main thread */
+ threadptr_trap_interrupt(mth);
+ }
+}
static void
-timer_thread_function(void *arg)
+async_bug_fd(const char *mesg, int errno_arg, int fd)
{
- rb_vm_t *vm = GET_VM(); /* TODO: fix me for Multi-VM */
- int sig;
- rb_thread_t *mth;
-
- /* for time slice */
- RUBY_VM_SET_TIMER_INTERRUPT(vm->running_thread);
-
- /* check signal */
- mth = vm->main_thread;
- if (!mth->exec_signal && (sig = rb_get_next_signal()) > 0) {
- enum rb_thread_status prev_status = mth->status;
- thread_debug("main_thread: %s, sig: %d\n",
- thread_status_name(prev_status), sig);
- mth->exec_signal = sig;
- if (mth->status != THREAD_KILLED) mth->status = THREAD_RUNNABLE;
- rb_threadptr_interrupt(mth);
- mth->status = prev_status;
+ char buff[64];
+ size_t n = strlcpy(buff, mesg, sizeof(buff));
+ if (n < sizeof(buff)-3) {
+ ruby_snprintf(buff+n, sizeof(buff)-n, "(%d)", fd);
}
+ rb_async_bug_errno(buff, errno_arg);
+}
-#if 0
- /* prove profiler */
- if (vm->prove_profile.enable) {
- rb_thread_t *th = vm->running_thread;
+/* VM-dependent API is not available for this function */
+static int
+consume_communication_pipe(int fd)
+{
+#if USE_EVENTFD
+ uint64_t buff[1];
+#else
+ /* the buffer can be shared because nothing else refers to it. */
+ static char buff[1024];
+#endif
+ ssize_t result;
+ int ret = FALSE; /* for rb_sigwait_sleep */
- if (vm->during_gc) {
- /* GC prove profiling */
- }
- }
+ while (1) {
+ result = read(fd, buff, sizeof(buff));
+#if USE_EVENTFD
+ RUBY_DEBUG_LOG("resultf:%d buff:%lu", (int)result, (unsigned long)buff[0]);
+#else
+ RUBY_DEBUG_LOG("result:%d", (int)result);
#endif
+ if (result > 0) {
+ ret = TRUE;
+ if (USE_EVENTFD || result < (ssize_t)sizeof(buff)) {
+ return ret;
+ }
+ }
+ else if (result == 0) {
+ return ret;
+ }
+ else if (result < 0) {
+ int e = errno;
+ switch (e) {
+ case EINTR:
+ continue; /* retry */
+ case EAGAIN:
+#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+ return ret;
+ default:
+ async_bug_fd("consume_communication_pipe: read", e, fd);
+ }
+ }
+ }
}
void
rb_thread_stop_timer_thread(void)
{
- if (timer_thread_id && native_stop_timer_thread()) {
- native_reset_timer_thread();
+ if (TIMER_THREAD_CREATED_P() && native_stop_timer_thread()) {
+ native_reset_timer_thread();
}
}
@@ -2682,113 +4909,176 @@ static int
clear_coverage_i(st_data_t key, st_data_t val, st_data_t dummy)
{
int i;
- VALUE lines = (VALUE)val;
+ VALUE coverage = (VALUE)val;
+ VALUE lines = RARRAY_AREF(coverage, COVERAGE_INDEX_LINES);
+ VALUE branches = RARRAY_AREF(coverage, COVERAGE_INDEX_BRANCHES);
- for (i = 0; i < RARRAY_LEN(lines); i++) {
- if (RARRAY_PTR(lines)[i] != Qnil) {
- RARRAY_PTR(lines)[i] = INT2FIX(0);
- }
+ if (lines) {
+ if (GET_VM()->coverage_mode & COVERAGE_TARGET_ONESHOT_LINES) {
+ rb_ary_clear(lines);
+ }
+ else {
+ int i;
+ for (i = 0; i < RARRAY_LEN(lines); i++) {
+ if (RARRAY_AREF(lines, i) != Qnil)
+ RARRAY_ASET(lines, i, INT2FIX(0));
+ }
+ }
+ }
+ if (branches) {
+ VALUE counters = RARRAY_AREF(branches, 1);
+ for (i = 0; i < RARRAY_LEN(counters); i++) {
+ RARRAY_ASET(counters, i, INT2FIX(0));
+ }
}
+
return ST_CONTINUE;
}
-static void
-clear_coverage(void)
+void
+rb_clear_coverages(void)
{
- extern VALUE rb_get_coverages(void);
VALUE coverages = rb_get_coverages();
if (RTEST(coverages)) {
- st_foreach(RHASH_TBL(coverages), clear_coverage_i, 0);
+ rb_hash_foreach(coverages, clear_coverage_i, 0);
}
}
+#if defined(HAVE_WORKING_FORK)
+
static void
-rb_thread_atfork_internal(int (*atfork)(st_data_t, st_data_t, st_data_t))
+rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const rb_thread_t *))
{
- rb_thread_t *th = GET_THREAD();
+ rb_thread_t *i = 0;
rb_vm_t *vm = th->vm;
- VALUE thval = th->self;
- vm->main_thread = th;
+ rb_ractor_t *r = th->ractor;
+ vm->ractor.main_ractor = r;
+ vm->ractor.main_thread = th;
+ r->threads.main = th;
+ r->status_ = ractor_created;
+
+ thread_sched_atfork(TH_SCHED(th));
+ ubf_list_atfork();
+ rb_signal_atfork();
+
+ // OK. Only this thread accesses the following lists now:
+ ccan_list_for_each(&vm->ractor.set, r, vmlr_node) {
+ if (r != vm->ractor.main_ractor) {
+ rb_ractor_terminate_atfork(vm, r);
+ }
+ ccan_list_for_each(&r->threads.set, i, lt_node) {
+ atfork(i, th);
+ }
+ }
+ rb_vm_living_threads_init(vm);
+
+ rb_ractor_atfork(vm, th);
+ rb_vm_postponed_job_atfork();
+
+ /* may be held by any thread in parent */
+ rb_native_mutex_initialize(&th->interrupt_lock);
+ ccan_list_head_init(&th->interrupt_exec_tasks);
- native_mutex_reinitialize_atfork(&th->vm->global_vm_lock);
- st_foreach(vm->living_threads, atfork, (st_data_t)th);
- st_clear(vm->living_threads);
- st_insert(vm->living_threads, thval, (st_data_t)th->thread_id);
- vm->sleeper = 0;
- clear_coverage();
+ vm->fork_gen++;
+ rb_ractor_sleeper_threads_clear(th->ractor);
+ rb_clear_coverages();
+
+ // restart the timer thread (timer threads access `vm->waitpid_lock` and so on).
+ rb_thread_reset_timer_thread();
+ rb_thread_start_timer_thread();
+
+ VM_ASSERT(vm->ractor.blocking_cnt == 0);
+ VM_ASSERT(vm->ractor.cnt == 1);
}
-static int
-terminate_atfork_i(st_data_t key, st_data_t val, st_data_t current_th)
+static void
+terminate_atfork_i(rb_thread_t *th, const rb_thread_t *current_th)
{
- VALUE thval = key;
- rb_thread_t *th;
- GetThreadPtr(thval, th);
+ if (th != current_th) {
+ // Clear the scheduler as it is no longer operational:
+ th->scheduler = Qnil;
- if (th != (rb_thread_t *)current_th) {
- if (th->keeping_mutexes) {
- rb_mutex_abandon_all(th->keeping_mutexes);
- }
- th->keeping_mutexes = NULL;
- thread_cleanup_func(th);
+ rb_native_mutex_initialize(&th->interrupt_lock);
+ rb_mutex_abandon_keeping_mutexes(th);
+ rb_mutex_abandon_locking_mutex(th);
+ thread_cleanup_func(th, TRUE);
}
- return ST_CONTINUE;
}
+void rb_fiber_atfork(rb_thread_t *);
void
rb_thread_atfork(void)
{
- rb_thread_atfork_internal(terminate_atfork_i);
+ rb_thread_t *th = GET_THREAD();
+ rb_threadptr_pending_interrupt_clear(th);
+ rb_thread_atfork_internal(th, terminate_atfork_i);
+ th->join_list = NULL;
+ th->scheduler = Qnil;
+ rb_fiber_atfork(th);
+
+ /* We don't want to reproduce CVE-2003-0900. */
rb_reset_random_seed();
}
-static int
-terminate_atfork_before_exec_i(st_data_t key, st_data_t val, st_data_t current_th)
+static void
+terminate_atfork_before_exec_i(rb_thread_t *th, const rb_thread_t *current_th)
{
- VALUE thval = key;
- rb_thread_t *th;
- GetThreadPtr(thval, th);
-
- if (th != (rb_thread_t *)current_th) {
- thread_cleanup_func_before_exec(th);
+ if (th != current_th) {
+ thread_cleanup_func_before_exec(th);
}
- return ST_CONTINUE;
}
void
rb_thread_atfork_before_exec(void)
{
- rb_thread_atfork_internal(terminate_atfork_before_exec_i);
+ rb_thread_t *th = GET_THREAD();
+ rb_thread_atfork_internal(th, terminate_atfork_before_exec_i);
+}
+#else
+void
+rb_thread_atfork(void)
+{
+}
+
+void
+rb_thread_atfork_before_exec(void)
+{
}
+#endif
struct thgroup {
int enclosed;
- VALUE group;
};
-static size_t
-thgroup_memsize(const void *ptr)
-{
- return ptr ? sizeof(struct thgroup) : 0;
-}
-
static const rb_data_type_t thgroup_data_type = {
"thgroup",
- NULL, RUBY_TYPED_DEFAULT_FREE, thgroup_memsize,
+ {
+ 0,
+ RUBY_TYPED_DEFAULT_FREE,
+ NULL, // No external memory to report
+ },
+ 0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_EMBEDDABLE
};
/*
* Document-class: ThreadGroup
*
- * <code>ThreadGroup</code> provides a means of keeping track of a number of
- * threads as a group. A <code>Thread</code> can belong to only one
- * <code>ThreadGroup</code> at a time; adding a thread to a new group will
- * remove it from any previous group.
+ * ThreadGroup provides a means of keeping track of a number of threads as a
+ * group.
+ *
+ * A given Thread object can only belong to one ThreadGroup at a time; adding
+ * a thread to a new group will remove it from any previous group.
*
* Newly created threads belong to the same group as the thread from which they
* were created.
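+ *
+ * For example (assuming the main thread has not been moved to another
+ * group):
+ *
+ *    ThreadGroup::Default.list.include?(Thread.main)   #=> true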
*/
+/*
+ * Document-const: Default
+ *
+ * The default ThreadGroup created when Ruby starts; all Threads belong to it
+ * by default.
+ */
static VALUE
thgroup_s_alloc(VALUE klass)
{
@@ -2797,37 +5087,15 @@ thgroup_s_alloc(VALUE klass)
group = TypedData_Make_Struct(klass, struct thgroup, &thgroup_data_type, data);
data->enclosed = 0;
- data->group = group;
return group;
}
-struct thgroup_list_params {
- VALUE ary;
- VALUE group;
-};
-
-static int
-thgroup_list_i(st_data_t key, st_data_t val, st_data_t data)
-{
- VALUE thread = (VALUE)key;
- VALUE ary = ((struct thgroup_list_params *)data)->ary;
- VALUE group = ((struct thgroup_list_params *)data)->group;
- rb_thread_t *th;
- GetThreadPtr(thread, th);
-
- if (th->thgroup == group) {
- rb_ary_push(ary, thread);
- }
- return ST_CONTINUE;
-}
-
/*
* call-seq:
- * thgrp.list => array
+ * thgrp.list -> array
*
- * Returns an array of all existing <code>Thread</code> objects that belong to
- * this group.
+ * Returns an array of all existing Thread objects that belong to this group.
*
* ThreadGroup::Default.list #=> [#<Thread:0x401bdf4c run>]
*/
@@ -2836,31 +5104,32 @@ static VALUE
thgroup_list(VALUE group)
{
VALUE ary = rb_ary_new();
- struct thgroup_list_params param;
+ rb_thread_t *th = 0;
+ rb_ractor_t *r = GET_RACTOR();
- param.ary = ary;
- param.group = group;
- st_foreach(GET_THREAD()->vm->living_threads, thgroup_list_i, (st_data_t) & param);
+ ccan_list_for_each(&r->threads.set, th, lt_node) {
+ if (th->thgroup == group) {
+ rb_ary_push(ary, th->self);
+ }
+ }
return ary;
}
/*
* call-seq:
- * thgrp.enclose => thgrp
+ * thgrp.enclose -> thgrp
*
* Prevents threads from being added to or removed from the receiving
- * <code>ThreadGroup</code>. New threads can still be started in an enclosed
- * <code>ThreadGroup</code>.
+ * ThreadGroup.
+ *
+ * New threads can still be started in an enclosed ThreadGroup.
*
* ThreadGroup::Default.enclose #=> #<ThreadGroup:0x4029d914>
- * thr = Thread::new { Thread.stop } #=> #<Thread:0x402a7210 sleep>
- * tg = ThreadGroup::new #=> #<ThreadGroup:0x402752d4>
+ * thr = Thread.new { Thread.stop } #=> #<Thread:0x402a7210 sleep>
+ * tg = ThreadGroup.new #=> #<ThreadGroup:0x402752d4>
* tg.add thr
- *
- * <em>produces:</em>
- *
- * ThreadError: can't move from the enclosed thread group
+ * #=> ThreadError: can't move from the enclosed thread group
*/
static VALUE
@@ -2877,10 +5146,9 @@ thgroup_enclose(VALUE group)
/*
* call-seq:
- * thgrp.enclosed? => true or false
+ * thgrp.enclosed? -> true or false
*
- * Returns <code>true</code> if <em>thgrp</em> is enclosed. See also
- * ThreadGroup#enclose.
+ * Returns +true+ if this +thgrp+ is enclosed. See also ThreadGroup#enclose.
*/
static VALUE
@@ -2889,18 +5157,16 @@ thgroup_enclosed_p(VALUE group)
struct thgroup *data;
TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data);
- if (data->enclosed)
- return Qtrue;
- return Qfalse;
+ return RBOOL(data->enclosed);
}
/*
* call-seq:
- * thgrp.add(thread) => thgrp
+ * thgrp.add(thread) -> thgrp
*
- * Adds the given <em>thread</em> to this group, removing it from any other
- * group to which it may have previously belonged.
+ * Adds the given +thread+ to this group, removing it from any other
+ * group of which it may have previously been a member.
*
* puts "Initial group is #{ThreadGroup::Default.list}"
* tg = ThreadGroup.new
@@ -2912,7 +5178,7 @@ thgroup_enclosed_p(VALUE group)
* puts "Initial group now #{ThreadGroup::Default.list}"
* puts "tg group now #{tg.list}"
*
- * <em>produces:</em>
+ * This will produce:
*
* Initial group is #<Thread:0x401bdf4c>
* t1 is #<Thread:0x401b3c90>
@@ -2924,515 +5190,172 @@ thgroup_enclosed_p(VALUE group)
static VALUE
thgroup_add(VALUE group, VALUE thread)
{
- rb_thread_t *th;
+ rb_thread_t *target_th = rb_thread_ptr(thread);
struct thgroup *data;
- rb_secure(4);
- GetThreadPtr(thread, th);
-
if (OBJ_FROZEN(group)) {
- rb_raise(rb_eThreadError, "can't move to the frozen thread group");
+ rb_raise(rb_eThreadError, "can't move to the frozen thread group");
}
TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data);
if (data->enclosed) {
- rb_raise(rb_eThreadError, "can't move to the enclosed thread group");
- }
-
- if (!th->thgroup) {
- return Qnil;
+ rb_raise(rb_eThreadError, "can't move to the enclosed thread group");
}
- if (OBJ_FROZEN(th->thgroup)) {
- rb_raise(rb_eThreadError, "can't move from the frozen thread group");
+ if (OBJ_FROZEN(target_th->thgroup)) {
+ rb_raise(rb_eThreadError, "can't move from the frozen thread group");
}
- TypedData_Get_Struct(th->thgroup, struct thgroup, &thgroup_data_type, data);
+ TypedData_Get_Struct(target_th->thgroup, struct thgroup, &thgroup_data_type, data);
if (data->enclosed) {
- rb_raise(rb_eThreadError,
- "can't move from the enclosed thread group");
+ rb_raise(rb_eThreadError,
+ "can't move from the enclosed thread group");
}
- th->thgroup = group;
+ target_th->thgroup = group;
return group;
}
-
/*
- * Document-class: Mutex
- *
- * Mutex implements a simple semaphore that can be used to coordinate access to
- * shared data from multiple concurrent threads.
- *
- * Example:
- *
- * require 'thread'
- * semaphore = Mutex.new
- *
- * a = Thread.new {
- * semaphore.synchronize {
- * # access shared resource
- * }
- * }
- *
- * b = Thread.new {
- * semaphore.synchronize {
- * # access shared resource
- * }
- * }
- *
+ * Document-class: ThreadShield
*/
-
-#define GetMutexPtr(obj, tobj) \
- TypedData_Get_Struct(obj, mutex_t, &mutex_data_type, tobj)
-
-static const char *mutex_unlock(mutex_t *mutex, rb_thread_t volatile *th);
-
-#define mutex_mark NULL
-
static void
-mutex_free(void *ptr)
+thread_shield_mark(void *ptr)
{
- if (ptr) {
- mutex_t *mutex = ptr;
- if (mutex->th) {
- /* rb_warn("free locked mutex"); */
- const char *err = mutex_unlock(mutex, mutex->th);
- if (err) rb_bug("%s", err);
- }
- native_mutex_destroy(&mutex->lock);
- native_cond_destroy(&mutex->cond);
- }
- ruby_xfree(ptr);
-}
-
-static size_t
-mutex_memsize(const void *ptr)
-{
- return ptr ? sizeof(mutex_t) : 0;
+ rb_gc_mark((VALUE)ptr);
}
-static const rb_data_type_t mutex_data_type = {
- "mutex",
- mutex_mark, mutex_free, mutex_memsize,
+static const rb_data_type_t thread_shield_data_type = {
+ "thread_shield",
+ {thread_shield_mark, 0, 0,},
+ 0, 0, RUBY_TYPED_FREE_IMMEDIATELY
};
static VALUE
-mutex_alloc(VALUE klass)
-{
- VALUE volatile obj;
- mutex_t *mutex;
-
- obj = TypedData_Make_Struct(klass, mutex_t, &mutex_data_type, mutex);
- native_mutex_initialize(&mutex->lock);
- native_cond_initialize(&mutex->cond);
- return obj;
-}
-
-/*
- * call-seq:
- * Mutex.new => mutex
- *
- * Creates a new Mutex
- */
-static VALUE
-mutex_initialize(VALUE self)
+thread_shield_alloc(VALUE klass)
{
- return self;
+ return TypedData_Wrap_Struct(klass, &thread_shield_data_type, (void *)mutex_alloc(0));
}
-VALUE
-rb_mutex_new(void)
+#define GetThreadShieldPtr(obj) ((VALUE)rb_check_typeddata((obj), &thread_shield_data_type))
+#define THREAD_SHIELD_WAITING_MASK (((FL_USER19-1)&~(FL_USER0-1))|FL_USER19)
+#define THREAD_SHIELD_WAITING_SHIFT (FL_USHIFT)
+#define THREAD_SHIELD_WAITING_MAX (THREAD_SHIELD_WAITING_MASK>>THREAD_SHIELD_WAITING_SHIFT)
+STATIC_ASSERT(THREAD_SHIELD_WAITING_MAX, THREAD_SHIELD_WAITING_MAX <= UINT_MAX);
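+/*
+ * The count of threads waiting on a ThreadShield is packed into the
+ * object's FL_USER flag bits (FL_USER0..FL_USER19, per the mask above),
+ * so no separate counter field is needed; the helpers below read and
+ * update that bitfield.
+ */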
+static inline unsigned int
+rb_thread_shield_waiting(VALUE b)
{
- return mutex_alloc(rb_cMutex);
+ return ((RBASIC(b)->flags&THREAD_SHIELD_WAITING_MASK)>>THREAD_SHIELD_WAITING_SHIFT);
}
-/*
- * call-seq:
- * mutex.locked? => true or false
- *
- * Returns +true+ if this lock is currently held by some thread.
- */
-VALUE
-rb_mutex_locked_p(VALUE self)
+static inline void
+rb_thread_shield_waiting_inc(VALUE b)
{
- mutex_t *mutex;
- GetMutexPtr(self, mutex);
- return mutex->th ? Qtrue : Qfalse;
+ unsigned int w = rb_thread_shield_waiting(b);
+ w++;
+ if (w > THREAD_SHIELD_WAITING_MAX)
+ rb_raise(rb_eRuntimeError, "waiting count overflow");
+ RBASIC(b)->flags &= ~THREAD_SHIELD_WAITING_MASK;
+ RBASIC(b)->flags |= ((VALUE)w << THREAD_SHIELD_WAITING_SHIFT);
}
-static void
-mutex_locked(rb_thread_t *th, VALUE self)
+static inline void
+rb_thread_shield_waiting_dec(VALUE b)
{
- mutex_t *mutex;
- GetMutexPtr(self, mutex);
-
- if (th->keeping_mutexes) {
- mutex->next_mutex = th->keeping_mutexes;
- }
- th->keeping_mutexes = mutex;
+ unsigned int w = rb_thread_shield_waiting(b);
+ if (!w) rb_raise(rb_eRuntimeError, "waiting count underflow");
+ w--;
+ RBASIC(b)->flags &= ~THREAD_SHIELD_WAITING_MASK;
+ RBASIC(b)->flags |= ((VALUE)w << THREAD_SHIELD_WAITING_SHIFT);
}
-/*
- * call-seq:
- * mutex.try_lock => true or false
- *
- * Attempts to obtain the lock and returns immediately. Returns +true+ if the
- * lock was granted.
- */
VALUE
-rb_mutex_trylock(VALUE self)
-{
- mutex_t *mutex;
- VALUE locked = Qfalse;
- GetMutexPtr(self, mutex);
-
- native_mutex_lock(&mutex->lock);
- if (mutex->th == 0) {
- mutex->th = GET_THREAD();
- locked = Qtrue;
-
- mutex_locked(GET_THREAD(), self);
- }
- native_mutex_unlock(&mutex->lock);
-
- return locked;
-}
-
-static int
-lock_func(rb_thread_t *th, mutex_t *mutex, int last_thread)
+rb_thread_shield_new(void)
{
- int interrupted = 0;
-#if 0 /* for debug */
- native_thread_yield();
-#endif
-
- native_mutex_lock(&mutex->lock);
- th->transition_for_lock = 0;
- while (mutex->th || (mutex->th = th, 0)) {
- if (last_thread) {
- interrupted = 2;
- break;
- }
-
- mutex->cond_waiting++;
- native_cond_wait(&mutex->cond, &mutex->lock);
- mutex->cond_notified--;
-
- if (RUBY_VM_INTERRUPTED(th)) {
- interrupted = 1;
- break;
- }
- }
- th->transition_for_lock = 1;
- native_mutex_unlock(&mutex->lock);
-
- if (interrupted == 2) native_thread_yield();
-#if 0 /* for debug */
- native_thread_yield();
-#endif
-
- return interrupted;
-}
-
-static void
-lock_interrupt(void *ptr)
-{
- mutex_t *mutex = (mutex_t *)ptr;
- native_mutex_lock(&mutex->lock);
- if (mutex->cond_waiting > 0) {
- native_cond_broadcast(&mutex->cond);
- mutex->cond_notified = mutex->cond_waiting;
- mutex->cond_waiting = 0;
- }
- native_mutex_unlock(&mutex->lock);
-}
-
-/*
- * call-seq:
- * mutex.lock => self
- *
- * Attempts to grab the lock and waits if it isn't available.
- * Raises +ThreadError+ if +mutex+ was locked by the current thread.
- */
-VALUE
-rb_mutex_lock(VALUE self)
-{
-
- if (rb_mutex_trylock(self) == Qfalse) {
- mutex_t *mutex;
- rb_thread_t *th = GET_THREAD();
- GetMutexPtr(self, mutex);
-
- if (mutex->th == GET_THREAD()) {
- rb_raise(rb_eThreadError, "deadlock; recursive locking");
- }
-
- while (mutex->th != th) {
- int interrupted;
- enum rb_thread_status prev_status = th->status;
- int last_thread = 0;
- struct rb_unblock_callback oldubf;
-
- set_unblock_function(th, lock_interrupt, mutex, &oldubf);
- th->status = THREAD_STOPPED_FOREVER;
- th->vm->sleeper++;
- th->locking_mutex = self;
- if (vm_living_thread_num(th->vm) == th->vm->sleeper) {
- last_thread = 1;
- }
-
- th->transition_for_lock = 1;
- BLOCKING_REGION_CORE({
- interrupted = lock_func(th, mutex, last_thread);
- });
- th->transition_for_lock = 0;
- remove_signal_thread_list(th);
- reset_unblock_function(th, &oldubf);
-
- th->locking_mutex = Qfalse;
- if (mutex->th && interrupted == 2) {
- rb_check_deadlock(th->vm);
- }
- if (th->status == THREAD_STOPPED_FOREVER) {
- th->status = prev_status;
- }
- th->vm->sleeper--;
-
- if (mutex->th == th) mutex_locked(th, self);
-
- if (interrupted) {
- RUBY_VM_CHECK_INTS();
- }
- }
- }
- return self;
+ VALUE thread_shield = thread_shield_alloc(rb_cThreadShield);
+ rb_mutex_lock((VALUE)DATA_PTR(thread_shield));
+ return thread_shield;
}
-static const char *
-mutex_unlock(mutex_t *mutex, rb_thread_t volatile *th)
+bool
+rb_thread_shield_owned(VALUE self)
{
- const char *err = NULL;
- mutex_t *th_mutex;
+ VALUE mutex = GetThreadShieldPtr(self);
+ if (!mutex) return false;
- native_mutex_lock(&mutex->lock);
+ rb_mutex_t *m = mutex_ptr(mutex);
- if (mutex->th == 0) {
- err = "Attempt to unlock a mutex which is not locked";
- }
- else if (mutex->th != th) {
- err = "Attempt to unlock a mutex which is locked by another thread";
- }
- else {
- mutex->th = 0;
- if (mutex->cond_waiting > 0) {
- /* waiting thread */
- native_cond_signal(&mutex->cond);
- mutex->cond_waiting--;
- mutex->cond_notified++;
- }
- }
-
- native_mutex_unlock(&mutex->lock);
-
- if (!err) {
- th_mutex = th->keeping_mutexes;
- if (th_mutex == mutex) {
- th->keeping_mutexes = mutex->next_mutex;
- }
- else {
- while (1) {
- mutex_t *tmp_mutex;
- tmp_mutex = th_mutex->next_mutex;
- if (tmp_mutex == mutex) {
- th_mutex->next_mutex = tmp_mutex->next_mutex;
- break;
- }
- th_mutex = tmp_mutex;
- }
- }
- mutex->next_mutex = NULL;
- }
-
- return err;
+ return m->ec_serial == rb_ec_serial(GET_EC());
}
/*
- * call-seq:
- * mutex.unlock => self
+ * Wait on a thread shield.
*
- * Releases the lock.
- * Raises +ThreadError+ if +mutex+ wasn't locked by the current thread.
+ * Returns
+ * true: acquired the thread shield
+ *   false: the thread shield was destroyed and no other threads are waiting
+ * nil: the thread shield was destroyed but still in use
*/
VALUE
-rb_mutex_unlock(VALUE self)
-{
- const char *err;
- mutex_t *mutex;
- GetMutexPtr(self, mutex);
-
- err = mutex_unlock(mutex, GET_THREAD());
- if (err) rb_raise(rb_eThreadError, "%s", err);
-
- return self;
-}
-
-static void
-rb_mutex_unlock_all(mutex_t *mutexes, rb_thread_t *th)
+rb_thread_shield_wait(VALUE self)
{
- const char *err;
- mutex_t *mutex;
-
- while (mutexes) {
- mutex = mutexes;
- /* rb_warn("mutex #<%p> remains to be locked by terminated thread",
- mutexes); */
- mutexes = mutex->next_mutex;
- err = mutex_unlock(mutex, th);
- if (err) rb_bug("invalid keeping_mutexes: %s", err);
- }
-}
+ VALUE mutex = GetThreadShieldPtr(self);
+ rb_mutex_t *m;
-static void
-rb_mutex_abandon_all(mutex_t *mutexes)
-{
- mutex_t *mutex;
-
- while (mutexes) {
- mutex = mutexes;
- mutexes = mutex->next_mutex;
- mutex->th = 0;
- mutex->next_mutex = 0;
- }
-}
-
-static VALUE
-rb_mutex_sleep_forever(VALUE time)
-{
- rb_thread_sleep_deadly();
- return Qnil;
-}
-
-static VALUE
-rb_mutex_wait_for(VALUE time)
-{
- const struct timeval *t = (struct timeval *)time;
- rb_thread_wait_for(*t);
- return Qnil;
-}
-
-VALUE
-rb_mutex_sleep(VALUE self, VALUE timeout)
-{
- time_t beg, end;
- struct timeval t;
-
- if (!NIL_P(timeout)) {
- t = rb_time_interval(timeout);
- }
- rb_mutex_unlock(self);
- beg = time(0);
- if (NIL_P(timeout)) {
- rb_ensure(rb_mutex_sleep_forever, Qnil, rb_mutex_lock, self);
- }
- else {
- rb_ensure(rb_mutex_wait_for, (VALUE)&t, rb_mutex_lock, self);
- }
- end = time(0) - beg;
- return INT2FIX(end);
+ if (!mutex) return Qfalse;
+ m = mutex_ptr(mutex);
+ if (m->ec_serial == rb_ec_serial(GET_EC())) return Qnil;
+ rb_thread_shield_waiting_inc(self);
+ rb_mutex_lock(mutex);
+ rb_thread_shield_waiting_dec(self);
+ if (DATA_PTR(self)) return Qtrue;
+ rb_mutex_unlock(mutex);
+ return rb_thread_shield_waiting(self) > 0 ? Qnil : Qfalse;
}
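+/*
+ * Caller-side sketch (illustrative only): one thread creates and holds
+ * the shield, others block on it until it is released or destroyed.
+ *
+ *     VALUE shield = rb_thread_shield_new();   // creator now owns it
+ *     // ... in another thread ...
+ *     VALUE r = rb_thread_shield_wait(shield);
+ *     // r == Qtrue:  acquired the shield
+ *     // r == Qfalse: shield destroyed, no other waiters
+ *     // r == Qnil:   shield destroyed but still in use
+ */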
-/*
- * call-seq:
- * mutex.sleep(timeout = nil) => number
- *
- * Releases the lock and sleeps +timeout+ seconds if it is given and
- * non-nil or forever. Raises +ThreadError+ if +mutex+ wasn't locked by
- * the current thread.
- */
static VALUE
-mutex_sleep(int argc, VALUE *argv, VALUE self)
+thread_shield_get_mutex(VALUE self)
{
- VALUE timeout;
-
- rb_scan_args(argc, argv, "01", &timeout);
- return rb_mutex_sleep(self, timeout);
+ VALUE mutex = GetThreadShieldPtr(self);
+ if (!mutex)
+ rb_raise(rb_eThreadError, "destroyed thread shield - %p", (void *)self);
+ return mutex;
}
/*
- * call-seq:
- * mutex.synchronize { ... } => result of the block
- *
- * Obtains a lock, runs the block, and releases the lock when the block
- * completes. See the example under +Mutex+.
+ * Release a thread shield, and return true if it has waiting threads.
*/
-
VALUE
-rb_mutex_synchronize(VALUE mutex, VALUE (*func)(VALUE arg), VALUE arg)
+rb_thread_shield_release(VALUE self)
{
- rb_mutex_lock(mutex);
- return rb_ensure(func, arg, rb_mutex_unlock, mutex);
+ VALUE mutex = thread_shield_get_mutex(self);
+ rb_mutex_unlock(mutex);
+ return RBOOL(rb_thread_shield_waiting(self) > 0);
}
/*
- * Document-class: Barrier
+ * Release and destroy a thread shield, and return true if it has waiting threads.
*/
-static void
-barrier_mark(void *ptr)
-{
- rb_gc_mark((VALUE)ptr);
-}
-
-static const rb_data_type_t barrier_data_type = {
- "barrier",
- barrier_mark, 0, 0,
-};
-
-static VALUE
-barrier_alloc(VALUE klass)
-{
- return TypedData_Wrap_Struct(klass, &barrier_data_type, (void *)mutex_alloc(0));
-}
-
-#define GetBarrierPtr(obj) (VALUE)rb_check_typeddata(obj, &barrier_data_type)
-
VALUE
-rb_barrier_new(void)
+rb_thread_shield_destroy(VALUE self)
{
- VALUE barrier = barrier_alloc(rb_cBarrier);
- rb_mutex_lock((VALUE)DATA_PTR(barrier));
- return barrier;
-}
-
-VALUE
-rb_barrier_wait(VALUE self)
-{
- VALUE mutex = GetBarrierPtr(self);
- mutex_t *m;
-
- if (!mutex) return Qfalse;
- GetMutexPtr(mutex, m);
- if (m->th == GET_THREAD()) return Qfalse;
- rb_mutex_lock(mutex);
- if (DATA_PTR(self)) return Qtrue;
+ VALUE mutex = thread_shield_get_mutex(self);
+ DATA_PTR(self) = 0;
rb_mutex_unlock(mutex);
- return Qfalse;
+ return RBOOL(rb_thread_shield_waiting(self) > 0);
}
-VALUE
-rb_barrier_release(VALUE self)
+static VALUE
+threadptr_recursive_hash(rb_thread_t *th)
{
- return rb_mutex_unlock(GetBarrierPtr(self));
+ return th->ec->local_storage_recursive_hash;
}
-VALUE
-rb_barrier_destroy(VALUE self)
+static void
+threadptr_recursive_hash_set(rb_thread_t *th, VALUE hash)
{
- VALUE mutex = GetBarrierPtr(self);
- DATA_PTR(self) = 0;
- return rb_mutex_unlock(mutex);
+ th->ec->local_storage_recursive_hash = hash;
}
-/* variables for recursive traversals */
-static ID recursive_key;
+ID rb_frame_last_func(void);
/*
* Returns the current "recursive list" used to detect recursion.
@@ -3441,58 +5364,63 @@ static ID recursive_key;
*/
static VALUE
-recursive_list_access(void)
+recursive_list_access(VALUE sym)
{
- volatile VALUE hash = rb_thread_local_aref(rb_thread_current(), recursive_key);
- VALUE sym = ID2SYM(rb_frame_this_func());
+ rb_thread_t *th = GET_THREAD();
+ VALUE hash = threadptr_recursive_hash(th);
VALUE list;
- if (NIL_P(hash) || TYPE(hash) != T_HASH) {
- hash = rb_hash_new();
- OBJ_UNTRUST(hash);
- rb_thread_local_aset(rb_thread_current(), recursive_key, hash);
- list = Qnil;
+ if (NIL_P(hash) || !RB_TYPE_P(hash, T_HASH)) {
+ hash = rb_ident_hash_new();
+ threadptr_recursive_hash_set(th, hash);
+ list = Qnil;
}
else {
- list = rb_hash_aref(hash, sym);
+ list = rb_hash_aref(hash, sym);
}
- if (NIL_P(list) || TYPE(list) != T_HASH) {
- list = rb_hash_new();
- OBJ_UNTRUST(list);
- rb_hash_aset(hash, sym, list);
+ if (NIL_P(list) || !RB_TYPE_P(list, T_HASH)) {
+ list = rb_ident_hash_new();
+ rb_hash_aset(hash, sym, list);
}
return list;
}
/*
- * Returns Qtrue iff obj_id (or the pair <obj, paired_obj>) is already
+ * Returns true if and only if obj (or the pair <obj, paired_obj>) is already
* in the recursion list.
* Assumes the recursion list is valid.
*/
-static VALUE
-recursive_check(VALUE list, VALUE obj_id, VALUE paired_obj_id)
+static bool
+recursive_check(VALUE list, VALUE obj, VALUE paired_obj_id)
{
- VALUE pair_list = rb_hash_lookup2(list, obj_id, Qundef);
- if (pair_list == Qundef)
- return Qfalse;
+#if SIZEOF_LONG == SIZEOF_VOIDP
+ #define OBJ_ID_EQL(obj_id, other) ((obj_id) == (other))
+#elif SIZEOF_LONG_LONG == SIZEOF_VOIDP
+ #define OBJ_ID_EQL(obj_id, other) (RB_BIGNUM_TYPE_P((obj_id)) ? \
+ rb_big_eql((obj_id), (other)) : ((obj_id) == (other)))
+#endif
+
+ VALUE pair_list = rb_hash_lookup2(list, obj, Qundef);
+ if (UNDEF_P(pair_list))
+ return false;
if (paired_obj_id) {
- if (TYPE(pair_list) != T_HASH) {
- if (pair_list != paired_obj_id)
- return Qfalse;
- }
- else {
- if (NIL_P(rb_hash_lookup(pair_list, paired_obj_id)))
- return Qfalse;
- }
+ if (!RB_TYPE_P(pair_list, T_HASH)) {
+ if (!OBJ_ID_EQL(paired_obj_id, pair_list))
+ return false;
+ }
+ else {
+ if (NIL_P(rb_hash_lookup(pair_list, paired_obj_id)))
+ return false;
+ }
}
- return Qtrue;
+ return true;
}
/*
- * Pushes obj_id (or the pair <obj_id, paired_obj_id>) in the recursion list.
- * For a single obj_id, it sets list[obj_id] to Qtrue.
- * For a pair, it sets list[obj_id] to paired_obj_id if possible,
- * otherwise list[obj_id] becomes a hash like:
+ * Pushes obj (or the pair <obj, paired_obj>) in the recursion list.
+ * For a single obj, it sets list[obj] to Qtrue.
+ * For a pair, it sets list[obj] to paired_obj_id if possible,
+ * otherwise list[obj] becomes a hash like:
* {paired_obj_id_1 => true, paired_obj_id_2 => true, ... }
* Assumes the recursion list is valid.
*/
@@ -3503,125 +5431,133 @@ recursive_push(VALUE list, VALUE obj, VALUE paired_obj)
VALUE pair_list;
if (!paired_obj) {
- rb_hash_aset(list, obj, Qtrue);
+ rb_hash_aset(list, obj, Qtrue);
}
- else if ((pair_list = rb_hash_lookup2(list, obj, Qundef)) == Qundef) {
- rb_hash_aset(list, obj, paired_obj);
+ else if (UNDEF_P(pair_list = rb_hash_lookup2(list, obj, Qundef))) {
+ rb_hash_aset(list, obj, paired_obj);
}
else {
- if (TYPE(pair_list) != T_HASH){
- VALUE other_paired_obj = pair_list;
- pair_list = rb_hash_new();
- OBJ_UNTRUST(pair_list);
- rb_hash_aset(pair_list, other_paired_obj, Qtrue);
- rb_hash_aset(list, obj, pair_list);
- }
- rb_hash_aset(pair_list, paired_obj, Qtrue);
+ if (!RB_TYPE_P(pair_list, T_HASH)){
+ VALUE other_paired_obj = pair_list;
+ pair_list = rb_hash_new();
+ rb_hash_aset(pair_list, other_paired_obj, Qtrue);
+ rb_hash_aset(list, obj, pair_list);
+ }
+ rb_hash_aset(pair_list, paired_obj, Qtrue);
}
}
/*
- * Pops obj_id (or the pair <obj_id, paired_obj_id>) from the recursion list.
- * For a pair, if list[obj_id] is a hash, then paired_obj_id is
+ * Pops obj (or the pair <obj, paired_obj>) from the recursion list.
+ * For a pair, if list[obj] is a hash, then paired_obj_id is
* removed from the hash and no attempt is made to simplify
- * list[obj_id] from {only_one_paired_id => true} to only_one_paired_id
+ * list[obj] from {only_one_paired_id => true} to only_one_paired_id
* Assumes the recursion list is valid.
*/
-static void
+static int
recursive_pop(VALUE list, VALUE obj, VALUE paired_obj)
{
if (paired_obj) {
- VALUE pair_list = rb_hash_lookup2(list, obj, Qundef);
- if (pair_list == Qundef) {
- VALUE symname = rb_inspect(ID2SYM(rb_frame_this_func()));
- VALUE thrname = rb_inspect(rb_thread_current());
- rb_raise(rb_eTypeError, "invalid inspect_tbl pair_list for %s in %s",
- StringValuePtr(symname), StringValuePtr(thrname));
- }
- if (TYPE(pair_list) == T_HASH) {
- rb_hash_delete(pair_list, paired_obj);
- if (!RHASH_EMPTY_P(pair_list)) {
- return; /* keep hash until is empty */
- }
- }
- }
- rb_hash_delete(list, obj);
+ VALUE pair_list = rb_hash_lookup2(list, obj, Qundef);
+ if (UNDEF_P(pair_list)) {
+ return 0;
+ }
+ if (RB_TYPE_P(pair_list, T_HASH)) {
+ rb_hash_delete_entry(pair_list, paired_obj);
+ if (!RHASH_EMPTY_P(pair_list)) {
+ return 1; /* keep hash until it is empty */
+ }
+ }
+ }
+ rb_hash_delete_entry(list, obj);
+ return 1;
}
struct exec_recursive_params {
VALUE (*func) (VALUE, VALUE, int);
VALUE list;
VALUE obj;
- VALUE objid;
VALUE pairid;
VALUE arg;
};
static VALUE
-exec_recursive_i(VALUE tag, struct exec_recursive_params *p)
+exec_recursive_i(RB_BLOCK_CALL_FUNC_ARGLIST(tag, data))
{
- VALUE result = Qundef;
- int state;
-
- recursive_push(p->list, p->objid, p->pairid);
- PUSH_TAG();
- if ((state = EXEC_TAG()) == 0) {
- result = (*p->func)(p->obj, p->arg, FALSE);
- }
- POP_TAG();
- recursive_pop(p->list, p->objid, p->pairid);
- if (state)
- JUMP_TAG(state);
- return result;
+ struct exec_recursive_params *p = (void *)data;
+ return (*p->func)(p->obj, p->arg, FALSE);
}
/*
* Calls func(obj, arg, recursive), where recursive is non-zero if the
* current method is called recursively on obj, or on the pair <obj, pairid>
* If outer is 0, then the innermost func will be called with recursive set
- * to Qtrue, otherwise the outermost func will be called. In the latter case,
+ * to true, otherwise the outermost func will be called. In the latter case,
* all inner func are short-circuited by throw.
* Implementation details: the value thrown is the recursive list which is
- * proper to the current method and unlikely to be catched anywhere else.
+ * proper to the current method and unlikely to be caught anywhere else.
* list[recursive_key] is used as a flag for the outermost call.
*/
static VALUE
-exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE pairid, VALUE arg, int outer)
+exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE pairid, VALUE arg, int outer, ID mid)
{
+ VALUE result = Qundef;
+ const VALUE sym = mid ? ID2SYM(mid) : ID2SYM(idNULL);
struct exec_recursive_params p;
int outermost;
- p.list = recursive_list_access();
- p.objid = rb_obj_id(obj);
+ p.list = recursive_list_access(sym);
+ p.obj = obj;
+ p.pairid = pairid;
+ p.arg = arg;
outermost = outer && !recursive_check(p.list, ID2SYM(recursive_key), 0);
- if (recursive_check(p.list, p.objid, pairid)) {
- if (outer && !outermost) {
- rb_throw_obj(p.list, p.list);
- }
- return (*func)(obj, arg, TRUE);
+ if (recursive_check(p.list, p.obj, pairid)) {
+ if (outer && !outermost) {
+ rb_throw_obj(p.list, p.list);
+ }
+ return (*func)(obj, arg, TRUE);
}
else {
- VALUE result = Qundef;
- p.func = func;
- p.obj = obj;
- p.pairid = pairid;
- p.arg = arg;
-
- if (outermost) {
- recursive_push(p.list, ID2SYM(recursive_key), 0);
- result = rb_catch_obj(p.list, exec_recursive_i, (VALUE)&p);
- recursive_pop(p.list, ID2SYM(recursive_key), 0);
- if (result == p.list) {
- result = (*func)(obj, arg, TRUE);
- }
- }
- else {
- result = exec_recursive_i(0, &p);
- }
- return result;
+ enum ruby_tag_type state;
+
+ p.func = func;
+
+ if (outermost) {
+ recursive_push(p.list, ID2SYM(recursive_key), 0);
+ recursive_push(p.list, p.obj, p.pairid);
+ result = rb_catch_protect(p.list, exec_recursive_i, (VALUE)&p, &state);
+ if (!recursive_pop(p.list, p.obj, p.pairid)) goto invalid;
+ if (!recursive_pop(p.list, ID2SYM(recursive_key), 0)) goto invalid;
+ if (state != TAG_NONE) EC_JUMP_TAG(GET_EC(), state);
+ if (result == p.list) {
+ result = (*func)(obj, arg, TRUE);
+ }
+ }
+ else {
+ volatile VALUE ret = Qundef;
+ recursive_push(p.list, p.obj, p.pairid);
+ EC_PUSH_TAG(GET_EC());
+ if ((state = EC_EXEC_TAG()) == TAG_NONE) {
+ ret = (*func)(obj, arg, FALSE);
+ }
+ EC_POP_TAG();
+ if (!recursive_pop(p.list, p.obj, p.pairid)) {
+ goto invalid;
+ }
+ if (state != TAG_NONE) EC_JUMP_TAG(GET_EC(), state);
+ result = ret;
+ }
}
+ *(volatile struct exec_recursive_params *)&p; /* keep p live across EC_JUMP_TAG (longjmp) */
+ return result;
+
+ invalid:
+ rb_raise(rb_eTypeError, "invalid inspect_tbl pair_list "
+ "for %+"PRIsVALUE" in %+"PRIsVALUE,
+ sym, rb_thread_current());
+ UNREACHABLE_RETURN(Qundef);
}
/*
@@ -3632,7 +5568,7 @@ exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE pairid, VALUE
VALUE
rb_exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg)
{
- return exec_recursive(func, obj, 0, arg, 0);
+ return exec_recursive(func, obj, 0, arg, 0, rb_frame_last_func());
}
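+/*
+ * Typical caller sketch (function name is hypothetical): guard an
+ * inspect-style walk over a possibly self-referential object.
+ *
+ *     static VALUE
+ *     safe_inspect_i(VALUE obj, VALUE arg, int recursive)
+ *     {
+ *         if (recursive) return rb_str_new_cstr("[...]");
+ *         return rb_inspect(obj); // re-entry on obj reports recursive=1
+ *     }
+ *
+ *     VALUE str = rb_exec_recursive(safe_inspect_i, obj, 0);
+ */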
/*
@@ -3643,483 +5579,122 @@ rb_exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg)
VALUE
rb_exec_recursive_paired(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE paired_obj, VALUE arg)
{
- return exec_recursive(func, obj, rb_obj_id(paired_obj), arg, 0);
+ return exec_recursive(func, obj, rb_memory_id(paired_obj), arg, 0, rb_frame_last_func());
}
/*
* If recursion is detected on the current method and obj, the outermost
- * func will be called with (obj, arg, Qtrue). All inner func will be
+ * func will be called with (obj, arg, true). All inner func will be
* short-circuited using throw.
*/
VALUE
rb_exec_recursive_outer(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg)
{
- return exec_recursive(func, obj, 0, arg, 1);
-}
-
-/* tracer */
-
-static rb_event_hook_t *
-alloc_event_hook(rb_event_hook_func_t func, rb_event_flag_t events, VALUE data)
-{
- rb_event_hook_t *hook = ALLOC(rb_event_hook_t);
- hook->func = func;
- hook->flag = events;
- hook->data = data;
- return hook;
+ return exec_recursive(func, obj, 0, arg, 1, rb_frame_last_func());
}
-static void
-thread_reset_event_flags(rb_thread_t *th)
-{
- rb_event_hook_t *hook = th->event_hooks;
- rb_event_flag_t flag = th->event_flags & RUBY_EVENT_VM;
-
- while (hook) {
- flag |= hook->flag;
- hook = hook->next;
- }
-}
-
-static void
-rb_threadptr_add_event_hook(rb_thread_t *th,
- rb_event_hook_func_t func, rb_event_flag_t events, VALUE data)
-{
- rb_event_hook_t *hook = alloc_event_hook(func, events, data);
- hook->next = th->event_hooks;
- th->event_hooks = hook;
- thread_reset_event_flags(th);
-}
-
-static rb_thread_t *
-thval2thread_t(VALUE thval)
-{
- rb_thread_t *th;
- GetThreadPtr(thval, th);
- return th;
-}
-
-void
-rb_thread_add_event_hook(VALUE thval,
- rb_event_hook_func_t func, rb_event_flag_t events, VALUE data)
-{
- rb_threadptr_add_event_hook(thval2thread_t(thval), func, events, data);
-}
-
-static int
-set_threads_event_flags_i(st_data_t key, st_data_t val, st_data_t flag)
-{
- VALUE thval = key;
- rb_thread_t *th;
- GetThreadPtr(thval, th);
-
- if (flag) {
- th->event_flags |= RUBY_EVENT_VM;
- }
- else {
- th->event_flags &= (~RUBY_EVENT_VM);
- }
- return ST_CONTINUE;
-}
-
-static void
-set_threads_event_flags(int flag)
-{
- st_foreach(GET_VM()->living_threads, set_threads_event_flags_i, (st_data_t) flag);
-}
-
-static inline void
-exec_event_hooks(const rb_event_hook_t *hook, rb_event_flag_t flag, VALUE self, ID id, VALUE klass)
-{
- for (; hook; hook = hook->next) {
- if (flag & hook->flag) {
- (*hook->func)(flag, hook->data, self, id, klass);
- }
- }
-}
-
-void
-rb_threadptr_exec_event_hooks(rb_thread_t *th, rb_event_flag_t flag, VALUE self, ID id, VALUE klass)
-{
- const VALUE errinfo = th->errinfo;
- const rb_event_flag_t wait_event = th->event_flags;
-
- if (self == rb_mRubyVMFrozenCore) return;
- if (wait_event & flag) {
- exec_event_hooks(th->event_hooks, flag, self, id, klass);
- }
- if (wait_event & RUBY_EVENT_VM) {
- exec_event_hooks(th->vm->event_hooks, flag, self, id, klass);
- }
- th->errinfo = errinfo;
-}
-
-void
-rb_add_event_hook(rb_event_hook_func_t func, rb_event_flag_t events, VALUE data)
-{
- rb_event_hook_t *hook = alloc_event_hook(func, events, data);
- rb_vm_t *vm = GET_VM();
-
- hook->next = vm->event_hooks;
- vm->event_hooks = hook;
-
- set_threads_event_flags(1);
-}
-
-static int
-remove_event_hook(rb_event_hook_t **root, rb_event_hook_func_t func)
-{
- rb_event_hook_t *prev = NULL, *hook = *root, *next;
-
- while (hook) {
- next = hook->next;
- if (func == 0 || hook->func == func) {
- if (prev) {
- prev->next = hook->next;
- }
- else {
- *root = hook->next;
- }
- xfree(hook);
- }
- else {
- prev = hook;
- }
- hook = next;
- }
- return -1;
-}
-
-static int
-rb_threadptr_revmove_event_hook(rb_thread_t *th, rb_event_hook_func_t func)
-{
- int ret = remove_event_hook(&th->event_hooks, func);
- thread_reset_event_flags(th);
- return ret;
-}
-
-int
-rb_thread_remove_event_hook(VALUE thval, rb_event_hook_func_t func)
-{
- return rb_threadptr_revmove_event_hook(thval2thread_t(thval), func);
-}
-
-int
-rb_remove_event_hook(rb_event_hook_func_t func)
-{
- rb_vm_t *vm = GET_VM();
- rb_event_hook_t *hook = vm->event_hooks;
- int ret = remove_event_hook(&vm->event_hooks, func);
-
- if (hook != NULL && vm->event_hooks == NULL) {
- set_threads_event_flags(0);
- }
-
- return ret;
-}
-
-static int
-clear_trace_func_i(st_data_t key, st_data_t val, st_data_t flag)
-{
- rb_thread_t *th;
- GetThreadPtr((VALUE)key, th);
- rb_threadptr_revmove_event_hook(th, 0);
- return ST_CONTINUE;
-}
-
-void
-rb_clear_trace_func(void)
+VALUE
+rb_exec_recursive_outer_mid(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg, ID mid)
{
- st_foreach(GET_VM()->living_threads, clear_trace_func_i, (st_data_t) 0);
- rb_remove_event_hook(0);
+ return exec_recursive(func, obj, 0, arg, 1, mid);
}
-static void call_trace_func(rb_event_flag_t, VALUE data, VALUE self, ID id, VALUE klass);
-
/*
- * call-seq:
- * set_trace_func(proc) => proc
- * set_trace_func(nil) => nil
- *
- * Establishes _proc_ as the handler for tracing, or disables
- * tracing if the parameter is +nil+. _proc_ takes up
- * to six parameters: an event name, a filename, a line number, an
- * object id, a binding, and the name of a class. _proc_ is
- * invoked whenever an event occurs. Events are: <code>c-call</code>
- * (call a C-language routine), <code>c-return</code> (return from a
- * C-language routine), <code>call</code> (call a Ruby method),
- * <code>class</code> (start a class or module definition),
- * <code>end</code> (finish a class or module definition),
- * <code>line</code> (execute code on a new line), <code>raise</code>
- * (raise an exception), and <code>return</code> (return from a Ruby
- * method). Tracing is disabled within the context of _proc_.
- *
- * class Test
- * def test
- * a = 1
- * b = 2
- * end
- * end
- *
- * set_trace_func proc { |event, file, line, id, binding, classname|
- * printf "%8s %s:%-2d %10s %8s\n", event, file, line, id, classname
- * }
- * t = Test.new
- * t.test
- *
- * line prog.rb:11 false
- * c-call prog.rb:11 new Class
- * c-call prog.rb:11 initialize Object
- * c-return prog.rb:11 initialize Object
- * c-return prog.rb:11 new Class
- * line prog.rb:12 false
- * call prog.rb:2 test Test
- * line prog.rb:3 test Test
- * line prog.rb:4 test Test
- * return prog.rb:4 test Test
+ * If recursion is detected on the current method, obj and paired_obj,
+ * the outermost func will be called with (obj, arg, true). All inner
+ * func will be short-circuited using throw.
*/
-static VALUE
-set_trace_func(VALUE obj, VALUE trace)
-{
- rb_remove_event_hook(call_trace_func);
-
- if (NIL_P(trace)) {
- return Qnil;
- }
-
- if (!rb_obj_is_proc(trace)) {
- rb_raise(rb_eTypeError, "trace_func needs to be Proc");
- }
-
- rb_add_event_hook(call_trace_func, RUBY_EVENT_ALL, trace);
- return trace;
-}
-
-static void
-thread_add_trace_func(rb_thread_t *th, VALUE trace)
+VALUE
+rb_exec_recursive_paired_outer(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE paired_obj, VALUE arg)
{
- if (!rb_obj_is_proc(trace)) {
- rb_raise(rb_eTypeError, "trace_func needs to be Proc");
- }
-
- rb_threadptr_add_event_hook(th, call_trace_func, RUBY_EVENT_ALL, trace);
+ return exec_recursive(func, obj, rb_memory_id(paired_obj), arg, 1, rb_frame_last_func());
}
/*
* call-seq:
- * thr.add_trace_func(proc) => proc
+ * thread.backtrace -> array or nil
+ *
+ * Returns the current backtrace of the target thread.
*
- * Adds _proc_ as a handler for tracing.
- * See <code>Thread#set_trace_func</code> and +set_trace_func+.
*/
static VALUE
-thread_add_trace_func_m(VALUE obj, VALUE trace)
+rb_thread_backtrace_m(int argc, VALUE *argv, VALUE thval)
{
- rb_thread_t *th;
- GetThreadPtr(obj, th);
- thread_add_trace_func(th, trace);
- return trace;
+ return rb_vm_thread_backtrace(argc, argv, thval);
}
-/*
- * call-seq:
- * thr.set_trace_func(proc) => proc
- * thr.set_trace_func(nil) => nil
+/* call-seq:
+ * thread.backtrace_locations(*args) -> array or nil
+ *
+ * Returns the execution stack for the target thread---an array containing
+ * backtrace location objects.
*
- * Establishes _proc_ on _thr_ as the handler for tracing, or
- * disables tracing if the parameter is +nil+.
- * See +set_trace_func+.
+ * See Thread::Backtrace::Location for more information.
+ *
+ * This method behaves similarly to Kernel#caller_locations except it applies
+ * to a specific thread.
*/
-
-static VALUE
-thread_set_trace_func_m(VALUE obj, VALUE trace)
-{
- rb_thread_t *th;
- GetThreadPtr(obj, th);
- rb_threadptr_revmove_event_hook(th, call_trace_func);
-
- if (NIL_P(trace)) {
- return Qnil;
- }
- thread_add_trace_func(th, trace);
- return trace;
-}
-
-static const char *
-get_event_name(rb_event_flag_t event)
-{
- switch (event) {
- case RUBY_EVENT_LINE:
- return "line";
- case RUBY_EVENT_CLASS:
- return "class";
- case RUBY_EVENT_END:
- return "end";
- case RUBY_EVENT_CALL:
- return "call";
- case RUBY_EVENT_RETURN:
- return "return";
- case RUBY_EVENT_C_CALL:
- return "c-call";
- case RUBY_EVENT_C_RETURN:
- return "c-return";
- case RUBY_EVENT_RAISE:
- return "raise";
- default:
- return "unknown";
- }
-}
-
-VALUE ruby_suppress_tracing(VALUE (*func)(VALUE, int), VALUE arg, int always);
-
-struct call_trace_func_args {
- rb_event_flag_t event;
- VALUE proc;
- VALUE self;
- ID id;
- VALUE klass;
-};
-
static VALUE
-call_trace_proc(VALUE args, int tracing)
-{
- struct call_trace_func_args *p = (struct call_trace_func_args *)args;
- const char *srcfile = rb_sourcefile();
- VALUE eventname = rb_str_new2(get_event_name(p->event));
- VALUE filename = srcfile ? rb_str_new2(srcfile) : Qnil;
- VALUE argv[6];
- int line = rb_sourceline();
- ID id = 0;
- VALUE klass = 0;
-
- if (p->event == RUBY_EVENT_C_CALL ||
- p->event == RUBY_EVENT_C_RETURN) {
- id = p->id;
- klass = p->klass;
- }
- else {
- rb_thread_method_id_and_class(GET_THREAD(), &id, &klass);
- }
- if (id == ID_ALLOCATOR)
- return Qnil;
- if (klass) {
- if (TYPE(klass) == T_ICLASS) {
- klass = RBASIC(klass)->klass;
- }
- else if (FL_TEST(klass, FL_SINGLETON)) {
- klass = rb_iv_get(klass, "__attached__");
- }
- }
-
- argv[0] = eventname;
- argv[1] = filename;
- argv[2] = INT2FIX(line);
- argv[3] = id ? ID2SYM(id) : Qnil;
- argv[4] = (p->self && srcfile) ? rb_binding_new() : Qnil;
- argv[5] = klass ? klass : Qnil;
-
- return rb_proc_call_with_block(p->proc, 6, argv, Qnil);
-}
-
-static void
-call_trace_func(rb_event_flag_t event, VALUE proc, VALUE self, ID id, VALUE klass)
+rb_thread_backtrace_locations_m(int argc, VALUE *argv, VALUE thval)
{
- struct call_trace_func_args args;
-
- args.event = event;
- args.proc = proc;
- args.self = self;
- args.id = id;
- args.klass = klass;
- ruby_suppress_tracing(call_trace_proc, (VALUE)&args, FALSE);
+ return rb_vm_thread_backtrace_locations(argc, argv, thval);
}
-VALUE
-ruby_suppress_tracing(VALUE (*func)(VALUE, int), VALUE arg, int always)
+void
+Init_Thread_Mutex(void)
{
rb_thread_t *th = GET_THREAD();
- int state, tracing;
- volatile int raised;
- VALUE result = Qnil;
-
- if ((tracing = th->tracing) != 0 && !always) {
- return Qnil;
- }
- else {
- th->tracing = 1;
- }
-
- raised = rb_threadptr_reset_raised(th);
-
- PUSH_TAG();
- if ((state = EXEC_TAG()) == 0) {
- result = (*func)(arg, tracing);
- }
-
- if (raised) {
- rb_threadptr_set_raised(th);
- }
- POP_TAG();
-
- th->tracing = tracing;
- if (state) {
- JUMP_TAG(state);
- }
- return result;
+ rb_native_mutex_initialize(&th->vm->workqueue_lock);
+ rb_native_mutex_initialize(&th->interrupt_lock);
}
-VALUE rb_thread_backtrace(VALUE thval);
-
/*
- * call-seq:
- * thr.backtrace => array
+ * Document-class: ThreadError
*
- * Returns the current back trace of the _thr_.
- */
-
-static VALUE
-rb_thread_backtrace_m(VALUE thval)
-{
- return rb_thread_backtrace(thval);
-}
-
-/*
- * +Thread+ encapsulates the behavior of a thread of
- * execution, including the main thread of the Ruby script.
+ * Raised when an invalid operation is attempted on a thread.
+ *
+ * For example, when no other thread has been started:
+ *
+ * Thread.stop
*
- * In the descriptions of the methods in this class, the parameter _sym_
- * refers to a symbol, which is either a quoted string or a
- * +Symbol+ (such as <code>:name</code>).
+ * This will raise the following exception:
+ *
+ * ThreadError: stopping only thread
+ * note: use sleep to stop forever
*/
void
Init_Thread(void)
{
-#undef rb_intern
-#define rb_intern(str) rb_intern_const(str)
+ rb_thread_t *th = GET_THREAD();
- VALUE cThGroup;
+ sym_never = ID2SYM(rb_intern_const("never"));
+ sym_immediate = ID2SYM(rb_intern_const("immediate"));
+ sym_on_blocking = ID2SYM(rb_intern_const("on_blocking"));
rb_define_singleton_method(rb_cThread, "new", thread_s_new, -1);
rb_define_singleton_method(rb_cThread, "start", thread_start, -2);
rb_define_singleton_method(rb_cThread, "fork", thread_start, -2);
rb_define_singleton_method(rb_cThread, "main", rb_thread_s_main, 0);
rb_define_singleton_method(rb_cThread, "current", thread_s_current, 0);
- rb_define_singleton_method(rb_cThread, "stop", rb_thread_stop, 0);
+ rb_define_singleton_method(rb_cThread, "stop", thread_stop, 0);
rb_define_singleton_method(rb_cThread, "kill", rb_thread_s_kill, 1);
rb_define_singleton_method(rb_cThread, "exit", rb_thread_exit, 0);
rb_define_singleton_method(rb_cThread, "pass", thread_s_pass, 0);
- rb_define_singleton_method(rb_cThread, "list", rb_thread_list, 0);
+ rb_define_singleton_method(rb_cThread, "list", thread_list, 0);
rb_define_singleton_method(rb_cThread, "abort_on_exception", rb_thread_s_abort_exc, 0);
rb_define_singleton_method(rb_cThread, "abort_on_exception=", rb_thread_s_abort_exc_set, 1);
-#if THREAD_DEBUG < 0
- rb_define_singleton_method(rb_cThread, "DEBUG", rb_thread_s_debug, 0);
- rb_define_singleton_method(rb_cThread, "DEBUG=", rb_thread_s_debug_set, 1);
-#endif
+ rb_define_singleton_method(rb_cThread, "report_on_exception", rb_thread_s_report_exc, 0);
+ rb_define_singleton_method(rb_cThread, "report_on_exception=", rb_thread_s_report_exc_set, 1);
+ rb_define_singleton_method(rb_cThread, "ignore_deadlock", rb_thread_s_ignore_deadlock, 0);
+ rb_define_singleton_method(rb_cThread, "ignore_deadlock=", rb_thread_s_ignore_deadlock_set, 1);
+ rb_define_singleton_method(rb_cThread, "handle_interrupt", rb_thread_s_handle_interrupt, 1);
+ rb_define_singleton_method(rb_cThread, "pending_interrupt?", rb_thread_s_pending_interrupt_p, -1);
+ rb_define_method(rb_cThread, "pending_interrupt?", rb_thread_pending_interrupt_p, -1);
rb_define_method(rb_cThread, "initialize", thread_initialize, -2);
rb_define_method(rb_cThread, "raise", thread_raise_m, -1);
@@ -4132,20 +5707,34 @@ Init_Thread(void)
rb_define_method(rb_cThread, "wakeup", rb_thread_wakeup, 0);
rb_define_method(rb_cThread, "[]", rb_thread_aref, 1);
rb_define_method(rb_cThread, "[]=", rb_thread_aset, 2);
+ rb_define_method(rb_cThread, "fetch", rb_thread_fetch, -1);
rb_define_method(rb_cThread, "key?", rb_thread_key_p, 1);
rb_define_method(rb_cThread, "keys", rb_thread_keys, 0);
rb_define_method(rb_cThread, "priority", rb_thread_priority, 0);
rb_define_method(rb_cThread, "priority=", rb_thread_priority_set, 1);
rb_define_method(rb_cThread, "status", rb_thread_status, 0);
+ rb_define_method(rb_cThread, "thread_variable_get", rb_thread_variable_get, 1);
+ rb_define_method(rb_cThread, "thread_variable_set", rb_thread_variable_set, 2);
+ rb_define_method(rb_cThread, "thread_variables", rb_thread_variables, 0);
+ rb_define_method(rb_cThread, "thread_variable?", rb_thread_variable_p, 1);
rb_define_method(rb_cThread, "alive?", rb_thread_alive_p, 0);
rb_define_method(rb_cThread, "stop?", rb_thread_stop_p, 0);
rb_define_method(rb_cThread, "abort_on_exception", rb_thread_abort_exc, 0);
rb_define_method(rb_cThread, "abort_on_exception=", rb_thread_abort_exc_set, 1);
- rb_define_method(rb_cThread, "safe_level", rb_thread_safe_level, 0);
+ rb_define_method(rb_cThread, "report_on_exception", rb_thread_report_exc, 0);
+ rb_define_method(rb_cThread, "report_on_exception=", rb_thread_report_exc_set, 1);
rb_define_method(rb_cThread, "group", rb_thread_group, 0);
- rb_define_method(rb_cThread, "backtrace", rb_thread_backtrace_m, 0);
+ rb_define_method(rb_cThread, "backtrace", rb_thread_backtrace_m, -1);
+ rb_define_method(rb_cThread, "backtrace_locations", rb_thread_backtrace_locations_m, -1);
- rb_define_method(rb_cThread, "inspect", rb_thread_inspect, 0);
+ rb_define_method(rb_cThread, "name", rb_thread_getname, 0);
+ rb_define_method(rb_cThread, "name=", rb_thread_setname, 1);
+ rb_define_method(rb_cThread, "native_thread_id", rb_thread_native_thread_id, 0);
+ rb_define_method(rb_cThread, "to_s", rb_thread_to_s, 0);
+ rb_define_alias(rb_cThread, "inspect", "to_s");
+
+ rb_vm_register_special_exception(ruby_error_stream_closed, rb_eIOError,
+ "stream closed in another thread");
cThGroup = rb_define_class("ThreadGroup", rb_cObject);
rb_define_alloc_func(cThGroup, thgroup_s_alloc);
@@ -4154,46 +5743,49 @@ Init_Thread(void)
rb_define_method(cThGroup, "enclosed?", thgroup_enclosed_p, 0);
rb_define_method(cThGroup, "add", thgroup_add, 1);
- {
- rb_thread_t *th = GET_THREAD();
- th->thgroup = th->vm->thgroup_default = rb_obj_alloc(cThGroup);
- rb_define_const(cThGroup, "Default", th->thgroup);
+ const char * ptr = getenv("RUBY_THREAD_TIMESLICE");
+
+ if (ptr) {
+ long quantum = strtol(ptr, NULL, 0);
+ if (quantum > 0 && !(SIZEOF_LONG > 4 && quantum > UINT32_MAX)) {
+ thread_default_quantum_ms = (uint32_t)quantum;
+ }
+ else if (0) { // disabled branch: enable to report ignored values
+ fprintf(stderr, "Ignored RUBY_THREAD_TIMESLICE=%s\n", ptr);
+ }
}
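+ // Assumed env-var semantics per the parsing above (illustrative):
+ //   $ RUBY_THREAD_TIMESLICE=50 ruby app.rb  # 50ms scheduling quantum
+ //   values <= 0 or > UINT32_MAX are silently ignored (default 100ms)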
- rb_cMutex = rb_define_class("Mutex", rb_cObject);
- rb_define_alloc_func(rb_cMutex, mutex_alloc);
- rb_define_method(rb_cMutex, "initialize", mutex_initialize, 0);
- rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0);
- rb_define_method(rb_cMutex, "try_lock", rb_mutex_trylock, 0);
- rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0);
- rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0);
- rb_define_method(rb_cMutex, "sleep", mutex_sleep, -1);
+ {
+ th->thgroup = th->ractor->thgroup_default = rb_obj_alloc(cThGroup);
+ rb_define_const(cThGroup, "Default", th->thgroup);
+ }
- recursive_key = rb_intern("__recursive_key__");
rb_eThreadError = rb_define_class("ThreadError", rb_eStandardError);
- /* trace */
- rb_define_global_function("set_trace_func", set_trace_func, 1);
- rb_define_method(rb_cThread, "set_trace_func", thread_set_trace_func_m, 1);
- rb_define_method(rb_cThread, "add_trace_func", thread_add_trace_func_m, 1);
-
/* init thread core */
- Init_native_thread();
{
- /* main thread setting */
- {
- /* acquire global vm lock */
- rb_thread_lock_t *lp = &GET_THREAD()->vm->global_vm_lock;
- native_mutex_initialize(lp);
- native_mutex_lock(lp);
- native_mutex_initialize(&GET_THREAD()->interrupt_lock);
- }
+ /* main thread setting */
+ {
+ /* acquire global vm lock */
+#ifdef HAVE_PTHREAD_NP_H
+ VM_ASSERT(TH_SCHED(th)->running == th);
+#endif
+ // thread_sched_to_running() should not be called because
+ // it assumes the thread was blocked by thread_sched_to_waiting().
+ // thread_sched_to_running(sched, th);
+
+ th->pending_interrupt_queue = rb_ary_hidden_new(0);
+ th->pending_interrupt_queue_checked = 0;
+ th->pending_interrupt_mask_stack = rb_ary_hidden_new(0);
+ }
}
rb_thread_create_timer_thread();
- (void)native_mutex_trylock;
- (void)ruby_thread_set_native;
+ Init_thread_sync();
+
+ // TODO: Suppress unused function warning for now
+ // if (0) rb_thread_sched_destroy(NULL);
}
int
@@ -4204,90 +5796,215 @@ ruby_native_thread_p(void)
return th != 0;
}
-static int
-check_deadlock_i(st_data_t key, st_data_t val, int *found)
+#ifdef NON_SCALAR_THREAD_ID
+ #define thread_id_str(th) (NULL)
+#else
+ #define thread_id_str(th) ((void *)(uintptr_t)(th)->nt->thread_id)
+#endif
+
+static void
+debug_deadlock_check(rb_ractor_t *r, VALUE msg)
{
- VALUE thval = key;
- rb_thread_t *th;
- GetThreadPtr(thval, th);
+ rb_thread_t *th = 0;
+ VALUE sep = rb_str_new_cstr("\n ");
- if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th) || th->transition_for_lock) {
- *found = 1;
- }
- else if (th->locking_mutex) {
- mutex_t *mutex;
- GetMutexPtr(th->locking_mutex, mutex);
+ rb_str_catf(msg, "\n%d threads, %d sleeps current:%p main thread:%p\n",
+ rb_ractor_living_thread_num(r), rb_ractor_sleeper_thread_num(r),
+ (void *)GET_THREAD(), (void *)r->threads.main);
- native_mutex_lock(&mutex->lock);
- if (mutex->th == th || (!mutex->th && mutex->cond_notified)) {
- *found = 1;
- }
- native_mutex_unlock(&mutex->lock);
- }
+ ccan_list_for_each(&r->threads.set, th, lt_node) {
+ rb_str_catf(msg, "* %+"PRIsVALUE"\n rb_thread_t:%p "
+ "native:%p int:%u",
+ th->self, (void *)th, th->nt ? thread_id_str(th) : "N/A", th->ec->interrupt_flag);
+
+ if (th->locking_mutex) {
+ rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
+ rb_str_catf(msg, " mutex:%llu cond:%"PRIuSIZE,
+ (unsigned long long)mutex->ec_serial, rb_mutex_num_waiting(mutex));
+ }
- return (*found) ? ST_STOP : ST_CONTINUE;
+ {
+ struct rb_waiting_list *list = th->join_list;
+ while (list) {
+ rb_str_catf(msg, "\n depended by: rb_thread_t:%p", (void *)list->thread);
+ list = list->next;
+ }
+ }
+ rb_str_catf(msg, "\n ");
+ rb_str_concat(msg, rb_ary_join(rb_ec_backtrace_str_ary(th->ec, RUBY_BACKTRACE_START, RUBY_ALL_BACKTRACE_LINES), sep));
+ rb_str_catf(msg, "\n");
+ }
}
-#if 0 /* for debug */
-static int
-debug_i(st_data_t key, st_data_t val, int *found)
+static void
+rb_check_deadlock(rb_ractor_t *r)
{
- VALUE thval = key;
- rb_thread_t *th;
- GetThreadPtr(thval, th);
+ if (GET_THREAD()->vm->thread_ignore_deadlock) return;
+
+#ifdef RUBY_THREAD_PTHREAD_H
+ if (r->threads.sched.readyq_cnt > 0) return;
+#endif
- printf("th:%p %d %d %d", th, th->status, th->interrupt_flag, th->transition_for_lock);
- if (th->locking_mutex) {
- mutex_t *mutex;
- GetMutexPtr(th->locking_mutex, mutex);
+ int sleeper_num = rb_ractor_sleeper_thread_num(r);
+ int ltnum = rb_ractor_living_thread_num(r);
- native_mutex_lock(&mutex->lock);
- printf(" %p %d\n", mutex->th, mutex->cond_notified);
- native_mutex_unlock(&mutex->lock);
+ if (ltnum > sleeper_num) return;
+ if (ltnum < sleeper_num) rb_bug("sleeper must not be more than rb_ractor_living_thread_num(r)");
+
+ int found = 0;
+ rb_thread_t *th = NULL;
+
+ ccan_list_for_each(&r->threads.set, th, lt_node) {
+ if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th->ec)) {
+ found = 1;
+ }
+ else if (th->locking_mutex) {
+ rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
+ if (mutex->ec_serial == rb_ec_serial(th->ec) || (!mutex->ec_serial && !ccan_list_empty(&mutex->waitq))) {
+ found = 1;
+ }
+ }
+ if (found)
+ break;
}
- else puts("");
- return ST_CONTINUE;
+ if (!found) {
+ VALUE argv[2];
+ argv[0] = rb_eFatal;
+ argv[1] = rb_str_new2("No live threads left. Deadlock?");
+ debug_deadlock_check(r, argv[1]);
+ rb_ractor_sleeper_threads_dec(GET_RACTOR());
+ rb_threadptr_raise(r->threads.main, 2, argv);
+ }
+}
+
+static void
+update_line_coverage(VALUE data, const rb_trace_arg_t *trace_arg)
+{
+ const rb_control_frame_t *cfp = GET_EC()->cfp;
+ VALUE coverage = rb_iseq_coverage(cfp->iseq);
+ if (RB_TYPE_P(coverage, T_ARRAY) && !RBASIC_CLASS(coverage)) {
+ VALUE lines = RARRAY_AREF(coverage, COVERAGE_INDEX_LINES);
+ if (lines) {
+ long line = rb_sourceline() - 1;
+ VM_ASSERT(line >= 0);
+ long count;
+ VALUE num;
+ void rb_iseq_clear_event_flags(const rb_iseq_t *iseq, size_t pos, rb_event_flag_t reset);
+ if (GET_VM()->coverage_mode & COVERAGE_TARGET_ONESHOT_LINES) {
+ rb_iseq_clear_event_flags(cfp->iseq, cfp->pc - ISEQ_BODY(cfp->iseq)->iseq_encoded - 1, RUBY_EVENT_COVERAGE_LINE);
+ rb_ary_push(lines, LONG2FIX(line + 1));
+ return;
+ }
+ if (line >= RARRAY_LEN(lines)) { /* no longer tracked */
+ return;
+ }
+ num = RARRAY_AREF(lines, line);
+ if (!FIXNUM_P(num)) return;
+ count = FIX2LONG(num) + 1;
+ if (POSFIXABLE(count)) {
+ RARRAY_ASET(lines, line, LONG2FIX(count));
+ }
+ }
+ }
}
-#endif
static void
-rb_check_deadlock(rb_vm_t *vm)
+update_branch_coverage(VALUE data, const rb_trace_arg_t *trace_arg)
+{
+ const rb_control_frame_t *cfp = GET_EC()->cfp;
+ VALUE coverage = rb_iseq_coverage(cfp->iseq);
+ if (RB_TYPE_P(coverage, T_ARRAY) && !RBASIC_CLASS(coverage)) {
+ VALUE branches = RARRAY_AREF(coverage, COVERAGE_INDEX_BRANCHES);
+ if (branches) {
+ long pc = cfp->pc - ISEQ_BODY(cfp->iseq)->iseq_encoded - 1;
+ long idx = FIX2INT(RARRAY_AREF(ISEQ_PC2BRANCHINDEX(cfp->iseq), pc)), count;
+ VALUE counters = RARRAY_AREF(branches, 1);
+ VALUE num = RARRAY_AREF(counters, idx);
+ count = FIX2LONG(num) + 1;
+ if (POSFIXABLE(count)) {
+ RARRAY_ASET(counters, idx, LONG2FIX(count));
+ }
+ }
+ }
+}
+
+const rb_method_entry_t *
+rb_resolve_me_location(const rb_method_entry_t *me, VALUE resolved_location[5])
{
- int found = 0;
+ VALUE path, beg_pos_lineno, beg_pos_column, end_pos_lineno, end_pos_column;
- if (vm_living_thread_num(vm) > vm->sleeper) return;
- if (vm_living_thread_num(vm) < vm->sleeper) rb_bug("sleeper must not be more than vm_living_thread_num(vm)");
+ if (!me->def) return NULL; // negative cme
- st_foreach(vm->living_threads, check_deadlock_i, (st_data_t)&found);
+ retry:
+ switch (me->def->type) {
+ case VM_METHOD_TYPE_ISEQ: {
+ const rb_iseq_t *iseq = me->def->body.iseq.iseqptr;
+ rb_iseq_location_t *loc = &ISEQ_BODY(iseq)->location;
+ path = rb_iseq_path(iseq);
+ beg_pos_lineno = INT2FIX(loc->code_location.beg_pos.lineno);
+ beg_pos_column = INT2FIX(loc->code_location.beg_pos.column);
+ end_pos_lineno = INT2FIX(loc->code_location.end_pos.lineno);
+ end_pos_column = INT2FIX(loc->code_location.end_pos.column);
+ break;
+ }
+ case VM_METHOD_TYPE_BMETHOD: {
+ const rb_iseq_t *iseq = rb_proc_get_iseq(me->def->body.bmethod.proc, 0);
+ if (iseq) {
+ rb_iseq_location_t *loc;
+ rb_iseq_check(iseq);
+ path = rb_iseq_path(iseq);
+ loc = &ISEQ_BODY(iseq)->location;
+ beg_pos_lineno = INT2FIX(loc->code_location.beg_pos.lineno);
+ beg_pos_column = INT2FIX(loc->code_location.beg_pos.column);
+ end_pos_lineno = INT2FIX(loc->code_location.end_pos.lineno);
+ end_pos_column = INT2FIX(loc->code_location.end_pos.column);
+ break;
+ }
+ return NULL;
+ }
+ case VM_METHOD_TYPE_ALIAS:
+ me = me->def->body.alias.original_me;
+ goto retry;
+ case VM_METHOD_TYPE_REFINED:
+ me = me->def->body.refined.orig_me;
+ if (!me) return NULL;
+ goto retry;
+ default:
+ return NULL;
+ }
- if (!found) {
- VALUE argv[2];
- argv[0] = rb_eFatal;
- argv[1] = rb_str_new2("deadlock detected");
-#if 0 /* for debug */
- printf("%d %d %p %p\n", vm->living_threads->num_entries, vm->sleeper, GET_THREAD(), vm->main_thread);
- st_foreach(vm->living_threads, debug_i, (st_data_t)0);
-#endif
- vm->sleeper--;
- rb_threadptr_raise(vm->main_thread, 2, argv);
+ /* found */
+ if (RB_TYPE_P(path, T_ARRAY)) {
+ path = rb_ary_entry(path, 1);
+ if (!RB_TYPE_P(path, T_STRING)) return NULL; /* just in case */
+ }
+ if (resolved_location) {
+ resolved_location[0] = path;
+ resolved_location[1] = beg_pos_lineno;
+ resolved_location[2] = beg_pos_column;
+ resolved_location[3] = end_pos_lineno;
+ resolved_location[4] = end_pos_column;
}
+ return me;
}
static void
-update_coverage(rb_event_flag_t event, VALUE proc, VALUE self, ID id, VALUE klass)
+update_method_coverage(VALUE me2counter, rb_trace_arg_t *trace_arg)
{
- VALUE coverage = GET_THREAD()->cfp->iseq->coverage;
- if (coverage && RBASIC(coverage)->klass == 0) {
- long line = rb_sourceline() - 1;
- long count;
- if (RARRAY_PTR(coverage)[line] == Qnil) {
- rb_bug("bug");
- }
- count = FIX2LONG(RARRAY_PTR(coverage)[line]) + 1;
- if (POSFIXABLE(count)) {
- RARRAY_PTR(coverage)[line] = LONG2FIX(count);
- }
+ const rb_control_frame_t *cfp = GET_EC()->cfp;
+ const rb_callable_method_entry_t *cme = rb_vm_frame_method_entry(cfp);
+ const rb_method_entry_t *me = (const rb_method_entry_t *)cme;
+ VALUE rcount;
+ long count;
+
+ me = rb_resolve_me_location(me, 0);
+ if (!me) return;
+
+ rcount = rb_hash_aref(me2counter, (VALUE) me);
+ count = FIXNUM_P(rcount) ? FIX2LONG(rcount) + 1 : 1;
+ if (POSFIXABLE(count)) {
+ rb_hash_aset(me2counter, (VALUE) me, LONG2FIX(count));
}
}
@@ -4297,16 +6014,288 @@ rb_get_coverages(void)
return GET_VM()->coverages;
}
+int
+rb_get_coverage_mode(void)
+{
+ return GET_VM()->coverage_mode;
+}
+
void
-rb_set_coverages(VALUE coverages)
+rb_set_coverages(VALUE coverages, int mode, VALUE me2counter)
{
GET_VM()->coverages = coverages;
- rb_add_event_hook(update_coverage, RUBY_EVENT_COVERAGE, Qnil);
+ GET_VM()->me2counter = me2counter;
+ GET_VM()->coverage_mode = mode;
+}
+
+void
+rb_resume_coverages(void)
+{
+ int mode = GET_VM()->coverage_mode;
+ VALUE me2counter = GET_VM()->me2counter;
+ rb_add_event_hook2((rb_event_hook_func_t) update_line_coverage, RUBY_EVENT_COVERAGE_LINE, Qnil, RUBY_EVENT_HOOK_FLAG_SAFE | RUBY_EVENT_HOOK_FLAG_RAW_ARG);
+ if (mode & COVERAGE_TARGET_BRANCHES) {
+ rb_add_event_hook2((rb_event_hook_func_t) update_branch_coverage, RUBY_EVENT_COVERAGE_BRANCH, Qnil, RUBY_EVENT_HOOK_FLAG_SAFE | RUBY_EVENT_HOOK_FLAG_RAW_ARG);
+ }
+ if (mode & COVERAGE_TARGET_METHODS) {
+ rb_add_event_hook2((rb_event_hook_func_t) update_method_coverage, RUBY_EVENT_CALL, me2counter, RUBY_EVENT_HOOK_FLAG_SAFE | RUBY_EVENT_HOOK_FLAG_RAW_ARG);
+ }
+}
+
+void
+rb_suspend_coverages(void)
+{
+ rb_remove_event_hook((rb_event_hook_func_t) update_line_coverage);
+ if (GET_VM()->coverage_mode & COVERAGE_TARGET_BRANCHES) {
+ rb_remove_event_hook((rb_event_hook_func_t) update_branch_coverage);
+ }
+ if (GET_VM()->coverage_mode & COVERAGE_TARGET_METHODS) {
+ rb_remove_event_hook((rb_event_hook_func_t) update_method_coverage);
+ }
}
+/* Make coverage arrays empty so old covered files are no longer tracked. */
void
rb_reset_coverages(void)
{
+ rb_clear_coverages();
+ rb_iseq_remove_coverage_all();
GET_VM()->coverages = Qfalse;
- rb_remove_event_hook(update_coverage);
}
+
+VALUE
+rb_default_coverage(int n)
+{
+ VALUE coverage = rb_ary_hidden_new_fill(3);
+ VALUE lines = Qfalse, branches = Qfalse;
+ int mode = GET_VM()->coverage_mode;
+
+ if (mode & COVERAGE_TARGET_LINES) {
+ lines = n > 0 ? rb_ary_hidden_new_fill(n) : rb_ary_hidden_new(0);
+ }
+ RARRAY_ASET(coverage, COVERAGE_INDEX_LINES, lines);
+
+ if (mode & COVERAGE_TARGET_BRANCHES) {
+ branches = rb_ary_hidden_new_fill(2);
+ /* internal data structures for branch coverage:
+ *
+ * { branch base node =>
+ * [base_type, base_first_lineno, base_first_column, base_last_lineno, base_last_column, {
+ * branch target id =>
+ * [target_type, target_first_lineno, target_first_column, target_last_lineno, target_last_column, target_counter_index],
+ * ...
+ * }],
+ * ...
+ * }
+ *
+ * Example:
+ * { NODE_CASE =>
+ * [1, 0, 4, 3, {
+ * NODE_WHEN => [2, 8, 2, 9, 0],
+ * NODE_WHEN => [3, 8, 3, 9, 1],
+ * ...
+ * }],
+ * ...
+ * }
+ */
+ VALUE structure = rb_hash_new();
+ rb_obj_hide(structure);
+ RARRAY_ASET(branches, 0, structure);
+ /* branch execution counters */
+ RARRAY_ASET(branches, 1, rb_ary_hidden_new(0));
+ }
+ RARRAY_ASET(coverage, COVERAGE_INDEX_BRANCHES, branches);
+
+ return coverage;
+}
+
+static VALUE
+uninterruptible_exit(VALUE v)
+{
+ rb_thread_t *cur_th = GET_THREAD();
+ rb_ary_pop(cur_th->pending_interrupt_mask_stack);
+
+ cur_th->pending_interrupt_queue_checked = 0;
+ if (!rb_threadptr_pending_interrupt_empty_p(cur_th)) {
+ RUBY_VM_SET_INTERRUPT(cur_th->ec);
+ }
+ return Qnil;
+}
+
+VALUE
+rb_uninterruptible(VALUE (*b_proc)(VALUE), VALUE data)
+{
+ VALUE interrupt_mask = rb_ident_hash_new();
+ rb_thread_t *cur_th = GET_THREAD();
+
+ rb_hash_aset(interrupt_mask, rb_cObject, sym_never);
+ OBJ_FREEZE(interrupt_mask);
+ rb_ary_push(cur_th->pending_interrupt_mask_stack, interrupt_mask);
+
+ VALUE ret = rb_ensure(b_proc, data, uninterruptible_exit, Qnil);
+
+ RUBY_VM_CHECK_INTS(cur_th->ec);
+ return ret;
+}
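+/*
+ * Caller sketch (callback and helper names are hypothetical):
+ *
+ *     static VALUE
+ *     critical_body(VALUE data)
+ *     {
+ *         // Thread#raise / Thread#kill are deferred while in here
+ *         return do_critical_work(data); // hypothetical helper
+ *     }
+ *
+ *     VALUE ret = rb_uninterruptible(critical_body, data);
+ */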
+
+static void
+thread_specific_storage_alloc(rb_thread_t *th)
+{
+ VM_ASSERT(th->specific_storage == NULL);
+
+ if (UNLIKELY(specific_key_count > 0)) {
+ th->specific_storage = ZALLOC_N(void *, RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX);
+ }
+}
+
+rb_internal_thread_specific_key_t
+rb_internal_thread_specific_key_create(void)
+{
+ rb_vm_t *vm = GET_VM();
+
+    if (specific_key_count == 0 && vm->ractor.cnt > 1) {
+        rb_raise(rb_eThreadError, "the first rb_internal_thread_specific_key_create() must be called before additional ractors are created");
+    }
+    else if (specific_key_count >= RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX) {
+        rb_raise(rb_eThreadError, "rb_internal_thread_specific_key_create() can be called at most %d times", RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX);
+    }
+ else {
+ rb_internal_thread_specific_key_t key = specific_key_count++;
+
+ if (key == 0) {
+            // first key: allocate storage for every existing thread
+            // (only one ractor can exist here, per the check above)
+ rb_ractor_t *cr = GET_RACTOR();
+ rb_thread_t *th;
+
+ ccan_list_for_each(&cr->threads.set, th, lt_node) {
+ thread_specific_storage_alloc(th);
+ }
+ }
+ return key;
+ }
+}
+
+// async and native thread safe.
+void *
+rb_internal_thread_specific_get(VALUE thread_val, rb_internal_thread_specific_key_t key)
+{
+ rb_thread_t *th = DATA_PTR(thread_val);
+
+ VM_ASSERT(rb_thread_ptr(thread_val) == th);
+ VM_ASSERT(key < RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX);
+ VM_ASSERT(th->specific_storage);
+
+ return th->specific_storage[key];
+}
+
+// async and native thread safe.
+void
+rb_internal_thread_specific_set(VALUE thread_val, rb_internal_thread_specific_key_t key, void *data)
+{
+ rb_thread_t *th = DATA_PTR(thread_val);
+
+ VM_ASSERT(rb_thread_ptr(thread_val) == th);
+ VM_ASSERT(key < RB_INTERNAL_THREAD_SPECIFIC_KEY_MAX);
+ VM_ASSERT(th->specific_storage);
+
+ th->specific_storage[key] = data;
+}
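+
+/* A minimal usage sketch (hypothetical extension code) for the
+ * thread-specific storage API above.  The key must be created while only
+ * one ractor exists; get/set may then be used from other native threads.
+ *
+ *   static rb_internal_thread_specific_key_t my_key;   // assumed name
+ *
+ *   void
+ *   Init_my_ext(void)
+ *   {
+ *       my_key = rb_internal_thread_specific_key_create();
+ *   }
+ *
+ *   // later, e.g. from a thread event hook on another native thread:
+ *   rb_internal_thread_specific_set(thread_val, my_key, some_ptr);
+ *   void *p = rb_internal_thread_specific_get(thread_val, my_key);
+ */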
+
+// interrupt_exec
+
+struct rb_interrupt_exec_task {
+ struct ccan_list_node node;
+
+ rb_interrupt_exec_func_t *func;
+ void *data;
+ enum rb_interrupt_exec_flag flags;
+};
+
+void
+rb_threadptr_interrupt_exec_task_mark(rb_thread_t *th)
+{
+ struct rb_interrupt_exec_task *task;
+
+ ccan_list_for_each(&th->interrupt_exec_tasks, task, node) {
+ if (task->flags & rb_interrupt_exec_flag_value_data) {
+ rb_gc_mark((VALUE)task->data);
+ }
+ }
+}
+
+// Native thread safe: may be called from threads other than the target.
+// The caller must guarantee that th is still alive.
+void
+rb_threadptr_interrupt_exec(rb_thread_t *th, rb_interrupt_exec_func_t *func, void *data, enum rb_interrupt_exec_flag flags)
+{
+    // TODO: should not use ALLOC here; it can raise and expects the GVL,
+    // which conflicts with this function being native thread safe.
+ struct rb_interrupt_exec_task *task = ALLOC(struct rb_interrupt_exec_task);
+ *task = (struct rb_interrupt_exec_task) {
+ .flags = flags,
+ .func = func,
+ .data = data,
+ };
+
+ rb_native_mutex_lock(&th->interrupt_lock);
+ {
+ ccan_list_add_tail(&th->interrupt_exec_tasks, &task->node);
+ threadptr_set_interrupt_locked(th, true);
+ }
+ rb_native_mutex_unlock(&th->interrupt_lock);
+}
+
+static void
+threadptr_interrupt_exec_exec(rb_thread_t *th)
+{
+ while (1) {
+ struct rb_interrupt_exec_task *task;
+
+ rb_native_mutex_lock(&th->interrupt_lock);
+ {
+ task = ccan_list_pop(&th->interrupt_exec_tasks, struct rb_interrupt_exec_task, node);
+ }
+ rb_native_mutex_unlock(&th->interrupt_lock);
+
+ RUBY_DEBUG_LOG("task:%p", task);
+
+ if (task) {
+ if (task->flags & rb_interrupt_exec_flag_new_thread) {
+ rb_thread_create(task->func, task->data);
+ }
+ else {
+ (*task->func)(task->data);
+ }
+ ruby_xfree(task);
+ }
+ else {
+ break;
+ }
+ }
+}
+
+static void
+threadptr_interrupt_exec_cleanup(rb_thread_t *th)
+{
+ rb_native_mutex_lock(&th->interrupt_lock);
+ {
+ struct rb_interrupt_exec_task *task;
+
+ while ((task = ccan_list_pop(&th->interrupt_exec_tasks, struct rb_interrupt_exec_task, node)) != NULL) {
+ ruby_xfree(task);
+ }
+ }
+ rb_native_mutex_unlock(&th->interrupt_lock);
+}
+
+// Native thread safe: may be called from any native thread.
+// func and data must themselves be native thread safe.
+void
+rb_ractor_interrupt_exec(struct rb_ractor_struct *target_r,
+ rb_interrupt_exec_func_t *func, void *data, enum rb_interrupt_exec_flag flags)
+{
+ RUBY_DEBUG_LOG("flags:%d", (int)flags);
+
+ rb_thread_t *main_th = target_r->threads.main;
+ rb_threadptr_interrupt_exec(main_th, func, data, flags | rb_interrupt_exec_flag_new_thread);
+}
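+
+/* A minimal usage sketch (hypothetical caller): scheduling a callback on
+ * another ractor.  Because rb_ractor_interrupt_exec() always adds
+ * rb_interrupt_exec_flag_new_thread, the target's main thread spawns a
+ * fresh Ruby thread to run the callback once it services its interrupts.
+ * The zero-valued "none" enumerator below is an assumption.
+ *
+ *   static VALUE
+ *   my_task(void *data)                // an rb_interrupt_exec_func_t
+ *   {
+ *       // runs inside the target ractor
+ *       return Qnil;
+ *   }
+ *
+ *   rb_ractor_interrupt_exec(target_r, my_task, NULL, rb_interrupt_exec_flag_none);
+ */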
+