Skip to content

Instantly share code, notes, and snippets.

@twratl
Last active July 18, 2024 17:09
Show Gist options
  • Save twratl/7c2c45495c394a01ee0fddfc87ac51a2 to your computer and use it in GitHub Desktop.
Save twratl/7c2c45495c394a01ee0fddfc87ac51a2 to your computer and use it in GitHub Desktop.
Sample Python Lambda code for parsing emails delivered to S3 buckets via SES and then Lambda invocation from SES - untested
# content provided as part of thread here: https://www.reddit.com/r/aws/comments/cffmxr/how_do_you_strip_the_attachments_from_aws_sessns/
from email.parser import BytesParser
from email import policy
import os
# event will be JSON from SES incoming email rule - NOT an S3 PUT event
def lambda_handler(event, context):
    """Process an email that SES stored in S3 and extract its text bodies.

    The event must be the JSON produced by an SES incoming-email receipt
    rule (Lambda action) - NOT an S3 PUT event.  Only the first record is
    examined.

    Environment variables read:
        BUCKET: name of the S3 bucket holding the raw message.
        SES_S3_BUCKET_PREFIX: key prefix placed in front of the SES message
            id to form the object key.

    Args:
        event: SES invocation payload; ``event['Records'][0]['ses']`` must
            contain the ``mail`` and ``receipt`` dictionaries.
        context: Lambda context object (unused).

    Raises:
        Exception: if any of the spam, virus, SPF or DKIM verdicts is
            ``'FAIL'``.
        KeyError: if the event or the environment lacks an expected key.
    """
    try:
        ses_record = event['Records'][0]['ses']
        ses_mail = ses_record['mail']
        receipt = ses_record['receipt']
        message_id = ses_mail['messageId']
        print('Commencing processing for message {}'.format(message_id))

        statuses = [
            receipt[verdict]['status']
            for verdict in ('spamVerdict', 'virusVerdict', 'spfVerdict', 'dkimVerdict')
        ]
        if 'FAIL' in statuses:
            print('Email failed one of the security tests. Quitting.')
            raise Exception('Message failed to pass the appropriate security checks - ceasing processing of message.')

        # if we get here then we have passed all the tests so we can now process the message

        # Imported at function scope because the module's import block does
        # not bring boto3 into scope; without this the next lines raise
        # NameError.
        import boto3

        bucket_name = os.environ['BUCKET']
        object_key = os.environ['SES_S3_BUCKET_PREFIX'] + message_id
        bucket = boto3.resource('s3').Bucket(bucket_name)
        raw_email = bucket.Object(object_key).get()['Body'].read()
        msg = BytesParser(policy=policy.SMTP).parsebytes(raw_email)

        # Each body is '' when the message has no such part.
        plain = _extract_body_text(
            msg,
            'plain',
            'Incoming message does not have an plain text part - skipping this part.',
        )
        html = _extract_body_text(
            msg,
            'html',
            'Incoming message does not have an HTML part - skipping this part.',
        )

        # do some stuff with the plain and/or html parts
        # publish to SNS topic I assume...

        # TODO: delete the S3 object if you don't need it anymore, catching
        # whichever errors you care about.
    except Exception:
        # do whatever you need to do (alerting, cleanup, ...) before the
        # error is propagated back to Lambda.
        raise


def _extract_body_text(msg, subtype, missing_notice):
    """Return the decoded ``text/<subtype>`` body of *msg*, or '' if unavailable.

    Extraction is best-effort: a missing part or a decoding failure is
    reported with ``print`` and yields an empty string instead of aborting
    the whole invocation.

    Args:
        msg: message object parsed with a policy that provides ``get_body``.
        subtype: text subtype to look for, e.g. ``'plain'`` or ``'html'``.
        missing_notice: line printed when the message has no such part.
    """
    try:
        # preferencelist must be a real tuple; a bare string only works by
        # accident through substring matching.
        part = msg.get_body(preferencelist=(subtype,))
        if part is not None:
            return part.get_content()
    except Exception as exc:
        # Not a bare ``except:`` so SystemExit/KeyboardInterrupt still propagate.
        print('Could not read the text/{} part ({!r}) - skipping this part.'.format(subtype, exc))
        return ''
    print(missing_notice)
    return ''
@Quiita
Copy link

Quiita commented Jun 13, 2022

you save my day <3

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment