Last active
February 1, 2025 14:53
-
Star
(101)
You must be signed in to star a gist -
Fork
(24)
You must be signed in to fork a gist
-
-
Save colophonemes/9701b906c5be572a40a84b08f4d2fa4e to your computer and use it in GitHub Desktop.
Postgres TRIGGER to call NOTIFY with a JSON payload
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Publish a notification for every row inserted into, updated in, or deleted
-- from the person table. The arguments are the column names that
-- notify_trigger() copies into the "data" object of the payload.
CREATE TRIGGER person_notify AFTER INSERT OR UPDATE OR DELETE ON person
FOR EACH ROW EXECUTE PROCEDURE notify_trigger(
  'id',
  'email',
  'username'
);
-- Publish a notification for every row inserted into, updated in, or deleted
-- from the income table. The arguments are the column names that
-- notify_trigger() copies into the "data" object of the payload.
CREATE TRIGGER income_notify AFTER INSERT OR UPDATE OR DELETE ON income
FOR EACH ROW EXECUTE PROCEDURE notify_trigger(
  'id',
  'person_id',
  'amount',
  'currency_code',
  'start_date',
  'end_date',
  'data_source'
);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const pg = require('pg')

// Connection string for the database whose triggers call pg_notify().
const pgConString = process.env.DATABASE_URL

// NOTE(review): `pg.connect` only exists in older node-postgres releases;
// newer versions need `new pg.Client(...)` instead — confirm the installed version.

// Connect to the DB
pg.connect(pgConString, (err, client) => {
  if (err) {
    // Stop here: the code below would otherwise dereference `client`
    // after a failed connection attempt.
    console.error(err)
    return
  }

  // Handle notifications
  client.on('notification', (msg) => {
    // The payload is the JSON text built by the notify_trigger() function.
    const { payload } = msg
    console.log(payload)
    // Send payload into a queue etc...
  })

  // Listen for NOTIFY calls; report a failed LISTEN instead of ignoring it.
  client.query('LISTEN db_notifications', (queryErr) => {
    if (queryErr) {
      console.error(queryErr)
    }
  })
})
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Trigger notification for messaging to PG Notify
--
-- Row-level trigger function that publishes a JSON payload on the
-- 'db_notifications' channel. Each trigger argument names a column of the
-- affected row; its value is cast to TEXT and copied into the payload:
--
--   {"timestamp":"...","operation":"<TG_OP>","schema":"...","table":"...",
--    "data":{"<column>":"<value as text>", ...}}
--
-- Columns whose value is NULL are left out of "data".
-- Every string is encoded with to_json(), so quotes, backslashes and control
-- characters in names or values cannot produce invalid JSON.
-- NOTE(review): NOTIFY payloads have a size limit (8000 bytes by default);
-- keep the list of columns small.
CREATE FUNCTION notify_trigger() RETURNS trigger AS $trigger$
DECLARE
  rec RECORD;
  payload TEXT;
  column_name TEXT;
  column_value TEXT;
  -- Start from an empty array so array_to_string() below never sees NULL.
  payload_items TEXT[] := ARRAY[]::TEXT[];
BEGIN
  -- Set record row depending on operation
  CASE TG_OP
  WHEN 'INSERT', 'UPDATE' THEN
    rec := NEW;
  WHEN 'DELETE' THEN
    rec := OLD;
  ELSE
    RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
  END CASE;

  -- Get required fields (tolerate a trigger created without arguments)
  FOREACH column_name IN ARRAY coalesce(TG_ARGV, ARRAY[]::TEXT[]) LOOP
    -- %I quotes the column name as an identifier for the dynamic query.
    EXECUTE format('SELECT $1.%I::TEXT', column_name)
    INTO column_value
    USING rec;

    -- NULL values are skipped, so the key is simply absent from "data".
    IF column_value IS NOT NULL THEN
      payload_items := array_append(
        payload_items,
        to_json(column_name)::TEXT || ':' || to_json(column_value)::TEXT
      );
    END IF;
  END LOOP;

  -- Build the payload
  payload := ''
    || '{'
    || '"timestamp":' || to_json(CURRENT_TIMESTAMP::TEXT)::TEXT || ','
    || '"operation":' || to_json(TG_OP::TEXT)::TEXT || ','
    || '"schema":' || to_json(TG_TABLE_SCHEMA::TEXT)::TEXT || ','
    || '"table":' || to_json(TG_TABLE_NAME::TEXT)::TEXT || ','
    || '"data":{' || array_to_string(payload_items, ',') || '}'
    || '}';

  -- Notify the channel
  PERFORM pg_notify('db_notifications', payload);

  RETURN rec;
END;
$trigger$ LANGUAGE plpgsql;
Hey, first of all, thanks so much for writing this! I noticed that this function doesn't work if the column is itself a json value using PG's built-in JSON/JSONB data types. So I modified your function to support it, hope it is helpful.
-- Variant of notify_trigger() that assembles the payload with
-- json_build_object()/JSONB instead of manual string concatenation.
-- Column values are still cast to TEXT, so they appear in "data" as JSON strings.
CREATE FUNCTION notify_trigger() RETURNS trigger AS $trigger$
DECLARE
  rec RECORD;
  payload TEXT;
  column_name TEXT;
  column_value TEXT;
  payload_items JSONB;
BEGIN
  -- Set record row depending on operation
  CASE TG_OP
  WHEN 'INSERT', 'UPDATE' THEN
    rec := NEW;
  WHEN 'DELETE' THEN
    rec := OLD;
  ELSE
    RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
  END CASE;

  -- Get required fields
  FOREACH column_name IN ARRAY TG_ARGV LOOP
    EXECUTE format('SELECT $1.%I::TEXT', column_name)
    INTO column_value
    USING rec;
    -- Merge this column into the accumulated object (an empty object on the first pass).
    payload_items := coalesce(payload_items,'{}')::jsonb || json_build_object(column_name,column_value)::jsonb;
  END LOOP;

  -- Build the payload
  payload := json_build_object(
    'timestamp',CURRENT_TIMESTAMP,
    'operation',TG_OP,
    'schema',TG_TABLE_SCHEMA,
    'table',TG_TABLE_NAME,
    'data',payload_items
  );

  -- Notify the channel
  PERFORM pg_notify('db_notifications', payload);

  RETURN rec;
END;
$trigger$ LANGUAGE plpgsql;
thanks !
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks for this! I was looking for a design where I can listen to changes in the database and act on them from a Python application I'm writing. Only after some research did I notice that the NOTIFY/LISTEN design is not great for this purpose, because when there are no listeners, all payloads from NOTIFY are instantly lost, and I need to make sure I act on all relevant changes. This scenario is also discussed here: https://stackoverflow.com/questions/23087347/what-happens-with-a-notify-when-no-session-has-issued-listen-in-postgresql
I repurposed most of the code to serve my needs and write trigger data to a new database. I'm also persisting both the OLD and NEW records, plus an extra string to identify the trigger.
persist_trigger.sql:
create_triggers.sql: