Created
May 29, 2009 20:09
-
-
Save wwalker/120178 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Set the process time zone as early as possible, before anything else is
# loaded. The order of the statements below matters: TZ is exported first,
# then Active Support is loaded, then its Time.zone is set.
ENV['TZ'] = 'UTC'

require 'rubygems'
require 'activesupport'

# Set the time zone for Active Support (it monkey patches Time).
Time.zone = 'Etc/UTC'

require 'logger'
# Application-wide log object.
#
# Only usable after DeliveryHandler.new has run, because the shared logger
# is created by DeliveryHandler#init_logging.
#
# The logger is stored in a DeliveryHandler class variable and the class
# exposes it only through an instance method (DeliveryHandler#dh_log), so a
# class-level call such as DeliveryHandler::dh_log raises NoMethodError.
# The class variable is therefore read directly; +send+ is used because
# class_variable_get is private on older Rubies.
#
# Returns the shared Logger. Raises NameError if it has not been created yet.
def dh_logger
  DeliveryHandler.send(:class_variable_get, :@@dh_logger)
end
# Application-wide configuration lookup.
#
# Only usable once ActiveRecord has been set up by
# DeliveryHandler#init_activerecord (run from DeliveryHandler.new).
#
# key - name of the setting to look up.
# mod - module the setting belongs to (defaults to 'delivery_handler').
#
# Returns the value of the matching SystemConfiguration row, or nil when
# there is no such row.
def dh_config(key, mod = 'delivery_handler')
  config = SystemConfiguration.find_by_module_and_setting(mod, key)
  config ? config.value : nil
end
# Look up a runtime status value.
#
# key - key of the CurrentState row to read.
#
# Returns the row's value, or nil when no row with that key exists.
def dh_status(key)
  status = CurrentState.find_by_key(key)
  status ? status.value : nil
end
# Add one to a runtime status counter.
#
# key - key of the CurrentState row to increment. A missing row is first
#       created with value 0.
#
# Returns whatever CurrentState.increment_counter returns.
def dh_status_increment(key)
  status = CurrentState.find_by_key(key) || CurrentState.create(:key => key, :value => 0)
  CurrentState.increment_counter(:value, status.id)
end
# Subtract one from a runtime status counter.
#
# key - key of the CurrentState row to decrement. A missing row is first
#       created with value 1, so it is at 0 once the decrement is applied.
#
# Returns whatever CurrentState.decrement_counter returns.
def dh_status_decrement(key)
  status = CurrentState.find_by_key(key) || CurrentState.create(:key => key, :value => 1)
  CurrentState.decrement_counter(:value, status.id)
end
# Set a runtime status value, creating the row when it does not exist yet.
#
# key   - key of the CurrentState row.
# value - value to store.
#
# Returns the newly created row when one had to be created, otherwise the
# result of saving the existing row.
def dh_status_set(key, value)
  status = CurrentState.find_by_key(key)
  return CurrentState.create(:key => key, :value => value) unless status

  status.value = value
  status.save
end
# Dispatcher and common base class for the per-media delivery handlers.
#
# A plain DeliveryHandler instance acts as the dispatcher: #initialize sets
# up ActiveRecord and logging, and #run polls for deliverable
# MessageDeliveryResult rows (MDRs) and hands each one to a media-specific
# subclass. The subclasses define their own #initialize (storing the MDR in
# @mdr) and their own #run (starting a worker thread stored in @thread), and
# inherit #deliver and the capacity bookkeeping from this class.
#
# Shared state lives in class variables: @@dh_logger (the logger) and
# @@delivery_handlers (handlers whose worker thread has been started and not
# yet reaped).
class DeliveryHandler
  # Worker thread started by a subclass's #run.
  attr_reader :thread
  # MDR a subclass instance is delivering (set by the subclass initializer).
  attr_reader :mdr

  # Returns the shared Logger created by #init_logging.
  def dh_log
    @@dh_logger
  end

  # Sets up the dispatcher: ActiveRecord, logging and the status counters.
  def initialize
    # FIXME
    # init_pid_file # should be handled in calling app, not here in the library.
    @run = true # main-loop flag, cleared by the USR1 handler installed in #run
    @@delivery_handlers = {}
    init_activerecord
    init_logging
    zero_states
  end

  # Resets the three hard-coded status counters to 0.
  def zero_states
    # FIXME
    dh_status_set('current_6000', 0)
    dh_status_set('current_6001', 0)
    dh_status_set('current_6002', 0)
  end

  # Main loop of the dispatcher: once a second, fetch the deliverable MDRs
  # and dispatch each of them, until a USR1 signal clears @run.
  # (Subclasses override this method to start their worker thread.)
  def run
    Signal.trap("USR1") do
      dh_log.info "Received and USR1 signal, shutting down"
      @run = false
    end
    dh_log.debug "Singal handler for USR1 installed (probably won't work due to jruby interception of the signal)"
    dh_log.debug "Our PID - #{$$}"
    while @run # let signal handlers shut us down cleanly
      messages_to_deliver.each { |mdr| dispatch(mdr) }
      sleep 1
    end
  end

  # Collects the results of every registered handler whose worker thread has
  # finished: releases its capacity (email and SMS only), copies the thread's
  # :current_state / :last_state_at onto the handler's own MDR, saves it and
  # removes the handler from the registry so it is processed only once.
  #
  # Returns true when at least one handler was reaped, false otherwise, so
  # callers can sleep when there was nothing to collect.
  def reap_completed_handlers
    reaped = false
    @@delivery_handlers.keys.each do |key|
      dh = @@delivery_handlers[key]
      delivery_handler_thread = dh.thread
      next if delivery_handler_thread.alive?

      # Use the MDR of the handler being reaped, not @mdr of whichever
      # object happens to be running this method.
      finished_mdr = dh.mdr
      media_type = finished_mdr.message.media_type
      case media_type
      when Message::MEDIA_TYPE_EMAIL, Message::MEDIA_TYPE_SMS
        # Release under the same key #deliver acquired it with.
        # NOTE(review): voice capacity is not released here, as in the
        # original code - confirm it is released elsewhere.
        decrement_used_capacity(media_type)
      end
      finished_mdr.current_state = delivery_handler_thread[:current_state]
      finished_mdr.last_state_at = (delivery_handler_thread[:last_state_at] || Time.now)
      finished_mdr.save
      # update the MDR
      #if update_mdr
      #  MessageDeliveryResult.increment_counter(:delivery_attempts, mdr.id)
      #  mdr.provider_response = delivery_handler_thread['provider_response']
      #  mdr.current_state = delivery_handler_thread['current_state']
      #  mdr.attempted_at = (delivery_handler_thread['attempted_at'] || Time.now)
      #  mdr.last_state_at = (delivery_handler_thread['last_state_at'] || Time.now)
      #  mdr.delivered_at = (delivery_handler_thread['delivered_at'] || Time.now)
      #end
      @@delivery_handlers.delete(key)
      reaped = true
    end
    reaped
  end

  # Loads the Rails environment (and with it the models) and points the
  # ActiveRecord log at a file under /tmp.
  def init_activerecord
    # Read in the rails model
    # FIXME hard coded to development
    ENV['RAILS_ENV'] = 'development'
    # FIXME hard coded path (probably keep, should never change)
    # require '../../../rn_website/trunk/rapid_notify/config/environment.rb'
    require '/home/pairprogramming/rapid_notify/rn_website/trunk/rapid_notify/config/environment.rb'
    ActiveRecord::Base.logger = Logger.new('/tmp/job_scheduler_active_record.log')
    ActiveRecord::Base.colorize_logging = false
  end

  # Creates the shared DEBUG-level logger writing to STDERR.
  def init_logging
    log = Logger.new(STDERR)
    log.level = Logger::DEBUG
    log.formatter = Logger::Formatter.new
    log.datetime_format = "%Y-%m-%d %H:%M:%S"
    @@dh_logger = log
  end

  # Returns the MDRs that are pending, queued, awaiting retry, or have no
  # state at all.
  def messages_to_deliver
    conditions = [
      "current_state = #{MessageDeliveryResult::DELIVERY_RESULT_CURRENT_STATE_PENDING}",
      "current_state IS NULL",
      "current_state = #{MessageDeliveryResult::DELIVERY_RESULT_CURRENT_STATE_QUEUED}",
      "current_state = #{MessageDeliveryResult::DELIVERY_RESULT_CURRENT_STATE_AWAITING_RETRY}"
    ].join(' OR ')
    messages = MessageDeliveryResult.find(:all, :conditions => conditions)
    #FIXME
    # messages = messages.sort(priority)
    dh_log.debug("Found #{messages.length} messages to do") if messages.length > 0
    messages
  end

  # Releases one unit of capacity: decrements the "current_<key>" counter.
  def decrement_used_capacity(key)
    dh_status_decrement("current_#{key}")
  end

  # Acquires one unit of capacity for +key+. Blocks until the
  # "current_<key>" status counter is below the "max_<key>" setting, reaping
  # finished handlers while waiting, then increments the counter.
  #
  # The usage is read with dh_status because it is maintained with
  # dh_status_increment / dh_status_decrement; only the maximum is
  # configuration. A missing "max_<key>" setting reads as 0 and blocks
  # forever.
  def increment_used_capacity(key)
    loop do
      max_capacity = dh_config("max_#{key}").to_i
      current_capacity = dh_status("current_#{key}").to_i
      dh_log.debug "capacity #{key}: #{current_capacity} in use, maximum #{max_capacity}"
      break if current_capacity < max_capacity
      # Only pause when nothing could be reaped; otherwise re-check at once.
      sleep 0.1 unless reap_completed_handlers
    end
    dh_status_increment("current_#{key}")
  end

  # Delivers @mdr: waits for capacity for its media type, marks the MDR as
  # running and calls #run (the subclass's, which starts the worker thread).
  def deliver
    increment_used_capacity(@mdr.message.media_type)
    @mdr.current_state = MessageDeliveryResult::DELIVERY_RESULT_CURRENT_STATE_RUNNING
    @mdr.attempted_at = Time.now
    @mdr.save
    run
  end

  private

  # Handles one deliverable MDR: aborts it when its job was aborted,
  # otherwise starts a delivery and registers the handler for later reaping.
  def dispatch(mdr)
    dh_log.info "Handling mdr # #{mdr.id}"
    if mdr.job.abort?
      mdr.current_state = MessageDeliveryResult::DELIVERY_RESULT_CURRENT_STATE_JOB_ABORTED
      mdr.save
      dh_log.info "job id #{mdr.job.id} was marked for abort after activation, aborting individual delivery attempt"
      return
    end

    dh = handler_for(mdr)
    unless dh
      # Without this guard an unexpected media type raised NoMethodError on
      # nil and took the whole main loop down.
      dh_log.error "No delivery handler for media type #{mdr.message.media_type.inspect} (mdr # #{mdr.id}), skipping"
      return
    end

    dh.deliver
    @@delivery_handlers["#{Time.now} #{mdr.id}"] = dh
  end

  # Builds the media-specific handler for +mdr+; nil for an unknown type.
  def handler_for(mdr)
    case mdr.message.media_type
    when Message::MEDIA_TYPE_VOICE then VoiceDeliveryHandler.new(mdr)
    when Message::MEDIA_TYPE_EMAIL then EmailDeliveryHandler.new(mdr)
    when Message::MEDIA_TYPE_SMS then SmsDeliveryHandler.new(mdr)
    end
  end
end
# Delivers a voice message by asking a DRb service on localhost to place the
# call (the local variable name suggests this is an Adhearsion instance).
class VoiceDeliveryHandler < DeliveryHandler
  # mdr - the MessageDeliveryResult to deliver.
  #
  # Does not call super, so the dispatcher setup in
  # DeliveryHandler#initialize is not run again for every delivery.
  def initialize(mdr)
    @mdr = mdr
  end

  # Starts the worker thread. The thread publishes its outcome in the
  # thread-local :current_state (0 while the call is being requested, then
  # the value returned by #make_call).
  def run
    @thread = Thread.new(@mdr.contact.phone_number, @mdr.id, @mdr.get_callerid) do |phone_number, mdr_id, callerid|
      me = Thread.current
      me[:current_state] = 0
      me[:current_state] = make_call(phone_number, mdr_id, callerid)
    end
  end

  # Requests an asynchronous call to +phone_number+.
  #
  # phone_number - number to dial; prefixed with the 'sip_prefix' setting and
  #                suffixed with "@<sip_peer>".
  # mdr_id       - passed to the dial plan as the :mdr_id variable.
  # callerid     - caller id to present.
  #
  # Returns DELIVERY_RESULT_CURRENT_STATE_RUNNING when the request was
  # handed over, DELIVERY_RESULT_CURRENT_STATE_AWAITING_RETRY when it failed.
  def make_call(phone_number, mdr_id, callerid)
    prefix = dh_config('sip_prefix')
    peer = dh_config('sip_peer')
    context = dh_config('target_context')
    begin
      adhearsion = DRbObject.new_with_uri 'druby://127.0.0.1:9050'
      adhearsion.async_call_into_context(
        "#{prefix}#{phone_number}@#{peer}",
        context,
        :callerid => callerid,
        :async => '1',
        :variables => { :mdr_id => mdr_id }
      )
      MessageDeliveryResult::DELIVERY_RESULT_CURRENT_STATE_RUNNING
    rescue StandardError => err
      # Logged through the inherited #dh_log accessor.
      dh_log.error "begin exception from async_call_into_context"
      dh_log.error "#{err.message}\n#{err.backtrace.join("\n")}"
      dh_log.error "end exception from async_call_into_context"
      # Return an explicit state: without it the method returned the
      # logger's return value, which then got stored as the MDR state.
      # NOTE(review): AWAITING_RETRY makes the dispatcher pick the MDR up
      # again - confirm this is the intended failure state.
      MessageDeliveryResult::DELIVERY_RESULT_CURRENT_STATE_AWAITING_RETRY
    end
  end
end
# Delivers a message by e-mail through the SMTP server on localhost.
class EmailDeliveryHandler < DeliveryHandler
  # mdr - the MessageDeliveryResult to deliver.
  #
  # Does not call super, so the dispatcher setup in
  # DeliveryHandler#initialize is not run again for every delivery.
  def initialize(mdr)
    @mdr = mdr
  end

  # Starts the worker thread that sends the mail to the contact's e-mail
  # address. The outcome is published in the thread-local
  # :provider_response (see #send_email).
  def run
    #FIXME hard coded from address
    # NOTE(review): the address literals in this class look like
    # placeholders - confirm the real values.
    @thread = Thread.new(@mdr.contact.email, @mdr.id, @mdr.message.subject, @mdr.message.body, '[email protected]') do |email, mdr_id, subject, body, sender|
      me = Thread.current
      me[:current_state] = 0
      # NOTE(review): :current_state stays 0; recording the result of
      # send_email in it was already disabled in the original code.
      #me[:current_state] = send_email(email,mdr_id,subject, body, sender)
      send_email(email, mdr_id, subject, body, sender)
    end
  end

  # Sends one message and records the outcome in the current thread's
  # :provider_response (this method runs inside the worker thread, where
  # @thread may not have been assigned yet).
  #
  # email   - recipient address.
  # mdr_id  - id of the MDR (currently unused).
  # subject - subject line.
  # body    - message text.
  # sender  - address used for the Reply-To header.
  #
  # Returns the provider response that was recorded:
  # EMAIL_RESULT_ACCEPTED, or EMAIL_RESULT_SERVICE_UNAVAILABLE on any error.
  def send_email(email, mdr_id, subject, body, sender)
    worker = Thread.current
    from_address = '[email protected]'
    return_path_address = '[email protected]'
    reply_to_address = sender
    # FIXME is this done elsewhere?
    # The empty element produces the blank line that separates the headers
    # from the body.
    msgstr = [
      "To: #{email}",
      "From: #{from_address}",
      "Reply-To: #{reply_to_address}",
      "Return-Path: #{return_path_address}",
      "Subject: #{subject}",
      "",
      "#{body}"
    ].join("\n")
    begin
      Net::SMTP.start('127.0.0.1', 25) do |smtp|
        smtp.send_message msgstr, from_address, email
      end
      worker[:provider_response] = MessageDeliveryResult::EMAIL_RESULT_ACCEPTED
    rescue StandardError, Timeout::Error => err
      # Timeout::Error is listed explicitly because it is not a
      # StandardError on older Rubies.
      dh_log.error "Mail to #{email} threw an exception: #{err}"
      dh_log.error "#{err.message}\n#{err.backtrace.join("\n")}"
      worker[:provider_response] = MessageDeliveryResult::EMAIL_RESULT_SERVICE_UNAVAILABLE
    end
  end
end
# Delivers a message as SMS by posting an XML document to an HTTPS gateway.
class SmsDeliveryHandler < DeliveryHandler
  # mdr - the MessageDeliveryResult to deliver.
  #
  # Does not call super, so the dispatcher setup in
  # DeliveryHandler#initialize is not run again for every delivery.
  def initialize(mdr)
    @mdr = mdr
  end

  # Starts the worker thread. The outcome is published in the thread-local
  # :provider_response (see #send_sms).
  def run
    @thread = Thread.new(@mdr.contact.sms, @mdr.id, @mdr.message.subject, @mdr.message.body) do |sms, mdr_id, subject, body|
      me = Thread.current
      me[:current_state] = 0
      # NOTE(review): send_sms always returns nil, so :current_state ends up
      # nil - confirm which state an SMS attempt should leave behind.
      me[:current_state] = send_sms(sms, mdr_id, subject, body)
    end
  end

  # Sends one SMS and records the outcome in the current thread's
  # :provider_response (this method runs inside the worker thread, where
  # @thread may not have been assigned yet).
  #
  # sms             - destination number; must satisfy valid_nanp?.
  # mdr_id          - id of the MDR (currently unused).
  # subject         - subject of the message (currently unused).
  # message_content - text to send.
  #
  # Recorded responses: SMS_RESULT_BAD_NUMBER, SMS_RESULT_ACCEPTED,
  # SMS_RESULT_OTHER_FAILURE (non-success HTTP response) or
  # SMS_RESULT_SERVICE_UNAVAILABLE (exception while posting).
  #
  # Returns nil.
  def send_sms(sms, mdr_id, subject, message_content)
    worker = Thread.current
    unless valid_nanp?(sms)
      worker[:provider_response] = MessageDeliveryResult::SMS_RESULT_BAD_NUMBER
      return
    end

    # Fill in the template. The block form of gsub inserts the text
    # literally; with a replacement string, backslash sequences in the
    # message would be interpreted as back-references.
    # NOTE(review): the message text is inserted without XML escaping and
    # the POST body below is not URL-encoded - check against the template
    # and the gateway's expectations.
    xml_content = File.read('sms_text.xml')
    xml_content = xml_content.gsub('PHONE_NUMBER') { sms }
    xml_content = xml_content.gsub('MESSAGE_CONTENT') { message_content }

    begin
      # FIXME hard coded hostname
      http = Net::HTTP.new('gateway.ivisionmobile.com', 443)
      http.use_ssl = true
      # FIXME should we use a more secure option here? Probably not, but review.
      http.verify_mode = OpenSSL::SSL::VERIFY_NONE
      resp, _data = http.post('/ivisionwebpost.aspx', "xmlstring=#{xml_content}")
      worker[:provider_response] =
        case resp
        when Net::HTTPSuccess then MessageDeliveryResult::SMS_RESULT_ACCEPTED
        else MessageDeliveryResult::SMS_RESULT_OTHER_FAILURE
        end
    rescue StandardError, Timeout::Error => err
      # Timeout::Error is listed explicitly because it is not a
      # StandardError on older Rubies.
      dh_log.error "Unexpected exception thrown at end of sms delivery"
      dh_log.error "#{err.message}\n#{err.backtrace.join("\n")}"
      worker[:provider_response] = MessageDeliveryResult::SMS_RESULT_SERVICE_UNAVAILABLE
    end
    nil
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment