Created
April 11, 2016 20:31
-
-
Save anonymous/6b9d4b1405981bbc01edaee637ccdcfb to your computer and use it in GitHub Desktop.
Active Job Async adapter for Rails 4.2
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'securerandom' | |
require 'concurrent/scheduled_task' | |
require 'concurrent/executor/thread_pool_executor' | |
require 'concurrent/utility/processor_counter' | |
module ActiveJob | |
module QueueAdapters | |
# == Active Job Async adapter
#
# The Async adapter runs jobs with an in-process thread pool.
#
# This is the default queue adapter. It's well-suited for dev/test since
# it doesn't need any external infrastructure, but it's a poor fit for
# production since it drops pending jobs on restart.
#
# To use this adapter, set queue adapter to +:async+:
#
#   config.active_job.queue_adapter = :async
#
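# To configure the adapter's thread pool, instantiate the adapter and
# pass your own config:
#
#   config.active_job.queue_adapter = ActiveJob::QueueAdapters::AsyncAdapter.new \
#     min_threads: 1,
#     max_threads: 2 * Concurrent.processor_count,
#     idletime: 600.seconds
#
# NOTE(review): in this file the adapter's methods, including +initialize+,
# are defined inside <tt>class << self</tt>, so the +new+ call above does not
# reach that +initialize+. Confirm how executor options are meant to be
# passed before relying on this example.
#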
# The adapter uses a {Concurrent Ruby}[https://github.com/ruby-concurrency/concurrent-ruby] thread pool to schedule and execute
# jobs. Since jobs share a single thread pool, long-running jobs will block
# short-lived jobs. Fine for dev/test; bad for production.
class AsyncAdapter | |
class << self
  # See {Concurrent::ThreadPoolExecutor}[http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/ThreadPoolExecutor.html] for executor options.
  #
  # NOTE: because this method is defined inside <tt>class << self</tt>, it is
  # a private singleton method of the adapter class rather than an instance
  # initializer, so Ruby does not invoke it through +new+. It only runs when
  # called explicitly, in which case it replaces the shared scheduler with
  # one built from +executor_options+.
  def initialize(**executor_options)
    @scheduler = Scheduler.new(**executor_options)
  end

  # Wraps +job+ and hands it to the shared scheduler for execution.
  def enqueue(job) #:nodoc:
    scheduler.enqueue JobWrapper.new(job), queue_name: job.queue_name
  end

  # Wraps +job+ and hands it to the shared scheduler together with the
  # +timestamp+ at which it should run.
  def enqueue_at(job, timestamp) #:nodoc:
    scheduler.enqueue_at JobWrapper.new(job), timestamp, queue_name: job.queue_name
  end

  # Gracefully stop processing jobs. Finishes in-progress work and handles
  # any new jobs following the executor's fallback policy (`caller_runs`).
  # Waits for termination by default. Pass `wait: false` to continue.
  def shutdown(wait: true) #:nodoc:
    scheduler.shutdown wait: wait
  end

  # Used for our test suite.
  def immediate=(immediate) #:nodoc:
    scheduler.immediate = immediate
  end

  private

  # The scheduler shared by every class-level call. It is created with the
  # default executor options the first time any of the methods above needs
  # it, unless +initialize+ was explicitly called beforehand.
  def scheduler
    @scheduler ||= Scheduler.new
  end
end
# Note that we don't actually need to serialize the jobs since we're
# performing them in-process, but we do so anyway for parity with other
# adapters and deployment environments. Otherwise, serialization bugs
# may creep in undetected.
class JobWrapper #:nodoc:
  # Captures the serialized form of +job+ at wrap time.
  #
  # Jobs that expose a +provider_job_id=+ writer are tagged with a fresh
  # UUID before being serialized. Jobs without that writer are left
  # untouched, so wrapping them behaves exactly as it did when the
  # assignment was commented out.
  def initialize(job)
    job.provider_job_id = SecureRandom.uuid if job.respond_to?(:provider_job_id=)
    @job_data = job.serialize
  end

  # Passes the serialized payload captured at wrap time to +Base.execute+.
  def perform
    Base.execute @job_data
  end
end
# Posts wrapped jobs to one of two Concurrent Ruby executors: a
# +Concurrent::ThreadPoolExecutor+ by default, or a
# +Concurrent::ImmediateExecutor+ while +immediate+ is set.
class Scheduler #:nodoc:
  # Thread pool settings, used unless overridden by the options given to
  # +new+.
  DEFAULT_EXECUTOR_OPTIONS = {
    min_threads:     0,
    max_threads:     Concurrent.processor_count,
    auto_terminate:  true,
    idletime:        60, # 1 minute
    max_queue:       0, # unlimited
    fallback_policy: :caller_runs # shouldn't matter -- 0 max queue
  }.freeze

  # When truthy, jobs go to the immediate executor instead of the thread
  # pool. Starts out +false+.
  attr_accessor :immediate

  # Builds both executors up front. +options+ are merged over
  # DEFAULT_EXECUTOR_OPTIONS and passed to the thread pool executor.
  def initialize(**options)
    self.immediate = false
    @immediate_executor = Concurrent::ImmediateExecutor.new
    @async_executor = Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS.merge(options))
  end

  # Posts +job+ to the current executor, which calls <tt>job.perform</tt>.
  # +queue_name+ is accepted but not used here: every job goes to the same
  # executor regardless of its queue.
  def enqueue(job, queue_name:)
    executor.post(job, &:perform)
  end

  # Schedules +job+ to be performed at +timestamp+. The timestamp is
  # compared against <tt>Time.current.to_f</tt>, so it is treated as epoch
  # seconds. A timestamp that is not in the future enqueues the job right
  # away.
  def enqueue_at(job, timestamp, queue_name:)
    delay = timestamp - Time.current.to_f
    if delay > 0
      Concurrent::ScheduledTask.execute(delay, args: [job], executor: executor, &:perform)
    else
      enqueue(job, queue_name: queue_name)
    end
  end

  # Shuts down the thread pool executor and, unless +wait+ is false, waits
  # for it to terminate. The immediate executor is not touched.
  def shutdown(wait: true)
    @async_executor.shutdown
    @async_executor.wait_for_termination if wait
  end

  # The executor jobs are currently posted to, chosen by +immediate+.
  def executor
    immediate ? @immediate_executor : @async_executor
  end
end
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment