Skip to content

Instantly share code, notes, and snippets.

View mrmamongo's full-sized avatar
🗿
To code or not to code?

mrmamongo

🗿
To code or not to code?
View GitHub Profile
from __future__ import annotations
from enum import Enum
from dataclasses import dataclass, field
class Sign(Enum):
X = "X"
O = "O"
class Player(Enum):
{
"params": {
"vectors": {
"dense": {
"size": 1024,
"distance": "Cosine"
},
"lie": {
"size": 1024,
"distance": "Cosine",
@mrmamongo
mrmamongo / main.py
Created May 28, 2025 14:22
Dishka + Faststream Kafka + Fastapi
import uvicorn
from dishka import Provider, Scope, provide
from dishka import make_async_container
from dishka.integrations import fastapi as fastapi_integration
from dishka.integrations import faststream as faststream_integration
from dishka.integrations.base import FromDishka as Depends
from dishka.integrations.fastapi import DishkaRoute
from dishka.integrations.faststream import inject as faststream_inject
from fastapi import FastAPI, APIRouter
from faststream import FastStream
T = TypeVar('T')
class DependencyKey(Generic[T]):
__slots__ = ('key',)
def __init__(
self, tp: type[T] | None = None, dependency_key: str | None = None
) -> None:
from aiogram import Bot
from aiogram.fsm.state import State, StatesGroup
from aiogram.types import Message, CallbackQuery
from aiogram_dialog import Dialog, DialogManager, Window
from aiogram_dialog.widgets.input import TextInput, ManagedTextInput
from aiogram_dialog.widgets.kbd import WebApp, Button, Cancel, SwitchTo
from aiogram_dialog.widgets.text import Const, Format
from dishka import FromDishka
from loguru import logger
with coins_cte as (select c.request_id,
jsonb_agg(
jsonb_build_object(
'url', url,
'address', address,
'symbol', symbol,
'amount', amount
)
) as coins
from hamster_schema.request_coins c
@mrmamongo
mrmamongo / container.py
Created January 14, 2024 00:13
DI Container with FastDepends built by https://github.com/Lancetnik
from typing import TypeVar, ParamSpec, Callable
from fast_depends import inject
from fast_depends.dependencies.provider import Provider
T = TypeVar("T")
P = ParamSpec("P")
class Container(Provider):
def execute(
import json
import logging
import sys
from dataclasses import dataclass
import adaptix
@dataclass
class ExecutionData:
logger = logging.getLogger(__name__)
class Application:
def __init__(
self,
config: Config,
app: FastAPI
) -> None:
self.config = config
import logging
from dataclasses import dataclass
from typing import Callable, Any, NamedTuple
from telethon import TelegramClient
from telethon.events import NewMessage
from telethon.events.common import EventBuilder
Handler = Callable[[Any], None]