Last active
December 20, 2015 06:59
-
-
Save jturkel/6089698 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Adds job-group support to the delayed_jobs table and creates the
# delayed_job_groups table that backs Delayed::JobGroup.
class CreateDelayedJobGroups < ActiveRecord::Migration
  # Adds the `blocked` flag and `job_group_id` reference to delayed_jobs,
  # rebuilds the partial priority index so it skips blocked jobs, and creates
  # the delayed_job_groups table.
  def up
    add_column(:delayed_jobs, :blocked, :boolean, default: false, null: false)
    add_column(:delayed_jobs, :job_group_id, :integer)
    add_index(:delayed_jobs, :job_group_id)

    # Replace the existing priority index with one that also excludes blocked jobs.
    remove_index(:delayed_jobs, name: :delayed_jobs_priority)
    execute <<-SQL
      CREATE INDEX delayed_jobs_priority
      ON delayed_jobs(priority, run_at)
      WHERE failed_at IS NULL AND blocked = FALSE
    SQL

    create_table(:delayed_job_groups) do |t|
      t.text :on_completion_job
      t.text :on_completion_job_options
      # The model validates that this flag is true or false, so it needs a
      # non-NULL default; otherwise a bare JobGroup.create! fails validation.
      t.boolean :queueing_complete, default: false, null: false
      t.boolean :blocked, default: false, null: false
    end
  end

  # Reverts #up: restores the priority index without the `blocked` predicate,
  # removes the added columns, and drops the delayed_job_groups table.
  def down
    # Drop the partial index explicitly before removing `blocked`, which its
    # predicate references, so the CREATE INDEX below never collides with a
    # leftover index of the same name.
    remove_index(:delayed_jobs, name: :delayed_jobs_priority)
    remove_columns(:delayed_jobs, :blocked, :job_group_id)
    execute <<-SQL
      CREATE INDEX delayed_jobs_priority
      ON delayed_jobs(priority, run_at)
      WHERE failed_at IS NULL
    SQL
    drop_table(:delayed_job_groups)
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Load the job groups plugin, then register it with the delayed_job worker so
# its lifecycle callbacks are installed.
require 'delayed_job_job_groups_plugin'

Delayed::Worker.plugins << Delayed::Plugins::JobGroupsPlugin
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Delayed
  # A set of delayed jobs that is tracked together. A group can be created in a
  # blocked state, cancelled (destroyed), and can enqueue an optional
  # "on completion" job once queueing has been marked complete and no unfailed
  # jobs remain in the group.
  class JobGroup < ActiveRecord::Base
    self.table_name = 'delayed_job_groups'

    serialize :on_completion_job
    serialize :on_completion_job_options, Hash

    attr_accessible :on_completion_job, :on_completion_job_options, :blocked

    validates :queueing_complete, :blocked, inclusion: [true, false]

    # Cancelling a group is simply destroying it.
    alias_method :cancel, :destroy

    has_many :active_jobs, class_name: Job, conditions: 'failed_at IS NULL'

    # Only unlocked jobs are deleted along with the group, so that jobs still
    # in flight for a cancelled group remain detectable.
    has_many :queued_jobs, class_name: Job, conditions: 'failed_at IS NULL AND locked_by IS NULL',
                           dependent: :delete_all

    # Records that no more jobs will be added to this group and completes the
    # group right away if nothing is left to run.
    #
    # Raises RuntimeError if queueing was already marked complete.
    def mark_queueing_complete
      with_lock do
        if queueing_complete?
          raise 'JobGroup has already completed queueing'
        end

        update_column(:queueing_complete, true)
        complete if ready_for_completion?
      end
    end

    # Enqueues +job+ as a member of this group. Takes the same arguments as
    # Delayed::Job.enqueue; the group's id and current blocked state are merged
    # into a copy of +options+.
    def enqueue(job, options = {})
      group_options = options.merge(job_group_id: id, blocked: blocked?)
      Delayed::Job.enqueue(job, group_options)
    end

    # Unblocks the group and all of its unfailed jobs, then completes the group
    # if it is ready. Does nothing when the group is not blocked.
    def unblock
      if blocked?
        with_lock do
          update_column(:blocked, false)
          active_jobs.update_all(blocked: false)
          complete if ready_for_completion?
        end
      end
    end

    # Completes the group identified by +job_group_id+ if it has no pending
    # jobs left and is otherwise ready for completion.
    def self.check_for_completion(job_group_id)
      # Cheap pre-check: skip loading and locking the group while it still has
      # pending jobs.
      unless has_pending_jobs?(job_group_id)
        Delayed::JobGroup.transaction do
          # Whichever finished job first sees the group drained enqueues the
          # completion job and destroys the group, so later callers may find
          # that the row is already gone.
          locked_group = Delayed::JobGroup.where(id: job_group_id).lock(true).first
          next if locked_group.nil?

          locked_group.send(:complete) if locked_group.send(:ready_for_completion?)
        end
      end
    end

    # True while the group still has at least one job that has not failed.
    # Defined above `private` because that keyword does not affect
    # `def self.` methods, and this one is called with an explicit receiver.
    def self.has_pending_jobs?(job_group_id)
      Delayed::Job.where(job_group_id: job_group_id, failed_at: nil).exists?
    end

    private

    # A group is ready once queueing is complete, no pending jobs remain, and
    # it is not blocked. The pending-jobs query only runs when queueing is
    # already complete.
    def ready_for_completion?
      return false unless queueing_complete?
      return false if JobGroup.has_pending_jobs?(id)

      !blocked?
    end

    # Enqueues the on-completion job, if one was configured, and destroys the
    # group.
    def complete
      if on_completion_job
        Delayed::Job.enqueue(on_completion_job, on_completion_job_options || {})
      end
      destroy
    end
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Create the JobGroup already blocked; jobs enqueued through it inherit the
# blocked flag.
job_group = Delayed::JobGroup.create!(blocked: true)

job_group.enqueue(MyJob.new("some arg"), queue: 'general')
job_group.enqueue(MyJob.new("some other arg"), queue: 'general', priority: 10)
job_group.mark_queueing_complete

# Do more stuff...

# Release the JobGroup so that its jobs become eligible to run
job_group.unblock
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# We can also pass options that will be used when queueing the on completion job
job_group = Delayed::JobGroup.create!(on_completion_job: MyCompletionJob.new,
                                      on_completion_job_options: { queue: 'general' })
job_group.enqueue(MyJob.new("some arg"), queue: 'general')
job_group.enqueue(MyJob.new("some other arg"), queue: 'general', priority: 10)
# The on completion job is only enqueued once queueing has been marked
# complete and no pending jobs remain in the group.
job_group.mark_queueing_complete
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
job_group = Delayed::JobGroup.create!

# Delayed::JobGroup#enqueue mirrors Delayed::Job.enqueue: it accepts a job
# plus an optional hash of options.
job_group.enqueue(MyJob.new("some arg"), queue: 'general')
job_group.enqueue(MyJob.new("some other arg"), queue: 'general', priority: 10)

# Signal that no further jobs will be queued into the JobGroup
job_group.mark_queueing_complete

# Do more stuff...

# Cancelling destroys the JobGroup along with its queued, unlocked jobs
job_group.cancel
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Delayed
  module Plugins
    # Worker plugin that ties job lifecycle events to Delayed::JobGroup:
    # jobs of a cancelled group are not retried, a failed job cancels its
    # group, and a successfully performed job triggers a completion check.
    class JobGroupsPlugin < BasePlugin
      callbacks do |lifecycle|
        lifecycle.before(:error) do |_worker, job|
          next unless job.in_job_group?
          next unless job_group_cancelled?(job.job_group_id)

          # The group was cancelled: cap this job at a single attempt so it
          # is not retried.
          job.define_singleton_method(:max_attempts) { 1 }
        end

        lifecycle.before(:failure) do |_worker, job|
          next unless job.in_job_group?

          # One failed job cancels the whole group. The group may already be
          # gone if another job cancelled it concurrently, hence the nil check.
          failed_group = job.job_group
          failed_group.cancel if failed_group
        end

        lifecycle.after(:perform) do |_worker, job|
          # Only a job that actually succeeded should trigger the check.
          next unless job.in_job_group? && job_completed?(job)

          Delayed::JobGroup.check_for_completion(job.job_group_id)
        end
      end

      class << self
        # A group whose row no longer exists is treated as cancelled.
        def job_group_cancelled?(job_group_id)
          !Delayed::JobGroup.exists?(job_group_id)
        end

        # Delayed job will already have marked the job for destruction if it
        # has completed.
        def job_completed?(job)
          job.destroyed?
        end
      end
    end
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Extends the ActiveRecord-backed delayed job with job group support: the
# group association, a membership predicate, and a ready_to_run scope that
# skips blocked jobs.
Delayed::Backend::ActiveRecord::Job.class_eval do
  attr_accessible :job_group_id, :blocked

  belongs_to :job_group

  # True when this job has been assigned to a job group.
  def in_job_group?
    !job_group_id.blank?
  end

  # Wraps the original ready_to_run so that blocked jobs are never returned.
  def self.ready_to_run_with_blocked_filtering(worker_name, max_run_time)
    runnable = ready_to_run_without_blocked_filtering(worker_name, max_run_time)
    runnable.where(blocked: false)
  end

  class << self
    # Route ready_to_run through the blocked-filtering wrapper above.
    alias_method_chain :ready_to_run, :blocked_filtering
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment