From a9a7f4d8b8ec30abc7a47ce700edc7209ae12279 Mon Sep 17 00:00:00 2001 From: Koichi Sasada Date: Tue, 8 Dec 2020 14:04:18 +0900 Subject: Ractor#receive_if to receive only matched messages Instead of Ractor.receive, Ractor.receive_if can provide a pattern by a block and you can choose the receiving message. [Feature #17378] --- ractor.c | 304 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 272 insertions(+), 32 deletions(-) (limited to 'ractor.c') diff --git a/ractor.c b/ractor.c index f7be7daea9..d0f5185225 100644 --- a/ractor.c +++ b/ractor.c @@ -168,13 +168,15 @@ ractor_status_p(rb_ractor_t *r, enum ractor_status status) return rb_ractor_status_p(r, status); } +static struct rb_ractor_basket *ractor_queue_at(struct rb_ractor_queue *rq, int i); + static void ractor_queue_mark(struct rb_ractor_queue *rq) { for (int i=0; icnt; i++) { - int idx = (rq->start + i) % rq->size; - rb_gc_mark(rq->baskets[idx].v); - rb_gc_mark(rq->baskets[idx].sender); + struct rb_ractor_basket *b = ractor_queue_at(rq, i); + rb_gc_mark(b->v); + rb_gc_mark(b->sender); } } @@ -191,6 +193,8 @@ ractor_mark(void *ptr) rb_gc_mark(r->sync.wait.taken_basket.sender); rb_gc_mark(r->sync.wait.yielded_basket.v); rb_gc_mark(r->sync.wait.yielded_basket.sender); + rb_gc_mark(r->receiving_mutex); + rb_gc_mark(r->loc); rb_gc_mark(r->name); rb_gc_mark(r->r_stdin); @@ -317,33 +321,90 @@ ractor_queue_setup(struct rb_ractor_queue *rq) rq->baskets = malloc(sizeof(struct rb_ractor_basket) * rq->size); } +static struct rb_ractor_basket * +ractor_queue_at(struct rb_ractor_queue *rq, int i) +{ + return &rq->baskets[(rq->start + i) % rq->size]; +} + +static void +ractor_queue_advance(struct rb_ractor_queue *rq) +{ + ASSERT_ractor_locking(GET_RACTOR()); + + if (rq->reserved_cnt == 0) { + rq->cnt--; + rq->start = (rq->start + 1) % rq->size; + rq->serial++; + } + else { + ractor_queue_at(rq, 0)->type = basket_type_deleted; + } +} + +static bool +ractor_queue_skip_p(struct rb_ractor_queue *rq, int i) +{ + struct rb_ractor_basket *b = ractor_queue_at(rq, i); + return b->type == basket_type_deleted || + b->type == basket_type_reserved; +} + +static void +ractor_queue_compact(rb_ractor_t *r, struct rb_ractor_queue *rq) +{ + ASSERT_ractor_locking(r); + + while (rq->cnt > 0 && ractor_queue_at(rq, 0)->type == basket_type_deleted) { + ractor_queue_advance(rq); + } +} + static bool ractor_queue_empty_p(rb_ractor_t *r, struct rb_ractor_queue *rq) { ASSERT_ractor_locking(r); - return rq->cnt == 0; + + if (rq->cnt == 0) { + return true; + } + + ractor_queue_compact(r, rq); + + for (int i=0; icnt; i++) { + if (!ractor_queue_skip_p(rq, i)) { + return false; + } + } + + return true; } static bool ractor_queue_deq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket) { - bool b; + bool found = false; RACTOR_LOCK(r); { if (!ractor_queue_empty_p(r, rq)) { - *basket = rq->baskets[rq->start]; - rq->cnt--; - rq->start = (rq->start + 1) % rq->size; - b = true; - } - else { - b = false; + for (int i=0; icnt; i++) { + if (!ractor_queue_skip_p(rq, i)) { + struct rb_ractor_basket *b = ractor_queue_at(rq, i); + *basket = *b; + + // remove from queue + b->type = basket_type_deleted; + ractor_queue_compact(r, rq); + found = true; + break; + } + } } } RACTOR_UNLOCK(r); - return b; + return found; } static void @@ -373,24 +434,29 @@ ractor_basket_clear(struct rb_ractor_basket *b) static VALUE ractor_reset_belonging(VALUE obj); // in this file static VALUE -ractor_basket_accept(struct rb_ractor_basket *b) +ractor_basket_value(struct rb_ractor_basket *b) { - VALUE v; - switch (b->type) { case basket_type_ref: - VM_ASSERT(rb_ractor_shareable_p(b->v)); - v = b->v; break; case basket_type_copy: case basket_type_move: case basket_type_will: - v = ractor_reset_belonging(b->v); + b->type = basket_type_ref; + b->v = ractor_reset_belonging(b->v); break; default: rb_bug("unreachable"); } + return b->v; +} + +static VALUE +ractor_basket_accept(struct rb_ractor_basket *b) +{ + VALUE v = ractor_basket_value(b); + if (b->exception) { VALUE cause = v; VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor."); @@ -404,12 +470,22 @@ ractor_basket_accept(struct rb_ractor_basket *b) return v; } +static void +ractor_recursive_receive_if(rb_ractor_t *r) +{ + if (r->receiving_mutex && rb_mutex_locked_p(r->receiving_mutex)) { + rb_raise(rb_eRactorError, "can not call receive/receive_if recursively"); + } +} + static VALUE ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *r) { struct rb_ractor_queue *rq = &r->sync.incoming_queue; struct rb_ractor_basket basket; + ractor_recursive_receive_if(r); + if (ractor_queue_deq(r, rq, &basket) == false) { if (r->sync.incoming_port_closed) { rb_raise(rb_eRactorClosedError, "The incoming port is already closed"); @@ -616,29 +692,193 @@ ractor_waiting_list_shift(rb_ractor_t *r, struct rb_ractor_waiting_list *wl) } } +static void +ractor_receive_wait(rb_execution_context_t *ec, rb_ractor_t *cr) +{ + VM_ASSERT(cr == rb_ec_ractor_ptr(ec)); + ractor_recursive_receive_if(cr); + + RACTOR_LOCK(cr); + { + if (ractor_queue_empty_p(cr, &cr->sync.incoming_queue)) { + VM_ASSERT(cr->sync.wait.status == wait_none); + cr->sync.wait.status = wait_receiving; + cr->sync.wait.wakeup_status = wakeup_none; + ractor_sleep(ec, cr); + cr->sync.wait.wakeup_status = wakeup_none; + } + } + RACTOR_UNLOCK(cr); +} + static VALUE -ractor_receive(rb_execution_context_t *ec, rb_ractor_t *r) +ractor_receive(rb_execution_context_t *ec, rb_ractor_t *cr) { - VM_ASSERT(r == rb_ec_ractor_ptr(ec)); + VM_ASSERT(cr == rb_ec_ractor_ptr(ec)); VALUE v; - while ((v = ractor_try_receive(ec, r)) == Qundef) { - RACTOR_LOCK(r); + while ((v = ractor_try_receive(ec, cr)) == Qundef) { + ractor_receive_wait(ec, cr); + } + + return v; +} + +#if 0 +// for debug +static const char * +basket_type_name(enum rb_ractor_basket_type type) +{ + switch (type) { +#define T(t) case basket_type_##t: return #t + T(none); + T(ref); + T(copy); + T(move); + T(will); + T(deleted); + T(reserved); + default: rb_bug("unreachable"); + } +} + +static void +rq_dump(struct rb_ractor_queue *rq) +{ + bool bug = false; + for (int i=0; icnt; i++) { + struct rb_ractor_basket *b = ractor_queue_at(rq, i); + fprintf(stderr, "%d (start:%d) type:%s %p %s\n", i, rq->start, basket_type_name(b->type), b, RSTRING_PTR(RARRAY_AREF(b->v, 1))); + if (b->type == basket_type_reserved) bug = true; + } + if (bug) rb_bug("!!"); +} +#endif + +struct receive_block_data { + rb_ractor_t *cr; + struct rb_ractor_queue *rq; + VALUE v; + int index; + bool success; +}; + +static void +ractor_receive_if_lock(rb_ractor_t *cr) +{ + VALUE m = cr->receiving_mutex; + if (m == Qfalse) { + m = cr->receiving_mutex = rb_mutex_new(); + } + rb_mutex_lock(m); +} + +static VALUE +receive_if_body(VALUE ptr) +{ + struct receive_block_data *data = (struct receive_block_data *)ptr; + + ractor_receive_if_lock(data->cr); + VALUE block_result = rb_yield(data->v); + + RACTOR_LOCK_SELF(data->cr); + { + struct rb_ractor_basket *b = ractor_queue_at(data->rq, data->index); + VM_ASSERT(b->type == basket_type_reserved); + data->rq->reserved_cnt--; + + if (RTEST(block_result)) { + b->type = basket_type_deleted; + ractor_queue_compact(data->cr, data->rq); + } + else { + b->type = basket_type_ref; + } + } + RACTOR_UNLOCK_SELF(data->cr); + + data->success = true; + + if (RTEST(block_result)) { + return data->v; + } + else { + return Qundef; + } +} + +static VALUE +receive_if_ensure(VALUE v) +{ + struct receive_block_data *data = (struct receive_block_data *)v; + + if (!data->success) { + RACTOR_LOCK_SELF(data->cr); { - if (ractor_queue_empty_p(r, &r->sync.incoming_queue)) { - VM_ASSERT(r->sync.wait.status == wait_none); - r->sync.wait.status = wait_receiving; - r->sync.wait.wakeup_status = wakeup_none; + struct rb_ractor_basket *b = ractor_queue_at(data->rq, data->index); + VM_ASSERT(b->type == basket_type_reserved); + b->type = basket_type_deleted; + data->rq->reserved_cnt--; + } + RACTOR_UNLOCK_SELF(data->cr); + } + + rb_mutex_unlock(data->cr->receiving_mutex); + return Qnil; +} + +static VALUE +ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b) +{ + if (!RTEST(b)) rb_raise(rb_eArgError, "no block given"); + + rb_ractor_t *cr = rb_ec_ractor_ptr(ec); + unsigned int serial = (unsigned int)-1; + int index = 0; + struct rb_ractor_queue *rq = &cr->sync.incoming_queue; + + while (1) { + VALUE v = Qundef; - ractor_sleep(ec, r); + ractor_receive_wait(ec, cr); - r->sync.wait.wakeup_status = wakeup_none; + RACTOR_LOCK_SELF(cr); + { + if (serial != rq->serial) { + serial = rq->serial; + index = 0; + } + + // check newer version + for (int i=index; icnt; i++) { + if (!ractor_queue_skip_p(rq, i)) { + struct rb_ractor_basket *b = ractor_queue_at(rq, i); + v = ractor_basket_value(b); + b->type = basket_type_reserved; + rq->reserved_cnt++; + index = i; + break; + } } } - RACTOR_UNLOCK(r); - } + RACTOR_UNLOCK_SELF(cr); - return v; + if (v != Qundef) { + struct receive_block_data data = { + .cr = cr, + .rq = rq, + .v = v, + .index = index, + .success = false, + }; + + VALUE result = rb_ensure(receive_if_body, (VALUE)&data, + receive_if_ensure, (VALUE)&data); + + if (result != Qundef) return result; + index++; + } + } } static void -- cgit v1.2.3