Last active
August 29, 2015 14:12
-
-
Save IronSavior/24c9ed577224887982f1 to your computer and use it in GitHub Desktop.
Temporary SQS queues and SNS topics
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Author: Erik Elmore <[email protected]> | |
# License: Public Domain | |
require 'securerandom' | |
require 'aws' | |
# Test the feasibility of using temporary queues and topics as a means of receiving
# notifications from remote systems via SQS and SNS. TestClient creates either only a | |
# queue or both a queue and a topic before sending a request to the server. When the | |
# client wishes to receive its response directly in a temporary queue, the request | |
# body is the URL to its temporary queue. When the client wishes to be notified via a | |
# topic with a subscribed queue, the body of the request is the ARN of the temporary | |
# SNS topic. | |
# | |
# My observations at this time indicate that the setup cost of using either method
# is less than 1 second, but the full cycle (request to response) is about 4x faster
# when using temporary SQS queues without a temporary SNS topic.
# Sends a request to a server queue and waits for a single reply, which arrives
# either directly in a temporary SQS queue or via a temporary SNS topic that the
# temporary queue is subscribed to. Temporary resources are removed once the
# block using them finishes.
class TestClient
  # Upper bound on queue creation attempts. Every attempt uses a fresh random
  # name, so repeated naming failures point at a problem retrying cannot fix.
  MAX_QUEUE_CREATE_ATTEMPTS = 5

  # name::         base name for the temporary queues and topics
  # region::       AWS region in which the temporary resources are created
  # server_queue:: object responding to +send_message+; receives the requests
  def initialize( name, region, server_queue )
    @name = name
    @sqs = AWS::SQS.new :region => region
    @sns = AWS::SNS.new :region => region
    @server_queue = server_queue
  end

  # Performs one request/response cycle.
  #
  # reply_via_sns:: when true the request body is the ARN of a temporary topic,
  #                 otherwise it is the URL of a temporary queue.
  def run( reply_via_sns = true )
    provisioner = reply_via_sns ? :temporary_queue_via_sns : :temporary_queue
    # temporary_queue yields only the queue, so topic is nil in that mode.
    send(provisioner) do |queue, topic|
      request = reply_via_sns ? topic.arn : queue.url
      @server_queue.send_message request
      puts 'Request sent: %s' % request
      queue.poll(:attributes => [:all]) do |message|
        handler message
        break # only one reply is expected per request
      end
    end
  end

  # Yields a temporary queue subscribed to a temporary topic, together with the
  # subscription, as (queue, topic, subscription). The subscription is removed
  # when the block finishes, followed by the queue and the topic.
  #
  # Raises ArgumentError when called without a block.
  def temporary_queue_via_sns( topic_base_name = @name, queue_base_name = @name )
    raise ArgumentError, 'Block required' unless block_given?
    temporary_topic(topic_base_name) do |topic|
      temporary_queue(queue_base_name) do |queue|
        begin
          sub = topic.subscribe queue
          sub.raw_message_delivery = true
          yield queue, topic, sub
        ensure
          # sub is nil when subscribing itself failed.
          sub.unsubscribe if sub && sub.exists?
        end
      end
    end
  end

  # Yields a newly created topic and deletes it when the block finishes.
  #
  # Raises ArgumentError when called without a block.
  def temporary_topic( base_name = @name )
    raise ArgumentError, 'Block required' unless block_given?
    topic = create_temporary_topic base_name
    yield topic
  ensure
    topic.delete if topic
  end

  # Yields a newly created queue and deletes it (if it still exists) when the
  # block finishes.
  #
  # Raises ArgumentError when called without a block.
  def temporary_queue( base_name = @name )
    raise ArgumentError, 'Block required' unless block_given?
    queue = create_temporary_queue base_name
    yield queue
  ensure
    queue.delete if queue && queue.exists?
  end

  # Creates a queue with a randomized name, picking a new name when SQS reports
  # that the name is taken or was deleted recently. Gives up and re-raises the
  # SQS error after MAX_QUEUE_CREATE_ATTEMPTS attempts instead of retrying
  # forever.
  def create_temporary_queue( base_name = @name )
    attempts = 0
    begin
      attempts += 1
      @sqs.queues.create randomized_name(base_name)
    rescue AWS::SQS::Errors::QueueNameExists, AWS::SQS::Errors::QueueDeletedRecently
      retry if attempts < MAX_QUEUE_CREATE_ATTEMPTS
      raise
    end
  end

  # Creates a topic whose randomized name is not used by any existing topic.
  def create_temporary_topic( base_name = @name )
    name = randomized_name base_name
    # Pick another name for as long as an existing topic already uses it.
    name = randomized_name base_name while @sns.topics.detect { |t| t.name == name }
    @sns.topics.create name
  end

  # Returns base_name followed by a dash and a random UUID.
  def randomized_name( base_name = @name )
    '%s-%s' % [base_name, SecureRandom.uuid]
  end

  # Prints the body of a received reply.
  def handler( msg )
    puts 'Received: "%s"' % msg.body
  end
end
# Script entry point. The first command-line argument selects the client region
# (default 'us-west-2'); passing a second argument of any value switches replies
# to SNS delivery. Runs request cycles until interrupted with Ctrl-C.
if $PROGRAM_NAME == __FILE__
  server_queue = AWS::SQS.new(region: 'us-west-2').queues.named('messaging_test_server')
  client_region = ARGV.first || 'us-west-2'
  using_sns = ARGV.length >= 2

  catch(:terminate) do
    # Unwind out of the loop on Ctrl-C so pending ensure blocks still run.
    Signal.trap('INT') { throw :terminate }

    loop do
      client = TestClient.new('messaging_test_client', client_region, server_queue)
      client.run(using_sns)
      puts
    end
  end

  puts
  puts 'Stopped'
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Author: Erik Elmore <[email protected]> | |
# License: Public Domain | |
require 'aws' | |
require 'uri' | |
# Listens to the given AWS::SQS::Queue and sends responses based on the body of | |
# the request. | |
# Listens to the given AWS::SQS::Queue and sends responses based on the body of
# the request. A request body is either the ARN of an SNS topic or the URL of an
# SQS queue; the response is delivered to whichever one was named.
class TestServer
  # queue:: the queue from which requests are received
  def initialize( queue )
    @queue = queue
  end

  # Receives and handles messages until #stop is called. An error raised while
  # handling a message is printed and the loop keeps going.
  def run
    puts 'Listening...'
    @terminate = false
    until @terminate
      begin
        @queue.receive_message(:attributes => [:all]) { |m| handler m }
      rescue => e
        puts 'Error (incoming message ignored): %s' % e.message
        puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
      end
    end
    puts
    puts 'Server shut down'
  end

  # Asks the #run loop to exit before its next receive.
  def stop
    @terminate = true
  end

  # Returns the fourth colon-separated field of the ARN, which is where the
  # region sits in an ARN such as 'arn:aws:sns:us-west-2:...'.
  def arn_region( arn )
    arn.split(':')[3]
  end

  # Returns the region embedded in a queue URL whose host has the form
  # 'sqs.<region>.<rest>'.
  #
  # Raises ArgumentError when the URL has no host or the host does not have
  # that form, rather than failing with a NoMethodError on nil.
  def url_region( url )
    host = URI.parse(url).host
    region = host && host[/\Asqs\.(.+?)\./, 1]
    raise ArgumentError, "Cannot determine SQS region from queue URL: #{url.inspect}" unless region
    region
  end

  # Sends reply to the destination named in msg.body: published to the topic
  # when the body is an SNS ARN, otherwise sent to the queue at that URL.
  def reply_to( msg, reply )
    destination = msg.body
    if destination.start_with? 'arn:aws:sns:'
      sns = AWS::SNS.new(:region => arn_region(destination))
      sns.topics[destination].publish reply
    else
      sqs = AWS::SQS.new(:region => url_region(destination))
      sqs.queues[destination].send_message reply
    end
  end

  # Prints the request and replies to it. A message that has already been
  # received more than 3 times gets no reply.
  def handler( msg )
    puts 'Received: %s' % msg.body
    if msg.receive_count > 3
      # NOTE(review): nothing here deletes the message; presumably the SDK does
      # so when the receive block returns normally -- confirm.
      puts 'WARN: Message received more than 3 times. Deleting.'
      return
    end
    reply_to msg, 'Valid Response'
  end
end
# Script entry point: looks up the server queue in us-west-2, creating it when
# it does not exist yet, then serves requests until interrupted with Ctrl-C.
if $0 == __FILE__
  sqs = AWS::SQS.new(:region => 'us-west-2')
  queue = begin
    sqs.queues.named 'messaging_test_server'
  rescue AWS::SQS::Errors::NonExistentQueue
    # 60 seconds is the shortest retention period SQS documents as valid; the
    # previous value of 10 is below that minimum.
    sqs.queues.create 'messaging_test_server', :message_retention_period => 60
  end

  server = TestServer.new(queue)
  # Ctrl-C only sets a flag, so the server finishes its current receive first.
  Signal.trap('INT') { server.stop }
  server.run
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment