Last active
March 15, 2021 08:20
-
-
Save nwithan8/399d3fae2d9d8639d633fbfbbafb8c91 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # RUN THIS COMMAND TO INSTALL DEPENDENCIES | |
| # pip3 install requests argparse cryptography yourls | |
| import requests | |
| import argparse | |
| import random | |
| import string | |
| import socket | |
| import json | |
| import os | |
| import base64 | |
| import zlib | |
| from cryptography.hazmat.primitives.ciphers.aead import AESGCM | |
| from cryptography.hazmat.primitives import hashes | |
| from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC | |
| from cryptography.hazmat.backends import default_backend | |
| from yourls import YOURLSClient | |
| # EDIT THESE VALUES # | |
| VERBOSE = True | |
| JELLYFIN_URL = '' | |
| JELLYFIN_API_KEY = '' | |
| JELLYFIN_ADMIN_USERNAME = '' | |
| JELLYFIN_ADMIN_PASSWORD = '' | |
| JELLYFIN_USER_POLICY = { | |
| "IsAdministrator": False, | |
| "IsHidden": True, | |
| "IsHiddenRemotely": True, | |
| "IsDisabled": False, | |
| "EnableRemoteControlOfOtherUsers": False, | |
| "EnableSharedDeviceControl": False, | |
| "EnableRemoteAccess": True, | |
| "EnableLiveTvManagement": False, | |
| "EnableLiveTvAccess": False, | |
| "EnableContentDeletion": False, | |
| "EnableContentDownloading": False, | |
| "EnableSyncTranscoding": False, | |
| "EnableSubtitleManagement": False, | |
| "EnableAllDevices": True, | |
| "EnableAllChannels": False, | |
| "EnablePublicSharing": False, | |
| "InvalidLoginAttemptCount": 5, | |
| "BlockedChannels": [ | |
| "IPTV", | |
| "TVHeadEnd Recordings" | |
| ], | |
| "AuthenticationProviderId": "Jellyfin.Server.Implementations.Library.DefaultAuthenticationProvider" | |
| } | |
| PRIVATEBIN_URL = '' | |
| YOURLS_URL = '' | |
| # Complete either signature or username/password | |
| YOURLS_SIGNATURE = '' | |
| YOURLS_USERNAME = '' | |
| YOURLS_PASSWORD = '' | |
| # DO NOT EDIT BELOW # | |
| # Jellyfin API # | |
| token_header = None | |
| def load_token_header(file): | |
| global token_header | |
| if os.path.exists(file): | |
| with open(file, 'r') as f: | |
| token_header = {'X-Emby-Token': '{}'.format(f.readline())} | |
| return True | |
| return False | |
| def save_token(file, token): | |
| with open(file, 'w+') as f: | |
| f.write(str(token)) | |
| def make_password(length): | |
| """ | |
| Generate a random string of letters and digits | |
| """ | |
| return ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(length)) | |
| def get(cmd, params): | |
| res = requests.get( | |
| '{}{}?api_key={}{}'.format(JELLYFIN_URL, cmd, JELLYFIN_API_KEY, | |
| ("&" + params if params is not None else "")) | |
| ) | |
| if VERBOSE: | |
| print(res.content) | |
| return res | |
| def postWithToken(hdr, method, data=None): | |
| hdr = {'accept': 'application/json', 'Content-Type': 'application/json', **hdr} | |
| res = requests.post('{}{}'.format(JELLYFIN_URL, method), headers=hdr, data=json.dumps(data)) | |
| if VERBOSE: | |
| print(res.content) | |
| return res | |
| def post(cmd, params, payload): | |
| res = requests.post( | |
| '{}{}?api_key={}{}'.format(JELLYFIN_URL, cmd, JELLYFIN_API_KEY, | |
| ("&" + params if params is not None else "")), | |
| json=payload) | |
| if VERBOSE: | |
| print(res.content) | |
| return res | |
| def getUser(username): | |
| url = '/Users' | |
| res = get(url, None) | |
| if res: | |
| for user in res.json(): | |
| if user.get('Name') == username: | |
| return user | |
| return None | |
| def resetPassword(userId): | |
| url = '/Users/{}/Password'.format(userId) | |
| data = { | |
| 'Id': str(userId), | |
| 'ResetPassword': True | |
| } | |
| return postWithToken(hdr=token_header, method=url, data=data) | |
| def setUserPassword(userId, currentPass, newPass): | |
| url = '/Users/{}/Password'.format(userId) | |
| data = { | |
| 'Id': userId, | |
| 'CurrentPw': currentPass, | |
| 'NewPw': newPass | |
| } | |
| return postWithToken(hdr=token_header, method=url, data=data) | |
| def updatePolicy(userId, policy=None): | |
| if not policy: | |
| policy = JELLYFIN_USER_POLICY | |
| url = '/Users/{}/Policy'.format(userId) | |
| return postWithToken(hdr=token_header, method=url, data=policy) | |
| def authenticate(): | |
| global token_header | |
| if not load_token_header('.jf_token'): | |
| xEmbyAuth = { | |
| 'X-Emby-Authorization': 'Emby UserId="{UserId}", Client="{Client}", Device="{Device}", DeviceId="{' | |
| 'DeviceId}", Version="{Version}", Token="""'.format( | |
| UserId="", # not required, if it was we would have to first request the UserId from the username | |
| Client='account-automation', | |
| Device=socket.gethostname(), | |
| DeviceId=hash(socket.gethostname()), | |
| Version=1, | |
| Token="" # not required | |
| )} | |
| data = {'Username': JELLYFIN_ADMIN_USERNAME, 'Password': JELLYFIN_ADMIN_PASSWORD, | |
| 'Pw': JELLYFIN_ADMIN_PASSWORD} | |
| try: | |
| res = postWithToken(hdr=xEmbyAuth, method='/Users/AuthenticateByName', data=data).json() | |
| token_header = {'X-Emby-Token': '{}'.format(res['AccessToken'])} | |
| save_token('.jf_token', res['AccessToken']) | |
| except Exception as e: | |
| print('Could not log into Jellyfin.\n{}'.format(e)) | |
| # PrivateBin API # | |
| expiration_options = ["5min", "10min", "1hour", "1day", "1week", "1month", "1year", "never"] | |
| def _json_encode(d): | |
| return json.dumps(d, separators=(',', ':')).encode('utf-8') | |
| def _base58_encode(v): | |
| # 58 char alphabet | |
| alphabet = b'123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz' | |
| alphabet_len = len(alphabet) | |
| if isinstance(v, str) and not isinstance(v, bytes): | |
| v = v.encode('ascii') | |
| nPad = len(v) | |
| v = v.lstrip(b'\0') | |
| nPad -= len(v) | |
| l = 0 | |
| for (i, c) in enumerate(v[::-1]): | |
| if isinstance(c, str): | |
| c = ord(c) | |
| l += c << (8 * i) | |
| string = b'' | |
| while l: | |
| l, idx = divmod(l, alphabet_len) | |
| string = alphabet[idx:idx + 1] + string | |
| return alphabet[0:1] * nPad + string | |
| # | |
| # The encryption format is described here: | |
| # https://github.com/PrivateBin/PrivateBin/wiki/Encryption-format | |
| # | |
| def _privatebin_encrypt(paste_passphrase, | |
| paste_password, | |
| paste_plaintext, | |
| paste_formatter, | |
| paste_attachment_name, | |
| paste_attachment, | |
| paste_compress, | |
| paste_burn, | |
| paste_opendicussion): | |
| if paste_password: | |
| paste_passphrase += bytes(paste_password, 'utf-8') | |
| # PBKDF | |
| # kdf_salt = get_random_bytes(8) | |
| kdf_salt = bytes(os.urandom(8)) | |
| kdf_iterations = 100000 | |
| kdf_keysize = 256 # size of resulting kdf_key | |
| backend = default_backend() | |
| kdf = PBKDF2HMAC(algorithm=hashes.SHA256(), | |
| length=int(kdf_keysize / 8), # 256bit | |
| salt=kdf_salt, | |
| iterations=kdf_iterations, | |
| backend=backend) | |
| kdf_key = kdf.derive(paste_passphrase) | |
| # AES-GCM | |
| adata_size = 128 | |
| # cipher_iv = get_random_bytes(int(adata_size / 8)) | |
| cipher_iv = bytes(os.urandom(int(adata_size / 8))) | |
| cipher_algo = "aes" | |
| cipher_mode = "gcm" | |
| compression_type = "none" | |
| if paste_compress: | |
| compression_type = "zlib" | |
| # compress plaintext | |
| paste_data = {'paste': paste_plaintext} | |
| if paste_attachment_name and paste_attachment: | |
| paste_data['attachment'] = paste_attachment | |
| paste_data['attachment_name'] = paste_attachment_name | |
| print(paste_attachment_name) | |
| print(paste_attachment) | |
| if paste_compress: | |
| zobj = zlib.compressobj(wbits=-zlib.MAX_WBITS) | |
| paste_blob = zobj.compress(_json_encode(paste_data)) + zobj.flush() | |
| else: | |
| paste_blob = _json_encode(paste_data) | |
| # Associated data to authenticate | |
| paste_adata = [ | |
| [ | |
| base64.b64encode(cipher_iv).decode("utf-8"), | |
| base64.b64encode(kdf_salt).decode("utf-8"), | |
| kdf_iterations, | |
| kdf_keysize, | |
| adata_size, | |
| cipher_algo, | |
| cipher_mode, | |
| compression_type, | |
| ], | |
| paste_formatter, | |
| int(paste_opendicussion), | |
| int(paste_burn), | |
| ] | |
| paste_adata_json = _json_encode(paste_adata) | |
| aesgcm = AESGCM(kdf_key) | |
| ciphertext = aesgcm.encrypt(cipher_iv, paste_blob, paste_adata_json) | |
| # Validate | |
| # aesgcm.decrypt(cipher_iv, ciphertext, paste_adata_json) | |
| paste_ciphertext = base64.b64encode(ciphertext).decode("utf-8") | |
| return paste_adata, paste_ciphertext | |
| def _privatebin_send(paste_url, | |
| paste_password, | |
| paste_plaintext, | |
| paste_formatter, | |
| paste_attachment_name, | |
| paste_attachment, | |
| paste_compress, | |
| paste_burn, | |
| paste_opendicussion, | |
| paste_expire): | |
| paste_passphrase = bytes(os.urandom(32)) | |
| # paste_passphrase = get_random_bytes(32) | |
| paste_adata, paste_ciphertext = _privatebin_encrypt(paste_passphrase, | |
| paste_password, | |
| paste_plaintext, | |
| paste_formatter, | |
| paste_attachment_name, | |
| paste_attachment, | |
| paste_compress, | |
| paste_burn, | |
| paste_opendicussion) | |
| # json payload for the post API | |
| # https://github.com/PrivateBin/PrivateBin/wiki/API | |
| payload = { | |
| "v": 2, | |
| "adata": paste_adata, | |
| "ct": paste_ciphertext, | |
| "meta": { | |
| "expire": paste_expire, | |
| } | |
| } | |
| # http content type | |
| headers = {'X-Requested-With': 'JSONHttpRequest'} | |
| r = requests.post(paste_url, | |
| data=_json_encode(payload), | |
| headers=headers) | |
| r.raise_for_status() | |
| try: | |
| result = r.json() | |
| except: | |
| return False, 'Error parsing JSON: {}'.format(r.text) | |
| paste_status = result['status'] | |
| if paste_status: | |
| paste_message = result['message'] | |
| return False, "Error getting status: {}".format(paste_message) | |
| paste_id = result['id'] | |
| paste_url_id = result['url'] | |
| paste_deletetoken = result['deletetoken'] | |
| return {'url': '{}{}#{}'.format(paste_url, paste_url_id, _base58_encode(paste_passphrase).decode("utf-8")), | |
| 'delete': '{}/?pasteid={}&deletetoken={}'.format(paste_url, paste_id, paste_deletetoken)}, None | |
| def privatebin(text, url='https://privatebin.net', pass_protect=False, expiration='never', burn_after_reading=False): | |
| paste_url = url | |
| paste_formatter = 'plaintext' | |
| paste_compress = True | |
| paste_opendicussion = 0 | |
| paste_burn = 0 | |
| paste_password = None | |
| paste_attachment_name = None | |
| paste_attachment = None | |
| if not text: | |
| return False, "You did not provide any text to send to {}".format(url) | |
| if expiration not in expiration_options: | |
| return False, "Incorrect how_long option. Options: '{}'".format("', '".join(expiration_options)) | |
| if burn_after_reading: | |
| paste_burn = 1 | |
| if pass_protect: | |
| paste_password = pass_protect | |
| return _privatebin_send(paste_url, | |
| paste_password, | |
| text, | |
| paste_formatter, | |
| paste_attachment_name, | |
| paste_attachment, | |
| paste_compress, | |
| paste_burn, | |
| paste_opendicussion, | |
| expiration) | |
| # Helper functions # | |
| def status(response): | |
| if VERBOSE: | |
| print(response.status_code) | |
| if str(response.status_code).startswith('2'): | |
| return True | |
| return False | |
| def resetJellyfinUser(username, password): | |
| authenticate() | |
| user = getUser(username) | |
| if user: | |
| uid = user['Id'] | |
| if uid: | |
| res = resetPassword(uid) | |
| if status(res): | |
| res = setUserPassword(uid, "", password) | |
| if status(res): | |
| if JELLYFIN_USER_POLICY: | |
| res = updatePolicy(uid, JELLYFIN_USER_POLICY) | |
| if status(res): | |
| return True, "User's password has been updated, and policy has been " \ | |
| "enforced. " | |
| return False, "User's password has been updated, but their policy could " \ | |
| "not be enforced. " | |
| return True, "User's password has been updated." | |
| return False, "Could not change the user password (step 2 of setting new password)." | |
| return False, "Could not reset user password (step 1 of setting new user password)." | |
| return False, "Could not get User ID to complete process." | |
| return False, "Could not find user." | |
| def checkVars(): | |
| return all([JELLYFIN_USER_POLICY, JELLYFIN_URL, JELLYFIN_ADMIN_USERNAME, JELLYFIN_API_KEY, JELLYFIN_ADMIN_PASSWORD]) | |
| """ Handling arguments """ | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument('username', help="Username of new Jellyfin user (Required)") | |
| parser.add_argument('-p', "--password", help="Specify password fo the new Jellyfin user") | |
| parser.add_argument('-l', "--length", help="How many characters in randomly-generated password") | |
| parser.add_argument('-b', help="Automatically upload to PrivateBin", action='store_true') | |
| parser.add_argument('-t', choices=["5min", "10min", "1hour", "1day", "1week", "1month", "1year", "never"], | |
| help="How long until the paste expires") | |
| parser.add_argument('-s', help="Shorten link with YOURLS", action='store_true') | |
| args = parser.parse_args() | |
| def run(): | |
| if checkVars(): | |
| if args.password: | |
| password = args.password | |
| else: | |
| password = make_password(int(args.length) if args.length else 10) | |
| success, msg = resetJellyfinUser(args.username, password) | |
| print(msg) | |
| if success and args.b: | |
| if PRIVATEBIN_URL: | |
| text = "Hostname: {}\nUsername: {}\nPassword: {}".format(JELLYFIN_URL, args.username, password) | |
| data, error = privatebin( | |
| text=text, | |
| url=PRIVATEBIN_URL, | |
| pass_protect=False, | |
| expiration=(args.t if args.t else '1week'), # defaults to expire after 1 week | |
| burn_after_reading=False | |
| ) | |
| if not error: | |
| if args.s: | |
| if YOURLS_URL and (YOURLS_SIGNATURE or (YOURLS_USERNAME and YOURLS_PASSWORD)): | |
| if YOURLS_SIGNATURE: | |
| yourls = YOURLSClient(YOURLS_URL + '/yourls-api.php', signature=YOURLS_SIGNATURE) | |
| else: | |
| yourls = YOURLSClient(YOURLS_URL + '/yourls-api.php', username=YOURLS_USERNAME, | |
| password=YOURLS_PASSWORD) | |
| return True, "Uploaded to PrivateBin and shortened: {}".format( | |
| yourls.shorten(data['url']).shorturl) | |
| return False, "Please complete the YOURLS settings inside this file." | |
| return True, "Uploaded to PrivateBin: {}".format(data['url']) | |
| return False, "Error uploading to PrivateBin: {}".format(error) | |
| return False, "Please complete the Privatebin settings inside this file." | |
| return True, None | |
| return False, "Please complete all the Jellyfin settings inside this file." | |
| success, msg = run() | |
| if msg: | |
| print(msg) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment