path: root/thread_pthread.c
Diffstat (limited to 'thread_pthread.c')
-rw-r--r--  thread_pthread.c  226
1 file changed, 117 insertions(+), 109 deletions(-)
diff --git a/thread_pthread.c b/thread_pthread.c
index ee2f7bc909..427897cfd8 100644
--- a/thread_pthread.c
+++ b/thread_pthread.c
@@ -122,26 +122,14 @@ static struct {
};
#endif
-void rb_native_mutex_lock(rb_nativethread_lock_t *lock);
-void rb_native_mutex_unlock(rb_nativethread_lock_t *lock);
-static int native_mutex_trylock(rb_nativethread_lock_t *lock);
-void rb_native_mutex_initialize(rb_nativethread_lock_t *lock);
-void rb_native_mutex_destroy(rb_nativethread_lock_t *lock);
-void rb_native_cond_signal(rb_nativethread_cond_t *cond);
-void rb_native_cond_broadcast(rb_nativethread_cond_t *cond);
-void rb_native_cond_wait(rb_nativethread_cond_t *cond, rb_nativethread_lock_t *mutex);
-void rb_native_cond_initialize(rb_nativethread_cond_t *cond);
-void rb_native_cond_destroy(rb_nativethread_cond_t *cond);
-static void clear_thread_cache_altstack(void);
-static void ubf_wakeup_all_threads(void);
-static int ubf_threads_empty(void);
-static int native_cond_timedwait(rb_nativethread_cond_t *, pthread_mutex_t *,
- const rb_hrtime_t *abs);
static const rb_hrtime_t *sigwait_timeout(rb_thread_t *, int sigwait_fd,
const rb_hrtime_t *,
int *drained_p);
static void ubf_timer_disarm(void);
static void threadptr_trap_interrupt(rb_thread_t *);
+static void clear_thread_cache_altstack(void);
+static void ubf_wakeup_all_threads(void);
+static int ubf_threads_empty(void);
#define TIMER_THREAD_CREATED_P() (signal_self_pipe.owner_process == getpid())
@@ -180,17 +168,18 @@ static const void *const condattr_monotonic = NULL;
#define TIME_QUANTUM_NSEC (TIME_QUANTUM_USEC * 1000)
static rb_hrtime_t native_cond_timeout(rb_nativethread_cond_t *, rb_hrtime_t);
+static int native_cond_timedwait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex, const rb_hrtime_t *abs);
/*
 * Designate the next gvl.timer thread; favor the last thread in
 * the waitq since it has been waiting the longest
 */
static int
-designate_timer_thread(rb_vm_t *vm)
+designate_timer_thread(rb_global_vm_lock_t *gvl)
{
native_thread_data_t *last;
- last = list_tail(&vm->gvl.waitq, native_thread_data_t, node.ubf);
+ last = list_tail(&gvl->waitq, native_thread_data_t, node.ubf);
if (last) {
rb_native_cond_signal(&last->cond.gvlq);
return TRUE;
@@ -203,29 +192,30 @@ designate_timer_thread(rb_vm_t *vm)
* periodically. Continue on old timeout if it expired.
*/
static void
-do_gvl_timer(rb_vm_t *vm, rb_thread_t *th)
+do_gvl_timer(rb_global_vm_lock_t *gvl, rb_thread_t *th)
{
static rb_hrtime_t abs;
native_thread_data_t *nd = &th->native_thread_data;
- vm->gvl.timer = th;
+ gvl->timer = th;
/* take over wakeups from UBF_TIMER */
ubf_timer_disarm();
- if (vm->gvl.timer_err == ETIMEDOUT) {
+ if (gvl->timer_err == ETIMEDOUT) {
abs = native_cond_timeout(&nd->cond.gvlq, TIME_QUANTUM_NSEC);
}
- vm->gvl.timer_err = native_cond_timedwait(&nd->cond.gvlq, &vm->gvl.lock, &abs);
+ gvl->timer_err = native_cond_timedwait(&nd->cond.gvlq, &gvl->lock, &abs);
ubf_wakeup_all_threads();
- ruby_sigchld_handler(vm);
+ ruby_sigchld_handler(GET_VM());
+
if (UNLIKELY(rb_signal_buff_size())) {
- if (th == vm->main_thread) {
+ if (th == GET_VM()->ractor.main_thread) {
RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
}
else {
- threadptr_trap_interrupt(vm->main_thread);
+ threadptr_trap_interrupt(GET_VM()->ractor.main_thread);
}
}
@@ -233,77 +223,77 @@ do_gvl_timer(rb_vm_t *vm, rb_thread_t *th)
* Timeslice. Warning: the process may fork while this
* thread is contending for GVL:
*/
- if (vm->gvl.owner) timer_thread_function();
- vm->gvl.timer = 0;
+ if (gvl->owner) timer_thread_function(gvl->owner->ec);
+ gvl->timer = 0;
}
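
The static abs deadline above is refreshed only when the previous wait really timed out (timer_err == ETIMEDOUT); a wait cut short by a wakeup resumes against the old deadline, so the timer ticks on a fixed cadence instead of drifting. A minimal sketch of that pattern, assuming a condvar created with pthread_condattr_setclock(CLOCK_MONOTONIC) and a caller that holds lock and starts with *err = ETIMEDOUT (all names here are illustrative, not Ruby's):

    #include <errno.h>
    #include <pthread.h>
    #include <time.h>

    #define TICK_NSEC 100000000L  /* 100ms quantum, illustrative */

    /* Absolute CLOCK_MONOTONIC deadline rel_nsec from now. */
    static struct timespec
    deadline_from_now(long rel_nsec)
    {
        struct timespec ts;
        clock_gettime(CLOCK_MONOTONIC, &ts);
        ts.tv_nsec += rel_nsec;
        ts.tv_sec += ts.tv_nsec / 1000000000L;
        ts.tv_nsec %= 1000000000L;
        return ts;
    }

    /* One timer iteration: refresh the deadline only after a real timeout,
     * so early or interrupted wakeups keep ticking on the old schedule.
     * Caller holds lock; *err starts out as ETIMEDOUT, like gvl->timer_err. */
    static void
    timer_tick(pthread_cond_t *cond, pthread_mutex_t *lock, int *err)
    {
        static struct timespec deadline;

        if (*err == ETIMEDOUT)
            deadline = deadline_from_now(TICK_NSEC);
        *err = pthread_cond_timedwait(cond, lock, &deadline);
    }
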
static void
-gvl_acquire_common(rb_vm_t *vm, rb_thread_t *th)
+gvl_acquire_common(rb_global_vm_lock_t *gvl, rb_thread_t *th)
{
- if (vm->gvl.owner) {
+ if (gvl->owner) {
native_thread_data_t *nd = &th->native_thread_data;
VM_ASSERT(th->unblock.func == 0 &&
"we must not be in ubf_list and GVL waitq at the same time");
- list_add_tail(&vm->gvl.waitq, &nd->node.gvl);
+ list_add_tail(&gvl->waitq, &nd->node.gvl);
do {
- if (!vm->gvl.timer) {
- do_gvl_timer(vm, th);
+ if (!gvl->timer) {
+ do_gvl_timer(gvl, th);
}
else {
- rb_native_cond_wait(&nd->cond.gvlq, &vm->gvl.lock);
+ rb_native_cond_wait(&nd->cond.gvlq, &gvl->lock);
}
- } while (vm->gvl.owner);
+ } while (gvl->owner);
list_del_init(&nd->node.gvl);
- if (vm->gvl.need_yield) {
- vm->gvl.need_yield = 0;
- rb_native_cond_signal(&vm->gvl.switch_cond);
+ if (gvl->need_yield) {
+ gvl->need_yield = 0;
+ rb_native_cond_signal(&gvl->switch_cond);
}
}
else { /* reset timer if uncontended */
- vm->gvl.timer_err = ETIMEDOUT;
+ gvl->timer_err = ETIMEDOUT;
}
- vm->gvl.owner = th;
- if (!vm->gvl.timer) {
- if (!designate_timer_thread(vm) && !ubf_threads_empty()) {
+ gvl->owner = th;
+ if (!gvl->timer) {
+ if (!designate_timer_thread(gvl) && !ubf_threads_empty()) {
rb_thread_wakeup_timer_thread(-1);
}
}
}
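
Stripped of the timer and yield bookkeeping, the acquire path above is a classic ownership lock built from a mutex and a condition variable: enqueue, sleep while somebody owns it, then claim ownership. A reduced sketch under that reading (a toy lock with one shared condvar, whereas the real code signals a specific waiter's per-thread cond.gvlq):

    #include <pthread.h>

    typedef struct {
        pthread_mutex_t lock;
        pthread_cond_t cond;
        void *owner;               /* NULL when the lock is free */
    } toy_gvl_t;

    static toy_gvl_t the_gvl = {
        PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, NULL
    };

    static void
    toy_gvl_acquire(toy_gvl_t *gvl, void *self)
    {
        pthread_mutex_lock(&gvl->lock);
        while (gvl->owner)                    /* loop guards against spurious wakeups */
            pthread_cond_wait(&gvl->cond, &gvl->lock);
        gvl->owner = self;
        pthread_mutex_unlock(&gvl->lock);
    }

    static void
    toy_gvl_release(toy_gvl_t *gvl)
    {
        pthread_mutex_lock(&gvl->lock);
        gvl->owner = NULL;
        pthread_cond_signal(&gvl->cond);      /* wake one waiter, like gvl_release_common */
        pthread_mutex_unlock(&gvl->lock);
    }
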
static void
-gvl_acquire(rb_vm_t *vm, rb_thread_t *th)
+gvl_acquire(rb_global_vm_lock_t *gvl, rb_thread_t *th)
{
- rb_native_mutex_lock(&vm->gvl.lock);
- gvl_acquire_common(vm, th);
- rb_native_mutex_unlock(&vm->gvl.lock);
+ rb_native_mutex_lock(&gvl->lock);
+ gvl_acquire_common(gvl, th);
+ rb_native_mutex_unlock(&gvl->lock);
}
static const native_thread_data_t *
-gvl_release_common(rb_vm_t *vm)
+gvl_release_common(rb_global_vm_lock_t *gvl)
{
native_thread_data_t *next;
- vm->gvl.owner = 0;
- next = list_top(&vm->gvl.waitq, native_thread_data_t, node.ubf);
+ gvl->owner = 0;
+ next = list_top(&gvl->waitq, native_thread_data_t, node.ubf);
if (next) rb_native_cond_signal(&next->cond.gvlq);
return next;
}
static void
-gvl_release(rb_vm_t *vm)
+gvl_release(rb_global_vm_lock_t *gvl)
{
- rb_native_mutex_lock(&vm->gvl.lock);
- gvl_release_common(vm);
- rb_native_mutex_unlock(&vm->gvl.lock);
+ rb_native_mutex_lock(&gvl->lock);
+ gvl_release_common(gvl);
+ rb_native_mutex_unlock(&gvl->lock);
}
static void
-gvl_yield(rb_vm_t *vm, rb_thread_t *th)
+gvl_yield(rb_global_vm_lock_t *gvl, rb_thread_t *th)
{
const native_thread_data_t *next;
@@ -312,49 +302,49 @@ gvl_yield(rb_vm_t *vm, rb_thread_t *th)
* (perhaps looping in io_close_fptr) so we kick them:
*/
ubf_wakeup_all_threads();
- rb_native_mutex_lock(&vm->gvl.lock);
- next = gvl_release_common(vm);
+ rb_native_mutex_lock(&gvl->lock);
+ next = gvl_release_common(gvl);
/* Another thread is processing GVL yield. */
- if (UNLIKELY(vm->gvl.wait_yield)) {
- while (vm->gvl.wait_yield)
- rb_native_cond_wait(&vm->gvl.switch_wait_cond, &vm->gvl.lock);
+ if (UNLIKELY(gvl->wait_yield)) {
+ while (gvl->wait_yield)
+ rb_native_cond_wait(&gvl->switch_wait_cond, &gvl->lock);
}
else if (next) {
/* Wait until another thread takes the GVL. */
- vm->gvl.need_yield = 1;
- vm->gvl.wait_yield = 1;
- while (vm->gvl.need_yield)
- rb_native_cond_wait(&vm->gvl.switch_cond, &vm->gvl.lock);
- vm->gvl.wait_yield = 0;
- rb_native_cond_broadcast(&vm->gvl.switch_wait_cond);
+ gvl->need_yield = 1;
+ gvl->wait_yield = 1;
+ while (gvl->need_yield)
+ rb_native_cond_wait(&gvl->switch_cond, &gvl->lock);
+ gvl->wait_yield = 0;
+ rb_native_cond_broadcast(&gvl->switch_wait_cond);
}
else {
- rb_native_mutex_unlock(&vm->gvl.lock);
+ rb_native_mutex_unlock(&gvl->lock);
native_thread_yield();
- rb_native_mutex_lock(&vm->gvl.lock);
- rb_native_cond_broadcast(&vm->gvl.switch_wait_cond);
+ rb_native_mutex_lock(&gvl->lock);
+ rb_native_cond_broadcast(&gvl->switch_wait_cond);
}
- gvl_acquire_common(vm, th);
- rb_native_mutex_unlock(&vm->gvl.lock);
+ gvl_acquire_common(gvl, th);
+ rb_native_mutex_unlock(&gvl->lock);
}
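
The need_yield/wait_yield flags implement a handoff handshake: the yielding thread blocks on switch_cond until some acquirer has actually taken the lock and cleared need_yield (the acquire hunk above does the clearing), then broadcasts switch_wait_cond so other yielders queued behind it can proceed. The core of that handshake, condensed into a hedged sketch where every field is protected by one mutex (illustrative types, not Ruby's):

    #include <pthread.h>

    struct switch_gate {
        pthread_mutex_t lock;
        pthread_cond_t switch_cond;
        int need_yield;
    };

    /* Yielding side: called with g->lock held, ownership already released. */
    static void
    yield_side(struct switch_gate *g)
    {
        g->need_yield = 1;
        while (g->need_yield)                     /* wait for an acquirer to take over */
            pthread_cond_wait(&g->switch_cond, &g->lock);
    }

    /* Acquiring side: called with g->lock held, right after claiming ownership. */
    static void
    acquire_side(struct switch_gate *g)
    {
        if (g->need_yield) {
            g->need_yield = 0;
            pthread_cond_signal(&g->switch_cond); /* release the yielder */
        }
    }
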
-static void
-gvl_init(rb_vm_t *vm)
+void
+rb_gvl_init(rb_global_vm_lock_t *gvl)
{
- rb_native_mutex_initialize(&vm->gvl.lock);
- rb_native_cond_initialize(&vm->gvl.switch_cond);
- rb_native_cond_initialize(&vm->gvl.switch_wait_cond);
- list_head_init(&vm->gvl.waitq);
- vm->gvl.owner = 0;
- vm->gvl.timer = 0;
- vm->gvl.timer_err = ETIMEDOUT;
- vm->gvl.need_yield = 0;
- vm->gvl.wait_yield = 0;
+ rb_native_mutex_initialize(&gvl->lock);
+ rb_native_cond_initialize(&gvl->switch_cond);
+ rb_native_cond_initialize(&gvl->switch_wait_cond);
+ list_head_init(&gvl->waitq);
+ gvl->owner = 0;
+ gvl->timer = 0;
+ gvl->timer_err = ETIMEDOUT;
+ gvl->need_yield = 0;
+ gvl->wait_yield = 0;
}
static void
-gvl_destroy(rb_vm_t *vm)
+gvl_destroy(rb_global_vm_lock_t *gvl)
{
/*
* only called once at VM shutdown (not atfork), another thread
@@ -362,9 +352,9 @@ gvl_destroy(rb_vm_t *vm)
* the end of thread_start_func_2
*/
if (0) {
- rb_native_cond_destroy(&vm->gvl.switch_wait_cond);
- rb_native_cond_destroy(&vm->gvl.switch_cond);
- rb_native_mutex_destroy(&vm->gvl.lock);
+ rb_native_cond_destroy(&gvl->switch_wait_cond);
+ rb_native_cond_destroy(&gvl->switch_cond);
+ rb_native_mutex_destroy(&gvl->lock);
}
clear_thread_cache_altstack();
}
@@ -372,11 +362,11 @@ gvl_destroy(rb_vm_t *vm)
#if defined(HAVE_WORKING_FORK)
static void thread_cache_reset(void);
static void
-gvl_atfork(rb_vm_t *vm)
+gvl_atfork(rb_global_vm_lock_t *gvl)
{
thread_cache_reset();
- gvl_init(vm);
- gvl_acquire(vm, GET_THREAD());
+ rb_gvl_init(gvl);
+ gvl_acquire(gvl, GET_THREAD());
}
#endif
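
gvl_atfork exists because fork(2) leaves the child with a single thread and with mutexes/condvars in whatever state the parent's threads left them; the child therefore reinitializes the whole GVL (rb_gvl_init) and re-acquires it. Ruby invokes this from its own after-fork path; the generic POSIX hook for the same job is pthread_atfork, sketched here under that assumption:

    #include <pthread.h>

    static pthread_mutex_t big_lock = PTHREAD_MUTEX_INITIALIZER;

    /* The child has exactly one thread; inherited lock state is unreliable,
     * so reinitialize and re-acquire, mirroring rb_gvl_init + gvl_acquire. */
    static void
    atfork_child(void)
    {
        pthread_mutex_init(&big_lock, NULL);  /* standard reinit-after-fork idiom */
        pthread_mutex_lock(&big_lock);
    }

    static void
    install_fork_handler(void)
    {
        pthread_atfork(NULL /* prepare */, NULL /* parent */, atfork_child);
    }
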
@@ -415,8 +405,8 @@ rb_native_mutex_unlock(pthread_mutex_t *lock)
}
}
-static inline int
-native_mutex_trylock(pthread_mutex_t *lock)
+int
+rb_native_mutex_trylock(pthread_mutex_t *lock)
{
int r;
mutex_debug("trylock", lock);
@@ -513,8 +503,7 @@ rb_native_cond_wait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex)
}
static int
-native_cond_timedwait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex,
- const rb_hrtime_t *abs)
+native_cond_timedwait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex, const rb_hrtime_t *abs)
{
int r;
struct timespec ts;
@@ -526,16 +515,24 @@ native_cond_timedwait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex,
* Let's hide it from arch generic code.
*/
do {
- r = pthread_cond_timedwait(cond, mutex, rb_hrtime2timespec(&ts, abs));
+ rb_hrtime2timespec(&ts, abs);
+ r = pthread_cond_timedwait(cond, mutex, &ts);
} while (r == EINTR);
if (r != 0 && r != ETIMEDOUT) {
- rb_bug_errno("pthread_cond_timedwait", r);
+ rb_bug_errno("pthread_cond_timedwait", r);
}
return r;
}
+void
+rb_native_cond_timedwait(rb_nativethread_cond_t *cond, pthread_mutex_t *mutex, unsigned long msec)
+{
+ rb_hrtime_t hrmsec = native_cond_timeout(cond, RB_HRTIME_PER_MSEC * msec);
+ native_cond_timedwait(cond, mutex, &hrmsec);
+}
+
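
The two layers above split the work: native_cond_timedwait retries pthread_cond_timedwait on EINTR against an absolute deadline, and the newly exported rb_native_cond_timedwait converts a relative millisecond count into that absolute form first. A standalone sketch of the same shape, assuming the condvar was initialized with pthread_condattr_setclock(CLOCK_MONOTONIC) as condattr_monotonic arranges elsewhere in this file:

    #include <errno.h>
    #include <pthread.h>
    #include <time.h>

    /* Wait until the absolute CLOCK_MONOTONIC time *abs, retrying on EINTR.
     * Returns 0 when signaled, ETIMEDOUT on timeout; the caller treats
     * anything else as a bug, as the code above does with rb_bug_errno. */
    static int
    timedwait_abs(pthread_cond_t *cond, pthread_mutex_t *mutex,
                  const struct timespec *abs)
    {
        int r;
        do {
            r = pthread_cond_timedwait(cond, mutex, abs);
        } while (r == EINTR);
        return r;
    }

    /* Relative-millisecond convenience wrapper, like rb_native_cond_timedwait. */
    static int
    timedwait_msec(pthread_cond_t *cond, pthread_mutex_t *mutex, unsigned long msec)
    {
        struct timespec abs;

        clock_gettime(CLOCK_MONOTONIC, &abs);
        abs.tv_sec += msec / 1000;
        abs.tv_nsec += (long)(msec % 1000) * 1000000L;
        if (abs.tv_nsec >= 1000000000L) {
            abs.tv_sec += 1;
            abs.tv_nsec -= 1000000000L;
        }
        return timedwait_abs(cond, mutex, &abs);
    }
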
static rb_hrtime_t
native_cond_timeout(rb_nativethread_cond_t *cond, const rb_hrtime_t rel)
{
@@ -570,6 +567,9 @@ ruby_thread_from_native(void)
static int
ruby_thread_set_native(rb_thread_t *th)
{
+ if (th && th->ec) {
+ rb_ractor_set_current_ec(th->ractor, th->ec);
+ }
return pthread_setspecific(ruby_native_thread_key, th) == 0;
}
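
ruby_thread_set_native stores the current Ruby thread in pthread thread-specific data; this hunk additionally publishes the thread's execution context for the Ractor-aware lookup (rb_ractor_set_current_ec comes from the wider Ractor change, not this file). The underlying mechanism is the pthread key API; a minimal self-contained sketch:

    #include <pthread.h>
    #include <stdio.h>

    static pthread_key_t current_thread_key;

    /* One-time key creation, as Init_native_thread does below;
     * creation can fail (e.g. with EAGAIN), hence the check. */
    static void
    tls_init(void)
    {
        if (pthread_key_create(&current_thread_key, NULL) != 0)
            fprintf(stderr, "pthread_key_create failed\n");
    }

    static int
    set_current(void *th)     /* like ruby_thread_set_native */
    {
        return pthread_setspecific(current_thread_key, th) == 0;
    }

    static void *
    get_current(void)         /* like ruby_thread_from_native; NULL if never set */
    {
        return pthread_getspecific(current_thread_key);
    }
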
@@ -587,8 +587,14 @@ Init_native_thread(rb_thread_t *th)
if (r) condattr_monotonic = NULL;
}
#endif
- pthread_key_create(&ruby_native_thread_key, 0);
+ if (pthread_key_create(&ruby_native_thread_key, 0) == EAGAIN) {
+ rb_bug("pthread_key_create failed (ruby_native_thread_key)");
+ }
+ if (pthread_key_create(&ruby_current_ec_key, 0) == EAGAIN) {
+ rb_bug("pthread_key_create failed (ruby_current_ec_key)");
+ }
th->thread_id = pthread_self();
+ ruby_thread_set_native(th);
fill_thread_id_str(th);
native_thread_init(th);
posix_signal(SIGVTALRM, null_func);
@@ -605,7 +611,6 @@ native_thread_init(rb_thread_t *th)
rb_native_cond_initialize(&nd->cond.gvlq);
if (&nd->cond.gvlq != &nd->cond.intr)
rb_native_cond_initialize(&nd->cond.intr);
- ruby_thread_set_native(th);
}
#ifndef USE_THREAD_CACHE
@@ -1116,7 +1121,7 @@ native_thread_create(rb_thread_t *th)
# endif
CHECK_ERR(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED));
- err = pthread_create(&th->thread_id, &attr, thread_start_func_1, th);
+ err = pthread_create(&th->thread_id, &attr, thread_start_func_1, th);
thread_debug("create: %p (%d)\n", (void *)th, err);
/* should be done in the created thread */
fill_thread_id_str(th);
@@ -1207,7 +1212,7 @@ native_cond_sleep(rb_thread_t *th, rb_hrtime_t *rel)
}
end = native_cond_timeout(cond, *rel);
- native_cond_timedwait(cond, lock, &end);
+ native_cond_timedwait(cond, lock, &end);
}
}
th->unblock.func = 0;
@@ -1277,7 +1282,7 @@ static void
ubf_select(void *ptr)
{
rb_thread_t *th = (rb_thread_t *)ptr;
- rb_vm_t *vm = th->vm;
+ rb_global_vm_lock_t *gvl = rb_ractor_gvl(th->ractor);
const rb_thread_t *cur = ruby_thread_from_native(); /* may be 0 */
register_ubf_list(th);
@@ -1292,17 +1297,17 @@ ubf_select(void *ptr)
* sigwait_th thread, otherwise we can deadlock with a thread
* in unblock_function_clear.
*/
- if (cur != vm->gvl.timer && cur != sigwait_th) {
+ if (cur != gvl->timer && cur != sigwait_th) {
/*
* Double-checked locking above was to prevent nested locking
* by the SAME thread. We use trylock here to prevent deadlocks
* between DIFFERENT threads
*/
- if (native_mutex_trylock(&vm->gvl.lock) == 0) {
- if (!vm->gvl.timer) {
+ if (rb_native_mutex_trylock(&gvl->lock) == 0) {
+ if (!gvl->timer) {
rb_thread_wakeup_timer_thread(-1);
}
- rb_native_mutex_unlock(&vm->gvl.lock);
+ rb_native_mutex_unlock(&gvl->lock);
}
}
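
ubf_select can run on an arbitrary thread while the target may already hold gvl->lock, so it must never block on that lock; the renamed rb_native_mutex_trylock keeps the pattern intact: try the lock, do the wakeup only on success, and skip silently otherwise. The idiom in isolation (a hedged sketch, not Ruby's API):

    #include <pthread.h>

    /* Best-effort nudge: never blocks, because the caller may be
     * interrupting a thread that already holds the lock (the deadlock
     * the comment above warns about). */
    static void
    try_nudge(pthread_mutex_t *lock, pthread_cond_t *cond, const int *needs_wakeup)
    {
        if (pthread_mutex_trylock(lock) == 0) {
            if (*needs_wakeup)
                pthread_cond_signal(cond);
            pthread_mutex_unlock(lock);
        }
        /* on EBUSY: another thread holds the lock and will handle the wakeup */
    }
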
@@ -1471,7 +1476,7 @@ rb_thread_wakeup_timer_thread(int sig)
* on heap for maximum safety (and startup/shutdown speed)
*/
if (!vm) return;
- mth = vm->main_thread;
+ mth = vm->ractor.main_thread;
if (!mth || system_working <= 0) return;
/* this relies on GC for grace period before cont_free */
@@ -2063,12 +2068,12 @@ ubf_ppoll_sleep(void *ignore)
*/
#define GVL_UNLOCK_BEGIN_YIELD(th) do { \
const native_thread_data_t *next; \
- rb_vm_t *vm = th->vm; \
+ rb_global_vm_lock_t *gvl = rb_ractor_gvl(th->ractor); \
RB_GC_SAVE_MACHINE_CONTEXT(th); \
- rb_native_mutex_lock(&vm->gvl.lock); \
- next = gvl_release_common(vm); \
- rb_native_mutex_unlock(&vm->gvl.lock); \
- if (!next && vm_living_thread_num(vm) > 1) { \
+ rb_native_mutex_lock(&gvl->lock); \
+ next = gvl_release_common(gvl); \
+ rb_native_mutex_unlock(&gvl->lock); \
+ if (!next && rb_ractor_living_thread_num(th->ractor) > 1) { \
native_thread_yield(); \
}
@@ -2117,6 +2122,7 @@ static void
native_sleep(rb_thread_t *th, rb_hrtime_t *rel)
{
int sigwait_fd = rb_sigwait_fd_get(th);
+ rb_ractor_blocking_threads_inc(th->ractor, __FILE__, __LINE__);
if (sigwait_fd >= 0) {
rb_native_mutex_lock(&th->interrupt_lock);
@@ -2136,12 +2142,14 @@ native_sleep(rb_thread_t *th, rb_hrtime_t *rel)
rb_sigwait_fd_put(th, sigwait_fd);
rb_sigwait_fd_migrate(th->vm);
}
- else if (th == th->vm->main_thread) { /* always able to handle signals */
+ else if (th == th->vm->ractor.main_thread) { /* always able to handle signals */
native_ppoll_sleep(th, rel);
}
else {
native_cond_sleep(th, rel);
}
+
+ rb_ractor_blocking_threads_dec(th->ractor, __FILE__, __LINE__);
}
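
The new inc/dec pair brackets the entire sleep so the Ractor can count how many of its threads are currently blocked (rb_ractor_blocking_threads_inc/dec are Ractor APIs introduced outside this file; the __FILE__/__LINE__ arguments are for debugging). The bracketing pattern itself, with a C11 atomic counter standing in for the per-Ractor bookkeeping:

    #include <stdatomic.h>

    static atomic_int blocking_threads;  /* stand-in for the per-ractor counter */

    static void
    counted_sleep(void (*do_sleep)(void *), void *arg)
    {
        atomic_fetch_add_explicit(&blocking_threads, 1, memory_order_relaxed);
        do_sleep(arg);   /* any of the sigwait/ppoll/cond paths above */
        atomic_fetch_sub_explicit(&blocking_threads, 1, memory_order_relaxed);
    }
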
#if UBF_TIMER == UBF_TIMER_PTHREAD
@@ -2149,7 +2157,7 @@ static void *
timer_pthread_fn(void *p)
{
rb_vm_t *vm = p;
- pthread_t main_thread_id = vm->main_thread->thread_id;
+ pthread_t main_thread_id = vm->ractor.main_thread->thread_id;
struct pollfd pfd;
int timeout = -1;
int ccp;