A backup copy of the code from https://david.rothlis.net/declarative-schema-migration-for-sqlite/
Code is MIT licensed, written by William Manley, copyright © 2019-2022 Stb-tester.com Ltd
A backup copy of the code from https://david.rothlis.net/declarative-schema-migration-for-sqlite/
Code is MIT licensed, written by William Manley, copyright © 2019-2022 Stb-tester.com Ltd
# coding: utf-8 | |
"""Simple declarative schema migration for SQLite. | |
See <https://david.rothlis.net/declarative-schema-migration-for-sqlite>. | |
Author: William Manley <[email protected]>. | |
Copyright © 2019-2022 Stb-tester.com Ltd. | |
License: MIT. | |
""" | |
import logging | |
import re | |
import sqlite3 | |
from textwrap import dedent | |
def dumb_migrate_db(db, schema, allow_deletions=False): | |
""" | |
Migrates a database to the new schema given by the SQL text `schema` | |
preserving the data. We create any table that exists in schema, delete any | |
old table that is no longer used and add/remove columns and indices as | |
necessary. | |
Under this scheme there are a set of changes that we can make to the schema | |
and this script will handle it fine: | |
1. Adding a new table | |
2. Adding, deleting or modifying an index | |
3. Adding a column to an existing table as long as the new column can be | |
NULL or has a DEFAULT value specified. | |
4. Changing a column to remove NULL or DEFAULT as long as all values in the | |
database are not NULL | |
5. Changing the type of a column | |
6. Changing the user_version | |
In addition this function is capable of: | |
1. Deleting tables | |
2. Deleting columns from tables | |
But only if allow_deletions=True. If the new schema requires a column/table | |
to be deleted and allow_deletions=False this function will raise | |
`RuntimeError`. | |
Note: When this function is called a transaction must not be held open on | |
db. A transaction will be used internally. If you wish to perform | |
additional migration steps as part of a migration use DBMigrator directly. | |
Any internally generated rowid columns by SQLite may change values by this | |
migration. | |
""" | |
with DBMigrator(db, schema, allow_deletions) as migrator: | |
migrator.migrate() | |
return bool(migrator.n_changes) | |
class DBMigrator: | |
def __init__(self, db, schema, allow_deletions=False): | |
self.db = db | |
self.schema = schema | |
self.allow_deletions = allow_deletions | |
self.pristine = sqlite3.connect(":memory:") | |
self.pristine.executescript(schema) | |
self.n_changes = 0 | |
self.orig_foreign_keys = None | |
def log_execute(self, msg, sql, args=None): | |
# It's important to log any changes we're making to the database for | |
# forensics later | |
msg_tmpl = "Database migration: %s with SQL:\n%s" | |
msg_argv = (msg, _left_pad(dedent(sql))) | |
if args: | |
msg_tmpl += " args = %r" | |
msg_argv += (args,) | |
else: | |
args = [] | |
logging.info(msg_tmpl, *msg_argv) | |
self.db.execute(sql, args) | |
self.n_changes += 1 | |
def __enter__(self): | |
self.orig_foreign_keys = ( | |
self.db.execute("PRAGMA foreign_keys").fetchone()[0]) | |
if self.orig_foreign_keys: | |
self.log_execute("Disable foreign keys temporarily for migration", | |
"PRAGMA foreign_keys = OFF") | |
# This doesn't count as a change because we'll undo it at the end | |
self.n_changes = 0 | |
self.db.__enter__() | |
self.db.execute('BEGIN') | |
return self | |
def __exit__(self, exc_type, exc_value, exc_tb): | |
self.db.__exit__(exc_type, exc_value, exc_tb) | |
if exc_value is None: | |
# The SQLite docs say: | |
# | |
# > This pragma is a no-op within a transaction; foreign key | |
# > constraint enforcement may only be enabled or disabled when | |
# > there is no pending BEGIN or SAVEPOINT. | |
old_changes = self.n_changes | |
new_val = self._migrate_pragma('foreign_keys') | |
if new_val == self.orig_foreign_keys: | |
self.n_changes = old_changes | |
# SQLite docs say: | |
# | |
# > A VACUUM will fail if there is an open transaction on the database | |
# > connection that is attempting to run the VACUUM. | |
if self.n_changes: | |
self.db.execute("VACUUM") | |
else: | |
if self.orig_foreign_keys: | |
self.log_execute( | |
"Re-enable foreign keys after migration", | |
"PRAGMA foreign_keys = ON") | |
def migrate(self): | |
# In CI the database schema may be changing all the time. This checks | |
# the current db and if it doesn't match database.sql we will | |
# modify it so it does match where possible. | |
pristine_tables = dict(self.pristine.execute("""\ | |
SELECT name, sql FROM sqlite_master | |
WHERE type = \"table\" AND name != \"sqlite_sequence\"""").fetchall()) | |
pristine_indices = dict(self.pristine.execute("""\ | |
SELECT name, sql FROM sqlite_master | |
WHERE type = \"index\"""").fetchall()) | |
tables = dict(self.db.execute("""\ | |
SELECT name, sql FROM sqlite_master | |
WHERE type = \"table\" AND name != \"sqlite_sequence\"""").fetchall()) | |
new_tables = set(pristine_tables.keys()) - set(tables.keys()) | |
removed_tables = set(tables.keys()) - set(pristine_tables.keys()) | |
if removed_tables and not self.allow_deletions: | |
raise RuntimeError( | |
"Database migration: Refusing to delete tables %r" % | |
removed_tables) | |
modified_tables = set( | |
name for name, sql in pristine_tables.items() | |
if normalise_sql(tables.get(name, "")) != normalise_sql(sql)) | |
# This PRAGMA is automatically disabled when the db is committed | |
self.db.execute("PRAGMA defer_foreign_keys = TRUE") | |
# New and removed tables are easy: | |
for tbl_name in new_tables: | |
self.log_execute("Create table %s" % tbl_name, | |
pristine_tables[tbl_name]) | |
for tbl_name in removed_tables: | |
self.log_execute("Drop table %s" % tbl_name, | |
"DROP TABLE %s" % tbl_name) | |
for tbl_name in modified_tables: | |
# The SQLite documentation insists that we create the new table and | |
# rename it over the old rather than moving the old out of the way | |
# and then creating the new | |
create_table_sql = pristine_tables[tbl_name] | |
create_table_sql = re.sub(r"\b%s\b" % re.escape(tbl_name), | |
tbl_name + "_migration_new", | |
create_table_sql) | |
self.log_execute( | |
"Columns change: Create table %s with updated schema" % | |
tbl_name, create_table_sql) | |
cols = set([ | |
x[1] for x in self.db.execute( | |
"PRAGMA table_info(%s)" % tbl_name)]) | |
pristine_cols = set([ | |
x[1] for x in | |
self.pristine.execute("PRAGMA table_info(%s)" % tbl_name)]) | |
removed_columns = cols - pristine_cols | |
if not self.allow_deletions and removed_columns: | |
logging.warning( | |
"Database migration: Refusing to remove columns %r from " | |
"table %s. Current cols are %r attempting migration to %r", | |
removed_columns, tbl_name, cols, pristine_cols) | |
raise RuntimeError( | |
"Database migration: Refusing to remove columns %r from " | |
"table %s" % (removed_columns, tbl_name)) | |
logging.info("cols: %s, pristine_cols: %s", cols, pristine_cols) | |
self.log_execute( | |
"Migrate data for table %s" % tbl_name, """\ | |
INSERT INTO {tbl_name}_migration_new ({common}) | |
SELECT {common} FROM {tbl_name}""".format( | |
tbl_name=tbl_name, | |
common=", ".join(cols.intersection(pristine_cols)))) | |
# Don't need the old table any more | |
self.log_execute( | |
"Drop old table %s now data has been migrated" % tbl_name, | |
"DROP TABLE %s" % tbl_name) | |
self.log_execute( | |
"Columns change: Move new table %s over old" % tbl_name, | |
"ALTER TABLE %s_migration_new RENAME TO %s" % ( | |
tbl_name, tbl_name)) | |
# Migrate the indices | |
indices = dict(self.db.execute("""\ | |
SELECT name, sql FROM sqlite_master | |
WHERE type = \"index\"""").fetchall()) | |
for name in set(indices.keys()) - set(pristine_indices.keys()): | |
self.log_execute("Dropping obsolete index %s" % name, | |
"DROP INDEX %s" % name) | |
for name, sql in pristine_indices.items(): | |
if name not in indices: | |
self.log_execute("Creating new index %s" % name, sql) | |
elif sql != indices[name]: | |
self.log_execute( | |
"Index %s changed: Dropping old version" % name, | |
"DROP INDEX %s" % name) | |
self.log_execute( | |
"Index %s changed: Creating updated version in its place" % | |
name, sql) | |
self._migrate_pragma('user_version') | |
if self.pristine.execute("PRAGMA foreign_keys").fetchone()[0]: | |
if self.db.execute("PRAGMA foreign_key_check").fetchall(): | |
raise RuntimeError( | |
"Database migration: Would fail foreign_key_check") | |
def _migrate_pragma(self, pragma): | |
pristine_val = self.pristine.execute( | |
"PRAGMA %s" % pragma).fetchone()[0] | |
val = self.db.execute("PRAGMA %s" % pragma).fetchone()[0] | |
if val != pristine_val: | |
self.log_execute( | |
"Set %s to %i from %i" % (pragma, pristine_val, val), | |
"PRAGMA %s = %i" % (pragma, pristine_val)) | |
return pristine_val | |
def _left_pad(text, indent=" "): | |
"""Maybe I can find a package in pypi for this?""" | |
return "\n".join(indent + line for line in text.split('\n')) | |
def normalise_sql(sql): | |
# Remove comments: | |
sql = re.sub(r'--[^\n]*\n', "", sql) | |
# Normalise whitespace: | |
sql = re.sub(r'\s+', " ", sql) | |
sql = re.sub(r" *([(),]) *", r"\1", sql) | |
# Remove unnecessary quotes | |
sql = re.sub(r'"(\w+)"', r"\1", sql) | |
return sql.strip() | |
def test_normalise_sql(): | |
assert normalise_sql("""\ | |
CREATE TABLE "Node"( -- This is my table | |
-- There are many like it but this one is mine | |
A b, C D, "E F G", h)""") == \ | |
'CREATE TABLE Node(A b,C D,"E F G",h)' |
# coding: utf-8 | |
"""Unit tests for "Simple declarative schema migration for SQLite". | |
See <https://david.rothlis.net/declarative-schema-migration-for-sqlite>. | |
Author: William Manley <[email protected]>. | |
Copyright © 2019-2022 Stb-tester.com Ltd. | |
License: MIT. | |
""" | |
import logging | |
import sqlite3 | |
import pytest | |
from migrator import dumb_migrate_db, normalise_sql | |
_TEST_SCHEMAS = [ | |
# 0 | |
"", | |
# 1 | |
"""\ | |
CREATE TABLE Node( | |
node_oid INTEGER PRIMARY KEY NOT NULL, | |
node_id INTEGER NOT NULL); | |
CREATE UNIQUE INDEX Node_node_id on Node(node_id); | |
""", | |
# 2 | |
# Added Node.active | |
# Changed node_id type from INTEGER to TEXT | |
# New table Job | |
"""\ | |
PRAGMA foreign_keys = 1; | |
CREATE TABLE Node( | |
node_oid INTEGER PRIMARY KEY NOT NULL, | |
node_id TEXT NOT NULL, | |
active BOOLEAN NOT NULL DEFAULT(1), | |
something_else TEXT); | |
CREATE UNIQUE INDEX Node_node_id on Node(node_id); | |
CREATE TABLE Job( | |
node_oid INTEGER NOT NULL, | |
id INTEGER NOT NULL, | |
FOREIGN KEY(node_oid) REFERENCES Node(node_oid)); | |
CREATE UNIQUE INDEX Job_node_oid on Job(node_oid, id); | |
""", | |
# 3 | |
# Remove field something_else. Note: this is significant because | |
# Job.node_oid references table Node which must be recreated. | |
"""\ | |
PRAGMA foreign_keys = 1; | |
CREATE TABLE Node( | |
node_oid INTEGER PRIMARY KEY NOT NULL, | |
node_id TEXT NOT NULL, | |
active BOOLEAN NOT NULL DEFAULT(1)); | |
CREATE UNIQUE INDEX Node_node_id on Node(node_id); | |
CREATE TABLE Job( | |
node_oid INTEGER NOT NULL, | |
id INTEGER NOT NULL, | |
FOREIGN KEY(node_oid) REFERENCES Node(node_oid)); | |
CREATE UNIQUE INDEX Job_node_oid on Job(node_oid, id); | |
""", | |
# 4 | |
# Change index Node_node_id field | |
# Delete index Job_node_id | |
# Set user_version = 6 | |
"""\ | |
PRAGMA foreign_keys = 1; | |
CREATE TABLE Node( | |
node_oid INTEGER PRIMARY KEY NOT NULL, | |
node_id TEXT NOT NULL, | |
active BOOLEAN NOT NULL DEFAULT(1)); | |
CREATE UNIQUE INDEX Node_node_id on Node(node_oid); | |
CREATE TABLE Job( | |
node_oid INTEGER NOT NULL, | |
id INTEGER NOT NULL, | |
FOREIGN KEY(node_oid) REFERENCES Node(node_oid)); | |
CREATE UNIQUE INDEX Job_node_oid on Job(node_oid, id); | |
PRAGMA user_version = 6; | |
""", | |
# 5 | |
# (vs. schema[1]) - Change Node.active default from 1 to 2 | |
"""\ | |
CREATE TABLE Node( | |
node_oid INTEGER PRIMARY KEY NOT NULL, | |
node_id TEXT NOT NULL, | |
active BOOLEAN NOT NULL DEFAULT(2)); | |
CREATE UNIQUE INDEX Node_node_id on Node(node_id); | |
""" | |
] | |
def test_dumb_db_migration_schema_migration(): | |
db = None | |
def dump_sqlite_master(db): | |
out = [] | |
for type_, name, tbl_name, sql in db.execute( | |
"SELECT type, name, tbl_name, sql FROM sqlite_master"): | |
out.append({ | |
"type": type_, | |
"name": name, | |
"tbl_name": tbl_name, | |
"sql": normalise_sql(sql), | |
}) | |
out.sort(key=lambda x: x['name']) | |
return out | |
def assert_schema_equal(schema): | |
pristine = sqlite3.connect(':memory:') | |
pristine.executescript(schema) | |
assert dump_sqlite_master(pristine) == dump_sqlite_master(db) | |
pristine_sql = "\n".join(sorted( | |
normalise_sql(x) for x in pristine.iterdump())) | |
db_sql = "\n".join(sorted(normalise_sql(x) for x in db.iterdump())) | |
assert pristine_sql == db_sql | |
for pragma in ["user_version", "foreign_keys"]: | |
assert pristine.execute("PRAGMA %s" % pragma).fetchone()[0] == \ | |
db.execute("PRAGMA %s" % pragma).fetchone()[0], \ | |
"Value for PRAGMA %s does not match" % pragma | |
db = sqlite3.connect(':memory:', isolation_level=None) | |
assert_schema_equal(_TEST_SCHEMAS[0]) | |
combos = [ | |
# from, to, need_allow_deletions | |
(0, 0, False), | |
(0, 1, False), | |
(0, 2, False), | |
(0, 3, False), | |
(0, 4, False), | |
(1, 0, True), | |
(1, 1, False), | |
(1, 2, False), | |
(1, 3, False), | |
(1, 4, False), | |
(2, 0, True), | |
(2, 1, True), | |
(2, 2, False), | |
(2, 3, True), | |
(2, 4, True), | |
(3, 0, True), | |
(3, 1, True), | |
(3, 2, False), | |
(3, 3, False), | |
(3, 4, False), | |
] | |
for from_, to, need_allow_deletions in combos: | |
db = sqlite3.connect(':memory:', isolation_level=None) | |
logging.info("Testing from %s to %s", from_, to) | |
db.executescript(_TEST_SCHEMAS[from_]) | |
if need_allow_deletions: | |
with pytest.raises(RuntimeError): | |
dumb_migrate_db(db, _TEST_SCHEMAS[to]) | |
# The transaction should make the RuntimeError above revert any work | |
# in progress | |
assert_schema_equal(_TEST_SCHEMAS[from_]) | |
changed = dumb_migrate_db( | |
db, _TEST_SCHEMAS[to], allow_deletions=need_allow_deletions) | |
assert changed == (from_ != to) | |
assert_schema_equal(_TEST_SCHEMAS[to]) | |
assert not dumb_migrate_db(db, _TEST_SCHEMAS[to]) | |
def test_dumb_db_migration_data_migration(): | |
# Check that data is preserved during the migration: | |
db = sqlite3.connect(':memory:', isolation_level=None) | |
db.executescript(_TEST_SCHEMAS[1]) | |
db.executemany("""\ | |
INSERT INTO Node(node_oid, node_id) | |
VALUES (?, ?)""", [ | |
(0, 0), | |
(1, 100), | |
]) | |
assert db.execute("SELECT node_oid, node_id FROM Node").fetchall() == [ | |
(0, 0), | |
(1, 100), | |
] | |
dumb_migrate_db(db, _TEST_SCHEMAS[2]) | |
assert db.execute( | |
"SELECT node_oid, node_id, active FROM Node").fetchall() == [ | |
(0, "0", 1), | |
(1, "100", 1), | |
] | |
db.execute("UPDATE Node SET active = 0, node_id = \"abc\" " | |
"WHERE node_oid == 0") | |
# Insert Job data. It has a FOREIGN KEY back into Node. We want to be sure | |
# that this FOREIGN KEY isn't confused by the migration | |
db.executemany("""\ | |
INSERT INTO Job(node_oid, id) | |
VALUES (?, ?)""", [ | |
(0, 1234), | |
(0, 5432), | |
(1, 1234), | |
(1, 9876), | |
]) | |
assert db.execute("""\ | |
SELECT node_id, id | |
FROM Job | |
INNER JOIN Node ON Node.node_oid == Job.node_oid""").fetchall() == [ | |
("abc", 1234), | |
("abc", 5432), | |
("100", 1234), | |
("100", 9876), | |
] | |
dumb_migrate_db(db, _TEST_SCHEMAS[3], allow_deletions=True) | |
assert db.execute("""\ | |
SELECT node_id, id | |
FROM Job | |
INNER JOIN Node ON Node.node_oid == Job.node_oid""").fetchall() == [ | |
("abc", 1234), | |
("abc", 5432), | |
("100", 1234), | |
("100", 9876), | |
] | |
# The new default for active should not affect existing rows with defaulted | |
# values: | |
dumb_migrate_db(db, _TEST_SCHEMAS[4]) | |
assert db.execute( | |
"SELECT node_oid, node_id, active FROM Node").fetchall() == [ | |
(0, "abc", 0), | |
(1, "100", 1), | |
] | |
db.execute("UPDATE Node SET active = 0, node_id = \"0\" " | |
"WHERE node_oid == 0") | |
# And delete the active column again removing the data: | |
dumb_migrate_db(db, _TEST_SCHEMAS[1], allow_deletions=True) | |
assert db.execute( | |
"SELECT node_oid, node_id FROM Node").fetchall() == [ | |
(0, 0), | |
(1, 100), | |
] |