summaryrefslogtreecommitdiff
path: root/test/fiber/scheduler.rb
diff options
context:
space:
mode:
Diffstat (limited to 'test/fiber/scheduler.rb')
-rw-r--r--test/fiber/scheduler.rb233
1 files changed, 173 insertions, 60 deletions
diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb
index 96b22856d1..3926226ca3 100644
--- a/test/fiber/scheduler.rb
+++ b/test/fiber/scheduler.rb
@@ -1,8 +1,13 @@
# frozen_string_literal: true
# This is an example and simplified scheduler for test purposes.
-# It is not efficient for a large number of file descriptors as it uses IO.select().
-# Production Fiber schedulers should use epoll/kqueue/etc.
+# - It is not efficient for a large number of file descriptors as it uses
+# IO.select().
+# - It does not correctly handle multiple calls to `wait` with the same file
+# descriptor and overlapping events.
+# - Production fiber schedulers should use epoll/kqueue/etc. Consider using the
+# [`io-event`](https://github.com/socketry/io-event) gem instead of this
+# scheduler if you want something simple to build on.
require 'fiber'
require 'socket'
@@ -22,7 +27,9 @@ class Scheduler
Warning[:experimental] = experimental
end
- def initialize
+ def initialize(fiber = Fiber.current)
+ @fiber = fiber
+
@readable = {}
@writable = {}
@waiting = {}
@@ -40,6 +47,10 @@ class Scheduler
attr :writable
attr :waiting
+ def transfer
+ @fiber.transfer
+ end
+
def next_timeout
_fiber, timeout = @waiting.min_by{|key, value| value}
@@ -58,7 +69,7 @@ class Scheduler
# $stderr.puts [__method__, Fiber.current].inspect
while @readable.any? or @writable.any? or @waiting.any? or @blocking.any?
- # Can only handle file descriptors up to 1024...
+ # May only handle file descriptors up to 1024...
readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout)
# puts "readable: #{readable}" if readable&.any?
@@ -83,7 +94,7 @@ class Scheduler
end
selected.each do |fiber, events|
- fiber.resume(events)
+ fiber.transfer(events)
end
if @waiting.any?
@@ -93,7 +104,7 @@ class Scheduler
waiting.each do |fiber, timeout|
if fiber.alive?
if timeout <= time
- fiber.resume
+ fiber.transfer
else
@waiting[fiber] = timeout
end
@@ -109,16 +120,22 @@ class Scheduler
end
ready.each do |fiber|
- fiber.resume
+ fiber.transfer
end
end
end
end
+ # A fiber scheduler hook, invoked when the scheduler goes out of scope.
def scheduler_close
close(true)
end
+ # If the `scheduler_close` hook does not exist, this method `close` will be
+ # invoked instead when the fiber scheduler goes out of scope. This is legacy
+ # behaviour, you should almost certainly use `scheduler_close`. The reason for
+ # this, is `scheduler_close` is called when the scheduler goes out of scope,
+ # while `close` may be called by the user.
def close(internal = false)
# $stderr.puts [__method__, Fiber.current].inspect
@@ -153,6 +170,7 @@ class Scheduler
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
+ # This hook is invoked by `Timeout.timeout` and related code.
def timeout_after(duration, klass, message, &block)
fiber = Fiber.current
@@ -171,6 +189,7 @@ class Scheduler
end
end
+ # This hook is invoked by `Process.wait`, `system`, and backticks.
def process_wait(pid, flags)
# $stderr.puts [__method__, pid, flags, Fiber.current].inspect
@@ -180,24 +199,47 @@ class Scheduler
end.value
end
+ # This hook is invoked by `IO#read` and `IO#write` in the case that `io_read`
+ # and `io_write` hooks are not available. This implementation is not
+ # completely general, in the sense that calling `io_wait` multiple times with
+ # the same `io` and `events` will not work, which is okay for tests but not
+ # for real code. Correct fiber schedulers should not have this limitation.
def io_wait(io, events, duration)
# $stderr.puts [__method__, io, events, duration, Fiber.current].inspect
+ fiber = Fiber.current
+
unless (events & IO::READABLE).zero?
- @readable[io] = Fiber.current
+ @readable[io] = fiber
+ readable = true
end
unless (events & IO::WRITABLE).zero?
- @writable[io] = Fiber.current
+ @writable[io] = fiber
+ writable = true
+ end
+
+ if duration
+ @waiting[fiber] = current_time + duration
end
- Fiber.yield
+ @fiber.transfer
ensure
- @readable.delete(io)
- @writable.delete(io)
+ @waiting.delete(fiber) if duration
+ @readable.delete(io) if readable
+ @writable.delete(io) if writable
end
- # Used for Kernel#sleep and Thread::Mutex#sleep
+ # This hook is invoked by `IO.select`. Using a thread ensures that the
+ # operation does not block the fiber scheduler.
+ def io_select(...)
+ # Emulate the operation using a non-blocking thread:
+ Thread.new do
+ IO.select(...)
+ end.value
+ end
+
+ # This hook is invoked by `Kernel#sleep` and `Thread::Mutex#sleep`.
def kernel_sleep(duration = nil)
# $stderr.puts [__method__, duration, Fiber.current].inspect
@@ -206,8 +248,10 @@ class Scheduler
return true
end
- # Used when blocking on synchronization (Thread::Mutex#lock,
- # Thread::Queue#pop, Thread::SizedQueue#push, ...)
+ # This hook is invoked by blocking options such as `Thread::Mutex#lock`,
+ # `Thread::Queue#pop` and `Thread::SizedQueue#push`, which are unblocked by
+ # other threads/fibers. To unblock a blocked fiber, you should call `unblock`
+ # with the same `blocker` and `fiber` arguments.
def block(blocker, timeout = nil)
# $stderr.puts [__method__, blocker, timeout].inspect
@@ -216,7 +260,7 @@ class Scheduler
if timeout
@waiting[fiber] = current_time + timeout
begin
- Fiber.yield
+ @fiber.transfer
ensure
# Remove from @waiting in the case #unblock was called before the timeout expired:
@waiting.delete(fiber)
@@ -224,16 +268,15 @@ class Scheduler
else
@blocking[fiber] = true
begin
- Fiber.yield
+ @fiber.transfer
ensure
@blocking.delete(fiber)
end
end
end
- # Used when synchronization wakes up a previously-blocked fiber
- # (Thread::Mutex#unlock, Thread::Queue#push, ...).
- # This might be called from another thread.
+ # This method is invoked from a thread or fiber to unblock a fiber that is
+ # blocked by `block`. It is expected to be thread safe.
def unblock(blocker, fiber)
# $stderr.puts [__method__, blocker, fiber].inspect
# $stderr.puts blocker.backtrace.inspect
@@ -247,14 +290,20 @@ class Scheduler
io.write_nonblock('.')
end
+ # This hook is invoked by `Fiber.schedule`. Strictly speaking, you should use
+ # it to create scheduled fibers, but it is not required in practice;
+ # `Fiber.new` is usually sufficient.
def fiber(&block)
fiber = Fiber.new(blocking: false, &block)
- fiber.resume
+ fiber.transfer
return fiber
end
+ # This hook is invoked by `Addrinfo.getaddrinfo`. Using a thread ensures that
+ # the operation does not block the fiber scheduler, since `getaddrinfo` is
+ # usually provided by `libc` and is blocking.
def address_resolve(hostname)
Thread.new do
Addrinfo.getaddrinfo(hostname, nil).map(&:ip_address).uniq
@@ -262,85 +311,133 @@ class Scheduler
end
end
+# This scheduler class implements `io_read` and `io_write` hooks which require
+# `IO::Buffer`.
class IOBufferScheduler < Scheduler
- EAGAIN = Errno::EAGAIN::Errno
+ EAGAIN = -Errno::EAGAIN::Errno
- def io_read(io, buffer, length)
- offset = 0
+ def io_read(io, buffer, length, offset)
+ total = 0
+ io.nonblock = true
while true
maximum_size = buffer.size - offset
- result = blocking{io.read_nonblock(maximum_size, exception: false)}
-
- # blocking{pp read: maximum_size, result: result, length: length}
+ result = blocking{buffer.read(io, maximum_size, offset)}
- case result
- when :wait_readable
+ if result > 0
+ total += result
+ offset += result
+ break if total >= length
+ elsif result == 0
+ break
+ elsif result == EAGAIN
if length > 0
self.io_wait(io, IO::READABLE, nil)
else
- return -EAGAIN
+ return result
end
- when :wait_writable
+ elsif result < 0
+ return result
+ end
+ end
+
+ return total
+ end
+
+ def io_write(io, buffer, length, offset)
+ total = 0
+ io.nonblock = true
+
+ while true
+ maximum_size = buffer.size - offset
+ result = blocking{buffer.write(io, maximum_size, offset)}
+
+ if result > 0
+ total += result
+ offset += result
+ break if total >= length
+ elsif result == 0
+ break
+ elsif result == EAGAIN
if length > 0
self.io_wait(io, IO::WRITABLE, nil)
else
- return -EAGAIN
+ return result
end
- else
- break unless result
-
- buffer.set_string(result, offset)
-
- size = result.bytesize
- offset += size
- break if size >= length
- length -= size
+ elsif result < 0
+ return result
end
end
- return offset
+ return total
end
- def io_write(io, buffer, length)
- offset = 0
+ def io_pread(io, buffer, from, length, offset)
+ total = 0
+ io.nonblock = true
while true
maximum_size = buffer.size - offset
+ result = blocking{buffer.pread(io, from, maximum_size, offset)}
- chunk = buffer.get_string(offset, maximum_size)
- result = blocking{io.write_nonblock(chunk, exception: false)}
-
- # blocking{pp write: maximum_size, result: result, length: length}
-
- case result
- when :wait_readable
+ if result > 0
+ total += result
+ offset += result
+ from += result
+ break if total >= length
+ elsif result == 0
+ break
+ elsif result == EAGAIN
if length > 0
self.io_wait(io, IO::READABLE, nil)
else
- return -EAGAIN
+ return result
end
- when :wait_writable
+ elsif result < 0
+ return result
+ end
+ end
+
+ return total
+ end
+
+ def io_pwrite(io, buffer, from, length, offset)
+ total = 0
+ io.nonblock = true
+
+ while true
+ maximum_size = buffer.size - offset
+ result = blocking{buffer.pwrite(io, from, maximum_size, offset)}
+
+ if result > 0
+ total += result
+ offset += result
+ from += result
+ break if total >= length
+ elsif result == 0
+ break
+ elsif result == EAGAIN
if length > 0
self.io_wait(io, IO::WRITABLE, nil)
else
- return -EAGAIN
+ return result
end
- else
- offset += result
- break if result >= length
- length -= result
+ elsif result < 0
+ return result
end
end
- return offset
+ return total
end
def blocking(&block)
- Fiber.new(blocking: true, &block).resume
+ Fiber.blocking(&block)
end
end
+# This scheduler has a broken implementation of `unblock`` in the sense that it
+# raises an exception. This is used to test the behavior of the scheduler when
+# unblock raises an exception.
class BrokenUnblockScheduler < Scheduler
def unblock(blocker, fiber)
super
@@ -349,6 +446,9 @@ class BrokenUnblockScheduler < Scheduler
end
end
+# This scheduler has a broken implementation of `unblock` in the sense that it
+# sleeps. This is used to test the behavior of the scheduler when unblock
+# messes with the internal thread state in an unexpected way.
class SleepingUnblockScheduler < Scheduler
# This method is invoked when the thread is exiting.
def unblock(blocker, fiber)
@@ -358,3 +458,16 @@ class SleepingUnblockScheduler < Scheduler
sleep(0.1)
end
end
+
+# This scheduler has a broken implementation of `kernel_sleep` in the sense that
+# it invokes a blocking sleep which can cause a deadlock in some cases.
+class SleepingBlockingScheduler < Scheduler
+ def kernel_sleep(duration = nil)
+ # Deliberaly sleep in a blocking state which can trigger a deadlock if the implementation is not correct.
+ Fiber.blocking{sleep 0.0001}
+
+ self.block(:sleep, duration)
+
+ return true
+ end
+end