Skip to content

Instantly share code, notes, and snippets.

@nwithan8
Last active January 16, 2023 15:04
Show Gist options
  • Select an option

  • Save nwithan8/34c36c615a347dd576e0a8d157f69fc0 to your computer and use it in GitHub Desktop.

Select an option

Save nwithan8/34c36c615a347dd576e0a8d157f69fc0 to your computer and use it in GitHub Desktop.
Create a new Jellyfin user from CLI (option to upload credentials to PrivateBin and shorten link)
#!/usr/bin/env python3
import os
import requests
import argparse
import random
import string
import socket
import json
import base64
import zlib
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.backends import default_backend
# EDIT THESE VALUES #
# When True, the request helpers print raw response bodies and status() prints
# the HTTP status code.
VERBOSE = False
# Base URL that every Jellyfin API path (e.g. '/Users/New') is appended to.
JELLYFIN_URL = 'https://jellyfin.example.org'
# Sent as the `api_key` query parameter by post().
JELLYFIN_API_KEY = 'FILLME'
# Admin credentials posted to /Users/AuthenticateByName by authenticate().
JELLYFIN_ADMIN_USERNAME = 'FILLME'
JELLYFIN_ADMIN_PASSWORD = 'FILLME'
# Policy applied to each newly created user: updatePolicy() posts this dict as
# the JSON body of /Users/{id}/Policy. An empty dict makes checkVars() fail.
# NOTE(review): the boolean settings are written as the strings "true"/"false"
# rather than Python booleans — confirm the server accepts that form.
JELLYFIN_USER_POLICY = {
"IsAdministrator": "false",
"IsHidden": "true",
"IsHiddenRemotely": "true",
"IsDisabled": "false",
"EnableRemoteControlOfOtherUsers": "false",
"EnableSharedDeviceControl": "false",
"EnableRemoteAccess": "true",
"EnableLiveTvManagement": "false",
"EnableLiveTvAccess": "false",
"EnableMediaPlayback": "true",
"EnableAudioPlaybackTranscoding": "false",
"EnableVideoPlaybackTranscoding": "false",
"EnablePlaybackRemuxing": "true",
"EnableContentDeletion": "false",
"EnableContentDownloading": "false",
"EnableSyncTranscoding": "false",
"EnableMediaConversion": "false",
"EnableAllDevices": "true",
"EnableAllChannels": "true",
"EnableAllFolders": "true",
"InvalidLoginAttemptCount": 0,
"LoginAttemptsBeforeLockout": -1,
"RemoteClientBitrateLimit": 0,
"EnablePublicSharing": "false",
"AuthenticationProviderId": "Jellyfin.Server.Implementations.Users.DefaultAuthenticationProvider",
"PasswordResetProviderId": "Jellyfin.Server.Implementations.Users.DefaultPasswordResetProvider"
}
# URL of the PrivateBin instance pastes are posted to. If left empty, run()
# refuses the -b option and asks for the settings to be completed.
PRIVATEBIN_URL = ''
# Base URL of the YOURLS install; run() appends '/yourls-api.php' to it.
YOURLS_URL = ''
# Complete either signature or username/password
# (run() prefers the signature when both are filled in).
YOURLS_SIGNATURE = ''
YOURLS_USERNAME = ''
YOURLS_PASSWORD = ''
# DO NOT EDIT BELOW #
# Jellyfin API #
# Header dict carrying the Jellyfin access token; None until a token is loaded
# from disk or obtained by logging in.
token_header = None


def load_token_header(file):
    """Load a cached Jellyfin access token from ``file`` into ``token_header``.

    Only the first line of the file is read. Surrounding whitespace (including
    a trailing newline) is stripped so it cannot end up inside the HTTP header
    value, and an empty file is treated as "no token" so the caller falls back
    to a fresh login.

    :param file: path of the token cache file.
    :return: True when a non-empty token was loaded, False otherwise.
    """
    global token_header
    if not os.path.exists(file):
        return False
    with open(file, 'r') as f:
        token = f.readline().strip()
    if not token:
        return False
    token_header = {'X-Emby-Token': token}
    return True
def save_token(file, token):
    """Write ``token`` (converted with ``str``) to ``file``, replacing its contents."""
    with open(file, 'w+') as handle:
        text = str(token)
        handle.write(text)
def make_password(length):
    """
    Generate a random string of letters and digits

    The characters are drawn with the ``secrets`` module because the result is
    used as an account password; the ``random`` module is not meant for
    security-sensitive values.

    :param length: number of characters to generate.
    :return: a string of ``length`` ASCII letters/digits (empty if ``length`` <= 0).
    """
    import secrets  # local import: keeps this block self-contained

    alphabet = string.ascii_letters + string.digits
    return ''.join(secrets.choice(alphabet) for _ in range(length))
def postWithToken(hdr, method, data=None):
    """POST ``data`` as a JSON body to a Jellyfin endpoint with extra headers.

    :param hdr: dict of additional headers (typically the token header), or
        None. None is treated as "no extra headers" so that a failed login
        (which leaves the token header unset) produces an HTTP error response
        instead of a TypeError while unpacking.
    :param method: API path appended to ``JELLYFIN_URL`` (e.g. '/Users/x/Policy').
    :param data: JSON-serializable payload; serialized with ``json.dumps``.
    :return: the ``requests`` response object.
    """
    headers = {'accept': 'application/json', 'Content-Type': 'application/json', **(hdr or {})}
    res = requests.post('{}{}'.format(JELLYFIN_URL, method), headers=headers, data=json.dumps(data))
    if VERBOSE:
        print(res.content)
    return res
def post(cmd, params, payload):
    """POST ``payload`` as JSON to a Jellyfin endpoint, authenticating by API key.

    :param cmd: API path appended to ``JELLYFIN_URL``.
    :param params: extra query string (without the leading '&'), or None.
    :param payload: object sent through the ``json=`` argument of ``requests.post``.
    :return: the ``requests`` response object.
    """
    extra_query = "" if params is None else "&" + params
    endpoint = f'{JELLYFIN_URL}{cmd}?api_key={JELLYFIN_API_KEY}{extra_query}'
    response = requests.post(endpoint, json=payload)
    if VERBOSE:
        print(response.content)
    return response
def makeUser(username):
    """Create a Jellyfin user named ``username`` via /Users/New.

    :return: the response object returned by ``post``.
    """
    body = {'Name': str(username)}
    return post('/Users/New', None, payload=body)
def resetPassword(userId):
    """Ask Jellyfin to reset the password of user ``userId``.

    :return: the response object returned by ``postWithToken``.
    """
    body = {'Id': str(userId), 'ResetPassword': 'true'}
    endpoint = f'/Users/{userId}/Password'
    return postWithToken(hdr=token_header, method=endpoint, data=body)
def setUserPassword(userId, currentPass, newPass):
    """Change the password of user ``userId`` from ``currentPass`` to ``newPass``.

    :return: the response object returned by ``postWithToken``.
    """
    body = dict(Id=userId, CurrentPw=currentPass, NewPw=newPass)
    endpoint = f'/Users/{userId}/Password'
    return postWithToken(hdr=token_header, method=endpoint, data=body)
def updatePolicy(userId, policy=None):
    """Post a policy for user ``userId`` to /Users/{id}/Policy.

    :param policy: policy dict; a falsy value selects ``JELLYFIN_USER_POLICY``.
    :return: the response object returned by ``postWithToken``.
    """
    body = policy or JELLYFIN_USER_POLICY
    endpoint = f'/Users/{userId}/Policy'
    return postWithToken(hdr=token_header, method=endpoint, data=body)
def authenticate():
    """Make sure ``token_header`` holds a Jellyfin access token.

    A token cached in '.jf_token' is used when present. Otherwise the admin
    credentials are posted to /Users/AuthenticateByName and the returned
    access token is stored in ``token_header`` and written to '.jf_token'.
    Any failure during login is printed and swallowed, leaving
    ``token_header`` unchanged.
    """
    global token_header
    if load_token_header('.jf_token'):
        return

    import hashlib  # local import: keeps this block self-contained

    hostname = socket.gethostname()
    # The built-in hash() of a str is salted per process, so it would produce a
    # different DeviceId on every run. A digest of the hostname is stable.
    device_id = hashlib.sha256(hostname.encode('utf-8')).hexdigest()
    auth_value = (
        'Emby UserId="{UserId}", Client="{Client}", Device="{Device}", '
        'DeviceId="{DeviceId}", Version="{Version}", Token="{Token}"'
    ).format(
        UserId="",  # not required, if it was we would have to first request the UserId from the username
        Client='account-automation',
        Device=hostname,
        DeviceId=device_id,
        Version=1,
        Token="",  # not required
    )
    xEmbyAuth = {'X-Emby-Authorization': auth_value}
    data = {'Username': JELLYFIN_ADMIN_USERNAME, 'Password': JELLYFIN_ADMIN_PASSWORD,
            'Pw': JELLYFIN_ADMIN_PASSWORD}
    try:
        res = postWithToken(hdr=xEmbyAuth, method='/Users/AuthenticateByName', data=data).json()
        token_header = {'X-Emby-Token': '{}'.format(res['AccessToken'])}
        save_token('.jf_token', res['AccessToken'])
    except Exception as e:
        # Best-effort login: report the problem and let the caller's later
        # requests fail with their own messages.
        print('Could not log into Jellyfin.\n{}'.format(e))
# PrivateBin API #
# Expiration values accepted by privatebin(); also offered as the choices of
# the -t command-line option.
pb_expiration_options = ["5min", "10min", "1hour", "1day", "1week", "1month", "1year", "never"]
def _json_encode(d):
return json.dumps(d, separators=(',', ':')).encode('utf-8')
def _base58_encode(v):
# 58 char alphabet
alphabet = b'123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
alphabet_len = len(alphabet)
if isinstance(v, str) and not isinstance(v, bytes):
v = v.encode('ascii')
nPad = len(v)
v = v.lstrip(b'\0')
nPad -= len(v)
l = 0
for (i, c) in enumerate(v[::-1]):
if isinstance(c, str):
c = ord(c)
l += c << (8 * i)
string = b''
while l:
l, idx = divmod(l, alphabet_len)
string = alphabet[idx:idx + 1] + string
return alphabet[0:1] * nPad + string
#
# The encryption format is described here:
# https://github.com/PrivateBin/PrivateBin/wiki/Encryption-format
#
def _privatebin_encrypt(paste_passphrase,
                        paste_password,
                        paste_plaintext,
                        paste_formatter,
                        paste_attachment_name,
                        paste_attachment,
                        paste_compress,
                        paste_burn,
                        paste_opendicussion):
    """Encrypt a paste and build the metadata that is authenticated with it.

    :param paste_passphrase: random key material (bytes) for this paste.
    :param paste_password: optional user password; when truthy its UTF-8 bytes
        are appended to the passphrase before key derivation.
    :param paste_plaintext: text stored under the 'paste' key.
    :param paste_formatter: formatter name recorded in the metadata.
    :param paste_attachment_name: attachment file name (used only together
        with ``paste_attachment``).
    :param paste_attachment: attachment content (used only together with
        ``paste_attachment_name``).
    :param paste_compress: when truthy, compress the JSON blob before encrypting.
    :param paste_burn: burn-after-reading flag, stored as an int.
    :param paste_opendicussion: open-discussion flag, stored as an int.
    :return: tuple ``(paste_adata, paste_ciphertext)`` where ``paste_adata`` is
        the metadata list and ``paste_ciphertext`` the base64 text of the
        AES-GCM output.
    """
    if paste_password:
        paste_passphrase += bytes(paste_password, 'utf-8')

    # PBKDF2-HMAC-SHA256 key derivation with a fresh random salt
    kdf_salt = bytes(os.urandom(8))
    kdf_iterations = 100000
    kdf_keysize = 256  # size of resulting kdf_key, in bits
    backend = default_backend()
    kdf = PBKDF2HMAC(algorithm=hashes.SHA256(),
                     length=kdf_keysize // 8,  # 256 bits -> 32 bytes
                     salt=kdf_salt,
                     iterations=kdf_iterations,
                     backend=backend)
    kdf_key = kdf.derive(paste_passphrase)

    # AES-GCM parameters; adata_size (bits) also determines the IV length
    adata_size = 128
    cipher_iv = bytes(os.urandom(adata_size // 8))
    cipher_algo = "aes"
    cipher_mode = "gcm"
    compression_type = "zlib" if paste_compress else "none"

    # Build the plaintext document. The attachment is no longer echoed to
    # stdout: those prints were debugging leftovers that leaked its content.
    paste_data = {'paste': paste_plaintext}
    if paste_attachment_name and paste_attachment:
        paste_data['attachment'] = paste_attachment
        paste_data['attachment_name'] = paste_attachment_name

    if paste_compress:
        # Negative wbits selects a raw deflate stream (no zlib header/trailer)
        zobj = zlib.compressobj(wbits=-zlib.MAX_WBITS)
        paste_blob = zobj.compress(_json_encode(paste_data)) + zobj.flush()
    else:
        paste_blob = _json_encode(paste_data)

    # Associated data to authenticate
    paste_adata = [
        [
            base64.b64encode(cipher_iv).decode("utf-8"),
            base64.b64encode(kdf_salt).decode("utf-8"),
            kdf_iterations,
            kdf_keysize,
            adata_size,
            cipher_algo,
            cipher_mode,
            compression_type,
        ],
        paste_formatter,
        int(paste_opendicussion),
        int(paste_burn),
    ]
    paste_adata_json = _json_encode(paste_adata)

    aesgcm = AESGCM(kdf_key)
    ciphertext = aesgcm.encrypt(cipher_iv, paste_blob, paste_adata_json)

    paste_ciphertext = base64.b64encode(ciphertext).decode("utf-8")
    return paste_adata, paste_ciphertext
def _privatebin_send(paste_url,
                     paste_password,
                     paste_plaintext,
                     paste_formatter,
                     paste_attachment_name,
                     paste_attachment,
                     paste_compress,
                     paste_burn,
                     paste_opendicussion,
                     paste_expire):
    """Encrypt a paste, post it to ``paste_url`` and build the resulting links.

    A fresh 32-byte random passphrase is generated for every paste; it is
    placed, base58-encoded, in the fragment of the returned URL.

    :return: ``(links, None)`` on success, where ``links`` is a dict with the
        keys 'url' and 'delete'; ``(False, message)`` when the response is not
        JSON or the server reports a non-zero status.
    :raises requests.HTTPError: when the server answers with an HTTP error
        status (raised by ``raise_for_status``).
    """
    paste_passphrase = bytes(os.urandom(32))

    paste_adata, paste_ciphertext = _privatebin_encrypt(paste_passphrase,
                                                        paste_password,
                                                        paste_plaintext,
                                                        paste_formatter,
                                                        paste_attachment_name,
                                                        paste_attachment,
                                                        paste_compress,
                                                        paste_burn,
                                                        paste_opendicussion)

    # json payload for the post API
    # https://github.com/PrivateBin/PrivateBin/wiki/API
    payload = {
        "v": 2,
        "adata": paste_adata,
        "ct": paste_ciphertext,
        "meta": {
            "expire": paste_expire,
        }
    }

    # http content type
    headers = {'X-Requested-With': 'JSONHttpRequest'}

    r = requests.post(paste_url,
                      data=_json_encode(payload),
                      headers=headers)
    r.raise_for_status()

    try:
        result = r.json()
    except ValueError:
        # JSON decoding errors derive from ValueError; a bare except would also
        # have swallowed KeyboardInterrupt and SystemExit.
        return False, 'Error parsing JSON: {}'.format(r.text)

    paste_status = result['status']
    if paste_status:
        # Any non-zero status is reported by the server as a failure
        paste_message = result['message']
        return False, "Error getting status: {}".format(paste_message)

    paste_id = result['id']
    paste_url_id = result['url']
    paste_deletetoken = result['deletetoken']

    return {'url': '{}{}#{}'.format(paste_url, paste_url_id, _base58_encode(paste_passphrase).decode("utf-8")),
            'delete': '{}/?pasteid={}&deletetoken={}'.format(paste_url, paste_id, paste_deletetoken)}, None
def privatebin(text, url='https://privatebin.net', pass_protect=False, expiration='never', burn_after_reading=False):
    """Upload ``text`` to a PrivateBin instance as a compressed plaintext paste.

    :param text: content of the paste; must be non-empty.
    :param url: URL of the PrivateBin instance.
    :param pass_protect: falsy for no password; otherwise the value itself is
        used as the paste password.
    :param expiration: one of ``pb_expiration_options``.
    :param burn_after_reading: when truthy, set the burn-after-reading flag.
    :return: ``(False, message)`` when validation fails, otherwise whatever
        ``_privatebin_send`` returns.
    """
    if not text:
        return False, "You did not provide any text to send to {}".format(url)

    if expiration not in pb_expiration_options:
        # Previously referenced an undefined name (expiration_options), so an
        # invalid value raised NameError instead of returning this message.
        return False, "Incorrect how_long option. Options: '{}'".format("', '".join(pb_expiration_options))

    paste_formatter = 'plaintext'
    paste_compress = True
    paste_opendicussion = 0
    paste_attachment_name = None
    paste_attachment = None
    paste_burn = 1 if burn_after_reading else 0
    paste_password = pass_protect if pass_protect else None

    return _privatebin_send(url,
                            paste_password,
                            text,
                            paste_formatter,
                            paste_attachment_name,
                            paste_attachment,
                            paste_compress,
                            paste_burn,
                            paste_opendicussion,
                            expiration)
# Yourls API #
class Yourls:
    """Minimal client for a YOURLS link-shortener API endpoint."""

    def __init__(self, api_url, username=None, password=None, signature=None,
                 expiration_age=None, expiration_age_type=None):
        """Store the endpoint and the parameters sent with every request.

        Authentication is either username + password, a signature, or nothing;
        any other combination raises TypeError. Expiration parameters are
        added only when both values are truthy and the type is 'min', 'hr'
        or 'day'.
        """
        self.apiurl = api_url
        no_login = username is None and password is None
        if username and password and signature is None:
            base_params = {'username': username, 'password': password}
        elif no_login and signature:
            base_params = {'signature': signature}
        elif no_login and signature is None:
            base_params = {}
        else:
            raise TypeError(
                'If server requires authentication, either pass username and '
                'password or signature. Otherwise, leave set to default (None)')
        self._data = base_params
        self._data['format'] = 'json'
        if expiration_age and expiration_age_type and expiration_age_type in ['min', 'hr', 'day']:
            self._data.update(expiry='clock', age=expiration_age, ageMod=expiration_age_type)
            print(f"Adding {expiration_age} {expiration_age_type} expiration to Yourls link...")

    def _api_request(self, params):
        """Send ``params`` plus the stored parameters as a GET request; return the decoded JSON."""
        query = params.copy()
        for key, value in self._data.items():
            query[key] = value
        return requests.get(self.apiurl, params=query).json()

    def _get_short_url(self, jsondata):
        """Return the short URL from the top level of ``jsondata`` or, failing that, from its 'url' entry."""
        top_level = jsondata.get('shorturl')
        if top_level:
            return jsondata['shorturl']
        return jsondata['url']['shorturl']

    def shorten(self, url, keyword=None, title=None):
        """Request a short link for ``url`` and return it."""
        request_params = {
            'action': 'shorturl',
            'url': url,
            'keyword': keyword,
            'title': title,
        }
        return self._get_short_url(jsondata=self._api_request(params=request_params))
# Helper functions #
def status(response):
    """Return the truth value of ``response`` as a bool.

    When ``VERBOSE`` is set, the response's status code is printed first.
    """
    if VERBOSE:
        print(response.status_code)
    return bool(response)
def convert_pb_expiration_to_yourls(pb_expiration_choice):
    """Translate a PrivateBin expiration name into a ``[age, unit]`` pair.

    Unknown names, and 'never', give ``[None, None]``.
    """
    table = {
        "5min": [5, 'min'],
        "10min": [10, 'min'],
        "1hour": [1, 'hr'],
        "1day": [1, 'day'],
        "1week": [7, 'day'],
        "1month": [30, 'day'],
        "1year": [365, 'day'],
        "never": [None, None]
    }
    return table.get(pb_expiration_choice) or [None, None]
def makeJellyfinUser(username, password):
    """Create a Jellyfin user, set its password and apply the configured policy.

    Steps run in order: authenticate, create the user, reset its password, set
    the new password and, if ``JELLYFIN_USER_POLICY`` is non-empty, apply it.
    The first failing step stops the process.

    :return: ``(success, message)`` describing how far the process got.
    """
    authenticate()

    created = makeUser(username)
    if not status(created):
        return False, "Could not make user."

    uid = created.json()['Id']
    if not uid:
        return False, "Could not get User ID to complete process."

    if not status(resetPassword(uid)):
        return False, ("Could not reset user password (step 1 of setting new user password).\n"
                       "User has been created, but with a blank password. ")

    if not status(setUserPassword(uid, "", password)):
        return False, ("Could not change the user password (step 2 of setting new password).\n"
                       "User has been created, but with a blank password.")

    if not JELLYFIN_USER_POLICY:
        return True, "User has been created and password has been updated."

    if status(updatePolicy(uid, JELLYFIN_USER_POLICY)):
        return True, "User has been created, password has been updated, and policy has been enforced. "
    return False, "User has been created and password has been updated, but their policy could not be enforced. "
def checkVars():
    """Return True only when every required Jellyfin setting is truthy."""
    required_settings = (
        JELLYFIN_USER_POLICY,
        JELLYFIN_URL,
        JELLYFIN_ADMIN_USERNAME,
        JELLYFIN_API_KEY,
        JELLYFIN_ADMIN_PASSWORD,
    )
    for setting in required_settings:
        if not setting:
            return False
    return True
# Handling arguments
def _positive_int(value):
    """argparse type: parse ``value`` as an integer that must be at least 1.

    A zero or negative length would produce an empty password, and a
    non-numeric value would otherwise crash later with a traceback.
    """
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError("'{}' is not a whole number".format(value))
    if number < 1:
        raise argparse.ArgumentTypeError("length must be at least 1")
    return number


parser = argparse.ArgumentParser()
parser.add_argument('username', help="Username of new Jellyfin user (Required)")
parser.add_argument('-p', "--password", help="Specify password of the new Jellyfin user")
parser.add_argument('-l', "--length", type=_positive_int,
                    help="How many characters in randomly-generated password")
parser.add_argument('-b', help="Automatically upload to PrivateBin", action='store_true')
parser.add_argument('-t', choices=pb_expiration_options,
                    help="How long until the paste and link expires")
parser.add_argument('-s', help="Shorten link with YOURLS", action='store_true')
# Parsed at module level because run() reads the module-global ``args``.
args = parser.parse_args()
def run():
    """Create the user named on the command line and optionally share the credentials.

    Reads the module-level ``args``. The message returned by
    ``makeJellyfinUser`` is printed immediately. With ``-b`` the credentials
    are uploaded to PrivateBin, and with ``-s`` the paste link is additionally
    shortened through YOURLS.

    :return: ``(success, message)``; ``message`` is None when nothing was
        uploaded, in which case ``success`` reflects whether the user was
        created (it used to be True even after a failed creation).
    """
    if not checkVars():
        return False, "Please complete all the Jellyfin settings inside this file."

    if args.password:
        password = args.password
    else:
        password = make_password(int(args.length) if args.length else 10)

    success, msg = makeJellyfinUser(args.username, password)
    print(msg)

    if not (success and args.b):
        # NOTE(review): a randomly generated password is never shown unless it
        # is uploaded with -b — confirm whether it should be printed here.
        return success, None

    if not PRIVATEBIN_URL:
        return False, "Please complete the Privatebin settings inside this file."

    expiration = args.t if args.t else '1week'  # defaults to expire after 1 week
    text = "Username: {}\nPassword: {}\nWebsite: {}".format(args.username, password, JELLYFIN_URL)
    data, error = privatebin(
        text=text,
        url=PRIVATEBIN_URL,
        pass_protect=False,
        expiration=expiration,
        burn_after_reading=False
    )
    if error:
        return False, "Error uploading to PrivateBin: {}".format(error)

    if not args.s:
        return True, "Uploaded to PrivateBin: {}".format(data['url'])

    if not (YOURLS_URL and (YOURLS_SIGNATURE or (YOURLS_USERNAME and YOURLS_PASSWORD))):
        return False, "Please complete the YOURLS settings inside this file."

    expiration_age, expiration_age_type = convert_pb_expiration_to_yourls(expiration)
    # The signature takes precedence when both kinds of credentials are set.
    if YOURLS_SIGNATURE:
        credentials = {'signature': YOURLS_SIGNATURE}
    else:
        credentials = {'username': YOURLS_USERNAME, 'password': YOURLS_PASSWORD}
    yourls = Yourls(YOURLS_URL + '/yourls-api.php',
                    expiration_age=expiration_age,
                    expiration_age_type=expiration_age_type,
                    **credentials)
    return True, "Uploaded to PrivateBin and shortened: {}".format(yourls.shorten(data['url']))
# Script entry point: run only when executed directly, and skip the final
# print when run() has no message (it used to print the literal "None").
if __name__ == '__main__':
    success, msg = run()
    if msg is not None:
        print(msg)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment