Last active
October 14, 2022 17:48
-
-
Save apeyroux/0ddf6f0bf86666ac0df4d44d46dcd53d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from email.message import EmailMessage | |
import configparser | |
from random import random | |
import sys | |
from abc import ABC, abstractmethod | |
class MdaOptions: | |
pass | |
class MdaModuleHandler(ABC): | |
@abstractmethod | |
def set_next(self, handler): | |
pass | |
@abstractmethod | |
def handle(self, msg: EmailMessage) -> EmailMessage: | |
pass | |
class MdaModule(MdaModuleHandler): | |
confparser = None | |
log = None | |
name = None | |
options = None | |
timeout = None | |
_next_handler: MdaModuleHandler = None | |
def __init__(self, name: str, options: MdaOptions, confparser: configparser.ConfigParser): | |
try: | |
self.name = name | |
self.confparser = confparser | |
self.options = options | |
self.timeout = 60 | |
# self.timeout: int = int(confparser.get( | |
# 'DEFAULT', 'timeout') | |
# ) if confparser.has_option('DEFAULT', 'timeout') else 60 | |
except Exception: | |
# todo: à revoir mais actuellement dans le core | |
print('error', f'{self.name} - {sys.exc_info()[0]}') | |
raise | |
def set_next(self, handler: MdaModuleHandler) -> MdaModuleHandler: | |
self._next_handler = handler | |
return handler | |
def handle(self, mail: EmailMessage) -> EmailMessage: | |
if self._next_handler: | |
return self._next_handler.handle(mail) | |
class Remise(): | |
def __init__(self, name, options, confparser, mail: EmailMessage): | |
self._observers = [] # Liste des modules qui observent la remise | |
# Code de retour de la remise. Devrait etre le retour du LMTP/SMTP | |
self.code_retour = None | |
self.confparser = confparser | |
self.mail = mail | |
""" Initialisation des classes MFC""" | |
#self.transv = Transverse(confparser, self.log) | |
#self.fonct = Fonctionnel(self.transv, confparser, self.log) | |
def notify_all(self): | |
for module in self._observers: | |
module.update(self) | |
def attach(self, module: MdaModule): | |
if module not in self._observers: | |
print("{} à l'écoute de Remise".format(module.name)) | |
self._observers.append(module) | |
def detach(self, module): | |
try: | |
self._observers.remove(module) | |
except ValueError: | |
pass | |
def sendMessage(self): | |
# envoyer le message comme dans l'ancien code | |
# puis faire un notify_all | |
print("[REMISE] J'ai envoyé le message {} vers le Cyrus".format( | |
self.mail.get('Message-ID'))) | |
self.code_retour = [250, 400, 500][int(random() * 3)] | |
self.notify_all() | |
class Module1(MdaModule): | |
def __init__(self, name: str, options: MdaOptions, confparser: configparser.ConfigParser): | |
super().__init__(name, options, confparser) | |
def update(self, remise: Remise): | |
print(f'[{self.name}] Remise m\'informe de la délivrance de {remise.mail["Message-Id"]} avec le code {remise.code_retour}') | |
def handle(self, mail: EmailMessage) -> EmailMessage: | |
print(f'[{self.name}] - Application du module au mail ...') | |
mail.add_header('X-Module1', 'Module 1') | |
super().handle(mail) | |
return mail | |
class Module2(MdaModule): | |
def __init__(self, name: str, options: MdaOptions, confparser: configparser.ConfigParser): | |
super().__init__(name, options, confparser) | |
def update(self, remise: Remise): | |
print(f'[{self.name}] Remise m\'informe de la délivrance de {remise.mail["Message-Id"]} avec le code {remise.code_retour}') | |
def handle(self, mail: EmailMessage) -> EmailMessage: | |
print(f'[{self.name}] - Application du module au mail ...') | |
mail.add_header('X-Module2', 'Module 2') | |
super().handle(mail) | |
return mail | |
class Module3(MdaModule): | |
def __init__(self, name: str, options: MdaOptions, confparser: configparser.ConfigParser): | |
super().__init__(name, options, confparser) | |
def update(self, remise: Remise): | |
if remise.code_retour == 500: | |
print( | |
f'[{self.name}] Je ne peux pas envoyer mon message d\'absence, code retour de remise {remise.code_retour}') | |
else: | |
print(f'[{self.name}] Update de remise, je peux envoyer mon message d\'absence, code retour de remise {remise.code_retour}') | |
def handle(self, mail: EmailMessage) -> EmailMessage: | |
print(f'[{self.name}] - Application du module au mail ...') | |
mail.set_content(f'{mail.get_body()}\n---\nModule 3') | |
super().handle(mail) | |
return mail | |
if __name__ == '__main__': | |
""" Simulation d'un mail qui arrive d'en MFC """ | |
msg = EmailMessage() | |
msg['Subject'] = 'Test' | |
msg['From'] = '[email protected]' | |
msg['To'] = '[email protected]' | |
msg['Message-ID'] = '123456789' | |
msg.set_content('Test') | |
""" Création des modules """ | |
modules = [Module1('module1', MdaOptions(), None), | |
Module2('module2', MdaOptions(), None), | |
Module3('module3', MdaOptions(), None)] | |
""" Mise en place de l'observateur """ | |
remise = Remise('remise', MdaOptions(), None, msg) | |
for m in modules: | |
remise.attach(m) | |
""" Création du pipeline """ | |
for i in range(len(modules) - 1): | |
modules[i].set_next(modules[i + 1]) | |
""" Exécution du pipeline """ | |
msg = modules[0].handle(msg) | |
""" Affichage du résultat """ | |
print("=======================================") | |
print("Fin du pipe, je vais envoyer :") | |
print(msg) | |
print("=======================================") | |
print("") | |
""" Remise du mail """ | |
""" Les modules sont notifiés """ | |
remise.sendMessage() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Ce qui donne,