Skip to content

Instantly share code, notes, and snippets.

@anna-anisienia
Created May 24, 2021 10:12
Show Gist options
  • Save anna-anisienia/d2ce9152f2036cac64ee4ad437d8e2e7 to your computer and use it in GitHub Desktop.
Save anna-anisienia/d2ce9152f2036cac64ee4ad437d8e2e7 to your computer and use it in GitHub Desktop.
import json
import logging
import random
import uuid
from datetime import datetime, timezone

import boto3
# Configure root logging once for the script; records are rendered as
# "[LEVEL] [logger name] [timestamp]: message" at INFO level and above.
logging.basicConfig(
    level=logging.INFO,
    format="[%(levelname)s] [%(name)s] [%(asctime)s]: %(message)s",
)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# SNS client used by every call below, pinned to the eu-central-1 region.
sns = boto3.client("sns", region_name="eu-central-1")

# CREATE TOPIC
topic_name = "example_sns_topic"
create_response = sns.create_topic(Name=topic_name)
logger.info("Create topic response: %s", create_response)
# The ARN from the response identifies the topic in the subscribe and
# publish calls that follow.
topic_arn = create_response.get("TopicArn")
# CREATE SUBSCRIPTIONS
# One subscription per (protocol, endpoint) pair, created in the order listed.
# The endpoint strings are placeholders: replace them with a real e-mail
# address, phone number and SQS queue ARN before running.
email_sub, phone_sub, sqs_sub = (
    sns.subscribe(TopicArn=topic_arn, Protocol=protocol, Endpoint=endpoint)
    for protocol, endpoint in (
        ("email", "your_email"),
        ("sms", "your_phone_number"),
        ("sqs", "your_sqs_queue_arn"),
    )
)
# SEND MESSAGES
# Build the JSON payload separately so the publish call stays readable.
message = json.dumps(
    {
        "item": random.randint(1, 100000),
        "value": random.random(),
        # Timezone-aware UTC timestamp. datetime.utcnow() is deprecated since
        # Python 3.12 and returns a naive value, so the serialized string
        # carried no UTC offset; this one ends in "+00:00".
        "sent_timestamp": datetime.now(timezone.utc).isoformat(),
        "id": str(uuid.uuid4()),
    }
)
# Keep and log the response (as done for topic creation) so a run shows
# what the publish call returned.
publish_response = sns.publish(TopicArn=topic_arn, Message=message)
logger.info("Publish response: %s", publish_response)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment