Skip to content

Instantly share code, notes, and snippets.

@mschrader15
Created January 7, 2021 18:12
Show Gist options
  • Save mschrader15/3cb88c22756ae01829c6abad82335d20 to your computer and use it in GitHub Desktop.
Save mschrader15/3cb88c22756ae01829c6abad82335d20 to your computer and use it in GitHub Desktop.
Sumo Emissions Output File Processing
from lxml import etree
import numpy as np
import pandas as pd
import time
import csv
def parse_detector_xml(file_path, save_path):
    """Parse a SUMO E1 (induction loop) detector XML output file into a CSV.

    Rows whose attribute count matches the expected E1 schema are extracted;
    parsed elements are freed as we go so arbitrarily large files fit in memory.

    NOTE: this function is shadowed later in the file by a class of the same
    name, which supersedes it.

    :param file_path: path to the SUMO detector XML output file
    :param save_path: path of the CSV file to write
    """
    fields = ['interval_begin', 'interval_end', 'interval_flow', 'interval_harmonicMeanSpeed', 'interval_id',
              'interval_length', 'interval_nVehContrib', 'interval_nVehEntered', 'interval_occupancy',
              'interval_meanSpeed']
    # SUMO writes the attributes without the "interval_" prefix.
    fields_simp = [col_name.split(sep="_")[-1] for col_name in fields]
    # BUG FIX: the E1 XML attribute is "speed", not "meanSpeed" (the class
    # version of this parser applies the same correction); without it the
    # attribute lookup below raises KeyError on real detector output.
    fields_simp[-1] = 'speed'
    data = []
    context = etree.iterparse(file_path)  # default: "end" events only
    for _, elem in context:
        if len(elem.attrib) == len(fields):
            data.append([elem.attrib[key] for key in fields_simp])
        # Free the element and any already-processed siblings to keep the
        # memory footprint flat while streaming.
        elem.clear()
        while elem.getprevious() is not None:
            try:
                del elem.getparent()[0]
            except TypeError:
                break
    pd.DataFrame(data=np.array(data), columns=fields).to_csv(save_path, index=False)
def parse_tl_xml(file_path, save_path):
    """Parse a SUMO traffic-light (tlLogic) XML output file into a CSV.

    Each <phase> becomes one CSV row carrying the id/programID/type of the
    most recently seen <tlLogic> element plus the phase duration and state.

    :param file_path: path to the SUMO tlLogic XML output file
    :param save_path: path of the CSV file to write
    """
    fields = ['tlLogic_id', 'tlLogic_programID', 'tlLogic_type', 'phase_duration', 'phase_state']
    rows = []
    context = etree.iterparse(file_path, events=("start", "end"))
    for event, node in context:
        if node.tag == 'tlLogic':
            if len(node.attrib) == 3:
                # Remember the program metadata; it is prepended to each phase row.
                meta_data = [node.attrib['id'], node.attrib['programID'], node.attrib['type']]
        elif node.tag == 'phase':
            if len(node.attrib) >= 2:
                rows.append(meta_data + [node.attrib['duration'], node.attrib['state']])
        # Clearing on the "start" event empties the attributes, so the matching
        # "end" event fails the length checks above and no row is duplicated.
        node.clear()
    pd.DataFrame(data=np.array(rows), columns=fields).to_csv(save_path, index=False)
class parse_emissions_xml:
    """Stream-parse a SUMO emissions XML output file and write it to CSV.

    The class/attribute names are part of the public interface (this class is
    imported by the analysis section of this file), so they are kept as-is.
    """

    # CSV header: one timestep column plus one column per vehicle attribute.
    fields = ['timestep_time',
              'vehicle_CO',
              'vehicle_CO2',
              'vehicle_HC',
              'vehicle_NOx',
              'vehicle_PMx',
              'vehicle_angle',
              'vehicle_eclass',
              'vehicle_electricity',
              'vehicle_fuel',
              'vehicle_id',
              'vehicle_lane',
              'vehicle_noise',
              'vehicle_pos',
              'vehicle_route',
              'vehicle_speed',
              'vehicle_type',
              'vehicle_waiting',
              'vehicle_x',
              'vehicle_y']
    # Raw XML attribute names (the suffix after the underscore).
    col_name_split = [col_name.split(sep="_")[-1] for col_name in fields]
    # Current <timestep> time, prepended to every vehicle row.
    meta_data = [0]

    def parse_and_write(self, elem):
        """Record the current timestep time, or emit one CSV row per <vehicle>."""
        if (elem.tag == 'timestep') and (len(elem.attrib) > 0):
            self.meta_data = [elem.attrib['time']]
            return
        if (elem.tag == 'vehicle') and (len(elem.attrib) >= 19):
            row = [elem.attrib[name] for name in self.col_name_split[1:]]
            self._csv_writer.writerow(self.meta_data + row)

    def fast_iter(self, context, func):
        """Apply *func* to every parsed element while aggressively freeing nodes."""
        for _, node in context:
            func(node)
            node.clear()
            # Drop already-processed preceding siblings to keep memory flat.
            while node.getprevious() is not None:
                try:
                    del node.getparent()[0]
                except TypeError:
                    break
        del context

    def main(self, file_path, save_path):
        """Convert the emissions XML at *file_path* into a CSV at *save_path*."""
        context = etree.iterparse(file_path, events=("start", "end"))
        with open(save_path, mode='w+') as file:
            self._csv_writer = csv.writer(file, delimiter=',')
            self._csv_writer.writerow(self.fields)
            self.fast_iter(context, func=self.parse_and_write)
class parse_detector_xml:
    """Stream-parse SUMO detector (E1 induction loop / E2 lane-area) XML
    output into a CSV file.

    NOTE: this class shadows the module-level function of the same name
    defined earlier in this file.
    """

    # E1 columns as written to the CSV header.
    fields = ['interval_begin', 'interval_end', 'interval_flow', 'interval_harmonicMeanSpeed', 'interval_id',
              'interval_length', 'interval_nVehContrib', 'interval_nVehEntered', 'interval_occupancy',
              'interval_meanSpeed']
    # E2 columns; the commented entries exist in SUMO output but are not exported.
    e2_fields = ['interval_begin',
                 'interval_end',
                 'interval_id',
                 'interval_sampledSeconds',
                 'interval_nVehEntered',
                 'interval_nVehLeft',
                 'interval_nVehSeen',
                 'interval_meanSpeed',
                 # 'interval_meanTimeLoss',
                 # 'interval_meanOccupancy',
                 # 'interval_maxOccupancy',
                 # 'interval_meanMaxJamLengthInVehicles',
                 # 'interval_meanMaxJamLengthInMeters',
                 # 'interval_maxJamLengthInVehicles',
                 # 'interval_maxJamLengthInMeters',
                 # 'interval_jamLengthInVehiclesSum',
                 # 'interval_jamLengthInMetersSum',
                 # 'interval_meanHaltingDuration',
                 # 'interval_maxHaltingDuration',
                 # 'interval_haltingDurationSum',
                 # 'interval_meanIntervalHaltingDuration',
                 # 'interval_maxIntervalHaltingDuration',
                 # 'interval_intervalHaltingDurationSum',
                 # 'interval_startedHalts',
                 # 'interval_meanVehicleNumber',
                 # 'interval_maxVehicleNumber'
                 ]
    # Raw XML attribute names (the suffix after the underscore).
    fields_simp = [col_name.split(sep="_")[-1] for col_name in fields]
    fields_simp[-1] = 'speed'  # fixes the meanSpeed vs speed difference
    e2_fields_simp = [col_name.split(sep="_")[-1] for col_name in e2_fields]

    def parse_and_write(self, elem, fields):
        """Write one CSV row from *elem*; return 0 if any attribute is missing."""
        try:
            row = [elem.attrib[key] for key in fields]
        except KeyError:
            return 0
        self._csv_writer.writerow(row)

    def fast_iter(self, context, func, fields):
        """Apply *func* to every parsed element while aggressively freeing nodes."""
        for _, node in context:
            func(node, fields)
            node.clear()
            # Drop already-processed preceding siblings to keep memory flat.
            while node.getprevious() is not None:
                try:
                    del node.getparent()[0]
                except TypeError:
                    break
        del context

    def main(self, file_path, save_path, detect_type):
        """Convert a detector XML file to CSV; *detect_type* is 'e1' or 'e2'."""
        is_e1 = detect_type == 'e1'
        header_fields = self.fields if is_e1 else self.e2_fields
        attr_fields = self.fields_simp if is_e1 else self.e2_fields_simp
        context = etree.iterparse(file_path, events=("start", "end"))
        with open(save_path, mode='w+') as file:
            self._csv_writer = csv.writer(file, delimiter=',')
            self._csv_writer.writerow(header_fields)
            self.fast_iter(context, func=self.parse_and_write, fields=attr_fields)
import os
import pandas as pd
import random
import math
from datetime import datetime, timedelta
import sumolib
from time import time
from research_paper.global_config_parameters import CONFIG
from sumo_function_library.unit_conversion import EmissionsUnits
from sumo_function_library.function_timing import timing
from sumo_function_library.multi_processor_funcs import mp_funcs
from sumo_function_library.functions.post_processing.xml_parse import parse_emissions_xml
import numpy as np
import pyproj
# Read in the net file:
print("Reading in the net file")
NET = sumolib.net.readNet(CONFIG.net_file)
# Projection parameters come from the net file itself; PROJECTOR converts
# SUMO x/y (after removing the offset) back to lon/lat.
PROJECTOR = pyproj.Proj(projparams=NET._location['projParameter'])
# Offset SUMO applied to shift the network into positive coordinates.
LOCATION_OFFSET = NET.getLocationOffset()
# Endpoints (net x/y, meters) of the shortest route through the network.
# NOTE(review): hard-coded for this particular net file — confirm when reusing.
shortest_trip_coords = [[-287.69, -175.55], [97.73, -227.72]]
# Straight-line length of that trip in miles; vehicles that travelled less are
# treated as incomplete trips and dropped from the summary statistics.
SHORT_TRIP_LENGTH = math.sqrt(
    (shortest_trip_coords[0][0] - shortest_trip_coords[1][0]) ** 2
    + (shortest_trip_coords[0][1] - shortest_trip_coords[1][1]) ** 2) * EmissionsUnits.meters_to_miles
# Column MultiIndex for the summary DataFrame: (metric, statistic) pairs.
col_level_one = ['vehicle_CO2', 'vehicle_CO', 'vehicle_HC', 'vehicle_NOx', 'vehicle_PMx', 'vehicle_electricity']
col_level_two = ['total', 'average_per_step', 'per_100km']
col_tuple = [(level_one, level_two) for level_one in ['distance', 'norm_time'] for level_two in ['total']] + \
            [(level_one, level_two) for level_one in ['vehicle_fuel'] for level_two in col_level_two + ['mpg']] + \
            [(level_one, level_two) for level_one in col_level_one for level_two in col_level_two]
COLUMNS = pd.MultiIndex.from_tuples(col_tuple)
# Unique level-0 labels, iterated when filling each per-vehicle summary row.
COLUMNS_ITER_LIST = list(set([level[0] for level in col_tuple]))
SIM_TIME_STEP = None  # NOTE(review): never reassigned or read in this file
@timing
def _get_sample_df(percent, unique, df):
    """Randomly sample *percent* of the unique vehicle ids.

    :param percent: fraction (0-1) of ids to keep
    :param unique: array of unique vehicle ids
    :param df: emissions DataFrame with a 'vehicle_id' column
    :return: (sampled ids, copy of the matching rows of *df*)
    """
    n_samples = int(percent * len(unique))
    chosen_positions = random.sample(range(0, len(unique)), n_samples)
    chosen_ids = unique[chosen_positions]
    sampled_rows = df.loc[df['vehicle_id'].isin(chosen_ids)].copy()
    return chosen_ids, sampled_rows
def _do_unit_conversion(df, sim_time_step):
    """Convert the raw SUMO emissions columns of *df* using EmissionsUnits.

    Columns flagged with 'timestep_mult' hold per-second rates and are also
    scaled by the simulation step length.

    :param df: emissions DataFrame (modified in place and returned)
    :param sim_time_step: emissions output step length in seconds
    :return: the converted DataFrame
    """
    # Iterate items() instead of keys() + repeated dict lookups.
    for col_name, spec in EmissionsUnits.unit_conversion.items():
        conversion = spec['conversion']
        if spec['timestep_mult']:
            # Per-second quantities must be multiplied by the step length.
            conversion = conversion * sim_time_step
        df.loc[:, col_name] = df[col_name] * conversion
    # replace the 0 fuel consumption during decel:
    # floor near-zero fuel so per-distance ratios downstream stay finite.
    mask = df['vehicle_fuel'] <= 0.000076
    df.loc[mask, 'vehicle_fuel'] = 0.000077
    return df
@timing
def _calc_distance_and_time(vehicle_ids, df):
    """Per-vehicle cumulative distance (miles) and time in network (seconds),
    computed row by row.

    NOTE: superseded by _calc_distance_and_time_rolling(); kept for reference.

    :param vehicle_ids: iterable of vehicle ids to process
    :param df: emissions DataFrame (gains 'distance' and 'norm_time' columns)
    :return: the augmented DataFrame
    """
    df['distance'] = None
    df['norm_time'] = None
    percent = 0
    for veh_num, vehicle_id in enumerate(vehicle_ids):
        # BUG FIX: the progress threshold was never advanced (so it printed on
        # nearly every iteration past 5%) and the inner loop reused — and
        # clobbered — the outer loop index `i`.
        if (veh_num / len(vehicle_ids) * 100) > (percent + 5):
            percent = veh_num / len(vehicle_ids) * 100
            print(f"Distance calc is {percent}% done")
        vehicle_df = df.loc[df['vehicle_id'] == vehicle_id].copy()
        distance = []
        norm_time = []
        for step in range(len(vehicle_df.index)):
            if step > 0:
                x_dist = vehicle_df['vehicle_x'].iloc[step] - vehicle_df['vehicle_x'].iloc[step - 1]
                y_dist = vehicle_df['vehicle_y'].iloc[step] - vehicle_df['vehicle_y'].iloc[step - 1]
                # Cumulative straight-line distance in miles.
                distance.append(math.sqrt(x_dist ** 2 + y_dist ** 2) * EmissionsUnits.meters_to_miles
                                + distance[step - 1])
                norm_time.append(vehicle_df['timestep_time'].iloc[step] - initial_time)
            else:
                initial_time = vehicle_df['timestep_time'].iloc[step]
                distance.append(0)
                norm_time.append(0)
        vehicle_df.loc[:, 'distance'] = distance
        vehicle_df.loc[:, 'norm_time'] = norm_time
        df.loc[df['vehicle_id'] == vehicle_id, ['distance', 'norm_time']] = \
            vehicle_df[['distance', 'norm_time']]
    return df
def _calc_distance_and_time_rolling(vehicle_ids, df, sim_time_step):
    """Vectorized per-vehicle distance/time/acceleration calculation.

    For each id in *vehicle_ids* computes, per emissions row:
      - distance_inst: miles travelled during that step (from x/y diffs)
      - distance:      cumulative miles travelled so far
      - norm_time:     seconds since the vehicle entered the network
      - accel:         speed delta divided by *sim_time_step*

    :return: a new DataFrame containing only the selected vehicles' rows
    """
    df['distance'] = None
    df['distance_inst'] = None
    df['norm_time'] = None
    df['accel'] = None
    new_df = pd.DataFrame(columns=df.columns)
    meters_2_miles = EmissionsUnits.meters_to_miles
    percent = 0
    for i, vehicle_id in enumerate(vehicle_ids):
        if (i / len(vehicle_ids) * 100) > (percent + 5):
            percent = i / len(vehicle_ids) * 100
            print(f"Distance calc is {percent}% done")
        mask = df['vehicle_id'].values == vehicle_id
        vehicle_df = df[mask].copy()
        x_dist = vehicle_df['vehicle_x'].diff().values
        y_dist = vehicle_df['vehicle_y'].diff().values
        x_dist[0] = 0  # diff() leaves NaN in the first slot
        y_dist[0] = 0
        vehicle_df['distance_inst'] = (x_dist ** 2 + y_dist ** 2) ** (1 / 2) * meters_2_miles
        vehicle_df.loc[:, 'distance'] = vehicle_df['distance_inst'].cumsum()
        initial_time = vehicle_df['timestep_time'].iloc[0]
        # Aligned subtraction leaves NaN in the first row; zeroed below.
        vehicle_df.loc[:, 'norm_time'] = vehicle_df['timestep_time'].iloc[1:].subtract(initial_time)
        vehicle_df.loc[:, 'accel'] = vehicle_df['vehicle_speed'].diff().divide(sim_time_step)
        # BUG FIX: the first row used to be set through chained assignment
        # (e.g. vehicle_df['norm_time'].iloc[0] = 0) and a label-based
        # accel[0] = 0, both of which are unreliable (SettingWithCopy / wrong
        # label); write through the frame by its first index label instead.
        first_label = vehicle_df.index[0]
        vehicle_df.loc[first_label, ['distance_inst', 'distance', 'norm_time', 'accel']] = 0
        new_df = new_df.append(vehicle_df)
    return new_df
def mp_summary_statistics(ids, df, cutoff_start, cutoff_end):
    """Build a per-vehicle summary frame (one row per id in *ids*).

    Designed to be run in a worker process on a chunk of vehicle ids.
    Vehicles whose trip is shorter than SHORT_TRIP_LENGTH, or that fall
    outside the [cutoff_start, cutoff_end] analysis window, are dropped.

    :param ids: chunk of vehicle ids to summarize
    :param df: emissions DataFrame with 'distance'/'norm_time' already filled
    :param cutoff_start: window start in simulation seconds (falsy disables)
    :param cutoff_end: window end in simulation seconds
    :return: DataFrame indexed by surviving vehicle ids with COLUMNS columns
    """
    sum_df = pd.DataFrame(index=ids, columns=COLUMNS)
    percent = 0
    for i, vehicle_id in enumerate(ids):
        drop = False
        # Progress reporting in ~5% increments.
        if (i / len(ids) * 100) > (percent + 5):
            percent = i / len(ids) * 100
            print(f"Summary calc is {percent}% done")
        # Skip the aggregate rows ('Total_...') if they ever appear in ids.
        if 'Total' not in vehicle_id:
            mask = df['vehicle_id'].values == vehicle_id
            local_df = df[mask]
            total_distance = local_df['distance'].iloc[-1]
            # in miles
            if total_distance < SHORT_TRIP_LENGTH:
                # Shorter than the shortest possible route: incomplete trip.
                sum_df = sum_df.drop(vehicle_id)
                drop = True
            elif cutoff_start:
                # Drop vehicles that entered before the analysis window opened
                # or were still in the network when it closed.
                if (local_df['timestep_time'].iloc[0] <= cutoff_start) or \
                        (local_df['timestep_time'].iloc[-1] >= cutoff_end):
                    sum_df = sum_df.drop(vehicle_id)
                    drop = True
            if not drop:
                for col in COLUMNS_ITER_LIST:
                    average = None
                    total = None
                    per_100_km = None
                    mile_per = None
                    if (col != 'vehicle_speed') and (col != 'distance') and (
                            col != 'norm_time'):  # lazy logic statement
                        total = local_df[col].sum()
                        average = local_df[col].mean()
                        # NOTE(review): gal_to_L is applied to *every* emissions
                        # column here, while _calc_total_summary_stats applies
                        # it only to vehicle_fuel — confirm which is intended.
                        per_100_km = total * EmissionsUnits.gal_to_L / (
                                total_distance * EmissionsUnits.miles_to_km) * 100
                        if col == 'vehicle_fuel':
                            mile_per = total_distance / total  # miles per gallon
                    elif col == 'vehicle_speed':
                        average = local_df[col].mean()
                    elif col == 'distance':
                        total = total_distance
                    elif col == 'norm_time':
                        total = local_df['norm_time'].iloc[-1]
                    # Write only the stats computed for this metric; relies on
                    # the sub-column order in COLUMNS matching
                    # [total, average_per_step, per_100km, mpg].
                    iter_list = [total, average, per_100_km, mile_per]
                    write_chunk = [val for val in iter_list if val is not None]
                    sum_df.loc[vehicle_id, col] = write_chunk
    return sum_df
@timing
def _calc_summary_statistics(vehicle_ids, df, cutoff_start, cutoff_end):
    """Fan the per-vehicle summary out over worker processes.

    Prepends two (still empty) aggregate rows that
    _calc_total_summary_stats() fills in later.

    :return: summary DataFrame: ['Total_sum', 'Total_average'] + one row per
             surviving vehicle id
    """
    sum_df = pd.DataFrame(index=['Total_sum', 'Total_average'], columns=COLUMNS)
    id_chunks = np.array_split(vehicle_ids, mp_funcs.CPU_COUNT - 1)
    per_vehicle_df = mp_funcs.multi_process_divided_indexer_sever(
        id_chunks, mp_summary_statistics,
        shared_df={'Use': True, 'df_name': 'df'}, df=df,
        cutoff_start=cutoff_start, cutoff_end=cutoff_end)
    return sum_df.append(per_vehicle_df)
@timing
def _calc_total_summary_stats(df):
    """Fill the 'Total_sum'/'Total_average' rows of the summary frame and
    append a 'Total_std' row.

    *df* is the MultiIndex-column frame from _calc_summary_statistics; its
    first two rows are the (empty) aggregate rows, so per-vehicle slices use
    .iloc[2:].

    :return: the augmented DataFrame
    """
    for column in df:
        # column is a (metric, statistic) tuple from the column MultiIndex.
        if 'total' in column[1]:
            df.loc['Total_sum', column] = df[column].iloc[2:].sum()
            df.loc['Total_average', column] = df[column].iloc[2:].mean()
        elif 'average_per_step' in column[1]:
            # Summing per-step averages is meaningless — leave it blank.
            df.loc['Total_sum', column] = ''
            df.loc['Total_average', column] = df[column].iloc[2:].mean()
        elif '100km' in column[1]:
            # Fuel totals are in gallons and converted to liters here.
            # NOTE(review): mp_summary_statistics applies gal_to_L to every
            # column — confirm which behavior is intended.
            conversion_factor = EmissionsUnits.gal_to_L if column[0] == 'vehicle_fuel' else 1
            df.loc['Total_average', column] = df.loc[
                                                  'Total_sum', (column[0], 'total')] * conversion_factor \
                                              / (df.loc['Total_sum', ('distance', 'total')]
                                                 * EmissionsUnits.miles_to_km) * 100
            df.loc['Total_sum', column] = ''
        elif 'mpg' in column[1]:
            df.loc['Total_sum', column] = ''
            df.loc['Total_average', column] = df.loc['Total_sum', ('distance', 'total')] / \
                                              df.loc['Total_sum', (column[0], 'total')]
    # Std dev across vehicles only (the first two rows are aggregates).
    df.loc['Total_std'] = df.iloc[2:].std()
    return df
def convertXY2LonLat(x, y):
    """Convert SUMO network x/y coordinates (meters) to (lon, lat).

    Removes the net's location offset, then applies the inverse projection.
    """
    shifted_x = x - LOCATION_OFFSET[0]
    shifted_y = y - LOCATION_OFFSET[1]
    return PROJECTOR(shifted_x, shifted_y, inverse=True)
def geo_convert(df):
    """Attach 'vehicle_x_geo'/'vehicle_y_geo' (lon/lat) to a row or frame.

    Used as the worker function for the row-wise multiprocessing apply.
    """
    lon_lat = convertXY2LonLat(df['vehicle_x'], df['vehicle_y'])
    df[['vehicle_x_geo', 'vehicle_y_geo']] = lon_lat
    return df
@timing
def _convert_xy_to_lat_lon(df):
    """Add lon/lat columns to *df* via a multiprocessed row-wise apply."""
    for new_col in ('vehicle_x_geo', 'vehicle_y_geo'):
        df[new_col] = None
    return mp_funcs.apply_by_multiprocessing(df, geo_convert, axis=1)
@timing
def _calculate_dist_and_time_mp(unique_ids, df, sim_time_step):
    """Run _calc_distance_and_time_rolling across worker processes.

    Splits *unique_ids* into one chunk per worker and dispatches via mp_funcs.
    """
    chunks = np.array_split(unique_ids, mp_funcs.CPU_COUNT - 1)
    return mp_funcs.multi_process_divided_indexer_sever(
        chunks, _calc_distance_and_time_rolling,
        shared_df={'Use': True, 'df_name': 'df'},
        df=df, sim_time_step=sim_time_step)
def main(config, emissions_file_xml, emissions_file_csv, analysis_output_path, cut_off_start, cut_off_end):
    """End-to-end post-processing of a SUMO emissions output file.

    Converts the XML to CSV (if not already done), applies unit conversions,
    computes per-vehicle distance/time, builds summary statistics, converts
    coordinates to lon/lat, and writes 'data_summary.csv' and 'data.csv' to
    *analysis_output_path*.

    :param config: project config with emissions_output_step (seconds)
    :param cut_off_start: analysis window start, simulation seconds
    :param cut_off_end: analysis window end, simulation seconds
    """
    if not os.path.exists(emissions_file_csv):
        print("Converting the XML file to .csv")
        parse_emissions_xml().main(emissions_file_xml, save_path=emissions_file_csv)
    print("Read in the emissions csv file")
    try:
        emissions_df = pd.read_csv(emissions_file_csv)
        print("Using Pandas DataFrame")
    except MemoryError:
        print("Memory Error. Reading in as Dask DF")
        # BUG FIX: `dd` was referenced without ever being imported anywhere in
        # this file, so this fallback raised NameError instead of falling back
        # to dask. Import lazily so dask is only required on this path.
        import dask.dataframe as dd
        emissions_df = dd.read_csv(emissions_file_csv)
    unique_ids = emissions_df['vehicle_id'].unique()
    # Convert the units of the DataFrame
    print("Doing the unit conversion")
    emissions_df = _do_unit_conversion(emissions_df, float(config.emissions_output_step))
    ts = time()
    # Calculate each vehicle's distance travelled and time in the network:
    print("Calculating distance and time")
    emissions_df, _ = _calculate_dist_and_time_mp(unique_ids, emissions_df, float(config.emissions_output_step))
    # Calculate the summary statistics
    print("Calculating the summary statistics")
    summary_df, _ = _calc_summary_statistics(unique_ids, emissions_df, cutoff_start=cut_off_start,
                                             cutoff_end=cut_off_end)
    # Get the totals of the summary statistics
    print("Calculating the total summary statistics")
    summary_df, _ = _calc_total_summary_stats(summary_df)
    # Calculate the geo locations of vehicles
    print("Calculating the lat and lon")
    emissions_df, _ = _convert_xy_to_lat_lon(emissions_df)
    tf = time()
    print("The loop took {} s in total".format(tf - ts))
    # Write the results; assumes analysis_output_path already exists.
    summary_df.to_csv(os.path.join(analysis_output_path, 'data_summary.csv'))
    emissions_df.to_csv(os.path.join(analysis_output_path, 'data.csv'))
if __name__ == "__main__":
    # Folder (under SIM_OUTPUT_DIR) holding this simulation run's output.
    OUTPUT_FOLDER = '9-2-2020'
    CSV_PATH = os.environ.get('SQL_DATA_PATH')  # NOTE(review): unused below
    OUTPUT_PATH = os.path.join(os.environ.get('SIM_OUTPUT_DIR'), OUTPUT_FOLDER)
    # Date of the simulated day (also the per-day subfolder name).
    file_name = '2020-02-13'
    # The simulation starts at 23:00 the day before and runs past midnight of
    # the following day.
    LOWER_TIME_BOUND = "23:00:00.000"
    UPPER_TIME_BOUND = "00:59:59.999"
    SECONDS = 26 * 3600  # NOTE(review): unused below
    DAYS_EXTENDED = 1
    date_low = (datetime.strptime(file_name, "%Y-%m-%d") - timedelta(days=DAYS_EXTENDED)).strftime("%m/%d/%Y")
    date_high = (datetime.strptime(file_name, "%Y-%m-%d") + timedelta(days=DAYS_EXTENDED)).strftime("%m/%d/%Y")
    # Analysis window: the full simulated calendar day.
    analysis_time_low = datetime.strptime(file_name, "%Y-%m-%d").strftime("%m/%d/%Y") + " 00:00:00"
    analysis_time_high = datetime.strptime(file_name, "%Y-%m-%d").strftime("%m/%d/%Y") + " 23:59:59"
    time_low = " ".join([date_low, LOWER_TIME_BOUND])
    time_high = " ".join([date_high, UPPER_TIME_BOUND])
    # Simulation start/end as datetimes.
    time_low = datetime.strptime(time_low, "%m/%d/%Y %H:%M:%S.%f")
    time_high = datetime.strptime(time_high, "%m/%d/%Y %H:%M:%S.%f")  # NOTE(review): unused below
    analysis_time_low = datetime.strptime(analysis_time_low, "%m/%d/%Y %H:%M:%S")
    analysis_time_high = datetime.strptime(analysis_time_high, "%m/%d/%Y %H:%M:%S")
    # Seconds from simulation start to the opening/closing of the analysis window.
    CUT_OFF_START = (analysis_time_low - time_low).total_seconds()
    CUT_OFF_END = (analysis_time_high - time_low).total_seconds()
    EMISSIONS_FILE_XML = os.path.join(os.path.join(OUTPUT_PATH, file_name), '_OUTPUT_emissions.xml')
    EMISSIONS_FILE_CSV = os.path.join(os.path.join(OUTPUT_PATH, file_name), '_OUTPUT_emissions.csv')
    ANALYSIS_OUTPUT_PATH = os.path.join(OUTPUT_PATH, file_name)
    main(emissions_file_xml=EMISSIONS_FILE_XML, emissions_file_csv=EMISSIONS_FILE_CSV,
         analysis_output_path=ANALYSIS_OUTPUT_PATH, cut_off_start=CUT_OFF_START, cut_off_end=CUT_OFF_END,
         config=CONFIG)
@mschrader15
Copy link
Author

SUMO ships a built-in Python script for parsing XML output, but it is slow, so I wrote my own.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment