Created March 1, 2018 09:15
Save tycho/506e810f9378218c5bee451655a9af39 to your computer and use it in GitHub Desktop.
LastPass CSV export to KeePass database script
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/python3 | |
| import base64 | |
| import codecs | |
| import csv | |
| import getpass | |
| import hashlib | |
| import html | |
| import logging | |
| import os | |
| import pickle | |
| import secrets | |
| import struct | |
| import sys | |
| import time | |
| import uuid | |
| from requests.exceptions import * | |
| from multiprocessing.pool import ThreadPool | |
| from urllib import parse | |
| import requests | |
| from io import BytesIO | |
| from PIL import Image | |
| from bs4 import BeautifulSoup | |
| from lxml import etree | |
| from lxml import objectify | |
# Shared lxml ElementMaker for building the KeePass XML tree; annotate=False
# suppresses objectify's py-type annotation attributes on created elements.
E = objectify.ElementMaker(annotate=False)
| from pykeepass import PyKeePass | |
| from pykeepass.icons import KEY, FOLDER | |
| from libkeepass.kdb4 import KDB4Header, KDB4Reader, KDB4_SALSA20_IV | |
| from libkeepass.crypto import sha256, Salsa20 | |
# (connect, read) timeouts in seconds passed to every requests.get() call.
TIMEOUTS = (5, 15)

log = logging.getLogger(__file__)
# Silence urllib3's noisy per-connection warnings.
urllib3_logger = logging.getLogger('urllib3')
urllib3_logger.setLevel(logging.CRITICAL)
root_logger = logging.getLogger('')
root_logger.setLevel(logging.INFO)
# Timestamped, level-tagged log lines on stderr.
console_log_format = logging.Formatter(
    '%(asctime)s %(levelname)7s: %(message)s',
    '%Y-%m-%d %H:%M:%S')
console_logger = logging.StreamHandler(sys.stderr)
console_logger.setFormatter(console_log_format)
console_logger.setLevel(logging.INFO)
root_logger.addHandler(console_logger)
def rand_bytes(nbytes):
    """Return *nbytes* cryptographically secure random bytes.

    The previous implementation stitched together 32-bit words from
    secrets.randbits() and therefore required nbytes to be a multiple of
    four; secrets.token_bytes() does the same job directly and lifts
    that restriction.
    """
    return secrets.token_bytes(nbytes)
class KDB4Writer(KDB4Reader):
    """Write-only KDB4 (KeePass 2.x / .kdbx) database builder.

    Reuses libkeepass's KDB4Reader serialization machinery, but instead of
    parsing an existing file it fabricates a fresh header (random seeds and
    IVs) plus a minimal KeePass XML document tree, ready to be populated
    and written out.
    """

    def __init__(self, **credentials):
        # No input stream: this database is only ever written, never read.
        KDB4Reader.__init__(self, None, **credentials)
        self.header = KDB4Header()
        self.header.EndOfHeader = b'\r\n\r\n'
        self.header.CipherID = codecs.decode(b'31c1f2e6bf714350be5805216afc5aff', 'hex') # AES
        # 1 == compress the XML payload (gzip) before encryption.
        self.header.CompressionFlags = 1
        self.header.MasterSeed = rand_bytes(32)
        self.header.TransformSeed = rand_bytes(32)
        # Key-stretching rounds for the key transform.
        self.header.TransformRounds = 1000000
        self.header.EncryptionIV = rand_bytes(16)
        # Key for the inner stream cipher protecting in-memory values.
        self.header.ProtectedStreamKey = rand_bytes(32)
        self.header.StreamStartBytes = rand_bytes(32)
        self.header.InnerRandomStreamID = 2 # Salsa20
        # Minimal KeePass XML skeleton: a Meta section plus a single empty
        # 'Root' group that imported entries will be attached to.
        self.tree = etree.ElementTree(E.KeePassFile(
            E.Meta(
                E.Generator('lp2kp'),
                # Per-field protection disabled: values sit unobfuscated in
                # the XML (the file as a whole is still encrypted).
                E.MemoryProtection(
                    E.ProtectTitle('False'),
                    E.ProtectUserName('False'),
                    E.ProtectPassword('False'),
                    E.ProtectURL('False'),
                    E.ProtectNotes('False')),
                E.Color(),
                E.CustomIcons(),
                E.Binaries(),
                E.HistoryMaxItems('32'),
                E.HistoryMaxSize('1048576'),
            ),
            E.Root(
                E.DeletedObjects(),
                E.Group(
                    E.UUID(base64.b64encode(uuid.uuid1().bytes)),
                    E.Name('Root'),
                    E.Notes(),
                    E.IconID(48),
                    E.IsExpanded('True')
                )
            )
        ))
        # Strip objectify's py-type annotations so the XML matches what
        # KeePass expects.
        objectify.deannotate(self.tree, pytype=True, cleanup_namespaces=True)
        self.obj_root = self.tree.getroot()
        # Inner stream cipher used to (de)obfuscate protected values.
        self.salsa = Salsa20(
            sha256(self.header.ProtectedStreamKey),
            KDB4_SALSA20_IV)
        self._reset_salsa()
def is_integer(v):
    """Return True if *v* can be converted to an int, else False.

    Also returns False for values int() rejects with TypeError (e.g.
    None), which previously escaped as an exception.

    >>> is_integer('3')
    True
    >>> is_integer('foo')
    False
    """
    try:
        int(v)
    except (TypeError, ValueError):
        return False
    return True
class EmptyPyKeePass(PyKeePass):
    """PyKeePass variant that starts from an empty in-memory database.

    PyKeePass normally opens an existing .kdbx file; this subclass skips
    the read step and wires in a freshly constructed KDB4Writer, so a new
    database can be populated via add_entry() and then saved to *filename*.
    """

    def __init__(self, filename=None, password=None, keyfile=None):
        # Pass filename=None so the base class's read() call becomes a
        # no-op (see read() below); the real target path is remembered
        # in kdb_filename for save().
        super(EmptyPyKeePass, self).__init__(None, password, keyfile)
        self.kdb_filename = filename
        self.kdb = KDB4Writer(password=password, keyfile=keyfile)

    def read(self, filename=None, password=None, keyfile=None):
        # Invoked by the base __init__; with no filename there is nothing
        # to load, so return None instead of opening a file.
        if not filename:
            return None
        return super(EmptyPyKeePass, self).read(filename, password, keyfile)

    def add_entry(self, *args, **kwargs):
        # NOTE(review): is_integer() assumes icon is always supplied by
        # callers (main() always passes one) — confirm before reuse.
        icon = kwargs.get('icon')
        if not is_integer(icon):
            # Custom icon, store it in CustomIconUUID instead, and set Icon to
            # 0 as a fallback for KeePass variants that don't understand custom
            # icons.
            kwargs['icon'] = '0'
        entry = super(EmptyPyKeePass, self).add_entry(*args, **kwargs)
        if not is_integer(icon):
            entry._element.append(E.CustomIconUUID(icon))
        return entry
def _fetch_image(url):
    """
    Fetches image data from a URL and returns a tuple of the SHA-1 digest and
    PIL Image instance for it. We construct the PIL Image here because we need
    to know whether the image data is valid or not. If it's invalid we may try
    another URL in the caller.

    Returns (None, None) whenever the data cannot be fetched or decoded.
    """
    imgbytes = None
    parsed = parse.urlparse(url)
    if parsed.scheme in ['http', 'https']:
        try:
            r = requests.get(url, timeout=TIMEOUTS)
            if 200 <= r.status_code < 300:
                imgbytes = r.content
        except SSLError as exc:
            parsed_actual = parse.urlparse(exc.request.url)
            if parsed_actual.scheme != parsed.scheme and parsed_actual.hostname == parsed.hostname:
                # We were redirected to another scheme on the same host and
                # the certificate is bad there too -- nothing left to try.
                logging.error("Redirected to HTTPS for %s, but certificate is still bad", url)
                raise
            else:
                # We'll just retry with http:// next.
                logging.error("Certificate for %s is bad, trying fallbacks", url)
                return (None, None)
    elif parsed.scheme == 'data':
        # e.g. data:image/png;base64,YXJnbGJhcmdsZQo=
        meta, data = parsed.path.split(',', 1)
        # endswith() tolerates extra media-type parameters before the
        # encoding marker, where the old two-way split would have raised.
        if meta.endswith(';base64'):
            imgbytes = base64.b64decode(data)
    else:
        # Unknown scheme, can't try to fetch.
        return (None, None)
    if not imgbytes:
        # Failed to download (maybe zero byte response?)
        return (None, None)
    digest = hashlib.sha1(imgbytes).digest()
    try:
        img = Image.open(BytesIO(imgbytes))
    except Exception:
        # Not valid image data? (Bare except would also trap KeyboardInterrupt.)
        return (None, None)
    return (digest, img)
def _fetch_text(url):
    """
    Fetches text data from a URL and returns a tuple of the actual URL
    retrieved (following redirects) and the text content of the document.

    Returns (None, None) on a non-2xx response or an SSL failure; any
    other exception propagates to the caller (the previous bare
    'except: raise' was a no-op and has been removed).
    """
    try:
        r = requests.get(url, timeout=TIMEOUTS)
        if 200 <= r.status_code < 300:
            return (r.url, r.text)
    except SSLError:
        logging.error("Certificate for %s is bad, going to fall back to regular HTTP", url)
    return (None, None)
def _fetch_favicon_basic(hostname):
    """
    Attempts to blindly fetch favicons from the commonly-used favicon.ico file
    path. Returns a tuple of the SHA-1 digest of the original image data and
    a PIL Image object.

    Tries HTTPS first, then plain HTTP; (None, None) if neither yields a
    decodable image.
    """
    for scheme in ('https', 'http'):
        candidate = '%s://%s/favicon.ico' % (scheme, hostname)
        digest, img = _fetch_image(candidate)
        if img:
            return (digest, img)
    return (None, None)
def _fetch_favicon_soup(hostname):
    """
    Slightly smarter favicon fetcher. Retrieves the index document and searches
    it for a relevant favicon <link> element in the header. If that succeeds,
    it follows up with a fetch of the image data. Returns a tuple of the SHA-1
    digest of the original image data and a PIL Image object.

    Returns (None, None) when no usable icon link is found.
    """
    urls = [
        'https://%s/',
        'http://%s/',
    ]
    for url in urls:
        url = url % (hostname)
        url, text = _fetch_text(url)
        if not text:
            continue
        soup = BeautifulSoup(text, "lxml")
        # The base for resolving relative hrefs depends only on the page URL,
        # so compute it once per page instead of once per <link> element.
        parsed = parse.urlparse(url)
        base = parsed.scheme + '://' + parsed.netloc + os.path.dirname(parsed.path)
        for link in soup.find_all('link'):
            absurl = parse.urljoin(base, link['href'])
            # rel is multi-valued, e.g. ["shortcut", "icon"].
            for rel in link['rel']:
                if 'icon' in rel.lower():
                    digest, img = _fetch_image(absurl)
                    if img:
                        return (digest, img)
    return (None, None)
def fetch_favicon(hostname):
    """
    Attempts to find and retrieve a favicon image from the specified website.
    Returns a tuple of the requested hostname, the SHA-1 digest of the original
    image data, and the raw bytes of a PNG thumbnail version of the favicon (if
    it was too large -- some sites make them crazy big like Authy which gives
    a 196x196 image).

    On any failure returns (hostname, None, None); this runs in worker
    threads, so exceptions are logged rather than allowed to kill the pool.
    """
    try:
        digest, img = _fetch_favicon_basic(hostname)
        if not img:
            digest, img = _fetch_favicon_soup(hostname)
    except ConnectionError:
        log.error("Host %s connection error, couldn't fetch favicon", hostname)
        return (hostname, None, None)
    except ReadTimeout:
        log.error("Host %s read timeout, couldn't fetch favicon", hostname)
        return (hostname, None, None)
    except Exception:
        # Narrowed from a bare except so Ctrl-C still works.
        log.exception("Caught an unexpected exception, oh no!")
        return (hostname, None, None)
    if not img:
        return (hostname, None, None)
    imgbytes = BytesIO()
    if img.width > 48 or img.height > 48:
        # Image.ANTIALIAS was removed in Pillow 10; LANCZOS has been the
        # canonical name for the same filter since long before that.
        img.thumbnail((48, 48), Image.LANCZOS)
    img.save(imgbytes, 'PNG')
    return (hostname, digest, imgbytes.getvalue())
def digest_to_uuid(digest):
    """
    Truncates a SHA-1 digest and turns it into a 16-byte UUID v4.

    Keeps the trailing 16 bytes and stamps the RFC4122 version/variant
    bits on them via the uuid module.
    """
    tail = digest[-16:]
    return uuid.UUID(bytes=tail, version=4).bytes
class IconManager(object):
    """Fetches and caches website favicons for use as KeePass custom icons.

    Results are persisted to *cachefilename* with pickle: 'hosts' maps a
    hostname to its icon UUID (base64 str) or None for a known failure;
    'icons' maps an icon UUID to raw PNG bytes. Fetches run asynchronously
    on a thread pool; call join() to collect them.
    """

    def __init__(self, cachefilename):
        self.cachefilename = cachefilename
        self.pool = ThreadPool()
        self.jobs = []     # outstanding AsyncResult handles
        self.queued = []   # hostnames already submitted during this run
        self.hosts = {}    # hostname -> icon UUID (base64 str) or None
        self.icons = {}    # icon UUID -> PNG bytes
        self._load()

    def _load(self):
        """Load the on-disk cache if present.

        NOTE(review): pickle.load() on an attacker-controlled cache file
        can execute arbitrary code; this assumes the cache is locally
        owned and trusted.
        """
        if os.path.isfile(self.cachefilename):
            # Context manager fixes the file-handle leak of the bare open().
            with open(self.cachefilename, 'rb') as fp:
                cache = pickle.load(fp)
            self.hosts = cache['hosts']
            self.icons = cache['icons']

    def _save(self):
        """Persist the cache; called incrementally so progress survives crashes."""
        with open(self.cachefilename, 'wb') as fp:
            pickle.dump(
                {
                    'hosts': self.hosts,
                    'icons': self.icons
                }, fp
            )

    def enqueue_hostname(self, hostname):
        """Schedule an async favicon fetch for *hostname* unless it is
        already queued or answered by the cache (positively or negatively)."""
        if not hostname:
            return
        if hostname in self.queued:
            return
        if self.hosts.get(hostname, None) is not None:
            # Icon exists in cache.
            return
        if hostname in self.hosts:
            # Cache entry exists, but no icon data (failed to retrieve it previously)
            return
        # Lazy %-args instead of eager string interpolation.
        log.debug("Hostname %s icon is missing, trying to find it", hostname)
        self.jobs.append(self.pool.apply_async(fetch_favicon, (hostname,)))
        self.queued.append(hostname)

    def join(self):
        """Block until every queued fetch finishes, folding results (and
        negative results) into the cache as they arrive."""
        jobsleft = len(self.jobs)
        if jobsleft == 0:
            return
        log.info("Waiting for %3d favicon fetch jobs to finish...", jobsleft)
        for job in self.jobs:
            # Generous per-job timeout; fetch_favicon has its own HTTP timeouts.
            hostname, digest, imgbytes = job.get(999)
            jobsleft -= 1
            if jobsleft % 15 == 0:
                log.info("... %3d jobs left", jobsleft)
            # Save after every job so an interruption loses minimal work.
            self._save()
            if not imgbytes:
                # Negative-cache the failure so we don't retry every run.
                self.hosts[hostname] = None
                continue
            iconuuid = base64.b64encode(digest_to_uuid(digest)).decode('utf-8')
            self.icons[iconuuid] = imgbytes
            self.hosts[hostname] = iconuuid
        log.info("Favicon fetch jobs done.")
        self._save()

    def get_icon_uuid(self, hostname):
        """Return the cached icon UUID for *hostname*, or None."""
        return self.hosts.get(hostname, None)

    def serialize(self, meta):
        """Append a populated <CustomIcons> element to the KeePass <Meta>."""
        icons = E.CustomIcons()
        for key, value in self.icons.items():
            # append() returns None -- the old 'child =' binding was dead.
            icons.append(
                E.Icon(
                    E.UUID(key),
                    E.Data(base64.b64encode(value).decode('utf-8'))
                )
            )
        meta.append(icons)
def check_password():
    """Interactively prompt for a new master password and vet it.

    Returns the password string, or None when the confirmation does not
    match or the password is shorter than 8 characters.  When the
    optional zxcvbn package is installed, weak passwords (score < 4)
    produce warnings and an 8-second window to abort; a KeyboardInterrupt
    raised during that window propagates to the caller.
    """
    dbpass = getpass.getpass('New password: ')
    if dbpass != getpass.getpass('Retype new password: '):
        log.error("Sorry, passwords do not match.")
        return None
    if len(dbpass) < 8:
        log.error("Please use a longer password! After all, this is the *only* password you should have to remember.")
        return None
    try:
        # zxcvbn is optional; if it's missing we fall through to the
        # ImportError handler below with only a short pause.
        from zxcvbn import zxcvbn
        response = zxcvbn(dbpass)
        if response['score'] < 4:
            log.warning("")
            log.warning("On fast hardware, your password could be cracked in %s",
                        response['crack_times_display']['offline_slow_hashing_1e4_per_second'])
            hints = response['feedback']['suggestions']
            if hints:
                log.warning("")
                log.warning("zxcvbn offers these suggestions:")
                for hint in hints:
                    log.warning(" %s", hint)
            log.warning("")
            log.warning("Your password could stand to be stronger. Are you sure you want to use this password?")
            log.warning("Press Ctrl-C now or forever hold your peace... (waiting 8 seconds)")
            # Grace period in which the user can abort with Ctrl-C.
            time.sleep(8)
    except ImportError:
        log.warning("Couldn't import zxcvbn, skipping robust password strength checks.")
        time.sleep(2)
    return dbpass
def main():
    """Convert a LastPass CSV export ('lastpass.csv' in the current
    directory) into a new KeePass 2.x database ('lastpass.kdbx'),
    fetching site favicons as custom icons along the way."""
    log.info("Please specify a strong password for the new KeePass database file")
    try:
        dbpass = check_password()
    except KeyboardInterrupt:
        sys.exit(1)
    if not dbpass:
        sys.exit(1)

    # Step 1, load and preprocess the LastPass CSV export.
    log.info("Loading and sanitizing LastPass export csv...")
    lines = []
    # Context manager fixes the file handle being left open for the
    # remainder of the run.
    with open('lastpass.csv', 'rt') as exportfile:
        reader = csv.DictReader(exportfile, delimiter=',')
        for line in reader:
            # All entries are mangled because of how LastPass exports things.
            # Every CSV field has HTML entities escaped, e.g. '&amp;' -> '&'.
            # We need to undo this braindamage before we can import anything.
            for key in list(line.keys()):
                line[key] = html.unescape(line[key])
            # Note-only entries get this bogus URL
            if line['url'] == 'http://sn':
                line['url'] = ''
            # We don't care about these.
            del line['fav']
            del line['grouping']
            lines.append(line)

    # Step 2, fetch all the favicons and create custom icons for sites with
    # such icons.
    log.info("Fetching favicons to create custom icons for each KeePass entry...")
    icons = IconManager('lastpass.iconcache')
    for line in lines:
        url = parse.urlparse(line['url'])
        icons.enqueue_hostname(url.hostname)
    # Wait for favicons to finish downloading before constructing the KeePass
    # database.
    icons.join()

    # Step 3, construct the new KeePass database.
    log.info("Constructing KeePass database...")  # typo fix: was "datbase"
    kp = EmptyPyKeePass('lastpass.kdbx', password=dbpass)
    root = kp.root_group
    for line in lines:
        url = parse.urlparse(line['url'])
        icon = icons.get_icon_uuid(url.hostname)
        kp.add_entry(root,
                     line['name'],
                     line['username'],
                     line['password'],
                     url=line['url'],
                     notes=line['extra'],
                     icon=icon if icon else KEY)
    # Now we inject the CustomIcons into the KeePass XML
    icons.serialize(kp.kdb.obj_root.Meta)

    # Finally, write the database to disk.
    log.info("Writing KeePass database to disk...")
    kp.save()
    log.info("Done!")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment