Skip to content

Instantly share code, notes, and snippets.

@leadelngalame1611
Created March 29, 2022 07:16
Show Gist options
  • Save leadelngalame1611/c72bbe7ed13b8847119070fee39b1339 to your computer and use it in GitHub Desktop.
Save leadelngalame1611/c72bbe7ed13b8847119070fee39b1339 to your computer and use it in GitHub Desktop.

Initialising an sns client

import boto3

AWS_REGION = "us-east-1"

sns_client = boto3.client("sns", region_name=AWS_REGION)

Initialising an sns Resource

import boto3

AWS_REGION = "us-east-1"

sns_resource = boto3.resource("sns", region_name=AWS_REGION)

Create sns topic

import logging
import boto3
from botocore.exceptions import ClientError

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sns_client = boto3.client('sns', region_name=AWS_REGION)


def create_topic(name):
    """
    Creates a SNS notification topic.
    """
    try:
        topic = sns_client.create_topic(Name=name)
        logger.info(f'Created SNS topic {name}.')

    except ClientError:
        logger.exception(f'Could not create SNS topic {name}.')
        raise
    else:
        return topic


if __name__ == '__main__':

    topic_name = 'hands-on-cloud-sns-topic'
    logger.info(f'Creating SNS topic {topic_name}...')
    topic = create_topic(topic_name)

List sns topics

import logging
import boto3
from botocore.exceptions import ClientError

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sns_client = boto3.client('sns', region_name=AWS_REGION)


def list_topics():
    """
    Lists all SNS notification topics using paginator.
    """
    try:

        paginator = sns_client.get_paginator('list_topics')

        # creating a PageIterator from the paginator
        page_iterator = paginator.paginate().build_full_result()

        topics_list = []

        # loop through each page from page_iterator
        for page in page_iterator['Topics']:
            topics_list.append(page['TopicArn'])
    except ClientError:
        logger.exception(f'Could not list SNS topics.')
        raise
    else:
        return topics_list


if __name__ == '__main__':

    logger.info(f'Listing all SNS topics...')
    topics = list_topics()

    for topic in topics:
        logger.info(topic)

List sns Attributes

import logging
import boto3
from botocore.exceptions import ClientError
import json

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sns_client = boto3.client('sns', region_name=AWS_REGION)


def list_topic_attributes():
    """
    Lists all SNS topics attributes using paginator.
    """
    try:

        paginator = sns_client.get_paginator('list_topics')

        # creating a PageIterator from the paginator
        page_iterator = paginator.paginate().build_full_result()

        topic_attributes_list = []

        # loop through each page from page_iterator
        for page in page_iterator['Topics']:

            response = sns_client.get_topic_attributes(
                TopicArn=page['TopicArn']
            )['Attributes']

            dict_obj = {
                'TopicArn': page['TopicArn'],
                'TopicPolicy': json.loads(response['Policy'])
            }

            topic_attributes_list.append(dict_obj)

    except ClientError:
        logger.exception(f'Could not get SNS topic attributes.')
        raise
    else:
        return topic_attributes_list


if __name__ == '__main__':

    logger.info(f'Listing all SNS topic attributes ...')
    topic_attributes = list_topic_attributes()

    for topic_attribute in topic_attributes:
        logger.info(json.dumps(topic_attribute, indent=4, sort_keys=True))

Set SNS topic policy

import logging
import boto3
from botocore.exceptions import ClientError

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sns_client = boto3.client('sns', region_name=AWS_REGION)


def add_permission(topic_arn, policy_label, account_ids, action_names):
    """
    Adds a policy statement to a topic's access control policy.
    """
    try:

        response = sns_client.add_permission(TopicArn=topic_arn,
                                             Label=policy_label,
                                             AWSAccountId=account_ids,
                                             ActionName=action_names)

    except ClientError:
        logger.exception(f'Could not add access policy to the topic.')
        raise
    else:
        return response


if __name__ == '__main__':

    topic_arn = 'your-topic-ARN'
    policy_label = 'hands-on-cloud-sns-topic-policy'
    account_ids = ['your-account-id']
    action_names = ['Publish', 'GetTopicAttributes']

    logger.info(f'Adding access policy {policy_label} to {topic_arn}...')
    add_permission(topic_arn, policy_label, account_ids, action_names)
    logger.info(f'Access policy {policy_label} added to {topic_arn}.')

Publish message on SNS topic

import logging
import boto3
from botocore.exceptions import ClientError

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sns_client = boto3.client('sns', region_name=AWS_REGION)


def publish_message(topic_arn, message, subject):
    """
    Publishes a message to a topic.
    """
    try:

        response = sns_client.publish(
            TopicArn=topic_arn,
            Message=message,
            Subject=subject,
        )['MessageId']

    except ClientError:
        logger.exception(f'Could not publish message to the topic.')
        raise
    else:
        return response


if __name__ == '__main__':

    topic_arn = 'your-topic-ARN'
    message = 'This is a test message on topic.'
    subject = 'This is a message subject on topic.'

    logger.info(f'Publishing message to topic - {topic_arn}...')
    message_id = publish_message(topic_arn, message, subject)
    logger.info(
        f'Message published to topic - {topic_arn} with message Id - {message_id}.'
    )

Delete SNS topic

import logging
import boto3
from botocore.exceptions import ClientError

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sns_client = boto3.client('sns', region_name=AWS_REGION)


def delete_topic(topic_arn):
    """
    Delete a SNS topic.
    """
    try:
        response = sns_client.delete_topic(TopicArn=topic_arn)
    except ClientError:
        logger.exception(f'Could not delete a SNS topic.')
        raise
    else:
        return response


if __name__ == '__main__':

    topic_arn = 'your-topic-ARN'

    logger.info(f'Deleting a SNS topic...')
    delete_response = delete_topic(topic_arn)
    logger.info(f'Deleted a topic - {topic_arn} successfully.')

Create SNS topic subscription

import logging
import boto3
from botocore.exceptions import ClientError

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sns_client = boto3.client('sns', region_name=AWS_REGION)


def subscribe(topic, protocol, endpoint):
    """
    Subscribe to a topic using endpoint as email OR SMS
    """
    try:
        subscription = sns_client.subscribe(
            TopicArn=topic,
            Protocol=protocol,
            Endpoint=endpoint,
            ReturnSubscriptionArn=True)['SubscriptionArn']
    except ClientError:
        logger.exception(
            "Couldn't subscribe {protocol} {endpoint} to topic {topic}.")
        raise
    else:
        return subscription


if __name__ == '__main__':

    topic_arn = 'your-topic-ARN'
    protocol = 'email'
    endpoint = '[email protected]'

    logger.info('Subscribing to a SNS topic...')

    # Creates an email subscription
    response = subscribe(topic_arn, protocol, endpoint)

    logger.info(
        f'Subscribed to a topic successfully.\nSubscription Arn - {response}')

List SNS topic subscriptions

import logging
import boto3
from botocore.exceptions import ClientError
import json

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sns_client = boto3.client('sns', region_name=AWS_REGION)


def list_topic_subscriptions(topic_arn):
    """
    Lists SNS notification topic subscriptions using paginator.
    """
    try:

        paginator = sns_client.get_paginator('list_subscriptions_by_topic')

        # creating a PageIterator from the paginator
        page_iterator = paginator.paginate(TopicArn=topic_arn,
                                           PaginationConfig={'MaxItems': 100})

        topic_subscriptions = []

        # loop through each page from page_iterator
        for page in page_iterator:
            for subscription in page['Subscriptions']:
                topic_subscriptions.append(subscription)
    except ClientError:
        logger.exception(f'Could not list SNS topics.')
        raise
    else:
        return topic_subscriptions


if __name__ == '__main__':

    topic_arn = 'your-topic-ARN'

    logger.info(f'Listing SNS topic subscriptions...')

    topic_subscriptions = list_topic_subscriptions(topic_arn)

    for subscription in topic_subscriptions:
        logger.info(json.dumps(subscription, indent=4, sort_keys=True))

Delete SNS topic subscription

import logging
import boto3
from botocore.exceptions import ClientError

AWS_REGION = 'us-east-1'

# logger config
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s: %(levelname)s: %(message)s')

sns_client = boto3.client('sns', region_name=AWS_REGION)


def unsubscribe(subscription_arn):
    """
    Delete/ Unsubscribe to a topic subscription
    """
    try:
        unsubscribe = sns_client.unsubscribe(SubscriptionArn=subscription_arn)
    except ClientError:
        logger.exception("Couldn't unsubscribe to {subscription_arn}.")
        raise
    else:
        return unsubscribe


if __name__ == '__main__':

    subscription_arn = 'your-subscription-arn'

    logger.info('Unsubscribing to a SNS topic subscription...')

    response = unsubscribe(subscription_arn)

    logger.info(
        f'Successfully unsubscribed to a topic.\nSubscription Arn - {subscription_arn}'
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment