Created
November 11, 2020 11:54
-
-
Save loganlinn/72315449d13d94d6fde71fb7643db55c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import logging | |
import os | |
import sys | |
from inspect import cleandoc | |
from collections import namedtuple | |
import clickhouse_driver | |
# Refuse to run on Python 2. An explicit check is used instead of `assert`,
# which is stripped when Python runs with -O.
if sys.version_info < (3,):
    raise SystemExit("This script requires Python 3")

# Name the logger after the running script's file when there is one; fall back
# to this module's name when __main__ has no __file__ (interactive sessions,
# `python -c`, embedded runners), where reading the attribute directly would
# raise AttributeError at import time.
logger = logging.getLogger(
    getattr(sys.modules.get("__main__"), "__file__", None) or __name__
)
def expand_macro(client, macro):
    """Return the value of the macro named *macro* as seen by *client*'s server.

    The macro name is sent as a bound query parameter to ``getMacro`` and the
    single scalar of the one-row result is returned.
    """
    params = {"macro": macro}
    rows = client.execute("SELECT getMacro(%(macro)s)", params)
    first_row = rows[0]
    return first_row[0]
def get_cluster_hosts(client, cluster):
    """Return the fully-qualified domain name of every host in *cluster*.

    ``system.one`` is read through the ``cluster()`` table function so that
    each result row carries the ``fqdn()`` of one host. The names are returned
    as a list, in the order the driver yields the rows.
    """
    query = "SELECT fqdn() FROM cluster(%(cluster)s, system.one)"
    rows = client.execute(query, {"cluster": cluster})
    hosts = []
    for row in rows:
        hosts.append(row[0])
    return hosts
# Result of get_create_distributed_objects(): two parallel tuples where
# create_queries[i] is the CREATE statement for the object names[i].
DistributedObjects = namedtuple("DistributedObjects", ["names", "create_queries"])


def _execute_two_columns(client, query):
    """Run *query* column-wise and return its two result columns as tuples."""
    # NOTE: clickhouse_driver returns an empty list (not a pair of empty
    # columns) for a columnar query that yields no rows; unpacking that
    # directly raises ValueError, so treat it as two empty columns.
    result = client.execute(query, columnar=True)
    if not result:
        return (), ()
    first, second = result
    return tuple(first), tuple(second)


def get_create_distributed_objects(client, hosts):
    """Collect the CREATE statements needed to recreate the cluster's Distributed tables.

    Both queries read ``system.tables`` from every host in *hosts* through the
    ``remote()`` table function, with ``skip_unavailable_shards = 1`` so that
    unreachable hosts are skipped.

    Args:
        client: clickhouse_driver client used to run the discovery queries.
        hosts: host names to inspect; they are joined with commas inside a
            ``remote('...')`` call.

    Returns:
        DistributedObjects(names, create_queries), two parallel tuples:
          * one ``CREATE DATABASE IF NOT EXISTS`` statement for each database
            that holds a Distributed table or is referenced by one, followed by
          * the tables' own CREATE statements, rewritten to use
            ``IF NOT EXISTS``, with the tables a Distributed engine points at
            (order 1) sorted before the Distributed tables themselves (order 2).
        Both tuples are empty when the cluster has no Distributed tables.
    """
    cluster_tables = "remote('{}', system, tables)".format(",".join(hosts))
    create_db_query = """
SELECT
DISTINCT database AS name,
concat('CREATE DATABASE IF NOT EXISTS "', name, '"') AS create_query
FROM
(
SELECT DISTINCT arrayJoin([database, extract(engine_full, 'Distributed\\\\([^,]+, *\\\'?([^,\\\']+)\\\'?, *[^,]+')]) database
FROM {cluster_tables} tables
WHERE engine = 'Distributed'
SETTINGS skip_unavailable_shards = 1
)""".format(
        cluster_tables=cluster_tables
    )
    create_tables_query = """
SELECT DISTINCT
concat(database, '.', name) AS name,
replaceRegexpOne(create_table_query, 'CREATE (TABLE|VIEW|MATERIALIZED VIEW)', 'CREATE \\\\1 IF NOT EXISTS')
FROM
(
SELECT
database,
name,
create_table_query,
2 AS order
FROM {cluster_tables} tables
WHERE engine = 'Distributed'
SETTINGS skip_unavailable_shards = 1
UNION ALL
SELECT
extract(engine_full, 'Distributed\\\\([^,]+, *\\\'?([^,\\\']+)\\\'?, *[^,]+') AS database,
extract(engine_full, 'Distributed\\\\([^,]+, [^,]+, *\\\'?([^,\\\\\\\')]+)') AS name,
t.create_table_query,
1 AS order
FROM {cluster_tables} tables
LEFT JOIN (
SELECT DISTINCT database, name, create_table_query
FROM {cluster_tables}
SETTINGS skip_unavailable_shards = 1
) t USING (database, name)
WHERE engine = 'Distributed' AND t.create_table_query != ''
SETTINGS skip_unavailable_shards = 1
) tables
ORDER BY order
""".format(
        cluster_tables=cluster_tables
    )
    db_names, create_dbs = _execute_two_columns(client, create_db_query)
    table_names, create_tables = _execute_two_columns(client, create_tables_query)
    # Databases first, so every table's database exists before the table is created.
    return DistributedObjects(db_names + table_names, create_dbs + create_tables)
def get_client(host_or_url):
    """Build a ``clickhouse_driver.Client`` from a host name or a DSN URL.

    Values starting with ``clickhouse://`` or ``clickhouses://`` (the driver's
    secure-connection scheme) are passed to ``Client.from_url``. Anything else
    is converted to ``str`` and used as the host name with the driver's
    default connection settings.
    """
    target = str(host_or_url)
    if target.startswith(("clickhouse://", "clickhouses://")):
        return clickhouse_driver.Client.from_url(target)
    return clickhouse_driver.Client(host=target)
def main():
    """Replay the cluster's Distributed-table schema on every cluster host.

    The first host given on the command line is used to:
      1. resolve the cluster name (``--cluster``, or that server's
         ``cluster`` macro),
      2. list the cluster's hosts, and
      3. collect the CREATE statements for the Distributed tables.
    Each statement is then run on every cluster host when ``--execute`` is
    given; otherwise it is only logged at DEBUG level (dry run).

    Returns:
        Process exit status (always 0; failures propagate as exceptions).
    """
    args = parse_args()
    setup_logging(args.verbosity)

    client = get_client(args.hosts[0])
    try:
        cluster_name = args.cluster or expand_macro(client, "cluster")
        cluster_hosts = get_cluster_hosts(client, cluster_name)
        names, create_queries = get_create_distributed_objects(client, cluster_hosts)
    finally:
        # Release the discovery connection even if a query failed.
        client.disconnect()

    logger.info("Executing CREATE commands for: %s", names)
    for cluster_host in cluster_hosts:
        host_client = get_client(cluster_host)
        try:
            for query in create_queries:
                # TODO retry attempts
                if args.execute:
                    logger.debug("EXECUTE: host=%s query=%s", cluster_host, query)
                    host_client.execute(query)
                else:
                    logger.debug("SKIP: host=%s query=%s", cluster_host, query)
        finally:
            # One connection per host; close it before moving to the next host.
            host_client.disconnect()
    return 0
def parse_args(argv=None):
    """Parse the command line.

    Args:
        argv: argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        argparse.Namespace with ``execute`` (bool), ``cluster`` (str or None),
        ``hosts`` (non-empty list of str) and ``verbosity`` (int: ``-q``
        gives -1, each ``-v`` adds 1, default 0).
    """
    parser = argparse.ArgumentParser(description="Manage schema of ClickHouse cluster.")
    parser.add_argument(
        "--execute",
        action="store_true",
        help="run the CREATE statements on every cluster host (default: dry run, only log them)",
    )
    parser.add_argument(
        "--cluster",
        help="cluster name (default: the 'cluster' macro of the first host)",
    )
    parser.add_argument(
        "hosts",
        nargs="+",
        help="host name or clickhouse:// URL; the first one is used to discover the cluster",
    )
    # -v and -q deliberately share one destination so setup_logging() sees a
    # single signed verbosity value.
    parser.add_argument(
        "-v",
        "--verbose",
        action="count",
        default=0,
        dest="verbosity",
        help="Verbosity (-v, -vv, etc)",
    )
    parser.add_argument(
        "-q",
        "--quiet",
        action="store_const",
        const=-1,
        default=0,
        dest="verbosity",
        help="quiet output (show errors only)",
    )
    return parser.parse_args(argv)
def setup_logging(verbosity):
    """Configure root logging from ``$LOGLEVEL`` adjusted by *verbosity*.

    ``$LOGLEVEL`` (default ``WARNING``; case-insensitive, surrounding
    whitespace ignored) names a ``logging`` level or gives it as a number.
    Each verbosity step, capped at 2, lowers that base level by 10, and a
    negative verbosity (``-q`` gives -1) raises it by 10.

    An unknown ``$LOGLEVEL`` value falls back to WARNING and logs a warning.
    Such a value would otherwise raise AttributeError or TypeError here.
    """
    level_name = os.getenv("LOGLEVEL", "WARNING").strip().upper()
    if level_name.isdigit():
        base_loglevel = int(level_name)
    else:
        base_loglevel = getattr(logging, level_name, None)
    # Only genuine integer levels are accepted; this rejects names such as
    # "LOGGER" or "BASIC_FORMAT", which resolve to other module attributes.
    valid = isinstance(base_loglevel, int) and not isinstance(base_loglevel, bool)
    if not valid:
        base_loglevel = logging.WARNING
    verbosity = min(verbosity, 2)
    loglevel = base_loglevel - (verbosity * 10)
    logging.basicConfig(level=loglevel, format="[%(levelname)s] %(message)s")
    if not valid:
        logging.getLogger(__name__).warning(
            "Ignoring unknown LOGLEVEL=%r; using WARNING", level_name
        )
if __name__ == "__main__":
    # sys.exit is always available; the bare `exit` builtin is injected by the
    # `site` module and is missing under `python -S` or in frozen builds.
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment