Created
December 27, 2017 21:33
-
-
Save ehmo/fc4f76011ad1504ea5f95b698e1016d5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Runs a command once per set of three co-located shard placements.
--
-- Parameters:
--   table_name1..3  the three distributed tables whose shards are paired up.
--   command         a format() template; it is passed to format() with the
--                   shard names of table_name1, table_name2 and table_name3
--                   as arguments 1, 2 and 3 (e.g. %1$s, %2$s, %3$s or three %s).
--   parallel        forwarded unchanged to master_run_on_worker.
--
-- Returns one row per command executed: the node it was sent to, the three
-- shard ids that were substituted into the command, and the success flag and
-- result text reported by master_run_on_worker.
--
-- Raises an exception when any pair of the three tables is reported as not
-- co-located by citus_tables_colocated.
CREATE OR REPLACE FUNCTION citus_run_on_3_colocated_placements(
    table_name1 regclass,
    table_name2 regclass,
    table_name3 regclass,
    command text,
    parallel bool default true,
    OUT nodename text,
    OUT nodeport int,
    OUT shardid1 bigint,
    OUT shardid2 bigint,
    OUT shardid3 bigint,
    OUT success bool,
    OUT result text)
RETURNS SETOF record
LANGUAGE plpgsql
AS $function$
DECLARE
    workers text[];
    ports int[];
    shards1 bigint[];
    shards2 bigint[];
    shards3 bigint[];
    commands text[];
BEGIN
    -- All three pairs must be co-located before shards can be matched up.
    IF NOT (SELECT citus_tables_colocated(table_name1, table_name2)) THEN
        RAISE EXCEPTION 'tables % and % are not co-located', table_name1, table_name2;
    END IF;
    IF NOT (SELECT citus_tables_colocated(table_name2, table_name3)) THEN
        RAISE EXCEPTION 'tables % and % are not co-located', table_name2, table_name3;
    END IF;
    IF NOT (SELECT citus_tables_colocated(table_name1, table_name3)) THEN
        RAISE EXCEPTION 'tables % and % are not co-located', table_name1, table_name3;
    END IF;

    WITH active_shard_placements AS (
        -- Placements with shardstate = 1 for any of the three tables.
        -- NOTE(review): shardstate = 1 is presumably the "active" state, as
        -- the CTE name suggests; confirm against the Citus metadata docs.
        SELECT
            ds.logicalrelid,
            ds.shardid AS shardid,
            shard_name(ds.logicalrelid, ds.shardid) AS shardname,
            ds.shardminvalue AS shardminvalue,
            ds.shardmaxvalue AS shardmaxvalue,
            dsp.nodename AS nodename,
            dsp.nodeport::int AS nodeport
        FROM pg_dist_shard AS ds
        INNER JOIN pg_dist_shard_placement AS dsp USING (shardid)
        WHERE dsp.shardstate = 1
          AND (ds.logicalrelid::regclass = table_name1
               OR ds.logicalrelid::regclass = table_name2
               OR ds.logicalrelid::regclass = table_name3)
    ),
    citus_colocated_placements AS (
        -- One row per (table1 shard, table2 shard, table3 shard) triple that
        -- covers the same hash range and lives on the same worker node.
        SELECT
            a.shardid AS shardid1,
            a.shardname AS shardname1,
            b.shardid AS shardid2,
            b.shardname AS shardname2,
            c.shardid AS shardid3,
            c.shardname AS shardname3,
            a.nodename AS nodename,
            a.nodeport AS nodeport
        FROM active_shard_placements AS a
        INNER JOIN active_shard_placements AS b
            ON b.shardminvalue = a.shardminvalue
           AND b.shardmaxvalue = a.shardmaxvalue
           AND b.nodename = a.nodename
           AND b.nodeport = a.nodeport
           AND b.logicalrelid != a.logicalrelid
        INNER JOIN active_shard_placements AS c
            ON c.shardminvalue = a.shardminvalue
           AND c.shardmaxvalue = a.shardmaxvalue
           -- Both host and port identify a node; matching on port alone
           -- pairs shards that live on different workers.
           AND c.nodename = a.nodename
           AND c.nodeport = a.nodeport
           AND c.logicalrelid != a.logicalrelid
        WHERE a.logicalrelid::regclass = table_name1
          AND b.logicalrelid::regclass = table_name2
          AND c.logicalrelid::regclass = table_name3
        ORDER BY a.logicalrelid, a.shardid, a.nodename, a.nodeport
    )
    SELECT
        array_agg(cp.nodename),
        array_agg(cp.nodeport),
        array_agg(cp.shardid1),
        array_agg(cp.shardid2),
        array_agg(cp.shardid3),
        array_agg(format(command, cp.shardname1, cp.shardname2, cp.shardname3))
    INTO workers, ports, shards1, shards2, shards3, commands
    FROM citus_colocated_placements AS cp;

    -- The shard id arrays are index-aligned with the commands array, so the
    -- ordinality of each result row looks up the shard ids it belongs to.
    RETURN QUERY
    SELECT
        r.node_name,
        r.node_port,
        shards1[ordinality],
        shards2[ordinality],
        shards3[ordinality],
        r.success,
        r.result
    FROM master_run_on_worker(workers, ports, commands, parallel) WITH ORDINALITY r;
END;
$function$;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Runs a command once per set of three co-located shard placements.
--
-- Parameters:
--   table_name1..3  the three distributed tables whose shards are paired up.
--   command         a format() template; it is passed to format() with the
--                   shard names of table_name1, table_name2 and table_name3
--                   as arguments 1, 2 and 3 (e.g. %1$s, %2$s, %3$s or three %s).
--   parallel        forwarded unchanged to master_run_on_worker.
--
-- Returns one row per command executed: the node it was sent to, the three
-- shard ids that were substituted into the command, and the success flag and
-- result text reported by master_run_on_worker.
--
-- Raises an exception when any pair of the three tables is reported as not
-- co-located by distributed_tables_colocated.
CREATE OR REPLACE FUNCTION pg_catalog.run_command_on_3_colocated_placements(
    table_name1 regclass,
    table_name2 regclass,
    table_name3 regclass,
    command text,
    parallel bool default true,
    OUT nodename text,
    OUT nodeport int,
    OUT shardid1 bigint,
    OUT shardid2 bigint,
    OUT shardid3 bigint,
    OUT success bool,
    OUT result text)
RETURNS SETOF record
LANGUAGE plpgsql
AS $function$
DECLARE
    workers text[];
    ports int[];
    shards1 bigint[];
    shards2 bigint[];
    shards3 bigint[];
    commands text[];
BEGIN
    -- All three pairs must be co-located before shards can be matched up.
    IF NOT (SELECT distributed_tables_colocated(table_name1, table_name2)) THEN
        RAISE EXCEPTION 'tables % and % are not co-located', table_name1, table_name2;
    END IF;
    IF NOT (SELECT distributed_tables_colocated(table_name2, table_name3)) THEN
        RAISE EXCEPTION 'tables % and % are not co-located', table_name2, table_name3;
    END IF;
    IF NOT (SELECT distributed_tables_colocated(table_name1, table_name3)) THEN
        RAISE EXCEPTION 'tables % and % are not co-located', table_name1, table_name3;
    END IF;

    WITH active_shard_placements AS (
        -- Placements with shardstate = 1 for any of the three tables.
        -- NOTE(review): shardstate = 1 is presumably the "active" state, as
        -- the CTE name suggests; confirm against the Citus metadata docs.
        SELECT
            ds.logicalrelid,
            ds.shardid AS shardid,
            shard_name(ds.logicalrelid, ds.shardid) AS shardname,
            ds.shardminvalue AS shardminvalue,
            ds.shardmaxvalue AS shardmaxvalue,
            dsp.nodename AS nodename,
            dsp.nodeport::int AS nodeport
        FROM pg_dist_shard AS ds
        INNER JOIN pg_dist_shard_placement AS dsp USING (shardid)
        WHERE dsp.shardstate = 1
          AND (ds.logicalrelid::regclass = table_name1
               OR ds.logicalrelid::regclass = table_name2
               OR ds.logicalrelid::regclass = table_name3)
    ),
    citus_colocated_placements AS (
        -- One row per (table1 shard, table2 shard, table3 shard) triple that
        -- covers the same hash range and lives on the same worker node.
        SELECT
            a.shardid AS shardid1,
            a.shardname AS shardname1,
            b.shardid AS shardid2,
            b.shardname AS shardname2,
            c.shardid AS shardid3,
            c.shardname AS shardname3,
            a.nodename AS nodename,
            a.nodeport AS nodeport
        FROM active_shard_placements AS a
        INNER JOIN active_shard_placements AS b
            ON b.shardminvalue = a.shardminvalue
           AND b.shardmaxvalue = a.shardmaxvalue
           AND b.nodename = a.nodename
           AND b.nodeport = a.nodeport
           AND b.logicalrelid != a.logicalrelid
        INNER JOIN active_shard_placements AS c
            ON c.shardminvalue = a.shardminvalue
           AND c.shardmaxvalue = a.shardmaxvalue
           -- Both host and port identify a node; matching on port alone
           -- pairs shards that live on different workers.
           AND c.nodename = a.nodename
           AND c.nodeport = a.nodeport
           AND c.logicalrelid != a.logicalrelid
        WHERE a.logicalrelid::regclass = table_name1
          AND b.logicalrelid::regclass = table_name2
          AND c.logicalrelid::regclass = table_name3
        ORDER BY a.logicalrelid, a.shardid, a.nodename, a.nodeport
    )
    SELECT
        array_agg(cp.nodename),
        array_agg(cp.nodeport),
        array_agg(cp.shardid1),
        array_agg(cp.shardid2),
        array_agg(cp.shardid3),
        array_agg(format(command, cp.shardname1, cp.shardname2, cp.shardname3))
    INTO workers, ports, shards1, shards2, shards3, commands
    FROM citus_colocated_placements AS cp;

    -- The shard id arrays are index-aligned with the commands array, so the
    -- ordinality of each result row looks up the shard ids it belongs to.
    RETURN QUERY
    SELECT
        r.node_name,
        r.node_port,
        shards1[ordinality],
        shards2[ordinality],
        shards3[ordinality],
        r.success,
        r.result
    FROM master_run_on_worker(workers, ports, commands, parallel) WITH ORDINALITY r;
END;
$function$;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment