Last active
July 16, 2024 13:43
-
-
Save adrianschneider94/1557e3c6052c1ecc6401cecb1a79bab4 to your computer and use it in GitHub Desktop.
Usage of postgresql-audit with alembic/alembic-utils
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Added postgresql-audit | |
Revision ID: bb4c40ff8edd | |
Revises: 771a56f4f9ec | |
Create Date: 2024-05-08 13:03:23.581216 | |
""" | |
from alembic import op | |
# revision identifiers, used by Alembic. | |
revision = 'bb4c40ff8edd' | |
down_revision = '771a56f4f9ec' | |
branch_labels = None | |
depends_on = None | |
def upgrade(): | |
# ... Autogenerated operations | |
op.execute(""" | |
CREATE OPERATOR - ( | |
LEFTARG = jsonb, | |
RIGHTARG = jsonb, | |
PROCEDURE = jsonb_subtract | |
); | |
""") | |
def downgrade(): | |
op.execute(""" | |
DROP OPERATOR IF EXISTS - (jsonb, jsonb); | |
""") | |
# ... Autogenerated operations |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from alembic_utils.replaceable_entity import register_entities | |
from sqlalchemy.orm import configure_mappers | |
from .versioning_manager import versioning_manager | |
configure_mappers() | |
register_entities( | |
entities=versioning_manager.get_alembic_utils_objects() | |
) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
from alembic_utils.pg_function import PGFunction | |
from alembic_utils.pg_trigger import PGTrigger | |
from postgresql_audit import VersioningManager as BaseVersioningManager | |
def to_pg_functions(func: str) -> list[PGFunction]: | |
def revise_names(v: str): | |
# For some reason, 'default_value' leads to problems in Postgres or alembic-utils | |
v = v.replace("default_value", "def_value") | |
return v | |
funcs = [] | |
pattern = re.compile( | |
r"^CREATE.*?(?:(?P<schema>\w+)\.)?(?P<signature>\w+\([^()]*\))\s*(?P<definition>[^$]+\${2}[^$]+\${2}[^;]*;)", | |
re.MULTILINE | |
) | |
for match in pattern.finditer(func): | |
schema = match.group("schema") or "public" | |
signature = revise_names(match.group("signature")) | |
definition = revise_names(match.group("definition")) | |
funcs.append( | |
PGFunction( | |
schema=schema, | |
signature=signature, | |
definition=definition | |
) | |
) | |
return funcs | |
class VersioningManager(BaseVersioningManager): | |
def get_alembic_utils_objects(self): | |
objects = [] | |
# JSONB Change Key Name | |
jsonb_change_key_name_def = self.render_tmpl('jsonb_change_key_name.sql') | |
jsonb_change_key_name = to_pg_functions(jsonb_change_key_name_def)[0] | |
# A bug in alembic-utils escapes the colon which leads to migration statements in every autogenerate | |
jsonb_change_key_name.definition = jsonb_change_key_name.definition.replace(r"'\:'", "':'") | |
objects.append(jsonb_change_key_name) | |
# Operators | |
# Alembic utils can't build operators at the moment, so the operator has to be added manually | |
objects.extend(to_pg_functions(self.render_tmpl("operators.sql"))) | |
# Create Activity | |
if self.use_statement_level_triggers: | |
objects.extend(to_pg_functions(self.render_tmpl('create_activity_stmt_level.sql'))) | |
else: | |
objects.extend(to_pg_functions(self.render_tmpl('create_activity_row_level.sql'))) | |
# Gather excluded fields | |
gathered_excludes_per_table = {} | |
for cls in self.pending_classes: | |
versioned = getattr(cls, "__versioned__", None) is not None | |
if not versioned: | |
continue | |
tables = cls._sa_class_manager.mapper.tables | |
excluded_field_names = cls.__versioned__.get("exclude", []) | |
map_excluded_field_to_table = {} | |
for field_name in excluded_field_names: | |
field = getattr(cls, field_name) | |
map_excluded_field_to_table[field_name] = field.table | |
excludes_per_table = { | |
table: [k for k, v in map_excluded_field_to_table.items() if v == table] | |
for table in tables | |
} | |
for table, excludes in excludes_per_table.items(): | |
gathered_excludes_per_table[table] = list({*gathered_excludes_per_table.get(table, []), *excludes}) | |
for table, excluded_fields in gathered_excludes_per_table.items(): | |
excluded_fields = sorted(excluded_fields) | |
schema = table.schema or "public" | |
excluded_columns_text = ", ".join(f'"{k}"' for k in excluded_fields) | |
if self.use_statement_level_triggers: | |
insert_trigger = PGTrigger( | |
schema=schema, | |
signature='audit_trigger_insert', | |
on_entity=f"{schema}.{table.name}", | |
definition=f""" | |
AFTER INSERT ON {schema}.{table.name} | |
REFERENCING NEW TABLE AS new_table FOR EACH STATEMENT | |
WHEN (get_setting('postgresql_audit.enable_versioning', 'true')::bool) | |
EXECUTE PROCEDURE {self.schema_name}.create_activity('{{{excluded_columns_text}}}'); | |
""" | |
) | |
update_trigger = PGTrigger( | |
schema=schema, | |
signature='audit_trigger_update', | |
on_entity=f"{schema}.{table.name}", | |
definition=f""" | |
AFTER UPDATE ON {schema}.{table.name} | |
REFERENCING NEW TABLE AS new_table OLD TABLE AS old_table FOR EACH STATEMENT | |
WHEN (get_setting('postgresql_audit.enable_versioning', 'true')::bool) | |
EXECUTE PROCEDURE {self.schema_name}.create_activity('{{{excluded_columns_text}}}'); | |
""" | |
) | |
delete_trigger = PGTrigger( | |
schema=schema, | |
signature='audit_trigger_delete', | |
on_entity=f"{schema}.{table.name}", | |
definition=f""" | |
AFTER DELETE ON {schema}.{table.name} | |
REFERENCING OLD TABLE AS old_table FOR EACH STATEMENT | |
WHEN (get_setting('postgresql_audit.enable_versioning', 'true')::bool) | |
EXECUTE PROCEDURE {self.schema_name}.create_activity('{{{excluded_columns_text}}}'); | |
""" | |
) | |
objects.extend([insert_trigger, update_trigger, delete_trigger]) | |
else: | |
row_trigger = PGTrigger( | |
schema=schema, | |
signature='audit_trigger_row', | |
on_entity=f"{schema}.{table.name}", | |
definition=f""" | |
AFTER INSERT OR UPDATE OR DELETE ON {schema}.{table.name} | |
FOR EACH ROW | |
WHEN (get_setting('postgresql_audit.enable_versioning', 'true')::bool) | |
EXECUTE PROCEDURE {self.schema_name}.create_activity('{{{excluded_columns_text}}}'); | |
""" | |
) | |
objects.append(row_trigger) | |
return objects | |
versioning_manager = VersioningManager() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Replace
{self.schema_name}
with{schema}