-
-
Save rolltidehero/71341dae7100e604fcf37f0a0b409e2d to your computer and use it in GitHub Desktop.
1password duplicate remover (alpha, only run in debugger with breakpoints everywhere *g*)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import json | |
import subprocess | |
import sys | |
from concurrent.futures import ThreadPoolExecutor | |
import html | |
import dictdiffer | |
import iso8601 | |
_BAD_ENTRY_KEY = '_bad_password_key' | |
def get_user_pw(vault_uuid: str, item_uuid: str): | |
item = subprocess.check_output(["/Users/amohr/bin/op", 'get', 'item', f"--vault={vault_uuid}", item_uuid]) | |
item = json.loads(item) | |
details = item["details"] | |
if details.get('fields'): | |
saved_entries = set() | |
for field in item["details"]["fields"]: | |
if not field['value']: | |
continue | |
if not "name" in field: | |
continue | |
if "&" in field['value']: # sigh | |
field['value'] = html.unescape(field['value']) | |
item[_BAD_ENTRY_KEY] = True | |
saved_entries.add((field['name'], field['value'])) | |
assert saved_entries | |
return saved_entries, item | |
else: | |
if 'fields' in details: | |
del details['fields'] # for later comparison | |
assert details.keys() - {'notesPlain', 'sections'} == set() | |
return (item["details"], None), item | |
def normalize_item(item: dict): | |
for key in {"pbe", "ps", "pgrng"}: # PasswordBitEntropy/PasswordStrength/? | |
if key in item['overview']: | |
del item['overview'][key] | |
for tag in list(item["overview"].get('tags', [])): | |
if tag == '(none)': # sigh | |
item["overview"]["tags"].remove(tag) | |
if "tags" in item["overview"] and not item["overview"]["tags"]: | |
del item["overview"]["tags"] | |
for url in list(item["overview"].get("URLs", [])): | |
if url == {'l': ''}: | |
item["overview"]["URLs"].remove(url) | |
continue | |
if url.get('l') == '': | |
url['l'] = 'website' | |
# old lastpass export html encoded the URLs :( | |
url["u"] = url["u"].lower() | |
if "&" in url["u"]: | |
url["u"] = html.unescape(url["u"]) | |
if "url" in item["overview"]: | |
item["overview"]["url"] = item["overview"]["url"].lower() | |
if "&" in item["overview"]["url"]: | |
item["overview"]["url"] = html.unescape(item["overview"]["url"]) | |
item[_BAD_ENTRY_KEY] = True | |
def main(): | |
passwords = dict() # {(vault_uuid, title, user_name): item} | |
items = subprocess.check_output(["/Users/amohr/bin/op", 'list', 'items']) | |
items = json.loads(items) | |
with ThreadPoolExecutor() as pool: | |
for item in items: | |
if item["trashed"] != "N": | |
continue | |
normalize_item(item) | |
vault_uuid = item["vaultUuid"] | |
pw_key = (vault_uuid, item["overview"]["title"], item["overview"].get("ainfo")) | |
if pw_key not in passwords: | |
passwords[pw_key] = item | |
else: | |
fut1 = pool.submit(get_user_pw, vault_uuid, passwords[pw_key]["uuid"]) | |
fut2 = pool.submit(get_user_pw, vault_uuid, item["uuid"]) | |
existing_user_pwd, existing_item = fut1.result() | |
item_user_pwd, item_pw = fut2.result() | |
if item_user_pwd == existing_user_pwd: | |
# TODO: support keeping version which has appIds | |
diff = list(dictdiffer.diff(item["overview"], passwords[pw_key]["overview"])) | |
if diff: | |
print(f"non-empty overview diff of key: {pw_key} diff: {diff}") | |
continue | |
existing_item["createdAt"] = iso8601.parse_date(existing_item["createdAt"]) | |
item_pw["createdAt"] = iso8601.parse_date(item_pw["createdAt"]) | |
print(f"deleting instance of: {pw_key}") | |
assert not (item_pw.get(_BAD_ENTRY_KEY) and existing_item.get(_BAD_ENTRY_KEY)) | |
# TODO: add validation of "details" section keys | |
if item_pw.get(_BAD_ENTRY_KEY) or (existing_item["details"].get('passwordHistory') and not item_pw["details"].get('passwordHistory')): | |
subprocess.check_call(["/Users/amohr/bin/op", 'delete', 'item', f"--vault={vault_uuid}", item_pw["uuid"]]) | |
elif existing_item.get(_BAD_ENTRY_KEY) or (not existing_item["details"].get('passwordHistory') and item_pw["details"].get('passwordHistory')): | |
subprocess.check_call(["/Users/amohr/bin/op", 'delete', 'item', f"--vault={vault_uuid}", existing_item["uuid"]]) | |
passwords[pw_key] = item | |
elif existing_item["createdAt"] <= item_pw["createdAt"]: | |
subprocess.check_call(["/Users/amohr/bin/op", 'delete', 'item', f"--vault={vault_uuid}", item_pw["uuid"]]) | |
else: | |
subprocess.check_call(["/Users/amohr/bin/op", 'delete', 'item', f"--vault={vault_uuid}", existing_item["uuid"]]) | |
passwords[pw_key] = item | |
else: | |
print(f"different: {item}, {passwords[pw_key]}") | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment