diff options
author | Koichi Sasada <ko1@atdot.net> | 2020-12-08 00:42:20 +0900 |
---|---|---|
committer | Koichi Sasada <ko1@atdot.net> | 2020-12-09 01:40:38 +0900 |
commit | ee194af2aa170c0cb1bfd5fed4e84259a8150ece (patch) | |
tree | a65ce548fade86f8daddd6af525f4c665ca2f97d | |
parent | 2749123e213007e20d2f5e65ce941ff53feb4c76 (diff) |
re-layout rb_ractor_t
separate synchronization data and ractor local data.
Notes
Notes:
Merged: https://github.com/ruby/ruby/pull/3858
-rw-r--r-- | ractor.c | 224 | ||||
-rw-r--r-- | ractor_core.h | 19 | ||||
-rw-r--r-- | vm_sync.c | 2 |
3 files changed, 123 insertions, 122 deletions
@@ -37,7 +37,7 @@ ASSERT_ractor_unlocking(rb_ractor_t *r) { #if RACTOR_CHECK_MODE > 0 // GET_EC is NULL in an MJIT worker - if (GET_EC() != NULL && r->locked_by == GET_RACTOR()->self) { + if (GET_EC() != NULL && r->sync.locked_by == GET_RACTOR()->self) { rb_bug("recursive ractor locking"); } #endif @@ -48,8 +48,8 @@ ASSERT_ractor_locking(rb_ractor_t *r) { #if RACTOR_CHECK_MODE > 0 // GET_EC is NULL in an MJIT worker - if (GET_EC() != NULL && r->locked_by != GET_RACTOR()->self) { - rp(r->locked_by); + if (GET_EC() != NULL && r->sync.locked_by != GET_RACTOR()->self) { + rp(r->sync.locked_by); rb_bug("ractor lock is not acquired."); } #endif @@ -61,11 +61,11 @@ ractor_lock(rb_ractor_t *r, const char *file, int line) RUBY_DEBUG_LOG2(file, line, "locking r:%u%s", r->id, GET_RACTOR() == r ? " (self)" : ""); ASSERT_ractor_unlocking(r); - rb_native_mutex_lock(&r->lock); + rb_native_mutex_lock(&r->sync.lock); #if RACTOR_CHECK_MODE > 0 if (GET_EC() != NULL) { // GET_EC is NULL in an MJIT worker - r->locked_by = GET_RACTOR()->self; + r->sync.locked_by = GET_RACTOR()->self; } #endif @@ -76,7 +76,7 @@ static void ractor_lock_self(rb_ractor_t *cr, const char *file, int line) { VM_ASSERT(cr == GET_RACTOR()); - VM_ASSERT(cr->locked_by != cr->self); + VM_ASSERT(cr->sync.locked_by != cr->self); ractor_lock(cr, file, line); } @@ -85,9 +85,9 @@ ractor_unlock(rb_ractor_t *r, const char *file, int line) { ASSERT_ractor_locking(r); #if RACTOR_CHECK_MODE > 0 - r->locked_by = Qnil; + r->sync.locked_by = Qnil; #endif - rb_native_mutex_unlock(&r->lock); + rb_native_mutex_unlock(&r->sync.lock); RUBY_DEBUG_LOG2(file, line, "r:%u%s", r->id, GET_RACTOR() == r ? " (self)" : ""); } @@ -96,7 +96,7 @@ static void ractor_unlock_self(rb_ractor_t *cr, const char *file, int line) { VM_ASSERT(cr == GET_RACTOR()); - VM_ASSERT(cr->locked_by == cr->self); + VM_ASSERT(cr->sync.locked_by == cr->self); ractor_unlock(cr, file, line); } @@ -109,13 +109,13 @@ static void ractor_cond_wait(rb_ractor_t *r) { #if RACTOR_CHECK_MODE > 0 - VALUE locked_by = r->locked_by; - r->locked_by = Qnil; + VALUE locked_by = r->sync.locked_by; + r->sync.locked_by = Qnil; #endif - rb_native_cond_wait(&r->wait.cond, &r->lock); + rb_native_cond_wait(&r->sync.cond, &r->sync.lock); #if RACTOR_CHECK_MODE > 0 - r->locked_by = locked_by; + r->sync.locked_by = locked_by; #endif } @@ -186,11 +186,11 @@ ractor_mark(void *ptr) { rb_ractor_t *r = (rb_ractor_t *)ptr; - ractor_queue_mark(&r->incoming_queue); - rb_gc_mark(r->wait.taken_basket.v); - rb_gc_mark(r->wait.taken_basket.sender); - rb_gc_mark(r->wait.yielded_basket.v); - rb_gc_mark(r->wait.yielded_basket.sender); + ractor_queue_mark(&r->sync.incoming_queue); + rb_gc_mark(r->sync.wait.taken_basket.v); + rb_gc_mark(r->sync.wait.taken_basket.sender); + rb_gc_mark(r->sync.wait.yielded_basket.v); + rb_gc_mark(r->sync.wait.yielded_basket.sender); rb_gc_mark(r->loc); rb_gc_mark(r->name); rb_gc_mark(r->r_stdin); @@ -224,10 +224,10 @@ static void ractor_free(void *ptr) { rb_ractor_t *r = (rb_ractor_t *)ptr; - rb_native_mutex_destroy(&r->lock); - rb_native_cond_destroy(&r->wait.cond); - ractor_queue_free(&r->incoming_queue); - ractor_waiting_list_free(&r->taking_ractors); + rb_native_mutex_destroy(&r->sync.lock); + rb_native_cond_destroy(&r->sync.cond); + ractor_queue_free(&r->sync.incoming_queue); + ractor_waiting_list_free(&r->sync.taking_ractors); ractor_local_storage_free(r); ruby_xfree(r); } @@ -251,8 +251,8 @@ ractor_memsize(const void *ptr) // TODO return sizeof(rb_ractor_t) + - ractor_queue_memsize(&r->incoming_queue) + - ractor_waiting_list_memsize(&r->taking_ractors); + ractor_queue_memsize(&r->sync.incoming_queue) + + ractor_waiting_list_memsize(&r->sync.taking_ractors); } static const rb_data_type_t ractor_data_type = { @@ -407,11 +407,11 @@ ractor_basket_accept(struct rb_ractor_basket *b) static VALUE ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *r) { - struct rb_ractor_queue *rq = &r->incoming_queue; + struct rb_ractor_queue *rq = &r->sync.incoming_queue; struct rb_ractor_basket basket; if (ractor_queue_deq(r, rq, &basket) == false) { - if (r->incoming_port_closed) { + if (r->sync.incoming_port_closed) { rb_raise(rb_eRactorClosedError, "The incoming port is already closed"); } else { @@ -427,11 +427,11 @@ ractor_sleep_wo_gvl(void *ptr) { rb_ractor_t *cr = ptr; RACTOR_LOCK_SELF(cr); - VM_ASSERT(cr->wait.status != wait_none); - if (cr->wait.wakeup_status == wakeup_none) { + VM_ASSERT(cr->sync.wait.status != wait_none); + if (cr->sync.wait.wakeup_status == wakeup_none) { ractor_cond_wait(cr); } - cr->wait.status = wait_none; + cr->sync.wait.status = wait_none; RACTOR_UNLOCK_SELF(cr); return NULL; } @@ -442,9 +442,9 @@ ractor_sleep_interrupt(void *ptr) rb_ractor_t *r = ptr; RACTOR_LOCK(r); - if (r->wait.wakeup_status == wakeup_none) { - r->wait.wakeup_status = wakeup_by_interrupt; - rb_native_cond_signal(&r->wait.cond); + if (r->sync.wait.wakeup_status == wakeup_none) { + r->sync.wait.wakeup_status = wakeup_by_interrupt; + rb_native_cond_signal(&r->sync.cond); } RACTOR_UNLOCK(r); } @@ -486,9 +486,9 @@ static void ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr) { VM_ASSERT(GET_RACTOR() == cr); - VM_ASSERT(cr->wait.status != wait_none); + VM_ASSERT(cr->sync.wait.status != wait_none); // fprintf(stderr, "%s r:%p status:%s, wakeup_status:%s\n", __func__, cr, - // wait_status_str(cr->wait.status), wakeup_status_str(cr->wait.wakeup_status)); + // wait_status_str(cr->sync.wait.status), wakeup_status_str(cr->sync.wait.wakeup_status)); RACTOR_UNLOCK(cr); rb_nogvl(ractor_sleep_wo_gvl, cr, @@ -500,7 +500,7 @@ ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr) static bool ractor_sleeping_by(const rb_ractor_t *r, enum ractor_wait_status wait_status) { - return (r->wait.status & wait_status) && r->wait.wakeup_status == wakeup_none; + return (r->sync.wait.status & wait_status) && r->sync.wait.wakeup_status == wakeup_none; } static bool @@ -509,12 +509,12 @@ ractor_wakeup(rb_ractor_t *r, enum ractor_wait_status wait_status, enum ractor_w ASSERT_ractor_locking(r); // fprintf(stderr, "%s r:%p status:%s/%s wakeup_status:%s/%s\n", __func__, r, - // wait_status_str(r->wait.status), wait_status_str(wait_status), - // wakeup_status_str(r->wait.wakeup_status), wakeup_status_str(wakeup_status)); + // wait_status_str(r->sync.wait.status), wait_status_str(wait_status), + // wakeup_status_str(r->sync.wait.wakeup_status), wakeup_status_str(wakeup_status)); if (ractor_sleeping_by(r, wait_status)) { - r->wait.wakeup_status = wakeup_status; - rb_native_cond_signal(&r->wait.cond); + r->sync.wait.wakeup_status = wakeup_status; + rb_native_cond_signal(&r->sync.cond); return true; } else { @@ -536,12 +536,12 @@ ractor_register_taking(rb_ractor_t *r, rb_ractor_t *cr) } else { // insert cr into taking list - struct rb_ractor_waiting_list *wl = &r->taking_ractors; + struct rb_ractor_waiting_list *wl = &r->sync.taking_ractors; for (int i=0; i<wl->cnt; i++) { if (wl->ractors[i] == cr) { // TODO: make it clean code. - rb_native_mutex_unlock(&r->lock); + rb_native_mutex_unlock(&r->sync.lock); rb_raise(rb_eRuntimeError, "Already another thread of same ractor is waiting."); } } @@ -564,11 +564,11 @@ ractor_register_taking(rb_ractor_t *r, rb_ractor_t *cr) if (retry_try) { RACTOR_LOCK(cr); { - if (cr->wait.wakeup_status == wakeup_none) { - VM_ASSERT(cr->wait.status != wait_none); + if (cr->sync.wait.wakeup_status == wakeup_none) { + VM_ASSERT(cr->sync.wait.status != wait_none); - cr->wait.wakeup_status = wakeup_by_retry; - cr->wait.status = wait_none; + cr->sync.wait.wakeup_status = wakeup_by_retry; + cr->sync.wait.status = wait_none; } } RACTOR_UNLOCK(cr); @@ -601,7 +601,7 @@ static rb_ractor_t * ractor_waiting_list_shift(rb_ractor_t *r, struct rb_ractor_waiting_list *wl) { ASSERT_ractor_locking(r); - VM_ASSERT(&r->taking_ractors == wl); + VM_ASSERT(&r->sync.taking_ractors == wl); if (wl->cnt > 0) { rb_ractor_t *tr = wl->ractors[0]; @@ -625,14 +625,14 @@ ractor_receive(rb_execution_context_t *ec, rb_ractor_t *r) while ((v = ractor_try_receive(ec, r)) == Qundef) { RACTOR_LOCK(r); { - if (ractor_queue_empty_p(r, &r->incoming_queue)) { - VM_ASSERT(r->wait.status == wait_none); - r->wait.status = wait_receiving; - r->wait.wakeup_status = wakeup_none; + if (ractor_queue_empty_p(r, &r->sync.incoming_queue)) { + VM_ASSERT(r->sync.wait.status == wait_none); + r->sync.wait.status = wait_receiving; + r->sync.wait.wakeup_status = wakeup_none; ractor_sleep(ec, r); - r->wait.wakeup_status = wakeup_none; + r->sync.wait.wakeup_status = wakeup_none; } } RACTOR_UNLOCK(r); @@ -645,11 +645,11 @@ static void ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_basket *b) { bool closed = false; - struct rb_ractor_queue *rq = &r->incoming_queue; + struct rb_ractor_queue *rq = &r->sync.incoming_queue; RACTOR_LOCK(r); { - if (r->incoming_port_closed) { + if (r->sync.incoming_port_closed) { closed = true; } else { @@ -713,11 +713,11 @@ ractor_try_take(rb_execution_context_t *ec, rb_ractor_t *r) RACTOR_LOCK(r); { if (ractor_wakeup(r, wait_yielding, wakeup_by_take)) { - VM_ASSERT(r->wait.yielded_basket.type != basket_type_none); - basket = r->wait.yielded_basket; - ractor_basket_clear(&r->wait.yielded_basket); + VM_ASSERT(r->sync.wait.yielded_basket.type != basket_type_none); + basket = r->sync.wait.yielded_basket; + ractor_basket_clear(&r->sync.wait.yielded_basket); } - else if (r->outgoing_port_closed) { + else if (r->sync.outgoing_port_closed) { closed = true; } else { @@ -745,7 +745,7 @@ ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_b ASSERT_ractor_unlocking(cr); VM_ASSERT(basket->type != basket_type_none); - if (cr->outgoing_port_closed) { + if (cr->sync.outgoing_port_closed) { rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); } @@ -754,7 +754,7 @@ ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_b retry_shift: RACTOR_LOCK(cr); { - r = ractor_waiting_list_shift(cr, &cr->taking_ractors); + r = ractor_waiting_list_shift(cr, &cr->sync.taking_ractors); } RACTOR_UNLOCK(cr); @@ -764,8 +764,8 @@ ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_b RACTOR_LOCK(r); { if (ractor_wakeup(r, wait_taking, wakeup_by_yield)) { - VM_ASSERT(r->wait.taken_basket.type == basket_type_none); - r->wait.taken_basket = *basket; + VM_ASSERT(r->sync.wait.taken_basket.type == basket_type_none); + r->sync.wait.taken_basket = *basket; } else { retry_shift = true; @@ -809,10 +809,10 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield VALUE v; } *actions = ALLOCA_N(struct ractor_select_action, alen + (yield_p ? 1 : 0)); - VM_ASSERT(cr->wait.status == wait_none); - VM_ASSERT(cr->wait.wakeup_status == wakeup_none); - VM_ASSERT(cr->wait.taken_basket.type == basket_type_none); - VM_ASSERT(cr->wait.yielded_basket.type == basket_type_none); + VM_ASSERT(cr->sync.wait.status == wait_none); + VM_ASSERT(cr->sync.wait.wakeup_status == wakeup_none); + VM_ASSERT(cr->sync.wait.taken_basket.type == basket_type_none); + VM_ASSERT(cr->sync.wait.yielded_basket.type == basket_type_none); // setup actions for (i=0; i<alen; i++) { @@ -842,7 +842,7 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield wait_status |= wait_yielding; alen++; - ractor_basket_setup(ec, &cr->wait.yielded_basket, yielded_value, move, false, false); + ractor_basket_setup(ec, &cr->sync.wait.yielded_basket, yielded_value, move, false, false); } // TODO: shuffle actions @@ -872,7 +872,7 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield break; case ractor_select_action_yield: { - if (ractor_try_yield(ec, cr, &cr->wait.yielded_basket)) { + if (ractor_try_yield(ec, cr, &cr->sync.wait.yielded_basket)) { *ret_r = ID2SYM(rb_intern("yield")); ret = Qnil; goto cleanup; @@ -886,9 +886,9 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield RACTOR_LOCK(cr); { - VM_ASSERT(cr->wait.status == wait_none); - cr->wait.status = wait_status; - cr->wait.wakeup_status == wakeup_none; + VM_ASSERT(cr->sync.wait.status == wait_none); + cr->sync.wait.status = wait_status; + cr->sync.wait.wakeup_status == wakeup_none; } RACTOR_UNLOCK(cr); @@ -909,7 +909,7 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield // wait RACTOR_LOCK(cr); { - if (cr->wait.wakeup_status == wakeup_none) { + if (cr->sync.wait.wakeup_status == wakeup_none) { for (i=0; i<alen; i++) { rb_ractor_t *r; @@ -918,41 +918,41 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield r = RACTOR_PTR(actions[i].v); if (ractor_sleeping_by(r, wait_yielding)) { RUBY_DEBUG_LOG("wakeup_none, but r:%u is waiting for yielding", r->id); - cr->wait.wakeup_status = wakeup_by_retry; + cr->sync.wait.wakeup_status = wakeup_by_retry; goto skip_sleep; } break; case ractor_select_action_receive: - if (cr->incoming_queue.cnt > 0) { - RUBY_DEBUG_LOG("wakeup_none, but incoming_queue has %u messages", cr->incoming_queue.cnt); - cr->wait.wakeup_status = wakeup_by_retry; + if (cr->sync.incoming_queue.cnt > 0) { + RUBY_DEBUG_LOG("wakeup_none, but incoming_queue has %u messages", cr->sync.incoming_queue.cnt); + cr->sync.wait.wakeup_status = wakeup_by_retry; goto skip_sleep; } break; case ractor_select_action_yield: - if (cr->taking_ractors.cnt > 0) { - RUBY_DEBUG_LOG("wakeup_none, but %u taking_ractors are waiting", cr->taking_ractors.cnt); - cr->wait.wakeup_status = wakeup_by_retry; + if (cr->sync.taking_ractors.cnt > 0) { + RUBY_DEBUG_LOG("wakeup_none, but %u taking_ractors are waiting", cr->sync.taking_ractors.cnt); + cr->sync.wait.wakeup_status = wakeup_by_retry; goto skip_sleep; } - else if (cr->outgoing_port_closed) { - cr->wait.wakeup_status = wakeup_by_close; + else if (cr->sync.outgoing_port_closed) { + cr->sync.wait.wakeup_status = wakeup_by_close; goto skip_sleep; } break; } } - RUBY_DEBUG_LOG("sleep %s", wait_status_str(cr->wait.status)); + RUBY_DEBUG_LOG("sleep %s", wait_status_str(cr->sync.wait.status)); ractor_sleep(ec, cr); - RUBY_DEBUG_LOG("awaken %s", wakeup_status_str(cr->wait.wakeup_status)); + RUBY_DEBUG_LOG("awaken %s", wakeup_status_str(cr->sync.wait.wakeup_status)); } else { skip_sleep: RUBY_DEBUG_LOG("no need to sleep %s->%s", - wait_status_str(cr->wait.status), - wakeup_status_str(cr->wait.wakeup_status)); - cr->wait.status = wait_none; + wait_status_str(cr->sync.wait.status), + wakeup_status_str(cr->sync.wait.wakeup_status)); + cr->sync.wait.status = wait_none; } } RACTOR_UNLOCK(cr); @@ -963,7 +963,7 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield switch (actions[i].type) { case ractor_select_action_take: r = RACTOR_PTR(actions[i].v); - ractor_waiting_list_del(r, &r->taking_ractors, cr); + ractor_waiting_list_del(r, &r->sync.taking_ractors, cr); break; case ractor_select_action_receive: case ractor_select_action_yield: @@ -972,8 +972,8 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield } // check results - enum ractor_wakeup_status wakeup_status = cr->wait.wakeup_status; - cr->wait.wakeup_status = wakeup_none; + enum ractor_wakeup_status wakeup_status = cr->sync.wait.wakeup_status; + cr->sync.wait.wakeup_status = wakeup_none; switch (wakeup_status) { case wakeup_none: @@ -990,10 +990,10 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield case wakeup_by_yield: // take was succeeded! // cr.wait.taken_basket contains passed block - VM_ASSERT(cr->wait.taken_basket.type != basket_type_none); - *ret_r = cr->wait.taken_basket.sender; + VM_ASSERT(cr->sync.wait.taken_basket.type != basket_type_none); + *ret_r = cr->sync.wait.taken_basket.sender; VM_ASSERT(rb_ractor_p(*ret_r)); - ret = ractor_basket_accept(&cr->wait.taken_basket); + ret = ractor_basket_accept(&cr->sync.wait.taken_basket); goto cleanup; case wakeup_by_take: *ret_r = ID2SYM(rb_intern("yield")); @@ -1013,14 +1013,14 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield cleanup: RUBY_DEBUG_LOG("cleanup actions (%s)", wait_status_str(wait_status)); - if (cr->wait.yielded_basket.type != basket_type_none) { - ractor_basket_clear(&cr->wait.yielded_basket); + if (cr->sync.wait.yielded_basket.type != basket_type_none) { + ractor_basket_clear(&cr->sync.wait.yielded_basket); } - VM_ASSERT(cr->wait.status == wait_none); - VM_ASSERT(cr->wait.wakeup_status == wakeup_none); - VM_ASSERT(cr->wait.taken_basket.type == basket_type_none); - VM_ASSERT(cr->wait.yielded_basket.type == basket_type_none); + VM_ASSERT(cr->sync.wait.status == wait_none); + VM_ASSERT(cr->sync.wait.wakeup_status == wakeup_none); + VM_ASSERT(cr->sync.wait.taken_basket.type == basket_type_none); + VM_ASSERT(cr->sync.wait.yielded_basket.type == basket_type_none); if (interrupted) { rb_vm_check_ints_blocking(ec); @@ -1055,11 +1055,11 @@ ractor_close_incoming(rb_execution_context_t *ec, rb_ractor_t *r) RACTOR_LOCK(r); { - if (!r->incoming_port_closed) { + if (!r->sync.incoming_port_closed) { prev = Qfalse; - r->incoming_port_closed = true; + r->sync.incoming_port_closed = true; if (ractor_wakeup(r, wait_receiving, wakeup_by_close)) { - VM_ASSERT(r->incoming_queue.cnt == 0); + VM_ASSERT(r->sync.incoming_queue.cnt == 0); RUBY_DEBUG_LOG("cancel receiving", 0); } } @@ -1078,9 +1078,9 @@ ractor_close_outgoing(rb_execution_context_t *ec, rb_ractor_t *r) RACTOR_LOCK(r); { - if (!r->outgoing_port_closed) { + if (!r->sync.outgoing_port_closed) { prev = Qfalse; - r->outgoing_port_closed = true; + r->sync.outgoing_port_closed = true; } else { prev = Qtrue; @@ -1088,7 +1088,7 @@ ractor_close_outgoing(rb_execution_context_t *ec, rb_ractor_t *r) // wakeup all taking ractors rb_ractor_t *taking_ractor; - while ((taking_ractor = ractor_waiting_list_shift(r, &r->taking_ractors)) != NULL) { + while ((taking_ractor = ractor_waiting_list_shift(r, &r->sync.taking_ractors)) != NULL) { RACTOR_LOCK(taking_ractor); ractor_wakeup(taking_ractor, wait_taking, wakeup_by_close); RACTOR_UNLOCK(taking_ractor); @@ -1242,9 +1242,9 @@ rb_ractor_living_threads_init(rb_ractor_t *r) static void ractor_init(rb_ractor_t *r, VALUE name, VALUE loc) { - ractor_queue_setup(&r->incoming_queue); - rb_native_mutex_initialize(&r->lock); - rb_native_cond_initialize(&r->wait.cond); + ractor_queue_setup(&r->sync.incoming_queue); + rb_native_mutex_initialize(&r->sync.lock); + rb_native_cond_initialize(&r->sync.cond); rb_native_cond_initialize(&r->barrier_wait_cond); // thread management @@ -1309,7 +1309,7 @@ ractor_create(rb_execution_context_t *ec, VALUE self, VALUE loc, VALUE name, VAL static void ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool exc) { - if (cr->outgoing_port_closed) { + if (cr->sync.outgoing_port_closed) { return; } @@ -1326,12 +1326,12 @@ ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool e bool retry = false; RACTOR_LOCK(cr); { - if (cr->taking_ractors.cnt == 0) { - cr->wait.yielded_basket = basket; + if (cr->sync.taking_ractors.cnt == 0) { + cr->sync.wait.yielded_basket = basket; - VM_ASSERT(cr->wait.status == wait_none); - cr->wait.status = wait_yielding; - cr->wait.wakeup_status = wakeup_none; + VM_ASSERT(cr->sync.wait.status == wait_none); + cr->sync.wait.status = wait_yielding; + cr->sync.wait.wakeup_status = wakeup_none; VM_ASSERT(cr->yield_atexit == false); cr->yield_atexit = true; diff --git a/ractor_core.h b/ractor_core.h index 3f367ad559..c97dfcc85f 100644 --- a/ractor_core.h +++ b/ractor_core.h @@ -36,23 +36,20 @@ struct rb_ractor_waiting_list { rb_ractor_t **ractors; }; -struct rb_random_struct; // c.f. ruby/random.h - -struct rb_ractor_struct { +struct rb_ractor_sync { // ractor lock rb_nativethread_lock_t lock; #if RACTOR_CHECK_MODE > 0 VALUE locked_by; #endif + rb_nativethread_cond_t cond; // communication struct rb_ractor_queue incoming_queue; + struct rb_ractor_waiting_list taking_ractors; bool incoming_port_closed; bool outgoing_port_closed; - bool yield_atexit; - - struct rb_ractor_waiting_list taking_ractors; struct ractor_wait { enum ractor_wait_status { @@ -72,11 +69,15 @@ struct rb_ractor_struct { wakeup_by_retry, } wakeup_status; - struct rb_ractor_basket taken_basket; struct rb_ractor_basket yielded_basket; - - rb_nativethread_cond_t cond; + struct rb_ractor_basket taken_basket; } wait; +}; + +struct rb_ractor_struct { + struct rb_ractor_sync sync; + + bool yield_atexit; // vm wide barrier synchronization rb_nativethread_cond_t barrier_wait_cond; @@ -49,7 +49,7 @@ vm_lock_enter(rb_ractor_t *cr, rb_vm_t *vm, bool locked, bool no_barrier, unsign else { #if RACTOR_CHECK_MODE // locking ractor and acquire VM lock will cause deadlock - VM_ASSERT(cr->locked_by != cr->self); + VM_ASSERT(cr->sync.locked_by != cr->self); #endif // lock |