Last active
December 7, 2024 16:26
-
-
Save gpproton/f8bdb298be0d624937cd7ffc876e0b83 to your computer and use it in GitHub Desktop.
Quick script for resolving the routes distance and ETA using only simple names of locations
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Copyright (c) 2024 Godwin Peter .O | |
Licensed under the MIT License | |
you may not use this file except in compliance with the License. | |
https://opensource.org/license/mit | |
Unless required by applicable law or agreed to in writing, software | |
distributed under the License is distributed on an "AS IS" BASIS, | |
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
See the License for the specific language governing permissions and | |
limitations under the License. | |
Author: Godwin peter .O ([email protected]) | |
Created At: Thursday, 7th Nov 2024 | |
Modified By: Godwin peter .O | |
Modified At: Sat Dec 07 2024 | |
""" | |
import asyncio | |
import csv | |
import http.client | |
import json | |
import re | |
import sys | |
import urllib.parse | |
from datetime import datetime | |
from itertools import batched | |
from typing import TypedDict | |
from dateutil.relativedelta import relativedelta | |
## Default Values | |
global_input_file: str = "files/trip_input.csv" | |
global_output_file: str = "files/trip_output.csv" | |
global_nominatim_url: str = "nominatim.openstreetmap.org" | |
global_routing_host: str = "valhalla1.openstreetmap.de" | |
global_http_chunks = 2 | |
global_http_delay = 0.15 | |
global_fixed_speed = 41 | |
global_top_speed = 59 | |
## Shared Types | |
Coord = TypedDict("Coord", {"lat": float, "lon": float}) | |
Location = TypedDict("Location", {"name": str, "address": str, "lat": float, "lon": float}) | |
RouteInfo = TypedDict("RouteInfo", { | |
"trip_code": str, | |
"length": float, | |
"time": str, | |
"source": Coord, | |
"destination": Coord, | |
}) | |
TripInfo = TypedDict("TripInfo", { | |
"trip_code": str, | |
"length": str, | |
"time": str, | |
"source": str, | |
"source_coords": str, | |
"destination": str, | |
"destination_coords": str, | |
}) | |
class Utils: | |
""" | |
Utility class containing helper methods for logging and time formatting. | |
The Utils class offers simple utilities for managing log outputs and converting | |
time values for better readability. This is useful for debugging and timing solutions. | |
- current_time(): Returns the current system time formatted as HH:MM:SS. | |
- log_info(message: str): Logs the provided message with the current timestamp. | |
- format_time(value: float): Takes a number of seconds and formats it into HH:MM:SS. | |
""" | |
@classmethod | |
def current_time(cls) -> str: | |
""" | |
Get the current time formatted as HH:MM:SS. | |
Returns: | |
str: Current time in HH:MM:SS format. | |
""" | |
now = datetime.now() | |
return now.strftime("%H:%M:%S") | |
@classmethod | |
def log_info(cls, message: str) -> None: | |
self = cls() | |
print("{0} | {1}".format(self.current_time(), message)) | |
@classmethod | |
def format_time(cls, value: float) -> str: | |
rt = relativedelta(seconds=int(value)) | |
return "{:02d}:{:02d}:{:02d}".format(int(rt.hours), int(rt.minutes), int(rt.seconds)) | |
class CoordService: | |
""" | |
Service class for geocoding and route resolution using HTTP APIs. | |
The CoordService class is designed to interact with geocoding and | |
routing services through HTTP requests, providing end-users the ability to resolve | |
addresses to coordinates and compute route attributes such as length and estimated travel time. | |
Class Attributes: | |
headers (dict): A dictionary containing default headers used for HTTP connections. | |
Attributes: | |
geocoding_host (str): Hostname for geocoding service. | |
routing_host (str): Hostname for routing service. | |
Methods: | |
- get_coords(address: str): Sends a request to the geocoding service to resolve | |
an address into geographic coordinates (latitude and longitude). | |
- get_route_attributes(trip_code: str, source: Coord, destination: Coord): | |
Queries the routing service to fetch route details including length and estimated duration | |
between specified coordinates. | |
""" | |
geocoding_host: str | |
routing_host: str | |
headers = { | |
"Content-Type": "application/json", | |
"User-Agent": "Chrome" | |
} | |
def __init__(self, geocoding_host: str, routing_host: str) -> None: | |
""" | |
Initializes CoordService with specified geocoding and routing hostnames. | |
Args: | |
geocoding_host (str): Hostname for geocoding service. | |
routing_host (str): Hostname for routing service. | |
""" | |
self.geocoding_host = geocoding_host | |
self.routing_host = routing_host | |
def get_coords(self, address: str) -> Location | None: | |
try: | |
conn = http.client.HTTPSConnection(self.geocoding_host) | |
url_path = "/search?format=json&limit=1&addressdetails=0&[email protected]&q=" + urllib.parse.quote( | |
address) | |
payload = "" | |
conn.request("GET", url_path, payload, self.headers) | |
res = conn.getresponse() | |
data = res.read() | |
result_items = data.decode("utf-8") | |
item = json.loads(result_items)[0] | |
return {"name": address, "lat": item["lat"], "lon": item["lon"], "address": item["display_name"]} | |
except http.client.HTTPException: | |
Utils.log_info("Error: There was an error with the HTTP request") | |
return None | |
def get_route_attributes(self, trip_code: str, source: Coord, destination: Coord) -> RouteInfo | None: | |
global global_fixed_speed | |
global global_top_speed | |
try: | |
conn = http.client.HTTPSConnection(self.routing_host) | |
payload = json.dumps({ | |
"format": "json", | |
"shape_format": "polyline6", | |
"units": "kilometers", | |
"alternates": 0, | |
"search_filter": { | |
"exclude_closures": True | |
}, | |
"costing": "auto", | |
"costing_options": { | |
"auto": {"fixed_speed": global_fixed_speed, "top_speed": global_top_speed} | |
}, | |
"locations": [ | |
{ | |
"lat": source["lat"], | |
"lon": source["lon"] | |
}, | |
{ | |
"lat": destination["lat"], | |
"lon": destination["lon"] | |
} | |
] | |
}) | |
conn.request("POST", "/route", payload, self.headers) | |
res = conn.getresponse() | |
data = res.read() | |
result = data.decode("utf-8") | |
item = json.loads(result) | |
return { | |
"trip_code": trip_code, | |
"length": item["trip"]["summary"]["length"], | |
"time": item["trip"]["summary"]["time"], | |
"source": {"lat": source["lat"], "lon": source["lon"]}, | |
"destination": {"lat": destination["lat"], "lon": destination["lon"]}, | |
} | |
except http.client.HTTPException: | |
Utils.log_info("Error: There was an error with the HTTP request") | |
return None | |
class DataHandler: | |
""" | |
Handles data loading, transformation, and output file generation. | |
The DataHandler class processes CSV input files and extracts relevant location and trip | |
data. It also transforms raw data into structured format and writes out results. | |
Attributes: | |
input_path (str): Path to the input CSV file. | |
output_path (str): Path to the output CSV file. | |
source_data (list): Raw data loaded from the input file. | |
locations (list): List of unique locations extracted from source data. | |
routes (list): List of resolved routes. | |
trips (list): Final list of trip data for output. | |
Methods: | |
- get_location_object(name: str): Finds a Location instance in the locations list | |
by matching the given location name. | |
- get_route_object(trip_code: str): Searches for a resolved RouteInfo instance using | |
the trip code. | |
- generate_output(): Compiles trip data and writes to the output CSV file. | |
- save_output_file(data: list[TripInfo]): Takes trip data and outputs it to the | |
designated output CSV file. | |
""" | |
input_path: str | |
output_path: str | |
source_data = [] | |
locations: list[Location] = [] | |
routes: list[RouteInfo] = [] | |
trips: list[TripInfo] = [] | |
def __init__(self, input_path: str, output_path: str) -> None: | |
""" | |
Initializes the DataHandler with specified paths for input and output files. | |
Args: | |
input_path (str): Path to the input CSV file. | |
output_path (str): Path to the output CSV file. | |
""" | |
self.input_path = input_path | |
self.output_path = output_path | |
self.__load() | |
self.__load_locations() | |
def get_location_object(self, name: str) -> Location | None: | |
return next((item for item in self.locations if item['name'] == name), None) | |
def get_route_object(self, trip_code: str) -> RouteInfo | None: | |
return next((item for item in self.routes if item['trip_code'] == trip_code), None) | |
def __load(self) -> None: | |
try: | |
with open(self.input_path, "r") as source_file: | |
local_source = list(csv.DictReader(source_file, delimiter=",")) | |
self.source_data.extend(local_source) | |
# Correct locations name | |
for line_item in self.source_data: | |
source_value = re.sub(r'[^a-zA-Z0-9\s]', ' ', line_item["source"]).capitalize() | |
destination_value = re.sub(r'[^a-zA-Z0-9\s]', ' ', line_item["destination"]).capitalize() | |
trip_code: str = line_item["trip_code"] | |
line_item["source"] = source_value | |
line_item["destination"] = destination_value | |
line_item["trip_code"] = trip_code.lower() | |
except IOError: | |
Utils.log_info("There was an error reading file {0}".format(self.input_path)) | |
sys.exit() | |
def __load_locations(self) -> None: | |
for line_item in self.source_data: | |
trip_source: str = line_item["source"] | |
trip_destination: str = line_item["destination"] | |
check_source = self.get_location_object(trip_source) | |
check_destination = self.get_location_object(trip_destination) | |
if check_source is None or len(check_source) < 1: | |
self.locations.append({'name': trip_source}) | |
if check_destination is None or len(check_destination) < 1: | |
self.locations.append({'name': trip_destination}) | |
Utils.log_info("Loaded {0} locations for processing...".format(len(self.locations))) | |
def generate_output(self) -> None: | |
if len(self.routes) > 0: | |
for loc in self.source_data: | |
route: RouteInfo = self.get_route_object(loc["trip_code"]) | |
# noinspection PyTypeChecker | |
self.trips.append({ | |
"trip_code": loc["trip_code"], | |
"length": route["length"], | |
"time": Utils.format_time(route["time"]), | |
"source": loc["source"], | |
"source_coords": "{0}, {1}".format(route["source"]["lat"], route["source"]["lon"]), | |
"destination": loc["destination"], | |
"destination_coords": "{0}, {1}".format(route["destination"]["lat"], route["destination"]["lon"]), | |
}) | |
self.save_output_file(self.trips) | |
else: | |
Utils.log_info("Error: No route information resolved...") | |
# noinspection PyTypeChecker | |
def save_output_file(self, data: list[TripInfo]) -> None: | |
# Get the keys from the first dictionary as the header | |
keys = data[0].keys() | |
try: | |
# Open the file in write mode | |
with open(self.output_path, 'w', newline='') as csv_file: | |
writer = csv.DictWriter(csv_file, fieldnames=keys) | |
writer.writeheader() | |
writer.writerows(data) | |
Utils.log_info("Successfully outputted resolved trips...") | |
except IOError: | |
Utils.log_info("There was an error writing to {0}".format(self.input_path)) | |
sys.exit() | |
class DataProcessing: | |
""" | |
Coordinates the asynchronous processing of locations and routes. | |
The DataProcessing class leverages asynchronous functions to handle large datasets | |
efficiently, minimizing blocking I/O during geocode and route resolution. | |
Coordinates the processing of locations and route information. | |
Attributes: | |
data (DataHandler): An instance of DataHandler to manage data transformation. | |
service (CoordService): An instance of CoordService to provide coordinates and | |
routing information services. | |
location_pool (list[Location]): A pool of resolved locations used for processing routes. | |
service (CoordService): Service for resolving coordinates and routes. | |
Methods: | |
- bootstrap(data: DataHandler, service: CoordService): Initializes data processing by | |
resolving coordinates and retrieving route data. | |
- __geocode_coords(): Asynchronously processes and resolves geographic coordinates | |
for each location. | |
- __process_routes(): Asynchronously fetches route details based on resolved | |
locations and source input data. | |
""" | |
data: DataHandler | |
service: CoordService | |
location_pool: list[Location] | |
def __init__(self, data: DataHandler, service: CoordService) -> None: | |
self.data = data | |
self.service = service | |
@classmethod | |
async def bootstrap(cls, data: DataHandler, service: CoordService): | |
""" | |
Bootstrap the data processing with geocoding and routing services. | |
Args: | |
data (DataHandler): Data handler for source and processed data. | |
service (CoordService): Service for resolving coordinates and routes. | |
Returns: | |
DataProcessing: An initialized DataProcessing instance. | |
""" | |
self = cls(data, service) | |
self.location_pool = await self.__geocode_coords() | |
self.data.routes = await self.__process_routes() | |
return self | |
async def __geocode_coords(self) -> list[Location]: | |
global global_http_chunks | |
# Local coord resolver functions | |
async def resolve_location(name: str, delay: float = 0.15) -> Location: | |
await asyncio.sleep(delay) | |
response = self.service.get_coords(name) | |
Utils.log_info("Name: {0}, Lat: {1}, Lon: {2}".format(response["name"], response["lat"], response["lon"])) | |
return response | |
async def exec_chunked(location_chunk: list[Location] | tuple[dict[str, str | float], ...]) -> list[Location]: | |
global global_http_delay | |
chunk_tasks = [resolve_location(location_item["name"], global_http_delay) for location_item in | |
location_chunk] | |
result = await asyncio.gather(*chunk_tasks) | |
return result | |
try: | |
chunked_locations = list(batched(self.data.locations, global_http_chunks)) | |
tasks = [exec_chunked(loc_chunks) for loc_chunks in chunked_locations] | |
Utils.log_info("Started resolving {0} locations for coordinates...".format(len(self.data.locations))) | |
grouped_location: list[list[Location]] = await asyncio.gather(*tasks) | |
Utils.log_info("Completed resolving locations...") | |
resolved_locations: list[Location] = [item for sublist in grouped_location for item in sublist] | |
# update tagged data | |
for local_item in self.data.locations: | |
loc: Location = next((item for item in resolved_locations if item['name'] == local_item["name"]), None) | |
local_item.update(loc) | |
return resolved_locations | |
except Exception as error: | |
Utils.log_info('Error processing coordinates... {0}'.format(error)) | |
return [] | |
async def __process_routes(self) -> list[TripInfo]: | |
try: | |
if len(self.location_pool) > 0: | |
query_data: list = self.data.source_data | |
async def exec_delayed(input_trip_code: str, input_source: Coord, | |
input_destination: Coord) -> RouteInfo: | |
await asyncio.sleep(global_http_delay) | |
response: RouteInfo = self.service.get_route_attributes(input_trip_code, input_source, | |
input_destination) | |
distance = "{0}KM".format(response["length"]) | |
Utils.log_info("Route:{0}, Time: {1}, Distance: {2}".format( | |
input_trip_code, | |
Utils.format_time(float(response["time"])), | |
distance | |
)) | |
return response | |
tasks = [] | |
for item in query_data: | |
data_source: Location = self.data.get_location_object(item["source"]) | |
data_destination: Location = self.data.get_location_object(item["destination"]) | |
trip_code = item["trip_code"] | |
source: Coord = {"lat": data_source["lat"], "lon": data_source["lon"]} | |
destination: Coord = {"lat": data_destination["lat"], "lon": data_destination["lon"]} | |
tasks.append(exec_delayed(trip_code, source, destination)) | |
return await asyncio.gather(*tasks) | |
except Exception as error: | |
Utils.log_info('Error processing route information'.format(error)) | |
return [] | |
async def main(): | |
global global_input_file | |
global global_output_file | |
global global_routing_host | |
global global_nominatim_url | |
proc = await DataProcessing.bootstrap(DataHandler(global_input_file, global_output_file), | |
CoordService(global_nominatim_url, global_routing_host)) | |
proc.data.generate_output() | |
if __name__ == "__main__": | |
asyncio.run(main()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
customer | trip_code | trip_id | source | tonnage | destination | contract_type | trailer_type | remarks | fuel_type | sub_ftn | customer_name | billing_fuel_quantity | billing_mileage | backup_3 | backup_2 | backup_1 | created_by | created_on | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
2000015 | LUC_IKORO_ABAKALIKI_30_CO_FL_D_L_XXX | T1000002 | IKORODU | 30 | ABAKALIKI | FLOATING | CONTAINER | DIESEL | LOADING | LUCKY FIBRES (NIGERIA) LTD. | 690.000 | 2.100 | 1 | 1 | 1 | FF_TM_04 | 04-23-24 | ||
2000015 | LUC_IKORO_ABUJA_30_CO_FL_D_L_XXX | T1000003 | IKORODU | 30 | ABUJA | FLOATING | CONTAINER | DIESEL | LOADING | LUCKY FIBRES (NIGERIA) LTD. | 800.000 | 2.100 | 1 | 1 | 1 | FF_TM_04 | 04-23-24 | ||
2000015 | LUC_IKORO_ADO_EKITI_30_CO_FL_D_L_XXX | T1000004 | IKORODU | 30 | ADO_EKITI | FLOATING | CONTAINER | DIESEL | LOADING | LUCKY FIBRES (NIGERIA) LTD. | 331.000 | 2.100 | 1 | 1 | 1 | FF_TM_04 | 04-23-24 | ||
2000015 | LUC_IKORO_AKURE_30_CO_FL_D_L_XXX | T1000005 | IKORODU | 30 | AKURE | FLOATING | CONTAINER | DIESEL | LOADING | LUCKY FIBRES (NIGERIA) LTD. | 280.000 | 2.100 | 1 | 1 | 1 | FF_TM_04 | 04-23-24 | ||
2000015 | LUC_IKORO_ASABA_30_CO_FL_D_L_XXX | T1000006 | IKORODU | 30 | ASABA | FLOATING | CONTAINER | DIESEL | LOADING | LUCKY FIBRES (NIGERIA) LTD. | 480.000 | 2.100 | 1 | 1 | 1 | FF_TM_04 | 04-23-24 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@3scan0r Next will be swarming it with large amount of data to write all appropriate error handling.