-
-
Save naviat/9069ac2ab7a3276799f431f7563ff9dd to your computer and use it in GitHub Desktop.
This is my attempt to get Alexa to return Plex's On Deck and Recently Downloaded lists. It's not the prettiest, but Plex's API isn't the best at the moment. Step-by-step blog post may be found here: http://mlapida.com/thoughts/plex-alexa-interacting-with-your-media-server-through-voice
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
import urllib | |
import urllib2 | |
import xml.etree.ElementTree | |
import logging | |
#enable basic logging to CloudWatch Logs | |
logger = logging.getLogger() | |
logger.setLevel(logging.INFO) | |
def lambda_handler(event, context): | |
print("event.session.application.applicationId=" + | |
event['session']['application']['applicationId']) | |
#only run if requested by a specific app ID (past your app ID below) | |
if (event['session']['application']['applicationId'] != | |
"amzn1.echo-sdk-ams.app.[AppIDHere]"): | |
raise ValueError("Invalid Application ID") | |
if event['session']['new']: | |
on_session_started({'requestId': event['request']['requestId']}, | |
event['session']) | |
if event['request']['type'] == "LaunchRequest": | |
return on_launch(event['request'], event['session']) | |
elif event['request']['type'] == "IntentRequest": | |
return on_intent(event['request'], event['session']) | |
elif event['request']['type'] == "SessionEndedRequest": | |
return on_session_ended(event['request'], event['session']) | |
def on_session_started(session_started_request, session): | |
#called when the session starts | |
#nothing to do here | |
print("on_session_started requestId=" + session_started_request['requestId'] | |
+ ", sessionId=" + session['sessionId']) | |
def on_launch(launch_request, session): | |
#called when the user launches the skill without specifying what they want | |
print("on_launch requestId=" + launch_request['requestId'] + | |
", sessionId=" + session['sessionId']) | |
# Dispatch to your skill's launch | |
return get_welcome_response() | |
def on_intent(intent_request, session): | |
#called when the user specifies an intent for this skill | |
print("on_intent requestId=" + intent_request['requestId'] + | |
", sessionId=" + session['sessionId']) | |
intent = intent_request['intent'] | |
intent_name = intent_request['intent']['name'] | |
#Plex Credentials - Please Fill In | |
username = "[username here]" | |
password = "[password]" | |
authKey = plex_login(username,password) | |
# Dispatch to your skill's intent handlers | |
if intent_name == "Plex": | |
return plex_list_on_desk(intent, session, authKey) | |
elif intent_name == "OnDeck": | |
return plex_list_on_desk(intent, session, authKey) | |
elif intent_name == "Download": | |
return plex_list_download(intent, session, authKey) | |
elif intent_name == "AMAZON.HelpIntent": | |
return get_welcome_response() | |
else: | |
raise ValueError("Invalid intent") | |
def on_session_ended(session_ended_request, session): | |
#Called when the user ends the session | |
print("on_session_ended requestId=" + session_ended_request['requestId'] + | |
", sessionId=" + session['sessionId']) | |
# add cleanup logic here | |
# --------------- Functions that control the skill's behavior ------------------ | |
def get_welcome_response(): | |
#the standard welcome message | |
session_attributes = {} | |
card_title = "Welcome" | |
speech_output = "Welcome to the Plex Skill. " \ | |
"You can requet items on deck " \ | |
"or list downloads" | |
# If the user either does not reply to the welcome message or says something | |
# that is not understood, they will be prompted again with this text. | |
reprompt_text = "Welcome to the Plex Skill. " \ | |
"You can requet items on deck " \ | |
"or list downloads" | |
should_end_session = False | |
return build_response(session_attributes, build_speechlet_response( | |
card_title, speech_output, reprompt_text, should_end_session)) | |
def plex_list_on_desk(intent, session, authKey): | |
#return on deck TV shows | |
card_title = intent['name'] | |
session_attributes = {} | |
should_end_session = True | |
#call the on deck function | |
OnDeck = ret_on_deck(str(find_plex_server(authKey)),authKey) | |
speech_output = OnDeck | |
reprompt_text = OnDeck | |
return build_response(session_attributes, build_speechlet_response( | |
card_title, speech_output, reprompt_text, should_end_session)) | |
def plex_list_download(intent, session, authKey): | |
#return recently downloaded | |
card_title = intent['name'] | |
session_attributes = {} | |
should_end_session = True | |
#call the recently downloaded function | |
OnDeck = ret_download(str(find_plex_server(authKey)),authKey) | |
speech_output = OnDeck | |
reprompt_text = OnDeck | |
return build_response(session_attributes, build_speechlet_response( | |
card_title, speech_output, reprompt_text, should_end_session)) | |
# --------------- Helpers that build all of the responses ---------------------- | |
def build_speechlet_response(title, output, reprompt_text, should_end_session): | |
#creates the JSON payload for Alexa | |
return { | |
'outputSpeech': { | |
'type': 'PlainText', | |
'text': output | |
}, | |
'card': { | |
'type': 'Simple', | |
'title': 'SessionSpeechlet - ' + title, | |
'content': 'SessionSpeechlet - ' + output | |
}, | |
'reprompt': { | |
'outputSpeech': { | |
'type': 'PlainText', | |
'text': reprompt_text | |
} | |
}, | |
'shouldEndSession': should_end_session | |
} | |
def build_response(session_attributes, speechlet_response): | |
return { | |
'version': '1.0', | |
'sessionAttributes': session_attributes, | |
'response': speechlet_response | |
} | |
# --------------- Functions specific to Plex ---------------------- | |
def find_plex_server(authKey): | |
#Select the first server you find | |
url = "https://plex.tv/devices.xml?X-Plex-Token=" + authKey | |
try: | |
request = urllib2.Request(url) | |
result = urllib2.urlopen(request) | |
e = xml.etree.ElementTree.fromstring(result.read()) | |
x = 0 | |
#[TODO]: There should be a better way to do this. Check the API docs | |
for atype in e.findall('Device'): | |
if atype.get('provides') == "server": | |
for conns in atype.findall('Connection'): | |
if x == 0: | |
return(conns.get('uri')) | |
x += 1 | |
except urllib2.URLError, e: | |
print(e) | |
def ret_on_deck(url,authKey): | |
#returns a list of TV Shows that are "on deck" | |
MainServerURL = url + "/library/onDeck?X-Plex-Token=" + authKey | |
s = "TV Shows On Deck: \n" | |
try: | |
request = urllib2.Request(MainServerURL) | |
result = urllib2.urlopen(request) | |
e = xml.etree.ElementTree.fromstring(result.read()) | |
#look for the TV shows only | |
for atype in e.findall('Video'): | |
if atype.get('librarySectionTitle') == "TV Shows": | |
s += atype.get('grandparentTitle').split("(")[0].strip() + ". \n" | |
return s | |
except urllib2.URLError, e: | |
print(e) | |
def ret_download(url,authKey): | |
#returns a list of downloads | |
MainServerURL = url + "/library/recentlyAdded?X-Plex-Token=" + authKey | |
#start the string | |
s = "Recently Added: \n" | |
#number of shows to list | |
c = 5 | |
try: | |
request = urllib2.Request(MainServerURL) | |
result = urllib2.urlopen(request) | |
e = xml.etree.ElementTree.fromstring(result.read()) | |
t = 0 | |
m = 0 | |
#search for TV shows | |
for atype in e.findall('Directory'): | |
if t == 0: | |
s = s + "In TV: \n" | |
if atype.get('type') == "season" and t < c: | |
s += atype.get('parentTitle').split("(")[0].strip() + ". \n" | |
t += 1 | |
#search for Movies | |
for atype in e.findall('Video'): | |
if m == 0: | |
s += "In Movies: \n" | |
if atype.get('type') == "movie" and m < c: | |
s += atype.get('title').split("(")[0].strip() + ". \n" | |
m += 1 | |
return s | |
except urllib2.URLError, e: | |
print(e) | |
def plex_login(username,password): | |
#returns a Plex auth token | |
try: | |
url = "https://plex.tv/users/sign_in.xml" | |
headers = { | |
'x-plex-device-name': "AWS Lambda", | |
'x-plex-device': "AWSv01", | |
'x-plex-client-identifier': "049ouolknf9u42oihen" | |
} | |
values = { | |
'user[login]' : username, | |
'user[password]': password | |
} | |
data = urllib.urlencode(values) | |
req = urllib2.Request(url,data,headers) | |
response = urllib2.urlopen(req) | |
e = xml.etree.ElementTree.fromstring(response.read()) | |
return(e.get('authenticationToken')) | |
except urllib2.URLError, e: | |
print(e) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment