Last active
September 12, 2024 09:36
-
-
Save sebastianknopf/c8cadcfa659831ce6df177d253a2403e to your computer and use it in GitHub Desktop.
Python helper class for processing the *.asc files used in IVU.pool timetable data exchange
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv | |
import re | |
import os | |
from isa.ascdef import name2def | |
######################################################################################################################## | |
# Helper class for reading and modifying *.asc files. | |
######################################################################################################################## | |
def read_asc_file(filename):
    """Open *filename*, parse it and return the resulting AscFile instance."""
    parsed = AscFile()
    parsed.read(filename)
    return parsed
def create_asc_file(filename):
    """Return a fresh, empty AscFile bound to *filename*."""
    return AscFile(filename)
class AscFile:
    """Reader/writer for IVU.pool ``*.asc`` interchange files.

    An ``*.asc`` file is a ``#``-separated text table whose column layout is
    looked up via ``name2def()`` from the file name.  Two layouts exist:

    * flat files: every row is a data record; ``self.records`` is a list of
      dicts, one per row;
    * grouped files (definition contains ``'HEADER'``): a header row
      announces via its ``'INCREMENTOR'`` field how many data rows follow;
      ``self.headers`` then holds one dict per header row and
      ``self.records`` one *list* of record dicts per header.

    A definition may additionally declare ``'DIMENSIONS'``: the trailing
    columns from ``REPEAT_FROM`` onwards repeat N times per data row, where
    N is read from the preceding header's ``INDICATOR`` field.  Repeated
    columns are collected under the record key ``'DIMENSIONS'`` as a list
    of dicts.
    """

    def __init__(self, filename=None):
        # NOTE(review): null_value/strict are not used by any code visible
        # here — kept for callers that may rely on them.
        self.null_value = 'NULL'
        self.strict = False

        self._internal_init()

        # Remember the target file so write() without arguments works for
        # instances created via create_asc_file(); the original dropped it.
        self._filename = filename

    def read(self, filename):
        """Parse *filename* into ``self.headers`` / ``self.records``.

        Raises ValueError if no column definition is known for the file
        name (the original silently fell through and then crashed with a
        TypeError on the None definition).
        """
        self._filename = filename
        self._definition = name2def(os.path.basename(filename))

        if self._definition is None:
            raise ValueError(f"no *.asc definition known for {filename}")

        if 'DIMENSIONS' in self._definition:
            self._dimensions = {
                'NUM_DIMENSIONS': 0,  # updated from each header row below
                'REPEAT_FROM': self._definition['DIMENSIONS']['REPEAT_FROM']
            }
        else:
            self._dimensions = None

        with open(self._filename, newline='') as asc_file:
            asc_reader = csv.reader(asc_file, delimiter='#', quotechar='"')

            ptr_header = 0          # row index at which the next header row is expected
            lst_record_group = None

            for index, asc_row in enumerate(asc_reader):
                if 'HEADER' in self._definition:
                    if index == ptr_header:
                        # flush the previous record group, if any
                        if lst_record_group is not None:
                            self.records.append(lst_record_group)
                        lst_record_group = list()

                        # read header entry
                        entry = self._read_entry(asc_row, self._definition['HEADER'])
                        self.headers.append(entry)

                        # when monitoring dimensions, remember how many column
                        # repeats the following data rows carry
                        if self._dimensions is not None:
                            self._dimensions['NUM_DIMENSIONS'] = entry[self._definition['DIMENSIONS']['INDICATOR']]

                        # the INCREMENTOR header field counts the data rows
                        # belonging to this header
                        ptr_header = ptr_header + entry[self._definition['INCREMENTOR']] + 1
                    else:
                        # data row belonging to the current header group
                        lst_record_group.append(
                            self._read_entry(asc_row, self._definition['DATA'], self._dimensions)
                        )
                else:
                    # flat file: every row is a plain record
                    self.records.append(
                        self._read_entry(asc_row, self._definition['DATA'])
                    )

            # flush the last remaining record group
            if lst_record_group is not None:
                self.records.append(lst_record_group)

    def write(self, filename=None):
        """Serialize headers/records back to *filename* (default: source file)."""
        if filename is None:
            filename = self._filename

        # (dtype, width) lookup per column name; replaces the original
        # O(n^2) list(keys).index(...) scans per field and fixes the
        # accidental shadowing of the header loop variable `index`.
        definition = self._definition or {}
        data_specs = {item[0]: (item[1], item[2]) for item in definition.get('DATA', [])}
        header_specs = {item[0]: (item[1], item[2]) for item in definition.get('HEADER', [])}

        with open(filename, 'w', newline='') as asc_file:
            asc_writer = csv.writer(asc_file, delimiter='#', quotechar='"', lineterminator='#\n')

            if len(self.headers) > 0:
                for group_index, header in enumerate(self.headers):
                    asc_headers = [
                        self._create_value(header_value, *header_specs[header_key])
                        for header_key, header_value in header.items()
                    ]
                    asc_writer.writerow(asc_headers)

                    # the records of group i belong to header i
                    for record in self.records[group_index]:
                        asc_values = list()
                        for record_key, record_value in record.items():
                            if record_key == 'DIMENSIONS':
                                # flatten the repeated column sets back in order
                                for dimension in record_value:
                                    for dimension_key, dimension_value in dimension.items():
                                        asc_values.append(
                                            self._create_value(dimension_value, *data_specs[dimension_key])
                                        )
                            else:
                                asc_values.append(self._create_value(record_value, *data_specs[record_key]))
                        asc_writer.writerow(asc_values)
            else:
                for record in self.records:
                    asc_values = [
                        self._create_value(record_value, *data_specs[record_key])
                        for record_key, record_value in record.items()
                    ]
                    asc_writer.writerow(asc_values)

    def find_record(self, rdata, primary_key, foreign_key):
        """Return the first record whose *foreign_key* values equal the
        *primary_key* values of *rdata*, or None if there is no match.

        Values are compared positionally because the key *names* may
        differ between the two files.
        """
        record_pkfields = self._create_compare_record(rdata, primary_key)

        for record in self.records:
            compare_record = self._create_compare_record(record, foreign_key)
            if list(record_pkfields.values()) == list(compare_record.values()):
                return record

        return None

    def add_record(self, rdata, primary_key=None):
        """Append *rdata* unless a record with the same key fields exists.

        Only meaningful for flat (header-less) files where ``self.records``
        holds plain dicts.  This was a dead stub whose implementation sat
        in a docstring; now implemented as documented.
        """
        record_pkfields = self._create_compare_record(rdata, primary_key)
        for record in self.records:
            if self._create_compare_record(record, primary_key) == record_pkfields:
                return
        self.records.append(rdata)

    def remove_records(self, rdata, primary_key=None):
        """Remove every record whose *primary_key* fields equal *rdata*.

        *rdata* is compared against each record reduced to *primary_key*.
        This was a dead stub whose implementation sat in a docstring; now
        implemented as documented.
        """
        self.records = [
            record for record in self.records
            if self._create_compare_record(record, primary_key) != rdata
        ]

    def replace_foreign_keys(self, foreign_key_columns, repl_map):
        """Rewrite the values of *foreign_key_columns* according to *repl_map*.

        Records are copied before modification so records shared with
        other structures are not mutated in place.
        """
        for i, original_record in enumerate(self.records):
            updated_record = dict(original_record)

            updated = False
            for fkc in foreign_key_columns:
                if original_record[fkc] in repl_map:
                    updated_record[fkc] = repl_map[original_record[fkc]]
                    updated = True

            if updated:
                self.records[i] = updated_record

    def close(self):
        """Reset the instance to its pristine, empty state."""
        self._internal_init()

    def _internal_init(self):
        # internal state shared by __init__ and close()
        self._filename = None
        self._definition = None
        self.headers = list()
        self.records = list()

    def _create_value(self, val, dtype=str, dlen=0):
        """Format *val* as a fixed-width *.asc field of width *dlen*.

        Booleans become '1'/'0'; numbers are right-aligned, everything
        else left-aligned.  Empty values stay empty (no padding).
        """
        if dtype == bool:
            value = '1' if val == True else '0'  # noqa: E712 — deliberate strict compare, matches original
        else:
            value = str(val)

        if not value == '':
            if dtype == int or dtype == float:
                value = value.rjust(dlen, ' ')
            else:
                value = value.ljust(dlen, ' ')

        return value

    def _read_entry(self, row_data, definition, dimensions=None):
        """Convert one csv row into a dict keyed by the definition's column names.

        With *dimensions*, the columns from REPEAT_FROM onwards repeat
        NUM_DIMENSIONS times; those are gathered under entry['DIMENSIONS']
        as a list of per-repetition dicts.
        """
        entry = dict()

        if dimensions is not None:
            dimensions_index = [i for i, k in enumerate(definition) if k[0] == dimensions['REPEAT_FROM']][0]
            dimension_size = len(definition) - dimensions_index

        for index, def_obj in enumerate(definition):
            if index < len(row_data):
                def_key = def_obj[0]
                def_dtype = def_obj[1]
                def_optional = def_obj[3]

                if dimensions is not None and index >= dimensions_index:
                    if 'DIMENSIONS' not in entry:
                        entry['DIMENSIONS'] = list()

                    for d in range(0, dimensions['NUM_DIMENSIONS']):
                        if len(entry['DIMENSIONS']) <= d:
                            entry['DIMENSIONS'].append(dict())

                        entry['DIMENSIONS'][d][def_key] = self._read_value(
                            row_data[index + dimension_size * d],
                            def_dtype,
                            def_optional,
                            def_key
                        )
                else:
                    entry[def_key] = self._read_value(row_data[index], def_dtype, def_optional, def_key)

        return entry

    def _read_value(self, val, dtype, optional, key=None):
        """Parse a raw field into *dtype*; *key* is only used in error messages.

        Optional empty fields are returned as '' unchanged; mandatory empty
        fields raise ValueError.  (The original referenced an undefined name
        in its error messages and raised NameError instead; the new *key*
        parameter fixes that backward-compatibly.)
        """
        val = val.strip()

        if dtype == str:
            if not optional and val == '':
                raise ValueError(f"column {key} must not be empty")
            return val
        elif dtype == int or dtype == float:
            if val == '':
                if optional:
                    return val
                raise ValueError(f"column {key} must not be empty")
            return dtype(val)
        elif dtype == bool:
            if not optional and val == '':
                raise ValueError(f"column {key} must not be empty")
            return True if val == '1' else False

        # unknown dtype: mirror the original's implicit None
        return None

    def _create_compare_record(self, record, primary_key):
        """Return *record* reduced to its *primary_key* columns.

        With primary_key None the record itself (not a copy) is returned.
        """
        if primary_key is not None:
            compare_record = dict(record)
            for k in record:
                if k not in primary_key:
                    del compare_record[k]
            return compare_record
        else:
            return record
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
######################################################################################################################## | |
# Definition of header and data lines in *.asc files. | |
######################################################################################################################## | |
def name2def(filename):
    """Resolve the column definition for an *.asc file name.

    The extension and any leading path are stripped first; files whose
    stem starts with LD/LF/FD map to the per-line/per-trip table
    templates.  Returns None for unknown file names.
    """
    stem = os.path.basename(filename).split('.')[0]

    if stem == 'VERSIONE':
        return VERSIONS
    if stem == 'BITFELD':
        return BITFIELD
    if stem == 'HALTESTE':
        return STATIONS
    if stem == 'BETRIEBE':
        return OPERATORS
    if stem == 'BETRIEBSTEILE':
        return OPERATOR_ORGANISATIONS
    if stem == 'LINIEN':
        return LINES
    if stem == 'LVATTRIB':
        return LINE_COURSE_ATTRIBUTES
    if stem.startswith('LD'):
        return LDXXXXXX
    if stem.startswith('LF'):
        return LFXXXXXX
    if stem == 'FAHRTATT':
        return TRIP_ATTRIBUTES
    if stem.startswith('FD'):
        return FDXXXXXX
    if stem == 'FPLTAB':
        return TIMETABLE
    if stem == 'VERKEHRM':
        return VEHICLE_TYPES

    return None
# ----------------------------------------------------------------------------
# Table definitions consumed by AscFile via name2def().
#
# Each column tuple is (FieldName, dtype, field_width, optional).
# 'PRIMARY' lists the key columns of the respective section.  Grouped files
# additionally declare 'INCREMENTOR' (the header field counting the data rows
# that follow a header) and, for LDXXXXXX, 'DIMENSIONS' (the columns from
# 'REPEAT_FROM' onwards repeat 'INDICATOR' times per data row).
# ----------------------------------------------------------------------------

# VERSIONE.asc — timetable versions.
VERSIONS = {
    'DATA': [
        ('ID', int, 10, False),
        ('Name', str, 60, False),
        ('StartDate', str, 10, False),
        ('EndDate', str, 10, False),
        ('CalendarID', int, 10, True),
    ],
    'PRIMARY': {
        'DATA': [
            'ID'
        ]
    }
}

# BITFELD.asc — operating-day bit fields.
BITFIELD = {
    'DATA': [
        ('ID', int, 10, False),
        ('BitField', str, 255, False)
    ],
    'PRIMARY': {
        'DATA': [
            'ID'
        ]
    }
}

# HALTESTE.asc — stations/stops.
STATIONS = {
    'DATA': [
        ('ID', int, 10, False),
        ('DelivererID', str, 10, False),
        ('ParentID', int, 10, True),
        ('ParentDelivererID', str, 10, True),
        ('StationType', str, 2, True),
        ('Code', str, 8, True),
        ('Longitude', float, 10, True),
        ('Latitude', float, 10, True),
        ('MunicipalCode', str, 11, True),
        ('WheelchairAccessible', str, 1, True),
        ('LongName', str, 60, True),
        ('HeadsignText', str, 60, True),
        ('TimetableInformationName', str, 60, True),
        ('PrintingName', str, 60, True),
        ('KilometerInfo', int, 6, True),
        ('InterchangePriority', int, 2, True),
        ('ExportFlag', str, 2, True),
        ('ItcsNumber', int, 10, True),
        ('LocationType', str, 1, True),
        ('GlobalID', str, 60, True)
    ],
    'PRIMARY': {
        'DATA': [
            'ID',
            'DelivererID'
        ]
    }
}

# BETRIEBE.asc — operators.
OPERATORS = {
    'DATA': [
        ('ID', int, 10, False),
        ('OperatorNumber', int, 10, True),
        ('Code', str, 8, False),
        ('Name', str, 60, False),
        ('AdditionalName', str, 255, True),
        ('AddressID', int, 10, True),
        ('ShortName', str, 3, True),
        ('Logo', str, 255, True)
    ],
    'PRIMARY': {
        'DATA': [
            'ID'
        ]
    }
}

# BETRIEBSTEILE.asc — operator organisations (sub-units of an operator).
OPERATOR_ORGANISATIONS = {
    'DATA': [
        ('Code', str, 8, False),
        ('Name', str, 60, False),
        ('ID', str, 6, False),
        ('VehicleTypeGroup', str, 32, False),
        ('DelivererID', str, 10, False),
        ('OperatorID', int, 10, True),
        ('AddressID', int, 10, True),
        ('OrganisationNumber', int, 8, False)
    ],
    'PRIMARY': {
        'DATA': [
            'ID'
        ]
    }
}

# LINIEN.asc — lines.
LINES = {
    'DATA': [
        ('OperatorOrganisationID', str, 6, False),
        ('LineNumber', str, 32, False),
        ('Name', str, 32, True),
        ('Type', str, 3, True),
        ('VehicleTypeGroup', str, 32, True),
        ('InternationalID', str, 50, True),
        ('PseudoFlag', bool, 1, True),
        ('ExportNameFlag', bool, 1, True),
        ('TextColor', str, 6, True),
        ('BackgroundColor', str, 6, True),
        ('HafasLogo', str, 255, True),
        ('Comment', str, 255, True)
    ],
    'PRIMARY': {
        'DATA': [
            'OperatorOrganisationID',
            'LineNumber'
        ]
    }
}

# LVATTRIB.asc — line course attributes; no primary key columns defined.
LINE_COURSE_ATTRIBUTES = {
    'DATA': [
        ('OperatorOrganisationID', str, 6, False),
        ('LineNumber', str, 8, False),
        ('LineVersionNumber', int, 10, False),
        ('AttributeID', str, 10, False),
        ('Value', str, 511, True),
        ('CalendarID', int, 10, True),
        ('CalendarCode', str, 4, True)
    ],
    'PRIMARY': {
        'DATA': [
        ]
    }
}

# LDxxxxxx.asc — line courses; grouped file: each header announces 'NumStops'
# data rows, and the columns from 'TravelTime' onwards repeat
# 'NumTimeDemandTypes' times per data row.
LDXXXXXX = {
    'HEADER': [
        ('LineNumber', str, 32, False),
        ('LineVersionNumber', int, 10, False),
        ('LineVersionPriority', int, 3, False),
        ('OperatorOrganisationID', str, 6, False),
        ('SubLineNumber', int, 8, False),
        ('DirectionID', str, 2, False),
        ('NumStops', int, 3, False),
        ('NumTimeDemandTypes', int, 3, False),
        ('VehicleTypeID', str, 10, False),
        ('CalendarID', int, 10, True)
    ],
    'DATA': [
        ('ConsecutiveNumber', int, 4, False),
        ('StopCode', str, 8, True),
        ('StopID', int, 10, False),
        ('Kilometers', int, 7, True),
        ('PositionSequenceArrivalTime', int, 4, True),
        ('PositionSequenceDepartureTime', int, 4, True),
        ('TravelTime', str, 6, False),
        ('WaitingTime', str, 6, False),
        ('NoEntry', bool, 1, True),
        ('NoExit', bool, 1, True),
        ('DemandStop', bool, 1, True)
    ],
    'PRIMARY': {
        'HEADER': [
            'LineNumber',
            'LineVersionNumber',
            'OperatorOrganisationID',
            'SubLineNumber',
            'DirectionID'
        ],
        'DATA': [
            'ConsecutiveNumber'
        ]
    },
    'INCREMENTOR': 'NumStops',
    'DIMENSIONS': {
        'INDICATOR': 'NumTimeDemandTypes',
        'REPEAT_FROM': 'TravelTime'
    }
}

# LFxxxxxx.asc — line display settings; grouped file ('NumStops' data rows
# per header).
# NOTE(review): 'DisplayGuidlineFlag' spelling is kept — it is a column name
# used as a dict key, not a typo that can safely be fixed here.
LFXXXXXX = {
    'HEADER': [
        ('OperatorOrganisationID', str, 6, False),
        ('LineNumber', str, 32, False),
        ('DirectionID', str, 2, False),
        ('LineVersionNumber', int, 10, False),
        ('NumStops', int, 3, False)
    ],
    'DATA': [
        ('StopID', int, 10, False),
        ('ConsecutiveNumber', int, 4, False),
        ('DisplayBoldFlag', bool, 1, True),
        ('DisplayCursiveFlag', bool, 1, True),
        ('DisplayPostedTimetableFlag', bool, 1, True),
        ('DisplayGuidlineFlag', bool, 1, True),
        ('DisplayTimetableFlag', bool, 1, True),
        ('DisplayArrivalFlag', bool, 1, True),
        ('DisplayDepartureFlag', bool, 1, True),
        ('DisplayStopName', str, 60, True)
    ],
    'PRIMARY': {
        'HEADER': [
            'OperatorOrganisationID',
            'LineNumber',
            'DirectionID',
            'LineVersionNumber'
        ],
        'DATA': [
            'ConsecutiveNumber'
        ]
    },
    'INCREMENTOR': 'NumStops'
}

# FAHRTATT.asc — trip attributes; no primary key columns defined.
TRIP_ATTRIBUTES = {
    'DATA': [
        ('OperatorOrganisationID', str, 6, False),
        ('LineNumber', str, 8, False),
        ('DirectionID', str, 2, False),
        ('VersionNumber', int, 10, False),
        ('InternalTripNumber', str, 10, True),
        ('PositionSequenceFrom', int, 4, True),
        ('PositionSequenceTo', int, 4, True),
        ('AttributeID', str, 10, False),
        ('Value', str, 511, True),
        ('CalendarID', int, 10, True),
        ('CalendarCode', str, 4, True)
    ],
    'PRIMARY': {
        'DATA': [
        ]
    }
}

# FDxxxxxx.asc — trips; grouped file ('NumTrips' data rows per header).
FDXXXXXX = {
    'HEADER': [
        ('LineNumber', str, 8, False),
        ('LineVersionNumber', int, 10, False),
        ('OperatorOrganisationID', str, 6, False),
        ('DirectionID', str, 2, False),
        ('SubLineNumber', int, 8, False),
        ('NumTrips', int, 10, False)
    ],
    'DATA': [
        ('PositionSequenceStartStop', int, 4, False),
        ('StartStopID', int, 10, False),
        ('StartTime', str, 8, False),
        ('PositionSequenceDestinationStop', int, 4, False),
        ('DestinationStopID', int, 10, False),
        ('ArrivalTime', str, 6, True),
        ('VehicleTypeID', str, 10, True),
        ('NumTimeDemandTypes', int, 3, False),
        ('ExternalTripNumber', str, 10, True),
        ('CalendarDayTypesTimetable', str, 7, True),
        ('NumFollowingTrips', int, 5, False),
        ('TimeSpanFollowingTrips', str, 7, False),
        ('CalendarID', int, 10, True),
        ('ID', str, 10, True),
        ('Type', str, 3, True),
        ('InternationalTripID', str, 255, True),
        ('CalendarCode', str, 4, True)
    ],
    'PRIMARY': {
        'HEADER': [
            'LineNumber',
            'LineVersionNumber',
            'OperatorOrganisationID',
            'DirectionID',
            'SubLineNumber'
        ],
        'DATA': [
        ]
    },
    'INCREMENTOR': 'NumTrips'
}

# FPLTAB.asc — timetable table entries.
TIMETABLE = {
    'DATA': [
        ('ID', int, 10, False),
        ('DelivererID', str, 10, False),
        ('VersionNumber', int, 10, False),
        ('LineNumberText', str, 32, False),
        ('LineCourseText', str, 1000, False),
        ('LineDirectionCode', str, 1, False),
        ('OperatorOrganisationID', str, 6, False),
        ('LineNumber', str, 10, False),
        ('DirectionID', str, 2, False),
        ('LineVersionNumber', int, 10, False)
    ],
    'PRIMARY': {
        'DATA': [
            'ID',
            'LineDirectionCode'
        ]
    }
}

# VERKEHRM.asc — vehicle types.
VEHICLE_TYPES = {
    'DATA': [
        ('ID', str, 10, False),
        ('VehicleTypeGroup', str, 32, True),
        ('VehicleTypeName', str, 50, True)
    ],
    'PRIMARY': {
        'DATA': [
            'ID'
        ]
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment