Last active
January 26, 2018 20:11
-
-
Save tripliks/948739af2140af7875c2f6d8ce454e28 to your computer and use it in GitHub Desktop.
Set SNS subscription filters using Zappa post callback
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import json | |
import boto3 | |
logger = logging.getLogger(__name__) | |
sns_client = boto3.client('sns') | |
def get_subscription(topic_arn, lambda_arn): | |
try: | |
response = sns_client.list_subscriptions_by_topic(TopicArn=topic_arn) | |
logger.debug(response) | |
for subscription in response['Subscriptions']: | |
if subscription['Endpoint'] == lambda_arn: | |
return subscription | |
return None | |
except Exception: | |
logger.exception('Unable to find event source %s', topic_arn) | |
def set_subscription_filters(subscription_arn, filters): | |
try: | |
response = sns_client.set_subscription_attributes( | |
SubscriptionArn=subscription_arn, | |
AttributeName='FilterPolicy', | |
AttributeValue=json.dumps(filters) | |
) | |
logger.debug(response) | |
except Exception: | |
logger.exception('Unable to add filters for SNS topic %s', subscription_arn) | |
def set_filters(zappa_cli): | |
events = zappa_cli.stage_config.get('events', []) | |
for event in events: | |
event_source = event.get('event_source', None) | |
event_source_arn = event_source['arn'] | |
filters = event_source.get('filters', {}) # Get filters from event config using 'filters' key. | |
lambda_arn = zappa_cli.zappa.get_lambda_function(function_name=zappa_cli.lambda_name) | |
service = zappa_cli.zappa.service_from_arn(event_source_arn) | |
if not filters: | |
logger.warning('Filters not set. Skipping.') | |
continue | |
if 'sns' not in service: | |
logger.info('Not an SNS service %s' % service) | |
continue | |
subscription = get_subscription(event_source_arn, lambda_arn) | |
if not subscription: | |
logger.error('Subscription ARN not found.') | |
continue | |
set_subscription_filters(subscription['SubscriptionArn'], filters) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"dev": { | |
"callbacks": { | |
"post": "post_callback_sns_filters.set_filters" | |
}, | |
"events": [ | |
{ | |
"function": "<your-function-name>", | |
"event_source": { | |
"arn": "<your-sns-topic-arn>", | |
"filters": { | |
"attr1": ["filter1"], | |
"attr2": ["filter2", "filter3"] | |
}, | |
"events": [ | |
"sns:Publish" | |
] | |
} | |
} | |
] | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Awesome! I had to add an extra check for None inside set_filters...