Sumo Emissions Output File Processing
from lxml import etree
import numpy as np
import pandas as pd
import csv
def parse_detector_xml(file_path, save_path):
    """Stream-parse an E1 detector output file and write it to csv.

    Note: this function is shadowed by the parse_detector_xml class defined
    below when the module is imported; it is kept here for reference.
    """
    fields = ['interval_begin', 'interval_end', 'interval_flow', 'interval_harmonicMeanSpeed', 'interval_id',
              'interval_length', 'interval_nVehContrib', 'interval_nVehEntered', 'interval_occupancy',
              'interval_meanSpeed']
    fields_simp = [col_name.split(sep="_")[-1] for col_name in fields]
    fields_simp[-1] = 'speed'  # E1 output names the attribute 'speed', not 'meanSpeed'
    data = []
    context = etree.iterparse(file_path)  # default: 'end' events only
    for _, element in context:
        if len(element.attrib) == len(fields):
            data.append([element.attrib[key] for key in fields_simp])
        # free memory: clear the element and detach already-processed siblings
        element.clear()
        while element.getprevious() is not None:
            try:
                del element.getparent()[0]
            except TypeError:
                break
    pd.DataFrame(data=np.array(data), columns=fields).to_csv(save_path, index=False)
def parse_tl_xml(file_path, save_path):
    """Stream-parse a traffic-light switch-states output file and write it to csv."""
    fields = ['tlLogic_id', 'tlLogic_programID', 'tlLogic_type', 'phase_duration', 'phase_state']
    data = []
    meta_data = []
    context = etree.iterparse(file_path, events=("start", "end"))
    for event, element in context:
        # capture the tlLogic attributes on 'start' so they are available to the
        # <phase> children that follow
        if event == 'start' and element.tag == 'tlLogic' and len(element.attrib) == 3:
            meta_data = [element.attrib['id'], element.attrib['programID'], element.attrib['type']]
        elif event == 'end':
            if element.tag == 'phase' and len(element.attrib) >= 2:
                data.append(meta_data + [element.attrib['duration'], element.attrib['state']])
            element.clear()
    pd.DataFrame(data=np.array(data), columns=fields).to_csv(save_path, index=False)
class parse_emissions_xml:
    """Stream-parse a SUMO emissions output file directly to csv.

    Rows are written as they are parsed, so the (potentially very large) XML
    tree never has to be held in memory.
    """

    fields = ['timestep_time',
              'vehicle_CO',
              'vehicle_CO2',
              'vehicle_HC',
              'vehicle_NOx',
              'vehicle_PMx',
              'vehicle_angle',
              'vehicle_eclass',
              'vehicle_electricity',
              'vehicle_fuel',
              'vehicle_id',
              'vehicle_lane',
              'vehicle_noise',
              'vehicle_pos',
              'vehicle_route',
              'vehicle_speed',
              'vehicle_type',
              'vehicle_waiting',
              'vehicle_x',
              'vehicle_y']
    col_name_split = [col_name.split(sep="_")[-1] for col_name in fields]
    meta_data = [0]

    def parse_and_write(self, event, elem):
        # grab the current simulation time when a <timestep> opens ...
        if event == 'start' and elem.tag == 'timestep' and len(elem.attrib) > 0:
            self.meta_data = [elem.attrib['time']]
        # ... and write one csv row per fully parsed <vehicle> element
        elif event == 'end' and elem.tag == 'vehicle' and len(elem.attrib) >= 19:
            self._csv_writer.writerow(self.meta_data + [elem.attrib[col_name] for col_name in
                                                        self.col_name_split[1:]])

    def fast_iter(self, context, func):
        for event, elem in context:
            func(event, elem)
            if event == 'end':
                # free memory: clear the element and detach already-processed siblings
                elem.clear()
                while elem.getprevious() is not None:
                    try:
                        del elem.getparent()[0]
                    except TypeError:
                        break
        del context

    def main(self, file_path, save_path):
        context = etree.iterparse(file_path, events=("start", "end"))
        with open(save_path, mode='w+', newline='') as file:
            self._csv_writer = csv.writer(file, delimiter=',')
            self._csv_writer.writerow(self.fields)
            self.fast_iter(context, func=self.parse_and_write)
class parse_detector_xml:
    """Stream-parse E1 (induction loop) or E2 (lane area) detector output to csv."""

    fields = ['interval_begin', 'interval_end', 'interval_flow', 'interval_harmonicMeanSpeed', 'interval_id',
              'interval_length', 'interval_nVehContrib', 'interval_nVehEntered', 'interval_occupancy',
              'interval_meanSpeed']
    e2_fields = ['interval_begin',
                 'interval_end',
                 'interval_id',
                 'interval_sampledSeconds',
                 'interval_nVehEntered',
                 'interval_nVehLeft',
                 'interval_nVehSeen',
                 'interval_meanSpeed',
                 # 'interval_meanTimeLoss',
                 # 'interval_meanOccupancy',
                 # 'interval_maxOccupancy',
                 # 'interval_meanMaxJamLengthInVehicles',
                 # 'interval_meanMaxJamLengthInMeters',
                 # 'interval_maxJamLengthInVehicles',
                 # 'interval_maxJamLengthInMeters',
                 # 'interval_jamLengthInVehiclesSum',
                 # 'interval_jamLengthInMetersSum',
                 # 'interval_meanHaltingDuration',
                 # 'interval_maxHaltingDuration',
                 # 'interval_haltingDurationSum',
                 # 'interval_meanIntervalHaltingDuration',
                 # 'interval_maxIntervalHaltingDuration',
                 # 'interval_intervalHaltingDurationSum',
                 # 'interval_startedHalts',
                 # 'interval_meanVehicleNumber',
                 # 'interval_maxVehicleNumber'
                 ]
    fields_simp = [col_name.split(sep="_")[-1] for col_name in fields]
    fields_simp[-1] = 'speed'  # fixes the meanSpeed vs speed difference in E1 output
    e2_fields_simp = [col_name.split(sep="_")[-1] for col_name in e2_fields]

    def parse_and_write(self, elem, fields):
        try:
            self._csv_writer.writerow([elem.attrib[key] for key in fields])
        except KeyError:
            # elements without the expected attributes (e.g. the root) are skipped
            pass

    def fast_iter(self, context, func, fields):
        for event, elem in context:
            func(elem, fields)
            # free memory: clear the element and detach already-processed siblings
            elem.clear()
            while elem.getprevious() is not None:
                try:
                    del elem.getparent()[0]
                except TypeError:
                    break
        del context

    def main(self, file_path, save_path, detect_type):
        header_fields = self.fields if detect_type == 'e1' else self.e2_fields
        fields = self.fields_simp if detect_type == 'e1' else self.e2_fields_simp
        # 'end' events only: <interval> elements are flat, and one event per
        # element avoids visiting (and writing) each row twice
        context = etree.iterparse(file_path, events=("end",))
        with open(save_path, mode='w+', newline='') as file:
            self._csv_writer = csv.writer(file, delimiter=',')
            self._csv_writer.writerow(header_fields)
            self.fast_iter(context, func=self.parse_and_write, fields=fields)
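Below is a minimal usage sketch for the parsers above (a sketch only: the file names are placeholders, and the module is assumed importable as xml_parse, matching the import path used by the analysis script that follows):

    from xml_parse import parse_detector_xml, parse_emissions_xml, parse_tl_xml

    # traffic-light switch states
    parse_tl_xml('tls_output.xml', 'tls_output.csv')

    # emissions output, streamed straight to csv
    parse_emissions_xml().main('emissions_output.xml', save_path='emissions_output.csv')

    # E1 induction-loop intervals (the class, which shadows the earlier function)
    parse_detector_xml().main('e1_output.xml', 'e1_output.csv', detect_type='e1')

The second script below consumes the emissions csv produced this way and computes per-vehicle and fleet-level statistics.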
import os
import random
import math
from datetime import datetime, timedelta
from time import time

import numpy as np
import pandas as pd
import dask.dataframe as dd  # fallback for emissions files too large for pandas
import pyproj
import sumolib

from research_paper.global_config_parameters import CONFIG
from sumo_function_library.unit_conversion import EmissionsUnits
from sumo_function_library.function_timing import timing
from sumo_function_library.multi_processor_funcs import mp_funcs
from sumo_function_library.functions.post_processing.xml_parse import parse_emissions_xml
# Read in the net file:
print("Reading in the net file")
NET = sumolib.net.readNet(CONFIG.net_file)
PROJECTOR = pyproj.Proj(projparams=NET._location['projParameter'])
LOCATION_OFFSET = NET.getLocationOffset()

# trips shorter than the straight-line distance between these two network
# coordinates are excluded from the summary statistics
shortest_trip_coords = [[-287.69, -175.55], [97.73, -227.72]]
SHORT_TRIP_LENGTH = math.sqrt(
    (shortest_trip_coords[0][0] - shortest_trip_coords[1][0]) ** 2 +
    (shortest_trip_coords[0][1] - shortest_trip_coords[1][1]) ** 2) * EmissionsUnits.meters_to_miles

# two-level column index for the summary DataFrame
col_level_one = ['vehicle_CO2', 'vehicle_CO', 'vehicle_HC', 'vehicle_NOx', 'vehicle_PMx', 'vehicle_electricity']
col_level_two = ['total', 'average_per_step', 'per_100km']
col_tuple = [(level_one, level_two) for level_one in ['distance', 'norm_time'] for level_two in ['total']] + \
            [(level_one, level_two) for level_one in ['vehicle_fuel'] for level_two in col_level_two + ['mpg']] + \
            [(level_one, level_two) for level_one in col_level_one for level_two in col_level_two]
COLUMNS = pd.MultiIndex.from_tuples(col_tuple)
COLUMNS_ITER_LIST = list(set([level[0] for level in col_tuple]))
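# For reference, the resulting two-level columns are, in order:
#     ('distance', 'total'), ('norm_time', 'total'),
#     ('vehicle_fuel', 'total'), ('vehicle_fuel', 'average_per_step'),
#     ('vehicle_fuel', 'per_100km'), ('vehicle_fuel', 'mpg'),
#     ('vehicle_CO2', 'total'), ('vehicle_CO2', 'average_per_step'), ('vehicle_CO2', 'per_100km'),
#     ... and likewise for vehicle_CO, vehicle_HC, vehicle_NOx, vehicle_PMx,
#     and vehicle_electricity.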
SIM_TIME_STEP = None
@timing
def _get_sample_df(percent, unique, df):
    """Return a random sample of vehicle ids and the matching slice of df."""
    sample_num = int(percent * len(unique))
    rand_index = random.sample(range(0, len(unique)), sample_num)
    sample_ids = unique[rand_index]
    return sample_ids, df.loc[df['vehicle_id'].isin(sample_ids)].copy()
def _do_unit_conversion(df, sim_time_step):
    for col_name in EmissionsUnits.unit_conversion.keys():
        conversion = EmissionsUnits.unit_conversion[col_name]['conversion']
        if EmissionsUnits.unit_conversion[col_name]['timestep_mult']:
            conversion = conversion * sim_time_step
        df.loc[:, col_name] = df[col_name] * conversion
    # replace the ~zero fuel consumption reported during deceleration with a
    # small floor value
    mask = df['vehicle_fuel'] <= 0.000076
    df.loc[mask, 'vehicle_fuel'] = 0.000077
    return df
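# _do_unit_conversion assumes EmissionsUnits.unit_conversion is a mapping shaped
# roughly like the hypothetical sketch below (the real table lives in
# sumo_function_library.unit_conversion; the names and placeholder factors here
# are illustrative only):
#
#     unit_conversion = {
#         'vehicle_fuel': {'conversion': <factor>, 'timestep_mult': True},
#         'vehicle_CO2':  {'conversion': <factor>, 'timestep_mult': True},
#         ...
#     }
#
# where 'timestep_mult' marks columns whose factor must additionally be scaled
# by the output step length, which is what the sim_time_step multiplication
# above implements.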
@timing
def _calc_distance_and_time(vehicle_ids, df):
    """Per-row Python loop version; superseded by _calc_distance_and_time_rolling
    below, which main() actually calls."""
    df['distance'] = None
    df['norm_time'] = None
    percent = 0
    for i, vehicle_id in enumerate(vehicle_ids):
        if (i / len(vehicle_ids) * 100) > (percent + 5):
            percent = i / len(vehicle_ids) * 100
            print(f"Distance calc is {percent}% done")
        vehicle_df = df.loc[df['vehicle_id'] == vehicle_id].copy()
        distance = []
        norm_time = []
        # j, not i: reusing i here would clobber the progress counter above
        for j in range(len(vehicle_df.index)):
            if j > 0:
                x_dist = vehicle_df['vehicle_x'].iloc[j] - vehicle_df['vehicle_x'].iloc[j - 1]
                y_dist = vehicle_df['vehicle_y'].iloc[j] - vehicle_df['vehicle_y'].iloc[j - 1]
                distance.append(math.sqrt(x_dist ** 2 + y_dist ** 2) * EmissionsUnits.meters_to_miles
                                + distance[j - 1])
                norm_time.append(vehicle_df['timestep_time'].iloc[j] - initial_time)
            else:
                initial_time = vehicle_df['timestep_time'].iloc[j]
                distance.append(0)
                norm_time.append(0)
        vehicle_df.loc[:, 'distance'] = distance
        vehicle_df.loc[:, 'norm_time'] = norm_time
        df.loc[df['vehicle_id'] == vehicle_id, ['distance', 'norm_time']] = \
            vehicle_df[['distance', 'norm_time']]
    return df
def _calc_distance_and_time_rolling(vehicle_ids, df, sim_time_step):
    """Vectorized distance/time/acceleration calculation using diff() and cumsum()."""
    df['distance'] = None
    df['distance_inst'] = None
    df['norm_time'] = None
    df['accel'] = None
    frames = []
    meters_2_miles = EmissionsUnits.meters_to_miles
    percent = 0
    for i, vehicle_id in enumerate(vehicle_ids):
        if (i / len(vehicle_ids) * 100) > (percent + 5):
            percent = i / len(vehicle_ids) * 100
            print(f"Distance calc is {percent}% done")
        mask = df['vehicle_id'].values == vehicle_id
        vehicle_df = df[mask].copy()
        # step-to-step displacement from consecutive positions
        x_dist = vehicle_df['vehicle_x'].diff().fillna(0)
        y_dist = vehicle_df['vehicle_y'].diff().fillna(0)
        vehicle_df['distance_inst'] = np.sqrt(x_dist ** 2 + y_dist ** 2) * meters_2_miles
        vehicle_df['distance'] = vehicle_df['distance_inst'].cumsum()
        # time since the vehicle entered the network
        vehicle_df['norm_time'] = vehicle_df['timestep_time'] - vehicle_df['timestep_time'].iloc[0]
        # acceleration from the speed trace
        vehicle_df['accel'] = vehicle_df['vehicle_speed'].diff().fillna(0) / sim_time_step
        frames.append(vehicle_df)
    # one pd.concat at the end instead of repeated DataFrame.append (deprecated)
    return pd.concat(frames)
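# A toy illustration of the diff()/cumsum() pattern used above (made-up values):
#
#     >>> s = pd.Series([0.0, 3.0, 7.0, 12.0])   # positions along one axis, meters
#     >>> s.diff().fillna(0).tolist()            # per-step displacement
#     [0.0, 3.0, 4.0, 5.0]
#     >>> s.diff().fillna(0).cumsum().tolist()   # cumulative distance
#     [0.0, 3.0, 7.0, 12.0]
#
# Replacing the per-row Python loop in _calc_distance_and_time with these
# vectorized operations is why main() calls this version instead.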
def mp_summary_statistics(ids, df, cutoff_start, cutoff_end):
    sum_df = pd.DataFrame(index=ids, columns=COLUMNS)
    percent = 0
    for i, vehicle_id in enumerate(ids):
        drop = False
        if (i / len(ids) * 100) > (percent + 5):
            percent = i / len(ids) * 100
            print(f"Summary calc is {percent}% done")
        if 'Total' not in vehicle_id:
            mask = df['vehicle_id'].values == vehicle_id
            local_df = df[mask]
            total_distance = local_df['distance'].iloc[-1]  # in miles
            # drop vehicles whose trips are shorter than the minimum trip length
            if total_distance < SHORT_TRIP_LENGTH:
                sum_df = sum_df.drop(vehicle_id)
                drop = True
            # drop vehicles that enter before the analysis window opens or are
            # still in the network when it closes
            elif cutoff_start:
                if (local_df['timestep_time'].iloc[0] <= cutoff_start) or \
                        (local_df['timestep_time'].iloc[-1] >= cutoff_end):
                    sum_df = sum_df.drop(vehicle_id)
                    drop = True
            if not drop:
                for col in COLUMNS_ITER_LIST:
                    average = None
                    total = None
                    per_100_km = None
                    mile_per = None
                    if col not in ('vehicle_speed', 'distance', 'norm_time'):
                        total = local_df[col].sum()
                        average = local_df[col].mean()
                        # gal -> L applies only to fuel, matching the conditional
                        # in _calc_total_summary_stats below
                        conversion_factor = EmissionsUnits.gal_to_L if col == 'vehicle_fuel' else 1
                        per_100_km = total * conversion_factor / (
                                total_distance * EmissionsUnits.miles_to_km) * 100
                        if col == 'vehicle_fuel':
                            mile_per = total_distance / total
                    elif col == 'vehicle_speed':
                        average = local_df[col].mean()
                    elif col == 'distance':
                        total = total_distance
                    elif col == 'norm_time':
                        total = local_df['norm_time'].iloc[-1]
                    # write only the statistics defined for this column, in
                    # COLUMNS order: total, average_per_step, per_100km, mpg
                    iter_list = [total, average, per_100_km, mile_per]
                    write_chunk = [val for val in iter_list if val is not None]
                    sum_df.loc[vehicle_id, col] = write_chunk
    return sum_df
@timing
def _calc_summary_statistics(vehicle_ids, df, cutoff_start, cutoff_end):
    rows = ['Total_sum', 'Total_average']
    sum_df = pd.DataFrame(index=rows, columns=COLUMNS)
    # split the ids across the worker processes
    ids = np.array_split(vehicle_ids, mp_funcs.CPU_COUNT - 1)
    local_sum_df = mp_funcs.multi_process_divided_indexer_sever(ids, mp_summary_statistics,
                                                                shared_df={'Use': True, 'df_name': 'df'}, df=df,
                                                                cutoff_start=cutoff_start, cutoff_end=cutoff_end)
    return sum_df.append(local_sum_df)
@timing
def _calc_total_summary_stats(df):
    # Fleet-level per_100km and mpg are computed from the fleet totals (total
    # fuel over total distance) rather than by averaging the per-vehicle
    # ratios, which would over-weight short trips.
    for column in df:
        if 'total' in column[1]:
            df.loc['Total_sum', column] = df[column].iloc[2:].sum()
            df.loc['Total_average', column] = df[column].iloc[2:].mean()
        elif 'average_per_step' in column[1]:
            df.loc['Total_sum', column] = ''
            df.loc['Total_average', column] = df[column].iloc[2:].mean()
        elif '100km' in column[1]:
            conversion_factor = EmissionsUnits.gal_to_L if column[0] == 'vehicle_fuel' else 1
            df.loc['Total_average', column] = df.loc['Total_sum', (column[0], 'total')] * conversion_factor \
                / (df.loc['Total_sum', ('distance', 'total')] * EmissionsUnits.miles_to_km) * 100
            df.loc['Total_sum', column] = ''
        elif 'mpg' in column[1]:
            df.loc['Total_sum', column] = ''
            df.loc['Total_average', column] = df.loc['Total_sum', ('distance', 'total')] / \
                df.loc['Total_sum', (column[0], 'total')]
    # iloc[2:] skips the Total_sum / Total_average rows themselves
    df.loc['Total_std'] = df.iloc[2:].std()
    return df
def convertXY2LonLat(x, y):
    """Invert the net's projection: network x/y (meters) -> lon/lat."""
    x -= LOCATION_OFFSET[0]
    y -= LOCATION_OFFSET[1]
    return PROJECTOR(x, y, inverse=True)


def geo_convert(row):
    # applied row-wise (axis=1) by apply_by_multiprocessing below
    row[['vehicle_x_geo', 'vehicle_y_geo']] = convertXY2LonLat(row['vehicle_x'], row['vehicle_y'])
    return row


@timing
def _convert_xy_to_lat_lon(df):
    for col in ['vehicle_x_geo', 'vehicle_y_geo']:
        df[col] = None
    return mp_funcs.apply_by_multiprocessing(df, geo_convert, axis=1)
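# Sketch of the conversion for a single point (coordinates are made up; the
# projection string and offset come from the <location> element of the net file):
#
#     x, y = 512.3, 871.9                 # network coordinates, meters
#     lon, lat = convertXY2LonLat(x, y)   # subtract netOffset, then inverse-project
#
# sumolib's Net exposes an equivalent convertXY2LonLat(); it is reimplemented
# here, presumably so the module-level PROJECTOR can be used in the row-wise
# multiprocessing apply above.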
@timing
def _calculate_dist_and_time_mp(unique_ids, df, sim_time_step):
    ids = np.array_split(unique_ids, mp_funcs.CPU_COUNT - 1)
    return mp_funcs.multi_process_divided_indexer_sever(ids, _calc_distance_and_time_rolling,
                                                        shared_df={'Use': True, 'df_name': 'df'},
                                                        df=df, sim_time_step=sim_time_step)
def main(config, emissions_file_xml, emissions_file_csv, analysis_output_path, cut_off_start, cut_off_end):
    dask_df = False
    # Convert the XML output to csv once; reuse the csv on subsequent runs
    if not os.path.exists(emissions_file_csv):
        print("Converting the XML file to .csv")
        parse_emissions_xml().main(emissions_file_xml, save_path=emissions_file_csv)
    print("Read in the emissions csv file")
    try:
        emissions_df = pd.read_csv(emissions_file_csv)
        print("Using Pandas DataFrame")
    except MemoryError:
        print("Memory Error. Reading in as Dask DF")
        emissions_df = dd.read_csv(emissions_file_csv)
        dask_df = True  # note: the steps below assume pandas; a dask frame would need .compute()
    unique_ids = emissions_df['vehicle_id'].unique()
    # Convert the units of the DataFrame
    print("Doing the unit conversion")
    emissions_df = _do_unit_conversion(emissions_df, float(config.emissions_output_step))
    ts = time()
    # Calculate each vehicle's distance travelled and time in the network:
    print("Calculating distance and time")
    # emissions_df, _ = _calc_distance_and_time(unique_ids, emissions_df)  # slower loop version
    emissions_df, _ = _calculate_dist_and_time_mp(unique_ids, emissions_df, float(config.emissions_output_step))
    # Calculate the summary statistics
    print("Calculating the summary statistics")
    summary_df, _ = _calc_summary_statistics(unique_ids, emissions_df, cutoff_start=cut_off_start,
                                             cutoff_end=cut_off_end)
    # Get the totals of the summary statistics
    print("Calculating the total summary statistics")
    summary_df, _ = _calc_total_summary_stats(summary_df)
    # Calculate the geo-locations of vehicles
    print("Calculating the lat and lon")
    emissions_df, _ = _convert_xy_to_lat_lon(emissions_df)
    tf = time()
    print("The loop took {} s in total".format(tf - ts))
    # Write the data to the output folder:
    # try:
    #     os.mkdir(ANALYSIS_OUTPUT_PATH)
    #     option = 'y'
    # except FileExistsError:
    #     print("Directory Exists")
    #     option = input("Do you wish to overwrite the output files? (y/n)")
    # if option == 'y':
    summary_df.to_csv(os.path.join(analysis_output_path, 'data_summary.csv'))
    emissions_df.to_csv(os.path.join(analysis_output_path, 'data.csv'))
if __name__ == "__main__":
    OUTPUT_FOLDER = '9-2-2020'
    CSV_PATH = os.environ.get('SQL_DATA_PATH')
    OUTPUT_PATH = os.path.join(os.environ.get('SIM_OUTPUT_DIR'), OUTPUT_FOLDER)
    file_name = '2020-02-13'
    LOWER_TIME_BOUND = "23:00:00.000"
    UPPER_TIME_BOUND = "00:59:59.999"
    SECONDS = 26 * 3600
    DAYS_EXTENDED = 1
    # the simulation runs from LOWER_TIME_BOUND the day before file_name to
    # UPPER_TIME_BOUND the day after; the analysis window is file_name itself
    date_low = (datetime.strptime(file_name, "%Y-%m-%d") - timedelta(days=DAYS_EXTENDED)).strftime("%m/%d/%Y")
    date_high = (datetime.strptime(file_name, "%Y-%m-%d") + timedelta(days=DAYS_EXTENDED)).strftime("%m/%d/%Y")
    analysis_time_low = datetime.strptime(file_name, "%Y-%m-%d").strftime("%m/%d/%Y") + " 00:00:00"
    analysis_time_high = datetime.strptime(file_name, "%Y-%m-%d").strftime("%m/%d/%Y") + " 23:59:59"
    time_low = " ".join([date_low, LOWER_TIME_BOUND])
    time_high = " ".join([date_high, UPPER_TIME_BOUND])
    time_low = datetime.strptime(time_low, "%m/%d/%Y %H:%M:%S.%f")
    time_high = datetime.strptime(time_high, "%m/%d/%Y %H:%M:%S.%f")
    analysis_time_low = datetime.strptime(analysis_time_low, "%m/%d/%Y %H:%M:%S")
    analysis_time_high = datetime.strptime(analysis_time_high, "%m/%d/%Y %H:%M:%S")
    # cutoffs expressed in simulation seconds (t = 0 at time_low)
    CUT_OFF_START = (analysis_time_low - time_low).total_seconds()
    CUT_OFF_END = (analysis_time_high - time_low).total_seconds()
    EMISSIONS_FILE_XML = os.path.join(OUTPUT_PATH, file_name, '_OUTPUT_emissions.xml')
    EMISSIONS_FILE_CSV = os.path.join(OUTPUT_PATH, file_name, '_OUTPUT_emissions.csv')
    ANALYSIS_OUTPUT_PATH = os.path.join(OUTPUT_PATH, file_name)
    main(emissions_file_xml=EMISSIONS_FILE_XML, emissions_file_csv=EMISSIONS_FILE_CSV,
         analysis_output_path=ANALYSIS_OUTPUT_PATH, cut_off_start=CUT_OFF_START, cut_off_end=CUT_OFF_END,
         config=CONFIG)
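    # Worked example of the cutoff arithmetic above: with file_name = '2020-02-13'
    # and DAYS_EXTENDED = 1, the simulation clock starts at 2020-02-12 23:00:00, so
    #     CUT_OFF_START = 2020-02-13 00:00:00 - 2020-02-12 23:00:00 =  3,600 s
    #     CUT_OFF_END   = 2020-02-13 23:59:59 - 2020-02-12 23:00:00 = 89,999 s
    # and mp_summary_statistics keeps only vehicles that both enter after 3,600 s
    # and leave before 89,999 s of simulation time.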
SUMO has a built-in Python script for parsing XML output, but it is slow, so I wrote my own.
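The speed (and the bounded memory use) comes from lxml's iterparse combined with clearing each element once it has been processed, so the often multi-gigabyte emissions file never has to be held in memory as a full tree. A minimal standalone sketch of that pattern, assuming SUMO's emission-output schema (the file name is a placeholder taken from the script above):

    from lxml import etree

    def iter_vehicle_rows(file_path):
        # visit each <vehicle> element once it is fully parsed
        for _, elem in etree.iterparse(file_path, events=('end',), tag='vehicle'):
            yield dict(elem.attrib)
            elem.clear()  # drop the element's own content ...
            while elem.getprevious() is not None:
                del elem.getparent()[0]  # ... and detach already-processed siblings

    for row in iter_vehicle_rows('_OUTPUT_emissions.xml'):
        pass  # e.g. accumulate statistics or write a csv row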