Skip to content

Instantly share code, notes, and snippets.

@baxeico
Created May 10, 2017 13:18
Show Gist options
  • Save baxeico/d5d6f855774ba9f72e11909648a46cae to your computer and use it in GitHub Desktop.
Save baxeico/d5d6f855774ba9f72e11909648a46cae to your computer and use it in GitHub Desktop.
A simple Python email gateway
import smtpd
import asyncore
import logging
import email
from email.header import decode_header
import requests
logger = logging.getLogger(__name__)
class CustomSMTPServer(smtpd.SMTPServer):
def process_message(self, peer, mailfrom, rcpttos, data):
try:
logger.debug('Receiving message from: %s:%d' % peer)
logger.debug('Message addressed from: %s' % mailfrom)
logger.debug('Message addressed to: %s' % str(rcpttos))
msg = email.message_from_string(data)
subject = ''
for encoded_string, charset in decode_header(msg.get('Subject')):
try:
if charset is not None:
subject += encoded_string.decode(charset)
else:
subject += encoded_string
except:
logger.exception('Error reading part of subject: %s charset %s' %
(encoded_string, charset))
logger.debug('Subject: %s' % subject)
text_parts = []
attachments = {}
logger.debug('Headers: %s' % msg.items())
# YOU CAN DO SOME SECURITY CONTROLS HERE
if (not mailfrom.endswith("@example.com") or
not msg.get('Mail-Header') == 'expected value'):
raise Exception("Email not trusted")
# loop on the email parts
for part in msg.walk():
if part.get_content_maintype() == 'multipart':
continue
c_type = part.get_content_type()
c_disp = part.get('Content-Disposition')
# text parts will be appended to text_parts
if c_type == 'text/plain' and c_disp == None:
text_parts.append(part.get_payload(decode=True).strip())
# ignore html part
elif c_type == 'text/html':
continue
# attachments will be sent as files in the POST request
else:
filename = part.get_filename()
filecontent = part.get_payload(decode=True)
if filecontent is not None:
if filename is None:
filename = 'untitled'
attachments['file%d' % len(attachments)] = (filename,
filecontent)
body = '\n'.join(text_parts)
except:
logger.exception('Error reading incoming email')
else:
# this data will be sent as POST data
data = {
'subject': subject,
'body': body,
'from': mailfrom,
}
url = 'http://yourserver.example.com/email-receive/'
logger.debug('Sending data and files to %s' % url)
try:
r = requests.post(url, data=data, files=attachments)
r.raise_for_status()
except:
logger.exception('Error in server request')
logger.debug(data)
logger.debug(attachments)
else:
logger.debug('Response from server: %s %s' % (r.status_code, r.text))
return
if __name__ == '__main__':
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.setLevel(logging.DEBUG)
logger.addHandler(ch)
server = CustomSMTPServer(('111.111.111.111', 25), None) # use your public IP here
asyncore.loop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment