Last active
November 5, 2024 12:59
-
-
Save h4/fc9b6d350544ff66491308b535762fee to your computer and use it in GitHub Desktop.
Setup alembic to work properly with PostgreSQL schemas
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import with_statement | |
from alembic import context | |
from sqlalchemy import engine_from_config, pool | |
from logging.config import fileConfig | |
from models import Base | |
config = context.config | |
fileConfig(config.config_file_name) | |
""" | |
Load models metadata. We should define schema in this class firstly, | |
or set schema implicit with `__table_args__ = {'schema' : 'test'}` in model class | |
""" | |
target_metadata = Base.metadata | |
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    The context is configured from the ``sqlalchemy.url`` option only; no
    Engine or connection is created here. ``literal_binds=True`` asks Alembic
    to render bound values inline in the emitted SQL.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        # Keep the alembic_version table in the same schema that
        # run_migrations_online() uses, so both modes agree on its location.
        version_table_schema=target_metadata.schema,
    )

    with context.begin_transaction():
        context.run_migrations()
def run_migrations_online():
    """Run migrations in 'online' mode.

    Builds an Engine from the ``sqlalchemy.``-prefixed options of the active
    .ini section (using ``NullPool``), opens a connection, and runs the
    migrations on it inside a transaction.
    """
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix='sqlalchemy.',
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        # Configure the migration context:
        #   1. pass our models' metadata;
        #   2. set the schema for the alembic_version table;
        #   3. load all available schemas.
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            version_table_schema=target_metadata.schema,
            include_schemas=True,
        )

        with context.begin_transaction():
            # By default search_path is set to "$user",public, which is why
            # Alembic can't create foreign keys correctly.
            context.execute('SET search_path TO public')
            context.run_migrations()
# Pick the runner that matches how Alembic was invoked.
if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
Here is my solution for working properly with a PostgreSQL schema using asyncpg and asyncio in SQLAlchemy.
The key point, as the official documentation and the comments above say, is to
configure a filter so that only objects from the specified schema are included:
def include_name(name, type_, parent_names):
    """Decide whether a name should be included, filtering schemas only.

    Args:
        name: Name of the object being considered.
        type_: Kind of object the name refers to; only ``"schema"`` is
            treated specially here.
        parent_names: Accepted to match the hook signature; not used.

    Returns:
        For ``type_ == "schema"``, ``True`` only when ``name`` equals
        ``target_metadata.schema``. For every other type, always ``True``.
    """
    if type_ == "schema":
        return name == target_metadata.schema
    return True
# Same configuration as before, plus the include_name filter so that only the
# schema named by target_metadata.schema is considered.
context.configure(
    connection=connection,
    target_metadata=target_metadata,
    version_table_schema=target_metadata.schema,
    include_schemas=True,
    include_name=include_name,
)
After spending a lot of time struggling to get my alembic to work, with a schema_name other than "public", I've come up with the following solution:
[The only solution that worked for me on February 16, 2024]
def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    After the context is configured, the connection's ``search_path`` is set
    to ``settings.postgres_db_schema`` before the migrations run.
    """
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)

        # The magic line: point search_path at the configured schema. It is
        # issued after context.configure() and before the migration
        # transaction is opened; keep that order.
        connection.execute(text('set search_path to "%s"' % settings.postgres_db_schema))

        with context.begin_transaction():
            context.run_migrations()
Hope this helps
After spending a lot of time struggling to get my alembic to work, with a schema_name other than "public", I've come up with the following solution: [The only solution that worked for me on February 16, 2024]
def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
    """
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)
        connection.execute(text('set search_path to "%s"' % settings.postgres_db_schema))  # <-- The magic line

        with context.begin_transaction():
            context.run_migrations()

# Hope this helps
This solution works for me. Thanks for sharing it!
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
For myself I figured out this solution (after reviewing Alembic source code):
Setting search path did not work for me. It started finding alembic_version table, some foreign keys, etc.