Created
November 8, 2024 19:53
-
-
Save GGontijo/ebf6b24ca91d505eb32b1e9b56ecd5a7 to your computer and use it in GitHub Desktop.
design
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from abc import ABC, abstractmethod | |
from enum import Enum | |
import json | |
from typing import Callable | |
import pika | |
import time | |
import logging | |
from pydantic import BaseModel | |
from contextlib import contextmanager | |
class Task(BaseModel):
    """Pydantic model describing a single task by identifier and name."""

    # Identifier of the task (string).
    id: str
    # Name of the task (string).
    name: str
class TaskState(Enum):
    """Enumeration of the states a task can be reported in."""

    # Values are explicit integers; keep them stable, other code may persist them.
    PENDING = 1
    RUNNING = 2
    SUCCESS = 3
    FAILED = 4
class RabbitMQHandler(logging.Handler):
    """Logging handler that publishes formatted records to a RabbitMQ queue.

    A blocking connection is opened when the handler is created. ``emit``
    re-establishes the connection when either the connection or its channel
    is no longer open.
    """

    def __init__(self, user: str, password: str, log_queue='logs', host='', port=5672):
        """Store the connection settings and connect to the broker.

        Args:
            user: RabbitMQ user name.
            password: RabbitMQ password.
            log_queue: Name of the queue that receives the log records.
            host: Broker host name.
            port: Broker port.

        Raises:
            Any exception raised by pika while connecting; it is not caught
            here, so an unreachable broker fails the construction.
        """
        super().__init__()
        self.user = user
        self.password = password
        self.port = port
        self.log_queue = log_queue
        self.host = host
        self.connection = None
        self.channel = None
        self._connect()

    def _connect(self):
        """Establish a connection to RabbitMQ and declare the log queue."""
        credentials = pika.PlainCredentials(self.user, self.password)
        parameters = pika.ConnectionParameters(host=self.host, credentials=credentials, port=self.port)
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=self.log_queue)

    def _is_connected(self) -> bool:
        """Return True when both the connection and the channel are open."""
        return (
            self.connection is not None
            and self.connection.is_open
            and self.channel is not None
            and self.channel.is_open
        )

    def _disconnect(self):
        """Drop the current connection, closing it only if it is still open."""
        connection = self.connection
        # Forget the references first so a failing close() cannot leave a
        # half-dead connection behind for the next emit().
        self.connection = None
        self.channel = None
        # Closing an already closed connection is an error in pika, so guard it.
        if connection is not None and connection.is_open:
            connection.close()

    def emit(self, record):
        """Send the formatted log record to the RabbitMQ queue.

        Connection or publish failures never propagate to the code that is
        logging; they are reported through ``logging.Handler.handleError``.
        """
        try:
            if not self._is_connected():
                # Release whatever is left of the old connection before
                # opening a new one, otherwise it would leak.
                self._disconnect()
                self._connect()
            log_entry = self.format(record)
            self.channel.basic_publish(exchange='', routing_key=self.log_queue, body=log_entry)
        except Exception:
            self.handleError(record)

    def close(self):
        """Close the RabbitMQ connection on shutdown."""
        try:
            self._disconnect()
        finally:
            # Always let the base class finish its own clean-up, even when
            # closing the connection failed.
            super().close()
