Skip to content

Instantly share code, notes, and snippets.

@adrianschneider94
Last active July 16, 2024 13:43
Show Gist options
  • Save adrianschneider94/1557e3c6052c1ecc6401cecb1a79bab4 to your computer and use it in GitHub Desktop.
Save adrianschneider94/1557e3c6052c1ecc6401cecb1a79bab4 to your computer and use it in GitHub Desktop.
Usage of postgresql-audit with alembic/alembic-utils
"""Added postgresql-audit
Revision ID: bb4c40ff8edd
Revises: 771a56f4f9ec
Create Date: 2024-05-08 13:03:23.581216
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = 'bb4c40ff8edd'
down_revision = '771a56f4f9ec'
branch_labels = None
depends_on = None
def upgrade():
# ... Autogenerated operations
op.execute("""
CREATE OPERATOR - (
LEFTARG = jsonb,
RIGHTARG = jsonb,
PROCEDURE = jsonb_subtract
);
""")
def downgrade():
op.execute("""
DROP OPERATOR IF EXISTS - (jsonb, jsonb);
""")
# ... Autogenerated operations
from alembic_utils.replaceable_entity import register_entities
from sqlalchemy.orm import configure_mappers
from .versioning_manager import versioning_manager
configure_mappers()
register_entities(
entities=versioning_manager.get_alembic_utils_objects()
)
import re
from alembic_utils.pg_function import PGFunction
from alembic_utils.pg_trigger import PGTrigger
from postgresql_audit import VersioningManager as BaseVersioningManager
def to_pg_functions(func: str) -> list[PGFunction]:
def revise_names(v: str):
# For some reason, 'default_value' leads to problems in Postgres or alembic-utils
v = v.replace("default_value", "def_value")
return v
funcs = []
pattern = re.compile(
r"^CREATE.*?(?:(?P<schema>\w+)\.)?(?P<signature>\w+\([^()]*\))\s*(?P<definition>[^$]+\${2}[^$]+\${2}[^;]*;)",
re.MULTILINE
)
for match in pattern.finditer(func):
schema = match.group("schema") or "public"
signature = revise_names(match.group("signature"))
definition = revise_names(match.group("definition"))
funcs.append(
PGFunction(
schema=schema,
signature=signature,
definition=definition
)
)
return funcs
class VersioningManager(BaseVersioningManager):
def get_alembic_utils_objects(self):
objects = []
# JSONB Change Key Name
jsonb_change_key_name_def = self.render_tmpl('jsonb_change_key_name.sql')
jsonb_change_key_name = to_pg_functions(jsonb_change_key_name_def)[0]
# A bug in alembic-utils escapes the colon which leads to migration statements in every autogenerate
jsonb_change_key_name.definition = jsonb_change_key_name.definition.replace(r"'\:'", "':'")
objects.append(jsonb_change_key_name)
# Operators
# Alembic utils can't build operators at the moment, so the operator has to be added manually
objects.extend(to_pg_functions(self.render_tmpl("operators.sql")))
# Create Activity
if self.use_statement_level_triggers:
objects.extend(to_pg_functions(self.render_tmpl('create_activity_stmt_level.sql')))
else:
objects.extend(to_pg_functions(self.render_tmpl('create_activity_row_level.sql')))
# Gather excluded fields
gathered_excludes_per_table = {}
for cls in self.pending_classes:
versioned = getattr(cls, "__versioned__", None) is not None
if not versioned:
continue
tables = cls._sa_class_manager.mapper.tables
excluded_field_names = cls.__versioned__.get("exclude", [])
map_excluded_field_to_table = {}
for field_name in excluded_field_names:
field = getattr(cls, field_name)
map_excluded_field_to_table[field_name] = field.table
excludes_per_table = {
table: [k for k, v in map_excluded_field_to_table.items() if v == table]
for table in tables
}
for table, excludes in excludes_per_table.items():
gathered_excludes_per_table[table] = list({*gathered_excludes_per_table.get(table, []), *excludes})
for table, excluded_fields in gathered_excludes_per_table.items():
excluded_fields = sorted(excluded_fields)
schema = table.schema or "public"
excluded_columns_text = ", ".join(f'"{k}"' for k in excluded_fields)
if self.use_statement_level_triggers:
insert_trigger = PGTrigger(
schema=schema,
signature='audit_trigger_insert',
on_entity=f"{schema}.{table.name}",
definition=f"""
AFTER INSERT ON {schema}.{table.name}
REFERENCING NEW TABLE AS new_table FOR EACH STATEMENT
WHEN (get_setting('postgresql_audit.enable_versioning', 'true')::bool)
EXECUTE PROCEDURE {self.schema_name}.create_activity('{{{excluded_columns_text}}}');
"""
)
update_trigger = PGTrigger(
schema=schema,
signature='audit_trigger_update',
on_entity=f"{schema}.{table.name}",
definition=f"""
AFTER UPDATE ON {schema}.{table.name}
REFERENCING NEW TABLE AS new_table OLD TABLE AS old_table FOR EACH STATEMENT
WHEN (get_setting('postgresql_audit.enable_versioning', 'true')::bool)
EXECUTE PROCEDURE {self.schema_name}.create_activity('{{{excluded_columns_text}}}');
"""
)
delete_trigger = PGTrigger(
schema=schema,
signature='audit_trigger_delete',
on_entity=f"{schema}.{table.name}",
definition=f"""
AFTER DELETE ON {schema}.{table.name}
REFERENCING OLD TABLE AS old_table FOR EACH STATEMENT
WHEN (get_setting('postgresql_audit.enable_versioning', 'true')::bool)
EXECUTE PROCEDURE {self.schema_name}.create_activity('{{{excluded_columns_text}}}');
"""
)
objects.extend([insert_trigger, update_trigger, delete_trigger])
else:
row_trigger = PGTrigger(
schema=schema,
signature='audit_trigger_row',
on_entity=f"{schema}.{table.name}",
definition=f"""
AFTER INSERT OR UPDATE OR DELETE ON {schema}.{table.name}
FOR EACH ROW
WHEN (get_setting('postgresql_audit.enable_versioning', 'true')::bool)
EXECUTE PROCEDURE {self.schema_name}.create_activity('{{{excluded_columns_text}}}');
"""
)
objects.append(row_trigger)
return objects
versioning_manager = VersioningManager()
@kkarmeliuk4
Copy link

Replace {self.schema_name} with {schema}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment