Last active
August 9, 2023 08:27
-
-
Save fractaledmind/43b77af9d04a4aa9012dc7117c7c5042 to your computer and use it in GitHub Desktop.
An executable Ruby script sandbox to explore the concept of a SQLite-backed, process-embedded job backend
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
# require "bundler/inline" | |
# | |
# gemfile(true) do | |
# source "https://rubygems.org" | |
# | |
# git_source(:github) { |repo| "https://github.com/#{repo}.git" } | |
# | |
# gem "sqlite3" | |
# end | |
require "sqlite3" | |
require "singleton" | |
require "json" | |
# :nocov: | |
# Process-wide singleton that detects which concurrency runtime is in use
# (:fiber, :polyphony, :iodine or :threaded) and exposes helpers to spawn,
# yield and synchronize execution contexts accordingly.
class Litescheduler
  include Singleton

  # Schedulers whose execution contexts are fibers rather than threads.
  FIBER_SCHEDULERS = %i[fiber polyphony].freeze

  attr_reader :environment, :scheduler, :max_contexts, :storage, :context, :mutex

  # Runs every detector once and caches the results on the singleton.
  def initialize
    @environment = detect_environment
    @scheduler = detect_scheduler
    @max_contexts = detect_max_contexts
    @storage = detect_storage
    @context = detect_context
    # a single mutex per process (is that ok?)
    @mutex = Thread::Mutex.new
  end

  # Spawn a new execution context running the given block.
  #
  # Raises StandardError when the detected scheduler is not one of the supported ones.
  def spawn(&block)
    case @scheduler
    when :fiber then Fiber.schedule(&block)
    when :polyphony then spin(&block)
    when :iodine, :threaded then Thread.new(&block)
    else
      raise StandardError, "Unknown scheduler: `#{@scheduler}`"
    end
  end

  # Switch the execution context to allow others to run.
  #
  # Returns true when a switch was requested (fiber-based schedulers),
  # false when nothing was done (thread-based schedulers).
  def switch
    case @scheduler
    when :fiber
      Fiber.scheduler.yield
      true
    when :polyphony
      Fiber.current.schedule
      Thread.current.switch_fiber
      true
    else
      # Thread.pass
      false
    end
  end

  # Bold assumption: only threaded code needs synchronizing.
  # For fiber-based schedulers the block is simply run as is.
  #
  # Returns the value of the block.
  def synchronize(&block)
    return yield if FIBER_SCHEDULERS.include?(@scheduler)

    @mutex.synchronize(&block)
  end

  private

  # Detect the Rack or Rails environment, defaulting to "development".
  def detect_environment
    return Rails.env if defined? Rails
    return ENV["RACK_ENV"] if ENV.key?("RACK_ENV")
    return ENV["APP_ENV"] if ENV.key?("APP_ENV")

    "development"
  end

  # Identify which scheduler we are running in.
  # We currently support :fiber, :polyphony, :iodine & :threaded;
  # in the future we might want to expand to other schedulers.
  def detect_scheduler
    return :fiber if Fiber.scheduler
    return :polyphony if defined? Polyphony
    return :iodine if defined? Iodine

    :threaded
  end

  # Fiber-based schedulers are allowed 50 contexts, thread-based ones 5.
  def detect_max_contexts
    FIBER_SCHEDULERS.include?(scheduler) ? 50 : 5
  end

  # Storage of the current fiber for fiber-based schedulers, otherwise the current thread.
  # (The original compared against the misspelled `:poylphony`, so Polyphony
  # always fell through to the thread branch.)
  def detect_storage
    FIBER_SCHEDULERS.include?(scheduler) ? Fiber.current.storage : Thread.current
  end

  # The current fiber for fiber-based schedulers, otherwise the current thread.
  def detect_context
    FIBER_SCHEDULERS.include?(scheduler) ? Fiber.current : Thread.current
  end
