Initialising an SNS client
import boto3
# Region the SNS client below is created for.
AWS_REGION = "us-east-1"

# SNS client obtained through boto3.client.
sns_client = boto3.client(
    "sns",
    region_name=AWS_REGION,
)
Initialising an SNS resource
import boto3
# Region the SNS resource below is created for.
AWS_REGION = "us-east-1"

# SNS resource object obtained through boto3.resource.
sns_resource = boto3.resource(
    "sns",
    region_name=AWS_REGION,
)
import logging
import boto3
from botocore.exceptions import ClientError
# Region every SNS request from this script is sent to.
AWS_REGION = 'us-east-1'

# Logging setup: basicConfig configures the root logger for INFO and above,
# formatted as "<timestamp>: <level>: <message>"; `logger` is the root logger.
logging.basicConfig(
    format='%(asctime)s: %(levelname)s: %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger()

# SNS client used by create_topic below.
sns_client = boto3.client('sns', region_name=AWS_REGION)
def create_topic(name):
    """
    Create an SNS topic called ``name`` and return the service response.

    Success is logged at INFO level. If the call raises ``ClientError`` the
    failure is logged with its traceback and the exception is re-raised.
    """
    try:
        response = sns_client.create_topic(Name=name)
        logger.info(f'Created SNS topic {name}.')
        return response
    except ClientError:
        logger.exception(f'Could not create SNS topic {name}.')
        raise
if __name__ == '__main__':
    # Script entry point: create a single topic with a fixed example name.
    topic_name = 'hands-on-cloud-sns-topic'
    logger.info(f'Creating SNS topic {topic_name}...')
    topic = create_topic(name=topic_name)
import logging
import boto3
from botocore.exceptions import ClientError
# Region every SNS request from this script is sent to.
AWS_REGION = 'us-east-1'

# Logging setup: basicConfig configures the root logger for INFO and above,
# formatted as "<timestamp>: <level>: <message>"; `logger` is the root logger.
logging.basicConfig(
    format='%(asctime)s: %(levelname)s: %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger()

# SNS client used by list_topics below.
sns_client = boto3.client('sns', region_name=AWS_REGION)
def list_topics():
    """
    Return the ARNs of all SNS topics yielded by the ``list_topics`` paginator.

    The paginator is drained with ``build_full_result()``, which merges every
    page into one dict, so the iteration below walks individual topic entries
    rather than pages.

    Returns:
        list: the ``TopicArn`` value of each topic entry.

    Raises:
        ClientError: re-raised after being logged if an SNS call fails.
    """
    try:
        paginator = sns_client.get_paginator('list_topics')
        full_result = paginator.paginate().build_full_result()
        # A missing 'Topics' key is treated as "no topics" rather than
        # surfacing as a KeyError.
        topic_arns = [
            topic['TopicArn'] for topic in full_result.get('Topics', [])
        ]
    except ClientError:
        logger.exception('Could not list SNS topics.')
        raise
    else:
        return topic_arns
if __name__ == '__main__':
    # Script entry point: fetch every topic ARN and log them one per line.
    logger.info(f'Listing all SNS topics...')
    topics = list_topics()
    for topic in topics:
        logger.info(topic)
import logging
import boto3
from botocore.exceptions import ClientError
import json
# Region every SNS request from this script is sent to.
AWS_REGION = 'us-east-1'

# Logging setup: basicConfig configures the root logger for INFO and above,
# formatted as "<timestamp>: <level>: <message>"; `logger` is the root logger.
logging.basicConfig(
    format='%(asctime)s: %(levelname)s: %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger()

# SNS client used by list_topic_attributes below.
sns_client = boto3.client('sns', region_name=AWS_REGION)
def list_topic_attributes():
    """
    Return the access policy of every SNS topic.

    Topics are enumerated with the ``list_topics`` paginator
    (``build_full_result()`` merges all pages into one dict, so the loop
    walks topic entries, not pages). For each topic one
    ``get_topic_attributes`` call is made and only its ``Policy`` attribute
    is kept, decoded from JSON.

    Returns:
        list: one dict per topic with the keys ``'TopicArn'`` (the topic ARN)
        and ``'TopicPolicy'`` (the parsed policy document).

    Raises:
        ClientError: re-raised after being logged if an SNS call fails.
    """
    try:
        paginator = sns_client.get_paginator('list_topics')
        full_result = paginator.paginate().build_full_result()
        topic_attributes_list = []
        # A missing 'Topics' key is treated as "no topics" rather than
        # surfacing as a KeyError.
        for topic in full_result.get('Topics', []):
            topic_arn = topic['TopicArn']
            attributes = sns_client.get_topic_attributes(
                TopicArn=topic_arn
            )['Attributes']
            topic_attributes_list.append({
                'TopicArn': topic_arn,
                'TopicPolicy': json.loads(attributes['Policy']),
            })
    except ClientError:
        logger.exception('Could not get SNS topic attributes.')
        raise
    else:
        return topic_attributes_list
if __name__ == '__main__':
    # Script entry point: log each topic's ARN and policy as indented,
    # key-sorted JSON.
    logger.info(f'Listing all SNS topic attributes ...')
    topic_attributes = list_topic_attributes()
    for topic_attribute in topic_attributes:
        logger.info(
            json.dumps(topic_attribute, sort_keys=True, indent=4)
        )
import logging
import boto3
from botocore.exceptions import ClientError
# Region every SNS request from this script is sent to.
AWS_REGION = 'us-east-1'

# Logging setup: basicConfig configures the root logger for INFO and above,
# formatted as "<timestamp>: <level>: <message>"; `logger` is the root logger.
logging.basicConfig(
    format='%(asctime)s: %(levelname)s: %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger()

# SNS client used by add_permission below.
sns_client = boto3.client('sns', region_name=AWS_REGION)
def add_permission(topic_arn, policy_label, account_ids, action_names):
    """
    Add a labelled statement to a topic's access control policy.

    The four arguments are forwarded to ``sns_client.add_permission`` as
    ``TopicArn``, ``Label``, ``AWSAccountId`` and ``ActionName``. The service
    response is returned; a ``ClientError`` is logged and re-raised.
    """
    request = {
        'TopicArn': topic_arn,
        'Label': policy_label,
        'AWSAccountId': account_ids,
        'ActionName': action_names,
    }
    try:
        return sns_client.add_permission(**request)
    except ClientError:
        logger.exception(f'Could not add access policy to the topic.')
        raise
if __name__ == '__main__':
    # Script entry point. The ARN and account id below are placeholders.
    topic_arn = 'your-topic-ARN'
    policy_label = 'hands-on-cloud-sns-topic-policy'
    account_ids = ['your-account-id']
    action_names = ['Publish', 'GetTopicAttributes']

    logger.info(f'Adding access policy {policy_label} to {topic_arn}...')
    add_permission(
        topic_arn=topic_arn,
        policy_label=policy_label,
        account_ids=account_ids,
        action_names=action_names,
    )
    logger.info(f'Access policy {policy_label} added to {topic_arn}.')
Publish a message to an SNS topic
import logging
import boto3
from botocore.exceptions import ClientError
# Region every SNS request from this script is sent to.
AWS_REGION = 'us-east-1'

# Logging setup: basicConfig configures the root logger for INFO and above,
# formatted as "<timestamp>: <level>: <message>"; `logger` is the root logger.
logging.basicConfig(
    format='%(asctime)s: %(levelname)s: %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger()

# SNS client used by publish_message below.
sns_client = boto3.client('sns', region_name=AWS_REGION)
def publish_message(topic_arn, message, subject):
    """
    Publish ``message`` with ``subject`` to the topic ``topic_arn``.

    Returns the ``MessageId`` entry of the publish response. A
    ``ClientError`` is logged with its traceback and re-raised.
    """
    try:
        result = sns_client.publish(
            TopicArn=topic_arn,
            Message=message,
            Subject=subject,
        )
        message_id = result['MessageId']
    except ClientError:
        logger.exception(f'Could not publish message to the topic.')
        raise
    return message_id
if __name__ == '__main__':
    # Script entry point. The topic ARN below is a placeholder.
    topic_arn = 'your-topic-ARN'
    message = 'This is a test message on topic.'
    subject = 'This is a message subject on topic.'

    logger.info(f'Publishing message to topic - {topic_arn}...')
    message_id = publish_message(
        topic_arn=topic_arn,
        message=message,
        subject=subject,
    )
    logger.info(
        f'Message published to topic - {topic_arn} with message Id - {message_id}.'
    )
import logging
import boto3
from botocore.exceptions import ClientError
# Region every SNS request from this script is sent to.
AWS_REGION = 'us-east-1'

# Logging setup: basicConfig configures the root logger for INFO and above,
# formatted as "<timestamp>: <level>: <message>"; `logger` is the root logger.
logging.basicConfig(
    format='%(asctime)s: %(levelname)s: %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger()

# SNS client used by delete_topic below.
sns_client = boto3.client('sns', region_name=AWS_REGION)
def delete_topic(topic_arn):
    """
    Delete the SNS topic identified by ``topic_arn``.

    Returns the response of ``sns_client.delete_topic``. A ``ClientError``
    is logged with its traceback and re-raised.
    """
    try:
        return sns_client.delete_topic(TopicArn=topic_arn)
    except ClientError:
        logger.exception(f'Could not delete a SNS topic.')
        raise
if __name__ == '__main__':
    # Script entry point. The topic ARN below is a placeholder.
    topic_arn = 'your-topic-ARN'

    logger.info(f'Deleting a SNS topic...')
    delete_response = delete_topic(topic_arn=topic_arn)
    logger.info(f'Deleted a topic - {topic_arn} successfully.')
Create SNS topic subscription
import logging
import boto3
from botocore.exceptions import ClientError
# Region every SNS request from this script is sent to.
AWS_REGION = 'us-east-1'

# Logging setup: basicConfig configures the root logger for INFO and above,
# formatted as "<timestamp>: <level>: <message>"; `logger` is the root logger.
logging.basicConfig(
    format='%(asctime)s: %(levelname)s: %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger()

# SNS client used by subscribe below.
sns_client = boto3.client('sns', region_name=AWS_REGION)
def subscribe(topic, protocol, endpoint):
    """
    Subscribe ``endpoint`` to an SNS topic over ``protocol``.

    Args:
        topic: ARN of the topic to subscribe to (sent as ``TopicArn``).
        protocol: delivery protocol passed to SNS, e.g. ``'email'``.
        endpoint: destination that should receive the notifications.

    Returns:
        The ``SubscriptionArn`` entry of the subscribe response. The request
        sets ``ReturnSubscriptionArn=True``.

    Raises:
        ClientError: re-raised after being logged if the SNS call fails.
    """
    try:
        subscription = sns_client.subscribe(
            TopicArn=topic,
            Protocol=protocol,
            Endpoint=endpoint,
            ReturnSubscriptionArn=True)['SubscriptionArn']
    except ClientError:
        # The original message was a plain string with {placeholders}, so the
        # braces were logged literally; the values are interpolated here.
        logger.exception(
            "Couldn't subscribe %s %s to topic %s.", protocol, endpoint, topic)
        raise
    else:
        return subscription
if __name__ == '__main__':
    # Script entry point. Both values below are placeholders that must be
    # replaced before running. The previous endpoint literal was scraped
    # e-mail-obfuscation text with a trailing space, not a usable address.
    topic_arn = 'your-topic-ARN'
    protocol = 'email'
    endpoint = 'your-email-address'
    logger.info('Subscribing to a SNS topic...')
    # Creates an email subscription
    response = subscribe(topic_arn, protocol, endpoint)
    logger.info(
        f'Subscribed to a topic successfully.\nSubscription Arn - {response}')
List SNS topic subscriptions
import logging
import boto3
from botocore.exceptions import ClientError
import json
# Region every SNS request from this script is sent to.
AWS_REGION = 'us-east-1'

# Logging setup: basicConfig configures the root logger for INFO and above,
# formatted as "<timestamp>: <level>: <message>"; `logger` is the root logger.
logging.basicConfig(
    format='%(asctime)s: %(levelname)s: %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger()

# SNS client used by list_topic_subscriptions below.
sns_client = boto3.client('sns', region_name=AWS_REGION)
def list_topic_subscriptions(topic_arn, max_items=100):
    """
    Return the subscriptions of an SNS topic, collected through a paginator.

    Args:
        topic_arn: ARN of the topic whose subscriptions are listed.
        max_items: upper bound on the number of subscriptions fetched, passed
            to the paginator as ``MaxItems``. Defaults to 100, the value that
            used to be hard-coded; pass ``None`` to send no
            ``PaginationConfig`` at all.

    Returns:
        list: the entries of every page's ``Subscriptions`` list, in the
        order the pages were received.

    Raises:
        ClientError: re-raised after being logged if an SNS call fails.
    """
    paginate_kwargs = {'TopicArn': topic_arn}
    if max_items is not None:
        paginate_kwargs['PaginationConfig'] = {'MaxItems': max_items}
    try:
        paginator = sns_client.get_paginator('list_subscriptions_by_topic')
        topic_subscriptions = []
        # Requests are issued while the page iterator is consumed, so the
        # loop has to stay inside the try block.
        for page in paginator.paginate(**paginate_kwargs):
            topic_subscriptions.extend(page.get('Subscriptions', []))
    except ClientError:
        # The message previously said "topics", copied from list_topics.
        logger.exception('Could not list SNS topic subscriptions.')
        raise
    else:
        return topic_subscriptions
if __name__ == '__main__':
    # Script entry point. The topic ARN below is a placeholder.
    topic_arn = 'your-topic-ARN'

    logger.info(f'Listing SNS topic subscriptions...')
    topic_subscriptions = list_topic_subscriptions(topic_arn)
    # Each subscription is logged as indented, key-sorted JSON.
    for subscription in topic_subscriptions:
        logger.info(
            json.dumps(subscription, sort_keys=True, indent=4)
        )
Delete SNS topic subscription
import logging
import boto3
from botocore.exceptions import ClientError
# Region every SNS request from this script is sent to.
AWS_REGION = 'us-east-1'

# Logging setup: basicConfig configures the root logger for INFO and above,
# formatted as "<timestamp>: <level>: <message>"; `logger` is the root logger.
logging.basicConfig(
    format='%(asctime)s: %(levelname)s: %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger()

# SNS client used by unsubscribe below.
sns_client = boto3.client('sns', region_name=AWS_REGION)
def unsubscribe(subscription_arn):
    """
    Delete the SNS subscription identified by ``subscription_arn``.

    Args:
        subscription_arn: ARN of the subscription to remove.

    Returns:
        The response of ``sns_client.unsubscribe``.

    Raises:
        ClientError: re-raised after being logged if the SNS call fails.
    """
    try:
        # Named `response` so the local no longer shadows this function.
        response = sns_client.unsubscribe(SubscriptionArn=subscription_arn)
    except ClientError:
        # The original message was a plain string with a {placeholder}, so
        # the braces were logged literally; the ARN is interpolated here.
        logger.exception("Couldn't unsubscribe to %s.", subscription_arn)
        raise
    else:
        return response
if __name__ == '__main__':
    # Script entry point. The subscription ARN below is a placeholder.
    subscription_arn = 'your-subscription-arn'

    logger.info('Unsubscribing to a SNS topic subscription...')
    response = unsubscribe(subscription_arn=subscription_arn)
    logger.info(
        f'Successfully unsubscribed to a topic.\nSubscription Arn - {subscription_arn}'
    )