# alembic.ini
file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
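With this template, a generated migration file is named by timestamp, revision id, and slug, e.g. 2024_05_17_1430-a1b2c3d4e5f6_add_users_table.py (the revision id and slug here are hypothetical).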
import logging
import sys

# stream_handler is assumed to be a simple handler writing to stdout;
# the original handler setup is not shown here.
stream_handler = logging.StreamHandler(sys.stdout)

# Basic configuration
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(levelname)s - %(name)s - %(funcName)20s() on line %(lineno)s - %(message)s",
    # datefmt="%Y-%m-%d %H:%M:%S.%f %Z%z",
    handlers=[stream_handler],
)
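# Usage example; with the format above, a record emitted from a function
# do_work() looks roughly like:
#   2024-05-17 14:30:00,123 - DEBUG - __main__ -              do_work() on line 42 - starting work
logger = logging.getLogger(__name__)
logger.debug("starting work")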
FIREBASE_JWKS_PUBLIC_KEYS = 'https://www.googleapis.com/robot/v1/metadata/x509/[email protected]'
keys_cache = {'keys': None, 'last_fetched': 0.0}

async def fetch_public_keys() -> dict[str, str]:
    """Fetch Firebase public keys from URL, with caching for 1 hour."""
    # Check if the cache is older than 1 hour (3600 seconds)
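    # Sketch of how the body might continue (the original implementation is not
    # shown here); assumes httpx and time are imported at module level and that
    # httpx is the async HTTP client in use.
    if keys_cache['keys'] is None or time.time() - keys_cache['last_fetched'] > 3600:
        async with httpx.AsyncClient() as client:
            response = await client.get(FIREBASE_JWKS_PUBLIC_KEYS)
            response.raise_for_status()
            keys_cache['keys'] = response.json()
            keys_cache['last_fetched'] = time.time()
    return keys_cache['keys']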
The important part here is when_used="json". It makes FastAPI serialize the ObjectId as a string in JSON responses, while model_dump() still returns the raw ObjectId for MongoDB. This applies not just to _id, but to every field that holds an ObjectId reference.
from typing import Annotated, Any
from bson import ObjectId
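# Sketch of one way to declare the Annotated type (Pydantic v2); the exact
# original definition is not shown here, and the names below are illustrative.
from pydantic import (
    BaseModel,
    BeforeValidator,
    ConfigDict,
    Field,
    PlainSerializer,
    WithJsonSchema,
)


def _to_object_id(value: Any) -> ObjectId:
    # Accept an existing ObjectId or a valid 24-character hex string.
    if isinstance(value, ObjectId):
        return value
    if ObjectId.is_valid(value):
        return ObjectId(value)
    raise ValueError("Invalid ObjectId")


PyObjectId = Annotated[
    ObjectId,
    BeforeValidator(_to_object_id),
    # Serialize to str only when dumping to JSON (FastAPI responses);
    # model_dump() keeps the raw ObjectId for MongoDB.
    PlainSerializer(lambda v: str(v), return_type=str, when_used="json"),
    # Present the field as a plain string in the OpenAPI/JSON schema.
    WithJsonSchema({"type": "string"}),
]


class Item(BaseModel):
    # Allow the raw ObjectId type inside the model.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    id: PyObjectId = Field(default_factory=ObjectId, alias="_id")
    parent_id: PyObjectId | None = None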
IMPORTANT: There is an issue with the official MongoDB Compass client. When you try to connect to a replica set, it first correctly resolves the MongoDB connection string (DNS) to localhost:PORT. However, once it reads the replica set member hostnames (as in this script), it will attempt to connect to those hosts directly instead!
Usually, the script would look like this:
This is a working example of how to asynchronously upload chunks to an AWS S3 bucket using Python. We should modify or optimize the code to suit our needs; for example, we can use a generator to yield chunks of the file instead of loading the entire file into memory (a sketch along these lines follows the environment variables below).
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_S3_REGION_NAME=eu-central-1
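Below is a minimal sketch of such a chunked, asynchronous multipart upload. It assumes aioboto3 as the async S3 client and reads the credentials from the environment variables above; the bucket name, key, path, and chunk size are placeholders, and S3 requires every non-final part to be at least 5 MiB.

import os

import aioboto3

CHUNK_SIZE = 5 * 1024 * 1024  # 5 MiB, the minimum size for a non-final part


def read_chunks(path: str, chunk_size: int = CHUNK_SIZE):
    # Generator that yields the file piece by piece instead of loading it whole.
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            yield chunk


async def upload_file(path: str, bucket: str, key: str) -> None:
    session = aioboto3.Session(
        aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
        aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
        region_name=os.environ["AWS_S3_REGION_NAME"],
    )
    async with session.client("s3") as s3:
        upload = await s3.create_multipart_upload(Bucket=bucket, Key=key)
        upload_id = upload["UploadId"]
        parts = []
        try:
            for number, chunk in enumerate(read_chunks(path), start=1):
                part = await s3.upload_part(
                    Bucket=bucket,
                    Key=key,
                    UploadId=upload_id,
                    PartNumber=number,
                    Body=chunk,
                )
                parts.append({"ETag": part["ETag"], "PartNumber": number})
            await s3.complete_multipart_upload(
                Bucket=bucket,
                Key=key,
                UploadId=upload_id,
                MultipartUpload={"Parts": parts},
            )
        except Exception:
            # Abort so incomplete parts do not keep accruing storage costs.
            await s3.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)
            raise


# Example (hypothetical bucket and key):
# asyncio.run(upload_file("backup.tar.gz", "my-bucket", "backups/backup.tar.gz"))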