Skip to content

Instantly share code, notes, and snippets.

@cryocaustik
Last active January 2, 2019 23:48
Show Gist options
  • Save cryocaustik/89ca03538e763ab0816e46f801b7031b to your computer and use it in GitHub Desktop.
Save cryocaustik/89ca03538e763ab0816e46f801b7031b to your computer and use it in GitHub Desktop.
quick script to find names of tables used in .sql queries
import os
import re
class FindTables:
    """Find SQL files in a directory tree and collect the table names they use.

    A token is treated as a table name only when it directly follows a
    ``from``, ``join`` or ``into`` keyword (matched case-insensitively).
    Calling an instance is equivalent to calling :meth:`find_tables`.

    Returns:
        dict: SQL table names grouped by database and schema, e.g.
            ``{db: {schema: [tables]}}``.
    """

    # Captures the whitespace-delimited token that follows from/join/into.
    # All three look-behind alternatives have the same length (four chars),
    # which Python's ``re`` needs for a look-behind assertion.
    TABLE_REGEX = r'(?<=from|join|into)\s{1,}\S*?(?=\s|\t|\n|\r\n|$)'
    # Any path containing one of these substrings is skipped.
    DIR_IGNORE = ['.git', '.archive']

    def __init__(self, search_dir):
        """Store the root directory to search.

        Args:
            search_dir (str): root of the directory tree to scan.
        """
        self.search_dir = search_dir

    def __call__(self):
        """Shortcut for :meth:`find_tables`."""
        return self.find_tables()

    def valid_dir(self, path):
        """Check that path is a directory and contains no excluded name.

        Args:
            path (str): path to validate

        Returns:
            bool: True if ``path`` is an existing directory and none of the
                ``DIR_IGNORE`` entries occurs as a substring of it.
        """
        if not os.path.isdir(path):
            return False
        # Substring test: an ignored name anywhere in the path rejects it,
        # which also rejects every sub-directory of an ignored directory.
        return not any(ignored in path for ignored in self.DIR_IGNORE)

    def find_sql_files(self):
        """Walk the search directory and find all files ending with 'sql'.

        Returns:
            list: List of paths to SQL files within root path.
        """
        sql_files = []
        for dir_path, _dir_names, file_names in os.walk(self.search_dir):
            # skip invalid directories
            if not self.valid_dir(dir_path):
                continue
            sql_files.extend(
                os.path.join(dir_path, file_name)
                for file_name in file_names
                if file_name.endswith('sql')
            )
        return sql_files

    def find_tables(self):
        """Using RegEx, find SQL tables and group them by database and schema.

        Names are classified by their shape:

        * ``db.schema.table``  -> ``result[db][schema]``
        * ``schema.table``     -> ``result['unknown'][schema]``
        * one ``#`` in name    -> ``result['temp']['']`` (session temp)
        * two ``#`` in name    -> ``result['global_temp']['']`` (global temp)
        * anything else        -> ``result['unknown']['']``

        Returns:
            dict: dict of SQL table names, broken down by Database, Schema,
                and Table name; e.g. {db: schema: [tables]}. The keys
                'unknown', 'temp' and 'global_temp' are always present.
        """
        names = set()
        for sql_path in self.find_sql_files():
            # The context manager closes the file; no explicit close needed.
            with open(sql_path, 'r') as handle:
                text = handle.read()
            for match in re.findall(self.TABLE_REGEX, text, re.IGNORECASE):
                name = match.strip()
                # A keyword followed only by whitespace yields an empty
                # match, which is not a table name.
                if name:
                    names.add(name)

        parsed_tables = {
            'unknown': {'': []},
            'temp': {'': []},
            'global_temp': {'': []},
        }
        # Sort so the resulting lists are the same on every run; plain set
        # iteration order is not stable for strings.
        for name in sorted(names):
            dot_count = name.count('.')
            hash_count = name.count('#')
            # if <db.schema.table>
            if dot_count == 2:
                db, schema, tbl = name.split('.')
                parsed_tables.setdefault(db, {}).setdefault(schema, []).append(tbl)
            # if <schema.table>: the database is not known
            elif dot_count == 1:
                schema, tbl = name.split('.')
                parsed_tables['unknown'].setdefault(schema, []).append(tbl)
            # if session temp <#table>
            elif hash_count == 1:
                parsed_tables['temp'][''].append(name)
            # if global temp <##table>
            elif hash_count == 2:
                parsed_tables['global_temp'][''].append(name)
            # all other cases
            else:
                parsed_tables['unknown'][''].append(name)
        return parsed_tables
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment