summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKoichi Sasada <ko1@atdot.net>2023-12-29 03:52:45 +0900
committerKoichi Sasada <ko1@atdot.net>2024-01-05 05:51:25 +0900
commitd65d2fb6b5c56c495f2f98f8e6ac5994ad5d93dd (patch)
tree3bf8410052a24b8da27cafe6f39ac13188a56ce1
parent6c252912af4981f016a9abdb4c1689307a4f1d2f (diff)
Do not `poll` first
Before this patch, the MN scheduler waits for the IO with the following steps: 1. `poll(fd, timeout=0)` to check fd is ready or not. 2. if fd is not ready, waits with MN thread scheduler 3. call `func` to issue the blocking I/O call The advantage of advanced `poll()` is we can wait for the IO ready for any fds. However `poll()` becomes overhead for already ready fds. This patch changes the steps like: 1. call `func` to issue the blocking I/O call 2. if the `func` returns `EWOULDBLOCK` the fd is `O_NONBLOCK` and we need to wait for fd is ready so that waits with MN thread scheduler. In this case, we can wait only for `O_NONBLOCK` fds. Otherwise it waits with blocking operations such as `read()` system call. However we don't need to call `poll()` to check fd is ready in advance. With this patch we can observe performance improvement on microbenchmark which repeats blocking I/O (not `O_NONBLOCK` fd) with and without MN thread scheduler. ```ruby require 'benchmark' f = open('/dev/null', 'w') f.sync = true TN = 1 N = 1_000_000 / TN Benchmark.bm{|x| x.report{ TN.times.map{ Thread.new{ N.times{f.print '.'} } }.each(&:join) } } __END__ TN = 1 user system total real ruby32 0.393966 0.101122 0.495088 ( 0.495235) ruby33 0.493963 0.089521 0.583484 ( 0.584091) ruby33+MN 0.639333 0.200843 0.840176 ( 0.840291) <- Slow this+MN 0.512231 0.099091 0.611322 ( 0.611074) <- Good ```
-rw-r--r--internal/thread.h1
-rw-r--r--io.c5
-rw-r--r--rjit_c.rb1
-rw-r--r--thread.c116
-rw-r--r--vm_core.h1
5 files changed, 79 insertions, 45 deletions
diff --git a/internal/thread.h b/internal/thread.h
index cf25975d8b..47273436e3 100644
--- a/internal/thread.h
+++ b/internal/thread.h
@@ -74,5 +74,6 @@ int ruby_thread_has_gvl_p(void); /* for ext/fiddle/closure.c */
RUBY_SYMBOL_EXPORT_END
int rb_threadptr_execute_interrupts(struct rb_thread_struct *th, int blocking_timing);
+bool rb_thread_mn_schedulable(VALUE thread);
#endif /* INTERNAL_THREAD_H */
diff --git a/io.c b/io.c
index a34c457140..f6cd2c1a56 100644
--- a/io.c
+++ b/io.c
@@ -1146,6 +1146,11 @@ static int nogvl_wait_for(VALUE th, rb_io_t *fptr, short events, struct timeval
static inline int
io_internal_wait(VALUE thread, rb_io_t *fptr, int error, int events, struct timeval *timeout)
{
+ if (!timeout && rb_thread_mn_schedulable(thread)) {
+ RUBY_ASSERT(errno == EWOULDBLOCK || errno == EAGAIN);
+ return -1;
+ }
+
int ready = nogvl_wait_for(thread, fptr, events, timeout);
if (ready > 0) {
diff --git a/rjit_c.rb b/rjit_c.rb
index f100fb911c..61ae3b04a7 100644
--- a/rjit_c.rb
+++ b/rjit_c.rb
@@ -1497,6 +1497,7 @@ module RubyVM::RJIT # :nodoc: all
nt: [CType::Pointer.new { self.rb_native_thread }, Primitive.cexpr!("OFFSETOF((*((struct rb_thread_struct *)NULL)), nt)")],
ec: [CType::Pointer.new { self.rb_execution_context_t }, Primitive.cexpr!("OFFSETOF((*((struct rb_thread_struct *)NULL)), ec)")],
sched: [self.rb_thread_sched_item, Primitive.cexpr!("OFFSETOF((*((struct rb_thread_struct *)NULL)), sched)")],
+ mn_schedulable: [self._Bool, Primitive.cexpr!("OFFSETOF((*((struct rb_thread_struct *)NULL)), mn_schedulable)")],
serial: [self.rb_atomic_t, Primitive.cexpr!("OFFSETOF((*((struct rb_thread_struct *)NULL)), serial)")],
last_status: [self.VALUE, Primitive.cexpr!("OFFSETOF((*((struct rb_thread_struct *)NULL)), last_status)")],
calling: [CType::Pointer.new { self.rb_calling_info }, Primitive.cexpr!("OFFSETOF((*((struct rb_thread_struct *)NULL)), calling)")],
diff --git a/thread.c b/thread.c
index 720f07394d..34986ebbe6 100644
--- a/thread.c
+++ b/thread.c
@@ -1687,15 +1687,22 @@ thread_io_wake_pending_closer(struct waiting_fd *wfd)
}
}
-static int
-thread_io_wait_events(rb_thread_t *th, rb_execution_context_t *ec, int fd, int events, struct timeval *timeout, struct waiting_fd *wfd)
+static bool
+thread_io_mn_schedulable(rb_thread_t *th, int events, const struct timeval *timeout)
{
#if defined(USE_MN_THREADS) && USE_MN_THREADS
- if (!th_has_dedicated_nt(th) &&
- (events || timeout) &&
- th->blocking // no fiber scheduler
- ) {
- int r;
+ return !th_has_dedicated_nt(th) && (events || timeout) && th->blocking;
+#else
+ return false;
+#endif
+}
+
+// true if need retry
+static bool
+thread_io_wait_events(rb_thread_t *th, int fd, int events, const struct timeval *timeout)
+{
+#if defined(USE_MN_THREADS) && USE_MN_THREADS
+ if (thread_io_mn_schedulable(th, events, timeout)) {
rb_hrtime_t rel, *prel;
if (timeout) {
@@ -1708,20 +1715,40 @@ thread_io_wait_events(rb_thread_t *th, rb_execution_context_t *ec, int fd, int e
VM_ASSERT(prel || (events & (RB_WAITFD_IN | RB_WAITFD_OUT)));
- thread_io_setup_wfd(th, fd, wfd);
- {
- // wait readable/writable
- r = thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), prel);
+ if (thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), prel)) {
+ // timeout
+ return false;
}
- thread_io_wake_pending_closer(wfd);
+ else {
+ return true;
+ }
+ }
+#endif // defined(USE_MN_THREADS) && USE_MN_THREADS
+ return false;
+}
- RUBY_VM_CHECK_INTS_BLOCKING(ec);
+// assume read/write
+static bool
+blocking_call_retryable_p(int r, int eno)
+{
+ if (r != -1) return false;
- return r;
+ switch (eno) {
+ case EAGAIN:
+#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
+ case EWOULDBLOCK:
+#endif
+ return true;
+ default:
+ return false;
}
-#endif // defined(USE_MN_THREADS) && USE_MN_THREADS
+}
- return 0;
+bool
+rb_thread_mn_schedulable(VALUE thval)
+{
+ rb_thread_t *th = rb_thread_ptr(thval);
+ return th->mn_schedulable;
}
VALUE
@@ -1733,12 +1760,11 @@ rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, in
RUBY_DEBUG_LOG("th:%u fd:%d ev:%d", rb_th_serial(th), fd, events);
struct waiting_fd waiting_fd;
-
- thread_io_wait_events(th, ec, fd, events, NULL, &waiting_fd);
-
volatile VALUE val = Qundef; /* shouldn't be used */
volatile int saved_errno = 0;
enum ruby_tag_type state;
+ bool prev_mn_schedulable = th->mn_schedulable;
+ th->mn_schedulable = thread_io_mn_schedulable(th, events, NULL);
// `errno` is only valid when there is an actual error - but we can't
// extract that from the return value of `func` alone, so we clear any
@@ -1747,16 +1773,26 @@ rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, in
errno = 0;
thread_io_setup_wfd(th, fd, &waiting_fd);
+ {
+ EC_PUSH_TAG(ec);
+ if ((state = EC_EXEC_TAG()) == TAG_NONE) {
+ retry:
+ BLOCKING_REGION(waiting_fd.th, {
+ val = func(data1);
+ saved_errno = errno;
+ }, ubf_select, waiting_fd.th, FALSE);
+
+ if (events &&
+ blocking_call_retryable_p((int)val, saved_errno) &&
+ thread_io_wait_events(th, fd, events, NULL)) {
+ RUBY_VM_CHECK_INTS_BLOCKING(ec);
+ goto retry;
+ }
+ }
+ EC_POP_TAG();
- EC_PUSH_TAG(ec);
- if ((state = EC_EXEC_TAG()) == TAG_NONE) {
- BLOCKING_REGION(waiting_fd.th, {
- val = func(data1);
- saved_errno = errno;
- }, ubf_select, waiting_fd.th, FALSE);
+ th->mn_schedulable = prev_mn_schedulable;
}
- EC_POP_TAG();
-
/*
* must be deleted before jump
* this will delete either from waiting_fds or on-stack struct rb_io_close_wait_list
@@ -4316,20 +4352,20 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
rb_execution_context_t *ec = GET_EC();
rb_thread_t *th = rb_ec_thread_ptr(ec);
- if (thread_io_wait_events(th, ec, fd, events, timeout, &wfd)) {
- return 0; // timeout
- }
-
thread_io_setup_wfd(th, fd, &wfd);
EC_PUSH_TAG(wfd.th->ec);
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
rb_hrtime_t *to, rel, end = 0;
+ struct timeval tv;
+
RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
+
timeout_prepare(&to, &rel, &end, timeout);
fds[0].fd = fd;
fds[0].events = (short)events;
fds[0].revents = 0;
+
do {
nfds = 1;
@@ -4344,7 +4380,9 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
}, ubf_select, wfd.th, TRUE);
RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
- } while (wait_retryable(&result, lerrno, to, end));
+ } while (wait_retryable(&result, lerrno, to, end) &&
+ thread_io_wait_events(th, fd, events, rb_hrtime2timeval(&tv, to)) &&
+ wait_retryable(&result, lerrno, to, end));
}
EC_POP_TAG();
@@ -4452,24 +4490,12 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
rb_execution_context_t *ec = GET_EC();
rb_thread_t *th = rb_ec_thread_ptr(ec);
- if (thread_io_wait_events(th, ec, fd, events, timeout, &args.wfd)) {
- return 0; // timeout
- }
-
args.as.fd = fd;
args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL;
args.tv = timeout;
- args.wfd.fd = fd;
- args.wfd.th = th;
- args.wfd.busy = NULL;
-
- RB_VM_LOCK_ENTER();
- {
- ccan_list_add(&args.wfd.th->vm->waiting_fds, &args.wfd.wfd_node);
- }
- RB_VM_LOCK_LEAVE();
+ thread_io_setup_wfd(th, fd, &args.wfd);
r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr);
if (r == -1)
diff --git a/vm_core.h b/vm_core.h
index ef770ab441..354603514e 100644
--- a/vm_core.h
+++ b/vm_core.h
@@ -1076,6 +1076,7 @@ typedef struct rb_thread_struct {
rb_execution_context_t *ec;
struct rb_thread_sched_item sched;
+ bool mn_schedulable;
rb_atomic_t serial; // only for RUBY_DEBUG_LOG()
VALUE last_status; /* $? */