#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
analyze-vacuum-schema.py
* Copyright 2015, Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
The Redshift Analyze Vacuum Utility gives you the ability to automate VACUUM and ANALYZE operations.
When run, it analyzes or vacuums an entire schema or individual tables, based on parameters such as
the unsorted percentage, the stats-off percentage and the size of each table, together with system
alerts from stl_explain & stl_alert_event_log.
You can run it as a vacuum-only or analyze-only utility by calling only run_vacuum() or run_analyze().
This script can be scheduled to run VACUUM and ANALYZE as part of regular maintenance/housekeeping
activities, during quiet periods when there is little database activity.
This script will:
1) Analyze a single table or tables in a schema based on:
   a) Alerts from stl_explain & stl_alert_event_log.
   b) 'stats off' metrics from SVV_TABLE_INFO.
2) Vacuum a single table or tables in a schema based on:
   a) Alerts from stl_alert_event_log.
   b) The 'unsorted' and 'size' metrics from SVV_TABLE_INFO.
   c) VACUUM REINDEX for tables with skewed interleaved sort keys.
Srinikri Amazon Web Services (2015)
11/21/2015 : Added support for vacuum reindex to analyze the interleaved sort keys.
"""
import logging
import urllib
from sqlalchemy.engine import create_engine
from sqlalchemy.orm.session import sessionmaker
__version__ = ".9.1.3.4"
class AnalyzeVacuumUtility(object):
"""
Runs vacuum AND/OR analyze on table(s) in a schema
"""
def __init__(self,
db,
db_user,
db_pwd,
db_host,
db_port=5439,
schema_name='public',
query_group=None,
query_slot_count=1):
"""
:param db:
The Database to Use
:param db_user:
The Database User to connect to
:param db_pwd:
The Password for the Database User to connect to
:param db_host:
The Cluster endpoint
:param db_port:
The Cluster endpoint port : Default=5439
:param schema_name:
The Schema to be Analyzed or Vacuumed : Default=public
:param query_group:
Set the query_group for all queries
:param query_slot_count:
Modify the wlm_query_slot_count : Default=1
"""
self.db = db
self.db_user = db_user
self.db_pwd = db_pwd
self.db_host = db_host
self.db_port = db_port
self.schema_name = schema_name
self.query_group = query_group
self.query_slot_count = query_slot_count
self._init_conn()
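    # Builds a SQLAlchemy engine over psycopg2 with TCP keepalives, then applies
    # session settings: search_path, optional query_group, optional
    # wlm_query_slot_count, and a statement_timeout.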
def _init_conn(self):
logging.debug('Connect %s:%s:%s:%s', self.db_host, self.db_port, self.db, self.db_user)
try:
options = 'keepalives=1&keepalives_idle=200&keepalives_interval=200&keepalives_count=5'
connection_string = 'postgresql+psycopg2://{user}:{password}@{server}:{port}/{database}?{options}'\
.format(user=self.db_user,
password=urllib.quote_plus(self.db_pwd),
server=self.db_host,
port=self.db_port,
database=self.db,
options=options)
self.engine = create_engine(connection_string)
self.conn = self.engine.connect()
except Exception as e:
logging.exception('Unable to connect to Cluster Endpoint')
self.cleanup()
raise e
if self.schema_name:
            schemas = self.schema_name if isinstance(self.schema_name, (list, tuple)) else [self.schema_name]
            search_path = "set search_path = '$user',public,{0}".format(','.join(schemas))
logging.debug(search_path)
try:
self.conn.execute(search_path)
except Exception as e:
                logging.exception('Failed to set search_path for schema(s) %s', self.schema_name)
self.cleanup()
raise e
if self.query_group:
query_group = 'set query_group to {0}'.format(self.query_group)
logging.debug(query_group)
self.conn.execute(query_group)
if self.query_slot_count != 1:
query_slot_count = 'set wlm_query_slot_count = {0}'.format(self.query_slot_count)
logging.debug(query_slot_count)
self.conn.execute(query_slot_count)
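        # statement_timeout is in milliseconds; 36000000 ms caps each statement at 10 hours.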
set_timeout = "set statement_timeout = '36000000'"
logging.debug(set_timeout)
self.conn.execute(set_timeout)
def cleanup(self):
if self.conn:
self.conn.close()
if self.engine:
self.engine.dispose()
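    # Runs a generator query and returns the first column of each row as a list
    # of SQL statements (the vacuum/analyze commands built by the queries below).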
def _get_statement(self, query):
results = []
session = sessionmaker(bind=self.engine)()
try:
cursors = session.execute(query)
for cursor in cursors.fetchall():
results.append(str(cursor[0]))
cursors.close()
session.commit()
except Exception as e:
logging.exception('Query execution failed: %s', query)
session.rollback()
raise e
finally:
session.close()
logging.debug('Query execution returned %s results', len(results))
return results
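    # Executes each generated statement inside a transaction (commit per statement);
    # used for the ANALYZE statements below.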
def _run_statements_with_transaction(self, queries=[]):
session = sessionmaker(bind=self.engine)()
for query in queries:
try:
logging.info('Running query: %s', query)
session.execute(query)
session.commit()
except Exception as e:
logging.exception('Query execution failed %s', query)
session.rollback()
session.close()
raise e
session.close()
return True
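    # Executes statements with the psycopg2 connection switched to autocommit
    # (isolation level 0), because VACUUM cannot run inside a transaction block.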
def _run_statements(self, queries=[]):
        level = self.conn.connection.isolation_level
        self.conn.connection.set_isolation_level(0)
        try:
            for query in queries:
                logging.info('Running query: %s', query)
                self.conn.execute(query)
        except Exception as e:
            logging.exception('Query execution failed %s', query)
            raise e
        finally:
            self.conn.connection.set_isolation_level(level)
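    # Builds and runs vacuum statements. For a single table, candidates come from
    # its unsorted/deleted percentages; for a whole schema there are three passes:
    # alert-driven (stl_alert_event_log), threshold-driven (SVV_TABLE_INFO), and
    # VACUUM REINDEX for interleaved sort keys with high skew.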
def run_vacuum(self,
schema_name,
table_name=None,
vacuum_parameter='FULL',
min_unsorted_pct=5,
max_unsorted_pct=50,
deleted_pct=5,
max_table_size_mb=(700*1024),
goback_no_of_days=1,
query_rank=25):
if table_name:
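            # Single-table mode: vacuum the named table if it exceeds the unsorted
            # or deleted thresholds and is under the size cap.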
get_vacuum_statement = """--
SELECT DISTINCT 'vacuum {vacuum_parameter} ' + "schema" + '.' + "table" + ' ; '
+ '/* '+ ' Table Name : ' + "schema" + '.' + "table"
+ ', Size : ' + CAST("size" AS VARCHAR(10)) + ' MB, Unsorted_pct : ' + CAST("unsorted" AS VARCHAR(10))
+ ', Deleted_pct : ' + CAST("empty" AS VARCHAR(10)) +' */ ;'
FROM svv_table_info
WHERE (unsorted > {min_unsorted_pct} OR empty > {deleted_pct})
AND size < {max_table_size_mb}
AND "schema" = '{schema_name}'
AND "table" = '{table_name}';
""".format(vacuum_parameter=vacuum_parameter,
min_unsorted_pct=min_unsorted_pct,
deleted_pct=deleted_pct,
max_table_size_mb=max_table_size_mb,
schema_name=schema_name,
table_name=table_name)
else:
            # query for all tables in the schema, ordered by size ascending (smallest first)
logging.info("Extracting Candidate Tables for vacuum based on the alerts...")
get_vacuum_statement = """--
SELECT DISTINCT 'vacuum {vacuum_parameter} ' + feedback_tbl.schema_name + '.' + feedback_tbl.table_name + ' ; '
+ '/* '+ ' Table Name : ' + info_tbl."schema" + '.' + info_tbl."table"
+ ', Size : ' + CAST(info_tbl."size" AS VARCHAR(10)) + ' MB, Unsorted_pct : ' + CAST(info_tbl."unsorted" AS VARCHAR(10))
+ ', Deleted_pct : ' + CAST(info_tbl."empty" AS VARCHAR(10)) +' */ ;'
FROM (SELECT schema_name,
table_name
FROM (SELECT TRIM(n.nspname) schema_name,
c.relname table_name,
DENSE_RANK() OVER (ORDER BY COUNT(*) DESC) AS qry_rnk,
COUNT(*)
FROM stl_alert_event_log AS l
JOIN (SELECT query,
tbl,
perm_table_name
FROM stl_scan
WHERE perm_table_name <> 'Internal Worktable'
GROUP BY query,
tbl,
perm_table_name) AS s ON s.query = l.query
JOIN pg_class c ON c.oid = s.tbl
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE l.userid > 1
AND l.event_time >= dateadd (DAY,-{goback_no_of_days},CURRENT_DATE)
AND l.Solution LIKE '%VACUUM command%'
GROUP BY TRIM(n.nspname), c.relname) anlyz_tbl
WHERE anlyz_tbl.qry_rnk < {query_rank}) feedback_tbl
JOIN svv_table_info info_tbl
ON info_tbl.schema = feedback_tbl.schema_name
AND info_tbl.table = feedback_tbl.table_name
WHERE /*(info_tbl.unsorted > {min_unsorted_pct} OR info_tbl.empty > {deleted_pct}) AND */
info_tbl.size < {max_table_size_mb}
AND TRIM(info_tbl.schema) = '{schema_name}'
AND (sortkey1 not ilike 'INTERLEAVED%' OR sortkey1 IS NULL)
ORDER BY info_tbl.size ASC, info_tbl.skew_rows ASC;
""".format(vacuum_parameter=vacuum_parameter,
goback_no_of_days=goback_no_of_days,
query_rank=query_rank,
min_unsorted_pct=min_unsorted_pct,
deleted_pct=deleted_pct,
max_table_size_mb=max_table_size_mb,
schema_name=schema_name)
logging.debug(get_vacuum_statement)
vacuum_statements = self._get_statement(get_vacuum_statement)
self._run_statements(vacuum_statements)
if not table_name:
            # query for all tables in the schema, ordered by size ascending (smallest first)
logging.info("Extracting Candidate Tables for vacuum...")
get_vacuum_statement = """--
SELECT DISTINCT 'vacuum {vacuum_parameter} ' + "schema" + '.' + "table" + ' ; '
+ '/* '+ ' Table Name : ' + "schema" + '.' + "table"
+ ', Size : ' + CAST("size" AS VARCHAR(10)) + ' MB, Unsorted_pct : ' + CAST("unsorted" AS VARCHAR(10))
+ ', Deleted_pct : ' + CAST("empty" AS VARCHAR(10)) +' */ ;'
FROM svv_table_info
WHERE "schema" = '{schema_name}'
AND
(
--If the size of the table is less than max_table_size_mb, then run vacuum when: unsorted > min_unsorted_pct OR empty > deleted_pct
((size < {max_table_size_mb}) AND (unsorted > {min_unsorted_pct} OR empty > {deleted_pct}))
OR
--If the size of the table is greater than max_table_size_mb, then run vacuum only when:
-- unsorted > min_unsorted_pct AND unsorted < max_unsorted_pct
--This is to avoid vacuuming a big table with a very large unsorted region in a single pass
((size > {max_table_size_mb}) AND (unsorted > {min_unsorted_pct} AND unsorted < {max_unsorted_pct} ))
)
AND (sortkey1 not ilike 'INTERLEAVED%' OR sortkey1 IS NULL)
ORDER BY "size" ASC ,skew_rows ASC;
""".format(vacuum_parameter=vacuum_parameter,
schema_name=schema_name,
max_table_size_mb=max_table_size_mb,
min_unsorted_pct=min_unsorted_pct,
max_unsorted_pct=max_unsorted_pct,
deleted_pct=deleted_pct)
logging.debug(get_vacuum_statement)
vacuum_statements = self._get_statement(get_vacuum_statement)
self._run_statements(vacuum_statements)
if not table_name:
# query for all tables in the schema for vacuum reindex
logging.info("Extracting Candidate Tables for vacuum reindex...")
get_vacuum_statement = """--
SELECT DISTINCT 'vacuum REINDEX ' + schema_name + '.' + table_name + ' ; ' + '/* ' + ' Table Name : '
+ schema_name + '.' + table_name + ', Rows : ' + CAST("rows" AS VARCHAR(10))
+ ', Interleaved_skew : ' + CAST("max_skew" AS VARCHAR(10))
+ ' , Reindex Flag : ' + CAST(reindex_flag AS VARCHAR(10)) + ' */ ;'
FROM (SELECT TRIM(n.nspname) schema_name, t.relname table_name,
MAX(v.interleaved_skew) max_skew, MAX(c.count) AS rows,
CASE
WHEN (MAX(v.interleaved_skew) > 5 AND MAX(c.count) > 10240) THEN 'Yes'
ELSE 'No'
END AS reindex_flag
FROM svv_interleaved_columns v
JOIN (SELECT tbl,col,SUM(count) AS count
FROM stv_interleaved_counts
GROUP BY tbl,col) c
ON (v.tbl = c.tbl AND v.col = c.col)
JOIN pg_class t ON t.oid = c.tbl
JOIN pg_catalog.pg_namespace n ON n.oid = t.relnamespace
GROUP BY 1, 2)
WHERE reindex_flag = 'Yes'
AND schema_name = '{schema_name}'
""".format(schema_name=schema_name)
logging.debug(get_vacuum_statement)
vacuum_statements = self._get_statement(get_vacuum_statement)
self._run_statements(vacuum_statements)
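    # Builds and runs analyze statements. For a single table, candidates come from
    # its stats_off percentage; for a whole schema, first from query optimizer
    # feedback (stl_explain missing-statistics alerts and stl_alert_event_log),
    # then from the stats_off metric in SVV_TABLE_INFO.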
def run_analyze(self,
schema_name,
table_name=None,
stats_off_pct=10,
goback_no_of_days=1,
query_rank=25):
if table_name:
            # If it is a single table, just check whether it needs to be analyzed and prepare the analyze statement
get_analyze_statement_feedback = """--
SELECT DISTINCT 'analyze ' + "schema" + '.' + "table" + ' ; '
+ '/* '+ ' Table Name : ' + "schema" + '.' + "table"
+ ', stats_off : ' + CAST("stats_off" AS VARCHAR(10)) + ' */ ;'
FROM svv_table_info
WHERE stats_off::DECIMAL (32,4) > {stats_off_pct} ::DECIMAL (32,4)
AND trim("schema") = '{schema_name}'
AND trim("table") = '{table_name}';
""".format(stats_off_pct=stats_off_pct,
schema_name=schema_name,
table_name=table_name)
else:
# query for all tables in the schema
logging.info("Extracting Candidate Tables for analyze based on Query Optimizer Alerts(Feedbacks)...")
get_analyze_statement_feedback = """--Get top N rank tables based on the missing statistics alerts
SELECT DISTINCT 'analyze ' + feedback_tbl.schema_name + '.' + feedback_tbl.table_name + ' ; '
+ '/* '+ ' Table Name : ' + info_tbl."schema" + '.' + info_tbl."table"
+ ', Stats_Off : ' + CAST(info_tbl."stats_off" AS VARCHAR(10)) + ' */ ;'
FROM ((SELECT TRIM(n.nspname) schema_name,
c.relname table_name
FROM (SELECT TRIM(SPLIT_PART(SPLIT_PART(a.plannode,':',2),' ',2)) AS Table_Name,
COUNT(a.query),
DENSE_RANK() OVER (ORDER BY COUNT(a.query) DESC) AS qry_rnk
FROM stl_explain a,
stl_query b
WHERE a.query = b.query
AND CAST(b.starttime AS DATE) >= dateadd (DAY,-{goback_no_of_days},CURRENT_DATE)
AND a.userid > 1
AND a.plannode LIKE '%missing statistics%'
AND a.plannode NOT LIKE '%_bkp_%'
GROUP BY Table_Name) miss_tbl
LEFT JOIN pg_class c ON c.relname = TRIM (miss_tbl.table_name)
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE miss_tbl.qry_rnk <= {query_rank})
-- Get the top N rank tables based on the stl_alert_event_log alerts
UNION
SELECT schema_name,
table_name
FROM (SELECT TRIM(n.nspname) schema_name,
c.relname table_name,
DENSE_RANK() OVER (ORDER BY COUNT(*) DESC) AS qry_rnk,
COUNT(*)
FROM stl_alert_event_log AS l
JOIN (SELECT query,
tbl,
perm_table_name
FROM stl_scan
WHERE perm_table_name <> 'Internal Worktable'
GROUP BY query,
tbl,
perm_table_name) AS s ON s.query = l.query
JOIN pg_class c ON c.oid = s.tbl
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE l.userid > 1
AND l.event_time >= dateadd (DAY,-{goback_no_of_days},CURRENT_DATE)
AND l.Solution LIKE '%ANALYZE command%'
GROUP BY TRIM(n.nspname),
c.relname) anlyz_tbl
WHERE anlyz_tbl.qry_rnk < {query_rank}) feedback_tbl
JOIN svv_table_info info_tbl
ON info_tbl.schema = feedback_tbl.schema_name
AND info_tbl.table = feedback_tbl.table_name
WHERE info_tbl.stats_off::DECIMAL (32,4) > {stats_off_pct}::DECIMAL (32,4)
AND TRIM(info_tbl.schema) = '{schema_name}'
ORDER BY info_tbl.size ASC;
""".format(goback_no_of_days=goback_no_of_days,
query_rank=query_rank,
stats_off_pct=stats_off_pct,
schema_name=schema_name)
logging.debug(get_analyze_statement_feedback)
analyze_statements = self._get_statement(get_analyze_statement_feedback)
self._run_statements_with_transaction(analyze_statements)
if not table_name:
logging.info("Extracting Candidate Tables for analyze based on stats off from system table info...")
get_analyze_statement = """--
SELECT DISTINCT 'analyze ' + "schema" + '.' + "table" + ' ; '
+ '/* '+ ' Table Name : ' + "schema" + '.' + "table"
+ ', Stats_Off : ' + CAST("stats_off" AS VARCHAR(10)) + ' */ ;'
FROM svv_table_info
WHERE stats_off::DECIMAL (32,4) > {stats_off_pct}::DECIMAL (32,4)
AND trim("schema") = '{schema_name}'
ORDER BY "size" ASC ;
""".format(stats_off_pct=stats_off_pct,
schema_name=schema_name)
logging.debug(get_analyze_statement)
analyze_statements = self._get_statement(get_analyze_statement)
self._run_statements_with_transaction(analyze_statements)
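# Example driver: reads connection details from setting.py (see the sample below)
# and runs VACUUM followed by ANALYZE for each configured schema.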
if __name__ == '__main__':
import setting
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
utility = AnalyzeVacuumUtility(setting.REDSHIFT_DATABASE,
setting.REDSHIFT_USER,
setting.REDSHIFT_PASSWORD,
setting.REDSHIFT_HOST,
setting.REDSHIFT_PORT,
setting.REDSHIFT_SCHEMA)
for schema in setting.REDSHIFT_SCHEMA:
utility.run_vacuum(schema)
utility.run_analyze(schema)
utility.cleanup()
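# -----------------------------------------------------------------------------
# setting.py : sample configuration consumed by analyze-vacuum-schema.py above
# -----------------------------------------------------------------------------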
#!/usr/bin/env python
# -*- coding: utf-8 -*-
REDSHIFT_HOST = 'YOUR_HOST.redshift.amazonaws.com'
REDSHIFT_PORT = 5439
REDSHIFT_USER = 'YOUR_USER'
REDSHIFT_PASSWORD = 'YOUR_PASSWORD'
REDSHIFT_DATABASE = 'YOUR_DATABASE'
REDSHIFT_SCHEMA = ['YOUR_SCHEMA_1', 'YOUR_SCHEMA_2']