# Install auditing on a per-table basis, e.g.:
#
#   SELECT audit.audit_table('billing.subscriptions'),
#          audit.audit_table('billing.customers'),
#          audit.audit_table('billing.plans');
# HEAVILY based on https://github.com/razorlabs/pg-json-audit-trigger/blob/master/audit.sql
class AddAuditTables < ActiveRecord::Migration | |
def up | |
execute <<-SQL | |
CREATE OR REPLACE FUNCTION jsonb_minus( | |
"json" jsonb, | |
"keys" TEXT[] | |
) | |
RETURNS jsonb | |
LANGUAGE sql | |
IMMUTABLE | |
STRICT | |
AS $_$ | |
SELECT | |
-- Only executes operation if the JSON document has the keys | |
CASE WHEN "json" ?| "keys" | |
THEN COALESCE( | |
(SELECT ('{' || string_agg(to_json("key")::text || ':' || "value", ',') || '}') | |
FROM jsonb_each("json") | |
WHERE "key" <> ALL ("keys")), | |
'{}' | |
)::jsonb | |
ELSE "json" | |
END | |
$_$; | |
CREATE OPERATOR - ( | |
LEFTARG = jsonb, | |
RIGHTARG = text[], | |
PROCEDURE = jsonb_minus | |
); | |
-- Implements "JSONB - JSONB", returns a recursive diff of the JSON documents | |
-- | |
-- http://coussej.github.io/2016/05/24/A-Minus-Operator-For-PostgreSQLs-JSONB/ | |
-- | |
-- param 0: JSONB, primary JSONB source document to compare | |
-- param 1: JSONB, secondary JSONB source document to compare | |
-- | |
CREATE OR REPLACE FUNCTION jsonb_minus ( arg1 jsonb, arg2 jsonb ) | |
RETURNS jsonb | |
LANGUAGE sql | |
IMMUTABLE | |
STRICT | |
AS $$ | |
SELECT | |
COALESCE( | |
json_object_agg( | |
key, | |
CASE | |
-- if the value is an object and the value of the second argument is | |
-- not null, we do a recursion | |
WHEN jsonb_typeof(value) = 'object' AND arg2 -> key IS NOT NULL | |
THEN jsonb_minus(value, arg2 -> key) | |
-- for all the other types, we just return the value | |
ELSE value | |
END | |
), | |
'{}' | |
)::jsonb | |
FROM | |
jsonb_each(arg1) | |
WHERE | |
arg1 -> key <> arg2 -> key | |
OR arg2 -> key IS NULL | |
$$; | |
CREATE OPERATOR - ( | |
LEFTARG = jsonb, | |
RIGHTARG = jsonb, | |
PROCEDURE = jsonb_minus | |
); | |
CREATE SCHEMA audit; | |
REVOKE ALL ON SCHEMA audit FROM public; | |
CREATE TABLE audit.log ( | |
id bigserial NOT NULL PRIMARY KEY, | |
schema_name text NOT NULL, | |
table_name text NOT NULL, | |
-- table_id oid NOT NULL, | |
"session_user" text NOT NULL DEFAULT session_user::text, | |
transaction_at timestamptz NOT NULL DEFAULT current_timestamp, | |
statement_at timestamptz NOT NULL DEFAULT statement_timestamp(), | |
clock_at timestamptz NOT NULL DEFAULT clock_timestamp(), | |
transaction_id bigint NOT NULL DEFAULT txid_current(), | |
application_name text DEFAULT current_setting('application.name', true), | |
application_user text DEFAULT current_setting('application.user', true), | |
query text NOT NULL DEFAULT current_query(), | |
action TEXT NOT NULL CHECK (action IN ('I','D','U','T')), | |
client_addr inet DEFAULT inet_client_addr(), | |
client_port integer DEFAULT inet_client_port(), | |
primary_keys jsonb CHECK (action = 'T' OR (primary_keys <> '{}'::jsonb AND primary_keys IS NOT NULL)), | |
changes jsonb NOT NULL DEFAULT '{}'::jsonb | |
); | |
--CREATE INDEX log_table_id_idx ON audit.log(table_id); | |
CREATE INDEX log_statement_at_idx ON audit.log(statement_at); | |
CREATE INDEX log_action_idx ON audit.log(action); | |
CREATE OR REPLACE FUNCTION audit.if_modified_func() RETURNS TRIGGER AS $body$ | |
DECLARE | |
audit_row audit.log; | |
pkeys text[]; | |
pkey_vals jsonb = '{}'::jsonb; | |
excluded_cols text[] = ARRAY[]::text[]; | |
BEGIN | |
IF TG_WHEN <> 'AFTER' THEN | |
RAISE EXCEPTION 'audit.if_modified_func() may only run as an AFTER trigger'; | |
END IF; | |
-- https://stackoverflow.com/a/44848236/56690 | |
SELECT | |
array_agg(CAST(a.attname AS TEXT)) INTO pkeys | |
FROM | |
pg_index i | |
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) | |
WHERE | |
i.indrelid = TG_RELID | |
AND i.indisprimary; | |
audit_row = ROW( | |
nextval('audit.log_id_seq'), | |
TG_TABLE_SCHEMA::text, | |
TG_TABLE_NAME::text, | |
-- TG_RELID, | |
session_user::text, | |
current_timestamp AT TIME ZONE 'utc', | |
statement_timestamp() AT TIME ZONE 'utc', | |
clock_timestamp() AT TIME ZONE 'utc', | |
txid_current(), | |
current_setting('application.name', true), | |
current_setting('application.user', true), | |
current_query(), | |
substring(TG_OP,1,1), | |
inet_client_addr(), | |
inet_client_port(), | |
'{}'::jsonb, | |
'{}'::jsonb | |
); | |
IF NOT TG_ARGV[0]::boolean IS DISTINCT FROM 'f'::boolean THEN | |
audit_row.query = NULL; | |
END IF; | |
IF TG_ARGV[1] IS NOT NULL THEN | |
excluded_cols = TG_ARGV[1]::text[]; | |
END IF; | |
IF (TG_OP = 'UPDATE' AND TG_LEVEL = 'ROW') THEN | |
audit_row.changes = (to_jsonb(NEW.*) - to_jsonb(OLD.*)) - excluded_cols; | |
SELECT json_object_agg(key, value) INTO audit_row.primary_keys | |
FROM json_each(row_to_json(OLD)) | |
WHERE key IN (SELECT * FROM unnest(pkeys)); | |
IF audit_row.changes = '{}'::jsonb THEN | |
-- All changed fields are ignored. Skip this update. | |
RETURN NULL; | |
END IF; | |
ELSIF (TG_OP = 'DELETE' AND TG_LEVEL = 'ROW') THEN | |
audit_row.changes = to_jsonb(OLD.*) - excluded_cols; | |
SELECT json_object_agg(key, value) INTO audit_row.primary_keys | |
FROM json_each(row_to_json(OLD)) | |
WHERE key IN (SELECT * FROM unnest(pkeys)); | |
ELSIF (TG_OP = 'INSERT' AND TG_LEVEL = 'ROW') THEN | |
audit_row.changes = to_jsonb(NEW.*) - excluded_cols; | |
SELECT json_object_agg(key, value) INTO audit_row.primary_keys | |
FROM json_each(row_to_json(NEW)) | |
WHERE key IN (SELECT * FROM unnest(pkeys)); | |
ELSIF (TG_LEVEL = 'STATEMENT' AND TG_OP IN ('INSERT','UPDATE','DELETE','TRUNCATE')) THEN | |
NULL; | |
ELSE | |
RAISE EXCEPTION '[audit.if_modified_func] - Trigger func added as trigger for unhandled case: %, %',TG_OP, TG_LEVEL; | |
RETURN NULL; | |
END IF; | |
INSERT INTO audit.log VALUES (audit_row.*); | |
RETURN NULL; | |
END; | |
$body$ | |
LANGUAGE plpgsql | |
SECURITY DEFINER | |
SET search_path = pg_catalog, public; | |
COMMENT ON FUNCTION audit.if_modified_func() IS $body$ | |
Track changes to a table at the statement and/or row level. | |
Optional parameters to trigger in CREATE TRIGGER call: | |
param 0: boolean, whether to log the query text. Default 't'. | |
param 1: text[], columns to ignore in updates. Default []. | |
Updates to ignored cols are omitted from changed_fields. | |
Updates with only ignored cols changed are not inserted | |
into the audit log. | |
Almost all the processing work is still done for updates | |
that ignored. If you need to save the load, you need to use | |
WHEN clause on the trigger instead. | |
No warning or error is issued if ignored_cols contains columns | |
that do not exist in the target table. This lets you specify | |
a standard set of ignored columns. | |
There is no parameter to disable logging of values. Add this trigger as | |
a 'FOR EACH STATEMENT' rather than 'FOR EACH ROW' trigger if you do not | |
want to log row values. | |
Note that the user name logged is the login role for the session. The audit trigger | |
cannot obtain the active role because it is reset by the SECURITY DEFINER invocation | |
of the audit trigger its self. | |
$body$; | |
CREATE OR REPLACE FUNCTION audit.unaudit_table(target_table regclass) | |
RETURNS void AS $body$ | |
BEGIN | |
EXECUTE 'DROP TRIGGER IF EXISTS audit_trigger_row ON ' || target_table::TEXT; | |
EXECUTE 'DROP TRIGGER IF EXISTS audit_trigger_stm ON ' || target_table::TEXT; | |
END; | |
$body$ | |
LANGUAGE 'plpgsql'; | |
CREATE OR REPLACE FUNCTION audit.audit_table( | |
target_table regclass, | |
audit_rows boolean, | |
audit_query_text boolean, | |
ignored_cols text[] | |
) | |
RETURNS void AS $body$ | |
DECLARE | |
stm_targets text = 'INSERT OR UPDATE OR DELETE OR TRUNCATE'; | |
_q_txt text; | |
_ignored_cols_snip text = ''; | |
BEGIN | |
PERFORM audit.unaudit_table(target_table); | |
IF audit_rows THEN | |
IF array_length(ignored_cols,1) > 0 THEN | |
_ignored_cols_snip = ', ' || quote_literal(ignored_cols); | |
END IF; | |
_q_txt = 'CREATE TRIGGER audit_trigger_row AFTER INSERT OR UPDATE OR DELETE ON ' || | |
target_table::TEXT || | |
' FOR EACH ROW EXECUTE PROCEDURE audit.if_modified_func(' || | |
quote_literal(audit_query_text) || _ignored_cols_snip || ');'; | |
RAISE NOTICE '%',_q_txt; | |
EXECUTE _q_txt; | |
stm_targets = 'TRUNCATE'; | |
ELSE | |
END IF; | |
_q_txt = 'CREATE TRIGGER audit_trigger_stm AFTER ' || stm_targets || ' ON ' || | |
target_table || | |
' FOR EACH STATEMENT EXECUTE PROCEDURE audit.if_modified_func('|| | |
quote_literal(audit_query_text) || ');'; | |
RAISE NOTICE '%',_q_txt; | |
EXECUTE _q_txt; | |
END; | |
$body$ | |
LANGUAGE 'plpgsql'; | |
COMMENT ON FUNCTION audit.audit_table(regclass, boolean, boolean, text[]) IS $body$ | |
Add auditing support to a table. | |
Arguments: | |
target_table: Table name, schema qualified if not on search_path | |
audit_rows: Record each row change, or only audit at a statement level | |
audit_query_text: Record the text of the client query that triggered the audit event? | |
ignored_cols: Columns to exclude from update diffs, ignore updates that change only ignored cols. | |
$body$; | |
-- Pg doesn't allow variadic calls with 0 params, so provide a wrapper | |
CREATE OR REPLACE FUNCTION audit.audit_table( | |
target_table regclass, | |
audit_rows boolean, | |
audit_query_text boolean | |
) | |
RETURNS void AS $body$ | |
SELECT audit.audit_table($1, $2, $3, ARRAY[]::text[]); | |
$body$ LANGUAGE SQL; | |
-- And provide a convenience call wrapper for the simplest case | |
-- of row-level logging with no excluded cols and query logging enabled. | |
-- | |
CREATE OR REPLACE FUNCTION audit.audit_table(target_table regclass) | |
RETURNS void AS $body$ | |
SELECT audit.audit_table($1, BOOLEAN 't', BOOLEAN 't'); | |
$body$ LANGUAGE 'sql'; | |
COMMENT ON FUNCTION audit.audit_table(regclass) IS $body$ | |
Add auditing support to the given table. Row-level changes will be logged with full client query text. No cols are ignored. | |
$body$; | |
SQL | |
end | |
def down | |
execute <<-SQL | |
DROP SCHEMA IF EXISTS audit CASCADE; | |
DROP OPERATOR - (jsonb, text[]) CASCADE; | |
DROP OPERATOR - (jsonb, jsonb) CASCADE; | |
SQL | |
end | |
end |