Skip to content

Instantly share code, notes, and snippets.

@tycho
Created March 1, 2018 09:15
Show Gist options
  • Select an option

  • Save tycho/506e810f9378218c5bee451655a9af39 to your computer and use it in GitHub Desktop.

Select an option

Save tycho/506e810f9378218c5bee451655a9af39 to your computer and use it in GitHub Desktop.
LastPass CSV export to KeePass database script
#!/usr/bin/python3
import base64
import codecs
import csv
import getpass
import hashlib
import html
import logging
import os
import pickle
import secrets
import struct
import sys
import time
import uuid
from requests.exceptions import *
from multiprocessing.pool import ThreadPool
from urllib import parse
import requests
from io import BytesIO
from PIL import Image
from bs4 import BeautifulSoup
from lxml import etree
from lxml import objectify
E = objectify.ElementMaker(annotate=False)
from pykeepass import PyKeePass
from pykeepass.icons import KEY, FOLDER
from libkeepass.kdb4 import KDB4Header, KDB4Reader, KDB4_SALSA20_IV
from libkeepass.crypto import sha256, Salsa20
# (connect, read) timeouts in seconds applied to every outbound HTTP request.
TIMEOUTS = (5, 15)
log = logging.getLogger(__file__)
# urllib3 logs per-request/retry chatter; silence everything below CRITICAL.
urllib3_logger = logging.getLogger('urllib3')
urllib3_logger.setLevel(logging.CRITICAL)
# Route INFO-and-up from all loggers to stderr with a timestamped format.
root_logger = logging.getLogger('')
root_logger.setLevel(logging.INFO)
console_log_format = logging.Formatter(
    '%(asctime)s %(levelname)7s: %(message)s',
    '%Y-%m-%d %H:%M:%S')
console_logger = logging.StreamHandler(sys.stderr)
console_logger.setFormatter(console_log_format)
console_logger.setLevel(logging.INFO)
root_logger.addHandler(console_logger)
def rand_bytes(nbytes):
    """Return *nbytes* cryptographically secure random bytes.

    Used to populate KDBX header seeds and IVs, so a CSPRNG is required.
    Unlike the original hand-rolled loop over ``secrets.randbits``, this
    accepts any non-negative length (not just multiples of 4).

    :param nbytes: number of random bytes to generate (>= 0)
    :raises ValueError: if nbytes is negative
    """
    if nbytes < 0:
        raise ValueError("nbytes must be non-negative")
    return secrets.token_bytes(nbytes)
class KDB4Writer(KDB4Reader):
    """Write-only builder for a brand-new KDB4 (KeePass 2.x .kdbx) file.

    Subclasses libkeepass's KDB4Reader so its serialization and crypto
    machinery can be reused, but instead of parsing an existing database
    it fabricates a fresh header and a minimal empty XML document.
    """
    def __init__(self, **credentials):
        # No input stream: we only want the reader's write-side plumbing.
        KDB4Reader.__init__(self, None, **credentials)
        self.header = KDB4Header()
        self.header.EndOfHeader = b'\r\n\r\n'
        # Cipher UUID identifying AES-256 in the KDBX header.
        self.header.CipherID = codecs.decode(b'31c1f2e6bf714350be5805216afc5aff', 'hex') # AES
        # 1 = gzip-compress the XML payload before encryption.
        self.header.CompressionFlags = 1
        self.header.MasterSeed = rand_bytes(32)
        self.header.TransformSeed = rand_bytes(32)
        # AES-KDF rounds; higher makes offline brute-forcing slower.
        self.header.TransformRounds = 1000000
        self.header.EncryptionIV = rand_bytes(16)
        self.header.ProtectedStreamKey = rand_bytes(32)
        self.header.StreamStartBytes = rand_bytes(32)
        self.header.InnerRandomStreamID = 2 # Salsa20
        # Minimal skeleton document: Meta settings plus an empty Root group.
        self.tree = etree.ElementTree(E.KeePassFile(
            E.Meta(
                E.Generator('lp2kp'),
                E.MemoryProtection(
                    E.ProtectTitle('False'),
                    E.ProtectUserName('False'),
                    E.ProtectPassword('False'),
                    E.ProtectURL('False'),
                    E.ProtectNotes('False')),
                E.Color(),
                E.CustomIcons(),
                E.Binaries(),
                E.HistoryMaxItems('32'),
                E.HistoryMaxSize('1048576'),
            ),
            E.Root(
                E.DeletedObjects(),
                E.Group(
                    E.UUID(base64.b64encode(uuid.uuid1().bytes)),
                    E.Name('Root'),
                    E.Notes(),
                    E.IconID(48),
                    E.IsExpanded('True')
                )
            )
        ))
        # Strip lxml.objectify's py:pytype annotations so the emitted XML
        # matches what KeePass expects.
        objectify.deannotate(self.tree, pytype=True, cleanup_namespaces=True)
        self.obj_root = self.tree.getroot()
        # Inner stream cipher protecting password fields inside the XML;
        # must be initialized after ProtectedStreamKey is chosen.
        self.salsa = Salsa20(
            sha256(self.header.ProtectedStreamKey),
            KDB4_SALSA20_IV)
        self._reset_salsa()
def is_integer(v):
    """Return True if *v* can be converted to an int, False otherwise.

    >>> is_integer('3')
    True
    >>> is_integer('foo')
    False
    """
    try:
        int(v)
        return True
    except (ValueError, TypeError):
        # TypeError covers None and other non-convertible types; the
        # original only caught ValueError, so is_integer(None) crashed
        # (e.g. add_entry() called without an icon kwarg).
        return False
class EmptyPyKeePass(PyKeePass):
    """PyKeePass variant that starts from an empty in-memory database.

    Instead of parsing an existing .kdbx file it wires up a KDB4Writer,
    so a brand-new database can be populated entry-by-entry and then
    written out to ``kdb_filename`` via save().
    """
    def __init__(self, filename=None, password=None, keyfile=None):
        # Pass None so the base class does not try to read a file.
        super(EmptyPyKeePass, self).__init__(None, password, keyfile)
        self.kdb_filename = filename
        self.kdb = KDB4Writer(password=password, keyfile=keyfile)
    def read(self, filename=None, password=None, keyfile=None):
        # The base-class constructor calls read(); with no filename there
        # is nothing to parse, so keep the empty database.
        if not filename:
            return None
        return super(EmptyPyKeePass, self).read(filename, password, keyfile)
    def add_entry(self, *args, **kwargs):
        # icon may be a stock integer ID or a custom-icon UUID string.
        icon = kwargs.get('icon')
        if not is_integer(icon):
            # Custom icon, store it in CustomIconUUID instead, and set Icon to
            # 0 as a fallback for KeePass variants that don't understand custom
            # icons.
            kwargs['icon'] = '0'
        entry = super(EmptyPyKeePass, self).add_entry(*args, **kwargs)
        if not is_integer(icon):
            # NOTE(review): reaches into pykeepass internals (_element);
            # tied to the pykeepass version in use.
            entry._element.append(E.CustomIconUUID(icon))
        return entry
def _fetch_image(url):
    """
    Fetches image data from a URL and returns a tuple of the SHA-1 digest and
    PIL Image instance for it. We construct the PIL Image here because we need
    to know whether the image data is valid or not. If it's invalid we may try
    another URL in the caller.

    Returns (None, None) on any fetch/decode failure; re-raises SSLError
    only when HTTPS was explicitly requested and the certificate is bad.
    """
    imgbytes = None
    parsed = parse.urlparse(url)
    if parsed.scheme in ['http', 'https']:
        try:
            r = requests.get(url, timeout=TIMEOUTS)
            if 200 <= r.status_code < 300:
                imgbytes = r.content
        except SSLError as exc:
            parsed_actual = parse.urlparse(exc.request.url)
            if parsed_actual.scheme != parsed.scheme and parsed_actual.hostname == parsed.hostname:
                # We were redirected onto HTTPS and *that* cert is bad, so
                # an http:// retry by the caller would loop — give up loudly.
                logging.error("Redirected to HTTPS for %s, but certificate is still bad", url)
                raise
            else:
                # We'll just retry with http:// next.
                logging.error("Certificate for %s is bad, trying fallbacks", url)
                return (None, None)
    elif parsed.scheme == 'data':
        # e.g. data:image/png;base64,YXJnbGJhcmdsZQo=
        try:
            meta, data = parsed.path.split(',', 1)
            mime, encoding = meta.split(';', 1)
        except ValueError:
            # Malformed data: URL (missing ',' or ';') — previously an
            # uncaught ValueError.
            return (None, None)
        if encoding == 'base64':
            imgbytes = base64.b64decode(data)
    else:
        # Unknown scheme, can't try to fetch.
        return (None, None)
    if not imgbytes:
        # Failed to download (maybe zero byte response?)
        return (None, None)
    sha1 = hashlib.sha1()
    sha1.update(imgbytes)
    digest = sha1.digest()
    try:
        img = Image.open(BytesIO(imgbytes))
    except Exception:
        # Not valid image data? (Was a bare except:, which also swallowed
        # KeyboardInterrupt/SystemExit.)
        return (None, None)
    return (digest, img)
def _fetch_text(url):
    """
    Fetches text data from a URL and returns a tuple of the actual URL
    retrieved (following redirects) and the text content of the document.

    Returns (None, None) on a bad certificate or a non-2xx response; other
    exceptions propagate to the caller.
    """
    try:
        r = requests.get(url, timeout=TIMEOUTS)
        if 200 <= r.status_code < 300:
            return (r.url, r.text)
    except SSLError:
        logging.error("Certificate for %s is bad, going to fall back to regular HTTP", url)
    # Removed the dead `except: raise` clause — re-raising everything else
    # is already the default behavior without a handler.
    return (None, None)
def _fetch_favicon_basic(hostname):
    """
    Attempts to blindly fetch favicons from the commonly-used favicon.ico file
    path. Returns a tuple of the SHA-1 digest of the original image data and
    a PIL Image object.

    HTTPS is tried first, then plain HTTP; (None, None) when both fail.
    """
    for scheme in ('https', 'http'):
        candidate = '%s://%s/favicon.ico' % (scheme, hostname)
        digest, img = _fetch_image(candidate)
        if img:
            return (digest, img)
    return (None, None)
def _fetch_favicon_soup(hostname):
    """
    Slightly smarter favicon fetcher. Retrieves the index document and searches
    it for a relevant favicon <link> element in the header. If that succeeds,
    it follows up with a fetch of the image data. Returns a tuple of the SHA-1
    digest of the original image data and a PIL Image object.

    Returns (None, None) when no usable icon link is found.
    """
    urls = [
        'https://%s/',
        'http://%s/',
    ]
    for url in urls:
        url = url % (hostname)
        url, text = _fetch_text(url)
        if not text:
            continue
        soup = BeautifulSoup(text, "lxml")
        # Base URL for resolving relative hrefs; invariant per page, so
        # compute it once instead of inside the <link> loop.
        parsed = parse.urlparse(url)
        base = parsed.scheme + '://' + parsed.netloc + os.path.dirname(parsed.path)
        for link in soup.find_all('link'):
            href = link.get('href')
            if not href:
                # <link> without an href can't point at an icon; indexing
                # link['href'] here used to raise KeyError.
                continue
            absurl = parse.urljoin(base, href)
            # rel may also be absent; BeautifulSoup returns a list of tokens.
            for rel in link.get('rel') or []:
                rel = rel.lower()
                if 'icon' in rel:
                    digest, img = _fetch_image(absurl)
                    if img:
                        return (digest, img)
    return (None, None)
def fetch_favicon(hostname):
    """
    Attempts to find and retrieve a favicon image from the specified website.
    Returns a tuple of the requested hostname, the SHA-1 digest of the original
    image data, and the raw bytes of a PNG thumbnail version of the favicon (if
    it was too large -- some sites make them crazy big like Authy which gives
    a 196x196 image).
    """
    try:
        # Try the conventional /favicon.ico location first, then fall back
        # to scraping <link rel="icon"> out of the index page.
        digest, img = _fetch_favicon_basic(hostname)
        if not img:
            digest, img = _fetch_favicon_soup(hostname)
    except ConnectionError:
        log.error("Host %s connection error, couldn't fetch favicon", hostname)
        return (hostname, None, None)
    except ReadTimeout:
        log.error("Host %s read timeout, couldn't fetch favicon", hostname)
        return (hostname, None, None)
    except Exception:
        # Worker-pool boundary: log and degrade to "no icon" rather than
        # killing the pool.  (Was a bare except:, which also swallowed
        # KeyboardInterrupt/SystemExit.)
        log.exception("Caught an unexpected exception, oh no!")
        return (hostname, None, None)
    if not img:
        return (hostname, None, None)
    imgbytes = BytesIO()
    if img.width > 48 or img.height > 48:
        # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same
        # filter under its long-standing alias, available since Pillow 2.7.
        img.thumbnail((48, 48), Image.LANCZOS)
    img.save(imgbytes, 'PNG')
    return (hostname, digest, imgbytes.getvalue())
def digest_to_uuid(digest):
    """
    Truncates a SHA-1 digest and turns it into a 16-byte UUID v4.
    """
    # Keep only the trailing 16 bytes, then let uuid.UUID stamp in the
    # RFC4122 version-4 and variant bits.
    tail = digest[-16:]
    return uuid.UUID(bytes=tail, version=4).bytes
class IconManager(object):
    """Fetches favicons concurrently and caches them on disk.

    ``hosts`` maps hostname -> base64 icon UUID (or None when a previous
    fetch failed, so failures aren't retried every run); ``icons`` maps
    icon UUID -> PNG bytes.  Both are persisted to *cachefilename* with
    pickle so repeat runs skip the network.
    """
    def __init__(self, cachefilename):
        self.cachefilename = cachefilename
        self.pool = ThreadPool()
        self.jobs = []    # outstanding AsyncResult handles from the pool
        self.queued = []  # hostnames already submitted during this run
        self.hosts = {}   # hostname -> icon UUID string, or None on failure
        self.icons = {}   # icon UUID string -> PNG image bytes
        self._load()
    def _load(self):
        """Restore hosts/icons from the pickle cache file, if present."""
        # NOTE: pickle.load can execute arbitrary code from the file; this
        # is only acceptable because the cache is produced locally by
        # _save() — never point it at untrusted data.
        if os.path.isfile(self.cachefilename):
            # `with` ensures the handle is closed (the original leaked it).
            with open(self.cachefilename, 'rb') as cachefile:
                cache = pickle.load(cachefile)
            self.hosts = cache['hosts']
            self.icons = cache['icons']
    def _save(self):
        """Persist hosts/icons to the pickle cache file."""
        with open(self.cachefilename, 'wb') as cachefile:
            pickle.dump(
                {
                    'hosts': self.hosts,
                    'icons': self.icons
                }, cachefile
            )
    def enqueue_hostname(self, hostname):
        """Schedule a favicon fetch for *hostname* unless cached/queued."""
        if not hostname:
            return
        if hostname in self.queued:
            return
        if self.hosts.get(hostname, None) is not None:
            # Icon exists in cache.
            return
        if hostname in self.hosts:
            # Cache entry exists, but no icon data (failed to retrieve it previously)
            return
        # Lazy %-args instead of eager string formatting.
        log.debug("Hostname %s icon is missing, trying to find it", hostname)
        self.jobs.append(self.pool.apply_async(fetch_favicon, (hostname,)))
        self.queued.append(hostname)
    def join(self):
        """Block until all queued fetches finish, folding results into the cache."""
        jobsleft = len(self.jobs)
        if jobsleft == 0:
            return
        log.info("Waiting for %3d favicon fetch jobs to finish...", jobsleft)
        for job in self.jobs:
            # Generous timeout so one wedged worker can't hang us forever.
            hostname, digest, imgbytes = job.get(999)
            jobsleft -= 1
            if jobsleft % 15 == 0:
                log.info("... %3d jobs left", jobsleft)
                # Checkpoint so an interrupted run keeps its progress.
                self._save()
            if not imgbytes:
                # Remember the failure so we don't re-fetch on the next run.
                self.hosts[hostname] = None
                continue
            iconuuid = base64.b64encode(digest_to_uuid(digest)).decode('utf-8')
            self.icons[iconuuid] = imgbytes
            self.hosts[hostname] = iconuuid
        log.info("Favicon fetch jobs done.")
        self._save()
    def get_icon_uuid(self, hostname):
        """Return the cached icon UUID for *hostname*, or None."""
        return self.hosts.get(hostname, None)
    def serialize(self, meta):
        """Append a <CustomIcons> element holding all cached icons to *meta*."""
        icons = E.CustomIcons()
        for key, value in self.icons.items():
            # (append() returns None — the original bound it to an unused var)
            icons.append(
                E.Icon(
                    E.UUID(key),
                    E.Data(base64.b64encode(value).decode('utf-8'))
                )
            )
        meta.append(icons)
def check_password():
    """Prompt for a new master password, confirm it, and vet its strength.

    Returns the password string on success, or None when the two entries
    disagree or the password is shorter than eight characters.  When the
    optional zxcvbn module is importable, a weak password triggers a
    crack-time estimate plus suggestions and an 8-second Ctrl-C window.
    """
    candidate = getpass.getpass('New password: ')
    confirmation = getpass.getpass('Retype new password: ')
    if candidate != confirmation:
        log.error("Sorry, passwords do not match.")
        return None
    if len(candidate) < 8:
        log.error("Please use a longer password! After all, this is the *only* password you should have to remember.")
        return None
    try:
        from zxcvbn import zxcvbn
        report = zxcvbn(candidate)
        if report['score'] < 4:
            crack_time = report['crack_times_display']['offline_slow_hashing_1e4_per_second']
            log.warning("")
            log.warning("On fast hardware, your password could be cracked in %s",
                        crack_time)
            suggestions = report['feedback']['suggestions']
            if suggestions:
                log.warning("")
                log.warning("zxcvbn offers these suggestions:")
                for suggestion in suggestions:
                    log.warning(" %s", suggestion)
            log.warning("")
            log.warning("Your password could stand to be stronger. Are you sure you want to use this password?")
            log.warning("Press Ctrl-C now or forever hold your peace... (waiting 8 seconds)")
            time.sleep(8)
    except ImportError:
        # zxcvbn is an optional dependency.
        log.warning("Couldn't import zxcvbn, skipping robust password strength checks.")
        time.sleep(2)
    return candidate
def _load_lastpass_csv(filename):
    """Load a LastPass CSV export and undo its HTML-entity mangling.

    Every field in the export has HTML entities escaped (e.g. '&' ->
    '&amp;'), so each value is unescaped.  Note-only entries carry the
    bogus URL 'http://sn', which is blanked; the unused 'fav' and
    'grouping' columns are dropped.  Returns a list of row dicts.
    """
    lines = []
    # `with` guarantees the file handle is closed (the original leaked it).
    with open(filename, 'rt') as csvfile:
        for line in csv.DictReader(csvfile, delimiter=','):
            for key in list(line.keys()):
                line[key] = html.unescape(line[key])
            if line['url'] == 'http://sn':
                line['url'] = ''
            del line['fav']
            del line['grouping']
            lines.append(line)
    return lines
def main():
    """Convert lastpass.csv into lastpass.kdbx with per-site favicon icons."""
    log.info("Please specify a strong password for the new KeePass database file")
    try:
        dbpass = check_password()
    except KeyboardInterrupt:
        sys.exit(1)
    if not dbpass:
        sys.exit(1)
    # Step 1, load and preprocess the LastPass CSV export.
    log.info("Loading and sanitizing LastPass export csv...")
    lines = _load_lastpass_csv('lastpass.csv')
    # Step 2, fetch all the favicons and create custom icons for sites with
    # such icons.
    log.info("Fetching favicons to create custom icons for each KeePass entry...")
    icons = IconManager('lastpass.iconcache')
    for line in lines:
        url = parse.urlparse(line['url'])
        icons.enqueue_hostname(url.hostname)
    # Wait for favicons to finish downloading before constructing the KeePass
    # database.
    icons.join()
    # Step 3, construct the new KeePass database.
    log.info("Constructing KeePass database...")  # fixed "datbase" typo
    kp = EmptyPyKeePass('lastpass.kdbx', password=dbpass)
    root = kp.root_group
    for line in lines:
        url = parse.urlparse(line['url'])
        icon = icons.get_icon_uuid(url.hostname)
        kp.add_entry(root,
                     line['name'],
                     line['username'],
                     line['password'],
                     url=line['url'],
                     notes=line['extra'],
                     icon=icon if icon else KEY)
    # Now we inject the CustomIcons into the KeePass XML
    icons.serialize(kp.kdb.obj_root.Meta)
    # Finally, write the database to disk.
    log.info("Writing KeePass database to disk...")
    kp.save()
    log.info("Done!")
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment