Last active
January 16, 2023 15:04
-
-
Save nwithan8/34c36c615a347dd576e0a8d157f69fc0 to your computer and use it in GitHub Desktop.
Create a new Jellyfin user from CLI (option to upload credentials to PrivateBin and shorten link)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import os | |
| import requests | |
| import argparse | |
| import random | |
| import string | |
| import socket | |
| import json | |
| import base64 | |
| import zlib | |
| from cryptography.hazmat.primitives.ciphers.aead import AESGCM | |
| from cryptography.hazmat.primitives import hashes | |
| from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC | |
| from cryptography.hazmat.backends import default_backend | |
| # EDIT THESE VALUES # | |
| VERBOSE = False | |
| JELLYFIN_URL = 'https://jellyfin.example.org' | |
| JELLYFIN_API_KEY = 'FILLME' | |
| JELLYFIN_ADMIN_USERNAME = 'FILLME' | |
| JELLYFIN_ADMIN_PASSWORD = 'FILLME' | |
| JELLYFIN_USER_POLICY = { | |
| "IsAdministrator": "false", | |
| "IsHidden": "true", | |
| "IsHiddenRemotely": "true", | |
| "IsDisabled": "false", | |
| "EnableRemoteControlOfOtherUsers": "false", | |
| "EnableSharedDeviceControl": "false", | |
| "EnableRemoteAccess": "true", | |
| "EnableLiveTvManagement": "false", | |
| "EnableLiveTvAccess": "false", | |
| "EnableMediaPlayback": "true", | |
| "EnableAudioPlaybackTranscoding": "false", | |
| "EnableVideoPlaybackTranscoding": "false", | |
| "EnablePlaybackRemuxing": "true", | |
| "EnableContentDeletion": "false", | |
| "EnableContentDownloading": "false", | |
| "EnableSyncTranscoding": "false", | |
| "EnableMediaConversion": "false", | |
| "EnableAllDevices": "true", | |
| "EnableAllChannels": "true", | |
| "EnableAllFolders": "true", | |
| "InvalidLoginAttemptCount": 0, | |
| "LoginAttemptsBeforeLockout": -1, | |
| "RemoteClientBitrateLimit": 0, | |
| "EnablePublicSharing": "false", | |
| "AuthenticationProviderId": "Jellyfin.Server.Implementations.Users.DefaultAuthenticationProvider", | |
| "PasswordResetProviderId": "Jellyfin.Server.Implementations.Users.DefaultPasswordResetProvider" | |
| } | |
| PRIVATEBIN_URL = '' | |
| YOURLS_URL = '' | |
| # Complete either signature or username/password | |
| YOURLS_SIGNATURE = '' | |
| YOURLS_USERNAME = '' | |
| YOURLS_PASSWORD = '' | |
| # DO NOT EDIT BELOW # | |
| # Jellyfin API # | |
| token_header = None | |
| def load_token_header(file): | |
| global token_header | |
| if os.path.exists(file): | |
| with open(file, 'r') as f: | |
| token_header = {'X-Emby-Token': '{}'.format(f.readline())} | |
| return True | |
| return False | |
| def save_token(file, token): | |
| with open(file, 'w+') as f: | |
| f.write(str(token)) | |
| def make_password(length): | |
| """ | |
| Generate a random string of letters and digits | |
| """ | |
| return ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(length)) | |
| def postWithToken(hdr, method, data=None): | |
| hdr = {'accept': 'application/json', 'Content-Type': 'application/json', **hdr} | |
| res = requests.post('{}{}'.format(JELLYFIN_URL, method), headers=hdr, data=json.dumps(data)) | |
| if VERBOSE: | |
| print(res.content) | |
| return res | |
| def post(cmd, params, payload): | |
| res = requests.post( | |
| '{}{}?api_key={}{}'.format(JELLYFIN_URL, cmd, JELLYFIN_API_KEY, | |
| ("&" + params if params is not None else "")), | |
| json=payload) | |
| if VERBOSE: | |
| print(res.content) | |
| return res | |
| def makeUser(username): | |
| url = '/Users/New' | |
| data = { | |
| 'Name': str(username) | |
| } | |
| return post(url, None, payload=data) | |
| def resetPassword(userId): | |
| url = '/Users/{}/Password'.format(userId) | |
| data = { | |
| 'Id': str(userId), | |
| 'ResetPassword': 'true' | |
| } | |
| return postWithToken(hdr=token_header, method=url, data=data) | |
| def setUserPassword(userId, currentPass, newPass): | |
| url = '/Users/{}/Password'.format(userId) | |
| data = { | |
| 'Id': userId, | |
| 'CurrentPw': currentPass, | |
| 'NewPw': newPass | |
| } | |
| return postWithToken(hdr=token_header, method=url, data=data) | |
| def updatePolicy(userId, policy=None): | |
| if not policy: | |
| policy = JELLYFIN_USER_POLICY | |
| url = '/Users/{}/Policy'.format(userId) | |
| return postWithToken(hdr=token_header, method=url, data=policy) | |
| def authenticate(): | |
| global token_header | |
| if not load_token_header('.jf_token'): | |
| xEmbyAuth = { | |
| 'X-Emby-Authorization': 'Emby UserId="{UserId}", Client="{Client}", Device="{Device}", DeviceId="{' | |
| 'DeviceId}", Version="{Version}", Token="""'.format( | |
| UserId="", # not required, if it was we would have to first request the UserId from the username | |
| Client='account-automation', | |
| Device=socket.gethostname(), | |
| DeviceId=hash(socket.gethostname()), | |
| Version=1, | |
| Token="" # not required | |
| )} | |
| data = {'Username': JELLYFIN_ADMIN_USERNAME, 'Password': JELLYFIN_ADMIN_PASSWORD, | |
| 'Pw': JELLYFIN_ADMIN_PASSWORD} | |
| try: | |
| res = postWithToken(hdr=xEmbyAuth, method='/Users/AuthenticateByName', data=data).json() | |
| token_header = {'X-Emby-Token': '{}'.format(res['AccessToken'])} | |
| save_token('.jf_token', res['AccessToken']) | |
| except Exception as e: | |
| print('Could not log into Jellyfin.\n{}'.format(e)) | |
| # PrivateBin API # | |
| pb_expiration_options = ["5min", "10min", "1hour", "1day", "1week", "1month", "1year", "never"] | |
| def _json_encode(d): | |
| return json.dumps(d, separators=(',', ':')).encode('utf-8') | |
| def _base58_encode(v): | |
| # 58 char alphabet | |
| alphabet = b'123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz' | |
| alphabet_len = len(alphabet) | |
| if isinstance(v, str) and not isinstance(v, bytes): | |
| v = v.encode('ascii') | |
| nPad = len(v) | |
| v = v.lstrip(b'\0') | |
| nPad -= len(v) | |
| l = 0 | |
| for (i, c) in enumerate(v[::-1]): | |
| if isinstance(c, str): | |
| c = ord(c) | |
| l += c << (8 * i) | |
| string = b'' | |
| while l: | |
| l, idx = divmod(l, alphabet_len) | |
| string = alphabet[idx:idx + 1] + string | |
| return alphabet[0:1] * nPad + string | |
| # | |
| # The encryption format is described here: | |
| # https://github.com/PrivateBin/PrivateBin/wiki/Encryption-format | |
| # | |
| def _privatebin_encrypt(paste_passphrase, | |
| paste_password, | |
| paste_plaintext, | |
| paste_formatter, | |
| paste_attachment_name, | |
| paste_attachment, | |
| paste_compress, | |
| paste_burn, | |
| paste_opendicussion): | |
| if paste_password: | |
| paste_passphrase += bytes(paste_password, 'utf-8') | |
| # PBKDF | |
| # kdf_salt = get_random_bytes(8) | |
| kdf_salt = bytes(os.urandom(8)) | |
| kdf_iterations = 100000 | |
| kdf_keysize = 256 # size of resulting kdf_key | |
| backend = default_backend() | |
| kdf = PBKDF2HMAC(algorithm=hashes.SHA256(), | |
| length=int(kdf_keysize / 8), # 256bit | |
| salt=kdf_salt, | |
| iterations=kdf_iterations, | |
| backend=backend) | |
| kdf_key = kdf.derive(paste_passphrase) | |
| # AES-GCM | |
| adata_size = 128 | |
| # cipher_iv = get_random_bytes(int(adata_size / 8)) | |
| cipher_iv = bytes(os.urandom(int(adata_size / 8))) | |
| cipher_algo = "aes" | |
| cipher_mode = "gcm" | |
| compression_type = "none" | |
| if paste_compress: | |
| compression_type = "zlib" | |
| # compress plaintext | |
| paste_data = {'paste': paste_plaintext} | |
| if paste_attachment_name and paste_attachment: | |
| paste_data['attachment'] = paste_attachment | |
| paste_data['attachment_name'] = paste_attachment_name | |
| print(paste_attachment_name) | |
| print(paste_attachment) | |
| if paste_compress: | |
| zobj = zlib.compressobj(wbits=-zlib.MAX_WBITS) | |
| paste_blob = zobj.compress(_json_encode(paste_data)) + zobj.flush() | |
| else: | |
| paste_blob = _json_encode(paste_data) | |
| # Associated data to authenticate | |
| paste_adata = [ | |
| [ | |
| base64.b64encode(cipher_iv).decode("utf-8"), | |
| base64.b64encode(kdf_salt).decode("utf-8"), | |
| kdf_iterations, | |
| kdf_keysize, | |
| adata_size, | |
| cipher_algo, | |
| cipher_mode, | |
| compression_type, | |
| ], | |
| paste_formatter, | |
| int(paste_opendicussion), | |
| int(paste_burn), | |
| ] | |
| paste_adata_json = _json_encode(paste_adata) | |
| aesgcm = AESGCM(kdf_key) | |
| ciphertext = aesgcm.encrypt(cipher_iv, paste_blob, paste_adata_json) | |
| # Validate | |
| # aesgcm.decrypt(cipher_iv, ciphertext, paste_adata_json) | |
| paste_ciphertext = base64.b64encode(ciphertext).decode("utf-8") | |
| return paste_adata, paste_ciphertext | |
| def _privatebin_send(paste_url, | |
| paste_password, | |
| paste_plaintext, | |
| paste_formatter, | |
| paste_attachment_name, | |
| paste_attachment, | |
| paste_compress, | |
| paste_burn, | |
| paste_opendicussion, | |
| paste_expire): | |
| paste_passphrase = bytes(os.urandom(32)) | |
| # paste_passphrase = get_random_bytes(32) | |
| paste_adata, paste_ciphertext = _privatebin_encrypt(paste_passphrase, | |
| paste_password, | |
| paste_plaintext, | |
| paste_formatter, | |
| paste_attachment_name, | |
| paste_attachment, | |
| paste_compress, | |
| paste_burn, | |
| paste_opendicussion) | |
| # json payload for the post API | |
| # https://github.com/PrivateBin/PrivateBin/wiki/API | |
| payload = { | |
| "v": 2, | |
| "adata": paste_adata, | |
| "ct": paste_ciphertext, | |
| "meta": { | |
| "expire": paste_expire, | |
| } | |
| } | |
| # http content type | |
| headers = {'X-Requested-With': 'JSONHttpRequest'} | |
| r = requests.post(paste_url, | |
| data=_json_encode(payload), | |
| headers=headers) | |
| r.raise_for_status() | |
| try: | |
| result = r.json() | |
| except: | |
| return False, 'Error parsing JSON: {}'.format(r.text) | |
| paste_status = result['status'] | |
| if paste_status: | |
| paste_message = result['message'] | |
| return False, "Error getting status: {}".format(paste_message) | |
| paste_id = result['id'] | |
| paste_url_id = result['url'] | |
| paste_deletetoken = result['deletetoken'] | |
| return {'url': '{}{}#{}'.format(paste_url, paste_url_id, _base58_encode(paste_passphrase).decode("utf-8")), | |
| 'delete': '{}/?pasteid={}&deletetoken={}'.format(paste_url, paste_id, paste_deletetoken)}, None | |
| def privatebin(text, url='https://privatebin.net', pass_protect=False, expiration='never', burn_after_reading=False): | |
| paste_url = url | |
| paste_formatter = 'plaintext' | |
| paste_compress = True | |
| paste_opendicussion = 0 | |
| paste_burn = 0 | |
| paste_password = None | |
| paste_attachment_name = None | |
| paste_attachment = None | |
| if not text: | |
| return False, "You did not provide any text to send to {}".format(url) | |
| if expiration not in pb_expiration_options: | |
| return False, "Incorrect how_long option. Options: '{}'".format("', '".join(expiration_options)) | |
| if burn_after_reading: | |
| paste_burn = 1 | |
| if pass_protect: | |
| paste_password = pass_protect | |
| return _privatebin_send(paste_url, | |
| paste_password, | |
| text, | |
| paste_formatter, | |
| paste_attachment_name, | |
| paste_attachment, | |
| paste_compress, | |
| paste_burn, | |
| paste_opendicussion, | |
| expiration) | |
| # Yourls API # | |
| class Yourls: | |
| def __init__(self, api_url, username=None, password=None, signature=None, | |
| expiration_age=None, expiration_age_type=None): | |
| self.apiurl = api_url | |
| if username and password and signature is None: | |
| self._data = dict(username=username, password=password) | |
| elif username is None and password is None and signature: | |
| self._data = dict(signature=signature) | |
| elif username is None and password is None and signature is None: | |
| self._data = dict() | |
| else: | |
| raise TypeError( | |
| 'If server requires authentication, either pass username and ' | |
| 'password or signature. Otherwise, leave set to default (None)') | |
| self._data['format'] = 'json' | |
| if expiration_age and expiration_age_type: | |
| if expiration_age_type in ['min', 'hr', 'day']: | |
| self._data['expiry'] = 'clock' | |
| self._data['age'] = expiration_age | |
| self._data['ageMod'] = expiration_age_type | |
| print(f"Adding {expiration_age} {expiration_age_type} expiration to Yourls link...") | |
| def _api_request(self, params): | |
| params = params.copy() | |
| params.update(self._data) | |
| response = requests.get(self.apiurl, params=params) | |
| return response.json() | |
| def _get_short_url(self, jsondata): | |
| if not jsondata.get('shorturl'): | |
| return jsondata['url']['shorturl'] | |
| return jsondata['shorturl'] | |
| def shorten(self, url, keyword=None, title=None): | |
| data = dict(action='shorturl', | |
| url=url, | |
| keyword=keyword, | |
| title=title) | |
| jsondata = self._api_request(params=data) | |
| return self._get_short_url(jsondata=jsondata) | |
| # Helper functions # | |
| def status(response): | |
| if VERBOSE: | |
| print(response.status_code) | |
| if response: | |
| return True | |
| return False | |
| def convert_pb_expiration_to_yourls(pb_expiration_choice): | |
| conversions = { | |
| "5min": [5, 'min'], | |
| "10min": [10, 'min'], | |
| "1hour": [1, 'hr'], | |
| "1day": [1, 'day'], | |
| "1week": [7, 'day'], | |
| "1month": [30, 'day'], | |
| "1year": [365, 'day'], | |
| "never": [None, None] | |
| } | |
| yourls_expiration_choice = conversions.get(pb_expiration_choice) | |
| if not yourls_expiration_choice: | |
| return [None, None] | |
| return yourls_expiration_choice | |
| def makeJellyfinUser(username, password): | |
| authenticate() | |
| res = makeUser(username) | |
| if status(res): | |
| uid = res.json()['Id'] | |
| if uid: | |
| res = resetPassword(uid) | |
| if status(res): | |
| res = setUserPassword(uid, "", password) | |
| if status(res): | |
| if JELLYFIN_USER_POLICY: | |
| res = updatePolicy(uid, JELLYFIN_USER_POLICY) | |
| if status(res): | |
| return True, "User has been created, password has been updated, and policy has been " \ | |
| "enforced. " | |
| return False, "User has been created and password has been updated, but their policy could " \ | |
| "not be enforced. " | |
| return True, "User has been created and password has been updated." | |
| return False, "Could not change the user password (step 2 of setting new password).\nUser has been " \ | |
| "created, but with a blank password." | |
| return False, "Could not reset user password (step 1 of setting new user password).\nUser has been " \ | |
| "created, but with a blank password. " | |
| return False, "Could not get User ID to complete process." | |
| return False, "Could not make user." | |
| def checkVars(): | |
| return all([JELLYFIN_USER_POLICY, JELLYFIN_URL, JELLYFIN_ADMIN_USERNAME, JELLYFIN_API_KEY, JELLYFIN_ADMIN_PASSWORD]) | |
| """ Handling arguments """ | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument('username', help="Username of new Jellyfin user (Required)") | |
| parser.add_argument('-p', "--password", help="Specify password fo the new Jellyfin user") | |
| parser.add_argument('-l', "--length", help="How many characters in randomly-generated password") | |
| parser.add_argument('-b', help="Automatically upload to PrivateBin", action='store_true') | |
| parser.add_argument('-t', choices=pb_expiration_options, | |
| help="How long until the paste and link expires") | |
| parser.add_argument('-s', help="Shorten link with YOURLS", action='store_true') | |
| args = parser.parse_args() | |
| def run(): | |
| if checkVars(): | |
| if args.password: | |
| password = args.password | |
| else: | |
| password = make_password(int(args.length) if args.length else 10) | |
| success, msg = makeJellyfinUser(args.username, password) | |
| print(msg) | |
| if success and args.b: | |
| if PRIVATEBIN_URL: | |
| text = "Username: {}\nPassword: {}\nWebsite: {}".format(args.username, password, JELLYFIN_URL) | |
| data, error = privatebin( | |
| text=text, | |
| url=PRIVATEBIN_URL, | |
| pass_protect=False, | |
| expiration=(args.t if args.t else '1week'), # defaults to expire after 1 week | |
| burn_after_reading=False | |
| ) | |
| if not error: | |
| if args.s: | |
| expiration_settings = convert_pb_expiration_to_yourls(args.t if args.t else '1week') | |
| if YOURLS_URL and (YOURLS_SIGNATURE or (YOURLS_USERNAME and YOURLS_PASSWORD)): | |
| if YOURLS_SIGNATURE: | |
| yourls = Yourls(YOURLS_URL + '/yourls-api.php', | |
| signature=YOURLS_SIGNATURE, | |
| expiration_age=expiration_settings[0], | |
| expiration_age_type=expiration_settings[1]) | |
| else: | |
| yourls = Yourls(YOURLS_URL + '/yourls-api.php', | |
| username=YOURLS_USERNAME, | |
| password=YOURLS_PASSWORD, | |
| expiration_age=expiration_settings[0], | |
| expiration_age_type=expiration_settings[1]) | |
| return True, "Uploaded to PrivateBin and shortened: {}".format( | |
| yourls.shorten(data['url'])) | |
| return False, "Please complete the YOURLS settings inside this file." | |
| return True, "Uploaded to PrivateBin: {}".format(data['url']) | |
| return False, "Error uploading to PrivateBin: {}".format(error) | |
| return False, "Please complete the Privatebin settings inside this file." | |
| return True, None | |
| return False, "Please complete all the Jellyfin settings inside this file." | |
| success, msg = run() | |
| print(msg) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment