Skip to content

Instantly share code, notes, and snippets.

Last active February 6, 2020 22:36
Show Gist options
  • Save stuartmyles/8099723 to your computer and use it in GitHub Desktop.
Save stuartmyles/8099723 to your computer and use it in GitHub Desktop.
A complete example of how to use the Amazon Web Services Simple Notification Service (SNS) from Python. This code uses the boto library to create a topic, subscribe to it, wait for the subscription to be confirmed, and then send a success message. Simply plug in your AWS access and secret keys, plus your email address and mobile phone number.
# An example of how to use AWS SNS with Python's boto
# By Stuart Myles @smyles
# Inspired by parts of the Ruby SWF SNS tutorial
# And by Python SNS example code published elsewhere (the original links are missing from this copy)
import json
import time

import boto.sns as sns
class SNSTopicShell:
    """Base class holding the reporting hooks used by the SNS activity classes.

    ``fail`` is meant to be called when a step cannot proceed and ``complete``
    when a step finishes. Both default to printing their argument, so a
    subclass can override them to route messages elsewhere.
    """

    def fail(self, reason):
        """Report a failure.

        Args:
            reason: Description of what went wrong (callers may pass a
                JSON-encoded string).
        """
        print(reason)

    def complete(self, result):
        """Report a successful result.

        Args:
            result: Description or payload of the completed step.
        """
        print(result)
class SNSTopicCreator(SNSTopicShell):
    """Creates the sample SNS topic and subscribes the user's endpoints to it."""

    def _create_topic(self, sns_client):
        """Create the sample topic and set its display name.

        Args:
            sns_client: SNS connection object; must provide ``create_topic``
                and ``set_topic_attributes``.

        Returns:
            ``(True, topic_arn)`` on success, or
            ``(False, "Couldn't create SNS topic")`` when no ARN came back.
        """
        response = sns_client.create_topic('SNS_Sample_Topic')
        topic_arn = response['CreateTopicResponse']['CreateTopicResult']['TopicArn']
        if not topic_arn:
            # The payload must be a dict: a set literal is not JSON serialisable.
            self.fail(json.dumps({"reason": "Couldn't create SNS topic", "detail": ""}))
            return False, "Couldn't create SNS topic"

        # For an SMS notification, setting `DisplayName` is *required*. Note that
        # only the *first 10 characters* of the DisplayName will be shown on the
        # SMS message sent to the user, so choose your DisplayName wisely!
        sns_client.set_topic_attributes(topic_arn, 'DisplayName', 'SNSSample')
        return True, topic_arn

    def _subscribe_topic_activity(self, task):
        """Subscribe the user to an Amazon SNS topic.

        Args:
            task: JSON string of an object with an ``"email"`` and/or ``"sms"``
                endpoint. A missing or empty endpoint is simply skipped.

        Returns:
            ``(True, json_string)`` when at least one subscription was
            requested; the JSON holds ``topic_arn`` plus, per protocol, the
            ``endpoint`` and ``SubscriptionArn``. Otherwise
            ``(False, error_message)``.

        Note:
            ``ACCESS`` and ``SECRET`` are expected to be module-level AWS
            credentials; they are not defined in the visible part of this file.
        """
        protocols = ("email", "sms")
        activity_data = {
            "topic_arn": None,
            "email": {"endpoint": None, "SubscriptionArn": None},
            "sms": {"endpoint": None, "SubscriptionArn": None},
        }

        if not task:
            self.fail(json.dumps({"reason": "Didn't receive any input!", "detail": ""}))
            return False, "Didn't receive any input!"

        task_input = json.loads(task)
        for protocol in protocols:
            # .get() so that supplying only one of the two endpoints is allowed.
            activity_data[protocol]["endpoint"] = task_input.get(protocol)

        sns_client = sns.SNSConnection(aws_access_key_id=ACCESS, aws_secret_access_key=SECRET)

        # Create the topic and get the ARN
        created, activity_data["topic_arn"] = self._create_topic(sns_client)
        if not created:
            return False, "Couldn't create SNS topic"

        # Subscribe the user to the topic, using either or both endpoints.
        for protocol in protocols:
            endpoint = activity_data[protocol]["endpoint"]
            if not endpoint:
                continue
            print("About to subscribe protocol: %s ep: %s" % (protocol, endpoint))
            response = sns_client.subscribe(activity_data["topic_arn"], protocol, endpoint)
            activity_data[protocol]["SubscriptionArn"] = (
                response['SubscribeResponse']['SubscribeResult']["SubscriptionArn"]
            )

        # If at least one subscription arn is set, consider this a success.
        if any(activity_data[p]["SubscriptionArn"] is not None for p in protocols):
            return True, json.dumps(activity_data)

        self.fail(json.dumps({"reason": "Couldn't subscribe to SNS topic", "detail": ""}))
        return False, "Couldn't subscribe to SNS topic"
class SNSTopicWaiter(SNSTopicShell):
    """Waits for the user to confirm the subscription to the SNS topic."""

    def _wait_for_confirmation_activity(self, task, poll_interval=1.0):
        """Poll the topic until one of the requested endpoints is confirmed.

        Args:
            task: JSON string containing ``topic_arn`` and, per protocol, an
                object with the ``endpoint`` that was subscribed.
            poll_interval: Seconds to sleep between polls that found no
                confirmed subscription. Polling without a pause can trigger
                SNS "Throttling / Rate exceeded" errors.

        Returns:
            ``(True, json_string)`` once a subscription is confirmed, with the
            confirmed ARN stored under both ``subscription_arn`` and
            ``SubscriptionArn`` for that protocol; ``(False, error_message)``
            when there is no input or the topic cannot be looked up.

        Note:
            This blocks indefinitely until a subscription is confirmed.
            ``ACCESS`` and ``SECRET`` are expected to be module-level AWS
            credentials; they are not defined in the visible part of this file.
        """
        if not task:
            self.fail(json.dumps({"reason": "Didn't receive any input!", "detail": ""}))
            return False, "Didn't receive any input!"

        subscription_data = json.loads(task)
        topic_arn = subscription_data["topic_arn"]

        sns_client = sns.SNSConnection(aws_access_key_id=ACCESS, aws_secret_access_key=SECRET)
        topic = sns_client.get_topic_attributes(topic_arn)
        if not topic:
            self.fail(json.dumps({
                "reason": "Couldn't get SNS topic ARN",
                "detail": "Topic ARN: %s" % topic_arn,
            }))
            return False, "Couldn't get SNS topic ARN"

        # Loop through all of the subscriptions to this topic until we get some
        # indication that a subscription was confirmed.
        subscription_confirmed = False
        while not subscription_confirmed:
            response = sns_client.get_all_subscriptions_by_topic(topic_arn)
            subscriptions = (
                response["ListSubscriptionsByTopicResponse"]
                ["ListSubscriptionsByTopicResult"]["Subscriptions"]
            )
            for sub in subscriptions:
                # Ignore subscriptions for protocols we did not request (the
                # topic may carry others) instead of raising KeyError.
                entry = subscription_data.get(sub["Protocol"])
                if not isinstance(entry, dict) or entry.get("endpoint") != sub["Endpoint"]:
                    continue
                # This is one of the endpoints we're interested in. Is it subscribed?
                if sub["SubscriptionArn"] != 'PendingConfirmation':
                    entry["subscription_arn"] = sub["SubscriptionArn"]
                    # Also refresh the key written at subscribe time so it does
                    # not keep the stale pending value.
                    entry["SubscriptionArn"] = sub["SubscriptionArn"]
                    subscription_confirmed = True
            if not subscription_confirmed:
                time.sleep(poll_interval)

        return True, json.dumps(subscription_data)
class SNSTopicConfirmer(SNSTopicShell):
    """Sends the final success message to the confirmed topic."""

    def _send_result_activity(self, task):
        """Publish a confirmation of the successful topic subscription.

        The message is published to the topic the user subscribed to, so it
        reaches whichever endpoint confirmed the subscription.

        Args:
            task: JSON string containing at least ``topic_arn``.

        Returns:
            ``True`` after the message is published, ``False`` when no input
            was supplied.

        Note:
            ``ACCESS`` and ``SECRET`` are expected to be module-level AWS
            credentials; they are not defined in the visible part of this file.
        """
        if not task:
            self.fail(json.dumps({"reason": "Didn't receive any input!", "detail": ""}))
            return False

        subscription_data = json.loads(task)
        sns_client = sns.SNSConnection(aws_access_key_id=ACCESS, aws_secret_access_key=SECRET)
        results = "Thanks, you've successfully confirmed registration, and your workflow is complete!"

        # send the message via SNS
        sns_client.publish(topic=subscription_data["topic_arn"], message=results)
        return True
def main():
    """Run the sample end to end: create/subscribe, wait, then confirm.

    Prints exactly one outcome line: "It worked!" on success, or the message
    for the first step that failed.
    """
    creator = SNSTopicCreator()
    testtaskjs = json.dumps({ "email" : "<REPLACE_WITH_EMAIL_ADDRESS>", "sms" : "<REPLACE_WITH_SMS_ENABLED_PHONE_NUMBER>"})
    result, subscription_data = creator._subscribe_topic_activity(testtaskjs)
    if not result:
        print("Creation failed...")
        return

    waiter = SNSTopicWaiter()
    result, subscription_data = waiter._wait_for_confirmation_activity(subscription_data)
    if not result:
        print("Waiting failed...")
        return

    confirmer = SNSTopicConfirmer()
    if confirmer._send_result_activity(subscription_data):
        print("It worked!")
    else:
        print("Confirmation failed...")


if __name__ == '__main__':
    main()
Copy link

This is a helpful example, but I would suggest adding a delay after line 95 (for sub in sns_client.get_all_subscriptions_by_topic...).

When I tried it, I got a rate-limit error. This is likely to happen in some situations, particularly for accounts that have the default rate threshold settings.

In my case, I added a time.sleep(1) after the for loop to keep it from hammering the API, and it worked.

When I tried it as is, I got this error back after the pending confirmation message:

Traceback (most recent call last):
  File "", line 130, in <module>
    result, subscription_data = sw._wait_for_confirmation_activity(subscription_data)
  File "", line 95, in _wait_for_confirmation_activity
    for sub in sns_client.get_all_subscriptions_by_topic(subscription_data["topic_arn"])["ListSubscriptionsByTopicResponse"]["ListSubscriptionsByTopicResult"]["Subscriptions"]:
  File "/usr/local/lib/python2.7/site-packages/boto-2.48.0-py2.7.egg/boto/sns/", line 440, in get_all_subscriptions_by_topic
    return self._make_request('ListSubscriptionsByTopic', params)
  File "/usr/local/lib/python2.7/site-packages/boto-2.48.0-py2.7.egg/boto/sns/", line 765, in _make_request
    raise self.ResponseError(response.status, response.reason, body)
boto.exception.BotoServerError: BotoServerError: 400 Bad Request
{"Error":{"Code":"Throttling","Message":"Rate exceeded","Type":"Sender"},"RequestId":"xxx"}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment