summaryrefslogtreecommitdiff
path: root/thread.c
diff options
context:
space:
mode:
authorKoichi Sasada <ko1@atdot.net>2024-11-05 03:55:52 +0900
committerKoichi Sasada <ko1@atdot.net>2024-11-08 18:02:46 +0900
commitc8297c3eed571e978dded34dd2036a53d6f28bb3 (patch)
tree14fbd2c4218a73d1c67f3f45ae80ebbd3bf6749d /thread.c
parent29578773c2baacef4bd3d4e239713200ac0e6b08 (diff)
`interrupt_exec`
introduce - rb_threadptr_interrupt_exec - rb_ractor_interrupt_exec to intercept the thread/ractor execution.
Notes
Notes: Merged: https://github.com/ruby/ruby/pull/11142
Diffstat (limited to 'thread.c')
-rw-r--r--thread.c190
1 files changed, 166 insertions, 24 deletions
diff --git a/thread.c b/thread.c
index 815fcc1151..83b61cbcc3 100644
--- a/thread.c
+++ b/thread.c
@@ -342,25 +342,33 @@ unblock_function_clear(rb_thread_t *th)
}
static void
-rb_threadptr_interrupt_common(rb_thread_t *th, int trap)
+threadptr_interrupt_locked(rb_thread_t *th, bool trap)
{
+ // th->interrupt_lock should be acquired here
+
RUBY_DEBUG_LOG("th:%u trap:%d", rb_th_serial(th), trap);
+ if (trap) {
+ RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
+ }
+ else {
+ RUBY_VM_SET_INTERRUPT(th->ec);
+ }
+
+ if (th->unblock.func != NULL) {
+ (th->unblock.func)(th->unblock.arg);
+ }
+ else {
+ /* none */
+ }
+}
+
+static void
+threadptr_interrupt(rb_thread_t *th, int trap)
+{
rb_native_mutex_lock(&th->interrupt_lock);
{
- if (trap) {
- RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
- }
- else {
- RUBY_VM_SET_INTERRUPT(th->ec);
- }
-
- if (th->unblock.func != NULL) {
- (th->unblock.func)(th->unblock.arg);
- }
- else {
- /* none */
- }
+ threadptr_interrupt_locked(th, trap);
}
rb_native_mutex_unlock(&th->interrupt_lock);
}
@@ -369,13 +377,13 @@ void
rb_threadptr_interrupt(rb_thread_t *th)
{
RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
- rb_threadptr_interrupt_common(th, 0);
+ threadptr_interrupt(th, false);
}
static void
threadptr_trap_interrupt(rb_thread_t *th)
{
- rb_threadptr_interrupt_common(th, 1);
+ threadptr_interrupt(th, true);
}
static void
@@ -490,6 +498,7 @@ rb_thread_terminate_all(rb_thread_t *th)
}
void rb_threadptr_root_fiber_terminate(rb_thread_t *th);
+static void threadptr_interrupt_exec_cleanup(rb_thread_t *th);
static void
thread_cleanup_func_before_exec(void *th_ptr)
@@ -500,6 +509,7 @@ thread_cleanup_func_before_exec(void *th_ptr)
// The thread stack doesn't exist in the forked process:
th->ec->machine.stack_start = th->ec->machine.stack_end = NULL;
+ threadptr_interrupt_exec_cleanup(th);
rb_threadptr_root_fiber_terminate(th);
}
@@ -2418,6 +2428,8 @@ threadptr_get_interrupts(rb_thread_t *th)
return interrupt & (rb_atomic_t)~ec->interrupt_mask;
}
+static void threadptr_interrupt_exec_exec(rb_thread_t *th);
+
int
rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
{
@@ -2449,17 +2461,29 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
rb_postponed_job_flush(th->vm);
}
- /* signal handling */
- if (trap_interrupt && (th == th->vm->ractor.main_thread)) {
- enum rb_thread_status prev_status = th->status;
+ if (trap_interrupt) {
+ /* signal handling */
+ if (th == th->vm->ractor.main_thread) {
+ enum rb_thread_status prev_status = th->status;
- th->status = THREAD_RUNNABLE;
- {
- while ((sig = rb_get_next_signal()) != 0) {
- ret |= rb_signal_exec(th, sig);
+ th->status = THREAD_RUNNABLE;
+ {
+ while ((sig = rb_get_next_signal()) != 0) {
+ ret |= rb_signal_exec(th, sig);
+ }
}
+ th->status = prev_status;
+ }
+
+ if (!ccan_list_empty(&th->interrupt_exec_tasks)) {
+ enum rb_thread_status prev_status = th->status;
+
+ th->status = THREAD_RUNNABLE;
+ {
+ threadptr_interrupt_exec_exec(th);
+ }
+ th->status = prev_status;
}
- th->status = prev_status;
}
/* exception from another thread */
@@ -4700,6 +4724,7 @@ rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const r
/* may be held by any thread in parent */
rb_native_mutex_initialize(&th->interrupt_lock);
+ ccan_list_head_init(&th->interrupt_exec_tasks);
vm->fork_gen++;
rb_ractor_sleeper_threads_clear(th->ractor);
@@ -5920,3 +5945,120 @@ rb_internal_thread_specific_set(VALUE thread_val, rb_internal_thread_specific_ke
th->specific_storage[key] = data;
}
+
+// interrupt_exec
+
+struct rb_interrupt_exec_task {
+ struct ccan_list_node node;
+
+ rb_interrupt_exec_func_t *func;
+ void *data;
+ enum rb_interrupt_exec_flag flags;
+};
+
+void
+rb_threadptr_interrupt_exec_task_mark(rb_thread_t *th)
+{
+ struct rb_interrupt_exec_task *task;
+
+ ccan_list_for_each(&th->interrupt_exec_tasks, task, node) {
+ if (task->flags & rb_interrupt_exec_flag_value_data) {
+ rb_gc_mark((VALUE)task->data);
+ }
+ }
+}
+
+// native thread safe
+// th should be available
+void
+rb_threadptr_interrupt_exec(rb_thread_t *th, rb_interrupt_exec_func_t *func, void *data, enum rb_interrupt_exec_flag flags)
+{
+ // should not use ALLOC
+ struct rb_interrupt_exec_task *task = ALLOC(struct rb_interrupt_exec_task);
+ *task = (struct rb_interrupt_exec_task) {
+ .flags = flags,
+ .func = func,
+ .data = data,
+ };
+
+ rb_native_mutex_lock(&th->interrupt_lock);
+ {
+ ccan_list_add_tail(&th->interrupt_exec_tasks, &task->node);
+ threadptr_interrupt_locked(th, true);
+ }
+ rb_native_mutex_unlock(&th->interrupt_lock);
+}
+
+static void
+threadptr_interrupt_exec_exec(rb_thread_t *th)
+{
+ while (1) {
+ struct rb_interrupt_exec_task *task;
+
+ rb_native_mutex_lock(&th->interrupt_lock);
+ {
+ task = ccan_list_pop(&th->interrupt_exec_tasks, struct rb_interrupt_exec_task, node);
+ }
+ rb_native_mutex_unlock(&th->interrupt_lock);
+
+ if (task) {
+ (*task->func)(task->data);
+ ruby_xfree(task);
+ }
+ else {
+ break;
+ }
+ }
+}
+
+static void
+threadptr_interrupt_exec_cleanup(rb_thread_t *th)
+{
+ rb_native_mutex_lock(&th->interrupt_lock);
+ {
+ struct rb_interrupt_exec_task *task;
+
+ while ((task = ccan_list_pop(&th->interrupt_exec_tasks, struct rb_interrupt_exec_task, node)) != NULL) {
+ ruby_xfree(task);
+ }
+ }
+ rb_native_mutex_unlock(&th->interrupt_lock);
+}
+
+struct interrupt_ractor_new_thread_data {
+ rb_interrupt_exec_func_t *func;
+ void *data;
+};
+
+static VALUE
+interrupt_ractor_new_thread_func(void *data)
+{
+ struct interrupt_ractor_new_thread_data d = *(struct interrupt_ractor_new_thread_data *)data;
+ ruby_xfree(data);
+
+ d.func(d.data);
+ return Qnil;
+}
+
+static VALUE
+interrupt_ractor_func(void *data)
+{
+ rb_thread_create(interrupt_ractor_new_thread_func, data);
+ return Qnil;
+}
+
+// native thread safe
+// func/data should be native thread safe
+void
+rb_ractor_interrupt_exec(struct rb_ractor_struct *target_r,
+ rb_interrupt_exec_func_t *func, void *data, enum rb_interrupt_exec_flag flags)
+{
+ struct interrupt_ractor_new_thread_data *d = ALLOC(struct interrupt_ractor_new_thread_data);
+
+ d->func = func;
+ d->data = data;
+ rb_thread_t *main_th = target_r->threads.main;
+ rb_threadptr_interrupt_exec(main_th, interrupt_ractor_func, d, flags);
+
+ // TODO MEMO: we can create a new thread in a ractor, but not sure how to do that now.
+}