class BotFramework:
    """Core services shared by bots: logging, authentication and task queues."""

    def __init__(self, client_id=None, client_secret=None, *, rabbitmq_host='',
                 rabbitmq_port=5672, rabbitmq_user='', rabbitmq_password=''):
        """Configure logging and authenticate.

        Args:
            client_id: Client identifier forwarded to ``authenticate``.
            client_secret: Client secret forwarded to ``authenticate``.
            rabbitmq_host: Broker host used for logging and task queues.
            rabbitmq_port: Broker port.
            rabbitmq_user: Broker user name.
            rabbitmq_password: Broker password.

        Raises:
            RuntimeError: If ``authenticate`` reports a failure.
        """
        self.rabbitmq_host = rabbitmq_host
        self.rabbitmq_port = rabbitmq_port
        self.rabbitmq_user = rabbitmq_user
        self.rabbitmq_password = rabbitmq_password
        self.task = None

        self.logger = logging.getLogger('BotFramework')
        self.logger.setLevel(logging.INFO)
        # Named loggers are shared process-wide: attach the RabbitMQ handler
        # only once, otherwise every new instance duplicates each log line.
        if not any(isinstance(handler, RabbitMQHandler) for handler in self.logger.handlers):
            rabbitmq_handler = RabbitMQHandler(
                log_queue='logs',
                user=rabbitmq_user,
                password=rabbitmq_password,
                host=rabbitmq_host,
                port=rabbitmq_port,
            )
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            rabbitmq_handler.setFormatter(formatter)
            self.logger.addHandler(rabbitmq_handler)

        # Use the credentials the caller supplied rather than fixed test values.
        if not self.authenticate(client_id=client_id, client_secret=client_secret):
            raise RuntimeError('Failed to authenticate with Bot Framework')

    def authenticate(self, client_id: str, client_secret: str) -> bool:
        """Authenticate with the Bot Framework.

        Placeholder: no request is sent yet and every call reports success.
        """
        # TODO: request a client-credentials token from
        # https://login.microsoftonline.com/common/oauth2/v2.0/token with scope
        # https://api.botframework.com/.default and return whether it succeeded.
        return True

    def get_credentials(self, credential_name: str) -> dict:
        """Return the credentials stored under *credential_name*.

        Placeholder: always returns an empty dict.
        """
        return {}

    def set_task_state(self, task: Task, state: TaskState) -> None:
        """Record *state* for *task*; currently this only writes a log entry."""
        self.logger.info("Setting task %s state to %s", task.id, state)

    def _callback(self, ch, method, properties, body) -> None:
        """Consumer callback: parse one message into a Task and stop consuming.

        The message is acknowledged only after it has been parsed, so a
        malformed payload raises before the ack is sent.
        """
        body_data = json.loads(body)
        task = Task(**body_data)
        ch.basic_ack(delivery_tag=method.delivery_tag)
        self.task = task
        ch.stop_consuming()

    def _receive_task(self, queue_name: str, retries: int, retry_delay: float):
        """Block until one task arrives on *queue_name* and return it.

        Connection failures are retried up to *retries* times. Any other
        error is logged and re-raised.
        """
        self.task = None
        for attempt in range(1, retries + 1):
            try:
                self.logger.info(f"Connecting to RabbitMQ... Attempt {attempt}")
                param = pika.ConnectionParameters(
                    host=self.rabbitmq_host,
                    port=self.rabbitmq_port,
                    credentials=pika.PlainCredentials(self.rabbitmq_user, self.rabbitmq_password),
                )
                with pika.BlockingConnection(param) as connection:
                    channel = connection.channel()
                    channel.queue_declare(queue=queue_name)
                    channel.basic_consume(queue=queue_name, on_message_callback=self._callback, auto_ack=False)
                    self.logger.info("Waiting for a message from the queue...")
                    channel.start_consuming()
                return self.task
            except pika.exceptions.AMQPConnectionError as e:
                if attempt < retries:
                    self.logger.error(f"Connection failed: {e}. Retrying in {retry_delay} seconds...")
                    time.sleep(retry_delay)
                else:
                    self.logger.error(f"Connection failed: {e}.")
            except Exception as e:
                # Retrying an unknown failure without a limit would spin
                # forever, so report it and let the caller decide.
                self.logger.exception(f"Unexpected error occurred: {e}")
                raise
        self.logger.error("Max retries exceeded. Could not connect to RabbitMQ.")
        raise RuntimeError("Max retries exceeded. Could not connect to RabbitMQ.")

    @contextmanager
    def wait_for_queue(self, queue_name: str, retries=5, retry_delay=5,
                       on_start: Callable = lambda: None, on_finish: Callable = lambda: None):
        """Wait for one task on *queue_name* and hand it to the ``with`` body.

        Args:
            queue_name: Queue to consume from.
            retries: Maximum number of connection attempts.
            retry_delay: Seconds to sleep between connection attempts.
            on_start: Called after a task was received, before the body runs.
            on_finish: Called after the body completed without raising.

        Yields:
            The received Task.

        Raises:
            RuntimeError: If the broker could not be reached within *retries*
                attempts.
        """
        # The message is acknowledged inside the callback, so the broker
        # connection is released before the (possibly long) body runs. The
        # yield sits outside any try/except so errors raised by the body
        # propagate to the caller unchanged.
        task = self._receive_task(queue_name, retries, retry_delay)
        if on_start:
            on_start()
        yield task
        if on_finish:
            on_finish()
class BotInterface(ABC):
    """Abstract base class for bots; subclasses must provide ``run``."""

    def __init__(self):
        """Do nothing; the interface itself holds no state."""

    @abstractmethod
    def run(self):
        """Run the bot. Concrete subclasses override this method."""
class TrocaNotacargue(BotInterface):
    """Bot that processes tasks taken from the ``troca_nota_cargue`` queue."""

    def __init__(self, core: BotFramework):
        """Keep a reference to the framework core.

        Args:
            core: Framework instance providing the logger, the task queue and
                task-state reporting.
        """
        super().__init__()
        # Load the parameters.
        self.core = core
        # Task currently being processed; set by run() for each message so the
        # success callback knows which task to report.
        self.task = None

    def busca_dados_cargue(self):
        """Placeholder step; currently only writes a test entry at INFO level."""
        self.core.logger.info('teste')

    def processa_dados_cargue(self):
        """Placeholder step; currently only writes test entries (WARNING, DEBUG)."""
        self.core.logger.warning('teste')
        self.core.logger.debug('teste')

    def cria_fila_sap(self):
        """Placeholder step; currently only writes a test entry at ERROR level."""
        self.core.logger.error('teste')

    def atualizar_status_sucesso(self):
        """Report the current task as successfully finished."""
        self.core.set_task_state(task=self.task, state=TaskState.SUCCESS)

    def run(self):
        """Process tasks from the queue forever, one at a time."""
        while True:
            with self.core.wait_for_queue('troca_nota_cargue', on_finish=self.atualizar_status_sucesso) as task:
                # Remember the task so atualizar_status_sucesso can report it.
                self.task = task
                self.busca_dados_cargue()
                self.processa_dados_cargue()
                self.cria_fila_sap()
if __name__ == '__main__':
    # Script entry point: build the framework core and run the bot.
    core = BotFramework(client_id='client_id', client_secret='client_secret')
    # The bot takes only the core; it logs through core.logger, so no separate
    # logger argument is passed.
    bot = TrocaNotacargue(core=core)
    # run() loops forever, so nothing after this call would execute.
    bot.run()
import requests | |
import zipfile | |
import os | |
import shutil | |
class FrameworkUpdater:
    """Check a remote manifest for a newer framework version and install it."""

    # Seconds to wait for the update server before giving up.
    REQUEST_TIMEOUT = 30

    def __init__(self, current_version, update_url, download_url):
        """Store the installed version and the update endpoints.

        Args:
            current_version: Installed version, e.g. ``"1.0.0"``.
            update_url: URL of a JSON document with a ``"version"`` key.
            download_url: URL of the zip archive containing the new release.
        """
        self.current_version = current_version
        self.update_url = update_url
        self.download_url = download_url

    @staticmethod
    def _version_key(version):
        """Return *version* as a tuple of ints so versions compare numerically.

        A leading ``v`` and any non-numeric suffix of a component are ignored,
        and trailing zero components are dropped so ``1.0`` equals ``1.0.0``.
        """
        key = []
        for part in str(version).strip().lstrip("vV").split("."):
            digits = ""
            for char in part:
                if char not in "0123456789":
                    break
                digits += char
            key.append(int(digits) if digits else 0)
        while key and key[-1] == 0:
            key.pop()
        return tuple(key)

    @classmethod
    def _is_newer(cls, candidate, current):
        """Return True when *candidate* is a strictly higher version than *current*."""
        return cls._version_key(candidate) > cls._version_key(current)

    def check_for_updates(self):
        """Ask the update server whether a newer version exists.

        Returns:
            ``(True, latest_version)`` when an update is available, otherwise
            ``(False, current_version)``.

        Raises:
            requests.RequestException: On network errors or an HTTP error status.
        """
        response = requests.get(self.update_url, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        latest_version = response.json().get("version")
        # Comparing the raw strings would rank "1.10.0" below "1.9.0", and a
        # manifest without a version would raise TypeError.
        if latest_version and self._is_newer(latest_version, self.current_version):
            print(f"New version available: {latest_version}")
            return True, latest_version
        print("No updates available.")
        return False, self.current_version

    def download_and_install_update(self):
        """Download the release archive and replace the ``framework`` directory.

        The previous installation is kept as a backup until the new one is in
        place and is restored if the swap fails. The downloaded archive is
        removed whether or not the installation succeeds.

        Raises:
            requests.RequestException: If the download fails.
            zipfile.BadZipFile: If the downloaded file is not a valid archive.
            OSError: If files cannot be written or moved.
        """
        archive_path = "framework_update.zip"
        staging_dir = "framework_new"
        target_dir = "framework"
        backup_dir = "framework.bak"
        try:
            # Download the latest update zip.
            with requests.get(self.download_url, stream=True, timeout=self.REQUEST_TIMEOUT) as response:
                response.raise_for_status()
                with open(archive_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)

            # Unzip into an empty staging directory so leftovers from an
            # earlier failed run are not merged into the new release.
            shutil.rmtree(staging_dir, ignore_errors=True)
            with zipfile.ZipFile(archive_path, "r") as zip_ref:
                zip_ref.extractall(staging_dir)

            # Swap the directories, keeping the old version until the new one
            # is in place.
            shutil.rmtree(backup_dir, ignore_errors=True)
            had_previous = os.path.isdir(target_dir)
            if had_previous:
                shutil.move(target_dir, backup_dir)
            try:
                shutil.move(staging_dir, target_dir)
            except Exception:
                if had_previous:
                    # Remove a partially moved tree, then restore the backup.
                    shutil.rmtree(target_dir, ignore_errors=True)
                    shutil.move(backup_dir, target_dir)
                raise
            shutil.rmtree(backup_dir, ignore_errors=True)
        finally:
            # Clean up the downloaded archive.
            if os.path.exists(archive_path):
                os.remove(archive_path)
        print("Update installed successfully.")
# Usage: query the update server and install the new release when there is one.
updater = FrameworkUpdater(
    current_version="1.0.0",
    update_url="https://example.com/update.json",
    download_url="https://example.com/framework.zip",
)
update_needed, latest_version = updater.check_for_updates()
if update_needed:
    updater.download_and_install_update()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment