summaryrefslogtreecommitdiff
path: root/test/scheduler
diff options
context:
space:
mode:
Diffstat (limited to 'test/scheduler')
-rwxr-xr-xtest/scheduler/http.rb53
-rw-r--r--test/scheduler/scheduler.rb163
-rw-r--r--test/scheduler/test_enumerator.rb45
-rw-r--r--test/scheduler/test_fiber.rb29
-rw-r--r--test/scheduler/test_http.rb28
-rw-r--r--test/scheduler/test_io.rb35
-rw-r--r--test/scheduler/test_mutex.rb47
-rw-r--r--test/scheduler/test_sleep.rb30
8 files changed, 430 insertions, 0 deletions
diff --git a/test/scheduler/http.rb b/test/scheduler/http.rb
new file mode 100755
index 0000000000..e2a007bc84
--- /dev/null
+++ b/test/scheduler/http.rb
@@ -0,0 +1,53 @@
+
+require 'benchmark'
+
+TOPICS = ["cats", "dogs", "pigs", "skeletons", "zombies", "ocelots", "villagers", "pillagers"]
+
+require 'net/http'
+require 'uri'
+require 'json'
+
+require_relative 'scheduler'
+
+def fetch_topics(topics)
+ responses = {}
+
+ topics.each do |topic|
+ Fiber.new(blocking: Fiber.current.blocking?) do
+ uri = URI("https://www.google.com/search?q=#{topic}")
+ responses[topic] = Net::HTTP.get(uri).scan(topic).size
+ end.resume
+ end
+
+ Thread.scheduler&.run
+
+ return responses
+end
+
+def sweep(repeats: 3, **options)
+ times = (1..8).map do |i|
+ $stderr.puts "Measuring #{i} topic(s)..."
+ topics = TOPICS[0...i]
+
+ Thread.new do
+ Benchmark.realtime do
+ scheduler = Scheduler.new
+ Thread.current.scheduler = scheduler
+
+ repeats.times do
+ Fiber.new(**options) do
+ pp fetch_topics(topics)
+ end.resume
+
+ scheduler.run
+ end
+ end
+ end.value / repeats
+ end
+
+ puts options.inspect
+ puts JSON.dump(times.map{|value| value.round(3)})
+end
+
+sweep(blocking: true)
+sweep(blocking: false)
diff --git a/test/scheduler/scheduler.rb b/test/scheduler/scheduler.rb
new file mode 100644
index 0000000000..b2d36cc728
--- /dev/null
+++ b/test/scheduler/scheduler.rb
@@ -0,0 +1,163 @@
+# frozen_string_literal: true
+
+require 'fiber'
+
+begin
+ require 'io/nonblock'
+rescue LoadError
+ # Ignore.
+end
+
+class Scheduler
+ def initialize
+ @readable = {}
+ @writable = {}
+ @waiting = {}
+ @blocking = []
+
+ @ios = ObjectSpace::WeakMap.new
+ end
+
+ attr :fiber
+
+ attr :readable
+ attr :writable
+ attr :waiting
+ attr :blocking
+
+ def next_timeout
+ fiber, timeout = @waiting.min_by{|key, value| value}
+
+ if timeout
+ offset = timeout - current_time
+
+ if offset < 0
+ return 0
+ else
+ return offset
+ end
+ end
+ end
+
+ def run
+ while @readable.any? or @writable.any? or @waiting.any?
+ # Can only handle file descriptors up to 1024...
+ readable, writable = IO.select(@readable.keys, @writable.keys, [], next_timeout)
+
+ # puts "readable: #{readable}" if readable&.any?
+ # puts "writable: #{writable}" if writable&.any?
+
+ readable&.each do |io|
+ @readable[io]&.resume
+ end
+
+ writable&.each do |io|
+ @writable[io]&.resume
+ end
+
+ if @waiting.any?
+ time = current_time
+ waiting = @waiting
+ @waiting = {}
+
+ waiting.each do |fiber, timeout|
+ if timeout <= time
+ fiber.resume
+ else
+ @waiting[fiber] = timeout
+ end
+ end
+ end
+ end
+ end
+
+ def for_fd(fd)
+ @ios[fd] ||= ::IO.for_fd(fd, autoclose: false)
+ end
+
+ def wait_readable(io)
+ @readable[io] = Fiber.current
+
+ Fiber.yield
+
+ @readable.delete(io)
+
+ return true
+ end
+
+ def wait_readable_fd(fd)
+ wait_readable(
+ for_fd(fd)
+ )
+ end
+
+ def wait_writable(io)
+ @writable[io] = Fiber.current
+
+ Fiber.yield
+
+ @writable.delete(io)
+
+ return true
+ end
+
+ def wait_writable_fd(fd)
+ wait_writable(
+ for_fd(fd)
+ )
+ end
+
+ def current_time
+ Process.clock_gettime(Process::CLOCK_MONOTONIC)
+ end
+
+ def wait_sleep(duration = nil)
+ @waiting[Fiber.current] = current_time + duration
+
+ Fiber.yield
+
+ return true
+ end
+
+ def wait_any(io, events, duration)
+ unless (events & IO::WAIT_READABLE).zero?
+ @readable[io] = Fiber.current
+ end
+
+ unless (events & IO::WAIT_WRITABLE).zero?
+ @writable[io] = Fiber.current
+ end
+
+ Fiber.yield
+
+ @readable.delete(io)
+ @writable.delete(io)
+
+ return true
+ end
+
+ def wait_for_single_fd(fd, events, duration)
+ wait_any(
+ for_fd(fd),
+ events,
+ duration
+ )
+ end
+
+ def enter_blocking_region
+ # puts "Enter blocking region: #{caller.first}"
+ end
+
+ def exit_blocking_region
+ # puts "Exit blocking region: #{caller.first}"
+ @blocking << caller.first
+ end
+
+ def fiber(&block)
+ fiber = Fiber.new(blocking: false, &block)
+
+ fiber.resume
+
+ return fiber
+ end
+end
diff --git a/test/scheduler/test_enumerator.rb b/test/scheduler/test_enumerator.rb
new file mode 100644
index 0000000000..7c97382c52
--- /dev/null
+++ b/test/scheduler/test_enumerator.rb
@@ -0,0 +1,45 @@
+# frozen_string_literal: true
+require 'test/unit'
+require 'socket'
+require_relative 'scheduler'
+
+class TestSchedulerEnumerator < Test::Unit::TestCase
+ MESSAGE = "Hello World"
+
+ def test_read_characters
+ skip unless defined?(UNIXSocket)
+
+ i, o = UNIXSocket.pair
+ skip unless i.nonblock? && o.nonblock?
+
+ message = String.new
+
+ thread = Thread.new do
+ scheduler = Scheduler.new
+ Thread.current.scheduler = scheduler
+
+ e = i.to_enum(:each_char)
+
+ Fiber do
+ o.write("Hello World")
+ o.close
+ end
+
+ Fiber do
+ begin
+ while c = e.next
+ message << c
+ end
+ rescue StopIteration
+ # Ignore.
+ end
+
+ i.close
+ end
+ end
+
+ thread.join
+
+ assert_equal(MESSAGE, message)
+ end
+end
diff --git a/test/scheduler/test_fiber.rb b/test/scheduler/test_fiber.rb
new file mode 100644
index 0000000000..3452591cd9
--- /dev/null
+++ b/test/scheduler/test_fiber.rb
@@ -0,0 +1,29 @@
+# frozen_string_literal: true
+require 'test/unit'
+require_relative 'scheduler'
+
+class TestSchedulerFiber < Test::Unit::TestCase
+ def test_fiber_without_scheduler
+ # Cannot create fiber without scheduler.
+ assert_raise RuntimeError do
+ Fiber do
+ end
+ end
+ end
+
+ def test_fiber_blocking
+ scheduler = Scheduler.new
+
+ thread = Thread.new do
+ Thread.current.scheduler = scheduler
+
+ # Close is always a blocking operation.
+ IO.pipe.each(&:close)
+ end
+
+ thread.join
+
+ assert_not_empty scheduler.blocking
+ assert_match /test_fiber.rb:\d+:in `close'/, scheduler.blocking.last
+ end
+end
diff --git a/test/scheduler/test_http.rb b/test/scheduler/test_http.rb
new file mode 100644
index 0000000000..82aa73ca35
--- /dev/null
+++ b/test/scheduler/test_http.rb
@@ -0,0 +1,28 @@
+# frozen_string_literal: true
+
+require 'net/http'
+require 'uri'
+require 'openssl'
+
+require 'test/unit'
+require_relative 'scheduler'
+
+class TestSchedulerHTTP < Test::Unit::TestCase
+ def test_get
+ Thread.new do
+ scheduler = Scheduler.new
+ Thread.current.scheduler = scheduler
+
+ Fiber do
+ uri = URI("https://www.ruby-lang.org/en/")
+
+ http = Net::HTTP.new uri.host, uri.port
+ http.use_ssl = true
+ http.verify_mode = OpenSSL::SSL::VERIFY_NONE
+ body = http.get(uri.path).body
+
+ assert !body.empty?
+ end
+ end.join
+ end
+end
diff --git a/test/scheduler/test_io.rb b/test/scheduler/test_io.rb
new file mode 100644
index 0000000000..ef46d1ac2c
--- /dev/null
+++ b/test/scheduler/test_io.rb
@@ -0,0 +1,35 @@
+# frozen_string_literal: true
+require 'test/unit'
+require_relative 'scheduler'
+
+class TestSchedulerIO < Test::Unit::TestCase
+ MESSAGE = "Hello World"
+
+ def test_read
+ skip unless defined?(UNIXSocket)
+
+ i, o = UNIXSocket.pair
+ skip unless i.nonblock? && o.nonblock?
+
+ message = nil
+
+ thread = Thread.new do
+ scheduler = Scheduler.new
+ Thread.current.scheduler = scheduler
+
+ Fiber do
+ message = i.read(20)
+ i.close
+ end
+
+ Fiber do
+ o.write("Hello World")
+ o.close
+ end
+ end
+
+ thread.join
+
+ assert_equal MESSAGE, message
+ end
+end
diff --git a/test/scheduler/test_mutex.rb b/test/scheduler/test_mutex.rb
new file mode 100644
index 0000000000..8395e5522f
--- /dev/null
+++ b/test/scheduler/test_mutex.rb
@@ -0,0 +1,47 @@
+# frozen_string_literal: true
+require 'test/unit'
+require_relative 'scheduler'
+
+class TestSchedulerMutex < Test::Unit::TestCase
+ def test_mutex_synchronize
+ mutex = Mutex.new
+
+ thread = Thread.new do
+ scheduler = Scheduler.new
+ Thread.current.scheduler = scheduler
+
+ Fiber do
+ assert_equal Thread.scheduler, scheduler
+
+ mutex.synchronize do
+ assert_nil Thread.scheduler
+ end
+ end
+ end
+
+ thread.join
+ end
+
+ def test_mutex_deadlock
+ mutex = Mutex.new
+
+ thread = Thread.new do
+ scheduler = Scheduler.new
+ Thread.current.scheduler = scheduler
+
+ Fiber do
+ assert_equal Thread.scheduler, scheduler
+
+ mutex.synchronize do
+ Fiber.yield
+ end
+ end
+
+ assert_raise ThreadError do
+ mutex.lock
+ end
+ end
+
+ thread.join
+ end
+end
diff --git a/test/scheduler/test_sleep.rb b/test/scheduler/test_sleep.rb
new file mode 100644
index 0000000000..0be760341e
--- /dev/null
+++ b/test/scheduler/test_sleep.rb
@@ -0,0 +1,30 @@
+# frozen_string_literal: true
+require 'test/unit'
+require_relative 'scheduler'
+
+class TestSchedulerSleep < Test::Unit::TestCase
+ ITEMS = [0, 1, 2, 3, 4]
+
+ def test_sleep
+ items = []
+
+ thread = Thread.new do
+ scheduler = Scheduler.new
+ Thread.current.scheduler = scheduler
+
+ 5.times do |i|
+ Fiber do
+ sleep(i/100.0)
+ items << i
+ end
+ end
+
+ # Should be 5 fibers waiting:
+ assert_equal scheduler.waiting.size, 5
+ end
+
+ thread.join
+
+ assert_equal ITEMS, items
+ end
+end