end
# :nocov: | |
# Mixin that opens and configures a SQLite connection for the including class.
# The including class must respond to `.migrations` and `.statements`, each
# returning a Hash of name => SQL string.
module Litedb
  # Opens the database, applies the pragmas, runs the migrations and prepares
  # the statements.
  #
  # options - Hash (String or Symbol keys) overriding :path, :synchronous,
  #           :mmap_size and :journal_size_limit. Entries whose value is nil
  #           are ignored so that the defaults below still apply.
  #
  # Returns the database connection, which also exposes the prepared
  # statements through `#statements`.
  def connect!(options = {})
    @scheduler = Litescheduler.instance

    defaults = {
      path: ":memory:",
      synchronous: :NORMAL,
      mmap_size: 32 * 1024, # 32 kilobytes
      journal_size_limit: 64 * 1024 * 1024, # 64 megabytes
    }
    # drop nil values first: an unset option must not clobber its default
    overrides = options.compact.transform_keys do |key|
      key.respond_to?(:to_sym) ? key.to_sym : key
    end
    config = defaults.merge(overrides)

    @db = SQLite3::Database.new(config[:path])

    # set a custom busy handler to override the `busy_timeout`
    # this ensures we either switch to another execution context or try again to connect
    # https://www.sqlite.org/pragma.html#pragma_busy_timeout
    @db.busy_handler { @scheduler.switch || sleep(rand * 0.002) }

    # Journal mode WAL allows for greater concurrency (many readers + one writer)
    # https://www.sqlite.org/pragma.html#pragma_journal_mode
    @db.journal_mode = :WAL

    # level of database durability
    # 2 = "FULL" (sync on every write)
    # 1 = "NORMAL" (sync every 1000 written pages)
    # 0 = "OFF" (don't sync)
    # https://www.sqlite.org/pragma.html#pragma_synchronous
    @db.synchronous = config[:synchronous]

    # set the global memory map so all processes can share data
    # https://www.sqlite.org/pragma.html#pragma_mmap_size
    # https://www.sqlite.org/mmap.html
    @db.mmap_size = config[:mmap_size]

    # impose a limit on the WAL file to prevent unlimited growth (with a negative impact on read performance as well)
    # https://www.sqlite.org/pragma.html#pragma_journal_size_limit
    @db.journal_size_limit = config[:journal_size_limit]

    # attach a cache of prepared statements to the connection object itself
    @db.instance_variable_set(:@statements, {})
    @db.class.attr_reader(:statements)

    @db.transaction(:immediate) do
      self.class.migrations.each do |_key, sql|
        @db.execute(squish_sql(sql))
      end
    end

    self.class.statements.each do |key, sql|
      @db.statements[key.to_sym] = @db.prepare(squish_sql(sql))
    end

    @db
  end

  # Executes the prepared statement registered under `statement` with the
  # given bind arguments and returns whatever `execute!` returns.
  def run_statement(statement, *args)
    @db.statements[statement].execute!(*args)
  end

  private

  # Collapses every run of whitespace into a single space and trims both ends.
  def squish_sql(sql)
    sql.gsub(/[[:space:]]+/, " ").strip
  end
end
# SQLite-backed queue. A process-wide singleton that stores values in the
# `queue` table and exposes push/pop/repush/delete/count/clear on top of the
# prepared statements declared in `.statements`.
class Litequeue
  include Singleton
  include Litedb

  Configuration = Struct.new(:path, :synchronous, :mmap_size, :journal_size_limit)

  # Lazily built, class-level configuration with its default values.
  # Members are passed positionally in the Struct's declared order
  # (the original wrote `path = "..."`, which only assigned throw-away locals).
  def self.configuration
    @configuration ||= Configuration.new(
      "./queue.sqlite3",  # path
      :OFF,               # synchronous
      32 * 1024,          # mmap_size (32 kilobytes)
      64 * 1024 * 1024    # journal_size_limit (64 megabytes); a nil here would be passed on to connect!
    )
  end

  # Yields the configuration so callers can change individual members.
  def self.configure
    yield(configuration)
  end

  # Name => SQL for the schema statements run by `connect!`.
  # Called through `self.class.migrations`, so it must stay a public class method.
  def self.migrations
    {
      create_table_queue: <<~SQL,
        CREATE TABLE IF NOT EXISTS queue(
          id TEXT PRIMARY KEY NOT NULL ON CONFLICT REPLACE,
          name TEXT NOT NULL ON CONFLICT REPLACE,
          fire_at INTEGER NOT NULL ON CONFLICT REPLACE,
          value TEXT,
          created_at INTEGER DEFAULT(UNIXEPOCH()) NOT NULL ON CONFLICT REPLACE
        ) WITHOUT ROWID;
      SQL
      create_index_queue_by_name: <<~SQL
        CREATE INDEX IF NOT EXISTS idx_queue_by_name ON queue(name, fire_at ASC);
      SQL
    }
  end

  # Name => SQL for the statements prepared by `connect!`.
  # Called through `self.class.statements`, so it must stay a public class method.
  def self.statements
    {
      push: <<~SQL,
        INSERT INTO queue(id, name, fire_at, value)
        VALUES (HEX(RANDOMBLOB(32)), $1, (UNIXEPOCH('subsec') + $2), $3)
        RETURNING id, name;
      SQL
      repush: <<~SQL,
        INSERT INTO queue(id, name, fire_at, value)
        VALUES (?, ?, (UNIXEPOCH('subsec') + ?), ?)
        RETURNING name;
      SQL
      pop: <<~SQL,
        DELETE FROM queue
        WHERE name != '_dead'
        AND (name, fire_at, id)
        IN (
          SELECT name, fire_at, id FROM queue
          WHERE name = IFNULL($1, 'default')
          AND fire_at <= (UNIXEPOCH('subsec'))
          ORDER BY fire_at ASC
          LIMIT IFNULL($2, 1)
        )
        RETURNING id, value;
      SQL
      count: <<~SQL,
        SELECT COUNT(*) FROM queue WHERE IIF($1 IS NULL, 1, name = $1)
      SQL
      delete: <<~SQL,
        DELETE FROM queue
        WHERE id = $1
        RETURNING value;
      SQL
      clear: <<~SQL,
        DELETE FROM queue
        WHERE IIF($1 IS NULL, 1, name = $1);
      SQL
      info: <<~SQL
        SELECT
          name,
          COUNT(*) AS count,
          AVG(UNIXEPOCH('subsec') - created_at) AS avg,
          MIN(UNIXEPOCH('subsec') - created_at) AS min,
          MAX(UNIXEPOCH('subsec') - created_at) AS max
        FROM queue
        GROUP BY name
        ORDER BY count DESC;
      SQL
    }
  end

  # Instance-level shortcut to the class configuration.
  def configuration
    self.class.configuration
  end

  # Connects using the class configuration at the time of first instantiation.
  def initialize
    connect!(configuration.to_h)
  end

  # Adds `value` to `queue` (default 'default'), due `delay` seconds from now (default 0).
  # Returns the first returned row, i.e. [id, name].
  def push(value, delay = nil, queue = nil)
    delay ||= 0
    queue ||= 'default'
    run_statement(:push, queue, delay, value).first
  end

  # Removes and returns up to `limit` (default 1) due rows of `queue`
  # (default 'default') as an array of [id, value] rows.
  def pop(queue = nil, limit = nil)
    queue ||= 'default'
    limit ||= 1
    run_statement(:pop, queue, limit)
  end

  # Re-inserts `value` under an existing `id`; same defaults as #push.
  # Returns the first returned row, i.e. [name].
  def repush(id, value, delay = nil, queue = nil)
    delay ||= 0
    queue ||= 'default'
    run_statement(:repush, id, queue, delay, value).first
  end

  # Deletes the row with `id`. Returns the first returned row ([value]),
  # or nil when no row was returned.
  def delete(id)
    run_statement(:delete, id).first
  end

  # Number of rows in `queue`, or in every queue when `queue` is nil.
  def count(queue = nil)
    run_statement(:count, queue).first.first
  end

  # Deletes every row of `queue`, or of every queue when `queue` is nil.
  def clear(queue = nil)
    run_statement(:clear, queue)
  end

  # True when no row exists in any queue.
  def empty?
    count.zero?
  end
end
class Litekiq | |
# Litekiq::Job is responsible for providing an interface to job classes | |
# Mixin for job classes: including it extends the class with the enqueueing
# API defined in ClassMethods.
module Job
  def self.included(klass)
    klass.extend(ClassMethods)
  end

  module ClassMethods
    # Enqueue the job with no delay. Returns whatever the client's push returns.
    def perform_async(*params)
      enqueue(params, 0)
    end

    # Enqueue the job to run at `time` (anything responding to #to_f).
    # The delay keeps sub-second precision; truncating both sides with #to_i
    # could shift the schedule by up to a second.
    def perform_at(time, *params)
      enqueue(params, time.to_f - Time.now.to_f)
    end

    # Enqueue the job to run `delay` seconds from now.
    def perform_in(delay, *params)
      enqueue(params, delay)
    end
    alias_method :perform_after, :perform_in

    # Delete the job with the given id through the client.
    def delete(id)
      client.delete(id)
    end

    # Set the queue this job class is pushed to (stored as a String).
    def queue_as(queue_name)
      @queue_name = queue_name.to_s
    end

    # Set extra options merged into every push (e.g. attempts:).
    def litejob_options(options)
      @litejob_options = options
    end

    private

    # Shared push path: class-level options overlaid with the delay and queue.
    def enqueue(params, delay)
      @litejob_options ||= {}
      client.push(name, params, @litejob_options.merge(delay: delay, queue: queue_name))
    end

    def queue_name
      @queue_name || 'default'
    end

    def client
      @client ||= Client.new
    end
  end
end
# Litekiq::Client is responsible for pushing job payloads to the SQLite queue. | |
# Pushes JSON-serialized job payloads onto the Litequeue singleton.
class Client
  def initialize
    @queue = Litequeue.instance
  end

  # Serializes the job and pushes it to the queue.
  #
  # jobclass - name of the job class.
  # params   - array of arguments for the job's #perform.
  # options  - :delay (default 0), :attempts (default 5) and :queue.
  #
  # Returns whatever the queue's push returns.
  def push(jobclass, params, options = {})
    delay = options[:delay] || 0
    attempts = options[:attempts] || 5
    queue = options[:queue]
    payload = JSON.dump({class: jobclass, params: params, attempts: attempts, queue: queue})
    atomic_push(payload, delay, queue)
  end

  # Deletes the job with the given id.
  #
  # Returns the deserialized job hash, or nil when the queue returned no row
  # for that id (the original raised NoMethodError on nil in that case).
  def delete(id)
    row = @queue.delete(id)
    return nil if row.nil?

    # NOTE(review): JSON.load may honor JSON additions depending on the json
    # gem version; consider JSON.parse if payloads can carry untrusted data.
    JSON.load(row.first)
  end

  private

  # Pushes to the queue, retrying exactly once when SQLite reports it is busy.
  # Any other error, or a second busy error, propagates to the caller.
  # https://github.com/sparklemotion/sqlite3-ruby/blob/master/lib/sqlite3/errors.rb
  def atomic_push(payload, delay, queue)
    retried = false
    begin
      @queue.push(payload, delay, queue)
    rescue SQLite3::BusyException
      raise if retried

      retried = true
      retry
    end
  end
end
# Litekiq::Server is responsible for popping job payloads from the SQLite queue. | |
# :nocov: | |
class Server
  # queues - array of [name, priority, spawns] entries. `priority` doubles as
  #          the batch size per pass; `spawns` is compared against "spawn".
  #
  # Starts the worker loop right away through the scheduler.
  def initialize(queues)
    @queue = Litequeue.instance
    @scheduler = Litescheduler.instance
    @queues = queues
    # group queues by priority, then order the groups from highest to lowest priority
    grouped = queues.each_with_object({}) do |(name, priority, spawns), groups|
      (groups[priority] ||= []) << [name, spawns == "spawn"]
    end
    @prioritized_queues = grouped.sort_by { |priority, _| -priority }
    @running = true
    @sleep_intervals = [0.001, 0.005, 0.025, 0.125, 0.625, 1.0, 2.0]
    run!
  end

  # Pops from the named queue.
  #
  # Returns the single row when exactly one was popped, false when none was,
  # and the whole result otherwise.
  def pop(queue)
    result = @queue.pop(queue)
    return result[0] if result.length == 1
    return false if result.empty?

    result
  end

  # Spawns the worker loop: each pass walks every priority level, processing
  # up to `priority` jobs per queue, and backs off only when the entire pass
  # found nothing to do.
  def run!
    @scheduler.spawn do
      worker_sleep_index = 0
      while @running
        processed = 0
        @prioritized_queues.each do |priority, queues|
          queues.each do |queue, _spawns|
            batched = 0
            while (batched < priority) && (payload = pop(queue))
              batched += 1
              processed += 1
              Processor.new(payload).process!
              # give other contexts a chance to run here
              @scheduler.switch
            end
          end
        end
        # The back-off belongs after the full pass. Inside the priority loop
        # it slept whenever the higher-priority levels were empty, delaying
        # jobs waiting in lower-priority queues.
        if processed.zero?
          sleep @sleep_intervals[worker_sleep_index]
          worker_sleep_index += 1 if worker_sleep_index < @sleep_intervals.length - 1
        else
          worker_sleep_index = 0 # reset the index
        end
      end
    end
  end
end
# :nocov: | |
# Litekiq::Processor is responsible for processing job payloads | |
# Runs a single popped job payload and re-queues it when the job raises.
class Processor
  # payload - the [id, serialized_job] pair popped from the queue.
  def initialize(payload)
    @payload = payload
    @queue = Litequeue.instance
  end

  # Serializes `job` and puts it back on the queue under the same id.
  def repush(id, job, delay=0, queue=nil)
    serialized = JSON.dump(job)
    @queue.repush(id, serialized, delay, queue)
  end

  # Deserializes the job, instantiates its class and calls #perform with the
  # stored params.
  #
  # When #perform raises, the job is pushed back with one retry fewer and a
  # delay of 0.1s per retry already used; once no retries are left it goes to
  # the "_dead" queue. Errors raised while extracting the job info (bad JSON,
  # unknown class) are not retried and propagate to the caller.
  def process!
    job_id, raw_job = @payload
    job = JSON.load(raw_job)
    worker = Object.const_get(job["class"]).new

    begin
      worker.perform(*job["params"])
    rescue
      return repush(job_id, job, 0, "_dead") if job['retries_left'] == 0

      job['retries_left'] ||= job['attempts']
      job['retries_left'] -= 1
      backoff = (job['attempts'] - job['retries_left']) * 0.1
      repush(job_id, job, backoff, job["queue"])
    end
  end
end
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
# require "bundler/inline" | |
# | |
# gemfile(true) do | |
# source "https://rubygems.org" | |
# | |
# git_source(:github) { |repo| "https://github.com/#{repo}.git" } | |
# | |
# gem "sqlite3" | |
# end | |
require 'simplecov' | |
SimpleCov.command_name 'test:units' | |
SimpleCov.start do | |
filters.clear | |
enable_coverage :branch | |
end | |
require "minitest/autorun" | |
require_relative "./litekiq" | |
# ----------------------------------------------------------------------------- | |
# Setup a class to allow us to track and test whether code has been performed | |
# Test helper that tracks, at class level, how many times code was performed
# and which items were processed per scope.
class Performance
  # Clears both the performance counter and the recorded items.
  def self.reset!
    @performances = 0
    @processed_items = {}
  end

  # Records one performance and returns the new total.
  def self.performed!
    @performances ||= 0
    @performances += 1
  end

  # Records `item` under `scope`.
  def self.processed!(item, scope: :default)
    @processed_items ||= {}
    (@processed_items[scope] ||= []) << item
  end

  # Items recorded under `scope`; an empty array when nothing was recorded
  # (the original raised NoMethodError before the first `processed!`).
  def self.processed_items(scope = :default)
    (@processed_items || {}).fetch(scope, [])
  end

  # Number of recorded performances, 0 when none.
  def self.performances
    @performances || 0
  end
end
$jobqueue = Litequeue.instance | |
describe Litekiq::Job do | |
# Restore shared state after every example: empty the queue, zero the
# performance counter and drop any queue name set on NoOpJob via `queue_as`.
after do
  $jobqueue.clear
  Performance.reset!
  NoOpJob.instance_variable_set :@queue_name, nil
end
# Fixture job whose #perform does nothing and returns nil.
class NoOpJob
  include Litekiq::Job

  def perform
    nil
  end
end
# Fixture job that records one performance each time it runs.
class OpJob
  include Litekiq::Job

  def perform
    Performance.performed!
  end
end
# Fixture job that fails only while no performance has been recorded yet:
# the first run records a performance and raises, later runs do nothing.
class RetryJob
  include Litekiq::Job

  RetryableError = Class.new(StandardError)

  def perform
    return unless Performance.performances.zero?

    Performance.performed!
    raise RetryableError
  end
end
# Fixture job that records a performance and then raises on every run.
class AlwaysFailJob
  include Litekiq::Job

  RetryableError = Class.new(StandardError)

  def perform
    Performance.performed!
    raise RetryableError
  end
end
# Runs the block (which is expected to enqueue jobs) and then keeps popping
# and processing jobs until the queue reports it is empty. Pops that return
# no row (e.g. the job is not due yet) are simply retried.
def perform_enqueued_jobs(&block)
  yield # enqueue jobs

  # drain the queue, processing each job as it becomes available
  loop do
    break if $jobqueue.empty?

    job_row = $jobqueue.pop().first
    Litekiq::Processor.new(job_row).process! unless job_row.nil?
  end
end
# Pops the first available job and processes it, polling until one is due.
#
# timeout - maximum number of seconds to keep polling (default 5). The
#           original counted attempts but never used the count, so an empty
#           queue made the spec hang forever.
#
# Returns nil. Raises RuntimeError when no job became available in time.
def perform_enqueued_job(timeout: 5)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

  loop do
    payload = $jobqueue.pop.first
    if payload
      Litekiq::Processor.new(payload).process!
      return
    end

    if Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline
      raise "no enqueued job became available within #{timeout}s"
    end
  end
end
# Jobs pushed with .perform_async go to the default queue with no delay.
describe ".perform_async" do
  it "returns job_id and queue" do
    job_id, queue = NoOpJob.perform_async()
    assert job_id
    assert_equal "default", queue
  end

  it "successfully pushes job to the queue" do
    assert_equal 0, $jobqueue.count("default")
    _job_id, queue = NoOpJob.perform_async()
    assert_equal "default", queue
    assert_equal 1, $jobqueue.count("default")
  end

  it "successfully runs the job" do
    assert_equal 0, $jobqueue.count("default")
    perform_enqueued_jobs do
      _job_id, queue = OpJob.perform_async()
      assert_equal "default", queue
      assert_equal 1, $jobqueue.count("default")
    end
    assert_equal 1, Performance.performances
    assert_equal 0, $jobqueue.count("default")
  end

  # RetryJob raises on its first run only, so it is re-queued once and then succeeds.
  it "retries a job that fails" do
    assert_equal 0, $jobqueue.count("default")
    _job_id, queue = RetryJob.perform_async()
    assert_equal "default", queue
    assert_equal 1, $jobqueue.count("default")
    perform_enqueued_job
    assert_equal 1, Performance.performances
    assert_equal 1, $jobqueue.count("default")
    perform_enqueued_job
    assert_equal 1, Performance.performances
    assert_equal 0, $jobqueue.count("default")
  end

  # With attempts: 1 the job runs twice (first run plus one retry) before it
  # ends up in the `_dead` queue.
  it "stops retrying a job after max retries" do
    assert_equal 0, $jobqueue.count("default")
    AlwaysFailJob.litejob_options(attempts: 1)
    _job_id, queue = AlwaysFailJob.perform_async()
    assert_equal "default", queue
    assert_equal 1, $jobqueue.count("default")
    perform_enqueued_job
    assert_equal 1, Performance.performances
    assert_equal 1, $jobqueue.count("default")
    perform_enqueued_job
    assert_equal 2, Performance.performances
    assert_equal 0, $jobqueue.count("default")
    assert_equal 1, $jobqueue.count("_dead")
  end
end
# Same scenarios as .perform_async, scheduling each job at an absolute time
# a fraction of a second past the current whole second.
describe ".perform_at" do
  it "returns job_id and queue" do
    job_id, queue = NoOpJob.perform_at(Time.now.to_i + 0.1)
    assert job_id
    assert_equal "default", queue
  end

  it "successfully pushes job to the queue" do
    assert_equal 0, $jobqueue.count("default")
    _job_id, queue = NoOpJob.perform_at(Time.now.to_i + 0.1)
    assert_equal "default", queue
    assert_equal 1, $jobqueue.count("default")
  end

  it "successfully runs the job" do
    assert_equal 0, $jobqueue.count("default")
    perform_enqueued_jobs do
      _job_id, queue = OpJob.perform_at(Time.now.to_i + 0.1)
      assert_equal "default", queue
      assert_equal 1, $jobqueue.count("default")
    end
    assert_equal 1, Performance.performances
    assert_equal 0, $jobqueue.count("default")
  end

  # RetryJob raises on its first run only, so it is re-queued once and then succeeds.
  it "retries a job that fails" do
    assert_equal 0, $jobqueue.count("default")
    _job_id, queue = RetryJob.perform_at(Time.now.to_i + 0.1)
    assert_equal "default", queue
    assert_equal 1, $jobqueue.count("default")
    perform_enqueued_job
    assert_equal 1, Performance.performances
    assert_equal 1, $jobqueue.count("default")
    perform_enqueued_job
    assert_equal 1, Performance.performances
    assert_equal 0, $jobqueue.count("default")
  end

  # With attempts: 1 the job runs twice (first run plus one retry) before it
  # ends up in the `_dead` queue.
  it "stops retrying a job after max retries" do
    assert_equal 0, $jobqueue.count("default")
    AlwaysFailJob.litejob_options(attempts: 1)
    _job_id, queue = AlwaysFailJob.perform_at(Time.now.to_i + 0.1)
    assert_equal "default", queue
    assert_equal 1, $jobqueue.count("default")
    perform_enqueued_job
    assert_equal 1, Performance.performances
    assert_equal 1, $jobqueue.count("default")
    perform_enqueued_job
    assert_equal 2, Performance.performances
    assert_equal 0, $jobqueue.count("default")
    assert_equal 1, $jobqueue.count("_dead")
  end
end
# Same scenarios as .perform_async, scheduling each job a relative number of
# seconds from now.
describe ".perform_in" do
  it "returns job_id and queue" do
    job_id, queue = NoOpJob.perform_in(1)
    assert job_id
    assert_equal "default", queue
  end

  it "successfully pushes job to the queue" do
    assert_equal 0, $jobqueue.count("default")
    _job_id, queue = NoOpJob.perform_in(1)
    assert_equal "default", queue
    assert_equal 1, $jobqueue.count("default")
  end

  it "successfully runs the job" do
    assert_equal 0, $jobqueue.count("default")
    perform_enqueued_jobs do
      _job_id, queue = OpJob.perform_in(0.01)
      assert_equal "default", queue
      assert_equal 1, $jobqueue.count("default")
    end
    assert_equal 1, Performance.performances
    assert_equal 0, $jobqueue.count("default")
  end

  # RetryJob raises on its first run only, so it is re-queued once and then succeeds.
  it "retries a job that fails" do
    assert_equal 0, $jobqueue.count("default")
    _job_id, queue = RetryJob.perform_in(0.01)
    assert_equal "default", queue
    assert_equal 1, $jobqueue.count("default")
    perform_enqueued_job
    assert_equal 1, Performance.performances
    assert_equal 1, $jobqueue.count("default")
    perform_enqueued_job
    assert_equal 1, Performance.performances
    assert_equal 0, $jobqueue.count("default")
  end

  # With attempts: 1 the job runs twice (first run plus one retry) before it
  # ends up in the `_dead` queue.
  it "stops retrying a job after max retries" do
    assert_equal 0, $jobqueue.count("default")
    AlwaysFailJob.litejob_options(attempts: 1)
    _job_id, queue = AlwaysFailJob.perform_in(0.01)
    assert_equal "default", queue
    assert_equal 1, $jobqueue.count("default")
    perform_enqueued_job
    assert_equal 1, Performance.performances
    assert_equal 1, $jobqueue.count("default")
    perform_enqueued_job
    assert_equal 2, Performance.performances
    assert_equal 0, $jobqueue.count("default")
    assert_equal 1, $jobqueue.count("_dead")
  end
end
# .perform_after is an alias of .perform_in; the scenarios mirror that group.
describe ".perform_after" do
  it "returns job_id and queue" do
    job_id, queue = NoOpJob.perform_after(1)
    assert job_id
    assert_equal "default", queue
  end

  it "successfully pushes job to the queue" do
    assert_equal 0, $jobqueue.count("default")
    _job_id, queue = NoOpJob.perform_after(1)
    assert_equal "default", queue
    assert_equal 1, $jobqueue.count("default")
  end

  it "successfully runs the job" do
    assert_equal 0, $jobqueue.count("default")
    perform_enqueued_jobs do
      _job_id, queue = OpJob.perform_after(0.01)
      assert_equal "default", queue
      assert_equal 1, $jobqueue.count("default")
    end
    assert_equal 1, Performance.performances
    assert_equal 0, $jobqueue.count("default")
  end

  # RetryJob raises on its first run only, so it is re-queued once and then succeeds.
  it "retries a job that fails" do
    assert_equal 0, $jobqueue.count("default")
    _job_id, queue = RetryJob.perform_after(0.01)
    assert_equal "default", queue
    assert_equal 1, $jobqueue.count("default")
    perform_enqueued_job
    assert_equal 1, Performance.performances
    assert_equal 1, $jobqueue.count("default")
    perform_enqueued_job
    assert_equal 1, Performance.performances
    assert_equal 0, $jobqueue.count("default")
  end

  # With attempts: 1 the job runs twice (first run plus one retry) before it
  # ends up in the `_dead` queue.
  it "stops retrying a job after max retries" do
    assert_equal 0, $jobqueue.count("default")
    AlwaysFailJob.litejob_options(attempts: 1)
    _job_id, queue = AlwaysFailJob.perform_after(0.01)
    assert_equal "default", queue
    assert_equal 1, $jobqueue.count("default")
    perform_enqueued_job
    assert_equal 1, Performance.performances
    assert_equal 1, $jobqueue.count("default")
    perform_enqueued_job
    assert_equal 2, Performance.performances
    assert_equal 0, $jobqueue.count("default")
    assert_equal 1, $jobqueue.count("_dead")
  end
end
# Deleting by id removes the job from the queue and returns its
# deserialized payload.
describe ".delete" do
  it "removes the job" do
    assert_equal 0, $jobqueue.count("default")
    job_id, queue = NoOpJob.perform_async()
    assert job_id
    assert_equal "default", queue
    assert_equal 1, $jobqueue.count("default")
    NoOpJob.delete(job_id)
    assert_equal 0, $jobqueue.count("default")
  end

  it "returns the job hash" do
    assert_equal 0, $jobqueue.count("default")
    job_id, queue = NoOpJob.perform_async()
    assert job_id
    assert_equal "default", queue
    assert_equal 1, $jobqueue.count("default")
    result = NoOpJob.delete(job_id)
    # keys come back as strings because the payload round-trips through JSON
    assert_equal({"class"=>"NoOpJob", "params"=>[], "attempts"=>5, "queue"=>"default"}, result)
  end
end
# A queue name set with .queue_as applies to every enqueueing method.
# The `after` hook resets NoOpJob's queue name between examples.
describe ".queue_as" do
  it "sets the queue name for .perform_async" do
    NoOpJob.queue_as('test')
    job_id, queue = NoOpJob.perform_async
    assert job_id
    assert_equal "test", queue
  end

  it "sets the queue name for .perform_at" do
    NoOpJob.queue_as('test')
    job_id, queue = NoOpJob.perform_at(Time.now.to_i + 0.1)
    assert job_id
    assert_equal "test", queue
  end

  it "sets the queue name for .perform_in" do
    NoOpJob.queue_as('test')
    job_id, queue = NoOpJob.perform_in(0.1)
    assert job_id
    assert_equal "test", queue
  end

  it "sets the queue name for .perform_after" do
    NoOpJob.queue_as('test')
    job_id, queue = NoOpJob.perform_after(0.1)
    assert job_id
    assert_equal "test", queue
  end
end
# Error paths: failures while pushing to the queue and while processing a job.
describe "exceptions" do
  describe "when trying to push to the Litequeue" do
    # Litequeue#push is replaced per example, so keep the original and restore it afterwards.
    before do
      @original_push = Litequeue.instance_method(:push)
    end

    after do
      Litequeue.define_method(@original_push.name, @original_push)
    end

    it "immediately raises non-retryable exception" do
      Litequeue.define_method(:push) do |value, delay=nil, queue=nil|
        Performance.performed!
        raise StandardError
      end
      assert_raises(StandardError) { NoOpJob.perform_async }
      # push was attempted exactly once
      assert_equal 1, Performance.performances
    end

    it "retries once retryable exception" do
      Litequeue.define_method(:push) do |value, delay=nil, queue=nil|
        Performance.performed!
        raise SQLite3::BusyException
      end
      assert_raises(SQLite3::BusyException) { NoOpJob.perform_async }
      # first attempt plus a single retry
      assert_equal 2, Performance.performances
    end
  end

  describe "when processing a job" do
    it "immediately raises when the job class is undefined" do
      payload = ['ID', JSON.dump({class: "NonExistentClass", params: [], attempts: 5, queue: "default"})]
      processor = Litekiq::Processor.new(payload)
      assert_raises(NameError, 'uninitialized constant NonExistentClass') { processor.process! }
    end
  end
end
end | |
# Specs for Litequeue's class-level configuration.
describe Litequeue do
  # Drop the memoized configuration so every example starts from the defaults.
  after do
    Litequeue.instance_variable_set :@configuration, nil
  end

  describe ".configuration" do
    it "has default path as queue database" do
      assert_equal "./queue.sqlite3", Litequeue.configuration.path
    end

    it "has default synchronous off" do
      assert_equal :OFF, Litequeue.configuration.synchronous
    end

    it "has default mmap_size as 32 kilobytes" do
      assert_equal 32768, Litequeue.configuration.mmap_size
    end
  end

  describe ".configure" do
    it "can set path" do
      Litequeue.configure do |config|
        config.path = 'path/to/db.sqlite3'
      end
      assert_equal 'path/to/db.sqlite3', Litequeue.configuration.path
      assert_equal :OFF, Litequeue.configuration.synchronous
      assert_equal 32768, Litequeue.configuration.mmap_size
    end

    it "can set synchronous" do
      Litequeue.configure do |config|
        config.synchronous = :NORMAL
      end
      assert_equal :NORMAL, Litequeue.configuration.synchronous
      assert_equal "./queue.sqlite3", Litequeue.configuration.path
      assert_equal 32768, Litequeue.configuration.mmap_size
    end

    # previously a second "can set synchronous", although it sets mmap_size
    it "can set mmap_size" do
      Litequeue.configure do |config|
        config.mmap_size = 0
      end
      assert_equal 0, Litequeue.configuration.mmap_size
      assert_equal "./queue.sqlite3", Litequeue.configuration.path
      assert_equal :OFF, Litequeue.configuration.synchronous
    end

    it "can set all 3" do
      Litequeue.configure do |config|
        config.path = 'path/to/db.sqlite3'
        config.synchronous = :NORMAL
        config.mmap_size = 0
      end
      assert_equal 'path/to/db.sqlite3', Litequeue.configuration.path
      assert_equal :NORMAL, Litequeue.configuration.synchronous
      assert_equal 0, Litequeue.configuration.mmap_size
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment