-
-
Save yucer/09c2d1340def91f9427f81e6d24cd4f7 to your computer and use it in GitHub Desktop.
Python Mixin for SQLAlchemy autopartition by week (postgres)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging
from sqlalchemy.sql import text
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
class ArchivableMixin:
    """Mixin that auto-partitions a PostgreSQL table by ISO week.

    ``create_extra`` installs four database objects for the model's table:

    * a "live" view ``v_<table>_live`` restricted to recent rows,
    * a PL/pgSQL trigger function that routes every inserted row into a
      weekly child table (``<table>_IYYY_IW``), creating it on demand,
    * the ``BEFORE INSERT`` trigger that calls that function,
    * a unique index on ``(<partition field>::date, id)``.

    The generated SQL interpolates table and column names directly, so
    they must be trusted identifiers, never user input.
    """

    # Not referenced by any method in this class.
    # NOTE(review): presumably consumed by code elsewhere - confirm.
    _union_cols = []
    _union_json_cols = []  # they need one special typecast

    # Timestamp/date column used to choose the weekly partition.
    _partition_field = 'created_at'

    @classmethod
    def create_extra(cls, engine):
        """Create the live view, partition trigger and date index.

        Calls ``super().create_extra(engine)`` first, so another class in
        the MRO must provide ``create_extra``.

        Also sets ``cls.__liveview_name__`` to ``v_<table>_live``.

        :param engine: object exposing ``execute()``; each generated SQL
            script is wrapped in ``text()`` and passed to it.
        """
        super().create_extra(engine)
        src_table = cls.__tablename__
        cls.__liveview_name__ = f"v_{src_table}_live"
        part_field = cls._partition_field

        # Each script is built immediately before it is executed, in the
        # order below.  The trigger builder is still called with the table
        # name only, so subclass overrides using the original one-argument
        # signature keep working; it reads ``_partition_field`` itself.
        steps = (
            (cls.get_default_live_view_def, (src_table, part_field)),
            (cls.get_create_automanage_fn_trg_def, (src_table,)),
            (cls.get_enable_automanage_trg_def, (src_table,)),
            (cls.get_date_index_def, (src_table, part_field)),
        )
        for build_sql, args in steps:
            # NOTE(review): ``engine.execute`` is the SQLAlchemy 1.x API
            # (removed in 2.0); kept as-is to preserve behaviour.
            engine.execute(text(build_sql(*args)))

    @classmethod
    def get_create_automanage_fn_trg_def(cls, src_table, part_field=None):
        """Return SQL defining the insert-routing trigger function.

        The PL/pgSQL function ``trg_<src_table>_partition()``:

        1. derives the child table name ``<src_table>_IYYY_IW`` and the
           week boundaries from the row's partition column;
        2. if no such table exists in schema ``public``, creates it with
           a CHECK constraint on that week, inheriting from
           ``src_table``;
        3. when the newly created partition covers ``current_date``,
           redefines ``v_<src_table>_live`` to span from one day before
           the week start up to the week end;
        4. inserts the row into the child table and returns NULL, so the
           row is not also stored in the parent table.

        :param src_table: name of the parent (partitioned) table.
        :param part_field: partition column; defaults to
            ``cls._partition_field`` so the trigger agrees with the index
            and the live view built from the same attribute.
        :return: the ``CREATE OR REPLACE FUNCTION`` statement.
        """
        if part_field is None:
            part_field = cls._partition_field
        return (f"""
            CREATE OR REPLACE FUNCTION trg_{src_table}_partition()
            RETURNS trigger AS
            $func$
            DECLARE
                _tbl text := to_char(NEW.{part_field}, '"{src_table}_"IYYY_IW');
                _min_date date := date_trunc('week', NEW.{part_field})::date;
                _max_date date := date_trunc('week', NEW.{part_field})::date + 7;
                _min_live date := date_trunc('week', NEW.{part_field})::date - 1;
            BEGIN
                IF NOT EXISTS (
                    SELECT 1
                    FROM pg_catalog.pg_class c
                    JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
                    WHERE n.nspname = 'public'
                    AND c.relname = _tbl
                    AND c.relkind = 'r') THEN
                    EXECUTE format('CREATE TABLE IF NOT EXISTS %I
                        (CHECK ({part_field}::date >= to_date(%L, ''yyyy-mm-dd'') AND
                                {part_field}::date < to_date(%L, ''yyyy-mm-dd'')),
                         LIKE {src_table} INCLUDING INDEXES )
                        INHERITS ({src_table})'
                        , _tbl
                        , to_char(_min_date, 'YYYY-MM-DD')
                        , to_char(_max_date, 'YYYY-MM-DD')
                        );
                    IF (current_date >= _min_date) and (current_date < _max_date) THEN
                        EXECUTE format('CREATE OR REPLACE VIEW v_{src_table}_live AS (
                            SELECT * FROM {src_table}
                            WHERE ({part_field}::date >= to_date(%L, ''yyyy-mm-dd'') AND
                                   {part_field}::date < to_date(%L, ''yyyy-mm-dd''))
                            )'
                            , to_char(_min_live, 'YYYY-MM-DD')
                            , to_char(_max_date, 'YYYY-MM-DD')
                            );
                    END IF;
                END IF;
                EXECUTE 'INSERT INTO ' || quote_ident(_tbl) || ' VALUES ($1.*)'
                USING NEW;
                RETURN NULL;
            END
            $func$ LANGUAGE plpgsql SET search_path = public;
        """)

    @classmethod
    def get_enable_automanage_trg_def(cls, src_table):
        """Return SQL that (re)creates the ``BEFORE INSERT`` trigger.

        Any existing ``ins_<src_table>`` trigger is dropped first, so the
        script can be run repeatedly.

        :param src_table: name of the parent (partitioned) table.
        :return: a ``DROP TRIGGER`` + ``CREATE TRIGGER`` script.
        """
        return (f"""
            DROP TRIGGER IF EXISTS ins_{src_table} on {src_table};
            CREATE TRIGGER ins_{src_table}
            BEFORE INSERT ON {src_table}
            FOR EACH ROW EXECUTE PROCEDURE trg_{src_table}_partition();
        """)

    @classmethod
    def get_date_index_def(cls, src_table, date_field):
        """Return SQL that (re)creates the unique date index.

        The index covers ``(<date_field>::date DESC NULLS LAST, id)`` and
        is named ``<src_table>_<date_field>_part_idx``.

        :param src_table: name of the table to index.
        :param date_field: column whose date part leads the index.
        :return: a ``drop index`` + ``create unique index`` script.
        """
        return (f"""
            drop index if exists {src_table}_{date_field}_part_idx;
            create unique index {src_table}_{date_field}_part_idx
            ON {src_table}(({date_field}::date) DESC NULLS LAST, id);
        """)

    @classmethod
    def get_default_live_view_def(cls, src_table, date_field):
        """Return SQL for the initial "live" view over recent rows.

        The view selects rows whose date is ``current_date - 1`` or
        later.  There is no planner optimization with this query because
        ``current_date`` is dynamic.

        :param src_table: name of the table the view reads from.
        :param date_field: column compared against ``current_date``.
        :return: the ``CREATE OR REPLACE VIEW`` statement.
        """
        # ``__liveview_name__`` is only assigned by ``create_extra``; fall
        # back to the same naming scheme so this method can also be used
        # on its own instead of raising AttributeError.
        view_name = getattr(cls, "__liveview_name__", None)
        if view_name is None:
            view_name = f"v_{src_table}_live"
        return (f"""
            CREATE OR REPLACE VIEW {view_name} AS
            (
            SELECT * FROM {src_table}
            WHERE ({date_field}::date >= current_date - 1)
            );
        """)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment