Last active
April 13, 2023 17:27
-
-
Save rachidbch/2801be992c9985a69952759ec73c2df3 to your computer and use it in GitHub Desktop.
Colab Helper
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
__version__ = '0.1.0' | |
import subprocess | |
import os | |
import re | |
import sys | |
import json | |
import base64 | |
import ast | |
import getpass | |
from pty import slave_open | |
from collections import namedtuple | |
from cryptography.fernet import Fernet | |
from cryptography.hazmat.primitives import hashes | |
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC | |
def secret_manager(passphrase): | |
passphrase = passphrase.encode() | |
salt = b'\x00' | |
kdf = PBKDF2HMAC(algorithm=hashes.SHA256(), length=32, salt=salt, iterations=480000) | |
key = base64.urlsafe_b64encode(kdf.derive(passphrase)) | |
cipher_suite = Fernet(key) | |
VAULT = 'gAAAAABkODuA_siN630KFD5pltSe7CXX93-kRgPvNODAG9qDG3F136fUiVUMfGwYuWMniceCoHyLkxU4bNFnq732--B8bzJDAUj2Od-c0VkM6Xtt5byBkxGAM9Kp40MkANDyjW10ySOGuR1VCp_Uyxk6VLVQEOH2fvNPyOri9dZI3oRqBMTsW25efKnBaMvLKJy5MiHhpOCeZ-zUBvDx1WM_X0ojIRTpCKBJGdcOrLSigXAvpRH6Cfq0v8CcLFY1e7atkk4PFiVEmnTcUuWdnXEGSZ0fiaQMJeuz9P6fEHK-mnuzDIhKknad2EmvU-udpHeHLekWrRdDddXM_EebMKMlbz_WTDJFPKFS5aLLwRlWzplYGs24U5HomK9eKiwv75U269MRQ2P4wcMigvcdNrzxrKcM5UeWGpJVlai1Qp8iowt7c1npYEaipyW845vhK7IdkIAK_weOgTvrSBIMAWg_2me5qm-POrYgsbJB2BWgAdKQPfTarL4tDx-vdX8uDnVtx1TjBcM-FnKaCyVAPaqikIvHy-K-j0uaN8e66ousHFUPq7o6vwpYOat6bKzZBhlZe8Kh9GzU' | |
GIST_ID = '2801be992c9985a69952759ec73c2df3' | |
def pdict(d): | |
for (key, value) in d.items(): | |
if isinstance(value, dict): | |
d[key] = pdict(value) | |
return namedtuple('obj', d.keys())(*d.values()) | |
def store_gist_id(id): | |
with open(__file__, 'r') as f: | |
tree = ast.parse(f.read()) | |
for node in ast.walk(tree): | |
if isinstance(node, ast.FunctionDef) and node.name == 'secret_manager': | |
for sub_node in ast.walk(node): | |
if isinstance(sub_node, ast.Assign): | |
if isinstance(sub_node.targets[0], ast.Name) and sub_node.targets[0].id == 'GIST_ID': | |
sub_node.value = ast.Str(s=id) | |
with open(__file__, 'w') as f: | |
f.write(ast.unparse(tree)) | |
def read_gist_id(): | |
with open(__file__, 'r') as f: | |
tree = ast.parse(f.read()) | |
for node in ast.walk(tree): | |
if isinstance(node, ast.FunctionDef) and node.name == 'secret_manager': | |
for sub_node in ast.walk(node): | |
if isinstance(sub_node, ast.Assign): | |
if isinstance(sub_node.targets[0], ast.Name) and sub_node.targets[0].id == 'GIST_ID': | |
return str(sub_node.value.s) | |
def gistid_from_url(url): | |
pattern = '(https?://gist\\.github\\.com/\\w+/([a-f0-9]+))' | |
match = re.search(pattern, url) | |
if match: | |
return ((gist_id := match.group(2)), None) | |
else: | |
return (None, 'Unrecognized Gist url format.') | |
def save_gist(gist_id=None, description=None): | |
try: | |
subprocess.check_output(['gh', '--version']) | |
except OSError: | |
print('GitHub CLI (gh) not found. Please install it first.', file=sys.stderr) | |
return | |
with open(__file__, 'r') as f: | |
content = f.read() | |
cmd = ['gh', 'gist'] | |
gist_id = gist_id if gist_id else read_gist_id() | |
if gist_id: | |
cmd += ['edit', gist_id] | |
cmd += ['--filename', 'zcreds.py'] | |
cmd += [__file__] | |
p = subprocess.Popen(cmd, stdin=subprocess.PIPE) | |
(out, err) = p.communicate(input=content.encode()) | |
if p.returncode != 0: | |
print(f'Error updating gist: {err}', file=sys.stderr) | |
else: | |
print(f'Gist updated successfully.') | |
else: | |
cmd += ['create'] | |
description = 'COLAB Helper' if not description else description | |
cmd += ['-d', description] | |
cmd += ['--filename', 'zcreds.py'] | |
cmd += ['--public'] | |
p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
(out, err) = p.communicate(input=content.encode()) | |
if p.returncode != 0: | |
print(f'Error creating new gist: {err}', file=sys.stderr) | |
else: | |
url = out.decode().strip() | |
(gistid, err) = gistid_from_url(url) | |
if not err: | |
store_gist_id(gistid) | |
save_gist(gistid) | |
print(f'Gist created successfully at {url}') | |
else: | |
print(f'Error parsing gist url: {err}', file=sys.stderr) | |
def store_vault(secrets): | |
with open(__file__, 'r') as f: | |
tree = ast.parse(f.read()) | |
for node in ast.walk(tree): | |
if isinstance(node, ast.FunctionDef) and node.name == 'secret_manager': | |
for sub_node in ast.walk(node): | |
if isinstance(sub_node, ast.Assign): | |
if isinstance(sub_node.targets[0], ast.Name) and sub_node.targets[0].id == 'VAULT': | |
sub_node.value = ast.Str(s=secrets) | |
with open(__file__, 'w') as f: | |
f.write(ast.unparse(tree)) | |
def read_vault(): | |
with open(__file__, 'r') as f: | |
tree = ast.parse(f.read()) | |
for node in ast.walk(tree): | |
if isinstance(node, ast.FunctionDef) and node.name == 'secret_manager': | |
for sub_node in ast.walk(node): | |
if isinstance(sub_node, ast.Assign): | |
if isinstance(sub_node.targets[0], ast.Name) and sub_node.targets[0].id == 'VAULT': | |
return str(sub_node.value.s) | |
def encrypt_secret(secret): | |
""" | |
Encrypts a JSON object containing secrets and stores it in this file. | |
Args: | |
secret (dict): A dictionary of secrets to be encrypted. | |
""" | |
secret = json.loads(secret) | |
encrypted_vault = cipher_suite.encrypt(json.dumps(secret).encode()).decode() | |
store_vault(encrypted_vault) | |
save_gist() | |
def decrypt_secret(): | |
""" | |
Decrypts previously-encrypted JSON object containing secrets stored in this file. | |
Returns: | |
dict: A dictionary of decrypted secrets. | |
""" | |
encrypted_vault = read_vault() | |
secrets = cipher_suite.decrypt(encrypted_vault.encode()) | |
return json.loads(secrets.decode()) | |
return {'encrypt_secret': encrypt_secret, 'decrypt_secret': decrypt_secret, 'pdict': pdict} | |
def cli(): | |
is_colab = 'google.colab' in sys.modules | |
key_prompt_message = '' | |
if is_colab: | |
key_prompt_message += 'Notebook running on Google Colaboratory.\n' | |
key_prompt_message += 'Enter encryption/decryption key:\n' | |
else: | |
key_prompt_message += 'Enter encryption/decryption key: ' | |
passphrase = getpass.getpass(prompt='Enter passphrase: ') | |
manager = secret_manager(passphrase) | |
secrets = '' | |
if not is_colab: | |
if not os.isatty(sys.stdin.fileno()): | |
secrets = '' | |
while True: | |
line_inputted_by_user = input().strip() | |
if not line_inputted_by_user: | |
break | |
secrets += line_inputted_by_user | |
elif len(sys.argv) > 1: | |
with open(sys.argv[1], 'r') as file: | |
secrets = file.read() | |
if secrets: | |
try: | |
manager['encrypt_secret'](secrets) | |
except ValueError as e: | |
print(f'Error parsing JSON: {e}', file=sys.stderr) | |
else: | |
try: | |
ZCREDS = manager['decrypt_secret']() | |
print(json.dumps(ZCREDS)) if not is_colab else None | |
ZCREDS = manager['pdict'](ZCREDS) | |
return ZCREDS | |
except ValueError as e: | |
print(f'Error parsing JSON: {e}', file=sys.stderr) | |
if __name__ == '__main__': | |
ZCREDS = cli() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment