Skip to content

Instantly share code, notes, and snippets.

@h4
Last active November 5, 2024 12:59
Show Gist options
  • Save h4/fc9b6d350544ff66491308b535762fee to your computer and use it in GitHub Desktop.
Save h4/fc9b6d350544ff66491308b535762fee to your computer and use it in GitHub Desktop.
Setup alembic to work properly with PostgreSQL schemas
from __future__ import with_statement
from alembic import context
from sqlalchemy import engine_from_config, pool
from logging.config import fileConfig
from models import Base
config = context.config
fileConfig(config.config_file_name)
"""
Load models metadata. We should define schema in this class firstly,
or set schema implicit with `__table_args__ = {'schema' : 'test'}` in model class
"""
target_metadata = Base.metadata
def run_migrations_offline():
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url, target_metadata=target_metadata, literal_binds=True)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix='sqlalchemy.',
poolclass=pool.NullPool)
with connectable.connect() as connection:
"""
Configure migration context
1. Pass our models metadata
2. Set schema for alembic_version table
3. Load all available schemas
"""
context.configure(
connection=connection,
target_metadata=target_metadata,
version_table_schema=target_metadata.schema,
include_schemas=True
)
with context.begin_transaction():
"""
By default search_path is setted to "$user",public
that why alembic can't create foreign keys correctly
"""
context.execute('SET search_path TO public')
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
@ddasilva
Copy link

ddasilva commented Apr 5, 2018

Thanks!

@mikkelam
Copy link

Not sure why this isn't default... thanks

@mguidone
Copy link

mguidone commented Feb 6, 2020

Perfect, thank you for posting this! Solved my multi-schema issue AND forewarned me of pending issues with foreign keys.

@mailint-daniellee
Copy link

Holy moly this took ages of searching to find this little gem!! Thanks!

@eranhirs
Copy link

eranhirs commented Aug 9, 2021

In case this helps someone else: This didn't work for me.
When writing the upgrade operations, for example:
op.add_column('account', sa.Column('last_transaction_date', sa.DateTime))
I had to add the schema variable
op.add_column('account', sa.Column('last_transaction_date', sa.DateTime), schema='my_schema')

The following will not work, because it adds quotes into the SQL statement
op.add_column('my_schema.account', sa.Column('last_transaction_date', sa.DateTime))

@Shaul-Z
Copy link

Shaul-Z commented Oct 5, 2021

Didn't work for me either. I have been messing with it for the last couple of days and I couldn't find any support for specifying schema in Alembic. The only thing I did find is in Alembic Cookbook https://alembic.sqlalchemy.org/en/latest/cookbook.html
Attached below, But didn't work for me as well.
Would appreciate any help from whoever have a good clean solution for this, if not I hope Almbic's team aware of this.
Don't know why but Django makes it super easy.

Rudimental Schema-Level Multi Tenancy for PostgreSQL Databases

Multi tenancy refers to an application that accommodates for many clients simultaneously. Within the scope of a database migrations tool, multi-tenancy typically refers to the practice of maintaining multiple, identical databases where each database is assigned to one client.

Alembic does not currently have explicit multi-tenant support; typically, the approach must involve running Alembic multiple times against different database URLs.

One common approach to multi-tenancy, particularly on the PostgreSQL database, is to install tenants within individual PostgreSQL schemas. When using PostgreSQL’s schemas, a special variable search_path is offered that is intended to assist with targeting of different schemas.

Note SQLAlchemy includes a system of directing a common set of Table metadata to many schemas called schema_translate_map. Alembic at the time of this writing lacks adequate support for this feature. The recipe below should be considered interim until Alembic has more first-class support for schema-level multi-tenancy.
The recipe below can be altered for flexibility. The primary purpose of this recipe is to illustrate how to point the Alembic process towards one PostgreSQL schema or another.

The model metadata used as the target for autogenerate must not include any schema name for tables; the schema must be non-present or set to None. Otherwise, Alembic autogenerate will still attempt to compare and render tables in terms of this schema:

class A(Base):
    __tablename__ = 'a'

    id = Column(Integer, primary_key=True)
    data = Column(UnicodeText())
    foo = Column(Integer)

    __table_args__ = {
        "schema": None
    }

The EnvironmentContext.configure.include_schemas flag must also be False or not included.

The “tenant” will be a schema name passed to Alembic using the “-x” flag. In env.py an approach like the following allows -xtenant=some_schema to be supported by making use of EnvironmentContext.get_x_argument():

def run_migrations_online():
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    current_tenant = context.get_x_argument(as_dictionary=True).get("tenant")
    with connectable.connect() as connection:

        # set search path on the connection, which ensures that
        # PostgreSQL will emit all CREATE / ALTER / DROP statements
        # in terms of this schema by default
        connection.execute("set search_path to %s" % current_tenant)

        # make use of non-supported SQLAlchemy attribute to ensure
        # the dialect reflects tables in terms of the current tenant name
        connection.dialect.default_schema_name = current_tenant

        context.configure(
            connection=connection,
            target_metadata=target_metadata,
        )

        with context.begin_transaction():
            context.run_migrations()

The current tenant is set using the PostgreSQL search_path variable on the connection. Note above we must employ a non-supported SQLAlchemy workaround at the moment which is to hardcode the SQLAlchemy dialect’s default schema name to our target schema.

It is also important to note that the above changes remain on the connection permanently unless reversed explicitly. If the alembic application simply exits above, there is no issue. However if the application attempts to continue using the above connection for other purposes, it may be necessary to reset these variables back to the default, which for PostgreSQL is usually the name “public” however may be different based on configuration.

Alembic operations will now proceed in terms of whichever schema we pass on the command line. All logged SQL will show no schema,
except for reflection operations which will make use of the default_schema_name attribute:

[]$ alembic -x tenant=some_schema revision -m "rev1" --autogenerate

Since all schemas are to be maintained in sync, autogenerate should be run against only one schema, generating new Alembic migration
files. Autogenerate migratin operations are then run against all schemas.

@bikashckarmokar
Copy link

bikashckarmokar commented Mar 2, 2023

The cookbook example worked for me. The updated configuration is given on the below link for SQLAlchemy 2+.

https://alembic.sqlalchemy.org/en/latest/cookbook.html#rudimental-schema-level-multi-tenancy-for-postgresql-databases

        class A(Base):
              __tablename__ = 'a'
          
              id = Column(Integer, primary_key=True)
              data = Column(UnicodeText())
              foo = Column(Integer)
          
              __table_args__ = {
                  "schema": None
              }


         from sqlalchemy import text

        def run_migrations_online():
            connectable = engine_from_config(
                config.get_section(config.config_ini_section),
                prefix="sqlalchemy.",
                poolclass=pool.NullPool,
            )
        
            current_tenant = context.get_x_argument(as_dictionary=True).get("tenant")
            with connectable.connect() as connection:
        
                # set search path on the connection, which ensures that
                # PostgreSQL will emit all CREATE / ALTER / DROP statements
                # in terms of this schema by default
                connection.execute(text('set search_path to "%s"' % current_tenant))
                # in SQLAlchemy v2+ the search path change needs to be committed
                connection.commit()
        
                # make use of non-supported SQLAlchemy attribute to ensure
                # the dialect reflects tables in terms of the current tenant name
                connection.dialect.default_schema_name = current_tenant
        
                context.configure(
                    connection=connection,
                    target_metadata=target_metadata,
                )
        
                with context.begin_transaction():
                    context.run_migrations()

@dmugtasimov
Copy link

For myself I figured out this solution (after reviewing Alembic source code):

    database_schema = settings.database_schema


    def include_name_filter(name, type_, parent_names):
        if type_ == "schema":
            return name == database_schema

        return True

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            # The following 3 lines are required to support non-default
            # database schema for our database objects
            version_table_schema=database_schema,
            include_schemas=True,
            include_name=include_name_filter,
        )

        connection.execute(text(CREATE_SCHEMA_STATEMENT))

        with context.begin_transaction():
            context.run_migrations()

Setting search path did not work for me. It started finding alembic_version table, some foreign keys, etc.

@Luocy7
Copy link

Luocy7 commented Oct 18, 2023

here is my solution for work properly with PostgreSQL schema using asyncpg and asyncio in sqlalchemy.

what counts is like the official document and above said
configure filter object from specified schema

def include_name(name, type_, parent_names):
    if type_ == "schema":
        return name == target_metadata.schema
    else:
        return True

context.configure(
        connection=connection,
        target_metadata=target_metadata,

        version_table_schema=target_metadata.schema,
        include_schemas=True,
        include_name=include_name
    )

@Ilyes-git
Copy link

Ilyes-git commented Feb 16, 2024

After spending a lot of time struggling to get my alembic to work, with a schema_name other than "public", I've come up with the following solution:
[The only solution that worked for me on February 16, 2024]

def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    """
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)

        connection.execute(text('set search_path to "%s"' % settings.postgres_db_schema)) #  <-- The magic line

        with context.begin_transaction():
            context.run_migrations()

Hope this helps

@thiagoolsilva
Copy link

After spending a lot of time struggling to get my alembic to work, with a schema_name other than "public", I've come up with the following solution: [The only solution that worked for me on February 16, 2024]

def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    """
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)

        connection.execute(text('set search_path to "%s"' % settings.postgres_db_schema)) #  <-- The magic line

        with context.begin_transaction():
            context.run_migrations()

Hope this helps

This solution works for me. Thanks to share it!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment