Created
January 20, 2021 01:38
-
-
Save Cediddi/cfcf84aa3541a46beeee4064e3e33e40 to your computer and use it in GitHub Desktop.
A simple script that can run as a daemon and notify you about your public transport arriving within the next hour.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import pathlib | |
import sys | |
import time | |
import datetime | |
from copy import deepcopy | |
from typing import List, Dict, Any | |
from urllib.parse import urljoin | |
import appdirs | |
import daemonocle | |
import requests | |
from notifypy import Notify | |
""" | |
How to use? | |
1- Run the script with configure action `python allons_y.py configure` | |
2- Start the daemon as `python allons_y.py start` | |
3- ??? | |
4- PROFIT | |
""" | |
class AllonsDaemon(daemonocle.Daemon):
    """
    The main class Allons-Y uses. It deals with process management as well as the business logic.
    """

    # Per-user configuration directory, resolved through appdirs.
    user_config_dir = pathlib.Path(appdirs.user_config_dir(appname="allons_y", roaming=True, version="1"))
    # JSON file written by the `configure` action and read by the worker loop.
    config_file = user_config_dir / "config.json"
    # Skeleton of the configuration. The route names drive the questions asked
    # during configuration; the per-route keys mirror what `get_route` stores
    # and `update_config` reads ("station" / "line", not "*_id").
    config_template = {
        "options": {
            "notification_interval": 10,  # seconds
        },
        "routes": {
            "home": {
                "station": None,  # Station id from the api
                "line": None,  # Line id from the api
                "start_time": None,  # When should notifications start
                "end_time": None,  # When should notifications end
            },
            "work": {"station": None, "line": None, "start_time": None, "end_time": None},
        },
    }
    # Root URL of the REST API; endpoint paths are joined onto it.
    base_path = "https://v5.vbb.transport.rest/"
def __init__(self, **kwargs):
    """
    Builds the daemon, always wiring in the worker callable and the pid file.
    A ``worker`` or ``pid_file`` supplied by the caller is overridden.
    :param kwargs: Extra keyword arguments forwarded to the parent class
    """
    forced = {
        "worker": self.main,
        "pid_file": self.user_config_dir / "pid",
    }
    kwargs.update(forced)
    super().__init__(**kwargs)
def search_station_api(self, name: str, timeout: float = 10.0) -> Dict[str, Dict]:
    """
    Searches stations based on the name query.
    :param name: A name query
    :param timeout: Seconds to wait for the API before giving up
    :return: The decoded JSON response; ``filter_station`` consumes it as a mapping of station objects
    :raises requests.RequestException: On timeouts, connection problems or a non-2xx response
    """
    resp = requests.get(
        urljoin(self.base_path, "stations"),
        # Spell the boolean the way the API documents it; requests would
        # otherwise serialize Python's True as "True".
        params={"query": name, "limit": 10, "fuzzy": "true"},
        # Without a timeout requests may block forever on a stalled connection.
        timeout=timeout,
    )
    resp.raise_for_status()
    return resp.json()
def get_station_api(self, station_id: str, timeout: float = 10.0) -> Dict[str, Any]:
    """
    Gets the details of a station.
    :param station_id: ID of the station
    :param timeout: Seconds to wait for the API before giving up
    :return: Detailed station object
    :raises requests.RequestException: On timeouts, connection problems or a non-2xx response
    """
    # Without a timeout requests may block forever on a stalled connection.
    resp = requests.get(urljoin(self.base_path, f"stations/{station_id}"), timeout=timeout)
    resp.raise_for_status()
    return resp.json()
def get_line_api(self, line_id: str, timeout: float = 10.0) -> Dict[str, Any]:
    """
    Gets the details of a line.
    :param line_id: ID of the line
    :param timeout: Seconds to wait for the API before giving up
    :return: Detailed line object
    :raises requests.RequestException: On timeouts, connection problems or a non-2xx response
    """
    # Without a timeout requests may block forever on a stalled connection.
    resp = requests.get(urljoin(self.base_path, f"lines/{line_id}"), timeout=timeout)
    resp.raise_for_status()
    return resp.json()
def get_stop_api(self, station_id: str, timeout: float = 10.0) -> Dict[str, Any]:
    """
    Gets the details of a stop.
    There's an ongoing issue with this endpoint.
    Please check: https://github.com/derhuerst/vbb-rest/issues/45
    :param station_id: ID of the station
    :param timeout: Seconds to wait for the API before giving up
    :return: Details of the stop
    :raises requests.RequestException: On timeouts, connection problems or a non-2xx response
    """
    # Without a timeout requests may block forever on a stalled connection.
    resp = requests.get(urljoin(self.base_path, f"stops/{station_id}"), timeout=timeout)
    resp.raise_for_status()
    return resp.json()
def get_stop_arrivals_api(self, station_id: str, line_name: str, duration: int = 60,
                          timeout: float = 10.0) -> List[Dict[str, Any]]:
    """
    Gets the arrivals of a stop/station, filtered down to a single line.
    There's an ongoing issue with this endpoint.
    Please check: https://github.com/derhuerst/vbb-rest/issues/45
    :param station_id: ID of the station
    :param line_name: Name of the line to keep
    :param duration: Look-ahead window in minutes requested from the API
    :param timeout: Seconds to wait for the API before giving up
    :return: Arrivals of the given line that carry a time, soonest first
    :raises requests.RequestException: On timeouts, connection problems or a non-2xx response
    """
    resp = requests.get(
        urljoin(self.base_path, f"stops/{station_id}/arrivals"),
        # Ask explicitly for the documented "next hour".
        # NOTE(review): the API's own default window is believed to be
        # shorter (10 minutes per its docs) - confirm.
        params={"duration": duration},
        # Without a timeout requests may block forever on a stalled connection.
        timeout=timeout,
    )
    resp.raise_for_status()
    related_data = [
        arrival
        for arrival in resp.json()
        # Entries without a `when` cannot be sorted or announced, so drop them
        # instead of crashing in fromisoformat().
        if arrival.get('when') and (arrival.get('line') or {}).get('name') == line_name
    ]
    related_data.sort(key=lambda arrival: datetime.datetime.fromisoformat(arrival['when']))
    return related_data
def update_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Fetches the station and line information from api and prepares the time window for comparisons.
    :param config: The configuration object as stored on disk (ids and ISO time strings).
    :return: Updated copy of the config; the input object is left untouched
    """
    updated_config = deepcopy(config)
    for key, value in config['routes'].items():
        route = updated_config['routes'][key]
        route['station'] = self.get_station_api(value['station'])
        route['line'] = self.get_line_api(value['line'])
        # The file stores the window as ISO strings, but arrivals are compared
        # against it as datetime.time values, so parse the strings here.
        for bound in ('start_time', 'end_time'):
            if isinstance(route.get(bound), str):
                route[bound] = datetime.time.fromisoformat(route[bound])
    return updated_config
def check_route(self, config: Dict[str, Any]) -> List[str]:
    """
    Checks the upcoming arrivals of every configured route and builds a message for each arrival of the
    selected line whose time of day falls inside the route's notification window.
    :param config: The configuration object, with station and line details already fetched.
    :return: One notification message per matching arrival (empty when nothing matches)
    """
    notifications = []
    for route in config['routes'].values():
        # Accept the window either as ISO strings (as stored on disk) or as
        # datetime.time objects; comparing a str with a time raises TypeError.
        start, end = (
            datetime.time.fromisoformat(bound) if isinstance(bound, str) else bound
            for bound in (route['start_time'], route['end_time'])
        )
        line_name = route['line']['name']
        station_name = route['station']['name']
        for arrival in self.get_stop_arrivals_api(route['station']['id'], line_name):
            if not arrival.get('when'):
                # No usable arrival time on this entry.
                continue
            when = datetime.datetime.fromisoformat(arrival['when'])
            arrival_time = when.time()
            if start <= end:
                timespan_related = start <= arrival_time <= end
            else:
                # Window wraps around midnight, e.g. 23:00 -> 01:00.
                timespan_related = arrival_time >= start or arrival_time <= end
            if timespan_related:
                # Use the arrival's own timezone so aware and naive datetimes
                # are never mixed in the subtraction.
                now = datetime.datetime.now(tz=when.tzinfo)
                notifications.append(f"{line_name} will arrive to {station_name} "
                                     f"in {when - now}. Be Ready!")
    return notifications
def get_station(self, stations: List[Dict]) -> Dict[str, Any]:
    """
    Asks the user to choose one of the stations and fetches its details.
    :param stations: Candidate stations; each needs a 'name' and an 'id'
    :return: Detailed station object of the selected candidate
    """
    while True:
        print("Please select the station by it's index.")
        for idx, station in enumerate(stations):
            print(f"[{idx}] - {station['name']}")
        user_input = input("> ").strip()
        # isdecimal() only accepts characters int() can parse; isdigit() also
        # accepts e.g. superscript digits, which make int() raise ValueError.
        if user_input.isdecimal() and int(user_input) < len(stations):
            actual_selected = stations[int(user_input)]
            print(f"You have selected {actual_selected['name']}")
            return self.get_station_api(actual_selected['id'])
        print("Invalid input.")
def filter_station(self, key: str) -> List[Dict[str, Any]]:
    """
    Takes a station name query from the user and returns the matching stations.
    :param key: Name of the route the station belongs to, shown in the prompt
    :return: Non-empty list of station objects matching the query
    """
    while True:
        print(f"What is the nearest station to your {key}?")
        # Strip first so a whitespace-only answer is rejected instead of
        # being sent to the API as a query.
        user_input = input("> ").strip()
        if not user_input:
            print("Invalid Input")
            continue
        stations = self.search_station_api(user_input)
        if not stations:
            print("No station has been found...")
            continue
        return list(stations.values())
def get_line(self, station: Dict[str, Any]) -> Dict[str, Any]:
    """
    Asks the user to select a line that's related to the selected station.
    :param station: Detailed station object; its "lines" entries need a 'name'
    :return: The selected entry of the station's "lines"
    :raises ValueError: If the station lists no lines, so nothing could ever be selected
    """
    lines = station["lines"]
    if not lines:
        # Without this guard the prompt below could never be satisfied.
        raise ValueError("The selected station has no lines to choose from.")
    while True:
        print("Please select the line by it's index.")
        for idx, line in enumerate(lines):
            # NOTE: the first and last stop of each line could be shown here
            # once https://github.com/derhuerst/vbb-rest/issues/45 is fixed.
            # Until then only the name is printed, which the station's own
            # line entries already carry, so no per-line API call is made.
            print(f"[{idx}] - {line['name']}")
        user_input = input("> ").strip()
        # isdecimal() only accepts characters int() can parse; isdigit() also
        # accepts e.g. superscript digits, which make int() raise ValueError.
        if user_input.isdecimal() and int(user_input) < len(lines):
            actual_selected = lines[int(user_input)]
            print(f"You have selected {actual_selected['name']}")
            return actual_selected
        print("Invalid input.")
def ask_time(self, key: str):
    """
    Keeps prompting until the user enters a time in ISO-8601 format.
    :param key: Word inserted into the prompt ("When should notifications <key>?")
    :return: The parsed ``datetime.time``
    """
    prompt = f"When should notifications {key}? (ISO-8601 formatted)"
    parsed = None
    while parsed is None:
        print(prompt)
        answer = input("> ")
        if answer:
            try:
                parsed = datetime.time.fromisoformat(answer)
            except ValueError:
                print('Invalid iso time.')
        else:
            print("Invalid Input")
    return parsed
def get_route(self, route):
    """
    Walks the user through station, line and time window selection for one route.
    :param route: Name of the route, used in the station prompt
    :return: Route entry holding the station id, the line id and both times as ISO strings
    """
    candidates = self.filter_station(route)
    chosen_station = self.get_station(candidates)
    chosen_line = self.get_line(chosen_station)
    # Asked in this order: first "start", then "end".
    window = {bound: self.ask_time(bound) for bound in ("start", "end")}
    return dict(
        station=chosen_station['id'],
        line=chosen_line['id'],
        start_time=window["start"].isoformat(),
        end_time=window["end"].isoformat(),
    )
def ask_options(self):
    """
    Asks the user for the additional options.
    An empty answer keeps the default from the config template; anything that
    is not an integer of at least 1 is rejected and asked again.
    :return: Options mapping holding the chosen "notification_interval"
    """
    interval = self.config_template['options']['notification_interval']
    while True:
        print(f"How fast should we notify you? [default={interval}, min=1]")
        answer = input("> ")
        if not answer:
            return {"notification_interval": interval}
        try:
            parsed = int(answer)
        except ValueError:
            parsed = None
        if parsed is not None and parsed >= 1:
            return {"notification_interval": parsed}
        print('Invalid Input')
def questions(self):
    """
    Runs the whole interactive questionnaire.
    Starts from a copy of the config template, asks for every route named in
    it, then asks for the options.
    :return: The completed configuration object
    """
    answers: Dict[str, Any] = deepcopy(self.config_template)
    routes = answers["routes"]
    for name in list(routes):
        routes[name] = self.get_route(name)
    answers["options"] = self.ask_options()
    return answers
@daemonocle.expose_action
def configure(self):
    """
    The configuration workflow. You must use it before running the main loop.
    Asks the questions, then writes the answers to the config file. An existing
    file is replaced, so the action can also update or repair a configuration.
    :return: Never returns; exits the process with status 0
    """
    config = self.questions()
    self.user_config_dir.mkdir(exist_ok=True, parents=True)
    replacing = self.config_file.exists()
    # Always write: skipping an existing file would silently throw away the
    # answers just collected and leave a corrupt file unrepaired.
    with self.config_file.open("w") as f:
        json.dump(config, f)
    print(f"Configuration {'updated' if replacing else 'created'} at {self.config_file}")
    # sys.exit instead of the `exit` helper, which only exists when the site
    # module is loaded.
    sys.exit(0)
def main(self):
    """
    Application loop, checks the route notifications every 10 seconds and sends a desktop notification,
    at most once per configured notification interval.
    Exits with status 1 when the configuration is missing or is not valid JSON.
    :return: None
    """
    if not self.config_file.exists():
        print("No configuration found, please run 'allons_y configure'")
        sys.exit(1)
    with self.config_file.open() as f:
        try:
            config = json.load(f)
        except json.JSONDecodeError:
            print("Corrupt configuration, please run 'allons_y configure'")
            sys.exit(1)
    updated_config = self.update_config(config)
    interval = datetime.timedelta(seconds=updated_config['options']['notification_interval'])
    poll_seconds = 10
    # Backdate the last notification so the very first match is announced.
    last_notified = datetime.datetime.now() - interval
    while True:
        try:
            notifications = self.check_route(updated_config)
        except requests.RequestException as err:
            # A transient network or API failure must not kill the daemon;
            # report it and try again on the next cycle.
            print(f"Could not fetch arrivals, retrying in {poll_seconds}s: {err}", file=sys.stderr)
            notifications = []
        now = datetime.datetime.now()
        if notifications and now - last_notified > interval:
            notification = Notify()
            notification.title = "Allons-Y Alonso, we have a public transport vehicle to catch!"
            notification.message = "\n".join(notifications)
            notification.send()
            last_notified = now
        # Sleep after notifying so fresh results are not held back for a cycle.
        time.sleep(poll_seconds)
# Script entry point: hand control to the daemon's command line interface.
if __name__ == '__main__':
    AllonsDaemon().cli()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
appdirs | |
daemonocle | |
requests | |
notify-py |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment