diff options
Diffstat (limited to 'ractor.c')
| -rw-r--r-- | ractor.c | 2548 |
1 files changed, 448 insertions, 2100 deletions
@@ -3,9 +3,9 @@ #include "ruby/ruby.h" #include "ruby/thread.h" #include "ruby/ractor.h" +#include "ruby/re.h" #include "ruby/thread_native.h" #include "vm_core.h" -#include "eval_intern.h" #include "vm_sync.h" #include "ractor_core.h" #include "internal/complex.h" @@ -19,6 +19,7 @@ #include "internal/thread.h" #include "variable.h" #include "yjit.h" +#include "zjit.h" VALUE rb_cRactor; static VALUE rb_cRactorSelector; @@ -33,6 +34,11 @@ static VALUE rb_cRactorMovedObject; static void vm_ractor_blocking_cnt_inc(rb_vm_t *vm, rb_ractor_t *r, const char *file, int line); + +#if RACTOR_CHECK_MODE > 0 +bool rb_ractor_ignore_belonging_flag = false; +#endif + // Ractor locking static void @@ -66,10 +72,17 @@ ractor_lock(rb_ractor_t *r, const char *file, int line) ASSERT_ractor_unlocking(r); rb_native_mutex_lock(&r->sync.lock); + const rb_execution_context_t *ec = rb_current_ec_noinline(); + if (ec) { + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); + VM_ASSERT(!cr->malloc_gc_disabled); + cr->malloc_gc_disabled = true; + } + #if RACTOR_CHECK_MODE > 0 - if (rb_current_execution_context(false) != NULL) { - rb_ractor_t *cr = rb_current_ractor_raw(false); - r->sync.locked_by = cr ? rb_ractor_self(cr) : Qundef; + if (ec != NULL) { + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); + r->sync.locked_by = rb_ractor_self(cr); } #endif @@ -93,6 +106,14 @@ ractor_unlock(rb_ractor_t *r, const char *file, int line) #if RACTOR_CHECK_MODE > 0 r->sync.locked_by = Qnil; #endif + + const rb_execution_context_t *ec = rb_current_ec_noinline(); + if (ec) { + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); + VM_ASSERT(cr->malloc_gc_disabled); + cr->malloc_gc_disabled = false; + } + rb_native_mutex_unlock(&r->sync.lock); RUBY_DEBUG_LOG2(file, line, "r:%u%s", r->pub.id, rb_current_ractor_raw(false) == r ? " (self)" : ""); @@ -178,60 +199,81 @@ ractor_status_p(rb_ractor_t *r, enum ractor_status status) // Ractor data/mark/free -static struct rb_ractor_basket *ractor_queue_at(rb_ractor_t *r, struct rb_ractor_queue *rq, int i); static void ractor_local_storage_mark(rb_ractor_t *r); static void ractor_local_storage_free(rb_ractor_t *r); -static void -ractor_queue_mark(struct rb_ractor_queue *rq) -{ - for (int i=0; i<rq->cnt; i++) { - struct rb_ractor_basket *b = ractor_queue_at(NULL, rq, i); - rb_gc_mark(b->sender); - - switch (b->type.e) { - case basket_type_yielding: - case basket_type_take_basket: - case basket_type_deleted: - case basket_type_reserved: - // ignore - break; - default: - rb_gc_mark(b->p.send.v); - } +static void ractor_sync_mark(rb_ractor_t *r); +static void ractor_sync_free(rb_ractor_t *r); +static size_t ractor_sync_memsize(const rb_ractor_t *r); +static void ractor_sync_init(rb_ractor_t *r); + +static int +mark_targeted_hook_list(st_data_t key, st_data_t value, st_data_t _arg) +{ + rb_hook_list_t *hook_list = (rb_hook_list_t*)value; + + if (hook_list->type == hook_list_type_targeted_iseq) { + rb_gc_mark((VALUE)key); + } + else { + rb_method_definition_t *def = (rb_method_definition_t*)key; + RUBY_ASSERT(hook_list->type == hook_list_type_targeted_def); + rb_gc_mark(def->body.bmethod.proc); } + rb_hook_list_mark(hook_list); + + return ST_CONTINUE; } static void ractor_mark(void *ptr) { rb_ractor_t *r = (rb_ractor_t *)ptr; - - ractor_queue_mark(&r->sync.recv_queue); - ractor_queue_mark(&r->sync.takers_queue); + bool checking_shareable = rb_gc_checking_shareable(); rb_gc_mark(r->loc); rb_gc_mark(r->name); - rb_gc_mark(r->r_stdin); - rb_gc_mark(r->r_stdout); - rb_gc_mark(r->r_stderr); - rb_hook_list_mark(&r->pub.hooks); - - if (r->threads.cnt > 0) { - rb_thread_t *th = 0; - ccan_list_for_each(&r->threads.set, th, lt_node) { - VM_ASSERT(th != NULL); - rb_gc_mark(th->self); + + if (!checking_shareable) { + // may unshareable objects + rb_gc_mark(r->r_stdin); + rb_gc_mark(r->r_stdout); + rb_gc_mark(r->r_stderr); + rb_gc_mark(r->verbose); + rb_gc_mark(r->debug); + + // mark received messages + ractor_sync_mark(r); + + rb_hook_list_mark(&r->pub.hooks); + if (r->pub.targeted_hooks) { + st_foreach(r->pub.targeted_hooks, mark_targeted_hook_list, 0); + } + + if (r->threads.cnt > 0) { + rb_thread_t *th = 0; + ccan_list_for_each(&r->threads.set, th, lt_node) { + VM_ASSERT(th != NULL); + rb_gc_mark(th->self); + } } + + ractor_local_storage_mark(r); } +} - ractor_local_storage_mark(r); +static int +free_targeted_hook_lists(st_data_t key, st_data_t val, st_data_t _arg) +{ + rb_hook_list_t *hook_list = (rb_hook_list_t*)val; + rb_hook_list_free(hook_list); + return ST_DELETE; } static void -ractor_queue_free(struct rb_ractor_queue *rq) +free_targeted_hooks(st_table *hooks_tbl) { - free(rq->baskets); + st_foreach(hooks_tbl, free_targeted_hook_lists, 0); } static void @@ -239,11 +281,14 @@ ractor_free(void *ptr) { rb_ractor_t *r = (rb_ractor_t *)ptr; RUBY_DEBUG_LOG("free r:%d", rb_ractor_id(r)); + free_targeted_hooks(r->pub.targeted_hooks); rb_native_mutex_destroy(&r->sync.lock); - ractor_queue_free(&r->sync.recv_queue); - ractor_queue_free(&r->sync.takers_queue); +#ifdef RUBY_THREAD_WIN32_H + rb_native_cond_destroy(&r->sync.wakeup_cond); +#endif ractor_local_storage_free(r); rb_hook_list_free(&r->pub.hooks); + st_free_table(r->pub.targeted_hooks); if (r->newobj_cache) { RUBY_ASSERT(r == ruby_single_main_ractor); @@ -252,24 +297,17 @@ ractor_free(void *ptr) r->newobj_cache = NULL; } + ractor_sync_free(r); ruby_xfree(r); } static size_t -ractor_queue_memsize(const struct rb_ractor_queue *rq) -{ - return sizeof(struct rb_ractor_basket) * rq->size; -} - -static size_t ractor_memsize(const void *ptr) { rb_ractor_t *r = (rb_ractor_t *)ptr; // TODO: more correct? - return sizeof(rb_ractor_t) + - ractor_queue_memsize(&r->sync.recv_queue) + - ractor_queue_memsize(&r->sync.takers_queue); + return sizeof(rb_ractor_t) + ractor_sync_memsize(r); } static const rb_data_type_t ractor_data_type = { @@ -317,1715 +355,7 @@ rb_ractor_current_id(void) } #endif -// Ractor queue - -static void -ractor_queue_setup(struct rb_ractor_queue *rq) -{ - rq->size = 2; - rq->cnt = 0; - rq->start = 0; - rq->baskets = malloc(sizeof(struct rb_ractor_basket) * rq->size); -} - -static struct rb_ractor_basket * -ractor_queue_head(rb_ractor_t *r, struct rb_ractor_queue *rq) -{ - if (r != NULL) ASSERT_ractor_locking(r); - return &rq->baskets[rq->start]; -} - -static struct rb_ractor_basket * -ractor_queue_at(rb_ractor_t *r, struct rb_ractor_queue *rq, int i) -{ - if (r != NULL) ASSERT_ractor_locking(r); - return &rq->baskets[(rq->start + i) % rq->size]; -} - -static void -ractor_queue_advance(rb_ractor_t *r, struct rb_ractor_queue *rq) -{ - ASSERT_ractor_locking(r); - - if (rq->reserved_cnt == 0) { - rq->cnt--; - rq->start = (rq->start + 1) % rq->size; - rq->serial++; - } - else { - ractor_queue_at(r, rq, 0)->type.e = basket_type_deleted; - } -} - -static bool -ractor_queue_skip_p(rb_ractor_t *r, struct rb_ractor_queue *rq, int i) -{ - struct rb_ractor_basket *b = ractor_queue_at(r, rq, i); - return basket_type_p(b, basket_type_deleted) || - basket_type_p(b, basket_type_reserved); -} - -static void -ractor_queue_compact(rb_ractor_t *r, struct rb_ractor_queue *rq) -{ - ASSERT_ractor_locking(r); - - while (rq->cnt > 0 && basket_type_p(ractor_queue_at(r, rq, 0), basket_type_deleted)) { - ractor_queue_advance(r, rq); - } -} - -static bool -ractor_queue_empty_p(rb_ractor_t *r, struct rb_ractor_queue *rq) -{ - ASSERT_ractor_locking(r); - - if (rq->cnt == 0) { - return true; - } - - ractor_queue_compact(r, rq); - - for (int i=0; i<rq->cnt; i++) { - if (!ractor_queue_skip_p(r, rq, i)) { - return false; - } - } - - return true; -} - -static bool -ractor_queue_deq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket) -{ - ASSERT_ractor_locking(r); - - for (int i=0; i<rq->cnt; i++) { - if (!ractor_queue_skip_p(r, rq, i)) { - struct rb_ractor_basket *b = ractor_queue_at(r, rq, i); - *basket = *b; - - // remove from queue - b->type.e = basket_type_deleted; - ractor_queue_compact(r, rq); - return true; - } - } - - return false; -} - -static void -ractor_queue_enq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket) -{ - ASSERT_ractor_locking(r); - - if (rq->size <= rq->cnt) { - rq->baskets = realloc(rq->baskets, sizeof(struct rb_ractor_basket) * rq->size * 2); - for (int i=rq->size - rq->start; i<rq->cnt; i++) { - rq->baskets[i + rq->start] = rq->baskets[i + rq->start - rq->size]; - } - rq->size *= 2; - } - // copy basket into queue - rq->baskets[(rq->start + rq->cnt++) % rq->size] = *basket; - // fprintf(stderr, "%s %p->cnt:%d\n", RUBY_FUNCTION_NAME_STRING, (void *)rq, rq->cnt); -} - -static void -ractor_queue_delete(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket) -{ - basket->type.e = basket_type_deleted; -} - -// Ractor basket - -static VALUE ractor_reset_belonging(VALUE obj); // in this file - -static VALUE -ractor_basket_value(struct rb_ractor_basket *b) -{ - switch (b->type.e) { - case basket_type_ref: - break; - case basket_type_copy: - case basket_type_move: - case basket_type_will: - b->type.e = basket_type_ref; - b->p.send.v = ractor_reset_belonging(b->p.send.v); - break; - default: - rb_bug("unreachable"); - } - - return b->p.send.v; -} - -static VALUE -ractor_basket_accept(struct rb_ractor_basket *b) -{ - VALUE v = ractor_basket_value(b); - - // a ractor's main thread had an error and yielded us this exception during its dying moments - if (b->p.send.exception) { - VALUE cause = v; - VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor."); - rb_ivar_set(err, rb_intern("@ractor"), b->sender); - rb_ec_setup_exception(NULL, err, cause); - rb_exc_raise(err); - } - - return v; -} - -// Ractor synchronizations - -#if USE_RUBY_DEBUG_LOG -static const char * -wait_status_str(enum rb_ractor_wait_status wait_status) -{ - switch ((int)wait_status) { - case wait_none: return "none"; - case wait_receiving: return "receiving"; - case wait_taking: return "taking"; - case wait_yielding: return "yielding"; - case wait_receiving|wait_taking: return "receiving|taking"; - case wait_receiving|wait_yielding: return "receiving|yielding"; - case wait_taking|wait_yielding: return "taking|yielding"; - case wait_receiving|wait_taking|wait_yielding: return "receiving|taking|yielding"; - } - rb_bug("unreachable"); -} - -static const char * -wakeup_status_str(enum rb_ractor_wakeup_status wakeup_status) -{ - switch (wakeup_status) { - case wakeup_none: return "none"; - case wakeup_by_send: return "by_send"; - case wakeup_by_yield: return "by_yield"; - case wakeup_by_take: return "by_take"; - case wakeup_by_close: return "by_close"; - case wakeup_by_interrupt: return "by_interrupt"; - case wakeup_by_retry: return "by_retry"; - } - rb_bug("unreachable"); -} - -static const char * -basket_type_name(enum rb_ractor_basket_type type) -{ - switch (type) { - case basket_type_none: return "none"; - case basket_type_ref: return "ref"; - case basket_type_copy: return "copy"; - case basket_type_move: return "move"; - case basket_type_will: return "will"; - case basket_type_deleted: return "deleted"; - case basket_type_reserved: return "reserved"; - case basket_type_take_basket: return "take_basket"; - case basket_type_yielding: return "yielding"; - } - VM_ASSERT(0); - return NULL; -} -#endif // USE_RUBY_DEBUG_LOG - -static rb_thread_t * -ractor_sleeping_by(const rb_ractor_t *r, rb_thread_t *th, enum rb_ractor_wait_status wait_status) -{ - if (th) { - if ((th->ractor_waiting.wait_status & wait_status) && th->ractor_waiting.wakeup_status == wakeup_none) { - return th; - } - } - else { - // find any thread that has this ractor wait status that is blocked - ccan_list_for_each(&r->sync.wait.waiting_threads, th, ractor_waiting.waiting_node) { - if ((th->ractor_waiting.wait_status & wait_status) && th->ractor_waiting.wakeup_status == wakeup_none) { - return th; - } - } - } - return NULL; -} - -#ifdef RUBY_THREAD_PTHREAD_H -// thread_*.c -void rb_ractor_sched_wakeup(rb_ractor_t *r, rb_thread_t *th); -#else - -// win32 -static void -rb_ractor_sched_wakeup(rb_ractor_t *r, rb_thread_t *th) -{ - (void)r; - ASSERT_ractor_locking(r); - rb_native_cond_signal(&th->ractor_waiting.cond); - -} -#endif - - -/* - * Wakeup `r` if the given `th` is blocked and has the given ractor `wait_status`. - * Wakeup any blocked thread in `r` with the given ractor `wait_status` if `th` is NULL. - */ -static bool -ractor_wakeup(rb_ractor_t *r, rb_thread_t *th /* can be NULL */, enum rb_ractor_wait_status wait_status, enum rb_ractor_wakeup_status wakeup_status) -{ - ASSERT_ractor_locking(r); - - RUBY_DEBUG_LOG("r:%u wait_by:%s -> wait:%s wakeup:%s", - rb_ractor_id(r), - wait_status_str(th->ractor_waiting.wait_status), - wait_status_str(wait_status), - wakeup_status_str(wakeup_status)); - - if ((th = ractor_sleeping_by(r, th, wait_status)) != NULL) { - th->ractor_waiting.wakeup_status = wakeup_status; - rb_ractor_sched_wakeup(r, th); - return true; - } - else { - return false; - } -} - -// unblock function (UBF). This gets called when another thread on this or another ractor sets our thread's interrupt flag. -// This is not async-safe. -static void -ractor_sleep_interrupt(void *ptr) -{ - rb_execution_context_t *ec = ptr; - rb_ractor_t *r = rb_ec_ractor_ptr(ec); - rb_thread_t *th = rb_ec_thread_ptr(ec); - - RACTOR_LOCK(r); - { - ractor_wakeup(r, th, wait_receiving | wait_taking | wait_yielding, wakeup_by_interrupt); - } - RACTOR_UNLOCK(r); -} - -typedef void (*ractor_sleep_cleanup_function)(rb_ractor_t *cr, void *p); - -// Checks the current thread for ruby interrupts and runs the cleanup function `cf_func` with `cf_data` if -// `rb_ec_check_ints` is going to raise. See the `rb_threadptr_execute_interrupts` for info on when it can raise. -static void -ractor_check_ints(rb_execution_context_t *ec, rb_ractor_t *cr, rb_thread_t *cur_th, ractor_sleep_cleanup_function cf_func, void *cf_data) -{ - if (cur_th->ractor_waiting.wait_status != wait_none) { - enum rb_ractor_wait_status prev_wait_status = cur_th->ractor_waiting.wait_status; - cur_th->ractor_waiting.wait_status = wait_none; - cur_th->ractor_waiting.wakeup_status = wakeup_by_interrupt; - - RACTOR_UNLOCK(cr); - { - if (cf_func) { - enum ruby_tag_type state; - EC_PUSH_TAG(ec); - if ((state = EC_EXEC_TAG()) == TAG_NONE) { - rb_ec_check_ints(ec); - } - EC_POP_TAG(); - - if (state) { - (*cf_func)(cr, cf_data); // cleanup function is run after the ubf, if it had ubf - EC_JUMP_TAG(ec, state); - } - } - else { - rb_ec_check_ints(ec); - } - } - - RACTOR_LOCK(cr); - cur_th->ractor_waiting.wait_status = prev_wait_status; - } -} - -#ifdef RUBY_THREAD_PTHREAD_H -void rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf); -#else - -static void -ractor_cond_wait(rb_ractor_t *r, rb_thread_t *th) -{ -#if RACTOR_CHECK_MODE > 0 - VALUE locked_by = r->sync.locked_by; - r->sync.locked_by = Qnil; -#endif - rb_native_cond_wait(&th->ractor_waiting.cond, &r->sync.lock); - -#if RACTOR_CHECK_MODE > 0 - r->sync.locked_by = locked_by; -#endif -} - -static void * -ractor_sleep_wo_gvl(void *ptr) -{ - rb_ractor_t *cr = ptr; - rb_execution_context_t *ec = cr->threads.running_ec; - VM_ASSERT(GET_EC() == ec); - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - RACTOR_LOCK_SELF(cr); - { - VM_ASSERT(cur_th->ractor_waiting.wait_status != wait_none); - // it's possible that another ractor has woken us up (ractor_wakeup), - // so check this condition - if (cur_th->ractor_waiting.wakeup_status == wakeup_none) { - cur_th->status = THREAD_STOPPED_FOREVER; - ractor_cond_wait(cr, cur_th); - cur_th->status = THREAD_RUNNABLE; - VM_ASSERT(cur_th->ractor_waiting.wakeup_status != wakeup_none); - } - else { - RUBY_DEBUG_LOG("rare timing, no cond wait"); - } - cur_th->ractor_waiting.wait_status = wait_none; - } - RACTOR_UNLOCK_SELF(cr); - return NULL; -} - -static void -rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf_ractor_sleep_interrupt) -{ - ASSERT_ractor_locking(cr); - rb_thread_t *th = rb_ec_thread_ptr(ec); - struct ccan_list_node *waitn = &th->ractor_waiting.waiting_node; - VM_ASSERT(waitn->next == waitn->prev && waitn->next == waitn); // it should be unlinked - ccan_list_add(&cr->sync.wait.waiting_threads, waitn); - RACTOR_UNLOCK(cr); - { - rb_nogvl(ractor_sleep_wo_gvl, cr, ubf_ractor_sleep_interrupt, ec, RB_NOGVL_INTR_FAIL); - } - RACTOR_LOCK(cr); - ccan_list_del_init(waitn); -} -#endif - -/* - * Sleep the current ractor's current thread until another ractor wakes us up or another thread calls our unblock function. - * The following ractor actions can cause this function to be called: - * Ractor#take (wait_taking) - * Ractor.yield (wait_yielding) - * Ractor.receive (wait_receiving) - * Ractor.select (can be a combination of the above wait states, depending on the states of the ractors passed to Ractor.select) - */ -static enum rb_ractor_wakeup_status -ractor_sleep_with_cleanup(rb_execution_context_t *ec, rb_ractor_t *cr, rb_thread_t *cur_th, enum rb_ractor_wait_status wait_status, - ractor_sleep_cleanup_function cf_func, void *cf_data) -{ - ASSERT_ractor_locking(cr); - enum rb_ractor_wakeup_status wakeup_status; - VM_ASSERT(GET_RACTOR() == cr); - - VM_ASSERT(cur_th->ractor_waiting.wait_status == wait_none); - VM_ASSERT(wait_status != wait_none); - cur_th->ractor_waiting.wait_status = wait_status; - cur_th->ractor_waiting.wakeup_status = wakeup_none; - - // fprintf(stderr, "%s r:%p status:%s, wakeup_status:%s\n", RUBY_FUNCTION_NAME_STRING, (void *)cr, - // wait_status_str(cr->sync.wait.status), wakeup_status_str(cr->sync.wait.wakeup_status)); - - RUBY_DEBUG_LOG("sleep by %s", wait_status_str(wait_status)); - - while (cur_th->ractor_waiting.wakeup_status == wakeup_none) { - rb_ractor_sched_sleep(ec, cr, ractor_sleep_interrupt); - ractor_check_ints(ec, cr, cur_th, cf_func, cf_data); - } - - cur_th->ractor_waiting.wait_status = wait_none; - - wakeup_status = cur_th->ractor_waiting.wakeup_status; - cur_th->ractor_waiting.wakeup_status = wakeup_none; - - RUBY_DEBUG_LOG("wakeup %s", wakeup_status_str(wakeup_status)); - - ASSERT_ractor_locking(cr); - return wakeup_status; -} - -static enum rb_ractor_wakeup_status -ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_thread_t *cur_th, enum rb_ractor_wait_status wait_status) -{ - return ractor_sleep_with_cleanup(ec, cr, cur_th, wait_status, 0, NULL); -} - -// Ractor.receive - -static void -ractor_recursive_receive_if(rb_thread_t *th) -{ - if (th->ractor_waiting.receiving_mutex && rb_mutex_owned_p(th->ractor_waiting.receiving_mutex)) { - rb_raise(rb_eRactorError, "can not call receive/receive_if recursively"); - } -} - -static VALUE -ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *rq) -{ - struct rb_ractor_basket basket; - ractor_recursive_receive_if(rb_ec_thread_ptr(ec)); - bool received = false; - - RACTOR_LOCK_SELF(cr); - { - RUBY_DEBUG_LOG("rq->cnt:%d", rq->cnt); - received = ractor_queue_deq(cr, rq, &basket); - } - RACTOR_UNLOCK_SELF(cr); - - if (!received) { - if (cr->sync.incoming_port_closed) { - rb_raise(rb_eRactorClosedError, "The incoming port is already closed"); - } - return Qundef; - } - else { - return ractor_basket_accept(&basket); - } -} - -static void -ractor_wait_receive(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *rq) -{ - VM_ASSERT(cr == rb_ec_ractor_ptr(ec)); - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - ractor_recursive_receive_if(cur_th); - - RACTOR_LOCK(cr); - { - while (ractor_queue_empty_p(cr, rq) && !cr->sync.incoming_port_closed) { - ractor_sleep(ec, cr, cur_th, wait_receiving); - } - } - RACTOR_UNLOCK(cr); -} - -static VALUE -ractor_receive(rb_execution_context_t *ec, rb_ractor_t *cr) -{ - VM_ASSERT(cr == rb_ec_ractor_ptr(ec)); - VALUE v; - struct rb_ractor_queue *rq = &cr->sync.recv_queue; - - while (UNDEF_P(v = ractor_try_receive(ec, cr, rq))) { - ractor_wait_receive(ec, cr, rq); - } - - return v; -} - -#if 0 -static void -rq_dump(struct rb_ractor_queue *rq) -{ - bool bug = false; - for (int i=0; i<rq->cnt; i++) { - struct rb_ractor_basket *b = ractor_queue_at(NULL, rq, i); - fprintf(stderr, "%d (start:%d) type:%s %p %s\n", i, rq->start, basket_type_name(b->type), - (void *)b, RSTRING_PTR(RARRAY_AREF(b->v, 1))); - if (basket_type_p(b, basket_type_reserved) bug = true; - } - if (bug) rb_bug("!!"); -} -#endif - -struct receive_block_data { - rb_ractor_t *cr; - rb_thread_t *th; - struct rb_ractor_queue *rq; - VALUE v; - int index; - bool success; -}; - -static void -ractor_receive_if_lock(rb_thread_t *th) -{ - VALUE m = th->ractor_waiting.receiving_mutex; - if (m == Qfalse) { - m = th->ractor_waiting.receiving_mutex = rb_mutex_new(); - } - rb_mutex_lock(m); -} - -static VALUE -receive_if_body(VALUE ptr) -{ - struct receive_block_data *data = (struct receive_block_data *)ptr; - - ractor_receive_if_lock(data->th); - VALUE block_result = rb_yield(data->v); - rb_ractor_t *cr = data->cr; - - RACTOR_LOCK_SELF(cr); - { - struct rb_ractor_basket *b = ractor_queue_at(cr, data->rq, data->index); - VM_ASSERT(basket_type_p(b, basket_type_reserved)); - data->rq->reserved_cnt--; - - if (RTEST(block_result)) { - ractor_queue_delete(cr, data->rq, b); - ractor_queue_compact(cr, data->rq); - } - else { - b->type.e = basket_type_ref; - } - } - RACTOR_UNLOCK_SELF(cr); - - data->success = true; - - if (RTEST(block_result)) { - return data->v; - } - else { - return Qundef; - } -} - -static VALUE -receive_if_ensure(VALUE v) -{ - struct receive_block_data *data = (struct receive_block_data *)v; - rb_ractor_t *cr = data->cr; - rb_thread_t *cur_th = data->th; - - if (!data->success) { - RACTOR_LOCK_SELF(cr); - { - struct rb_ractor_basket *b = ractor_queue_at(cr, data->rq, data->index); - VM_ASSERT(basket_type_p(b, basket_type_reserved)); - b->type.e = basket_type_deleted; - data->rq->reserved_cnt--; - } - RACTOR_UNLOCK_SELF(cr); - } - - rb_mutex_unlock(cur_th->ractor_waiting.receiving_mutex); - return Qnil; -} - -static VALUE -ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b) -{ - if (!RTEST(b)) rb_raise(rb_eArgError, "no block given"); - - rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - unsigned int serial = (unsigned int)-1; - int index = 0; - struct rb_ractor_queue *rq = &cr->sync.recv_queue; - - while (1) { - VALUE v = Qundef; - - ractor_wait_receive(ec, cr, rq); - - RACTOR_LOCK_SELF(cr); - { - if (serial != rq->serial) { - serial = rq->serial; - index = 0; - } - - // check newer version - for (int i=index; i<rq->cnt; i++) { - if (!ractor_queue_skip_p(cr, rq, i)) { - struct rb_ractor_basket *b = ractor_queue_at(cr, rq, i); - v = ractor_basket_value(b); - b->type.e = basket_type_reserved; - rq->reserved_cnt++; - index = i; - break; - } - } - } - RACTOR_UNLOCK_SELF(cr); - - if (!UNDEF_P(v)) { - struct receive_block_data data = { - .cr = cr, - .th = cur_th, - .rq = rq, - .v = v, - .index = index, - .success = false, - }; - - VALUE result = rb_ensure(receive_if_body, (VALUE)&data, - receive_if_ensure, (VALUE)&data); - - if (!UNDEF_P(result)) return result; - index++; - } - - RUBY_VM_CHECK_INTS(ec); - } -} - -static void -ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_basket *b) -{ - bool closed = false; - - RACTOR_LOCK(r); - { - if (r->sync.incoming_port_closed) { - closed = true; - } - else { - ractor_queue_enq(r, &r->sync.recv_queue, b); - // wakeup any receiving thread in `r` - ractor_wakeup(r, NULL, wait_receiving, wakeup_by_send); - } - } - RACTOR_UNLOCK(r); - - if (closed) { - rb_raise(rb_eRactorClosedError, "The incoming-port is already closed"); - } -} - -// Ractor#send - -static VALUE ractor_move(VALUE obj); // in this file -static VALUE ractor_copy(VALUE obj); // in this file - -static void -ractor_basket_prepare_contents(VALUE obj, VALUE move, volatile VALUE *pobj, enum rb_ractor_basket_type *ptype) -{ - VALUE v; - enum rb_ractor_basket_type type; - - if (rb_ractor_shareable_p(obj)) { - type = basket_type_ref; - v = obj; - } - else if (!RTEST(move)) { - v = ractor_copy(obj); - type = basket_type_copy; - } - else { - type = basket_type_move; - v = ractor_move(obj); - } - - *pobj = v; - *ptype = type; -} - -static void -ractor_basket_fill_(rb_ractor_t *cr, rb_thread_t *cur_th, struct rb_ractor_basket *basket, VALUE obj, bool exc) -{ - VM_ASSERT(cr == GET_RACTOR()); - - basket->sender = cr->pub.self; - basket->sending_th = cur_th; - basket->p.send.exception = exc; - basket->p.send.v = obj; -} - -static void -ractor_basket_fill(rb_ractor_t *cr, rb_thread_t *cur_th, struct rb_ractor_basket *basket, VALUE obj, VALUE move, bool exc) -{ - VALUE v; - enum rb_ractor_basket_type type; - ractor_basket_prepare_contents(obj, move, &v, &type); - ractor_basket_fill_(cr, cur_th, basket, v, exc); - basket->type.e = type; -} - -static void -ractor_basket_fill_will(rb_ractor_t *cr, rb_thread_t *cur_th, struct rb_ractor_basket *basket, VALUE obj, bool exc) -{ - ractor_basket_fill_(cr, cur_th, basket, obj, exc); - basket->type.e = basket_type_will; -} - -static VALUE -ractor_send(rb_execution_context_t *ec, rb_ractor_t *recv_r, VALUE obj, VALUE move) -{ - struct rb_ractor_basket basket; - rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - // TODO: Ractor local GC - ractor_basket_fill(cr, cur_th, &basket, obj, move, false); - ractor_send_basket(ec, recv_r, &basket); - return recv_r->pub.self; -} - -// Ractor#take - -static bool -ractor_take_has_will(rb_ractor_t *r) -{ - ASSERT_ractor_locking(r); - - return basket_type_p(&r->sync.will_basket, basket_type_will); -} - -static bool -ractor_take_will(rb_ractor_t *r, struct rb_ractor_basket *b) -{ - ASSERT_ractor_locking(r); - - if (ractor_take_has_will(r)) { - *b = r->sync.will_basket; - r->sync.will_basket.type.e = basket_type_none; - return true; - } - else { - VM_ASSERT(basket_type_p(&r->sync.will_basket, basket_type_none)); - return false; - } -} - -static bool -ractor_take_will_lock(rb_ractor_t *r, struct rb_ractor_basket *b) -{ - ASSERT_ractor_unlocking(r); - bool taken; - - RACTOR_LOCK(r); - { - taken = ractor_take_will(r, b); - } - RACTOR_UNLOCK(r); - - return taken; -} - -static bool -ractor_register_take(rb_ractor_t *cr, rb_thread_t *cur_th, rb_ractor_t *r, struct rb_ractor_basket *take_basket, - bool is_take, struct rb_ractor_selector_take_config *config, bool ignore_error) -{ - struct rb_ractor_basket b = { - .type.e = basket_type_take_basket, - .sender = cr->pub.self, - .sending_th = cur_th, - .p = { - .take = { - .basket = take_basket, // pointer to our stack value saved in ractor `r` queue - .config = config, - }, - }, - }; - bool closed = false; - - RACTOR_LOCK(r); - { - if (is_take && ractor_take_will(r, take_basket)) { - RUBY_DEBUG_LOG("take over a will of r:%d", rb_ractor_id(r)); - } - else if (!is_take && ractor_take_has_will(r)) { - RUBY_DEBUG_LOG("has_will"); - VM_ASSERT(config != NULL); - config->closed = true; - } - else if (r->sync.outgoing_port_closed) { - closed = true; - } - else { - RUBY_DEBUG_LOG("register in r:%d", rb_ractor_id(r)); - ractor_queue_enq(r, &r->sync.takers_queue, &b); - - if (basket_none_p(take_basket)) { - // wakeup any thread in `r` that has yielded, if there is any. - ractor_wakeup(r, NULL, wait_yielding, wakeup_by_take); - } - } - } - RACTOR_UNLOCK(r); - - if (closed) { - if (!ignore_error) rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); - return false; - } - else { - return true; - } -} - -static bool -ractor_deregister_take(rb_ractor_t *r, struct rb_ractor_basket *take_basket) -{ - struct rb_ractor_queue *ts = &r->sync.takers_queue; - bool deleted = false; - - RACTOR_LOCK(r); - { - if (r->sync.outgoing_port_closed) { - // ok - } - else { - for (int i=0; i<ts->cnt; i++) { - struct rb_ractor_basket *b = ractor_queue_at(r, ts, i); - if (basket_type_p(b, basket_type_take_basket) && b->p.take.basket == take_basket) { - ractor_queue_delete(r, ts, b); - deleted = true; - } - } - if (deleted) { - ractor_queue_compact(r, ts); - } - } - } - RACTOR_UNLOCK(r); - - return deleted; -} - -static VALUE -ractor_try_take(rb_ractor_t *cr, rb_thread_t *cur_th, rb_ractor_t *recv_r, struct rb_ractor_basket *take_basket) -{ - bool taken; - - RACTOR_LOCK_SELF(cr); - { - // If it hasn't yielded yet or is currently in the process of yielding, sleep more - if (basket_none_p(take_basket) || basket_type_p(take_basket, basket_type_yielding)) { - taken = false; - } - else { - taken = true; // basket type might be, for ex, basket_type_copy if value was copied during yield - } - } - RACTOR_UNLOCK_SELF(cr); - - if (taken) { - RUBY_DEBUG_LOG("taken"); - if (basket_type_p(take_basket, basket_type_deleted)) { - VM_ASSERT(recv_r->sync.outgoing_port_closed); - rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); - } - return ractor_basket_accept(take_basket); - } - else { - RUBY_DEBUG_LOG("not taken"); - return Qundef; - } -} - - -#if VM_CHECK_MODE > 0 -static bool -ractor_check_specific_take_basket_lock(rb_ractor_t *r, struct rb_ractor_basket *tb) -{ - bool ret = false; - struct rb_ractor_queue *ts = &r->sync.takers_queue; - - RACTOR_LOCK(r); - { - for (int i=0; i<ts->cnt; i++) { - struct rb_ractor_basket *b = ractor_queue_at(r, ts, i); - if (basket_type_p(b, basket_type_take_basket) && b->p.take.basket == tb) { - ret = true; - break; - } - } - } - RACTOR_UNLOCK(r); - - return ret; -} -#endif - -// cleanup function, cr is unlocked -static void -ractor_take_cleanup(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *tb) -{ - retry: - if (basket_none_p(tb)) { // not yielded yet - if (!ractor_deregister_take(r, tb)) { - // not in r's takers queue - rb_thread_sleep(0); - goto retry; - } - } - else { - VM_ASSERT(!ractor_check_specific_take_basket_lock(r, tb)); - } -} - -struct take_wait_take_cleanup_data { - rb_ractor_t *r; - struct rb_ractor_basket *tb; -}; - -static void -ractor_wait_take_cleanup(rb_ractor_t *cr, void *ptr) -{ - struct take_wait_take_cleanup_data *data = (struct take_wait_take_cleanup_data *)ptr; - ractor_take_cleanup(cr, data->r, data->tb); -} - -static void -ractor_wait_take(rb_execution_context_t *ec, rb_ractor_t *cr, rb_thread_t *cur_th, rb_ractor_t *r, struct rb_ractor_basket *take_basket) -{ - struct take_wait_take_cleanup_data data = { - .r = r, - .tb = take_basket, - }; - - RACTOR_LOCK_SELF(cr); - { - if (basket_none_p(take_basket) || basket_type_p(take_basket, basket_type_yielding)) { - ractor_sleep_with_cleanup(ec, cr, cur_th, wait_taking, ractor_wait_take_cleanup, &data); - } - } - RACTOR_UNLOCK_SELF(cr); -} - -static VALUE -ractor_take(rb_execution_context_t *ec, rb_ractor_t *recv_r) -{ - RUBY_DEBUG_LOG("from r:%u", rb_ractor_id(recv_r)); - VALUE v; - rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - - struct rb_ractor_basket take_basket = { - .type.e = basket_type_none, - .sender = 0, - }; - - ractor_register_take(cr, cur_th, recv_r, &take_basket, true, NULL, false); - - while (UNDEF_P(v = ractor_try_take(cr, cur_th, recv_r, &take_basket))) { - ractor_wait_take(ec, cr, cur_th, recv_r, &take_basket); - } - - VM_ASSERT(!basket_none_p(&take_basket)); // might be, for ex, basket_type_copy - VM_ASSERT(!ractor_check_specific_take_basket_lock(recv_r, &take_basket)); - - return v; -} - -// Ractor.yield - -static bool -ractor_check_take_basket(rb_ractor_t *cr, struct rb_ractor_queue *rs) -{ - ASSERT_ractor_locking(cr); - - for (int i=0; i<rs->cnt; i++) { - struct rb_ractor_basket *b = ractor_queue_at(cr, rs, i); - if (basket_type_p(b, basket_type_take_basket) && - basket_none_p(b->p.take.basket)) { - return true; - } - } - - return false; -} - -// Find another ractor that is taking from this ractor, so we can yield to it -static bool -ractor_deq_take_basket(rb_ractor_t *cr, struct rb_ractor_queue *rs, struct rb_ractor_basket *b) -{ - ASSERT_ractor_unlocking(cr); - struct rb_ractor_basket *first_tb = NULL; - bool found = false; - - RACTOR_LOCK_SELF(cr); - { - while (ractor_queue_deq(cr, rs, b)) { - if (basket_type_p(b, basket_type_take_basket)) { // some other ractor is taking - struct rb_ractor_basket *tb = b->p.take.basket; - - if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_yielding) == basket_type_none) { - found = true; // payload basket is now "yielding" type - break; - } - else { - ractor_queue_enq(cr, rs, b); - if (first_tb == NULL) first_tb = tb; - struct rb_ractor_basket *head = ractor_queue_head(cr, rs); - VM_ASSERT(head != NULL); - if (basket_type_p(head, basket_type_take_basket) && head->p.take.basket == first_tb) { - break; // loop detected - } - } - } - else { - VM_ASSERT(basket_none_p(b)); - } - } - - if (found && b->p.take.config && !b->p.take.config->oneshot) { - ractor_queue_enq(cr, rs, b); - } - } - RACTOR_UNLOCK_SELF(cr); - - return found; -} - -// Try yielding to a taking ractor -static bool -ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *ts, volatile VALUE obj, VALUE move, bool exc, bool is_will) -{ - // Don't lock yielding ractor at same time as taking ractor. This could deadlock due to timing - // issue because we don't have a lock hierarchy. - ASSERT_ractor_unlocking(cr); - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - - struct rb_ractor_basket b; - - if (ractor_deq_take_basket(cr, ts, &b)) { // deq a take basket from takers queue of `cr` into `b` - VM_ASSERT(basket_type_p(&b, basket_type_take_basket)); - VM_ASSERT(basket_type_p(b.p.take.basket, basket_type_yielding)); - - rb_ractor_t *tr = RACTOR_PTR(b.sender); // taking ractor - rb_thread_t *tr_th = b.sending_th; // taking thread - struct rb_ractor_basket *tb = b.p.take.basket; // payload basket - enum rb_ractor_basket_type type; - - RUBY_DEBUG_LOG("basket from r:%u", rb_ractor_id(tr)); - - if (is_will) { - type = basket_type_will; // last message - } - else { - enum ruby_tag_type state; - - // begin - EC_PUSH_TAG(ec); - if ((state = EC_EXEC_TAG()) == TAG_NONE) { - // TODO: Ractor local GC - ractor_basket_prepare_contents(obj, move, &obj, &type); - } - EC_POP_TAG(); - // rescue ractor copy/move error, then re-raise - if (state) { - RACTOR_LOCK_SELF(cr); - { - b.p.take.basket->type.e = basket_type_none; - ractor_queue_enq(cr, ts, &b); - } - RACTOR_UNLOCK_SELF(cr); - EC_JUMP_TAG(ec, state); - } - } - - RACTOR_LOCK(tr); - { - VM_ASSERT(basket_type_p(tb, basket_type_yielding)); - // fill atomic - RUBY_DEBUG_LOG("fill %sbasket from r:%u", is_will ? "will " : "", rb_ractor_id(tr)); - ractor_basket_fill_(cr, cur_th, tb, obj, exc); // fill the take basket payload - if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_yielding, type) != basket_type_yielding) { - rb_bug("unreachable"); - } - ractor_wakeup(tr, tr_th, wait_taking, wakeup_by_yield); - } - RACTOR_UNLOCK(tr); - - return true; - } - else if (cr->sync.outgoing_port_closed) { - rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); - } - else { - RUBY_DEBUG_LOG("no take basket"); - return false; - } -} - -static void -ractor_wait_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *ts) -{ - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - RACTOR_LOCK_SELF(cr); - { - while (!ractor_check_take_basket(cr, ts) && !cr->sync.outgoing_port_closed) { - ractor_sleep(ec, cr, cur_th, wait_yielding); - } - } - RACTOR_UNLOCK_SELF(cr); -} - -// In order to yield, we wait until our takers queue has at least one element. Then, we wakeup a taker. -static VALUE -ractor_yield(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE obj, VALUE move) -{ - struct rb_ractor_queue *ts = &cr->sync.takers_queue; - - while (!ractor_try_yield(ec, cr, ts, obj, move, false, false)) { - ractor_wait_yield(ec, cr, ts); - } - - return Qnil; -} - -// Ractor::Selector - -struct rb_ractor_selector { - rb_ractor_t *r; - struct rb_ractor_basket take_basket; - st_table *take_ractors; // rb_ractor_t * => (struct rb_ractor_selector_take_config *) -}; - -static int -ractor_selector_mark_ractors_i(st_data_t key, st_data_t value, st_data_t data) -{ - const rb_ractor_t *r = (rb_ractor_t *)key; - rb_gc_mark(r->pub.self); - return ST_CONTINUE; -} - -static void -ractor_selector_mark(void *ptr) -{ - struct rb_ractor_selector *s = ptr; - - if (s->take_ractors) { - st_foreach(s->take_ractors, ractor_selector_mark_ractors_i, 0); - } - - switch (s->take_basket.type.e) { - case basket_type_ref: - case basket_type_copy: - case basket_type_move: - case basket_type_will: - rb_gc_mark(s->take_basket.sender); - rb_gc_mark(s->take_basket.p.send.v); - break; - default: - break; - } -} - -static int -ractor_selector_release_i(st_data_t key, st_data_t val, st_data_t data) -{ - struct rb_ractor_selector *s = (struct rb_ractor_selector *)data; - struct rb_ractor_selector_take_config *config = (struct rb_ractor_selector_take_config *)val; - - if (!config->closed) { - ractor_deregister_take((rb_ractor_t *)key, &s->take_basket); - } - free(config); - return ST_CONTINUE; -} - -static void -ractor_selector_free(void *ptr) -{ - struct rb_ractor_selector *s = ptr; - st_foreach(s->take_ractors, ractor_selector_release_i, (st_data_t)s); - st_free_table(s->take_ractors); - ruby_xfree(ptr); -} - -static size_t -ractor_selector_memsize(const void *ptr) -{ - const struct rb_ractor_selector *s = ptr; - return sizeof(struct rb_ractor_selector) + - st_memsize(s->take_ractors) + - s->take_ractors->num_entries * sizeof(struct rb_ractor_selector_take_config); -} - -static const rb_data_type_t ractor_selector_data_type = { - "ractor/selector", - { - ractor_selector_mark, - ractor_selector_free, - ractor_selector_memsize, - NULL, // update - }, - 0, 0, RUBY_TYPED_FREE_IMMEDIATELY, -}; - -static struct rb_ractor_selector * -RACTOR_SELECTOR_PTR(VALUE selv) -{ - VM_ASSERT(rb_typeddata_is_kind_of(selv, &ractor_selector_data_type)); - - return (struct rb_ractor_selector *)DATA_PTR(selv); -} - -// Ractor::Selector.new - -static VALUE -ractor_selector_create(VALUE klass) -{ - struct rb_ractor_selector *s; - VALUE selv = TypedData_Make_Struct(klass, struct rb_ractor_selector, &ractor_selector_data_type, s); - s->take_basket.type.e = basket_type_reserved; - s->take_ractors = st_init_numtable(); // ractor (ptr) -> take_config - return selv; -} - -// Ractor::Selector#add(r) - -/* - * call-seq: - * add(ractor) -> ractor - * - * Adds _ractor_ to +self+. Raises an exception if _ractor_ is already added. - * Returns _ractor_. - */ -static VALUE -ractor_selector_add(VALUE selv, VALUE rv) -{ - if (!rb_ractor_p(rv)) { - rb_raise(rb_eArgError, "Not a ractor object"); - } - - rb_ractor_t *r = RACTOR_PTR(rv); - struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv); - - if (st_lookup(s->take_ractors, (st_data_t)r, NULL)) { - rb_raise(rb_eArgError, "already added"); - } - - struct rb_ractor_selector_take_config *config = malloc(sizeof(struct rb_ractor_selector_take_config)); - VM_ASSERT(config != NULL); - config->closed = false; - config->oneshot = false; - - if (ractor_register_take(GET_RACTOR(), GET_THREAD(), r, &s->take_basket, false, config, true)) { - st_insert(s->take_ractors, (st_data_t)r, (st_data_t)config); - } - - return rv; -} - -// Ractor::Selector#remove(r) - -/* call-seq: - * remove(ractor) -> ractor - * - * Removes _ractor_ from +self+. Raises an exception if _ractor_ is not added. - * Returns the removed _ractor_. - */ -static VALUE -ractor_selector_remove(VALUE selv, VALUE rv) -{ - if (!rb_ractor_p(rv)) { - rb_raise(rb_eArgError, "Not a ractor object"); - } - - rb_ractor_t *r = RACTOR_PTR(rv); - struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv); - - RUBY_DEBUG_LOG("r:%u", rb_ractor_id(r)); - - if (!st_lookup(s->take_ractors, (st_data_t)r, NULL)) { - rb_raise(rb_eArgError, "not added yet"); - } - - ractor_deregister_take(r, &s->take_basket); - struct rb_ractor_selector_take_config *config; - st_delete(s->take_ractors, (st_data_t *)&r, (st_data_t *)&config); - free(config); - - return rv; -} - -// Ractor::Selector#clear - -struct ractor_selector_clear_data { - VALUE selv; - rb_execution_context_t *ec; -}; - -static int -ractor_selector_clear_i(st_data_t key, st_data_t val, st_data_t data) -{ - VALUE selv = (VALUE)data; - rb_ractor_t *r = (rb_ractor_t *)key; - ractor_selector_remove(selv, r->pub.self); - return ST_CONTINUE; -} - -/* - * call-seq: - * clear -> self - * - * Removes all ractors from +self+. Raises +self+. - */ -static VALUE -ractor_selector_clear(VALUE selv) -{ - struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv); - - st_foreach(s->take_ractors, ractor_selector_clear_i, (st_data_t)selv); - st_clear(s->take_ractors); - return selv; -} - -/* - * call-seq: - * empty? -> true or false - * - * Returns +true+ if no ractor is added. - */ -static VALUE -ractor_selector_empty_p(VALUE selv) -{ - struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv); - return s->take_ractors->num_entries == 0 ? Qtrue : Qfalse; -} - -static int -ractor_selector_wait_i(st_data_t key, st_data_t val, st_data_t dat) -{ - rb_ractor_t *r = (rb_ractor_t *)key; - struct rb_ractor_basket *tb = (struct rb_ractor_basket *)dat; - int ret; - - if (!basket_none_p(tb)) { - RUBY_DEBUG_LOG("already taken:%s", basket_type_name(tb->type.e)); - return ST_STOP; - } - - RACTOR_LOCK(r); - { - if (basket_type_p(&r->sync.will_basket, basket_type_will)) { - RUBY_DEBUG_LOG("r:%u has will", rb_ractor_id(r)); - - if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_will) == basket_type_none) { - ractor_take_will(r, tb); - ret = ST_STOP; - } - else { - RUBY_DEBUG_LOG("has will, but already taken (%s)", basket_type_name(tb->type.e)); - ret = ST_CONTINUE; - } - } - else if (r->sync.outgoing_port_closed) { - RUBY_DEBUG_LOG("r:%u is closed", rb_ractor_id(r)); - - if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_deleted) == basket_type_none) { - tb->sender = r->pub.self; - ret = ST_STOP; - } - else { - RUBY_DEBUG_LOG("closed, but already taken (%s)", basket_type_name(tb->type.e)); - ret = ST_CONTINUE; - } - } - else { - RUBY_DEBUG_LOG("wakeup r:%u", rb_ractor_id(r)); - ractor_wakeup(r, NULL, wait_yielding, wakeup_by_take); - ret = ST_CONTINUE; - } - } - RACTOR_UNLOCK(r); - - return ret; -} - -// Ractor::Selector#wait - -// cleanup function, cr is unlocked -static void -ractor_selector_wait_cleanup(rb_ractor_t *cr, void *ptr) -{ - struct rb_ractor_basket *tb = (struct rb_ractor_basket *)ptr; - - RACTOR_LOCK_SELF(cr); - { - while (basket_type_p(tb, basket_type_yielding)) { - RACTOR_UNLOCK_SELF(cr); - { - rb_thread_sleep(0); - } - RACTOR_LOCK_SELF(cr); - } - // if tb->type is not none, taking is succeeded, but interruption ignore it unfortunately. - tb->type.e = basket_type_reserved; - } - RACTOR_UNLOCK_SELF(cr); -} - -/* :nodoc: */ -static VALUE -ractor_selector__wait(VALUE selv, VALUE do_receivev, VALUE do_yieldv, VALUE yield_value, VALUE move) -{ - rb_execution_context_t *ec = GET_EC(); - struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv); - struct rb_ractor_basket *tb = &s->take_basket; - struct rb_ractor_basket taken_basket; - rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - bool do_receive = !!RTEST(do_receivev); - bool do_yield = !!RTEST(do_yieldv); - VALUE ret_v, ret_r; - enum rb_ractor_wait_status wait_status; - struct rb_ractor_queue *rq = &cr->sync.recv_queue; - struct rb_ractor_queue *ts = &cr->sync.takers_queue; - - RUBY_DEBUG_LOG("start"); - - retry: - RUBY_DEBUG_LOG("takers:%ld", s->take_ractors->num_entries); - - // setup wait_status - wait_status = wait_none; - if (s->take_ractors->num_entries > 0) wait_status |= wait_taking; - if (do_receive) wait_status |= wait_receiving; - if (do_yield) wait_status |= wait_yielding; - - RUBY_DEBUG_LOG("wait:%s", wait_status_str(wait_status)); - - if (wait_status == wait_none) { - rb_raise(rb_eRactorError, "no taking ractors"); - } - - // check recv_queue - if (do_receive && !UNDEF_P(ret_v = ractor_try_receive(ec, cr, rq))) { - ret_r = ID2SYM(rb_intern("receive")); - goto success; - } - - // check takers - if (do_yield && ractor_try_yield(ec, cr, ts, yield_value, move, false, false)) { - ret_v = Qnil; - ret_r = ID2SYM(rb_intern("yield")); - goto success; - } - - // check take_basket - VM_ASSERT(basket_type_p(&s->take_basket, basket_type_reserved)); - s->take_basket.type.e = basket_type_none; - // kick all take target ractors - st_foreach(s->take_ractors, ractor_selector_wait_i, (st_data_t)tb); - - RACTOR_LOCK_SELF(cr); - { - retry_waiting: - while (1) { - if (!basket_none_p(tb)) { - RUBY_DEBUG_LOG("taken:%s from r:%u", basket_type_name(tb->type.e), - tb->sender ? rb_ractor_id(RACTOR_PTR(tb->sender)) : 0); - break; - } - if (do_receive && !ractor_queue_empty_p(cr, rq)) { - RUBY_DEBUG_LOG("can receive (%d)", rq->cnt); - break; - } - if (do_yield && ractor_check_take_basket(cr, ts)) { - RUBY_DEBUG_LOG("can yield"); - break; - } - - ractor_sleep_with_cleanup(ec, cr, cur_th, wait_status, ractor_selector_wait_cleanup, tb); - } - - taken_basket = *tb; - - // ensure - // tb->type.e = basket_type_reserved # do it atomic in the following code - if (taken_basket.type.e == basket_type_yielding || - RUBY_ATOMIC_CAS(tb->type.atomic, taken_basket.type.e, basket_type_reserved) != taken_basket.type.e) { - - if (basket_type_p(tb, basket_type_yielding)) { - RACTOR_UNLOCK_SELF(cr); - { - rb_thread_sleep(0); - } - RACTOR_LOCK_SELF(cr); - } - goto retry_waiting; - } - } - RACTOR_UNLOCK_SELF(cr); - - // check the taken result - switch (taken_basket.type.e) { - case basket_type_none: - VM_ASSERT(do_receive || do_yield); - goto retry; - case basket_type_yielding: - rb_bug("unreachable"); - case basket_type_deleted: { - ractor_selector_remove(selv, taken_basket.sender); - - rb_ractor_t *r = RACTOR_PTR(taken_basket.sender); - if (ractor_take_will_lock(r, &taken_basket)) { - RUBY_DEBUG_LOG("has_will"); - } - else { - RUBY_DEBUG_LOG("no will"); - // rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); - // remove and retry wait - goto retry; - } - break; - } - case basket_type_will: - // no more messages - ractor_selector_remove(selv, taken_basket.sender); - break; - default: - break; - } - - RUBY_DEBUG_LOG("taken_basket:%s", basket_type_name(taken_basket.type.e)); - - ret_v = ractor_basket_accept(&taken_basket); - ret_r = taken_basket.sender; - success: - return rb_ary_new_from_args(2, ret_r, ret_v); -} - -/* - * call-seq: - * wait(receive: false, yield_value: undef, move: false) -> [ractor, value] - * - * Waits until any ractor in _selector_ can be active. - */ -static VALUE -ractor_selector_wait(int argc, VALUE *argv, VALUE selector) -{ - VALUE options; - ID keywords[3]; - VALUE values[3]; - - keywords[0] = rb_intern("receive"); - keywords[1] = rb_intern("yield_value"); - keywords[2] = rb_intern("move"); - - rb_scan_args(argc, argv, "0:", &options); - rb_get_kwargs(options, keywords, 0, numberof(values), values); - return ractor_selector__wait(selector, - values[0] == Qundef ? Qfalse : RTEST(values[0]), - values[1] != Qundef, values[1], values[2]); -} - -static VALUE -ractor_selector_new(int argc, VALUE *ractors, VALUE klass) -{ - VALUE selector = ractor_selector_create(klass); - - for (int i=0; i<argc; i++) { - ractor_selector_add(selector, ractors[i]); - } - - return selector; -} - -static VALUE -ractor_select_internal(rb_execution_context_t *ec, VALUE self, VALUE ractors, VALUE do_receive, VALUE do_yield, VALUE yield_value, VALUE move) -{ - VALUE selector = ractor_selector_new(RARRAY_LENINT(ractors), (VALUE *)RARRAY_CONST_PTR(ractors), rb_cRactorSelector); - VALUE result; - int state; - - EC_PUSH_TAG(ec); - if ((state = EC_EXEC_TAG()) == TAG_NONE) { - result = ractor_selector__wait(selector, do_receive, do_yield, yield_value, move); - } - EC_POP_TAG(); - if (state != TAG_NONE) { - // ensure - ractor_selector_clear(selector); - - // jump - EC_JUMP_TAG(ec, state); - } - - RB_GC_GUARD(ractors); - return result; -} - -// Ractor#close_incoming - -static VALUE -ractor_close_incoming(rb_execution_context_t *ec, rb_ractor_t *r) -{ - VALUE prev; - rb_thread_t *r_th = NULL; - if (r == rb_ec_ractor_ptr(ec)) { - r_th = rb_ec_thread_ptr(ec); - } - - RACTOR_LOCK(r); - { - if (!r->sync.incoming_port_closed) { - prev = Qfalse; - r->sync.incoming_port_closed = true; - if (ractor_wakeup(r, r_th, wait_receiving, wakeup_by_close)) { - VM_ASSERT(ractor_queue_empty_p(r, &r->sync.recv_queue)); - RUBY_DEBUG_LOG("cancel receiving"); - } - } - else { - prev = Qtrue; - } - } - RACTOR_UNLOCK(r); - return prev; -} - -// Ractor#close_outgoing - -static VALUE -ractor_close_outgoing(rb_execution_context_t *ec, rb_ractor_t *r) -{ - VALUE prev; - - RACTOR_LOCK(r); - { - struct rb_ractor_queue *ts = &r->sync.takers_queue; - rb_ractor_t *tr; - struct rb_ractor_basket b; - - if (!r->sync.outgoing_port_closed) { - prev = Qfalse; - r->sync.outgoing_port_closed = true; - } - else { - VM_ASSERT(ractor_queue_empty_p(r, ts)); - prev = Qtrue; - } - - // wakeup all taking ractors - while (ractor_queue_deq(r, ts, &b)) { - if (basket_type_p(&b, basket_type_take_basket)) { - tr = RACTOR_PTR(b.sender); - rb_thread_t *tr_th = b.sending_th; - struct rb_ractor_basket *tb = b.p.take.basket; - - if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_yielding) == basket_type_none) { - b.p.take.basket->sender = r->pub.self; - if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_yielding, basket_type_deleted) != basket_type_yielding) { - rb_bug("unreachable"); - } - RUBY_DEBUG_LOG("set delete for r:%u", rb_ractor_id(RACTOR_PTR(b.sender))); - } - - if (b.p.take.config) { - b.p.take.config->closed = true; - } - - // TODO: deadlock-able? - RACTOR_LOCK(tr); - { - ractor_wakeup(tr, tr_th, wait_taking, wakeup_by_close); - } - RACTOR_UNLOCK(tr); - } - } - - // raising yielding Ractor - ractor_wakeup(r, NULL, wait_yielding, wakeup_by_close); - - VM_ASSERT(ractor_queue_empty_p(r, ts)); - } - RACTOR_UNLOCK(r); - return prev; -} +#include "ractor_sync.c" // creation/termination @@ -2129,6 +459,7 @@ ractor_alloc(VALUE klass) VALUE rv = TypedData_Make_Struct(klass, rb_ractor_t, &ractor_data_type, r); FL_SET_RAW(rv, RUBY_FL_SHAREABLE); r->pub.self = rv; + r->next_ec_serial = 1; VM_ASSERT(ractor_status_p(r, ractor_created)); return rv; } @@ -2146,6 +477,7 @@ rb_ractor_main_alloc(void) r->name = Qnil; r->pub.self = Qnil; r->newobj_cache = rb_gc_ractor_cache_alloc(r); + r->next_ec_serial = 1; ruby_single_main_ractor = r; return r; @@ -2176,9 +508,7 @@ rb_ractor_terminate_atfork(rb_vm_t *vm, rb_ractor_t *r) rb_gc_ractor_cache_free(r->newobj_cache); r->newobj_cache = NULL; r->status_ = ractor_terminated; - r->sync.outgoing_port_closed = true; - r->sync.incoming_port_closed = true; - r->sync.will_basket.type.e = basket_type_none; + ractor_sync_terminate_atfork(vm, r); } #endif @@ -2195,15 +525,9 @@ rb_ractor_living_threads_init(rb_ractor_t *r) static void ractor_init(rb_ractor_t *r, VALUE name, VALUE loc) { - ractor_queue_setup(&r->sync.recv_queue); - ractor_queue_setup(&r->sync.takers_queue); - rb_native_mutex_initialize(&r->sync.lock); - rb_native_cond_initialize(&r->barrier_wait_cond); - -#ifdef RUBY_THREAD_WIN32_H - rb_native_cond_initialize(&r->barrier_wait_cond); -#endif - ccan_list_head_init(&r->sync.wait.waiting_threads); + ractor_sync_init(r); + r->pub.targeted_hooks = st_init_numtable(); + r->pub.hooks.type = hook_list_type_ractor_local; // thread management rb_thread_sched_init(&r->threads.sched, false); @@ -2218,20 +542,24 @@ ractor_init(rb_ractor_t *r, VALUE name, VALUE loc) rb_raise(rb_eArgError, "ASCII incompatible encoding (%s)", rb_enc_name(enc)); } - name = rb_str_new_frozen(name); + name = RB_OBJ_SET_SHAREABLE(rb_str_new_frozen(name)); } - r->name = name; + + if (!SPECIAL_CONST_P(loc)) RB_OBJ_SET_SHAREABLE(loc); r->loc = loc; + r->name = name; } void rb_ractor_main_setup(rb_vm_t *vm, rb_ractor_t *r, rb_thread_t *th) { - r->pub.self = TypedData_Wrap_Struct(rb_cRactor, &ractor_data_type, r); + VALUE rv = r->pub.self = TypedData_Wrap_Struct(rb_cRactor, &ractor_data_type, r); FL_SET_RAW(r->pub.self, RUBY_FL_SHAREABLE); ractor_init(r, Qnil, Qnil); r->threads.main = th; rb_ractor_living_threads_insert(r, th); + + RB_GC_GUARD(rv); } static VALUE @@ -2241,7 +569,6 @@ ractor_create(rb_execution_context_t *ec, VALUE self, VALUE loc, VALUE name, VAL rb_ractor_t *r = RACTOR_PTR(rv); ractor_init(r, name, loc); - // can block here r->pub.id = ractor_next_id(); RUBY_DEBUG_LOG("r:%u", r->pub.id); @@ -2250,90 +577,59 @@ ractor_create(rb_execution_context_t *ec, VALUE self, VALUE loc, VALUE name, VAL r->debug = cr->debug; rb_yjit_before_ractor_spawn(); + rb_zjit_before_ractor_spawn(); rb_thread_create_ractor(r, args, block); RB_GC_GUARD(rv); return rv; } +#if 0 static VALUE ractor_create_func(VALUE klass, VALUE loc, VALUE name, VALUE args, rb_block_call_func_t func) { VALUE block = rb_proc_new(func, Qnil); return ractor_create(rb_current_ec_noinline(), klass, loc, name, args, block); } +#endif static void -ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool exc) +ractor_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE result, bool exc) { - if (cr->sync.outgoing_port_closed) { - return; - } - - ASSERT_ractor_unlocking(cr); - - struct rb_ractor_queue *ts = &cr->sync.takers_queue; - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - - retry: - if (ractor_try_yield(ec, cr, ts, v, Qfalse, exc, true)) { - // OK. - } - else { - bool retry = false; - RACTOR_LOCK(cr); - { - if (!ractor_check_take_basket(cr, ts)) { - VM_ASSERT(cur_th->ractor_waiting.wait_status == wait_none); - RUBY_DEBUG_LOG("leave a will"); - ractor_basket_fill_will(cr, cur_th, &cr->sync.will_basket, v, exc); - } - else { - RUBY_DEBUG_LOG("rare timing!"); - retry = true; // another ractor is waiting for the yield. - } - } - RACTOR_UNLOCK(cr); - - if (retry) goto retry; - } + ractor_notify_exit(ec, cr, result, exc); } void rb_ractor_atexit(rb_execution_context_t *ec, VALUE result) { rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - ractor_yield_atexit(ec, cr, result, false); + ractor_atexit(ec, cr, result, false); } void rb_ractor_atexit_exception(rb_execution_context_t *ec) { rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - ractor_yield_atexit(ec, cr, ec->errinfo, true); + ractor_atexit(ec, cr, ec->errinfo, true); } void rb_ractor_teardown(rb_execution_context_t *ec) { rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - ractor_close_incoming(ec, cr); - ractor_close_outgoing(ec, cr); // sync with rb_ractor_terminate_interrupt_main_thread() - RB_VM_LOCK_ENTER(); - { + RB_VM_LOCKING() { VM_ASSERT(cr->threads.main != NULL); cr->threads.main = NULL; } - RB_VM_LOCK_LEAVE(); } void rb_ractor_receive_parameters(rb_execution_context_t *ec, rb_ractor_t *r, int len, VALUE *ptr) { for (int i=0; i<len; i++) { - ptr[i] = ractor_receive(ec, r); + ptr[i] = ractor_receive(ec, ractor_default_port(r)); } } @@ -2342,7 +638,7 @@ rb_ractor_send_parameters(rb_execution_context_t *ec, rb_ractor_t *r, VALUE args { int len = RARRAY_LENINT(args); for (int i=0; i<len; i++) { - ractor_send(ec, r, RARRAY_AREF(args, i), false); + ractor_send(ec, ractor_default_port(r), RARRAY_AREF(args, i), false); } } @@ -2354,14 +650,6 @@ rb_ractor_main_p_(void) return rb_ec_ractor_ptr(ec) == rb_ec_vm_ptr(ec)->ractor.main_ractor; } -bool -rb_obj_is_main_ractor(VALUE gv) -{ - if (!rb_ractor_p(gv)) return false; - rb_ractor_t *r = DATA_PTR(gv); - return r == GET_VM()->ractor.main_ractor; -} - int rb_ractor_living_thread_num(const rb_ractor_t *r) { @@ -2459,11 +747,9 @@ ractor_check_blocking(rb_ractor_t *cr, unsigned int remained_thread_cnt, const c // change ractor status: running -> blocking rb_vm_t *vm = GET_VM(); - RB_VM_LOCK_ENTER(); - { + RB_VM_LOCKING() { rb_vm_ractor_blocking_cnt_inc(vm, cr, file, line); } - RB_VM_LOCK_LEAVE(); } } @@ -2515,11 +801,9 @@ rb_ractor_blocking_threads_dec(rb_ractor_t *cr, const char *file, int line) if (cr->threads.cnt == cr->threads.blocking_cnt) { rb_vm_t *vm = GET_VM(); - RB_VM_LOCK_ENTER(); - { + RB_VM_LOCKING() { rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__); } - RB_VM_LOCK_LEAVE(); } cr->threads.blocking_cnt--; @@ -2593,13 +877,11 @@ rb_ractor_terminate_all(void) VM_ASSERT(cr == GET_RACTOR()); // only main-ractor's main-thread should kick it. - if (vm->ractor.cnt > 1) { - RB_VM_LOCK(); - { - ractor_terminal_interrupt_all(vm); // kill all ractors - } - RB_VM_UNLOCK(); + RB_VM_LOCK(); + { + ractor_terminal_interrupt_all(vm); // kill all ractors } + RB_VM_UNLOCK(); rb_thread_terminate_all(GET_THREAD()); // kill other threads in main-ractor and wait RB_VM_LOCK(); @@ -2612,6 +894,17 @@ rb_ractor_terminate_all(void) rb_vm_ractor_blocking_cnt_inc(vm, cr, __FILE__, __LINE__); rb_del_running_thread(rb_ec_thread_ptr(cr->threads.running_ec)); rb_vm_cond_timedwait(vm, &vm->ractor.sync.terminate_cond, 1000 /* ms */); +#ifdef RUBY_THREAD_PTHREAD_H + while (vm->ractor.sched.barrier_waiting) { + // A barrier is waiting. Threads relinquish the VM lock before joining the barrier and + // since we just acquired the VM lock back, we're blocking other threads from joining it. + // We loop until the barrier is over. We can't join this barrier because our thread isn't added to + // running_threads until the call below to `rb_add_running_thread`. + RB_VM_UNLOCK(); + unsigned int lev; + RB_VM_LOCK_ENTER_LEV_NB(&lev); + } +#endif rb_add_running_thread(rb_ec_thread_ptr(cr->threads.running_ec)); rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__); @@ -2649,83 +942,47 @@ ractor_moved_missing(int argc, VALUE *argv, VALUE self) rb_raise(rb_eRactorMovedError, "can not send any methods to a moved object"); } -#ifndef USE_RACTOR_SELECTOR -#define USE_RACTOR_SELECTOR 0 -#endif - -RUBY_SYMBOL_EXPORT_BEGIN -void rb_init_ractor_selector(void); -RUBY_SYMBOL_EXPORT_END - /* - * Document-class: Ractor::Selector - * :nodoc: currently + * Document-class: Ractor::Error * - * Selects multiple Ractors to be activated. + * The parent class of Ractor-related error classes. */ -void -rb_init_ractor_selector(void) -{ - rb_cRactorSelector = rb_define_class_under(rb_cRactor, "Selector", rb_cObject); - rb_undef_alloc_func(rb_cRactorSelector); - - rb_define_singleton_method(rb_cRactorSelector, "new", ractor_selector_new , -1); - rb_define_method(rb_cRactorSelector, "add", ractor_selector_add, 1); - rb_define_method(rb_cRactorSelector, "remove", ractor_selector_remove, 1); - rb_define_method(rb_cRactorSelector, "clear", ractor_selector_clear, 0); - rb_define_method(rb_cRactorSelector, "empty?", ractor_selector_empty_p, 0); - rb_define_method(rb_cRactorSelector, "wait", ractor_selector_wait, -1); - rb_define_method(rb_cRactorSelector, "_wait", ractor_selector__wait, 4); -} /* * Document-class: Ractor::ClosedError * * Raised when an attempt is made to send a message to a closed port, * or to retrieve a message from a closed and empty port. - * Ports may be closed explicitly with Ractor#close_outgoing/close_incoming + * Ports may be closed explicitly with Ractor::Port#close * and are closed implicitly when a Ractor terminates. * - * r = Ractor.new { sleep(500) } - * r.close_outgoing - * r.take # Ractor::ClosedError - * - * ClosedError is a descendant of StopIteration, so the closing of the ractor will break - * the loops without propagating the error: - * - * r = Ractor.new do - * loop do - * msg = receive # raises ClosedError and loop traps it - * puts "Received: #{msg}" - * end - * puts "loop exited" - * end + * port = Ractor::Port.new + * port.close + * port << "test" # Ractor::ClosedError + * port.receive # Ractor::ClosedError * - * 3.times{|i| r << i} - * r.close_incoming - * r.take - * puts "Continue successfully" - * - * This will print: + * ClosedError is a descendant of StopIteration, so the closing of a port will break + * out of loops without propagating the error. + */ + +/* + * Document-class: Ractor::IsolationError * - * Received: 0 - * Received: 1 - * Received: 2 - * loop exited - * Continue successfully + * Raised on attempt to make a Ractor-unshareable object + * Ractor-shareable. */ /* * Document-class: Ractor::RemoteError * - * Raised on attempt to Ractor#take if there was an uncaught exception in the Ractor. + * Raised on Ractor#join or Ractor#value if there was an uncaught exception in the Ractor. * Its +cause+ will contain the original exception, and +ractor+ is the original ractor * it was raised in. * * r = Ractor.new { raise "Something weird happened" } * * begin - * r.take + * r.value * rescue => e * p e # => #<Ractor::RemoteError: thrown by remote Ractor.> * p e.ractor == r # => true @@ -2737,7 +994,7 @@ rb_init_ractor_selector(void) /* * Document-class: Ractor::MovedError * - * Raised on an attempt to access an object which was moved in Ractor#send or Ractor.yield. + * Raised on an attempt to access an object which was moved in Ractor#send or Ractor::Port#send. * * r = Ractor.new { sleep } * @@ -2752,7 +1009,7 @@ rb_init_ractor_selector(void) * Document-class: Ractor::MovedObject * * A special object which replaces any value that was moved to another ractor in Ractor#send - * or Ractor.yield. Any attempt to access the object results in Ractor::MovedError. + * or Ractor::Port#send. Any attempt to access the object results in Ractor::MovedError. * * r = Ractor.new { receive } * @@ -2764,6 +1021,12 @@ rb_init_ractor_selector(void) * # Ractor::MovedError (can not send any methods to a moved object) */ +/* + * Document-class: Ractor::UnsafeError + * + * Raised when Ractor-unsafe C-methods is invoked by a non-main Ractor. + */ + // Main docs are in ractor.rb, but without this clause there are weird artifacts // in their rendering. /* @@ -2798,11 +1061,7 @@ Init_Ractor(void) rb_define_method(rb_cRactorMovedObject, "instance_eval", ractor_moved_missing, -1); rb_define_method(rb_cRactorMovedObject, "instance_exec", ractor_moved_missing, -1); - // internal - -#if USE_RACTOR_SELECTOR - rb_init_ractor_selector(); -#endif + Init_RactorPort(); } void @@ -2896,6 +1155,50 @@ rb_ractor_hooks(rb_ractor_t *cr) return &cr->pub.hooks; } +st_table * +rb_ractor_targeted_hooks(rb_ractor_t *cr) +{ + return cr->pub.targeted_hooks; +} + +static void +rb_obj_set_shareable_no_assert(VALUE obj) +{ + FL_SET_RAW(obj, FL_SHAREABLE); + + if (rb_obj_gen_fields_p(obj)) { + VALUE fields = rb_obj_fields_no_ractor_check(obj); + if (imemo_type_p(fields, imemo_fields)) { + // no recursive mark + FL_SET_RAW(fields, FL_SHAREABLE); + } + } +} + +#ifndef STRICT_VERIFY_SHAREABLE +#define STRICT_VERIFY_SHAREABLE 0 +#endif + +bool +rb_ractor_verify_shareable(VALUE obj) +{ +#if STRICT_VERIFY_SHAREABLE + rb_gc_verify_shareable(obj); +#endif + return true; +} + +VALUE +rb_obj_set_shareable(VALUE obj) +{ + RUBY_ASSERT(!RB_SPECIAL_CONST_P(obj)); + + rb_obj_set_shareable_no_assert(obj); + RUBY_ASSERT(rb_ractor_verify_shareable(obj)); + + return obj; +} + /// traverse function // 2: stop search @@ -2996,6 +1299,7 @@ obj_traverse_i(VALUE obj, struct obj_traverse_data *data) // already traversed return 0; } + RB_OBJ_WRITTEN(data->rec_hash, Qundef, obj); struct obj_traverse_callback_data d = { .stop = false, @@ -3012,7 +1316,6 @@ obj_traverse_i(VALUE obj, struct obj_traverse_data *data) case T_REGEXP: case T_FILE: case T_SYMBOL: - case T_MATCH: break; case T_OBJECT: @@ -3021,6 +1324,8 @@ obj_traverse_i(VALUE obj, struct obj_traverse_data *data) case T_ARRAY: { + rb_ary_cancel_sharing(obj); + for (int i = 0; i < RARRAY_LENINT(obj); i++) { VALUE e = rb_ary_entry(obj, i); if (obj_traverse_i(e, data)) return 1; @@ -3043,7 +1348,7 @@ obj_traverse_i(VALUE obj, struct obj_traverse_data *data) case T_STRUCT: { - long len = RSTRUCT_LEN(obj); + long len = RSTRUCT_LEN_RAW(obj); const VALUE *ptr = RSTRUCT_CONST_PTR(obj); for (long i=0; i<len; i++) { @@ -3052,6 +1357,10 @@ obj_traverse_i(VALUE obj, struct obj_traverse_data *data) } break; + case T_MATCH: + if (obj_traverse_i(RMATCH(obj)->str, data)) return 1; + break; + case T_RATIONAL: if (obj_traverse_i(RRATIONAL(obj)->num, data)) return 1; if (obj_traverse_i(RRATIONAL(obj)->den, data)) return 1; @@ -3068,11 +1377,9 @@ obj_traverse_i(VALUE obj, struct obj_traverse_data *data) .stop = false, .data = data, }; - RB_VM_LOCK_ENTER_NO_BARRIER(); - { + RB_VM_LOCKING_NO_BARRIER() { rb_objspace_reachable_objects_from(obj, obj_traverse_reachable_i, &d); } - RB_VM_LOCK_LEAVE_NO_BARRIER(); if (d.stop) return 1; } break; @@ -3150,6 +1457,26 @@ allow_frozen_shareable_p(VALUE obj) } static enum obj_traverse_iterator_result +make_shareable_check_shareable_freeze(VALUE obj, enum obj_traverse_iterator_result result) +{ + if (!RB_OBJ_FROZEN_RAW(obj)) { + rb_funcall(obj, idFreeze, 0); + + if (UNLIKELY(!RB_OBJ_FROZEN_RAW(obj))) { + rb_raise(rb_eRactorError, "#freeze does not freeze object correctly"); + } + + if (RB_OBJ_SHAREABLE_P(obj)) { + return traverse_skip; + } + } + + return result; +} + +static int obj_refer_only_shareables_p(VALUE obj); + +static enum obj_traverse_iterator_result make_shareable_check_shareable(VALUE obj) { VM_ASSERT(!SPECIAL_CONST_P(obj)); @@ -3158,8 +1485,22 @@ make_shareable_check_shareable(VALUE obj) return traverse_skip; } else if (!allow_frozen_shareable_p(obj)) { - if (rb_obj_is_proc(obj)) { - rb_proc_ractor_make_shareable(obj); + VM_ASSERT(RB_TYPE_P(obj, T_DATA)); + const rb_data_type_t *type = RTYPEDDATA_TYPE(obj); + + if (type->flags & RUBY_TYPED_FROZEN_SHAREABLE_NO_REC) { + if (obj_refer_only_shareables_p(obj)) { + make_shareable_check_shareable_freeze(obj, traverse_skip); + RB_OBJ_SET_SHAREABLE(obj); + return traverse_skip; + } + else { + rb_raise(rb_eRactorError, + "can not make shareable object for %+"PRIsVALUE" because it refers unshareable objects", obj); + } + } + else if (rb_obj_is_proc(obj)) { + rb_proc_ractor_make_shareable(obj, Qundef); return traverse_cont; } else { @@ -3167,29 +1508,38 @@ make_shareable_check_shareable(VALUE obj) } } - if (RB_TYPE_P(obj, T_IMEMO)) { + switch (TYPE(obj)) { + case T_IMEMO: return traverse_skip; - } - - if (!RB_OBJ_FROZEN_RAW(obj)) { - rb_funcall(obj, idFreeze, 0); - - if (UNLIKELY(!RB_OBJ_FROZEN_RAW(obj))) { - rb_raise(rb_eRactorError, "#freeze does not freeze object correctly"); - } - - if (RB_OBJ_SHAREABLE_P(obj)) { - return traverse_skip; + case T_OBJECT: + { + // If a T_OBJECT is shared and has no free capacity, we can't safely store the object_id inline, + // as it would require to move the object content into an external buffer. + // This is only a problem for T_OBJECT, given other types have external fields and can do RCU. + // To avoid this issue, we proactively create the object_id. + shape_id_t shape_id = RBASIC_SHAPE_ID(obj); + attr_index_t capacity = RSHAPE_CAPACITY(shape_id); + attr_index_t free_capacity = capacity - RSHAPE_LEN(shape_id); + if (!rb_shape_has_object_id(shape_id) && capacity && !free_capacity) { + rb_obj_id(obj); + } } + break; + default: + break; } - return traverse_cont; + return make_shareable_check_shareable_freeze(obj, traverse_cont); } static enum obj_traverse_iterator_result mark_shareable(VALUE obj) { - FL_SET_RAW(obj, RUBY_FL_SHAREABLE); + if (RB_TYPE_P(obj, T_STRING)) { + rb_str_make_independent(obj); + } + + rb_obj_set_shareable_no_assert(obj); return traverse_cont; } @@ -3407,11 +1757,9 @@ static int obj_refer_only_shareables_p(VALUE obj) { int cnt = 0; - RB_VM_LOCK_ENTER_NO_BARRIER(); - { + RB_VM_LOCKING_NO_BARRIER() { rb_objspace_reachable_objects_from(obj, obj_refer_only_shareables_p_i, &cnt); } - RB_VM_LOCK_LEAVE_NO_BARRIER(); return cnt == 0; } @@ -3439,30 +1787,31 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data) } else { st_insert(obj_traverse_replace_rec(data), (st_data_t)obj, replacement); + RB_OBJ_WRITTEN(data->rec_hash, Qundef, obj); + RB_OBJ_WRITTEN(data->rec_hash, Qundef, replacement); } if (!data->move) { obj = replacement; } -#define CHECK_AND_REPLACE(v) do { \ +#define CHECK_AND_REPLACE(parent_obj, v) do { \ VALUE _val = (v); \ if (obj_traverse_replace_i(_val, data)) { return 1; } \ - else if (data->replacement != _val) { RB_OBJ_WRITE(obj, &v, data->replacement); } \ + else if (data->replacement != _val) { RB_OBJ_WRITE(parent_obj, &v, data->replacement); } \ } while (0) - if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) { - struct gen_fields_tbl *fields_tbl; - rb_ivar_generic_fields_tbl_lookup(obj, &fields_tbl); + if (UNLIKELY(rb_obj_gen_fields_p(obj))) { + VALUE fields_obj = rb_obj_fields_no_ractor_check(obj); if (UNLIKELY(rb_shape_obj_too_complex_p(obj))) { struct obj_traverse_replace_callback_data d = { .stop = false, .data = data, - .src = obj, + .src = fields_obj, }; rb_st_foreach_with_replace( - fields_tbl->as.complex.table, + rb_imemo_fields_complex_tbl(fields_obj), obj_iv_hash_traverse_replace_foreach_i, obj_iv_hash_traverse_replace_i, (st_data_t)&d @@ -3470,10 +1819,10 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data) if (d.stop) return 1; } else { - for (uint32_t i = 0; i < fields_tbl->as.shape.fields_count; i++) { - if (!UNDEF_P(fields_tbl->as.shape.fields[i])) { - CHECK_AND_REPLACE(fields_tbl->as.shape.fields[i]); - } + uint32_t fields_count = RSHAPE_LEN(RBASIC_SHAPE_ID(obj)); + VALUE *fields = rb_imemo_fields_ptr(fields_obj); + for (uint32_t i = 0; i < fields_count; i++) { + CHECK_AND_REPLACE(fields_obj, fields[i]); } } } @@ -3485,7 +1834,6 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data) case T_REGEXP: case T_FILE: case T_SYMBOL: - case T_MATCH: break; case T_STRING: rb_str_make_independent(obj); @@ -3508,11 +1856,11 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data) if (d.stop) return 1; } else { - uint32_t len = ROBJECT_FIELDS_COUNT(obj); + uint32_t len = ROBJECT_FIELDS_COUNT_NOT_COMPLEX(obj); VALUE *ptr = ROBJECT_FIELDS(obj); for (uint32_t i = 0; i < len; i++) { - CHECK_AND_REPLACE(ptr[i]); + CHECK_AND_REPLACE(obj, ptr[i]); } } } @@ -3561,22 +1909,26 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data) case T_STRUCT: { - long len = RSTRUCT_LEN(obj); + long len = RSTRUCT_LEN_RAW(obj); const VALUE *ptr = RSTRUCT_CONST_PTR(obj); for (long i=0; i<len; i++) { - CHECK_AND_REPLACE(ptr[i]); + CHECK_AND_REPLACE(obj, ptr[i]); } } break; + case T_MATCH: + CHECK_AND_REPLACE(obj, RMATCH(obj)->str); + break; + case T_RATIONAL: - CHECK_AND_REPLACE(RRATIONAL(obj)->num); - CHECK_AND_REPLACE(RRATIONAL(obj)->den); + CHECK_AND_REPLACE(obj, RRATIONAL(obj)->num); + CHECK_AND_REPLACE(obj, RRATIONAL(obj)->den); break; case T_COMPLEX: - CHECK_AND_REPLACE(RCOMPLEX(obj)->real); - CHECK_AND_REPLACE(RCOMPLEX(obj)->imag); + CHECK_AND_REPLACE(obj, RCOMPLEX(obj)->real); + CHECK_AND_REPLACE(obj, RCOMPLEX(obj)->imag); break; case T_DATA: @@ -3657,8 +2009,10 @@ move_enter(VALUE obj, struct obj_traverse_replace_data *data) } else { VALUE type = RB_BUILTIN_TYPE(obj); + size_t slot_size = rb_gc_obj_slot_size(obj); type |= wb_protected_types[type] ? FL_WB_PROTECTED : 0; - NEWOBJ_OF(moved, struct RBasic, 0, type, rb_gc_obj_slot_size(obj), 0); + NEWOBJ_OF(moved, struct RBasic, 0, type, slot_size, 0); + MEMZERO(&moved[1], char, slot_size - sizeof(*moved)); data->replacement = (VALUE)moved; return traverse_cont; } @@ -3667,20 +2021,31 @@ move_enter(VALUE obj, struct obj_traverse_replace_data *data) static enum obj_traverse_iterator_result move_leave(VALUE obj, struct obj_traverse_replace_data *data) { - size_t size = rb_gc_obj_slot_size(obj); - memcpy((void *)data->replacement, (void *)obj, size); + // Copy flags + VALUE ignored_flags = RUBY_FL_PROMOTED; + RBASIC(data->replacement)->flags = (RBASIC(obj)->flags & ~ignored_flags) | (RBASIC(data->replacement)->flags & ignored_flags); + // Copy contents without the flags + memcpy( + (char *)data->replacement + sizeof(VALUE), + (char *)obj + sizeof(VALUE), + rb_gc_obj_slot_size(obj) - sizeof(VALUE) + ); + + // We've copied obj's references to the replacement + rb_gc_writebarrier_remember(data->replacement); void rb_replace_generic_ivar(VALUE clone, VALUE obj); // variable.c + if (UNLIKELY(rb_obj_gen_fields_p(obj))) { + rb_replace_generic_ivar(data->replacement, obj); + } rb_gc_obj_id_moved(data->replacement); - if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) { - rb_replace_generic_ivar(data->replacement, obj); - } + VALUE flags = T_OBJECT | FL_FREEZE | (RBASIC(obj)->flags & FL_PROMOTED); // Avoid mutations using bind_call, etc. - MEMZERO((char *)obj + sizeof(struct RBasic), char, size - sizeof(struct RBasic)); - RBASIC(obj)->flags = T_OBJECT | FL_FREEZE; + MEMZERO((char *)obj, char, sizeof(struct RBasic)); + RBASIC(obj)->flags = flags; RBASIC_SET_CLASS_RAW(obj, rb_cRactorMovedObject); return traverse_cont; } @@ -3839,15 +2204,13 @@ rb_ractor_local_storage_value_newkey(void) void rb_ractor_local_storage_delkey(rb_ractor_local_key_t key) { - RB_VM_LOCK_ENTER(); - { + RB_VM_LOCKING() { if (freed_ractor_local_keys.cnt == freed_ractor_local_keys.capa) { freed_ractor_local_keys.capa = freed_ractor_local_keys.capa ? freed_ractor_local_keys.capa * 2 : 4; REALLOC_N(freed_ractor_local_keys.keys, rb_ractor_local_key_t, freed_ractor_local_keys.capa); } freed_ractor_local_keys.keys[freed_ractor_local_keys.cnt++] = key; } - RB_VM_LOCK_LEAVE(); } static bool @@ -4032,241 +2395,226 @@ ractor_local_value_store_if_absent(rb_execution_context_t *ec, VALUE self, VALUE return rb_mutex_synchronize(cr->local_storage_store_lock, ractor_local_value_store_i, (VALUE)&data); } -// Ractor::Channel (emulate with Ractor) - -typedef rb_ractor_t rb_ractor_channel_t; +// shareable_proc static VALUE -ractor_channel_func(RB_BLOCK_CALL_FUNC_ARGLIST(y, c)) +ractor_shareable_proc(rb_execution_context_t *ec, VALUE replace_self, bool is_lambda) { - rb_execution_context_t *ec = GET_EC(); - rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - - while (1) { - int state; - - EC_PUSH_TAG(ec); - if ((state = EC_EXEC_TAG()) == TAG_NONE) { - VALUE obj = ractor_receive(ec, cr); - ractor_yield(ec, cr, obj, Qfalse); - } - EC_POP_TAG(); - - if (state) { - // ignore the error - break; - } + if (!rb_ractor_shareable_p(replace_self)) { + rb_raise(rb_eRactorIsolationError, "self should be shareable: %" PRIsVALUE, replace_self); + } + else { + VALUE proc = is_lambda ? rb_block_lambda() : rb_block_proc(); + return rb_proc_ractor_make_shareable(rb_proc_dup(proc), replace_self); } - - return Qnil; -} - -static VALUE -rb_ractor_channel_new(void) -{ -#if 0 - return rb_funcall(rb_const_get(rb_cRactor, rb_intern("Channel")), rb_intern("new"), 0); -#else - // class Channel - // def self.new - // Ractor.new do # func body - // while true - // obj = Ractor.receive - // Ractor.yield obj - // end - // rescue Ractor::ClosedError - // nil - // end - // end - // end - - return ractor_create_func(rb_cRactor, Qnil, rb_str_new2("Ractor/channel"), rb_ary_new(), ractor_channel_func); -#endif -} - -static VALUE -rb_ractor_channel_yield(rb_execution_context_t *ec, VALUE vch, VALUE obj) -{ - VM_ASSERT(ec == rb_current_ec_noinline()); - rb_ractor_channel_t *ch = RACTOR_PTR(vch); - - ractor_send(ec, (rb_ractor_t *)ch, obj, Qfalse); - return Qnil; } -static VALUE -rb_ractor_channel_take(rb_execution_context_t *ec, VALUE vch) -{ - VM_ASSERT(ec == rb_current_ec_noinline()); - rb_ractor_channel_t *ch = RACTOR_PTR(vch); - - return ractor_take(ec, (rb_ractor_t *)ch); -} +// Ractor#require -static VALUE -rb_ractor_channel_close(rb_execution_context_t *ec, VALUE vch) -{ - VM_ASSERT(ec == rb_current_ec_noinline()); - rb_ractor_channel_t *ch = RACTOR_PTR(vch); +struct cross_ractor_require { + VALUE port; + bool raised; - ractor_close_incoming(ec, (rb_ractor_t *)ch); - return ractor_close_outgoing(ec, (rb_ractor_t *)ch); -} + union { + struct { + VALUE feature; + } require; -// Ractor#require + struct { + VALUE module; + ID name; + } autoload; + } as; -struct cross_ractor_require { - VALUE ch; - VALUE result; - VALUE exception; + bool silent; +}; - // require - VALUE feature; +RUBY_REFERENCES(cross_ractor_require_refs) = { + RUBY_REF_EDGE(struct cross_ractor_require, port), + RUBY_REF_EDGE(struct cross_ractor_require, as.require.feature), + RUBY_REF_END +}; - // autoload - VALUE module; - ID name; +static const rb_data_type_t cross_ractor_require_data_type = { + "ractor/cross_ractor_require", + { + RUBY_REFS_LIST_PTR(cross_ractor_require_refs), + RUBY_DEFAULT_FREE, + NULL, // memsize + NULL, // compact + }, + 0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_DECL_MARKING | RUBY_TYPED_EMBEDDABLE }; static VALUE -require_body(VALUE data) +require_body(VALUE crr_obj) { - struct cross_ractor_require *crr = (struct cross_ractor_require *)data; + struct cross_ractor_require *crr; + TypedData_Get_Struct(crr_obj, struct cross_ractor_require, &cross_ractor_require_data_type, crr); + VALUE feature = crr->as.require.feature; ID require; CONST_ID(require, "require"); - crr->result = rb_funcallv(Qnil, require, 1, &crr->feature); - return Qnil; + if (crr->silent) { + int rb_require_internal_silent(VALUE fname); + return INT2NUM(rb_require_internal_silent(feature)); + } + else { + return rb_funcallv(Qnil, require, 1, &feature); + } } static VALUE -require_rescue(VALUE data, VALUE errinfo) +require_rescue(VALUE crr_obj, VALUE errinfo) { - struct cross_ractor_require *crr = (struct cross_ractor_require *)data; - crr->exception = errinfo; - return Qundef; + struct cross_ractor_require *crr; + TypedData_Get_Struct(crr_obj, struct cross_ractor_require, &cross_ractor_require_data_type, crr); + crr->raised = true; + return errinfo; } static VALUE -require_result_copy_body(VALUE data) +require_result_send_body(VALUE ary) { - struct cross_ractor_require *crr = (struct cross_ractor_require *)data; + VALUE port = RARRAY_AREF(ary, 0); + VALUE results = RARRAY_AREF(ary, 1); - if (crr->exception != Qundef) { - VM_ASSERT(crr->result == Qundef); - crr->exception = ractor_copy(crr->exception); - } - else{ - VM_ASSERT(crr->result != Qundef); - crr->result = ractor_copy(crr->result); - } + rb_execution_context_t *ec = GET_EC(); + ractor_port_send(ec, port, results, Qfalse); return Qnil; } static VALUE -require_result_copy_resuce(VALUE data, VALUE errinfo) +require_result_send_resuce(VALUE port, VALUE errinfo) { - struct cross_ractor_require *crr = (struct cross_ractor_require *)data; - crr->exception = errinfo; // ractor_move(crr->exception); + // TODO: need rescue? + ractor_port_send(GET_EC(), port, errinfo, Qfalse); return Qnil; } static VALUE -ractor_require_protect(struct cross_ractor_require *crr, VALUE (*func)(VALUE)) +ractor_require_protect(VALUE crr_obj, VALUE (*func)(VALUE)) { - // catch any error - rb_rescue2(func, (VALUE)crr, - require_rescue, (VALUE)crr, rb_eException, 0); + struct cross_ractor_require *crr; + TypedData_Get_Struct(crr_obj, struct cross_ractor_require, &cross_ractor_require_data_type, crr); - rb_rescue2(require_result_copy_body, (VALUE)crr, - require_result_copy_resuce, (VALUE)crr, rb_eException, 0); + const bool silent = crr->silent; - rb_ractor_channel_yield(GET_EC(), crr->ch, Qtrue); - return Qnil; + VALUE debug, errinfo; + if (silent) { + debug = ruby_debug; + errinfo = rb_errinfo(); + } + + // get normal result or raised exception (with crr->raised == true) + VALUE result = rb_rescue2(func, crr_obj, require_rescue, crr_obj, rb_eException, 0); + + if (silent) { + ruby_debug = debug; + rb_set_errinfo(errinfo); + } + + rb_rescue2(require_result_send_body, + // [port, [result, raised]] + rb_ary_new_from_args(2, crr->port, rb_ary_new_from_args(2, result, crr->raised ? Qtrue : Qfalse)), + require_result_send_resuce, rb_eException, crr->port); + RB_GC_GUARD(crr_obj); + return Qnil; } static VALUE -ractore_require_func(void *data) +ractor_require_func(void *crr_obj) { - struct cross_ractor_require *crr = (struct cross_ractor_require *)data; - return ractor_require_protect(crr, require_body); + return ractor_require_protect((VALUE)crr_obj, require_body); } VALUE -rb_ractor_require(VALUE feature) -{ - // TODO: make feature shareable - struct cross_ractor_require crr = { - .feature = feature, // TODO: ractor - .ch = rb_ractor_channel_new(), - .result = Qundef, - .exception = Qundef, - }; +rb_ractor_require(VALUE feature, bool silent) +{ + // We're about to block on the main ractor, so if we're holding the global lock we'll deadlock. + ASSERT_vm_unlocking(); + + struct cross_ractor_require *crr; + VALUE crr_obj = TypedData_Make_Struct(0, struct cross_ractor_require, &cross_ractor_require_data_type, crr); + RB_OBJ_SET_SHAREABLE(crr_obj); // TODO: internal data? + + // Convert feature to proper file path and make it shareable as fstring + RB_OBJ_WRITE(crr_obj, &crr->as.require.feature, rb_fstring(FilePathValue(feature))); + RB_OBJ_WRITE(crr_obj, &crr->port, rb_ractor_make_shareable(ractor_port_new(GET_RACTOR()))); + crr->raised = false; + crr->silent = silent; rb_execution_context_t *ec = GET_EC(); rb_ractor_t *main_r = GET_VM()->ractor.main_ractor; - rb_ractor_interrupt_exec(main_r, ractore_require_func, &crr, 0); + rb_ractor_interrupt_exec(main_r, ractor_require_func, (void *)crr_obj, rb_interrupt_exec_flag_value_data); // wait for require done - rb_ractor_channel_take(ec, crr.ch); - rb_ractor_channel_close(ec, crr.ch); + VALUE results = ractor_port_receive(ec, crr->port); + ractor_port_close(ec, crr->port); + + VALUE exc = rb_ary_pop(results); + VALUE result = rb_ary_pop(results); + RB_GC_GUARD(crr_obj); - if (crr.exception != Qundef) { - rb_exc_raise(crr.exception); + if (RTEST(exc)) { + rb_exc_raise(result); } else { - return crr.result; + return result; } } static VALUE ractor_require(rb_execution_context_t *ec, VALUE self, VALUE feature) { - return rb_ractor_require(feature); + return rb_ractor_require(feature, false); } static VALUE -autoload_load_body(VALUE data) +autoload_load_body(VALUE crr_obj) { - struct cross_ractor_require *crr = (struct cross_ractor_require *)data; - crr->result = rb_autoload_load(crr->module, crr->name); - return Qnil; + struct cross_ractor_require *crr; + TypedData_Get_Struct(crr_obj, struct cross_ractor_require, &cross_ractor_require_data_type, crr); + return rb_autoload_load(crr->as.autoload.module, crr->as.autoload.name); } static VALUE -ractor_autoload_load_func(void *data) +ractor_autoload_load_func(void *crr_obj) { - struct cross_ractor_require *crr = (struct cross_ractor_require *)data; - return ractor_require_protect(crr, autoload_load_body); + return ractor_require_protect((VALUE)crr_obj, autoload_load_body); } VALUE rb_ractor_autoload_load(VALUE module, ID name) { - struct cross_ractor_require crr = { - .module = module, - .name = name, - .ch = rb_ractor_channel_new(), - .result = Qundef, - .exception = Qundef, - }; + struct cross_ractor_require *crr; + VALUE crr_obj = TypedData_Make_Struct(0, struct cross_ractor_require, &cross_ractor_require_data_type, crr); + RB_OBJ_SET_SHAREABLE(crr_obj); // TODO: internal data? + + RB_OBJ_WRITE(crr_obj, &crr->as.autoload.module, module); + RB_OBJ_WRITE(crr_obj, &crr->as.autoload.name, name); + RB_OBJ_WRITE(crr_obj, &crr->port, rb_ractor_make_shareable(ractor_port_new(GET_RACTOR()))); rb_execution_context_t *ec = GET_EC(); rb_ractor_t *main_r = GET_VM()->ractor.main_ractor; - rb_ractor_interrupt_exec(main_r, ractor_autoload_load_func, &crr, 0); + rb_ractor_interrupt_exec(main_r, ractor_autoload_load_func, (void *)crr_obj, rb_interrupt_exec_flag_value_data); // wait for require done - rb_ractor_channel_take(ec, crr.ch); - rb_ractor_channel_close(ec, crr.ch); + VALUE results = ractor_port_receive(ec, crr->port); + ractor_port_close(ec, crr->port); + + VALUE exc = rb_ary_pop(results); + VALUE result = rb_ary_pop(results); + RB_GC_GUARD(crr_obj); - if (crr.exception != Qundef) { - rb_exc_raise(crr.exception); + if (RTEST(exc)) { + rb_exc_raise(result); } else { - return crr.result; + return result; } } |
