Skip to content

Instantly share code, notes, and snippets.

@jondkelley
Created September 19, 2018 02:04
Show Gist options
  • Save jondkelley/8d36341d1a66d92991a33d42ef1067eb to your computer and use it in GitHub Desktop.
Save jondkelley/8d36341d1a66d92991a33d42ef1067eb to your computer and use it in GitHub Desktop.
Example of how to add custom SMTP commands to an asyncio SMTP server (aiosmtpd) in Python 3 (no proxy or mailbox capability shown)
#!/usr/bin/env python3
# pip3 install aiosmtpd
from aiosmtpd.controller import Controller
from aiosmtpd.handlers import AsyncMessage
from aiosmtpd.smtp import SMTP, syntax
from asyncio import coroutine
import asyncio
import logging
import uuid
import logging
import aio_pika
import threading
# Version string; SMTPControllerOverload.factory() formats it into the SMTP ident banner.
__version__ = "0.0.1"
# Release-stage tag (identifier spelling kept as-is because the classes below reference it).
# SMTPOverload.smtp_TEST only serves the TEST command while this contains 'alpha';
# the upper-cased value is also formatted into the ident banner.
__metarealease__ = "alpha"
class SMTPOverload(SMTP):
    """
    SMTP protocol subclass that adds a home-brew ``TEST`` command.
    """

    @syntax('TEST value')
    async def smtp_TEST(self, value):
        """
        Handle the custom ``TEST`` command.

        Replies sent to the client, depending on ``value``:

        * release tag does not contain ``alpha``: ``502 Command not implemented``
        * missing / empty value: ``501 Syntax: TEST value``
        * ``.``: two-line ``211`` success reply
        * ``die``: two-line ``503`` reply, after which ``SystemExit(0)`` is raised
        * anything else: ``501 invalid test code``

        Every branch completes the reply itself, so no handler hook is
        consulted for this command.

        :param value: argument text that followed ``TEST`` (may be empty/None)
        :raises SystemExit: when ``value`` is ``die``
        """
        if 'alpha' not in __metarealease__:
            # RFC 5321 reserves 502 for "command not implemented"; 501 means
            # a syntax error in parameters, which is not what happened here.
            await self.push("502 Command not implemented")
            return
        if not value:
            await self.push('501 Syntax: TEST value')
            return

        prefix = "211"
        if value == ".":
            # Multi-line SMTP reply: every line but the last carries "NNN-",
            # the last carries "NNN ". Each line is pushed on its own so the
            # line terminator is whatever push() emits, not a bare "\n".
            await self.push("{prefix}-it works".format(prefix=prefix))
            await self.push("{prefix} congratulations".format(prefix=prefix))
        elif value == "die":
            logging.info("%s test die issued; simulating hard physical crash", prefix)
            await self.push("503-simulate hard process crash")
            await self.push("503 thanks for all the fish")
            # The exit() helper is injected by the site module and may be
            # absent; raising SystemExit directly has the same effect.
            # NOTE(review): whether this terminates the whole process depends
            # on which thread/loop runs this coroutine - confirm.
            raise SystemExit(0)
        else:
            await self.push('501 invalid test code')
class SMTPControllerOverload(Controller):
    """
    Controller variant that serves connections with SMTPOverload.
    """

    def factory(self):
        """
        Build the protocol object for the controller.

        Returns an SMTPOverload wired to this controller's handler and
        SMTPUTF8 setting, with an ident banner made from the module's
        version and upper-cased release tag.
        """
        banner = "SMTP Digester v{} ({})".format(
            __version__,
            __metarealease__.upper(),
        )
        return SMTPOverload(
            self.handler,
            enable_SMTPUTF8=self.enable_SMTPUTF8,
            ident=banner,
        )
class MessageOverload(AsyncMessage):
    """
    Message handler giving direct access to each incoming message and its
    envelope; override handle_message() to proxy, store or forward mail.
    """

    def __init__(self, message_class=None, *, loop=None):
        """
        :param message_class: forwarded unchanged to the parent constructor
        :param loop: event loop to remember on the instance; defaults to
            ``asyncio.get_event_loop()``
        """
        super().__init__(message_class)
        self.loop = loop or asyncio.get_event_loop()

    async def handle_DATA(self, server, session, envelope):
        """
        Tag the transaction with a short id, hand the prepared message to
        handle_message() and return the ``250`` reply carrying that id.

        :returns: ``'250 OK transaction defined t_uid=<8 chars>'``
        """
        t_uid = str(uuid.uuid4())[:8]
        # Still published on the instance because handle_message() reads it.
        self.t_uid = t_uid
        message = self.prepare_message(session, envelope)
        await self.handle_message(message, envelope)
        # Reply with the local copy: one handler instance is handed to every
        # protocol object, so self.t_uid may have been overwritten by another
        # transaction while handle_message() was awaited.
        return '250 OK transaction defined t_uid={}'.format(t_uid)

    async def handle_message(self, message, envelope):
        """
        Act on the incoming email; this implementation just prints the
        transaction id, the message and the envelope fields.
        """
        print(self.t_uid)
        print(message)
        print(envelope.mail_from)
        print(envelope.rcpt_tos)
        print(envelope.rcpt_options)
        print(envelope.smtp_utf8)
async def amain(loop):
    """
    Start the SMTP server side.

    Creates an SMTPControllerOverload around a fresh MessageOverload handler
    (hostname '' and port 8025) and starts it. The ``loop`` argument is
    accepted but not used here.
    """
    handler = MessageOverload()
    smtp_server = SMTPControllerOverload(handler, hostname='', port=8025)
    smtp_server.start()
async def bmain(loop):
    """
    AMQP connection client.

    Opens a robust connection, declares the auto-delete queue ``test_queue``
    and prints the body of every message received. Consumption stops at the
    first message whose decoded body contains the queue name. A RuntimeError
    raised while consuming is logged rather than propagated. The connection
    is closed on every exit path.

    :param loop: event loop passed through to ``aio_pika.connect_robust``
    """
    # NOTE(review): the host part of this URL ("[email protected]") looks like
    # e-mail-obfuscation residue from a web copy of this file - restore the
    # real broker credentials/address before running.
    connection = await aio_pika.connect_robust("amqp://guest:[email protected]:33323", loop=loop)
    queue_name = "test_queue"
    try:
        # Creating channel
        channel = await connection.channel()  # type: aio_pika.Channel
        # Declaring queue
        queue = await channel.declare_queue(queue_name, auto_delete=True)  # type: aio_pika.Queue
        async for message in queue:
            with message.process():
                print(message.body)
                if queue.name in message.body.decode():
                    break
    except RuntimeError:
        logging.info("AMQP server socket collapsed, messages may be lost in-transit")
    finally:
        # Release the broker connection whether we stopped normally, hit an
        # error, or were cancelled; previously it was never closed.
        await connection.close()
if __name__ == '__main__':
    # Script entry point: run the SMTP starter and the AMQP client on one loop.
    logging.basicConfig(level=logging.INFO)
    loop = asyncio.get_event_loop()
    # create_task() is the supported way to schedule coroutines; constructing
    # asyncio.Task objects by hand is discouraged by the asyncio docs.
    tasks = [loop.create_task(amain(loop=loop)), loop.create_task(bmain(loop=loop))]
    runner = asyncio.gather(*tasks)
    try:
        loop.run_until_complete(runner)
    except KeyboardInterrupt:
        # Ctrl-C stays a quiet exit, but pending tasks are cancelled and given
        # a chance to run their cleanup instead of being abandoned mid-await.
        runner.cancel()
        try:
            loop.run_until_complete(runner)
        except (asyncio.CancelledError, KeyboardInterrupt):
            pass
    finally:
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment