Last active
August 9, 2016 18:22
-
-
Save IronSavior/d9277e5fa845a76340a1 to your computer and use it in GitHub Desktop.
Poll for SQS messages
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Author: Erik Elmore <[email protected]> | |
# License: Public Domain | |
require 'uri' | |
require 'aws-sdk' | |
# Facilitates polling and handling queue messages. | |
class SQSMessageDispatch | |
# Configure a new SQSMessageDispatch for the given queue URL. | |
# @param queue_url [String] URL of the SQS message queue | |
# @param sqs_opts [Hash] parameters passed to Aws::SQS::Client.new (except :region) | |
def initialize( queue_url, sqs_opts = {} ) | |
@url = String queue_url | |
@sqs = Aws::SQS::Client.new( | |
{ region: URI.parse(@url).host.match(/^sqs\.([^.]+)\./)[1] }.merge sqs_opts | |
) | |
end | |
# Poll message queue continuously until #stop is called. Yields each message to the given block. Message is | |
# implicitly deleted when the block is finished except when the block raises or throws. Returns Enumerator when | |
# block is not given. | |
# @param args [Hash] Additional parameters for Aws::SQS::Client#receive_message | |
def poll( *args, &blk ) | |
return enum_for __callee__ unless block_given? | |
raise StandardError, 'Already polling' if polling? | |
safe_poll *args, &blk | |
end | |
alias_method :each_message, :poll | |
# True when currently polling | |
def polling? | |
@polling | |
end | |
# End the polling loop after the current iteration. | |
def stop | |
return unless @polling | |
@polling = false | |
end | |
# Easy on the eyes | |
def inspect | |
'#<%s @url="%s"%s>' % [ | |
self.class, | |
@url, | |
polling?? ' (polling)' : nil | |
] | |
end | |
# Safe poll and dispatch | |
def safe_poll( *args, &blk ) | |
@polling = true | |
receive_messages(*args).each{ |msg| dispatch_message msg, &blk } while polling? | |
ensure | |
@polling = false | |
end | |
private :safe_poll | |
# Safe dispatch of received message. Traps errors and invokes message handler. Messages are implicitly deleted | |
# unless the handler raises or throws. | |
# @param msg [Aws::Structure] received message | |
def dispatch_message( msg ) | |
yield msg | |
delete_message msg | |
rescue => e | |
warn 'Error from handler: "%s" Trace: %s Received Message: %s' % [ | |
e.message, | |
"\n\t#{e.backtrace.join("\n\t")}\n", | |
msg.to_h | |
] | |
end | |
private :dispatch_message | |
# Fetch messages from the queue. | |
# @param opts [Hash] Additional parameters for Aws::SQS::Client#receive_message except :queue_url | |
# @return [Array<Aws::Structure>] Zero or more messages | |
def receive_messages( opts = {} ) | |
@sqs.receive_message(opts.merge queue_url: @url).messages | |
rescue Aws::SQS::Errors::OverLimit | |
warn 'Too many in-flight messages at this time' | |
Array[] | |
end | |
private :receive_messages | |
# Remove a received message from the queue | |
# @param msg [Aws::Structure] a received message | |
def delete_message( msg ) | |
@sqs.delete_message queue_url: @url, receipt_handle: msg.receipt_handle | |
rescue Aws::SQS::Errors::ReceiptHandleIsInvalid | |
warn 'Cannot delete message with invalid receipt handle: %s' % msg.to_h | |
end | |
private :delete_message | |
end | |
if $0 == __FILE__ | |
raise 'usage: %s queue_url' % File.basename($0) if ARGV.empty? | |
SQSMessageDispatch.new(ARGV.first).tap do |worker| | |
Signal.trap('INT'){ | |
puts | |
worker.stop | |
} | |
worker.poll wait_time_seconds: 3, max_number_of_messages: 10 do |msg| | |
puts '*** Recieved message: %s' % msg.body | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment