Created
May 10, 2017 13:18
-
-
Save baxeico/d5d6f855774ba9f72e11909648a46cae to your computer and use it in GitHub Desktop.
A simple Python email gateway
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import smtpd | |
import asyncore | |
import logging | |
import email | |
from email.header import decode_header | |
import requests | |
# Module-level logger; handlers and level are configured by the script entry
# point, so importing this module does not produce any output by itself.
logger = logging.getLogger(__name__)
class CustomSMTPServer(smtpd.SMTPServer):
    """SMTP server that forwards every trusted message to an HTTP endpoint.

    The decoded subject, the plain-text body and the envelope sender are sent
    as POST form fields; every other non-HTML part is uploaded as a file.
    """

    # Endpoint that receives the POST request. Override it in a subclass or
    # on the instance to point at your own server.
    forward_url = 'http://yourserver.example.com/email-receive/'

    # Seconds to wait for the endpoint. requests never times out by default,
    # and a stalled endpoint would block the single-threaded asyncore loop.
    request_timeout = 30

    def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
        """Parse one incoming message and forward it with an HTTP POST.

        Args:
            peer: Socket address of the connected client; the first two items
                are used as host and port.
            mailfrom: Envelope sender address.
            rcpttos: Envelope recipient addresses.
            data: Raw message, either text or bytes.
            **kwargs: Extra keyword arguments passed by Python 3 ``smtpd``
                (for example ``mail_options``); accepted and ignored.

        Returns:
            Always ``None``, so the client gets the default success reply
            even when the message is rejected or forwarding fails; failures
            are only logged.
        """
        try:
            # Index the address instead of formatting the whole tuple: an
            # address with more than two items (IPv6) would break '%s:%d'.
            logger.debug('Receiving message from: %s:%d' % (peer[0], peer[1]))
            logger.debug('Message addressed from: %s' % mailfrom)
            logger.debug('Message addressed to: %s' % str(rcpttos))

            msg = self._parse_message(data)
            subject = self._decode_subject(msg)
            logger.debug('Subject: %s' % subject)
            logger.debug('Headers: %s' % msg.items())

            # YOU CAN DO SOME SECURITY CONTROLS HERE
            if (not mailfrom.endswith("@example.com") or
                    msg.get('Mail-Header') != 'expected value'):
                raise Exception("Email not trusted")

            body, attachments = self._split_parts(msg)
        except Exception:
            logger.exception('Error reading incoming email')
            return None

        # this data will be sent as POST data
        form_fields = {
            'subject': subject,
            'body': body,
            'from': mailfrom,
        }
        url = self.forward_url
        logger.debug('Sending data and files to %s' % url)
        try:
            r = requests.post(url, data=form_fields, files=attachments,
                              timeout=self.request_timeout)
            r.raise_for_status()
        except Exception:
            logger.exception('Error in server request')
            logger.debug(form_fields)
            logger.debug(attachments)
        else:
            logger.debug('Response from server: %s %s' % (r.status_code, r.text))
        return None

    def _parse_message(self, data):
        """Build an email message object from text or bytes input."""
        if isinstance(data, bytes) and not isinstance(data, str):
            # Only true on Python 3, where bytes and str are distinct types.
            return email.message_from_bytes(data)
        return email.message_from_string(data)

    def _decode_subject(self, msg):
        """Return the Subject header of msg with encoded words decoded."""
        subject = ''
        # A missing header yields None, which would either crash
        # decode_header or be rendered as the literal text 'None',
        # depending on the Python version.
        raw_subject = msg.get('Subject') or ''
        for fragment, charset in decode_header(raw_subject):
            try:
                if charset is not None:
                    subject += fragment.decode(charset)
                else:
                    subject += self._to_text(fragment)
            except Exception:
                # Best effort: skip the unreadable fragment, keep the rest.
                logger.exception('Error reading part of subject: %s charset %s' %
                                 (fragment, charset))
        return subject

    def _split_parts(self, msg):
        """Return (body, attachments) extracted from the parts of msg.

        body joins all text/plain parts without a Content-Disposition header;
        attachments maps 'file0', 'file1', ... to (filename, content) tuples.
        """
        text_parts = []
        attachments = {}
        # loop on the email parts
        for part in msg.walk():
            if part.get_content_maintype() == 'multipart':
                continue
            c_type = part.get_content_type()
            c_disp = part.get('Content-Disposition')
            if c_type == 'text/plain' and c_disp is None:
                # text parts will be appended to text_parts
                raw_text = part.get_payload(decode=True)
                if raw_text is not None:
                    text_parts.append(self._to_text(
                        raw_text.strip(), part.get_content_charset()))
            elif c_type == 'text/html':
                # ignore html part
                continue
            else:
                # attachments will be sent as files in the POST request
                filename = part.get_filename()
                filecontent = part.get_payload(decode=True)
                if filecontent is not None:
                    if filename is None:
                        filename = 'untitled'
                    attachments['file%d' % len(attachments)] = (filename,
                                                                filecontent)
        return '\n'.join(text_parts), attachments

    def _to_text(self, raw, charset=None):
        """Return raw as a native string, decoding it only if it is not one.

        Native strings are returned untouched; anything else is decoded with
        charset (UTF-8 when missing or unknown), replacing invalid bytes.
        """
        if isinstance(raw, str):
            return raw
        try:
            return raw.decode(charset or 'utf-8', 'replace')
        except LookupError:
            # The message declared a charset label Python does not know.
            return raw.decode('utf-8', 'replace')
if __name__ == '__main__':
    # Send every record of this module, DEBUG and above, to a stream handler.
    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    console = logging.StreamHandler()
    console.setFormatter(logging.Formatter(log_format))
    console.setLevel(logging.DEBUG)
    logger.addHandler(console)
    logger.setLevel(logging.DEBUG)

    # use your public IP here
    listen_address = ('111.111.111.111', 25)
    server = CustomSMTPServer(listen_address, None)

    # Run the asyncore event loop that serves the SMTP connections.
    asyncore.loop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment