-- Gist marcocitus/6696e64b12494c9c438308610b5ffbea (last active November 14, 2019 18:27)
-- Efficient real-time rollups with backfilling in Citus
-- Note: GitHub flags this file as containing bidirectional Unicode text that may be
-- interpreted or compiled differently than it appears; review it in an editor that
-- reveals hidden Unicode characters.
-- Bookkeeping for each rollup: the last generation that has already been rolled up.
-- A rollup's name must equal the name of the sequence that numbers its generations,
-- because the rollup functions pass it straight to nextval()/pg_sequence_last_value().
CREATE TABLE rollups (
    -- One row per rollup; safe_rollup_window() finds and row-locks it by name,
    -- so duplicates must be impossible.
    name text PRIMARY KEY,
    -- -1 means nothing has been rolled up yet, so the first window starts at 0.
    rolled_up_generation bigint NOT NULL DEFAULT -1
);
-- Install a placeholder current_rollup_generation(text) on every worker.
-- Its only purpose is to let distributed tables declare a column default that
-- references the function; the placeholder simply returns 1. The real,
-- lock-taking implementation is created separately on the coordinator.
SELECT run_command_on_workers($cmd$
    CREATE OR REPLACE FUNCTION current_rollup_generation(rollup_name text)
    RETURNS bigint
    LANGUAGE sql
    AS $stub$ SELECT 1::bigint $stub$
$cmd$);
-- Return the rollup's current generation, holding a transaction-level shared
-- advisory lock on it.
--
-- rollup_name: name of the sequence that numbers the rollup's generations.
-- Returns 0 until the sequence has been advanced for the first time.
--
-- safe_rollup_window() takes exclusive advisory locks on every generation it
-- rolls up. The shared lock taken here therefore makes a rollup wait until
-- this transaction ends.
--
-- The sequence is re-read after each lock is granted. Between the first read
-- and the lock, a rollup may have advanced the sequence and already rolled up
-- the generation we read. Rows tagged with that generation would then never be
-- rolled up. Looping until the value is unchanged after locking guarantees the
-- returned generation was still current while we held its lock. Any extra
-- shared locks on older generations are harmless; they are released at
-- transaction end.
CREATE OR REPLACE FUNCTION current_rollup_generation(rollup_name text)
RETURNS bigint STABLE
LANGUAGE plpgsql
AS $function$
DECLARE
    locked_value bigint;
    current_value bigint := coalesce(pg_sequence_last_value(rollup_name), 0);
BEGIN
    LOOP
        PERFORM pg_advisory_xact_lock_shared(current_value);
        locked_value := current_value;
        current_value := coalesce(pg_sequence_last_value(rollup_name), 0);
        EXIT WHEN current_value = locked_value;
    END LOOP;
    RETURN current_value;
END;
$function$;
-- Claim the next range of generations that can safely be rolled up.
--
-- rollup_name: a row in rollups and the name of the generation sequence.
-- OUT start_generation / end_generation: inclusive bounds of the window.
-- Raises an exception if rollups has no usable row for rollup_name. Without
-- this check the window would start at NULL, nothing would be locked, and the
-- caller would silently roll up no data.
--
-- Intended to run in the same transaction as the rollup itself. The row lock,
-- the advisory locks and the bookkeeping update all take effect or roll back
-- together with it.
CREATE OR REPLACE FUNCTION safe_rollup_window(rollup_name text, OUT start_generation bigint, OUT end_generation bigint)
RETURNS record
LANGUAGE plpgsql
AS $function$
BEGIN
    -- Row-lock the bookkeeping row so concurrent rollups of the same name serialize here.
    SELECT rolled_up_generation + 1 INTO start_generation
    FROM rollups
    WHERE name = rollup_name
    FOR UPDATE;

    -- Fail before consuming a generation from the sequence.
    IF start_generation IS NULL THEN
        RAISE EXCEPTION 'no rolled_up_generation found for rollup %', rollup_name;
    END IF;

    -- Start a new generation; everything before it belongs to this window.
    end_generation := nextval(rollup_name) - 1;

    -- Block until every transaction holding a shared lock on a generation in
    -- the window (i.e. a writer still tagging rows with it) has finished.
    -- NOTE(review): generations are used directly as single-key advisory-lock
    -- ids. Different rollups, and any other code using such locks, therefore
    -- share one key space and may contend with each other.
    PERFORM pg_advisory_xact_lock(generation)
    FROM generate_series(start_generation, end_generation) AS generation;

    -- Record the window as rolled up; this commits with the caller's rollup.
    UPDATE rollups
    SET rolled_up_generation = end_generation
    WHERE name = rollup_name;
END;
$function$;
-- Note: GitHub flags the following file as containing bidirectional Unicode text;
-- review it in an editor that reveals hidden Unicode characters.
-- Raw events, distributed by tenant. Each row is stamped with the generation
-- that current_rollup_generation('my_rollup') returns at insert time, which is
-- what the rollup later filters on.
CREATE TABLE data (
    tenant_id         int,
    event             text,
    time              timestamptz DEFAULT now(),
    rollup_generation bigint      DEFAULT current_rollup_generation('my_rollup')
);

-- BRIN index for the generation-range scans done by the rollup; presumably
-- chosen because generations grow roughly with insertion order.
CREATE INDEX ON data USING BRIN (rollup_generation);

SELECT create_distributed_table('data', 'tenant_id');
-- Per-tenant, per-event, per-day counters, maintained incrementally by upserts
-- that add each rollup window's counts to the stored value.
CREATE TABLE daily_roll (
    tenant_id int,
    key text,
    day date,
    -- bigint: count(*) yields bigint, and the upsert keeps adding to this value,
    -- so an int column could overflow for busy tenants.
    counter bigint,
    PRIMARY KEY (tenant_id, key, day)
);
SELECT create_distributed_table('daily_roll', 'tenant_id');
-- Register the rollup and create the sequence that numbers its generations.
-- Both must use the same name: the rollup functions look the sequence up by
-- the rollup's name.
INSERT INTO rollups (name) VALUES ('my_rollup');
CREATE SEQUENCE my_rollup;
-- Incrementally roll new rows of data into daily_roll.
--
-- Claims the next safe window of generations for 'my_rollup' via
-- safe_rollup_window(). It then adds the per-(tenant, event, day) row counts
-- of rows tagged with those generations to the existing counters. Because it
-- runs as one function call, the window bookkeeping and the counter updates
-- commit or roll back together.
CREATE OR REPLACE FUNCTION do_rollup()
RETURNS void
LANGUAGE plpgsql
AS $function$
DECLARE
    start_gen bigint;
    end_gen bigint;
BEGIN
    SELECT start_generation, end_generation INTO start_gen, end_gen
    FROM safe_rollup_window('my_rollup');

    -- The window bounds are both inclusive integer generations, so BETWEEN is exact.
    -- NOTE(review): time::date converts using the session's TimeZone setting, so
    -- day boundaries depend on the session running the rollup; confirm this is intended.
    INSERT INTO daily_roll (tenant_id, key, day, counter)
    SELECT tenant_id, event, time::date, count(*)
    FROM data
    WHERE rollup_generation BETWEEN start_gen AND end_gen
    GROUP BY tenant_id, event, time::date
    ON CONFLICT (tenant_id, key, day)
    DO UPDATE SET counter = daily_roll.counter + EXCLUDED.counter;
END;
$function$;
-- Note: GitHub flags the following file as containing bidirectional Unicode text;
-- review it in an editor that reveals hidden Unicode characters.
-- Smoke test: ingest events, roll them up, then ingest more and roll up again.
-- The second rollup only processes the newer generation and adds to the
-- existing counters. time and rollup_generation come from the column defaults.
INSERT INTO data (tenant_id, event) VALUES (1, 'hello');
INSERT INTO data (tenant_id, event) VALUES (1, 'world');
INSERT INTO data (tenant_id, event) VALUES (1, 'hello');
SELECT do_rollup();
-- Expected (same day): hello = 2, world = 1
SELECT tenant_id, key, day, counter FROM daily_roll ORDER BY tenant_id, key, day;
INSERT INTO data (tenant_id, event) VALUES (1, 'hello');
SELECT do_rollup();
-- Expected (same day): hello = 3, world = 1
SELECT tenant_id, key, day, counter FROM daily_roll ORDER BY tenant_id, key, day;
-- (GitHub page footer: sign up or sign in to join the conversation on this gist.)