Created
February 18, 2020 18:16
-
-
Save Andrei-Pozolotin/7bb44f13eeb26512d3c8196b14101f9d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
""" | |
import tester | |
import logging | |
from logging.config import fileConfig | |
from sqlalchemy import pool | |
from sqlalchemy import engine_from_config | |
from alembic import context | |
logger = logging.getLogger(__name__) | |
# this is the Alembic Config object, which provides | |
# access to the values within the .ini file in use. | |
config = context.config | |
# Interpret the config file for Python logging. | |
# This line sets up loggers basically. | |
fileConfig(config.config_file_name) | |
# add your model's MetaData object here | |
# for 'autogenerate' support | |
from tester.dbms.alembic import AlembicUnit | |
target_metadata = AlembicUnit.target_metadata | |
# other values from the config, defined by the needs of env.py, | |
# can be acquired: | |
# my_important_option = config.get_main_option("my_important_option") | |
# ... etc. | |
def run_migrations_offline(): | |
""" | |
Run migrations in 'offline' mode. | |
""" | |
url = config.get_main_option("sqlalchemy.url") | |
context.configure( | |
url=url, | |
target_metadata=target_metadata, | |
literal_binds=True, | |
dialect_opts={"paramstyle": "named"}, | |
) | |
with context.begin_transaction(): | |
context.run_migrations() | |
def run_migrations_online(): | |
""" | |
Run migrations in 'online' mode. | |
""" | |
def process_revision_directives(context, revision, directives): | |
if config.cmd_opts.autogenerate: | |
script = directives[0] | |
if script.upgrade_ops.is_empty(): | |
directives[:] = [] | |
engine = engine_from_config( | |
config.get_section(config.config_ini_section), | |
prefix="sqlalchemy.", | |
poolclass=pool.NullPool, | |
) | |
with engine.connect() as connection: | |
AlembicUnit.attach_intercept(connection) | |
context.configure( | |
compare_type=True, | |
include_schemas=True, | |
connection=connection, | |
target_metadata=target_metadata, | |
process_revision_directives=process_revision_directives, | |
) | |
with context.begin_transaction(): | |
context.execute('SET search_path TO public') | |
context.run_migrations() | |
if context.is_offline_mode(): | |
run_migrations_offline() | |
else: | |
run_migrations_online() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
schema evolution support | |
""" | |
from contextvars import ContextVar | |
import sqlalchemy as sa | |
from alembic.ddl import base as ale_ddl | |
from sqlalchemy import engine | |
from sqlalchemy import event | |
from sqlalchemy.sql import ddl as sa_ddl | |
from tester.dbms.filer.base import ClusterFilerDBMS | |
from tester.dbms.model import ModelDBMS | |
from tester.dbms.tracker.base import ClusterTrackerDBMS | |
alembic_table_name = ContextVar("alembic_table_name", default=None) | |
alembic_visit_name = ContextVar("alembic_visit_name", default=None) | |
class AlembicUnit: | |
"schema evolution support" | |
target_metadata = [ | |
ModelDBMS.metadata, | |
ClusterFilerDBMS.metadata, | |
ClusterTrackerDBMS.metadata, | |
] | |
@classmethod | |
def report_mapping(cls, source:str, target:str): | |
"render generated statement changes" | |
print(f"==============") | |
print(f"--- source ---") | |
print(source) | |
print(f"--- target ---") | |
print(target) | |
print(f"--------------") | |
@classmethod | |
def replicate_statement(cls, source:str) -> str: | |
"replicate DDL statement thoughout cluster" | |
table_name = alembic_table_name.get() | |
if table_name: | |
visit_name = alembic_visit_name.get() | |
set_name = "default" | |
if visit_name == "create_table": | |
target = f"SELECT pglogical_replicate_create('{set_name}','{table_name}',$${source}$$)" | |
elif visit_name == "drop_table": | |
target = f"SELECT pglogical_replicate_delete('{set_name}','{table_name}',$${source}$$)" | |
else: | |
target = f"SELECT pglogical_replicate_modify('{set_name}','{table_name}',$${source}$$)" | |
cls.report_mapping(source, target) | |
else: | |
target = source | |
return target | |
@classmethod | |
def attach_intercept(cls, connection:engine.Connection): | |
"setup ddl statement cluster replication" | |
@event.listens_for(connection, "before_execute", retval=True) | |
def intercept_clause(conn, clause, multi_param, param): | |
"extract table name and command type" | |
if isinstance(clause, sa_ddl.DDLElement): | |
table_name = None | |
visit_name = clause.__visit_name__ | |
if "table" in visit_name: | |
table_name = clause.element.name | |
elif "index" in visit_name: | |
table_name = clause.element.table.name | |
elif isinstance(clause, ale_ddl.AlterTable): | |
table_name = clause.table_name | |
else: | |
assert False, f"no type: visit_name={visit_name} clause={clause} clause_type={type(clause)}" | |
assert isinstance(table_name, str), f"no text: {table_name} {type(table_name)}" | |
alembic_table_name.set(table_name) | |
alembic_visit_name.set(visit_name) | |
else: | |
alembic_table_name.set(None) | |
alembic_visit_name.set(None) | |
return clause, multi_param, param | |
@event.listens_for(connection, "before_cursor_execute", retval=True) | |
def intercept_statement(conn, cursor, statement, parameters, context, executemany): | |
"apply cluster replication command" | |
statement = cls.replicate_statement(statement) | |
return statement, parameters |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--- | |
CREATE OR REPLACE FUNCTION pglogical_table_register( | |
set_name text, relation text | |
) | |
RETURNS text LANGUAGE plpgsql AS $FUN$ | |
BEGIN | |
IF relation LIKE 'cluster_%' THEN | |
RETURN format($$ | |
SELECT pglogical.replication_set_add_table( | |
set_name := '%s', relation := '%s' | |
); | |
$$, set_name, relation); | |
ELSE | |
RETURN ''; | |
END IF; | |
END; | |
$FUN$; | |
--- | |
CREATE OR REPLACE FUNCTION pglogical_table_unregister( | |
set_name text, relation text | |
) | |
RETURNS text LANGUAGE plpgsql AS $FUN$ | |
BEGIN | |
IF relation LIKE 'cluster_%' THEN | |
RETURN format($$ | |
SELECT pglogical.replication_set_remove_table( | |
set_name := '%s', relation := '%s' | |
); | |
$$, set_name, relation); | |
ELSE | |
RETURN ''; | |
END IF; | |
END; | |
$FUN$; | |
-- | |
-- invoke sql on this none | |
-- | |
CREATE OR REPLACE FUNCTION pglogical_replicate_invoke( | |
command text | |
) | |
RETURNS void LANGUAGE plpgsql AS $FUN$ | |
DECLARE | |
cluster_ident_min text := current_setting('cluster.ident_min'); | |
cluster_ident_max text := current_setting('cluster.ident_max'); | |
BEGIN | |
command := replace(command, '{cluster.ident_min}', cluster_ident_min); | |
command := replace(command, '{cluster.ident_max}', cluster_ident_max); | |
command := format($$ | |
SET search_path TO public; | |
%s; | |
$$, command); | |
EXECUTE command; | |
END; | |
$FUN$; | |
-- | |
-- replicate invoke command | |
-- | |
CREATE OR REPLACE FUNCTION pglogical_replicate_modify( | |
set_name text, relation text, command text | |
) | |
RETURNS void LANGUAGE plpgsql AS $FUN$ | |
DECLARE | |
command_modify text := format($SQL$ | |
SET search_path TO public; | |
SELECT pglogical_replicate_invoke($$ %s $$); | |
$SQL$, command); | |
BEGIN | |
PERFORM pglogical.replicate_ddl_command( | |
command := command_modify, replication_sets := ARRAY[set_name] | |
); | |
END; | |
$FUN$; | |
--- | |
CREATE OR REPLACE FUNCTION pglogical_replicate_create( | |
set_name text, relation text, command text | |
) | |
RETURNS void LANGUAGE plpgsql AS $FUN$ | |
DECLARE | |
command_create text := format($$ | |
%s; %s; | |
$$, command, pglogical_table_register(set_name, relation)); | |
BEGIN | |
PERFORM pglogical_replicate_modify(set_name, relation, command_create); | |
END; | |
$FUN$; | |
--- | |
CREATE OR REPLACE FUNCTION pglogical_replicate_delete( | |
set_name text, relation text, command text | |
) | |
RETURNS void LANGUAGE plpgsql AS $FUN$ | |
DECLARE | |
command_delete text := format($$ | |
%s; %s; | |
$$, pglogical_table_unregister(set_name, relation), command); | |
BEGIN | |
PERFORM pglogical_replicate_modify(set_name, relation, command_delete); | |
END; | |
$FUN$; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment