-
Star
(101)
You must be signed in to star a gist -
Fork
(24)
You must be signed in to fork a gist
-
-
Save colophonemes/9701b906c5be572a40a84b08f4d2fa4e to your computer and use it in GitHub Desktop.
-- Publish person row changes on the notification channel.
-- BUGFIX: the original read "ON income", but the trigger name and its
-- column list (id, email, username) belong to the person table; the income
-- columns are covered by income_notify below. Also stripped the " | |"
-- table-extraction artifacts that made this invalid SQL.
CREATE TRIGGER person_notify AFTER INSERT OR UPDATE OR DELETE ON person
FOR EACH ROW EXECUTE PROCEDURE notify_trigger(
    'id',
    'email',
    'username'
);
-- Publish income row changes on the notification channel.
-- (Stripped the " | |" extraction artifacts that made this invalid SQL.)
CREATE TRIGGER income_notify AFTER INSERT OR UPDATE OR DELETE ON income
FOR EACH ROW EXECUTE PROCEDURE notify_trigger(
    'id',
    'person_id',
    'amount',
    'currency_code',
    'start_date',
    'end_date',
    'data_source'
);
// Listen for PostgreSQL NOTIFY messages on the db_notifications channel
// and log each payload. (Stripped the " | |" extraction artifacts that
// made this invalid JavaScript.)
const pg = require('pg')
const pgConString = process.env.DATABASE_URL

// Connect to the DB
pg.connect(pgConString, function (err, client) {
  if (err) {
    console.error(err)
    // BUGFIX: without this return the code below would dereference an
    // undefined `client` after a failed connection.
    return
  }
  // Handle notifications
  client.on('notification', function (msg) {
    const payload = msg.payload
    console.log(payload)
    // Send payload into a queue etc...
  })
  // Listen for NOTIFY calls (return value of query() was unused).
  client.query('LISTEN db_notifications')
})
-- Trigger notification for messaging to PG Notify | |
CREATE FUNCTION notify_trigger() RETURNS trigger AS $trigger$ | |
DECLARE | |
rec RECORD; | |
payload TEXT; | |
column_name TEXT; | |
column_value TEXT; | |
payload_items TEXT[]; | |
BEGIN | |
-- Set record row depending on operation | |
CASE TG_OP | |
WHEN 'INSERT', 'UPDATE' THEN | |
rec := NEW; | |
WHEN 'DELETE' THEN | |
rec := OLD; | |
ELSE | |
RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP; | |
END CASE; | |
-- Get required fields | |
FOREACH column_name IN ARRAY TG_ARGV LOOP | |
EXECUTE format('SELECT $1.%I::TEXT', column_name) | |
INTO column_value | |
USING rec; | |
payload_items := array_append(payload_items, '"' || replace(column_name, '"', '\"') || '":"' || replace(column_value, '"', '\"') || '"'); | |
END LOOP; | |
-- Build the payload | |
payload := '' | |
|| '{' | |
|| '"timestamp":"' || CURRENT_TIMESTAMP || '",' | |
|| '"operation":"' || TG_OP || '",' | |
|| '"schema":"' || TG_TABLE_SCHEMA || '",' | |
|| '"table":"' || TG_TABLE_NAME || '",' | |
|| '"data":{' || array_to_string(payload_items, ',') || '}' | |
|| '}'; | |
-- Notify the channel | |
PERFORM pg_notify('db_notifications', payload); | |
RETURN rec; | |
END; | |
$trigger$ LANGUAGE plpgsql; |
Hi
Just wanted to say that we packaged this idea (albeit written in C) into a Docker container that can simply be dropped alongside your Postgres instance:
https://github.com/hasura/skor
What do you think?
Hey, first of all, thanks so much for writing this! I noticed that this function doesn't work if the column is itself a json value using PG's built-in JSON/JSONB data types. So I modified your function to support it, hope it is helpful.
-- Variant of notify_trigger that builds the data object with JSONB
-- operators so JSON/JSONB columns survive intact.
-- BUGFIX: uses jsonb_build_object directly instead of casting
-- json_build_object()::jsonb — mixing the json and jsonb types here is
-- exactly what triggers "operator does not exist: jsonb || jsonb" on
-- pre-9.5 servers (see the error transcript later in this thread).
-- Also removed the stray ``` markdown fence fused onto the last line.
CREATE FUNCTION notify_trigger() RETURNS trigger AS $trigger$
DECLARE
rec RECORD;
payload TEXT;
column_name TEXT;
column_value TEXT;
payload_items JSONB;
BEGIN
-- Set record row depending on operation
CASE TG_OP
WHEN 'INSERT', 'UPDATE' THEN
rec := NEW;
WHEN 'DELETE' THEN
rec := OLD;
ELSE
RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
END CASE;
-- Get required fields
FOREACH column_name IN ARRAY TG_ARGV LOOP
EXECUTE format('SELECT $1.%I::TEXT', column_name)
INTO column_value
USING rec;
payload_items := coalesce(payload_items, '{}'::jsonb) || jsonb_build_object(column_name, column_value);
END LOOP;
-- Build the payload
payload := json_build_object(
'timestamp',CURRENT_TIMESTAMP,
'operation',TG_OP,
'schema',TG_TABLE_SCHEMA,
'table',TG_TABLE_NAME,
'data',payload_items
);
-- Notify the channel
PERFORM pg_notify('db_notifications', payload);
RETURN rec;
END;
$trigger$ LANGUAGE plpgsql;
that helped, thanks bud
Thanks for this code, which was very helpful! These days, you can simplify your code thanks to the `json_object` function, which just so happens to turn an even-sized PostgreSQL `ARRAY` into a JSON object by alternating keys and values. Also, you can use `pg-listen` instead of `pg`. I demonstrated both techniques in a forked gist.
Hello everyone, I am trying to get this trigger to work on the following table schema, but somehow the loop triggers an error:
CREATE TABLE IF NOT EXISTS fetchq_catalog.fetchq_sys_queues (
id SERIAL PRIMARY KEY,
created_at TIMESTAMP WITH TIME ZONE,
name CHARACTER VARYING(40) NOT NULL,
is_active BOOLEAN DEFAULT true,
current_version INTEGER DEFAULT 0,
max_attempts INTEGER DEFAULT 5,
errors_retention VARCHAR(25) DEFAULT '24h',
metrics_retention JSONB DEFAULT '[]',
config JSON DEFAULT '{}'
);
Can you help to identify what's wrong?
Hello again, after a few dead ends I found this solution that seems to be working just fine:
SELECT json_agg(n)::text INTO data FROM json_each_text(to_json(rec)) n;
Hello, How about this
I'm add new function is_json
-- Returns TRUE when the given text parses as valid JSON, FALSE otherwise.
-- IMMUTABLE: the verdict depends only on the input string.
CREATE OR REPLACE FUNCTION is_json(text)
RETURNS boolean LANGUAGE plpgsql IMMUTABLE AS $$
BEGIN
    -- Attempt the cast purely for its side effect: we only care
    -- whether it raises.
    PERFORM $1::json;
    RETURN true;
EXCEPTION
    WHEN invalid_text_representation THEN
        RETURN false;
END $$;
then
-- Fragment (replaces the assignment inside notify_trigger's FOREACH loop):
-- merge one column into payload_items while preserving JSON-typed columns.
CASE
WHEN is_json(column_value) THEN
-- Value is itself valid JSON: cast it so it is embedded as a JSON
-- subtree rather than double-encoded as a quoted string.
payload_items := coalesce(payload_items,'{}')::jsonb || json_build_object(column_name,column_value::jsonb)::jsonb;
ELSE
-- Plain value: store it as a JSON string.
payload_items := coalesce(payload_items,'{}')::jsonb || json_build_object(column_name,column_value)::jsonb;
END CASE;
Doing something similar, but instead of coalesce i am collecting arrays of keys and values and use json_object
instead of json_build_object
.
Also, instead of CREATE OR REPLACE
function i am using DROP IF EXISTS CASCADE
before. That way all table triggers are deleted (cleaned) before on each start of node process.
Also, for actual listening I am using this library: https://github.com/andywer/pg-listen
Not sure if that's better. Just found this and wanted to share my approach.
Trigger function:
-- NOTE(review): ${schemaName}/${functionName} are JavaScript template-string
-- placeholders — this text is interpolated by a Node process before being
-- sent to PostgreSQL; it is not valid SQL as-is.
-- Generic notify trigger: publishes a JSON payload of selected fields on a
-- channel, both supplied as trigger arguments.
-- DROP ... CASCADE first so all triggers bound to the old function are
-- cleaned up on each start of the node process (per the author's comment).
DROP FUNCTION IF EXISTS ${schemaName}.${functionName}() CASCADE;
CREATE FUNCTION ${schemaName}.${functionName}() RETURNS trigger AS $$
DECLARE
notification json;
new_or_changed_record_as_jsonb jsonb;
channelName text;
fields text[];
field text;
extracted_fields text[];
extracted_values text[];
BEGIN
-- TG_ARGV[0]: 1st trigger argument must be channel name
channelName = TG_ARGV[0];
-- TG_ARGV[1]: 2nd trigger argument must be list of fields to include in JSON
fields = TG_ARGV[1];
-- create temp. jsonb version of changed or created record 1
IF (TG_OP = 'DELETE') THEN
new_or_changed_record_as_jsonb = to_jsonb(OLD);
ELSE
new_or_changed_record_as_jsonb = to_jsonb(NEW);
END IF;
-- build json dynamically for a given list of fields
-- (parallel key/value arrays are consumed by json_object below; note the
-- ->> extraction stringifies every value)
FOREACH field IN ARRAY fields LOOP
extracted_fields = array_append(extracted_fields, field);
extracted_values = array_append(extracted_values, new_or_changed_record_as_jsonb->>field);
END LOOP;
notification = json_build_object(
'type', TG_OP,
'dateTime', clock_timestamp(),
'table', TG_TABLE_NAME::text,
'schema', TG_TABLE_SCHEMA::text,
'data', json_object(extracted_fields, extracted_values)
);
PERFORM pg_notify(channelName, notification::text);
-- AFTER-trigger style: the return value is ignored, so NULL is fine.
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
I have tried to build this setup, but get the error:
ERROR: operator does not exist: jsonb || jsonb at character 44
HINT: No operator matches the given name and argument type(s). You might need to add explicit type casts.
QUERY: SELECT coalesce(payload_items,'{}')::jsonb || json_build_object(column_name,column_value)::jsonb
CONTEXT: PL/pgSQL function notify_trigger() line 24 at assignment
my notify_trigger
function looks like this (from @shrumm):
-- Version quoted by @mStirner to illustrate the reported error.
-- NOTE(review): the "jsonb || jsonb" concatenation below requires
-- PostgreSQL 9.5+; on the 9.4.24 server mentioned here it fails with
-- "operator does not exist: jsonb || jsonb".
CREATE FUNCTION notify_trigger() RETURNS trigger AS $trigger$
DECLARE
rec RECORD;
payload TEXT;
column_name TEXT;
column_value TEXT;
payload_items JSONB;
BEGIN
-- Set record row depending on operation
CASE TG_OP
WHEN 'INSERT', 'UPDATE' THEN
rec := NEW;
WHEN 'DELETE' THEN
rec := OLD;
ELSE
RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
END CASE;
-- Get required fields
FOREACH column_name IN ARRAY TG_ARGV LOOP
EXECUTE format('SELECT $1.%I::TEXT', column_name)
INTO column_value
USING rec;
-- This assignment is the line that fails on 9.4 (no || for jsonb).
payload_items := coalesce(payload_items,'{}')::jsonb || json_build_object(column_name,column_value)::jsonb;
END LOOP;
-- Build the payload
payload := json_build_object(
'timestamp',CURRENT_TIMESTAMP,
'operation',TG_OP,
'schema',TG_TABLE_SCHEMA,
'table',TG_TABLE_NAME,
'data',payload_items
);
-- Notify the channel
PERFORM pg_notify('db_notifications', payload);
RETURN rec;
END;
$trigger$ LANGUAGE plpgsql;
Can anyone help me?
PostgreSQL version is 9.4.24.
@mStirner I think it's because you're trying to coalesce a JSONB
type with a JSON
type. Maybe try using jsonb_build_object
without the cast instead of json_build_object()::JSONB
?
@mStirner I think it's because you're trying to coalesce a `JSONB` type with a `JSON` type. Maybe try using `jsonb_build_object` without the cast instead of `json_build_object()::JSONB`?
I have no idea about SQL (PostgreSQL)... I have opened a question on StackExchange (https://dba.stackexchange.com/questions/275297/postgresql-enable-notification-via-trigger-function?noredirect=1#comment539912_275297) and someone there wrote:
The || operator for jsonb was introduced in 9.5 and is not available in your un-supported version
But I have no idea how to solve this...
First of all, thanks for writing this code.
In order to have dynamic types into column_value, instead all values as String, I have changed it as:
-- Variant of notify_trigger that keeps native column types in the payload
-- (numbers, booleans, nested JSON) instead of casting every value to TEXT.
-- BUGFIX: this paste was missing its CREATE FUNCTION header, leaving the
-- closing $trigger$ dollar-quote unbalanced — restored here so the block
-- is valid DDL.
CREATE FUNCTION notify_trigger() RETURNS trigger AS $trigger$
DECLARE
rec RECORD;
payload TEXT;
column_name TEXT;
column_value JSONB;
payload_items JSONB;
BEGIN
-- Set record row depending on operation
CASE TG_OP
WHEN 'INSERT', 'UPDATE' THEN
rec := NEW;
WHEN 'DELETE' THEN
rec := OLD;
ELSE
RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
END CASE;
-- Get required fields: build a one-key JSON object per column so the
-- value keeps its native JSON type.
FOREACH column_name IN ARRAY TG_ARGV LOOP
EXECUTE format('SELECT json_build_object(''%I'', $1.%I)', column_name, column_name)
INTO column_value
USING rec;
payload_items := coalesce(payload_items,'{}')::jsonb || column_value;
END LOOP;
-- Build the payload
payload := json_build_object(
'timestamp',CURRENT_TIMESTAMP,
'operation',TG_OP,
'schema',TG_TABLE_SCHEMA,
'table',TG_TABLE_NAME,
'data',payload_items
);
-- Notify the channel
PERFORM pg_notify('db_notifications', payload);
RETURN rec;
END;
$trigger$ LANGUAGE plpgsql;
As result we get:
{"data": {"id": 1111, "hash": 2222, "type": "LIBRARY", "score": 3.03, "viewed": false, "version": 0, "state_id": 1, "lastenv_id": 3, "environment": "SERVER", "firstenv_id": 3, "occurrences": 2, "application_id": 3333, "lastoccurrence": "2021-08-25T16:43:54.376+00:00", "firstoccurrence": "2021-08-25T16:43:54.376+00:00", "organization_id": 4444, "processingstate": "CREATED", "statechangetime": null, "lastappversion_id": 5555, "firstappversion_id": 6666}, "table": "table_things", "schema": "public", "operation": "UPDATE", "timestamp": "2021-09-01T17:06:59.447523+00:00"}
Thanks for this! I was looking for a design, where I can listen to changes in the database and act on them from a Python application I'm writing. Only after some research I noticed that the NOTIFY/LISTEN
design is not great for this purpose. Because when there are no listeners, all payloads from NOTIFY
are instantly lost, and I need to make sure I act on all relevant changes. This scenario is also discussed here: https://stackoverflow.com/questions/23087347/what-happens-with-a-notify-when-no-session-has-issued-listen-in-postgresql
I repurposed most of the code to serve my needs and write trigger data to a new database. Also I'm persisting both the OLD
and NEW
and an extra string to identify the trigger.
persist_trigger.sql
:
-- Create table to store trigger data in
CREATE TABLE invopro.trigger_data(
id SERIAL PRIMARY KEY,
timestamp TIMESTAMP,
operation TEXT,
schema_name TEXT,
table_name TEXT,
trigger TEXT,
data_old JSONB,
data_new JSONB,
processed BOOLEAN DEFAULT FALSE
);
-- Create function to store data from trigger in table
DROP FUNCTION IF EXISTS invopro.persist_trigger() CASCADE;
CREATE FUNCTION invopro.persist_trigger() RETURNS trigger AS $trigger$
DECLARE
rec RECORD;
payload TEXT;
column_name TEXT;
column_value JSONB;
payload_items_new JSONB;
payload_items_old JSONB;
event_name TEXT;
fields TEXT[];
BEGIN
-- Set record row depending on operation
CASE TG_OP
WHEN 'INSERT', 'UPDATE' THEN
rec := NEW;
WHEN 'DELETE' THEN
rec := OLD;
ELSE
RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
END CASE;
event_name = TG_ARGV[0];
fields = TG_ARGV[1];
-- Get old values of required fields
FOREACH column_name IN ARRAY fields LOOP
EXECUTE format('SELECT json_build_object(''%I'', $1.%I)', column_name, column_name)
INTO column_value
USING OLD;
payload_items_old := coalesce(payload_items_old,'{}')::jsonb || column_value;
END LOOP;
-- Get new values of required fields
FOREACH column_name IN ARRAY fields LOOP
EXECUTE format('SELECT json_build_object(''%I'', $1.%I)', column_name, column_name)
INTO column_value
USING NEW;
payload_items_new := coalesce(payload_items_new,'{}')::jsonb || column_value;
END LOOP;
INSERT INTO invopro.trigger_data (
timestamp,
operation,
schema_name,
table_name,
trigger,
data_old,
data_new
)
VALUES (
CURRENT_TIMESTAMP,
TG_OP,
TG_TABLE_SCHEMA,
TG_TABLE_NAME,
event_name,
payload_items_old,
payload_items_new
);
RETURN rec;
END;
$trigger$ LANGUAGE plpgsql;
create_triggers.sql
:
-- Persist an event whenever account_move.state changes.
CREATE TRIGGER move_notify_state_change
    AFTER UPDATE OF state ON public.account_move
    FOR EACH ROW
    EXECUTE PROCEDURE invopro.persist_trigger('state_change', '{id, move_type, state}');

-- Persist an event whenever account_move.edi_state changes.
CREATE TRIGGER move_notify_edi_state_change
    AFTER UPDATE OF edi_state ON public.account_move
    FOR EACH ROW
    EXECUTE PROCEDURE invopro.persist_trigger('edi_state_change', '{id, move_type, edi_state}');

-- Persist an event whenever account_move.payment_state changes.
CREATE TRIGGER move_notify_payment_state_change
    AFTER UPDATE OF payment_state ON public.account_move
    FOR EACH ROW
    EXECUTE PROCEDURE invopro.persist_trigger('payment_state_change', '{id, move_type, payment_state}');
Hey, first of all, thanks so much for writing this! I noticed that this function doesn't work if the column is itself a json value using PG's built-in JSON/JSONB data types. So I modified your function to support it, hope it is helpful.
CREATE FUNCTION notify_trigger() RETURNS trigger AS $trigger$ DECLARE rec RECORD; payload TEXT; column_name TEXT; column_value TEXT; payload_items JSONB; BEGIN -- Set record row depending on operation CASE TG_OP WHEN 'INSERT', 'UPDATE' THEN rec := NEW; WHEN 'DELETE' THEN rec := OLD; ELSE RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP; END CASE; -- Get required fields FOREACH column_name IN ARRAY TG_ARGV LOOP EXECUTE format('SELECT $1.%I::TEXT', column_name) INTO column_value USING rec; payload_items := coalesce(payload_items,'{}')::jsonb || json_build_object(column_name,column_value)::jsonb; END LOOP; -- Build the payload payload := json_build_object( 'timestamp',CURRENT_TIMESTAMP, 'operation',TG_OP, 'schema',TG_TABLE_SCHEMA, 'table',TG_TABLE_NAME, 'data',payload_items ); -- Notify the channel PERFORM pg_notify('db_notifications', payload); RETURN rec; END; $trigger$ LANGUAGE plpgsql;```
thanks !
This `TRIGGER` function calls PostgreSQL's `NOTIFY` command with a JSON payload. You can listen for these calls and then send the JSON payload to a message queue (like AMQP/RabbitMQ) or trigger other actions.
Create the trigger with `notify_trigger.sql`.
When declaring the trigger, supply the column names you want the JSON payload to contain as arguments to the function (see `create_triggers.sql`).
The payload returns a JSON object:
Inspired by this post by Björn Gylling (@bjorngylling).