Last active
July 20, 2024 22:05
-
-
Save AdamGagorik/e66782f46cc83aed654ea1673833f86b to your computer and use it in GitHub Desktop.
A script to memorize common directories
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Memorize key directories and cd to them. | |
examples: | |
cd $(harpoon) | |
cd $(harpoon key) | |
harpoon add <key> <path> | |
harpoon register <shell> | |
After running harpoon register and sourcing your rc file, the `h` command will be available.
examples: | |
h # run directory selection | |
h add key path # run any harpoon command | |
""" | |
import argparse | |
import contextlib | |
import dataclasses | |
import logging | |
import os | |
import shelve | |
import sys | |
from dataclasses import dataclass | |
from pathlib import Path | |
from subprocess import PIPE, CalledProcessError, Popen, run | |
from typing import Any, Generator | |
# Location of the shelve database: a file named "harpoon.db" next to this script.
DATABASE = Path(__file__).parent / "harpoon.db"
@contextlib.contextmanager
def context() -> Generator[shelve.Shelf, None, None]:
    """Yield the harpoon shelf, closing it when the ``with`` block ends.

    The shelf is opened at ``DATABASE`` with ``writeback=True``.
    """
    shelf = shelve.open(str(DATABASE), writeback=True)
    with shelf:
        yield shelf
def yes(msg: str, default: str = "y") -> bool:
    """Prompt for a yes/no answer and return True for an affirmative reply.

    Args:
        msg: The question shown to the user.
        default: The answer used when the user just presses enter.

    Returns:
        True when the (stripped, lower-cased) answer is "y" or "yes".
    """
    answer = input(f"{msg} (yes/no) [default={default}]: ")
    if not answer:
        # An empty reply falls back to the default answer.
        answer = default
    normalized = answer.strip().lower()
    return normalized in {"y", "yes"}
def initialize_db():
    """Open and immediately close the shelf via ``context()``."""
    with context() as _shelf:
        # Nothing to do: entering and leaving the context is the whole point.
        return None
def insert_new_item(key: str, path: Path, file: bool):
    """Store the resolved ``path`` under ``key`` in the shelf.

    Args:
        key: Name to store the path under; an existing key is overwritten
            after a warning is logged.
        path: Path to memorize; it is resolved before being stored.
        file: When False, ``path`` must be a directory.

    Raises:
        FileNotFoundError: The resolved path does not exist.
        NotADirectoryError: ``file`` is False and the path is not a directory.
    """
    resolved = path.resolve()
    if not resolved.exists():
        raise FileNotFoundError(resolved)
    acceptable = file or resolved.is_dir()
    if not acceptable:
        raise NotADirectoryError(resolved)

    with context() as db:
        already_stored = key in db
        if already_stored:
            logging.warning("key already exists! %s", key)
        logging.info("add %s", key)
        db[key] = str(resolved)
def delete_existing_item(key: str, force: bool):
    """Remove ``key`` from the shelf, asking for confirmation unless forced.

    Args:
        key: The key to delete; a warning is logged if it is not stored.
        force: Skip the interactive confirmation when True.
    """
    with context() as db:
        if key not in db:
            logging.warning("key does not exist! %s", key)
            return

        confirmed = force or yes(f"Delete {key}?")
        if not confirmed:
            logging.error("operation canceled")
            return

        logging.info("del %s", key)
        del db[key]
def delete_missing_items(force: bool) -> None:
    """Delete every stored entry whose path no longer exists on disk.

    Args:
        force: Skip the per-entry interactive confirmation when True.
    """
    with context() as db:
        # Take a snapshot first: entries are deleted inside the loop, and
        # mutating a mapping while iterating its live view is only safe when
        # the backend happens to hand out a copy of its keys.
        entries = list(db.items())
        for key, path in entries:
            if Path(path).exists():
                continue
            if force or yes(f"Delete {key}?"):
                logging.info("del %s", key)
                del db[key]
            else:
                logging.error("operation canceled")
def display_existing_item(key: str):
    """Print the path stored under ``key``, or ``NOT_FOUND`` if it is missing.

    A missing key is also reported through ``logging.error``.
    """
    with context() as db:
        if key not in db:
            logging.error("key does not exist! %s", key)
            print("NOT_FOUND")
            return
        print(db[key])
def show_all_items_in_storage():
    """Print a table of all stored entries followed by the entry count."""
    with context() as db:
        size = len(db)
    # The shelf is closed before rendering: make_databse_table() opens the
    # database again through context(), and a second open of the same file
    # can fail on dbm backends that lock the database.
    if size == 0:
        print(f"total: {size}")
    else:
        print("\n".join(make_databse_table()) + f" total: {size}")
def get_database_info() -> Generator[dict[str, Any], None, None]:
    """Yield one descriptive row per stored entry.

    Each row maps ``key``, ``path``, ``exists``, ``is_dir`` and ``is_file``
    to strings; the three flags are ``"x"`` when true and ``""`` otherwise.
    """

    def mark(flag: bool) -> str:
        return "x" if flag else ""

    with context() as db:
        for key, stored in db.items():
            location = Path(stored)
            yield dict(
                key=key,
                path=str(location),
                exists=mark(location.exists()),
                is_dir=mark(location.is_dir()),
                is_file=mark(location.is_file()),
            )
def make_databse_table() -> Generator[str, None, None]:
    """Yield the lines of a pipe-delimited table describing the database.

    The first pass over ``get_database_info()`` sizes each column (at least
    as wide as its name); the header and separator lines are yielded next,
    and a second pass yields one line per row.
    """
    widths = {}
    for row in get_database_info():
        for col, value in row.items():
            known = widths.get(col, len(col))
            widths[col] = max(known, len(str(value)))

    header_cells = " | ".join(f"{name:<{width}}" for name, width in widths.items())
    yield f"| {header_cells} |"

    rule_cells = "|".join("-" * (width + 2) for width in widths.values())
    yield f"|{rule_cells}|"

    for row in get_database_info():
        body_cells = " | ".join(
            f"{value:<{widths[col]}}" for col, value in row.items()
        )
        yield f"| {body_cells} |"
def remove_database(force: bool):
    """Delete the database files from disk, asking for confirmation unless forced.

    Args:
        force: Skip the interactive confirmation when True.
    """
    with context() as db:
        count = len(db)
    # The shelf is closed at this point. Removing files that are still open
    # fails on some platforms, and closing the shelf afterwards could write
    # them back.
    if not (force or yes(f"Delete {count} items?")):
        logging.error("operation canceled")
        return

    # dbm.dumb stores .dat/.dir/.bak files and dbm.ndbm stores .dir/.pag or
    # .db files next to the given name; the other backends use the name as is.
    candidates = [DATABASE]
    candidates.extend(
        DATABASE.with_suffix(f".db.{ext}") for ext in ("bak", "dat", "dir", "pag", "db")
    )
    for path in candidates:
        if path.exists():
            logging.info("rm %s", path)
            os.remove(path)
@dataclass | |
class Selector: | |
items: tuple[Any, ...] = () | |
headers: tuple[str, ...] = () | |
delimiter: str = "\u00a0" | |
select: int | None = None | |
match: int | None = None | |
def __call__(self) -> str: | |
command = [ | |
"fzf", | |
"--height=~100%", | |
"--delimiter", | |
self.delimiter, | |
*(("--nth", f"{self.match + 1}") if self.match is not None else ()), | |
"--header-lines", | |
"1", | |
] | |
inputs = b"\n".join(self.inputs) | |
process = Popen(command, stdin=PIPE, stdout=PIPE) | |
stdout, stderr = process.communicate(input=inputs) | |
if process.returncode != 0: | |
raise CalledProcessError( | |
process.returncode, " ".join(command), stdout, stderr | |
) | |
if self.select is not None: | |
return [ | |
token.strip() | |
for token in stdout.decode("utf-8").strip().split(self.delimiter) | |
][self.select] | |
else: | |
return stdout.decode("utf-8").strip() | |
@property | |
def inputs(self) -> Generator[bytes, None, None]: | |
widths = {} | |
for i, item in enumerate(self.items): | |
for j, value in enumerate(item): | |
widths[j] = max(widths.get(j, len(self.headers[j])), len(str(value))) | |
yield self.delimiter.join( | |
f"{header:<{widths[i]}}" for i, header in enumerate(self.headers) | |
).encode("utf-8") | |
for item in self.items: | |
yield self.delimiter.join( | |
f"{value:<{widths[i]}}" for i, value in enumerate(item) | |
).encode("utf-8") | |
def select_from_table(col: int = 0) -> str:
    """Let the user pick a stored entry with fzf and return one of its columns.

    Args:
        col: Column of the selected row to return (0 is the key, 1 the path).

    Raises:
        RuntimeError: The database holds no entries.
    """
    with context() as db:
        if not db:
            raise RuntimeError("no key to select, database is empty!")
        chooser = Selector(
            items=tuple(db.items()),
            headers=("KEY", "PATH"),
            match=0,
            select=col,
        )
        return chooser()
def select_path_with_fzf(file: bool = False) -> Path:
    """Pipe ``fd`` output into ``fzf`` and return the path the user selects.

    Args:
        file: List files (``fd --type f``) when True, directories otherwise.

    Raises:
        CalledProcessError: fzf exited with a non-zero status.
        FileNotFoundError: The selected path does not exist.
    """
    # Local import: DEVNULL is not part of the module-level subprocess import.
    from subprocess import DEVNULL

    kind = "f" if file else "d"
    # fd's stderr is discarded instead of piped: nothing ever reads it, and a
    # full, undrained pipe would block fd.
    p0 = Popen(["fd", "--type", kind], stdout=PIPE, stderr=DEVNULL)
    try:
        p1 = Popen(["fzf"], stdin=p0.stdout, stdout=PIPE, stderr=PIPE)
    finally:
        # Drop the parent's copy of the pipe so fd receives SIGPIPE once fzf
        # exits (see "Replacing shell pipeline" in the subprocess docs).
        p0.stdout.close()
    try:
        stdout, stderr = p1.communicate()
    finally:
        # The listing is no longer needed once fzf is done; stop and reap fd.
        if p0.poll() is None:
            p0.terminate()
        p0.wait()
    # Only fzf's status is checked: fd is routinely cut short by the user
    # selecting before the listing finishes.
    if p1.returncode != 0:
        raise CalledProcessError(
            p1.returncode, " ".join(map(str, p1.args)), stdout, stderr
        )
    path = Path(stdout.decode("utf-8").strip())
    if not path.exists():
        raise FileNotFoundError(path)
    return path
# Shell function that add_h_command_to_rc_file() appends to the user's rc file.
# With no arguments it cds into the output of `harpoon show`; otherwise it
# forwards all of its arguments to `harpoon`.
# The [1:-1] slice drops the literal's leading and trailing newline.
H_COMMAND = r"""
function h() {
if [ $# -eq 0 ]; then
cd "$(harpoon show)"
else
harpoon "$@"
fi
}
"""[
    1:-1
].rstrip()
def add_h_command_to_rc_file(shell: str) -> None:
    """Append the ``h`` shell function to the rc file of the given shell.

    Args:
        shell: Either ``"zsh"`` (~/.zshrc) or ``"bash"`` (~/.bashrc).

    Raises:
        KeyError: ``shell`` is not one of the supported shells.
        NotImplementedError: The rc file already contains a harpoon marker.
    """
    home = Path.home()
    rc_files = {
        "zsh": home.joinpath(".zshrc"),
        "bash": home.joinpath(".bashrc"),
    }
    path: Path = rc_files[shell]

    begin_marker = "# >>> __harpoon__"
    end_marker = "# <<< __harpoon__"

    contents = path.read_text()
    if any(marker in contents for marker in (begin_marker, end_marker)):
        raise NotImplementedError("shell file already edited")

    with path.open("a") as stream:
        stream.write(f"\n{begin_marker}\n{H_COMMAND}\n{end_marker}\n")
def main():
    """Parse the command line and run the requested harpoon action.

    With no action, the user picks an entry in fzf and its path is printed.
    A first argument that is not a known action is treated as a key for
    ``show``.

    Returns:
        0 on success, 1 when the parsed action is not recognized.
    """
    logging.basicConfig(
        level=logging.INFO, stream=sys.stderr, format="%(levelname)-7s | %(message)s"
    )

    # Lenient pre-parser, used only to peek at the first positional argument.
    peek = argparse.ArgumentParser(add_help=False)
    peek.add_argument("action", nargs="?")

    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    subparsers = parser.add_subparsers(dest="action")
    subcommands = (
        ("add", "add a new row to the storage"),
        ("del", "remove the row at the given key"),
        ("show", "show the path given the key"),
        ("list", "list all items in the database"),
        ("clear", "clear all items from the database"),
        ("prune", "clear items that do not exist from the database"),
        ("register", "Add the `h` command to the specified shell"),
    )
    parser_lut = {
        name: subparsers.add_parser(name, help=text) for name, text in subcommands
    }

    # "key" is mandatory for add and optional for del/show.
    key_help = "The associated key of a path"
    parser_lut["add"].add_argument("key", help=key_help)
    for name in ("del", "show"):
        parser_lut[name].add_argument("key", help=key_help, nargs="?", default=None)

    parser_lut["add"].add_argument(
        "path", type=Path, nargs="?", default=None, help="The path to memorize"
    )
    parser_lut["add"].add_argument(
        "--file", action="store_true", help="allow files to be memorized"
    )

    for name in ("del", "prune", "clear"):
        parser_lut[name].add_argument(
            "--yes", action="store_true", help="do not prompt for confirmations?"
        )

    parser_lut["register"].add_argument(
        "shell", choices=["zsh", "bash"], help="the shell to modify"
    )

    peeked, remaining = peek.parse_known_args()
    wants_help = "--help" in sys.argv or "-h" in sys.argv
    if wants_help:
        parser.parse_args()
        return 0
    if peeked.action is None:
        print(select_from_table(col=1))
        return 0
    if peeked.action in parser_lut:
        opts = parser.parse_args()
    else:
        # Unknown first word: treat it as the key of an implicit "show".
        opts = parser.parse_args(args=("show", peeked.action, *remaining))

    def key_or_prompt() -> str:
        # Fall back to an interactive fzf pick when no key was given.
        return opts.key if opts.key is not None else select_from_table(col=0)

    def do_add() -> None:
        chosen = opts.path
        if chosen is None:
            chosen = select_path_with_fzf(file=opts.file)
        insert_new_item(key=opts.key, path=chosen, file=opts.file)

    def do_del() -> None:
        delete_existing_item(key=key_or_prompt(), force=opts.yes)

    def do_show() -> None:
        display_existing_item(key_or_prompt())

    handlers = {
        "add": do_add,
        "del": do_del,
        "show": do_show,
        "list": show_all_items_in_storage,
        "prune": lambda: delete_missing_items(force=opts.yes),
        "clear": lambda: remove_database(force=opts.yes),
        "register": lambda: add_h_command_to_rc_file(shell=opts.shell),
    }
    handler = handlers.get(opts.action)
    if handler is None:
        logging.error("unknown action: %s", opts.action)
        return 1
    handler()
    return 0
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment