This TRIGGER function calls PosgreSQL's NOTIFY
command with a JSON payload. You can listen for these calls and then send the JSON payload to a message queue (like AMQP/RabbitMQ) or trigger other actions.
Create the trigger with notify_trigger.sql.
When declaring the trigger, supply the column names you want the JSON payload to contain as arguments to the function (see create_triggers.sql)
The payload returns a JSON object:
{
"timestamp": "2017-01-14 22:10:49.506002+00",
"operation": "INSERT",
"schema": "pledges",
"table": "income",
"data": {
"id": "4e565844-daa6-11e6-ad5e-4b33ec44da97",
"person_id": "8dba0c26-da13-11e6-b5bc-474d83f61aaa",
"amount": "12345.99",
"currency_code": "USD",
"start_date": "2016-01-01",
"end_date": "2016-12-31"
}
}
Inspired by this post by Björn Gylling (@bjorngylling).
There's some restrictions in terms on what you can send
- Maximum of 8000 bytes in the payload
- 8GB queue by default.
You can check the special variable keys in this psql documentation article.
CREATE OR REPLACE FUNCTION notify_trigger() RETURNS trigger AS $trigger$
DECLARE
rec RECORD;
dat RECORD;
payload TEXT;
BEGIN
-- Set record row depending on operation
CASE TG_OP
WHEN 'UPDATE' THEN
rec := NEW;
dat := OLD;
WHEN 'INSERT' THEN
rec := NEW;
WHEN 'DELETE' THEN
rec := OLD;
ELSE
RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
END CASE;
-- Build the payload
payload := json_build_object('timestamp',CURRENT_TIMESTAMP,'action',LOWER(TG_OP),'schema',TG_TABLE_SCHEMA,'identity',TG_TABLE_NAME,'record',row_to_json(rec), 'old',row_to_json(dat));
-- Notify the channel
PERFORM pg_notify('core_db_event',payload);
RETURN rec;
END;
$trigger$ LANGUAGE plpgsql;
You then would register the triggers:
DROP TRIGGER location_notify ON location;
CREATE TRIGGER location_notify AFTER INSERT OR UPDATE OR DELETE ON location
FOR EACH ROW EXECUTE PROCEDURE notify_trigger();
You can also restrict which fields actually trigger the notify:
CREATE TRIGGER income_notify
AFTER UPDATE ON income FOR EACH ROW
WHEN ( (OLD.amount, OLD.currency_code, OLD.data_source) IS DISTINCT FROM (NEW.amount, NEW.currency_code, NEW.data_source) )
EXECUTE PROCEDURE notify_trigger(
'id',
'person_id',
'amount',
'currency_code',
'start_date',
'end_date',
'data_source'
);
Note the use of IS DISTINCT FROM
rather than =
. Think about the effect of NULL
.
TIMESTAMP WITH TIME ZONE fields are always stored in UTC and are displayed in local time by default. You can control display using AT TIME ZONE in queries, or with SET timezone = 'UTC' as a per-session GUC. See the Pg docs
Restrictions of Logical Replication:
- The database schema and DDL commands are not replicated
- Sequence data is not replicated.
TRUNCATE
commands are not replicated- Large objects are not replicated.
- Replication is only possible from base tables to base tables( not views, materialized views, partition root tables, or foreign tables)
Post on how to use WAL replication on AWS to achieve a similar result