Skip to content

Instantly share code, notes, and snippets.

@blaedj
Last active April 16, 2018 21:43
Show Gist options
  • Save blaedj/45dea8864d8562f9be734b9e98d3324f to your computer and use it in GitHub Desktop.
Save blaedj/45dea8864d8562f9be734b9e98d3324f to your computer and use it in GitHub Desktop.
Custom ActiveJob backends - notes and research

ActiveJob Queue Adapter api

Adapters must adhere to the following api: (rails//activejob/lib/active_job/queue_adapter.rb)

QUEUE_ADAPTER_METHODS = [:enqueue, :enqueue_at].freeze

  def queue_adapter?(object)
    QUEUE_ADAPTER_METHODS.all? { |meth| object.respond_to?(meth) 
  end

The enqueue(job) method needs to take a single argument: an instance of an ActiveJob::Base-extending class. It should return

The enqueue_at(job, timestamp) additionally takes a timestamp to begin running the job.

to set the adapter, you can just supply the adapter name in place of :foo in a rails config file;

config.active_job.queue_adapter = :foo

The adapter class should be named FooAdapter. If this class class is loaded, it will be looked up:

  # Returns adapter for specified name.
  #
  #   ActiveJob::QueueAdapters.lookup(:sidekiq)
  #   # => ActiveJob::QueueAdapters::SidekiqAdapter
  def lookup(name)
    const_get(name.to_s.camelize << ADAPTER)
  end

Additional Considerations:

  • we should provide a way to kill the adapter without breaking in-progress jobs

  • a sidekiq-like api for reporting stats (e.g. pending jobs, failed jobs, scheduled jobs etc) would be nice for visibility

  • we would need a way to schedule recurring jobs

    • maybe ingest the current yaml file we use for sidekiq-cron?

Job Classes

There are some convenience methods defined on ActiveJob::Base that may be helpful for enqueueing jobs:

  • serialize this method serializes the job instance into a ruby (json/hash) blob:
  {
    "job_class"  => self.class.name,
    "job_id"     => job_id,
    "provider_job_id" => provider_job_id,
    "queue_name" => queue_name,
    "priority"   => priority,
    "arguments"  => serialize_arguments(arguments),
    "executions" => executions,
    "locale"     => I18n.locale.to_s,
    "timezone"   => Time.zone.try(:name)
  }
# TODO: this is just a quick and dirty example, not functioning at all
# and probably considered psuedo-code
class PubsubAdapter
def enqueue(job)
PubsubEnqueuer.new(job.serialize).enqueue
end
def enqueue_at(job, timestamp)
PubsubEnqueuer.new(job.serialize).enqueue
end
end
# all of this could be a separate gem
class PubsubEnqueuer
def initialize(job_data)
@job_data = job_data
end
def push_to_pubsub(message, queue)
# TODO: implement
# push to a pubsub queue that has a consumer running to consume these.
end
end
class PubsubJobConsumer
def initialize(queue='default')
@topic_name = queue
end
def run
message = pull_message
job_class = message.delete('job_class')
JobWrapper.new(job_class, message).run!
end
def pull_message
subscription = client.subscription subscription_name(@topic_name)
subscription.pull(max: 1)
end
end
class JobWrapper
def initialize(klass, attrs)
@job = klass.new()
@job.perform(*attrs.arguments)
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment