summaryrefslogtreecommitdiff
path: root/ractor.c
diff options
context:
space:
mode:
authorKoichi Sasada <ko1@atdot.net>2020-12-08 14:04:18 +0900
committerKoichi Sasada <ko1@atdot.net>2020-12-16 19:12:48 +0900
commita9a7f4d8b8ec30abc7a47ce700edc7209ae12279 (patch)
treebc0092b4e1643cc0889c53a0941319ecf76ae190 /ractor.c
parentddb93c3d64b8997c85f23c96a73ad0a9a7f82ad1 (diff)
Ractor#receive_if to receive only matched messages
Instead of Ractor.receive, Ractor.receive_if can provide a pattern by a block and you can choose the receiving message. [Feature #17378]
Notes
Notes: Merged: https://github.com/ruby/ruby/pull/3862
Diffstat (limited to 'ractor.c')
-rw-r--r--ractor.c304
1 files changed, 272 insertions, 32 deletions
diff --git a/ractor.c b/ractor.c
index f7be7daea9..d0f5185225 100644
--- a/ractor.c
+++ b/ractor.c
@@ -168,13 +168,15 @@ ractor_status_p(rb_ractor_t *r, enum ractor_status status)
return rb_ractor_status_p(r, status);
}
+static struct rb_ractor_basket *ractor_queue_at(struct rb_ractor_queue *rq, int i);
+
static void
ractor_queue_mark(struct rb_ractor_queue *rq)
{
for (int i=0; i<rq->cnt; i++) {
- int idx = (rq->start + i) % rq->size;
- rb_gc_mark(rq->baskets[idx].v);
- rb_gc_mark(rq->baskets[idx].sender);
+ struct rb_ractor_basket *b = ractor_queue_at(rq, i);
+ rb_gc_mark(b->v);
+ rb_gc_mark(b->sender);
}
}
@@ -191,6 +193,8 @@ ractor_mark(void *ptr)
rb_gc_mark(r->sync.wait.taken_basket.sender);
rb_gc_mark(r->sync.wait.yielded_basket.v);
rb_gc_mark(r->sync.wait.yielded_basket.sender);
+ rb_gc_mark(r->receiving_mutex);
+
rb_gc_mark(r->loc);
rb_gc_mark(r->name);
rb_gc_mark(r->r_stdin);
@@ -317,33 +321,90 @@ ractor_queue_setup(struct rb_ractor_queue *rq)
rq->baskets = malloc(sizeof(struct rb_ractor_basket) * rq->size);
}
+static struct rb_ractor_basket *
+ractor_queue_at(struct rb_ractor_queue *rq, int i)
+{
+ return &rq->baskets[(rq->start + i) % rq->size];
+}
+
+static void
+ractor_queue_advance(struct rb_ractor_queue *rq)
+{
+ ASSERT_ractor_locking(GET_RACTOR());
+
+ if (rq->reserved_cnt == 0) {
+ rq->cnt--;
+ rq->start = (rq->start + 1) % rq->size;
+ rq->serial++;
+ }
+ else {
+ ractor_queue_at(rq, 0)->type = basket_type_deleted;
+ }
+}
+
+static bool
+ractor_queue_skip_p(struct rb_ractor_queue *rq, int i)
+{
+ struct rb_ractor_basket *b = ractor_queue_at(rq, i);
+ return b->type == basket_type_deleted ||
+ b->type == basket_type_reserved;
+}
+
+static void
+ractor_queue_compact(rb_ractor_t *r, struct rb_ractor_queue *rq)
+{
+ ASSERT_ractor_locking(r);
+
+ while (rq->cnt > 0 && ractor_queue_at(rq, 0)->type == basket_type_deleted) {
+ ractor_queue_advance(rq);
+ }
+}
+
static bool
ractor_queue_empty_p(rb_ractor_t *r, struct rb_ractor_queue *rq)
{
ASSERT_ractor_locking(r);
- return rq->cnt == 0;
+
+ if (rq->cnt == 0) {
+ return true;
+ }
+
+ ractor_queue_compact(r, rq);
+
+ for (int i=0; i<rq->cnt; i++) {
+ if (!ractor_queue_skip_p(rq, i)) {
+ return false;
+ }
+ }
+
+ return true;
}
static bool
ractor_queue_deq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket)
{
- bool b;
+ bool found = false;
RACTOR_LOCK(r);
{
if (!ractor_queue_empty_p(r, rq)) {
- *basket = rq->baskets[rq->start];
- rq->cnt--;
- rq->start = (rq->start + 1) % rq->size;
- b = true;
- }
- else {
- b = false;
+ for (int i=0; i<rq->cnt; i++) {
+ if (!ractor_queue_skip_p(rq, i)) {
+ struct rb_ractor_basket *b = ractor_queue_at(rq, i);
+ *basket = *b;
+
+ // remove from queue
+ b->type = basket_type_deleted;
+ ractor_queue_compact(r, rq);
+ found = true;
+ break;
+ }
+ }
}
}
RACTOR_UNLOCK(r);
- return b;
+ return found;
}
static void
@@ -373,24 +434,29 @@ ractor_basket_clear(struct rb_ractor_basket *b)
static VALUE ractor_reset_belonging(VALUE obj); // in this file
static VALUE
-ractor_basket_accept(struct rb_ractor_basket *b)
+ractor_basket_value(struct rb_ractor_basket *b)
{
- VALUE v;
-
switch (b->type) {
case basket_type_ref:
- VM_ASSERT(rb_ractor_shareable_p(b->v));
- v = b->v;
break;
case basket_type_copy:
case basket_type_move:
case basket_type_will:
- v = ractor_reset_belonging(b->v);
+ b->type = basket_type_ref;
+ b->v = ractor_reset_belonging(b->v);
break;
default:
rb_bug("unreachable");
}
+ return b->v;
+}
+
+static VALUE
+ractor_basket_accept(struct rb_ractor_basket *b)
+{
+ VALUE v = ractor_basket_value(b);
+
if (b->exception) {
VALUE cause = v;
VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor.");
@@ -404,12 +470,22 @@ ractor_basket_accept(struct rb_ractor_basket *b)
return v;
}
+static void
+ractor_recursive_receive_if(rb_ractor_t *r)
+{
+ if (r->receiving_mutex && rb_mutex_locked_p(r->receiving_mutex)) {
+ rb_raise(rb_eRactorError, "can not call receive/receive_if recursively");
+ }
+}
+
static VALUE
ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *r)
{
struct rb_ractor_queue *rq = &r->sync.incoming_queue;
struct rb_ractor_basket basket;
+ ractor_recursive_receive_if(r);
+
if (ractor_queue_deq(r, rq, &basket) == false) {
if (r->sync.incoming_port_closed) {
rb_raise(rb_eRactorClosedError, "The incoming port is already closed");
@@ -616,29 +692,193 @@ ractor_waiting_list_shift(rb_ractor_t *r, struct rb_ractor_waiting_list *wl)
}
}
+static void
+ractor_receive_wait(rb_execution_context_t *ec, rb_ractor_t *cr)
+{
+ VM_ASSERT(cr == rb_ec_ractor_ptr(ec));
+ ractor_recursive_receive_if(cr);
+
+ RACTOR_LOCK(cr);
+ {
+ if (ractor_queue_empty_p(cr, &cr->sync.incoming_queue)) {
+ VM_ASSERT(cr->sync.wait.status == wait_none);
+ cr->sync.wait.status = wait_receiving;
+ cr->sync.wait.wakeup_status = wakeup_none;
+ ractor_sleep(ec, cr);
+ cr->sync.wait.wakeup_status = wakeup_none;
+ }
+ }
+ RACTOR_UNLOCK(cr);
+}
+
static VALUE
-ractor_receive(rb_execution_context_t *ec, rb_ractor_t *r)
+ractor_receive(rb_execution_context_t *ec, rb_ractor_t *cr)
{
- VM_ASSERT(r == rb_ec_ractor_ptr(ec));
+ VM_ASSERT(cr == rb_ec_ractor_ptr(ec));
VALUE v;
- while ((v = ractor_try_receive(ec, r)) == Qundef) {
- RACTOR_LOCK(r);
+ while ((v = ractor_try_receive(ec, cr)) == Qundef) {
+ ractor_receive_wait(ec, cr);
+ }
+
+ return v;
+}
+
+#if 0
+// for debug
+static const char *
+basket_type_name(enum rb_ractor_basket_type type)
+{
+ switch (type) {
+#define T(t) case basket_type_##t: return #t
+ T(none);
+ T(ref);
+ T(copy);
+ T(move);
+ T(will);
+ T(deleted);
+ T(reserved);
+ default: rb_bug("unreachable");
+ }
+}
+
+static void
+rq_dump(struct rb_ractor_queue *rq)
+{
+ bool bug = false;
+ for (int i=0; i<rq->cnt; i++) {
+ struct rb_ractor_basket *b = ractor_queue_at(rq, i);
+ fprintf(stderr, "%d (start:%d) type:%s %p %s\n", i, rq->start, basket_type_name(b->type), b, RSTRING_PTR(RARRAY_AREF(b->v, 1)));
+ if (b->type == basket_type_reserved) bug = true;
+ }
+ if (bug) rb_bug("!!");
+}
+#endif
+
+struct receive_block_data {
+ rb_ractor_t *cr;
+ struct rb_ractor_queue *rq;
+ VALUE v;
+ int index;
+ bool success;
+};
+
+static void
+ractor_receive_if_lock(rb_ractor_t *cr)
+{
+ VALUE m = cr->receiving_mutex;
+ if (m == Qfalse) {
+ m = cr->receiving_mutex = rb_mutex_new();
+ }
+ rb_mutex_lock(m);
+}
+
+static VALUE
+receive_if_body(VALUE ptr)
+{
+ struct receive_block_data *data = (struct receive_block_data *)ptr;
+
+ ractor_receive_if_lock(data->cr);
+ VALUE block_result = rb_yield(data->v);
+
+ RACTOR_LOCK_SELF(data->cr);
+ {
+ struct rb_ractor_basket *b = ractor_queue_at(data->rq, data->index);
+ VM_ASSERT(b->type == basket_type_reserved);
+ data->rq->reserved_cnt--;
+
+ if (RTEST(block_result)) {
+ b->type = basket_type_deleted;
+ ractor_queue_compact(data->cr, data->rq);
+ }
+ else {
+ b->type = basket_type_ref;
+ }
+ }
+ RACTOR_UNLOCK_SELF(data->cr);
+
+ data->success = true;
+
+ if (RTEST(block_result)) {
+ return data->v;
+ }
+ else {
+ return Qundef;
+ }
+}
+
+static VALUE
+receive_if_ensure(VALUE v)
+{
+ struct receive_block_data *data = (struct receive_block_data *)v;
+
+ if (!data->success) {
+ RACTOR_LOCK_SELF(data->cr);
{
- if (ractor_queue_empty_p(r, &r->sync.incoming_queue)) {
- VM_ASSERT(r->sync.wait.status == wait_none);
- r->sync.wait.status = wait_receiving;
- r->sync.wait.wakeup_status = wakeup_none;
+ struct rb_ractor_basket *b = ractor_queue_at(data->rq, data->index);
+ VM_ASSERT(b->type == basket_type_reserved);
+ b->type = basket_type_deleted;
+ data->rq->reserved_cnt--;
+ }
+ RACTOR_UNLOCK_SELF(data->cr);
+ }
+
+ rb_mutex_unlock(data->cr->receiving_mutex);
+ return Qnil;
+}
+
+static VALUE
+ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b)
+{
+ if (!RTEST(b)) rb_raise(rb_eArgError, "no block given");
+
+ rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
+ unsigned int serial = (unsigned int)-1;
+ int index = 0;
+ struct rb_ractor_queue *rq = &cr->sync.incoming_queue;
+
+ while (1) {
+ VALUE v = Qundef;
- ractor_sleep(ec, r);
+ ractor_receive_wait(ec, cr);
- r->sync.wait.wakeup_status = wakeup_none;
+ RACTOR_LOCK_SELF(cr);
+ {
+ if (serial != rq->serial) {
+ serial = rq->serial;
+ index = 0;
+ }
+
+ // check newer version
+ for (int i=index; i<rq->cnt; i++) {
+ if (!ractor_queue_skip_p(rq, i)) {
+ struct rb_ractor_basket *b = ractor_queue_at(rq, i);
+ v = ractor_basket_value(b);
+ b->type = basket_type_reserved;
+ rq->reserved_cnt++;
+ index = i;
+ break;
+ }
}
}
- RACTOR_UNLOCK(r);
- }
+ RACTOR_UNLOCK_SELF(cr);
- return v;
+ if (v != Qundef) {
+ struct receive_block_data data = {
+ .cr = cr,
+ .rq = rq,
+ .v = v,
+ .index = index,
+ .success = false,
+ };
+
+ VALUE result = rb_ensure(receive_if_body, (VALUE)&data,
+ receive_if_ensure, (VALUE)&data);
+
+ if (result != Qundef) return result;
+ index++;
+ }
+ }
}
static void