Created
June 1, 2020 16:32
-
-
Save futureshape/c5ce561cd2d4495c8327fcb640812618 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import random | |
import logging | |
import requests | |
import re | |
from ask_sdk.standard import StandardSkillBuilder | |
from ask_sdk_core.utils import is_request_type, is_intent_name | |
from ask_sdk_core.handler_input import HandlerInput | |
from ask_sdk_core.response_helper import get_plain_text_content | |
from ask_sdk_model.interfaces.display import ( | |
ImageInstance, Image, RenderTemplateDirective, | |
BackButtonBehavior, BodyTemplate3, BodyTemplate1) | |
from ask_sdk_model import ui | |
from ask_sdk_model.ui import SimpleCard | |
from ask_sdk_model import Response | |
from ask_sdk_model.ui import AskForPermissionsConsentCard | |
from ask_sdk_model.services import ServiceException | |
import boto3 | |
from boto3.dynamodb.conditions import Key, Attr | |
# Shared DynamoDB resource handle; get_airline_name() uses it to query the
# airlines lookup table.
dynamodb = boto3.resource('dynamodb')
from geopy import Point, geocoders | |
from geopy.distance import distance, VincentyDistance, geodesic | |
from opensky_api import OpenSkyApi | |
# Module-level logger; INFO so that the lookups logged by the handlers show up.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Skill builder that the request/exception handler decorators register with.
sb = StandardSkillBuilder()
# Search modes accepted by create_response().
MODE_NEAREST = "nearest"
MODE_RANDOM = "random"

# Spoken prompts used when the device location cannot be resolved.
NOTIFY_MISSING_PERMISSIONS = (
    "Please enable Location permissions in the Amazon Alexa app.")
NO_ADDRESS = (
    "It looks like you don't have an address set. "
    "You can set your address from the companion app.")
NO_GB = (
    "I'm sorry, it looks like you're located outside the UK, "
    "which isn't supported by this skill.")
ADDRESS_AVAILABLE = (
    "Your postcode is <say-as interpret-as='spell-out'>{}</say-as>")
ERROR = "Uh Oh. Looks like something went wrong."
LOCATION_FAILURE = (
    "There was an error with the Device Address API. Please try again.")
POSTCODE_FAILURE = (
    "I'm sorry, there was a problem getting your exact location based on "
    "your postcode. Please check you've entered your postcode correctly in "
    "the Alexa app.")

# Permission scope requested through the consent card.
permissions = ["read::alexa:device:all:address:country_and_postal_code"]

# Background image shown behind the display templates.
BG_IMAGE = ("https://s3.amazonaws.com/net.futureshape.planes-in-the-sky/"
            "plane-background-alexa.jpg")
# Matches any "<...>" markup element with at least one character inside.
TAG_RE = re.compile(r'<[^>]+>')


def remove_tags(text):
    """Return *text* with every ``<...>`` markup element stripped out.

    Used to turn SSML-decorated strings into plain text for display.
    """
    plain_text = re.sub(TAG_RE, '', text)
    return plain_text
def supports_display(handler_input):
    """Return True when the requesting device reports a display interface.

    Args:
        handler_input: Object exposing
            ``request_envelope.context.system.device.supported_interfaces``.

    Returns:
        bool: True only if ``supported_interfaces`` has a ``display``
        attribute that is not None. Any missing link in the attribute
        chain yields False instead of raising.
    """
    try:
        interfaces = (handler_input.request_envelope.context.system
                      .device.supported_interfaces)
    except AttributeError:
        # Only a missing attribute means "no display"; a bare except here
        # would also hide KeyboardInterrupt/SystemExit and real bugs.
        return False
    return getattr(interfaces, 'display', None) is not None
def get_airline_name(callsign):
    """Look up the airline name for a flight callsign.

    The first three characters of *callsign* are used as the ``icao_code``
    key when querying the airlines DynamoDB table.

    Returns:
        The ``airline_name`` of the first matching item, or an empty string
        when *callsign* is empty/None or the table has no match.
    """
    if callsign:
        table = dynamodb.Table('net.futureshape.planes-in-the-sky.airlines')
        prefix = callsign[:3]
        result = table.query(
            KeyConditionExpression=Key('icao_code').eq(prefix)
        )
        matches = result['Items']
        if matches:
            return matches[0]['airline_name']
    return ""
# Ordered (substring, SSML) pairs: the first substring found in the raw model
# wins, so the order below is significant and mirrors the original checks.
_MODEL_SSML = (
    ("707", "Boeing <sub alias='Seven Oh Seven'>707</sub>"),
    ("717", "Boeing <sub alias='Seven One Seven'>717</sub>"),
    ("727", "Boeing <sub alias='Seven Two Seven'>727</sub>"),
    ("737", "Boeing <sub alias='Seven Three Seven'>737</sub>"),
    ("747", "Boeing <sub alias='Seven Four Seven'>747</sub>"),
    ("757", "Boeing <sub alias='Seven Five Seven'>757</sub>"),
    ("767", "Boeing <sub alias='Seven Six Seven'>767</sub>"),
    ("777", "Boeing <sub alias='Triple Seven'>777</sub>"),
    ("787", "Boeing <sub alias='Seven Eight Seven'>787</sub> Dreamliner"),
    ("A319", "Airbus <sub alias=\"'A' Three One Nine\">A319</sub>"),
    ("A320", "Airbus <sub alias=\"'A' Three Twenty\">A320</sub>"),
    ("A321", "Airbus <sub alias=\"'A' Three Two One\">A321</sub>"),
    ("A380", "Airbus <sub alias=\"'A' Three Eighty\">A380</sub>"),
    ("A310", "Airbus <sub alias=\"'A' Three Ten\">A310</sub>"),
    ("A33", "Airbus <sub alias=\"'A' Three Thirty\">A330</sub>"),
    # Alias spelling corrected from "Fourty".
    ("A34", "Airbus <sub alias=\"'A' Three Forty\">A340</sub>"),
)


def cleanup_airplane_model(raw_model):
    """Turn a raw aircraft model string into speech-friendly SSML.

    Args:
        raw_model: Model description as returned by the metadata lookup;
            may be None or empty.

    Returns:
        The SSML phrase for the first known model substring contained in
        *raw_model*, *raw_model* itself when nothing matches, or an empty
        string when *raw_model* is None/empty (instead of raising
        TypeError on None).
    """
    if not raw_model:
        return ""
    for marker, spoken in _MODEL_SSML:
        if marker in raw_model:
            return spoken
    return raw_model
# Upper bound (seconds) applied to every outbound HTTP lookup so that a slow
# upstream service cannot stall the skill indefinitely.
_HTTP_TIMEOUT_SECONDS = 3
# Half-width of the search box around the user's postcode.
_SEARCH_RADIUS_KM = 10
# Cap on how many random planes are checked for a known airline; each check
# is a DynamoDB query, so the search must not be unbounded.
_MAX_RANDOM_ATTEMPTS = 20
_NO_PLANE_INFO = "Sorry, I can't find any information about this plane."
_NO_RANDOM_PLANE = ("Sorry, I couldn't find a flight to tell you about "
                    "right now. Please try again later.")


def _end_session_with(response_builder, speech):
    """Speak *speech*, close the session and return the built response."""
    response_builder.speak(speech)
    response_builder.set_should_end_session(True)
    return response_builder.response


def _get_json_object(url, params=None):
    """GET *url* and return its JSON object, or {} on any failure.

    Non-200 replies, network errors, timeouts, undecodable bodies and
    non-object JSON all collapse to an empty dict so callers can use
    ``.get()`` without further checks.
    """
    try:
        reply = requests.get(url, params=params,
                             timeout=_HTTP_TIMEOUT_SECONDS)
        if reply.status_code != 200:
            return {}
        payload = reply.json()
    except (requests.exceptions.RequestException, ValueError):
        logger.info("Lookup failed: %s", url)
        return {}
    return payload if isinstance(payload, dict) else {}


def _locate_postcode(postal_code):
    """Return (latitude, longitude) for *postal_code*, or None if unknown."""
    payload = _get_json_object(
        'https://api.postcodes.io/postcodes/{}'.format(postal_code))
    try:
        result = payload["result"]
        latitude, longitude = result["latitude"], result["longitude"]
    except (KeyError, TypeError):
        return None
    if latitude is None or longitude is None:
        return None
    return latitude, longitude


def _bounding_box(center_lat, center_lon, dist_km):
    """Return (min_lat, max_lat, min_lon, max_lon) reaching *dist_km* out."""
    center = Point(center_lat, center_lon)
    reach = VincentyDistance(kilometers=dist_km)
    corners = [reach.destination(center, bearing)
               for bearing in (0, 90, 180, 270)]
    latitudes = [corner.latitude for corner in corners]
    longitudes = [corner.longitude for corner in corners]
    return (min(latitudes), max(latitudes), min(longitudes), max(longitudes))


def _fetch_states(**query):
    """Return the list of OpenSky state vectors for *query* (may be empty).

    Raises:
        RuntimeError: if OpenSky returned no response object at all.
    """
    reply = OpenSkyApi().get_states(**query)
    if reply is None:
        raise RuntimeError("OpenSky returned no state data")
    return reply.states or []


def _select_nearest_plane(handler_input):
    """Find the plane closest to the device's postcode.

    Returns:
        ``(plane, None)`` on success, or ``(None, response)`` where
        *response* is the ready-to-return reply explaining why no plane
        could be selected.
    """
    req_envelope = handler_input.request_envelope
    response_builder = handler_input.response_builder

    granted = req_envelope.context.system.user.permissions
    if not (granted and granted.consent_token):
        response_builder.speak(NOTIFY_MISSING_PERMISSIONS)
        response_builder.set_card(
            AskForPermissionsConsentCard(permissions=permissions))
        response_builder.set_should_end_session(True)
        return None, response_builder.response

    try:
        device_id = req_envelope.context.system.device.device_id
        address_client = (handler_input.service_client_factory
                          .get_device_address_service())
        addr = address_client.get_country_and_postal_code(device_id)
    except ServiceException:
        return None, _end_session_with(response_builder, ERROR)

    logger.info("Postcode: %s - Country: %s",
                addr.postal_code, addr.country_code)
    if addr.postal_code is None:
        return None, _end_session_with(response_builder, NO_ADDRESS)
    if (addr.country_code or "").lower() != "gb":
        return None, _end_session_with(response_builder, NO_GB)

    center = _locate_postcode(addr.postal_code)
    if center is None:
        return None, _end_session_with(response_builder, POSTCODE_FAILURE)
    center_lat, center_lon = center
    logger.info("Location: %s, %s", center_lat, center_lon)

    bounding_box = _bounding_box(center_lat, center_lon, _SEARCH_RADIUS_KM)
    # Planes without a reported position cannot be ranked by distance.
    states = [plane for plane in _fetch_states(bbox=bounding_box)
              if plane.latitude is not None and plane.longitude is not None]
    if not states:
        return None, _end_session_with(
            response_builder,
            "Hmmm, looks like there are no planes near you right now.")

    nearest = min(
        states,
        key=lambda plane: geodesic((plane.latitude, plane.longitude),
                                   (center_lat, center_lon)).km)
    logger.info("Callsign: %s", nearest.callsign)
    if not (nearest.callsign or "").strip():
        return None, _end_session_with(response_builder, _NO_PLANE_INFO)
    return nearest, None


def _select_random_plane():
    """Pick a random plane whose callsign maps to a known airline.

    Returns:
        ``(plane, airline_name)``, or ``(None, "")`` when none of the
        sampled planes maps to an airline.
    """
    candidates = [plane for plane in _fetch_states()
                  if (plane.callsign or "").strip()]
    attempts = min(len(candidates), _MAX_RANDOM_ATTEMPTS)
    # Sampling without replacement never re-checks the same plane and always
    # terminates, unlike re-drawing in a loop.
    for plane in random.sample(candidates, attempts):
        airline = get_airline_name(plane.callsign.strip())
        if airline:
            return plane, airline
    return None, ""


def _airport_display_name(icao_code):
    """Return "<municipality> <name>" for an airport, or "" if unknown."""
    airport = _get_json_object('https://opensky-network.org/api/airports/',
                               params={"icao": icao_code})
    name = airport.get("name") or ""
    municipality = airport.get("municipality") or ""
    if municipality not in name:
        name = municipality + " " + name
    return name


def _describe_route(callsign):
    """Return (iata_flight_number, route_description) for *callsign*.

    Both values are empty strings when the route service has no data.
    """
    route_info = _get_json_object('https://opensky-network.org/api/routes',
                                  params={"callsign": callsign})
    flight_number = route_info.get("flightNumber")
    iata_flight_number = "{}{}".format(
        route_info.get("operatorIata") or "",
        "" if flight_number is None else flight_number)

    route_desc = ""
    route = route_info.get("route") or []
    if len(route) >= 2:
        origin = _airport_display_name(route[0])
        destination = _airport_display_name(route[-1])
        logger.info("Origin: %s", origin)
        logger.info("Destination: %s", destination)
        # TODO: abstract name cleanup and add vias
        if origin and destination:
            route_desc = "from {} to {}".format(origin, destination)
    return iata_flight_number, route_desc


def _describe_position(latitude, longitude):
    """Return "currently flying over ..." for a position, or "" if unknown."""
    from geopy.exc import GeopyError  # local: only needed for this lookup

    if latitude is None or longitude is None:
        return ""
    # TODO: try geonames for names of places over water
    nom = geocoders.Nominatim(
        user_agent="net.futureshape.planes-in-the-sky.alexa-skill")
    try:
        location = nom.reverse((latitude, longitude),
                               language="en-GB, en-US")
    except GeopyError:
        # The position is a nice-to-have; never fail the answer over it.
        logger.info("Location: reverse geocoding failed")
        return ""
    if location is None:
        return ""
    logger.info("Location: %s", location.raw)
    address = location.raw.get('address')
    if not address:
        return ""
    city = address.get('city', "") or address.get('state', "")
    country = address.get('country', "")
    places = [place for place in (city, country) if place]
    if not places:
        return ""
    return "currently flying over {}".format(", ".join(places))


def _fetch_thumbnail_url(icao24):
    """Return a photo URL for the aircraft, or None if none is available."""
    payload = _get_json_object(
        'http://www.airport-data.com/api/ac_thumb.json',
        params={"m": icao24, "n": 1})
    if payload.get("status") != 200:
        return None
    try:
        return payload["data"][0]["image"].replace("thumbnails", "small")
    except (KeyError, IndexError, TypeError, AttributeError):
        return None


def _render_display(response_builder, title, description, thumb_url):
    """Attach a display template, with the aircraft photo when available."""
    bg_img = Image(sources=[ImageInstance(url=BG_IMAGE)])
    if thumb_url:
        template = BodyTemplate3(
            back_button=BackButtonBehavior.HIDDEN,
            image=Image(sources=[ImageInstance(url=thumb_url)]),
            background_image=bg_img, title=title,
            text_content=description)
    else:
        template = BodyTemplate1(
            back_button=BackButtonBehavior.HIDDEN,
            background_image=bg_img, title=title,
            text_content=description)
    response_builder.add_directive(RenderTemplateDirective(template))


def create_response(handler_input, search_mode):
    # type: (HandlerInput, str) -> Response
    """Build the spoken (and, where supported, visual) answer about a plane.

    Args:
        handler_input: The request's handler input.
        search_mode: ``MODE_NEAREST`` to describe the plane closest to the
            device's postcode, or ``MODE_RANDOM`` to describe a random
            plane operated by a known airline.

    Returns:
        The response to send back. The session is always ended. When no
        plane can be selected the response carries an explanatory message.

    Raises:
        ValueError: if *search_mode* is neither supported mode.
        RuntimeError: if OpenSky returns no state data at all.
    """
    if search_mode not in (MODE_NEAREST, MODE_RANDOM):
        raise ValueError("create_response: Incorrect search_mode")

    response_builder = handler_input.response_builder

    if search_mode == MODE_NEAREST:
        selected_plane, early_response = _select_nearest_plane(handler_input)
        if selected_plane is None:
            return early_response
        airline_name = get_airline_name(selected_plane.callsign.strip())
    else:
        selected_plane, airline_name = _select_random_plane()
        if selected_plane is None:
            return _end_session_with(response_builder, _NO_RANDOM_PLANE)

    # OpenSky pads callsigns with spaces; strip before lookups and display.
    callsign = selected_plane.callsign.strip()
    logger.info("Selected callsign: %s", callsign)
    logger.info("Airline: %s", airline_name)

    metadata = _get_json_object(
        'https://opensky-network.org/api/metadata/aircraft/icao/{}'.format(
            selected_plane.icao24))
    raw_airplane_model = metadata.get("model") or ""
    airplane_model = cleanup_airplane_model(raw_airplane_model)
    logger.info("Airplane model: %s > %s", raw_airplane_model, airplane_model)

    iata_flight_number, route_desc = _describe_route(callsign)

    position_desc = ""
    if search_mode == MODE_RANDOM:
        position_desc = _describe_position(selected_plane.latitude,
                                           selected_plane.longitude)

    if supports_display(handler_input):
        if search_mode == MODE_NEAREST:
            title = "Your nearest flight is "
        else:
            title = "A random flight is "
        title += iata_flight_number or callsign
        description = get_plain_text_content(
            primary_text=" ".join(
                part for part in (airline_name, remove_tags(airplane_model))
                if part),
            secondary_text=", ".join(
                part for part in (route_desc, position_desc) if part))
        thumb_url = _fetch_thumbnail_url(selected_plane.icao24)
        logger.info("Image: %s", thumb_url)
        _render_display(response_builder, title, description, thumb_url)

    response_builder.set_should_end_session(True)

    # Only mention what is actually known, so missing lookups do not leave
    # dangling spaces or commas in the speech.
    details = " ".join(
        part for part in (airline_name, airplane_model, route_desc) if part)
    if not details:
        return response_builder.speak(_NO_PLANE_INFO).response
    intro = ("The nearest plane" if search_mode == MODE_NEAREST
             else "A random plane")
    speech = "{} is a {}".format(intro, details)
    if position_desc:
        speech = "{}, {}".format(speech, position_desc)
    return response_builder.speak(speech).response
@sb.request_handler(can_handle_func=is_request_type("LaunchRequest"))
def launch_request_handler(handler_input):
    # type: (HandlerInput) -> Response
    """Handle LaunchRequest by answering in nearest-plane mode."""
    response = create_response(handler_input, MODE_NEAREST)
    return response
@sb.request_handler(can_handle_func=is_intent_name("GetNearbyPlanes"))
def nearby_planes_intent_handler(handler_input):
    # type: (HandlerInput) -> Response
    """Handle the GetNearbyPlanes intent in nearest-plane mode."""
    response = create_response(handler_input, MODE_NEAREST)
    return response
@sb.request_handler(can_handle_func=is_intent_name("GetRandomFlight"))
def random_flight_intent_handler(handler_input):
    # type: (HandlerInput) -> Response
    """Handle the GetRandomFlight intent in random-plane mode."""
    response = create_response(handler_input, MODE_RANDOM)
    return response
@sb.request_handler(can_handle_func=is_intent_name("AMAZON.HelpIntent"))
def help_intent_handler(handler_input):
    # type: (HandlerInput) -> Response
    """Answer AMAZON.HelpIntent with a short usage hint."""
    builder = handler_input.response_builder
    builder.speak("Just ask for nearby or random planes!")
    return builder.response
@sb.request_handler(
    can_handle_func=lambda handler_input: (
        is_intent_name("AMAZON.CancelIntent")(handler_input)
        or is_intent_name("AMAZON.StopIntent")(handler_input)))
def cancel_and_stop_intent_handler(handler_input):
    # type: (HandlerInput) -> Response
    """Say goodbye and end the session on either Cancel or Stop."""
    builder = handler_input.response_builder
    builder.set_should_end_session(True)
    builder.speak("Goodbye, and enjoy gazing at the sky!")
    return builder.response
@sb.request_handler(can_handle_func=is_intent_name("AMAZON.FallbackIntent"))
def fallback_handler(handler_input):
    # type: (HandlerInput) -> Response
    """Tell the user what the skill can do, then end the session.

    AMAZON.FallbackIntent is only available in en-US locale.
    This handler will not be triggered except in that locale,
    so it is safe to deploy on any locale.
    """
    builder = handler_input.response_builder
    builder.speak("Sorry, I can't help you with this request but you can ask for planes flying near you or random planes.")
    builder.set_should_end_session(True)
    return builder.response
@sb.request_handler(can_handle_func=is_request_type("SessionEndedRequest"))
def session_ended_request_handler(handler_input):
    # type: (HandlerInput) -> Response
    """Handle SessionEndedRequest by returning the untouched response."""
    builder = handler_input.response_builder
    return builder.response
@sb.exception_handler(can_handle_func=lambda i, e: True)
def all_exception_handler(handler_input, exception):
    # type: (HandlerInput, Exception) -> Response
    """Catch-all exception handler: log the error and apologise.

    The session is ended, so no reprompt is set. The original also called
    ``ask()``, which asks to keep the session open and contradicted the
    explicit ``set_should_end_session(True)`` that followed it.
    """
    logger.error(exception, exc_info=True)
    speech = "Sorry, there was a problem retrieving this information. Please try again later."
    builder = handler_input.response_builder
    builder.speak(speech)
    builder.set_should_end_session(True)
    return builder.response
# Entry point built from the skill builder `sb` and every handler
# registered on it above.
handler = sb.lambda_handler()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment