diff options
Diffstat (limited to 'test/fiber/scheduler.rb')
-rw-r--r-- | test/fiber/scheduler.rb | 152 |
1 files changed, 131 insertions, 21 deletions
diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb index 204a297133..3926226ca3 100644 --- a/test/fiber/scheduler.rb +++ b/test/fiber/scheduler.rb @@ -1,8 +1,13 @@ # frozen_string_literal: true # This is an example and simplified scheduler for test purposes. -# It is not efficient for a large number of file descriptors as it uses IO.select(). -# Production Fiber schedulers should use epoll/kqueue/etc. +# - It is not efficient for a large number of file descriptors as it uses +# IO.select(). +# - It does not correctly handle multiple calls to `wait` with the same file +# descriptor and overlapping events. +# - Production fiber schedulers should use epoll/kqueue/etc. Consider using the +# [`io-event`](https://github.com/socketry/io-event) gem instead of this +# scheduler if you want something simple to build on. require 'fiber' require 'socket' @@ -22,7 +27,9 @@ class Scheduler Warning[:experimental] = experimental end - def initialize + def initialize(fiber = Fiber.current) + @fiber = fiber + @readable = {} @writable = {} @waiting = {} @@ -40,6 +47,10 @@ class Scheduler attr :writable attr :waiting + def transfer + @fiber.transfer + end + def next_timeout _fiber, timeout = @waiting.min_by{|key, value| value} @@ -58,7 +69,7 @@ class Scheduler # $stderr.puts [__method__, Fiber.current].inspect while @readable.any? or @writable.any? or @waiting.any? or @blocking.any? - # Can only handle file descriptors up to 1024... + # May only handle file descriptors up to 1024... readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout) # puts "readable: #{readable}" if readable&.any? @@ -83,7 +94,7 @@ class Scheduler end selected.each do |fiber, events| - fiber.resume(events) + fiber.transfer(events) end if @waiting.any? @@ -93,7 +104,7 @@ class Scheduler waiting.each do |fiber, timeout| if fiber.alive? if timeout <= time - fiber.resume + fiber.transfer else @waiting[fiber] = timeout end @@ -109,16 +120,22 @@ class Scheduler end ready.each do |fiber| - fiber.resume + fiber.transfer end end end end + # A fiber scheduler hook, invoked when the scheduler goes out of scope. def scheduler_close close(true) end + # If the `scheduler_close` hook does not exist, this method `close` will be + # invoked instead when the fiber scheduler goes out of scope. This is legacy + # behaviour, you should almost certainly use `scheduler_close`. The reason for + # this, is `scheduler_close` is called when the scheduler goes out of scope, + # while `close` may be called by the user. def close(internal = false) # $stderr.puts [__method__, Fiber.current].inspect @@ -153,6 +170,7 @@ class Scheduler Process.clock_gettime(Process::CLOCK_MONOTONIC) end + # This hook is invoked by `Timeout.timeout` and related code. def timeout_after(duration, klass, message, &block) fiber = Fiber.current @@ -171,6 +189,7 @@ class Scheduler end end + # This hook is invoked by `Process.wait`, `system`, and backticks. def process_wait(pid, flags) # $stderr.puts [__method__, pid, flags, Fiber.current].inspect @@ -180,23 +199,39 @@ class Scheduler end.value end + # This hook is invoked by `IO#read` and `IO#write` in the case that `io_read` + # and `io_write` hooks are not available. This implementation is not + # completely general, in the sense that calling `io_wait` multiple times with + # the same `io` and `events` will not work, which is okay for tests but not + # for real code. Correct fiber schedulers should not have this limitation. def io_wait(io, events, duration) # $stderr.puts [__method__, io, events, duration, Fiber.current].inspect + fiber = Fiber.current + unless (events & IO::READABLE).zero? - @readable[io] = Fiber.current + @readable[io] = fiber + readable = true end unless (events & IO::WRITABLE).zero? - @writable[io] = Fiber.current + @writable[io] = fiber + writable = true + end + + if duration + @waiting[fiber] = current_time + duration end - Fiber.yield + @fiber.transfer ensure - @readable.delete(io) - @writable.delete(io) + @waiting.delete(fiber) if duration + @readable.delete(io) if readable + @writable.delete(io) if writable end + # This hook is invoked by `IO.select`. Using a thread ensures that the + # operation does not block the fiber scheduler. def io_select(...) # Emulate the operation using a non-blocking thread: Thread.new do @@ -204,7 +239,7 @@ class Scheduler end.value end - # Used for Kernel#sleep and Thread::Mutex#sleep + # This hook is invoked by `Kernel#sleep` and `Thread::Mutex#sleep`. def kernel_sleep(duration = nil) # $stderr.puts [__method__, duration, Fiber.current].inspect @@ -213,8 +248,10 @@ class Scheduler return true end - # Used when blocking on synchronization (Thread::Mutex#lock, - # Thread::Queue#pop, Thread::SizedQueue#push, ...) + # This hook is invoked by blocking options such as `Thread::Mutex#lock`, + # `Thread::Queue#pop` and `Thread::SizedQueue#push`, which are unblocked by + # other threads/fibers. To unblock a blocked fiber, you should call `unblock` + # with the same `blocker` and `fiber` arguments. def block(blocker, timeout = nil) # $stderr.puts [__method__, blocker, timeout].inspect @@ -223,7 +260,7 @@ class Scheduler if timeout @waiting[fiber] = current_time + timeout begin - Fiber.yield + @fiber.transfer ensure # Remove from @waiting in the case #unblock was called before the timeout expired: @waiting.delete(fiber) @@ -231,16 +268,15 @@ class Scheduler else @blocking[fiber] = true begin - Fiber.yield + @fiber.transfer ensure @blocking.delete(fiber) end end end - # Used when synchronization wakes up a previously-blocked fiber - # (Thread::Mutex#unlock, Thread::Queue#push, ...). - # This might be called from another thread. + # This method is invoked from a thread or fiber to unblock a fiber that is + # blocked by `block`. It is expected to be thread safe. def unblock(blocker, fiber) # $stderr.puts [__method__, blocker, fiber].inspect # $stderr.puts blocker.backtrace.inspect @@ -254,14 +290,20 @@ class Scheduler io.write_nonblock('.') end + # This hook is invoked by `Fiber.schedule`. Strictly speaking, you should use + # it to create scheduled fibers, but it is not required in practice; + # `Fiber.new` is usually sufficient. def fiber(&block) fiber = Fiber.new(blocking: false, &block) - fiber.resume + fiber.transfer return fiber end + # This hook is invoked by `Addrinfo.getaddrinfo`. Using a thread ensures that + # the operation does not block the fiber scheduler, since `getaddrinfo` is + # usually provided by `libc` and is blocking. def address_resolve(hostname) Thread.new do Addrinfo.getaddrinfo(hostname, nil).map(&:ip_address).uniq @@ -269,6 +311,8 @@ class Scheduler end end +# This scheduler class implements `io_read` and `io_write` hooks which require +# `IO::Buffer`. class IOBufferScheduler < Scheduler EAGAIN = -Errno::EAGAIN::Errno @@ -328,11 +372,72 @@ class IOBufferScheduler < Scheduler return total end + def io_pread(io, buffer, from, length, offset) + total = 0 + io.nonblock = true + + while true + maximum_size = buffer.size - offset + result = blocking{buffer.pread(io, from, maximum_size, offset)} + + if result > 0 + total += result + offset += result + from += result + break if total >= length + elsif result == 0 + break + elsif result == EAGAIN + if length > 0 + self.io_wait(io, IO::READABLE, nil) + else + return result + end + elsif result < 0 + return result + end + end + + return total + end + + def io_pwrite(io, buffer, from, length, offset) + total = 0 + io.nonblock = true + + while true + maximum_size = buffer.size - offset + result = blocking{buffer.pwrite(io, from, maximum_size, offset)} + + if result > 0 + total += result + offset += result + from += result + break if total >= length + elsif result == 0 + break + elsif result == EAGAIN + if length > 0 + self.io_wait(io, IO::WRITABLE, nil) + else + return result + end + elsif result < 0 + return result + end + end + + return total + end + def blocking(&block) Fiber.blocking(&block) end end +# This scheduler has a broken implementation of `unblock`` in the sense that it +# raises an exception. This is used to test the behavior of the scheduler when +# unblock raises an exception. class BrokenUnblockScheduler < Scheduler def unblock(blocker, fiber) super @@ -341,6 +446,9 @@ class BrokenUnblockScheduler < Scheduler end end +# This scheduler has a broken implementation of `unblock` in the sense that it +# sleeps. This is used to test the behavior of the scheduler when unblock +# messes with the internal thread state in an unexpected way. class SleepingUnblockScheduler < Scheduler # This method is invoked when the thread is exiting. def unblock(blocker, fiber) @@ -351,6 +459,8 @@ class SleepingUnblockScheduler < Scheduler end end +# This scheduler has a broken implementation of `kernel_sleep` in the sense that +# it invokes a blocking sleep which can cause a deadlock in some cases. class SleepingBlockingScheduler < Scheduler def kernel_sleep(duration = nil) # Deliberaly sleep in a blocking state which can trigger a deadlock if the implementation is not correct. |