@bryanp
Last active December 29, 2021 16:51
Ruby Routines
  • Performs routines concurrently in one or more reactors, each reactor running in its own thread.

  • Does not back routines with fibers or threads, to avoid their overhead. Instead, a routine is defined as a block that the reactor calls on each tick until the routine marks itself finished. The block receives the underlying routine object, which can hold state. This calls for a different coding style, because a routine cannot be paused and resumed the way a fiber can.

  • Reactors allocate compute time as equitably as possible. Developers define a compute "unit" as one call of their routine's block, and the reactor calls one tick of every routine in sequence. This is loosely similar to reductions in Erlang, but differs in that each routine gets exactly one tick per pass instead of the scheduler counting work.
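The tick-based scheduling described above can be sketched in a few lines. This is a minimal illustration, not the gist's implementation: `SketchRoutine` and `SketchReactor` are hypothetical names, and only `state`, `update`, and `finished` mirror the calls used in the examples that follow.

```ruby
# Hypothetical sketch of a tick-based routine. The class names are
# assumptions made for illustration only.
class SketchRoutine
  attr_reader :state

  def initialize(state, &block)
    @state = state
    @block = block
    @finished = false
  end

  # Replace the routine's state; the next tick sees the new value.
  def update(state)
    @state = state
  end

  # Mark the routine as done so the reactor stops calling it.
  def finished
    @finished = true
  end

  def finished?
    @finished
  end

  # One compute unit: a single call of the routine's block.
  def tick
    @block.call(self)
  end
end

class SketchReactor
  def initialize
    @routines = []
  end

  def go(state = nil, &block)
    routine = SketchRoutine.new(state, &block)
    @routines << routine
    routine
  end

  # Each pass gives every live routine one tick, then drops finished ones.
  def run
    until @routines.empty?
      @routines.each(&:tick)
      @routines.reject!(&:finished?)
    end
  end
end

reactor = SketchReactor.new
order = []

%w[a b].each do |name|
  reactor.go(0) { |routine|
    if routine.state >= 2
      routine.finished
    else
      order << "#{name}#{routine.state}"
      routine.update(routine.state + 1)
    end
  }
end

reactor.run
p order
```

Because each pass ticks every routine once, the work of the two routines interleaves instead of one running to completion before the other starts.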

# Generates 1M UUIDs distributed across 100k routines.
require "securerandom"

@results = []
100_000.times do
  # Create the routine with an initial state of `0`.
  go(0) { |routine|
    if routine.state >= 10
      routine.finished
    else
      @results << SecureRandom.uuid
      # Increment the counter state; otherwise this routine would run forever!
      routine.update(routine.state + 1)
    end
  }
end
# Create 5 routines that each sleep for a random amount of time.
5.times do |index|
  go(true) { |routine|
    if routine.state
      routine.update(false)
      routine.sleep(rand)
    else
      puts "[#{index}] woke up at #{Time.now.to_f}"
      routine.update(true)
    end
  }
end
# Example output:
# [3] woke up at 1640794396.7832682
# [1] woke up at 1640794396.9643729
# [0] woke up at 1640794397.167804
# [4] woke up at 1640794397.4017222
# [2] woke up at 1640794397.500705
# [1] woke up at 1640794397.675383
# [3] woke up at 1640794397.910521
# [0] woke up at 1640794397.95169
# [4] woke up at 1640794398.2548401
# [2] woke up at 1640794398.359256
# [1] woke up at 1640794398.537109
# [3] woke up at 1640794398.596313
# [0] woke up at 1640794398.976924
# [3] woke up at 1640794399.2930062
# [4] woke up at 1640794399.340241
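One way a routine could "sleep" without a fiber or thread is to record a wake-up time and have its tick do nothing until that time has passed. The sketch below assumes that approach; `SleepyRoutine` and the injected `clock` are hypothetical and are not taken from the gist.

```ruby
# Hypothetical sketch: sleeping is modeled as "skip my ticks until wake_at".
# The clock is injected so the behavior can be checked without real waiting.
class SleepyRoutine
  attr_reader :state

  def initialize(state, clock, &block)
    @state = state
    @clock = clock
    @block = block
    @wake_at = clock.call
  end

  def update(state)
    @state = state
  end

  # Does not block the thread; it only records when to resume.
  def sleep(seconds)
    @wake_at = @clock.call + seconds
  end

  def sleeping?
    @clock.call < @wake_at
  end

  # A sleeping routine gives up its tick immediately.
  def tick
    @block.call(self) unless sleeping?
  end
end

now = 0.0
woke = []

routine = SleepyRoutine.new(true, -> { now }) { |r|
  if r.state
    r.update(false)
    r.sleep(1.5)
  else
    woke << now
    r.update(true)
  end
}

routine.tick # schedules a wake-up at 1.5
now = 1.0
routine.tick # still sleeping, so the block is not called
now = 1.5
routine.tick # wake-up time reached, so the block runs
p woke
```

A real reactor would likely read a monotonic clock such as `Process.clock_gettime(Process::CLOCK_MONOTONIC)`, and might pause until the nearest wake-up time when every routine is asleep instead of spinning.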
# Start a server that accepts connections and responds 204.
require "socket"
require "llhttp"

class Delegate < LLHttp::Delegate
  def initialize
    reset
  end

  def reset
    @message_complete = false
  end

  def message_complete?
    @message_complete == true
  end

  def on_message_complete
    @message_complete = true
  end
end

go(io: TCPServer.new("localhost", 4242), intent: :r) { |server_routine|
  if (client_io = server_routine.accept)
    # Each client gets its own parser and delegate, stored as routine state.
    state = {delegate: Delegate.new}
    state[:parser] = LLHttp::Parser.new(state[:delegate])
    go(state, io: client_io, intent: :r) { |client_routine|
      if (data = client_routine.read(16384))
        client_routine.state[:parser] << data
        if client_routine.state[:delegate].message_complete?
          client_routine.io.write_nonblock("HTTP/1.1 204 No Content\r\n")
          client_routine.io.write_nonblock("content-length: 0\r\n\r\n")
          # Reset so the next request on this connection starts clean.
          client_routine.state[:delegate].reset
          client_routine.state[:parser].reset
        end
      end
    }
  end
}
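The server example relies on `accept` and `read` returning a falsy value when nothing is ready, so a tick never blocks the reactor. The gist does not show how that works; the sketch below is one plausible approach using Ruby's standard non-blocking I/O calls, with `try_read` as a hypothetical helper name.

```ruby
# Hypothetical helper: returns data when the IO has some, or nil when a read
# would block. Note that it also returns nil at end-of-file, so a real
# implementation would need to tell those two cases apart.
def try_read(io, maxlen)
  result = io.read_nonblock(maxlen, exception: false)
  result == :wait_readable ? nil : result
end

reader, writer = IO.pipe

first = try_read(reader, 16384)  # nothing written yet, so the tick moves on
writer.write("ping")
writer.flush
second = try_read(reader, 16384) # data is available now

p first
p second

reader.close
writer.close
```

A reactor could also check many IOs at once with `IO.select` and only tick the routines whose IO is ready for their declared intent.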