|
#!/usr/bin/env python3 |
|
|
|
import argparse |
|
import json |
|
import os |
|
import sys |
|
from collections import defaultdict |
|
from xml.etree import ElementTree |
|
import requests |
|
import logging |
|
|
|
# Root logging setup: records go to stdout as "LEVEL:message", threshold DEBUG.
logging.basicConfig(level=logging.DEBUG, format='%(levelname)s:%(message)s', stream=sys.stdout)

# Module-level logger used by the functions in this script.
logger = logging.getLogger(__name__)
|
|
|
|
|
def add_share(server_id, plex_token, json_data, timeout=30):
    """
    Posts to the shared servers endpoint to enable a share

    :param server_id: the server id
    :param plex_token: the plex token so that api calls work
    :param json_data: the payload of data, sent as the json request body
    :param timeout: seconds to wait for plex.tv before giving up
    :return: the response (also when its status is an HTTP error, which is
             logged), or None when no response was received at all
    """
    url = f'https://plex.tv/api/servers/{server_id}/shared_servers'
    headers = {'X-Plex-Token': plex_token, 'Accept': 'application/json'}
    resp = None
    try:
        # Without a timeout requests would wait forever on a stalled connection.
        resp = requests.post(url, headers=headers, json=json_data, timeout=timeout)
        resp.raise_for_status()
    except (requests.exceptions.ConnectionError, ConnectionError) as e:
        # requests raises its own ConnectionError, which is not a subclass of
        # the builtin one; the builtin is still caught for compatibility.
        logger.error('A connection error occurred, %s', e)
    except requests.exceptions.RequestException as e:
        logger.error('A request exception occurred, %s', e)
    return resp
|
|
|
|
|
def get_shares(server_id, plex_token, timeout=30):
    """
    Gets the current shares from the api, expected response is xml

    :param server_id: the server id
    :param plex_token: the plex token so that api calls work
    :param timeout: seconds to wait for plex.tv before giving up
    :return: the response (also when its status is an HTTP error, which is
             logged), or None when no response was received at all
    """
    url = f'https://plex.tv/api/servers/{server_id}/shared_servers'
    # NOTE(review): this header asks for JSON while the body is parsed as XML
    # elsewhere in this file - confirm which format plex.tv returns here.
    headers = {'X-Plex-Token': plex_token, 'Accept': 'application/json'}
    resp = None
    try:
        # Without a timeout requests would wait forever on a stalled connection.
        resp = requests.get(url, headers=headers, timeout=timeout)
        resp.raise_for_status()
    except (requests.exceptions.ConnectionError, ConnectionError) as e:
        # requests raises its own ConnectionError, which is not a subclass of
        # the builtin one; the builtin is still caught for compatibility.
        logger.error('A connection error occurred, %s', e)
    except requests.exceptions.RequestException as e:
        logger.error('A request exception occurred, %s', e)
    return resp
|
|
|
|
|
def delete_share(server_id, plex_token, user_server_id, timeout=30):
    """
    Sends a delete for one entry under the shared servers endpoint

    :param server_id: the server id
    :param plex_token: the plex token so that api calls work
    :param user_server_id: the id appended to the shared servers url to pick
                           the entry to delete
    :param timeout: seconds to wait for plex.tv before giving up
    :return: the response (also when its status is an HTTP error, which is
             logged), or None when no response was received at all
    """
    url = f'https://plex.tv/api/servers/{server_id}/shared_servers/{user_server_id}'
    headers = {'X-Plex-Token': plex_token, 'Accept': 'application/json'}
    resp = None
    try:
        # Without a timeout requests would wait forever on a stalled connection.
        resp = requests.delete(url, headers=headers, timeout=timeout)
        resp.raise_for_status()
    except (requests.exceptions.ConnectionError, ConnectionError) as e:
        # requests raises its own ConnectionError, which is not a subclass of
        # the builtin one; the builtin is still caught for compatibility.
        logger.error('A connection error occurred, %s', e)
    except requests.exceptions.RequestException as e:
        logger.error('A request exception occurred, %s', e)
    return resp
|
|
|
|
|
def parse_shared_servers(content):
    """
    Reads the shares xml feed and groups Section ids by user

    :param content: the raw xml document, as text or bytes
    :return: a defaultdict(list) keyed by the integer userID of each
             SharedServer element; each value lists the integer id of every
             Section element nested in that user's SharedServer elements
    """
    root = ElementTree.fromstring(content)
    sections_by_user = defaultdict(list)
    # Only SharedServer elements directly under the root are considered.
    for entry in root.iterfind('SharedServer'):
        owner = int(entry.get('userID'))
        section_ids = [int(section.get('id')) for section in entry.iterfind('Section')]
        # A user only gets a key once at least one Section has been seen.
        if section_ids:
            sections_by_user[owner].extend(section_ids)
    return sections_by_user
|
|
|
|
|
def share(server_id, plex_token, user_libraries):
    """
    Enables the share on all users

    :param server_id: the server id
    :param plex_token: the plex token so that api calls work
    :param user_libraries: mapping of user id to the list of library section
                           ids to share with that user
    """
    for user_id, library_ids in user_libraries.items():
        payload = {"server_id": server_id, "shared_server": {"library_section_ids": library_ids, "invited_id": user_id}}
        response = add_share(server_id, plex_token, payload)
        # add_share returns None when no response was received, so guard
        # before reading .ok and keep going with the remaining users.
        if response is not None and response.ok:
            logger.info("Successfully added share")
        else:
            logger.error('Failed to add share for user %s', user_id)
|
|
|
|
|
def _shared_server_ids_by_user(content):
    """Maps each userID in the shares xml to the id of its SharedServer element"""
    ids_by_user = {}
    for shared_server in ElementTree.fromstring(content).findall('SharedServer'):
        user_id = shared_server.get('userID')
        shared_server_id = shared_server.get('id')
        # Entries missing either attribute cannot be addressed, skip them.
        if user_id is not None and shared_server_id is not None:
            ids_by_user[user_id] = shared_server_id
    return ids_by_user


def un_share(server_id, plex_token, user_libraries):
    """
    Removes the share on all users

    :param server_id: the server id
    :param plex_token: the plex token so that api calls work
    :param user_libraries: mapping keyed by user id; only the keys are used
    """
    get_response = get_shares(server_id, plex_token)
    # get_shares returns None when nothing was received and a non-ok response
    # on HTTP errors; neither carries a usable list of shares.
    if get_response is None or not get_response.ok:
        logger.error('Unable to retrieve the current shares, nothing was unshared')
        return
    try:
        shared_server_ids = _shared_server_ids_by_user(get_response.content)
    except ElementTree.ParseError as e:
        logger.error('Unable to parse the current shares, %s', e)
        return
    for user_id in user_libraries:
        # Compare as strings: ids loaded from json keys are strings, and the
        # xml attributes are strings as well.
        # NOTE(review): the delete url is assumed to take the SharedServer
        # element's own id (not a Section id) - confirm against the plex.tv api.
        user_server_id = shared_server_ids.get(str(user_id))
        if user_server_id is None:
            logger.info('No libraries shared with user %s', user_id)
            continue
        delete_response = delete_share(server_id, plex_token, user_server_id)
        if delete_response is not None and delete_response.ok:
            logger.info('Unshared libraries with user %s', user_id)
        else:
            logger.error('Failed to unshare libraries with user %s', user_id)
|
|
|
|
|
def init_args():
    """
    Builds the command line parser and parses the process arguments

    :return: the argparse namespace holding plex_token, server_id,
             user_libraries_file, share and unshare
    """
    cli = argparse.ArgumentParser(description='Share / Un share Plex')
    # Token and server id fall back to the environment, then to an empty string.
    cli.add_argument('-t', dest='plex_token', type=str, default=os.getenv('PLEX_TOKEN', ''))
    cli.add_argument('-s', dest='server_id', type=str, default=os.getenv('SERVER_ID', ''))
    # The libraries file is mandatory and is opened for reading by argparse.
    cli.add_argument('-l', dest='user_libraries_file', type=argparse.FileType('r'), required=True)
    # --share and --unshare cannot be combined; both default to False.
    modes = cli.add_mutually_exclusive_group()
    for flag, target in (('--share', 'share'), ('--unshare', 'unshare')):
        modes.add_argument(flag, dest=target, action='store_true', default=False)
    return cli.parse_args()
|
|
|
|
|
def main():
    """
    Entry function that uses the arguments passed to it and calls share or unshare accordingly

    When --share is not given the un-share path is taken.
    """
    args = init_args()
    # argparse.FileType hands over an already opened file, so it has to be
    # read with json.load (json.loads only accepts str/bytes); the with block
    # closes the handle once the content is parsed.
    with args.user_libraries_file as libraries_file:
        user_libraries = json.load(libraries_file)
    if args.share:
        share(args.server_id, args.plex_token, user_libraries)
    else:
        un_share(args.server_id, args.plex_token, user_libraries)
|
|
|
|
|
# Run the command line entry point only when executed as a script.
if __name__ == '__main__':
    main()