Last active
November 17, 2024 19:00
-
-
Save marcocitus/cd3e52ea897de2200f103adb6b511197 to your computer and use it in GitHub Desktop.
Prototype for pub/sub on PostgreSQL 10 with Citus 7
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* commands to run on the coordinator */
CREATE EXTENSION citus;

/* register the data nodes */
SELECT master_add_node('10.0.0.2', 5432);
SELECT master_add_node('10.0.0.3', 5432);

/*
 * Sync the Citus metadata to the nodes: obtain_leases runs on the data nodes
 * and reads pg_dist_node / pg_dist_placement / pg_dist_shard there.
 */
SELECT start_metadata_sync_to_node(nodename, nodeport) FROM pg_dist_node;

/*
 * Use the streaming replication model for tables distributed in this session
 * (the model Citus metadata sync expects — confirm against the Citus version).
 */
SET citus.replication_model TO 'streaming';

/* the event log, sharded by event_id */
CREATE TABLE events (
    event_id bigserial primary key,
    ingest_time timestamptz default now(),
    topic_name text not null,
    payload jsonb
);
SELECT create_distributed_table('events', 'event_id');
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* commands to run on the data nodes */
CREATE EXTENSION IF NOT EXISTS citus;
CREATE EXTENSION IF NOT EXISTS dblink;

/*** LEASES ***/
/*
 * Leases on the shards stored on this node, one row per (consumer group, shard).
 *   owner          - consumer that currently receives the shard's events
 *   new_owner      - consumer that asked to take the shard over (pending steal)
 *   last_heartbeat - refreshed when the owner polls or a claim is recorded
 */
CREATE TABLE leases (
    consumer_group text   NOT NULL,
    shard_id       bigint NOT NULL,
    owner          text,
    new_owner      text,
    last_heartbeat timestamptz,
    CONSTRAINT leases_pkey PRIMARY KEY (consumer_group, shard_id)
);
/*
 * Fetch the leases of one consumer group from the node node_name:node_port
 * over dblink. Any error (e.g. an unreachable node) is reported as a WARNING
 * and yields an empty result, so one failing node does not abort the caller.
 */
CREATE OR REPLACE FUNCTION remote_get_leases(
    node_name text,
    node_port int,
    group_name text)
RETURNS SETOF leases LANGUAGE plpgsql AS $function$
DECLARE
    conn_str   text;
    remote_sql text;
BEGIN
    conn_str   := format('host=%s port=%s dbname=%s', node_name, node_port, current_database());
    remote_sql := format('SELECT leases FROM leases WHERE consumer_group = %L', group_name);

    /* the remote side returns whole rows; expand them into leases columns */
    RETURN QUERY
        SELECT (remote.lease_row).*
        FROM dblink(conn_str, remote_sql) AS remote(lease_row leases);
EXCEPTION WHEN others THEN
    RAISE WARNING '%:% %', node_name, node_port, SQLERRM;
END;
$function$;
/*
 * Claim the lease on one shard stored on this node for source_node.
 * Runs on the node holding the shard (remote_claim_lease calls it over dblink).
 *
 * - No lease yet: source_node becomes the owner.
 * - Lease exists without a pending new_owner: source_node is recorded as
 *   new_owner (a steal request handed over by the owner's next poll_events).
 * - Lease exists and already has a pending new_owner: the claim is refused.
 *
 * Returns true when the lease row was inserted or updated, false when the
 * claim was refused.
 */
CREATE OR REPLACE FUNCTION claim_lease(
    group_name text,
    source_node text,
    claimed_shard bigint)
RETURNS bool LANGUAGE plpgsql AS $function$
DECLARE
    shard_slot text := format('%s_%s', group_name, claimed_shard);
BEGIN
    /*
     * Create the logical replication slot before any writes: PostgreSQL refuses
     * to create a logical slot in a transaction that has already written.
     */
    IF NOT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = shard_slot) THEN
        PERFORM pg_create_logical_replication_slot(shard_slot, 'test_decoding');
    END IF;

    INSERT INTO leases (consumer_group, shard_id, owner, last_heartbeat)
    VALUES (group_name, claimed_shard, source_node, now())
    ON CONFLICT (consumer_group, shard_id) DO UPDATE
    SET new_owner = source_node, last_heartbeat = now()
    WHERE leases.new_owner IS NULL;

    /* FOUND is false when the ON CONFLICT ... WHERE condition skipped the update */
    RETURN FOUND;
