Skip to content

Instantly share code, notes, and snippets.

@sdressler
Created October 6, 2020 15:11
Show Gist options
  • Save sdressler/2654cfc0cbc386feb8d82dc4a95fd4ff to your computer and use it in GitHub Desktop.
Save sdressler/2654cfc0cbc386feb8d82dc4a95fd4ff to your computer and use it in GitHub Desktop.
Swarm64 Impact Analyzer
-- _calculate_cache_ratio(sum_blks_read, sum_blks_hit)
--
-- Computes sum_blks_read / (sum_blks_read + sum_blks_hit), rounded to two
-- decimal places, and returns it as DOUBLE PRECISION.
--
-- Returns 0.0 when the denominator is zero (NULLIF turns it into NULL so no
-- division-by-zero error is raised) or when either argument is NULL.
DROP FUNCTION IF EXISTS _calculate_cache_ratio(NUMERIC, NUMERIC);
CREATE FUNCTION _calculate_cache_ratio(sum_blks_read NUMERIC, sum_blks_hit NUMERIC)
RETURNS DOUBLE PRECISION
LANGUAGE SQL
PARALLEL SAFE
AS $$
    SELECT CAST(
        COALESCE(
            ROUND(
                sum_blks_read / NULLIF(sum_blks_read + sum_blks_hit, 0),
                2
            ),
            0.0
        ) AS DOUBLE PRECISION
    )
$$;
-- _s64da_impact_analyzer_print()
--
-- Emits two NOTICE messages, each containing one JSON object:
--   1. Aggregates over query_stats: max/min/rounded-average of max_parallelism
--      (0 when the table is empty) and min/max/average of
--      (query_stop - query_start) (NULL when there is nothing to aggregate).
--   2. Aggregates over workload_stats: summed deltas between the current and
--      the *_base counters for heap/index/toast block reads and for
--      inserts/updates/deletes, plus a read ratio per block category.
--
-- Both tables must exist in the calling session; they are referenced by name
-- and are not created here.
DROP PROCEDURE IF EXISTS _s64da_impact_analyzer_print();
CREATE PROCEDURE _s64da_impact_analyzer_print()
LANGUAGE plpgsql
AS $$
DECLARE
    query_summary RECORD;
    io_summary    RECORD;
BEGIN
    -- An aggregate query without GROUP BY always yields exactly one row, so a
    -- plain SELECT ... INTO is sufficient.
    SELECT
        COALESCE(MAX(qs.max_parallelism), 0)        AS max_parallelism,
        COALESCE(MIN(qs.max_parallelism), 0)        AS min_parallelism,
        COALESCE(ROUND(AVG(qs.max_parallelism)), 0) AS avg_parallelism,
        MIN(qs.query_stop - qs.query_start)         AS min_runtime,
        MAX(qs.query_stop - qs.query_start)         AS max_runtime,
        AVG(qs.query_stop - qs.query_start)         AS avg_runtime
    INTO query_summary
    FROM query_stats AS qs;

    RAISE NOTICE '%', to_json(query_summary);

    -- NOTE(review): the *_disk columns are deltas against the *_base values,
    -- while the *_ratio columns are computed from the raw (non-delta)
    -- counters. Confirm that this difference is intended.
    SELECT
        SUM(ws.heap_blks_read - ws.heap_blks_read_base)                           AS heap_disk,
        _calculate_cache_ratio(SUM(ws.heap_blks_read), SUM(ws.heap_blks_hit))     AS heap_ratio,
        SUM(ws.idx_blks_read - ws.idx_blks_read_base)                             AS idx_disk,
        _calculate_cache_ratio(SUM(ws.idx_blks_read), SUM(ws.idx_blks_hit))       AS idx_ratio,
        SUM(ws.toast_blks_read - ws.toast_blks_read_base)                         AS toast_disk,
        _calculate_cache_ratio(SUM(ws.toast_blks_read), SUM(ws.toast_blks_hit))   AS toast_ratio,
        SUM(ws.inserts - ws.inserts_base)                                         AS inserts,
        SUM(ws.updates - ws.updates_base)                                         AS updates,
        SUM(ws.deletes - ws.deletes_base)                                         AS deletes
    INTO io_summary
    FROM workload_stats AS ws;

    RAISE NOTICE '%', to_json(io_summary);
END;
$$;
-- s64da_impact_analyzer()
--
-- Samples activity of the current database roughly once per second until the
-- call is interrupted, printing a summary after every sample via
-- _s64da_impact_analyzer_print().
--
-- Per call it (re)creates three session temp tables:
--   query_stats      - one row per (pid, query_start) seen in pg_stat_activity
--   workload_stats   - per-table I/O and tuple counters plus the *_base
--                      snapshot taken when the procedure started
--   current_activity - the most recent pg_stat_activity sample
--
-- The procedure COMMITs after every iteration, so it must be invoked with CALL
-- outside of an explicit transaction block.
DROP PROCEDURE IF EXISTS s64da_impact_analyzer();
CREATE PROCEDURE s64da_impact_analyzer() AS $$
DECLARE
    own_datname VARCHAR;
    own_pid     INT;
BEGIN
    own_datname := current_database();
    own_pid     := pg_backend_pid();

    -- PL/pgSQL RAISE uses a bare '%' as its placeholder; '%s' would print the
    -- database name followed by a stray literal 's'.
    RAISE NOTICE 'S64 DA Impact Analyzer on DB: %', own_datname;

    DROP TABLE IF EXISTS query_stats;
    CREATE TEMP TABLE query_stats(
        pid                INT
        , max_parallelism  INT
        , query_start      TIMESTAMPTZ
        , query_stop       TIMESTAMPTZ
        , runtime_parallel INTERVAL
        , PRIMARY KEY (pid, query_start)
    );

    -- Snapshot the cumulative statistics counters. Each counter is stored
    -- twice: the plain column is refreshed on every iteration, the *_base
    -- column keeps the value from procedure start.
    DROP TABLE IF EXISTS workload_stats;
    CREATE TEMP TABLE workload_stats AS
    SELECT
        stat_io.relid
        , stat_io.schemaname
        , stat_io.relname
        , heap_blks_read
        , heap_blks_read  AS heap_blks_read_base
        , heap_blks_hit
        , heap_blks_hit   AS heap_blks_hit_base
        , idx_blks_read
        , idx_blks_read   AS idx_blks_read_base
        , idx_blks_hit
        , idx_blks_hit    AS idx_blks_hit_base
        , toast_blks_read
        , toast_blks_read AS toast_blks_read_base
        , toast_blks_hit
        , toast_blks_hit  AS toast_blks_hit_base
        , n_tup_ins       AS inserts
        , n_tup_ins       AS inserts_base
        , n_tup_upd       AS updates
        , n_tup_upd       AS updates_base
        , n_tup_del       AS deletes
        , n_tup_del       AS deletes_base
    FROM pg_statio_user_tables stat_io
    JOIN pg_stat_user_tables stat USING (relid)
    -- Exclude temp-schema tables (including the ones created by this procedure).
    WHERE stat_io.schemaname NOT LIKE 'pg_temp%';

    LOOP
        BEGIN
            -- Sample all other active backends of this database. Backends that
            -- share a query_start are treated as one query; the smallest pid
            -- of the group is used as its key.
            -- NOTE(review): the smallest pid is not guaranteed to be the
            -- parallel leader - confirm this heuristic is acceptable.
            DROP TABLE IF EXISTS current_activity;
            CREATE TEMP TABLE current_activity AS
            SELECT
                query_start
                , MIN(pid)  AS parent_pid
                , COUNT(*)  AS worker_count
            FROM pg_stat_activity
            WHERE datname = own_datname
              AND pid <> own_pid
              AND state = 'active'
              -- query_start is part of the primary key of query_stats; a NULL
              -- would abort the whole procedure on insert.
              AND query_start IS NOT NULL
            GROUP BY query_start;

            -- Register queries seen for the first time. Columns are mapped by
            -- name instead of relying on the column order of current_activity.
            INSERT INTO query_stats (pid, query_start, max_parallelism, runtime_parallel)
            SELECT
                ca.parent_pid
                , ca.query_start
                , ca.worker_count
                , '0'::INTERVAL
            FROM current_activity ca
            ON CONFLICT DO NOTHING;

            -- Refresh every query that is still running:
            --   * keep the highest backend count observed so far (previously
            --     only the count from the first sample was ever stored),
            --   * add one second of parallel runtime when more than one
            --     backend was seen in this sample (matches the 1 s sleep below),
            --   * move the stop timestamp forward.
            UPDATE query_stats qs
            SET max_parallelism = GREATEST(qs.max_parallelism, ca.worker_count::INT)
              , runtime_parallel =
                    CASE
                        WHEN ca.worker_count > 1 THEN qs.runtime_parallel + '1'::INTERVAL
                        ELSE qs.runtime_parallel
                    END
              , query_stop = NOW()
            FROM current_activity ca
            WHERE qs.pid = ca.parent_pid
              AND qs.query_start = ca.query_start;

            -- Refresh the current block I/O counters (the *_base columns stay untouched).
            UPDATE workload_stats ws
            SET heap_blks_read    = src.heap_blks_read
              , heap_blks_hit     = src.heap_blks_hit
              , idx_blks_read     = src.idx_blks_read
              , idx_blks_hit      = src.idx_blks_hit
              , toast_blks_read   = src.toast_blks_read
              , toast_blks_hit    = src.toast_blks_hit
            FROM pg_statio_user_tables src
            WHERE ws.relid = src.relid;

            -- Refresh the current tuple counters.
            UPDATE workload_stats ws
            SET inserts = src.n_tup_ins
              , updates = src.n_tup_upd
              , deletes = src.n_tup_del
            FROM pg_stat_user_tables src
            WHERE ws.relid = src.relid;

            CALL _s64da_impact_analyzer_print();
            PERFORM pg_sleep(1);
        EXCEPTION WHEN operator_intervention THEN
            -- Error class 57 (e.g. query cancellation): stop sampling cleanly.
            EXIT;
        END;
        -- Transaction control is not allowed inside a block with an EXCEPTION
        -- clause, so the commit happens here, once per iteration.
        COMMIT;
    END LOOP;
END;
$$ LANGUAGE plpgsql;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment