Skip to content

Instantly share code, notes, and snippets.

@GGontijo
Created November 8, 2024 19:53
Show Gist options
  • Save GGontijo/ebf6b24ca91d505eb32b1e9b56ecd5a7 to your computer and use it in GitHub Desktop.
Save GGontijo/ebf6b24ca91d505eb32b1e9b56ecd5a7 to your computer and use it in GitHub Desktop.
design
from abc import ABC, abstractmethod
from enum import Enum
import json
from typing import Callable
import pika
import time
import logging
from pydantic import BaseModel
from contextlib import contextmanager
class Task(BaseModel):
    # Payload of one unit of work. BotFramework._callback builds it from the
    # decoded JSON body of a queue message, so the message must carry exactly
    # these keys with string values.

    # Identifier of the task; included in the log line emitted by
    # BotFramework.set_task_state.
    id: str

    # Name of the task.
    name: str
class TaskState(Enum):
    """States that can be passed to ``BotFramework.set_task_state``."""

    PENDING = 1
    RUNNING = 2
    SUCCESS = 3
    FAILED = 4
class RabbitMQHandler(logging.Handler):
    """Custom logging handler to send logs to RabbitMQ queue.

    The handler connects eagerly in ``__init__`` (so construction raises if
    the broker is unreachable) and transparently reconnects from ``emit``
    when the connection has been lost. Failures while emitting are reported
    with ``print`` and never propagate into the code that is logging.
    """

    def __init__(self, user: str, password: str, log_queue='logs', host='', port=5672):
        """Store the connection settings and open the initial connection.

        Args:
            user: RabbitMQ user name.
            password: RabbitMQ password.
            log_queue: Name of the queue that receives formatted log records.
            host: Broker host name.
            port: Broker port.
        """
        super().__init__()
        self.user = user
        self.password = password
        self.port = port
        self.log_queue = log_queue
        self.host = host
        self.connection = None
        self.channel = None
        self._connect()

    def _connect(self):
        """Establish a connection to RabbitMQ and declare the log queue."""
        credentials = pika.PlainCredentials(self.user, self.password)
        parameters = pika.ConnectionParameters(host=self.host, port=self.port, credentials=credentials)
        connection = pika.BlockingConnection(parameters)
        try:
            channel = connection.channel()
            channel.queue_declare(queue=self.log_queue)
        except Exception:
            # Do not leak a half-initialised connection when channel or
            # queue setup fails.
            if connection.is_open:
                connection.close()
            raise
        # Publish the new objects only once both are fully usable.
        self.connection = connection
        self.channel = channel

    def emit(self, record):
        """Send log record to the RabbitMQ queue.

        Reconnects first when there is no usable connection. Any failure is
        printed and swallowed so that logging never crashes the application.
        """
        if self.channel is None or self.connection is None or self.connection.is_closed:
            try:
                self._connect()
            except Exception as e:
                print(f"Failed to reconnect to RabbitMQ for logging: {e}")
                return
        try:
            log_entry = self.format(record)
            self.channel.basic_publish(exchange='', routing_key=self.log_queue, body=log_entry)
        except Exception as e:
            print(f"Failed to send log message to RabbitMQ: {e}")

    def close(self):
        """Close the RabbitMQ connection on shutdown.

        Safe to call more than once: the connection is detached before it is
        closed, and an already-closed connection is left alone because pika
        raises when ``close()`` is called on one.
        """
        connection = self.connection
        self.connection = None
        self.channel = None
        try:
            if connection is not None and connection.is_open:
                connection.close()
        finally:
            # Always let logging.Handler deregister itself, even when the
            # broker connection fails to close cleanly.
            super().close()
class BotFramework:
def __init__(self, client_id=None, client_secret=None):
self.logger = logging.getLogger()
self.logger = logging.getLogger('BotFramework')
self.logger.setLevel(logging.INFO)
rabbitmq_handler = RabbitMQHandler(log_queue='logs', user='', password='')
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
rabbitmq_handler.setFormatter(formatter)
self.logger.addHandler(rabbitmq_handler)
if not self.authenticate(client_id='teste', client_secret='teste'):
raise Exception('Failed to authenticate with Bot Framework')
def authenticate(self, client_id: str, client_secret: str) -> bool:
# request = request.post(
# 'https://login.microsoftonline.com/common/oauth2/v2.0/token',
# data={
# 'client_id': client_id,
# 'client_secret': client_secret,
# 'grant_type': 'client_credentials',
# 'scope': 'https://api.botframework.com/.default'
# }
# )
return True
def get_credentials(self, credential_name: str) -> dict:
return {}
def set_task_state(self, task: Task, state: TaskState) -> None:
self.logger.info(f"Setting task {task.id} state to {state}")
pass
def _callback(self, ch, method, properties, body) -> None:
body_data = json.loads(body)
task = Task(**body_data)
ch.basic_ack(delivery_tag=method.delivery_tag)
self.task = task
ch.stop_consuming()
@contextmanager
def wait_for_queue(self, queue_name: str, retries=5, retry_delay=5, on_start: Callable = lambda: None, on_finish: Callable = lambda: None):
self.task = None
attempt = 0
while attempt < retries:
try:
self.logger.info(f"Connecting to RabbitMQ... Attempt {attempt + 1}")
param = pika.ConnectionParameters(host='', port=5672, credentials=pika.PlainCredentials('', ''))
with pika.BlockingConnection(param) as connection:
channel = connection.channel()
channel.queue_declare(queue=queue_name)
channel.basic_consume(queue=queue_name, on_message_callback=self._callback, auto_ack=False)
self.logger.info("Waiting for a message from the queue...")
channel.start_consuming()
yield self.task
if on_finish:
on_finish()
return
except pika.exceptions.AMQPConnectionError as e:
self.logger.error(f"Connection failed: {e}. Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
attempt += 1
except Exception as e:
self.logger.exception(f"Unexpected error occurred: {e}")
self.logger.error("Max retries exceeded. Could not connect to RabbitMQ.")
class BotInterface(ABC):
    """Abstract base for bots; concrete subclasses must implement ``run``."""

    def __init__(self):
        """Nothing to initialise at the interface level."""

    @abstractmethod
    def run(self):
        """Execute the bot. Must be overridden by every concrete subclass."""
class TrocaNotacargue(BotInterface):
    """Bot that processes tasks from the ``'troca_nota_cargue'`` queue.

    The processing steps are placeholders that currently only emit test log
    messages through the module-level ``logging`` functions.
    """

    def __init__(self, core: BotFramework):
        """Store the framework core used for queue access and state updates."""
        # Load the parameters.
        self.core = core
        # Task currently being processed; assigned in run() so that
        # atualizar_status_sucesso() can report on it.
        self.task = None

    def busca_dados_cargue(self):
        """Placeholder step: currently only logs a test message at INFO."""
        logging.info('teste')

    def processa_dados_cargue(self):
        """Placeholder step: currently only logs test messages (WARNING, DEBUG)."""
        logging.warning('teste')
        logging.debug('teste')

    def cria_fila_sap(self):
        """Placeholder step: currently only logs a test message at ERROR."""
        logging.error('teste')

    def atualizar_status_sucesso(self):
        """Mark the current task as successful.

        Passed as ``on_finish`` to ``wait_for_queue``, so it runs after a
        task's steps have completed.
        """
        # TaskState defines SUCCESS; there is no SUCCEEDED member.
        self.core.set_task_state(task=self.task, state=TaskState.SUCCESS)

    def run(self):
        """Process tasks forever, one queue message at a time."""
        while True:
            with self.core.wait_for_queue('troca_nota_cargue', on_finish=self.atualizar_status_sucesso) as task:
                # Remember the task so the on_finish callback can reference it.
                self.task = task
                self.busca_dados_cargue()
                self.processa_dados_cargue()
                self.cria_fila_sap()
if __name__ == '__main__':
    # Script entry point: build the framework core, then run the bot on it.
    core = BotFramework(client_id='client_id', client_secret='client_secret')
    # TrocaNotacargue accepts only the framework core. The previous call also
    # passed an undefined positional `logger` after a keyword argument, which
    # is a syntax error.
    bot = TrocaNotacargue(core=core)
    bot.run()
    print(bot)
import requests
import zipfile
import os
import shutil
class FrameworkUpdater:
    """Check for, download and install new versions of the framework.

    The update is downloaded to ``framework_update.zip``, extracted to
    ``framework_new`` and then moved over the ``framework`` directory, all
    relative to the current working directory.
    """

    # Seconds to wait for the update server before giving up, so a stalled
    # server cannot hang the caller indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, current_version, update_url, download_url):
        """Store the installed version and the update endpoints.

        Args:
            current_version: Installed version as a dotted string, e.g. "1.0.0".
            update_url: URL returning JSON with a "version" key.
            download_url: URL of the zip archive holding the new version.
        """
        self.current_version = current_version
        self.update_url = update_url
        self.download_url = download_url

    @staticmethod
    def _version_key(version):
        """Convert a dotted version string into a tuple of ints for comparison.

        Only the leading digits of each dot-separated component are used
        (so "1.0.0rc1" is treated as 1.0.0), a leading "v" is ignored, and
        trailing zero components are dropped so "1.0" equals "1.0.0".
        """
        numbers = []
        for component in str(version).strip().lstrip("vV").split("."):
            digits = ""
            for char in component:
                if char not in "0123456789":
                    break
                digits += char
            numbers.append(int(digits) if digits else 0)
        while numbers and numbers[-1] == 0:
            numbers.pop()
        return tuple(numbers)

    def _is_newer(self, latest_version):
        """Return True when ``latest_version`` is newer than the installed one."""
        if not latest_version:
            # The server response carried no version: nothing to update to.
            return False
        # Compare numerically; plain string comparison ranks "1.10.0"
        # below "1.9.0".
        return self._version_key(latest_version) > self._version_key(self.current_version)

    def check_for_updates(self):
        """Ask the update server whether a newer version exists.

        Returns:
            ``(True, latest_version)`` when an update is available, otherwise
            ``(False, current_version)``.

        Raises:
            requests.RequestException: On network errors, timeouts or a
                non-success HTTP status.
        """
        response = requests.get(self.update_url, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        latest_version = response.json().get("version")
        if self._is_newer(latest_version):
            print(f"New version available: {latest_version}")
            return True, latest_version
        print("No updates available.")
        return False, self.current_version

    def download_and_install_update(self):
        """Download the update archive and replace the ``framework`` directory.

        The installed version is only removed after the archive has been
        downloaded and extracted successfully. The downloaded archive is
        always deleted, even when the installation fails.

        Raises:
            requests.RequestException: On network errors, timeouts or a
                non-success HTTP status.
            zipfile.BadZipFile: When the download is not a valid zip archive.
        """
        archive_path = "framework_update.zip"
        staging_dir = "framework_new"
        install_dir = "framework"
        try:
            # Download the latest update zip.
            with requests.get(self.download_url, stream=True, timeout=self.REQUEST_TIMEOUT) as response:
                response.raise_for_status()
                with open(archive_path, "wb") as archive:
                    for chunk in response.iter_content(chunk_size=8192):
                        archive.write(chunk)

            # Start from an empty staging directory so leftovers of an
            # earlier failed attempt are not merged into this update.
            shutil.rmtree(staging_dir, ignore_errors=True)

            # NOTE(review): the archive is installed without any integrity
            # or signature check; consider verifying a checksum first.
            with zipfile.ZipFile(archive_path, "r") as zip_ref:
                zip_ref.extractall(staging_dir)

            # Replace the old version; there is none on a first install.
            if os.path.isdir(install_dir):
                shutil.rmtree(install_dir)
            shutil.move(staging_dir, install_dir)
        finally:
            # Clean up the downloaded archive.
            if os.path.exists(archive_path):
                os.remove(archive_path)
        print("Update installed successfully.")
# Usage: check the update server and install the new version if there is one.
updater = FrameworkUpdater(
    current_version="1.0.0",
    update_url="https://example.com/update.json",
    download_url="https://example.com/framework.zip",
)
update_needed, latest_version = updater.check_for_updates()
if update_needed:
    updater.download_and_install_update()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment