summaryrefslogtreecommitdiff
path: root/ractor.c
diff options
context:
space:
mode:
authorKoichi Sasada <ko1@atdot.net>2023-04-10 10:53:13 +0900
committerKoichi Sasada <ko1@atdot.net>2023-10-12 14:47:01 +0900
commitbe1bbd5b7d40ad863ab35097765d3754726bbd54 (patch)
tree2995a0859bea1d6b2903dcd324f41869dbef14a1 /ractor.c
parent096ee0648e215915a3019c2cd68ba220d94eca12 (diff)
M:N thread scheduler for Ractors
This patch introduce M:N thread scheduler for Ractor system. In general, M:N thread scheduler employs N native threads (OS threads) to manage M user-level threads (Ruby threads in this case). On the Ruby interpreter, 1 native thread is provided for 1 Ractor and all Ruby threads are managed by the native thread. From Ruby 1.9, the interpreter uses 1:1 thread scheduler which means 1 Ruby thread has 1 native thread. M:N scheduler change this strategy. Because of compatibility issue (and stableness issue of the implementation) main Ractor doesn't use M:N scheduler on default. On the other words, threads on the main Ractor will be managed with 1:1 thread scheduler. There are additional settings by environment variables: `RUBY_MN_THREADS=1` enables M:N thread scheduler on the main ractor. Note that non-main ractors use the M:N scheduler without this configuration. With this configuration, single ractor applications run threads on M:1 thread scheduler (green threads, user-level threads). `RUBY_MAX_CPU=n` specifies maximum number of native threads for M:N scheduler (default: 8). This patch will be reverted soon if non-easy issues are found. [Bug #19842]
Diffstat (limited to 'ractor.c')
-rw-r--r--ractor.c170
1 files changed, 117 insertions, 53 deletions
diff --git a/ractor.c b/ractor.c
index e0919ac56b..b4787e7a01 100644
--- a/ractor.c
+++ b/ractor.c
@@ -112,18 +112,16 @@ ractor_unlock_self(rb_ractor_t *cr, const char *file, int line)
#define RACTOR_LOCK_SELF(r) ractor_lock_self(r, __FILE__, __LINE__)
#define RACTOR_UNLOCK_SELF(r) ractor_unlock_self(r, __FILE__, __LINE__)
-static void
-ractor_cond_wait(rb_ractor_t *r)
+void
+rb_ractor_lock_self(rb_ractor_t *r)
{
-#if RACTOR_CHECK_MODE > 0
- VALUE locked_by = r->sync.locked_by;
- r->sync.locked_by = Qnil;
-#endif
- rb_native_cond_wait(&r->sync.cond, &r->sync.lock);
+ RACTOR_LOCK_SELF(r);
+}
-#if RACTOR_CHECK_MODE > 0
- r->sync.locked_by = locked_by;
-#endif
+void
+rb_ractor_unlock_self(rb_ractor_t *r)
+{
+ RACTOR_UNLOCK_SELF(r);
}
// Ractor status
@@ -243,7 +241,9 @@ ractor_free(void *ptr)
rb_ractor_t *r = (rb_ractor_t *)ptr;
RUBY_DEBUG_LOG("free r:%d", rb_ractor_id(r));
rb_native_mutex_destroy(&r->sync.lock);
+#ifdef RUBY_THREAD_WIN32_H
rb_native_cond_destroy(&r->sync.cond);
+#endif
ractor_queue_free(&r->sync.recv_queue);
ractor_queue_free(&r->sync.takers_queue);
ractor_local_storage_free(r);
@@ -531,6 +531,19 @@ ractor_sleeping_by(const rb_ractor_t *r, enum rb_ractor_wait_status wait_status)
return (r->sync.wait.status & wait_status) && r->sync.wait.wakeup_status == wakeup_none;
}
+#ifdef RUBY_THREAD_PTHREAD_H
+// thread_*.c
+void rb_ractor_sched_wakeup(rb_ractor_t *r);
+#else
+
+static void
+rb_ractor_sched_wakeup(rb_ractor_t *r)
+{
+ rb_native_cond_broadcast(&r->sync.cond);
+}
+#endif
+
+
static bool
ractor_wakeup(rb_ractor_t *r, enum rb_ractor_wait_status wait_status, enum rb_ractor_wakeup_status wakeup_status)
{
@@ -544,7 +557,7 @@ ractor_wakeup(rb_ractor_t *r, enum rb_ractor_wait_status wait_status, enum rb_ra
if (ractor_sleeping_by(r, wait_status)) {
r->sync.wait.wakeup_status = wakeup_status;
- rb_native_cond_broadcast(&r->sync.cond);
+ rb_ractor_sched_wakeup(r);
return true;
}
else {
@@ -552,6 +565,73 @@ ractor_wakeup(rb_ractor_t *r, enum rb_ractor_wait_status wait_status, enum rb_ra
}
}
+static void
+ractor_sleep_interrupt(void *ptr)
+{
+ rb_ractor_t *r = ptr;
+
+ RACTOR_LOCK(r);
+ {
+ ractor_wakeup(r, wait_receiving | wait_taking | wait_yielding, wakeup_by_interrupt);
+ }
+ RACTOR_UNLOCK(r);
+}
+
+typedef void (*ractor_sleep_cleanup_function)(rb_ractor_t *cr, void *p);
+
+static void
+ractor_check_ints(rb_execution_context_t *ec, rb_ractor_t *cr, ractor_sleep_cleanup_function cf_func, void *cf_data)
+{
+ if (cr->sync.wait.status != wait_none) {
+ enum rb_ractor_wait_status prev_wait_status = cr->sync.wait.status;
+ cr->sync.wait.status = wait_none;
+ cr->sync.wait.wakeup_status = wakeup_by_interrupt;
+
+ RACTOR_UNLOCK(cr);
+ {
+ if (cf_func) {
+ int state;
+ EC_PUSH_TAG(ec);
+ if ((state = EC_EXEC_TAG()) == TAG_NONE) {
+ rb_thread_check_ints();
+ }
+ EC_POP_TAG();
+
+ if (state) {
+ (*cf_func)(cr, cf_data);
+ EC_JUMP_TAG(ec, state);
+ }
+ }
+ else {
+ rb_thread_check_ints();
+ }
+ }
+
+ // reachable?
+ RACTOR_LOCK(cr);
+ cr->sync.wait.status = prev_wait_status;
+ }
+}
+
+#ifdef RUBY_THREAD_PTHREAD_H
+void rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf);
+#else
+
+// win32
+static void
+ractor_cond_wait(rb_ractor_t *r)
+{
+#if RACTOR_CHECK_MODE > 0
+ VALUE locked_by = r->sync.locked_by;
+ r->sync.locked_by = Qnil;
+#endif
+ rb_native_cond_wait(&r->sync.cond, &r->sync.lock);
+
+#if RACTOR_CHECK_MODE > 0
+ r->sync.locked_by = locked_by;
+#endif
+}
+
static void *
ractor_sleep_wo_gvl(void *ptr)
{
@@ -569,18 +649,17 @@ ractor_sleep_wo_gvl(void *ptr)
}
static void
-ractor_sleep_interrupt(void *ptr)
+rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf)
{
- rb_ractor_t *r = ptr;
-
- RACTOR_LOCK(r);
+ RACTOR_UNLOCK(cr);
{
- ractor_wakeup(r, wait_receiving | wait_taking | wait_yielding, wakeup_by_interrupt);
+ rb_nogvl(ractor_sleep_wo_gvl, cr,
+ ubf, cr,
+ RB_NOGVL_UBF_ASYNC_SAFE | RB_NOGVL_INTR_FAIL);
}
- RACTOR_UNLOCK(r);
+ RACTOR_LOCK(cr);
}
-
-typedef void (*ractor_sleep_cleanup_function)(rb_ractor_t *cr, void *p);
+#endif
static enum rb_ractor_wakeup_status
ractor_sleep_with_cleanup(rb_execution_context_t *ec, rb_ractor_t *cr, enum rb_ractor_wait_status wait_status,
@@ -600,40 +679,12 @@ ractor_sleep_with_cleanup(rb_execution_context_t *ec, rb_ractor_t *cr, enum rb_r
RUBY_DEBUG_LOG("sleep by %s", wait_status_str(wait_status));
- RACTOR_UNLOCK(cr);
- {
- rb_nogvl(ractor_sleep_wo_gvl, cr,
- ractor_sleep_interrupt, cr,
- RB_NOGVL_UBF_ASYNC_SAFE | RB_NOGVL_INTR_FAIL);
+ while (cr->sync.wait.wakeup_status == wakeup_none) {
+ rb_ractor_sched_sleep(ec, cr, ractor_sleep_interrupt);
+ ractor_check_ints(ec, cr, cf_func, cf_data);
}
- RACTOR_LOCK(cr);
-
- // rb_nogvl() can be canceled by interrupts
- if (cr->sync.wait.status != wait_none) {
- cr->sync.wait.status = wait_none;
- cr->sync.wait.wakeup_status = wakeup_by_interrupt;
-
- RACTOR_UNLOCK(cr);
- {
- if (cf_func) {
- int state;
- EC_PUSH_TAG(ec);
- if ((state = EC_EXEC_TAG()) == TAG_NONE) {
- rb_thread_check_ints();
- }
- EC_POP_TAG();
- if (state) {
- (*cf_func)(cr, cf_data);
- EC_JUMP_TAG(ec, state);
- }
- }
- else {
- rb_thread_check_ints();
- }
- }
- RACTOR_LOCK(cr); // reachable?
- }
+ cr->sync.wait.status = wait_none;
// TODO: multi-thread
wakeup_status = cr->sync.wait.wakeup_status;
@@ -1943,7 +1994,7 @@ rb_ractor_atfork(rb_vm_t *vm, rb_thread_t *th)
}
#endif
-void rb_thread_sched_init(struct rb_thread_sched *);
+void rb_thread_sched_init(struct rb_thread_sched *, bool atfork);
void
rb_ractor_living_threads_init(rb_ractor_t *r)
@@ -1959,11 +2010,15 @@ ractor_init(rb_ractor_t *r, VALUE name, VALUE loc)
ractor_queue_setup(&r->sync.recv_queue);
ractor_queue_setup(&r->sync.takers_queue);
rb_native_mutex_initialize(&r->sync.lock);
+ rb_native_cond_initialize(&r->barrier_wait_cond);
+
+#ifdef RUBY_THREAD_WIN32_H
rb_native_cond_initialize(&r->sync.cond);
rb_native_cond_initialize(&r->barrier_wait_cond);
+#endif
// thread management
- rb_thread_sched_init(&r->threads.sched);
+ rb_thread_sched_init(&r->threads.sched, false);
rb_ractor_living_threads_init(r);
// naming
@@ -2218,6 +2273,8 @@ ractor_check_blocking(rb_ractor_t *cr, unsigned int remained_thread_cnt, const c
}
}
+void rb_threadptr_remove(rb_thread_t *th);
+
void
rb_ractor_living_threads_remove(rb_ractor_t *cr, rb_thread_t *th)
{
@@ -2225,6 +2282,8 @@ rb_ractor_living_threads_remove(rb_ractor_t *cr, rb_thread_t *th)
RUBY_DEBUG_LOG("r->threads.cnt:%d--", cr->threads.cnt);
ractor_check_blocking(cr, cr->threads.cnt - 1, __FILE__, __LINE__);
+ rb_threadptr_remove(th);
+
if (cr->threads.cnt == 1) {
vm_remove_ractor(th->vm, cr);
}
@@ -2327,6 +2386,9 @@ ractor_terminal_interrupt_all(rb_vm_t *vm)
}
}
+void rb_add_running_thread(rb_thread_t *th);
+void rb_del_running_thread(rb_thread_t *th);
+
void
rb_ractor_terminate_all(void)
{
@@ -2354,7 +2416,9 @@ rb_ractor_terminate_all(void)
// wait for 1sec
rb_vm_ractor_blocking_cnt_inc(vm, cr, __FILE__, __LINE__);
+ rb_del_running_thread(rb_ec_thread_ptr(cr->threads.running_ec));
rb_vm_cond_timedwait(vm, &vm->ractor.sync.terminate_cond, 1000 /* ms */);
+ rb_add_running_thread(rb_ec_thread_ptr(cr->threads.running_ec));
rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__);
ractor_terminal_interrupt_all(vm);