Created
July 20, 2023 19:26
-
-
Save john-adeojo/fcd8a286ce2c3c97c22d4c8b33456cf0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This module provides the data transformation function | |
""" | |
import pandas as pd | |
import numpy as np | |
from utils import parse_duration | |
def transform_data(response_flights_data, response_airline_lookup_data, originLocationCode, destinationLocationCode): | |
""" | |
Processes raw flight data and airline lookup data, and returns structured journey data. | |
This function takes raw response data from a flight search API and an airline lookup API, and processes it | |
into structured dataframes. The function handles extraction of itinerary details, validation of airline codes, | |
and calculation of total journey duration. It also generates several additional data fields for | |
convenience, such as 'journey_id', 'travel_direction', and 'total_legs'. | |
Parameters: | |
response_flights_data (list): A list of dictionaries containing raw flight data from the flight search API. | |
response_airline_lookup_data (list): A list of dictionaries containing airline lookup data from the airline lookup API. | |
originLocationCode (str): The IATA code of the origin location. | |
destinationLocationCode (str): The IATA code of the destination location. | |
Returns: | |
df_flights (DataFrame): A DataFrame containing processed flight data, with each row representing a leg of a journey. | |
journey_pricing (DataFrame): A DataFrame containing pricing information for each journey. | |
flights (DataFrame): A DataFrame containing detailed information for each leg of each journey. | |
""" | |
# Load the data into a DataFrame | |
df = pd.DataFrame(response_flights_data) | |
df_airline_codes = pd.json_normalize(response_airline_lookup_data) | |
# Extract itineraries, validatingAirlineCodes, price (total and currency) and id into separate dataframes | |
df_itineraries = df[['id', 'itineraries']].explode('itineraries').reset_index(drop=True) | |
# In the itineraries column, each cell is a dictionary. So, we need to convert those dictionaries into separate columns. | |
df_itineraries = df_itineraries.join(pd.json_normalize(df_itineraries['itineraries'])).drop(columns='itineraries') | |
# At this point, 'segments' column is a list of dictionaries where each dictionary represents a leg of the journey. | |
# We want each leg to be a separate row in the dataframe. So, explode the 'segments' column. | |
df_itineraries = df_itineraries.explode('segments').reset_index(drop=True) | |
# Add a 'leg_id' column to identify each leg of the journey | |
df_itineraries['leg_id'] = df_itineraries.groupby('id').cumcount() + 1 | |
# Now, convert the dictionaries in the 'segments' column into separate columns | |
df_segments = pd.json_normalize(df_itineraries['segments']) | |
# To avoid overlapping columns, add a prefix to the column names of the new dataframe | |
df_segments.columns = ['flight_' + str(col) for col in df_segments.columns] | |
# Now join the original dataframe with the new one | |
df_itineraries = df_itineraries.join(df_segments).drop(columns='segments') | |
df_validatingAirlineCodes = df[['id', 'validatingAirlineCodes']] | |
# For the price column, we only need total and currency. So, extract only those into a new dataframe | |
df_price = df['price'].apply(pd.Series)[['total', 'currency']] | |
df_price['id'] = df['id'] | |
# Now join these dataframes on the 'id' column | |
df_flights = pd.merge(df_itineraries, df_validatingAirlineCodes, on='id') | |
df_flights = pd.merge(df_flights, df_price, on='id') | |
# Create a new column for the total number of legs per journey | |
df_flights['total_legs'] = df_flights.groupby('id')['leg_id'].transform('max') | |
df_flights = df_flights.merge(right=df_airline_codes, how='left', left_on="flight_operating.carrierCode", right_on="iataCode") | |
df_flights.rename(columns={"id":"journey_id", "commonName":"airline" }, inplace=True) | |
try: | |
df_flights.drop(columns=["flight_id", "validatingAirlineCodes", "businessName", "flight_operating.carrierCode", "flight_aircraft.code", "flight_stops"], inplace=True) | |
except Exception as e: | |
print(f"Flight stops non existent: {e}, will skip removal") | |
df_flights.drop(columns=["flight_id", "validatingAirlineCodes", "businessName", "flight_operating.carrierCode", "flight_aircraft.code"], inplace=True) | |
df_flights.columns = df_flights.columns.str.replace('.', '_') | |
df_flights['total'] = pd.to_numeric(df_flights['total'], errors='coerce') | |
# convert duration into numeric form | |
df_flights['flight_duration'] = np.round(df_flights['flight_duration'].apply(parse_duration),1) | |
df_flights['flight_departure_at'] = pd.to_datetime(df_flights['flight_departure_at']) | |
df_flights['flight_arrival_at'] = pd.to_datetime(df_flights['flight_arrival_at']) | |
outbound_origin = originLocationCode | |
outbound_destination = destinationLocationCode | |
inbound_origin = destinationLocationCode | |
inbound_destination = originLocationCode | |
# Create conditions | |
cond1 = (df_flights['flight_departure_iataCode'] == outbound_origin) | (df_flights['flight_arrival_iataCode'] == destinationLocationCode) | |
cond2 = (df_flights['flight_departure_iataCode'] == inbound_origin) | (df_flights['flight_arrival_iataCode'] == inbound_destination) | |
# Update 'Journey Start' and 'Journey End' based on conditions | |
df_flights.loc[cond1, 'Journey Start'] = originLocationCode | |
df_flights.loc[cond1, 'Journey End'] = destinationLocationCode | |
df_flights.loc[cond2, 'Journey Start'] = destinationLocationCode | |
df_flights.loc[cond2, 'Journey End'] = originLocationCode | |
# Update 'travel_direction' based on 'Journey Start' | |
df_flights.loc[df_flights['Journey Start'] == originLocationCode, 'travel_direction'] = 'Inbound' | |
df_flights.loc[df_flights['Journey Start'] == destinationLocationCode, 'travel_direction'] = 'Outbound' | |
df_flights.loc[df_flights['flight_arrival_iataCode'] == df_flights['Journey End'], 'flight_arrival_iataCode'] = 'N/A' | |
df_flights.loc[df_flights['flight_departure_iataCode'] == df_flights['Journey Start'], 'flight_departure_iataCode'] = 'N/A' | |
# calculate total duration | |
total_duration = df_flights.groupby(['journey_id', 'travel_direction'])['flight_duration'].sum().reset_index() | |
total_duration.rename(columns={'flight_duration': 'total_duration'}, inplace=True) | |
df_flights = pd.merge(df_flights, total_duration, on=['journey_id', 'travel_direction']) | |
df_flights.rename(columns={'flight_departure_iataCode': 'intermediate_journey_departure', | |
'flight_arrival_iataCode': 'intermediate_journey_arrival', 'total': 'journey_price'}, inplace=True) | |
journey_pricing = df_flights[['journey_id', 'Journey Start', 'Journey End', 'travel_direction', 'total_duration', 'journey_price', 'currency']].drop_duplicates() | |
flights = df_flights.drop(columns=['Journey Start', 'Journey End', 'total_duration', 'journey_price', 'duration', 'currency']) | |
return df_flights, journey_pricing, flights |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment