Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save strzibny/48c510c8ea7aba6c73c38b25151ebd6c to your computer and use it in GitHub Desktop.
Save strzibny/48c510c8ea7aba6c73c38b25151ebd6c to your computer and use it in GitHub Desktop.
A simple implementation of a job scheduler using Ruby's Ractor primitive
require 'optparse'
require 'json'
require 'bundler/inline'
gemfile do
source 'https://rubygems.org'
gem 'redis'
gem 'oj'
end
require 'redis'
require 'oj'
# Command-line flags collected into +options+:
#   -s / --server   sets options[:server]  = true
#   -e / --enqueue  sets options[:enqueue] = true
#
# The long forms must start with "--"; a bare word such as "server" is taken
# by OptionParser as description text, which leaves only the short flag usable.
options = {}
option_parser = OptionParser.new do |opts|
  opts.banner = "Usage: ractor.rb [options]"

  opts.on("-s", "--server", "Run Server") do
    options[:server] = true
  end

  opts.on("-e", "--enqueue", "Enqueue work") do
    options[:enqueue] = true
  end
end

# Consumes the recognised flags from ARGV in place.
option_parser.parse!
# Runs serialized jobs on behalf of a single numbered worker slot.
class Worker
  # Identifier passed to the constructor; shown in the log line printed by #work.
  attr_reader :id

  # @param id [Object] label for this worker
  def initialize(id)
    @id = id
  end

  # Decodes +job+ with Oj, resolves the constant named under the 'job_class'
  # key, instantiates it and calls #perform with the value under 'args'.
  #
  # NOTE(review): the class name comes straight from the payload and is passed
  # to Object.const_get, so whoever can write to the queue picks which class is
  # instantiated — consider an allow-list if the queue is not fully trusted.
  #
  # @param job [String] serialized job payload
  # @return whatever the job's #perform returns
  def work(job)
    puts "Worker[#{id}] working #{job}"
    payload = Oj.load(job)
    job_class = Object.const_get(payload['job_class'])
    job_class.new.perform(payload['args'])
  end
end
# Sample job: announces itself and echoes the arguments it was given.
class DummyJob
  # Prints "Performing" followed by the inspected argument list, one per line.
  #
  # @param args [Array] any arguments; only printed
  # @return [nil]
  def perform(*args)
    puts "Performing", args.inspect
  end
end
# Builds the JSON payload for one demo job: a hash with 'job_class' and 'args'
# keys, serialized to a String.
#
# @return [String] JSON document
def new_job
  payload = { 'job_class' => DummyJob, 'args' => "DummyJob" }
  JSON.generate(payload)
end
# Redis list key used by both the enqueue path and the server loop.
QUEUE_NAME = "jobs:queue:low".freeze

if options[:enqueue]
  # Enqueue mode: append one demo job to the tail of the list, report, and exit.
  redis = Redis.new
  puts "enqueue"
  result = redis.rpush(QUEUE_NAME, new_job)
  puts "Result from enqueue #{result}"
else
  # Server mode — taken whenever --enqueue is absent.
  #
  # +pipe+ is a relay Ractor: every message sent to it is yielded onward, so
  # whichever worker calls #take next receives the job.
  pipe = Ractor.new do
    loop do
      Ractor.yield(Ractor.receive, move: true)
    end
  end

  # Four worker Ractors, each with its own Worker instance, all pulling from
  # the same relay.
  workers = 4.times.map do |index|
    Ractor.new(pipe, Worker.new(index)) do |source, worker|
      loop do
        job = source.take
        puts "taken job from pipe by #{Ractor.current} and work it by worker #{worker.id}"
        begin
          worker.work(job)
        rescue StandardError => e
          # A failing job must not end this loop; otherwise every bad job
          # permanently removes one worker from the pool.
          puts "Worker[#{worker.id}] failed job: #{e.class}: #{e.message}"
        end
      end
    end
  end

  redis = Redis.new
  loop do
    puts "Waiting on work"
    # Jobs are added with RPUSH (tail), so pop from the head to process them in
    # arrival order; popping from the tail would serve the newest job first.
    queue, job = redis.blpop(QUEUE_NAME)
    puts "Pushing work from #{queue} to available workers"
    pipe.send(job, move: true)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment