Last active
January 8, 2021 09:34
-
-
Save ysbaddaden/dbde2bec0c3fe40afc26441a4d353239 to your computer and use it in GitHub Desktop.
A basic Scheduler for Ruby 3, leveraging the NIO and timers gems
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
require "fiber" | |
require "nio" | |
require "timers" | |
module Earl | |
class Scheduler | |
def initialize | |
@selector = NIO::Selector.new | |
@timers = Timers::Group.new | |
@ready = [] | |
@blocked = {} | |
end | |
def process_wait(pid, flags) | |
if (flags & Process::WNOHANG) == Process::WNOHANG | |
Process::Status.wait(pid, flags) | |
else | |
Thread.new { Process::Status.wait(pid, flags) }.value | |
end | |
end | |
def io_wait(io, events, timeout) | |
fiber = Fiber.current | |
blk = proc { enqueue(fiber) } | |
monitor = @selector.register(io, events_to_nio_interests(events)) | |
monitor.value = blk | |
timer = @timers.after(timeout, &blk) if timeout | |
Fiber.yield | |
timer&.cancel | |
monitor.close | |
nio_interests_to_events(monitor.readiness) | |
end | |
def kernel_sleep(duration = nil) | |
# NOTE: ConditionVariable#wait calls #kernel_sleep instead #block | |
# in Ruby 3.0.0 so we delegate to #block to have a proper | |
# block then unblock chain of calls | |
block(:sleep, duration) | |
# NOTE: with the ruby patch below we can just create a timer: | |
# fiber = Fiber.current | |
# @timers.after(duration) { enqueue(fiber) } if duration | |
# Fiber.yield | |
end | |
def block(blocker, timeout = nil) | |
fiber = Fiber.current | |
timer = @timers.after(timeout) { unblock(blocker, fiber) } if timeout | |
@blocked[fiber] = timer | |
Fiber.yield | |
true | |
end | |
def unblock(blocker, fiber) | |
@blocked.delete(fiber)&.cancel | |
enqueue(fiber) | |
# resume the NIO selector, because it may be blocking the loop | |
@selector.wakeup | |
nil | |
end | |
def fiber(&block) | |
fiber = Fiber.new(blocking: false, &block) | |
fiber.resume # immediately yield control to the fiber (seems to be expected) | |
fiber | |
end | |
def close | |
run | |
end | |
def run | |
until done? | |
if @ready.empty? | |
# try to replenish the queue, blocking until something is ready, | |
# making sure to resume when the next timer must be triggered | |
wait_interval = @timers.wait_interval&.clamp(0..) | |
@selector.select(wait_interval) { |monitor| monitor.value.call } | |
# trigger expired sleeps & timeouts | |
@timers.fire | |
end | |
# resume next ready fiber (if any) | |
@ready.shift&.resume | |
end | |
ensure | |
@selector.close | |
@timers.cancel | |
@ready.clear | |
@blocked.clear | |
end | |
private | |
def done? | |
@ready.empty? && @selector.empty? && @timers.empty? && @blocked.empty? | |
end | |
def enqueue(fiber) | |
@ready << fiber | |
end | |
def events_to_nio_interests(events) | |
readable = (events & IO::READABLE) == IO::READABLE | |
writable = (events & IO::WRITABLE) == IO::WRITABLE | |
if readable && writable | |
:rw | |
elsif readable | |
:r | |
elsif writable | |
:w | |
end | |
end | |
def nio_interests_to_events(interests) | |
case interests | |
when :rw | |
IO::READABLE | IO::WRITABLE | |
when :r | |
IO::READABLE | |
when :w | |
IO::WRITABLE | |
else | |
0 | |
end | |
end | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/thread_sync.c b/thread_sync.c | |
index 8c999e2164..0126891602 100644 | |
--- a/thread_sync.c | |
+++ b/thread_sync.c | |
@@ -547,7 +547,7 @@ rb_mutex_sleep(VALUE self, VALUE timeout) | |
VALUE scheduler = rb_scheduler_current(); | |
if (scheduler != Qnil) { | |
- rb_scheduler_kernel_sleep(scheduler, timeout); | |
+ rb_scheduler_block(scheduler, self, timeout); | |
mutex_lock_uninterruptible(self); | |
} else { | |
if (NIL_P(timeout)) { |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment