diff options
Diffstat (limited to 'ractor.c')
| -rw-r--r-- | ractor.c | 3255 |
1 files changed, 3255 insertions, 0 deletions
diff --git a/ractor.c b/ractor.c new file mode 100644 index 0000000000..4625225dc5 --- /dev/null +++ b/ractor.c @@ -0,0 +1,3255 @@ +// Ractor implementation + +#include "ruby/ruby.h" +#include "ruby/thread.h" +#include "ruby/ractor.h" +#include "ruby/thread_native.h" +#include "vm_core.h" +#include "vm_sync.h" +#include "ractor_core.h" +#include "internal/complex.h" +#include "internal/error.h" +#include "internal/hash.h" +#include "internal/rational.h" +#include "internal/struct.h" +#include "internal/thread.h" +#include "variable.h" +#include "gc.h" +#include "transient_heap.h" +#include "yjit.h" + +VALUE rb_cRactor; + +VALUE rb_eRactorUnsafeError; +VALUE rb_eRactorIsolationError; +static VALUE rb_eRactorError; +static VALUE rb_eRactorRemoteError; +static VALUE rb_eRactorMovedError; +static VALUE rb_eRactorClosedError; +static VALUE rb_cRactorMovedObject; + +static void vm_ractor_blocking_cnt_inc(rb_vm_t *vm, rb_ractor_t *r, const char *file, int line); + +static void +ASSERT_ractor_unlocking(rb_ractor_t *r) +{ +#if RACTOR_CHECK_MODE > 0 + // GET_EC is NULL in an MJIT worker + if (rb_current_execution_context(false) != NULL && r->sync.locked_by == rb_ractor_self(GET_RACTOR())) { + rb_bug("recursive ractor locking"); + } +#endif +} + +static void +ASSERT_ractor_locking(rb_ractor_t *r) +{ +#if RACTOR_CHECK_MODE > 0 + // GET_EC is NULL in an MJIT worker + if (rb_current_execution_context(false) != NULL && r->sync.locked_by != rb_ractor_self(GET_RACTOR())) { + rp(r->sync.locked_by); + rb_bug("ractor lock is not acquired."); + } +#endif +} + +static void +ractor_lock(rb_ractor_t *r, const char *file, int line) +{ + RUBY_DEBUG_LOG2(file, line, "locking r:%u%s", r->pub.id, GET_RACTOR() == r ? " (self)" : ""); + + ASSERT_ractor_unlocking(r); + rb_native_mutex_lock(&r->sync.lock); + +#if RACTOR_CHECK_MODE > 0 + if (rb_current_execution_context(false) != NULL) { // GET_EC is NULL in an MJIT worker + r->sync.locked_by = rb_ractor_self(GET_RACTOR()); + } +#endif + + RUBY_DEBUG_LOG2(file, line, "locked r:%u%s", r->pub.id, GET_RACTOR() == r ? " (self)" : ""); +} + +static void +ractor_lock_self(rb_ractor_t *cr, const char *file, int line) +{ + VM_ASSERT(cr == GET_RACTOR()); + VM_ASSERT(cr->sync.locked_by != cr->pub.self); + ractor_lock(cr, file, line); +} + +static void +ractor_unlock(rb_ractor_t *r, const char *file, int line) +{ + ASSERT_ractor_locking(r); +#if RACTOR_CHECK_MODE > 0 + r->sync.locked_by = Qnil; +#endif + rb_native_mutex_unlock(&r->sync.lock); + + RUBY_DEBUG_LOG2(file, line, "r:%u%s", r->pub.id, GET_RACTOR() == r ? " (self)" : ""); +} + +static void +ractor_unlock_self(rb_ractor_t *cr, const char *file, int line) +{ + VM_ASSERT(cr == GET_RACTOR()); + VM_ASSERT(cr->sync.locked_by == cr->pub.self); + ractor_unlock(cr, file, line); +} + +#define RACTOR_LOCK(r) ractor_lock(r, __FILE__, __LINE__) +#define RACTOR_UNLOCK(r) ractor_unlock(r, __FILE__, __LINE__) +#define RACTOR_LOCK_SELF(r) ractor_lock_self(r, __FILE__, __LINE__) +#define RACTOR_UNLOCK_SELF(r) ractor_unlock_self(r, __FILE__, __LINE__) + +static void +ractor_cond_wait(rb_ractor_t *r) +{ +#if RACTOR_CHECK_MODE > 0 + VALUE locked_by = r->sync.locked_by; + r->sync.locked_by = Qnil; +#endif + rb_native_cond_wait(&r->sync.cond, &r->sync.lock); + +#if RACTOR_CHECK_MODE > 0 + r->sync.locked_by = locked_by; +#endif +} + +static const char * +ractor_status_str(enum ractor_status status) +{ + switch (status) { + case ractor_created: return "created"; + case ractor_running: return "running"; + case ractor_blocking: return "blocking"; + case ractor_terminated: return "terminated"; + } + rb_bug("unreachable"); +} + +static void +ractor_status_set(rb_ractor_t *r, enum ractor_status status) +{ + RUBY_DEBUG_LOG("r:%u [%s]->[%s]", r->pub.id, ractor_status_str(r->status_), ractor_status_str(status)); + + // check 1 + if (r->status_ != ractor_created) { + VM_ASSERT(r == GET_RACTOR()); // only self-modification is allowed. + ASSERT_vm_locking(); + } + + // check2: transition check. assume it will be vanished on non-debug build. + switch (r->status_) { + case ractor_created: + VM_ASSERT(status == ractor_blocking); + break; + case ractor_running: + VM_ASSERT(status == ractor_blocking|| + status == ractor_terminated); + break; + case ractor_blocking: + VM_ASSERT(status == ractor_running); + break; + case ractor_terminated: + VM_ASSERT(0); // unreachable + break; + } + + r->status_ = status; +} + +static bool +ractor_status_p(rb_ractor_t *r, enum ractor_status status) +{ + return rb_ractor_status_p(r, status); +} + +static struct rb_ractor_basket *ractor_queue_at(struct rb_ractor_queue *rq, int i); + +static void +ractor_queue_mark(struct rb_ractor_queue *rq) +{ + for (int i=0; i<rq->cnt; i++) { + struct rb_ractor_basket *b = ractor_queue_at(rq, i); + rb_gc_mark(b->v); + rb_gc_mark(b->sender); + } +} + +static void ractor_local_storage_mark(rb_ractor_t *r); +static void ractor_local_storage_free(rb_ractor_t *r); + +static void +ractor_mark(void *ptr) +{ + rb_ractor_t *r = (rb_ractor_t *)ptr; + + ractor_queue_mark(&r->sync.incoming_queue); + rb_gc_mark(r->sync.wait.taken_basket.v); + rb_gc_mark(r->sync.wait.taken_basket.sender); + rb_gc_mark(r->sync.wait.yielded_basket.v); + rb_gc_mark(r->sync.wait.yielded_basket.sender); + rb_gc_mark(r->receiving_mutex); + + rb_gc_mark(r->loc); + rb_gc_mark(r->name); + rb_gc_mark(r->r_stdin); + rb_gc_mark(r->r_stdout); + rb_gc_mark(r->r_stderr); + rb_hook_list_mark(&r->pub.hooks); + + if (r->threads.cnt > 0) { + rb_thread_t *th = 0; + list_for_each(&r->threads.set, th, lt_node) { + VM_ASSERT(th != NULL); + rb_gc_mark(th->self); + } + } + + ractor_local_storage_mark(r); +} + +static void +ractor_queue_free(struct rb_ractor_queue *rq) +{ + free(rq->baskets); +} + +static void +ractor_waiting_list_free(struct rb_ractor_waiting_list *wl) +{ + free(wl->ractors); +} + +static void +ractor_free(void *ptr) +{ + rb_ractor_t *r = (rb_ractor_t *)ptr; + rb_native_mutex_destroy(&r->sync.lock); + rb_native_cond_destroy(&r->sync.cond); + ractor_queue_free(&r->sync.incoming_queue); + ractor_waiting_list_free(&r->sync.taking_ractors); + ractor_local_storage_free(r); + rb_hook_list_free(&r->pub.hooks); + ruby_xfree(r); +} + +static size_t +ractor_queue_memsize(const struct rb_ractor_queue *rq) +{ + return sizeof(struct rb_ractor_basket) * rq->size; +} + +static size_t +ractor_waiting_list_memsize(const struct rb_ractor_waiting_list *wl) +{ + return sizeof(rb_ractor_t *) * wl->size; +} + +static size_t +ractor_memsize(const void *ptr) +{ + rb_ractor_t *r = (rb_ractor_t *)ptr; + + // TODO + return sizeof(rb_ractor_t) + + ractor_queue_memsize(&r->sync.incoming_queue) + + ractor_waiting_list_memsize(&r->sync.taking_ractors); +} + +static const rb_data_type_t ractor_data_type = { + "ractor", + { + ractor_mark, + ractor_free, + ractor_memsize, + NULL, // update + }, + 0, 0, RUBY_TYPED_FREE_IMMEDIATELY /* | RUBY_TYPED_WB_PROTECTED */ +}; + +bool +rb_ractor_p(VALUE gv) +{ + if (rb_typeddata_is_kind_of(gv, &ractor_data_type)) { + return true; + } + else { + return false; + } +} + +static inline rb_ractor_t * +RACTOR_PTR(VALUE self) +{ + VM_ASSERT(rb_ractor_p(self)); + + rb_ractor_t *r = DATA_PTR(self); + // TODO: check + return r; +} + +static rb_atomic_t ractor_last_id; + +#if RACTOR_CHECK_MODE > 0 +MJIT_FUNC_EXPORTED uint32_t +rb_ractor_current_id(void) +{ + if (GET_THREAD()->ractor == NULL) { + return 1; // main ractor + } + else { + return rb_ractor_id(GET_RACTOR()); + } +} +#endif + +static void +ractor_queue_setup(struct rb_ractor_queue *rq) +{ + rq->size = 2; + rq->cnt = 0; + rq->start = 0; + rq->baskets = malloc(sizeof(struct rb_ractor_basket) * rq->size); +} + +static struct rb_ractor_basket * +ractor_queue_at(struct rb_ractor_queue *rq, int i) +{ + return &rq->baskets[(rq->start + i) % rq->size]; +} + +static void +ractor_queue_advance(struct rb_ractor_queue *rq) +{ + ASSERT_ractor_locking(GET_RACTOR()); + + if (rq->reserved_cnt == 0) { + rq->cnt--; + rq->start = (rq->start + 1) % rq->size; + rq->serial++; + } + else { + ractor_queue_at(rq, 0)->type = basket_type_deleted; + } +} + +static bool +ractor_queue_skip_p(struct rb_ractor_queue *rq, int i) +{ + struct rb_ractor_basket *b = ractor_queue_at(rq, i); + return b->type == basket_type_deleted || + b->type == basket_type_reserved; +} + +static void +ractor_queue_compact(rb_ractor_t *r, struct rb_ractor_queue *rq) +{ + ASSERT_ractor_locking(r); + + while (rq->cnt > 0 && ractor_queue_at(rq, 0)->type == basket_type_deleted) { + ractor_queue_advance(rq); + } +} + +static bool +ractor_queue_empty_p(rb_ractor_t *r, struct rb_ractor_queue *rq) +{ + ASSERT_ractor_locking(r); + + if (rq->cnt == 0) { + return true; + } + + ractor_queue_compact(r, rq); + + for (int i=0; i<rq->cnt; i++) { + if (!ractor_queue_skip_p(rq, i)) { + return false; + } + } + + return true; +} + +static bool +ractor_queue_deq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket) +{ + bool found = false; + + RACTOR_LOCK(r); + { + if (!ractor_queue_empty_p(r, rq)) { + for (int i=0; i<rq->cnt; i++) { + if (!ractor_queue_skip_p(rq, i)) { + struct rb_ractor_basket *b = ractor_queue_at(rq, i); + *basket = *b; + + // remove from queue + b->type = basket_type_deleted; + ractor_queue_compact(r, rq); + found = true; + break; + } + } + } + } + RACTOR_UNLOCK(r); + + return found; +} + +static void +ractor_queue_enq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket) +{ + ASSERT_ractor_locking(r); + + if (rq->size <= rq->cnt) { + rq->baskets = realloc(rq->baskets, sizeof(struct rb_ractor_basket) * rq->size * 2); + for (int i=rq->size - rq->start; i<rq->cnt; i++) { + rq->baskets[i + rq->start] = rq->baskets[i + rq->start - rq->size]; + } + rq->size *= 2; + } + rq->baskets[(rq->start + rq->cnt++) % rq->size] = *basket; + // fprintf(stderr, "%s %p->cnt:%d\n", RUBY_FUNCTION_NAME_STRING, (void *)rq, rq->cnt); +} + +static void +ractor_basket_clear(struct rb_ractor_basket *b) +{ + b->type = basket_type_none; + b->v = Qfalse; + b->sender = Qfalse; +} + +static VALUE ractor_reset_belonging(VALUE obj); // in this file + +static VALUE +ractor_basket_value(struct rb_ractor_basket *b) +{ + switch (b->type) { + case basket_type_ref: + break; + case basket_type_copy: + case basket_type_move: + case basket_type_will: + b->type = basket_type_ref; + b->v = ractor_reset_belonging(b->v); + break; + default: + rb_bug("unreachable"); + } + + return b->v; +} + +static VALUE +ractor_basket_accept(struct rb_ractor_basket *b) +{ + VALUE v = ractor_basket_value(b); + + if (b->exception) { + VALUE cause = v; + VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor."); + rb_ivar_set(err, rb_intern("@ractor"), b->sender); + ractor_basket_clear(b); + rb_ec_setup_exception(NULL, err, cause); + rb_exc_raise(err); + } + + ractor_basket_clear(b); + return v; +} + +static void +ractor_recursive_receive_if(rb_ractor_t *r) +{ + if (r->receiving_mutex && rb_mutex_owned_p(r->receiving_mutex)) { + rb_raise(rb_eRactorError, "can not call receive/receive_if recursively"); + } +} + +static VALUE +ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *r) +{ + struct rb_ractor_queue *rq = &r->sync.incoming_queue; + struct rb_ractor_basket basket; + + ractor_recursive_receive_if(r); + + if (ractor_queue_deq(r, rq, &basket) == false) { + if (r->sync.incoming_port_closed) { + rb_raise(rb_eRactorClosedError, "The incoming port is already closed"); + } + else { + return Qundef; + } + } + + return ractor_basket_accept(&basket); +} + +static bool +ractor_sleeping_by(const rb_ractor_t *r, enum ractor_wait_status wait_status) +{ + return (r->sync.wait.status & wait_status) && r->sync.wait.wakeup_status == wakeup_none; +} + +static bool +ractor_wakeup(rb_ractor_t *r, enum ractor_wait_status wait_status, enum ractor_wakeup_status wakeup_status) +{ + ASSERT_ractor_locking(r); + + // fprintf(stderr, "%s r:%p status:%s/%s wakeup_status:%s/%s\n", RUBY_FUNCTION_NAME_STRING, (void *)r, + // wait_status_str(r->sync.wait.status), wait_status_str(wait_status), + // wakeup_status_str(r->sync.wait.wakeup_status), wakeup_status_str(wakeup_status)); + + if (ractor_sleeping_by(r, wait_status)) { + r->sync.wait.wakeup_status = wakeup_status; + rb_native_cond_signal(&r->sync.cond); + return true; + } + else { + return false; + } +} + +static void * +ractor_sleep_wo_gvl(void *ptr) +{ + rb_ractor_t *cr = ptr; + RACTOR_LOCK_SELF(cr); + { + VM_ASSERT(cr->sync.wait.status != wait_none); + if (cr->sync.wait.wakeup_status == wakeup_none) { + ractor_cond_wait(cr); + } + cr->sync.wait.status = wait_none; + } + RACTOR_UNLOCK_SELF(cr); + return NULL; +} + +static void +ractor_sleep_interrupt(void *ptr) +{ + rb_ractor_t *r = ptr; + + RACTOR_LOCK(r); + { + ractor_wakeup(r, wait_receiving | wait_taking | wait_yielding, wakeup_by_interrupt); + } + RACTOR_UNLOCK(r); +} + +#if defined(USE_RUBY_DEBUG_LOG) && USE_RUBY_DEBUG_LOG +static const char * +wait_status_str(enum ractor_wait_status wait_status) +{ + switch ((int)wait_status) { + case wait_none: return "none"; + case wait_receiving: return "receiving"; + case wait_taking: return "taking"; + case wait_yielding: return "yielding"; + case wait_receiving|wait_taking: return "receiving|taking"; + case wait_receiving|wait_yielding: return "receiving|yielding"; + case wait_taking|wait_yielding: return "taking|yielding"; + case wait_receiving|wait_taking|wait_yielding: return "receiving|taking|yielding"; + } + rb_bug("unreachable"); +} + +static const char * +wakeup_status_str(enum ractor_wakeup_status wakeup_status) +{ + switch (wakeup_status) { + case wakeup_none: return "none"; + case wakeup_by_send: return "by_send"; + case wakeup_by_yield: return "by_yield"; + case wakeup_by_take: return "by_take"; + case wakeup_by_close: return "by_close"; + case wakeup_by_interrupt: return "by_interrupt"; + case wakeup_by_retry: return "by_retry"; + } + rb_bug("unreachable"); +} +#endif // USE_RUBY_DEBUG_LOG + +static void +ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr) +{ + VM_ASSERT(GET_RACTOR() == cr); + VM_ASSERT(cr->sync.wait.status != wait_none); + // fprintf(stderr, "%s r:%p status:%s, wakeup_status:%s\n", RUBY_FUNCTION_NAME_STRING, (void *)cr, + // wait_status_str(cr->sync.wait.status), wakeup_status_str(cr->sync.wait.wakeup_status)); + + RACTOR_UNLOCK(cr); + { + rb_nogvl(ractor_sleep_wo_gvl, cr, + ractor_sleep_interrupt, cr, + RB_NOGVL_UBF_ASYNC_SAFE | RB_NOGVL_INTR_FAIL); + } + RACTOR_LOCK(cr); + + // rb_nogvl() can be canceled by interrupts + if (cr->sync.wait.status != wait_none) { + cr->sync.wait.status = wait_none; + cr->sync.wait.wakeup_status = wakeup_by_interrupt; + + RACTOR_UNLOCK(cr); + rb_thread_check_ints(); + RACTOR_LOCK(cr); // reachable? + } +} + +static void +ractor_register_taking(rb_ractor_t *r, rb_ractor_t *cr) +{ + VM_ASSERT(cr == GET_RACTOR()); + bool retry_try = false; + + RACTOR_LOCK(r); + { + if (ractor_sleeping_by(r, wait_yielding)) { + // already waiting for yielding. retry try_take. + retry_try = true; + } + else { + // insert cr into taking list + struct rb_ractor_waiting_list *wl = &r->sync.taking_ractors; + + for (int i=0; i<wl->cnt; i++) { + if (wl->ractors[i] == cr) { + // TODO: make it clean code. + rb_native_mutex_unlock(&r->sync.lock); + rb_raise(rb_eRuntimeError, "Already another thread of same ractor is waiting."); + } + } + + if (wl->size == 0) { + wl->size = 1; + wl->ractors = malloc(sizeof(rb_ractor_t *) * wl->size); + if (wl->ractors == NULL) rb_bug("can't allocate buffer"); + } + else if (wl->size <= wl->cnt + 1) { + wl->size *= 2; + wl->ractors = realloc(wl->ractors, sizeof(rb_ractor_t *) * wl->size); + if (wl->ractors == NULL) rb_bug("can't re-allocate buffer"); + } + wl->ractors[wl->cnt++] = cr; + } + } + RACTOR_UNLOCK(r); + + if (retry_try) { + RACTOR_LOCK(cr); + { + if (cr->sync.wait.wakeup_status == wakeup_none) { + VM_ASSERT(cr->sync.wait.status != wait_none); + + cr->sync.wait.wakeup_status = wakeup_by_retry; + cr->sync.wait.status = wait_none; + } + } + RACTOR_UNLOCK(cr); + } +} + +static void +ractor_waiting_list_del(rb_ractor_t *r, struct rb_ractor_waiting_list *wl, rb_ractor_t *wr) +{ + RACTOR_LOCK(r); + { + int pos = -1; + for (int i=0; i<wl->cnt; i++) { + if (wl->ractors[i] == wr) { + pos = i; + break; + } + } + if (pos >= 0) { // found + wl->cnt--; + for (int i=pos; i<wl->cnt; i++) { + wl->ractors[i] = wl->ractors[i+1]; + } + } + } + RACTOR_UNLOCK(r); +} + +static rb_ractor_t * +ractor_waiting_list_shift(rb_ractor_t *r, struct rb_ractor_waiting_list *wl) +{ + ASSERT_ractor_locking(r); + VM_ASSERT(&r->sync.taking_ractors == wl); + + if (wl->cnt > 0) { + rb_ractor_t *tr = wl->ractors[0]; + for (int i=1; i<wl->cnt; i++) { + wl->ractors[i-1] = wl->ractors[i]; + } + wl->cnt--; + return tr; + } + else { + return NULL; + } +} + +static void +ractor_receive_wait(rb_execution_context_t *ec, rb_ractor_t *cr) +{ + VM_ASSERT(cr == rb_ec_ractor_ptr(ec)); + ractor_recursive_receive_if(cr); + + RACTOR_LOCK(cr); + { + if (ractor_queue_empty_p(cr, &cr->sync.incoming_queue)) { + VM_ASSERT(cr->sync.wait.status == wait_none); + cr->sync.wait.status = wait_receiving; + cr->sync.wait.wakeup_status = wakeup_none; + ractor_sleep(ec, cr); + cr->sync.wait.wakeup_status = wakeup_none; + } + } + RACTOR_UNLOCK(cr); +} + +static VALUE +ractor_receive(rb_execution_context_t *ec, rb_ractor_t *cr) +{ + VM_ASSERT(cr == rb_ec_ractor_ptr(ec)); + VALUE v; + + while ((v = ractor_try_receive(ec, cr)) == Qundef) { + ractor_receive_wait(ec, cr); + } + + return v; +} + +#if 0 +// for debug +static const char * +basket_type_name(enum rb_ractor_basket_type type) +{ + switch (type) { +#define T(t) case basket_type_##t: return #t + T(none); + T(ref); + T(copy); + T(move); + T(will); + T(deleted); + T(reserved); + default: rb_bug("unreachable"); + } +} + +static void +rq_dump(struct rb_ractor_queue *rq) +{ + bool bug = false; + for (int i=0; i<rq->cnt; i++) { + struct rb_ractor_basket *b = ractor_queue_at(rq, i); + fprintf(stderr, "%d (start:%d) type:%s %p %s\n", i, rq->start, basket_type_name(b->type), + (void *)b, RSTRING_PTR(RARRAY_AREF(b->v, 1))); + if (b->type == basket_type_reserved) bug = true; + } + if (bug) rb_bug("!!"); +} +#endif + +struct receive_block_data { + rb_ractor_t *cr; + struct rb_ractor_queue *rq; + VALUE v; + int index; + bool success; +}; + +static void +ractor_receive_if_lock(rb_ractor_t *cr) +{ + VALUE m = cr->receiving_mutex; + if (m == Qfalse) { + m = cr->receiving_mutex = rb_mutex_new(); + } + rb_mutex_lock(m); +} + +static VALUE +receive_if_body(VALUE ptr) +{ + struct receive_block_data *data = (struct receive_block_data *)ptr; + + ractor_receive_if_lock(data->cr); + VALUE block_result = rb_yield(data->v); + + RACTOR_LOCK_SELF(data->cr); + { + struct rb_ractor_basket *b = ractor_queue_at(data->rq, data->index); + VM_ASSERT(b->type == basket_type_reserved); + data->rq->reserved_cnt--; + + if (RTEST(block_result)) { + b->type = basket_type_deleted; + ractor_queue_compact(data->cr, data->rq); + } + else { + b->type = basket_type_ref; + } + } + RACTOR_UNLOCK_SELF(data->cr); + + data->success = true; + + if (RTEST(block_result)) { + return data->v; + } + else { + return Qundef; + } +} + +static VALUE +receive_if_ensure(VALUE v) +{ + struct receive_block_data *data = (struct receive_block_data *)v; + + if (!data->success) { + RACTOR_LOCK_SELF(data->cr); + { + struct rb_ractor_basket *b = ractor_queue_at(data->rq, data->index); + VM_ASSERT(b->type == basket_type_reserved); + b->type = basket_type_deleted; + data->rq->reserved_cnt--; + } + RACTOR_UNLOCK_SELF(data->cr); + } + + rb_mutex_unlock(data->cr->receiving_mutex); + return Qnil; +} + +static VALUE +ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b) +{ + if (!RTEST(b)) rb_raise(rb_eArgError, "no block given"); + + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); + unsigned int serial = (unsigned int)-1; + int index = 0; + struct rb_ractor_queue *rq = &cr->sync.incoming_queue; + + while (1) { + VALUE v = Qundef; + + ractor_receive_wait(ec, cr); + + RACTOR_LOCK_SELF(cr); + { + if (serial != rq->serial) { + serial = rq->serial; + index = 0; + } + + // check newer version + for (int i=index; i<rq->cnt; i++) { + if (!ractor_queue_skip_p(rq, i)) { + struct rb_ractor_basket *b = ractor_queue_at(rq, i); + v = ractor_basket_value(b); + b->type = basket_type_reserved; + rq->reserved_cnt++; + index = i; + break; + } + } + } + RACTOR_UNLOCK_SELF(cr); + + if (v != Qundef) { + struct receive_block_data data = { + .cr = cr, + .rq = rq, + .v = v, + .index = index, + .success = false, + }; + + VALUE result = rb_ensure(receive_if_body, (VALUE)&data, + receive_if_ensure, (VALUE)&data); + + if (result != Qundef) return result; + index++; + } + } +} + +static void +ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_basket *b) +{ + bool closed = false; + struct rb_ractor_queue *rq = &r->sync.incoming_queue; + + RACTOR_LOCK(r); + { + if (r->sync.incoming_port_closed) { + closed = true; + } + else { + ractor_queue_enq(r, rq, b); + if (ractor_wakeup(r, wait_receiving, wakeup_by_send)) { + RUBY_DEBUG_LOG("wakeup"); + } + } + } + RACTOR_UNLOCK(r); + + if (closed) { + rb_raise(rb_eRactorClosedError, "The incoming-port is already closed"); + } +} + +static VALUE ractor_move(VALUE obj); // in this file +static VALUE ractor_copy(VALUE obj); // in this file + +static void +ractor_basket_setup(rb_execution_context_t *ec, struct rb_ractor_basket *basket, VALUE obj, VALUE move, bool exc, bool is_will, bool is_yield) +{ + basket->sender = rb_ec_ractor_ptr(ec)->pub.self; + basket->exception = exc; + + if (is_will) { + basket->type = basket_type_will; + basket->v = obj; + } + else if (rb_ractor_shareable_p(obj)) { + basket->type = basket_type_ref; + basket->v = obj; + } + else if (!RTEST(move)) { + basket->v = ractor_copy(obj); + basket->type = basket_type_copy; + } + else { + basket->type = basket_type_move; + + if (is_yield) { + basket->v = obj; // call ractor_move() when yielding timing. + } + else { + basket->v = ractor_move(obj); + } + } +} + +static VALUE +ractor_send(rb_execution_context_t *ec, rb_ractor_t *r, VALUE obj, VALUE move) +{ + struct rb_ractor_basket basket; + ractor_basket_setup(ec, &basket, obj, move, false, false, false); + ractor_send_basket(ec, r, &basket); + return r->pub.self; +} + +static VALUE +ractor_try_take(rb_execution_context_t *ec, rb_ractor_t *r) +{ + struct rb_ractor_basket basket = { + .type = basket_type_none, + }; + bool closed = false; + + RACTOR_LOCK(r); + { + if (ractor_sleeping_by(r, wait_yielding)) { + MAYBE_UNUSED(bool) wakeup_result; + VM_ASSERT(r->sync.wait.yielded_basket.type != basket_type_none); + + if (r->sync.wait.yielded_basket.type == basket_type_move) { + wakeup_result = ractor_wakeup(r, wait_yielding, wakeup_by_retry); + } + else { + wakeup_result = ractor_wakeup(r, wait_yielding, wakeup_by_take); + basket = r->sync.wait.yielded_basket; + ractor_basket_clear(&r->sync.wait.yielded_basket); + } + VM_ASSERT(wakeup_result); + } + else if (r->sync.outgoing_port_closed) { + closed = true; + } + } + RACTOR_UNLOCK(r); + + if (basket.type == basket_type_none) { + if (closed) { + rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); + } + else { + return Qundef; + } + } + else { + return ractor_basket_accept(&basket); + } +} + +static VALUE +ractor_yield_move_body(VALUE v) +{ + return ractor_move(v); +} + +static bool +ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_basket *basket) +{ + ASSERT_ractor_unlocking(cr); + VM_ASSERT(basket->type != basket_type_none); + + if (cr->sync.outgoing_port_closed) { + rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed"); + } + + rb_ractor_t *r; + + retry_shift: + RACTOR_LOCK(cr); + { + r = ractor_waiting_list_shift(cr, &cr->sync.taking_ractors); + } + RACTOR_UNLOCK(cr); + + if (r) { + bool retry_shift = false; + + RACTOR_LOCK(r); + { + if (ractor_sleeping_by(r, wait_taking)) { + VM_ASSERT(r->sync.wait.taken_basket.type == basket_type_none); + + if (basket->type == basket_type_move) { + enum ractor_wait_status prev_wait_status = r->sync.wait.status; + r->sync.wait.status = wait_moving; + + RACTOR_UNLOCK(r); + { + int state; + VALUE moved_value = rb_protect(ractor_yield_move_body, basket->v, &state); + if (state) { + r->sync.wait.status = prev_wait_status; + rb_jump_tag(state); + } + else { + basket->v = moved_value; + } + } + RACTOR_LOCK(r); + + if (!ractor_wakeup(r, wait_moving, wakeup_by_yield)) { + // terminating? + } + } + else { + ractor_wakeup(r, wait_taking, wakeup_by_yield); + } + r->sync.wait.taken_basket = *basket; + } + else { + retry_shift = true; + } + } + RACTOR_UNLOCK(r); + + if (retry_shift) { + // get candidate take-waiting ractor, but already woke up by another reason. + // retry to check another ractor. + goto retry_shift; + } + else { + return true; + } + } + else { + return false; + } +} + +// select(r1, r2, r3, receive: true, yield: obj) +static VALUE +ractor_select(rb_execution_context_t *ec, const VALUE *rs, const int rs_len, VALUE yielded_value, bool move, VALUE *ret_r) +{ + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); + VALUE crv = cr->pub.self; + VALUE ret = Qundef; + int i; + bool interrupted = false; + enum ractor_wait_status wait_status = 0; + bool yield_p = (yielded_value != Qundef) ? true : false; + const int alen = rs_len + (yield_p ? 1 : 0); + + struct ractor_select_action { + enum ractor_select_action_type { + ractor_select_action_take, + ractor_select_action_receive, + ractor_select_action_yield, + } type; + VALUE v; + } *actions = ALLOCA_N(struct ractor_select_action, alen); + + VM_ASSERT(cr->sync.wait.status == wait_none); + VM_ASSERT(cr->sync.wait.wakeup_status == wakeup_none); + VM_ASSERT(cr->sync.wait.taken_basket.type == basket_type_none); + VM_ASSERT(cr->sync.wait.yielded_basket.type == basket_type_none); + + // setup actions + for (i=0; i<rs_len; i++) { + VALUE v = rs[i]; + + if (v == crv) { + actions[i].type = ractor_select_action_receive; + actions[i].v = Qnil; + wait_status |= wait_receiving; + } + else if (rb_ractor_p(v)) { + actions[i].type = ractor_select_action_take; + actions[i].v = v; + wait_status |= wait_taking; + } + else { + rb_raise(rb_eArgError, "should be a ractor object, but %"PRIsVALUE, v); + } + } + rs = NULL; + + restart: + + if (yield_p) { + actions[rs_len].type = ractor_select_action_yield; + actions[rs_len].v = Qundef; + wait_status |= wait_yielding; + ractor_basket_setup(ec, &cr->sync.wait.yielded_basket, yielded_value, move, false, false, true); + } + + // TODO: shuffle actions + + while (1) { + RUBY_DEBUG_LOG("try actions (%s)", wait_status_str(wait_status)); + + for (i=0; i<alen; i++) { + VALUE v, rv; + switch (actions[i].type) { + case ractor_select_action_take: + rv = actions[i].v; + v = ractor_try_take(ec, RACTOR_PTR(rv)); + if (v != Qundef) { + *ret_r = rv; + ret = v; + goto cleanup; + } + break; + case ractor_select_action_receive: + v = ractor_try_receive(ec, cr); + if (v != Qundef) { + *ret_r = ID2SYM(rb_intern("receive")); + ret = v; + goto cleanup; + } + break; + case ractor_select_action_yield: + { + if (ractor_try_yield(ec, cr, &cr->sync.wait.yielded_basket)) { + *ret_r = ID2SYM(rb_intern("yield")); + ret = Qnil; + goto cleanup; + } + } + break; + } + } + + RUBY_DEBUG_LOG("wait actions (%s)", wait_status_str(wait_status)); + + RACTOR_LOCK(cr); + { + VM_ASSERT(cr->sync.wait.status == wait_none); + cr->sync.wait.status = wait_status; + cr->sync.wait.wakeup_status = wakeup_none; + } + RACTOR_UNLOCK(cr); + + // prepare waiting + for (i=0; i<alen; i++) { + rb_ractor_t *r; + switch (actions[i].type) { + case ractor_select_action_take: + r = RACTOR_PTR(actions[i].v); + ractor_register_taking(r, cr); + break; + case ractor_select_action_yield: + case ractor_select_action_receive: + break; + } + } + + // wait + RACTOR_LOCK(cr); + { + if (cr->sync.wait.wakeup_status == wakeup_none) { + for (i=0; i<alen; i++) { + rb_ractor_t *r; + + switch (actions[i].type) { + case ractor_select_action_take: + r = RACTOR_PTR(actions[i].v); + if (ractor_sleeping_by(r, wait_yielding)) { + RUBY_DEBUG_LOG("wakeup_none, but r:%u is waiting for yielding", r->pub.id); + cr->sync.wait.wakeup_status = wakeup_by_retry; + goto skip_sleep; + } + break; + case ractor_select_action_receive: + if (cr->sync.incoming_queue.cnt > 0) { + RUBY_DEBUG_LOG("wakeup_none, but incoming_queue has %u messages", cr->sync.incoming_queue.cnt); + cr->sync.wait.wakeup_status = wakeup_by_retry; + goto skip_sleep; + } + break; + case ractor_select_action_yield: + if (cr->sync.taking_ractors.cnt > 0) { + RUBY_DEBUG_LOG("wakeup_none, but %u taking_ractors are waiting", cr->sync.taking_ractors.cnt); + cr->sync.wait.wakeup_status = wakeup_by_retry; + goto skip_sleep; + } + else if (cr->sync.outgoing_port_closed) { + cr->sync.wait.wakeup_status = wakeup_by_close; + goto skip_sleep; + } + break; + } + } + + RUBY_DEBUG_LOG("sleep %s", wait_status_str(cr->sync.wait.status)); + ractor_sleep(ec, cr); + RUBY_DEBUG_LOG("awaken %s", wakeup_status_str(cr->sync.wait.wakeup_status)); + } + else { + skip_sleep: + RUBY_DEBUG_LOG("no need to sleep %s->%s", + wait_status_str(cr->sync.wait.status), + wakeup_status_str(cr->sync.wait.wakeup_status)); + cr->sync.wait.status = wait_none; + } + } + RACTOR_UNLOCK(cr); + + // cleanup waiting + for (i=0; i<alen; i++) { + rb_ractor_t *r; + switch (actions[i].type) { + case ractor_select_action_take: + r = RACTOR_PTR(actions[i].v); + ractor_waiting_list_del(r, &r->sync.taking_ractors, cr); + break; + case ractor_select_action_receive: + case ractor_select_action_yield: + break; + } + } + + // check results + enum ractor_wakeup_status wakeup_status = cr->sync.wait.wakeup_status; + cr->sync.wait.wakeup_status = wakeup_none; + + switch (wakeup_status) { + case wakeup_none: + // OK. something happens. + // retry loop. + break; + case wakeup_by_retry: + // Retry request. + break; + case wakeup_by_send: + // OK. + // retry loop and try_receive will succss. + break; + case wakeup_by_yield: + // take was succeeded! + // cr.wait.taken_basket contains passed block + VM_ASSERT(cr->sync.wait.taken_basket.type != basket_type_none); + *ret_r = cr->sync.wait.taken_basket.sender; + VM_ASSERT(rb_ractor_p(*ret_r)); + ret = ractor_basket_accept(&cr->sync.wait.taken_basket); + goto cleanup; + case wakeup_by_take: + *ret_r = ID2SYM(rb_intern("yield")); + ret = Qnil; + goto cleanup; + case wakeup_by_close: + // OK. + // retry loop and will get CloseError. + break; + case wakeup_by_interrupt: + ret = Qundef; + interrupted = true; + goto cleanup; + } + } + + cleanup: + RUBY_DEBUG_LOG("cleanup actions (%s)", wait_status_str(wait_status)); + + if (cr->sync.wait.yielded_basket.type != basket_type_none) { + ractor_basket_clear(&cr->sync.wait.yielded_basket); + } + + VM_ASSERT(cr->sync.wait.status == wait_none); + VM_ASSERT(cr->sync.wait.wakeup_status == wakeup_none); + VM_ASSERT(cr->sync.wait.taken_basket.type == basket_type_none); + VM_ASSERT(cr->sync.wait.yielded_basket.type == basket_type_none); + + if (interrupted) { + rb_vm_check_ints_blocking(ec); + interrupted = false; + goto restart; + } + + VM_ASSERT(ret != Qundef); + return ret; +} + +static VALUE +ractor_yield(rb_execution_context_t *ec, rb_ractor_t *r, VALUE obj, VALUE move) +{ + VALUE ret_r; + ractor_select(ec, NULL, 0, obj, RTEST(move) ? true : false, &ret_r); + return Qnil; +} + +static VALUE +ractor_take(rb_execution_context_t *ec, rb_ractor_t *r) +{ + VALUE ret_r; + VALUE v = ractor_select(ec, &r->pub.self, 1, Qundef, false, &ret_r); + return v; +} + +static VALUE +ractor_close_incoming(rb_execution_context_t *ec, rb_ractor_t *r) +{ + VALUE prev; + + RACTOR_LOCK(r); + { + if (!r->sync.incoming_port_closed) { + prev = Qfalse; + r->sync.incoming_port_closed = true; + if (ractor_wakeup(r, wait_receiving, wakeup_by_close)) { + VM_ASSERT(r->sync.incoming_queue.cnt == 0); + RUBY_DEBUG_LOG("cancel receiving"); + } + } + else { + prev = Qtrue; + } + } + RACTOR_UNLOCK(r); + return prev; +} + +static VALUE +ractor_close_outgoing(rb_execution_context_t *ec, rb_ractor_t *r) +{ + VALUE prev; + + RACTOR_LOCK(r); + { + if (!r->sync.outgoing_port_closed) { + prev = Qfalse; + r->sync.outgoing_port_closed = true; + } + else { + prev = Qtrue; + } + + // wakeup all taking ractors + rb_ractor_t *taking_ractor; + while ((taking_ractor = ractor_waiting_list_shift(r, &r->sync.taking_ractors)) != NULL) { + RACTOR_LOCK(taking_ractor); + ractor_wakeup(taking_ractor, wait_taking, wakeup_by_close); + RACTOR_UNLOCK(taking_ractor); + } + + // raising yielding Ractor + if (!r->yield_atexit && + ractor_wakeup(r, wait_yielding, wakeup_by_close)) { + RUBY_DEBUG_LOG("cancel yielding"); + } + } + RACTOR_UNLOCK(r); + return prev; +} + +// creation/termination + +static uint32_t +ractor_next_id(void) +{ + uint32_t id; + + id = (uint32_t)(RUBY_ATOMIC_FETCH_ADD(ractor_last_id, 1) + 1); + + return id; +} + +static void +vm_insert_ractor0(rb_vm_t *vm, rb_ractor_t *r, bool single_ractor_mode) +{ + RUBY_DEBUG_LOG("r:%u ractor.cnt:%u++", r->pub.id, vm->ractor.cnt); + VM_ASSERT(single_ractor_mode || RB_VM_LOCKED_P()); + + list_add_tail(&vm->ractor.set, &r->vmlr_node); + vm->ractor.cnt++; +} + +static void +cancel_single_ractor_mode(void) +{ + // enable multi-ractor mode + RUBY_DEBUG_LOG("enable multi-ractor mode"); + + VALUE was_disabled = rb_gc_enable(); + + rb_gc_start(); + rb_transient_heap_evacuate(); + + if (was_disabled) { + rb_gc_disable(); + } + + ruby_single_main_ractor = NULL; +} + +static void +vm_insert_ractor(rb_vm_t *vm, rb_ractor_t *r) +{ + VM_ASSERT(ractor_status_p(r, ractor_created)); + + if (rb_multi_ractor_p()) { + RB_VM_LOCK(); + { + vm_insert_ractor0(vm, r, false); + vm_ractor_blocking_cnt_inc(vm, r, __FILE__, __LINE__); + } + RB_VM_UNLOCK(); + } + else { + if (vm->ractor.cnt == 0) { + // main ractor + vm_insert_ractor0(vm, r, true); + ractor_status_set(r, ractor_blocking); + ractor_status_set(r, ractor_running); + } + else { + cancel_single_ractor_mode(); + vm_insert_ractor0(vm, r, true); + vm_ractor_blocking_cnt_inc(vm, r, __FILE__, __LINE__); + } + } +} + +static void +vm_remove_ractor(rb_vm_t *vm, rb_ractor_t *cr) +{ + VM_ASSERT(ractor_status_p(cr, ractor_running)); + VM_ASSERT(vm->ractor.cnt > 1); + VM_ASSERT(cr->threads.cnt == 1); + + RB_VM_LOCK(); + { + RUBY_DEBUG_LOG("ractor.cnt:%u-- terminate_waiting:%d", + vm->ractor.cnt, vm->ractor.sync.terminate_waiting); + + VM_ASSERT(vm->ractor.cnt > 0); + list_del(&cr->vmlr_node); + + if (vm->ractor.cnt <= 2 && vm->ractor.sync.terminate_waiting) { + rb_native_cond_signal(&vm->ractor.sync.terminate_cond); + } + vm->ractor.cnt--; + + /* Clear the cached freelist to prevent a memory leak. */ + rb_gc_ractor_newobj_cache_clear(&cr->newobj_cache); + + ractor_status_set(cr, ractor_terminated); + } + RB_VM_UNLOCK(); +} + +static VALUE +ractor_alloc(VALUE klass) +{ + rb_ractor_t *r; + VALUE rv = TypedData_Make_Struct(klass, rb_ractor_t, &ractor_data_type, r); + FL_SET_RAW(rv, RUBY_FL_SHAREABLE); + r->pub.self = rv; + VM_ASSERT(ractor_status_p(r, ractor_created)); + return rv; +} + +rb_ractor_t * +rb_ractor_main_alloc(void) +{ + rb_ractor_t *r = ruby_mimmalloc(sizeof(rb_ractor_t)); + if (r == NULL) { + fprintf(stderr, "[FATAL] failed to allocate memory for main ractor\n"); + exit(EXIT_FAILURE); + } + MEMZERO(r, rb_ractor_t, 1); + r->pub.id = ++ractor_last_id; + r->loc = Qnil; + r->name = Qnil; + r->pub.self = Qnil; + ruby_single_main_ractor = r; + + return r; +} + +#if defined(HAVE_WORKING_FORK) +void +rb_ractor_atfork(rb_vm_t *vm, rb_thread_t *th) +{ + // initialize as a main ractor + vm->ractor.cnt = 0; + vm->ractor.blocking_cnt = 0; + ruby_single_main_ractor = th->ractor; + th->ractor->status_ = ractor_created; + + rb_ractor_living_threads_init(th->ractor); + rb_ractor_living_threads_insert(th->ractor, th); + + VM_ASSERT(vm->ractor.blocking_cnt == 0); + VM_ASSERT(vm->ractor.cnt == 1); +} +#endif + +void rb_gvl_init(rb_global_vm_lock_t *gvl); + +void +rb_ractor_living_threads_init(rb_ractor_t *r) +{ + list_head_init(&r->threads.set); + r->threads.cnt = 0; + r->threads.blocking_cnt = 0; +} + +static void +ractor_init(rb_ractor_t *r, VALUE name, VALUE loc) +{ + ractor_queue_setup(&r->sync.incoming_queue); + rb_native_mutex_initialize(&r->sync.lock); + rb_native_cond_initialize(&r->sync.cond); + rb_native_cond_initialize(&r->barrier_wait_cond); + + // thread management + rb_gvl_init(&r->threads.gvl); + rb_ractor_living_threads_init(r); + + // naming + if (!NIL_P(name)) { + rb_encoding *enc; + StringValueCStr(name); + enc = rb_enc_get(name); + if (!rb_enc_asciicompat(enc)) { + rb_raise(rb_eArgError, "ASCII incompatible encoding (%s)", + rb_enc_name(enc)); + } + name = rb_str_new_frozen(name); + } + r->name = name; + r->loc = loc; +} + +void +rb_ractor_main_setup(rb_vm_t *vm, rb_ractor_t *r, rb_thread_t *th) +{ + r->pub.self = TypedData_Wrap_Struct(rb_cRactor, &ractor_data_type, r); + FL_SET_RAW(r->pub.self, RUBY_FL_SHAREABLE); + ractor_init(r, Qnil, Qnil); + r->threads.main = th; + rb_ractor_living_threads_insert(r, th); +} + +static VALUE +ractor_create(rb_execution_context_t *ec, VALUE self, VALUE loc, VALUE name, VALUE args, VALUE block) +{ + VALUE rv = ractor_alloc(self); + rb_ractor_t *r = RACTOR_PTR(rv); + ractor_init(r, name, loc); + + // can block here + r->pub.id = ractor_next_id(); + RUBY_DEBUG_LOG("r:%u", r->pub.id); + + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); + r->verbose = cr->verbose; + r->debug = cr->debug; + + rb_yjit_before_ractor_spawn(); + rb_thread_create_ractor(r, args, block); + + RB_GC_GUARD(rv); + return rv; +} + +static void +ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool exc) +{ + if (cr->sync.outgoing_port_closed) { + return; + } + + ASSERT_ractor_unlocking(cr); + + struct rb_ractor_basket basket; + ractor_basket_setup(ec, &basket, v, Qfalse, exc, true, true /* this flag is ignored because move is Qfalse */); + + retry: + if (ractor_try_yield(ec, cr, &basket)) { + // OK. + } + else { + bool retry = false; + RACTOR_LOCK(cr); + { + if (cr->sync.taking_ractors.cnt == 0) { + cr->sync.wait.yielded_basket = basket; + + VM_ASSERT(cr->sync.wait.status == wait_none); + cr->sync.wait.status = wait_yielding; + cr->sync.wait.wakeup_status = wakeup_none; + + VM_ASSERT(cr->yield_atexit == false); + cr->yield_atexit = true; + } + else { + retry = true; // another ractor is waiting for the yield. + } + } + RACTOR_UNLOCK(cr); + + if (retry) goto retry; + } +} + +void +rb_ractor_teardown(rb_execution_context_t *ec) +{ + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); + ractor_close_incoming(ec, cr); + ractor_close_outgoing(ec, cr); + + // sync with rb_ractor_terminate_interrupt_main_thread() + RB_VM_LOCK_ENTER(); + { + VM_ASSERT(cr->threads.main != NULL); + cr->threads.main = NULL; + } + RB_VM_LOCK_LEAVE(); +} + +void +rb_ractor_atexit(rb_execution_context_t *ec, VALUE result) +{ + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); + ractor_yield_atexit(ec, cr, result, false); +} + +void +rb_ractor_atexit_exception(rb_execution_context_t *ec) +{ + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); + ractor_yield_atexit(ec, cr, ec->errinfo, true); +} + +void +rb_ractor_receive_parameters(rb_execution_context_t *ec, rb_ractor_t *r, int len, VALUE *ptr) +{ + for (int i=0; i<len; i++) { + ptr[i] = ractor_receive(ec, r); + } +} + +void +rb_ractor_send_parameters(rb_execution_context_t *ec, rb_ractor_t *r, VALUE args) +{ + int len = RARRAY_LENINT(args); + for (int i=0; i<len; i++) { + ractor_send(ec, r, RARRAY_AREF(args, i), false); + } +} + +MJIT_FUNC_EXPORTED bool +rb_ractor_main_p_(void) +{ + VM_ASSERT(rb_multi_ractor_p()); + rb_execution_context_t *ec = GET_EC(); + return rb_ec_ractor_ptr(ec) == rb_ec_vm_ptr(ec)->ractor.main_ractor; +} + +bool +rb_obj_is_main_ractor(VALUE gv) +{ + if (!rb_ractor_p(gv)) return false; + rb_ractor_t *r = DATA_PTR(gv); + return r == GET_VM()->ractor.main_ractor; +} + +rb_global_vm_lock_t * +rb_ractor_gvl(rb_ractor_t *r) +{ + return &r->threads.gvl; +} + +int +rb_ractor_living_thread_num(const rb_ractor_t *r) +{ + return r->threads.cnt; +} + +VALUE +rb_ractor_thread_list(rb_ractor_t *r) +{ + rb_thread_t *th = 0; + VALUE *ts; + int ts_cnt; + + RACTOR_LOCK(r); + { + ts = ALLOCA_N(VALUE, r->threads.cnt); + ts_cnt = 0; + + list_for_each(&r->threads.set, th, lt_node) { + switch (th->status) { + case THREAD_RUNNABLE: + case THREAD_STOPPED: + case THREAD_STOPPED_FOREVER: + ts[ts_cnt++] = th->self; + default: + break; + } + } + } + RACTOR_UNLOCK(r); + + VALUE ary = rb_ary_new(); + for (int i=0; i<ts_cnt; i++) { + rb_ary_push(ary, ts[i]); + } + + return ary; +} + +void +rb_ractor_living_threads_insert(rb_ractor_t *r, rb_thread_t *th) +{ + VM_ASSERT(th != NULL); + + RACTOR_LOCK(r); + { + RUBY_DEBUG_LOG("r(%d)->threads.cnt:%d++", r->pub.id, r->threads.cnt); + list_add_tail(&r->threads.set, &th->lt_node); + r->threads.cnt++; + } + RACTOR_UNLOCK(r); + + // first thread for a ractor + if (r->threads.cnt == 1) { + VM_ASSERT(ractor_status_p(r, ractor_created)); + vm_insert_ractor(th->vm, r); + } +} + +static void +vm_ractor_blocking_cnt_inc(rb_vm_t *vm, rb_ractor_t *r, const char *file, int line) +{ + ractor_status_set(r, ractor_blocking); + + RUBY_DEBUG_LOG2(file, line, "vm->ractor.blocking_cnt:%d++", vm->ractor.blocking_cnt); + vm->ractor.blocking_cnt++; + VM_ASSERT(vm->ractor.blocking_cnt <= vm->ractor.cnt); +} + +void +rb_vm_ractor_blocking_cnt_inc(rb_vm_t *vm, rb_ractor_t *cr, const char *file, int line) +{ + ASSERT_vm_locking(); + VM_ASSERT(GET_RACTOR() == cr); + vm_ractor_blocking_cnt_inc(vm, cr, file, line); +} + +void +rb_vm_ractor_blocking_cnt_dec(rb_vm_t *vm, rb_ractor_t *cr, const char *file, int line) +{ + ASSERT_vm_locking(); + VM_ASSERT(GET_RACTOR() == cr); + + RUBY_DEBUG_LOG2(file, line, "vm->ractor.blocking_cnt:%d--", vm->ractor.blocking_cnt); + VM_ASSERT(vm->ractor.blocking_cnt > 0); + vm->ractor.blocking_cnt--; + + ractor_status_set(cr, ractor_running); +} + +static void +ractor_check_blocking(rb_ractor_t *cr, unsigned int remained_thread_cnt, const char *file, int line) +{ + VM_ASSERT(cr == GET_RACTOR()); + + RUBY_DEBUG_LOG2(file, line, + "cr->threads.cnt:%u cr->threads.blocking_cnt:%u vm->ractor.cnt:%u vm->ractor.blocking_cnt:%u", + cr->threads.cnt, cr->threads.blocking_cnt, + GET_VM()->ractor.cnt, GET_VM()->ractor.blocking_cnt); + + VM_ASSERT(cr->threads.cnt >= cr->threads.blocking_cnt + 1); + + if (remained_thread_cnt > 0 && + // will be block + cr->threads.cnt == cr->threads.blocking_cnt + 1) { + // change ractor status: running -> blocking + rb_vm_t *vm = GET_VM(); + ASSERT_vm_unlocking(); + + RB_VM_LOCK(); + { + rb_vm_ractor_blocking_cnt_inc(vm, cr, file, line); + } + RB_VM_UNLOCK(); + } +} + +void +rb_ractor_living_threads_remove(rb_ractor_t *cr, rb_thread_t *th) +{ + VM_ASSERT(cr == GET_RACTOR()); + RUBY_DEBUG_LOG("r->threads.cnt:%d--", cr->threads.cnt); + ractor_check_blocking(cr, cr->threads.cnt - 1, __FILE__, __LINE__); + + if (cr->threads.cnt == 1) { + vm_remove_ractor(th->vm, cr); + } + else { + RACTOR_LOCK(cr); + { + list_del(&th->lt_node); + cr->threads.cnt--; + } + RACTOR_UNLOCK(cr); + } +} + +void +rb_ractor_blocking_threads_inc(rb_ractor_t *cr, const char *file, int line) +{ + RUBY_DEBUG_LOG2(file, line, "cr->threads.blocking_cnt:%d++", cr->threads.blocking_cnt); + + VM_ASSERT(cr->threads.cnt > 0); + VM_ASSERT(cr == GET_RACTOR()); + + ractor_check_blocking(cr, cr->threads.cnt, __FILE__, __LINE__); + cr->threads.blocking_cnt++; +} + +void +rb_ractor_blocking_threads_dec(rb_ractor_t *cr, const char *file, int line) +{ + RUBY_DEBUG_LOG2(file, line, + "r->threads.blocking_cnt:%d--, r->threads.cnt:%u", + cr->threads.blocking_cnt, cr->threads.cnt); + + VM_ASSERT(cr == GET_RACTOR()); + + if (cr->threads.cnt == cr->threads.blocking_cnt) { + rb_vm_t *vm = GET_VM(); + + RB_VM_LOCK_ENTER(); + { + rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__); + } + RB_VM_LOCK_LEAVE(); + } + + cr->threads.blocking_cnt--; +} + +void +rb_ractor_vm_barrier_interrupt_running_thread(rb_ractor_t *r) +{ + VM_ASSERT(r != GET_RACTOR()); + ASSERT_ractor_unlocking(r); + ASSERT_vm_locking(); + + RACTOR_LOCK(r); + { + if (ractor_status_p(r, ractor_running)) { + rb_execution_context_t *ec = r->threads.running_ec; + if (ec) { + RUBY_VM_SET_VM_BARRIER_INTERRUPT(ec); + } + } + } + RACTOR_UNLOCK(r); +} + +void +rb_ractor_terminate_interrupt_main_thread(rb_ractor_t *r) +{ + VM_ASSERT(r != GET_RACTOR()); + ASSERT_ractor_unlocking(r); + ASSERT_vm_locking(); + + rb_thread_t *main_th = r->threads.main; + if (main_th) { + if (main_th->status != THREAD_KILLED) { + RUBY_VM_SET_TERMINATE_INTERRUPT(main_th->ec); + rb_threadptr_interrupt(main_th); + } + else { + RUBY_DEBUG_LOG("killed (%p)", (void *)main_th); + } + } +} + +void rb_thread_terminate_all(rb_thread_t *th); // thread.c + +static void +ractor_terminal_interrupt_all(rb_vm_t *vm) +{ + if (vm->ractor.cnt > 1) { + // send terminate notification to all ractors + rb_ractor_t *r = 0; + list_for_each(&vm->ractor.set, r, vmlr_node) { + if (r != vm->ractor.main_ractor) { + rb_ractor_terminate_interrupt_main_thread(r); + } + } + } +} + +void +rb_ractor_terminate_all(void) +{ + rb_vm_t *vm = GET_VM(); + rb_ractor_t *cr = vm->ractor.main_ractor; + + VM_ASSERT(cr == GET_RACTOR()); // only main-ractor's main-thread should kick it. + + if (vm->ractor.cnt > 1) { + RB_VM_LOCK(); + ractor_terminal_interrupt_all(vm); // kill all ractors + RB_VM_UNLOCK(); + } + rb_thread_terminate_all(GET_THREAD()); // kill other threads in main-ractor and wait + + RB_VM_LOCK(); + { + while (vm->ractor.cnt > 1) { + RUBY_DEBUG_LOG("terminate_waiting:%d", vm->ractor.sync.terminate_waiting); + vm->ractor.sync.terminate_waiting = true; + + // wait for 1sec + rb_vm_ractor_blocking_cnt_inc(vm, cr, __FILE__, __LINE__); + rb_vm_cond_timedwait(vm, &vm->ractor.sync.terminate_cond, 1000 /* ms */); + rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__); + + ractor_terminal_interrupt_all(vm); + } + } + RB_VM_UNLOCK(); +} + +rb_execution_context_t * +rb_vm_main_ractor_ec(rb_vm_t *vm) +{ + return vm->ractor.main_ractor->threads.running_ec; +} + +static VALUE +ractor_moved_missing(int argc, VALUE *argv, VALUE self) +{ + rb_raise(rb_eRactorMovedError, "can not send any methods to a moved object"); +} + +/* + * Document-class: Ractor::ClosedError + * + * Raised when an attempt is made to send a message to a closed port, + * or to retrieve a message from a closed and empty port. + * Ports may be closed explicitly with Ractor#close_outgoing/close_incoming + * and are closed implicitly when a Ractor terminates. + * + * r = Ractor.new { sleep(500) } + * r.close_outgoing + * r.take # Ractor::ClosedError + * + * ClosedError is a descendant of StopIteration, so the closing of the ractor will break + * the loops without propagating the error: + * + * r = Ractor.new do + * loop do + * msg = receive # raises ClosedError and loop traps it + * puts "Received: #{msg}" + * end + * puts "loop exited" + * end + * + * 3.times{|i| r << i} + * r.close_incoming + * r.take + * puts "Continue successfully" + * + * This will print: + * + * Received: 0 + * Received: 1 + * Received: 2 + * loop exited + * Continue successfully + */ + +/* + * Document-class: Ractor::RemoteError + * + * Raised on attempt to Ractor#take if there was an uncaught exception in the Ractor. + * Its +cause+ will contain the original exception, and +ractor+ is the original ractor + * it was raised in. + * + * r = Ractor.new { raise "Something weird happened" } + * + * begin + * r.take + * rescue => e + * p e # => #<Ractor::RemoteError: thrown by remote Ractor.> + * p e.ractor == r # => true + * p e.cause # => #<RuntimeError: Something weird happened> + * end + * + */ + +/* + * Document-class: Ractor::MovedError + * + * Raised on an attempt to access an object which was moved in Ractor#send or Ractor.yield. + * + * r = Ractor.new { sleep } + * + * ary = [1, 2, 3] + * r.send(ary, move: true) + * ary.inspect + * # Ractor::MovedError (can not send any methods to a moved object) + * + */ + +/* + * Document-class: Ractor::MovedObject + * + * A special object which replaces any value that was moved to another ractor in Ractor#send + * or Ractor.yield. Any attempt to access the object results in Ractor::MovedError. + * + * r = Ractor.new { receive } + * + * ary = [1, 2, 3] + * r.send(ary, move: true) + * p Ractor::MovedObject === ary + * # => true + * ary.inspect + * # Ractor::MovedError (can not send any methods to a moved object) + */ + +// Main docs are in ractor.rb, but without this clause there are weird artifacts +// in their rendering. +/* + * Document-class: Ractor + * + */ + +void +Init_Ractor(void) +{ + rb_cRactor = rb_define_class("Ractor", rb_cObject); + rb_undef_alloc_func(rb_cRactor); + + rb_eRactorError = rb_define_class_under(rb_cRactor, "Error", rb_eRuntimeError); + rb_eRactorIsolationError = rb_define_class_under(rb_cRactor, "IsolationError", rb_eRactorError); + rb_eRactorRemoteError = rb_define_class_under(rb_cRactor, "RemoteError", rb_eRactorError); + rb_eRactorMovedError = rb_define_class_under(rb_cRactor, "MovedError", rb_eRactorError); + rb_eRactorClosedError = rb_define_class_under(rb_cRactor, "ClosedError", rb_eStopIteration); + rb_eRactorUnsafeError = rb_define_class_under(rb_cRactor, "UnsafeError", rb_eRactorError); + + rb_cRactorMovedObject = rb_define_class_under(rb_cRactor, "MovedObject", rb_cBasicObject); + rb_undef_alloc_func(rb_cRactorMovedObject); + rb_define_method(rb_cRactorMovedObject, "method_missing", ractor_moved_missing, -1); + + // override methods defined in BasicObject + rb_define_method(rb_cRactorMovedObject, "__send__", ractor_moved_missing, -1); + rb_define_method(rb_cRactorMovedObject, "!", ractor_moved_missing, -1); + rb_define_method(rb_cRactorMovedObject, "==", ractor_moved_missing, -1); + rb_define_method(rb_cRactorMovedObject, "!=", ractor_moved_missing, -1); + rb_define_method(rb_cRactorMovedObject, "__id__", ractor_moved_missing, -1); + rb_define_method(rb_cRactorMovedObject, "equal?", ractor_moved_missing, -1); + rb_define_method(rb_cRactorMovedObject, "instance_eval", ractor_moved_missing, -1); + rb_define_method(rb_cRactorMovedObject, "instance_exec", ractor_moved_missing, -1); +} + +void +rb_ractor_dump(void) +{ + rb_vm_t *vm = GET_VM(); + rb_ractor_t *r = 0; + + list_for_each(&vm->ractor.set, r, vmlr_node) { + if (r != vm->ractor.main_ractor) { + fprintf(stderr, "r:%u (%s)\n", r->pub.id, ractor_status_str(r->status_)); + } + } +} + +VALUE +rb_ractor_stdin(void) +{ + if (rb_ractor_main_p()) { + return rb_stdin; + } + else { + rb_ractor_t *cr = GET_RACTOR(); + return cr->r_stdin; + } +} + +VALUE +rb_ractor_stdout(void) +{ + if (rb_ractor_main_p()) { + return rb_stdout; + } + else { + rb_ractor_t *cr = GET_RACTOR(); + return cr->r_stdout; + } +} + +VALUE +rb_ractor_stderr(void) +{ + if (rb_ractor_main_p()) { + return rb_stderr; + } + else { + rb_ractor_t *cr = GET_RACTOR(); + return cr->r_stderr; + } +} + +void +rb_ractor_stdin_set(VALUE in) +{ + if (rb_ractor_main_p()) { + rb_stdin = in; + } + else { + rb_ractor_t *cr = GET_RACTOR(); + RB_OBJ_WRITE(cr->pub.self, &cr->r_stdin, in); + } +} + +void +rb_ractor_stdout_set(VALUE out) +{ + if (rb_ractor_main_p()) { + rb_stdout = out; + } + else { + rb_ractor_t *cr = GET_RACTOR(); + RB_OBJ_WRITE(cr->pub.self, &cr->r_stdout, out); + } +} + +void +rb_ractor_stderr_set(VALUE err) +{ + if (rb_ractor_main_p()) { + rb_stderr = err; + } + else { + rb_ractor_t *cr = GET_RACTOR(); + RB_OBJ_WRITE(cr->pub.self, &cr->r_stderr, err); + } +} + +rb_hook_list_t * +rb_ractor_hooks(rb_ractor_t *cr) +{ + return &cr->pub.hooks; +} + +/// traverse function + +// 2: stop search +// 1: skip child +// 0: continue + +enum obj_traverse_iterator_result { + traverse_cont, + traverse_skip, + traverse_stop, +}; + +typedef enum obj_traverse_iterator_result (*rb_obj_traverse_enter_func)(VALUE obj); +typedef enum obj_traverse_iterator_result (*rb_obj_traverse_leave_func)(VALUE obj); +typedef enum obj_traverse_iterator_result (*rb_obj_traverse_final_func)(VALUE obj); + +static enum obj_traverse_iterator_result null_leave(VALUE obj); + +struct obj_traverse_data { + rb_obj_traverse_enter_func enter_func; + rb_obj_traverse_leave_func leave_func; + + st_table *rec; + VALUE rec_hash; +}; + + +struct obj_traverse_callback_data { + bool stop; + struct obj_traverse_data *data; +}; + +static int obj_traverse_i(VALUE obj, struct obj_traverse_data *data); + +static int +obj_hash_traverse_i(VALUE key, VALUE val, VALUE ptr) +{ + struct obj_traverse_callback_data *d = (struct obj_traverse_callback_data *)ptr; + + if (obj_traverse_i(key, d->data)) { + d->stop = true; + return ST_STOP; + } + + if (obj_traverse_i(val, d->data)) { + d->stop = true; + return ST_STOP; + } + + return ST_CONTINUE; +} + +static void +obj_traverse_reachable_i(VALUE obj, void *ptr) +{ + struct obj_traverse_callback_data *d = (struct obj_traverse_callback_data *)ptr; + + if (obj_traverse_i(obj, d->data)) { + d->stop = true; + } +} + +static struct st_table * +obj_traverse_rec(struct obj_traverse_data *data) +{ + if (UNLIKELY(!data->rec)) { + data->rec_hash = rb_ident_hash_new(); + data->rec = rb_hash_st_table(data->rec_hash); + } + return data->rec; +} + +static int +obj_traverse_i(VALUE obj, struct obj_traverse_data *data) +{ + if (RB_SPECIAL_CONST_P(obj)) return 0; + + switch (data->enter_func(obj)) { + case traverse_cont: break; + case traverse_skip: return 0; // skip children + case traverse_stop: return 1; // stop search + } + + if (UNLIKELY(st_insert(obj_traverse_rec(data), obj, 1))) { + // already traversed + return 0; + } + + if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) { + struct gen_ivtbl *ivtbl; + rb_ivar_generic_ivtbl_lookup(obj, &ivtbl); + for (uint32_t i = 0; i < ivtbl->numiv; i++) { + VALUE val = ivtbl->ivptr[i]; + if (val != Qundef && obj_traverse_i(val, data)) return 1; + } + } + + switch (BUILTIN_TYPE(obj)) { + // no child node + case T_STRING: + case T_FLOAT: + case T_BIGNUM: + case T_REGEXP: + case T_FILE: + case T_SYMBOL: + case T_MATCH: + break; + + case T_OBJECT: + { + uint32_t len = ROBJECT_NUMIV(obj); + VALUE *ptr = ROBJECT_IVPTR(obj); + + for (uint32_t i=0; i<len; i++) { + VALUE val = ptr[i]; + if (val != Qundef && obj_traverse_i(val, data)) return 1; + } + } + break; + + case T_ARRAY: + { + for (int i = 0; i < RARRAY_LENINT(obj); i++) { + VALUE e = rb_ary_entry(obj, i); + if (obj_traverse_i(e, data)) return 1; + } + } + break; + + case T_HASH: + { + if (obj_traverse_i(RHASH_IFNONE(obj), data)) return 1; + + struct obj_traverse_callback_data d = { + .stop = false, + .data = data, + }; + rb_hash_foreach(obj, obj_hash_traverse_i, (VALUE)&d); + if (d.stop) return 1; + } + break; + + case T_STRUCT: + { + long len = RSTRUCT_LEN(obj); + const VALUE *ptr = RSTRUCT_CONST_PTR(obj); + + for (long i=0; i<len; i++) { + if (obj_traverse_i(ptr[i], data)) return 1; + } + } + break; + + case T_RATIONAL: + if (obj_traverse_i(RRATIONAL(obj)->num, data)) return 1; + if (obj_traverse_i(RRATIONAL(obj)->den, data)) return 1; + break; + case T_COMPLEX: + if (obj_traverse_i(RCOMPLEX(obj)->real, data)) return 1; + if (obj_traverse_i(RCOMPLEX(obj)->imag, data)) return 1; + break; + + case T_DATA: + case T_IMEMO: + { + struct obj_traverse_callback_data d = { + .stop = false, + .data = data, + }; + RB_VM_LOCK_ENTER_NO_BARRIER(); + { + rb_objspace_reachable_objects_from(obj, obj_traverse_reachable_i, &d); + } + RB_VM_LOCK_LEAVE_NO_BARRIER(); + if (d.stop) return 1; + } + break; + + // unreachable + case T_CLASS: + case T_MODULE: + case T_ICLASS: + default: + rp(obj); + rb_bug("unreachable"); + } + + if (data->leave_func(obj) == traverse_stop) { + return 1; + } + else { + return 0; + } +} + +struct rb_obj_traverse_final_data { + rb_obj_traverse_final_func final_func; + int stopped; +}; + +static int +obj_traverse_final_i(st_data_t key, st_data_t val, st_data_t arg) +{ + struct rb_obj_traverse_final_data *data = (void *)arg; + if (data->final_func(key)) { + data->stopped = 1; + return ST_STOP; + } + return ST_CONTINUE; +} + +// 0: traverse all +// 1: stopped +static int +rb_obj_traverse(VALUE obj, + rb_obj_traverse_enter_func enter_func, + rb_obj_traverse_leave_func leave_func, + rb_obj_traverse_final_func final_func) +{ + struct obj_traverse_data data = { + .enter_func = enter_func, + .leave_func = leave_func, + .rec = NULL, + }; + + if (obj_traverse_i(obj, &data)) return 1; + if (final_func && data.rec) { + struct rb_obj_traverse_final_data f = {final_func, 0}; + st_foreach(data.rec, obj_traverse_final_i, (st_data_t)&f); + return f.stopped; + } + return 0; +} + +static int +frozen_shareable_p(VALUE obj, bool *made_shareable) +{ + if (!RB_TYPE_P(obj, T_DATA)) { + return true; + } + else if (RTYPEDDATA_P(obj)) { + const rb_data_type_t *type = RTYPEDDATA_TYPE(obj); + if (type->flags & RUBY_TYPED_FROZEN_SHAREABLE) { + return true; + } + else if (made_shareable && rb_obj_is_proc(obj)) { + // special path to make shareable Proc. + rb_proc_ractor_make_shareable(obj); + *made_shareable = true; + VM_ASSERT(RB_OBJ_SHAREABLE_P(obj)); + return false; + } + } + + return false; +} + +static enum obj_traverse_iterator_result +make_shareable_check_shareable(VALUE obj) +{ + VM_ASSERT(!SPECIAL_CONST_P(obj)); + bool made_shareable = false; + + if (rb_ractor_shareable_p(obj)) { + return traverse_skip; + } + else if (!frozen_shareable_p(obj, &made_shareable)) { + if (made_shareable) { + return traverse_skip; + } + else { + rb_raise(rb_eRactorError, "can not make shareable object for %"PRIsVALUE, obj); + } + } + + if (!RB_OBJ_FROZEN_RAW(obj)) { + rb_funcall(obj, idFreeze, 0); + + if (UNLIKELY(!RB_OBJ_FROZEN_RAW(obj))) { + rb_raise(rb_eRactorError, "#freeze does not freeze object correctly"); + } + + if (RB_OBJ_SHAREABLE_P(obj)) { + return traverse_skip; + } + } + + return traverse_cont; +} + +static enum obj_traverse_iterator_result +mark_shareable(VALUE obj) +{ + FL_SET_RAW(obj, RUBY_FL_SHAREABLE); + return traverse_cont; +} + +VALUE +rb_ractor_make_shareable(VALUE obj) +{ + rb_obj_traverse(obj, + make_shareable_check_shareable, + null_leave, mark_shareable); + return obj; +} + +VALUE +rb_ractor_make_shareable_copy(VALUE obj) +{ + VALUE copy = ractor_copy(obj); + rb_obj_traverse(copy, + make_shareable_check_shareable, + null_leave, mark_shareable); + return copy; +} + +VALUE +rb_ractor_ensure_shareable(VALUE obj, VALUE name) +{ + if (!rb_ractor_shareable_p(obj)) { + VALUE message = rb_sprintf("cannot assign unshareable object to %"PRIsVALUE, + name); + rb_exc_raise(rb_exc_new_str(rb_eRactorIsolationError, message)); + } + return obj; +} + +static enum obj_traverse_iterator_result +shareable_p_enter(VALUE obj) +{ + if (RB_OBJ_SHAREABLE_P(obj)) { + return traverse_skip; + } + else if (RB_TYPE_P(obj, T_CLASS) || + RB_TYPE_P(obj, T_MODULE) || + RB_TYPE_P(obj, T_ICLASS)) { + // TODO: remove it + mark_shareable(obj); + return traverse_skip; + } + else if (RB_OBJ_FROZEN_RAW(obj) && + frozen_shareable_p(obj, NULL)) { + return traverse_cont; + } + + return traverse_stop; // fail +} + +MJIT_FUNC_EXPORTED bool +rb_ractor_shareable_p_continue(VALUE obj) +{ + if (rb_obj_traverse(obj, + shareable_p_enter, null_leave, + mark_shareable)) { + return false; + } + else { + return true; + } +} + +#if RACTOR_CHECK_MODE > 0 +static enum obj_traverse_iterator_result +reset_belonging_enter(VALUE obj) +{ + if (rb_ractor_shareable_p(obj)) { + return traverse_skip; + } + else { + rb_ractor_setup_belonging(obj); + return traverse_cont; + } +} +#endif + +static enum obj_traverse_iterator_result +null_leave(VALUE obj) +{ + return traverse_cont; +} + +static VALUE +ractor_reset_belonging(VALUE obj) +{ +#if RACTOR_CHECK_MODE > 0 + rb_obj_traverse(obj, reset_belonging_enter, null_leave, NULL); +#endif + return obj; +} + + +/// traverse and replace function + +// 2: stop search +// 1: skip child +// 0: continue + +struct obj_traverse_replace_data; +static int obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data); +typedef enum obj_traverse_iterator_result (*rb_obj_traverse_replace_enter_func)(VALUE obj, struct obj_traverse_replace_data *data); +typedef enum obj_traverse_iterator_result (*rb_obj_traverse_replace_leave_func)(VALUE obj, struct obj_traverse_replace_data *data); + +struct obj_traverse_replace_data { + rb_obj_traverse_replace_enter_func enter_func; + rb_obj_traverse_replace_leave_func leave_func; + + st_table *rec; + VALUE rec_hash; + + VALUE replacement; + bool move; +}; + +struct obj_traverse_replace_callback_data { + bool stop; + VALUE src; + struct obj_traverse_replace_data *data; +}; + +static int +obj_hash_traverse_replace_foreach_i(st_data_t key, st_data_t value, st_data_t argp, int error) +{ + return ST_REPLACE; +} + +static int +obj_hash_traverse_replace_i(st_data_t *key, st_data_t *val, st_data_t ptr, int exists) +{ + struct obj_traverse_replace_callback_data *d = (struct obj_traverse_replace_callback_data *)ptr; + struct obj_traverse_replace_data *data = d->data; + + if (obj_traverse_replace_i(*key, data)) { + d->stop = true; + return ST_STOP; + } + else if (*key != data->replacement) { + VALUE v = *key = data->replacement; + RB_OBJ_WRITTEN(d->src, Qundef, v); + } + + if (obj_traverse_replace_i(*val, data)) { + d->stop = true; + return ST_STOP; + } + else if (*val != data->replacement) { + VALUE v = *val = data->replacement; + RB_OBJ_WRITTEN(d->src, Qundef, v); + } + + return ST_CONTINUE; +} + +static struct st_table * +obj_traverse_replace_rec(struct obj_traverse_replace_data *data) +{ + if (UNLIKELY(!data->rec)) { + data->rec_hash = rb_ident_hash_new(); + data->rec = rb_hash_st_table(data->rec_hash); + } + return data->rec; +} + +#if USE_TRANSIENT_HEAP +void rb_ary_transient_heap_evacuate(VALUE ary, int promote); +void rb_obj_transient_heap_evacuate(VALUE obj, int promote); +void rb_hash_transient_heap_evacuate(VALUE hash, int promote); +void rb_struct_transient_heap_evacuate(VALUE st, int promote); +#endif + +static void +obj_refer_only_shareables_p_i(VALUE obj, void *ptr) +{ + int *pcnt = (int *)ptr; + + if (!rb_ractor_shareable_p(obj)) { + pcnt++; + } +} + +static int +obj_refer_only_shareables_p(VALUE obj) +{ + int cnt = 0; + RB_VM_LOCK_ENTER_NO_BARRIER(); + { + rb_objspace_reachable_objects_from(obj, obj_refer_only_shareables_p_i, &cnt); + } + RB_VM_LOCK_LEAVE_NO_BARRIER(); + return cnt == 0; +} + +static int +obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data) +{ + VALUE replacement; + + if (RB_SPECIAL_CONST_P(obj)) { + data->replacement = obj; + return 0; + } + + switch (data->enter_func(obj, data)) { + case traverse_cont: break; + case traverse_skip: return 0; // skip children + case traverse_stop: return 1; // stop search + } + + replacement = data->replacement; + + if (UNLIKELY(st_lookup(obj_traverse_replace_rec(data), (st_data_t)obj, (st_data_t *)&replacement))) { + data->replacement = replacement; + return 0; + } + else { + st_insert(obj_traverse_replace_rec(data), (st_data_t)obj, (st_data_t)replacement); + } + + if (!data->move) { + obj = replacement; + } + +#define CHECK_AND_REPLACE(v) do { \ + VALUE _val = (v); \ + if (obj_traverse_replace_i(_val, data)) { return 1; } \ + else if (data->replacement != _val) { RB_OBJ_WRITE(obj, &v, data->replacement); } \ +} while (0) + + if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) { + struct gen_ivtbl *ivtbl; + rb_ivar_generic_ivtbl_lookup(obj, &ivtbl); + for (uint32_t i = 0; i < ivtbl->numiv; i++) { + if (ivtbl->ivptr[i] != Qundef) { + CHECK_AND_REPLACE(ivtbl->ivptr[i]); + } + } + } + + switch (BUILTIN_TYPE(obj)) { + // no child node + case T_FLOAT: + case T_BIGNUM: + case T_REGEXP: + case T_FILE: + case T_SYMBOL: + case T_MATCH: + break; + case T_STRING: + rb_str_make_independent(obj); + break; + + case T_OBJECT: + { +#if USE_TRANSIENT_HEAP + if (data->move) rb_obj_transient_heap_evacuate(obj, TRUE); +#endif + + uint32_t len = ROBJECT_NUMIV(obj); + VALUE *ptr = ROBJECT_IVPTR(obj); + + for (uint32_t i=0; i<len; i++) { + if (ptr[i] != Qundef) { + CHECK_AND_REPLACE(ptr[i]); + } + } + } + break; + + case T_ARRAY: + { + rb_ary_cancel_sharing(obj); +#if USE_TRANSIENT_HEAP + if (data->move) rb_ary_transient_heap_evacuate(obj, TRUE); +#endif + + for (int i = 0; i < RARRAY_LENINT(obj); i++) { + VALUE e = rb_ary_entry(obj, i); + + if (obj_traverse_replace_i(e, data)) { + return 1; + } + else if (e != data->replacement) { + RARRAY_ASET(obj, i, data->replacement); + } + } + RB_GC_GUARD(obj); + } + break; + + case T_HASH: + { +#if USE_TRANSIENT_HEAP + if (data->move) rb_hash_transient_heap_evacuate(obj, TRUE); +#endif + struct obj_traverse_replace_callback_data d = { + .stop = false, + .data = data, + .src = obj, + }; + rb_hash_stlike_foreach_with_replace(obj, + obj_hash_traverse_replace_foreach_i, + obj_hash_traverse_replace_i, + (VALUE)&d); + if (d.stop) return 1; + // TODO: rehash here? + + VALUE ifnone = RHASH_IFNONE(obj); + if (obj_traverse_replace_i(ifnone, data)) { + return 1; + } + else if (ifnone != data->replacement) { + RHASH_SET_IFNONE(obj, data->replacement); + } + } + break; + + case T_STRUCT: + { +#if USE_TRANSIENT_HEAP + if (data->move) rb_struct_transient_heap_evacuate(obj, TRUE); +#endif + long len = RSTRUCT_LEN(obj); + const VALUE *ptr = RSTRUCT_CONST_PTR(obj); + + for (long i=0; i<len; i++) { + CHECK_AND_REPLACE(ptr[i]); + } + } + break; + + case T_RATIONAL: + CHECK_AND_REPLACE(RRATIONAL(obj)->num); + CHECK_AND_REPLACE(RRATIONAL(obj)->den); + break; + case T_COMPLEX: + CHECK_AND_REPLACE(RCOMPLEX(obj)->real); + CHECK_AND_REPLACE(RCOMPLEX(obj)->imag); + break; + + case T_DATA: + if (!data->move && obj_refer_only_shareables_p(obj)) { + break; + } + else { + rb_raise(rb_eRactorError, "can not %s %"PRIsVALUE" object.", + data->move ? "move" : "copy", rb_class_of(obj)); + } + + case T_IMEMO: + // not supported yet + return 1; + + // unreachable + case T_CLASS: + case T_MODULE: + case T_ICLASS: + default: + rp(obj); + rb_bug("unreachable"); + } + + data->replacement = replacement; + + if (data->leave_func(obj, data) == traverse_stop) { + return 1; + } + else { + return 0; + } +} + +// 0: traverse all +// 1: stopped +static VALUE +rb_obj_traverse_replace(VALUE obj, + rb_obj_traverse_replace_enter_func enter_func, + rb_obj_traverse_replace_leave_func leave_func, + bool move) +{ + struct obj_traverse_replace_data data = { + .enter_func = enter_func, + .leave_func = leave_func, + .rec = NULL, + .replacement = Qundef, + .move = move, + }; + + if (obj_traverse_replace_i(obj, &data)) { + return Qundef; + } + else { + return data.replacement; + } +} + +struct RVALUE { + VALUE flags; + VALUE klass; + VALUE v1; + VALUE v2; + VALUE v3; +}; + +static const VALUE fl_users = FL_USER1 | FL_USER2 | FL_USER3 | + FL_USER4 | FL_USER5 | FL_USER6 | FL_USER7 | + FL_USER8 | FL_USER9 | FL_USER10 | FL_USER11 | + FL_USER12 | FL_USER13 | FL_USER14 | FL_USER15 | + FL_USER16 | FL_USER17 | FL_USER18 | FL_USER19; + +static void +ractor_moved_bang(VALUE obj) +{ + // invalidate src object + struct RVALUE *rv = (void *)obj; + + rv->klass = rb_cRactorMovedObject; + rv->v1 = 0; + rv->v2 = 0; + rv->v3 = 0; + rv->flags = rv->flags & ~fl_users; + + // TODO: record moved location +} + +static enum obj_traverse_iterator_result +move_enter(VALUE obj, struct obj_traverse_replace_data *data) +{ + if (rb_ractor_shareable_p(obj)) { + data->replacement = obj; + return traverse_skip; + } + else { + data->replacement = rb_obj_alloc(RBASIC_CLASS(obj)); + return traverse_cont; + } +} + +void rb_replace_generic_ivar(VALUE clone, VALUE obj); // variable.c + +static enum obj_traverse_iterator_result +move_leave(VALUE obj, struct obj_traverse_replace_data *data) +{ + VALUE v = data->replacement; + struct RVALUE *dst = (struct RVALUE *)v; + struct RVALUE *src = (struct RVALUE *)obj; + + dst->flags = (dst->flags & ~fl_users) | (src->flags & fl_users); + + dst->v1 = src->v1; + dst->v2 = src->v2; + dst->v3 = src->v3; + + if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) { + rb_replace_generic_ivar(v, obj); + } + + // TODO: generic_ivar + + ractor_moved_bang(obj); + return traverse_cont; +} + +static VALUE +ractor_move(VALUE obj) +{ + VALUE val = rb_obj_traverse_replace(obj, move_enter, move_leave, true); + if (val != Qundef) { + return val; + } + else { + rb_raise(rb_eRactorError, "can not move the object"); + } +} + +static enum obj_traverse_iterator_result +copy_enter(VALUE obj, struct obj_traverse_replace_data *data) +{ + if (rb_ractor_shareable_p(obj)) { + data->replacement = obj; + return traverse_skip; + } + else { + data->replacement = rb_obj_clone(obj); + return traverse_cont; + } +} + +static enum obj_traverse_iterator_result +copy_leave(VALUE obj, struct obj_traverse_replace_data *data) +{ + return traverse_cont; +} + +static VALUE +ractor_copy(VALUE obj) +{ + VALUE val = rb_obj_traverse_replace(obj, copy_enter, copy_leave, false); + if (val != Qundef) { + return val; + } + else { + rb_raise(rb_eRactorError, "can not copy the object"); + } +} + +// Ractor local storage + +struct rb_ractor_local_key_struct { + const struct rb_ractor_local_storage_type *type; + void *main_cache; +}; + +static struct freed_ractor_local_keys_struct { + int cnt; + int capa; + rb_ractor_local_key_t *keys; +} freed_ractor_local_keys; + +static int +ractor_local_storage_mark_i(st_data_t key, st_data_t val, st_data_t dmy) +{ + struct rb_ractor_local_key_struct *k = (struct rb_ractor_local_key_struct *)key; + if (k->type->mark) (*k->type->mark)((void *)val); + return ST_CONTINUE; +} + +static enum rb_id_table_iterator_result +idkey_local_storage_mark_i(ID id, VALUE val, void *dmy) +{ + rb_gc_mark(val); + return ID_TABLE_CONTINUE; +} + +static void +ractor_local_storage_mark(rb_ractor_t *r) +{ + if (r->local_storage) { + st_foreach(r->local_storage, ractor_local_storage_mark_i, 0); + + for (int i=0; i<freed_ractor_local_keys.cnt; i++) { + rb_ractor_local_key_t key = freed_ractor_local_keys.keys[i]; + st_data_t val; + if (st_delete(r->local_storage, (st_data_t *)&key, &val) && + key->type->free) { + (*key->type->free)((void *)val); + } + } + } + + if (r->idkey_local_storage) { + rb_id_table_foreach(r->idkey_local_storage, idkey_local_storage_mark_i, NULL); + } +} + +static int +ractor_local_storage_free_i(st_data_t key, st_data_t val, st_data_t dmy) +{ + struct rb_ractor_local_key_struct *k = (struct rb_ractor_local_key_struct *)key; + if (k->type->free) (*k->type->free)((void *)val); + return ST_CONTINUE; +} + +static void +ractor_local_storage_free(rb_ractor_t *r) +{ + if (r->local_storage) { + st_foreach(r->local_storage, ractor_local_storage_free_i, 0); + st_free_table(r->local_storage); + } + + if (r->idkey_local_storage) { + rb_id_table_free(r->idkey_local_storage); + } +} + +static void +rb_ractor_local_storage_value_mark(void *ptr) +{ + rb_gc_mark((VALUE)ptr); +} + +static const struct rb_ractor_local_storage_type ractor_local_storage_type_null = { + NULL, + NULL, +}; + +const struct rb_ractor_local_storage_type rb_ractor_local_storage_type_free = { + NULL, + ruby_xfree, +}; + +static const struct rb_ractor_local_storage_type ractor_local_storage_type_value = { + rb_ractor_local_storage_value_mark, + NULL, +}; + +rb_ractor_local_key_t +rb_ractor_local_storage_ptr_newkey(const struct rb_ractor_local_storage_type *type) +{ + rb_ractor_local_key_t key = ALLOC(struct rb_ractor_local_key_struct); + key->type = type ? type : &ractor_local_storage_type_null; + key->main_cache = (void *)Qundef; + return key; +} + +rb_ractor_local_key_t +rb_ractor_local_storage_value_newkey(void) +{ + return rb_ractor_local_storage_ptr_newkey(&ractor_local_storage_type_value); +} + +void +rb_ractor_local_storage_delkey(rb_ractor_local_key_t key) +{ + RB_VM_LOCK_ENTER(); + { + if (freed_ractor_local_keys.cnt == freed_ractor_local_keys.capa) { + freed_ractor_local_keys.capa = freed_ractor_local_keys.capa ? freed_ractor_local_keys.capa * 2 : 4; + REALLOC_N(freed_ractor_local_keys.keys, rb_ractor_local_key_t, freed_ractor_local_keys.capa); + } + freed_ractor_local_keys.keys[freed_ractor_local_keys.cnt++] = key; + } + RB_VM_LOCK_LEAVE(); +} + +static bool +ractor_local_ref(rb_ractor_local_key_t key, void **pret) +{ + if (rb_ractor_main_p()) { + if ((VALUE)key->main_cache != Qundef) { + *pret = key->main_cache; + return true; + } + else { + return false; + } + } + else { + rb_ractor_t *cr = GET_RACTOR(); + + if (cr->local_storage && st_lookup(cr->local_storage, (st_data_t)key, (st_data_t *)pret)) { + return true; + } + else { + return false; + } + } +} + +static void +ractor_local_set(rb_ractor_local_key_t key, void *ptr) +{ + rb_ractor_t *cr = GET_RACTOR(); + + if (cr->local_storage == NULL) { + cr->local_storage = st_init_numtable(); + } + + st_insert(cr->local_storage, (st_data_t)key, (st_data_t)ptr); + + if (rb_ractor_main_p()) { + key->main_cache = ptr; + } +} + +VALUE +rb_ractor_local_storage_value(rb_ractor_local_key_t key) +{ + VALUE val; + if (ractor_local_ref(key, (void **)&val)) { + return val; + } + else { + return Qnil; + } +} + +bool +rb_ractor_local_storage_value_lookup(rb_ractor_local_key_t key, VALUE *val) +{ + if (ractor_local_ref(key, (void **)val)) { + return true; + } + else { + return false; + } +} + +void +rb_ractor_local_storage_value_set(rb_ractor_local_key_t key, VALUE val) +{ + ractor_local_set(key, (void *)val); +} + +void * +rb_ractor_local_storage_ptr(rb_ractor_local_key_t key) +{ + void *ret; + if (ractor_local_ref(key, &ret)) { + return ret; + } + else { + return NULL; + } +} + +void +rb_ractor_local_storage_ptr_set(rb_ractor_local_key_t key, void *ptr) +{ + ractor_local_set(key, ptr); +} + +#define DEFAULT_KEYS_CAPA 0x10 + +void +rb_ractor_finish_marking(void) +{ + for (int i=0; i<freed_ractor_local_keys.cnt; i++) { + ruby_xfree(freed_ractor_local_keys.keys[i]); + } + freed_ractor_local_keys.cnt = 0; + if (freed_ractor_local_keys.capa > DEFAULT_KEYS_CAPA) { + freed_ractor_local_keys.capa = DEFAULT_KEYS_CAPA; + REALLOC_N(freed_ractor_local_keys.keys, rb_ractor_local_key_t, DEFAULT_KEYS_CAPA); + } +} + +static VALUE +ractor_local_value(rb_execution_context_t *ec, VALUE self, VALUE sym) +{ + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); + ID id = rb_check_id(&sym); + struct rb_id_table *tbl = cr->idkey_local_storage; + VALUE val; + + if (id && tbl && rb_id_table_lookup(tbl, id, &val)) { + return val; + } + else { + return Qnil; + } +} + +static VALUE +ractor_local_value_set(rb_execution_context_t *ec, VALUE self, VALUE sym, VALUE val) +{ + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); + ID id = SYM2ID(rb_to_symbol(sym)); + struct rb_id_table *tbl = cr->idkey_local_storage; + + if (tbl == NULL) { + tbl = cr->idkey_local_storage = rb_id_table_create(2); + } + rb_id_table_insert(tbl, id, val); + return val; +} + +#include "ractor.rbinc" |