END;
$function$;
/*
 * Run claim_lease on the node node_name:node_port over dblink and return its
 * result. Any error is reported as a WARNING and yields an empty result.
 */
CREATE OR REPLACE FUNCTION remote_claim_lease(
    node_name text,
    node_port int,
    group_name text,
    source_node text,
    shard_id bigint)
RETURNS SETOF bool LANGUAGE plpgsql AS $function$
DECLARE
    conn_str   text;
    remote_sql text;
BEGIN
    conn_str   := format('host=%s port=%s dbname=%s', node_name, node_port, current_database());
    remote_sql := format('SELECT claim_lease(%L,%L,%s)', group_name, source_node, shard_id);

    RETURN QUERY
        SELECT reply.claimed
        FROM dblink(conn_str, remote_sql) AS reply(claimed bool);
EXCEPTION WHEN others THEN
    RAISE WARNING '%:% %', node_name, node_port, SQLERRM;
END;
$function$;
/*
 * Distributed algorithm for dividing the shards of "events" among the
 * consumers of a consumer group. Each consumer calls it periodically. It:
 *   1. reads the current leases from every node holding a shard,
 *   2. claims every shard without an owner,
 *   3. steals randomly chosen shards from other consumers until it holds
 *      about #shards / #consumers,
 * and returns the placements (node, port, shard) the caller should poll.
 * A shard is returned only if the caller holds it or its claim was accepted.
 * A steal only records the caller as new_owner; the current owner hands
 * the shard over on its next poll_events.
 */
CREATE OR REPLACE FUNCTION obtain_leases(
    group_name text,
    source_node text,
    OUT node_name text,
    OUT node_port int,
    OUT shard_id bigint)
RETURNS SETOF record LANGUAGE sql AS $function$
/* primary placements of all shards of events */
WITH shard_placements AS (
    SELECT n.nodename, n.nodeport, shardid
    FROM pg_dist_node n
    JOIN pg_dist_placement p USING (groupid)
    JOIN pg_dist_shard s USING (shardid)
    WHERE s.logicalrelid = 'events'::regclass AND n.noderole = 'primary'
),
/* fetch the leases once per node (one dblink round trip per node, not per shard) */
node_leases AS (
    SELECT nodes.nodename, nodes.nodeport, l.shard_id, l.owner, l.new_owner
    FROM (SELECT DISTINCT nodename, nodeport FROM shard_placements) AS nodes
    CROSS JOIN LATERAL remote_get_leases(nodes.nodename, nodes.nodeport, group_name) AS l
),
/* find the current owners of all shards (randomly ordered) */
shard_owners AS (
    SELECT sp.nodename, sp.nodeport, sp.shardid, nl.owner, nl.new_owner
    FROM shard_placements sp
    LEFT JOIN node_leases nl
        ON nl.nodename = sp.nodename AND nl.nodeport = sp.nodeport AND nl.shard_id = sp.shardid
    ORDER BY random()
),
/* claim all unclaimed shards; claimed is false when the node refused the claim */
claimed_shards AS (
    SELECT nodename, nodeport, shardid,
           remote_claim_lease(nodename, nodeport, group_name, source_node, shardid) AS claimed
    FROM shard_owners
    WHERE owner IS NULL
),
/* determine total number of consumers */
consumer_count AS (
    SELECT count(DISTINCT node)
    FROM (SELECT owner AS node FROM shard_owners WHERE owner IS NOT NULL
          UNION ALL
          SELECT new_owner AS node FROM shard_owners WHERE new_owner IS NOT NULL
          UNION ALL
          SELECT source_node) all_consumers
),
/* determine minimum number of shards to claim */
min_claimable_count AS (
    SELECT count(*) / (SELECT * FROM consumer_count)
    FROM pg_dist_shard
    WHERE logicalrelid = 'events'::regclass
),
/* determine number of shards to steal (#shards/#consumers - #claimed - #held) */
num_shards_remaining AS (
    SELECT
        (SELECT * FROM min_claimable_count) -
        (SELECT count(*) FROM claimed_shards WHERE claimed) -
        count(*) AS num_shards_remaining
    FROM shard_owners
    WHERE owner = source_node OR new_owner = source_node
),
/* find stealable shards (randomly ordered) */
stealable_shards AS (
    SELECT row_number() OVER () AS row_number, nodename, nodeport, shardid
    FROM shard_owners
    WHERE owner IS NOT NULL AND owner != source_node AND (new_owner IS NULL OR new_owner != source_node)
),
/* steal shards; claimed is false when another steal request is already pending */
stolen_shards AS (
    SELECT nodename, nodeport, shardid,
           remote_claim_lease(nodename, nodeport, group_name, source_node, shardid) AS claimed
    FROM stealable_shards
    WHERE row_number <= (SELECT * FROM num_shards_remaining)
)
SELECT nodename, nodeport, shardid FROM claimed_shards WHERE claimed
UNION
SELECT nodename, nodeport, shardid FROM stolen_shards WHERE claimed
UNION
SELECT nodename, nodeport, shardid FROM shard_owners
WHERE owner = source_node OR new_owner = source_node;
$function$;
/*** EVENT PARSING (replace with PL/Python?) ***/
/*
 * Split one test_decoding line of the form
 *     table <schema>.<table>: <COMMAND>: <tuple data>
 * into its parts. Identifiers are returned as written (quoted identifiers keep
 * their double quotes). The result is NULL when the message does not match.
 */
CREATE OR REPLACE FUNCTION parse_table_message(
    message text,
    OUT schema_name text,
    OUT table_name text,
    OUT command text,
    OUT tuple_data text)
RETURNS record LANGUAGE sql AS $function$
    /* groups 2 and 4 only capture the inside of quoted identifiers; skip them */
    SELECT parts[1] AS schema_name,
           parts[3] AS table_name,
           parts[5] AS command,
           parts[6] AS tuple_data
    FROM regexp_matches(
             message,
             'table ([a-z_][a-z0-9_$]*|"(""|[^"])*")\.([a-z_][a-z0-9_$]*|"(""|[^"])*"): ([A-Z]+): (.*)'
         ) AS parts;
$function$;
/*
 * Split test_decoding tuple data ("col[type]:value col[type]:value ...") into
 * one row per column. Values are returned exactly as printed, so quoted
 * literals keep their single quotes and NULLs come back as the text null.
 */
CREATE OR REPLACE FUNCTION parse_tuple_data(
    tuple_data text,
    OUT column_name text,
    OUT column_type text,
    OUT value text)
RETURNS SETOF record LANGUAGE sql AS $function$
    /* groups 2 and 5 only capture the inside of quoted names/values; skip them */
    SELECT hit[1] AS column_name,
           hit[3] AS column_type,
           hit[4] AS value
    FROM regexp_matches(
             tuple_data,
             $$([a-z_][a-z0-9_$]*|"(""|[^"])*")\[([a-z0-9 ]+)\]:('(''|[^'])*'|[^ ]*)$$,
             'g'
         ) AS hit;
$function$;
/*
 * Convert test_decoding tuple data into the text form of a record,
 * e.g. '(1,"2017-...",topic,"{...}")', which callers cast to a composite type
 * (poll_events casts it to events).
 *
 * Works by executing "SELECT value::type AS column, ..." built from the parsed
 * tuple. The text is executed as SQL, so only pass test_decoding output here,
 * never untrusted input.
 *
 * Returns NULL when tuple_data contains no parsable columns.
 */
CREATE OR REPLACE FUNCTION tuple_data_to_record(
    tuple_data text)
RETURNS text LANGUAGE plpgsql AS $function$
DECLARE
    select_query text;
    result record;
BEGIN
    /*
     * Keep the columns in the order test_decoding printed them: casting the
     * result to a composite type is positional.
     */
    SELECT 'SELECT ' || string_agg(t.value || '::' || t.column_type || ' AS ' || t.column_name, ', ' ORDER BY t.ord)
    INTO select_query
    FROM parse_tuple_data(tuple_data) WITH ORDINALITY AS t(column_name, column_type, value, ord);

    /* no columns: string_agg yields NULL, and EXECUTE of a NULL query raises */
    IF select_query IS NULL THEN
        RETURN NULL;
    END IF;

    EXECUTE select_query INTO result;
    RETURN result;
END;
$function$;
/*** EVENT POLLING ***/
/*
 * Poll one shard for new events on behalf of consumer source_node.
 * Runs on the node holding the shard. It:
 *   1. refreshes the caller's lease heartbeat, or hands the lease over to the
 *      pending new_owner if another consumer requested it;
 *   2. peeks (without consuming) at up to 5000 changes of the logical slot
 *      "<group_name>_<consumed_shard>";
 *   3. if last_applied_event_id is given and found, consumes the slot up to
 *      that event, since it was delivered by an earlier poll;
 *   4. returns up to 100 later inserted events whose topic_name matches the
 *      LIKE pattern topic_subscription, oldest first.
 * Nothing is returned unless source_node still owns the lease afterwards.
 *
 * NOTE(review): pg_logical_slot_get_changes stops at transaction boundaries,
 * so it may consume changes past the last applied event when that event is
 * not the last change of its transaction — confirm this delivery gap is OK.
 * NOTE(review): the "applied" CTE is a plain SELECT CTE; if the planner never
 * scans it (e.g. no unapplied inserts on the outer side of the join), the
 * slot is not advanced this round — verify with EXPLAIN.
 */
CREATE OR REPLACE FUNCTION poll_events(
    group_name text,
    source_node text,
    consumed_shard bigint,
    topic_subscription text default '%',
    last_applied_event_id bigint default NULL,
    OUT event_id bigint,
    OUT ingest_time timestamptz,
    OUT topic_name text,
    OUT payload jsonb)
RETURNS SETOF record LANGUAGE sql AS $function$
WITH
/* determine current lease, relinquish it if claimed by another consumer */
ownership AS (
    UPDATE leases
    SET owner = Coalesce(new_owner, owner), new_owner = NULL, last_heartbeat = now()
    WHERE consumer_group = group_name AND shard_id = consumed_shard AND owner = source_node
    RETURNING owner = source_node AS current_owner
),
/* peek at latest changes (use changes.location instead of changes.lsn in postgres 9.6) */
changes AS (
    SELECT row_number() OVER () AS row_number, changes.lsn AS lsn, changes.data
    FROM pg_logical_slot_peek_changes(group_name||'_'||consumed_shard, NULL, 5000, 'skip-empty-xacts', 'on') changes
),
/* extract records of this shard from changes */
records AS (
    SELECT lsn, row_number, (parse_table_message(data)).*
    FROM changes
    WHERE data LIKE format('table public.events_%s: %%', consumed_shard)
),
/*
 * extract new events from records, in replication order
 * NOTE(review): (f(x)).* evaluates f once per output column; consider a
 * LATERAL subquery if tuple_data_to_record becomes a bottleneck
 */
inserts AS (
    SELECT lsn, row_number, lead(row_number) OVER (ORDER BY row_number) AS next_row_number, (tuple_data_to_record(tuple_data)::events).*
    FROM records
    WHERE command = 'INSERT' AND table_name = 'events_'||consumed_shard
),
/* inserts whose topic matches the subscription pattern */
topic_inserts AS (
    SELECT * FROM inserts WHERE topic_name LIKE topic_subscription
),
/* find the row number of the last applied insert, if it's the last one then use the last change row number */
last_applied_insert AS (
    SELECT CASE WHEN next_row_number IS NULL THEN last_row_number ELSE row_number END AS row_number
    FROM (SELECT max(row_number) AS last_row_number FROM changes) last_row_number LEFT JOIN topic_inserts ON (true)
    WHERE last_applied_event_id IS NOT NULL AND (event_id IS NULL OR event_id = last_applied_event_id)
    LIMIT 1
),
/* if applicable, skip all changes up to the last applied event */
applied AS (
    SELECT CASE WHEN (SELECT current_owner FROM ownership, last_applied_insert)
                THEN (SELECT count(*) FROM (
                        SELECT pg_logical_slot_get_changes(group_name||'_'||consumed_shard, NULL, row_number::int, 'skip-empty-xacts', 'on')
                        FROM last_applied_insert) applied_inserts) > 0
                ELSE false
           END
),
/* get only the unapplied inserts, oldest first, so the last row returned is the newest */
unapplied_inserts AS (
    SELECT row_number, event_id, ingest_time, topic_name, payload
    FROM ownership, topic_inserts
    WHERE row_number > Coalesce((SELECT row_number FROM last_applied_insert), 0)
    AND current_owner
    ORDER BY row_number
    LIMIT 100
)
/* consumers record the last returned event as applied, so keep the order */
SELECT u.event_id, u.ingest_time, u.topic_name, u.payload
FROM applied, unapplied_inserts u
ORDER BY u.row_number;
$function$;
/*
 * Expire leases whose last heartbeat is more than two minutes old:
 * leases without a pending new_owner are dropped, and leases with one are
 * handed over to that new_owner with a fresh heartbeat.
 */
CREATE OR REPLACE FUNCTION expire_leases()
RETURNS void LANGUAGE sql AS $function$
    DELETE FROM leases AS l
    WHERE l.new_owner IS NULL
      AND l.last_heartbeat < now() - interval '2 minutes';

    UPDATE leases AS l
    SET owner          = l.new_owner,
        new_owner      = NULL,
        last_heartbeat = now()
    WHERE l.new_owner IS NOT NULL
      AND l.last_heartbeat < now() - interval '2 minute';
$function$;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* commands to run on a consumer node */
CREATE EXTENSION IF NOT EXISTS dblink;

/*
 * Shards this consumer currently leases, where they live, and the id of the
 * last event consumed from each (updated by consume_events).
 */
CREATE TABLE my_leases (
    consumer_group      text   NOT NULL,
    shard_id            bigint NOT NULL,
    node_name           text   NOT NULL,
    node_port           int    NOT NULL,
    last_consumed_event bigint,
    CONSTRAINT my_leases_pkey PRIMARY KEY (consumer_group, shard_id)
);

/* row type of the remote events table, used to decode dblink results */
CREATE TYPE events AS (
    event_id    bigint,
    ingest_time timestamptz,
    topic_name  text,
    payload     jsonb
);
/*
 * Ask the pub/sub cluster (pubsub_url is a dblink connection string) which
 * shards source_node should consume for group_name, and store them in
 * my_leases. Returns the number of leases now held.
 *
 * Leases that are obtained again keep their last_consumed_event (as long as
 * the shard stays on the same node). The next poll can then acknowledge the
 * events already delivered instead of delivering them again.
 */
CREATE OR REPLACE FUNCTION remote_obtain_leases(
    group_name text,
    source_node text,
    pubsub_url text)
RETURNS bigint LANGUAGE plpgsql AS $function$
DECLARE
    num_new_leases bigint;
BEGIN
    WITH obtained AS (
        SELECT res.node_name, res.node_port, res.shard_id
        FROM dblink(pubsub_url,
                    format('SELECT * FROM obtain_leases(%L,%L)', group_name, source_node))
             AS res(node_name text, node_port int, shard_id bigint)
    ),
    /* drop leases that were not obtained again */
    dropped AS (
        DELETE FROM my_leases m
        WHERE m.consumer_group = group_name
          AND NOT EXISTS (SELECT 1 FROM obtained o WHERE o.shard_id = m.shard_id)
    ),
    /* add new leases and refresh the location of the ones still held */
    new_leases AS (
        INSERT INTO my_leases AS m (consumer_group, shard_id, node_name, node_port)
        SELECT group_name, o.shard_id, o.node_name, o.node_port
        FROM obtained o
        ON CONFLICT (consumer_group, shard_id) DO UPDATE
        SET node_name = EXCLUDED.node_name,
            node_port = EXCLUDED.node_port,
            /* progress refers to the replication slot on the old node; reset it on a move */
            last_consumed_event = CASE
                WHEN (m.node_name, m.node_port) = (EXCLUDED.node_name, EXCLUDED.node_port)
                THEN m.last_consumed_event
            END
        RETURNING 1
    )
    SELECT count(*) INTO num_new_leases FROM new_leases;
    RETURN num_new_leases;
END;
$function$;
/*
 * Name of the dblink connection used for one lease:
 *     <group_name>_<hashtext(node_name)>_<node_port>_<shard_id>
 * NULL arguments render as empty strings.
 */
CREATE OR REPLACE FUNCTION connection_name(
    group_name text,
    node_name text,
    node_port int,
    shard_id bigint)
RETURNS text LANGUAGE sql AS $function$
    SELECT format('%1$s_%2$s_%3$s_%4$s',
                  group_name,
                  hashtext(node_name),
                  node_port,
                  shard_id) AS conn_name
$function$;
/*
 * Open a named dblink connection (named by connection_name) for every lease
 * of group_name that does not have one open yet. Returns the number of
 * connections opened by this call.
 */
CREATE OR REPLACE FUNCTION connect_to_shards(
    group_name text)
RETURNS bigint LANGUAGE sql AS $function$
WITH wanted AS (
    SELECT connection_name(ml.consumer_group, ml.node_name, ml.node_port, ml.shard_id) AS connection_name,
           ml.node_name,
           ml.node_port
    FROM my_leases ml
    WHERE ml.consumer_group = group_name
),
already_open AS (
    SELECT conn AS connection_name
    FROM unnest(dblink_get_connections()) AS conn
),
missing AS (
    SELECT w.*
    FROM wanted w
    WHERE NOT EXISTS (SELECT 1 FROM already_open o WHERE o.connection_name = w.connection_name)
)
SELECT count(*)
FROM missing m
CROSS JOIN LATERAL dblink_connect(m.connection_name,
                                  format('host=%s port=%s dbname=%s', m.node_name, m.node_port, current_database())) AS opened
$function$;
/*
 * Collect the result of the poll_events query previously sent (with
 * dblink_send_query) on one lease's connection. Remember the last event id in
 * my_leases and return the events. Returns nothing when no events arrived.
 */
CREATE OR REPLACE FUNCTION consume_events(
    group_name text,
    source_node text,
    consume_shard_id bigint,
    last_consumed_event_id bigint,
    node_name text,
    node_port int)
RETURNS SETOF events LANGUAGE sql AS $function$
WITH received AS (
    SELECT (r.e).*
    FROM dblink_get_result(connection_name(group_name, node_name, node_port, consume_shard_id), false)
         AS r(e events)
),
/* every row carries the id of the last received event */
newest AS (
    SELECT last_value(received.event_id) OVER () AS event_id
    FROM received
),
progress AS (
    UPDATE my_leases
    SET last_consumed_event = newest.event_id
    FROM newest
    WHERE consumer_group = group_name AND shard_id = consume_shard_id
    RETURNING 1
)
SELECT received.*
FROM received
CROSS JOIN progress;
$function$;
/*
 * Poll every shard leased by group_name and return the received events.
 * poll_events is first sent to all shards asynchronously over dblink, so the
 * shards are polled in parallel; the results are collected afterwards.
 */
CREATE OR REPLACE FUNCTION consume_events(
    group_name text,
    source_node text)
RETURNS SETOF events LANGUAGE plpgsql AS $function$
DECLARE
    lease record;
BEGIN
    PERFORM connect_to_shards(group_name);

    /* send one asynchronous poll_events query per lease */
    FOR lease IN
        SELECT * FROM my_leases WHERE consumer_group = group_name
    LOOP
        PERFORM dblink_send_query(
            connection_name(group_name, lease.node_name, lease.node_port, lease.shard_id),
            format('SELECT e FROM poll_events(%L,%L,%s,''%%'',%L) e',
                   group_name, source_node, lease.shard_id, lease.last_consumed_event));
    END LOOP;

    /* collect the results */
    RETURN QUERY
        SELECT ev.*
        FROM my_leases ml,
             consume_events(group_name, source_node, ml.shard_id, ml.last_consumed_event, ml.node_name, ml.node_port) ev
        WHERE ml.consumer_group = group_name;

    /* dblink needs one more dblink_get_result per connection to finish the query */
    FOR lease IN
        SELECT * FROM my_leases WHERE consumer_group = group_name
    LOOP
        PERFORM dblink_get_result(connection_name(group_name, lease.node_name, lease.node_port, lease.shard_id), false);
    END LOOP;
END;
$function$;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
How many reads/writes per second can you do with such a messaging system?