summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--common.mk3
-rw-r--r--ext/io/wait/wait.c209
-rw-r--r--include/ruby/io.h9
-rw-r--r--inits.c1
-rw-r--r--internal/scheduler.h27
-rw-r--r--internal/thread.h2
-rw-r--r--io.c81
-rw-r--r--process.c4
-rw-r--r--scheduler.c71
-rw-r--r--test/fiber/scheduler.rb61
-rw-r--r--test/fiber/test_scheduler.rb16
-rw-r--r--test/fiber/test_sleep.rb1
-rw-r--r--thread.c44
13 files changed, 313 insertions, 216 deletions
diff --git a/common.mk b/common.mk
index 094b9abe0a..1578a5f259 100644
--- a/common.mk
+++ b/common.mk
@@ -128,6 +128,7 @@ COMMONOBJS = array.$(OBJEXT) \
regparse.$(OBJEXT) \
regsyntax.$(OBJEXT) \
ruby.$(OBJEXT) \
+ scheduler.$(OBJEXT) \
signal.$(OBJEXT) \
sprintf.$(OBJEXT) \
st.$(OBJEXT) \
@@ -12123,6 +12124,8 @@ ruby.$(OBJEXT): {$(VPATH)}thread_native.h
ruby.$(OBJEXT): {$(VPATH)}util.h
ruby.$(OBJEXT): {$(VPATH)}vm_core.h
ruby.$(OBJEXT): {$(VPATH)}vm_opts.h
+scheduler.$(OBJEXT): {$(VPATH)}scheduler.c
+scheduler.$(OBJEXT): {$(VPATH)}internal/scheduler.h
setproctitle.$(OBJEXT): $(hdrdir)/ruby.h
setproctitle.$(OBJEXT): $(hdrdir)/ruby/ruby.h
setproctitle.$(OBJEXT): {$(VPATH)}assert.h
diff --git a/ext/io/wait/wait.c b/ext/io/wait/wait.c
index d846bba49e..a82295f72e 100644
--- a/ext/io/wait/wait.c
+++ b/ext/io/wait/wait.c
@@ -39,35 +39,6 @@
#define FIONREAD_POSSIBLE_P(fd) ((void)(fd),Qtrue)
#endif
-static VALUE io_ready_p _((VALUE io));
-static VALUE io_wait_readable _((int argc, VALUE *argv, VALUE io));
-static VALUE io_wait_writable _((int argc, VALUE *argv, VALUE io));
-void Init_wait _((void));
-
-static struct timeval *
-get_timeout(int argc, VALUE *argv, struct timeval *timerec)
-{
- VALUE timeout = Qnil;
- rb_check_arity(argc, 0, 1);
- if (!argc || NIL_P(timeout = argv[0])) {
- return NULL;
- }
- else {
- *timerec = rb_time_interval(timeout);
- return timerec;
- }
-}
-
-static int
-wait_for_single_fd(rb_io_t *fptr, int events, struct timeval *tv)
-{
- int i = rb_wait_for_single_fd(fptr->fd, events, tv);
- if (i < 0)
- rb_sys_fail(0);
- rb_io_check_closed(fptr);
- return (i & events);
-}
-
/*
* call-seq:
* io.nread -> int
@@ -79,13 +50,12 @@ wait_for_single_fd(rb_io_t *fptr, int events, struct timeval *tv)
static VALUE
io_nread(VALUE io)
{
- rb_io_t *fptr;
- int len;
+ rb_io_t *fptr = NULL;
ioctl_arg n;
GetOpenFile(io, fptr);
rb_io_check_readable(fptr);
- len = rb_io_read_pending(fptr);
+ int len = rb_io_read_pending(fptr);
if (len > 0) return INT2FIX(len);
if (!FIONREAD_POSSIBLE_P(fptr->fd)) return INT2FIX(0);
if (ioctl(fptr->fd, FIONREAD, &n)) return INT2FIX(0);
@@ -93,76 +63,113 @@ io_nread(VALUE io)
return INT2FIX(0);
}
+static VALUE
+io_wait_event(VALUE io, int event, VALUE timeout)
+{
+ VALUE result = rb_io_wait(io, RB_INT2NUM(event), timeout);
+
+ if (!RB_TEST(result)) {
+ return Qnil;
+ }
+
+ int mask = RB_NUM2INT(result);
+
+ if (mask & event) {
+ return io;
+ } else {
+ return Qfalse;
+ }
+}
+
/*
* call-seq:
* io.ready? -> true or false
*
- * Returns true if input available without blocking, or false.
+ * Returns +true+ if input available without blocking, or +false+.
*/
static VALUE
io_ready_p(VALUE io)
{
rb_io_t *fptr;
- struct timeval tv = {0, 0};
GetOpenFile(io, fptr);
rb_io_check_readable(fptr);
if (rb_io_read_pending(fptr)) return Qtrue;
- if (wait_for_single_fd(fptr, RB_WAITFD_IN, &tv))
- return Qtrue;
- return Qfalse;
+
+ return io_wait_event(io, RUBY_IO_READABLE, RB_INT2NUM(0));
}
/*
* call-seq:
- * io.wait_readable -> IO, true or nil
- * io.wait_readable(timeout) -> IO, true or nil
+ * io.wait_readable -> true or false
+ * io.wait_readable(timeout) -> true or false
*
- * Waits until IO is readable without blocking and returns +self+, or
- * +nil+ when times out.
+ * Waits until IO is readable and returns +true+, or
+ * +false+ when times out.
* Returns +true+ immediately when buffered data is available.
*/
static VALUE
io_wait_readable(int argc, VALUE *argv, VALUE io)
{
- rb_io_t *fptr;
- struct timeval timerec;
- struct timeval *tv;
+ rb_io_t *fptr = NULL;
- GetOpenFile(io, fptr);
+ RB_IO_POINTER(io, fptr);
rb_io_check_readable(fptr);
- tv = get_timeout(argc, argv, &timerec);
+
if (rb_io_read_pending(fptr)) return Qtrue;
- if (wait_for_single_fd(fptr, RB_WAITFD_IN, tv)) {
- return io;
- }
- return Qnil;
+
+ rb_check_arity(argc, 0, 1);
+ VALUE timeout = (argc == 1 ? argv[0] : Qnil);
+
+ return io_wait_event(io, RUBY_IO_READABLE, timeout);
}
/*
* call-seq:
- * io.wait_writable -> IO
- * io.wait_writable(timeout) -> IO or nil
+ * io.wait_writable -> true or false
+ * io.wait_writable(timeout) -> true or false
*
- * Waits until IO is writable without blocking and returns +self+ or
- * +nil+ when times out.
+ * Waits until IO is writable and returns +true+ or
+ * +false+ when times out.
*/
static VALUE
io_wait_writable(int argc, VALUE *argv, VALUE io)
{
- rb_io_t *fptr;
- struct timeval timerec;
- struct timeval *tv;
+ rb_io_t *fptr = NULL;
- GetOpenFile(io, fptr);
+ RB_IO_POINTER(io, fptr);
rb_io_check_writable(fptr);
- tv = get_timeout(argc, argv, &timerec);
- if (wait_for_single_fd(fptr, RB_WAITFD_OUT, tv)) {
- return io;
- }
- return Qnil;
+
+ rb_check_arity(argc, 0, 1);
+ VALUE timeout = (argc == 1 ? argv[0] : Qnil);
+
+ return io_wait_event(io, RUBY_IO_WRITABLE, timeout);
+}
+
+/*
+ * call-seq:
+ * io.wait_priority -> true or false
+ * io.wait_priority(timeout) -> true or false
+ *
+ * Waits until IO is priority and returns +true+ or
+ * +false+ when times out.
+ */
+static VALUE
+io_wait_priority(int argc, VALUE *argv, VALUE io)
+{
+ rb_io_t *fptr = NULL;
+
+ RB_IO_POINTER(io, fptr);
+ rb_io_check_readable(fptr);
+
+ if (rb_io_read_pending(fptr)) return Qtrue;
+
+ rb_check_arity(argc, 0, 1);
+ VALUE timeout = argc == 1 ? argv[0] : Qnil;
+
+ return io_wait_event(io, RUBY_IO_PRIORITY, timeout);
}
static int
@@ -201,41 +208,60 @@ wait_mode_sym(VALUE mode)
/*
* call-seq:
- * io.wait(timeout = nil, mode = :read) -> IO, true or nil
+ * io.wait(events, timeout) -> event mask or false.
+ * io.wait(timeout = nil, mode = :read) -> event mask or false (deprecated)
+ *
+ * Waits until the IO becomes ready for the specified events and returns the
+ * subset of events that become ready, or +false+ when times out.
+ *
+ * The events can be a bit mask of +IO::READABLE+, +IO::WRITABLE+ or
+ * +IO::PRIORITY+.
*
- * Waits until IO is readable or writable without blocking and returns
- * +self+, or +nil+ when times out.
* Returns +true+ immediately when buffered data is available.
+ *
* Optional parameter +mode+ is one of +:read+, +:write+, or
- * +:read_write+.
+ * +:read_write+ (deprecated).
*/
static VALUE
-io_wait_readwrite(int argc, VALUE *argv, VALUE io)
+io_wait(int argc, VALUE *argv, VALUE io)
{
- rb_io_t *fptr;
- struct timeval timerec;
- struct timeval *tv = NULL;
- int event = 0;
- int i;
+ VALUE timeout = Qnil;
+ rb_io_event_t events = 0;
- GetOpenFile(io, fptr);
- for (i = 0; i < argc; ++i) {
- if (SYMBOL_P(argv[i])) {
- event |= wait_mode_sym(argv[i]);
- }
- else {
- *(tv = &timerec) = rb_time_interval(argv[i]);
- }
+ if (argc < 2 || (argc >= 2 && RB_SYMBOL_P(argv[1]))) {
+ if (argc > 0) {
+ timeout = argv[0];
+ }
+
+ for (int i = 1; i < argc; i += 1) {
+ events |= wait_mode_sym(argv[i]);
+ }
+ } else if (argc == 2) {
+ events = RB_NUM2UINT(argv[0]);
+
+ if (argv[1] != Qnil) {
+ timeout = argv[1];
+ }
+ } else {
+ // TODO error
+ return Qnil;
}
- /* rb_time_interval() and might_mode() might convert the argument */
- rb_io_check_closed(fptr);
- if (!event) event = RB_WAITFD_IN;
- if ((event & RB_WAITFD_IN) && rb_io_read_pending(fptr))
- return Qtrue;
- if (wait_for_single_fd(fptr, event, tv))
- return io;
- return Qnil;
+
+ if (events == 0) {
+ events = RUBY_IO_READABLE;
+ }
+
+ if (events & RUBY_IO_READABLE) {
+ rb_io_t *fptr = NULL;
+ RB_IO_POINTER(io, fptr);
+
+ if (rb_io_read_pending(fptr)) {
+ return Qtrue;
+ }
+ }
+
+ return io_wait_event(io, events, timeout);
}
/*
@@ -247,7 +273,10 @@ Init_wait(void)
{
rb_define_method(rb_cIO, "nread", io_nread, 0);
rb_define_method(rb_cIO, "ready?", io_ready_p, 0);
- rb_define_method(rb_cIO, "wait", io_wait_readwrite, -1);
+
+ rb_define_method(rb_cIO, "wait", io_wait, -1);
+
rb_define_method(rb_cIO, "wait_readable", io_wait_readable, -1);
rb_define_method(rb_cIO, "wait_writable", io_wait_writable, -1);
+ rb_define_method(rb_cIO, "wait_priority", io_wait_priority, -1);
}
diff --git a/include/ruby/io.h b/include/ruby/io.h
index 5774a3fc10..fc6240adcc 100644
--- a/include/ruby/io.h
+++ b/include/ruby/io.h
@@ -147,14 +147,17 @@ VALUE rb_io_get_io(VALUE io);
VALUE rb_io_check_io(VALUE io);
VALUE rb_io_get_write_io(VALUE io);
VALUE rb_io_set_write_io(VALUE io, VALUE w);
-int rb_io_wait_readable(int);
-int rb_io_wait_writable(int);
-int rb_wait_for_single_fd(int fd, int events, struct timeval *tv);
void rb_io_set_nonblock(rb_io_t *fptr);
int rb_io_extract_encoding_option(VALUE opt, rb_encoding **enc_p, rb_encoding **enc2_p, int *fmode_p);
void rb_io_extract_modeenc(VALUE *vmode_p, VALUE *vperm_p, VALUE opthash, int *oflags_p, int *fmode_p, rb_io_enc_t *convconfig_p);
ssize_t rb_io_bufwrite(VALUE io, const void *buf, size_t size);
+int rb_io_wait_readable(int);
+int rb_io_wait_writable(int);
+int rb_wait_for_single_fd(int fd, int events, struct timeval *tv);
+
+VALUE rb_io_wait(VALUE io, VALUE events, VALUE timeout);
+
/* compatibility for ruby 1.8 and older */
#define rb_io_mode_flags(modestr) [<"rb_io_mode_flags() is obsolete; use rb_io_modestr_fmode()">]
#define rb_io_modenum_flags(oflags) [<"rb_io_modenum_flags() is obsolete; use rb_io_oflags_fmode()">]
diff --git a/inits.c b/inits.c
index f8eba911d5..b36b162a42 100644
--- a/inits.c
+++ b/inits.c
@@ -65,6 +65,7 @@ rb_call_inits(void)
CALL(VM);
CALL(ISeq);
CALL(Thread);
+ CALL(Scheduler);
CALL(process);
CALL(Cont);
CALL(Rational);
diff --git a/internal/scheduler.h b/internal/scheduler.h
new file mode 100644
index 0000000000..be976d7be5
--- /dev/null
+++ b/internal/scheduler.h
@@ -0,0 +1,27 @@
+#ifndef RUBY_SCHEDULER_H /*-*-C-*-vi:se ft=c:*/
+#define RUBY_SCHEDULER_H
+/**
+ * @file
+ * @author Ruby developers <ruby-core@ruby-lang.org>
+ * @copyright This file is a part of the programming language Ruby.
+ * Permission is hereby granted, to either redistribute and/or
+ * modify this file, provided that the conditions mentioned in the
+ * file COPYING are met. Consult the file for details.
+ * @brief Internal header for Scheduler.
+ */
+#include "ruby/ruby.h"
+#include "ruby/intern.h"
+
+VALUE rb_scheduler_timeout(struct timeval *timeout);
+
+VALUE rb_scheduler_kernel_sleep(VALUE scheduler, VALUE duration);
+VALUE rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv);
+
+VALUE rb_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout);
+VALUE rb_scheduler_io_wait_readable(VALUE scheduler, VALUE io);
+VALUE rb_scheduler_io_wait_writable(VALUE scheduler, VALUE io);
+
+VALUE rb_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, VALUE offset, VALUE length);
+VALUE rb_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, VALUE offset, VALUE length);
+
+#endif /* RUBY_SCHEDULER_H */
diff --git a/internal/thread.h b/internal/thread.h
index e7a9019b88..13e419bc95 100644
--- a/internal/thread.h
+++ b/internal/thread.h
@@ -37,6 +37,8 @@ void rb_mutex_allow_trap(VALUE self, int val);
VALUE rb_uninterruptible(VALUE (*b_proc)(VALUE), VALUE data);
VALUE rb_mutex_owned_p(VALUE self);
+int rb_thread_wait_for_single_fd(int fd, int events, struct timeval * timeout);
+
VALUE rb_thread_scheduler_get(VALUE thread);
VALUE rb_thread_scheduler_set(VALUE thread, VALUE scheduler);
diff --git a/io.c b/io.c
index 08865931bf..fbc913bad8 100644
--- a/io.c
+++ b/io.c
@@ -13,6 +13,8 @@
#include "ruby/internal/config.h"
+#include "internal/scheduler.h"
+
#ifdef _WIN32
# include "ruby/ruby.h"
# include "ruby/io.h"
@@ -213,6 +215,8 @@ static VALUE sym_DATA;
static VALUE sym_HOLE;
#endif
+static VALUE rb_io_initialize(int argc, VALUE *argv, VALUE io);
+
struct argf {
VALUE filename, current_file;
long last_lineno; /* $. */
@@ -1256,13 +1260,65 @@ io_fflush(rb_io_t *fptr)
return 0;
}
+VALUE
+rb_io_wait(VALUE io, VALUE events, VALUE timeout) {
+ VALUE scheduler = rb_thread_current_scheduler();
+
+ if (scheduler != Qnil) {
+ return rb_scheduler_io_wait(scheduler, io, events, timeout);
+ }
+
+ rb_io_t * fptr = NULL;
+ RB_IO_POINTER(io, fptr);
+
+ struct timeval tv_storage;
+ struct timeval *tv = NULL;
+
+ if (timeout != Qnil) {
+ tv_storage = rb_time_interval(timeout);
+ tv = &tv_storage;
+ }
+
+ int ready = rb_thread_wait_for_single_fd(fptr->fd, RB_NUM2INT(events), tv);
+
+ if (ready < 0) {
+ rb_sys_fail(0);
+ }
+
+ // Not sure if this is necessary:
+ rb_io_check_closed(fptr);
+
+ if (ready > 0) {
+ return RB_INT2NUM(ready);
+ } else {
+ return Qfalse;
+ }
+}
+
+VALUE
+rb_io_from_fd(int f)
+{
+ VALUE io = rb_obj_alloc(rb_cIO);
+ VALUE argv[] = {RB_INT2NUM(f)};
+
+ rb_io_initialize(1, argv, io);
+
+ rb_io_t *fptr;
+ RB_IO_POINTER(io, fptr);
+
+ fptr->mode &= ~FMODE_PREP;
+
+ return io;
+}
+
int
rb_io_wait_readable(int f)
{
VALUE scheduler = rb_thread_scheduler_if_nonblocking(rb_thread_current());
if (scheduler != Qnil) {
- VALUE result = rb_funcall(scheduler, rb_intern("wait_readable_fd"), 1, INT2NUM(f));
- return RTEST(result);
+ return RTEST(
+ rb_scheduler_io_wait_readable(scheduler, rb_io_from_fd(f))
+ );
}
io_fd_check_closed(f);
@@ -1291,8 +1347,9 @@ rb_io_wait_writable(int f)
{
VALUE scheduler = rb_thread_scheduler_if_nonblocking(rb_thread_current());
if (scheduler != Qnil) {
- VALUE result = rb_funcall(scheduler, rb_intern("wait_writable_fd"), 1, INT2NUM(f));
- return RTEST(result);
+ return RTEST(
+ rb_scheduler_io_wait_writable(scheduler, rb_io_from_fd(f))
+ );
}
io_fd_check_closed(f);
@@ -1325,6 +1382,20 @@ rb_io_wait_writable(int f)
}
}
+int
+rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
+{
+ VALUE scheduler = rb_thread_current_scheduler();
+
+ if (scheduler != Qnil) {
+ return RTEST(
+ rb_scheduler_io_wait(scheduler, rb_io_from_fd(fd), RB_INT2NUM(events), rb_scheduler_timeout(timeout))
+ );
+ }
+
+ return rb_thread_wait_for_single_fd(fd, events, timeout);
+}
+
static void
make_writeconv(rb_io_t *fptr)
{
@@ -10975,7 +11046,7 @@ rb_thread_scheduler_wait_for_single_fd(void * _args)
{
struct wait_for_single_fd *args = (struct wait_for_single_fd *)_args;
- args->result = rb_funcall(args->scheduler, rb_intern("wait_for_single_fd"), 3, INT2NUM(args->fd), INT2NUM(args->events), Qnil);
+ args->result = rb_scheduler_io_wait(args->scheduler, rb_io_from_fd(args->fd), INT2NUM(args->events), Qnil);
return NULL;
}
diff --git a/process.c b/process.c
index 34dd986953..317f7ff3ec 100644
--- a/process.c
+++ b/process.c
@@ -13,6 +13,8 @@
#include "ruby/internal/config.h"
+#include "internal/scheduler.h"
+
#include <ctype.h>
#include <errno.h>
#include <signal.h>
@@ -4927,7 +4929,7 @@ rb_f_sleep(int argc, VALUE *argv, VALUE _)
VALUE scheduler = rb_thread_scheduler_if_nonblocking(rb_thread_current());
if (scheduler != Qnil) {
- rb_funcallv(scheduler, rb_intern("wait_sleep"), argc, argv);
+ rb_scheduler_kernel_sleepv(scheduler, argc, argv);
}
else {
if (argc == 0) {
diff --git a/scheduler.c b/scheduler.c
new file mode 100644
index 0000000000..4eaf12b333
--- /dev/null
+++ b/scheduler.c
@@ -0,0 +1,71 @@
+/**********************************************************************
+
+ scheduler.c
+
+ $Author$
+
+ Copyright (C) 2020 Samuel Grant Dawson Williams
+
+**********************************************************************/
+
+#include "internal/scheduler.h"
+#include "ruby/io.h"
+
+static ID id_kernel_sleep;
+static ID id_io_read;
+static ID id_io_write;
+static ID id_io_wait;
+
+void
+Init_Scheduler(void)
+{
+ id_kernel_sleep = rb_intern_const("kernel_sleep");
+ id_io_read = rb_intern_const("io_read");
+ id_io_write = rb_intern_const("io_write");
+ id_io_wait = rb_intern_const("io_wait");
+}
+
+VALUE
+rb_scheduler_timeout(struct timeval *timeout) {
+ if (timeout) {
+ return rb_float_new((double)timeout->tv_sec + (0.000001f * timeout->tv_usec));
+ }
+
+ return Qnil;
+}
+
+VALUE rb_scheduler_kernel_sleep(VALUE scheduler, VALUE timeout)
+{
+ return rb_funcall(scheduler, id_kernel_sleep, 1, timeout);
+}
+
+VALUE rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv)
+{
+ return rb_funcallv(scheduler, id_kernel_sleep, argc, argv);
+}
+
+VALUE rb_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
+{
+ return rb_funcall(scheduler, id_io_wait, 3, io, events, timeout);
+}
+
+VALUE rb_scheduler_io_wait_readable(VALUE scheduler, VALUE io)
+{
+ return rb_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_READABLE), Qnil);
+}
+
+VALUE rb_scheduler_io_wait_writable(VALUE scheduler, VALUE io)
+{
+ return rb_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_WRITABLE), Qnil);
+}
+
+VALUE rb_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, VALUE offset, VALUE length)
+{
+ return rb_funcall(scheduler, id_io_read, 4, io, buffer, offset, length);
+}
+
+VALUE rb_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, VALUE offset, VALUE length)
+{
+ // We should ensure string has capacity to receive data, and then resize it afterwards.
+ return rb_funcall(scheduler, id_io_write, 4, io, buffer, offset, length);
+}
diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb
index 5508f21cf7..1f690b4c08 100644
--- a/test/fiber/scheduler.rb
+++ b/test/fiber/scheduler.rb
@@ -14,15 +14,11 @@ class Scheduler
@readable = {}
@writable = {}
@waiting = {}
- @blocking = []
-
- @ios = ObjectSpace::WeakMap.new
end
attr :readable
attr :writable
attr :waiting
- attr :blocking
def next_timeout
_fiber, timeout = @waiting.min_by{|key, value| value}
@@ -70,47 +66,11 @@ class Scheduler
end
end
- def for_fd(fd)
- @ios[fd] ||= ::IO.for_fd(fd, autoclose: false)
- end
-
- def wait_readable(io)
- @readable[io] = Fiber.current
-
- Fiber.yield
-
- @readable.delete(io)
-
- return true
- end
-
- def wait_readable_fd(fd)
- wait_readable(
- for_fd(fd)
- )
- end
-
- def wait_writable(io)
- @writable[io] = Fiber.current
-
- Fiber.yield
-
- @writable.delete(io)
-
- return true
- end
-
- def wait_writable_fd(fd)
- wait_writable(
- for_fd(fd)
- )
- end
-
def current_time
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
- def wait_sleep(duration = nil)
+ def kernel_sleep(duration = nil)
@waiting[Fiber.current] = current_time + duration
Fiber.yield
@@ -118,7 +78,7 @@ class Scheduler
return true
end
- def wait_any(io, events, duration)
+ def io_wait(io, events, duration)
unless (events & IO::READABLE).zero?
@readable[io] = Fiber.current
end
@@ -135,23 +95,6 @@ class Scheduler
return true
end
- def wait_for_single_fd(fd, events, duration)
- wait_any(
- for_fd(fd),
- events,
- duration
- )
- end
-
- def enter_blocking_region
- # puts "Enter blocking region: #{caller.first}"
- end
-
- def exit_blocking_region
- # puts "Exit blocking region: #{caller.first}"
- @blocking << caller.first
- end
-
def fiber(&block)
fiber = Fiber.new(blocking: false, &block)
diff --git a/test/fiber/test_scheduler.rb b/test/fiber/test_scheduler.rb
index 52f10846ac..5055a95999 100644
--- a/test/fiber/test_scheduler.rb
+++ b/test/fiber/test_scheduler.rb
@@ -10,20 +10,4 @@ class TestFiberScheduler < Test::Unit::TestCase
end
end
end
-
- def test_fiber_blocking
- scheduler = Scheduler.new
-
- thread = Thread.new do
- Thread.current.scheduler = scheduler
-
- # Close is always a blocking operation.
- IO.pipe.each(&:close)
- end
-
- thread.join
-
- assert_not_empty scheduler.blocking
- assert_match(/test_scheduler\.rb:\d+:in `close'/, scheduler.blocking.last)
- end
end
diff --git a/test/fiber/test_sleep.rb b/test/fiber/test_sleep.rb
index 84e9d27550..f1d9f79fc8 100644
--- a/test/fiber/test_sleep.rb
+++ b/test/fiber/test_sleep.rb
@@ -43,5 +43,4 @@ class TestFiberSleep < Test::Unit::TestCase
assert_operator seconds, :>=, 2, "actual: %p" % seconds
end
-
end
diff --git a/thread.c b/thread.c
index d10d4118c3..d0ebfff882 100644
--- a/thread.c
+++ b/thread.c
@@ -112,8 +112,6 @@ static VALUE sym_immediate;
static VALUE sym_on_blocking;
static VALUE sym_never;
-static ID id_wait_for_single_fd;
-
enum SLEEP_FLAGS {
SLEEP_DEADLOCKABLE = 0x1,
SLEEP_SPURIOUS_CHECK = 0x2
@@ -1603,7 +1601,6 @@ rb_nogvl(void *(*func)(void *), void *data1,
rb_thread_t *th = rb_ec_thread_ptr(ec);
int saved_errno = 0;
VALUE ubf_th = Qfalse;
- VALUE scheduler = th->scheduler;
if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) {
ubf = ubf_select;
@@ -1618,10 +1615,6 @@ rb_nogvl(void *(*func)(void *), void *data1,
}
}
- if (scheduler != Qnil) {
- rb_funcall(scheduler, rb_intern("enter_blocking_region"), 0);
- }
-
BLOCKING_REGION(th, {
val = func(data1);
saved_errno = errno;
@@ -1637,10 +1630,6 @@ rb_nogvl(void *(*func)(void *), void *data1,
thread_value(rb_thread_kill(ubf_th));
}
- if (scheduler != Qnil) {
- rb_funcall(scheduler, rb_intern("exit_blocking_region"), 0);
- }
-
errno = saved_errno;
return val;
@@ -3749,7 +3738,7 @@ rb_thread_scheduler(VALUE klass)
return rb_thread_scheduler_if_nonblocking(rb_thread_current());
}
-static VALUE
+VALUE
rb_thread_current_scheduler()
{
return rb_thread_scheduler_if_nonblocking(rb_thread_current());
@@ -4332,15 +4321,6 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
return (int)rb_ensure(do_select, (VALUE)&set, select_set_free, (VALUE)&set);
}
-static VALUE
-rb_thread_timeout(struct timeval *timeout) {
- if (timeout) {
- return rb_float_new((double)timeout->tv_sec + (0.000001f * timeout->tv_usec));
- }
-
- return Qnil;
-}
-
#ifdef USE_POLL
/* The same with linux kernel. TODO: make platform independent definition. */
@@ -4356,7 +4336,7 @@ rb_thread_timeout(struct timeval *timeout) {
* returns a mask of events
*/
int
-rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
+rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
{
struct pollfd fds[2];
int result = 0, lerrno;
@@ -4367,14 +4347,6 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
struct waiting_fd wfd;
int state;
- VALUE scheduler = rb_thread_scheduler_if_nonblocking(rb_thread_current());
- if (scheduler != Qnil) {
- VALUE result = rb_funcall(scheduler, id_wait_for_single_fd, 3, INT2NUM(fd), INT2NUM(events),
- rb_thread_timeout(timeout)
- );
- return RTEST(result);
- }
-
wfd.th = GET_THREAD();
wfd.fd = fd;
@@ -4513,16 +4485,8 @@ select_single_cleanup(VALUE ptr)
}
int
-rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
+rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
{
- VALUE scheduler = rb_thread_scheduler_if_nonblocking(rb_thread_current());
- if (scheduler != Qnil) {
- VALUE result = rb_funcall(scheduler, id_wait_for_single_fd, 3, INT2NUM(fd), INT2NUM(events),
- rb_thread_timeout(timeout)
- );
- return RTEST(result);
- }
-
rb_fdset_t rfds, wfds, efds;
struct select_args args;
int r;
@@ -5450,8 +5414,6 @@ Init_Thread(void)
sym_immediate = ID2SYM(rb_intern("immediate"));
sym_on_blocking = ID2SYM(rb_intern("on_blocking"));
- id_wait_for_single_fd = rb_intern("wait_for_single_fd");
-
rb_define_singleton_method(rb_cThread, "new", thread_s_new, -1);
rb_define_singleton_method(rb_cThread, "start", thread_start, -2);
rb_define_singleton_method(rb_cThread, "fork", thread_start, -2);