diff options
author | Koichi Sasada <ko1@atdot.net> | 2023-02-24 18:46:17 +0900 |
---|---|---|
committer | Koichi Sasada <ko1@atdot.net> | 2023-03-02 14:31:54 +0900 |
commit | a4421bd73c286253311c2cdf8c78ed258f8cff44 (patch) | |
tree | 3ebec079d5ed19429148726e2f5e60597d1df988 /ractor.c | |
parent | 1abec43b5d3290ef2229ceb64014ed91410a6381 (diff) |
Rewrite Ractor synchronization mechanism
This patch rewrites Ractor synchronization mechanism, send/receive
and take/yield.
* API
* Ractor::Selector is introduced for lightweight waiting
for many ractors.
* Data structure
* remove `struct rb_ractor_waiting_list` and use
`struct rb_ractor_queue takers_queue` to manage takers.
* remove `rb_ractor_t::yield_atexit` and use
`rb_ractor_t::sync::will_basket::type` to check the will.
* add `rb_ractor_basket::p.take` to represent a taking ractor.
* Synchronization protocol
* For the Ractor local GC, `take` can not make a copy object
directly so ask to generate the copy from the yielding ractor.
* The following steps shows what `r1.take` does on `r0`.
* step1: (r0) register `r0` into `r1`'s takers.
* step2: (r0) check `r1`'s status and wakeup r0 if `r1` is waiting
for yielding a value.
* step3: (r0) sleep until `r1` wakes up `r0`.
* The following steps shows what `Ractor.yield(v)` on `r1`.
* step1: (r1) check first takers of `r1` and if there is (`r0`),
make a copy object of `v` and pass it to `r0` and
wakes up `r0`.
* step2: (r1) if there is no taker ractors, sleep until
another ractor try to take.
Notes
Notes:
Merged: https://github.com/ruby/ruby/pull/7371
Diffstat (limited to 'ractor.c')
-rw-r--r-- | ractor.c | 1594 |
1 files changed, 985 insertions, 609 deletions
@@ -5,6 +5,7 @@ #include "ruby/ractor.h" #include "ruby/thread_native.h" #include "vm_core.h" +#include "eval_intern.h" #include "vm_sync.h" #include "ractor_core.h" #include "internal/complex.h" @@ -20,6 +21,7 @@ #include "mjit.h" VALUE rb_cRactor; +static VALUE rb_cRactorSelector; VALUE rb_eRactorUnsafeError; VALUE rb_eRactorIsolationError; @@ -31,6 +33,8 @@ static VALUE rb_cRactorMovedObject; static void vm_ractor_blocking_cnt_inc(rb_vm_t *vm, rb_ractor_t *r, const char *file, int line); +// Ractor locking + static void ASSERT_ractor_unlocking(rb_ractor_t *r) { @@ -122,6 +126,8 @@ ractor_cond_wait(rb_ractor_t *r) #endif } +// Ractor status + static const char * ractor_status_str(enum ractor_status status) { @@ -158,7 +164,7 @@ ractor_status_set(rb_ractor_t *r, enum ractor_status status) VM_ASSERT(status == ractor_running); break; case ractor_terminated: - VM_ASSERT(0); // unreachable + rb_bug("unreachable"); break; } @@ -171,31 +177,40 @@ ractor_status_p(rb_ractor_t *r, enum ractor_status status) return rb_ractor_status_p(r, status); } -static struct rb_ractor_basket *ractor_queue_at(struct rb_ractor_queue *rq, int i); +// Ractor data/mark/free + +static struct rb_ractor_basket *ractor_queue_at(rb_ractor_t *r, struct rb_ractor_queue *rq, int i); +static void ractor_local_storage_mark(rb_ractor_t *r); +static void ractor_local_storage_free(rb_ractor_t *r); static void ractor_queue_mark(struct rb_ractor_queue *rq) { for (int i=0; i<rq->cnt; i++) { - struct rb_ractor_basket *b = ractor_queue_at(rq, i); - rb_gc_mark(b->v); + struct rb_ractor_basket *b = ractor_queue_at(NULL, rq, i); rb_gc_mark(b->sender); + + switch (b->type.e) { + case basket_type_yielding: + case basket_type_take_basket: + case basket_type_deleted: + case basket_type_reserved: + // ignore + break; + default: + rb_gc_mark(b->p.send.v); + } } } -static void ractor_local_storage_mark(rb_ractor_t *r); -static void ractor_local_storage_free(rb_ractor_t *r); - static void ractor_mark(void *ptr) { rb_ractor_t *r = (rb_ractor_t *)ptr; - ractor_queue_mark(&r->sync.incoming_queue); - rb_gc_mark(r->sync.wait.taken_basket.v); - rb_gc_mark(r->sync.wait.taken_basket.sender); - rb_gc_mark(r->sync.wait.yielded_basket.v); - rb_gc_mark(r->sync.wait.yielded_basket.sender); + ractor_queue_mark(&r->sync.recv_queue); + ractor_queue_mark(&r->sync.takers_queue); + rb_gc_mark(r->receiving_mutex); rb_gc_mark(r->loc); @@ -223,19 +238,14 @@ ractor_queue_free(struct rb_ractor_queue *rq) } static void -ractor_waiting_list_free(struct rb_ractor_waiting_list *wl) -{ - free(wl->ractors); -} - -static void ractor_free(void *ptr) { rb_ractor_t *r = (rb_ractor_t *)ptr; + RUBY_DEBUG_LOG("free r:%d", rb_ractor_id(r)); rb_native_mutex_destroy(&r->sync.lock); rb_native_cond_destroy(&r->sync.cond); - ractor_queue_free(&r->sync.incoming_queue); - ractor_waiting_list_free(&r->sync.taking_ractors); + ractor_queue_free(&r->sync.recv_queue); + ractor_queue_free(&r->sync.takers_queue); ractor_local_storage_free(r); rb_hook_list_free(&r->pub.hooks); ruby_xfree(r); @@ -248,20 +258,14 @@ ractor_queue_memsize(const struct rb_ractor_queue *rq) } static size_t -ractor_waiting_list_memsize(const struct rb_ractor_waiting_list *wl) -{ - return sizeof(rb_ractor_t *) * wl->size; -} - -static size_t ractor_memsize(const void *ptr) { rb_ractor_t *r = (rb_ractor_t *)ptr; - // TODO + // TODO: more correct? return sizeof(rb_ractor_t) + - ractor_queue_memsize(&r->sync.incoming_queue) + - ractor_waiting_list_memsize(&r->sync.taking_ractors); + ractor_queue_memsize(&r->sync.recv_queue) + + ractor_queue_memsize(&r->sync.takers_queue); } static const rb_data_type_t ractor_data_type = { @@ -290,9 +294,7 @@ static inline rb_ractor_t * RACTOR_PTR(VALUE self) { VM_ASSERT(rb_ractor_p(self)); - rb_ractor_t *r = DATA_PTR(self); - // TODO: check return r; } @@ -311,6 +313,8 @@ rb_ractor_current_id(void) } #endif +// Ractor queue + static void ractor_queue_setup(struct rb_ractor_queue *rq) { @@ -321,15 +325,23 @@ ractor_queue_setup(struct rb_ractor_queue *rq) } static struct rb_ractor_basket * -ractor_queue_at(struct rb_ractor_queue *rq, int i) +ractor_queue_head(rb_ractor_t *r, struct rb_ractor_queue *rq) +{ + if (r != NULL) ASSERT_ractor_locking(r); + return &rq->baskets[rq->start]; +} + +static struct rb_ractor_basket * +ractor_queue_at(rb_ractor_t *r, struct rb_ractor_queue *rq, int i) { + if (r != NULL) ASSERT_ractor_locking(r); return &rq->baskets[(rq->start + i) % rq->size]; } static void -ractor_queue_advance(struct rb_ractor_queue *rq) +ractor_queue_advance(rb_ractor_t *r, struct rb_ractor_queue *rq) { - ASSERT_ractor_locking(GET_RACTOR()); + ASSERT_ractor_locking(r); if (rq->reserved_cnt == 0) { rq->cnt--; @@ -337,16 +349,16 @@ ractor_queue_advance(struct rb_ractor_queue *rq) rq->serial++; } else { - ractor_queue_at(rq, 0)->type = basket_type_deleted; + ractor_queue_at(r, rq, 0)->type.e = basket_type_deleted; } } static bool -ractor_queue_skip_p(struct rb_ractor_queue *rq, int i) +ractor_queue_skip_p(rb_ractor_t *r, struct rb_ractor_queue *rq, int i) { - struct rb_ractor_basket *b = ractor_queue_at(rq, i); - return b->type == basket_type_deleted || - b->type == basket_type_reserved; + struct rb_ractor_basket *b = ractor_queue_at(r, rq, i); + return basket_type_p(b, basket_type_deleted) || + basket_type_p(b, basket_type_reserved); } static void @@ -354,8 +366,8 @@ ractor_queue_compact(rb_ractor_t *r, struct rb_ractor_queue *rq) { ASSERT_ractor_locking(r); - while (rq->cnt > 0 && ractor_queue_at(rq, 0)->type == basket_type_deleted) { - ractor_queue_advance(rq); + while (rq->cnt > 0 && basket_type_p(ractor_queue_at(r, rq, 0), basket_type_deleted)) { + ractor_queue_advance(r, rq); } } @@ -371,7 +383,7 @@ ractor_queue_empty_p(rb_ractor_t *r, struct rb_ractor_queue *rq) ractor_queue_compact(r, rq); for (int i=0; i<rq->cnt; i++) { - if (!ractor_queue_skip_p(rq, i)) { + if (!ractor_queue_skip_p(r, rq, i)) { return false; } } @@ -380,40 +392,30 @@ ractor_queue_empty_p(rb_ractor_t *r, struct rb_ractor_queue *rq) } static bool -ractor_queue_deq(rb_ractor_t *r, struct rb_ractor_basket *basket) +ractor_queue_deq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket) { - bool found = false; - struct rb_ractor_queue *rq = &r->sync.incoming_queue; + ASSERT_ractor_locking(r); - RACTOR_LOCK(r); - { - if (!ractor_queue_empty_p(r, rq)) { - for (int i=0; i<rq->cnt; i++) { - if (!ractor_queue_skip_p(rq, i)) { - struct rb_ractor_basket *b = ractor_queue_at(rq, i); - *basket = *b; - - // remove from queue - b->type = basket_type_deleted; - ractor_queue_compact(r, rq); - found = true; - break; - } - } + for (int i=0; i<rq->cnt; i++) { + if (!ractor_queue_skip_p(r, rq, i)) { + struct rb_ractor_basket *b = ractor_queue_at(r, rq, i); + *basket = *b; + + // remove from queue + b->type.e = basket_type_deleted; + ractor_queue_compact(r, rq); + return true; } } - RACTOR_UNLOCK(r); - return found; + return false; } static void -ractor_queue_enq(rb_ractor_t *r, struct rb_ractor_basket *basket) +ractor_queue_enq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket) { ASSERT_ractor_locking(r); - struct rb_ractor_queue *rq = &r->sync.incoming_queue; - if (rq->size <= rq->cnt) { rq->baskets = realloc(rq->baskets, sizeof(struct rb_ractor_basket) * rq->size * 2); for (int i=rq->size - rq->start; i<rq->cnt; i++) { @@ -426,32 +428,32 @@ ractor_queue_enq(rb_ractor_t *r, struct rb_ractor_basket *basket) } static void -ractor_basket_clear(struct rb_ractor_basket *b) +ractor_queue_delete(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket) { - b->type = basket_type_none; - b->v = Qfalse; - b->sender = Qfalse; + basket->type.e = basket_type_deleted; } +// Ractor basket + static VALUE ractor_reset_belonging(VALUE obj); // in this file static VALUE ractor_basket_value(struct rb_ractor_basket *b) { - switch (b->type) { + switch (b->type.e) { case basket_type_ref: break; case basket_type_copy: case basket_type_move: case basket_type_will: - b->type = basket_type_ref; - b->v = ractor_reset_belonging(b->v); + b->type.e = basket_type_ref; + b->p.send.v = ractor_reset_belonging(b->p.send.v); break; default: rb_bug("unreachable"); } - return b->v; + return b->p.send.v; } static VALUE @@ -459,45 +461,68 @@ ractor_basket_accept(struct rb_ractor_basket *b) { VALUE v = ractor_basket_value(b); - if (b->exception) { + if (b->p.send.exception) { VALUE cause = v; VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor."); rb_ivar_set(err, rb_intern("@ractor"), b->sender); - ractor_basket_clear(b); rb_ec_setup_exception(NULL, err, cause); rb_exc_raise(err); } - ractor_basket_clear(b); return v; } -static void -ractor_recursive_receive_if(rb_ractor_t *r) +// Ractor synchronizations + +#if USE_RUBY_DEBUG_LOG +static const char * +wait_status_str(enum rb_ractor_wait_status wait_status) { - if (r->receiving_mutex && rb_mutex_owned_p(r->receiving_mutex)) { - rb_raise(rb_eRactorError, "can not call receive/receive_if recursively"); + switch ((int)wait_status) { + case wait_none: return "none"; + case wait_receiving: return "receiving"; + case wait_taking: return "taking"; + case wait_yielding: return "yielding"; + case wait_receiving|wait_taking: return "receiving|taking"; + case wait_receiving|wait_yielding: return "receiving|yielding"; + case wait_taking|wait_yielding: return "taking|yielding"; + case wait_receiving|wait_taking|wait_yielding: return "receiving|taking|yielding"; } + rb_bug("unreachable"); } -static VALUE -ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *r) +static const char * +wakeup_status_str(enum rb_ractor_wakeup_status wakeup_status) { - struct rb_ractor_basket basket; - - ractor_recursive_receive_if(r); - - if (ractor_queue_deq(r, &basket) == false) { - if (r->sync.incoming_port_closed) { - rb_raise(rb_eRactorClosedError, "The incoming port is already closed"); - } - else { - return Qundef; - } + switch (wakeup_status) { + case wakeup_none: return "none"; + case wakeup_by_send: return "by_send"; + case wakeup_by_yield: return "by_yield"; + case wakeup_by_take: return "by_take"; + case wakeup_by_close: return "by_close"; + case wakeup_by_interrupt: return "by_interrupt"; + case wakeup_by_retry: return "by_retry"; } + rb_bug("unreachable"); +} - return ractor_basket_accept(&basket); +static const char * +basket_type_name(enum rb_ractor_basket_type type) +{ + switch (type) { + case basket_type_none: return "none"; + case basket_type_ref: return "ref"; + case basket_type_copy: return "copy"; + case basket_type_move: return "move"; + case basket_type_will: return "will"; + case basket_type_deleted: return "deleted"; + case basket_type_reserved: return "reserved"; + case basket_type_take_basket: return "take_basket"; + } + VM_ASSERT(0); + return NULL; } +#endif // USE_RUBY_DEBUG_LOG static bool ractor_sleeping_by(const rb_ractor_t *r, enum rb_ractor_wait_status wait_status) @@ -510,13 +535,15 @@ ractor_wakeup(rb_ractor_t *r, enum rb_ractor_wait_status wait_status, enum rb_ra { ASSERT_ractor_locking(r); - // fprintf(stderr, "%s r:%p status:%s/%s wakeup_status:%s/%s\n", RUBY_FUNCTION_NAME_STRING, (void *)r, - // wait_status_str(r->sync.wait.status), wait_status_str(wait_status), - // wakeup_status_str(r->sync.wait.wakeup_status), wakeup_status_str(wakeup_status)); + RUBY_DEBUG_LOG("r:%u wait_by:%s -> wait:%s wakeup:%s", + rb_ractor_id(r), + wait_status_str(r->sync.wait.status), + wait_status_str(wait_status), + wakeup_status_str(wakeup_status)); if (ractor_sleeping_by(r, wait_status)) { r->sync.wait.wakeup_status = wakeup_status; - rb_native_cond_signal(&r->sync.cond); + rb_native_cond_broadcast(&r->sync.cond); return true; } else { @@ -552,47 +579,26 @@ ractor_sleep_interrupt(void *ptr) RACTOR_UNLOCK(r); } -#if USE_RUBY_DEBUG_LOG -static const char * -wait_status_str(enum rb_ractor_wait_status wait_status) -{ - switch ((int)wait_status) { - case wait_none: return "none"; - case wait_receiving: return "receiving"; - case wait_taking: return "taking"; - case wait_yielding: return "yielding"; - case wait_receiving|wait_taking: return "receiving|taking"; - case wait_receiving|wait_yielding: return "receiving|yielding"; - case wait_taking|wait_yielding: return "taking|yielding"; - case wait_receiving|wait_taking|wait_yielding: return "receiving|taking|yielding"; - } - rb_bug("unreachable"); -} - -static const char * -wakeup_status_str(enum rb_ractor_wakeup_status wakeup_status) -{ - switch (wakeup_status) { - case wakeup_none: return "none"; - case wakeup_by_send: return "by_send"; - case wakeup_by_yield: return "by_yield"; - case wakeup_by_take: return "by_take"; - case wakeup_by_close: return "by_close"; - case wakeup_by_interrupt: return "by_interrupt"; - case wakeup_by_retry: return "by_retry"; - } - rb_bug("unreachable"); -} -#endif // USE_RUBY_DEBUG_LOG +typedef void (*ractor_sleep_cleanup_function)(rb_ractor_t *cr, void *p); -static void -ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr) +static enum rb_ractor_wakeup_status +ractor_sleep_with_cleanup(rb_execution_context_t *ec, rb_ractor_t *cr, enum rb_ractor_wait_status wait_status, + ractor_sleep_cleanup_function cf_func, void *cf_data) { + enum rb_ractor_wakeup_status wakeup_status; VM_ASSERT(GET_RACTOR() == cr); - VM_ASSERT(cr->sync.wait.status != wait_none); + + // TODO: multi-threads + VM_ASSERT(cr->sync.wait.status == wait_none); + VM_ASSERT(wait_status != wait_none); + cr->sync.wait.status = wait_status; + cr->sync.wait.wakeup_status = wakeup_none; + // fprintf(stderr, "%s r:%p status:%s, wakeup_status:%s\n", RUBY_FUNCTION_NAME_STRING, (void *)cr, // wait_status_str(cr->sync.wait.status), wakeup_status_str(cr->sync.wait.wakeup_status)); + RUBY_DEBUG_LOG("sleep by %s", wait_status_str(wait_status)); + RACTOR_UNLOCK(cr); { rb_nogvl(ractor_sleep_wo_gvl, cr, @@ -607,119 +613,87 @@ ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr) cr->sync.wait.wakeup_status = wakeup_by_interrupt; RACTOR_UNLOCK(cr); - rb_thread_check_ints(); - RACTOR_LOCK(cr); // reachable? - } -} - -static void -ractor_register_taking(rb_ractor_t *r, rb_ractor_t *cr) -{ - VM_ASSERT(cr == GET_RACTOR()); - bool retry_try = false; - - RACTOR_LOCK(r); - { - if (ractor_sleeping_by(r, wait_yielding)) { - // already waiting for yielding. retry try_take. - retry_try = true; - } - else { - // insert cr into taking list - struct rb_ractor_waiting_list *wl = &r->sync.taking_ractors; - - for (int i=0; i<wl->cnt; i++) { - if (wl->ractors[i] == cr) { - // TODO: make it clean code. - rb_native_mutex_unlock(&r->sync.lock); - rb_raise(rb_eRuntimeError, "Already another thread of same ractor is waiting."); + { + if (cf_func) { + int state; + EC_PUSH_TAG(ec); + if ((state = EC_EXEC_TAG()) == TAG_NONE) { + rb_thread_check_ints(); } - } + EC_POP_TAG(); - if (wl->size == 0) { - wl->size = 1; - wl->ractors = malloc(sizeof(rb_ractor_t *) * wl->size); - if (wl->ractors == NULL) rb_bug("can't allocate buffer"); + if (state) { + (*cf_func)(cr, cf_data); + EC_JUMP_TAG(ec, state); + } } - else if (wl->size <= wl->cnt + 1) { - wl->size *= 2; - wl->ractors = realloc(wl->ractors, sizeof(rb_ractor_t *) * wl->size); - if (wl->ractors == NULL) rb_bug("can't re-allocate buffer"); + else { + rb_thread_check_ints(); } - wl->ractors[wl->cnt++] = cr; } + RACTOR_LOCK(cr); // reachable? } - RACTOR_UNLOCK(r); - if (retry_try) { - RACTOR_LOCK(cr); - { - if (cr->sync.wait.wakeup_status == wakeup_none) { - VM_ASSERT(cr->sync.wait.status != wait_none); + // TODO: multi-thread + wakeup_status = cr->sync.wait.wakeup_status; + cr->sync.wait.wakeup_status = wakeup_none; - cr->sync.wait.wakeup_status = wakeup_by_retry; - cr->sync.wait.status = wait_none; - } - } - RACTOR_UNLOCK(cr); - } + RUBY_DEBUG_LOG("wakeup %s", wakeup_status_str(wakeup_status)); + + return wakeup_status; } +static enum rb_ractor_wakeup_status +ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, enum rb_ractor_wait_status wait_status) +{ + return ractor_sleep_with_cleanup(ec, cr, wait_status, NULL, NULL); +} + +// Ractor.receive + static void -ractor_waiting_list_del(rb_ractor_t *r, struct rb_ractor_waiting_list *wl, rb_ractor_t *wr) +ractor_recursive_receive_if(rb_ractor_t *r) { - RACTOR_LOCK(r); - { - int pos = -1; - for (int i=0; i<wl->cnt; i++) { - if (wl->ractors[i] == wr) { - pos = i; - break; - } - } - if (pos >= 0) { // found - wl->cnt--; - for (int i=pos; i<wl->cnt; i++) { - wl->ractors[i] = wl->ractors[i+1]; - } - } + if (r->receiving_mutex && rb_mutex_owned_p(r->receiving_mutex)) { + rb_raise(rb_eRactorError, "can not call receive/receive_if recursively"); } - RACTOR_UNLOCK(r); } -static rb_ractor_t * -ractor_waiting_list_shift(rb_ractor_t *r, struct rb_ractor_waiting_list *wl) +static VALUE +ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *rq) { - ASSERT_ractor_locking(r); - VM_ASSERT(&r->sync.taking_ractors == wl); + struct rb_ractor_basket basket; + ractor_recursive_receive_if(cr); + bool received = false; + + RACTOR_LOCK_SELF(cr); + { + RUBY_DEBUG_LOG("rq->cnt:%d", rq->cnt); + received = ractor_queue_deq(cr, rq, &basket); + } + RACTOR_UNLOCK_SELF(cr); - if (wl->cnt > 0) { - rb_ractor_t *tr = wl->ractors[0]; - for (int i=1; i<wl->cnt; i++) { - wl->ractors[i-1] = wl->ractors[i]; + if (!received) { + if (cr->sync.incoming_port_closed) { + rb_raise(rb_eRactorClosedError, "The incoming port is already closed"); } - wl->cnt--; - return tr; + return Qundef; } else { - return NULL; + return ractor_basket_accept(&basket); } } static void -ractor_receive_wait(rb_execution_context_t *ec, rb_ractor_t *cr) +ractor_wait_receive(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *rq) { VM_ASSERT(cr == rb_ec_ractor_ptr(ec)); ractor_recursive_receive_if(cr); RACTOR_LOCK(cr); { - if (ractor_queue_empty_p(cr, &cr->sync.incoming_queue)) { - VM_ASSERT(cr->sync.wait.status == wait_none); - cr->sync.wait.status = wait_receiving; - cr->sync.wait.wakeup_status = wakeup_none; - ractor_sleep(ec, cr); - cr->sync.wait.wakeup_status = wakeup_none; + while (ractor_queue_empty_p(cr, rq)) { + ractor_sleep(ec, cr, wait_receiving); } } RACTOR_UNLOCK(cr); @@ -730,41 +704,25 @@ ractor_receive(rb_execution_context_t *ec, rb_ractor_t *cr) { VM_ASSERT(cr == rb_ec_ractor_ptr(ec)); VALUE v; + struct rb_ractor_queue *rq = &cr->sync.recv_queue; - while (UNDEF_P(v = ractor_try_receive(ec, cr))) { - ractor_receive_wait(ec, cr); + while (UNDEF_P(v = ractor_try_receive(ec, cr, rq))) { + ractor_wait_receive(ec, cr, rq); } return v; } #if 0 -// for debug -static const char * -basket_type_name(enum rb_ractor_basket_type type) -{ - switch (type) { -#define T(t) case basket_type_##t: return #t - T(none); - T(ref); - T(copy); - T(move); - T(will); - T(deleted); - T(reserved); - default: rb_bug("unreachable"); - } -} - static void rq_dump(struct rb_ractor_queue *rq) { bool bug = false; for (int i=0; i<rq->cnt; i++) { - struct rb_ractor_basket *b = ractor_queue_at(rq, i); + struct rb_ractor_basket *b = ractor_queue_at(NULL, rq, i); fprintf(stderr, "%d (start:%d) type:%s %p %s\n", i, rq->start, basket_type_name(b->type), (void *)b, RSTRING_PTR(RARRAY_AREF(b->v, 1))); - if (b->type == basket_type_reserved) bug = true; + if (basket_type_p(b, basket_type_reserved) bug = true; } if (bug) rb_bug("!!"); } @@ -795,22 +753,23 @@ receive_if_body(VALUE ptr) ractor_receive_if_lock(data->cr); VALUE block_result = rb_yield(data->v); + rb_ractor_t *cr = data->cr; - RACTOR_LOCK_SELF(data->cr); + RACTOR_LOCK_SELF(cr); { - struct rb_ractor_basket *b = ractor_queue_at(data->rq, data->index); - VM_ASSERT(b->type == basket_type_reserved); + struct rb_ractor_basket *b = ractor_queue_at(cr, data->rq, data->index); + VM_ASSERT(basket_type_p(b, basket_type_reserved)); data->rq->reserved_cnt--; if (RTEST(block_result)) { - b->type = basket_type_deleted; - ractor_queue_compact(data->cr, data->rq); + ractor_queue_delete(cr, data->rq, b); + ractor_queue_compact(cr, data->rq); } else { - b->type = basket_type_ref; + b->type.e = basket_type_ref; } } - RACTOR_UNLOCK_SELF(data->cr); + RACTOR_UNLOCK_SELF(cr); data->success = true; @@ -826,19 +785,20 @@ static VALUE receive_if_ensure(VALUE v) { struct receive_block_data *data = (struct receive_block_data *)v; + rb_ractor_t *cr = data->cr; if (!data->success) { - RACTOR_LOCK_SELF(data->cr); + RACTOR_LOCK_SELF(cr); { - struct rb_ractor_basket *b = ractor_queue_at(data->rq, data->index); - VM_ASSERT(b->type == basket_type_reserved); - b->type = basket_type_deleted; + struct rb_ractor_basket *b = ractor_queue_at(cr, data->rq, data->index); + VM_ASSERT(basket_type_p(b, basket_type_reserved)); + b->type.e = basket_type_deleted; data->rq->reserved_cnt--; } - RACTOR_UNLOCK_SELF(data->cr); + RACTOR_UNLOCK_SELF(cr); } - rb_mutex_unlock(data->cr->receiving_mutex); + rb_mutex_unlock(cr->receiving_mutex); return Qnil; } @@ -850,12 +810,12 @@ ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b) rb_ractor_t *cr = rb_ec_ractor_ptr(ec); unsigned int serial = (unsigned int)-1; int index = 0; - struct rb_ractor_queue *rq = &cr->sync.incoming_queue; + struct rb_ractor_queue *rq = &cr->sync.recv_queue; while (1) { VALUE v = Qundef; - ractor_receive_wait(ec, cr); + ractor_wait_receive(ec, cr, rq); RACTOR_LOCK_SELF(cr); { @@ -866,10 +826,10 @@ ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b) // check newer version for (int i=index; i<rq->cnt; i++) { - if (!ractor_queue_skip_p(rq, i)) { - struct rb_ractor_basket *b = ractor_queue_at(rq, i); + if (!ractor_queue_skip_p(cr, rq, i)) { + struct rb_ractor_basket *b = ractor_queue_at(cr, rq, i); v = ractor_basket_value(b); - b->type = basket_type_reserved; + b->type.e = basket_type_reserved; rq->reserved_cnt++; index = i; break; @@ -909,10 +869,8 @@ ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_ closed = true; } else { - ractor_queue_enq(r, b); - if (ractor_wakeup(r, wait_receiving, wakeup_by_send)) { - RUBY_DEBUG_LOG("wakeup"); - } + ractor_queue_enq(r, &r->sync.recv_queue, b); + ractor_wakeup(r, wait_receiving, wakeup_by_send); } } RACTOR_UNLOCK(r); @@ -922,430 +880,825 @@ ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_ } } +// Ractor#send + static VALUE ractor_move(VALUE obj); // in this file static VALUE ractor_copy(VALUE obj); // in this file static void -ractor_basket_setup(rb_execution_context_t *ec, struct rb_ractor_basket *basket, VALUE obj, VALUE move, bool exc, bool is_will, bool is_yield) +ractor_basket_prepare_contents(VALUE obj, VALUE move, VALUE *pobj, enum rb_ractor_basket_type *ptype) { - basket->sender = rb_ec_ractor_ptr(ec)->pub.self; - basket->exception = exc; + VALUE v; + enum rb_ractor_basket_type type; - if (is_will) { - basket->type = basket_type_will; - basket->v = obj; - } - else if (rb_ractor_shareable_p(obj)) { - basket->type = basket_type_ref; - basket->v = obj; + if (rb_ractor_shareable_p(obj)) { + type = basket_type_ref; + v = obj; } else if (!RTEST(move)) { - basket->v = ractor_copy(obj); - basket->type = basket_type_copy; + v = ractor_copy(obj); + type = basket_type_copy; } else { - basket->type = basket_type_move; - - if (is_yield) { - basket->v = obj; // call ractor_move() when yielding timing. - } - else { - basket->v = ractor_move(obj); - } + type = basket_type_move; + v = ractor_move(obj); } + + *pobj = v; + *ptype = type; +} + +static void +ractor_basket_fill_(rb_ractor_t *cr, struct rb_ractor_basket *basket, VALUE obj, bool exc) +{ + VM_ASSERT(cr == GET_RACTOR()); + + basket->sender = cr->pub.self; + basket->p.send.exception = exc; + basket->p.send.v = obj; +} + +static void +ractor_basket_fill(rb_ractor_t *cr, struct rb_ractor_basket *basket, VALUE obj, VALUE move, bool exc) +{ + VALUE v; + enum rb_ractor_basket_type type; + ractor_basket_prepare_contents(obj, move, &v, &type); + ractor_basket_fill_(cr, basket, v, exc); + basket->type.e = type; +} + +static void +ractor_basket_fill_will(rb_ractor_t *cr, struct rb_ractor_basket *basket, VALUE obj, bool exc) +{ + ractor_basket_fill_(cr, basket, obj, exc); + basket->type.e = basket_type_will; } static VALUE ractor_send(rb_execution_context_t *ec, rb_ractor_t *r, VALUE obj, VALUE move) { struct rb_ractor_basket basket; - ractor_basket_setup(ec, &basket, obj, move, false, false, false); + // TODO: Ractor local GC + ractor_basket_fill(rb_ec_ractor_ptr(ec), &basket, obj, move, false); ractor_send_basket(ec, r, &basket); return r->pub.self; } -static VALUE -ractor_try_take(rb_execution_context_t *ec, rb_ractor_t *r) +// Ractor#take + +static bool +ractor_take_has_will(rb_ractor_t *r) +{ + ASSERT_ractor_locking(r); + + return basket_type_p(&r->sync.will_basket, basket_type_will); +} + +static bool +ractor_take_will(rb_ractor_t *r, struct rb_ractor_basket *b) +{ + ASSERT_ractor_locking(r); + + if (ractor_take_has_will(r)) { + *b = r->sync.will_basket; + r->sync.will_basket.type.e = basket_type_none; + return true; + } + else { + VM_ASSERT(basket_type_p(&r->sync.will_basket, basket_type_none)); + return false; + } +} + +static bool +ractor_take_will_lock(rb_ractor_t *r, struct rb_ractor_basket *b) { - struct rb_ractor_basket basket = { - .type = basket_type_none, + ASSERT_ractor_unlocking(r); + bool taken; + + RACTOR_LOCK(r); + { + taken =ractor_take_will(r, b); + } + RACTOR_UNLOCK(r); + + return taken; +} + +static bool +ractor_register_take(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *take_basket, + bool is_take, struct rb_ractor_selector_take_config *config, bool ignore_error) +{ + struct rb_ractor_basket b = { + .type.e = basket_type_take_basket, + .sender = cr->pub.self, + .p = { + .take = { + .basket = take_basket, + .config = config, + }, + }, }; bool closed = false; RACTOR_LOCK(r); { - if (ractor_sleeping_by(r, wait_yielding)) { - MAYBE_UNUSED(bool) wakeup_result; - VM_ASSERT(r->sync.wait.yielded_basket.type != basket_type_none); - - if (r->sync.wait.yielded_basket.type == basket_type_move) { - wakeup_result = ractor_wakeup(r, wait_yielding, wakeup_by_retry); - } - else { - wakeup_result = ractor_wakeup(r, wait_yielding, wakeup_by_take); - basket = r->sync.wait.yielded_basket; - ractor_basket_clear(&r->sync.wait.yielded_basket); - } - VM_ASSERT(wakeup_result); + if (is_take && ractor_take_will(r, take_basket)) { + RUBY_DEBUG_LOG("take over a will of r:%d", rb_ractor_id(r)); + } + else if (!is_take && ractor_take_has_will(r)) { + RUBY_DEBUG_LOG("has_will"); } else if (r->sync.outgoing_port_closed) { closed = true; } + else { + RUBY_DEBUG_LOG("register in r:%d", rb_ractor_id(r)); + ractor_queue_enq(r, &r->sync.takers_queue, &b); + + if (basket_none_p(take_basket)) { + ractor_wakeup(r, wait_yielding, wakeup_by_take); + } + } } RACTOR_UNLOCK(r); - if (basket.type == basket_type_none) { - if (closed) { - rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); + if (closed) { + if (!ignore_error) rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); + return false; + } + else { + return true; + } +} + +static bool +ractor_deregister_take(rb_ractor_t *r, struct rb_ractor_basket *take_basket) +{ + struct rb_ractor_queue *ts = &r->sync.takers_queue; + bool deleted = false; + + RACTOR_LOCK(r); + { + if (r->sync.outgoing_port_closed) { + // ok } else { - return Qundef; + for (int i=0; i<ts->cnt; i++) { + struct rb_ractor_basket *b = ractor_queue_at(r, ts, i); + if (basket_type_p(b, basket_type_take_basket) && b->p.take.basket == take_basket) { + ractor_queue_delete(r, ts, b); + deleted = true; + } + } + if (deleted) { + ractor_queue_compact(r, ts); + } } } - else { - return ractor_basket_accept(&basket); - } + RACTOR_UNLOCK(r); + + return deleted; } static VALUE -ractor_yield_move_body(VALUE v) +ractor_try_take(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *take_basket) { - return ractor_move(v); + bool taken; + + RACTOR_LOCK_SELF(cr); + { + if (basket_none_p(take_basket) || basket_type_p(take_basket, basket_type_yielding)) { + taken = false; + } + else { + taken = true; + } + } + RACTOR_UNLOCK_SELF(cr); + + if (taken) { + RUBY_DEBUG_LOG("taken"); + if (basket_type_p(take_basket, basket_type_deleted)) { + VM_ASSERT(r->sync.outgoing_port_closed); + rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); + } + return ractor_basket_accept(take_basket); + } + else { + RUBY_DEBUG_LOG("not taken"); + return Qundef; + } } + +#if VM_CHECK_MODE > 0 static bool -ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_basket *basket) +ractor_check_specific_take_basket_lock(rb_ractor_t *r, struct rb_ractor_basket *tb) { - ASSERT_ractor_unlocking(cr); - VM_ASSERT(basket->type != basket_type_none); + bool ret = false; + struct rb_ractor_queue *ts = &r->sync.takers_queue; - if (cr->sync.outgoing_port_closed) { - rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); + RACTOR_LOCK(r); + { + for (int i=0; i<ts->cnt; i++) { + struct rb_ractor_basket *b = ractor_queue_at(r, ts, i); + if (basket_type_p(b, basket_type_take_basket) && b->p.take.basket == tb) { + ret = true; + break; + } + } + } + RACTOR_UNLOCK(r); + + return ret; +} +#endif + +static void +ractor_take_cleanup(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *tb) +{ + retry: + if (basket_none_p(tb)) { // not yielded yet + if (!ractor_deregister_take(r, tb)) { + // not in r's takers queue + rb_thread_sleep(0); + goto retry; + } + } + else { + VM_ASSERT(!ractor_check_specific_take_basket_lock(r, tb)); } +} +struct take_wait_take_cleanup_data { rb_ractor_t *r; + struct rb_ractor_basket *tb; +}; - retry_shift: - RACTOR_LOCK(cr); +static void +ractor_wait_take_cleanup(rb_ractor_t *cr, void *ptr) +{ + struct take_wait_take_cleanup_data *data = (struct take_wait_take_cleanup_data *)ptr; + ractor_take_cleanup(cr, data->r, data->tb); +} + +static void +ractor_wait_take(rb_execution_context_t *ec, rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *take_basket) +{ + struct take_wait_take_cleanup_data data = { + .r = r, + .tb = take_basket, + }; + + RACTOR_LOCK_SELF(cr); { - r = ractor_waiting_list_shift(cr, &cr->sync.taking_ractors); + if (basket_none_p(take_basket) || basket_type_p(take_basket, basket_type_yielding)) { + ractor_sleep_with_cleanup(ec, cr, wait_taking, ractor_wait_take_cleanup, &data); + } } - RACTOR_UNLOCK(cr); + RACTOR_UNLOCK_SELF(cr); +} - if (r) { - bool retry_shift = false; +static VALUE +ractor_take(rb_execution_context_t *ec, rb_ractor_t *r) +{ + RUBY_DEBUG_LOG("from r:%u", rb_ractor_id(r)); + VALUE v; + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - RACTOR_LOCK(r); - { - if (ractor_sleeping_by(r, wait_taking)) { - VM_ASSERT(r->sync.wait.taken_basket.type == basket_type_none); - - if (basket->type == basket_type_move) { - enum rb_ractor_wait_status prev_wait_status = r->sync.wait.status; - r->sync.wait.status = wait_moving; - - RACTOR_UNLOCK(r); - { - int state; - VALUE moved_value = rb_protect(ractor_yield_move_body, basket->v, &state); - if (state) { - r->sync.wait.status = prev_wait_status; - rb_jump_tag(state); - } - else { - basket->v = moved_value; - } - } - RACTOR_LOCK(r); + struct rb_ractor_basket take_basket = { + .type.e = basket_type_none, + .sender = 0, + }; - if (!ractor_wakeup(r, wait_moving, wakeup_by_yield)) { - // terminating? - } + ractor_register_take(cr, r, &take_basket, true, NULL, false); + + while (UNDEF_P(v = ractor_try_take(cr, r, &take_basket))) { + ractor_wait_take(ec, cr, r, &take_basket); + } + + VM_ASSERT(!basket_none_p(&take_basket)); + VM_ASSERT(!ractor_check_specific_take_basket_lock(r, &take_basket)); + + return v; +} + +// Ractor.yield + +static bool +ractor_check_take_basket(rb_ractor_t *cr, struct rb_ractor_queue *rs) +{ + ASSERT_ractor_locking(cr); + + for (int i=0; i<rs->cnt; i++) { + struct rb_ractor_basket *b = ractor_queue_at(cr, rs, i); + if (basket_type_p(b, basket_type_take_basket) && + basket_none_p(b->p.take.basket)) { + return true; + } + } + + return false; +} + +static bool +ractor_deq_take_basket(rb_ractor_t *cr, struct rb_ractor_queue *rs, struct rb_ractor_basket *b) +{ + ASSERT_ractor_unlocking(cr); + struct rb_ractor_basket *first_tb = NULL; + bool found = false; + + RACTOR_LOCK_SELF(cr); + { + while (ractor_queue_deq(cr, rs, b)) { + if (basket_type_p(b, basket_type_take_basket)) { + struct rb_ractor_basket *tb = b->p.take.basket; + + if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_yielding) == basket_type_none) { + found = true; + break; } else { - ractor_wakeup(r, wait_taking, wakeup_by_yield); + ractor_queue_enq(cr, rs, b); + if (first_tb == NULL) first_tb = tb; + struct rb_ractor_basket *head = ractor_queue_head(cr, rs); + VM_ASSERT(head != NULL); + if (basket_type_p(head, basket_type_take_basket) && head->p.take.basket == first_tb) { + break; // loop detected + } } - r->sync.wait.taken_basket = *basket; } else { - retry_shift = true; + VM_ASSERT(basket_none_p(b)); } } - RACTOR_UNLOCK(r); - if (retry_shift) { - // get candidate take-waiting ractor, but already woke up by another reason. - // retry to check another ractor. - goto retry_shift; + if (found && b->p.take.config && !b->p.take.config->oneshot) { + ractor_queue_enq(cr, rs, b); + } + } + RACTOR_UNLOCK_SELF(cr); + + return found; +} + +static bool +ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *ts, VALUE obj, VALUE move, bool exc, bool is_will) +{ + ASSERT_ractor_unlocking(cr); + + struct rb_ractor_basket b; + + if (ractor_deq_take_basket(cr, ts, &b)) { + VM_ASSERT(basket_type_p(&b, basket_type_take_basket)); + VM_ASSERT(basket_type_p(b.p.take.basket, basket_type_yielding)); + + rb_ractor_t *tr = RACTOR_PTR(b.sender); + struct rb_ractor_basket *tb = b.p.take.basket; + enum rb_ractor_basket_type type; + + RUBY_DEBUG_LOG("basket from r:%u", rb_ractor_id(tr)); + + if (is_will) { + type = basket_type_will; } else { - return true; + int state; + + // begin + EC_PUSH_TAG(ec); + if ((state = EC_EXEC_TAG()) == TAG_NONE) { + // TODO: Ractor local GC + ractor_basket_prepare_contents(obj, move, &obj, &type); + } + EC_POP_TAG(); + // rescue + if (state) { + RACTOR_LOCK_SELF(cr); + { + b.p.take.basket->type.e = basket_type_none; + ractor_queue_enq(cr, ts, &b); + } + RACTOR_UNLOCK_SELF(cr); + EC_JUMP_TAG(ec, state); + } } + + RACTOR_LOCK(tr); + { + VM_ASSERT(basket_type_p(tb, basket_type_yielding)); + // fill atomic + RUBY_DEBUG_LOG("fill %sbasket from r:%u", is_will ? "will " : "", rb_ractor_id(tr)); + ractor_basket_fill_(cr, tb, obj, exc); + if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_yielding, type) != basket_type_yielding) { + rb_bug("unreachable"); + } + ractor_wakeup(tr, wait_taking, wakeup_by_yield); + } + RACTOR_UNLOCK(tr); + + return true; } else { + RUBY_DEBUG_LOG("no take basket"); return false; } } -// select(r1, r2, r3, receive: true, yield: obj) +static void +ractor_wait_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *ts) +{ + RACTOR_LOCK_SELF(cr); + { + while (!ractor_check_take_basket(cr, ts)) { + ractor_sleep(ec, cr, wait_yielding); + } + } + RACTOR_UNLOCK_SELF(cr); +} + static VALUE -ractor_select(rb_execution_context_t *ec, const VALUE *rs, const int rs_len, VALUE yielded_value, bool move, VALUE *ret_r) +ractor_yield(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE obj, VALUE move) { - rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - VALUE crv = cr->pub.self; - VALUE ret = Qundef; - int i; - bool interrupted = false; - enum rb_ractor_wait_status wait_status = 0; - bool yield_p = !UNDEF_P(yielded_value) ? true : false; - const int alen = rs_len + (yield_p ? 1 : 0); - - struct ractor_select_action { - enum ractor_select_action_type { - ractor_select_action_take, - ractor_select_action_receive, - ractor_select_action_yield, - } type; - VALUE v; - } *actions = ALLOCA_N(struct ractor_select_action, alen); + struct rb_ractor_queue *ts = &cr->sync.takers_queue; - VM_ASSERT(cr->sync.wait.status == wait_none); - VM_ASSERT(cr->sync.wait.wakeup_status == wakeup_none); - VM_ASSERT(cr->sync.wait.taken_basket.type == basket_type_none); - VM_ASSERT(cr->sync.wait.yielded_basket.type == basket_type_none); + while (!ractor_try_yield(ec, cr, ts, obj, move, false, false)) { + ractor_wait_yield(ec, cr, ts); + } + + return Qnil; +} - // setup actions - for (i=0; i<rs_len; i++) { - VALUE v = rs[i]; +// Ractor::Selector - if (v == crv) { - actions[i].type = ractor_select_action_receive; - actions[i].v = Qnil; - wait_status |= wait_receiving; - } - else if (rb_ractor_p(v)) { - actions[i].type = ractor_select_action_take; - actions[i].v = v; - wait_status |= wait_taking; - } - else { - rb_raise(rb_eArgError, "should be a ractor object, but %"PRIsVALUE, v); - } +struct rb_ractor_selector { + rb_ractor_t *r; + struct rb_ractor_basket take_basket; + st_table *take_ractors; // rb_ractor_t * => (struct rb_ractor_selector_take_config *) +}; + +static int +ractor_selector_mark_ractors_i(st_data_t key, st_data_t value, st_data_t data) +{ + const rb_ractor_t *r = (rb_ractor_t *)key; + rb_gc_mark(r->pub.self); + return ST_CONTINUE; +} + +static void +ractor_selector_mark(void *ptr) +{ + struct rb_ractor_selector *s = ptr; + + if (s->take_ractors) { + st_foreach(s->take_ractors, ractor_selector_mark_ractors_i, 0); + } + + switch (s->take_basket.type.e) { + case basket_type_ref: + case basket_type_copy: + case basket_type_move: + case basket_type_will: + rb_gc_mark(s->take_basket.sender); + rb_gc_mark(s->take_basket.p.send.v); + break; + default: + break; } - rs = NULL; +} - restart: +static int +ractor_selector_release_i(st_data_t key, st_data_t val, st_data_t data) +{ + struct rb_ractor_selector *s = (struct rb_ractor_selector *)data; + struct rb_ractor_selector_take_config *config = (struct rb_ractor_selector_take_config *)val; - if (yield_p) { - actions[rs_len].type = ractor_select_action_yield; - actions[rs_len].v = Qundef; - wait_status |= wait_yielding; - ractor_basket_setup(ec, &cr->sync.wait.yielded_basket, yielded_value, move, false, false, true); + if (!config->closed) { + ractor_deregister_take((rb_ractor_t *)key, &s->take_basket); } + free(config); + return ST_CONTINUE; +} - // TODO: shuffle actions +static void +ractor_selector_free(void *ptr) +{ + struct rb_ractor_selector *s = ptr; + st_foreach(s->take_ractors, ractor_selector_release_i, (st_data_t)s); + st_free_table(s->take_ractors); + ruby_xfree(ptr); +} - while (1) { - RUBY_DEBUG_LOG("try actions (%s)", wait_status_str(wait_status)); - - for (i=0; i<alen; i++) { - VALUE v, rv; - switch (actions[i].type) { - case ractor_select_action_take: - rv = actions[i].v; - v = ractor_try_take(ec, RACTOR_PTR(rv)); - if (!UNDEF_P(v)) { - *ret_r = rv; - ret = v; - goto cleanup; - } - break; - case ractor_select_action_receive: - v = ractor_try_receive(ec, cr); - if (!UNDEF_P(v)) { - *ret_r = ID2SYM(rb_intern("receive")); - ret = v; - goto cleanup; - } - break; - case ractor_select_action_yield: - { - if (ractor_try_yield(ec, cr, &cr->sync.wait.yielded_basket)) { - *ret_r = ID2SYM(rb_intern("yield")); - ret = Qnil; - goto cleanup; - } - } - break; - } - } +static size_t +ractor_selector_memsize(const void *ptr) +{ + const struct rb_ractor_selector *s = ptr; + return sizeof(struct rb_ractor_selector) + + st_memsize(s->take_ractors) + + s->take_ractors->num_entries * sizeof(struct rb_ractor_selector_take_config); +} - RUBY_DEBUG_LOG("wait actions (%s)", wait_status_str(wait_status)); +static const rb_data_type_t ractor_selector_data_type = { + "ractor/selector", + { + ractor_selector_mark, + ractor_selector_free, + ractor_selector_memsize, + NULL, // update + }, + 0, 0, RUBY_TYPED_FREE_IMMEDIATELY, +}; - RACTOR_LOCK(cr); - { - VM_ASSERT(cr->sync.wait.status == wait_none); - cr->sync.wait.status = wait_status; - cr->sync.wait.wakeup_status = wakeup_none; - } - RACTOR_UNLOCK(cr); +static struct rb_ractor_selector * +RACTOR_SELECTOR_PTR(VALUE selv) +{ + VM_ASSERT(rb_typeddata_is_kind_of(selv, &ractor_selector_data_type)); - // prepare waiting - for (i=0; i<alen; i++) { - rb_ractor_t *r; - switch (actions[i].type) { - case ractor_select_action_take: - r = RACTOR_PTR(actions[i].v); - ractor_register_taking(r, cr); - break; - case ractor_select_action_yield: - case ractor_select_action_receive: - break; - } - } + return (struct rb_ractor_selector *)DATA_PTR(selv); +} - // wait - RACTOR_LOCK(cr); - { - if (cr->sync.wait.wakeup_status == wakeup_none) { - for (i=0; i<alen; i++) { - rb_ractor_t *r; - - switch (actions[i].type) { - case ractor_select_action_take: - r = RACTOR_PTR(actions[i].v); - if (ractor_sleeping_by(r, wait_yielding)) { - RUBY_DEBUG_LOG("wakeup_none, but r:%u is waiting for yielding", r->pub.id); - cr->sync.wait.wakeup_status = wakeup_by_retry; - goto skip_sleep; - } - break; - case ractor_select_action_receive: - if (cr->sync.incoming_queue.cnt > 0) { - RUBY_DEBUG_LOG("wakeup_none, but incoming_queue has %u messages", cr->sync.incoming_queue.cnt); - cr->sync.wait.wakeup_status = wakeup_by_retry; - goto skip_sleep; - } - break; - case ractor_select_action_yield: - if (cr->sync.taking_ractors.cnt > 0) { - RUBY_DEBUG_LOG("wakeup_none, but %u taking_ractors are waiting", cr->sync.taking_ractors.cnt); - cr->sync.wait.wakeup_status = wakeup_by_retry; - goto skip_sleep; - } - else if (cr->sync.outgoing_port_closed) { - cr->sync.wait.wakeup_status = wakeup_by_close; - goto skip_sleep; - } - break; - } - } +// Ractor::Selector.new - RUBY_DEBUG_LOG("sleep %s", wait_status_str(cr->sync.wait.status)); - ractor_sleep(ec, cr); - RUBY_DEBUG_LOG("awaken %s", wakeup_status_str(cr->sync.wait.wakeup_status)); - } - else { - skip_sleep: - RUBY_DEBUG_LOG("no need to sleep %s->%s", - wait_status_str(cr->sync.wait.status), - wakeup_status_str(cr->sync.wait.wakeup_status)); - cr->sync.wait.status = wait_none; - } - } - RACTOR_UNLOCK(cr); +static VALUE +ractor_selector_create(VALUE crv) +{ + struct rb_ractor_selector *s; + VALUE selv = TypedData_Make_Struct(rb_cRactorSelector, struct rb_ractor_selector, &ractor_selector_data_type, s); + s->take_basket.type.e = basket_type_reserved; + s->take_ractors = st_init_numtable(); // ractor (ptr) -> take_config + return selv; +} - // cleanup waiting - for (i=0; i<alen; i++) { - rb_ractor_t *r; - switch (actions[i].type) { - case ractor_select_action_take: - r = RACTOR_PTR(actions[i].v); - ractor_waiting_list_del(r, &r->sync.taking_ractors, cr); - break; - case ractor_select_action_receive: - case ractor_select_action_yield: - break; - } - } +// Ractor::Selector#add(r) - // check results - enum rb_ractor_wakeup_status wakeup_status = cr->sync.wait.wakeup_status; - cr->sync.wait.wakeup_status = wakeup_none; +static VALUE +ractor_selector_add(rb_execution_context_t *ec, VALUE selv, VALUE rv) +{ + if (!rb_ractor_p(rv)) { + rb_raise(rb_eArgError, "Not a ractor object"); + } - switch (wakeup_status) { - case wakeup_none: - // OK. something happens. - // retry loop. - break; - case wakeup_by_retry: - // Retry request. - break; - case wakeup_by_send: - // OK. - // retry loop and try_receive will succss. - break; - case wakeup_by_yield: - // take was succeeded! - // cr.wait.taken_basket contains passed block - VM_ASSERT(cr->sync.wait.taken_basket.type != basket_type_none); - *ret_r = cr->sync.wait.taken_basket.sender; - VM_ASSERT(rb_ractor_p(*ret_r)); - ret = ractor_basket_accept(&cr->sync.wait.taken_basket); - goto cleanup; - case wakeup_by_take: - *ret_r = ID2SYM(rb_intern("yield")); - ret = Qnil; - goto cleanup; - case wakeup_by_close: - // OK. - // retry loop and will get CloseError. - break; - case wakeup_by_interrupt: - ret = Qundef; - interrupted = true; - goto cleanup; - } + rb_ractor_t *r = RACTOR_PTR(rv); + struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv); + + if (st_lookup(s->take_ractors, (st_data_t)r, NULL)) { + rb_raise(rb_eArgError, "already added"); } - cleanup: - RUBY_DEBUG_LOG("cleanup actions (%s)", wait_status_str(wait_status)); + struct rb_ractor_selector_take_config *config = malloc(sizeof(struct rb_ractor_selector_take_config)); + VM_ASSERT(config != NULL); + config->closed = false; + config->oneshot = false; - if (cr->sync.wait.yielded_basket.type != basket_type_none) { - ractor_basket_clear(&cr->sync.wait.yielded_basket); + if (ractor_register_take(rb_ec_ractor_ptr(ec), r, &s->take_basket, false, config, true)) { + st_insert(s->take_ractors, (st_data_t)r, (st_data_t)config); } - VM_ASSERT(cr->sync.wait.status == wait_none); - VM_ASSERT(cr->sync.wait.wakeup_status == wakeup_none); - VM_ASSERT(cr->sync.wait.taken_basket.type == basket_type_none); - VM_ASSERT(cr->sync.wait.yielded_basket.type == basket_type_none); + return rv; +} - if (interrupted) { - rb_vm_check_ints_blocking(ec); - interrupted = false; - goto restart; +// Ractor::Selector#remove(r) + +static VALUE +ractor_selector_remove(rb_execution_context_t *ec, VALUE selv, VALUE rv) +{ + if (!rb_ractor_p(rv)) { + rb_raise(rb_eArgError, "Not a ractor object"); } - VM_ASSERT(!UNDEF_P(ret)); - return ret; + rb_ractor_t *r = RACTOR_PTR(rv); + struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv); + + RUBY_DEBUG_LOG("r:%u", rb_ractor_id(r)); + + if (!st_lookup(s->take_ractors, (st_data_t)r, NULL)) { + rb_raise(rb_eArgError, "not added yet"); + } + + ractor_deregister_take(r, &s->take_basket); + struct rb_ractor_selector_take_config *config; + st_delete(s->take_ractors, (st_data_t *)&r, (st_data_t *)&config); + free(config); + + return rv; +} + +// Ractor::Selector#clear + +struct ractor_selector_clear_data { + VALUE selv; + rb_execution_context_t *ec; +}; + +static int +ractor_selector_clear_i(st_data_t key, st_data_t val, st_data_t data) +{ + struct ractor_selector_clear_data *ptr = (struct ractor_selector_clear_data *)data; + rb_ractor_t *r = (rb_ractor_t *)key; + ractor_selector_remove(ptr->ec, ptr->selv, r->pub.self); + return ST_CONTINUE; } static VALUE -ractor_yield(rb_execution_context_t *ec, rb_ractor_t *r, VALUE obj, VALUE move) +ractor_selector_clear(rb_execution_context_t *ec, VALUE selv) { - VALUE ret_r; - ractor_select(ec, NULL, 0, obj, RTEST(move) ? true : false, &ret_r); - return Qnil; + struct ractor_selector_clear_data data = { + .selv = selv, + .ec = ec, + }; + struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv); + + st_foreach(s->take_ractors, ractor_selector_clear_i, (st_data_t)&data); + st_clear(s->take_ractors); + return selv; +} + +static int +ractor_selector_wait_i(st_data_t key, st_data_t val, st_data_t dat) +{ + rb_ractor_t *r = (rb_ractor_t *)key; + struct rb_ractor_basket *tb = (struct rb_ractor_basket *)dat; + int ret; + + if (!basket_none_p(tb)) { + RUBY_DEBUG_LOG("already taken:%s", basket_type_name(tb->type)); + return ST_STOP; + } + + RACTOR_LOCK(r); + { + if (basket_type_p(&r->sync.will_basket, basket_type_will)) { + RUBY_DEBUG_LOG("r:%u has will", rb_ractor_id(r)); + + if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_will) == basket_type_none) { + ractor_take_will(r, tb); + ret = ST_STOP; + } + else { + RUBY_DEBUG_LOG("has will, but already taken"); + ret = ST_CONTINUE; + } + } + else if (r->sync.outgoing_port_closed) { + RUBY_DEBUG_LOG("r:%u is closed", rb_ractor_id(r)); + + if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_deleted) == basket_type_none) { + tb->sender = r->pub.self; + ret = ST_STOP; + } + else { + RUBY_DEBUG_LOG("closed, but already taken"); + ret = ST_CONTINUE; + } + } + else { + RUBY_DEBUG_LOG("wakeup r:%u", rb_ractor_id(r)); + ractor_wakeup(r, wait_yielding, wakeup_by_take); + ret = ST_CONTINUE; + } + } + RACTOR_UNLOCK(r); + + return ret; +} + +// Ractor::Selector#wait + +static void +ractor_selector_wait_cleaup(rb_ractor_t *cr, void *ptr) +{ + struct rb_ractor_basket *tb = (struct rb_ractor_basket *)ptr; + + RACTOR_LOCK_SELF(cr); + { + while (basket_type_p(tb, basket_type_yielding)) rb_thread_sleep(0); + // if tb->type is not none, taking is succeeded, but interruption ignore it unfortunately. + tb->type.e = basket_type_reserved; + } + RACTOR_UNLOCK_SELF(cr); } static VALUE -ractor_take(rb_execution_context_t *ec, rb_ractor_t *r) +ractor_selector_wait(rb_execution_context_t *ec, VALUE selv, VALUE do_receivev, VALUE do_yieldv, VALUE yield_value, VALUE move) { - VALUE ret_r; - VALUE v = ractor_select(ec, &r->pub.self, 1, Qundef, false, &ret_r); - return v; + struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv); + struct rb_ractor_basket *tb = &s->take_basket; + struct rb_ractor_basket taken_basket; + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); + bool do_receive = !!RTEST(do_receivev); + bool do_yield = !!RTEST(do_yieldv); + VALUE ret_v, ret_r; + enum rb_ractor_wait_status wait_status; + struct rb_ractor_queue *rq = &cr->sync.recv_queue; + struct rb_ractor_queue *ts = &cr->sync.takers_queue; + + RUBY_DEBUG_LOG("start"); + + retry: + RUBY_DEBUG_LOG("takers:%ld", s->take_ractors->num_entries); + + // setup wait_status + wait_status = wait_none; + if (s->take_ractors->num_entries > 0) wait_status |= wait_taking; + if (do_receive) wait_status |= wait_receiving; + if (do_yield) wait_status |= wait_yielding; + + RUBY_DEBUG_LOG("wait:%s", wait_status_str(wait_status)); + + if (wait_status == wait_none) { + rb_raise(rb_eRactorError, "no taking ractors"); + } + + // check recv_queue + if (do_receive && (ret_v = ractor_try_receive(ec, cr, rq)) != Qundef) { + ret_r = ID2SYM(rb_intern("receive")); + goto success; + } + + // check takers + if (do_yield && ractor_try_yield(ec, cr, ts, yield_value, move, false, false)) { + ret_v = Qnil; + ret_r = ID2SYM(rb_intern("yield")); + goto success; + } + + // check take_basket + VM_ASSERT(basket_type_p(&s->take_basket, basket_type_reserved)); + s->take_basket.type.e = basket_type_none; + // kick all take target ractors + st_foreach(s->take_ractors, ractor_selector_wait_i, (st_data_t)tb); + + RACTOR_LOCK_SELF(cr); + { + retry_waiting: + while (1) { + if (!basket_none_p(tb)) { + RUBY_DEBUG_LOG("taken:%s from r:%u", basket_type_name(tb->type), + rb_ractor_id(RACTOR_PTR(tb->sender))); + break; + } + if (do_receive && !ractor_queue_empty_p(cr, rq)) { + RUBY_DEBUG_LOG("can receive (%d)", rq->cnt); + break; + } + if (do_yield && ractor_check_take_basket(cr, ts)) { + RUBY_DEBUG_LOG("can yield"); + break; + } + + ractor_sleep_with_cleanup(ec, cr, wait_status, ractor_selector_wait_cleaup, tb); + } + + taken_basket = *tb; + + // ensure + // tb->type.e = basket_type_reserved # do it atomic in the following code + if (taken_basket.type.e == basket_type_yielding || + RUBY_ATOMIC_CAS(tb->type.atomic, taken_basket.type.e, basket_type_reserved) != taken_basket.type.e) { + + if (basket_type_p(tb, basket_type_yielding)) { + RACTOR_UNLOCK_SELF(cr); + { + rb_thread_sleep(0); + } + RACTOR_LOCK_SELF(cr); + } + goto retry_waiting; + } + } + RACTOR_UNLOCK_SELF(cr); + + // check the taken resutl + switch (taken_basket.type.e) { + case basket_type_none: + VM_ASSERT(do_receive || do_yield); + goto retry; + case basket_type_yielding: + rb_bug("unreachable"); + case basket_type_deleted: { + rb_ractor_t *r = RACTOR_PTR(taken_basket.sender); + if (!ractor_take_will_lock(r, &taken_basket)) { + // rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); + // remove and retry wait + ractor_selector_remove(ec, selv, taken_basket.sender); + goto retry; + } + break; + } + default: + break; + } + + ret_v = ractor_basket_accept(&taken_basket); + ret_r = taken_basket.sender; + success: + return rb_ary_new_from_args(2, ret_r, ret_v); } +// Ractor#close_incoming + static VALUE ractor_close_incoming(rb_execution_context_t *ec, rb_ractor_t *r) { @@ -1357,7 +1710,7 @@ ractor_close_incoming(rb_execution_context_t *ec, rb_ractor_t *r) prev = Qfalse; r->sync.incoming_port_closed = true; if (ractor_wakeup(r, wait_receiving, wakeup_by_close)) { - VM_ASSERT(r->sync.incoming_queue.cnt == 0); + VM_ASSERT(ractor_queue_empty_p(r, &r->sync.recv_queue)); RUBY_DEBUG_LOG("cancel receiving"); } } @@ -1369,6 +1722,8 @@ ractor_close_incoming(rb_execution_context_t *ec, rb_ractor_t *r) return prev; } +// Ractor#close_outgoing + static VALUE ractor_close_outgoing(rb_execution_context_t *ec, rb_ractor_t *r) { @@ -1376,27 +1731,49 @@ ractor_close_outgoing(rb_execution_context_t *ec, rb_ractor_t *r) RACTOR_LOCK(r); { + struct rb_ractor_queue *ts = &r->sync.takers_queue; + rb_ractor_t *tr; + struct rb_ractor_basket b; + if (!r->sync.outgoing_port_closed) { prev = Qfalse; r->sync.outgoing_port_closed = true; } else { + VM_ASSERT(ractor_queue_empty_p(r, ts)); prev = Qtrue; } // wakeup all taking ractors - rb_ractor_t *taking_ractor; - while ((taking_ractor = ractor_waiting_list_shift(r, &r->sync.taking_ractors)) != NULL) { - RACTOR_LOCK(taking_ractor); - ractor_wakeup(taking_ractor, wait_taking, wakeup_by_close); - RACTOR_UNLOCK(taking_ractor); + while (ractor_queue_deq(r, ts, &b)) { + if (basket_type_p(&b, basket_type_take_basket)) { + tr = RACTOR_PTR(b.sender); + struct rb_ractor_basket *tb = b.p.take.basket; + + if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_yielding) == basket_type_none) { + b.p.take.basket->sender = r->pub.self; + if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_yielding, basket_type_deleted) != basket_type_yielding) { + rb_bug("unreachable"); + } + } + + if (b.p.take.config) { + b.p.take.config->closed = true; + } + + // TODO: deadlock-able? + RACTOR_LOCK(tr); + { + ractor_wakeup(tr, wait_taking, wakeup_by_close); + } + RACTOR_UNLOCK(tr); + } } // raising yielding Ractor - if (!r->yield_atexit && - ractor_wakeup(r, wait_yielding, wakeup_by_close)) { - RUBY_DEBUG_LOG("cancel yielding"); - } + ractor_wakeup(r, wait_yielding, wakeup_by_close); + + VM_ASSERT(ractor_queue_empty_p(r, ts)); } RACTOR_UNLOCK(r); return prev; @@ -1558,7 +1935,8 @@ rb_ractor_living_threads_init(rb_ractor_t *r) static void ractor_init(rb_ractor_t *r, VALUE name, VALUE loc) { - ractor_queue_setup(&r->sync.incoming_queue); + ractor_queue_setup(&r->sync.recv_queue); + ractor_queue_setup(&r->sync.takers_queue); rb_native_mutex_initialize(&r->sync.lock); rb_native_cond_initialize(&r->sync.cond); rb_native_cond_initialize(&r->barrier_wait_cond); @@ -1624,28 +2002,23 @@ ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool e ASSERT_ractor_unlocking(cr); - struct rb_ractor_basket basket; - ractor_basket_setup(ec, &basket, v, Qfalse, exc, true, true /* this flag is ignored because move is Qfalse */); + struct rb_ractor_queue *ts = &cr->sync.takers_queue; retry: - if (ractor_try_yield(ec, cr, &basket)) { + if (ractor_try_yield(ec, cr, ts, v, Qfalse, exc, true)) { // OK. } else { bool retry = false; RACTOR_LOCK(cr); { - if (cr->sync.taking_ractors.cnt == 0) { - cr->sync.wait.yielded_basket = basket; - + if (!ractor_check_take_basket(cr, ts)) { VM_ASSERT(cr->sync.wait.status == wait_none); - cr->sync.wait.status = wait_yielding; - cr->sync.wait.wakeup_status = wakeup_none; - - VM_ASSERT(cr->yield_atexit == false); - cr->yield_atexit = true; + RUBY_DEBUG_LOG("leave a will"); + ractor_basket_fill_will(cr, &cr->sync.will_basket, v, exc); } else { + RUBY_DEBUG_LOG("rare timing!"); retry = true; // another ractor is waiting for the yield. } } @@ -1656,6 +2029,20 @@ ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool e } void +rb_ractor_atexit(rb_execution_context_t *ec, VALUE result) +{ + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); + ractor_yield_atexit(ec, cr, result, false); +} + +void +rb_ractor_atexit_exception(rb_execution_context_t *ec) +{ + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); + ractor_yield_atexit(ec, cr, ec->errinfo, true); +} + +void rb_ractor_teardown(rb_execution_context_t *ec) { rb_ractor_t *cr = rb_ec_ractor_ptr(ec); @@ -1672,20 +2059,6 @@ rb_ractor_teardown(rb_execution_context_t *ec) } void -rb_ractor_atexit(rb_execution_context_t *ec, VALUE result) -{ - rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - ractor_yield_atexit(ec, cr, result, false); -} - -void -rb_ractor_atexit_exception(rb_execution_context_t *ec) -{ - rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - ractor_yield_atexit(ec, cr, ec->errinfo, true); -} - -void rb_ractor_receive_parameters(rb_execution_context_t *ec, rb_ractor_t *r, int len, VALUE *ptr) { for (int i=0; i<len; i++) { @@ -2106,6 +2479,9 @@ Init_Ractor(void) rb_define_method(rb_cRactorMovedObject, "equal?", ractor_moved_missing, -1); rb_define_method(rb_cRactorMovedObject, "instance_eval", ractor_moved_missing, -1); rb_define_method(rb_cRactorMovedObject, "instance_exec", ractor_moved_missing, -1); + + rb_cRactorSelector = rb_define_class_under(rb_cRactor, "Selector", rb_cObject); + rb_undef_alloc_func(rb_cRactorSelector); } void |