summaryrefslogtreecommitdiff
path: root/vm_trace.c
diff options
context:
space:
mode:
authorKJ Tsanaktsidis <kj@kjtsanaktsidis.id.au>2023-11-19 22:54:57 +1100
committerKoichi Sasada <ko1@atdot.net>2023-12-10 15:00:37 +0900
commitf8effa209adb3ce050c100ffaffe6f3cc1508185 (patch)
treec0a672d5917a9917910679504d0f8b2d450088f7 /vm_trace.c
parentaecbd66742f43ccfcac04ca4143fcc68ad834320 (diff)
Change the semantics of rb_postponed_job_register
Our current implementation of rb_postponed_job_register suffers from some safety issues that can lead to interpreter crashes (see bug #1991). Essentially, the issue is that jobs can be called with the wrong arguments. We made two attempts to fix this whilst keeping the promised semantics, but: * The first one involved masking/unmasking when flushing jobs, which was believed to be too expensive * The second one involved a lock-free, multi-producer, single-consumer ringbuffer, which was too complex The critical insight behind this third solution is that essentially the only user of these APIs are a) internal, or b) profiling gems. For a), none of the usages actually require variable data; they will work just fine with the preregistration interface. For b), generally profiling gems only call a single callback with a single piece of data (which is actually usually just zero) for the life of the program. The ringbuffer is complex because it needs to support multi-word inserts of job & data (which can't be atomic); but nobody actually even needs that functionality, really. So, this comit: * Introduces a pre-registration API for jobs, with a GVL-requiring rb_postponed_job_prereigster, which returns a handle which can be used with an async-signal-safe rb_postponed_job_trigger. * Deprecates rb_postponed_job_register (and re-implements it on top of the preregister function for compatability) * Moves all the internal usages of postponed job register pre-registration
Diffstat (limited to 'vm_trace.c')
-rw-r--r--vm_trace.c272
1 files changed, 161 insertions, 111 deletions
diff --git a/vm_trace.c b/vm_trace.c
index 6114ef9d4e..dc4c4738b8 100644
--- a/vm_trace.c
+++ b/vm_trace.c
@@ -23,11 +23,15 @@
#include "eval_intern.h"
#include "internal.h"
+#include "internal/bits.h"
#include "internal/class.h"
+#include "internal/gc.h"
#include "internal/hash.h"
#include "internal/symbol.h"
+#include "internal/thread.h"
#include "iseq.h"
#include "rjit.h"
+#include "ruby/atomic.h"
#include "ruby/debug.h"
#include "vm_core.h"
#include "ruby/ractor.h"
@@ -1617,17 +1621,22 @@ Init_vm_trace(void)
rb_undef_alloc_func(rb_cTracePoint);
}
-typedef struct rb_postponed_job_struct {
- rb_postponed_job_func_t func;
- void *data;
-} rb_postponed_job_t;
-
-#define MAX_POSTPONED_JOB 1000
-#define MAX_POSTPONED_JOB_SPECIAL_ADDITION 24
+/*
+ * Ruby actually has two separate mechanisms for enqueueing work from contexts
+ * where it is not safe to run Ruby code, to run later on when it is safe. One
+ * is async-signal-safe but more limited, and accessed through the
+ * `rb_postponed_job_preregister` and `rb_postponed_job_trigger` functions. The
+ * other is more flexible but cannot be used in signal handlers, and is accessed
+ * through the `rb_workqueue_register` function.
+ *
+ * The postponed job functions form part of Ruby's extension API, but the
+ * workqueue functions are for internal use only.
+ */
struct rb_workqueue_job {
struct ccan_list_node jnode; /* <=> vm->workqueue */
- rb_postponed_job_t job;
+ rb_postponed_job_func_t func;
+ void *data;
};
// Used for VM memsize reporting. Returns the size of a list of rb_workqueue_job
@@ -1645,52 +1654,51 @@ rb_vm_memsize_workqueue(struct ccan_list_head *workqueue)
return size;
}
-// Used for VM memsize reporting. Returns the total size of the postponed job
-// buffer that was allocated at initialization.
-size_t
-rb_vm_memsize_postponed_job_buffer(void)
-{
- return sizeof(rb_postponed_job_t) * MAX_POSTPONED_JOB;
-}
-
-void
-Init_vm_postponed_job(void)
+/*
+ * thread-safe and called from non-Ruby thread
+ * returns FALSE on failure (ENOMEM), TRUE otherwise
+ */
+int
+rb_workqueue_register(unsigned flags, rb_postponed_job_func_t func, void *data)
{
+ struct rb_workqueue_job *wq_job = malloc(sizeof(*wq_job));
rb_vm_t *vm = GET_VM();
- vm->postponed_job_buffer = ALLOC_N(rb_postponed_job_t, MAX_POSTPONED_JOB);
- vm->postponed_job_index = 0;
- /* workqueue is initialized when VM locks are initialized */
-}
-
-enum postponed_job_register_result {
- PJRR_SUCCESS = 0,
- PJRR_FULL = 1,
- PJRR_INTERRUPTED = 2
-};
-/* Async-signal-safe */
-static enum postponed_job_register_result
-postponed_job_register(rb_execution_context_t *ec, rb_vm_t *vm,
- unsigned int flags, rb_postponed_job_func_t func, void *data, rb_atomic_t max, rb_atomic_t expected_index)
-{
- rb_postponed_job_t *pjob;
+ if (!wq_job) return FALSE;
+ wq_job->func = func;
+ wq_job->data = data;
- if (expected_index >= max) return PJRR_FULL; /* failed */
+ rb_nativethread_lock_lock(&vm->workqueue_lock);
+ ccan_list_add_tail(&vm->workqueue, &wq_job->jnode);
+ rb_nativethread_lock_unlock(&vm->workqueue_lock);
- if (ATOMIC_CAS(vm->postponed_job_index, expected_index, expected_index+1) == expected_index) {
- pjob = &vm->postponed_job_buffer[expected_index];
- }
- else {
- return PJRR_INTERRUPTED;
- }
+ // TODO: current implementation affects only main ractor
+ RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(rb_vm_main_ractor_ec(vm));
- /* unused: pjob->flags = flags; */
- pjob->func = func;
- pjob->data = data;
+ return TRUE;
+}
- RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(ec);
+#define PJOB_PREREG_TABLE_SIZE (sizeof(rb_atomic_t) * CHAR_BIT)
+/* pre-registered jobs table, for async-safe jobs */
+typedef struct rb_postponed_job_queue {
+ struct {
+ rb_postponed_job_func_t func;
+ void *data;
+ } prereg_table[PJOB_PREREG_TABLE_SIZE];
+ /* Bits in this are set when the corresponding entry in prereg_table has non-zero
+ * triggered_count; i.e. somebody called rb_postponed_job_trigger */
+ rb_atomic_t prereg_triggered_bitset;
+} rb_postponed_job_queues_t;
- return PJRR_SUCCESS;
+void
+rb_vm_postponed_job_queue_init(rb_vm_t *vm)
+{
+ /* use mimmalloc; postponed job registration is a dependency of objspace, so this gets
+ * called _VERY_ early inside Init_BareVM */
+ rb_postponed_job_queues_t *pjq = ruby_mimmalloc(sizeof(rb_postponed_job_queues_t));
+ pjq->prereg_triggered_bitset = 0;
+ memset(pjq->prereg_table, 0, sizeof(pjq->prereg_table));
+ vm->postponed_job_queue = pjq;
}
static rb_execution_context_t *
@@ -1701,83 +1709,113 @@ get_valid_ec(rb_vm_t *vm)
return ec;
}
-/*
- * return 0 if job buffer is full
- * Async-signal-safe
- */
-int
-rb_postponed_job_register(unsigned int flags, rb_postponed_job_func_t func, void *data)
+void
+rb_vm_postponed_job_atfork(void)
{
rb_vm_t *vm = GET_VM();
- rb_execution_context_t *ec = get_valid_ec(vm);
-
- begin:
- switch (postponed_job_register(ec, vm, flags, func, data, MAX_POSTPONED_JOB, vm->postponed_job_index)) {
- case PJRR_SUCCESS : return 1;
- case PJRR_FULL : return 0;
- case PJRR_INTERRUPTED: goto begin;
- default: rb_bug("unreachable");
+ rb_postponed_job_queues_t *pjq = vm->postponed_job_queue;
+ /* make sure we set the interrupt flag on _this_ thread if we carried any pjobs over
+ * from the other side of the fork */
+ if (pjq->prereg_triggered_bitset) {
+ RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(get_valid_ec(vm));
}
+
}
-/*
- * return 0 if job buffer is full
- * Async-signal-safe
- */
-int
-rb_postponed_job_register_one(unsigned int flags, rb_postponed_job_func_t func, void *data)
+/* Frees the memory managed by the postponed job infrastructure at shutdown */
+void
+rb_vm_postponed_job_free(void)
{
rb_vm_t *vm = GET_VM();
- rb_execution_context_t *ec = get_valid_ec(vm);
- rb_postponed_job_t *pjob;
- rb_atomic_t i, index;
-
- begin:
- index = vm->postponed_job_index;
- for (i=0; i<index; i++) {
- pjob = &vm->postponed_job_buffer[i];
- if (pjob->func == func) {
- RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(ec);
- return 2;
+ ruby_xfree(vm->postponed_job_queue);
+ vm->postponed_job_queue = NULL;
+}
+
+// Used for VM memsize reporting. Returns the total size of the postponed job
+// queue infrastructure.
+size_t
+rb_vm_memsize_postponed_job_queue(void)
+{
+ return sizeof(rb_postponed_job_queues_t);
+}
+
+
+rb_postponed_job_handle_t
+rb_postponed_job_preregister(rb_postponed_job_func_t func, void *data)
+{
+ /* The doc comments say that this function should be called under the GVL, because
+ * that is actually required to get the guarantee that "if a given (func, data) pair
+ * was already pre-registered, this method will return the same handle instance".
+ *
+ * However, the actual implementation here is called without the GVL, from inside
+ * rb_postponed_job_register, to support that legacy interface. In the presence
+ * of concurrent calls to both _preregister and _register functions on the same
+ * func, however, the data may get mixed up between them. */
+
+ rb_postponed_job_queues_t *pjq = GET_VM()->postponed_job_queue;
+ for (unsigned int i = 0; i < PJOB_PREREG_TABLE_SIZE; i++) {
+ /* Try and set this slot to equal `func` */
+ rb_postponed_job_func_t existing_func = (rb_postponed_job_func_t)RUBY_ATOMIC_PTR_CAS(pjq->prereg_table[i], NULL, (void *)func);
+ if (existing_func == NULL || existing_func == func) {
+ /* Either this slot was NULL, and we set it to func, or, this slot was already equal to func.
+ * In either case, clobber the data with our data. Note that concurrent calls to
+ * rb_postponed_job_register with the same func & different data will result in either of the
+ * datas being written */
+ RUBY_ATOMIC_PTR_EXCHANGE(pjq->prereg_table[i].data, data);
+ return (rb_postponed_job_handle_t)i;
+ } else {
+ /* Try the next slot if this one already has a func in it */
+ continue;
}
}
- switch (postponed_job_register(ec, vm, flags, func, data, MAX_POSTPONED_JOB + MAX_POSTPONED_JOB_SPECIAL_ADDITION, index)) {
- case PJRR_SUCCESS : return 1;
- case PJRR_FULL : return 0;
- case PJRR_INTERRUPTED: goto begin;
- default: rb_bug("unreachable");
- }
+
+ /* full */
+ return POSTPONED_JOB_HANDLE_INVALID;
}
-/*
- * thread-safe and called from non-Ruby thread
- * returns FALSE on failure (ENOMEM), TRUE otherwise
- */
-int
-rb_workqueue_register(unsigned flags, rb_postponed_job_func_t func, void *data)
+void
+rb_postponed_job_trigger(rb_postponed_job_handle_t h)
{
- struct rb_workqueue_job *wq_job = malloc(sizeof(*wq_job));
rb_vm_t *vm = GET_VM();
+ rb_postponed_job_queues_t *pjq = vm->postponed_job_queue;
- if (!wq_job) return FALSE;
- wq_job->job.func = func;
- wq_job->job.data = data;
+ RUBY_ATOMIC_OR(pjq->prereg_triggered_bitset, (((rb_atomic_t)1UL) << h));
+ RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(get_valid_ec(vm));
+}
- rb_nativethread_lock_lock(&vm->workqueue_lock);
- ccan_list_add_tail(&vm->workqueue, &wq_job->jnode);
- rb_nativethread_lock_unlock(&vm->workqueue_lock);
- // TODO: current implementation affects only main ractor
- RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(rb_vm_main_ractor_ec(vm));
+static int
+pjob_register_legacy_impl(unsigned int flags, rb_postponed_job_func_t func, void *data)
+{
+ /* We _know_ calling preregister from a signal handler like this is racy; what is
+ * and is not promised is very exhaustively documented in debug.h */
+ rb_postponed_job_handle_t h = rb_postponed_job_preregister(func, data);
+ if (h == POSTPONED_JOB_HANDLE_INVALID) {
+ return 0;
+ }
+ rb_postponed_job_trigger(h);
+ return 1;
+}
- return TRUE;
+int
+rb_postponed_job_register(unsigned int flags, rb_postponed_job_func_t func, void *data)
+{
+ return pjob_register_legacy_impl(flags, func, data);
}
+int
+rb_postponed_job_register_one(unsigned int flags, rb_postponed_job_func_t func, void *data)
+{
+ return pjob_register_legacy_impl(flags, func, data);
+}
+
+
void
rb_postponed_job_flush(rb_vm_t *vm)
{
+ rb_postponed_job_queues_t *pjq = GET_VM()->postponed_job_queue;
rb_execution_context_t *ec = GET_EC();
- const rb_atomic_t block_mask = POSTPONED_JOB_INTERRUPT_MASK|TRAP_INTERRUPT_MASK;
+ const rb_atomic_t block_mask = POSTPONED_JOB_INTERRUPT_MASK | TRAP_INTERRUPT_MASK;
volatile rb_atomic_t saved_mask = ec->interrupt_mask & block_mask;
VALUE volatile saved_errno = ec->errinfo;
struct ccan_list_head tmp;
@@ -1788,26 +1826,31 @@ rb_postponed_job_flush(rb_vm_t *vm)
ccan_list_append_list(&tmp, &vm->workqueue);
rb_nativethread_lock_unlock(&vm->workqueue_lock);
+ rb_atomic_t prereg_triggered_bits = RUBY_ATOMIC_EXCHANGE(pjq->prereg_triggered_bitset, 0);
+
ec->errinfo = Qnil;
/* mask POSTPONED_JOB dispatch */
ec->interrupt_mask |= block_mask;
{
EC_PUSH_TAG(ec);
if (EC_EXEC_TAG() == TAG_NONE) {
- rb_atomic_t index;
- struct rb_workqueue_job *wq_job;
-
- while ((index = vm->postponed_job_index) > 0) {
- if (ATOMIC_CAS(vm->postponed_job_index, index, index-1) == index) {
- rb_postponed_job_t *pjob = &vm->postponed_job_buffer[index-1];
- (*pjob->func)(pjob->data);
- }
+ /* execute postponed jobs */
+ while (prereg_triggered_bits) {
+ unsigned int i = bit_length(prereg_triggered_bits) - 1;
+ prereg_triggered_bits ^= ((1UL) << i); /* toggle ith bit off */
+ rb_postponed_job_func_t func = pjq->prereg_table[i].func;
+ void *data = pjq->prereg_table[i].data;
+ (func)(data);
}
+
+ /* execute workqueue jobs */
+ struct rb_workqueue_job *wq_job;
while ((wq_job = ccan_list_pop(&tmp, struct rb_workqueue_job, jnode))) {
- rb_postponed_job_t pjob = wq_job->job;
+ rb_postponed_job_func_t func = wq_job->func;
+ void *data = wq_job->data;
free(wq_job);
- (pjob.func)(pjob.data);
+ (func)(data);
}
}
EC_POP_TAG();
@@ -1816,7 +1859,8 @@ rb_postponed_job_flush(rb_vm_t *vm)
ec->interrupt_mask &= ~(saved_mask ^ block_mask);
ec->errinfo = saved_errno;
- /* don't leak memory if a job threw an exception */
+ /* If we threw an exception, there might be leftover workqueue items; carry them over
+ * to a subsequent execution of flush */
if (!ccan_list_empty(&tmp)) {
rb_nativethread_lock_lock(&vm->workqueue_lock);
ccan_list_prepend_list(&vm->workqueue, &tmp);
@@ -1824,4 +1868,10 @@ rb_postponed_job_flush(rb_vm_t *vm)
RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(GET_EC());
}
+ /* likewise with any remaining-to-be-executed bits of the preregistered postponed
+ * job table */
+ if (prereg_triggered_bits) {
+ RUBY_ATOMIC_OR(pjq->prereg_triggered_bitset, prereg_triggered_bits);
+ RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(GET_EC());
+ }
}