Skip to content

Instantly share code, notes, and snippets.

@posulliv
Created December 14, 2022 18:36
Show Gist options
  • Save posulliv/46d8b8709d59a55b55355431a2a8b4fb to your computer and use it in GitHub Desktop.
Save posulliv/46d8b8709d59a55b55355431a2a8b4fb to your computer and use it in GitHub Desktop.
Python script to extract high level metrics from Trino explain plan generated with EXPLAIN ANALYZE
#!/usr/bin/env python3
from timeit import default_timer as timer
import configparser
import trino
import click
@click.command()
@click.option('--queries', type=click.Path(exists=True), help='SQL file with queries to extract metrics for.')
def run(queries):
conn = trino_connection()
cursor = conn.cursor()
with open(queries) as queries_file:
sql_queries = queries_file.readlines()
for query in sql_queries:
print(f"Query: {query}")
# prepend explain analyze to query
explain_query = 'EXPLAIN ANALYZE {}'.format(query)
start = timer()
explain_output = run_query(cursor, explain_query)
end = timer()
seconds = end - start
total_cpu, total_scheduled = extract_metrics(explain_output[0][0])
active = (total_cpu / total_scheduled) * 100
parallelism = total_cpu / seconds
print(' elapsed time : {} seconds'.format(seconds))
print(' total cpu : {} seconds'.format(total_cpu))
print(' total scheduled: {} seconds'.format(total_scheduled))
print(' Parallelism : {}'.format(parallelism))
print(' Active : {}%'.format(active))
def run_query(cursor, query):
cursor.execute(query)
return cursor.fetchall()
def extract_metrics(explain_output):
total_cpu = 0
total_scheduled = 0
for line in list(iter(explain_output.splitlines())):
if "per task" in line:
metrics = line.split(",")
cpu_time = convert_timemetric_to_seconds(metrics[0].split(":")[1])
scheduled_time = convert_timemetric_to_seconds(metrics[1].split(":")[1])
total_cpu += cpu_time
total_scheduled += scheduled_time
return total_cpu, total_scheduled
def convert_timemetric_to_seconds(metric):
if "ns" in metric:
return float(metric[:-2]) / 1000000000
if "us" in metric:
return float(metric[:-2]) / 1000000
if "ms" in metric:
return float(metric[:-2]) / 1000
if "s" in metric:
return float(metric[:-1])
if "m" in metric:
return float(metric[:-1]) * 60
if "h" in metric:
return float(metric[:-1]) * (60 * 60)
if "d" in metric:
return float(metric[:-1]) * (60 * 60 * 24)
return 0
def trino_connection():
config = configparser.ConfigParser()
config.read('config.ini')
trino_config = config['trino']
return trino.dbapi.connect(
host = trino_config['host'],
port = trino_config['port'],
user = trino_config['user'],
catalog = trino_config['catalog'],
schema = trino_config['schema'],
http_scheme = trino_config['http_scheme'],
auth = trino.auth.BasicAuthentication(trino_config['user'], trino_config['password']),
verify = trino_config['verify_certs'].lower() == 'true'
)
if __name__ == '__main__':
run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment