Skip to content

Instantly share code, notes, and snippets.

@Luocy7
Last active November 1, 2024 15:06
Show Gist options
  • Save Luocy7/3234e3f183f8623246789ea2b3a23b93 to your computer and use it in GitHub Desktop.
Save Luocy7/3234e3f183f8623246789ea2b3a23b93 to your computer and use it in GitHub Desktop.
Alembic work properly with PostgreSQL schemas

Making Alembic work properly with PostgreSQL schemas

Version

  • alembic: 1.12.0
  • python: 3.11
  • SQLAlchemy: 2.0.22

Init

Initialize Alembic with the official command:

alembic init alembic

This generates an Alembic environment that looks like:

yourproject/
    alembic/
        env.py
        README
        script.py.mako
        versions/
            <empty>
   alembic.ini

Setup

alembic.ini

# Main Alembic settings. Note that no database URL is configured in this
# file: env.py takes it from the project's settings object instead.
[alembic]
# Directory of the migration environment (the one created by
# `alembic init alembic`).
script_location = alembic
# Naming pattern for new revision files: date and time followed by the
# revision id. `%%` is an escaped literal `%` for the ini parser.
file_template = %%(year)d-%%(month).2d-%%(day).2d-%%(hour).2d-%%(minute).2d_%%(rev)s
# Add the current directory to sys.path so env.py can import the project.
prepend_sys_path = .
output_encoding = utf-8
# truncate_slug_length = 40


# Tools run against each newly generated revision script.
[post_write_hooks]
hooks = black,isort,ruff

black.type = console_scripts
black.entrypoint = black

isort.type = console_scripts
isort.entrypoint = isort

# ruff is invoked as an executable; REVISION_SCRIPT_FILENAME is a
# placeholder for the path of the generated revision file.
ruff.type = exec
ruff.executable = ruff
ruff.options = check --fix-only REVISION_SCRIPT_FILENAME

# Logging configuration
# (loaded by env.py through logging.config.fileConfig)
[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = INFO
handlers = console
qualname =

[logger_sqlalchemy]
level = INFO
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = INFO
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

env.py

import asyncio
from logging.config import fileConfig

from alembic import context
from sqlalchemy.ext.asyncio.engine import create_async_engine
from sqlalchemy.future import Connection

from yourproject.db.meta import meta
from yourproject.db.models import load_all_models
from yourproject.settings import settings

# This is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Import every module of the models package up front.
# NOTE(review): presumably this is what registers the model tables on `meta`
# so that autogenerate can see them -- confirm against the model definitions.
load_all_models()

# Interpret the config file for Python logging.
# This line sets up loggers basically.
# (Skipped when Alembic is driven without an .ini file.)
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# The MetaData object used for 'autogenerate' support. Its `schema`
# attribute is also read further down to decide which database schema
# the migrations and the version table live in.
target_metadata = meta


# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.


# Limit reflection to the schema configured on the target metadata.
# https://alembic.sqlalchemy.org/en/latest/autogenerate.html#omitting-schema-names-from-the-autogenerate-process
def include_name_filter(name, type_, parent_names):
    """Decide whether a reflected name takes part in autogenerate.

    :param name: name of the reflected object.
    :param type_: kind of object the name refers to, e.g. ``"schema"``.
    :param parent_names: names of the enclosing objects (unused here).
    :return: for schemas, whether ``name`` equals ``target_metadata.schema``;
        ``True`` for every other object type.
    """
    is_schema = type_ == "schema"
    # Non-schema objects are always accepted; schemas must match the target.
    return not is_schema or name == target_metadata.schema


async def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    The function contains no ``await`` but is declared ``async`` so that
    the module's entry point can drive the offline and online paths the
    same way.
    """
    context.configure(
        url=str(settings.DB_URL),
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
        # Keep the version table in the same schema that online mode uses,
        # so the generated SQL script and a live run agree on its location.
        version_table_schema=target_metadata.schema,
    )

    with context.begin_transaction():
        context.run_migrations()


def do_run_migrations(connection: Connection) -> None:
    """
    Configure the migration context on a live connection and migrate.

    :param connection: connection to the database.
    """
    # The last three options are what make a non-default database schema
    # work: the version table is stored in that schema, schemas are
    # included in reflection, and reflected schema names are filtered
    # through include_name_filter.
    configure_options = {
        "connection": connection,
        "target_metadata": target_metadata,
        "version_table_schema": target_metadata.schema,
        "include_schemas": True,
        "include_name": include_name_filter,
    }
    context.configure(**configure_options)

    with context.begin_transaction():
        context.run_migrations()


async def run_migrations_online() -> None:
    """
    Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
    The engine is disposed afterwards, even if the migration fails,
    so its connection pool does not outlive this function.
    """
    connectable = create_async_engine(str(settings.DB_URL))

    try:
        async with connectable.connect() as connection:
            # Alembic's migration API is synchronous; run_sync bridges the
            # async connection to do_run_migrations.
            await connection.run_sync(do_run_migrations)
    finally:
        await connectable.dispose()


# Both runners are coroutine functions; pick the one matching the current
# mode and drive it with asyncio.run(), which creates and closes its own
# event loop. asyncio.get_event_loop() is deprecated when no loop is running.
if context.is_offline_mode():
    asyncio.run(run_migrations_offline())
else:
    asyncio.run(run_migrations_online())

yourproject/db/meta.py

import sqlalchemy as sa

from yourproject.settings import settings

# Default schema for every model declared against this metadata.
# Passed by keyword so the value cannot be mistaken for a different
# positional parameter on other SQLAlchemy versions.
meta = sa.MetaData(schema=settings.DB_SCHEMA)

yourproject/db/models/__init__.py

import pkgutil
from pathlib import Path


def load_all_models() -> None:
    """Import every module found in this package's folder.

    The modules are imported only for their side effects; nothing is
    returned.
    """
    package_dir = Path(__file__).resolve().parent
    modules = pkgutil.walk_packages(
        path=[str(package_dir)],
        # The prefix is prepended to each discovered module name, so it must
        # be the dotted import path of this package, not a filesystem path.
        prefix="yourproject.db.models.",
    )
    for module_info in modules:
        __import__(module_info.name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment