Last active
January 2, 2019 23:48
-
-
Save cryocaustik/89ca03538e763ab0816e46f801b7031b to your computer and use it in GitHub Desktop.
quick script to find names of tables used in .sql queries
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import re | |
class FindTables:
    """Find SQL files in a directory tree and collect the table names they use.

    A table name is the first whitespace-delimited token that follows a
    ``from``, ``join`` or ``into`` keyword (matched case-insensitively).

    Calling the instance is equivalent to calling :meth:`find_tables`.

    Args:
        search_dir (str): root directory that is walked recursively.

    Returns:
        dict: dict of SQL table names, broken down by Database, Schema, and
            Table name; e.g. {db: {schema: [tables]}}
    """

    # Fixed-width look-behind (all three keywords are 4 characters long), then
    # the whitespace separating keyword and name, then the name itself up to
    # the next whitespace or the end of the text.
    TABLE_REGEX = r'(?<=from|join|into)\s{1,}\S*?(?=\s|\t|\n|\r\n|$)'
    # A directory is skipped when any of these strings occurs in its path.
    DIR_IGNORE = ['.git', '.archive']

    def __init__(self, search_dir):
        self.search_dir = search_dir

    def __call__(self):
        return self.find_tables()

    def valid_dir(self, path):
        """Check that path is a directory and contains no ignored name.

        Args:
            path (str): path to validate

        Returns:
            bool: True if ``path`` is an existing directory and none of the
                ``DIR_IGNORE`` entries occurs as a substring of it.
        """
        if not os.path.isdir(path):
            return False
        # Every ignore entry must be checked, not only the first one.
        return not any(ignored in path for ignored in self.DIR_IGNORE)

    def find_sql_files(self):
        """Walk the search directory and find all files ending with 'sql'.

        Returns:
            list: List of paths to SQL files within root path.
        """
        sql_files = []
        for dir_path, _dir_names, file_names in os.walk(self.search_dir):
            # skip invalid (ignored) directories
            if not self.valid_dir(dir_path):
                continue
            sql_files.extend(
                os.path.join(dir_path, file_name)
                for file_name in file_names
                if file_name.endswith('sql')
            )
        return sql_files

    def find_tables(self):
        """Using RegEx, find SQL tables and group them by database and schema.

        Names are classified by their shape:

        * ``db.schema.table`` -> ``result[db][schema]``
        * ``schema.table``    -> ``result['unknown'][schema]``
        * one ``#``           -> ``result['temp']['']`` (session temp table)
        * two ``#``           -> ``result['global_temp']['']``
        * anything else       -> ``result['unknown']['']``

        Each distinct name is reported once; names are processed in sorted
        order so the resulting lists are deterministic.

        Returns:
            dict: dict of SQL table names, broken down by Database, Schema,
                and Table name; e.g. {db: {schema: [tables]}}
        """
        tables = set()
        for sql_path in self.find_sql_files():
            # the context manager closes the file; no explicit close() needed
            with open(sql_path, 'r') as sql_file:
                data = sql_file.read()
            for match in re.findall(self.TABLE_REGEX, data, re.IGNORECASE):
                name = match.strip()
                # a keyword followed only by trailing whitespace yields ''
                if name:
                    tables.add(name)

        parsed_tables = {
            'unknown': {'': []},
            'temp': {'': []},
            'global_temp': {'': []},
        }
        for t in sorted(tables):
            dots = t.count('.')
            # if <db.schema.table>
            if dots == 2:
                db, schema, tbl = t.split('.')
                # create the database / schema level on first use
                parsed_tables.setdefault(db, {}).setdefault(schema, []).append(tbl)
            # if <schema.table>
            elif dots == 1:
                schema, tbl = t.split('.')
                # No database is known here, so the schema lives under
                # 'unknown' (not under whichever db was seen last).
                parsed_tables['unknown'].setdefault(schema, []).append(tbl)
            # if session temp <#table>
            elif t.count('#') == 1:
                parsed_tables['temp'][''].append(t)
            # if global temp <##table>
            elif t.count('#') == 2:
                parsed_tables['global_temp'][''].append(t)
            # all other cases
            else:
                parsed_tables['unknown'][''].append(t)
        return parsed_tables
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment