summaryrefslogtreecommitdiff
path: root/ractor.c
diff options
context:
space:
mode:
Diffstat (limited to 'ractor.c')
-rw-r--r--ractor.c224
1 files changed, 112 insertions, 112 deletions
diff --git a/ractor.c b/ractor.c
index 7d4404c92f..4b02b3fcd8 100644
--- a/ractor.c
+++ b/ractor.c
@@ -37,7 +37,7 @@ ASSERT_ractor_unlocking(rb_ractor_t *r)
{
#if RACTOR_CHECK_MODE > 0
// GET_EC is NULL in an MJIT worker
- if (GET_EC() != NULL && r->locked_by == GET_RACTOR()->self) {
+ if (GET_EC() != NULL && r->sync.locked_by == GET_RACTOR()->self) {
rb_bug("recursive ractor locking");
}
#endif
@@ -48,8 +48,8 @@ ASSERT_ractor_locking(rb_ractor_t *r)
{
#if RACTOR_CHECK_MODE > 0
// GET_EC is NULL in an MJIT worker
- if (GET_EC() != NULL && r->locked_by != GET_RACTOR()->self) {
- rp(r->locked_by);
+ if (GET_EC() != NULL && r->sync.locked_by != GET_RACTOR()->self) {
+ rp(r->sync.locked_by);
rb_bug("ractor lock is not acquired.");
}
#endif
@@ -61,11 +61,11 @@ ractor_lock(rb_ractor_t *r, const char *file, int line)
RUBY_DEBUG_LOG2(file, line, "locking r:%u%s", r->id, GET_RACTOR() == r ? " (self)" : "");
ASSERT_ractor_unlocking(r);
- rb_native_mutex_lock(&r->lock);
+ rb_native_mutex_lock(&r->sync.lock);
#if RACTOR_CHECK_MODE > 0
if (GET_EC() != NULL) { // GET_EC is NULL in an MJIT worker
- r->locked_by = GET_RACTOR()->self;
+ r->sync.locked_by = GET_RACTOR()->self;
}
#endif
@@ -76,7 +76,7 @@ static void
ractor_lock_self(rb_ractor_t *cr, const char *file, int line)
{
VM_ASSERT(cr == GET_RACTOR());
- VM_ASSERT(cr->locked_by != cr->self);
+ VM_ASSERT(cr->sync.locked_by != cr->self);
ractor_lock(cr, file, line);
}
@@ -85,9 +85,9 @@ ractor_unlock(rb_ractor_t *r, const char *file, int line)
{
ASSERT_ractor_locking(r);
#if RACTOR_CHECK_MODE > 0
- r->locked_by = Qnil;
+ r->sync.locked_by = Qnil;
#endif
- rb_native_mutex_unlock(&r->lock);
+ rb_native_mutex_unlock(&r->sync.lock);
RUBY_DEBUG_LOG2(file, line, "r:%u%s", r->id, GET_RACTOR() == r ? " (self)" : "");
}
@@ -96,7 +96,7 @@ static void
ractor_unlock_self(rb_ractor_t *cr, const char *file, int line)
{
VM_ASSERT(cr == GET_RACTOR());
- VM_ASSERT(cr->locked_by == cr->self);
+ VM_ASSERT(cr->sync.locked_by == cr->self);
ractor_unlock(cr, file, line);
}
@@ -109,13 +109,13 @@ static void
ractor_cond_wait(rb_ractor_t *r)
{
#if RACTOR_CHECK_MODE > 0
- VALUE locked_by = r->locked_by;
- r->locked_by = Qnil;
+ VALUE locked_by = r->sync.locked_by;
+ r->sync.locked_by = Qnil;
#endif
- rb_native_cond_wait(&r->wait.cond, &r->lock);
+ rb_native_cond_wait(&r->sync.cond, &r->sync.lock);
#if RACTOR_CHECK_MODE > 0
- r->locked_by = locked_by;
+ r->sync.locked_by = locked_by;
#endif
}
@@ -186,11 +186,11 @@ ractor_mark(void *ptr)
{
rb_ractor_t *r = (rb_ractor_t *)ptr;
- ractor_queue_mark(&r->incoming_queue);
- rb_gc_mark(r->wait.taken_basket.v);
- rb_gc_mark(r->wait.taken_basket.sender);
- rb_gc_mark(r->wait.yielded_basket.v);
- rb_gc_mark(r->wait.yielded_basket.sender);
+ ractor_queue_mark(&r->sync.incoming_queue);
+ rb_gc_mark(r->sync.wait.taken_basket.v);
+ rb_gc_mark(r->sync.wait.taken_basket.sender);
+ rb_gc_mark(r->sync.wait.yielded_basket.v);
+ rb_gc_mark(r->sync.wait.yielded_basket.sender);
rb_gc_mark(r->loc);
rb_gc_mark(r->name);
rb_gc_mark(r->r_stdin);
@@ -224,10 +224,10 @@ static void
ractor_free(void *ptr)
{
rb_ractor_t *r = (rb_ractor_t *)ptr;
- rb_native_mutex_destroy(&r->lock);
- rb_native_cond_destroy(&r->wait.cond);
- ractor_queue_free(&r->incoming_queue);
- ractor_waiting_list_free(&r->taking_ractors);
+ rb_native_mutex_destroy(&r->sync.lock);
+ rb_native_cond_destroy(&r->sync.cond);
+ ractor_queue_free(&r->sync.incoming_queue);
+ ractor_waiting_list_free(&r->sync.taking_ractors);
ractor_local_storage_free(r);
ruby_xfree(r);
}
@@ -251,8 +251,8 @@ ractor_memsize(const void *ptr)
// TODO
return sizeof(rb_ractor_t) +
- ractor_queue_memsize(&r->incoming_queue) +
- ractor_waiting_list_memsize(&r->taking_ractors);
+ ractor_queue_memsize(&r->sync.incoming_queue) +
+ ractor_waiting_list_memsize(&r->sync.taking_ractors);
}
static const rb_data_type_t ractor_data_type = {
@@ -407,11 +407,11 @@ ractor_basket_accept(struct rb_ractor_basket *b)
static VALUE
ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *r)
{
- struct rb_ractor_queue *rq = &r->incoming_queue;
+ struct rb_ractor_queue *rq = &r->sync.incoming_queue;
struct rb_ractor_basket basket;
if (ractor_queue_deq(r, rq, &basket) == false) {
- if (r->incoming_port_closed) {
+ if (r->sync.incoming_port_closed) {
rb_raise(rb_eRactorClosedError, "The incoming port is already closed");
}
else {
@@ -427,11 +427,11 @@ ractor_sleep_wo_gvl(void *ptr)
{
rb_ractor_t *cr = ptr;
RACTOR_LOCK_SELF(cr);
- VM_ASSERT(cr->wait.status != wait_none);
- if (cr->wait.wakeup_status == wakeup_none) {
+ VM_ASSERT(cr->sync.wait.status != wait_none);
+ if (cr->sync.wait.wakeup_status == wakeup_none) {
ractor_cond_wait(cr);
}
- cr->wait.status = wait_none;
+ cr->sync.wait.status = wait_none;
RACTOR_UNLOCK_SELF(cr);
return NULL;
}
@@ -442,9 +442,9 @@ ractor_sleep_interrupt(void *ptr)
rb_ractor_t *r = ptr;
RACTOR_LOCK(r);
- if (r->wait.wakeup_status == wakeup_none) {
- r->wait.wakeup_status = wakeup_by_interrupt;
- rb_native_cond_signal(&r->wait.cond);
+ if (r->sync.wait.wakeup_status == wakeup_none) {
+ r->sync.wait.wakeup_status = wakeup_by_interrupt;
+ rb_native_cond_signal(&r->sync.cond);
}
RACTOR_UNLOCK(r);
}
@@ -486,9 +486,9 @@ static void
ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr)
{
VM_ASSERT(GET_RACTOR() == cr);
- VM_ASSERT(cr->wait.status != wait_none);
+ VM_ASSERT(cr->sync.wait.status != wait_none);
// fprintf(stderr, "%s r:%p status:%s, wakeup_status:%s\n", __func__, cr,
- // wait_status_str(cr->wait.status), wakeup_status_str(cr->wait.wakeup_status));
+ // wait_status_str(cr->sync.wait.status), wakeup_status_str(cr->sync.wait.wakeup_status));
RACTOR_UNLOCK(cr);
rb_nogvl(ractor_sleep_wo_gvl, cr,
@@ -500,7 +500,7 @@ ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr)
static bool
ractor_sleeping_by(const rb_ractor_t *r, enum ractor_wait_status wait_status)
{
- return (r->wait.status & wait_status) && r->wait.wakeup_status == wakeup_none;
+ return (r->sync.wait.status & wait_status) && r->sync.wait.wakeup_status == wakeup_none;
}
static bool
@@ -509,12 +509,12 @@ ractor_wakeup(rb_ractor_t *r, enum ractor_wait_status wait_status, enum ractor_w
ASSERT_ractor_locking(r);
// fprintf(stderr, "%s r:%p status:%s/%s wakeup_status:%s/%s\n", __func__, r,
- // wait_status_str(r->wait.status), wait_status_str(wait_status),
- // wakeup_status_str(r->wait.wakeup_status), wakeup_status_str(wakeup_status));
+ // wait_status_str(r->sync.wait.status), wait_status_str(wait_status),
+ // wakeup_status_str(r->sync.wait.wakeup_status), wakeup_status_str(wakeup_status));
if (ractor_sleeping_by(r, wait_status)) {
- r->wait.wakeup_status = wakeup_status;
- rb_native_cond_signal(&r->wait.cond);
+ r->sync.wait.wakeup_status = wakeup_status;
+ rb_native_cond_signal(&r->sync.cond);
return true;
}
else {
@@ -536,12 +536,12 @@ ractor_register_taking(rb_ractor_t *r, rb_ractor_t *cr)
}
else {
// insert cr into taking list
- struct rb_ractor_waiting_list *wl = &r->taking_ractors;
+ struct rb_ractor_waiting_list *wl = &r->sync.taking_ractors;
for (int i=0; i<wl->cnt; i++) {
if (wl->ractors[i] == cr) {
// TODO: make it clean code.
- rb_native_mutex_unlock(&r->lock);
+ rb_native_mutex_unlock(&r->sync.lock);
rb_raise(rb_eRuntimeError, "Already another thread of same ractor is waiting.");
}
}
@@ -564,11 +564,11 @@ ractor_register_taking(rb_ractor_t *r, rb_ractor_t *cr)
if (retry_try) {
RACTOR_LOCK(cr);
{
- if (cr->wait.wakeup_status == wakeup_none) {
- VM_ASSERT(cr->wait.status != wait_none);
+ if (cr->sync.wait.wakeup_status == wakeup_none) {
+ VM_ASSERT(cr->sync.wait.status != wait_none);
- cr->wait.wakeup_status = wakeup_by_retry;
- cr->wait.status = wait_none;
+ cr->sync.wait.wakeup_status = wakeup_by_retry;
+ cr->sync.wait.status = wait_none;
}
}
RACTOR_UNLOCK(cr);
@@ -601,7 +601,7 @@ static rb_ractor_t *
ractor_waiting_list_shift(rb_ractor_t *r, struct rb_ractor_waiting_list *wl)
{
ASSERT_ractor_locking(r);
- VM_ASSERT(&r->taking_ractors == wl);
+ VM_ASSERT(&r->sync.taking_ractors == wl);
if (wl->cnt > 0) {
rb_ractor_t *tr = wl->ractors[0];
@@ -625,14 +625,14 @@ ractor_receive(rb_execution_context_t *ec, rb_ractor_t *r)
while ((v = ractor_try_receive(ec, r)) == Qundef) {
RACTOR_LOCK(r);
{
- if (ractor_queue_empty_p(r, &r->incoming_queue)) {
- VM_ASSERT(r->wait.status == wait_none);
- r->wait.status = wait_receiving;
- r->wait.wakeup_status = wakeup_none;
+ if (ractor_queue_empty_p(r, &r->sync.incoming_queue)) {
+ VM_ASSERT(r->sync.wait.status == wait_none);
+ r->sync.wait.status = wait_receiving;
+ r->sync.wait.wakeup_status = wakeup_none;
ractor_sleep(ec, r);
- r->wait.wakeup_status = wakeup_none;
+ r->sync.wait.wakeup_status = wakeup_none;
}
}
RACTOR_UNLOCK(r);
@@ -645,11 +645,11 @@ static void
ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_basket *b)
{
bool closed = false;
- struct rb_ractor_queue *rq = &r->incoming_queue;
+ struct rb_ractor_queue *rq = &r->sync.incoming_queue;
RACTOR_LOCK(r);
{
- if (r->incoming_port_closed) {
+ if (r->sync.incoming_port_closed) {
closed = true;
}
else {
@@ -713,11 +713,11 @@ ractor_try_take(rb_execution_context_t *ec, rb_ractor_t *r)
RACTOR_LOCK(r);
{
if (ractor_wakeup(r, wait_yielding, wakeup_by_take)) {
- VM_ASSERT(r->wait.yielded_basket.type != basket_type_none);
- basket = r->wait.yielded_basket;
- ractor_basket_clear(&r->wait.yielded_basket);
+ VM_ASSERT(r->sync.wait.yielded_basket.type != basket_type_none);
+ basket = r->sync.wait.yielded_basket;
+ ractor_basket_clear(&r->sync.wait.yielded_basket);
}
- else if (r->outgoing_port_closed) {
+ else if (r->sync.outgoing_port_closed) {
closed = true;
}
else {
@@ -745,7 +745,7 @@ ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_b
ASSERT_ractor_unlocking(cr);
VM_ASSERT(basket->type != basket_type_none);
- if (cr->outgoing_port_closed) {
+ if (cr->sync.outgoing_port_closed) {
rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
}
@@ -754,7 +754,7 @@ ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_b
retry_shift:
RACTOR_LOCK(cr);
{
- r = ractor_waiting_list_shift(cr, &cr->taking_ractors);
+ r = ractor_waiting_list_shift(cr, &cr->sync.taking_ractors);
}
RACTOR_UNLOCK(cr);
@@ -764,8 +764,8 @@ ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_b
RACTOR_LOCK(r);
{
if (ractor_wakeup(r, wait_taking, wakeup_by_yield)) {
- VM_ASSERT(r->wait.taken_basket.type == basket_type_none);
- r->wait.taken_basket = *basket;
+ VM_ASSERT(r->sync.wait.taken_basket.type == basket_type_none);
+ r->sync.wait.taken_basket = *basket;
}
else {
retry_shift = true;
@@ -809,10 +809,10 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
VALUE v;
} *actions = ALLOCA_N(struct ractor_select_action, alen + (yield_p ? 1 : 0));
- VM_ASSERT(cr->wait.status == wait_none);
- VM_ASSERT(cr->wait.wakeup_status == wakeup_none);
- VM_ASSERT(cr->wait.taken_basket.type == basket_type_none);
- VM_ASSERT(cr->wait.yielded_basket.type == basket_type_none);
+ VM_ASSERT(cr->sync.wait.status == wait_none);
+ VM_ASSERT(cr->sync.wait.wakeup_status == wakeup_none);
+ VM_ASSERT(cr->sync.wait.taken_basket.type == basket_type_none);
+ VM_ASSERT(cr->sync.wait.yielded_basket.type == basket_type_none);
// setup actions
for (i=0; i<alen; i++) {
@@ -842,7 +842,7 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
wait_status |= wait_yielding;
alen++;
- ractor_basket_setup(ec, &cr->wait.yielded_basket, yielded_value, move, false, false);
+ ractor_basket_setup(ec, &cr->sync.wait.yielded_basket, yielded_value, move, false, false);
}
// TODO: shuffle actions
@@ -872,7 +872,7 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
break;
case ractor_select_action_yield:
{
- if (ractor_try_yield(ec, cr, &cr->wait.yielded_basket)) {
+ if (ractor_try_yield(ec, cr, &cr->sync.wait.yielded_basket)) {
*ret_r = ID2SYM(rb_intern("yield"));
ret = Qnil;
goto cleanup;
@@ -886,9 +886,9 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
RACTOR_LOCK(cr);
{
- VM_ASSERT(cr->wait.status == wait_none);
- cr->wait.status = wait_status;
- cr->wait.wakeup_status == wakeup_none;
+ VM_ASSERT(cr->sync.wait.status == wait_none);
+ cr->sync.wait.status = wait_status;
+ cr->sync.wait.wakeup_status == wakeup_none;
}
RACTOR_UNLOCK(cr);
@@ -909,7 +909,7 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
// wait
RACTOR_LOCK(cr);
{
- if (cr->wait.wakeup_status == wakeup_none) {
+ if (cr->sync.wait.wakeup_status == wakeup_none) {
for (i=0; i<alen; i++) {
rb_ractor_t *r;
@@ -918,41 +918,41 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
r = RACTOR_PTR(actions[i].v);
if (ractor_sleeping_by(r, wait_yielding)) {
RUBY_DEBUG_LOG("wakeup_none, but r:%u is waiting for yielding", r->id);
- cr->wait.wakeup_status = wakeup_by_retry;
+ cr->sync.wait.wakeup_status = wakeup_by_retry;
goto skip_sleep;
}
break;
case ractor_select_action_receive:
- if (cr->incoming_queue.cnt > 0) {
- RUBY_DEBUG_LOG("wakeup_none, but incoming_queue has %u messages", cr->incoming_queue.cnt);
- cr->wait.wakeup_status = wakeup_by_retry;
+ if (cr->sync.incoming_queue.cnt > 0) {
+ RUBY_DEBUG_LOG("wakeup_none, but incoming_queue has %u messages", cr->sync.incoming_queue.cnt);
+ cr->sync.wait.wakeup_status = wakeup_by_retry;
goto skip_sleep;
}
break;
case ractor_select_action_yield:
- if (cr->taking_ractors.cnt > 0) {
- RUBY_DEBUG_LOG("wakeup_none, but %u taking_ractors are waiting", cr->taking_ractors.cnt);
- cr->wait.wakeup_status = wakeup_by_retry;
+ if (cr->sync.taking_ractors.cnt > 0) {
+ RUBY_DEBUG_LOG("wakeup_none, but %u taking_ractors are waiting", cr->sync.taking_ractors.cnt);
+ cr->sync.wait.wakeup_status = wakeup_by_retry;
goto skip_sleep;
}
- else if (cr->outgoing_port_closed) {
- cr->wait.wakeup_status = wakeup_by_close;
+ else if (cr->sync.outgoing_port_closed) {
+ cr->sync.wait.wakeup_status = wakeup_by_close;
goto skip_sleep;
}
break;
}
}
- RUBY_DEBUG_LOG("sleep %s", wait_status_str(cr->wait.status));
+ RUBY_DEBUG_LOG("sleep %s", wait_status_str(cr->sync.wait.status));
ractor_sleep(ec, cr);
- RUBY_DEBUG_LOG("awaken %s", wakeup_status_str(cr->wait.wakeup_status));
+ RUBY_DEBUG_LOG("awaken %s", wakeup_status_str(cr->sync.wait.wakeup_status));
}
else {
skip_sleep:
RUBY_DEBUG_LOG("no need to sleep %s->%s",
- wait_status_str(cr->wait.status),
- wakeup_status_str(cr->wait.wakeup_status));
- cr->wait.status = wait_none;
+ wait_status_str(cr->sync.wait.status),
+ wakeup_status_str(cr->sync.wait.wakeup_status));
+ cr->sync.wait.status = wait_none;
}
}
RACTOR_UNLOCK(cr);
@@ -963,7 +963,7 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
switch (actions[i].type) {
case ractor_select_action_take:
r = RACTOR_PTR(actions[i].v);
- ractor_waiting_list_del(r, &r->taking_ractors, cr);
+ ractor_waiting_list_del(r, &r->sync.taking_ractors, cr);
break;
case ractor_select_action_receive:
case ractor_select_action_yield:
@@ -972,8 +972,8 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
}
// check results
- enum ractor_wakeup_status wakeup_status = cr->wait.wakeup_status;
- cr->wait.wakeup_status = wakeup_none;
+ enum ractor_wakeup_status wakeup_status = cr->sync.wait.wakeup_status;
+ cr->sync.wait.wakeup_status = wakeup_none;
switch (wakeup_status) {
case wakeup_none:
@@ -990,10 +990,10 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
case wakeup_by_yield:
// take was succeeded!
// cr.wait.taken_basket contains passed block
- VM_ASSERT(cr->wait.taken_basket.type != basket_type_none);
- *ret_r = cr->wait.taken_basket.sender;
+ VM_ASSERT(cr->sync.wait.taken_basket.type != basket_type_none);
+ *ret_r = cr->sync.wait.taken_basket.sender;
VM_ASSERT(rb_ractor_p(*ret_r));
- ret = ractor_basket_accept(&cr->wait.taken_basket);
+ ret = ractor_basket_accept(&cr->sync.wait.taken_basket);
goto cleanup;
case wakeup_by_take:
*ret_r = ID2SYM(rb_intern("yield"));
@@ -1013,14 +1013,14 @@ ractor_select(rb_execution_context_t *ec, const VALUE *rs, int alen, VALUE yield
cleanup:
RUBY_DEBUG_LOG("cleanup actions (%s)", wait_status_str(wait_status));
- if (cr->wait.yielded_basket.type != basket_type_none) {
- ractor_basket_clear(&cr->wait.yielded_basket);
+ if (cr->sync.wait.yielded_basket.type != basket_type_none) {
+ ractor_basket_clear(&cr->sync.wait.yielded_basket);
}
- VM_ASSERT(cr->wait.status == wait_none);
- VM_ASSERT(cr->wait.wakeup_status == wakeup_none);
- VM_ASSERT(cr->wait.taken_basket.type == basket_type_none);
- VM_ASSERT(cr->wait.yielded_basket.type == basket_type_none);
+ VM_ASSERT(cr->sync.wait.status == wait_none);
+ VM_ASSERT(cr->sync.wait.wakeup_status == wakeup_none);
+ VM_ASSERT(cr->sync.wait.taken_basket.type == basket_type_none);
+ VM_ASSERT(cr->sync.wait.yielded_basket.type == basket_type_none);
if (interrupted) {
rb_vm_check_ints_blocking(ec);
@@ -1055,11 +1055,11 @@ ractor_close_incoming(rb_execution_context_t *ec, rb_ractor_t *r)
RACTOR_LOCK(r);
{
- if (!r->incoming_port_closed) {
+ if (!r->sync.incoming_port_closed) {
prev = Qfalse;
- r->incoming_port_closed = true;
+ r->sync.incoming_port_closed = true;
if (ractor_wakeup(r, wait_receiving, wakeup_by_close)) {
- VM_ASSERT(r->incoming_queue.cnt == 0);
+ VM_ASSERT(r->sync.incoming_queue.cnt == 0);
RUBY_DEBUG_LOG("cancel receiving", 0);
}
}
@@ -1078,9 +1078,9 @@ ractor_close_outgoing(rb_execution_context_t *ec, rb_ractor_t *r)
RACTOR_LOCK(r);
{
- if (!r->outgoing_port_closed) {
+ if (!r->sync.outgoing_port_closed) {
prev = Qfalse;
- r->outgoing_port_closed = true;
+ r->sync.outgoing_port_closed = true;
}
else {
prev = Qtrue;
@@ -1088,7 +1088,7 @@ ractor_close_outgoing(rb_execution_context_t *ec, rb_ractor_t *r)
// wakeup all taking ractors
rb_ractor_t *taking_ractor;
- while ((taking_ractor = ractor_waiting_list_shift(r, &r->taking_ractors)) != NULL) {
+ while ((taking_ractor = ractor_waiting_list_shift(r, &r->sync.taking_ractors)) != NULL) {
RACTOR_LOCK(taking_ractor);
ractor_wakeup(taking_ractor, wait_taking, wakeup_by_close);
RACTOR_UNLOCK(taking_ractor);
@@ -1242,9 +1242,9 @@ rb_ractor_living_threads_init(rb_ractor_t *r)
static void
ractor_init(rb_ractor_t *r, VALUE name, VALUE loc)
{
- ractor_queue_setup(&r->incoming_queue);
- rb_native_mutex_initialize(&r->lock);
- rb_native_cond_initialize(&r->wait.cond);
+ ractor_queue_setup(&r->sync.incoming_queue);
+ rb_native_mutex_initialize(&r->sync.lock);
+ rb_native_cond_initialize(&r->sync.cond);
rb_native_cond_initialize(&r->barrier_wait_cond);
// thread management
@@ -1309,7 +1309,7 @@ ractor_create(rb_execution_context_t *ec, VALUE self, VALUE loc, VALUE name, VAL
static void
ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool exc)
{
- if (cr->outgoing_port_closed) {
+ if (cr->sync.outgoing_port_closed) {
return;
}
@@ -1326,12 +1326,12 @@ ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool e
bool retry = false;
RACTOR_LOCK(cr);
{
- if (cr->taking_ractors.cnt == 0) {
- cr->wait.yielded_basket = basket;
+ if (cr->sync.taking_ractors.cnt == 0) {
+ cr->sync.wait.yielded_basket = basket;
- VM_ASSERT(cr->wait.status == wait_none);
- cr->wait.status = wait_yielding;
- cr->wait.wakeup_status = wakeup_none;
+ VM_ASSERT(cr->sync.wait.status == wait_none);
+ cr->sync.wait.status = wait_yielding;
+ cr->sync.wait.wakeup_status = wakeup_none;
VM_ASSERT(cr->yield_atexit == false);
cr->yield_atexit = true;