A backup copy of the code from https://david.rothlis.net/declarative-schema-migration-for-sqlite/
Code is MIT licensed, written by William Manley, copyright © 2019-2022 Stb-tester.com Ltd
# coding: utf-8

"""Simple declarative schema migration for SQLite.

See <https://david.rothlis.net/declarative-schema-migration-for-sqlite>.

Author: William Manley <[email protected]>.
Copyright © 2019-2022 Stb-tester.com Ltd.
License: MIT.
"""

import logging
import re
import sqlite3
from textwrap import dedent


def dumb_migrate_db(db, schema, allow_deletions=False):
    """
    Migrates a database to the new schema given by the SQL text `schema`,
    preserving the data.  We create any table that exists in schema, delete
    any old table that is no longer used and add/remove columns and indices
    as necessary.

    Under this scheme there is a set of changes that we can make to the
    schema and this script will handle them fine:

    1. Adding a new table
    2. Adding, deleting or modifying an index
    3. Adding a column to an existing table as long as the new column can be
       NULL or has a DEFAULT value specified.
    4. Changing a column to remove NULL or DEFAULT as long as all values in
       the database are not NULL
    5. Changing the type of a column
    6. Changing the user_version

    In addition this function is capable of:

    1. Deleting tables
    2. Deleting columns from tables

    But only if allow_deletions=True.  If the new schema requires a column or
    table to be deleted and allow_deletions=False this function will raise
    `RuntimeError`.

    Note: When this function is called a transaction must not be held open on
    db.  A transaction will be used internally.  If you wish to perform
    additional migration steps as part of a migration, use DBMigrator
    directly.

    Any rowid columns generated internally by SQLite may change value as a
    result of this migration.
    """
    with DBMigrator(db, schema, allow_deletions) as migrator:
        migrator.migrate()
    return bool(migrator.n_changes)
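

# A minimal usage sketch (not part of the original module): apply a target
# schema, given as SQL text, to a SQLite database file.  The file name
# "app.db" and the example schema below are illustrative assumptions only.
def _example_dumb_migrate_usage():  # pragma: no cover
    db = sqlite3.connect("app.db", isolation_level=None)
    target_schema = """\
        CREATE TABLE users(
            user_oid INTEGER PRIMARY KEY NOT NULL,
            email TEXT NOT NULL);
        CREATE UNIQUE INDEX users_email on users(email);
        """
    # Returns True if any changes were needed to bring the db in line with
    # target_schema; columns/tables are only dropped if allow_deletions=True.
    changed = dumb_migrate_db(db, target_schema, allow_deletions=False)
    logging.info("Schema changed: %s", changed)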


class DBMigrator:
    def __init__(self, db, schema, allow_deletions=False):
        self.db = db
        self.schema = schema
        self.allow_deletions = allow_deletions

        self.pristine = sqlite3.connect(":memory:")
        self.pristine.executescript(schema)
        self.n_changes = 0

        self.orig_foreign_keys = None

    def log_execute(self, msg, sql, args=None):
        # It's important to log any changes we're making to the database for
        # forensics later
        msg_tmpl = "Database migration: %s with SQL:\n%s"
        msg_argv = (msg, _left_pad(dedent(sql)))
        if args:
            msg_tmpl += " args = %r"
            msg_argv += (args,)
        else:
            args = []
        logging.info(msg_tmpl, *msg_argv)
        self.db.execute(sql, args)
        self.n_changes += 1

    def __enter__(self):
        self.orig_foreign_keys = (
            self.db.execute("PRAGMA foreign_keys").fetchone()[0])
        if self.orig_foreign_keys:
            self.log_execute("Disable foreign keys temporarily for migration",
                             "PRAGMA foreign_keys = OFF")
            # This doesn't count as a change because we'll undo it at the end
            self.n_changes = 0

        self.db.__enter__()
        self.db.execute('BEGIN')
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.db.__exit__(exc_type, exc_value, exc_tb)
        if exc_value is None:
            # The SQLite docs say:
            #
            # > This pragma is a no-op within a transaction; foreign key
            # > constraint enforcement may only be enabled or disabled when
            # > there is no pending BEGIN or SAVEPOINT.
            old_changes = self.n_changes
            new_val = self._migrate_pragma('foreign_keys')
            if new_val == self.orig_foreign_keys:
                self.n_changes = old_changes

            # SQLite docs say:
            #
            # > A VACUUM will fail if there is an open transaction on the
            # > database connection that is attempting to run the VACUUM.
            if self.n_changes:
                self.db.execute("VACUUM")
        else:
            if self.orig_foreign_keys:
                self.log_execute(
                    "Re-enable foreign keys after migration",
                    "PRAGMA foreign_keys = ON")

    def migrate(self):
        # In CI the database schema may be changing all the time.  This
        # checks the current db and if it doesn't match database.sql we will
        # modify it so it does match where possible.
        pristine_tables = dict(self.pristine.execute("""\
            SELECT name, sql FROM sqlite_master
            WHERE type = \"table\" AND name != \"sqlite_sequence\"""").fetchall())
        pristine_indices = dict(self.pristine.execute("""\
            SELECT name, sql FROM sqlite_master
            WHERE type = \"index\"""").fetchall())

        tables = dict(self.db.execute("""\
            SELECT name, sql FROM sqlite_master
            WHERE type = \"table\" AND name != \"sqlite_sequence\"""").fetchall())

        new_tables = set(pristine_tables.keys()) - set(tables.keys())
        removed_tables = set(tables.keys()) - set(pristine_tables.keys())

        if removed_tables and not self.allow_deletions:
            raise RuntimeError(
                "Database migration: Refusing to delete tables %r" %
                removed_tables)

        modified_tables = set(
            name for name, sql in pristine_tables.items()
            if normalise_sql(tables.get(name, "")) != normalise_sql(sql))

        # This PRAGMA is automatically disabled when the db is committed
        self.db.execute("PRAGMA defer_foreign_keys = TRUE")

        # New and removed tables are easy:
        for tbl_name in new_tables:
            self.log_execute("Create table %s" % tbl_name,
                             pristine_tables[tbl_name])
        for tbl_name in removed_tables:
            self.log_execute("Drop table %s" % tbl_name,
                             "DROP TABLE %s" % tbl_name)

        for tbl_name in modified_tables:
            # The SQLite documentation insists that we create the new table
            # and rename it over the old rather than moving the old out of
            # the way and then creating the new
            create_table_sql = pristine_tables[tbl_name]
            create_table_sql = re.sub(r"\b%s\b" % re.escape(tbl_name),
                                      tbl_name + "_migration_new",
                                      create_table_sql)
            self.log_execute(
                "Columns change: Create table %s with updated schema" %
                tbl_name, create_table_sql)

            cols = set([
                x[1] for x in self.db.execute(
                    "PRAGMA table_info(%s)" % tbl_name)])
            pristine_cols = set([
                x[1] for x in
                self.pristine.execute("PRAGMA table_info(%s)" % tbl_name)])

            removed_columns = cols - pristine_cols
            if not self.allow_deletions and removed_columns:
                logging.warning(
                    "Database migration: Refusing to remove columns %r from "
                    "table %s. Current cols are %r attempting migration to %r",
                    removed_columns, tbl_name, cols, pristine_cols)
                raise RuntimeError(
                    "Database migration: Refusing to remove columns %r from "
                    "table %s" % (removed_columns, tbl_name))

            logging.info("cols: %s, pristine_cols: %s", cols, pristine_cols)
            self.log_execute(
                "Migrate data for table %s" % tbl_name, """\
                INSERT INTO {tbl_name}_migration_new ({common})
                SELECT {common} FROM {tbl_name}""".format(
                    tbl_name=tbl_name,
                    common=", ".join(cols.intersection(pristine_cols))))

            # Don't need the old table any more
            self.log_execute(
                "Drop old table %s now data has been migrated" % tbl_name,
                "DROP TABLE %s" % tbl_name)

            self.log_execute(
                "Columns change: Move new table %s over old" % tbl_name,
                "ALTER TABLE %s_migration_new RENAME TO %s" % (
                    tbl_name, tbl_name))

        # Migrate the indices
        indices = dict(self.db.execute("""\
            SELECT name, sql FROM sqlite_master
            WHERE type = \"index\"""").fetchall())
        for name in set(indices.keys()) - set(pristine_indices.keys()):
            self.log_execute("Dropping obsolete index %s" % name,
                             "DROP INDEX %s" % name)
        for name, sql in pristine_indices.items():
            if name not in indices:
                self.log_execute("Creating new index %s" % name, sql)
            elif sql != indices[name]:
                self.log_execute(
                    "Index %s changed: Dropping old version" % name,
                    "DROP INDEX %s" % name)
                self.log_execute(
                    "Index %s changed: Creating updated version in its place" %
                    name, sql)

        self._migrate_pragma('user_version')

        if self.pristine.execute("PRAGMA foreign_keys").fetchone()[0]:
            if self.db.execute("PRAGMA foreign_key_check").fetchall():
                raise RuntimeError(
                    "Database migration: Would fail foreign_key_check")

    def _migrate_pragma(self, pragma):
        pristine_val = self.pristine.execute(
            "PRAGMA %s" % pragma).fetchone()[0]
        val = self.db.execute("PRAGMA %s" % pragma).fetchone()[0]

        if val != pristine_val:
            self.log_execute(
                "Set %s to %i from %i" % (pragma, pristine_val, val),
                "PRAGMA %s = %i" % (pragma, pristine_val))

        return pristine_val
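

# A hedged sketch (not in the original code) of using DBMigrator directly, as
# the dumb_migrate_db docstring suggests, so that extra data-migration steps
# run inside the same transaction as the schema changes.  The table and
# column names in the UPDATE below are hypothetical.
def _example_custom_migration(db, schema):  # pragma: no cover
    with DBMigrator(db, schema, allow_deletions=False) as migrator:
        migrator.migrate()
        # Statements issued here commit (or roll back) together with the
        # schema changes made by migrate() above.
        db.execute("UPDATE Node SET active = 0 WHERE node_id = 'legacy'")
    return bool(migrator.n_changes)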


def _left_pad(text, indent="    "):
    """Maybe I can find a package in pypi for this?"""
    return "\n".join(indent + line for line in text.split('\n'))


def normalise_sql(sql):
    # Remove comments:
    sql = re.sub(r'--[^\n]*\n', "", sql)
    # Normalise whitespace:
    sql = re.sub(r'\s+', " ", sql)
    sql = re.sub(r" *([(),]) *", r"\1", sql)
    # Remove unnecessary quotes
    sql = re.sub(r'"(\w+)"', r"\1", sql)
    return sql.strip()


def test_normalise_sql():
    assert normalise_sql("""\
        CREATE TABLE "Node"( -- This is my table
            -- There are many like it but this one is mine
            A b, C D, "E F G", h)""") == \
        'CREATE TABLE Node(A b,C D,"E F G",h)'


# coding: utf-8

"""Unit tests for "Simple declarative schema migration for SQLite".

See <https://david.rothlis.net/declarative-schema-migration-for-sqlite>.

Author: William Manley <[email protected]>.
Copyright © 2019-2022 Stb-tester.com Ltd.
License: MIT.
"""

import logging
import sqlite3

import pytest

from migrator import dumb_migrate_db, normalise_sql
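
# These tests can be run with pytest, assuming this file is saved (for
# example) as test_migrator.py alongside migrator.py:
#
#     pytest test_migrator.py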

_TEST_SCHEMAS = [
    # 0
    "",

    # 1
    """\
    CREATE TABLE Node(
        node_oid INTEGER PRIMARY KEY NOT NULL,
        node_id INTEGER NOT NULL);
    CREATE UNIQUE INDEX Node_node_id on Node(node_id);
    """,

    # 2
    # Added Node.active
    # Changed node_id type from INTEGER to TEXT
    # New table Job
    """\
    PRAGMA foreign_keys = 1;
    CREATE TABLE Node(
        node_oid INTEGER PRIMARY KEY NOT NULL,
        node_id TEXT NOT NULL,
        active BOOLEAN NOT NULL DEFAULT(1),
        something_else TEXT);
    CREATE UNIQUE INDEX Node_node_id on Node(node_id);
    CREATE TABLE Job(
        node_oid INTEGER NOT NULL,
        id INTEGER NOT NULL,
        FOREIGN KEY(node_oid) REFERENCES Node(node_oid));
    CREATE UNIQUE INDEX Job_node_oid on Job(node_oid, id);
    """,

    # 3
    # Remove field something_else.  Note: this is significant because
    # Job.node_oid references table Node which must be recreated.
    """\
    PRAGMA foreign_keys = 1;
    CREATE TABLE Node(
        node_oid INTEGER PRIMARY KEY NOT NULL,
        node_id TEXT NOT NULL,
        active BOOLEAN NOT NULL DEFAULT(1));
    CREATE UNIQUE INDEX Node_node_id on Node(node_id);
    CREATE TABLE Job(
        node_oid INTEGER NOT NULL,
        id INTEGER NOT NULL,
        FOREIGN KEY(node_oid) REFERENCES Node(node_oid));
    CREATE UNIQUE INDEX Job_node_oid on Job(node_oid, id);
    """,

    # 4
    # Change index Node_node_id field
    # Delete index Job_node_id
    # Set user_version = 6
    """\
    PRAGMA foreign_keys = 1;
    CREATE TABLE Node(
        node_oid INTEGER PRIMARY KEY NOT NULL,
        node_id TEXT NOT NULL,
        active BOOLEAN NOT NULL DEFAULT(1));
    CREATE UNIQUE INDEX Node_node_id on Node(node_oid);
    CREATE TABLE Job(
        node_oid INTEGER NOT NULL,
        id INTEGER NOT NULL,
        FOREIGN KEY(node_oid) REFERENCES Node(node_oid));
    CREATE UNIQUE INDEX Job_node_oid on Job(node_oid, id);
    PRAGMA user_version = 6;
    """,

    # 5
    # (vs. schema[1]) - Change Node.active default from 1 to 2
    """\
    CREATE TABLE Node(
        node_oid INTEGER PRIMARY KEY NOT NULL,
        node_id TEXT NOT NULL,
        active BOOLEAN NOT NULL DEFAULT(2));
    CREATE UNIQUE INDEX Node_node_id on Node(node_id);
    """
]


def test_dumb_db_migration_schema_migration():
    db = None

    def dump_sqlite_master(db):
        out = []
        for type_, name, tbl_name, sql in db.execute(
                "SELECT type, name, tbl_name, sql FROM sqlite_master"):
            out.append({
                "type": type_,
                "name": name,
                "tbl_name": tbl_name,
                "sql": normalise_sql(sql),
            })
        out.sort(key=lambda x: x['name'])
        return out

    def assert_schema_equal(schema):
        pristine = sqlite3.connect(':memory:')
        pristine.executescript(schema)
        assert dump_sqlite_master(pristine) == dump_sqlite_master(db)
        pristine_sql = "\n".join(sorted(
            normalise_sql(x) for x in pristine.iterdump()))
        db_sql = "\n".join(sorted(normalise_sql(x) for x in db.iterdump()))
        assert pristine_sql == db_sql
        for pragma in ["user_version", "foreign_keys"]:
            assert pristine.execute("PRAGMA %s" % pragma).fetchone()[0] == \
                db.execute("PRAGMA %s" % pragma).fetchone()[0], \
                "Value for PRAGMA %s does not match" % pragma

    db = sqlite3.connect(':memory:', isolation_level=None)
    assert_schema_equal(_TEST_SCHEMAS[0])

    combos = [
        # from, to, need_allow_deletions
        (0, 0, False),
        (0, 1, False),
        (0, 2, False),
        (0, 3, False),
        (0, 4, False),
        (1, 0, True),
        (1, 1, False),
        (1, 2, False),
        (1, 3, False),
        (1, 4, False),
        (2, 0, True),
        (2, 1, True),
        (2, 2, False),
        (2, 3, True),
        (2, 4, True),
        (3, 0, True),
        (3, 1, True),
        (3, 2, False),
        (3, 3, False),
        (3, 4, False),
    ]

    for from_, to, need_allow_deletions in combos:
        db = sqlite3.connect(':memory:', isolation_level=None)
        logging.info("Testing from %s to %s", from_, to)
        db.executescript(_TEST_SCHEMAS[from_])
        if need_allow_deletions:
            with pytest.raises(RuntimeError):
                dumb_migrate_db(db, _TEST_SCHEMAS[to])
            # The transaction should make the RuntimeError above revert any
            # work in progress
            assert_schema_equal(_TEST_SCHEMAS[from_])
        changed = dumb_migrate_db(
            db, _TEST_SCHEMAS[to], allow_deletions=need_allow_deletions)
        assert changed == (from_ != to)
        assert_schema_equal(_TEST_SCHEMAS[to])
        assert not dumb_migrate_db(db, _TEST_SCHEMAS[to])


def test_dumb_db_migration_data_migration():
    # Check that data is preserved during the migration:
    db = sqlite3.connect(':memory:', isolation_level=None)
    db.executescript(_TEST_SCHEMAS[1])
    db.executemany("""\
        INSERT INTO Node(node_oid, node_id)
        VALUES (?, ?)""", [
        (0, 0),
        (1, 100),
    ])
    assert db.execute("SELECT node_oid, node_id FROM Node").fetchall() == [
        (0, 0),
        (1, 100),
    ]
    dumb_migrate_db(db, _TEST_SCHEMAS[2])
    assert db.execute(
        "SELECT node_oid, node_id, active FROM Node").fetchall() == [
            (0, "0", 1),
            (1, "100", 1),
        ]

    db.execute("UPDATE Node SET active = 0, node_id = \"abc\" "
               "WHERE node_oid == 0")

    # Insert Job data.  It has a FOREIGN KEY back into Node.  We want to be
    # sure that this FOREIGN KEY isn't confused by the migration
    db.executemany("""\
        INSERT INTO Job(node_oid, id)
        VALUES (?, ?)""", [
        (0, 1234),
        (0, 5432),
        (1, 1234),
        (1, 9876),
    ])
    assert db.execute("""\
        SELECT node_id, id
        FROM Job
        INNER JOIN Node ON Node.node_oid == Job.node_oid""").fetchall() == [
            ("abc", 1234),
            ("abc", 5432),
            ("100", 1234),
            ("100", 9876),
        ]

    dumb_migrate_db(db, _TEST_SCHEMAS[3], allow_deletions=True)
    assert db.execute("""\
        SELECT node_id, id
        FROM Job
        INNER JOIN Node ON Node.node_oid == Job.node_oid""").fetchall() == [
            ("abc", 1234),
            ("abc", 5432),
            ("100", 1234),
            ("100", 9876),
        ]

    # The new default for active should not affect existing rows with
    # defaulted values:
    dumb_migrate_db(db, _TEST_SCHEMAS[4])
    assert db.execute(
        "SELECT node_oid, node_id, active FROM Node").fetchall() == [
            (0, "abc", 0),
            (1, "100", 1),
        ]

    db.execute("UPDATE Node SET active = 0, node_id = \"0\" "
               "WHERE node_oid == 0")

    # And delete the active column again removing the data:
    dumb_migrate_db(db, _TEST_SCHEMAS[1], allow_deletions=True)
    assert db.execute(
        "SELECT node_oid, node_id FROM Node").fetchall() == [
            (0, 0),
            (1, 100),
        ]