Skip to content

Instantly share code, notes, and snippets.

@jondkelley
Last active September 19, 2018 02:40
Show Gist options
  • Save jondkelley/b5ea2c8ee65a32d0d9f088a2038bd7ec to your computer and use it in GitHub Desktop.
Save jondkelley/b5ea2c8ee65a32d0d9f088a2038bd7ec to your computer and use it in GitHub Desktop.
Example of how to overload custom SMTP commands in asyncio smtp server in python3 along with an AMQP client
#!/usr/bin/env python3
# pip3 install aiosmtpd
from aiosmtpd.controller import Controller
from aiosmtpd.handlers import AsyncMessage
from aiosmtpd.smtp import SMTP, syntax
from asyncio import coroutine
import asyncio
import logging
import uuid
import logging
import aio_pika
import threading
import aiohttp
# Release number; embedded in the ident string handed to the SMTP server.
__version__ = "0.0.1"
# Release stage tag (the identifier is a misspelling of "metarelease", kept
# because other code in this file refers to it). The custom TEST command is
# only served while this contains 'alpha'.
__metarealease__ = "alpha"
class SMTPOverload(SMTP):
    """
    SMTP protocol subclass that adds a homebrew ``TEST`` command.
    """

    @syntax('TEST value')
    async def smtp_TEST(self, value):
        """
        Handle the custom ``TEST <value>`` SMTP command.

        The command is only served while the module-level
        ``__metarealease__`` contains 'alpha'. Recognised values:

        * ``.``      -- two-line 211 success reply
        * ``value``  -- 501, the caller sent the literal placeholder
        * ``die``    -- two-line 503 reply, then the process is asked to
                        exit by raising ``SystemExit(0)``
        * other      -- 501 invalid test code

        Every branch sends its own reply, so nothing is returned.
        """
        if 'alpha' not in __metarealease__:
            await self.push("501 Command not implemented")
            return
        if not value:
            await self.push('501 Syntax: TEST value')
            return

        prefix = "211"
        if value == ".":
            # Multi-line SMTP reply: every line but the last is "code-text",
            # the last is "code text". Each line is pushed on its own so it
            # is terminated individually instead of embedding a bare "\n".
            await self.push("{prefix}-it works".format(prefix=prefix))
            await self.push("{prefix} congratulations".format(prefix=prefix))
        elif value == "value":
            await self.push("501 LOL nice try though")
        elif value == "die":
            logging.info(
                "%s test die issued; simulating hard physical crash", prefix)
            await self.push("503-simulate hard process crash")
            await self.push("503 thanks for all the fish")
            # SystemExit is raised directly rather than through the
            # site-provided exit() helper, which is not always installed.
            raise SystemExit(0)
        else:
            await self.push('501 invalid test code')
class SMTPControllerOverload(Controller):
    """
    controller variant whose factory hands out SMTPOverload instances
    """

    def factory(self):
        """
        build the SMTPOverload protocol object with a custom ident= string
        made from the module version and the upper-cased release stage
        """
        banner = "SMTP Digester v{} ({})".format(
            __version__, __metarealease__.upper())
        return SMTPOverload(
            self.handler,
            enable_SMTPUTF8=self.enable_SMTPUTF8,
            ident=banner,
        )
class MessageOverload(AsyncMessage):
    """
    message handler exposing the envelope and parsed message of every
    incoming mail, so it can be proxied, stored or forwarded
    """

    def __init__(self, message_class=None, *, loop=None):
        super().__init__(message_class)
        # fall back to the current event loop when none (or a falsy one) is given
        if not loop:
            loop = asyncio.get_event_loop()
        self.loop = loop

    async def handle_DATA(self, server, session, envelope):
        """
        tag the transaction with a short id, process the message and
        answer with a 250 reply carrying that id
        """
        # first 8 hex digits of a random UUID
        self.t_uid = uuid.uuid4().hex[:8]
        parsed = self.prepare_message(session, envelope)
        await self.handle_message(parsed, envelope)
        return '250 OK transaction defined t_uid={}'.format(self.t_uid)

    async def handle_message(self, message, envelope):
        """
        act on the incoming email; here every element is just printed
        """
        print(self.t_uid)
        print(message)
        # attributes are read one at a time, right before each is printed
        for field in ("mail_from", "rcpt_tos", "rcpt_options", "smtp_utf8"):
            print(getattr(envelope, field))
async def amain(loop):
    """
    smtp server: start a controller with hostname '' on port 8025

    the loop argument is accepted but not used here
    """
    handler = MessageOverload()
    SMTPControllerOverload(handler, hostname='', port=8025).start()
async def bmain(loop):
    """
    amqp connection client

    Connects to the broker on the given loop, declares the auto-delete
    queue "test_queue" and prints the body of every message received.
    Consumption stops once a message body contains the queue name.
    A RuntimeError raised while consuming is logged and swallowed.
    The connection is closed on every exit path.
    """
    logging.info("An AMQP client has some blockig")
    # NOTE(review): the credentials/host part of this URL looks mangled
    # ("[email protected]") -- confirm the intended broker address.
    connection = await aio_pika.connect_robust("amqp://guest:[email protected]:33323", loop=loop)
    queue_name = "test_queue"
    try:
        # Creating channel
        channel = await connection.channel()  # type: aio_pika.Channel
        # Declaring queue
        queue = await channel.declare_queue(queue_name, auto_delete=True)  # type: aio_pika.Queue
        async for message in queue:
            with message.process():
                print(message.body)
                if queue.name in message.body.decode():
                    break
    except RuntimeError:
        logging.info("AMQP server socket collapsed, messages may be lost in-transit")
    finally:
        # release the broker connection whether we finished, failed or were cancelled
        await connection.close()
async def cmain(x, y):
    """
    Background counter: print 0..89999999, sleeping one second after each.

    Written as a native coroutine so that the sleep yields to the event
    loop and the other tasks keep running. The ``x`` and ``y`` arguments
    are accepted but not used.
    """
    logging.info("Testing out some other blocking work")
    for i in range(90000000):
        print(i)
        await asyncio.sleep(1.0)
async def fetch(session, url):
    """
    GET url through the given session (verify_ssl=False) and hand back
    the response body as text
    """
    async with session.get(url, verify_ssl=False) as resp:
        body = await resp.text()
        return body
async def dmain(url):
    """
    url polling test: fetch the same url 99999999 times in one client
    session, printing the iteration number and then the body each time
    """
    async with aiohttp.ClientSession() as http:
        attempt = 0
        while attempt < 99999999:
            print(attempt)
            page = await fetch(http, url)
            print(page)
            attempt += 1
if __name__ == '__main__':
    # Script entry point: run the SMTP server starter, the AMQP client, the
    # counter and the HTTP poller side by side on one event loop.
    logging.basicConfig(level=logging.INFO)
    # Create and install the loop explicitly instead of relying on
    # get_event_loop() to create one implicitly.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # create_task binds every task to this loop; instantiating asyncio.Task
    # directly is discouraged.
    tasks = [
        loop.create_task(amain(loop=loop)),
        loop.create_task(bmain(loop=loop)),
        loop.create_task(cmain(1, 1)),
        loop.create_task(dmain(url='http://192.168.0.1')),
    ]
    runner = asyncio.gather(*tasks)
    try:
        loop.run_until_complete(runner)
    except KeyboardInterrupt:
        # Ctrl-C: cancel whatever is still running and let each task unwind
        # (running its cleanup) before the loop goes away.
        for task in tasks:
            task.cancel()
        loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
        # Mark the first gather's outcome as retrieved so it is not reported
        # as an unhandled exception at shutdown.
        if runner.done() and not runner.cancelled():
            runner.exception()
    finally:
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment