summaryrefslogtreecommitdiff
path: root/ractor.c
diff options
context:
space:
mode:
Diffstat (limited to 'ractor.c')
-rw-r--r--ractor.c1952
1 files changed, 31 insertions, 1921 deletions
diff --git a/ractor.c b/ractor.c
index b2446439a3..24a57ebf30 100644
--- a/ractor.c
+++ b/ractor.c
@@ -178,37 +178,21 @@ ractor_status_p(rb_ractor_t *r, enum ractor_status status)
// Ractor data/mark/free
-static struct rb_ractor_basket *ractor_queue_at(rb_ractor_t *r, struct rb_ractor_queue *rq, int i);
static void ractor_local_storage_mark(rb_ractor_t *r);
static void ractor_local_storage_free(rb_ractor_t *r);
-static void
-ractor_queue_mark(struct rb_ractor_queue *rq)
-{
- for (int i=0; i<rq->cnt; i++) {
- struct rb_ractor_basket *b = ractor_queue_at(NULL, rq, i);
- rb_gc_mark(b->sender);
-
- switch (b->type.e) {
- case basket_type_yielding:
- case basket_type_take_basket:
- case basket_type_deleted:
- case basket_type_reserved:
- // ignore
- break;
- default:
- rb_gc_mark(b->p.send.v);
- }
- }
-}
+static void ractor_sync_mark(rb_ractor_t *r);
+static void ractor_sync_free(rb_ractor_t *r);
+static size_t ractor_sync_memsize(const rb_ractor_t *r);
+static void ractor_sync_init(rb_ractor_t *r);
static void
ractor_mark(void *ptr)
{
rb_ractor_t *r = (rb_ractor_t *)ptr;
- ractor_queue_mark(&r->sync.recv_queue);
- ractor_queue_mark(&r->sync.takers_queue);
+ // mark received messages
+ ractor_sync_mark(r);
rb_gc_mark(r->loc);
rb_gc_mark(r->name);
@@ -229,19 +213,14 @@ ractor_mark(void *ptr)
}
static void
-ractor_queue_free(struct rb_ractor_queue *rq)
-{
- free(rq->baskets);
-}
-
-static void
ractor_free(void *ptr)
{
rb_ractor_t *r = (rb_ractor_t *)ptr;
RUBY_DEBUG_LOG("free r:%d", rb_ractor_id(r));
rb_native_mutex_destroy(&r->sync.lock);
- ractor_queue_free(&r->sync.recv_queue);
- ractor_queue_free(&r->sync.takers_queue);
+#ifdef RUBY_THREAD_WIN32_H
+ rb_native_cond_destroy(&r->sync.wakeup_cond);
+#endif
ractor_local_storage_free(r);
rb_hook_list_free(&r->pub.hooks);
@@ -252,24 +231,17 @@ ractor_free(void *ptr)
r->newobj_cache = NULL;
}
+ ractor_sync_free(r);
ruby_xfree(r);
}
static size_t
-ractor_queue_memsize(const struct rb_ractor_queue *rq)
-{
- return sizeof(struct rb_ractor_basket) * rq->size;
-}
-
-static size_t
ractor_memsize(const void *ptr)
{
rb_ractor_t *r = (rb_ractor_t *)ptr;
// TODO: more correct?
- return sizeof(rb_ractor_t) +
- ractor_queue_memsize(&r->sync.recv_queue) +
- ractor_queue_memsize(&r->sync.takers_queue);
+ return sizeof(rb_ractor_t) + ractor_sync_memsize(r);
}
static const rb_data_type_t ractor_data_type = {
@@ -317,1714 +289,7 @@ rb_ractor_current_id(void)
}
#endif
-// Ractor queue
-
-static void
-ractor_queue_setup(struct rb_ractor_queue *rq)
-{
- rq->size = 2;
- rq->cnt = 0;
- rq->start = 0;
- rq->baskets = malloc(sizeof(struct rb_ractor_basket) * rq->size);
-}
-
-static struct rb_ractor_basket *
-ractor_queue_head(rb_ractor_t *r, struct rb_ractor_queue *rq)
-{
- if (r != NULL) ASSERT_ractor_locking(r);
- return &rq->baskets[rq->start];
-}
-
-static struct rb_ractor_basket *
-ractor_queue_at(rb_ractor_t *r, struct rb_ractor_queue *rq, int i)
-{
- if (r != NULL) ASSERT_ractor_locking(r);
- return &rq->baskets[(rq->start + i) % rq->size];
-}
-
-static void
-ractor_queue_advance(rb_ractor_t *r, struct rb_ractor_queue *rq)
-{
- ASSERT_ractor_locking(r);
-
- if (rq->reserved_cnt == 0) {
- rq->cnt--;
- rq->start = (rq->start + 1) % rq->size;
- rq->serial++;
- }
- else {
- ractor_queue_at(r, rq, 0)->type.e = basket_type_deleted;
- }
-}
-
-static bool
-ractor_queue_skip_p(rb_ractor_t *r, struct rb_ractor_queue *rq, int i)
-{
- struct rb_ractor_basket *b = ractor_queue_at(r, rq, i);
- return basket_type_p(b, basket_type_deleted) ||
- basket_type_p(b, basket_type_reserved);
-}
-
-static void
-ractor_queue_compact(rb_ractor_t *r, struct rb_ractor_queue *rq)
-{
- ASSERT_ractor_locking(r);
-
- while (rq->cnt > 0 && basket_type_p(ractor_queue_at(r, rq, 0), basket_type_deleted)) {
- ractor_queue_advance(r, rq);
- }
-}
-
-static bool
-ractor_queue_empty_p(rb_ractor_t *r, struct rb_ractor_queue *rq)
-{
- ASSERT_ractor_locking(r);
-
- if (rq->cnt == 0) {
- return true;
- }
-
- ractor_queue_compact(r, rq);
-
- for (int i=0; i<rq->cnt; i++) {
- if (!ractor_queue_skip_p(r, rq, i)) {
- return false;
- }
- }
-
- return true;
-}
-
-static bool
-ractor_queue_deq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket)
-{
- ASSERT_ractor_locking(r);
-
- for (int i=0; i<rq->cnt; i++) {
- if (!ractor_queue_skip_p(r, rq, i)) {
- struct rb_ractor_basket *b = ractor_queue_at(r, rq, i);
- *basket = *b;
-
- // remove from queue
- b->type.e = basket_type_deleted;
- ractor_queue_compact(r, rq);
- return true;
- }
- }
-
- return false;
-}
-
-static void
-ractor_queue_enq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket)
-{
- ASSERT_ractor_locking(r);
-
- if (rq->size <= rq->cnt) {
- rq->baskets = realloc(rq->baskets, sizeof(struct rb_ractor_basket) * rq->size * 2);
- for (int i=rq->size - rq->start; i<rq->cnt; i++) {
- rq->baskets[i + rq->start] = rq->baskets[i + rq->start - rq->size];
- }
- rq->size *= 2;
- }
- // copy basket into queue
- rq->baskets[(rq->start + rq->cnt++) % rq->size] = *basket;
- // fprintf(stderr, "%s %p->cnt:%d\n", RUBY_FUNCTION_NAME_STRING, (void *)rq, rq->cnt);
-}
-
-static void
-ractor_queue_delete(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket)
-{
- basket->type.e = basket_type_deleted;
-}
-
-// Ractor basket
-
-static VALUE ractor_reset_belonging(VALUE obj); // in this file
-
-static VALUE
-ractor_basket_value(struct rb_ractor_basket *b)
-{
- switch (b->type.e) {
- case basket_type_ref:
- break;
- case basket_type_copy:
- case basket_type_move:
- case basket_type_will:
- b->type.e = basket_type_ref;
- b->p.send.v = ractor_reset_belonging(b->p.send.v);
- break;
- default:
- rb_bug("unreachable");
- }
-
- return b->p.send.v;
-}
-
-static VALUE
-ractor_basket_accept(struct rb_ractor_basket *b)
-{
- VALUE v = ractor_basket_value(b);
-
- // a ractor's main thread had an error and yielded us this exception during its dying moments
- if (b->p.send.exception) {
- VALUE cause = v;
- VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor.");
- rb_ivar_set(err, rb_intern("@ractor"), b->sender);
- rb_ec_setup_exception(NULL, err, cause);
- rb_exc_raise(err);
- }
-
- return v;
-}
-
-// Ractor synchronizations
-
-#if USE_RUBY_DEBUG_LOG
-static const char *
-wait_status_str(enum rb_ractor_wait_status wait_status)
-{
- switch ((int)wait_status) {
- case wait_none: return "none";
- case wait_receiving: return "receiving";
- case wait_taking: return "taking";
- case wait_yielding: return "yielding";
- case wait_receiving|wait_taking: return "receiving|taking";
- case wait_receiving|wait_yielding: return "receiving|yielding";
- case wait_taking|wait_yielding: return "taking|yielding";
- case wait_receiving|wait_taking|wait_yielding: return "receiving|taking|yielding";
- }
- rb_bug("unreachable");
-}
-
-static const char *
-wakeup_status_str(enum rb_ractor_wakeup_status wakeup_status)
-{
- switch (wakeup_status) {
- case wakeup_none: return "none";
- case wakeup_by_send: return "by_send";
- case wakeup_by_yield: return "by_yield";
- case wakeup_by_take: return "by_take";
- case wakeup_by_close: return "by_close";
- case wakeup_by_interrupt: return "by_interrupt";
- case wakeup_by_retry: return "by_retry";
- }
- rb_bug("unreachable");
-}
-
-static const char *
-basket_type_name(enum rb_ractor_basket_type type)
-{
- switch (type) {
- case basket_type_none: return "none";
- case basket_type_ref: return "ref";
- case basket_type_copy: return "copy";
- case basket_type_move: return "move";
- case basket_type_will: return "will";
- case basket_type_deleted: return "deleted";
- case basket_type_reserved: return "reserved";
- case basket_type_take_basket: return "take_basket";
- case basket_type_yielding: return "yielding";
- }
- VM_ASSERT(0);
- return NULL;
-}
-#endif // USE_RUBY_DEBUG_LOG
-
-static rb_thread_t *
-ractor_sleeping_by(const rb_ractor_t *r, rb_thread_t *th, enum rb_ractor_wait_status wait_status)
-{
- if (th) {
- if ((th->ractor_waiting.wait_status & wait_status) && th->ractor_waiting.wakeup_status == wakeup_none) {
- return th;
- }
- }
- else {
- // find any thread that has this ractor wait status that is blocked
- ccan_list_for_each(&r->sync.wait.waiting_threads, th, ractor_waiting.waiting_node) {
- if ((th->ractor_waiting.wait_status & wait_status) && th->ractor_waiting.wakeup_status == wakeup_none) {
- return th;
- }
- }
- }
- return NULL;
-}
-
-#ifdef RUBY_THREAD_PTHREAD_H
-// thread_*.c
-void rb_ractor_sched_wakeup(rb_ractor_t *r, rb_thread_t *th);
-#else
-
-// win32
-static void
-rb_ractor_sched_wakeup(rb_ractor_t *r, rb_thread_t *th)
-{
- (void)r;
- ASSERT_ractor_locking(r);
- rb_native_cond_signal(&th->ractor_waiting.cond);
-
-}
-#endif
-
-
-/*
- * Wakeup `r` if the given `th` is blocked and has the given ractor `wait_status`.
- * Wakeup any blocked thread in `r` with the given ractor `wait_status` if `th` is NULL.
- */
-static bool
-ractor_wakeup(rb_ractor_t *r, rb_thread_t *th /* can be NULL */, enum rb_ractor_wait_status wait_status, enum rb_ractor_wakeup_status wakeup_status)
-{
- ASSERT_ractor_locking(r);
-
- RUBY_DEBUG_LOG("r:%u wait:%s wakeup:%s",
- rb_ractor_id(r),
- wait_status_str(wait_status),
- wakeup_status_str(wakeup_status));
-
- if ((th = ractor_sleeping_by(r, th, wait_status)) != NULL) {
- th->ractor_waiting.wakeup_status = wakeup_status;
- rb_ractor_sched_wakeup(r, th);
- return true;
- }
- else {
- return false;
- }
-}
-
-// unblock function (UBF). This gets called when another thread on this or another ractor sets our thread's interrupt flag.
-// This is not async-safe.
-static void
-ractor_sleep_interrupt(void *ptr)
-{
- rb_execution_context_t *ec = ptr;
- rb_ractor_t *r = rb_ec_ractor_ptr(ec);
- rb_thread_t *th = rb_ec_thread_ptr(ec);
-
- RACTOR_LOCK(r);
- {
- ractor_wakeup(r, th, wait_receiving | wait_taking | wait_yielding, wakeup_by_interrupt);
- }
- RACTOR_UNLOCK(r);
-}
-
-typedef void (*ractor_sleep_cleanup_function)(rb_ractor_t *cr, void *p);
-
-// Checks the current thread for ruby interrupts and runs the cleanup function `cf_func` with `cf_data` if
-// `rb_ec_check_ints` is going to raise. See the `rb_threadptr_execute_interrupts` for info on when it can raise.
-static void
-ractor_check_ints(rb_execution_context_t *ec, rb_ractor_t *cr, rb_thread_t *cur_th, ractor_sleep_cleanup_function cf_func, void *cf_data)
-{
- if (cur_th->ractor_waiting.wait_status != wait_none) {
- enum rb_ractor_wait_status prev_wait_status = cur_th->ractor_waiting.wait_status;
- cur_th->ractor_waiting.wait_status = wait_none;
- cur_th->ractor_waiting.wakeup_status = wakeup_by_interrupt;
-
- RACTOR_UNLOCK(cr);
- {
- if (cf_func) {
- enum ruby_tag_type state;
- EC_PUSH_TAG(ec);
- if ((state = EC_EXEC_TAG()) == TAG_NONE) {
- rb_ec_check_ints(ec);
- }
- EC_POP_TAG();
-
- if (state) {
- (*cf_func)(cr, cf_data); // cleanup function is run after the ubf, if it had ubf
- EC_JUMP_TAG(ec, state);
- }
- }
- else {
- rb_ec_check_ints(ec);
- }
- }
-
- RACTOR_LOCK(cr);
- cur_th->ractor_waiting.wait_status = prev_wait_status;
- }
-}
-
-#ifdef RUBY_THREAD_PTHREAD_H
-void rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf);
-#else
-
-static void
-ractor_cond_wait(rb_ractor_t *r, rb_thread_t *th)
-{
-#if RACTOR_CHECK_MODE > 0
- VALUE locked_by = r->sync.locked_by;
- r->sync.locked_by = Qnil;
-#endif
- rb_native_cond_wait(&th->ractor_waiting.cond, &r->sync.lock);
-
-#if RACTOR_CHECK_MODE > 0
- r->sync.locked_by = locked_by;
-#endif
-}
-
-static void *
-ractor_sleep_wo_gvl(void *ptr)
-{
- rb_ractor_t *cr = ptr;
- rb_execution_context_t *ec = cr->threads.running_ec;
- VM_ASSERT(GET_EC() == ec);
- rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
- RACTOR_LOCK_SELF(cr);
- {
- VM_ASSERT(cur_th->ractor_waiting.wait_status != wait_none);
- // it's possible that another ractor has woken us up (ractor_wakeup),
- // so check this condition
- if (cur_th->ractor_waiting.wakeup_status == wakeup_none) {
- cur_th->status = THREAD_STOPPED_FOREVER;
- ractor_cond_wait(cr, cur_th);
- cur_th->status = THREAD_RUNNABLE;
- VM_ASSERT(cur_th->ractor_waiting.wakeup_status != wakeup_none);
- }
- else {
- RUBY_DEBUG_LOG("rare timing, no cond wait");
- }
- cur_th->ractor_waiting.wait_status = wait_none;
- }
- RACTOR_UNLOCK_SELF(cr);
- return NULL;
-}
-
-static void
-rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf_ractor_sleep_interrupt)
-{
- ASSERT_ractor_locking(cr);
- rb_thread_t *th = rb_ec_thread_ptr(ec);
- struct ccan_list_node *waitn = &th->ractor_waiting.waiting_node;
- VM_ASSERT(waitn->next == waitn->prev && waitn->next == waitn); // it should be unlinked
- ccan_list_add(&cr->sync.wait.waiting_threads, waitn);
- RACTOR_UNLOCK(cr);
- {
- rb_nogvl(ractor_sleep_wo_gvl, cr, ubf_ractor_sleep_interrupt, ec, RB_NOGVL_INTR_FAIL);
- }
- RACTOR_LOCK(cr);
- ccan_list_del_init(waitn);
-}
-#endif
-
-/*
- * Sleep the current ractor's current thread until another ractor wakes us up or another thread calls our unblock function.
- * The following ractor actions can cause this function to be called:
- * Ractor#take (wait_taking)
- * Ractor.yield (wait_yielding)
- * Ractor.receive (wait_receiving)
- * Ractor.select (can be a combination of the above wait states, depending on the states of the ractors passed to Ractor.select)
- */
-static enum rb_ractor_wakeup_status
-ractor_sleep_with_cleanup(rb_execution_context_t *ec, rb_ractor_t *cr, rb_thread_t *cur_th, enum rb_ractor_wait_status wait_status,
- ractor_sleep_cleanup_function cf_func, void *cf_data)
-{
- ASSERT_ractor_locking(cr);
- enum rb_ractor_wakeup_status wakeup_status;
- VM_ASSERT(GET_RACTOR() == cr);
-
- VM_ASSERT(cur_th->ractor_waiting.wait_status == wait_none);
- VM_ASSERT(wait_status != wait_none);
- cur_th->ractor_waiting.wait_status = wait_status;
- cur_th->ractor_waiting.wakeup_status = wakeup_none;
-
- // fprintf(stderr, "%s r:%p status:%s, wakeup_status:%s\n", RUBY_FUNCTION_NAME_STRING, (void *)cr,
- // wait_status_str(cr->sync.wait.status), wakeup_status_str(cr->sync.wait.wakeup_status));
-
- RUBY_DEBUG_LOG("sleep by %s", wait_status_str(wait_status));
-
- while (cur_th->ractor_waiting.wakeup_status == wakeup_none) {
- rb_ractor_sched_sleep(ec, cr, ractor_sleep_interrupt);
- ractor_check_ints(ec, cr, cur_th, cf_func, cf_data);
- }
-
- cur_th->ractor_waiting.wait_status = wait_none;
-
- wakeup_status = cur_th->ractor_waiting.wakeup_status;
- cur_th->ractor_waiting.wakeup_status = wakeup_none;
-
- RUBY_DEBUG_LOG("wakeup %s", wakeup_status_str(wakeup_status));
-
- ASSERT_ractor_locking(cr);
- return wakeup_status;
-}
-
-static enum rb_ractor_wakeup_status
-ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_thread_t *cur_th, enum rb_ractor_wait_status wait_status)
-{
- return ractor_sleep_with_cleanup(ec, cr, cur_th, wait_status, 0, NULL);
-}
-
-// Ractor.receive
-
-static void
-ractor_recursive_receive_if(rb_thread_t *th)
-{
- if (th->ractor_waiting.receiving_mutex && rb_mutex_owned_p(th->ractor_waiting.receiving_mutex)) {
- rb_raise(rb_eRactorError, "can not call receive/receive_if recursively");
- }
-}
-
-static VALUE
-ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *rq)
-{
- struct rb_ractor_basket basket;
- ractor_recursive_receive_if(rb_ec_thread_ptr(ec));
- bool received = false;
-
- RACTOR_LOCK_SELF(cr);
- {
- RUBY_DEBUG_LOG("rq->cnt:%d", rq->cnt);
- received = ractor_queue_deq(cr, rq, &basket);
- }
- RACTOR_UNLOCK_SELF(cr);
-
- if (!received) {
- if (cr->sync.incoming_port_closed) {
- rb_raise(rb_eRactorClosedError, "The incoming port is already closed");
- }
- return Qundef;
- }
- else {
- return ractor_basket_accept(&basket);
- }
-}
-
-static void
-ractor_wait_receive(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *rq)
-{
- VM_ASSERT(cr == rb_ec_ractor_ptr(ec));
- rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
- ractor_recursive_receive_if(cur_th);
-
- RACTOR_LOCK(cr);
- {
- while (ractor_queue_empty_p(cr, rq) && !cr->sync.incoming_port_closed) {
- ractor_sleep(ec, cr, cur_th, wait_receiving);
- }
- }
- RACTOR_UNLOCK(cr);
-}
-
-static VALUE
-ractor_receive(rb_execution_context_t *ec, rb_ractor_t *cr)
-{
- VM_ASSERT(cr == rb_ec_ractor_ptr(ec));
- VALUE v;
- struct rb_ractor_queue *rq = &cr->sync.recv_queue;
-
- while (UNDEF_P(v = ractor_try_receive(ec, cr, rq))) {
- ractor_wait_receive(ec, cr, rq);
- }
-
- return v;
-}
-
-#if 0
-static void
-rq_dump(struct rb_ractor_queue *rq)
-{
- bool bug = false;
- for (int i=0; i<rq->cnt; i++) {
- struct rb_ractor_basket *b = ractor_queue_at(NULL, rq, i);
- fprintf(stderr, "%d (start:%d) type:%s %p %s\n", i, rq->start, basket_type_name(b->type),
- (void *)b, RSTRING_PTR(RARRAY_AREF(b->v, 1)));
- if (basket_type_p(b, basket_type_reserved) bug = true;
- }
- if (bug) rb_bug("!!");
-}
-#endif
-
-struct receive_block_data {
- rb_ractor_t *cr;
- rb_thread_t *th;
- struct rb_ractor_queue *rq;
- VALUE v;
- int index;
- bool success;
-};
-
-static void
-ractor_receive_if_lock(rb_thread_t *th)
-{
- VALUE m = th->ractor_waiting.receiving_mutex;
- if (m == Qfalse) {
- m = th->ractor_waiting.receiving_mutex = rb_mutex_new();
- }
- rb_mutex_lock(m);
-}
-
-static VALUE
-receive_if_body(VALUE ptr)
-{
- struct receive_block_data *data = (struct receive_block_data *)ptr;
-
- ractor_receive_if_lock(data->th);
- VALUE block_result = rb_yield(data->v);
- rb_ractor_t *cr = data->cr;
-
- RACTOR_LOCK_SELF(cr);
- {
- struct rb_ractor_basket *b = ractor_queue_at(cr, data->rq, data->index);
- VM_ASSERT(basket_type_p(b, basket_type_reserved));
- data->rq->reserved_cnt--;
-
- if (RTEST(block_result)) {
- ractor_queue_delete(cr, data->rq, b);
- ractor_queue_compact(cr, data->rq);
- }
- else {
- b->type.e = basket_type_ref;
- }
- }
- RACTOR_UNLOCK_SELF(cr);
-
- data->success = true;
-
- if (RTEST(block_result)) {
- return data->v;
- }
- else {
- return Qundef;
- }
-}
-
-static VALUE
-receive_if_ensure(VALUE v)
-{
- struct receive_block_data *data = (struct receive_block_data *)v;
- rb_ractor_t *cr = data->cr;
- rb_thread_t *cur_th = data->th;
-
- if (!data->success) {
- RACTOR_LOCK_SELF(cr);
- {
- struct rb_ractor_basket *b = ractor_queue_at(cr, data->rq, data->index);
- VM_ASSERT(basket_type_p(b, basket_type_reserved));
- b->type.e = basket_type_deleted;
- data->rq->reserved_cnt--;
- }
- RACTOR_UNLOCK_SELF(cr);
- }
-
- rb_mutex_unlock(cur_th->ractor_waiting.receiving_mutex);
- return Qnil;
-}
-
-static VALUE
-ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b)
-{
- if (!RTEST(b)) rb_raise(rb_eArgError, "no block given");
-
- rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
- rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
- unsigned int serial = (unsigned int)-1;
- int index = 0;
- struct rb_ractor_queue *rq = &cr->sync.recv_queue;
-
- while (1) {
- VALUE v = Qundef;
-
- ractor_wait_receive(ec, cr, rq);
-
- RACTOR_LOCK_SELF(cr);
- {
- if (serial != rq->serial) {
- serial = rq->serial;
- index = 0;
- }
-
- // check newer version
- for (int i=index; i<rq->cnt; i++) {
- if (!ractor_queue_skip_p(cr, rq, i)) {
- struct rb_ractor_basket *b = ractor_queue_at(cr, rq, i);
- v = ractor_basket_value(b);
- b->type.e = basket_type_reserved;
- rq->reserved_cnt++;
- index = i;
- break;
- }
- }
- }
- RACTOR_UNLOCK_SELF(cr);
-
- if (!UNDEF_P(v)) {
- struct receive_block_data data = {
- .cr = cr,
- .th = cur_th,
- .rq = rq,
- .v = v,
- .index = index,
- .success = false,
- };
-
- VALUE result = rb_ensure(receive_if_body, (VALUE)&data,
- receive_if_ensure, (VALUE)&data);
-
- if (!UNDEF_P(result)) return result;
- index++;
- }
-
- RUBY_VM_CHECK_INTS(ec);
- }
-}
-
-static void
-ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_basket *b)
-{
- bool closed = false;
-
- RACTOR_LOCK(r);
- {
- if (r->sync.incoming_port_closed) {
- closed = true;
- }
- else {
- ractor_queue_enq(r, &r->sync.recv_queue, b);
- // wakeup any receiving thread in `r`
- ractor_wakeup(r, NULL, wait_receiving, wakeup_by_send);
- }
- }
- RACTOR_UNLOCK(r);
-
- if (closed) {
- rb_raise(rb_eRactorClosedError, "The incoming-port is already closed");
- }
-}
-
-// Ractor#send
-
-static VALUE ractor_move(VALUE obj); // in this file
-static VALUE ractor_copy(VALUE obj); // in this file
-
-static void
-ractor_basket_prepare_contents(VALUE obj, VALUE move, volatile VALUE *pobj, enum rb_ractor_basket_type *ptype)
-{
- VALUE v;
- enum rb_ractor_basket_type type;
-
- if (rb_ractor_shareable_p(obj)) {
- type = basket_type_ref;
- v = obj;
- }
- else if (!RTEST(move)) {
- v = ractor_copy(obj);
- type = basket_type_copy;
- }
- else {
- type = basket_type_move;
- v = ractor_move(obj);
- }
-
- *pobj = v;
- *ptype = type;
-}
-
-static void
-ractor_basket_fill_(rb_ractor_t *cr, rb_thread_t *cur_th, struct rb_ractor_basket *basket, VALUE obj, bool exc)
-{
- VM_ASSERT(cr == GET_RACTOR());
-
- basket->sender = cr->pub.self;
- basket->sending_th = cur_th;
- basket->p.send.exception = exc;
- basket->p.send.v = obj;
-}
-
-static void
-ractor_basket_fill(rb_ractor_t *cr, rb_thread_t *cur_th, struct rb_ractor_basket *basket, VALUE obj, VALUE move, bool exc)
-{
- VALUE v;
- enum rb_ractor_basket_type type;
- ractor_basket_prepare_contents(obj, move, &v, &type);
- ractor_basket_fill_(cr, cur_th, basket, v, exc);
- basket->type.e = type;
-}
-
-static void
-ractor_basket_fill_will(rb_ractor_t *cr, rb_thread_t *cur_th, struct rb_ractor_basket *basket, VALUE obj, bool exc)
-{
- ractor_basket_fill_(cr, cur_th, basket, obj, exc);
- basket->type.e = basket_type_will;
-}
-
-static VALUE
-ractor_send(rb_execution_context_t *ec, rb_ractor_t *recv_r, VALUE obj, VALUE move)
-{
- struct rb_ractor_basket basket;
- rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
- rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
- // TODO: Ractor local GC
- ractor_basket_fill(cr, cur_th, &basket, obj, move, false);
- ractor_send_basket(ec, recv_r, &basket);
- return recv_r->pub.self;
-}
-
-// Ractor#take
-
-static bool
-ractor_take_has_will(rb_ractor_t *r)
-{
- ASSERT_ractor_locking(r);
-
- return basket_type_p(&r->sync.will_basket, basket_type_will);
-}
-
-static bool
-ractor_take_will(rb_ractor_t *r, struct rb_ractor_basket *b)
-{
- ASSERT_ractor_locking(r);
-
- if (ractor_take_has_will(r)) {
- *b = r->sync.will_basket;
- r->sync.will_basket.type.e = basket_type_none;
- return true;
- }
- else {
- VM_ASSERT(basket_type_p(&r->sync.will_basket, basket_type_none));
- return false;
- }
-}
-
-static bool
-ractor_take_will_lock(rb_ractor_t *r, struct rb_ractor_basket *b)
-{
- ASSERT_ractor_unlocking(r);
- bool taken;
-
- RACTOR_LOCK(r);
- {
- taken = ractor_take_will(r, b);
- }
- RACTOR_UNLOCK(r);
-
- return taken;
-}
-
-static bool
-ractor_register_take(rb_ractor_t *cr, rb_thread_t *cur_th, rb_ractor_t *r, struct rb_ractor_basket *take_basket,
- bool is_take, struct rb_ractor_selector_take_config *config, bool ignore_error)
-{
- struct rb_ractor_basket b = {
- .type.e = basket_type_take_basket,
- .sender = cr->pub.self,
- .sending_th = cur_th,
- .p = {
- .take = {
- .basket = take_basket, // pointer to our stack value saved in ractor `r` queue
- .config = config,
- },
- },
- };
- bool closed = false;
-
- RACTOR_LOCK(r);
- {
- if (is_take && ractor_take_will(r, take_basket)) {
- RUBY_DEBUG_LOG("take over a will of r:%d", rb_ractor_id(r));
- }
- else if (!is_take && ractor_take_has_will(r)) {
- RUBY_DEBUG_LOG("has_will");
- VM_ASSERT(config != NULL);
- config->closed = true;
- }
- else if (r->sync.outgoing_port_closed) {
- closed = true;
- }
- else {
- RUBY_DEBUG_LOG("register in r:%d", rb_ractor_id(r));
- ractor_queue_enq(r, &r->sync.takers_queue, &b);
-
- if (basket_none_p(take_basket)) {
- // wakeup any thread in `r` that has yielded, if there is any.
- ractor_wakeup(r, NULL, wait_yielding, wakeup_by_take);
- }
- }
- }
- RACTOR_UNLOCK(r);
-
- if (closed) {
- if (!ignore_error) rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
- return false;
- }
- else {
- return true;
- }
-}
-
-static bool
-ractor_deregister_take(rb_ractor_t *r, struct rb_ractor_basket *take_basket)
-{
- struct rb_ractor_queue *ts = &r->sync.takers_queue;
- bool deleted = false;
-
- RACTOR_LOCK(r);
- {
- if (r->sync.outgoing_port_closed) {
- // ok
- }
- else {
- for (int i=0; i<ts->cnt; i++) {
- struct rb_ractor_basket *b = ractor_queue_at(r, ts, i);
- if (basket_type_p(b, basket_type_take_basket) && b->p.take.basket == take_basket) {
- ractor_queue_delete(r, ts, b);
- deleted = true;
- }
- }
- if (deleted) {
- ractor_queue_compact(r, ts);
- }
- }
- }
- RACTOR_UNLOCK(r);
-
- return deleted;
-}
-
-static VALUE
-ractor_try_take(rb_ractor_t *cr, rb_thread_t *cur_th, rb_ractor_t *recv_r, struct rb_ractor_basket *take_basket)
-{
- bool taken;
-
- RACTOR_LOCK_SELF(cr);
- {
- // If it hasn't yielded yet or is currently in the process of yielding, sleep more
- if (basket_none_p(take_basket) || basket_type_p(take_basket, basket_type_yielding)) {
- taken = false;
- }
- else {
- taken = true; // basket type might be, for ex, basket_type_copy if value was copied during yield
- }
- }
- RACTOR_UNLOCK_SELF(cr);
-
- if (taken) {
- RUBY_DEBUG_LOG("taken");
- if (basket_type_p(take_basket, basket_type_deleted)) {
- VM_ASSERT(recv_r->sync.outgoing_port_closed);
- rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
- }
- return ractor_basket_accept(take_basket);
- }
- else {
- RUBY_DEBUG_LOG("not taken");
- return Qundef;
- }
-}
-
-
-#if VM_CHECK_MODE > 0
-static bool
-ractor_check_specific_take_basket_lock(rb_ractor_t *r, struct rb_ractor_basket *tb)
-{
- bool ret = false;
- struct rb_ractor_queue *ts = &r->sync.takers_queue;
-
- RACTOR_LOCK(r);
- {
- for (int i=0; i<ts->cnt; i++) {
- struct rb_ractor_basket *b = ractor_queue_at(r, ts, i);
- if (basket_type_p(b, basket_type_take_basket) && b->p.take.basket == tb) {
- ret = true;
- break;
- }
- }
- }
- RACTOR_UNLOCK(r);
-
- return ret;
-}
-#endif
-
-// cleanup function, cr is unlocked
-static void
-ractor_take_cleanup(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *tb)
-{
- retry:
- if (basket_none_p(tb)) { // not yielded yet
- if (!ractor_deregister_take(r, tb)) {
- // not in r's takers queue
- rb_thread_sleep(0);
- goto retry;
- }
- }
- else {
- VM_ASSERT(!ractor_check_specific_take_basket_lock(r, tb));
- }
-}
-
-struct take_wait_take_cleanup_data {
- rb_ractor_t *r;
- struct rb_ractor_basket *tb;
-};
-
-static void
-ractor_wait_take_cleanup(rb_ractor_t *cr, void *ptr)
-{
- struct take_wait_take_cleanup_data *data = (struct take_wait_take_cleanup_data *)ptr;
- ractor_take_cleanup(cr, data->r, data->tb);
-}
-
-static void
-ractor_wait_take(rb_execution_context_t *ec, rb_ractor_t *cr, rb_thread_t *cur_th, rb_ractor_t *r, struct rb_ractor_basket *take_basket)
-{
- struct take_wait_take_cleanup_data data = {
- .r = r,
- .tb = take_basket,
- };
-
- RACTOR_LOCK_SELF(cr);
- {
- if (basket_none_p(take_basket) || basket_type_p(take_basket, basket_type_yielding)) {
- ractor_sleep_with_cleanup(ec, cr, cur_th, wait_taking, ractor_wait_take_cleanup, &data);
- }
- }
- RACTOR_UNLOCK_SELF(cr);
-}
-
-static VALUE
-ractor_take(rb_execution_context_t *ec, rb_ractor_t *recv_r)
-{
- RUBY_DEBUG_LOG("from r:%u", rb_ractor_id(recv_r));
- VALUE v;
- rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
- rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
-
- struct rb_ractor_basket take_basket = {
- .type.e = basket_type_none,
- .sender = 0,
- };
-
- ractor_register_take(cr, cur_th, recv_r, &take_basket, true, NULL, false);
-
- while (UNDEF_P(v = ractor_try_take(cr, cur_th, recv_r, &take_basket))) {
- ractor_wait_take(ec, cr, cur_th, recv_r, &take_basket);
- }
-
- VM_ASSERT(!basket_none_p(&take_basket)); // might be, for ex, basket_type_copy
- VM_ASSERT(!ractor_check_specific_take_basket_lock(recv_r, &take_basket));
-
- return v;
-}
-
-// Ractor.yield
-
-static bool
-ractor_check_take_basket(rb_ractor_t *cr, struct rb_ractor_queue *rs)
-{
- ASSERT_ractor_locking(cr);
-
- for (int i=0; i<rs->cnt; i++) {
- struct rb_ractor_basket *b = ractor_queue_at(cr, rs, i);
- if (basket_type_p(b, basket_type_take_basket) &&
- basket_none_p(b->p.take.basket)) {
- return true;
- }
- }
-
- return false;
-}
-
-// Find another ractor that is taking from this ractor, so we can yield to it
-static bool
-ractor_deq_take_basket(rb_ractor_t *cr, struct rb_ractor_queue *rs, struct rb_ractor_basket *b)
-{
- ASSERT_ractor_unlocking(cr);
- struct rb_ractor_basket *first_tb = NULL;
- bool found = false;
-
- RACTOR_LOCK_SELF(cr);
- {
- while (ractor_queue_deq(cr, rs, b)) {
- if (basket_type_p(b, basket_type_take_basket)) { // some other ractor is taking
- struct rb_ractor_basket *tb = b->p.take.basket;
-
- if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_yielding) == basket_type_none) {
- found = true; // payload basket is now "yielding" type
- break;
- }
- else {
- ractor_queue_enq(cr, rs, b);
- if (first_tb == NULL) first_tb = tb;
- struct rb_ractor_basket *head = ractor_queue_head(cr, rs);
- VM_ASSERT(head != NULL);
- if (basket_type_p(head, basket_type_take_basket) && head->p.take.basket == first_tb) {
- break; // loop detected
- }
- }
- }
- else {
- VM_ASSERT(basket_none_p(b));
- }
- }
-
- if (found && b->p.take.config && !b->p.take.config->oneshot) {
- ractor_queue_enq(cr, rs, b);
- }
- }
- RACTOR_UNLOCK_SELF(cr);
-
- return found;
-}
-
-// Try yielding to a taking ractor
-static bool
-ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *ts, volatile VALUE obj, VALUE move, bool exc, bool is_will)
-{
- // Don't lock yielding ractor at same time as taking ractor. This could deadlock due to timing
- // issue because we don't have a lock hierarchy.
- ASSERT_ractor_unlocking(cr);
- rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
-
- struct rb_ractor_basket b;
-
- if (ractor_deq_take_basket(cr, ts, &b)) { // deq a take basket from takers queue of `cr` into `b`
- VM_ASSERT(basket_type_p(&b, basket_type_take_basket));
- VM_ASSERT(basket_type_p(b.p.take.basket, basket_type_yielding));
-
- rb_ractor_t *tr = RACTOR_PTR(b.sender); // taking ractor
- rb_thread_t *tr_th = b.sending_th; // taking thread
- struct rb_ractor_basket *tb = b.p.take.basket; // payload basket
- enum rb_ractor_basket_type type;
-
- RUBY_DEBUG_LOG("basket from r:%u", rb_ractor_id(tr));
-
- if (is_will) {
- type = basket_type_will; // last message
- }
- else {
- enum ruby_tag_type state;
-
- // begin
- EC_PUSH_TAG(ec);
- if ((state = EC_EXEC_TAG()) == TAG_NONE) {
- // TODO: Ractor local GC
- ractor_basket_prepare_contents(obj, move, &obj, &type);
- }
- EC_POP_TAG();
- // rescue ractor copy/move error, then re-raise
- if (state) {
- RACTOR_LOCK_SELF(cr);
- {
- b.p.take.basket->type.e = basket_type_none;
- ractor_queue_enq(cr, ts, &b);
- }
- RACTOR_UNLOCK_SELF(cr);
- EC_JUMP_TAG(ec, state);
- }
- }
-
- RACTOR_LOCK(tr);
- {
- VM_ASSERT(basket_type_p(tb, basket_type_yielding));
- // fill atomic
- RUBY_DEBUG_LOG("fill %sbasket from r:%u", is_will ? "will " : "", rb_ractor_id(tr));
- ractor_basket_fill_(cr, cur_th, tb, obj, exc); // fill the take basket payload
- if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_yielding, type) != basket_type_yielding) {
- rb_bug("unreachable");
- }
- ractor_wakeup(tr, tr_th, wait_taking, wakeup_by_yield);
- }
- RACTOR_UNLOCK(tr);
-
- return true;
- }
- else if (cr->sync.outgoing_port_closed) {
- rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
- }
- else {
- RUBY_DEBUG_LOG("no take basket");
- return false;
- }
-}
-
-static void
-ractor_wait_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *ts)
-{
- rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
- RACTOR_LOCK_SELF(cr);
- {
- while (!ractor_check_take_basket(cr, ts) && !cr->sync.outgoing_port_closed) {
- ractor_sleep(ec, cr, cur_th, wait_yielding);
- }
- }
- RACTOR_UNLOCK_SELF(cr);
-}
-
-// In order to yield, we wait until our takers queue has at least one element. Then, we wakeup a taker.
-static VALUE
-ractor_yield(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE obj, VALUE move)
-{
- struct rb_ractor_queue *ts = &cr->sync.takers_queue;
-
- while (!ractor_try_yield(ec, cr, ts, obj, move, false, false)) {
- ractor_wait_yield(ec, cr, ts);
- }
-
- return Qnil;
-}
-
-// Ractor::Selector
-
-struct rb_ractor_selector {
- rb_ractor_t *r;
- struct rb_ractor_basket take_basket;
- st_table *take_ractors; // rb_ractor_t * => (struct rb_ractor_selector_take_config *)
-};
-
-static int
-ractor_selector_mark_ractors_i(st_data_t key, st_data_t value, st_data_t data)
-{
- const rb_ractor_t *r = (rb_ractor_t *)key;
- rb_gc_mark(r->pub.self);
- return ST_CONTINUE;
-}
-
-static void
-ractor_selector_mark(void *ptr)
-{
- struct rb_ractor_selector *s = ptr;
-
- if (s->take_ractors) {
- st_foreach(s->take_ractors, ractor_selector_mark_ractors_i, 0);
- }
-
- switch (s->take_basket.type.e) {
- case basket_type_ref:
- case basket_type_copy:
- case basket_type_move:
- case basket_type_will:
- rb_gc_mark(s->take_basket.sender);
- rb_gc_mark(s->take_basket.p.send.v);
- break;
- default:
- break;
- }
-}
-
-static int
-ractor_selector_release_i(st_data_t key, st_data_t val, st_data_t data)
-{
- struct rb_ractor_selector *s = (struct rb_ractor_selector *)data;
- struct rb_ractor_selector_take_config *config = (struct rb_ractor_selector_take_config *)val;
-
- if (!config->closed) {
- ractor_deregister_take((rb_ractor_t *)key, &s->take_basket);
- }
- free(config);
- return ST_CONTINUE;
-}
-
-static void
-ractor_selector_free(void *ptr)
-{
- struct rb_ractor_selector *s = ptr;
- st_foreach(s->take_ractors, ractor_selector_release_i, (st_data_t)s);
- st_free_table(s->take_ractors);
- ruby_xfree(ptr);
-}
-
-static size_t
-ractor_selector_memsize(const void *ptr)
-{
- const struct rb_ractor_selector *s = ptr;
- return sizeof(struct rb_ractor_selector) +
- st_memsize(s->take_ractors) +
- s->take_ractors->num_entries * sizeof(struct rb_ractor_selector_take_config);
-}
-
-static const rb_data_type_t ractor_selector_data_type = {
- "ractor/selector",
- {
- ractor_selector_mark,
- ractor_selector_free,
- ractor_selector_memsize,
- NULL, // update
- },
- 0, 0, RUBY_TYPED_FREE_IMMEDIATELY,
-};
-
-static struct rb_ractor_selector *
-RACTOR_SELECTOR_PTR(VALUE selv)
-{
- VM_ASSERT(rb_typeddata_is_kind_of(selv, &ractor_selector_data_type));
-
- return (struct rb_ractor_selector *)DATA_PTR(selv);
-}
-
-// Ractor::Selector.new
-
-static VALUE
-ractor_selector_create(VALUE klass)
-{
- struct rb_ractor_selector *s;
- VALUE selv = TypedData_Make_Struct(klass, struct rb_ractor_selector, &ractor_selector_data_type, s);
- s->take_basket.type.e = basket_type_reserved;
- s->take_ractors = st_init_numtable(); // ractor (ptr) -> take_config
- return selv;
-}
-
-// Ractor::Selector#add(r)
-
-/*
- * call-seq:
- * add(ractor) -> ractor
- *
- * Adds _ractor_ to +self+. Raises an exception if _ractor_ is already added.
- * Returns _ractor_.
- */
-static VALUE
-ractor_selector_add(VALUE selv, VALUE rv)
-{
- if (!rb_ractor_p(rv)) {
- rb_raise(rb_eArgError, "Not a ractor object");
- }
-
- rb_ractor_t *r = RACTOR_PTR(rv);
- struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
-
- if (st_lookup(s->take_ractors, (st_data_t)r, NULL)) {
- rb_raise(rb_eArgError, "already added");
- }
-
- struct rb_ractor_selector_take_config *config = malloc(sizeof(struct rb_ractor_selector_take_config));
- VM_ASSERT(config != NULL);
- config->closed = false;
- config->oneshot = false;
-
- if (ractor_register_take(GET_RACTOR(), GET_THREAD(), r, &s->take_basket, false, config, true)) {
- st_insert(s->take_ractors, (st_data_t)r, (st_data_t)config);
- }
-
- return rv;
-}
-
-// Ractor::Selector#remove(r)
-
-/* call-seq:
- * remove(ractor) -> ractor
- *
- * Removes _ractor_ from +self+. Raises an exception if _ractor_ is not added.
- * Returns the removed _ractor_.
- */
-static VALUE
-ractor_selector_remove(VALUE selv, VALUE rv)
-{
- if (!rb_ractor_p(rv)) {
- rb_raise(rb_eArgError, "Not a ractor object");
- }
-
- rb_ractor_t *r = RACTOR_PTR(rv);
- struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
-
- RUBY_DEBUG_LOG("r:%u", rb_ractor_id(r));
-
- if (!st_lookup(s->take_ractors, (st_data_t)r, NULL)) {
- rb_raise(rb_eArgError, "not added yet");
- }
-
- ractor_deregister_take(r, &s->take_basket);
- struct rb_ractor_selector_take_config *config;
- st_delete(s->take_ractors, (st_data_t *)&r, (st_data_t *)&config);
- free(config);
-
- return rv;
-}
-
-// Ractor::Selector#clear
-
-struct ractor_selector_clear_data {
- VALUE selv;
- rb_execution_context_t *ec;
-};
-
-static int
-ractor_selector_clear_i(st_data_t key, st_data_t val, st_data_t data)
-{
- VALUE selv = (VALUE)data;
- rb_ractor_t *r = (rb_ractor_t *)key;
- ractor_selector_remove(selv, r->pub.self);
- return ST_CONTINUE;
-}
-
-/*
- * call-seq:
- * clear -> self
- *
- * Removes all ractors from +self+. Raises +self+.
- */
-static VALUE
-ractor_selector_clear(VALUE selv)
-{
- struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
-
- st_foreach(s->take_ractors, ractor_selector_clear_i, (st_data_t)selv);
- st_clear(s->take_ractors);
- return selv;
-}
-
-/*
- * call-seq:
- * empty? -> true or false
- *
- * Returns +true+ if no ractor is added.
- */
-static VALUE
-ractor_selector_empty_p(VALUE selv)
-{
- struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
- return s->take_ractors->num_entries == 0 ? Qtrue : Qfalse;
-}
-
-static int
-ractor_selector_wait_i(st_data_t key, st_data_t val, st_data_t dat)
-{
- rb_ractor_t *r = (rb_ractor_t *)key;
- struct rb_ractor_basket *tb = (struct rb_ractor_basket *)dat;
- int ret;
-
- if (!basket_none_p(tb)) {
- RUBY_DEBUG_LOG("already taken:%s", basket_type_name(tb->type.e));
- return ST_STOP;
- }
-
- RACTOR_LOCK(r);
- {
- if (basket_type_p(&r->sync.will_basket, basket_type_will)) {
- RUBY_DEBUG_LOG("r:%u has will", rb_ractor_id(r));
-
- if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_will) == basket_type_none) {
- ractor_take_will(r, tb);
- ret = ST_STOP;
- }
- else {
- RUBY_DEBUG_LOG("has will, but already taken (%s)", basket_type_name(tb->type.e));
- ret = ST_CONTINUE;
- }
- }
- else if (r->sync.outgoing_port_closed) {
- RUBY_DEBUG_LOG("r:%u is closed", rb_ractor_id(r));
-
- if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_deleted) == basket_type_none) {
- tb->sender = r->pub.self;
- ret = ST_STOP;
- }
- else {
- RUBY_DEBUG_LOG("closed, but already taken (%s)", basket_type_name(tb->type.e));
- ret = ST_CONTINUE;
- }
- }
- else {
- RUBY_DEBUG_LOG("wakeup r:%u", rb_ractor_id(r));
- ractor_wakeup(r, NULL, wait_yielding, wakeup_by_take);
- ret = ST_CONTINUE;
- }
- }
- RACTOR_UNLOCK(r);
-
- return ret;
-}
-
-// Ractor::Selector#wait
-
-// cleanup function, cr is unlocked
-static void
-ractor_selector_wait_cleanup(rb_ractor_t *cr, void *ptr)
-{
- struct rb_ractor_basket *tb = (struct rb_ractor_basket *)ptr;
-
- RACTOR_LOCK_SELF(cr);
- {
- while (basket_type_p(tb, basket_type_yielding)) {
- RACTOR_UNLOCK_SELF(cr);
- {
- rb_thread_sleep(0);
- }
- RACTOR_LOCK_SELF(cr);
- }
- // if tb->type is not none, taking is succeeded, but interruption ignore it unfortunately.
- tb->type.e = basket_type_reserved;
- }
- RACTOR_UNLOCK_SELF(cr);
-}
-
-/* :nodoc: */
-static VALUE
-ractor_selector__wait(VALUE selv, VALUE do_receivev, VALUE do_yieldv, VALUE yield_value, VALUE move)
-{
- rb_execution_context_t *ec = GET_EC();
- struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
- struct rb_ractor_basket *tb = &s->take_basket;
- struct rb_ractor_basket taken_basket;
- rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
- rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
- bool do_receive = !!RTEST(do_receivev);
- bool do_yield = !!RTEST(do_yieldv);
- VALUE ret_v, ret_r;
- enum rb_ractor_wait_status wait_status;
- struct rb_ractor_queue *rq = &cr->sync.recv_queue;
- struct rb_ractor_queue *ts = &cr->sync.takers_queue;
-
- RUBY_DEBUG_LOG("start");
-
- retry:
- RUBY_DEBUG_LOG("takers:%ld", s->take_ractors->num_entries);
-
- // setup wait_status
- wait_status = wait_none;
- if (s->take_ractors->num_entries > 0) wait_status |= wait_taking;
- if (do_receive) wait_status |= wait_receiving;
- if (do_yield) wait_status |= wait_yielding;
-
- RUBY_DEBUG_LOG("wait:%s", wait_status_str(wait_status));
-
- if (wait_status == wait_none) {
- rb_raise(rb_eRactorError, "no taking ractors");
- }
-
- // check recv_queue
- if (do_receive && !UNDEF_P(ret_v = ractor_try_receive(ec, cr, rq))) {
- ret_r = ID2SYM(rb_intern("receive"));
- goto success;
- }
-
- // check takers
- if (do_yield && ractor_try_yield(ec, cr, ts, yield_value, move, false, false)) {
- ret_v = Qnil;
- ret_r = ID2SYM(rb_intern("yield"));
- goto success;
- }
-
- // check take_basket
- VM_ASSERT(basket_type_p(&s->take_basket, basket_type_reserved));
- s->take_basket.type.e = basket_type_none;
- // kick all take target ractors
- st_foreach(s->take_ractors, ractor_selector_wait_i, (st_data_t)tb);
-
- RACTOR_LOCK_SELF(cr);
- {
- retry_waiting:
- while (1) {
- if (!basket_none_p(tb)) {
- RUBY_DEBUG_LOG("taken:%s from r:%u", basket_type_name(tb->type.e),
- tb->sender ? rb_ractor_id(RACTOR_PTR(tb->sender)) : 0);
- break;
- }
- if (do_receive && !ractor_queue_empty_p(cr, rq)) {
- RUBY_DEBUG_LOG("can receive (%d)", rq->cnt);
- break;
- }
- if (do_yield && ractor_check_take_basket(cr, ts)) {
- RUBY_DEBUG_LOG("can yield");
- break;
- }
-
- ractor_sleep_with_cleanup(ec, cr, cur_th, wait_status, ractor_selector_wait_cleanup, tb);
- }
-
- taken_basket = *tb;
-
- // ensure
- // tb->type.e = basket_type_reserved # do it atomic in the following code
- if (taken_basket.type.e == basket_type_yielding ||
- RUBY_ATOMIC_CAS(tb->type.atomic, taken_basket.type.e, basket_type_reserved) != taken_basket.type.e) {
-
- if (basket_type_p(tb, basket_type_yielding)) {
- RACTOR_UNLOCK_SELF(cr);
- {
- rb_thread_sleep(0);
- }
- RACTOR_LOCK_SELF(cr);
- }
- goto retry_waiting;
- }
- }
- RACTOR_UNLOCK_SELF(cr);
-
- // check the taken result
- switch (taken_basket.type.e) {
- case basket_type_none:
- VM_ASSERT(do_receive || do_yield);
- goto retry;
- case basket_type_yielding:
- rb_bug("unreachable");
- case basket_type_deleted: {
- ractor_selector_remove(selv, taken_basket.sender);
-
- rb_ractor_t *r = RACTOR_PTR(taken_basket.sender);
- if (ractor_take_will_lock(r, &taken_basket)) {
- RUBY_DEBUG_LOG("has_will");
- }
- else {
- RUBY_DEBUG_LOG("no will");
- // rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
- // remove and retry wait
- goto retry;
- }
- break;
- }
- case basket_type_will:
- // no more messages
- ractor_selector_remove(selv, taken_basket.sender);
- break;
- default:
- break;
- }
-
- RUBY_DEBUG_LOG("taken_basket:%s", basket_type_name(taken_basket.type.e));
-
- ret_v = ractor_basket_accept(&taken_basket);
- ret_r = taken_basket.sender;
- success:
- return rb_ary_new_from_args(2, ret_r, ret_v);
-}
-
-/*
- * call-seq:
- * wait(receive: false, yield_value: undef, move: false) -> [ractor, value]
- *
- * Waits until any ractor in _selector_ can be active.
- */
-static VALUE
-ractor_selector_wait(int argc, VALUE *argv, VALUE selector)
-{
- VALUE options;
- ID keywords[3];
- VALUE values[3];
-
- keywords[0] = rb_intern("receive");
- keywords[1] = rb_intern("yield_value");
- keywords[2] = rb_intern("move");
-
- rb_scan_args(argc, argv, "0:", &options);
- rb_get_kwargs(options, keywords, 0, numberof(values), values);
- return ractor_selector__wait(selector,
- values[0] == Qundef ? Qfalse : RTEST(values[0]),
- values[1] != Qundef, values[1], values[2]);
-}
-
-static VALUE
-ractor_selector_new(int argc, VALUE *ractors, VALUE klass)
-{
- VALUE selector = ractor_selector_create(klass);
-
- for (int i=0; i<argc; i++) {
- ractor_selector_add(selector, ractors[i]);
- }
-
- return selector;
-}
-
-static VALUE
-ractor_select_internal(rb_execution_context_t *ec, VALUE self, VALUE ractors, VALUE do_receive, VALUE do_yield, VALUE yield_value, VALUE move)
-{
- VALUE selector = ractor_selector_new(RARRAY_LENINT(ractors), (VALUE *)RARRAY_CONST_PTR(ractors), rb_cRactorSelector);
- VALUE result;
- int state;
-
- EC_PUSH_TAG(ec);
- if ((state = EC_EXEC_TAG()) == TAG_NONE) {
- result = ractor_selector__wait(selector, do_receive, do_yield, yield_value, move);
- }
- EC_POP_TAG();
- if (state != TAG_NONE) {
- // ensure
- ractor_selector_clear(selector);
-
- // jump
- EC_JUMP_TAG(ec, state);
- }
-
- RB_GC_GUARD(ractors);
- return result;
-}
-
-// Ractor#close_incoming
-
-static VALUE
-ractor_close_incoming(rb_execution_context_t *ec, rb_ractor_t *r)
-{
- VALUE prev;
- rb_thread_t *r_th = NULL;
- if (r == rb_ec_ractor_ptr(ec)) {
- r_th = rb_ec_thread_ptr(ec);
- }
-
- RACTOR_LOCK(r);
- {
- if (!r->sync.incoming_port_closed) {
- prev = Qfalse;
- r->sync.incoming_port_closed = true;
- if (ractor_wakeup(r, r_th, wait_receiving, wakeup_by_close)) {
- VM_ASSERT(ractor_queue_empty_p(r, &r->sync.recv_queue));
- RUBY_DEBUG_LOG("cancel receiving");
- }
- }
- else {
- prev = Qtrue;
- }
- }
- RACTOR_UNLOCK(r);
- return prev;
-}
-
-// Ractor#close_outgoing
-
-static VALUE
-ractor_close_outgoing(rb_execution_context_t *ec, rb_ractor_t *r)
-{
- VALUE prev;
-
- RACTOR_LOCK(r);
- {
- struct rb_ractor_queue *ts = &r->sync.takers_queue;
- rb_ractor_t *tr;
- struct rb_ractor_basket b;
-
- if (!r->sync.outgoing_port_closed) {
- prev = Qfalse;
- r->sync.outgoing_port_closed = true;
- }
- else {
- VM_ASSERT(ractor_queue_empty_p(r, ts));
- prev = Qtrue;
- }
-
- // wakeup all taking ractors
- while (ractor_queue_deq(r, ts, &b)) {
- if (basket_type_p(&b, basket_type_take_basket)) {
- tr = RACTOR_PTR(b.sender);
- rb_thread_t *tr_th = b.sending_th;
- struct rb_ractor_basket *tb = b.p.take.basket;
-
- if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_yielding) == basket_type_none) {
- b.p.take.basket->sender = r->pub.self;
- if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_yielding, basket_type_deleted) != basket_type_yielding) {
- rb_bug("unreachable");
- }
- RUBY_DEBUG_LOG("set delete for r:%u", rb_ractor_id(RACTOR_PTR(b.sender)));
- }
-
- if (b.p.take.config) {
- b.p.take.config->closed = true;
- }
-
- // TODO: deadlock-able?
- RACTOR_LOCK(tr);
- {
- ractor_wakeup(tr, tr_th, wait_taking, wakeup_by_close);
- }
- RACTOR_UNLOCK(tr);
- }
- }
-
- // raising yielding Ractor
- ractor_wakeup(r, NULL, wait_yielding, wakeup_by_close);
-
- VM_ASSERT(ractor_queue_empty_p(r, ts));
- }
- RACTOR_UNLOCK(r);
- return prev;
-}
+#include "ractor_sync.c"
// creation/termination
@@ -2175,9 +440,7 @@ rb_ractor_terminate_atfork(rb_vm_t *vm, rb_ractor_t *r)
rb_gc_ractor_cache_free(r->newobj_cache);
r->newobj_cache = NULL;
r->status_ = ractor_terminated;
- r->sync.outgoing_port_closed = true;
- r->sync.incoming_port_closed = true;
- r->sync.will_basket.type.e = basket_type_none;
+ ractor_sync_terminate_atfork(vm, r);
}
#endif
@@ -2194,15 +457,7 @@ rb_ractor_living_threads_init(rb_ractor_t *r)
static void
ractor_init(rb_ractor_t *r, VALUE name, VALUE loc)
{
- ractor_queue_setup(&r->sync.recv_queue);
- ractor_queue_setup(&r->sync.takers_queue);
- rb_native_mutex_initialize(&r->sync.lock);
- rb_native_cond_initialize(&r->barrier_wait_cond);
-
-#ifdef RUBY_THREAD_WIN32_H
- rb_native_cond_initialize(&r->barrier_wait_cond);
-#endif
- ccan_list_head_init(&r->sync.wait.waiting_threads);
+ ractor_sync_init(r);
// thread management
rb_thread_sched_init(&r->threads.sched, false);
@@ -2255,69 +510,39 @@ ractor_create(rb_execution_context_t *ec, VALUE self, VALUE loc, VALUE name, VAL
return rv;
}
+#if 0
static VALUE
ractor_create_func(VALUE klass, VALUE loc, VALUE name, VALUE args, rb_block_call_func_t func)
{
VALUE block = rb_proc_new(func, Qnil);
return ractor_create(rb_current_ec_noinline(), klass, loc, name, args, block);
}
+#endif
static void
-ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool exc)
+ractor_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE result, bool exc)
{
- if (cr->sync.outgoing_port_closed) {
- return;
- }
-
- ASSERT_ractor_unlocking(cr);
-
- struct rb_ractor_queue *ts = &cr->sync.takers_queue;
- rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
-
- retry:
- if (ractor_try_yield(ec, cr, ts, v, Qfalse, exc, true)) {
- // OK.
- }
- else {
- bool retry = false;
- RACTOR_LOCK(cr);
- {
- if (!ractor_check_take_basket(cr, ts)) {
- VM_ASSERT(cur_th->ractor_waiting.wait_status == wait_none);
- RUBY_DEBUG_LOG("leave a will");
- ractor_basket_fill_will(cr, cur_th, &cr->sync.will_basket, v, exc);
- }
- else {
- RUBY_DEBUG_LOG("rare timing!");
- retry = true; // another ractor is waiting for the yield.
- }
- }
- RACTOR_UNLOCK(cr);
-
- if (retry) goto retry;
- }
+ ractor_notify_exit(ec, cr, result, exc);
}
void
rb_ractor_atexit(rb_execution_context_t *ec, VALUE result)
{
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
- ractor_yield_atexit(ec, cr, result, false);
+ ractor_atexit(ec, cr, result, false);
}
void
rb_ractor_atexit_exception(rb_execution_context_t *ec)
{
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
- ractor_yield_atexit(ec, cr, ec->errinfo, true);
+ ractor_atexit(ec, cr, ec->errinfo, true);
}
void
rb_ractor_teardown(rb_execution_context_t *ec)
{
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
- ractor_close_incoming(ec, cr);
- ractor_close_outgoing(ec, cr);
// sync with rb_ractor_terminate_interrupt_main_thread()
RB_VM_LOCKING() {
@@ -2330,7 +555,7 @@ void
rb_ractor_receive_parameters(rb_execution_context_t *ec, rb_ractor_t *r, int len, VALUE *ptr)
{
for (int i=0; i<len; i++) {
- ptr[i] = ractor_receive(ec, r);
+ ptr[i] = ractor_receive(ec, ractor_default_port(r));
}
}
@@ -2339,7 +564,7 @@ rb_ractor_send_parameters(rb_execution_context_t *ec, rb_ractor_t *r, VALUE args
{
int len = RARRAY_LENINT(args);
for (int i=0; i<len; i++) {
- ractor_send(ec, r, RARRAY_AREF(args, i), false);
+ ractor_send(ec, ractor_default_port(r), RARRAY_AREF(args, i), false);
}
}
@@ -2642,35 +867,6 @@ ractor_moved_missing(int argc, VALUE *argv, VALUE self)
rb_raise(rb_eRactorMovedError, "can not send any methods to a moved object");
}
-#ifndef USE_RACTOR_SELECTOR
-#define USE_RACTOR_SELECTOR 0
-#endif
-
-RUBY_SYMBOL_EXPORT_BEGIN
-void rb_init_ractor_selector(void);
-RUBY_SYMBOL_EXPORT_END
-
-/*
- * Document-class: Ractor::Selector
- * :nodoc: currently
- *
- * Selects multiple Ractors to be activated.
- */
-void
-rb_init_ractor_selector(void)
-{
- rb_cRactorSelector = rb_define_class_under(rb_cRactor, "Selector", rb_cObject);
- rb_undef_alloc_func(rb_cRactorSelector);
-
- rb_define_singleton_method(rb_cRactorSelector, "new", ractor_selector_new , -1);
- rb_define_method(rb_cRactorSelector, "add", ractor_selector_add, 1);
- rb_define_method(rb_cRactorSelector, "remove", ractor_selector_remove, 1);
- rb_define_method(rb_cRactorSelector, "clear", ractor_selector_clear, 0);
- rb_define_method(rb_cRactorSelector, "empty?", ractor_selector_empty_p, 0);
- rb_define_method(rb_cRactorSelector, "wait", ractor_selector_wait, -1);
- rb_define_method(rb_cRactorSelector, "_wait", ractor_selector__wait, 4);
-}
-
/*
* Document-class: Ractor::ClosedError
*
@@ -2791,11 +987,7 @@ Init_Ractor(void)
rb_define_method(rb_cRactorMovedObject, "instance_eval", ractor_moved_missing, -1);
rb_define_method(rb_cRactorMovedObject, "instance_exec", ractor_moved_missing, -1);
- // internal
-
-#if USE_RACTOR_SELECTOR
- rb_init_ractor_selector();
-#endif
+ Init_RactorPort();
}
void
@@ -4028,91 +2220,10 @@ ractor_local_value_store_if_absent(rb_execution_context_t *ec, VALUE self, VALUE
return rb_mutex_synchronize(cr->local_storage_store_lock, ractor_local_value_store_i, (VALUE)&data);
}
-// Ractor::Channel (emulate with Ractor)
-
-typedef rb_ractor_t rb_ractor_channel_t;
-
-static VALUE
-ractor_channel_func(RB_BLOCK_CALL_FUNC_ARGLIST(y, c))
-{
- rb_execution_context_t *ec = GET_EC();
- rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
-
- while (1) {
- int state;
-
- EC_PUSH_TAG(ec);
- if ((state = EC_EXEC_TAG()) == TAG_NONE) {
- VALUE obj = ractor_receive(ec, cr);
- ractor_yield(ec, cr, obj, Qfalse);
- }
- EC_POP_TAG();
-
- if (state) {
- // ignore the error
- break;
- }
- }
-
- return Qnil;
-}
-
-static VALUE
-rb_ractor_channel_new(void)
-{
-#if 0
- return rb_funcall(rb_const_get(rb_cRactor, rb_intern("Channel")), rb_intern("new"), 0);
-#else
- // class Channel
- // def self.new
- // Ractor.new do # func body
- // while true
- // obj = Ractor.receive
- // Ractor.yield obj
- // end
- // rescue Ractor::ClosedError
- // nil
- // end
- // end
- // end
-
- return ractor_create_func(rb_cRactor, Qnil, rb_str_new2("Ractor/channel"), rb_ary_new(), ractor_channel_func);
-#endif
-}
-
-static VALUE
-rb_ractor_channel_yield(rb_execution_context_t *ec, VALUE vch, VALUE obj)
-{
- VM_ASSERT(ec == rb_current_ec_noinline());
- rb_ractor_channel_t *ch = RACTOR_PTR(vch);
-
- ractor_send(ec, (rb_ractor_t *)ch, obj, Qfalse);
- return Qnil;
-}
-
-static VALUE
-rb_ractor_channel_take(rb_execution_context_t *ec, VALUE vch)
-{
- VM_ASSERT(ec == rb_current_ec_noinline());
- rb_ractor_channel_t *ch = RACTOR_PTR(vch);
-
- return ractor_take(ec, (rb_ractor_t *)ch);
-}
-
-static VALUE
-rb_ractor_channel_close(rb_execution_context_t *ec, VALUE vch)
-{
- VM_ASSERT(ec == rb_current_ec_noinline());
- rb_ractor_channel_t *ch = RACTOR_PTR(vch);
-
- ractor_close_incoming(ec, (rb_ractor_t *)ch);
- return ractor_close_outgoing(ec, (rb_ractor_t *)ch);
-}
-
// Ractor#require
struct cross_ractor_require {
- VALUE ch;
+ VALUE port;
VALUE result;
VALUE exception;
@@ -4179,9 +2290,8 @@ ractor_require_protect(struct cross_ractor_require *crr, VALUE (*func)(VALUE))
rb_rescue2(require_result_copy_body, (VALUE)crr,
require_result_copy_resuce, (VALUE)crr, rb_eException, 0);
- rb_ractor_channel_yield(GET_EC(), crr->ch, Qtrue);
+ ractor_port_send(GET_EC(), crr->port, Qtrue, Qfalse);
return Qnil;
-
}
static VALUE
@@ -4197,7 +2307,7 @@ rb_ractor_require(VALUE feature)
// TODO: make feature shareable
struct cross_ractor_require crr = {
.feature = feature, // TODO: ractor
- .ch = rb_ractor_channel_new(),
+ .port = ractor_port_new(GET_RACTOR()),
.result = Qundef,
.exception = Qundef,
};
@@ -4207,8 +2317,8 @@ rb_ractor_require(VALUE feature)
rb_ractor_interrupt_exec(main_r, ractore_require_func, &crr, 0);
// wait for require done
- rb_ractor_channel_take(ec, crr.ch);
- rb_ractor_channel_close(ec, crr.ch);
+ ractor_port_receive(ec, crr.port);
+ ractor_port_close(ec, crr.port);
if (crr.exception != Qundef) {
ractor_reset_belonging(crr.exception);
@@ -4248,7 +2358,7 @@ rb_ractor_autoload_load(VALUE module, ID name)
struct cross_ractor_require crr = {
.module = module,
.name = name,
- .ch = rb_ractor_channel_new(),
+ .port = ractor_port_new(GET_RACTOR()),
.result = Qundef,
.exception = Qundef,
};
@@ -4258,8 +2368,8 @@ rb_ractor_autoload_load(VALUE module, ID name)
rb_ractor_interrupt_exec(main_r, ractor_autoload_load_func, &crr, 0);
// wait for require done
- rb_ractor_channel_take(ec, crr.ch);
- rb_ractor_channel_close(ec, crr.ch);
+ ractor_port_receive(ec, crr.port);
+ ractor_port_close(ec, crr.port);
if (crr.exception != Qundef) {
rb_exc_raise(crr.exception);