diff options
Diffstat (limited to 'ractor.c')
-rw-r--r-- | ractor.c | 2142 |
1 files changed, 1374 insertions, 768 deletions
@@ -5,19 +5,22 @@ #include "ruby/ractor.h" #include "ruby/thread_native.h" #include "vm_core.h" +#include "eval_intern.h" #include "vm_sync.h" #include "ractor_core.h" #include "internal/complex.h" #include "internal/error.h" +#include "internal/gc.h" #include "internal/hash.h" #include "internal/rational.h" #include "internal/struct.h" #include "internal/thread.h" #include "variable.h" -#include "gc.h" -#include "transient_heap.h" +#include "yjit.h" +#include "rjit.h" VALUE rb_cRactor; +static VALUE rb_cRactorSelector; VALUE rb_eRactorUnsafeError; VALUE rb_eRactorIsolationError; @@ -29,11 +32,13 @@ static VALUE rb_cRactorMovedObject; static void vm_ractor_blocking_cnt_inc(rb_vm_t *vm, rb_ractor_t *r, const char *file, int line); +// Ractor locking + static void ASSERT_ractor_unlocking(rb_ractor_t *r) { #if RACTOR_CHECK_MODE > 0 - // GET_EC is NULL in an MJIT worker + // GET_EC is NULL in an RJIT worker if (rb_current_execution_context(false) != NULL && r->sync.locked_by == rb_ractor_self(GET_RACTOR())) { rb_bug("recursive ractor locking"); } @@ -44,7 +49,7 @@ static void ASSERT_ractor_locking(rb_ractor_t *r) { #if RACTOR_CHECK_MODE > 0 - // GET_EC is NULL in an MJIT worker + // GET_EC is NULL in an RJIT worker if (rb_current_execution_context(false) != NULL && r->sync.locked_by != rb_ractor_self(GET_RACTOR())) { rp(r->sync.locked_by); rb_bug("ractor lock is not acquired."); @@ -55,25 +60,28 @@ ASSERT_ractor_locking(rb_ractor_t *r) static void ractor_lock(rb_ractor_t *r, const char *file, int line) { - RUBY_DEBUG_LOG2(file, line, "locking r:%u%s", r->pub.id, GET_RACTOR() == r ? " (self)" : ""); + RUBY_DEBUG_LOG2(file, line, "locking r:%u%s", r->pub.id, rb_current_ractor_raw(false) == r ? " (self)" : ""); ASSERT_ractor_unlocking(r); rb_native_mutex_lock(&r->sync.lock); #if RACTOR_CHECK_MODE > 0 - if (rb_current_execution_context(false) != NULL) { // GET_EC is NULL in an MJIT worker - r->sync.locked_by = rb_ractor_self(GET_RACTOR()); + if (rb_current_execution_context(false) != NULL) { // GET_EC is NULL in an RJIT worker + rb_ractor_t *cr = rb_current_ractor_raw(false); + r->sync.locked_by = cr ? rb_ractor_self(cr) : Qundef; } #endif - RUBY_DEBUG_LOG2(file, line, "locked r:%u%s", r->pub.id, GET_RACTOR() == r ? " (self)" : ""); + RUBY_DEBUG_LOG2(file, line, "locked r:%u%s", r->pub.id, rb_current_ractor_raw(false) == r ? " (self)" : ""); } static void ractor_lock_self(rb_ractor_t *cr, const char *file, int line) { VM_ASSERT(cr == GET_RACTOR()); +#if RACTOR_CHECK_MODE > 0 VM_ASSERT(cr->sync.locked_by != cr->pub.self); +#endif ractor_lock(cr, file, line); } @@ -86,14 +94,16 @@ ractor_unlock(rb_ractor_t *r, const char *file, int line) #endif rb_native_mutex_unlock(&r->sync.lock); - RUBY_DEBUG_LOG2(file, line, "r:%u%s", r->pub.id, GET_RACTOR() == r ? " (self)" : ""); + RUBY_DEBUG_LOG2(file, line, "r:%u%s", r->pub.id, rb_current_ractor_raw(false) == r ? " (self)" : ""); } static void ractor_unlock_self(rb_ractor_t *cr, const char *file, int line) { VM_ASSERT(cr == GET_RACTOR()); +#if RACTOR_CHECK_MODE > 0 VM_ASSERT(cr->sync.locked_by == cr->pub.self); +#endif ractor_unlock(cr, file, line); } @@ -102,20 +112,20 @@ ractor_unlock_self(rb_ractor_t *cr, const char *file, int line) #define RACTOR_LOCK_SELF(r) ractor_lock_self(r, __FILE__, __LINE__) #define RACTOR_UNLOCK_SELF(r) ractor_unlock_self(r, __FILE__, __LINE__) -static void -ractor_cond_wait(rb_ractor_t *r) +void +rb_ractor_lock_self(rb_ractor_t *r) { -#if RACTOR_CHECK_MODE > 0 - VALUE locked_by = r->sync.locked_by; - r->sync.locked_by = Qnil; -#endif - rb_native_cond_wait(&r->sync.cond, &r->sync.lock); + RACTOR_LOCK_SELF(r); +} -#if RACTOR_CHECK_MODE > 0 - r->sync.locked_by = locked_by; -#endif +void +rb_ractor_unlock_self(rb_ractor_t *r) +{ + RACTOR_UNLOCK_SELF(r); } +// Ractor status + static const char * ractor_status_str(enum ractor_status status) { @@ -152,7 +162,7 @@ ractor_status_set(rb_ractor_t *r, enum ractor_status status) VM_ASSERT(status == ractor_running); break; case ractor_terminated: - VM_ASSERT(0); // unreachable + rb_bug("unreachable"); break; } @@ -165,31 +175,40 @@ ractor_status_p(rb_ractor_t *r, enum ractor_status status) return rb_ractor_status_p(r, status); } -static struct rb_ractor_basket *ractor_queue_at(struct rb_ractor_queue *rq, int i); +// Ractor data/mark/free + +static struct rb_ractor_basket *ractor_queue_at(rb_ractor_t *r, struct rb_ractor_queue *rq, int i); +static void ractor_local_storage_mark(rb_ractor_t *r); +static void ractor_local_storage_free(rb_ractor_t *r); static void ractor_queue_mark(struct rb_ractor_queue *rq) { for (int i=0; i<rq->cnt; i++) { - struct rb_ractor_basket *b = ractor_queue_at(rq, i); - rb_gc_mark(b->v); + struct rb_ractor_basket *b = ractor_queue_at(NULL, rq, i); rb_gc_mark(b->sender); + + switch (b->type.e) { + case basket_type_yielding: + case basket_type_take_basket: + case basket_type_deleted: + case basket_type_reserved: + // ignore + break; + default: + rb_gc_mark(b->p.send.v); + } } } -static void ractor_local_storage_mark(rb_ractor_t *r); -static void ractor_local_storage_free(rb_ractor_t *r); - static void ractor_mark(void *ptr) { rb_ractor_t *r = (rb_ractor_t *)ptr; - ractor_queue_mark(&r->sync.incoming_queue); - rb_gc_mark(r->sync.wait.taken_basket.v); - rb_gc_mark(r->sync.wait.taken_basket.sender); - rb_gc_mark(r->sync.wait.yielded_basket.v); - rb_gc_mark(r->sync.wait.yielded_basket.sender); + ractor_queue_mark(&r->sync.recv_queue); + ractor_queue_mark(&r->sync.takers_queue); + rb_gc_mark(r->receiving_mutex); rb_gc_mark(r->loc); @@ -201,7 +220,7 @@ ractor_mark(void *ptr) if (r->threads.cnt > 0) { rb_thread_t *th = 0; - list_for_each(&r->threads.set, th, lt_node) { + ccan_list_for_each(&r->threads.set, th, lt_node) { VM_ASSERT(th != NULL); rb_gc_mark(th->self); } @@ -217,19 +236,16 @@ ractor_queue_free(struct rb_ractor_queue *rq) } static void -ractor_waiting_list_free(struct rb_ractor_waiting_list *wl) -{ - free(wl->ractors); -} - -static void ractor_free(void *ptr) { rb_ractor_t *r = (rb_ractor_t *)ptr; + RUBY_DEBUG_LOG("free r:%d", rb_ractor_id(r)); rb_native_mutex_destroy(&r->sync.lock); +#ifdef RUBY_THREAD_WIN32_H rb_native_cond_destroy(&r->sync.cond); - ractor_queue_free(&r->sync.incoming_queue); - ractor_waiting_list_free(&r->sync.taking_ractors); +#endif + ractor_queue_free(&r->sync.recv_queue); + ractor_queue_free(&r->sync.takers_queue); ractor_local_storage_free(r); rb_hook_list_free(&r->pub.hooks); ruby_xfree(r); @@ -242,27 +258,21 @@ ractor_queue_memsize(const struct rb_ractor_queue *rq) } static size_t -ractor_waiting_list_memsize(const struct rb_ractor_waiting_list *wl) -{ - return sizeof(rb_ractor_t *) * wl->size; -} - -static size_t ractor_memsize(const void *ptr) { rb_ractor_t *r = (rb_ractor_t *)ptr; - // TODO + // TODO: more correct? return sizeof(rb_ractor_t) + - ractor_queue_memsize(&r->sync.incoming_queue) + - ractor_waiting_list_memsize(&r->sync.taking_ractors); + ractor_queue_memsize(&r->sync.recv_queue) + + ractor_queue_memsize(&r->sync.takers_queue); } static const rb_data_type_t ractor_data_type = { "ractor", { ractor_mark, - ractor_free, + ractor_free, ractor_memsize, NULL, // update }, @@ -284,16 +294,14 @@ static inline rb_ractor_t * RACTOR_PTR(VALUE self) { VM_ASSERT(rb_ractor_p(self)); - rb_ractor_t *r = DATA_PTR(self); - // TODO: check return r; } static rb_atomic_t ractor_last_id; #if RACTOR_CHECK_MODE > 0 -MJIT_FUNC_EXPORTED uint32_t +uint32_t rb_ractor_current_id(void) { if (GET_THREAD()->ractor == NULL) { @@ -305,6 +313,8 @@ rb_ractor_current_id(void) } #endif +// Ractor queue + static void ractor_queue_setup(struct rb_ractor_queue *rq) { @@ -315,15 +325,23 @@ ractor_queue_setup(struct rb_ractor_queue *rq) } static struct rb_ractor_basket * -ractor_queue_at(struct rb_ractor_queue *rq, int i) +ractor_queue_head(rb_ractor_t *r, struct rb_ractor_queue *rq) { + if (r != NULL) ASSERT_ractor_locking(r); + return &rq->baskets[rq->start]; +} + +static struct rb_ractor_basket * +ractor_queue_at(rb_ractor_t *r, struct rb_ractor_queue *rq, int i) +{ + if (r != NULL) ASSERT_ractor_locking(r); return &rq->baskets[(rq->start + i) % rq->size]; } static void -ractor_queue_advance(struct rb_ractor_queue *rq) +ractor_queue_advance(rb_ractor_t *r, struct rb_ractor_queue *rq) { - ASSERT_ractor_locking(GET_RACTOR()); + ASSERT_ractor_locking(r); if (rq->reserved_cnt == 0) { rq->cnt--; @@ -331,16 +349,16 @@ ractor_queue_advance(struct rb_ractor_queue *rq) rq->serial++; } else { - ractor_queue_at(rq, 0)->type = basket_type_deleted; + ractor_queue_at(r, rq, 0)->type.e = basket_type_deleted; } } static bool -ractor_queue_skip_p(struct rb_ractor_queue *rq, int i) +ractor_queue_skip_p(rb_ractor_t *r, struct rb_ractor_queue *rq, int i) { - struct rb_ractor_basket *b = ractor_queue_at(rq, i); - return b->type == basket_type_deleted || - b->type == basket_type_reserved; + struct rb_ractor_basket *b = ractor_queue_at(r, rq, i); + return basket_type_p(b, basket_type_deleted) || + basket_type_p(b, basket_type_reserved); } static void @@ -348,8 +366,8 @@ ractor_queue_compact(rb_ractor_t *r, struct rb_ractor_queue *rq) { ASSERT_ractor_locking(r); - while (rq->cnt > 0 && ractor_queue_at(rq, 0)->type == basket_type_deleted) { - ractor_queue_advance(rq); + while (rq->cnt > 0 && basket_type_p(ractor_queue_at(r, rq, 0), basket_type_deleted)) { + ractor_queue_advance(r, rq); } } @@ -365,7 +383,7 @@ ractor_queue_empty_p(rb_ractor_t *r, struct rb_ractor_queue *rq) ractor_queue_compact(r, rq); for (int i=0; i<rq->cnt; i++) { - if (!ractor_queue_skip_p(rq, i)) { + if (!ractor_queue_skip_p(r, rq, i)) { return false; } } @@ -376,28 +394,21 @@ ractor_queue_empty_p(rb_ractor_t *r, struct rb_ractor_queue *rq) static bool ractor_queue_deq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket) { - bool found = false; + ASSERT_ractor_locking(r); - RACTOR_LOCK(r); - { - if (!ractor_queue_empty_p(r, rq)) { - for (int i=0; i<rq->cnt; i++) { - if (!ractor_queue_skip_p(rq, i)) { - struct rb_ractor_basket *b = ractor_queue_at(rq, i); - *basket = *b; - - // remove from queue - b->type = basket_type_deleted; - ractor_queue_compact(r, rq); - found = true; - break; - } - } + for (int i=0; i<rq->cnt; i++) { + if (!ractor_queue_skip_p(r, rq, i)) { + struct rb_ractor_basket *b = ractor_queue_at(r, rq, i); + *basket = *b; + + // remove from queue + b->type.e = basket_type_deleted; + ractor_queue_compact(r, rq); + return true; } } - RACTOR_UNLOCK(r); - return found; + return false; } static void @@ -413,36 +424,36 @@ ractor_queue_enq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_ba rq->size *= 2; } rq->baskets[(rq->start + rq->cnt++) % rq->size] = *basket; - // fprintf(stderr, "%s %p->cnt:%d\n", __func__, rq, rq->cnt); + // fprintf(stderr, "%s %p->cnt:%d\n", RUBY_FUNCTION_NAME_STRING, (void *)rq, rq->cnt); } static void -ractor_basket_clear(struct rb_ractor_basket *b) +ractor_queue_delete(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket) { - b->type = basket_type_none; - b->v = Qfalse; - b->sender = Qfalse; + basket->type.e = basket_type_deleted; } +// Ractor basket + static VALUE ractor_reset_belonging(VALUE obj); // in this file static VALUE ractor_basket_value(struct rb_ractor_basket *b) { - switch (b->type) { + switch (b->type.e) { case basket_type_ref: break; case basket_type_copy: case basket_type_move: case basket_type_will: - b->type = basket_type_ref; - b->v = ractor_reset_belonging(b->v); + b->type.e = basket_type_ref; + b->p.send.v = ractor_reset_belonging(b->p.send.v); break; default: rb_bug("unreachable"); } - return b->v; + return b->p.send.v; } static VALUE @@ -450,65 +461,103 @@ ractor_basket_accept(struct rb_ractor_basket *b) { VALUE v = ractor_basket_value(b); - if (b->exception) { + if (b->p.send.exception) { VALUE cause = v; VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor."); rb_ivar_set(err, rb_intern("@ractor"), b->sender); - ractor_basket_clear(b); rb_ec_setup_exception(NULL, err, cause); rb_exc_raise(err); } - ractor_basket_clear(b); return v; } -static void -ractor_recursive_receive_if(rb_ractor_t *r) +// Ractor synchronizations + +#if USE_RUBY_DEBUG_LOG +static const char * +wait_status_str(enum rb_ractor_wait_status wait_status) { - if (r->receiving_mutex && rb_mutex_owned_p(r->receiving_mutex)) { - rb_raise(rb_eRactorError, "can not call receive/receive_if recursively"); + switch ((int)wait_status) { + case wait_none: return "none"; + case wait_receiving: return "receiving"; + case wait_taking: return "taking"; + case wait_yielding: return "yielding"; + case wait_receiving|wait_taking: return "receiving|taking"; + case wait_receiving|wait_yielding: return "receiving|yielding"; + case wait_taking|wait_yielding: return "taking|yielding"; + case wait_receiving|wait_taking|wait_yielding: return "receiving|taking|yielding"; } + rb_bug("unreachable"); } -static VALUE -ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *r) +static const char * +wakeup_status_str(enum rb_ractor_wakeup_status wakeup_status) { - struct rb_ractor_queue *rq = &r->sync.incoming_queue; - struct rb_ractor_basket basket; - - ractor_recursive_receive_if(r); - - if (ractor_queue_deq(r, rq, &basket) == false) { - if (r->sync.incoming_port_closed) { - rb_raise(rb_eRactorClosedError, "The incoming port is already closed"); - } - else { - return Qundef; - } + switch (wakeup_status) { + case wakeup_none: return "none"; + case wakeup_by_send: return "by_send"; + case wakeup_by_yield: return "by_yield"; + case wakeup_by_take: return "by_take"; + case wakeup_by_close: return "by_close"; + case wakeup_by_interrupt: return "by_interrupt"; + case wakeup_by_retry: return "by_retry"; } + rb_bug("unreachable"); +} - return ractor_basket_accept(&basket); +static const char * +basket_type_name(enum rb_ractor_basket_type type) +{ + switch (type) { + case basket_type_none: return "none"; + case basket_type_ref: return "ref"; + case basket_type_copy: return "copy"; + case basket_type_move: return "move"; + case basket_type_will: return "will"; + case basket_type_deleted: return "deleted"; + case basket_type_reserved: return "reserved"; + case basket_type_take_basket: return "take_basket"; + case basket_type_yielding: return "yielding"; + } + VM_ASSERT(0); + return NULL; } +#endif // USE_RUBY_DEBUG_LOG static bool -ractor_sleeping_by(const rb_ractor_t *r, enum ractor_wait_status wait_status) +ractor_sleeping_by(const rb_ractor_t *r, enum rb_ractor_wait_status wait_status) { return (r->sync.wait.status & wait_status) && r->sync.wait.wakeup_status == wakeup_none; } +#ifdef RUBY_THREAD_PTHREAD_H +// thread_*.c +void rb_ractor_sched_wakeup(rb_ractor_t *r); +#else + +static void +rb_ractor_sched_wakeup(rb_ractor_t *r) +{ + rb_native_cond_broadcast(&r->sync.cond); +} +#endif + + static bool -ractor_wakeup(rb_ractor_t *r, enum ractor_wait_status wait_status, enum ractor_wakeup_status wakeup_status) +ractor_wakeup(rb_ractor_t *r, enum rb_ractor_wait_status wait_status, enum rb_ractor_wakeup_status wakeup_status) { ASSERT_ractor_locking(r); - // fprintf(stderr, "%s r:%p status:%s/%s wakeup_status:%s/%s\n", __func__, r, - // wait_status_str(r->sync.wait.status), wait_status_str(wait_status), - // wakeup_status_str(r->sync.wait.wakeup_status), wakeup_status_str(wakeup_status)); + RUBY_DEBUG_LOG("r:%u wait_by:%s -> wait:%s wakeup:%s", + rb_ractor_id(r), + wait_status_str(r->sync.wait.status), + wait_status_str(wait_status), + wakeup_status_str(wakeup_status)); if (ractor_sleeping_by(r, wait_status)) { r->sync.wait.wakeup_status = wakeup_status; - rb_native_cond_signal(&r->sync.cond); + rb_ractor_sched_wakeup(r); return true; } else { @@ -516,22 +565,6 @@ ractor_wakeup(rb_ractor_t *r, enum ractor_wait_status wait_status, enum ractor_w } } -static void * -ractor_sleep_wo_gvl(void *ptr) -{ - rb_ractor_t *cr = ptr; - RACTOR_LOCK_SELF(cr); - { - VM_ASSERT(cr->sync.wait.status != wait_none); - if (cr->sync.wait.wakeup_status == wakeup_none) { - ractor_cond_wait(cr); - } - cr->sync.wait.status = wait_none; - } - RACTOR_UNLOCK_SELF(cr); - return NULL; -} - static void ractor_sleep_interrupt(void *ptr) { @@ -544,174 +577,175 @@ ractor_sleep_interrupt(void *ptr) RACTOR_UNLOCK(r); } -#if USE_RUBY_DEBUG_LOG -static const char * -wait_status_str(enum ractor_wait_status wait_status) +typedef void (*ractor_sleep_cleanup_function)(rb_ractor_t *cr, void *p); + +static void +ractor_check_ints(rb_execution_context_t *ec, rb_ractor_t *cr, ractor_sleep_cleanup_function cf_func, void *cf_data) { - switch ((int)wait_status) { - case wait_none: return "none"; - case wait_receiving: return "receiving"; - case wait_taking: return "taking"; - case wait_yielding: return "yielding"; - case wait_receiving|wait_taking: return "receiving|taking"; - case wait_receiving|wait_yielding: return "receiving|yielding"; - case wait_taking|wait_yielding: return "taking|yielding"; - case wait_receiving|wait_taking|wait_yielding: return "receiving|taking|yielding"; + if (cr->sync.wait.status != wait_none) { + enum rb_ractor_wait_status prev_wait_status = cr->sync.wait.status; + cr->sync.wait.status = wait_none; + cr->sync.wait.wakeup_status = wakeup_by_interrupt; + + RACTOR_UNLOCK(cr); + { + if (cf_func) { + enum ruby_tag_type state; + EC_PUSH_TAG(ec); + if ((state = EC_EXEC_TAG()) == TAG_NONE) { + rb_thread_check_ints(); + } + EC_POP_TAG(); + + if (state) { + (*cf_func)(cr, cf_data); + EC_JUMP_TAG(ec, state); + } + } + else { + rb_thread_check_ints(); + } + } + + // reachable? + RACTOR_LOCK(cr); + cr->sync.wait.status = prev_wait_status; } - rb_bug("unreachable"); } -static const char * -wakeup_status_str(enum ractor_wakeup_status wakeup_status) +#ifdef RUBY_THREAD_PTHREAD_H +void rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf); +#else + +// win32 +static void +ractor_cond_wait(rb_ractor_t *r) { - switch (wakeup_status) { - case wakeup_none: return "none"; - case wakeup_by_send: return "by_send"; - case wakeup_by_yield: return "by_yield"; - case wakeup_by_take: return "by_take"; - case wakeup_by_close: return "by_close"; - case wakeup_by_interrupt: return "by_interrupt"; - case wakeup_by_retry: return "by_retry"; +#if RACTOR_CHECK_MODE > 0 + VALUE locked_by = r->sync.locked_by; + r->sync.locked_by = Qnil; +#endif + rb_native_cond_wait(&r->sync.cond, &r->sync.lock); + +#if RACTOR_CHECK_MODE > 0 + r->sync.locked_by = locked_by; +#endif +} + +static void * +ractor_sleep_wo_gvl(void *ptr) +{ + rb_ractor_t *cr = ptr; + RACTOR_LOCK_SELF(cr); + { + VM_ASSERT(cr->sync.wait.status != wait_none); + if (cr->sync.wait.wakeup_status == wakeup_none) { + ractor_cond_wait(cr); + } + cr->sync.wait.status = wait_none; } - rb_bug("unreachable"); + RACTOR_UNLOCK_SELF(cr); + return NULL; } -#endif // USE_RUBY_DEBUG_LOG static void -ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr) +rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf) { - VM_ASSERT(GET_RACTOR() == cr); - VM_ASSERT(cr->sync.wait.status != wait_none); - // fprintf(stderr, "%s r:%p status:%s, wakeup_status:%s\n", __func__, cr, - // wait_status_str(cr->sync.wait.status), wakeup_status_str(cr->sync.wait.wakeup_status)); - RACTOR_UNLOCK(cr); { rb_nogvl(ractor_sleep_wo_gvl, cr, - ractor_sleep_interrupt, cr, + ubf, cr, RB_NOGVL_UBF_ASYNC_SAFE | RB_NOGVL_INTR_FAIL); } RACTOR_LOCK(cr); - - // rb_nogvl() can be canceled by interrupts - if (cr->sync.wait.status != wait_none) { - cr->sync.wait.status = wait_none; - cr->sync.wait.wakeup_status = wakeup_by_interrupt; - - RACTOR_UNLOCK(cr); - rb_thread_check_ints(); - RACTOR_LOCK(cr); // reachable? - } } +#endif -static void -ractor_register_taking(rb_ractor_t *r, rb_ractor_t *cr) +static enum rb_ractor_wakeup_status +ractor_sleep_with_cleanup(rb_execution_context_t *ec, rb_ractor_t *cr, enum rb_ractor_wait_status wait_status, + ractor_sleep_cleanup_function cf_func, void *cf_data) { - VM_ASSERT(cr == GET_RACTOR()); - bool retry_try = false; + enum rb_ractor_wakeup_status wakeup_status; + VM_ASSERT(GET_RACTOR() == cr); - RACTOR_LOCK(r); - { - if (ractor_sleeping_by(r, wait_yielding)) { - // already waiting for yielding. retry try_take. - retry_try = true; - } - else { - // insert cr into taking list - struct rb_ractor_waiting_list *wl = &r->sync.taking_ractors; - - for (int i=0; i<wl->cnt; i++) { - if (wl->ractors[i] == cr) { - // TODO: make it clean code. - rb_native_mutex_unlock(&r->sync.lock); - rb_raise(rb_eRuntimeError, "Already another thread of same ractor is waiting."); - } - } + // TODO: multi-threads + VM_ASSERT(cr->sync.wait.status == wait_none); + VM_ASSERT(wait_status != wait_none); + cr->sync.wait.status = wait_status; + cr->sync.wait.wakeup_status = wakeup_none; - if (wl->size == 0) { - wl->size = 1; - wl->ractors = malloc(sizeof(rb_ractor_t *) * wl->size); - if (wl->ractors == NULL) rb_bug("can't allocate buffer"); - } - else if (wl->size <= wl->cnt + 1) { - wl->size *= 2; - wl->ractors = realloc(wl->ractors, sizeof(rb_ractor_t *) * wl->size); - if (wl->ractors == NULL) rb_bug("can't re-allocate buffer"); - } - wl->ractors[wl->cnt++] = cr; - } - } - RACTOR_UNLOCK(r); + // fprintf(stderr, "%s r:%p status:%s, wakeup_status:%s\n", RUBY_FUNCTION_NAME_STRING, (void *)cr, + // wait_status_str(cr->sync.wait.status), wakeup_status_str(cr->sync.wait.wakeup_status)); - if (retry_try) { - RACTOR_LOCK(cr); - { - if (cr->sync.wait.wakeup_status == wakeup_none) { - VM_ASSERT(cr->sync.wait.status != wait_none); + RUBY_DEBUG_LOG("sleep by %s", wait_status_str(wait_status)); - cr->sync.wait.wakeup_status = wakeup_by_retry; - cr->sync.wait.status = wait_none; - } - } - RACTOR_UNLOCK(cr); + while (cr->sync.wait.wakeup_status == wakeup_none) { + rb_ractor_sched_sleep(ec, cr, ractor_sleep_interrupt); + ractor_check_ints(ec, cr, cf_func, cf_data); } + + cr->sync.wait.status = wait_none; + + // TODO: multi-thread + wakeup_status = cr->sync.wait.wakeup_status; + cr->sync.wait.wakeup_status = wakeup_none; + + RUBY_DEBUG_LOG("wakeup %s", wakeup_status_str(wakeup_status)); + + return wakeup_status; } +static enum rb_ractor_wakeup_status +ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, enum rb_ractor_wait_status wait_status) +{ + return ractor_sleep_with_cleanup(ec, cr, wait_status, 0, NULL); +} + +// Ractor.receive + static void -ractor_waiting_list_del(rb_ractor_t *r, struct rb_ractor_waiting_list *wl, rb_ractor_t *wr) +ractor_recursive_receive_if(rb_ractor_t *r) { - RACTOR_LOCK(r); - { - int pos = -1; - for (int i=0; i<wl->cnt; i++) { - if (wl->ractors[i] == wr) { - pos = i; - break; - } - } - if (pos >= 0) { // found - wl->cnt--; - for (int i=pos; i<wl->cnt; i++) { - wl->ractors[i] = wl->ractors[i+1]; - } - } + if (r->receiving_mutex && rb_mutex_owned_p(r->receiving_mutex)) { + rb_raise(rb_eRactorError, "can not call receive/receive_if recursively"); } - RACTOR_UNLOCK(r); } -static rb_ractor_t * -ractor_waiting_list_shift(rb_ractor_t *r, struct rb_ractor_waiting_list *wl) +static VALUE +ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *rq) { - ASSERT_ractor_locking(r); - VM_ASSERT(&r->sync.taking_ractors == wl); + struct rb_ractor_basket basket; + ractor_recursive_receive_if(cr); + bool received = false; + + RACTOR_LOCK_SELF(cr); + { + RUBY_DEBUG_LOG("rq->cnt:%d", rq->cnt); + received = ractor_queue_deq(cr, rq, &basket); + } + RACTOR_UNLOCK_SELF(cr); - if (wl->cnt > 0) { - rb_ractor_t *tr = wl->ractors[0]; - for (int i=1; i<wl->cnt; i++) { - wl->ractors[i-1] = wl->ractors[i]; + if (!received) { + if (cr->sync.incoming_port_closed) { + rb_raise(rb_eRactorClosedError, "The incoming port is already closed"); } - wl->cnt--; - return tr; + return Qundef; } else { - return NULL; + return ractor_basket_accept(&basket); } } static void -ractor_receive_wait(rb_execution_context_t *ec, rb_ractor_t *cr) +ractor_wait_receive(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *rq) { VM_ASSERT(cr == rb_ec_ractor_ptr(ec)); ractor_recursive_receive_if(cr); RACTOR_LOCK(cr); { - if (ractor_queue_empty_p(cr, &cr->sync.incoming_queue)) { - VM_ASSERT(cr->sync.wait.status == wait_none); - cr->sync.wait.status = wait_receiving; - cr->sync.wait.wakeup_status = wakeup_none; - ractor_sleep(ec, cr); - cr->sync.wait.wakeup_status = wakeup_none; + while (ractor_queue_empty_p(cr, rq)) { + ractor_sleep(ec, cr, wait_receiving); } } RACTOR_UNLOCK(cr); @@ -722,40 +756,25 @@ ractor_receive(rb_execution_context_t *ec, rb_ractor_t *cr) { VM_ASSERT(cr == rb_ec_ractor_ptr(ec)); VALUE v; + struct rb_ractor_queue *rq = &cr->sync.recv_queue; - while ((v = ractor_try_receive(ec, cr)) == Qundef) { - ractor_receive_wait(ec, cr); + while (UNDEF_P(v = ractor_try_receive(ec, cr, rq))) { + ractor_wait_receive(ec, cr, rq); } return v; } #if 0 -// for debug -static const char * -basket_type_name(enum rb_ractor_basket_type type) -{ - switch (type) { -#define T(t) case basket_type_##t: return #t - T(none); - T(ref); - T(copy); - T(move); - T(will); - T(deleted); - T(reserved); - default: rb_bug("unreachable"); - } -} - static void rq_dump(struct rb_ractor_queue *rq) { bool bug = false; for (int i=0; i<rq->cnt; i++) { - struct rb_ractor_basket *b = ractor_queue_at(rq, i); - fprintf(stderr, "%d (start:%d) type:%s %p %s\n", i, rq->start, basket_type_name(b->type), b, RSTRING_PTR(RARRAY_AREF(b->v, 1))); - if (b->type == basket_type_reserved) bug = true; + struct rb_ractor_basket *b = ractor_queue_at(NULL, rq, i); + fprintf(stderr, "%d (start:%d) type:%s %p %s\n", i, rq->start, basket_type_name(b->type), + (void *)b, RSTRING_PTR(RARRAY_AREF(b->v, 1))); + if (basket_type_p(b, basket_type_reserved) bug = true; } if (bug) rb_bug("!!"); } @@ -786,22 +805,23 @@ receive_if_body(VALUE ptr) ractor_receive_if_lock(data->cr); VALUE block_result = rb_yield(data->v); + rb_ractor_t *cr = data->cr; - RACTOR_LOCK_SELF(data->cr); + RACTOR_LOCK_SELF(cr); { - struct rb_ractor_basket *b = ractor_queue_at(data->rq, data->index); - VM_ASSERT(b->type == basket_type_reserved); + struct rb_ractor_basket *b = ractor_queue_at(cr, data->rq, data->index); + VM_ASSERT(basket_type_p(b, basket_type_reserved)); data->rq->reserved_cnt--; if (RTEST(block_result)) { - b->type = basket_type_deleted; - ractor_queue_compact(data->cr, data->rq); + ractor_queue_delete(cr, data->rq, b); + ractor_queue_compact(cr, data->rq); } else { - b->type = basket_type_ref; + b->type.e = basket_type_ref; } } - RACTOR_UNLOCK_SELF(data->cr); + RACTOR_UNLOCK_SELF(cr); data->success = true; @@ -817,19 +837,20 @@ static VALUE receive_if_ensure(VALUE v) { struct receive_block_data *data = (struct receive_block_data *)v; + rb_ractor_t *cr = data->cr; if (!data->success) { - RACTOR_LOCK_SELF(data->cr); + RACTOR_LOCK_SELF(cr); { - struct rb_ractor_basket *b = ractor_queue_at(data->rq, data->index); - VM_ASSERT(b->type == basket_type_reserved); - b->type = basket_type_deleted; + struct rb_ractor_basket *b = ractor_queue_at(cr, data->rq, data->index); + VM_ASSERT(basket_type_p(b, basket_type_reserved)); + b->type.e = basket_type_deleted; data->rq->reserved_cnt--; } - RACTOR_UNLOCK_SELF(data->cr); + RACTOR_UNLOCK_SELF(cr); } - rb_mutex_unlock(data->cr->receiving_mutex); + rb_mutex_unlock(cr->receiving_mutex); return Qnil; } @@ -841,12 +862,12 @@ ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b) rb_ractor_t *cr = rb_ec_ractor_ptr(ec); unsigned int serial = (unsigned int)-1; int index = 0; - struct rb_ractor_queue *rq = &cr->sync.incoming_queue; + struct rb_ractor_queue *rq = &cr->sync.recv_queue; while (1) { VALUE v = Qundef; - ractor_receive_wait(ec, cr); + ractor_wait_receive(ec, cr, rq); RACTOR_LOCK_SELF(cr); { @@ -857,10 +878,10 @@ ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b) // check newer version for (int i=index; i<rq->cnt; i++) { - if (!ractor_queue_skip_p(rq, i)) { - struct rb_ractor_basket *b = ractor_queue_at(rq, i); + if (!ractor_queue_skip_p(cr, rq, i)) { + struct rb_ractor_basket *b = ractor_queue_at(cr, rq, i); v = ractor_basket_value(b); - b->type = basket_type_reserved; + b->type.e = basket_type_reserved; rq->reserved_cnt++; index = i; break; @@ -869,7 +890,7 @@ ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b) } RACTOR_UNLOCK_SELF(cr); - if (v != Qundef) { + if (!UNDEF_P(v)) { struct receive_block_data data = { .cr = cr, .rq = rq, @@ -881,9 +902,11 @@ ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b) VALUE result = rb_ensure(receive_if_body, (VALUE)&data, receive_if_ensure, (VALUE)&data); - if (result != Qundef) return result; + if (!UNDEF_P(result)) return result; index++; } + + RUBY_VM_CHECK_INTS(ec); } } @@ -891,7 +914,6 @@ static void ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_basket *b) { bool closed = false; - struct rb_ractor_queue *rq = &r->sync.incoming_queue; RACTOR_LOCK(r); { @@ -899,10 +921,8 @@ ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_ closed = true; } else { - ractor_queue_enq(r, rq, b); - if (ractor_wakeup(r, wait_receiving, wakeup_by_send)) { - RUBY_DEBUG_LOG("wakeup", 0); - } + ractor_queue_enq(r, &r->sync.recv_queue, b); + ractor_wakeup(r, wait_receiving, wakeup_by_send); } } RACTOR_UNLOCK(r); @@ -912,430 +932,896 @@ ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_ } } +// Ractor#send + static VALUE ractor_move(VALUE obj); // in this file static VALUE ractor_copy(VALUE obj); // in this file static void -ractor_basket_setup(rb_execution_context_t *ec, struct rb_ractor_basket *basket, VALUE obj, VALUE move, bool exc, bool is_will, bool is_yield) +ractor_basket_prepare_contents(VALUE obj, VALUE move, volatile VALUE *pobj, enum rb_ractor_basket_type *ptype) { - basket->sender = rb_ec_ractor_ptr(ec)->pub.self; - basket->exception = exc; + VALUE v; + enum rb_ractor_basket_type type; - if (is_will) { - basket->type = basket_type_will; - basket->v = obj; - } - else if (rb_ractor_shareable_p(obj)) { - basket->type = basket_type_ref; - basket->v = obj; + if (rb_ractor_shareable_p(obj)) { + type = basket_type_ref; + v = obj; } else if (!RTEST(move)) { - basket->v = ractor_copy(obj); - basket->type = basket_type_copy; + v = ractor_copy(obj); + type = basket_type_copy; } else { - basket->type = basket_type_move; - - if (is_yield) { - basket->v = obj; // call ractor_move() when yielding timing. - } - else { - basket->v = ractor_move(obj); - } + type = basket_type_move; + v = ractor_move(obj); } + + *pobj = v; + *ptype = type; +} + +static void +ractor_basket_fill_(rb_ractor_t *cr, struct rb_ractor_basket *basket, VALUE obj, bool exc) +{ + VM_ASSERT(cr == GET_RACTOR()); + + basket->sender = cr->pub.self; + basket->p.send.exception = exc; + basket->p.send.v = obj; +} + +static void +ractor_basket_fill(rb_ractor_t *cr, struct rb_ractor_basket *basket, VALUE obj, VALUE move, bool exc) +{ + VALUE v; + enum rb_ractor_basket_type type; + ractor_basket_prepare_contents(obj, move, &v, &type); + ractor_basket_fill_(cr, basket, v, exc); + basket->type.e = type; +} + +static void +ractor_basket_fill_will(rb_ractor_t *cr, struct rb_ractor_basket *basket, VALUE obj, bool exc) +{ + ractor_basket_fill_(cr, basket, obj, exc); + basket->type.e = basket_type_will; } static VALUE ractor_send(rb_execution_context_t *ec, rb_ractor_t *r, VALUE obj, VALUE move) { struct rb_ractor_basket basket; - ractor_basket_setup(ec, &basket, obj, move, false, false, false); + // TODO: Ractor local GC + ractor_basket_fill(rb_ec_ractor_ptr(ec), &basket, obj, move, false); ractor_send_basket(ec, r, &basket); return r->pub.self; } -static VALUE -ractor_try_take(rb_execution_context_t *ec, rb_ractor_t *r) +// Ractor#take + +static bool +ractor_take_has_will(rb_ractor_t *r) +{ + ASSERT_ractor_locking(r); + + return basket_type_p(&r->sync.will_basket, basket_type_will); +} + +static bool +ractor_take_will(rb_ractor_t *r, struct rb_ractor_basket *b) +{ + ASSERT_ractor_locking(r); + + if (ractor_take_has_will(r)) { + *b = r->sync.will_basket; + r->sync.will_basket.type.e = basket_type_none; + return true; + } + else { + VM_ASSERT(basket_type_p(&r->sync.will_basket, basket_type_none)); + return false; + } +} + +static bool +ractor_take_will_lock(rb_ractor_t *r, struct rb_ractor_basket *b) { - struct rb_ractor_basket basket = { - .type = basket_type_none, + ASSERT_ractor_unlocking(r); + bool taken; + + RACTOR_LOCK(r); + { + taken = ractor_take_will(r, b); + } + RACTOR_UNLOCK(r); + + return taken; +} + +static bool +ractor_register_take(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *take_basket, + bool is_take, struct rb_ractor_selector_take_config *config, bool ignore_error) +{ + struct rb_ractor_basket b = { + .type.e = basket_type_take_basket, + .sender = cr->pub.self, + .p = { + .take = { + .basket = take_basket, + .config = config, + }, + }, }; bool closed = false; RACTOR_LOCK(r); { - if (ractor_sleeping_by(r, wait_yielding)) { - MAYBE_UNUSED(bool) wakeup_result; - VM_ASSERT(r->sync.wait.yielded_basket.type != basket_type_none); - - if (r->sync.wait.yielded_basket.type == basket_type_move) { - wakeup_result = ractor_wakeup(r, wait_yielding, wakeup_by_retry); - } - else { - wakeup_result = ractor_wakeup(r, wait_yielding, wakeup_by_take); - basket = r->sync.wait.yielded_basket; - ractor_basket_clear(&r->sync.wait.yielded_basket); - } - VM_ASSERT(wakeup_result); + if (is_take && ractor_take_will(r, take_basket)) { + RUBY_DEBUG_LOG("take over a will of r:%d", rb_ractor_id(r)); + } + else if (!is_take && ractor_take_has_will(r)) { + RUBY_DEBUG_LOG("has_will"); + VM_ASSERT(config != NULL); + config->closed = true; } else if (r->sync.outgoing_port_closed) { closed = true; } + else { + RUBY_DEBUG_LOG("register in r:%d", rb_ractor_id(r)); + ractor_queue_enq(r, &r->sync.takers_queue, &b); + + if (basket_none_p(take_basket)) { + ractor_wakeup(r, wait_yielding, wakeup_by_take); + } + } } RACTOR_UNLOCK(r); - if (basket.type == basket_type_none) { - if (closed) { - rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); + if (closed) { + if (!ignore_error) rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); + return false; + } + else { + return true; + } +} + +static bool +ractor_deregister_take(rb_ractor_t *r, struct rb_ractor_basket *take_basket) +{ + struct rb_ractor_queue *ts = &r->sync.takers_queue; + bool deleted = false; + + RACTOR_LOCK(r); + { + if (r->sync.outgoing_port_closed) { + // ok } else { - return Qundef; + for (int i=0; i<ts->cnt; i++) { + struct rb_ractor_basket *b = ractor_queue_at(r, ts, i); + if (basket_type_p(b, basket_type_take_basket) && b->p.take.basket == take_basket) { + ractor_queue_delete(r, ts, b); + deleted = true; + } + } + if (deleted) { + ractor_queue_compact(r, ts); + } } } - else { - return ractor_basket_accept(&basket); - } + RACTOR_UNLOCK(r); + + return deleted; } static VALUE -ractor_yield_move_body(VALUE v) +ractor_try_take(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *take_basket) { - return ractor_move(v); + bool taken; + + RACTOR_LOCK_SELF(cr); + { + if (basket_none_p(take_basket) || basket_type_p(take_basket, basket_type_yielding)) { + taken = false; + } + else { + taken = true; + } + } + RACTOR_UNLOCK_SELF(cr); + + if (taken) { + RUBY_DEBUG_LOG("taken"); + if (basket_type_p(take_basket, basket_type_deleted)) { + VM_ASSERT(r->sync.outgoing_port_closed); + rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); + } + return ractor_basket_accept(take_basket); + } + else { + RUBY_DEBUG_LOG("not taken"); + return Qundef; + } } + +#if VM_CHECK_MODE > 0 static bool -ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_basket *basket) +ractor_check_specific_take_basket_lock(rb_ractor_t *r, struct rb_ractor_basket *tb) { - ASSERT_ractor_unlocking(cr); - VM_ASSERT(basket->type != basket_type_none); + bool ret = false; + struct rb_ractor_queue *ts = &r->sync.takers_queue; - if (cr->sync.outgoing_port_closed) { - rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); + RACTOR_LOCK(r); + { + for (int i=0; i<ts->cnt; i++) { + struct rb_ractor_basket *b = ractor_queue_at(r, ts, i); + if (basket_type_p(b, basket_type_take_basket) && b->p.take.basket == tb) { + ret = true; + break; + } + } } + RACTOR_UNLOCK(r); + + return ret; +} +#endif +static void +ractor_take_cleanup(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *tb) +{ + retry: + if (basket_none_p(tb)) { // not yielded yet + if (!ractor_deregister_take(r, tb)) { + // not in r's takers queue + rb_thread_sleep(0); + goto retry; + } + } + else { + VM_ASSERT(!ractor_check_specific_take_basket_lock(r, tb)); + } +} + +struct take_wait_take_cleanup_data { rb_ractor_t *r; + struct rb_ractor_basket *tb; +}; - retry_shift: - RACTOR_LOCK(cr); +static void +ractor_wait_take_cleanup(rb_ractor_t *cr, void *ptr) +{ + struct take_wait_take_cleanup_data *data = (struct take_wait_take_cleanup_data *)ptr; + ractor_take_cleanup(cr, data->r, data->tb); +} + +static void +ractor_wait_take(rb_execution_context_t *ec, rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *take_basket) +{ + struct take_wait_take_cleanup_data data = { + .r = r, + .tb = take_basket, + }; + + RACTOR_LOCK_SELF(cr); { - r = ractor_waiting_list_shift(cr, &cr->sync.taking_ractors); + if (basket_none_p(take_basket) || basket_type_p(take_basket, basket_type_yielding)) { + ractor_sleep_with_cleanup(ec, cr, wait_taking, ractor_wait_take_cleanup, &data); + } } - RACTOR_UNLOCK(cr); + RACTOR_UNLOCK_SELF(cr); +} - if (r) { - bool retry_shift = false; +static VALUE +ractor_take(rb_execution_context_t *ec, rb_ractor_t *r) +{ + RUBY_DEBUG_LOG("from r:%u", rb_ractor_id(r)); + VALUE v; + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - RACTOR_LOCK(r); - { - if (ractor_sleeping_by(r, wait_taking)) { - VM_ASSERT(r->sync.wait.taken_basket.type == basket_type_none); - - if (basket->type == basket_type_move) { - enum ractor_wait_status prev_wait_status = r->sync.wait.status; - r->sync.wait.status = wait_moving; - - RACTOR_UNLOCK(r); - { - int state; - VALUE moved_value = rb_protect(ractor_yield_move_body, basket->v, &state); - if (state) { - r->sync.wait.status = prev_wait_status; - rb_jump_tag(state); - } - else { - basket->v = moved_value; - } - } - RACTOR_LOCK(r); + struct rb_ractor_basket take_basket = { + .type.e = basket_type_none, + .sender = 0, + }; - if (!ractor_wakeup(r, wait_moving, wakeup_by_yield)) { - // terminating? - } + ractor_register_take(cr, r, &take_basket, true, NULL, false); + + while (UNDEF_P(v = ractor_try_take(cr, r, &take_basket))) { + ractor_wait_take(ec, cr, r, &take_basket); + } + + VM_ASSERT(!basket_none_p(&take_basket)); + VM_ASSERT(!ractor_check_specific_take_basket_lock(r, &take_basket)); + + return v; +} + +// Ractor.yield + +static bool +ractor_check_take_basket(rb_ractor_t *cr, struct rb_ractor_queue *rs) +{ + ASSERT_ractor_locking(cr); + + for (int i=0; i<rs->cnt; i++) { + struct rb_ractor_basket *b = ractor_queue_at(cr, rs, i); + if (basket_type_p(b, basket_type_take_basket) && + basket_none_p(b->p.take.basket)) { + return true; + } + } + + return false; +} + +static bool +ractor_deq_take_basket(rb_ractor_t *cr, struct rb_ractor_queue *rs, struct rb_ractor_basket *b) +{ + ASSERT_ractor_unlocking(cr); + struct rb_ractor_basket *first_tb = NULL; + bool found = false; + + RACTOR_LOCK_SELF(cr); + { + while (ractor_queue_deq(cr, rs, b)) { + if (basket_type_p(b, basket_type_take_basket)) { + struct rb_ractor_basket *tb = b->p.take.basket; + + if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_yielding) == basket_type_none) { + found = true; + break; } else { - ractor_wakeup(r, wait_taking, wakeup_by_yield); + ractor_queue_enq(cr, rs, b); + if (first_tb == NULL) first_tb = tb; + struct rb_ractor_basket *head = ractor_queue_head(cr, rs); + VM_ASSERT(head != NULL); + if (basket_type_p(head, basket_type_take_basket) && head->p.take.basket == first_tb) { + break; // loop detected + } } - r->sync.wait.taken_basket = *basket; } else { - retry_shift = true; + VM_ASSERT(basket_none_p(b)); } } - RACTOR_UNLOCK(r); - if (retry_shift) { - // get candidate take-waiting ractor, but already woke up by another reason. - // retry to check another ractor. - goto retry_shift; + if (found && b->p.take.config && !b->p.take.config->oneshot) { + ractor_queue_enq(cr, rs, b); + } + } + RACTOR_UNLOCK_SELF(cr); + + return found; +} + +static bool +ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *ts, volatile VALUE obj, VALUE move, bool exc, bool is_will) +{ + ASSERT_ractor_unlocking(cr); + + struct rb_ractor_basket b; + + if (ractor_deq_take_basket(cr, ts, &b)) { + VM_ASSERT(basket_type_p(&b, basket_type_take_basket)); + VM_ASSERT(basket_type_p(b.p.take.basket, basket_type_yielding)); + + rb_ractor_t *tr = RACTOR_PTR(b.sender); + struct rb_ractor_basket *tb = b.p.take.basket; + enum rb_ractor_basket_type type; + + RUBY_DEBUG_LOG("basket from r:%u", rb_ractor_id(tr)); + + if (is_will) { + type = basket_type_will; } else { - return true; + enum ruby_tag_type state; + + // begin + EC_PUSH_TAG(ec); + if ((state = EC_EXEC_TAG()) == TAG_NONE) { + // TODO: Ractor local GC + ractor_basket_prepare_contents(obj, move, &obj, &type); + } + EC_POP_TAG(); + // rescue + if (state) { + RACTOR_LOCK_SELF(cr); + { + b.p.take.basket->type.e = basket_type_none; + ractor_queue_enq(cr, ts, &b); + } + RACTOR_UNLOCK_SELF(cr); + EC_JUMP_TAG(ec, state); + } } + + RACTOR_LOCK(tr); + { + VM_ASSERT(basket_type_p(tb, basket_type_yielding)); + // fill atomic + RUBY_DEBUG_LOG("fill %sbasket from r:%u", is_will ? "will " : "", rb_ractor_id(tr)); + ractor_basket_fill_(cr, tb, obj, exc); + if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_yielding, type) != basket_type_yielding) { + rb_bug("unreachable"); + } + ractor_wakeup(tr, wait_taking, wakeup_by_yield); + } + RACTOR_UNLOCK(tr); + + return true; } else { + RUBY_DEBUG_LOG("no take basket"); return false; } } -// select(r1, r2, r3, receive: true, yield: obj) +static void +ractor_wait_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *ts) +{ + RACTOR_LOCK_SELF(cr); + { + while (!ractor_check_take_basket(cr, ts)) { + ractor_sleep(ec, cr, wait_yielding); + } + } + RACTOR_UNLOCK_SELF(cr); +} + static VALUE -ractor_select(rb_execution_context_t *ec, const VALUE *rs, const int rs_len, VALUE yielded_value, bool move, VALUE *ret_r) +ractor_yield(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE obj, VALUE move) { - rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - VALUE crv = cr->pub.self; - VALUE ret = Qundef; - int i; - bool interrupted = false; - enum ractor_wait_status wait_status = 0; - bool yield_p = (yielded_value != Qundef) ? true : false; - const int alen = rs_len + (yield_p ? 1 : 0); - - struct ractor_select_action { - enum ractor_select_action_type { - ractor_select_action_take, - ractor_select_action_receive, - ractor_select_action_yield, - } type; - VALUE v; - } *actions = ALLOCA_N(struct ractor_select_action, alen); + struct rb_ractor_queue *ts = &cr->sync.takers_queue; - VM_ASSERT(cr->sync.wait.status == wait_none); - VM_ASSERT(cr->sync.wait.wakeup_status == wakeup_none); - VM_ASSERT(cr->sync.wait.taken_basket.type == basket_type_none); - VM_ASSERT(cr->sync.wait.yielded_basket.type == basket_type_none); + while (!ractor_try_yield(ec, cr, ts, obj, move, false, false)) { + ractor_wait_yield(ec, cr, ts); + } - // setup actions - for (i=0; i<rs_len; i++) { - VALUE v = rs[i]; + return Qnil; +} - if (v == crv) { - actions[i].type = ractor_select_action_receive; - actions[i].v = Qnil; - wait_status |= wait_receiving; - } - else if (rb_ractor_p(v)) { - actions[i].type = ractor_select_action_take; - actions[i].v = v; - wait_status |= wait_taking; - } - else { - rb_raise(rb_eArgError, "should be a ractor object, but %"PRIsVALUE, v); - } +// Ractor::Selector + +struct rb_ractor_selector { + rb_ractor_t *r; + struct rb_ractor_basket take_basket; + st_table *take_ractors; // rb_ractor_t * => (struct rb_ractor_selector_take_config *) +}; + +static int +ractor_selector_mark_ractors_i(st_data_t key, st_data_t value, st_data_t data) +{ + const rb_ractor_t *r = (rb_ractor_t *)key; + rb_gc_mark(r->pub.self); + return ST_CONTINUE; +} + +static void +ractor_selector_mark(void *ptr) +{ + struct rb_ractor_selector *s = ptr; + + if (s->take_ractors) { + st_foreach(s->take_ractors, ractor_selector_mark_ractors_i, 0); } - rs = NULL; - restart: + switch (s->take_basket.type.e) { + case basket_type_ref: + case basket_type_copy: + case basket_type_move: + case basket_type_will: + rb_gc_mark(s->take_basket.sender); + rb_gc_mark(s->take_basket.p.send.v); + break; + default: + break; + } +} + +static int +ractor_selector_release_i(st_data_t key, st_data_t val, st_data_t data) +{ + struct rb_ractor_selector *s = (struct rb_ractor_selector *)data; + struct rb_ractor_selector_take_config *config = (struct rb_ractor_selector_take_config *)val; - if (yield_p) { - actions[rs_len].type = ractor_select_action_yield; - actions[rs_len].v = Qundef; - wait_status |= wait_yielding; - ractor_basket_setup(ec, &cr->sync.wait.yielded_basket, yielded_value, move, false, false, true); + if (!config->closed) { + ractor_deregister_take((rb_ractor_t *)key, &s->take_basket); } + free(config); + return ST_CONTINUE; +} - // TODO: shuffle actions +static void +ractor_selector_free(void *ptr) +{ + struct rb_ractor_selector *s = ptr; + st_foreach(s->take_ractors, ractor_selector_release_i, (st_data_t)s); + st_free_table(s->take_ractors); + ruby_xfree(ptr); +} - while (1) { - RUBY_DEBUG_LOG("try actions (%s)", wait_status_str(wait_status)); - - for (i=0; i<alen; i++) { - VALUE v, rv; - switch (actions[i].type) { - case ractor_select_action_take: - rv = actions[i].v; - v = ractor_try_take(ec, RACTOR_PTR(rv)); - if (v != Qundef) { - *ret_r = rv; - ret = v; - goto cleanup; - } - break; - case ractor_select_action_receive: - v = ractor_try_receive(ec, cr); - if (v != Qundef) { - *ret_r = ID2SYM(rb_intern("receive")); - ret = v; - goto cleanup; - } - break; - case ractor_select_action_yield: - { - if (ractor_try_yield(ec, cr, &cr->sync.wait.yielded_basket)) { - *ret_r = ID2SYM(rb_intern("yield")); - ret = Qnil; - goto cleanup; - } - } - break; - } - } +static size_t +ractor_selector_memsize(const void *ptr) +{ + const struct rb_ractor_selector *s = ptr; + return sizeof(struct rb_ractor_selector) + + st_memsize(s->take_ractors) + + s->take_ractors->num_entries * sizeof(struct rb_ractor_selector_take_config); +} - RUBY_DEBUG_LOG("wait actions (%s)", wait_status_str(wait_status)); +static const rb_data_type_t ractor_selector_data_type = { + "ractor/selector", + { + ractor_selector_mark, + ractor_selector_free, + ractor_selector_memsize, + NULL, // update + }, + 0, 0, RUBY_TYPED_FREE_IMMEDIATELY, +}; - RACTOR_LOCK(cr); - { - VM_ASSERT(cr->sync.wait.status == wait_none); - cr->sync.wait.status = wait_status; - cr->sync.wait.wakeup_status = wakeup_none; - } - RACTOR_UNLOCK(cr); +static struct rb_ractor_selector * +RACTOR_SELECTOR_PTR(VALUE selv) +{ + VM_ASSERT(rb_typeddata_is_kind_of(selv, &ractor_selector_data_type)); - // prepare waiting - for (i=0; i<alen; i++) { - rb_ractor_t *r; - switch (actions[i].type) { - case ractor_select_action_take: - r = RACTOR_PTR(actions[i].v); - ractor_register_taking(r, cr); - break; - case ractor_select_action_yield: - case ractor_select_action_receive: - break; + return (struct rb_ractor_selector *)DATA_PTR(selv); +} + +// Ractor::Selector.new + +static VALUE +ractor_selector_create(VALUE klass) +{ + struct rb_ractor_selector *s; + VALUE selv = TypedData_Make_Struct(klass, struct rb_ractor_selector, &ractor_selector_data_type, s); + s->take_basket.type.e = basket_type_reserved; + s->take_ractors = st_init_numtable(); // ractor (ptr) -> take_config + return selv; +} + +// Ractor::Selector#add(r) + +static VALUE +ractor_selector_add(VALUE selv, VALUE rv) +{ + if (!rb_ractor_p(rv)) { + rb_raise(rb_eArgError, "Not a ractor object"); + } + + rb_ractor_t *r = RACTOR_PTR(rv); + struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv); + + if (st_lookup(s->take_ractors, (st_data_t)r, NULL)) { + rb_raise(rb_eArgError, "already added"); + } + + struct rb_ractor_selector_take_config *config = malloc(sizeof(struct rb_ractor_selector_take_config)); + VM_ASSERT(config != NULL); + config->closed = false; + config->oneshot = false; + + if (ractor_register_take(GET_RACTOR(), r, &s->take_basket, false, config, true)) { + st_insert(s->take_ractors, (st_data_t)r, (st_data_t)config); + } + + return rv; +} + +// Ractor::Selector#remove(r) + +static VALUE +ractor_selector_remove(VALUE selv, VALUE rv) +{ + if (!rb_ractor_p(rv)) { + rb_raise(rb_eArgError, "Not a ractor object"); + } + + rb_ractor_t *r = RACTOR_PTR(rv); + struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv); + + RUBY_DEBUG_LOG("r:%u", rb_ractor_id(r)); + + if (!st_lookup(s->take_ractors, (st_data_t)r, NULL)) { + rb_raise(rb_eArgError, "not added yet"); + } + + ractor_deregister_take(r, &s->take_basket); + struct rb_ractor_selector_take_config *config; + st_delete(s->take_ractors, (st_data_t *)&r, (st_data_t *)&config); + free(config); + + return rv; +} + +// Ractor::Selector#clear + +struct ractor_selector_clear_data { + VALUE selv; + rb_execution_context_t *ec; +}; + +static int +ractor_selector_clear_i(st_data_t key, st_data_t val, st_data_t data) +{ + VALUE selv = (VALUE)data; + rb_ractor_t *r = (rb_ractor_t *)key; + ractor_selector_remove(selv, r->pub.self); + return ST_CONTINUE; +} + +static VALUE +ractor_selector_clear(VALUE selv) +{ + struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv); + + st_foreach(s->take_ractors, ractor_selector_clear_i, (st_data_t)selv); + st_clear(s->take_ractors); + return selv; +} + +static VALUE +ractor_selector_empty_p(VALUE selv) +{ + struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv); + return s->take_ractors->num_entries == 0 ? Qtrue : Qfalse; +} + +static int +ractor_selector_wait_i(st_data_t key, st_data_t val, st_data_t dat) +{ + rb_ractor_t *r = (rb_ractor_t *)key; + struct rb_ractor_basket *tb = (struct rb_ractor_basket *)dat; + int ret; + + if (!basket_none_p(tb)) { + RUBY_DEBUG_LOG("already taken:%s", basket_type_name(tb->type.e)); + return ST_STOP; + } + + RACTOR_LOCK(r); + { + if (basket_type_p(&r->sync.will_basket, basket_type_will)) { + RUBY_DEBUG_LOG("r:%u has will", rb_ractor_id(r)); + + if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_will) == basket_type_none) { + ractor_take_will(r, tb); + ret = ST_STOP; + } + else { + RUBY_DEBUG_LOG("has will, but already taken (%s)", basket_type_name(tb->type.e)); + ret = ST_CONTINUE; } } + else if (r->sync.outgoing_port_closed) { + RUBY_DEBUG_LOG("r:%u is closed", rb_ractor_id(r)); - // wait - RACTOR_LOCK(cr); - { - if (cr->sync.wait.wakeup_status == wakeup_none) { - for (i=0; i<alen; i++) { - rb_ractor_t *r; - - switch (actions[i].type) { - case ractor_select_action_take: - r = RACTOR_PTR(actions[i].v); - if (ractor_sleeping_by(r, wait_yielding)) { - RUBY_DEBUG_LOG("wakeup_none, but r:%u is waiting for yielding", r->pub.id); - cr->sync.wait.wakeup_status = wakeup_by_retry; - goto skip_sleep; - } - break; - case ractor_select_action_receive: - if (cr->sync.incoming_queue.cnt > 0) { - RUBY_DEBUG_LOG("wakeup_none, but incoming_queue has %u messages", cr->sync.incoming_queue.cnt); - cr->sync.wait.wakeup_status = wakeup_by_retry; - goto skip_sleep; - } - break; - case ractor_select_action_yield: - if (cr->sync.taking_ractors.cnt > 0) { - RUBY_DEBUG_LOG("wakeup_none, but %u taking_ractors are waiting", cr->sync.taking_ractors.cnt); - cr->sync.wait.wakeup_status = wakeup_by_retry; - goto skip_sleep; - } - else if (cr->sync.outgoing_port_closed) { - cr->sync.wait.wakeup_status = wakeup_by_close; - goto skip_sleep; - } - break; - } - } - - RUBY_DEBUG_LOG("sleep %s", wait_status_str(cr->sync.wait.status)); - ractor_sleep(ec, cr); - RUBY_DEBUG_LOG("awaken %s", wakeup_status_str(cr->sync.wait.wakeup_status)); + if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_deleted) == basket_type_none) { + tb->sender = r->pub.self; + ret = ST_STOP; } else { - skip_sleep: - RUBY_DEBUG_LOG("no need to sleep %s->%s", - wait_status_str(cr->sync.wait.status), - wakeup_status_str(cr->sync.wait.wakeup_status)); - cr->sync.wait.status = wait_none; + RUBY_DEBUG_LOG("closed, but already taken (%s)", basket_type_name(tb->type.e)); + ret = ST_CONTINUE; } } - RACTOR_UNLOCK(cr); + else { + RUBY_DEBUG_LOG("wakeup r:%u", rb_ractor_id(r)); + ractor_wakeup(r, wait_yielding, wakeup_by_take); + ret = ST_CONTINUE; + } + } + RACTOR_UNLOCK(r); + + return ret; +} + +// Ractor::Selector#wait + +static void +ractor_selector_wait_cleaup(rb_ractor_t *cr, void *ptr) +{ + struct rb_ractor_basket *tb = (struct rb_ractor_basket *)ptr; + + RACTOR_LOCK_SELF(cr); + { + while (basket_type_p(tb, basket_type_yielding)) rb_thread_sleep(0); + // if tb->type is not none, taking is succeeded, but interruption ignore it unfortunately. + tb->type.e = basket_type_reserved; + } + RACTOR_UNLOCK_SELF(cr); +} + +static VALUE +ractor_selector__wait(VALUE selv, VALUE do_receivev, VALUE do_yieldv, VALUE yield_value, VALUE move) +{ + rb_execution_context_t *ec = GET_EC(); + struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv); + struct rb_ractor_basket *tb = &s->take_basket; + struct rb_ractor_basket taken_basket; + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); + bool do_receive = !!RTEST(do_receivev); + bool do_yield = !!RTEST(do_yieldv); + VALUE ret_v, ret_r; + enum rb_ractor_wait_status wait_status; + struct rb_ractor_queue *rq = &cr->sync.recv_queue; + struct rb_ractor_queue *ts = &cr->sync.takers_queue; + + RUBY_DEBUG_LOG("start"); + + retry: + RUBY_DEBUG_LOG("takers:%ld", s->take_ractors->num_entries); - // cleanup waiting - for (i=0; i<alen; i++) { - rb_ractor_t *r; - switch (actions[i].type) { - case ractor_select_action_take: - r = RACTOR_PTR(actions[i].v); - ractor_waiting_list_del(r, &r->sync.taking_ractors, cr); + // setup wait_status + wait_status = wait_none; + if (s->take_ractors->num_entries > 0) wait_status |= wait_taking; + if (do_receive) wait_status |= wait_receiving; + if (do_yield) wait_status |= wait_yielding; + + RUBY_DEBUG_LOG("wait:%s", wait_status_str(wait_status)); + + if (wait_status == wait_none) { + rb_raise(rb_eRactorError, "no taking ractors"); + } + + // check recv_queue + if (do_receive && !UNDEF_P(ret_v = ractor_try_receive(ec, cr, rq))) { + ret_r = ID2SYM(rb_intern("receive")); + goto success; + } + + // check takers + if (do_yield && ractor_try_yield(ec, cr, ts, yield_value, move, false, false)) { + ret_v = Qnil; + ret_r = ID2SYM(rb_intern("yield")); + goto success; + } + + // check take_basket + VM_ASSERT(basket_type_p(&s->take_basket, basket_type_reserved)); + s->take_basket.type.e = basket_type_none; + // kick all take target ractors + st_foreach(s->take_ractors, ractor_selector_wait_i, (st_data_t)tb); + + RACTOR_LOCK_SELF(cr); + { + retry_waiting: + while (1) { + if (!basket_none_p(tb)) { + RUBY_DEBUG_LOG("taken:%s from r:%u", basket_type_name(tb->type.e), + tb->sender ? rb_ractor_id(RACTOR_PTR(tb->sender)) : 0); break; - case ractor_select_action_receive: - case ractor_select_action_yield: + } + if (do_receive && !ractor_queue_empty_p(cr, rq)) { + RUBY_DEBUG_LOG("can receive (%d)", rq->cnt); + break; + } + if (do_yield && ractor_check_take_basket(cr, ts)) { + RUBY_DEBUG_LOG("can yield"); break; } + + ractor_sleep_with_cleanup(ec, cr, wait_status, ractor_selector_wait_cleaup, tb); } - // check results - enum ractor_wakeup_status wakeup_status = cr->sync.wait.wakeup_status; - cr->sync.wait.wakeup_status = wakeup_none; + taken_basket = *tb; - switch (wakeup_status) { - case wakeup_none: - // OK. something happens. - // retry loop. - break; - case wakeup_by_retry: - // Retry request. - break; - case wakeup_by_send: - // OK. - // retry loop and try_receive will succss. - break; - case wakeup_by_yield: - // take was succeeded! - // cr.wait.taken_basket contains passed block - VM_ASSERT(cr->sync.wait.taken_basket.type != basket_type_none); - *ret_r = cr->sync.wait.taken_basket.sender; - VM_ASSERT(rb_ractor_p(*ret_r)); - ret = ractor_basket_accept(&cr->sync.wait.taken_basket); - goto cleanup; - case wakeup_by_take: - *ret_r = ID2SYM(rb_intern("yield")); - ret = Qnil; - goto cleanup; - case wakeup_by_close: - // OK. - // retry loop and will get CloseError. - break; - case wakeup_by_interrupt: - ret = Qundef; - interrupted = true; - goto cleanup; + // ensure + // tb->type.e = basket_type_reserved # do it atomic in the following code + if (taken_basket.type.e == basket_type_yielding || + RUBY_ATOMIC_CAS(tb->type.atomic, taken_basket.type.e, basket_type_reserved) != taken_basket.type.e) { + + if (basket_type_p(tb, basket_type_yielding)) { + RACTOR_UNLOCK_SELF(cr); + { + rb_thread_sleep(0); + } + RACTOR_LOCK_SELF(cr); + } + goto retry_waiting; } } + RACTOR_UNLOCK_SELF(cr); - cleanup: - RUBY_DEBUG_LOG("cleanup actions (%s)", wait_status_str(wait_status)); - - if (cr->sync.wait.yielded_basket.type != basket_type_none) { - ractor_basket_clear(&cr->sync.wait.yielded_basket); + // check the taken resutl + switch (taken_basket.type.e) { + case basket_type_none: + VM_ASSERT(do_receive || do_yield); + goto retry; + case basket_type_yielding: + rb_bug("unreachable"); + case basket_type_deleted: { + ractor_selector_remove(selv, taken_basket.sender); + + rb_ractor_t *r = RACTOR_PTR(taken_basket.sender); + if (ractor_take_will_lock(r, &taken_basket)) { + RUBY_DEBUG_LOG("has_will"); + } + else { + RUBY_DEBUG_LOG("no will"); + // rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); + // remove and retry wait + goto retry; + } + break; + } + case basket_type_will: + // no more messages + ractor_selector_remove(selv, taken_basket.sender); + break; + default: + break; } - VM_ASSERT(cr->sync.wait.status == wait_none); - VM_ASSERT(cr->sync.wait.wakeup_status == wakeup_none); - VM_ASSERT(cr->sync.wait.taken_basket.type == basket_type_none); - VM_ASSERT(cr->sync.wait.yielded_basket.type == basket_type_none); + RUBY_DEBUG_LOG("taken_basket:%s", basket_type_name(taken_basket.type.e)); - if (interrupted) { - rb_vm_check_ints_blocking(ec); - interrupted = false; - goto restart; - } + ret_v = ractor_basket_accept(&taken_basket); + ret_r = taken_basket.sender; + success: + return rb_ary_new_from_args(2, ret_r, ret_v); +} - VM_ASSERT(ret != Qundef); - return ret; +static VALUE +ractor_selector_wait(int argc, VALUE *argv, VALUE selector) +{ + VALUE options; + ID keywords[3]; + VALUE values[3]; + + keywords[0] = rb_intern("receive"); + keywords[1] = rb_intern("yield_value"); + keywords[2] = rb_intern("move"); + + rb_scan_args(argc, argv, "0:", &options); + rb_get_kwargs(options, keywords, 0, numberof(values), values); + return ractor_selector__wait(selector, + values[0] == Qundef ? Qfalse : RTEST(values[0]), + values[1] != Qundef, values[1], values[2]); } static VALUE -ractor_yield(rb_execution_context_t *ec, rb_ractor_t *r, VALUE obj, VALUE move) +ractor_selector_new(int argc, VALUE *ractors, VALUE klass) { - VALUE ret_r; - ractor_select(ec, NULL, 0, obj, RTEST(move) ? true : false, &ret_r); - return Qnil; + VALUE selector = ractor_selector_create(klass); + + for (int i=0; i<argc; i++) { + ractor_selector_add(selector, ractors[i]); + } + + return selector; } static VALUE -ractor_take(rb_execution_context_t *ec, rb_ractor_t *r) +ractor_select_internal(rb_execution_context_t *ec, VALUE self, VALUE ractors, VALUE do_receive, VALUE do_yield, VALUE yield_value, VALUE move) { - VALUE ret_r; - VALUE v = ractor_select(ec, &r->pub.self, 1, Qundef, false, &ret_r); - return v; + VALUE selector = ractor_selector_new(RARRAY_LENINT(ractors), (VALUE *)RARRAY_CONST_PTR(ractors), rb_cRactorSelector); + VALUE result; + int state; + + EC_PUSH_TAG(ec); + if ((state = EC_EXEC_TAG() == TAG_NONE)) { + result = ractor_selector__wait(selector, do_receive, do_yield, yield_value, move); + } + else { + // ensure + ractor_selector_clear(selector); + + // jump + EC_JUMP_TAG(ec, state); + } + EC_POP_TAG(); + + RB_GC_GUARD(ractors); + return result; } +// Ractor#close_incoming + static VALUE ractor_close_incoming(rb_execution_context_t *ec, rb_ractor_t *r) { @@ -1347,8 +1833,8 @@ ractor_close_incoming(rb_execution_context_t *ec, rb_ractor_t *r) prev = Qfalse; r->sync.incoming_port_closed = true; if (ractor_wakeup(r, wait_receiving, wakeup_by_close)) { - VM_ASSERT(r->sync.incoming_queue.cnt == 0); - RUBY_DEBUG_LOG("cancel receiving", 0); + VM_ASSERT(ractor_queue_empty_p(r, &r->sync.recv_queue)); + RUBY_DEBUG_LOG("cancel receiving"); } } else { @@ -1359,6 +1845,8 @@ ractor_close_incoming(rb_execution_context_t *ec, rb_ractor_t *r) return prev; } +// Ractor#close_outgoing + static VALUE ractor_close_outgoing(rb_execution_context_t *ec, rb_ractor_t *r) { @@ -1366,27 +1854,50 @@ ractor_close_outgoing(rb_execution_context_t *ec, rb_ractor_t *r) RACTOR_LOCK(r); { + struct rb_ractor_queue *ts = &r->sync.takers_queue; + rb_ractor_t *tr; + struct rb_ractor_basket b; + if (!r->sync.outgoing_port_closed) { prev = Qfalse; r->sync.outgoing_port_closed = true; } else { + VM_ASSERT(ractor_queue_empty_p(r, ts)); prev = Qtrue; } // wakeup all taking ractors - rb_ractor_t *taking_ractor; - while ((taking_ractor = ractor_waiting_list_shift(r, &r->sync.taking_ractors)) != NULL) { - RACTOR_LOCK(taking_ractor); - ractor_wakeup(taking_ractor, wait_taking, wakeup_by_close); - RACTOR_UNLOCK(taking_ractor); + while (ractor_queue_deq(r, ts, &b)) { + if (basket_type_p(&b, basket_type_take_basket)) { + tr = RACTOR_PTR(b.sender); + struct rb_ractor_basket *tb = b.p.take.basket; + + if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_yielding) == basket_type_none) { + b.p.take.basket->sender = r->pub.self; + if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_yielding, basket_type_deleted) != basket_type_yielding) { + rb_bug("unreachable"); + } + RUBY_DEBUG_LOG("set delete for r:%u", rb_ractor_id(RACTOR_PTR(b.sender))); + } + + if (b.p.take.config) { + b.p.take.config->closed = true; + } + + // TODO: deadlock-able? + RACTOR_LOCK(tr); + { + ractor_wakeup(tr, wait_taking, wakeup_by_close); + } + RACTOR_UNLOCK(tr); + } } // raising yielding Ractor - if (!r->yield_atexit && - ractor_wakeup(r, wait_yielding, wakeup_by_close)) { - RUBY_DEBUG_LOG("cancel yielding", 0); - } + ractor_wakeup(r, wait_yielding, wakeup_by_close); + + VM_ASSERT(ractor_queue_empty_p(r, ts)); } RACTOR_UNLOCK(r); return prev; @@ -1410,7 +1921,7 @@ vm_insert_ractor0(rb_vm_t *vm, rb_ractor_t *r, bool single_ractor_mode) RUBY_DEBUG_LOG("r:%u ractor.cnt:%u++", r->pub.id, vm->ractor.cnt); VM_ASSERT(single_ractor_mode || RB_VM_LOCKED_P()); - list_add_tail(&vm->ractor.set, &r->vmlr_node); + ccan_list_add_tail(&vm->ractor.set, &r->vmlr_node); vm->ractor.cnt++; } @@ -1418,24 +1929,17 @@ static void cancel_single_ractor_mode(void) { // enable multi-ractor mode - RUBY_DEBUG_LOG("enable multi-ractor mode", 0); + RUBY_DEBUG_LOG("enable multi-ractor mode"); VALUE was_disabled = rb_gc_enable(); rb_gc_start(); - rb_transient_heap_evacuate(); if (was_disabled) { rb_gc_disable(); } ruby_single_main_ractor = NULL; - - if (rb_warning_category_enabled_p(RB_WARN_CATEGORY_EXPERIMENTAL)) { - rb_category_warn(RB_WARN_CATEGORY_EXPERIMENTAL, - "Ractor is experimental, and the behavior may change in future versions of Ruby! " - "Also there are many implementation issues."); - } } static void @@ -1479,7 +1983,7 @@ vm_remove_ractor(rb_vm_t *vm, rb_ractor_t *cr) vm->ractor.cnt, vm->ractor.sync.terminate_waiting); VM_ASSERT(vm->ractor.cnt > 0); - list_del(&cr->vmlr_node); + ccan_list_del(&cr->vmlr_node); if (vm->ractor.cnt <= 2 && vm->ractor.sync.terminate_waiting) { rb_native_cond_signal(&vm->ractor.sync.terminate_cond); @@ -1510,7 +2014,7 @@ rb_ractor_main_alloc(void) { rb_ractor_t *r = ruby_mimmalloc(sizeof(rb_ractor_t)); if (r == NULL) { - fprintf(stderr, "[FATAL] failed to allocate memory for main ractor\n"); + fprintf(stderr, "[FATAL] failed to allocate memory for main ractor\n"); exit(EXIT_FAILURE); } MEMZERO(r, rb_ractor_t, 1); @@ -1541,12 +2045,12 @@ rb_ractor_atfork(rb_vm_t *vm, rb_thread_t *th) } #endif -void rb_gvl_init(rb_global_vm_lock_t *gvl); +void rb_thread_sched_init(struct rb_thread_sched *, bool atfork); void rb_ractor_living_threads_init(rb_ractor_t *r) { - list_head_init(&r->threads.set); + ccan_list_head_init(&r->threads.set); r->threads.cnt = 0; r->threads.blocking_cnt = 0; } @@ -1554,13 +2058,18 @@ rb_ractor_living_threads_init(rb_ractor_t *r) static void ractor_init(rb_ractor_t *r, VALUE name, VALUE loc) { - ractor_queue_setup(&r->sync.incoming_queue); + ractor_queue_setup(&r->sync.recv_queue); + ractor_queue_setup(&r->sync.takers_queue); rb_native_mutex_initialize(&r->sync.lock); + rb_native_cond_initialize(&r->barrier_wait_cond); + +#ifdef RUBY_THREAD_WIN32_H rb_native_cond_initialize(&r->sync.cond); rb_native_cond_initialize(&r->barrier_wait_cond); +#endif // thread management - rb_gvl_init(&r->threads.gvl); + rb_thread_sched_init(&r->threads.sched, false); rb_ractor_living_threads_init(r); // naming @@ -1603,6 +2112,8 @@ ractor_create(rb_execution_context_t *ec, VALUE self, VALUE loc, VALUE name, VAL r->verbose = cr->verbose; r->debug = cr->debug; + rb_yjit_before_ractor_spawn(); + rb_rjit_before_ractor_spawn(); rb_thread_create_ractor(r, args, block); RB_GC_GUARD(rv); @@ -1618,28 +2129,23 @@ ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool e ASSERT_ractor_unlocking(cr); - struct rb_ractor_basket basket; - ractor_basket_setup(ec, &basket, v, Qfalse, exc, true, true /* this flag is ignored because move is Qfalse */); + struct rb_ractor_queue *ts = &cr->sync.takers_queue; retry: - if (ractor_try_yield(ec, cr, &basket)) { + if (ractor_try_yield(ec, cr, ts, v, Qfalse, exc, true)) { // OK. } else { bool retry = false; RACTOR_LOCK(cr); { - if (cr->sync.taking_ractors.cnt == 0) { - cr->sync.wait.yielded_basket = basket; - + if (!ractor_check_take_basket(cr, ts)) { VM_ASSERT(cr->sync.wait.status == wait_none); - cr->sync.wait.status = wait_yielding; - cr->sync.wait.wakeup_status = wakeup_none; - - VM_ASSERT(cr->yield_atexit == false); - cr->yield_atexit = true; + RUBY_DEBUG_LOG("leave a will"); + ractor_basket_fill_will(cr, &cr->sync.will_basket, v, exc); } else { + RUBY_DEBUG_LOG("rare timing!"); retry = true; // another ractor is waiting for the yield. } } @@ -1650,6 +2156,20 @@ ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool e } void +rb_ractor_atexit(rb_execution_context_t *ec, VALUE result) +{ + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); + ractor_yield_atexit(ec, cr, result, false); +} + +void +rb_ractor_atexit_exception(rb_execution_context_t *ec) +{ + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); + ractor_yield_atexit(ec, cr, ec->errinfo, true); +} + +void rb_ractor_teardown(rb_execution_context_t *ec) { rb_ractor_t *cr = rb_ec_ractor_ptr(ec); @@ -1666,20 +2186,6 @@ rb_ractor_teardown(rb_execution_context_t *ec) } void -rb_ractor_atexit(rb_execution_context_t *ec, VALUE result) -{ - rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - ractor_yield_atexit(ec, cr, result, false); -} - -void -rb_ractor_atexit_exception(rb_execution_context_t *ec) -{ - rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - ractor_yield_atexit(ec, cr, ec->errinfo, true); -} - -void rb_ractor_receive_parameters(rb_execution_context_t *ec, rb_ractor_t *r, int len, VALUE *ptr) { for (int i=0; i<len; i++) { @@ -1696,7 +2202,7 @@ rb_ractor_send_parameters(rb_execution_context_t *ec, rb_ractor_t *r, VALUE args } } -MJIT_FUNC_EXPORTED bool +bool rb_ractor_main_p_(void) { VM_ASSERT(rb_multi_ractor_p()); @@ -1712,47 +2218,30 @@ rb_obj_is_main_ractor(VALUE gv) return r == GET_VM()->ractor.main_ractor; } -rb_global_vm_lock_t * -rb_ractor_gvl(rb_ractor_t *r) -{ - return &r->threads.gvl; -} - int rb_ractor_living_thread_num(const rb_ractor_t *r) { return r->threads.cnt; } +// only for current ractor VALUE -rb_ractor_thread_list(rb_ractor_t *r) +rb_ractor_thread_list(void) { + rb_ractor_t *r = GET_RACTOR(); rb_thread_t *th = 0; - VALUE *ts; - int ts_cnt; + VALUE ary = rb_ary_new(); - RACTOR_LOCK(r); - { - ts = ALLOCA_N(VALUE, r->threads.cnt); - ts_cnt = 0; - - list_for_each(&r->threads.set, th, lt_node) { - switch (th->status) { - case THREAD_RUNNABLE: - case THREAD_STOPPED: - case THREAD_STOPPED_FOREVER: - ts[ts_cnt++] = th->self; - default: - break; - } + ccan_list_for_each(&r->threads.set, th, lt_node) { + switch (th->status) { + case THREAD_RUNNABLE: + case THREAD_STOPPED: + case THREAD_STOPPED_FOREVER: + rb_ary_push(ary, th->self); + default: + break; } } - RACTOR_UNLOCK(r); - - VALUE ary = rb_ary_new(); - for (int i=0; i<ts_cnt; i++) { - rb_ary_push(ary, ts[i]); - } return ary; } @@ -1765,7 +2254,7 @@ rb_ractor_living_threads_insert(rb_ractor_t *r, rb_thread_t *th) RACTOR_LOCK(r); { RUBY_DEBUG_LOG("r(%d)->threads.cnt:%d++", r->pub.id, r->threads.cnt); - list_add_tail(&r->threads.set, &th->lt_node); + ccan_list_add_tail(&r->threads.set, &th->lt_node); r->threads.cnt++; } RACTOR_UNLOCK(r); @@ -1835,6 +2324,8 @@ ractor_check_blocking(rb_ractor_t *cr, unsigned int remained_thread_cnt, const c } } +void rb_threadptr_remove(rb_thread_t *th); + void rb_ractor_living_threads_remove(rb_ractor_t *cr, rb_thread_t *th) { @@ -1842,13 +2333,15 @@ rb_ractor_living_threads_remove(rb_ractor_t *cr, rb_thread_t *th) RUBY_DEBUG_LOG("r->threads.cnt:%d--", cr->threads.cnt); ractor_check_blocking(cr, cr->threads.cnt - 1, __FILE__, __LINE__); + rb_threadptr_remove(th); + if (cr->threads.cnt == 1) { vm_remove_ractor(th->vm, cr); } else { RACTOR_LOCK(cr); { - list_del(&th->lt_node); + ccan_list_del(&th->lt_node); cr->threads.cnt--; } RACTOR_UNLOCK(cr); @@ -1922,7 +2415,7 @@ rb_ractor_terminate_interrupt_main_thread(rb_ractor_t *r) rb_threadptr_interrupt(main_th); } else { - RUBY_DEBUG_LOG("killed (%p)", main_th); + RUBY_DEBUG_LOG("killed (%p)", (void *)main_th); } } } @@ -1935,25 +2428,33 @@ ractor_terminal_interrupt_all(rb_vm_t *vm) if (vm->ractor.cnt > 1) { // send terminate notification to all ractors rb_ractor_t *r = 0; - list_for_each(&vm->ractor.set, r, vmlr_node) { + ccan_list_for_each(&vm->ractor.set, r, vmlr_node) { if (r != vm->ractor.main_ractor) { + RUBY_DEBUG_LOG("r:%d", rb_ractor_id(r)); rb_ractor_terminate_interrupt_main_thread(r); } } } } +void rb_add_running_thread(rb_thread_t *th); +void rb_del_running_thread(rb_thread_t *th); + void rb_ractor_terminate_all(void) { rb_vm_t *vm = GET_VM(); rb_ractor_t *cr = vm->ractor.main_ractor; + RUBY_DEBUG_LOG("ractor.cnt:%d", (int)vm->ractor.cnt); + VM_ASSERT(cr == GET_RACTOR()); // only main-ractor's main-thread should kick it. if (vm->ractor.cnt > 1) { RB_VM_LOCK(); - ractor_terminal_interrupt_all(vm); // kill all ractors + { + ractor_terminal_interrupt_all(vm); // kill all ractors + } RB_VM_UNLOCK(); } rb_thread_terminate_all(GET_THREAD()); // kill other threads in main-ractor and wait @@ -1966,7 +2467,9 @@ rb_ractor_terminate_all(void) // wait for 1sec rb_vm_ractor_blocking_cnt_inc(vm, cr, __FILE__, __LINE__); + rb_del_running_thread(rb_ec_thread_ptr(cr->threads.running_ec)); rb_vm_cond_timedwait(vm, &vm->ractor.sync.terminate_cond, 1000 /* ms */); + rb_add_running_thread(rb_ec_thread_ptr(cr->threads.running_ec)); rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__); ractor_terminal_interrupt_all(vm); @@ -1978,7 +2481,23 @@ rb_ractor_terminate_all(void) rb_execution_context_t * rb_vm_main_ractor_ec(rb_vm_t *vm) { - return vm->ractor.main_ractor->threads.running_ec; + /* This code needs to carefully work around two bugs: + * - Bug #20016: When M:N threading is enabled, running_ec is NULL if no thread is + * actually currently running (as opposed to without M:N threading, when + * running_ec will still point to the _last_ thread which ran) + * - Bug #20197: If the main thread is sleeping, setting its postponed job + * interrupt flag is pointless; it won't look at the flag until it stops sleeping + * for some reason. It would be better to set the flag on the running ec, which + * will presumably look at it soon. + * + * Solution: use running_ec if it's set, otherwise fall back to the main thread ec. + * This is still susceptible to some rare race conditions (what if the last thread + * to run just entered a long-running sleep?), but seems like the best balance of + * robustness and complexity. + */ + rb_execution_context_t *running_ec = vm->ractor.main_ractor->threads.running_ec; + if (running_ec) { return running_ec; } + return vm->ractor.main_thread->ec; } static VALUE @@ -1987,6 +2506,29 @@ ractor_moved_missing(int argc, VALUE *argv, VALUE self) rb_raise(rb_eRactorMovedError, "can not send any methods to a moved object"); } +#ifndef USE_RACTOR_SELECTOR +#define USE_RACTOR_SELECTOR 0 +#endif + +RUBY_SYMBOL_EXPORT_BEGIN +void rb_init_ractor_selector(void); +RUBY_SYMBOL_EXPORT_END + +void +rb_init_ractor_selector(void) +{ + rb_cRactorSelector = rb_define_class_under(rb_cRactor, "Selector", rb_cObject); + rb_undef_alloc_func(rb_cRactorSelector); + + rb_define_singleton_method(rb_cRactorSelector, "new", ractor_selector_new , -1); + rb_define_method(rb_cRactorSelector, "add", ractor_selector_add, 1); + rb_define_method(rb_cRactorSelector, "remove", ractor_selector_remove, 1); + rb_define_method(rb_cRactorSelector, "clear", ractor_selector_clear, 0); + rb_define_method(rb_cRactorSelector, "empty?", ractor_selector_empty_p, 0); + rb_define_method(rb_cRactorSelector, "wait", ractor_selector_wait, -1); + rb_define_method(rb_cRactorSelector, "_wait", ractor_selector__wait, 4); +} + /* * Document-class: Ractor::ClosedError * @@ -2106,6 +2648,10 @@ Init_Ractor(void) rb_define_method(rb_cRactorMovedObject, "equal?", ractor_moved_missing, -1); rb_define_method(rb_cRactorMovedObject, "instance_eval", ractor_moved_missing, -1); rb_define_method(rb_cRactorMovedObject, "instance_exec", ractor_moved_missing, -1); + +#if USE_RACTOR_SELECTOR + rb_init_ractor_selector(); +#endif } void @@ -2114,7 +2660,7 @@ rb_ractor_dump(void) rb_vm_t *vm = GET_VM(); rb_ractor_t *r = 0; - list_for_each(&vm->ractor.set, r, vmlr_node) { + ccan_list_for_each(&vm->ractor.set, r, vmlr_node) { if (r != vm->ractor.main_ractor) { fprintf(stderr, "r:%u (%s)\n", r->pub.id, ractor_status_str(r->status_)); } @@ -2266,12 +2812,25 @@ obj_traverse_rec(struct obj_traverse_data *data) { if (UNLIKELY(!data->rec)) { data->rec_hash = rb_ident_hash_new(); - data->rec = rb_hash_st_table(data->rec_hash); + data->rec = RHASH_ST_TABLE(data->rec_hash); } return data->rec; } static int +obj_traverse_ivar_foreach_i(ID key, VALUE val, st_data_t ptr) +{ + struct obj_traverse_callback_data *d = (struct obj_traverse_callback_data *)ptr; + + if (obj_traverse_i(val, d->data)) { + d->stop = true; + return ST_STOP; + } + + return ST_CONTINUE; +} + +static int obj_traverse_i(VALUE obj, struct obj_traverse_data *data) { if (RB_SPECIAL_CONST_P(obj)) return 0; @@ -2287,14 +2846,12 @@ obj_traverse_i(VALUE obj, struct obj_traverse_data *data) return 0; } - if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) { - struct gen_ivtbl *ivtbl; - rb_ivar_generic_ivtbl_lookup(obj, &ivtbl); - for (uint32_t i = 0; i < ivtbl->numiv; i++) { - VALUE val = ivtbl->ivptr[i]; - if (val != Qundef && obj_traverse_i(val, data)) return 1; - } - } + struct obj_traverse_callback_data d = { + .stop = false, + .data = data, + }; + rb_ivar_foreach(obj, obj_traverse_ivar_foreach_i, (st_data_t)&d); + if (d.stop) return 1; switch (BUILTIN_TYPE(obj)) { // no child node @@ -2308,15 +2865,7 @@ obj_traverse_i(VALUE obj, struct obj_traverse_data *data) break; case T_OBJECT: - { - uint32_t len = ROBJECT_NUMIV(obj); - VALUE *ptr = ROBJECT_IVPTR(obj); - - for (uint32_t i=0; i<len; i++) { - VALUE val = ptr[i]; - if (val != Qundef && obj_traverse_i(val, data)) return 1; - } - } + /* Instance variables already traversed. */ break; case T_ARRAY: @@ -2436,7 +2985,10 @@ rb_obj_traverse(VALUE obj, static int frozen_shareable_p(VALUE obj, bool *made_shareable) { - if (!RB_TYPE_P(obj, T_DATA)) { + if (CHILLED_STRING_P(obj)) { + return false; + } + else if (!RB_TYPE_P(obj, T_DATA)) { return true; } else if (RTYPEDDATA_P(obj)) { @@ -2465,6 +3017,17 @@ make_shareable_check_shareable(VALUE obj) if (rb_ractor_shareable_p(obj)) { return traverse_skip; } + else if (CHILLED_STRING_P(obj)) { + rb_funcall(obj, idFreeze, 0); + + if (UNLIKELY(!RB_OBJ_FROZEN_RAW(obj))) { + rb_raise(rb_eRactorError, "#freeze does not freeze object correctly"); + } + + if (RB_OBJ_SHAREABLE_P(obj)) { + return traverse_skip; + } + } else if (!frozen_shareable_p(obj, &made_shareable)) { if (made_shareable) { return traverse_skip; @@ -2509,10 +3072,7 @@ VALUE rb_ractor_make_shareable_copy(VALUE obj) { VALUE copy = ractor_copy(obj); - rb_obj_traverse(copy, - make_shareable_check_shareable, - null_leave, mark_shareable); - return copy; + return rb_ractor_make_shareable(copy); } VALUE @@ -2526,6 +3086,14 @@ rb_ractor_ensure_shareable(VALUE obj, VALUE name) return obj; } +void +rb_ractor_ensure_main_ractor(const char *msg) +{ + if (!rb_ractor_main_p()) { + rb_raise(rb_eRactorIsolationError, "%s", msg); + } +} + static enum obj_traverse_iterator_result shareable_p_enter(VALUE obj) { @@ -2547,7 +3115,7 @@ shareable_p_enter(VALUE obj) return traverse_stop; // fail } -MJIT_FUNC_EXPORTED bool +bool rb_ractor_shareable_p_continue(VALUE obj) { if (rb_obj_traverse(obj, @@ -2651,30 +3219,47 @@ obj_hash_traverse_replace_i(st_data_t *key, st_data_t *val, st_data_t ptr, int e return ST_CONTINUE; } +static int +obj_iv_hash_traverse_replace_foreach_i(st_data_t _key, st_data_t _val, st_data_t _data, int _x) +{ + return ST_REPLACE; +} + +static int +obj_iv_hash_traverse_replace_i(st_data_t * _key, st_data_t * val, st_data_t ptr, int exists) +{ + struct obj_traverse_replace_callback_data *d = (struct obj_traverse_replace_callback_data *)ptr; + struct obj_traverse_replace_data *data = d->data; + + if (obj_traverse_replace_i(*(VALUE *)val, data)) { + d->stop = true; + return ST_STOP; + } + else if (*(VALUE *)val != data->replacement) { + VALUE v = *(VALUE *)val = data->replacement; + RB_OBJ_WRITTEN(d->src, Qundef, v); + } + + return ST_CONTINUE; +} + static struct st_table * obj_traverse_replace_rec(struct obj_traverse_replace_data *data) { if (UNLIKELY(!data->rec)) { data->rec_hash = rb_ident_hash_new(); - data->rec = rb_hash_st_table(data->rec_hash); + data->rec = RHASH_ST_TABLE(data->rec_hash); } return data->rec; } -#if USE_TRANSIENT_HEAP -void rb_ary_transient_heap_evacuate(VALUE ary, int promote); -void rb_obj_transient_heap_evacuate(VALUE obj, int promote); -void rb_hash_transient_heap_evacuate(VALUE hash, int promote); -void rb_struct_transient_heap_evacuate(VALUE st, int promote); -#endif - static void obj_refer_only_shareables_p_i(VALUE obj, void *ptr) { int *pcnt = (int *)ptr; if (!rb_ractor_shareable_p(obj)) { - pcnt++; + ++*pcnt; } } @@ -2693,7 +3278,7 @@ obj_refer_only_shareables_p(VALUE obj) static int obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data) { - VALUE replacement; + st_data_t replacement; if (RB_SPECIAL_CONST_P(obj)) { data->replacement = obj; @@ -2706,14 +3291,14 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data) case traverse_stop: return 1; // stop search } - replacement = data->replacement; + replacement = (st_data_t)data->replacement; - if (UNLIKELY(st_lookup(obj_traverse_replace_rec(data), (st_data_t)obj, (st_data_t *)&replacement))) { - data->replacement = replacement; + if (UNLIKELY(st_lookup(obj_traverse_replace_rec(data), (st_data_t)obj, &replacement))) { + data->replacement = (VALUE)replacement; return 0; } else { - st_insert(obj_traverse_replace_rec(data), (st_data_t)obj, (st_data_t)replacement); + st_insert(obj_traverse_replace_rec(data), (st_data_t)obj, replacement); } if (!data->move) { @@ -2729,9 +3314,26 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data) if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) { struct gen_ivtbl *ivtbl; rb_ivar_generic_ivtbl_lookup(obj, &ivtbl); - for (uint32_t i = 0; i < ivtbl->numiv; i++) { - if (ivtbl->ivptr[i] != Qundef) { - CHECK_AND_REPLACE(ivtbl->ivptr[i]); + + if (UNLIKELY(rb_shape_obj_too_complex(obj))) { + struct obj_traverse_replace_callback_data d = { + .stop = false, + .data = data, + .src = obj, + }; + rb_st_foreach_with_replace( + ivtbl->as.complex.table, + obj_iv_hash_traverse_replace_foreach_i, + obj_iv_hash_traverse_replace_i, + (st_data_t)&d + ); + if (d.stop) return 1; + } + else { + for (uint32_t i = 0; i < ivtbl->as.shape.numiv; i++) { + if (!UNDEF_P(ivtbl->as.shape.ivptr[i])) { + CHECK_AND_REPLACE(ivtbl->as.shape.ivptr[i]); + } } } } @@ -2751,15 +3353,25 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data) case T_OBJECT: { -#if USE_TRANSIENT_HEAP - if (data->move) rb_obj_transient_heap_evacuate(obj, TRUE); -#endif - - uint32_t len = ROBJECT_NUMIV(obj); - VALUE *ptr = ROBJECT_IVPTR(obj); + if (rb_shape_obj_too_complex(obj)) { + struct obj_traverse_replace_callback_data d = { + .stop = false, + .data = data, + .src = obj, + }; + rb_st_foreach_with_replace( + ROBJECT_IV_HASH(obj), + obj_iv_hash_traverse_replace_foreach_i, + obj_iv_hash_traverse_replace_i, + (st_data_t)&d + ); + if (d.stop) return 1; + } + else { + uint32_t len = ROBJECT_IV_COUNT(obj); + VALUE *ptr = ROBJECT_IVPTR(obj); - for (uint32_t i=0; i<len; i++) { - if (ptr[i] != Qundef) { + for (uint32_t i = 0; i < len; i++) { CHECK_AND_REPLACE(ptr[i]); } } @@ -2769,9 +3381,6 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data) case T_ARRAY: { rb_ary_cancel_sharing(obj); -#if USE_TRANSIENT_HEAP - if (data->move) rb_ary_transient_heap_evacuate(obj, TRUE); -#endif for (int i = 0; i < RARRAY_LENINT(obj); i++) { VALUE e = rb_ary_entry(obj, i); @@ -2786,12 +3395,8 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data) RB_GC_GUARD(obj); } break; - case T_HASH: { -#if USE_TRANSIENT_HEAP - if (data->move) rb_hash_transient_heap_evacuate(obj, TRUE); -#endif struct obj_traverse_replace_callback_data d = { .stop = false, .data = data, @@ -2816,9 +3421,6 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data) case T_STRUCT: { -#if USE_TRANSIENT_HEAP - if (data->move) rb_struct_transient_heap_evacuate(obj, TRUE); -#endif long len = RSTRUCT_LEN(obj); const VALUE *ptr = RSTRUCT_CONST_PTR(obj); @@ -2859,7 +3461,7 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data) rb_bug("unreachable"); } - data->replacement = replacement; + data->replacement = (VALUE)replacement; if (data->leave_func(obj, data) == traverse_stop) { return 1; @@ -2919,6 +3521,8 @@ ractor_moved_bang(VALUE obj) rv->v3 = 0; rv->flags = rv->flags & ~fl_users; + if (BUILTIN_TYPE(obj) == T_OBJECT) ROBJECT_SET_SHAPE_ID(obj, ROOT_SHAPE_ID); + // TODO: record moved location } @@ -2930,7 +3534,9 @@ move_enter(VALUE obj, struct obj_traverse_replace_data *data) return traverse_skip; } else { - data->replacement = rb_obj_alloc(RBASIC_CLASS(obj)); + VALUE moved = rb_obj_alloc(RBASIC_CLASS(obj)); + rb_shape_set_shape(moved, rb_shape_get_shape(obj)); + data->replacement = moved; return traverse_cont; } } @@ -2964,7 +3570,7 @@ static VALUE ractor_move(VALUE obj) { VALUE val = rb_obj_traverse_replace(obj, move_enter, move_leave, true); - if (val != Qundef) { + if (!UNDEF_P(val)) { return val; } else { @@ -2995,7 +3601,7 @@ static VALUE ractor_copy(VALUE obj) { VALUE val = rb_obj_traverse_replace(obj, copy_enter, copy_leave, false); - if (val != Qundef) { + if (!UNDEF_P(val)) { return val; } else { @@ -3039,9 +3645,9 @@ ractor_local_storage_mark(rb_ractor_t *r) for (int i=0; i<freed_ractor_local_keys.cnt; i++) { rb_ractor_local_key_t key = freed_ractor_local_keys.keys[i]; - st_data_t val; - if (st_delete(r->local_storage, (st_data_t *)&key, &val) && - key->type->free) { + st_data_t val, k = (st_data_t)key; + if (st_delete(r->local_storage, &k, &val) && + (key = (rb_ractor_local_key_t)k)->type->free) { (*key->type->free)((void *)val); } } @@ -3127,7 +3733,7 @@ static bool ractor_local_ref(rb_ractor_local_key_t key, void **pret) { if (rb_ractor_main_p()) { - if ((VALUE)key->main_cache != Qundef) { + if (!UNDEF_P((VALUE)key->main_cache)) { *pret = key->main_cache; return true; } @@ -3166,9 +3772,9 @@ ractor_local_set(rb_ractor_local_key_t key, void *ptr) VALUE rb_ractor_local_storage_value(rb_ractor_local_key_t key) { - VALUE val; - if (ractor_local_ref(key, (void **)&val)) { - return val; + void *val; + if (ractor_local_ref(key, &val)) { + return (VALUE)val; } else { return Qnil; |