import logging
from typing import Annotated, Any
from fastapi import Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jwt import PyJWKClient
This is a working example of how to asynchronously upload chunks to an AWS S3 bucket using Python. We should modify or optimize the code to suit our needs. For example, we can use a generator to yield chunks of the file instead of loading the entire file into memory.
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_S3_REGION_NAME=eu-central-1
IMPORTANT: There is an issue with the official MongoDB Compass client. When you try to connect to a ReplicaSet, it first correctly translates the MongoDB connection string (DNS) to localhost:PORT. However, once it reads the member hostnames (like in this script), it will attempt to connect to those hosts instead!
Usually, the script would look like this:
The important part here is when_used="json". This allows the serialization of data (ObjectId) as a string with FastAPI, while also ensuring that model_dump() retains the ObjectId for MongoDB. Not just for _id, but for every reference with ObjectId.
from typing import Annotated, Any
from bson import ObjectId
# URL serving Google's current x509 public certificates, used to verify
# Firebase ID-token signatures (the keys rotate, hence the cache below).
FIREBASE_JWKS_PUBLIC_KEYS = 'https://www.googleapis.com/robot/v1/metadata/x509/[email protected]'

# Module-level cache: the fetched keys plus the epoch timestamp of the last
# fetch, so the keys URL is hit at most once per cache window.
keys_cache = {'keys': None, 'last_fetched': 0.0}
async def fetch_public_keys() -> dict[str, str]: | |
"""Fetch Firebase public keys from URL, with caching for 1 hour.""" | |
# Check if the cache is older than 1 hour (3600 seconds) |
# Basic logging configuration for the whole application: DEBUG level, a
# verbose format including function name and line number, routed through a
# single stream handler.
# NOTE(review): `stream_handler` must be constructed before this call — it is
# not defined in this snippet; confirm it exists in the surrounding file.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(levelname)s - %(name)s - %(funcName)20s() on line %(lineno)s - %(message)s",
    # datefmt="%Y-%m-%d %H:%M:%S.%f %Z%z",  # kept for reference; %f is not supported by logging's default time formatting
    handlers=[stream_handler],
)
""" | |
Just a note where to find original asyncpg exception object wrapped by SQLAlchemy | |
""" | |
try: | |
created_record = await db.scalar(insert(Model).values(**data_in.model_dump()).returning(Model)) | |
except IntegrityError as err: | |
if isinstance(err.orig.__cause__, UniqueViolationError): | |
raise RecordAlreadyExists() |
import asyncio | |
from sqlalchemy import ForeignKey, UniqueConstraint, select | |
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine | |
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship, selectin_polymorphic | |
# Async engine and session factory for the local SQLite database.
# The "sqlite+aiosqlite" dialect requires the aiosqlite driver package.
aengine = create_async_engine("sqlite+aiosqlite:///database2.db")
asession = async_sessionmaker(aengine)