maia/a00974bf70875fef6b13
Created December 5, 2015 11:43
Wait for completion of ActiveJob jobs of a given batch before enqueueing jobs of the next class, using Sidekiq.
##
#
# This module lets a job class complete a whole batch before jobs of another class are enqueued.
# When including it, call .perform_as_batch(ids) instead of enqueueing each job manually,
# accept batch_key as the second parameter of #perform and store it in @batch_key, and
# define #run_when_batch_completed with whatever should happen once the batch is completed.
require 'active_support/concern'
require 'securerandom'

module ChainedJob
  extend ActiveSupport::Concern

  included do
    after_perform :chain_if_completed
  end

  class_methods do
    def perform_as_batch(ids)
      # The random suffix keeps two batches started within the same second apart.
      batch_key = "#{name}/#{Time.now.to_i}-#{SecureRandom.hex(4)}"
      jobs = ids.map { |id| new(id, batch_key) }
      # Register every job before enqueueing any of them, so a fast worker cannot
      # empty the set (and trigger the chain) while jobs are still being added.
      jobs.each { |job| add_job_to_batch(job.job_id, batch_key) }
      jobs.each(&:enqueue)
    end

    def add_job_to_batch(job_id, batch_key)
      Sidekiq.redis do |conn|
        conn.sadd(batch_key, job_id)
        # The set expires 600 seconds after the last job was registered.
        conn.expire(batch_key, 600)
      end
    end
  end

  # Runs after every #perform: drop this job from the batch, then chain if it was the last one.
  def chain_if_completed
    remove_job_from_batch
    run_when_batch_completed if batch_completed?
  end

  def remove_job_from_batch
    Sidekiq.redis { |conn| conn.srem(@batch_key, job_id) }
  end

  def remaining_job_count
    Sidekiq.redis { |conn| conn.scard(@batch_key).to_i }
  end

  def batch_completed?
    remaining_job_count == 0
  end
end
class SomeJob < ActiveJob::Base
  include ChainedJob
  queue_as :default

  def self.do_something
    ids = SomeClassRelation.ids
    perform_as_batch(ids)
  end

  def perform(id, batch_key = nil)
    @batch_key = batch_key
    object = SomeClass.find(id)
    # do something
  end

  def run_when_batch_completed
    ChainedOtherJob.perform_later
  end
end
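
The module removes a job from the batch and then counts the remaining jobs in two separate Redis calls, so two jobs finishing at the same moment could both see an empty set and both trigger the chain. The following is a minimal in-memory sketch of the alternative, where removal and the emptiness check happen as a single step. `InMemoryBatch` and its methods are hypothetical names for illustration only; the `Set` and `Mutex` stand in for Redis and are not part of the gist. With Redis itself, a comparable effect might be achieved with a transaction or a Lua script, which is not shown here.

```ruby
require 'set'

# Hypothetical in-memory stand-in for the Redis set used by ChainedJob.
class InMemoryBatch
  def initialize(job_ids, &on_complete)
    @pending = Set.new(job_ids)
    @on_complete = on_complete
    @lock = Mutex.new
  end

  # Removes the job and checks for emptiness inside one critical section,
  # so only the job that actually empties the set runs the callback.
  # Returns true if this call completed the batch, false otherwise.
  def finish(job_id)
    last = @lock.synchronize do
      removed = !@pending.delete?(job_id).nil? # delete? returns nil for unknown ids
      removed && @pending.empty?
    end
    return false unless last

    @on_complete.call
    true
  end

  def remaining
    @lock.synchronize { @pending.size }
  end
end

log = []
batch = InMemoryBatch.new(%w[a b c]) { log << :chained }
results = %w[a b c].map { |id| batch.finish(id) }
repeat = batch.finish('a') # finishing an already removed job does not chain again
```

Because an unknown or already removed id never counts as the last job, the callback runs at most once per batch in this sketch, even if a job is retried after the batch has completed.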