summaryrefslogtreecommitdiff
path: root/test/scheduler/scheduler.rb
diff options
context:
space:
mode:
authorSamuel Williams <samuel.williams@oriontransfer.co.nz>2020-05-14 22:10:55 +1200
committerGitHub <noreply@github.com>2020-05-14 22:10:55 +1200
commit0e3b0fcdba70cf96a8e0654eb8f50aacb8024bd4 (patch)
tree74d381412dfd8ff49dd3039f8aeae09ad9e4e6e3 /test/scheduler/scheduler.rb
parent336119dfc5e6baae0a936d6feae780a61975479c (diff)
Thread scheduler for light weight concurrency.
Notes
Notes: Merged: https://github.com/ruby/ruby/pull/3032 Merged-By: ioquatix <samuel@codeotaku.com>
Diffstat (limited to 'test/scheduler/scheduler.rb')
-rw-r--r--test/scheduler/scheduler.rb163
1 files changed, 163 insertions, 0 deletions
diff --git a/test/scheduler/scheduler.rb b/test/scheduler/scheduler.rb
new file mode 100644
index 0000000000..b2d36cc728
--- /dev/null
+++ b/test/scheduler/scheduler.rb
@@ -0,0 +1,163 @@
+# frozen_string_literal: true
+
+require 'fiber'
+
+begin
+ require 'io/nonblock'
+rescue LoadError
+ # Ignore.
+end
+
+class Scheduler
+ def initialize
+ @readable = {}
+ @writable = {}
+ @waiting = {}
+ @blocking = []
+
+ @ios = ObjectSpace::WeakMap.new
+ end
+
+ attr :fiber
+
+ attr :readable
+ attr :writable
+ attr :waiting
+ attr :blocking
+
+ def next_timeout
+ fiber, timeout = @waiting.min_by{|key, value| value}
+
+ if timeout
+ offset = timeout - current_time
+
+ if offset < 0
+ return 0
+ else
+ return offset
+ end
+ end
+ end
+
+ def run
+ while @readable.any? or @writable.any? or @waiting.any?
+ # Can only handle file descriptors up to 1024...
+ readable, writable = IO.select(@readable.keys, @writable.keys, [], next_timeout)
+
+ # puts "readable: #{readable}" if readable&.any?
+ # puts "writable: #{writable}" if writable&.any?
+
+ readable&.each do |io|
+ @readable[io]&.resume
+ end
+
+ writable&.each do |io|
+ @writable[io]&.resume
+ end
+
+ if @waiting.any?
+ time = current_time
+ waiting = @waiting
+ @waiting = {}
+
+ waiting.each do |fiber, timeout|
+ if timeout <= time
+ fiber.resume
+ else
+ @waiting[fiber] = timeout
+ end
+ end
+ end
+ end
+ end
+
+ def for_fd(fd)
+ @ios[fd] ||= ::IO.for_fd(fd, autoclose: false)
+ end
+
+ def wait_readable(io)
+ @readable[io] = Fiber.current
+
+ Fiber.yield
+
+ @readable.delete(io)
+
+ return true
+ end
+
+ def wait_readable_fd(fd)
+ wait_readable(
+ for_fd(fd)
+ )
+ end
+
+ def wait_writable(io)
+ @writable[io] = Fiber.current
+
+ Fiber.yield
+
+ @writable.delete(io)
+
+ return true
+ end
+
+ def wait_writable_fd(fd)
+ wait_writable(
+ for_fd(fd)
+ )
+ end
+
+ def current_time
+ Process.clock_gettime(Process::CLOCK_MONOTONIC)
+ end
+
+ def wait_sleep(duration = nil)
+ @waiting[Fiber.current] = current_time + duration
+
+ Fiber.yield
+
+ return true
+ end
+
+ def wait_any(io, events, duration)
+ unless (events & IO::WAIT_READABLE).zero?
+ @readable[io] = Fiber.current
+ end
+
+ unless (events & IO::WAIT_WRITABLE).zero?
+ @writable[io] = Fiber.current
+ end
+
+ Fiber.yield
+
+ @readable.delete(io)
+ @writable.delete(io)
+
+ return true
+ end
+
+ def wait_for_single_fd(fd, events, duration)
+ wait_any(
+ for_fd(fd),
+ events,
+ duration
+ )
+ end
+
+ def enter_blocking_region
+ # puts "Enter blocking region: #{caller.first}"
+ end
+
+ def exit_blocking_region
+ # puts "Exit blocking region: #{caller.first}"
+ @blocking << caller.first
+ end
+
+ def fiber(&block)
+ fiber = Fiber.new(blocking: false, &block)
+
+ fiber.resume
+
+ return fiber
+ end
+end