Created
July 17, 2022 14:01
-
-
Save hikineet0/53e61101b863850d9b708b6aa9d5b6ac to your computer and use it in GitHub Desktop.
Script to create a relational database out of a user input root directory's directory tree or to query such database.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from __future__ import annotations | |
| import os | |
| import sys | |
| import json | |
| import pprint | |
| import sqlite3 | |
| import argparse | |
| from pathlib import Path | |
| from typing import cast, List, NewType, Tuple, TypedDict, Generator, Union | |
| Size = NewType("Size", str) | |
| PathLikeString = NewType("PathLikeString", str) | |
| class DirectoryData(TypedDict): | |
| id: int | |
| name: str | |
| size: str | |
| parent: str | |
| children: List[str] | |
| files: List[Tuple[str, Size]] | |
| #----------------------------- Argparse functions ------------------------------# | |
| def validate_root_directory(inp): | |
| abs_inp = os.path.abspath(inp) | |
| if not os.path.exists(abs_inp): | |
| print( | |
| f'Directory does not exist: "{inp}"' | |
| '\n' | |
| "If the directory does exist, please pass the absolute path." | |
| ) | |
| sys.exit() | |
| return abs_inp | |
| def validate_table_name(inp): | |
| if inp.startswith("sqlite_"): | |
| print( | |
| f'Invalid table name: "{inp}"' | |
| '\n' | |
| 'Table name cannot begin with "sqlite_"' | |
| ) | |
| sys.exit() | |
| return inp | |
| def validate_database_name(inp): | |
| if any(char in r':*?"<>|@' for char in inp): | |
| print( | |
| f'Invalid file/directory name: "{inp}"' | |
| '\n' | |
| r"""File/directory name should not contain any of the following characters: ':*?"<>|@'""" | |
| ) | |
| sys.exit() | |
| if not inp.endswith(".db"): | |
| inp += ".db" | |
| return inp | |
| def validate_depth(inp): | |
| try: | |
| inp = int(inp) | |
| if inp <= 0: | |
| raise ValueError("Input depth value < 0.") | |
| except ValueError: | |
| print(f'Invalid depth: "{inp}"' | |
| '\n' | |
| "Depth values must be a number > 0.") | |
| sys.exit() | |
| return inp | |
| def args_parse(): | |
| parser = argparse.ArgumentParser(description="Create and query directory tree database.") | |
| parser.add_argument( | |
| dest="database_name", | |
| type=validate_database_name, | |
| help="Name for database file to create.", | |
| ) | |
| parser.add_argument( | |
| dest="table_name", | |
| type=validate_table_name, | |
| help="Name for database table to create.", | |
| ) | |
| mutually_exclusive = parser.add_mutually_exclusive_group() | |
| mutually_exclusive.add_argument( | |
| "-c", | |
| "--create", | |
| dest="root", | |
| metavar='', | |
| type=validate_root_directory, | |
| help="Root directory to create a directory tree database for." | |
| ' ' | |
| "Works best with absolute path." | |
| ) | |
| mutually_exclusive.add_argument( | |
| "-g", | |
| "--get", | |
| dest="get", | |
| metavar='', | |
| type=str, | |
| help="Name of file/directory to search for." | |
| ) | |
| parser.add_argument( | |
| "-d", | |
| "--depth", | |
| dest="depth", | |
| metavar='', | |
| type=validate_depth, | |
| default=5, | |
| help="For use with -g/--get, denotes depth of nested output. Defaults to 5.", | |
| ) | |
| return parser.parse_args() | |
| #------------------------- End of argparse functions ---------------------------# | |
| #----------------------------- Creation functions ------------------------------# | |
| def convert_size_with_prefix(size, precision=3): | |
| SIZES = ["B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"] | |
| for unit in SIZES: | |
| if (size / 1024) < 1: | |
| return round(size, precision), unit | |
| else: | |
| size = size / 1024 | |
| def create_database(conn, table_name): | |
| conn.execute(f'DROP TABLE IF EXISTS "{table_name}"') | |
| conn.execute(f"""CREATE TABLE "{table_name}" ( | |
| id INT UNIQUE, | |
| name TEXT, | |
| size TEXT, | |
| parent INT NULL, | |
| children JSON NULL, | |
| files JSON, | |
| FOREIGN KEY (parent) REFERENCES "{table_name}"(id) | |
| ) | |
| """) | |
| def create_record( | |
| directory_data_dict: DirectoryData, | |
| table_name: str, | |
| conn: sqlite3.Connection | |
| ): | |
| conn.execute( | |
| f'INSERT INTO "{table_name}"\ | |
| (id, name, size, parent, children, files)\ | |
| VALUES (?, ?, ?, ?, ?, ?)', | |
| ( | |
| directory_data_dict["id"], | |
| directory_data_dict["name"], | |
| directory_data_dict["size"], | |
| directory_data_dict["parent"], | |
| json.dumps(directory_data_dict["children"], ensure_ascii=False), | |
| json.dumps(directory_data_dict["files"], ensure_ascii=False), | |
| ), | |
| ) | |
| def create_json(directory_root: PathLikeString) -> Generator: | |
| """ | |
| Traverses the root directory and creates a dictionary for each sub-directory | |
| with keys: | |
| id: | |
| Unique identifier for a directory. | |
| name: | |
| Name of the directory (absolute path for the root directory). | |
| size: | |
| Total size of the directory. | |
| parent: | |
| The parent directory of a directory, an integer referring to the | |
| id of some other directory (None for root directory); foreign key | |
| referencing id in database. | |
| children: | |
| List of all sub-directories in a sub-directory. | |
| files: | |
| List of all files in a sub-directory along with their sizes. | |
| Parameters | |
| ----------- | |
| directory_root (PathLikeString): | |
| User input root directory. | |
| Yields | |
| ------- | |
| DirectoryData | |
| """ | |
| index_children: List[Tuple[int, List[str]]] = [] | |
| for index, (root, dirs, files) in enumerate(os.walk(directory_root), start=1): | |
| parent: Union[str, int, None] | |
| print(os.path.abspath(root)) | |
| if index == 1: # If root directory. | |
| parent = None | |
| name = root # Absolute path to user input root directory. | |
| else: | |
| parent = root | |
| split = root.split("/") | |
| name = split[-1] or split[-2] # -2 is path ends in '/' else -1. | |
| for idx, children in index_children: | |
| if root.split("/")[-1] in children: | |
| # If current working directory is a child of an already processed | |
| # directory. | |
| parent = idx | |
| break | |
| index_children.append((index, dirs)) | |
| directory_data_dict = { | |
| "id": index, | |
| "name": name, | |
| "size": Size( | |
| " ".join( | |
| map( | |
| str, | |
| convert_size_with_prefix( | |
| size=sum( | |
| f.stat().st_size | |
| for f in Path(root).glob("**/*") | |
| if f.is_file() | |
| ) | |
| ) | |
| ) | |
| ) | |
| ), | |
| "parent": parent, | |
| "children": sorted(dirs), | |
| "files": [ | |
| file + ' (' + | |
| Size( | |
| " ".join( | |
| map( | |
| str, | |
| convert_size_with_prefix( | |
| size=os.stat(os.path.join(root, file)).st_size | |
| ) | |
| ) | |
| ) | |
| ) | |
| + ')' | |
| for file in sorted(files) | |
| ], | |
| } | |
| yield cast(DirectoryData, directory_data_dict) | |
| #------------------------- End of creation functions ---------------------------# | |
| #------------------------------ Search functions -------------------------------# | |
| def recurse_dict(child, dict_, replacement_dict): | |
| # Works on references only. | |
| if dict_.get("children") is None: # None because it can be `[]`. | |
| # Should not be true except for the first run of this function. | |
| # First run because it gets passed an empty dictionary. | |
| return False | |
| for idx, val in enumerate(dict_["children"]): | |
| if val == child: | |
| # Replace ["value"] with respective dictionary. | |
| dict_["children"][idx] = replacement_dict | |
| return True | |
| else: # For loop exited normally means `child` was not found in `dict_["children"]`. | |
| for element in dict_["children"]: | |
| # Check to see if there are any dictionaries in `dict_["children"]`. | |
| # If any, call this function with those dictionaries. | |
| if isinstance(element, dict): | |
| if res:=recurse_dict(child, element, replacement_dict): | |
| return True | |
| else: # For loop exited normally means no match for `child` was found. | |
| # Should not happen except for the top level directory. | |
| return False | |
| def search(depth, /, **query_kwargs): | |
| KEYS = ("id", "name", "size", "parent", "children", "files") | |
| results = _query_database(**query_kwargs) | |
| results = sorted(set(results.fetchall()), key=lambda x: x[0]) | |
| results = results[len(results) > 1:] # [0:] or [1:] | |
| result_dict = dict() | |
| for result in map(list, results): | |
| # `map` because they're tuples, following two lines won't work with | |
| # tuples. | |
| result[4] = json.loads(result[4]) | |
| result[5] = json.loads(result[5]) | |
| replaced = recurse_dict( | |
| result[1], | |
| result_dict, | |
| {KEYS[idx]: val for idx, val in enumerate(result)} # Dictionary representation of a single result. | |
| ) | |
| if not replaced: | |
| result_dict = {KEYS[idx]: val for idx, val in enumerate(result)} | |
| if result_dict: | |
| print("MATCH(ES) FOUND IN:") | |
| print('-'*80) | |
| pprint.pprint( | |
| [ | |
| { | |
| KEYS[idx]: res | |
| for idx, res in enumerate(result) | |
| } | |
| for result in results | |
| ], | |
| sort_dicts=False | |
| ) | |
| print() | |
| print() | |
| print("DIRECTORY TREE:") | |
| print('-'*80) | |
| pprint.pprint(result_dict, sort_dicts=False, depth=depth) | |
| else: | |
| print("Match not found.") | |
| def _query_database(conn: sqlite3.Connection, get: str, table_name: str): | |
| command = f"""WITH RECURSIVE directory_tree AS ( | |
| SELECT | |
| dir_data.id, | |
| dir_data.parent, | |
| dir_data.name, | |
| dir_data.size, | |
| dir_data.children, | |
| dir_data.files | |
| FROM "{table_name}" dir_data | |
| WHERE dir_data.name LIKE "%{get}%" OR dir_data.files LIKE "%{get}%" | |
| UNION ALL | |
| SELECT | |
| dir_data2.id, | |
| dir_data2.parent, | |
| dir_data2.name, | |
| dir_data2.size, | |
| dir_data2.children, | |
| dir_data2.files | |
| FROM "{table_name}" dir_data2 | |
| JOIN directory_tree dt ON dt.parent = dir_data2.id | |
| ) SELECT | |
| id, | |
| name, | |
| size, | |
| parent, | |
| children, | |
| files | |
| FROM directory_tree""" | |
| return conn.execute(command) | |
| #-------------------------- End of search functions ----------------------------# | |
| def main(): | |
| args = args_parse() | |
| database_name = args.database_name | |
| table_name = args.table_name | |
| directory_root = args.root | |
| depth = args.depth | |
| get = args.get | |
| if get is not None: | |
| if not os.path.exists(database_name): | |
| print( | |
| f'Database file not found: "{database_name}"' | |
| '\n' | |
| "Please make sure you entered the correct name for the database file." | |
| ) | |
| sys.exit() | |
| with sqlite3.connect(database_name) as conn: | |
| try: | |
| search(depth, conn=conn, get=get, table_name=table_name) | |
| except sqlite3.OperationalError as e: | |
| if "no such table" in e.args[0]: | |
| name = ': '.join(e.args[0].split(': ')[1:]) | |
| print( | |
| f'No such table: "{name}"' | |
| '\n' | |
| "Please make sure the table name you entered exists in the database." | |
| ) | |
| sys.exit() | |
| else: | |
| with sqlite3.connect(database_name) as conn: | |
| create_database(conn, table_name) | |
| for json_data in create_json(directory_root=PathLikeString(directory_root)): | |
| create_record(json_data, table_name, conn) | |
| if __name__ == "__main__": | |
| try: | |
| main() | |
| except KeyboardInterrupt: | |
| print("\nExit.") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment