Last active
February 18, 2023 12:48
-
-
Save shalvah/ef750bc1bb1f01f6392709f071a670e2 to your computer and use it in GitHub Desktop.
Ruby event loop + web server (single thread) https://blog.shalvah.me/posts/experiments-in-concurrency-3-event-loops
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'algorithms' | |
require 'fiber' | |
# A minimal single-threaded event loop.
#
# Pending work lives in a TaskQueue: plain callbacks, timers, and
# continuations of partially-run fibers. #run keeps pulling work from the
# queue until the queue reports that it is empty.
class EventLoop
  def initialize
    @queue = TaskQueue.new
  end

  # Calls +entrypoint+ once, then drains the queue.
  #
  # The queue may hand back nil instead of a callable (nothing runnable
  # right now); in that case the loop simply asks again.
  def run(&entrypoint)
    entrypoint.call

    until @queue.empty?
      callback = @queue.next
      callback&.call
    end
  end

  # Schedules +callback+ to become runnable +timeout+ milliseconds from now.
  def set_timeout(timeout, &callback)
    now_ms = Time.now.to_f * 1000
    @queue.add_timer(now_ms + timeout, callback)
  end

  # Wraps +block+ in a Fiber and starts it with +args+.
  #
  # Whenever the fiber pauses (Fiber.yield) its next resume is queued, so
  # other queued work gets a turn in between. Once the fiber finishes,
  # +continuation+ (if given) is queued with the fiber's return value.
  def run_coroutine(block, *args, &continuation)
    fiber = Fiber.new(&block)
    continue_fiber(fiber, *args, &continuation)
  end

  # Resumes +fiber+ once with +args+, then either queues the next resume
  # (fiber still alive) or queues +continuation+ with the value returned by
  # this final resume (fiber finished).
  def continue_fiber(fiber, *args, &continuation)
    return_val = fiber.resume(*args)

    if fiber.alive?
      # Later resumes pass no arguments; only the first one carries +args+.
      @queue.add { continue_fiber(fiber, &continuation) }
    elsif continuation
      @queue.add { continuation.call(return_val) }
    end
  end

  # Queues +callback+ to run on a later turn of the loop.
  def add_callback(&callback)
    @queue.add(&callback)
  end
end
# Holds the work an event loop still has to do: a FIFO list of plain
# callbacks plus a priority queue of timers keyed by their due time.
class TaskQueue
  def initialize
    @callbacks = []
    @timers = Containers::PriorityQueue.new
  end

  # True only when there are neither pending timers nor pending callbacks.
  def empty?
    @timers.empty? && @callbacks.empty?
  end

  # Returns the next callable to run, or nil when nothing is runnable yet.
  #
  # A timer whose scheduled time has passed wins over plain callbacks.
  # Otherwise the oldest plain callback is returned; Array#shift yields nil
  # when there is none, e.g. while only not-yet-due timers remain.
  def next
    timer = @timers.next # peek only; removed below once it is known to be due
    now_ms = Time.now.to_f * 1000

    if timer && now_ms >= timer[:scheduled_time]
      @timers.pop
      return timer[:callback]
    end

    @callbacks.shift
  end

  # Registers +callback+ to become runnable at +scheduled_time+
  # (milliseconds, same scale as Time.now.to_f * 1000).
  def add_timer(scheduled_time, callback)
    # Negated so the earliest time gets the largest priority value --
    # this relies on the priority queue serving the highest priority first.
    priority = -scheduled_time
    @timers.push({ scheduled_time: scheduled_time, callback: callback }, priority)
  end

  # Appends a plain callback to the FIFO list.
  def add(&callback)
    @callbacks << callback
  end
end
# Global convenience wrappers around one shared EventLoop, so scripts can
# call run_loop / set_timeout / run_coroutine / add_callback directly.
module Kernel
  # Returns the process-wide EventLoop, creating it on first use.
  #
  # The loop is stored on the Kernel module itself rather than in an
  # instance variable of the caller: Kernel is mixed into every object, so a
  # per-caller variable would give each receiver its own loop that nobody
  # runs, and would raise FrozenError on frozen receivers.
  def event_loop
    Kernel.instance_variable_get(:@shared_event_loop) ||
      Kernel.instance_variable_set(:@shared_event_loop, EventLoop.new)
  end

  # Runs the shared loop, starting with +entrypoint+.
  def run_loop(&entrypoint)
    event_loop.run(&entrypoint)
  end

  # Forwards to EventLoop#set_timeout on the shared loop.
  def set_timeout(timeout, &callback)
    event_loop.set_timeout(timeout, &callback)
  end

  # Forwards to EventLoop#run_coroutine on the shared loop. +fiber+ is the
  # callable handed on as that method's first argument.
  def run_coroutine(fiber, *args, &continuation)
    event_loop.run_coroutine(fiber, *args, &continuation)
  end

  # Forwards to EventLoop#add_callback on the shared loop.
  def add_callback(&callback)
    event_loop.add_callback(&callback)
  end
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'socket' | |
require './event_loop' | |
# Polls +server+ once for a pending connection and reschedules itself on the
# event loop, so the server keeps accepting without blocking other callbacks.
#
# When a connection is waiting, its first line is read and logged and
# +handler+ is called with the socket. When nothing is waiting
# (IO::WaitReadable) or the accept is interrupted (Errno::EINTR), the poll
# is simply queued again.
def handle_next_request(server, &handler)
  socket = server.accept_nonblock
  request_line = socket.gets

  if request_line
    puts "Started handling req: #{request_line.strip} #{Time.now}"
    handler.call(socket)
  else
    # gets returns nil when the peer disconnected before sending anything;
    # there is no request to serve, so release the socket instead of crashing.
    socket.close
  end

  add_callback { handle_next_request(server, &handler) }
rescue IO::WaitReadable, Errno::EINTR
  add_callback { handle_next_request(server, &handler) }
end
# Demo server: every accepted request schedules three timers on the event
# loop -- the actual response after 5 seconds, plus two log lines (0 ms and
# 1.5 s) that show other work running while the response is pending.
run_loop do
  server = TCPServer.new("127.0.0.1", 5678)
  puts "Listening on localhost:5678"

  handle_next_request(server) do |socket|
    set_timeout(5000) do
      begin
        puts "Responding 5 seconds later: #{Time.now}"
        # The empty line ends the header section; without it the body text
        # would be sent as a (malformed) header line.
        socket.puts <<~HTTP
          HTTP/1.1 200 OK
          Content-Type: text/html

          Hii 👋
        HTTP
      ensure
        # Release the connection even if writing the response fails.
        socket.close
      end
    end

    set_timeout(0) { puts "0ms later: #{Time.now}" }
    set_timeout(1500) { puts "1.5s later: #{Time.now}" }
  end
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'socket' | |
require 'json' | |
require './event_loop' | |
# NB: beyond the concurrency itself, you won't see much of a performance benefit here,
# because the copy and stringify operations are CPU-bound.
# Replace them with some I/O (database, files, network) where there is actual "wait time",
# and you'll see more of a benefit. But I'm tired😭...so maybe another day.
# Polls +server+ once for a pending connection and reschedules itself on the
# event loop, so the server keeps accepting without blocking other callbacks.
#
# When a connection is waiting, its first line is read and logged and
# +handler+ is called with the socket. When nothing is waiting
# (IO::WaitReadable) or the accept is interrupted (Errno::EINTR), the poll
# is simply queued again.
def handle_next_request(server, &handler)
  socket = server.accept_nonblock
  request_line = socket.gets

  if request_line
    puts "Started handling req: #{request_line.strip} #{Time.now}"
    handler.call(socket)
  else
    # gets returns nil when the peer disconnected before sending anything;
    # there is no request to serve, so release the socket instead of crashing.
    socket.close
  end

  add_callback { handle_next_request(server, &handler) }
rescue IO::WaitReadable, Errno::EINTR
  add_callback { handle_next_request(server, &handler) }
end
# Fixture for the demo: 200,000 one-key hashes, {a: 0} through {a: 199_999}.
array = Array.new(200_000) { |i| { a: i } }
# Coroutine body: builds and returns a shallow copy of +arr+.
#
# It pauses with Fiber.yield before every 50,000th item (never before the
# first), so it must run inside a Fiber whenever +arr+ has more than
# 50,000 items.
copy = lambda do |arr|
  duplicate = []

  arr.each_with_index do |item, index|
    # Hand control back to whoever resumed us every 50_000 items.
    Fiber.yield if index.positive? && (index % 50_000).zero?
    duplicate << item
  end

  duplicate
end
# Coroutine body: serializes +arr+ to a JSON array string by calling
# to_json on each item.
#
# It pauses with Fiber.yield before every 50,000th item (never before the
# first), so it must run inside a Fiber whenever +arr+ has more than
# 50,000 items. An empty +arr+ yields "[]".
stringify = lambda do |arr|
  # Collect the pieces and join once: no trailing comma to patch up
  # afterwards, and no re-allocation of the whole string per item.
  parts = []

  arr.each_with_index do |item, index|
    # Hand control back to whoever resumed us every 50_000 items.
    Fiber.yield if index.positive? && (index % 50_000).zero?
    parts << item.to_json
  end

  "[#{parts.join(',')}]"
end
# Demo server: for every accepted request, copy the fixture array and then
# stringify the copy as two chained coroutines, and respond once both are
# done. Because the coroutines pause periodically, other requests can be
# accepted while one is still being processed.
run_loop do
  server = TCPServer.new("127.0.0.1", 5678)
  puts "Listening on localhost:5678"

  handle_next_request(server) do |socket|
    run_coroutine(copy, array) do |copied|
      puts "Copy done."

      # The stringified result is not used; the response body is fixed.
      run_coroutine(stringify, copied) do
        begin
          puts "Stringify done."
          puts "Responding: #{Time.now}"
          # The empty line ends the header section; without it the body text
          # would be sent as a (malformed) header line.
          socket.puts <<~HTTP
            HTTP/1.1 200 OK
            Content-Type: text/html

            Hii 👋
          HTTP
        ensure
          # Release the connection even if writing the response fails.
          socket.close
        end
      end
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment