diff options
Diffstat (limited to 'test/fiber')
| -rw-r--r-- | test/fiber/autoload.rb | 3 | ||||
| -rw-r--r-- | test/fiber/http.rb | 54 | ||||
| -rw-r--r-- | test/fiber/scheduler.rb | 553 | ||||
| -rw-r--r-- | test/fiber/test_address_resolve.rb | 278 | ||||
| -rw-r--r-- | test/fiber/test_backtrace.rb | 22 | ||||
| -rw-r--r-- | test/fiber/test_enumerator.rb | 53 | ||||
| -rw-r--r-- | test/fiber/test_io.rb | 278 | ||||
| -rw-r--r-- | test/fiber/test_io_buffer.rb | 200 | ||||
| -rw-r--r-- | test/fiber/test_io_close.rb | 107 | ||||
| -rw-r--r-- | test/fiber/test_mutex.rb | 240 | ||||
| -rw-r--r-- | test/fiber/test_process.rb | 76 | ||||
| -rw-r--r-- | test/fiber/test_queue.rb | 54 | ||||
| -rw-r--r-- | test/fiber/test_ractor.rb | 23 | ||||
| -rw-r--r-- | test/fiber/test_scheduler.rb | 386 | ||||
| -rw-r--r-- | test/fiber/test_sleep.rb | 71 | ||||
| -rw-r--r-- | test/fiber/test_storage.rb | 115 | ||||
| -rw-r--r-- | test/fiber/test_thread.rb | 175 | ||||
| -rw-r--r-- | test/fiber/test_timeout.rb | 51 |
18 files changed, 2739 insertions, 0 deletions
diff --git a/test/fiber/autoload.rb b/test/fiber/autoload.rb new file mode 100644 index 0000000000..dcb27164a7 --- /dev/null +++ b/test/fiber/autoload.rb @@ -0,0 +1,3 @@ +sleep 0.01 +module TestFiberSchedulerAutoload +end diff --git a/test/fiber/http.rb b/test/fiber/http.rb new file mode 100644 index 0000000000..53a4602bd4 --- /dev/null +++ b/test/fiber/http.rb @@ -0,0 +1,54 @@ + +require 'benchmark' + +TOPICS = ["cats", "dogs", "pigs", "skeletons", "zombies", "ocelots", "villagers", "pillagers"] + +require 'net/http' +require 'uri' +require 'json' + +require_relative 'scheduler' + +def fetch_topics(topics) + responses = {} + + topics.each do |topic| + Fiber.new(blocking: Fiber.current.blocking?) do + uri = URI("https://www.google.com/search?q=#{topic}") + response = Net::HTTP.get(uri) + responses[topic] = response.scan(topic).size + end.resume + end + + Fiber.scheduler&.run + + return responses +end + +def sweep(repeats: 3, **options) + times = (1..8).map do |i| + $stderr.puts "Measuring #{i} topic(s) #{options.inspect}..." + topics = TOPICS[0...i] + + Thread.new do + Benchmark.realtime do + scheduler = Scheduler.new + Fiber.set_scheduler(scheduler) + + repeats.times do + Fiber.new(**options) do + pp fetch_topics(topics) + end.resume + + scheduler.run + end + end + end.value / repeats + end + + puts options.inspect + puts JSON.dump(times.map{|value| value.round(3)}) +end + +# sweep(blocking: true) +sweep(blocking: false) diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb new file mode 100644 index 0000000000..8f1ce4376b --- /dev/null +++ b/test/fiber/scheduler.rb @@ -0,0 +1,553 @@ +# frozen_string_literal: true + +# This is an example and simplified scheduler for test purposes. +# - It is not efficient for a large number of file descriptors as it uses +# IO.select(). +# - It does not correctly handle multiple calls to `wait` with the same file +# descriptor and overlapping events. +# - Production fiber schedulers should use epoll/kqueue/etc. Consider using the +# [`io-event`](https://github.com/socketry/io-event) gem instead of this +# scheduler if you want something simple to build on. + +require 'fiber' +require 'socket' + +begin + require 'io/nonblock' +rescue LoadError + # Ignore. +end + +class Scheduler + experimental = Warning[:experimental] + begin + Warning[:experimental] = false + IO::Buffer.new(0) + ensure + Warning[:experimental] = experimental + end + + def initialize(fiber = Fiber.current) + @fiber = fiber + + @readable = {} + @writable = {} + @waiting = {} + + @closed = false + + @lock = Thread::Mutex.new + @blocking = Hash.new.compare_by_identity + @ready = [] + + @urgent = IO.pipe + end + + attr :readable + attr :writable + attr :waiting + + def transfer + @fiber.transfer + end + + def next_timeout + _fiber, timeout = @waiting.min_by{|key, value| value} + + if timeout + offset = timeout - current_time + + if offset < 0 + return 0 + else + return offset + end + end + end + + def run_once + readable = writable = nil + + begin + readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout) + rescue IOError + # Ignore - this can happen if the IO is closed while we are waiting. + end + + # puts "readable: #{readable}" if readable&.any? + # puts "writable: #{writable}" if writable&.any? + + selected = {} + + readable&.each do |io| + if fiber = @readable.delete(io) + @writable.delete(io) if @writable[io] == fiber + selected[fiber] = IO::READABLE + elsif io == @urgent.first + @urgent.first.read_nonblock(1024) + end + end + + writable&.each do |io| + if fiber = @writable.delete(io) + @readable.delete(io) if @readable[io] == fiber + selected[fiber] = selected.fetch(fiber, 0) | IO::WRITABLE + end + end + + selected.each do |fiber, events| + fiber.transfer(events) + end + + if @waiting.any? + time = current_time + waiting, @waiting = @waiting, {} + + waiting.each do |fiber, timeout| + if fiber.alive? + if timeout <= time + fiber.transfer + else + @waiting[fiber] = timeout + end + end + end + end + + if @ready.any? + ready = nil + + @lock.synchronize do + ready, @ready = @ready, [] + end + + ready.each do |fiber| + fiber.transfer if fiber.alive? + end + end + end + + def run + # $stderr.puts [__method__, Fiber.current].inspect + + # Use Thread.handle_interrupt like Async::Scheduler does + # This defers signal processing, which is the root cause of the gRPC bug + # See: https://github.com/socketry/async/blob/main/lib/async/scheduler.rb + Thread.handle_interrupt(::SignalException => :never) do + while @readable.any? or @writable.any? or @waiting.any? or @blocking.any? + run_once + + break if Thread.pending_interrupt? + end + end + end + + # A fiber scheduler hook, invoked when the scheduler goes out of scope. + def scheduler_close + close(true) + end + + # If the `scheduler_close` hook does not exist, this method `close` will be + # invoked instead when the fiber scheduler goes out of scope. This is legacy + # behaviour, you should almost certainly use `scheduler_close`. The reason for + # this, is `scheduler_close` is called when the scheduler goes out of scope, + # while `close` may be called by the user. + def close(internal = false) + # $stderr.puts [__method__, Fiber.current].inspect + + unless internal + if Fiber.scheduler == self + return Fiber.set_scheduler(nil) + end + end + + if @closed + raise "Scheduler already closed!" + end + + self.run + ensure + if @urgent + @urgent.each(&:close) + @urgent = nil + end + + @closed ||= true + + # We freeze to detect any unintended modifications after the scheduler is closed: + self.freeze + end + + def closed? + @closed + end + + def current_time + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + + # This hook is invoked by `Timeout.timeout` and related code. + def timeout_after(duration, klass, message, &block) + fiber = Fiber.current + + self.fiber do + sleep(duration) + + if fiber&.alive? + fiber.raise(klass, message) + end + end + + begin + yield(duration) + ensure + fiber = nil + end + end + + # This hook is invoked by `Process.wait`, `system`, and backticks. + def process_wait(pid, flags) + # $stderr.puts [__method__, pid, flags, Fiber.current].inspect + + # This is a very simple way to implement a non-blocking wait: + Thread.new do + Process::Status.wait(pid, flags) + end.value + end + + # This hook is invoked by `IO#read` and `IO#write` in the case that `io_read` + # and `io_write` hooks are not available. This implementation is not + # completely general, in the sense that calling `io_wait` multiple times with + # the same `io` and `events` will not work, which is okay for tests but not + # for real code. Correct fiber schedulers should not have this limitation. + def io_wait(io, events, duration) + # $stderr.puts [__method__, io, events, duration, Fiber.current].inspect + + fiber = Fiber.current + + unless (events & IO::READABLE).zero? + @readable[io] = fiber + readable = true + end + + unless (events & IO::WRITABLE).zero? + @writable[io] = fiber + writable = true + end + + if duration + @waiting[fiber] = current_time + duration + end + + @fiber.transfer + ensure + @waiting.delete(fiber) if duration + @readable.delete(io) if readable + @writable.delete(io) if writable + end + + # This hook is invoked by `IO.select`. Using a thread ensures that the + # operation does not block the fiber scheduler. + def io_select(...) + # Emulate the operation using a non-blocking thread: + Thread.new do + IO.select(...) + end.value + end + + # This hook is invoked by `IO#close`. Using a separate IO object + # demonstrates that the close operation is asynchronous. + def io_close(descriptor) + Fiber.blocking{IO.for_fd(descriptor.to_i).close} + return true + end + + # This hook is invoked by `Kernel#sleep` and `Thread::Mutex#sleep`. + def kernel_sleep(duration = nil) + # $stderr.puts [__method__, duration, Fiber.current].inspect + + self.block(:sleep, duration) + + return true + end + + # This hook is invoked by blocking options such as `Thread::Mutex#lock`, + # `Thread::Queue#pop` and `Thread::SizedQueue#push`, which are unblocked by + # other threads/fibers. To unblock a blocked fiber, you should call `unblock` + # with the same `blocker` and `fiber` arguments. + def block(blocker, timeout = nil) + # $stderr.puts [__method__, blocker, timeout].inspect + + fiber = Fiber.current + + if timeout + @waiting[fiber] = current_time + timeout + begin + @fiber.transfer + ensure + # Remove from @waiting in the case #unblock was called before the timeout expired: + @waiting.delete(fiber) + end + else + @blocking[fiber] = true + begin + @fiber.transfer + ensure + @blocking.delete(fiber) + end + end + end + + # This method is invoked from a thread or fiber to unblock a fiber that is + # blocked by `block`. It is expected to be thread safe. + def unblock(blocker, fiber) + # $stderr.puts [__method__, blocker, fiber].inspect + # $stderr.puts blocker.backtrace.inspect + # $stderr.puts fiber.backtrace.inspect + + @lock.synchronize do + @ready << fiber + end + + io = @urgent.last + io.write_nonblock('.') + end + + class FiberInterrupt + def initialize(fiber, exception) + @fiber = fiber + @exception = exception + end + + def alive? + @fiber.alive? + end + + def transfer + @fiber.raise(@exception) + end + end + + def fiber_interrupt(fiber, exception) + @lock.synchronize do + @ready << FiberInterrupt.new(fiber, exception) + end + + io = @urgent.last + io.write_nonblock('.') + end + + # This hook is invoked by `Fiber.schedule`. Strictly speaking, you should use + # it to create scheduled fibers, but it is not required in practice; + # `Fiber.new` is usually sufficient. + def fiber(&block) + fiber = Fiber.new(blocking: false, &block) + + fiber.transfer + + return fiber + end + + # This hook is invoked by `Addrinfo.getaddrinfo`. Using a thread ensures that + # the operation does not block the fiber scheduler, since `getaddrinfo` is + # usually provided by `libc` and is blocking. + def address_resolve(hostname) + Thread.new do + Addrinfo.getaddrinfo(hostname, nil).map(&:ip_address).uniq + end.value + end + + def blocking_operation_wait(work) + thread = Thread.new{work.call} + + thread.join + + thread = nil + ensure + thread&.kill + end +end + +# This scheduler class implements `io_read` and `io_write` hooks which require +# `IO::Buffer`. +class IOBufferScheduler < Scheduler + EAGAIN = -Errno::EAGAIN::Errno + + def io_read(io, buffer, length, offset) + total = 0 + io.nonblock = true + + while true + result = blocking{buffer.read(io, 0, offset)} + + if result > 0 + total += result + offset += result + break if total >= length + elsif result == 0 + break + elsif result == EAGAIN + if length > 0 + self.io_wait(io, IO::READABLE, nil) + else + return result + end + elsif result < 0 + return result + end + end + + return total + end + + def io_write(io, buffer, length, offset) + total = 0 + io.nonblock = true + + while true + result = blocking{buffer.write(io, 0, offset)} + + if result > 0 + total += result + offset += result + break if total >= length + elsif result == 0 + break + elsif result == EAGAIN + if length > 0 + self.io_wait(io, IO::WRITABLE, nil) + else + return result + end + elsif result < 0 + return result + end + end + + return total + end + + def io_pread(io, buffer, from, length, offset) + total = 0 + io.nonblock = true + + while true + result = blocking{buffer.pread(io, from, 0, offset)} + + if result > 0 + total += result + offset += result + from += result + break if total >= length + elsif result == 0 + break + elsif result == EAGAIN + if length > 0 + self.io_wait(io, IO::READABLE, nil) + else + return result + end + elsif result < 0 + return result + end + end + + return total + end + + def io_pwrite(io, buffer, from, length, offset) + total = 0 + io.nonblock = true + + while true + result = blocking{buffer.pwrite(io, from, 0, offset)} + + if result > 0 + total += result + offset += result + from += result + break if total >= length + elsif result == 0 + break + elsif result == EAGAIN + if length > 0 + self.io_wait(io, IO::WRITABLE, nil) + else + return result + end + elsif result < 0 + return result + end + end + + return total + end + + def blocking(&block) + Fiber.blocking(&block) + end +end + +class IOScheduler < Scheduler + def operations + @operations ||= [] + end + + def io_write(io, buffer, length, offset) + descriptor = io.fileno + string = buffer.get_string + + self.operations << [:io_write, descriptor, string] + + Fiber.blocking do + buffer.write(io, 0, offset) + end + end +end + +class IOErrorScheduler < Scheduler + def io_read(io, buffer, length, offset) + return -Errno::EBADF::Errno + end + + def io_write(io, buffer, length, offset) + return -Errno::EINVAL::Errno + end +end + +# This scheduler has a broken implementation of `unblock`` in the sense that it +# raises an exception. This is used to test the behavior of the scheduler when +# unblock raises an exception. +class BrokenUnblockScheduler < Scheduler + def unblock(blocker, fiber) + super + + raise "Broken unblock!" + end +end + +# This scheduler has a broken implementation of `unblock` in the sense that it +# sleeps. This is used to test the behavior of the scheduler when unblock +# messes with the internal thread state in an unexpected way. +class SleepingUnblockScheduler < Scheduler + # This method is invoked when the thread is exiting. + def unblock(blocker, fiber) + super + + # This changes the current thread state to `THREAD_RUNNING` which causes `thread_join_sleep` to hang. + sleep(0.1) + end +end + +# This scheduler has a broken implementation of `kernel_sleep` in the sense that +# it invokes a blocking sleep which can cause a deadlock in some cases. +class SleepingBlockingScheduler < Scheduler + def kernel_sleep(duration = nil) + # Deliberaly sleep in a blocking state which can trigger a deadlock if the implementation is not correct. + Fiber.blocking{sleep 0.0001} + + self.block(:sleep, duration) + + return true + end +end diff --git a/test/fiber/test_address_resolve.rb b/test/fiber/test_address_resolve.rb new file mode 100644 index 0000000000..09c8db6049 --- /dev/null +++ b/test/fiber/test_address_resolve.rb @@ -0,0 +1,278 @@ +# frozen_string_literal: true +require 'test/unit' +require_relative 'scheduler' + +class TestAddressResolve < Test::Unit::TestCase + class NullScheduler < Scheduler + def address_resolve(*) + end + end + + class StubScheduler < Scheduler + def address_resolve(hostname) + ["1.2.3.4", "1234::1"] + end + end + + def test_addrinfo_getaddrinfo_ipv4_domain_blocking + Thread.new do + scheduler = StubScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + result = Addrinfo.getaddrinfo("example.com", 80, :AF_INET, :STREAM) + assert_equal(1, result.count) + + ai = result.first + assert_equal("1.2.3.4", ai.ip_address) + assert_equal(80, ai.ip_port) + assert_equal(Socket::AF_INET, ai.afamily) + assert_equal(Socket::SOCK_STREAM, ai.socktype) + end + end.join + end + + def test_addrinfo_getaddrinfo_ipv6_domain_blocking + Thread.new do + scheduler = StubScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + result = Addrinfo.getaddrinfo("example.com", 80, :AF_INET6, :STREAM) + assert_equal(1, result.count) + + ai = result.first + assert_equal("1234::1", ai.ip_address) + assert_equal(80, ai.ip_port) + assert_equal(Socket::AF_INET6, ai.afamily) + assert_equal(Socket::SOCK_STREAM, ai.socktype) + end + end.join + end + + def test_addrinfo_getaddrinfo_pf_unspec_domain_blocking + Thread.new do + scheduler = StubScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + results = Addrinfo.getaddrinfo("example.com", 80, :PF_UNSPEC, :STREAM) + assert_equal(2, results.count) + + ai_ipv4 = results.first + assert_equal("1.2.3.4", ai_ipv4.ip_address) + assert_equal(80, ai_ipv4.ip_port) + assert_equal(Socket::AF_INET, ai_ipv4.afamily) + assert_equal(Socket::SOCK_STREAM, ai_ipv4.socktype) + + ai_ipv6 = results.last + assert_equal("1234::1", ai_ipv6.ip_address) + assert_equal(80, ai_ipv6.ip_port) + assert_equal(Socket::AF_INET6, ai_ipv6.afamily) + assert_equal(Socket::SOCK_STREAM, ai_ipv6.socktype) + end + end.join + end + + def test_addrinfo_getaddrinfo_full_domain_blocking + Thread.new do + scheduler = StubScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + results = Addrinfo.getaddrinfo("example.com", 80) + assert_equal(6, results.count) + + ai_ipv4_tcp = results[0] + assert_equal("1.2.3.4", ai_ipv4_tcp.ip_address) + assert_equal(80, ai_ipv4_tcp.ip_port) + assert_equal(Socket::AF_INET, ai_ipv4_tcp.afamily) + assert_equal(Socket::SOCK_STREAM, ai_ipv4_tcp.socktype) + + ai_ipv4_udp = results[1] + assert_equal("1.2.3.4", ai_ipv4_udp.ip_address) + assert_equal(80, ai_ipv4_udp.ip_port) + assert_equal(Socket::AF_INET, ai_ipv4_udp.afamily) + assert_equal(Socket::SOCK_DGRAM, ai_ipv4_udp.socktype) + + ai_ipv4_sock_raw = results[2] + assert_equal("1.2.3.4", ai_ipv4_sock_raw.ip_address) + assert_equal(80, ai_ipv4_sock_raw.ip_port) + assert_equal(Socket::AF_INET, ai_ipv4_sock_raw.afamily) + assert_equal(Socket::SOCK_RAW, ai_ipv4_sock_raw.socktype) + + ai_ipv6_tcp = results[3] + assert_equal("1234::1", ai_ipv6_tcp.ip_address) + assert_equal(80, ai_ipv6_tcp.ip_port) + assert_equal(Socket::AF_INET6, ai_ipv6_tcp.afamily) + assert_equal(Socket::SOCK_STREAM, ai_ipv6_tcp.socktype) + + ai_ipv6_udp = results[4] + assert_equal("1234::1", ai_ipv6_udp.ip_address) + assert_equal(80, ai_ipv6_udp.ip_port) + assert_equal(Socket::AF_INET6, ai_ipv6_udp.afamily) + assert_equal(Socket::SOCK_DGRAM, ai_ipv6_udp.socktype) + + ai_ipv6_sock_raw = results[5] + assert_equal("1234::1", ai_ipv6_sock_raw.ip_address) + assert_equal(80, ai_ipv6_sock_raw.ip_port) + assert_equal(Socket::AF_INET6, ai_ipv6_sock_raw.afamily) + assert_equal(Socket::SOCK_RAW, ai_ipv6_sock_raw.socktype) + end + end.join + end + + def test_addrinfo_getaddrinfo_numeric_non_blocking + Thread.new do + scheduler = NullScheduler.new # scheduler hook not invoked + Fiber.set_scheduler scheduler + + Fiber.schedule do + result = Addrinfo.getaddrinfo("1.2.3.4", 80, :AF_INET, :STREAM) + assert_equal(1, result.count) + + ai = result.first + assert_equal("1.2.3.4", ai.ip_address) + assert_equal(80, ai.ip_port) + assert_equal(Socket::AF_INET, ai.afamily) + assert_equal(Socket::SOCK_STREAM, ai.socktype) + end + end.join + end + + def test_addrinfo_getaddrinfo_any_non_blocking + Thread.new do + scheduler = NullScheduler.new # scheduler hook not invoked + Fiber.set_scheduler scheduler + + Fiber.schedule do + result = Addrinfo.getaddrinfo("<any>", 80, :AF_INET, :STREAM) + assert_equal(1, result.count) + + ai = result.first + assert_equal("0.0.0.0", ai.ip_address) + assert_equal(80, ai.ip_port) + assert_equal(Socket::AF_INET, ai.afamily) + assert_equal(Socket::SOCK_STREAM, ai.socktype) + end + end.join + end + + def test_addrinfo_getaddrinfo_localhost + Thread.new do + scheduler = StubScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + results = Addrinfo.getaddrinfo("localhost", 80, :AF_INET, :STREAM) + assert_equal(1, results.count) + + ai = results.first + assert_equal("1.2.3.4", ai.ip_address) + end + end.join + end + + def test_addrinfo_getaddrinfo_non_existing_domain_blocking + Thread.new do + scheduler = NullScheduler.new # invoked, returns nil + Fiber.set_scheduler scheduler + + Fiber.schedule do + assert_raise(Socket::ResolutionError) { + Addrinfo.getaddrinfo("non-existing-domain.abc", nil) + } + end + end.join + end + + def test_addrinfo_getaddrinfo_no_host_non_blocking + Thread.new do + scheduler = NullScheduler.new # scheduler hook not invoked + Fiber.set_scheduler scheduler + + Fiber.schedule do + result = Addrinfo.getaddrinfo(nil, 80, :AF_INET, :STREAM) + assert_equal(1, result.count) + + ai = result.first + assert_equal("127.0.0.1", ai.ip_address) + assert_equal(80, ai.ip_port) + assert_equal(Socket::AF_INET, ai.afamily) + assert_equal(Socket::SOCK_STREAM, ai.socktype) + end + end.join + end + + def test_addrinfo_ip_domain_blocking + Thread.new do + scheduler = StubScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + ai = Addrinfo.ip("example.com") + + assert_equal("1.2.3.4", ai.ip_address) + end + end.join + end + + def test_addrinfo_tcp_domain_blocking + Thread.new do + scheduler = StubScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + ai = Addrinfo.tcp("example.com", 80) + + assert_equal("1.2.3.4", ai.ip_address) + assert_equal(80, ai.ip_port) + assert_equal(Socket::AF_INET, ai.afamily) + assert_equal(Socket::SOCK_STREAM, ai.socktype) + end + end.join + end + + def test_addrinfo_udp_domain_blocking + Thread.new do + scheduler = StubScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + ai = Addrinfo.udp("example.com", 80) + + assert_equal("1.2.3.4", ai.ip_address) + assert_equal(80, ai.ip_port) + assert_equal(Socket::AF_INET, ai.afamily) + assert_equal(Socket::SOCK_DGRAM, ai.socktype) + end + end.join + end + + def test_ip_socket_getaddress_domain_blocking + Thread.new do + scheduler = StubScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + ip = IPSocket.getaddress("example.com") + + assert_equal("1.2.3.4", ip) + end + end.join + end + + # This test "hits deep" into C function call chain. + def test_socket_getnameinfo_domain_blocking + Thread.new do + scheduler = StubScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + result = Socket.getnameinfo(["AF_INET", 80, "example.com"], Socket::NI_NUMERICSERV | Socket::NI_NUMERICHOST) + + assert_equal(["1.2.3.4", "80"], result) + end + end.join + end +end diff --git a/test/fiber/test_backtrace.rb b/test/fiber/test_backtrace.rb new file mode 100644 index 0000000000..ba05ca2983 --- /dev/null +++ b/test/fiber/test_backtrace.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true +require 'test/unit' +require 'fiber' + +class TestFiberBacktrace < Test::Unit::TestCase + def test_backtrace + backtrace = Fiber.current.backtrace + assert_kind_of Array, backtrace + assert_match(/test_backtrace/, backtrace[0]) + end + + def test_backtrace_locations + backtrace = Fiber.current.backtrace_locations + assert_kind_of Array, backtrace + assert_match(/test_backtrace_locations/, backtrace[1].label) + end + + def test_local_backtrace + backtrace = Fiber.current.backtrace(2) + assert_equal backtrace, caller + end +end diff --git a/test/fiber/test_enumerator.rb b/test/fiber/test_enumerator.rb new file mode 100644 index 0000000000..e9410f925c --- /dev/null +++ b/test/fiber/test_enumerator.rb @@ -0,0 +1,53 @@ +# frozen_string_literal: true +require 'test/unit' +require_relative 'scheduler' + +class TestFiberEnumerator < Test::Unit::TestCase + MESSAGE = "Hello World" + + def test_read_characters + omit "UNIXSocket is not defined!" unless defined?(UNIXSocket) + + i, o = UNIXSocket.pair + + message = String.new + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + e = i.to_enum(:each_char) + + Fiber.schedule do + o.write("Hello World") + o.close + end + + Fiber.schedule do + begin + while c = e.next + message << c + end + rescue StopIteration + # Ignore. + end + + i.close + end + end + + thread.join + + assert_equal(MESSAGE, message) + assert_predicate(i, :closed?) + assert_predicate(o, :closed?) + end + + def enumerator_fiber_is_nonblocking + enumerator = Enumerator.new do |yielder| + yielder << Fiber.current.blocking? + end + + assert_equal(false, enumerator.next) + end +end diff --git a/test/fiber/test_io.rb b/test/fiber/test_io.rb new file mode 100644 index 0000000000..eea06f97c8 --- /dev/null +++ b/test/fiber/test_io.rb @@ -0,0 +1,278 @@ +# frozen_string_literal: true +require 'test/unit' +require_relative 'scheduler' + +class TestFiberIO < Test::Unit::TestCase + MESSAGE = "Hello World" + + def test_read + omit unless defined?(UNIXSocket) + + i, o = UNIXSocket.pair + if RUBY_PLATFORM =~ /mswin|mingw/ + i.nonblock = true + o.nonblock = true + end + + message = nil + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + message = i.read(20) + i.close + end + + Fiber.schedule do + o.write("Hello World") + o.close + end + end + + thread.join + + assert_equal MESSAGE, message + assert_predicate(i, :closed?) + assert_predicate(o, :closed?) + end + + def test_heavy_read + omit unless defined?(UNIXSocket) + + 16.times.map do + Thread.new do + i, o = UNIXSocket.pair + if RUBY_PLATFORM =~ /mswin|mingw/ + i.nonblock = true + o.nonblock = true + end + + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + i.read(20) + i.close + end + + Fiber.schedule do + o.write("Hello World") + o.close + end + end + end.each(&:join) + end + + def test_epipe_on_read + omit unless defined?(UNIXSocket) + omit "nonblock=true isn't properly supported on Windows" if RUBY_PLATFORM =~ /mswin|mingw/ + + i, o = UNIXSocket.pair + + error = nil + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + begin + i.close + o.write(MESSAGE) + rescue => error + # Saved into error. + end + end + end + + thread.join + + i.close + o.close + + assert_kind_of Errno::EPIPE, error + end + + def test_tcp_accept + server = TCPServer.new('localhost', 0) + + th = Thread.new do + Fiber.set_scheduler(Scheduler.new) + + Fiber.schedule do + sender = server.accept + sender.wait_writable + sender.write "hello" + sender.close + end + end + + recver = TCPSocket.new('localhost', server.local_address.ip_port) + assert "hello", recver.read + + recver.close + server.close + th.join + end + + def test_tcp_connect + server = TCPServer.new('localhost', 0) + + th = Thread.new do + Fiber.set_scheduler(Scheduler.new) + + Fiber.schedule do + sender = TCPSocket.new('localhost', server.local_address.ip_port) + sender.write "hello" + sender.close + end + end + + recver = server.accept + assert "hello", recver.read + + recver.close + server.close + th.join + end + + def test_read_write_blocking + omit "UNIXSocket is not defined!" unless defined?(UNIXSocket) + + i, o = UNIXSocket.pair + i.nonblock = false + o.nonblock = false + + message = nil + + thread = Thread.new do + # This scheduler provides non-blocking `io_read`/`io_write`: + scheduler = IOBufferScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + # We add 1 here, to force the read to block (testing that specific code path). + message = i.read(MESSAGE.bytesize + 1) + i.close + end + + Fiber.schedule do + o.write(MESSAGE) + o.close + end + end + + thread.join + + assert_equal MESSAGE, message + assert_predicate(i, :closed?) + assert_predicate(o, :closed?) + end + + def test_puts_empty + omit "UNIXSocket is not defined!" unless defined?(UNIXSocket) + + i, o = UNIXSocket.pair + i.nonblock = false + o.nonblock = false + + thread = Thread.new do + # This scheduler provides non-blocking `io_read`/`io_write`: + scheduler = IOBufferScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + # This was causing a segfault on older Ruby. + o.puts "" + o.puts nil + o.close + end + end + + thread.join + + message = i.read + i.close + + assert_equal $/*2, message + end + + def test_io_select + omit "UNIXSocket is not defined!" unless defined?(UNIXSocket) + + UNIXSocket.pair do |r, w| + result = nil + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + w.write("Hello World") + result = IO.select([r], [w]) + end + end + + thread.join + + assert_equal [[r], [w], []], result + end + end + + def test_backquote + result = nil + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + Fiber.schedule do + result = `#{EnvUtil.rubybin} -e "sleep 0.1;puts %[ok]"` + end + end + thread.join + + assert_equal "ok\n", result + end + + # Tests for https://bugs.ruby-lang.org/issues/20723 which would + # otherwise deadlock this test. + def test_close_while_reading_on_thread + # Windows has UNIXSocket, but only with VS 2019+ + omit "UNIXSocket is not defined!" unless defined?(UNIXSocket) + + Socket.pair(:UNIX, :STREAM) do |i, o| + if RUBY_PLATFORM =~ /mswin|mingw/ + i.nonblock = true + o.nonblock = true + end + + reading_thread = Thread.new do + Thread.current.report_on_exception = false + i.wait_readable + end + + scheduler_thread = Thread.new do + # Wait until the reading thread is blocked on read: + Thread.pass until reading_thread.status == "sleep" + + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + Fiber.schedule do + i.close + end + end + + assert_raise(IOError) { reading_thread.join } + refute_nil scheduler_thread.join(5), "expected thread to terminate within 5 seconds" + + assert_predicate(i, :closed?) + ensure + scheduler_thread&.kill + scheduler_thread&.join rescue nil + reading_thread&.kill + reading_thread&.join rescue nil + end + end +end diff --git a/test/fiber/test_io_buffer.rb b/test/fiber/test_io_buffer.rb new file mode 100644 index 0000000000..19e6c1f88e --- /dev/null +++ b/test/fiber/test_io_buffer.rb @@ -0,0 +1,200 @@ +# frozen_string_literal: true +require 'test/unit' +require_relative 'scheduler' + +require 'timeout' + +class TestFiberIOBuffer < Test::Unit::TestCase + MESSAGE = "Hello World" + + def test_read_write_blocking + omit "UNIXSocket is not defined!" unless defined?(UNIXSocket) + + i, o = UNIXSocket.pair + i.nonblock = false + o.nonblock = false + + message = nil + + thread = Thread.new do + scheduler = IOBufferScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + # We add 1 here, to force the read to block (testing that specific code path). + message = i.read(MESSAGE.bytesize + 1) + i.close + end + + Fiber.schedule do + o.write(MESSAGE) + o.close + end + end + + thread.join + + assert_equal MESSAGE, message + assert_predicate(i, :closed?) + assert_predicate(o, :closed?) + ensure + i&.close + o&.close + end + + def test_timeout_after + omit "UNIXSocket is not defined!" unless defined?(UNIXSocket) + + i, o = UNIXSocket.pair + i.nonblock = false + o.nonblock = false + + message = nil + error = nil + + thread = Thread.new do + scheduler = IOBufferScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + Timeout.timeout(0.1) do + message = i.read(20) + end + rescue Timeout::Error => error + # Assertions below. + ensure + i.close + end + end + + thread.join + + assert_nil message + assert_kind_of Timeout::Error, error + ensure + i&.close + o&.close + end + + def test_read_nonblock + omit "UNIXSocket is not defined!" unless defined?(UNIXSocket) + + i, o = UNIXSocket.pair + + message = nil + + thread = Thread.new do + scheduler = IOBufferScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + message = i.read_nonblock(20, exception: false) + i.close + end + end + + thread.join + + assert_equal :wait_readable, message + ensure + i&.close + o&.close + end + + def test_write_nonblock + omit "UNIXSocket is not defined!" unless defined?(UNIXSocket) + + i, o = UNIXSocket.pair + + thread = Thread.new do + scheduler = IOBufferScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + o.write_nonblock(MESSAGE, exception: false) + o.close + end + end + + thread.join + + assert_equal MESSAGE, i.read + ensure + i&.close + o&.close + end + + def test_io_buffer_read_write + omit "UNIXSocket is not defined!" unless defined?(UNIXSocket) + + i, o = UNIXSocket.pair + source_buffer = IO::Buffer.for("Hello World!") + destination_buffer = IO::Buffer.new(source_buffer.size) + + # Test non-scheduler code path: + source_buffer.write(o, source_buffer.size) + destination_buffer.read(i, source_buffer.size) + assert_equal source_buffer, destination_buffer + + # Test scheduler code path: + destination_buffer.clear + + thread = Thread.new do + scheduler = IOBufferScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + source_buffer.write(o, source_buffer.size) + destination_buffer.read(i, source_buffer.size) + end + end + + thread.join + + assert_equal source_buffer, destination_buffer + ensure + i&.close + o&.close + end + + def nonblockable?(io) + io.nonblock{} + true + rescue + false + end + + def test_io_buffer_pread_pwrite + file = Tempfile.new("test_io_buffer_pread_pwrite") + + omit "Non-blocking file IO is not supported" unless nonblockable?(file) + + source_buffer = IO::Buffer.for("Hello World!") + destination_buffer = IO::Buffer.new(source_buffer.size) + + # Test non-scheduler code path: + source_buffer.pwrite(file, 1, source_buffer.size) + destination_buffer.pread(file, 1, source_buffer.size) + assert_equal source_buffer, destination_buffer + + # Test scheduler code path: + destination_buffer.clear + file.truncate(0) + + thread = Thread.new do + scheduler = IOBufferScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + source_buffer.pwrite(file, 1, source_buffer.size) + destination_buffer.pread(file, 1, source_buffer.size) + end + end + + thread.join + + assert_equal source_buffer, destination_buffer + ensure + file&.close! + end +end diff --git a/test/fiber/test_io_close.rb b/test/fiber/test_io_close.rb new file mode 100644 index 0000000000..742b40841d --- /dev/null +++ b/test/fiber/test_io_close.rb @@ -0,0 +1,107 @@ +# frozen_string_literal: true +require 'test/unit' +require_relative 'scheduler' + +class TestFiberIOClose < Test::Unit::TestCase + def with_socket_pair(&block) + omit "UNIXSocket is not defined!" unless defined?(UNIXSocket) + + UNIXSocket.pair do |i, o| + if RUBY_PLATFORM =~ /mswin|mingw/ + i.nonblock = true + o.nonblock = true + end + + yield i, o + end + end + + def test_io_close_across_fibers + # omit "Interrupting a io_wait read is not supported!" if RUBY_PLATFORM =~ /mswin|mingw/ + + with_socket_pair do |i, o| + error = nil + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + i.read + rescue => error + # Ignore. + end + + Fiber.schedule do + i.close + end + end + + thread.join + + assert_instance_of IOError, error + assert_match(/closed/, error.message) + end + end + + def test_io_close_blocking_thread + omit "Interrupting a io_wait read is not supported!" if RUBY_PLATFORM =~ /mswin|mingw/ + + with_socket_pair do |i, o| + error = nil + + reading_thread = Thread.new do + i.read + rescue => error + # Ignore. + end + + Thread.pass until reading_thread.status == 'sleep' + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + i.close + end + end + + thread.join + reading_thread.join + + assert_instance_of IOError, error + assert_match(/closed/, error.message) + end + end + + def test_io_close_blocking_fiber + # omit "Interrupting a io_wait read is not supported!" if RUBY_PLATFORM =~ /mswin|mingw/ + + with_socket_pair do |i, o| + error = nil + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + begin + i.read + rescue => error + # Ignore. + end + end + end + + Thread.pass until thread.status == 'sleep' + + i.close + + thread.join + + assert_instance_of IOError, error + assert_match(/closed/, error.message) + end + end +end diff --git a/test/fiber/test_mutex.rb b/test/fiber/test_mutex.rb new file mode 100644 index 0000000000..2cee2cc235 --- /dev/null +++ b/test/fiber/test_mutex.rb @@ -0,0 +1,240 @@ +# frozen_string_literal: true +require 'test/unit' +require_relative 'scheduler' + +class TestFiberMutex < Test::Unit::TestCase + def test_mutex_synchronize + mutex = Thread::Mutex.new + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + assert_not_predicate Fiber, :blocking? + + mutex.synchronize do + assert_not_predicate Fiber, :blocking? + end + end + end + + thread.join + end + + def test_mutex_interleaved_locking + mutex = Thread::Mutex.new + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + mutex.lock + sleep 0.1 + mutex.unlock + end + + Fiber.schedule do + mutex.lock + sleep 0.1 + mutex.unlock + end + + scheduler.run + end + + thread.join + end + + def test_mutex_thread + mutex = Thread::Mutex.new + mutex.lock + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + mutex.lock + sleep 0.1 + mutex.unlock + end + + scheduler.run + end + + sleep 0.1 + mutex.unlock + + thread.join + end + + def test_mutex_fiber_raise + mutex = Thread::Mutex.new + ran = false + + main = Thread.new do + mutex.lock + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + f = Fiber.schedule do + assert_raise_with_message(RuntimeError, "bye") do + mutex.lock + end + + ran = true + end + + Fiber.schedule do + f.raise "bye" + end + end + + thread.join + end + + main.join # causes mutex to be released + assert_equal false, mutex.locked? + assert_equal true, ran + end + + def test_condition_variable + mutex = Thread::Mutex.new + condition = Thread::ConditionVariable.new + + signalled = 0 + + Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + mutex.synchronize do + 3.times do + condition.wait(mutex) + signalled += 1 + end + end + end + + Fiber.schedule do + 3.times do + mutex.synchronize do + condition.signal + end + + sleep 0.1 + end + end + + scheduler.run + end.join + + assert_equal 3, signalled + end + + def test_queue + queue = Thread::Queue.new + processed = 0 + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + 3.times do |i| + queue << i + sleep 0.1 + end + + queue.close + end + + Fiber.schedule do + while item = queue.pop + processed += 1 + end + end + + scheduler.run + end + + thread.join + + assert_equal 3, processed + end + + def test_queue_pop_waits + queue = Thread::Queue.new + running = false + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + result = nil + Fiber.schedule do + result = queue.pop + end + + running = true + scheduler.run + result + end + + Thread.pass until running + sleep 0.1 + + queue << :done + assert_equal :done, thread.value + end + + def test_mutex_deadlock + error_pattern = /lock already owned by another fiber/ + + assert_in_out_err %W[-I#{__dir__} -], <<-RUBY, ['in synchronize'], error_pattern, success: false + require 'scheduler' + mutex = Thread::Mutex.new + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + mutex.synchronize do + puts 'in synchronize' + scheduler.transfer + end + end + + mutex.lock + end + + thread.join + RUBY + end + + def test_mutex_fiber_deadlock_no_scheduler + thr = Thread.new do + loop do + sleep 1 + end + end + + mutex = Mutex.new + mutex.synchronize do + error = assert_raise ThreadError do + Fiber.new do + mutex.lock + end.resume + end + assert_includes error.message, "deadlock; lock already owned by another fiber belonging to the same thread" + end + ensure + thr&.kill&.join + end +end diff --git a/test/fiber/test_process.rb b/test/fiber/test_process.rb new file mode 100644 index 0000000000..f17f767704 --- /dev/null +++ b/test/fiber/test_process.rb @@ -0,0 +1,76 @@ +# frozen_string_literal: true +require 'test/unit' +require_relative 'scheduler' + +class TestFiberProcess < Test::Unit::TestCase + TRUE_CMD = RUBY_PLATFORM =~ /mswin|mingw/ ? "exit 0" : "true" + + def test_process_wait + Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + pid = Process.spawn(TRUE_CMD) + Process.wait(pid) + + # TODO test that scheduler was invoked. + + assert_predicate $?, :success? + end + end.join + end + + def test_system + Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + system(TRUE_CMD) + + # TODO test that scheduler was invoked (currently it's not). + + assert_predicate $?, :success? + end + end.join + end + + def test_system_faulty_process_wait + Thread.new do + scheduler = Scheduler.new + + def scheduler.process_wait(pid, flags) + Fiber.blocking{Process.wait(pid, flags)} + + # Don't return `Process::Status` instance. + return false + end + + Fiber.set_scheduler scheduler + + Fiber.schedule do + assert_raise TypeError do + system(TRUE_CMD) + end + end + end.join + end + + def test_fork + omit 'fork not supported' unless Process.respond_to?(:fork) + + pid = Process.fork{} + + Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + Process.wait(pid) + + assert_predicate $?, :success? + end + end.join + end +end diff --git a/test/fiber/test_queue.rb b/test/fiber/test_queue.rb new file mode 100644 index 0000000000..d78b026f11 --- /dev/null +++ b/test/fiber/test_queue.rb @@ -0,0 +1,54 @@ +# frozen_string_literal: true +require 'test/unit' +require_relative 'scheduler' + +class TestFiberQueue < Test::Unit::TestCase + def test_pop_with_timeout + queue = Thread::Queue.new + kill = false + result = :unspecified + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler(scheduler) + + Fiber.schedule do + result = queue.pop(timeout: 0.0001) + end + + scheduler.run + end + until thread.join(2) + kill = true + thread.kill + end + + assert_false(kill, 'Getting stuck due to a possible compiler bug.') + assert_nil result + end + + def test_pop_with_timeout_and_value + queue = Thread::Queue.new + queue.push(:something) + kill = false + result = :unspecified + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler(scheduler) + + Fiber.schedule do + result = queue.pop(timeout: 0.0001) + end + + scheduler.run + end + until thread.join(2) + kill = true + thread.kill + end + + assert_false(kill, 'Getting stuck due to a possible compiler bug.') + assert_equal :something, result + end +end diff --git a/test/fiber/test_ractor.rb b/test/fiber/test_ractor.rb new file mode 100644 index 0000000000..7dd82eda62 --- /dev/null +++ b/test/fiber/test_ractor.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true +require "test/unit" +require "fiber" + +class TestFiberCurrentRactor < Test::Unit::TestCase + def setup + omit unless defined? Ractor + end + + def test_ractor_shareable + assert_separately([], "#{<<~"begin;"}\n#{<<~'end;'}") + begin; + $VERBOSE = nil + require "fiber" + r = Ractor.new do + Fiber.new do + Fiber.current.class + end.resume + end + assert_equal(Fiber, r.value) + end; + end +end diff --git a/test/fiber/test_scheduler.rb b/test/fiber/test_scheduler.rb new file mode 100644 index 0000000000..d3696267f7 --- /dev/null +++ b/test/fiber/test_scheduler.rb @@ -0,0 +1,386 @@ +# frozen_string_literal: true +require 'test/unit' +require 'securerandom' +require 'fileutils' +require_relative 'scheduler' + +class TestFiberScheduler < Test::Unit::TestCase + def test_fiber_without_scheduler + # Cannot create fiber without scheduler. + assert_raise RuntimeError do + Fiber.schedule do + end + end + end + + def test_fiber_new + f = Fiber.new{} + refute f.blocking? + end + + def test_fiber_new_with_options + f = Fiber.new(blocking: true){} + assert f.blocking? + + f = Fiber.new(blocking: false){} + refute f.blocking? + + f = Fiber.new(pool: nil){} + refute f.blocking? + end + + def test_fiber_blocking + f = Fiber.new(blocking: false) do + fiber = Fiber.current + refute fiber.blocking? + Fiber.blocking do |_fiber| + assert_equal fiber, _fiber + assert fiber.blocking? + end + end + f.resume + end + + def test_closed_at_thread_exit + scheduler = Scheduler.new + + thread = Thread.new do + Fiber.set_scheduler scheduler + end + + thread.join + + assert scheduler.closed? + end + + def test_closed_when_set_to_nil + scheduler = Scheduler.new + + thread = Thread.new do + Fiber.set_scheduler scheduler + Fiber.set_scheduler nil + + assert scheduler.closed? + end + + thread.join + end + + def test_close_at_exit + assert_in_out_err %W[-I#{__dir__} -], <<-RUBY, ['Running Fiber'], [], success: true + require 'scheduler' + Warning[:experimental] = false + + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + sleep(0) + puts "Running Fiber" + end + RUBY + end + + def test_minimal_interface + scheduler = Object.new + + def scheduler.block + end + + def scheduler.unblock + end + + def scheduler.io_wait + end + + def scheduler.kernel_sleep + end + + def scheduler.fiber_interrupt(_fiber, _exception) + end + + thread = Thread.new do + Fiber.set_scheduler scheduler + end + + thread.join + end + + def test_current_scheduler + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + assert Fiber.scheduler + refute Fiber.current_scheduler + + Fiber.schedule do + assert Fiber.current_scheduler + end + end + + thread.join + end + + def test_autoload + 10.times do + Object.autoload(:TestFiberSchedulerAutoload, File.expand_path("autoload.rb", __dir__)) + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + 10.times do + Fiber.schedule do + Object.const_get(:TestFiberSchedulerAutoload) + end + end + end + + thread.join + ensure + $LOADED_FEATURES.delete(File.expand_path("autoload.rb", __dir__)) + Object.send(:remove_const, :TestFiberSchedulerAutoload) + end + end + + def test_iseq_compile_under_gc_stress_bug_21180 + Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + EnvUtil.under_gc_stress do + RubyVM::InstructionSequence.compile_file(File::NULL) + end + end + end.join + end + + def test_deadlock + mutex = Thread::Mutex.new + condition = Thread::ConditionVariable.new + q = 0.0001 + + signaller = Thread.new do + loop do + mutex.synchronize do + condition.signal + end + sleep q + end + end + + i = 0 + + thread = Thread.new do + scheduler = SleepingBlockingScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + 10.times do + mutex.synchronize do + condition.wait(mutex) + sleep q + i += 1 + end + end + end + end + + # Wait for 10 seconds at most... if it doesn't finish, it's deadlocked. + thread.join(10) + + # If it's deadlocked, it will never finish, so this will be 0. + assert_equal 10, i + ensure + # Make sure the threads are dead... + thread.kill + signaller.kill + thread.join + signaller.join + end + + def test_condition_variable + condition_variable = ::Thread::ConditionVariable.new + mutex = ::Thread::Mutex.new + + error = nil + + thread = Thread.new do + Thread.current.report_on_exception = false + + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + fiber = Fiber.schedule do + begin + mutex.synchronize do + condition_variable.wait(mutex) + end + rescue => error + end + end + + fiber.raise(RuntimeError) + end + + thread.join + assert_kind_of RuntimeError, error + end + + def test_post_fork_scheduler_reset + omit 'fork not supported' unless Process.respond_to?(:fork) + + forked_scheduler_state = nil + thread = Thread.new do + r, w = IO.pipe + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + forked_pid = fork do + r.close + w << (Fiber.scheduler ? 'set' : 'reset') + w.close + end + w.close + forked_scheduler_state = r.read + Process.wait(forked_pid) + ensure + r.close rescue nil + w.close rescue nil + end + thread.join + assert_equal 'reset', forked_scheduler_state + ensure + thread.kill rescue nil + end + + def test_post_fork_fiber_blocking + omit 'fork not supported' unless Process.respond_to?(:fork) + + fiber_blocking_state = nil + thread = Thread.new do + r, w = IO.pipe + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + forked_pid = nil + Fiber.schedule do + forked_pid = fork do + r.close + w << (Fiber.current.blocking? ? 'blocking' : 'nonblocking') + w.close + end + end + w.close + fiber_blocking_state = r.read + Process.wait(forked_pid) + ensure + r.close rescue nil + w.close rescue nil + end + thread.join + assert_equal 'blocking', fiber_blocking_state + ensure + thread.kill rescue nil + end + + def test_io_write_on_flush + begin + path = File.join(Dir.tmpdir, "ruby_test_io_write_on_flush_#{SecureRandom.hex}") + descriptor = nil + operations = nil + + thread = Thread.new do + scheduler = IOScheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + File.open(path, 'w+') do |file| + descriptor = file.fileno + file << 'foo' + file.flush + file << 'bar' + end + end + + operations = scheduler.operations + end + + thread.join + assert_equal [ + [:io_write, descriptor, 'foo'], + [:io_write, descriptor, 'bar'] + ], operations + + assert_equal 'foobar', IO.read(path) + ensure + thread.kill rescue nil + FileUtils.rm_f(path) + end + end + + def test_io_read_error + path = File.join(Dir.tmpdir, "ruby_test_io_read_error_#{SecureRandom.hex}") + error = nil + + thread = Thread.new do + scheduler = IOErrorScheduler.new + Fiber.set_scheduler scheduler + Fiber.schedule do + File.open(path, 'w+') { it.read } + rescue => error + # Ignore. + end + end + + thread.join + assert_kind_of Errno::EBADF, error + ensure + thread.kill rescue nil + FileUtils.rm_f(path) + end + + def test_io_write_error + path = File.join(Dir.tmpdir, "ruby_test_io_write_error_#{SecureRandom.hex}") + error = nil + + thread = Thread.new do + scheduler = IOErrorScheduler.new + Fiber.set_scheduler scheduler + Fiber.schedule do + File.open(path, 'w+') { it.sync = true; it << 'foo' } + rescue => error + # Ignore. + end + end + + thread.join + assert_kind_of Errno::EINVAL, error + ensure + thread.kill rescue nil + FileUtils.rm_f(path) + end + + def test_io_write_flush_error + path = File.join(Dir.tmpdir, "ruby_test_io_write_flush_error_#{SecureRandom.hex}") + error = nil + + thread = Thread.new do + scheduler = IOErrorScheduler.new + Fiber.set_scheduler scheduler + Fiber.schedule do + File.open(path, 'w+') { it << 'foo' } + rescue => error + # Ignore. + end + end + + thread.join + assert_kind_of Errno::EINVAL, error + ensure + thread.kill rescue nil + FileUtils.rm_f(path) + end +end diff --git a/test/fiber/test_sleep.rb b/test/fiber/test_sleep.rb new file mode 100644 index 0000000000..187f59dbd4 --- /dev/null +++ b/test/fiber/test_sleep.rb @@ -0,0 +1,71 @@ +# frozen_string_literal: true +require 'test/unit' +require_relative 'scheduler' + +class TestFiberSleep < Test::Unit::TestCase + ITEMS = [0, 1, 2, 3, 4] + + def test_sleep + items = [] + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + 5.times do |i| + Fiber.schedule do + assert_operator sleep(i/100.0), :>=, 0 + items << i + end + end + + # Should be 5 fibers waiting: + assert_equal scheduler.waiting.size, 5 + end + + thread.join + + assert_equal ITEMS, items + end + + def test_sleep_returns_seconds_slept + seconds = nil + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + Fiber.schedule do + seconds = sleep(1.1) + end + end + + thread.join + + assert_operator seconds, :>=, 1, "actual: %p" % seconds + end + + def test_broken_sleep + thread = Thread.new do + Thread.current.report_on_exception = false + + scheduler = Scheduler.new + + def scheduler.kernel_sleep(duration = nil) + raise "Broken sleep!" + end + + Fiber.set_scheduler scheduler + + Fiber.schedule do + sleep 0 + end + + ensure + scheduler.close + end + + assert_raise(RuntimeError) do + thread.join + end + end +end diff --git a/test/fiber/test_storage.rb b/test/fiber/test_storage.rb new file mode 100644 index 0000000000..3726decbdb --- /dev/null +++ b/test/fiber/test_storage.rb @@ -0,0 +1,115 @@ +# frozen_string_literal: true +require 'test/unit' + +class TestFiberStorage < Test::Unit::TestCase + def test_storage + Fiber.new do + Fiber[:x] = 10 + assert_kind_of Hash, Fiber.current.storage + assert_predicate Fiber.current.storage, :any? + end.resume + end + + def test_storage_inherited + Fiber.new do + Fiber[:foo] = :bar + + Fiber.new do + assert_equal :bar, Fiber[:foo] + Fiber[:bar] = :baz + end.resume + + assert_nil Fiber[:bar] + end.resume + end + + def test_variable_assignment + Fiber.new do + Fiber[:foo] = :bar + assert_equal :bar, Fiber[:foo] + end.resume + end + + def test_storage_assignment + old, Warning[:experimental] = Warning[:experimental], false + + Fiber.new do + Fiber.current.storage = {foo: :bar} + assert_equal :bar, Fiber[:foo] + end.resume + ensure + Warning[:experimental] = old + end + + def test_storage_only_allow_access_from_same_fiber + old, Warning[:experimental] = Warning[:experimental], false + + f = Fiber.new do + Fiber[:a] = 1 + end + assert_raise(ArgumentError) { f.storage } + assert_raise(ArgumentError) { f.storage = {} } + ensure + Warning[:experimental] = old + end + + def test_inherited_storage + Fiber.new(storage: {foo: :bar}) do + f = Fiber.new do + assert_equal :bar, Fiber[:foo] + end + f.resume + end.resume + end + + def test_enumerator_inherited_storage + Fiber.new do + Fiber[:item] = "Hello World" + + enumerator = Enumerator.new do |out| + out << Fiber.current + out << Fiber[:item] + end + + # The fiber within the enumerator is not equal to the current... + assert_not_equal Fiber.current, enumerator.next + + # But it inherited the storage from the current fiber: + assert_equal "Hello World", enumerator.next + end.resume + end + + def test_thread_inherited_storage + Fiber.new do + Fiber[:x] = 10 + + x = Thread.new do + Fiber[:y] = 20 + Fiber[:x] + end.value + + assert_equal 10, x + assert_equal nil, Fiber[:y] + end.resume + end + + def test_enumerator_count + Fiber.new do + Fiber[:count] = 0 + + enumerator = Enumerator.new do |y| + Fiber[:count] += 1 + y << Fiber[:count] + end + + assert_equal 1, enumerator.next + assert_equal 0, Fiber[:count] + end.resume + end + + def test_storage_assignment_type_error + assert_raise(TypeError) do + Fiber.new(storage: {Object.new => "bar"}) {} + end + end +end diff --git a/test/fiber/test_thread.rb b/test/fiber/test_thread.rb new file mode 100644 index 0000000000..4d2fbde9ed --- /dev/null +++ b/test/fiber/test_thread.rb @@ -0,0 +1,175 @@ +# frozen_string_literal: true +require "test/unit" +require_relative 'scheduler' + +class TestFiberThread < Test::Unit::TestCase + def test_thread_join + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + result = nil + Fiber.schedule do + result = Thread.new{:done}.value + end + + scheduler.run + result + end + + assert_equal :done, thread.value + end + + def test_thread_join_timeout + sleeper = nil + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + sleeper = Thread.new{sleep} + sleeper.join(0.1) + end + + scheduler.run + end + + thread.join + + assert_predicate sleeper, :alive? + ensure + sleeper&.kill&.join + end + + def test_thread_join_implicit + sleeping = false + finished = false + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + sleeping = true + sleep(0.1) + finished = true + end + + :done + end + + Thread.pass until sleeping + + thread.join + + assert_equal :done, thread.value + assert finished, "Scheduler thread's task should be finished!" + end + + def test_thread_join_blocking + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + result = nil + Fiber.schedule do + Fiber.new(blocking: true) do + # This can deadlock if the blocking state is not taken into account: + Thread.new do + sleep(0) + result = :done + end.join + end.resume + end + + scheduler.run + result + end + + assert_equal :done, thread.value + end + + def test_spurious_unblock_during_thread_join + ready = Thread::Queue.new + + target_thread = Thread.new do + ready.pop + :success + end + + Thread.pass until target_thread.status == "sleep" + + result = nil + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + # Create a fiber that will join a long-running thread: + joining_fiber = Fiber.schedule do + result = target_thread.value + end + + # Create another fiber that spuriously unblocks the joining fiber: + Fiber.schedule do + # This interrupts the join in joining_fiber: + scheduler.unblock(:spurious_wakeup, joining_fiber) + + # This allows the unblock to be processed: + sleep(0) + + # This allows the target thread to finish: + ready.push(:done) + end + + scheduler.run + end + + thread.join + + assert_equal :success, result + end + + def test_broken_unblock + thread = Thread.new do + Thread.current.report_on_exception = false + + scheduler = BrokenUnblockScheduler.new + + Fiber.set_scheduler scheduler + + Fiber.schedule do + Thread.new{ + Thread.current.report_on_exception = false + }.join + end + + scheduler.run + ensure + scheduler.close + end + + assert_raise(RuntimeError) do + thread.join + end + end + + def test_thread_join_hang + inner = nil + thread = Thread.new do + scheduler = SleepingUnblockScheduler.new + + Fiber.set_scheduler scheduler + + Fiber.schedule do + inner = Thread.new{sleep(0.01)} + inner.value + end + end + + thread.join + ensure + inner&.join + end +end diff --git a/test/fiber/test_timeout.rb b/test/fiber/test_timeout.rb new file mode 100644 index 0000000000..5493606be0 --- /dev/null +++ b/test/fiber/test_timeout.rb @@ -0,0 +1,51 @@ +# frozen_string_literal: true +require 'test/unit' +require_relative 'scheduler' + +require 'timeout' + +class TestFiberTimeout < Test::Unit::TestCase + def test_timeout_after + error = nil + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + begin + Timeout.timeout(0.001) do + sleep(1) + end + rescue + error = $! + end + end + end + + thread.join + + assert_kind_of(Timeout::Error, error) + end + + MESSAGE = "Hello World" + + def test_timeout_on_main_fiber + message = nil + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + assert_nil Fiber.current_scheduler + + Timeout.timeout(1) do + message = MESSAGE + end + end + + thread.join + + assert_equal MESSAGE, message + end +end |
