summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamuel Williams <samuel.williams@oriontransfer.co.nz>2020-05-14 22:10:55 +1200
committerGitHub <noreply@github.com>2020-05-14 22:10:55 +1200
commit0e3b0fcdba70cf96a8e0654eb8f50aacb8024bd4 (patch)
tree74d381412dfd8ff49dd3039f8aeae09ad9e4e6e3
parent336119dfc5e6baae0a936d6feae780a61975479c (diff)
Thread scheduler for light weight concurrency.
Notes
Notes: Merged: https://github.com/ruby/ruby/pull/3032 Merged-By: ioquatix <samuel@codeotaku.com>
-rw-r--r--cont.c92
-rw-r--r--doc/fiber.rdoc137
-rw-r--r--ext/socket/ancdata.c6
-rw-r--r--ext/socket/init.c148
-rw-r--r--ext/socket/rubysocket.h7
-rw-r--r--ext/socket/socket.c98
-rw-r--r--ext/socket/unixsocket.c6
-rw-r--r--include/ruby/internal/intern/cont.h1
-rw-r--r--include/ruby/internal/intern/thread.h4
-rw-r--r--io.c113
-rw-r--r--process.c11
-rw-r--r--spec/ruby/library/socket/basicsocket/read_nonblock_spec.rb4
-rw-r--r--spec/ruby/library/socket/basicsocket/write_nonblock_spec.rb4
-rw-r--r--test/ruby/test_fiber.rb45
-rw-r--r--test/ruby/test_stack.rb81
-rwxr-xr-xtest/scheduler/http.rb53
-rw-r--r--test/scheduler/scheduler.rb163
-rw-r--r--test/scheduler/test_enumerator.rb45
-rw-r--r--test/scheduler/test_fiber.rb29
-rw-r--r--test/scheduler/test_http.rb28
-rw-r--r--test/scheduler/test_io.rb35
-rw-r--r--test/scheduler/test_mutex.rb47
-rw-r--r--test/scheduler/test_sleep.rb30
-rw-r--r--test/socket/test_basicsocket.rb2
-rw-r--r--thread.c109
-rw-r--r--thread_sync.c7
-rw-r--r--vm.c10
-rw-r--r--vm_core.h4
28 files changed, 1014 insertions, 305 deletions
diff --git a/cont.c b/cont.c
index 1ea9056248..a10e0580e6 100644
--- a/cont.c
+++ b/cont.c
@@ -241,12 +241,17 @@ struct rb_fiber_struct {
*/
unsigned int transferred : 1;
+ /* Whether the fiber is allowed to implicitly yield. */
+ unsigned int blocking : 1;
+
struct coroutine_context context;
struct fiber_pool_stack stack;
};
static struct fiber_pool shared_fiber_pool = {NULL, NULL, 0, 0, 0, 0};
+static ID fiber_initialize_keywords[2] = {0};
+
/*
* FreeBSD require a first (i.e. addr) argument of mmap(2) is not NULL
* if MAP_STACK is passed.
@@ -1733,7 +1738,7 @@ fiber_alloc(VALUE klass)
}
static rb_fiber_t*
-fiber_t_alloc(VALUE fiber_value)
+fiber_t_alloc(VALUE fiber_value, unsigned int blocking)
{
rb_fiber_t *fiber;
rb_thread_t *th = GET_THREAD();
@@ -1746,6 +1751,7 @@ fiber_t_alloc(VALUE fiber_value)
fiber = ZALLOC(rb_fiber_t);
fiber->cont.self = fiber_value;
fiber->cont.type = FIBER_CONTEXT;
+ fiber->blocking = blocking;
cont_init(&fiber->cont, th);
fiber->cont.saved_ec.fiber_ptr = fiber;
@@ -1763,9 +1769,9 @@ fiber_t_alloc(VALUE fiber_value)
}
static VALUE
-fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool)
+fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool, unsigned int blocking)
{
- rb_fiber_t *fiber = fiber_t_alloc(self);
+ rb_fiber_t *fiber = fiber_t_alloc(self, blocking);
fiber->first_proc = proc;
fiber->stack.base = NULL;
@@ -1793,17 +1799,66 @@ fiber_prepare_stack(rb_fiber_t *fiber)
sec->local_storage_recursive_hash_for_trace = Qnil;
}
+static struct fiber_pool *
+rb_fiber_pool_default(VALUE pool)
+{
+ return &shared_fiber_pool;
+}
+
+/* :nodoc: */
+static VALUE
+rb_fiber_initialize_kw(int argc, VALUE* argv, VALUE self, int kw_splat)
+{
+ VALUE pool = Qnil;
+ VALUE blocking = Qtrue;
+
+ if (kw_splat != RB_NO_KEYWORDS) {
+ VALUE options = Qnil;
+ VALUE arguments[2] = {Qundef};
+
+ argc = rb_scan_args_kw(kw_splat, argc, argv, ":", &options);
+ rb_get_kwargs(options, fiber_initialize_keywords, 0, 2, arguments);
+
+ blocking = arguments[0];
+ pool = arguments[1];
+ }
+
+ return fiber_initialize(self, rb_block_proc(), rb_fiber_pool_default(pool), RTEST(blocking));
+}
+
/* :nodoc: */
static VALUE
rb_fiber_initialize(int argc, VALUE* argv, VALUE self)
{
- return fiber_initialize(self, rb_block_proc(), &shared_fiber_pool);
+ return rb_fiber_initialize_kw(argc, argv, self, rb_keyword_given_p());
}
VALUE
rb_fiber_new(rb_block_call_func_t func, VALUE obj)
{
- return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), &shared_fiber_pool);
+ return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), rb_fiber_pool_default(Qnil), 1);
+}
+
+static VALUE
+rb_f_fiber_kw(int argc, VALUE* argv, int kw_splat)
+{
+ rb_thread_t * th = GET_THREAD();
+ VALUE scheduler = th->scheduler;
+ VALUE fiber = Qnil;
+
+ if (scheduler != Qnil) {
+ fiber = rb_funcall_passing_block_kw(scheduler, rb_intern("fiber"), argc, argv, kw_splat);
+ } else {
+ rb_raise(rb_eRuntimeError, "No scheduler is available!");
+ }
+
+ return fiber;
+}
+
+static VALUE
+rb_f_fiber(int argc, VALUE *argv, VALUE obj)
+{
+ return rb_f_fiber_kw(argc, argv, rb_keyword_given_p());
}
static void rb_fiber_terminate(rb_fiber_t *fiber, int need_interrupt);
@@ -1820,6 +1875,10 @@ rb_fiber_start(void)
VM_ASSERT(th->ec == ruby_current_execution_context_ptr);
VM_ASSERT(FIBER_RESUMED_P(fiber));
+ if (fiber->blocking) {
+ th->blocking += 1;
+ }
+
EC_PUSH_TAG(th->ec);
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
rb_context_t *cont = &VAR_FROM_MEMORY(fiber)->cont;
@@ -1892,6 +1951,7 @@ rb_threadptr_root_fiber_setup(rb_thread_t *th)
fiber->cont.type = FIBER_CONTEXT;
fiber->cont.saved_ec.fiber_ptr = fiber;
fiber->cont.saved_ec.thread_ptr = th;
+ fiber->blocking = 1;
fiber_status_set(fiber, FIBER_RESUMED); /* skip CREATED */
th->ec = &fiber->cont.saved_ec;
}
@@ -2044,11 +2104,15 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int is_resume, int
}
}
+ VM_ASSERT(FIBER_RUNNABLE_P(fiber));
+
if (is_resume) {
fiber->prev = fiber_current();
}
- VM_ASSERT(FIBER_RUNNABLE_P(fiber));
+ if (fiber_current()->blocking) {
+ th->blocking -= 1;
+ }
cont->argc = argc;
cont->kw_splat = kw_splat;
@@ -2060,6 +2124,10 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int is_resume, int
fiber_stack_release(fiber);
}
+ if (fiber_current()->blocking) {
+ th->blocking += 1;
+ }
+
RUBY_VM_CHECK_INTS(th->ec);
EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_FIBER_SWITCH, th->self, 0, 0, 0, Qnil);
@@ -2073,6 +2141,12 @@ rb_fiber_transfer(VALUE fiber_value, int argc, const VALUE *argv)
return fiber_switch(fiber_ptr(fiber_value), argc, argv, 0, RB_NO_KEYWORDS);
}
+VALUE
+rb_fiber_blocking_p(VALUE fiber)
+{
+ return (fiber_ptr(fiber)->blocking == 0) ? Qfalse : Qtrue;
+}
+
void
rb_fiber_close(rb_fiber_t *fiber)
{
@@ -2442,6 +2516,9 @@ Init_Cont(void)
fiber_pool_initialize(&shared_fiber_pool, stack_size, FIBER_POOL_INITIAL_SIZE, vm_stack_size);
+ fiber_initialize_keywords[0] = rb_intern_const("blocking");
+ fiber_initialize_keywords[1] = rb_intern_const("pool");
+
char * fiber_shared_fiber_pool_free_stacks = getenv("RUBY_SHARED_FIBER_POOL_FREE_STACKS");
if (fiber_shared_fiber_pool_free_stacks) {
shared_fiber_pool.free_stacks = atoi(fiber_shared_fiber_pool_free_stacks);
@@ -2452,11 +2529,14 @@ Init_Cont(void)
rb_eFiberError = rb_define_class("FiberError", rb_eStandardError);
rb_define_singleton_method(rb_cFiber, "yield", rb_fiber_s_yield, -1);
rb_define_method(rb_cFiber, "initialize", rb_fiber_initialize, -1);
+ rb_define_method(rb_cFiber, "blocking?", rb_fiber_blocking_p, 0);
rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1);
rb_define_method(rb_cFiber, "raise", rb_fiber_raise, -1);
rb_define_method(rb_cFiber, "to_s", fiber_to_s, 0);
rb_define_alias(rb_cFiber, "inspect", "to_s");
+ rb_define_global_function("Fiber", rb_f_fiber, -1);
+
#ifdef RB_EXPERIMENTAL_FIBER_POOL
rb_cFiberPool = rb_define_class("Pool", rb_cFiber);
rb_define_alloc_func(rb_cFiberPool, fiber_pool_alloc);
diff --git a/doc/fiber.rdoc b/doc/fiber.rdoc
new file mode 100644
index 0000000000..d3c19a0d14
--- /dev/null
+++ b/doc/fiber.rdoc
@@ -0,0 +1,137 @@
+= Fiber
+
+Fiber is a flow-control primitive which enable cooperative scheduling. This is
+in contrast to threads which can be preemptively scheduled at any time. While
+having a similar memory profiles, the cost of context switching fibers can be
+significantly less than threads as it does not involve a system call.
+
+== Design
+
+=== Scheduler
+
+The per-thread fiber scheduler interface is used to intercept blocking
+operations. A typical implementation would be a wrapper for a gem like
+EventMachine or Async. This design provides separation of concerns between the
+event loop implementation and application code. It also allows for layered
+schedulers which can perform instrumentation.
+
+ class Scheduler
+ # Wait for the given file descriptor to become readable.
+ def wait_readable(io)
+ end
+
+ # Wait for the given file descriptor to become writable.
+ def wait_writable(io)
+ end
+
+ # Wait for the given file descriptor to match the specified events within
+ # the specified timeout.
+ # @param event [Integer] a bit mask of +IO::WAIT_READABLE+,
+ # `IO::WAIT_WRITABLE` and `IO::WAIT_PRIORITY`.
+ # @param timeout [#to_f] the amount of time to wait for the event.
+ def wait_any(io, events, timeout)
+ end
+
+ # Sleep the current task for the specified duration, or forever if not
+ # specified.
+ # @param duration [#to_f] the amount of time to sleep.
+ def wait_sleep(duration = nil)
+ end
+
+ # The Ruby virtual machine is going to enter a system level blocking
+ # operation.
+ def enter_blocking_region
+ end
+
+ # The Ruby virtual machine has completed the system level blocking
+ # operation.
+ def exit_blocking_region
+ end
+
+ # Intercept the creation of a non-blocking fiber.
+ def fiber(&block)
+ Fiber.new(blocking: false, &block)
+ end
+
+ # Invoked when the thread exits.
+ def run
+ # Implement event loop here.
+ end
+ end
+
+On CRuby, the following extra methods need to be implemented to handle the
+public C interface:
+
+ class Scheduler
+ # Wrapper for rb_wait_readable(int) C function.
+ def wait_readable_fd(fd)
+ wait_readable(::IO.from_fd(fd, autoclose: false))
+ end
+
+ # Wrapper for rb_wait_readable(int) C function.
+ def wait_writable_fd(fd)
+ wait_writable(::IO.from_fd(fd, autoclose: false))
+ end
+
+ # Wrapper for rb_wait_for_single_fd(int) C function.
+ def wait_for_single_fd(fd, events, duration)
+ wait_any(::IO.from_fd(fd, autoclose: false), events, duration)
+ end
+ end
+
+=== Non-blocking Fibers
+
+By default fibers are blocking. Non-blocking fibers may invoke specific
+scheduler hooks when a blocking operation occurs, and these hooks may introduce
+context switching points.
+
+ Fiber.new(blocking: false) do
+ puts Fiber.current.blocking? # false
+
+ # May invoke `Thread.scheduler&.wait_readable`.
+ io.read(...)
+
+ # May invoke `Thread.scheduler&.wait_writable`.
+ io.write(...)
+
+ # Will invoke `Thread.scheduler&.wait_sleep`.
+ sleep(n)
+ end.resume
+
+We also introduce a new method which simplifes the creation of these
+non-blocking fibers:
+
+ Fiber do
+ puts Fiber.current.blocking? # false
+ end
+
+The purpose of this method is to allow the scheduler to internally decide the
+policy for when to start the fiber, and whether to use symmetric or asymmetric
+fibers.
+
+=== Mutex
+
+Locking a mutex causes the +Thread#scheduler+ to not be used while the mutex
+is held by that thread. On +Mutex#lock+, fiber switching via the scheduler
+is disabled and operations become blocking for all fibers of the same +Thread+.
+On +Mutex#unlock+, the scheduler is enabled again.
+
+ mutex = Mutex.new
+
+ puts Thread.current.blocking? # 1 (true)
+
+ Fiber.new(blocking: false) do
+ puts Thread.current.blocking? # false
+ mutex.synchronize do
+ puts Thread.current.blocking? # (1) true
+ end
+
+ puts Thread.current.blocking? # false
+ end.resume
+
+=== Non-blocking I/O
+
+By default, I/O is non-blocking. Not all operating systems support non-blocking
+I/O. Windows is a notable example where socket I/O can be non-blocking but pipe
+I/O is blocking. Provided that there *is* a scheduler and the current thread *is
+non-blocking*, the operation will invoke the scheduler.
diff --git a/ext/socket/ancdata.c b/ext/socket/ancdata.c
index 84463af061..e0fc247f8c 100644
--- a/ext/socket/ancdata.c
+++ b/ext/socket/ancdata.c
@@ -2,7 +2,6 @@
#include <time.h>
-int rsock_cmsg_cloexec_state = -1; /* <0: unknown, 0: ignored, >0: working */
static VALUE sym_wait_readable, sym_wait_writable;
#if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
@@ -1429,10 +1428,7 @@ make_io_for_unix_rights(VALUE ctl, struct cmsghdr *cmh, char *msg_end)
if (fstat(fd, &stbuf) == -1)
rb_raise(rb_eSocket, "invalid fd in SCM_RIGHTS");
rb_update_max_fd(fd);
- if (rsock_cmsg_cloexec_state < 0)
- rsock_cmsg_cloexec_state = rsock_detect_cloexec(fd);
- if (rsock_cmsg_cloexec_state == 0 || fd <= 2)
- rb_maygvl_fd_fix_cloexec(fd);
+ rb_maygvl_fd_fix_cloexec(fd);
if (S_ISSOCK(stbuf.st_mode))
io = rsock_init_sock(rb_obj_alloc(rb_cSocket), fd);
else
diff --git a/ext/socket/init.c b/ext/socket/init.c
index 6d17ecfb4e..0604e8b72f 100644
--- a/ext/socket/init.c
+++ b/ext/socket/init.c
@@ -408,84 +408,30 @@ rsock_write_nonblock(VALUE sock, VALUE str, VALUE ex)
}
#endif /* MSG_DONTWAIT_RELIABLE */
-/* returns true if SOCK_CLOEXEC is supported */
-int rsock_detect_cloexec(int fd)
+static int
+rsock_socket0(int domain, int type, int proto)
{
#ifdef SOCK_CLOEXEC
- int flags = fcntl(fd, F_GETFD);
-
- if (flags == -1)
- rb_bug("rsock_detect_cloexec: fcntl(%d, F_GETFD) failed: %s", fd, strerror(errno));
+ type |= SOCK_CLOEXEC;
+#endif
- if (flags & FD_CLOEXEC)
- return 1;
+#ifdef SOCK_NONBLOCK
+ type |= SOCK_NONBLOCK;
#endif
- return 0;
-}
-#ifdef SOCK_CLOEXEC
-static int
-rsock_socket0(int domain, int type, int proto)
-{
- int ret;
- static int cloexec_state = -1; /* <0: unknown, 0: ignored, >0: working */
-
- if (cloexec_state > 0) { /* common path, if SOCK_CLOEXEC is defined */
- ret = socket(domain, type|SOCK_CLOEXEC|RSOCK_NONBLOCK_DEFAULT, proto);
- if (ret >= 0) {
- if (ret <= 2)
- goto fix_cloexec;
- goto update_max_fd;
- }
- }
- else if (cloexec_state < 0) { /* usually runs once only for detection */
- ret = socket(domain, type|SOCK_CLOEXEC|RSOCK_NONBLOCK_DEFAULT, proto);
- if (ret >= 0) {
- cloexec_state = rsock_detect_cloexec(ret);
- if (cloexec_state == 0 || ret <= 2)
- goto fix_cloexec;
- goto update_max_fd;
- }
- else if (ret == -1 && errno == EINVAL) {
- /* SOCK_CLOEXEC is available since Linux 2.6.27. Linux 2.6.18 fails with EINVAL */
- ret = socket(domain, type, proto);
- if (ret != -1) {
- cloexec_state = 0;
- /* fall through to fix_cloexec */
- }
- }
- }
- else { /* cloexec_state == 0 */
- ret = socket(domain, type, proto);
- }
- if (ret == -1)
+ int result = socket(domain, type, proto);
+
+ if (result == -1)
return -1;
-fix_cloexec:
- rb_maygvl_fd_fix_cloexec(ret);
- if (RSOCK_NONBLOCK_DEFAULT) {
- rsock_make_fd_nonblock(ret);
- }
-update_max_fd:
- rb_update_max_fd(ret);
- return ret;
-}
-#else /* !SOCK_CLOEXEC */
-static int
-rsock_socket0(int domain, int type, int proto)
-{
- int ret = socket(domain, type, proto);
+ rb_fd_fix_cloexec(result);
- if (ret == -1)
- return -1;
- rb_fd_fix_cloexec(ret);
- if (RSOCK_NONBLOCK_DEFAULT) {
- rsock_make_fd_nonblock(ret);
- }
+#ifndef SOCK_NONBLOCK
+ rsock_make_fd_nonblock(result);
+#endif
- return ret;
+ return result;
}
-#endif /* !SOCK_CLOEXEC */
int
rsock_socket(int domain, int type, int proto)
@@ -637,6 +583,10 @@ rsock_connect(int fd, const struct sockaddr *sockaddr, int len, int socks)
void
rsock_make_fd_nonblock(int fd)
{
+#ifdef _WIN32
+ return;
+#endif
+
int flags;
#ifdef F_GETFL
flags = fcntl(fd, F_GETFL);
@@ -653,56 +603,34 @@ rsock_make_fd_nonblock(int fd)
}
static int
-cloexec_accept(int socket, struct sockaddr *address, socklen_t *address_len,
- int nonblock)
+cloexec_accept(int socket, struct sockaddr *address, socklen_t *address_len)
{
- int ret;
socklen_t len0 = 0;
-#ifdef HAVE_ACCEPT4
- static int try_accept4 = 1;
-#endif
- if (RSOCK_NONBLOCK_DEFAULT) {
- nonblock = 1;
- }
if (address_len) len0 = *address_len;
+
#ifdef HAVE_ACCEPT4
- if (try_accept4) {
- int flags = 0;
-#ifdef SOCK_CLOEXEC
- flags |= SOCK_CLOEXEC;
-#endif
+ int flags = SOCK_CLOEXEC;
+
#ifdef SOCK_NONBLOCK
- if (nonblock) {
- flags |= SOCK_NONBLOCK;
- }
+ flags |= SOCK_NONBLOCK;
#endif
- ret = accept4(socket, address, address_len, flags);
- /* accept4 is available since Linux 2.6.28, glibc 2.10. */
- if (ret != -1) {
- if (ret <= 2)
- rb_maygvl_fd_fix_cloexec(ret);
+
+ int result = accept4(socket, address, address_len, flags);
+ if (result == -1) return -1;
+
#ifndef SOCK_NONBLOCK
- if (nonblock) {
- rsock_make_fd_nonblock(ret);
- }
+ rsock_make_fd_nonblock(result);
#endif
- if (address_len && len0 < *address_len) *address_len = len0;
- return ret;
- }
- if (errno != ENOSYS) {
- return -1;
- }
- try_accept4 = 0;
- }
+#else
+ int result = accept(socket, address, address_len);
+ if (result == -1) return -1;
+
+ rb_maygvl_fd_fix_cloexec(result);
+ rsock_make_fd_nonblock(result);
#endif
- ret = accept(socket, address, address_len);
- if (ret == -1) return -1;
+
if (address_len && len0 < *address_len) *address_len = len0;
- rb_maygvl_fd_fix_cloexec(ret);
- if (nonblock) {
- rsock_make_fd_nonblock(ret);
- }
- return ret;
+ return result;
}
VALUE
@@ -712,7 +640,7 @@ rsock_s_accept_nonblock(VALUE klass, VALUE ex, rb_io_t *fptr,
int fd2;
rb_io_set_nonblock(fptr);
- fd2 = cloexec_accept(fptr->fd, (struct sockaddr*)sockaddr, len, 1);
+ fd2 = cloexec_accept(fptr->fd, (struct sockaddr*)sockaddr, len);
if (fd2 < 0) {
int e = errno;
switch (e) {
@@ -744,7 +672,7 @@ static VALUE
accept_blocking(void *data)
{
struct accept_arg *arg = data;
- return (VALUE)cloexec_accept(arg->fd, arg->sockaddr, arg->len, 0);
+ return (VALUE)cloexec_accept(arg->fd, arg->sockaddr, arg->len);
}
VALUE
diff --git a/ext/socket/rubysocket.h b/ext/socket/rubysocket.h
index 867e215273..fcea2a5a05 100644
--- a/ext/socket/rubysocket.h
+++ b/ext/socket/rubysocket.h
@@ -36,13 +36,7 @@
# if defined(_MSC_VER)
# undef HAVE_TYPE_STRUCT_SOCKADDR_DL
# endif
-/*
- * FIXME: failures if we make nonblocking the default
- * [ruby-core:89973] [ruby-core:89976] [ruby-core:89977] [Bug #14968]
- */
-# define RSOCK_NONBLOCK_DEFAULT (0)
#else
-# define RSOCK_NONBLOCK_DEFAULT (0)
# include <sys/socket.h>
# include <netinet/in.h>
# ifdef HAVE_NETINET_IN_SYSTM_H
@@ -260,7 +254,6 @@ typedef union {
#define INET_SOCKS 2
extern int rsock_do_not_reverse_lookup;
-extern int rsock_cmsg_cloexec_state;
#define FMODE_NOREVLOOKUP 0x100
/* common socket families only */
diff --git a/ext/socket/socket.c b/ext/socket/socket.c
index bfeb30340c..e4504620fb 100644
--- a/ext/socket/socket.c
+++ b/ext/socket/socket.c
@@ -168,93 +168,47 @@ pair_yield(VALUE pair)
#endif
#if defined HAVE_SOCKETPAIR
-
-#ifdef SOCK_CLOEXEC
static int
-rsock_socketpair0(int domain, int type, int protocol, int sv[2])
+rsock_socketpair0(int domain, int type, int protocol, int descriptors[2])
{
- int ret;
- static int cloexec_state = -1; /* <0: unknown, 0: ignored, >0: working */
- static const int default_flags = SOCK_CLOEXEC|RSOCK_NONBLOCK_DEFAULT;
+#ifdef SOCK_CLOEXEC
+ type |= SOCK_CLOEXEC;
+#endif
- if (cloexec_state > 0) { /* common path, if SOCK_CLOEXEC is defined */
- ret = socketpair(domain, type|default_flags, protocol, sv);
- if (ret == 0 && (sv[0] <= 2 || sv[1] <= 2)) {
- goto fix_cloexec; /* highly unlikely */
- }
- goto update_max_fd;
- }
- else if (cloexec_state < 0) { /* usually runs once only for detection */
- ret = socketpair(domain, type|default_flags, protocol, sv);
- if (ret == 0) {
- cloexec_state = rsock_detect_cloexec(sv[0]);
- if ((cloexec_state == 0) || (sv[0] <= 2 || sv[1] <= 2))
- goto fix_cloexec;
- goto update_max_fd;
- }
- else if (ret == -1 && errno == EINVAL) {
- /* SOCK_CLOEXEC is available since Linux 2.6.27. Linux 2.6.18 fails with EINVAL */
- ret = socketpair(domain, type, protocol, sv);
- if (ret != -1) {
- /* The reason of EINVAL may be other than SOCK_CLOEXEC.
- * So disable SOCK_CLOEXEC only if socketpair() succeeds without SOCK_CLOEXEC.
- * Ex. Socket.pair(:UNIX, 0xff) fails with EINVAL.
- */
- cloexec_state = 0;
- }
- }
- }
- else { /* cloexec_state == 0 */
- ret = socketpair(domain, type, protocol, sv);
- }
- if (ret == -1) {
- return -1;
- }
+#ifdef SOCK_NONBLOCK
+ type |= SOCK_NONBLOCK;
+#endif
-fix_cloexec:
- rb_maygvl_fd_fix_cloexec(sv[0]);
- rb_maygvl_fd_fix_cloexec(sv[1]);
- if (RSOCK_NONBLOCK_DEFAULT) {
- rsock_make_fd_nonblock(sv[0]);
- rsock_make_fd_nonblock(sv[1]);
- }
+ int result = socketpair(domain, type, protocol, descriptors);
-update_max_fd:
- rb_update_max_fd(sv[0]);
- rb_update_max_fd(sv[1]);
+ if (result == -1)
+ return -1;
- return ret;
-}
-#else /* !SOCK_CLOEXEC */
-static int
-rsock_socketpair0(int domain, int type, int protocol, int sv[2])
-{
- int ret = socketpair(domain, type, protocol, sv);
+#ifndef SOCK_CLOEXEC
+ rb_fd_fix_cloexec(descriptors[0]);
+ rb_fd_fix_cloexec(descriptors[1]);
+#endif
- if (ret == -1)
- return -1;
+#ifndef SOCK_NONBLOCK
+ rsock_make_fd_nonblock(descriptors[0]);
+ rsock_make_fd_nonblock(descriptors[1]);
+#endif
- rb_fd_fix_cloexec(sv[0]);
- rb_fd_fix_cloexec(sv[1]);
- if (RSOCK_NONBLOCK_DEFAULT) {
- rsock_make_fd_nonblock(sv[0]);
- rsock_make_fd_nonblock(sv[1]);
- }
- return ret;
+ return result;
}
-#endif /* !SOCK_CLOEXEC */
static int
-rsock_socketpair(int domain, int type, int protocol, int sv[2])
+rsock_socketpair(int domain, int type, int protocol, int descriptors[2])
{
- int ret;
+ int result;
+
+ result = rsock_socketpair0(domain, type, protocol, descriptors);
- ret = rsock_socketpair0(domain, type, protocol, sv);
- if (ret < 0 && rb_gc_for_fd(errno)) {
- ret = rsock_socketpair0(domain, type, protocol, sv);
+ if (result < 0 && rb_gc_for_fd(errno)) {
+ result = rsock_socketpair0(domain, type, protocol, descriptors);
}
- return ret;
+ return result;
}
/*
diff --git a/ext/socket/unixsocket.c b/ext/socket/unixsocket.c
index 0c3a01d21e..53a50958ed 100644
--- a/ext/socket/unixsocket.c
+++ b/ext/socket/unixsocket.c
@@ -455,11 +455,7 @@ retry:
#endif
rb_update_max_fd(fd);
-
- if (rsock_cmsg_cloexec_state < 0)
- rsock_cmsg_cloexec_state = rsock_detect_cloexec(fd);
- if (rsock_cmsg_cloexec_state == 0 || fd <= 2)
- rb_maygvl_fd_fix_cloexec(fd);
+ rb_maygvl_fd_fix_cloexec(fd);
if (klass == Qnil)
return INT2FIX(fd);
diff --git a/include/ruby/internal/intern/cont.h b/include/ruby/internal/intern/cont.h
index 6988f753c3..cfa5630af2 100644
--- a/include/ruby/internal/intern/cont.h
+++ b/include/ruby/internal/intern/cont.h
@@ -28,6 +28,7 @@ RBIMPL_SYMBOL_EXPORT_BEGIN()
/* cont.c */
VALUE rb_fiber_new(rb_block_call_func_t, VALUE);
+VALUE rb_fiber_new_kw(rb_block_call_func_t, VALUE, int kw_splat);
VALUE rb_fiber_resume(VALUE fib, int argc, const VALUE *argv);
VALUE rb_fiber_resume_kw(VALUE fib, int argc, const VALUE *argv, int kw_splat);
VALUE rb_fiber_yield(int argc, const VALUE *argv);
diff --git a/include/ruby/internal/intern/thread.h b/include/ruby/internal/intern/thread.h
index a12a371058..4a840cd881 100644
--- a/include/ruby/internal/intern/thread.h
+++ b/include/ruby/internal/intern/thread.h
@@ -71,6 +71,10 @@ VALUE rb_mutex_unlock(VALUE mutex);
VALUE rb_mutex_sleep(VALUE self, VALUE timeout);
VALUE rb_mutex_synchronize(VALUE mutex, VALUE (*func)(VALUE arg), VALUE arg);
+VALUE rb_thread_scheduler_get(VALUE);
+VALUE rb_thread_scheduler_set(VALUE, VALUE);
+VALUE rb_current_thread_scheduler(void);
+
RBIMPL_SYMBOL_EXPORT_END()
#endif /* RBIMPL_INTERN_THREAD_H */
diff --git a/io.c b/io.c
index fc817f96b2..26202e0806 100644
--- a/io.c
+++ b/io.c
@@ -177,15 +177,6 @@ off_t __syscall(quad_t number, ...);
#define rename(f, t) rb_w32_urename((f), (t))
#endif
-#if defined(_WIN32)
-# define RUBY_PIPE_NONBLOCK_DEFAULT (0)
-#elif defined(O_NONBLOCK)
- /* disabled for [Bug #15356] (Rack::Deflater + rails) failure: */
-# define RUBY_PIPE_NONBLOCK_DEFAULT (0)
-#else /* any platforms where O_NONBLOCK does not exist? */
-# define RUBY_PIPE_NONBLOCK_DEFAULT (0)
-#endif
-
VALUE rb_cIO;
VALUE rb_eEOFError;
VALUE rb_eIOError;
@@ -406,44 +397,37 @@ rb_fd_set_nonblock(int fd)
}
int
-rb_cloexec_pipe(int fildes[2])
+rb_cloexec_pipe(int descriptors[2])
{
- int ret;
-
-#if defined(HAVE_PIPE2)
- static int try_pipe2 = 1;
- if (try_pipe2) {
- ret = pipe2(fildes, O_CLOEXEC | RUBY_PIPE_NONBLOCK_DEFAULT);
- if (ret != -1)
- return ret;
- /* pipe2 is available since Linux 2.6.27, glibc 2.9. */
- if (errno == ENOSYS) {
- try_pipe2 = 0;
- ret = pipe(fildes);
- }
- }
- else {
- ret = pipe(fildes);
- }
+#ifdef HAVE_PIPE2
+ int result = pipe2(descriptors, O_CLOEXEC | O_NONBLOCK);
#else
- ret = pipe(fildes);
+ int result = pipe(descriptors);
#endif
- if (ret < 0) return ret;
+
+ if (result < 0)
+ return result;
+
#ifdef __CYGWIN__
- if (ret == 0 && fildes[1] == -1) {
- close(fildes[0]);
- fildes[0] = -1;
- errno = ENFILE;
- return -1;
+ if (ret == 0 && descriptors[1] == -1) {
+ close(descriptors[0]);
+ descriptors[0] = -1;
+ errno = ENFILE;
+ return -1;
}
#endif
- rb_maygvl_fd_fix_cloexec(fildes[0]);
- rb_maygvl_fd_fix_cloexec(fildes[1]);
- if (RUBY_PIPE_NONBLOCK_DEFAULT) {
- rb_fd_set_nonblock(fildes[0]);
- rb_fd_set_nonblock(fildes[1]);
- }
- return ret;
+
+#ifndef HAVE_PIPE2
+ rb_maygvl_fd_fix_cloexec(descriptors[0]);
+ rb_maygvl_fd_fix_cloexec(descriptors[1]);
+
+#ifndef _WIN32
+ rb_fd_set_nonblock(descriptors[0]);
+ rb_fd_set_nonblock(descriptors[1]);
+#endif
+#endif
+
+ return result;
}
int
@@ -1270,6 +1254,12 @@ io_fflush(rb_io_t *fptr)
int
rb_io_wait_readable(int f)
{
+ VALUE scheduler = rb_current_thread_scheduler();
+ if (scheduler != Qnil) {
+ VALUE result = rb_funcall(scheduler, rb_intern("wait_readable_fd"), 1, INT2NUM(f));
+ return RTEST(result);
+ }
+
io_fd_check_closed(f);
switch (errno) {
case EINTR:
@@ -1294,6 +1284,12 @@ rb_io_wait_readable(int f)
int
rb_io_wait_writable(int f)
{
+ VALUE scheduler = rb_current_thread_scheduler();
+ if (scheduler != Qnil) {
+ VALUE result = rb_funcall(scheduler, rb_intern("wait_writable_fd"), 1, INT2NUM(f));
+ return RTEST(result);
+ }
+
io_fd_check_closed(f);
switch (errno) {
case EINTR:
@@ -10897,6 +10893,23 @@ maygvl_copy_stream_continue_p(int has_gvl, struct copy_stream_struct *stp)
return FALSE;
}
+struct wait_for_single_fd {
+ VALUE scheduler;
+
+ int fd;
+ short events;
+
+ VALUE result;
+};
+
+void * rb_thread_scheduler_wait_for_single_fd(void * _args) {
+ struct wait_for_single_fd *args = (struct wait_for_single_fd *)_args;
+
+ args->result = rb_funcall(args->scheduler, rb_intern("wait_for_single_fd"), 3, INT2NUM(args->fd), INT2NUM(args->events), Qnil);
+
+ return NULL;
+}
+
#if USE_POLL
# define IOWAIT_SYSCALL "poll"
STATIC_ASSERT(pollin_expected, POLLIN == RB_WAITFD_IN);
@@ -10904,6 +10917,13 @@ STATIC_ASSERT(pollout_expected, POLLOUT == RB_WAITFD_OUT);
static int
nogvl_wait_for_single_fd(int fd, short events)
{
+ VALUE scheduler = rb_current_thread_scheduler();
+ if (scheduler != Qnil) {
+ struct wait_for_single_fd args = {.scheduler = scheduler, .fd = fd, .events = events};
+ rb_thread_call_with_gvl(rb_thread_scheduler_wait_for_single_fd, &args);
+ return RTEST(args.result);
+ }
+
struct pollfd fds;
fds.fd = fd;
@@ -10916,6 +10936,13 @@ nogvl_wait_for_single_fd(int fd, short events)
static int
nogvl_wait_for_single_fd(int fd, short events)
{
+ VALUE scheduler = rb_current_thread_scheduler();
+ if (scheduler != Qnil) {
+ struct wait_for_single_fd args = {.scheduler = scheduler, .fd = fd, .events = events};
+ rb_thread_call_with_gvl(rb_thread_scheduler_wait_for_single_fd, &args);
+ return RTEST(args.result);
+ }
+
rb_fdset_t fds;
int ret;
@@ -13283,6 +13310,10 @@ Init_IO(void)
rb_cIO = rb_define_class("IO", rb_cObject);
rb_include_module(rb_cIO, rb_mEnumerable);
+ rb_define_const(rb_cIO, "WAIT_READABLE", INT2NUM(RB_WAITFD_IN));
+ rb_define_const(rb_cIO, "WAIT_PRIORITY", INT2NUM(RB_WAITFD_PRI));
+ rb_define_const(rb_cIO, "WAIT_WRITABLE", INT2NUM(RB_WAITFD_OUT));
+
/* exception to wait for reading. see IO.select. */
rb_mWaitReadable = rb_define_module_under(rb_cIO, "WaitReadable");
/* exception to wait for writing. see IO.select. */
diff --git a/process.c b/process.c
index 74301bba91..ee5b164dd7 100644
--- a/process.c
+++ b/process.c
@@ -4892,9 +4892,14 @@ rb_f_spawn(int argc, VALUE *argv, VALUE _)
static VALUE
rb_f_sleep(int argc, VALUE *argv, VALUE _)
{
- time_t beg, end;
+ VALUE scheduler = rb_current_thread_scheduler();
- beg = time(0);
+ if (scheduler != Qnil) {
+ VALUE result = rb_funcallv(scheduler, rb_intern("wait_sleep"), argc, argv);
+ return RTEST(result);
+ }
+
+ time_t beg = time(0);
if (argc == 0) {
rb_thread_sleep_forever();
}
@@ -4903,7 +4908,7 @@ rb_f_sleep(int argc, VALUE *argv, VALUE _)
rb_thread_wait_for(rb_time_interval(argv[0]));
}
- end = time(0) - beg;
+ time_t end = time(0) - beg;
return INT2FIX(end);
}
diff --git a/spec/ruby/library/socket/basicsocket/read_nonblock_spec.rb b/spec/ruby/library/socket/basicsocket/read_nonblock_spec.rb
index b8def7e930..df44a50afa 100644
--- a/spec/ruby/library/socket/basicsocket/read_nonblock_spec.rb
+++ b/spec/ruby/library/socket/basicsocket/read_nonblock_spec.rb
@@ -24,7 +24,7 @@ describe "BasicSocket#read_nonblock" do
platform_is :linux do
it 'does not set the IO in nonblock mode' do
require 'io/nonblock'
- @r.should_not.nonblock?
+ @r.nonblock = false
IO.select([@r], nil, nil, 2)
@r.read_nonblock(3).should == "aaa"
@r.should_not.nonblock?
@@ -34,7 +34,7 @@ describe "BasicSocket#read_nonblock" do
platform_is_not :linux, :windows do
it 'sets the IO in nonblock mode' do
require 'io/nonblock'
- @r.should_not.nonblock?
+ @r.nonblock = false
IO.select([@r], nil, nil, 2)
@r.read_nonblock(3).should == "aaa"
@r.should.nonblock?
diff --git a/spec/ruby/library/socket/basicsocket/write_nonblock_spec.rb b/spec/ruby/library/socket/basicsocket/write_nonblock_spec.rb
index c385beab24..523e732959 100644
--- a/spec/ruby/library/socket/basicsocket/write_nonblock_spec.rb
+++ b/spec/ruby/library/socket/basicsocket/write_nonblock_spec.rb
@@ -25,7 +25,7 @@ describe "BasicSocket#write_nonblock" do
platform_is :linux do
it 'does not set the IO in nonblock mode' do
require 'io/nonblock'
- @w.should_not.nonblock?
+ @w.nonblock = false
@w.write_nonblock("aaa").should == 3
@w.should_not.nonblock?
end
@@ -34,7 +34,7 @@ describe "BasicSocket#write_nonblock" do
platform_is_not :linux, :windows do
it 'sets the IO in nonblock mode' do
require 'io/nonblock'
- @w.should_not.nonblock?
+ @w.nonblock = false
@w.write_nonblock("aaa").should == 3
@w.should.nonblock?
end
diff --git a/test/ruby/test_fiber.rb b/test/ruby/test_fiber.rb
index 7070fdf03c..4d103a7f76 100644
--- a/test/ruby/test_fiber.rb
+++ b/test/ruby/test_fiber.rb
@@ -347,51 +347,6 @@ class TestFiber < Test::Unit::TestCase
EOS
end
- def invoke_rec script, vm_stack_size, machine_stack_size, use_length = true
- env = {}
- env['RUBY_FIBER_VM_STACK_SIZE'] = vm_stack_size.to_s if vm_stack_size
- env['RUBY_FIBER_MACHINE_STACK_SIZE'] = machine_stack_size.to_s if machine_stack_size
- out = Dir.mktmpdir("test_fiber") {|tmpdir|
- out, err, status = EnvUtil.invoke_ruby([env, '-e', script], '', true, true, chdir: tmpdir, timeout: 30)
- assert(!status.signaled?, FailDesc[status, nil, err])
- out
- }
- use_length ? out.length : out
- end
-
- def test_stack_size
- skip 'too unstable on riscv' if RUBY_PLATFORM =~ /riscv/
- h_default = eval(invoke_rec('p RubyVM::DEFAULT_PARAMS', nil, nil, false))
- h_0 = eval(invoke_rec('p RubyVM::DEFAULT_PARAMS', 0, 0, false))
- h_large = eval(invoke_rec('p RubyVM::DEFAULT_PARAMS', 1024 * 1024 * 5, 1024 * 1024 * 10, false))
-
- assert_operator(h_default[:fiber_vm_stack_size], :>, h_0[:fiber_vm_stack_size])
- assert_operator(h_default[:fiber_vm_stack_size], :<, h_large[:fiber_vm_stack_size])
- assert_operator(h_default[:fiber_machine_stack_size], :>=, h_0[:fiber_machine_stack_size])
- assert_operator(h_default[:fiber_machine_stack_size], :<=, h_large[:fiber_machine_stack_size])
-
- # check VM machine stack size
- script = '$stdout.sync=true; def rec; print "."; rec; end; Fiber.new{rec}.resume'
- size_default = invoke_rec script, nil, nil
- assert_operator(size_default, :>, 0)
- size_0 = invoke_rec script, 0, nil
- assert_operator(size_default, :>, size_0)
- size_large = invoke_rec script, 1024 * 1024 * 5, nil
- assert_operator(size_default, :<, size_large)
-
- return if /mswin|mingw/ =~ RUBY_PLATFORM
-
- # check machine stack size
- # Note that machine stack size may not change size (depend on OSs)
- script = '$stdout.sync=true; def rec; print "."; 1.times{1.times{1.times{rec}}}; end; Fiber.new{rec}.resume'
- vm_stack_size = 1024 * 1024
- size_default = invoke_rec script, vm_stack_size, nil
- size_0 = invoke_rec script, vm_stack_size, 0
- assert_operator(size_default, :>=, size_0)
- size_large = invoke_rec script, vm_stack_size, 1024 * 1024 * 10
- assert_operator(size_default, :<=, size_large)
- end
-
def test_separate_lastmatch
bug7678 = '[ruby-core:51331]'
/a/ =~ "a"
diff --git a/test/ruby/test_stack.rb b/test/ruby/test_stack.rb
new file mode 100644
index 0000000000..6657b9e83c
--- /dev/null
+++ b/test/ruby/test_stack.rb
@@ -0,0 +1,81 @@
+# frozen_string_literal: false
+require 'test/unit'
+require 'tmpdir'
+
+class TestStack < Test::Unit::TestCase
+ LARGE_VM_STACK_SIZE = 1024*1024*5
+ LARGE_MACHINE_STACK_SIZE = 1024*1024*10
+
+ def initialize(*)
+ super
+
+ @h_default = nil
+ @h_0 = nil
+ @h_large = nil
+ end
+
+ def invoke_ruby script, vm_stack_size: nil, machine_stack_size: nil
+ env = {}
+ env['RUBY_FIBER_VM_STACK_SIZE'] = vm_stack_size.to_s if vm_stack_size
+ env['RUBY_FIBER_MACHINE_STACK_SIZE'] = machine_stack_size.to_s if machine_stack_size
+
+ stdout, stderr, status = EnvUtil.invoke_ruby([env, '-e', script], '', true, true, timeout: 30)
+ assert(!status.signaled?, FailDesc[status, nil, stderr])
+
+ return stdout
+ end
+
+ def h_default
+ @h_default ||= eval(invoke_ruby('p RubyVM::DEFAULT_PARAMS'))
+ end
+
+ def h_0
+ @h_0 ||= eval(invoke_ruby('p RubyVM::DEFAULT_PARAMS',
+ vm_stack_size: 0,
+ machine_stack_size: 0
+ ))
+ end
+
+ def h_large
+ @h_large ||= eval(invoke_ruby('p RubyVM::DEFAULT_PARAMS',
+ vm_stack_size: LARGE_VM_STACK_SIZE,
+ machine_stack_size: LARGE_MACHINE_STACK_SIZE
+ ))
+ end
+
+ def test_relative_stack_sizes
+ assert_operator(h_default[:fiber_vm_stack_size], :>, h_0[:fiber_vm_stack_size])
+ assert_operator(h_default[:fiber_vm_stack_size], :<, h_large[:fiber_vm_stack_size])
+ assert_operator(h_default[:fiber_machine_stack_size], :>=, h_0[:fiber_machine_stack_size])
+ assert_operator(h_default[:fiber_machine_stack_size], :<=, h_large[:fiber_machine_stack_size])
+ end
+
+ def test_vm_stack_size
+ script = '$stdout.sync=true; def rec; print "."; rec; end; Fiber.new{rec}.resume'
+
+ size_default = invoke_ruby(script).bytesize
+ assert_operator(size_default, :>, 0)
+
+ size_0 = invoke_ruby(script, vm_stack_size: 0).bytesize
+ assert_operator(size_default, :>, size_0)
+
+ size_large = invoke_ruby(script, vm_stack_size: LARGE_VM_STACK_SIZE).bytesize
+ assert_operator(size_default, :<, size_large)
+ end
+
+ # Depending on OS, machine stack size may not change size.
+ def test_machine_stack_size
+ return if /mswin|mingw/ =~ RUBY_PLATFORM
+
+ script = '$stdout.sync=true; def rec; print "."; 1.times{1.times{1.times{rec}}}; end; Fiber.new{rec}.resume'
+
+ vm_stack_size = 1024 * 1024
+ size_default = invoke_ruby(script, vm_stack_size: vm_stack_size).bytesize
+
+ size_0 = invoke_ruby(script, vm_stack_size: vm_stack_size, machine_stack_size: 0).bytesize
+ assert_operator(size_default, :>=, size_0)
+
+ size_large = invoke_ruby(script, vm_stack_size: vm_stack_size, machine_stack_size: LARGE_MACHINE_STACK_SIZE).bytesize
+ assert_operator(size_default, :<=, size_large)
+ end
+end
diff --git a/test/scheduler/http.rb b/test/scheduler/http.rb
new file mode 100755
index 0000000000..e2a007bc84
--- /dev/null
+++ b/test/scheduler/http.rb
@@ -0,0 +1,53 @@
+
+require 'benchmark'
+
+TOPICS = ["cats", "dogs", "pigs", "skeletons", "zombies", "ocelots", "villagers", "pillagers"]
+
+require 'net/http'
+require 'uri'
+require 'json'
+
+require_relative 'scheduler'
+
+def fetch_topics(topics)
+ responses = {}
+
+ topics.each do |topic|
+ Fiber.new(blocking: Fiber.current.blocking?) do
+ uri = URI("https://www.google.com/search?q=#{topic}")
+ responses[topic] = Net::HTTP.get(uri).scan(topic).size
+ end.resume
+ end
+
+ Thread.scheduler&.run
+
+ return responses
+end
+
+def sweep(repeats: 3, **options)
+ times = (1..8).map do |i|
+ $stderr.puts "Measuring #{i} topic(s)..."
+ topics = TOPICS[0...i]
+
+ Thread.new do
+ Benchmark.realtime do
+ scheduler = Scheduler.new
+ Thread.current.scheduler = scheduler
+
+ repeats.times do
+ Fiber.new(**options) do
+ pp fetch_topics(topics)
+ end.resume
+
+ scheduler.run
+ end
+ end
+ end.value / repeats
+ end
+
+ puts options.inspect
+ puts JSON.dump(times.map{|value| value.round(3)})
+end
+
+sweep(blocking: true)
+sweep(blocking: false)
diff --git a/test/scheduler/scheduler.rb b/test/scheduler/scheduler.rb
new file mode 100644
index 0000000000..b2d36cc728
--- /dev/null
+++ b/test/scheduler/scheduler.rb
@@ -0,0 +1,163 @@
+# frozen_string_literal: true
+
+require 'fiber'
+
+begin
+ require 'io/nonblock'
+rescue LoadError
+ # Ignore.
+end
+
+class Scheduler
+ def initialize
+ @readable = {}
+ @writable = {}
+ @waiting = {}
+ @blocking = []
+
+ @ios = ObjectSpace::WeakMap.new
+ end
+
+ attr :fiber
+
+ attr :readable
+ attr :writable
+ attr :waiting
+ attr :blocking
+
+ def next_timeout
+ fiber, timeout = @waiting.min_by{|key, value| value}
+
+ if timeout
+ offset = timeout - current_time
+
+ if offset < 0
+ return 0
+ else
+ return offset
+ end
+ end
+ end
+
+ def run
+ while @readable.any? or @writable.any? or @waiting.any?
+ # Can only handle file descriptors up to 1024...
+ readable, writable = IO.select(@readable.keys, @writable.keys, [], next_timeout)
+
+ # puts "readable: #{readable}" if readable&.any?
+ # puts "writable: #{writable}" if writable&.any?
+
+ readable&.each do |io|
+ @readable[io]&.resume
+ end
+
+ writable&.each do |io|
+ @writable[io]&.resume
+ end
+
+ if @waiting.any?
+ time = current_time
+ waiting = @waiting
+ @waiting = {}
+
+ waiting.each do |fiber, timeout|
+ if timeout <= time
+ fiber.resume
+ else
+ @waiting[fiber] = timeout
+ end
+ end
+ end
+ end
+ end
+
+ def for_fd(fd)
+ @ios[fd] ||= ::IO.for_fd(fd, autoclose: false)
+ end
+
+ def wait_readable(io)
+ @readable[io] = Fiber.current
+
+ Fiber.yield
+
+ @readable.delete(io)
+
+ return true
+ end
+
+ def wait_readable_fd(fd)
+ wait_readable(
+ for_fd(fd)
+ )
+ end
+
+ def wait_writable(io)
+ @writable[io] = Fiber.current
+
+ Fiber.yield
+
+ @writable.delete(io)
+
+ return true
+ end
+
+ def wait_writable_fd(fd)
+ wait_writable(
+ for_fd(fd)
+ )
+ end
+
+ def current_time
+ Process.clock_gettime(Process::CLOCK_MONOTONIC)
+ end
+
+ def wait_sleep(duration = nil)
+ @waiting[Fiber.current] = current_time + duration
+
+ Fiber.yield
+
+ return true
+ end
+
+ def wait_any(io, events, duration)
+ unless (events & IO::WAIT_READABLE).zero?
+ @readable[io] = Fiber.current
+ end
+
+ unless (events & IO::WAIT_WRITABLE).zero?
+ @writable[io] = Fiber.current
+ end
+
+ Fiber.yield
+
+ @readable.delete(io)
+ @writable.delete(io)
+
+ return true
+ end
+
+ def wait_for_single_fd(fd, events, duration)
+ wait_any(
+ for_fd(fd),
+ events,
+ duration
+ )
+ end
+
+ def enter_blocking_region
+ # puts "Enter blocking region: #{caller.first}"
+ end
+
+ def exit_blocking_region
+ # puts "Exit blocking region: #{caller.first}"
+ @blocking << caller.first
+ end
+
+ def fiber(&block)
+ fiber = Fiber.new(blocking: false, &block)
+
+ fiber.resume
+
+ return fiber
+ end
+end
diff --git a/test/scheduler/test_enumerator.rb b/test/scheduler/test_enumerator.rb
new file mode 100644
index 0000000000..7c97382c52
--- /dev/null
+++ b/test/scheduler/test_enumerator.rb
@@ -0,0 +1,45 @@
+# frozen_string_literal: true
+require 'test/unit'
+require 'socket'
+require_relative 'scheduler'
+
+class TestSchedulerEnumerator < Test::Unit::TestCase
+ MESSAGE = "Hello World"
+
+ def test_read_characters
+ skip unless defined?(UNIXSocket)
+
+ i, o = UNIXSocket.pair
+ skip unless i.nonblock? && o.nonblock?
+
+ message = String.new
+
+ thread = Thread.new do
+ scheduler = Scheduler.new
+ Thread.current.scheduler = scheduler
+
+ e = i.to_enum(:each_char)
+
+ Fiber do
+ o.write("Hello World")
+ o.close
+ end
+
+ Fiber do
+ begin
+ while c = e.next
+ message << c
+ end
+ rescue StopIteration
+ # Ignore.
+ end
+
+ i.close
+ end
+ end
+
+ thread.join
+
+ assert_equal(MESSAGE, message)
+ end
+end
diff --git a/test/scheduler/test_fiber.rb b/test/scheduler/test_fiber.rb
new file mode 100644
index 0000000000..3452591cd9
--- /dev/null
+++ b/test/scheduler/test_fiber.rb
@@ -0,0 +1,29 @@
+# frozen_string_literal: true
+require 'test/unit'
+require_relative 'scheduler'
+
+class TestSchedulerFiber < Test::Unit::TestCase
+ def test_fiber_without_scheduler
+ # Cannot create fiber without scheduler.
+ assert_raise RuntimeError do
+ Fiber do
+ end
+ end
+ end
+
+ def test_fiber_blocking
+ scheduler = Scheduler.new
+
+ thread = Thread.new do
+ Thread.current.scheduler = scheduler
+
+ # Close is always a blocking operation.
+ IO.pipe.each(&:close)
+ end
+
+ thread.join
+
+ assert_not_empty scheduler.blocking
+ assert_match /test_fiber.rb:\d+:in `close'/, scheduler.blocking.last
+ end
+end
diff --git a/test/scheduler/test_http.rb b/test/scheduler/test_http.rb
new file mode 100644
index 0000000000..82aa73ca35
--- /dev/null
+++ b/test/scheduler/test_http.rb
@@ -0,0 +1,28 @@
+# frozen_string_literal: true
+
+require 'net/http'
+require 'uri'
+require 'openssl'
+
+require 'test/unit'
+require_relative 'scheduler'
+
+class TestSchedulerHTTP < Test::Unit::TestCase
+ def test_get
+ Thread.new do
+ scheduler = Scheduler.new
+ Thread.current.scheduler = scheduler
+
+ Fiber do
+ uri = URI("https://www.ruby-lang.org/en/")
+
+ http = Net::HTTP.new uri.host, uri.port
+ http.use_ssl = true
+ http.verify_mode = OpenSSL::SSL::VERIFY_NONE
+ body = http.get(uri.path).body
+
+ assert !body.empty?
+ end
+ end.join
+ end
+end
diff --git a/test/scheduler/test_io.rb b/test/scheduler/test_io.rb
new file mode 100644
index 0000000000..ef46d1ac2c
--- /dev/null
+++ b/test/scheduler/test_io.rb
@@ -0,0 +1,35 @@
+# frozen_string_literal: true
+require 'test/unit'
+require_relative 'scheduler'
+
+class TestSchedulerIO < Test::Unit::TestCase
+ MESSAGE = "Hello World"
+
+ def test_read
+ skip unless defined?(UNIXSocket)
+
+ i, o = UNIXSocket.pair
+ skip unless i.nonblock? && o.nonblock?
+
+ message = nil
+
+ thread = Thread.new do
+ scheduler = Scheduler.new
+ Thread.current.scheduler = scheduler
+
+ Fiber do
+ message = i.read(20)
+ i.close
+ end
+
+ Fiber do
+ o.write("Hello World")
+ o.close
+ end
+ end
+
+ thread.join
+
+ assert_equal MESSAGE, message
+ end
+end
diff --git a/test/scheduler/test_mutex.rb b/test/scheduler/test_mutex.rb
new file mode 100644
index 0000000000..8395e5522f
--- /dev/null
+++ b/test/scheduler/test_mutex.rb
@@ -0,0 +1,47 @@
+# frozen_string_literal: true
+require 'test/unit'
+require_relative 'scheduler'
+
+class TestSchedulerMutex < Test::Unit::TestCase
+ def test_mutex_synchronize
+ mutex = Mutex.new
+
+ thread = Thread.new do
+ scheduler = Scheduler.new
+ Thread.current.scheduler = scheduler
+
+ Fiber do
+ assert_equal Thread.scheduler, scheduler
+
+ mutex.synchronize do
+ assert_nil Thread.scheduler
+ end
+ end
+ end
+
+ thread.join
+ end
+
+ def test_mutex_deadlock
+ mutex = Mutex.new
+
+ thread = Thread.new do
+ scheduler = Scheduler.new
+ Thread.current.scheduler = scheduler
+
+ Fiber do
+ assert_equal Thread.scheduler, scheduler
+
+ mutex.synchronize do
+ Fiber.yield
+ end
+ end
+
+ assert_raise ThreadError do
+ mutex.lock
+ end
+ end
+
+ thread.join
+ end
+end
diff --git a/test/scheduler/test_sleep.rb b/test/scheduler/test_sleep.rb
new file mode 100644
index 0000000000..0be760341e
--- /dev/null
+++ b/test/scheduler/test_sleep.rb
@@ -0,0 +1,30 @@
+# frozen_string_literal: true
+require 'test/unit'
+require_relative 'scheduler'
+
+class TestSchedulerSleep < Test::Unit::TestCase
+ ITEMS = [0, 1, 2, 3, 4]
+
+ def test_sleep
+ items = []
+
+ thread = Thread.new do
+ scheduler = Scheduler.new
+ Thread.current.scheduler = scheduler
+
+ 5.times do |i|
+ Fiber do
+ sleep(i/100.0)
+ items << i
+ end
+ end
+
+ # Should be 5 fibers waiting:
+ assert_equal scheduler.waiting.size, 5
+ end
+
+ thread.join
+
+ assert_equal ITEMS, items
+ end
+end
diff --git a/test/socket/test_basicsocket.rb b/test/socket/test_basicsocket.rb
index c8e9b23f83..7b1c9b4a06 100644
--- a/test/socket/test_basicsocket.rb
+++ b/test/socket/test_basicsocket.rb
@@ -159,8 +159,6 @@ class TestSocket_BasicSocket < Test::Unit::TestCase
set_nb = true
buf = String.new
if ssock.respond_to?(:nonblock?)
- assert_not_predicate(ssock, :nonblock?)
- assert_not_predicate(csock, :nonblock?)
csock.nonblock = ssock.nonblock = false
# Linux may use MSG_DONTWAIT to avoid setting O_NONBLOCK
diff --git a/thread.c b/thread.c
index 7ecc535b88..13fef6be9a 100644
--- a/thread.c
+++ b/thread.c
@@ -109,6 +109,8 @@ static VALUE sym_immediate;
static VALUE sym_on_blocking;
static VALUE sym_never;
+static ID id_wait_for_single_fd;
+
enum SLEEP_FLAGS {
SLEEP_DEADLOCKABLE = 0x1,
SLEEP_SPURIOUS_CHECK = 0x2
@@ -708,6 +710,11 @@ thread_do_start(rb_thread_t *th)
else {
th->value = (*th->invoke_arg.func.func)(th->invoke_arg.func.arg);
}
+
+ VALUE scheduler = th->scheduler;
+ if (scheduler != Qnil) {
+ rb_funcall(scheduler, rb_intern("run"), 0);
+ }
}
void rb_ec_clear_current_thread_trace_func(const rb_execution_context_t *ec);
@@ -1471,6 +1478,7 @@ rb_nogvl(void *(*func)(void *), void *data1,
rb_thread_t *th = rb_ec_thread_ptr(ec);
int saved_errno = 0;
VALUE ubf_th = Qfalse;
+ VALUE scheduler = th->scheduler;
if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) {
ubf = ubf_select;
@@ -1485,6 +1493,10 @@ rb_nogvl(void *(*func)(void *), void *data1,
}
}
+ if (scheduler != Qnil) {
+ rb_funcall(scheduler, rb_intern("enter_blocking_region"), 0);
+ }
+
BLOCKING_REGION(th, {
val = func(data1);
saved_errno = errno;
@@ -1500,6 +1512,10 @@ rb_nogvl(void *(*func)(void *), void *data1,
thread_value(rb_thread_kill(ubf_th));
}
+ if (scheduler != Qnil) {
+ rb_funcall(scheduler, rb_intern("exit_blocking_region"), 0);
+ }
+
errno = saved_errno;
return val;
@@ -3574,6 +3590,63 @@ rb_thread_variables(VALUE thread)
return ary;
}
+VALUE rb_thread_scheduler_get(VALUE thread)
+{
+ rb_thread_t * th = rb_thread_ptr(thread);
+
+ VM_ASSERT(th);
+
+ return th->scheduler;
+}
+
+VALUE rb_thread_scheduler_set(VALUE thread, VALUE scheduler)
+{
+ rb_thread_t * th = rb_thread_ptr(thread);
+
+ VM_ASSERT(th);
+
+ th->scheduler = scheduler;
+
+ return th->scheduler;
+}
+
+/*
+ * call-seq:
+ * Thread.scheduler -> scheduler or nil
+ *
+ * Returns the current scheduler if scheduling operations are permitted.
+ *
+ */
+
+static VALUE
+rb_thread_scheduler(VALUE klass)
+{
+ return rb_current_thread_scheduler();
+}
+
+VALUE rb_current_thread_scheduler(void)
+{
+ rb_thread_t * th = GET_THREAD();
+
+ VM_ASSERT(th);
+
+ if (th->blocking == 0)
+ return th->scheduler;
+ else
+ return Qnil;
+}
+
+static VALUE
+rb_thread_blocking_p(VALUE thread)
+{
+ unsigned blocking = rb_thread_ptr(thread)->blocking;
+
+ if (blocking == 0)
+ return Qfalse;
+
+ return INT2NUM(blocking);
+}
+
/*
* call-seq:
* thr.thread_variable?(key) -> true or false
@@ -4129,6 +4202,15 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
return (int)rb_ensure(do_select, (VALUE)&set, select_set_free, (VALUE)&set);
}
+static VALUE
+rb_thread_timeout(struct timeval *timeout) {
+ if (timeout) {
+ return rb_float_new((double)timeout->tv_sec + (0.000001f * timeout->tv_usec));
+ }
+
+ return Qnil;
+}
+
#ifdef USE_POLL
/* The same with linux kernel. TODO: make platform independent definition. */
@@ -4155,6 +4237,14 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
struct waiting_fd wfd;
int state;
+ VALUE scheduler = rb_current_thread_scheduler();
+ if (scheduler != Qnil) {
+ VALUE result = rb_funcall(scheduler, id_wait_for_single_fd, 3, INT2NUM(fd), INT2NUM(events),
+ rb_thread_timeout(timeout)
+ );
+ return RTEST(result);
+ }
+
wfd.th = GET_THREAD();
wfd.fd = fd;
list_add(&wfd.th->vm->waiting_fds, &wfd.wfd_node);
@@ -4287,8 +4377,16 @@ select_single_cleanup(VALUE ptr)
}
int
-rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
+rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
{
+ VALUE scheduler = rb_current_thread_scheduler();
+ if (scheduler != Qnil) {
+ VALUE result = rb_funcall(scheduler, id_wait_for_single_fd, 3, INT2NUM(fd), INT2NUM(events),
+ rb_thread_timeout(timeout)
+ );
+ return RTEST(result);
+ }
+
rb_fdset_t rfds, wfds, efds;
struct select_args args;
int r;
@@ -4298,7 +4396,7 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL;
- args.tv = tv;
+ args.tv = timeout;
args.wfd.fd = fd;
args.wfd.th = GET_THREAD();
@@ -5185,6 +5283,8 @@ Init_Thread(void)
sym_immediate = ID2SYM(rb_intern("immediate"));
sym_on_blocking = ID2SYM(rb_intern("on_blocking"));
+ id_wait_for_single_fd = rb_intern("wait_for_single_fd");
+
rb_define_singleton_method(rb_cThread, "new", thread_s_new, -1);
rb_define_singleton_method(rb_cThread, "start", thread_start, -2);
rb_define_singleton_method(rb_cThread, "fork", thread_start, -2);
@@ -5223,6 +5323,7 @@ Init_Thread(void)
rb_define_method(rb_cThread, "keys", rb_thread_keys, 0);
rb_define_method(rb_cThread, "priority", rb_thread_priority, 0);
rb_define_method(rb_cThread, "priority=", rb_thread_priority_set, 1);
+ rb_define_method(rb_cThread, "blocking?", rb_thread_blocking_p, 0);
rb_define_method(rb_cThread, "status", rb_thread_status, 0);
rb_define_method(rb_cThread, "thread_variable_get", rb_thread_variable_get, 1);
rb_define_method(rb_cThread, "thread_variable_set", rb_thread_variable_set, 2);
@@ -5239,6 +5340,10 @@ Init_Thread(void)
rb_define_method(rb_cThread, "backtrace", rb_thread_backtrace_m, -1);
rb_define_method(rb_cThread, "backtrace_locations", rb_thread_backtrace_locations_m, -1);
+ rb_define_singleton_method(rb_cThread, "scheduler", rb_thread_scheduler, 0);
+ rb_define_method(rb_cThread, "scheduler", rb_thread_scheduler_get, 0);
+ rb_define_method(rb_cThread, "scheduler=", rb_thread_scheduler_set, 1);
+
rb_define_method(rb_cThread, "name", rb_thread_getname, 0);
rb_define_method(rb_cThread, "name=", rb_thread_setname, 1);
rb_define_method(rb_cThread, "to_s", rb_thread_to_s, 0);
diff --git a/thread_sync.c b/thread_sync.c
index 2e20812f4d..3689dee789 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -190,6 +190,8 @@ mutex_locked(rb_thread_t *th, VALUE self)
mutex->next_mutex = th->keeping_mutexes;
}
th->keeping_mutexes = mutex;
+
+ th->blocking += 1;
}
/*
@@ -365,6 +367,8 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th)
struct sync_waiter *cur = 0, *next;
rb_mutex_t **th_mutex = &th->keeping_mutexes;
+ th->blocking -= 1;
+
mutex->th = 0;
list_for_each_safe(&mutex->waitq, cur, next, node) {
list_del_init(&cur->node);
@@ -404,8 +408,9 @@ rb_mutex_unlock(VALUE self)
{
const char *err;
rb_mutex_t *mutex = mutex_ptr(self);
+ rb_thread_t *th = GET_THREAD();
- err = rb_mutex_unlock_th(mutex, GET_THREAD());
+ err = rb_mutex_unlock_th(mutex, th);
if (err) rb_raise(rb_eThreadError, "%s", err);
return self;
diff --git a/vm.c b/vm.c
index 6f9c999adb..d286bc7210 100644
--- a/vm.c
+++ b/vm.c
@@ -2620,6 +2620,8 @@ thread_mark(void *ptr)
RUBY_MARK_UNLESS_NULL(th->locking_mutex);
RUBY_MARK_UNLESS_NULL(th->name);
+ RUBY_MARK_UNLESS_NULL(th->scheduler);
+
RUBY_MARK_LEAVE("thread");
}
@@ -2734,6 +2736,10 @@ th_init(rb_thread_t *th, VALUE self)
th->self = self;
rb_threadptr_root_fiber_setup(th);
+ /* All threads are blocking until a non-blocking fiber is scheduled */
+ th->blocking = 1;
+ th->scheduler = Qnil;
+
if (self == 0) {
size_t size = th->vm->default_params.thread_vm_stack_size / sizeof(VALUE);
rb_ec_initialize_vm_stack(th->ec, ALLOC_N(VALUE, size), size);
@@ -3294,12 +3300,14 @@ Init_VM(void)
vm->self = TypedData_Wrap_Struct(rb_cRubyVM, &vm_data_type, vm);
/* create main thread */
- th->self = TypedData_Wrap_Struct(rb_cThread, &thread_data_type, th);
+ th->self = TypedData_Wrap_Struct(rb_cThread, &thread_data_type, th);
+
vm->main_thread = th;
vm->running_thread = th;
th->vm = vm;
th->top_wrapper = 0;
th->top_self = rb_vm_top_self();
+
rb_thread_set_current(th);
rb_vm_living_threads_insert(vm, th);
diff --git a/vm_core.h b/vm_core.h
index e725e4d8d0..4f122cacb8 100644
--- a/vm_core.h
+++ b/vm_core.h
@@ -969,13 +969,15 @@ typedef struct rb_thread_struct {
rb_fiber_t *root_fiber;
rb_jmpbuf_t root_jmpbuf;
+ VALUE scheduler;
+ unsigned blocking;
+
/* misc */
VALUE name;
#ifdef USE_SIGALTSTACK
void *altstack;
#endif
-
} rb_thread_t;
typedef enum {