Forked from GustavoCaso/job_scheduler_using_ractor.rb
Created
September 25, 2020 11:34
-
-
Save strzibny/48c510c8ea7aba6c73c38b25151ebd6c to your computer and use it in GitHub Desktop.
Simple implementation for a job scheduler using ruby Ractor primitive
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'optparse' | |
require 'json' | |
require 'bundler/inline' | |
gemfile do | |
source 'https://rubygems.org' | |
gem 'redis' | |
gem 'oj' | |
end | |
require 'redis' | |
require 'oj' | |
options = {} | |
OptionParser.new do |opts| | |
opts.banner = "Usage: ractor.rb [options]" | |
opts.on("-s", "server", "Run Server") do |s| | |
options[:server] = true | |
end | |
opts.on("-e", "enqueue", "Enqueue work") do |s| | |
options[:enqueue] = true | |
end | |
end.parse! | |
class Worker | |
attr_reader :id | |
def initialize(id) | |
@id = id | |
end | |
def work(job) | |
puts "Worker[#{id}] working #{job}" | |
parsed_job = Oj.load(job) | |
Object.const_get(parsed_job['job_class']).new.perform(parsed_job['args']) | |
end | |
end | |
class DummyJob | |
def perform(*args) | |
puts "Performing" | |
puts args.inspect | |
end | |
end | |
def new_job | |
{'job_class' => DummyJob, 'args' => "DummyJob"}.to_json | |
end | |
QUEUE_NAME = "jobs:queue:low" | |
if options[:enqueue] | |
redis = Redis.new | |
puts "enqueue" | |
result = redis.rpush(QUEUE_NAME, new_job) | |
puts "Result from enqueue #{result}" | |
else | |
pipe = Ractor.new do | |
loop do | |
Ractor.yield(Ractor.recv, move: true) | |
end | |
end | |
workers = 4.times.map do |index| | |
worker = Worker.new(index) | |
Ractor.new(pipe, worker) do |pipe, worker| | |
loop do | |
job = pipe.take | |
puts "taken job from pipe by #{Ractor.current} and work it by worker #{worker.id}" | |
worker.work(job) | |
end | |
end | |
end | |
redis = Redis.new | |
loop do | |
puts "Waiting on work" | |
queue, job = redis.brpop(QUEUE_NAME) | |
puts "Pushing work from #{queue} to available workers" | |
pipe.send(job, move: true) | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment