diff options
Diffstat (limited to 'test/fiber')
| -rw-r--r-- | test/fiber/scheduler.rb | 178 | ||||
| -rw-r--r-- | test/fiber/test_io.rb | 51 | ||||
| -rw-r--r-- | test/fiber/test_io_buffer.rb | 3 | ||||
| -rw-r--r-- | test/fiber/test_io_close.rb | 107 | ||||
| -rw-r--r-- | test/fiber/test_process.rb | 4 | ||||
| -rw-r--r-- | test/fiber/test_ractor.rb | 2 | ||||
| -rw-r--r-- | test/fiber/test_scheduler.rb | 201 | ||||
| -rw-r--r-- | test/fiber/test_sleep.rb | 4 | ||||
| -rw-r--r-- | test/fiber/test_thread.rb | 47 |
9 files changed, 537 insertions, 60 deletions
diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb index 3926226ca3..8f1ce4376b 100644 --- a/test/fiber/scheduler.rb +++ b/test/fiber/scheduler.rb @@ -65,63 +65,79 @@ class Scheduler end end - def run - # $stderr.puts [__method__, Fiber.current].inspect + def run_once + readable = writable = nil - while @readable.any? or @writable.any? or @waiting.any? or @blocking.any? - # May only handle file descriptors up to 1024... + begin readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout) + rescue IOError + # Ignore - this can happen if the IO is closed while we are waiting. + end - # puts "readable: #{readable}" if readable&.any? - # puts "writable: #{writable}" if writable&.any? + # puts "readable: #{readable}" if readable&.any? + # puts "writable: #{writable}" if writable&.any? - selected = {} + selected = {} - readable&.each do |io| - if fiber = @readable.delete(io) - @writable.delete(io) if @writable[io] == fiber - selected[fiber] = IO::READABLE - elsif io == @urgent.first - @urgent.first.read_nonblock(1024) - end + readable&.each do |io| + if fiber = @readable.delete(io) + @writable.delete(io) if @writable[io] == fiber + selected[fiber] = IO::READABLE + elsif io == @urgent.first + @urgent.first.read_nonblock(1024) end + end - writable&.each do |io| - if fiber = @writable.delete(io) - @readable.delete(io) if @readable[io] == fiber - selected[fiber] = selected.fetch(fiber, 0) | IO::WRITABLE - end + writable&.each do |io| + if fiber = @writable.delete(io) + @readable.delete(io) if @readable[io] == fiber + selected[fiber] = selected.fetch(fiber, 0) | IO::WRITABLE end + end - selected.each do |fiber, events| - fiber.transfer(events) - end + selected.each do |fiber, events| + fiber.transfer(events) + end - if @waiting.any? - time = current_time - waiting, @waiting = @waiting, {} - - waiting.each do |fiber, timeout| - if fiber.alive? - if timeout <= time - fiber.transfer - else - @waiting[fiber] = timeout - end + if @waiting.any? + time = current_time + waiting, @waiting = @waiting, {} + + waiting.each do |fiber, timeout| + if fiber.alive? + if timeout <= time + fiber.transfer + else + @waiting[fiber] = timeout end end end + end - if @ready.any? - ready = nil + if @ready.any? + ready = nil - @lock.synchronize do - ready, @ready = @ready, [] - end + @lock.synchronize do + ready, @ready = @ready, [] + end - ready.each do |fiber| - fiber.transfer - end + ready.each do |fiber| + fiber.transfer if fiber.alive? + end + end + end + + def run + # $stderr.puts [__method__, Fiber.current].inspect + + # Use Thread.handle_interrupt like Async::Scheduler does + # This defers signal processing, which is the root cause of the gRPC bug + # See: https://github.com/socketry/async/blob/main/lib/async/scheduler.rb + Thread.handle_interrupt(::SignalException => :never) do + while @readable.any? or @writable.any? or @waiting.any? or @blocking.any? + run_once + + break if Thread.pending_interrupt? end end end @@ -239,6 +255,13 @@ class Scheduler end.value end + # This hook is invoked by `IO#close`. Using a separate IO object + # demonstrates that the close operation is asynchronous. + def io_close(descriptor) + Fiber.blocking{IO.for_fd(descriptor.to_i).close} + return true + end + # This hook is invoked by `Kernel#sleep` and `Thread::Mutex#sleep`. def kernel_sleep(duration = nil) # $stderr.puts [__method__, duration, Fiber.current].inspect @@ -290,6 +313,30 @@ class Scheduler io.write_nonblock('.') end + class FiberInterrupt + def initialize(fiber, exception) + @fiber = fiber + @exception = exception + end + + def alive? + @fiber.alive? + end + + def transfer + @fiber.raise(@exception) + end + end + + def fiber_interrupt(fiber, exception) + @lock.synchronize do + @ready << FiberInterrupt.new(fiber, exception) + end + + io = @urgent.last + io.write_nonblock('.') + end + # This hook is invoked by `Fiber.schedule`. Strictly speaking, you should use # it to create scheduled fibers, but it is not required in practice; # `Fiber.new` is usually sufficient. @@ -309,6 +356,16 @@ class Scheduler Addrinfo.getaddrinfo(hostname, nil).map(&:ip_address).uniq end.value end + + def blocking_operation_wait(work) + thread = Thread.new{work.call} + + thread.join + + thread = nil + ensure + thread&.kill + end end # This scheduler class implements `io_read` and `io_write` hooks which require @@ -321,8 +378,7 @@ class IOBufferScheduler < Scheduler io.nonblock = true while true - maximum_size = buffer.size - offset - result = blocking{buffer.read(io, maximum_size, offset)} + result = blocking{buffer.read(io, 0, offset)} if result > 0 total += result @@ -349,8 +405,7 @@ class IOBufferScheduler < Scheduler io.nonblock = true while true - maximum_size = buffer.size - offset - result = blocking{buffer.write(io, maximum_size, offset)} + result = blocking{buffer.write(io, 0, offset)} if result > 0 total += result @@ -377,8 +432,7 @@ class IOBufferScheduler < Scheduler io.nonblock = true while true - maximum_size = buffer.size - offset - result = blocking{buffer.pread(io, from, maximum_size, offset)} + result = blocking{buffer.pread(io, from, 0, offset)} if result > 0 total += result @@ -406,8 +460,7 @@ class IOBufferScheduler < Scheduler io.nonblock = true while true - maximum_size = buffer.size - offset - result = blocking{buffer.pwrite(io, from, maximum_size, offset)} + result = blocking{buffer.pwrite(io, from, 0, offset)} if result > 0 total += result @@ -435,6 +488,33 @@ class IOBufferScheduler < Scheduler end end +class IOScheduler < Scheduler + def operations + @operations ||= [] + end + + def io_write(io, buffer, length, offset) + descriptor = io.fileno + string = buffer.get_string + + self.operations << [:io_write, descriptor, string] + + Fiber.blocking do + buffer.write(io, 0, offset) + end + end +end + +class IOErrorScheduler < Scheduler + def io_read(io, buffer, length, offset) + return -Errno::EBADF::Errno + end + + def io_write(io, buffer, length, offset) + return -Errno::EINVAL::Errno + end +end + # This scheduler has a broken implementation of `unblock`` in the sense that it # raises an exception. This is used to test the behavior of the scheduler when # unblock raises an exception. diff --git a/test/fiber/test_io.rb b/test/fiber/test_io.rb index 0e3e086d5a..eea06f97c8 100644 --- a/test/fiber/test_io.rb +++ b/test/fiber/test_io.rb @@ -9,7 +9,7 @@ class TestFiberIO < Test::Unit::TestCase omit unless defined?(UNIXSocket) i, o = UNIXSocket.pair - if RUBY_PLATFORM=~/mswin|mingw/ + if RUBY_PLATFORM =~ /mswin|mingw/ i.nonblock = true o.nonblock = true end @@ -44,7 +44,7 @@ class TestFiberIO < Test::Unit::TestCase 16.times.map do Thread.new do i, o = UNIXSocket.pair - if RUBY_PLATFORM=~/mswin|mingw/ + if RUBY_PLATFORM =~ /mswin|mingw/ i.nonblock = true o.nonblock = true end @@ -67,7 +67,7 @@ class TestFiberIO < Test::Unit::TestCase def test_epipe_on_read omit unless defined?(UNIXSocket) - omit "nonblock=true isn't properly supported on Windows" if RUBY_PLATFORM=~/mswin|mingw/ + omit "nonblock=true isn't properly supported on Windows" if RUBY_PLATFORM =~ /mswin|mingw/ i, o = UNIXSocket.pair @@ -153,12 +153,13 @@ class TestFiberIO < Test::Unit::TestCase Fiber.set_scheduler scheduler Fiber.schedule do - message = i.read(20) + # We add 1 here, to force the read to block (testing that specific code path). + message = i.read(MESSAGE.bytesize + 1) i.close end Fiber.schedule do - o.write("Hello World") + o.write(MESSAGE) o.close end end @@ -234,4 +235,44 @@ class TestFiberIO < Test::Unit::TestCase assert_equal "ok\n", result end + + # Tests for https://bugs.ruby-lang.org/issues/20723 which would + # otherwise deadlock this test. + def test_close_while_reading_on_thread + # Windows has UNIXSocket, but only with VS 2019+ + omit "UNIXSocket is not defined!" unless defined?(UNIXSocket) + + Socket.pair(:UNIX, :STREAM) do |i, o| + if RUBY_PLATFORM =~ /mswin|mingw/ + i.nonblock = true + o.nonblock = true + end + + reading_thread = Thread.new do + Thread.current.report_on_exception = false + i.wait_readable + end + + scheduler_thread = Thread.new do + # Wait until the reading thread is blocked on read: + Thread.pass until reading_thread.status == "sleep" + + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + Fiber.schedule do + i.close + end + end + + assert_raise(IOError) { reading_thread.join } + refute_nil scheduler_thread.join(5), "expected thread to terminate within 5 seconds" + + assert_predicate(i, :closed?) + ensure + scheduler_thread&.kill + scheduler_thread&.join rescue nil + reading_thread&.kill + reading_thread&.join rescue nil + end + end end diff --git a/test/fiber/test_io_buffer.rb b/test/fiber/test_io_buffer.rb index a08b1ce1a9..19e6c1f88e 100644 --- a/test/fiber/test_io_buffer.rb +++ b/test/fiber/test_io_buffer.rb @@ -21,7 +21,8 @@ class TestFiberIOBuffer < Test::Unit::TestCase Fiber.set_scheduler scheduler Fiber.schedule do - message = i.read(20) + # We add 1 here, to force the read to block (testing that specific code path). + message = i.read(MESSAGE.bytesize + 1) i.close end diff --git a/test/fiber/test_io_close.rb b/test/fiber/test_io_close.rb new file mode 100644 index 0000000000..742b40841d --- /dev/null +++ b/test/fiber/test_io_close.rb @@ -0,0 +1,107 @@ +# frozen_string_literal: true +require 'test/unit' +require_relative 'scheduler' + +class TestFiberIOClose < Test::Unit::TestCase + def with_socket_pair(&block) + omit "UNIXSocket is not defined!" unless defined?(UNIXSocket) + + UNIXSocket.pair do |i, o| + if RUBY_PLATFORM =~ /mswin|mingw/ + i.nonblock = true + o.nonblock = true + end + + yield i, o + end + end + + def test_io_close_across_fibers + # omit "Interrupting a io_wait read is not supported!" if RUBY_PLATFORM =~ /mswin|mingw/ + + with_socket_pair do |i, o| + error = nil + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + i.read + rescue => error + # Ignore. + end + + Fiber.schedule do + i.close + end + end + + thread.join + + assert_instance_of IOError, error + assert_match(/closed/, error.message) + end + end + + def test_io_close_blocking_thread + omit "Interrupting a io_wait read is not supported!" if RUBY_PLATFORM =~ /mswin|mingw/ + + with_socket_pair do |i, o| + error = nil + + reading_thread = Thread.new do + i.read + rescue => error + # Ignore. + end + + Thread.pass until reading_thread.status == 'sleep' + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + i.close + end + end + + thread.join + reading_thread.join + + assert_instance_of IOError, error + assert_match(/closed/, error.message) + end + end + + def test_io_close_blocking_fiber + # omit "Interrupting a io_wait read is not supported!" if RUBY_PLATFORM =~ /mswin|mingw/ + + with_socket_pair do |i, o| + error = nil + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + begin + i.read + rescue => error + # Ignore. + end + end + end + + Thread.pass until thread.status == 'sleep' + + i.close + + thread.join + + assert_instance_of IOError, error + assert_match(/closed/, error.message) + end + end +end diff --git a/test/fiber/test_process.rb b/test/fiber/test_process.rb index a09b070c0a..f17f767704 100644 --- a/test/fiber/test_process.rb +++ b/test/fiber/test_process.rb @@ -59,12 +59,14 @@ class TestFiberProcess < Test::Unit::TestCase def test_fork omit 'fork not supported' unless Process.respond_to?(:fork) + + pid = Process.fork{} + Thread.new do scheduler = Scheduler.new Fiber.set_scheduler scheduler Fiber.schedule do - pid = Process.fork {} Process.wait(pid) assert_predicate $?, :success? diff --git a/test/fiber/test_ractor.rb b/test/fiber/test_ractor.rb index 3c4ccbd8e5..7dd82eda62 100644 --- a/test/fiber/test_ractor.rb +++ b/test/fiber/test_ractor.rb @@ -17,7 +17,7 @@ class TestFiberCurrentRactor < Test::Unit::TestCase Fiber.current.class end.resume end - assert_equal(Fiber, r.take) + assert_equal(Fiber, r.value) end; end end diff --git a/test/fiber/test_scheduler.rb b/test/fiber/test_scheduler.rb index 34effad816..d3696267f7 100644 --- a/test/fiber/test_scheduler.rb +++ b/test/fiber/test_scheduler.rb @@ -1,5 +1,7 @@ # frozen_string_literal: true require 'test/unit' +require 'securerandom' +require 'fileutils' require_relative 'scheduler' class TestFiberScheduler < Test::Unit::TestCase @@ -94,6 +96,9 @@ class TestFiberScheduler < Test::Unit::TestCase def scheduler.kernel_sleep end + def scheduler.fiber_interrupt(_fiber, _exception) + end + thread = Thread.new do Fiber.set_scheduler scheduler end @@ -139,6 +144,19 @@ class TestFiberScheduler < Test::Unit::TestCase end end + def test_iseq_compile_under_gc_stress_bug_21180 + Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + EnvUtil.under_gc_stress do + RubyVM::InstructionSequence.compile_file(File::NULL) + end + end + end.join + end + def test_deadlock mutex = Thread::Mutex.new condition = Thread::ConditionVariable.new @@ -182,4 +200,187 @@ class TestFiberScheduler < Test::Unit::TestCase thread.join signaller.join end + + def test_condition_variable + condition_variable = ::Thread::ConditionVariable.new + mutex = ::Thread::Mutex.new + + error = nil + + thread = Thread.new do + Thread.current.report_on_exception = false + + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + fiber = Fiber.schedule do + begin + mutex.synchronize do + condition_variable.wait(mutex) + end + rescue => error + end + end + + fiber.raise(RuntimeError) + end + + thread.join + assert_kind_of RuntimeError, error + end + + def test_post_fork_scheduler_reset + omit 'fork not supported' unless Process.respond_to?(:fork) + + forked_scheduler_state = nil + thread = Thread.new do + r, w = IO.pipe + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + forked_pid = fork do + r.close + w << (Fiber.scheduler ? 'set' : 'reset') + w.close + end + w.close + forked_scheduler_state = r.read + Process.wait(forked_pid) + ensure + r.close rescue nil + w.close rescue nil + end + thread.join + assert_equal 'reset', forked_scheduler_state + ensure + thread.kill rescue nil + end + + def test_post_fork_fiber_blocking + omit 'fork not supported' unless Process.respond_to?(:fork) + + fiber_blocking_state = nil + thread = Thread.new do + r, w = IO.pipe + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + forked_pid = nil + Fiber.schedule do + forked_pid = fork do + r.close + w << (Fiber.current.blocking? ? 'blocking' : 'nonblocking') + w.close + end + end + w.close + fiber_blocking_state = r.read + Process.wait(forked_pid) + ensure + r.close rescue nil + w.close rescue nil + end + thread.join + assert_equal 'blocking', fiber_blocking_state + ensure + thread.kill rescue nil + end + + def test_io_write_on_flush + begin + path = File.join(Dir.tmpdir, "ruby_test_io_write_on_flush_#{SecureRandom.hex}") + descriptor = nil + operations = nil + + thread = Thread.new do + scheduler = IOScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + File.open(path, 'w+') do |file| + descriptor = file.fileno + file << 'foo' + file.flush + file << 'bar' + end + end + + operations = scheduler.operations + end + + thread.join + assert_equal [ + [:io_write, descriptor, 'foo'], + [:io_write, descriptor, 'bar'] + ], operations + + assert_equal 'foobar', IO.read(path) + ensure + thread.kill rescue nil + FileUtils.rm_f(path) + end + end + + def test_io_read_error + path = File.join(Dir.tmpdir, "ruby_test_io_read_error_#{SecureRandom.hex}") + error = nil + + thread = Thread.new do + scheduler = IOErrorScheduler.new + Fiber.set_scheduler scheduler + Fiber.schedule do + File.open(path, 'w+') { it.read } + rescue => error + # Ignore. + end + end + + thread.join + assert_kind_of Errno::EBADF, error + ensure + thread.kill rescue nil + FileUtils.rm_f(path) + end + + def test_io_write_error + path = File.join(Dir.tmpdir, "ruby_test_io_write_error_#{SecureRandom.hex}") + error = nil + + thread = Thread.new do + scheduler = IOErrorScheduler.new + Fiber.set_scheduler scheduler + Fiber.schedule do + File.open(path, 'w+') { it.sync = true; it << 'foo' } + rescue => error + # Ignore. + end + end + + thread.join + assert_kind_of Errno::EINVAL, error + ensure + thread.kill rescue nil + FileUtils.rm_f(path) + end + + def test_io_write_flush_error + path = File.join(Dir.tmpdir, "ruby_test_io_write_flush_error_#{SecureRandom.hex}") + error = nil + + thread = Thread.new do + scheduler = IOErrorScheduler.new + Fiber.set_scheduler scheduler + Fiber.schedule do + File.open(path, 'w+') { it << 'foo' } + rescue => error + # Ignore. + end + end + + thread.join + assert_kind_of Errno::EINVAL, error + ensure + thread.kill rescue nil + FileUtils.rm_f(path) + end end diff --git a/test/fiber/test_sleep.rb b/test/fiber/test_sleep.rb index a7e88c0367..187f59dbd4 100644 --- a/test/fiber/test_sleep.rb +++ b/test/fiber/test_sleep.rb @@ -35,13 +35,13 @@ class TestFiberSleep < Test::Unit::TestCase scheduler = Scheduler.new Fiber.set_scheduler scheduler Fiber.schedule do - seconds = sleep(2) + seconds = sleep(1.1) end end thread.join - assert_operator seconds, :>=, 2, "actual: %p" % seconds + assert_operator seconds, :>=, 1, "actual: %p" % seconds end def test_broken_sleep diff --git a/test/fiber/test_thread.rb b/test/fiber/test_thread.rb index 5e3cc6d0e1..4d2fbde9ed 100644 --- a/test/fiber/test_thread.rb +++ b/test/fiber/test_thread.rb @@ -90,6 +90,47 @@ class TestFiberThread < Test::Unit::TestCase assert_equal :done, thread.value end + def test_spurious_unblock_during_thread_join + ready = Thread::Queue.new + + target_thread = Thread.new do + ready.pop + :success + end + + Thread.pass until target_thread.status == "sleep" + + result = nil + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + # Create a fiber that will join a long-running thread: + joining_fiber = Fiber.schedule do + result = target_thread.value + end + + # Create another fiber that spuriously unblocks the joining fiber: + Fiber.schedule do + # This interrupts the join in joining_fiber: + scheduler.unblock(:spurious_wakeup, joining_fiber) + + # This allows the unblock to be processed: + sleep(0) + + # This allows the target thread to finish: + ready.push(:done) + end + + scheduler.run + end + + thread.join + + assert_equal :success, result + end + def test_broken_unblock thread = Thread.new do Thread.current.report_on_exception = false @@ -115,16 +156,20 @@ class TestFiberThread < Test::Unit::TestCase end def test_thread_join_hang + inner = nil thread = Thread.new do scheduler = SleepingUnblockScheduler.new Fiber.set_scheduler scheduler Fiber.schedule do - Thread.new{sleep(0.01)}.value + inner = Thread.new{sleep(0.01)} + inner.value end end thread.join + ensure + inner&.join end end |
