Created
March 17, 2017 20:09
-
-
Save mbreit/7b84e8fd51b6d1889f4bc1f2612c3f9d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Simple ActiveJob worker for PostgreSQL using LISTEN/NOTIFY.
#
# Supports most ActiveJob features like multiple queues, priorities
# and wait times.
#
# To use this as your Rails job queue, add this to your environment
# configuration (config/environments/production.rb):
#
#     config.active_job.queue_adapter = PgJob::QueueAdapter.new
#
# Then run one or more workers for the default queue with
#
#     bin/rails runner PgJob.work
#
# or for other queues with
#
#     bin/rails runner "PgJob.work(:my_queue)"
#
class PgJob < ApplicationRecord
  # Allowed characters for queue names. The name is interpolated into
  # LISTEN/NOTIFY statements, so it must be a plain identifier fragment.
  QUEUE_NAME_FORMAT = /\A[a-zA-Z0-9_]+\z/

  # QueueAdapter for ActiveJob. Both methods delegate to PgJob.enqueue.
  class QueueAdapter
    # Enqueue a job to be run as soon as a worker picks it up.
    #
    # @param job [ActiveJob::Base] The ActiveJob job object to enqueue
    def enqueue(job)
      PgJob.enqueue(job)
    end

    # Enqueue a job to be run at or after the given time.
    #
    # @param job [ActiveJob::Base] The ActiveJob job object to enqueue
    # @param timestamp [Integer,Time] When to run the job; passed on to
    #   PgJob.enqueue, which converts it with `Time.zone.at`
    def enqueue_at(job, timestamp)
      PgJob.enqueue(job, timestamp)
    end
  end

  # Jobs without a schedule, or whose scheduled time has passed.
  scope :due, -> { where('scheduled_for IS NULL OR scheduled_for <= ?', Time.current) }

  # Jobs that have not been performed yet, lowest priority value first,
  # oldest first within the same priority.
  scope :queue, -> { where(performed_at: nil).order(:priority, :created_at) }

  validates :queue_name, format: { with: QUEUE_NAME_FORMAT }

  # Run a worker process for a given queue name.
  # Will run all scheduled jobs in the queue ordered by their
  # priorities (lowest first) and then wait for PostgreSQL LISTEN
  # events to run new jobs. For jobs that are scheduled for a later
  # time, it wakes up in an interval given by the timeout parameter
  # to check for jobs that became due in the meantime.
  #
  # This method loops forever; it only exits through an exception,
  # in which case the channel is unlistened again.
  #
  # @param queue_name [String,Symbol] The name of the queue to work on
  # @param timeout [Integer] Interval (passed to `wait_for_notify`) to
  #   check for due jobs
  # @raise [ArgumentError] if the queue name contains characters other
  #   than letters, digits and underscores
  def self.work(queue_name = 'default', timeout: 10)
    # The name ends up in raw SQL, so reject anything that a job could
    # never have been enqueued with anyway.
    unless QUEUE_NAME_FORMAT.match?(queue_name.to_s)
      raise ArgumentError, "invalid queue name: #{queue_name.inspect}"
    end

    channel = notification_channel(queue_name)
    connection.execute "LISTEN #{channel}"
    begin
      loop do
        # Consume all pending NOTIFY events
        while connection.raw_connection.notifies; end
        # Work jobs as long as there are pending jobs in the queue
        while work_job(queue_name); end
        # Wait for next NOTIFY event
        connection.raw_connection.wait_for_notify(timeout)
      end
    ensure
      # Only reached after a successful LISTEN.
      connection.execute "UNLISTEN #{channel}"
    end
  end

  # Enqueue a new job to run at a given time or immediately
  #
  # @param job [ActiveJob::Base] The ActiveJob job object to schedule
  # @param scheduled_for [Integer,Time] Timestamp when the job should be
  #   executed. Use nil if the job should be run immediately.
  # @raise [ActiveRecord::RecordInvalid] if the record fails validation
  #   (e.g. an invalid queue name)
  def self.enqueue(job, scheduled_for = nil)
    queue_name = job.queue_name || 'default'
    create!(job_data: job.serialize,
            scheduled_for: scheduled_for && Time.zone.at(scheduled_for),
            priority: job.priority || 100,
            queue_name: queue_name)
    # queue_name has passed the format validation in create! above.
    connection.execute "NOTIFY #{notification_channel(queue_name)}"
  end

  # Execute a single job from the given queue.
  #
  # The job row is locked with FOR UPDATE SKIP LOCKED for the duration
  # of the transaction, so rows locked by another transaction are
  # skipped instead of waited on.
  #
  # @param queue_name [String] Name of the queue to look for a due job
  # @return [Boolean] false if no due job was found, true after a job
  #   was run and marked as performed
  def self.work_job(queue_name)
    transaction do
      job = queue.due.lock('FOR UPDATE SKIP LOCKED').find_by(queue_name: queue_name)
      # `next` instead of `return`: leaving a transaction block with
      # `return` is deprecated / rolls back in newer Rails versions.
      next false unless job

      job.perform
      # Note: #perform rescues errors, so failed jobs are marked as
      # performed too and are not retried by this worker.
      job.update!(performed_at: Time.current)
    end
  end

  # Name of the PostgreSQL notification channel used for a queue.
  def self.notification_channel(queue_name)
    "pg_jobs_#{queue_name}"
  end
  private_class_method :notification_channel

  # Execute the job. Calls `ActiveJob::Base.execute`.
  #
  # Any StandardError raised by the job is logged together with its
  # backtrace and then swallowed.
  def perform
    ActiveJob::Base.execute(job_data)
  rescue StandardError => e
    backtrace = Array(e.backtrace).join("\n")
    Rails.logger.error("Error while executing job: #{e}\n#{backtrace}")
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment