Last active
August 30, 2024 19:17
-
-
Save carterbryden/e7b19d1ba1ea92e241fed259fea23491 to your computer and use it in GitHub Desktop.
Elixir Phoenix Postgresql migration to add triggers for pubsub to run on every CRUD operation on every table. If a new table is added, it'll automatically add a trigger to that table too.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule MyApp.Repo.Migrations.AddPostgresTriggerAndFunctionForAllTables do | |
use Ecto.Migration | |
def up do | |
# Create a function that broadcasts row changes | |
execute " | |
CREATE OR REPLACE FUNCTION broadcast_changes() | |
RETURNS trigger AS $$ | |
DECLARE | |
current_row RECORD; | |
BEGIN | |
IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN | |
current_row := NEW; | |
ELSE | |
current_row := OLD; | |
END IF; | |
IF (TG_OP = 'INSERT') THEN | |
OLD := NEW; | |
END IF; | |
IF (TG_OP = 'DELETE') THEN | |
NEW := OLD; | |
END IF; | |
PERFORM pg_notify( | |
'table_changes', | |
json_build_object( | |
'table', TG_TABLE_NAME, | |
'type', TG_OP, | |
'id', current_row.id, | |
'new_row_data', row_to_json(NEW), | |
'old_row_data', row_to_json(OLD) | |
)::text | |
); | |
RETURN current_row; | |
END; | |
$$ LANGUAGE plpgsql;" | |
# Create a trigger that links all of the tables to the broadcast function. Skip the migrations table. | |
execute "CREATE OR REPLACE FUNCTION create_notify_triggers() | |
RETURNS event_trigger | |
LANGUAGE plpgsql | |
AS $$ | |
DECLARE | |
r RECORD; | |
BEGIN | |
FOR r IN SELECT * | |
FROM information_schema.tables | |
where table_schema = 'public' | |
and table_name <> 'schema_migrations' | |
LOOP | |
RAISE NOTICE 'CREATE FOR: %', r.table_name::text; | |
EXECUTE 'DROP TRIGGER IF EXISTS notify_table_changes_trigger ON ' || r.table_name || ';'; | |
EXECUTE 'CREATE TRIGGER notify_table_changes_trigger | |
AFTER INSERT OR UPDATE OR DELETE | |
ON ' || r.table_name || ' | |
FOR EACH ROW | |
EXECUTE PROCEDURE broadcast_changes();'; | |
END LOOP; | |
END; | |
$$;" | |
#What if we add more tables later, after this is run? This adds a trigger to add the above triggers to any new tables as well. | |
execute "CREATE EVENT TRIGGER add_table_broadcast_triggers ON ddl_command_end | |
WHEN TAG IN ('CREATE TABLE','CREATE TABLE AS') | |
EXECUTE PROCEDURE create_notify_triggers();" | |
end | |
def down do | |
execute "DROP EVENT TRIGGER add_table_broadcast_triggers" | |
execute "FOR r IN SELECT * | |
FROM information_schema.tables | |
where table_schema = 'public' | |
and table_name <> 'schema_migrations' | |
LOOP | |
RAISE NOTICE 'CREATE FOR: %', r.table_name::text; | |
EXECUTE 'DROP TRIGGER IF EXISTS notify_table_changes_trigger ON ' || r.table_name || ';'; | |
END LOOP;" | |
end | |
end | |
#And the listener code (separate module anywhere you'd like | |
#This module namespace and name can be whatever you want | |
defmodule MyApp.DatabaseListener.Listener do | |
use GenServer | |
require Logger | |
import Poison, only: [decode!: 1] | |
@doc """ | |
Initialize the GenServer | |
""" | |
@spec start_link([String.t], [any]) :: {:ok, pid} | |
def start_link(channel, otp_opts \\ []), do: GenServer.start_link(__MODULE__, channel, otp_opts) | |
@doc """ | |
When the GenServer starts subscribe to the given channel | |
""" | |
@spec init([String.t]) :: {:ok, []} | |
def init(channel) do | |
Logger.debug("Starting #{ __MODULE__ } with channel subscription: #{channel}") | |
pg_config = MyApp.Repo.config() | |
{:ok, pid} = Postgrex.Notifications.start_link(pg_config) | |
{:ok, ref} = Postgrex.Notifications.listen(pid, channel) | |
{:ok, {pid, channel, ref}} | |
end | |
@doc """ | |
Listen for changes | |
""" | |
def handle_info({:notification, _pid, _ref, "table_changes", payload}, _state) do | |
change = payload | |
|> decode!() | |
#change will decode json into a list with: | |
# type - what crud operation it is | |
# table - what table it was done on | |
# id - the ID of the row, but will also be in the old and new row data | |
# new_row_data - the new data either inserted or updated on the row, or nil in case of delete | |
# old_row_data - the old data that use to be on the row, or nil in case of insert | |
# You can get these with change["type"] for instance and do whatever you want with them below this line | |
{:noreply, :event_handled} | |
end | |
def handle_info(_, _state), do: {:noreply, :event_received} | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment