Skip to content

Instantly share code, notes, and snippets.

@evindunn
Last active June 27, 2022 23:29
Show Gist options
  • Save evindunn/08dc8daefce5394bbf016ab5f4135cbc to your computer and use it in GitHub Desktop.
Save evindunn/08dc8daefce5394bbf016ab5f4135cbc to your computer and use it in GitHub Desktop.
Async argon2-cffi
from asyncio import wrap_future, get_running_loop
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count, Lock
from atexit import register as atexit_register
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
from os import getpid
class CryptoWorker:
    """Namespace of static helpers that run argon2 work in a process pool.

    The pool is created lazily by the first ``hash``/``verify`` call and is
    shut down by a hook registered with ``atexit`` when the class is defined.
    """

    # Upper bound on the number of worker processes in the pool.
    _MAX_PROCESSES = 8
    # Shared ProcessPoolExecutor; remains None until _setup() has run.
    _POOL = None
    # Hasher passed as an argument to the worker functions on every call.
    _HASHER = PasswordHasher()

    @staticmethod
    async def hash(plain_text: str):
        """Hash ``plain_text`` in the process pool and return the result."""
        CryptoWorker._setup()
        event_loop = get_running_loop()
        job = event_loop.run_in_executor(
            CryptoWorker._POOL,
            CryptoWorker._hash_sync,
            CryptoWorker._HASHER,
            plain_text,
        )
        return await job

    @staticmethod
    async def verify(hash_: str, plain_text: str):
        """Check ``plain_text`` against ``hash_`` in the process pool.

        Returns whatever ``_verify_sync`` returns: the hasher's verify result,
        or ``False`` when the hasher reports a mismatch.
        """
        CryptoWorker._setup()
        event_loop = get_running_loop()
        job = event_loop.run_in_executor(
            CryptoWorker._POOL,
            CryptoWorker._verify_sync,
            CryptoWorker._HASHER,
            hash_,
            plain_text,
        )
        return await job

    @staticmethod
    def _setup():
        """Create the shared pool on first call; later calls do nothing."""
        if CryptoWorker._POOL is not None:
            return
        # One more worker than there are CPUs, capped at _MAX_PROCESSES.
        worker_count = min(cpu_count() + 1, CryptoWorker._MAX_PROCESSES)
        CryptoWorker._POOL = ProcessPoolExecutor(worker_count)

    @staticmethod
    @atexit_register
    def _shutdown():
        """Exit hook: shut the pool down without waiting for pending work."""
        pool = CryptoWorker._POOL
        # Only a process that ran _setup() owns a pool; anywhere else _POOL is
        # still None and there is nothing to clean up.
        if pool is None:
            return
        pool.shutdown(wait=False, cancel_futures=True)

    @staticmethod
    def _hash_sync(hasher: PasswordHasher, plain_text: str):
        """Blocking worker: return ``hasher.hash(plain_text)``."""
        digest = hasher.hash(plain_text)
        return digest

    @staticmethod
    def _verify_sync(hasher: PasswordHasher, hash_: str, plain_text: str):
        """Blocking worker: verify ``plain_text`` against ``hash_``.

        A ``VerifyMismatchError`` is translated into ``False``; any other
        exception raised by the hasher propagates to the caller.
        """
        try:
            outcome = hasher.verify(hash_, plain_text)
        except VerifyMismatchError:
            return False
        return outcome
#!/usr/bin/env python3
from asyncio import run as asyncio_run, as_completed as asyncio_as_completed
from random import randint
from string import ascii_letters, punctuation, digits
from async_testing import CryptoWorker
async def main():
    """Hash a random 80-character password, then verify several candidates.

    The verifications are submitted together and each result is printed as
    soon as it completes, so output order may differ from submission order.
    """
    alphabet = ascii_letters + punctuation + digits
    last_index = len(alphabet) - 1
    pwd = ''.join(alphabet[randint(0, last_index)] for _ in range(80))
    print("Password: {}".format(pwd))

    hsh = await CryptoWorker.hash(pwd)

    # A mix of wrong guesses and the real password, checked concurrently.
    candidates = ["b" * 1000, "blue" * 100, pwd, "blah" * 128] + ["q"] * 5
    pending = [CryptoWorker.verify(hsh, candidate) for candidate in candidates]

    for finished in asyncio_as_completed(pending):
        print(await finished)


# Run the demo only when this file is executed as a script.
if __name__ == "__main__":
    asyncio_run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment