Last active
April 24, 2022 00:22
-
-
Save captainGeech42/e0deb7e95d5589dcadba81202cbd7c62 to your computer and use it in GitHub Desktop.
Simple TOTP manager, backed by sqlite, intended for backing up TOTP in a usable manner.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# Script for managing OTP codes. Requires Python 3.6+.
# Secrets can optionally be encrypted at rest with PBKDF2/ChaCha20.
# Uses a SQLite database for storing data.
# To run the tests: python -m pytest otpboi.py
import argparse | |
import base64 | |
import binascii | |
import logging | |
import os | |
import sqlite3 | |
import sys | |
from typing import Callable, List, Optional, Tuple | |
import unittest | |
# Root logger setup: timestamped "[LEVEL] message" records at INFO and above,
# emitted through a single stream handler.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y/%m/%d %H:%M:%S",
    handlers=[logging.StreamHandler()],
)

# Third-party dependency checks.
# This is a single-file script with no requirements.txt, so a missing package
# is reported with an install hint and the process exits with status 2.
try:
    import pyotp
except ImportError:
    logging.fatal("You need to install pyotp from pip")
    sys.exit(2)

try:
    from Crypto.Cipher import ChaCha20
    from Crypto.Hash import SHA512
    from Crypto.Protocol.KDF import PBKDF2
    from Crypto.Random import get_random_bytes
except ImportError:
    logging.fatal("You need to install pycryptodome from pip")
    sys.exit(2)
class EncryptionManager:
    """
    Encrypts and decrypts strings with ChaCha20, using a key derived
    from a user-supplied passphrase via PBKDF2.
    """

    # KDF args
    count = 10000
    hash_algo = SHA512

    def __init__(self, key: str, salt: bytes):
        """
        Derive the 32-byte ChaCha20 key from the passphrase and salt.
        """

        self._user_key = key
        self._salt = salt
        self._chacha_key = PBKDF2(
            self._user_key,
            self._salt,
            32,
            count=EncryptionManager.count,
            hmac_hash_module=EncryptionManager.hash_algo,
        )

        logging.debug("EncryptionManager successfully initialized")

    def encrypt(self, ptxt: str) -> Tuple[str, str]:
        """
        Encrypt a string using ChaCha20

        No nonce is passed to the cipher; whatever nonce the cipher object
        reports afterwards is returned so it can be stored with the ciphertext.

        Returns a tuple of (b64 ctxt, b64 nonce)
        """

        cipher = ChaCha20.new(key=self._chacha_key)
        raw_ctxt = cipher.encrypt(ptxt.encode())

        encoded_ctxt = base64.b64encode(raw_ctxt).decode()
        encoded_nonce = base64.b64encode(cipher.nonce).decode()

        logging.debug("Encrypted %d bytes using ChaCha20", len(ptxt))

        return (encoded_ctxt, encoded_nonce)

    def decrypt(self, ctxt: str, nonce: str) -> str:
        """
        Decrypt a value using ChaCha20

        ctxt and nonce must both be b64 encoded, as returned by encrypt()

        Returns the plaintext string
        """

        raw_nonce = base64.b64decode(nonce)
        raw_ctxt = base64.b64decode(ctxt)

        cipher = ChaCha20.new(key=self._chacha_key, nonce=raw_nonce)

        return cipher.decrypt(raw_ctxt).decode()
class DbManager:
    """
    Wrapper around the SQLite database holding the TOTP generators and the PBKDF2 salt.

    Methods never let sqlite3 errors escape: failures are logged and reported
    through the return value (False or None).
    """

    def __init__(self, path: str = ":memory:"):
        """
        Connect to the database at `path` and create the tables if needed.

        `self.ready` is True only if the migrations succeeded.
        """

        self._path = path
        self._handle = sqlite3.connect(self._path)

        logging.debug("Connected to database at path: %s", self._path)

        self.ready = self._migrate()

        # only claim success when the migrations actually succeeded
        if self.ready:
            logging.debug("Database migrations completed successfully")

    def _migrate(self) -> bool:
        """
        Add tables to the database

        Returns True if the schema is in place, False otherwise
        """

        if self._handle is None:
            logging.warning("Tried to execute database migrations while database handle is unset")
            return False

        try:
            # encrypted: if 1, yes encrypted
            # nonce: b64 encoded, optional (if encrypted==1, must be present)
            # secret is b64 encoded if encrypted, otherwise just normal text value
            self._handle.execute("""
                CREATE TABLE IF NOT EXISTS `totp` (
                    `id` INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
                    `date_created` TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    `full_name` TEXT NOT NULL UNIQUE,
                    `short_name` TEXT NOT NULL UNIQUE,
                    `secret` TEXT NOT NULL,
                    `digits` INTEGER NOT NULL,
                    `period` INTEGER NOT NULL,
                    `encrypted` INTEGER NOT NULL,
                    `nonce` TEXT
                )
            """)

            # pbkdf2 salt, b64 encoded
            self._handle.execute("""
                CREATE TABLE IF NOT EXISTS `encryption` (
                    `date_created` TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
                    `salt` TEXT NOT NULL
                )
            """)

            self._handle.commit()
        except sqlite3.Error:
            # sqlite3.Error rather than OperationalError: pointing the script at a file
            # that isn't a SQLite database raises DatabaseError, for example
            logging.exception("Failed to complete database migrations")
            return False

        return True

    def add_generator(self, full_name: str, short_name: str, secret: str, digits: int, period: int, nonce: Optional[str] = None) -> bool:
        """
        Add a generator to the database. If the secret should be encrypted, that should already have happened.

        If nonce is None, secret is assumed to be in plaintext

        Returns True if a row was inserted. Returns False (after logging) if the
        database isn't ready, a constraint is violated (full_name and short_name
        are both UNIQUE), or any other database error occurs.
        """

        if not self.ready:
            logging.error("Can't add a generator when the database isn't ready")
            return False

        encrypted = 0 if nonce is None else 1

        try:
            c = self._handle.execute("""
                INSERT INTO `totp` (`full_name`, `short_name`, `secret`, `encrypted`, `digits`, `period`, `nonce`) VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (full_name, short_name, secret, encrypted, digits, period, nonce))

            self._handle.commit()
        except sqlite3.IntegrityError as err:
            # a duplicate name is a user error, not a crash: no traceback needed
            self._handle.rollback()
            logging.error("Couldn't add the generator, a database constraint was violated: %s", err)
            return False
        except sqlite3.Error:
            self._handle.rollback()
            logging.exception("Failed to add a new generator to the database")
            return False

        logging.debug("Added generator to the database")

        return c.rowcount > 0

    def get_totp(self, short_name: str, enc_mgr: "Optional[EncryptionManager]" = None) -> "Optional[pyotp.TOTP]":
        """
        Get a TOTP object using data from the database

        enc_mgr is only needed when the stored secret is encrypted.

        Returns None (after logging) if the generator doesn't exist, if the
        secret is encrypted and enc_mgr is None, if decryption fails, or on a
        database error.
        """

        try:
            c = self._handle.execute("""
                SELECT `full_name`, `secret`, `encrypted`, `digits`, `period`, `nonce` FROM `totp` WHERE `short_name` = ?
            """, (short_name,))

            row = c.fetchone()
        except sqlite3.Error:
            logging.exception("Failed to look up TOTP entry from the database")
            return None

        if row is None:
            logging.warning("Couldn't find a TOTP entry for '%s'", short_name)
            return None

        _full_name, secret, encrypted, digits, period, nonce = row

        if encrypted > 0:
            # secret is encrypted, an enc_mgr is mandatory
            if enc_mgr is None:
                logging.error("Can't decrypt secret from database because enc_mgr is None")
                return None

            try:
                secret = enc_mgr.decrypt(secret, nonce)
            except (TypeError, ValueError):
                # ValueError covers malformed base64 (binascii.Error) and plaintext that
                # isn't valid UTF-8 (UnicodeDecodeError), which is what a wrong key
                # usually produces; TypeError covers a missing nonce
                logging.error("Couldn't decrypt the secret for '%s', is the encryption key correct?", short_name)
                return None

        # when not encrypted, the stored value is used as-is
        return pyotp.TOTP(secret, digits=digits, interval=period)

    def get_list_of_generators(self) -> Optional[List[str]]:
        """
        Return a list of generators, string formatted for printing

        Each entry looks like "(<rowid>) <short_name>: <full_name>".
        Returns None on a database error.
        """

        try:
            c = self._handle.execute("""
                SELECT `rowid`, `full_name`, `short_name` FROM `totp`
            """)

            rows = c.fetchall()
        except sqlite3.Error:
            logging.exception("Failed to get list of TOTP generators from the database")
            return None

        return [f"({rowid}) {short_name}: {full_name}" for rowid, full_name, short_name in rows]

    def _save_salt(self, salt: str) -> bool:
        """
        Save the PBKDF2 salt. Should be b64 encoded already.

        Refuses to overwrite: if a salt is already stored, nothing is written
        and False is returned. Returns True if the salt was inserted.
        """

        try:
            c = self._handle.execute("""
                SELECT COUNT(*) FROM `encryption`;
            """)

            if c.fetchone()[0] > 0:
                logging.error("Can't save PBKDF2 salt, there is already one in the database")
                return False

            c = self._handle.execute("""
                INSERT INTO `encryption` (`salt`) VALUES (?)
            """, (salt,))

            self._handle.commit()

            return c.rowcount > 0
        except sqlite3.Error:
            logging.exception("Couldn't save PBKDF2 salt into the database")
            return False

    def get_salt(self) -> Optional[bytes]:
        """
        Get the salt from the database

        If there is no salt saved, generate one, save it, and return it

        Returns None if the salt can't be read, or if a freshly generated salt
        can't be saved (handing out an unsaved salt would make anything
        encrypted with it unrecoverable on the next run).
        """

        try:
            c = self._handle.execute("""
                SELECT `salt` FROM `encryption` LIMIT 1;
            """)

            row = c.fetchone()
        except sqlite3.Error:
            logging.exception("Couldn't get PBKDF2 salt from the database")
            return None

        if row is not None:
            logging.debug("Using existing salt from the database")
            return base64.b64decode(row[0])

        # no salt, make one
        logging.debug("No salt in the database, generating one")
        salt = get_random_bytes(32)

        if not self._save_salt(base64.b64encode(salt).decode()):
            logging.error("Couldn't save the newly generated PBKDF2 salt, refusing to use it")
            return None

        return salt

    def does_generator_exist(self, short_name: str) -> bool:
        """Returns True if a generator with the specified short_name exists (False on a database error)"""

        try:
            c = self._handle.execute("""
                SELECT COUNT(*) FROM `totp` WHERE `short_name` = ?
            """, (short_name,))

            return c.fetchone()[0] > 0
        except sqlite3.Error:
            logging.exception("Couldn't check if specified generator exists in the database")
            return False

    def delete_generator(self, short_name: str) -> bool:
        """Delete a generator from the database, returns True if a row was deleted"""

        try:
            c = self._handle.execute("""
                DELETE FROM `totp` WHERE `short_name` = ?
            """, (short_name,))

            self._handle.commit()

            return c.rowcount > 0
        except sqlite3.Error:
            logging.exception("Couldn't delete the specified generator from the database")
            return False

    def __repr__(self) -> str:
        return f"<DbManager '{self._path}'>"

    def __str__(self) -> str:
        return f"SQLite DB @ \"{self._path}\""
def add_b32_padding(x: str) -> str:
    """Right-pad a base32 string with "=" until its length is a multiple of 8"""

    # -len % 8 is 0 when already aligned, otherwise the number of missing characters
    missing = -len(x) % 8

    return x + "=" * missing
def is_valid_b32(x: str) -> bool:
    """
    Returns True if input string is valid base32

    Missing "=" padding is tolerated and the check is case insensitive.
    """

    try:
        base64.b32decode(add_b32_padding(x), casefold=True)
    except ValueError:
        # binascii.Error (bad digit / bad padding) is a ValueError subclass; non-ASCII
        # input raises a plain ValueError, which catching binascii.Error alone missed
        return False

    return True
def is_valid_int(x: str) -> bool:
    """Returns True if int() accepts the input string"""

    try:
        int(x)
    except ValueError:
        return False
    else:
        return True
def is_yes_no(x: str) -> bool:
    """Returns True if input string is exactly "y" or "n" (case insensitive)"""

    # tuple membership, not `in "yn"`: the latter is a substring test that
    # also accepts "" and "yn"
    return x.lower() in ("y", "n")
def get_valid_input(prompt: str, default: Optional[str] = None, validator: Optional[Callable[[str], bool]] = None) -> str:
    """
    Prompt the user until an acceptable answer is given

    Answers are stripped of surrounding whitespace. An empty answer returns
    `default` when one was provided (without running it through `validator`),
    otherwise the prompt repeats. A non-empty answer is returned once
    `validator` accepts it, or right away when there is no validator.
    """

    suffix = "" if default is None else f" (default, press Enter to use: {default})"
    full_prompt = f"{prompt}{suffix}: "

    while True:
        answer = input(full_prompt).strip()

        if len(answer) == 0:
            if default is not None:
                return default
            continue

        if validator is None or validator(answer):
            return answer
def get_encryption_key() -> str:
    """
    Get the encryption key used to encrypt secrets

    $OTPBOI_KEY is used when set; otherwise the user is prompted until a
    non-empty key is entered.
    """

    # local import so this function doesn't depend on the file's import block
    import getpass

    key = os.getenv("OTPBOI_KEY")
    if key is not None:
        return key

    key = ""
    while len(key) == 0:
        # getpass instead of input(): the key must not be echoed to the terminal
        key = getpass.getpass("Please provide encryption key: ").strip()

    return key
def handle_cmd_code(db: DbManager, names: List[str], encrypt: bool):
    """
    Print the current TOTP code for each generator in `names`

    When `encrypt` is set, the encryption key and the salt from the database
    are used to decrypt stored secrets. Generators that can't be loaded are
    skipped (db.get_totp logs the reason).
    """

    enc_mgr = None
    if encrypt:
        # check the salt first so the user isn't asked for a key that can't be used
        salt = db.get_salt()
        if salt is None:
            logging.error("Couldn't load the PBKDF2 salt from the database, can't decrypt secrets")
            return

        enc_mgr = EncryptionManager(get_encryption_key(), salt)

    for n in names:
        totp = db.get_totp(n, enc_mgr)
        if totp is None:
            continue

        try:
            code = totp.now()
        except ValueError:
            # NOTE(review): pyotp base32-decodes the secret when generating a code, so a
            # secret that decrypted to garbage is expected to surface here as
            # binascii.Error (a ValueError) - confirm against the installed pyotp
            logging.error("The secret for '%s' isn't valid Base32, is the encryption key correct?", n)
            continue

        # TODO: figure out how long the code will be good for
        print(code)
def handle_cmd_list(db: DbManager):
    """Print every generator stored in the database, one per line, after a count header"""

    gens = db.get_list_of_generators()

    if gens is None:
        logging.error("Couldn't get a list of generators from the database")
        return

    if len(gens) == 0:
        print("There are no generators in the database")
        return

    print(f"There are {len(gens)} generators in the database")

    # plain loop: a list comprehension shouldn't be used just for its side effects
    for line in gens:
        print(line)
def handle_cmd_add(db: DbManager, encrypt: bool):
    """
    Interactively collect the details of a new generator and store it

    When `encrypt` is set, the secret is encrypted before it is written to the
    database. Failures are logged; nothing is returned.
    """

    def is_positive_int(x: str) -> bool:
        # digits and period must be > 0, a zero or negative value can't produce codes
        return is_valid_int(x) and int(x) > 0

    enc_mgr = None
    if encrypt:
        enc_key = get_encryption_key()

        salt = db.get_salt()
        if salt is None:
            logging.error("Couldn't load the PBKDF2 salt from the database, can't encrypt the secret")
            return

        enc_mgr = EncryptionManager(enc_key, salt)

    full_name = get_valid_input("Please enter the full name for the generator (ex: \"GitHub (you@example.com)\")")
    short_name = get_valid_input("Please enter the short name for the generator (ex: \"github\")", default=full_name.split(" ")[0].lower())

    # short names are unique in the database, fail now instead of after the remaining prompts
    if db.does_generator_exist(short_name):
        logging.error("A generator with the short name '%s' already exists", short_name)
        return

    secret = add_b32_padding(get_valid_input("Please enter the Base32-encoded secret for the generator (case insensitive; ex: NBSWY3DPORUGK4TF)", validator=is_valid_b32).upper())
    digits = int(get_valid_input("Please enter the number of digits the generator should output", default="6", validator=is_positive_int))
    validity = int(get_valid_input("Please enter how long a code the generator outputs is valid for", default="30", validator=is_positive_int))

    # a None nonce tells the database layer that the secret is stored in plaintext
    nonce = None
    if enc_mgr is not None:
        secret, nonce = enc_mgr.encrypt(secret)

    if db.add_generator(full_name, short_name, secret, digits, validity, nonce):
        logging.info("Successfully added generator to the database")
    else:
        logging.error("Failed to add generator to the database")
def handle_cmd_delete(db: DbManager, short_names: List[str]):
    """
    Delete each named generator after an interactive y/N confirmation

    Unknown names are reported and skipped. Anything other than "y" aborts
    the deletion for that generator.
    """

    for name in short_names:
        if not db.does_generator_exist(name):
            logging.error("No generator with the specified short name could be found: %s", name)
            continue

        answer = get_valid_input(
            f"Are you sure you want to PERMANENTLY delete the generator '{name}' (y/N)",
            default="n",
            validator=is_yes_no,
        )

        if answer.lower() != "y":
            logging.info("Aborting, *not* deleting '%s' generator", name)
            continue

        deleted = db.delete_generator(name)
        if deleted:
            logging.info("Successfully deleted the '%s' generator from the database", name)
        else:
            logging.error("Failed to delete the '%s' generator from the database", name)
def parse_args(argv: Optional[List[str]] = None) -> Optional[argparse.Namespace]:
    """
    Parse the command line

    argv: arguments to parse; None (the default) means sys.argv[1:]

    Returns the parsed namespace, or None (after logging and printing the
    usage) when no command was given.
    """

    encrypt_help = "Use encryption when storing TOTP secrets (key provided interactively or via $OTPBOI_KEY)"

    parser = argparse.ArgumentParser()

    # options shared by every subcommand
    parent_parser = argparse.ArgumentParser(description="Global options", add_help=False)
    parent_parser.add_argument("-d", "--otp-database", metavar="DB_PATH", default="$OTPBOI_DB_PATH", help="Path to OTP database")

    subparsers = parser.add_subparsers(title="Commands", dest="command")

    code_parser = subparsers.add_parser("code", help="Get a TOTP code", parents=[parent_parser])
    code_parser.add_argument("-e", "--encrypt", action="store_true", help=encrypt_help)
    code_parser.add_argument("generator", metavar="generator", type=str, nargs=1, help="Generator to get a code for")

    subparsers.add_parser("list", help="List available code generators", parents=[parent_parser])

    add_parser = subparsers.add_parser("add", help="Add a code generator", parents=[parent_parser])
    add_parser.add_argument("-e", "--encrypt", action="store_true", help=encrypt_help)

    delete_parser = subparsers.add_parser("delete", help="Remove a code generator", parents=[parent_parser])
    delete_parser.add_argument("generator", metavar="generator", type=str, nargs=1, help="Generator to remove")

    args = parser.parse_args(argv)

    # the subcommand isn't marked as required on the subparsers, so check by hand
    if args.command is None:
        logging.fatal("You must specify a command")
        parser.print_usage()
        return None

    return args
def main():
    """
    Entry point: parse arguments, open the database and run the command

    Returns the process exit code: 0 on success, 1 if the database couldn't
    be prepared, 2 on usage errors.
    """

    # get args
    args = parse_args()
    if args is None:
        return 2

    # discern db path
    if args.otp_database == "$OTPBOI_DB_PATH":
        # the argparse default is a placeholder meaning "not given on the command line";
        # the environment variable itself is named without the leading "$"
        db_path = os.getenv("OTPBOI_DB_PATH")
        if db_path is None:
            logging.fatal("You must provide the path to the OTP database with -d/--otp-database or $OTPBOI_DB_PATH")
            return 2
    else:
        db_path = args.otp_database

    logging.debug("Using %s for the OTP database", db_path)

    if db_path != ":memory:" and not os.path.exists(db_path):
        logging.info("Creating a new database at %s", db_path)

    # load up the database
    db = DbManager(db_path)
    if not db.ready:
        logging.error("Database not ready, exiting")
        return 1

    # handle the command
    if args.command == "code":
        handle_cmd_code(db, args.generator, args.encrypt)
    elif args.command == "list":
        handle_cmd_list(db)
    elif args.command == "add":
        handle_cmd_add(db, args.encrypt)
    elif args.command == "delete":
        handle_cmd_delete(db, args.generator)
    else:
        # shouldn't be reachable, but don't report success if it ever is
        logging.fatal("Invalid command specified: %s", args.command)
        return 2

    return 0
# Script entry point: when the file is executed directly, run the CLI and use
# main()'s return value as the process exit status. When the file is imported
# instead, the guard is false and execution continues past it.
if __name__ == "__main__":
    sys.exit(main())
########################################################################################################################## | |
class Tests(unittest.TestCase):
    """Tests for DbManager, EncryptionManager and the base32 helpers, run against in-memory databases"""

    def setUp(self) -> None:
        # FYSA this is run *per test case*
        self.db = DbManager()
        # close the connection after every test, even if the rest of setUp fails
        self.addCleanup(self.db._handle.close)
        self.assertTrue(self.db.ready)

        self.salt = self.db.get_salt()
        self.key = "this is a strong encryption key"
        self.enc_mgr = EncryptionManager(self.key, self.salt)

    def test_gauth_otp_no_enc(self):
        # plaintext secret: add, look up, generate a code for a fixed timestamp, delete
        self.assertTrue(self.db.add_generator("test name", "test", "base32secret3232", 6, 30))
        self.assertTrue(self.db.does_generator_exist("test"))

        totp = self.db.get_totp("test")
        self.assertIsNotNone(totp)

        otp = totp.at(1650734201)
        self.assertEqual(otp, "420700")

        self.assertTrue(self.db.delete_generator("test"))
        self.assertFalse(self.db.does_generator_exist("test"))

    def test_gauth_otp_enc(self):
        secret = "base32secret3232"
        enc_secret, nonce = self.enc_mgr.encrypt(secret)

        self.assertTrue(self.db.add_generator("test name", "test", enc_secret, 6, 30, nonce))

        # an encrypted secret can't be loaded without an encryption manager
        totp = self.db.get_totp("test")
        self.assertIsNone(totp)

        totp = self.db.get_totp("test", self.enc_mgr)
        self.assertIsNotNone(totp)

        otp = totp.at(1650734201)
        self.assertEqual(otp, "420700")

    def test_no_otp(self):
        self.assertIsNone(self.db.get_totp("nonexistent"))

    def test_salt(self):
        # don't use setUp one so we can test both code paths of get_salt together
        db = DbManager()
        self.addCleanup(db._handle.close)

        salt1 = db.get_salt()
        salt2 = db.get_salt()

        self.assertEqual(salt1, salt2)

    def test_enc(self):
        salt = self.db.get_salt()
        key = "this is a strong encryption key"
        enc_mgr = EncryptionManager(key, salt)

        ptxt = "hello general kenobi"

        # encrypting the same plaintext twice must give different nonces and ciphertexts
        ctxt, nonce = enc_mgr.encrypt(ptxt)
        ctxt2, nonce2 = enc_mgr.encrypt(ptxt)

        self.assertNotEqual(ctxt, ctxt2)
        self.assertNotEqual(nonce, nonce2)

        ptxt_dec = enc_mgr.decrypt(ctxt, nonce)
        self.assertEqual(ptxt, ptxt_dec)

    def test_padding(self):
        a = "a" * 13
        a_pad = add_b32_padding(a)
        self.assertEqual(len(a_pad), 16)

        # already a multiple of 8, must be returned unchanged
        b = "b" * 16
        b_pad = add_b32_padding(b)
        self.assertEqual(b_pad, b)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment