Last active
September 19, 2018 02:40
-
-
Save jondkelley/b5ea2c8ee65a32d0d9f088a2038bd7ec to your computer and use it in GitHub Desktop.
Example of how to overload custom SMTP commands in asyncio smtp server in python3 along with an AMQP client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# pip3 install aiosmtpd | |
from aiosmtpd.controller import Controller | |
from aiosmtpd.handlers import AsyncMessage | |
from aiosmtpd.smtp import SMTP, syntax | |
from asyncio import coroutine | |
import asyncio | |
import logging | |
import uuid | |
import logging | |
import aio_pika | |
import threading | |
import aiohttp | |
__version__ = "0.0.1" | |
__metarealease__ = "alpha" | |
class SMTPOverload(SMTP):
    """
    SMTP protocol subclass that adds a homebrew ``TEST`` command.
    """

    @syntax('TEST value')
    async def smtp_TEST(self, value):
        """
        Handle the custom ``TEST <value>`` SMTP command.

        Every branch sends its own reply with ``self.push``:

        * release tag does not contain ``alpha`` -> ``501 Command not implemented``
        * empty/missing ``value``                -> ``501 Syntax: TEST value``
        * ``.``       -> two-line ``211`` success reply
        * ``value``   -> ``501 LOL nice try though``
        * ``die``     -> two-line ``503`` reply, then the process exits (status 0)
        * anything else -> ``501 invalid test code``
        """
        if 'alpha' not in __metarealease__:
            await self.push("501 Command not implemented")
            return
        if not value:
            await self.push('501 Syntax: TEST value')
            return

        prefix = "211"
        if value == ".":
            # Multi-line SMTP reply: every line except the last carries
            # "<code>-", the last one "<code> ".  Each push() call sends
            # one reply line.
            await self.push("{prefix}-it works".format(prefix=prefix))
            await self.push("{prefix} congratulations".format(prefix=prefix))
        elif value == "value":
            await self.push("501 LOL nice try though")
        elif value == "die":
            logging.info("{prefix} test die issued; simulating hard physical crash".format(prefix=prefix))
            await self.push("503-simulate hard process crash")
            await self.push("503 thanks for all the fish")
            # Same effect as exit(0), without depending on the `site`
            # module's interactive helper being installed.
            raise SystemExit(0)
        else:
            await self.push('501 invalid test code')
class SMTPControllerOverload(Controller):
    """
    Controller subclass whose factory produces the customised SMTP protocol.
    """

    def factory(self):
        """
        Build the ``SMTPOverload`` protocol object with a custom ``ident=``
        banner made from the module version and the upper-cased release tag.
        """
        banner = "SMTP Digester v{} ({})".format(
            __version__, __metarealease__.upper())
        return SMTPOverload(
            self.handler,
            enable_SMTPUTF8=self.enable_SMTPUTF8,
            ident=banner,
        )
class MessageOverload(AsyncMessage):
    """
    Message handler giving direct access to the envelope and incoming data,
    so the mail can be proxied, stored or forwarded.
    """

    def __init__(self, message_class=None, *, loop=None):
        """
        Initialise the parent handler with ``message_class`` and remember the
        event loop (the given one, or the current event loop when none is
        supplied).
        """
        super().__init__(message_class)
        if not loop:
            loop = asyncio.get_event_loop()
        self.loop = loop

    async def handle_DATA(self, server, session, envelope):
        """
        Tag the transaction with a fresh 8-character id, hand the prepared
        message to ``handle_message`` and return the ``250`` reply carrying
        that id.
        """
        # The first 8 characters of a UUID's string form are the first
        # 8 hex digits.
        self.t_uid = uuid.uuid4().hex[:8]
        prepared = self.prepare_message(session, envelope)
        await self.handle_message(prepared, envelope)
        return '250 OK transaction defined t_uid={}'.format(self.t_uid)

    async def handle_message(self, message, envelope):
        """
        Act on the incoming email; here: print the transaction id, the
        message and the envelope fields, one per line.
        """
        details = (
            self.t_uid,
            message,
            envelope.mail_from,
            envelope.rcpt_tos,
            envelope.rcpt_options,
            envelope.smtp_utf8,
        )
        for item in details:
            print(item)
async def amain(loop):
    """
    Start the SMTP server.

    Wraps a ``MessageOverload`` handler in the overloaded controller
    (empty hostname, port 8025) and starts it.  The ``loop`` argument is
    accepted but not used in this function.
    """
    handler = MessageOverload()
    server = SMTPControllerOverload(handler, hostname='', port=8025)
    server.start()
async def bmain(loop):
    """
    AMQP connection client.

    Connects to the broker, declares the auto-delete queue ``test_queue``
    and prints the body of each message received.  Consumption stops once
    a message body contains the queue's name.  A ``RuntimeError`` raised
    while consuming is logged and swallowed.  The connection is closed on
    every exit path.

    :param loop: event loop handed to ``aio_pika.connect_robust``.
    """
    logging.info("An AMQP client has some blockig")
    # NOTE(review): the credentials/host part of this URL looks garbled
    # ("[email protected]"); confirm the intended broker address.
    connection = await aio_pika.connect_robust("amqp://guest:[email protected]:33323", loop=loop)
    queue_name = "test_queue"
    try:
        # Creating channel
        channel = await connection.channel()  # type: aio_pika.Channel
        # Declaring queue
        queue = await channel.declare_queue(queue_name, auto_delete=True)  # type: aio_pika.Queue
        async for message in queue:
            # NOTE(review): newer aio_pika releases expect
            # `async with message.process()` - confirm against the
            # installed version.
            with message.process():
                print(message.body)
                if queue.name in message.body.decode():
                    break
    except RuntimeError:
        logging.info("AMQP server socket collapsed, messages may be lost in-transit")
    finally:
        # Release the broker connection whether we stopped normally, hit an
        # error, or were cancelled.
        await connection.close()
async def cmain(x, y):
    """
    Counter loop used to exercise cooperative scheduling.

    Prints the integers 0..89999999, sleeping one second after each so the
    other tasks on the event loop get to run.  ``x`` and ``y`` are accepted
    but not used.
    """
    logging.info("Testing out some other blocking work")
    for i in range(90000000):
        print(i)
        # Native await instead of a generator-based `yield from`, matching
        # the other coroutines in this file.
        await asyncio.sleep(1.0)
async def fetch(session, url):
    """
    GET ``url`` through ``session`` (certificate verification disabled)
    and return the response body as text.
    """
    request = session.get(url, verify_ssl=False)
    async with request as reply:
        body = await reply.text()
    return body
async def dmain(url):
    """
    Repeated-download test: inside one HTTP client session, print a running
    counter, fetch ``url`` and print the page, 99999999 times over.
    """
    async with aiohttp.ClientSession() as http:
        counter = 0
        while counter < 99999999:
            print(counter)
            page = await fetch(http, url)
            print(page)
            counter += 1
if __name__ == '__main__':
    # Script entry point: run the SMTP server, the AMQP client, the counter
    # and the HTTP poller side by side on one event loop.
    logging.basicConfig(level=logging.INFO)
    loop = asyncio.get_event_loop()
    # Schedule through the loop instead of instantiating asyncio.Task
    # directly, so every task is explicitly bound to `loop`.
    tasks = [
        loop.create_task(amain(loop=loop)),
        loop.create_task(bmain(loop=loop)),
        loop.create_task(cmain(1, 1)),
        loop.create_task(dmain(url='http://192.168.0.1')),
    ]
    try:
        loop.run_until_complete(asyncio.gather(*tasks))
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the demo.
        pass
    finally:
        # Cancel whatever is still running and let it unwind (cancel() is a
        # no-op on finished tasks), then release the loop.
        for task in tasks:
            task.cancel()
        loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
        loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment