# Source: GitHub Gist by @charlesrc019
# (gist ID 2e221723156ea9ddae266a5d4e569165), created October 16, 2025 00:22.
import boto3
import email
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from botocore.exceptions import ClientError
import logging
from email.utils import parseaddr, formataddr
import hashlib
# Configure logging.
# The root logger is used, with INFO enabled so the logger.info() calls made
# by the handler are emitted.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Set vars.
# Region passed to the SES client that sends the forwarded message.
REGION = 'us-west-2'
# Bucket the raw incoming message is read from (object key = message ID).
S3_BUCKET = 'forwarder.charleschristensen.co'
# Optional key prefix; when set, the object key is "<prefix>/<message ID>".
S3_PREFIX = None
# Destination used when the recipient has no entry in EMAIL_MAPPINGS.
EMAIL_DEFAULT = '[email protected]'
# Maps an incoming recipient address to the address its mail is forwarded to.
# NOTE(review): the handler indexes this dict with the lower-cased recipient,
# so keys should be written in lower case.
# NOTE(review): every key below is the same literal, so the dict collapses to
# a single entry at runtime. This looks like address redaction applied when
# the file was published -- restore the real addresses before deploying.
EMAIL_MAPPINGS = {
    '[email protected]': '[email protected]',
    '[email protected]': '[email protected]',
    '[email protected]': '[email protected]',
    '[email protected]': '[email protected]',
    '[email protected]': '[email protected]',
    '[email protected]': '[email protected]',
    '[email protected]': '[email protected]'
    # REMEMBER THAT RECIPIENT EMAILS IN HERE MUST BE VERIFIED IN SES!!!
}
def get_message_from_s3(message_id):
    """Fetch the raw bytes of a stored message from S3.

    The object is read from ``S3_BUCKET``. Its key is ``message_id``,
    preceded by ``S3_PREFIX`` and a slash when a prefix is configured.

    Args:
        message_id: Message ID, used as the S3 object key.

    Returns:
        The complete object body as ``bytes``.

    Raises:
        Any error raised by the S3 client (for example when the object does
        not exist) propagates to the caller; nothing is caught here.
    """
    object_path = f"{S3_PREFIX}/{message_id}" if S3_PREFIX else message_id
    client_s3 = boto3.client("s3")
    object_s3 = client_s3.get_object(
        Bucket=S3_BUCKET,
        Key=object_path,
    )
    body = object_s3['Body']
    try:
        return body.read()
    finally:
        # Always release the streaming body (and its HTTP connection), even
        # when the read fails part-way through.
        body.close()
def _decode_part_text(part):
    """Return a non-multipart part's body as ``str``, honouring its charset."""
    payload = part.get_payload(decode=True)
    if payload is None:
        return None
    charset = part.get_content_charset() or 'utf-8'
    try:
        return payload.decode(charset, errors='replace')
    except LookupError:
        # Unknown or misspelled charset label: fall back to UTF-8 rather than
        # failing the whole forward.
        return payload.decode('utf-8', errors='replace')


def send_raw_email(raw_message, source, destination):
    """Rebuild a received message and forward it through SES.

    The From address is rewritten to ``<hash>@forwarder.charleschristensen.co``
    (the hash is derived from the original sender address) while the original
    display name is kept. To, Subject, Date and Message-ID are copied from the
    original message.

    Body selection looks at the top-level parts only:
      * a ``text/html`` part is forwarded alone when present;
      * otherwise a ``text/plain`` part is forwarded alone;
      * otherwise every original part is forwarded unchanged.
    Other top-level parts are not forwarded in the first two cases.

    Args:
        raw_message: The original message as ``bytes``.
        source: Value used for Reply-To when the original message has no
            Reply-To header of its own (the caller passes the original From).
        destination: Address the rebuilt message is sent to.

    Returns:
        ``"Email Sent! Message ID: ..."`` on success, or ``"Error! ..."``
        carrying the SES error message when SES raises ``ClientError``.
    """
    mailobject = email.message_from_bytes(raw_message)

    # Parse original from header to extract display name and email.
    # The header may be missing (None) or a Header object; parseaddr needs str.
    original_from = mailobject.get('From')
    from_text = '' if original_from is None else str(original_from)
    display_name, email_addr = parseaddr(from_text)
    email_hash = hashlib.md5(email_addr.lower().encode('utf-8')).hexdigest()[:16].upper()
    email_sender = email_hash + "@forwarder.charleschristensen.co"

    # Create new from header with original display name and verified sender email.
    new_from = formataddr((display_name, email_sender)) if display_name else email_sender

    # Pick the preferred body among the top-level parts. Prioritize HTML,
    # use plain text only if there is no HTML.
    html_text = None
    plain_text = None
    is_multipart = mailobject.is_multipart()
    candidates = mailobject.get_payload() if is_multipart else [mailobject]
    for part in candidates:
        # A text file sent as an attachment is not the message body.
        if part.get_content_disposition() == 'attachment':
            continue
        content_type = part.get_content_type()
        if content_type == 'text/html':
            html_text = _decode_part_text(part)
        elif content_type == 'text/plain':
            plain_text = _decode_part_text(part)

    if html_text is not None:
        container_subtype = 'alternative'
        parts = [MIMEText(html_text, 'html', 'utf-8')]
    elif plain_text is not None:
        container_subtype = 'alternative'
        parts = [MIMEText(plain_text, 'plain', 'utf-8')]
    elif is_multipart:
        # Special case. Fallback to forwarding every original part. Keep the
        # original multipart subtype (e.g. "mixed") so that attachments are
        # not presented to mail clients as alternatives of the body.
        if mailobject.get_content_maintype() == 'multipart':
            container_subtype = mailobject.get_content_subtype()
        else:
            container_subtype = 'mixed'
        parts = list(mailobject.get_payload())
    else:
        # Single part that is neither HTML nor plain text: forward it as text
        # under its original subtype.
        container_subtype = 'alternative'
        fallback_text = _decode_part_text(mailobject) or ''
        parts = [MIMEText(fallback_text, mailobject.get_content_subtype(), 'utf-8')]

    new_message = MIMEMultipart(container_subtype)

    # Copy essential headers.
    for header in ['To', 'Subject', 'Date', 'Message-ID']:
        if mailobject.get(header):
            new_message[header] = mailobject.get(header)
    new_message.add_header('From', new_from)

    # Honour the sender's own Reply-To when there is one; otherwise replies go
    # to `source`. Skip the header entirely rather than emit an empty one.
    reply_to = mailobject.get('Reply-To') or source
    if reply_to:
        new_message['Reply-To'] = reply_to

    for part in parts:
        new_message.attach(part)

    modified_message = new_message.as_bytes()
    client_ses = boto3.client('ses', region_name=REGION)
    try:
        response = client_ses.send_raw_email(
            Source=email_sender,
            Destinations=[destination],
            RawMessage={
                'Data': modified_message
            }
        )
    except ClientError as e:
        return f"Error! {e.response['Error']['Message']}"
    return f"Email Sent! Message ID: {response['MessageId']}"
def lambda_handler(event, context):
    """Lambda entry point: forward one received message.

    Reads the message ID and the first destination address from
    ``event['Records'][0]['ses']['mail']``, loads the raw message from S3,
    resolves the forwarding address and sends the rebuilt message.

    Only the first record and the first destination address are handled.

    Args:
        event: Invocation event; must contain the keys described above.
        context: Lambda context object (unused).

    Returns:
        A dict with ``statusCode`` 200 and ``body`` set to the string returned
        by ``send_raw_email``. The status code is 200 even when that string
        reports an error.
    """
    # Get message details from SES event
    mail = event['Records'][0]['ses']['mail']
    message_id = mail['messageId']
    to_address = mail['destination'][0]

    # Log initial receipt
    logger.info(f"Received Message ID: {message_id} for {to_address}")

    # Retrieve the raw email from S3
    raw_email = get_message_from_s3(message_id)

    # Parse the original email to get the source
    mailobject = email.message_from_bytes(raw_email)
    original_from = mailobject.get('From')

    # Log the full message. Raw mail is not guaranteed to be valid UTF-8, so
    # undecodable bytes are replaced instead of aborting the forward.
    logger.info(f"From: {original_from}")
    logger.info(f"Raw Message: {raw_email.decode('utf-8', errors='replace')}")

    # Resolve the forwarding address. The membership test and the lookup both
    # use the lower-cased recipient, so mixed-case addresses match their
    # mapping and can never raise KeyError.
    forward_to = EMAIL_MAPPINGS.get(to_address.lower(), EMAIL_DEFAULT)

    # Forward the modified email
    result = send_raw_email(raw_email, original_from, forward_to)
    logger.info(result)
    return {
        'statusCode': 200,
        'body': result
    }
# (Gist page footer, not part of the script: sign in on GitHub to comment on this gist.)