Created
October 24, 2012 02:10
-
-
Save cheald/3943316 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'digest/sha1'
require 'rufus/scheduler'
# Cron-style scheduler that several processes can boot at once while each job
# still runs only once per firing.
#
# Every process registers the same jobs with its own Rufus scheduler. When a
# job fires, the process tries to take a Redis lock (SETNX) whose key is built
# from the job's timing and name; only the process that obtains the lock runs
# the job. The lock is given an expiry slightly shorter than the time until
# the next firing so that it is gone again when the job is next due.
#
# Besides rufus-scheduler this class relies on Array#extract_options! and
# String#constantize (presumably supplied by ActiveSupport -- confirm in the
# host application). String#yellow is used for log colouring when available.
class CronManager
  # Fraction of a job's period for which its lock is kept. Staying below 1.0
  # leaves headroom for clock drift between the participating machines.
  LOCK_TTL_RATIO = 0.95

  # Boots the process-wide manager (memoized) and evaluates +block+ against it.
  #
  # redis   - Redis client used for locking (must respond to setnx, expire, ttl).
  # options - Hash of settings (not mutated):
  #           :logger           - object responding to #debug (optional).
  #           :permit_test_mode - when truthy, also boot under Rails.env.test?.
  #           :worker           - :sidekiq or :resque; auto-detected from the
  #                               loaded constants when omitted (Sidekiq wins).
  # block   - job definitions, evaluated with the manager as self.
  #
  # Returns the CronManager, or nil when running in the Rails test environment
  # without :permit_test_mode. Later calls return the first instance and ignore
  # their arguments.
  def self.schedule(redis, options = {}, &block)
    log options[:logger], "Booting #{self}..."
    return if Object.const_defined?(:Rails) && Rails.env.test? && !options[:permit_test_mode]

    options = options.dup # never write defaults back into the caller's hash
    options[:worker] ||= :sidekiq if Object.const_defined?(:Sidekiq)
    options[:worker] ||= :resque if Object.const_defined?(:Resque)
    @singleton ||= new(redis, options, &block)
  end

  # Writes a "[CRON] message" debug line to +logger+; does nothing when
  # +logger+ is nil. The tag is coloured only if String#yellow is defined.
  def self.log(logger, message)
    return unless logger

    tag = "CRON"
    tag = tag.yellow if tag.respond_to?(:yellow)
    logger.debug format("[%s] %s", tag, message)
  end

  # Registers a job that repeats at a fixed interval.
  #
  # timing - duration string understood by Rufus.parse_duration_string.
  # args   - either a job name (when a block is given), or a worker class /
  #          class name followed by an Array of worker arguments; a trailing
  #          Hash is treated as options (:name plus scheduler options).
  # block  - the job body; when omitted the worker class is enqueued instead.
  #
  # Raises ArgumentError when the interval is not a positive number.
  def every(timing, *args, &block)
    options = args.extract_options!
    seconds = Rufus.parse_duration_string(timing)
    unless seconds.is_a?(Numeric) && seconds > 0
      raise ArgumentError, "Invalid interval for `every`: #{timing.inspect}"
    end

    # In order to make sure that jobs are executed at the same time regardless
    # of who runs them, the start time is quantized to the next time slice.
    # This more closely emulates cron-style behavior, and it assumes that the
    # system clocks are in sync.
    now = Time.now.to_i
    start_at = Time.at(now + (seconds - (now % seconds)))
    options = options.merge(:first_at => start_at, :period => seconds)
    schedule :every, timing, args, options, &block
  end

  # Registers a job driven by a cron expression. Arguments are as for #every,
  # with +timing+ being a cron line accepted by Rufus::CronLine.
  def cron(timing, *args, &block)
    options = args.extract_options!
    cron_line = Rufus::CronLine.new(timing)
    # The gap between cron firings is not constant (e.g. "0 9,17 * * *"), so
    # the lock period is measured at every firing as the time left until the
    # following one, rather than being fixed once when the job is registered.
    period = lambda do
      now = Time.now
      cron_line.next_time(now).to_i - now.to_i
    end
    schedule :cron, timing, args, options.merge(:period => period), &block
  end

  private

  # redis   - Redis client used for the per-job locks.
  # options - Hash; :logger is extracted, the rest is kept (e.g. :worker).
  # block   - optional job definitions, evaluated with the manager as self.
  def initialize(redis, options = {}, &block)
    options = options.dup # keep the caller's hash intact
    @logger = options.delete(:logger)
    @redis = redis
    @options = options
    @scheduler = Rufus::Scheduler.start_new
    instance_eval(&block) if block
  end

  # Registers one job with the Rufus scheduler, wrapped in the Redis lock.
  #
  # method  - :every or :cron, the scheduler method to invoke.
  # timing  - the duration string or cron line, also part of the lock key.
  # args    - remaining positional arguments from #every / #cron.
  # options - scheduler options plus the internal :name and :period entries,
  #           which are removed before the hash reaches the scheduler.
  def schedule(method, timing, args = [], options = {}, &block)
    options = options.dup
    if block
      options[:name] ||= args.first
    else
      klass, worker_args = *args
      # args still contains klass, so the class name appears twice in the
      # generated name. It is left that way because the name is part of the
      # lock key that every cooperating process must agree on.
      options[:name] ||= format("%s(%s)", klass, args.join(", "))
      block = worker_block(klass, worker_args)
    end

    name = options.delete(:name)
    name = source_digest(block) if name.nil?
    period = options.delete(:period)
    key = format("cron:%s-%s", timing, name)

    @scheduler.public_send(method, timing, options) do
      run_exclusively(key, name, period, block)
    end
  end

  # Runs +job+ only if this process wins the lock stored under +key+.
  def run_exclusively(key, name, period, job)
    ttl = lock_ttl(period)
    if @redis.setnx(key, Process.pid)
      log "#{Process.pid} running #{name}"
      @redis.expire key, ttl
      job.call
    elsif @redis.ttl(key) == -1
      # The lock exists but has no expiry: its owner presumably died between
      # SETNX and EXPIRE. Give it one so the job is not blocked forever.
      @redis.expire key, ttl
    end
  end

  # Lock lifetime in whole seconds (at least 1) for a period given either as
  # a number of seconds or as a callable returning one.
  def lock_ttl(period)
    seconds = period.respond_to?(:call) ? period.call : period
    [(seconds * LOCK_TTL_RATIO).ceil, 1].max
  end

  # Derives a job name from the block's source code; needs the sourcify gem.
  def source_digest(block)
    unless Object.const_defined?(:Sourcify)
      raise "No :name specified and sourcify is not available. Specify a name, or add sourcify to your bundle."
    end

    Digest::SHA1.hexdigest(block.to_source)
  end

  # Builds the callable that enqueues +klass+ on the configured worker backend.
  # The class is looked up when the job fires, not when it is registered.
  #
  # Raises ArgumentError when there is nothing to enqueue or no supported
  # backend, instead of failing later inside the scheduler.
  def worker_block(klass, worker_args)
    raise ArgumentError, "Either a block or a worker class is required" if klass.nil?

    resolve = lambda { klass.is_a?(Module) ? klass : klass.to_s.constantize }
    case @options[:worker]
    when :resque
      lambda { Resque.enqueue(resolve.call, *worker_args) }
    when :sidekiq
      lambda { resolve.call.perform_async(*worker_args) }
    else
      raise ArgumentError,
            "No block given and no supported :worker configured (got #{@options[:worker].inspect})"
    end
  end

  # Instance-level shortcut that logs through the manager's own logger.
  def log(message)
    self.class.log @logger, message
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment