From 0e3b0fcdba70cf96a8e0654eb8f50aacb8024bd4 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Thu, 14 May 2020 22:10:55 +1200 Subject: Thread scheduler for light weight concurrency. --- io.c | 113 +++++++++++++++++++++++++++++++++++++++++++------------------------ 1 file changed, 72 insertions(+), 41 deletions(-) (limited to 'io.c') diff --git a/io.c b/io.c index fc817f96b2..26202e0806 100644 --- a/io.c +++ b/io.c @@ -177,15 +177,6 @@ off_t __syscall(quad_t number, ...); #define rename(f, t) rb_w32_urename((f), (t)) #endif -#if defined(_WIN32) -# define RUBY_PIPE_NONBLOCK_DEFAULT (0) -#elif defined(O_NONBLOCK) - /* disabled for [Bug #15356] (Rack::Deflater + rails) failure: */ -# define RUBY_PIPE_NONBLOCK_DEFAULT (0) -#else /* any platforms where O_NONBLOCK does not exist? */ -# define RUBY_PIPE_NONBLOCK_DEFAULT (0) -#endif - VALUE rb_cIO; VALUE rb_eEOFError; VALUE rb_eIOError; @@ -406,44 +397,37 @@ rb_fd_set_nonblock(int fd) } int -rb_cloexec_pipe(int fildes[2]) +rb_cloexec_pipe(int descriptors[2]) { - int ret; - -#if defined(HAVE_PIPE2) - static int try_pipe2 = 1; - if (try_pipe2) { - ret = pipe2(fildes, O_CLOEXEC | RUBY_PIPE_NONBLOCK_DEFAULT); - if (ret != -1) - return ret; - /* pipe2 is available since Linux 2.6.27, glibc 2.9. */ - if (errno == ENOSYS) { - try_pipe2 = 0; - ret = pipe(fildes); - } - } - else { - ret = pipe(fildes); - } +#ifdef HAVE_PIPE2 + int result = pipe2(descriptors, O_CLOEXEC | O_NONBLOCK); #else - ret = pipe(fildes); + int result = pipe(descriptors); #endif - if (ret < 0) return ret; + + if (result < 0) + return result; + #ifdef __CYGWIN__ - if (ret == 0 && fildes[1] == -1) { - close(fildes[0]); - fildes[0] = -1; - errno = ENFILE; - return -1; + if (ret == 0 && descriptors[1] == -1) { + close(descriptors[0]); + descriptors[0] = -1; + errno = ENFILE; + return -1; } #endif - rb_maygvl_fd_fix_cloexec(fildes[0]); - rb_maygvl_fd_fix_cloexec(fildes[1]); - if (RUBY_PIPE_NONBLOCK_DEFAULT) { - rb_fd_set_nonblock(fildes[0]); - rb_fd_set_nonblock(fildes[1]); - } - return ret; + +#ifndef HAVE_PIPE2 + rb_maygvl_fd_fix_cloexec(descriptors[0]); + rb_maygvl_fd_fix_cloexec(descriptors[1]); + +#ifndef _WIN32 + rb_fd_set_nonblock(descriptors[0]); + rb_fd_set_nonblock(descriptors[1]); +#endif +#endif + + return result; } int @@ -1270,6 +1254,12 @@ io_fflush(rb_io_t *fptr) int rb_io_wait_readable(int f) { + VALUE scheduler = rb_current_thread_scheduler(); + if (scheduler != Qnil) { + VALUE result = rb_funcall(scheduler, rb_intern("wait_readable_fd"), 1, INT2NUM(f)); + return RTEST(result); + } + io_fd_check_closed(f); switch (errno) { case EINTR: @@ -1294,6 +1284,12 @@ rb_io_wait_readable(int f) int rb_io_wait_writable(int f) { + VALUE scheduler = rb_current_thread_scheduler(); + if (scheduler != Qnil) { + VALUE result = rb_funcall(scheduler, rb_intern("wait_writable_fd"), 1, INT2NUM(f)); + return RTEST(result); + } + io_fd_check_closed(f); switch (errno) { case EINTR: @@ -10897,6 +10893,23 @@ maygvl_copy_stream_continue_p(int has_gvl, struct copy_stream_struct *stp) return FALSE; } +struct wait_for_single_fd { + VALUE scheduler; + + int fd; + short events; + + VALUE result; +}; + +void * rb_thread_scheduler_wait_for_single_fd(void * _args) { + struct wait_for_single_fd *args = (struct wait_for_single_fd *)_args; + + args->result = rb_funcall(args->scheduler, rb_intern("wait_for_single_fd"), 3, INT2NUM(args->fd), INT2NUM(args->events), Qnil); + + return NULL; +} + #if USE_POLL # define IOWAIT_SYSCALL "poll" STATIC_ASSERT(pollin_expected, POLLIN == RB_WAITFD_IN); @@ -10904,6 +10917,13 @@ STATIC_ASSERT(pollout_expected, POLLOUT == RB_WAITFD_OUT); static int nogvl_wait_for_single_fd(int fd, short events) { + VALUE scheduler = rb_current_thread_scheduler(); + if (scheduler != Qnil) { + struct wait_for_single_fd args = {.scheduler = scheduler, .fd = fd, .events = events}; + rb_thread_call_with_gvl(rb_thread_scheduler_wait_for_single_fd, &args); + return RTEST(args.result); + } + struct pollfd fds; fds.fd = fd; @@ -10916,6 +10936,13 @@ nogvl_wait_for_single_fd(int fd, short events) static int nogvl_wait_for_single_fd(int fd, short events) { + VALUE scheduler = rb_current_thread_scheduler(); + if (scheduler != Qnil) { + struct wait_for_single_fd args = {.scheduler = scheduler, .fd = fd, .events = events}; + rb_thread_call_with_gvl(rb_thread_scheduler_wait_for_single_fd, &args); + return RTEST(args.result); + } + rb_fdset_t fds; int ret; @@ -13283,6 +13310,10 @@ Init_IO(void) rb_cIO = rb_define_class("IO", rb_cObject); rb_include_module(rb_cIO, rb_mEnumerable); + rb_define_const(rb_cIO, "WAIT_READABLE", INT2NUM(RB_WAITFD_IN)); + rb_define_const(rb_cIO, "WAIT_PRIORITY", INT2NUM(RB_WAITFD_PRI)); + rb_define_const(rb_cIO, "WAIT_WRITABLE", INT2NUM(RB_WAITFD_OUT)); + /* exception to wait for reading. see IO.select. */ rb_mWaitReadable = rb_define_module_under(rb_cIO, "WaitReadable"); /* exception to wait for writing. see IO.select. */ -- cgit v1.2.3