Skip to content

Instantly share code, notes, and snippets.

@cheald
Created October 24, 2012 02:10
Show Gist options
  • Save cheald/3943316 to your computer and use it in GitHub Desktop.
Save cheald/3943316 to your computer and use it in GitHub Desktop.
require 'digest/sha1'
require 'rufus/scheduler'
# Schedules recurring jobs with rufus-scheduler and uses a Redis key per job
# as a lock, so that when several processes boot the same schedule only the
# process that acquires the lock runs a given firing.
#
# Jobs are declared inside the block handed to CronManager.schedule, either
# with an inline block:
#
#   every("5m", "cleanup") { Cleanup.run }
#
# or with a worker class name that is enqueued through the configured
# backend (options[:worker] is :sidekiq or :resque):
#
#   cron "0 3 * * *", "NightlyWorker", 1, 2
class CronManager
  # Fraction of a job's period during which its lock is held. Expiring the
  # lock a little early leaves room for clock drift between machines.
  LOCK_TTL_RATIO = 0.95

  # Boots the process-wide manager (memoized in @singleton) and evaluates
  # +block+ against it so the block can call #every and #cron.
  #
  # redis   - client responding to setnx, expire and ttl.
  # options - :logger (object responding to #debug), :worker (:sidekiq or
  #           :resque; auto-detected from loaded constants when omitted) and
  #           :permit_test_mode (allow booting under Rails' test env).
  #
  # Returns the manager, or nil when booting is skipped in Rails test mode.
  def self.schedule(redis, options = {}, &block)
    log options[:logger], "Booting #{self}..."
    if Object.const_defined?(:Rails) && Rails.env.test?
      return unless options[:permit_test_mode]
    end
    # Work on a copy so the caller's hash is left untouched.
    options = options.dup
    options[:worker] ||= :sidekiq if Object.const_defined?(:Sidekiq)
    options[:worker] ||= :resque if Object.const_defined?(:Resque)
    @singleton ||= new(redis, options, &block)
  end

  # Writes a "[CRON]"-tagged debug line to +logger+; a nil logger is a no-op.
  def self.log(logger, message)
    return unless logger

    tag = "CRON"
    # Colour the tag only when a colouring extension provides String#yellow.
    tag = tag.yellow if tag.respond_to?(:yellow)
    logger.debug "[%s] %s" % [tag, message]
  end

  # Schedules a job that fires every +timing+ (a rufus duration string).
  #
  # In order to make sure that jobs are executed at the same time regardless
  # of who runs them, the first run is quantized to the next multiple of the
  # interval. This more closely emulates cron-style behavior, and assumes
  # that system clocks are in sync.
  #
  # args is either [name] together with a block, or [worker_class_name,
  # *worker_args]; a trailing Hash is taken as scheduler options.
  #
  # Raises ArgumentError when +timing+ is not a positive duration.
  def every(timing, *args, &block)
    options = args.extract_options!
    seconds = Rufus.parse_duration_string(timing)
    unless seconds > 0
      raise ArgumentError, "interval must be a positive duration, got #{timing.inspect}"
    end

    now = Time.now.to_i
    start_at = Time.at(now + (seconds - (now % seconds)))
    options = options.merge(:first_at => start_at, :period => seconds)
    schedule :every, timing, args, options, &block
  end

  # Schedules a job on the cron expression +timing+. Arguments are the same
  # as for #every.
  def cron(timing, *args, &block)
    options = args.extract_options!
    cron_line = Rufus::CronLine.new(timing)
    # The gap to the next firing is measured each time the job fires rather
    # than once at boot: a boot-time measurement is just the delay until the
    # first run and says nothing about the spacing of later runs.
    period = lambda do
      now = Time.now
      cron_line.next_time(now).to_i - now.to_i
    end
    options = options.merge(:period => period)
    schedule :cron, timing, args, options, &block
  end

  private

  # Registers the job with the scheduler, wrapping it so that it only runs
  # when this process wins the Redis lock for the firing.
  #
  # method  - :every or :cron, sent to the rufus scheduler.
  # options - scheduler options plus :name and :period, which are removed
  #           before the remainder is handed to the scheduler.
  #
  # Raises ArgumentError for a class-based job without a usable worker
  # backend, and RuntimeError when no job name can be determined.
  def schedule(method, timing, args = [], options = {}, &block)
    if block
      options[:name] ||= args.first
    else
      klass, *rest = args
      options[:name] ||= "%s(%s)" % [klass, args.join(", ")]
      # A single extra argument is splatted as-is, so an Array still supplies
      # the whole argument list; several extra arguments are all forwarded.
      worker_args = rest.length == 1 ? rest.first : rest
      block = enqueue_job(klass, worker_args)
    end

    name = options.delete :name
    if name.nil?
      unless Object.const_defined?(:Sourcify)
        raise "No :name specified and sourcify is not available. Specify a name, or add sourcify to your bundle."
      end
      name = Digest::SHA1.hexdigest(block.to_source)
    end

    key = "cron:%s-%s" % [timing, name]
    period = options.delete :period
    @scheduler.send method, timing, options do
      if acquire_lock(key, lock_ttl(period))
        log "#{Process.pid} running #{name}"
        block.call
      end
    end
  end

  # Builds the callable that enqueues +klass+ on the configured backend.
  def enqueue_job(klass, worker_args)
    case @options[:worker]
    when :resque
      lambda { Resque.enqueue klass.constantize, *worker_args }
    when :sidekiq
      lambda { klass.constantize.perform_async(*worker_args) }
    else
      # Fail while scheduling instead of when the job first fires.
      raise ArgumentError,
            "Cannot schedule #{klass.inspect}: pass a block or set :worker to :sidekiq or :resque " \
            "(got #{@options[:worker].inspect})"
    end
  end

  # Lock lifetime in whole seconds (at least 1) for a job whose period is a
  # number of seconds or a callable returning one.
  def lock_ttl(period)
    seconds = period.respond_to?(:call) ? period.call : period
    [(seconds * LOCK_TTL_RATIO).ceil, 1].max
  end

  # Tries to take the lock at +key+ for +ttl+ seconds; returns true when this
  # process now holds it.
  def acquire_lock(key, ttl)
    if @redis.setnx(key, Process.pid)
      @redis.expire key, ttl
      true
    else
      # A holder that died between SETNX and EXPIRE leaves a lock that never
      # expires (TTL of -1); give it an expiry so the job can run again.
      @redis.expire key, ttl if @redis.ttl(key) == -1
      false
    end
  end

  # Logs through the logger supplied at construction time.
  def log(message)
    self.class.log @logger, message
  end

  # Starts the rufus scheduler and evaluates +block+ (when given) in the
  # context of the new instance.
  def initialize(redis, options = {}, &block)
    options = options.dup
    @logger = options.delete :logger
    @redis = redis
    @options = options
    @scheduler = Rufus::Scheduler.start_new
    instance_eval(&block) if block
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment