|
import json |
|
from os import walk |
|
import re |
|
from subprocess import run |
|
from typing import Any, Generator |
|
|
|
|
|
def get_folders(path) -> list[str]: |
|
folders = [] |
|
for root, _, _ in walk(path): |
|
name = root.replace(f"{path}/", "") |
|
if name != path: |
|
folders.append(name) |
|
return folders |
|
|
|
|
|
def get_folder_id_map(path: str) -> dict[str, str]: |
|
folder_id_map = {} |
|
for folder in get_folders(path): |
|
p_encode = run(["bw", "encode"], input=json.dumps({"name": folder}), text=True, capture_output=True) |
|
assert p_encode.returncode |
|
p_create = run(["bw", "create", "folder", p_encode.stdout], text=True, capture_output=True) |
|
if p_create.returncode > 0: |
|
raise RuntimeError(p_create.stdout) |
|
id_ = json.loads(p_create.stdout)["id"] |
|
folder_id_map[folder] = id_ |
|
return folder_id_map |
|
|
|
|
|
def gen_items(path: str, folder_id_map: dict[str, str]) -> Generator[dict[str, Any], None, None]: |
|
for root, _, files in walk(path): |
|
folder_key = root.replace(f"{path}", "")[1:] |
|
for file in files: |
|
item = {"type": 1, "name": file, "login": {}} |
|
with open(f"{root}/{file}") as f: |
|
lines = f.read().splitlines() |
|
note_lines = [] |
|
if not lines[0].startswith("otpauth://"): |
|
item["login"]["password"] = lines[0] |
|
lines = lines[1:] |
|
for line in lines: |
|
if line.startswith("otpauth://"): |
|
item["login"]["totp"] = line |
|
else: |
|
parts = line.split(": ") |
|
if len(parts) == 2: |
|
field, value = parts |
|
if field.lower() in {"username", "email", "e-mail", "login", "user"}: |
|
if existing := item["login"].get("username"): |
|
if "fields" not in item: |
|
item["fields"] = [] |
|
if field.lower() in {"email", "e-mail"}: |
|
item["fields"].append({"type": 0, "name": field, "value": value}) |
|
else: |
|
item["fields"].append({"type": 0, "name": "email", "value": existing}) |
|
item["login"]["username"] = value |
|
else: |
|
item["login"]["username"] = value |
|
elif field.lower() in {"uri", "url"}: |
|
if "uris" not in item["login"]: |
|
item["login"]["uris"] = [] |
|
item["login"]["uris"].append({"uri": value}) |
|
else: |
|
if "fields" not in item: |
|
item["fields"] = [] |
|
item["fields"].append({"type": 0, "name": field, "value": value}) |
|
else: |
|
note_lines.append(line) |
|
if (notes := "\n".join(note_lines)) and notes != "\n": |
|
item["notes"] = notes |
|
if (domain := get_domain(f"{root}/{file}")) and "uris" not in item["login"]: |
|
item["login"]["uris"] = [{"uri": domain}] |
|
if "username" not in item["login"]: |
|
if "/" in folder_key: |
|
fkey, name = folder_key.rsplit("/", 1) |
|
else: |
|
name = folder_key |
|
fkey = None |
|
item["name"] = name or file |
|
item["login"]["username"] = file |
|
else: |
|
fkey = folder_key |
|
if fkey and (fid := folder_id_map.get(fkey)): |
|
item["folderId"] = fid |
|
yield item |
|
|
|
|
|
def get_domain(folder) -> str | None: |
|
for part in folder.split("/"): |
|
if re.fullmatch(r"[a-zA-Z]+(\.[a-zA-Z]+)+", part): |
|
return part |
|
return None |
❤️