summaryrefslogtreecommitdiff
path: root/ractor.c
diff options
context:
space:
mode:
Diffstat (limited to 'ractor.c')
-rw-r--r--ractor.c2142
1 files changed, 1374 insertions, 768 deletions
diff --git a/ractor.c b/ractor.c
index bf19aa62d3..7b9c088ceb 100644
--- a/ractor.c
+++ b/ractor.c
@@ -5,19 +5,22 @@
#include "ruby/ractor.h"
#include "ruby/thread_native.h"
#include "vm_core.h"
+#include "eval_intern.h"
#include "vm_sync.h"
#include "ractor_core.h"
#include "internal/complex.h"
#include "internal/error.h"
+#include "internal/gc.h"
#include "internal/hash.h"
#include "internal/rational.h"
#include "internal/struct.h"
#include "internal/thread.h"
#include "variable.h"
-#include "gc.h"
-#include "transient_heap.h"
+#include "yjit.h"
+#include "rjit.h"
VALUE rb_cRactor;
+static VALUE rb_cRactorSelector;
VALUE rb_eRactorUnsafeError;
VALUE rb_eRactorIsolationError;
@@ -29,11 +32,13 @@ static VALUE rb_cRactorMovedObject;
static void vm_ractor_blocking_cnt_inc(rb_vm_t *vm, rb_ractor_t *r, const char *file, int line);
+// Ractor locking
+
static void
ASSERT_ractor_unlocking(rb_ractor_t *r)
{
#if RACTOR_CHECK_MODE > 0
- // GET_EC is NULL in an MJIT worker
+ // GET_EC is NULL in an RJIT worker
if (rb_current_execution_context(false) != NULL && r->sync.locked_by == rb_ractor_self(GET_RACTOR())) {
rb_bug("recursive ractor locking");
}
@@ -44,7 +49,7 @@ static void
ASSERT_ractor_locking(rb_ractor_t *r)
{
#if RACTOR_CHECK_MODE > 0
- // GET_EC is NULL in an MJIT worker
+ // GET_EC is NULL in an RJIT worker
if (rb_current_execution_context(false) != NULL && r->sync.locked_by != rb_ractor_self(GET_RACTOR())) {
rp(r->sync.locked_by);
rb_bug("ractor lock is not acquired.");
@@ -55,25 +60,28 @@ ASSERT_ractor_locking(rb_ractor_t *r)
static void
ractor_lock(rb_ractor_t *r, const char *file, int line)
{
- RUBY_DEBUG_LOG2(file, line, "locking r:%u%s", r->pub.id, GET_RACTOR() == r ? " (self)" : "");
+ RUBY_DEBUG_LOG2(file, line, "locking r:%u%s", r->pub.id, rb_current_ractor_raw(false) == r ? " (self)" : "");
ASSERT_ractor_unlocking(r);
rb_native_mutex_lock(&r->sync.lock);
#if RACTOR_CHECK_MODE > 0
- if (rb_current_execution_context(false) != NULL) { // GET_EC is NULL in an MJIT worker
- r->sync.locked_by = rb_ractor_self(GET_RACTOR());
+ if (rb_current_execution_context(false) != NULL) { // GET_EC is NULL in an RJIT worker
+ rb_ractor_t *cr = rb_current_ractor_raw(false);
+ r->sync.locked_by = cr ? rb_ractor_self(cr) : Qundef;
}
#endif
- RUBY_DEBUG_LOG2(file, line, "locked r:%u%s", r->pub.id, GET_RACTOR() == r ? " (self)" : "");
+ RUBY_DEBUG_LOG2(file, line, "locked r:%u%s", r->pub.id, rb_current_ractor_raw(false) == r ? " (self)" : "");
}
static void
ractor_lock_self(rb_ractor_t *cr, const char *file, int line)
{
VM_ASSERT(cr == GET_RACTOR());
+#if RACTOR_CHECK_MODE > 0
VM_ASSERT(cr->sync.locked_by != cr->pub.self);
+#endif
ractor_lock(cr, file, line);
}
@@ -86,14 +94,16 @@ ractor_unlock(rb_ractor_t *r, const char *file, int line)
#endif
rb_native_mutex_unlock(&r->sync.lock);
- RUBY_DEBUG_LOG2(file, line, "r:%u%s", r->pub.id, GET_RACTOR() == r ? " (self)" : "");
+ RUBY_DEBUG_LOG2(file, line, "r:%u%s", r->pub.id, rb_current_ractor_raw(false) == r ? " (self)" : "");
}
static void
ractor_unlock_self(rb_ractor_t *cr, const char *file, int line)
{
VM_ASSERT(cr == GET_RACTOR());
+#if RACTOR_CHECK_MODE > 0
VM_ASSERT(cr->sync.locked_by == cr->pub.self);
+#endif
ractor_unlock(cr, file, line);
}
@@ -102,20 +112,20 @@ ractor_unlock_self(rb_ractor_t *cr, const char *file, int line)
#define RACTOR_LOCK_SELF(r) ractor_lock_self(r, __FILE__, __LINE__)
#define RACTOR_UNLOCK_SELF(r) ractor_unlock_self(r, __FILE__, __LINE__)
-static void
-ractor_cond_wait(rb_ractor_t *r)
+void
+rb_ractor_lock_self(rb_ractor_t *r)
{
-#if RACTOR_CHECK_MODE > 0
- VALUE locked_by = r->sync.locked_by;
- r->sync.locked_by = Qnil;
-#endif
- rb_native_cond_wait(&r->sync.cond, &r->sync.lock);
+ RACTOR_LOCK_SELF(r);
+}
-#if RACTOR_CHECK_MODE > 0
- r->sync.locked_by = locked_by;
-#endif
+void
+rb_ractor_unlock_self(rb_ractor_t *r)
+{
+ RACTOR_UNLOCK_SELF(r);
}
+// Ractor status
+
static const char *
ractor_status_str(enum ractor_status status)
{
@@ -152,7 +162,7 @@ ractor_status_set(rb_ractor_t *r, enum ractor_status status)
VM_ASSERT(status == ractor_running);
break;
case ractor_terminated:
- VM_ASSERT(0); // unreachable
+ rb_bug("unreachable");
break;
}
@@ -165,31 +175,40 @@ ractor_status_p(rb_ractor_t *r, enum ractor_status status)
return rb_ractor_status_p(r, status);
}
-static struct rb_ractor_basket *ractor_queue_at(struct rb_ractor_queue *rq, int i);
+// Ractor data/mark/free
+
+static struct rb_ractor_basket *ractor_queue_at(rb_ractor_t *r, struct rb_ractor_queue *rq, int i);
+static void ractor_local_storage_mark(rb_ractor_t *r);
+static void ractor_local_storage_free(rb_ractor_t *r);
static void
ractor_queue_mark(struct rb_ractor_queue *rq)
{
for (int i=0; i<rq->cnt; i++) {
- struct rb_ractor_basket *b = ractor_queue_at(rq, i);
- rb_gc_mark(b->v);
+ struct rb_ractor_basket *b = ractor_queue_at(NULL, rq, i);
rb_gc_mark(b->sender);
+
+ switch (b->type.e) {
+ case basket_type_yielding:
+ case basket_type_take_basket:
+ case basket_type_deleted:
+ case basket_type_reserved:
+ // ignore
+ break;
+ default:
+ rb_gc_mark(b->p.send.v);
+ }
}
}
-static void ractor_local_storage_mark(rb_ractor_t *r);
-static void ractor_local_storage_free(rb_ractor_t *r);
-
static void
ractor_mark(void *ptr)
{
rb_ractor_t *r = (rb_ractor_t *)ptr;
- ractor_queue_mark(&r->sync.incoming_queue);
- rb_gc_mark(r->sync.wait.taken_basket.v);
- rb_gc_mark(r->sync.wait.taken_basket.sender);
- rb_gc_mark(r->sync.wait.yielded_basket.v);
- rb_gc_mark(r->sync.wait.yielded_basket.sender);
+ ractor_queue_mark(&r->sync.recv_queue);
+ ractor_queue_mark(&r->sync.takers_queue);
+
rb_gc_mark(r->receiving_mutex);
rb_gc_mark(r->loc);
@@ -201,7 +220,7 @@ ractor_mark(void *ptr)
if (r->threads.cnt > 0) {
rb_thread_t *th = 0;
- list_for_each(&r->threads.set, th, lt_node) {
+ ccan_list_for_each(&r->threads.set, th, lt_node) {
VM_ASSERT(th != NULL);
rb_gc_mark(th->self);
}
@@ -217,19 +236,16 @@ ractor_queue_free(struct rb_ractor_queue *rq)
}
static void
-ractor_waiting_list_free(struct rb_ractor_waiting_list *wl)
-{
- free(wl->ractors);
-}
-
-static void
ractor_free(void *ptr)
{
rb_ractor_t *r = (rb_ractor_t *)ptr;
+ RUBY_DEBUG_LOG("free r:%d", rb_ractor_id(r));
rb_native_mutex_destroy(&r->sync.lock);
+#ifdef RUBY_THREAD_WIN32_H
rb_native_cond_destroy(&r->sync.cond);
- ractor_queue_free(&r->sync.incoming_queue);
- ractor_waiting_list_free(&r->sync.taking_ractors);
+#endif
+ ractor_queue_free(&r->sync.recv_queue);
+ ractor_queue_free(&r->sync.takers_queue);
ractor_local_storage_free(r);
rb_hook_list_free(&r->pub.hooks);
ruby_xfree(r);
@@ -242,27 +258,21 @@ ractor_queue_memsize(const struct rb_ractor_queue *rq)
}
static size_t
-ractor_waiting_list_memsize(const struct rb_ractor_waiting_list *wl)
-{
- return sizeof(rb_ractor_t *) * wl->size;
-}
-
-static size_t
ractor_memsize(const void *ptr)
{
rb_ractor_t *r = (rb_ractor_t *)ptr;
- // TODO
+ // TODO: more correct?
return sizeof(rb_ractor_t) +
- ractor_queue_memsize(&r->sync.incoming_queue) +
- ractor_waiting_list_memsize(&r->sync.taking_ractors);
+ ractor_queue_memsize(&r->sync.recv_queue) +
+ ractor_queue_memsize(&r->sync.takers_queue);
}
static const rb_data_type_t ractor_data_type = {
"ractor",
{
ractor_mark,
- ractor_free,
+ ractor_free,
ractor_memsize,
NULL, // update
},
@@ -284,16 +294,14 @@ static inline rb_ractor_t *
RACTOR_PTR(VALUE self)
{
VM_ASSERT(rb_ractor_p(self));
-
rb_ractor_t *r = DATA_PTR(self);
- // TODO: check
return r;
}
static rb_atomic_t ractor_last_id;
#if RACTOR_CHECK_MODE > 0
-MJIT_FUNC_EXPORTED uint32_t
+uint32_t
rb_ractor_current_id(void)
{
if (GET_THREAD()->ractor == NULL) {
@@ -305,6 +313,8 @@ rb_ractor_current_id(void)
}
#endif
+// Ractor queue
+
static void
ractor_queue_setup(struct rb_ractor_queue *rq)
{
@@ -315,15 +325,23 @@ ractor_queue_setup(struct rb_ractor_queue *rq)
}
static struct rb_ractor_basket *
-ractor_queue_at(struct rb_ractor_queue *rq, int i)
+ractor_queue_head(rb_ractor_t *r, struct rb_ractor_queue *rq)
{
+ if (r != NULL) ASSERT_ractor_locking(r);
+ return &rq->baskets[rq->start];
+}
+
+static struct rb_ractor_basket *
+ractor_queue_at(rb_ractor_t *r, struct rb_ractor_queue *rq, int i)
+{
+ if (r != NULL) ASSERT_ractor_locking(r);
return &rq->baskets[(rq->start + i) % rq->size];
}
static void
-ractor_queue_advance(struct rb_ractor_queue *rq)
+ractor_queue_advance(rb_ractor_t *r, struct rb_ractor_queue *rq)
{
- ASSERT_ractor_locking(GET_RACTOR());
+ ASSERT_ractor_locking(r);
if (rq->reserved_cnt == 0) {
rq->cnt--;
@@ -331,16 +349,16 @@ ractor_queue_advance(struct rb_ractor_queue *rq)
rq->serial++;
}
else {
- ractor_queue_at(rq, 0)->type = basket_type_deleted;
+ ractor_queue_at(r, rq, 0)->type.e = basket_type_deleted;
}
}
static bool
-ractor_queue_skip_p(struct rb_ractor_queue *rq, int i)
+ractor_queue_skip_p(rb_ractor_t *r, struct rb_ractor_queue *rq, int i)
{
- struct rb_ractor_basket *b = ractor_queue_at(rq, i);
- return b->type == basket_type_deleted ||
- b->type == basket_type_reserved;
+ struct rb_ractor_basket *b = ractor_queue_at(r, rq, i);
+ return basket_type_p(b, basket_type_deleted) ||
+ basket_type_p(b, basket_type_reserved);
}
static void
@@ -348,8 +366,8 @@ ractor_queue_compact(rb_ractor_t *r, struct rb_ractor_queue *rq)
{
ASSERT_ractor_locking(r);
- while (rq->cnt > 0 && ractor_queue_at(rq, 0)->type == basket_type_deleted) {
- ractor_queue_advance(rq);
+ while (rq->cnt > 0 && basket_type_p(ractor_queue_at(r, rq, 0), basket_type_deleted)) {
+ ractor_queue_advance(r, rq);
}
}
@@ -365,7 +383,7 @@ ractor_queue_empty_p(rb_ractor_t *r, struct rb_ractor_queue *rq)
ractor_queue_compact(r, rq);
for (int i=0; i<rq->cnt; i++) {
- if (!ractor_queue_skip_p(rq, i)) {
+ if (!ractor_queue_skip_p(r, rq, i)) {
return false;
}
}
@@ -376,28 +394,21 @@ ractor_queue_empty_p(rb_ractor_t *r, struct rb_ractor_queue *rq)
static bool
ractor_queue_deq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket)
{
- bool found = false;
+ ASSERT_ractor_locking(r);
- RACTOR_LOCK(r);
- {
- if (!ractor_queue_empty_p(r, rq)) {
- for (int i=0; i<rq->cnt; i++) {
- if (!ractor_queue_skip_p(rq, i)) {
- struct rb_ractor_basket *b = ractor_queue_at(rq, i);
- *basket = *b;
-
- // remove from queue
- b->type = basket_type_deleted;
- ractor_queue_compact(r, rq);
- found = true;
- break;
- }
- }
+ for (int i=0; i<rq->cnt; i++) {
+ if (!ractor_queue_skip_p(r, rq, i)) {
+ struct rb_ractor_basket *b = ractor_queue_at(r, rq, i);
+ *basket = *b;
+
+ // remove from queue
+ b->type.e = basket_type_deleted;
+ ractor_queue_compact(r, rq);
+ return true;
}
}
- RACTOR_UNLOCK(r);
- return found;
+ return false;
}
static void
@@ -413,36 +424,36 @@ ractor_queue_enq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_ba
rq->size *= 2;
}
rq->baskets[(rq->start + rq->cnt++) % rq->size] = *basket;
- // fprintf(stderr, "%s %p->cnt:%d\n", __func__, rq, rq->cnt);
+ // fprintf(stderr, "%s %p->cnt:%d\n", RUBY_FUNCTION_NAME_STRING, (void *)rq, rq->cnt);
}
static void
-ractor_basket_clear(struct rb_ractor_basket *b)
+ractor_queue_delete(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket)
{
- b->type = basket_type_none;
- b->v = Qfalse;
- b->sender = Qfalse;
+ basket->type.e = basket_type_deleted;
}
+// Ractor basket
+
static VALUE ractor_reset_belonging(VALUE obj); // in this file
static VALUE
ractor_basket_value(struct rb_ractor_basket *b)
{
- switch (b->type) {
+ switch (b->type.e) {
case basket_type_ref:
break;
case basket_type_copy:
case basket_type_move:
case basket_type_will:
- b->type = basket_type_ref;
- b->v = ractor_reset_belonging(b->v);
+ b->type.e = basket_type_ref;
+ b->p.send.v = ractor_reset_belonging(b->p.send.v);
break;
default:
rb_bug("unreachable");
}
- return b->v;
+ return b->p.send.v;
}
static VALUE
@@ -450,65 +461,103 @@ ractor_basket_accept(struct rb_ractor_basket *b)
{
VALUE v = ractor_basket_value(b);
- if (b->exception) {
+ if (b->p.send.exception) {
VALUE cause = v;
VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor.");
rb_ivar_set(err, rb_intern("@ractor"), b->sender);
- ractor_basket_clear(b);
rb_ec_setup_exception(NULL, err, cause);
rb_exc_raise(err);
}
- ractor_basket_clear(b);
return v;
}
-static void
-ractor_recursive_receive_if(rb_ractor_t *r)
+// Ractor synchronizations
+
+#if USE_RUBY_DEBUG_LOG
+static const char *
+wait_status_str(enum rb_ractor_wait_status wait_status)
{
- if (r->receiving_mutex && rb_mutex_owned_p(r->receiving_mutex)) {
- rb_raise(rb_eRactorError, "can not call receive/receive_if recursively");
+ switch ((int)wait_status) {
+ case wait_none: return "none";
+ case wait_receiving: return "receiving";
+ case wait_taking: return "taking";
+ case wait_yielding: return "yielding";
+ case wait_receiving|wait_taking: return "receiving|taking";
+ case wait_receiving|wait_yielding: return "receiving|yielding";
+ case wait_taking|wait_yielding: return "taking|yielding";
+ case wait_receiving|wait_taking|wait_yielding: return "receiving|taking|yielding";
}
+ rb_bug("unreachable");
}
-static VALUE
-ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *r)
+static const char *
+wakeup_status_str(enum rb_ractor_wakeup_status wakeup_status)
{
- struct rb_ractor_queue *rq = &r->sync.incoming_queue;
- struct rb_ractor_basket basket;
-
- ractor_recursive_receive_if(r);
-
- if (ractor_queue_deq(r, rq, &basket) == false) {
- if (r->sync.incoming_port_closed) {
- rb_raise(rb_eRactorClosedError, "The incoming port is already closed");
- }
- else {
- return Qundef;
- }
+ switch (wakeup_status) {
+ case wakeup_none: return "none";
+ case wakeup_by_send: return "by_send";
+ case wakeup_by_yield: return "by_yield";
+ case wakeup_by_take: return "by_take";
+ case wakeup_by_close: return "by_close";
+ case wakeup_by_interrupt: return "by_interrupt";
+ case wakeup_by_retry: return "by_retry";
}
+ rb_bug("unreachable");
+}
- return ractor_basket_accept(&basket);
+static const char *
+basket_type_name(enum rb_ractor_basket_type type)
+{
+ switch (type) {
+ case basket_type_none: return "none";
+ case basket_type_ref: return "ref";
+ case basket_type_copy: return "copy";
+ case basket_type_move: return "move";
+ case basket_type_will: return "will";
+ case basket_type_deleted: return "deleted";
+ case basket_type_reserved: return "reserved";
+ case basket_type_take_basket: return "take_basket";
+ case basket_type_yielding: return "yielding";
+ }
+ VM_ASSERT(0);
+ return NULL;
}
+#endif // USE_RUBY_DEBUG_LOG
static bool
-ractor_sleeping_by(const rb_ractor_t *r, enum ractor_wait_status wait_status)
+ractor_sleeping_by(const rb_ractor_t *r, enum rb_ractor_wait_status wait_status)
{
return (r->sync.wait.status & wait_status) && r->sync.wait.wakeup_status == wakeup_none;
}
+#ifdef RUBY_THREAD_PTHREAD_H
+// thread_*.c
+void rb_ractor_sched_wakeup(rb_ractor_t *r);
+#else
+
+static void
+rb_ractor_sched_wakeup(rb_ractor_t *r)
+{
+ rb_native_cond_broadcast(&r->sync.cond);
+}
+#endif
+
+
static bool
-ractor_wakeup(rb_ractor_t *r, enum ractor_wait_status wait_status, enum ractor_wakeup_status wakeup_status)
+ractor_wakeup(rb_ractor_t *r, enum rb_ractor_wait_status wait_status, enum rb_ractor_wakeup_status wakeup_status)
{
ASSERT_ractor_locking(r);
- // fprintf(stderr, "%s r:%p status:%s/%s wakeup_status:%s/%s\n", __func__, r,
- // wait_status_str(r->sync.wait.status), wait_status_str(wait_status),
- // wakeup_status_str(r->sync.wait.wakeup_status), wakeup_status_str(wakeup_status));
+ RUBY_DEBUG_LOG("r:%u wait_by:%s -> wait:%s wakeup:%s",
+ rb_ractor_id(r),
+ wait_status_str(r->sync.wait.status),
+ wait_status_str(wait_status),
+ wakeup_status_str(wakeup_status));
if (ractor_sleeping_by(r, wait_status)) {
r->sync.wait.wakeup_status = wakeup_status;
- rb_native_cond_signal(&r->sync.cond);
+ rb_ractor_sched_wakeup(r);
return true;
}
else {
@@ -516,22 +565,6 @@ ractor_wakeup(rb_ractor_t *r, enum ractor_wait_status wait_status, enum ractor_w
}
}
-static void *
-ractor_sleep_wo_gvl(void *ptr)
-{
- rb_ractor_t *cr = ptr;
- RACTOR_LOCK_SELF(cr);
- {
- VM_ASSERT(cr->sync.wait.status != wait_none);
- if (cr->sync.wait.wakeup_status == wakeup_none) {
- ractor_cond_wait(cr);
- }
- cr->sync.wait.status = wait_none;
- }
- RACTOR_UNLOCK_SELF(cr);
- return NULL;
-}
-
static void
ractor_sleep_interrupt(void *ptr)
{
@@ -544,174 +577,175 @@ ractor_sleep_interrupt(void *ptr)
RACTOR_UNLOCK(r);
}
-#if USE_RUBY_DEBUG_LOG
-static const char *
-wait_status_str(enum ractor_wait_status wait_status)
+typedef void (*ractor_sleep_cleanup_function)(rb_ractor_t *cr, void *p);
+
+static void
+ractor_check_ints(rb_execution_context_t *ec, rb_ractor_t *cr, ractor_sleep_cleanup_function cf_func, void *cf_data)
{
- switch ((int)wait_status) {
- case wait_none: return "none";
- case wait_receiving: return "receiving";
- case wait_taking: return "taking";
- case wait_yielding: return "yielding";
- case wait_receiving|wait_taking: return "receiving|taking";
- case wait_receiving|wait_yielding: return "receiving|yielding";
- case wait_taking|wait_yielding: return "taking|yielding";
- case wait_receiving|wait_taking|wait_yielding: return "receiving|taking|yielding";
+ if (cr->sync.wait.status != wait_none) {
+ enum rb_ractor_wait_status prev_wait_status = cr->sync.wait.status;
+ cr->sync.wait.status = wait_none;
+ cr->sync.wait.wakeup_status = wakeup_by_interrupt;
+
+ RACTOR_UNLOCK(cr);
+ {
+ if (cf_func) {
+ enum ruby_tag_type state;
+ EC_PUSH_TAG(ec);
+ if ((state = EC_EXEC_TAG()) == TAG_NONE) {
+ rb_thread_check_ints();
+ }
+ EC_POP_TAG();
+
+ if (state) {
+ (*cf_func)(cr, cf_data);
+ EC_JUMP_TAG(ec, state);
+ }
+ }
+ else {
+ rb_thread_check_ints();
+ }
+ }
+
+ // reachable?
+ RACTOR_LOCK(cr);
+ cr->sync.wait.status = prev_wait_status;
}
- rb_bug("unreachable");
}
-static const char *
-wakeup_status_str(enum ractor_wakeup_status wakeup_status)
+#ifdef RUBY_THREAD_PTHREAD_H
+void rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf);
+#else
+
+// win32
+static void
+ractor_cond_wait(rb_ractor_t *r)
{
- switch (wakeup_status) {
- case wakeup_none: return "none";
- case wakeup_by_send: return "by_send";
- case wakeup_by_yield: return "by_yield";
- case wakeup_by_take: return "by_take";
- case wakeup_by_close: return "by_close";
- case wakeup_by_interrupt: return "by_interrupt";
- case wakeup_by_retry: return "by_retry";
+#if RACTOR_CHECK_MODE > 0
+ VALUE locked_by = r->sync.locked_by;
+ r->sync.locked_by = Qnil;
+#endif
+ rb_native_cond_wait(&r->sync.cond, &r->sync.lock);
+
+#if RACTOR_CHECK_MODE > 0
+ r->sync.locked_by = locked_by;
+#endif
+}
+
+static void *
+ractor_sleep_wo_gvl(void *ptr)
+{
+ rb_ractor_t *cr = ptr;
+ RACTOR_LOCK_SELF(cr);
+ {
+ VM_ASSERT(cr->sync.wait.status != wait_none);
+ if (cr->sync.wait.wakeup_status == wakeup_none) {
+ ractor_cond_wait(cr);
+ }
+ cr->sync.wait.status = wait_none;
}
- rb_bug("unreachable");
+ RACTOR_UNLOCK_SELF(cr);
+ return NULL;
}
-#endif // USE_RUBY_DEBUG_LOG
static void
-ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr)
+rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf)
{
- VM_ASSERT(GET_RACTOR() == cr);
- VM_ASSERT(cr->sync.wait.status != wait_none);
- // fprintf(stderr, "%s r:%p status:%s, wakeup_status:%s\n", __func__, cr,
- // wait_status_str(cr->sync.wait.status), wakeup_status_str(cr->sync.wait.wakeup_status));
-
RACTOR_UNLOCK(cr);
{
rb_nogvl(ractor_sleep_wo_gvl, cr,
- ractor_sleep_interrupt, cr,
+ ubf, cr,
RB_NOGVL_UBF_ASYNC_SAFE | RB_NOGVL_INTR_FAIL);
}
RACTOR_LOCK(cr);
-
- // rb_nogvl() can be canceled by interrupts
- if (cr->sync.wait.status != wait_none) {
- cr->sync.wait.status = wait_none;
- cr->sync.wait.wakeup_status = wakeup_by_interrupt;
-
- RACTOR_UNLOCK(cr);
- rb_thread_check_ints();
- RACTOR_LOCK(cr); // reachable?
- }
}
+#endif
-static void
-ractor_register_taking(rb_ractor_t *r, rb_ractor_t *cr)
+static enum rb_ractor_wakeup_status
+ractor_sleep_with_cleanup(rb_execution_context_t *ec, rb_ractor_t *cr, enum rb_ractor_wait_status wait_status,
+ ractor_sleep_cleanup_function cf_func, void *cf_data)
{
- VM_ASSERT(cr == GET_RACTOR());
- bool retry_try = false;
+ enum rb_ractor_wakeup_status wakeup_status;
+ VM_ASSERT(GET_RACTOR() == cr);
- RACTOR_LOCK(r);
- {
- if (ractor_sleeping_by(r, wait_yielding)) {
- // already waiting for yielding. retry try_take.
- retry_try = true;
- }
- else {
- // insert cr into taking list
- struct rb_ractor_waiting_list *wl = &r->sync.taking_ractors;
-
- for (int i=0; i<wl->cnt; i++) {
- if (wl->ractors[i] == cr) {
- // TODO: make it clean code.
- rb_native_mutex_unlock(&r->sync.lock);
- rb_raise(rb_eRuntimeError, "Already another thread of same ractor is waiting.");
- }
- }
+ // TODO: multi-threads
+ VM_ASSERT(cr->sync.wait.status == wait_none);
+ VM_ASSERT(wait_status != wait_none);
+ cr->sync.wait.status = wait_status;
+ cr->sync.wait.wakeup_status = wakeup_none;
- if (wl->size == 0) {
- wl->size = 1;
- wl->ractors = malloc(sizeof(rb_ractor_t *) * wl->size);
- if (wl->ractors == NULL) rb_bug("can't allocate buffer");
- }
- else if (wl->size <= wl->cnt + 1) {
- wl->size *= 2;
- wl->ractors = realloc(wl->ractors, sizeof(rb_ractor_t *) * wl->size);
- if (wl->ractors == NULL) rb_bug("can't re-allocate buffer");
- }
- wl->ractors[wl->cnt++] = cr;
- }
- }
- RACTOR_UNLOCK(r);
+ // fprintf(stderr, "%s r:%p status:%s, wakeup_status:%s\n", RUBY_FUNCTION_NAME_STRING, (void *)cr,
+ // wait_status_str(cr->sync.wait.status), wakeup_status_str(cr->sync.wait.wakeup_status));
- if (retry_try) {
- RACTOR_LOCK(cr);
- {
- if (cr->sync.wait.wakeup_status == wakeup_none) {
- VM_ASSERT(cr->sync.wait.status != wait_none);
+ RUBY_DEBUG_LOG("sleep by %s", wait_status_str(wait_status));
- cr->sync.wait.wakeup_status = wakeup_by_retry;
- cr->sync.wait.status = wait_none;
- }
- }
- RACTOR_UNLOCK(cr);
+ while (cr->sync.wait.wakeup_status == wakeup_none) {
+ rb_ractor_sched_sleep(ec, cr, ractor_sleep_interrupt);
+ ractor_check_ints(ec, cr, cf_func, cf_data);
}
+
+ cr->sync.wait.status = wait_none;
+
+ // TODO: multi-thread
+ wakeup_status = cr->sync.wait.wakeup_status;
+ cr->sync.wait.wakeup_status = wakeup_none;
+
+ RUBY_DEBUG_LOG("wakeup %s", wakeup_status_str(wakeup_status));
+
+ return wakeup_status;
}
+static enum rb_ractor_wakeup_status
+ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, enum rb_ractor_wait_status wait_status)
+{
+ return ractor_sleep_with_cleanup(ec, cr, wait_status, 0, NULL);
+}
+
+// Ractor.receive
+
static void
-ractor_waiting_list_del(rb_ractor_t *r, struct rb_ractor_waiting_list *wl, rb_ractor_t *wr)
+ractor_recursive_receive_if(rb_ractor_t *r)
{
- RACTOR_LOCK(r);
- {
- int pos = -1;
- for (int i=0; i<wl->cnt; i++) {
- if (wl->ractors[i] == wr) {
- pos = i;
- break;
- }
- }
- if (pos >= 0) { // found
- wl->cnt--;
- for (int i=pos; i<wl->cnt; i++) {
- wl->ractors[i] = wl->ractors[i+1];
- }
- }
+ if (r->receiving_mutex && rb_mutex_owned_p(r->receiving_mutex)) {
+ rb_raise(rb_eRactorError, "can not call receive/receive_if recursively");
}
- RACTOR_UNLOCK(r);
}
-static rb_ractor_t *
-ractor_waiting_list_shift(rb_ractor_t *r, struct rb_ractor_waiting_list *wl)
+static VALUE
+ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *rq)
{
- ASSERT_ractor_locking(r);
- VM_ASSERT(&r->sync.taking_ractors == wl);
+ struct rb_ractor_basket basket;
+ ractor_recursive_receive_if(cr);
+ bool received = false;
+
+ RACTOR_LOCK_SELF(cr);
+ {
+ RUBY_DEBUG_LOG("rq->cnt:%d", rq->cnt);
+ received = ractor_queue_deq(cr, rq, &basket);
+ }
+ RACTOR_UNLOCK_SELF(cr);
- if (wl->cnt > 0) {
- rb_ractor_t *tr = wl->ractors[0];
- for (int i=1; i<wl->cnt; i++) {
- wl->ractors[i-1] = wl->ractors[i];
+ if (!received) {
+ if (cr->sync.incoming_port_closed) {
+ rb_raise(rb_eRactorClosedError, "The incoming port is already closed");
}
- wl->cnt--;
- return tr;
+ return Qundef;
}
else {
- return NULL;
+ return ractor_basket_accept(&basket);
}
}
static void
-ractor_receive_wait(rb_execution_context_t *ec, rb_ractor_t *cr)
+ractor_wait_receive(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *rq)
{
VM_ASSERT(cr == rb_ec_ractor_ptr(ec));
ractor_recursive_receive_if(cr);
RACTOR_LOCK(cr);
{
- if (ractor_queue_empty_p(cr, &cr->sync.incoming_queue)) {
- VM_ASSERT(cr->sync.wait.status == wait_none);
- cr->sync.wait.status = wait_receiving;
- cr->sync.wait.wakeup_status = wakeup_none;
- ractor_sleep(ec, cr);
- cr->sync.wait.wakeup_status = wakeup_none;
+ while (ractor_queue_empty_p(cr, rq)) {
+ ractor_sleep(ec, cr, wait_receiving);
}
}
RACTOR_UNLOCK(cr);
@@ -722,40 +756,25 @@ ractor_receive(rb_execution_context_t *ec, rb_ractor_t *cr)
{
VM_ASSERT(cr == rb_ec_ractor_ptr(ec));
VALUE v;
+ struct rb_ractor_queue *rq = &cr->sync.recv_queue;
- while ((v = ractor_try_receive(ec, cr)) == Qundef) {
- ractor_receive_wait(ec, cr);
+ while (UNDEF_P(v = ractor_try_receive(ec, cr, rq))) {
+ ractor_wait_receive(ec, cr, rq);
}
return v;
}
#if 0
-// for debug
-static const char *
-basket_type_name(enum rb_ractor_basket_type type)
-{
- switch (type) {
-#define T(t) case basket_type_##t: return #t
- T(none);
- T(ref);
- T(copy);
- T(move);
- T(will);
- T(deleted);
- T(reserved);
- default: rb_bug("unreachable");
- }
-}
-
static void
rq_dump(struct rb_ractor_queue *rq)
{
bool bug = false;
for (int i=0; i<rq->cnt; i++) {
- struct rb_ractor_basket *b = ractor_queue_at(rq, i);
- fprintf(stderr, "%d (start:%d) type:%s %p %s\n", i, rq->start, basket_type_name(b->type), b, RSTRING_PTR(RARRAY_AREF(b->v, 1)));
- if (b->type == basket_type_reserved) bug = true;
+ struct rb_ractor_basket *b = ractor_queue_at(NULL, rq, i);
+ fprintf(stderr, "%d (start:%d) type:%s %p %s\n", i, rq->start, basket_type_name(b->type),
+ (void *)b, RSTRING_PTR(RARRAY_AREF(b->v, 1)));
+ if (basket_type_p(b, basket_type_reserved)) bug = true;
}
if (bug) rb_bug("!!");
}
@@ -786,22 +805,23 @@ receive_if_body(VALUE ptr)
ractor_receive_if_lock(data->cr);
VALUE block_result = rb_yield(data->v);
+ rb_ractor_t *cr = data->cr;
- RACTOR_LOCK_SELF(data->cr);
+ RACTOR_LOCK_SELF(cr);
{
- struct rb_ractor_basket *b = ractor_queue_at(data->rq, data->index);
- VM_ASSERT(b->type == basket_type_reserved);
+ struct rb_ractor_basket *b = ractor_queue_at(cr, data->rq, data->index);
+ VM_ASSERT(basket_type_p(b, basket_type_reserved));
data->rq->reserved_cnt--;
if (RTEST(block_result)) {
- b->type = basket_type_deleted;
- ractor_queue_compact(data->cr, data->rq);
+ ractor_queue_delete(cr, data->rq, b);
+ ractor_queue_compact(cr, data->rq);
}
else {
- b->type = basket_type_ref;
+ b->type.e = basket_type_ref;
}
}
- RACTOR_UNLOCK_SELF(data->cr);
+ RACTOR_UNLOCK_SELF(cr);
data->success = true;
@@ -817,19 +837,20 @@ static VALUE
receive_if_ensure(VALUE v)
{
struct receive_block_data *data = (struct receive_block_data *)v;
+ rb_ractor_t *cr = data->cr;
if (!data->success) {
- RACTOR_LOCK_SELF(data->cr);
+ RACTOR_LOCK_SELF(cr);
{
- struct rb_ractor_basket *b = ractor_queue_at(data->rq, data->index);
- VM_ASSERT(b->type == basket_type_reserved);
- b->type = basket_type_deleted;
+ struct rb_ractor_basket *b = ractor_queue_at(cr, data->rq, data->index);
+ VM_ASSERT(basket_type_p(b, basket_type_reserved));
+ b->type.e = basket_type_deleted;
data->rq->reserved_cnt--;
}
- RACTOR_UNLOCK_SELF(data->cr);
+ RACTOR_UNLOCK_SELF(cr);
}
- rb_mutex_unlock(data->cr->receiving_mutex);
+ rb_mutex_unlock(cr->receiving_mutex);
return Qnil;
}
@@ -841,12 +862,12 @@ ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b)
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
unsigned int serial = (unsigned int)-1;
int index = 0;
- struct rb_ractor_queue *rq = &cr->sync.incoming_queue;
+ struct rb_ractor_queue *rq = &cr->sync.recv_queue;
while (1) {
VALUE v = Qundef;
- ractor_receive_wait(ec, cr);
+ ractor_wait_receive(ec, cr, rq);
RACTOR_LOCK_SELF(cr);
{
@@ -857,10 +878,10 @@ ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b)
// check newer version
for (int i=index; i<rq->cnt; i++) {
- if (!ractor_queue_skip_p(rq, i)) {
- struct rb_ractor_basket *b = ractor_queue_at(rq, i);
+ if (!ractor_queue_skip_p(cr, rq, i)) {
+ struct rb_ractor_basket *b = ractor_queue_at(cr, rq, i);
v = ractor_basket_value(b);
- b->type = basket_type_reserved;
+ b->type.e = basket_type_reserved;
rq->reserved_cnt++;
index = i;
break;
@@ -869,7 +890,7 @@ ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b)
}
RACTOR_UNLOCK_SELF(cr);
- if (v != Qundef) {
+ if (!UNDEF_P(v)) {
struct receive_block_data data = {
.cr = cr,
.rq = rq,
@@ -881,9 +902,11 @@ ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b)
VALUE result = rb_ensure(receive_if_body, (VALUE)&data,
receive_if_ensure, (VALUE)&data);
- if (result != Qundef) return result;
+ if (!UNDEF_P(result)) return result;
index++;
}
+
+ RUBY_VM_CHECK_INTS(ec);
}
}
@@ -891,7 +914,6 @@ static void
ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_basket *b)
{
bool closed = false;
- struct rb_ractor_queue *rq = &r->sync.incoming_queue;
RACTOR_LOCK(r);
{
@@ -899,10 +921,8 @@ ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_
closed = true;
}
else {
- ractor_queue_enq(r, rq, b);
- if (ractor_wakeup(r, wait_receiving, wakeup_by_send)) {
- RUBY_DEBUG_LOG("wakeup", 0);
- }
+ ractor_queue_enq(r, &r->sync.recv_queue, b);
+ ractor_wakeup(r, wait_receiving, wakeup_by_send);
}
}
RACTOR_UNLOCK(r);
@@ -912,430 +932,896 @@ ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_
}
}
+// Ractor#send
+
static VALUE ractor_move(VALUE obj); // in this file
static VALUE ractor_copy(VALUE obj); // in this file
static void
-ractor_basket_setup(rb_execution_context_t *ec, struct rb_ractor_basket *basket, VALUE obj, VALUE move, bool exc, bool is_will, bool is_yield)
+ractor_basket_prepare_contents(VALUE obj, VALUE move, volatile VALUE *pobj, enum rb_ractor_basket_type *ptype)
{
- basket->sender = rb_ec_ractor_ptr(ec)->pub.self;
- basket->exception = exc;
+ VALUE v;
+ enum rb_ractor_basket_type type;
- if (is_will) {
- basket->type = basket_type_will;
- basket->v = obj;
- }
- else if (rb_ractor_shareable_p(obj)) {
- basket->type = basket_type_ref;
- basket->v = obj;
+ if (rb_ractor_shareable_p(obj)) {
+ type = basket_type_ref;
+ v = obj;
}
else if (!RTEST(move)) {
- basket->v = ractor_copy(obj);
- basket->type = basket_type_copy;
+ v = ractor_copy(obj);
+ type = basket_type_copy;
}
else {
- basket->type = basket_type_move;
-
- if (is_yield) {
- basket->v = obj; // call ractor_move() when yielding timing.
- }
- else {
- basket->v = ractor_move(obj);
- }
+ type = basket_type_move;
+ v = ractor_move(obj);
}
+
+ *pobj = v;
+ *ptype = type;
+}
+
+static void
+ractor_basket_fill_(rb_ractor_t *cr, struct rb_ractor_basket *basket, VALUE obj, bool exc)
+{
+ VM_ASSERT(cr == GET_RACTOR());
+
+ basket->sender = cr->pub.self;
+ basket->p.send.exception = exc;
+ basket->p.send.v = obj;
+}
+
+static void
+ractor_basket_fill(rb_ractor_t *cr, struct rb_ractor_basket *basket, VALUE obj, VALUE move, bool exc)
+{
+ VALUE v;
+ enum rb_ractor_basket_type type;
+ ractor_basket_prepare_contents(obj, move, &v, &type);
+ ractor_basket_fill_(cr, basket, v, exc);
+ basket->type.e = type;
+}
+
+static void
+ractor_basket_fill_will(rb_ractor_t *cr, struct rb_ractor_basket *basket, VALUE obj, bool exc)
+{
+ ractor_basket_fill_(cr, basket, obj, exc);
+ basket->type.e = basket_type_will;
}
static VALUE
ractor_send(rb_execution_context_t *ec, rb_ractor_t *r, VALUE obj, VALUE move)
{
struct rb_ractor_basket basket;
- ractor_basket_setup(ec, &basket, obj, move, false, false, false);
+ // TODO: Ractor local GC
+ ractor_basket_fill(rb_ec_ractor_ptr(ec), &basket, obj, move, false);
ractor_send_basket(ec, r, &basket);
return r->pub.self;
}
-static VALUE
-ractor_try_take(rb_execution_context_t *ec, rb_ractor_t *r)
+// Ractor#take
+
+static bool
+ractor_take_has_will(rb_ractor_t *r)
+{
+ ASSERT_ractor_locking(r);
+
+ return basket_type_p(&r->sync.will_basket, basket_type_will);
+}
+
+static bool
+ractor_take_will(rb_ractor_t *r, struct rb_ractor_basket *b)
+{
+ ASSERT_ractor_locking(r);
+
+ if (ractor_take_has_will(r)) {
+ *b = r->sync.will_basket;
+ r->sync.will_basket.type.e = basket_type_none;
+ return true;
+ }
+ else {
+ VM_ASSERT(basket_type_p(&r->sync.will_basket, basket_type_none));
+ return false;
+ }
+}
+
+static bool
+ractor_take_will_lock(rb_ractor_t *r, struct rb_ractor_basket *b)
{
- struct rb_ractor_basket basket = {
- .type = basket_type_none,
+ ASSERT_ractor_unlocking(r);
+ bool taken;
+
+ RACTOR_LOCK(r);
+ {
+ taken = ractor_take_will(r, b);
+ }
+ RACTOR_UNLOCK(r);
+
+ return taken;
+}
+
+static bool
+ractor_register_take(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *take_basket,
+ bool is_take, struct rb_ractor_selector_take_config *config, bool ignore_error)
+{
+ struct rb_ractor_basket b = {
+ .type.e = basket_type_take_basket,
+ .sender = cr->pub.self,
+ .p = {
+ .take = {
+ .basket = take_basket,
+ .config = config,
+ },
+ },
};
bool closed = false;
RACTOR_LOCK(r);
{
- if (ractor_sleeping_by(r, wait_yielding)) {
- MAYBE_UNUSED(bool) wakeup_result;
- VM_ASSERT(r->sync.wait.yielded_basket.type != basket_type_none);
-
- if (r->sync.wait.yielded_basket.type == basket_type_move) {
- wakeup_result = ractor_wakeup(r, wait_yielding, wakeup_by_retry);
- }
- else {
- wakeup_result = ractor_wakeup(r, wait_yielding, wakeup_by_take);
- basket = r->sync.wait.yielded_basket;
- ractor_basket_clear(&r->sync.wait.yielded_basket);
- }
- VM_ASSERT(wakeup_result);
+ if (is_take && ractor_take_will(r, take_basket)) {
+ RUBY_DEBUG_LOG("take over a will of r:%d", rb_ractor_id(r));
+ }
+ else if (!is_take && ractor_take_has_will(r)) {
+ RUBY_DEBUG_LOG("has_will");
+ VM_ASSERT(config != NULL);
+ config->closed = true;
}
else if (r->sync.outgoing_port_closed) {
closed = true;
}
+ else {
+ RUBY_DEBUG_LOG("register in r:%d", rb_ractor_id(r));
+ ractor_queue_enq(r, &r->sync.takers_queue, &b);
+
+ if (basket_none_p(take_basket)) {
+ ractor_wakeup(r, wait_yielding, wakeup_by_take);
+ }
+ }
}
RACTOR_UNLOCK(r);
- if (basket.type == basket_type_none) {
- if (closed) {
- rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
+ if (closed) {
+ if (!ignore_error) rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
+ return false;
+ }
+ else {
+ return true;
+ }
+}
+
+static bool
+ractor_deregister_take(rb_ractor_t *r, struct rb_ractor_basket *take_basket)
+{
+ struct rb_ractor_queue *ts = &r->sync.takers_queue;
+ bool deleted = false;
+
+ RACTOR_LOCK(r);
+ {
+ if (r->sync.outgoing_port_closed) {
+ // ok
}
else {
- return Qundef;
+ for (int i=0; i<ts->cnt; i++) {
+ struct rb_ractor_basket *b = ractor_queue_at(r, ts, i);
+ if (basket_type_p(b, basket_type_take_basket) && b->p.take.basket == take_basket) {
+ ractor_queue_delete(r, ts, b);
+ deleted = true;
+ }
+ }
+ if (deleted) {
+ ractor_queue_compact(r, ts);
+ }
}
}
- else {
- return ractor_basket_accept(&basket);
- }
+ RACTOR_UNLOCK(r);
+
+ return deleted;
}
static VALUE
-ractor_yield_move_body(VALUE v)
+ractor_try_take(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *take_basket)
{
- return ractor_move(v);
+ bool taken;
+
+ RACTOR_LOCK_SELF(cr);
+ {
+ if (basket_none_p(take_basket) || basket_type_p(take_basket, basket_type_yielding)) {
+ taken = false;
+ }
+ else {
+ taken = true;
+ }
+ }
+ RACTOR_UNLOCK_SELF(cr);
+
+ if (taken) {
+ RUBY_DEBUG_LOG("taken");
+ if (basket_type_p(take_basket, basket_type_deleted)) {
+ VM_ASSERT(r->sync.outgoing_port_closed);
+ rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
+ }
+ return ractor_basket_accept(take_basket);
+ }
+ else {
+ RUBY_DEBUG_LOG("not taken");
+ return Qundef;
+ }
}
+
+#if VM_CHECK_MODE > 0
static bool
-ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_basket *basket)
+ractor_check_specific_take_basket_lock(rb_ractor_t *r, struct rb_ractor_basket *tb)
{
- ASSERT_ractor_unlocking(cr);
- VM_ASSERT(basket->type != basket_type_none);
+ bool ret = false;
+ struct rb_ractor_queue *ts = &r->sync.takers_queue;
- if (cr->sync.outgoing_port_closed) {
- rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
+ RACTOR_LOCK(r);
+ {
+ for (int i=0; i<ts->cnt; i++) {
+ struct rb_ractor_basket *b = ractor_queue_at(r, ts, i);
+ if (basket_type_p(b, basket_type_take_basket) && b->p.take.basket == tb) {
+ ret = true;
+ break;
+ }
+ }
}
+ RACTOR_UNLOCK(r);
+
+ return ret;
+}
+#endif
+static void
+ractor_take_cleanup(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *tb)
+{
+ retry:
+ if (basket_none_p(tb)) { // not yielded yet
+ if (!ractor_deregister_take(r, tb)) {
+ // not in r's takers queue
+ rb_thread_sleep(0);
+ goto retry;
+ }
+ }
+ else {
+ VM_ASSERT(!ractor_check_specific_take_basket_lock(r, tb));
+ }
+}
+
+struct take_wait_take_cleanup_data {
rb_ractor_t *r;
+ struct rb_ractor_basket *tb;
+};
- retry_shift:
- RACTOR_LOCK(cr);
+static void
+ractor_wait_take_cleanup(rb_ractor_t *cr, void *ptr)
+{
+ struct take_wait_take_cleanup_data *data = (struct take_wait_take_cleanup_data *)ptr;
+ ractor_take_cleanup(cr, data->r, data->tb);
+}
+
+static void
+ractor_wait_take(rb_execution_context_t *ec, rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *take_basket)
+{
+ struct take_wait_take_cleanup_data data = {
+ .r = r,
+ .tb = take_basket,
+ };
+
+ RACTOR_LOCK_SELF(cr);
{
- r = ractor_waiting_list_shift(cr, &cr->sync.taking_ractors);
+ if (basket_none_p(take_basket) || basket_type_p(take_basket, basket_type_yielding)) {
+ ractor_sleep_with_cleanup(ec, cr, wait_taking, ractor_wait_take_cleanup, &data);
+ }
}
- RACTOR_UNLOCK(cr);
+ RACTOR_UNLOCK_SELF(cr);
+}
- if (r) {
- bool retry_shift = false;
+static VALUE
+ractor_take(rb_execution_context_t *ec, rb_ractor_t *r)
+{
+ RUBY_DEBUG_LOG("from r:%u", rb_ractor_id(r));
+ VALUE v;
+ rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
- RACTOR_LOCK(r);
- {
- if (ractor_sleeping_by(r, wait_taking)) {
- VM_ASSERT(r->sync.wait.taken_basket.type == basket_type_none);
-
- if (basket->type == basket_type_move) {
- enum ractor_wait_status prev_wait_status = r->sync.wait.status;
- r->sync.wait.status = wait_moving;
-
- RACTOR_UNLOCK(r);
- {
- int state;
- VALUE moved_value = rb_protect(ractor_yield_move_body, basket->v, &state);
- if (state) {
- r->sync.wait.status = prev_wait_status;
- rb_jump_tag(state);
- }
- else {
- basket->v = moved_value;
- }
- }
- RACTOR_LOCK(r);
+ struct rb_ractor_basket take_basket = {
+ .type.e = basket_type_none,
+ .sender = 0,
+ };
- if (!ractor_wakeup(r, wait_moving, wakeup_by_yield)) {
- // terminating?
- }
+ ractor_register_take(cr, r, &take_basket, true, NULL, false);
+
+ while (UNDEF_P(v = ractor_try_take(cr, r, &take_basket))) {
+ ractor_wait_take(ec, cr, r, &take_basket);
+ }
+
+ VM_ASSERT(!basket_none_p(&take_basket));
+ VM_ASSERT(!ractor_check_specific_take_basket_lock(r, &take_basket));
+
+ return v;
+}
+
+// Ractor.yield
+
+static bool
+ractor_check_take_basket(rb_ractor_t *cr, struct rb_ractor_queue *rs)
+{
+ ASSERT_ractor_locking(cr);
+
+ for (int i=0; i<rs->cnt; i++) {
+ struct rb_ractor_basket *b = ractor_queue_at(cr, rs, i);
+ if (basket_type_p(b, basket_type_take_basket) &&
+ basket_none_p(b->p.take.basket)) {
+ return true;
+ }
+ }
+
+ return false;
+}
+
+static bool
+ractor_deq_take_basket(rb_ractor_t *cr, struct rb_ractor_queue *rs, struct rb_ractor_basket *b)
+{
+ ASSERT_ractor_unlocking(cr);
+ struct rb_ractor_basket *first_tb = NULL;
+ bool found = false;
+
+ RACTOR_LOCK_SELF(cr);
+ {
+ while (ractor_queue_deq(cr, rs, b)) {
+ if (basket_type_p(b, basket_type_take_basket)) {
+ struct rb_ractor_basket *tb = b->p.take.basket;
+
+ if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_yielding) == basket_type_none) {
+ found = true;
+ break;
}
else {
- ractor_wakeup(r, wait_taking, wakeup_by_yield);
+ ractor_queue_enq(cr, rs, b);
+ if (first_tb == NULL) first_tb = tb;
+ struct rb_ractor_basket *head = ractor_queue_head(cr, rs);
+ VM_ASSERT(head != NULL);
+ if (basket_type_p(head, basket_type_take_basket) && head->p.take.basket == first_tb) {
+ break; // loop detected
+ }
}
- r->sync.wait.taken_basket = *basket;
}
else {
- retry_shift = true;
+ VM_ASSERT(basket_none_p(b));
}
}
- RACTOR_UNLOCK(r);
- if (retry_shift) {
- // get candidate take-waiting ractor, but already woke up by another reason.
- // retry to check another ractor.
- goto retry_shift;
+ if (found && b->p.take.config && !b->p.take.config->oneshot) {
+ ractor_queue_enq(cr, rs, b);
+ }
+ }
+ RACTOR_UNLOCK_SELF(cr);
+
+ return found;
+}
+
+static bool
+ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *ts, volatile VALUE obj, VALUE move, bool exc, bool is_will)
+{
+ ASSERT_ractor_unlocking(cr);
+
+ struct rb_ractor_basket b;
+
+ if (ractor_deq_take_basket(cr, ts, &b)) {
+ VM_ASSERT(basket_type_p(&b, basket_type_take_basket));
+ VM_ASSERT(basket_type_p(b.p.take.basket, basket_type_yielding));
+
+ rb_ractor_t *tr = RACTOR_PTR(b.sender);
+ struct rb_ractor_basket *tb = b.p.take.basket;
+ enum rb_ractor_basket_type type;
+
+ RUBY_DEBUG_LOG("basket from r:%u", rb_ractor_id(tr));
+
+ if (is_will) {
+ type = basket_type_will;
}
else {
- return true;
+ enum ruby_tag_type state;
+
+ // begin
+ EC_PUSH_TAG(ec);
+ if ((state = EC_EXEC_TAG()) == TAG_NONE) {
+ // TODO: Ractor local GC
+ ractor_basket_prepare_contents(obj, move, &obj, &type);
+ }
+ EC_POP_TAG();
+ // rescue
+ if (state) {
+ RACTOR_LOCK_SELF(cr);
+ {
+ b.p.take.basket->type.e = basket_type_none;
+ ractor_queue_enq(cr, ts, &b);
+ }
+ RACTOR_UNLOCK_SELF(cr);
+ EC_JUMP_TAG(ec, state);
+ }
}
+
+ RACTOR_LOCK(tr);
+ {
+ VM_ASSERT(basket_type_p(tb, basket_type_yielding));
+ // fill atomic
+ RUBY_DEBUG_LOG("fill %sbasket from r:%u", is_will ? "will " : "", rb_ractor_id(tr));
+ ractor_basket_fill_(cr, tb, obj, exc);
+ if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_yielding, type) != basket_type_yielding) {
+ rb_bug("unreachable");
+ }
+ ractor_wakeup(tr, wait_taking, wakeup_by_yield);
+ }
+ RACTOR_UNLOCK(tr);
+
+ return true;
}
else {
+ RUBY_DEBUG_LOG("no take basket");
return false;
}
}
-// select(r1, r2, r3, receive: true, yield: obj)
+static void
+ractor_wait_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *ts)
+{
+ RACTOR_LOCK_SELF(cr);
+ {
+ while (!ractor_check_take_basket(cr, ts)) {
+ ractor_sleep(ec, cr, wait_yielding);
+ }
+ }
+ RACTOR_UNLOCK_SELF(cr);
+}
+
static VALUE
-ractor_select(rb_execution_context_t *ec, const VALUE *rs, const int rs_len, VALUE yielded_value, bool move, VALUE *ret_r)
+ractor_yield(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE obj, VALUE move)
{
- rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
- VALUE crv = cr->pub.self;
- VALUE ret = Qundef;
- int i;
- bool interrupted = false;
- enum ractor_wait_status wait_status = 0;
- bool yield_p = (yielded_value != Qundef) ? true : false;
- const int alen = rs_len + (yield_p ? 1 : 0);
-
- struct ractor_select_action {
- enum ractor_select_action_type {
- ractor_select_action_take,
- ractor_select_action_receive,
- ractor_select_action_yield,
- } type;
- VALUE v;
- } *actions = ALLOCA_N(struct ractor_select_action, alen);
+ struct rb_ractor_queue *ts = &cr->sync.takers_queue;
- VM_ASSERT(cr->sync.wait.status == wait_none);
- VM_ASSERT(cr->sync.wait.wakeup_status == wakeup_none);
- VM_ASSERT(cr->sync.wait.taken_basket.type == basket_type_none);
- VM_ASSERT(cr->sync.wait.yielded_basket.type == basket_type_none);
+ while (!ractor_try_yield(ec, cr, ts, obj, move, false, false)) {
+ ractor_wait_yield(ec, cr, ts);
+ }
- // setup actions
- for (i=0; i<rs_len; i++) {
- VALUE v = rs[i];
+ return Qnil;
+}
- if (v == crv) {
- actions[i].type = ractor_select_action_receive;
- actions[i].v = Qnil;
- wait_status |= wait_receiving;
- }
- else if (rb_ractor_p(v)) {
- actions[i].type = ractor_select_action_take;
- actions[i].v = v;
- wait_status |= wait_taking;
- }
- else {
- rb_raise(rb_eArgError, "should be a ractor object, but %"PRIsVALUE, v);
- }
+// Ractor::Selector
+
+struct rb_ractor_selector {
+ rb_ractor_t *r;
+ struct rb_ractor_basket take_basket;
+ st_table *take_ractors; // rb_ractor_t * => (struct rb_ractor_selector_take_config *)
+};
+
+static int
+ractor_selector_mark_ractors_i(st_data_t key, st_data_t value, st_data_t data)
+{
+ const rb_ractor_t *r = (rb_ractor_t *)key;
+ rb_gc_mark(r->pub.self);
+ return ST_CONTINUE;
+}
+
+static void
+ractor_selector_mark(void *ptr)
+{
+ struct rb_ractor_selector *s = ptr;
+
+ if (s->take_ractors) {
+ st_foreach(s->take_ractors, ractor_selector_mark_ractors_i, 0);
}
- rs = NULL;
- restart:
+ switch (s->take_basket.type.e) {
+ case basket_type_ref:
+ case basket_type_copy:
+ case basket_type_move:
+ case basket_type_will:
+ rb_gc_mark(s->take_basket.sender);
+ rb_gc_mark(s->take_basket.p.send.v);
+ break;
+ default:
+ break;
+ }
+}
+
+static int
+ractor_selector_release_i(st_data_t key, st_data_t val, st_data_t data)
+{
+ struct rb_ractor_selector *s = (struct rb_ractor_selector *)data;
+ struct rb_ractor_selector_take_config *config = (struct rb_ractor_selector_take_config *)val;
- if (yield_p) {
- actions[rs_len].type = ractor_select_action_yield;
- actions[rs_len].v = Qundef;
- wait_status |= wait_yielding;
- ractor_basket_setup(ec, &cr->sync.wait.yielded_basket, yielded_value, move, false, false, true);
+ if (!config->closed) {
+ ractor_deregister_take((rb_ractor_t *)key, &s->take_basket);
}
+ free(config);
+ return ST_CONTINUE;
+}
- // TODO: shuffle actions
+static void
+ractor_selector_free(void *ptr)
+{
+ struct rb_ractor_selector *s = ptr;
+ st_foreach(s->take_ractors, ractor_selector_release_i, (st_data_t)s);
+ st_free_table(s->take_ractors);
+ ruby_xfree(ptr);
+}
- while (1) {
- RUBY_DEBUG_LOG("try actions (%s)", wait_status_str(wait_status));
-
- for (i=0; i<alen; i++) {
- VALUE v, rv;
- switch (actions[i].type) {
- case ractor_select_action_take:
- rv = actions[i].v;
- v = ractor_try_take(ec, RACTOR_PTR(rv));
- if (v != Qundef) {
- *ret_r = rv;
- ret = v;
- goto cleanup;
- }
- break;
- case ractor_select_action_receive:
- v = ractor_try_receive(ec, cr);
- if (v != Qundef) {
- *ret_r = ID2SYM(rb_intern("receive"));
- ret = v;
- goto cleanup;
- }
- break;
- case ractor_select_action_yield:
- {
- if (ractor_try_yield(ec, cr, &cr->sync.wait.yielded_basket)) {
- *ret_r = ID2SYM(rb_intern("yield"));
- ret = Qnil;
- goto cleanup;
- }
- }
- break;
- }
- }
+static size_t
+ractor_selector_memsize(const void *ptr)
+{
+ const struct rb_ractor_selector *s = ptr;
+ return sizeof(struct rb_ractor_selector) +
+ st_memsize(s->take_ractors) +
+ s->take_ractors->num_entries * sizeof(struct rb_ractor_selector_take_config);
+}
- RUBY_DEBUG_LOG("wait actions (%s)", wait_status_str(wait_status));
+static const rb_data_type_t ractor_selector_data_type = {
+ "ractor/selector",
+ {
+ ractor_selector_mark,
+ ractor_selector_free,
+ ractor_selector_memsize,
+ NULL, // update
+ },
+ 0, 0, RUBY_TYPED_FREE_IMMEDIATELY,
+};
- RACTOR_LOCK(cr);
- {
- VM_ASSERT(cr->sync.wait.status == wait_none);
- cr->sync.wait.status = wait_status;
- cr->sync.wait.wakeup_status = wakeup_none;
- }
- RACTOR_UNLOCK(cr);
+static struct rb_ractor_selector *
+RACTOR_SELECTOR_PTR(VALUE selv)
+{
+ VM_ASSERT(rb_typeddata_is_kind_of(selv, &ractor_selector_data_type));
- // prepare waiting
- for (i=0; i<alen; i++) {
- rb_ractor_t *r;
- switch (actions[i].type) {
- case ractor_select_action_take:
- r = RACTOR_PTR(actions[i].v);
- ractor_register_taking(r, cr);
- break;
- case ractor_select_action_yield:
- case ractor_select_action_receive:
- break;
+ return (struct rb_ractor_selector *)DATA_PTR(selv);
+}
+
+// Ractor::Selector.new
+
+static VALUE
+ractor_selector_create(VALUE klass)
+{
+ struct rb_ractor_selector *s;
+ VALUE selv = TypedData_Make_Struct(klass, struct rb_ractor_selector, &ractor_selector_data_type, s);
+ s->take_basket.type.e = basket_type_reserved;
+ s->take_ractors = st_init_numtable(); // ractor (ptr) -> take_config
+ return selv;
+}
+
+// Ractor::Selector#add(r)
+
+static VALUE
+ractor_selector_add(VALUE selv, VALUE rv)
+{
+ if (!rb_ractor_p(rv)) {
+ rb_raise(rb_eArgError, "Not a ractor object");
+ }
+
+ rb_ractor_t *r = RACTOR_PTR(rv);
+ struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
+
+ if (st_lookup(s->take_ractors, (st_data_t)r, NULL)) {
+ rb_raise(rb_eArgError, "already added");
+ }
+
+ struct rb_ractor_selector_take_config *config = malloc(sizeof(struct rb_ractor_selector_take_config));
+ VM_ASSERT(config != NULL);
+ config->closed = false;
+ config->oneshot = false;
+
+ if (ractor_register_take(GET_RACTOR(), r, &s->take_basket, false, config, true)) {
+ st_insert(s->take_ractors, (st_data_t)r, (st_data_t)config);
+ }
+
+ return rv;
+}
+
+// Ractor::Selector#remove(r)
+
+static VALUE
+ractor_selector_remove(VALUE selv, VALUE rv)
+{
+ if (!rb_ractor_p(rv)) {
+ rb_raise(rb_eArgError, "Not a ractor object");
+ }
+
+ rb_ractor_t *r = RACTOR_PTR(rv);
+ struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
+
+ RUBY_DEBUG_LOG("r:%u", rb_ractor_id(r));
+
+ if (!st_lookup(s->take_ractors, (st_data_t)r, NULL)) {
+ rb_raise(rb_eArgError, "not added yet");
+ }
+
+ ractor_deregister_take(r, &s->take_basket);
+ struct rb_ractor_selector_take_config *config;
+ st_delete(s->take_ractors, (st_data_t *)&r, (st_data_t *)&config);
+ free(config);
+
+ return rv;
+}
+
+// Ractor::Selector#clear
+
+struct ractor_selector_clear_data {
+ VALUE selv;
+ rb_execution_context_t *ec;
+};
+
+static int
+ractor_selector_clear_i(st_data_t key, st_data_t val, st_data_t data)
+{
+ VALUE selv = (VALUE)data;
+ rb_ractor_t *r = (rb_ractor_t *)key;
+ ractor_selector_remove(selv, r->pub.self);
+ return ST_CONTINUE;
+}
+
+static VALUE
+ractor_selector_clear(VALUE selv)
+{
+ struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
+
+ st_foreach(s->take_ractors, ractor_selector_clear_i, (st_data_t)selv);
+ st_clear(s->take_ractors);
+ return selv;
+}
+
+static VALUE
+ractor_selector_empty_p(VALUE selv)
+{
+ struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
+ return s->take_ractors->num_entries == 0 ? Qtrue : Qfalse;
+}
+
+static int
+ractor_selector_wait_i(st_data_t key, st_data_t val, st_data_t dat)
+{
+ rb_ractor_t *r = (rb_ractor_t *)key;
+ struct rb_ractor_basket *tb = (struct rb_ractor_basket *)dat;
+ int ret;
+
+ if (!basket_none_p(tb)) {
+ RUBY_DEBUG_LOG("already taken:%s", basket_type_name(tb->type.e));
+ return ST_STOP;
+ }
+
+ RACTOR_LOCK(r);
+ {
+ if (basket_type_p(&r->sync.will_basket, basket_type_will)) {
+ RUBY_DEBUG_LOG("r:%u has will", rb_ractor_id(r));
+
+ if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_will) == basket_type_none) {
+ ractor_take_will(r, tb);
+ ret = ST_STOP;
+ }
+ else {
+ RUBY_DEBUG_LOG("has will, but already taken (%s)", basket_type_name(tb->type.e));
+ ret = ST_CONTINUE;
}
}
+ else if (r->sync.outgoing_port_closed) {
+ RUBY_DEBUG_LOG("r:%u is closed", rb_ractor_id(r));
- // wait
- RACTOR_LOCK(cr);
- {
- if (cr->sync.wait.wakeup_status == wakeup_none) {
- for (i=0; i<alen; i++) {
- rb_ractor_t *r;
-
- switch (actions[i].type) {
- case ractor_select_action_take:
- r = RACTOR_PTR(actions[i].v);
- if (ractor_sleeping_by(r, wait_yielding)) {
- RUBY_DEBUG_LOG("wakeup_none, but r:%u is waiting for yielding", r->pub.id);
- cr->sync.wait.wakeup_status = wakeup_by_retry;
- goto skip_sleep;
- }
- break;
- case ractor_select_action_receive:
- if (cr->sync.incoming_queue.cnt > 0) {
- RUBY_DEBUG_LOG("wakeup_none, but incoming_queue has %u messages", cr->sync.incoming_queue.cnt);
- cr->sync.wait.wakeup_status = wakeup_by_retry;
- goto skip_sleep;
- }
- break;
- case ractor_select_action_yield:
- if (cr->sync.taking_ractors.cnt > 0) {
- RUBY_DEBUG_LOG("wakeup_none, but %u taking_ractors are waiting", cr->sync.taking_ractors.cnt);
- cr->sync.wait.wakeup_status = wakeup_by_retry;
- goto skip_sleep;
- }
- else if (cr->sync.outgoing_port_closed) {
- cr->sync.wait.wakeup_status = wakeup_by_close;
- goto skip_sleep;
- }
- break;
- }
- }
-
- RUBY_DEBUG_LOG("sleep %s", wait_status_str(cr->sync.wait.status));
- ractor_sleep(ec, cr);
- RUBY_DEBUG_LOG("awaken %s", wakeup_status_str(cr->sync.wait.wakeup_status));
+ if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_deleted) == basket_type_none) {
+ tb->sender = r->pub.self;
+ ret = ST_STOP;
}
else {
- skip_sleep:
- RUBY_DEBUG_LOG("no need to sleep %s->%s",
- wait_status_str(cr->sync.wait.status),
- wakeup_status_str(cr->sync.wait.wakeup_status));
- cr->sync.wait.status = wait_none;
+ RUBY_DEBUG_LOG("closed, but already taken (%s)", basket_type_name(tb->type.e));
+ ret = ST_CONTINUE;
}
}
- RACTOR_UNLOCK(cr);
+ else {
+ RUBY_DEBUG_LOG("wakeup r:%u", rb_ractor_id(r));
+ ractor_wakeup(r, wait_yielding, wakeup_by_take);
+ ret = ST_CONTINUE;
+ }
+ }
+ RACTOR_UNLOCK(r);
+
+ return ret;
+}
+
+// Ractor::Selector#wait
+
+static void
+ractor_selector_wait_cleaup(rb_ractor_t *cr, void *ptr)
+{
+ struct rb_ractor_basket *tb = (struct rb_ractor_basket *)ptr;
+
+ RACTOR_LOCK_SELF(cr);
+ {
+ while (basket_type_p(tb, basket_type_yielding)) rb_thread_sleep(0);
+ // if tb->type is not none, taking succeeded, but an interruption unfortunately ignores it.
+ tb->type.e = basket_type_reserved;
+ }
+ RACTOR_UNLOCK_SELF(cr);
+}
+
+static VALUE
+ractor_selector__wait(VALUE selv, VALUE do_receivev, VALUE do_yieldv, VALUE yield_value, VALUE move)
+{
+ rb_execution_context_t *ec = GET_EC();
+ struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
+ struct rb_ractor_basket *tb = &s->take_basket;
+ struct rb_ractor_basket taken_basket;
+ rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
+ bool do_receive = !!RTEST(do_receivev);
+ bool do_yield = !!RTEST(do_yieldv);
+ VALUE ret_v, ret_r;
+ enum rb_ractor_wait_status wait_status;
+ struct rb_ractor_queue *rq = &cr->sync.recv_queue;
+ struct rb_ractor_queue *ts = &cr->sync.takers_queue;
+
+ RUBY_DEBUG_LOG("start");
+
+ retry:
+ RUBY_DEBUG_LOG("takers:%ld", s->take_ractors->num_entries);
- // cleanup waiting
- for (i=0; i<alen; i++) {
- rb_ractor_t *r;
- switch (actions[i].type) {
- case ractor_select_action_take:
- r = RACTOR_PTR(actions[i].v);
- ractor_waiting_list_del(r, &r->sync.taking_ractors, cr);
+ // setup wait_status
+ wait_status = wait_none;
+ if (s->take_ractors->num_entries > 0) wait_status |= wait_taking;
+ if (do_receive) wait_status |= wait_receiving;
+ if (do_yield) wait_status |= wait_yielding;
+
+ RUBY_DEBUG_LOG("wait:%s", wait_status_str(wait_status));
+
+ if (wait_status == wait_none) {
+ rb_raise(rb_eRactorError, "no taking ractors");
+ }
+
+ // check recv_queue
+ if (do_receive && !UNDEF_P(ret_v = ractor_try_receive(ec, cr, rq))) {
+ ret_r = ID2SYM(rb_intern("receive"));
+ goto success;
+ }
+
+ // check takers
+ if (do_yield && ractor_try_yield(ec, cr, ts, yield_value, move, false, false)) {
+ ret_v = Qnil;
+ ret_r = ID2SYM(rb_intern("yield"));
+ goto success;
+ }
+
+ // check take_basket
+ VM_ASSERT(basket_type_p(&s->take_basket, basket_type_reserved));
+ s->take_basket.type.e = basket_type_none;
+ // kick all take target ractors
+ st_foreach(s->take_ractors, ractor_selector_wait_i, (st_data_t)tb);
+
+ RACTOR_LOCK_SELF(cr);
+ {
+ retry_waiting:
+ while (1) {
+ if (!basket_none_p(tb)) {
+ RUBY_DEBUG_LOG("taken:%s from r:%u", basket_type_name(tb->type.e),
+ tb->sender ? rb_ractor_id(RACTOR_PTR(tb->sender)) : 0);
break;
- case ractor_select_action_receive:
- case ractor_select_action_yield:
+ }
+ if (do_receive && !ractor_queue_empty_p(cr, rq)) {
+ RUBY_DEBUG_LOG("can receive (%d)", rq->cnt);
+ break;
+ }
+ if (do_yield && ractor_check_take_basket(cr, ts)) {
+ RUBY_DEBUG_LOG("can yield");
break;
}
+
+ ractor_sleep_with_cleanup(ec, cr, wait_status, ractor_selector_wait_cleaup, tb);
}
- // check results
- enum ractor_wakeup_status wakeup_status = cr->sync.wait.wakeup_status;
- cr->sync.wait.wakeup_status = wakeup_none;
+ taken_basket = *tb;
- switch (wakeup_status) {
- case wakeup_none:
- // OK. something happens.
- // retry loop.
- break;
- case wakeup_by_retry:
- // Retry request.
- break;
- case wakeup_by_send:
- // OK.
- // retry loop and try_receive will succss.
- break;
- case wakeup_by_yield:
- // take was succeeded!
- // cr.wait.taken_basket contains passed block
- VM_ASSERT(cr->sync.wait.taken_basket.type != basket_type_none);
- *ret_r = cr->sync.wait.taken_basket.sender;
- VM_ASSERT(rb_ractor_p(*ret_r));
- ret = ractor_basket_accept(&cr->sync.wait.taken_basket);
- goto cleanup;
- case wakeup_by_take:
- *ret_r = ID2SYM(rb_intern("yield"));
- ret = Qnil;
- goto cleanup;
- case wakeup_by_close:
- // OK.
- // retry loop and will get CloseError.
- break;
- case wakeup_by_interrupt:
- ret = Qundef;
- interrupted = true;
- goto cleanup;
+ // ensure
+ // tb->type.e = basket_type_reserved # do it atomic in the following code
+ if (taken_basket.type.e == basket_type_yielding ||
+ RUBY_ATOMIC_CAS(tb->type.atomic, taken_basket.type.e, basket_type_reserved) != taken_basket.type.e) {
+
+ if (basket_type_p(tb, basket_type_yielding)) {
+ RACTOR_UNLOCK_SELF(cr);
+ {
+ rb_thread_sleep(0);
+ }
+ RACTOR_LOCK_SELF(cr);
+ }
+ goto retry_waiting;
}
}
+ RACTOR_UNLOCK_SELF(cr);
- cleanup:
- RUBY_DEBUG_LOG("cleanup actions (%s)", wait_status_str(wait_status));
-
- if (cr->sync.wait.yielded_basket.type != basket_type_none) {
- ractor_basket_clear(&cr->sync.wait.yielded_basket);
+ // check the taken result
+ switch (taken_basket.type.e) {
+ case basket_type_none:
+ VM_ASSERT(do_receive || do_yield);
+ goto retry;
+ case basket_type_yielding:
+ rb_bug("unreachable");
+ case basket_type_deleted: {
+ ractor_selector_remove(selv, taken_basket.sender);
+
+ rb_ractor_t *r = RACTOR_PTR(taken_basket.sender);
+ if (ractor_take_will_lock(r, &taken_basket)) {
+ RUBY_DEBUG_LOG("has_will");
+ }
+ else {
+ RUBY_DEBUG_LOG("no will");
+ // rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
+ // remove and retry wait
+ goto retry;
+ }
+ break;
+ }
+ case basket_type_will:
+ // no more messages
+ ractor_selector_remove(selv, taken_basket.sender);
+ break;
+ default:
+ break;
}
- VM_ASSERT(cr->sync.wait.status == wait_none);
- VM_ASSERT(cr->sync.wait.wakeup_status == wakeup_none);
- VM_ASSERT(cr->sync.wait.taken_basket.type == basket_type_none);
- VM_ASSERT(cr->sync.wait.yielded_basket.type == basket_type_none);
+ RUBY_DEBUG_LOG("taken_basket:%s", basket_type_name(taken_basket.type.e));
- if (interrupted) {
- rb_vm_check_ints_blocking(ec);
- interrupted = false;
- goto restart;
- }
+ ret_v = ractor_basket_accept(&taken_basket);
+ ret_r = taken_basket.sender;
+ success:
+ return rb_ary_new_from_args(2, ret_r, ret_v);
+}
- VM_ASSERT(ret != Qundef);
- return ret;
+static VALUE
+ractor_selector_wait(int argc, VALUE *argv, VALUE selector)
+{
+ VALUE options;
+ ID keywords[3];
+ VALUE values[3];
+
+ keywords[0] = rb_intern("receive");
+ keywords[1] = rb_intern("yield_value");
+ keywords[2] = rb_intern("move");
+
+ rb_scan_args(argc, argv, "0:", &options);
+ rb_get_kwargs(options, keywords, 0, numberof(values), values);
+ return ractor_selector__wait(selector,
+ values[0] == Qundef ? Qfalse : RTEST(values[0]),
+ values[1] != Qundef, values[1], values[2]);
}
static VALUE
-ractor_yield(rb_execution_context_t *ec, rb_ractor_t *r, VALUE obj, VALUE move)
+ractor_selector_new(int argc, VALUE *ractors, VALUE klass)
{
- VALUE ret_r;
- ractor_select(ec, NULL, 0, obj, RTEST(move) ? true : false, &ret_r);
- return Qnil;
+ VALUE selector = ractor_selector_create(klass);
+
+ for (int i=0; i<argc; i++) {
+ ractor_selector_add(selector, ractors[i]);
+ }
+
+ return selector;
}
static VALUE
-ractor_take(rb_execution_context_t *ec, rb_ractor_t *r)
+ractor_select_internal(rb_execution_context_t *ec, VALUE self, VALUE ractors, VALUE do_receive, VALUE do_yield, VALUE yield_value, VALUE move)
{
- VALUE ret_r;
- VALUE v = ractor_select(ec, &r->pub.self, 1, Qundef, false, &ret_r);
- return v;
+ VALUE selector = ractor_selector_new(RARRAY_LENINT(ractors), (VALUE *)RARRAY_CONST_PTR(ractors), rb_cRactorSelector);
+ VALUE result;
+ int state;
+
+ EC_PUSH_TAG(ec);
+ if ((state = EC_EXEC_TAG()) == TAG_NONE) {
+ result = ractor_selector__wait(selector, do_receive, do_yield, yield_value, move);
+ }
+ else {
+ // ensure
+ ractor_selector_clear(selector);
+
+ // jump
+ EC_JUMP_TAG(ec, state);
+ }
+ EC_POP_TAG();
+
+ RB_GC_GUARD(ractors);
+ return result;
}
+// Ractor#close_incoming
+
static VALUE
ractor_close_incoming(rb_execution_context_t *ec, rb_ractor_t *r)
{
@@ -1347,8 +1833,8 @@ ractor_close_incoming(rb_execution_context_t *ec, rb_ractor_t *r)
prev = Qfalse;
r->sync.incoming_port_closed = true;
if (ractor_wakeup(r, wait_receiving, wakeup_by_close)) {
- VM_ASSERT(r->sync.incoming_queue.cnt == 0);
- RUBY_DEBUG_LOG("cancel receiving", 0);
+ VM_ASSERT(ractor_queue_empty_p(r, &r->sync.recv_queue));
+ RUBY_DEBUG_LOG("cancel receiving");
}
}
else {
@@ -1359,6 +1845,8 @@ ractor_close_incoming(rb_execution_context_t *ec, rb_ractor_t *r)
return prev;
}
+// Ractor#close_outgoing
+
static VALUE
ractor_close_outgoing(rb_execution_context_t *ec, rb_ractor_t *r)
{
@@ -1366,27 +1854,50 @@ ractor_close_outgoing(rb_execution_context_t *ec, rb_ractor_t *r)
RACTOR_LOCK(r);
{
+ struct rb_ractor_queue *ts = &r->sync.takers_queue;
+ rb_ractor_t *tr;
+ struct rb_ractor_basket b;
+
if (!r->sync.outgoing_port_closed) {
prev = Qfalse;
r->sync.outgoing_port_closed = true;
}
else {
+ VM_ASSERT(ractor_queue_empty_p(r, ts));
prev = Qtrue;
}
// wakeup all taking ractors
- rb_ractor_t *taking_ractor;
- while ((taking_ractor = ractor_waiting_list_shift(r, &r->sync.taking_ractors)) != NULL) {
- RACTOR_LOCK(taking_ractor);
- ractor_wakeup(taking_ractor, wait_taking, wakeup_by_close);
- RACTOR_UNLOCK(taking_ractor);
+ while (ractor_queue_deq(r, ts, &b)) {
+ if (basket_type_p(&b, basket_type_take_basket)) {
+ tr = RACTOR_PTR(b.sender);
+ struct rb_ractor_basket *tb = b.p.take.basket;
+
+ if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_yielding) == basket_type_none) {
+ b.p.take.basket->sender = r->pub.self;
+ if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_yielding, basket_type_deleted) != basket_type_yielding) {
+ rb_bug("unreachable");
+ }
+ RUBY_DEBUG_LOG("set delete for r:%u", rb_ractor_id(RACTOR_PTR(b.sender)));
+ }
+
+ if (b.p.take.config) {
+ b.p.take.config->closed = true;
+ }
+
+ // TODO: deadlock-able?
+ RACTOR_LOCK(tr);
+ {
+ ractor_wakeup(tr, wait_taking, wakeup_by_close);
+ }
+ RACTOR_UNLOCK(tr);
+ }
}
// raising yielding Ractor
- if (!r->yield_atexit &&
- ractor_wakeup(r, wait_yielding, wakeup_by_close)) {
- RUBY_DEBUG_LOG("cancel yielding", 0);
- }
+ ractor_wakeup(r, wait_yielding, wakeup_by_close);
+
+ VM_ASSERT(ractor_queue_empty_p(r, ts));
}
RACTOR_UNLOCK(r);
return prev;
@@ -1410,7 +1921,7 @@ vm_insert_ractor0(rb_vm_t *vm, rb_ractor_t *r, bool single_ractor_mode)
RUBY_DEBUG_LOG("r:%u ractor.cnt:%u++", r->pub.id, vm->ractor.cnt);
VM_ASSERT(single_ractor_mode || RB_VM_LOCKED_P());
- list_add_tail(&vm->ractor.set, &r->vmlr_node);
+ ccan_list_add_tail(&vm->ractor.set, &r->vmlr_node);
vm->ractor.cnt++;
}
@@ -1418,24 +1929,17 @@ static void
cancel_single_ractor_mode(void)
{
// enable multi-ractor mode
- RUBY_DEBUG_LOG("enable multi-ractor mode", 0);
+ RUBY_DEBUG_LOG("enable multi-ractor mode");
VALUE was_disabled = rb_gc_enable();
rb_gc_start();
- rb_transient_heap_evacuate();
if (was_disabled) {
rb_gc_disable();
}
ruby_single_main_ractor = NULL;
-
- if (rb_warning_category_enabled_p(RB_WARN_CATEGORY_EXPERIMENTAL)) {
- rb_category_warn(RB_WARN_CATEGORY_EXPERIMENTAL,
- "Ractor is experimental, and the behavior may change in future versions of Ruby! "
- "Also there are many implementation issues.");
- }
}
static void
@@ -1479,7 +1983,7 @@ vm_remove_ractor(rb_vm_t *vm, rb_ractor_t *cr)
vm->ractor.cnt, vm->ractor.sync.terminate_waiting);
VM_ASSERT(vm->ractor.cnt > 0);
- list_del(&cr->vmlr_node);
+ ccan_list_del(&cr->vmlr_node);
if (vm->ractor.cnt <= 2 && vm->ractor.sync.terminate_waiting) {
rb_native_cond_signal(&vm->ractor.sync.terminate_cond);
@@ -1510,7 +2014,7 @@ rb_ractor_main_alloc(void)
{
rb_ractor_t *r = ruby_mimmalloc(sizeof(rb_ractor_t));
if (r == NULL) {
- fprintf(stderr, "[FATAL] failed to allocate memory for main ractor\n");
+ fprintf(stderr, "[FATAL] failed to allocate memory for main ractor\n");
exit(EXIT_FAILURE);
}
MEMZERO(r, rb_ractor_t, 1);
@@ -1541,12 +2045,12 @@ rb_ractor_atfork(rb_vm_t *vm, rb_thread_t *th)
}
#endif
-void rb_gvl_init(rb_global_vm_lock_t *gvl);
+void rb_thread_sched_init(struct rb_thread_sched *, bool atfork);
void
rb_ractor_living_threads_init(rb_ractor_t *r)
{
- list_head_init(&r->threads.set);
+ ccan_list_head_init(&r->threads.set);
r->threads.cnt = 0;
r->threads.blocking_cnt = 0;
}
@@ -1554,13 +2058,18 @@ rb_ractor_living_threads_init(rb_ractor_t *r)
static void
ractor_init(rb_ractor_t *r, VALUE name, VALUE loc)
{
- ractor_queue_setup(&r->sync.incoming_queue);
+ ractor_queue_setup(&r->sync.recv_queue);
+ ractor_queue_setup(&r->sync.takers_queue);
rb_native_mutex_initialize(&r->sync.lock);
+ rb_native_cond_initialize(&r->barrier_wait_cond);
+
+#ifdef RUBY_THREAD_WIN32_H
rb_native_cond_initialize(&r->sync.cond);
rb_native_cond_initialize(&r->barrier_wait_cond);
+#endif
// thread management
- rb_gvl_init(&r->threads.gvl);
+ rb_thread_sched_init(&r->threads.sched, false);
rb_ractor_living_threads_init(r);
// naming
@@ -1603,6 +2112,8 @@ ractor_create(rb_execution_context_t *ec, VALUE self, VALUE loc, VALUE name, VAL
r->verbose = cr->verbose;
r->debug = cr->debug;
+ rb_yjit_before_ractor_spawn();
+ rb_rjit_before_ractor_spawn();
rb_thread_create_ractor(r, args, block);
RB_GC_GUARD(rv);
@@ -1618,28 +2129,23 @@ ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool e
ASSERT_ractor_unlocking(cr);
- struct rb_ractor_basket basket;
- ractor_basket_setup(ec, &basket, v, Qfalse, exc, true, true /* this flag is ignored because move is Qfalse */);
+ struct rb_ractor_queue *ts = &cr->sync.takers_queue;
retry:
- if (ractor_try_yield(ec, cr, &basket)) {
+ if (ractor_try_yield(ec, cr, ts, v, Qfalse, exc, true)) {
// OK.
}
else {
bool retry = false;
RACTOR_LOCK(cr);
{
- if (cr->sync.taking_ractors.cnt == 0) {
- cr->sync.wait.yielded_basket = basket;
-
+ if (!ractor_check_take_basket(cr, ts)) {
VM_ASSERT(cr->sync.wait.status == wait_none);
- cr->sync.wait.status = wait_yielding;
- cr->sync.wait.wakeup_status = wakeup_none;
-
- VM_ASSERT(cr->yield_atexit == false);
- cr->yield_atexit = true;
+ RUBY_DEBUG_LOG("leave a will");
+ ractor_basket_fill_will(cr, &cr->sync.will_basket, v, exc);
}
else {
+ RUBY_DEBUG_LOG("rare timing!");
retry = true; // another ractor is waiting for the yield.
}
}
@@ -1650,6 +2156,20 @@ ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool e
}
void
+rb_ractor_atexit(rb_execution_context_t *ec, VALUE result)
+{
+ rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
+ ractor_yield_atexit(ec, cr, result, false);
+}
+
+void
+rb_ractor_atexit_exception(rb_execution_context_t *ec)
+{
+ rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
+ ractor_yield_atexit(ec, cr, ec->errinfo, true);
+}
+
+void
rb_ractor_teardown(rb_execution_context_t *ec)
{
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
@@ -1666,20 +2186,6 @@ rb_ractor_teardown(rb_execution_context_t *ec)
}
void
-rb_ractor_atexit(rb_execution_context_t *ec, VALUE result)
-{
- rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
- ractor_yield_atexit(ec, cr, result, false);
-}
-
-void
-rb_ractor_atexit_exception(rb_execution_context_t *ec)
-{
- rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
- ractor_yield_atexit(ec, cr, ec->errinfo, true);
-}
-
-void
rb_ractor_receive_parameters(rb_execution_context_t *ec, rb_ractor_t *r, int len, VALUE *ptr)
{
for (int i=0; i<len; i++) {
@@ -1696,7 +2202,7 @@ rb_ractor_send_parameters(rb_execution_context_t *ec, rb_ractor_t *r, VALUE args
}
}
-MJIT_FUNC_EXPORTED bool
+bool
rb_ractor_main_p_(void)
{
VM_ASSERT(rb_multi_ractor_p());
@@ -1712,47 +2218,30 @@ rb_obj_is_main_ractor(VALUE gv)
return r == GET_VM()->ractor.main_ractor;
}
-rb_global_vm_lock_t *
-rb_ractor_gvl(rb_ractor_t *r)
-{
- return &r->threads.gvl;
-}
-
int
rb_ractor_living_thread_num(const rb_ractor_t *r)
{
return r->threads.cnt;
}
+// only for current ractor
VALUE
-rb_ractor_thread_list(rb_ractor_t *r)
+rb_ractor_thread_list(void)
{
+ rb_ractor_t *r = GET_RACTOR();
rb_thread_t *th = 0;
- VALUE *ts;
- int ts_cnt;
+ VALUE ary = rb_ary_new();
- RACTOR_LOCK(r);
- {
- ts = ALLOCA_N(VALUE, r->threads.cnt);
- ts_cnt = 0;
-
- list_for_each(&r->threads.set, th, lt_node) {
- switch (th->status) {
- case THREAD_RUNNABLE:
- case THREAD_STOPPED:
- case THREAD_STOPPED_FOREVER:
- ts[ts_cnt++] = th->self;
- default:
- break;
- }
+ ccan_list_for_each(&r->threads.set, th, lt_node) {
+ switch (th->status) {
+ case THREAD_RUNNABLE:
+ case THREAD_STOPPED:
+ case THREAD_STOPPED_FOREVER:
+ rb_ary_push(ary, th->self);
+ default:
+ break;
}
}
- RACTOR_UNLOCK(r);
-
- VALUE ary = rb_ary_new();
- for (int i=0; i<ts_cnt; i++) {
- rb_ary_push(ary, ts[i]);
- }
return ary;
}
@@ -1765,7 +2254,7 @@ rb_ractor_living_threads_insert(rb_ractor_t *r, rb_thread_t *th)
RACTOR_LOCK(r);
{
RUBY_DEBUG_LOG("r(%d)->threads.cnt:%d++", r->pub.id, r->threads.cnt);
- list_add_tail(&r->threads.set, &th->lt_node);
+ ccan_list_add_tail(&r->threads.set, &th->lt_node);
r->threads.cnt++;
}
RACTOR_UNLOCK(r);
@@ -1835,6 +2324,8 @@ ractor_check_blocking(rb_ractor_t *cr, unsigned int remained_thread_cnt, const c
}
}
+void rb_threadptr_remove(rb_thread_t *th);
+
void
rb_ractor_living_threads_remove(rb_ractor_t *cr, rb_thread_t *th)
{
@@ -1842,13 +2333,15 @@ rb_ractor_living_threads_remove(rb_ractor_t *cr, rb_thread_t *th)
RUBY_DEBUG_LOG("r->threads.cnt:%d--", cr->threads.cnt);
ractor_check_blocking(cr, cr->threads.cnt - 1, __FILE__, __LINE__);
+ rb_threadptr_remove(th);
+
if (cr->threads.cnt == 1) {
vm_remove_ractor(th->vm, cr);
}
else {
RACTOR_LOCK(cr);
{
- list_del(&th->lt_node);
+ ccan_list_del(&th->lt_node);
cr->threads.cnt--;
}
RACTOR_UNLOCK(cr);
@@ -1922,7 +2415,7 @@ rb_ractor_terminate_interrupt_main_thread(rb_ractor_t *r)
rb_threadptr_interrupt(main_th);
}
else {
- RUBY_DEBUG_LOG("killed (%p)", main_th);
+ RUBY_DEBUG_LOG("killed (%p)", (void *)main_th);
}
}
}
@@ -1935,25 +2428,33 @@ ractor_terminal_interrupt_all(rb_vm_t *vm)
if (vm->ractor.cnt > 1) {
// send terminate notification to all ractors
rb_ractor_t *r = 0;
- list_for_each(&vm->ractor.set, r, vmlr_node) {
+ ccan_list_for_each(&vm->ractor.set, r, vmlr_node) {
if (r != vm->ractor.main_ractor) {
+ RUBY_DEBUG_LOG("r:%d", rb_ractor_id(r));
rb_ractor_terminate_interrupt_main_thread(r);
}
}
}
}
+void rb_add_running_thread(rb_thread_t *th);
+void rb_del_running_thread(rb_thread_t *th);
+
void
rb_ractor_terminate_all(void)
{
rb_vm_t *vm = GET_VM();
rb_ractor_t *cr = vm->ractor.main_ractor;
+ RUBY_DEBUG_LOG("ractor.cnt:%d", (int)vm->ractor.cnt);
+
VM_ASSERT(cr == GET_RACTOR()); // only main-ractor's main-thread should kick it.
if (vm->ractor.cnt > 1) {
RB_VM_LOCK();
- ractor_terminal_interrupt_all(vm); // kill all ractors
+ {
+ ractor_terminal_interrupt_all(vm); // kill all ractors
+ }
RB_VM_UNLOCK();
}
rb_thread_terminate_all(GET_THREAD()); // kill other threads in main-ractor and wait
@@ -1966,7 +2467,9 @@ rb_ractor_terminate_all(void)
// wait for 1sec
rb_vm_ractor_blocking_cnt_inc(vm, cr, __FILE__, __LINE__);
+ rb_del_running_thread(rb_ec_thread_ptr(cr->threads.running_ec));
rb_vm_cond_timedwait(vm, &vm->ractor.sync.terminate_cond, 1000 /* ms */);
+ rb_add_running_thread(rb_ec_thread_ptr(cr->threads.running_ec));
rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__);
ractor_terminal_interrupt_all(vm);
@@ -1978,7 +2481,23 @@ rb_ractor_terminate_all(void)
rb_execution_context_t *
rb_vm_main_ractor_ec(rb_vm_t *vm)
{
- return vm->ractor.main_ractor->threads.running_ec;
+ /* This code needs to carefully work around two bugs:
+ * - Bug #20016: When M:N threading is enabled, running_ec is NULL if no thread is
+ * actually currently running (as opposed to without M:N threading, when
+ * running_ec will still point to the _last_ thread which ran)
+ * - Bug #20197: If the main thread is sleeping, setting its postponed job
+ * interrupt flag is pointless; it won't look at the flag until it stops sleeping
+ * for some reason. It would be better to set the flag on the running ec, which
+ * will presumably look at it soon.
+ *
+ * Solution: use running_ec if it's set, otherwise fall back to the main thread ec.
+ * This is still susceptible to some rare race conditions (what if the last thread
+ * to run just entered a long-running sleep?), but seems like the best balance of
+ * robustness and complexity.
+ */
+ rb_execution_context_t *running_ec = vm->ractor.main_ractor->threads.running_ec;
+ if (running_ec) { return running_ec; }
+ return vm->ractor.main_thread->ec;
}
static VALUE
@@ -1987,6 +2506,29 @@ ractor_moved_missing(int argc, VALUE *argv, VALUE self)
rb_raise(rb_eRactorMovedError, "can not send any methods to a moved object");
}
+#ifndef USE_RACTOR_SELECTOR
+#define USE_RACTOR_SELECTOR 0
+#endif
+
+RUBY_SYMBOL_EXPORT_BEGIN
+void rb_init_ractor_selector(void);
+RUBY_SYMBOL_EXPORT_END
+
+void
+rb_init_ractor_selector(void)
+{
+ rb_cRactorSelector = rb_define_class_under(rb_cRactor, "Selector", rb_cObject);
+ rb_undef_alloc_func(rb_cRactorSelector);
+
+ rb_define_singleton_method(rb_cRactorSelector, "new", ractor_selector_new , -1);
+ rb_define_method(rb_cRactorSelector, "add", ractor_selector_add, 1);
+ rb_define_method(rb_cRactorSelector, "remove", ractor_selector_remove, 1);
+ rb_define_method(rb_cRactorSelector, "clear", ractor_selector_clear, 0);
+ rb_define_method(rb_cRactorSelector, "empty?", ractor_selector_empty_p, 0);
+ rb_define_method(rb_cRactorSelector, "wait", ractor_selector_wait, -1);
+ rb_define_method(rb_cRactorSelector, "_wait", ractor_selector__wait, 4);
+}
+
/*
* Document-class: Ractor::ClosedError
*
@@ -2106,6 +2648,10 @@ Init_Ractor(void)
rb_define_method(rb_cRactorMovedObject, "equal?", ractor_moved_missing, -1);
rb_define_method(rb_cRactorMovedObject, "instance_eval", ractor_moved_missing, -1);
rb_define_method(rb_cRactorMovedObject, "instance_exec", ractor_moved_missing, -1);
+
+#if USE_RACTOR_SELECTOR
+ rb_init_ractor_selector();
+#endif
}
void
@@ -2114,7 +2660,7 @@ rb_ractor_dump(void)
rb_vm_t *vm = GET_VM();
rb_ractor_t *r = 0;
- list_for_each(&vm->ractor.set, r, vmlr_node) {
+ ccan_list_for_each(&vm->ractor.set, r, vmlr_node) {
if (r != vm->ractor.main_ractor) {
fprintf(stderr, "r:%u (%s)\n", r->pub.id, ractor_status_str(r->status_));
}
@@ -2266,12 +2812,25 @@ obj_traverse_rec(struct obj_traverse_data *data)
{
if (UNLIKELY(!data->rec)) {
data->rec_hash = rb_ident_hash_new();
- data->rec = rb_hash_st_table(data->rec_hash);
+ data->rec = RHASH_ST_TABLE(data->rec_hash);
}
return data->rec;
}
static int
+obj_traverse_ivar_foreach_i(ID key, VALUE val, st_data_t ptr)
+{
+ struct obj_traverse_callback_data *d = (struct obj_traverse_callback_data *)ptr;
+
+ if (obj_traverse_i(val, d->data)) {
+ d->stop = true;
+ return ST_STOP;
+ }
+
+ return ST_CONTINUE;
+}
+
+static int
obj_traverse_i(VALUE obj, struct obj_traverse_data *data)
{
if (RB_SPECIAL_CONST_P(obj)) return 0;
@@ -2287,14 +2846,12 @@ obj_traverse_i(VALUE obj, struct obj_traverse_data *data)
return 0;
}
- if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) {
- struct gen_ivtbl *ivtbl;
- rb_ivar_generic_ivtbl_lookup(obj, &ivtbl);
- for (uint32_t i = 0; i < ivtbl->numiv; i++) {
- VALUE val = ivtbl->ivptr[i];
- if (val != Qundef && obj_traverse_i(val, data)) return 1;
- }
- }
+ struct obj_traverse_callback_data d = {
+ .stop = false,
+ .data = data,
+ };
+ rb_ivar_foreach(obj, obj_traverse_ivar_foreach_i, (st_data_t)&d);
+ if (d.stop) return 1;
switch (BUILTIN_TYPE(obj)) {
// no child node
@@ -2308,15 +2865,7 @@ obj_traverse_i(VALUE obj, struct obj_traverse_data *data)
break;
case T_OBJECT:
- {
- uint32_t len = ROBJECT_NUMIV(obj);
- VALUE *ptr = ROBJECT_IVPTR(obj);
-
- for (uint32_t i=0; i<len; i++) {
- VALUE val = ptr[i];
- if (val != Qundef && obj_traverse_i(val, data)) return 1;
- }
- }
+ /* Instance variables already traversed. */
break;
case T_ARRAY:
@@ -2436,7 +2985,10 @@ rb_obj_traverse(VALUE obj,
static int
frozen_shareable_p(VALUE obj, bool *made_shareable)
{
- if (!RB_TYPE_P(obj, T_DATA)) {
+ if (CHILLED_STRING_P(obj)) {
+ return false;
+ }
+ else if (!RB_TYPE_P(obj, T_DATA)) {
return true;
}
else if (RTYPEDDATA_P(obj)) {
@@ -2465,6 +3017,17 @@ make_shareable_check_shareable(VALUE obj)
if (rb_ractor_shareable_p(obj)) {
return traverse_skip;
}
+ else if (CHILLED_STRING_P(obj)) {
+ rb_funcall(obj, idFreeze, 0);
+
+ if (UNLIKELY(!RB_OBJ_FROZEN_RAW(obj))) {
+ rb_raise(rb_eRactorError, "#freeze does not freeze object correctly");
+ }
+
+ if (RB_OBJ_SHAREABLE_P(obj)) {
+ return traverse_skip;
+ }
+ }
else if (!frozen_shareable_p(obj, &made_shareable)) {
if (made_shareable) {
return traverse_skip;
@@ -2509,10 +3072,7 @@ VALUE
rb_ractor_make_shareable_copy(VALUE obj)
{
VALUE copy = ractor_copy(obj);
- rb_obj_traverse(copy,
- make_shareable_check_shareable,
- null_leave, mark_shareable);
- return copy;
+ return rb_ractor_make_shareable(copy);
}
VALUE
@@ -2526,6 +3086,14 @@ rb_ractor_ensure_shareable(VALUE obj, VALUE name)
return obj;
}
+void
+rb_ractor_ensure_main_ractor(const char *msg)
+{
+ if (!rb_ractor_main_p()) {
+ rb_raise(rb_eRactorIsolationError, "%s", msg);
+ }
+}
+
static enum obj_traverse_iterator_result
shareable_p_enter(VALUE obj)
{
@@ -2547,7 +3115,7 @@ shareable_p_enter(VALUE obj)
return traverse_stop; // fail
}
-MJIT_FUNC_EXPORTED bool
+bool
rb_ractor_shareable_p_continue(VALUE obj)
{
if (rb_obj_traverse(obj,
@@ -2651,30 +3219,47 @@ obj_hash_traverse_replace_i(st_data_t *key, st_data_t *val, st_data_t ptr, int e
return ST_CONTINUE;
}
+static int
+obj_iv_hash_traverse_replace_foreach_i(st_data_t _key, st_data_t _val, st_data_t _data, int _x)
+{
+ return ST_REPLACE;
+}
+
+static int
+obj_iv_hash_traverse_replace_i(st_data_t * _key, st_data_t * val, st_data_t ptr, int exists)
+{
+ struct obj_traverse_replace_callback_data *d = (struct obj_traverse_replace_callback_data *)ptr;
+ struct obj_traverse_replace_data *data = d->data;
+
+ if (obj_traverse_replace_i(*(VALUE *)val, data)) {
+ d->stop = true;
+ return ST_STOP;
+ }
+ else if (*(VALUE *)val != data->replacement) {
+ VALUE v = *(VALUE *)val = data->replacement;
+ RB_OBJ_WRITTEN(d->src, Qundef, v);
+ }
+
+ return ST_CONTINUE;
+}
+
static struct st_table *
obj_traverse_replace_rec(struct obj_traverse_replace_data *data)
{
if (UNLIKELY(!data->rec)) {
data->rec_hash = rb_ident_hash_new();
- data->rec = rb_hash_st_table(data->rec_hash);
+ data->rec = RHASH_ST_TABLE(data->rec_hash);
}
return data->rec;
}
-#if USE_TRANSIENT_HEAP
-void rb_ary_transient_heap_evacuate(VALUE ary, int promote);
-void rb_obj_transient_heap_evacuate(VALUE obj, int promote);
-void rb_hash_transient_heap_evacuate(VALUE hash, int promote);
-void rb_struct_transient_heap_evacuate(VALUE st, int promote);
-#endif
-
static void
obj_refer_only_shareables_p_i(VALUE obj, void *ptr)
{
int *pcnt = (int *)ptr;
if (!rb_ractor_shareable_p(obj)) {
- pcnt++;
+ ++*pcnt;
}
}
@@ -2693,7 +3278,7 @@ obj_refer_only_shareables_p(VALUE obj)
static int
obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data)
{
- VALUE replacement;
+ st_data_t replacement;
if (RB_SPECIAL_CONST_P(obj)) {
data->replacement = obj;
@@ -2706,14 +3291,14 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data)
case traverse_stop: return 1; // stop search
}
- replacement = data->replacement;
+ replacement = (st_data_t)data->replacement;
- if (UNLIKELY(st_lookup(obj_traverse_replace_rec(data), (st_data_t)obj, (st_data_t *)&replacement))) {
- data->replacement = replacement;
+ if (UNLIKELY(st_lookup(obj_traverse_replace_rec(data), (st_data_t)obj, &replacement))) {
+ data->replacement = (VALUE)replacement;
return 0;
}
else {
- st_insert(obj_traverse_replace_rec(data), (st_data_t)obj, (st_data_t)replacement);
+ st_insert(obj_traverse_replace_rec(data), (st_data_t)obj, replacement);
}
if (!data->move) {
@@ -2729,9 +3314,26 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data)
if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) {
struct gen_ivtbl *ivtbl;
rb_ivar_generic_ivtbl_lookup(obj, &ivtbl);
- for (uint32_t i = 0; i < ivtbl->numiv; i++) {
- if (ivtbl->ivptr[i] != Qundef) {
- CHECK_AND_REPLACE(ivtbl->ivptr[i]);
+
+ if (UNLIKELY(rb_shape_obj_too_complex(obj))) {
+ struct obj_traverse_replace_callback_data d = {
+ .stop = false,
+ .data = data,
+ .src = obj,
+ };
+ rb_st_foreach_with_replace(
+ ivtbl->as.complex.table,
+ obj_iv_hash_traverse_replace_foreach_i,
+ obj_iv_hash_traverse_replace_i,
+ (st_data_t)&d
+ );
+ if (d.stop) return 1;
+ }
+ else {
+ for (uint32_t i = 0; i < ivtbl->as.shape.numiv; i++) {
+ if (!UNDEF_P(ivtbl->as.shape.ivptr[i])) {
+ CHECK_AND_REPLACE(ivtbl->as.shape.ivptr[i]);
+ }
}
}
}
@@ -2751,15 +3353,25 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data)
case T_OBJECT:
{
-#if USE_TRANSIENT_HEAP
- if (data->move) rb_obj_transient_heap_evacuate(obj, TRUE);
-#endif
-
- uint32_t len = ROBJECT_NUMIV(obj);
- VALUE *ptr = ROBJECT_IVPTR(obj);
+ if (rb_shape_obj_too_complex(obj)) {
+ struct obj_traverse_replace_callback_data d = {
+ .stop = false,
+ .data = data,
+ .src = obj,
+ };
+ rb_st_foreach_with_replace(
+ ROBJECT_IV_HASH(obj),
+ obj_iv_hash_traverse_replace_foreach_i,
+ obj_iv_hash_traverse_replace_i,
+ (st_data_t)&d
+ );
+ if (d.stop) return 1;
+ }
+ else {
+ uint32_t len = ROBJECT_IV_COUNT(obj);
+ VALUE *ptr = ROBJECT_IVPTR(obj);
- for (uint32_t i=0; i<len; i++) {
- if (ptr[i] != Qundef) {
+ for (uint32_t i = 0; i < len; i++) {
CHECK_AND_REPLACE(ptr[i]);
}
}
@@ -2769,9 +3381,6 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data)
case T_ARRAY:
{
rb_ary_cancel_sharing(obj);
-#if USE_TRANSIENT_HEAP
- if (data->move) rb_ary_transient_heap_evacuate(obj, TRUE);
-#endif
for (int i = 0; i < RARRAY_LENINT(obj); i++) {
VALUE e = rb_ary_entry(obj, i);
@@ -2786,12 +3395,8 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data)
RB_GC_GUARD(obj);
}
break;
-
case T_HASH:
{
-#if USE_TRANSIENT_HEAP
- if (data->move) rb_hash_transient_heap_evacuate(obj, TRUE);
-#endif
struct obj_traverse_replace_callback_data d = {
.stop = false,
.data = data,
@@ -2816,9 +3421,6 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data)
case T_STRUCT:
{
-#if USE_TRANSIENT_HEAP
- if (data->move) rb_struct_transient_heap_evacuate(obj, TRUE);
-#endif
long len = RSTRUCT_LEN(obj);
const VALUE *ptr = RSTRUCT_CONST_PTR(obj);
@@ -2859,7 +3461,7 @@ obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data)
rb_bug("unreachable");
}
- data->replacement = replacement;
+ data->replacement = (VALUE)replacement;
if (data->leave_func(obj, data) == traverse_stop) {
return 1;
@@ -2919,6 +3521,8 @@ ractor_moved_bang(VALUE obj)
rv->v3 = 0;
rv->flags = rv->flags & ~fl_users;
+ if (BUILTIN_TYPE(obj) == T_OBJECT) ROBJECT_SET_SHAPE_ID(obj, ROOT_SHAPE_ID);
+
// TODO: record moved location
}
@@ -2930,7 +3534,9 @@ move_enter(VALUE obj, struct obj_traverse_replace_data *data)
return traverse_skip;
}
else {
- data->replacement = rb_obj_alloc(RBASIC_CLASS(obj));
+ VALUE moved = rb_obj_alloc(RBASIC_CLASS(obj));
+ rb_shape_set_shape(moved, rb_shape_get_shape(obj));
+ data->replacement = moved;
return traverse_cont;
}
}
@@ -2964,7 +3570,7 @@ static VALUE
ractor_move(VALUE obj)
{
VALUE val = rb_obj_traverse_replace(obj, move_enter, move_leave, true);
- if (val != Qundef) {
+ if (!UNDEF_P(val)) {
return val;
}
else {
@@ -2995,7 +3601,7 @@ static VALUE
ractor_copy(VALUE obj)
{
VALUE val = rb_obj_traverse_replace(obj, copy_enter, copy_leave, false);
- if (val != Qundef) {
+ if (!UNDEF_P(val)) {
return val;
}
else {
@@ -3039,9 +3645,9 @@ ractor_local_storage_mark(rb_ractor_t *r)
for (int i=0; i<freed_ractor_local_keys.cnt; i++) {
rb_ractor_local_key_t key = freed_ractor_local_keys.keys[i];
- st_data_t val;
- if (st_delete(r->local_storage, (st_data_t *)&key, &val) &&
- key->type->free) {
+ st_data_t val, k = (st_data_t)key;
+ if (st_delete(r->local_storage, &k, &val) &&
+ (key = (rb_ractor_local_key_t)k)->type->free) {
(*key->type->free)((void *)val);
}
}
@@ -3127,7 +3733,7 @@ static bool
ractor_local_ref(rb_ractor_local_key_t key, void **pret)
{
if (rb_ractor_main_p()) {
- if ((VALUE)key->main_cache != Qundef) {
+ if (!UNDEF_P((VALUE)key->main_cache)) {
*pret = key->main_cache;
return true;
}
@@ -3166,9 +3772,9 @@ ractor_local_set(rb_ractor_local_key_t key, void *ptr)
VALUE
rb_ractor_local_storage_value(rb_ractor_local_key_t key)
{
- VALUE val;
- if (ractor_local_ref(key, (void **)&val)) {
- return val;
+ void *val;
+ if (ractor_local_ref(key, &val)) {
+ return (VALUE)val;
}
else {
return Qnil;