Skip to content

Instantly share code, notes, and snippets.

@sco
Created March 26, 2009 13:15
Show Gist options
  • Select an option

  • Save sco/86084 to your computer and use it in GitHub Desktop.

Select an option

Save sco/86084 to your computer and use it in GitHub Desktop.
class JobQueue
def initialize(name, options={})
@key = [:job_queue, Merb.env, name].join
@queue = SQS.queue(@key, true, 200)
@cache = options[:cache] || CACHE
@logger = options[:logger] || Merb.logger
end
def push(options={})
@queue.push(ActiveSupport::JSON.encode(options))
end
def pop(&block)
while message = @queue.receive
options = ActiveSupport::JSON.decode(message.body).symbolize_keys
@logger.info "Processing message: #{options}"
if yield(options)
message.delete
end
@cache.set(@key, recent_times.unshift([Time.now, @queue.size])[0, 50])
end
end
# The estimated number of seconds remaining to process all jobs.
def eta
return 0 if average_time == "NaN"
(average_time * @queue.size.to_f).round
end
private
def recent_times
@cache.get(@key)
rescue Memcached::NotFound
[]
rescue Memcached::UnknownReadFailure
[]
end
def average_time
times = recent_times
return 0 if times.empty?
seconds = times.first[0] - times.last[0]
messages = times.last[1] - times.first[1]
return 0 if messages == 0
seconds.to_f / messages.to_f
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment