Created
October 20, 2008 12:56
-
-
Save tooky/18063 to your computer and use it in GitHub Desktop.
workling.rb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Configures Workling once the Merb application has loaded: reads the Starling
# connection settings for the current Merb environment from config/starling.yml
# and requires every Ruby file found under app/workers. When the config file is
# missing, an error is logged and the process exits with status 1.
Merb::BootLoader.after_app_loads do
  config_path = Merb.root / 'config' / 'starling.yml'
  load_path   = Merb.root / 'app' / 'workers'

  # File.exists? was deprecated and later removed (Ruby 3.2); File.exist? is
  # available on every Ruby version.
  if File.exist?(config_path)
    # The settings are looked up under the environment name as a Symbol key.
    # NOTE(review): on Psych 4+ YAML.load_file is a safe load and may reject
    # Symbol keys unless they are permitted -- confirm for the Ruby in use.
    Workling::Client.options = YAML.load_file(config_path)[Merb.environment.to_sym]
    Dir.glob(load_path / '**' / '*.rb').each { |wling| require wling }
  else
    Merb.logger.error! "No starling.yml file found in #{Merb.root}/config."
    exit(1)
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
:development:
  :host: 127.0.0.1
  :port: 22122
:test:
  :host: 127.0.0.1
  :port: 22122
:production:
  :host: 127.0.0.1
  :port: 22122
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Workling | |
# Base error type for Workling; being a StandardError, a bare `rescue` catches it.
class WorklingError < StandardError; end

# Specialised Workling error. Nothing in this file raises it.
class WorklingNotFoundError < WorklingError; end
# Base class for workers. Every subclass is recorded in +discovered+, and each
# method returned by instance_methods(false) on a subclass (except +create+)
# is mapped to its own queue by +queues+. Calling Worker.async_<name>(options)
# on a subclass enqueues +options+ on the queue for <name>.
class Base
  cattr_accessor :discovered
  @@discovered = []

  # Pattern recognised by method_missing; the capture is the worker method name.
  # NOTE(review): the "c" is optional, so "asyn_foo" matches as well as
  # "async_foo" -- presumably "asynch?_" was intended; confirm before changing.
  ASYNC_CALL = /^async?_(.*)/

  # Returns the client used for enqueuing, building it on first use.
  def self.client
    @@client ||= Client.new
  end

  # Builds the queue name for a class/method pair. The segments are joined with
  # "__" rather than ":" because ":" messes up memcache stats.
  def self.queue_for(klass, method)
    # method may be a Symbol (instance_methods on Ruby 1.9+), hence the to_s.
    (klass.to_s.tableize / method.to_s).split("/").join("__")
  end

  # Returns the last "__"-separated segment of a queue name.
  def self.method_name(queue)
    queue.split("__").last
  end

  # Records every subclass so the poller can start a listener for it.
  def self.inherited(subclass)
    discovered << subclass
    super
  end

  # Enqueues +options+ on the queue for klass/method and returns nil.
  # Raises Workling::WorklingError when the memcache client fails, so the
  # failure propagates to the caller.
  def self.run(klass, method, options = {})
    client.set(queue_for(klass, method), options)
    nil
  rescue MemCache::MemCacheError => e
    raise Workling::WorklingError, "#{e.class} - #{e.message}"
  end

  # Dispatches calls matching ASYNC_CALL to +run+; everything else goes to super.
  # The arguments are splatted into +run+, which accepts a single options value.
  def self.method_missing(method, *args, &block)
    if method.to_s =~ ASYNC_CALL
      run(to_s.dasherize, Regexp.last_match(1), *args)
    else
      super
    end
  end

  # Keeps respond_to? consistent with the dynamic methods handled above.
  def self.respond_to_missing?(method, include_private = false)
    !!(method.to_s =~ ASYNC_CALL) || super
  end

  # Returns (and memoizes per class) a Hash of queue name => worker instance,
  # one entry per method in instance_methods(false), skipping +create+.
  def self.queues
    @queues ||= instance_methods(false).inject({}) do |queues, method|
      # instance_methods yields Strings on Ruby 1.8 and Symbols on 1.9+, so
      # compare via to_s. `next` must hand the accumulator back to inject,
      # otherwise the memo becomes nil for the following iteration.
      next queues if method.to_s == 'create'
      queues[queue_for(self, method)] = new
      queues
    end
  end

  # Put worker initialize code in here. This is good for restarting jobs that
  # were interrupted.
  def create
  end
end
# Thin wrapper around a MemCache connection to the Starling server. The
# class-level +options+ hash supplies :host, :port and, optionally,
# :memcache_options; any method not defined here is forwarded to the connection.
class Client
  cattr_accessor :options
  attr_accessor :starling_url
  attr_accessor :connection

  # Builds the "host:port" URL from the class-level options and opens the
  # MemCache connection, passing :memcache_options along when present.
  # Raises WorklingError when the options were never configured.
  def initialize
    settings = self.class.options
    raise WorklingError, "Workling::Client.options has not been configured" if settings.nil?

    @starling_url = [settings[:host], settings[:port]].compact.join(':')
    memcache_args = [starling_url, settings[:memcache_options]].compact
    @connection = ::MemCache.new(*memcache_args)
  end

  # Forwards unknown calls, including any block, to the MemCache connection.
  def method_missing(method, *args, &block)
    @connection.send(method, *args, &block)
  end

  # Keeps respond_to? in line with what method_missing forwards.
  def respond_to_missing?(method, include_private = false)
    @connection.respond_to?(method, include_private) || super
  end

  # Returns the stats reported by the connection.
  def stats
    @connection.stats
  end
end
# Runs one listener thread per discovered worker class. Each thread polls the
# queues of its class and hands every message to the matching worker method.
class Poller
  cattr_accessor :sleep_time # Seconds to sleep before looping
  cattr_accessor :reset_time # Seconds to wait while resetting connection

  # Reads the polling intervals from Workling::Client.options, defaulting to
  # 2 seconds (sleep_time) and 30 seconds (reset_time).
  def initialize
    Poller.sleep_time = Workling::Client.options[:sleep_time] || 2
    Poller.reset_time = Workling::Client.options[:reset_time] || 30
    @workers = ThreadGroup.new
  end

  def logger
    Merb.logger
  end

  # Starts a listener thread for every class in Workling::Base.discovered and
  # blocks until all of them have finished.
  def listen
    # Create a thread for each worker.
    Workling::Base.discovered.each do |klass|
      @workers.add(Thread.new(klass) { |c| klass_listen(c) })
    end

    # Wait for all workers to complete
    @workers.list.each { |t| t.join }
  end

  # Gracefully stop processing: flags every listener thread so that its loop
  # ends before the next dispatch round.
  def stop
    @workers.list.each { |w| w[:shutdown] = true }
  end

  ##
  ## Thread procs
  ##

  # Listen for one worker class until the current thread is flagged :shutdown.
  def klass_listen(klass)
    logger.debug("Listener thread #{klass.name} started")

    # Per-listener sleep time if configured, otherwise the class-wide value.
    thread_sleep_time = listener_sleep_time(klass)

    # Setup connection to starling (one per thread)
    connection = Client.new
    puts "** Starting Client for #{klass.name} queue"

    # Start dispatching those messages
    until Thread.current[:shutdown]
      begin
        # Keep MySQL connection alive
        # unless ActiveRecord::Base.connection.active?
        #   unless ActiveRecord::Base.connection.reconnect!
        #     logger.fatal("FAILED - Database not available")
        #     break
        #   end
        # end

        # Dispatch and process the messages
        n = dispatch!(connection, klass)
        if n > 0
          logger.debug("Listener thread #{klass.name} processed #{n} queue items")
        else
          # Nothing was waiting: back off for this listener's own interval.
          sleep(thread_sleep_time)
        end
      rescue MemCache::MemCacheError
        # If there is a memcache error, hang for a bit to give it a chance to
        # fire up again and reset the connection.
        logger.warn("Listener thread #{klass.name} failed to connect to memcache. Resetting connection.")
        sleep(self.class.reset_time)
        connection.reset
      end
    end

    logger.debug("Listener thread #{klass.name} ended")
  end

  # Dispatcher for one worker class. Re-raises MemCacheError if unable to
  # connect; any other worker failure is logged and the remaining queues are
  # still processed. Returns the number of worker methods called.
  def dispatch!(connection, klass)
    n = 0
    klass.queues.each do |queue_name, handler|
      begin
        result = connection.get(queue_name)
        if result
          n += 1
          handler.send(klass.method_name(queue_name), result)
        end
      rescue MemCache::MemCacheError => e
        logger.error("FAILED to connect with queue #{queue_name}: #{e}")
        raise
      rescue StandardError, ScriptError => e
        # Worker bugs must not kill the listener thread, but SystemExit and
        # Interrupt are deliberately left to propagate.
        logger.error("FAILED to process queue #{queue_name}. #{handler} could not handle invocation of #{Base.method_name(queue_name)} with #{result.inspect}: #{e}.\n#{e.backtrace.join("\n")}")
      end
    end
    n
  end

  private

  # Returns the :sleep_time configured under options[:listeners][<class name>],
  # falling back to the class-wide sleep_time when no override is present.
  def listener_sleep_time(klass)
    listeners = Client.options[:listeners]
    settings  = listeners && listeners[klass.to_s]
    override  = settings && settings.symbolize_keys[:sleep_time]
    override || self.class.sleep_time
  end
end
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment