@miguelbermudez
Forked from goliatone/README.md
Created June 14, 2023 15:14
Postgres TRIGGER to call NOTIFY with a JSON payload

This TRIGGER function calls PostgreSQL's NOTIFY command with a JSON payload. You can listen for these calls and then send the JSON payload to a message queue (like AMQP/RabbitMQ) or trigger other actions.

Create the trigger function with notify_trigger.sql.

When declaring the trigger, supply the column names you want the JSON payload to contain as arguments to the function (see create_triggers.sql).

The payload is a JSON object:

{
  "timestamp": "2017-01-14 22:10:49.506002+00",
  "operation": "INSERT",
  "schema": "pledges",
  "table": "income",
  "data": {
    "id": "4e565844-daa6-11e6-ad5e-4b33ec44da97",
    "person_id": "8dba0c26-da13-11e6-b5bc-474d83f61aaa",
    "amount": "12345.99",
    "currency_code": "USD",
    "start_date": "2016-01-01",
    "end_date": "2016-12-31"
  }
}
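
As a rough illustration of forwarding this payload to a message queue, the sketch below parses the raw NOTIFY string and derives a routing key from it. The helper name `toQueueMessage` and the dotted routing-key format are assumptions made for this example; they are not part of the gist.

```javascript
// Hypothetical helper: derive a routing key and message body from a raw NOTIFY payload.
function toQueueMessage(rawPayload) {
  const event = JSON.parse(rawPayload)
  return {
    // The "schema.table.operation" format is an assumption, not something the trigger defines
    routingKey: [event.schema, event.table, event.operation.toLowerCase()].join('.'),
    body: event.data
  }
}

// Sample payload in the shape shown above (data shortened for brevity)
const sample = JSON.stringify({
  timestamp: '2017-01-14 22:10:49.506002+00',
  operation: 'INSERT',
  schema: 'pledges',
  table: 'income',
  data: { id: '4e565844-daa6-11e6-ad5e-4b33ec44da97', amount: '12345.99' }
})

const message = toQueueMessage(sample)
console.log(message.routingKey) // pledges.income.insert
console.log(message.body.amount) // 12345.99
```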

Inspired by this post by Björn Gylling (@bjorngylling).

LISTEN/NOTIFY limitations

There are some restrictions on what you can send:

  • The payload must be shorter than 8000 bytes in the default configuration.
  • The notification queue is 8GB by default.
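
To get a feel for the payload limit, a test suite could check representative rows against it before relying on the trigger. The sketch below is only an estimate: `notifyPayloadSize` and `fitsInNotify` are hypothetical helpers, and PostgreSQL's own JSON serialization may differ slightly in whitespace from `JSON.stringify`.

```javascript
// PostgreSQL's default configuration requires the payload to be shorter than 8000 bytes.
const MAX_NOTIFY_BYTES = 8000

// Hypothetical helper: UTF-8 size of an event once serialized to JSON.
function notifyPayloadSize(event) {
  return Buffer.byteLength(JSON.stringify(event), 'utf8')
}

// Hypothetical helper: true when the serialized event stays under the limit.
function fitsInNotify(event) {
  return notifyPayloadSize(event) < MAX_NOTIFY_BYTES
}

const small = { operation: 'INSERT', table: 'income', data: { amount: '12345.99' } }
const large = { operation: 'INSERT', table: 'income', data: { notes: 'x'.repeat(9000) } }

console.log(fitsInNotify(small)) // true
console.log(fitsInNotify(large)) // false
```

If wide rows are likely, passing only the needed column names to the trigger keeps the payload small.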

Handling JSON fields

The special trigger variables used below (TG_OP, TG_TABLE_SCHEMA, TG_TABLE_NAME, TG_ARGV) are described in the PL/pgSQL trigger documentation.

CREATE OR REPLACE FUNCTION notify_trigger() RETURNS trigger AS $trigger$
DECLARE
  rec RECORD;            -- row reported as "record" (NEW, or OLD on DELETE)
  old_row JSON := NULL;  -- previous row values, set on UPDATE only
  payload TEXT;
BEGIN

  -- Set record row depending on operation.
  -- The old row is kept as JSON so it can stay NULL on INSERT and DELETE;
  -- reading an unassigned RECORD can raise an error on older PostgreSQL versions.
  CASE TG_OP
  WHEN 'UPDATE' THEN
     rec := NEW;
     old_row := row_to_json(OLD);
  WHEN 'INSERT' THEN
     rec := NEW;
  WHEN 'DELETE' THEN
     rec := OLD;
  ELSE
     RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
  END CASE;

  -- Build the payload
  payload := json_build_object(
    'timestamp', CURRENT_TIMESTAMP,
    'action',    LOWER(TG_OP),
    'schema',    TG_TABLE_SCHEMA,
    'identity',  TG_TABLE_NAME,
    'record',    row_to_json(rec),
    'old',       old_row
  );

  -- Notify the channel
  PERFORM pg_notify('core_db_event', payload);

  -- The return value of an AFTER row trigger is ignored
  RETURN rec;
END;
$trigger$ LANGUAGE plpgsql;

You would then register the triggers:

DROP TRIGGER IF EXISTS location_notify ON location;

CREATE TRIGGER location_notify AFTER INSERT OR UPDATE OR DELETE ON location
FOR EACH ROW EXECUTE PROCEDURE notify_trigger();

You can also restrict which fields actually trigger the notify:

CREATE TRIGGER income_notify
AFTER UPDATE ON income FOR EACH ROW
WHEN ( (OLD.amount, OLD.currency_code, OLD.data_source) IS DISTINCT FROM (NEW.amount, NEW.currency_code, NEW.data_source) )
EXECUTE PROCEDURE notify_trigger(
  'id',
  'person_id',
  'amount',
  'currency_code',
  'start_date',
  'end_date',
  'data_source'
);

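Note the use of IS DISTINCT FROM rather than = (or <>). An ordinary comparison yields NULL when either side is NULL, so the WHEN condition would not fire when a column changes to or from NULL. IS DISTINCT FROM treats NULL as a comparable value and always returns true or false.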
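
The effect of NULL can be modelled outside the database. The sketch below emulates the two comparison styles in JavaScript, with `null` standing in for SQL NULL; the function names are made up for this illustration and only cover single values, not the row comparison used in the trigger.

```javascript
// Emulates SQL "a <> b": the result is unknown (null) when either side is NULL.
function sqlNotEquals(a, b) {
  if (a === null || b === null) return null
  return a !== b
}

// Emulates SQL "a IS DISTINCT FROM b": NULL is treated as an ordinary value,
// so the result is always true or false (null === null in JavaScript).
function isDistinctFrom(a, b) {
  return a !== b
}

// A trigger's WHEN clause fires only when the condition is true, not NULL.
const fires = (condition) => condition === true

console.log(fires(sqlNotEquals(null, '12345.99')))   // false: the change from NULL is missed
console.log(fires(isDistinctFrom(null, '12345.99'))) // true
console.log(fires(isDistinctFrom(null, null)))       // false: nothing changed
```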

Time zone concerns

TIMESTAMP WITH TIME ZONE values are always stored in UTC and are displayed in the session's time zone by default. You can control the display with AT TIME ZONE in queries, or with SET timezone = 'UTC' as a per-session GUC. See the PostgreSQL docs.
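
On the listener side, the payload's timestamp carries a UTC offset, so it can be converted to an unambiguous instant whatever the session time zone was. The sketch below is one way to do that in JavaScript. `parsePgTimestamptz` is a hypothetical helper; it assumes the value arrives either in the space-separated form shown in the sample payload or in ISO 8601 form, and it truncates microseconds to milliseconds.

```javascript
// Hypothetical helper: parse a timestamptz string such as
// "2017-01-14 22:10:49.506002+00" into a JavaScript Date.
function parsePgTimestamptz(text) {
  const match = /^(\d{4}-\d{2}-\d{2})[ T](\d{2}:\d{2}:\d{2})(\.\d+)?([+-]\d{2})(?::?(\d{2}))?$/.exec(text)
  if (!match) throw new Error('Unrecognized timestamp: ' + text)
  const [, date, time, fraction = '', offsetHours, offsetMinutes = '00'] = match
  // Date only keeps milliseconds, so pad or cut the fraction to three digits
  const millis = fraction ? (fraction + '00').slice(0, 4) : ''
  return new Date(`${date}T${time}${millis}${offsetHours}:${offsetMinutes}`)
}

const instant = parsePgTimestamptz('2017-01-14 22:10:49.506002+00')
console.log(instant.toISOString()) // 2017-01-14T22:10:49.506Z
```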

Logical Replication

Restrictions of logical replication:

  • The database schema and DDL commands are not replicated.
  • Sequence data is not replicated.
  • TRUNCATE commands are not replicated (PostgreSQL 10; support was added in PostgreSQL 11).
  • Large objects are not replicated.
  • Replication is only possible from base tables to base tables (not views, materialized views, partition root tables, or foreign tables).

Post on how to use WAL replication on AWS to achieve a similar result

Resources

-- create_triggers.sql
CREATE TRIGGER person_notify AFTER INSERT OR UPDATE OR DELETE ON person
FOR EACH ROW EXECUTE PROCEDURE notify_trigger(
  'id',
  'email',
  'username'
);

CREATE TRIGGER income_notify AFTER INSERT OR UPDATE OR DELETE ON income
FOR EACH ROW EXECUTE PROCEDURE notify_trigger(
  'id',
  'person_id',
  'amount',
  'currency_code',
  'start_date',
  'end_date',
  'data_source'
);

// Listener (Node.js, node-postgres)
const { Client } = require('pg')

// pg.connect() was removed in node-postgres 7, so use a dedicated Client
const client = new Client({ connectionString: process.env.DATABASE_URL })

// Handle notifications
client.on('notification', function (msg) {
  const payload = msg.payload
  console.log(payload)
  // Send payload into a queue etc...
})

// Connect to the DB, then listen for NOTIFY calls
client.connect(function (err) {
  if (err) {
    console.error(err)
    return
  }
  client.query('LISTEN core_db_event')
})

-- notify_trigger.sql
CREATE OR REPLACE FUNCTION notify_trigger() RETURNS trigger AS $trigger$
DECLARE
  rec RECORD;
  payload TEXT;
  column_name TEXT;
  column_value TEXT;
  payload_items JSONB := '{}'::jsonb;
BEGIN
  -- Set record row depending on operation
  CASE TG_OP
  WHEN 'INSERT','UPDATE' THEN
     rec := NEW;
  WHEN 'DELETE' THEN
     rec := OLD;
  ELSE
     RAISE EXCEPTION 'Unknown TG_OP: "%". Should not occur!', TG_OP;
  END CASE;

  -- Get required fields
  IF TG_ARGV[0] IS NOT NULL THEN
    FOREACH column_name IN ARRAY TG_ARGV LOOP
      EXECUTE format('SELECT $1.%I::TEXT', column_name)
      INTO column_value
      USING rec;
      -- Add the column as a text value; jsonb_build_object handles quoting and NULLs
      -- (the jsonb || operator requires PostgreSQL 9.5 or later)
      payload_items := payload_items || jsonb_build_object(column_name, column_value);
    END LOOP;
  ELSE
    -- No column names given: send the whole row
    payload_items := row_to_json(rec)::jsonb;
  END IF;

  -- Build the payload
  payload := json_build_object(
    'timestamp', CURRENT_TIMESTAMP,
    'operation', TG_OP,
    'schema',    TG_TABLE_SCHEMA,
    'table',     TG_TABLE_NAME,
    'data',      payload_items
  );

  -- Notify the channel
  PERFORM pg_notify('core_db_event', payload);

  RETURN rec;
END;
$trigger$ LANGUAGE plpgsql;