Last active
December 26, 2015 21:59
config.jobs = ActiveSupport::OrderedOptions.new
# Controls whether or not workers report heartbeats
config.jobs.heartbeat_enabled = true
# How often workers should send heartbeats
config.jobs.heartbeat_interval_seconds = 60
# How long a worker can go without sending a heartbeat before it is considered dead
config.jobs.heartbeat_timeout_seconds = 3 * 60
# How often to check for dead workers
config.jobs.dead_worker_polling_interval_seconds = 60
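
Taken together, the three timing settings above bound how long a job can stay locked after its worker dies. The following is a rough sketch only: `orphan_detection_window` is a hypothetical helper that is not part of this gist, and it ignores clock skew, scheduler jitter, and query time.

```ruby
# Hypothetical helper (not part of the gist): rough bounds, in seconds, on how
# long a job stays locked after the worker holding it dies.
def orphan_detection_window(interval:, timeout:, polling_interval:)
  raise ArgumentError, 'timeout must exceed the heartbeat interval' unless timeout > interval

  # Best case: the worker died just before its next heartbeat was due, and the
  # dead-worker check fires as soon as the timeout expires.
  best = timeout - interval
  # Worst case: the worker died right after a heartbeat, and the dead-worker
  # check ran just before the timeout expired.
  worst = timeout + polling_interval
  best..worst
end

window = orphan_detection_window(interval: 60, timeout: 3 * 60, polling_interval: 60)
puts "Orphaned jobs are unlocked roughly #{window.min}-#{window.max}s after a worker dies"
```

Lowering the timeout shortens this window but leaves less slack for a slow database write to delay a heartbeat.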
require File.expand_path('../../config/boot', __FILE__)
require File.expand_path('../../config/environment', __FILE__)
require 'clockwork'

include Clockwork

if Rails.configuration.jobs.heartbeat_enabled
  every(Rails.configuration.jobs.dead_worker_polling_interval_seconds.seconds, 'Unlock orphaned jobs') do
    # unlock_orphaned_jobs is defined on the Delayed::Heartbeat module
    Delayed::Heartbeat.unlock_orphaned_jobs
  end
end
class CreateDelayedWorkers < ActiveRecord::Migration
  def change
    create_table(:delayed_workers) do |t|
      t.string :name
      t.timestamp :last_heartbeat_at
    end
    add_index(:delayed_workers, :name, unique: true)
  end
end
module Delayed
  module Heartbeat
    def self.unlock_orphaned_jobs(timeout_seconds = Rails.configuration.jobs.heartbeat_timeout_seconds)
      # Remove workers that have not sent a heartbeat within the timeout
      WorkerModel.dead_workers(timeout_seconds).delete_all
      # Any job still locked by a worker that is no longer registered is orphaned
      orphaned_jobs = Delayed::Job.where("locked_at IS NOT NULL AND " \
                                         "locked_by NOT IN (#{WorkerModel.active_names.to_sql})")
      # Release the lock and count the interrupted run as an attempt
      orphaned_jobs.update_all('locked_at = NULL, locked_by = NULL, attempts = attempts + 1')
    end
  end
end
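
The unlock logic above can be traced without a database. This is a minimal in-memory sketch: the `Worker` and `Job` structs and the standalone `unlock_orphaned_jobs` method are stand-ins invented for illustration, not the gist's ActiveRecord models. One difference to keep in mind: a SQL `NOT IN` never matches a row whose `locked_by` is NULL, whereas the Ruby `include?` check below would treat such a job as orphaned.

```ruby
# In-memory stand-ins (hypothetical) for the delayed_workers and delayed_jobs tables.
Worker = Struct.new(:name, :last_heartbeat_at)
Job = Struct.new(:id, :locked_by, :locked_at, :attempts)

def unlock_orphaned_jobs(workers, jobs, timeout_seconds, now: Time.now.utc)
  cutoff = now - timeout_seconds
  # Step 1: drop workers whose last heartbeat is older than the cutoff.
  workers.reject! { |w| w.last_heartbeat_at < cutoff }
  active_names = workers.map(&:name)
  # Step 2: a locked job whose owner is no longer registered is orphaned.
  orphaned = jobs.select { |j| j.locked_at && !active_names.include?(j.locked_by) }
  # Step 3: release the lock and count the interrupted run as an attempt.
  orphaned.each do |j|
    j.locked_at = nil
    j.locked_by = nil
    j.attempts += 1
  end
  orphaned.map(&:id)
end

now = Time.utc(2015, 12, 26, 12, 0, 0)
workers = [
  Worker.new('host:a pid:1', now - 30),   # recent heartbeat, still alive
  Worker.new('host:b pid:2', now - 600)   # silent for ten minutes, dead
]
jobs = [
  Job.new(1, 'host:a pid:1', now - 20, 0),
  Job.new(2, 'host:b pid:2', now - 500, 0),
  Job.new(3, nil, nil, 0)                 # not locked by anyone
]
released = unlock_orphaned_jobs(workers, jobs, 180, now: now)
puts "Released job ids: #{released.inspect}"
```

Incrementing `attempts` means a job that repeatedly kills its worker eventually hits the maximum attempt count instead of being retried forever.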
module Delayed
  module Heartbeat
    class Plugin < Delayed::Plugin
      callbacks do |lifecycle|
        lifecycle.before(:execute) do |worker|
          @heartbeat = WorkerHeartbeat.new(worker.name) if Rails.configuration.jobs.heartbeat_enabled
        end

        lifecycle.after(:execute) do |worker|
          @heartbeat.stop if @heartbeat
        end
      end
    end
  end
end
Delayed::Worker.plugins << Delayed::Heartbeat::Plugin
module Delayed
  module Heartbeat
    class WorkerHeartbeat
      def initialize(worker_name)
        @worker_model = create_worker_model(worker_name)

        # Use a self-pipe to safely shut down the heartbeat thread
        @stop_reader, @stop_writer = IO.pipe

        @heartbeat_thread = Thread.new { run_heartbeat_loop }
        # We don't want the worker to continue running if the
        # heartbeat can't be written
        @heartbeat_thread.abort_on_exception = true
      end

      def alive?
        @heartbeat_thread.alive?
      end

      def stop
        # Use the self-pipe to tell the heartbeat thread to shut down
        # cleanly
        if @stop_writer
          @stop_writer.write_nonblock('stop')
          @stop_writer.close
          @stop_writer = nil
        end
      end

      private

      def create_worker_model(worker_name)
        WorkerModel.transaction do
          # Just recreate the worker model to avoid the race condition where
          # it gets deleted before we can update its last heartbeat
          WorkerModel.where(name: worker_name).destroy_all
          WorkerModel.create!(name: worker_name)
        end
      end

      def run_heartbeat_loop
        while true
          break if sleep_interruptibly(heartbeat_interval)
          @worker_model.update_heartbeat
        end
      rescue Exception => e
        # Double quotes are required so that "\n" is a real newline
        Rails.logger.error("Worker heartbeat error: #{e.message}: #{e.backtrace.join("\n")}")
        raise e
      ensure
        Rails.logger.info('Shutting down worker heartbeat thread')
        @stop_reader.close
        @worker_model.delete
        Delayed::Backend::ActiveRecord::Job.clear_active_connections!
      end

      def heartbeat_interval
        Rails.configuration.jobs.heartbeat_interval_seconds
      end

      # Returns a truthy value if the sleep was interrupted
      def sleep_interruptibly(secs)
        IO.select([@stop_reader], nil, nil, secs)
      end
    end
  end
end
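
The self-pipe shutdown used by `WorkerHeartbeat` can be tried in isolation with nothing but the standard library. In this sketch `InterruptibleTicker` is a hypothetical class written for illustration: it counts ticks instead of writing heartbeats, and unlike the gist's `stop` it also joins the thread so the caller can observe that shutdown has finished.

```ruby
# Hypothetical demo class (not part of the gist) showing the self-pipe pattern.
class InterruptibleTicker
  attr_reader :ticks

  def initialize(interval_seconds)
    @interval = interval_seconds
    @ticks = 0
    @stop_reader, @stop_writer = IO.pipe
    @thread = Thread.new { run }
  end

  def stop
    return unless @stop_writer

    # Writing to the pipe makes @stop_reader readable, which wakes IO.select.
    @stop_writer.write_nonblock('stop')
    @stop_writer.close
    @stop_writer = nil
    @thread.join
  end

  def alive?
    @thread.alive?
  end

  private

  def run
    # IO.select returns nil when the timeout elapses and an array once the
    # pipe is readable, so the loop ends as soon as stop is called.
    @ticks += 1 until IO.select([@stop_reader], nil, nil, @interval)
  ensure
    @stop_reader.close
  end
end

fast = InterruptibleTicker.new(0.05)
sleep 0.3
fast.stop

# A long interval does not delay shutdown: stop interrupts the sleep.
slow = InterruptibleTicker.new(30)
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
slow.stop
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
puts "fast ticker ticked #{fast.ticks} times; slow ticker stopped in #{elapsed.round(3)}s"
```

Compared with a plain `sleep`, waiting on the pipe lets the thread exit promptly without relying on `Thread#kill` or `Thread#raise`, which could interrupt it in the middle of a database write.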
module Delayed
  module Heartbeat
    class WorkerModel < ActiveRecord::Base
      self.table_name = 'delayed_workers'

      attr_accessible :name, :last_heartbeat_at

      before_create do |model|
        model.last_heartbeat_at ||= Time.now.utc
      end

      def update_heartbeat
        update_column(:last_heartbeat_at, Time.now.utc)
      end

      def self.dead_workers(timeout_seconds)
        where('last_heartbeat_at < ?', Time.now.utc - timeout_seconds.seconds)
      end

      def self.active_names
        select(:name)
      end
    end
  end
end