#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
analyze-vacuum-schema.py

* Copyright 2015, Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
*     http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.

The Redshift Analyze Vacuum Utility gives you the ability to automate VACUUM and ANALYZE operations.
When run, it analyzes or vacuums an entire schema or individual tables. The utility selects the
table(s) to analyze or vacuum in a Redshift database schema based on parameters such as the
unsorted percentage, stats-off percentage and table size from SVV_TABLE_INFO, together with
system alerts from stl_explain and stl_alert_event_log.

By turning the '--analyze-flag' and '--vacuum-flag' parameters on or off, you can run it as a
vacuum-only or analyze-only utility. The script can be scheduled to run VACUUM and ANALYZE as
part of regular maintenance/housekeeping, during periods of low database activity (quiet periods).

This script will:

1) Analyze a single table, or the tables in a schema, based on:
   a) Alerts from stl_explain and stl_alert_event_log.
   b) The 'stats_off' metric from SVV_TABLE_INFO.

2) Vacuum a single table, or the tables in a schema, based on:
   a) Alerts from stl_alert_event_log.
   b) The 'unsorted' and 'size' metrics from SVV_TABLE_INFO.
   c) Vacuum reindex for tables with interleaved sort keys.

Srinikri Amazon Web Services (2015)

11/21/2015 : Added support for vacuum reindex to analyze the interleaved sort keys.
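
Example (a minimal usage sketch; the connection values below are placeholders, not real endpoints):

    utility = AnalyzeVacuumUtility(db='YOUR_DATABASE',
                                   db_user='YOUR_USER',
                                   db_pwd='YOUR_PASSWORD',
                                   db_host='YOUR_HOST.redshift.amazonaws.com',
                                   schema_name=['public'])
    utility.run_vacuum('public')
    utility.run_analyze('public')
    utility.cleanup()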
""" | |
import logging | |
import urllib | |
from sqlalchemy.engine import create_engine | |
from sqlalchemy.orm.session import sessionmaker | |
__version__ = ".9.1.3.4" | |


class AnalyzeVacuumUtility(object):
    """
    Runs VACUUM and/or ANALYZE on table(s) in a schema.
    """

    def __init__(self,
                 db,
                 db_user,
                 db_pwd,
                 db_host,
                 db_port=5439,
                 schema_name='public',
                 query_group=None,
                 query_slot_count=1):
        """
        :param db:
            The database to use
        :param db_user:
            The database user to connect as
        :param db_pwd:
            The password for the database user
        :param db_host:
            The cluster endpoint
        :param db_port:
            The cluster endpoint port : Default=5439
        :param schema_name:
            The schema (or list of schemas) to be analyzed or vacuumed : Default=public
        :param query_group:
            Set the query_group for all queries
        :param query_slot_count:
            Modify the wlm_query_slot_count : Default=1
        """
        self.db = db
        self.db_user = db_user
        self.db_pwd = db_pwd
        self.db_host = db_host
        self.db_port = db_port
        self.schema_name = schema_name
        self.query_group = query_group
        self.query_slot_count = query_slot_count
        self._init_conn()

    def _init_conn(self):
        logging.debug('Connect %s:%s:%s:%s', self.db_host, self.db_port, self.db, self.db_user)
        try:
            options = 'keepalives=1&keepalives_idle=200&keepalives_interval=200&keepalives_count=5'
            connection_string = 'postgresql+psycopg2://{user}:{password}@{server}:{port}/{database}?{options}'\
                .format(user=self.db_user,
                        password=urllib.quote_plus(self.db_pwd),
                        server=self.db_host,
                        port=self.db_port,
                        database=self.db,
                        options=options)
            self.engine = create_engine(connection_string)
            self.conn = self.engine.connect()
        except Exception as e:
            logging.exception('Unable to connect to Cluster Endpoint')
            self.cleanup()
            raise e

        if self.schema_name:
            # schema_name may be a single schema name or a list of schema names
            schemas = self.schema_name if isinstance(self.schema_name, (list, tuple)) else [self.schema_name]
            search_path = "set search_path = '$user',public,{0}".format(','.join(schemas))
            logging.debug(search_path)
            try:
                self.conn.execute(search_path)
            except Exception as e:
                logging.exception('Schema %s does not exist', self.schema_name)
                self.cleanup()
                raise e

        if self.query_group:
            query_group = 'set query_group to {0}'.format(self.query_group)
            logging.debug(query_group)
            self.conn.execute(query_group)

        if self.query_slot_count != 1:
            query_slot_count = 'set wlm_query_slot_count = {0}'.format(self.query_slot_count)
            logging.debug(query_slot_count)
            self.conn.execute(query_slot_count)

        set_timeout = "set statement_timeout = '36000000'"
        logging.debug(set_timeout)
        self.conn.execute(set_timeout)

    def cleanup(self):
        # Guard with getattr in case connection setup failed part-way through
        if getattr(self, 'conn', None):
            self.conn.close()
        if getattr(self, 'engine', None):
            self.engine.dispose()

    def _get_statement(self, query):
        results = []
        session = sessionmaker(bind=self.engine)()
        try:
            cursors = session.execute(query)
            for cursor in cursors.fetchall():
                results.append(str(cursor[0]))
            cursors.close()
            session.commit()
        except Exception as e:
            logging.exception('Query execution failed: %s', query)
            session.rollback()
            raise e
        finally:
            session.close()
        logging.debug('Query execution returned %s results', len(results))
        return results

    def _run_statements_with_transaction(self, queries=None):
        session = sessionmaker(bind=self.engine)()
        for query in queries or []:
            try:
                logging.info('Running query: %s', query)
                session.execute(query)
                session.commit()
            except Exception as e:
                logging.exception('Query execution failed: %s', query)
                session.rollback()
                session.close()
                raise e
        session.close()
        return True

    def _run_statements(self, queries=None):
        # VACUUM cannot run inside a transaction block, so temporarily switch the
        # underlying DBAPI connection to autocommit (isolation level 0)
        level = self.conn.connection.isolation_level
        self.conn.connection.set_isolation_level(0)
        try:
            for query in queries or []:
                logging.info('Running query: %s', query)
                self.conn.execute(query)
        except Exception:
            logging.exception('Query execution failed: %s', query)
            raise
        finally:
            self.conn.connection.set_isolation_level(level)

    def run_vacuum(self,
                   schema_name,
                   table_name=None,
                   vacuum_parameter='FULL',
                   min_unsorted_pct=5,
                   max_unsorted_pct=50,
                   deleted_pct=5,
                   max_table_size_mb=(700 * 1024),
                   goback_no_of_days=1,
                   query_rank=25):
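        """
        Build and run VACUUM statements for candidate tables in the given schema.

        :param schema_name:
            The schema whose tables are considered
        :param table_name:
            Vacuum only this table. When None, candidates are picked from
            stl_alert_event_log alerts and SVV_TABLE_INFO thresholds, and a
            VACUUM REINDEX pass is added for skewed interleaved sort keys
        :param vacuum_parameter:
            VACUUM variant to run (e.g. FULL) : Default=FULL
        :param min_unsorted_pct:
            Minimum unsorted percentage before a table qualifies : Default=5
        :param max_unsorted_pct:
            Upper unsorted bound applied to tables larger than max_table_size_mb : Default=50
        :param deleted_pct:
            Minimum deleted (empty) percentage before a table qualifies : Default=5
        :param max_table_size_mb:
            Size threshold that switches between the two vacuum conditions : Default=700*1024
        :param goback_no_of_days:
            Days of alert history to consider : Default=1
        :param query_rank:
            Only tables within this alert-frequency rank are considered : Default=25
        """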
        if table_name:
            get_vacuum_statement = """--
                SELECT DISTINCT 'vacuum {vacuum_parameter} ' + "schema" + '.' + "table" + ' ; '
                       + '/* ' + ' Table Name : ' + "schema" + '.' + "table"
                       + ', Size : ' + CAST("size" AS VARCHAR(10)) + ' MB, Unsorted_pct : ' + CAST("unsorted" AS VARCHAR(10))
                       + ', Deleted_pct : ' + CAST("empty" AS VARCHAR(10)) + ' */ ;'
                FROM svv_table_info
                WHERE (unsorted > {min_unsorted_pct} OR empty > {deleted_pct})
                  AND size < {max_table_size_mb}
                  AND "schema" = '{schema_name}'
                  AND "table" = '{table_name}';
                """.format(vacuum_parameter=vacuum_parameter,
                           min_unsorted_pct=min_unsorted_pct,
                           deleted_pct=deleted_pct,
                           max_table_size_mb=max_table_size_mb,
                           schema_name=schema_name,
                           table_name=table_name)
        else:
            # Select candidate tables across the schema based on vacuum-related alerts
            logging.info("Extracting Candidate Tables for vacuum based on the alerts...")
            get_vacuum_statement = """--
                SELECT DISTINCT 'vacuum {vacuum_parameter} ' + feedback_tbl.schema_name + '.' + feedback_tbl.table_name + ' ; '
                       + '/* ' + ' Table Name : ' + info_tbl."schema" + '.' + info_tbl."table"
                       + ', Size : ' + CAST(info_tbl."size" AS VARCHAR(10)) + ' MB, Unsorted_pct : ' + CAST(info_tbl."unsorted" AS VARCHAR(10))
                       + ', Deleted_pct : ' + CAST(info_tbl."empty" AS VARCHAR(10)) + ' */ ;'
                FROM (SELECT schema_name,
                             table_name
                      FROM (SELECT TRIM(n.nspname) schema_name,
                                   c.relname table_name,
                                   DENSE_RANK() OVER (ORDER BY COUNT(*) DESC) AS qry_rnk,
                                   COUNT(*)
                            FROM stl_alert_event_log AS l
                            JOIN (SELECT query,
                                         tbl,
                                         perm_table_name
                                  FROM stl_scan
                                  WHERE perm_table_name <> 'Internal Worktable'
                                  GROUP BY query,
                                           tbl,
                                           perm_table_name) AS s ON s.query = l.query
                            JOIN pg_class c ON c.oid = s.tbl
                            JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
                            WHERE l.userid > 1
                              AND l.event_time >= dateadd (DAY,-{goback_no_of_days},CURRENT_DATE)
                              AND l.Solution LIKE '%VACUUM command%'
                            GROUP BY TRIM(n.nspname), c.relname) anlyz_tbl
                      WHERE anlyz_tbl.qry_rnk < {query_rank}) feedback_tbl
                JOIN svv_table_info info_tbl
                  ON info_tbl.schema = feedback_tbl.schema_name
                 AND info_tbl.table = feedback_tbl.table_name
                WHERE /*(info_tbl.unsorted > {min_unsorted_pct} OR info_tbl.empty > {deleted_pct}) AND */
                      info_tbl.size < {max_table_size_mb}
                  AND TRIM(info_tbl.schema) = '{schema_name}'
                  AND (sortkey1 NOT ILIKE 'INTERLEAVED%' OR sortkey1 IS NULL)
                ORDER BY info_tbl.size ASC, info_tbl.skew_rows ASC;
                """.format(vacuum_parameter=vacuum_parameter,
                           goback_no_of_days=goback_no_of_days,
                           query_rank=query_rank,
                           min_unsorted_pct=min_unsorted_pct,
                           deleted_pct=deleted_pct,
                           max_table_size_mb=max_table_size_mb,
                           schema_name=schema_name)

        logging.debug(get_vacuum_statement)
        vacuum_statements = self._get_statement(get_vacuum_statement)
        self._run_statements(vacuum_statements)

        if not table_name:
            # Select candidate tables across the schema from SVV_TABLE_INFO thresholds,
            # smallest tables first
            logging.info("Extracting Candidate Tables for vacuum...")
            get_vacuum_statement = """--
                SELECT DISTINCT 'vacuum {vacuum_parameter} ' + "schema" + '.' + "table" + ' ; '
                       + '/* ' + ' Table Name : ' + "schema" + '.' + "table"
                       + ', Size : ' + CAST("size" AS VARCHAR(10)) + ' MB, Unsorted_pct : ' + CAST("unsorted" AS VARCHAR(10))
                       + ', Deleted_pct : ' + CAST("empty" AS VARCHAR(10)) + ' */ ;'
                FROM svv_table_info
                WHERE "schema" = '{schema_name}'
                  AND
                  (
                    -- If the table is smaller than max_table_size_mb, vacuum when
                    -- unsorted > min_unsorted_pct OR empty > deleted_pct
                    ((size < {max_table_size_mb}) AND (unsorted > {min_unsorted_pct} OR empty > {deleted_pct}))
                    OR
                    -- If the table is larger than max_table_size_mb, vacuum when
                    -- min_unsorted_pct < unsorted < max_unsorted_pct,
                    -- to avoid very long vacuums on big tables with a large unsorted_pct
                    ((size > {max_table_size_mb}) AND (unsorted > {min_unsorted_pct} AND unsorted < {max_unsorted_pct}))
                  )
                  AND (sortkey1 NOT ILIKE 'INTERLEAVED%' OR sortkey1 IS NULL)
                ORDER BY "size" ASC, skew_rows ASC;
                """.format(vacuum_parameter=vacuum_parameter,
                           schema_name=schema_name,
                           max_table_size_mb=max_table_size_mb,
                           min_unsorted_pct=min_unsorted_pct,
                           max_unsorted_pct=max_unsorted_pct,
                           deleted_pct=deleted_pct)

            logging.debug(get_vacuum_statement)
            vacuum_statements = self._get_statement(get_vacuum_statement)
            self._run_statements(vacuum_statements)

        if not table_name:
            # query for all tables in the schema for vacuum reindex
            logging.info("Extracting Candidate Tables for vacuum reindex...")
            get_vacuum_statement = """--
                SELECT DISTINCT 'vacuum REINDEX ' + schema_name + '.' + table_name + ' ; ' + '/* ' + ' Table Name : '
                       + schema_name + '.' + table_name + ', Rows : ' + CAST("rows" AS VARCHAR(10))
                       + ', Interleaved_skew : ' + CAST("max_skew" AS VARCHAR(10))
                       + ' , Reindex Flag : ' + CAST(reindex_flag AS VARCHAR(10)) + ' */ ;'
                FROM (SELECT TRIM(n.nspname) schema_name, t.relname table_name,
                             MAX(v.interleaved_skew) max_skew, MAX(c.count) AS rows,
                             CASE
                               WHEN (MAX(v.interleaved_skew) > 5 AND MAX(c.count) > 10240) THEN 'Yes'
                               ELSE 'No'
                             END AS reindex_flag
                      FROM svv_interleaved_columns v
                      JOIN (SELECT tbl, col, SUM(count) AS count
                            FROM stv_interleaved_counts
                            GROUP BY tbl, col) c
                        ON (v.tbl = c.tbl AND v.col = c.col)
                      JOIN pg_class t ON t.oid = c.tbl
                      JOIN pg_catalog.pg_namespace n ON n.oid = t.relnamespace
                      GROUP BY 1, 2)
                WHERE reindex_flag = 'Yes'
                  AND schema_name = '{schema_name}'
                """.format(schema_name=schema_name)

            logging.debug(get_vacuum_statement)
            vacuum_statements = self._get_statement(get_vacuum_statement)
            self._run_statements(vacuum_statements)

    def run_analyze(self,
                    schema_name,
                    table_name=None,
                    stats_off_pct=10,
                    goback_no_of_days=1,
                    query_rank=25):
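        """
        Build and run ANALYZE statements for candidate tables in the given schema.

        :param schema_name:
            The schema whose tables are considered
        :param table_name:
            Analyze only this table. When None, candidates are picked from
            stl_explain 'missing statistics' plan nodes, stl_alert_event_log
            alerts, and the stats_off metric in SVV_TABLE_INFO
        :param stats_off_pct:
            Minimum stats_off percentage before a table qualifies : Default=10
        :param goback_no_of_days:
            Days of query/alert history to consider : Default=1
        :param query_rank:
            Only tables within this alert-frequency rank are considered : Default=25
        """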
        if table_name:
            # A single table was specified: check whether it needs to be analyzed
            # and prepare the analyze statement
            get_analyze_statement_feedback = """--
                SELECT DISTINCT 'analyze ' + "schema" + '.' + "table" + ' ; '
                       + '/* ' + ' Table Name : ' + "schema" + '.' + "table"
                       + ', stats_off : ' + CAST("stats_off" AS VARCHAR(10)) + ' */ ;'
                FROM svv_table_info
                WHERE stats_off::DECIMAL (32,4) > {stats_off_pct} ::DECIMAL (32,4)
                  AND TRIM("schema") = '{schema_name}'
                  AND TRIM("table") = '{table_name}';
                """.format(stats_off_pct=stats_off_pct,
                           schema_name=schema_name,
                           table_name=table_name)
        else:
            # query for all tables in the schema
            logging.info("Extracting Candidate Tables for analyze based on Query Optimizer Alerts(Feedbacks)...")
            get_analyze_statement_feedback = """-- Get the top N ranked tables based on the missing statistics alerts
                SELECT DISTINCT 'analyze ' + feedback_tbl.schema_name + '.' + feedback_tbl.table_name + ' ; '
                       + '/* ' + ' Table Name : ' + info_tbl."schema" + '.' + info_tbl."table"
                       + ', Stats_Off : ' + CAST(info_tbl."stats_off" AS VARCHAR(10)) + ' */ ;'
                FROM ((SELECT TRIM(n.nspname) schema_name,
                              c.relname table_name
                       FROM (SELECT TRIM(SPLIT_PART(SPLIT_PART(a.plannode,':',2),' ',2)) AS Table_Name,
                                    COUNT(a.query),
                                    DENSE_RANK() OVER (ORDER BY COUNT(a.query) DESC) AS qry_rnk
                             FROM stl_explain a,
                                  stl_query b
                             WHERE a.query = b.query
                               AND CAST(b.starttime AS DATE) >= dateadd (DAY,-{goback_no_of_days},CURRENT_DATE)
                               AND a.userid > 1
                               AND a.plannode LIKE '%missing statistics%'
                               AND a.plannode NOT LIKE '%_bkp_%'
                             GROUP BY Table_Name) miss_tbl
                       LEFT JOIN pg_class c ON c.relname = TRIM(miss_tbl.table_name)
                       LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
                       WHERE miss_tbl.qry_rnk <= {query_rank})
                      -- Get the top N ranked tables based on the stl_alert_event_log alerts
                      UNION
                      SELECT schema_name,
                             table_name
                      FROM (SELECT TRIM(n.nspname) schema_name,
                                   c.relname table_name,
                                   DENSE_RANK() OVER (ORDER BY COUNT(*) DESC) AS qry_rnk,
                                   COUNT(*)
                            FROM stl_alert_event_log AS l
                            JOIN (SELECT query,
                                         tbl,
                                         perm_table_name
                                  FROM stl_scan
                                  WHERE perm_table_name <> 'Internal Worktable'
                                  GROUP BY query,
                                           tbl,
                                           perm_table_name) AS s ON s.query = l.query
                            JOIN pg_class c ON c.oid = s.tbl
                            JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
                            WHERE l.userid > 1
                              AND l.event_time >= dateadd (DAY,-{goback_no_of_days},CURRENT_DATE)
                              AND l.Solution LIKE '%ANALYZE command%'
                            GROUP BY TRIM(n.nspname),
                                     c.relname) anlyz_tbl
                      WHERE anlyz_tbl.qry_rnk < {query_rank}) feedback_tbl
                JOIN svv_table_info info_tbl
                  ON info_tbl.schema = feedback_tbl.schema_name
                 AND info_tbl.table = feedback_tbl.table_name
                WHERE info_tbl.stats_off::DECIMAL (32,4) > {stats_off_pct}::DECIMAL (32,4)
                  AND TRIM(info_tbl.schema) = '{schema_name}'
                ORDER BY info_tbl.size ASC;
                """.format(goback_no_of_days=goback_no_of_days,
                           query_rank=query_rank,
                           stats_off_pct=stats_off_pct,
                           schema_name=schema_name)

        logging.debug(get_analyze_statement_feedback)
        analyze_statements = self._get_statement(get_analyze_statement_feedback)
        self._run_statements_with_transaction(analyze_statements)

        if not table_name:
            logging.info("Extracting Candidate Tables for analyze based on stats off from system table info...")
            get_analyze_statement = """--
                SELECT DISTINCT 'analyze ' + "schema" + '.' + "table" + ' ; '
                       + '/* ' + ' Table Name : ' + "schema" + '.' + "table"
                       + ', Stats_Off : ' + CAST("stats_off" AS VARCHAR(10)) + ' */ ;'
                FROM svv_table_info
                WHERE stats_off::DECIMAL (32,4) > {stats_off_pct}::DECIMAL (32,4)
                  AND TRIM("schema") = '{schema_name}'
                ORDER BY "size" ASC;
                """.format(stats_off_pct=stats_off_pct,
                           schema_name=schema_name)

            logging.debug(get_analyze_statement)
            analyze_statements = self._get_statement(get_analyze_statement)
            self._run_statements_with_transaction(analyze_statements)


if __name__ == '__main__':
    import setting

    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    utility = AnalyzeVacuumUtility(setting.REDSHIFT_DATABASE,
                                   setting.REDSHIFT_USER,
                                   setting.REDSHIFT_PASSWORD,
                                   setting.REDSHIFT_HOST,
                                   setting.REDSHIFT_PORT,
                                   setting.REDSHIFT_SCHEMA)
    for schema in setting.REDSHIFT_SCHEMA:
        utility.run_vacuum(schema)
        utility.run_analyze(schema)
    utility.cleanup()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# setting.py -- connection settings consumed by analyze-vacuum-schema.py via ``import setting``

REDSHIFT_HOST = 'YOUR_HOST.redshift.amazonaws.com'
REDSHIFT_PORT = 5439
REDSHIFT_USER = 'YOUR_USER'
REDSHIFT_PASSWORD = 'YOUR_PASSWORD'
REDSHIFT_DATABASE = 'YOUR_DATABASE'
REDSHIFT_SCHEMA = ['YOUR_SCHEMA_1', 'YOUR_SCHEMA_2']
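
# Driving the utility against a single table, rather than a whole schema, is a small
# variation on the __main__ block above. A minimal sketch (the schema and table names
# are placeholders, and the import assumes the script above has been saved under an
# importable module name):
#
#     import setting
#     from analyze_vacuum_schema import AnalyzeVacuumUtility
#
#     utility = AnalyzeVacuumUtility(setting.REDSHIFT_DATABASE,
#                                    setting.REDSHIFT_USER,
#                                    setting.REDSHIFT_PASSWORD,
#                                    setting.REDSHIFT_HOST,
#                                    setting.REDSHIFT_PORT,
#                                    setting.REDSHIFT_SCHEMA)
#     utility.run_vacuum('YOUR_SCHEMA_1', table_name='YOUR_TABLE')
#     utility.run_analyze('YOUR_SCHEMA_1', table_name='YOUR_TABLE')
#     utility.cleanup()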