diff options
author | Koichi Sasada <ko1@atdot.net> | 2023-04-10 10:53:13 +0900 |
---|---|---|
committer | Koichi Sasada <ko1@atdot.net> | 2023-10-12 14:47:01 +0900 |
commit | be1bbd5b7d40ad863ab35097765d3754726bbd54 (patch) | |
tree | 2995a0859bea1d6b2903dcd324f41869dbef14a1 /ractor.c | |
parent | 096ee0648e215915a3019c2cd68ba220d94eca12 (diff) |
M:N thread scheduler for Ractors
This patch introduce M:N thread scheduler for Ractor system.
In general, M:N thread scheduler employs N native threads (OS threads)
to manage M user-level threads (Ruby threads in this case).
On the Ruby interpreter, 1 native thread is provided for 1 Ractor
and all Ruby threads are managed by the native thread.
From Ruby 1.9, the interpreter uses 1:1 thread scheduler which means
1 Ruby thread has 1 native thread. M:N scheduler change this strategy.
Because of compatibility issue (and stableness issue of the implementation)
main Ractor doesn't use M:N scheduler on default. On the other words,
threads on the main Ractor will be managed with 1:1 thread scheduler.
There are additional settings by environment variables:
`RUBY_MN_THREADS=1` enables M:N thread scheduler on the main ractor.
Note that non-main ractors use the M:N scheduler without this
configuration. With this configuration, single ractor applications
run threads on M:1 thread scheduler (green threads, user-level threads).
`RUBY_MAX_CPU=n` specifies maximum number of native threads for
M:N scheduler (default: 8).
This patch will be reverted soon if non-easy issues are found.
[Bug #19842]
Diffstat (limited to 'ractor.c')
-rw-r--r-- | ractor.c | 170 |
1 files changed, 117 insertions, 53 deletions
@@ -112,18 +112,16 @@ ractor_unlock_self(rb_ractor_t *cr, const char *file, int line) #define RACTOR_LOCK_SELF(r) ractor_lock_self(r, __FILE__, __LINE__) #define RACTOR_UNLOCK_SELF(r) ractor_unlock_self(r, __FILE__, __LINE__) -static void -ractor_cond_wait(rb_ractor_t *r) +void +rb_ractor_lock_self(rb_ractor_t *r) { -#if RACTOR_CHECK_MODE > 0 - VALUE locked_by = r->sync.locked_by; - r->sync.locked_by = Qnil; -#endif - rb_native_cond_wait(&r->sync.cond, &r->sync.lock); + RACTOR_LOCK_SELF(r); +} -#if RACTOR_CHECK_MODE > 0 - r->sync.locked_by = locked_by; -#endif +void +rb_ractor_unlock_self(rb_ractor_t *r) +{ + RACTOR_UNLOCK_SELF(r); } // Ractor status @@ -243,7 +241,9 @@ ractor_free(void *ptr) rb_ractor_t *r = (rb_ractor_t *)ptr; RUBY_DEBUG_LOG("free r:%d", rb_ractor_id(r)); rb_native_mutex_destroy(&r->sync.lock); +#ifdef RUBY_THREAD_WIN32_H rb_native_cond_destroy(&r->sync.cond); +#endif ractor_queue_free(&r->sync.recv_queue); ractor_queue_free(&r->sync.takers_queue); ractor_local_storage_free(r); @@ -531,6 +531,19 @@ ractor_sleeping_by(const rb_ractor_t *r, enum rb_ractor_wait_status wait_status) return (r->sync.wait.status & wait_status) && r->sync.wait.wakeup_status == wakeup_none; } +#ifdef RUBY_THREAD_PTHREAD_H +// thread_*.c +void rb_ractor_sched_wakeup(rb_ractor_t *r); +#else + +static void +rb_ractor_sched_wakeup(rb_ractor_t *r) +{ + rb_native_cond_broadcast(&r->sync.cond); +} +#endif + + static bool ractor_wakeup(rb_ractor_t *r, enum rb_ractor_wait_status wait_status, enum rb_ractor_wakeup_status wakeup_status) { @@ -544,7 +557,7 @@ ractor_wakeup(rb_ractor_t *r, enum rb_ractor_wait_status wait_status, enum rb_ra if (ractor_sleeping_by(r, wait_status)) { r->sync.wait.wakeup_status = wakeup_status; - rb_native_cond_broadcast(&r->sync.cond); + rb_ractor_sched_wakeup(r); return true; } else { @@ -552,6 +565,73 @@ ractor_wakeup(rb_ractor_t *r, enum rb_ractor_wait_status wait_status, enum rb_ra } } +static void +ractor_sleep_interrupt(void *ptr) +{ + rb_ractor_t *r = ptr; + + RACTOR_LOCK(r); + { + ractor_wakeup(r, wait_receiving | wait_taking | wait_yielding, wakeup_by_interrupt); + } + RACTOR_UNLOCK(r); +} + +typedef void (*ractor_sleep_cleanup_function)(rb_ractor_t *cr, void *p); + +static void +ractor_check_ints(rb_execution_context_t *ec, rb_ractor_t *cr, ractor_sleep_cleanup_function cf_func, void *cf_data) +{ + if (cr->sync.wait.status != wait_none) { + enum rb_ractor_wait_status prev_wait_status = cr->sync.wait.status; + cr->sync.wait.status = wait_none; + cr->sync.wait.wakeup_status = wakeup_by_interrupt; + + RACTOR_UNLOCK(cr); + { + if (cf_func) { + int state; + EC_PUSH_TAG(ec); + if ((state = EC_EXEC_TAG()) == TAG_NONE) { + rb_thread_check_ints(); + } + EC_POP_TAG(); + + if (state) { + (*cf_func)(cr, cf_data); + EC_JUMP_TAG(ec, state); + } + } + else { + rb_thread_check_ints(); + } + } + + // reachable? + RACTOR_LOCK(cr); + cr->sync.wait.status = prev_wait_status; + } +} + +#ifdef RUBY_THREAD_PTHREAD_H +void rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf); +#else + +// win32 +static void +ractor_cond_wait(rb_ractor_t *r) +{ +#if RACTOR_CHECK_MODE > 0 + VALUE locked_by = r->sync.locked_by; + r->sync.locked_by = Qnil; +#endif + rb_native_cond_wait(&r->sync.cond, &r->sync.lock); + +#if RACTOR_CHECK_MODE > 0 + r->sync.locked_by = locked_by; +#endif +} + static void * ractor_sleep_wo_gvl(void *ptr) { @@ -569,18 +649,17 @@ ractor_sleep_wo_gvl(void *ptr) } static void -ractor_sleep_interrupt(void *ptr) +rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf) { - rb_ractor_t *r = ptr; - - RACTOR_LOCK(r); + RACTOR_UNLOCK(cr); { - ractor_wakeup(r, wait_receiving | wait_taking | wait_yielding, wakeup_by_interrupt); + rb_nogvl(ractor_sleep_wo_gvl, cr, + ubf, cr, + RB_NOGVL_UBF_ASYNC_SAFE | RB_NOGVL_INTR_FAIL); } - RACTOR_UNLOCK(r); + RACTOR_LOCK(cr); } - -typedef void (*ractor_sleep_cleanup_function)(rb_ractor_t *cr, void *p); +#endif static enum rb_ractor_wakeup_status ractor_sleep_with_cleanup(rb_execution_context_t *ec, rb_ractor_t *cr, enum rb_ractor_wait_status wait_status, @@ -600,40 +679,12 @@ ractor_sleep_with_cleanup(rb_execution_context_t *ec, rb_ractor_t *cr, enum rb_r RUBY_DEBUG_LOG("sleep by %s", wait_status_str(wait_status)); - RACTOR_UNLOCK(cr); - { - rb_nogvl(ractor_sleep_wo_gvl, cr, - ractor_sleep_interrupt, cr, - RB_NOGVL_UBF_ASYNC_SAFE | RB_NOGVL_INTR_FAIL); + while (cr->sync.wait.wakeup_status == wakeup_none) { + rb_ractor_sched_sleep(ec, cr, ractor_sleep_interrupt); + ractor_check_ints(ec, cr, cf_func, cf_data); } - RACTOR_LOCK(cr); - - // rb_nogvl() can be canceled by interrupts - if (cr->sync.wait.status != wait_none) { - cr->sync.wait.status = wait_none; - cr->sync.wait.wakeup_status = wakeup_by_interrupt; - - RACTOR_UNLOCK(cr); - { - if (cf_func) { - int state; - EC_PUSH_TAG(ec); - if ((state = EC_EXEC_TAG()) == TAG_NONE) { - rb_thread_check_ints(); - } - EC_POP_TAG(); - if (state) { - (*cf_func)(cr, cf_data); - EC_JUMP_TAG(ec, state); - } - } - else { - rb_thread_check_ints(); - } - } - RACTOR_LOCK(cr); // reachable? - } + cr->sync.wait.status = wait_none; // TODO: multi-thread wakeup_status = cr->sync.wait.wakeup_status; @@ -1943,7 +1994,7 @@ rb_ractor_atfork(rb_vm_t *vm, rb_thread_t *th) } #endif -void rb_thread_sched_init(struct rb_thread_sched *); +void rb_thread_sched_init(struct rb_thread_sched *, bool atfork); void rb_ractor_living_threads_init(rb_ractor_t *r) @@ -1959,11 +2010,15 @@ ractor_init(rb_ractor_t *r, VALUE name, VALUE loc) ractor_queue_setup(&r->sync.recv_queue); ractor_queue_setup(&r->sync.takers_queue); rb_native_mutex_initialize(&r->sync.lock); + rb_native_cond_initialize(&r->barrier_wait_cond); + +#ifdef RUBY_THREAD_WIN32_H rb_native_cond_initialize(&r->sync.cond); rb_native_cond_initialize(&r->barrier_wait_cond); +#endif // thread management - rb_thread_sched_init(&r->threads.sched); + rb_thread_sched_init(&r->threads.sched, false); rb_ractor_living_threads_init(r); // naming @@ -2218,6 +2273,8 @@ ractor_check_blocking(rb_ractor_t *cr, unsigned int remained_thread_cnt, const c } } +void rb_threadptr_remove(rb_thread_t *th); + void rb_ractor_living_threads_remove(rb_ractor_t *cr, rb_thread_t *th) { @@ -2225,6 +2282,8 @@ rb_ractor_living_threads_remove(rb_ractor_t *cr, rb_thread_t *th) RUBY_DEBUG_LOG("r->threads.cnt:%d--", cr->threads.cnt); ractor_check_blocking(cr, cr->threads.cnt - 1, __FILE__, __LINE__); + rb_threadptr_remove(th); + if (cr->threads.cnt == 1) { vm_remove_ractor(th->vm, cr); } @@ -2327,6 +2386,9 @@ ractor_terminal_interrupt_all(rb_vm_t *vm) } } +void rb_add_running_thread(rb_thread_t *th); +void rb_del_running_thread(rb_thread_t *th); + void rb_ractor_terminate_all(void) { @@ -2354,7 +2416,9 @@ rb_ractor_terminate_all(void) // wait for 1sec rb_vm_ractor_blocking_cnt_inc(vm, cr, __FILE__, __LINE__); + rb_del_running_thread(rb_ec_thread_ptr(cr->threads.running_ec)); rb_vm_cond_timedwait(vm, &vm->ractor.sync.terminate_cond, 1000 /* ms */); + rb_add_running_thread(rb_ec_thread_ptr(cr->threads.running_ec)); rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__); ractor_terminal_interrupt_all(vm); |