@mgreenly
Last active February 23, 2020 23:54
require 'concurrent'

MAX_THREADS = 20

class Job
  def step
    sleep 3 # simulates performing the next task in the job's workflow
  end
end

class App
  def run(runner)
    pool = Concurrent::ThreadPoolExecutor.new(pool_opts)
    until runner.exit?
      job = poll # only a single thread ever polls for work
      pool.post do
        job.step # jobs are performed in parallel
      end
    end
    pool.shutdown
    pool.wait_for_termination
  end

  private

  def poll
    Job.new # simulate polling the broker for a job that's ready to progress
  end

  def pool_opts
    {
      min_threads: MAX_THREADS,
      max_threads: MAX_THREADS,
      max_queue: 1,
      fallback_policy: :caller_runs
    }
  end
end

module Runner
  class << self
    def run(app)
      @exit = false
      thread = Thread.new do
        app.run(self)
      end
      thread.join
    rescue Interrupt
      @exit = true
      thread.join
    end

    def exit?
      @exit
    end
  end
end

Signal.trap("TERM") { raise Interrupt }

Runner.run(App.new)
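
The shutdown handshake between `Runner` and `App#run` can be tried in isolation, without the thread pool or a real signal. The following is a minimal sketch using only the standard library. `StopFlag` and `run_loop` are hypothetical stand-ins for `Runner` and `App#run` and are not part of the script above; the short `sleep` stands in for polling and posting a job.

```ruby
# Hypothetical stand-in for Runner: a cooperative stop flag polled by a loop.
class StopFlag
  def initialize
    @exit = false
  end

  def exit!
    @exit = true
  end

  def exit?
    @exit
  end
end

# Hypothetical stand-in for App#run: loops until the flag is set and
# returns how many iterations it completed.
def run_loop(flag)
  iterations = 0
  until flag.exit?
    sleep 0.01 # stands in for `poll` followed by `pool.post`
    iterations += 1
  end
  iterations
end

flag = StopFlag.new
worker = Thread.new { run_loop(flag) }

sleep 0.1                # let the loop spin for a moment
flag.exit!               # what the `rescue Interrupt` branch does with @exit = true
completed = worker.value # joins the thread and returns the block's result

puts "loop exited after #{completed} iterations"
```

The flag is only checked at the top of each iteration, so the loop finishes the iteration in progress before it stops. In the original script that is what allows `pool.shutdown` and `pool.wait_for_termination` to run after a `TERM` signal instead of the process dying mid-job.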