Last active
February 25, 2020 08:26
-
-
Save JonnyWong16/48d6362884b5edbf5e6d78859035183a to your computer and use it in GitHub Desktop.
Send a PlexPy notification with geolocation data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# 1. Install the requests module for python. | |
# pip install requests | |
# 2. Add script arguments in PlexPy. The following script arguments are available by default. More can be added below. | |
# -ip {ip_address} -u {user} -mt {media_type} -t {title} -pf {platform} -pl {player} -da {datestamp} -ti {timestamp} | |
import argparse | |
import requests | |
import sys | |
## EDIT THESE SETTINGS ## | |
PLEXPY_APIKEY = 'XXXX' # Your PlexPy API key | |
PLEXPY_URL = 'http://localhost:8181/' # Your PlexPy URL | |
AGENT_ID = 7 # The notification agent ID for PlexPy | |
# Replace LAN IP addresses that start with the LAN_SUBNET with a WAN IP address | |
# to retrieve geolocation data. Leave REPLACEMENT_WAN_IP blank for no replacement. | |
LAN_SUBNET = '192.168.0' | |
REPLACEMENT_WAN_IP = '' | |
# The notification subject and body | |
# - Use "{p.argument}" for script arguments | |
# - Use "{g.value}" for geolocation data | |
SUBJECT_TEXT = "PlexPy Notification" | |
BODY_TEXT = "User <b>{p.user}</b> has watched {p.media_type} " \ | |
"<font color='blue'><b>'{p.title}'</b></font> on " \ | |
"<font color='green'>'{p.platform}'</font> " \ | |
"[<font color='green'>'{p.player}'</font>] in {g.city}, {g.country} " \ | |
"||| {p.datestamp}-{p.timestamp}" | |
## CODE BELOW ## | |
class GeoData(object): | |
def __init__(self, data=None): | |
data = data or {} | |
self.continent = data.get('continent', 'N/A') | |
self.country = data.get('country', 'N/A') | |
self.region = data.get('region', 'N/A') | |
self.city = data.get('city', 'N/A') | |
self.postal_code = data.get('postal_code', 'N/A') | |
self.timezone = data.get('timezone', 'N/A') | |
self.latitude = data.get('latitude', 'N/A') | |
self.longitude = data.get('longitude', 'N/A') | |
self.longitude = data.get('longitude', 'N/A') | |
self.accuracy = data.get('accuracy', 'N/A') | |
def get_geoip_info(ip_address=''): | |
# Get the geo IP lookup from PlexPy | |
payload = {'apikey': PLEXPY_APIKEY, | |
'cmd': 'get_geoip_lookup', | |
'ip_address': ip_address} | |
try: | |
r = requests.get(PLEXPY_URL.rstrip('/') + '/api/v2', params=payload) | |
response = r.json() | |
if response['response']['result'] == 'success': | |
data = response['response']['data'] | |
if data.get('error'): | |
raise Exception(data['error']) | |
else: | |
sys.stdout.write("Successfully retrieved geolocation data.") | |
return GeoData(data=data) | |
else: | |
raise Exception(response['response']['message']) | |
except Exception as e: | |
sys.stderr.write("PlexPy API 'get_geoip_lookup' request failed: {0}.".format(e)) | |
return GeoData() | |
def send_notification(arguments=None, geodata=None): | |
# Format notification text | |
try: | |
subject = SUBJECT_TEXT.format(p=arguments, g=geodata) | |
body = BODY_TEXT.format(p=arguments, g=geodata) | |
except LookupError as e: | |
sys.stderr.write("Unable to substitute '{0}' in the notification subject or body".format(e)) | |
return None | |
# Send the notification through PlexPy | |
payload = {'apikey': PLEXPY_APIKEY, | |
'cmd': 'notify', | |
'agent_id': AGENT_ID, | |
'subject': subject, | |
'body': body} | |
try: | |
r = requests.post(PLEXPY_URL.rstrip('/') + '/api/v2', params=payload) | |
response = r.json() | |
if response['response']['result'] == 'success': | |
sys.stdout.write("Successfully sent PlexPy notification.") | |
else: | |
raise Exception(response['response']['message']) | |
except Exception as e: | |
sys.stderr.write("PlexPy API 'notify' request failed: {0}.".format(e)) | |
return None | |
if __name__ == '__main__': | |
# Parse arguments from PlexPy | |
parser = argparse.ArgumentParser() | |
parser.add_argument('-ip', '--ip_address', action='store', default='', | |
help='The IP address of the stream') | |
parser.add_argument('-u', '--user', action='store', default='', | |
help='Username of the person watching the stream') | |
parser.add_argument('-mt', '--media_type', action='store', default='', | |
help='The media type of the stream') | |
parser.add_argument('-t', '--title', action='store', default='', | |
help='The title of the media') | |
parser.add_argument('-pf', '--platform', action='store', default='', | |
help='The platform of the stream') | |
parser.add_argument('-pl', '--player', action='store', default='', | |
help='The player of the stream') | |
parser.add_argument('-da', '--datestamp', action='store', default='', | |
help='The date of the stream') | |
parser.add_argument('-ti', '--timestamp', action='store', default='', | |
help='The time of the stream') | |
# Add more arguments as needed | |
p = parser.parse_args() | |
# Check to make sure there is an IP address before proceeding | |
if p.ip_address: | |
if p.ip_address.startswith(LAN_SUBNET) and REPLACEMENT_WAN_IP: | |
ip_address = REPLACEMENT_WAN_IP | |
else: | |
ip_address = p.ip_address | |
geodata = get_geoip_info(ip_address=ip_address) | |
send_notification(arguments=p, geodata=geodata) | |
else: | |
sys.stdout.write("No IP address passed from PlexPy.") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@nesrine19992,
That's possible, but you have to code it yourself most likely.