docker run --rm --name statements -e POSTGRES_PASSWORD=postgres -d -p 5432:5432 postgres
export PGUSER=postgres
export PGPASSWORD=postgres
export PGHOST=0.0.0.0
psql -f init.sql
psql -f populate.sql
psql -f chart.sql
Last active
May 22, 2020 07:41
-
-
Save pgiraud/6a3178d2dada460615c9377395a8058f to your computer and use it in GitHub Desktop.
Statements
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- SELECT | |
-- anon_1.ts, | |
-- SUM(anon_1.calls) / Greatest(Extract(epoch FROM anon_1.mesure_interval), 1) AS calls | |
-- FROM ( | |
-- SELECT dbid, | |
-- ts, | |
-- Greatest(ts - Lead(ts) over (PARTITION BY dbid ORDER BY ts desc), '0 s') AS mesure_interval, | |
-- Greatest(SUM(calls) - Lead(SUM(calls)) over (PARTITION BY dbid ORDER BY ts desc), 0) AS calls, | |
-- LEAD(SUM(calls)) over (PARTITION BY dbid ORDEr BY ts desc) AS lead_calls_origin, | |
-- SUM(calls) AS calls_origin | |
-- FROM ( | |
-- SELECT agent_address, | |
-- agent_port, | |
-- dbid, | |
-- datname, | |
-- queryid, | |
-- userid, | |
-- base.ts, | |
-- base.calls | |
-- FROM statements.statements, | |
-- lateral | |
-- ( | |
-- SELECT * | |
-- FROM ( | |
-- SELECT row_number() over ( PARTITION BY queryid ORDER BY ts ) AS NUMBER, | |
-- count(*) over (PARTITION BY queryid) AS total, | |
-- * | |
-- FROM ( | |
-- SELECT (record).* | |
-- FROM statements.statements_history_current | |
-- WHERE (record).ts <@ tstzrange('2009-03-01 00:00'::timestamp, '2009-03-02 12:00'::timestamp, '[]') | |
-- AND queryid = statements.statements.queryid | |
-- AND userid = statements.statements.userid | |
-- AND dbid = statements.statements.dbid | |
-- AND datname = 'mydb' | |
-- AND agent_address = '0.0.0.0' | |
-- AND agent_port = 2345 | |
-- ) AS statements_history | |
-- ) AS sh | |
-- WHERE NUMBER % ( int8larger((total)/(10 +1),1) ) = 0 | |
-- ) AS base | |
-- ) by_db | |
-- GROUP BY dbid, ts | |
-- ORDER BY ts | |
-- ) AS anon_1 | |
-- WHERE TRUE | |
-- GROUP BY anon_1.ts, | |
-- anon_1.mesure_interval | |
-- ORDER BY anon_1.ts | |
-- ; | |
-- SELECT agent_address, | |
-- agent_port, | |
-- dbid, | |
-- datname, | |
-- queryid, | |
-- userid, | |
-- base.* | |
-- FROM statements.statements, | |
-- lateral | |
-- ( | |
-- SELECT * | |
-- FROM ( | |
-- SELECT row_number() OVER ( ORDER BY ts ) AS NUMBER, | |
-- count(*) OVER ( PARTITION BY 1 ) AS total, | |
-- * | |
-- FROM ( | |
-- SELECT (record).ts | |
-- FROM statements.statements_history_current | |
-- WHERE (record).ts <@ tstzrange('2009-03-01 00:00'::timestamp, '2009-03-02 12:00'::timestamp, '[]') | |
-- AND dbid = statements.statements.dbid | |
-- AND agent_address = '0.0.0.0' | |
-- AND agent_port = 2345 | |
-- GROUP BY (record).ts | |
-- ) AS statements_history | |
-- ) AS sh | |
-- WHERE NUMBER % ( int8larger((total)/(10 +1),1) ) = 0 | |
-- ) AS base | |
-- ; | |
-- SET max_parallel_workers_per_gather = 0; | |
-- SET track_io_timing TO on; | |
-- SELECT | |
-- anon_1.ts, | |
-- SUM(anon_1.calls) / Greatest(Extract(epoch FROM anon_1.mesure_interval), 1) AS calls_per_sec | |
-- FROM ( | |
-- SELECT | |
-- ts, | |
-- Greatest(ts - Lag(ts) over (ORDER BY ts), '0 s') AS mesure_interval, | |
-- Greatest(calls - Lag(calls) over (ORDER BY ts), 0) AS calls | |
-- FROM ( | |
-- SELECT * | |
-- FROM ( | |
-- SELECT row_number() OVER ( ORDER BY ts ) AS NUMBER, | |
-- count(*) OVER ( PARTITION BY 1 ) AS total, | |
-- * | |
-- FROM ( | |
-- select (record).* | |
-- from statements.statements_history_current_db AS shc | |
-- -- JOIN statements.statements AS st | |
-- -- ON st.agent_address = shc.agent_address | |
-- -- AND st.agent_port = shc.agent_port | |
-- -- AND st.dbid = shc.dbid | |
-- WHERE (record).ts <@ tstzrange('2009-03-02 10:00'::timestamp, '2009-03-02 11:00'::timestamp, '[]') | |
-- AND agent_address = '0.0.0.0' | |
-- AND agent_port = 2345 | |
-- ) as foo | |
-- ) AS sh | |
-- WHERE NUMBER % ( int8larger((total)/(20 +1),1) ) = 0 | |
-- ) AS base | |
-- ) AS anon_1 | |
-- WHERE TRUE | |
-- GROUP BY anon_1.ts, | |
-- anon_1.mesure_interval | |
-- ORDER BY anon_1.ts | |
-- ; | |
-- per database | |
-- EXPLAIN (ANALYZE, BUFFERS) | |
SELECT | |
anon_1.ts, | |
SUM(anon_1.calls) / Greatest(Extract(epoch FROM anon_1.mesure_interval), 1) AS calls_per_sec | |
FROM ( | |
SELECT | |
ts, | |
Greatest(ts - Lag(ts) over (ORDER BY ts), '0 s') AS mesure_interval, | |
Greatest(calls - Lag(calls) over (ORDER BY ts), 0) AS calls | |
FROM ( | |
SELECT * | |
FROM ( | |
SELECT row_number() OVER ( ORDER BY ts ) AS NUMBER, | |
count(*) OVER ( PARTITION BY 1 ) AS total, | |
* | |
FROM ( | |
SELECT (unnested.records).* AS record | |
FROM ( | |
SELECT psh.dbid, psh.coalesce_range, unnest(records) AS records | |
FROM statements.statements_history_db psh | |
WHERE coalesce_range && tstzrange(now() - interval '300 minutes', now(),'[]') | |
AND datname = 'mydatabase' | |
AND agent_address = '0.0.0.0' | |
AND agent_port = 2345 | |
) AS unnested | |
WHERE tstzrange((records).ts, (records).ts, '[]') <@ tstzrange(now() - interval '300 minutes', now(), '[]') | |
UNION ALL | |
select | |
(record).* | |
from statements.statements_history_current_db AS shc | |
WHERE tstzrange((record).ts, (record).ts, '[]') <@ tstzrange(now() - interval '300 minutes', now(), '[]') | |
AND datname = 'mydatabase' | |
AND agent_address = '0.0.0.0' | |
AND agent_port = 2345 | |
-- GROUP BY (record).ts | |
-- ORDER by (record).ts | |
) as foo | |
) AS sh | |
WHERE NUMBER % ( int8larger((total)/(50 +1),1) ) = 0 | |
) AS base | |
) AS anon_1 | |
WHERE TRUE | |
GROUP BY anon_1.ts, anon_1.mesure_interval | |
ORDER BY anon_1.ts | |
; | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- | |
-- Insertion de données | |
-- On prend les données les plus récentes qui sont dans statements et on les | |
-- duplique dans le passé | |
-- | |
SET search_path TO statements, public; | |
WITH last_ts AS ( | |
select (record).ts | |
from statements.statements_history_current | |
group by (record).ts | |
order by (record).ts desc | |
limit 1 | |
) | |
DELETE FROM statements_history_current | |
WHERE (record).ts NOT IN (SELECT * FROM last_ts); | |
WITH last_ts AS ( | |
select (record).ts | |
from statements.statements_history_current_db | |
group by (record).ts | |
order by (record).ts desc | |
limit 1 | |
) | |
DELETE FROM statements_history_current_db | |
WHERE (record).ts NOT IN (SELECT * FROM last_ts); | |
TRUNCATE statements_src_tmp; | |
VACUUM ANALYZE; | |
-- EXPLAIN ANALYZE | |
-- WITH _last AS ( | |
-- select *, (record).* | |
-- from statements.statements_history_current | |
-- ) | |
-- INSERT INTO statements_src_tmp | |
-- SELECT | |
-- _last.agent_address, | |
-- _last.agent_port, | |
-- _last.ts - interval '1 minute', | |
-- _last.userid, | |
-- '', | |
-- _last.dbid, | |
-- '', | |
-- _last.queryid, | |
-- '', | |
-- 2, | |
-- _last.total_time, | |
-- _last.rows, | |
-- _last.shared_blks_hit, | |
-- _last.shared_blks_read, | |
-- _last.shared_blks_dirtied, | |
-- _last.shared_blks_written, | |
-- _last.local_blks_hit, | |
-- _last.local_blks_read, | |
-- _last.local_blks_dirtied, | |
-- _last.local_blks_written, | |
-- _last.temp_blks_read, | |
-- _last.temp_blks_written, | |
-- _last.blk_read_time, | |
-- _last.blk_write_time | |
-- FROM _last; | |
DROP table statements.statements_history_current_copy; | |
CREATE table statements.statements_history_current_copy | |
AS select * from statements.statements_history_current; | |
DROP table statements.statements_history_current_db_copy; | |
CREATE table statements.statements_history_current_db_copy | |
AS select * from statements.statements_history_current_db; | |
DROP PROCEDURE duplicate; | |
CREATE PROCEDURE duplicate() AS $PROC$ | |
DECLARE | |
num_loops integer := 60 * 24 * 100; | |
BEGIN | |
FOR i IN 1..num_loops LOOP | |
IF i % 100 = 0 THEN | |
RAISE NOTICE '% %', i, num_loops; | |
COMMIT; | |
END IF; | |
WITH _last AS ( | |
select s.*, (record).* | |
from statements.statements_history_current_copy shc | |
JOIN statements.statements s | |
ON s.queryid = shc.queryid | |
AND s.dbid = shc.dbid | |
AND s.userid = shc.userid | |
) | |
INSERT INTO statements_history_current | |
( | |
SELECT | |
_last.agent_address, | |
_last.agent_port, | |
_last.queryid, | |
_last.dbid, | |
_last.userid, | |
ROW( | |
_last.ts - i * interval '1 minute', | |
_last.calls - i, | |
_last.total_time, | |
_last.rows, | |
_last.shared_blks_hit, | |
_last.shared_blks_read, | |
_last.shared_blks_dirtied, | |
_last.shared_blks_written, | |
_last.local_blks_hit, | |
_last.local_blks_read, | |
_last.local_blks_dirtied, | |
_last.local_blks_written, | |
_last.temp_blks_read, | |
_last.temp_blks_written, | |
_last.blk_read_time, | |
_last.blk_write_time | |
)::statements.statements_history_record AS record | |
FROM _last | |
); | |
END LOOP; | |
FOR i IN 1..num_loops LOOP | |
IF i % 100 = 0 THEN | |
RAISE NOTICE '% %', i, num_loops; | |
COMMIT; | |
END IF; | |
WITH _last AS ( | |
select *, (record).* | |
from statements.statements_history_current_db_copy | |
) | |
INSERT INTO statements_history_current_db | |
( | |
SELECT | |
_last.agent_address, | |
_last.agent_port, | |
_last.dbid, | |
_last.datname, | |
ROW( | |
_last.ts - i * interval '1 minute', | |
_last.calls - i, | |
_last.total_time, | |
_last.rows, | |
_last.shared_blks_hit, | |
_last.shared_blks_read, | |
_last.shared_blks_dirtied, | |
_last.shared_blks_written, | |
_last.local_blks_hit, | |
_last.local_blks_read, | |
_last.local_blks_dirtied, | |
_last.local_blks_written, | |
_last.temp_blks_read, | |
_last.temp_blks_written, | |
_last.blk_read_time, | |
_last.blk_write_time | |
)::statements.statements_history_record AS record | |
FROM _last | |
); | |
END LOOP; | |
RAISE NOTICE 'populated'; | |
END; | |
$PROC$ LANGUAGE plpgsql; /* end of powa_statements_src */ | |
CALL duplicate(); | |
-- SELECT process_statements('0.0.0.0', 2345); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
DROP SCHEMA IF EXISTS application CASCADE; | |
CREATE SCHEMA application; | |
CREATE EXTENSION btree_gist; | |
SET search_path TO application, public; | |
CREATE OR REPLACE FUNCTION public.temboard_log (msg text) RETURNS void | |
LANGUAGE plpgsql | |
AS $_$ | |
BEGIN | |
RAISE NOTICE '%', msg; | |
END; | |
$_$; | |
CREATE TABLE instances ( | |
agent_address TEXT NOT NULL, | |
agent_port INTEGER NOT NULL, | |
agent_key TEXT, | |
hostname TEXT NOT NULL, | |
cpu INTEGER, | |
memory_size BIGINT, | |
pg_port INTEGER, | |
pg_version TEXT, | |
pg_version_summary TEXT, | |
pg_data TEXT, | |
notify BOOLEAN DEFAULT true, | |
PRIMARY KEY (agent_address, agent_port) | |
); | |
DROP SCHEMA IF EXISTS statements CASCADE; | |
CREATE SCHEMA statements; | |
SET search_path TO statements, public; | |
BEGIN; | |
CREATE TABLE metas( | |
agent_address TEXT NOT NULL, | |
agent_port INTEGER NOT NULL, | |
coalesce_seq bigint NOT NULL default (1), | |
snapts timestamp with time zone NOT NULL default '-infinity'::timestamptz, | |
aggts timestamp with time zone NOT NULL default '-infinity'::timestamptz, | |
FOREIGN KEY (agent_address, agent_port) REFERENCES application.instances (agent_address, agent_port) | |
ON DELETE CASCADE | |
ON UPDATE CASCADE, | |
PRIMARY KEY (agent_address, agent_port) | |
); | |
CREATE TABLE statements ( | |
agent_address TEXT NOT NULL, | |
agent_port INTEGER NOT NULL, | |
queryid BIGINT NOT NULL, | |
query TEXT NOT NULL, | |
dbid OID NOT NULL, | |
datname TEXT NOT NULL, | |
userid OID NOT NULL, | |
rolname TEXT NOT NULL, | |
FOREIGN KEY (agent_address, agent_port) REFERENCES application.instances (agent_address, agent_port) | |
ON DELETE CASCADE | |
ON UPDATE CASCADE, | |
PRIMARY KEY (agent_address, agent_port, queryid, dbid, userid) | |
); | |
CREATE TYPE statements_history_record AS ( | |
ts TIMESTAMP WITH TIME ZONE, | |
calls BIGINT, | |
total_time DOUBLE PRECISION, | |
rows BIGINT, | |
shared_blks_hit BIGINT, | |
shared_blks_read BIGINT, | |
shared_blks_dirtied BIGINT, | |
shared_blks_written BIGINT, | |
local_blks_hit BIGINT, | |
local_blks_read BIGINT, | |
local_blks_dirtied BIGINT, | |
local_blks_written BIGINT, | |
temp_blks_read BIGINT, | |
temp_blks_written BIGINT, | |
blk_read_time DOUBLE PRECISION, | |
blk_write_time DOUBLE PRECISION | |
); | |
CREATE UNLOGGED TABLE statements_src_tmp ( | |
agent_address TEXT NOT NULL, | |
agent_port INTEGER NOT NULL, | |
ts TIMESTAMP WITH TIME ZONE NOT NULL, | |
userid oid NOT NULL, | |
rolname TEXT NOT NULL, | |
dbid oid NOT NULL, | |
datname TEXT NOT NULL, | |
queryid BIGINT NOT NULL, | |
query TEXT NOT NULL, | |
calls BIGINT NOT NULL, | |
total_time DOUBLE PRECISION NOT NULL, | |
rows BIGINT NOT NULL, | |
shared_blks_hit BIGINT NOT NULL, | |
shared_blks_read BIGINT NOT NULL, | |
shared_blks_dirtied BIGINT NOT NULL, | |
shared_blks_written BIGINT NOT NULL, | |
local_blks_hit BIGINT NOT NULL, | |
local_blks_read BIGINT NOT NULL, | |
local_blks_dirtied BIGINT NOT NULL, | |
local_blks_written BIGINT NOT NULL, | |
temp_blks_read BIGINT NOT NULL, | |
temp_blks_written BIGINT NOT NULL, | |
blk_read_time DOUBLE PRECISION NOT NULL, | |
blk_write_time DOUBLE PRECISION NOT NULL | |
); | |
CREATE TABLE statements_history ( | |
agent_address TEXT NOT NULL, | |
agent_port INTEGER NOT NULL, | |
queryid BIGINT NOT NULL, | |
dbid oid NOT NULL, | |
userid oid NOT NULL, | |
coalesce_range tstzrange NOT NULL, | |
records statements_history_record[] NOT NULL, | |
mins_in_range statements_history_record NOT NULL, | |
maxs_in_range statements_history_record NOT NULL, | |
FOREIGN KEY (agent_address, agent_port, queryid, dbid, userid) REFERENCES statements | |
ON DELETE CASCADE ON UPDATE CASCADE | |
); | |
CREATE INDEX ON statements_history USING gist (agent_address, agent_port, queryid, coalesce_range); | |
CREATE TABLE statements_history_db ( | |
agent_address TEXT NOT NULL, | |
agent_port INTEGER NOT NULL, | |
dbid oid NOT NULL, | |
coalesce_range tstzrange NOT NULL, | |
records statements_history_record[] NOT NULL, | |
mins_in_range statements_history_record NOT NULL, | |
maxs_in_range statements_history_record NOT NULL, | |
FOREIGN KEY (agent_address, agent_port) REFERENCES application.instances (agent_address, agent_port) | |
ON DELETE CASCADE ON UPDATE CASCADE | |
); | |
CREATE INDEX ON statements_history_db USING gist (agent_address, agent_port, dbid, coalesce_range); | |
CREATE TABLE statements_history_current ( | |
agent_address TEXT NOT NULL, | |
agent_port INTEGER NOT NULL, | |
queryid BIGINT NOT NULL, | |
dbid OID NOT NULL, | |
userid OID NOT NULL, | |
record statements_history_record NOT NULL, | |
FOREIGN KEY (agent_address, agent_port, queryid, dbid, userid) REFERENCES statements ON DELETE CASCADE ON UPDATE CASCADE | |
); | |
CREATE INDEX ON statements_history_current (agent_address, agent_port, dbid, userid, queryid); | |
CREATE INDEX on statements_history_current USING GIST (tstzrange((record).ts, (record).ts, '[]')); | |
CREATE TABLE statements_history_current_db ( | |
agent_address TEXT NOT NULL, | |
agent_port INTEGER NOT NULL, | |
dbid OID NOT NULL, | |
datname TEXT NOT NULL, | |
record statements_history_record NOT NULL, | |
FOREIGN KEY (agent_address, agent_port) REFERENCES application.instances (agent_address, agent_port) ON DELETE CASCADE ON UPDATE CASCADE | |
); | |
CREATE INDEX ON statements_history_current_db (agent_address, agent_port, dbid); | |
CREATE INDEX ON statements_history_current_db (agent_address, agent_port); | |
CREATE INDEX on statements_history_current_db USING GIST (tstzrange((record).ts, (record).ts, '[]')); | |
CREATE OR REPLACE FUNCTION statements_aggregate(_address text, _port integer) | |
RETURNS void AS $PROC$ | |
DECLARE | |
v_funcname text := 'statements_aggregate(' || _address || ':' || _port || ')'; | |
v_rowcount bigint; | |
BEGIN | |
-- PERFORM temboard_log(format('running %I', v_funcname)); | |
-- PERFORM prevent_concurrent_snapshot(_srvid); | |
-- aggregate statements table | |
INSERT INTO statements_history | |
SELECT agent_address, agent_port, queryid, dbid, userid, | |
tstzrange(min((record).ts), max((record).ts),'[]'), | |
array_agg(record), | |
ROW(min((record).ts), | |
min((record).calls),min((record).total_time),min((record).rows), | |
min((record).shared_blks_hit),min((record).shared_blks_read), | |
min((record).shared_blks_dirtied),min((record).shared_blks_written), | |
min((record).local_blks_hit),min((record).local_blks_read), | |
min((record).local_blks_dirtied),min((record).local_blks_written), | |
min((record).temp_blks_read),min((record).temp_blks_written), | |
min((record).blk_read_time),min((record).blk_write_time))::statements_history_record, | |
ROW(max((record).ts), | |
max((record).calls),max((record).total_time),max((record).rows), | |
max((record).shared_blks_hit),max((record).shared_blks_read), | |
max((record).shared_blks_dirtied),max((record).shared_blks_written), | |
max((record).local_blks_hit),max((record).local_blks_read), | |
max((record).local_blks_dirtied),max((record).local_blks_written), | |
max((record).temp_blks_read),max((record).temp_blks_written), | |
max((record).blk_read_time),max((record).blk_write_time))::statements_history_record | |
FROM statements_history_current | |
WHERE agent_address = _address AND agent_port = _port | |
GROUP BY agent_address, agent_port, queryid, dbid, userid; | |
GET DIAGNOSTICS v_rowcount = ROW_COUNT; | |
-- perform temboard_log(format('%I (statements_history) - rowcount: %s', | |
-- v_funcname, v_rowcount)); | |
DELETE FROM statements_history_current | |
WHERE agent_address = _address AND agent_port = _port; | |
-- aggregate db table | |
INSERT INTO statements_history_db | |
SELECT agent_address, agent_port, dbid, | |
tstzrange(min((record).ts), max((record).ts),'[]'), | |
array_agg(record), | |
ROW(min((record).ts), | |
min((record).calls),min((record).total_time),min((record).rows), | |
min((record).shared_blks_hit),min((record).shared_blks_read), | |
min((record).shared_blks_dirtied),min((record).shared_blks_written), | |
min((record).local_blks_hit),min((record).local_blks_read), | |
min((record).local_blks_dirtied),min((record).local_blks_written), | |
min((record).temp_blks_read),min((record).temp_blks_written), | |
min((record).blk_read_time),min((record).blk_write_time))::statements_history_record, | |
ROW(max((record).ts), | |
max((record).calls),max((record).total_time),max((record).rows), | |
max((record).shared_blks_hit),max((record).shared_blks_read), | |
max((record).shared_blks_dirtied),max((record).shared_blks_written), | |
max((record).local_blks_hit),max((record).local_blks_read), | |
max((record).local_blks_dirtied),max((record).local_blks_written), | |
max((record).temp_blks_read),max((record).temp_blks_written), | |
max((record).blk_read_time),max((record).blk_write_time))::statements_history_record | |
FROM statements_history_current_db | |
WHERE agent_address = _address AND agent_port = _port | |
GROUP BY agent_address, agent_port, dbid; | |
GET DIAGNOSTICS v_rowcount = ROW_COUNT; | |
-- perform temboard_log(format('%I (statements_history_db) - rowcount: %s', | |
-- v_funcname, v_rowcount)); | |
DELETE FROM statements_history_current_db | |
WHERE agent_address = _address AND agent_port = _port; | |
END; | |
$PROC$ LANGUAGE plpgsql; /* end of statements_aggregate */ | |
CREATE OR REPLACE FUNCTION process_statements(_address text, _port integer) RETURNS void AS $PROC$ | |
DECLARE | |
v_rowcount bigint; | |
v_coalesce integer := 100; | |
purge_seq bigint; | |
BEGIN | |
-- In this function, we process statements that have just been rerieved | |
-- from agent, and also aggregate counters by database | |
-- PERFORM temboard_log('start of process_statements(' || _address || ':' || _port || ')'); | |
-- Create new meta for agent if doesn't already exist | |
INSERT INTO metas (agent_address, agent_port) VALUES (_address, _port) | |
ON CONFLICT DO NOTHING; | |
-- Update meta with info from the current proccess (snapshot) | |
UPDATE metas | |
SET coalesce_seq = coalesce_seq + 1, | |
snapts = now() | |
WHERE agent_address = _address AND agent_port = _port | |
RETURNING coalesce_seq INTO purge_seq; | |
-- PERFORM temboard_log(format('coalesce_seq(%s:%s): %s', _address, _port, purge_seq)); | |
-- Lock table to prevent multiple snapshots to work at the same time | |
-- and insert data from statements_src_tmp table multiple times | |
-- This would lead to incoherent data | |
-- Use NOWAIT to avoid waiting for lock to be released | |
LOCK TABLE statements.statements, statements.statements_history_current, statements.statements_history_current_db | |
IN SHARE MODE NOWAIT; | |
WITH capture AS( | |
SELECT * | |
FROM statements.statements_src_tmp | |
WHERE agent_address = _address AND agent_port = _port | |
), | |
missing_statements AS ( | |
INSERT INTO statements.statements (agent_address, agent_port, queryid, query, dbid, datname, userid, rolname) | |
SELECT _address, _port, queryid, query, dbid, datname, userid, rolname | |
FROM capture c | |
ON CONFLICT DO NOTHING | |
), | |
by_query AS ( | |
INSERT INTO statements.statements_history_current | |
SELECT _address, _port, queryid, dbid, userid, | |
ROW( | |
ts, calls, total_time, rows, shared_blks_hit, shared_blks_read, | |
shared_blks_dirtied, shared_blks_written, local_blks_hit, local_blks_read, | |
local_blks_dirtied, local_blks_written, temp_blks_read, temp_blks_written, | |
blk_read_time, blk_write_time | |
)::statements.statements_history_record AS record | |
FROM capture | |
), | |
by_database AS ( | |
INSERT INTO statements.statements_history_current_db | |
SELECT _address, _port, dbid, datname, | |
ROW( | |
ts, sum(calls), sum(total_time), sum(rows), sum(shared_blks_hit), sum(shared_blks_read), | |
sum(shared_blks_dirtied), sum(shared_blks_written), sum(local_blks_hit), sum(local_blks_read), | |
sum(local_blks_dirtied), sum(local_blks_written), sum(temp_blks_read), sum(temp_blks_written), | |
sum(blk_read_time), sum(blk_write_time) | |
)::statements.statements_history_record AS record | |
FROM capture | |
GROUP BY dbid, datname, ts | |
) | |
SELECT count(*) INTO v_rowcount | |
FROM capture; | |
-- Coalesce datas if needed | |
IF ( (purge_seq % v_coalesce ) = 0 ) | |
THEN | |
PERFORM temboard_log( | |
format('coalesce needed, agent: %s - seq: %s - coalesce seq: %s', | |
_address, _port, purge_seq, v_coalesce )); | |
UPDATE metas | |
SET aggts = now() | |
WHERE agent_address = _address AND agent_port = _port; | |
-- PERFORM temboard_log(format('aggregating: %s:%s', _address, _port)); | |
EXECUTE format('SELECT statements_aggregate(''%s'', %s)', _address, _port); | |
END IF; | |
-- PERFORM temboard_log(format('%s statements were added', v_rowcount)); | |
DELETE FROM statements.statements_src_tmp WHERE agent_address = _address AND agent_port = _port; | |
END; | |
$PROC$ language plpgsql; /* end of process_statements */ | |
COMMIT; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- | |
-- Insertion de données | |
-- | |
SET search_path TO statements, public; | |
INSERT INTO application.instances (agent_address, agent_port, hostname) | |
VALUES ('0.0.0.0', 2345, 'instance.fqdn'); | |
TRUNCATE statements_src_tmp; | |
TRUNCATE statements_history_current; | |
TRUNCATE statements_history_current_db; | |
TRUNCATE statements_history; | |
TRUNCATE statements_history_db; | |
TRUNCATE statements CASCADE; | |
Create or replace function random_phrase() returns text as | |
$$ | |
declare | |
result text := ''; | |
begin | |
with symbols(characters) as ( | |
VALUES ('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789') | |
) | |
SELECT string_agg(substr(word, (random() * 8 + 1 )::integer) , ' ') as phrase | |
INTO result | |
FROM ( | |
select string_agg(substr(characters, (random() * length(characters) + 1) :: INTEGER, 1), '') as word | |
from symbols | |
join generate_series(1, 10) as word(chr_idx) on 1 = 1 -- word length | |
join generate_series(1,100) as words(idx) on 1 = 1 -- # of words | |
group by idx | |
) as foo; | |
return result; | |
end; | |
$$ language plpgsql; | |
DROP PROCEDURE populate; | |
CREATE PROCEDURE populate() AS $PROC$ | |
DECLARE | |
num_statements integer := 100; | |
num_loops integer := 60; | |
_end timestamp = now(); | |
_start timestamp := _end - interval '1 month'; | |
BEGIN | |
FOR i IN 1..num_statements LOOP | |
INSERT INTO statements | |
VALUES ('0.0.0.0', 2345, i, random_phrase(), 16392, 'tpc', 1, 'toto'); | |
END LOOP; | |
FOR i IN 1..num_statements LOOP | |
DROP SEQUENCE IF EXISTS calls_seq; | |
CREATE SEQUENCE calls_seq INCREMENT BY 6000; | |
INSERT INTO statements_history_current | |
VALUES ( | |
'0.0.0.0', | |
2345, | |
i, | |
16392, | |
1, | |
ROW( | |
generate_series(_start, _end, '1 minute'), | |
nextval('calls_seq'), | |
0, | |
0, | |
0, | |
0, | |
0, | |
0, | |
0, | |
0, | |
0, | |
0, | |
0, | |
0, | |
0, | |
0 | |
)::statements.statements_history_record | |
); | |
END LOOP; | |
DROP SEQUENCE IF EXISTS calls_seq; | |
CREATE SEQUENCE calls_seq INCREMENT BY 6000; | |
INSERT INTO statements_history_current_db | |
VALUES ( | |
'0.0.0.0', | |
2345, | |
16392, | |
'tpc', | |
ROW( | |
generate_series(_start, _end, '1 minute'), | |
nextval('calls_seq') * num_statements, | |
0, | |
0, | |
0, | |
0, | |
0, | |
0, | |
0, | |
0, | |
0, | |
0, | |
0, | |
0, | |
0, | |
0 | |
)::statements.statements_history_record | |
); | |
END; | |
$PROC$ LANGUAGE plpgsql; | |
-- CALL populate(); | |
CREATE OR REPLACE FUNCTION populate_src(_num_snapshots integer, datname text, dbid integer) RETURNS void AS $PROC$ | |
DECLARE | |
_now timestamp := now(); | |
ts timestamp; | |
num_statements integer := 500; | |
statements text[]; | |
BEGIN | |
DROP SEQUENCE IF EXISTS calls_seq; | |
CREATE SEQUENCE calls_seq INCREMENT BY 60; | |
-- INSERT INTO statements_src_tmp | |
-- SELECT '0.0.0.0', 2345, *, 1, 'toto', 1, 'mydb', 1, _statement, nextval('calls_seq'), 10, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0 FROM generate_series(_1monthago, _1weekago, '1 minute'); | |
SELECT array(select random_phrase() FROM generate_series(0, num_statements)) | |
INTO statements; | |
FOR n IN REVERSE _num_snapshots..0 LOOP | |
PERFORM nextval('calls_seq'); | |
IF (_num_snapshots - n) % 100 = 0 THEN | |
RAISE NOTICE 'snapshot % / %, calls %', _num_snapshots - n, _num_snapshots, currval('calls_seq'); | |
RAISE NOTICE '%', clock_timestamp(); | |
END IF; | |
ts = _now - (n || ' minutes')::INTERVAL; | |
FOR i IN 1..num_statements LOOP | |
INSERT INTO statements_src_tmp | |
VALUES ( | |
'0.0.0.0', | |
2345, | |
ts, | |
1, | |
'toto', | |
dbid, | |
datname, | |
i, | |
statements[i], | |
currval('calls_seq'), | |
10, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0 | |
); | |
END LOOP; | |
PERFORM process_statements('0.0.0.0', 2345); | |
END LOOP; | |
-- INSERT INTO statements_src_tmp | |
-- SELECT '0.0.0.0', 2345, *, 1, 'toto', 1, 'mydb', 1, _statement, currval('calls_seq'), 10, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0 FROM generate_series(_1dayago + interval '1 minute', _now, '1 minute'); | |
-- Même base, autre queryid | |
-- DROP SEQUENCE calls_seq; | |
-- CREATE SEQUENCE calls_seq INCREMENT BY 6000; | |
-- INSERT INTO statements_src_tmp | |
-- SELECT '0.0.0.0', 2345, *, 1, 'toto', 1, 'mydb', 321, 'SELECT $1 + $2', nextval('calls_seq'), 10, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0 FROM generate_series(_1weekago, _now, '1 minute'); | |
-- -- Autre base, même queryid | |
-- DROP SEQUENCE calls_seq; | |
-- CREATE SEQUENCE calls_seq INCREMENT BY 6000; | |
-- INSERT INTO statements_src_tmp | |
-- SELECT '0.0.0.0', 2345, *, 1, 'toto', 123, 'other_db', 1, _statement, nextval('calls_seq'), 10, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0 FROM generate_series(_1weekago, _now, '1 minute'); | |
RAISE NOTICE 'populated'; | |
END; | |
$PROC$ LANGUAGE plpgsql; /* end of powa_statements_src */ | |
DROP INDEX statements_history_current_agent_address_agent_port_dbid_us_idx; | |
DROP INDEX statements_history_current_tstzrange_idx; | |
DROP INDEX statements_history_current_db_agent_address_agent_port_dbid_idx; | |
DROP INDEX statements_history_current_db_agent_address_agent_port_idx; | |
DROP INDEX statements_history_current_db_tstzrange_idx; | |
DROP INDEX statements_history_agent_address_agent_port_queryid_coalesc_idx; | |
DROP INDEX statements_history_db_agent_address_agent_port_dbid_coalesc_idx; | |
SELECT populate_src(60 * 24 * 4, 'new_database', 1); -- 10 days | |
-- SELECT populate_src((now() - interval '5 hours')::timestamp, (now() - interval '4 hour')::timestamp, 1, 'mydb'); | |
-- SELECT process_statements('0.0.0.0', 2345); | |
-- SELECT populate_src((now() - interval '4 hours')::timestamp, (now() - interval '3 hour')::timestamp, 1, 'mydb'); | |
-- SELECT process_statements('0.0.0.0', 2345); | |
-- SELECT populate_src((now() - interval '3 hours')::timestamp, (now() - interval '2 hour')::timestamp, 1, 'mydb'); | |
-- SELECT process_statements('0.0.0.0', 2345); | |
-- SELECT populate_src((now() - interval '2 hours')::timestamp, (now() - interval '1 hour')::timestamp, 1, 'mydb'); | |
-- SELECT process_statements('0.0.0.0', 2345); | |
-- DROP FUNCTION populate_src(); | |
VACUUM ANALYZE; | |
SELECT count(*) from statements_history_current_db; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
SET search_path TO statements, public; | |
TRUNCATE statements_src_tmp; | |
TRUNCATE statements_history_current; | |
TRUNCATE statements_history_current_db; | |
TRUNCATE statements_history; | |
TRUNCATE statements_history_db; | |
TRUNCATE statements CASCADE; | |
INSERT INTO application.instances (agent_address, agent_port, hostname) | |
VALUES ('0.0.0.0', 2345, 'instance.fqdn'); | |
CREATE OR REPLACE FUNCTION populate_src(_num_snapshots integer) RETURNS void AS $PROC$ | |
DECLARE | |
ts timestamp; | |
num_statements integer := 500; | |
statements text[]; | |
BEGIN | |
FOR n IN REVERSE _num_snapshots..0 LOOP | |
ts = clock_timestamp() - (n || ' minutes')::interval; | |
IF n % 100 = 0 THEN | |
RAISE NOTICE 'snapshot % / %', n, _num_snapshots; | |
RAISE NOTICE '%', ts; | |
END IF; | |
INSERT INTO statements_src_tmp | |
SELECT | |
'0.0.0.0', | |
2345, | |
ts, | |
userid, | |
rolname, | |
dbid, | |
datname, | |
queryid, | |
query, | |
calls, | |
total_time, | |
rows, | |
shared_blks_hit, | |
shared_blks_read, | |
shared_blks_dirtied, | |
shared_blks_written, | |
local_blks_hit, | |
local_blks_read, | |
local_blks_dirtied, | |
local_blks_written, | |
temp_blks_read, | |
temp_blks_written, | |
blk_read_time, | |
blk_write_time | |
FROM pg_stat_statements pgss | |
JOIN pg_authid ON pgss.userid = pg_authid.oid | |
JOIN pg_database ON pgss.dbid = pg_database.oid; | |
PERFORM process_statements('0.0.0.0', 2345); | |
END LOOP; | |
RAISE NOTICE 'populated'; | |
END; | |
$PROC$ LANGUAGE plpgsql; /* end of powa_statements_src */ | |
-- SELECT populate_src(60 * 24 * 10, 'new_database', 1); -- 10 days | |
SELECT populate_src(60 * 24 * 1); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment