-- Definitions for a set of informational views and useful procedures for working with compressed hypertables.
----------------------------------------------------------------------------------------------------
---- VIEWS ------------------------------------------------------------------------------------------
----------------------------------------------------------------------------------------------------
drop view if exists chunks_ts, chunks_tstz, chunk_stats, jobs, job_stats, compression_stats;
---- chunks_tstz -----------------------------------------------------------------------------------
/* Show time range, hypertable, and tablespace for chunks belonging to
hypertables whose time dimension is a `timestamptz` column. Time range bounds
are rendered as `timestamptz` values. Chunks belonging to hypertables that use
an integer time dimension are not shown.
*/
create or replace view chunks_tstz as
select ch.table_name as chunk
, format('%1$I.%2$I', ch.schema_name, ch.table_name)::regclass as qualified_chunk
, format('%1$I.%2$I', ht.schema_name, ht.table_name)::regclass as hypertable
, tstzrange(case
when sl.range_start = -9223372036854775808 then null
else _timescaledb_internal.to_timestamp(sl.range_start) end,
case
when sl.range_end = 9223372036854775807 then null
else _timescaledb_internal.to_timestamp(sl.range_end) end)
as time_range
, (select spcname -- null means the chunk lives in the default tablespace
from pg_class c
left join pg_tablespace ts on c.reltablespace = ts.oid
where c.oid = format('%1$I.%2$I', ch.schema_name, ch.table_name)::regclass) as tablespace
from _timescaledb_catalog.chunk ch
join _timescaledb_catalog.hypertable ht on ch.hypertable_id = ht.id
join _timescaledb_catalog.dimension di on di.hypertable_id = ht.id
join _timescaledb_catalog.chunk_constraint cn on cn.chunk_id = ch.id
join _timescaledb_catalog.dimension_slice sl on cn.dimension_slice_id = sl.id
where column_type = 'timestamptz'::regtype;
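/* Example usage, assuming a hypertable named `conditions` exists (the name is
illustrative): list its chunks with their time ranges and tablespaces.
    select chunk, time_range, tablespace
    from chunks_tstz
    where hypertable = 'conditions'::regclass
    order by lower(time_range);
*/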
---- chunks_ts -----------------------------------------------------------------------------------
/* Show time range, hypertable, and tablespace for chunks belonging to
hypertables whose time dimension is a `timestamp` (without time zone) column.
Time range bounds are rendered as naive timestamps. Chunks belonging to
hypertables that use an integer time dimension are not shown.
*/
create or replace view chunks_ts as
select format('%1$I.%2$I', ch.schema_name, ch.table_name)::regclass as chunk
, format('%1$I.%2$I', ht.schema_name, ht.table_name)::regclass as hypertable
, tsrange(case
when sl.range_start = -9223372036854775808 then null
else _timescaledb_internal.to_timestamp_without_timezone(sl.range_start) end,
case
when sl.range_end = 9223372036854775807 then null
else _timescaledb_internal.to_timestamp_without_timezone(sl.range_end) end)
as time_range
, (select spcname -- null means the chunk lives in the default tablespace
from pg_class c
left join pg_tablespace ts on c.reltablespace = ts.oid
where c.oid = format('%1$I.%2$I', ch.schema_name, ch.table_name)::regclass) as tablespace
from _timescaledb_catalog.chunk ch
join _timescaledb_catalog.hypertable ht on ch.hypertable_id = ht.id
join _timescaledb_catalog.dimension di on di.hypertable_id = ht.id
join _timescaledb_catalog.chunk_constraint cn on cn.chunk_id = ch.id
join _timescaledb_catalog.dimension_slice sl on cn.dimension_slice_id = sl.id
where column_type = 'timestamp'::regtype;
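/* Example usage, assuming a hypothetical hypertable `metrics` whose time column
is a naive timestamp: find the chunk covering a given point in time.
    select chunk
    from chunks_ts
    where hypertable = 'metrics'::regclass
    and time_range @> '2022-01-15 00:00'::timestamp;
*/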
---- chunk_stats -----------------------------------------------------------------------------------
/* All of the chunk information for every hypertable, rolled into one view. */
create or replace view chunk_stats as
select *
from (
select hypertable_schema,
h.hypertable_name,
num_chunks as hypertable_num_chunks,
owner,
num_dimensions,
compression_enabled,
sizes.chunk_schema,
sizes.chunk_name,
qualified_chunk,
lower(time_range) as chunk_start_at,
upper(time_range) as chunk_end_at,
tablespace,
table_bytes,
index_bytes,
toast_bytes,
total_bytes,
pg_size_pretty(table_bytes) as table_size,
pg_size_pretty(index_bytes) as index_size,
pg_size_pretty(toast_bytes) as toast_size,
pg_size_pretty(total_bytes) as total_size,
sizes.node_name,
compression_status,
before_compression_table_bytes,
before_compression_index_bytes,
before_compression_toast_bytes,
before_compression_total_bytes,
after_compression_table_bytes,
after_compression_index_bytes,
after_compression_toast_bytes,
after_compression_total_bytes,
pg_size_pretty(before_compression_table_bytes) as before_compression_table_size,
pg_size_pretty(before_compression_index_bytes) as before_compression_index_size,
pg_size_pretty(before_compression_toast_bytes) as before_compression_toast_size,
pg_size_pretty(before_compression_total_bytes) as before_compression_total_size,
pg_size_pretty(after_compression_table_bytes) as after_compression_table_size,
pg_size_pretty(after_compression_index_bytes) as after_compression_index_size,
pg_size_pretty(after_compression_toast_bytes) as after_compression_toast_size,
pg_size_pretty(after_compression_total_bytes) as after_compression_total_size
from timescaledb_information.hypertables h
cross join chunks_detailed_size(format('%I.%I', h.hypertable_schema, h.hypertable_name)::regclass) sizes
left join (
select h.hypertable_name,
chunk_schema,
chunk_name,
compression_status,
before_compression_table_bytes,
before_compression_index_bytes,
before_compression_toast_bytes,
before_compression_total_bytes,
after_compression_table_bytes,
after_compression_index_bytes,
after_compression_toast_bytes,
after_compression_total_bytes,
node_name
from timescaledb_information.hypertables h
cross join chunk_compression_stats(format('%I.%I', h.hypertable_schema, h.hypertable_name)::regclass) stats) stats
on stats.chunk_name = sizes.chunk_name
left join chunks_tstz c on sizes.chunk_name = c.chunk) chunk_mecca;
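/* Example usage: list the ten largest chunks across all hypertables, along
with their compression status.
    select hypertable_name, chunk_name, total_size, compression_status
    from chunk_stats
    order by total_bytes desc
    limit 10;
*/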
---- jobs ------------------------------------------------------------------------------------------
/* Summary information on TimescaleDB background jobs */
create or replace view jobs as
select coalesce(ca.view_name, j.hypertable_name) as name,
job_id,
next_start,
j.hypertable_name,
application_name,
scheduled,
schedule_interval,
max_runtime,
max_retries,
retry_period,
config,
j.hypertable_schema,
owner,
proc_schema,
proc_name
from timescaledb_information.jobs j
left outer join timescaledb_information.continuous_aggregates ca
on j.hypertable_name = ca.materialization_hypertable_name
where coalesce(ca.view_name, j.hypertable_name) is not null;
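/* Example usage, again with a hypothetical hypertable `conditions`: show the
schedule and configuration of every job attached to it.
    select name, application_name, schedule_interval, next_start, config
    from jobs
    where hypertable_name = 'conditions';
*/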
---- job_stats -------------------------------------------------------------------------------------
/* Detailed run statistics for TimescaleDB background jobs */
create or replace view job_stats as
select j.name,
j.job_id,
j.application_name,
j.hypertable_name,
last_run_started_at,
last_successful_finish,
last_run_status,
job_status,
last_run_duration,
js.next_start,
total_runs,
total_successes,
total_failures
from jobs j
left outer join timescaledb_information.job_stats js on j.hypertable_name = js.hypertable_name
where name is not null;
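/* Example usage: surface jobs that have failed at least once.
    select name, application_name, last_run_status, total_failures, next_start
    from job_stats
    where total_failures > 0;
*/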
---- compression_stats ------------------------------------------------------------------------------
/* Compression efficiency statistics */
create or replace view compression_stats as
select hypertable_name,
pg_size_pretty(sum(before_compression_total_bytes)) as original_size,
pg_size_pretty(sum(after_compression_total_bytes)) as compressed_size,
round(sum(before_compression_index_bytes)::numeric / nullif(sum(after_compression_index_bytes), 0), 2) as ratio_index,
round(sum(before_compression_table_bytes)::numeric / nullif(sum(after_compression_table_bytes), 0), 2) as ratio_table,
round(sum(before_compression_toast_bytes)::numeric / nullif(sum(after_compression_toast_bytes), 0), 2) as ratio_toast,
round(sum(before_compression_total_bytes)::numeric / nullif(sum(after_compression_total_bytes), 0), 2) as ratio_total
from chunk_stats
where compression_status = 'Compressed'
group by 1;
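/* Example usage: report per-hypertable compression savings, best ratio first.
    select hypertable_name, original_size, compressed_size, ratio_total
    from compression_stats
    order by ratio_total desc;
*/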
----------------------------------------------------------------------------------------------------
---- PROCEDURES ------------------------------------------------------------------------------------
----------------------------------------------------------------------------------------------------
drop procedure if exists decompress_chunks(regclass, timestamptz, timestamptz);
create or replace procedure decompress_chunks(hypertable regclass, start_at timestamptz default null,
end_at timestamptz default null
)
language plpgsql
as
$$
/* Decompress all compressed chunks for the given hypertable within the specified time bounds. Chunks that
partially overlap the given time range will be included as candidates for decompression. If `start_at` is not
specified, it is interpreted as the beginning of the hypertable. Likewise, if `end_at` is not specified,
it is interpreted as the end of the hypertable. Thus, if neither `start_at` nor `end_at` is specified, all
chunks in the hypertable will be decompressed.
Examples:
# decompress all chunks between 2021-11-01 and 2022-03-01
call decompress_chunks('mytable'::regclass, '2021-11-01'::timestamptz, '2022-03-01'::timestamptz);
# decompress all chunks that started on or before 2022-03-01
call decompress_chunks('mytable'::regclass, null, '2022-03-01'::timestamptz);
# decompress all chunks that ended on or after 2021-11-01
call decompress_chunks('mytable'::regclass, '2021-11-01'::timestamptz, null);
# decompress all chunks in the hypertable
call decompress_chunks('mytable'::regclass, null, null);
# decompress all chunks in the hypertable
call decompress_chunks('mytable'::regclass);
*/
declare
processed_count int = 0;
item record;
chunk_count int;
earliest_chunk_start_at timestamptz;
latest_chunk_end_at timestamptz;
begin
set search_path to _timescaledb_internal, public, timescaledb_information;
create temporary table compressed_chunks
(
chunk_name regclass,
chunk_start_at timestamptz,
chunk_end_at timestamptz
)
on commit drop;
insert into compressed_chunks(chunk_name, chunk_start_at, chunk_end_at)
select qualified_chunk as chunk_name, chunk_start_at, chunk_end_at
from chunk_stats
where hypertable_name::regclass = hypertable
and (chunk_end_at >= start_at or start_at is null) -- evaluate to true if start_at param wasn't specified
and (chunk_start_at <= end_at or end_at is null) -- evaluate to true if end_at param wasn't specified
and lower(compression_status) = 'compressed'
order by chunk_start_at;
---
select count(*), min(chunk_start_at), max(chunk_end_at)
into chunk_count, earliest_chunk_start_at, latest_chunk_end_at
from compressed_chunks;
raise notice '[%] Found % compressed chunks between % and % (earliest_chunk_start_at=%, latest_chunk_end_at=%)', hypertable, chunk_count, start_at, end_at, earliest_chunk_start_at, latest_chunk_end_at;
-- loop through ranges and execute updates
for item in (select *
from compressed_chunks
)
loop
perform decompress_chunk(item.chunk_name::regclass, if_compressed := true);
processed_count := processed_count + 1;
raise notice '[%] Completed decompressing chunk %/% (chunk_name=%)', hypertable, processed_count, chunk_count, item.chunk_name;
end loop;
end;
$$;
drop procedure if exists compress_chunks(regclass, timestamptz, timestamptz);
create or replace procedure compress_chunks(hypertable regclass, start_at timestamptz default null,
end_at timestamptz default null
)
language plpgsql
as
$$
/* Compress all uncompressed chunks for the given hypertable within the specified time bounds. Chunks that
partially overlap the given time range will be included as candidates for compression. If `start_at` is
not specified, it is interpreted as the beginning of the hypertable. Likewise, if `end_at` is not specified,
it is interpreted as the end of the hypertable. Thus, if neither `start_at` nor `end_at` is specified, all
chunks in the hypertable will be compressed (except any chunks protected by the hypertable's compression policy).
For example, if the hypertable has a compression policy that compresses chunks after 90 days, chunks that
overlap the interval from now() - interval '90 days' onward are excluded from compression.
Exceptions: An exception is raised if compression is not enabled on the specified hypertable.
Examples:
# compress all chunks between 2021-11-01 and 2022-03-01
call compress_chunks('mytable'::regclass, '2021-11-01'::timestamptz, '2022-03-01'::timestamptz);
# compress all chunks that started on or before 2022-03-01
call compress_chunks('mytable'::regclass, null, '2022-03-01'::timestamptz);
# compress all chunks that ended on or after 2021-11-01
call compress_chunks('mytable'::regclass, '2021-11-01'::timestamptz, null);
# compress all chunks in the hypertable (still subject to the compression policy, if any)
call compress_chunks('mytable'::regclass, null, null);
# compress all chunks in the hypertable (except the chunks protected by the hypertable's compression policy)
call compress_chunks('mytable'::regclass);
*/
declare
processed_count int = 0;
item record;
chunk_count int;
earliest_chunk_start_at timestamptz;
latest_chunk_end_at timestamptz;
compression_is_enabled bool = false;
compress_after interval default null;
begin
set session search_path to _timescaledb_internal, public, timescaledb_information;
select compression_enabled
into compression_is_enabled
from timescaledb_information.hypertables
where hypertable_name::regclass = hypertable;
if compression_is_enabled is distinct from true then -- null when the hypertable was not found
raise exception '[%] Hypertable does not have compression enabled', hypertable;
end if;
select config ->> 'compress_after'
into compress_after
from jobs
where hypertable_name::regclass = hypertable
and proc_name = 'policy_compression'; -- only consider the compression policy job
if compress_after is not null then
raise notice '[%] Hypertable''s compression policy is set to compress after %', hypertable, compress_after;
end if;
-- never compress chunks that are protected by the hypertable's compression policy;
-- least() ignores nulls, so this is a no-op when compress_after is null
end_at = least(end_at, transaction_timestamp() - compress_after::interval);
create temporary table uncompressed_chunks
(
chunk_name regclass,
chunk_start_at timestamptz,
chunk_end_at timestamptz
)
on commit drop;
insert into uncompressed_chunks(chunk_name, chunk_start_at, chunk_end_at)
select qualified_chunk as chunk_name, chunk_start_at, chunk_end_at
from chunk_stats
where hypertable_name::regclass = hypertable
and (chunk_end_at >= start_at or start_at is null) -- filter by start_at if start_at is not null, else evaluate to true
and (chunk_start_at <= end_at or end_at is null) -- if end_at wasn't specified and the hypertable doesn't have a compression policy, always evaluate to true
and lower(compression_status) = 'uncompressed'
order by chunk_start_at;
---
select count(*), min(chunk_start_at), max(chunk_end_at)
into chunk_count, earliest_chunk_start_at, latest_chunk_end_at
from uncompressed_chunks;
raise notice '[%] Found % uncompressed chunks between % and % (earliest_chunk_start_at=%, latest_chunk_end_at=%)', hypertable, chunk_count, start_at, end_at, earliest_chunk_start_at, latest_chunk_end_at;
-- loop through ranges and execute updates
for item in (select * from uncompressed_chunks)
loop
perform compress_chunk(item.chunk_name::regclass, if_not_compressed := true);
processed_count := processed_count + 1;
raise notice '[%] Completed compressing chunk %/% (chunk_name=%)', hypertable, processed_count, chunk_count, item.chunk_name;
end loop;
end;
$$;
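/* Example end-to-end usage, assuming a hypertable named `conditions` with
compression enabled: compress everything that started before 2022-01-01, then
inspect the savings.
    call compress_chunks('conditions'::regclass, null, '2022-01-01'::timestamptz);
    select * from compression_stats where hypertable_name = 'conditions';
*/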