summaryrefslogtreecommitdiff
path: root/ractor_sync.c
diff options
context:
space:
mode:
Diffstat (limited to 'ractor_sync.c')
-rw-r--r--ractor_sync.c1516
1 files changed, 1516 insertions, 0 deletions
diff --git a/ractor_sync.c b/ractor_sync.c
new file mode 100644
index 0000000000..a63df7c407
--- /dev/null
+++ b/ractor_sync.c
@@ -0,0 +1,1516 @@
+
+// this file is included by ractor.c
+
+struct ractor_port {
+ rb_ractor_t *r;
+ st_data_t id_;
+};
+
+static st_data_t
+ractor_port_id(const struct ractor_port *rp)
+{
+ return rp->id_;
+}
+
+static VALUE rb_cRactorPort;
+
+static VALUE ractor_receive(rb_execution_context_t *ec, const struct ractor_port *rp);
+static VALUE ractor_send(rb_execution_context_t *ec, const struct ractor_port *rp, VALUE obj, VALUE move);
+static VALUE ractor_try_send(rb_execution_context_t *ec, const struct ractor_port *rp, VALUE obj, VALUE move);
+static void ractor_add_port(rb_ractor_t *r, st_data_t id);
+
+static void
+ractor_port_mark(void *ptr)
+{
+ const struct ractor_port *rp = (struct ractor_port *)ptr;
+
+ if (rp->r) {
+ rb_gc_mark(rp->r->pub.self);
+ }
+}
+
+static void
+ractor_port_free(void *ptr)
+{
+ xfree(ptr);
+}
+
+static size_t
+ractor_port_memsize(const void *ptr)
+{
+ return sizeof(struct ractor_port);
+}
+
+static const rb_data_type_t ractor_port_data_type = {
+ "ractor/port",
+ {
+ ractor_port_mark,
+ ractor_port_free,
+ ractor_port_memsize,
+ NULL, // update
+ },
+ 0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_FROZEN_SHAREABLE,
+};
+
+static st_data_t
+ractor_genid_for_port(rb_ractor_t *cr)
+{
+ // TODO: enough?
+ return cr->sync.next_port_id++;
+}
+
+static struct ractor_port *
+RACTOR_PORT_PTR(VALUE self)
+{
+ VM_ASSERT(rb_typeddata_is_kind_of(self, &ractor_port_data_type));
+ struct ractor_port *rp = DATA_PTR(self);
+ return rp;
+}
+
+static VALUE
+ractor_port_alloc(VALUE klass)
+{
+ struct ractor_port *rp;
+ VALUE rpv = TypedData_Make_Struct(klass, struct ractor_port, &ractor_port_data_type, rp);
+ rb_obj_freeze(rpv);
+ return rpv;
+}
+
+static VALUE
+ractor_port_init(VALUE rpv, rb_ractor_t *r)
+{
+ struct ractor_port *rp = RACTOR_PORT_PTR(rpv);
+
+ rp->r = r;
+ RB_OBJ_WRITTEN(rpv, Qundef, r->pub.self);
+ rp->id_ = ractor_genid_for_port(r);
+
+ ractor_add_port(r, ractor_port_id(rp));
+
+ rb_obj_freeze(rpv);
+
+ return rpv;
+}
+
+/*
+ * call-seq:
+ * Ractor::Port.new -> new_port
+ *
+ * Returns a new Ractor::Port object.
+ */
+static VALUE
+ractor_port_initialize(VALUE self)
+{
+ return ractor_port_init(self, GET_RACTOR());
+}
+
+/* :nodoc: */
+static VALUE
+ractor_port_initialize_copy(VALUE self, VALUE orig)
+{
+ struct ractor_port *dst = RACTOR_PORT_PTR(self);
+ struct ractor_port *src = RACTOR_PORT_PTR(orig);
+ dst->r = src->r;
+ RB_OBJ_WRITTEN(self, Qundef, dst->r->pub.self);
+ dst->id_ = ractor_port_id(src);
+
+ return self;
+}
+
+static VALUE
+ractor_port_new(rb_ractor_t *r)
+{
+ VALUE rpv = ractor_port_alloc(rb_cRactorPort);
+ ractor_port_init(rpv, r);
+ return rpv;
+}
+
+static bool
+ractor_port_p(VALUE self)
+{
+ return rb_typeddata_is_kind_of(self, &ractor_port_data_type);
+}
+
+static VALUE
+ractor_port_receive(rb_execution_context_t *ec, VALUE self)
+{
+ const struct ractor_port *rp = RACTOR_PORT_PTR(self);
+
+ if (rp->r != rb_ec_ractor_ptr(ec)) {
+ rb_raise(rb_eRactorError, "only allowed from the creator Ractor of this port");
+ }
+
+ return ractor_receive(ec, rp);
+}
+
+static VALUE
+ractor_port_send(rb_execution_context_t *ec, VALUE self, VALUE obj, VALUE move)
+{
+ const struct ractor_port *rp = RACTOR_PORT_PTR(self);
+ ractor_send(ec, rp, obj, RTEST(move));
+ return self;
+}
+
+static bool ractor_closed_port_p(rb_execution_context_t *ec, rb_ractor_t *r, const struct ractor_port *rp);
+static bool ractor_close_port(rb_execution_context_t *ec, rb_ractor_t *r, const struct ractor_port *rp);
+
+static VALUE
+ractor_port_closed_p(rb_execution_context_t *ec, VALUE self)
+{
+ const struct ractor_port *rp = RACTOR_PORT_PTR(self);
+
+ if (ractor_closed_port_p(ec, rp->r, rp)) {
+ return Qtrue;
+ }
+ else {
+ return Qfalse;
+ }
+}
+
+static VALUE
+ractor_port_close(rb_execution_context_t *ec, VALUE self)
+{
+ const struct ractor_port *rp = RACTOR_PORT_PTR(self);
+ rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
+
+ if (cr != rp->r) {
+ rb_raise(rb_eRactorError, "closing port by other ractors is not allowed");
+ }
+
+ ractor_close_port(ec, cr, rp);
+ return self;
+}
+
+// ractor-internal
+
+// ractor-internal - ractor_basket
+
+enum ractor_basket_type {
+ // basket is empty
+ basket_type_none,
+
+ // value is available
+ basket_type_ref,
+ basket_type_copy,
+ basket_type_move,
+};
+
+struct ractor_basket {
+ enum ractor_basket_type type;
+ VALUE sender;
+ st_data_t port_id;
+
+ struct {
+ VALUE v;
+ bool exception;
+ } p; // payload
+
+ struct ccan_list_node node;
+};
+
+#if 0
+static inline bool
+ractor_basket_type_p(const struct ractor_basket *b, enum ractor_basket_type type)
+{
+ return b->type == type;
+}
+
+static inline bool
+ractor_basket_none_p(const struct ractor_basket *b)
+{
+ return ractor_basket_type_p(b, basket_type_none);
+}
+#endif
+
+static void
+ractor_basket_mark(const struct ractor_basket *b)
+{
+ rb_gc_mark(b->p.v);
+}
+
+static void
+ractor_basket_free(struct ractor_basket *b)
+{
+ xfree(b);
+}
+
+static struct ractor_basket *
+ractor_basket_alloc(void)
+{
+ struct ractor_basket *b = ALLOC(struct ractor_basket);
+ return b;
+}
+
+// ractor-internal - ractor_queue
+
+struct ractor_queue {
+ struct ccan_list_head set;
+ bool closed;
+};
+
+static void
+ractor_queue_init(struct ractor_queue *rq)
+{
+ ccan_list_head_init(&rq->set);
+ rq->closed = false;
+}
+
+static struct ractor_queue *
+ractor_queue_new(void)
+{
+ struct ractor_queue *rq = ALLOC(struct ractor_queue);
+ ractor_queue_init(rq);
+ return rq;
+}
+
+static void
+ractor_queue_mark(const struct ractor_queue *rq)
+{
+ const struct ractor_basket *b;
+
+ ccan_list_for_each(&rq->set, b, node) {
+ ractor_basket_mark(b);
+ }
+}
+
+static void
+ractor_queue_free(struct ractor_queue *rq)
+{
+ struct ractor_basket *b, *nxt;
+
+ ccan_list_for_each_safe(&rq->set, b, nxt, node) {
+ ccan_list_del_init(&b->node);
+ ractor_basket_free(b);
+ }
+
+ VM_ASSERT(ccan_list_empty(&rq->set));
+
+ xfree(rq);
+}
+
+RBIMPL_ATTR_MAYBE_UNUSED()
+static size_t
+ractor_queue_size(const struct ractor_queue *rq)
+{
+ size_t size = 0;
+ const struct ractor_basket *b;
+
+ ccan_list_for_each(&rq->set, b, node) {
+ size++;
+ }
+ return size;
+}
+
+static void
+ractor_queue_close(struct ractor_queue *rq)
+{
+ rq->closed = true;
+}
+
+static void
+ractor_queue_move(struct ractor_queue *dst_rq, struct ractor_queue *src_rq)
+{
+ struct ccan_list_head *src = &src_rq->set;
+ struct ccan_list_head *dst = &dst_rq->set;
+
+ dst->n.next = src->n.next;
+ dst->n.prev = src->n.prev;
+ dst->n.next->prev = &dst->n;
+ dst->n.prev->next = &dst->n;
+ ccan_list_head_init(src);
+}
+
+#if 0
+static struct ractor_basket *
+ractor_queue_head(rb_ractor_t *r, struct ractor_queue *rq)
+{
+ return ccan_list_top(&rq->set, struct ractor_basket, node);
+}
+#endif
+
+static bool
+ractor_queue_empty_p(rb_ractor_t *r, const struct ractor_queue *rq)
+{
+ return ccan_list_empty(&rq->set);
+}
+
+static struct ractor_basket *
+ractor_queue_deq(rb_ractor_t *r, struct ractor_queue *rq)
+{
+ VM_ASSERT(GET_RACTOR() == r);
+
+ return ccan_list_pop(&rq->set, struct ractor_basket, node);
+}
+
+static void
+ractor_queue_enq(rb_ractor_t *r, struct ractor_queue *rq, struct ractor_basket *basket)
+{
+ ccan_list_add_tail(&rq->set, &basket->node);
+}
+
+#if 0
+static void
+rq_dump(const struct ractor_queue *rq)
+{
+ int i=0;
+ struct ractor_basket *b;
+ ccan_list_for_each(&rq->set, b, node) {
+ fprintf(stderr, "%d type:%s %p\n", i, basket_type_name(b->type), (void *)b);
+ i++;
+ }
+}
+#endif
+
+static void ractor_delete_port(rb_ractor_t *cr, st_data_t id, bool locked);
+
+static struct ractor_queue *
+ractor_get_queue(rb_ractor_t *cr, st_data_t id, bool locked)
+{
+ VM_ASSERT(cr == GET_RACTOR());
+
+ struct ractor_queue *rq;
+
+ if (cr->sync.ports && st_lookup(cr->sync.ports, id, (st_data_t *)&rq)) {
+ if (rq->closed && ractor_queue_empty_p(cr, rq)) {
+ ractor_delete_port(cr, id, locked);
+ return NULL;
+ }
+ else {
+ return rq;
+ }
+ }
+ else {
+ return NULL;
+ }
+}
+
+// ractor-internal - ports
+
+static void
+ractor_add_port(rb_ractor_t *r, st_data_t id)
+{
+ struct ractor_queue *rq = ractor_queue_new();
+ ASSERT_ractor_unlocking(r);
+
+ RUBY_DEBUG_LOG("id:%u", (unsigned int)id);
+
+ RACTOR_LOCK(r);
+ {
+ st_insert(r->sync.ports, id, (st_data_t)rq);
+ }
+ RACTOR_UNLOCK(r);
+}
+
+static void
+ractor_delete_port_locked(rb_ractor_t *cr, st_data_t id)
+{
+ ASSERT_ractor_locking(cr);
+
+ RUBY_DEBUG_LOG("id:%u", (unsigned int)id);
+
+ struct ractor_queue *rq;
+
+ if (st_delete(cr->sync.ports, &id, (st_data_t *)&rq)) {
+ ractor_queue_free(rq);
+ }
+ else {
+ VM_ASSERT(0);
+ }
+}
+
+static void
+ractor_delete_port(rb_ractor_t *cr, st_data_t id, bool locked)
+{
+ if (locked) {
+ ractor_delete_port_locked(cr, id);
+ }
+ else {
+ RACTOR_LOCK_SELF(cr);
+ {
+ ractor_delete_port_locked(cr, id);
+ }
+ RACTOR_UNLOCK_SELF(cr);
+ }
+}
+
+static const struct ractor_port *
+ractor_default_port(rb_ractor_t *r)
+{
+ return RACTOR_PORT_PTR(r->sync.default_port_value);
+}
+
+static VALUE
+ractor_default_port_value(rb_ractor_t *r)
+{
+ return r->sync.default_port_value;
+}
+
+static bool
+ractor_closed_port_p(rb_execution_context_t *ec, rb_ractor_t *r, const struct ractor_port *rp)
+{
+ VM_ASSERT(rb_ec_ractor_ptr(ec) == rp->r ? 1 : (ASSERT_ractor_locking(rp->r), 1));
+
+ const struct ractor_queue *rq;
+
+ if (rp->r->sync.ports && st_lookup(rp->r->sync.ports, ractor_port_id(rp), (st_data_t *)&rq)) {
+ return rq->closed;
+ }
+ else {
+ return true;
+ }
+}
+
+static void ractor_deliver_incoming_messages(rb_execution_context_t *ec, rb_ractor_t *cr);
+static bool ractor_queue_empty_p(rb_ractor_t *r, const struct ractor_queue *rq);
+
+static bool
+ractor_close_port(rb_execution_context_t *ec, rb_ractor_t *cr, const struct ractor_port *rp)
+{
+ VM_ASSERT(cr == rp->r);
+ struct ractor_queue *rq = NULL;
+
+ RACTOR_LOCK_SELF(cr);
+ {
+ ractor_deliver_incoming_messages(ec, cr); // check incoming messages
+
+ if (st_lookup(rp->r->sync.ports, ractor_port_id(rp), (st_data_t *)&rq)) {
+ ractor_queue_close(rq);
+
+ if (ractor_queue_empty_p(cr, rq)) {
+ // delete from the table
+ ractor_delete_port(cr, ractor_port_id(rp), true);
+ }
+
+ // TODO: free rq
+ }
+ }
+ RACTOR_UNLOCK_SELF(cr);
+
+ return rq != NULL;
+}
+
+static int
+ractor_free_all_ports_i(st_data_t port_id, st_data_t val, st_data_t dat)
+{
+ struct ractor_queue *rq = (struct ractor_queue *)val;
+ // rb_ractor_t *cr = (rb_ractor_t *)dat;
+
+ ractor_queue_free(rq);
+ return ST_CONTINUE;
+}
+
+static void
+ractor_free_all_ports(rb_ractor_t *cr)
+{
+ if (cr->sync.ports) {
+ st_foreach(cr->sync.ports, ractor_free_all_ports_i, (st_data_t)cr);
+ st_free_table(cr->sync.ports);
+ cr->sync.ports = NULL;
+ }
+
+ if (cr->sync.recv_queue) {
+ ractor_queue_free(cr->sync.recv_queue);
+ cr->sync.recv_queue = NULL;
+ }
+}
+
+#if defined(HAVE_WORKING_FORK)
+static void
+ractor_sync_terminate_atfork(rb_vm_t *vm, rb_ractor_t *r)
+{
+ ractor_free_all_ports(r);
+ r->sync.legacy = Qnil;
+}
+#endif
+
+// Ractor#monitor
+
+struct ractor_monitor {
+ struct ractor_port port;
+ struct ccan_list_node node;
+};
+
+static void
+ractor_mark_monitors(rb_ractor_t *r)
+{
+ const struct ractor_monitor *rm;
+ ccan_list_for_each(&r->sync.monitors, rm, node) {
+ rb_gc_mark(rm->port.r->pub.self);
+ }
+}
+
+static VALUE
+ractor_exit_token(bool exc)
+{
+ if (exc) {
+ RUBY_DEBUG_LOG("aborted");
+ return ID2SYM(idAborted);
+ }
+ else {
+ RUBY_DEBUG_LOG("exited");
+ return ID2SYM(idExited);
+ }
+}
+
+static VALUE
+ractor_monitor(rb_execution_context_t *ec, VALUE self, VALUE port)
+{
+ rb_ractor_t *r = RACTOR_PTR(self);
+ bool terminated = false;
+ const struct ractor_port *rp = RACTOR_PORT_PTR(port);
+ struct ractor_monitor *rm = ALLOC(struct ractor_monitor);
+ rm->port = *rp; // copy port information
+
+ RACTOR_LOCK(r);
+ {
+ if (UNDEF_P(r->sync.legacy)) { // not terminated
+ RUBY_DEBUG_LOG("OK/r:%u -> port:%u@r%u", (unsigned int)rb_ractor_id(r), (unsigned int)ractor_port_id(&rm->port), (unsigned int)rb_ractor_id(rm->port.r));
+ ccan_list_add_tail(&r->sync.monitors, &rm->node);
+ }
+ else {
+ RUBY_DEBUG_LOG("NG/r:%u -> port:%u@r%u", (unsigned int)rb_ractor_id(r), (unsigned int)ractor_port_id(&rm->port), (unsigned int)rb_ractor_id(rm->port.r));
+ terminated = true;
+ }
+ }
+ RACTOR_UNLOCK(r);
+
+ if (terminated) {
+ xfree(rm);
+ ractor_port_send(ec, port, ractor_exit_token(r->sync.legacy_exc), Qfalse);
+
+ return Qfalse;
+ }
+ else {
+ return Qtrue;
+ }
+}
+
+static VALUE
+ractor_unmonitor(rb_execution_context_t *ec, VALUE self, VALUE port)
+{
+ rb_ractor_t *r = RACTOR_PTR(self);
+ const struct ractor_port *rp = RACTOR_PORT_PTR(port);
+
+ RACTOR_LOCK(r);
+ {
+ if (UNDEF_P(r->sync.legacy)) { // not terminated
+ struct ractor_monitor *rm, *nxt;
+
+ ccan_list_for_each_safe(&r->sync.monitors, rm, nxt, node) {
+ if (ractor_port_id(&rm->port) == ractor_port_id(rp)) {
+ RUBY_DEBUG_LOG("r:%u -> port:%u@r%u",
+ (unsigned int)rb_ractor_id(r),
+ (unsigned int)ractor_port_id(&rm->port),
+ (unsigned int)rb_ractor_id(rm->port.r));
+ ccan_list_del(&rm->node);
+ xfree(rm);
+ }
+ }
+ }
+ }
+ RACTOR_UNLOCK(r);
+
+ return self;
+}
+
+static void
+ractor_notify_exit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE legacy, bool exc)
+{
+ RUBY_DEBUG_LOG("exc:%d", exc);
+ VM_ASSERT(!UNDEF_P(legacy));
+ VM_ASSERT(cr->sync.legacy == Qundef);
+
+ RACTOR_LOCK_SELF(cr);
+ {
+ ractor_free_all_ports(cr);
+
+ cr->sync.legacy = legacy;
+ cr->sync.legacy_exc = exc;
+ }
+ RACTOR_UNLOCK_SELF(cr);
+
+ // send token
+
+ VALUE token = ractor_exit_token(exc);
+ struct ractor_monitor *rm, *nxt;
+
+ ccan_list_for_each_safe(&cr->sync.monitors, rm, nxt, node)
+ {
+ RUBY_DEBUG_LOG("port:%u@r%u", (unsigned int)ractor_port_id(&rm->port), (unsigned int)rb_ractor_id(rm->port.r));
+
+ ractor_try_send(ec, &rm->port, token, false);
+
+ ccan_list_del(&rm->node);
+ xfree(rm);
+ }
+
+ VM_ASSERT(ccan_list_empty(&cr->sync.monitors));
+}
+
+// ractor-internal - initialize, mark, free, memsize
+
+static int
+ractor_mark_ports_i(st_data_t key, st_data_t val, st_data_t data)
+{
+ // id -> ractor_queue
+ const struct ractor_queue *rq = (struct ractor_queue *)val;
+ ractor_queue_mark(rq);
+ return ST_CONTINUE;
+}
+
+static void
+ractor_sync_mark(rb_ractor_t *r)
+{
+ rb_gc_mark(r->sync.default_port_value);
+
+ if (r->sync.ports) {
+ ractor_queue_mark(r->sync.recv_queue);
+ st_foreach(r->sync.ports, ractor_mark_ports_i, 0);
+ }
+
+ ractor_mark_monitors(r);
+}
+
+static int
+ractor_sync_free_ports_i(st_data_t _key, st_data_t val, st_data_t _args)
+{
+ struct ractor_queue *queue = (struct ractor_queue *)val;
+
+ ractor_queue_free(queue);
+
+ return ST_CONTINUE;
+}
+
+static void
+ractor_sync_free(rb_ractor_t *r)
+{
+ if (r->sync.recv_queue) {
+ ractor_queue_free(r->sync.recv_queue);
+ }
+
+ // maybe NULL
+ if (r->sync.ports) {
+ st_foreach(r->sync.ports, ractor_sync_free_ports_i, 0);
+ st_free_table(r->sync.ports);
+ r->sync.ports = NULL;
+ }
+}
+
+static size_t
+ractor_sync_memsize(const rb_ractor_t *r)
+{
+ if (r->sync.ports) {
+ return st_table_size(r->sync.ports);
+ }
+ else {
+ return 0;
+ }
+}
+
+static void
+ractor_sync_init(rb_ractor_t *r)
+{
+ // lock
+ rb_native_mutex_initialize(&r->sync.lock);
+
+ // monitors
+ ccan_list_head_init(&r->sync.monitors);
+
+ // waiters
+ ccan_list_head_init(&r->sync.waiters);
+
+ // receiving queue
+ r->sync.recv_queue = ractor_queue_new();
+
+ // ports
+ r->sync.ports = st_init_numtable();
+ r->sync.default_port_value = ractor_port_new(r);
+ FL_SET_RAW(r->sync.default_port_value, RUBY_FL_SHAREABLE); // only default ports are shareable
+
+ // legacy
+ r->sync.legacy = Qundef;
+
+#ifndef RUBY_THREAD_PTHREAD_H
+ rb_native_cond_initialize(&r->sync.wakeup_cond);
+#endif
+}
+
+// Ractor#value
+
+static rb_ractor_t *
+ractor_set_successor_once(rb_ractor_t *r, rb_ractor_t *cr)
+{
+ if (r->sync.successor == NULL) {
+ rb_ractor_t *successor = ATOMIC_PTR_CAS(r->sync.successor, NULL, cr);
+ return successor == NULL ? cr : successor;
+ }
+
+ return r->sync.successor;
+}
+
+static VALUE ractor_reset_belonging(VALUE obj);
+
+static VALUE
+ractor_make_remote_exception(VALUE cause, VALUE sender)
+{
+ VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor.");
+ rb_ivar_set(err, rb_intern("@ractor"), sender);
+ rb_ec_setup_exception(NULL, err, cause);
+ return err;
+}
+
+static VALUE
+ractor_value(rb_execution_context_t *ec, VALUE self)
+{
+ rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
+ rb_ractor_t *r = RACTOR_PTR(self);
+ rb_ractor_t *sr = ractor_set_successor_once(r, cr);
+
+ if (sr == cr) {
+ ractor_reset_belonging(r->sync.legacy);
+
+ if (r->sync.legacy_exc) {
+ rb_exc_raise(ractor_make_remote_exception(r->sync.legacy, self));
+ }
+ return r->sync.legacy;
+ }
+ else {
+ rb_raise(rb_eRactorError, "Only the successor ractor can take a value");
+ }
+}
+
+static VALUE ractor_move(VALUE obj); // in this file
+static VALUE ractor_copy(VALUE obj); // in this file
+
+static VALUE
+ractor_prepare_payload(rb_execution_context_t *ec, VALUE obj, enum ractor_basket_type *ptype)
+{
+ switch (*ptype) {
+ case basket_type_ref:
+ return obj;
+ case basket_type_move:
+ return ractor_move(obj);
+ default:
+ if (rb_ractor_shareable_p(obj)) {
+ *ptype = basket_type_ref;
+ return obj;
+ }
+ else {
+ *ptype = basket_type_copy;
+ return ractor_copy(obj);
+ }
+ }
+}
+
+static struct ractor_basket *
+ractor_basket_new(rb_execution_context_t *ec, VALUE obj, enum ractor_basket_type type, bool exc)
+{
+ VALUE v = ractor_prepare_payload(ec, obj, &type);
+
+ struct ractor_basket *b = ractor_basket_alloc();
+ b->type = type;
+ b->p.v = v;
+ b->p.exception = exc;
+ return b;
+}
+
+static VALUE
+ractor_basket_value(struct ractor_basket *b)
+{
+ switch (b->type) {
+ case basket_type_ref:
+ break;
+ case basket_type_copy:
+ case basket_type_move:
+ ractor_reset_belonging(b->p.v);
+ break;
+ default:
+ VM_ASSERT(0); // unreachable
+ }
+
+ VM_ASSERT(!RB_TYPE_P(b->p.v, T_NONE));
+ return b->p.v;
+}
+
+static VALUE
+ractor_basket_accept(struct ractor_basket *b)
+{
+ VALUE v = ractor_basket_value(b);
+
+ if (b->p.exception) {
+ VALUE err = ractor_make_remote_exception(v, b->sender);
+ ractor_basket_free(b);
+ rb_exc_raise(err);
+ }
+
+ ractor_basket_free(b);
+ return v;
+}
+
+// Ractor blocking by receive
+
+enum ractor_wakeup_status {
+ wakeup_none,
+ wakeup_by_send,
+ wakeup_by_interrupt,
+
+ // wakeup_by_close,
+};
+
+struct ractor_waiter {
+ enum ractor_wakeup_status wakeup_status;
+ rb_thread_t *th;
+ struct ccan_list_node node;
+};
+
+#if VM_CHECK_MODE > 0
+static bool
+ractor_waiter_included(rb_ractor_t *cr, rb_thread_t *th)
+{
+ ASSERT_ractor_locking(cr);
+
+ struct ractor_waiter *w;
+
+ ccan_list_for_each(&cr->sync.waiters, w, node) {
+ if (w->th == th) {
+ return true;
+ }
+ }
+
+ return false;
+}
+#endif
+
+#if USE_RUBY_DEBUG_LOG
+
+static const char *
+wakeup_status_str(enum ractor_wakeup_status wakeup_status)
+{
+ switch (wakeup_status) {
+ case wakeup_none: return "none";
+ case wakeup_by_send: return "by_send";
+ case wakeup_by_interrupt: return "by_interrupt";
+ // case wakeup_by_close: return "by_close";
+ }
+ rb_bug("unreachable");
+}
+
+static const char *
+basket_type_name(enum ractor_basket_type type)
+{
+ switch (type) {
+ case basket_type_none: return "none";
+ case basket_type_ref: return "ref";
+ case basket_type_copy: return "copy";
+ case basket_type_move: return "move";
+ }
+ VM_ASSERT(0);
+ return NULL;
+}
+
+#endif // USE_RUBY_DEBUG_LOG
+
+#ifdef RUBY_THREAD_PTHREAD_H
+
+//
+
+#else // win32
+
+static void
+ractor_cond_wait(rb_ractor_t *r)
+{
+#if RACTOR_CHECK_MODE > 0
+ VALUE locked_by = r->sync.locked_by;
+ r->sync.locked_by = Qnil;
+#endif
+ rb_native_cond_wait(&r->sync.wakeup_cond, &r->sync.lock);
+
+#if RACTOR_CHECK_MODE > 0
+ r->sync.locked_by = locked_by;
+#endif
+}
+
+static void *
+ractor_wait_no_gvl(void *ptr)
+{
+ struct ractor_waiter *waiter = (struct ractor_waiter *)ptr;
+ rb_ractor_t *cr = waiter->th->ractor;
+
+ RACTOR_LOCK_SELF(cr);
+ {
+ if (waiter->wakeup_status == wakeup_none) {
+ ractor_cond_wait(cr);
+ }
+ }
+ RACTOR_UNLOCK_SELF(cr);
+ return NULL;
+}
+
+static void
+rb_ractor_sched_wait(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf, void *ptr)
+{
+ struct ractor_waiter *waiter = (struct ractor_waiter *)ptr;
+
+ RACTOR_UNLOCK(cr);
+ {
+ rb_nogvl(ractor_wait_no_gvl, waiter,
+ ubf, waiter,
+ RB_NOGVL_UBF_ASYNC_SAFE | RB_NOGVL_INTR_FAIL);
+ }
+ RACTOR_LOCK(cr);
+}
+
+static void
+rb_ractor_sched_wakeup(rb_ractor_t *r, rb_thread_t *th)
+{
+ // ractor lock is acquired
+ rb_native_cond_broadcast(&r->sync.wakeup_cond);
+}
+#endif
+
+static bool
+ractor_wakeup_all(rb_ractor_t *r, enum ractor_wakeup_status wakeup_status)
+{
+ ASSERT_ractor_unlocking(r);
+
+ RUBY_DEBUG_LOG("r:%u wakeup:%s", rb_ractor_id(r), wakeup_status_str(wakeup_status));
+
+ bool wakeup_p = false;
+
+ RACTOR_LOCK(r);
+ while (1) {
+ struct ractor_waiter *waiter = ccan_list_pop(&r->sync.waiters, struct ractor_waiter, node);
+
+ if (waiter) {
+ VM_ASSERT(waiter->wakeup_status == wakeup_none);
+
+ waiter->wakeup_status = wakeup_status;
+ rb_ractor_sched_wakeup(r, waiter->th);
+
+ wakeup_p = true;
+ }
+ else {
+ break;
+ }
+ }
+ RACTOR_UNLOCK(r);
+
+ return wakeup_p;
+}
+
+static void
+ubf_ractor_wait(void *ptr)
+{
+ struct ractor_waiter *waiter = (struct ractor_waiter *)ptr;
+
+ rb_thread_t *th = waiter->th;
+ rb_ractor_t *r = th->ractor;
+
+ // clear ubf and nobody can kick UBF
+ th->unblock.func = NULL;
+ th->unblock.arg = NULL;
+
+ rb_native_mutex_unlock(&th->interrupt_lock);
+ {
+ RACTOR_LOCK(r);
+ {
+ if (waiter->wakeup_status == wakeup_none) {
+ RUBY_DEBUG_LOG("waiter:%p", (void *)waiter);
+
+ waiter->wakeup_status = wakeup_by_interrupt;
+ ccan_list_del(&waiter->node);
+
+ rb_ractor_sched_wakeup(r, waiter->th);
+ }
+ }
+ RACTOR_UNLOCK(r);
+ }
+ rb_native_mutex_lock(&th->interrupt_lock);
+}
+
+static enum ractor_wakeup_status
+ractor_wait(rb_execution_context_t *ec, rb_ractor_t *cr)
+{
+ rb_thread_t *th = rb_ec_thread_ptr(ec);
+
+ struct ractor_waiter waiter = {
+ .wakeup_status = wakeup_none,
+ .th = th,
+ };
+
+ RUBY_DEBUG_LOG("wait%s", "");
+
+ ASSERT_ractor_locking(cr);
+
+ VM_ASSERT(GET_RACTOR() == cr);
+ VM_ASSERT(!ractor_waiter_included(cr, th));
+
+ ccan_list_add_tail(&cr->sync.waiters, &waiter.node);
+
+ // resume another ready thread and wait for an event
+ rb_ractor_sched_wait(ec, cr, ubf_ractor_wait, &waiter);
+
+ if (waiter.wakeup_status == wakeup_none) {
+ ccan_list_del(&waiter.node);
+ }
+
+ RUBY_DEBUG_LOG("wakeup_status:%s", wakeup_status_str(waiter.wakeup_status));
+
+ RACTOR_UNLOCK_SELF(cr);
+ {
+ rb_ec_check_ints(ec);
+ }
+ RACTOR_LOCK_SELF(cr);
+
+ VM_ASSERT(!ractor_waiter_included(cr, th));
+ return waiter.wakeup_status;
+}
+
+static void
+ractor_deliver_incoming_messages(rb_execution_context_t *ec, rb_ractor_t *cr)
+{
+ ASSERT_ractor_locking(cr);
+ struct ractor_queue *recv_q = cr->sync.recv_queue;
+
+ struct ractor_basket *b;
+ while ((b = ractor_queue_deq(cr, recv_q)) != NULL) {
+ ractor_queue_enq(cr, ractor_get_queue(cr, b->port_id, true), b);
+ }
+}
+
+static bool
+ractor_check_received(rb_ractor_t *cr, struct ractor_queue *messages)
+{
+ struct ractor_queue *received_queue = cr->sync.recv_queue;
+ bool received = false;
+
+ ASSERT_ractor_locking(cr);
+
+ if (ractor_queue_empty_p(cr, received_queue)) {
+ RUBY_DEBUG_LOG("empty");
+ }
+ else {
+ received = true;
+
+ // messages <- incoming
+ ractor_queue_init(messages);
+ ractor_queue_move(messages, received_queue);
+ }
+
+ VM_ASSERT(ractor_queue_empty_p(cr, received_queue));
+
+ RUBY_DEBUG_LOG("received:%d", received);
+ return received;
+}
+
+static void
+ractor_wait_receive(rb_execution_context_t *ec, rb_ractor_t *cr)
+{
+ struct ractor_queue messages;
+ bool deliverred = false;
+
+ RACTOR_LOCK_SELF(cr);
+ {
+ if (ractor_check_received(cr, &messages)) {
+ deliverred = true;
+ }
+ else {
+ ractor_wait(ec, cr);
+ }
+ }
+ RACTOR_UNLOCK_SELF(cr);
+
+ if (deliverred) {
+ VM_ASSERT(!ractor_queue_empty_p(cr, &messages));
+ struct ractor_basket *b;
+
+ while ((b = ractor_queue_deq(cr, &messages)) != NULL) {
+ ractor_queue_enq(cr, ractor_get_queue(cr, b->port_id, false), b);
+ }
+ }
+}
+
+static VALUE
+ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *cr, const struct ractor_port *rp)
+{
+ struct ractor_queue *rq = ractor_get_queue(cr, ractor_port_id(rp), false);
+
+ if (rq == NULL) {
+ rb_raise(rb_eRactorClosedError, "The port was already closed");
+ }
+
+ struct ractor_basket *b = ractor_queue_deq(cr, rq);
+
+ if (rq->closed && ractor_queue_empty_p(cr, rq)) {
+ ractor_delete_port(cr, ractor_port_id(rp), false);
+ }
+
+ if (b) {
+ return ractor_basket_accept(b);
+ }
+ else {
+ return Qundef;
+ }
+}
+
+static VALUE
+ractor_receive(rb_execution_context_t *ec, const struct ractor_port *rp)
+{
+ rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
+ VM_ASSERT(cr == rp->r);
+
+ RUBY_DEBUG_LOG("port:%u", (unsigned int)ractor_port_id(rp));
+
+ while (1) {
+ VALUE v = ractor_try_receive(ec, cr, rp);
+
+ if (v != Qundef) {
+ return v;
+ }
+ else {
+ ractor_wait_receive(ec, cr);
+ }
+ }
+}
+
+// Ractor#send
+
+static void
+ractor_send_basket(rb_execution_context_t *ec, const struct ractor_port *rp, struct ractor_basket *b, bool raise_on_error)
+{
+ bool closed = false;
+
+ RUBY_DEBUG_LOG("port:%u@r%u b:%s v:%p", (unsigned int)ractor_port_id(rp), rb_ractor_id(rp->r), basket_type_name(b->type), (void *)b->p.v);
+
+ RACTOR_LOCK(rp->r);
+ {
+ if (ractor_closed_port_p(ec, rp->r, rp)) {
+ closed = true;
+ }
+ else {
+ b->port_id = ractor_port_id(rp);
+ ractor_queue_enq(rp->r, rp->r->sync.recv_queue, b);
+ }
+ }
+ RACTOR_UNLOCK(rp->r);
+
+ // NOTE: ref r -> b->p.v is created, but Ractor is unprotected object, so no problem on that.
+
+ if (!closed) {
+ ractor_wakeup_all(rp->r, wakeup_by_send);
+ }
+ else {
+ RUBY_DEBUG_LOG("closed:%u@r%u", (unsigned int)ractor_port_id(rp), rb_ractor_id(rp->r));
+
+ if (raise_on_error) {
+ ractor_basket_free(b);
+ rb_raise(rb_eRactorClosedError, "The port was already closed");
+ }
+ }
+}
+
+static VALUE
+ractor_send0(rb_execution_context_t *ec, const struct ractor_port *rp, VALUE obj, VALUE move, bool raise_on_error)
+{
+ struct ractor_basket *b = ractor_basket_new(ec, obj, RTEST(move) ? basket_type_move : basket_type_none, false);
+ ractor_send_basket(ec, rp, b, raise_on_error);
+ RB_GC_GUARD(obj);
+ return rp->r->pub.self;
+}
+
+static VALUE
+ractor_send(rb_execution_context_t *ec, const struct ractor_port *rp, VALUE obj, VALUE move)
+{
+ return ractor_send0(ec, rp, obj, move, true);
+}
+
+static VALUE
+ractor_try_send(rb_execution_context_t *ec, const struct ractor_port *rp, VALUE obj, VALUE move)
+{
+ return ractor_send0(ec, rp, obj, move, false);
+}
+
+// Ractor::Selector
+
+struct ractor_selector {
+ struct st_table *ports; // rpv -> rp
+
+};
+
+static int
+ractor_selector_mark_i(st_data_t key, st_data_t val, st_data_t dmy)
+{
+ rb_gc_mark((VALUE)key); // rpv
+
+ return ST_CONTINUE;
+}
+
+static void
+ractor_selector_mark(void *ptr)
+{
+ struct ractor_selector *s = ptr;
+
+ if (s->ports) {
+ st_foreach(s->ports, ractor_selector_mark_i, 0);
+ }
+}
+
+static void
+ractor_selector_free(void *ptr)
+{
+ struct ractor_selector *s = ptr;
+ st_free_table(s->ports);
+ ruby_xfree(ptr);
+}
+
+static size_t
+ractor_selector_memsize(const void *ptr)
+{
+ const struct ractor_selector *s = ptr;
+ size_t size = sizeof(struct ractor_selector);
+ if (s->ports) {
+ size += st_memsize(s->ports);
+ }
+ return size;
+}
+
+static const rb_data_type_t ractor_selector_data_type = {
+ "ractor/selector",
+ {
+ ractor_selector_mark,
+ ractor_selector_free,
+ ractor_selector_memsize,
+ NULL, // update
+ },
+ 0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED,
+};
+
+static struct ractor_selector *
+RACTOR_SELECTOR_PTR(VALUE selv)
+{
+ VM_ASSERT(rb_typeddata_is_kind_of(selv, &ractor_selector_data_type));
+ return (struct ractor_selector *)DATA_PTR(selv);
+}
+
+// Ractor::Selector.new
+
+static VALUE
+ractor_selector_create(VALUE klass)
+{
+ struct ractor_selector *s;
+ VALUE selv = TypedData_Make_Struct(klass, struct ractor_selector, &ractor_selector_data_type, s);
+ s->ports = st_init_numtable(); // TODO
+ return selv;
+}
+
+// Ractor::Selector#add(r)
+
+/*
+ * call-seq:
+ * add(ractor) -> ractor
+ *
+ * Adds _ractor_ to +self+. Raises an exception if _ractor_ is already added.
+ * Returns _ractor_.
+ */
+static VALUE
+ractor_selector_add(VALUE selv, VALUE rpv)
+{
+ if (!ractor_port_p(rpv)) {
+ rb_raise(rb_eArgError, "Not a Ractor::Port object");
+ }
+
+ struct ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
+ const struct ractor_port *rp = RACTOR_PORT_PTR(rpv);
+
+ if (st_lookup(s->ports, (st_data_t)rpv, NULL)) {
+ rb_raise(rb_eArgError, "already added");
+ }
+
+ st_insert(s->ports, (st_data_t)rpv, (st_data_t)rp);
+ RB_OBJ_WRITTEN(selv, Qundef, rpv);
+
+ return selv;
+}
+
+// Ractor::Selector#remove(r)
+
+/* call-seq:
+ * remove(ractor) -> ractor
+ *
+ * Removes _ractor_ from +self+. Raises an exception if _ractor_ is not added.
+ * Returns the removed _ractor_.
+ */
+static VALUE
+ractor_selector_remove(VALUE selv, VALUE rpv)
+{
+ if (!ractor_port_p(rpv)) {
+ rb_raise(rb_eArgError, "Not a Ractor::Port object");
+ }
+
+ struct ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
+
+ if (!st_lookup(s->ports, (st_data_t)rpv, NULL)) {
+ rb_raise(rb_eArgError, "not added yet");
+ }
+
+ st_delete(s->ports, (st_data_t *)&rpv, NULL);
+
+ return selv;
+}
+
+// Ractor::Selector#clear
+
+/*
+ * call-seq:
+ * clear -> self
+ *
+ * Removes all ractors from +self+. Raises +self+.
+ */
+static VALUE
+ractor_selector_clear(VALUE selv)
+{
+ struct ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
+ st_clear(s->ports);
+ return selv;
+}
+
+/*
+ * call-seq:
+ * empty? -> true or false
+ *
+ * Returns +true+ if no ractor is added.
+ */
+static VALUE
+ractor_selector_empty_p(VALUE selv)
+{
+ struct ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
+ return s->ports->num_entries == 0 ? Qtrue : Qfalse;
+}
+
+// Ractor::Selector#wait
+
+struct ractor_selector_wait_data {
+ rb_ractor_t *cr;
+ rb_execution_context_t *ec;
+ bool found;
+ VALUE v;
+ VALUE rpv;
+};
+
+static int
+ractor_selector_wait_i(st_data_t key, st_data_t val, st_data_t data)
+{
+ struct ractor_selector_wait_data *p = (struct ractor_selector_wait_data *)data;
+ const struct ractor_port *rp = (const struct ractor_port *)val;
+
+ VALUE v = ractor_try_receive(p->ec, p->cr, rp);
+
+ if (v != Qundef) {
+ p->found = true;
+ p->v = v;
+ p->rpv = (VALUE)key;
+ return ST_STOP;
+ }
+ else {
+ return ST_CONTINUE;
+ }
+}
+
+static VALUE
+ractor_selector__wait(rb_execution_context_t *ec, VALUE selector)
+{
+ rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
+ struct ractor_selector *s = RACTOR_SELECTOR_PTR(selector);
+
+ struct ractor_selector_wait_data data = {
+ .ec = ec,
+ .cr = cr,
+ .found = false,
+ };
+
+ while (1) {
+ st_foreach(s->ports, ractor_selector_wait_i, (st_data_t)&data);
+
+ if (data.found) {
+ return rb_ary_new_from_args(2, data.rpv, data.v);
+ }
+
+ ractor_wait_receive(ec, cr);
+ }
+}
+
+/*
+ * call-seq:
+ * wait(receive: false, yield_value: undef, move: false) -> [ractor, value]
+ *
+ * Waits until any ractor in _selector_ can be active.
+ */
+static VALUE
+ractor_selector_wait(VALUE selector)
+{
+ return ractor_selector__wait(GET_EC(), selector);
+}
+
+static VALUE
+ractor_selector_new(int argc, VALUE *ractors, VALUE klass)
+{
+ VALUE selector = ractor_selector_create(klass);
+
+ for (int i=0; i<argc; i++) {
+ ractor_selector_add(selector, ractors[i]);
+ }
+
+ return selector;
+}
+
+static VALUE
+ractor_select_internal(rb_execution_context_t *ec, VALUE self, VALUE ports)
+{
+ VALUE selector = ractor_selector_new(RARRAY_LENINT(ports), (VALUE *)RARRAY_CONST_PTR(ports), rb_cRactorSelector);
+ VALUE result = ractor_selector__wait(ec, selector);
+
+ RB_GC_GUARD(selector);
+ RB_GC_GUARD(ports);
+ return result;
+}
+
+#ifndef USE_RACTOR_SELECTOR
+#define USE_RACTOR_SELECTOR 0
+#endif
+
+RUBY_SYMBOL_EXPORT_BEGIN
+void rb_init_ractor_selector(void);
+RUBY_SYMBOL_EXPORT_END
+
+/*
+ * Document-class: Ractor::Selector
+ * :nodoc: currently
+ *
+ * Selects multiple Ractors to be activated.
+ */
+void
+rb_init_ractor_selector(void)
+{
+ rb_cRactorSelector = rb_define_class_under(rb_cRactor, "Selector", rb_cObject);
+ rb_undef_alloc_func(rb_cRactorSelector);
+
+ rb_define_singleton_method(rb_cRactorSelector, "new", ractor_selector_new , -1);
+ rb_define_method(rb_cRactorSelector, "add", ractor_selector_add, 1);
+ rb_define_method(rb_cRactorSelector, "remove", ractor_selector_remove, 1);
+ rb_define_method(rb_cRactorSelector, "clear", ractor_selector_clear, 0);
+ rb_define_method(rb_cRactorSelector, "empty?", ractor_selector_empty_p, 0);
+ rb_define_method(rb_cRactorSelector, "wait", ractor_selector_wait, 0);
+}
+
+static void
+Init_RactorPort(void)
+{
+ rb_cRactorPort = rb_define_class_under(rb_cRactor, "Port", rb_cObject);
+ rb_define_alloc_func(rb_cRactorPort, ractor_port_alloc);
+ rb_define_method(rb_cRactorPort, "initialize", ractor_port_initialize, 0);
+ rb_define_method(rb_cRactorPort, "initialize_copy", ractor_port_initialize_copy, 1);
+
+#if USE_RACTOR_SELECTOR
+ rb_init_ractor_selector();
+#endif
+}