-
-
Save mcarbonneaux/f0b9d70a666dd804cd6581abbe9e67e8 to your computer and use it in GitHub Desktop.
clean old partitions in clickhouse
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
databases: | |
- logs: 7 | |
- vms: 9 | |
- statistics: 20 | |
connect_line: '127.0.0.1' | |
log_file: 'cl_clean_partitions.log' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import logging | |
import yaml | |
import argparse | |
import os | |
import os.path | |
import time | |
import datetime | |
import clickhouse_driver | |
parser = argparse.ArgumentParser(description='Clean old clickhouse partitions') | |
parser.add_argument('--config', dest='config_path', help='path to config path') | |
parser.add_argument('-s', '--simulate', action='store_true') | |
parser.add_argument('--debug', action='store_true') | |
args = parser.parse_args() | |
#print(args.config_path) | |
#print(args.simulate) | |
if args.config_path == None: | |
print("Need config path via --config flag") | |
os._exit(1) | |
if not os.path.isfile(args.config_path): | |
print("Config file with name \"%s\" doesnt exist" % args.config_path) | |
os._exit(1) | |
with open(args.config_path, 'r') as stream: | |
try: | |
config = yaml.load(stream) | |
except yaml.YAMLError as exc: | |
print(exc) | |
#print(config) | |
logger = logging.getLogger() | |
logger.setLevel(logging.DEBUG) | |
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') | |
fh = logging.FileHandler(config['log_file']) | |
fh.setLevel(logging.DEBUG) | |
fh.setFormatter(formatter) | |
logger.addHandler(fh) | |
ch = logging.StreamHandler() | |
if args.debug: | |
ch.setLevel(logging.DEBUG) | |
else: | |
ch.setLevel(logging.WARNING) | |
#ch.setLevel(logging.DEBUG) | |
ch.setFormatter(formatter) | |
logger.addHandler(ch) | |
client = clickhouse_driver.Client(config['connect_line']) | |
all_databases_raw = client.execute("SHOW databases") | |
all_databases = [] | |
for d in all_databases_raw: | |
all_databases.append(d[0]) | |
for db in config['databases']: | |
dbname = db.keys()[0] | |
max_days = db[dbname] | |
logger.info("Try clean db '%s' with %d max days" % (dbname, max_days)) | |
if not dbname in all_databases: | |
logger.warning("We have not database %s - continue" % dbname) | |
continue | |
query = "SHOW TABLES FROM " + dbname | |
tables = client.execute(query) | |
for tables_t in tables: | |
table = tables_t[0] | |
logger.debug("we found table %s in db %s" % (table, dbname)) | |
query = "select partition from (select partition,max(max_date) as m from system.parts where database = '%s' and table = '%s' group by partition) where m < today()-%d order by partition;" % (dbname,table,max_days) | |
#print(query) | |
partitions = client.execute(query) | |
for partition_t in partitions: | |
partition = partition_t[0] | |
partition_delete_command = "ALTER TABLE %s.%s DROP PARTITION %s" % (dbname, table, partition) | |
#print(partition) | |
#print(partition_delete_command) | |
if args.simulate: | |
print("Simulate only! Not execute \"%s\"" % partition_delete_command) | |
else: | |
client.execute(partition_delete_command) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment