Created
February 7, 2020 23:12
-
-
Save staab/64784f18e5a1f9117f4fd9f0861e7d80 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import importlib.util
import os
import re
from time import time

from utils.logging import logger
from utils.dt import Dt
from utils.misc import uuid_str
from lib.sql.query import clauses as c
# Rebind ``logger`` to a child logger named after this module.
logger = logger.getChild(__name__)


# Queries wrapped in ``c.sql`` contain a ``{}`` placeholder that ``migrate``
# fills with the schema identifier via ``.format(...)``; ``%(name)s`` markers
# are bound from the parameter dict passed alongside the query.

# Create the target schema when it does not exist yet.
create_schema_q = c.sql("""
    CREATE SCHEMA IF NOT EXISTS {}
""")

# Yields a row when the ``migration`` bookkeeping table exists in the schema.
# Plain string: the schema is passed as a bound parameter, not an identifier.
table_exists_q = """
    SELECT 1 from pg_tables
    WHERE tablename = 'migration'
    AND schemaname = %(schema)s
"""

# Create the bookkeeping table that records each applied migration.
create_table_q = c.sql("""
    CREATE TABLE {}."migration" (
        "id" uuid NOT NULL CONSTRAINT migration_pk PRIMARY KEY,
        "app_name" varchar(255) NOT NULL,
        "created" timestamp NOT NULL,
        "version" varchar(255) NOT NULL,
        "number" smallint NOT NULL
    )
""")

# Highest migration number recorded for an app (no row when none recorded).
max_applied_q = c.sql("""
    SELECT number
    FROM {}.migration
    WHERE app_name = %(app_name)s
    ORDER BY number DESC
    LIMIT 1
""")

# Record one applied migration.
add_migration_q = c.sql("""
    INSERT INTO {}.migration (id, app_name, created, version, number)
    VALUES (%(id)s, %(app_name)s, %(created)s, %(version)s, %(number)s)
""")

# Drop records whose number exceeds the given one (used after squashing).
delete_obsolete_migrations_q = c.sql("""
    DELETE FROM {}.migration WHERE number > %(number)s AND app_name = %(app_name)s
""")

# Drop the record of a single migration, identified by its version name.
delete_migration_q = c.sql("""
    DELETE FROM {}.migration WHERE version = %(version)s AND app_name = %(app_name)s
""")
def migrate(schema, app_name, all_versions, db, db_version):
    """Apply pending forward migrations for ``app_name`` in ``schema``.

    Ensures the schema and its ``migration`` bookkeeping table exist, removes
    bookkeeping rows numbered above the last known version, then runs every
    version between the last recorded one and ``db_version`` (inclusive),
    recording each one as it is applied.

    Args:
        schema: Name of the database schema to migrate.
        app_name: Application name used to scope rows in the migration table.
        all_versions: Sequence of dicts, each with a ``'name'`` (must match
            ``v`` followed by four digits) and a ``'path'`` (directory holding
            ``forward.sql`` and/or ``forward.py``).
        db: Database handle providing ``execute``, ``scalar`` and ``using``.
        db_version: Index into ``all_versions`` to migrate to; clamped to the
            range ``0 .. len(all_versions) - 1``.

    Raises:
        ValueError: If the target version is below the recorded one (backward
            migrations are not supported) or a version name is malformed.

    NOTE(review): the recorded ``number`` (parsed from the version name) is
    also used as an index into ``all_versions``, so this presumably expects
    ``all_versions[i]['name']`` to be ``v`` + zero-padded ``i`` -- confirm.
    """
    now = time()
    schema_sql = c.field(schema)

    # Make sure the given version is valid
    max_version = len(all_versions) - 1
    db_version = max(0, min(db_version, max_version))

    # Make sure the schema exists
    db.execute(create_schema_q.format(schema_sql))

    with db.using(schema):
        # Get our most recently applied migration (if any)
        if db.scalar(table_exists_q, {'schema': schema}):
            max_applied = db.scalar(max_applied_q.format(schema_sql), {
                'app_name': app_name,
            })
        else:
            db.execute(create_table_q.format(schema_sql))
            max_applied = None

        # If we recently squashed migrations, delete migrations > max
        db.execute(delete_obsolete_migrations_q.format(schema_sql), {
            'number': max_version,
            'app_name': app_name,
        })

    # max_applied was read before the obsolete rows were deleted; if it points
    # past max_version it is stale and would be misread as a backward
    # migration, so bring it in line with what is left in the table.
    if max_applied is not None and max_applied > max_version:
        max_applied = max_version if max_version >= 0 else None

    # Figure out if we're going forward or nowhere (backward is rejected)
    if max_applied is None:
        versions = all_versions[0:db_version + 1]
    elif db_version > max_applied:
        versions = all_versions[max_applied + 1:db_version + 1]
    elif db_version < max_applied:
        raise ValueError(
            'Backward migrations are not supported (tried to migrate from {} to {})'.format(
                max_applied,
                db_version
            )
        )
    else:
        logger.info("No migrations to apply for db {}".format(schema))
        return

    def exec_file(path, name):
        """Run ``<path>/<name>.sql`` and then ``<path>/<name>.py``, if present."""
        # Execute straight-up sql; the file is optional. Only the read is
        # guarded so a FileNotFoundError raised while executing the SQL is
        # not silently swallowed.
        try:
            with open('{}/{}.sql'.format(path, name)) as f:
                sql_text = f.read()
        except FileNotFoundError:
            sql_text = None

        if sql_text is not None:
            db.execute(sql_text)

        # If there needs to be a python component, do that too
        py_path = '{}/{}.py'.format(path, name)
        if os.path.isfile(py_path):
            spec = importlib.util.spec_from_file_location(name, py_path)
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            module.run()

    logger.info("Migrating {} db: {} -> {}".format(schema, max_applied, db_version))

    # Apply all selected migrations
    for version in versions:
        name = version['name']
        path = version['path']

        if not re.match(r'^v\d{4}$', name):
            raise ValueError('Badly named db version {}'.format(name))

        # Run it and track it
        with db.using(schema):
            exec_file(path, 'forward')
            db.execute(add_migration_q.format(schema_sql), {
                'id': uuid_str(),
                'app_name': app_name,
                'created': Dt.now(),
                'version': name,
                'number': int(name[1:]),
            })

    elapsed = int(time() - now)
    logger.info("Done applying migrations (took {} seconds)".format(elapsed))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment