Last active
August 22, 2024 21:16
-
-
Save jschaf/cd1c1a3c2a5897282929ee5e16f944fa to your computer and use it in GitHub Desktop.
Postgres audit tables with uni-temporal tables
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- create_temporal_past_table creates a new table with the same structure | |
-- as the current table. Adds triggers to copy all changed or deleted rows | |
-- from the current table to the past table. | |
CREATE PROCEDURE admin.create_temporal_past_table(curr_tbl regclass, past_tbl text) AS $fn$ | |
DECLARE | |
curr_tbl_qual text := simc.quote_regclass(curr_tbl); | |
past_tbl_schema text := (parse_ident(past_tbl))[1]; | |
past_tbl_name text := (parse_ident(past_tbl))[2]; | |
past_tbl_qual text := quote_ident(past_tbl_schema) || '.' || quote_ident(past_tbl_name); | |
-- Primary key to use for the past_tbl. | |
pk_cols text; | |
-- Columns of an EXCLUDE constraint. | |
-- https://www.postgresql.org/docs/current/sql-createtable.html#SQL-CREATETABLE-EXCLUDE | |
excl_cols text; | |
BEGIN | |
-- Build the primary key and exclusion constraint for the past_tbl. | |
-- The primary key is the same as curr_table with asr appended. | |
-- The exclusion constraint verifies that the asr columns never overlap for | |
-- the same primary key. | |
WITH keys AS ( | |
SELECT | |
array_agg(a.attname ORDER BY array_position(i.indkey, a.attnum)) as pk | |
FROM pg_index i | |
JOIN pg_attribute a ON i.indrelid = a.attrelid | |
WHERE indrelid = curr_tbl | |
AND a.attnum = ANY (i.indkey) | |
AND i.indisprimary | |
GROUP BY a.attrelid | |
) | |
SELECT | |
array_to_string(pk, ', ') || ', asr', | |
array_to_string(pk, ' WITH =, ') || ' WITH =, asr WITH &&' | |
FROM keys | |
INTO pk_cols, excl_cols; | |
IF pk_cols IS NULL OR length(pk_cols) = 0 THEN | |
RAISE EXCEPTION 'table % must have a primary key when calling admin.create_temporal_past_table', curr_tbl_qual; | |
END IF; | |
-- Create the past table. The upper bound of every row in the past_tbl is set, | |
-- meaning: temporal.is_current(asr) = false. | |
EXECUTE format($$ | |
CREATE TABLE %s ( | |
LIKE %s, | |
CHECK ( NOT temporal.is_current(asr) ), | |
PRIMARY KEY (%s), | |
EXCLUDE USING gist (%s) | |
); | |
$$, past_tbl_qual, curr_tbl_qual, pk_cols, excl_cols, curr_tbl_qual); | |
-- Statement-level triggers on update and delete that copy the before image of | |
-- all modified rows from curr_tbl into past_tbl. The trigger sets upper(asr) | |
-- to now() on each row copied into the past_tbl. | |
EXECUTE format($$ | |
CREATE TRIGGER %s_asr_hist_upd_tg | |
AFTER UPDATE | |
ON %s | |
REFERENCING OLD TABLE AS modified_rows | |
FOR EACH STATEMENT | |
EXECUTE FUNCTION temporal.asserted_history_trigger(); | |
$$, simc.regclass_table(curr_tbl), curr_tbl_qual); | |
EXECUTE format($$ | |
CREATE TRIGGER %s_asr_hist_del_tg | |
AFTER DELETE | |
ON %s | |
REFERENCING OLD TABLE AS modified_rows | |
FOR EACH STATEMENT | |
EXECUTE FUNCTION temporal.asserted_history_trigger(); | |
$$, simc.regclass_table(curr_tbl), curr_tbl); | |
-- Row-level trigger on curr_tbl to set the asr column. | |
EXECUTE format($$ | |
CREATE TRIGGER %s_asr_time_tg | |
BEFORE UPDATE OR INSERT | |
ON %s | |
FOR EACH ROW | |
EXECUTE FUNCTION temporal.asserted_time_trigger(); | |
$$, simc.regclass_table(curr_tbl), curr_tbl); | |
INSERT INTO temporal.asserted_table (tbl, current_part, past_part, version) | |
VALUES (null, curr_tbl, past_tbl, 'v202309'); | |
END; | |
$fn$ LANGUAGE plpgsql; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Returns all columns that are not aligned in asserted versioned tables using | |
-- the v202309 format. | |
-- name: FindMisalignedColumnsV202309 :many | |
WITH | |
-- Find all asserted version tables with the matching version. | |
parts AS ( | |
SELECT | |
simc.quote_regclass(tat.current_part) as key, | |
simc.regclass_schema(tat.current_part) as curr_schema, | |
simc.regclass_table(tat.current_part) as curr_table, | |
simc.regclass_schema(tat.past_part) as past_schema, | |
simc.regclass_table(tat.past_part) as past_table | |
FROM temporal.asserted_table tat | |
WHERE tat.version = 'v202309' | |
), | |
curr_cols AS ( | |
SELECT | |
p.key, | |
isc.table_schema, | |
isc.table_name, | |
isc.column_name, | |
coalesce(isc.domain_name, isc.udt_name, isc.data_type) AS column_type, | |
coalesce(isc.domain_schema, isc.udt_schema, 'public') AS column_type_schema | |
FROM parts p | |
JOIN information_schema.columns isc | |
ON (p.curr_schema, p.curr_table) = (isc.table_schema, isc.table_name) | |
), | |
past_cols AS ( | |
SELECT | |
p.key, | |
isc.table_schema, | |
isc.table_name, | |
isc.column_name, | |
coalesce(isc.domain_name, isc.udt_name, isc.data_type) AS column_type, | |
coalesce(isc.domain_schema, isc.udt_schema, 'public') AS column_type_schema | |
FROM parts p | |
JOIN information_schema.columns isc | |
ON (p.past_schema, p.past_table) = (isc.table_schema, isc.table_name) | |
) | |
-- Find columns in the current or past table that don't match each other | |
SELECT | |
cc.key, | |
format('misaligned column "%s" for %s.%s', | |
column_name, | |
coalesce(cc.table_schema, pc.table_schema), | |
coalesce(cc.table_name, pc.table_name) | |
) AS message | |
FROM curr_cols cc | |
FULL OUTER JOIN past_cols pc USING (key, column_name, column_type, column_type_schema) | |
WHERE pc.key IS NULL | |
OR cc.key IS NULL; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Returns a quoted, schema-qualified table name from regclass. | |
CREATE FUNCTION simc.quote_regclass(tbl regclass) RETURNS text AS $$ | |
SELECT quote_ident(nspname) || '.' || quote_ident(relname) | |
FROM pg_catalog.pg_class AS c | |
JOIN pg_catalog.pg_namespace AS ns | |
ON c.relnamespace = ns.oid | |
WHERE c.oid = tbl::oid; | |
$$ LANGUAGE sql VOLATILE; | |
CREATE FUNCTION simc.regclass_table(tbl regclass) RETURNS text AS $$ | |
SELECT relname | |
FROM pg_catalog.pg_class AS c | |
WHERE c.oid = tbl::oid; | |
$$ LANGUAGE sql VOLATILE; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- temporal.period is a non-null tstzrange type that's not in the future and | |
-- defaults to [now(), null). | |
CREATE DOMAIN temporal.period AS tstzrange NOT NULL | |
-- We don't support "deferred transactions": rows that we'll assert to be | |
-- true in the future. The benefit of this constraint is that all current rows | |
-- have an unbounded (null) upper bound. Allow an overlap of 100 microseconds | |
-- to support a row updated up to 100 times in a single transaction. The | |
-- temporal trigger ensures each update asr period is not-empty, expanding | |
-- to a minimum of 1 microsecond. | |
CONSTRAINT period_starts_before_now CHECK ( lower(value) <= now() + '100 microseconds'::interval ) | |
-- I don't think we'll ever need a period with an unbounded lower bound. | |
-- If you hit this constraint, it might be that you're inserting and updating | |
-- the same value in a single transaction and it's likely a bug in | |
-- add_asserted_history. The now() timestamp is consistent throughout a | |
-- transaction. A tstzrange where lower == upper is empty and uses null. | |
CONSTRAINT period_lower_not_null CHECK ( lower(value) IS NOT NULL) | |
CONSTRAINT period_lower_not_neg_infinity CHECK ( lower(value) != '-infinity') | |
-- Confusingly, Postgres also supports a value 'infinity' for the upper | |
-- bound but upper_inf returns false on tstzrange(now(), 'infinity'). To avoid | |
-- this confusion, we'll require null as an upper bound to indicate unbounded | |
-- instead of 'infinity'. https://stackoverflow.com/a/67612325/30900. | |
CONSTRAINT period_upper_not_infinity_use_null CHECK ( upper(value) != 'infinity') | |
DEFAULT tstzrange(now(), NULL); | |
-- temporal.is_current returns true if the period is current, meaning it | |
-- includes now(). We use this as a more semantically meaningful version of the | |
-- Postgres builtin upper_inf which checks if the upper bound is null, meaning | |
-- unbounded. | |
CREATE FUNCTION temporal.is_current(p temporal.period) RETURNS boolean AS $$ | |
-- We enforce using null but check infinity just in case. | |
SELECT upper_inf(p) OR upper(p) = 'infinity'; | |
$$ LANGUAGE sql IMMUTABLE PARALLEL SAFE; | |
-- _pad_now_within_txn returns the smallest timestamp t such that prev < t AND | |
-- now() <= t. In most cases, t is now(). Handles updating a row multiple times | |
-- in the same transaction, ensuring that each update has a temporal.period of | |
-- at least 1 microsecond. A microsecond is the smallest granularity of a | |
-- Postgres timestamp. Postgres represents all empty time ranges with the same | |
-- value, losing the start time information in the process. | |
CREATE FUNCTION temporal._pad_now_within_txn(prev timestamptz) RETURNS timestamptz AS $$ | |
SELECT CASE WHEN prev >= now() THEN prev + '1 microsecond'::interval ELSE now() END; | |
$$ LANGUAGE sql STABLE PARALLEL SAFE; | |
-- asserted_history_trigger is a statement-level trigger that copies the before | |
-- image of updated or deleted rows from the current partition into the past | |
-- partition. The trigger updates the copied rows by setting the upper bound | |
-- of the asserted period to now(). Correspondingly, the asserted_time_trigger | |
-- will set the lower bound of the modified rows in the current partition to | |
-- now so that the timeline of a row is contiguous. | |
CREATE FUNCTION temporal.asserted_history_trigger() RETURNS trigger AS $fn$ | |
DECLARE | |
curr_tbl text := tg_table_name; | |
past_tbl text := CASE | |
WHEN curr_tbl LIKE '%_current' | |
-- v202107: rewrite public.invoice_current to public.invoice_past. | |
THEN format('%I.%I', tg_table_schema, replace(curr_tbl, '_current', '_past')) | |
ELSE | |
-- v202309: rewrite erp.invoice to erppast.invoice | |
format('%I.%I', tg_table_schema || 'past', curr_tbl) | |
END; | |
ins_cols text; | |
BEGIN | |
-- For INSERT, we don't need to do anything. If the INSERT statement has | |
-- an ON CONFLICT DO UPDATE, Postgres will fire this trigger as an update. | |
-- https://www.postgresql.org/docs/13/trigger-definition.html#TRIGGER-DEFINITION | |
IF tg_op != 'UPDATE' AND tg_op != 'DELETE' THEN | |
RAISE EXCEPTION 'unsupported tg_op % in asserted_history_trigger', tg_op; | |
END IF; | |
-- Find all the non-generated columns we'll insert. | |
-- Also exclude asr since we adjust the upper bound. | |
EXECUTE format($$ | |
SELECT | |
string_agg(attname, ', ') as str | |
FROM pg_attribute | |
WHERE attrelid = %L::regclass::oid | |
AND attnum > 0 | |
AND NOT attisdropped | |
AND attgenerated = '' | |
AND attname != 'asr' | |
GROUP BY attrelid; | |
$$, past_tbl) INTO STRICT ins_cols; | |
IF ins_cols = '' || ins_cols IS NULL THEN | |
RAISE EXCEPTION 'columns to insert not found for table %', past_tbl; | |
END IF; | |
-- Insert all modified rows. Adjust the upper bound of the asr to now() since | |
-- each row has been deleted or superseded. | |
EXECUTE format($$ | |
INSERT INTO %s (%s, asr) | |
SELECT %s, temporal.period( | |
lower(asr), | |
temporal._pad_now_within_txn(lower(asr)) | |
) | |
FROM modified_rows; | |
$$, past_tbl, ins_cols, ins_cols); | |
RETURN NULL; -- per-statement triggers should always return null | |
END; | |
$fn$ LANGUAGE plpgsql; | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment