Skip to content

Instantly share code, notes, and snippets.

@jturkel
Last active December 20, 2015 06:59
Show Gist options
  • Save jturkel/6089698 to your computer and use it in GitHub Desktop.
Save jturkel/6089698 to your computer and use it in GitHub Desktop.
# Adds job-group support to the delayed_jobs table and creates the
# delayed_job_groups table that backs Delayed::JobGroup.
class CreateDelayedJobGroups < ActiveRecord::Migration
  def up
    add_column(:delayed_jobs, :blocked, :boolean, default: false, null: false)
    add_column(:delayed_jobs, :job_group_id, :integer)
    add_index(:delayed_jobs, :job_group_id)

    # Rebuild the priority index as a partial index that also skips blocked jobs.
    remove_index(:delayed_jobs, name: :delayed_jobs_priority)
    execute <<-SQL
      CREATE INDEX delayed_jobs_priority
      ON delayed_jobs(priority, run_at)
      WHERE failed_at IS NULL AND blocked = FALSE
    SQL

    create_table(:delayed_job_groups) do |t|
      t.text :on_completion_job
      t.text :on_completion_job_options
      # The model validates this column with inclusion: [true, false] and never
      # assigns it on create, so a nil default would make every new group invalid.
      t.boolean :queueing_complete, default: false, null: false
      t.boolean :blocked, default: false, null: false
    end
  end

  def down
    # Drop the partial index explicitly before the `blocked` column it references
    # is removed, instead of relying on the database to cascade the drop; this
    # also guarantees the CREATE INDEX below cannot collide with a leftover index.
    remove_index(:delayed_jobs, name: :delayed_jobs_priority)
    remove_columns(:delayed_jobs, :blocked, :job_group_id)

    # Restore the priority index without the `blocked` predicate.
    execute <<-SQL
      CREATE INDEX delayed_jobs_priority
      ON delayed_jobs(priority, run_at)
      WHERE failed_at IS NULL
    SQL

    drop_table(:delayed_job_groups)
  end
end
require 'delayed_job_job_groups_plugin'

# Register the job groups plugin with the delayed_job worker's plugin list.
Delayed::Worker.plugins.push(Delayed::Plugins::JobGroupsPlugin)
module Delayed
  # A group of delayed jobs tracked as a unit.
  #
  # Jobs are added with #enqueue. Once #mark_queueing_complete has been called
  # and no un-failed jobs remain for the group (and the group is not blocked),
  # the optional on_completion_job is enqueued and the group is destroyed.
  # Destroying (or #cancel-ing) the group deletes its queued jobs that are not
  # currently locked by a worker.
  class JobGroup < ActiveRecord::Base
    self.table_name = 'delayed_job_groups'

    serialize :on_completion_job
    serialize :on_completion_job_options, Hash

    attr_accessible :on_completion_job, :on_completion_job_options, :blocked

    # queueing_complete is not mass-assignable and is never set on create, so
    # give it an explicit false before the inclusion validation runs; otherwise
    # a nil value (no database default) makes every new group invalid.
    before_validation :default_queueing_complete, on: :create
    validates :queueing_complete, :blocked, inclusion: [true, false]

    alias_method :cancel, :destroy

    # class_name is given as a string so the association does not need the Job
    # constant to be loaded when this class body is evaluated.
    has_many :active_jobs, class_name: '::Delayed::Job', conditions: 'failed_at IS NULL'

    # Only delete dependent jobs that are unlocked so we can determine if there are inflight jobs
    # for canceled job groups
    has_many :queued_jobs, class_name: '::Delayed::Job',
                           conditions: 'failed_at IS NULL AND locked_by IS NULL',
                           dependent: :delete_all

    # Records that no more jobs will be added to this group and completes the
    # group immediately if it is already ready for completion.
    #
    # Raises RuntimeError if queueing was already marked complete.
    def mark_queueing_complete
      with_lock do
        raise 'JobGroup has already completed queueing' if queueing_complete?
        update_column(:queueing_complete, true)
        complete if ready_for_completion?
      end
    end

    # Enqueues +job+ as a member of this group.
    #
    # Takes the same arguments as Delayed::Job.enqueue; :job_group_id and
    # :blocked in +options+ are overridden with this group's id and blocked state.
    # The caller's hash is not mutated (merge returns a copy).
    def enqueue(job, options = {})
      options = options.merge(job_group_id: id)
      options[:blocked] = blocked?
      Delayed::Job.enqueue(job, options)
    end

    # Clears the blocked flag on the group and on all of its un-failed jobs,
    # then completes the group if it is ready. No-op when not blocked.
    def unblock
      return unless blocked?

      with_lock do
        update_column(:blocked, false)
        active_jobs.update_all(blocked: false)
        complete if ready_for_completion?
      end
    end

    # Completes the job group identified by +job_group_id+ if it exists and is
    # ready for completion.
    def self.check_for_completion(job_group_id)
      # Optimization to avoid loading and locking the JobGroup when the group
      # still has pending jobs
      return if has_pending_jobs?(job_group_id)

      Delayed::JobGroup.transaction do
        # The first completed job to notice the job group's queue count has dropped to
        # zero will queue the job group's completion job and destroy the job group so
        # other jobs need to handle the job group having been destroyed already.
        job_group = Delayed::JobGroup.where(id: job_group_id).lock(true).first
        job_group.send(:complete) if job_group && job_group.send(:ready_for_completion?)
      end
    end

    # True when at least one job with failed_at NULL still references the group.
    #
    # Deliberately defined above `private`: that keyword does not apply to
    # `def self.` methods, and this one is called with an explicit receiver
    # from #ready_for_completion?, so it has to stay public.
    def self.has_pending_jobs?(job_group_id)
      Delayed::Job.where(job_group_id: job_group_id, failed_at: nil).exists?
    end

    private

    # Defaults queueing_complete to false on new records. Returns nil rather
    # than the assigned `false`, because a callback returning false halts the
    # callback chain on older Rails versions.
    def default_queueing_complete
      self.queueing_complete = false if queueing_complete.nil?
      nil
    end

    # The in-memory flags are checked first so the pending-jobs query only runs
    # when it can affect the answer.
    def ready_for_completion?
      queueing_complete? && !blocked? && !JobGroup.has_pending_jobs?(id)
    end

    # Enqueues the completion job (if one was configured) and destroys the group.
    def complete
      Delayed::Job.enqueue(on_completion_job, on_completion_job_options || {}) if on_completion_job
      destroy
    end
  end
end
# Example: a blocked job group.
# Jobs enqueued through a blocked group are stored with blocked: true, and the
# patched ready_to_run scope excludes blocked jobs until the group is unblocked.

# Construct the JobGroup in a blocked state
job_group = Delayed::JobGroup.create!(blocked: true)
job_group.enqueue(MyJob.new("some arg"), queue: 'general')
job_group.enqueue(MyJob.new("some other arg"), queue: 'general', priority: 10)
# Tell the JobGroup we're done queueing jobs
job_group.mark_queueing_complete
# Do more stuff...
# Unblock the JobGroup so its jobs can run
job_group.unblock
# Example: a job group with an on-completion job.
# We can also pass options that will be used when queueing the on completion job
job_group = Delayed::JobGroup.create!(on_completion_job: MyCompletionJob.new,
                                      on_completion_job_options: { queue: 'general' })
job_group.enqueue(MyJob.new("some arg"), queue: 'general')
job_group.enqueue(MyJob.new("some other arg"), queue: 'general', priority: 10)
# The completion job is only queued once queueing has been marked complete
# and the group has no pending jobs left, so this call is required.
job_group.mark_queueing_complete
# Example: basic usage, ending with cancellation of the group.
job_group = Delayed::JobGroup.create!
# Delayed::JobGroup#enqueue has the same signature as Delayed::Job.enqueue
# i.e. it takes a job and an optional hash of options.
job_group.enqueue(MyJob.new("some arg"), queue: 'general')
job_group.enqueue(MyJob.new("some other arg"), queue: 'general', priority: 10)
# Tell the JobGroup we're done queueing jobs
job_group.mark_queueing_complete
# Do more stuff...
# Cancel the group: #cancel is an alias for #destroy, which also deletes the
# group's queued jobs that are not currently locked by a worker.
job_group.cancel
module Delayed
  module Plugins
    # Worker plugin that ties job lifecycle events to job groups: jobs of a
    # cancelled group are not retried, a failed job cancels its group, and a
    # successfully performed job triggers a completion check for its group.
    class JobGroupsPlugin < BasePlugin
      # Helper predicates used by the lifecycle callbacks below. They are
      # public class methods (a bare `private` would not apply to them).
      class << self
        # A group that no longer exists is treated as cancelled.
        def job_group_cancelled?(job_group_id)
          !Delayed::JobGroup.exists?(job_group_id)
        end

        # Delayed job will already have marked the job for destruction
        # if it has completed
        def job_completed?(job)
          job.destroyed?
        end
      end

      callbacks do |lifecycle|
        lifecycle.before(:error) do |_worker, job|
          # If the job group has been cancelled then don't let the job be retried
          next unless job.in_job_group? && job_group_cancelled?(job.job_group_id)

          # Override max_attempts on this one job instance only.
          job.define_singleton_method(:max_attempts) { 1 }
        end

        lifecycle.before(:failure) do |_worker, job|
          # If a job in the job group fails, then cancel the whole job group.
          # Need to check that the job group is present since another
          # job may have concurrently cancelled it.
          group = job.in_job_group? ? job.job_group : nil
          group.cancel if group
        end

        lifecycle.after(:perform) do |_worker, job|
          # Make sure we only check to see if the job group is complete
          # if the job succeeded
          next unless job.in_job_group? && job_completed?(job)

          Delayed::JobGroup.check_for_completion(job.job_group_id)
        end
      end
    end
  end
end
# Extends the ActiveRecord-backed delayed job with job group membership and
# makes the worker's ready_to_run query ignore blocked jobs.
Delayed::Backend::ActiveRecord::Job.class_eval do
  belongs_to :job_group
  attr_accessible :job_group_id, :blocked

  # True when this job has a job_group_id set.
  def in_job_group?
    !job_group_id.blank?
  end

  class << self
    # Wraps the original ready_to_run scope and additionally excludes
    # jobs whose blocked flag is set.
    def ready_to_run_with_blocked_filtering(worker_name, max_run_time)
      runnable = ready_to_run_without_blocked_filtering(worker_name, max_run_time)
      runnable.where(blocked: false)
    end

    alias_method_chain :ready_to_run, :blocked_filtering
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment