diff options
| author | Samuel Williams <samuel.williams@oriontransfer.co.nz> | 2025-05-13 19:02:03 +0900 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-05-13 19:02:03 +0900 |
| commit | 425fa0aeb5ced20c03b2d5edb7a409666363ea8f (patch) | |
| tree | 703d21309b8cb8a6b695cd1e78b8aabdca90efc7 /thread.c | |
| parent | a6435befa76c2ae0525147f934bd9cd1914ffb8a (diff) | |
Make `waiting_fd` behaviour per-IO. (#13127)
- `rb_thread_fd_close` is deprecated and now a no-op.
- IO operations (including close) no longer take a vm-wide lock.
Notes
Notes:
Merged-By: ioquatix <samuel@codeotaku.com>
Diffstat (limited to 'thread.c')
| -rw-r--r-- | thread.c | 260 |
1 files changed, 133 insertions, 127 deletions
@@ -99,6 +99,8 @@ #include "vm_debug.h" #include "vm_sync.h" +#include "ccan/list/list.h" + #ifndef USE_NATIVE_THREAD_PRIORITY #define USE_NATIVE_THREAD_PRIORITY 0 #define RUBY_THREAD_PRIORITY_MAX 3 @@ -149,13 +151,6 @@ MAYBE_UNUSED(static int consume_communication_pipe(int fd)); static volatile int system_working = 1; static rb_internal_thread_specific_key_t specific_key_count; -struct waiting_fd { - struct ccan_list_node wfd_node; /* <=> vm.waiting_fds */ - rb_thread_t *th; - int fd; - struct rb_io_close_wait_list *busy; -}; - /********************************************************************************/ #define THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION @@ -1694,44 +1689,45 @@ waitfd_to_waiting_flag(int wfd_event) return wfd_event << 1; } -static void -thread_io_setup_wfd(rb_thread_t *th, int fd, struct waiting_fd *wfd) -{ - wfd->fd = fd; - wfd->th = th; - wfd->busy = NULL; +struct io_blocking_operation_arguments { + struct rb_io *io; + struct rb_io_blocking_operation *blocking_operation; +}; - RB_VM_LOCK_ENTER(); - { - ccan_list_add(&th->vm->waiting_fds, &wfd->wfd_node); +static VALUE +io_blocking_operation_release(VALUE _arguments) { + struct io_blocking_operation_arguments *arguments = (void*)_arguments; + struct rb_io_blocking_operation *blocking_operation = arguments->blocking_operation; + + ccan_list_del(&blocking_operation->list); + + rb_io_t *io = arguments->io; + rb_thread_t *thread = io->closing_ec->thread_ptr; + rb_fiber_t *fiber = io->closing_ec->fiber_ptr; + + if (thread->scheduler != Qnil) { + rb_fiber_scheduler_unblock(thread->scheduler, io->self, rb_fiberptr_self(fiber)); + } else { + rb_thread_wakeup(thread->self); } - RB_VM_LOCK_LEAVE(); + + return Qnil; } static void -thread_io_wake_pending_closer(struct waiting_fd *wfd) +rb_io_blocking_operation_release(struct rb_io *io, struct rb_io_blocking_operation *blocking_operation) { - bool has_waiter = wfd->busy && RB_TEST(wfd->busy->wakeup_mutex); - if (has_waiter) { - rb_mutex_lock(wfd->busy->wakeup_mutex); - } + VALUE wakeup_mutex = io->wakeup_mutex; - /* Needs to be protected with RB_VM_LOCK because we don't know if - wfd is on the global list of pending FD ops or if it's on a - struct rb_io_close_wait_list close-waiter. */ - RB_VM_LOCK_ENTER(); - ccan_list_del(&wfd->wfd_node); - RB_VM_LOCK_LEAVE(); + if (RB_TEST(wakeup_mutex)) { + struct io_blocking_operation_arguments arguments = { + .io = io, + .blocking_operation = blocking_operation + }; - if (has_waiter) { - rb_thread_t *th = rb_thread_ptr(wfd->busy->closing_thread); - if (th->scheduler != Qnil) { - rb_fiber_scheduler_unblock(th->scheduler, wfd->busy->closing_thread, wfd->busy->closing_fiber); - } - else { - rb_thread_wakeup(wfd->busy->closing_thread); - } - rb_mutex_unlock(wfd->busy->wakeup_mutex); + rb_mutex_synchronize(wakeup_mutex, io_blocking_operation_release, (VALUE)&arguments); + } else { + ccan_list_del(&blocking_operation->list); } } @@ -1802,12 +1798,11 @@ rb_thread_mn_schedulable(VALUE thval) VALUE rb_thread_io_blocking_call(struct rb_io* io, rb_blocking_function_t *func, void *data1, int events) { - rb_execution_context_t *volatile ec = GET_EC(); - rb_thread_t *volatile th = rb_ec_thread_ptr(ec); + rb_execution_context_t * ec = GET_EC(); + rb_thread_t *th = rb_ec_thread_ptr(ec); RUBY_DEBUG_LOG("th:%u fd:%d ev:%d", rb_th_serial(th), io->fd, events); - struct waiting_fd waiting_fd; volatile VALUE val = Qundef; /* shouldn't be used */ volatile int saved_errno = 0; enum ruby_tag_type state; @@ -1822,7 +1817,11 @@ rb_thread_io_blocking_call(struct rb_io* io, rb_blocking_function_t *func, void // `func` or not (as opposed to some previously set value). errno = 0; - thread_io_setup_wfd(th, fd, &waiting_fd); + struct rb_io_blocking_operation blocking_operation = { + .ec = ec, + }; + ccan_list_add(&io->blocking_operations, &blocking_operation.list); + { EC_PUSH_TAG(ec); if ((state = EC_EXEC_TAG()) == TAG_NONE) { @@ -1847,15 +1846,13 @@ rb_thread_io_blocking_call(struct rb_io* io, rb_blocking_function_t *func, void th = rb_ec_thread_ptr(ec); th->mn_schedulable = prev_mn_schedulable; } - /* - * must be deleted before jump - * this will delete either from waiting_fds or on-stack struct rb_io_close_wait_list - */ - thread_io_wake_pending_closer(&waiting_fd); + + rb_io_blocking_operation_release(io, &blocking_operation); if (state) { EC_JUMP_TAG(ec, state); } + /* TODO: check func() */ RUBY_VM_CHECK_INTS_BLOCKING(ec); @@ -2639,76 +2636,81 @@ rb_ec_reset_raised(rb_execution_context_t *ec) return 1; } -int -rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy) +static size_t +thread_io_close_notify_all(struct rb_io *io) { - rb_vm_t *vm = GET_THREAD()->vm; - struct waiting_fd *wfd = 0, *next; - ccan_list_head_init(&busy->pending_fd_users); - int has_any; - VALUE wakeup_mutex; + RUBY_ASSERT_CRITICAL_SECTION_ENTER(); - RB_VM_LOCK_ENTER(); - { - ccan_list_for_each_safe(&vm->waiting_fds, wfd, next, wfd_node) { - if (wfd->fd == fd) { - rb_thread_t *th = wfd->th; - VALUE err; - - ccan_list_del(&wfd->wfd_node); - ccan_list_add(&busy->pending_fd_users, &wfd->wfd_node); - - wfd->busy = busy; - err = th->vm->special_exceptions[ruby_error_stream_closed]; - rb_threadptr_pending_interrupt_enque(th, err); - rb_threadptr_interrupt(th); - } - } + size_t count = 0; + rb_vm_t *vm = io->closing_ec->thread_ptr->vm; + VALUE error = vm->special_exceptions[ruby_error_stream_closed]; + + struct rb_io_blocking_operation *blocking_operation; + ccan_list_for_each(&io->blocking_operations, blocking_operation, list) { + rb_execution_context_t *ec = blocking_operation->ec; + + rb_thread_t *thread = ec->thread_ptr; + rb_threadptr_pending_interrupt_enque(thread, error); + + // This operation is slow: + rb_threadptr_interrupt(thread); + + count += 1; + } + + RUBY_ASSERT_CRITICAL_SECTION_LEAVE(); + + return count; +} + +size_t +rb_thread_io_close_interrupt(struct rb_io *io) +{ + // We guard this operation based on `io->closing_ec` -> only one thread will ever enter this function. + if (io->closing_ec) { + return 0; } - has_any = !ccan_list_empty(&busy->pending_fd_users); - busy->closing_thread = rb_thread_current(); - busy->closing_fiber = rb_fiber_current(); - wakeup_mutex = Qnil; - if (has_any) { - wakeup_mutex = rb_mutex_new(); - RBASIC_CLEAR_CLASS(wakeup_mutex); /* hide from ObjectSpace */ + // If there are no blocking operations, we are done: + if (ccan_list_empty(&io->blocking_operations)) { + return 0; } - busy->wakeup_mutex = wakeup_mutex; - RB_VM_LOCK_LEAVE(); + // Otherwise, we are now closing the IO: + rb_execution_context_t *ec = GET_EC(); + io->closing_ec = ec; + + // This is used to ensure the correct execution context is woken up after the blocking operation is interrupted: + io->wakeup_mutex = rb_mutex_new(); - /* If the caller didn't pass *busy as a pointer to something on the stack, - we need to guard this mutex object on _our_ C stack for the duration - of this function. */ - RB_GC_GUARD(wakeup_mutex); - return has_any; + return thread_io_close_notify_all(io); } void -rb_notify_fd_close_wait(struct rb_io_close_wait_list *busy) +rb_thread_io_close_wait(struct rb_io* io) { - if (!RB_TEST(busy->wakeup_mutex)) { - /* There was nobody else using this file when we closed it, so we - never bothered to allocate a mutex*/ + VALUE wakeup_mutex = io->wakeup_mutex; + + if (!RB_TEST(wakeup_mutex)) { + // There was nobody else using this file when we closed it, so we never bothered to allocate a mutex: return; } - rb_mutex_lock(busy->wakeup_mutex); - while (!ccan_list_empty(&busy->pending_fd_users)) { - rb_mutex_sleep(busy->wakeup_mutex, Qnil); + rb_mutex_lock(wakeup_mutex); + while (!ccan_list_empty(&io->blocking_operations)) { + rb_mutex_sleep(wakeup_mutex, Qnil); } - rb_mutex_unlock(busy->wakeup_mutex); + rb_mutex_unlock(wakeup_mutex); + + // We are done closing: + io->wakeup_mutex = Qnil; + io->closing_ec = NULL; } void rb_thread_fd_close(int fd) { - struct rb_io_close_wait_list busy; - - if (rb_notify_fd_close(fd, &busy)) { - rb_notify_fd_close_wait(&busy); - } + rb_warn("rb_thread_fd_close is deprecated (and is now a no-op)."); } /* @@ -4412,14 +4414,17 @@ thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout) }}; volatile int result = 0; nfds_t nfds; - struct waiting_fd wfd; + struct rb_io_blocking_operation blocking_operation; enum ruby_tag_type state; volatile int lerrno; rb_execution_context_t *ec = GET_EC(); rb_thread_t *th = rb_ec_thread_ptr(ec); - thread_io_setup_wfd(th, fd, &wfd); + if (io) { + blocking_operation.ec = ec; + ccan_list_add(&io->blocking_operations, &blocking_operation.list); + } if (timeout == NULL && thread_io_wait_events(th, fd, events, NULL)) { // fd is readable @@ -4428,25 +4433,27 @@ thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout) errno = 0; } else { - EC_PUSH_TAG(wfd.th->ec); + EC_PUSH_TAG(ec); if ((state = EC_EXEC_TAG()) == TAG_NONE) { rb_hrtime_t *to, rel, end = 0; - RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec); + RUBY_VM_CHECK_INTS_BLOCKING(ec); timeout_prepare(&to, &rel, &end, timeout); do { nfds = numberof(fds); - result = wait_for_single_fd_blocking_region(wfd.th, fds, nfds, to, &lerrno); + result = wait_for_single_fd_blocking_region(th, fds, nfds, to, &lerrno); - RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec); + RUBY_VM_CHECK_INTS_BLOCKING(ec); } while (wait_retryable(&result, lerrno, to, end)); } EC_POP_TAG(); } - thread_io_wake_pending_closer(&wfd); + if (io) { + rb_io_blocking_operation_release(io, &blocking_operation); + } if (state) { - EC_JUMP_TAG(wfd.th->ec, state); + EC_JUMP_TAG(ec, state); } if (result < 0) { @@ -4479,6 +4486,9 @@ thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout) } #else /* ! USE_POLL - implement rb_io_poll_fd() using select() */ struct select_args { + struct rb_io *io; + struct rb_io_blocking_operation *blocking_operation; + union { int fd; int error; @@ -4486,7 +4496,6 @@ struct select_args { rb_fdset_t *read; rb_fdset_t *write; rb_fdset_t *except; - struct waiting_fd wfd; struct timeval *tv; }; @@ -4517,7 +4526,10 @@ select_single_cleanup(VALUE ptr) { struct select_args *args = (struct select_args *)ptr; - thread_io_wake_pending_closer(&args->wfd); + if (args->blocking_operation) { + rb_io_blocking_operation_release(args->io, args->blocking_operation); + } + if (args->read) rb_fd_term(args->read); if (args->write) rb_fd_term(args->write); if (args->except) rb_fd_term(args->except); @@ -4542,22 +4554,31 @@ thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout) { rb_fdset_t rfds, wfds, efds; struct select_args args; - int r; VALUE ptr = (VALUE)&args; - rb_thread_t *th = GET_THREAD(); + + struct rb_io_blocking_operation blocking_operation; + if (io) { + args.io = io; + blocking_operation.ec = GET_EC(); + ccan_list_add(&io->blocking_operations, &blocking_operation.list); + args.blocking_operation = &blocking_operation; + } else { + args.io = NULL; + blocking_operation.ec = NULL; + args.blocking_operation = NULL; + } args.as.fd = fd; args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL; args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL; args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL; args.tv = timeout; - thread_io_setup_wfd(th, fd, &args.wfd); - r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr); - if (r == -1) + int result = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr); + if (result == -1) errno = args.as.error; - return r; + return result; } #endif /* ! USE_POLL */ @@ -5651,21 +5672,6 @@ rb_check_deadlock(rb_ractor_t *r) } } -// Used for VM memsize reporting. Returns the size of a list of waiting_fd -// structs. Defined here because the struct definition lives here as well. -size_t -rb_vm_memsize_waiting_fds(struct ccan_list_head *waiting_fds) -{ - struct waiting_fd *waitfd = 0; - size_t size = 0; - - ccan_list_for_each(waiting_fds, waitfd, wfd_node) { - size += sizeof(struct waiting_fd); - } - - return size; -} - static void update_line_coverage(VALUE data, const rb_trace_arg_t *trace_arg) { |
