Skip to content

Instantly share code, notes, and snippets.

@hikineet0
Created July 17, 2022 14:01
Show Gist options
  • Select an option

  • Save hikineet0/53e61101b863850d9b708b6aa9d5b6ac to your computer and use it in GitHub Desktop.

Select an option

Save hikineet0/53e61101b863850d9b708b6aa9d5b6ac to your computer and use it in GitHub Desktop.
Script to create a relational database out of a user input root directory's directory tree or to query such database.
from __future__ import annotations
import os
import sys
import json
import pprint
import sqlite3
import argparse
from pathlib import Path
from typing import cast, List, NewType, Tuple, TypedDict, Generator, Union
Size = NewType("Size", str)
PathLikeString = NewType("PathLikeString", str)
class DirectoryData(TypedDict):
id: int
name: str
size: str
parent: str
children: List[str]
files: List[Tuple[str, Size]]
#----------------------------- Argparse functions ------------------------------#
def validate_root_directory(inp):
abs_inp = os.path.abspath(inp)
if not os.path.exists(abs_inp):
print(
f'Directory does not exist: "{inp}"'
'\n'
"If the directory does exist, please pass the absolute path."
)
sys.exit()
return abs_inp
def validate_table_name(inp):
if inp.startswith("sqlite_"):
print(
f'Invalid table name: "{inp}"'
'\n'
'Table name cannot begin with "sqlite_"'
)
sys.exit()
return inp
def validate_database_name(inp):
if any(char in r':*?"<>|@' for char in inp):
print(
f'Invalid file/directory name: "{inp}"'
'\n'
r"""File/directory name should not contain any of the following characters: ':*?"<>|@'"""
)
sys.exit()
if not inp.endswith(".db"):
inp += ".db"
return inp
def validate_depth(inp):
try:
inp = int(inp)
if inp <= 0:
raise ValueError("Input depth value < 0.")
except ValueError:
print(f'Invalid depth: "{inp}"'
'\n'
"Depth values must be a number > 0.")
sys.exit()
return inp
def args_parse():
parser = argparse.ArgumentParser(description="Create and query directory tree database.")
parser.add_argument(
dest="database_name",
type=validate_database_name,
help="Name for database file to create.",
)
parser.add_argument(
dest="table_name",
type=validate_table_name,
help="Name for database table to create.",
)
mutually_exclusive = parser.add_mutually_exclusive_group()
mutually_exclusive.add_argument(
"-c",
"--create",
dest="root",
metavar='',
type=validate_root_directory,
help="Root directory to create a directory tree database for."
' '
"Works best with absolute path."
)
mutually_exclusive.add_argument(
"-g",
"--get",
dest="get",
metavar='',
type=str,
help="Name of file/directory to search for."
)
parser.add_argument(
"-d",
"--depth",
dest="depth",
metavar='',
type=validate_depth,
default=5,
help="For use with -g/--get, denotes depth of nested output. Defaults to 5.",
)
return parser.parse_args()
#------------------------- End of argparse functions ---------------------------#
#----------------------------- Creation functions ------------------------------#
def convert_size_with_prefix(size, precision=3):
SIZES = ["B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"]
for unit in SIZES:
if (size / 1024) < 1:
return round(size, precision), unit
else:
size = size / 1024
def create_database(conn, table_name):
conn.execute(f'DROP TABLE IF EXISTS "{table_name}"')
conn.execute(f"""CREATE TABLE "{table_name}" (
id INT UNIQUE,
name TEXT,
size TEXT,
parent INT NULL,
children JSON NULL,
files JSON,
FOREIGN KEY (parent) REFERENCES "{table_name}"(id)
)
""")
def create_record(
directory_data_dict: DirectoryData,
table_name: str,
conn: sqlite3.Connection
):
conn.execute(
f'INSERT INTO "{table_name}"\
(id, name, size, parent, children, files)\
VALUES (?, ?, ?, ?, ?, ?)',
(
directory_data_dict["id"],
directory_data_dict["name"],
directory_data_dict["size"],
directory_data_dict["parent"],
json.dumps(directory_data_dict["children"], ensure_ascii=False),
json.dumps(directory_data_dict["files"], ensure_ascii=False),
),
)
def create_json(directory_root: PathLikeString) -> Generator:
"""
Traverses the root directory and creates a dictionary for each sub-directory
with keys:
id:
Unique identifier for a directory.
name:
Name of the directory (absolute path for the root directory).
size:
Total size of the directory.
parent:
The parent directory of a directory, an integer referring to the
id of some other directory (None for root directory); foreign key
referencing id in database.
children:
List of all sub-directories in a sub-directory.
files:
List of all files in a sub-directory along with their sizes.
Parameters
-----------
directory_root (PathLikeString):
User input root directory.
Yields
-------
DirectoryData
"""
index_children: List[Tuple[int, List[str]]] = []
for index, (root, dirs, files) in enumerate(os.walk(directory_root), start=1):
parent: Union[str, int, None]
print(os.path.abspath(root))
if index == 1: # If root directory.
parent = None
name = root # Absolute path to user input root directory.
else:
parent = root
split = root.split("/")
name = split[-1] or split[-2] # -2 is path ends in '/' else -1.
for idx, children in index_children:
if root.split("/")[-1] in children:
# If current working directory is a child of an already processed
# directory.
parent = idx
break
index_children.append((index, dirs))
directory_data_dict = {
"id": index,
"name": name,
"size": Size(
" ".join(
map(
str,
convert_size_with_prefix(
size=sum(
f.stat().st_size
for f in Path(root).glob("**/*")
if f.is_file()
)
)
)
)
),
"parent": parent,
"children": sorted(dirs),
"files": [
file + ' (' +
Size(
" ".join(
map(
str,
convert_size_with_prefix(
size=os.stat(os.path.join(root, file)).st_size
)
)
)
)
+ ')'
for file in sorted(files)
],
}
yield cast(DirectoryData, directory_data_dict)
#------------------------- End of creation functions ---------------------------#
#------------------------------ Search functions -------------------------------#
def recurse_dict(child, dict_, replacement_dict):
# Works on references only.
if dict_.get("children") is None: # None because it can be `[]`.
# Should not be true except for the first run of this function.
# First run because it gets passed an empty dictionary.
return False
for idx, val in enumerate(dict_["children"]):
if val == child:
# Replace ["value"] with respective dictionary.
dict_["children"][idx] = replacement_dict
return True
else: # For loop exited normally means `child` was not found in `dict_["children"]`.
for element in dict_["children"]:
# Check to see if there are any dictionaries in `dict_["children"]`.
# If any, call this function with those dictionaries.
if isinstance(element, dict):
if res:=recurse_dict(child, element, replacement_dict):
return True
else: # For loop exited normally means no match for `child` was found.
# Should not happen except for the top level directory.
return False
def search(depth, /, **query_kwargs):
KEYS = ("id", "name", "size", "parent", "children", "files")
results = _query_database(**query_kwargs)
results = sorted(set(results.fetchall()), key=lambda x: x[0])
results = results[len(results) > 1:] # [0:] or [1:]
result_dict = dict()
for result in map(list, results):
# `map` because they're tuples, following two lines won't work with
# tuples.
result[4] = json.loads(result[4])
result[5] = json.loads(result[5])
replaced = recurse_dict(
result[1],
result_dict,
{KEYS[idx]: val for idx, val in enumerate(result)} # Dictionary representation of a single result.
)
if not replaced:
result_dict = {KEYS[idx]: val for idx, val in enumerate(result)}
if result_dict:
print("MATCH(ES) FOUND IN:")
print('-'*80)
pprint.pprint(
[
{
KEYS[idx]: res
for idx, res in enumerate(result)
}
for result in results
],
sort_dicts=False
)
print()
print()
print("DIRECTORY TREE:")
print('-'*80)
pprint.pprint(result_dict, sort_dicts=False, depth=depth)
else:
print("Match not found.")
def _query_database(conn: sqlite3.Connection, get: str, table_name: str):
command = f"""WITH RECURSIVE directory_tree AS (
SELECT
dir_data.id,
dir_data.parent,
dir_data.name,
dir_data.size,
dir_data.children,
dir_data.files
FROM "{table_name}" dir_data
WHERE dir_data.name LIKE "%{get}%" OR dir_data.files LIKE "%{get}%"
UNION ALL
SELECT
dir_data2.id,
dir_data2.parent,
dir_data2.name,
dir_data2.size,
dir_data2.children,
dir_data2.files
FROM "{table_name}" dir_data2
JOIN directory_tree dt ON dt.parent = dir_data2.id
) SELECT
id,
name,
size,
parent,
children,
files
FROM directory_tree"""
return conn.execute(command)
#-------------------------- End of search functions ----------------------------#
def main():
args = args_parse()
database_name = args.database_name
table_name = args.table_name
directory_root = args.root
depth = args.depth
get = args.get
if get is not None:
if not os.path.exists(database_name):
print(
f'Database file not found: "{database_name}"'
'\n'
"Please make sure you entered the correct name for the database file."
)
sys.exit()
with sqlite3.connect(database_name) as conn:
try:
search(depth, conn=conn, get=get, table_name=table_name)
except sqlite3.OperationalError as e:
if "no such table" in e.args[0]:
name = ': '.join(e.args[0].split(': ')[1:])
print(
f'No such table: "{name}"'
'\n'
"Please make sure the table name you entered exists in the database."
)
sys.exit()
else:
with sqlite3.connect(database_name) as conn:
create_database(conn, table_name)
for json_data in create_json(directory_root=PathLikeString(directory_root)):
create_record(json_data, table_name, conn)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\nExit.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment