From ef2bb61018cd9ccb5b61a3d91911e04a773da4a7 Mon Sep 17 00:00:00 2001 From: Koichi Sasada Date: Tue, 27 May 2025 03:58:04 +0900 Subject: `Ractor::Port` * Added `Ractor::Port` * `Ractor::Port#receive` (support multi-threads) * `Rcator::Port#close` * `Ractor::Port#closed?` * Added some methods * `Ractor#join` * `Ractor#value` * `Ractor#monitor` * `Ractor#unmonitor` * Removed some methods * `Ractor#take` * `Ractor.yield` * Change the spec * `Racotr.select` You can wait for multiple sequences of messages with `Ractor::Port`. ```ruby ports = 3.times.map{ Ractor::Port.new } ports.map.with_index do |port, ri| Ractor.new port,ri do |port, ri| 3.times{|i| port << "r#{ri}-#{i}"} end end p ports.each{|port| pp 3.times.map{port.receive}} ``` In this example, we use 3 ports, and 3 Ractors send messages to them respectively. We can receive a series of messages from each port. You can use `Ractor#value` to get the last value of a Ractor's block: ```ruby result = Ractor.new do heavy_task() end.value ``` You can wait for the termination of a Ractor with `Ractor#join` like this: ```ruby Ractor.new do some_task() end.join ``` `#value` and `#join` are similar to `Thread#value` and `Thread#join`. To implement `#join`, `Ractor#monitor` (and `Ractor#unmonitor`) is introduced. This commit changes `Ractor.select()` method. It now only accepts ports or Ractors, and returns when a port receives a message or a Ractor terminates. We removes `Ractor.yield` and `Ractor#take` because: * `Ractor::Port` supports most of similar use cases in a simpler manner. * Removing them significantly simplifies the code. We also change the internal thread scheduler code (thread_pthread.c): * During barrier synchronization, we keep the `ractor_sched` lock to avoid deadlocks. This lock is released by `rb_ractor_sched_barrier_end()` which is called at the end of operations that require the barrier. * fix potential deadlock issues by checking interrupts just before setting UBF. https://bugs.ruby-lang.org/issues/21262 --- ractor.c | 2076 +++----------------------------------------------------------- 1 file changed, 93 insertions(+), 1983 deletions(-) (limited to 'ractor.c') diff --git a/ractor.c b/ractor.c index b2446439a3..24a57ebf30 100644 --- a/ractor.c +++ b/ractor.c @@ -178,1853 +178,118 @@ ractor_status_p(rb_ractor_t *r, enum ractor_status status) // Ractor data/mark/free -static struct rb_ractor_basket *ractor_queue_at(rb_ractor_t *r, struct rb_ractor_queue *rq, int i); static void ractor_local_storage_mark(rb_ractor_t *r); static void ractor_local_storage_free(rb_ractor_t *r); -static void -ractor_queue_mark(struct rb_ractor_queue *rq) -{ - for (int i=0; icnt; i++) { - struct rb_ractor_basket *b = ractor_queue_at(NULL, rq, i); - rb_gc_mark(b->sender); - - switch (b->type.e) { - case basket_type_yielding: - case basket_type_take_basket: - case basket_type_deleted: - case basket_type_reserved: - // ignore - break; - default: - rb_gc_mark(b->p.send.v); - } - } -} - -static void -ractor_mark(void *ptr) -{ - rb_ractor_t *r = (rb_ractor_t *)ptr; - - ractor_queue_mark(&r->sync.recv_queue); - ractor_queue_mark(&r->sync.takers_queue); - - rb_gc_mark(r->loc); - rb_gc_mark(r->name); - rb_gc_mark(r->r_stdin); - rb_gc_mark(r->r_stdout); - rb_gc_mark(r->r_stderr); - rb_hook_list_mark(&r->pub.hooks); - - if (r->threads.cnt > 0) { - rb_thread_t *th = 0; - ccan_list_for_each(&r->threads.set, th, lt_node) { - VM_ASSERT(th != NULL); - rb_gc_mark(th->self); - } - } - - ractor_local_storage_mark(r); -} - -static void -ractor_queue_free(struct rb_ractor_queue *rq) -{ - free(rq->baskets); -} - -static void -ractor_free(void *ptr) -{ - rb_ractor_t *r = (rb_ractor_t *)ptr; - RUBY_DEBUG_LOG("free r:%d", rb_ractor_id(r)); - rb_native_mutex_destroy(&r->sync.lock); - ractor_queue_free(&r->sync.recv_queue); - ractor_queue_free(&r->sync.takers_queue); - ractor_local_storage_free(r); - rb_hook_list_free(&r->pub.hooks); - - if (r->newobj_cache) { - RUBY_ASSERT(r == ruby_single_main_ractor); - - rb_gc_ractor_cache_free(r->newobj_cache); - r->newobj_cache = NULL; - } - - ruby_xfree(r); -} - -static size_t -ractor_queue_memsize(const struct rb_ractor_queue *rq) -{ - return sizeof(struct rb_ractor_basket) * rq->size; -} - -static size_t -ractor_memsize(const void *ptr) -{ - rb_ractor_t *r = (rb_ractor_t *)ptr; - - // TODO: more correct? - return sizeof(rb_ractor_t) + - ractor_queue_memsize(&r->sync.recv_queue) + - ractor_queue_memsize(&r->sync.takers_queue); -} - -static const rb_data_type_t ractor_data_type = { - "ractor", - { - ractor_mark, - ractor_free, - ractor_memsize, - NULL, // update - }, - 0, 0, RUBY_TYPED_FREE_IMMEDIATELY /* | RUBY_TYPED_WB_PROTECTED */ -}; - -bool -rb_ractor_p(VALUE gv) -{ - if (rb_typeddata_is_kind_of(gv, &ractor_data_type)) { - return true; - } - else { - return false; - } -} - -static inline rb_ractor_t * -RACTOR_PTR(VALUE self) -{ - VM_ASSERT(rb_ractor_p(self)); - rb_ractor_t *r = DATA_PTR(self); - return r; -} - -static rb_atomic_t ractor_last_id; - -#if RACTOR_CHECK_MODE > 0 -uint32_t -rb_ractor_current_id(void) -{ - if (GET_THREAD()->ractor == NULL) { - return 1; // main ractor - } - else { - return rb_ractor_id(GET_RACTOR()); - } -} -#endif - -// Ractor queue - -static void -ractor_queue_setup(struct rb_ractor_queue *rq) -{ - rq->size = 2; - rq->cnt = 0; - rq->start = 0; - rq->baskets = malloc(sizeof(struct rb_ractor_basket) * rq->size); -} - -static struct rb_ractor_basket * -ractor_queue_head(rb_ractor_t *r, struct rb_ractor_queue *rq) -{ - if (r != NULL) ASSERT_ractor_locking(r); - return &rq->baskets[rq->start]; -} - -static struct rb_ractor_basket * -ractor_queue_at(rb_ractor_t *r, struct rb_ractor_queue *rq, int i) -{ - if (r != NULL) ASSERT_ractor_locking(r); - return &rq->baskets[(rq->start + i) % rq->size]; -} - -static void -ractor_queue_advance(rb_ractor_t *r, struct rb_ractor_queue *rq) -{ - ASSERT_ractor_locking(r); - - if (rq->reserved_cnt == 0) { - rq->cnt--; - rq->start = (rq->start + 1) % rq->size; - rq->serial++; - } - else { - ractor_queue_at(r, rq, 0)->type.e = basket_type_deleted; - } -} - -static bool -ractor_queue_skip_p(rb_ractor_t *r, struct rb_ractor_queue *rq, int i) -{ - struct rb_ractor_basket *b = ractor_queue_at(r, rq, i); - return basket_type_p(b, basket_type_deleted) || - basket_type_p(b, basket_type_reserved); -} - -static void -ractor_queue_compact(rb_ractor_t *r, struct rb_ractor_queue *rq) -{ - ASSERT_ractor_locking(r); - - while (rq->cnt > 0 && basket_type_p(ractor_queue_at(r, rq, 0), basket_type_deleted)) { - ractor_queue_advance(r, rq); - } -} - -static bool -ractor_queue_empty_p(rb_ractor_t *r, struct rb_ractor_queue *rq) -{ - ASSERT_ractor_locking(r); - - if (rq->cnt == 0) { - return true; - } - - ractor_queue_compact(r, rq); - - for (int i=0; icnt; i++) { - if (!ractor_queue_skip_p(r, rq, i)) { - return false; - } - } - - return true; -} - -static bool -ractor_queue_deq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket) -{ - ASSERT_ractor_locking(r); - - for (int i=0; icnt; i++) { - if (!ractor_queue_skip_p(r, rq, i)) { - struct rb_ractor_basket *b = ractor_queue_at(r, rq, i); - *basket = *b; - - // remove from queue - b->type.e = basket_type_deleted; - ractor_queue_compact(r, rq); - return true; - } - } - - return false; -} - -static void -ractor_queue_enq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket) -{ - ASSERT_ractor_locking(r); - - if (rq->size <= rq->cnt) { - rq->baskets = realloc(rq->baskets, sizeof(struct rb_ractor_basket) * rq->size * 2); - for (int i=rq->size - rq->start; icnt; i++) { - rq->baskets[i + rq->start] = rq->baskets[i + rq->start - rq->size]; - } - rq->size *= 2; - } - // copy basket into queue - rq->baskets[(rq->start + rq->cnt++) % rq->size] = *basket; - // fprintf(stderr, "%s %p->cnt:%d\n", RUBY_FUNCTION_NAME_STRING, (void *)rq, rq->cnt); -} - -static void -ractor_queue_delete(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket) -{ - basket->type.e = basket_type_deleted; -} - -// Ractor basket - -static VALUE ractor_reset_belonging(VALUE obj); // in this file - -static VALUE -ractor_basket_value(struct rb_ractor_basket *b) -{ - switch (b->type.e) { - case basket_type_ref: - break; - case basket_type_copy: - case basket_type_move: - case basket_type_will: - b->type.e = basket_type_ref; - b->p.send.v = ractor_reset_belonging(b->p.send.v); - break; - default: - rb_bug("unreachable"); - } - - return b->p.send.v; -} - -static VALUE -ractor_basket_accept(struct rb_ractor_basket *b) -{ - VALUE v = ractor_basket_value(b); - - // a ractor's main thread had an error and yielded us this exception during its dying moments - if (b->p.send.exception) { - VALUE cause = v; - VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor."); - rb_ivar_set(err, rb_intern("@ractor"), b->sender); - rb_ec_setup_exception(NULL, err, cause); - rb_exc_raise(err); - } - - return v; -} - -// Ractor synchronizations - -#if USE_RUBY_DEBUG_LOG -static const char * -wait_status_str(enum rb_ractor_wait_status wait_status) -{ - switch ((int)wait_status) { - case wait_none: return "none"; - case wait_receiving: return "receiving"; - case wait_taking: return "taking"; - case wait_yielding: return "yielding"; - case wait_receiving|wait_taking: return "receiving|taking"; - case wait_receiving|wait_yielding: return "receiving|yielding"; - case wait_taking|wait_yielding: return "taking|yielding"; - case wait_receiving|wait_taking|wait_yielding: return "receiving|taking|yielding"; - } - rb_bug("unreachable"); -} - -static const char * -wakeup_status_str(enum rb_ractor_wakeup_status wakeup_status) -{ - switch (wakeup_status) { - case wakeup_none: return "none"; - case wakeup_by_send: return "by_send"; - case wakeup_by_yield: return "by_yield"; - case wakeup_by_take: return "by_take"; - case wakeup_by_close: return "by_close"; - case wakeup_by_interrupt: return "by_interrupt"; - case wakeup_by_retry: return "by_retry"; - } - rb_bug("unreachable"); -} - -static const char * -basket_type_name(enum rb_ractor_basket_type type) -{ - switch (type) { - case basket_type_none: return "none"; - case basket_type_ref: return "ref"; - case basket_type_copy: return "copy"; - case basket_type_move: return "move"; - case basket_type_will: return "will"; - case basket_type_deleted: return "deleted"; - case basket_type_reserved: return "reserved"; - case basket_type_take_basket: return "take_basket"; - case basket_type_yielding: return "yielding"; - } - VM_ASSERT(0); - return NULL; -} -#endif // USE_RUBY_DEBUG_LOG - -static rb_thread_t * -ractor_sleeping_by(const rb_ractor_t *r, rb_thread_t *th, enum rb_ractor_wait_status wait_status) -{ - if (th) { - if ((th->ractor_waiting.wait_status & wait_status) && th->ractor_waiting.wakeup_status == wakeup_none) { - return th; - } - } - else { - // find any thread that has this ractor wait status that is blocked - ccan_list_for_each(&r->sync.wait.waiting_threads, th, ractor_waiting.waiting_node) { - if ((th->ractor_waiting.wait_status & wait_status) && th->ractor_waiting.wakeup_status == wakeup_none) { - return th; - } - } - } - return NULL; -} - -#ifdef RUBY_THREAD_PTHREAD_H -// thread_*.c -void rb_ractor_sched_wakeup(rb_ractor_t *r, rb_thread_t *th); -#else - -// win32 -static void -rb_ractor_sched_wakeup(rb_ractor_t *r, rb_thread_t *th) -{ - (void)r; - ASSERT_ractor_locking(r); - rb_native_cond_signal(&th->ractor_waiting.cond); - -} -#endif - - -/* - * Wakeup `r` if the given `th` is blocked and has the given ractor `wait_status`. - * Wakeup any blocked thread in `r` with the given ractor `wait_status` if `th` is NULL. - */ -static bool -ractor_wakeup(rb_ractor_t *r, rb_thread_t *th /* can be NULL */, enum rb_ractor_wait_status wait_status, enum rb_ractor_wakeup_status wakeup_status) -{ - ASSERT_ractor_locking(r); - - RUBY_DEBUG_LOG("r:%u wait:%s wakeup:%s", - rb_ractor_id(r), - wait_status_str(wait_status), - wakeup_status_str(wakeup_status)); - - if ((th = ractor_sleeping_by(r, th, wait_status)) != NULL) { - th->ractor_waiting.wakeup_status = wakeup_status; - rb_ractor_sched_wakeup(r, th); - return true; - } - else { - return false; - } -} - -// unblock function (UBF). This gets called when another thread on this or another ractor sets our thread's interrupt flag. -// This is not async-safe. -static void -ractor_sleep_interrupt(void *ptr) -{ - rb_execution_context_t *ec = ptr; - rb_ractor_t *r = rb_ec_ractor_ptr(ec); - rb_thread_t *th = rb_ec_thread_ptr(ec); - - RACTOR_LOCK(r); - { - ractor_wakeup(r, th, wait_receiving | wait_taking | wait_yielding, wakeup_by_interrupt); - } - RACTOR_UNLOCK(r); -} - -typedef void (*ractor_sleep_cleanup_function)(rb_ractor_t *cr, void *p); - -// Checks the current thread for ruby interrupts and runs the cleanup function `cf_func` with `cf_data` if -// `rb_ec_check_ints` is going to raise. See the `rb_threadptr_execute_interrupts` for info on when it can raise. -static void -ractor_check_ints(rb_execution_context_t *ec, rb_ractor_t *cr, rb_thread_t *cur_th, ractor_sleep_cleanup_function cf_func, void *cf_data) -{ - if (cur_th->ractor_waiting.wait_status != wait_none) { - enum rb_ractor_wait_status prev_wait_status = cur_th->ractor_waiting.wait_status; - cur_th->ractor_waiting.wait_status = wait_none; - cur_th->ractor_waiting.wakeup_status = wakeup_by_interrupt; - - RACTOR_UNLOCK(cr); - { - if (cf_func) { - enum ruby_tag_type state; - EC_PUSH_TAG(ec); - if ((state = EC_EXEC_TAG()) == TAG_NONE) { - rb_ec_check_ints(ec); - } - EC_POP_TAG(); - - if (state) { - (*cf_func)(cr, cf_data); // cleanup function is run after the ubf, if it had ubf - EC_JUMP_TAG(ec, state); - } - } - else { - rb_ec_check_ints(ec); - } - } - - RACTOR_LOCK(cr); - cur_th->ractor_waiting.wait_status = prev_wait_status; - } -} - -#ifdef RUBY_THREAD_PTHREAD_H -void rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf); -#else - -static void -ractor_cond_wait(rb_ractor_t *r, rb_thread_t *th) -{ -#if RACTOR_CHECK_MODE > 0 - VALUE locked_by = r->sync.locked_by; - r->sync.locked_by = Qnil; -#endif - rb_native_cond_wait(&th->ractor_waiting.cond, &r->sync.lock); - -#if RACTOR_CHECK_MODE > 0 - r->sync.locked_by = locked_by; -#endif -} - -static void * -ractor_sleep_wo_gvl(void *ptr) -{ - rb_ractor_t *cr = ptr; - rb_execution_context_t *ec = cr->threads.running_ec; - VM_ASSERT(GET_EC() == ec); - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - RACTOR_LOCK_SELF(cr); - { - VM_ASSERT(cur_th->ractor_waiting.wait_status != wait_none); - // it's possible that another ractor has woken us up (ractor_wakeup), - // so check this condition - if (cur_th->ractor_waiting.wakeup_status == wakeup_none) { - cur_th->status = THREAD_STOPPED_FOREVER; - ractor_cond_wait(cr, cur_th); - cur_th->status = THREAD_RUNNABLE; - VM_ASSERT(cur_th->ractor_waiting.wakeup_status != wakeup_none); - } - else { - RUBY_DEBUG_LOG("rare timing, no cond wait"); - } - cur_th->ractor_waiting.wait_status = wait_none; - } - RACTOR_UNLOCK_SELF(cr); - return NULL; -} - -static void -rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf_ractor_sleep_interrupt) -{ - ASSERT_ractor_locking(cr); - rb_thread_t *th = rb_ec_thread_ptr(ec); - struct ccan_list_node *waitn = &th->ractor_waiting.waiting_node; - VM_ASSERT(waitn->next == waitn->prev && waitn->next == waitn); // it should be unlinked - ccan_list_add(&cr->sync.wait.waiting_threads, waitn); - RACTOR_UNLOCK(cr); - { - rb_nogvl(ractor_sleep_wo_gvl, cr, ubf_ractor_sleep_interrupt, ec, RB_NOGVL_INTR_FAIL); - } - RACTOR_LOCK(cr); - ccan_list_del_init(waitn); -} -#endif - -/* - * Sleep the current ractor's current thread until another ractor wakes us up or another thread calls our unblock function. - * The following ractor actions can cause this function to be called: - * Ractor#take (wait_taking) - * Ractor.yield (wait_yielding) - * Ractor.receive (wait_receiving) - * Ractor.select (can be a combination of the above wait states, depending on the states of the ractors passed to Ractor.select) - */ -static enum rb_ractor_wakeup_status -ractor_sleep_with_cleanup(rb_execution_context_t *ec, rb_ractor_t *cr, rb_thread_t *cur_th, enum rb_ractor_wait_status wait_status, - ractor_sleep_cleanup_function cf_func, void *cf_data) -{ - ASSERT_ractor_locking(cr); - enum rb_ractor_wakeup_status wakeup_status; - VM_ASSERT(GET_RACTOR() == cr); - - VM_ASSERT(cur_th->ractor_waiting.wait_status == wait_none); - VM_ASSERT(wait_status != wait_none); - cur_th->ractor_waiting.wait_status = wait_status; - cur_th->ractor_waiting.wakeup_status = wakeup_none; - - // fprintf(stderr, "%s r:%p status:%s, wakeup_status:%s\n", RUBY_FUNCTION_NAME_STRING, (void *)cr, - // wait_status_str(cr->sync.wait.status), wakeup_status_str(cr->sync.wait.wakeup_status)); - - RUBY_DEBUG_LOG("sleep by %s", wait_status_str(wait_status)); - - while (cur_th->ractor_waiting.wakeup_status == wakeup_none) { - rb_ractor_sched_sleep(ec, cr, ractor_sleep_interrupt); - ractor_check_ints(ec, cr, cur_th, cf_func, cf_data); - } - - cur_th->ractor_waiting.wait_status = wait_none; - - wakeup_status = cur_th->ractor_waiting.wakeup_status; - cur_th->ractor_waiting.wakeup_status = wakeup_none; - - RUBY_DEBUG_LOG("wakeup %s", wakeup_status_str(wakeup_status)); - - ASSERT_ractor_locking(cr); - return wakeup_status; -} - -static enum rb_ractor_wakeup_status -ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_thread_t *cur_th, enum rb_ractor_wait_status wait_status) -{ - return ractor_sleep_with_cleanup(ec, cr, cur_th, wait_status, 0, NULL); -} - -// Ractor.receive - -static void -ractor_recursive_receive_if(rb_thread_t *th) -{ - if (th->ractor_waiting.receiving_mutex && rb_mutex_owned_p(th->ractor_waiting.receiving_mutex)) { - rb_raise(rb_eRactorError, "can not call receive/receive_if recursively"); - } -} - -static VALUE -ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *rq) -{ - struct rb_ractor_basket basket; - ractor_recursive_receive_if(rb_ec_thread_ptr(ec)); - bool received = false; - - RACTOR_LOCK_SELF(cr); - { - RUBY_DEBUG_LOG("rq->cnt:%d", rq->cnt); - received = ractor_queue_deq(cr, rq, &basket); - } - RACTOR_UNLOCK_SELF(cr); - - if (!received) { - if (cr->sync.incoming_port_closed) { - rb_raise(rb_eRactorClosedError, "The incoming port is already closed"); - } - return Qundef; - } - else { - return ractor_basket_accept(&basket); - } -} - -static void -ractor_wait_receive(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *rq) -{ - VM_ASSERT(cr == rb_ec_ractor_ptr(ec)); - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - ractor_recursive_receive_if(cur_th); - - RACTOR_LOCK(cr); - { - while (ractor_queue_empty_p(cr, rq) && !cr->sync.incoming_port_closed) { - ractor_sleep(ec, cr, cur_th, wait_receiving); - } - } - RACTOR_UNLOCK(cr); -} - -static VALUE -ractor_receive(rb_execution_context_t *ec, rb_ractor_t *cr) -{ - VM_ASSERT(cr == rb_ec_ractor_ptr(ec)); - VALUE v; - struct rb_ractor_queue *rq = &cr->sync.recv_queue; - - while (UNDEF_P(v = ractor_try_receive(ec, cr, rq))) { - ractor_wait_receive(ec, cr, rq); - } - - return v; -} - -#if 0 -static void -rq_dump(struct rb_ractor_queue *rq) -{ - bool bug = false; - for (int i=0; icnt; i++) { - struct rb_ractor_basket *b = ractor_queue_at(NULL, rq, i); - fprintf(stderr, "%d (start:%d) type:%s %p %s\n", i, rq->start, basket_type_name(b->type), - (void *)b, RSTRING_PTR(RARRAY_AREF(b->v, 1))); - if (basket_type_p(b, basket_type_reserved) bug = true; - } - if (bug) rb_bug("!!"); -} -#endif - -struct receive_block_data { - rb_ractor_t *cr; - rb_thread_t *th; - struct rb_ractor_queue *rq; - VALUE v; - int index; - bool success; -}; - -static void -ractor_receive_if_lock(rb_thread_t *th) -{ - VALUE m = th->ractor_waiting.receiving_mutex; - if (m == Qfalse) { - m = th->ractor_waiting.receiving_mutex = rb_mutex_new(); - } - rb_mutex_lock(m); -} - -static VALUE -receive_if_body(VALUE ptr) -{ - struct receive_block_data *data = (struct receive_block_data *)ptr; - - ractor_receive_if_lock(data->th); - VALUE block_result = rb_yield(data->v); - rb_ractor_t *cr = data->cr; - - RACTOR_LOCK_SELF(cr); - { - struct rb_ractor_basket *b = ractor_queue_at(cr, data->rq, data->index); - VM_ASSERT(basket_type_p(b, basket_type_reserved)); - data->rq->reserved_cnt--; - - if (RTEST(block_result)) { - ractor_queue_delete(cr, data->rq, b); - ractor_queue_compact(cr, data->rq); - } - else { - b->type.e = basket_type_ref; - } - } - RACTOR_UNLOCK_SELF(cr); - - data->success = true; - - if (RTEST(block_result)) { - return data->v; - } - else { - return Qundef; - } -} - -static VALUE -receive_if_ensure(VALUE v) -{ - struct receive_block_data *data = (struct receive_block_data *)v; - rb_ractor_t *cr = data->cr; - rb_thread_t *cur_th = data->th; - - if (!data->success) { - RACTOR_LOCK_SELF(cr); - { - struct rb_ractor_basket *b = ractor_queue_at(cr, data->rq, data->index); - VM_ASSERT(basket_type_p(b, basket_type_reserved)); - b->type.e = basket_type_deleted; - data->rq->reserved_cnt--; - } - RACTOR_UNLOCK_SELF(cr); - } - - rb_mutex_unlock(cur_th->ractor_waiting.receiving_mutex); - return Qnil; -} - -static VALUE -ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b) -{ - if (!RTEST(b)) rb_raise(rb_eArgError, "no block given"); - - rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - unsigned int serial = (unsigned int)-1; - int index = 0; - struct rb_ractor_queue *rq = &cr->sync.recv_queue; - - while (1) { - VALUE v = Qundef; - - ractor_wait_receive(ec, cr, rq); - - RACTOR_LOCK_SELF(cr); - { - if (serial != rq->serial) { - serial = rq->serial; - index = 0; - } - - // check newer version - for (int i=index; icnt; i++) { - if (!ractor_queue_skip_p(cr, rq, i)) { - struct rb_ractor_basket *b = ractor_queue_at(cr, rq, i); - v = ractor_basket_value(b); - b->type.e = basket_type_reserved; - rq->reserved_cnt++; - index = i; - break; - } - } - } - RACTOR_UNLOCK_SELF(cr); - - if (!UNDEF_P(v)) { - struct receive_block_data data = { - .cr = cr, - .th = cur_th, - .rq = rq, - .v = v, - .index = index, - .success = false, - }; - - VALUE result = rb_ensure(receive_if_body, (VALUE)&data, - receive_if_ensure, (VALUE)&data); - - if (!UNDEF_P(result)) return result; - index++; - } - - RUBY_VM_CHECK_INTS(ec); - } -} - -static void -ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_basket *b) -{ - bool closed = false; - - RACTOR_LOCK(r); - { - if (r->sync.incoming_port_closed) { - closed = true; - } - else { - ractor_queue_enq(r, &r->sync.recv_queue, b); - // wakeup any receiving thread in `r` - ractor_wakeup(r, NULL, wait_receiving, wakeup_by_send); - } - } - RACTOR_UNLOCK(r); - - if (closed) { - rb_raise(rb_eRactorClosedError, "The incoming-port is already closed"); - } -} - -// Ractor#send - -static VALUE ractor_move(VALUE obj); // in this file -static VALUE ractor_copy(VALUE obj); // in this file - -static void -ractor_basket_prepare_contents(VALUE obj, VALUE move, volatile VALUE *pobj, enum rb_ractor_basket_type *ptype) -{ - VALUE v; - enum rb_ractor_basket_type type; - - if (rb_ractor_shareable_p(obj)) { - type = basket_type_ref; - v = obj; - } - else if (!RTEST(move)) { - v = ractor_copy(obj); - type = basket_type_copy; - } - else { - type = basket_type_move; - v = ractor_move(obj); - } - - *pobj = v; - *ptype = type; -} - -static void -ractor_basket_fill_(rb_ractor_t *cr, rb_thread_t *cur_th, struct rb_ractor_basket *basket, VALUE obj, bool exc) -{ - VM_ASSERT(cr == GET_RACTOR()); - - basket->sender = cr->pub.self; - basket->sending_th = cur_th; - basket->p.send.exception = exc; - basket->p.send.v = obj; -} - -static void -ractor_basket_fill(rb_ractor_t *cr, rb_thread_t *cur_th, struct rb_ractor_basket *basket, VALUE obj, VALUE move, bool exc) -{ - VALUE v; - enum rb_ractor_basket_type type; - ractor_basket_prepare_contents(obj, move, &v, &type); - ractor_basket_fill_(cr, cur_th, basket, v, exc); - basket->type.e = type; -} - -static void -ractor_basket_fill_will(rb_ractor_t *cr, rb_thread_t *cur_th, struct rb_ractor_basket *basket, VALUE obj, bool exc) -{ - ractor_basket_fill_(cr, cur_th, basket, obj, exc); - basket->type.e = basket_type_will; -} - -static VALUE -ractor_send(rb_execution_context_t *ec, rb_ractor_t *recv_r, VALUE obj, VALUE move) -{ - struct rb_ractor_basket basket; - rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - // TODO: Ractor local GC - ractor_basket_fill(cr, cur_th, &basket, obj, move, false); - ractor_send_basket(ec, recv_r, &basket); - return recv_r->pub.self; -} - -// Ractor#take - -static bool -ractor_take_has_will(rb_ractor_t *r) -{ - ASSERT_ractor_locking(r); - - return basket_type_p(&r->sync.will_basket, basket_type_will); -} - -static bool -ractor_take_will(rb_ractor_t *r, struct rb_ractor_basket *b) -{ - ASSERT_ractor_locking(r); - - if (ractor_take_has_will(r)) { - *b = r->sync.will_basket; - r->sync.will_basket.type.e = basket_type_none; - return true; - } - else { - VM_ASSERT(basket_type_p(&r->sync.will_basket, basket_type_none)); - return false; - } -} - -static bool -ractor_take_will_lock(rb_ractor_t *r, struct rb_ractor_basket *b) -{ - ASSERT_ractor_unlocking(r); - bool taken; - - RACTOR_LOCK(r); - { - taken = ractor_take_will(r, b); - } - RACTOR_UNLOCK(r); - - return taken; -} - -static bool -ractor_register_take(rb_ractor_t *cr, rb_thread_t *cur_th, rb_ractor_t *r, struct rb_ractor_basket *take_basket, - bool is_take, struct rb_ractor_selector_take_config *config, bool ignore_error) -{ - struct rb_ractor_basket b = { - .type.e = basket_type_take_basket, - .sender = cr->pub.self, - .sending_th = cur_th, - .p = { - .take = { - .basket = take_basket, // pointer to our stack value saved in ractor `r` queue - .config = config, - }, - }, - }; - bool closed = false; - - RACTOR_LOCK(r); - { - if (is_take && ractor_take_will(r, take_basket)) { - RUBY_DEBUG_LOG("take over a will of r:%d", rb_ractor_id(r)); - } - else if (!is_take && ractor_take_has_will(r)) { - RUBY_DEBUG_LOG("has_will"); - VM_ASSERT(config != NULL); - config->closed = true; - } - else if (r->sync.outgoing_port_closed) { - closed = true; - } - else { - RUBY_DEBUG_LOG("register in r:%d", rb_ractor_id(r)); - ractor_queue_enq(r, &r->sync.takers_queue, &b); - - if (basket_none_p(take_basket)) { - // wakeup any thread in `r` that has yielded, if there is any. - ractor_wakeup(r, NULL, wait_yielding, wakeup_by_take); - } - } - } - RACTOR_UNLOCK(r); - - if (closed) { - if (!ignore_error) rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); - return false; - } - else { - return true; - } -} - -static bool -ractor_deregister_take(rb_ractor_t *r, struct rb_ractor_basket *take_basket) -{ - struct rb_ractor_queue *ts = &r->sync.takers_queue; - bool deleted = false; - - RACTOR_LOCK(r); - { - if (r->sync.outgoing_port_closed) { - // ok - } - else { - for (int i=0; icnt; i++) { - struct rb_ractor_basket *b = ractor_queue_at(r, ts, i); - if (basket_type_p(b, basket_type_take_basket) && b->p.take.basket == take_basket) { - ractor_queue_delete(r, ts, b); - deleted = true; - } - } - if (deleted) { - ractor_queue_compact(r, ts); - } - } - } - RACTOR_UNLOCK(r); - - return deleted; -} - -static VALUE -ractor_try_take(rb_ractor_t *cr, rb_thread_t *cur_th, rb_ractor_t *recv_r, struct rb_ractor_basket *take_basket) -{ - bool taken; - - RACTOR_LOCK_SELF(cr); - { - // If it hasn't yielded yet or is currently in the process of yielding, sleep more - if (basket_none_p(take_basket) || basket_type_p(take_basket, basket_type_yielding)) { - taken = false; - } - else { - taken = true; // basket type might be, for ex, basket_type_copy if value was copied during yield - } - } - RACTOR_UNLOCK_SELF(cr); - - if (taken) { - RUBY_DEBUG_LOG("taken"); - if (basket_type_p(take_basket, basket_type_deleted)) { - VM_ASSERT(recv_r->sync.outgoing_port_closed); - rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); - } - return ractor_basket_accept(take_basket); - } - else { - RUBY_DEBUG_LOG("not taken"); - return Qundef; - } -} - - -#if VM_CHECK_MODE > 0 -static bool -ractor_check_specific_take_basket_lock(rb_ractor_t *r, struct rb_ractor_basket *tb) -{ - bool ret = false; - struct rb_ractor_queue *ts = &r->sync.takers_queue; - - RACTOR_LOCK(r); - { - for (int i=0; icnt; i++) { - struct rb_ractor_basket *b = ractor_queue_at(r, ts, i); - if (basket_type_p(b, basket_type_take_basket) && b->p.take.basket == tb) { - ret = true; - break; - } - } - } - RACTOR_UNLOCK(r); - - return ret; -} -#endif - -// cleanup function, cr is unlocked -static void -ractor_take_cleanup(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *tb) -{ - retry: - if (basket_none_p(tb)) { // not yielded yet - if (!ractor_deregister_take(r, tb)) { - // not in r's takers queue - rb_thread_sleep(0); - goto retry; - } - } - else { - VM_ASSERT(!ractor_check_specific_take_basket_lock(r, tb)); - } -} - -struct take_wait_take_cleanup_data { - rb_ractor_t *r; - struct rb_ractor_basket *tb; -}; - -static void -ractor_wait_take_cleanup(rb_ractor_t *cr, void *ptr) -{ - struct take_wait_take_cleanup_data *data = (struct take_wait_take_cleanup_data *)ptr; - ractor_take_cleanup(cr, data->r, data->tb); -} - -static void -ractor_wait_take(rb_execution_context_t *ec, rb_ractor_t *cr, rb_thread_t *cur_th, rb_ractor_t *r, struct rb_ractor_basket *take_basket) -{ - struct take_wait_take_cleanup_data data = { - .r = r, - .tb = take_basket, - }; - - RACTOR_LOCK_SELF(cr); - { - if (basket_none_p(take_basket) || basket_type_p(take_basket, basket_type_yielding)) { - ractor_sleep_with_cleanup(ec, cr, cur_th, wait_taking, ractor_wait_take_cleanup, &data); - } - } - RACTOR_UNLOCK_SELF(cr); -} - -static VALUE -ractor_take(rb_execution_context_t *ec, rb_ractor_t *recv_r) -{ - RUBY_DEBUG_LOG("from r:%u", rb_ractor_id(recv_r)); - VALUE v; - rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - - struct rb_ractor_basket take_basket = { - .type.e = basket_type_none, - .sender = 0, - }; - - ractor_register_take(cr, cur_th, recv_r, &take_basket, true, NULL, false); - - while (UNDEF_P(v = ractor_try_take(cr, cur_th, recv_r, &take_basket))) { - ractor_wait_take(ec, cr, cur_th, recv_r, &take_basket); - } - - VM_ASSERT(!basket_none_p(&take_basket)); // might be, for ex, basket_type_copy - VM_ASSERT(!ractor_check_specific_take_basket_lock(recv_r, &take_basket)); - - return v; -} - -// Ractor.yield - -static bool -ractor_check_take_basket(rb_ractor_t *cr, struct rb_ractor_queue *rs) -{ - ASSERT_ractor_locking(cr); - - for (int i=0; icnt; i++) { - struct rb_ractor_basket *b = ractor_queue_at(cr, rs, i); - if (basket_type_p(b, basket_type_take_basket) && - basket_none_p(b->p.take.basket)) { - return true; - } - } - - return false; -} - -// Find another ractor that is taking from this ractor, so we can yield to it -static bool -ractor_deq_take_basket(rb_ractor_t *cr, struct rb_ractor_queue *rs, struct rb_ractor_basket *b) -{ - ASSERT_ractor_unlocking(cr); - struct rb_ractor_basket *first_tb = NULL; - bool found = false; - - RACTOR_LOCK_SELF(cr); - { - while (ractor_queue_deq(cr, rs, b)) { - if (basket_type_p(b, basket_type_take_basket)) { // some other ractor is taking - struct rb_ractor_basket *tb = b->p.take.basket; - - if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_yielding) == basket_type_none) { - found = true; // payload basket is now "yielding" type - break; - } - else { - ractor_queue_enq(cr, rs, b); - if (first_tb == NULL) first_tb = tb; - struct rb_ractor_basket *head = ractor_queue_head(cr, rs); - VM_ASSERT(head != NULL); - if (basket_type_p(head, basket_type_take_basket) && head->p.take.basket == first_tb) { - break; // loop detected - } - } - } - else { - VM_ASSERT(basket_none_p(b)); - } - } - - if (found && b->p.take.config && !b->p.take.config->oneshot) { - ractor_queue_enq(cr, rs, b); - } - } - RACTOR_UNLOCK_SELF(cr); - - return found; -} - -// Try yielding to a taking ractor -static bool -ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *ts, volatile VALUE obj, VALUE move, bool exc, bool is_will) -{ - // Don't lock yielding ractor at same time as taking ractor. This could deadlock due to timing - // issue because we don't have a lock hierarchy. - ASSERT_ractor_unlocking(cr); - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - - struct rb_ractor_basket b; - - if (ractor_deq_take_basket(cr, ts, &b)) { // deq a take basket from takers queue of `cr` into `b` - VM_ASSERT(basket_type_p(&b, basket_type_take_basket)); - VM_ASSERT(basket_type_p(b.p.take.basket, basket_type_yielding)); - - rb_ractor_t *tr = RACTOR_PTR(b.sender); // taking ractor - rb_thread_t *tr_th = b.sending_th; // taking thread - struct rb_ractor_basket *tb = b.p.take.basket; // payload basket - enum rb_ractor_basket_type type; - - RUBY_DEBUG_LOG("basket from r:%u", rb_ractor_id(tr)); - - if (is_will) { - type = basket_type_will; // last message - } - else { - enum ruby_tag_type state; - - // begin - EC_PUSH_TAG(ec); - if ((state = EC_EXEC_TAG()) == TAG_NONE) { - // TODO: Ractor local GC - ractor_basket_prepare_contents(obj, move, &obj, &type); - } - EC_POP_TAG(); - // rescue ractor copy/move error, then re-raise - if (state) { - RACTOR_LOCK_SELF(cr); - { - b.p.take.basket->type.e = basket_type_none; - ractor_queue_enq(cr, ts, &b); - } - RACTOR_UNLOCK_SELF(cr); - EC_JUMP_TAG(ec, state); - } - } - - RACTOR_LOCK(tr); - { - VM_ASSERT(basket_type_p(tb, basket_type_yielding)); - // fill atomic - RUBY_DEBUG_LOG("fill %sbasket from r:%u", is_will ? "will " : "", rb_ractor_id(tr)); - ractor_basket_fill_(cr, cur_th, tb, obj, exc); // fill the take basket payload - if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_yielding, type) != basket_type_yielding) { - rb_bug("unreachable"); - } - ractor_wakeup(tr, tr_th, wait_taking, wakeup_by_yield); - } - RACTOR_UNLOCK(tr); - - return true; - } - else if (cr->sync.outgoing_port_closed) { - rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); - } - else { - RUBY_DEBUG_LOG("no take basket"); - return false; - } -} - -static void -ractor_wait_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *ts) -{ - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - RACTOR_LOCK_SELF(cr); - { - while (!ractor_check_take_basket(cr, ts) && !cr->sync.outgoing_port_closed) { - ractor_sleep(ec, cr, cur_th, wait_yielding); - } - } - RACTOR_UNLOCK_SELF(cr); -} - -// In order to yield, we wait until our takers queue has at least one element. Then, we wakeup a taker. -static VALUE -ractor_yield(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE obj, VALUE move) -{ - struct rb_ractor_queue *ts = &cr->sync.takers_queue; - - while (!ractor_try_yield(ec, cr, ts, obj, move, false, false)) { - ractor_wait_yield(ec, cr, ts); - } - - return Qnil; -} - -// Ractor::Selector - -struct rb_ractor_selector { - rb_ractor_t *r; - struct rb_ractor_basket take_basket; - st_table *take_ractors; // rb_ractor_t * => (struct rb_ractor_selector_take_config *) -}; - -static int -ractor_selector_mark_ractors_i(st_data_t key, st_data_t value, st_data_t data) -{ - const rb_ractor_t *r = (rb_ractor_t *)key; - rb_gc_mark(r->pub.self); - return ST_CONTINUE; -} - -static void -ractor_selector_mark(void *ptr) -{ - struct rb_ractor_selector *s = ptr; - - if (s->take_ractors) { - st_foreach(s->take_ractors, ractor_selector_mark_ractors_i, 0); - } - - switch (s->take_basket.type.e) { - case basket_type_ref: - case basket_type_copy: - case basket_type_move: - case basket_type_will: - rb_gc_mark(s->take_basket.sender); - rb_gc_mark(s->take_basket.p.send.v); - break; - default: - break; - } -} - -static int -ractor_selector_release_i(st_data_t key, st_data_t val, st_data_t data) -{ - struct rb_ractor_selector *s = (struct rb_ractor_selector *)data; - struct rb_ractor_selector_take_config *config = (struct rb_ractor_selector_take_config *)val; - - if (!config->closed) { - ractor_deregister_take((rb_ractor_t *)key, &s->take_basket); - } - free(config); - return ST_CONTINUE; -} - -static void -ractor_selector_free(void *ptr) -{ - struct rb_ractor_selector *s = ptr; - st_foreach(s->take_ractors, ractor_selector_release_i, (st_data_t)s); - st_free_table(s->take_ractors); - ruby_xfree(ptr); -} - -static size_t -ractor_selector_memsize(const void *ptr) -{ - const struct rb_ractor_selector *s = ptr; - return sizeof(struct rb_ractor_selector) + - st_memsize(s->take_ractors) + - s->take_ractors->num_entries * sizeof(struct rb_ractor_selector_take_config); -} - -static const rb_data_type_t ractor_selector_data_type = { - "ractor/selector", - { - ractor_selector_mark, - ractor_selector_free, - ractor_selector_memsize, - NULL, // update - }, - 0, 0, RUBY_TYPED_FREE_IMMEDIATELY, -}; - -static struct rb_ractor_selector * -RACTOR_SELECTOR_PTR(VALUE selv) -{ - VM_ASSERT(rb_typeddata_is_kind_of(selv, &ractor_selector_data_type)); - - return (struct rb_ractor_selector *)DATA_PTR(selv); -} - -// Ractor::Selector.new - -static VALUE -ractor_selector_create(VALUE klass) -{ - struct rb_ractor_selector *s; - VALUE selv = TypedData_Make_Struct(klass, struct rb_ractor_selector, &ractor_selector_data_type, s); - s->take_basket.type.e = basket_type_reserved; - s->take_ractors = st_init_numtable(); // ractor (ptr) -> take_config - return selv; -} - -// Ractor::Selector#add(r) - -/* - * call-seq: - * add(ractor) -> ractor - * - * Adds _ractor_ to +self+. Raises an exception if _ractor_ is already added. - * Returns _ractor_. - */ -static VALUE -ractor_selector_add(VALUE selv, VALUE rv) -{ - if (!rb_ractor_p(rv)) { - rb_raise(rb_eArgError, "Not a ractor object"); - } - - rb_ractor_t *r = RACTOR_PTR(rv); - struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv); - - if (st_lookup(s->take_ractors, (st_data_t)r, NULL)) { - rb_raise(rb_eArgError, "already added"); - } - - struct rb_ractor_selector_take_config *config = malloc(sizeof(struct rb_ractor_selector_take_config)); - VM_ASSERT(config != NULL); - config->closed = false; - config->oneshot = false; - - if (ractor_register_take(GET_RACTOR(), GET_THREAD(), r, &s->take_basket, false, config, true)) { - st_insert(s->take_ractors, (st_data_t)r, (st_data_t)config); - } - - return rv; -} - -// Ractor::Selector#remove(r) - -/* call-seq: - * remove(ractor) -> ractor - * - * Removes _ractor_ from +self+. Raises an exception if _ractor_ is not added. - * Returns the removed _ractor_. - */ -static VALUE -ractor_selector_remove(VALUE selv, VALUE rv) -{ - if (!rb_ractor_p(rv)) { - rb_raise(rb_eArgError, "Not a ractor object"); - } - - rb_ractor_t *r = RACTOR_PTR(rv); - struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv); - - RUBY_DEBUG_LOG("r:%u", rb_ractor_id(r)); - - if (!st_lookup(s->take_ractors, (st_data_t)r, NULL)) { - rb_raise(rb_eArgError, "not added yet"); - } - - ractor_deregister_take(r, &s->take_basket); - struct rb_ractor_selector_take_config *config; - st_delete(s->take_ractors, (st_data_t *)&r, (st_data_t *)&config); - free(config); - - return rv; -} - -// Ractor::Selector#clear - -struct ractor_selector_clear_data { - VALUE selv; - rb_execution_context_t *ec; -}; - -static int -ractor_selector_clear_i(st_data_t key, st_data_t val, st_data_t data) -{ - VALUE selv = (VALUE)data; - rb_ractor_t *r = (rb_ractor_t *)key; - ractor_selector_remove(selv, r->pub.self); - return ST_CONTINUE; -} +static void ractor_sync_mark(rb_ractor_t *r); +static void ractor_sync_free(rb_ractor_t *r); +static size_t ractor_sync_memsize(const rb_ractor_t *r); +static void ractor_sync_init(rb_ractor_t *r); -/* - * call-seq: - * clear -> self - * - * Removes all ractors from +self+. Raises +self+. - */ -static VALUE -ractor_selector_clear(VALUE selv) -{ - struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv); - - st_foreach(s->take_ractors, ractor_selector_clear_i, (st_data_t)selv); - st_clear(s->take_ractors); - return selv; -} - -/* - * call-seq: - * empty? -> true or false - * - * Returns +true+ if no ractor is added. - */ -static VALUE -ractor_selector_empty_p(VALUE selv) -{ - struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv); - return s->take_ractors->num_entries == 0 ? Qtrue : Qfalse; -} - -static int -ractor_selector_wait_i(st_data_t key, st_data_t val, st_data_t dat) +static void +ractor_mark(void *ptr) { - rb_ractor_t *r = (rb_ractor_t *)key; - struct rb_ractor_basket *tb = (struct rb_ractor_basket *)dat; - int ret; - - if (!basket_none_p(tb)) { - RUBY_DEBUG_LOG("already taken:%s", basket_type_name(tb->type.e)); - return ST_STOP; - } + rb_ractor_t *r = (rb_ractor_t *)ptr; - RACTOR_LOCK(r); - { - if (basket_type_p(&r->sync.will_basket, basket_type_will)) { - RUBY_DEBUG_LOG("r:%u has will", rb_ractor_id(r)); + // mark received messages + ractor_sync_mark(r); - if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_will) == basket_type_none) { - ractor_take_will(r, tb); - ret = ST_STOP; - } - else { - RUBY_DEBUG_LOG("has will, but already taken (%s)", basket_type_name(tb->type.e)); - ret = ST_CONTINUE; - } - } - else if (r->sync.outgoing_port_closed) { - RUBY_DEBUG_LOG("r:%u is closed", rb_ractor_id(r)); + rb_gc_mark(r->loc); + rb_gc_mark(r->name); + rb_gc_mark(r->r_stdin); + rb_gc_mark(r->r_stdout); + rb_gc_mark(r->r_stderr); + rb_hook_list_mark(&r->pub.hooks); - if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_deleted) == basket_type_none) { - tb->sender = r->pub.self; - ret = ST_STOP; - } - else { - RUBY_DEBUG_LOG("closed, but already taken (%s)", basket_type_name(tb->type.e)); - ret = ST_CONTINUE; - } - } - else { - RUBY_DEBUG_LOG("wakeup r:%u", rb_ractor_id(r)); - ractor_wakeup(r, NULL, wait_yielding, wakeup_by_take); - ret = ST_CONTINUE; + if (r->threads.cnt > 0) { + rb_thread_t *th = 0; + ccan_list_for_each(&r->threads.set, th, lt_node) { + VM_ASSERT(th != NULL); + rb_gc_mark(th->self); } } - RACTOR_UNLOCK(r); - return ret; + ractor_local_storage_mark(r); } -// Ractor::Selector#wait - -// cleanup function, cr is unlocked static void -ractor_selector_wait_cleanup(rb_ractor_t *cr, void *ptr) -{ - struct rb_ractor_basket *tb = (struct rb_ractor_basket *)ptr; - - RACTOR_LOCK_SELF(cr); - { - while (basket_type_p(tb, basket_type_yielding)) { - RACTOR_UNLOCK_SELF(cr); - { - rb_thread_sleep(0); - } - RACTOR_LOCK_SELF(cr); - } - // if tb->type is not none, taking is succeeded, but interruption ignore it unfortunately. - tb->type.e = basket_type_reserved; - } - RACTOR_UNLOCK_SELF(cr); -} - -/* :nodoc: */ -static VALUE -ractor_selector__wait(VALUE selv, VALUE do_receivev, VALUE do_yieldv, VALUE yield_value, VALUE move) +ractor_free(void *ptr) { - rb_execution_context_t *ec = GET_EC(); - struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv); - struct rb_ractor_basket *tb = &s->take_basket; - struct rb_ractor_basket taken_basket; - rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - bool do_receive = !!RTEST(do_receivev); - bool do_yield = !!RTEST(do_yieldv); - VALUE ret_v, ret_r; - enum rb_ractor_wait_status wait_status; - struct rb_ractor_queue *rq = &cr->sync.recv_queue; - struct rb_ractor_queue *ts = &cr->sync.takers_queue; - - RUBY_DEBUG_LOG("start"); - - retry: - RUBY_DEBUG_LOG("takers:%ld", s->take_ractors->num_entries); - - // setup wait_status - wait_status = wait_none; - if (s->take_ractors->num_entries > 0) wait_status |= wait_taking; - if (do_receive) wait_status |= wait_receiving; - if (do_yield) wait_status |= wait_yielding; - - RUBY_DEBUG_LOG("wait:%s", wait_status_str(wait_status)); - - if (wait_status == wait_none) { - rb_raise(rb_eRactorError, "no taking ractors"); - } - - // check recv_queue - if (do_receive && !UNDEF_P(ret_v = ractor_try_receive(ec, cr, rq))) { - ret_r = ID2SYM(rb_intern("receive")); - goto success; - } - - // check takers - if (do_yield && ractor_try_yield(ec, cr, ts, yield_value, move, false, false)) { - ret_v = Qnil; - ret_r = ID2SYM(rb_intern("yield")); - goto success; - } - - // check take_basket - VM_ASSERT(basket_type_p(&s->take_basket, basket_type_reserved)); - s->take_basket.type.e = basket_type_none; - // kick all take target ractors - st_foreach(s->take_ractors, ractor_selector_wait_i, (st_data_t)tb); - - RACTOR_LOCK_SELF(cr); - { - retry_waiting: - while (1) { - if (!basket_none_p(tb)) { - RUBY_DEBUG_LOG("taken:%s from r:%u", basket_type_name(tb->type.e), - tb->sender ? rb_ractor_id(RACTOR_PTR(tb->sender)) : 0); - break; - } - if (do_receive && !ractor_queue_empty_p(cr, rq)) { - RUBY_DEBUG_LOG("can receive (%d)", rq->cnt); - break; - } - if (do_yield && ractor_check_take_basket(cr, ts)) { - RUBY_DEBUG_LOG("can yield"); - break; - } - - ractor_sleep_with_cleanup(ec, cr, cur_th, wait_status, ractor_selector_wait_cleanup, tb); - } - - taken_basket = *tb; - - // ensure - // tb->type.e = basket_type_reserved # do it atomic in the following code - if (taken_basket.type.e == basket_type_yielding || - RUBY_ATOMIC_CAS(tb->type.atomic, taken_basket.type.e, basket_type_reserved) != taken_basket.type.e) { + rb_ractor_t *r = (rb_ractor_t *)ptr; + RUBY_DEBUG_LOG("free r:%d", rb_ractor_id(r)); + rb_native_mutex_destroy(&r->sync.lock); +#ifdef RUBY_THREAD_WIN32_H + rb_native_cond_destroy(&r->sync.wakeup_cond); +#endif + ractor_local_storage_free(r); + rb_hook_list_free(&r->pub.hooks); - if (basket_type_p(tb, basket_type_yielding)) { - RACTOR_UNLOCK_SELF(cr); - { - rb_thread_sleep(0); - } - RACTOR_LOCK_SELF(cr); - } - goto retry_waiting; - } - } - RACTOR_UNLOCK_SELF(cr); + if (r->newobj_cache) { + RUBY_ASSERT(r == ruby_single_main_ractor); - // check the taken result - switch (taken_basket.type.e) { - case basket_type_none: - VM_ASSERT(do_receive || do_yield); - goto retry; - case basket_type_yielding: - rb_bug("unreachable"); - case basket_type_deleted: { - ractor_selector_remove(selv, taken_basket.sender); - - rb_ractor_t *r = RACTOR_PTR(taken_basket.sender); - if (ractor_take_will_lock(r, &taken_basket)) { - RUBY_DEBUG_LOG("has_will"); - } - else { - RUBY_DEBUG_LOG("no will"); - // rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); - // remove and retry wait - goto retry; - } - break; - } - case basket_type_will: - // no more messages - ractor_selector_remove(selv, taken_basket.sender); - break; - default: - break; + rb_gc_ractor_cache_free(r->newobj_cache); + r->newobj_cache = NULL; } - RUBY_DEBUG_LOG("taken_basket:%s", basket_type_name(taken_basket.type.e)); - - ret_v = ractor_basket_accept(&taken_basket); - ret_r = taken_basket.sender; - success: - return rb_ary_new_from_args(2, ret_r, ret_v); + ractor_sync_free(r); + ruby_xfree(r); } -/* - * call-seq: - * wait(receive: false, yield_value: undef, move: false) -> [ractor, value] - * - * Waits until any ractor in _selector_ can be active. - */ -static VALUE -ractor_selector_wait(int argc, VALUE *argv, VALUE selector) +static size_t +ractor_memsize(const void *ptr) { - VALUE options; - ID keywords[3]; - VALUE values[3]; - - keywords[0] = rb_intern("receive"); - keywords[1] = rb_intern("yield_value"); - keywords[2] = rb_intern("move"); + rb_ractor_t *r = (rb_ractor_t *)ptr; - rb_scan_args(argc, argv, "0:", &options); - rb_get_kwargs(options, keywords, 0, numberof(values), values); - return ractor_selector__wait(selector, - values[0] == Qundef ? Qfalse : RTEST(values[0]), - values[1] != Qundef, values[1], values[2]); + // TODO: more correct? + return sizeof(rb_ractor_t) + ractor_sync_memsize(r); } -static VALUE -ractor_selector_new(int argc, VALUE *ractors, VALUE klass) -{ - VALUE selector = ractor_selector_create(klass); - - for (int i=0; isync.incoming_port_closed) { - prev = Qfalse; - r->sync.incoming_port_closed = true; - if (ractor_wakeup(r, r_th, wait_receiving, wakeup_by_close)) { - VM_ASSERT(ractor_queue_empty_p(r, &r->sync.recv_queue)); - RUBY_DEBUG_LOG("cancel receiving"); - } - } - else { - prev = Qtrue; - } - } - RACTOR_UNLOCK(r); - return prev; + VM_ASSERT(rb_ractor_p(self)); + rb_ractor_t *r = DATA_PTR(self); + return r; } -// Ractor#close_outgoing +static rb_atomic_t ractor_last_id; -static VALUE -ractor_close_outgoing(rb_execution_context_t *ec, rb_ractor_t *r) +#if RACTOR_CHECK_MODE > 0 +uint32_t +rb_ractor_current_id(void) { - VALUE prev; - - RACTOR_LOCK(r); - { - struct rb_ractor_queue *ts = &r->sync.takers_queue; - rb_ractor_t *tr; - struct rb_ractor_basket b; - - if (!r->sync.outgoing_port_closed) { - prev = Qfalse; - r->sync.outgoing_port_closed = true; - } - else { - VM_ASSERT(ractor_queue_empty_p(r, ts)); - prev = Qtrue; - } - - // wakeup all taking ractors - while (ractor_queue_deq(r, ts, &b)) { - if (basket_type_p(&b, basket_type_take_basket)) { - tr = RACTOR_PTR(b.sender); - rb_thread_t *tr_th = b.sending_th; - struct rb_ractor_basket *tb = b.p.take.basket; - - if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_yielding) == basket_type_none) { - b.p.take.basket->sender = r->pub.self; - if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_yielding, basket_type_deleted) != basket_type_yielding) { - rb_bug("unreachable"); - } - RUBY_DEBUG_LOG("set delete for r:%u", rb_ractor_id(RACTOR_PTR(b.sender))); - } - - if (b.p.take.config) { - b.p.take.config->closed = true; - } - - // TODO: deadlock-able? - RACTOR_LOCK(tr); - { - ractor_wakeup(tr, tr_th, wait_taking, wakeup_by_close); - } - RACTOR_UNLOCK(tr); - } - } - - // raising yielding Ractor - ractor_wakeup(r, NULL, wait_yielding, wakeup_by_close); - - VM_ASSERT(ractor_queue_empty_p(r, ts)); + if (GET_THREAD()->ractor == NULL) { + return 1; // main ractor + } + else { + return rb_ractor_id(GET_RACTOR()); } - RACTOR_UNLOCK(r); - return prev; } +#endif + +#include "ractor_sync.c" // creation/termination @@ -2175,9 +440,7 @@ rb_ractor_terminate_atfork(rb_vm_t *vm, rb_ractor_t *r) rb_gc_ractor_cache_free(r->newobj_cache); r->newobj_cache = NULL; r->status_ = ractor_terminated; - r->sync.outgoing_port_closed = true; - r->sync.incoming_port_closed = true; - r->sync.will_basket.type.e = basket_type_none; + ractor_sync_terminate_atfork(vm, r); } #endif @@ -2194,15 +457,7 @@ rb_ractor_living_threads_init(rb_ractor_t *r) static void ractor_init(rb_ractor_t *r, VALUE name, VALUE loc) { - ractor_queue_setup(&r->sync.recv_queue); - ractor_queue_setup(&r->sync.takers_queue); - rb_native_mutex_initialize(&r->sync.lock); - rb_native_cond_initialize(&r->barrier_wait_cond); - -#ifdef RUBY_THREAD_WIN32_H - rb_native_cond_initialize(&r->barrier_wait_cond); -#endif - ccan_list_head_init(&r->sync.wait.waiting_threads); + ractor_sync_init(r); // thread management rb_thread_sched_init(&r->threads.sched, false); @@ -2255,69 +510,39 @@ ractor_create(rb_execution_context_t *ec, VALUE self, VALUE loc, VALUE name, VAL return rv; } +#if 0 static VALUE ractor_create_func(VALUE klass, VALUE loc, VALUE name, VALUE args, rb_block_call_func_t func) { VALUE block = rb_proc_new(func, Qnil); return ractor_create(rb_current_ec_noinline(), klass, loc, name, args, block); } +#endif static void -ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool exc) +ractor_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE result, bool exc) { - if (cr->sync.outgoing_port_closed) { - return; - } - - ASSERT_ractor_unlocking(cr); - - struct rb_ractor_queue *ts = &cr->sync.takers_queue; - rb_thread_t *cur_th = rb_ec_thread_ptr(ec); - - retry: - if (ractor_try_yield(ec, cr, ts, v, Qfalse, exc, true)) { - // OK. - } - else { - bool retry = false; - RACTOR_LOCK(cr); - { - if (!ractor_check_take_basket(cr, ts)) { - VM_ASSERT(cur_th->ractor_waiting.wait_status == wait_none); - RUBY_DEBUG_LOG("leave a will"); - ractor_basket_fill_will(cr, cur_th, &cr->sync.will_basket, v, exc); - } - else { - RUBY_DEBUG_LOG("rare timing!"); - retry = true; // another ractor is waiting for the yield. - } - } - RACTOR_UNLOCK(cr); - - if (retry) goto retry; - } + ractor_notify_exit(ec, cr, result, exc); } void rb_ractor_atexit(rb_execution_context_t *ec, VALUE result) { rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - ractor_yield_atexit(ec, cr, result, false); + ractor_atexit(ec, cr, result, false); } void rb_ractor_atexit_exception(rb_execution_context_t *ec) { rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - ractor_yield_atexit(ec, cr, ec->errinfo, true); + ractor_atexit(ec, cr, ec->errinfo, true); } void rb_ractor_teardown(rb_execution_context_t *ec) { rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - ractor_close_incoming(ec, cr); - ractor_close_outgoing(ec, cr); // sync with rb_ractor_terminate_interrupt_main_thread() RB_VM_LOCKING() { @@ -2330,7 +555,7 @@ void rb_ractor_receive_parameters(rb_execution_context_t *ec, rb_ractor_t *r, int len, VALUE *ptr) { for (int i=0; ilocal_storage_store_lock, ractor_local_value_store_i, (VALUE)&data); } -// Ractor::Channel (emulate with Ractor) - -typedef rb_ractor_t rb_ractor_channel_t; - -static VALUE -ractor_channel_func(RB_BLOCK_CALL_FUNC_ARGLIST(y, c)) -{ - rb_execution_context_t *ec = GET_EC(); - rb_ractor_t *cr = rb_ec_ractor_ptr(ec); - - while (1) { - int state; - - EC_PUSH_TAG(ec); - if ((state = EC_EXEC_TAG()) == TAG_NONE) { - VALUE obj = ractor_receive(ec, cr); - ractor_yield(ec, cr, obj, Qfalse); - } - EC_POP_TAG(); - - if (state) { - // ignore the error - break; - } - } - - return Qnil; -} - -static VALUE -rb_ractor_channel_new(void) -{ -#if 0 - return rb_funcall(rb_const_get(rb_cRactor, rb_intern("Channel")), rb_intern("new"), 0); -#else - // class Channel - // def self.new - // Ractor.new do # func body - // while true - // obj = Ractor.receive - // Ractor.yield obj - // end - // rescue Ractor::ClosedError - // nil - // end - // end - // end - - return ractor_create_func(rb_cRactor, Qnil, rb_str_new2("Ractor/channel"), rb_ary_new(), ractor_channel_func); -#endif -} - -static VALUE -rb_ractor_channel_yield(rb_execution_context_t *ec, VALUE vch, VALUE obj) -{ - VM_ASSERT(ec == rb_current_ec_noinline()); - rb_ractor_channel_t *ch = RACTOR_PTR(vch); - - ractor_send(ec, (rb_ractor_t *)ch, obj, Qfalse); - return Qnil; -} - -static VALUE -rb_ractor_channel_take(rb_execution_context_t *ec, VALUE vch) -{ - VM_ASSERT(ec == rb_current_ec_noinline()); - rb_ractor_channel_t *ch = RACTOR_PTR(vch); - - return ractor_take(ec, (rb_ractor_t *)ch); -} - -static VALUE -rb_ractor_channel_close(rb_execution_context_t *ec, VALUE vch) -{ - VM_ASSERT(ec == rb_current_ec_noinline()); - rb_ractor_channel_t *ch = RACTOR_PTR(vch); - - ractor_close_incoming(ec, (rb_ractor_t *)ch); - return ractor_close_outgoing(ec, (rb_ractor_t *)ch); -} - // Ractor#require struct cross_ractor_require { - VALUE ch; + VALUE port; VALUE result; VALUE exception; @@ -4179,9 +2290,8 @@ ractor_require_protect(struct cross_ractor_require *crr, VALUE (*func)(VALUE)) rb_rescue2(require_result_copy_body, (VALUE)crr, require_result_copy_resuce, (VALUE)crr, rb_eException, 0); - rb_ractor_channel_yield(GET_EC(), crr->ch, Qtrue); + ractor_port_send(GET_EC(), crr->port, Qtrue, Qfalse); return Qnil; - } static VALUE @@ -4197,7 +2307,7 @@ rb_ractor_require(VALUE feature) // TODO: make feature shareable struct cross_ractor_require crr = { .feature = feature, // TODO: ractor - .ch = rb_ractor_channel_new(), + .port = ractor_port_new(GET_RACTOR()), .result = Qundef, .exception = Qundef, }; @@ -4207,8 +2317,8 @@ rb_ractor_require(VALUE feature) rb_ractor_interrupt_exec(main_r, ractore_require_func, &crr, 0); // wait for require done - rb_ractor_channel_take(ec, crr.ch); - rb_ractor_channel_close(ec, crr.ch); + ractor_port_receive(ec, crr.port); + ractor_port_close(ec, crr.port); if (crr.exception != Qundef) { ractor_reset_belonging(crr.exception); @@ -4248,7 +2358,7 @@ rb_ractor_autoload_load(VALUE module, ID name) struct cross_ractor_require crr = { .module = module, .name = name, - .ch = rb_ractor_channel_new(), + .port = ractor_port_new(GET_RACTOR()), .result = Qundef, .exception = Qundef, }; @@ -4258,8 +2368,8 @@ rb_ractor_autoload_load(VALUE module, ID name) rb_ractor_interrupt_exec(main_r, ractor_autoload_load_func, &crr, 0); // wait for require done - rb_ractor_channel_take(ec, crr.ch); - rb_ractor_channel_close(ec, crr.ch); + ractor_port_receive(ec, crr.port); + ractor_port_close(ec, crr.port); if (crr.exception != Qundef) { rb_exc_raise(crr.exception); -- cgit v1.2.3