-
-
Save tyrauber/5d23a39a5da74f535840c3b9927cc518 to your computer and use it in GitHub Desktop.
Postgres TRIGGER to call NOTIFY with a JSON payload
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Publish a notification for every row-level change on "person".
-- The trigger arguments are the column names that notify_trigger() copies
-- into the "data" object of the JSON payload.
-- Fix: the trigger was attached to "income", although its name and its
-- column list (id, email, username) describe the person table.
CREATE TRIGGER person_notify AFTER INSERT OR UPDATE OR DELETE ON person
FOR EACH ROW EXECUTE PROCEDURE notify_trigger(
  'id',
  'email',
  'username'
);
-- Publish a notification for every row-level change on "income".
-- The trigger arguments are the column names that notify_trigger() copies
-- into the "data" object of the JSON payload.
CREATE TRIGGER income_notify AFTER INSERT OR UPDATE OR DELETE ON income
FOR EACH ROW EXECUTE PROCEDURE notify_trigger(
  'id',
  'person_id',
  'amount',
  'currency_code',
  'start_date',
  'end_date',
  'data_source'
);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const pg = require('pg')

// Connection string is taken from the environment.
const pgConString = process.env.DATABASE_URL

// Connect to the DB
// NOTE(review): pg.connect() is the legacy module-level pool API; newer
// node-postgres releases appear to have removed it in favour of
// `new pg.Client()` / `new pg.Pool()` - confirm against the installed version.
pg.connect(pgConString, function (err, client) {
  if (err) {
    // Without a connection there is no usable client; stop here instead of
    // falling through and dereferencing it below.
    console.error(err)
    return
  }

  // Handle notifications
  client.on('notification', function (msg) {
    const payload = msg.payload
    console.log(payload)
    // Send payload into a queue etc...
  })

  // Listen for NOTIFY calls; surface a failed LISTEN instead of ignoring it.
  client.query('LISTEN db_notifications', function (listenErr) {
    if (listenErr) {
      console.error(listenErr)
    }
  })
})
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Trigger notification for messaging to PG Notify
--
-- Row-level trigger function that publishes a JSON document on the
-- 'db_notifications' channel for each INSERT, UPDATE or DELETE.
--
-- Trigger arguments (TG_ARGV): names of the columns to include under "data".
-- Payload shape:
--   {"timestamp": ..., "operation": ..., "schema": ..., "table": ...,
--    "data": {"<column>": "<value cast to text>" | null, ...}}
--
-- Every string is encoded with to_json(), so quotes, backslashes, newlines
-- and other control characters yield valid JSON. A NULL column is emitted as
-- JSON null (previously the NULL propagated through the concatenation and the
-- key silently disappeared from "data").
--
-- NOTE(review): per the PostgreSQL NOTIFY documentation the payload must stay
-- below 8000 bytes in the default configuration - keep the column list small.
CREATE FUNCTION notify_trigger() RETURNS trigger AS $trigger$
DECLARE
  rec RECORD;
  payload TEXT;
  column_name TEXT;
  column_value TEXT;
  payload_items TEXT[];
BEGIN
  -- Set record row depending on operation
  CASE TG_OP
  WHEN 'INSERT', 'UPDATE' THEN
    rec := NEW;
  WHEN 'DELETE' THEN
    rec := OLD;
  ELSE
    RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
  END CASE;

  -- Get required fields
  FOREACH column_name IN ARRAY TG_ARGV LOOP
    -- %I quotes the column name as an identifier; the value is read as TEXT.
    EXECUTE format('SELECT $1.%I::TEXT', column_name)
    INTO column_value
    USING rec;

    -- to_json() yields a correctly quoted and escaped JSON string; it returns
    -- NULL for a NULL input, which is mapped to the JSON literal null.
    payload_items := array_append(
      payload_items,
      to_json(column_name)::TEXT || ':' || COALESCE(to_json(column_value)::TEXT, 'null')
    );
  END LOOP;

  -- Build the payload
  -- COALESCE keeps the document well-formed when no item was collected
  -- (array_to_string of a NULL array is NULL, which would null the payload).
  payload := ''
    || '{'
    || '"timestamp":' || to_json(CURRENT_TIMESTAMP::TEXT)::TEXT || ','
    || '"operation":' || to_json(TG_OP::TEXT)::TEXT || ','
    || '"schema":' || to_json(TG_TABLE_SCHEMA::TEXT)::TEXT || ','
    || '"table":' || to_json(TG_TABLE_NAME::TEXT)::TEXT || ','
    || '"data":{' || COALESCE(array_to_string(payload_items, ','), '') || '}'
    || '}';

  -- Notify the channel
  PERFORM pg_notify('db_notifications', payload);

  RETURN rec;
END;
$trigger$ LANGUAGE plpgsql;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment