diff options
Diffstat (limited to 'ractor.c')
| -rw-r--r-- | ractor.c | 2292 |
1 files changed, 1089 insertions, 1203 deletions
@@ -3,41 +3,51 @@ #include "ruby/ruby.h" #include "ruby/thread.h" #include "ruby/ractor.h" +#include "ruby/re.h" #include "ruby/thread_native.h" #include "vm_core.h" #include "vm_sync.h" #include "ractor_core.h" #include "internal/complex.h" #include "internal/error.h" +#include "internal/gc.h" #include "internal/hash.h" +#include "internal/object.h" +#include "internal/ractor.h" #include "internal/rational.h" #include "internal/struct.h" +#include "internal/st.h" +#include "internal/thread.h" #include "variable.h" -#include "gc.h" -#include "transient_heap.h" +#include "yjit.h" +#include "zjit.h" VALUE rb_cRactor; +static VALUE rb_cRactorSelector; + +VALUE rb_eRactorUnsafeError; +VALUE rb_eRactorIsolationError; static VALUE rb_eRactorError; static VALUE rb_eRactorRemoteError; static VALUE rb_eRactorMovedError; static VALUE rb_eRactorClosedError; static VALUE rb_cRactorMovedObject; -VALUE rb_eRactorUnsafeError; - -VALUE -rb_ractor_error_class(void) -{ - return rb_eRactorError; -} static void vm_ractor_blocking_cnt_inc(rb_vm_t *vm, rb_ractor_t *r, const char *file, int line); + +#if RACTOR_CHECK_MODE > 0 +bool rb_ractor_ignore_belonging_flag = false; +#endif + +// Ractor locking + static void ASSERT_ractor_unlocking(rb_ractor_t *r) { #if RACTOR_CHECK_MODE > 0 - // GET_EC is NULL in an MJIT worker - if (GET_EC() != NULL && r->sync.locked_by == GET_RACTOR()->self) { + const rb_execution_context_t *ec = rb_current_ec_noinline(); + if (ec != NULL && r->sync.locked_by == rb_ractor_self(rb_ec_ractor_ptr(ec))) { rb_bug("recursive ractor locking"); } #endif @@ -47,8 +57,8 @@ static void ASSERT_ractor_locking(rb_ractor_t *r) { #if RACTOR_CHECK_MODE > 0 - // GET_EC is NULL in an MJIT worker - if (GET_EC() != NULL && r->sync.locked_by != GET_RACTOR()->self) { + const rb_execution_context_t *ec = rb_current_ec_noinline(); + if (ec != NULL && r->sync.locked_by != rb_ractor_self(rb_ec_ractor_ptr(ec))) { rp(r->sync.locked_by); rb_bug("ractor lock is not acquired."); } @@ -58,25 +68,35 @@ ASSERT_ractor_locking(rb_ractor_t *r) static void ractor_lock(rb_ractor_t *r, const char *file, int line) { - RUBY_DEBUG_LOG2(file, line, "locking r:%u%s", r->id, GET_RACTOR() == r ? " (self)" : ""); + RUBY_DEBUG_LOG2(file, line, "locking r:%u%s", r->pub.id, rb_current_ractor_raw(false) == r ? " (self)" : ""); ASSERT_ractor_unlocking(r); rb_native_mutex_lock(&r->sync.lock); + const rb_execution_context_t *ec = rb_current_ec_noinline(); + if (ec) { + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); + VM_ASSERT(!cr->malloc_gc_disabled); + cr->malloc_gc_disabled = true; + } + #if RACTOR_CHECK_MODE > 0 - if (GET_EC() != NULL) { // GET_EC is NULL in an MJIT worker - r->sync.locked_by = GET_RACTOR()->self; + if (ec != NULL) { + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); + r->sync.locked_by = rb_ractor_self(cr); } #endif - RUBY_DEBUG_LOG2(file, line, "locked r:%u%s", r->id, GET_RACTOR() == r ? " (self)" : ""); + RUBY_DEBUG_LOG2(file, line, "locked r:%u%s", r->pub.id, rb_current_ractor_raw(false) == r ? " (self)" : ""); } static void ractor_lock_self(rb_ractor_t *cr, const char *file, int line) { - VM_ASSERT(cr == GET_RACTOR()); - VM_ASSERT(cr->sync.locked_by != cr->self); + VM_ASSERT(cr == rb_ec_ractor_ptr(rb_current_ec_noinline())); +#if RACTOR_CHECK_MODE > 0 + VM_ASSERT(cr->sync.locked_by != cr->pub.self); +#endif ractor_lock(cr, file, line); } @@ -87,16 +107,26 @@ ractor_unlock(rb_ractor_t *r, const char *file, int line) #if RACTOR_CHECK_MODE > 0 r->sync.locked_by = Qnil; #endif + + const rb_execution_context_t *ec = rb_current_ec_noinline(); + if (ec) { + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); + VM_ASSERT(cr->malloc_gc_disabled); + cr->malloc_gc_disabled = false; + } + rb_native_mutex_unlock(&r->sync.lock); - RUBY_DEBUG_LOG2(file, line, "r:%u%s", r->id, GET_RACTOR() == r ? " (self)" : ""); + RUBY_DEBUG_LOG2(file, line, "r:%u%s", r->pub.id, rb_current_ractor_raw(false) == r ? " (self)" : ""); } static void ractor_unlock_self(rb_ractor_t *cr, const char *file, int line) { - VM_ASSERT(cr == GET_RACTOR()); - VM_ASSERT(cr->sync.locked_by == cr->self); + VM_ASSERT(cr == rb_ec_ractor_ptr(rb_current_ec_noinline())); +#if RACTOR_CHECK_MODE > 0 + VM_ASSERT(cr->sync.locked_by == cr->pub.self); +#endif ractor_unlock(cr, file, line); } @@ -105,20 +135,20 @@ ractor_unlock_self(rb_ractor_t *cr, const char *file, int line) #define RACTOR_LOCK_SELF(r) ractor_lock_self(r, __FILE__, __LINE__) #define RACTOR_UNLOCK_SELF(r) ractor_unlock_self(r, __FILE__, __LINE__) -static void -ractor_cond_wait(rb_ractor_t *r) +void +rb_ractor_lock_self(rb_ractor_t *r) { -#if RACTOR_CHECK_MODE > 0 - VALUE locked_by = r->sync.locked_by; - r->sync.locked_by = Qnil; -#endif - rb_native_cond_wait(&r->sync.cond, &r->sync.lock); + RACTOR_LOCK_SELF(r); +} -#if RACTOR_CHECK_MODE > 0 - r->sync.locked_by = locked_by; -#endif +void +rb_ractor_unlock_self(rb_ractor_t *r) +{ + RACTOR_UNLOCK_SELF(r); } +// Ractor status + static const char * ractor_status_str(enum ractor_status status) { @@ -134,7 +164,7 @@ ractor_status_str(enum ractor_status status) static void ractor_status_set(rb_ractor_t *r, enum ractor_status status) { - RUBY_DEBUG_LOG("r:%u [%s]->[%s]", r->id, ractor_status_str(r->status_), ractor_status_str(status)); + RUBY_DEBUG_LOG("r:%u [%s]->[%s]", r->pub.id, ractor_status_str(r->status_), ractor_status_str(status)); // check 1 if (r->status_ != ractor_created) { @@ -155,7 +185,7 @@ ractor_status_set(rb_ractor_t *r, enum ractor_status status) VM_ASSERT(status == ractor_running); break; case ractor_terminated: - VM_ASSERT(0); // unreachable + rb_bug("unreachable"); break; } @@ -168,80 +198,110 @@ ractor_status_p(rb_ractor_t *r, enum ractor_status status) return rb_ractor_status_p(r, status); } -static void -ractor_queue_mark(struct rb_ractor_queue *rq) -{ - for (int i=0; i<rq->cnt; i++) { - int idx = (rq->start + i) % rq->size; - rb_gc_mark(rq->baskets[idx].v); - rb_gc_mark(rq->baskets[idx].sender); - } -} +// Ractor data/mark/free static void ractor_local_storage_mark(rb_ractor_t *r); static void ractor_local_storage_free(rb_ractor_t *r); +static void ractor_sync_mark(rb_ractor_t *r); +static void ractor_sync_free(rb_ractor_t *r); +static size_t ractor_sync_memsize(const rb_ractor_t *r); +static void ractor_sync_init(rb_ractor_t *r); + +static int +mark_targeted_hook_list(st_data_t key, st_data_t value, st_data_t _arg) +{ + rb_hook_list_t *hook_list = (rb_hook_list_t*)value; + + if (hook_list->type == hook_list_type_targeted_iseq) { + rb_gc_mark((VALUE)key); + } + else { + rb_method_definition_t *def = (rb_method_definition_t*)key; + RUBY_ASSERT(hook_list->type == hook_list_type_targeted_def); + rb_gc_mark(def->body.bmethod.proc); + } + rb_hook_list_mark(hook_list); + + return ST_CONTINUE; +} + static void ractor_mark(void *ptr) { rb_ractor_t *r = (rb_ractor_t *)ptr; + bool checking_shareable = rb_gc_checking_shareable(); - ractor_queue_mark(&r->sync.incoming_queue); - rb_gc_mark(r->sync.wait.taken_basket.v); - rb_gc_mark(r->sync.wait.taken_basket.sender); - rb_gc_mark(r->sync.wait.yielded_basket.v); - rb_gc_mark(r->sync.wait.yielded_basket.sender); rb_gc_mark(r->loc); rb_gc_mark(r->name); - rb_gc_mark(r->r_stdin); - rb_gc_mark(r->r_stdout); - rb_gc_mark(r->r_stderr); - - if (r->threads.cnt > 0) { - rb_thread_t *th = 0; - list_for_each(&r->threads.set, th, lt_node) { - VM_ASSERT(th != NULL); - rb_gc_mark(th->self); + + if (!checking_shareable) { + // may unshareable objects + rb_gc_mark(r->r_stdin); + rb_gc_mark(r->r_stdout); + rb_gc_mark(r->r_stderr); + rb_gc_mark(r->verbose); + rb_gc_mark(r->debug); + + // mark received messages + ractor_sync_mark(r); + + rb_hook_list_mark(&r->pub.hooks); + if (r->pub.targeted_hooks.num_entries) { + st_foreach(&r->pub.targeted_hooks, mark_targeted_hook_list, 0); + } + + if (r->threads.cnt > 0) { + rb_thread_t *th = 0; + ccan_list_for_each(&r->threads.set, th, lt_node) { + VM_ASSERT(th != NULL); + rb_gc_mark(th->self); + } } - } - ractor_local_storage_mark(r); + ractor_local_storage_mark(r); + } } -static void -ractor_queue_free(struct rb_ractor_queue *rq) +static int +free_targeted_hook_lists(st_data_t key, st_data_t val, st_data_t _arg) { - free(rq->baskets); + rb_hook_list_t *hook_list = (rb_hook_list_t*)val; + rb_hook_list_free(hook_list); + return ST_DELETE; } static void -ractor_waiting_list_free(struct rb_ractor_waiting_list *wl) +free_targeted_hooks(st_table *hooks_tbl) { - free(wl->ractors); + st_foreach(hooks_tbl, free_targeted_hook_lists, 0); } static void ractor_free(void *ptr) { rb_ractor_t *r = (rb_ractor_t *)ptr; + RUBY_DEBUG_LOG("free r:%d", rb_ractor_id(r)); + free_targeted_hooks(&r->pub.targeted_hooks); rb_native_mutex_destroy(&r->sync.lock); - rb_native_cond_destroy(&r->sync.cond); - ractor_queue_free(&r->sync.incoming_queue); - ractor_waiting_list_free(&r->sync.taking_ractors); +#ifdef RUBY_THREAD_WIN32_H + rb_native_cond_destroy(&r->sync.wakeup_cond); +#endif ractor_local_storage_free(r); - ruby_xfree(r); -} + rb_hook_list_free(&r->pub.hooks); + rb_st_free_embedded_table(&r->pub.targeted_hooks); -static size_t -ractor_queue_memsize(const struct rb_ractor_queue *rq) -{ - return sizeof(struct rb_ractor_basket) * rq->size; -} + if (r->newobj_cache) { + RUBY_ASSERT(r == ruby_single_main_ractor); -static size_t -ractor_waiting_list_memsize(const struct rb_ractor_waiting_list *wl) -{ - return sizeof(rb_ractor_t *) * wl->size; + rb_gc_ractor_cache_free(r->newobj_cache); + r->newobj_cache = NULL; + } + + ractor_sync_free(r); + if (!r->main_ractor) { + SIZED_FREE(r); + } } static size_t @@ -249,17 +309,15 @@ ractor_memsize(const void *ptr) { rb_ractor_t *r = (rb_ractor_t *)ptr; - // TODO - return sizeof(rb_ractor_t) + - ractor_queue_memsize(&r->sync.incoming_queue) + - ractor_waiting_list_memsize(&r->sync.taking_ractors); + // TODO: more correct? + return sizeof(rb_ractor_t) + ractor_sync_memsize(r); } static const rb_data_type_t ractor_data_type = { "ractor", { ractor_mark, - ractor_free, + ractor_free, ractor_memsize, NULL, // update }, @@ -281,828 +339,27 @@ static inline rb_ractor_t * RACTOR_PTR(VALUE self) { VM_ASSERT(rb_ractor_p(self)); - rb_ractor_t *r = DATA_PTR(self); - // TODO: check return r; } -uint32_t -rb_ractor_id(const rb_ractor_t *g) -{ - return g->id; -} - -static uint32_t ractor_last_id; +#define MAIN_RACTOR_ID 1 +static rb_atomic_t ractor_last_id = MAIN_RACTOR_ID; #if RACTOR_CHECK_MODE > 0 -MJIT_FUNC_EXPORTED uint32_t +uint32_t rb_ractor_current_id(void) { if (GET_THREAD()->ractor == NULL) { return 1; // main ractor } else { - return GET_RACTOR()->id; + return rb_ractor_id(GET_RACTOR()); } } #endif -static void -ractor_queue_setup(struct rb_ractor_queue *rq) -{ - rq->size = 2; - rq->cnt = 0; - rq->start = 0; - rq->baskets = malloc(sizeof(struct rb_ractor_basket) * rq->size); -} - -static bool -ractor_queue_empty_p(rb_ractor_t *r, struct rb_ractor_queue *rq) -{ - ASSERT_ractor_locking(r); - return rq->cnt == 0; -} - -static bool -ractor_queue_deq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket) -{ - bool b; - - RACTOR_LOCK(r); - { - if (!ractor_queue_empty_p(r, rq)) { - *basket = rq->baskets[rq->start]; - rq->cnt--; - rq->start = (rq->start + 1) % rq->size; - b = true; - } - else { - b = false; - } - } - RACTOR_UNLOCK(r); - - return b; -} - -static void -ractor_queue_enq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket) -{ - ASSERT_ractor_locking(r); - - if (rq->size <= rq->cnt) { - rq->baskets = realloc(rq->baskets, sizeof(struct rb_ractor_basket) * rq->size * 2); - for (int i=rq->size - rq->start; i<rq->cnt; i++) { - rq->baskets[i + rq->start] = rq->baskets[i + rq->start - rq->size]; - } - rq->size *= 2; - } - rq->baskets[(rq->start + rq->cnt++) % rq->size] = *basket; - // fprintf(stderr, "%s %p->cnt:%d\n", __func__, rq, rq->cnt); -} - -static void -ractor_basket_clear(struct rb_ractor_basket *b) -{ - b->type = basket_type_none; - b->v = Qfalse; - b->sender = Qfalse; -} - -static VALUE ractor_reset_belonging(VALUE obj); // in this file - -static VALUE -ractor_basket_accept(struct rb_ractor_basket *b) -{ - VALUE v; - - switch (b->type) { - case basket_type_ref: - VM_ASSERT(rb_ractor_shareable_p(b->v)); - v = b->v; - break; - case basket_type_copy: - case basket_type_move: - case basket_type_will: - v = ractor_reset_belonging(b->v); - break; - default: - rb_bug("unreachable"); - } - - if (b->exception) { - VALUE cause = v; - VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor."); - rb_ivar_set(err, rb_intern("@ractor"), b->sender); - ractor_basket_clear(b); - rb_ec_setup_exception(NULL, err, cause); - rb_exc_raise(err); - } - - ractor_basket_clear(b); - return v; -} - -static VALUE -ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *r) -{ - struct rb_ractor_queue *rq = &r->sync.incoming_queue; - struct rb_ractor_basket basket; - - if (ractor_queue_deq(r, rq, &basket) == false) { - if (r->sync.incoming_port_closed) { - rb_raise(rb_eRactorClosedError, "The incoming port is already closed"); - } - else { - return Qundef; - } - } - - return ractor_basket_accept(&basket); -} - -static void * -ractor_sleep_wo_gvl(void *ptr) -{ - rb_ractor_t *cr = ptr; - RACTOR_LOCK_SELF(cr); - VM_ASSERT(cr->sync.wait.status != wait_none); - if (cr->sync.wait.wakeup_status == wakeup_none) { - ractor_cond_wait(cr); - } - cr->sync.wait.status = wait_none; - RACTOR_UNLOCK_SELF(cr); - return NULL; -} - -static void -ractor_sleep_interrupt(void *ptr) -{ - rb_ractor_t *r = ptr; - - RACTOR_LOCK(r); - if (r->sync.wait.wakeup_status == wakeup_none) { - r->sync.wait.wakeup_status = wakeup_by_interrupt; - rb_native_cond_signal(&r->sync.cond); - } - RACTOR_UNLOCK(r); -} - -#if USE_RUBY_DEBUG_LOG -static const char * -wait_status_str(enum ractor_wait_status wait_status) -{ - switch ((int)wait_status) { - case wait_none: return "none"; - case wait_receiving: return "receiving"; - case wait_taking: return "taking"; - case wait_yielding: return "yielding"; - case wait_receiving|wait_taking: return "receiving|taking"; - case wait_receiving|wait_yielding: return "receiving|yielding"; - case wait_taking|wait_yielding: return "taking|yielding"; - case wait_receiving|wait_taking|wait_yielding: return "receiving|taking|yielding"; - } - rb_bug("unrechable"); -} - -static const char * -wakeup_status_str(enum ractor_wakeup_status wakeup_status) -{ - switch (wakeup_status) { - case wakeup_none: return "none"; - case wakeup_by_send: return "by_send"; - case wakeup_by_yield: return "by_yield"; - case wakeup_by_take: return "by_take"; - case wakeup_by_close: return "by_close"; - case wakeup_by_interrupt: return "by_interrupt"; - case wakeup_by_retry: return "by_retry"; - } - rb_bug("unrechable"); -} -#endif // USE_RUBY_DEBUG_LOG - -static void -ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr) -{ - VM_ASSERT(GET_RACTOR() == cr); - VM_ASSERT(cr->sync.wait.status != wait_none); - // fprintf(stderr, "%s r:%p status:%s, wakeup_status:%s\n", __func__, cr, - // wait_status_str(cr->sync.wait.status), wakeup_status_str(cr->sync.wait.wakeup_status)); - - RACTOR_UNLOCK(cr); - rb_nogvl(ractor_sleep_wo_gvl, cr, - ractor_sleep_interrupt, cr, - RB_NOGVL_UBF_ASYNC_SAFE); - RACTOR_LOCK(cr); -} - -static bool -ractor_sleeping_by(const rb_ractor_t *r, enum ractor_wait_status wait_status) -{ - return (r->sync.wait.status & wait_status) && r->sync.wait.wakeup_status == wakeup_none; -} - -static bool -ractor_wakeup(rb_ractor_t *r, enum ractor_wait_status wait_status, enum ractor_wakeup_status wakeup_status) -{ - ASSERT_ractor_locking(r); - - // fprintf(stderr, "%s r:%p status:%s/%s wakeup_status:%s/%s\n", __func__, r, - // wait_status_str(r->sync.wait.status), wait_status_str(wait_status), - // wakeup_status_str(r->sync.wait.wakeup_status), wakeup_status_str(wakeup_status)); - - if (ractor_sleeping_by(r, wait_status)) { - r->sync.wait.wakeup_status = wakeup_status; - rb_native_cond_signal(&r->sync.cond); - return true; - } - else { - return false; - } -} - -static void -ractor_register_taking(rb_ractor_t *r, rb_ractor_t *cr) -{ - VM_ASSERT(cr == GET_RACTOR()); - bool retry_try = false; - - RACTOR_LOCK(r); - { - if (ractor_sleeping_by(r, wait_yielding)) { - // already waiting for yielding. retry try_take. - retry_try = true; - } - else { - // insert cr into taking list - struct rb_ractor_waiting_list *wl = &r->sync.taking_ractors; - - for (int i=0; i<wl->cnt; i++) { - if (wl->ractors[i] == cr) { - // TODO: make it clean code. - rb_native_mutex_unlock(&r->sync.lock); - rb_raise(rb_eRuntimeError, "Already another thread of same ractor is waiting."); - } - } - - if (wl->size == 0) { - wl->size = 1; - wl->ractors = malloc(sizeof(rb_ractor_t *) * wl->size); - if (wl->ractors == NULL) rb_bug("can't allocate buffer"); - } - else if (wl->size <= wl->cnt + 1) { - wl->size *= 2; - wl->ractors = realloc(wl->ractors, sizeof(rb_ractor_t *) * wl->size); - if (wl->ractors == NULL) rb_bug("can't re-allocate buffer"); - } - wl->ractors[wl->cnt++] = cr; - } - } - RACTOR_UNLOCK(r); - - if (retry_try) { - RACTOR_LOCK(cr); - { - if (cr->sync.wait.wakeup_status == wakeup_none) { - VM_ASSERT(cr->sync.wait.status != wait_none); - - cr->sync.wait.wakeup_status = wakeup_by_retry; - cr->sync.wait.status = wait_none; - } - } - RACTOR_UNLOCK(cr); - } -} - -static void -ractor_waiting_list_del(rb_ractor_t *r, struct rb_ractor_waiting_list *wl, rb_ractor_t *wr) -{ - RACTOR_LOCK(r); - { - int pos = -1; - for (int i=0; i<wl->cnt; i++) { - if (wl->ractors[i] == wr) { - pos = i; - break; - } - } - if (pos >= 0) { // found - wl->cnt--; - for (int i=pos; i<wl->cnt; i++) { - wl->ractors[i] = wl->ractors[i+1]; - } - } - } - RACTOR_UNLOCK(r); -} - -static rb_ractor_t * -ractor_waiting_list_shift(rb_ractor_t *r, struct rb_ractor_waiting_list *wl) -{ - ASSERT_ractor_locking(r); - VM_ASSERT(&r->sync.taking_ractors == wl); - - if (wl->cnt > 0) { - rb_ractor_t *tr = wl->ractors[0]; - for (int i=1; i<wl->cnt; i++) { - wl->ractors[i-1] = wl->ractors[i]; - } - wl->cnt--; - return tr; - } - else { - return NULL; - } -} - -static VALUE -ractor_receive(rb_execution_context_t *ec, rb_ractor_t *r) -{ - VM_ASSERT(r == rb_ec_ractor_ptr(ec)); - VALUE v; - - while ((v = ractor_try_receive(ec, r)) == Qundef) { - RACTOR_LOCK(r); - { - if (ractor_queue_empty_p(r, &r->sync.incoming_queue)) { - VM_ASSERT(r->sync.wait.status == wait_none); - r->sync.wait.status = wait_receiving; - r->sync.wait.wakeup_status = wakeup_none; - - ractor_sleep(ec, r); - - r->sync.wait.wakeup_status = wakeup_none; - } - } - RACTOR_UNLOCK(r); - } - - return v; -} - -static void -ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_basket *b) -{ - bool closed = false; - struct rb_ractor_queue *rq = &r->sync.incoming_queue; - - RACTOR_LOCK(r); - { - if (r->sync.incoming_port_closed) { - closed = true; - } - else { - ractor_queue_enq(r, rq, b); - if (ractor_wakeup(r, wait_receiving, wakeup_by_send)) { - RUBY_DEBUG_LOG("wakeup", 0); - } - } - } - RACTOR_UNLOCK(r); - - if (closed) { - rb_raise(rb_eRactorClosedError, "The incoming-port is already closed"); - } -} - -static VALUE ractor_move(VALUE obj); // in this file -static VALUE ractor_copy(VALUE obj); // in this file - -static void -ractor_basket_setup(rb_execution_context_t *ec, struct rb_ractor_basket *basket, VALUE obj, VALUE move, bool exc, bool is_will) -{ - basket->sender = rb_ec_ractor_ptr(ec)->self; - basket->exception = exc; - - if (is_will) { - basket->type = basket_type_will; - basket->v = obj; - } - else if (rb_ractor_shareable_p(obj)) { - basket->type = basket_type_ref; - basket->v = obj; - } - else if (!RTEST(move)) { - basket->v = ractor_copy(obj); - basket->type = basket_type_copy; - } - else { - basket->type = basket_type_move; - basket->v = ractor_move(obj); - } -} - -static VALUE -ractor_send(rb_execution_context_t *ec, rb_ractor_t *r, VALUE obj, VALUE move) -{ - struct rb_ractor_basket basket; - ractor_basket_setup(ec, &basket, obj, move, false, false); - ractor_send_basket(ec, r, &basket); - return r->self; -} - -static VALUE -ractor_try_take(rb_execution_context_t *ec, rb_ractor_t *r) -{ - struct rb_ractor_basket basket = { - .type = basket_type_none, - }; - bool closed = false; - - RACTOR_LOCK(r); - { - if (ractor_wakeup(r, wait_yielding, wakeup_by_take)) { - VM_ASSERT(r->sync.wait.yielded_basket.type != basket_type_none); - basket = r->sync.wait.yielded_basket; - ractor_basket_clear(&r->sync.wait.yielded_basket); - } - else if (r->sync.outgoing_port_closed) { - closed = true; - } - else { - // not reached. - } - } - RACTOR_UNLOCK(r); - - if (basket.type == basket_type_none) { - if (closed) { - rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); - } - else { - return Qundef; - } - } - else { - return ractor_basket_accept(&basket); - } -} - -static bool -ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_basket *basket) -{ - ASSERT_ractor_unlocking(cr); - VM_ASSERT(basket->type != basket_type_none); - - if (cr->sync.outgoing_port_closed) { - rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); - } - - rb_ractor_t *r; - - retry_shift: - RACTOR_LOCK(cr); - { - r = ractor_waiting_list_shift(cr, &cr->sync.taking_ractors); - } - RACTOR_UNLOCK(cr); - - if (r) { - bool retry_shift = false; - - RACTOR_LOCK(r); - { - if (ractor_wakeup(r, wait_taking, wakeup_by_yield)) { - VM_ASSERT(r->sync.wait.taken_basket.type == basket_type_none); - r->sync.wait.taken_basket = *basket; - } - else { - retry_shift = true; - } - } - RACTOR_UNLOCK(r); - - if (retry_shift) { - // get candidate take-waiting ractor, but already woke up by another reason. - // retry to check another ractor. - goto retry_shift; - } - else { - return true; - } - } - else { - return false; - } -} - -// select(r1, r2, r3, receive: true, yield: obj) -static VALUE -ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yielded_value, bool move, VALUE *ret_r) -{ - rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - VALUE crv = cr->self; - VALUE ret = Qundef; - int i; - bool interrupted = false; - enum ractor_wait_status wait_status = 0; - bool yield_p = (yielded_value != Qundef) ? true : false; - const int rs_len = alen; - - struct ractor_select_action { - enum ractor_select_action_type { - ractor_select_action_take, - ractor_select_action_receive, - ractor_select_action_yield, - } type; - VALUE v; - } *actions = ALLOCA_N(struct ractor_select_action, alen + (yield_p ? 1 : 0)); - - VM_ASSERT(cr->sync.wait.status == wait_none); - VM_ASSERT(cr->sync.wait.wakeup_status == wakeup_none); - VM_ASSERT(cr->sync.wait.taken_basket.type == basket_type_none); - VM_ASSERT(cr->sync.wait.yielded_basket.type == basket_type_none); - - // setup actions - for (i=0; i<alen; i++) { - VALUE v = rs[i]; - - if (v == crv) { - actions[i].type = ractor_select_action_receive; - actions[i].v = Qnil; - wait_status |= wait_receiving; - } - else if (rb_ractor_p(v)) { - actions[i].type = ractor_select_action_take; - actions[i].v = v; - wait_status |= wait_taking; - } - else { - rb_raise(rb_eArgError, "should be a ractor object, but %"PRIsVALUE, v); - } - } - rs = NULL; - - restart: - - if (yield_p) { - actions[rs_len].type = ractor_select_action_yield; - actions[rs_len].v = Qundef; - wait_status |= wait_yielding; - alen++; - - ractor_basket_setup(ec, &cr->sync.wait.yielded_basket, yielded_value, move, false, false); - } - - // TODO: shuffle actions - - while (1) { - RUBY_DEBUG_LOG("try actions (%s)", wait_status_str(wait_status)); - - for (i=0; i<alen; i++) { - VALUE v, rv; - switch (actions[i].type) { - case ractor_select_action_take: - rv = actions[i].v; - v = ractor_try_take(ec, RACTOR_PTR(rv)); - if (v != Qundef) { - *ret_r = rv; - ret = v; - goto cleanup; - } - break; - case ractor_select_action_receive: - v = ractor_try_receive(ec, cr); - if (v != Qundef) { - *ret_r = ID2SYM(rb_intern("receive")); - ret = v; - goto cleanup; - } - break; - case ractor_select_action_yield: - { - if (ractor_try_yield(ec, cr, &cr->sync.wait.yielded_basket)) { - *ret_r = ID2SYM(rb_intern("yield")); - ret = Qnil; - goto cleanup; - } - } - break; - } - } - - RUBY_DEBUG_LOG("wait actions (%s)", wait_status_str(wait_status)); - - RACTOR_LOCK(cr); - { - VM_ASSERT(cr->sync.wait.status == wait_none); - cr->sync.wait.status = wait_status; - cr->sync.wait.wakeup_status == wakeup_none; - } - RACTOR_UNLOCK(cr); - - // prepare waiting - for (i=0; i<alen; i++) { - rb_ractor_t *r; - switch (actions[i].type) { - case ractor_select_action_take: - r = RACTOR_PTR(actions[i].v); - ractor_register_taking(r, cr); - break; - case ractor_select_action_yield: - case ractor_select_action_receive: - break; - } - } - - // wait - RACTOR_LOCK(cr); - { - if (cr->sync.wait.wakeup_status == wakeup_none) { - for (i=0; i<alen; i++) { - rb_ractor_t *r; - - switch (actions[i].type) { - case ractor_select_action_take: - r = RACTOR_PTR(actions[i].v); - if (ractor_sleeping_by(r, wait_yielding)) { - RUBY_DEBUG_LOG("wakeup_none, but r:%u is waiting for yielding", r->id); - cr->sync.wait.wakeup_status = wakeup_by_retry; - goto skip_sleep; - } - break; - case ractor_select_action_receive: - if (cr->sync.incoming_queue.cnt > 0) { - RUBY_DEBUG_LOG("wakeup_none, but incoming_queue has %u messages", cr->sync.incoming_queue.cnt); - cr->sync.wait.wakeup_status = wakeup_by_retry; - goto skip_sleep; - } - break; - case ractor_select_action_yield: - if (cr->sync.taking_ractors.cnt > 0) { - RUBY_DEBUG_LOG("wakeup_none, but %u taking_ractors are waiting", cr->sync.taking_ractors.cnt); - cr->sync.wait.wakeup_status = wakeup_by_retry; - goto skip_sleep; - } - else if (cr->sync.outgoing_port_closed) { - cr->sync.wait.wakeup_status = wakeup_by_close; - goto skip_sleep; - } - break; - } - } - - RUBY_DEBUG_LOG("sleep %s", wait_status_str(cr->sync.wait.status)); - ractor_sleep(ec, cr); - RUBY_DEBUG_LOG("awaken %s", wakeup_status_str(cr->sync.wait.wakeup_status)); - } - else { - skip_sleep: - RUBY_DEBUG_LOG("no need to sleep %s->%s", - wait_status_str(cr->sync.wait.status), - wakeup_status_str(cr->sync.wait.wakeup_status)); - cr->sync.wait.status = wait_none; - } - } - RACTOR_UNLOCK(cr); - - // cleanup waiting - for (i=0; i<alen; i++) { - rb_ractor_t *r; - switch (actions[i].type) { - case ractor_select_action_take: - r = RACTOR_PTR(actions[i].v); - ractor_waiting_list_del(r, &r->sync.taking_ractors, cr); - break; - case ractor_select_action_receive: - case ractor_select_action_yield: - break; - } - } - - // check results - enum ractor_wakeup_status wakeup_status = cr->sync.wait.wakeup_status; - cr->sync.wait.wakeup_status = wakeup_none; - - switch (wakeup_status) { - case wakeup_none: - // OK. something happens. - // retry loop. - break; - case wakeup_by_retry: - // Retry request. - break; - case wakeup_by_send: - // OK. - // retry loop and try_receive will succss. - break; - case wakeup_by_yield: - // take was succeeded! - // cr.wait.taken_basket contains passed block - VM_ASSERT(cr->sync.wait.taken_basket.type != basket_type_none); - *ret_r = cr->sync.wait.taken_basket.sender; - VM_ASSERT(rb_ractor_p(*ret_r)); - ret = ractor_basket_accept(&cr->sync.wait.taken_basket); - goto cleanup; - case wakeup_by_take: - *ret_r = ID2SYM(rb_intern("yield")); - ret = Qnil; - goto cleanup; - case wakeup_by_close: - // OK. - // retry loop and will get CloseError. - break; - case wakeup_by_interrupt: - ret = Qundef; - interrupted = true; - goto cleanup; - } - } - - cleanup: - RUBY_DEBUG_LOG("cleanup actions (%s)", wait_status_str(wait_status)); - - if (cr->sync.wait.yielded_basket.type != basket_type_none) { - ractor_basket_clear(&cr->sync.wait.yielded_basket); - } - - VM_ASSERT(cr->sync.wait.status == wait_none); - VM_ASSERT(cr->sync.wait.wakeup_status == wakeup_none); - VM_ASSERT(cr->sync.wait.taken_basket.type == basket_type_none); - VM_ASSERT(cr->sync.wait.yielded_basket.type == basket_type_none); - - if (interrupted) { - rb_vm_check_ints_blocking(ec); - interrupted = false; - goto restart; - } - - VM_ASSERT(ret != Qundef); - return ret; -} - -static VALUE -ractor_yield(rb_execution_context_t *ec, rb_ractor_t *r, VALUE obj, VALUE move) -{ - VALUE ret_r; - ractor_select(ec, NULL, 0, obj, RTEST(move) ? true : false, &ret_r); - return Qnil; -} - -static VALUE -ractor_take(rb_execution_context_t *ec, rb_ractor_t *r) -{ - VALUE ret_r; - VALUE v = ractor_select(ec, &r->self, 1, Qundef, false, &ret_r); - return v; -} - -static VALUE -ractor_close_incoming(rb_execution_context_t *ec, rb_ractor_t *r) -{ - VALUE prev; - - RACTOR_LOCK(r); - { - if (!r->sync.incoming_port_closed) { - prev = Qfalse; - r->sync.incoming_port_closed = true; - if (ractor_wakeup(r, wait_receiving, wakeup_by_close)) { - VM_ASSERT(r->sync.incoming_queue.cnt == 0); - RUBY_DEBUG_LOG("cancel receiving", 0); - } - } - else { - prev = Qtrue; - } - } - RACTOR_UNLOCK(r); - return prev; -} - -static VALUE -ractor_close_outgoing(rb_execution_context_t *ec, rb_ractor_t *r) -{ - VALUE prev; - - RACTOR_LOCK(r); - { - if (!r->sync.outgoing_port_closed) { - prev = Qfalse; - r->sync.outgoing_port_closed = true; - } - else { - prev = Qtrue; - } - - // wakeup all taking ractors - rb_ractor_t *taking_ractor; - while ((taking_ractor = ractor_waiting_list_shift(r, &r->sync.taking_ractors)) != NULL) { - RACTOR_LOCK(taking_ractor); - ractor_wakeup(taking_ractor, wait_taking, wakeup_by_close); - RACTOR_UNLOCK(taking_ractor); - } - - // raising yielding Ractor - if (!r->yield_atexit && - ractor_wakeup(r, wait_yielding, wakeup_by_close)) { - RUBY_DEBUG_LOG("cancel yielding", 0); - } - } - RACTOR_UNLOCK(r); - return prev; -} +#include "ractor_sync.c" // creation/termination @@ -1111,11 +368,7 @@ ractor_next_id(void) { uint32_t id; - RB_VM_LOCK(); - { - id = ++ractor_last_id; - } - RB_VM_UNLOCK(); + id = (uint32_t)(RUBY_ATOMIC_FETCH_ADD(ractor_last_id, 1) + 1); return id; } @@ -1123,28 +376,28 @@ ractor_next_id(void) static void vm_insert_ractor0(rb_vm_t *vm, rb_ractor_t *r, bool single_ractor_mode) { - RUBY_DEBUG_LOG("r:%u ractor.cnt:%u++", r->id, vm->ractor.cnt); + RUBY_DEBUG_LOG("r:%u ractor.cnt:%u++", r->pub.id, vm->ractor.cnt); VM_ASSERT(single_ractor_mode || RB_VM_LOCKED_P()); - list_add_tail(&vm->ractor.set, &r->vmlr_node); + ccan_list_add_tail(&vm->ractor.set, &r->vmlr_node); vm->ractor.cnt++; + + if (r->newobj_cache) { + VM_ASSERT(r == ruby_single_main_ractor); + } + else { + r->newobj_cache = rb_gc_ractor_cache_alloc(r); + } } static void cancel_single_ractor_mode(void) { // enable multi-ractor mode - RUBY_DEBUG_LOG("enable multi-ractor mode", 0); - - rb_gc_start(); - rb_transient_heap_evacuate(); - - if (rb_warning_category_enabled_p(RB_WARN_CATEGORY_EXPERIMENTAL)) { - rb_warn("Ractor is experimental, and the behavior may change in future versions of Ruby! " - "Also there are many implementation issues."); - } + RUBY_DEBUG_LOG("enable multi-ractor mode"); ruby_single_main_ractor = NULL; + rb_funcall(rb_cRactor, rb_intern("_activated"), 0); } static void @@ -1188,13 +441,16 @@ vm_remove_ractor(rb_vm_t *vm, rb_ractor_t *cr) vm->ractor.cnt, vm->ractor.sync.terminate_waiting); VM_ASSERT(vm->ractor.cnt > 0); - list_del(&cr->vmlr_node); + ccan_list_del(&cr->vmlr_node); if (vm->ractor.cnt <= 2 && vm->ractor.sync.terminate_waiting) { rb_native_cond_signal(&vm->ractor.sync.terminate_cond); } vm->ractor.cnt--; + rb_gc_ractor_cache_free(cr->newobj_cache); + cr->newobj_cache = NULL; + ractor_status_set(cr, ractor_terminated); } RB_VM_UNLOCK(); @@ -1206,35 +462,66 @@ ractor_alloc(VALUE klass) rb_ractor_t *r; VALUE rv = TypedData_Make_Struct(klass, rb_ractor_t, &ractor_data_type, r); FL_SET_RAW(rv, RUBY_FL_SHAREABLE); - r->self = rv; + r->pub.self = rv; + r->next_ec_serial = 1; VM_ASSERT(ractor_status_p(r, ractor_created)); return rv; } +static rb_ractor_t _main_ractor = { + .loc = Qnil, + .name = Qnil, + .pub.id = MAIN_RACTOR_ID, + .pub.self = Qnil, + .next_ec_serial = 1, + .main_ractor = true, +}; + rb_ractor_t * rb_ractor_main_alloc(void) { - rb_ractor_t *r = ruby_mimmalloc(sizeof(rb_ractor_t)); - if (r == NULL) { - fprintf(stderr, "[FATAL] failed to allocate memory for main ractor\n"); - exit(EXIT_FAILURE); - } - MEMZERO(r, rb_ractor_t, 1); - r->id = ++ractor_last_id; - r->loc = Qnil; - r->name = Qnil; - r->self = Qnil; + rb_ractor_t *r = &_main_ractor; + r->newobj_cache = rb_gc_ractor_cache_alloc(r); ruby_single_main_ractor = r; return r; } -void rb_gvl_init(rb_global_vm_lock_t *gvl); +#if defined(HAVE_WORKING_FORK) +// Set up the main Ractor for the VM after fork. +// Puts us in "single Ractor mode" +void +rb_ractor_atfork(rb_vm_t *vm, rb_thread_t *th) +{ + // initialize as a main ractor + vm->ractor.cnt = 0; + vm->ractor.blocking_cnt = 0; + ruby_single_main_ractor = th->ractor; + th->ractor->status_ = ractor_created; + + rb_ractor_living_threads_init(th->ractor); + rb_ractor_living_threads_insert(th->ractor, th); + + VM_ASSERT(vm->ractor.blocking_cnt == 0); + VM_ASSERT(vm->ractor.cnt == 1); +} + +void +rb_ractor_terminate_atfork(rb_vm_t *vm, rb_ractor_t *r) +{ + rb_gc_ractor_cache_free(r->newobj_cache); + r->newobj_cache = NULL; + r->status_ = ractor_terminated; + ractor_sync_terminate_atfork(vm, r); +} +#endif + +void rb_thread_sched_init(struct rb_thread_sched *, bool atfork); void rb_ractor_living_threads_init(rb_ractor_t *r) { - list_head_init(&r->threads.set); + ccan_list_head_init(&r->threads.set); r->threads.cnt = 0; r->threads.blocking_cnt = 0; } @@ -1242,13 +529,12 @@ rb_ractor_living_threads_init(rb_ractor_t *r) static void ractor_init(rb_ractor_t *r, VALUE name, VALUE loc) { - ractor_queue_setup(&r->sync.incoming_queue); - rb_native_mutex_initialize(&r->sync.lock); - rb_native_cond_initialize(&r->sync.cond); - rb_native_cond_initialize(&r->barrier_wait_cond); + ractor_sync_init(r); + st_init_existing_numtable_with_size(&r->pub.targeted_hooks, 0); + r->pub.hooks.type = hook_list_type_ractor_local; // thread management - rb_gvl_init(&r->threads.gvl); + rb_thread_sched_init(&r->threads.sched, false); rb_ractor_living_threads_init(r); // naming @@ -1260,26 +546,25 @@ ractor_init(rb_ractor_t *r, VALUE name, VALUE loc) rb_raise(rb_eArgError, "ASCII incompatible encoding (%s)", rb_enc_name(enc)); } - name = rb_str_new_frozen(name); + name = RB_OBJ_SET_SHAREABLE(rb_str_new_frozen(name)); } - r->name = name; + + if (!SPECIAL_CONST_P(loc)) RB_OBJ_SET_SHAREABLE(loc); r->loc = loc; + r->name = name; } void rb_ractor_main_setup(rb_vm_t *vm, rb_ractor_t *r, rb_thread_t *th) { - r->self = TypedData_Wrap_Struct(rb_cRactor, &ractor_data_type, r); - FL_SET_RAW(r->self, RUBY_FL_SHAREABLE); + VALUE rv = r->pub.self = TypedData_Wrap_Struct(rb_cRactor, &ractor_data_type, r); + FL_SET_RAW(r->pub.self, RUBY_FL_SHAREABLE); ractor_init(r, Qnil, Qnil); r->threads.main = th; rb_ractor_living_threads_insert(r, th); -} -// io.c -VALUE rb_io_prep_stdin(void); -VALUE rb_io_prep_stdout(void); -VALUE rb_io_prep_stderr(void); + RB_GC_GUARD(rv); +} static VALUE ractor_create(rb_execution_context_t *ec, VALUE self, VALUE loc, VALUE name, VALUE args, VALUE block) @@ -1288,99 +573,67 @@ ractor_create(rb_execution_context_t *ec, VALUE self, VALUE loc, VALUE name, VAL rb_ractor_t *r = RACTOR_PTR(rv); ractor_init(r, name, loc); - // can block here - r->id = ractor_next_id(); - RUBY_DEBUG_LOG("r:%u", r->id); - - r->r_stdin = rb_io_prep_stdin(); - r->r_stdout = rb_io_prep_stdout(); - r->r_stderr = rb_io_prep_stderr(); + r->pub.id = ractor_next_id(); + RUBY_DEBUG_LOG("r:%u", r->pub.id); rb_ractor_t *cr = rb_ec_ractor_ptr(ec); r->verbose = cr->verbose; r->debug = cr->debug; + rb_yjit_before_ractor_spawn(); + rb_zjit_before_ractor_spawn(); rb_thread_create_ractor(r, args, block); RB_GC_GUARD(rv); return rv; } -static void -ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool exc) +#if 0 +static VALUE +ractor_create_func(VALUE klass, VALUE loc, VALUE name, VALUE args, rb_block_call_func_t func) { - if (cr->sync.outgoing_port_closed) { - return; - } - - ASSERT_ractor_unlocking(cr); - - struct rb_ractor_basket basket; - ractor_basket_setup(ec, &basket, v, Qfalse, exc, true); - - retry: - if (ractor_try_yield(ec, cr, &basket)) { - // OK. - } - else { - bool retry = false; - RACTOR_LOCK(cr); - { - if (cr->sync.taking_ractors.cnt == 0) { - cr->sync.wait.yielded_basket = basket; - - VM_ASSERT(cr->sync.wait.status == wait_none); - cr->sync.wait.status = wait_yielding; - cr->sync.wait.wakeup_status = wakeup_none; - - VM_ASSERT(cr->yield_atexit == false); - cr->yield_atexit = true; - } - else { - retry = true; // another ractor is waiting for the yield. - } - } - RACTOR_UNLOCK(cr); + VALUE block = rb_proc_new(func, Qnil); + return ractor_create(rb_current_ec_noinline(), klass, loc, name, args, block); +} +#endif - if (retry) goto retry; - } +static void +ractor_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE result, bool exc) +{ + ractor_notify_exit(ec, cr, result, exc); } void -rb_ractor_teardown(rb_execution_context_t *ec) +rb_ractor_atexit(rb_execution_context_t *ec, VALUE result) { rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - ractor_close_incoming(ec, cr); - ractor_close_outgoing(ec, cr); - - // sync with rb_ractor_terminate_interrupt_main_thread() - RB_VM_LOCK_ENTER(); - { - VM_ASSERT(cr->threads.main != NULL); - cr->threads.main = NULL; - } - RB_VM_LOCK_LEAVE(); + ractor_atexit(ec, cr, result, false); } void -rb_ractor_atexit(rb_execution_context_t *ec, VALUE result) +rb_ractor_atexit_exception(rb_execution_context_t *ec) { rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - ractor_yield_atexit(ec, cr, result, false); + ractor_atexit(ec, cr, ec->errinfo, true); } void -rb_ractor_atexit_exception(rb_execution_context_t *ec) +rb_ractor_teardown(rb_execution_context_t *ec) { rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - ractor_yield_atexit(ec, cr, ec->errinfo, true); + + // sync with rb_ractor_terminate_interrupt_main_thread() + RB_VM_LOCKING() { + VM_ASSERT(cr->threads.main != NULL); + cr->threads.main = NULL; + } } void rb_ractor_receive_parameters(rb_execution_context_t *ec, rb_ractor_t *r, int len, VALUE *ptr) { for (int i=0; i<len; i++) { - ptr[i] = ractor_receive(ec, r); + ptr[i] = ractor_receive(ec, ractor_default_port(r)); } } @@ -1389,17 +642,11 @@ rb_ractor_send_parameters(rb_execution_context_t *ec, rb_ractor_t *r, VALUE args { int len = RARRAY_LENINT(args); for (int i=0; i<len; i++) { - ractor_send(ec, r, RARRAY_AREF(args, i), false); + ractor_send(ec, ractor_default_port(r), RARRAY_AREF(args, i), false); } } -VALUE -rb_ractor_self(const rb_ractor_t *r) -{ - return r->self; -} - -MJIT_FUNC_EXPORTED bool +bool rb_ractor_main_p_(void) { VM_ASSERT(rb_multi_ractor_p()); @@ -1407,34 +654,21 @@ rb_ractor_main_p_(void) return rb_ec_ractor_ptr(ec) == rb_ec_vm_ptr(ec)->ractor.main_ractor; } -bool -rb_obj_is_main_ractor(VALUE gv) -{ - if (!rb_ractor_p(gv)) return false; - rb_ractor_t *r = DATA_PTR(gv); - return r == GET_VM()->ractor.main_ractor; -} - -rb_global_vm_lock_t * -rb_ractor_gvl(rb_ractor_t *r) -{ - return &r->threads.gvl; -} - int rb_ractor_living_thread_num(const rb_ractor_t *r) { return r->threads.cnt; } +// only for current ractor VALUE -rb_ractor_thread_list(rb_ractor_t *r) +rb_ractor_thread_list(void) { - VALUE ary = rb_ary_new(); + rb_ractor_t *r = GET_RACTOR(); rb_thread_t *th = 0; + VALUE ary = rb_ary_new(); - RACTOR_LOCK(r); - list_for_each(&r->threads.set, th, lt_node) { + ccan_list_for_each(&r->threads.set, th, lt_node) { switch (th->status) { case THREAD_RUNNABLE: case THREAD_STOPPED: @@ -1444,7 +678,7 @@ rb_ractor_thread_list(rb_ractor_t *r) break; } } - RACTOR_UNLOCK(r); + return ary; } @@ -1455,8 +689,8 @@ rb_ractor_living_threads_insert(rb_ractor_t *r, rb_thread_t *th) RACTOR_LOCK(r); { - RUBY_DEBUG_LOG("r(%d)->threads.cnt:%d++", r->id, r->threads.cnt); - list_add_tail(&r->threads.set, &th->lt_node); + RUBY_DEBUG_LOG("r(%d)->threads.cnt:%d++", r->pub.id, r->threads.cnt); + ccan_list_add_tail(&r->threads.set, &th->lt_node); r->threads.cnt++; } RACTOR_UNLOCK(r); @@ -1516,16 +750,15 @@ ractor_check_blocking(rb_ractor_t *cr, unsigned int remained_thread_cnt, const c cr->threads.cnt == cr->threads.blocking_cnt + 1) { // change ractor status: running -> blocking rb_vm_t *vm = GET_VM(); - ASSERT_vm_unlocking(); - RB_VM_LOCK(); - { + RB_VM_LOCKING() { rb_vm_ractor_blocking_cnt_inc(vm, cr, file, line); } - RB_VM_UNLOCK(); } } +void rb_threadptr_remove(rb_thread_t *th); + void rb_ractor_living_threads_remove(rb_ractor_t *cr, rb_thread_t *th) { @@ -1533,13 +766,15 @@ rb_ractor_living_threads_remove(rb_ractor_t *cr, rb_thread_t *th) RUBY_DEBUG_LOG("r->threads.cnt:%d--", cr->threads.cnt); ractor_check_blocking(cr, cr->threads.cnt - 1, __FILE__, __LINE__); + rb_threadptr_remove(th); + if (cr->threads.cnt == 1) { vm_remove_ractor(th->vm, cr); } else { RACTOR_LOCK(cr); { - list_del(&th->lt_node); + ccan_list_del(&th->lt_node); cr->threads.cnt--; } RACTOR_UNLOCK(cr); @@ -1570,11 +805,9 @@ rb_ractor_blocking_threads_dec(rb_ractor_t *cr, const char *file, int line) if (cr->threads.cnt == cr->threads.blocking_cnt) { rb_vm_t *vm = GET_VM(); - RB_VM_LOCK_ENTER(); - { + RB_VM_LOCKING() { rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__); } - RB_VM_LOCK_LEAVE(); } cr->threads.blocking_cnt--; @@ -1613,7 +846,7 @@ rb_ractor_terminate_interrupt_main_thread(rb_ractor_t *r) rb_threadptr_interrupt(main_th); } else { - RUBY_DEBUG_LOG("killed (%p)", main_th); + RUBY_DEBUG_LOG("killed (%p)", (void *)main_th); } } } @@ -1626,27 +859,33 @@ ractor_terminal_interrupt_all(rb_vm_t *vm) if (vm->ractor.cnt > 1) { // send terminate notification to all ractors rb_ractor_t *r = 0; - list_for_each(&vm->ractor.set, r, vmlr_node) { + ccan_list_for_each(&vm->ractor.set, r, vmlr_node) { if (r != vm->ractor.main_ractor) { + RUBY_DEBUG_LOG("r:%d", rb_ractor_id(r)); rb_ractor_terminate_interrupt_main_thread(r); } } } } +void rb_add_running_thread(rb_thread_t *th); +void rb_del_running_thread(rb_thread_t *th); + void rb_ractor_terminate_all(void) { rb_vm_t *vm = GET_VM(); rb_ractor_t *cr = vm->ractor.main_ractor; + RUBY_DEBUG_LOG("ractor.cnt:%d", (int)vm->ractor.cnt); + VM_ASSERT(cr == GET_RACTOR()); // only main-ractor's main-thread should kick it. - if (vm->ractor.cnt > 1) { - RB_VM_LOCK(); + RB_VM_LOCK(); + { ractor_terminal_interrupt_all(vm); // kill all ractors - RB_VM_UNLOCK(); } + RB_VM_UNLOCK(); rb_thread_terminate_all(GET_THREAD()); // kill other threads in main-ractor and wait RB_VM_LOCK(); @@ -1657,7 +896,20 @@ rb_ractor_terminate_all(void) // wait for 1sec rb_vm_ractor_blocking_cnt_inc(vm, cr, __FILE__, __LINE__); + rb_del_running_thread(rb_ec_thread_ptr(cr->threads.running_ec)); rb_vm_cond_timedwait(vm, &vm->ractor.sync.terminate_cond, 1000 /* ms */); +#ifdef RUBY_THREAD_PTHREAD_H + while (vm->ractor.sched.barrier_waiting) { + // A barrier is waiting. Threads relinquish the VM lock before joining the barrier and + // since we just acquired the VM lock back, we're blocking other threads from joining it. + // We loop until the barrier is over. We can't join this barrier because our thread isn't added to + // running_threads until the call below to `rb_add_running_thread`. + RB_VM_UNLOCK(); + unsigned int lev; + RB_VM_LOCK_ENTER_LEV_NB(&lev); + } +#endif + rb_add_running_thread(rb_ec_thread_ptr(cr->threads.running_ec)); rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__); ractor_terminal_interrupt_all(vm); @@ -1669,7 +921,23 @@ rb_ractor_terminate_all(void) rb_execution_context_t * rb_vm_main_ractor_ec(rb_vm_t *vm) { - return vm->ractor.main_ractor->threads.running_ec; + /* This code needs to carefully work around two bugs: + * - Bug #20016: When M:N threading is enabled, running_ec is NULL if no thread is + * actually currently running (as opposed to without M:N threading, when + * running_ec will still point to the _last_ thread which ran) + * - Bug #20197: If the main thread is sleeping, setting its postponed job + * interrupt flag is pointless; it won't look at the flag until it stops sleeping + * for some reason. It would be better to set the flag on the running ec, which + * will presumably look at it soon. + * + * Solution: use running_ec if it's set, otherwise fall back to the main thread ec. + * This is still susceptible to some rare race conditions (what if the last thread + * to run just entered a long-running sleep?), but seems like the best balance of + * robustness and complexity. + */ + rb_execution_context_t *running_ec = vm->ractor.main_ractor->threads.running_ec; + if (running_ec) { return running_ec; } + return vm->ractor.main_thread->ec; } static VALUE @@ -1678,15 +946,110 @@ ractor_moved_missing(int argc, VALUE *argv, VALUE self) rb_raise(rb_eRactorMovedError, "can not send any methods to a moved object"); } +/* + * Document-class: Ractor::Error + * + * The parent class of Ractor-related error classes. + */ + +/* + * Document-class: Ractor::ClosedError + * + * Raised when an attempt is made to send a message to a closed port, + * or to retrieve a message from a closed and empty port. + * Ports may be closed explicitly with Ractor::Port#close + * and are closed implicitly when a Ractor terminates. + * + * port = Ractor::Port.new + * port.close + * port << "test" # Ractor::ClosedError + * port.receive # Ractor::ClosedError + * + * ClosedError is a descendant of StopIteration, so the closing of a port will break + * out of loops without propagating the error. + */ + +/* + * Document-class: Ractor::IsolationError + * + * Raised on attempt to make a Ractor-unshareable object + * Ractor-shareable. + */ + +/* + * Document-class: Ractor::RemoteError + * + * Raised on Ractor#join or Ractor#value if there was an uncaught exception in the Ractor. + * Its +cause+ will contain the original exception, and +ractor+ is the original ractor + * it was raised in. + * + * r = Ractor.new { raise "Something weird happened" } + * + * begin + * r.value + * rescue => e + * p e # => #<Ractor::RemoteError: thrown by remote Ractor.> + * p e.ractor == r # => true + * p e.cause # => #<RuntimeError: Something weird happened> + * end + * + */ + +/* + * Document-class: Ractor::MovedError + * + * Raised on an attempt to access an object which was moved in Ractor#send or Ractor::Port#send. + * + * r = Ractor.new { sleep } + * + * ary = [1, 2, 3] + * r.send(ary, move: true) + * ary.inspect + * # Ractor::MovedError (can not send any methods to a moved object) + * + */ + +/* + * Document-class: Ractor::MovedObject + * + * A special object which replaces any value that was moved to another ractor in Ractor#send + * or Ractor::Port#send. Any attempt to access the object results in Ractor::MovedError. + * + * r = Ractor.new { receive } + * + * ary = [1, 2, 3] + * r.send(ary, move: true) + * p Ractor::MovedObject === ary + * # => true + * ary.inspect + * # Ractor::MovedError (can not send any methods to a moved object) + */ + +/* + * Document-class: Ractor::UnsafeError + * + * Raised when Ractor-unsafe C-methods is invoked by a non-main Ractor. + */ + +// Main docs are in ractor.rb, but without this clause there are weird artifacts +// in their rendering. +/* + * Document-class: Ractor + * + */ + void Init_Ractor(void) { rb_cRactor = rb_define_class("Ractor", rb_cObject); - rb_eRactorError = rb_define_class_under(rb_cRactor, "Error", rb_eRuntimeError); - rb_eRactorRemoteError = rb_define_class_under(rb_cRactor, "RemoteError", rb_eRactorError); - rb_eRactorMovedError = rb_define_class_under(rb_cRactor, "MovedError", rb_eRactorError); - rb_eRactorClosedError = rb_define_class_under(rb_cRactor, "ClosedError", rb_eStopIteration); - rb_eRactorUnsafeError = rb_define_class_under(rb_cRactor, "UnsafeError", rb_eRactorError); + rb_undef_alloc_func(rb_cRactor); + + rb_eRactorError = rb_define_class_under(rb_cRactor, "Error", rb_eRuntimeError); + rb_eRactorIsolationError = rb_define_class_under(rb_cRactor, "IsolationError", rb_eRactorError); + rb_eRactorRemoteError = rb_define_class_under(rb_cRactor, "RemoteError", rb_eRactorError); + rb_eRactorMovedError = rb_define_class_under(rb_cRactor, "MovedError", rb_eRactorError); + rb_eRactorClosedError = rb_define_class_under(rb_cRactor, "ClosedError", rb_eStopIteration); + rb_eRactorUnsafeError = rb_define_class_under(rb_cRactor, "UnsafeError", rb_eRactorError); rb_cRactorMovedObject = rb_define_class_under(rb_cRactor, "MovedObject", rb_cBasicObject); rb_undef_alloc_func(rb_cRactorMovedObject); @@ -1702,7 +1065,7 @@ Init_Ractor(void) rb_define_method(rb_cRactorMovedObject, "instance_eval", ractor_moved_missing, -1); rb_define_method(rb_cRactorMovedObject, "instance_exec", ractor_moved_missing, -1); - rb_obj_freeze(rb_cRactorMovedObject); + Init_RactorPort(); } void @@ -1711,9 +1074,9 @@ rb_ractor_dump(void) rb_vm_t *vm = GET_VM(); rb_ractor_t *r = 0; - list_for_each(&vm->ractor.set, r, vmlr_node) { + ccan_list_for_each(&vm->ractor.set, r, vmlr_node) { if (r != vm->ractor.main_ractor) { - fprintf(stderr, "r:%u (%s)\n", r->id, ractor_status_str(r->status_)); + fprintf(stderr, "r:%u (%s)\n", r->pub.id, ractor_status_str(r->status_)); } } } @@ -1762,7 +1125,7 @@ rb_ractor_stdin_set(VALUE in) } else { rb_ractor_t *cr = GET_RACTOR(); - RB_OBJ_WRITE(cr->self, &cr->r_stdin, in); + RB_OBJ_WRITE(cr->pub.self, &cr->r_stdin, in); } } @@ -1774,7 +1137,7 @@ rb_ractor_stdout_set(VALUE out) } else { rb_ractor_t *cr = GET_RACTOR(); - RB_OBJ_WRITE(cr->self, &cr->r_stdout, out); + RB_OBJ_WRITE(cr->pub.self, &cr->r_stdout, out); } } @@ -1786,10 +1149,60 @@ rb_ractor_stderr_set(VALUE err) } else { rb_ractor_t *cr = GET_RACTOR(); - RB_OBJ_WRITE(cr->self, &cr->r_stderr, err); + RB_OBJ_WRITE(cr->pub.self, &cr->r_stderr, err); } } +rb_hook_list_t * +rb_ractor_hooks(rb_ractor_t *cr) +{ + return &cr->pub.hooks; +} + +st_table * +rb_ractor_targeted_hooks(rb_ractor_t *cr) +{ + return &cr->pub.targeted_hooks; +} + +static void +rb_obj_set_shareable_no_assert(VALUE obj) +{ + FL_SET_RAW(obj, FL_SHAREABLE); + + if (rb_obj_gen_fields_p(obj)) { + VALUE fields = rb_obj_fields_no_ractor_check(obj); + if (imemo_type_p(fields, imemo_fields)) { + // no recursive mark + FL_SET_RAW(fields, FL_SHAREABLE); + } + } +} + +#ifndef STRICT_VERIFY_SHAREABLE +#define STRICT_VERIFY_SHAREABLE 0 +#endif + +bool +rb_ractor_verify_shareable(VALUE obj) +{ +#if STRICT_VERIFY_SHAREABLE + rb_gc_verify_shareable(obj); +#endif + return true; +} + +VALUE +rb_obj_set_shareable(VALUE obj) +{ + RUBY_ASSERT(!RB_SPECIAL_CONST_P(obj)); + + rb_obj_set_shareable_no_assert(obj); + RUBY_ASSERT(rb_ractor_verify_shareable(obj)); + + return obj; +} + /// traverse function // 2: stop search @@ -1857,12 +1270,26 @@ obj_traverse_rec(struct obj_traverse_data *data) { if (UNLIKELY(!data->rec)) { data->rec_hash = rb_ident_hash_new(); - data->rec = rb_hash_st_table(data->rec_hash); + rb_obj_hide(data->rec_hash); + data->rec = RHASH_ST_TABLE(data->rec_hash); } return data->rec; } static int +obj_traverse_ivar_foreach_i(ID key, VALUE val, st_data_t ptr) +{ + struct obj_traverse_callback_data *d = (struct obj_traverse_callback_data *)ptr; + + if (obj_traverse_i(val, d->data)) { + d->stop = true; + return ST_STOP; + } + + return ST_CONTINUE; +} + +static int obj_traverse_i(VALUE obj, struct obj_traverse_data *data) { if (RB_SPECIAL_CONST_P(obj)) return 0; @@ -1877,15 +1304,14 @@ obj_traverse_i(VALUE obj, struct obj_traverse_data *data) // already traversed return 0; } + RB_OBJ_WRITTEN(data->rec_hash, Qundef, obj); - if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) { - struct gen_ivtbl *ivtbl; - rb_ivar_generic_ivtbl_lookup(obj, &ivtbl); - for (uint32_t i = 0; i < ivtbl->numiv; i++) { - VALUE val = ivtbl->ivptr[i]; - if (val != Qundef && obj_traverse_i(val, data)) return 1; - } - } + struct obj_traverse_callback_data d = { + .stop = false, + .data = data, + }; + rb_ivar_foreach(obj, obj_traverse_ivar_foreach_i, (st_data_t)&d); + if (d.stop) return 1; switch (BUILTIN_TYPE(obj)) { // no child node @@ -1895,23 +1321,16 @@ obj_traverse_i(VALUE obj, struct obj_traverse_data *data) case T_REGEXP: case T_FILE: case T_SYMBOL: - case T_MATCH: break; case T_OBJECT: - { - uint32_t len = ROBJECT_NUMIV(obj); - VALUE *ptr = ROBJECT_IVPTR(obj); - - for (uint32_t i=0; i<len; i++) { - VALUE val = ptr[i]; - if (val != Qundef && obj_traverse_i(val, data)) return 1; - } - } + /* Instance variables already traversed. */ break; case T_ARRAY: { + rb_ary_cancel_sharing(obj); + for (int i = 0; i < RARRAY_LENINT(obj); i++) { VALUE e = rb_ary_entry(obj, i); if (obj_traverse_i(e, data)) return 1; @@ -1934,7 +1353,7 @@ obj_traverse_i(VALUE obj, struct obj_traverse_data *data) case T_STRUCT: { - long len = RSTRUCT_LEN(obj); + long len = RSTRUCT_LEN_RAW(obj); const VALUE *ptr = RSTRUCT_CONST_PTR(obj); for (long i=0; i<len; i++) { @@ -1943,6 +1362,10 @@ obj_traverse_i(VALUE obj, struct obj_traverse_data *data) } break; + case T_MATCH: + if (obj_traverse_i(RMATCH(obj)->str, data)) return 1; + break; + case T_RATIONAL: if (obj_traverse_i(RRATIONAL(obj)->num, data)) return 1; if (obj_traverse_i(RRATIONAL(obj)->den, data)) return 1; @@ -1959,7 +1382,9 @@ obj_traverse_i(VALUE obj, struct obj_traverse_data *data) .stop = false, .data = data, }; - rb_objspace_reachable_objects_from(obj, obj_traverse_reachable_i, &d); + RB_VM_LOCKING_NO_BARRIER() { + rb_objspace_reachable_objects_from(obj, obj_traverse_reachable_i, &d); + } if (d.stop) return 1; } break; @@ -2021,65 +1446,105 @@ rb_obj_traverse(VALUE obj, } static int -frozen_shareable_p(VALUE obj, bool *made_shareable) +allow_frozen_shareable_p(VALUE obj) { if (!RB_TYPE_P(obj, T_DATA)) { return true; } - else if (RTYPEDDATA_P(obj)) { + else { const rb_data_type_t *type = RTYPEDDATA_TYPE(obj); if (type->flags & RUBY_TYPED_FROZEN_SHAREABLE) { return true; } - else if (made_shareable && rb_obj_is_proc(obj)) { - // special path to make shareable Proc. - rb_proc_ractor_make_shareable(obj); - *made_shareable = true; - VM_ASSERT(RB_OBJ_SHAREABLE_P(obj)); - return false; - } } return false; } static enum obj_traverse_iterator_result +make_shareable_check_shareable_freeze(VALUE obj, enum obj_traverse_iterator_result result) +{ + if (!RB_OBJ_FROZEN_RAW(obj)) { + rb_funcall(obj, idFreeze, 0); + + if (UNLIKELY(!RB_OBJ_FROZEN_RAW(obj))) { + rb_raise(rb_eRactorError, "#freeze does not freeze object correctly"); + } + + if (RB_OBJ_SHAREABLE_P(obj)) { + return traverse_skip; + } + } + + return result; +} + +static int obj_refer_only_shareables_p(VALUE obj); + +static enum obj_traverse_iterator_result make_shareable_check_shareable(VALUE obj) { VM_ASSERT(!SPECIAL_CONST_P(obj)); - bool made_shareable = false; - if (RB_OBJ_SHAREABLE_P(obj)) { + if (rb_ractor_shareable_p(obj)) { return traverse_skip; } - else if (!frozen_shareable_p(obj, &made_shareable)) { - if (made_shareable) { - return traverse_skip; + else if (!allow_frozen_shareable_p(obj)) { + VM_ASSERT(RB_TYPE_P(obj, T_DATA)); + const rb_data_type_t *type = RTYPEDDATA_TYPE(obj); + + if (type->flags & RUBY_TYPED_FROZEN_SHAREABLE_NO_REC) { + if (obj_refer_only_shareables_p(obj)) { + make_shareable_check_shareable_freeze(obj, traverse_skip); + RB_OBJ_SET_SHAREABLE(obj); + return traverse_skip; + } + else { + rb_raise(rb_eRactorError, + "can not make shareable object for %+"PRIsVALUE" because it refers unshareable objects", obj); + } + } + else if (rb_obj_is_proc(obj)) { + rb_proc_ractor_make_shareable(obj, Qundef); + return traverse_cont; } else { - rb_raise(rb_eRactorError, "can not make shareable object for %"PRIsVALUE, obj); + rb_raise(rb_eRactorError, "can not make shareable object for %+"PRIsVALUE, obj); } } - if (!RB_OBJ_FROZEN_RAW(obj)) { - rb_funcall(obj, idFreeze, 0); - - if (UNLIKELY(!RB_OBJ_FROZEN_RAW(obj))) { - rb_raise(rb_eRactorError, "#freeze does not freeze object correctly"); - } - - if (RB_OBJ_SHAREABLE_P(obj)) { - return traverse_skip; + switch (TYPE(obj)) { + case T_IMEMO: + return traverse_skip; + case T_OBJECT: + { + // If a T_OBJECT is shared and has no free capacity, we can't safely store the object_id inline, + // as it would require to move the object content into an external buffer. + // This is only a problem for T_OBJECT, given other types have external fields and can do RCU. + // To avoid this issue, we proactively create the object_id. + shape_id_t shape_id = RBASIC_SHAPE_ID(obj); + attr_index_t capacity = RSHAPE_CAPACITY(shape_id); + attr_index_t free_capacity = capacity - RSHAPE_LEN(shape_id); + if (!rb_shape_has_object_id(shape_id) && capacity && !free_capacity) { + rb_obj_id(obj); + } } + break; + default: + break; } - return traverse_cont; + return make_shareable_check_shareable_freeze(obj, traverse_cont); } static enum obj_traverse_iterator_result mark_shareable(VALUE obj) { - FL_SET_RAW(obj, RUBY_FL_SHAREABLE); + if (RB_TYPE_P(obj, T_STRING)) { + rb_str_make_independent(obj); + } + + rb_obj_set_shareable_no_assert(obj); return traverse_cont; } @@ -2092,6 +1557,32 @@ rb_ractor_make_shareable(VALUE obj) return obj; } +VALUE +rb_ractor_make_shareable_copy(VALUE obj) +{ + VALUE copy = ractor_copy(obj); + return rb_ractor_make_shareable(copy); +} + +VALUE +rb_ractor_ensure_shareable(VALUE obj, VALUE name) +{ + if (!rb_ractor_shareable_p(obj)) { + VALUE message = rb_sprintf("cannot assign unshareable object to %"PRIsVALUE, + name); + rb_exc_raise(rb_exc_new_str(rb_eRactorIsolationError, message)); + } + return obj; +} + +void +rb_ractor_ensure_main_ractor(const char *msg) +{ + if (!rb_ractor_main_p()) { + rb_raise(rb_eRactorIsolationError, "%s", msg); + } +} + static enum obj_traverse_iterator_result shareable_p_enter(VALUE obj) { @@ -2106,14 +1597,14 @@ shareable_p_enter(VALUE obj) return traverse_skip; } else if (RB_OBJ_FROZEN_RAW(obj) && - frozen_shareable_p(obj, NULL)) { + allow_frozen_shareable_p(obj)) { return traverse_cont; } return traverse_stop; // fail } -MJIT_FUNC_EXPORTED bool +bool rb_ractor_shareable_p_continue(VALUE obj) { if (rb_obj_traverse(obj, @@ -2127,6 +1618,12 @@ rb_ractor_shareable_p_continue(VALUE obj) } #if RACTOR_CHECK_MODE > 0 +void +rb_ractor_setup_belonging(VALUE obj) +{ + rb_ractor_setup_belonging_to(obj, rb_ractor_current_id()); +} + static enum obj_traverse_iterator_result reset_belonging_enter(VALUE obj) { @@ -2217,30 +1714,47 @@ obj_hash_traverse_replace_i(st_data_t *key, st_data_t *val, st_data_t ptr, int e return ST_CONTINUE; } +static int +obj_iv_hash_traverse_replace_foreach_i(st_data_t _key, st_data_t _val, st_data_t _data, int _x) +{ + return ST_REPLACE; +} + +static int +obj_iv_hash_traverse_replace_i(st_data_t * _key, st_data_t * val, st_data_t ptr, int exists) +{ + struct obj_traverse_replace_callback_data *d = (struct obj_traverse_replace_callback_data *)ptr; + struct obj_traverse_replace_data *data = d->data; + + if (obj_traverse_replace_i(*(VALUE *)val, data)) { + d->stop = true; + return ST_STOP; + } + else if (*(VALUE *)val != data->replacement) { + VALUE v = *(VALUE *)val = data->replacement; + RB_OBJ_WRITTEN(d->src, Qundef, v); + } + + return ST_CONTINUE; +} + static struct st_table * obj_traverse_replace_rec(struct obj_traverse_replace_data *data) { if (UNLIKELY(!data->rec)) { data->rec_hash = rb_ident_hash_new(); - data->rec = rb_hash_st_table(data->rec_hash); + data->rec = RHASH_ST_TABLE(data->rec_hash); } return data->rec; } -#if USE_TRANSIENT_HEAP -void rb_ary_transient_heap_evacuate(VALUE ary, int promote); -void rb_obj_transient_heap_evacuate(VALUE obj, int promote); -void rb_hash_transient_heap_evacuate(VALUE hash, int promote); -void rb_struct_transient_heap_evacuate(VALUE st, int promote); -#endif - static void obj_refer_only_shareables_p_i(VALUE obj, void *ptr) { int *pcnt = (int *)ptr; if (!rb_ractor_shareable_p(obj)) { - pcnt++; + ++*pcnt; } } @@ -2248,14 +1762,16 @@ static int obj_refer_only_shareables_p(VALUE obj) { int cnt = 0; - rb_objspace_reachable_objects_from(obj, obj_refer_only_shareables_p_i, &cnt); + RB_VM_LOCKING_NO_BARRIER() { + rb_objspace_reachable_objects_from(obj, obj_refer_only_shareables_p_i, &cnt); + } return cnt == 0; } static int obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data) { - VALUE replacement; + st_data_t replacement; if (RB_SPECIAL_CONST_P(obj)) { data->replacement = obj; @@ -2268,32 +1784,50 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data) case traverse_stop: return 1; // stop search } - replacement = data->replacement; + replacement = (st_data_t)data->replacement; - if (UNLIKELY(st_lookup(obj_traverse_replace_rec(data), (st_data_t)obj, (st_data_t *)&replacement))) { - data->replacement = replacement; + if (UNLIKELY(st_lookup(obj_traverse_replace_rec(data), (st_data_t)obj, &replacement))) { + data->replacement = (VALUE)replacement; return 0; } else { - st_insert(obj_traverse_replace_rec(data), (st_data_t)obj, (st_data_t)replacement); + st_insert(obj_traverse_replace_rec(data), (st_data_t)obj, replacement); + RB_OBJ_WRITTEN(data->rec_hash, Qundef, obj); + RB_OBJ_WRITTEN(data->rec_hash, Qundef, replacement); } if (!data->move) { obj = replacement; } -#define CHECK_AND_REPLACE(v) do { \ +#define CHECK_AND_REPLACE(parent_obj, v) do { \ VALUE _val = (v); \ if (obj_traverse_replace_i(_val, data)) { return 1; } \ - else if (data->replacement != _val) { RB_OBJ_WRITE(obj, &v, data->replacement); } \ + else if (data->replacement != _val) { RB_OBJ_WRITE(parent_obj, &v, data->replacement); } \ } while (0) - if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) { - struct gen_ivtbl *ivtbl; - rb_ivar_generic_ivtbl_lookup(obj, &ivtbl); - for (uint32_t i = 0; i < ivtbl->numiv; i++) { - if (ivtbl->ivptr[i] != Qundef) { - CHECK_AND_REPLACE(ivtbl->ivptr[i]); + if (UNLIKELY(rb_obj_gen_fields_p(obj))) { + VALUE fields_obj = rb_obj_fields_no_ractor_check(obj); + + if (UNLIKELY(rb_obj_shape_complex_p(obj))) { + struct obj_traverse_replace_callback_data d = { + .stop = false, + .data = data, + .src = fields_obj, + }; + rb_st_foreach_with_replace( + rb_imemo_fields_complex_tbl(fields_obj), + obj_iv_hash_traverse_replace_foreach_i, + obj_iv_hash_traverse_replace_i, + (st_data_t)&d + ); + if (d.stop) return 1; + } + else { + uint32_t fields_count = RSHAPE_LEN(RBASIC_SHAPE_ID(obj)); + VALUE *fields = rb_imemo_fields_ptr(fields_obj); + for (uint32_t i = 0; i < fields_count; i++) { + CHECK_AND_REPLACE(fields_obj, fields[i]); } } } @@ -2305,7 +1839,6 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data) case T_REGEXP: case T_FILE: case T_SYMBOL: - case T_MATCH: break; case T_STRING: rb_str_make_independent(obj); @@ -2313,16 +1846,26 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data) case T_OBJECT: { -#if USE_TRANSIENT_HEAP - if (data->move) rb_obj_transient_heap_evacuate(obj, TRUE); -#endif - - uint32_t len = ROBJECT_NUMIV(obj); - VALUE *ptr = ROBJECT_IVPTR(obj); + if (rb_obj_shape_complex_p(obj)) { + struct obj_traverse_replace_callback_data d = { + .stop = false, + .data = data, + .src = obj, + }; + rb_st_foreach_with_replace( + ROBJECT_FIELDS_HASH(obj), + obj_iv_hash_traverse_replace_foreach_i, + obj_iv_hash_traverse_replace_i, + (st_data_t)&d + ); + if (d.stop) return 1; + } + else { + uint32_t len = ROBJECT_FIELDS_COUNT_NOT_COMPLEX(obj); + VALUE *ptr = ROBJECT_FIELDS(obj); - for (uint32_t i=0; i<len; i++) { - if (ptr[i] != Qundef) { - CHECK_AND_REPLACE(ptr[i]); + for (uint32_t i = 0; i < len; i++) { + CHECK_AND_REPLACE(obj, ptr[i]); } } } @@ -2331,9 +1874,6 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data) case T_ARRAY: { rb_ary_cancel_sharing(obj); -#if USE_TRANSIENT_HEAP - if (data->move) rb_ary_transient_heap_evacuate(obj, TRUE); -#endif for (int i = 0; i < RARRAY_LENINT(obj); i++) { VALUE e = rb_ary_entry(obj, i); @@ -2348,12 +1888,8 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data) RB_GC_GUARD(obj); } break; - case T_HASH: { -#if USE_TRANSIENT_HEAP - if (data->move) rb_hash_transient_heap_evacuate(obj, TRUE); -#endif struct obj_traverse_replace_callback_data d = { .stop = false, .data = data, @@ -2378,25 +1914,26 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data) case T_STRUCT: { -#if USE_TRANSIENT_HEAP - if (data->move) rb_struct_transient_heap_evacuate(obj, TRUE); -#endif - long len = RSTRUCT_LEN(obj); + long len = RSTRUCT_LEN_RAW(obj); const VALUE *ptr = RSTRUCT_CONST_PTR(obj); for (long i=0; i<len; i++) { - CHECK_AND_REPLACE(ptr[i]); + CHECK_AND_REPLACE(obj, ptr[i]); } } break; + case T_MATCH: + CHECK_AND_REPLACE(obj, RMATCH(obj)->str); + break; + case T_RATIONAL: - CHECK_AND_REPLACE(RRATIONAL(obj)->num); - CHECK_AND_REPLACE(RRATIONAL(obj)->den); + CHECK_AND_REPLACE(obj, RRATIONAL(obj)->num); + CHECK_AND_REPLACE(obj, RRATIONAL(obj)->den); break; case T_COMPLEX: - CHECK_AND_REPLACE(RCOMPLEX(obj)->real); - CHECK_AND_REPLACE(RCOMPLEX(obj)->imag); + CHECK_AND_REPLACE(obj, RCOMPLEX(obj)->real); + CHECK_AND_REPLACE(obj, RCOMPLEX(obj)->imag); break; case T_DATA: @@ -2421,7 +1958,7 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data) rb_bug("unreachable"); } - data->replacement = replacement; + data->replacement = (VALUE)replacement; if (data->leave_func(obj, data) == traverse_stop) { return 1; @@ -2455,35 +1992,19 @@ rb_obj_traverse_replace(VALUE obj, } } -struct RVALUE { - VALUE flags; - VALUE klass; - VALUE v1; - VALUE v2; - VALUE v3; +static const bool wb_protected_types[RUBY_T_MASK] = { + [T_OBJECT] = true, + [T_HASH] = true, + [T_ARRAY] = true, + [T_STRING] = true, + [T_STRUCT] = true, + [T_COMPLEX] = true, + [T_REGEXP] = true, + [T_MATCH] = true, + [T_FLOAT] = true, + [T_RATIONAL] = true, }; -static const VALUE fl_users = FL_USER1 | FL_USER2 | FL_USER3 | - FL_USER4 | FL_USER5 | FL_USER6 | FL_USER7 | - FL_USER8 | FL_USER9 | FL_USER10 | FL_USER11 | - FL_USER12 | FL_USER13 | FL_USER14 | FL_USER15 | - FL_USER16 | FL_USER17 | FL_USER18 | FL_USER19; - -static void -ractor_moved_bang(VALUE obj) -{ - // invalidate src object - struct RVALUE *rv = (void *)obj; - - rv->klass = rb_cRactorMovedObject; - rv->v1 = 0; - rv->v2 = 0; - rv->v3 = 0; - rv->flags = rv->flags & ~fl_users; - - // TODO: record moved location -} - static enum obj_traverse_iterator_result move_enter(VALUE obj, struct obj_traverse_replace_data *data) { @@ -2492,33 +2013,44 @@ move_enter(VALUE obj, struct obj_traverse_replace_data *data) return traverse_skip; } else { - data->replacement = rb_obj_alloc(RBASIC_CLASS(obj)); + VALUE type = RB_BUILTIN_TYPE(obj); + size_t slot_size = rb_gc_obj_slot_size(obj); + VALUE moved = rb_newobj(GET_EC(), 0, type, RBASIC_SHAPE_ID(obj), wb_protected_types[type], slot_size); + MEMZERO(((struct RBasic *)moved) + 1, char, slot_size - sizeof(struct RBasic)); + data->replacement = (VALUE)moved; return traverse_cont; } } -void rb_replace_generic_ivar(VALUE clone, VALUE obj); // variable.c - static enum obj_traverse_iterator_result move_leave(VALUE obj, struct obj_traverse_replace_data *data) { - VALUE v = data->replacement; - struct RVALUE *dst = (struct RVALUE *)v; - struct RVALUE *src = (struct RVALUE *)obj; - - dst->flags = (dst->flags & ~fl_users) | (src->flags & fl_users); + // Copy flags + VALUE ignored_flags = RUBY_FL_PROMOTED; + RBASIC(data->replacement)->flags = (RBASIC(obj)->flags & ~ignored_flags) | (RBASIC(data->replacement)->flags & ignored_flags); + // Copy contents without the flags + memcpy( + (char *)data->replacement + sizeof(VALUE), + (char *)obj + sizeof(VALUE), + rb_gc_obj_slot_size(obj) - sizeof(VALUE) + ); - dst->v1 = src->v1; - dst->v2 = src->v2; - dst->v3 = src->v3; + // We've copied obj's references to the replacement + rb_gc_writebarrier_remember(data->replacement); - if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) { - rb_replace_generic_ivar(v, obj); + void rb_replace_generic_ivar(VALUE clone, VALUE obj); // variable.c + if (UNLIKELY(rb_obj_gen_fields_p(obj))) { + rb_replace_generic_ivar(data->replacement, obj); } - // TODO: generic_ivar + rb_gc_obj_id_moved(data->replacement); - ractor_moved_bang(obj); + VALUE flags = T_OBJECT | FL_FREEZE | (RBASIC(obj)->flags & FL_PROMOTED); + + // Avoid mutations using bind_call, etc. + MEMZERO((char *)obj, char, sizeof(struct RBasic)); + RBASIC(obj)->flags = flags; + RBASIC_SET_CLASS_RAW(obj, rb_cRactorMovedObject); return traverse_cont; } @@ -2526,7 +2058,7 @@ static VALUE ractor_move(VALUE obj) { VALUE val = rb_obj_traverse_replace(obj, move_enter, move_leave, true); - if (val != Qundef) { + if (!UNDEF_P(val)) { return val; } else { @@ -2534,6 +2066,31 @@ ractor_move(VALUE obj) } } +static VALUE +ractor_call_clone_try(VALUE obj) +{ + return rb_funcall(obj, idClone, 0); +} + +static VALUE +ractor_call_clone_rescue(VALUE obj, VALUE exc) +{ + rb_raise(rb_eRactorError, "can't clone unshareable instance of %"PRIsVALUE, rb_class_of(obj)); + UNREACHABLE_RETURN(Qnil); +} + +static VALUE +ractor_obj_clone(VALUE obj) +{ + VALUE clone = rb_rescue(ractor_call_clone_try, obj, ractor_call_clone_rescue, obj); + + if (obj == clone) { + rb_raise(rb_eRactorError, "#clone returned self"); + } + + return clone; +} + static enum obj_traverse_iterator_result copy_enter(VALUE obj, struct obj_traverse_replace_data *data) { @@ -2542,7 +2099,7 @@ copy_enter(VALUE obj, struct obj_traverse_replace_data *data) return traverse_skip; } else { - data->replacement = rb_obj_clone(obj); + data->replacement = ractor_obj_clone(obj); return traverse_cont; } } @@ -2557,7 +2114,7 @@ static VALUE ractor_copy(VALUE obj) { VALUE val = rb_obj_traverse_replace(obj, copy_enter, copy_leave, false); - if (val != Qundef) { + if (!UNDEF_P(val)) { return val; } else { @@ -2586,6 +2143,13 @@ ractor_local_storage_mark_i(st_data_t key, st_data_t val, st_data_t dmy) return ST_CONTINUE; } +static enum rb_id_table_iterator_result +idkey_local_storage_mark_i(VALUE val, void *dmy) +{ + rb_gc_mark(val); + return ID_TABLE_CONTINUE; +} + static void ractor_local_storage_mark(rb_ractor_t *r) { @@ -2594,13 +2158,19 @@ ractor_local_storage_mark(rb_ractor_t *r) for (int i=0; i<freed_ractor_local_keys.cnt; i++) { rb_ractor_local_key_t key = freed_ractor_local_keys.keys[i]; - st_data_t val; - if (st_delete(r->local_storage, (st_data_t *)&key, &val) && - key->type->free) { + st_data_t val, k = (st_data_t)key; + if (st_delete(r->local_storage, &k, &val) && + (key = (rb_ractor_local_key_t)k)->type->free) { (*key->type->free)((void *)val); } } } + + if (r->idkey_local_storage) { + rb_id_table_foreach_values(r->idkey_local_storage, idkey_local_storage_mark_i, NULL); + } + + rb_gc_mark(r->local_storage_store_lock); } static int @@ -2618,6 +2188,10 @@ ractor_local_storage_free(rb_ractor_t *r) st_foreach(r->local_storage, ractor_local_storage_free_i, 0); st_free_table(r->local_storage); } + + if (r->idkey_local_storage) { + rb_id_table_free(r->idkey_local_storage); + } } static void @@ -2659,22 +2233,20 @@ rb_ractor_local_storage_value_newkey(void) void rb_ractor_local_storage_delkey(rb_ractor_local_key_t key) { - RB_VM_LOCK_ENTER(); - { + RB_VM_LOCKING() { if (freed_ractor_local_keys.cnt == freed_ractor_local_keys.capa) { freed_ractor_local_keys.capa = freed_ractor_local_keys.capa ? freed_ractor_local_keys.capa * 2 : 4; - REALLOC_N(freed_ractor_local_keys.keys, rb_ractor_local_key_t, freed_ractor_local_keys.capa); + SIZED_REALLOC_N(freed_ractor_local_keys.keys, rb_ractor_local_key_t, freed_ractor_local_keys.capa, freed_ractor_local_keys.cnt); } freed_ractor_local_keys.keys[freed_ractor_local_keys.cnt++] = key; } - RB_VM_LOCK_LEAVE(); } static bool ractor_local_ref(rb_ractor_local_key_t key, void **pret) { if (rb_ractor_main_p()) { - if ((VALUE)key->main_cache != Qundef) { + if (!UNDEF_P((VALUE)key->main_cache)) { *pret = key->main_cache; return true; } @@ -2713,15 +2285,26 @@ ractor_local_set(rb_ractor_local_key_t key, void *ptr) VALUE rb_ractor_local_storage_value(rb_ractor_local_key_t key) { - VALUE val; - if (ractor_local_ref(key, (void **)&val)) { - return val; + void *val; + if (ractor_local_ref(key, &val)) { + return (VALUE)val; } else { return Qnil; } } +bool +rb_ractor_local_storage_value_lookup(rb_ractor_local_key_t key, VALUE *val) +{ + if (ractor_local_ref(key, (void **)val)) { + return true; + } + else { + return false; + } +} + void rb_ractor_local_storage_value_set(rb_ractor_local_key_t key, VALUE val) { @@ -2752,12 +2335,315 @@ void rb_ractor_finish_marking(void) { for (int i=0; i<freed_ractor_local_keys.cnt; i++) { - ruby_xfree(freed_ractor_local_keys.keys[i]); + SIZED_FREE(freed_ractor_local_keys.keys[i]); } freed_ractor_local_keys.cnt = 0; if (freed_ractor_local_keys.capa > DEFAULT_KEYS_CAPA) { freed_ractor_local_keys.capa = DEFAULT_KEYS_CAPA; - REALLOC_N(freed_ractor_local_keys.keys, rb_ractor_local_key_t, DEFAULT_KEYS_CAPA); + SIZED_REALLOC_N(freed_ractor_local_keys.keys, rb_ractor_local_key_t, DEFAULT_KEYS_CAPA, freed_ractor_local_keys.capa); + } +} + +static VALUE +ractor_local_value(rb_execution_context_t *ec, VALUE self, VALUE sym) +{ + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); + ID id = rb_check_id(&sym); + struct rb_id_table *tbl = cr->idkey_local_storage; + VALUE val; + + if (id && tbl && rb_id_table_lookup(tbl, id, &val)) { + return val; + } + else { + return Qnil; + } +} + +static VALUE +ractor_local_value_set(rb_execution_context_t *ec, VALUE self, VALUE sym, VALUE val) +{ + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); + ID id = SYM2ID(rb_to_symbol(sym)); + struct rb_id_table *tbl = cr->idkey_local_storage; + + if (tbl == NULL) { + tbl = cr->idkey_local_storage = rb_id_table_create(2); + } + rb_id_table_insert(tbl, id, val); + return val; +} + +struct ractor_local_storage_store_data { + rb_execution_context_t *ec; + struct rb_id_table *tbl; + ID id; + VALUE sym; +}; + +static VALUE +ractor_local_value_store_i(VALUE ptr) +{ + VALUE val; + struct ractor_local_storage_store_data *data = (struct ractor_local_storage_store_data *)ptr; + + if (rb_id_table_lookup(data->tbl, data->id, &val)) { + // after synchronization, we found already registered entry + } + else { + val = rb_yield(Qnil); + ractor_local_value_set(data->ec, Qnil, data->sym, val); + } + return val; +} + +static VALUE +ractor_local_value_store_if_absent(rb_execution_context_t *ec, VALUE self, VALUE sym) +{ + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); + struct ractor_local_storage_store_data data = { + .ec = ec, + .sym = sym, + .id = SYM2ID(rb_to_symbol(sym)), + .tbl = cr->idkey_local_storage, + }; + VALUE val; + + if (data.tbl == NULL) { + data.tbl = cr->idkey_local_storage = rb_id_table_create(2); + } + else if (rb_id_table_lookup(data.tbl, data.id, &val)) { + // already set + return val; + } + + if (!cr->local_storage_store_lock) { + cr->local_storage_store_lock = rb_mutex_new(); + } + + return rb_mutex_synchronize(cr->local_storage_store_lock, ractor_local_value_store_i, (VALUE)&data); +} + +// shareable_proc + +static VALUE +ractor_shareable_proc(rb_execution_context_t *ec, VALUE replace_self, bool is_lambda) +{ + if (!rb_ractor_shareable_p(replace_self)) { + rb_raise(rb_eRactorIsolationError, "self should be shareable: %" PRIsVALUE, replace_self); + } + else { + VALUE proc = is_lambda ? rb_block_lambda() : rb_block_proc(); + return rb_proc_ractor_make_shareable(rb_proc_dup(proc), replace_self); + } +} + +// Ractor#require + +struct cross_ractor_require { + VALUE port; + bool raised; + + union { + struct { + VALUE feature; + } require; + + struct { + VALUE module; + ID name; + } autoload; + } as; + + bool silent; +}; + +RUBY_REFERENCES(cross_ractor_require_refs) = { + RUBY_REF_EDGE(struct cross_ractor_require, port), + RUBY_REF_EDGE(struct cross_ractor_require, as.require.feature), + RUBY_REF_END +}; + +static const rb_data_type_t cross_ractor_require_data_type = { + "ractor/cross_ractor_require", + { + RUBY_REFS_LIST_PTR(cross_ractor_require_refs), + RUBY_DEFAULT_FREE, + NULL, // memsize + NULL, // compact + }, + 0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_DECL_MARKING | RUBY_TYPED_EMBEDDABLE +}; + +static VALUE +require_body(VALUE crr_obj) +{ + struct cross_ractor_require *crr; + TypedData_Get_Struct(crr_obj, struct cross_ractor_require, &cross_ractor_require_data_type, crr); + VALUE feature = crr->as.require.feature; + + ID require; + CONST_ID(require, "require"); + + if (crr->silent) { + int rb_require_internal_silent(VALUE fname); + return INT2NUM(rb_require_internal_silent(feature)); + } + else { + return rb_funcallv(Qnil, require, 1, &feature); + } +} + +static VALUE +require_rescue(VALUE crr_obj, VALUE errinfo) +{ + struct cross_ractor_require *crr; + TypedData_Get_Struct(crr_obj, struct cross_ractor_require, &cross_ractor_require_data_type, crr); + crr->raised = true; + return errinfo; +} + +static VALUE +require_result_send_body(VALUE ary) +{ + VALUE port = RARRAY_AREF(ary, 0); + VALUE results = RARRAY_AREF(ary, 1); + + rb_execution_context_t *ec = GET_EC(); + + ractor_port_send(ec, port, results, Qfalse); + return Qnil; +} + +static VALUE +require_result_send_resuce(VALUE port, VALUE errinfo) +{ + // TODO: need rescue? + ractor_port_send(GET_EC(), port, errinfo, Qfalse); + return Qnil; +} + +static VALUE +ractor_require_protect(VALUE crr_obj, VALUE (*func)(VALUE)) +{ + struct cross_ractor_require *crr; + TypedData_Get_Struct(crr_obj, struct cross_ractor_require, &cross_ractor_require_data_type, crr); + + const bool silent = crr->silent; + + VALUE debug, errinfo; + if (silent) { + debug = ruby_debug; + errinfo = rb_errinfo(); + } + + // get normal result or raised exception (with crr->raised == true) + VALUE result = rb_rescue2(func, crr_obj, require_rescue, crr_obj, rb_eException, 0); + + if (silent) { + ruby_debug = debug; + rb_set_errinfo(errinfo); + } + + rb_rescue2(require_result_send_body, + // [port, [result, raised]] + rb_ary_new_from_args(2, crr->port, rb_ary_new_from_args(2, result, crr->raised ? Qtrue : Qfalse)), + require_result_send_resuce, rb_eException, crr->port); + + RB_GC_GUARD(crr_obj); + return Qnil; +} + +static VALUE +ractor_require_func(void *crr_obj) +{ + return ractor_require_protect((VALUE)crr_obj, require_body); +} + +VALUE +rb_ractor_require(VALUE feature, bool silent) +{ + // We're about to block on the main ractor, so if we're holding the global lock we'll deadlock. + ASSERT_vm_unlocking(); + + struct cross_ractor_require *crr; + VALUE crr_obj = TypedData_Make_Struct(0, struct cross_ractor_require, &cross_ractor_require_data_type, crr); + RB_OBJ_SET_SHAREABLE(crr_obj); // TODO: internal data? + + // Convert feature to proper file path and make it shareable as fstring + RB_OBJ_WRITE(crr_obj, &crr->as.require.feature, rb_fstring(FilePathValue(feature))); + RB_OBJ_WRITE(crr_obj, &crr->port, rb_ractor_make_shareable(ractor_port_new(GET_RACTOR()))); + crr->raised = false; + crr->silent = silent; + + rb_execution_context_t *ec = GET_EC(); + rb_ractor_t *main_r = GET_VM()->ractor.main_ractor; + rb_ractor_interrupt_exec(main_r, ractor_require_func, (void *)crr_obj, rb_interrupt_exec_flag_value_data); + + // wait for require done + VALUE results = ractor_port_receive(ec, crr->port); + ractor_port_close(ec, crr->port); + + VALUE exc = rb_ary_pop(results); + VALUE result = rb_ary_pop(results); + RB_GC_GUARD(crr_obj); + + if (RTEST(exc)) { + rb_exc_raise(result); + } + else { + return result; + } +} + +static VALUE +ractor_require(rb_execution_context_t *ec, VALUE self, VALUE feature) +{ + return rb_ractor_require(feature, false); +} + +static VALUE +autoload_load_body(VALUE crr_obj) +{ + struct cross_ractor_require *crr; + TypedData_Get_Struct(crr_obj, struct cross_ractor_require, &cross_ractor_require_data_type, crr); + return rb_autoload_load(crr->as.autoload.module, crr->as.autoload.name); +} + +static VALUE +ractor_autoload_load_func(void *crr_obj) +{ + return ractor_require_protect((VALUE)crr_obj, autoload_load_body); +} + +VALUE +rb_ractor_autoload_load(VALUE module, ID name) +{ + struct cross_ractor_require *crr; + VALUE crr_obj = TypedData_Make_Struct(0, struct cross_ractor_require, &cross_ractor_require_data_type, crr); + RB_OBJ_SET_SHAREABLE(crr_obj); // TODO: internal data? + + RB_OBJ_WRITE(crr_obj, &crr->as.autoload.module, module); + RB_OBJ_WRITE(crr_obj, &crr->as.autoload.name, name); + RB_OBJ_WRITE(crr_obj, &crr->port, rb_ractor_make_shareable(ractor_port_new(GET_RACTOR()))); + + rb_execution_context_t *ec = GET_EC(); + rb_ractor_t *main_r = GET_VM()->ractor.main_ractor; + rb_ractor_interrupt_exec(main_r, ractor_autoload_load_func, (void *)crr_obj, rb_interrupt_exec_flag_value_data); + + // wait for require done + VALUE results = ractor_port_receive(ec, crr->port); + ractor_port_close(ec, crr->port); + + VALUE exc = rb_ary_pop(results); + VALUE result = rb_ary_pop(results); + RB_GC_GUARD(crr_obj); + + if (RTEST(exc)) { + rb_exc_raise(result); + } + else { + return result; } } |
