summaryrefslogtreecommitdiff
path: root/ractor.c
diff options
context:
space:
mode:
Diffstat (limited to 'ractor.c')
-rw-r--r--ractor.c2305
1 files changed, 800 insertions, 1505 deletions
diff --git a/ractor.c b/ractor.c
index e608e4e232..2dcbbd10a0 100644
--- a/ractor.c
+++ b/ractor.c
@@ -3,23 +3,26 @@
#include "ruby/ruby.h"
#include "ruby/thread.h"
#include "ruby/ractor.h"
+#include "ruby/re.h"
#include "ruby/thread_native.h"
#include "vm_core.h"
#include "vm_sync.h"
#include "ractor_core.h"
#include "internal/complex.h"
#include "internal/error.h"
+#include "internal/gc.h"
#include "internal/hash.h"
+#include "internal/object.h"
+#include "internal/ractor.h"
#include "internal/rational.h"
#include "internal/struct.h"
#include "internal/thread.h"
#include "variable.h"
-#include "gc.h"
-#include "transient_heap.h"
#include "yjit.h"
-#include "mjit.h"
+#include "zjit.h"
VALUE rb_cRactor;
+static VALUE rb_cRactorSelector;
VALUE rb_eRactorUnsafeError;
VALUE rb_eRactorIsolationError;
@@ -31,12 +34,19 @@ static VALUE rb_cRactorMovedObject;
static void vm_ractor_blocking_cnt_inc(rb_vm_t *vm, rb_ractor_t *r, const char *file, int line);
+
+#if RACTOR_CHECK_MODE > 0
+bool rb_ractor_ignore_belonging_flag = false;
+#endif
+
+// Ractor locking
+
static void
ASSERT_ractor_unlocking(rb_ractor_t *r)
{
#if RACTOR_CHECK_MODE > 0
- // GET_EC is NULL in an MJIT worker
- if (rb_current_execution_context(false) != NULL && r->sync.locked_by == rb_ractor_self(GET_RACTOR())) {
+ const rb_execution_context_t *ec = rb_current_ec_noinline();
+ if (ec != NULL && r->sync.locked_by == rb_ractor_self(rb_ec_ractor_ptr(ec))) {
rb_bug("recursive ractor locking");
}
#endif
@@ -46,8 +56,8 @@ static void
ASSERT_ractor_locking(rb_ractor_t *r)
{
#if RACTOR_CHECK_MODE > 0
- // GET_EC is NULL in an MJIT worker
- if (rb_current_execution_context(false) != NULL && r->sync.locked_by != rb_ractor_self(GET_RACTOR())) {
+ const rb_execution_context_t *ec = rb_current_ec_noinline();
+ if (ec != NULL && r->sync.locked_by != rb_ractor_self(rb_ec_ractor_ptr(ec))) {
rp(r->sync.locked_by);
rb_bug("ractor lock is not acquired.");
}
@@ -57,24 +67,32 @@ ASSERT_ractor_locking(rb_ractor_t *r)
static void
ractor_lock(rb_ractor_t *r, const char *file, int line)
{
- RUBY_DEBUG_LOG2(file, line, "locking r:%u%s", r->pub.id, GET_RACTOR() == r ? " (self)" : "");
+ RUBY_DEBUG_LOG2(file, line, "locking r:%u%s", r->pub.id, rb_current_ractor_raw(false) == r ? " (self)" : "");
ASSERT_ractor_unlocking(r);
rb_native_mutex_lock(&r->sync.lock);
+ const rb_execution_context_t *ec = rb_current_ec_noinline();
+ if (ec) {
+ rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
+ VM_ASSERT(!cr->malloc_gc_disabled);
+ cr->malloc_gc_disabled = true;
+ }
+
#if RACTOR_CHECK_MODE > 0
- if (rb_current_execution_context(false) != NULL) { // GET_EC is NULL in an MJIT worker
- r->sync.locked_by = rb_ractor_self(GET_RACTOR());
+ if (ec != NULL) {
+ rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
+ r->sync.locked_by = rb_ractor_self(cr);
}
#endif
- RUBY_DEBUG_LOG2(file, line, "locked r:%u%s", r->pub.id, GET_RACTOR() == r ? " (self)" : "");
+ RUBY_DEBUG_LOG2(file, line, "locked r:%u%s", r->pub.id, rb_current_ractor_raw(false) == r ? " (self)" : "");
}
static void
ractor_lock_self(rb_ractor_t *cr, const char *file, int line)
{
- VM_ASSERT(cr == GET_RACTOR());
+ VM_ASSERT(cr == rb_ec_ractor_ptr(rb_current_ec_noinline()));
#if RACTOR_CHECK_MODE > 0
VM_ASSERT(cr->sync.locked_by != cr->pub.self);
#endif
@@ -88,15 +106,23 @@ ractor_unlock(rb_ractor_t *r, const char *file, int line)
#if RACTOR_CHECK_MODE > 0
r->sync.locked_by = Qnil;
#endif
+
+ const rb_execution_context_t *ec = rb_current_ec_noinline();
+ if (ec) {
+ rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
+ VM_ASSERT(cr->malloc_gc_disabled);
+ cr->malloc_gc_disabled = false;
+ }
+
rb_native_mutex_unlock(&r->sync.lock);
- RUBY_DEBUG_LOG2(file, line, "r:%u%s", r->pub.id, GET_RACTOR() == r ? " (self)" : "");
+ RUBY_DEBUG_LOG2(file, line, "r:%u%s", r->pub.id, rb_current_ractor_raw(false) == r ? " (self)" : "");
}
static void
ractor_unlock_self(rb_ractor_t *cr, const char *file, int line)
{
- VM_ASSERT(cr == GET_RACTOR());
+ VM_ASSERT(cr == rb_ec_ractor_ptr(rb_current_ec_noinline()));
#if RACTOR_CHECK_MODE > 0
VM_ASSERT(cr->sync.locked_by == cr->pub.self);
#endif
@@ -108,20 +134,20 @@ ractor_unlock_self(rb_ractor_t *cr, const char *file, int line)
#define RACTOR_LOCK_SELF(r) ractor_lock_self(r, __FILE__, __LINE__)
#define RACTOR_UNLOCK_SELF(r) ractor_unlock_self(r, __FILE__, __LINE__)
-static void
-ractor_cond_wait(rb_ractor_t *r)
+void
+rb_ractor_lock_self(rb_ractor_t *r)
{
-#if RACTOR_CHECK_MODE > 0
- VALUE locked_by = r->sync.locked_by;
- r->sync.locked_by = Qnil;
-#endif
- rb_native_cond_wait(&r->sync.cond, &r->sync.lock);
+ RACTOR_LOCK_SELF(r);
+}
-#if RACTOR_CHECK_MODE > 0
- r->sync.locked_by = locked_by;
-#endif
+void
+rb_ractor_unlock_self(rb_ractor_t *r)
+{
+ RACTOR_UNLOCK_SELF(r);
}
+// Ractor status
+
static const char *
ractor_status_str(enum ractor_status status)
{
@@ -158,7 +184,7 @@ ractor_status_set(rb_ractor_t *r, enum ractor_status status)
VM_ASSERT(status == ractor_running);
break;
case ractor_terminated:
- VM_ASSERT(0); // unreachable
+ rb_bug("unreachable");
break;
}
@@ -171,86 +197,108 @@ ractor_status_p(rb_ractor_t *r, enum ractor_status status)
return rb_ractor_status_p(r, status);
}
-static struct rb_ractor_basket *ractor_queue_at(struct rb_ractor_queue *rq, int i);
+// Ractor data/mark/free
-static void
-ractor_queue_mark(struct rb_ractor_queue *rq)
+static void ractor_local_storage_mark(rb_ractor_t *r);
+static void ractor_local_storage_free(rb_ractor_t *r);
+
+static void ractor_sync_mark(rb_ractor_t *r);
+static void ractor_sync_free(rb_ractor_t *r);
+static size_t ractor_sync_memsize(const rb_ractor_t *r);
+static void ractor_sync_init(rb_ractor_t *r);
+
+static int
+mark_targeted_hook_list(st_data_t key, st_data_t value, st_data_t _arg)
{
- for (int i=0; i<rq->cnt; i++) {
- struct rb_ractor_basket *b = ractor_queue_at(rq, i);
- rb_gc_mark(b->v);
- rb_gc_mark(b->sender);
+ rb_hook_list_t *hook_list = (rb_hook_list_t*)value;
+
+ if (hook_list->type == hook_list_type_targeted_iseq) {
+ rb_gc_mark((VALUE)key);
}
-}
+ else {
+ rb_method_definition_t *def = (rb_method_definition_t*)key;
+ RUBY_ASSERT(hook_list->type == hook_list_type_targeted_def);
+ rb_gc_mark(def->body.bmethod.proc);
+ }
+ rb_hook_list_mark(hook_list);
-static void ractor_local_storage_mark(rb_ractor_t *r);
-static void ractor_local_storage_free(rb_ractor_t *r);
+ return ST_CONTINUE;
+}
static void
ractor_mark(void *ptr)
{
rb_ractor_t *r = (rb_ractor_t *)ptr;
-
- ractor_queue_mark(&r->sync.incoming_queue);
- rb_gc_mark(r->sync.wait.taken_basket.v);
- rb_gc_mark(r->sync.wait.taken_basket.sender);
- rb_gc_mark(r->sync.wait.yielded_basket.v);
- rb_gc_mark(r->sync.wait.yielded_basket.sender);
- rb_gc_mark(r->receiving_mutex);
+ bool checking_shareable = rb_gc_checking_shareable();
rb_gc_mark(r->loc);
rb_gc_mark(r->name);
- rb_gc_mark(r->r_stdin);
- rb_gc_mark(r->r_stdout);
- rb_gc_mark(r->r_stderr);
- rb_hook_list_mark(&r->pub.hooks);
-
- if (r->threads.cnt > 0) {
- rb_thread_t *th = 0;
- ccan_list_for_each(&r->threads.set, th, lt_node) {
- VM_ASSERT(th != NULL);
- rb_gc_mark(th->self);
+
+ if (!checking_shareable) {
+ // may unshareable objects
+ rb_gc_mark(r->r_stdin);
+ rb_gc_mark(r->r_stdout);
+ rb_gc_mark(r->r_stderr);
+ rb_gc_mark(r->verbose);
+ rb_gc_mark(r->debug);
+
+ // mark received messages
+ ractor_sync_mark(r);
+
+ rb_hook_list_mark(&r->pub.hooks);
+ if (r->pub.targeted_hooks) {
+ st_foreach(r->pub.targeted_hooks, mark_targeted_hook_list, 0);
}
- }
- ractor_local_storage_mark(r);
+ if (r->threads.cnt > 0) {
+ rb_thread_t *th = 0;
+ ccan_list_for_each(&r->threads.set, th, lt_node) {
+ VM_ASSERT(th != NULL);
+ rb_gc_mark(th->self);
+ }
+ }
+
+ ractor_local_storage_mark(r);
+ }
}
-static void
-ractor_queue_free(struct rb_ractor_queue *rq)
+static int
+free_targeted_hook_lists(st_data_t key, st_data_t val, st_data_t _arg)
{
- free(rq->baskets);
+ rb_hook_list_t *hook_list = (rb_hook_list_t*)val;
+ rb_hook_list_free(hook_list);
+ return ST_DELETE;
}
static void
-ractor_waiting_list_free(struct rb_ractor_waiting_list *wl)
+free_targeted_hooks(st_table *hooks_tbl)
{
- free(wl->ractors);
+ st_foreach(hooks_tbl, free_targeted_hook_lists, 0);
}
static void
ractor_free(void *ptr)
{
rb_ractor_t *r = (rb_ractor_t *)ptr;
+ RUBY_DEBUG_LOG("free r:%d", rb_ractor_id(r));
+ free_targeted_hooks(r->pub.targeted_hooks);
rb_native_mutex_destroy(&r->sync.lock);
- rb_native_cond_destroy(&r->sync.cond);
- ractor_queue_free(&r->sync.incoming_queue);
- ractor_waiting_list_free(&r->sync.taking_ractors);
+#ifdef RUBY_THREAD_WIN32_H
+ rb_native_cond_destroy(&r->sync.wakeup_cond);
+#endif
ractor_local_storage_free(r);
rb_hook_list_free(&r->pub.hooks);
- ruby_xfree(r);
-}
+ st_free_table(r->pub.targeted_hooks);
-static size_t
-ractor_queue_memsize(const struct rb_ractor_queue *rq)
-{
- return sizeof(struct rb_ractor_basket) * rq->size;
-}
+ if (r->newobj_cache) {
+ RUBY_ASSERT(r == ruby_single_main_ractor);
-static size_t
-ractor_waiting_list_memsize(const struct rb_ractor_waiting_list *wl)
-{
- return sizeof(rb_ractor_t *) * wl->size;
+ rb_gc_ractor_cache_free(r->newobj_cache);
+ r->newobj_cache = NULL;
+ }
+
+ ractor_sync_free(r);
+ ruby_xfree(r);
}
static size_t
@@ -258,10 +306,8 @@ ractor_memsize(const void *ptr)
{
rb_ractor_t *r = (rb_ractor_t *)ptr;
- // TODO
- return sizeof(rb_ractor_t) +
- ractor_queue_memsize(&r->sync.incoming_queue) +
- ractor_waiting_list_memsize(&r->sync.taking_ractors);
+ // TODO: more correct?
+ return sizeof(rb_ractor_t) + ractor_sync_memsize(r);
}
static const rb_data_type_t ractor_data_type = {
@@ -290,16 +336,14 @@ static inline rb_ractor_t *
RACTOR_PTR(VALUE self)
{
VM_ASSERT(rb_ractor_p(self));
-
rb_ractor_t *r = DATA_PTR(self);
- // TODO: check
return r;
}
static rb_atomic_t ractor_last_id;
#if RACTOR_CHECK_MODE > 0
-MJIT_FUNC_EXPORTED uint32_t
+uint32_t
rb_ractor_current_id(void)
{
if (GET_THREAD()->ractor == NULL) {
@@ -311,1095 +355,7 @@ rb_ractor_current_id(void)
}
#endif
-static void
-ractor_queue_setup(struct rb_ractor_queue *rq)
-{
- rq->size = 2;
- rq->cnt = 0;
- rq->start = 0;
- rq->baskets = malloc(sizeof(struct rb_ractor_basket) * rq->size);
-}
-
-static struct rb_ractor_basket *
-ractor_queue_at(struct rb_ractor_queue *rq, int i)
-{
- return &rq->baskets[(rq->start + i) % rq->size];
-}
-
-static void
-ractor_queue_advance(struct rb_ractor_queue *rq)
-{
- ASSERT_ractor_locking(GET_RACTOR());
-
- if (rq->reserved_cnt == 0) {
- rq->cnt--;
- rq->start = (rq->start + 1) % rq->size;
- rq->serial++;
- }
- else {
- ractor_queue_at(rq, 0)->type = basket_type_deleted;
- }
-}
-
-static bool
-ractor_queue_skip_p(struct rb_ractor_queue *rq, int i)
-{
- struct rb_ractor_basket *b = ractor_queue_at(rq, i);
- return b->type == basket_type_deleted ||
- b->type == basket_type_reserved;
-}
-
-static void
-ractor_queue_compact(rb_ractor_t *r, struct rb_ractor_queue *rq)
-{
- ASSERT_ractor_locking(r);
-
- while (rq->cnt > 0 && ractor_queue_at(rq, 0)->type == basket_type_deleted) {
- ractor_queue_advance(rq);
- }
-}
-
-static bool
-ractor_queue_empty_p(rb_ractor_t *r, struct rb_ractor_queue *rq)
-{
- ASSERT_ractor_locking(r);
-
- if (rq->cnt == 0) {
- return true;
- }
-
- ractor_queue_compact(r, rq);
-
- for (int i=0; i<rq->cnt; i++) {
- if (!ractor_queue_skip_p(rq, i)) {
- return false;
- }
- }
-
- return true;
-}
-
-static bool
-ractor_queue_deq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket)
-{
- bool found = false;
-
- RACTOR_LOCK(r);
- {
- if (!ractor_queue_empty_p(r, rq)) {
- for (int i=0; i<rq->cnt; i++) {
- if (!ractor_queue_skip_p(rq, i)) {
- struct rb_ractor_basket *b = ractor_queue_at(rq, i);
- *basket = *b;
-
- // remove from queue
- b->type = basket_type_deleted;
- ractor_queue_compact(r, rq);
- found = true;
- break;
- }
- }
- }
- }
- RACTOR_UNLOCK(r);
-
- return found;
-}
-
-static void
-ractor_queue_enq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket)
-{
- ASSERT_ractor_locking(r);
-
- if (rq->size <= rq->cnt) {
- rq->baskets = realloc(rq->baskets, sizeof(struct rb_ractor_basket) * rq->size * 2);
- for (int i=rq->size - rq->start; i<rq->cnt; i++) {
- rq->baskets[i + rq->start] = rq->baskets[i + rq->start - rq->size];
- }
- rq->size *= 2;
- }
- rq->baskets[(rq->start + rq->cnt++) % rq->size] = *basket;
- // fprintf(stderr, "%s %p->cnt:%d\n", RUBY_FUNCTION_NAME_STRING, (void *)rq, rq->cnt);
-}
-
-static void
-ractor_basket_clear(struct rb_ractor_basket *b)
-{
- b->type = basket_type_none;
- b->v = Qfalse;
- b->sender = Qfalse;
-}
-
-static VALUE ractor_reset_belonging(VALUE obj); // in this file
-
-static VALUE
-ractor_basket_value(struct rb_ractor_basket *b)
-{
- switch (b->type) {
- case basket_type_ref:
- break;
- case basket_type_copy:
- case basket_type_move:
- case basket_type_will:
- b->type = basket_type_ref;
- b->v = ractor_reset_belonging(b->v);
- break;
- default:
- rb_bug("unreachable");
- }
-
- return b->v;
-}
-
-static VALUE
-ractor_basket_accept(struct rb_ractor_basket *b)
-{
- VALUE v = ractor_basket_value(b);
-
- if (b->exception) {
- VALUE cause = v;
- VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor.");
- rb_ivar_set(err, rb_intern("@ractor"), b->sender);
- ractor_basket_clear(b);
- rb_ec_setup_exception(NULL, err, cause);
- rb_exc_raise(err);
- }
-
- ractor_basket_clear(b);
- return v;
-}
-
-static void
-ractor_recursive_receive_if(rb_ractor_t *r)
-{
- if (r->receiving_mutex && rb_mutex_owned_p(r->receiving_mutex)) {
- rb_raise(rb_eRactorError, "can not call receive/receive_if recursively");
- }
-}
-
-static VALUE
-ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *r)
-{
- struct rb_ractor_queue *rq = &r->sync.incoming_queue;
- struct rb_ractor_basket basket;
-
- ractor_recursive_receive_if(r);
-
- if (ractor_queue_deq(r, rq, &basket) == false) {
- if (r->sync.incoming_port_closed) {
- rb_raise(rb_eRactorClosedError, "The incoming port is already closed");
- }
- else {
- return Qundef;
- }
- }
-
- return ractor_basket_accept(&basket);
-}
-
-static bool
-ractor_sleeping_by(const rb_ractor_t *r, enum rb_ractor_wait_status wait_status)
-{
- return (r->sync.wait.status & wait_status) && r->sync.wait.wakeup_status == wakeup_none;
-}
-
-static bool
-ractor_wakeup(rb_ractor_t *r, enum rb_ractor_wait_status wait_status, enum rb_ractor_wakeup_status wakeup_status)
-{
- ASSERT_ractor_locking(r);
-
- // fprintf(stderr, "%s r:%p status:%s/%s wakeup_status:%s/%s\n", RUBY_FUNCTION_NAME_STRING, (void *)r,
- // wait_status_str(r->sync.wait.status), wait_status_str(wait_status),
- // wakeup_status_str(r->sync.wait.wakeup_status), wakeup_status_str(wakeup_status));
-
- if (ractor_sleeping_by(r, wait_status)) {
- r->sync.wait.wakeup_status = wakeup_status;
- rb_native_cond_signal(&r->sync.cond);
- return true;
- }
- else {
- return false;
- }
-}
-
-static void *
-ractor_sleep_wo_gvl(void *ptr)
-{
- rb_ractor_t *cr = ptr;
- RACTOR_LOCK_SELF(cr);
- {
- VM_ASSERT(cr->sync.wait.status != wait_none);
- if (cr->sync.wait.wakeup_status == wakeup_none) {
- ractor_cond_wait(cr);
- }
- cr->sync.wait.status = wait_none;
- }
- RACTOR_UNLOCK_SELF(cr);
- return NULL;
-}
-
-static void
-ractor_sleep_interrupt(void *ptr)
-{
- rb_ractor_t *r = ptr;
-
- RACTOR_LOCK(r);
- {
- ractor_wakeup(r, wait_receiving | wait_taking | wait_yielding, wakeup_by_interrupt);
- }
- RACTOR_UNLOCK(r);
-}
-
-#if USE_RUBY_DEBUG_LOG
-static const char *
-wait_status_str(enum rb_ractor_wait_status wait_status)
-{
- switch ((int)wait_status) {
- case wait_none: return "none";
- case wait_receiving: return "receiving";
- case wait_taking: return "taking";
- case wait_yielding: return "yielding";
- case wait_receiving|wait_taking: return "receiving|taking";
- case wait_receiving|wait_yielding: return "receiving|yielding";
- case wait_taking|wait_yielding: return "taking|yielding";
- case wait_receiving|wait_taking|wait_yielding: return "receiving|taking|yielding";
- }
- rb_bug("unreachable");
-}
-
-static const char *
-wakeup_status_str(enum rb_ractor_wakeup_status wakeup_status)
-{
- switch (wakeup_status) {
- case wakeup_none: return "none";
- case wakeup_by_send: return "by_send";
- case wakeup_by_yield: return "by_yield";
- case wakeup_by_take: return "by_take";
- case wakeup_by_close: return "by_close";
- case wakeup_by_interrupt: return "by_interrupt";
- case wakeup_by_retry: return "by_retry";
- }
- rb_bug("unreachable");
-}
-#endif // USE_RUBY_DEBUG_LOG
-
-static void
-ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr)
-{
- VM_ASSERT(GET_RACTOR() == cr);
- VM_ASSERT(cr->sync.wait.status != wait_none);
- // fprintf(stderr, "%s r:%p status:%s, wakeup_status:%s\n", RUBY_FUNCTION_NAME_STRING, (void *)cr,
- // wait_status_str(cr->sync.wait.status), wakeup_status_str(cr->sync.wait.wakeup_status));
-
- RACTOR_UNLOCK(cr);
- {
- rb_nogvl(ractor_sleep_wo_gvl, cr,
- ractor_sleep_interrupt, cr,
- RB_NOGVL_UBF_ASYNC_SAFE | RB_NOGVL_INTR_FAIL);
- }
- RACTOR_LOCK(cr);
-
- // rb_nogvl() can be canceled by interrupts
- if (cr->sync.wait.status != wait_none) {
- cr->sync.wait.status = wait_none;
- cr->sync.wait.wakeup_status = wakeup_by_interrupt;
-
- RACTOR_UNLOCK(cr);
- rb_thread_check_ints();
- RACTOR_LOCK(cr); // reachable?
- }
-}
-
-static void
-ractor_register_taking(rb_ractor_t *r, rb_ractor_t *cr)
-{
- VM_ASSERT(cr == GET_RACTOR());
- bool retry_try = false;
-
- RACTOR_LOCK(r);
- {
- if (ractor_sleeping_by(r, wait_yielding)) {
- // already waiting for yielding. retry try_take.
- retry_try = true;
- }
- else {
- // insert cr into taking list
- struct rb_ractor_waiting_list *wl = &r->sync.taking_ractors;
-
- for (int i=0; i<wl->cnt; i++) {
- if (wl->ractors[i] == cr) {
- // TODO: make it clean code.
- rb_native_mutex_unlock(&r->sync.lock);
- rb_raise(rb_eRuntimeError, "Already another thread of same ractor is waiting.");
- }
- }
-
- if (wl->size == 0) {
- wl->size = 1;
- wl->ractors = malloc(sizeof(rb_ractor_t *) * wl->size);
- if (wl->ractors == NULL) rb_bug("can't allocate buffer");
- }
- else if (wl->size <= wl->cnt + 1) {
- wl->size *= 2;
- wl->ractors = realloc(wl->ractors, sizeof(rb_ractor_t *) * wl->size);
- if (wl->ractors == NULL) rb_bug("can't re-allocate buffer");
- }
- wl->ractors[wl->cnt++] = cr;
- }
- }
- RACTOR_UNLOCK(r);
-
- if (retry_try) {
- RACTOR_LOCK(cr);
- {
- if (cr->sync.wait.wakeup_status == wakeup_none) {
- VM_ASSERT(cr->sync.wait.status != wait_none);
-
- cr->sync.wait.wakeup_status = wakeup_by_retry;
- cr->sync.wait.status = wait_none;
- }
- }
- RACTOR_UNLOCK(cr);
- }
-}
-
-static void
-ractor_waiting_list_del(rb_ractor_t *r, struct rb_ractor_waiting_list *wl, rb_ractor_t *wr)
-{
- RACTOR_LOCK(r);
- {
- int pos = -1;
- for (int i=0; i<wl->cnt; i++) {
- if (wl->ractors[i] == wr) {
- pos = i;
- break;
- }
- }
- if (pos >= 0) { // found
- wl->cnt--;
- for (int i=pos; i<wl->cnt; i++) {
- wl->ractors[i] = wl->ractors[i+1];
- }
- }
- }
- RACTOR_UNLOCK(r);
-}
-
-static rb_ractor_t *
-ractor_waiting_list_shift(rb_ractor_t *r, struct rb_ractor_waiting_list *wl)
-{
- ASSERT_ractor_locking(r);
- VM_ASSERT(&r->sync.taking_ractors == wl);
-
- if (wl->cnt > 0) {
- rb_ractor_t *tr = wl->ractors[0];
- for (int i=1; i<wl->cnt; i++) {
- wl->ractors[i-1] = wl->ractors[i];
- }
- wl->cnt--;
- return tr;
- }
- else {
- return NULL;
- }
-}
-
-static void
-ractor_receive_wait(rb_execution_context_t *ec, rb_ractor_t *cr)
-{
- VM_ASSERT(cr == rb_ec_ractor_ptr(ec));
- ractor_recursive_receive_if(cr);
-
- RACTOR_LOCK(cr);
- {
- if (ractor_queue_empty_p(cr, &cr->sync.incoming_queue)) {
- VM_ASSERT(cr->sync.wait.status == wait_none);
- cr->sync.wait.status = wait_receiving;
- cr->sync.wait.wakeup_status = wakeup_none;
- ractor_sleep(ec, cr);
- cr->sync.wait.wakeup_status = wakeup_none;
- }
- }
- RACTOR_UNLOCK(cr);
-}
-
-static VALUE
-ractor_receive(rb_execution_context_t *ec, rb_ractor_t *cr)
-{
- VM_ASSERT(cr == rb_ec_ractor_ptr(ec));
- VALUE v;
-
- while (UNDEF_P(v = ractor_try_receive(ec, cr))) {
- ractor_receive_wait(ec, cr);
- }
-
- return v;
-}
-
-#if 0
-// for debug
-static const char *
-basket_type_name(enum rb_ractor_basket_type type)
-{
- switch (type) {
-#define T(t) case basket_type_##t: return #t
- T(none);
- T(ref);
- T(copy);
- T(move);
- T(will);
- T(deleted);
- T(reserved);
- default: rb_bug("unreachable");
- }
-}
-
-static void
-rq_dump(struct rb_ractor_queue *rq)
-{
- bool bug = false;
- for (int i=0; i<rq->cnt; i++) {
- struct rb_ractor_basket *b = ractor_queue_at(rq, i);
- fprintf(stderr, "%d (start:%d) type:%s %p %s\n", i, rq->start, basket_type_name(b->type),
- (void *)b, RSTRING_PTR(RARRAY_AREF(b->v, 1)));
- if (b->type == basket_type_reserved) bug = true;
- }
- if (bug) rb_bug("!!");
-}
-#endif
-
-struct receive_block_data {
- rb_ractor_t *cr;
- struct rb_ractor_queue *rq;
- VALUE v;
- int index;
- bool success;
-};
-
-static void
-ractor_receive_if_lock(rb_ractor_t *cr)
-{
- VALUE m = cr->receiving_mutex;
- if (m == Qfalse) {
- m = cr->receiving_mutex = rb_mutex_new();
- }
- rb_mutex_lock(m);
-}
-
-static VALUE
-receive_if_body(VALUE ptr)
-{
- struct receive_block_data *data = (struct receive_block_data *)ptr;
-
- ractor_receive_if_lock(data->cr);
- VALUE block_result = rb_yield(data->v);
-
- RACTOR_LOCK_SELF(data->cr);
- {
- struct rb_ractor_basket *b = ractor_queue_at(data->rq, data->index);
- VM_ASSERT(b->type == basket_type_reserved);
- data->rq->reserved_cnt--;
-
- if (RTEST(block_result)) {
- b->type = basket_type_deleted;
- ractor_queue_compact(data->cr, data->rq);
- }
- else {
- b->type = basket_type_ref;
- }
- }
- RACTOR_UNLOCK_SELF(data->cr);
-
- data->success = true;
-
- if (RTEST(block_result)) {
- return data->v;
- }
- else {
- return Qundef;
- }
-}
-
-static VALUE
-receive_if_ensure(VALUE v)
-{
- struct receive_block_data *data = (struct receive_block_data *)v;
-
- if (!data->success) {
- RACTOR_LOCK_SELF(data->cr);
- {
- struct rb_ractor_basket *b = ractor_queue_at(data->rq, data->index);
- VM_ASSERT(b->type == basket_type_reserved);
- b->type = basket_type_deleted;
- data->rq->reserved_cnt--;
- }
- RACTOR_UNLOCK_SELF(data->cr);
- }
-
- rb_mutex_unlock(data->cr->receiving_mutex);
- return Qnil;
-}
-
-static VALUE
-ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b)
-{
- if (!RTEST(b)) rb_raise(rb_eArgError, "no block given");
-
- rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
- unsigned int serial = (unsigned int)-1;
- int index = 0;
- struct rb_ractor_queue *rq = &cr->sync.incoming_queue;
-
- while (1) {
- VALUE v = Qundef;
-
- ractor_receive_wait(ec, cr);
-
- RACTOR_LOCK_SELF(cr);
- {
- if (serial != rq->serial) {
- serial = rq->serial;
- index = 0;
- }
-
- // check newer version
- for (int i=index; i<rq->cnt; i++) {
- if (!ractor_queue_skip_p(rq, i)) {
- struct rb_ractor_basket *b = ractor_queue_at(rq, i);
- v = ractor_basket_value(b);
- b->type = basket_type_reserved;
- rq->reserved_cnt++;
- index = i;
- break;
- }
- }
- }
- RACTOR_UNLOCK_SELF(cr);
-
- if (!UNDEF_P(v)) {
- struct receive_block_data data = {
- .cr = cr,
- .rq = rq,
- .v = v,
- .index = index,
- .success = false,
- };
-
- VALUE result = rb_ensure(receive_if_body, (VALUE)&data,
- receive_if_ensure, (VALUE)&data);
-
- if (!UNDEF_P(result)) return result;
- index++;
- }
-
- RUBY_VM_CHECK_INTS(ec);
- }
-}
-
-static void
-ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_basket *b)
-{
- bool closed = false;
- struct rb_ractor_queue *rq = &r->sync.incoming_queue;
-
- RACTOR_LOCK(r);
- {
- if (r->sync.incoming_port_closed) {
- closed = true;
- }
- else {
- ractor_queue_enq(r, rq, b);
- if (ractor_wakeup(r, wait_receiving, wakeup_by_send)) {
- RUBY_DEBUG_LOG("wakeup");
- }
- }
- }
- RACTOR_UNLOCK(r);
-
- if (closed) {
- rb_raise(rb_eRactorClosedError, "The incoming-port is already closed");
- }
-}
-
-static VALUE ractor_move(VALUE obj); // in this file
-static VALUE ractor_copy(VALUE obj); // in this file
-
-static void
-ractor_basket_setup(rb_execution_context_t *ec, struct rb_ractor_basket *basket, VALUE obj, VALUE move, bool exc, bool is_will, bool is_yield)
-{
- basket->sender = rb_ec_ractor_ptr(ec)->pub.self;
- basket->exception = exc;
-
- if (is_will) {
- basket->type = basket_type_will;
- basket->v = obj;
- }
- else if (rb_ractor_shareable_p(obj)) {
- basket->type = basket_type_ref;
- basket->v = obj;
- }
- else if (!RTEST(move)) {
- basket->v = ractor_copy(obj);
- basket->type = basket_type_copy;
- }
- else {
- basket->type = basket_type_move;
-
- if (is_yield) {
- basket->v = obj; // call ractor_move() when yielding timing.
- }
- else {
- basket->v = ractor_move(obj);
- }
- }
-}
-
-static VALUE
-ractor_send(rb_execution_context_t *ec, rb_ractor_t *r, VALUE obj, VALUE move)
-{
- struct rb_ractor_basket basket;
- ractor_basket_setup(ec, &basket, obj, move, false, false, false);
- ractor_send_basket(ec, r, &basket);
- return r->pub.self;
-}
-
-static VALUE
-ractor_try_take(rb_execution_context_t *ec, rb_ractor_t *r)
-{
- struct rb_ractor_basket basket = {
- .type = basket_type_none,
- };
- bool closed = false;
-
- RACTOR_LOCK(r);
- {
- if (ractor_sleeping_by(r, wait_yielding)) {
- MAYBE_UNUSED(bool) wakeup_result;
- VM_ASSERT(r->sync.wait.yielded_basket.type != basket_type_none);
-
- if (r->sync.wait.yielded_basket.type == basket_type_move) {
- wakeup_result = ractor_wakeup(r, wait_yielding, wakeup_by_retry);
- }
- else {
- wakeup_result = ractor_wakeup(r, wait_yielding, wakeup_by_take);
- basket = r->sync.wait.yielded_basket;
- ractor_basket_clear(&r->sync.wait.yielded_basket);
- }
- VM_ASSERT(wakeup_result);
- }
- else if (r->sync.outgoing_port_closed) {
- closed = true;
- }
- }
- RACTOR_UNLOCK(r);
-
- if (basket.type == basket_type_none) {
- if (closed) {
- rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
- }
- else {
- return Qundef;
- }
- }
- else {
- return ractor_basket_accept(&basket);
- }
-}
-
-static VALUE
-ractor_yield_move_body(VALUE v)
-{
- return ractor_move(v);
-}
-
-static bool
-ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_basket *basket)
-{
- ASSERT_ractor_unlocking(cr);
- VM_ASSERT(basket->type != basket_type_none);
-
- if (cr->sync.outgoing_port_closed) {
- rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
- }
-
- rb_ractor_t *r;
-
- retry_shift:
- RACTOR_LOCK(cr);
- {
- r = ractor_waiting_list_shift(cr, &cr->sync.taking_ractors);
- }
- RACTOR_UNLOCK(cr);
-
- if (r) {
- bool retry_shift = false;
-
- RACTOR_LOCK(r);
- {
- if (ractor_sleeping_by(r, wait_taking)) {
- VM_ASSERT(r->sync.wait.taken_basket.type == basket_type_none);
-
- if (basket->type == basket_type_move) {
- enum rb_ractor_wait_status prev_wait_status = r->sync.wait.status;
- r->sync.wait.status = wait_moving;
-
- RACTOR_UNLOCK(r);
- {
- int state;
- VALUE moved_value = rb_protect(ractor_yield_move_body, basket->v, &state);
- if (state) {
- r->sync.wait.status = prev_wait_status;
- rb_jump_tag(state);
- }
- else {
- basket->v = moved_value;
- }
- }
- RACTOR_LOCK(r);
-
- if (!ractor_wakeup(r, wait_moving, wakeup_by_yield)) {
- // terminating?
- }
- }
- else {
- ractor_wakeup(r, wait_taking, wakeup_by_yield);
- }
- r->sync.wait.taken_basket = *basket;
- }
- else {
- retry_shift = true;
- }
- }
- RACTOR_UNLOCK(r);
-
- if (retry_shift) {
- // get candidate take-waiting ractor, but already woke up by another reason.
- // retry to check another ractor.
- goto retry_shift;
- }
- else {
- return true;
- }
- }
- else {
- return false;
- }
-}
-
-// select(r1, r2, r3, receive: true, yield: obj)
-static VALUE
-ractor_select(rb_execution_context_t *ec, const VALUE *rs, const int rs_len, VALUE yielded_value, bool move, VALUE *ret_r)
-{
- rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
- VALUE crv = cr->pub.self;
- VALUE ret = Qundef;
- int i;
- bool interrupted = false;
- enum rb_ractor_wait_status wait_status = 0;
- bool yield_p = !UNDEF_P(yielded_value) ? true : false;
- const int alen = rs_len + (yield_p ? 1 : 0);
-
- struct ractor_select_action {
- enum ractor_select_action_type {
- ractor_select_action_take,
- ractor_select_action_receive,
- ractor_select_action_yield,
- } type;
- VALUE v;
- } *actions = ALLOCA_N(struct ractor_select_action, alen);
-
- VM_ASSERT(cr->sync.wait.status == wait_none);
- VM_ASSERT(cr->sync.wait.wakeup_status == wakeup_none);
- VM_ASSERT(cr->sync.wait.taken_basket.type == basket_type_none);
- VM_ASSERT(cr->sync.wait.yielded_basket.type == basket_type_none);
-
- // setup actions
- for (i=0; i<rs_len; i++) {
- VALUE v = rs[i];
-
- if (v == crv) {
- actions[i].type = ractor_select_action_receive;
- actions[i].v = Qnil;
- wait_status |= wait_receiving;
- }
- else if (rb_ractor_p(v)) {
- actions[i].type = ractor_select_action_take;
- actions[i].v = v;
- wait_status |= wait_taking;
- }
- else {
- rb_raise(rb_eArgError, "should be a ractor object, but %"PRIsVALUE, v);
- }
- }
- rs = NULL;
-
- restart:
-
- if (yield_p) {
- actions[rs_len].type = ractor_select_action_yield;
- actions[rs_len].v = Qundef;
- wait_status |= wait_yielding;
- ractor_basket_setup(ec, &cr->sync.wait.yielded_basket, yielded_value, move, false, false, true);
- }
-
- // TODO: shuffle actions
-
- while (1) {
- RUBY_DEBUG_LOG("try actions (%s)", wait_status_str(wait_status));
-
- for (i=0; i<alen; i++) {
- VALUE v, rv;
- switch (actions[i].type) {
- case ractor_select_action_take:
- rv = actions[i].v;
- v = ractor_try_take(ec, RACTOR_PTR(rv));
- if (!UNDEF_P(v)) {
- *ret_r = rv;
- ret = v;
- goto cleanup;
- }
- break;
- case ractor_select_action_receive:
- v = ractor_try_receive(ec, cr);
- if (!UNDEF_P(v)) {
- *ret_r = ID2SYM(rb_intern("receive"));
- ret = v;
- goto cleanup;
- }
- break;
- case ractor_select_action_yield:
- {
- if (ractor_try_yield(ec, cr, &cr->sync.wait.yielded_basket)) {
- *ret_r = ID2SYM(rb_intern("yield"));
- ret = Qnil;
- goto cleanup;
- }
- }
- break;
- }
- }
-
- RUBY_DEBUG_LOG("wait actions (%s)", wait_status_str(wait_status));
-
- RACTOR_LOCK(cr);
- {
- VM_ASSERT(cr->sync.wait.status == wait_none);
- cr->sync.wait.status = wait_status;
- cr->sync.wait.wakeup_status = wakeup_none;
- }
- RACTOR_UNLOCK(cr);
-
- // prepare waiting
- for (i=0; i<alen; i++) {
- rb_ractor_t *r;
- switch (actions[i].type) {
- case ractor_select_action_take:
- r = RACTOR_PTR(actions[i].v);
- ractor_register_taking(r, cr);
- break;
- case ractor_select_action_yield:
- case ractor_select_action_receive:
- break;
- }
- }
-
- // wait
- RACTOR_LOCK(cr);
- {
- if (cr->sync.wait.wakeup_status == wakeup_none) {
- for (i=0; i<alen; i++) {
- rb_ractor_t *r;
-
- switch (actions[i].type) {
- case ractor_select_action_take:
- r = RACTOR_PTR(actions[i].v);
- if (ractor_sleeping_by(r, wait_yielding)) {
- RUBY_DEBUG_LOG("wakeup_none, but r:%u is waiting for yielding", r->pub.id);
- cr->sync.wait.wakeup_status = wakeup_by_retry;
- goto skip_sleep;
- }
- break;
- case ractor_select_action_receive:
- if (cr->sync.incoming_queue.cnt > 0) {
- RUBY_DEBUG_LOG("wakeup_none, but incoming_queue has %u messages", cr->sync.incoming_queue.cnt);
- cr->sync.wait.wakeup_status = wakeup_by_retry;
- goto skip_sleep;
- }
- break;
- case ractor_select_action_yield:
- if (cr->sync.taking_ractors.cnt > 0) {
- RUBY_DEBUG_LOG("wakeup_none, but %u taking_ractors are waiting", cr->sync.taking_ractors.cnt);
- cr->sync.wait.wakeup_status = wakeup_by_retry;
- goto skip_sleep;
- }
- else if (cr->sync.outgoing_port_closed) {
- cr->sync.wait.wakeup_status = wakeup_by_close;
- goto skip_sleep;
- }
- break;
- }
- }
-
- RUBY_DEBUG_LOG("sleep %s", wait_status_str(cr->sync.wait.status));
- ractor_sleep(ec, cr);
- RUBY_DEBUG_LOG("awaken %s", wakeup_status_str(cr->sync.wait.wakeup_status));
- }
- else {
- skip_sleep:
- RUBY_DEBUG_LOG("no need to sleep %s->%s",
- wait_status_str(cr->sync.wait.status),
- wakeup_status_str(cr->sync.wait.wakeup_status));
- cr->sync.wait.status = wait_none;
- }
- }
- RACTOR_UNLOCK(cr);
-
- // cleanup waiting
- for (i=0; i<alen; i++) {
- rb_ractor_t *r;
- switch (actions[i].type) {
- case ractor_select_action_take:
- r = RACTOR_PTR(actions[i].v);
- ractor_waiting_list_del(r, &r->sync.taking_ractors, cr);
- break;
- case ractor_select_action_receive:
- case ractor_select_action_yield:
- break;
- }
- }
-
- // check results
- enum rb_ractor_wakeup_status wakeup_status = cr->sync.wait.wakeup_status;
- cr->sync.wait.wakeup_status = wakeup_none;
-
- switch (wakeup_status) {
- case wakeup_none:
- // OK. something happens.
- // retry loop.
- break;
- case wakeup_by_retry:
- // Retry request.
- break;
- case wakeup_by_send:
- // OK.
- // retry loop and try_receive will succss.
- break;
- case wakeup_by_yield:
- // take was succeeded!
- // cr.wait.taken_basket contains passed block
- VM_ASSERT(cr->sync.wait.taken_basket.type != basket_type_none);
- *ret_r = cr->sync.wait.taken_basket.sender;
- VM_ASSERT(rb_ractor_p(*ret_r));
- ret = ractor_basket_accept(&cr->sync.wait.taken_basket);
- goto cleanup;
- case wakeup_by_take:
- *ret_r = ID2SYM(rb_intern("yield"));
- ret = Qnil;
- goto cleanup;
- case wakeup_by_close:
- // OK.
- // retry loop and will get CloseError.
- break;
- case wakeup_by_interrupt:
- ret = Qundef;
- interrupted = true;
- goto cleanup;
- }
- }
-
- cleanup:
- RUBY_DEBUG_LOG("cleanup actions (%s)", wait_status_str(wait_status));
-
- if (cr->sync.wait.yielded_basket.type != basket_type_none) {
- ractor_basket_clear(&cr->sync.wait.yielded_basket);
- }
-
- VM_ASSERT(cr->sync.wait.status == wait_none);
- VM_ASSERT(cr->sync.wait.wakeup_status == wakeup_none);
- VM_ASSERT(cr->sync.wait.taken_basket.type == basket_type_none);
- VM_ASSERT(cr->sync.wait.yielded_basket.type == basket_type_none);
-
- if (interrupted) {
- rb_vm_check_ints_blocking(ec);
- interrupted = false;
- goto restart;
- }
-
- VM_ASSERT(!UNDEF_P(ret));
- return ret;
-}
-
-static VALUE
-ractor_yield(rb_execution_context_t *ec, rb_ractor_t *r, VALUE obj, VALUE move)
-{
- VALUE ret_r;
- ractor_select(ec, NULL, 0, obj, RTEST(move) ? true : false, &ret_r);
- return Qnil;
-}
-
-static VALUE
-ractor_take(rb_execution_context_t *ec, rb_ractor_t *r)
-{
- VALUE ret_r;
- VALUE v = ractor_select(ec, &r->pub.self, 1, Qundef, false, &ret_r);
- return v;
-}
-
-static VALUE
-ractor_close_incoming(rb_execution_context_t *ec, rb_ractor_t *r)
-{
- VALUE prev;
-
- RACTOR_LOCK(r);
- {
- if (!r->sync.incoming_port_closed) {
- prev = Qfalse;
- r->sync.incoming_port_closed = true;
- if (ractor_wakeup(r, wait_receiving, wakeup_by_close)) {
- VM_ASSERT(r->sync.incoming_queue.cnt == 0);
- RUBY_DEBUG_LOG("cancel receiving");
- }
- }
- else {
- prev = Qtrue;
- }
- }
- RACTOR_UNLOCK(r);
- return prev;
-}
-
-static VALUE
-ractor_close_outgoing(rb_execution_context_t *ec, rb_ractor_t *r)
-{
- VALUE prev;
-
- RACTOR_LOCK(r);
- {
- if (!r->sync.outgoing_port_closed) {
- prev = Qfalse;
- r->sync.outgoing_port_closed = true;
- }
- else {
- prev = Qtrue;
- }
-
- // wakeup all taking ractors
- rb_ractor_t *taking_ractor;
- while ((taking_ractor = ractor_waiting_list_shift(r, &r->sync.taking_ractors)) != NULL) {
- RACTOR_LOCK(taking_ractor);
- ractor_wakeup(taking_ractor, wait_taking, wakeup_by_close);
- RACTOR_UNLOCK(taking_ractor);
- }
-
- // raising yielding Ractor
- if (!r->yield_atexit &&
- ractor_wakeup(r, wait_yielding, wakeup_by_close)) {
- RUBY_DEBUG_LOG("cancel yielding");
- }
- }
- RACTOR_UNLOCK(r);
- return prev;
-}
+#include "ractor_sync.c"
// creation/termination
@@ -1421,6 +377,13 @@ vm_insert_ractor0(rb_vm_t *vm, rb_ractor_t *r, bool single_ractor_mode)
ccan_list_add_tail(&vm->ractor.set, &r->vmlr_node);
vm->ractor.cnt++;
+
+ if (r->newobj_cache) {
+ VM_ASSERT(r == ruby_single_main_ractor);
+ }
+ else {
+ r->newobj_cache = rb_gc_ractor_cache_alloc(r);
+ }
}
static void
@@ -1429,16 +392,8 @@ cancel_single_ractor_mode(void)
// enable multi-ractor mode
RUBY_DEBUG_LOG("enable multi-ractor mode");
- VALUE was_disabled = rb_gc_enable();
-
- rb_gc_start();
- rb_transient_heap_evacuate();
-
- if (was_disabled) {
- rb_gc_disable();
- }
-
ruby_single_main_ractor = NULL;
+ rb_funcall(rb_cRactor, rb_intern("_activated"), 0);
}
static void
@@ -1489,8 +444,8 @@ vm_remove_ractor(rb_vm_t *vm, rb_ractor_t *cr)
}
vm->ractor.cnt--;
- /* Clear the cached freelist to prevent a memory leak. */
- rb_gc_ractor_newobj_cache_clear(&cr->newobj_cache);
+ rb_gc_ractor_cache_free(cr->newobj_cache);
+ cr->newobj_cache = NULL;
ractor_status_set(cr, ractor_terminated);
}
@@ -1504,6 +459,7 @@ ractor_alloc(VALUE klass)
VALUE rv = TypedData_Make_Struct(klass, rb_ractor_t, &ractor_data_type, r);
FL_SET_RAW(rv, RUBY_FL_SHAREABLE);
r->pub.self = rv;
+ r->next_ec_serial = 1;
VM_ASSERT(ractor_status_p(r, ractor_created));
return rv;
}
@@ -1511,22 +467,25 @@ ractor_alloc(VALUE klass)
rb_ractor_t *
rb_ractor_main_alloc(void)
{
- rb_ractor_t *r = ruby_mimmalloc(sizeof(rb_ractor_t));
+ rb_ractor_t *r = ruby_mimcalloc(1, sizeof(rb_ractor_t));
if (r == NULL) {
fprintf(stderr, "[FATAL] failed to allocate memory for main ractor\n");
exit(EXIT_FAILURE);
}
- MEMZERO(r, rb_ractor_t, 1);
r->pub.id = ++ractor_last_id;
r->loc = Qnil;
r->name = Qnil;
r->pub.self = Qnil;
+ r->newobj_cache = rb_gc_ractor_cache_alloc(r);
+ r->next_ec_serial = 1;
ruby_single_main_ractor = r;
return r;
}
#if defined(HAVE_WORKING_FORK)
+// Set up the main Ractor for the VM after fork.
+// Puts us in "single Ractor mode"
void
rb_ractor_atfork(rb_vm_t *vm, rb_thread_t *th)
{
@@ -1542,9 +501,18 @@ rb_ractor_atfork(rb_vm_t *vm, rb_thread_t *th)
VM_ASSERT(vm->ractor.blocking_cnt == 0);
VM_ASSERT(vm->ractor.cnt == 1);
}
+
+void
+rb_ractor_terminate_atfork(rb_vm_t *vm, rb_ractor_t *r)
+{
+ rb_gc_ractor_cache_free(r->newobj_cache);
+ r->newobj_cache = NULL;
+ r->status_ = ractor_terminated;
+ ractor_sync_terminate_atfork(vm, r);
+}
#endif
-void rb_thread_sched_init(struct rb_thread_sched *);
+void rb_thread_sched_init(struct rb_thread_sched *, bool atfork);
void
rb_ractor_living_threads_init(rb_ractor_t *r)
@@ -1557,13 +525,12 @@ rb_ractor_living_threads_init(rb_ractor_t *r)
static void
ractor_init(rb_ractor_t *r, VALUE name, VALUE loc)
{
- ractor_queue_setup(&r->sync.incoming_queue);
- rb_native_mutex_initialize(&r->sync.lock);
- rb_native_cond_initialize(&r->sync.cond);
- rb_native_cond_initialize(&r->barrier_wait_cond);
+ ractor_sync_init(r);
+ r->pub.targeted_hooks = st_init_numtable();
+ r->pub.hooks.type = hook_list_type_ractor_local;
// thread management
- rb_thread_sched_init(&r->threads.sched);
+ rb_thread_sched_init(&r->threads.sched, false);
rb_ractor_living_threads_init(r);
// naming
@@ -1575,20 +542,24 @@ ractor_init(rb_ractor_t *r, VALUE name, VALUE loc)
rb_raise(rb_eArgError, "ASCII incompatible encoding (%s)",
rb_enc_name(enc));
}
- name = rb_str_new_frozen(name);
+ name = RB_OBJ_SET_SHAREABLE(rb_str_new_frozen(name));
}
- r->name = name;
+
+ if (!SPECIAL_CONST_P(loc)) RB_OBJ_SET_SHAREABLE(loc);
r->loc = loc;
+ r->name = name;
}
void
rb_ractor_main_setup(rb_vm_t *vm, rb_ractor_t *r, rb_thread_t *th)
{
- r->pub.self = TypedData_Wrap_Struct(rb_cRactor, &ractor_data_type, r);
+ VALUE rv = r->pub.self = TypedData_Wrap_Struct(rb_cRactor, &ractor_data_type, r);
FL_SET_RAW(r->pub.self, RUBY_FL_SHAREABLE);
ractor_init(r, Qnil, Qnil);
r->threads.main = th;
rb_ractor_living_threads_insert(r, th);
+
+ RB_GC_GUARD(rv);
}
static VALUE
@@ -1598,7 +569,6 @@ ractor_create(rb_execution_context_t *ec, VALUE self, VALUE loc, VALUE name, VAL
rb_ractor_t *r = RACTOR_PTR(rv);
ractor_init(r, name, loc);
- // can block here
r->pub.id = ractor_next_id();
RUBY_DEBUG_LOG("r:%u", r->pub.id);
@@ -1607,88 +577,59 @@ ractor_create(rb_execution_context_t *ec, VALUE self, VALUE loc, VALUE name, VAL
r->debug = cr->debug;
rb_yjit_before_ractor_spawn();
- rb_mjit_before_ractor_spawn();
+ rb_zjit_before_ractor_spawn();
rb_thread_create_ractor(r, args, block);
RB_GC_GUARD(rv);
return rv;
}
-static void
-ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool exc)
+#if 0
+static VALUE
+ractor_create_func(VALUE klass, VALUE loc, VALUE name, VALUE args, rb_block_call_func_t func)
{
- if (cr->sync.outgoing_port_closed) {
- return;
- }
-
- ASSERT_ractor_unlocking(cr);
-
- struct rb_ractor_basket basket;
- ractor_basket_setup(ec, &basket, v, Qfalse, exc, true, true /* this flag is ignored because move is Qfalse */);
-
- retry:
- if (ractor_try_yield(ec, cr, &basket)) {
- // OK.
- }
- else {
- bool retry = false;
- RACTOR_LOCK(cr);
- {
- if (cr->sync.taking_ractors.cnt == 0) {
- cr->sync.wait.yielded_basket = basket;
-
- VM_ASSERT(cr->sync.wait.status == wait_none);
- cr->sync.wait.status = wait_yielding;
- cr->sync.wait.wakeup_status = wakeup_none;
-
- VM_ASSERT(cr->yield_atexit == false);
- cr->yield_atexit = true;
- }
- else {
- retry = true; // another ractor is waiting for the yield.
- }
- }
- RACTOR_UNLOCK(cr);
+ VALUE block = rb_proc_new(func, Qnil);
+ return ractor_create(rb_current_ec_noinline(), klass, loc, name, args, block);
+}
+#endif
- if (retry) goto retry;
- }
+static void
+ractor_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE result, bool exc)
+{
+ ractor_notify_exit(ec, cr, result, exc);
}
void
-rb_ractor_teardown(rb_execution_context_t *ec)
+rb_ractor_atexit(rb_execution_context_t *ec, VALUE result)
{
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
- ractor_close_incoming(ec, cr);
- ractor_close_outgoing(ec, cr);
-
- // sync with rb_ractor_terminate_interrupt_main_thread()
- RB_VM_LOCK_ENTER();
- {
- VM_ASSERT(cr->threads.main != NULL);
- cr->threads.main = NULL;
- }
- RB_VM_LOCK_LEAVE();
+ ractor_atexit(ec, cr, result, false);
}
void
-rb_ractor_atexit(rb_execution_context_t *ec, VALUE result)
+rb_ractor_atexit_exception(rb_execution_context_t *ec)
{
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
- ractor_yield_atexit(ec, cr, result, false);
+ ractor_atexit(ec, cr, ec->errinfo, true);
}
void
-rb_ractor_atexit_exception(rb_execution_context_t *ec)
+rb_ractor_teardown(rb_execution_context_t *ec)
{
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
- ractor_yield_atexit(ec, cr, ec->errinfo, true);
+
+ // sync with rb_ractor_terminate_interrupt_main_thread()
+ RB_VM_LOCKING() {
+ VM_ASSERT(cr->threads.main != NULL);
+ cr->threads.main = NULL;
+ }
}
void
rb_ractor_receive_parameters(rb_execution_context_t *ec, rb_ractor_t *r, int len, VALUE *ptr)
{
for (int i=0; i<len; i++) {
- ptr[i] = ractor_receive(ec, r);
+ ptr[i] = ractor_receive(ec, ractor_default_port(r));
}
}
@@ -1697,11 +638,11 @@ rb_ractor_send_parameters(rb_execution_context_t *ec, rb_ractor_t *r, VALUE args
{
int len = RARRAY_LENINT(args);
for (int i=0; i<len; i++) {
- ractor_send(ec, r, RARRAY_AREF(args, i), false);
+ ractor_send(ec, ractor_default_port(r), RARRAY_AREF(args, i), false);
}
}
-MJIT_FUNC_EXPORTED bool
+bool
rb_ractor_main_p_(void)
{
VM_ASSERT(rb_multi_ractor_p());
@@ -1709,49 +650,30 @@ rb_ractor_main_p_(void)
return rb_ec_ractor_ptr(ec) == rb_ec_vm_ptr(ec)->ractor.main_ractor;
}
-bool
-rb_obj_is_main_ractor(VALUE gv)
-{
- if (!rb_ractor_p(gv)) return false;
- rb_ractor_t *r = DATA_PTR(gv);
- return r == GET_VM()->ractor.main_ractor;
-}
-
int
rb_ractor_living_thread_num(const rb_ractor_t *r)
{
return r->threads.cnt;
}
+// only for current ractor
VALUE
-rb_ractor_thread_list(rb_ractor_t *r)
+rb_ractor_thread_list(void)
{
+ rb_ractor_t *r = GET_RACTOR();
rb_thread_t *th = 0;
- VALUE *ts;
- int ts_cnt;
+ VALUE ary = rb_ary_new();
- RACTOR_LOCK(r);
- {
- ts = ALLOCA_N(VALUE, r->threads.cnt);
- ts_cnt = 0;
-
- ccan_list_for_each(&r->threads.set, th, lt_node) {
- switch (th->status) {
- case THREAD_RUNNABLE:
- case THREAD_STOPPED:
- case THREAD_STOPPED_FOREVER:
- ts[ts_cnt++] = th->self;
- default:
- break;
- }
+ ccan_list_for_each(&r->threads.set, th, lt_node) {
+ switch (th->status) {
+ case THREAD_RUNNABLE:
+ case THREAD_STOPPED:
+ case THREAD_STOPPED_FOREVER:
+ rb_ary_push(ary, th->self);
+ default:
+ break;
}
}
- RACTOR_UNLOCK(r);
-
- VALUE ary = rb_ary_new();
- for (int i=0; i<ts_cnt; i++) {
- rb_ary_push(ary, ts[i]);
- }
return ary;
}
@@ -1824,16 +746,15 @@ ractor_check_blocking(rb_ractor_t *cr, unsigned int remained_thread_cnt, const c
cr->threads.cnt == cr->threads.blocking_cnt + 1) {
// change ractor status: running -> blocking
rb_vm_t *vm = GET_VM();
- ASSERT_vm_unlocking();
- RB_VM_LOCK();
- {
+ RB_VM_LOCKING() {
rb_vm_ractor_blocking_cnt_inc(vm, cr, file, line);
}
- RB_VM_UNLOCK();
}
}
+void rb_threadptr_remove(rb_thread_t *th);
+
void
rb_ractor_living_threads_remove(rb_ractor_t *cr, rb_thread_t *th)
{
@@ -1841,6 +762,8 @@ rb_ractor_living_threads_remove(rb_ractor_t *cr, rb_thread_t *th)
RUBY_DEBUG_LOG("r->threads.cnt:%d--", cr->threads.cnt);
ractor_check_blocking(cr, cr->threads.cnt - 1, __FILE__, __LINE__);
+ rb_threadptr_remove(th);
+
if (cr->threads.cnt == 1) {
vm_remove_ractor(th->vm, cr);
}
@@ -1878,11 +801,9 @@ rb_ractor_blocking_threads_dec(rb_ractor_t *cr, const char *file, int line)
if (cr->threads.cnt == cr->threads.blocking_cnt) {
rb_vm_t *vm = GET_VM();
- RB_VM_LOCK_ENTER();
- {
+ RB_VM_LOCKING() {
rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__);
}
- RB_VM_LOCK_LEAVE();
}
cr->threads.blocking_cnt--;
@@ -1936,25 +857,31 @@ ractor_terminal_interrupt_all(rb_vm_t *vm)
rb_ractor_t *r = 0;
ccan_list_for_each(&vm->ractor.set, r, vmlr_node) {
if (r != vm->ractor.main_ractor) {
+ RUBY_DEBUG_LOG("r:%d", rb_ractor_id(r));
rb_ractor_terminate_interrupt_main_thread(r);
}
}
}
}
+void rb_add_running_thread(rb_thread_t *th);
+void rb_del_running_thread(rb_thread_t *th);
+
void
rb_ractor_terminate_all(void)
{
rb_vm_t *vm = GET_VM();
rb_ractor_t *cr = vm->ractor.main_ractor;
+ RUBY_DEBUG_LOG("ractor.cnt:%d", (int)vm->ractor.cnt);
+
VM_ASSERT(cr == GET_RACTOR()); // only main-ractor's main-thread should kick it.
- if (vm->ractor.cnt > 1) {
- RB_VM_LOCK();
+ RB_VM_LOCK();
+ {
ractor_terminal_interrupt_all(vm); // kill all ractors
- RB_VM_UNLOCK();
}
+ RB_VM_UNLOCK();
rb_thread_terminate_all(GET_THREAD()); // kill other threads in main-ractor and wait
RB_VM_LOCK();
@@ -1965,7 +892,20 @@ rb_ractor_terminate_all(void)
// wait for 1sec
rb_vm_ractor_blocking_cnt_inc(vm, cr, __FILE__, __LINE__);
+ rb_del_running_thread(rb_ec_thread_ptr(cr->threads.running_ec));
rb_vm_cond_timedwait(vm, &vm->ractor.sync.terminate_cond, 1000 /* ms */);
+#ifdef RUBY_THREAD_PTHREAD_H
+ while (vm->ractor.sched.barrier_waiting) {
+ // A barrier is waiting. Threads relinquish the VM lock before joining the barrier and
+ // since we just acquired the VM lock back, we're blocking other threads from joining it.
+ // We loop until the barrier is over. We can't join this barrier because our thread isn't added to
+ // running_threads until the call below to `rb_add_running_thread`.
+ RB_VM_UNLOCK();
+ unsigned int lev;
+ RB_VM_LOCK_ENTER_LEV_NB(&lev);
+ }
+#endif
+ rb_add_running_thread(rb_ec_thread_ptr(cr->threads.running_ec));
rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__);
ractor_terminal_interrupt_all(vm);
@@ -1977,7 +917,23 @@ rb_ractor_terminate_all(void)
rb_execution_context_t *
rb_vm_main_ractor_ec(rb_vm_t *vm)
{
- return vm->ractor.main_ractor->threads.running_ec;
+ /* This code needs to carefully work around two bugs:
+ * - Bug #20016: When M:N threading is enabled, running_ec is NULL if no thread is
+ * actually currently running (as opposed to without M:N threading, when
+ * running_ec will still point to the _last_ thread which ran)
+ * - Bug #20197: If the main thread is sleeping, setting its postponed job
+ * interrupt flag is pointless; it won't look at the flag until it stops sleeping
+ * for some reason. It would be better to set the flag on the running ec, which
+ * will presumably look at it soon.
+ *
+ * Solution: use running_ec if it's set, otherwise fall back to the main thread ec.
+ * This is still susceptible to some rare race conditions (what if the last thread
+ * to run just entered a long-running sleep?), but seems like the best balance of
+ * robustness and complexity.
+ */
+ rb_execution_context_t *running_ec = vm->ractor.main_ractor->threads.running_ec;
+ if (running_ec) { return running_ec; }
+ return vm->ractor.main_thread->ec;
}
static VALUE
@@ -1987,53 +943,46 @@ ractor_moved_missing(int argc, VALUE *argv, VALUE self)
}
/*
+ * Document-class: Ractor::Error
+ *
+ * The parent class of Ractor-related error classes.
+ */
+
+/*
* Document-class: Ractor::ClosedError
*
* Raised when an attempt is made to send a message to a closed port,
* or to retrieve a message from a closed and empty port.
- * Ports may be closed explicitly with Ractor#close_outgoing/close_incoming
+ * Ports may be closed explicitly with Ractor::Port#close
* and are closed implicitly when a Ractor terminates.
*
- * r = Ractor.new { sleep(500) }
- * r.close_outgoing
- * r.take # Ractor::ClosedError
- *
- * ClosedError is a descendant of StopIteration, so the closing of the ractor will break
- * the loops without propagating the error:
- *
- * r = Ractor.new do
- * loop do
- * msg = receive # raises ClosedError and loop traps it
- * puts "Received: #{msg}"
- * end
- * puts "loop exited"
- * end
+ * port = Ractor::Port.new
+ * port.close
+ * port << "test" # Ractor::ClosedError
+ * port.receive # Ractor::ClosedError
*
- * 3.times{|i| r << i}
- * r.close_incoming
- * r.take
- * puts "Continue successfully"
- *
- * This will print:
+ * ClosedError is a descendant of StopIteration, so the closing of a port will break
+ * out of loops without propagating the error.
+ */
+
+/*
+ * Document-class: Ractor::IsolationError
*
- * Received: 0
- * Received: 1
- * Received: 2
- * loop exited
- * Continue successfully
+ * Raised on attempt to make a Ractor-unshareable object
+ * Ractor-shareable.
*/
/*
* Document-class: Ractor::RemoteError
*
- * Raised on attempt to Ractor#take if there was an uncaught exception in the Ractor.
+ * Raised on Ractor#join or Ractor#value if there was an uncaught exception in the Ractor.
* Its +cause+ will contain the original exception, and +ractor+ is the original ractor
* it was raised in.
*
* r = Ractor.new { raise "Something weird happened" }
*
* begin
- * r.take
+ * r.value
* rescue => e
* p e # => #<Ractor::RemoteError: thrown by remote Ractor.>
* p e.ractor == r # => true
@@ -2045,7 +994,7 @@ ractor_moved_missing(int argc, VALUE *argv, VALUE self)
/*
* Document-class: Ractor::MovedError
*
- * Raised on an attempt to access an object which was moved in Ractor#send or Ractor.yield.
+ * Raised on an attempt to access an object which was moved in Ractor#send or Ractor::Port#send.
*
* r = Ractor.new { sleep }
*
@@ -2060,7 +1009,7 @@ ractor_moved_missing(int argc, VALUE *argv, VALUE self)
* Document-class: Ractor::MovedObject
*
* A special object which replaces any value that was moved to another ractor in Ractor#send
- * or Ractor.yield. Any attempt to access the object results in Ractor::MovedError.
+ * or Ractor::Port#send. Any attempt to access the object results in Ractor::MovedError.
*
* r = Ractor.new { receive }
*
@@ -2072,6 +1021,12 @@ ractor_moved_missing(int argc, VALUE *argv, VALUE self)
* # Ractor::MovedError (can not send any methods to a moved object)
*/
+/*
+ * Document-class: Ractor::UnsafeError
+ *
+ * Raised when Ractor-unsafe C-methods is invoked by a non-main Ractor.
+ */
+
// Main docs are in ractor.rb, but without this clause there are weird artifacts
// in their rendering.
/*
@@ -2105,6 +1060,8 @@ Init_Ractor(void)
rb_define_method(rb_cRactorMovedObject, "equal?", ractor_moved_missing, -1);
rb_define_method(rb_cRactorMovedObject, "instance_eval", ractor_moved_missing, -1);
rb_define_method(rb_cRactorMovedObject, "instance_exec", ractor_moved_missing, -1);
+
+ Init_RactorPort();
}
void
@@ -2198,6 +1155,50 @@ rb_ractor_hooks(rb_ractor_t *cr)
return &cr->pub.hooks;
}
+st_table *
+rb_ractor_targeted_hooks(rb_ractor_t *cr)
+{
+ return cr->pub.targeted_hooks;
+}
+
+static void
+rb_obj_set_shareable_no_assert(VALUE obj)
+{
+ FL_SET_RAW(obj, FL_SHAREABLE);
+
+ if (rb_obj_gen_fields_p(obj)) {
+ VALUE fields = rb_obj_fields_no_ractor_check(obj);
+ if (imemo_type_p(fields, imemo_fields)) {
+ // no recursive mark
+ FL_SET_RAW(fields, FL_SHAREABLE);
+ }
+ }
+}
+
+#ifndef STRICT_VERIFY_SHAREABLE
+#define STRICT_VERIFY_SHAREABLE 0
+#endif
+
+bool
+rb_ractor_verify_shareable(VALUE obj)
+{
+#if STRICT_VERIFY_SHAREABLE
+ rb_gc_verify_shareable(obj);
+#endif
+ return true;
+}
+
+VALUE
+rb_obj_set_shareable(VALUE obj)
+{
+ RUBY_ASSERT(!RB_SPECIAL_CONST_P(obj));
+
+ rb_obj_set_shareable_no_assert(obj);
+ RUBY_ASSERT(rb_ractor_verify_shareable(obj));
+
+ return obj;
+}
+
/// traverse function
// 2: stop search
@@ -2250,19 +1251,6 @@ obj_hash_traverse_i(VALUE key, VALUE val, VALUE ptr)
return ST_CONTINUE;
}
-static enum rb_id_table_iterator_result
-obj_hash_iv_traverse_i(VALUE val, void *ptr)
-{
- struct obj_traverse_callback_data *d = (struct obj_traverse_callback_data *)ptr;
-
- if (obj_traverse_i(val, d->data)) {
- d->stop = true;
- return ID_TABLE_STOP;
- }
-
- return ID_TABLE_CONTINUE;
-}
-
static void
obj_traverse_reachable_i(VALUE obj, void *ptr)
{
@@ -2278,12 +1266,25 @@ obj_traverse_rec(struct obj_traverse_data *data)
{
if (UNLIKELY(!data->rec)) {
data->rec_hash = rb_ident_hash_new();
- data->rec = rb_hash_st_table(data->rec_hash);
+ data->rec = RHASH_ST_TABLE(data->rec_hash);
}
return data->rec;
}
static int
+obj_traverse_ivar_foreach_i(ID key, VALUE val, st_data_t ptr)
+{
+ struct obj_traverse_callback_data *d = (struct obj_traverse_callback_data *)ptr;
+
+ if (obj_traverse_i(val, d->data)) {
+ d->stop = true;
+ return ST_STOP;
+ }
+
+ return ST_CONTINUE;
+}
+
+static int
obj_traverse_i(VALUE obj, struct obj_traverse_data *data)
{
if (RB_SPECIAL_CONST_P(obj)) return 0;
@@ -2298,15 +1299,14 @@ obj_traverse_i(VALUE obj, struct obj_traverse_data *data)
// already traversed
return 0;
}
+ RB_OBJ_WRITTEN(data->rec_hash, Qundef, obj);
- if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) {
- struct gen_ivtbl *ivtbl;
- rb_ivar_generic_ivtbl_lookup(obj, &ivtbl);
- for (uint32_t i = 0; i < ivtbl->numiv; i++) {
- VALUE val = ivtbl->ivptr[i];
- if (!UNDEF_P(val) && obj_traverse_i(val, data)) return 1;
- }
- }
+ struct obj_traverse_callback_data d = {
+ .stop = false,
+ .data = data,
+ };
+ rb_ivar_foreach(obj, obj_traverse_ivar_foreach_i, (st_data_t)&d);
+ if (d.stop) return 1;
switch (BUILTIN_TYPE(obj)) {
// no child node
@@ -2316,33 +1316,16 @@ obj_traverse_i(VALUE obj, struct obj_traverse_data *data)
case T_REGEXP:
case T_FILE:
case T_SYMBOL:
- case T_MATCH:
break;
case T_OBJECT:
- {
- if (rb_shape_obj_too_complex(obj)) {
- struct obj_traverse_callback_data d = {
- .stop = false,
- .data = data,
- };
- rb_id_table_foreach_values(ROBJECT_IV_HASH(obj), obj_hash_iv_traverse_i, &d);
- if (d.stop) return 1;
- }
- else {
- uint32_t len = ROBJECT_IV_COUNT(obj);
- VALUE *ptr = ROBJECT_IVPTR(obj);
-
- for (uint32_t i=0; i<len; i++) {
- VALUE val = ptr[i];
- if (!UNDEF_P(val) && obj_traverse_i(val, data)) return 1;
- }
- }
- }
+ /* Instance variables already traversed. */
break;
case T_ARRAY:
{
+ rb_ary_cancel_sharing(obj);
+
for (int i = 0; i < RARRAY_LENINT(obj); i++) {
VALUE e = rb_ary_entry(obj, i);
if (obj_traverse_i(e, data)) return 1;
@@ -2365,7 +1348,7 @@ obj_traverse_i(VALUE obj, struct obj_traverse_data *data)
case T_STRUCT:
{
- long len = RSTRUCT_LEN(obj);
+ long len = RSTRUCT_LEN_RAW(obj);
const VALUE *ptr = RSTRUCT_CONST_PTR(obj);
for (long i=0; i<len; i++) {
@@ -2374,6 +1357,10 @@ obj_traverse_i(VALUE obj, struct obj_traverse_data *data)
}
break;
+ case T_MATCH:
+ if (obj_traverse_i(RMATCH(obj)->str, data)) return 1;
+ break;
+
case T_RATIONAL:
if (obj_traverse_i(RRATIONAL(obj)->num, data)) return 1;
if (obj_traverse_i(RRATIONAL(obj)->den, data)) return 1;
@@ -2390,11 +1377,9 @@ obj_traverse_i(VALUE obj, struct obj_traverse_data *data)
.stop = false,
.data = data,
};
- RB_VM_LOCK_ENTER_NO_BARRIER();
- {
+ RB_VM_LOCKING_NO_BARRIER() {
rb_objspace_reachable_objects_from(obj, obj_traverse_reachable_i, &d);
}
- RB_VM_LOCK_LEAVE_NO_BARRIER();
if (d.stop) return 1;
}
break;
@@ -2456,7 +1441,7 @@ rb_obj_traverse(VALUE obj,
}
static int
-frozen_shareable_p(VALUE obj, bool *made_shareable)
+allow_frozen_shareable_p(VALUE obj)
{
if (!RB_TYPE_P(obj, T_DATA)) {
return true;
@@ -2466,55 +1451,95 @@ frozen_shareable_p(VALUE obj, bool *made_shareable)
if (type->flags & RUBY_TYPED_FROZEN_SHAREABLE) {
return true;
}
- else if (made_shareable && rb_obj_is_proc(obj)) {
- // special path to make shareable Proc.
- rb_proc_ractor_make_shareable(obj);
- *made_shareable = true;
- VM_ASSERT(RB_OBJ_SHAREABLE_P(obj));
- return false;
- }
}
return false;
}
static enum obj_traverse_iterator_result
+make_shareable_check_shareable_freeze(VALUE obj, enum obj_traverse_iterator_result result)
+{
+ if (!RB_OBJ_FROZEN_RAW(obj)) {
+ rb_funcall(obj, idFreeze, 0);
+
+ if (UNLIKELY(!RB_OBJ_FROZEN_RAW(obj))) {
+ rb_raise(rb_eRactorError, "#freeze does not freeze object correctly");
+ }
+
+ if (RB_OBJ_SHAREABLE_P(obj)) {
+ return traverse_skip;
+ }
+ }
+
+ return result;
+}
+
+static int obj_refer_only_shareables_p(VALUE obj);
+
+static enum obj_traverse_iterator_result
make_shareable_check_shareable(VALUE obj)
{
VM_ASSERT(!SPECIAL_CONST_P(obj));
- bool made_shareable = false;
if (rb_ractor_shareable_p(obj)) {
return traverse_skip;
}
- else if (!frozen_shareable_p(obj, &made_shareable)) {
- if (made_shareable) {
- return traverse_skip;
+ else if (!allow_frozen_shareable_p(obj)) {
+ VM_ASSERT(RB_TYPE_P(obj, T_DATA));
+ const rb_data_type_t *type = RTYPEDDATA_TYPE(obj);
+
+ if (type->flags & RUBY_TYPED_FROZEN_SHAREABLE_NO_REC) {
+ if (obj_refer_only_shareables_p(obj)) {
+ make_shareable_check_shareable_freeze(obj, traverse_skip);
+ RB_OBJ_SET_SHAREABLE(obj);
+ return traverse_skip;
+ }
+ else {
+ rb_raise(rb_eRactorError,
+ "can not make shareable object for %+"PRIsVALUE" because it refers unshareable objects", obj);
+ }
+ }
+ else if (rb_obj_is_proc(obj)) {
+ rb_proc_ractor_make_shareable(obj, Qundef);
+ return traverse_cont;
}
else {
- rb_raise(rb_eRactorError, "can not make shareable object for %"PRIsVALUE, obj);
+ rb_raise(rb_eRactorError, "can not make shareable object for %+"PRIsVALUE, obj);
}
}
- if (!RB_OBJ_FROZEN_RAW(obj)) {
- rb_funcall(obj, idFreeze, 0);
-
- if (UNLIKELY(!RB_OBJ_FROZEN_RAW(obj))) {
- rb_raise(rb_eRactorError, "#freeze does not freeze object correctly");
- }
-
- if (RB_OBJ_SHAREABLE_P(obj)) {
- return traverse_skip;
+ switch (TYPE(obj)) {
+ case T_IMEMO:
+ return traverse_skip;
+ case T_OBJECT:
+ {
+ // If a T_OBJECT is shared and has no free capacity, we can't safely store the object_id inline,
+ // as it would require to move the object content into an external buffer.
+ // This is only a problem for T_OBJECT, given other types have external fields and can do RCU.
+ // To avoid this issue, we proactively create the object_id.
+ shape_id_t shape_id = RBASIC_SHAPE_ID(obj);
+ attr_index_t capacity = RSHAPE_CAPACITY(shape_id);
+ attr_index_t free_capacity = capacity - RSHAPE_LEN(shape_id);
+ if (!rb_shape_has_object_id(shape_id) && capacity && !free_capacity) {
+ rb_obj_id(obj);
+ }
}
+ break;
+ default:
+ break;
}
- return traverse_cont;
+ return make_shareable_check_shareable_freeze(obj, traverse_cont);
}
static enum obj_traverse_iterator_result
mark_shareable(VALUE obj)
{
- FL_SET_RAW(obj, RUBY_FL_SHAREABLE);
+ if (RB_TYPE_P(obj, T_STRING)) {
+ rb_str_make_independent(obj);
+ }
+
+ rb_obj_set_shareable_no_assert(obj);
return traverse_cont;
}
@@ -2531,10 +1556,7 @@ VALUE
rb_ractor_make_shareable_copy(VALUE obj)
{
VALUE copy = ractor_copy(obj);
- rb_obj_traverse(copy,
- make_shareable_check_shareable,
- null_leave, mark_shareable);
- return copy;
+ return rb_ractor_make_shareable(copy);
}
VALUE
@@ -2570,14 +1592,14 @@ shareable_p_enter(VALUE obj)
return traverse_skip;
}
else if (RB_OBJ_FROZEN_RAW(obj) &&
- frozen_shareable_p(obj, NULL)) {
+ allow_frozen_shareable_p(obj)) {
return traverse_cont;
}
return traverse_stop; // fail
}
-MJIT_FUNC_EXPORTED bool
+bool
rb_ractor_shareable_p_continue(VALUE obj)
{
if (rb_obj_traverse(obj,
@@ -2591,6 +1613,12 @@ rb_ractor_shareable_p_continue(VALUE obj)
}
#if RACTOR_CHECK_MODE > 0
+void
+rb_ractor_setup_belonging(VALUE obj)
+{
+ rb_ractor_setup_belonging_to(obj, rb_ractor_current_id());
+}
+
static enum obj_traverse_iterator_result
reset_belonging_enter(VALUE obj)
{
@@ -2681,28 +1709,28 @@ obj_hash_traverse_replace_i(st_data_t *key, st_data_t *val, st_data_t ptr, int e
return ST_CONTINUE;
}
-static enum rb_id_table_iterator_result
-obj_iv_hash_traverse_replace_foreach_i(VALUE val, void *data)
+static int
+obj_iv_hash_traverse_replace_foreach_i(st_data_t _key, st_data_t _val, st_data_t _data, int _x)
{
- return ID_TABLE_REPLACE;
+ return ST_REPLACE;
}
-static enum rb_id_table_iterator_result
-obj_iv_hash_traverse_replace_i(VALUE *val, void *ptr, int exists)
+static int
+obj_iv_hash_traverse_replace_i(st_data_t * _key, st_data_t * val, st_data_t ptr, int exists)
{
struct obj_traverse_replace_callback_data *d = (struct obj_traverse_replace_callback_data *)ptr;
struct obj_traverse_replace_data *data = d->data;
- if (obj_traverse_replace_i(*val, data)) {
+ if (obj_traverse_replace_i(*(VALUE *)val, data)) {
d->stop = true;
- return ID_TABLE_STOP;
+ return ST_STOP;
}
- else if (*val != data->replacement) {
- VALUE v = *val = data->replacement;
+ else if (*(VALUE *)val != data->replacement) {
+ VALUE v = *(VALUE *)val = data->replacement;
RB_OBJ_WRITTEN(d->src, Qundef, v);
}
- return ID_TABLE_CONTINUE;
+ return ST_CONTINUE;
}
static struct st_table *
@@ -2710,25 +1738,18 @@ obj_traverse_replace_rec(struct obj_traverse_replace_data *data)
{
if (UNLIKELY(!data->rec)) {
data->rec_hash = rb_ident_hash_new();
- data->rec = rb_hash_st_table(data->rec_hash);
+ data->rec = RHASH_ST_TABLE(data->rec_hash);
}
return data->rec;
}
-#if USE_TRANSIENT_HEAP
-void rb_ary_transient_heap_evacuate(VALUE ary, int promote);
-void rb_obj_transient_heap_evacuate(VALUE obj, int promote);
-void rb_hash_transient_heap_evacuate(VALUE hash, int promote);
-void rb_struct_transient_heap_evacuate(VALUE st, int promote);
-#endif
-
static void
obj_refer_only_shareables_p_i(VALUE obj, void *ptr)
{
int *pcnt = (int *)ptr;
if (!rb_ractor_shareable_p(obj)) {
- *pcnt++;
+ ++*pcnt;
}
}
@@ -2736,11 +1757,9 @@ static int
obj_refer_only_shareables_p(VALUE obj)
{
int cnt = 0;
- RB_VM_LOCK_ENTER_NO_BARRIER();
- {
+ RB_VM_LOCKING_NO_BARRIER() {
rb_objspace_reachable_objects_from(obj, obj_refer_only_shareables_p_i, &cnt);
}
- RB_VM_LOCK_LEAVE_NO_BARRIER();
return cnt == 0;
}
@@ -2768,24 +1787,42 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data)
}
else {
st_insert(obj_traverse_replace_rec(data), (st_data_t)obj, replacement);
+ RB_OBJ_WRITTEN(data->rec_hash, Qundef, obj);
+ RB_OBJ_WRITTEN(data->rec_hash, Qundef, replacement);
}
if (!data->move) {
obj = replacement;
}
-#define CHECK_AND_REPLACE(v) do { \
+#define CHECK_AND_REPLACE(parent_obj, v) do { \
VALUE _val = (v); \
if (obj_traverse_replace_i(_val, data)) { return 1; } \
- else if (data->replacement != _val) { RB_OBJ_WRITE(obj, &v, data->replacement); } \
+ else if (data->replacement != _val) { RB_OBJ_WRITE(parent_obj, &v, data->replacement); } \
} while (0)
- if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) {
- struct gen_ivtbl *ivtbl;
- rb_ivar_generic_ivtbl_lookup(obj, &ivtbl);
- for (uint32_t i = 0; i < ivtbl->numiv; i++) {
- if (!UNDEF_P(ivtbl->ivptr[i])) {
- CHECK_AND_REPLACE(ivtbl->ivptr[i]);
+ if (UNLIKELY(rb_obj_gen_fields_p(obj))) {
+ VALUE fields_obj = rb_obj_fields_no_ractor_check(obj);
+
+ if (UNLIKELY(rb_shape_obj_too_complex_p(obj))) {
+ struct obj_traverse_replace_callback_data d = {
+ .stop = false,
+ .data = data,
+ .src = fields_obj,
+ };
+ rb_st_foreach_with_replace(
+ rb_imemo_fields_complex_tbl(fields_obj),
+ obj_iv_hash_traverse_replace_foreach_i,
+ obj_iv_hash_traverse_replace_i,
+ (st_data_t)&d
+ );
+ if (d.stop) return 1;
+ }
+ else {
+ uint32_t fields_count = RSHAPE_LEN(RBASIC_SHAPE_ID(obj));
+ VALUE *fields = rb_imemo_fields_ptr(fields_obj);
+ for (uint32_t i = 0; i < fields_count; i++) {
+ CHECK_AND_REPLACE(fields_obj, fields[i]);
}
}
}
@@ -2797,7 +1834,6 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data)
case T_REGEXP:
case T_FILE:
case T_SYMBOL:
- case T_MATCH:
break;
case T_STRING:
rb_str_make_independent(obj);
@@ -2805,30 +1841,26 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data)
case T_OBJECT:
{
- if (rb_shape_obj_too_complex(obj)) {
- struct rb_id_table * table = ROBJECT_IV_HASH(obj);
+ if (rb_shape_obj_too_complex_p(obj)) {
struct obj_traverse_replace_callback_data d = {
.stop = false,
.data = data,
.src = obj,
};
- rb_id_table_foreach_values_with_replace(table,
- obj_iv_hash_traverse_replace_foreach_i,
- obj_iv_hash_traverse_replace_i,
- (void *)&d);
+ rb_st_foreach_with_replace(
+ ROBJECT_FIELDS_HASH(obj),
+ obj_iv_hash_traverse_replace_foreach_i,
+ obj_iv_hash_traverse_replace_i,
+ (st_data_t)&d
+ );
+ if (d.stop) return 1;
}
else {
-#if USE_TRANSIENT_HEAP
- if (data->move) rb_obj_transient_heap_evacuate(obj, TRUE);
-#endif
+ uint32_t len = ROBJECT_FIELDS_COUNT_NOT_COMPLEX(obj);
+ VALUE *ptr = ROBJECT_FIELDS(obj);
- uint32_t len = ROBJECT_IV_COUNT(obj);
- VALUE *ptr = ROBJECT_IVPTR(obj);
-
- for (uint32_t i=0; i<len; i++) {
- if (!UNDEF_P(ptr[i])) {
- CHECK_AND_REPLACE(ptr[i]);
- }
+ for (uint32_t i = 0; i < len; i++) {
+ CHECK_AND_REPLACE(obj, ptr[i]);
}
}
}
@@ -2837,9 +1869,6 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data)
case T_ARRAY:
{
rb_ary_cancel_sharing(obj);
-#if USE_TRANSIENT_HEAP
- if (data->move) rb_ary_transient_heap_evacuate(obj, TRUE);
-#endif
for (int i = 0; i < RARRAY_LENINT(obj); i++) {
VALUE e = rb_ary_entry(obj, i);
@@ -2854,12 +1883,8 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data)
RB_GC_GUARD(obj);
}
break;
-
case T_HASH:
{
-#if USE_TRANSIENT_HEAP
- if (data->move) rb_hash_transient_heap_evacuate(obj, TRUE);
-#endif
struct obj_traverse_replace_callback_data d = {
.stop = false,
.data = data,
@@ -2884,25 +1909,26 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data)
case T_STRUCT:
{
-#if USE_TRANSIENT_HEAP
- if (data->move) rb_struct_transient_heap_evacuate(obj, TRUE);
-#endif
- long len = RSTRUCT_LEN(obj);
+ long len = RSTRUCT_LEN_RAW(obj);
const VALUE *ptr = RSTRUCT_CONST_PTR(obj);
for (long i=0; i<len; i++) {
- CHECK_AND_REPLACE(ptr[i]);
+ CHECK_AND_REPLACE(obj, ptr[i]);
}
}
break;
+ case T_MATCH:
+ CHECK_AND_REPLACE(obj, RMATCH(obj)->str);
+ break;
+
case T_RATIONAL:
- CHECK_AND_REPLACE(RRATIONAL(obj)->num);
- CHECK_AND_REPLACE(RRATIONAL(obj)->den);
+ CHECK_AND_REPLACE(obj, RRATIONAL(obj)->num);
+ CHECK_AND_REPLACE(obj, RRATIONAL(obj)->den);
break;
case T_COMPLEX:
- CHECK_AND_REPLACE(RCOMPLEX(obj)->real);
- CHECK_AND_REPLACE(RCOMPLEX(obj)->imag);
+ CHECK_AND_REPLACE(obj, RCOMPLEX(obj)->real);
+ CHECK_AND_REPLACE(obj, RCOMPLEX(obj)->imag);
break;
case T_DATA:
@@ -2961,35 +1987,19 @@ rb_obj_traverse_replace(VALUE obj,
}
}
-struct RVALUE {
- VALUE flags;
- VALUE klass;
- VALUE v1;
- VALUE v2;
- VALUE v3;
+static const bool wb_protected_types[RUBY_T_MASK] = {
+ [T_OBJECT] = RGENGC_WB_PROTECTED_OBJECT,
+ [T_HASH] = RGENGC_WB_PROTECTED_HASH,
+ [T_ARRAY] = RGENGC_WB_PROTECTED_ARRAY,
+ [T_STRING] = RGENGC_WB_PROTECTED_STRING,
+ [T_STRUCT] = RGENGC_WB_PROTECTED_STRUCT,
+ [T_COMPLEX] = RGENGC_WB_PROTECTED_COMPLEX,
+ [T_REGEXP] = RGENGC_WB_PROTECTED_REGEXP,
+ [T_MATCH] = RGENGC_WB_PROTECTED_MATCH,
+ [T_FLOAT] = RGENGC_WB_PROTECTED_FLOAT,
+ [T_RATIONAL] = RGENGC_WB_PROTECTED_RATIONAL,
};
-static const VALUE fl_users = FL_USER1 | FL_USER2 | FL_USER3 |
- FL_USER4 | FL_USER5 | FL_USER6 | FL_USER7 |
- FL_USER8 | FL_USER9 | FL_USER10 | FL_USER11 |
- FL_USER12 | FL_USER13 | FL_USER14 | FL_USER15 |
- FL_USER16 | FL_USER17 | FL_USER18 | FL_USER19;
-
-static void
-ractor_moved_bang(VALUE obj)
-{
- // invalidate src object
- struct RVALUE *rv = (void *)obj;
-
- rv->klass = rb_cRactorMovedObject;
- rv->v1 = 0;
- rv->v2 = 0;
- rv->v3 = 0;
- rv->flags = rv->flags & ~fl_users;
-
- // TODO: record moved location
-}
-
static enum obj_traverse_iterator_result
move_enter(VALUE obj, struct obj_traverse_replace_data *data)
{
@@ -2998,33 +2008,45 @@ move_enter(VALUE obj, struct obj_traverse_replace_data *data)
return traverse_skip;
}
else {
- data->replacement = rb_obj_alloc(RBASIC_CLASS(obj));
+ VALUE type = RB_BUILTIN_TYPE(obj);
+ size_t slot_size = rb_gc_obj_slot_size(obj);
+ type |= wb_protected_types[type] ? FL_WB_PROTECTED : 0;
+ NEWOBJ_OF(moved, struct RBasic, 0, type, slot_size, 0);
+ MEMZERO(&moved[1], char, slot_size - sizeof(*moved));
+ data->replacement = (VALUE)moved;
return traverse_cont;
}
}
-void rb_replace_generic_ivar(VALUE clone, VALUE obj); // variable.c
-
static enum obj_traverse_iterator_result
move_leave(VALUE obj, struct obj_traverse_replace_data *data)
{
- VALUE v = data->replacement;
- struct RVALUE *dst = (struct RVALUE *)v;
- struct RVALUE *src = (struct RVALUE *)obj;
-
- dst->flags = (dst->flags & ~fl_users) | (src->flags & fl_users);
+ // Copy flags
+ VALUE ignored_flags = RUBY_FL_PROMOTED;
+ RBASIC(data->replacement)->flags = (RBASIC(obj)->flags & ~ignored_flags) | (RBASIC(data->replacement)->flags & ignored_flags);
+ // Copy contents without the flags
+ memcpy(
+ (char *)data->replacement + sizeof(VALUE),
+ (char *)obj + sizeof(VALUE),
+ rb_gc_obj_slot_size(obj) - sizeof(VALUE)
+ );
- dst->v1 = src->v1;
- dst->v2 = src->v2;
- dst->v3 = src->v3;
+ // We've copied obj's references to the replacement
+ rb_gc_writebarrier_remember(data->replacement);
- if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) {
- rb_replace_generic_ivar(v, obj);
+ void rb_replace_generic_ivar(VALUE clone, VALUE obj); // variable.c
+ if (UNLIKELY(rb_obj_gen_fields_p(obj))) {
+ rb_replace_generic_ivar(data->replacement, obj);
}
- // TODO: generic_ivar
+ rb_gc_obj_id_moved(data->replacement);
+
+ VALUE flags = T_OBJECT | FL_FREEZE | (RBASIC(obj)->flags & FL_PROMOTED);
- ractor_moved_bang(obj);
+ // Avoid mutations using bind_call, etc.
+ MEMZERO((char *)obj, char, sizeof(struct RBasic));
+ RBASIC(obj)->flags = flags;
+ RBASIC_SET_CLASS_RAW(obj, rb_cRactorMovedObject);
return traverse_cont;
}
@@ -3093,7 +2115,7 @@ ractor_local_storage_mark_i(st_data_t key, st_data_t val, st_data_t dmy)
}
static enum rb_id_table_iterator_result
-idkey_local_storage_mark_i(ID id, VALUE val, void *dmy)
+idkey_local_storage_mark_i(VALUE val, void *dmy)
{
rb_gc_mark(val);
return ID_TABLE_CONTINUE;
@@ -3116,8 +2138,10 @@ ractor_local_storage_mark(rb_ractor_t *r)
}
if (r->idkey_local_storage) {
- rb_id_table_foreach(r->idkey_local_storage, idkey_local_storage_mark_i, NULL);
+ rb_id_table_foreach_values(r->idkey_local_storage, idkey_local_storage_mark_i, NULL);
}
+
+ rb_gc_mark(r->local_storage_store_lock);
}
static int
@@ -3180,15 +2204,13 @@ rb_ractor_local_storage_value_newkey(void)
void
rb_ractor_local_storage_delkey(rb_ractor_local_key_t key)
{
- RB_VM_LOCK_ENTER();
- {
+ RB_VM_LOCKING() {
if (freed_ractor_local_keys.cnt == freed_ractor_local_keys.capa) {
freed_ractor_local_keys.capa = freed_ractor_local_keys.capa ? freed_ractor_local_keys.capa * 2 : 4;
REALLOC_N(freed_ractor_local_keys.keys, rb_ractor_local_key_t, freed_ractor_local_keys.capa);
}
freed_ractor_local_keys.keys[freed_ractor_local_keys.cnt++] = key;
}
- RB_VM_LOCK_LEAVE();
}
static bool
@@ -3323,4 +2345,277 @@ ractor_local_value_set(rb_execution_context_t *ec, VALUE self, VALUE sym, VALUE
return val;
}
+struct ractor_local_storage_store_data {
+ rb_execution_context_t *ec;
+ struct rb_id_table *tbl;
+ ID id;
+ VALUE sym;
+};
+
+static VALUE
+ractor_local_value_store_i(VALUE ptr)
+{
+ VALUE val;
+ struct ractor_local_storage_store_data *data = (struct ractor_local_storage_store_data *)ptr;
+
+ if (rb_id_table_lookup(data->tbl, data->id, &val)) {
+ // after synchronization, we found already registered entry
+ }
+ else {
+ val = rb_yield(Qnil);
+ ractor_local_value_set(data->ec, Qnil, data->sym, val);
+ }
+ return val;
+}
+
+static VALUE
+ractor_local_value_store_if_absent(rb_execution_context_t *ec, VALUE self, VALUE sym)
+{
+ rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
+ struct ractor_local_storage_store_data data = {
+ .ec = ec,
+ .sym = sym,
+ .id = SYM2ID(rb_to_symbol(sym)),
+ .tbl = cr->idkey_local_storage,
+ };
+ VALUE val;
+
+ if (data.tbl == NULL) {
+ data.tbl = cr->idkey_local_storage = rb_id_table_create(2);
+ }
+ else if (rb_id_table_lookup(data.tbl, data.id, &val)) {
+ // already set
+ return val;
+ }
+
+ if (!cr->local_storage_store_lock) {
+ cr->local_storage_store_lock = rb_mutex_new();
+ }
+
+ return rb_mutex_synchronize(cr->local_storage_store_lock, ractor_local_value_store_i, (VALUE)&data);
+}
+
+// shareable_proc
+
+static VALUE
+ractor_shareable_proc(rb_execution_context_t *ec, VALUE replace_self, bool is_lambda)
+{
+ if (!rb_ractor_shareable_p(replace_self)) {
+ rb_raise(rb_eRactorIsolationError, "self should be shareable: %" PRIsVALUE, replace_self);
+ }
+ else {
+ VALUE proc = is_lambda ? rb_block_lambda() : rb_block_proc();
+ return rb_proc_ractor_make_shareable(rb_proc_dup(proc), replace_self);
+ }
+}
+
+// Ractor#require
+
+struct cross_ractor_require {
+ VALUE port;
+ bool raised;
+
+ union {
+ struct {
+ VALUE feature;
+ } require;
+
+ struct {
+ VALUE module;
+ ID name;
+ } autoload;
+ } as;
+
+ bool silent;
+};
+
+RUBY_REFERENCES(cross_ractor_require_refs) = {
+ RUBY_REF_EDGE(struct cross_ractor_require, port),
+ RUBY_REF_EDGE(struct cross_ractor_require, as.require.feature),
+ RUBY_REF_END
+};
+
+static const rb_data_type_t cross_ractor_require_data_type = {
+ "ractor/cross_ractor_require",
+ {
+ RUBY_REFS_LIST_PTR(cross_ractor_require_refs),
+ RUBY_DEFAULT_FREE,
+ NULL, // memsize
+ NULL, // compact
+ },
+ 0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_DECL_MARKING | RUBY_TYPED_EMBEDDABLE
+};
+
+static VALUE
+require_body(VALUE crr_obj)
+{
+ struct cross_ractor_require *crr;
+ TypedData_Get_Struct(crr_obj, struct cross_ractor_require, &cross_ractor_require_data_type, crr);
+ VALUE feature = crr->as.require.feature;
+
+ ID require;
+ CONST_ID(require, "require");
+
+ if (crr->silent) {
+ int rb_require_internal_silent(VALUE fname);
+ return INT2NUM(rb_require_internal_silent(feature));
+ }
+ else {
+ return rb_funcallv(Qnil, require, 1, &feature);
+ }
+}
+
+static VALUE
+require_rescue(VALUE crr_obj, VALUE errinfo)
+{
+ struct cross_ractor_require *crr;
+ TypedData_Get_Struct(crr_obj, struct cross_ractor_require, &cross_ractor_require_data_type, crr);
+ crr->raised = true;
+ return errinfo;
+}
+
+static VALUE
+require_result_send_body(VALUE ary)
+{
+ VALUE port = RARRAY_AREF(ary, 0);
+ VALUE results = RARRAY_AREF(ary, 1);
+
+ rb_execution_context_t *ec = GET_EC();
+
+ ractor_port_send(ec, port, results, Qfalse);
+ return Qnil;
+}
+
+static VALUE
+require_result_send_resuce(VALUE port, VALUE errinfo)
+{
+ // TODO: need rescue?
+ ractor_port_send(GET_EC(), port, errinfo, Qfalse);
+ return Qnil;
+}
+
+static VALUE
+ractor_require_protect(VALUE crr_obj, VALUE (*func)(VALUE))
+{
+ struct cross_ractor_require *crr;
+ TypedData_Get_Struct(crr_obj, struct cross_ractor_require, &cross_ractor_require_data_type, crr);
+
+ const bool silent = crr->silent;
+
+ VALUE debug, errinfo;
+ if (silent) {
+ debug = ruby_debug;
+ errinfo = rb_errinfo();
+ }
+
+ // get normal result or raised exception (with crr->raised == true)
+ VALUE result = rb_rescue2(func, crr_obj, require_rescue, crr_obj, rb_eException, 0);
+
+ if (silent) {
+ ruby_debug = debug;
+ rb_set_errinfo(errinfo);
+ }
+
+ rb_rescue2(require_result_send_body,
+ // [port, [result, raised]]
+ rb_ary_new_from_args(2, crr->port, rb_ary_new_from_args(2, result, crr->raised ? Qtrue : Qfalse)),
+ require_result_send_resuce, rb_eException, crr->port);
+
+ RB_GC_GUARD(crr_obj);
+ return Qnil;
+}
+
+static VALUE
+ractor_require_func(void *crr_obj)
+{
+ return ractor_require_protect((VALUE)crr_obj, require_body);
+}
+
+VALUE
+rb_ractor_require(VALUE feature, bool silent)
+{
+ // We're about to block on the main ractor, so if we're holding the global lock we'll deadlock.
+ ASSERT_vm_unlocking();
+
+ struct cross_ractor_require *crr;
+ VALUE crr_obj = TypedData_Make_Struct(0, struct cross_ractor_require, &cross_ractor_require_data_type, crr);
+ RB_OBJ_SET_SHAREABLE(crr_obj); // TODO: internal data?
+
+ // Convert feature to proper file path and make it shareable as fstring
+ RB_OBJ_WRITE(crr_obj, &crr->as.require.feature, rb_fstring(FilePathValue(feature)));
+ RB_OBJ_WRITE(crr_obj, &crr->port, rb_ractor_make_shareable(ractor_port_new(GET_RACTOR())));
+ crr->raised = false;
+ crr->silent = silent;
+
+ rb_execution_context_t *ec = GET_EC();
+ rb_ractor_t *main_r = GET_VM()->ractor.main_ractor;
+ rb_ractor_interrupt_exec(main_r, ractor_require_func, (void *)crr_obj, rb_interrupt_exec_flag_value_data);
+
+ // wait for require done
+ VALUE results = ractor_port_receive(ec, crr->port);
+ ractor_port_close(ec, crr->port);
+
+ VALUE exc = rb_ary_pop(results);
+ VALUE result = rb_ary_pop(results);
+ RB_GC_GUARD(crr_obj);
+
+ if (RTEST(exc)) {
+ rb_exc_raise(result);
+ }
+ else {
+ return result;
+ }
+}
+
+static VALUE
+ractor_require(rb_execution_context_t *ec, VALUE self, VALUE feature)
+{
+ return rb_ractor_require(feature, false);
+}
+
+static VALUE
+autoload_load_body(VALUE crr_obj)
+{
+ struct cross_ractor_require *crr;
+ TypedData_Get_Struct(crr_obj, struct cross_ractor_require, &cross_ractor_require_data_type, crr);
+ return rb_autoload_load(crr->as.autoload.module, crr->as.autoload.name);
+}
+
+static VALUE
+ractor_autoload_load_func(void *crr_obj)
+{
+ return ractor_require_protect((VALUE)crr_obj, autoload_load_body);
+}
+
+VALUE
+rb_ractor_autoload_load(VALUE module, ID name)
+{
+ struct cross_ractor_require *crr;
+ VALUE crr_obj = TypedData_Make_Struct(0, struct cross_ractor_require, &cross_ractor_require_data_type, crr);
+ RB_OBJ_SET_SHAREABLE(crr_obj); // TODO: internal data?
+
+ RB_OBJ_WRITE(crr_obj, &crr->as.autoload.module, module);
+ RB_OBJ_WRITE(crr_obj, &crr->as.autoload.name, name);
+ RB_OBJ_WRITE(crr_obj, &crr->port, rb_ractor_make_shareable(ractor_port_new(GET_RACTOR())));
+
+ rb_execution_context_t *ec = GET_EC();
+ rb_ractor_t *main_r = GET_VM()->ractor.main_ractor;
+ rb_ractor_interrupt_exec(main_r, ractor_autoload_load_func, (void *)crr_obj, rb_interrupt_exec_flag_value_data);
+
+ // wait for require done
+ VALUE results = ractor_port_receive(ec, crr->port);
+ ractor_port_close(ec, crr->port);
+
+ VALUE exc = rb_ary_pop(results);
+ VALUE result = rb_ary_pop(results);
+ RB_GC_GUARD(crr_obj);
+
+ if (RTEST(exc)) {
+ rb_exc_raise(result);
+ }
+ else {
+ return result;
+ }
+}
+
#include "ractor.rbinc"