Last active
September 30, 2025 23:07
-
-
Save JonnyWong16/6720b9d9edc5686c72957d94b0d5b381 to your computer and use it in GitHub Desktop.
Example script to demonstrate JWT authentication with Plex.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Example script to demonstrate JWT authentication with Plex.

References:
    https://developer.plex.tv/pms/#section/API-Info/Authenticating-with-Plex

Requires:
    - plexapi
    - pyjwt[crypto]
    - python-dotenv
"""
import base64 | |
import dotenv | |
import hashlib | |
import jwt | |
import os | |
import requests | |
import uuid | |
from cryptography.hazmat.primitives import serialization | |
from cryptography.hazmat.primitives.asymmetric import ed25519 | |
from datetime import datetime, timedelta | |
from plexapi.myplex import MyPlexAccount | |
from plexapi.utils import plexOAuth | |
dotenv.load_dotenv() | |
def getenv(key: str, default: str = '') -> str:
    """Look up an environment variable, falling back to ``default``.

    Args:
        key: Name of the environment variable to read.
        default: Value returned when the variable is not set.

    Returns:
        The variable's value, or ``default`` when it is unset.
    """
    return os.environ.get(key, default)
def setenv(key: str, value: str) -> None:
    """Store ``key``/``value`` in the ``.env`` file of the working directory.

    The write is delegated to ``dotenv.set_key``.

    Args:
        key: Name of the variable to write.
        value: Value to store for that variable.
    """
    env_file = '.env'
    dotenv.set_key(env_file, key, value)
def generate_client_identifier() -> str:
    """Create a new random client identifier.

    Returns:
        The string form of a freshly generated version-4 UUID.
    """
    identifier = uuid.uuid4()
    return str(identifier)
def get_plex_token(client_identifier: str) -> str:
    """Obtain a Plex token through plexapi's ``plexOAuth`` helper.

    Args:
        client_identifier: Value sent as the ``X-Plex-Client-Identifier``
            header during the OAuth flow.

    Returns:
        The ``authenticationToken`` of the account returned by ``plexOAuth``.
    """
    oauth_headers = {'X-Plex-Client-Identifier': client_identifier}
    plex_account = plexOAuth(headers=oauth_headers)
    return plex_account.authenticationToken
def generate_ed25519_keypair() -> tuple[bytes, bytes]:
    """Generate a new Ed25519 key pair in raw byte form.

    Returns:
        A ``(private_key, public_key)`` tuple, both serialized with the
        ``Raw`` encoding and ``Raw`` format (the private key unencrypted).
    """
    raw_encoding = serialization.Encoding.Raw
    signing_key = ed25519.Ed25519PrivateKey.generate()
    verifying_key = signing_key.public_key()

    private_raw = signing_key.private_bytes(
        encoding=raw_encoding,
        format=serialization.PrivateFormat.Raw,
        encryption_algorithm=serialization.NoEncryption(),
    )
    public_raw = verifying_key.public_bytes(
        encoding=raw_encoding,
        format=serialization.PublicFormat.Raw,
    )
    return private_raw, public_raw
def generate_key_id(private_key: bytes, public_key: bytes) -> str:
    """Derive a key ID from a key pair.

    Args:
        private_key: Private key bytes.
        public_key: Public key bytes.

    Returns:
        The first 32 hex characters of the SHA-256 digest of the private key
        bytes followed by the public key bytes.
    """
    digest = hashlib.sha256()
    # Feeding the two parts in order hashes the same bytes as their concatenation.
    digest.update(private_key)
    digest.update(public_key)
    return digest.hexdigest()[:32]
def base64url_encode(data: bytes) -> str:
    """Encode bytes as URL-safe base64 with the ``=`` padding removed.

    Args:
        data: Bytes to encode.

    Returns:
        The URL-safe base64 text without trailing ``=`` characters.
    """
    encoded = base64.urlsafe_b64encode(data)
    unpadded = encoded.rstrip(b'=')
    return unpadded.decode('utf-8')
def create_private_jwk(private_key: bytes, public_key: bytes, key_id: str) -> jwt.PyJWK:
    """Build the private Ed25519 JWK used for signing.

    Args:
        private_key: Private key bytes, stored base64url-encoded as ``d``.
        public_key: Public key bytes, stored base64url-encoded as ``x``.
        key_id: Identifier stored as ``kid``.

    Returns:
        A ``jwt.PyJWK`` built from the OKP/Ed25519 key description.
    """
    jwk_fields = {
        'kty': 'OKP',
        'crv': 'Ed25519',
        'x': base64url_encode(public_key),
        'd': base64url_encode(private_key),
        'use': 'sig',
        'alg': 'EdDSA',
        'kid': key_id,
    }
    return jwt.PyJWK.from_dict(jwk_fields)
def create_public_jwk(public_key: bytes, key_id: str) -> jwt.PyJWK:
    """Build the public Ed25519 JWK (no private ``d`` member).

    Args:
        public_key: Public key bytes, stored base64url-encoded as ``x``.
        key_id: Identifier stored as ``kid``.

    Returns:
        A ``jwt.PyJWK`` built from the OKP/Ed25519 public key description.
    """
    jwk_fields = {
        'kty': 'OKP',
        'crv': 'Ed25519',
        'x': base64url_encode(public_key),
        'use': 'sig',
        'alg': 'EdDSA',
        'kid': key_id,
    }
    return jwt.PyJWK.from_dict(jwk_fields)
def encode_jwt(
    nonce: str,
    client_identifier: str,
    scope: list[str],
    key_id: str,
    private_jwk: jwt.PyJWK,
) -> str:
    """Create the signed JWT that is exchanged with Plex for an auth token.

    Args:
        nonce: Nonce placed in the ``nonce`` claim.
        client_identifier: Client identifier placed in the ``iss`` claim.
        scope: Scope names; joined with commas into the ``scope`` claim.
        key_id: Key ID placed in the ``kid`` header.
        private_jwk: Private JWK used to sign the token with EdDSA.

    Returns:
        The encoded JWT. It expires five minutes after it is issued.
    """
    # Read the clock once and do the arithmetic on epoch seconds. This keeps
    # exp exactly 300 seconds after iat; adding a timedelta to a naive local
    # datetime can be skewed by an hour across a DST change.
    issued_at = int(datetime.now().timestamp())
    expires_at = issued_at + int(timedelta(minutes=5).total_seconds())

    payload = {
        'nonce': nonce,
        'scope': ','.join(scope),
        'aud': 'plex.tv',
        'iss': client_identifier,
        'iat': issued_at,
        'exp': expires_at,
    }
    headers = {
        'kid': key_id,
    }
    return jwt.encode(
        payload,
        key=private_jwk,
        algorithm='EdDSA',
        headers=headers,
    )
def decode_jwt(
    jwt_token: str,
    client_identifier: str,
    public_jwk: jwt.PyJWK,
) -> dict:
    """Verify and decode a JWT with the given public key.

    Args:
        jwt_token: Encoded JWT to verify.
        client_identifier: Client identifier accepted as an audience
            alongside ``plex.tv``.
        public_jwk: Public JWK used to check the EdDSA signature.

    Returns:
        The decoded claims.

    Raises:
        jwt.InvalidTokenError: (or a subclass) raised by ``jwt.decode`` when
            validation fails.
    """
    required_claims = ['aud', 'iss', 'exp', 'iat', 'thumbprint']
    accepted_audiences = ['plex.tv', client_identifier]
    decode_options = {'require': required_claims}
    return jwt.decode(
        jwt_token,
        key=public_jwk,
        algorithms=['EdDSA'],
        options=decode_options,
        audience=accepted_audiences,
        issuer='plex.tv',
    )
def register_plex_device(
    client_identifier: str,
    plex_token: str,
    public_jwk: jwt.PyJWK,
    timeout: float = 30.0,
) -> None:
    """Register a public JWK with Plex for this client.

    Args:
        client_identifier: Sent as the ``X-Plex-Client-Identifier`` header.
        plex_token: Sent as the ``X-Plex-Token`` header.
        public_jwk: Public JWK posted in the request body.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` can block indefinitely.

    Raises:
        requests.HTTPError: If Plex answers with an error status.
        requests.Timeout: If the request exceeds ``timeout``.
    """
    url = 'https://clients.plex.tv/api/v2/auth/jwk'
    headers = {
        'X-Plex-Client-Identifier': client_identifier,
        'X-Plex-Token': plex_token,
    }
    body = {
        # NOTE: _jwk_data is a private PyJWK attribute holding the dict the
        # key was built from; it may change between PyJWT releases.
        'jwk': public_jwk._jwk_data
    }
    response = requests.post(url, headers=headers, json=body, timeout=timeout)
    response.raise_for_status()
def get_plex_nonce(client_identifier: str, timeout: float = 30.0) -> str:
    """Request a nonce from Plex for this client.

    Args:
        client_identifier: Sent as the ``X-Plex-Client-Identifier`` header.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` can block indefinitely.

    Returns:
        The ``nonce`` field of the JSON response, or ``''`` when it is absent.

    Raises:
        requests.HTTPError: If Plex answers with an error status.
        requests.Timeout: If the request exceeds ``timeout``.
    """
    url = 'https://clients.plex.tv/api/v2/auth/nonce'
    headers = {
        'X-Plex-Client-Identifier': client_identifier,
    }
    response = requests.get(url, headers=headers, timeout=timeout)
    response.raise_for_status()
    return response.json().get('nonce', '')
def exchange_plex_jwt(client_identifier: str, jwt_token: str, timeout: float = 30.0) -> str:
    """Exchange a client-signed JWT for a Plex auth token.

    Args:
        client_identifier: Sent as the ``X-Plex-Client-Identifier`` header.
        jwt_token: Signed JWT posted in the request body.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` can block indefinitely.

    Returns:
        The ``auth_token`` field of the JSON response.

    Raises:
        requests.HTTPError: If Plex answers with an error status.
        requests.Timeout: If the request exceeds ``timeout``.
        ValueError: If the response carries no ``auth_token``.
    """
    url = 'https://clients.plex.tv/api/v2/auth/token'
    headers = {
        'X-Plex-Client-Identifier': client_identifier,
    }
    body = {
        'jwt': jwt_token
    }
    response = requests.post(url, headers=headers, json=body, timeout=timeout)
    response.raise_for_status()
    auth_token = response.json().get('auth_token')
    if not auth_token:
        # Fail here with a clear message instead of handing None to callers
        # that expect a string.
        raise ValueError('Plex token exchange response did not contain an auth_token')
    return auth_token
def get_plex_public_jwk(timeout: float = 30.0) -> jwt.PyJWK:
    """Fetch the first public JWK published by Plex.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` can block indefinitely.

    Returns:
        A ``jwt.PyJWK`` built from the first entry of the response's ``keys``
        list.

    Raises:
        requests.HTTPError: If Plex answers with an error status.
        requests.Timeout: If the request exceeds ``timeout``.
        IndexError: If the response contains no keys.
    """
    url = 'https://clients.plex.tv/api/v2/auth/keys'
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    keys = response.json().get('keys', [])
    if not keys:
        # Same exception type as indexing an empty list, but with a message
        # that says what actually went wrong.
        raise IndexError('Plex returned no public keys from /api/v2/auth/keys')
    # NOTE(review): only the first key is used; if Plex ever publishes several
    # keys, selecting by the token's "kid" header would be more robust.
    return jwt.PyJWK.from_dict(keys[0])
def register_device(
    client_identifier: str,
    plex_token: str,
    public_key: bytes,
    key_id: str
) -> None:
    """Wrap the public key in a JWK and register it with Plex.

    Args:
        client_identifier: Client identifier of the device being registered.
        plex_token: Plex token passed on to the registration request.
        public_key: Public key bytes.
        key_id: Key ID stored in the JWK.
    """
    register_plex_device(
        client_identifier,
        plex_token,
        create_public_jwk(public_key, key_id),
    )
def refresh_jwt_token(
    client_identifier: str,
    private_key: bytes,
    public_key: bytes,
    key_id: str,
    scope: list[str]
) -> str:
    """Obtain a fresh Plex token by signing and exchanging a new JWT.

    Builds the private JWK, fetches a nonce, signs a JWT containing that
    nonce, and exchanges the signed JWT with Plex.

    Args:
        client_identifier: Client identifier of this device.
        private_key: Private key bytes.
        public_key: Public key bytes.
        key_id: Key ID of the key pair.
        scope: Scope names to request.

    Returns:
        The token returned by the exchange request.
    """
    signing_jwk = create_private_jwk(private_key, public_key, key_id)
    signed_request = encode_jwt(
        get_plex_nonce(client_identifier),
        client_identifier,
        scope,
        key_id,
        signing_jwk,
    )
    return exchange_plex_jwt(client_identifier, signed_request)
def verify_jwt_token(
    jwt_token: str,
    client_identifier: str,
) -> dict:
    """Verify a JWT against the public key fetched from Plex.

    Args:
        jwt_token: Encoded JWT to verify.
        client_identifier: Client identifier accepted as an audience.

    Returns:
        The decoded claims.
    """
    return decode_jwt(jwt_token, client_identifier, get_plex_public_jwk())
if __name__ == '__main__':
    # Reuse the persisted client identifier so every run presents the same
    # device to Plex; create and store one on first run.
    if not (CLIENT_IDENTIFIER := getenv('PLEXAPI_HEADER_IDENTIFIER')):
        print('No existing client identifier found, generating new one')
        CLIENT_IDENTIFIER = generate_client_identifier()
        setenv('PLEXAPI_HEADER_IDENTIFIER', CLIENT_IDENTIFIER)

    # The Plex token is needed to register the public key.
    if not (PLEX_TOKEN := getenv('PLEXAPI_PLEX_TOKEN')):
        print('No existing Plex token found, retrieving new one using Plex OAuth')
        PLEX_TOKEN = get_plex_token(CLIENT_IDENTIFIER)
        setenv('PLEXAPI_PLEX_TOKEN', PLEX_TOKEN)

    # Load the key pair from disk, or create and persist a new one.
    keypair_generated = False
    if not os.path.exists('private.key') or not os.path.exists('public.key'):
        print('No existing ED25519 key pair found, generating new one')
        PRIVATE_KEY, PUBLIC_KEY = generate_ed25519_keypair()
        keypair_generated = True
        # Create the private key file with owner-only permissions instead of
        # the umask default, so other local users cannot read it.
        private_flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, 'O_BINARY', 0)
        private_fd = os.open('private.key', private_flags, 0o600)
        with os.fdopen(private_fd, 'wb') as privfile, open('public.key', 'wb') as pubfile:
            privfile.write(PRIVATE_KEY)
            pubfile.write(PUBLIC_KEY)
    else:
        with open('private.key', 'rb') as privkey, open('public.key', 'rb') as pubkey:
            PRIVATE_KEY = privkey.read()
            PUBLIC_KEY = pubkey.read()

    # Key ID just needs to be any unique identifier for the key pair
    key_id = generate_key_id(PRIVATE_KEY, PUBLIC_KEY)

    jwt_refresh_required = False
    # Refresh JWT if it's expiring within this many days
    refresh_within_days = 1

    # Available scopes:
    # * username - Access to the user's username
    # * email - Access to the user's email address
    # * friendly_name - Access to the user's friendly name
    # * restricted - Access to the user's restricted status
    # * anonymous - Access to the user's anonymous status
    # * joinedAt - Access to the user's account creation timestamp
    scope = ['username', 'email', 'friendly_name']

    PLEX_JWT_TOKEN = getenv('PLEXAPI_PLEX_JWT_TOKEN')
    if keypair_generated or not PLEX_JWT_TOKEN:
        # A new key pair has never been registered, even when an older JWT is
        # still stored; register it so the refresh below is signed by a key
        # Plex knows about.
        if PLEX_JWT_TOKEN:
            print('New key pair generated, registering device')
        else:
            print('No existing JWT token found, registering device')
        register_device(CLIENT_IDENTIFIER, PLEX_TOKEN, PUBLIC_KEY, key_id)
        jwt_refresh_required = True
    else:
        try:
            decoded_jwt = verify_jwt_token(PLEX_JWT_TOKEN, CLIENT_IDENTIFIER)
        except jwt.ExpiredSignatureError:
            print('Existing JWT token has expired')
            jwt_refresh_required = True
        except jwt.InvalidSignatureError:
            print('Existing JWT token has invalid signature')
            jwt_refresh_required = True
        except jwt.InvalidTokenError as e:
            print(f'Existing JWT token is invalid: {e}')
            jwt_refresh_required = True
        else:
            # Work in epoch seconds so the window is exactly N * 24 hours,
            # independent of local DST changes.
            refresh_window = int(timedelta(days=refresh_within_days).total_seconds())
            refresh_deadline = int(datetime.now().timestamp()) + refresh_window
            if decoded_jwt['thumbprint'] != key_id:
                print('Existing JWT token was signed with a different key')
                jwt_refresh_required = True
            elif decoded_jwt['exp'] < refresh_deadline:
                print(f'Existing JWT token is expiring within {refresh_within_days} day(s)')
                jwt_refresh_required = True

    if jwt_refresh_required:
        print(f'Refreshing JWT token with scope: {scope}')
        PLEX_JWT_TOKEN = refresh_jwt_token(CLIENT_IDENTIFIER, PRIVATE_KEY, PUBLIC_KEY, key_id, scope)
        setenv('PLEXAPI_PLEX_JWT_TOKEN', PLEX_JWT_TOKEN)
        # Decode the new token so the summary below reflects it.
        decoded_jwt = verify_jwt_token(PLEX_JWT_TOKEN, CLIENT_IDENTIFIER)

    print(f'Decoded JWT: {decoded_jwt}')
    issued = datetime.fromtimestamp(decoded_jwt['iat'])
    print(f'JWT token was issued at {issued}')
    expiry = datetime.fromtimestamp(decoded_jwt['exp'])
    print(f'JWT token is valid until {expiry}')

    # Use the JWT as the account token and print the account details.
    account = MyPlexAccount(token=PLEX_JWT_TOKEN)
    print(f'Username: {account.username}')
    print(f'Email: {account.email}')
    print(f'Friendly Name: {account.friendlyName}')
    print(f'Restricted: {account.restricted}')
    print(f'Anonymous: {account.anonymous}')
    print(f'Joined At: {account.joinedAt}')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment