Skip to content

Instantly share code, notes, and snippets.

@Marco-Rod
Last active September 19, 2024 11:56
Show Gist options
  • Save Marco-Rod/b7a25f880c685b72f24bba1982c328bd to your computer and use it in GitHub Desktop.
Save Marco-Rod/b7a25f880c685b72f24bba1982c328bd to your computer and use it in GitHub Desktop.
import os
import requests
from sqlalchemy import create_engine, Column, Integer, String, Float
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import inspect
# Estas variables globales deberian ser declaradas
# en un archivo .env o cualquier otro metodo por seguridad
# Ya que son datos sensibles.
ENDPOINT = 'https://api.com/'
API_KEY = 'my_api_key'
# DB_USER = 'marco_root'
# DB_PASSWORD = 'v>9)7jUembDL17#70rVI7db1D'
# DB_HOST = 'localhost'
# DB_NAME = 'database_name'
DB_NAME = 'inventory'
DB_HOST = 'localhost'
DB_PASSWORD = '141981'
DB_PORT = 3306
DB_USER = 'root'
Base = declarative_base() # Define la base de datos
class Producto(Base):
"""
Definimos el modelo productos de la base de datos
"""
__tablename__ = 'productos'
id = Column(Integer, primary_key=True)
nombre = Column(String(50), nullable=False)
cantidad_disponible = Column(Integer, nullable=False)
precio = Column(Float, nullable=False)
class DatabaseManager:
"""
Manager para la base de datos
"""
def __init__(self, database_url):
"""
Creamos el motor de la base de datos
"""
self.engine = create_engine(database_url, future=True)
self.Session = sessionmaker(bind=self.engine, future=True)
self._check_and_create_tables() # Validamos si existe o no la tabla
def _check_and_create_tables(self):
"""
con este metodo privado nos aseguramos de validar que
la tabla productos existe en la base de datos
de lo contrario la creamos antes de hacer cualquier
uso de la base de datos y la tabla.
"""
try:
inspector = inspect(self.engine)
if not inspector.has_table("productos"):
Base.metadata.create_all(self.engine) # Crea la tabla si no existe
print("La tabla productos fue creada correctamente.")
else:
print("La tabla productos ya existe.")
except SQLAlchemyError as e:
print(f"Error al crear la tabla: {e}")
def create_product(self, products):
"""
Metodo para crear los productos teniendo en cuenta que
pueden ser una gran cantidad de datos procesados desde la API
se hace uso del metodo bulk_save_objects que nos permite insertar
multiples objetos de Producto al mismo tiempo. en este caso es mas
eficiente que agregarlos de manera individual.
Al usar una Sesion para hacer las inserciones nos aseguramos que la
transaccion mantenga la integridad de los datos y sea eficiente.
dentro de las posibles excepciones de parte de SQLAlchemy o errores
inesperados agregamos un rollback de toda la transaccion.
"""
with self.Session() as session: # Creando la sesion
try:
session.bulk_save_objects([Producto(**prod) for prod in products]) # Agregando los productos
session.commit()
print(f'{len(products)} productos creados con exito.')
except SQLAlchemyError as e: # Manejando errores de SQLAlchemy
session.rollback()
print(f'Error al crear los productos: {e}')
except Exception as e: # Manejando errores inesperados
session.rollback()
print(f'Error inesperado: {e}')
class ManagerApiProducts:
def __init__(self, base_url, api_key=None) -> None:
"""
Inicializamos los valores principales para hacer la peticion a la API
base_url : str
api_key: str
api_key puede ser declarada globalmente como una constante o tomada
desde una variable de entorno.
"""
self.base_url = base_url
self.api_key = api_key or os.getenv('API_KEY')
def _get_headers(self):
"""
Metodo privado que retorna el encabezado para la API, el cual contiene
los valores necesarios para la autenticacion
"""
if not self.api_key:
raise ValueError("No API Key provider")
return {
'Authorization': f'Bearer {self.api_key}' # Se usa bearer como esquema de autenticacion, puede ser modificado
}
def get(self, endpoint: str, params=None):
"""
Metodo para realizar solicitudes GET
url: str
headers: self
"""
url = f'{self.base_url}/{endpoint}'
headers = self._get_headers()
try:
response = requests.get(url=url, headers=headers, params=params)
response.raise_for_status() # Levanta posibles excepciones para codigos HTTP
if response.status_code == 200:
print(f'Solicitud exitosa, codigo de estado: {response.status_code}')
return response.json() # Retorna el contenido en formato JSON
except requests.exceptions.RequestException as e: # Obtiene las excepciones
print(f"Error Making GET Method request: {e}")
return None
def filter_data(self, data: list):
"""
Metodo privado que nos permite filtrar cada uno de los campos
necesarios para manipular la informacion.
"""
data_filtered = list()
if len(data) > 0:
for items in data:
clean_data = {key: value for key, value in items.items() if key == 'id' or key == 'nombre'\
or key == 'precio' or key == 'cantidad'}
data_filtered.append(clean_data)
return data_filtered
def get_product_stock(self, data: list, is_reverse: bool):
"""
Metodo para retornar el producto con mayor o menor cantidad de unidades disponible.
obtiene la lista de productos y usando la funcion sorted los ordena y con el
parametro reverse en True o False los coloca en orden descendente o ascendente;
(reverse = True el resultado estara en orden descendente)
(reverse = False el resultado estara en orden ascendente)
como parametro key pasamos una funcion lambda que se encarga de tomar
como criterio de ordenacion la clave cantidad presente en cada uno de los diccionarios de la lista.
retorna el primer elemento presente en la nueva lista ya ordenada.
"""
order_data = list()
if len(data) > 0:
order_data = sorted(data, key=lambda index:index['cantidad'], reverse=is_reverse)
return order_data[0]
return order_data
def get_product_high_price_high_stock(self, data: list):
"""
metodo para obtener el producto con el precio mas alto y con la mayor cantidad
disponible.
el producto a retornar es el que cumpla con las condiciones.
se entiende que si existe solo un producto con un unico precio mas alto ese sera
el que se tome como valido, en caso de que exista mas de un producto con el mismo precio
se aplica la validacion de cual tiene la mayor cantidad disponible.
finalmente se obtiene el producto en la primera posicion del listado.
"""
filter_by_price = list()
if len(data) > 0:
order_data_by_price = sorted(data, key=lambda index:index['precio'], reverse=True) # Ordenamos los productos por mayor a menor precio
high_price = order_data_by_price[0]['precio'] # Obtenemos el producto con el precio mas alto
filter_by_price = list(filter(lambda index:index['precio'] == high_price, order_data_by_price)) # Filtramos los productos por el precio mas alto
if len(filter_by_price) > 1: # Validamos que exista mas de un producto con el precio mas alto
filter_by_stock = sorted(filter_by_price, key=lambda index:index['cantidad'], reverse=True) # En caso de que exista, ordenamos por la mayor cantidad
return filter_by_stock[0]
return filter_by_price
def get_product_arithmetic_mean_price(self, data: list):
"""
En este metodo Determinamos el producto cuyo precio este mas cercano
a la media aritmetica del precio de todos los productos.
Primero obtenemos todos los precios dentro de la lista de productos y
los sumamos.
para obtener la media aritemetica de los precios. hacemos la division
entre la suma de los precios y el total de los productos en el listado.
una vez obtenida la media aritemetica comprobaremos cada uno de los precios de
los productos en el listado y determinar cual es el mas cercano a la media aritemetica.
esto lo hacemos con el uso de la funcion min y la funcon abs.
con min recorremos la lista de productos y con abs calculamos la diferencia absoluta
entre cada precio y la media aritmetica.
por ultimo retornamos el o los productos con el precio mas cercano a la media.
"""
filter_by_price = list()
prices_list = list()
sum_prices = 0.00
if len(data) > 0:
for items in data:
get_all_prices = {key: value for key, value in items.items() if key.startswith('precio')}
sum_prices+=get_all_prices['precio']
prices_list.append(get_all_prices['precio'])
sum_prices = round(sum_prices, 2)
total_products = len(data)
arithmetic_mean = sum_prices / total_products
closest_price = min(prices_list, key=lambda x: abs(x - arithmetic_mean))
filter_by_price = list(filter(lambda index:index['precio'] == closest_price, data))
return filter_by_price
# Using class ManageAPIProducts
api_client = ManagerApiProducts(base_url=ENDPOINT, api_key=API_KEY)
endpoint = 'productos'
response = api_client.get(endpoint=endpoint, params=None)
filter_data = api_client.filter_data(data=response)
max_stock = api_client.get_product_stock(filter_data, True)
min_stock = api_client.get_product_stock(filter_data, False)
high_price_high_stock = api_client.get_product_high_price_high_stock(data=filter_data)
product_arithmetic_mean_price = api_client.get_product_arithmetic_mean_price(filter_data)
print(f'Se filtraron los datos obteniendo solo: id, nombre, precio y cantidad disponible: {filter_data}')
print(f'Este es el producto con la mayor cantidad disponible: {max_stock}')
print(f'Este es el producto con la menor cantidad disponible: {min_stock}')
print(f'Este es el producto con el precio mas alto y con la mayor cantidad disponible: {high_price_high_stock}')
print(f'Este es el producto cuyo precio esta mas cerccano a la media aritmetica del precio de todos los productos: {product_arithmetic_mean_price}')
# Using class DataBaseManager
# Se crea la url para la base de datos con las credenciales.
DATABASE_URL = f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}"
# Se inicializa el manejador de la base de datos
db_manager = DatabaseManager(DATABASE_URL)
# Se insertar los registros procesados desde la API
db_manager.create_product(products=filter_data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment