From 5f69a7f60467fa58c2f998daffab43e118bff36c Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Tue, 9 Feb 2021 19:39:56 +1300 Subject: Expose scheduler as public interface & bug fixes. (#3945) * Rename `rb_scheduler` to `rb_fiber_scheduler`. * Use public interface if available. * Use `rb_check_funcall` where possible. * Don't use `unblock` unless the fiber was non-blocking. --- test/fiber/http.rb | 11 ++++++----- test/fiber/scheduler.rb | 24 ++++++++++++++++++++---- test/fiber/test_thread.rb | 45 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 71 insertions(+), 9 deletions(-) create mode 100644 test/fiber/test_thread.rb (limited to 'test') diff --git a/test/fiber/http.rb b/test/fiber/http.rb index ad51ae3c76..53a4602bd4 100644 --- a/test/fiber/http.rb +++ b/test/fiber/http.rb @@ -15,24 +15,25 @@ def fetch_topics(topics) topics.each do |topic| Fiber.new(blocking: Fiber.current.blocking?) do uri = URI("https://www.google.com/search?q=#{topic}") - responses[topic] = Net::HTTP.get(uri).scan(topic).size + response = Net::HTTP.get(uri) + responses[topic] = response.scan(topic).size end.resume end - Thread.fiber_scheduler&.run + Fiber.scheduler&.run return responses end def sweep(repeats: 3, **options) times = (1..8).map do |i| - $stderr.puts "Measuring #{i} topic(s)..." + $stderr.puts "Measuring #{i} topic(s) #{options.inspect}..." topics = TOPICS[0...i] Thread.new do Benchmark.realtime do scheduler = Scheduler.new - Fiber.set_scheduler scheduler + Fiber.set_scheduler(scheduler) repeats.times do Fiber.new(**options) do @@ -49,5 +50,5 @@ def sweep(repeats: 3, **options) puts JSON.dump(times.map{|value| value.round(3)}) end -sweep(blocking: true) +# sweep(blocking: true) sweep(blocking: false) diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb index b3c3eaff59..f2fb304e19 100644 --- a/test/fiber/scheduler.rb +++ b/test/fiber/scheduler.rb @@ -47,6 +47,8 @@ class Scheduler end def run + # $stderr.puts [__method__, Fiber.current].inspect + while @readable.any? or @writable.any? or @waiting.any? or @blocking.positive? # Can only handle file descriptors up to 1024... readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout) @@ -54,9 +56,11 @@ class Scheduler # puts "readable: #{readable}" if readable&.any? # puts "writable: #{writable}" if writable&.any? + selected = {} + readable&.each do |io| if fiber = @readable.delete(io) - fiber.resume + selected[fiber] = IO::READABLE elsif io == @urgent.first @urgent.first.read_nonblock(1024) end @@ -64,10 +68,14 @@ class Scheduler writable&.each do |io| if fiber = @writable.delete(io) - fiber.resume + selected[fiber] |= IO::WRITABLE end end + selected.each do |fiber, events| + fiber.resume(events) + end + if @waiting.any? time = current_time waiting, @waiting = @waiting, {} @@ -96,6 +104,8 @@ class Scheduler end def close + # $stderr.puts [__method__, Fiber.current].inspect + raise "Scheduler already closed!" if @closed self.run @@ -118,6 +128,8 @@ class Scheduler end def process_wait(pid, flags) + # $stderr.puts [__method__, pid, flags, Fiber.current].inspect + # This is a very simple way to implement a non-blocking wait: Thread.new do Process::Status.wait(pid, flags) @@ -125,6 +137,8 @@ class Scheduler end def io_wait(io, events, duration) + # $stderr.puts [__method__, io, events, duration, Fiber.current].inspect + unless (events & IO::READABLE).zero? @readable[io] = Fiber.current end @@ -134,12 +148,12 @@ class Scheduler end Fiber.yield - - return true end # Used for Kernel#sleep and Mutex#sleep def kernel_sleep(duration = nil) + # $stderr.puts [__method__, duration, Fiber.current].inspect + self.block(:sleep, duration) return true @@ -171,6 +185,8 @@ class Scheduler # This might be called from another thread. def unblock(blocker, fiber) # $stderr.puts [__method__, blocker, fiber].inspect + # $stderr.puts blocker.backtrace.inspect + # $stderr.puts fiber.backtrace.inspect @lock.synchronize do @ready << fiber diff --git a/test/fiber/test_thread.rb b/test/fiber/test_thread.rb new file mode 100644 index 0000000000..5fc80f0e6c --- /dev/null +++ b/test/fiber/test_thread.rb @@ -0,0 +1,45 @@ +# frozen_string_literal: true +require "test/unit" +require_relative 'scheduler' + +class TestFiberThread < Test::Unit::TestCase + def test_thread_join + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + result = nil + Fiber.schedule do + result = Thread.new{:done}.value + end + + scheduler.run + result + end + + assert_equal :done, thread.value + end + + def test_thread_join_blocking + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + result = nil + Fiber.schedule do + Fiber.new(blocking: true) do + # This can deadlock if the blocking state is not taken into account: + Thread.new do + sleep(0) + result = :done + end.join + end.resume + end + + scheduler.run + result + end + + assert_equal :done, thread.value + end +end -- cgit v1.2.3