Skip to content

Instantly share code, notes, and snippets.

@brandur
Created January 13, 2021 17:37
Show Gist options
  • Save brandur/af8ac446e6fcaf4120639ceb53391231 to your computer and use it in GitHub Desktop.
Save brandur/af8ac446e6fcaf4120639ceb53391231 to your computer and use it in GitHub Desktop.
Ractor worker pool
require 'kramdown'
require 'redcarpet'
require 'toml'
require 'yaml'
#
# pool
#
class WorkerPool
def initialize(num_workers)
@close_channel = new_channel
@num_jobs = 0
@num_workers = num_workers
@work_channel = new_channel
@workers = num_workers.times.map { |i| new_worker(i) }
end
def close
@num_workers.times.each do
@close_channel << [:close]
end
@workers.each(&:take)
end
def do_work(job)
@num_jobs += 1
@work_channel.send([:job, job])
end
def wait
@num_jobs.times.each do
_r, res= Ractor.select(*@workers)
job, duration = res
puts "job '#{job.name}' took #{duration}s"
end
end
private def new_channel
Ractor.new do
loop do
Ractor.yield Ractor.receive
end
end
end
private def new_worker(worker_num)
Ractor.new(@work_channel, @close_channel, name: "worker-#{worker_num}") do |work_channel, close_channel|
puts "#{name}: started"
loop do
case Ractor.select(work_channel, close_channel)
in _, [:job, job]
start = Time.now
_res = job.work
Ractor.yield([job, Time.now - start])
in _, [:close]
puts "#{name}: closing"
break
else
puts "received unknown message: #{message}"
end
end
end
end
end
#
# jobs
#
class RenderFragmentJob
def initialize(source)
@source = source
end
def name
"fragment: #{File.basename(@source)}"
end
def work
data = File.read(@source)
_, frontmatter_data, markdown_data = data.split("+++")
if frontmatter_data.empty? || markdown_data.empty?
raise "malformatted fragment: #{File.basename(@source)}"
end
meta = TOML::Parser.new(frontmatter_data).parsed
# content = Kramdown::Document.new(markdown_data).to_html
# data = YAML.parse("a: 123")
# renderer = Redcarpet::Render::HTML.new(render_options = {})
# markdown = Redcarpet::Markdown.new(renderer, extensions = {})
# markdown.render(markdown_data)
end
end
class SleepJob
def initialize(i)
@i = i
end
def name
"sleep: #{@i}"
end
def work
sleep(1)
end
end
#
# main
#
pool = WorkerPool.new(10)
=begin
50.times.each do |i|
pool.do_work(SleepJob.new(i))
end
=end
Dir["/Users/brandur/Documents/go/src/github.com/brandur/sorg/content/fragments/*.md"].each do |source|
pool.do_work(RenderFragmentJob.new(source))
end
pool.wait
pool.close
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment