Skip to content

Instantly share code, notes, and snippets.

@ahoward
Created July 16, 2012 17:13
Show Gist options
  • Save ahoward/3123839 to your computer and use it in GitHub Desktop.
Save ahoward/3123839 to your computer and use it in GitHub Desktop.
app/models/job.rb
#
# this module encapsulate the interface expected by DelayedJob, and Resque -
# if you use it your app will not need to change if you change background
# processing systems. it also centralizes and makes easy to test all
# potential background jobs independently of your preferred system.
#
# all jobs are stored in the db, but a queueing system, like resque, is used
# to run them in the background. this way the job class can support a rich
# query interface and persistence regardless of the background processing
# system used.
#
# examples:
#
# Job.submit(Mailer, :invitation, '[email protected]')
#
# Job.submit('p :this_is_evaled')
#
# Job.submit(CustomJob, :method_to_run, :arbitrary, :args => :to_pass)
#
# Job.submit(RespondsTo_perform, :arbitrary, :args => :to_pass)
#
# Job.pending.count
#
# Job.clear # you'll wanna run this using whenever or something...
#
class Job
##
#
include App::Document
field(:object, :type => String, :default => proc{ 'Scope' })
field(:message, :type => String, :default => proc{ 'eval' })
field(:args, :type => Array, :default => proc{ [] })
field(:status, :type => String, :default => 'pending')
field(:result)
field(:error, :type => Map)
field(:completed_at, :type => Time)
field(:attempts, :type => Integer, :default => proc{ 0 })
%w( pending running success failure ).each do |status|
scope(status, where(:status => status))
end
def run
inc(:attempts, 1)
update_attributes(:status => 'running')
object = Job.eval(read_attribute(:object))
result =
case
when object <= ActionMailer::Base
mailer = object
mail = mailer.send(:new, message, *args).message
mail.deliver
Array(mail.destinations)
else
object.send(message, *args)
end
ensure
if $!
e = $!
error = {
'message' => e.message,
'class' => e.class,
'backtrace' => Array(e.backtrace)
}
update_attributes(:status => 'failure', :error => Job.pod(error), :result => nil, :completed_at => nil)
else
update_attributes(:status => 'success', :completed_at => Time.now.utc, :result => Job.pod(result), :error => nil)
end
end
def Job.pod(value)
case value
when Hash, Array
MultiJson.load(MultiJson.dump(value))
else
{'_' => MultiJson.load(MultiJson.dump(value))}
end
rescue
{'_' => value.class.name}
end
def Job.submit(*args, &block)
job =
case args.size
when 0
raise ArgumentError
when 1
create!(:object => Scope, :message => :eval, :args => args)
when 2
object = object_for(args.shift)
message = 'perform'
create!(:object => object, :message => message, :args => args)
else
object = object_for(args.shift)
message = args.shift.to_s
create!(:object => object, :message => message, :args => args)
end
begin
enqueue(job)
rescue Object => e
job.destroy
raise SubmitError
end
job
end
def Job.object_for(value)
value.to_s
end
def Job.eval(code)
Scope.eval(code)
end
def Job.clear(max = 8192)
if Job.count > max
where(:status => 'success', :completed_at.lt => 1.minute.ago).
delete_all
end
if Job.count > max
where(:attempts.gt => 4).
delete_all
end
end
## resque support
#
@queue = 'jobs'
def Job.enqueue(job)
Resque.enqueue(Job, job.id.to_s)
end
def enqueue
Job.enqueue(job=self)
end
def Job.dequeue(job)
raise NotImplementedError
end
def dequeue
Job.dequeue(job=self)
end
def Job.perform(id)
Job.find(id).run
end
unless ::UUID.respond_to?(:generate)
class << ::UUID
def generate(*args) ::App.uuid end
end
end
## provides a clean scope for evaling code in
#
class Scope
alias_method :__binding__, :binding
public :__binding__
instance_methods.each{|m| undef_method m unless m =~ /__|object_id/}
def Scope.eval(string)
scope = new
::Kernel.eval(string.to_s, scope.__binding__)
end
end
## error classes
#
class SubmitError < ::StandardError
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment