summaryrefslogtreecommitdiff
path: root/test/fiber
diff options
context:
space:
mode:
Diffstat (limited to 'test/fiber')
-rw-r--r--test/fiber/scheduler.rb178
-rw-r--r--test/fiber/test_io.rb51
-rw-r--r--test/fiber/test_io_buffer.rb3
-rw-r--r--test/fiber/test_io_close.rb107
-rw-r--r--test/fiber/test_process.rb4
-rw-r--r--test/fiber/test_ractor.rb2
-rw-r--r--test/fiber/test_scheduler.rb201
-rw-r--r--test/fiber/test_sleep.rb4
-rw-r--r--test/fiber/test_thread.rb47
9 files changed, 537 insertions, 60 deletions
diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb
index 3926226ca3..8f1ce4376b 100644
--- a/test/fiber/scheduler.rb
+++ b/test/fiber/scheduler.rb
@@ -65,63 +65,79 @@ class Scheduler
end
end
- def run
- # $stderr.puts [__method__, Fiber.current].inspect
+ def run_once
+ readable = writable = nil
- while @readable.any? or @writable.any? or @waiting.any? or @blocking.any?
- # May only handle file descriptors up to 1024...
+ begin
readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout)
+ rescue IOError
+ # Ignore - this can happen if the IO is closed while we are waiting.
+ end
- # puts "readable: #{readable}" if readable&.any?
- # puts "writable: #{writable}" if writable&.any?
+ # puts "readable: #{readable}" if readable&.any?
+ # puts "writable: #{writable}" if writable&.any?
- selected = {}
+ selected = {}
- readable&.each do |io|
- if fiber = @readable.delete(io)
- @writable.delete(io) if @writable[io] == fiber
- selected[fiber] = IO::READABLE
- elsif io == @urgent.first
- @urgent.first.read_nonblock(1024)
- end
+ readable&.each do |io|
+ if fiber = @readable.delete(io)
+ @writable.delete(io) if @writable[io] == fiber
+ selected[fiber] = IO::READABLE
+ elsif io == @urgent.first
+ @urgent.first.read_nonblock(1024)
end
+ end
- writable&.each do |io|
- if fiber = @writable.delete(io)
- @readable.delete(io) if @readable[io] == fiber
- selected[fiber] = selected.fetch(fiber, 0) | IO::WRITABLE
- end
+ writable&.each do |io|
+ if fiber = @writable.delete(io)
+ @readable.delete(io) if @readable[io] == fiber
+ selected[fiber] = selected.fetch(fiber, 0) | IO::WRITABLE
end
+ end
- selected.each do |fiber, events|
- fiber.transfer(events)
- end
+ selected.each do |fiber, events|
+ fiber.transfer(events)
+ end
- if @waiting.any?
- time = current_time
- waiting, @waiting = @waiting, {}
-
- waiting.each do |fiber, timeout|
- if fiber.alive?
- if timeout <= time
- fiber.transfer
- else
- @waiting[fiber] = timeout
- end
+ if @waiting.any?
+ time = current_time
+ waiting, @waiting = @waiting, {}
+
+ waiting.each do |fiber, timeout|
+ if fiber.alive?
+ if timeout <= time
+ fiber.transfer
+ else
+ @waiting[fiber] = timeout
end
end
end
+ end
- if @ready.any?
- ready = nil
+ if @ready.any?
+ ready = nil
- @lock.synchronize do
- ready, @ready = @ready, []
- end
+ @lock.synchronize do
+ ready, @ready = @ready, []
+ end
- ready.each do |fiber|
- fiber.transfer
- end
+ ready.each do |fiber|
+ fiber.transfer if fiber.alive?
+ end
+ end
+ end
+
+ def run
+ # $stderr.puts [__method__, Fiber.current].inspect
+
+ # Use Thread.handle_interrupt like Async::Scheduler does
+ # This defers signal processing, which is the root cause of the gRPC bug
+ # See: https://github.com/socketry/async/blob/main/lib/async/scheduler.rb
+ Thread.handle_interrupt(::SignalException => :never) do
+ while @readable.any? or @writable.any? or @waiting.any? or @blocking.any?
+ run_once
+
+ break if Thread.pending_interrupt?
end
end
end
@@ -239,6 +255,13 @@ class Scheduler
end.value
end
+ # This hook is invoked by `IO#close`. Using a separate IO object
+ # demonstrates that the close operation is asynchronous.
+ def io_close(descriptor)
+ Fiber.blocking{IO.for_fd(descriptor.to_i).close}
+ return true
+ end
+
# This hook is invoked by `Kernel#sleep` and `Thread::Mutex#sleep`.
def kernel_sleep(duration = nil)
# $stderr.puts [__method__, duration, Fiber.current].inspect
@@ -290,6 +313,30 @@ class Scheduler
io.write_nonblock('.')
end
+ class FiberInterrupt
+ def initialize(fiber, exception)
+ @fiber = fiber
+ @exception = exception
+ end
+
+ def alive?
+ @fiber.alive?
+ end
+
+ def transfer
+ @fiber.raise(@exception)
+ end
+ end
+
+ def fiber_interrupt(fiber, exception)
+ @lock.synchronize do
+ @ready << FiberInterrupt.new(fiber, exception)
+ end
+
+ io = @urgent.last
+ io.write_nonblock('.')
+ end
+
# This hook is invoked by `Fiber.schedule`. Strictly speaking, you should use
# it to create scheduled fibers, but it is not required in practice;
# `Fiber.new` is usually sufficient.
@@ -309,6 +356,16 @@ class Scheduler
Addrinfo.getaddrinfo(hostname, nil).map(&:ip_address).uniq
end.value
end
+
+ def blocking_operation_wait(work)
+ thread = Thread.new{work.call}
+
+ thread.join
+
+ thread = nil
+ ensure
+ thread&.kill
+ end
end
# This scheduler class implements `io_read` and `io_write` hooks which require
@@ -321,8 +378,7 @@ class IOBufferScheduler < Scheduler
io.nonblock = true
while true
- maximum_size = buffer.size - offset
- result = blocking{buffer.read(io, maximum_size, offset)}
+ result = blocking{buffer.read(io, 0, offset)}
if result > 0
total += result
@@ -349,8 +405,7 @@ class IOBufferScheduler < Scheduler
io.nonblock = true
while true
- maximum_size = buffer.size - offset
- result = blocking{buffer.write(io, maximum_size, offset)}
+ result = blocking{buffer.write(io, 0, offset)}
if result > 0
total += result
@@ -377,8 +432,7 @@ class IOBufferScheduler < Scheduler
io.nonblock = true
while true
- maximum_size = buffer.size - offset
- result = blocking{buffer.pread(io, from, maximum_size, offset)}
+ result = blocking{buffer.pread(io, from, 0, offset)}
if result > 0
total += result
@@ -406,8 +460,7 @@ class IOBufferScheduler < Scheduler
io.nonblock = true
while true
- maximum_size = buffer.size - offset
- result = blocking{buffer.pwrite(io, from, maximum_size, offset)}
+ result = blocking{buffer.pwrite(io, from, 0, offset)}
if result > 0
total += result
@@ -435,6 +488,33 @@ class IOBufferScheduler < Scheduler
end
end
+class IOScheduler < Scheduler
+ def operations
+ @operations ||= []
+ end
+
+ def io_write(io, buffer, length, offset)
+ descriptor = io.fileno
+ string = buffer.get_string
+
+ self.operations << [:io_write, descriptor, string]
+
+ Fiber.blocking do
+ buffer.write(io, 0, offset)
+ end
+ end
+end
+
+class IOErrorScheduler < Scheduler
+ def io_read(io, buffer, length, offset)
+ return -Errno::EBADF::Errno
+ end
+
+ def io_write(io, buffer, length, offset)
+ return -Errno::EINVAL::Errno
+ end
+end
+
# This scheduler has a broken implementation of `unblock`` in the sense that it
# raises an exception. This is used to test the behavior of the scheduler when
# unblock raises an exception.
diff --git a/test/fiber/test_io.rb b/test/fiber/test_io.rb
index 0e3e086d5a..eea06f97c8 100644
--- a/test/fiber/test_io.rb
+++ b/test/fiber/test_io.rb
@@ -9,7 +9,7 @@ class TestFiberIO < Test::Unit::TestCase
omit unless defined?(UNIXSocket)
i, o = UNIXSocket.pair
- if RUBY_PLATFORM=~/mswin|mingw/
+ if RUBY_PLATFORM =~ /mswin|mingw/
i.nonblock = true
o.nonblock = true
end
@@ -44,7 +44,7 @@ class TestFiberIO < Test::Unit::TestCase
16.times.map do
Thread.new do
i, o = UNIXSocket.pair
- if RUBY_PLATFORM=~/mswin|mingw/
+ if RUBY_PLATFORM =~ /mswin|mingw/
i.nonblock = true
o.nonblock = true
end
@@ -67,7 +67,7 @@ class TestFiberIO < Test::Unit::TestCase
def test_epipe_on_read
omit unless defined?(UNIXSocket)
- omit "nonblock=true isn't properly supported on Windows" if RUBY_PLATFORM=~/mswin|mingw/
+ omit "nonblock=true isn't properly supported on Windows" if RUBY_PLATFORM =~ /mswin|mingw/
i, o = UNIXSocket.pair
@@ -153,12 +153,13 @@ class TestFiberIO < Test::Unit::TestCase
Fiber.set_scheduler scheduler
Fiber.schedule do
- message = i.read(20)
+ # We add 1 here, to force the read to block (testing that specific code path).
+ message = i.read(MESSAGE.bytesize + 1)
i.close
end
Fiber.schedule do
- o.write("Hello World")
+ o.write(MESSAGE)
o.close
end
end
@@ -234,4 +235,44 @@ class TestFiberIO < Test::Unit::TestCase
assert_equal "ok\n", result
end
+
+ # Tests for https://bugs.ruby-lang.org/issues/20723 which would
+ # otherwise deadlock this test.
+ def test_close_while_reading_on_thread
+ # Windows has UNIXSocket, but only with VS 2019+
+ omit "UNIXSocket is not defined!" unless defined?(UNIXSocket)
+
+ Socket.pair(:UNIX, :STREAM) do |i, o|
+ if RUBY_PLATFORM =~ /mswin|mingw/
+ i.nonblock = true
+ o.nonblock = true
+ end
+
+ reading_thread = Thread.new do
+ Thread.current.report_on_exception = false
+ i.wait_readable
+ end
+
+ scheduler_thread = Thread.new do
+ # Wait until the reading thread is blocked on read:
+ Thread.pass until reading_thread.status == "sleep"
+
+ scheduler = Scheduler.new
+ Fiber.set_scheduler scheduler
+ Fiber.schedule do
+ i.close
+ end
+ end
+
+ assert_raise(IOError) { reading_thread.join }
+ refute_nil scheduler_thread.join(5), "expected thread to terminate within 5 seconds"
+
+ assert_predicate(i, :closed?)
+ ensure
+ scheduler_thread&.kill
+ scheduler_thread&.join rescue nil
+ reading_thread&.kill
+ reading_thread&.join rescue nil
+ end
+ end
end
diff --git a/test/fiber/test_io_buffer.rb b/test/fiber/test_io_buffer.rb
index a08b1ce1a9..19e6c1f88e 100644
--- a/test/fiber/test_io_buffer.rb
+++ b/test/fiber/test_io_buffer.rb
@@ -21,7 +21,8 @@ class TestFiberIOBuffer < Test::Unit::TestCase
Fiber.set_scheduler scheduler
Fiber.schedule do
- message = i.read(20)
+ # We add 1 here, to force the read to block (testing that specific code path).
+ message = i.read(MESSAGE.bytesize + 1)
i.close
end
diff --git a/test/fiber/test_io_close.rb b/test/fiber/test_io_close.rb
new file mode 100644
index 0000000000..742b40841d
--- /dev/null
+++ b/test/fiber/test_io_close.rb
@@ -0,0 +1,107 @@
+# frozen_string_literal: true
+require 'test/unit'
+require_relative 'scheduler'
+
+class TestFiberIOClose < Test::Unit::TestCase
+ def with_socket_pair(&block)
+ omit "UNIXSocket is not defined!" unless defined?(UNIXSocket)
+
+ UNIXSocket.pair do |i, o|
+ if RUBY_PLATFORM =~ /mswin|mingw/
+ i.nonblock = true
+ o.nonblock = true
+ end
+
+ yield i, o
+ end
+ end
+
+ def test_io_close_across_fibers
+ # omit "Interrupting a io_wait read is not supported!" if RUBY_PLATFORM =~ /mswin|mingw/
+
+ with_socket_pair do |i, o|
+ error = nil
+
+ thread = Thread.new do
+ scheduler = Scheduler.new
+ Fiber.set_scheduler scheduler
+
+ Fiber.schedule do
+ i.read
+ rescue => error
+ # Ignore.
+ end
+
+ Fiber.schedule do
+ i.close
+ end
+ end
+
+ thread.join
+
+ assert_instance_of IOError, error
+ assert_match(/closed/, error.message)
+ end
+ end
+
+ def test_io_close_blocking_thread
+ omit "Interrupting a io_wait read is not supported!" if RUBY_PLATFORM =~ /mswin|mingw/
+
+ with_socket_pair do |i, o|
+ error = nil
+
+ reading_thread = Thread.new do
+ i.read
+ rescue => error
+ # Ignore.
+ end
+
+ Thread.pass until reading_thread.status == 'sleep'
+
+ thread = Thread.new do
+ scheduler = Scheduler.new
+ Fiber.set_scheduler scheduler
+
+ Fiber.schedule do
+ i.close
+ end
+ end
+
+ thread.join
+ reading_thread.join
+
+ assert_instance_of IOError, error
+ assert_match(/closed/, error.message)
+ end
+ end
+
+ def test_io_close_blocking_fiber
+ # omit "Interrupting a io_wait read is not supported!" if RUBY_PLATFORM =~ /mswin|mingw/
+
+ with_socket_pair do |i, o|
+ error = nil
+
+ thread = Thread.new do
+ scheduler = Scheduler.new
+ Fiber.set_scheduler scheduler
+
+ Fiber.schedule do
+ begin
+ i.read
+ rescue => error
+ # Ignore.
+ end
+ end
+ end
+
+ Thread.pass until thread.status == 'sleep'
+
+ i.close
+
+ thread.join
+
+ assert_instance_of IOError, error
+ assert_match(/closed/, error.message)
+ end
+ end
+end
diff --git a/test/fiber/test_process.rb b/test/fiber/test_process.rb
index a09b070c0a..f17f767704 100644
--- a/test/fiber/test_process.rb
+++ b/test/fiber/test_process.rb
@@ -59,12 +59,14 @@ class TestFiberProcess < Test::Unit::TestCase
def test_fork
omit 'fork not supported' unless Process.respond_to?(:fork)
+
+ pid = Process.fork{}
+
Thread.new do
scheduler = Scheduler.new
Fiber.set_scheduler scheduler
Fiber.schedule do
- pid = Process.fork {}
Process.wait(pid)
assert_predicate $?, :success?
diff --git a/test/fiber/test_ractor.rb b/test/fiber/test_ractor.rb
index 3c4ccbd8e5..7dd82eda62 100644
--- a/test/fiber/test_ractor.rb
+++ b/test/fiber/test_ractor.rb
@@ -17,7 +17,7 @@ class TestFiberCurrentRactor < Test::Unit::TestCase
Fiber.current.class
end.resume
end
- assert_equal(Fiber, r.take)
+ assert_equal(Fiber, r.value)
end;
end
end
diff --git a/test/fiber/test_scheduler.rb b/test/fiber/test_scheduler.rb
index 34effad816..d3696267f7 100644
--- a/test/fiber/test_scheduler.rb
+++ b/test/fiber/test_scheduler.rb
@@ -1,5 +1,7 @@
# frozen_string_literal: true
require 'test/unit'
+require 'securerandom'
+require 'fileutils'
require_relative 'scheduler'
class TestFiberScheduler < Test::Unit::TestCase
@@ -94,6 +96,9 @@ class TestFiberScheduler < Test::Unit::TestCase
def scheduler.kernel_sleep
end
+ def scheduler.fiber_interrupt(_fiber, _exception)
+ end
+
thread = Thread.new do
Fiber.set_scheduler scheduler
end
@@ -139,6 +144,19 @@ class TestFiberScheduler < Test::Unit::TestCase
end
end
+ def test_iseq_compile_under_gc_stress_bug_21180
+ Thread.new do
+ scheduler = Scheduler.new
+ Fiber.set_scheduler scheduler
+
+ Fiber.schedule do
+ EnvUtil.under_gc_stress do
+ RubyVM::InstructionSequence.compile_file(File::NULL)
+ end
+ end
+ end.join
+ end
+
def test_deadlock
mutex = Thread::Mutex.new
condition = Thread::ConditionVariable.new
@@ -182,4 +200,187 @@ class TestFiberScheduler < Test::Unit::TestCase
thread.join
signaller.join
end
+
+ def test_condition_variable
+ condition_variable = ::Thread::ConditionVariable.new
+ mutex = ::Thread::Mutex.new
+
+ error = nil
+
+ thread = Thread.new do
+ Thread.current.report_on_exception = false
+
+ scheduler = Scheduler.new
+ Fiber.set_scheduler scheduler
+
+ fiber = Fiber.schedule do
+ begin
+ mutex.synchronize do
+ condition_variable.wait(mutex)
+ end
+ rescue => error
+ end
+ end
+
+ fiber.raise(RuntimeError)
+ end
+
+ thread.join
+ assert_kind_of RuntimeError, error
+ end
+
+ def test_post_fork_scheduler_reset
+ omit 'fork not supported' unless Process.respond_to?(:fork)
+
+ forked_scheduler_state = nil
+ thread = Thread.new do
+ r, w = IO.pipe
+ scheduler = Scheduler.new
+ Fiber.set_scheduler scheduler
+
+ forked_pid = fork do
+ r.close
+ w << (Fiber.scheduler ? 'set' : 'reset')
+ w.close
+ end
+ w.close
+ forked_scheduler_state = r.read
+ Process.wait(forked_pid)
+ ensure
+ r.close rescue nil
+ w.close rescue nil
+ end
+ thread.join
+ assert_equal 'reset', forked_scheduler_state
+ ensure
+ thread.kill rescue nil
+ end
+
+ def test_post_fork_fiber_blocking
+ omit 'fork not supported' unless Process.respond_to?(:fork)
+
+ fiber_blocking_state = nil
+ thread = Thread.new do
+ r, w = IO.pipe
+ scheduler = Scheduler.new
+ Fiber.set_scheduler scheduler
+
+ forked_pid = nil
+ Fiber.schedule do
+ forked_pid = fork do
+ r.close
+ w << (Fiber.current.blocking? ? 'blocking' : 'nonblocking')
+ w.close
+ end
+ end
+ w.close
+ fiber_blocking_state = r.read
+ Process.wait(forked_pid)
+ ensure
+ r.close rescue nil
+ w.close rescue nil
+ end
+ thread.join
+ assert_equal 'blocking', fiber_blocking_state
+ ensure
+ thread.kill rescue nil
+ end
+
+ def test_io_write_on_flush
+ begin
+ path = File.join(Dir.tmpdir, "ruby_test_io_write_on_flush_#{SecureRandom.hex}")
+ descriptor = nil
+ operations = nil
+
+ thread = Thread.new do
+ scheduler = IOScheduler.new
+ Fiber.set_scheduler scheduler
+
+ Fiber.schedule do
+ File.open(path, 'w+') do |file|
+ descriptor = file.fileno
+ file << 'foo'
+ file.flush
+ file << 'bar'
+ end
+ end
+
+ operations = scheduler.operations
+ end
+
+ thread.join
+ assert_equal [
+ [:io_write, descriptor, 'foo'],
+ [:io_write, descriptor, 'bar']
+ ], operations
+
+ assert_equal 'foobar', IO.read(path)
+ ensure
+ thread.kill rescue nil
+ FileUtils.rm_f(path)
+ end
+ end
+
+ def test_io_read_error
+ path = File.join(Dir.tmpdir, "ruby_test_io_read_error_#{SecureRandom.hex}")
+ error = nil
+
+ thread = Thread.new do
+ scheduler = IOErrorScheduler.new
+ Fiber.set_scheduler scheduler
+ Fiber.schedule do
+ File.open(path, 'w+') { it.read }
+ rescue => error
+ # Ignore.
+ end
+ end
+
+ thread.join
+ assert_kind_of Errno::EBADF, error
+ ensure
+ thread.kill rescue nil
+ FileUtils.rm_f(path)
+ end
+
+ def test_io_write_error
+ path = File.join(Dir.tmpdir, "ruby_test_io_write_error_#{SecureRandom.hex}")
+ error = nil
+
+ thread = Thread.new do
+ scheduler = IOErrorScheduler.new
+ Fiber.set_scheduler scheduler
+ Fiber.schedule do
+ File.open(path, 'w+') { it.sync = true; it << 'foo' }
+ rescue => error
+ # Ignore.
+ end
+ end
+
+ thread.join
+ assert_kind_of Errno::EINVAL, error
+ ensure
+ thread.kill rescue nil
+ FileUtils.rm_f(path)
+ end
+
+ def test_io_write_flush_error
+ path = File.join(Dir.tmpdir, "ruby_test_io_write_flush_error_#{SecureRandom.hex}")
+ error = nil
+
+ thread = Thread.new do
+ scheduler = IOErrorScheduler.new
+ Fiber.set_scheduler scheduler
+ Fiber.schedule do
+ File.open(path, 'w+') { it << 'foo' }
+ rescue => error
+ # Ignore.
+ end
+ end
+
+ thread.join
+ assert_kind_of Errno::EINVAL, error
+ ensure
+ thread.kill rescue nil
+ FileUtils.rm_f(path)
+ end
end
diff --git a/test/fiber/test_sleep.rb b/test/fiber/test_sleep.rb
index a7e88c0367..187f59dbd4 100644
--- a/test/fiber/test_sleep.rb
+++ b/test/fiber/test_sleep.rb
@@ -35,13 +35,13 @@ class TestFiberSleep < Test::Unit::TestCase
scheduler = Scheduler.new
Fiber.set_scheduler scheduler
Fiber.schedule do
- seconds = sleep(2)
+ seconds = sleep(1.1)
end
end
thread.join
- assert_operator seconds, :>=, 2, "actual: %p" % seconds
+ assert_operator seconds, :>=, 1, "actual: %p" % seconds
end
def test_broken_sleep
diff --git a/test/fiber/test_thread.rb b/test/fiber/test_thread.rb
index 5e3cc6d0e1..4d2fbde9ed 100644
--- a/test/fiber/test_thread.rb
+++ b/test/fiber/test_thread.rb
@@ -90,6 +90,47 @@ class TestFiberThread < Test::Unit::TestCase
assert_equal :done, thread.value
end
+ def test_spurious_unblock_during_thread_join
+ ready = Thread::Queue.new
+
+ target_thread = Thread.new do
+ ready.pop
+ :success
+ end
+
+ Thread.pass until target_thread.status == "sleep"
+
+ result = nil
+
+ thread = Thread.new do
+ scheduler = Scheduler.new
+ Fiber.set_scheduler scheduler
+
+ # Create a fiber that will join a long-running thread:
+ joining_fiber = Fiber.schedule do
+ result = target_thread.value
+ end
+
+ # Create another fiber that spuriously unblocks the joining fiber:
+ Fiber.schedule do
+ # This interrupts the join in joining_fiber:
+ scheduler.unblock(:spurious_wakeup, joining_fiber)
+
+ # This allows the unblock to be processed:
+ sleep(0)
+
+ # This allows the target thread to finish:
+ ready.push(:done)
+ end
+
+ scheduler.run
+ end
+
+ thread.join
+
+ assert_equal :success, result
+ end
+
def test_broken_unblock
thread = Thread.new do
Thread.current.report_on_exception = false
@@ -115,16 +156,20 @@ class TestFiberThread < Test::Unit::TestCase
end
def test_thread_join_hang
+ inner = nil
thread = Thread.new do
scheduler = SleepingUnblockScheduler.new
Fiber.set_scheduler scheduler
Fiber.schedule do
- Thread.new{sleep(0.01)}.value
+ inner = Thread.new{sleep(0.01)}
+ inner.value
end
end
thread.join
+ ensure
+ inner&.join
end
end