@nickretallack
Last active March 16, 2023 20:57
How to run multi-tenant migrations in alembic.
from __future__ import with_statement
from alembic import context
from sqlalchemy import engine_from_config, pool, MetaData, Table, ForeignKeyConstraint, Index
from logging.config import fileConfig
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
from flask import current_app
config.set_main_option('sqlalchemy.url', current_app.config.get('SQLALCHEMY_DATABASE_URI'))
prototype_schema = current_app.config['PROTOTYPE_SCHEMA'] # base migrations on this schema
public_schema_tables = current_app.config['PUBLIC_SCHEMA_TABLES'] # table names that belong in 'public', not tenant schemas
get_schemas_query = current_app.config['GET_SCHEMAS_QUERY'] # query that returns a list of tenant schemas
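# For illustration only (these values are assumptions, not the author's actual
# config; they assume PostgreSQL with tenant schemas named 'tenant_*'):
#   PROTOTYPE_SCHEMA = 'tenant_prototype'
#   PUBLIC_SCHEMA_TABLES = ['users', 'alembic_version']
#   GET_SCHEMAS_QUERY = ("select schema_name from information_schema.schemata "
#                        "where schema_name like 'tenant_%'")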
def include_schemas(names):
    # produce an include_object callable that filters on the given schemas
    def include_object(object, name, type_, reflected, compare_to):
        if type_ == "table":
            return object.schema in names
        return True
    return include_object
def lookup_correct_schema(name):
    if name in public_schema_tables:
        return 'public'
    else:
        return prototype_schema

def _get_table_key(name, schema):
    if schema is None:
        return name
    else:
        return schema + "." + name
def tometadata(table, metadata, schema):
    # copy the given table into `metadata` under the given schema,
    # pointing each foreign key at the schema its target table lives in
    key = _get_table_key(table.name, schema)
    if key in metadata.tables:
        return metadata.tables[key]

    args = []
    for c in table.columns:
        args.append(c.copy(schema=schema))
    new_table = Table(
        table.name, metadata, schema=schema,
        *args, **table.kwargs
    )
    for c in table.constraints:
        if isinstance(c, ForeignKeyConstraint):
            constraint_schema = lookup_correct_schema(c.elements[0].column.table.name)
        else:
            constraint_schema = schema
        new_table.append_constraint(c.copy(schema=constraint_schema, target_table=new_table))

    for index in table.indexes:
        # skip indexes that would be generated
        # by the 'index' flag on Column
        if len(index.columns) == 1 and list(index.columns)[0].index:
            continue
        Index(index.name,
              unique=index.unique,
              *[new_table.c[col] for col in index.columns.keys()],
              **index.kwargs)
    return table._schema_item_copy(new_table)
meta = current_app.extensions['migrate'].db.metadata
meta_schemax = MetaData()
for table in meta.tables.values():
    tometadata(table, meta_schemax, lookup_correct_schema(table.name))
target_metadata = meta_schemax
# target_metadata = current_app.extensions['migrate'].db.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well. By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(url=url, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()
def run_migrations_online():
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
    """
    engine = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix='sqlalchemy.',
        poolclass=pool.NullPool)
    schemas = set([prototype_schema, None])
    connection = engine.connect()
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        include_schemas=True,  # schemas
        include_object=include_schemas([None, prototype_schema])
    )
    try:
        # get the tenant schema names, then run the same migrations once
        # per schema, using search_path to direct unqualified table names
        tenant_schemas = [row[0] for row in connection.execute(get_schemas_query)]
        for schema in tenant_schemas:
            connection.execute('set search_path to "{}", public'.format(schema))
            with context.begin_transaction():
                context.run_migrations()
    finally:
        connection.close()
if context.is_offline_mode():
    print("Can't run migrations offline")
    # run_migrations_offline()
else:
    run_migrations_online()
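For context, this env.py reads current_app.extensions['migrate'], which Flask-Migrate registers when the extension is initialized. A minimal sketch of the application wiring it assumes (module, database, and config values here are illustrative, not taken from the gist):

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://localhost/myapp'
app.config['PROTOTYPE_SCHEMA'] = 'tenant_prototype'
app.config['PUBLIC_SCHEMA_TABLES'] = ['users', 'alembic_version']
app.config['GET_SCHEMAS_QUERY'] = (
    "select schema_name from information_schema.schemata "
    "where schema_name like 'tenant_%'")
db = SQLAlchemy(app)
migrate = Migrate(app, db)  # registers 'migrate' in app.extensions, which env.py reads above

With that wiring, migrations run through Flask-Migrate's usual commands (e.g. `python manage.py db upgrade` with Flask-Script, or `flask db upgrade` on newer versions), and env.py applies each migration once per tenant schema.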
@djdduty commented May 24, 2016

Is it possible to see an example of your config file too? I'm wondering what that query and everything looks like.

@djdduty commented May 24, 2016

Actually, I don't think this helps me much. It seems you use a Flask extension to retrieve metadata about the models? I'm using Pyramid and I don't care so much about auto-generating migrations, but I am trying to figure out how I can have both shared and tenant model migrations in a single environment. Currently I'm using one environment for shared migrations on the public schema and another looping through tenants the way you've done it here.

@imtiaz-emu commented Nov 10, 2016

Can you post your config file? I have a User model (public schema) and a Dashboard model (private schema).
For every entry in the user table I need to create a new schema in which a new table named dashboard will be created. How can I achieve that? I've posted a question on StackOverflow. Can you please take a look?

@djdduty, @nickretallack

@bfmcneill commented

Is this still a best practice, or is there a better way to migrate schemas?

I am using SQL Server and have two schemas: "dbo" and "stage".

Everything is working nicely, except that on each migration the foreign key constraints get dropped and recreated. I have include_schemas=True set.
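One possible mitigation (a sketch, not from this gist; Alembic's include_object hook is called with type_ "foreign_key_constraint" for FK comparisons): filter foreign key constraints out of autogenerate's comparison entirely so they stop being dropped and recreated on every run. The schema names below come from the comment above and are otherwise illustrative:

ALLOWED_SCHEMAS = {"dbo", "stage"}

def include_object(object, name, type_, reflected, compare_to):
    # leave foreign key constraints alone so autogenerate stops
    # emitting drop/create pairs for them on every revision
    if type_ == "foreign_key_constraint":
        return False
    if type_ == "table":
        return object.schema in ALLOWED_SCHEMAS
    return True

# then: context.configure(..., include_schemas=True, include_object=include_object)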
