Created
October 4, 2020 02:51
-
-
Save pjhoberman/531d36e301eda45eb8f34b43270b26e4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv | |
import datetime | |
# Column names, in order, applied to every data row of co2.csv.
keys = [
    'COUNTY', 'NAME', 'DIRECTIONS', 'IS_DROP_BOX', 'IS_EARLY_VOTING',
    'ADDRESS LINE 1', 'ADDRESS LINE 2', 'CITY', 'STATE', 'ZIP',
    'START TIME', 'END TIME', 'START DATE', 'END DATE',
]

# One dict per distinct (COUNTY, ADDRESS LINE 1) pair, in first-seen order.
# Each dict holds the columns of the first row seen for that site plus a
# 'times' list with one entry per row that shares the site.
locations = []

# Maps (COUNTY, ADDRESS LINE 1) -> the dict already stored in `locations`,
# so each row finds its site in O(1) instead of rescanning the whole list.
_location_index = {}

# newline='' lets the csv module handle line endings itself (see csv docs).
with open('co2.csv', 'r', newline='') as file:
    reader = csv.reader(file)
    next(reader, None)  # skip the file's own header row
    for line in reader:
        location = dict(zip(keys, line))

        # Parsed datetimes are kept alongside the raw strings so the
        # windows can be ordered chronologically later on.
        start_time = datetime.datetime.strptime(
            f"{location['START DATE']} {location['START TIME']}",
            '%m/%d/%Y %I:%M %p',
        )
        end_time = datetime.datetime.strptime(
            f"{location['END DATE']} {location['END TIME']}",
            '%m/%d/%Y %I:%M %p',
        )
        window = {
            'drop_box': location['IS_DROP_BOX'],
            'early_voting': location['IS_EARLY_VOTING'],
            'start_time': location['START TIME'],
            'end_time': location['END TIME'],
            'start_date': location['START DATE'],
            'end_date': location['END DATE'],
            'start_datetime': start_time,
            'end_datetime': end_time,
        }

        site_key = (location.get('COUNTY'), location.get('ADDRESS LINE 1'))
        existing = _location_index.get(site_key)
        if existing is not None:
            # Same site as an earlier row: only the time window is new.
            existing['times'].append(window)
        else:
            location['times'] = [window]
            _location_index[site_key] = location
            locations.append(location)
def actual_times(times):
    """Collapse consecutive windows with identical hours into date ranges.

    Args:
        times: Sequence of dicts with 'start_time', 'end_time',
            'start_date' and 'end_date' entries, already in the order they
            should be summarised. May be empty or None.

    Returns:
        A list of strings, one per run of consecutive entries that share
        the same start and end time. A run whose first 'start_date' equals
        its last 'end_date' is rendered as
        "<date>: <start_time> - <end_time>"; otherwise as
        "<start_date> - <end_date>: <start_time> - <end_time>".
        Returns [] when `times` is empty or None.
    """
    if not times:
        return []

    # TODO: check for skipped days -- a run is extended purely on matching
    # hours, so gaps between dates are not detected.
    runs = []
    for entry in times:
        start_time = entry.get('start_time')
        end_time = entry.get('end_time')
        current = runs[-1] if runs else None
        if (current is not None
                and current['start_time'] == start_time
                and current['end_time'] == end_time):
            # Same hours as the run in progress: just push its end date out.
            current['end_date'] = entry.get('end_date')
        else:
            runs.append({
                'start_time': start_time,
                'end_time': end_time,
                'start_date': entry.get('start_date'),
                'end_date': entry.get('end_date'),
            })

    actuals = []
    for run in runs:
        hours = f"{run['start_time']} - {run['end_time']}"
        if run['start_date'] == run['end_date']:
            actuals.append(f"{run['start_date']}: {hours}")
        else:
            actuals.append(f"{run['start_date']} - {run['end_date']}: {hours}")
    return actuals
# Build the human-readable hours text for every location: one field per
# category plus a combined 'all_hours' field with a heading per category.
for location in locations:
    drop_box_windows = []
    early_windows = []
    election_day_windows = []

    # A window is a drop box if flagged so; otherwise early voting if
    # flagged so; anything else is treated as election-day hours.
    for window in location.get('times', None):
        drop_flag = window.get('drop_box', None)
        early_flag = window.get('early_voting', None)
        if drop_flag is True or drop_flag == "TRUE":
            drop_box_windows.append(window)
        elif early_flag is True or early_flag == "TRUE":
            early_windows.append(window)
        else:
            election_day_windows.append(window)

    # (output field, heading used in 'all_hours', windows of that category)
    sections = (
        ('drop_box_hours', "Drop Box Hours:\n", drop_box_windows),
        ('early_voting_hours', "Early Voting Hours:\n", early_windows),
        ('elect_day_hours', "Election Day Hours:\n", election_day_windows),
    )

    combined = ""
    for field, heading, windows in sections:
        # Chronological order first, then collapse runs of identical hours.
        windows.sort(key=lambda w: w.get('start_datetime'))
        text = "\n".join(actual_times(windows))
        location[field] = text
        if text:
            combined += heading + text + "\n\n"
    location['all_hours'] = combined
# cleanup: fold the optional columns into their parent column, then drop
# every intermediate field so only the output columns remain.
_fields_to_drop = (
    'DIRECTIONS', 'ADDRESS LINE 2', 'IS_DROP_BOX', 'IS_EARLY_VOTING',
    'START TIME', 'END TIME', 'START DATE', 'END DATE',
    'times', 'drop_box_hours', 'early_voting_hours', 'elect_day_hours',
)
for location in locations:
    directions = location.get('DIRECTIONS', None)
    if directions:
        location['NAME'] = location['NAME'] + f", {directions}"

    second_line = location.get('ADDRESS LINE 2', None)
    if second_line:
        location['ADDRESS LINE 1'] = location['ADDRESS LINE 1'] + f", {second_line}"

    for field in _fields_to_drop:
        location.pop(field, None)
# Write the cleaned-up locations to co_out.csv, one row per location.
with open('co_out.csv', 'w', newline='') as output_file:
    if locations:
        # Header comes from the last location's remaining keys.
        fieldnames = list(locations[-1])
    else:
        # No rows were loaded: fall back to the columns that survive the
        # cleanup step so a header-only file is written instead of
        # raising IndexError on locations[-1].
        fieldnames = ['COUNTY', 'NAME', 'ADDRESS LINE 1', 'CITY', 'STATE',
                      'ZIP', 'all_hours']
    dict_writer = csv.DictWriter(output_file, fieldnames)
    dict_writer.writeheader()
    dict_writer.writerows(locations)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment