Skip to content

Instantly share code, notes, and snippets.

@GitTom
Created July 31, 2024 20:14
Show Gist options
  • Save GitTom/e178a61aa6b2b6e5ad92f15a36f4eb2e to your computer and use it in GitHub Desktop.
Save GitTom/e178a61aa6b2b6e5ad92f15a36f4eb2e to your computer and use it in GitHub Desktop.
Variation of supabase's supa_audit PostgreSQL extension - adds 'changes' column to highlight changes made by UPDATE operations.
-- SPDX-License-Identifier: Apache-2.0
/*
Generic Audit Trigger
Linear Time Record Version History
Date:
2022-02-03
Purpose:
Generic audit history for tables including an identifier
to enable indexed linear time lookup of a primary key's version history
2024-07 Created this version which also highlights changes made by UPDATE ops.
Started from
https://github.com/supabase/supa_audit/blob/main/supa_audit--0.3.1.sql
Updated mainly to add 'changes' column to record_version table.
I will add an issue to that repo more fully describing this.
*/
-- Activate the uuid-ossp extension: audit.to_record_id calls its
-- uuid_generate_v4() and uuid_generate_v5() functions.
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
-- Namespace: every type, table and function in this script lives in "audit".
CREATE SCHEMA IF NOT EXISTS audit;
-- Enum of the audited SQL operations (more compact on disk/in memory than text).
-- CREATE TYPE has no IF NOT EXISTS form, so the catalog is probed first to
-- keep this statement safe to re-run.
DO $$
BEGIN
    PERFORM 1
    FROM pg_type
    WHERE typnamespace = 'audit'::regnamespace
        AND typname = 'operation';

    -- FOUND is set by PERFORM: false means no such type exists yet.
    IF NOT FOUND THEN
        CREATE TYPE audit.operation AS ENUM (
            'INSERT',
            'UPDATE',
            'DELETE',
            'TRUNCATE'
        );
    END IF;
END
$$;
-- Audit log table: one row per audited INSERT/UPDATE/DELETE (row level) or
-- TRUNCATE (statement level). Created only if it does not exist yet.
CREATE TABLE IF NOT EXISTS audit.record_version (
    -- unique auto-incrementing id
    id bigserial PRIMARY KEY,
    -- uniquely identifies a record by primary key [primary key + table_oid]
    record_id uuid,
    -- uniquely identifies a record before update/delete
    old_record_id uuid,
    -- INSERT/UPDATE/DELETE/TRUNCATE
    op audit.operation NOT NULL,
    ts timestamptz NOT NULL DEFAULT (now()),
    table_oid oid NOT NULL,
    table_schema name NOT NULL,
    table_name name NOT NULL,
    -- contents of the record
    record jsonb,
    -- previous record contents for UPDATE/DELETE
    old_record jsonb,
    -- changes made during UPDATE (the row trigger only fills this for UPDATE)
    changes jsonb,
    -- at least one of record_id or old_record_id is populated, except for truncates
    CHECK (coalesce(record_id, old_record_id) IS NOT NULL OR op = 'TRUNCATE'),
    -- record_id and record must be populated for insert and update (and only then)
    CHECK (op IN ('INSERT', 'UPDATE') = (record_id IS NOT NULL)),
    CHECK (op IN ('INSERT', 'UPDATE') = (record IS NOT NULL)),
    -- old_record_id and old_record must be populated for update and delete (and only then)
    CHECK (op IN ('UPDATE', 'DELETE') = (old_record_id IS NOT NULL)),
    CHECK (op IN ('UPDATE', 'DELETE') = (old_record IS NOT NULL))
);
-- Upgrade path: when the table already exists without the "changes" column
-- (e.g. created by the upstream supa_audit script this file started from),
-- CREATE TABLE IF NOT EXISTS above is a no-op. Add the column explicitly so
-- the row trigger's INSERT does not fail on a missing column.
ALTER TABLE audit.record_version
    ADD COLUMN IF NOT EXISTS changes jsonb;
-- GitTom 2024-07-28 comment this out because not part of a formal extension
-- mark the table as configuration data so it's included in database dumps and can be backed up
-- select pg_catalog.pg_extension_config_dump('audit.record_version', '');
-- select pg_catalog.pg_extension_config_dump('audit.record_version_id_seq', '');
-- When running inside a Supabase project, also record who made each change.
DO $$
BEGIN
    -- Detect if we're in a supabase project:
    -- ensure zero-argument `auth.uid() -> uuid` and `auth.role() -> text` both exist
    IF (
        SELECT
            count(DISTINCT f.proname) = 2
        FROM
            pg_proc f
            JOIN pg_namespace nsp ON f.pronamespace = nsp.oid
            JOIN pg_type pt ON f.prorettype = pt.oid
        WHERE
            (nsp.nspname, f.proname, pt.typname) IN (('auth', 'uid', 'uuid'), ('auth', 'role', 'text'))
            AND f.pronargs = 0
    ) THEN
        -- IF NOT EXISTS keeps the script re-runnable: a plain ADD COLUMN
        -- raises "column already exists" on the second execution.
        ALTER TABLE audit.record_version
            ADD COLUMN IF NOT EXISTS auth_uid uuid DEFAULT (auth.uid()),
            ADD COLUMN IF NOT EXISTS auth_role text DEFAULT (auth.role());
    END IF;
END
$$;
-- Indexes on the audit log. IF NOT EXISTS makes them re-runnable like the
-- rest of this script (a plain CREATE INDEX fails on the second execution).

-- Version history of a record by its current identifier; partial because
-- DELETE and TRUNCATE rows carry no record_id.
CREATE INDEX IF NOT EXISTS record_version_record_id
    ON audit.record_version (record_id)
    WHERE record_id IS NOT NULL;

-- Lookup by the identifier a record had before an UPDATE/DELETE; partial
-- because INSERT and TRUNCATE rows carry no old_record_id.
CREATE INDEX IF NOT EXISTS record_version_old_record_id
    ON audit.record_version (old_record_id)
    WHERE old_record_id IS NOT NULL;

-- Block-range index on the change timestamp.
CREATE INDEX IF NOT EXISTS record_version_ts
    ON audit.record_version USING brin (ts);

-- Filter audit rows by source table.
CREATE INDEX IF NOT EXISTS record_version_table_oid
    ON audit.record_version (table_oid);
-- Returns the names of the primary key columns of the table identified by
-- entity_oid, ordered by attribute number. Returns an empty array (never
-- NULL) when the table has no primary key.
CREATE OR REPLACE FUNCTION audit.primary_key_columns(entity_oid oid)
    RETURNS text[]
    STABLE
    SECURITY DEFINER
    SET search_path = ''
    LANGUAGE sql
AS $$
    SELECT
        -- array_agg yields NULL over zero rows; normalise that to '{}'
        coalesce(
            array_agg(att.attname::text ORDER BY att.attnum),
            ARRAY[]::text[]
        ) AS column_names
    FROM
        pg_index AS idx
        INNER JOIN pg_attribute AS att
            ON att.attrelid = idx.indrelid
            -- keep only the attributes that are part of the index key
            AND att.attnum = ANY (idx.indkey)
    WHERE
        idx.indisprimary
        AND idx.indrelid = $1
$$;
-- Derives the uuid that identifies one record of one table.
--   entity_oid : oid of the table the record belongs to
--   pkey_cols  : names of the table's primary key columns
--   rec        : the record as jsonb
-- Returns NULL when rec is NULL, a random v4 uuid when pkey_cols is empty,
-- and otherwise a name-based v5 uuid computed from the table oid followed by
-- the primary key values (as text, in pkey_cols order).
-- NOTE(review): the function is declared STABLE although the empty-pkey
-- branch calls uuid_generate_v4(); presumably STABLE is kept so that
-- `record_id = audit.to_record_id(...)` lookups can use the index - confirm
-- before changing the volatility.
CREATE OR REPLACE FUNCTION audit.to_record_id(entity_oid oid, pkey_cols text[], rec jsonb)
    RETURNS uuid
    STABLE
    LANGUAGE sql
AS $$
    SELECT
        CASE
            WHEN $3 IS NULL THEN
                NULL
            WHEN $2 = ARRAY[]::text[] THEN
                uuid_generate_v4()
            ELSE (
                SELECT
                    uuid_generate_v5(
                        -- fixed namespace uuid for all audit record ids
                        'fd62bc3d-8d6e-43c2-919c-802ba3762271',
                        -- hashed name: json array [table oid, pk value 1, pk value 2, ...]
                        (jsonb_build_array(to_jsonb($1)) || jsonb_agg($3 ->> pk.col_name))::text
                    )
                FROM
                    unnest($2) AS pk(col_name)
            )
        END
$$;
-- Computes what changed between two jsonb values.
--   new_val : the current value
--   old_val : the previous value
-- Returns SQL NULL when nothing changed (or when new_val is SQL NULL).
-- Otherwise:
--   * old_val is SQL NULL, or the JSON types differ -> new_val in full
--   * both are objects -> an object holding only the differing keys:
--       - key added            -> its new value
--       - key changed          -> recursive diff of the two values
--       - key removed          -> JSON null
--   * arrays and scalars that differ -> new_val in full (arrays are not
--     diffed element by element)
-- A removed key and a key explicitly set to JSON null both appear as null in
-- the result.
CREATE OR REPLACE FUNCTION audit.compare_json(new_val jsonb, old_val jsonb)
    RETURNS jsonb
    IMMUTABLE
    LANGUAGE plpgsql
AS $$
DECLARE
    result jsonb := '{}'::jsonb;
    k text;
    v jsonb;
BEGIN
    -- Nothing to report without a current value.
    IF new_val IS NULL THEN
        RETURN NULL;
    END IF;

    -- No previous value: everything is new. Without this guard only objects
    -- were reported in this case; arrays and scalars came back as NULL.
    IF old_val IS NULL THEN
        RETURN new_val;
    END IF;

    IF jsonb_typeof(new_val) != jsonb_typeof(old_val) THEN
        RETURN new_val;
    END IF;

    IF jsonb_typeof(new_val) = 'object' THEN
        -- Added and changed keys.
        FOR k, v IN SELECT * FROM jsonb_each(new_val)
        LOOP
            IF old_val ? k THEN
                v := audit.compare_json(v, old_val -> k);
                -- SQL NULL means "unchanged": leave the key out
                IF v IS NOT NULL THEN
                    result := result || jsonb_build_object(k, v);
                END IF;
            ELSE
                result := result || jsonb_build_object(k, v);
            END IF;
        END LOOP;

        -- Removed keys: present before, absent now. Previously these were
        -- never reported, so an object that only lost keys looked unchanged.
        FOR k IN SELECT jsonb_object_keys(old_val)
        LOOP
            IF NOT new_val ? k THEN
                result := result || jsonb_build_object(k, 'null'::jsonb);
            END IF;
        END LOOP;

        RETURN NULLIF(result, '{}'::jsonb);
    END IF;

    -- Arrays and scalars are compared as a whole.
    IF new_val != old_val THEN
        RETURN new_val;
    END IF;

    RETURN NULL;
END;
$$;
-- Row-level trigger function that writes one audit.record_version row per
-- INSERT, UPDATE or DELETE on the table it is attached to.
-- For UPDATE it also fills "changes" with the columns whose value differs
-- between OLD and NEW (the 'updated_at' column is ignored); "changes" stays
-- NULL for INSERT/DELETE and for UPDATEs that touch no other column.
-- Returns NEW for INSERT/UPDATE and OLD for DELETE.
CREATE OR REPLACE FUNCTION audit.insert_update_delete_trigger()
    RETURNS TRIGGER
    SECURITY DEFINER
    -- NOTE(review): unlike the other SECURITY DEFINER functions in this file
    -- there is no `SET search_path = ''` here. audit.to_record_id calls
    -- uuid_generate_v4()/uuid_generate_v5() unqualified, so they presumably
    -- have to stay resolvable through the session search_path - confirm
    -- before tightening.
    LANGUAGE plpgsql
AS $$
DECLARE
    pkey_cols text[] = audit.primary_key_columns(TG_RELID);
    -- NEW is null for DELETE and OLD is null for INSERT, which leaves the
    -- matching jsonb / id variables NULL as the table's CHECKs require
    record_jsonb jsonb = to_jsonb(NEW);
    record_id uuid = audit.to_record_id(TG_RELID, pkey_cols, record_jsonb);
    old_record_jsonb jsonb = to_jsonb(OLD);
    old_record_id uuid = audit.to_record_id(TG_RELID, pkey_cols, old_record_jsonb);
    changes_jsonb jsonb;
BEGIN
    IF TG_OP = 'UPDATE' THEN
        -- One entry per column whose value differs between OLD and NEW.
        -- The WHERE clause has already established that the column changed,
        -- so if compare_json cannot express the difference (returns SQL NULL)
        -- fall back to the full new value. Passing the SQL NULL straight to
        -- jsonb_object_agg would store JSON null, which is indistinguishable
        -- from "column was set to NULL" and cannot be filtered out afterwards.
        SELECT
            jsonb_object_agg(
                cur.key,
                coalesce(
                    audit.compare_json(cur.value, old_record_jsonb -> cur.key),
                    cur.value
                )
            )
        INTO changes_jsonb
        FROM
            jsonb_each(record_jsonb) AS cur
        WHERE
            cur.key != 'updated_at' -- Ignore 'updated_at' column
            AND cur.value IS DISTINCT FROM (old_record_jsonb -> cur.key);
    END IF;

    INSERT INTO audit.record_version(record_id, old_record_id, op, table_oid, table_schema, table_name, record, old_record, changes)
    SELECT
        record_id,
        old_record_id,
        TG_OP::audit.operation,
        TG_RELID,
        TG_TABLE_SCHEMA,
        TG_TABLE_NAME,
        record_jsonb,
        old_record_jsonb,
        changes_jsonb;

    RETURN COALESCE(NEW, OLD);
END;
$$;
-- Trigger function that logs a TRUNCATE of the table it is attached to.
-- Only the operation and the table's identity are written to
-- audit.record_version; no record ids or row contents are stored.
CREATE OR REPLACE FUNCTION audit.truncate_trigger()
    RETURNS TRIGGER
    SECURITY DEFINER
    SET search_path = ''
    LANGUAGE plpgsql
AS $$
BEGIN
    INSERT INTO audit.record_version (op, table_oid, table_schema, table_name)
    VALUES (
        TG_OP::audit.operation,
        TG_RELID,
        TG_TABLE_SCHEMA,
        TG_TABLE_NAME
    );

    RETURN coalesce(old, new);
END;
$$;
-- Starts auditing the given table by attaching two triggers, unless they are
-- already present:
--   audit_i_u_d : after insert/update/delete, for each row
--   audit_t     : after truncate, for each statement
-- Raises an exception when the table has no primary key.
CREATE OR REPLACE FUNCTION audit.enable_tracking(regclass)
    RETURNS void
    VOLATILE
    SECURITY DEFINER
    SET search_path = ''
    LANGUAGE plpgsql
AS $$
DECLARE
    pk_names text[] = audit.primary_key_columns($1);
BEGIN
    -- Guard: refuse tables that have no primary key.
    IF pk_names = ARRAY[]::text[] THEN
        RAISE EXCEPTION 'Table % can not be audited because it has no primary key', $1;
    END IF;

    -- Row-level trigger, created only when missing so repeated calls are harmless.
    PERFORM 1
    FROM pg_trigger
    WHERE tgrelid = $1
        AND tgname = 'audit_i_u_d';

    IF NOT FOUND THEN
        -- %s receives the regclass argument rendered as text
        EXECUTE format('
create trigger audit_i_u_d
after insert or update or delete
on %s
for each row
execute procedure audit.insert_update_delete_trigger();', $1);
    END IF;

    -- Statement-level truncate trigger, same idea.
    PERFORM 1
    FROM pg_trigger
    WHERE tgrelid = $1
        AND tgname = 'audit_t';

    IF NOT FOUND THEN
        EXECUTE format('
create trigger audit_t
after truncate
on %s
for each statement
execute procedure audit.truncate_trigger();', $1);
    END IF;
END;
$$;
-- Stops auditing the given table by dropping the audit_i_u_d and audit_t
-- triggers. Both drops use IF EXISTS, so calling this on a table that is not
-- tracked does nothing.
CREATE OR REPLACE FUNCTION audit.disable_tracking(regclass)
    RETURNS void
    VOLATILE
    SECURITY DEFINER
    SET search_path = ''
    LANGUAGE plpgsql
AS $$
BEGIN
    -- row-level insert/update/delete trigger
    EXECUTE format('drop trigger if exists audit_i_u_d on %s;', $1);
    -- statement-level truncate trigger
    EXECUTE format('drop trigger if exists audit_t on %s;', $1);
END;
$$;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment