Skip to content

Instantly share code, notes, and snippets.

@RomanAVolodin
Last active September 24, 2024 14:42
Show Gist options
  • Save RomanAVolodin/600159b146ab040c8ae6fba32985f72c to your computer and use it in GitHub Desktop.
Save RomanAVolodin/600159b146ab040c8ae6fba32985f72c to your computer and use it in GitHub Desktop.
class OrdersService(OrdersServiceABC):
    """Order operations built on an orders repository and a RabbitMQ broker."""

    def __init__(self, repository: OrdersRepository, broker: RabbitBroker) -> None:
        """Keep references to the collaborators used by the service methods.

        Args:
            repository: Object whose ``process_order`` is awaited by ``process``.
            broker: Broker whose ``publish`` is awaited by ``process``.
        """
        self._broker = broker
        self._repository = repository

    async def process(self, id: uuid.UUID, author: UserDataInToken) -> Order:
        """Publish a prepared order to the queue and hand it to the repository.

        Args:
            id: Identifier passed to ``self.get`` to load the order.
            author: Token data of the calling user; passed to ``self.get`` and
                to ``check_if_user_in_company``.

        Returns:
            Whatever ``OrdersRepository.process_order`` returns for the order.

        Raises:
            AppException: With ``HTTPStatus.METHOD_NOT_ALLOWED`` when the order
                status is not ``OrderStatusEnum.PREPARED.value``.
        """
        # NOTE(review): ``get`` is not defined in this class body; presumably
        # it is provided by OrdersServiceABC — confirm.
        order = await self.get(id, author)
        company = await get_company(order.company_id)
        # Return value is ignored; presumably this raises when the author does
        # not belong to the company — confirm against its definition.
        check_if_user_in_company(author, company)

        required_status = OrderStatusEnum.PREPARED.value
        if order.status != required_status:
            raise AppException(
                f'orders is not prepared but {order.status}',
                HTTPStatus.METHOD_NOT_ALLOWED,
            )

        # The message is published before the repository call, so the publish
        # happens even if process_order subsequently fails.
        payload = OrderResponseDto.model_validate(order)
        payload.company = company
        await self._broker.publish(payload, queue_to_old)

        processed = await self._repository.process_order(order)
        return processed
from functools import cache
from fastapi import Depends
from faststream.rabbit import RabbitBroker
from sqlalchemy.ext.asyncio import AsyncSession
from core.rabbit import get_queue_broker
from db.db import get_db_session
from dependencies.registrator import add_factory_to_mapper
from services.orders import OrdersRepository, OrdersService, OrdersServiceABC
@add_factory_to_mapper(OrdersServiceABC)
def create_orders_service(
    session: AsyncSession = Depends(get_db_session),
    broker: RabbitBroker = Depends(get_queue_broker),
) -> OrdersService:
    """Build an ``OrdersService`` from the injected database session and broker.

    The factory is intentionally not wrapped in ``functools.cache``. That
    cache has no size bound and would be keyed on the ``session`` argument,
    so every distinct session, together with the service and repository built
    around it, would stay referenced for the lifetime of the process.
    NOTE(review): this assumes ``get_db_session`` provides a new session per
    request — confirm.

    Args:
        session: Database session wrapped in a new ``OrdersRepository``.
        broker: RabbitMQ broker handed to the service.

    Returns:
        A new ``OrdersService`` bound to ``session`` and ``broker``.
    """
    return OrdersService(repository=OrdersRepository(db=session), broker=broker)
import logging
from functools import cache
from typing import Annotated
from uuid import UUID
from fastapi import Depends
from faststream.rabbit.fastapi import RabbitRouter
from core.settings import settings, queue_from_old
from schemas.order import IncomingMessageOrderUpdated
from services.orders import OrdersServiceABC
# FastStream router for the FastAPI app; the AMQP URL is assembled from the
# user, password, host and port fields of the application settings.
# NOTE(review): the credentials are interpolated as-is; presumably they never
# contain characters that need URL-escaping — confirm.
rabbit_router: RabbitRouter = RabbitRouter(
    f'amqp://{settings.rabbitmq_user}:{settings.rabbitmq_pass}@{settings.rabbitmq_host}:{settings.rabbitmq_port}/',
)

# Requesting the logger by name yields the same object as asking the root
# logger for its 'rabbit-router' child.
logger = logging.getLogger('rabbit-router')
@cache
def get_queue_broker():
    """Return the broker exposed by the module-level ``rabbit_router``.

    The function takes no arguments, so ``functools.cache`` stores a single
    result and every later call returns that same object.
    """
    broker = rabbit_router.broker
    return broker
from faststream.rabbit import RabbitQueue
# Durable queue named by settings.rabbitmq_queue_name.
queue_to_old = RabbitQueue(
    settings.rabbitmq_queue_name,
    durable=True,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment