Skip to content

Instantly share code, notes, and snippets.

@jschaf
Last active August 22, 2024 21:16
Show Gist options
  • Save jschaf/cd1c1a3c2a5897282929ee5e16f944fa to your computer and use it in GitHub Desktop.
Save jschaf/cd1c1a3c2a5897282929ee5e16f944fa to your computer and use it in GitHub Desktop.
Postgres audit tables with uni-temporal tables
-- create_temporal_past_table creates past_tbl with the same column structure
-- as curr_tbl and installs the triggers that keep it populated:
--
--   * two statement-level AFTER triggers (UPDATE and DELETE) on curr_tbl that
--     run temporal.asserted_history_trigger() with the old rows exposed as the
--     transition table modified_rows;
--   * one row-level BEFORE INSERT OR UPDATE trigger on curr_tbl that runs
--     temporal.asserted_time_trigger().
--
-- Finally, it records the pair in temporal.asserted_table with version
-- 'v202309'.
--
-- Parameters:
--   curr_tbl: the existing current table. Must have a primary key.
--   past_tbl: schema-qualified name ("schema.table") of the table to create.
--
-- Raises an exception if past_tbl is not of the form schema.table or if
-- curr_tbl has no primary key.
CREATE PROCEDURE admin.create_temporal_past_table(curr_tbl regclass, past_tbl text) AS $fn$
DECLARE
  curr_tbl_qual  text   := simc.quote_regclass(curr_tbl);
  curr_tbl_name  text   := simc.regclass_table(curr_tbl);
  -- parse_ident splits on dots and applies the usual identifier folding and
  -- unquoting rules.
  past_tbl_parts text[] := parse_ident(past_tbl);
  past_tbl_qual  text;
  -- Primary key to use for the past_tbl.
  pk_cols        text;
  -- Columns of an EXCLUDE constraint.
  -- https://www.postgresql.org/docs/current/sql-createtable.html#SQL-CREATETABLE-EXCLUDE
  excl_cols      text;
BEGIN
  -- Fail early with a clear message. Without this check an unqualified name
  -- produces a NULL qualified name, which format() renders as an empty string
  -- and surfaces as a syntax error inside the generated CREATE TABLE.
  IF array_length(past_tbl_parts, 1) IS DISTINCT FROM 2 THEN
    RAISE EXCEPTION 'past_tbl % must be schema-qualified (schema.table) when calling admin.create_temporal_past_table', past_tbl;
  END IF;
  past_tbl_qual := format('%I.%I', past_tbl_parts[1], past_tbl_parts[2]);

  -- Build the primary key and exclusion constraint for the past_tbl.
  -- The primary key is the same as curr_tbl with asr appended.
  -- The exclusion constraint verifies that the asr columns never overlap for
  -- the same primary key. Column names are quoted because they are spliced
  -- into DDL below.
  WITH keys AS (
    SELECT
      array_agg(
        quote_ident(a.attname)
        ORDER BY array_position(i.indkey, a.attnum)
      ) AS pk
    FROM pg_index AS i
      INNER JOIN pg_attribute AS a ON i.indrelid = a.attrelid
    WHERE i.indrelid = curr_tbl
      AND a.attnum = ANY (i.indkey)
      AND i.indisprimary
    GROUP BY a.attrelid
  )
  SELECT
    array_to_string(pk, ', ') || ', asr',
    array_to_string(pk, ' WITH =, ') || ' WITH =, asr WITH &&'
  FROM keys
  INTO pk_cols, excl_cols;

  -- No primary key means the CTE returns no row and both targets stay NULL.
  IF pk_cols IS NULL OR length(pk_cols) = 0 THEN
    RAISE EXCEPTION 'table % must have a primary key when calling admin.create_temporal_past_table', curr_tbl_qual;
  END IF;

  -- Create the past table. The upper bound of every row in the past_tbl is set,
  -- meaning: temporal.is_current(asr) = false.
  EXECUTE format($$
    CREATE TABLE %s (
      LIKE %s,
      CHECK ( NOT temporal.is_current(asr) ),
      PRIMARY KEY (%s),
      EXCLUDE USING gist (%s)
    );
  $$, past_tbl_qual, curr_tbl_qual, pk_cols, excl_cols);

  -- Statement-level triggers on update and delete that copy the before image of
  -- all modified rows from curr_tbl into past_tbl. The trigger sets upper(asr)
  -- to now() on each row copied into the past_tbl.
  -- Trigger names go through %I so table names that need quoting still
  -- produce valid DDL.
  EXECUTE format($$
    CREATE TRIGGER %I
      AFTER UPDATE
      ON %s
      REFERENCING OLD TABLE AS modified_rows
      FOR EACH STATEMENT
    EXECUTE FUNCTION temporal.asserted_history_trigger();
  $$, curr_tbl_name || '_asr_hist_upd_tg', curr_tbl_qual);

  EXECUTE format($$
    CREATE TRIGGER %I
      AFTER DELETE
      ON %s
      REFERENCING OLD TABLE AS modified_rows
      FOR EACH STATEMENT
    EXECUTE FUNCTION temporal.asserted_history_trigger();
  $$, curr_tbl_name || '_asr_hist_del_tg', curr_tbl_qual);

  -- Row-level trigger on curr_tbl to set the asr column.
  EXECUTE format($$
    CREATE TRIGGER %I
      BEFORE UPDATE OR INSERT
      ON %s
      FOR EACH ROW
    EXECUTE FUNCTION temporal.asserted_time_trigger();
  $$, curr_tbl_name || '_asr_time_tg', curr_tbl_qual);

  -- Register the current/past pair so tooling can find it by version.
  INSERT INTO temporal.asserted_table (tbl, current_part, past_part, version)
  VALUES (null, curr_tbl, past_tbl, 'v202309');
END;
$fn$ LANGUAGE plpgsql;
-- Returns all columns that are not aligned in asserted versioned tables using
-- the v202309 format. A column is misaligned when it exists in only one of
-- the current/past tables, or exists in both with a different type or type
-- schema (in which case one row is returned for each side).
-- name: FindMisalignedColumnsV202309 :many
WITH
  -- Find all asserted version tables with the matching version.
  parts AS (
    SELECT
      simc.quote_regclass(tat.current_part)  AS key,
      simc.regclass_schema(tat.current_part) AS curr_schema,
      simc.regclass_table(tat.current_part)  AS curr_table,
      simc.regclass_schema(tat.past_part)    AS past_schema,
      simc.regclass_table(tat.past_part)     AS past_table
    FROM temporal.asserted_table AS tat
    WHERE tat.version = 'v202309'
  ),

  -- Columns of every current table, keyed by the quoted current table name.
  curr_cols AS (
    SELECT
      p.key,
      isc.table_schema,
      isc.table_name,
      isc.column_name,
      coalesce(isc.domain_name, isc.udt_name, isc.data_type) AS column_type,
      coalesce(isc.domain_schema, isc.udt_schema, 'public')  AS column_type_schema
    FROM parts AS p
      INNER JOIN information_schema.columns AS isc
        ON (p.curr_schema, p.curr_table) = (isc.table_schema, isc.table_name)
  ),

  -- Columns of every past table, keyed by the same current table name so the
  -- two sides can be matched up.
  past_cols AS (
    SELECT
      p.key,
      isc.table_schema,
      isc.table_name,
      isc.column_name,
      coalesce(isc.domain_name, isc.udt_name, isc.data_type) AS column_type,
      coalesce(isc.domain_schema, isc.udt_schema, 'public')  AS column_type_schema
    FROM parts AS p
      INNER JOIN information_schema.columns AS isc
        ON (p.past_schema, p.past_table) = (isc.table_schema, isc.table_name)
  )

-- Find columns in the current or past table that don't match each other.
SELECT
  -- cc.key alone is NULL for columns that exist only in the past table, so
  -- take the key from whichever side of the full join is present.
  coalesce(cc.key, pc.key) AS key,
  format('misaligned column "%s" for %s.%s',
    -- Unqualified column_name is the merged USING column, never NULL here.
    column_name,
    coalesce(cc.table_schema, pc.table_schema),
    coalesce(cc.table_name, pc.table_name)
  ) AS message
FROM curr_cols AS cc
  FULL OUTER JOIN past_cols AS pc USING (key, column_name, column_type, column_type_schema)
WHERE pc.key IS NULL
   OR cc.key IS NULL;
-- Returns a quoted, schema-qualified table name from regclass, e.g.
-- erp.invoice or "Sales"."Order Line". Each part is passed through
-- quote_ident, so the result is safe to splice into dynamic SQL.
-- Returns NULL when no pg_class row matches tbl.
--
-- Marked STABLE rather than VOLATILE: the function only reads the system
-- catalogs and never modifies the database.
CREATE FUNCTION simc.quote_regclass(tbl regclass) RETURNS text AS $$
  SELECT quote_ident(ns.nspname) || '.' || quote_ident(c.relname)
  FROM pg_catalog.pg_class AS c
    INNER JOIN pg_catalog.pg_namespace AS ns
      ON c.relnamespace = ns.oid
  WHERE c.oid = tbl::oid;
$$ LANGUAGE sql STABLE;
-- Returns the bare, unquoted relation name (pg_class.relname) for tbl,
-- without its schema. Returns NULL when no pg_class row matches tbl.
-- The result is not quoted: pass it through quote_ident or format('%I')
-- before splicing it into dynamic SQL.
--
-- Marked STABLE rather than VOLATILE: the function only reads the system
-- catalogs and never modifies the database.
CREATE FUNCTION simc.regclass_table(tbl regclass) RETURNS text AS $$
  SELECT c.relname
  FROM pg_catalog.pg_class AS c
  WHERE c.oid = tbl::oid;
$$ LANGUAGE sql STABLE;
-- temporal.period is a tstzrange that is never null, never starts in the
-- future, always has a finite lower bound, and uses null (not 'infinity') for
-- an unbounded upper bound. The default value is [now(), null).
CREATE DOMAIN temporal.period AS tstzrange
  DEFAULT tstzrange(now(), NULL)
  NOT NULL

  -- "Deferred transactions" (rows asserted to become true in the future) are
  -- not supported. In exchange, every current row has an unbounded (null)
  -- upper bound. The 100 microsecond allowance lets a single row be updated
  -- up to 100 times within one transaction; the temporal trigger keeps each
  -- update's asr period non-empty by widening it to at least 1 microsecond.
  CONSTRAINT period_starts_before_now
    CHECK (lower(VALUE) <= now() + '100 microseconds'::interval)

  -- A period with an unbounded lower bound should never be needed. Hitting
  -- this constraint most likely means the same value was inserted and
  -- updated within one transaction, which points at a bug in
  -- add_asserted_history: now() is constant for the whole transaction, and a
  -- tstzrange whose lower bound equals its upper bound is empty, so its
  -- bounds are reported as null.
  CONSTRAINT period_lower_not_null
    CHECK (lower(VALUE) IS NOT NULL)
  CONSTRAINT period_lower_not_neg_infinity
    CHECK (lower(VALUE) <> '-infinity')

  -- Postgres also accepts 'infinity' as an upper bound, yet upper_inf()
  -- returns false for tstzrange(now(), 'infinity'). To sidestep that trap,
  -- an unbounded upper bound must be written as null.
  -- https://stackoverflow.com/a/67612325/30900.
  CONSTRAINT period_upper_not_infinity_use_null
    CHECK (upper(VALUE) <> 'infinity');
-- temporal.is_current reports whether period p is still open-ended, i.e. its
-- upper bound is unbounded and it therefore includes now(). It is a more
-- descriptive name for the Postgres builtin upper_inf, which tests whether
-- the upper bound is null (unbounded).
CREATE FUNCTION temporal.is_current(p temporal.period) RETURNS boolean AS $$
  SELECT
    -- The domain already requires null for "unbounded"; the 'infinity'
    -- comparison is only a safety net.
    upper(p) = 'infinity'
    OR upper_inf(p);
$$ LANGUAGE sql IMMUTABLE PARALLEL SAFE;
-- _pad_now_within_txn returns the smallest timestamp t with prev < t and
-- now() <= t. Usually that is simply now(). The other case arises when one
-- row is updated several times in a single transaction: prev has already
-- reached now(), so the result is nudged one microsecond past prev to keep
-- every temporal.period at least 1 microsecond wide. One microsecond is the
-- finest resolution of a Postgres timestamp, and Postgres stores every empty
-- range as the same value, which would discard the start time.
CREATE FUNCTION temporal._pad_now_within_txn(prev timestamptz) RETURNS timestamptz AS $$
  SELECT
    CASE
      -- A null prev has nothing to pad against, so it also yields now().
      WHEN prev IS NULL OR prev < now() THEN now()
      ELSE prev + '1 microsecond'::interval
    END;
$$ LANGUAGE sql STABLE PARALLEL SAFE;
-- asserted_history_trigger is a statement-level trigger that copies the before
-- image of updated or deleted rows from the current partition into the past
-- partition. The trigger updates the copied rows by setting the upper bound
-- of the asserted period to now(). Correspondingly, the asserted_time_trigger
-- will set the lower bound of the modified rows in the current partition to
-- now so that the timeline of a row is contiguous.
--
-- Requirements on the trigger definition:
--   * fired AFTER UPDATE or AFTER DELETE, FOR EACH STATEMENT;
--   * declared with REFERENCING OLD TABLE AS modified_rows.
--
-- The past table is derived from the name of the table the trigger fired on:
--   * v202107: <schema>.<name>_current  ->  <schema>.<name>_past
--   * v202309: <schema>.<name>          ->  <schema>past.<name>
--
-- Raises an exception for any operation other than UPDATE or DELETE, and when
-- no insertable columns can be found for the past table.
CREATE FUNCTION temporal.asserted_history_trigger() RETURNS trigger AS $fn$
DECLARE
  curr_suffix constant text := '_current';
  curr_tbl text := tg_table_name;
  past_tbl text := CASE
    -- Compare the literal suffix instead of LIKE '%_current', where "_" is a
    -- single-character wildcard, and rewrite only the suffix rather than
    -- every occurrence of '_current' in the name.
    WHEN right(curr_tbl, length(curr_suffix)) = curr_suffix
      -- v202107: rewrite public.invoice_current to public.invoice_past.
      THEN format('%I.%I', tg_table_schema, left(curr_tbl, -length(curr_suffix)) || '_past')
    ELSE
      -- v202309: rewrite erp.invoice to erppast.invoice
      format('%I.%I', tg_table_schema || 'past', curr_tbl)
  END;
  ins_cols text;
BEGIN
  -- This trigger is only meant for UPDATE and DELETE; anything else (INSERT,
  -- TRUNCATE) is a wiring mistake and is rejected. An INSERT ... ON CONFLICT
  -- DO UPDATE still reaches this function, because Postgres fires the update
  -- triggers for the rows it updates.
  -- https://www.postgresql.org/docs/13/trigger-definition.html#TRIGGER-DEFINITION
  IF tg_op != 'UPDATE' AND tg_op != 'DELETE' THEN
    RAISE EXCEPTION 'unsupported tg_op % in asserted_history_trigger', tg_op;
  END IF;

  -- Find all the non-generated columns we'll insert.
  -- Also exclude asr since we adjust the upper bound.
  -- Names are quoted because they are spliced into the INSERT below.
  -- INTO STRICT raises if the query yields no row (no matching columns).
  EXECUTE format($$
    SELECT
      string_agg(quote_ident(attname), ', ' ORDER BY attnum) AS str
    FROM pg_attribute
    WHERE attrelid = %L::regclass::oid
      AND attnum > 0
      AND NOT attisdropped
      AND attgenerated = ''
      AND attname != 'asr'
    GROUP BY attrelid;
  $$, past_tbl) INTO STRICT ins_cols;

  -- This used "||" (string concatenation) where OR was intended, so the
  -- empty-string case was never detected.
  IF ins_cols IS NULL OR ins_cols = '' THEN
    RAISE EXCEPTION 'columns to insert not found for table %', past_tbl;
  END IF;

  -- Insert all modified rows. Adjust the upper bound of the asr to now() since
  -- each row has been deleted or superseded. _pad_now_within_txn keeps the
  -- period non-empty when the row was already touched in this transaction.
  -- NOTE(review): temporal.period(lower, upper) must resolve to a two-argument
  -- constructor defined elsewhere; confirm it exists alongside the domain.
  EXECUTE format($$
    INSERT INTO %s (%s, asr)
    SELECT %s, temporal.period(
      lower(asr),
      temporal._pad_now_within_txn(lower(asr))
    )
    FROM modified_rows;
  $$, past_tbl, ins_cols, ins_cols);

  RETURN NULL; -- per-statement triggers should always return null
END;
$fn$ LANGUAGE plpgsql;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment